1use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
/// Runs the initial snapshot replication from `source_url` to `target_url`.
///
/// The source database type is auto-detected:
/// - PostgreSQL sources use the full pipeline below: pre-flight checks,
///   globals dump/restore, per-database schema + data dump/restore with a
///   resumable checkpoint, and (optionally) continuous logical replication.
/// - SQLite, MongoDB and MySQL sources are delegated to dedicated one-time
///   migration helpers and return early; `filter`, `drop_existing` and sync
///   do not apply to them (warnings are emitted).
///
/// # Arguments
/// * `skip_confirmation` - auto-confirm interactive prompts (--yes flag).
/// * `filter` - database/table selection rules (PostgreSQL sources only).
/// * `drop_existing` - drop non-empty target databases without prompting.
/// * `enable_sync` - attempt continuous replication after the snapshot.
/// * `allow_resume` - reuse a compatible checkpoint from a previous run.
/// * `force_local` - forbid falling back to remote (cloud) execution.
///
/// # Errors
/// Fails on pre-flight errors, dump/restore failures, user cancellation, or
/// checkpoint persistence problems; each error is wrapped with context.
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Tool-version mismatch against a SerenDB target: bail with a
                // sentinel message the caller recognizes to re-run remotely
                // (unless --local forbids that fallback).
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            // One-shot migration path: filters / --drop-existing / sync are
            // not supported, so warn about ignored options and delegate.
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            // NOTE(review): this warns when sync is already disabled; the
            // intent presumably was to warn when sync WAS requested
            // (`if enable_sync`) — confirm and fix separately. Same pattern
            // repeats in the MongoDB and MySQL arms below.
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // --- PostgreSQL → PostgreSQL pipeline from here on ---

    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Scratch space for dump files; removed at the end (best effort).
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the (source, target) pair so that
    // a re-run of the same job can find and resume it.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    // Step 1: dump cluster-wide objects, then strip statements a managed
    // target would reject: duplicate roles, SUPERUSER, restricted GUCs,
    // restricted role grants, CREATE TABLESPACE.
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    // Step 3: enumerate source databases, then apply the user's filter.
    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        // Scoped so the admin connection drops before the long-running copy.
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to copy: drop any stale checkpoint and finish successfully.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    // Checkpoint metadata captures this run's configuration; a saved
    // checkpoint is only resumed when it validates against it.
    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Configuration drift: the old checkpoint cannot be
                        // trusted, so discard it and start fresh.
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        // --no-resume: always begin with a fresh checkpoint.
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        // Show estimated sizes/durations and ask before copying anything.
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };
        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        // Tables with row-level predicates; presumably copied separately
        // after the bulk restore (see `copy_filtered_tables` below).
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Per-database connection URLs on both sides.
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Ensure the database exists on the target. A non-empty existing
        // database is dropped/recreated only with explicit consent (flag or
        // prompt). Scoped so the admin connection drops before the restore.
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // DUPLICATE_DATABASE is recoverable; anything else aborts.
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Decide whether to drop: flag wins, then
                                // --yes auto-confirms, else ask the user.
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        // Schema first, then data, each via a dump file in the temp dir.
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        // Predicate-filtered tables are copied row-by-row after the bulk load.
        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after every database so a crash can resume here.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Best-effort cleanup; failures are logged, never fatal.
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Continuous sync requires wal_level=logical on the target; otherwise
    // degrade gracefully to a snapshot-only run instead of failing.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };
        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("  1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("  2. Restart PostgreSQL server");
            tracing::warn!(
                "  3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        // NOTE(review): the trailing None/None/None/false arguments mirror
        // optional parameters of `commands::sync` — confirm against its
        // signature before changing.
        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("  postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
590
591fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
593 let parts: Vec<&str> = url.split('?').collect();
598 let base_url = parts[0];
599 let params = if parts.len() > 1 {
600 Some(parts[1])
601 } else {
602 None
603 };
604
605 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
607 if url_parts.len() != 2 {
608 anyhow::bail!("Invalid connection URL format");
609 }
610
611 let mut new_url = format!("{}/{}", url_parts[1], new_database);
613 if let Some(p) = params {
614 new_url = format!("{}?{}", new_url, p);
615 }
616
617 Ok(new_url)
618}
619
620fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
637 use std::time::Duration;
638
639 let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
641 let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
642
643 println!();
645 println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
646 println!("{}", "─".repeat(50));
647
648 for size in sizes {
650 println!(
651 "{:<20} {:<12} {:<15}",
652 size.name,
653 size.size_human,
654 migration::format_duration(size.estimated_duration)
655 );
656 }
657
658 println!("{}", "─".repeat(50));
660 println!(
661 "Total: {} (estimated {})",
662 migration::format_bytes(total_bytes),
663 migration::format_duration(total_duration)
664 );
665 println!();
666
667 print!("Proceed with replication? [y/N]: ");
669 io::stdout().flush()?;
670
671 let mut input = String::new();
672 io::stdin()
673 .read_line(&mut input)
674 .context("Failed to read user input")?;
675
676 Ok(input.trim().to_lowercase() == "y")
677}
678
679async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
681 let db_url = replace_database_in_url(target_url, db_name)?;
683 let client = postgres::connect_with_retry(&db_url).await?;
684
685 let query = "
686 SELECT COUNT(*)
687 FROM information_schema.tables
688 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
689 ";
690
691 let row = client.query_one(query, &[]).await?;
692 let count: i64 = row.get(0);
693
694 Ok(count == 0)
695}
696
697fn prompt_drop_database(db_name: &str) -> Result<bool> {
699 use std::io::{self, Write};
700
701 print!(
702 "\nWarning: Database '{}' already exists on target and contains data.\n\
703 Drop and recreate database? This will delete all existing data. [y/N]: ",
704 db_name
705 );
706 io::stdout().flush()?;
707
708 let mut input = String::new();
709 io::stdin().read_line(&mut input)?;
710
711 Ok(input.trim().eq_ignore_ascii_case("y"))
712}
713
714async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
716 crate::utils::validate_postgres_identifier(db_name)
718 .with_context(|| format!("Invalid database name: '{}'", db_name))?;
719
720 tracing::info!(" Dropping existing database '{}'...", db_name);
721
722 let terminate_query = "
725 SELECT pg_terminate_backend(sa.pid)
726 FROM pg_stat_activity sa
727 JOIN pg_roles r ON sa.usename = r.rolname
728 WHERE sa.datname = $1
729 AND sa.pid <> pg_backend_pid()
730 AND NOT r.rolsuper
731 ";
732 target_conn.execute(terminate_query, &[&db_name]).await?;
733
734 let remaining_query = "
736 SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
737 FROM pg_stat_activity sa
738 WHERE sa.datname = $1
739 AND sa.pid <> pg_backend_pid()
740 ";
741 let row = target_conn
742 .query_one(remaining_query, &[&db_name])
743 .await
744 .context("Failed to check remaining connections")?;
745 let remaining_count: i64 = row.get(0);
746 let remaining_users: Option<String> = row.get(1);
747
748 if remaining_count > 0 {
749 let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
750 bail!(
751 "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
752 These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
753 This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
754 processes maintain superuser connections.\n\n\
755 To resolve this:\n\
756 1. Wait a few minutes and retry (system connections may be temporary)\n\
757 2. Ask your database administrator to terminate the blocking sessions:\n\
758 SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
759 3. If using AWS RDS, check for RDS-managed connections in the RDS console",
760 db_name,
761 remaining_count,
762 users,
763 db_name
764 );
765 }
766
767 let drop_query = format!(
769 "DROP DATABASE IF EXISTS {}",
770 crate::utils::quote_ident(db_name)
771 );
772 target_conn
773 .execute(&drop_query, &[])
774 .await
775 .with_context(|| format!("Failed to drop database '{}'", db_name))?;
776
777 tracing::info!(" ✓ Database '{}' dropped", db_name);
778 Ok(())
779}
780
781pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
829 tracing::info!("Starting SQLite to PostgreSQL migration...");
830
831 tracing::info!("Step 1/4: Validating SQLite database...");
833 let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
834 .context("SQLite file validation failed")?;
835 tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());
836
837 tracing::info!("Step 2/4: Opening SQLite database...");
839 let sqlite_conn =
840 crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
841 tracing::info!(" ✓ SQLite database opened (read-only mode)");
842
843 tracing::info!("Step 3/4: Discovering tables...");
845 let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
846 .context("Failed to list tables from SQLite database")?;
847
848 if tables.is_empty() {
849 tracing::warn!("⚠ No tables found in SQLite database");
850 tracing::info!("✅ Migration complete (no tables to migrate)");
851 return Ok(());
852 }
853
854 tracing::info!("Found {} table(s) to migrate", tables.len());
855
856 let target_client = postgres::connect_with_retry(target_url).await?;
858 tracing::info!(" ✓ Connected to PostgreSQL target");
859
860 tracing::info!("Step 4/4: Migrating tables...");
862 for (idx, table_name) in tables.iter().enumerate() {
863 tracing::info!(
864 "Migrating table {}/{}: '{}'",
865 idx + 1,
866 tables.len(),
867 table_name
868 );
869
870 let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
872 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
873
874 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
875
876 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
878 .await
879 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
880
881 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
882
883 if !rows.is_empty() {
884 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
886 .await
887 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
888
889 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
890 } else {
891 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
892 }
893 }
894
895 tracing::info!("✅ SQLite to PostgreSQL migration complete!");
896 tracing::info!(
897 " Migrated {} table(s) from '{}' to PostgreSQL",
898 tables.len(),
899 sqlite_path
900 );
901
902 Ok(())
903}
904
905pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
956 tracing::info!("Starting MongoDB to PostgreSQL migration...");
957
958 tracing::info!("Step 1/5: Validating MongoDB connection...");
960 let client = crate::mongodb::connect_mongodb(mongo_url)
961 .await
962 .context("MongoDB connection failed")?;
963 tracing::info!(" ✓ MongoDB connection validated");
964
965 tracing::info!("Step 2/5: Extracting database name...");
967 let db_name = crate::mongodb::extract_database_name(mongo_url)
968 .await
969 .context("Failed to parse MongoDB connection string")?
970 .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
971 tracing::info!(" ✓ Database name: '{}'", db_name);
972
973 tracing::info!("Step 3/5: Discovering collections...");
975 let db = client.database(&db_name);
976 let collections = crate::mongodb::reader::list_collections(&client, &db_name)
977 .await
978 .context("Failed to list collections from MongoDB database")?;
979
980 if collections.is_empty() {
981 tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
982 tracing::info!("✅ Migration complete (no collections to migrate)");
983 return Ok(());
984 }
985
986 tracing::info!("Found {} collection(s) to migrate", collections.len());
987
988 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
990 let target_client = postgres::connect_with_retry(target_url).await?;
991 tracing::info!(" ✓ Connected to PostgreSQL target");
992
993 tracing::info!("Step 5/5: Migrating collections...");
995 for (idx, collection_name) in collections.iter().enumerate() {
996 tracing::info!(
997 "Migrating collection {}/{}: '{}'",
998 idx + 1,
999 collections.len(),
1000 collection_name
1001 );
1002
1003 let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
1005 .await
1006 .with_context(|| {
1007 format!(
1008 "Failed to convert collection '{}' to JSONB",
1009 collection_name
1010 )
1011 })?;
1012
1013 tracing::info!(
1014 " ✓ Converted {} documents from '{}'",
1015 rows.len(),
1016 collection_name
1017 );
1018
1019 crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
1021 .await
1022 .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
1023
1024 tracing::info!(
1025 " ✓ Created JSONB table '{}' in PostgreSQL",
1026 collection_name
1027 );
1028
1029 if !rows.is_empty() {
1030 crate::jsonb::writer::insert_jsonb_batch(
1032 &target_client,
1033 collection_name,
1034 rows,
1035 "mongodb",
1036 )
1037 .await
1038 .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
1039
1040 tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
1041 } else {
1042 tracing::info!(
1043 " ✓ Collection '{}' is empty (no documents to insert)",
1044 collection_name
1045 );
1046 }
1047 }
1048
1049 tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1050 tracing::info!(
1051 " Migrated {} collection(s) from database '{}' to PostgreSQL",
1052 collections.len(),
1053 db_name
1054 );
1055
1056 Ok(())
1057}
1058
1059pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1110 tracing::info!("Starting MySQL to PostgreSQL replication...");
1111
1112 tracing::info!("Step 1/5: Validating MySQL connection...");
1114 let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1115 .await
1116 .context("MySQL connection failed")?;
1117 tracing::info!(" ✓ MySQL connection validated");
1118
1119 tracing::info!("Step 2/5: Extracting database name...");
1121 let db_name = crate::mysql::extract_database_name(mysql_url)
1122 .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1123 tracing::info!(" ✓ Database name: '{}'", db_name);
1124
1125 tracing::info!("Step 3/5: Discovering tables...");
1127 let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1128 .await
1129 .context("Failed to list tables from MySQL database")?;
1130
1131 if tables.is_empty() {
1132 tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1133 tracing::info!("✅ Replication complete (no tables to replicate)");
1134 return Ok(());
1135 }
1136
1137 tracing::info!("Found {} table(s) to replicate", tables.len());
1138
1139 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1141 let target_client = postgres::connect_with_retry(target_url).await?;
1142 tracing::info!(" ✓ Connected to PostgreSQL target");
1143
1144 tracing::info!("Step 5/5: Replicating tables...");
1146 for (idx, table_name) in tables.iter().enumerate() {
1147 tracing::info!(
1148 "Replicating table {}/{}: '{}'",
1149 idx + 1,
1150 tables.len(),
1151 table_name
1152 );
1153
1154 let rows =
1156 crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1157 .await
1158 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1159
1160 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
1161
1162 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1164 .await
1165 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1166
1167 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1168
1169 if !rows.is_empty() {
1170 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1172 .await
1173 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1174
1175 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
1176 } else {
1177 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
1178 }
1179 }
1180
1181 tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1182 tracing::info!(
1183 " Replicated {} table(s) from database '{}' to PostgreSQL",
1184 tables.len(),
1185 db_name
1186 );
1187
1188 Ok(())
1189}
1190
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end init run; needs live databases via TEST_SOURCE_URL and
    /// TEST_TARGET_URL, hence #[ignore] by default.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();
        let filter = crate::filters::ReplicationFilter::empty();

        let outcome = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(outcome.is_ok());
    }

    /// Pure URL rewriting: query parameters preserved, absent params fine.
    #[test]
    fn test_replace_database_in_url() {
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let without_params = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(without_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    /// Requires a live target; only checks the call succeeds.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");
        assert!(database_is_empty(&url, "postgres").await.is_ok());
    }
}