1use crate::{checkpoint, migration, postgres};
5use anyhow::{bail, Context, Result};
6use std::io::{self, Write};
7use tokio_postgres::Client;
8
/// Run the initial (snapshot) replication from `source_url` to `target_url`.
///
/// Dispatches on the detected source type:
/// - PostgreSQL: full globals + per-database schema/data copy, with a resumable
///   on-disk checkpoint, optional interactive confirmation, and (optionally)
///   continuous logical replication set up afterwards.
/// - SQLite / MongoDB / MySQL: delegates to the one-shot JSONB migration
///   helpers below; filters, `--drop-existing`, and sync are not supported
///   there and only produce warnings.
///
/// Parameters:
/// - `skip_confirmation`: `--yes`; skips the size prompt and auto-confirms
///   dropping existing non-empty target databases.
/// - `filter`: database/table selection; its fingerprint is stored in the
///   checkpoint so a resumed run with a different filter is detected.
/// - `drop_existing`: drop non-empty target databases without prompting.
/// - `enable_sync`: attempt to set up continuous replication after the snapshot
///   (downgraded with a warning if the target's wal_level is not 'logical').
/// - `allow_resume`: honor a previous run's checkpoint; otherwise discard it.
/// - `force_local`: refuse the remote-execution fallback on preflight failure.
///
/// Errors: any failed step aborts the run; the checkpoint file is left in
/// place (except on early no-databases exit) so a later run can resume.
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            // Preflight only needs the include-list (if any) to scope its checks.
            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // Special case: an incompatible local tool version against a
                // SerenDB target can fall back to cloud execution. The magic
                // error string is matched by the caller — do not change it.
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                        Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            // SQLite migration ignores these flags; warn so the user knows.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // --- PostgreSQL-to-PostgreSQL path from here on ---

    // Refuse to replicate a database onto itself.
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Scratch space for pg_dump/pg_restore artifacts; removed at the end.
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // Checkpoint location is derived from the (source, target) pair so
    // different replication pairs never collide.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;
    // Rewrites the dump so restoring pre-existing roles doesn't error out.
    migration::sanitize_globals_dump(globals_file.to_str().unwrap())
        .context("Failed to update globals dump so duplicate roles are ignored during restore")?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    // Scoped block so the discovery connection is dropped promptly.
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: drop any stale checkpoint (best-effort) and
        // finish successfully — globals were still copied above.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    // Metadata captured in the checkpoint so a resume with different flags,
    // filters, or endpoints is detected and rejected below.
    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        // Valid checkpoint: resume, skipping completed databases.
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        // Configuration drift since the previous run: the old
                        // checkpoint can't be trusted, so start over.
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        // --no-resume: deliberately discard any previous progress.
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist the initial state before doing any work so a crash mid-run
    // still leaves a resumable checkpoint.
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        // Databases recorded as complete in the checkpoint are skipped.
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Scope: ensure this database exists (and is acceptable to overwrite)
        // on the target. The admin connection is dropped at the end of the
        // block before the dump/restore steps open their own connections.
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Validate before interpolating the name into DDL below.
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            if database_is_empty(target_url, &db_info.name).await? {
                                // Empty database: safe to restore into as-is.
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Non-empty: decide whether to drop it —
                                // flag, auto-confirm, or interactive prompt.
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // A database error other than "already exists".
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Transport/protocol-level failure (no SQLSTATE).
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        // Tables with row-level predicates are copied separately on top of
        // the bulk restore.
        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Persist progress after each database so an interrupted run resumes
        // from the next one.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Cleanup is best-effort: failures here must not fail the replication.
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Sync is downgraded (not an error) when the target can't do logical
    // replication; the snapshot above is still valid either way.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
580
581fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
583 let parts: Vec<&str> = url.split('?').collect();
588 let base_url = parts[0];
589 let params = if parts.len() > 1 {
590 Some(parts[1])
591 } else {
592 None
593 };
594
595 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
597 if url_parts.len() != 2 {
598 anyhow::bail!("Invalid connection URL format");
599 }
600
601 let mut new_url = format!("{}/{}", url_parts[1], new_database);
603 if let Some(p) = params {
604 new_url = format!("{}?{}", new_url, p);
605 }
606
607 Ok(new_url)
608}
609
610fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
627 use std::time::Duration;
628
629 let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
631 let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
632
633 println!();
635 println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
636 println!("{}", "─".repeat(50));
637
638 for size in sizes {
640 println!(
641 "{:<20} {:<12} {:<15}",
642 size.name,
643 size.size_human,
644 migration::format_duration(size.estimated_duration)
645 );
646 }
647
648 println!("{}", "─".repeat(50));
650 println!(
651 "Total: {} (estimated {})",
652 migration::format_bytes(total_bytes),
653 migration::format_duration(total_duration)
654 );
655 println!();
656
657 print!("Proceed with replication? [y/N]: ");
659 io::stdout().flush()?;
660
661 let mut input = String::new();
662 io::stdin()
663 .read_line(&mut input)
664 .context("Failed to read user input")?;
665
666 Ok(input.trim().to_lowercase() == "y")
667}
668
669async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
671 let db_url = replace_database_in_url(target_url, db_name)?;
673 let client = postgres::connect_with_retry(&db_url).await?;
674
675 let query = "
676 SELECT COUNT(*)
677 FROM information_schema.tables
678 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
679 ";
680
681 let row = client.query_one(query, &[]).await?;
682 let count: i64 = row.get(0);
683
684 Ok(count == 0)
685}
686
687fn prompt_drop_database(db_name: &str) -> Result<bool> {
689 use std::io::{self, Write};
690
691 print!(
692 "\nWarning: Database '{}' already exists on target and contains data.\n\
693 Drop and recreate database? This will delete all existing data. [y/N]: ",
694 db_name
695 );
696 io::stdout().flush()?;
697
698 let mut input = String::new();
699 io::stdin().read_line(&mut input)?;
700
701 Ok(input.trim().eq_ignore_ascii_case("y"))
702}
703
704async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
706 crate::utils::validate_postgres_identifier(db_name)
708 .with_context(|| format!("Invalid database name: '{}'", db_name))?;
709
710 tracing::info!(" Dropping existing database '{}'...", db_name);
711
712 let terminate_query = "
714 SELECT pg_terminate_backend(pid)
715 FROM pg_stat_activity
716 WHERE datname = $1 AND pid <> pg_backend_pid()
717 ";
718 target_conn.execute(terminate_query, &[&db_name]).await?;
719
720 let drop_query = format!(
722 "DROP DATABASE IF EXISTS {}",
723 crate::utils::quote_ident(db_name)
724 );
725 target_conn
726 .execute(&drop_query, &[])
727 .await
728 .with_context(|| format!("Failed to drop database '{}'", db_name))?;
729
730 tracing::info!(" ✓ Database '{}' dropped", db_name);
731 Ok(())
732}
733
734pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
782 tracing::info!("Starting SQLite to PostgreSQL migration...");
783
784 tracing::info!("Step 1/4: Validating SQLite database...");
786 let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
787 .context("SQLite file validation failed")?;
788 tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());
789
790 tracing::info!("Step 2/4: Opening SQLite database...");
792 let sqlite_conn =
793 crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
794 tracing::info!(" ✓ SQLite database opened (read-only mode)");
795
796 tracing::info!("Step 3/4: Discovering tables...");
798 let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
799 .context("Failed to list tables from SQLite database")?;
800
801 if tables.is_empty() {
802 tracing::warn!("⚠ No tables found in SQLite database");
803 tracing::info!("✅ Migration complete (no tables to migrate)");
804 return Ok(());
805 }
806
807 tracing::info!("Found {} table(s) to migrate", tables.len());
808
809 let target_client = postgres::connect_with_retry(target_url).await?;
811 tracing::info!(" ✓ Connected to PostgreSQL target");
812
813 tracing::info!("Step 4/4: Migrating tables...");
815 for (idx, table_name) in tables.iter().enumerate() {
816 tracing::info!(
817 "Migrating table {}/{}: '{}'",
818 idx + 1,
819 tables.len(),
820 table_name
821 );
822
823 let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
825 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
826
827 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
828
829 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
831 .await
832 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
833
834 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
835
836 if !rows.is_empty() {
837 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
839 .await
840 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
841
842 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
843 } else {
844 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
845 }
846 }
847
848 tracing::info!("✅ SQLite to PostgreSQL migration complete!");
849 tracing::info!(
850 " Migrated {} table(s) from '{}' to PostgreSQL",
851 tables.len(),
852 sqlite_path
853 );
854
855 Ok(())
856}
857
858pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
909 tracing::info!("Starting MongoDB to PostgreSQL migration...");
910
911 tracing::info!("Step 1/5: Validating MongoDB connection...");
913 let client = crate::mongodb::connect_mongodb(mongo_url)
914 .await
915 .context("MongoDB connection failed")?;
916 tracing::info!(" ✓ MongoDB connection validated");
917
918 tracing::info!("Step 2/5: Extracting database name...");
920 let db_name = crate::mongodb::extract_database_name(mongo_url)
921 .await
922 .context("Failed to parse MongoDB connection string")?
923 .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
924 tracing::info!(" ✓ Database name: '{}'", db_name);
925
926 tracing::info!("Step 3/5: Discovering collections...");
928 let db = client.database(&db_name);
929 let collections = crate::mongodb::reader::list_collections(&client, &db_name)
930 .await
931 .context("Failed to list collections from MongoDB database")?;
932
933 if collections.is_empty() {
934 tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
935 tracing::info!("✅ Migration complete (no collections to migrate)");
936 return Ok(());
937 }
938
939 tracing::info!("Found {} collection(s) to migrate", collections.len());
940
941 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
943 let target_client = postgres::connect_with_retry(target_url).await?;
944 tracing::info!(" ✓ Connected to PostgreSQL target");
945
946 tracing::info!("Step 5/5: Migrating collections...");
948 for (idx, collection_name) in collections.iter().enumerate() {
949 tracing::info!(
950 "Migrating collection {}/{}: '{}'",
951 idx + 1,
952 collections.len(),
953 collection_name
954 );
955
956 let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
958 .await
959 .with_context(|| {
960 format!(
961 "Failed to convert collection '{}' to JSONB",
962 collection_name
963 )
964 })?;
965
966 tracing::info!(
967 " ✓ Converted {} documents from '{}'",
968 rows.len(),
969 collection_name
970 );
971
972 crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
974 .await
975 .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
976
977 tracing::info!(
978 " ✓ Created JSONB table '{}' in PostgreSQL",
979 collection_name
980 );
981
982 if !rows.is_empty() {
983 crate::jsonb::writer::insert_jsonb_batch(
985 &target_client,
986 collection_name,
987 rows,
988 "mongodb",
989 )
990 .await
991 .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
992
993 tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
994 } else {
995 tracing::info!(
996 " ✓ Collection '{}' is empty (no documents to insert)",
997 collection_name
998 );
999 }
1000 }
1001
1002 tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1003 tracing::info!(
1004 " Migrated {} collection(s) from database '{}' to PostgreSQL",
1005 collections.len(),
1006 db_name
1007 );
1008
1009 Ok(())
1010}
1011
1012pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1063 tracing::info!("Starting MySQL to PostgreSQL replication...");
1064
1065 tracing::info!("Step 1/5: Validating MySQL connection...");
1067 let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1068 .await
1069 .context("MySQL connection failed")?;
1070 tracing::info!(" ✓ MySQL connection validated");
1071
1072 tracing::info!("Step 2/5: Extracting database name...");
1074 let db_name = crate::mysql::extract_database_name(mysql_url)
1075 .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1076 tracing::info!(" ✓ Database name: '{}'", db_name);
1077
1078 tracing::info!("Step 3/5: Discovering tables...");
1080 let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1081 .await
1082 .context("Failed to list tables from MySQL database")?;
1083
1084 if tables.is_empty() {
1085 tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1086 tracing::info!("✅ Replication complete (no tables to replicate)");
1087 return Ok(());
1088 }
1089
1090 tracing::info!("Found {} table(s) to replicate", tables.len());
1091
1092 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1094 let target_client = postgres::connect_with_retry(target_url).await?;
1095 tracing::info!(" ✓ Connected to PostgreSQL target");
1096
1097 tracing::info!("Step 5/5: Replicating tables...");
1099 for (idx, table_name) in tables.iter().enumerate() {
1100 tracing::info!(
1101 "Replicating table {}/{}: '{}'",
1102 idx + 1,
1103 tables.len(),
1104 table_name
1105 );
1106
1107 let rows =
1109 crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1110 .await
1111 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1112
1113 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
1114
1115 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1117 .await
1118 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1119
1120 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1121
1122 if !rows.is_empty() {
1123 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1125 .await
1126 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1127
1128 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
1129 } else {
1130 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
1131 }
1132 }
1133
1134 tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1135 tracing::info!(
1136 " Replicated {} table(s) from database '{}' to PostgreSQL",
1137 tables.len(),
1138 db_name
1139 );
1140
1141 Ok(())
1142}
1143
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end smoke test. Requires live databases, so it is `#[ignore]`d
    /// by default and driven by TEST_SOURCE_URL / TEST_TARGET_URL.
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let outcome = init(
            &source,
            &target,
            true,
            crate::filters::ReplicationFilter::empty(),
            false,
            false,
            true,
            false,
        )
        .await;
        assert!(outcome.is_ok());
    }

    /// Pure-function check: the database path segment is swapped and any
    /// query string is preserved.
    #[test]
    fn test_replace_database_in_url() {
        let with_params = replace_database_in_url(
            "postgresql://user:pass@host:5432/olddb?sslmode=require",
            "newdb",
        )
        .unwrap();
        assert_eq!(
            with_params,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let without_params =
            replace_database_in_url("postgresql://user:pass@host:5432/olddb", "newdb").unwrap();
        assert_eq!(without_params, "postgresql://user:pass@host:5432/newdb");
    }

    /// Integration check against a live target; `#[ignore]`d by default.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let target = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        assert!(database_is_empty(&target, "postgres").await.is_ok());
    }
}