use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use arrow::array::Array;
use arrow::record_batch::RecordBatch;
use tokio::sync::Mutex;
use tracing::{debug, warn};
use rhei_core::types::{CdcEvent, CdcOperation};
use crate::config::{DeleteDetection, TimestampCdcConfig, TimestampTableConfig};
use crate::error::SidecarError;
use crate::source::SourceConnector;
use crate::watermark::{NullWatermarkStore, Watermark, WatermarkStore};
/// Immutable copy of the consumer's mutable state (per-table watermarks plus
/// the global sequence counter) captured at the start of a poll, so both can
/// be rolled back if persisting the advanced watermarks fails.
struct PollSnapshot {
    // Per-table watermarks as they were before the poll advanced them.
    watermarks: HashMap<String, Watermark>,
    // Value of `global_seq` before the poll allocated any sequence numbers.
    global_seq: i64,
}
/// CDC consumer that synthesizes change events by polling source tables on an
/// `updated_at` timestamp column, using per-table watermarks (timestamp +
/// last-seen primary key) to page through changes incrementally.
pub struct TimestampCdcConsumer<S: SourceConnector> {
    // Source connection; shared so queries can run on the blocking thread pool.
    conn: Arc<Mutex<S>>,
    // Table list, batch size, and delete-detection strategy.
    config: TimestampCdcConfig,
    // Per-table poll position; the lock also serializes concurrent polls.
    watermarks: Mutex<HashMap<String, Watermark>>,
    // Next sequence number to hand out (starts at 1; see `latest_seq`).
    global_seq: AtomicI64,
    // Durable store for watermarks + global_seq; may be a no-op NullWatermarkStore.
    watermark_store: Box<dyn WatermarkStore>,
}
impl<S: SourceConnector> TimestampCdcConsumer<S> {
pub fn new(conn: S, config: TimestampCdcConfig) -> Self {
Self::with_watermark_store(conn, config, Box::new(NullWatermarkStore))
}
pub fn with_watermark_store(
conn: S,
config: TimestampCdcConfig,
store: Box<dyn WatermarkStore>,
) -> Self {
let persisted_seq = match store.load_global_seq() {
Ok(seq) => seq,
Err(e) => {
warn!(error = %e, "failed to load global_seq from watermark store; resetting to 1 (may cause duplicate events)");
1
}
};
let mut watermarks = HashMap::new();
for table in &config.tables {
let wm = match store.load(&table.table_name) {
Ok(Some(wm)) => wm,
Ok(None) => Watermark::new(),
Err(e) => {
warn!(
table = %table.table_name,
error = %e,
"failed to load watermark from store; resetting to zero (may cause duplicate events)"
);
Watermark::new()
}
};
watermarks.insert(table.table_name.clone(), wm);
}
Self {
conn: Arc::new(Mutex::new(conn)),
config,
watermarks: Mutex::new(watermarks),
global_seq: AtomicI64::new(persisted_seq),
watermark_store: store,
}
}
pub fn try_with_watermark_store(
conn: S,
config: TimestampCdcConfig,
store: Box<dyn WatermarkStore>,
) -> Result<Self, SidecarError> {
let persisted_seq = store.load_global_seq()?;
let mut watermarks = HashMap::new();
for table in &config.tables {
let wm = store
.load(&table.table_name)?
.unwrap_or_else(Watermark::new);
watermarks.insert(table.table_name.clone(), wm);
}
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
config,
watermarks: Mutex::new(watermarks),
global_seq: AtomicI64::new(persisted_seq),
watermark_store: store,
})
}
/// Builds the incremental poll query for one table.
///
/// Rows are selected strictly after the watermark. When the watermark carries
/// a last-seen primary key of the right arity, the predicate becomes a keyset
/// cursor over (updated_at, pk tuple) so that rows sharing the same timestamp
/// are paginated deterministically.
///
/// NOTE(review): table/column identifiers come from trusted configuration and
/// are interpolated verbatim — no quoting or escaping is applied.
pub(crate) fn build_poll_query(
    table_config: &TimestampTableConfig,
    wm: &Watermark,
    limit: u32,
) -> String {
    let columns = match table_config.columns.as_slice() {
        [] => "*".to_string(),
        cols => cols.join(", "),
    };
    let updated_at = &table_config.updated_at_column;
    let pk_cols = &table_config.primary_key;
    // Keyset predicate: strictly-later timestamp, or equal timestamp with a
    // strictly-greater PK tuple. Falls back to timestamp-only when the stored
    // PK arity doesn't match the configured key.
    let where_clause = if let Some(last_pk_values) =
        wm.last_pk.as_ref().filter(|v| v.len() == pk_cols.len())
    {
        let pk_tuple = pk_cols.join(", ");
        let literals: Vec<String> = last_pk_values
            .iter()
            .map(|v| pk_to_sql_literal(v))
            .collect();
        let val_tuple = literals.join(", ");
        format!(
            "WHERE {updated_at} > {ts} OR ({updated_at} = {ts} AND ({pk_tuple}) > ({val_tuple}))",
            ts = wm.timestamp,
        )
    } else {
        format!("WHERE {updated_at} > {}", wm.timestamp)
    };
    // Order must match the cursor: timestamp first, then each PK column.
    let mut order_cols = vec![format!("{updated_at} ASC")];
    order_cols.extend(pk_cols.iter().map(|pk| format!("{pk} ASC")));
    let order_by = order_cols.join(", ");
    format!(
        "SELECT {columns} FROM {} {where_clause} ORDER BY {order_by} LIMIT {limit}",
        table_config.table_name,
    )
}
/// Builds the soft-delete detection query for one table: rows whose
/// delete-marker column is non-NULL and newer than the watermark timestamp,
/// ordered by (delete column, primary key) for deterministic paging.
///
/// NOTE(review): identifiers are interpolated verbatim from trusted
/// configuration, same as `build_poll_query`.
pub(crate) fn build_soft_delete_query(
    table_config: &TimestampTableConfig,
    delete_col: &str,
    wm: &Watermark,
    limit: u32,
) -> String {
    let columns = if table_config.columns.is_empty() {
        "*".to_string()
    } else {
        table_config.columns.join(", ")
    };
    let mut order_parts = Vec::with_capacity(1 + table_config.primary_key.len());
    order_parts.push(format!("{delete_col} ASC"));
    for pk in &table_config.primary_key {
        order_parts.push(format!("{pk} ASC"));
    }
    let order_by = order_parts.join(", ");
    format!(
        "SELECT {columns} FROM {} WHERE {delete_col} IS NOT NULL AND {delete_col} > {} ORDER BY {order_by} LIMIT {limit}",
        table_config.table_name,
        wm.timestamp,
    )
}
/// Reads the primary-key column values out of a decoded row, in configured
/// PK-column order. A missing column yields the literal string "null"
/// (matching `serde_json::Value::Null.to_string()`), keeping tuple arity
/// stable for the keyset cursor.
fn extract_pk_values(
    row_data: &serde_json::Map<String, serde_json::Value>,
    pk_cols: &[String],
) -> Vec<String> {
    let mut values = Vec::with_capacity(pk_cols.len());
    for col in pk_cols {
        let rendered = match row_data.get(col) {
            Some(v) => v.to_string(),
            None => "null".to_string(),
        };
        values.push(rendered);
    }
    values
}
/// Converts one row of an Arrow `RecordBatch` into a `CdcEvent`.
///
/// Operation heuristic: a row whose `created_at` equals `updated_at` is
/// treated as an Insert, otherwise as an Update. Timestamp polling cannot
/// observe the previous row image, so `old_data` is always `None`.
///
/// Missing or non-integer timestamp columns fall back to 0; `row_id` is
/// taken from the first primary-key column when present and integral.
fn row_to_event(
    batch: &RecordBatch,
    row_idx: usize,
    table_config: &TimestampTableConfig,
    seq: i64,
) -> Result<CdcEvent, SidecarError> {
    let schema = batch.schema();
    // Decode every column of the row into a JSON object keyed by column name.
    let mut row_data = serde_json::Map::new();
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let col = batch.column(col_idx);
        let val = array_value_to_json(col, row_idx);
        row_data.insert(field.name().clone(), val);
    }
    let ts_val = row_data
        .get(&table_config.updated_at_column)
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    let created_at_val = row_data
        .get(&table_config.created_at_column)
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    let operation = if created_at_val == ts_val {
        CdcOperation::Insert
    } else {
        CdcOperation::Update
    };
    // `.first()` rather than `[0]`: an (invalid) empty primary-key
    // configuration degrades to `row_id = None` instead of panicking.
    let row_id = table_config
        .primary_key
        .first()
        .and_then(|pk| row_data.get(pk))
        .and_then(|v| v.as_i64());
    Ok(CdcEvent {
        seq,
        timestamp: ts_val,
        operation,
        table: table_config.table_name.clone(),
        row_id,
        old_data: None,
        new_data: Some(serde_json::Value::Object(row_data)),
    })
}
/// Runs a SQL query on the shared source connection.
///
/// The connector is synchronous, so the query executes on tokio's blocking
/// thread pool via `spawn_blocking`; `blocking_lock` is used there because
/// the closure runs outside any async context. The `map_err` converts a
/// task-join failure (panic or cancellation) into `SidecarError::Join`; the
/// trailing `?` unwraps that outer join result, leaving the inner
/// `conn.query` result as the function's return value.
async fn exec_query(&self, sql: String) -> Result<Vec<RecordBatch>, SidecarError> {
    let conn = self.conn.clone();
    tokio::task::spawn_blocking(move || {
        let mut conn = conn.blocking_lock();
        conn.query(&sql)
    })
    .await
    .map_err(|e| SidecarError::Join(e.to_string()))?
}
/// Durably saves every table watermark, then the global sequence counter.
///
/// Not atomic: a mid-way failure can leave some tables saved and others not.
/// The caller (`poll`) compensates by rolling back its in-memory state so the
/// next poll re-reads from the old position.
fn persist_watermarks(
    &self,
    watermarks: &HashMap<String, Watermark>,
    seq: i64,
) -> Result<(), SidecarError> {
    watermarks
        .iter()
        .try_for_each(|(table, wm)| self.watermark_store.save(table, wm))?;
    self.watermark_store.save_global_seq(seq)
}
}
/// Renders a primary-key value (in the string form produced by
/// `extract_pk_values`, i.e. `serde_json::Value::to_string`) as a SQL literal.
///
/// Numeric-looking values are emitted bare; everything else is single-quoted
/// with embedded quotes doubled. Two fixes over the naive version:
/// - Non-finite floats ("NaN", "inf", "infinity") successfully parse as
///   `f64` but are not valid SQL numeric literals, so they must be quoted.
/// - Surrounding double quotes (the JSON string serialization) are stripped
///   as at most one balanced pair; `trim_matches('"')` would strip entire
///   runs of quotes and could also strip a lone unbalanced quote.
fn pk_to_sql_literal(pk: &str) -> String {
    let is_numeric =
        pk.parse::<i64>().is_ok() || pk.parse::<f64>().map_or(false, f64::is_finite);
    if is_numeric {
        return pk.to_string();
    }
    let unquoted = pk
        .strip_prefix('"')
        .and_then(|s| s.strip_suffix('"'))
        .unwrap_or(pk);
    format!("'{}'", unquoted.replace('\'', "''"))
}
fn array_value_to_json(array: &dyn Array, idx: usize) -> serde_json::Value {
use arrow::array::*;
if array.is_null(idx) {
return serde_json::Value::Null;
}
if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
serde_json::Value::Number(arr.value(idx).into())
} else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
serde_json::Value::Number((arr.value(idx) as i64).into())
} else if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
serde_json::Value::Number((arr.value(idx) as i64).into())
} else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
serde_json::json!(arr.value(idx))
} else if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
serde_json::json!(arr.value(idx) as f64)
} else if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
serde_json::Value::String(arr.value(idx).to_string())
} else if let Some(arr) = array.as_any().downcast_ref::<LargeStringArray>() {
serde_json::Value::String(arr.value(idx).to_string())
} else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
serde_json::Value::Bool(arr.value(idx))
} else {
warn!(data_type = ?array.data_type(), "unsupported Arrow type in array_value_to_json, returning null");
serde_json::Value::Null
}
}
impl<S: SourceConnector> rhei_core::CdcConsumer for TimestampCdcConsumer<S> {
type Error = SidecarError;
async fn poll(
&self,
_after_seq: Option<i64>,
limit: u32,
) -> Result<Vec<CdcEvent>, Self::Error> {
let mut all_events = Vec::new();
let mut watermarks = self.watermarks.lock().await;
let snapshot = PollSnapshot {
watermarks: watermarks.clone(),
global_seq: self.global_seq.load(Ordering::Relaxed),
};
let mut next_seq = snapshot.global_seq;
for table_config in &self.config.tables {
let wm = watermarks
.get(&table_config.table_name)
.cloned()
.unwrap_or_else(Watermark::new);
let query = Self::build_poll_query(table_config, &wm, limit);
debug!(table = %table_config.table_name, query = %query, "polling external source");
let batches = self.exec_query(query).await?;
let mut last_ts = wm.timestamp;
let mut last_pk: Option<Vec<String>> = wm.last_pk.clone();
let events_before = all_events.len();
for batch in &batches {
for row in 0..batch.num_rows() {
let seq = self.global_seq.fetch_add(1, Ordering::Relaxed);
next_seq = seq + 1;
let event = Self::row_to_event(batch, row, table_config, seq)?;
last_ts = event.timestamp;
if let Some(ref new_data) = event.new_data {
let pk_values = Self::extract_pk_values(
new_data.as_object().unwrap(),
&table_config.primary_key,
);
last_pk = Some(pk_values);
}
all_events.push(event);
}
}
let table_had_events = all_events.len() > events_before;
if table_had_events {
if let Some(pk_values) = last_pk {
watermarks
.entry(table_config.table_name.clone())
.or_insert_with(Watermark::new)
.advance(last_ts, pk_values);
}
}
if let DeleteDetection::FullDiff { .. } = self.config.delete_detection {
warn!(table = %table_config.table_name, "FullDiff delete detection is not yet implemented; no deletes will be detected");
}
if let DeleteDetection::SoftDelete { ref column } = self.config.delete_detection {
let delete_query = Self::build_soft_delete_query(table_config, column, &wm, limit);
let delete_batches = self.exec_query(delete_query).await?;
for batch in &delete_batches {
let del_schema = batch.schema();
for row in 0..batch.num_rows() {
let seq = self.global_seq.fetch_add(1, Ordering::Relaxed);
next_seq = seq + 1;
let mut row_data = serde_json::Map::new();
for (col_idx, field) in del_schema.fields().iter().enumerate() {
let val = array_value_to_json(batch.column(col_idx), row);
row_data.insert(field.name().clone(), val);
}
let row_id = row_data
.get(&table_config.primary_key[0])
.and_then(|v| v.as_i64());
let delete_ts = row_data
.get(column)
.and_then(|v| v.as_i64())
.unwrap_or(wm.timestamp);
all_events.push(CdcEvent {
seq,
timestamp: delete_ts,
operation: CdcOperation::Delete,
table: table_config.table_name.clone(),
row_id,
old_data: Some(serde_json::Value::Object(row_data)),
new_data: None,
});
}
}
}
}
if !all_events.is_empty() {
if let Err(e) = self.persist_watermarks(&watermarks, next_seq) {
*watermarks = snapshot.watermarks;
self.global_seq
.store(snapshot.global_seq, Ordering::Relaxed);
return Err(e);
}
}
debug!(events = all_events.len(), "polled external source");
Ok(all_events)
}
/// Highest sequence number handed out so far, or `None` if no event has been
/// emitted yet (`global_seq` starts at 1 and always holds the *next* seq).
async fn latest_seq(&self) -> Result<Option<i64>, Self::Error> {
    let next = self.global_seq.load(Ordering::Relaxed);
    Ok(if next > 1 { Some(next - 1) } else { None })
}
/// Pruning is a no-op: this consumer reads straight from the source tables
/// and keeps no event log of its own, so there is nothing to delete.
async fn prune(&self, _up_to_seq: i64) -> Result<u64, Self::Error> {
    Ok(0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::*;
/// Single-column-PK table config used by most tests.
fn test_table_config() -> TimestampTableConfig {
    TimestampTableConfig {
        table_name: "events".to_string(),
        created_at_column: "created_at".to_string(),
        updated_at_column: "updated_at".to_string(),
        primary_key: vec!["id".to_string()],
        // Empty column list means "SELECT *".
        columns: vec![],
    }
}
/// Composite-PK table config for keyset-cursor tests.
fn composite_pk_table_config() -> TimestampTableConfig {
    TimestampTableConfig {
        table_name: "order_items".to_string(),
        created_at_column: "created_at".to_string(),
        updated_at_column: "updated_at".to_string(),
        primary_key: vec!["order_id".to_string(), "item_id".to_string()],
        // Empty column list means "SELECT *".
        columns: vec![],
    }
}
// Concrete consumer type for tests; SQLite is the only connector wired up here,
// so this alias (and everything using it) requires the "sqlite" feature.
#[cfg(feature = "sqlite")]
type TestConsumer = TimestampCdcConsumer<connector_arrow::rusqlite::SQLiteConnection>;
/// The poll query starts timestamp-only from a zero watermark, then upgrades
/// to a keyset (timestamp, pk) predicate once the watermark carries a last PK.
// Gated on "sqlite" because `TestConsumer` is only defined with that feature;
// without the gate the test build fails when the feature is disabled.
#[cfg(feature = "sqlite")]
#[test]
fn test_poll_query_generation() {
    let config = test_table_config();
    let wm = Watermark::new();
    let query = TestConsumer::build_poll_query(&config, &wm, 100);
    assert_eq!(
        query,
        "SELECT * FROM events WHERE updated_at > 0 ORDER BY updated_at ASC, id ASC LIMIT 100"
    );
    let mut wm2 = Watermark::new();
    wm2.advance(1000, vec!["42".to_string()]);
    let query2 = TestConsumer::build_poll_query(&config, &wm2, 50);
    assert_eq!(
        query2,
        "SELECT * FROM events WHERE updated_at > 1000 OR (updated_at = 1000 AND (id) > (42)) ORDER BY updated_at ASC, id ASC LIMIT 50"
    );
}
/// Composite primary keys produce a tuple comparison in the keyset predicate
/// and appear, in order, in the ORDER BY clause.
// Gated on "sqlite" because `TestConsumer` is only defined with that feature.
#[cfg(feature = "sqlite")]
#[test]
fn test_poll_query_composite_pk() {
    let config = composite_pk_table_config();
    let wm = Watermark::new();
    let query = TestConsumer::build_poll_query(&config, &wm, 100);
    assert_eq!(
        query,
        "SELECT * FROM order_items WHERE updated_at > 0 ORDER BY updated_at ASC, order_id ASC, item_id ASC LIMIT 100"
    );
    let mut wm2 = Watermark::new();
    wm2.advance(1000, vec!["10".to_string(), "3".to_string()]);
    let query2 = TestConsumer::build_poll_query(&config, &wm2, 50);
    assert_eq!(
        query2,
        "SELECT * FROM order_items WHERE updated_at > 1000 OR (updated_at = 1000 AND (order_id, item_id) > (10, 3)) ORDER BY updated_at ASC, order_id ASC, item_id ASC LIMIT 50"
    );
}
/// JSON-serialized string PK values (wrapped in double quotes) are rendered
/// as single-quoted SQL literals; numeric values stay bare.
// Gated on "sqlite" because `TestConsumer` is only defined with that feature.
#[cfg(feature = "sqlite")]
#[test]
fn test_poll_query_composite_pk_string_values() {
    let config = composite_pk_table_config();
    let mut wm = Watermark::new();
    wm.advance(500, vec!["\"tenant_a\"".to_string(), "42".to_string()]);
    let query = TestConsumer::build_poll_query(&config, &wm, 10);
    assert!(query.contains("('tenant_a', 42)"));
}
/// An explicit column list replaces the default `SELECT *` projection.
// Gated on "sqlite" because `TestConsumer` is only defined with that feature.
#[cfg(feature = "sqlite")]
#[test]
fn test_poll_query_with_columns() {
    let mut config = test_table_config();
    config.columns = vec![
        "id".to_string(),
        "name".to_string(),
        "updated_at".to_string(),
    ];
    let wm = Watermark::new();
    let query = TestConsumer::build_poll_query(&config, &wm, 10);
    assert!(query.starts_with("SELECT id, name, updated_at FROM events"));
}
/// Soft-delete query filters on a non-NULL delete marker newer than the
/// watermark and orders by (delete column, pk).
// Gated on "sqlite" because `TestConsumer` is only defined with that feature.
#[cfg(feature = "sqlite")]
#[test]
fn test_soft_delete_query() {
    let config = test_table_config();
    let wm = Watermark::new();
    let query = TestConsumer::build_soft_delete_query(&config, "deleted_at", &wm, 100);
    assert!(query.contains("deleted_at IS NOT NULL"));
    assert!(query.contains("deleted_at > 0"));
    assert!(query.contains("ORDER BY deleted_at ASC, id ASC"));
}
/// Soft-delete ordering includes every composite PK column after the marker.
// Gated on "sqlite" because `TestConsumer` is only defined with that feature.
#[cfg(feature = "sqlite")]
#[test]
fn test_soft_delete_query_composite_pk() {
    let config = composite_pk_table_config();
    let wm = Watermark::new();
    let query = TestConsumer::build_soft_delete_query(&config, "deleted_at", &wm, 50);
    assert!(query.contains("ORDER BY deleted_at ASC, order_id ASC, item_id ASC"));
}
/// `row_to_event` classifies created_at == updated_at as Insert, otherwise
/// Update.
// Gated on "sqlite" because `TestConsumer` is only defined with that feature.
// The duplicated schema/batch construction is collapsed into one helper.
#[cfg(feature = "sqlite")]
#[test]
fn test_insert_vs_update_heuristic() {
    use arrow::datatypes::{DataType, Field, Schema};
    // Builds a one-row (id=1) batch with the given created_at/updated_at.
    let make_batch = |created: i64, updated: i64| {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("created_at", DataType::Int64, false),
            Field::new("updated_at", DataType::Int64, false),
        ]);
        RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![1])),
                Arc::new(arrow::array::Int64Array::from(vec![created])),
                Arc::new(arrow::array::Int64Array::from(vec![updated])),
            ],
        )
        .unwrap()
    };
    let config = test_table_config();
    let event = TestConsumer::row_to_event(&make_batch(1000, 1000), 0, &config, 1).unwrap();
    assert_eq!(event.operation, CdcOperation::Insert);
    let event2 = TestConsumer::row_to_event(&make_batch(1000, 2000), 0, &config, 2).unwrap();
    assert_eq!(event2.operation, CdcOperation::Update);
}
/// `prune` must report zero deleted events: the consumer keeps no log.
#[cfg(feature = "sqlite")]
#[test]
fn test_prune_is_noop() {
    // Current-thread runtime is enough: a single block_on, no parallelism.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let conn = connector_arrow::rusqlite::SQLiteConnection::new(
        connector_arrow::rusqlite::rusqlite::Connection::open_in_memory().unwrap(),
    );
    let config = TimestampCdcConfig {
        tables: vec![],
        poll_batch_size: 100,
        delete_detection: DeleteDetection::Disabled,
    };
    let consumer = TimestampCdcConsumer::new(conn, config);
    rt.block_on(async {
        use rhei_core::CdcConsumer;
        let pruned = consumer.prune(999).await.unwrap();
        assert_eq!(pruned, 0);
    });
}
/// End-to-end check that watermarks survive a "restart": a second consumer
/// built over the same RocksDB store must skip already-seen rows and pick up
/// only rows newer than the persisted watermark.
#[cfg(feature = "sqlite")]
#[tokio::test]
async fn test_watermark_persistence_across_restarts() {
    use crate::watermark::RocksDbWatermarkStore;
    let dir = tempfile::tempdir().unwrap();
    let wm_path = dir.path().join("wm_restart_test");
    let wm_path_str = wm_path.to_str().unwrap().to_string();
    // First "process": two pre-existing rows.
    let raw_conn = connector_arrow::rusqlite::rusqlite::Connection::open_in_memory().unwrap();
    raw_conn
        .execute_batch(
            "CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT, created_at INTEGER, updated_at INTEGER);
INSERT INTO events VALUES (1, 'a', 100, 100);
INSERT INTO events VALUES (2, 'b', 200, 200);",
        )
        .unwrap();
    let sqlite_conn = connector_arrow::rusqlite::SQLiteConnection::new(raw_conn);
    let table_config = test_table_config();
    let cdc_config = TimestampCdcConfig {
        tables: vec![table_config],
        poll_batch_size: 100,
        delete_detection: DeleteDetection::Disabled,
    };
    let store = Box::new(RocksDbWatermarkStore::open(&wm_path_str).unwrap());
    let consumer =
        TimestampCdcConsumer::with_watermark_store(sqlite_conn, cdc_config.clone(), store);
    let events = {
        use rhei_core::CdcConsumer;
        consumer.poll(None, 100).await.unwrap()
    };
    // First poll sees both seed rows and persists watermark ts=200.
    assert_eq!(events.len(), 2);
    drop(consumer);
    // Second "process": same data in a fresh in-memory DB, same watermark store.
    let raw_conn2 = connector_arrow::rusqlite::rusqlite::Connection::open_in_memory().unwrap();
    raw_conn2
        .execute_batch(
            "CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT, created_at INTEGER, updated_at INTEGER);
INSERT INTO events VALUES (1, 'a', 100, 100);
INSERT INTO events VALUES (2, 'b', 200, 200);",
        )
        .unwrap();
    let sqlite_conn2 = connector_arrow::rusqlite::SQLiteConnection::new(raw_conn2);
    let store2 = Box::new(RocksDbWatermarkStore::open(&wm_path_str).unwrap());
    let consumer2 =
        TimestampCdcConsumer::with_watermark_store(sqlite_conn2, cdc_config.clone(), store2);
    let events2 = {
        use rhei_core::CdcConsumer;
        consumer2.poll(None, 100).await.unwrap()
    };
    // Restored watermark filters out the already-seen rows.
    assert_eq!(events2.len(), 0);
    {
        // Insert a newer row through the consumer's own connection handle
        // (spawn_blocking + blocking_lock, mirroring exec_query).
        let conn_guard = consumer2.conn.clone();
        tokio::task::spawn_blocking(move || {
            let mut conn = conn_guard.blocking_lock();
            conn.query("INSERT INTO events VALUES (3, 'c', 300, 300)")
                .ok();
        })
        .await
        .unwrap();
    }
    let events3 = {
        use rhei_core::CdcConsumer;
        consumer2.poll(None, 100).await.unwrap()
    };
    // Only the row newer than the persisted watermark is emitted.
    assert_eq!(events3.len(), 1);
    assert_eq!(events3[0].table, "events");
}
/// Watermark store whose save paths always fail, used to exercise poll's
/// rollback-on-persist-failure behavior. Loads delegate to the inner store.
struct FailingSaveStore {
    inner: Box<dyn WatermarkStore>,
}
impl FailingSaveStore {
    fn new(inner: Box<dyn WatermarkStore>) -> Self {
        Self { inner }
    }
}
impl WatermarkStore for FailingSaveStore {
    fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError> {
        self.inner.load(table_name)
    }
    // Injected failure: every watermark save errors.
    fn save(&self, _table_name: &str, _wm: &Watermark) -> Result<(), SidecarError> {
        Err(SidecarError::WatermarkStore(
            "injected save failure".to_string(),
        ))
    }
    fn load_global_seq(&self) -> Result<i64, SidecarError> {
        self.inner.load_global_seq()
    }
    // Injected failure: the global-seq save errors too.
    fn save_global_seq(&self, _seq: i64) -> Result<(), SidecarError> {
        Err(SidecarError::WatermarkStore(
            "injected save_global_seq failure".to_string(),
        ))
    }
}
/// Watermark store whose load paths always fail (simulating a corrupt store),
/// used to contrast permissive vs fail-fast constructors. Saves succeed.
struct FailingLoadStore;
impl WatermarkStore for FailingLoadStore {
    // Injected failure: watermark loads error as if the store were corrupt.
    fn load(&self, _table_name: &str) -> Result<Option<Watermark>, SidecarError> {
        Err(SidecarError::WatermarkStore(
            "injected load failure (corrupt store)".to_string(),
        ))
    }
    fn save(&self, _table_name: &str, _wm: &Watermark) -> Result<(), SidecarError> {
        Ok(())
    }
    // Injected failure: global-seq load errors as well.
    fn load_global_seq(&self) -> Result<i64, SidecarError> {
        Err(SidecarError::WatermarkStore(
            "injected load_global_seq failure (corrupt store)".to_string(),
        ))
    }
    fn save_global_seq(&self, _seq: i64) -> Result<(), SidecarError> {
        Ok(())
    }
}
/// When persisting watermarks fails, poll must return the error AND roll back
/// both the in-memory watermarks and global_seq to their pre-poll values, so
/// the next poll re-reads the same rows (at-least-once).
#[cfg(feature = "sqlite")]
#[tokio::test]
async fn test_poll_rollback_on_persist_failure() {
    use crate::watermark::NullWatermarkStore;
    use rhei_core::CdcConsumer;
    let raw_conn = connector_arrow::rusqlite::rusqlite::Connection::open_in_memory().unwrap();
    raw_conn
        .execute_batch(
            "CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT, created_at INTEGER, updated_at INTEGER);
INSERT INTO events VALUES (1, 'a', 100, 100);
INSERT INTO events VALUES (2, 'b', 200, 200);",
        )
        .unwrap();
    let sqlite_conn = connector_arrow::rusqlite::SQLiteConnection::new(raw_conn);
    let table_config = test_table_config();
    let cdc_config = TimestampCdcConfig {
        tables: vec![table_config],
        poll_batch_size: 100,
        delete_detection: DeleteDetection::Disabled,
    };
    // Loads succeed (delegated to NullWatermarkStore) but every save fails.
    let failing_store = Box::new(FailingSaveStore::new(Box::new(NullWatermarkStore)));
    let consumer =
        TimestampCdcConsumer::with_watermark_store(sqlite_conn, cdc_config, failing_store);
    let first_result = consumer.poll(None, 100).await;
    assert!(
        first_result.is_err(),
        "poll should propagate the persistence error"
    );
    // In-memory watermark must be back at its zero (pre-poll) state.
    let wm_guard = consumer.watermarks.lock().await;
    let events_wm = wm_guard
        .get("events")
        .cloned()
        .unwrap_or_else(Watermark::new);
    assert_eq!(
        events_wm.timestamp, 0,
        "watermark timestamp should be rolled back to pre-poll value"
    );
    assert!(
        events_wm.last_pk.is_none(),
        "watermark last_pk should be rolled back to pre-poll value"
    );
    drop(wm_guard);
    // And the sequence counter must be restored to its initial value of 1.
    assert_eq!(
        consumer.global_seq.load(Ordering::Relaxed),
        1,
        "global_seq should be rolled back to pre-poll value"
    );
}
/// The fail-fast constructor must surface a corrupt-store load error instead
/// of silently resetting watermarks.
#[cfg(feature = "sqlite")]
#[test]
fn test_try_with_watermark_store_fails_on_corrupt_load() {
    let raw_conn = connector_arrow::rusqlite::rusqlite::Connection::open_in_memory().unwrap();
    let sqlite_conn = connector_arrow::rusqlite::SQLiteConnection::new(raw_conn);
    let table_config = test_table_config();
    let cdc_config = TimestampCdcConfig {
        tables: vec![table_config],
        poll_batch_size: 100,
        delete_detection: DeleteDetection::Disabled,
    };
    let result = TimestampCdcConsumer::try_with_watermark_store(
        sqlite_conn,
        cdc_config,
        Box::new(FailingLoadStore),
    );
    assert!(
        result.is_err(),
        "try_with_watermark_store must return Err on corrupt watermark load"
    );
    // NOTE(review): relies on SidecarError::WatermarkStore's Display output
    // containing "watermark store" — defined in crate::error, not visible here.
    let err_str = result.err().unwrap().to_string();
    assert!(
        err_str.contains("watermark store"),
        "error message should mention watermark store, got: {err_str}"
    );
}
/// The permissive constructor must tolerate a corrupt store by falling back
/// to a zero watermark and the initial sequence (seq = 1).
#[cfg(feature = "sqlite")]
#[test]
fn test_with_watermark_store_falls_back_on_corrupt_load() {
    let raw_conn = connector_arrow::rusqlite::rusqlite::Connection::open_in_memory().unwrap();
    let sqlite_conn = connector_arrow::rusqlite::SQLiteConnection::new(raw_conn);
    let table_config = test_table_config();
    let cdc_config = TimestampCdcConfig {
        tables: vec![table_config],
        poll_batch_size: 100,
        delete_detection: DeleteDetection::Disabled,
    };
    let consumer = TimestampCdcConsumer::with_watermark_store(
        sqlite_conn,
        cdc_config,
        Box::new(FailingLoadStore),
    );
    // Runtime needed only to await the async watermarks mutex.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let wm_guard = consumer.watermarks.lock().await;
        let events_wm = wm_guard
            .get("events")
            .cloned()
            .unwrap_or_else(Watermark::new);
        assert_eq!(
            events_wm.timestamp, 0,
            "permissive fallback should use zero watermark"
        );
    });
    assert_eq!(
        consumer.global_seq.load(Ordering::Relaxed),
        1,
        "permissive fallback should use seq=1"
    );
}
}