use crate::{checkpoint, migration, postgres};
use anyhow::{bail, Context, Result};
use std::io::{self, Write};
use tokio_postgres::Client;

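/// Runs the initial replication from `source_url` to `target_url`.
///
/// PostgreSQL sources get a full dump/restore (globals, schema, data) with
/// checkpoint-based resume and optional continuous sync; SQLite, MongoDB, and
/// MySQL sources are handed off to the one-time JSONB migration paths below.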
pub async fn init(
    source_url: &str,
    target_url: &str,
    skip_confirmation: bool,
    filter: crate::filters::ReplicationFilter,
    drop_existing: bool,
    enable_sync: bool,
    allow_resume: bool,
) -> Result<()> {
    tracing::info!("Starting initial replication...");

    let source_type =
        crate::detect_source_type(source_url).context("Failed to detect source database type")?;

    match source_type {
        crate::SourceType::PostgreSQL => {
            tracing::info!("Source type: PostgreSQL");
        }
        crate::SourceType::SQLite => {
            tracing::info!("Source type: SQLite");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for SQLite sources (all tables will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for SQLite sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ SQLite sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_sqlite_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MongoDB => {
            tracing::info!("Source type: MongoDB");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MongoDB sources (all collections will be migrated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MongoDB sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ MongoDB sources don't support continuous replication (one-time migration only)"
                );
            }

            return init_mongodb_to_postgres(source_url, target_url).await;
        }
        crate::SourceType::MySQL => {
            tracing::info!("Source type: MySQL");

            if !filter.is_empty() {
                tracing::warn!(
                    "⚠ Filters are not supported for MySQL sources (all tables will be replicated)"
                );
            }
            if drop_existing {
                tracing::warn!("⚠ --drop-existing flag is not applicable for MySQL sources");
            }
            if enable_sync {
                tracing::warn!(
                    "⚠ MySQL sources don't support continuous replication (one-time replication only)"
                );
            }

            return init_mysql_to_postgres(source_url, target_url).await;
        }
    }

    crate::utils::validate_source_target_different(source_url, target_url)
        .context("Source and target validation failed")?;
    tracing::info!("✓ Verified source and target are different databases");

    let temp_path =
        crate::utils::create_managed_temp_dir().context("Failed to create temp directory")?;
    tracing::debug!("Using temp directory: {}", temp_path.display());

    let checkpoint_path = checkpoint::checkpoint_path(source_url, target_url)
        .context("Failed to determine checkpoint location")?;

    tracing::info!("Step 1/4: Dumping global objects (roles, tablespaces)...");
    let globals_file = temp_path.join("globals.sql");
    migration::dump_globals(source_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 2/4: Restoring global objects to target...");
    migration::restore_globals(target_url, globals_file.to_str().unwrap()).await?;

    tracing::info!("Step 3/4: Discovering databases...");
    let all_databases = {
        let source_client = postgres::connect_with_retry(source_url).await?;
        migration::list_databases(&source_client).await?
    };

    let databases: Vec<_> = all_databases
        .into_iter()
        .filter(|db| filter.should_replicate_database(&db.name))
        .collect();

    if databases.is_empty() {
        let _ = checkpoint::remove_checkpoint(&checkpoint_path);
        if filter.is_empty() {
            tracing::warn!("⚠ No user databases found on source");
            tracing::warn!(" This is unusual - the source database appears empty");
            tracing::warn!(" Only global objects (roles, tablespaces) will be replicated");
        } else {
            tracing::warn!("⚠ No databases matched the filter criteria");
            tracing::warn!(" Check your --include-databases or --exclude-databases settings");
        }
        tracing::info!("✅ Initial replication complete (no databases to replicate)");
        return Ok(());
    }

    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
    let filter_hash = filter.fingerprint();
    let checkpoint_metadata = checkpoint::InitCheckpointMetadata::new(
        source_url,
        target_url,
        filter_hash,
        drop_existing,
        enable_sync,
    );

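    // Decide whether to resume from a previous checkpoint: a saved checkpoint is
    // reused only when --no-resume was not supplied and its metadata matches the
    // current configuration; otherwise it is discarded and a fresh one is created.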
    let mut checkpoint_state = if allow_resume {
        match checkpoint::InitCheckpoint::load(&checkpoint_path)? {
            Some(existing) => {
                match existing.validate(&checkpoint_metadata, &database_names) {
                    Ok(()) => {
                        if existing.completed_count() > 0 {
                            tracing::info!(
                                "Resume checkpoint found: {}/{} databases already replicated",
                                existing.completed_count(),
                                existing.total_databases()
                            );
                        } else {
                            tracing::info!(
                                "Resume checkpoint found but no databases marked complete yet"
                            );
                        }
                        existing
                    }
                    Err(e) => {
                        tracing::warn!("⚠ Checkpoint metadata mismatch detected:");
                        tracing::warn!(
                            " Previous run configuration differs from current configuration"
                        );
                        tracing::warn!(" - Schema-only tables may have changed");
                        tracing::warn!(" - Time filters may have changed");
                        tracing::warn!(" - Table selection may have changed");
                        tracing::warn!(" Error: {}", e);
                        tracing::info!("");
                        tracing::info!(
                            "✓ Automatically discarding old checkpoint and starting fresh"
                        );
                        checkpoint::remove_checkpoint(&checkpoint_path)?;
                        checkpoint::InitCheckpoint::new(
                            checkpoint_metadata.clone(),
                            &database_names,
                        )
                    }
                }
            }
            None => checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names),
        }
    } else {
        if checkpoint_path.exists() {
            tracing::info!(
                "--no-resume supplied: discarding previous checkpoint at {}",
                checkpoint_path.display()
            );
        }
        checkpoint::remove_checkpoint(&checkpoint_path)?;
        checkpoint::InitCheckpoint::new(checkpoint_metadata.clone(), &database_names)
    };

    checkpoint_state
        .save(&checkpoint_path)
        .context("Failed to persist checkpoint state")?;

    tracing::info!("Found {} database(s) to replicate", databases.len());

    if !skip_confirmation {
        tracing::info!("Analyzing database sizes...");
        let size_estimates = {
            let source_client = postgres::connect_with_retry(source_url).await?;
            migration::estimate_database_sizes(source_url, &source_client, &databases, &filter)
                .await?
        };

        if !confirm_replication(&size_estimates)? {
            bail!("Replication cancelled by user");
        }
    }

    tracing::info!("Step 4/4: Replicating databases...");
    for (idx, db_info) in databases.iter().enumerate() {
        let filtered_tables = filter.predicate_tables(&db_info.name);
        if checkpoint_state.is_completed(&db_info.name) {
            tracing::info!(
                "Skipping database '{}' (already completed per checkpoint)",
                db_info.name
            );
            continue;
        }
        tracing::info!(
            "Replicating database {}/{}: '{}'",
            idx + 1,
            databases.len(),
            db_info.name
        );

        let source_db_url = replace_database_in_url(source_url, &db_info.name)?;
        let target_db_url = replace_database_in_url(target_url, &db_info.name)?;

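        // Create the database on the target inside a scoped block so the admin
        // connection is dropped before the per-database restore begins. An existing
        // database is reused when empty, otherwise dropped and recreated after
        // confirmation (or automatically with --drop-existing / --yes).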
        {
            let target_client = postgres::connect_with_retry(target_url).await?;

            crate::utils::validate_postgres_identifier(&db_info.name)
                .with_context(|| format!("Invalid database name: '{}'", db_info.name))?;

            let create_query = format!("CREATE DATABASE \"{}\"", db_info.name);
            match target_client.execute(&create_query, &[]).await {
                Ok(_) => {
                    tracing::info!(" Created database '{}'", db_info.name);
                }
                Err(err) => {
                    if let Some(db_error) = err.as_db_error() {
                        if db_error.code() == &tokio_postgres::error::SqlState::DUPLICATE_DATABASE
                        {
                            tracing::info!(
                                " Database '{}' already exists on target",
                                db_info.name
                            );

                            if database_is_empty(target_url, &db_info.name).await? {
                                tracing::info!(
                                    " Database '{}' is empty, proceeding with restore",
                                    db_info.name
                                );
                            } else {
                                let should_drop = if drop_existing {
                                    true
                                } else if skip_confirmation {
                                    tracing::info!(
                                        " Auto-confirming drop for database '{}' (--yes flag)",
                                        db_info.name
                                    );
                                    true
                                } else {
                                    prompt_drop_database(&db_info.name)?
                                };

                                if should_drop {
                                    drop_database_if_exists(&target_client, &db_info.name).await?;

                                    let create_query =
                                        format!("CREATE DATABASE \"{}\"", db_info.name);
                                    target_client
                                        .execute(&create_query, &[])
                                        .await
                                        .with_context(|| {
                                            format!(
                                                "Failed to create database '{}' after drop",
                                                db_info.name
                                            )
                                        })?;
                                    tracing::info!(" Created database '{}'", db_info.name);
                                } else {
                                    bail!("Aborted: Database '{}' already exists", db_info.name);
                                }
                            }
                        } else {
                            return Err(err).with_context(|| {
                                format!("Failed to create database '{}'", db_info.name)
                            });
                        }
                    } else {
                        return Err(err).with_context(|| {
                            format!("Failed to create database '{}'", db_info.name)
                        });
                    }
                }
            }
        }

        tracing::info!(" Dumping schema for '{}'...", db_info.name);
        let schema_file = temp_path.join(format!("{}_schema.sql", db_info.name));
        migration::dump_schema(
            &source_db_url,
            &db_info.name,
            schema_file.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring schema for '{}'...", db_info.name);
        migration::restore_schema(&target_db_url, schema_file.to_str().unwrap()).await?;

        tracing::info!(" Dumping data for '{}'...", db_info.name);
        let data_dir = temp_path.join(format!("{}_data.dump", db_info.name));
        migration::dump_data(
            &source_db_url,
            &db_info.name,
            data_dir.to_str().unwrap(),
            &filter,
        )
        .await?;

        tracing::info!(" Restoring data for '{}'...", db_info.name);
        migration::restore_data(&target_db_url, data_dir.to_str().unwrap()).await?;

        if !filtered_tables.is_empty() {
            tracing::info!(
                " Applying filtered replication for {} table(s)...",
                filtered_tables.len()
            );
            migration::filtered::copy_filtered_tables(
                &source_db_url,
                &target_db_url,
                &filtered_tables,
            )
            .await?;
        }

        tracing::info!("✓ Database '{}' replicated successfully", db_info.name);

        checkpoint_state.mark_completed(&db_info.name);
        checkpoint_state
            .save(&checkpoint_path)
            .with_context(|| format!("Failed to update checkpoint for '{}'", db_info.name))?;
    }

    if let Err(e) = crate::utils::remove_managed_temp_dir(&temp_path) {
        tracing::warn!("Failed to clean up temp directory: {}", e);
    }

    if let Err(err) = checkpoint::remove_checkpoint(&checkpoint_path) {
        tracing::warn!("Failed to remove checkpoint state: {}", err);
    }

    tracing::info!("✅ Initial replication complete");

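    // Continuous sync requires wal_level = logical on the target; if the target is
    // not configured for it, fall back to snapshot-only replication with a warning.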
    let mut should_enable_sync = enable_sync;
    if enable_sync {
        tracing::info!("Checking target wal_level for logical replication...");
        let target_wal_level = {
            let target_client = postgres::connect_with_retry(target_url).await?;
            postgres::check_wal_level(&target_client).await?
        };

        if target_wal_level != "logical" {
            tracing::warn!("");
            tracing::warn!("⚠ Target database wal_level is set to '{}', but 'logical' is required for continuous sync", target_wal_level);
            tracing::warn!(" Continuous replication (subscriptions) cannot be set up");
            tracing::warn!("");
            tracing::warn!(" To fix this:");
            tracing::warn!(" 1. Edit postgresql.conf: wal_level = logical");
            tracing::warn!(" 2. Restart PostgreSQL server");
            tracing::warn!(
                " 3. Run: postgres-seren-replicator sync --source <url> --target <url>"
            );
            tracing::warn!("");
            tracing::info!("✓ Continuing with snapshot-only replication (sync disabled)");
            should_enable_sync = false;
        }
    }

    if should_enable_sync {
        tracing::info!("");
        tracing::info!("========================================");
        tracing::info!("Step 5/5: Setting up continuous replication...");
        tracing::info!("========================================");
        tracing::info!("");

        crate::commands::sync(
            source_url,
            target_url,
            Some(filter),
            None,
            None,
            None,
            false,
        )
        .await
        .context("Failed to set up continuous replication")?;

        tracing::info!("");
        tracing::info!("✅ Complete! Snapshot and continuous replication are active");
    } else {
        tracing::info!("");
        tracing::info!("ℹ Continuous replication was not set up (--no-sync flag)");
        tracing::info!(" To enable it later, run:");
        tracing::info!(" postgres-seren-replicator sync --source <url> --target <url>");
    }

    Ok(())
}

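/// Returns `url` with its database path segment replaced by `new_database`,
/// preserving any query parameters (e.g. `?sslmode=require`).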
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    let parts: Vec<&str> = url.split('?').collect();
    let base_url = parts[0];
    let params = if parts.len() > 1 {
        Some(parts[1])
    } else {
        None
    };

    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

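/// Prints a per-database size and time-estimate table, then asks the user to
/// confirm the replication. Returns `Ok(true)` only if the user answers "y".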
fn confirm_replication(sizes: &[migration::DatabaseSizeInfo]) -> Result<bool> {
    use std::time::Duration;

    let total_bytes: i64 = sizes.iter().map(|s| s.size_bytes).sum();
    let total_duration: Duration = sizes.iter().map(|s| s.estimated_duration).sum();

    println!();
    println!("{:<20} {:<12} {:<15}", "Database", "Size", "Est. Time");
    println!("{}", "─".repeat(50));

    for size in sizes {
        println!(
            "{:<20} {:<12} {:<15}",
            size.name,
            size.size_human,
            migration::format_duration(size.estimated_duration)
        );
    }

    println!("{}", "─".repeat(50));
    println!(
        "Total: {} (estimated {})",
        migration::format_bytes(total_bytes),
        migration::format_duration(total_duration)
    );
    println!();

    print!("Proceed with replication? [y/N]: ");
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin()
        .read_line(&mut input)
        .context("Failed to read user input")?;

    Ok(input.trim().to_lowercase() == "y")
}

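/// Connects to `db_name` on the target and reports whether it contains no
/// user tables (anything outside `pg_catalog` and `information_schema`).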
async fn database_is_empty(target_url: &str, db_name: &str) -> Result<bool> {
    let db_url = replace_database_in_url(target_url, db_name)?;
    let client = postgres::connect_with_retry(&db_url).await?;

    let query = "
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
    ";

    let row = client.query_one(query, &[]).await?;
    let count: i64 = row.get(0);

    Ok(count == 0)
}

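/// Interactively asks whether an existing, non-empty target database should be
/// dropped and recreated. Returns `Ok(true)` only if the user answers "y".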
fn prompt_drop_database(db_name: &str) -> Result<bool> {
    use std::io::{self, Write};

    print!(
        "\nWarning: Database '{}' already exists on target and contains data.\n\
         Drop and recreate database? This will delete all existing data. [y/N]: ",
        db_name
    );
    io::stdout().flush()?;

    let mut input = String::new();
    io::stdin().read_line(&mut input)?;

    Ok(input.trim().eq_ignore_ascii_case("y"))
}

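/// Terminates any remaining connections to `db_name` on the target, then drops
/// the database if it exists. The name is validated before being interpolated
/// into the `DROP DATABASE` statement.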
async fn drop_database_if_exists(target_conn: &Client, db_name: &str) -> Result<()> {
    crate::utils::validate_postgres_identifier(db_name)
        .with_context(|| format!("Invalid database name: '{}'", db_name))?;

    tracing::info!(" Dropping existing database '{}'...", db_name);

    let terminate_query = "
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = $1 AND pid <> pg_backend_pid()
    ";
    target_conn.execute(terminate_query, &[&db_name]).await?;

    let drop_query = format!("DROP DATABASE IF EXISTS \"{}\"", db_name);
    target_conn
        .execute(&drop_query, &[])
        .await
        .with_context(|| format!("Failed to drop database '{}'", db_name))?;

    tracing::info!(" ✓ Database '{}' dropped", db_name);
    Ok(())
}

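/// One-time migration of a SQLite database into PostgreSQL: each table is
/// converted to JSONB rows and written into a matching JSONB table on the
/// target. No continuous replication is set up.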
pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting SQLite to PostgreSQL migration...");

    tracing::info!("Step 1/4: Validating SQLite database...");
    let canonical_path = crate::sqlite::validate_sqlite_path(sqlite_path)
        .context("SQLite file validation failed")?;
    tracing::info!(" ✓ SQLite file validated: {}", canonical_path.display());

    tracing::info!("Step 2/4: Opening SQLite database...");
    let sqlite_conn =
        crate::sqlite::open_sqlite(sqlite_path).context("Failed to open SQLite database")?;
    tracing::info!(" ✓ SQLite database opened (read-only mode)");

    tracing::info!("Step 3/4: Discovering tables...");
    let tables = crate::sqlite::reader::list_tables(&sqlite_conn)
        .context("Failed to list tables from SQLite database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in SQLite database");
        tracing::info!("✅ Migration complete (no tables to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to migrate", tables.len());

    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 4/4: Migrating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Migrating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
            .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ SQLite to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} table(s) from '{}' to PostgreSQL",
        tables.len(),
        sqlite_path
    );

    Ok(())
}

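/// One-time migration of a MongoDB database into PostgreSQL: each collection
/// is converted to JSONB rows and written into a matching JSONB table on the
/// target. No continuous replication is set up.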
pub async fn init_mongodb_to_postgres(mongo_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MongoDB to PostgreSQL migration...");

    tracing::info!("Step 1/5: Validating MongoDB connection...");
    let client = crate::mongodb::connect_mongodb(mongo_url)
        .await
        .context("MongoDB connection failed")?;
    tracing::info!(" ✓ MongoDB connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mongodb::extract_database_name(mongo_url)
        .await
        .context("Failed to parse MongoDB connection string")?
        .context("MongoDB URL must include database name (e.g., mongodb://host:27017/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering collections...");
    let db = client.database(&db_name);
    let collections = crate::mongodb::reader::list_collections(&client, &db_name)
        .await
        .context("Failed to list collections from MongoDB database")?;

    if collections.is_empty() {
        tracing::warn!("⚠ No collections found in MongoDB database '{}'", db_name);
        tracing::info!("✅ Migration complete (no collections to migrate)");
        return Ok(());
    }

    tracing::info!("Found {} collection(s) to migrate", collections.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Migrating collections...");
    for (idx, collection_name) in collections.iter().enumerate() {
        tracing::info!(
            "Migrating collection {}/{}: '{}'",
            idx + 1,
            collections.len(),
            collection_name
        );

        let rows = crate::mongodb::converter::convert_collection_to_jsonb(&db, collection_name)
            .await
            .with_context(|| {
                format!(
                    "Failed to convert collection '{}' to JSONB",
                    collection_name
                )
            })?;

        tracing::info!(
            " ✓ Converted {} documents from '{}'",
            rows.len(),
            collection_name
        );

        crate::jsonb::writer::create_jsonb_table(&target_client, collection_name, "mongodb")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", collection_name))?;

        tracing::info!(
            " ✓ Created JSONB table '{}' in PostgreSQL",
            collection_name
        );

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(
                &target_client,
                collection_name,
                rows,
                "mongodb",
            )
            .await
            .with_context(|| format!("Failed to insert data into table '{}'", collection_name))?;

            tracing::info!(" ✓ Inserted all documents into '{}'", collection_name);
        } else {
            tracing::info!(
                " ✓ Collection '{}' is empty (no documents to insert)",
                collection_name
            );
        }
    }

    tracing::info!("✅ MongoDB to PostgreSQL migration complete!");
    tracing::info!(
        " Migrated {} collection(s) from database '{}' to PostgreSQL",
        collections.len(),
        db_name
    );

    Ok(())
}

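/// One-time replication of a MySQL database into PostgreSQL: each table is
/// converted to JSONB rows and written into a matching JSONB table on the
/// target. No continuous replication is set up.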
pub async fn init_mysql_to_postgres(mysql_url: &str, target_url: &str) -> Result<()> {
    tracing::info!("Starting MySQL to PostgreSQL replication...");

    tracing::info!("Step 1/5: Validating MySQL connection...");
    let mut mysql_conn = crate::mysql::connect_mysql(mysql_url)
        .await
        .context("MySQL connection failed")?;
    tracing::info!(" ✓ MySQL connection validated");

    tracing::info!("Step 2/5: Extracting database name...");
    let db_name = crate::mysql::extract_database_name(mysql_url)
        .context("MySQL URL must include database name (e.g., mysql://host:3306/dbname)")?;
    tracing::info!(" ✓ Database name: '{}'", db_name);

    tracing::info!("Step 3/5: Discovering tables...");
    let tables = crate::mysql::reader::list_tables(&mut mysql_conn, &db_name)
        .await
        .context("Failed to list tables from MySQL database")?;

    if tables.is_empty() {
        tracing::warn!("⚠ No tables found in MySQL database '{}'", db_name);
        tracing::info!("✅ Replication complete (no tables to replicate)");
        return Ok(());
    }

    tracing::info!("Found {} table(s) to replicate", tables.len());

    tracing::info!("Step 4/5: Connecting to PostgreSQL target...");
    let target_client = postgres::connect_with_retry(target_url).await?;
    tracing::info!(" ✓ Connected to PostgreSQL target");

    tracing::info!("Step 5/5: Replicating tables...");
    for (idx, table_name) in tables.iter().enumerate() {
        tracing::info!(
            "Replicating table {}/{}: '{}'",
            idx + 1,
            tables.len(),
            table_name
        );

        let rows =
            crate::mysql::converter::convert_table_to_jsonb(&mut mysql_conn, &db_name, table_name)
                .await
                .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

        tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

        crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "mysql")
            .await
            .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

        tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

        if !rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "mysql")
                .await
                .with_context(|| format!("Failed to insert data into table '{}'", table_name))?;

            tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
        } else {
            tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
        }
    }

    tracing::info!("✅ MySQL to PostgreSQL replication complete!");
    tracing::info!(
        " Replicated {} table(s) from database '{}' to PostgreSQL",
        tables.len(),
        db_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn test_init_replicates_database() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        let filter = crate::filters::ReplicationFilter::empty();
        let result = init(&source, &target, true, filter, false, false, true).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_replace_database_in_url() {
        let url = "postgresql://user:pass@host:5432/olddb?sslmode=require";
        let result = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            result,
            "postgresql://user:pass@host:5432/newdb?sslmode=require"
        );

        let url_no_params = "postgresql://user:pass@host:5432/olddb";
        let result = replace_database_in_url(url_no_params, "newdb").unwrap();
        assert_eq!(result, "postgresql://user:pass@host:5432/newdb");
    }

    #[tokio::test]
    #[ignore]
    async fn test_database_is_empty() {
        let url = std::env::var("TEST_TARGET_URL").expect("TEST_TARGET_URL not set");

        let result = database_is_empty(&url, "postgres").await;
        assert!(result.is_ok());
    }
}