1use crate::migration::dump::remove_restricted_role_grants;
5use crate::{checkpoint, migration, postgres};
6use anyhow::{bail, Context, Result};
7use std::io::{self, Write};
8use tokio_postgres::Client;
9
/// Performs the initial (snapshot) replication from `source_url` to `target_url`.
///
/// The source type is auto-detected:
/// - **PostgreSQL**: runs pre-flight checks, dumps/restores global objects,
///   then dumps and restores every database matching `filter`, persisting an
///   on-disk checkpoint so an interrupted run can resume. Optionally finishes
///   by configuring continuous logical replication (step 5/5).
/// - **SQLite / MongoDB / MySQL**: delegates to the corresponding one-time
///   JSONB migration helper below and returns early (filters, `drop_existing`
///   and continuous sync are not supported for these sources).
///
/// Parameters:
/// - `skip_confirmation`: suppress interactive prompts (`--yes`); also
///   auto-confirms dropping existing non-empty target databases.
/// - `filter`: database/table selection (PostgreSQL sources only).
/// - `drop_existing`: drop a non-empty target database without prompting.
/// - `enable_sync`: attempt continuous replication after the snapshot.
/// - `allow_resume`: honor a compatible existing checkpoint instead of
///   starting from scratch.
/// - `force_local`: fail outright rather than signalling a fallback to remote
///   execution when pre-flight reports an incompatible tool version.
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Incompatible local tooling against a SerenDB target: emit a
                // sentinel error string — NOTE(review): presumably matched by
                // the caller to re-run the job in the cloud; confirm the
                // caller recognizes "PREFLIGHT_FALLBACK_TO_REMOTE".
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        // Non-PostgreSQL sources: warn about ignored flags, then hand off to
        // the dedicated one-shot migration path and return early.
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // --- PostgreSQL → PostgreSQL path from here on. ---

    // Refuse to replicate a database onto itself.
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Scratch space for dump files; cleaned up (best-effort) at the end.
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the (source, target) pair so
    // different replication jobs don't clobber each other's state.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // Post-process the globals dump so it restores cleanly on a managed
    // target where we lack superuser: tolerate duplicate roles, strip
    // SUPERUSER, restricted GUCs, restricted role grants and tablespaces.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    // Block scope drops the source connection as soon as the listing is done.
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };
    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to do; drop any stale checkpoint (errors deliberately
        // ignored — the checkpoint may simply not exist).
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!("  This is unusual - the source database appears empty");
            tracing::warn!("  Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    // Metadata used to decide whether a previous checkpoint is resumable:
    // the filter fingerprint and flags must match the previous run exactly.
    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Configuration drift since the previous run: resuming
                        // could silently skip data, so start over.
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            "  Previous run configuration differs from current configuration"
                        );
                        tracing::warn!("  - Schema-only tables may have changed");
                        tracing::warn!("  - Time filters may have changed");
                        tracing::warn!("  - Table selection may have changed");
                        tracing::warn!("  Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        // --no-resume: any previous checkpoint is discarded unconditionally.
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist immediately so a crash before the first database still leaves a
    // valid checkpoint on disk.
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    // Warn when the database named in the target URL is not among the source
    // databases: replication preserves source names, so the URL's database is
    // used only for the initial connection.
    if let Ok(target_parts) = crate::utils::parse_postgres_url(target_url) {
        let target_db = &target_parts.database;
        let source_db_names: Vec<&str> = databases.iter().map(|db| db.name.as_str()).collect();

        if !source_db_names.contains(&target_db.as_str())
            && target_db != "postgres"
            && !target_db.is_empty()
        {
            println!();
            println!("========================================");
            println!("⚠️  IMPORTANT: Target database name ignored");
            println!("========================================");
            println!();
            println!(
                "You specified target database '{}', but replication preserves",
                target_db
            );
            println!("source database names. Data will be replicated to:");
            println!();
            for db_name in &source_db_names {
                println!("  → {}", db_name);
            }
            println!();
            println!(
                "The '{}' database in your target URL is used only for",
                target_db
            );
            println!("the initial connection. Source databases will be created");
            println!("on the target server with their original names.");
            println!();
            println!("When running 'sync' later, use the correct database name:");
            if let Some(first_db) = source_db_names.first() {
                println!("  --target \"postgresql://.../{}\"\n", first_db);
            }
            println!("========================================");
            println!();
        }
    }

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        // Connection scoped to the estimate; dropped before the prompt blocks
        // on stdin.
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };
        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        // Resume support: databases finished in a previous run are skipped.
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        // Per-database URLs: same credentials/host, different database path.
        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Ensure the database exists (and is empty or explicitly dropped) on
        // the target. Scoped so the admin connection is closed before the
        // heavy dump/restore work below.
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Defense in depth alongside quote_ident() below.
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!("  Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                "  Database '{}' already exists on target",
                                db_info.name
                            );

                            // An existing-but-empty database is safe to restore
                            // into; otherwise ask/flag before dropping it.
                            let is_empty = {
                                let db_client =
                                    postgres::connect_with_retry(&target_db_url).await?;
                                database_is_empty(&db_client).await?
                            };
                            if is_empty {
                                tracing::info!(
                                    "  Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        "  Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!("  Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Database-level error other than "already exists".
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Transport-level error (no DbError payload).
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }
        // Schema first, then data, so constraints/types exist before rows.
        tracing::info!("  Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!("  Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!("  Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        // Tables with row-level predicates are copied separately, after the
        // bulk restore.
        if !filtered_tables.is_empty() {
            tracing::info!(
                "  Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after every database so a crash resumes here.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Cleanup is best-effort: failures are logged, never fatal.
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Continuous sync requires wal_level=logical on the TARGET (it hosts the
    // subscriptions); downgrade to snapshot-only instead of failing.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };
        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!("  Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!("  To fix this:");
            tracing::warn!("  1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!("  2. Restart PostgreSQL server");
            tracing::warn!(
                "  3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!("  To enable it later, run:");
        tracing::info!("  postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
638
639fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
641 let parts: Vec<&str> = url.split('?').collect();
646 let base_url = parts[0];
647 let params = if parts.len() > 1 {
648 Some(parts[1])
649 } else {
650 None
651 };
652
653 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
655 if url_parts.len() != 2 {
656 anyhow::bail!("Invalid connection URL format");
657 }
658
659 let mut new_url = format!("{}/{}", url_parts[1], new_database);
661 if let Some(p) = params {
662 new_url = format!("{}?{}", new_url, p);
663 }
664
665 Ok(new_url)
666}
667
668fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
685 use std::time::Duration;
686
687 let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
689 let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
690
691 println!();
693 println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
694 println!("{}", "─".repeat(50));
695
696 for size in sizes {
698 println!(
699 "{:<20} {:<12} {:<15}",
700 size.name,
701 size.size_human,
702 migration::format_duration(size.estimated_duration)
703 );
704 }
705
706 println!("{}", "─".repeat(50));
708 println!(
709 "Total: {} (estimated {})",
710 migration::format_bytes(total_bytes),
711 migration::format_duration(total_duration)
712 );
713 println!();
714
715 print!("Proceed with replication? [y/N]: ");
717 io::stdout().flush()?;
718
719 let mut input = String::new();
720 io::stdin()
721 .read_line(&mut input)
722 .context("Failed to read user input")?;
723
724 Ok(input.trim().to_lowercase() == "y")
725}
726
727async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
731 let query = "
732 SELECT COUNT(*)
733 FROM information_schema.tables
734 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
735 ";
736
737 let row = tokio::time::timeout(
739 std::time::Duration::from_secs(30),
740 client.query_one(query, &[]),
741 )
742 .await
743 .context("database_is_empty query timed out after 30 seconds")?
744 .context("Failed to query information_schema.tables")?;
745
746 let count: i64 = row.get(0);
747
748 Ok(count == 0)
749}
750
751fn prompt_drop_database(db_name: &str) -> Result<bool> {
753 use std::io::{self, Write};
754
755 print!(
756 "\nWarning: Database '{}' already exists on target and contains data.\n\
757 Drop and recreate database? This will delete all existing data. [y/N]: ",
758 db_name
759 );
760 io::stdout().flush()?;
761
762 let mut input = String::new();
763 io::stdin().read_line(&mut input)?;
764
765 Ok(input.trim().eq_ignore_ascii_case("y"))
766}
767
/// Drops `db_name` on the target server, first terminating the sessions
/// connected to it (a database with active connections cannot be dropped).
///
/// Only non-superuser backends are terminated: on managed services (AWS RDS,
/// SerenDB) superuser sessions belong to the platform and usually cannot be
/// killed by the replication role anyway. If any connection survives the
/// terminate pass, the function bails with remediation advice instead of
/// letting DROP DATABASE fail with a less helpful error.
///
/// # Errors
/// Fails on invalid database names, on query errors, or when blocking
/// connections remain after termination.
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    // Defense in depth: quote_ident() is applied below, but reject suspicious
    // names before interpolating them into SQL at all.
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!("  Dropping existing database '{}'...", db_name);

    // Pass 1: terminate every non-superuser backend attached to the database
    // (excluding our own backend).
    let terminate_query = "
        SELECT pg_terminate_backend(sa.pid)
        FROM pg_stat_activity sa
        JOIN pg_roles r ON sa.usename = r.rolname
        WHERE sa.datname = $1
        AND sa.pid <> pg_backend_pid()
        AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    // Pass 2: recheck; any survivors (typically superuser/platform sessions)
    // would make DROP DATABASE fail, so report them with their usernames.
    let remaining_query = "
        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
        FROM pg_stat_activity sa
        WHERE sa.datname = $1
        AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    // STRING_AGG returns NULL when no rows aggregate, hence Option.
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
             processes maintain superuser connections.\n\n\
             To resolve this:\n\
             1. Wait a few minutes and retry (system connections may be temporary)\n\
             2. Ask your database administrator to terminate the blocking sessions:\n\
                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    // Identifier is validated above and quoted here; DDL cannot take a bind
    // parameter for the database name.
    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!("  ✓ Database '{}' dropped", db_name);
    Ok(())
}
834
/// One-time migration of a SQLite database file into PostgreSQL.
///
/// Every SQLite table is converted to rows of JSONB and loaded into a
/// correspondingly named JSONB table on the target (created/truncated via
/// `crate::jsonb::writer`). No continuous replication is set up.
///
/// # Errors
/// Fails on an invalid/unreadable SQLite file, on conversion errors, or on
/// any failure creating/loading the target tables.
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!("  ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    // NOTE(review): opens with the original `sqlite_path`, not the
    // `canonical_path` validated above — presumably equivalent; confirm.
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!("  ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Whole-table conversion happens in memory before the insert.
        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create-if-missing, then truncate: reruns replace rather than append.
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        "  Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}
963
/// One-time migration of a MongoDB database into PostgreSQL.
///
/// Every collection in the database named by `mongo_url` is converted to rows
/// of JSONB and loaded into a correspondingly named JSONB table on the target
/// (created/truncated via `crate::jsonb::writer`). No continuous replication
/// is set up.
///
/// # Errors
/// Fails when the connection string is invalid or lacks a database name, or
/// on any conversion/load failure.
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!("  ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    // Two `.context` calls: the first covers a parse error (Result), the
    // second covers a parseable URL that simply has no database segment.
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        // Whole-collection conversion happens in memory before the insert.
        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            "  ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        // Create-if-missing, then truncate: reruns replace rather than append.
        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, collection_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", collection_name))?;

        tracing::info!(
            "  ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!("  ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                "  ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        "  Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}
1122
/// One-time replication of a MySQL database into PostgreSQL.
///
/// Every table in the database named by `mysql_url` is converted to rows of
/// JSONB and loaded into a correspondingly named JSONB table on the target
/// (created/truncated via `crate::jsonb::writer`). No continuous replication
/// is set up.
///
/// # Errors
/// Fails when the connection fails or the URL lacks a database name, or on
/// any conversion/load failure.
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    // `mut` because the MySQL driver requires an exclusive connection handle
    // for queries (see the `&mut mysql_conn` calls below).
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!("  ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!("  ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!("  ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        // Whole-table conversion happens in memory before the insert.
        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!("  ✓ Converted {} rows from '{}'", rows.len(), table_name);

        // Create-if-missing, then truncate: reruns replace rather than append.
        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!("  ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!("  ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!("  ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        "  Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}
1259
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end smoke test; #[ignore]d because it needs live source/target
    // databases supplied via the TEST_SOURCE_URL / TEST_TARGET_URL env vars.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        // Positional args: skip_confirmation=true, drop_existing=false,
        // enable_sync=false, allow_resume=true, force_local=false.
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    // Pure string manipulation — runs without any database.
    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    // Needs a reachable PostgreSQL at TEST_TARGET_URL; only asserts the query
    // succeeds, not any particular emptiness result.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        let result = database_is_empty(&client).await;
        assert!(result.is_ok());
    }
}