use crate::migration::dump::remove_restricted_role_grants;
use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

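/// Runs the initial snapshot replication from `source_url` to `target_url`.
///
/// PostgreSQL sources go through the full pipeline: pre-flight checks,
/// globals dump/restore, a resumable per-database schema and data copy,
/// and (unless sync is disabled) continuous logical replication at the
/// end. SQLite, MongoDB, and MySQL sources return early into the one-time
/// JSONB migration paths defined below.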
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        // Non-PostgreSQL sources are handled by one-shot JSONB migration
        // paths; each arm returns early.
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

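    // From here on the source is PostgreSQL: replicate globals first, then
    // each database in turn, checkpointing progress as we go.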
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

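    // Resume from a previous checkpoint only when its metadata (URLs, filter
    // fingerprint, flags) and database list match the current run; otherwise
    // discard it and start fresh.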
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
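    // For each database: create it on the target (dropping an existing
    // non-empty one only with consent), dump/restore schema and data, then
    // record completion in the checkpoint so a rerun can skip it.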
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            // An empty database can be restored into directly;
                            // a non-empty one must be dropped and recreated first.
                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

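    // Continuous sync requires wal_level=logical on the target; if it is not
    // set, fall back to snapshot-only instead of failing the whole run.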
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync or wal_level fallback)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

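/// Swaps the database name in a PostgreSQL connection URL, preserving any
/// query parameters.
///
/// For example, `postgresql://user:pass@host:5432/olddb?sslmode=require`
/// becomes `postgresql://user:pass@host:5432/newdb?sslmode=require`.
/// The URL must contain a database segment after the final `/`.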
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    let parts: Vec<&str> = url.split('?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 { Some(parts[1]) } else { None };

    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

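/// Prints a per-database size and estimated-time table, then asks the user
/// to confirm. Returns `Ok(true)` only on an explicit `y`; anything else
/// (including an empty line) is treated as a refusal.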
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

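/// Returns `true` if the named database on the target has no user tables
/// (nothing outside `pg_catalog` and `information_schema`).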
async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
    let db_url = replace_database_in_url(target_url, db_name)?;
    let client = postgres::connect_with_retry(&db_url).await?;

    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = client.query_one(query, &[]).await?;
    let count: i64 = row.get(0);

    Ok(count == 0)
}

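/// Interactively asks whether to drop and recreate an existing, non-empty
/// database on the target. Defaults to "no".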
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    use std::io::{self, Write};

    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
         Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

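/// Drops a database on the target, first terminating any backends still
/// connected to it (PostgreSQL refuses to drop a database with active
/// connections).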
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!(" Dropping existing database '{}'...", db_name);

    let terminate_query = "
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = $1 AND pid <> pg_backend_pid()
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!(" ✓ Database '{}' dropped", db_name);
    Ok(())
}

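/// One-time migration of a SQLite database into PostgreSQL.
///
/// Each SQLite table is read in full, converted to JSONB rows, and written
/// into a matching JSONB table on the target; there is no continuous sync.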
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!(" ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}

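/// One-time migration of a MongoDB database into PostgreSQL.
///
/// The database name must be present in the connection string; every
/// collection is converted to JSONB rows and loaded into a matching table.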
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!(" ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            " ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        tracing::info!(
            " ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                " ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

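/// One-time replication of a MySQL database into PostgreSQL.
///
/// Mirrors the SQLite and MongoDB paths: every table in the named MySQL
/// database is converted to JSONB rows and loaded into a matching table.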
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!(" ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        " Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    // The #[ignore]d tests below are integration tests; they need live
    // databases supplied via TEST_SOURCE_URL / TEST_TARGET_URL.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}