Skip to main content

reifydb_cdc/storage/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
//! CDC storage abstraction.
//!
//! This module provides a trait for CDC storage backends and an in-memory implementation.
//! CDC storage is independent of MVCC versioned storage: it uses simple big-endian `u64` keys
//! (`CommitVersion`) and stores fully resolved values.
9
10pub mod memory;
11
12use std::{collections::Bound, sync};
13
14use memory::MemoryCdcStorage;
15use reifydb_core::{
16	common::CommitVersion,
17	encoded::key::EncodedKey,
18	interface::cdc::{Cdc, CdcBatch},
19};
20
21use crate::error::CdcError;
22
23/// Result type for CDC storage operations.
24pub type CdcStorageResult<T> = Result<T, CdcError>;
25
26/// Information about a dropped CDC entry for stats tracking.
27#[derive(Debug, Clone)]
28pub struct DroppedCdcEntry {
29	pub key: EncodedKey,
30	pub value_bytes: u64,
31}
32
33/// Result of a drop_before operation.
34#[derive(Debug, Clone, Default)]
35pub struct DropBeforeResult {
36	pub count: usize,
37	pub entries: Vec<DroppedCdcEntry>,
38}
39
40/// Trait for CDC storage backends.
41///
42/// CDC storage stores fully resolved change data capture entries keyed by CommitVersion.
43/// Unlike MVCC storage, CDC entries are immutable and use simple version keys.
44///
45/// Implementations must be thread-safe and cloneable to support concurrent access
46/// from multiple consumers and the CDC generation pipeline.
47pub trait CdcStorage: Send + Sync + Clone + 'static {
48	/// Write a CDC entry (fully resolved values).
49	///
50	/// The entry is keyed by its version. If an entry already exists at this version,
51	/// it will be overwritten (this should only happen during recovery/replay).
52	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()>;
53
54	/// Read a CDC entry by version.
55	///
56	/// Returns `None` if no entry exists at the given version.
57	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>>;
58
59	/// Read CDC entries in a version range.
60	///
61	/// Returns entries in ascending version order up to `batch_size` entries.
62	/// The `CdcBatch.has_more` flag indicates if more entries exist beyond the batch.
63	fn read_range(
64		&self,
65		start: Bound<CommitVersion>,
66		end: Bound<CommitVersion>,
67		batch_size: u64,
68	) -> CdcStorageResult<CdcBatch>;
69
70	/// Count CDC changes at a specific version.
71	///
72	/// Returns 0 if no entry exists at the given version.
73	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize>;
74
75	/// Get the minimum (oldest) CDC version in storage.
76	///
77	/// Returns `None` if storage is empty.
78	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
79
80	/// Get the maximum (newest) CDC version in storage.
81	///
82	/// Returns `None` if storage is empty.
83	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
84
85	/// Check if a CDC entry exists at the given version.
86	fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
87		Ok(self.read(version)?.is_some())
88	}
89
90	/// Delete all CDC entries with version strictly less than the given version.
91	/// Returns the count and entry information for stats tracking.
92	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult>;
93
94	/// Convenience method with default batch size.
95	fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
96		self.read_range(start, end, 1024)
97	}
98
99	/// Scan all CDC entries with the given batch size.
100	fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
101		self.read_range(Bound::Unbounded, Bound::Unbounded, batch_size)
102	}
103}
104
105/// Blanket implementation for CdcStore compatibility with existing traits.
106impl<T: CdcStorage> CdcStorage for sync::Arc<T> {
107	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
108		(**self).write(cdc)
109	}
110
111	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
112		(**self).read(version)
113	}
114
115	fn read_range(
116		&self,
117		start: Bound<CommitVersion>,
118		end: Bound<CommitVersion>,
119		batch_size: u64,
120	) -> CdcStorageResult<CdcBatch> {
121		(**self).read_range(start, end, batch_size)
122	}
123
124	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
125		(**self).count(version)
126	}
127
128	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
129		(**self).min_version()
130	}
131
132	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
133		(**self).max_version()
134	}
135
136	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
137		(**self).drop_before(version)
138	}
139}
140
141/// CDC storage abstraction enum.
142///
143/// Provides a unified interface over different CDC storage backends.
144/// Currently supports in-memory storage, with room for future backends.
145#[derive(Clone)]
146pub enum CdcStore {
147	/// In-memory CDC storage backed by a BTreeMap.
148	Memory(MemoryCdcStorage),
149}
150
151impl CdcStore {
152	/// Create an in-memory CDC store.
153	pub fn memory() -> Self {
154		Self::Memory(MemoryCdcStorage::new())
155	}
156
157	/// Write a CDC entry.
158	pub fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
159		match self {
160			Self::Memory(s) => s.write(cdc),
161		}
162	}
163
164	/// Read a CDC entry by version.
165	pub fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
166		match self {
167			Self::Memory(s) => s.read(version),
168		}
169	}
170
171	/// Read CDC entries in a version range.
172	pub fn read_range(
173		&self,
174		start: Bound<CommitVersion>,
175		end: Bound<CommitVersion>,
176		batch_size: u64,
177	) -> CdcStorageResult<CdcBatch> {
178		match self {
179			Self::Memory(s) => s.read_range(start, end, batch_size),
180		}
181	}
182
183	/// Count CDC changes at a specific version.
184	pub fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
185		match self {
186			Self::Memory(s) => s.count(version),
187		}
188	}
189
190	/// Get the minimum (oldest) CDC version in storage.
191	pub fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
192		match self {
193			Self::Memory(s) => s.min_version(),
194		}
195	}
196
197	/// Get the maximum (newest) CDC version in storage.
198	pub fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
199		match self {
200			Self::Memory(s) => s.max_version(),
201		}
202	}
203
204	/// Delete all CDC entries with version strictly less than the given version.
205	pub fn delete_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
206		match self {
207			Self::Memory(s) => s.drop_before(version),
208		}
209	}
210}
211
212impl CdcStorage for CdcStore {
213	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
214		CdcStore::write(self, cdc)
215	}
216
217	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
218		CdcStore::read(self, version)
219	}
220
221	fn read_range(
222		&self,
223		start: Bound<CommitVersion>,
224		end: Bound<CommitVersion>,
225		batch_size: u64,
226	) -> CdcStorageResult<CdcBatch> {
227		CdcStore::read_range(self, start, end, batch_size)
228	}
229
230	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
231		CdcStore::count(self, version)
232	}
233
234	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
235		CdcStore::min_version(self)
236	}
237
238	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
239		CdcStore::max_version(self)
240	}
241
242	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
243		CdcStore::delete_before(self, version)
244	}
245}
246
#[cfg(test)]
pub mod tests {
	use reifydb_core::{
		encoded::{key::EncodedKey, row::EncodedRow},
		interface::cdc::SystemChange,
	};
	use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};

	use super::*;

	fn create_test_cdc(version: u64, num_changes: usize) -> Cdc {
		let system_changes: Vec<SystemChange> = (0..num_changes)
			.map(|i| SystemChange::Insert {
				key: EncodedKey::new(vec![i as u8]),
				post: EncodedRow(CowVec::new(vec![])),
			})
			.collect();

		Cdc::new(CommitVersion(version), DateTime::from_nanos(12345), Vec::new(), system_changes)
	}

	#[test]
	fn test_memory_storage_write_read() {
		let storage = MemoryCdcStorage::new();
		let cdc = create_test_cdc(1, 3);

		storage.write(&cdc).unwrap();

		let read_cdc = storage.read(CommitVersion(1)).unwrap();
		assert!(read_cdc.is_some());
		let read_cdc = read_cdc.unwrap();
		assert_eq!(read_cdc.version, CommitVersion(1));
		assert_eq!(read_cdc.system_changes.len(), 3);
	}

	#[test]
	fn test_memory_storage_read_nonexistent() {
		let storage = MemoryCdcStorage::new();
		let result = storage.read(CommitVersion(999)).unwrap();
		assert!(result.is_none());
	}

	#[test]
	fn test_memory_storage_exists() {
		let storage = MemoryCdcStorage::new();
		storage.write(&create_test_cdc(1, 1)).unwrap();

		assert!(CdcStorage::exists(&storage, CommitVersion(1)).unwrap());
		assert!(!CdcStorage::exists(&storage, CommitVersion(2)).unwrap());
	}

	#[test]
	fn test_memory_storage_range() {
		let storage = MemoryCdcStorage::new();

		for v in 1..=10 {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		// Read range [3, 7]
		let batch = storage
			.read_range(Bound::Included(CommitVersion(3)), Bound::Included(CommitVersion(7)), 100)
			.unwrap();

		assert_eq!(batch.items.len(), 5);
		assert!(!batch.has_more);
		assert_eq!(batch.items[0].version, CommitVersion(3));
		assert_eq!(batch.items[4].version, CommitVersion(7));
	}

	#[test]
	fn test_memory_storage_range_default_batch_excluded_end() {
		let storage = MemoryCdcStorage::new();

		for v in 1..=10 {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		// Read range [3, 7) through the default-batch-size convenience method
		let batch = CdcStorage::range(&storage, Bound::Included(CommitVersion(3)), Bound::Excluded(CommitVersion(7)))
			.unwrap();

		assert_eq!(batch.items.len(), 4);
		assert!(!batch.has_more);
		assert_eq!(batch.items[0].version, CommitVersion(3));
		assert_eq!(batch.items[3].version, CommitVersion(6));
	}

	#[test]
	fn test_memory_storage_range_batch_size() {
		let storage = MemoryCdcStorage::new();

		for v in 1..=10 {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		// Read with batch size 3
		let batch = storage.read_range(Bound::Unbounded, Bound::Unbounded, 3).unwrap();

		assert_eq!(batch.items.len(), 3);
		assert!(batch.has_more);
	}

	#[test]
	fn test_memory_storage_scan() {
		let storage = MemoryCdcStorage::new();

		// Scanning empty storage yields an empty, complete batch
		let empty = CdcStorage::scan(&storage, 10).unwrap();
		assert!(empty.items.is_empty());
		assert!(!empty.has_more);

		for v in 1..=5 {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		let batch = CdcStorage::scan(&storage, 10).unwrap();
		assert_eq!(batch.items.len(), 5);
		assert!(!batch.has_more);
		assert_eq!(batch.items[0].version, CommitVersion(1));
		assert_eq!(batch.items[4].version, CommitVersion(5));
	}

	#[test]
	fn test_memory_storage_count() {
		let storage = MemoryCdcStorage::new();
		let cdc = create_test_cdc(1, 5);
		storage.write(&cdc).unwrap();

		assert_eq!(storage.count(CommitVersion(1)).unwrap(), 5);
		assert_eq!(storage.count(CommitVersion(2)).unwrap(), 0);
	}

	#[test]
	fn test_memory_storage_min_max_version() {
		let storage = MemoryCdcStorage::new();

		assert!(storage.min_version().unwrap().is_none());
		assert!(storage.max_version().unwrap().is_none());

		storage.write(&create_test_cdc(5, 1)).unwrap();
		storage.write(&create_test_cdc(3, 1)).unwrap();
		storage.write(&create_test_cdc(8, 1)).unwrap();

		assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(3)));
		assert_eq!(storage.max_version().unwrap(), Some(CommitVersion(8)));
	}

	#[test]
	fn test_delete_before_empty_storage() {
		let storage = MemoryCdcStorage::new();

		// Deleting from empty storage should return 0
		let result = storage.drop_before(CommitVersion(10)).unwrap();
		assert_eq!(result.count, 0);
		assert!(result.entries.is_empty());
	}

	#[test]
	fn test_delete_before_some_entries() {
		let storage = MemoryCdcStorage::new();

		// Add entries at versions 1, 3, 5, 7, 9
		for v in [1, 3, 5, 7, 9] {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		// Delete entries before version 5 (should delete 1 and 3)
		let result = storage.drop_before(CommitVersion(5)).unwrap();
		assert_eq!(result.count, 2);
		// Each CDC entry has 1 change, so 2 entries total
		assert_eq!(result.entries.len(), 2);

		// Verify versions 1 and 3 are gone
		assert!(storage.read(CommitVersion(1)).unwrap().is_none());
		assert!(storage.read(CommitVersion(3)).unwrap().is_none());

		// Verify versions 5, 7, 9 remain
		assert!(storage.read(CommitVersion(5)).unwrap().is_some());
		assert!(storage.read(CommitVersion(7)).unwrap().is_some());
		assert!(storage.read(CommitVersion(9)).unwrap().is_some());

		// Min version should now be 5
		assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(5)));
	}

	#[test]
	fn test_delete_before_all_entries() {
		let storage = MemoryCdcStorage::new();

		// Add entries at versions 1, 2, 3
		for v in 1..=3 {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		// Delete all entries (version 10 is greater than all)
		let result = storage.drop_before(CommitVersion(10)).unwrap();
		assert_eq!(result.count, 3);
		assert_eq!(result.entries.len(), 3);

		// Verify storage is empty
		assert!(storage.min_version().unwrap().is_none());
		assert!(storage.max_version().unwrap().is_none());
	}

	#[test]
	fn test_delete_before_none_when_version_too_low() {
		let storage = MemoryCdcStorage::new();

		// Add entries at versions 5, 6, 7
		for v in 5..=7 {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		// Delete entries before version 3 (should delete nothing)
		let result = storage.drop_before(CommitVersion(3)).unwrap();
		assert_eq!(result.count, 0);
		assert!(result.entries.is_empty());

		// All entries should remain
		assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(5)));
		assert_eq!(storage.max_version().unwrap(), Some(CommitVersion(7)));
	}

	#[test]
	fn test_delete_before_boundary_condition() {
		let storage = MemoryCdcStorage::new();

		// Add entries at versions 1, 2, 3, 4, 5
		for v in 1..=5 {
			storage.write(&create_test_cdc(v, 1)).unwrap();
		}

		// Delete entries before version 3 (should delete 1 and 2, keep 3, 4, 5)
		let result = storage.drop_before(CommitVersion(3)).unwrap();
		assert_eq!(result.count, 2);
		assert_eq!(result.entries.len(), 2);

		// Version 3 should still exist (strictly less than)
		assert!(storage.read(CommitVersion(3)).unwrap().is_some());
		assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(3)));
	}

	#[test]
	fn test_drop_before_reports_one_entry_per_change() {
		let storage = MemoryCdcStorage::new();

		storage.write(&create_test_cdc(1, 3)).unwrap();
		storage.write(&create_test_cdc(2, 2)).unwrap();
		storage.write(&create_test_cdc(3, 1)).unwrap();

		let result = storage.drop_before(CommitVersion(3)).unwrap();
		// `count` tracks dropped versions; `entries` tracks the changes they contained
		assert_eq!(result.count, 2);
		assert_eq!(result.entries.len(), 5);
	}

	#[test]
	fn test_drop_before_returns_entry_stats() {
		let storage = MemoryCdcStorage::new();

		// Create CDC with known key/value sizes
		let cdc = Cdc::new(
			CommitVersion(1),
			DateTime::from_nanos(12345),
			Vec::new(),
			vec![SystemChange::Insert {
				key: EncodedKey::new(vec![1, 2, 3]),                     // 3 bytes
				post: EncodedRow(CowVec::new(vec![10, 20, 30, 40, 50])), // 5 bytes
			}],
		);
		storage.write(&cdc).unwrap();

		let result = storage.drop_before(CommitVersion(2)).unwrap();

		assert_eq!(result.count, 1);
		assert_eq!(result.entries.len(), 1);
		assert_eq!(result.entries[0].key.as_ref(), &[1, 2, 3]);
		assert_eq!(result.entries[0].value_bytes, 5);
	}

	#[test]
	fn test_cdc_store_memory_delegates_to_backend() {
		let store = CdcStore::memory();

		for v in 1..=5 {
			store.write(&create_test_cdc(v, 2)).unwrap();
		}

		assert_eq!(store.count(CommitVersion(3)).unwrap(), 2);
		assert_eq!(store.min_version().unwrap(), Some(CommitVersion(1)));
		assert_eq!(store.max_version().unwrap(), Some(CommitVersion(5)));
		assert!(CdcStorage::exists(&store, CommitVersion(4)).unwrap());
		assert!(!CdcStorage::exists(&store, CommitVersion(6)).unwrap());

		let result = store.delete_before(CommitVersion(3)).unwrap();
		assert_eq!(result.count, 2);
		assert!(store.read(CommitVersion(2)).unwrap().is_none());
		assert_eq!(store.min_version().unwrap(), Some(CommitVersion(3)));

		let batch = CdcStorage::scan(&store, 10).unwrap();
		assert_eq!(batch.items.len(), 3);
		assert!(!batch.has_more);
	}

	#[test]
	fn test_arc_storage_delegates_to_inner() {
		let storage = std::sync::Arc::new(MemoryCdcStorage::new());
		let shared = std::sync::Arc::clone(&storage);

		storage.write(&create_test_cdc(1, 1)).unwrap();
		storage.write(&create_test_cdc(2, 1)).unwrap();

		// Both handles observe the same underlying backend
		assert!(CdcStorage::exists(&shared, CommitVersion(1)).unwrap());
		assert_eq!(shared.max_version().unwrap(), Some(CommitVersion(2)));

		let result = shared.drop_before(CommitVersion(2)).unwrap();
		assert_eq!(result.count, 1);
		assert!(storage.read(CommitVersion(1)).unwrap().is_none());
		assert!(storage.read(CommitVersion(2)).unwrap().is_some());
	}
}