reifydb_core/interface/transaction/
cdc.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::ops::Bound;
5
6use async_trait::async_trait;
7use reifydb_type::Result;
8
9use crate::{CommitVersion, interface::Cdc};
10
11/// A batch of CDC entries with continuation info.
12#[derive(Debug, Clone)]
13pub struct CdcBatch {
14	/// The CDC entries in this batch.
15	pub items: Vec<Cdc>,
16	/// Whether there are more items after this batch.
17	pub has_more: bool,
18}
19
20impl CdcBatch {
21	/// Creates an empty batch with no more results.
22	pub fn empty() -> Self {
23		Self {
24			items: Vec::new(),
25			has_more: false,
26		}
27	}
28
29	/// Returns true if this batch contains no items.
30	pub fn is_empty(&self) -> bool {
31		self.items.is_empty()
32	}
33}
34
35pub trait CdcTransaction: Send + Sync + Clone + 'static {
36	type Query<'a>: CdcQueryTransaction;
37
38	fn begin_query(&self) -> Result<Self::Query<'_>>;
39
40	fn with_query<F, R>(&self, f: F) -> Result<R>
41	where
42		F: FnOnce(&mut Self::Query<'_>) -> Result<R>,
43	{
44		let mut tx = self.begin_query()?;
45		f(&mut tx)
46	}
47}
48
49#[async_trait]
50pub trait CdcQueryTransaction: Send + Sync + Clone + 'static {
51	async fn get(&self, version: CommitVersion) -> Result<Option<Cdc>>;
52
53	async fn range_batch(
54		&self,
55		start: Bound<CommitVersion>,
56		end: Bound<CommitVersion>,
57		batch_size: u64,
58	) -> Result<CdcBatch>;
59
60	async fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> Result<CdcBatch> {
61		self.range_batch(start, end, 1024).await
62	}
63
64	async fn scan(&self, batch_size: u64) -> Result<CdcBatch> {
65		self.range_batch(Bound::Unbounded, Bound::Unbounded, batch_size).await
66	}
67
68	async fn count(&self, version: CommitVersion) -> Result<usize>;
69}