database_replicator/commands/
sync.rs

1// ABOUTME: Sync command implementation - Phase 3 of migration
2// ABOUTME: Sets up logical replication between source and target databases
3
4use crate::migration;
5use crate::postgres::connect;
6use crate::replication::{
7    create_publication, create_subscription, detect_subscription_state, drop_subscription,
8    wait_for_sync, SubscriptionState,
9};
10use anyhow::{Context, Result};
11
12/// Set up logical replication between source and target databases
13///
14/// This command performs Phase 3 of the migration process:
15/// 1. Discovers all databases on the source
16/// 2. Filters databases based on the provided filter criteria
17/// 3. For each database:
18///    - Checks if subscription already exists (skips or recreates based on state and force flag)
19///    - Creates a publication on the source database (filtered tables if specified)
20///    - Creates a subscription on the target database pointing to the source
21///    - Waits for the initial sync to complete
22///
23/// After this command succeeds, changes on the source databases will continuously
24/// replicate to the target until the subscriptions are dropped.
25///
26/// This command is idempotent - it can be safely re-run if interrupted or if setup failed partially.
27///
28/// # Arguments
29///
30/// * `source_url` - PostgreSQL connection string for source database
31/// * `target_url` - PostgreSQL connection string for target (Seren) database
32/// * `filter` - Optional replication filter for database and table selection
33/// * `publication_name` - Optional publication name template (defaults to "seren_migration_pub")
34/// * `subscription_name` - Optional subscription name template (defaults to "seren_migration_sub")
35/// * `sync_timeout_secs` - Optional timeout in seconds per database (defaults to 300)
36/// * `force` - Force recreate subscriptions even if they already exist (defaults to false)
37///
38/// # Returns
39///
40/// Returns `Ok(())` if replication setup completes successfully for all databases.
41///
42/// # Errors
43///
44/// This function will return an error if:
45/// - Cannot connect to source or target database
46/// - Cannot discover databases on source
47/// - Publication creation fails for any database
48/// - Subscription creation fails for any database
49/// - Initial sync doesn't complete within timeout for any database
50///
51/// # Examples
52///
53/// ```no_run
54/// # use anyhow::Result;
55/// # use database_replicator::commands::sync;
56/// # use database_replicator::filters::ReplicationFilter;
57/// # async fn example() -> Result<()> {
58/// // Replicate all databases
59/// sync(
60///     "postgresql://user:pass@source.example.com/postgres",
61///     "postgresql://user:pass@target.example.com/postgres",
62///     None,  // No filter - replicate all databases
63///     None,  // Use default publication name
64///     None,  // Use default subscription name
65///     Some(600),  // 10 minute timeout per database
66///     false,  // Don't force recreate
67/// ).await?;
68///
69/// // Replicate only specific databases
70/// let filter = ReplicationFilter::new(
71///     Some(vec!["mydb".to_string(), "analytics".to_string()]),
72///     None,
73///     None,
74///     None,
75/// )?;
76/// sync(
77///     "postgresql://user:pass@source.example.com/postgres",
78///     "postgresql://user:pass@target.example.com/postgres",
79///     Some(filter),
80///     None,
81///     None,
82///     Some(600),
83///     false,  // Don't force recreate
84/// ).await?;
85/// # Ok(())
86/// # }
87/// ```
88pub async fn sync(
89    source_url: &str,
90    target_url: &str,
91    filter: Option<crate::filters::ReplicationFilter>,
92    publication_name: Option<&str>,
93    subscription_name: Option<&str>,
94    sync_timeout_secs: Option<u64>,
95    force: bool,
96) -> Result<()> {
97    let pub_name_template = publication_name.unwrap_or("seren_migration_pub");
98    let sub_name_template = subscription_name.unwrap_or("seren_migration_sub");
99    let timeout = sync_timeout_secs.unwrap_or(300); // 5 minutes default
100    let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
101
102    tracing::info!("Starting logical replication setup...");
103
104    // CRITICAL: Ensure source and target are different to prevent data loss
105    crate::utils::validate_source_target_different(source_url, target_url)
106        .context("Source and target validation failed")?;
107    tracing::info!("✓ Verified source and target are different databases");
108
109    // Check target wal_level before attempting logical replication
110    tracing::info!("Checking target wal_level for logical replication...");
111    let target_client = connect(target_url)
112        .await
113        .context("Failed to connect to target database")?;
114    let target_wal_level = crate::postgres::check_wal_level(&target_client).await?;
115
116    if target_wal_level != "logical" {
117        anyhow::bail!(
118            "Target database wal_level is set to '{}', but 'logical' is required for logical replication\n\
119             \n\
120             To fix this:\n\
121             \n\
122             Option 1: Set wal_level in postgresql.conf\n\
123               1. Edit postgresql.conf: wal_level = logical\n\
124               2. Restart PostgreSQL server\n\
125               3. Re-run this command\n\
126             \n\
127             Option 2: Skip continuous sync (snapshot only)\n\
128               Use the init command with --no-sync flag to perform initial snapshot without setting up logical replication\n\
129             \n\
130             Note: Some managed PostgreSQL services may require configuring wal_level through their control panel.",
131            target_wal_level
132        );
133    }
134    tracing::info!("✓ Target wal_level is set to 'logical' (logical replication supported)");
135
136    // Connect to source database to discover databases
137    tracing::info!("Connecting to source database...");
138    let source_client = connect(source_url)
139        .await
140        .context("Failed to connect to source database")?;
141    tracing::info!("✓ Connected to source");
142
143    // Discover databases on source
144    tracing::info!("Discovering databases on source...");
145    let all_databases = migration::list_databases(&source_client)
146        .await
147        .context("Failed to list databases on source")?;
148
149    // Apply filtering rules
150    let databases: Vec<_> = all_databases
151        .into_iter()
152        .filter(|db| filter.should_replicate_database(&db.name))
153        .collect();
154
155    if databases.is_empty() {
156        if filter.is_empty() {
157            tracing::warn!("⚠ No user databases found on source");
158            tracing::warn!("  This is unusual - the source database appears empty");
159            tracing::warn!("  Only template databases exist");
160        } else {
161            tracing::warn!("⚠ No databases matched the filter criteria");
162            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
163        }
164        tracing::info!("✅ Logical replication setup complete (no databases to replicate)");
165        return Ok(());
166    }
167
168    tracing::info!(
169        "Found {} database(s) to replicate: {}",
170        databases.len(),
171        databases
172            .iter()
173            .map(|db| db.name.as_str())
174            .collect::<Vec<_>>()
175            .join(", ")
176    );
177
178    // Set up replication for each database
179    for db in &databases {
180        tracing::info!("");
181        tracing::info!(
182            "========================================\nDatabase: '{}'\n========================================",
183            db.name
184        );
185
186        // Build database-specific connection URLs
187        let source_db_url = replace_database_in_url(source_url, &db.name).context(format!(
188            "Failed to build source URL for database '{}'",
189            db.name
190        ))?;
191        let target_db_url = replace_database_in_url(target_url, &db.name).context(format!(
192            "Failed to build target URL for database '{}'",
193            db.name
194        ))?;
195
196        // Build database-specific publication and subscription names
197        let pub_name = if databases.len() == 1 {
198            // Single database - use template name as-is
199            pub_name_template.to_string()
200        } else {
201            // Multiple databases - append database name to avoid conflicts
202            format!("{}_{}", pub_name_template, db.name)
203        };
204
205        let sub_name = if databases.len() == 1 {
206            // Single database - use template name as-is
207            sub_name_template.to_string()
208        } else {
209            // Multiple databases - append database name to avoid conflicts
210            format!("{}_{}", sub_name_template, db.name)
211        };
212
213        tracing::info!("Publication: '{}'", pub_name);
214        tracing::info!("Subscription: '{}'", sub_name);
215
216        // Connect to the specific database on source and target
217        tracing::info!("Connecting to source database '{}'...", db.name);
218        let source_db_client = connect(&source_db_url).await.context(format!(
219            "Failed to connect to source database '{}'",
220            db.name
221        ))?;
222        tracing::info!("✓ Connected to source");
223
224        tracing::info!("Connecting to target database '{}'...", db.name);
225        let target_db_client = connect(&target_db_url).await.context(format!(
226            "Failed to connect to target database '{}'",
227            db.name
228        ))?;
229        tracing::info!("✓ Connected to target");
230
231        // Create publication on source database
232        tracing::info!("Creating publication on source database...");
233        create_publication(&source_db_client, &db.name, &pub_name, &filter)
234            .await
235            .context(format!(
236                "Failed to create publication on source database '{}'",
237                db.name
238            ))?;
239
240        // Check if subscription already exists
241        tracing::info!("Checking subscription state...");
242        let sub_state = detect_subscription_state(&target_db_client, &sub_name)
243            .await
244            .context(format!(
245                "Failed to detect subscription state for '{}'",
246                sub_name
247            ))?;
248
249        match sub_state {
250            SubscriptionState::Streaming => {
251                if force {
252                    tracing::info!(
253                        "⚠ Subscription '{}' is already streaming, but --force flag is set",
254                        sub_name
255                    );
256                    tracing::info!("Dropping existing subscription...");
257                    drop_subscription(&target_db_client, &sub_name)
258                        .await
259                        .context(format!("Failed to drop subscription '{}'", sub_name))?;
260                    tracing::info!("Creating new subscription...");
261                    create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
262                        .await
263                        .context(format!(
264                            "Failed to create subscription on target database '{}'",
265                            db.name
266                        ))?;
267                    tracing::info!(
268                        "Waiting for initial sync to complete (timeout: {}s)...",
269                        timeout
270                    );
271                    wait_for_sync(&target_db_client, &sub_name, timeout)
272                        .await
273                        .context(format!(
274                            "Failed to wait for initial sync on database '{}'",
275                            db.name
276                        ))?;
277                } else {
278                    tracing::info!(
279                        "✓ Subscription '{}' is already streaming and healthy",
280                        sub_name
281                    );
282                    tracing::info!("  Skipping subscription creation (use --force to recreate)");
283                }
284            }
285            SubscriptionState::Initializing
286            | SubscriptionState::Copying
287            | SubscriptionState::Syncing => {
288                tracing::info!(
289                    "ℹ Subscription '{}' already exists and is syncing (state: {:?})",
290                    sub_name,
291                    sub_state
292                );
293                tracing::info!(
294                    "Waiting for existing sync to complete (timeout: {}s)...",
295                    timeout
296                );
297                wait_for_sync(&target_db_client, &sub_name, timeout)
298                    .await
299                    .context(format!(
300                        "Failed to wait for existing sync on database '{}'",
301                        db.name
302                    ))?;
303            }
304            SubscriptionState::Error(err_state) => {
305                tracing::warn!(
306                    "⚠ Subscription '{}' is in error state: {}",
307                    sub_name,
308                    err_state
309                );
310                if force {
311                    tracing::info!("Dropping failed subscription and recreating...");
312                    drop_subscription(&target_db_client, &sub_name)
313                        .await
314                        .context(format!("Failed to drop subscription '{}'", sub_name))?;
315                    tracing::info!("Creating new subscription...");
316                    create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
317                        .await
318                        .context(format!(
319                            "Failed to create subscription on target database '{}'",
320                            db.name
321                        ))?;
322                    tracing::info!(
323                        "Waiting for initial sync to complete (timeout: {}s)...",
324                        timeout
325                    );
326                    wait_for_sync(&target_db_client, &sub_name, timeout)
327                        .await
328                        .context(format!(
329                            "Failed to wait for initial sync on database '{}'",
330                            db.name
331                        ))?;
332                } else {
333                    anyhow::bail!(
334                        "Subscription '{}' is in error state: {}.\n\
335                         Use --force flag to drop and recreate the subscription.",
336                        sub_name,
337                        err_state
338                    );
339                }
340            }
341            SubscriptionState::NotFound => {
342                tracing::info!("Creating subscription on target database...");
343                create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
344                    .await
345                    .context(format!(
346                        "Failed to create subscription on target database '{}'",
347                        db.name
348                    ))?;
349                tracing::info!(
350                    "Waiting for initial sync to complete (timeout: {}s)...",
351                    timeout
352                );
353                wait_for_sync(&target_db_client, &sub_name, timeout)
354                    .await
355                    .context(format!(
356                        "Failed to wait for initial sync on database '{}'",
357                        db.name
358                    ))?;
359            }
360        }
361
362        tracing::info!("✓ Replication active for database '{}'", db.name);
363    }
364
365    tracing::info!("");
366    tracing::info!("========================================");
367    tracing::info!("✓ Logical replication is now active!");
368    tracing::info!("========================================");
369    tracing::info!("");
370    tracing::info!(
371        "Changes on {} source database(s) will now continuously",
372        databases.len()
373    );
374    tracing::info!("replicate to the target.");
375    tracing::info!("");
376    tracing::info!("Next steps:");
377    tracing::info!("  1. Run 'status' to monitor replication lag");
378    tracing::info!("  2. Run 'verify' to validate data integrity");
379    tracing::info!("  3. When ready, cutover to the target database");
380
381    Ok(())
382}
383
384/// Replace the database name in a PostgreSQL connection URL
385///
386/// # Arguments
387///
388/// * `url` - PostgreSQL connection URL
389/// * `new_db_name` - New database name to use
390///
391/// # Returns
392///
393/// URL with the database name replaced
394fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
395    // Split into base URL and query parameters
396    let parts: Vec<&str> = url.splitn(2, '?').collect();
397    let base_url = parts[0];
398    let query_params = parts.get(1);
399
400    // Split base URL by '/' to replace the database name
401    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
402
403    if url_parts.len() != 2 {
404        anyhow::bail!("Invalid connection URL format: cannot replace database name");
405    }
406
407    // Rebuild URL with new database name
408    let new_url = if let Some(params) = query_params {
409        format!("{}/{}?{}", url_parts[1], new_db_name, params)
410    } else {
411        format!("{}/{}", url_parts[1], new_db_name)
412    };
413
414    Ok(new_url)
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// Read a required environment variable for the ignored integration tests,
    /// panicking with a message that names the missing variable.
    fn required_env(name: &str) -> String {
        std::env::var(name)
            .unwrap_or_else(|_| panic!("{} must be set to run this integration test", name))
    }

    /// Whether `err` indicates the test databases cannot do logical replication, in
    /// which case the integration test is skipped rather than failed.
    ///
    /// Uses the alternate `{:#}` format so the whole context chain is searched: plain
    /// `to_string()` on an `anyhow::Error` yields only the outermost context message
    /// (e.g. "Failed to create publication ..."), hiding causes like "permission denied".
    fn is_unsupported_environment(err: &anyhow::Error) -> bool {
        let message = format!("{:#}", err);
        message.contains("not supported") || message.contains("permission")
    }

    #[tokio::test]
    #[ignore]
    async fn test_sync_command() {
        // This test requires two databases: source and target
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        let pub_name = "test_sync_pub";
        let sub_name = "test_sync_sub";
        let timeout = 60; // 1 minute timeout for test

        let result = sync(
            &source_url,
            &target_url,
            None,
            Some(pub_name),
            Some(sub_name),
            Some(timeout),
            false,
        )
        .await;

        match &result {
            Ok(_) => println!("✓ Sync command completed successfully"),
            Err(e) => {
                println!("Error in sync command: {:?}", e);
                // If either database doesn't support logical replication, skip
                if is_unsupported_environment(e) {
                    println!("Skipping test - database might not support logical replication");
                    return;
                }
            }
        }

        assert!(result.is_ok(), "Sync command failed: {:?}", result);

        // Clean up.
        // NOTE(review): this assumes the source has a single user database; with several,
        // `sync` appends `_<db>` to the names and creates the objects in per-database
        // connections, so this cleanup would not find them.
        let target_client = connect(&target_url).await.unwrap();
        crate::replication::drop_subscription(&target_client, sub_name)
            .await
            .unwrap();

        let source_client = connect(&source_url).await.unwrap();
        crate::replication::drop_publication(&source_client, pub_name)
            .await
            .unwrap();
    }

    #[tokio::test]
    #[ignore]
    async fn test_sync_with_defaults() {
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        let result = sync(&source_url, &target_url, None, None, None, Some(60), false).await;

        match &result {
            Ok(_) => println!("✓ Sync with defaults completed successfully"),
            Err(e) => {
                println!("Error in sync with defaults: {:?}", e);
                if is_unsupported_environment(e) {
                    println!("Skipping test - database might not support logical replication");
                    return;
                }
            }
        }

        assert!(result.is_ok(), "Sync with defaults failed: {:?}", result);

        // Clean up using default names (same single-database assumption as above)
        let target_client = connect(&target_url).await.unwrap();
        crate::replication::drop_subscription(&target_client, "seren_migration_sub")
            .await
            .unwrap();

        let source_client = connect(&source_url).await.unwrap();
        crate::replication::drop_publication(&source_client, "seren_migration_pub")
            .await
            .unwrap();
    }

    #[test]
    fn test_replace_database_in_url() {
        // Basic URL
        let url = "postgresql://user:pass@localhost:5432/olddb";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");

        // URL with query parameters
        let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            new_url,
            "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
        );

        // URL without port
        let url = "postgresql://user:pass@localhost/olddb";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");

        // URL with an empty host (default Unix socket)
        let url = "postgresql:///olddb";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql:///newdb");

        // URL with a trailing slash and no database name
        let url = "postgresql://user:pass@localhost:5432/";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");

        // Keyword/value connection strings are not URLs and are rejected
        assert!(replace_database_in_url("host=localhost dbname=olddb", "newdb").is_err());
    }

    #[tokio::test]
    #[ignore]
    async fn test_sync_with_database_filter() {
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        println!("Testing sync command with database filter...");
        println!("⚠ WARNING: This will set up replication for filtered databases!");

        // Create filter that includes only specific database
        let filter = crate::filters::ReplicationFilter::new(
            Some(vec!["postgres".to_string()]), // Include only postgres database
            None,
            None,
            None,
        )
        .expect("Failed to create filter");

        let result = sync(
            &source_url,
            &target_url,
            Some(filter),
            None,
            None,
            Some(60),
            false,
        )
        .await;

        match &result {
            Ok(_) => {
                println!("✓ Sync with database filter completed successfully");
            }
            Err(e) => {
                println!("Sync with database filter failed: {:?}", e);
                if is_unsupported_environment(e) {
                    println!("Skipping test - database might not support logical replication");
                    return;
                }
            }
        }

        assert!(
            result.is_ok(),
            "Sync with database filter failed: {:?}",
            result
        );

        // Clean up - for single database, names don't have suffix
        let db_url = replace_database_in_url(&target_url, "postgres").unwrap();
        let target_client = connect(&db_url).await.unwrap();
        crate::replication::drop_subscription(&target_client, "seren_migration_sub")
            .await
            .unwrap();

        let source_url_db = replace_database_in_url(&source_url, "postgres").unwrap();
        let source_client = connect(&source_url_db).await.unwrap();
        crate::replication::drop_publication(&source_client, "seren_migration_pub")
            .await
            .unwrap();
    }
}