use crate::migration::dump::remove_restricted_role_grants;
use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

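/// Runs the initial replication from a source database to a PostgreSQL target.
///
/// PostgreSQL sources go through a four-step snapshot (dump globals, restore
/// globals, discover databases, replicate each database) with resumable
/// checkpoints, optionally followed by continuous logical replication.
/// SQLite, MongoDB, and MySQL sources are dispatched to the one-time
/// migration paths further down in this file.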
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            // Warn when sync was requested (the default): SQLite sources only
            // support a one-time migration.
            if enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            // Warn when sync was requested: MongoDB sources only support a
            // one-time migration.
            if enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            // Warn when sync was requested: MySQL sources only support a
            // one-time replication pass.
            if enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

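    // The globals dump is sanitized before restore: duplicate roles are made
    // non-fatal, and SUPERUSER attributes, restricted GUC settings, reserved
    // role grants, and CREATE TABLESPACE statements are stripped, since
    // managed targets commonly reject them.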
    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

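    // Resume handling: a checkpoint from a previous run is only reused when
    // its metadata (URLs, filter fingerprint, flags) and database list still
    // match the current invocation; otherwise it is discarded and the run
    // starts fresh.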
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

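        // Ensure the database exists on the target. When it already exists
        // and contains data, it is dropped and recreated only under
        // --drop-existing, --yes auto-confirmation, or an interactive "y".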
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE
                        {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name)
                                        .await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!(
                "⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync",
                target_wal_level
            );
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

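/// Replaces the database name in a connection URL while preserving any query
/// parameters. This is a plain string rewrite (split on the last '/') rather
/// than a full URL parse, so the URL must contain a database path segment.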
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    // splitn(2, '?') keeps any literal '?' inside the query string intact.
    let parts: Vec<&str> = url.splitn(2, '?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 {
        Some(parts[1])
    } else {
        None
    };

    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

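/// Prints a per-database size and estimated-time table, then asks for
/// confirmation. Returns `Ok(true)` only on an explicit "y" answer.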
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

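/// Returns `true` when the named database on the target has no tables outside
/// `pg_catalog` and `information_schema`.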
async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
    let db_url = replace_database_in_url(target_url, db_name)?;
    let client = postgres::connect_with_retry(&db_url).await?;

    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = client.query_one(query, &[]).await?;
    let count: i64 = row.get(0);

    Ok(count == 0)
}

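/// Interactively asks whether an existing, non-empty target database should be
/// dropped and recreated. Defaults to "no" on anything but "y".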
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
         Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

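/// Drops `db_name` on the target if it exists, terminating any open backends
/// first so DROP DATABASE is not blocked by live connections.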
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!(" Dropping existing database '{}'...", db_name);

    let terminate_query = "
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = $1 AND pid <> pg_backend_pid()
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!(" ✓ Database '{}' dropped", db_name);
    Ok(())
}

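/// One-time migration of a SQLite file into PostgreSQL: every table is read
/// through the converter and stored as JSONB rows on the target.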
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!(" ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}

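/// One-time migration of a MongoDB database into PostgreSQL: each collection
/// becomes a JSONB table on the target. The MongoDB URL must name the
/// database to migrate.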
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!(" ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            " ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        tracing::info!(
            " ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                " ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

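/// One-time replication of a MySQL database into PostgreSQL: each table is
/// converted to JSONB rows on the target. The MySQL URL must name the
/// database to replicate.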
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!(" ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        " Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }
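
    // Added edge-case checks for the plain string rewrite in
    // `replace_database_in_url`: a URL without a '/' has no database segment
    // and must be rejected, and a literal '?' inside the query string is
    // preserved by the splitn(2, '?') split.
    #[test]
    fn test_replace_database_in_url_edge_cases() {
        assert!(replace_database_in_url("not-a-url", "newdb").is_err());

        let url = "postgresql://user@host:5432/olddb?options=a?b";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(result, "postgresql://user@host:5432/newdb?options=a?b");
    }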

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}