use std::sync::atomic::{AtomicI64, Ordering};
use rhei_core::types::{CdcEvent, CdcOperation, SyncMode, SyncResult, SyncStatus};
use tracing::{debug, warn};
/// Sentinel held in `CdcSyncEngine::last_synced_seq` before any event has been
/// synced; readers map it back to `None`.
// NOTE(review): assumes real CDC sequence numbers are never -1 — confirm with the CDC producer.
const NEVER_SYNCED: i64 = -1;
use crate::converter::{build_batch_insert, cdc_event_to_dml, cdc_events_to_batch};
use crate::error::SyncError;
use crate::temporal_converter::{
build_temporal_batch_insert, cdc_event_to_temporal_dml, cdc_events_to_temporal_batch,
};
const STALE_SCHEMA_PATTERNS: &[&str] = &["column", "No field named", "schema mismatch"];

/// Returns `true` when an OLAP error message indicates that the destination
/// table's columns no longer match the registered schema.
///
/// A message is classified as stale-schema when it:
/// - contains "No field named", "schema mismatch" or "not in INSERT column list"; or
/// - contains "not found" together with one of [`STALE_SCHEMA_PATTERNS`].
fn is_stale_schema_error(msg: &str) -> bool {
    // Phrases that identify a schema mismatch on their own.
    const ALWAYS_STALE: [&str; 3] = [
        "No field named",
        "schema mismatch",
        "not in INSERT column list",
    ];
    let mentions = |needle: &str| msg.contains(needle);
    if ALWAYS_STALE.iter().copied().any(mentions) {
        return true;
    }
    // "not found" alone is too broad (e.g. missing tables); require a column hint.
    mentions("not found") && STALE_SCHEMA_PATTERNS.iter().copied().any(mentions)
}
/// Replays CDC events polled from a [`rhei_core::CdcConsumer`] into a
/// [`rhei_core::OlapEngine`], tracking how far it has synced.
pub struct CdcSyncEngine<C, O> {
    /// Source that CDC events are polled from (and optionally pruned in).
    cdc: C,
    /// Destination that converted events are written to.
    olap: O,
    /// Table schemas used to convert events; events for tables missing here are skipped.
    schema_registry: rhei_core::SchemaRegistry,
    /// Sequence number of the last event applied or skipped, or `NEVER_SYNCED`.
    last_synced_seq: AtomicI64,
    /// Passed as the limit to every `CdcConsumer::poll` call.
    batch_size: u32,
    /// Whether consumed events are pruned from the CDC source after a successful sync.
    prune_after_sync: bool,
    /// Selects destructive or temporal conversion of events.
    sync_mode: SyncMode,
}
impl<C, O> CdcSyncEngine<C, O>
where
    C: rhei_core::CdcConsumer,
    O: rhei_core::OlapEngine,
{
    /// Creates an engine that has not synced any event yet.
    ///
    /// Defaults: pruning after each successful sync is enabled, and the sync
    /// mode is `SyncMode::default()`.
    ///
    /// * `cdc` - source polled for CDC events.
    /// * `olap` - destination the events are written to.
    /// * `schema_registry` - schemas used to convert events; events for tables
    ///   missing from it are skipped.
    /// * `batch_size` - passed as the `limit` of every `CdcConsumer::poll` call.
    #[must_use]
    pub fn new(
        cdc: C,
        olap: O,
        schema_registry: rhei_core::SchemaRegistry,
        batch_size: u32,
    ) -> Self {
        Self {
            cdc,
            olap,
            schema_registry,
            last_synced_seq: AtomicI64::new(NEVER_SYNCED),
            batch_size,
            prune_after_sync: true,
            sync_mode: SyncMode::default(),
        }
    }

    /// Enables or disables pruning of consumed CDC events after each
    /// successful sync cycle.
    #[must_use]
    pub fn with_prune_after_sync(mut self, prune: bool) -> Self {
        self.prune_after_sync = prune;
        self
    }

    /// Selects how events are converted: `SyncMode::Destructive` uses the
    /// plain converters, `SyncMode::Temporal` the temporal ones.
    #[must_use]
    pub fn with_sync_mode(mut self, mode: SyncMode) -> Self {
        self.sync_mode = mode;
        self
    }
}
/// One unit of work derived from a polled slice of CDC events.
enum SyncOp<'a> {
    /// An event applied on its own; `group_events` produces this only for
    /// non-INSERT operations.
    Single(&'a CdcEvent),
    /// A run of consecutive INSERT events that all target `table`, written together.
    BatchInsert {
        table: &'a str,
        events: Vec<&'a CdcEvent>,
    },
}
/// Splits `events` into [`SyncOp`]s, preserving their original order.
///
/// Every maximal run of consecutive INSERT events on the same table becomes a
/// single `SyncOp::BatchInsert` (a lone insert forms a run of one). Every other
/// event becomes a `SyncOp::Single`.
fn group_events(events: &[CdcEvent]) -> Vec<SyncOp<'_>> {
    let mut grouped: Vec<SyncOp<'_>> = Vec::new();
    for event in events {
        if event.operation != CdcOperation::Insert {
            grouped.push(SyncOp::Single(event));
            continue;
        }
        // A trailing BatchInsert means the previous event was an insert; extend
        // it when it targets the same table.
        if let Some(SyncOp::BatchInsert { table, events: run }) = grouped.last_mut() {
            if *table == event.table.as_str() {
                run.push(event);
                continue;
            }
        }
        grouped.push(SyncOp::BatchInsert {
            table: event.table.as_str(),
            events: vec![event],
        });
    }
    grouped
}
impl<C, O> rhei_core::SyncEngine for CdcSyncEngine<C, O>
where
C: rhei_core::CdcConsumer,
O: rhei_core::OlapEngine,
{
type Error = SyncError;
async fn sync_once(&self) -> Result<SyncResult, Self::Error> {
let raw_seq = self.last_synced_seq.load(Ordering::Relaxed);
let after_seq = if raw_seq == NEVER_SYNCED {
None
} else {
Some(raw_seq)
};
let events = self
.cdc
.poll(after_seq, self.batch_size)
.await
.map_err(|e| SyncError::Cdc(e.to_string()))?;
if events.is_empty() {
return Ok(SyncResult {
events_processed: 0,
rows_inserted: 0,
rows_updated: 0,
rows_deleted: 0,
last_seq: after_seq,
pruned_count: None,
});
}
debug!(count = events.len(), "processing CDC events");
let mut rows_inserted: u64 = 0;
let mut rows_updated: u64 = 0;
let mut rows_deleted: u64 = 0;
let mut last_seq = after_seq;
let ops = group_events(&events);
let use_transaction = self.olap.supports_transactions();
if use_transaction {
self.olap
.execute("BEGIN TRANSACTION")
.await
.map_err(|e| SyncError::Olap(e.to_string()))?;
}
let result = async {
for op in &ops {
match op {
SyncOp::BatchInsert {
table,
events: batch_events,
} => {
let schema = match self.schema_registry.get(table) {
Ok(s) => s,
Err(_) => {
warn!(table, "skipping CDC events for unregistered table");
if let Some(last) = batch_events.last() {
last_seq = Some(last.seq);
}
continue;
}
};
let arrow_result = match self.sync_mode {
SyncMode::Destructive => cdc_events_to_batch(batch_events, &schema),
SyncMode::Temporal => {
cdc_events_to_temporal_batch(batch_events, &schema)
}
};
let used_arrow = match arrow_result {
Ok(batch) => {
if let Err(e) = self
.olap
.load_arrow(table, std::slice::from_ref(&batch))
.await
{
let msg = e.to_string();
if is_stale_schema_error(&msg) {
warn!(
table,
error = %msg,
"skipping batch INSERT due to stale schema \
(column mismatch)"
);
if let Some(last) = batch_events.last() {
last_seq = Some(last.seq);
}
continue;
}
return Err(SyncError::Olap(msg));
}
true
}
Err(SyncError::UnsupportedType(ref reason)) => {
warn!(
table,
reason,
"falling back to SQL batch INSERT (unsupported Arrow type)"
);
false
}
Err(e) => return Err(e),
};
if !used_arrow {
let sql = match self.sync_mode {
SyncMode::Destructive => build_batch_insert(batch_events, &schema)?,
SyncMode::Temporal => {
build_temporal_batch_insert(batch_events, &schema)?
}
};
if let Err(e) = self.olap.execute(&sql).await {
let msg = e.to_string();
if is_stale_schema_error(&msg) {
warn!(
table,
error = %msg,
"skipping batch INSERT due to stale schema \
(column mismatch, SQL path)"
);
if let Some(last) = batch_events.last() {
last_seq = Some(last.seq);
}
continue;
}
return Err(SyncError::Olap(msg));
}
}
rows_inserted += batch_events.len() as u64;
if let Some(last) = batch_events.last() {
last_seq = Some(last.seq);
}
}
SyncOp::Single(event) => {
let schema = match self.schema_registry.get(&event.table) {
Ok(s) => s,
Err(_) => {
warn!(
table = event.table.as_str(),
"skipping CDC event for unregistered table"
);
last_seq = Some(event.seq);
continue;
}
};
let skip = match self.sync_mode {
SyncMode::Destructive => {
let dml = cdc_event_to_dml(event, &schema)?;
if dml.is_empty() {
last_seq = Some(event.seq);
continue;
}
match self.olap.execute(&dml).await {
Ok(_) => false,
Err(e) => {
let msg = e.to_string();
if is_stale_schema_error(&msg) {
warn!(
table = event.table.as_str(),
error = %msg,
"skipping event due to stale schema"
);
true
} else {
return Err(SyncError::Olap(msg));
}
}
}
}
SyncMode::Temporal => {
let stmts = cdc_event_to_temporal_dml(event, &schema)?;
if stmts.len() > 1 {
if let Some(insert_stmt) = stmts.last() {
let explain = format!("EXPLAIN {insert_stmt}");
if let Err(e) = self.olap.query(&explain).await {
let msg = e.to_string();
if is_stale_schema_error(&msg) {
warn!(
table = event.table.as_str(),
error = %msg,
"skipping temporal event due to stale schema \
(detected before execution)"
);
last_seq = Some(event.seq);
continue;
}
return Err(SyncError::Olap(msg));
}
}
}
let mut skipped = false;
for stmt in &stmts {
if let Err(e) = self.olap.execute(stmt).await {
let msg = e.to_string();
if is_stale_schema_error(&msg) {
warn!(
table = event.table.as_str(),
error = %msg,
"skipping temporal event due to stale schema"
);
skipped = true;
break;
}
return Err(SyncError::Olap(msg));
}
}
skipped
}
};
if skip {
last_seq = Some(event.seq);
continue;
}
match event.operation {
CdcOperation::Insert => rows_inserted += 1,
CdcOperation::Update => rows_updated += 1,
CdcOperation::Delete => rows_deleted += 1,
}
last_seq = Some(event.seq);
}
}
}
Ok::<(), SyncError>(())
}
.await;
match result {
Ok(()) => {
if use_transaction {
self.olap
.execute("COMMIT")
.await
.map_err(|e| SyncError::Olap(e.to_string()))?;
}
}
Err(e) => {
if use_transaction {
let _ = self.olap.execute("ROLLBACK").await;
return Err(e);
}
if let Some(seq) = last_seq {
self.last_synced_seq.store(seq, Ordering::Relaxed);
}
return Err(e);
}
}
if let Some(seq) = last_seq {
self.last_synced_seq.store(seq, Ordering::Relaxed);
}
let pruned_count = if self.prune_after_sync {
if let Some(seq) = last_seq {
match self.cdc.prune(seq).await {
Ok(count) => {
debug!(pruned = count, up_to_seq = seq, "pruned CDC events");
Some(count)
}
Err(e) => {
warn!(error = %e, "failed to prune CDC events");
None
}
}
} else {
None
}
} else {
None
};
let events_processed = events.len() as u64;
debug!(
events_processed,
rows_inserted, rows_updated, rows_deleted, "sync cycle complete"
);
#[cfg(feature = "metrics")]
{
metrics::counter!("rhei.sync.events_processed").increment(events_processed);
metrics::counter!("rhei.sync.rows_inserted").increment(rows_inserted);
metrics::counter!("rhei.sync.rows_updated").increment(rows_updated);
metrics::counter!("rhei.sync.rows_deleted").increment(rows_deleted);
if let Some(p) = pruned_count {
metrics::counter!("rhei.sync.rows_pruned").increment(p);
}
}
Ok(SyncResult {
events_processed,
rows_inserted,
rows_updated,
rows_deleted,
last_seq,
pruned_count,
})
}
async fn status(&self) -> Result<SyncStatus, Self::Error> {
let raw_seq = self.last_synced_seq.load(Ordering::Relaxed);
let last_synced = if raw_seq == NEVER_SYNCED {
None
} else {
Some(raw_seq)
};
let latest_available = self
.cdc
.latest_seq()
.await
.map_err(|e| SyncError::Cdc(e.to_string()))?;
let lag = match (last_synced, latest_available) {
(Some(synced), Some(available)) => (available - synced).max(0) as u64,
(None, Some(available)) => available.max(0) as u64,
_ => 0,
};
Ok(SyncStatus {
running: true,
last_synced_seq: last_synced,
latest_available_seq: latest_available,
lag,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use arrow::record_batch::RecordBatch;
    use rhei_core::types::{CdcEvent, CdcOperation};
    use rhei_core::{SchemaRegistry, TableSchema};
    use serde_json::json;

    /// CDC source that returns its events on the first poll and nothing afterwards.
    struct MockCdc {
        events: Mutex<Option<Vec<CdcEvent>>>,
    }
    impl MockCdc {
        fn new(events: Vec<CdcEvent>) -> Self {
            Self {
                events: Mutex::new(Some(events)),
            }
        }
    }
    impl rhei_core::CdcConsumer for MockCdc {
        type Error = crate::SyncError;
        async fn poll(
            &self,
            _after_seq: Option<i64>,
            _limit: u32,
        ) -> Result<Vec<CdcEvent>, Self::Error> {
            Ok(self.events.lock().unwrap().take().unwrap_or_default())
        }
        async fn latest_seq(&self) -> Result<Option<i64>, Self::Error> {
            Ok(None)
        }
        async fn prune(&self, _up_to_seq: i64) -> Result<u64, Self::Error> {
            Ok(0)
        }
    }

    /// OLAP engine that records every executed SQL statement and rejects
    /// `load_arrow`, which forces the SQL path.
    struct MockOlap {
        executed: Mutex<Vec<String>>,
    }
    impl MockOlap {
        fn new() -> Self {
            Self {
                executed: Mutex::new(Vec::new()),
            }
        }
        fn statements(&self) -> Vec<String> {
            self.executed.lock().unwrap().clone()
        }
    }
    impl rhei_core::OlapEngine for MockOlap {
        type Error = crate::SyncError;
        async fn query(&self, _sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
            Ok(vec![])
        }
        async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
            self.executed.lock().unwrap().push(sql.to_string());
            Ok(0)
        }
        async fn load_arrow(
            &self,
            _table: &str,
            _batches: &[RecordBatch],
        ) -> Result<u64, Self::Error> {
            Err(crate::SyncError::Olap(
                "load_arrow not supported in mock".into(),
            ))
        }
        async fn create_table(
            &self,
            _table_name: &str,
            _schema: &SchemaRef,
            _primary_key: &[String],
        ) -> Result<(), Self::Error> {
            Ok(())
        }
        async fn table_exists(&self, _table_name: &str) -> Result<bool, Self::Error> {
            Ok(true)
        }
        async fn add_column(
            &self,
            _table_name: &str,
            _column_name: &str,
            _data_type: &DataType,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
        async fn drop_column(
            &self,
            _table_name: &str,
            _column_name: &str,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    /// OLAP engine that accepts `load_arrow`, counting the loaded rows, and also
    /// records executed SQL statements.
    struct MockOlapArrow {
        loaded: Mutex<u64>,
        executed: Mutex<Vec<String>>,
    }
    impl MockOlapArrow {
        fn new() -> Self {
            Self {
                loaded: Mutex::new(0),
                executed: Mutex::new(Vec::new()),
            }
        }
    }
    impl rhei_core::OlapEngine for MockOlapArrow {
        type Error = crate::SyncError;
        async fn query(&self, _sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
            Ok(vec![])
        }
        async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
            self.executed.lock().unwrap().push(sql.to_string());
            Ok(0)
        }
        async fn load_arrow(
            &self,
            _table: &str,
            batches: &[RecordBatch],
        ) -> Result<u64, Self::Error> {
            let n: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
            *self.loaded.lock().unwrap() += n;
            Ok(n)
        }
        async fn create_table(
            &self,
            _table_name: &str,
            _schema: &SchemaRef,
            _primary_key: &[String],
        ) -> Result<(), Self::Error> {
            Ok(())
        }
        async fn table_exists(&self, _table_name: &str) -> Result<bool, Self::Error> {
            Ok(true)
        }
        async fn add_column(
            &self,
            _table_name: &str,
            _column_name: &str,
            _data_type: &DataType,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
        async fn drop_column(
            &self,
            _table_name: &str,
            _column_name: &str,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    /// Schema for table "events" with a Timestamp column.
    fn timestamp_schema() -> Arc<TableSchema> {
        use arrow::datatypes::TimeUnit;
        Arc::new(TableSchema::new(
            "events",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
                Field::new(
                    "created_at",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
            ])),
            vec!["id".to_string()],
        ))
    }

    /// Builds an INSERT event for table "events".
    fn make_insert_event(seq: i64, id: i64, name: &str) -> CdcEvent {
        CdcEvent {
            seq,
            timestamp: 1000 + seq,
            operation: CdcOperation::Insert,
            table: "events".into(),
            row_id: Some(id),
            old_data: None,
            new_data: Some(json!({"id": id, "name": name, "created_at": 1234567890})),
        }
    }

    #[tokio::test]
    async fn test_timestamp_schema_falls_back_to_sql() {
        use rhei_core::SyncEngine;
        let schema = timestamp_schema();
        let events = vec![
            make_insert_event(1, 1, "Alice"),
            make_insert_event(2, 2, "Bob"),
        ];
        let registry = SchemaRegistry::default();
        registry
            .register((*schema).clone())
            .expect("register schema");
        let cdc = MockCdc::new(events);
        let olap = MockOlap::new();
        let engine = CdcSyncEngine::new(cdc, olap, registry, 100);
        let result = engine.sync_once().await;
        let result = result.expect("sync_once should not error for Timestamp schema");
        assert_eq!(result.rows_inserted, 2, "both rows should be counted");
        let stmts = engine.olap.statements();
        let insert_stmts: Vec<_> = stmts
            .iter()
            .filter(|s| s.to_uppercase().contains("INSERT"))
            .collect();
        assert_eq!(
            insert_stmts.len(),
            1,
            "expected one batch INSERT via SQL fallback"
        );
        assert!(
            insert_stmts[0].contains("Alice") && insert_stmts[0].contains("Bob"),
            "batch INSERT should contain both rows"
        );
    }

    #[tokio::test]
    async fn test_supported_schema_uses_arrow_path() {
        use rhei_core::SyncEngine;
        let schema = Arc::new(TableSchema::new(
            "users",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
            ])),
            vec!["id".to_string()],
        ));
        let events = vec![
            CdcEvent {
                seq: 1,
                timestamp: 1000,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(1),
                old_data: None,
                new_data: Some(json!({"id": 1, "name": "Alice"})),
            },
            CdcEvent {
                seq: 2,
                timestamp: 1001,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(2),
                old_data: None,
                new_data: Some(json!({"id": 2, "name": "Bob"})),
            },
        ];
        let registry = SchemaRegistry::default();
        registry
            .register((*schema).clone())
            .expect("register schema");
        let cdc = MockCdc::new(events);
        let olap = MockOlapArrow::new();
        let engine = CdcSyncEngine::new(cdc, olap, registry, 100);
        let result = engine.sync_once().await.expect("sync_once should succeed");
        assert_eq!(result.rows_inserted, 2);
        let loaded = *engine.olap.loaded.lock().unwrap();
        assert_eq!(loaded, 2, "load_arrow should have received 2 rows");
        let executed = engine.olap.executed.lock().unwrap().clone();
        let has_insert = executed.iter().any(|s| s.to_uppercase().contains("INSERT"));
        assert!(
            !has_insert,
            "SQL INSERT should not be called when Arrow path succeeds"
        );
    }

    #[tokio::test]
    async fn test_unregistered_table_is_skipped_and_cursor_persists() {
        use rhei_core::SyncEngine;
        let events = vec![
            make_insert_event(1, 1, "Alice"),
            make_insert_event(2, 2, "Bob"),
        ];
        let cdc = MockCdc::new(events);
        let olap = MockOlap::new();
        let engine = CdcSyncEngine::new(cdc, olap, SchemaRegistry::default(), 100);

        let first = engine
            .sync_once()
            .await
            .expect("unregistered tables are skipped, not errors");
        assert_eq!(first.events_processed, 2);
        assert_eq!(first.rows_inserted, 0, "skipped events are not counted");
        assert_eq!(first.last_seq, Some(2), "cursor advances past skipped events");
        let inserts = engine
            .olap
            .statements()
            .iter()
            .filter(|s| s.to_uppercase().contains("INSERT"))
            .count();
        assert_eq!(inserts, 0, "no INSERT should reach the OLAP engine");

        // MockCdc yields nothing on the second poll; the stored cursor is reported.
        let second = engine.sync_once().await.expect("empty poll should succeed");
        assert_eq!(second.events_processed, 0);
        assert_eq!(second.last_seq, Some(2), "cursor should persist across cycles");
    }

    #[test]
    fn test_is_stale_schema_error_classification() {
        assert!(is_stale_schema_error("Binder Error: column \"age\" not found"));
        assert!(is_stale_schema_error("No field named age"));
        assert!(is_stale_schema_error("schema mismatch for table users"));
        assert!(is_stale_schema_error("column \"age\" not in INSERT column list"));
        assert!(!is_stale_schema_error("table users not found"));
        assert!(!is_stale_schema_error("connection reset by peer"));
    }

    #[test]
    fn test_group_events_batches_only_consecutive_inserts_on_same_table() {
        let mut update = make_insert_event(3, 1, "Alice2");
        update.operation = CdcOperation::Update;
        let mut other = make_insert_event(4, 3, "Carol");
        other.table = "other".into();
        let events = vec![
            make_insert_event(1, 1, "Alice"),
            make_insert_event(2, 2, "Bob"),
            update,
            other,
            make_insert_event(5, 4, "Dave"),
        ];
        let ops = group_events(&events);
        let shape: Vec<(String, Vec<i64>)> = ops
            .iter()
            .map(|op| match op {
                SyncOp::BatchInsert { table, events } => (
                    format!("batch:{table}"),
                    events.iter().map(|e| e.seq).collect(),
                ),
                SyncOp::Single(e) => (format!("single:{}", e.table), vec![e.seq]),
            })
            .collect();
        assert_eq!(
            shape,
            vec![
                ("batch:events".to_string(), vec![1, 2]),
                ("single:events".to_string(), vec![3]),
                ("batch:other".to_string(), vec![4]),
                ("batch:events".to_string(), vec![5]),
            ]
        );
    }
}