use std::{
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
thread::sleep,
time::Duration,
};
use reifydb_cdc::consume::{
checkpoint::CdcCheckpoint,
consumer::{CdcConsume, CdcConsumer},
poll::{PollConsumer, PollConsumerConfig},
};
use reifydb_core::{
encoded::{key::EncodedKey, row::EncodedRow},
interface::{
catalog::{id::TableId, schema::SchemaId},
cdc::{Cdc, CdcConsumerId, SystemChange},
},
key::{EncodableKey, Key, Key::Row, cdc_consumer::CdcConsumerKey, row::RowKey},
};
use reifydb_engine::{engine::StandardEngine, test_harness::TestEngine};
use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
use reifydb_type::{
error::{Diagnostic, Error},
fragment::Fragment,
util::cowvec::CowVec,
value::{identity::IdentityId, row_number::RowNumber},
};
/// Walks a `PollConsumer` through its lifecycle: it must report not-running
/// before `start`, running after `start` (and still running shortly after),
/// and not-running after `stop`. Stopping an already stopped consumer must
/// succeed as well.
#[test]
fn test_consumer_lifecycle() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let sink = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());

    let mut poller = PollConsumer::new(
        PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(100), None),
        engine.inner().clone(),
        sink,
        store,
        actors,
    );

    // Freshly constructed: nothing is running yet.
    assert!(!poller.is_running());

    poller.start().expect("Failed to start consumer");
    assert!(poller.is_running());

    // Still running after a short pause.
    sleep(Duration::from_millis(50));
    assert!(poller.is_running());

    poller.stop().expect("Failed to stop consumer");
    assert!(!poller.is_running());

    // A second stop must not fail and must leave the consumer stopped.
    poller.stop().expect("Should be able to stop already stopped consumer");
    assert!(!poller.is_running());
}
/// Verifies that five events written before the consumer starts are all
/// delivered, one change per transaction, and that the i-th received
/// transaction carries row `i + 1` of table 1.
///
/// Every delivered change must be a `SystemChange::Insert` whose key decodes
/// to a row key; any other shape fails the test instead of being skipped.
#[test]
fn test_event_processing() {
    let t = TestEngine::new();
    let cdc_store = t.cdc_store();
    let consumer_id = CdcConsumerId::flow_consumer();
    let consumer = TestConsumer::new(t.inner().clone(), consumer_id.clone());
    let consumer_clone = consumer.clone();
    let runtime = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&t, 5);
    let config = PollConsumerConfig::new(consumer_id, "cdc-poll-test", Duration::from_millis(50), None);
    let mut test_instance = PollConsumer::new(config, t.inner().clone(), consumer, cdc_store, runtime);
    test_instance.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(200));

    let changes = consumer_clone.get_total_changes();
    assert_eq!(changes, 5, "Should have processed 5 changes");
    let transactions = consumer_clone.get_transactions();
    assert_eq!(transactions.len(), 5, "Should have 5 transactions");

    for (i, cdc) in transactions.iter().enumerate() {
        assert_eq!(cdc.system_changes.len(), 1, "Each transaction should have 1 change");
        // A non-Insert change used to bypass the key assertions below; make
        // it an explicit failure so the test cannot pass vacuously.
        let key = match &cdc.system_changes[0] {
            SystemChange::Insert {
                key,
                ..
            } => key,
            _ => panic!("Expected Insert change"),
        };
        match Key::decode(key) {
            Some(Row(table_row)) => {
                assert_eq!(table_row.object, TableId(1));
                assert_eq!(table_row.row, RowNumber((i + 1) as u64));
            }
            _ => panic!("Expected Row key"),
        }
    }

    assert!(consumer_clone.get_process_count() >= 1, "Should have processed at least once");
    test_instance.stop().expect("Failed to stop consumer");
}
/// Checks that a checkpoint written by one consumer run is honoured by the
/// next: a first run sees the 3 initial events, a second run (new consumer,
/// same consumer id) sees only the 2 events written in between, and the value
/// stored under the consumer key decodes to a version of at least 3.
#[test]
fn test_checkpoint_persistence() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let first_probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&engine, 3);

    // First run: consume the initial events, then shut down.
    let mut first_poller = PollConsumer::new(
        PollConsumerConfig::new(id.clone(), "cdc-poll-test", Duration::from_millis(50), None),
        engine.inner().clone(),
        first_probe.clone(),
        store.clone(),
        actors.clone(),
    );
    first_poller.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(150));
    first_poller.stop().expect("Failed to stop consumer");
    let seen_first = first_probe.get_total_changes();
    assert_eq!(seen_first, 3, "Should have processed 3 changes in first run");

    // Second run: a brand-new consumer with the same id only gets the new events.
    insert_test_events(&engine, 2);
    let second_probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let second_config = PollConsumerConfig::new(id.clone(), "cdc-poll-test", Duration::from_millis(50), None);
    let second_actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    let mut second_poller =
        PollConsumer::new(second_config, engine.inner().clone(), second_probe.clone(), store, second_actors);
    second_poller.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(150));
    second_poller.stop().expect("Failed to stop consumer");
    let seen_second = second_probe.get_total_changes();
    assert_eq!(seen_second, 2, "Should have processed only 2 new changes");

    // The first 8 bytes stored under the consumer key are read as a big-endian u64.
    let mut query = engine.begin_query(IdentityId::system()).expect("Failed to begin transaction");
    let checkpoint_key = CdcConsumerKey {
        consumer: id,
    }
    .encode();
    let checkpoint =
        query.get(&checkpoint_key).expect("Failed to get checkpoint").expect("Checkpoint should exist");
    let mut version_bytes = [0u8; 8];
    version_bytes.copy_from_slice(&checkpoint.row[0..8]);
    let persisted = u64::from_be_bytes(version_bytes);
    assert!(persisted >= 3, "Checkpoint should be after initial events");
}
/// Drives the consumer through a failure window: 3 events are consumed
/// normally, then the test consumer is switched to fail while 2 more events
/// are written (the total must stay at 3), and finally failure is switched
/// off and the total must reach 5.
#[test]
fn test_error_handling() {
    // Small helper so each phase reads as "wait, then check".
    let pause = |millis: u64| sleep(Duration::from_millis(millis));

    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&engine, 3);

    let mut poller = PollConsumer::new(
        PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(50), None),
        engine.inner().clone(),
        probe.clone(),
        store,
        actors,
    );
    poller.start().expect("Failed to start consumer");

    // Phase 1: healthy consumer.
    pause(100);
    let before_error = probe.get_total_changes();
    assert_eq!(before_error, 3, "Should have processed 3 changes before error");

    // Phase 2: consumer rejects every batch.
    probe.set_should_fail(true);
    insert_test_events(&engine, 2);
    pause(150);
    let during_error = probe.get_total_changes();
    assert_eq!(during_error, 3, "Should not have processed new changes during error");

    // Phase 3: consumer accepts batches again.
    probe.set_should_fail(false);
    pause(150);
    let after_recovery = probe.get_total_changes();
    assert_eq!(after_recovery, 5, "Should have processed new changes after recovery");

    poller.stop().expect("Failed to stop consumer");
}
/// Starts a consumer against an engine with no events and expects that
/// `consume` is never invoked; after a single event is written, exactly one
/// change must be recorded and `consume` must have run at least once.
#[test]
fn test_empty_events_handling() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());

    let settings = PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(50), None);
    let mut poller = PollConsumer::new(settings, engine.inner().clone(), probe.clone(), store, actors);
    poller.start().expect("Failed to start consumer");

    // Nothing written yet: the consumer must stay idle.
    sleep(Duration::from_millis(150));
    let idle_changes = probe.get_total_changes();
    assert_eq!(idle_changes, 0, "Should have no changes to process");
    assert_eq!(probe.get_process_count(), 0, "Should not have called consume");

    // One event arrives while the consumer is already running.
    insert_test_events(&engine, 1);
    sleep(Duration::from_millis(100));
    let changes_after_insert = probe.get_total_changes();
    assert_eq!(changes_after_insert, 1, "Should have processed 1 change");
    assert!(probe.get_process_count() >= 1, "Should have called consume");

    poller.stop().expect("Failed to stop consumer");
}
/// Runs two consumers with distinct ids against the same engine and CDC
/// store. Both must see the 3 initial events and then the 2 later ones, and
/// each consumer key must hold a stored version of at least 5.
#[test]
fn test_multiple_consumers() {
    // Reads the leading 8 bytes of a stored checkpoint as a big-endian u64.
    let decode_version = |bytes: &[u8]| {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        u64::from_be_bytes(raw)
    };

    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());

    let first_id = CdcConsumerId::new("consumer-1");
    let first_probe = TestConsumer::new(engine.inner().clone(), first_id.clone());
    let second_id = CdcConsumerId::new("consumer-2");
    let second_probe = TestConsumer::new(engine.inner().clone(), second_id.clone());

    insert_test_events(&engine, 3);

    let first_config =
        PollConsumerConfig::new(first_id.clone(), "cdc-poll-test-1", Duration::from_millis(50), None);
    let mut first_poller = PollConsumer::new(
        first_config,
        engine.inner().clone(),
        first_probe.clone(),
        store.clone(),
        actors.clone(),
    );
    let second_config =
        PollConsumerConfig::new(second_id.clone(), "cdc-poll-test-2", Duration::from_millis(75), None);
    let mut second_poller =
        PollConsumer::new(second_config, engine.inner().clone(), second_probe.clone(), store, actors);

    first_poller.start().expect("Failed to start consumer 1");
    second_poller.start().expect("Failed to start consumer 2");

    // Initial batch of events.
    sleep(Duration::from_millis(200));
    let (seen_first, seen_second) = (first_probe.get_total_changes(), second_probe.get_total_changes());
    assert_eq!(seen_first, 3, "Consumer 1 should have processed 3 changes");
    assert_eq!(seen_second, 3, "Consumer 2 should have processed 3 changes");

    // Events written while both consumers are running.
    insert_test_events(&engine, 2);
    sleep(Duration::from_millis(200));
    let (total_first, total_second) = (first_probe.get_total_changes(), second_probe.get_total_changes());
    assert_eq!(total_first, 5, "Consumer 1 should have processed 5 changes total");
    assert_eq!(total_second, 5, "Consumer 2 should have processed 5 changes total");

    // Each consumer keeps its own checkpoint entry.
    let mut query = engine.begin_query(IdentityId::system()).expect("Failed to begin transaction");
    let first_key = CdcConsumerKey {
        consumer: first_id,
    }
    .encode();
    let second_key = CdcConsumerKey {
        consumer: second_id,
    }
    .encode();
    let first_checkpoint =
        query.get(&first_key).expect("Failed to get checkpoint 1").expect("Checkpoint 1 should exist");
    let second_checkpoint =
        query.get(&second_key).expect("Failed to get checkpoint 2").expect("Checkpoint 2 should exist");
    let first_version = decode_version(&first_checkpoint.row[0..8]);
    let second_version = decode_version(&second_checkpoint.row[0..8]);
    assert!(first_version >= 5, "Consumer 1 should have processed all events");
    assert!(second_version >= 5, "Consumer 2 should have processed all events");

    first_poller.stop().expect("Failed to stop consumer 1");
    second_poller.stop().expect("Failed to stop consumer 2");
}
/// Commits one transaction that writes a table row key and an arbitrary
/// non-table key, then checks what the consumer receives.
///
/// NOTE(review): despite the test name, the assertions expect BOTH changes to
/// be delivered in a single transaction (2 changes total); nothing is asserted
/// to be filtered out. Confirm whether the name or the expectation is stale.
///
/// The change whose key decodes to a row key must be a `SystemChange::Insert`
/// for row 1 of table 1; any other variant fails the test.
#[test]
fn test_non_table_events_filtered() {
    let t = TestEngine::new();
    let cdc_store = t.cdc_store();
    let consumer_id = CdcConsumerId::flow_consumer();
    let consumer = TestConsumer::new(t.inner().clone(), consumer_id.clone());
    let consumer_clone = consumer.clone();

    // One transaction containing a table row write and a non-table write.
    let mut txn = t.begin_command(IdentityId::system()).expect("Failed to begin transaction");
    let table_key = RowKey::encoded(SchemaId::table(1), RowNumber(1));
    txn.set(&table_key, EncodedRow(CowVec::new(b"table_value".to_vec()))).expect("Failed to set table encoded");
    let non_table_key = EncodedKey(CowVec::new(b"non_table_key".to_vec()));
    txn.set(&non_table_key, EncodedRow(CowVec::new(b"non_table_value".to_vec())))
        .expect("Failed to set non-table encoded");
    txn.commit().expect("Failed to commit transaction");

    let runtime = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    let config = PollConsumerConfig::new(consumer_id, "cdc-poll-test", Duration::from_millis(50), None);
    let mut test_instance = PollConsumer::new(config, t.inner().clone(), consumer, cdc_store, runtime);
    test_instance.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(150));
    test_instance.stop().expect("Failed to stop consumer");

    let changes = consumer_clone.get_total_changes();
    assert_eq!(changes, 2, "Should have processed 2 changes (both in same transaction)");
    let transactions = consumer_clone.get_transactions();
    assert_eq!(transactions.len(), 1, "Should have 1 transaction");
    assert_eq!(transactions[0].system_changes.len(), 2, "Transaction should have 2 changes");

    let table_change = transactions[0]
        .system_changes
        .iter()
        .find(|c| matches!(Key::decode(c.key()), Some(Row(_))))
        .expect("Should have at least one table change");
    // Previously a non-Insert variant skipped the assertions below silently.
    let key = match table_change {
        SystemChange::Insert {
            key,
            ..
        } => key,
        _ => panic!("Expected Insert change for the table row"),
    };
    match Key::decode(key) {
        Some(Row(table_row)) => {
            assert_eq!(table_row.object, TableId(1));
            assert_eq!(table_row.row, RowNumber(1));
        }
        _ => panic!("Expected Row key"),
    }
}
/// Builds, starts and stops a fresh `PollConsumer` five times in quick
/// succession, reusing the same engine, store, runtime and consumer id, and
/// checks the running flag after each transition.
#[test]
fn test_rapid_start_stop() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let sink = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());

    for _cycle in 0..5 {
        let settings = PollConsumerConfig::new(id.clone(), "cdc-poll-test", Duration::from_millis(100), None);
        let mut poller =
            PollConsumer::new(settings, engine.inner().clone(), sink.clone(), store.clone(), actors.clone());

        poller.start().expect("Failed to start consumer");
        assert!(poller.is_running());

        // Stop almost immediately after starting.
        sleep(Duration::from_millis(10));

        poller.stop().expect("Failed to stop consumer");
        assert!(!poller.is_running());
    }
}
/// With 25 pending events and `Some(10)` as the last config argument, all 25
/// changes must eventually be consumed and `consume` must have been invoked
/// at least 3 times.
#[test]
fn test_batch_size_limits_processing() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&engine, 25);

    let mut poller = PollConsumer::new(
        PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(50), Some(10)),
        engine.inner().clone(),
        probe.clone(),
        store,
        actors,
    );
    poller.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(300));

    let consumed = probe.get_total_changes();
    assert_eq!(consumed, 25, "Should have processed all 25 changes");
    let calls = probe.get_process_count();
    assert!(calls >= 3, "Should have been called at least 3 times (for batches of 10, 10, 5)");

    poller.stop().expect("Failed to stop consumer");
}
/// With 5 pending events and `Some(1)` as the last config argument, all 5
/// changes must be consumed and `consume` must have been invoked at least
/// 5 times.
#[test]
fn test_batch_size_one_processes_sequentially() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&engine, 5);

    let one_at_a_time = PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(50), Some(1));
    let mut poller = PollConsumer::new(one_at_a_time, engine.inner().clone(), probe.clone(), store, actors);
    poller.start().expect("Failed to start consumer");

    // Leave room for several polling rounds.
    sleep(Duration::from_millis(400));

    let consumed = probe.get_total_changes();
    assert_eq!(consumed, 5, "Should have processed all 5 changes");
    let calls = probe.get_process_count();
    assert!(calls >= 5, "Should have been called at least 5 times (one per event)");

    poller.stop().expect("Failed to stop consumer");
}
/// With 20 pending events and `None` as the last config argument, all 20
/// changes must be consumed using at most 2 `consume` invocations.
#[test]
fn test_batch_size_none_processes_all_at_once() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&engine, 20);

    let unbounded = PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(50), None);
    let mut poller = PollConsumer::new(unbounded, engine.inner().clone(), probe.clone(), store, actors);
    poller.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(150));

    let consumed = probe.get_total_changes();
    assert_eq!(consumed, 20, "Should have processed all 20 changes");
    let calls = probe.get_process_count();
    assert!(calls <= 2, "Should have been called at most 2 times with unbounded batch");

    poller.stop().expect("Failed to stop consumer");
}
/// With only 5 pending events and `Some(100)` as the last config argument,
/// all 5 changes must be consumed using at most 2 `consume` invocations.
#[test]
fn test_batch_size_larger_than_events() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&engine, 5);

    let mut poller = PollConsumer::new(
        PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(50), Some(100)),
        engine.inner().clone(),
        probe.clone(),
        store,
        actors,
    );
    poller.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(150));

    let consumed = probe.get_total_changes();
    assert_eq!(consumed, 5, "Should have processed all 5 changes");
    let calls = probe.get_process_count();
    assert!(calls <= 2, "Should have processed efficiently in 1-2 calls");

    poller.stop().expect("Failed to stop consumer");
}
/// Checks that a batch-limited consumer resumes where the previous run left
/// off: the first run (limit `Some(5)`) consumes at least one batch of the 15
/// initial events, and a second run with the same consumer id must consume
/// exactly the remainder plus the 3 events written in between.
#[test]
fn test_batch_size_with_checkpoint_resume() {
    // Event counts written before the first and the second run.
    const INITIAL_EVENTS: usize = 15;
    const EXTRA_EVENTS: usize = 3;

    let t = TestEngine::new();
    let cdc_store = t.cdc_store();
    let consumer_id = CdcConsumerId::flow_consumer();
    let consumer = TestConsumer::new(t.inner().clone(), consumer_id.clone());
    let consumer_clone = consumer.clone();
    let runtime = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&t, INITIAL_EVENTS);

    let config = PollConsumerConfig::new(consumer_id.clone(), "cdc-poll-test", Duration::from_millis(50), Some(5));
    let mut test_instance =
        PollConsumer::new(config, t.inner().clone(), consumer, cdc_store.clone(), runtime.clone());
    test_instance.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(180));
    test_instance.stop().expect("Failed to stop consumer");

    let changes_first_run = consumer_clone.get_total_changes();
    assert!(changes_first_run >= 5, "Should have processed at least one batch of 5");
    // Guards the subtraction below: without it an over-delivery would surface
    // as an arithmetic overflow instead of a readable assertion failure.
    assert!(
        changes_first_run <= INITIAL_EVENTS,
        "First run cannot have processed more events than were inserted"
    );

    insert_test_events(&t, EXTRA_EVENTS);
    let consumer2 = TestConsumer::new(t.inner().clone(), consumer_id.clone());
    let consumer2_clone = consumer2.clone();
    let config2 = PollConsumerConfig::new(consumer_id.clone(), "cdc-poll-test", Duration::from_millis(50), Some(5));
    let runtime2 = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    let mut test_instance2 = PollConsumer::new(config2, t.inner().clone(), consumer2, cdc_store, runtime2);
    test_instance2.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(250));
    // Stopped exactly once; the original stopped this instance a second time
    // at the end of the test for no purpose.
    test_instance2.stop().expect("Failed to stop consumer");

    let changes_second_run = consumer2_clone.get_total_changes();
    let total_expected = INITIAL_EVENTS + EXTRA_EVENTS - changes_first_run;
    assert_eq!(changes_second_run, total_expected, "Should have processed remaining events plus new ones");
}
/// With 10 pending events and `Some(10)` as the last config argument, all 10
/// changes must be consumed using at most 2 `consume` invocations.
#[test]
fn test_batch_size_exact_match() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let id = CdcConsumerId::flow_consumer();
    let probe = TestConsumer::new(engine.inner().clone(), id.clone());
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
    insert_test_events(&engine, 10);

    let settings = PollConsumerConfig::new(id, "cdc-poll-test", Duration::from_millis(50), Some(10));
    let mut poller = PollConsumer::new(settings, engine.inner().clone(), probe.clone(), store, actors);
    poller.start().expect("Failed to start consumer");
    sleep(Duration::from_millis(150));

    let consumed = probe.get_total_changes();
    assert_eq!(consumed, 10, "Should have processed all 10 changes");
    let calls = probe.get_process_count();
    assert!(calls <= 2, "Should have processed in 1-2 calls with exact batch size match");

    poller.stop().expect("Failed to stop consumer");
}
/// Runs two consumers side by side over the same 10 events: one configured
/// with `Some(3)` and one with `None` as the last config argument. Both must
/// consume all 10 changes; the first must need at least 4 `consume` calls and
/// the second at most 2.
#[test]
fn test_multiple_consumers_different_batch_sizes() {
    let engine = TestEngine::new();
    let store = engine.cdc_store();
    let actors = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());

    let limited_id = CdcConsumerId::new("consumer-batch-3");
    let limited_probe = TestConsumer::new(engine.inner().clone(), limited_id.clone());
    let unbounded_id = CdcConsumerId::new("consumer-unbounded");
    let unbounded_probe = TestConsumer::new(engine.inner().clone(), unbounded_id.clone());

    insert_test_events(&engine, 10);

    let limited_config =
        PollConsumerConfig::new(limited_id.clone(), "cdc-poll-test-1", Duration::from_millis(50), Some(3));
    let mut limited_poller = PollConsumer::new(
        limited_config,
        engine.inner().clone(),
        limited_probe.clone(),
        store.clone(),
        actors.clone(),
    );
    let unbounded_config =
        PollConsumerConfig::new(unbounded_id.clone(), "cdc-poll-test-2", Duration::from_millis(75), None);
    let mut unbounded_poller =
        PollConsumer::new(unbounded_config, engine.inner().clone(), unbounded_probe.clone(), store, actors);

    limited_poller.start().expect("Failed to start consumer 1");
    unbounded_poller.start().expect("Failed to start consumer 2");
    sleep(Duration::from_millis(400));

    // Both consumers end up with every change...
    let (limited_seen, unbounded_seen) =
        (limited_probe.get_total_changes(), unbounded_probe.get_total_changes());
    assert_eq!(limited_seen, 10, "Consumer 1 should have processed all 10 changes");
    assert_eq!(unbounded_seen, 10, "Consumer 2 should have processed all 10 changes");

    // ...but they get there with a different number of consume calls.
    let (limited_calls, unbounded_calls) =
        (limited_probe.get_process_count(), unbounded_probe.get_process_count());
    assert!(limited_calls >= 4, "Consumer 1 should have at least 4 calls (10 events / batch size 3)");
    assert!(unbounded_calls <= 2, "Consumer 2 should have at most 2 calls (unbounded)");

    limited_poller.stop().expect("Failed to stop consumer 1");
    unbounded_poller.stop().expect("Failed to stop consumer 2");
}
/// CDC sink used by the tests in this file.
///
/// It records every `Cdc` it is handed, counts successful `consume` calls and
/// can be switched into a failing mode. The recording, the counter and the
/// failure flag live behind `Arc`s, so a clone observes and controls the same
/// state as the instance it was cloned from.
///
/// `Clone` is derived: it clones each field, which for the `Arc` fields means
/// sharing the underlying state rather than copying it.
#[derive(Clone)]
struct TestConsumer {
    /// Engine used to open the command transaction that stores the checkpoint.
    host: StandardEngine,
    /// Encoded `CdcConsumerKey` under which the checkpoint is persisted.
    consumer_key: EncodedKey,
    /// Every `Cdc` accepted by `consume` so far, in arrival order.
    cdc_received: Arc<Mutex<Vec<Cdc>>>,
    /// Number of `consume` calls that completed successfully.
    process_count: Arc<AtomicUsize>,
    /// When set, `consume` replies with an error and records nothing.
    should_fail: Arc<AtomicBool>,
}

impl TestConsumer {
    /// Creates a consumer with empty recording state for `consumer_id`,
    /// encoding the id into the key used for checkpointing.
    fn new(host: StandardEngine, consumer_id: CdcConsumerId) -> Self {
        let consumer_key = CdcConsumerKey {
            consumer: consumer_id,
        }
        .encode();
        Self {
            host,
            consumer_key,
            cdc_received: Arc::new(Mutex::new(Vec::new())),
            process_count: Arc::new(AtomicUsize::new(0)),
            should_fail: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Turns the failing mode on or off for this consumer and all its clones.
    fn set_should_fail(&self, should_fail: bool) {
        self.should_fail.store(should_fail, Ordering::SeqCst);
    }

    /// Returns a copy of every `Cdc` recorded so far.
    ///
    /// # Panics
    /// Panics if the recording mutex is poisoned.
    fn get_transactions(&self) -> Vec<Cdc> {
        self.cdc_received.lock().unwrap().clone()
    }

    /// Returns the sum of `system_changes.len()` over all recorded `Cdc`s.
    ///
    /// # Panics
    /// Panics if the recording mutex is poisoned.
    fn get_total_changes(&self) -> usize {
        self.cdc_received.lock().unwrap().iter().map(|cdc| cdc.system_changes.len()).sum()
    }

    /// Returns how many `consume` calls have completed successfully.
    fn get_process_count(&self) -> usize {
        self.process_count.load(Ordering::SeqCst)
    }
}
impl CdcConsume for TestConsumer {
fn consume(&self, transactions: Vec<Cdc>, reply: Box<dyn FnOnce(reifydb_type::Result<()>) + Send>) {
if self.should_fail.load(Ordering::SeqCst) {
(reply)(Err(Error(Diagnostic {
code: "TEST_ERROR".to_string(),
statement: None,
message: "Test failure".to_string(),
column: None,
fragment: Fragment::None,
label: None,
help: None,
notes: vec![],
cause: None,
operator_chain: None,
})));
return;
}
let latest_version = transactions.last().map(|c| c.version);
if let Some(version) = latest_version {
match self.host.begin_command(IdentityId::system()) {
Ok(mut txn) => {
if let Err(e) = CdcCheckpoint::persist(&mut txn, &self.consumer_key, version) {
(reply)(Err(e));
return;
}
if let Err(e) = txn.commit() {
(reply)(Err(e));
return;
}
}
Err(e) => {
(reply)(Err(e));
return;
}
}
}
let mut received = self.cdc_received.lock().unwrap();
received.extend(transactions);
self.process_count.fetch_add(1, Ordering::SeqCst);
(reply)(Ok(()));
}
}
/// Writes `count` rows into table 1, each in its own committed command
/// transaction. Row numbers run from 1 to `count`; the row with zero-based
/// index `i` gets the payload `value_{i}`.
///
/// # Panics
/// Panics if beginning, writing to, or committing a transaction fails.
fn insert_test_events(engine: &StandardEngine, count: usize) {
    (0..count).for_each(|index| {
        let mut command = engine.begin_command(IdentityId::system()).unwrap();

        // Row numbers are 1-based while the payload suffix is 0-based.
        let row_key = RowKey::encoded(SchemaId::table(1), RowNumber((index + 1) as u64));
        let payload = format!("value_{}", index).into_bytes();

        command.set(&row_key, EncodedRow(CowVec::new(payload))).unwrap();
        command.commit().unwrap();
    });
}