use reifydb_core::{common::CommitVersion, encoded::row::EncodedRow, key::cdc_consumer::ToConsumerKey};
use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
use reifydb_type::{Result, util::cowvec::CowVec};
/// Field-less type whose associated functions load and store the commit
/// version recorded under a consumer's key.
pub struct CdcCheckpoint {}
impl CdcCheckpoint {
pub fn fetch<K: ToConsumerKey>(txn: &mut Transaction<'_>, consumer: &K) -> Result<CommitVersion> {
let key = consumer.to_consumer_key();
txn.get(&key)?
.and_then(|multi| {
if multi.row.len() >= 8 {
let mut buffer = [0u8; 8];
buffer.copy_from_slice(&multi.row[0..8]);
Some(CommitVersion(u64::from_be_bytes(buffer)))
} else {
None
}
})
.map(Ok)
.unwrap_or(Ok(CommitVersion(1)))
}
pub fn persist<K: ToConsumerKey>(
txn: &mut CommandTransaction,
consumer: &K,
version: CommitVersion,
) -> Result<()> {
let key = consumer.to_consumer_key();
let version_bytes = version.0.to_be_bytes().to_vec();
txn.set(&key, EncodedRow(CowVec::new(version_bytes)))
}
}