reifydb_cdc/consume/
checkpoint.rs1use reifydb_core::{common::CommitVersion, encoded::row::EncodedRow, key::cdc_consumer::ToConsumerKey};
5use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
6use reifydb_type::{Result, util::cowvec::CowVec};
7
8pub struct CdcCheckpoint {}
9
10impl CdcCheckpoint {
11 pub fn fetch<K: ToConsumerKey>(txn: &mut Transaction<'_>, consumer: &K) -> Result<CommitVersion> {
12 let key = consumer.to_consumer_key();
13
14 txn.get(&key)?
15 .and_then(|multi| {
16 if multi.row.len() >= 8 {
17 let mut buffer = [0u8; 8];
18 buffer.copy_from_slice(&multi.row[0..8]);
19 Some(CommitVersion(u64::from_be_bytes(buffer)))
20 } else {
21 None
22 }
23 })
24 .map(Ok)
25 .unwrap_or(Ok(CommitVersion(1)))
26 }
27
28 pub fn persist<K: ToConsumerKey>(
29 txn: &mut CommandTransaction,
30 consumer: &K,
31 version: CommitVersion,
32 ) -> Result<()> {
33 let key = consumer.to_consumer_key();
34 let version_bytes = version.0.to_be_bytes().to_vec();
35 txn.set(&key, EncodedRow(CowVec::new(version_bytes)))
36 }
37}