reifydb_store_transaction/cdc/
mod.rs1pub(crate) mod codec;
5pub(crate) mod converter;
6mod exclude;
7mod layout;
8
9use std::collections::Bound;
10
11use async_trait::async_trait;
12use exclude::should_exclude_from_cdc;
13pub(crate) use reifydb_core::delta::Delta;
14use reifydb_core::{CommitVersion, EncodedKey, interface::Cdc, key::Key};
15use reifydb_type::diagnostic::internal::internal;
16
17pub trait CdcStore: Send + Sync + Clone + 'static + CdcGet + CdcRange + CdcCount {}
18
19#[derive(Debug, Clone)]
21pub struct CdcBatch {
22 pub items: Vec<Cdc>,
24 pub has_more: bool,
26}
27
28impl CdcBatch {
29 pub fn empty() -> Self {
31 Self {
32 items: Vec::new(),
33 has_more: false,
34 }
35 }
36
37 pub fn is_empty(&self) -> bool {
39 self.items.is_empty()
40 }
41}
42
43#[async_trait]
44pub trait CdcGet: Send + Sync {
45 async fn get(&self, version: CommitVersion) -> reifydb_type::Result<Option<Cdc>>;
46}
47
48#[async_trait]
49pub trait CdcRange: Send + Sync {
50 async fn range_batch(
52 &self,
53 start: Bound<CommitVersion>,
54 end: Bound<CommitVersion>,
55 batch_size: u64,
56 ) -> reifydb_type::Result<CdcBatch>;
57
58 async fn range(
60 &self,
61 start: Bound<CommitVersion>,
62 end: Bound<CommitVersion>,
63 ) -> reifydb_type::Result<CdcBatch> {
64 self.range_batch(start, end, 1024).await
65 }
66
67 async fn scan(&self, batch_size: u64) -> reifydb_type::Result<CdcBatch> {
69 self.range_batch(Bound::Unbounded, Bound::Unbounded, batch_size).await
70 }
71}
72
73#[async_trait]
74pub trait CdcCount: Send + Sync {
75 async fn count(&self, version: CommitVersion) -> reifydb_type::Result<usize>;
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
80pub(crate) enum InternalCdcChange {
81 Insert {
82 key: EncodedKey,
83 post_version: CommitVersion,
84 },
85 Update {
86 key: EncodedKey,
87 pre_version: CommitVersion,
88 post_version: CommitVersion,
89 },
90 Delete {
91 key: EncodedKey,
92 pre_version: CommitVersion,
93 },
94}
95
96impl InternalCdcChange {
97 pub fn key(&self) -> &EncodedKey {
99 match self {
100 InternalCdcChange::Insert {
101 key,
102 ..
103 } => key,
104 InternalCdcChange::Update {
105 key,
106 ..
107 } => key,
108 InternalCdcChange::Delete {
109 key,
110 ..
111 } => key,
112 }
113 }
114}
115
116#[derive(Debug, Clone, PartialEq)]
118pub(crate) struct InternalCdc {
119 pub version: CommitVersion,
120 pub timestamp: u64,
121 pub changes: Vec<InternalCdcSequencedChange>,
122}
123
124#[derive(Debug, Clone, PartialEq)]
125pub(crate) struct InternalCdcSequencedChange {
126 pub sequence: u16,
127 pub change: InternalCdcChange,
128}
129
130fn generate_internal_cdc_change(
132 delta: Delta,
133 pre_version: Option<CommitVersion>,
134 post_version: CommitVersion,
135) -> Option<InternalCdcChange> {
136 match delta {
137 Delta::Set {
138 key,
139 values: _,
140 } => {
141 if let Some(kind) = Key::kind(&key) {
143 if should_exclude_from_cdc(kind) {
144 return None;
145 }
146 }
147
148 if let Some(pre_version) = pre_version {
149 Some(InternalCdcChange::Update {
150 key,
151 pre_version,
152 post_version,
153 })
154 } else {
155 Some(InternalCdcChange::Insert {
156 key,
157 post_version,
158 })
159 }
160 }
161
162 Delta::Remove {
163 key,
164 } => {
165 if let Some(kind) = Key::kind(&key) {
167 if should_exclude_from_cdc(kind) {
168 return None;
169 }
170 }
171
172 if let Some(pre_version) = pre_version {
173 Some(InternalCdcChange::Delete {
174 key,
175 pre_version,
176 })
177 } else {
178 None
179 }
180 }
181
182 Delta::Drop {
184 ..
185 } => None,
186 }
187}
188
189pub(crate) fn process_deltas_for_cdc<F>(
196 deltas: impl IntoIterator<Item = Delta>,
197 version: CommitVersion,
198 mut get_storage_version: F,
199) -> reifydb_type::Result<Vec<InternalCdcSequencedChange>>
200where
201 F: FnMut(&EncodedKey) -> Option<CommitVersion>,
202{
203 let mut cdc_changes: Vec<InternalCdcSequencedChange> = Vec::new();
204
205 for (idx, delta) in deltas.into_iter().enumerate() {
206 let sequence = match u16::try_from(idx + 1) {
207 Ok(seq) => seq,
208 Err(_) => return Err(reifydb_type::error!(internal("CDC sequence number exhausted"))),
209 };
210
211 let key = delta.key().clone();
212
213 let pre_version = get_storage_version(&key);
215
216 if let Some(cdc_change) = generate_internal_cdc_change(delta, pre_version, version) {
218 if let Some(last_change) = cdc_changes.last_mut() {
220 if let InternalCdcChange::Delete {
221 key: last_key,
222 pre_version: last_pre_version,
223 } = &last_change.change
224 {
225 if last_key == &key && *last_pre_version != version {
226 match cdc_change {
229 InternalCdcChange::Insert {
230 ..
231 } => {
232 last_change.change = cdc_change;
233 continue;
234 }
235 InternalCdcChange::Update {
236 key,
237 pre_version: _,
238 post_version,
239 } => {
240 last_change.change = InternalCdcChange::Insert {
242 key,
243 post_version,
244 };
245 continue;
246 }
247 _ => {}
248 }
249 }
250 }
251 }
252
253 cdc_changes.push(InternalCdcSequencedChange {
254 sequence,
255 change: cdc_change,
256 });
257 }
258 }
259
260 Ok(cdc_changes)
261}