Skip to main content

reifydb_cdc/storage/
memory.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, Bound},
6	sync::Arc,
7};
8
9use reifydb_core::{
10	common::CommitVersion,
11	interface::cdc::{Cdc, CdcBatch},
12};
13use reifydb_runtime::sync::rwlock::RwLock;
14
15use super::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry};
16
17#[derive(Clone)]
18pub struct MemoryCdcStorage {
19	inner: Arc<RwLock<BTreeMap<CommitVersion, Cdc>>>,
20}
21
22impl MemoryCdcStorage {
23	pub fn new() -> Self {
24		Self {
25			inner: Arc::new(RwLock::new(BTreeMap::new())),
26		}
27	}
28
29	pub fn with_entries(entries: impl IntoIterator<Item = Cdc>) -> Self {
30		let map: BTreeMap<CommitVersion, Cdc> = entries.into_iter().map(|cdc| (cdc.version, cdc)).collect();
31		Self {
32			inner: Arc::new(RwLock::new(map)),
33		}
34	}
35
36	pub fn len(&self) -> usize {
37		self.inner.read().len()
38	}
39
40	pub fn is_empty(&self) -> bool {
41		self.inner.read().is_empty()
42	}
43
44	pub fn clear(&self) {
45		self.inner.write().clear();
46	}
47}
48
49impl Default for MemoryCdcStorage {
50	fn default() -> Self {
51		Self::new()
52	}
53}
54
55impl CdcStorage for MemoryCdcStorage {
56	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
57		self.inner.write().insert(cdc.version, cdc.clone());
58		Ok(())
59	}
60
61	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
62		Ok(self.inner.read().get(&version).cloned())
63	}
64
65	fn read_range(
66		&self,
67		start: Bound<CommitVersion>,
68		end: Bound<CommitVersion>,
69		batch_size: u64,
70	) -> CdcStorageResult<CdcBatch> {
71		let guard = self.inner.read();
72		let batch_size = batch_size as usize;
73
74		let range_iter = guard.range((start, end));
75		let mut items: Vec<Cdc> = Vec::with_capacity(batch_size.min(64));
76
77		for (count, (_, cdc)) in range_iter.enumerate() {
78			if count >= batch_size {
79				// We've hit the batch limit, there are more items
80				return Ok(CdcBatch {
81					items,
82					has_more: true,
83				});
84			}
85			items.push(cdc.clone());
86		}
87
88		Ok(CdcBatch {
89			items,
90			has_more: false,
91		})
92	}
93
94	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
95		Ok(self.inner.read().get(&version).map(|cdc| cdc.system_changes.len()).unwrap_or(0))
96	}
97
98	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
99		Ok(self.inner.read().keys().next().copied())
100	}
101
102	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
103		Ok(self.inner.read().keys().next_back().copied())
104	}
105
106	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
107		let mut guard = self.inner.write();
108		let keys_to_remove: Vec<_> = guard.range(..version).map(|(k, _)| *k).collect();
109		let count = keys_to_remove.len();
110
111		let mut entries = Vec::new();
112		for key in &keys_to_remove {
113			if let Some(cdc) = guard.get(key) {
114				for sys_change in &cdc.system_changes {
115					entries.push(DroppedCdcEntry {
116						key: sys_change.key().clone(),
117						value_bytes: sys_change.value_bytes() as u64,
118					});
119				}
120			}
121		}
122
123		for key in keys_to_remove {
124			guard.remove(&key);
125		}
126
127		Ok(DropBeforeResult {
128			count,
129			entries,
130		})
131	}
132}
133
/// Unit tests for [`MemoryCdcStorage`].
#[cfg(test)]
pub mod tests {
	use std::thread;

	use reifydb_core::{
		encoded::{key::EncodedKey, row::EncodedRow},
		interface::cdc::SystemChange,
	};
	use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};

	use super::*;

	/// Builds a CDC entry at `version` carrying exactly one system change.
	fn make_cdc(version: u64) -> Cdc {
		Cdc::new(
			CommitVersion(version),
			DateTime::from_nanos(12345),
			Vec::new(),
			vec![SystemChange::Insert {
				key: EncodedKey::new(vec![1, 2, 3]),
				post: EncodedRow(CowVec::new(vec![])),
			}],
		)
	}

	#[test]
	fn test_clone_shares_storage() {
		let storage1 = MemoryCdcStorage::new();
		let storage2 = storage1.clone();

		storage1.write(&make_cdc(1)).unwrap();

		// Both should see the same data
		assert!(storage1.read(CommitVersion(1)).unwrap().is_some());
		assert!(storage2.read(CommitVersion(1)).unwrap().is_some());
	}

	#[test]
	fn test_concurrent_access() {
		let storage = MemoryCdcStorage::new();
		let mut handles = vec![];

		// Spawn multiple writers
		for i in 0..10 {
			let s = storage.clone();
			handles.push(thread::spawn(move || {
				s.write(&make_cdc(i)).unwrap();
			}));
		}

		for h in handles {
			h.join().unwrap();
		}

		// All entries should be present
		assert_eq!(storage.len(), 10);
	}

	#[test]
	fn test_range_exclusive_bounds() {
		let storage = MemoryCdcStorage::new();

		for v in 1..=5 {
			storage.write(&make_cdc(v)).unwrap();
		}

		// Exclusive start
		let batch = storage
			.read_range(Bound::Excluded(CommitVersion(2)), Bound::Included(CommitVersion(4)), 100)
			.unwrap();
		assert_eq!(batch.items.len(), 2); // 3, 4
		assert_eq!(batch.items[0].version, CommitVersion(3));
		assert_eq!(batch.items[1].version, CommitVersion(4));

		// Exclusive end
		let batch = storage
			.read_range(Bound::Included(CommitVersion(2)), Bound::Excluded(CommitVersion(4)), 100)
			.unwrap();
		assert_eq!(batch.items.len(), 2); // 2, 3
		assert_eq!(batch.items[0].version, CommitVersion(2));
		assert_eq!(batch.items[1].version, CommitVersion(3));
	}

	#[test]
	fn test_read_range_respects_batch_size() {
		let storage = MemoryCdcStorage::with_entries((1..=5).map(make_cdc));

		// Limit smaller than the range: first entries returned, more remain.
		let batch = storage.read_range(Bound::Unbounded, Bound::Unbounded, 2).unwrap();
		assert_eq!(batch.items.len(), 2);
		assert_eq!(batch.items[0].version, CommitVersion(1));
		assert_eq!(batch.items[1].version, CommitVersion(2));
		assert!(batch.has_more);

		// Limit exactly matching the remaining entries: nothing more to read.
		let batch = storage.read_range(Bound::Included(CommitVersion(4)), Bound::Unbounded, 2).unwrap();
		assert_eq!(batch.items.len(), 2);
		assert_eq!(batch.items[0].version, CommitVersion(4));
		assert_eq!(batch.items[1].version, CommitVersion(5));
		assert!(!batch.has_more);

		// A zero limit returns no items but still reports pending data.
		let batch = storage.read_range(Bound::Unbounded, Bound::Unbounded, 0).unwrap();
		assert!(batch.items.is_empty());
		assert!(batch.has_more);
	}

	#[test]
	fn test_drop_before() {
		let storage = MemoryCdcStorage::with_entries((1..=5).map(make_cdc));

		let result = storage.drop_before(CommitVersion(3)).unwrap();
		assert_eq!(result.count, 2); // versions 1 and 2
		assert_eq!(result.entries.len(), 2); // one system change per dropped entry

		assert_eq!(storage.len(), 3);
		assert!(storage.read(CommitVersion(2)).unwrap().is_none());
		assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(3)));
		assert_eq!(storage.max_version().unwrap(), Some(CommitVersion(5)));

		// Dropping below the minimum removes nothing.
		let result = storage.drop_before(CommitVersion(1)).unwrap();
		assert_eq!(result.count, 0);
		assert!(result.entries.is_empty());
		assert_eq!(storage.len(), 3);
	}

	#[test]
	fn test_empty_storage_and_clear() {
		let storage = MemoryCdcStorage::default();
		assert!(storage.is_empty());
		assert_eq!(storage.min_version().unwrap(), None);
		assert_eq!(storage.max_version().unwrap(), None);
		assert_eq!(storage.count(CommitVersion(1)).unwrap(), 0);
		assert!(storage.read(CommitVersion(1)).unwrap().is_none());

		storage.write(&make_cdc(1)).unwrap();
		assert!(!storage.is_empty());
		storage.clear();
		assert!(storage.is_empty());
	}

	#[test]
	fn test_overwrite_entry() {
		let storage = MemoryCdcStorage::new();

		let cdc1 = Cdc::new(
			CommitVersion(1),
			DateTime::from_nanos(100),
			Vec::new(),
			vec![SystemChange::Insert {
				key: EncodedKey::new(vec![1]),
				post: EncodedRow(CowVec::new(vec![])),
			}],
		);

		let cdc2 = Cdc::new(
			CommitVersion(1),
			DateTime::from_nanos(200), // Different timestamp
			Vec::new(),
			vec![
				SystemChange::Insert {
					key: EncodedKey::new(vec![2]),
					post: EncodedRow(CowVec::new(vec![])),
				},
				SystemChange::Insert {
					key: EncodedKey::new(vec![3]),
					post: EncodedRow(CowVec::new(vec![])),
				},
			],
		);

		storage.write(&cdc1).unwrap();
		assert_eq!(storage.count(CommitVersion(1)).unwrap(), 1);

		storage.write(&cdc2).unwrap();
		assert_eq!(storage.count(CommitVersion(1)).unwrap(), 2);

		let read = storage.read(CommitVersion(1)).unwrap().unwrap();
		assert_eq!(read.timestamp, DateTime::from_nanos(200));
	}
}