1pub mod memory;
5#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
6pub mod sqlite;
7
8use std::{collections::Bound, sync};
9
10use memory::MemoryCdcStorage;
11use reifydb_core::{
12 common::CommitVersion,
13 encoded::key::EncodedKey,
14 interface::cdc::{Cdc, CdcBatch},
15};
16#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
17use reifydb_sqlite::SqliteConfig;
18use reifydb_type::value::datetime::DateTime;
19
20use crate::error::CdcError;
21
22pub type CdcStorageResult<T> = Result<T, CdcError>;
24
25enum ScanContinuation {
26 Done(CommitVersion),
27 Continue(Bound<CommitVersion>),
28}
29
30#[inline]
34fn scan_batch_for_cutoff(items: &[Cdc], cutoff: DateTime) -> Option<CommitVersion> {
35 for cdc in items {
36 if cdc.timestamp >= cutoff {
37 return Some(cdc.version);
38 }
39 }
40 None
41}
42
43#[inline]
47fn next_start_after_batch(batch: &CdcBatch, max: CommitVersion) -> ScanContinuation {
48 if !batch.has_more {
49 return ScanContinuation::Done(CommitVersion(max.0.saturating_add(1)));
50 }
51 let last = batch.items.last().unwrap().version;
52 ScanContinuation::Continue(Bound::Excluded(last))
53}
54
55#[inline]
59pub(crate) fn normalize_range_inclusive(
60 start: Bound<CommitVersion>,
61 end: Bound<CommitVersion>,
62) -> Option<(CommitVersion, CommitVersion)> {
63 let lo_inc = match start {
64 Bound::Included(v) => v,
65 Bound::Excluded(v) => CommitVersion(v.0.saturating_add(1)),
66 Bound::Unbounded => CommitVersion(0),
67 };
68 let hi_inc = match end {
69 Bound::Included(v) => v,
70 Bound::Excluded(v) => CommitVersion(v.0.saturating_sub(1)),
71 Bound::Unbounded => CommitVersion(u64::MAX),
72 };
73 if lo_inc > hi_inc {
74 None
75 } else {
76 Some((lo_inc, hi_inc))
77 }
78}
79
80#[derive(Debug, Clone)]
82pub struct DroppedCdcEntry {
83 pub key: EncodedKey,
84 pub value_bytes: u64,
85}
86
87#[derive(Debug, Clone, Default)]
89pub struct DropBeforeResult {
90 pub count: usize,
91 pub entries: Vec<DroppedCdcEntry>,
92}
93
94pub trait CdcStorage: Send + Sync + Clone + 'static {
102 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()>;
107
108 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>>;
112
113 fn read_range(
118 &self,
119 start: Bound<CommitVersion>,
120 end: Bound<CommitVersion>,
121 batch_size: u64,
122 ) -> CdcStorageResult<CdcBatch>;
123
124 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize>;
128
129 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
133
134 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
138
139 fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
141 Ok(self.read(version)?.is_some())
142 }
143
144 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult>;
147
148 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
159 let Some(min) = self.min_version()? else {
160 return Ok(None);
161 };
162 let Some(max) = self.max_version()? else {
163 return Ok(None);
164 };
165
166 let mut next_start = Bound::Included(min);
167 loop {
168 let batch = self.read_range(next_start, Bound::Unbounded, 256)?;
169 if batch.items.is_empty() {
170 return Ok(Some(CommitVersion(max.0.saturating_add(1))));
171 }
172 if let Some(version) = scan_batch_for_cutoff(&batch.items, cutoff) {
173 return Ok(Some(version));
174 }
175 match next_start_after_batch(&batch, max) {
176 ScanContinuation::Done(v) => return Ok(Some(v)),
177 ScanContinuation::Continue(start) => next_start = start,
178 }
179 }
180 }
181
182 fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
184 self.read_range(start, end, 1024)
185 }
186
187 fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
189 self.read_range(Bound::Unbounded, Bound::Unbounded, batch_size)
190 }
191}
192
193impl<T: CdcStorage> CdcStorage for sync::Arc<T> {
195 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
196 (**self).write(cdc)
197 }
198
199 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
200 (**self).read(version)
201 }
202
203 fn read_range(
204 &self,
205 start: Bound<CommitVersion>,
206 end: Bound<CommitVersion>,
207 batch_size: u64,
208 ) -> CdcStorageResult<CdcBatch> {
209 (**self).read_range(start, end, batch_size)
210 }
211
212 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
213 (**self).count(version)
214 }
215
216 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
217 (**self).min_version()
218 }
219
220 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
221 (**self).max_version()
222 }
223
224 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
225 (**self).drop_before(version)
226 }
227
228 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
229 (**self).find_ttl_cutoff(cutoff)
230 }
231}
232
233#[derive(Clone)]
237pub enum CdcStore {
238 Memory(MemoryCdcStorage),
240 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
242 Sqlite(sqlite::storage::SqliteCdcStorage),
243}
244
245impl CdcStore {
246 pub fn memory() -> Self {
248 Self::Memory(MemoryCdcStorage::new())
249 }
250
251 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
253 pub fn sqlite(config: SqliteConfig) -> Self {
254 Self::Sqlite(sqlite::storage::SqliteCdcStorage::new(config))
255 }
256
257 pub fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
259 match self {
260 Self::Memory(s) => s.write(cdc),
261 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
262 Self::Sqlite(s) => s.write(cdc),
263 }
264 }
265
266 pub fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
268 match self {
269 Self::Memory(s) => s.read(version),
270 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
271 Self::Sqlite(s) => s.read(version),
272 }
273 }
274
275 pub fn read_range(
277 &self,
278 start: Bound<CommitVersion>,
279 end: Bound<CommitVersion>,
280 batch_size: u64,
281 ) -> CdcStorageResult<CdcBatch> {
282 match self {
283 Self::Memory(s) => s.read_range(start, end, batch_size),
284 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
285 Self::Sqlite(s) => s.read_range(start, end, batch_size),
286 }
287 }
288
289 pub fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
291 match self {
292 Self::Memory(s) => s.count(version),
293 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
294 Self::Sqlite(s) => s.count(version),
295 }
296 }
297
298 pub fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
300 match self {
301 Self::Memory(s) => s.min_version(),
302 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
303 Self::Sqlite(s) => s.min_version(),
304 }
305 }
306
307 pub fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
309 match self {
310 Self::Memory(s) => s.max_version(),
311 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
312 Self::Sqlite(s) => s.max_version(),
313 }
314 }
315
316 pub fn delete_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
318 match self {
319 Self::Memory(s) => s.drop_before(version),
320 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
321 Self::Sqlite(s) => s.drop_before(version),
322 }
323 }
324
325 pub fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
327 match self {
328 Self::Memory(s) => s.find_ttl_cutoff(cutoff),
329 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
330 Self::Sqlite(s) => s.find_ttl_cutoff(cutoff),
331 }
332 }
333}
334
335impl CdcStorage for CdcStore {
336 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
337 CdcStore::write(self, cdc)
338 }
339
340 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
341 CdcStore::read(self, version)
342 }
343
344 fn read_range(
345 &self,
346 start: Bound<CommitVersion>,
347 end: Bound<CommitVersion>,
348 batch_size: u64,
349 ) -> CdcStorageResult<CdcBatch> {
350 CdcStore::read_range(self, start, end, batch_size)
351 }
352
353 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
354 CdcStore::count(self, version)
355 }
356
357 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
358 CdcStore::min_version(self)
359 }
360
361 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
362 CdcStore::max_version(self)
363 }
364
365 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
366 CdcStore::delete_before(self, version)
367 }
368
369 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
370 CdcStore::find_ttl_cutoff(self, cutoff)
371 }
372}