1use crate::{checkpoint, migration, postgres};
5use anyhow::{bail, Context, Result};
6use std::io::{self, Write};
7use tokio_postgres::Client;
8
/// Runs the initial (snapshot) replication from `source_url` into the
/// PostgreSQL server at `target_url`.
///
/// Behavior depends on the detected source type:
/// - PostgreSQL: pre-flight checks, then a 4-step dump/restore of global
///   objects and every database matching `filter`, with resumable
///   per-database checkpointing and, optionally, continuous logical sync.
/// - SQLite / MongoDB / MySQL: delegates to the corresponding one-time JSONB
///   migration helper and returns early (filters, `drop_existing`, and sync
///   do not apply to these sources — warnings are emitted instead).
///
/// # Arguments
/// * `source_url` - source connection string (its scheme selects the source type)
/// * `target_url` - PostgreSQL target connection string
/// * `skip_confirmation` - answer "yes" to all interactive prompts (`--yes`)
/// * `filter` - database/table include/exclude rules
/// * `drop_existing` - drop non-empty target databases without prompting
/// * `enable_sync` - set up continuous replication after the snapshot
/// * `allow_resume` - reuse a compatible checkpoint from a previous run
/// * `force_local` - never fall back to remote (cloud) execution
///
/// # Errors
/// Returns an error when pre-flight checks fail, any dump/restore step fails,
/// or the user aborts at a prompt. The message `PREFLIGHT_FALLBACK_TO_REMOTE`
/// is a sentinel the caller recognizes to retry via cloud execution.
#[allow(clippy::too_many_arguments)]
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
    force_local: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");

            tracing::info!("Running pre-flight checks...");

            let databases = filter.include_databases().map(|v| v.to_vec());
            let preflight_result = crate::preflight::run_preflight_checks(
                source_url,
                target_url,
                databases.as_deref(),
            )
            .await?;

            preflight_result.print();

            if !preflight_result.all_passed() {
                // A tool-version mismatch against a SerenDB target is
                // recoverable: emit the sentinel error so the caller re-runs
                // remotely — unless --local explicitly forbids that.
                if preflight_result.tool_version_incompatible
                    && crate::utils::is_serendb_target(target_url)
                    && !force_local
                {
                    println!();
                    tracing::info!(
                        "Tool version incompatible. Switching to SerenAI cloud execution..."
                    );
                    bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
                }

                if force_local {
                    bail!(
                        "Pre-flight checks failed. Cannot continue with --local flag.\n\
                         Fix the issues above or remove --local to allow remote execution."
                    );
                }

                bail!("Pre-flight checks failed. Fix the issues above and retry.");
            }

            println!();
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            // SQLite takes the simple one-shot path; warn about options that
            // have no effect for this source type.
            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if !enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    // From here on the source is PostgreSQL (all other types returned above).
    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    // Scratch space for dump/restore artifacts; removed (best effort) at the end.
    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    // The checkpoint file is keyed on the (source, target) pair so an
    // interrupted run can be resumed later.
    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    // Scope the admin connection so it is dropped before the long-running
    // replication loop below.
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        // Nothing to replicate: drop any stale checkpoint so the next run
        // starts fresh. Removal failure is deliberately ignored here.
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    // The filter fingerprint lets a resumed run detect that selection rules
    // changed since the checkpoint was written.
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

    // Load (or create) the resume checkpoint. A checkpoint recorded under a
    // different configuration is discarded rather than trusted.
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    // Persist immediately so even a crash before the first database leaves a
    // valid checkpoint on disk.
    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        // Databases finished by a previous (interrupted) run are skipped.
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

        // Ensure the database exists on the target: create it, or — when it
        // already exists and holds data — drop and recreate it if allowed.
        // Scoped so the admin connection closes before dump/restore begins.
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            // Defense in depth: the name is interpolated into DDL below, so
            // validate it even though quote_ident() also escapes it.
            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!(
                "CREATE DATABASE {}",
                crate::utils::quote_ident(&db_info.name)
            );
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            if database_is_empty(target_url, &db_info.name).await? {
                                // An empty database is reused as-is.
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                // Non-empty target database: decide whether to
                                // drop it, in order of precedence:
                                // --drop-existing, --yes, interactive prompt.
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query = format!(
                                        "CREATE DATABASE {}",
                                        crate::utils::quote_ident(&db_info.name)
                                    );
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            // Database-level error other than "already exists".
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        // Transport-level error (no server error attached).
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        // Tables with row-level predicates are copied again with the filters
        // applied on top of the bulk restore.
        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        // Record progress after each database so an interruption resumes here.
        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    // Cleanup is best-effort: failures are logged, never fatal.
    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

    // Continuous sync needs wal_level=logical on the target; if it is not set,
    // downgrade to snapshot-only and print remediation steps instead of failing.
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}
578
579fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
581 let parts: Vec<&str> = url.split('?').collect();
586 let base_url = parts[0];
587 let params = if parts.len() > 1 {
588 Some(parts[1])
589 } else {
590 None
591 };
592
593 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
595 if url_parts.len() != 2 {
596 anyhow::bail!("Invalid connection URL format");
597 }
598
599 let mut new_url = format!("{}/{}", url_parts[1], new_database);
601 if let Some(p) = params {
602 new_url = format!("{}?{}", new_url, p);
603 }
604
605 Ok(new_url)
606}
607
608fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
625 use std::time::Duration;
626
627 let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
629 let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();
630
631 println!();
633 println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
634 println!("{}", "─".repeat(50));
635
636 for size in sizes {
638 println!(
639 "{:<20} {:<12} {:<15}",
640 size.name,
641 size.size_human,
642 migration::format_duration(size.estimated_duration)
643 );
644 }
645
646 println!("{}", "─".repeat(50));
648 println!(
649 "Total: {} (estimated {})",
650 migration::format_bytes(total_bytes),
651 migration::format_duration(total_duration)
652 );
653 println!();
654
655 print!("Proceed with replication? [y/N]: ");
657 io::stdout().flush()?;
658
659 let mut input = String::new();
660 io::stdin()
661 .read_line(&mut input)
662 .context("Failed to read user input")?;
663
664 Ok(input.trim().to_lowercase() == "y")
665}
666
667async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
669 let db_url = replace_database_in_url(target_url, db_name)?;
671 let client = postgres::connect_with_retry(&db_url).await?;
672
673 let query = "
674 SELECT COUNT(*)
675 FROM information_schema.tables
676 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
677 ";
678
679 let row = client.query_one(query, &[]).await?;
680 let count: i64 = row.get(0);
681
682 Ok(count == 0)
683}
684
685fn prompt_drop_database(db_name: &str) -> Result<bool> {
687 use std::io::{self, Write};
688
689 print!(
690 "\nWarning: Database '{}' already exists on target and contains data.\n\
691 Drop and recreate database? This will delete all existing data. [y/N]: ",
692 db_name
693 );
694 io::stdout().flush()?;
695
696 let mut input = String::new();
697 io::stdin().read_line(&mut input)?;
698
699 Ok(input.trim().eq_ignore_ascii_case("y"))
700}
701
702async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
704 crate::utils::validate_postgres_identifier(db_name)
706 .with_context(|| format!("Invalid database name: '{}'", db_name))?;
707
708 tracing::info!(" Dropping existing database '{}'...", db_name);
709
710 let terminate_query = "
712 SELECT pg_terminate_backend(pid)
713 FROM pg_stat_activity
714 WHERE datname = $1 AND pid <> pg_backend_pid()
715 ";
716 target_conn.execute(terminate_query, &[&db_name]).await?;
717
718 let drop_query = format!(
720 "DROP DATABASE IF EXISTS {}",
721 crate::utils::quote_ident(db_name)
722 );
723 target_conn
724 .execute(&drop_query, &[])
725 .await
726 .with_context(|| format!("Failed to drop database '{}'", db_name))?;
727
728 tracing::info!(" ✓ Database '{}' dropped", db_name);
729 Ok(())
730}
731
732pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
780 tracing::info!("Starting SQLite to PostgreSQL migration...");
781
782 tracing::info!("Step 1/4: Validating SQLite database...");
784 let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
785 .context("SQLite file validation failed")?;
786 tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());
787
788 tracing::info!("Step 2/4: Opening SQLite database...");
790 let sqlite_conn =
791 crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
792 tracing::info!(" ✓ SQLite database opened (read-only mode)");
793
794 tracing::info!("Step 3/4: Discovering tables...");
796 let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
797 .context("Failed to list tables from SQLite database")?;
798
799 if tables.is_empty() {
800 tracing::warn!("⚠ No tables found in SQLite database");
801 tracing::info!("✅ Migration complete (no tables to migrate)");
802 return Ok(());
803 }
804
805 tracing::info!("Found {} table(s) to migrate", tables.len());
806
807 let target_client = postgres::connect_with_retry(target_url).await?;
809 tracing::info!(" ✓ Connected to PostgreSQL target");
810
811 tracing::info!("Step 4/4: Migrating tables...");
813 for (idx, table_name) in tables.iter().enumerate() {
814 tracing::info!(
815 "Migrating table {}/{}: '{}'",
816 idx + 1,
817 tables.len(),
818 table_name
819 );
820
821 let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
823 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
824
825 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
826
827 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
829 .await
830 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
831
832 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
833
834 if !rows.is_empty() {
835 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
837 .await
838 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
839
840 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
841 } else {
842 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
843 }
844 }
845
846 tracing::info!("✅ SQLite to PostgreSQL migration complete!");
847 tracing::info!(
848 " Migrated {} table(s) from '{}' to PostgreSQL",
849 tables.len(),
850 sqlite_path
851 );
852
853 Ok(())
854}
855
856pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
907 tracing::info!("Starting MongoDB to PostgreSQL migration...");
908
909 tracing::info!("Step 1/5: Validating MongoDB connection...");
911 let client = crate::mongodb::connect_mongodb(mongo_url)
912 .await
913 .context("MongoDB connection failed")?;
914 tracing::info!(" ✓ MongoDB connection validated");
915
916 tracing::info!("Step 2/5: Extracting database name...");
918 let db_name = crate::mongodb::extract_database_name(mongo_url)
919 .await
920 .context("Failed to parse MongoDB connection string")?
921 .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
922 tracing::info!(" ✓ Database name: '{}'", db_name);
923
924 tracing::info!("Step 3/5: Discovering collections...");
926 let db = client.database(&db_name);
927 let collections = crate::mongodb::reader::list_collections(&client, &db_name)
928 .await
929 .context("Failed to list collections from MongoDB database")?;
930
931 if collections.is_empty() {
932 tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
933 tracing::info!("✅ Migration complete (no collections to migrate)");
934 return Ok(());
935 }
936
937 tracing::info!("Found {} collection(s) to migrate", collections.len());
938
939 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
941 let target_client = postgres::connect_with_retry(target_url).await?;
942 tracing::info!(" ✓ Connected to PostgreSQL target");
943
944 tracing::info!("Step 5/5: Migrating collections...");
946 for (idx, collection_name) in collections.iter().enumerate() {
947 tracing::info!(
948 "Migrating collection {}/{}: '{}'",
949 idx + 1,
950 collections.len(),
951 collection_name
952 );
953
954 let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
956 .await
957 .with_context(|| {
958 format!(
959 "Failed to convert collection '{}' to JSONB",
960 collection_name
961 )
962 })?;
963
964 tracing::info!(
965 " ✓ Converted {} documents from '{}'",
966 rows.len(),
967 collection_name
968 );
969
970 crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
972 .await
973 .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;
974
975 tracing::info!(
976 " ✓ Created JSONB table '{}' in PostgreSQL",
977 collection_name
978 );
979
980 if !rows.is_empty() {
981 crate::jsonb::writer::insert_jsonb_batch(
983 &target_client,
984 collection_name,
985 rows,
986 "mongodb",
987 )
988 .await
989 .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;
990
991 tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
992 } else {
993 tracing::info!(
994 " ✓ Collection '{}' is empty (no documents to insert)",
995 collection_name
996 );
997 }
998 }
999
1000 tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
1001 tracing::info!(
1002 " Migrated {} collection(s) from database '{}' to PostgreSQL",
1003 collections.len(),
1004 db_name
1005 );
1006
1007 Ok(())
1008}
1009
1010pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
1061 tracing::info!("Starting MySQL to PostgreSQL replication...");
1062
1063 tracing::info!("Step 1/5: Validating MySQL connection...");
1065 let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
1066 .await
1067 .context("MySQL connection failed")?;
1068 tracing::info!(" ✓ MySQL connection validated");
1069
1070 tracing::info!("Step 2/5: Extracting database name...");
1072 let db_name = crate::mysql::extract_database_name(mysql_url)
1073 .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
1074 tracing::info!(" ✓ Database name: '{}'", db_name);
1075
1076 tracing::info!("Step 3/5: Discovering tables...");
1078 let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
1079 .await
1080 .context("Failed to list tables from MySQL database")?;
1081
1082 if tables.is_empty() {
1083 tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
1084 tracing::info!("✅ Replication complete (no tables to replicate)");
1085 return Ok(());
1086 }
1087
1088 tracing::info!("Found {} table(s) to replicate", tables.len());
1089
1090 tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
1092 let target_client = postgres::connect_with_retry(target_url).await?;
1093 tracing::info!(" ✓ Connected to PostgreSQL target");
1094
1095 tracing::info!("Step 5/5: Replicating tables...");
1097 for (idx, table_name) in tables.iter().enumerate() {
1098 tracing::info!(
1099 "Replicating table {}/{}: '{}'",
1100 idx + 1,
1101 tables.len(),
1102 table_name
1103 );
1104
1105 let rows =
1107 crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
1108 .await
1109 .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
1110
1111 tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
1112
1113 crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
1115 .await
1116 .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;
1117
1118 tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
1119
1120 if !rows.is_empty() {
1121 crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
1123 .await
1124 .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
1125
1126 tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
1127 } else {
1128 tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
1129 }
1130 }
1131
1132 tracing::info!("✅ MySQL to PostgreSQL replication complete!");
1133 tracing::info!(
1134 " Replicated {} table(s) from database '{}' to PostgreSQL",
1135 tables.len(),
1136 db_name
1137 );
1138
1139 Ok(())
1140}
1141
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end snapshot replication against live databases. Requires the
    // TEST_SOURCE_URL / TEST_TARGET_URL env vars, so it is #[ignore]d and
    // only runs when invoked explicitly (cargo test -- --ignored).
    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        // Args: skip_confirmation=true, drop_existing=false, enable_sync=false,
        // allow_resume=true, force_local=false.
        let result = init(&source, &target, true, filter, false, false, true, false).await;
        assert!(result.is_ok());
    }

    // Pure-function test: the database path segment is swapped while any
    // query parameters are preserved verbatim.
    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    // Needs a reachable target database; only asserts the query succeeds,
    // not whether 'postgres' is actually empty.
    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}