// reifydb_cdc/storage/mod.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
3
4pub mod memory;
5#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
6pub mod sqlite;
7
8use std::{collections::Bound, sync};
9
10use memory::MemoryCdcStorage;
11use reifydb_core::{
12	common::CommitVersion,
13	encoded::key::EncodedKey,
14	interface::cdc::{Cdc, CdcBatch},
15};
16#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
17use reifydb_sqlite::SqliteConfig;
18use reifydb_type::value::datetime::DateTime;
19
20use crate::error::CdcError;
21
/// Result type for CDC storage operations.
///
/// Alias for `Result<T, CdcError>`; every method of the `CdcStorage` trait in this
/// module returns it.
pub type CdcStorageResult<T> = Result<T, CdcError>;
24
/// Outcome of inspecting a batch in which no entry reached the TTL cutoff.
///
/// Produced by `next_start_after_batch` and consumed by the scan loop in the
/// default `CdcStorage::find_ttl_cutoff` implementation.
enum ScanContinuation {
	/// The scan is finished; the payload is the version handed back to the caller.
	Done(CommitVersion),
	/// Another batch should be fetched; the payload is the lower bound to pass to
	/// the next `read_range` call.
	Continue(Bound<CommitVersion>),
}
29
30/// Walk a non-empty batch looking for the first entry with `timestamp >= cutoff`.
31/// Returns `Some(version)` on hit; `None` if every entry is still older than
32/// the cutoff (the caller should fetch the next batch or terminate).
33#[inline]
34fn scan_batch_for_cutoff(items: &[Cdc], cutoff: DateTime) -> Option<CommitVersion> {
35	for cdc in items {
36		if cdc.timestamp >= cutoff {
37			return Some(cdc.version);
38		}
39	}
40	None
41}
42
43/// Decide what to do after a batch with no cutoff hit:
44/// - `Done(max + 1)` if the batch was the last one (no `has_more`).
45/// - `Continue(Excluded(last_version))` to fetch the next batch.
46#[inline]
47fn next_start_after_batch(batch: &CdcBatch, max: CommitVersion) -> ScanContinuation {
48	if !batch.has_more {
49		return ScanContinuation::Done(CommitVersion(max.0.saturating_add(1)));
50	}
51	let last = batch.items.last().unwrap().version;
52	ScanContinuation::Continue(Bound::Excluded(last))
53}
54
55/// Normalize a half-open range request from the trait API into an inclusive
56/// `[lo, hi]` pair. Returns `None` if the range is empty (lo > hi after the
57/// Excluded/Unbounded substitutions are applied).
58#[inline]
59pub(crate) fn normalize_range_inclusive(
60	start: Bound<CommitVersion>,
61	end: Bound<CommitVersion>,
62) -> Option<(CommitVersion, CommitVersion)> {
63	let lo_inc = match start {
64		Bound::Included(v) => v,
65		Bound::Excluded(v) => CommitVersion(v.0.saturating_add(1)),
66		Bound::Unbounded => CommitVersion(0),
67	};
68	let hi_inc = match end {
69		Bound::Included(v) => v,
70		Bound::Excluded(v) => CommitVersion(v.0.saturating_sub(1)),
71		Bound::Unbounded => CommitVersion(u64::MAX),
72	};
73	if lo_inc > hi_inc {
74		None
75	} else {
76		Some((lo_inc, hi_inc))
77	}
78}
79
/// Information about a dropped CDC entry for stats tracking.
///
/// Instances are collected in `DropBeforeResult::entries`.
#[derive(Debug, Clone)]
pub struct DroppedCdcEntry {
	/// Encoded key associated with the dropped data.
	/// NOTE(review): presumably the key of a change carried by the dropped CDC entry —
	/// confirm against the backend implementations.
	pub key: EncodedKey,
	/// Byte count recorded for the dropped data.
	/// NOTE(review): presumably the size of the stored value for `key` — confirm how
	/// backends compute it.
	pub value_bytes: u64,
}
86
/// Result of a drop_before operation.
///
/// The derived `Default` is a zero `count` with no `entries`.
#[derive(Debug, Clone, Default)]
pub struct DropBeforeResult {
	/// Count reported by the backend for the operation.
	/// NOTE(review): presumably the number of CDC versions removed, which need not equal
	/// `entries.len()` — confirm against the backend implementations.
	pub count: usize,
	/// Per-entry details of what was dropped, for stats tracking.
	pub entries: Vec<DroppedCdcEntry>,
}
93
94/// Trait for CDC storage backends.
95///
96/// CDC storage stores fully resolved change data capture entries keyed by CommitVersion.
97/// Unlike MVCC storage, CDC entries are immutable and use simple version keys.
98///
99/// Implementations must be thread-safe and cloneable to support concurrent access
100/// from multiple consumers and the CDC generation pipeline.
101pub trait CdcStorage: Send + Sync + Clone + 'static {
102	/// Write a CDC entry (fully resolved values).
103	///
104	/// The entry is keyed by its version. If an entry already exists at this version,
105	/// it will be overwritten (this should only happen during recovery/replay).
106	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()>;
107
108	/// Read a CDC entry by version.
109	///
110	/// Returns `None` if no entry exists at the given version.
111	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>>;
112
113	/// Read CDC entries in a version range.
114	///
115	/// Returns entries in ascending version order up to `batch_size` entries.
116	/// The `CdcBatch.has_more` flag indicates if more entries exist beyond the batch.
117	fn read_range(
118		&self,
119		start: Bound<CommitVersion>,
120		end: Bound<CommitVersion>,
121		batch_size: u64,
122	) -> CdcStorageResult<CdcBatch>;
123
124	/// Count CDC changes at a specific version.
125	///
126	/// Returns 0 if no entry exists at the given version.
127	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize>;
128
129	/// Get the minimum (oldest) CDC version in storage.
130	///
131	/// Returns `None` if storage is empty.
132	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
133
134	/// Get the maximum (newest) CDC version in storage.
135	///
136	/// Returns `None` if storage is empty.
137	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
138
139	/// Check if a CDC entry exists at the given version.
140	fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
141		Ok(self.read(version)?.is_some())
142	}
143
144	/// Delete all CDC entries with version strictly less than the given version.
145	/// Returns the count and entry information for stats tracking.
146	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult>;
147
148	/// Find the smallest CDC version V such that `cdc[V].timestamp >= cutoff`.
149	///
150	/// Returns `Some(V)` if such an entry exists. If every stored entry has
151	/// `timestamp < cutoff`, returns `Some(max_version + 1)` so callers can pass it
152	/// straight to `drop_before` to evict everything older than the cutoff.
153	/// Returns `None` if storage is empty.
154	///
155	/// Default impl scans `read_range` from the smallest version upward in batches of
156	/// 256, stopping at the first entry whose timestamp is `>= cutoff`. Backends with
157	/// an indexed timestamp column should override.
158	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
159		let Some(min) = self.min_version()? else {
160			return Ok(None);
161		};
162		let Some(max) = self.max_version()? else {
163			return Ok(None);
164		};
165
166		let mut next_start = Bound::Included(min);
167		loop {
168			let batch = self.read_range(next_start, Bound::Unbounded, 256)?;
169			if batch.items.is_empty() {
170				return Ok(Some(CommitVersion(max.0.saturating_add(1))));
171			}
172			if let Some(version) = scan_batch_for_cutoff(&batch.items, cutoff) {
173				return Ok(Some(version));
174			}
175			match next_start_after_batch(&batch, max) {
176				ScanContinuation::Done(v) => return Ok(Some(v)),
177				ScanContinuation::Continue(start) => next_start = start,
178			}
179		}
180	}
181
182	/// Convenience method with default batch size.
183	fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
184		self.read_range(start, end, 1024)
185	}
186
187	/// Scan all CDC entries with the given batch size.
188	fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
189		self.read_range(Bound::Unbounded, Bound::Unbounded, batch_size)
190	}
191}
192
193/// Blanket implementation for CdcStore compatibility with existing traits.
194impl<T: CdcStorage> CdcStorage for sync::Arc<T> {
195	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
196		(**self).write(cdc)
197	}
198
199	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
200		(**self).read(version)
201	}
202
203	fn read_range(
204		&self,
205		start: Bound<CommitVersion>,
206		end: Bound<CommitVersion>,
207		batch_size: u64,
208	) -> CdcStorageResult<CdcBatch> {
209		(**self).read_range(start, end, batch_size)
210	}
211
212	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
213		(**self).count(version)
214	}
215
216	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
217		(**self).min_version()
218	}
219
220	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
221		(**self).max_version()
222	}
223
224	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
225		(**self).drop_before(version)
226	}
227
228	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
229		(**self).find_ttl_cutoff(cutoff)
230	}
231}
232
/// CDC storage abstraction enum.
///
/// Provides a unified interface over different CDC storage backends. Each variant wraps
/// one concrete backend value; the derived `Clone` clones that wrapped value.
#[derive(Clone)]
pub enum CdcStore {
	/// In-memory CDC storage backed by a BTreeMap.
	Memory(MemoryCdcStorage),
	/// SQLite-backed CDC storage for persistent durability.
	///
	/// Only compiled when the `sqlite` feature is enabled and the target is not `wasm32`.
	#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
	Sqlite(sqlite::storage::SqliteCdcStorage),
}
244
245impl CdcStore {
246	/// Create an in-memory CDC store.
247	pub fn memory() -> Self {
248		Self::Memory(MemoryCdcStorage::new())
249	}
250
251	/// Create a SQLite-backed CDC store with the given configuration.
252	#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
253	pub fn sqlite(config: SqliteConfig) -> Self {
254		Self::Sqlite(sqlite::storage::SqliteCdcStorage::new(config))
255	}
256
257	/// Write a CDC entry.
258	pub fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
259		match self {
260			Self::Memory(s) => s.write(cdc),
261			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
262			Self::Sqlite(s) => s.write(cdc),
263		}
264	}
265
266	/// Read a CDC entry by version.
267	pub fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
268		match self {
269			Self::Memory(s) => s.read(version),
270			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
271			Self::Sqlite(s) => s.read(version),
272		}
273	}
274
275	/// Read CDC entries in a version range.
276	pub fn read_range(
277		&self,
278		start: Bound<CommitVersion>,
279		end: Bound<CommitVersion>,
280		batch_size: u64,
281	) -> CdcStorageResult<CdcBatch> {
282		match self {
283			Self::Memory(s) => s.read_range(start, end, batch_size),
284			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
285			Self::Sqlite(s) => s.read_range(start, end, batch_size),
286		}
287	}
288
289	/// Count CDC changes at a specific version.
290	pub fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
291		match self {
292			Self::Memory(s) => s.count(version),
293			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
294			Self::Sqlite(s) => s.count(version),
295		}
296	}
297
298	/// Get the minimum (oldest) CDC version in storage.
299	pub fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
300		match self {
301			Self::Memory(s) => s.min_version(),
302			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
303			Self::Sqlite(s) => s.min_version(),
304		}
305	}
306
307	/// Get the maximum (newest) CDC version in storage.
308	pub fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
309		match self {
310			Self::Memory(s) => s.max_version(),
311			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
312			Self::Sqlite(s) => s.max_version(),
313		}
314	}
315
316	/// Delete all CDC entries with version strictly less than the given version.
317	pub fn delete_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
318		match self {
319			Self::Memory(s) => s.drop_before(version),
320			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
321			Self::Sqlite(s) => s.drop_before(version),
322		}
323	}
324
325	/// Find the smallest CDC version V such that `cdc[V].timestamp >= cutoff`.
326	pub fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
327		match self {
328			Self::Memory(s) => s.find_ttl_cutoff(cutoff),
329			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
330			Self::Sqlite(s) => s.find_ttl_cutoff(cutoff),
331		}
332	}
333}
334
335impl CdcStorage for CdcStore {
336	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
337		CdcStore::write(self, cdc)
338	}
339
340	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
341		CdcStore::read(self, version)
342	}
343
344	fn read_range(
345		&self,
346		start: Bound<CommitVersion>,
347		end: Bound<CommitVersion>,
348		batch_size: u64,
349	) -> CdcStorageResult<CdcBatch> {
350		CdcStore::read_range(self, start, end, batch_size)
351	}
352
353	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
354		CdcStore::count(self, version)
355	}
356
357	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
358		CdcStore::min_version(self)
359	}
360
361	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
362		CdcStore::max_version(self)
363	}
364
365	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
366		CdcStore::delete_before(self, version)
367	}
368
369	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
370		CdcStore::find_ttl_cutoff(self, cutoff)
371	}
372}