reifydb_cdc/
checkpoint.rs1use reifydb_core::{
5 CommitVersion, CowVec,
6 interface::{CommandTransaction, QueryTransaction, ToConsumerKey},
7 value::encoded::EncodedValues,
8};
9
10pub struct CdcCheckpoint {}
11
12impl CdcCheckpoint {
13 pub fn fetch<K: ToConsumerKey>(
14 txn: &mut impl QueryTransaction,
15 consumer: &K,
16 ) -> reifydb_core::Result<CommitVersion> {
17 let key = consumer.to_consumer_key();
18
19 txn.get(&key)?
20 .and_then(|multi| {
21 if multi.values.len() >= 8 {
22 let mut buffer = [0u8; 8];
23 buffer.copy_from_slice(&multi.values[0..8]);
24 Some(CommitVersion(u64::from_be_bytes(buffer)))
25 } else {
26 None
27 }
28 })
29 .map(Ok)
30 .unwrap_or(Ok(CommitVersion(1)))
31 }
32
33 pub fn persist<K: ToConsumerKey>(
34 txn: &mut impl CommandTransaction,
35 consumer: &K,
36 version: CommitVersion,
37 ) -> reifydb_core::Result<()> {
38 let key = consumer.to_consumer_key();
39 let version_bytes = version.0.to_be_bytes().to_vec();
40 txn.set(&key, EncodedValues(CowVec::new(version_bytes)))
41 }
42}