use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use crate::sync_engine::CdcSyncEngine;
pub fn spawn_sync_loop<C, O>(
engine: Arc<CdcSyncEngine<C, O>>,
interval: Duration,
cancel: CancellationToken,
ddl_lock: Arc<tokio::sync::RwLock<()>>,
) -> JoinHandle<()>
where
C: rhei_core::CdcConsumer + Send + Sync + 'static,
O: rhei_core::OlapEngine + Send + Sync + 'static,
{
tokio::spawn(async move {
info!(?interval, "background sync loop started");
loop {
tokio::select! {
_ = cancel.cancelled() => {
info!("background sync loop shutting down");
break;
}
_ = tokio::time::sleep(interval) => {
let _guard = ddl_lock.read().await;
match rhei_core::SyncEngine::sync_once(engine.as_ref()).await {
Ok(result) => {
if result.events_processed > 0 {
debug!(
events = result.events_processed,
inserted = result.rows_inserted,
updated = result.rows_updated,
deleted = result.rows_deleted,
pruned = ?result.pruned_count,
"background sync cycle"
);
}
}
Err(e) => {
error!(error = %e, "background sync cycle failed");
}
}
}
}
}
info!("background sync loop stopped");
})
}