1pub mod memory;
9#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
10pub mod sqlite;
11
12use std::{collections::Bound, sync};
13
14use memory::MemoryCdcStorage;
15use reifydb_core::{
16 common::CommitVersion,
17 encoded::key::EncodedKey,
18 interface::cdc::{Cdc, CdcBatch},
19};
20#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
21use reifydb_sqlite::SqliteConfig;
22use reifydb_type::value::datetime::DateTime;
23
24use crate::error::CdcError;
25
/// Result alias shared by the CDC storage operations in this module; the
/// error side is always [`CdcError`].
pub type CdcStorageResult<T> = Result<T, CdcError>;
27
/// Outcome of inspecting one batch while paging through stored CDC entries.
enum ScanContinuation {
    /// Paging is finished; carries the version the scan resolves to.
    Done(CommitVersion),
    /// More batches remain; carries the lower bound to use for the next read.
    Continue(Bound<CommitVersion>),
}
32
33#[inline]
34fn scan_batch_for_cutoff(items: &[Cdc], cutoff: DateTime) -> Option<CommitVersion> {
35 for cdc in items {
36 if cdc.timestamp >= cutoff {
37 return Some(cdc.version);
38 }
39 }
40 None
41}
42
43#[inline]
44fn next_start_after_batch(batch: &CdcBatch, max: CommitVersion) -> ScanContinuation {
45 if !batch.has_more {
46 return ScanContinuation::Done(CommitVersion(max.0.saturating_add(1)));
47 }
48 let last = batch.items.last().unwrap().version;
49 ScanContinuation::Continue(Bound::Excluded(last))
50}
51
52#[inline]
53pub(crate) fn normalize_range_inclusive(
54 start: Bound<CommitVersion>,
55 end: Bound<CommitVersion>,
56) -> Option<(CommitVersion, CommitVersion)> {
57 let lo_inc = match start {
58 Bound::Included(v) => v,
59 Bound::Excluded(v) => CommitVersion(v.0.saturating_add(1)),
60 Bound::Unbounded => CommitVersion(0),
61 };
62 let hi_inc = match end {
63 Bound::Included(v) => v,
64 Bound::Excluded(v) => CommitVersion(v.0.saturating_sub(1)),
65 Bound::Unbounded => CommitVersion(u64::MAX),
66 };
67 if lo_inc > hi_inc {
68 None
69 } else {
70 Some((lo_inc, hi_inc))
71 }
72}
73
/// One entry listed in [`DropBeforeResult::entries`].
#[derive(Debug, Clone)]
pub struct DroppedCdcEntry {
    /// Encoded key associated with the dropped entry.
    pub key: EncodedKey,
    /// Presumably the size in bytes of the value that was dropped — confirm
    /// against the storage implementations.
    pub value_bytes: u64,
}
79
/// Value returned by [`CdcStorage::drop_before`]. The `Default` value has a
/// zero `count` and no `entries`.
#[derive(Debug, Clone, Default)]
pub struct DropBeforeResult {
    /// Presumably the number of CDC records removed — confirm against the
    /// storage implementations.
    pub count: usize,
    /// Per-entry details reported by the backend for the drop operation.
    pub entries: Vec<DroppedCdcEntry>,
}
85
86pub trait CdcStorage: Send + Sync + Clone + 'static {
87 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()>;
88
89 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>>;
90
91 fn read_range(
92 &self,
93 start: Bound<CommitVersion>,
94 end: Bound<CommitVersion>,
95 batch_size: u64,
96 ) -> CdcStorageResult<CdcBatch>;
97
98 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize>;
99
100 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
101
102 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
103
104 fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
105 Ok(self.read(version)?.is_some())
106 }
107
108 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult>;
109
110 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
111 let Some(min) = self.min_version()? else {
112 return Ok(None);
113 };
114 let Some(max) = self.max_version()? else {
115 return Ok(None);
116 };
117
118 let mut next_start = Bound::Included(min);
119 loop {
120 let batch = self.read_range(next_start, Bound::Unbounded, 256)?;
121 if batch.items.is_empty() {
122 return Ok(Some(CommitVersion(max.0.saturating_add(1))));
123 }
124 if let Some(version) = scan_batch_for_cutoff(&batch.items, cutoff) {
125 return Ok(Some(version));
126 }
127 match next_start_after_batch(&batch, max) {
128 ScanContinuation::Done(v) => return Ok(Some(v)),
129 ScanContinuation::Continue(start) => next_start = start,
130 }
131 }
132 }
133
134 fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
135 self.read_range(start, end, 1024)
136 }
137
138 fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
139 self.read_range(Bound::Unbounded, Bound::Unbounded, batch_size)
140 }
141}
142
143impl<T: CdcStorage> CdcStorage for sync::Arc<T> {
144 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
145 (**self).write(cdc)
146 }
147
148 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
149 (**self).read(version)
150 }
151
152 fn read_range(
153 &self,
154 start: Bound<CommitVersion>,
155 end: Bound<CommitVersion>,
156 batch_size: u64,
157 ) -> CdcStorageResult<CdcBatch> {
158 (**self).read_range(start, end, batch_size)
159 }
160
161 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
162 (**self).count(version)
163 }
164
165 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
166 (**self).min_version()
167 }
168
169 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
170 (**self).max_version()
171 }
172
173 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
174 (**self).drop_before(version)
175 }
176
177 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
178 (**self).find_ttl_cutoff(cutoff)
179 }
180}
181
/// CDC store that wraps one of the available storage backends and dispatches
/// each operation to it.
#[derive(Clone)]
pub enum CdcStore {
    /// Store backed by [`MemoryCdcStorage`].
    Memory(MemoryCdcStorage),

    /// Store backed by the SQLite storage; only compiled when the `sqlite`
    /// feature is enabled and the target is not `wasm32`.
    #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
    Sqlite(sqlite::storage::SqliteCdcStorage),
}
189
190impl CdcStore {
191 pub fn memory() -> Self {
192 Self::Memory(MemoryCdcStorage::new())
193 }
194
195 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
196 pub fn sqlite(config: SqliteConfig) -> Self {
197 Self::Sqlite(sqlite::storage::SqliteCdcStorage::new(config))
198 }
199
200 pub fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
201 match self {
202 Self::Memory(s) => s.write(cdc),
203 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
204 Self::Sqlite(s) => s.write(cdc),
205 }
206 }
207
208 pub fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
209 match self {
210 Self::Memory(s) => s.read(version),
211 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
212 Self::Sqlite(s) => s.read(version),
213 }
214 }
215
216 pub fn read_range(
217 &self,
218 start: Bound<CommitVersion>,
219 end: Bound<CommitVersion>,
220 batch_size: u64,
221 ) -> CdcStorageResult<CdcBatch> {
222 match self {
223 Self::Memory(s) => s.read_range(start, end, batch_size),
224 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
225 Self::Sqlite(s) => s.read_range(start, end, batch_size),
226 }
227 }
228
229 pub fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
230 match self {
231 Self::Memory(s) => s.count(version),
232 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
233 Self::Sqlite(s) => s.count(version),
234 }
235 }
236
237 pub fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
238 match self {
239 Self::Memory(s) => s.min_version(),
240 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
241 Self::Sqlite(s) => s.min_version(),
242 }
243 }
244
245 pub fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
246 match self {
247 Self::Memory(s) => s.max_version(),
248 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
249 Self::Sqlite(s) => s.max_version(),
250 }
251 }
252
253 pub fn delete_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
254 match self {
255 Self::Memory(s) => s.drop_before(version),
256 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
257 Self::Sqlite(s) => s.drop_before(version),
258 }
259 }
260
261 pub fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
262 match self {
263 Self::Memory(s) => s.find_ttl_cutoff(cutoff),
264 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
265 Self::Sqlite(s) => s.find_ttl_cutoff(cutoff),
266 }
267 }
268}
269
270impl CdcStorage for CdcStore {
271 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
272 CdcStore::write(self, cdc)
273 }
274
275 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
276 CdcStore::read(self, version)
277 }
278
279 fn read_range(
280 &self,
281 start: Bound<CommitVersion>,
282 end: Bound<CommitVersion>,
283 batch_size: u64,
284 ) -> CdcStorageResult<CdcBatch> {
285 CdcStore::read_range(self, start, end, batch_size)
286 }
287
288 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
289 CdcStore::count(self, version)
290 }
291
292 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
293 CdcStore::min_version(self)
294 }
295
296 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
297 CdcStore::max_version(self)
298 }
299
300 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
301 CdcStore::delete_before(self, version)
302 }
303
304 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
305 CdcStore::find_ttl_cutoff(self, cutoff)
306 }
307}