database_replicator/commands/
sync.rs1use crate::migration;
5use crate::postgres::connect;
6use crate::replication::{
7 create_publication, create_subscription, detect_subscription_state, drop_subscription,
8 wait_for_sync, SubscriptionState,
9};
10use crate::serendb::{resolve_target_mode, ConsoleClient, TargetMode};
11use anyhow::{anyhow, Context, Result};
12
13pub async fn sync(
90 source_url: &str,
91 target_url: &str,
92 filter: Option<crate::filters::ReplicationFilter>,
93 publication_name: Option<&str>,
94 subscription_name: Option<&str>,
95 sync_timeout_secs: Option<u64>,
96 force: bool,
97) -> Result<()> {
98 let pub_name_template = publication_name.unwrap_or("seren_migration_pub");
99 let sub_name_template = subscription_name.unwrap_or("seren_migration_sub");
100 let timeout = sync_timeout_secs.unwrap_or(300); let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
102
103 tracing::info!("Starting logical replication setup...");
104
105 crate::utils::validate_source_target_different(source_url, target_url)
107 .context("Source and target validation failed")?;
108 tracing::info!("✓ Verified source and target are different databases");
109
110 tracing::info!("Checking target wal_level for logical replication...");
112 let target_client = connect(target_url)
113 .await
114 .context("Failed to connect to target database")?;
115 let target_wal_level = crate::postgres::check_wal_level(&target_client).await?;
116
117 if target_wal_level != "logical" {
118 anyhow::bail!(
119 "Target database wal_level is set to '{}', but 'logical' is required for logical replication\n\
120 \n\
121 To fix this:\n\
122 \n\
123 Option 1: Set wal_level in postgresql.conf\n\
124 1. Edit postgresql.conf: wal_level = logical\n\
125 2. Restart PostgreSQL server\n\
126 3. Re-run this command\n\
127 \n\
128 Option 2: Skip continuous sync (snapshot only)\n\
129 Use the init command with --no-sync flag to perform initial snapshot without setting up logical replication\n\
130 \n\
131 Note: Some managed PostgreSQL services may require configuring wal_level through their control panel.",
132 target_wal_level
133 );
134 }
135 tracing::info!("✓ Target wal_level is set to 'logical' (logical replication supported)");
136
137 tracing::info!("Connecting to source database...");
139 let source_client = connect(source_url)
140 .await
141 .context("Failed to connect to source database")?;
142 tracing::info!("✓ Connected to source");
143
144 tracing::info!("Discovering databases on source...");
146 let all_databases = migration::list_databases(&source_client)
147 .await
148 .context("Failed to list databases on source")?;
149
150 let databases: Vec<_> = all_databases
152 .into_iter()
153 .filter(|db| filter.should_replicate_database(&db.name))
154 .collect();
155
156 if databases.is_empty() {
157 if filter.is_empty() {
158 tracing::warn!("⚠ No user databases found on source");
159 tracing::warn!(" This is unusual - the source database appears empty");
160 tracing::warn!(" Only template databases exist");
161 } else {
162 tracing::warn!("⚠ No databases matched the filter criteria");
163 tracing::warn!(" Check your --include-databases or --exclude-databases settings");
164 }
165 tracing::info!("✅ Logical replication setup complete (no databases to replicate)");
166 return Ok(());
167 }
168
169 tracing::info!(
170 "Found {} database(s) to replicate: {}",
171 databases.len(),
172 databases
173 .iter()
174 .map(|db| db.name.as_str())
175 .collect::<Vec<_>>()
176 .join(", ")
177 );
178
179 for db in &databases {
181 tracing::info!("");
182 tracing::info!(
183 "========================================\nDatabase: '{}'\n========================================",
184 db.name
185 );
186
187 let source_db_url = replace_database_in_url(source_url, &db.name).context(format!(
189 "Failed to build source URL for database '{}'",
190 db.name
191 ))?;
192 let target_db_url = replace_database_in_url(target_url, &db.name).context(format!(
193 "Failed to build target URL for database '{}'",
194 db.name
195 ))?;
196
197 let pub_name = if databases.len() == 1 {
199 pub_name_template.to_string()
201 } else {
202 format!("{}_{}", pub_name_template, db.name)
204 };
205
206 let sub_name = if databases.len() == 1 {
207 sub_name_template.to_string()
209 } else {
210 format!("{}_{}", sub_name_template, db.name)
212 };
213
214 tracing::info!("Publication: '{}'", pub_name);
215 tracing::info!("Subscription: '{}'", sub_name);
216
217 tracing::info!("Connecting to source database '{}'...", db.name);
219 let source_db_client = connect(&source_db_url).await.context(format!(
220 "Failed to connect to source database '{}'",
221 db.name
222 ))?;
223 tracing::info!("✓ Connected to source");
224
225 tracing::info!("Connecting to target database '{}'...", db.name);
226 let target_db_client = connect(&target_db_url).await.context(format!(
227 "Failed to connect to target database '{}'",
228 db.name
229 ))?;
230 tracing::info!("✓ Connected to target");
231
232 tracing::info!("Creating publication on source database...");
234 create_publication(&source_db_client, &db.name, &pub_name, &filter)
235 .await
236 .context(format!(
237 "Failed to create publication on source database '{}'",
238 db.name
239 ))?;
240
241 tracing::info!("Checking subscription state...");
243 let sub_state = detect_subscription_state(&target_db_client, &sub_name)
244 .await
245 .context(format!(
246 "Failed to detect subscription state for '{}'",
247 sub_name
248 ))?;
249
250 match sub_state {
251 SubscriptionState::Streaming => {
252 if force {
253 tracing::info!(
254 "⚠ Subscription '{}' is already streaming, but --force flag is set",
255 sub_name
256 );
257 tracing::info!("Dropping existing subscription...");
258 drop_subscription(&target_db_client, &sub_name)
259 .await
260 .context(format!("Failed to drop subscription '{}'", sub_name))?;
261 tracing::info!("Creating new subscription...");
262 create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
263 .await
264 .context(format!(
265 "Failed to create subscription on target database '{}'",
266 db.name
267 ))?;
268 tracing::info!(
269 "Waiting for initial sync to complete (timeout: {}s)...",
270 timeout
271 );
272 wait_for_sync(&target_db_client, &sub_name, timeout)
273 .await
274 .context(format!(
275 "Failed to wait for initial sync on database '{}'",
276 db.name
277 ))?;
278 } else {
279 tracing::info!(
280 "✓ Subscription '{}' is already streaming and healthy",
281 sub_name
282 );
283 tracing::info!(" Skipping subscription creation (use --force to recreate)");
284 }
285 }
286 SubscriptionState::Initializing
287 | SubscriptionState::Copying
288 | SubscriptionState::Syncing => {
289 tracing::info!(
290 "ℹ Subscription '{}' already exists and is syncing (state: {:?})",
291 sub_name,
292 sub_state
293 );
294 tracing::info!(
295 "Waiting for existing sync to complete (timeout: {}s)...",
296 timeout
297 );
298 wait_for_sync(&target_db_client, &sub_name, timeout)
299 .await
300 .context(format!(
301 "Failed to wait for existing sync on database '{}'",
302 db.name
303 ))?;
304 }
305 SubscriptionState::Error(err_state) => {
306 tracing::warn!(
307 "⚠ Subscription '{}' is in error state: {}",
308 sub_name,
309 err_state
310 );
311 if force {
312 tracing::info!("Dropping failed subscription and recreating...");
313 drop_subscription(&target_db_client, &sub_name)
314 .await
315 .context(format!("Failed to drop subscription '{}'", sub_name))?;
316 tracing::info!("Creating new subscription...");
317 create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
318 .await
319 .context(format!(
320 "Failed to create subscription on target database '{}'",
321 db.name
322 ))?;
323 tracing::info!(
324 "Waiting for initial sync to complete (timeout: {}s)...",
325 timeout
326 );
327 wait_for_sync(&target_db_client, &sub_name, timeout)
328 .await
329 .context(format!(
330 "Failed to wait for initial sync on database '{}'",
331 db.name
332 ))?;
333 } else {
334 anyhow::bail!(
335 "Subscription '{}' is in error state: {}.\n\
336 Use --force flag to drop and recreate the subscription.",
337 sub_name,
338 err_state
339 );
340 }
341 }
342 SubscriptionState::NotFound => {
343 tracing::info!("Creating subscription on target database...");
344 create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
345 .await
346 .context(format!(
347 "Failed to create subscription on target database '{}'",
348 db.name
349 ))?;
350 tracing::info!(
351 "Waiting for initial sync to complete (timeout: {}s)...",
352 timeout
353 );
354 wait_for_sync(&target_db_client, &sub_name, timeout)
355 .await
356 .context(format!(
357 "Failed to wait for initial sync on database '{}'",
358 db.name
359 ))?;
360 }
361 }
362
363 tracing::info!("✓ Replication active for database '{}'", db.name);
364 }
365
366 tracing::info!("");
367 tracing::info!("========================================");
368 tracing::info!("✓ Logical replication is now active!");
369 tracing::info!("========================================");
370 tracing::info!("");
371 tracing::info!(
372 "Changes on {} source database(s) will now continuously",
373 databases.len()
374 );
375 tracing::info!("replicate to the target.");
376 tracing::info!("");
377 tracing::info!("Next steps:");
378 tracing::info!(" 1. Run 'status' to monitor replication lag");
379 tracing::info!(" 2. Run 'verify' to validate data integrity");
380 tracing::info!(" 3. When ready, cutover to the target database");
381
382 Ok(())
383}
384
385pub async fn resolve_target_for_sync(
387 target: Option<String>,
388 api_key: Option<String>,
389 source_url: &str,
390) -> Result<String> {
391 let mode = resolve_target_mode(target, api_key.clone())?;
392
393 match mode {
394 TargetMode::ConnectionString(url) => Ok(url),
395 TargetMode::SavedState(state) => {
396 println!(
397 "\n\u{1F4C1} Using saved target: {}/{}",
398 state.project_name, state.branch_name
399 );
400 println!(" Databases: {:?}\n", state.databases);
401
402 if !state.source_matches(source_url) {
403 eprintln!("\u{26A0} Warning: Source database has changed since the last init run");
404 eprintln!(" Saved for: {}", state.source_url_hash);
405 eprintln!(" Current: {}", source_url);
406 eprintln!();
407 }
408
409 let api_key = api_key
410 .or_else(|| std::env::var("SEREN_API_KEY").ok())
411 .ok_or_else(|| {
412 anyhow!(
413 "SEREN_API_KEY required to refresh saved SerenDB credentials. Provide --api-key or set SEREN_API_KEY."
414 )
415 })?;
416
417 let primary_db = state
418 .databases
419 .first()
420 .cloned()
421 .ok_or_else(|| anyhow!("Saved target has no databases recorded. Re-run init."))?;
422
423 let client = ConsoleClient::new(None, api_key);
424 let conn_str = client
425 .get_connection_string(
426 &state.project_id,
427 &state.branch_id,
428 &primary_db,
429 false,
430 )
431 .await
432 .context("Failed to fetch connection string for saved SerenDB target")?;
433
434 Ok(conn_str)
435 }
436 TargetMode::ApiKey(_) => anyhow::bail!(
437 "No saved SerenDB target found. Run 'database-replicator init' first or provide --target."
438 ),
439 }
440}
441
442pub fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
453 let parts: Vec<&str> = url.splitn(2, '?').collect();
455 let base_url = parts[0];
456 let query_params = parts.get(1);
457
458 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
460
461 if url_parts.len() != 2 {
462 anyhow::bail!("Invalid connection URL format: cannot replace database name");
463 }
464
465 let new_url = if let Some(params) = query_params {
467 format!("{}/{}?{}", url_parts[1], new_db_name, params)
468 } else {
469 format!("{}/{}", url_parts[1], new_db_name)
470 };
471
472 Ok(new_url)
473}
474
475#[cfg(test)]
476mod tests {
477 use super::*;
478
479 #[tokio::test]
480 #[ignore]
481 async fn test_sync_command() {
482 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
484 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
485
486 let pub_name = "test_sync_pub";
487 let sub_name = "test_sync_sub";
488 let timeout = 60; let result = sync(
491 &source_url,
492 &target_url,
493 None,
494 Some(pub_name),
495 Some(sub_name),
496 Some(timeout),
497 false,
498 )
499 .await;
500
501 match &result {
502 Ok(_) => println!("✓ Sync command completed successfully"),
503 Err(e) => {
504 println!("Error in sync command: {:?}", e);
505 if e.to_string().contains("not supported") || e.to_string().contains("permission") {
507 println!("Skipping test - database might not support logical replication");
508 return;
509 }
510 }
511 }
512
513 assert!(result.is_ok(), "Sync command failed: {:?}", result);
514
515 let target_client = connect(&target_url).await.unwrap();
517 crate::replication::drop_subscription(&target_client, sub_name)
518 .await
519 .unwrap();
520
521 let source_client = connect(&source_url).await.unwrap();
522 crate::replication::drop_publication(&source_client, pub_name)
523 .await
524 .unwrap();
525 }
526
527 #[tokio::test]
528 #[ignore]
529 async fn test_sync_with_defaults() {
530 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
531 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
532
533 let result = sync(&source_url, &target_url, None, None, None, Some(60), false).await;
534
535 match &result {
536 Ok(_) => println!("✓ Sync with defaults completed successfully"),
537 Err(e) => {
538 println!("Error in sync with defaults: {:?}", e);
539 if e.to_string().contains("not supported") || e.to_string().contains("permission") {
540 println!("Skipping test - database might not support logical replication");
541 return;
542 }
543 }
544 }
545
546 assert!(result.is_ok(), "Sync with defaults failed: {:?}", result);
547
548 let target_client = connect(&target_url).await.unwrap();
550 crate::replication::drop_subscription(&target_client, "seren_migration_sub")
551 .await
552 .unwrap();
553
554 let source_client = connect(&source_url).await.unwrap();
555 crate::replication::drop_publication(&source_client, "seren_migration_pub")
556 .await
557 .unwrap();
558 }
559
560 #[test]
561 fn test_replace_database_in_url() {
562 let url = "postgresql://user:pass@localhost:5432/olddb";
564 let new_url = replace_database_in_url(url, "newdb").unwrap();
565 assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");
566
567 let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
569 let new_url = replace_database_in_url(url, "newdb").unwrap();
570 assert_eq!(
571 new_url,
572 "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
573 );
574
575 let url = "postgresql://user:pass@localhost/olddb";
577 let new_url = replace_database_in_url(url, "newdb").unwrap();
578 assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");
579 }
580
581 #[tokio::test]
582 #[ignore]
583 async fn test_sync_with_database_filter() {
584 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
585 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
586
587 println!("Testing sync command with database filter...");
588 println!("⚠ WARNING: This will set up replication for filtered databases!");
589
590 let filter = crate::filters::ReplicationFilter::new(
592 Some(vec!["postgres".to_string()]), None,
594 None,
595 None,
596 )
597 .expect("Failed to create filter");
598
599 let result = sync(
600 &source_url,
601 &target_url,
602 Some(filter),
603 None,
604 None,
605 Some(60),
606 false,
607 )
608 .await;
609
610 match &result {
611 Ok(_) => {
612 println!("✓ Sync with database filter completed successfully");
613 }
614 Err(e) => {
615 println!("Sync with database filter failed: {:?}", e);
616 if e.to_string().contains("not supported") || e.to_string().contains("permission") {
617 println!("Skipping test - database might not support logical replication");
618 return;
619 }
620 }
621 }
622
623 assert!(result.is_ok(), "Sync with database filter failed");
624
625 let db_url = replace_database_in_url(&target_url, "postgres").unwrap();
627 let target_client = connect(&db_url).await.unwrap();
628 crate::replication::drop_subscription(&target_client, "seren_migration_sub")
629 .await
630 .unwrap();
631
632 let source_url_db = replace_database_in_url(&source_url, "postgres").unwrap();
633 let source_client = connect(&source_url_db).await.unwrap();
634 crate::replication::drop_publication(&source_client, "seren_migration_pub")
635 .await
636 .unwrap();
637 }
638}