database_replicator/xmin/daemon.rs

// ABOUTME: SyncDaemon for xmin-based sync - orchestrates continuous replication
// ABOUTME: Runs sync cycles at configurable intervals with reconciliation

use anyhow::{Context, Result};
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::interval;

use super::reader::XminReader;
use super::reconciler::Reconciler;
use super::state::SyncState;
use super::writer::{get_primary_key_columns, get_table_columns, row_to_values, ChangeWriter};

/// Configuration for the SyncDaemon.
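///
/// # Example
///
/// A minimal sketch of overriding the defaults with struct-update syntax
/// (the values and table name here are illustrative placeholders, not
/// recommendations):
///
/// ```ignore
/// use std::time::Duration;
///
/// let config = DaemonConfig {
///     sync_interval: Duration::from_secs(600),
///     tables: vec!["users".to_string()],
///     ..DaemonConfig::default()
/// };
/// ```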
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Interval between sync cycles
    pub sync_interval: Duration,
    /// Interval between reconciliation cycles (delete detection)
    /// Set to None to disable reconciliation
    pub reconcile_interval: Option<Duration>,
    /// Path to store sync state
    pub state_path: PathBuf,
    /// Maximum rows to process per batch
    pub batch_size: usize,
    /// Tables to sync (empty = all tables)
    pub tables: Vec<String>,
    /// Schema to sync from
    pub schema: String,
}

impl Default for DaemonConfig {
    fn default() -> Self {
        Self {
            sync_interval: Duration::from_secs(3600), // 1 hour
            reconcile_interval: Some(Duration::from_secs(86400)), // 1 day
            state_path: SyncState::default_path(),
            batch_size: 1000,
            tables: Vec::new(),
            schema: "public".to_string(),
        }
    }
}

/// Statistics from a sync cycle.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    pub tables_synced: usize,
    pub rows_synced: u64,
    pub rows_deleted: u64,
    pub errors: Vec<String>,
    pub duration_ms: u64,
}

impl SyncStats {
    /// Check if the sync cycle completed without errors.
    pub fn is_success(&self) -> bool {
        self.errors.is_empty()
    }
}

/// SyncDaemon orchestrates continuous xmin-based replication.
///
/// It runs periodic sync cycles that:
/// 1. Read changed rows from source using xmin
/// 2. Apply changes to target using upsert
/// 3. Periodically run reconciliation to detect deletes
/// 4. Persist sync state for resume capability
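///
/// # Example
///
/// A minimal construction sketch (illustrative only; the connection URLs are
/// placeholders):
///
/// ```ignore
/// let daemon = SyncDaemon::new(
///     "postgres://user:pass@source-host/db".to_string(),
///     "postgres://user:pass@target-host/db".to_string(),
///     DaemonConfig::default(),
/// );
/// ```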
pub struct SyncDaemon {
    config: DaemonConfig,
    source_url: String,
    target_url: String,
}

impl SyncDaemon {
    /// Create a new SyncDaemon with the given configuration.
    pub fn new(source_url: String, target_url: String, config: DaemonConfig) -> Self {
        Self {
            config,
            source_url,
            target_url,
        }
    }

    /// Run a single sync cycle for all configured tables.
    ///
    /// This is the main entry point for synchronization. It:
    /// 1. Loads or creates sync state
    /// 2. Connects to source and target databases
    /// 3. Syncs each table
    /// 4. Saves updated state
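    ///
    /// # Example
    ///
    /// A one-shot invocation sketch (assumes an async context and an already
    /// constructed `daemon`):
    ///
    /// ```ignore
    /// let stats = daemon.run_sync_cycle().await?;
    /// if !stats.is_success() {
    ///     eprintln!("sync finished with {} errors", stats.errors.len());
    /// }
    /// ```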
    pub async fn run_sync_cycle(&self) -> Result<SyncStats> {
        let start = std::time::Instant::now();
        let mut stats = SyncStats::default();

        // Load or create sync state
        let mut state = self.load_or_create_state().await?;

        // Connect to databases
        let source_client = crate::postgres::connect_with_retry(&self.source_url)
            .await
            .context("Failed to connect to source database")?;
        let target_client = crate::postgres::connect_with_retry(&self.target_url)
            .await
            .context("Failed to connect to target database")?;

        let reader = XminReader::new(&source_client);
        let writer = ChangeWriter::new(&target_client);

        // Get tables to sync
        let tables = if self.config.tables.is_empty() {
            reader.list_tables(&self.config.schema).await?
        } else {
            self.config.tables.clone()
        };

        // Sync each table
        for table in &tables {
            match self
                .sync_table(&reader, &writer, &mut state, &self.config.schema, table)
                .await
            {
                Ok(rows) => {
                    stats.tables_synced += 1;
                    stats.rows_synced += rows;
                }
                Err(e) => {
                    // Log with :? to show full error chain including root cause
                    tracing::error!("Failed to sync {}.{}: {:?}", self.config.schema, table, e);
                    let error_msg =
                        format!("Failed to sync {}.{}: {}", self.config.schema, table, e);
                    stats.errors.push(error_msg);
                }
            }
        }

        // Save state
        state.save(&self.config.state_path).await?;

        stats.duration_ms = start.elapsed().as_millis() as u64;
        Ok(stats)
    }

    /// Run reconciliation to detect and delete orphaned rows.
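    ///
    /// # Example
    ///
    /// An illustrative sketch (assumes an async context and an already
    /// constructed `daemon`):
    ///
    /// ```ignore
    /// let stats = daemon.run_reconciliation().await?;
    /// println!("deleted {} orphaned rows", stats.rows_deleted);
    /// ```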
    pub async fn run_reconciliation(&self) -> Result<SyncStats> {
        let start = std::time::Instant::now();
        let mut stats = SyncStats::default();

        // Connect to databases
        let source_client = crate::postgres::connect_with_retry(&self.source_url)
            .await
            .context("Failed to connect to source database")?;
        let target_client = crate::postgres::connect_with_retry(&self.target_url)
            .await
            .context("Failed to connect to target database")?;

        let reconciler = Reconciler::new(&source_client, &target_client);
        let reader = XminReader::new(&source_client);

        // Get tables to reconcile
        let tables = if self.config.tables.is_empty() {
            reader.list_tables(&self.config.schema).await?
        } else {
            self.config.tables.clone()
        };

        // Reconcile each table
        for table in &tables {
            // Check if table exists in target before reconciliation
            match reconciler
                .table_exists_in_target(&self.config.schema, table)
                .await
            {
                Ok(true) => {}
                Ok(false) => {
                    tracing::warn!(
                        "Skipping reconciliation for {}.{}: table does not exist in target",
                        self.config.schema,
                        table
                    );
                    continue;
                }
                Err(e) => {
                    tracing::warn!(
                        "Skipping reconciliation for {}.{}: failed to check table existence: {}",
                        self.config.schema,
                        table,
                        e
                    );
                    continue;
                }
            }

            // Get primary key columns
            let pk_columns = reader.get_primary_key(&self.config.schema, table).await?;
            if pk_columns.is_empty() {
                tracing::warn!(
                    "Skipping reconciliation for {}.{}: no primary key",
                    self.config.schema,
                    table
                );
                continue;
            }

            match reconciler
                .reconcile_table(&self.config.schema, table, &pk_columns)
                .await
            {
                Ok(deleted) => {
                    stats.tables_synced += 1;
                    stats.rows_deleted += deleted;
                }
                Err(e) => {
                    let error_msg = format!(
                        "Failed to reconcile {}.{}: {}",
                        self.config.schema, table, e
                    );
                    tracing::error!("{}", error_msg);
                    stats.errors.push(error_msg);
                }
            }
        }

        stats.duration_ms = start.elapsed().as_millis() as u64;
        Ok(stats)
    }

    /// Run the daemon continuously until stopped.
    ///
    /// This starts the main loop that runs sync cycles at the configured interval.
    /// Reconciliation runs at its own interval if configured.
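    ///
    /// # Example
    ///
    /// A sketch of wiring a shutdown signal through a broadcast channel
    /// (illustrative only; in practice the sender would typically be hooked
    /// up to a signal handler such as Ctrl-C):
    ///
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
    /// tokio::spawn(async move {
    ///     tokio::signal::ctrl_c().await.ok();
    ///     let _ = shutdown_tx.send(());
    /// });
    /// daemon.run(shutdown_rx).await?;
    /// ```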
    pub async fn run(&self, mut shutdown: tokio::sync::broadcast::Receiver<()>) -> Result<()> {
        let mut sync_interval = interval(self.config.sync_interval);
        let mut reconcile_interval = self.config.reconcile_interval.map(|d| interval(d));

        let mut cycles = 0u64;
        let mut reconcile_cycles = 0u64;

        tracing::info!(
            "Starting SyncDaemon with sync_interval={:?}, reconcile_interval={:?}",
            self.config.sync_interval,
            self.config.reconcile_interval
        );

        loop {
            tokio::select! {
                biased; // Check shutdown first

                _ = shutdown.recv() => {
                    tracing::info!("Shutdown signal received, stopping SyncDaemon");
                    break;
                }
                _ = sync_interval.tick() => {
                    cycles += 1;
                    tracing::info!("Starting sync cycle {}", cycles);

                    // Run sync cycle with shutdown check - abort if shutdown received
                    tokio::select! {
                        biased;
                        _ = shutdown.recv() => {
                            tracing::info!("Shutdown signal received during sync cycle, aborting");
                            break;
                        }
                        result = self.run_sync_cycle() => {
                            match result {
                                Ok(stats) => {
                                    tracing::info!(
                                        "Sync cycle {} completed: {} tables, {} rows in {}ms",
                                        cycles,
                                        stats.tables_synced,
                                        stats.rows_synced,
                                        stats.duration_ms
                                    );
                                    if !stats.errors.is_empty() {
                                        tracing::warn!("Sync cycle had {} errors", stats.errors.len());
                                    }
                                }
                                Err(e) => {
                                    tracing::error!("Sync cycle {} failed: {}", cycles, e);
                                }
                            }
                        }
                    }
                }
                _ = async {
                    if let Some(ref mut interval) = reconcile_interval {
                        interval.tick().await
                    } else {
                        std::future::pending::<tokio::time::Instant>().await
                    }
                } => {
                    reconcile_cycles += 1;
                    tracing::info!("Starting reconciliation cycle {}", reconcile_cycles);

                    // Run reconciliation with shutdown check
                    tokio::select! {
                        biased;
                        _ = shutdown.recv() => {
                            tracing::info!("Shutdown signal received during reconciliation, aborting");
                            break;
                        }
                        result = self.run_reconciliation() => {
                            match result {
                                Ok(stats) => {
                                    tracing::info!(
                                        "Reconciliation cycle {} completed: {} tables, {} rows deleted in {}ms",
                                        reconcile_cycles,
                                        stats.tables_synced,
                                        stats.rows_deleted,
                                        stats.duration_ms
                                    );
                                }
                                Err(e) => {
                                    tracing::error!("Reconciliation cycle {} failed: {}", reconcile_cycles, e);
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Sync a single table.
    async fn sync_table(
        &self,
        reader: &XminReader<'_>,
        writer: &ChangeWriter<'_>,
        state: &mut SyncState,
        schema: &str,
        table: &str,
    ) -> Result<u64> {
        // Get table state
        let table_state = state.get_or_create_table(schema, table);
        let since_xmin = table_state.last_xmin;

        // Get table metadata from SOURCE (not target - tables may not exist there yet)
        let columns = get_table_columns(reader.client(), schema, table).await?;
        let pk_columns = get_primary_key_columns(reader.client(), schema, table).await?;

        if pk_columns.is_empty() {
            anyhow::bail!("Table {}.{} has no primary key", schema, table);
        }

        let column_names: Vec<String> = columns.iter().map(|(name, _)| name.clone()).collect();

        // Read changes with wraparound detection
        let (rows, max_xmin, was_full_sync) = reader
            .read_changes_with_wraparound_check(schema, table, &column_names, since_xmin)
            .await?;

        if rows.is_empty() {
            tracing::debug!(
                "No changes in {}.{} since xmin {}",
                schema,
                table,
                since_xmin
            );
            return Ok(0);
        }

        if was_full_sync {
            tracing::warn!(
                "xmin wraparound detected for {}.{} - performed full table sync ({} rows)",
                schema,
                table,
                rows.len()
            );
        } else {
            tracing::info!(
                "Found {} changed rows in {}.{} (xmin {} -> {})",
                rows.len(),
                schema,
                table,
                since_xmin,
                max_xmin
            );
        }

        // Convert rows to values (excluding the _xmin column we added)
        let values: Vec<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>> = rows
            .iter()
            .map(|row| row_to_values(row, &columns))
            .collect();

        // Apply changes
        let affected = writer
            .apply_batch(schema, table, &pk_columns, &column_names, values)
            .await?;

        // Update state
        state.update_table(schema, table, max_xmin, affected);

        Ok(affected)
    }

    /// Load existing state or create new state.
    async fn load_or_create_state(&self) -> Result<SyncState> {
        if self.config.state_path.exists() {
            match SyncState::load(&self.config.state_path).await {
                Ok(state) => {
                    tracing::info!(
                        "Loaded existing sync state from {:?}",
                        self.config.state_path
                    );
                    return Ok(state);
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to load sync state from {:?}: {}. Creating new state.",
                        self.config.state_path,
                        e
                    );
                }
            }
        }

        tracing::info!("Creating new sync state");
        Ok(SyncState::new(&self.source_url, &self.target_url))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_daemon_config_default() {
        let config = DaemonConfig::default();
        assert_eq!(config.sync_interval, Duration::from_secs(3600));
        assert_eq!(config.reconcile_interval, Some(Duration::from_secs(86400)));
        assert_eq!(config.batch_size, 1000);
        assert_eq!(config.schema, "public");
    }

    #[test]
    fn test_sync_stats_success() {
        let stats = SyncStats {
            tables_synced: 5,
            rows_synced: 100,
            rows_deleted: 0,
            errors: vec![],
            duration_ms: 500,
        };
        assert!(stats.is_success());
    }

    #[test]
    fn test_sync_stats_with_errors() {
        let stats = SyncStats {
            tables_synced: 4,
            rows_synced: 80,
            rows_deleted: 0,
            errors: vec!["Failed to sync table X".to_string()],
            duration_ms: 500,
        };
        assert!(!stats.is_success());
    }
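
    #[test]
    fn test_sync_stats_default_is_success() {
        // Default-constructed stats carry no errors, so they should report success.
        let stats = SyncStats::default();
        assert!(stats.is_success());
        assert_eq!(stats.rows_synced, 0);
    }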
}