1use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
/// Runs the initial snapshot replication from `source_url` to `target_url`.
///
/// Dispatches on the detected source type: PostgreSQL sources get the full
/// globals + per-database dump/restore pipeline (with checkpoint/resume
/// support and optional continuous sync); SQLite, MongoDB and MySQL sources
/// are delegated to their one-time JSONB migration paths.
///
/// # Arguments
/// * `skip_confirmation` - auto-answer "yes" to interactive prompts (`--yes`).
/// * `filter` - database/table selection rules for replication.
/// * `drop_existing` - drop non-empty target databases without prompting.
/// * `enable_sync` - set up continuous logical replication after the snapshot.
/// * `allow_resume` - reuse a previous checkpoint if it matches this run.
/// * `force_local` - never fall back to remote (SerenAI cloud) execution.
///
/// # Errors
/// Returns an error when pre-flight checks fail, the user aborts a prompt,
/// or any dump/restore step fails. The sentinel error message
/// `PREFLIGHT_FALLBACK_TO_REMOTE` signals the caller to retry remotely.
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // A tool-version mismatch against a SerenDB target is
                // recoverable by re-running remotely, unless --local forbids
                // it; the sentinel error below tells the caller to do so.
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                        Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            // SQLite sources only support a one-shot migration; warn about
            // flags that have no effect on this path.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            // Same caveats as SQLite: one-shot migration only.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            // Same caveats as SQLite/MongoDB: one-shot replication only.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // ---- PostgreSQL → PostgreSQL path from here on ----

    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // Sanitize the globals dump so it restores cleanly on a managed target:
    // strip duplicate-role conflicts, SUPERUSER, restricted GUC settings,
    // restricted role grants and CREATE TABLESPACE statements.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    // Scope the source connection so it is closed before the dump phase.
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate; drop any stale checkpoint so a later run
        // starts fresh (best effort, hence the ignored result).
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    // The filter fingerprint is stored in the checkpoint so a resume with a
    // different filter configuration is detected and rejected below.
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    // Resume support: load a prior checkpoint if allowed and compatible,
    // otherwise start from a fresh one.
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // The configuration changed since the checkpoint was
                        // written; resuming could skip needed work, so discard
                        // it and start over.
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist immediately so even a crash before the first database leaves a
    // valid checkpoint on disk.
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        // Scope the source connection used only for size estimation.
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };
        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        // Databases already done in a previous (resumed) run are skipped.
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Per-database connection URLs pointing at the same servers.
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Ensure the database exists (and is safe to restore into) on the
        // target; the scope drops the admin connection afterwards.
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Guard against injection / malformed names before interpolation.
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // DUPLICATE_DATABASE is expected when re-running against
                    // the same target; any other error is fatal.
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // The database has data. Decide whether to
                                // drop it: --drop-existing wins, then --yes
                                // auto-confirms, otherwise prompt the user.
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        // Tables with row-level predicates are copied separately, after the
        // bulk restore.
        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after every database so an interrupted run can
        // resume from the next one.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Cleanup is best-effort; a leftover temp dir or checkpoint is only noise.
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Continuous sync requires wal_level=logical on the target; downgrade to
    // snapshot-only (with instructions) instead of failing.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };
        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
590
591fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
593 let parts: Vec<&str> = url.split('?').collect();
598 let base_url = parts[0];
599 let params = if parts.len() > 1 {
600 Some(parts[1])
601 } else {
602 None
603 };
604
605 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
607 if url_parts.len() != 2 {
608 anyhow::bail!("Invalid connection URL format");
609 }
610
611 let mut new_url = format!("{}/{}", url_parts[1], new_database);
613 if let Some(p) = params {
614 new_url = format!("{}?{}", new_url, p);
615 }
616
617 Ok(new_url)
618}
619
620fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
637 use std::time::Duration;
638
639 let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
641 let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
642
643 println!();
645 println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
646 println!("{}", "─".repeat(50));
647
648 for size in sizes {
650 println!(
651 "{:<20} {:<12} {:<15}",
652 size.name,
653 size.size_human,
654 migration::format_duration(size.estimated_duration)
655 );
656 }
657
658 println!("{}", "─".repeat(50));
660 println!(
661 "Total: {} (estimated {})",
662 migration::format_bytes(total_bytes),
663 migration::format_duration(total_duration)
664 );
665 println!();
666
667 print!("Proceed with replication? [y/N]: ");
669 io::stdout().flush()?;
670
671 let mut input = String::new();
672 io::stdin()
673 .read_line(&mut input)
674 .context("Failed to read user input")?;
675
676 Ok(input.trim().to_lowercase() == "y")
677}
678
679async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
681 let db_url = replace_database_in_url(target_url, db_name)?;
683 let client = postgres::connect_with_retry(&db_url).await?;
684
685 let query = "
686 SELECT COUNT(*)
687 FROM information_schema.tables
688 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
689 ";
690
691 let row = client.query_one(query, &[]).await?;
692 let count: i64 = row.get(0);
693
694 Ok(count == 0)
695}
696
697fn prompt_drop_database(db_name: &str) -> Result<bool> {
699 use std::io::{self, Write};
700
701 print!(
702 "\nWarning: Database '{}' already exists on target and contains data.\n\
703 Drop and recreate database? This will delete all existing data. [y/N]: ",
704 db_name
705 );
706 io::stdout().flush()?;
707
708 let mut input = String::new();
709 io::stdin().read_line(&mut input)?;
710
711 Ok(input.trim().eq_ignore_ascii_case("y"))
712}
713
714async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
716 crate::utils::validate_postgres_identifier(db_name)
718 .with_context(|| format!("Invalid database name: '{}'", db_name))?;
719
720 tracing::info!(" Dropping existing database '{}'...", db_name);
721
722 let terminate_query = "
724 SELECT pg_terminate_backend(pid)
725 FROM pg_stat_activity
726 WHERE datname = $1 AND pid <> pg_backend_pid()
727 ";
728 target_conn.execute(terminate_query, &[&db_name]).await?;
729
730 let drop_query = format!(
732 "DROP DATABASE IF EXISTS {}",
733 crate::utils::quote_ident(db_name)
734 );
735 target_conn
736 .execute(&drop_query, &[])
737 .await
738 .with_context(|| format!("Failed to drop database '{}'", db_name))?;
739
740 tracing::info!(" ✓ Database '{}' dropped", db_name);
741 Ok(())
742}
743
/// One-time migration of a SQLite database file into PostgreSQL.
///
/// Every SQLite table is converted into a JSONB-backed table on the target
/// (rows become JSONB documents) rather than a typed relational schema.
/// There is no continuous replication for SQLite sources.
///
/// # Errors
/// Fails on an invalid/unreadable SQLite file, target connection problems,
/// or any per-table conversion/insert error.
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!(" ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // NOTE(review): the whole table is converted in memory before insert;
        // presumably fine for typical SQLite sizes — confirm for large files.
        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create the JSONB destination table (tagged with source kind
        // "sqlite") even when the source table is empty.
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}
867
/// One-time migration of a MongoDB database into PostgreSQL.
///
/// Every collection is converted into a JSONB-backed table on the target
/// (documents become JSONB rows). The database name must be present in the
/// MongoDB connection URL. There is no continuous replication for MongoDB
/// sources.
///
/// # Errors
/// Fails on connection problems, a URL without a database name, or any
/// per-collection conversion/insert error.
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!(" ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    // Two-layer context: first error is a parse failure, second is a URL
    // that parsed but carried no database name.
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        // NOTE(review): the whole collection is converted in memory before
        // insert — confirm this is acceptable for very large collections.
        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            " ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        // Create the JSONB destination table (tagged "mongodb") even when
        // the source collection is empty.
        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        tracing::info!(
            " ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                " ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}
1021
/// One-time replication of a MySQL database into PostgreSQL.
///
/// Every table is converted into a JSONB-backed table on the target (rows
/// become JSONB documents). The database name must be present in the MySQL
/// connection URL. There is no continuous replication for MySQL sources.
///
/// # Errors
/// Fails on connection problems, a URL without a database name, or any
/// per-table conversion/insert error.
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    // Mutable: the MySQL connection API requires `&mut` for queries below.
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!(" ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // NOTE(review): whole-table conversion happens in memory before
        // insert — confirm this is acceptable for very large tables.
        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create the JSONB destination table (tagged "mysql") even when the
        // source table is empty.
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        " Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}
1153
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end smoke test: requires live source/target databases supplied
    /// via TEST_SOURCE_URL / TEST_TARGET_URL, so it is `#[ignore]`d by
    /// default.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        // Args: source, target, skip_confirmation, filter, drop_existing,
        //       enable_sync, allow_resume, force_local.
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    /// Pure-function test: database path is replaced and query parameters
    /// are preserved.
    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    /// Requires a reachable target database; `#[ignore]`d by default.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        // "postgres" always exists; we only assert the call succeeds, not
        // whether it is empty.
        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}
1194}