reifydb_cdc/
checkpoint.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	CommitVersion, CowVec,
6	interface::{CommandTransaction, QueryTransaction, ToConsumerKey},
7	value::encoded::EncodedValues,
8};
9
10pub struct CdcCheckpoint {}
11
12impl CdcCheckpoint {
13	pub fn fetch<K: ToConsumerKey>(
14		txn: &mut impl QueryTransaction,
15		consumer: &K,
16	) -> reifydb_core::Result<CommitVersion> {
17		let key = consumer.to_consumer_key();
18
19		txn.get(&key)?
20			.and_then(|multi| {
21				if multi.values.len() >= 8 {
22					let mut buffer = [0u8; 8];
23					buffer.copy_from_slice(&multi.values[0..8]);
24					Some(CommitVersion(u64::from_be_bytes(buffer)))
25				} else {
26					None
27				}
28			})
29			.map(Ok)
30			.unwrap_or(Ok(CommitVersion(1)))
31	}
32
33	pub fn persist<K: ToConsumerKey>(
34		txn: &mut impl CommandTransaction,
35		consumer: &K,
36		version: CommitVersion,
37	) -> reifydb_core::Result<()> {
38		let key = consumer.to_consumer_key();
39		let version_bytes = version.0.to_be_bytes().to_vec();
40		txn.set(&key, EncodedValues(CowVec::new(version_bytes)))
41	}
42}