1use crate::{checkpoint, migration, postgres};
5use anyhow::{bail, Context, Result};
6use std::io::{self, Write};
7use tokio_postgres::Client;
8
/// Runs the full "init" workflow: a one-time snapshot replication from
/// `source_url` to `target_url`, optionally followed by continuous sync.
///
/// Non-PostgreSQL sources (SQLite, MongoDB, MySQL) are detected up front and
/// delegated to their dedicated one-shot migration paths. For PostgreSQL
/// sources the flow is:
///   1. dump + restore global objects (roles, tablespaces),
///   2. discover databases and apply the replication filter,
///   3. per-database schema + data dump/restore, checkpointed so an
///      interrupted run can resume,
///   4. optionally set up continuous replication (requires
///      `wal_level = logical` on the target).
///
/// # Arguments
/// * `source_url` - connection string of the source database
/// * `target_url` - connection string of the target PostgreSQL server
/// * `skip_confirmation` - suppress interactive prompts (`--yes`)
/// * `filter` - database/table include/exclude rules
/// * `drop_existing` - drop non-empty target databases without asking
/// * `enable_sync` - attempt to set up continuous replication afterwards
/// * `allow_resume` - honor an existing checkpoint instead of starting fresh
/// * `force_local` - never fall back to remote execution (`--local`)
///
/// # Errors
/// Fails when pre-flight checks fail, the user cancels, or any dump/restore
/// step fails. The sentinel error message `PREFLIGHT_FALLBACK_TO_REMOTE`
/// signals the caller to retry via remote (cloud) execution.
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    // Work out what kind of source we are talking to; everything except
    // PostgreSQL is handled by a dedicated one-shot migration path below.
    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            // Restrict pre-flight checks to explicitly included databases, if any.
            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // An incompatible local tool version against a SerenDB target
                // can be retried remotely (unless --local forbids it); the
                // sentinel error below is recognized by the caller.
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                        Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            // SQLite is migrated wholesale; warn about options that don't apply.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            // MongoDB is migrated wholesale; warn about options that don't apply.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            // MySQL is replicated wholesale; warn about options that don't apply.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // --- PostgreSQL → PostgreSQL path from here on ---

    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Scratch space for dump files; removed again after a successful run.
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint file keyed on this (source, target) pair; enables resume.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // Post-process the globals dump so it restores cleanly on a managed
    // target: tolerate duplicate roles, strip SUPERUSER, drop restricted GUCs.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    // Scoped so the source connection is dropped as soon as listing is done.
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing left to do beyond globals; clear any stale checkpoint.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    // Checkpoint metadata ties the saved state to the exact configuration
    // (URLs, filter fingerprint, flags) so a resume under different settings
    // is detected and discarded rather than silently mixed.
    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                // A checkpoint exists: resume only if it matches the current
                // configuration and database list.
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Mismatched checkpoint: explain why, discard it, and
                        // start from scratch.
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        // --no-resume: deliberately discard any previous checkpoint.
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        // Show estimated sizes/durations and ask before doing heavy work.
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };
        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        // Skip databases already finished by a previous (resumed) run.
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Per-database connection URLs on both sides.
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Scoped block: this admin connection (to the target's default
        // database) is only needed to create or drop+recreate the database.
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Reject names that could break out of the quoted identifier.
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    // Only a "database already exists" error is recoverable;
                    // any other failure aborts the run.
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            if database_is_empty(target_url, &db_info.name).await? {
                                // An empty database can be restored into as-is.
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Non-empty: decide whether to drop it, driven
                                // by flags or an interactive prompt.
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        // Schema first, then bulk data, for this database.
        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        // Tables with row-level filters are copied separately after the
        // bulk restore.
        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after every database so an interruption resumes here.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Best-effort cleanup; failures are logged but never fatal.
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Continuous sync requires wal_level=logical on the target; downgrade to
    // snapshot-only (with fix instructions) when it is anything else.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };
        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
584
585fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
587 let parts: Vec<&str> = url.split('?').collect();
592 let base_url = parts[0];
593 let params = if parts.len() > 1 {
594 Some(parts[1])
595 } else {
596 None
597 };
598
599 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
601 if url_parts.len() != 2 {
602 anyhow::bail!("Invalid connection URL format");
603 }
604
605 let mut new_url = format!("{}/{}", url_parts[1], new_database);
607 if let Some(p) = params {
608 new_url = format!("{}?{}", new_url, p);
609 }
610
611 Ok(new_url)
612}
613
614fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
631 use std::time::Duration;
632
633 let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
635 let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
636
637 println!();
639 println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
640 println!("{}", "─".repeat(50));
641
642 for size in sizes {
644 println!(
645 "{:<20} {:<12} {:<15}",
646 size.name,
647 size.size_human,
648 migration::format_duration(size.estimated_duration)
649 );
650 }
651
652 println!("{}", "─".repeat(50));
654 println!(
655 "Total: {} (estimated {})",
656 migration::format_bytes(total_bytes),
657 migration::format_duration(total_duration)
658 );
659 println!();
660
661 print!("Proceed with replication? [y/N]: ");
663 io::stdout().flush()?;
664
665 let mut input = String::new();
666 io::stdin()
667 .read_line(&mut input)
668 .context("Failed to read user input")?;
669
670 Ok(input.trim().to_lowercase() == "y")
671}
672
673async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
675 let db_url = replace_database_in_url(target_url, db_name)?;
677 let client = postgres::connect_with_retry(&db_url).await?;
678
679 let query = "
680 SELECT COUNT(*)
681 FROM information_schema.tables
682 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
683 ";
684
685 let row = client.query_one(query, &[]).await?;
686 let count: i64 = row.get(0);
687
688 Ok(count == 0)
689}
690
691fn prompt_drop_database(db_name: &str) -> Result<bool> {
693 use std::io::{self, Write};
694
695 print!(
696 "\nWarning: Database '{}' already exists on target and contains data.\n\
697 Drop and recreate database? This will delete all existing data. [y/N]: ",
698 db_name
699 );
700 io::stdout().flush()?;
701
702 let mut input = String::new();
703 io::stdin().read_line(&mut input)?;
704
705 Ok(input.trim().eq_ignore_ascii_case("y"))
706}
707
708async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
710 crate::utils::validate_postgres_identifier(db_name)
712 .with_context(|| format!("Invalid database name: '{}'", db_name))?;
713
714 tracing::info!(" Dropping existing database '{}'...", db_name);
715
716 let terminate_query = "
718 SELECT pg_terminate_backend(pid)
719 FROM pg_stat_activity
720 WHERE datname = $1 AND pid <> pg_backend_pid()
721 ";
722 target_conn.execute(terminate_query, &[&db_name]).await?;
723
724 let drop_query = format!(
726 "DROP DATABASE IF EXISTS {}",
727 crate::utils::quote_ident(db_name)
728 );
729 target_conn
730 .execute(&drop_query, &[])
731 .await
732 .with_context(|| format!("Failed to drop database '{}'", db_name))?;
733
734 tracing::info!(" ✓ Database '{}' dropped", db_name);
735 Ok(())
736}
737
738pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
786 tracing::info!("Starting SQLite to PostgreSQL migration...");
787
788 tracing::info!("Step 1/4: Validating SQLite database...");
790 let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
791 .context("SQLite file validation failed")?;
792 tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());
793
794 tracing::info!("Step 2/4: Opening SQLite database...");
796 let sqlite_conn =
797 crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
798 tracing::info!(" ✓ SQLite database opened (read-only mode)");
799
800 tracing::info!("Step 3/4: Discovering tables...");
802 let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
803 .context("Failed to list tables from SQLite database")?;
804
805 if tables.is_empty() {
806 tracing::warn!("⚠ No tables found in SQLite database");
807 tracing::info!("✅ Migration complete (no tables to migrate)");
808 return Ok(());
809 }
810
811 tracing::info!("Found {} table(s) to migrate", tables.len());
812
813 let target_client = postgres::connect_with_retry(target_url).await?;
815 tracing::info!(" ✓ Connected to PostgreSQL target");
816
817 tracing::info!("Step 4/4: Migrating tables...");
819 for (idx, table_name) in tables.iter().enumerate() {
820 tracing::info!(
821 "Migrating table {}/{}: '{}'",
822 idx + 1,
823 tables.len(),
824 table_name
825 );
826
827 let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
829 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
830
831 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
832
833 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
835 .await
836 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
837
838 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
839
840 if !rows.is_empty() {
841 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
843 .await
844 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
845
846 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
847 } else {
848 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
849 }
850 }
851
852 tracing::info!("✅ SQLite to PostgreSQL migration complete!");
853 tracing::info!(
854 " Migrated {} table(s) from '{}' to PostgreSQL",
855 tables.len(),
856 sqlite_path
857 );
858
859 Ok(())
860}
861
862pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
913 tracing::info!("Starting MongoDB to PostgreSQL migration...");
914
915 tracing::info!("Step 1/5: Validating MongoDB connection...");
917 let client = crate::mongodb::connect_mongodb(mongo_url)
918 .await
919 .context("MongoDB connection failed")?;
920 tracing::info!(" ✓ MongoDB connection validated");
921
922 tracing::info!("Step 2/5: Extracting database name...");
924 let db_name = crate::mongodb::extract_database_name(mongo_url)
925 .await
926 .context("Failed to parse MongoDB connection string")?
927 .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
928 tracing::info!(" ✓ Database name: '{}'", db_name);
929
930 tracing::info!("Step 3/5: Discovering collections...");
932 let db = client.database(&db_name);
933 let collections = crate::mongodb::reader::list_collections(&client, &db_name)
934 .await
935 .context("Failed to list collections from MongoDB database")?;
936
937 if collections.is_empty() {
938 tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
939 tracing::info!("✅ Migration complete (no collections to migrate)");
940 return Ok(());
941 }
942
943 tracing::info!("Found {} collection(s) to migrate", collections.len());
944
945 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
947 let target_client = postgres::connect_with_retry(target_url).await?;
948 tracing::info!(" ✓ Connected to PostgreSQL target");
949
950 tracing::info!("Step 5/5: Migrating collections...");
952 for (idx, collection_name) in collections.iter().enumerate() {
953 tracing::info!(
954 "Migrating collection {}/{}: '{}'",
955 idx + 1,
956 collections.len(),
957 collection_name
958 );
959
960 let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
962 .await
963 .with_context(|| {
964 format!(
965 "Failed to convert collection '{}' to JSONB",
966 collection_name
967 )
968 })?;
969
970 tracing::info!(
971 " ✓ Converted {} documents from '{}'",
972 rows.len(),
973 collection_name
974 );
975
976 crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
978 .await
979 .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
980
981 tracing::info!(
982 " ✓ Created JSONB table '{}' in PostgreSQL",
983 collection_name
984 );
985
986 if !rows.is_empty() {
987 crate::jsonb::writer::insert_jsonb_batch(
989 &target_client,
990 collection_name,
991 rows,
992 "mongodb",
993 )
994 .await
995 .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
996
997 tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
998 } else {
999 tracing::info!(
1000 " ✓ Collection '{}' is empty (no documents to insert)",
1001 collection_name
1002 );
1003 }
1004 }
1005
1006 tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1007 tracing::info!(
1008 " Migrated {} collection(s) from database '{}' to PostgreSQL",
1009 collections.len(),
1010 db_name
1011 );
1012
1013 Ok(())
1014}
1015
1016pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1067 tracing::info!("Starting MySQL to PostgreSQL replication...");
1068
1069 tracing::info!("Step 1/5: Validating MySQL connection...");
1071 let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1072 .await
1073 .context("MySQL connection failed")?;
1074 tracing::info!(" ✓ MySQL connection validated");
1075
1076 tracing::info!("Step 2/5: Extracting database name...");
1078 let db_name = crate::mysql::extract_database_name(mysql_url)
1079 .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1080 tracing::info!(" ✓ Database name: '{}'", db_name);
1081
1082 tracing::info!("Step 3/5: Discovering tables...");
1084 let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1085 .await
1086 .context("Failed to list tables from MySQL database")?;
1087
1088 if tables.is_empty() {
1089 tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1090 tracing::info!("✅ Replication complete (no tables to replicate)");
1091 return Ok(());
1092 }
1093
1094 tracing::info!("Found {} table(s) to replicate", tables.len());
1095
1096 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1098 let target_client = postgres::connect_with_retry(target_url).await?;
1099 tracing::info!(" ✓ Connected to PostgreSQL target");
1100
1101 tracing::info!("Step 5/5: Replicating tables...");
1103 for (idx, table_name) in tables.iter().enumerate() {
1104 tracing::info!(
1105 "Replicating table {}/{}: '{}'",
1106 idx + 1,
1107 tables.len(),
1108 table_name
1109 );
1110
1111 let rows =
1113 crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1114 .await
1115 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1116
1117 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
1118
1119 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1121 .await
1122 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1123
1124 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1125
1126 if !rows.is_empty() {
1127 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1129 .await
1130 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1131
1132 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
1133 } else {
1134 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
1135 }
1136 }
1137
1138 tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1139 tracing::info!(
1140 " Replicated {} table(s) from database '{}' to PostgreSQL",
1141 tables.len(),
1142 db_name
1143 );
1144
1145 Ok(())
1146}
1147
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end init run against live databases. Requires TEST_SOURCE_URL
    /// and TEST_TARGET_URL to point at real servers, hence ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let outcome = init(
            &source,
            &target,
            true,
            crate::filters::ReplicationFilter::empty(),
            false,
            false,
            true,
            false,
        )
        .await;
        assert!(outcome.is_ok());
    }

    /// Pure-function test: database name swapping with and without a
    /// query-parameter suffix.
    #[test]
    fn test_replace_database_in_url() {
        let with_params = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        assert_eq!(
            replace_database_in_url(with_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let without_params = "postgresql://user:pass@host:5432/olddb";
        assert_eq!(
            replace_database_in_url(without_params, "newdb").unwrap(),
            "postgresql://user:pass@host:5432/newdb"
        );
    }

    /// Live check against the target's built-in `postgres` database;
    /// ignored unless TEST_TARGET_URL is provided.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");
        assert!(database_is_empty(&url, "postgres").await.is_ok());
    }
}