use crate::migration::dump::remove_restricted_role_grants;
use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

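/// Runs the initial snapshot replication from `source_url` to `target_url`.
/// PostgreSQL sources go through pre-flight checks, a globals dump/restore,
/// database discovery, and per-database schema + data replication with
/// checkpoint/resume support; SQLite, MongoDB, and MySQL sources are
/// dispatched to their one-time migration paths below.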
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                        Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::info!(
                    "--drop-existing: existing JSONB tables on the target will be dropped"
                );
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url, drop_existing).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to sanitize globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if let Ok(target_parts) = crate::utils::parse_postgres_url(target_url) {
        let target_db = &target_parts.database;
        let source_db_names: Vec<&str> = databases.iter().map(|db| db.name.as_str()).collect();

        if !source_db_names.contains(&target_db.as_str())
            && target_db != "postgres"
            && !target_db.is_empty()
        {
            println!();
            println!("========================================");
            println!("⚠️ IMPORTANT: Target database name ignored");
            println!("========================================");
            println!();
            println!(
                "You specified target database '{}', but replication preserves",
                target_db
            );
            println!("source database names. Data will be replicated to:");
            println!();
            for db_name in &source_db_names {
                println!(" → {}", db_name);
            }
            println!();
            println!(
                "The '{}' database in your target URL is used only for",
                target_db
            );
            println!("the initial connection. Source databases will be created");
            println!("on the target server with their original names.");
            println!();
            println!("When running 'sync' later, use the correct database name:");
            if let Some(first_db) = source_db_names.first() {
                println!(" --target \"postgresql://.../{}\"\n", first_db);
            }
            println!("========================================");
            println!();
        }
    }

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            // An existing empty database is safe to restore into;
                            // a non-empty one must be dropped (with confirmation)
                            // and recreated first.
                            let is_empty = {
                                let db_client =
                                    postgres::connect_with_retry(&target_db_url).await?;
                                database_is_empty(&db_client).await?
                            };

                            if is_empty {
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

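/// Replaces the database name in a PostgreSQL connection URL, preserving any
/// query parameters (e.g. `?sslmode=require`). Returns an error if the URL
/// contains no `/` separating the host from the database name.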
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    // Split off query parameters first; splitn(2, ..) keeps any later '?'
    // inside the parameter string intact instead of discarding it.
    let parts: Vec<&str> = url.splitn(2, '?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 {
        Some(parts[1])
    } else {
        None
    };

    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

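/// Prints a per-database size and time-estimate table, then asks the user to
/// confirm. Returns `Ok(true)` only on a "y"/"Y" answer.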
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

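/// Returns `true` if the connected database has no tables outside the
/// `pg_catalog` and `information_schema` schemas. The check is bounded by a
/// 30-second timeout so a hung connection cannot stall the run.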
async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = tokio::time::timeout(
        std::time::Duration::from_secs(30),
        client.query_one(query, &[]),
    )
    .await
    .context("database_is_empty query timed out after 30 seconds")?
    .context("Failed to query information_schema.tables")?;

    let count: i64 = row.get(0);

    Ok(count == 0)
}

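/// Interactively asks whether an existing, non-empty target database should
/// be dropped and recreated. Defaults to "no" unless the user answers "y"/"Y".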
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
        Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

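/// Drops `db_name` on the target after terminating non-superuser connections
/// to it. If superuser-owned sessions remain (common on managed services such
/// as AWS RDS or SerenDB), the drop is aborted with remediation steps instead
/// of failing inside DROP DATABASE.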
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!(" Dropping existing database '{}'...", db_name);

    // Terminate every non-superuser session connected to the database; a
    // regular role cannot terminate superuser backends, so those are skipped
    // here and detected below.
    let terminate_query = "
        SELECT pg_terminate_backend(sa.pid)
        FROM pg_stat_activity sa
        JOIN pg_roles r ON sa.usename = r.rolname
        WHERE sa.datname = $1
        AND sa.pid <> pg_backend_pid()
        AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    let remaining_query = "
        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
        FROM pg_stat_activity sa
        WHERE sa.datname = $1
        AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
            These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
            This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
            processes maintain superuser connections.\n\n\
            To resolve this:\n\
            1. Wait a few minutes and retry (system connections may be temporary)\n\
            2. Ask your database administrator to terminate the blocking sessions:\n\
            SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
            3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!(" ✓ Database '{}' dropped", db_name);
    Ok(())
}

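/// One-time migration of a SQLite database into PostgreSQL. Each SQLite table
/// is written to the target as a JSONB table and rows are copied in batches;
/// continuous replication is not available for SQLite sources.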
pub async fn init_sqlite_to_postgres(
    sqlite_path: &str,
    target_url: &str,
    drop_existing: bool,
) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!(" ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    let mut table_row_counts: Vec<(&str, usize)> = Vec::new();
    let mut total_rows = 0usize;
    for table_name in &tables {
        let count =
            crate::sqlite::reader::get_table_row_count(&sqlite_conn, table_name).unwrap_or(0);
        table_row_counts.push((table_name, count));
        total_rows += count;
    }

    tracing::info!(
        "Total rows to migrate: {} across {} table(s)",
        total_rows,
        tables.len()
    );

    tracing::info!("Step 4/4: Migrating tables (batched processing)...");
    let mut migrated_rows = 0usize;

    for (idx, (table_name, row_count)) in table_row_counts.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}' ({} rows)",
            idx + 1,
            tables.len(),
            table_name,
            row_count
        );

        if drop_existing {
            crate::jsonb::writer::drop_jsonb_table(&target_client, table_name)
                .await
                .with_context(|| format!("Failed to drop existing JSONB table '{}'", table_name))?;
        }

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        let rows_processed = crate::sqlite::converter::convert_table_batched(
            &sqlite_conn,
            &target_client,
            table_name,
            "sqlite",
            None,
        )
        .await
        .with_context(|| format!("Failed to migrate table '{}'", table_name))?;

        migrated_rows += rows_processed;

        if rows_processed > 0 {
            // The percentage reports cumulative progress across all tables,
            // not the share contributed by this table alone.
            tracing::info!(
                " ✓ Migrated {} rows from '{}' ({:.1}% of total)",
                rows_processed,
                table_name,
                if total_rows > 0 {
                    migrated_rows as f64 / total_rows as f64 * 100.0
                } else {
                    100.0
                }
            );
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to migrate)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} row(s) from {} table(s) in '{}'",
        migrated_rows,
        tables.len(),
        sqlite_path
    );

    Ok(())
}

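/// One-time migration of a MongoDB database into PostgreSQL. Every collection
/// is converted to JSONB rows and inserted into a matching table on the
/// target; continuous replication is not available for MongoDB sources.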
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!(" ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            " ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, collection_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", collection_name))?;

        tracing::info!(
            " ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                " ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

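/// One-time replication of a MySQL database into PostgreSQL. Each table is
/// converted to JSONB rows and inserted into a matching table on the target;
/// continuous replication is not available for MySQL sources.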
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!(" ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        " Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

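    // A sketch of an additional edge-case check: a URL with no '/' separator
    // should be rejected by replace_database_in_url rather than mangled.
    #[test]
    fn test_replace_database_in_url_rejects_malformed_url() {
        assert!(replace_database_in_url("not-a-url", "newdb").is_err());
    }
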
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        let result = database_is_empty(&client).await;
        assert!(result.is_ok());
    }
}