// database_replicator/xmin/daemon.rs

1// ABOUTME: SyncDaemon for xmin-based sync - orchestrates continuous replication
2// ABOUTME: Runs sync cycles at configurable intervals with reconciliation
3
4use anyhow::{Context, Result};
5use std::path::PathBuf;
6use std::time::Duration;
7use tokio::time::interval;
8
9use super::reader::{detect_wraparound, WraparoundCheck, XminReader};
10use super::reconciler::Reconciler;
11use super::state::SyncState;
12use super::writer::{get_primary_key_columns, get_table_columns, row_to_values, ChangeWriter};
13
/// Configuration for the SyncDaemon.
///
/// Construct via `DaemonConfig::default()` and override individual fields;
/// see the `Default` impl for the baseline values.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Interval between sync cycles
    pub sync_interval: Duration,
    /// Interval between reconciliation cycles (delete detection)
    /// Set to None to disable reconciliation
    pub reconcile_interval: Option<Duration>,
    /// Path to store sync state (persisted after every sync cycle)
    pub state_path: PathBuf,
    /// Maximum rows to process per batch (bounds memory during large syncs)
    pub batch_size: usize,
    /// Tables to sync (empty = all tables discovered in `schema`)
    pub tables: Vec<String>,
    /// Schema to sync from
    pub schema: String,
}
31
32impl Default for DaemonConfig {
33    fn default() -> Self {
34        Self {
35            sync_interval: Duration::from_secs(3600), // 1 hour
36            reconcile_interval: Some(Duration::from_secs(86400)), // 1 day
37            state_path: SyncState::default_path(),
38            batch_size: 10_000, // 10K rows per batch for good throughput while bounding memory
39            tables: Vec::new(),
40            schema: "public".to_string(),
41        }
42    }
43}
44
/// Statistics from a sync cycle.
///
/// Produced by both sync and reconciliation cycles; `rows_deleted` is only
/// populated by reconciliation.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    /// Number of tables processed without error in this cycle
    pub tables_synced: usize,
    /// Total rows applied to the target in this cycle
    pub rows_synced: u64,
    /// Total orphaned rows deleted by reconciliation
    pub rows_deleted: u64,
    /// Human-readable messages for per-table failures (cycle continues past them)
    pub errors: Vec<String>,
    /// Wall-clock duration of the cycle in milliseconds
    pub duration_ms: u64,
}
54
55impl SyncStats {
56    /// Check if the sync cycle completed without errors.
57    pub fn is_success(&self) -> bool {
58        self.errors.is_empty()
59    }
60}
61
/// SyncDaemon orchestrates continuous xmin-based replication.
///
/// It runs periodic sync cycles that:
/// 1. Read changed rows from source using xmin
/// 2. Apply changes to target using upsert
/// 3. Periodically run reconciliation to detect deletes
/// 4. Persist sync state for resume capability
pub struct SyncDaemon {
    // Runtime configuration: intervals, batch size, table selection, state path.
    config: DaemonConfig,
    // Connection URL of the source (read-side) database.
    source_url: String,
    // Connection URL of the target (write-side) database.
    target_url: String,
}
74
75impl SyncDaemon {
76    /// Create a new SyncDaemon with the given configuration.
77    pub fn new(source_url: String, target_url: String, config: DaemonConfig) -> Self {
78        Self {
79            config,
80            source_url,
81            target_url,
82        }
83    }
84
    /// Run a single sync cycle for all configured tables.
    ///
    /// This is the main entry point for synchronization. It:
    /// 1. Loads or creates sync state
    /// 2. Connects to source and target databases
    /// 3. Syncs each table
    /// 4. Saves updated state
    ///
    /// # Errors
    ///
    /// Returns `Err` only for cycle-level failures (state load/save, database
    /// connections, table discovery). Per-table sync failures are collected
    /// into `SyncStats::errors` so one bad table does not stop the others.
    pub async fn run_sync_cycle(&self) -> Result<SyncStats> {
        let start = std::time::Instant::now();
        let mut stats = SyncStats::default();

        // Load or create sync state (per-table xmin resume points).
        let mut state = self.load_or_create_state().await?;

        // Connect to databases; a connection failure aborts the whole cycle.
        let source_client = crate::postgres::connect_with_retry(&self.source_url)
            .await
            .context("Failed to connect to source database")?;
        let target_client = crate::postgres::connect_with_retry(&self.target_url)
            .await
            .context("Failed to connect to target database")?;

        let reader = XminReader::new(&source_client);
        let writer = ChangeWriter::new(&target_client);

        // Get tables to sync: explicit list from config, or discover every
        // table in the configured schema when the list is empty.
        let tables = if self.config.tables.is_empty() {
            reader.list_tables(&self.config.schema).await?
        } else {
            self.config.tables.clone()
        };

        // Sync each table independently; failures are recorded, not propagated.
        for table in &tables {
            match self
                .sync_table(&reader, &writer, &mut state, &self.config.schema, table)
                .await
            {
                Ok(rows) => {
                    stats.tables_synced += 1;
                    stats.rows_synced += rows;
                }
                Err(e) => {
                    // Log with :? to show full error chain including root cause
                    tracing::error!("Failed to sync {}.{}: {:?}", self.config.schema, table, e);
                    let error_msg =
                        format!("Failed to sync {}.{}: {}", self.config.schema, table, e);
                    stats.errors.push(error_msg);
                }
            }
        }

        // Persist state so the next cycle resumes from the last synced xmin,
        // even when some tables failed this cycle.
        state.save(&self.config.state_path).await?;

        stats.duration_ms = start.elapsed().as_millis() as u64;
        Ok(stats)
    }
143
144    /// Run reconciliation to detect and delete orphaned rows.
145    pub async fn run_reconciliation(&self) -> Result<SyncStats> {
146        let start = std::time::Instant::now();
147        let mut stats = SyncStats::default();
148
149        // Connect to databases
150        let source_client = crate::postgres::connect_with_retry(&self.source_url)
151            .await
152            .context("Failed to connect to source database")?;
153        let target_client = crate::postgres::connect_with_retry(&self.target_url)
154            .await
155            .context("Failed to connect to target database")?;
156
157        let reconciler = Reconciler::new(&source_client, &target_client);
158        let reader = XminReader::new(&source_client);
159
160        // Get tables to reconcile
161        let tables = if self.config.tables.is_empty() {
162            reader.list_tables(&self.config.schema).await?
163        } else {
164            self.config.tables.clone()
165        };
166
167        // Reconcile each table
168        for table in &tables {
169            // Check if table exists in target before reconciliation
170            match reconciler
171                .table_exists_in_target(&self.config.schema, table)
172                .await
173            {
174                Ok(true) => {}
175                Ok(false) => {
176                    tracing::warn!(
177                        "Skipping reconciliation for {}.{}: table does not exist in target",
178                        self.config.schema,
179                        table
180                    );
181                    continue;
182                }
183                Err(e) => {
184                    tracing::warn!(
185                        "Skipping reconciliation for {}.{}: failed to check table existence: {}",
186                        self.config.schema,
187                        table,
188                        e
189                    );
190                    continue;
191                }
192            }
193
194            // Get primary key columns
195            let pk_columns = reader.get_primary_key(&self.config.schema, table).await?;
196            if pk_columns.is_empty() {
197                tracing::warn!(
198                    "Skipping reconciliation for {}.{}: no primary key",
199                    self.config.schema,
200                    table
201                );
202                continue;
203            }
204
205            match reconciler
206                .reconcile_table_batched(
207                    &self.config.schema,
208                    table,
209                    &pk_columns,
210                    self.config.batch_size,
211                )
212                .await
213            {
214                Ok(deleted) => {
215                    stats.tables_synced += 1;
216                    stats.rows_deleted += deleted;
217                }
218                Err(e) => {
219                    let error_msg = format!(
220                        "Failed to reconcile {}.{}: {}",
221                        self.config.schema, table, e
222                    );
223                    tracing::error!("{}", error_msg);
224                    stats.errors.push(error_msg);
225                }
226            }
227        }
228
229        stats.duration_ms = start.elapsed().as_millis() as u64;
230        Ok(stats)
231    }
232
    /// Run the daemon continuously until stopped.
    ///
    /// This starts the main loop that runs sync cycles at the configured interval.
    /// Reconciliation runs at its own interval if configured.
    ///
    /// `tokio::time::interval` fires its first tick immediately, so a sync
    /// (and reconciliation, when enabled) begins as soon as the daemon starts.
    ///
    /// Shutdown is delivered via the broadcast `shutdown` receiver and is
    /// checked with highest priority (`biased`) both between cycles and while
    /// a cycle is in flight. NOTE(review): aborting an in-flight cycle drops
    /// its future mid-await; since `run_sync_cycle` saves state only at the
    /// end, an interrupted cycle's progress may not be persisted and is
    /// re-synced from the last saved xmin on the next run — confirm this is
    /// acceptable for shutdown latency vs. redo cost.
    pub async fn run(&self, mut shutdown: tokio::sync::broadcast::Receiver<()>) -> Result<()> {
        let mut sync_interval = interval(self.config.sync_interval);
        // None disables reconciliation (see the pending() branch below).
        let mut reconcile_interval = self.config.reconcile_interval.map(|d| interval(d));

        let mut cycles = 0u64;
        let mut reconcile_cycles = 0u64;

        tracing::info!(
            "Starting SyncDaemon with sync_interval={:?}, reconcile_interval={:?}",
            self.config.sync_interval,
            self.config.reconcile_interval
        );

        loop {
            tokio::select! {
                biased; // Check shutdown first

                _ = shutdown.recv() => {
                    tracing::info!("Shutdown signal received, stopping SyncDaemon");
                    break;
                }
                _ = sync_interval.tick() => {
                    cycles += 1;
                    tracing::info!("Starting sync cycle {}", cycles);

                    // Run sync cycle with shutdown check - abort if shutdown received
                    tokio::select! {
                        biased;
                        _ = shutdown.recv() => {
                            tracing::info!("Shutdown signal received during sync cycle, aborting");
                            break;
                        }
                        result = self.run_sync_cycle() => {
                            match result {
                                Ok(stats) => {
                                    tracing::info!(
                                        "Sync cycle {} completed: {} tables, {} rows in {}ms",
                                        cycles,
                                        stats.tables_synced,
                                        stats.rows_synced,
                                        stats.duration_ms
                                    );
                                    if !stats.errors.is_empty() {
                                        tracing::warn!("Sync cycle had {} errors", stats.errors.len());
                                    }
                                }
                                Err(e) => {
                                    // Cycle-level failure; the daemon keeps running
                                    // and retries on the next tick.
                                    tracing::error!("Sync cycle {} failed: {}", cycles, e);
                                }
                            }
                        }
                    }
                }
                // When reconciliation is disabled this arm awaits a future that
                // never resolves, so it simply never fires.
                _ = async {
                    if let Some(ref mut interval) = reconcile_interval {
                        interval.tick().await
                    } else {
                        std::future::pending::<tokio::time::Instant>().await
                    }
                } => {
                    reconcile_cycles += 1;
                    tracing::info!("Starting reconciliation cycle {}", reconcile_cycles);

                    // Run reconciliation with shutdown check
                    tokio::select! {
                        biased;
                        _ = shutdown.recv() => {
                            tracing::info!("Shutdown signal received during reconciliation, aborting");
                            break;
                        }
                        result = self.run_reconciliation() => {
                            match result {
                                Ok(stats) => {
                                    tracing::info!(
                                        "Reconciliation cycle {} completed: {} tables, {} rows deleted in {}ms",
                                        reconcile_cycles,
                                        stats.tables_synced,
                                        stats.rows_deleted,
                                        stats.duration_ms
                                    );
                                }
                                Err(e) => {
                                    tracing::error!("Reconciliation cycle {} failed: {}", reconcile_cycles, e);
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }
330
    /// Sync a single table using batched processing.
    ///
    /// This method processes rows in batches to avoid loading entire tables into memory.
    /// This is critical for large tables (millions of rows) where loading everything
    /// at once would cause OOM or connection timeouts.
    ///
    /// Returns the total number of rows applied to the target.
    ///
    /// # Errors
    ///
    /// Fails if the table has no primary key, or if any metadata query,
    /// batch read, or batch write fails. Per-batch state updates mean a
    /// mid-table failure still leaves a valid resume point (once the caller
    /// persists `state`).
    async fn sync_table(
        &self,
        reader: &XminReader<'_>,
        writer: &ChangeWriter<'_>,
        state: &mut SyncState,
        schema: &str,
        table: &str,
    ) -> Result<u64> {
        // Get table state (last xmin successfully synced for this table).
        let table_state = state.get_or_create_table(schema, table);
        let stored_xmin = table_state.last_xmin;

        // Get table metadata from SOURCE (not target - tables may not exist there yet)
        let columns = get_table_columns(reader.client(), schema, table).await?;
        let pk_columns = get_primary_key_columns(reader.client(), schema, table).await?;

        // A primary key is required for the upsert path in apply_batch.
        if pk_columns.is_empty() {
            anyhow::bail!("Table {}.{} has no primary key", schema, table);
        }

        let column_names: Vec<String> = columns.iter().map(|(name, _)| name.clone()).collect();

        // Check for xmin wraparound before starting; if the stored xmin is no
        // longer comparable to the current one, fall back to a full table sync.
        let current_xmin = reader.get_current_xmin().await?;
        let (since_xmin, is_full_sync) = if detect_wraparound(stored_xmin, current_xmin)
            == WraparoundCheck::WraparoundDetected
        {
            tracing::warn!(
                "xmin wraparound detected for {}.{} - performing full table sync",
                schema,
                table
            );
            (0, true) // Start from beginning
        } else {
            (stored_xmin, false)
        };

        // Use batched reading to avoid loading entire table into memory
        let batch_size = self.config.batch_size;
        let mut batch_reader = reader
            .read_changes_batched(schema, table, &column_names, since_xmin, batch_size)
            .await?;

        let mut total_rows = 0u64;
        let mut max_xmin = since_xmin;
        let mut batch_count = 0u64;

        // Process batches until exhausted
        while let Some((rows, batch_max_xmin)) = reader.fetch_batch(&mut batch_reader).await? {
            if rows.is_empty() {
                break;
            }

            batch_count += 1;
            let batch_len = rows.len();

            // Log first batch with total context, then periodic progress
            if batch_count == 1 {
                if is_full_sync {
                    tracing::info!(
                        "Starting full table sync for {}.{} (batch size: {})",
                        schema,
                        table,
                        batch_size
                    );
                } else {
                    tracing::info!(
                        "Found changes in {}.{} (xmin {} -> {}), processing in batches",
                        schema,
                        table,
                        since_xmin,
                        batch_max_xmin
                    );
                }
            }

            // Convert and apply batch immediately (memory = O(batch_size))
            let values: Vec<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>> = rows
                .iter()
                .map(|row| row_to_values(row, &columns))
                .collect();

            let affected = writer
                .apply_batch(schema, table, &pk_columns, &column_names, values)
                .await?;

            total_rows += affected;
            max_xmin = batch_max_xmin;

            // Update state after each batch for resume capability
            state.update_table(schema, table, max_xmin, affected);

            // Log progress every 10 batches or 100K rows
            // (the modulo comparison fires roughly once per 100K rows crossed).
            if batch_count.is_multiple_of(10) || total_rows % 100_000 < batch_len as u64 {
                tracing::info!(
                    "Progress: {}.{} - {} rows synced ({} batches), current xmin: {}",
                    schema,
                    table,
                    total_rows,
                    batch_count,
                    max_xmin
                );
            }
        }

        if total_rows == 0 {
            tracing::debug!(
                "No changes in {}.{} since xmin {}",
                schema,
                table,
                since_xmin
            );
        } else {
            tracing::info!(
                "Completed sync for {}.{}: {} rows in {} batches (xmin {} -> {})",
                schema,
                table,
                total_rows,
                batch_count,
                since_xmin,
                max_xmin
            );
        }

        Ok(total_rows)
    }
462
463    /// Load existing state or create new state.
464    async fn load_or_create_state(&self) -> Result<SyncState> {
465        if self.config.state_path.exists() {
466            match SyncState::load(&self.config.state_path).await {
467                Ok(state) => {
468                    tracing::info!(
469                        "Loaded existing sync state from {:?}",
470                        self.config.state_path
471                    );
472                    return Ok(state);
473                }
474                Err(e) => {
475                    tracing::warn!(
476                        "Failed to load sync state from {:?}: {}. Creating new state.",
477                        self.config.state_path,
478                        e
479                    );
480                }
481            }
482        }
483
484        tracing::info!("Creating new sync state");
485        Ok(SyncState::new(&self.source_url, &self.target_url))
486    }
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    // The Default impl must match the documented baseline values.
    #[test]
    fn test_daemon_config_default() {
        let cfg = DaemonConfig::default();
        assert_eq!(cfg.sync_interval, Duration::from_secs(3600));
        assert_eq!(cfg.reconcile_interval, Some(Duration::from_secs(86400)));
        assert_eq!(cfg.batch_size, 10_000);
        assert_eq!(cfg.schema, "public");
    }

    // An empty error list means the cycle succeeded.
    #[test]
    fn test_sync_stats_success() {
        let stats = SyncStats {
            tables_synced: 5,
            rows_synced: 100,
            duration_ms: 500,
            ..SyncStats::default()
        };
        assert!(stats.is_success());
    }

    // Any recorded error marks the cycle as failed.
    #[test]
    fn test_sync_stats_with_errors() {
        let mut stats = SyncStats::default();
        stats.tables_synced = 4;
        stats.rows_synced = 80;
        stats.duration_ms = 500;
        stats.errors.push("Failed to sync table X".to_string());
        assert!(!stats.is_success());
    }
}