reifydb_cdc/storage/
memory.rs1use std::{
5 collections::{BTreeMap, Bound},
6 sync::Arc,
7};
8
9use reifydb_core::{
10 common::CommitVersion,
11 interface::cdc::{Cdc, CdcBatch},
12};
13use reifydb_runtime::sync::rwlock::RwLock;
14
15use super::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry};
16
/// In-memory CDC storage.
///
/// Entries are kept in a `BTreeMap` keyed by commit version, so they are
/// always iterated in ascending version order. The map sits behind an
/// `Arc<RwLock<..>>`, which means cloning this struct produces another handle
/// to the *same* underlying map rather than an independent copy.
#[derive(Clone)]
pub struct MemoryCdcStorage {
    /// Shared, lock-protected map from commit version to its CDC entry.
    inner: Arc<RwLock<BTreeMap<CommitVersion, Cdc>>>,
}
21
22impl MemoryCdcStorage {
23 pub fn new() -> Self {
24 Self {
25 inner: Arc::new(RwLock::new(BTreeMap::new())),
26 }
27 }
28
29 pub fn with_entries(entries: impl IntoIterator<Item = Cdc>) -> Self {
30 let map: BTreeMap<CommitVersion, Cdc> = entries.into_iter().map(|cdc| (cdc.version, cdc)).collect();
31 Self {
32 inner: Arc::new(RwLock::new(map)),
33 }
34 }
35
36 pub fn len(&self) -> usize {
37 self.inner.read().len()
38 }
39
40 pub fn is_empty(&self) -> bool {
41 self.inner.read().is_empty()
42 }
43
44 pub fn clear(&self) {
45 self.inner.write().clear();
46 }
47}
48
49impl Default for MemoryCdcStorage {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl CdcStorage for MemoryCdcStorage {
56 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
57 self.inner.write().insert(cdc.version, cdc.clone());
58 Ok(())
59 }
60
61 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
62 Ok(self.inner.read().get(&version).cloned())
63 }
64
65 fn read_range(
66 &self,
67 start: Bound<CommitVersion>,
68 end: Bound<CommitVersion>,
69 batch_size: u64,
70 ) -> CdcStorageResult<CdcBatch> {
71 let guard = self.inner.read();
72 let batch_size = batch_size as usize;
73
74 let range_iter = guard.range((start, end));
75 let mut items: Vec<Cdc> = Vec::with_capacity(batch_size.min(64));
76
77 for (count, (_, cdc)) in range_iter.enumerate() {
78 if count >= batch_size {
79 return Ok(CdcBatch {
81 items,
82 has_more: true,
83 });
84 }
85 items.push(cdc.clone());
86 }
87
88 Ok(CdcBatch {
89 items,
90 has_more: false,
91 })
92 }
93
94 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
95 Ok(self.inner.read().get(&version).map(|cdc| cdc.system_changes.len()).unwrap_or(0))
96 }
97
98 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
99 Ok(self.inner.read().keys().next().copied())
100 }
101
102 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
103 Ok(self.inner.read().keys().next_back().copied())
104 }
105
106 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
107 let mut guard = self.inner.write();
108 let keys_to_remove: Vec<_> = guard.range(..version).map(|(k, _)| *k).collect();
109 let count = keys_to_remove.len();
110
111 let mut entries = Vec::new();
112 for key in &keys_to_remove {
113 if let Some(cdc) = guard.get(key) {
114 for sys_change in &cdc.system_changes {
115 entries.push(DroppedCdcEntry {
116 key: sys_change.key().clone(),
117 value_bytes: sys_change.value_bytes() as u64,
118 });
119 }
120 }
121 }
122
123 for key in keys_to_remove {
124 guard.remove(&key);
125 }
126
127 Ok(DropBeforeResult {
128 count,
129 entries,
130 })
131 }
132}
133
#[cfg(test)]
pub mod tests {
    use std::thread;

    use reifydb_core::{
        encoded::{key::EncodedKey, row::EncodedRow},
        interface::cdc::SystemChange,
    };
    use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};

    use super::*;

    /// Builds a CDC entry for `version` that carries a single insert change.
    fn make_cdc(version: u64) -> Cdc {
        let insert = SystemChange::Insert {
            key: EncodedKey::new(vec![1, 2, 3]),
            post: EncodedRow(CowVec::new(vec![])),
        };

        Cdc::new(CommitVersion(version), DateTime::from_nanos(12345), Vec::new(), vec![insert])
    }

    /// Collects the versions of a batch in the order they were returned.
    fn versions_of(batch: &CdcBatch) -> Vec<CommitVersion> {
        batch.items.iter().map(|cdc| cdc.version).collect()
    }

    /// A clone is a second handle onto the same map, not a copy of it.
    #[test]
    fn test_clone_shares_storage() {
        let original = MemoryCdcStorage::new();
        let alias = original.clone();

        original.write(&make_cdc(1)).unwrap();

        let target = CommitVersion(1);
        assert!(original.read(target).unwrap().is_some());
        assert!(alias.read(target).unwrap().is_some());
    }

    /// Ten threads each write a distinct version through their own handle.
    #[test]
    fn test_concurrent_access() {
        let storage = MemoryCdcStorage::new();

        let workers: Vec<_> = (0..10)
            .map(|version| {
                let handle = storage.clone();
                thread::spawn(move || handle.write(&make_cdc(version)).unwrap())
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(storage.len(), 10);
    }

    /// Excluded bounds leave out the boundary version on either side.
    #[test]
    fn test_range_exclusive_bounds() {
        let storage = MemoryCdcStorage::new();
        (1..=5).for_each(|v| storage.write(&make_cdc(v)).unwrap());

        // Start excluded, end included: 2 is skipped, 4 is kept.
        let lower = Bound::Excluded(CommitVersion(2));
        let upper = Bound::Included(CommitVersion(4));
        let batch = storage.read_range(lower, upper, 100).unwrap();
        assert_eq!(versions_of(&batch), vec![CommitVersion(3), CommitVersion(4)]);

        // Start included, end excluded: 2 is kept, 4 is skipped.
        let lower = Bound::Included(CommitVersion(2));
        let upper = Bound::Excluded(CommitVersion(4));
        let batch = storage.read_range(lower, upper, 100).unwrap();
        assert_eq!(versions_of(&batch), vec![CommitVersion(2), CommitVersion(3)]);
    }

    /// Writing a second entry with the same version replaces the first.
    #[test]
    fn test_overwrite_entry() {
        let storage = MemoryCdcStorage::new();
        let version = CommitVersion(1);

        let first = Cdc::new(
            version,
            DateTime::from_nanos(100),
            Vec::new(),
            vec![SystemChange::Insert {
                key: EncodedKey::new(vec![1]),
                post: EncodedRow(CowVec::new(vec![])),
            }],
        );

        let replacement_changes = vec![
            SystemChange::Insert {
                key: EncodedKey::new(vec![2]),
                post: EncodedRow(CowVec::new(vec![])),
            },
            SystemChange::Insert {
                key: EncodedKey::new(vec![3]),
                post: EncodedRow(CowVec::new(vec![])),
            },
        ];
        let replacement = Cdc::new(version, DateTime::from_nanos(200), Vec::new(), replacement_changes);

        storage.write(&first).unwrap();
        assert_eq!(storage.count(version).unwrap(), 1);

        storage.write(&replacement).unwrap();
        assert_eq!(storage.count(version).unwrap(), 2);

        let stored = storage.read(version).unwrap().unwrap();
        assert_eq!(stored.timestamp, DateTime::from_nanos(200));
    }
}