1pub mod memory;
11
12use std::{collections::Bound, sync};
13
14use memory::MemoryCdcStorage;
15use reifydb_core::{
16 common::CommitVersion,
17 encoded::key::EncodedKey,
18 interface::cdc::{Cdc, CdcBatch},
19};
20
21use crate::error::CdcError;
22
/// Result alias used by every CDC storage operation in this module; the error type is always [`CdcError`].
pub type CdcStorageResult<T> = Result<T, CdcError>;
25
/// One entry reported back by a `drop_before` call (see [`DropBeforeResult::entries`]).
#[derive(Debug, Clone)]
pub struct DroppedCdcEntry {
    /// Encoded key of the dropped entry.
    pub key: EncodedKey,
    /// Byte size recorded for the dropped entry's value.
    ///
    /// NOTE(review): the in-memory test in this file expects the byte length of the inserted row
    /// (`post`); confirm what other change kinds report.
    pub value_bytes: u64,
}
32
/// Summary returned by [`CdcStorage::drop_before`]; `Default` yields a zero count and no entries.
#[derive(Debug, Clone, Default)]
pub struct DropBeforeResult {
    /// Number of items removed.
    ///
    /// NOTE(review): the tests in this file only use one change per CDC record, so they do not show
    /// whether this counts records or individual changes — confirm against the backend.
    pub count: usize,
    /// Per-entry details (key and value size) for what was removed.
    pub entries: Vec<DroppedCdcEntry>,
}
39
40pub trait CdcStorage: Send + Sync + Clone + 'static {
48 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()>;
53
54 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>>;
58
59 fn read_range(
64 &self,
65 start: Bound<CommitVersion>,
66 end: Bound<CommitVersion>,
67 batch_size: u64,
68 ) -> CdcStorageResult<CdcBatch>;
69
70 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize>;
74
75 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
79
80 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
84
85 fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
87 Ok(self.read(version)?.is_some())
88 }
89
90 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult>;
93
94 fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
96 self.read_range(start, end, 1024)
97 }
98
99 fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
101 self.read_range(Bound::Unbounded, Bound::Unbounded, batch_size)
102 }
103}
104
105impl<T: CdcStorage> CdcStorage for sync::Arc<T> {
107 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
108 (**self).write(cdc)
109 }
110
111 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
112 (**self).read(version)
113 }
114
115 fn read_range(
116 &self,
117 start: Bound<CommitVersion>,
118 end: Bound<CommitVersion>,
119 batch_size: u64,
120 ) -> CdcStorageResult<CdcBatch> {
121 (**self).read_range(start, end, batch_size)
122 }
123
124 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
125 (**self).count(version)
126 }
127
128 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
129 (**self).min_version()
130 }
131
132 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
133 (**self).max_version()
134 }
135
136 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
137 (**self).drop_before(version)
138 }
139}
140
/// Concrete CDC store that dispatches to one of the available storage backends.
///
/// Only the in-memory backend is present in this enum at the moment.
#[derive(Clone)]
pub enum CdcStore {
    /// Backend implemented by [`MemoryCdcStorage`].
    Memory(MemoryCdcStorage),
}
150
151impl CdcStore {
152 pub fn memory() -> Self {
154 Self::Memory(MemoryCdcStorage::new())
155 }
156
157 pub fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
159 match self {
160 Self::Memory(s) => s.write(cdc),
161 }
162 }
163
164 pub fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
166 match self {
167 Self::Memory(s) => s.read(version),
168 }
169 }
170
171 pub fn read_range(
173 &self,
174 start: Bound<CommitVersion>,
175 end: Bound<CommitVersion>,
176 batch_size: u64,
177 ) -> CdcStorageResult<CdcBatch> {
178 match self {
179 Self::Memory(s) => s.read_range(start, end, batch_size),
180 }
181 }
182
183 pub fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
185 match self {
186 Self::Memory(s) => s.count(version),
187 }
188 }
189
190 pub fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
192 match self {
193 Self::Memory(s) => s.min_version(),
194 }
195 }
196
197 pub fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
199 match self {
200 Self::Memory(s) => s.max_version(),
201 }
202 }
203
204 pub fn delete_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
206 match self {
207 Self::Memory(s) => s.drop_before(version),
208 }
209 }
210}
211
212impl CdcStorage for CdcStore {
213 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
214 CdcStore::write(self, cdc)
215 }
216
217 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
218 CdcStore::read(self, version)
219 }
220
221 fn read_range(
222 &self,
223 start: Bound<CommitVersion>,
224 end: Bound<CommitVersion>,
225 batch_size: u64,
226 ) -> CdcStorageResult<CdcBatch> {
227 CdcStore::read_range(self, start, end, batch_size)
228 }
229
230 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
231 CdcStore::count(self, version)
232 }
233
234 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
235 CdcStore::min_version(self)
236 }
237
238 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
239 CdcStore::max_version(self)
240 }
241
242 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
243 CdcStore::delete_before(self, version)
244 }
245}
246
247#[cfg(test)]
248pub mod tests {
249 use reifydb_core::{
250 encoded::{key::EncodedKey, row::EncodedRow},
251 interface::cdc::SystemChange,
252 };
253 use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};
254
255 use super::*;
256
257 fn create_test_cdc(version: u64, num_changes: usize) -> Cdc {
258 let system_changes: Vec<SystemChange> = (0..num_changes)
259 .map(|i| SystemChange::Insert {
260 key: EncodedKey::new(vec![i as u8]),
261 post: EncodedRow(CowVec::new(vec![])),
262 })
263 .collect();
264
265 Cdc::new(CommitVersion(version), DateTime::from_nanos(12345), Vec::new(), system_changes)
266 }
267
268 #[test]
269 fn test_memory_storage_write_read() {
270 let storage = MemoryCdcStorage::new();
271 let cdc = create_test_cdc(1, 3);
272
273 storage.write(&cdc).unwrap();
274
275 let read_cdc = storage.read(CommitVersion(1)).unwrap();
276 assert!(read_cdc.is_some());
277 let read_cdc = read_cdc.unwrap();
278 assert_eq!(read_cdc.version, CommitVersion(1));
279 assert_eq!(read_cdc.system_changes.len(), 3);
280 }
281
282 #[test]
283 fn test_memory_storage_read_nonexistent() {
284 let storage = MemoryCdcStorage::new();
285 let result = storage.read(CommitVersion(999)).unwrap();
286 assert!(result.is_none());
287 }
288
289 #[test]
290 fn test_memory_storage_range() {
291 let storage = MemoryCdcStorage::new();
292
293 for v in 1..=10 {
294 storage.write(&create_test_cdc(v, 1)).unwrap();
295 }
296
297 let batch = storage
299 .read_range(Bound::Included(CommitVersion(3)), Bound::Included(CommitVersion(7)), 100)
300 .unwrap();
301
302 assert_eq!(batch.items.len(), 5);
303 assert!(!batch.has_more);
304 assert_eq!(batch.items[0].version, CommitVersion(3));
305 assert_eq!(batch.items[4].version, CommitVersion(7));
306 }
307
308 #[test]
309 fn test_memory_storage_range_batch_size() {
310 let storage = MemoryCdcStorage::new();
311
312 for v in 1..=10 {
313 storage.write(&create_test_cdc(v, 1)).unwrap();
314 }
315
316 let batch = storage.read_range(Bound::Unbounded, Bound::Unbounded, 3).unwrap();
318
319 assert_eq!(batch.items.len(), 3);
320 assert!(batch.has_more);
321 }
322
323 #[test]
324 fn test_memory_storage_count() {
325 let storage = MemoryCdcStorage::new();
326 let cdc = create_test_cdc(1, 5);
327 storage.write(&cdc).unwrap();
328
329 assert_eq!(storage.count(CommitVersion(1)).unwrap(), 5);
330 assert_eq!(storage.count(CommitVersion(2)).unwrap(), 0);
331 }
332
333 #[test]
334 fn test_memory_storage_min_max_version() {
335 let storage = MemoryCdcStorage::new();
336
337 assert!(storage.min_version().unwrap().is_none());
338 assert!(storage.max_version().unwrap().is_none());
339
340 storage.write(&create_test_cdc(5, 1)).unwrap();
341 storage.write(&create_test_cdc(3, 1)).unwrap();
342 storage.write(&create_test_cdc(8, 1)).unwrap();
343
344 assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(3)));
345 assert_eq!(storage.max_version().unwrap(), Some(CommitVersion(8)));
346 }
347
348 #[test]
349 fn test_delete_before_empty_storage() {
350 let storage = MemoryCdcStorage::new();
351
352 let result = storage.drop_before(CommitVersion(10)).unwrap();
354 assert_eq!(result.count, 0);
355 assert!(result.entries.is_empty());
356 }
357
358 #[test]
359 fn test_delete_before_some_entries() {
360 let storage = MemoryCdcStorage::new();
361
362 for v in [1, 3, 5, 7, 9] {
364 storage.write(&create_test_cdc(v, 1)).unwrap();
365 }
366
367 let result = storage.drop_before(CommitVersion(5)).unwrap();
369 assert_eq!(result.count, 2);
370 assert_eq!(result.entries.len(), 2);
372
373 assert!(storage.read(CommitVersion(1)).unwrap().is_none());
375 assert!(storage.read(CommitVersion(3)).unwrap().is_none());
376
377 assert!(storage.read(CommitVersion(5)).unwrap().is_some());
379 assert!(storage.read(CommitVersion(7)).unwrap().is_some());
380 assert!(storage.read(CommitVersion(9)).unwrap().is_some());
381
382 assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(5)));
384 }
385
386 #[test]
387 fn test_delete_before_all_entries() {
388 let storage = MemoryCdcStorage::new();
389
390 for v in 1..=3 {
392 storage.write(&create_test_cdc(v, 1)).unwrap();
393 }
394
395 let result = storage.drop_before(CommitVersion(10)).unwrap();
397 assert_eq!(result.count, 3);
398 assert_eq!(result.entries.len(), 3);
399
400 assert!(storage.min_version().unwrap().is_none());
402 assert!(storage.max_version().unwrap().is_none());
403 }
404
405 #[test]
406 fn test_delete_before_none_when_version_too_low() {
407 let storage = MemoryCdcStorage::new();
408
409 for v in 5..=7 {
411 storage.write(&create_test_cdc(v, 1)).unwrap();
412 }
413
414 let result = storage.drop_before(CommitVersion(3)).unwrap();
416 assert_eq!(result.count, 0);
417 assert!(result.entries.is_empty());
418
419 assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(5)));
421 assert_eq!(storage.max_version().unwrap(), Some(CommitVersion(7)));
422 }
423
424 #[test]
425 fn test_delete_before_boundary_condition() {
426 let storage = MemoryCdcStorage::new();
427
428 for v in 1..=5 {
430 storage.write(&create_test_cdc(v, 1)).unwrap();
431 }
432
433 let result = storage.drop_before(CommitVersion(3)).unwrap();
435 assert_eq!(result.count, 2);
436 assert_eq!(result.entries.len(), 2);
437
438 assert!(storage.read(CommitVersion(3)).unwrap().is_some());
440 assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(3)));
441 }
442
443 #[test]
444 fn test_drop_before_returns_entry_stats() {
445 let storage = MemoryCdcStorage::new();
446
447 let cdc = Cdc::new(
449 CommitVersion(1),
450 DateTime::from_nanos(12345),
451 Vec::new(),
452 vec![SystemChange::Insert {
453 key: EncodedKey::new(vec![1, 2, 3]), post: EncodedRow(CowVec::new(vec![10, 20, 30, 40, 50])), }],
456 );
457 storage.write(&cdc).unwrap();
458
459 let result = storage.drop_before(CommitVersion(2)).unwrap();
460
461 assert_eq!(result.count, 1);
462 assert_eq!(result.entries.len(), 1);
463 assert_eq!(result.entries[0].key.as_ref(), &[1, 2, 3]);
464 assert_eq!(result.entries[0].value_bytes, 5);
465 }
466}