database_replicator/xmin/
daemon.rs1use anyhow::{Context, Result};
5use std::path::PathBuf;
6use std::time::Duration;
7use tokio::time::interval;
8
9use super::reader::XminReader;
10use super::reconciler::Reconciler;
11use super::state::SyncState;
12use super::writer::{get_primary_key_columns, get_table_columns, row_to_values, ChangeWriter};
13
/// Scheduling and scope settings for [`SyncDaemon`].
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Period between incremental (xmin-based) sync cycles.
    pub sync_interval: Duration,
    /// Period between reconciliation passes; `None` disables reconciliation entirely.
    pub reconcile_interval: Option<Duration>,
    /// File the per-table sync state is loaded from and saved to.
    pub state_path: PathBuf,
    /// Number of rows per batch (default 1000); presumably bounds each write to the
    /// target — confirm against `SyncDaemon`.
    pub batch_size: usize,
    /// Tables to process; an empty list means every table the source reports in `schema`.
    pub tables: Vec<String>,
    /// Schema that `tables` live in.
    pub schema: String,
}

32impl Default for DaemonConfig {
33 fn default() -> Self {
34 Self {
35 sync_interval: Duration::from_secs(3600), reconcile_interval: Some(Duration::from_secs(86400)), state_path: SyncState::default_path(),
38 batch_size: 1000,
39 tables: Vec::new(),
40 schema: "public".to_string(),
41 }
42 }
43}
44
/// Outcome of a single sync or reconciliation cycle.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    /// Tables handled without error (synced, or reconciled for a reconciliation pass).
    pub tables_synced: usize,
    /// Sum of affected-row counts reported by the writer during a sync cycle.
    pub rows_synced: u64,
    /// Sum of deleted-row counts reported by the reconciler during a reconciliation pass.
    pub rows_deleted: u64,
    /// One message per per-table failure; empty means the cycle fully succeeded.
    pub errors: Vec<String>,
    /// Wall-clock duration of the cycle, in milliseconds.
    pub duration_ms: u64,
}

55impl SyncStats {
56 pub fn is_success(&self) -> bool {
58 self.errors.is_empty()
59 }
60}
61
62pub struct SyncDaemon {
70 config: DaemonConfig,
71 source_url: String,
72 target_url: String,
73}
74
75impl SyncDaemon {
76 pub fn new(source_url: String, target_url: String, config: DaemonConfig) -> Self {
78 Self {
79 config,
80 source_url,
81 target_url,
82 }
83 }
84
85 pub async fn run_sync_cycle(&self) -> Result<SyncStats> {
93 let start = std::time::Instant::now();
94 let mut stats = SyncStats::default();
95
96 let mut state = self.load_or_create_state().await?;
98
99 let source_client = crate::postgres::connect_with_retry(&self.source_url)
101 .await
102 .context("Failed to connect to source database")?;
103 let target_client = crate::postgres::connect_with_retry(&self.target_url)
104 .await
105 .context("Failed to connect to target database")?;
106
107 let reader = XminReader::new(&source_client);
108 let writer = ChangeWriter::new(&target_client);
109
110 let tables = if self.config.tables.is_empty() {
112 reader.list_tables(&self.config.schema).await?
113 } else {
114 self.config.tables.clone()
115 };
116
117 for table in &tables {
119 match self
120 .sync_table(&reader, &writer, &mut state, &self.config.schema, table)
121 .await
122 {
123 Ok(rows) => {
124 stats.tables_synced += 1;
125 stats.rows_synced += rows;
126 }
127 Err(e) => {
128 tracing::error!("Failed to sync {}.{}: {:?}", self.config.schema, table, e);
130 let error_msg =
131 format!("Failed to sync {}.{}: {}", self.config.schema, table, e);
132 stats.errors.push(error_msg);
133 }
134 }
135 }
136
137 state.save(&self.config.state_path).await?;
139
140 stats.duration_ms = start.elapsed().as_millis() as u64;
141 Ok(stats)
142 }
143
144 pub async fn run_reconciliation(&self) -> Result<SyncStats> {
146 let start = std::time::Instant::now();
147 let mut stats = SyncStats::default();
148
149 let source_client = crate::postgres::connect_with_retry(&self.source_url)
151 .await
152 .context("Failed to connect to source database")?;
153 let target_client = crate::postgres::connect_with_retry(&self.target_url)
154 .await
155 .context("Failed to connect to target database")?;
156
157 let reconciler = Reconciler::new(&source_client, &target_client);
158 let reader = XminReader::new(&source_client);
159
160 let tables = if self.config.tables.is_empty() {
162 reader.list_tables(&self.config.schema).await?
163 } else {
164 self.config.tables.clone()
165 };
166
167 for table in &tables {
169 match reconciler
171 .table_exists_in_target(&self.config.schema, table)
172 .await
173 {
174 Ok(true) => {}
175 Ok(false) => {
176 tracing::warn!(
177 "Skipping reconciliation for {}.{}: table does not exist in target",
178 self.config.schema,
179 table
180 );
181 continue;
182 }
183 Err(e) => {
184 tracing::warn!(
185 "Skipping reconciliation for {}.{}: failed to check table existence: {}",
186 self.config.schema,
187 table,
188 e
189 );
190 continue;
191 }
192 }
193
194 let pk_columns = reader.get_primary_key(&self.config.schema, table).await?;
196 if pk_columns.is_empty() {
197 tracing::warn!(
198 "Skipping reconciliation for {}.{}: no primary key",
199 self.config.schema,
200 table
201 );
202 continue;
203 }
204
205 match reconciler
206 .reconcile_table(&self.config.schema, table, &pk_columns)
207 .await
208 {
209 Ok(deleted) => {
210 stats.tables_synced += 1;
211 stats.rows_deleted += deleted;
212 }
213 Err(e) => {
214 let error_msg = format!(
215 "Failed to reconcile {}.{}: {}",
216 self.config.schema, table, e
217 );
218 tracing::error!("{}", error_msg);
219 stats.errors.push(error_msg);
220 }
221 }
222 }
223
224 stats.duration_ms = start.elapsed().as_millis() as u64;
225 Ok(stats)
226 }
227
228 pub async fn run(&self, mut shutdown: tokio::sync::broadcast::Receiver<()>) -> Result<()> {
233 let mut sync_interval = interval(self.config.sync_interval);
234 let mut reconcile_interval = self.config.reconcile_interval.map(|d| interval(d));
235
236 let mut cycles = 0u64;
237 let mut reconcile_cycles = 0u64;
238
239 tracing::info!(
240 "Starting SyncDaemon with sync_interval={:?}, reconcile_interval={:?}",
241 self.config.sync_interval,
242 self.config.reconcile_interval
243 );
244
245 loop {
246 tokio::select! {
247 biased; _ = shutdown.recv() => {
250 tracing::info!("Shutdown signal received, stopping SyncDaemon");
251 break;
252 }
253 _ = sync_interval.tick() => {
254 cycles += 1;
255 tracing::info!("Starting sync cycle {}", cycles);
256
257 tokio::select! {
259 biased;
260 _ = shutdown.recv() => {
261 tracing::info!("Shutdown signal received during sync cycle, aborting");
262 break;
263 }
264 result = self.run_sync_cycle() => {
265 match result {
266 Ok(stats) => {
267 tracing::info!(
268 "Sync cycle {} completed: {} tables, {} rows in {}ms",
269 cycles,
270 stats.tables_synced,
271 stats.rows_synced,
272 stats.duration_ms
273 );
274 if !stats.errors.is_empty() {
275 tracing::warn!("Sync cycle had {} errors", stats.errors.len());
276 }
277 }
278 Err(e) => {
279 tracing::error!("Sync cycle {} failed: {}", cycles, e);
280 }
281 }
282 }
283 }
284 }
285 _ = async {
286 if let Some(ref mut interval) = reconcile_interval {
287 interval.tick().await
288 } else {
289 std::future::pending::<tokio::time::Instant>().await
290 }
291 } => {
292 reconcile_cycles += 1;
293 tracing::info!("Starting reconciliation cycle {}", reconcile_cycles);
294
295 tokio::select! {
297 biased;
298 _ = shutdown.recv() => {
299 tracing::info!("Shutdown signal received during reconciliation, aborting");
300 break;
301 }
302 result = self.run_reconciliation() => {
303 match result {
304 Ok(stats) => {
305 tracing::info!(
306 "Reconciliation cycle {} completed: {} tables, {} rows deleted in {}ms",
307 reconcile_cycles,
308 stats.tables_synced,
309 stats.rows_deleted,
310 stats.duration_ms
311 );
312 }
313 Err(e) => {
314 tracing::error!("Reconciliation cycle {} failed: {}", reconcile_cycles, e);
315 }
316 }
317 }
318 }
319 }
320 }
321 }
322
323 Ok(())
324 }
325
326 async fn sync_table(
328 &self,
329 reader: &XminReader<'_>,
330 writer: &ChangeWriter<'_>,
331 state: &mut SyncState,
332 schema: &str,
333 table: &str,
334 ) -> Result<u64> {
335 let table_state = state.get_or_create_table(schema, table);
337 let since_xmin = table_state.last_xmin;
338
339 let columns = get_table_columns(reader.client(), schema, table).await?;
341 let pk_columns = get_primary_key_columns(reader.client(), schema, table).await?;
342
343 if pk_columns.is_empty() {
344 anyhow::bail!("Table {}.{} has no primary key", schema, table);
345 }
346
347 let column_names: Vec<String> = columns.iter().map(|(name, _)| name.clone()).collect();
348
349 let (rows, max_xmin, was_full_sync) = reader
351 .read_changes_with_wraparound_check(schema, table, &column_names, since_xmin)
352 .await?;
353
354 if rows.is_empty() {
355 tracing::debug!(
356 "No changes in {}.{} since xmin {}",
357 schema,
358 table,
359 since_xmin
360 );
361 return Ok(0);
362 }
363
364 if was_full_sync {
365 tracing::warn!(
366 "xmin wraparound detected for {}.{} - performed full table sync ({} rows)",
367 schema,
368 table,
369 rows.len()
370 );
371 } else {
372 tracing::info!(
373 "Found {} changed rows in {}.{} (xmin {} -> {})",
374 rows.len(),
375 schema,
376 table,
377 since_xmin,
378 max_xmin
379 );
380 }
381
382 let values: Vec<Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>> = rows
384 .iter()
385 .map(|row| row_to_values(row, &columns))
386 .collect();
387
388 let affected = writer
390 .apply_batch(schema, table, &pk_columns, &column_names, values)
391 .await?;
392
393 state.update_table(schema, table, max_xmin, affected);
395
396 Ok(affected)
397 }
398
399 async fn load_or_create_state(&self) -> Result<SyncState> {
401 if self.config.state_path.exists() {
402 match SyncState::load(&self.config.state_path).await {
403 Ok(state) => {
404 tracing::info!(
405 "Loaded existing sync state from {:?}",
406 self.config.state_path
407 );
408 return Ok(state);
409 }
410 Err(e) => {
411 tracing::warn!(
412 "Failed to load sync state from {:?}: {}. Creating new state.",
413 self.config.state_path,
414 e
415 );
416 }
417 }
418 }
419
420 tracing::info!("Creating new sync state");
421 Ok(SyncState::new(&self.source_url, &self.target_url))
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `SyncStats` fixture shared by the stats tests.
    fn stats_fixture(tables_synced: usize, rows_synced: u64, errors: Vec<String>) -> SyncStats {
        SyncStats {
            tables_synced,
            rows_synced,
            rows_deleted: 0,
            errors,
            duration_ms: 500,
        }
    }

    #[test]
    fn test_daemon_config_default() {
        let DaemonConfig {
            sync_interval,
            reconcile_interval,
            batch_size,
            schema,
            ..
        } = DaemonConfig::default();

        assert_eq!(sync_interval, Duration::from_secs(3600));
        assert_eq!(reconcile_interval, Some(Duration::from_secs(86400)));
        assert_eq!(batch_size, 1000);
        assert_eq!(schema, "public");
    }

    #[test]
    fn test_sync_stats_success() {
        assert!(stats_fixture(5, 100, Vec::new()).is_success());
    }

    #[test]
    fn test_sync_stats_with_errors() {
        let failed = stats_fixture(4, 80, vec!["Failed to sync table X".to_string()]);
        assert!(!failed.is_success());
    }
}