reifydb_core/interface/transaction/
cdc.rs1use std::ops::Bound;
5
6use async_trait::async_trait;
7use reifydb_type::Result;
8
9use crate::{CommitVersion, interface::Cdc};
10
11#[derive(Debug, Clone)]
13pub struct CdcBatch {
14 pub items: Vec<Cdc>,
16 pub has_more: bool,
18}
19
20impl CdcBatch {
21 pub fn empty() -> Self {
23 Self {
24 items: Vec::new(),
25 has_more: false,
26 }
27 }
28
29 pub fn is_empty(&self) -> bool {
31 self.items.is_empty()
32 }
33}
34
35pub trait CdcTransaction: Send + Sync + Clone + 'static {
36 type Query<'a>: CdcQueryTransaction;
37
38 fn begin_query(&self) -> Result<Self::Query<'_>>;
39
40 fn with_query<F, R>(&self, f: F) -> Result<R>
41 where
42 F: FnOnce(&mut Self::Query<'_>) -> Result<R>,
43 {
44 let mut tx = self.begin_query()?;
45 f(&mut tx)
46 }
47}
48
49#[async_trait]
50pub trait CdcQueryTransaction: Send + Sync + Clone + 'static {
51 async fn get(&self, version: CommitVersion) -> Result<Option<Cdc>>;
52
53 async fn range_batch(
54 &self,
55 start: Bound<CommitVersion>,
56 end: Bound<CommitVersion>,
57 batch_size: u64,
58 ) -> Result<CdcBatch>;
59
60 async fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> Result<CdcBatch> {
61 self.range_batch(start, end, 1024).await
62 }
63
64 async fn scan(&self, batch_size: u64) -> Result<CdcBatch> {
65 self.range_batch(Bound::Unbounded, Bound::Unbounded, batch_size).await
66 }
67
68 async fn count(&self, version: CommitVersion) -> Result<usize>;
69}