Skip to main content

reifydb_cdc/consume/
checkpoint.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{common::CommitVersion, encoded::row::EncodedRow, key::cdc_consumer::ToConsumerKey};
5use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
6use reifydb_type::{Result, util::cowvec::CowVec};
7
8pub struct CdcCheckpoint {}
9
10impl CdcCheckpoint {
11	pub fn fetch<K: ToConsumerKey>(txn: &mut Transaction<'_>, consumer: &K) -> Result<CommitVersion> {
12		let key = consumer.to_consumer_key();
13
14		txn.get(&key)?
15			.and_then(|multi| {
16				if multi.row.len() >= 8 {
17					let mut buffer = [0u8; 8];
18					buffer.copy_from_slice(&multi.row[0..8]);
19					Some(CommitVersion(u64::from_be_bytes(buffer)))
20				} else {
21					None
22				}
23			})
24			.map(Ok)
25			.unwrap_or(Ok(CommitVersion(1)))
26	}
27
28	pub fn persist<K: ToConsumerKey>(
29		txn: &mut CommandTransaction,
30		consumer: &K,
31		version: CommitVersion,
32	) -> Result<()> {
33		let key = consumer.to_consumer_key();
34		let version_bytes = version.0.to_be_bytes().to_vec();
35		txn.set(&key, EncodedRow(CowVec::new(version_bytes)))
36	}
37}