rhei_sync/background.rs
1//! Background sync loop that continuously drives CDC-to-OLAP replication.
2//!
3//! The single public entry point [`spawn_sync_loop`] starts a Tokio task that
4//! calls [`rhei_core::SyncEngine::sync_once`] on a fixed interval. A DDL read-lock
5//! prevents schema mutations from racing with an in-progress cycle. Shutdown
6//! is cooperative via a [`tokio_util::sync::CancellationToken`].
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, error, info};
14
15use crate::sync_engine::CdcSyncEngine;
16
17/// Spawn a background task that continuously polls CDC and syncs to OLAP.
18///
19/// The loop runs `sync_once()` on the given interval, logging results and errors.
20/// It shuts down gracefully when the `CancellationToken` is cancelled.
21///
22/// The `ddl_lock` is acquired as a read lock before each cycle to prevent
23/// interleaving with schema changes (which take a write lock).
24///
25/// # Returns
26/// A `JoinHandle` for the background task. The caller retains the
27/// `CancellationToken` and is responsible for cancelling it to stop the loop.
28pub fn spawn_sync_loop<C, O>(
29 engine: Arc<CdcSyncEngine<C, O>>,
30 interval: Duration,
31 cancel: CancellationToken,
32 ddl_lock: Arc<tokio::sync::RwLock<()>>,
33) -> JoinHandle<()>
34where
35 C: rhei_core::CdcConsumer + Send + Sync + 'static,
36 O: rhei_core::OlapEngine + Send + Sync + 'static,
37{
38 tokio::spawn(async move {
39 info!(?interval, "background sync loop started");
40
41 loop {
42 tokio::select! {
43 _ = cancel.cancelled() => {
44 info!("background sync loop shutting down");
45 break;
46 }
47 _ = tokio::time::sleep(interval) => {
48 let _guard = ddl_lock.read().await;
49
50 match rhei_core::SyncEngine::sync_once(engine.as_ref()).await {
51 Ok(result) => {
52 if result.events_processed > 0 {
53 debug!(
54 events = result.events_processed,
55 inserted = result.rows_inserted,
56 updated = result.rows_updated,
57 deleted = result.rows_deleted,
58 pruned = ?result.pruned_count,
59 "background sync cycle"
60 );
61 }
62 }
63 Err(e) => {
64 error!(error = %e, "background sync cycle failed");
65 }
66 }
67 }
68 }
69 }
70
71 info!("background sync loop stopped");
72 })
73}