// database_replicator/xmin/daemon.rs

use anyhow::{Context, Result};
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::interval;

use super::reader::{detect_wraparound, WraparoundCheck, XminReader};
use super::reconciler::Reconciler;
use super::state::SyncState;
use super::writer::{get_primary_key_columns, get_table_columns, row_to_values, ChangeWriter};

/// Configuration for the xmin-based sync daemon.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// How often to run an incremental sync cycle.
    pub sync_interval: Duration,
    /// How often to run reconciliation (delete propagation), or `None` to
    /// disable it entirely.
    pub reconcile_interval: Option<Duration>,
    /// Where the sync state is persisted between cycles.
    pub state_path: PathBuf,
    /// Maximum number of rows read and applied per batch.
    pub batch_size: usize,
    /// Tables to sync; an empty list means every table in the schema.
    pub tables: Vec<String>,
    /// Schema the tables live in.
    pub schema: String,
}

impl Default for DaemonConfig {
    fn default() -> Self {
        Self {
            sync_interval: Duration::from_secs(3600),
            reconcile_interval: Some(Duration::from_secs(86400)),
            state_path: SyncState::default_path(),
            batch_size: 10_000,
            tables: Vec::new(),
            schema: "public".to_string(),
        }
    }
}
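
// Illustrative sketch (not part of the original source): individual fields
// can be overridden with struct-update syntax; the table name here is
// hypothetical.
//
//     let config = DaemonConfig {
//         sync_interval: Duration::from_secs(300),
//         tables: vec!["users".to_string()],
//         ..DaemonConfig::default()
//     };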

/// Aggregate outcome of a single sync or reconciliation cycle.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    pub tables_synced: usize,
    pub rows_synced: u64,
    pub rows_deleted: u64,
    pub errors: Vec<String>,
    pub duration_ms: u64,
}

impl SyncStats {
    /// A cycle counts as successful when no per-table errors were recorded.
    pub fn is_success(&self) -> bool {
        self.errors.is_empty()
    }
}

/// Daemon that periodically replicates changed rows from a source Postgres
/// database to a target, using xmin-based change detection and optional
/// reconciliation to propagate deletes.
pub struct SyncDaemon {
    config: DaemonConfig,
    source_url: String,
    target_url: String,
}

impl SyncDaemon {
    /// Creates a daemon from source/target connection URLs and a config.
    pub fn new(source_url: String, target_url: String, config: DaemonConfig) -> Self {
        Self {
            config,
            source_url,
            target_url,
        }
    }

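    /// Runs a single incremental sync cycle over the configured tables and
    /// persists the updated sync state.
    ///
    /// One-shot usage sketch (illustrative, not from the original source;
    /// `daemon` is assumed to be a constructed `SyncDaemon`):
    ///
    /// ```ignore
    /// let stats = daemon.run_sync_cycle().await?;
    /// if !stats.is_success() {
    ///     eprintln!("sync finished with {} errors", stats.errors.len());
    /// }
    /// ```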
    pub async fn run_sync_cycle(&self) -> Result<SyncStats> {
        let start = std::time::Instant::now();
        let mut stats = SyncStats::default();

        let mut state = self.load_or_create_state().await?;

        let source_client = crate::postgres::connect_with_retry(&self.source_url)
            .await
            .context("Failed to connect to source database")?;
        let target_client = crate::postgres::connect_with_retry(&self.target_url)
            .await
            .context("Failed to connect to target database")?;

        let reader = XminReader::new(&source_client);
        let writer = ChangeWriter::new(&target_client);

        // An empty table list means "sync every table in the schema".
        let tables = if self.config.tables.is_empty() {
            reader.list_tables(&self.config.schema).await?
        } else {
            self.config.tables.clone()
        };

        for table in &tables {
            match self
                .sync_table(&reader, &writer, &mut state, &self.config.schema, table)
                .await
            {
                Ok(rows) => {
                    stats.tables_synced += 1;
                    stats.rows_synced += rows;
                }
                Err(e) => {
                    tracing::error!("Failed to sync {}.{}: {:?}", self.config.schema, table, e);
                    let error_msg =
                        format!("Failed to sync {}.{}: {}", self.config.schema, table, e);
                    stats.errors.push(error_msg);
                }
            }
        }

        state.save(&self.config.state_path).await?;

        stats.duration_ms = start.elapsed().as_millis() as u64;
        Ok(stats)
    }

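    /// Runs a single reconciliation pass, removing rows from the target that
    /// no longer exist in the source. This is needed because deletes are
    /// invisible to xmin scans: a deleted row simply stops appearing.
    ///
    /// Usage sketch (illustrative, not from the original source):
    ///
    /// ```ignore
    /// let stats = daemon.run_reconciliation().await?;
    /// println!(
    ///     "reconciled {} tables, deleted {} rows",
    ///     stats.tables_synced, stats.rows_deleted
    /// );
    /// ```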
    pub async fn run_reconciliation(&self) -> Result<SyncStats> {
        let start = std::time::Instant::now();
        let mut stats = SyncStats::default();

        let source_client = crate::postgres::connect_with_retry(&self.source_url)
            .await
            .context("Failed to connect to source database")?;
        let target_client = crate::postgres::connect_with_retry(&self.target_url)
            .await
            .context("Failed to connect to target database")?;

        let reconciler = Reconciler::new(&source_client, &target_client);
        let reader = XminReader::new(&source_client);

        let tables = if self.config.tables.is_empty() {
            reader.list_tables(&self.config.schema).await?
        } else {
            self.config.tables.clone()
        };

        for table in &tables {
            match reconciler
                .table_exists_in_target(&self.config.schema, table)
                .await
            {
                Ok(true) => {}
                Ok(false) => {
                    tracing::warn!(
                        "Skipping reconciliation for {}.{}: table does not exist in target",
                        self.config.schema,
                        table
                    );
                    continue;
                }
                Err(e) => {
                    tracing::warn!(
                        "Skipping reconciliation for {}.{}: failed to check table existence: {}",
                        self.config.schema,
                        table,
                        e
                    );
                    continue;
                }
            }

            let pk_columns = reader.get_primary_key(&self.config.schema, table).await?;
            if pk_columns.is_empty() {
                tracing::warn!(
                    "Skipping reconciliation for {}.{}: no primary key",
                    self.config.schema,
                    table
                );
                continue;
            }

            match reconciler
                .reconcile_table_batched(
                    &self.config.schema,
                    table,
                    &pk_columns,
                    self.config.batch_size,
                )
                .await
            {
                Ok(deleted) => {
                    stats.tables_synced += 1;
                    stats.rows_deleted += deleted;
                }
                Err(e) => {
                    let error_msg = format!(
                        "Failed to reconcile {}.{}: {}",
                        self.config.schema, table, e
                    );
                    tracing::error!("{}", error_msg);
                    stats.errors.push(error_msg);
                }
            }
        }

        stats.duration_ms = start.elapsed().as_millis() as u64;
        Ok(stats)
    }

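    /// Runs the daemon loop until a shutdown signal arrives, triggering sync
    /// and reconciliation cycles on their configured intervals.
    ///
    /// Wiring sketch (illustrative, not from the original source; assumes a
    /// `tokio` runtime with Ctrl-C as the shutdown trigger, and `source_url`
    /// / `target_url` already in scope):
    ///
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
    /// tokio::spawn(async move {
    ///     tokio::signal::ctrl_c().await.ok();
    ///     let _ = shutdown_tx.send(());
    /// });
    /// let daemon = SyncDaemon::new(source_url, target_url, DaemonConfig::default());
    /// daemon.run(shutdown_rx).await?;
    /// ```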
    pub async fn run(&self, mut shutdown: tokio::sync::broadcast::Receiver<()>) -> Result<()> {
        // Note: a tokio interval's first tick completes immediately, so the
        // first sync cycle starts right away.
        let mut sync_interval = interval(self.config.sync_interval);
        let mut reconcile_interval = self.config.reconcile_interval.map(interval);

        let mut cycles = 0u64;
        let mut reconcile_cycles = 0u64;

        tracing::info!(
            "Starting SyncDaemon with sync_interval={:?}, reconcile_interval={:?}",
            self.config.sync_interval,
            self.config.reconcile_interval
        );

        loop {
            tokio::select! {
                biased;

                _ = shutdown.recv() => {
                    tracing::info!("Shutdown signal received, stopping SyncDaemon");
                    break;
                }
                _ = sync_interval.tick() => {
                    cycles += 1;
                    tracing::info!("Starting sync cycle {}", cycles);

                    tokio::select! {
                        biased;
                        _ = shutdown.recv() => {
                            tracing::info!("Shutdown signal received during sync cycle, aborting");
                            break;
                        }
                        result = self.run_sync_cycle() => {
                            match result {
                                Ok(stats) => {
                                    tracing::info!(
                                        "Sync cycle {} completed: {} tables, {} rows in {}ms",
                                        cycles,
                                        stats.tables_synced,
                                        stats.rows_synced,
                                        stats.duration_ms
                                    );
                                    if !stats.errors.is_empty() {
                                        tracing::warn!("Sync cycle had {} errors", stats.errors.len());
                                    }
                                }
                                Err(e) => {
                                    tracing::error!("Sync cycle {} failed: {}", cycles, e);
                                }
                            }
                        }
                    }
                }
                // When reconciliation is disabled, park this arm forever.
                _ = async {
                    if let Some(ref mut interval) = reconcile_interval {
                        interval.tick().await
                    } else {
                        std::future::pending::<tokio::time::Instant>().await
                    }
                } => {
                    reconcile_cycles += 1;
                    tracing::info!("Starting reconciliation cycle {}", reconcile_cycles);

                    tokio::select! {
                        biased;
                        _ = shutdown.recv() => {
                            tracing::info!("Shutdown signal received during reconciliation, aborting");
                            break;
                        }
                        result = self.run_reconciliation() => {
                            match result {
                                Ok(stats) => {
                                    tracing::info!(
                                        "Reconciliation cycle {} completed: {} tables, {} rows deleted in {}ms",
                                        reconcile_cycles,
                                        stats.tables_synced,
                                        stats.rows_deleted,
                                        stats.duration_ms
                                    );
                                }
                                Err(e) => {
                                    tracing::error!("Reconciliation cycle {} failed: {}", reconcile_cycles, e);
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

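    /// Incrementally syncs one table by scanning for row versions whose
    /// `xmin` is newer than the value recorded in the sync state, falling
    /// back to a full scan when wraparound is detected.
    ///
    /// Shape of the scan this technique relies on (an illustration only; the
    /// actual SQL lives in `XminReader`, and the angle-bracket placeholders
    /// are not real identifiers):
    ///
    /// ```sql
    /// SELECT <columns>, xmin::text::bigint AS row_xmin
    /// FROM <schema>.<table>
    /// WHERE xmin::text::bigint > $1
    /// ORDER BY xmin::text::bigint
    /// LIMIT <batch_size>;
    /// ```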
    async fn sync_table(
        &self,
        reader: &XminReader<'_>,
        writer: &ChangeWriter<'_>,
        state: &mut SyncState,
        schema: &str,
        table: &str,
    ) -> Result<u64> {
        let table_state = state.get_or_create_table(schema, table);
        let stored_xmin = table_state.last_xmin;

        let columns = get_table_columns(reader.client(), schema, table).await?;
        let pk_columns = get_primary_key_columns(reader.client(), schema, table).await?;

        if pk_columns.is_empty() {
            anyhow::bail!("Table {}.{} has no primary key", schema, table);
        }

        let column_names: Vec<String> = columns.iter().map(|(name, _)| name.clone()).collect();

        let current_xmin = reader.get_current_xmin().await?;
        let (since_xmin, is_full_sync) = if detect_wraparound(stored_xmin, current_xmin)
            == WraparoundCheck::WraparoundDetected
        {
            tracing::warn!(
                "xmin wraparound detected for {}.{} - performing full table sync",
                schema,
                table
            );
            (0, true)
        } else {
            (stored_xmin, false)
        };

        let batch_size = self.config.batch_size;
        let mut batch_reader = reader
            .read_changes_batched(schema, table, &column_names, since_xmin, batch_size)
            .await?;

        let mut total_rows = 0u64;
        let mut max_xmin = since_xmin;
        let mut batch_count = 0u64;

        while let Some((rows, batch_max_xmin)) = reader.fetch_batch(&mut batch_reader).await? {
            if rows.is_empty() {
                break;
            }

            batch_count += 1;
            let batch_len = rows.len();

            if batch_count == 1 {
                if is_full_sync {
                    tracing::info!(
                        "Starting full table sync for {}.{} (batch size: {})",
                        schema,
                        table,
                        batch_size
                    );
                } else {
                    tracing::info!(
                        "Found changes in {}.{} (xmin {} -> {}), processing in batches",
                        schema,
                        table,
                        since_xmin,
                        batch_max_xmin
                    );
                }
            }

            let values: Vec<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>> = rows
                .iter()
                .map(|row| row_to_values(row, &columns))
                .collect();

            let affected = writer
                .apply_batch(schema, table, &pk_columns, &column_names, values)
                .await?;

            total_rows += affected;
            max_xmin = batch_max_xmin;

            // Record progress in the in-memory state after every batch; it is
            // saved to disk at the end of the cycle.
            state.update_table(schema, table, max_xmin, affected);

            // Log on every 10th batch, and whenever total_rows crosses a
            // 100,000-row boundary.
            if batch_count.is_multiple_of(10) || total_rows % 100_000 < batch_len as u64 {
                tracing::info!(
                    "Progress: {}.{} - {} rows synced ({} batches), current xmin: {}",
                    schema,
                    table,
                    total_rows,
                    batch_count,
                    max_xmin
                );
            }
        }

        if total_rows == 0 {
            tracing::debug!(
                "No changes in {}.{} since xmin {}",
                schema,
                table,
                since_xmin
            );
        } else {
            tracing::info!(
                "Completed sync for {}.{}: {} rows in {} batches (xmin {} -> {})",
                schema,
                table,
                total_rows,
                batch_count,
                since_xmin,
                max_xmin
            );
        }

        Ok(total_rows)
    }

    /// Loads the persisted sync state from disk, falling back to a fresh
    /// state when the file is missing or unreadable.
    async fn load_or_create_state(&self) -> Result<SyncState> {
        if self.config.state_path.exists() {
            match SyncState::load(&self.config.state_path).await {
                Ok(state) => {
                    tracing::info!(
                        "Loaded existing sync state from {:?}",
                        self.config.state_path
                    );
                    return Ok(state);
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to load sync state from {:?}: {}. Creating new state.",
                        self.config.state_path,
                        e
                    );
                }
            }
        }

        tracing::info!("Creating new sync state");
        Ok(SyncState::new(&self.source_url, &self.target_url))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_daemon_config_default() {
        let config = DaemonConfig::default();
        assert_eq!(config.sync_interval, Duration::from_secs(3600));
        assert_eq!(config.reconcile_interval, Some(Duration::from_secs(86400)));
        assert_eq!(config.batch_size, 10_000);
        assert_eq!(config.schema, "public");
    }

    #[test]
    fn test_sync_stats_success() {
        let stats = SyncStats {
            tables_synced: 5,
            rows_synced: 100,
            rows_deleted: 0,
            errors: vec![],
            duration_ms: 500,
        };
        assert!(stats.is_success());
    }

    #[test]
    fn test_sync_stats_with_errors() {
        let stats = SyncStats {
            tables_synced: 4,
            rows_synced: 80,
            rows_deleted: 0,
            errors: vec!["Failed to sync table X".to_string()],
            duration_ms: 500,
        };
        assert!(!stats.is_success());
    }
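
    // Added sketch (not in the original source): a default SyncStats has no
    // errors and so counts as success, and the default config's empty
    // `tables` list means "sync every table in the schema".
    #[test]
    fn test_defaults() {
        assert!(SyncStats::default().is_success());
        assert!(DaemonConfig::default().tables.is_empty());
    }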
}