Skip to main content

reifydb_cdc/storage/
memory.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, Bound},
6	sync::Arc,
7};
8
9use reifydb_core::{
10	common::CommitVersion,
11	interface::cdc::{Cdc, CdcBatch},
12};
13use reifydb_runtime::sync::rwlock::RwLock;
14
15use super::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry, normalize_range_inclusive};
16
17#[derive(Clone)]
18pub struct MemoryCdcStorage {
19	inner: Arc<RwLock<BTreeMap<CommitVersion, Cdc>>>,
20}
21
22impl MemoryCdcStorage {
23	pub fn new() -> Self {
24		Self {
25			inner: Arc::new(RwLock::new(BTreeMap::new())),
26		}
27	}
28
29	pub fn with_entries(entries: impl IntoIterator<Item = Cdc>) -> Self {
30		let map: BTreeMap<CommitVersion, Cdc> = entries.into_iter().map(|cdc| (cdc.version, cdc)).collect();
31		Self {
32			inner: Arc::new(RwLock::new(map)),
33		}
34	}
35
36	pub fn len(&self) -> usize {
37		self.inner.read().len()
38	}
39
40	pub fn is_empty(&self) -> bool {
41		self.inner.read().is_empty()
42	}
43
44	pub fn clear(&self) {
45		self.inner.write().clear();
46	}
47}
48
49impl Default for MemoryCdcStorage {
50	fn default() -> Self {
51		Self::new()
52	}
53}
54
55impl CdcStorage for MemoryCdcStorage {
56	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
57		self.inner.write().insert(cdc.version, cdc.clone());
58		Ok(())
59	}
60
61	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
62		Ok(self.inner.read().get(&version).cloned())
63	}
64
65	fn read_range(
66		&self,
67		start: Bound<CommitVersion>,
68		end: Bound<CommitVersion>,
69		batch_size: u64,
70	) -> CdcStorageResult<CdcBatch> {
71		let Some((lo_inc, hi_inc)) = normalize_range_inclusive(start, end) else {
72			return Ok(CdcBatch {
73				items: Vec::new(),
74				has_more: false,
75			});
76		};
77		let guard = self.inner.read();
78		let (items, has_more) = collect_range_into(&guard, lo_inc, hi_inc, batch_size as usize);
79		Ok(CdcBatch {
80			items,
81			has_more,
82		})
83	}
84
85	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
86		Ok(self.inner.read().get(&version).map(|cdc| cdc.system_changes.len()).unwrap_or(0))
87	}
88
89	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
90		Ok(self.inner.read().keys().next().copied())
91	}
92
93	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
94		Ok(self.inner.read().keys().next_back().copied())
95	}
96
97	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
98		let mut guard = self.inner.write();
99		let keys_to_remove: Vec<_> = guard.range(..version).map(|(k, _)| *k).collect();
100		let count = keys_to_remove.len();
101		let entries = collect_dropped_entries(&guard, &keys_to_remove);
102		for key in keys_to_remove {
103			guard.remove(&key);
104		}
105		Ok(DropBeforeResult {
106			count,
107			entries,
108		})
109	}
110}
111
112#[inline]
113fn collect_range_into(
114	guard: &BTreeMap<CommitVersion, Cdc>,
115	lo_inc: CommitVersion,
116	hi_inc: CommitVersion,
117	batch_size: usize,
118) -> (Vec<Cdc>, bool) {
119	let mut items: Vec<Cdc> = Vec::with_capacity(batch_size.min(64));
120	for (count, (_, cdc)) in guard.range(lo_inc..=hi_inc).enumerate() {
121		if count >= batch_size {
122			return (items, true);
123		}
124		items.push(cdc.clone());
125	}
126	(items, false)
127}
128
129#[inline]
130fn collect_dropped_entries(guard: &BTreeMap<CommitVersion, Cdc>, keys: &[CommitVersion]) -> Vec<DroppedCdcEntry> {
131	let mut entries = Vec::new();
132	for key in keys {
133		if let Some(cdc) = guard.get(key) {
134			for sys_change in &cdc.system_changes {
135				entries.push(DroppedCdcEntry {
136					key: sys_change.key().clone(),
137					value_bytes: sys_change.value_bytes() as u64,
138				});
139			}
140		}
141	}
142	entries
143}