database_replicator/commands/
sync.rs

1// ABOUTME: Sync command implementation - Phase 3 of migration
2// ABOUTME: Sets up logical replication between source and target databases
3
4use crate::migration;
5use crate::postgres::connect;
6use crate::replication::{
7    create_publication, create_subscription, detect_subscription_state, drop_subscription,
8    wait_for_sync, SubscriptionState,
9};
10use crate::serendb::{resolve_target_mode, ConsoleClient, TargetMode};
11use anyhow::{anyhow, Context, Result};
12
13/// Set up logical replication between source and target databases
14///
15/// This command performs Phase 3 of the migration process:
16/// 1. Discovers all databases on the source
17/// 2. Filters databases based on the provided filter criteria
18/// 3. For each database:
19///    - Checks if subscription already exists (skips or recreates based on state and force flag)
20///    - Creates a publication on the source database (filtered tables if specified)
21///    - Creates a subscription on the target database pointing to the source
22///    - Waits for the initial sync to complete
23///
24/// After this command succeeds, changes on the source databases will continuously
25/// replicate to the target until the subscriptions are dropped.
26///
27/// This command is idempotent - it can be safely re-run if interrupted or if setup failed partially.
28///
29/// # Arguments
30///
31/// * `source_url` - PostgreSQL connection string for source database
32/// * `target_url` - PostgreSQL connection string for target (Seren) database
33/// * `filter` - Optional replication filter for database and table selection
34/// * `publication_name` - Optional publication name template (defaults to "seren_migration_pub")
35/// * `subscription_name` - Optional subscription name template (defaults to "seren_migration_sub")
36/// * `sync_timeout_secs` - Optional timeout in seconds per database (defaults to 300)
37/// * `force` - Force recreate subscriptions even if they already exist (defaults to false)
38///
39/// # Returns
40///
41/// Returns `Ok(())` if replication setup completes successfully for all databases.
42///
43/// # Errors
44///
45/// This function will return an error if:
46/// - Cannot connect to source or target database
47/// - Cannot discover databases on source
48/// - Publication creation fails for any database
49/// - Subscription creation fails for any database
50/// - Initial sync doesn't complete within timeout for any database
51///
52/// # Examples
53///
54/// ```no_run
55/// # use anyhow::Result;
56/// # use database_replicator::commands::sync;
57/// # use database_replicator::filters::ReplicationFilter;
58/// # async fn example() -> Result<()> {
59/// // Replicate all databases
60/// sync(
61///     "postgresql://user:pass@source.example.com/postgres",
62///     "postgresql://user:pass@target.example.com/postgres",
63///     None,  // No filter - replicate all databases
64///     None,  // Use default publication name
65///     None,  // Use default subscription name
66///     Some(600),  // 10 minute timeout per database
67///     false,  // Don't force recreate
68/// ).await?;
69///
70/// // Replicate only specific databases
71/// let filter = ReplicationFilter::new(
72///     Some(vec!["mydb".to_string(), "analytics".to_string()]),
73///     None,
74///     None,
75///     None,
76/// )?;
77/// sync(
78///     "postgresql://user:pass@source.example.com/postgres",
79///     "postgresql://user:pass@target.example.com/postgres",
80///     Some(filter),
81///     None,
82///     None,
83///     Some(600),
84///     false,  // Don't force recreate
85/// ).await?;
86/// # Ok(())
87/// # }
88/// ```
89pub async fn sync(
90    source_url: &str,
91    target_url: &str,
92    filter: Option<crate::filters::ReplicationFilter>,
93    publication_name: Option<&str>,
94    subscription_name: Option<&str>,
95    sync_timeout_secs: Option<u64>,
96    force: bool,
97) -> Result<()> {
98    let pub_name_template = publication_name.unwrap_or("seren_migration_pub");
99    let sub_name_template = subscription_name.unwrap_or("seren_migration_sub");
100    let timeout = sync_timeout_secs.unwrap_or(300); // 5 minutes default
101    let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
102
103    tracing::info!("Starting logical replication setup...");
104
105    // CRITICAL: Ensure source and target are different to prevent data loss
106    crate::utils::validate_source_target_different(source_url, target_url)
107        .context("Source and target validation failed")?;
108    tracing::info!("✓ Verified source and target are different databases");
109
110    // Check target wal_level before attempting logical replication
111    tracing::info!("Checking target wal_level for logical replication...");
112    let target_client = connect(target_url)
113        .await
114        .context("Failed to connect to target database")?;
115    let target_wal_level = crate::postgres::check_wal_level(&target_client).await?;
116
117    if target_wal_level != "logical" {
118        anyhow::bail!(
119            "Target database wal_level is set to '{}', but 'logical' is required for logical replication\n\
120             \n\
121             To fix this:\n\
122             \n\
123             Option 1: Set wal_level in postgresql.conf\n\
124               1. Edit postgresql.conf: wal_level = logical\n\
125               2. Restart PostgreSQL server\n\
126               3. Re-run this command\n\
127             \n\
128             Option 2: Skip continuous sync (snapshot only)\n\
129               Use the init command with --no-sync flag to perform initial snapshot without setting up logical replication\n\
130             \n\
131             Note: Some managed PostgreSQL services may require configuring wal_level through their control panel.",
132            target_wal_level
133        );
134    }
135    tracing::info!("✓ Target wal_level is set to 'logical' (logical replication supported)");
136
137    // Connect to source database to discover databases
138    tracing::info!("Connecting to source database...");
139    let source_client = connect(source_url)
140        .await
141        .context("Failed to connect to source database")?;
142    tracing::info!("✓ Connected to source");
143
144    // Discover databases on source
145    tracing::info!("Discovering databases on source...");
146    let all_databases = migration::list_databases(&source_client)
147        .await
148        .context("Failed to list databases on source")?;
149
150    // Apply filtering rules
151    let databases: Vec<_> = all_databases
152        .into_iter()
153        .filter(|db| filter.should_replicate_database(&db.name))
154        .collect();
155
156    if databases.is_empty() {
157        if filter.is_empty() {
158            tracing::warn!("⚠ No user databases found on source");
159            tracing::warn!("  This is unusual - the source database appears empty");
160            tracing::warn!("  Only template databases exist");
161        } else {
162            tracing::warn!("⚠ No databases matched the filter criteria");
163            tracing::warn!("  Check your --include-databases or --exclude-databases settings");
164        }
165        tracing::info!("✅ Logical replication setup complete (no databases to replicate)");
166        return Ok(());
167    }
168
169    tracing::info!(
170        "Found {} database(s) to replicate: {}",
171        databases.len(),
172        databases
173            .iter()
174            .map(|db| db.name.as_str())
175            .collect::<Vec<_>>()
176            .join(", ")
177    );
178
179    // Set up replication for each database
180    for db in &databases {
181        tracing::info!("");
182        tracing::info!(
183            "========================================\nDatabase: '{}'\n========================================",
184            db.name
185        );
186
187        // Build database-specific connection URLs
188        let source_db_url = replace_database_in_url(source_url, &db.name).context(format!(
189            "Failed to build source URL for database '{}'",
190            db.name
191        ))?;
192        let target_db_url = replace_database_in_url(target_url, &db.name).context(format!(
193            "Failed to build target URL for database '{}'",
194            db.name
195        ))?;
196
197        // Build database-specific publication and subscription names
198        let pub_name = if databases.len() == 1 {
199            // Single database - use template name as-is
200            pub_name_template.to_string()
201        } else {
202            // Multiple databases - append database name to avoid conflicts
203            format!("{}_{}", pub_name_template, db.name)
204        };
205
206        let sub_name = if databases.len() == 1 {
207            // Single database - use template name as-is
208            sub_name_template.to_string()
209        } else {
210            // Multiple databases - append database name to avoid conflicts
211            format!("{}_{}", sub_name_template, db.name)
212        };
213
214        tracing::info!("Publication: '{}'", pub_name);
215        tracing::info!("Subscription: '{}'", sub_name);
216
217        // Connect to the specific database on source and target
218        tracing::info!("Connecting to source database '{}'...", db.name);
219        let source_db_client = connect(&source_db_url).await.context(format!(
220            "Failed to connect to source database '{}'",
221            db.name
222        ))?;
223        tracing::info!("✓ Connected to source");
224
225        tracing::info!("Connecting to target database '{}'...", db.name);
226        let target_db_client = connect(&target_db_url).await.context(format!(
227            "Failed to connect to target database '{}'",
228            db.name
229        ))?;
230        tracing::info!("✓ Connected to target");
231
232        // Create publication on source database
233        tracing::info!("Creating publication on source database...");
234        create_publication(&source_db_client, &db.name, &pub_name, &filter)
235            .await
236            .context(format!(
237                "Failed to create publication on source database '{}'",
238                db.name
239            ))?;
240
241        // Check if subscription already exists
242        tracing::info!("Checking subscription state...");
243        let sub_state = detect_subscription_state(&target_db_client, &sub_name)
244            .await
245            .context(format!(
246                "Failed to detect subscription state for '{}'",
247                sub_name
248            ))?;
249
250        match sub_state {
251            SubscriptionState::Streaming => {
252                if force {
253                    tracing::info!(
254                        "⚠ Subscription '{}' is already streaming, but --force flag is set",
255                        sub_name
256                    );
257                    tracing::info!("Dropping existing subscription...");
258                    drop_subscription(&target_db_client, &sub_name)
259                        .await
260                        .context(format!("Failed to drop subscription '{}'", sub_name))?;
261                    tracing::info!("Creating new subscription...");
262                    create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
263                        .await
264                        .context(format!(
265                            "Failed to create subscription on target database '{}'",
266                            db.name
267                        ))?;
268                    tracing::info!(
269                        "Waiting for initial sync to complete (timeout: {}s)...",
270                        timeout
271                    );
272                    wait_for_sync(&target_db_client, &sub_name, timeout)
273                        .await
274                        .context(format!(
275                            "Failed to wait for initial sync on database '{}'",
276                            db.name
277                        ))?;
278                } else {
279                    tracing::info!(
280                        "✓ Subscription '{}' is already streaming and healthy",
281                        sub_name
282                    );
283                    tracing::info!("  Skipping subscription creation (use --force to recreate)");
284                }
285            }
286            SubscriptionState::Initializing
287            | SubscriptionState::Copying
288            | SubscriptionState::Syncing => {
289                tracing::info!(
290                    "ℹ Subscription '{}' already exists and is syncing (state: {:?})",
291                    sub_name,
292                    sub_state
293                );
294                tracing::info!(
295                    "Waiting for existing sync to complete (timeout: {}s)...",
296                    timeout
297                );
298                wait_for_sync(&target_db_client, &sub_name, timeout)
299                    .await
300                    .context(format!(
301                        "Failed to wait for existing sync on database '{}'",
302                        db.name
303                    ))?;
304            }
305            SubscriptionState::Error(err_state) => {
306                tracing::warn!(
307                    "⚠ Subscription '{}' is in error state: {}",
308                    sub_name,
309                    err_state
310                );
311                if force {
312                    tracing::info!("Dropping failed subscription and recreating...");
313                    drop_subscription(&target_db_client, &sub_name)
314                        .await
315                        .context(format!("Failed to drop subscription '{}'", sub_name))?;
316                    tracing::info!("Creating new subscription...");
317                    create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
318                        .await
319                        .context(format!(
320                            "Failed to create subscription on target database '{}'",
321                            db.name
322                        ))?;
323                    tracing::info!(
324                        "Waiting for initial sync to complete (timeout: {}s)...",
325                        timeout
326                    );
327                    wait_for_sync(&target_db_client, &sub_name, timeout)
328                        .await
329                        .context(format!(
330                            "Failed to wait for initial sync on database '{}'",
331                            db.name
332                        ))?;
333                } else {
334                    anyhow::bail!(
335                        "Subscription '{}' is in error state: {}.\n\
336                         Use --force flag to drop and recreate the subscription.",
337                        sub_name,
338                        err_state
339                    );
340                }
341            }
342            SubscriptionState::NotFound => {
343                tracing::info!("Creating subscription on target database...");
344                create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
345                    .await
346                    .context(format!(
347                        "Failed to create subscription on target database '{}'",
348                        db.name
349                    ))?;
350                tracing::info!(
351                    "Waiting for initial sync to complete (timeout: {}s)...",
352                    timeout
353                );
354                wait_for_sync(&target_db_client, &sub_name, timeout)
355                    .await
356                    .context(format!(
357                        "Failed to wait for initial sync on database '{}'",
358                        db.name
359                    ))?;
360            }
361        }
362
363        tracing::info!("✓ Replication active for database '{}'", db.name);
364    }
365
366    tracing::info!("");
367    tracing::info!("========================================");
368    tracing::info!("✓ Logical replication is now active!");
369    tracing::info!("========================================");
370    tracing::info!("");
371    tracing::info!(
372        "Changes on {} source database(s) will now continuously",
373        databases.len()
374    );
375    tracing::info!("replicate to the target.");
376    tracing::info!("");
377    tracing::info!("Next steps:");
378    tracing::info!("  1. Run 'status' to monitor replication lag");
379    tracing::info!("  2. Run 'verify' to validate data integrity");
380    tracing::info!("  3. When ready, cutover to the target database");
381
382    Ok(())
383}
384
385/// Resolve the effective target URL for sync, honoring saved SerenDB state when using API keys.
386pub async fn resolve_target_for_sync(
387    target: Option<String>,
388    api_key: Option<String>,
389    source_url: &str,
390) -> Result<String> {
391    let mode = resolve_target_mode(target, api_key.clone())?;
392
393    match mode {
394        TargetMode::ConnectionString(url) => Ok(url),
395        TargetMode::SavedState(state) => {
396            println!(
397                "\n\u{1F4C1} Using saved target: {}/{}",
398                state.project_name, state.branch_name
399            );
400            println!("   Databases: {:?}\n", state.databases);
401
402            if !state.source_matches(source_url) {
403                eprintln!("\u{26A0}  Warning: Source database has changed since the last init run");
404                eprintln!("   Saved for: {}", state.source_url_hash);
405                eprintln!("   Current:   {}", source_url);
406                eprintln!();
407            }
408
409            let api_key = api_key
410                .or_else(|| std::env::var("SEREN_API_KEY").ok())
411                .ok_or_else(|| {
412                    anyhow!(
413                        "SEREN_API_KEY required to refresh saved SerenDB credentials. Provide --api-key or set SEREN_API_KEY."
414                    )
415                })?;
416
417            let primary_db = state
418                .databases
419                .first()
420                .cloned()
421                .ok_or_else(|| anyhow!("Saved target has no databases recorded. Re-run init."))?;
422
423            let client = ConsoleClient::new(None, api_key);
424            let conn_str = client
425                .get_connection_string(
426                    &state.project_id,
427                    &state.branch_id,
428                    &primary_db,
429                    false,
430                )
431                .await
432                .context("Failed to fetch connection string for saved SerenDB target")?;
433
434            Ok(conn_str)
435        }
436        TargetMode::ApiKey(_) => anyhow::bail!(
437            "No saved SerenDB target found. Run 'database-replicator init' first or provide --target."
438        ),
439    }
440}
441
442/// Replace the database name in a PostgreSQL connection URL
443///
444/// # Arguments
445///
446/// * `url` - PostgreSQL connection URL
447/// * `new_db_name` - New database name to use
448///
449/// # Returns
450///
451/// URL with the database name replaced
452pub fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
453    // Split into base URL and query parameters
454    let parts: Vec<&str> = url.splitn(2, '?').collect();
455    let base_url = parts[0];
456    let query_params = parts.get(1);
457
458    // Split base URL by '/' to replace the database name
459    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
460
461    if url_parts.len() != 2 {
462        anyhow::bail!("Invalid connection URL format: cannot replace database name");
463    }
464
465    // Rebuild URL with new database name
466    let new_url = if let Some(params) = query_params {
467        format!("{}/{}?{}", url_parts[1], new_db_name, params)
468    } else {
469        format!("{}/{}", url_parts[1], new_db_name)
470    };
471
472    Ok(new_url)
473}
474
#[cfg(test)]
mod tests {
    use super::*;

    /// True when an error message suggests the environment cannot do logical
    /// replication, in which case the integration tests skip instead of failing.
    fn looks_unsupported(message: &str) -> bool {
        ["not supported", "permission"]
            .iter()
            .any(|needle| message.contains(needle))
    }

    /// Connect to `url` and drop the subscription `name`; panics on any failure.
    async fn drop_subscription_at(url: &str, name: &str) {
        let client = connect(url).await.unwrap();
        crate::replication::drop_subscription(&client, name)
            .await
            .unwrap();
    }

    /// Connect to `url` and drop the publication `name`; panics on any failure.
    async fn drop_publication_at(url: &str, name: &str) {
        let client = connect(url).await.unwrap();
        crate::replication::drop_publication(&client, name)
            .await
            .unwrap();
    }

    #[tokio::test]
    #[ignore]
    async fn test_sync_command() {
        // Needs two live databases supplied through TEST_SOURCE_URL / TEST_TARGET_URL.
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        let (pub_name, sub_name) = ("test_sync_pub", "test_sync_sub");
        let timeout = 60; // 1 minute timeout for test

        let outcome = sync(
            &source_url,
            &target_url,
            None,
            Some(pub_name),
            Some(sub_name),
            Some(timeout),
            false,
        )
        .await;

        if let Err(err) = &outcome {
            println!("Error in sync command: {:?}", err);
            if looks_unsupported(&err.to_string()) {
                println!("Skipping test - database might not support logical replication");
                return;
            }
        } else {
            println!("✓ Sync command completed successfully");
        }

        assert!(outcome.is_ok(), "Sync command failed: {:?}", outcome);

        // Tear down what the test created.
        drop_subscription_at(&target_url, sub_name).await;
        drop_publication_at(&source_url, pub_name).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_sync_with_defaults() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        let outcome = sync(&source_url, &target_url, None, None, None, Some(60), false).await;

        if let Err(err) = &outcome {
            println!("Error in sync with defaults: {:?}", err);
            if looks_unsupported(&err.to_string()) {
                println!("Skipping test - database might not support logical replication");
                return;
            }
        } else {
            println!("✓ Sync with defaults completed successfully");
        }

        assert!(outcome.is_ok(), "Sync with defaults failed: {:?}", outcome);

        // Tear down using the default object names.
        drop_subscription_at(&target_url, "seren_migration_sub").await;
        drop_publication_at(&source_url, "seren_migration_pub").await;
    }

    #[test]
    fn test_replace_database_in_url() {
        let cases = [
            // Basic URL
            (
                "postgresql://user:pass@localhost:5432/olddb",
                "postgresql://user:pass@localhost:5432/newdb",
            ),
            // URL with query parameters
            (
                "postgresql://user:pass@localhost:5432/olddb?sslmode=require",
                "postgresql://user:pass@localhost:5432/newdb?sslmode=require",
            ),
            // URL without port
            (
                "postgresql://user:pass@localhost/olddb",
                "postgresql://user:pass@localhost/newdb",
            ),
        ];

        for &(input, expected) in &cases {
            let rewritten = replace_database_in_url(input, "newdb").unwrap();
            assert_eq!(rewritten, expected);
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_sync_with_database_filter() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        println!("Testing sync command with database filter...");
        println!("⚠ WARNING: This will set up replication for filtered databases!");

        // Restrict replication to the `postgres` database only.
        let only_postgres = crate::filters::ReplicationFilter::new(
            Some(vec!["postgres".to_string()]),
            None,
            None,
            None,
        )
        .expect("Failed to create filter");

        let outcome = sync(
            &source_url,
            &target_url,
            Some(only_postgres),
            None,
            None,
            Some(60),
            false,
        )
        .await;

        if let Err(err) = &outcome {
            println!("Sync with database filter failed: {:?}", err);
            if looks_unsupported(&err.to_string()) {
                println!("Skipping test - database might not support logical replication");
                return;
            }
        } else {
            println!("✓ Sync with database filter completed successfully");
        }

        assert!(outcome.is_ok(), "Sync with database filter failed");

        // A single matching database means the default names carry no suffix.
        drop_subscription_at(
            &replace_database_in_url(&target_url, "postgres").unwrap(),
            "seren_migration_sub",
        )
        .await;
        drop_publication_at(
            &replace_database_in_url(&source_url, "postgres").unwrap(),
            "seren_migration_pub",
        )
        .await;
    }
}