reifydb-cdc 0.4.5

Change Data Capture module for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{common::CommitVersion, encoded::row::EncodedRow, key::cdc_consumer::ToConsumerKey};
use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
use reifydb_type::{Result, util::cowvec::CowVec};

pub struct CdcCheckpoint {}

impl CdcCheckpoint {
	pub fn fetch<K: ToConsumerKey>(txn: &mut Transaction<'_>, consumer: &K) -> Result<CommitVersion> {
		let key = consumer.to_consumer_key();

		txn.get(&key)?
			.and_then(|multi| {
				if multi.row.len() >= 8 {
					let mut buffer = [0u8; 8];
					buffer.copy_from_slice(&multi.row[0..8]);
					Some(CommitVersion(u64::from_be_bytes(buffer)))
				} else {
					None
				}
			})
			.map(Ok)
			.unwrap_or(Ok(CommitVersion(1)))
	}

	pub fn persist<K: ToConsumerKey>(
		txn: &mut CommandTransaction,
		consumer: &K,
		version: CommitVersion,
	) -> Result<()> {
		let key = consumer.to_consumer_key();
		let version_bytes = version.0.to_be_bytes().to_vec();
		txn.set(&key, EncodedRow(CowVec::new(version_bytes)))
	}
}