Skip to main content

reifydb_cdc/storage/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Pluggable backing store for the CDC log. The in-memory implementation is the testing default; SQLite is the
5//! durable default for production deployments. Both implement the same trait surface so the producer and consumer
6//! sides are agnostic to which is configured.
7
8pub mod memory;
9#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
10pub mod sqlite;
11
12use std::{collections::Bound, sync};
13
14use memory::MemoryCdcStorage;
15use reifydb_core::{
16	common::CommitVersion,
17	encoded::key::EncodedKey,
18	interface::cdc::{Cdc, CdcBatch},
19};
20#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
21use reifydb_sqlite::SqliteConfig;
22use reifydb_type::value::datetime::DateTime;
23
24use crate::error::CdcError;
25
26pub type CdcStorageResult<T> = Result<T, CdcError>;
27
28enum ScanContinuation {
29	Done(CommitVersion),
30	Continue(Bound<CommitVersion>),
31}
32
33#[inline]
34fn scan_batch_for_cutoff(items: &[Cdc], cutoff: DateTime) -> Option<CommitVersion> {
35	for cdc in items {
36		if cdc.timestamp >= cutoff {
37			return Some(cdc.version);
38		}
39	}
40	None
41}
42
43#[inline]
44fn next_start_after_batch(batch: &CdcBatch, max: CommitVersion) -> ScanContinuation {
45	if !batch.has_more {
46		return ScanContinuation::Done(CommitVersion(max.0.saturating_add(1)));
47	}
48	let last = batch.items.last().unwrap().version;
49	ScanContinuation::Continue(Bound::Excluded(last))
50}
51
52#[inline]
53pub(crate) fn normalize_range_inclusive(
54	start: Bound<CommitVersion>,
55	end: Bound<CommitVersion>,
56) -> Option<(CommitVersion, CommitVersion)> {
57	let lo_inc = match start {
58		Bound::Included(v) => v,
59		Bound::Excluded(v) => CommitVersion(v.0.saturating_add(1)),
60		Bound::Unbounded => CommitVersion(0),
61	};
62	let hi_inc = match end {
63		Bound::Included(v) => v,
64		Bound::Excluded(v) => CommitVersion(v.0.saturating_sub(1)),
65		Bound::Unbounded => CommitVersion(u64::MAX),
66	};
67	if lo_inc > hi_inc {
68		None
69	} else {
70		Some((lo_inc, hi_inc))
71	}
72}
73
74#[derive(Debug, Clone)]
75pub struct DroppedCdcEntry {
76	pub key: EncodedKey,
77	pub value_bytes: u64,
78}
79
80#[derive(Debug, Clone, Default)]
81pub struct DropBeforeResult {
82	pub count: usize,
83	pub entries: Vec<DroppedCdcEntry>,
84}
85
86pub trait CdcStorage: Send + Sync + Clone + 'static {
87	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()>;
88
89	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>>;
90
91	fn read_range(
92		&self,
93		start: Bound<CommitVersion>,
94		end: Bound<CommitVersion>,
95		batch_size: u64,
96	) -> CdcStorageResult<CdcBatch>;
97
98	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize>;
99
100	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
101
102	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
103
104	fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
105		Ok(self.read(version)?.is_some())
106	}
107
108	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult>;
109
110	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
111		let Some(min) = self.min_version()? else {
112			return Ok(None);
113		};
114		let Some(max) = self.max_version()? else {
115			return Ok(None);
116		};
117
118		let mut next_start = Bound::Included(min);
119		loop {
120			let batch = self.read_range(next_start, Bound::Unbounded, 256)?;
121			if batch.items.is_empty() {
122				return Ok(Some(CommitVersion(max.0.saturating_add(1))));
123			}
124			if let Some(version) = scan_batch_for_cutoff(&batch.items, cutoff) {
125				return Ok(Some(version));
126			}
127			match next_start_after_batch(&batch, max) {
128				ScanContinuation::Done(v) => return Ok(Some(v)),
129				ScanContinuation::Continue(start) => next_start = start,
130			}
131		}
132	}
133
134	fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
135		self.read_range(start, end, 1024)
136	}
137
138	fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
139		self.read_range(Bound::Unbounded, Bound::Unbounded, batch_size)
140	}
141}
142
143impl<T: CdcStorage> CdcStorage for sync::Arc<T> {
144	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
145		(**self).write(cdc)
146	}
147
148	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
149		(**self).read(version)
150	}
151
152	fn read_range(
153		&self,
154		start: Bound<CommitVersion>,
155		end: Bound<CommitVersion>,
156		batch_size: u64,
157	) -> CdcStorageResult<CdcBatch> {
158		(**self).read_range(start, end, batch_size)
159	}
160
161	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
162		(**self).count(version)
163	}
164
165	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
166		(**self).min_version()
167	}
168
169	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
170		(**self).max_version()
171	}
172
173	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
174		(**self).drop_before(version)
175	}
176
177	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
178		(**self).find_ttl_cutoff(cutoff)
179	}
180}
181
182#[derive(Clone)]
183pub enum CdcStore {
184	Memory(MemoryCdcStorage),
185
186	#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
187	Sqlite(sqlite::storage::SqliteCdcStorage),
188}
189
190impl CdcStore {
191	pub fn memory() -> Self {
192		Self::Memory(MemoryCdcStorage::new())
193	}
194
195	#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
196	pub fn sqlite(config: SqliteConfig) -> Self {
197		Self::Sqlite(sqlite::storage::SqliteCdcStorage::new(config))
198	}
199
200	pub fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
201		match self {
202			Self::Memory(s) => s.write(cdc),
203			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
204			Self::Sqlite(s) => s.write(cdc),
205		}
206	}
207
208	pub fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
209		match self {
210			Self::Memory(s) => s.read(version),
211			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
212			Self::Sqlite(s) => s.read(version),
213		}
214	}
215
216	pub fn read_range(
217		&self,
218		start: Bound<CommitVersion>,
219		end: Bound<CommitVersion>,
220		batch_size: u64,
221	) -> CdcStorageResult<CdcBatch> {
222		match self {
223			Self::Memory(s) => s.read_range(start, end, batch_size),
224			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
225			Self::Sqlite(s) => s.read_range(start, end, batch_size),
226		}
227	}
228
229	pub fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
230		match self {
231			Self::Memory(s) => s.count(version),
232			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
233			Self::Sqlite(s) => s.count(version),
234		}
235	}
236
237	pub fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
238		match self {
239			Self::Memory(s) => s.min_version(),
240			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
241			Self::Sqlite(s) => s.min_version(),
242		}
243	}
244
245	pub fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
246		match self {
247			Self::Memory(s) => s.max_version(),
248			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
249			Self::Sqlite(s) => s.max_version(),
250		}
251	}
252
253	pub fn delete_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
254		match self {
255			Self::Memory(s) => s.drop_before(version),
256			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
257			Self::Sqlite(s) => s.drop_before(version),
258		}
259	}
260
261	pub fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
262		match self {
263			Self::Memory(s) => s.find_ttl_cutoff(cutoff),
264			#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
265			Self::Sqlite(s) => s.find_ttl_cutoff(cutoff),
266		}
267	}
268}
269
270impl CdcStorage for CdcStore {
271	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
272		CdcStore::write(self, cdc)
273	}
274
275	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
276		CdcStore::read(self, version)
277	}
278
279	fn read_range(
280		&self,
281		start: Bound<CommitVersion>,
282		end: Bound<CommitVersion>,
283		batch_size: u64,
284	) -> CdcStorageResult<CdcBatch> {
285		CdcStore::read_range(self, start, end, batch_size)
286	}
287
288	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
289		CdcStore::count(self, version)
290	}
291
292	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
293		CdcStore::min_version(self)
294	}
295
296	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
297		CdcStore::max_version(self)
298	}
299
300	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
301		CdcStore::delete_before(self, version)
302	}
303
304	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
305		CdcStore::find_ttl_cutoff(self, cutoff)
306	}
307}