reifydb_store_transaction/cdc/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4pub(crate) mod codec;
5pub(crate) mod converter;
6mod exclude;
7mod layout;
8
9use std::collections::Bound;
10
11use async_trait::async_trait;
12use exclude::should_exclude_from_cdc;
13pub(crate) use reifydb_core::delta::Delta;
14use reifydb_core::{CommitVersion, EncodedKey, interface::Cdc, key::Key};
15use reifydb_type::diagnostic::internal::internal;
16
17pub trait CdcStore: Send + Sync + Clone + 'static + CdcGet + CdcRange + CdcCount {}
18
19/// A batch of CDC range results with continuation info.
20#[derive(Debug, Clone)]
21pub struct CdcBatch {
22	/// The CDC entries in this batch.
23	pub items: Vec<Cdc>,
24	/// Whether there are more items after this batch.
25	pub has_more: bool,
26}
27
28impl CdcBatch {
29	/// Creates an empty batch with no more results.
30	pub fn empty() -> Self {
31		Self {
32			items: Vec::new(),
33			has_more: false,
34		}
35	}
36
37	/// Returns true if this batch contains no items.
38	pub fn is_empty(&self) -> bool {
39		self.items.is_empty()
40	}
41}
42
43#[async_trait]
44pub trait CdcGet: Send + Sync {
45	async fn get(&self, version: CommitVersion) -> reifydb_type::Result<Option<Cdc>>;
46}
47
48#[async_trait]
49pub trait CdcRange: Send + Sync {
50	/// Fetch a batch of CDC entries in version order (ascending).
51	async fn range_batch(
52		&self,
53		start: Bound<CommitVersion>,
54		end: Bound<CommitVersion>,
55		batch_size: u64,
56	) -> reifydb_type::Result<CdcBatch>;
57
58	/// Convenience method with default batch size.
59	async fn range(
60		&self,
61		start: Bound<CommitVersion>,
62		end: Bound<CommitVersion>,
63	) -> reifydb_type::Result<CdcBatch> {
64		self.range_batch(start, end, 1024).await
65	}
66
67	/// Scan all CDC entries.
68	async fn scan(&self, batch_size: u64) -> reifydb_type::Result<CdcBatch> {
69		self.range_batch(Bound::Unbounded, Bound::Unbounded, batch_size).await
70	}
71}
72
73#[async_trait]
74pub trait CdcCount: Send + Sync {
75	async fn count(&self, version: CommitVersion) -> reifydb_type::Result<usize>;
76}
77
78/// Internal representation of CDC change with version references
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub(crate) enum InternalCdcChange {
81	Insert {
82		key: EncodedKey,
83		post_version: CommitVersion,
84	},
85	Update {
86		key: EncodedKey,
87		pre_version: CommitVersion,
88		post_version: CommitVersion,
89	},
90	Delete {
91		key: EncodedKey,
92		pre_version: CommitVersion,
93	},
94}
95
96impl InternalCdcChange {
97	/// Get the key for this change.
98	pub fn key(&self) -> &EncodedKey {
99		match self {
100			InternalCdcChange::Insert {
101				key,
102				..
103			} => key,
104			InternalCdcChange::Update {
105				key,
106				..
107			} => key,
108			InternalCdcChange::Delete {
109				key,
110				..
111			} => key,
112		}
113	}
114}
115
116/// Internal representation of CDC with version references
117#[derive(Debug, Clone, PartialEq)]
118pub(crate) struct InternalCdc {
119	pub version: CommitVersion,
120	pub timestamp: u64,
121	pub changes: Vec<InternalCdcSequencedChange>,
122}
123
124#[derive(Debug, Clone, PartialEq)]
125pub(crate) struct InternalCdcSequencedChange {
126	pub sequence: u16,
127	pub change: InternalCdcChange,
128}
129
130/// Generate an internal CDC change from a Delta
131fn generate_internal_cdc_change(
132	delta: Delta,
133	pre_version: Option<CommitVersion>,
134	post_version: CommitVersion,
135) -> Option<InternalCdcChange> {
136	match delta {
137		Delta::Set {
138			key,
139			values: _,
140		} => {
141			// operators and internal state do not generate cdc events
142			if let Some(kind) = Key::kind(&key) {
143				if should_exclude_from_cdc(kind) {
144					return None;
145				}
146			}
147
148			if let Some(pre_version) = pre_version {
149				Some(InternalCdcChange::Update {
150					key,
151					pre_version,
152					post_version,
153				})
154			} else {
155				Some(InternalCdcChange::Insert {
156					key,
157					post_version,
158				})
159			}
160		}
161
162		Delta::Remove {
163			key,
164		} => {
165			// operators and internal state do not produce cdc events
166			if let Some(kind) = Key::kind(&key) {
167				if should_exclude_from_cdc(kind) {
168					return None;
169				}
170			}
171
172			if let Some(pre_version) = pre_version {
173				Some(InternalCdcChange::Delete {
174					key,
175					pre_version,
176				})
177			} else {
178				None
179			}
180		}
181
182		// Drop operations never generate CDC events - they are for internal cleanup
183		Delta::Drop {
184			..
185		} => None,
186	}
187}
188
189/// Process optimized deltas and generate CDC changes
190///
191/// NOTE: This function expects deltas that are ALREADY OPTIMIZED at the delta level.
192/// All cancellation (Insert+Delete) and coalescing (Update+Update) has already been done.
193/// This function converts each optimized delta to the appropriate CDC change, with one
194/// exception: it collapses Delete→Insert patterns in the same transaction into just Insert.
195pub(crate) fn process_deltas_for_cdc<F>(
196	deltas: impl IntoIterator<Item = Delta>,
197	version: CommitVersion,
198	mut get_storage_version: F,
199) -> reifydb_type::Result<Vec<InternalCdcSequencedChange>>
200where
201	F: FnMut(&EncodedKey) -> Option<CommitVersion>,
202{
203	let mut cdc_changes: Vec<InternalCdcSequencedChange> = Vec::new();
204
205	for (idx, delta) in deltas.into_iter().enumerate() {
206		let sequence = match u16::try_from(idx + 1) {
207			Ok(seq) => seq,
208			Err(_) => return Err(reifydb_type::error!(internal("CDC sequence number exhausted"))),
209		};
210
211		let key = delta.key().clone();
212
213		// Get the pre-version from storage (if it exists)
214		let pre_version = get_storage_version(&key);
215
216		// Generate CDC change based on the optimized delta
217		if let Some(cdc_change) = generate_internal_cdc_change(delta, pre_version, version) {
218			// Check if this is an Insert or Update following a Delete in the same transaction
219			if let Some(last_change) = cdc_changes.last_mut() {
220				if let InternalCdcChange::Delete {
221					key: last_key,
222					pre_version: last_pre_version,
223				} = &last_change.change
224				{
225					if last_key == &key && *last_pre_version != version {
226						// Delete (from storage) + Insert/Update (new) in same transaction
227						// Convert to Insert (complete replacement)
228						match cdc_change {
229							InternalCdcChange::Insert {
230								..
231							} => {
232								last_change.change = cdc_change;
233								continue;
234							}
235							InternalCdcChange::Update {
236								key,
237								pre_version: _,
238								post_version,
239							} => {
240								// Convert Update to Insert
241								last_change.change = InternalCdcChange::Insert {
242									key,
243									post_version,
244								};
245								continue;
246							}
247							_ => {}
248						}
249					}
250				}
251			}
252
253			cdc_changes.push(InternalCdcSequencedChange {
254				sequence,
255				change: cdc_change,
256			});
257		}
258	}
259
260	Ok(cdc_changes)
261}