Skip to main content

rhei_sync/
background.rs

1//! Background sync loop that continuously drives CDC-to-OLAP replication.
2//!
3//! The single public entry point [`spawn_sync_loop`] starts a Tokio task that
4//! calls [`rhei_core::SyncEngine::sync_once`] on a fixed interval.  A DDL read-lock
5//! prevents schema mutations from racing with an in-progress cycle.  Shutdown
6//! is cooperative via a [`tokio_util::sync::CancellationToken`].
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, error, info};
14
15use crate::sync_engine::CdcSyncEngine;
16
17/// Spawn a background task that continuously polls CDC and syncs to OLAP.
18///
19/// The loop runs `sync_once()` on the given interval, logging results and errors.
20/// It shuts down gracefully when the `CancellationToken` is cancelled.
21///
22/// The `ddl_lock` is acquired as a read lock before each cycle to prevent
23/// interleaving with schema changes (which take a write lock).
24///
25/// # Returns
26/// A `JoinHandle` for the background task. The caller retains the
27/// `CancellationToken` and is responsible for cancelling it to stop the loop.
28pub fn spawn_sync_loop<C, O>(
29    engine: Arc<CdcSyncEngine<C, O>>,
30    interval: Duration,
31    cancel: CancellationToken,
32    ddl_lock: Arc<tokio::sync::RwLock<()>>,
33) -> JoinHandle<()>
34where
35    C: rhei_core::CdcConsumer + Send + Sync + 'static,
36    O: rhei_core::OlapEngine + Send + Sync + 'static,
37{
38    tokio::spawn(async move {
39        info!(?interval, "background sync loop started");
40
41        loop {
42            tokio::select! {
43                _ = cancel.cancelled() => {
44                    info!("background sync loop shutting down");
45                    break;
46                }
47                _ = tokio::time::sleep(interval) => {
48                    let _guard = ddl_lock.read().await;
49
50                    match rhei_core::SyncEngine::sync_once(engine.as_ref()).await {
51                        Ok(result) => {
52                            if result.events_processed > 0 {
53                                debug!(
54                                    events = result.events_processed,
55                                    inserted = result.rows_inserted,
56                                    updated = result.rows_updated,
57                                    deleted = result.rows_deleted,
58                                    pruned = ?result.pruned_count,
59                                    "background sync cycle"
60                                );
61                            }
62                        }
63                        Err(e) => {
64                            error!(error = %e, "background sync cycle failed");
65                        }
66                    }
67                }
68            }
69        }
70
71        info!("background sync loop stopped");
72    })
73}