rhei-sync 1.5.0

CDC sync engine and query router for Rhei
Documentation
//! Background sync loop that continuously drives CDC-to-OLAP replication.
//!
//! The single public entry point [`spawn_sync_loop`] starts a Tokio task that
//! calls [`rhei_core::SyncEngine::sync_once`] after each fixed-length pause.  A DDL read-lock
//! prevents schema mutations from racing with an in-progress cycle.  Shutdown
//! is cooperative via a [`tokio_util::sync::CancellationToken`].

use std::sync::Arc;
use std::time::Duration;

use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

use crate::sync_engine::CdcSyncEngine;

/// Spawn a background task that repeatedly polls CDC and syncs to OLAP.
///
/// Each iteration sleeps for `interval`, acquires a read guard on `ddl_lock`,
/// and then awaits `sync_once()` on `engine`. The sleep only restarts after a
/// cycle has finished, so the time between the starts of two cycles is
/// `interval` plus the duration of the earlier cycle.
///
/// Outcomes are only logged: a successful cycle that processed at least one
/// event is reported at `debug` level, and a failed cycle at `error` level.
/// A failed cycle does not stop the loop.
///
/// The `ddl_lock` read guard is held for the whole cycle to prevent
/// interleaving with schema changes (which take a write lock).
///
/// # Cancellation
/// The `CancellationToken` is observed both while sleeping and while waiting
/// for the `ddl_lock` read guard, and it takes priority when several events are
/// ready at once. A `sync_once()` call that has already started is never
/// interrupted; the loop exits once that cycle has completed.
///
/// # Returns
/// A `JoinHandle` for the background task. The caller retains the
/// `CancellationToken` and is responsible for cancelling it to stop the loop.
///
/// # Panics
/// `tokio::spawn` panics when called from outside a Tokio runtime.
pub fn spawn_sync_loop<C, O>(
    engine: Arc<CdcSyncEngine<C, O>>,
    interval: Duration,
    cancel: CancellationToken,
    ddl_lock: Arc<tokio::sync::RwLock<()>>,
) -> JoinHandle<()>
where
    C: rhei_core::CdcConsumer + Send + Sync + 'static,
    O: rhei_core::OlapEngine + Send + Sync + 'static,
{
    tokio::spawn(async move {
        info!(?interval, "background sync loop started");

        loop {
            tokio::select! {
                // Poll the branches in declaration order so that a pending
                // cancellation always wins over an elapsed timer.
                biased;

                _ = cancel.cancelled() => {
                    info!("background sync loop shutting down");
                    break;
                }
                // The pause and the lock acquisition form one cancellable
                // future: a long-held DDL write lock must not delay shutdown.
                // Dropping this future mid-wait only gives up the queue
                // position for the lock; no sync work has started yet.
                //
                // `_guard` (a named binding, not `_`) keeps the read guard
                // alive until the end of this handler, i.e. for the whole cycle.
                _guard = async {
                    tokio::time::sleep(interval).await;
                    ddl_lock.read().await
                } => {
                    match rhei_core::SyncEngine::sync_once(engine.as_ref()).await {
                        Ok(result) => {
                            // Idle cycles are not logged, to keep the output quiet.
                            if result.events_processed > 0 {
                                debug!(
                                    events = result.events_processed,
                                    inserted = result.rows_inserted,
                                    updated = result.rows_updated,
                                    deleted = result.rows_deleted,
                                    pruned = ?result.pruned_count,
                                    "background sync cycle"
                                );
                            }
                        }
                        Err(e) => {
                            // Log and keep looping; the next cycle runs after
                            // the usual pause.
                            error!(error = %e, "background sync cycle failed");
                        }
                    }
                }
            }
        }

        info!("background sync loop stopped");
    })
}