// reifydb_cdc/storage/memory.rs

use std::{
    collections::{BTreeMap, Bound},
    sync::Arc,
};

use reifydb_core::{
    common::CommitVersion,
    interface::cdc::{Cdc, CdcBatch},
};
use reifydb_runtime::sync::rwlock::RwLock;

use super::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry, normalize_range_inclusive};
17#[derive(Clone)]
18pub struct MemoryCdcStorage {
19 inner: Arc<RwLock<BTreeMap<CommitVersion, Cdc>>>,
20}
21
22impl MemoryCdcStorage {
23 pub fn new() -> Self {
24 Self {
25 inner: Arc::new(RwLock::new(BTreeMap::new())),
26 }
27 }
28
29 pub fn with_entries(entries: impl IntoIterator<Item = Cdc>) -> Self {
30 let map: BTreeMap<CommitVersion, Cdc> = entries.into_iter().map(|cdc| (cdc.version, cdc)).collect();
31 Self {
32 inner: Arc::new(RwLock::new(map)),
33 }
34 }
35
36 pub fn len(&self) -> usize {
37 self.inner.read().len()
38 }
39
40 pub fn is_empty(&self) -> bool {
41 self.inner.read().is_empty()
42 }
43
44 pub fn clear(&self) {
45 self.inner.write().clear();
46 }
47}
48
49impl Default for MemoryCdcStorage {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl CdcStorage for MemoryCdcStorage {
56 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
57 self.inner.write().insert(cdc.version, cdc.clone());
58 Ok(())
59 }
60
61 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
62 Ok(self.inner.read().get(&version).cloned())
63 }
64
65 fn read_range(
66 &self,
67 start: Bound<CommitVersion>,
68 end: Bound<CommitVersion>,
69 batch_size: u64,
70 ) -> CdcStorageResult<CdcBatch> {
71 let Some((lo_inc, hi_inc)) = normalize_range_inclusive(start, end) else {
72 return Ok(CdcBatch {
73 items: Vec::new(),
74 has_more: false,
75 });
76 };
77 let guard = self.inner.read();
78 let (items, has_more) = collect_range_into(&guard, lo_inc, hi_inc, batch_size as usize);
79 Ok(CdcBatch {
80 items,
81 has_more,
82 })
83 }
84
85 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
86 Ok(self.inner.read().get(&version).map(|cdc| cdc.system_changes.len()).unwrap_or(0))
87 }
88
89 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
90 Ok(self.inner.read().keys().next().copied())
91 }
92
93 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
94 Ok(self.inner.read().keys().next_back().copied())
95 }
96
97 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
98 let mut guard = self.inner.write();
99 let keys_to_remove: Vec<_> = guard.range(..version).map(|(k, _)| *k).collect();
100 let count = keys_to_remove.len();
101 let entries = collect_dropped_entries(&guard, &keys_to_remove);
102 for key in keys_to_remove {
103 guard.remove(&key);
104 }
105 Ok(DropBeforeResult {
106 count,
107 entries,
108 })
109 }
110}
111
112#[inline]
113fn collect_range_into(
114 guard: &BTreeMap<CommitVersion, Cdc>,
115 lo_inc: CommitVersion,
116 hi_inc: CommitVersion,
117 batch_size: usize,
118) -> (Vec<Cdc>, bool) {
119 let mut items: Vec<Cdc> = Vec::with_capacity(batch_size.min(64));
120 for (count, (_, cdc)) in guard.range(lo_inc..=hi_inc).enumerate() {
121 if count >= batch_size {
122 return (items, true);
123 }
124 items.push(cdc.clone());
125 }
126 (items, false)
127}
128
129#[inline]
130fn collect_dropped_entries(guard: &BTreeMap<CommitVersion, Cdc>, keys: &[CommitVersion]) -> Vec<DroppedCdcEntry> {
131 let mut entries = Vec::new();
132 for key in keys {
133 if let Some(cdc) = guard.get(key) {
134 for sys_change in &cdc.system_changes {
135 entries.push(DroppedCdcEntry {
136 key: sys_change.key().clone(),
137 value_bytes: sys_change.value_bytes() as u64,
138 });
139 }
140 }
141 }
142 entries
143}