use crate::migration::dump::remove_restricted_role_grants;
use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

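/// Runs the initial snapshot replication from `source_url` to `target_url`.
///
/// PostgreSQL sources go through pre-flight checks, a globals dump/restore,
/// and a per-database dump/restore driven by `filter`, with progress tracked
/// in a resumable checkpoint. SQLite, MongoDB, and MySQL sources are routed
/// to their dedicated one-time migration paths instead.
///
/// A minimal invocation sketch (argument values are illustrative, not
/// prescriptive):
///
/// ```ignore
/// init(
///     "postgresql://user@source-host:5432/postgres",
///     "postgresql://user@target-host:5432/postgres",
///     false,                                      // skip_confirmation
///     crate::filters::ReplicationFilter::empty(), // replicate everything
///     false,                                      // drop_existing
///     true,                                       // enable_sync
///     true,                                       // allow_resume
///     false,                                      // force_local
/// )
/// .await?;
/// ```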
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.databases_to_check();
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            // Warn when sync is still enabled (the default), since this path
            // cannot honor it; previously this fired only with --no-sync.
            if enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;
    migration::remove_superuser_from_globals(globals_file.to_str().unwrap())
        .context("Failed to remove SUPERUSER from globals dump")?;
    migration::remove_restricted_guc_settings(globals_file.to_str().unwrap())
        .context("Failed to remove restricted parameter settings from globals dump")?;
    remove_restricted_role_grants(globals_file.to_str().unwrap())
        .context("Failed to remove restricted role grants from globals dump")?;
    migration::remove_tablespace_statements(globals_file.to_str().unwrap())
        .context("Failed to remove CREATE TABLESPACE statements from globals dump")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

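    // Resume semantics: an existing checkpoint is honored only when its
    // metadata (URLs, filter fingerprint, flags) and database list match the
    // current run; otherwise it is discarded and replication starts fresh.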
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if let Ok(target_parts) = crate::utils::parse_postgres_url(target_url) {
        let target_db = &target_parts.database;
        let source_db_names: Vec<&str> = databases.iter().map(|db| db.name.as_str()).collect();

        if !source_db_names.contains(&target_db.as_str())
            && target_db != "postgres"
            && !target_db.is_empty()
        {
            println!();
            println!("========================================");
            println!("⚠️ IMPORTANT: Target database name ignored");
            println!("========================================");
            println!();
            println!(
                "You specified target database '{}', but replication preserves",
                target_db
            );
            println!("source database names. Data will be replicated to:");
            println!();
            for db_name in &source_db_names {
                println!(" → {}", db_name);
            }
            println!();
            println!(
                "The '{}' database in your target URL is used only for",
                target_db
            );
            println!("the initial connection. Source databases will be created");
            println!("on the target server with their original names.");
            println!();
            println!("When running 'sync' later, use the correct database name:");
            if let Some(first_db) = source_db_names.first() {
                println!(" --target \"postgresql://.../{}\"\n", first_db);
            }
            println!("========================================");
            println!();
        }
    }

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

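    // Each database is created, dumped, and restored independently; the
    // checkpoint is saved after every success so an interrupted run can skip
    // already-completed databases on resume.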
    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            let is_empty = {
                                let db_client =
                                    postgres::connect_with_retry(&target_db_url).await?;
                                database_is_empty(&db_client).await?
                            };

                            if is_empty {
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!(
                "⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync",
                target_wal_level
            );
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        // This branch is reached both with --no-sync and when wal_level
        // forced sync off above, so the message no longer names the flag.
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

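/// Swaps the database name in a connection URL, preserving query parameters:
/// `postgresql://user@host:5432/olddb?sslmode=require` becomes
/// `postgresql://user@host:5432/newdb?sslmode=require`.
///
/// This is plain string surgery (split on `?`, then on the last `/`), so the
/// URL must contain at least one `/` ahead of the database name.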
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    let parts: Vec<&str> = url.split('?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 {
        Some(parts[1])
    } else {
        None
    };

    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

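/// Prints a per-database size and estimated-duration table, then asks for
/// confirmation on stdin. Only a `y`/`Y` answer proceeds; anything else
/// (including `yes`) is treated as a decline.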
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

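/// Reports whether the connected database contains no tables outside
/// `pg_catalog` and `information_schema`. The catalog query is bounded by a
/// 30-second timeout so a wedged connection cannot stall the init flow.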
async fn database_is_empty(client: &tokio_postgres::Client) -> Result<bool> {
    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = tokio::time::timeout(
        std::time::Duration::from_secs(30),
        client.query_one(query, &[]),
    )
    .await
    .context("database_is_empty query timed out after 30 seconds")?
    .context("Failed to query information_schema.tables")?;

    let count: i64 = row.get(0);

    Ok(count == 0)
}

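/// Interactively asks whether an existing, non-empty target database should
/// be dropped and recreated. Returns `Ok(true)` only for a `y`/`Y` answer.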
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    use std::io::{self, Write};

    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
         Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

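/// Drops `db_name` on the target, first terminating non-superuser sessions
/// connected to it. If sessions remain (typically superuser connections on
/// managed services), the drop is aborted with remediation guidance rather
/// than failing inside PostgreSQL.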
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!(" Dropping existing database '{}'...", db_name);

    let terminate_query = "
        SELECT pg_terminate_backend(sa.pid)
        FROM pg_stat_activity sa
        JOIN pg_roles r ON sa.usename = r.rolname
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
          AND NOT r.rolsuper
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    let remaining_query = "
        SELECT COUNT(*), STRING_AGG(DISTINCT sa.usename, ', ')
        FROM pg_stat_activity sa
        WHERE sa.datname = $1
          AND sa.pid <> pg_backend_pid()
    ";
    let row = target_conn
        .query_one(remaining_query, &[&db_name])
        .await
        .context("Failed to check remaining connections")?;
    let remaining_count: i64 = row.get(0);
    let remaining_users: Option<String> = row.get(1);

    if remaining_count > 0 {
        let users = remaining_users.unwrap_or_else(|| "unknown".to_string());
        bail!(
            "Cannot drop database '{}': {} active connection(s) from user(s): {}\n\n\
             These are likely SUPERUSER sessions that cannot be terminated by regular users.\n\
             This is common on managed PostgreSQL services (AWS RDS, SerenDB) where system\n\
             processes maintain superuser connections.\n\n\
             To resolve this:\n\
             1. Wait a few minutes and retry (system connections may be temporary)\n\
             2. Ask your database administrator to terminate the blocking sessions:\n\
                SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}';\n\
             3. If using AWS RDS, check for RDS-managed connections in the RDS console",
            db_name,
            remaining_count,
            users,
            db_name
        );
    }

    let drop_query = format!(
        "DROP DATABASE IF EXISTS {}",
        crate::utils::quote_ident(db_name)
    );
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!(" ✓ Database '{}' dropped", db_name);
    Ok(())
}

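/// One-time migration of a SQLite file into PostgreSQL: every table is landed
/// as a JSONB table on the target and rows are copied in batches. This path
/// has no continuous replication.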
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!(" ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    let mut table_row_counts: Vec<(&str, usize)> = Vec::new();
    let mut total_rows = 0usize;
    for table_name in &tables {
        let count =
            crate::sqlite::reader::get_table_row_count(&sqlite_conn, table_name).unwrap_or(0);
        table_row_counts.push((table_name, count));
        total_rows += count;
    }

    tracing::info!(
        "Total rows to migrate: {} across {} table(s)",
        total_rows,
        tables.len()
    );

    tracing::info!("Step 4/4: Migrating tables (batched processing)...");
    let mut migrated_rows = 0usize;

    for (idx, (table_name, row_count)) in table_row_counts.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}' ({} rows)",
            idx + 1,
            tables.len(),
            table_name,
            row_count
        );

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        let rows_processed = crate::sqlite::converter::convert_table_batched(
            &sqlite_conn,
            &target_client,
            table_name,
            "sqlite",
            None,
        )
        .await
        .with_context(|| format!("Failed to migrate table '{}'", table_name))?;

        migrated_rows += rows_processed;

        if rows_processed > 0 {
            tracing::info!(
                " ✓ Migrated {} rows from '{}' ({:.1}% of total)",
                rows_processed,
                table_name,
                if total_rows > 0 {
                    migrated_rows as f64 / total_rows as f64 * 100.0
                } else {
                    100.0
                }
            );
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to migrate)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} row(s) from {} table(s) in '{}'",
        migrated_rows,
        tables.len(),
        sqlite_path
    );

    Ok(())
}

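/// One-time migration of a MongoDB database into PostgreSQL: each collection
/// is converted to JSONB rows in memory, then bulk-inserted into a matching
/// JSONB table on the target.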
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!(" ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            " ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, collection_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", collection_name))?;

        tracing::info!(
            " ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                " ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

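/// One-time replication of a MySQL database into PostgreSQL. Mirrors the
/// MongoDB path: each table is converted to JSONB rows and bulk-inserted into
/// a JSONB table on the target.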
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!(" ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        crate::jsonb::writer::truncate_jsonb_table(&target_client, table_name)
            .await
            .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        " Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let client = crate::postgres::connect_with_retry(&url)
            .await
            .expect("Failed to connect");

        let result = database_is_empty(&client).await;
        assert!(result.is_ok());
    }
}