1use std::{
5 collections::Bound,
6 iter::repeat_n,
7 sync::{
8 Arc,
9 atomic::{AtomicU8, Ordering},
10 },
11};
12
13use postcard::{from_bytes, to_stdvec};
14use reifydb_core::{
15 common::CommitVersion,
16 interface::cdc::{Cdc, CdcBatch, SystemChange},
17};
18use reifydb_runtime::sync::mutex::Mutex;
19use reifydb_sqlite::{
20 SqliteConfig,
21 connection::{connect, convert_flags, resolve_db_path},
22 pragma,
23};
24use reifydb_type::value::datetime::DateTime;
25use rusqlite::{
26 Connection, Error::QueryReturnedNoRows, Result as RusqliteResult, Transaction, params, params_from_iter,
27 types::Value as SqlValue,
28};
29
30use crate::{
31 compact::{block, block::CompactBlockSummary, cache::BlockCache},
32 error::CdcError,
33 storage::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry, normalize_range_inclusive},
34};
35
/// SQLite-backed CDC storage.
///
/// Entries are first written as individual rows of the `cdc` table and may
/// later be compacted into multi-entry blocks stored in the `cdc_block` table.
/// Cloning is cheap: every clone shares the same connection, block cache and
/// settings through an `Arc`.
#[derive(Clone)]
pub struct SqliteCdcStorage {
	inner: Arc<Inner>,
}
40
/// State shared by all clones of a [`SqliteCdcStorage`].
struct Inner {
	// Single SQLite connection; every query in this module locks this mutex.
	conn: Mutex<Connection>,
	// Decoded block contents, keyed by the block's `max_version`.
	block_cache: BlockCache,
	// zstd level recorded by the most recent compaction; `drop_before` reuses it
	// when it has to re-encode a partially dropped block.
	last_zstd_level: AtomicU8,
}
46
/// Live rows picked for compaction: decoded entries in ascending version order,
/// plus the raw `version` keys used to delete them from the `cdc` table.
type CompactionCandidates = (Vec<Cdc>, Vec<Vec<u8>>);

/// One range read taken under a single connection lock: `(max_version, payload)`
/// rows from `cdc_block`, plus raw postcard payloads from `cdc`.
type RangeSnapshot = (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>);
50
/// Result of scanning the blocks that lie entirely below a `drop_before` cut-off.
struct FullBlockScan {
	// Number of CDC entries contained in those blocks.
	cdc_count: usize,
	// One entry per system change that will be dropped.
	entries: Vec<DroppedCdcEntry>,
	// `max_version` keys of the blocks to delete.
	pks: Vec<Vec<u8>>,
}
56
/// Result of scanning the blocks that contain the `drop_before` cut-off.
struct StraddleScan {
	// Number of entries below the cut-off (the ones being dropped).
	cdc_count: usize,
	// One entry per dropped system change.
	entries: Vec<DroppedCdcEntry>,
	// Per-block action, keyed by the block's `max_version`.
	actions: Vec<(Vec<u8>, BlockOutcome)>,
}
62
/// Result of scanning live `cdc` rows below a `drop_before` cut-off.
struct LiveScan {
	// Number of live rows that will be dropped.
	cdc_count: usize,
	// One entry per dropped system change.
	entries: Vec<DroppedCdcEntry>,
}
67
/// What `drop_before` does with a block that straddles the cut-off.
enum BlockOutcome {
	// Every entry was dropped; delete the block row.
	Delete,
	// Re-encode the block with only the remaining entries (same `max_version` key).
	Rewrite {
		survivors: Vec<Cdc>,
	},
}
74
75impl SqliteCdcStorage {
76 pub fn new(config: SqliteConfig) -> Self {
77 Self::new_with_cache_capacity(config, BlockCache::DEFAULT_CAPACITY)
78 }
79
80 pub fn new_with_cache_capacity(config: SqliteConfig, cache_capacity: usize) -> Self {
81 let conn = open_connection(&config);
82 Self {
83 inner: Arc::new(Inner {
84 conn: Mutex::new(conn),
85 block_cache: BlockCache::new(cache_capacity),
86 last_zstd_level: AtomicU8::new(3),
87 }),
88 }
89 }
90
91 pub fn in_memory() -> Self {
92 Self::new(SqliteConfig::in_memory())
93 }
94
95 fn ensure_schema(conn: &Connection) {
96 create_cdc_table(conn);
97 create_cdc_block_table(conn);
98 create_block_timestamp_index(conn);
99 }
100
101 pub fn incremental_vacuum(&self) {
102 let _ = pragma::incremental_vacuum(&self.inner.conn.lock());
103 }
104
105 pub fn shrink_memory(&self) {
106 let _ = pragma::shrink_memory(&self.inner.conn.lock());
107 }
108
109 pub fn shutdown(&self) {
110 let _ = pragma::shutdown(&self.inner.conn.lock());
111 }
112
113 fn read_from_blocks(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
114 let v_bytes = version_to_bytes(version);
115 let Some((max_bytes, payload)) = self.find_block_for_version(&v_bytes)? else {
116 return Ok(None);
117 };
118 let block_max = bytes_to_version(&max_bytes)?;
119 let entries = self.load_block_cached(block_max, &payload)?;
120 Ok(entries.iter().find(|c| c.version == version).cloned())
121 }
122
123 #[inline]
124 fn find_block_for_version(&self, v_bytes: &[u8; 8]) -> CdcStorageResult<Option<(Vec<u8>, Vec<u8>)>> {
125 let conn = self.inner.conn.lock();
126 conn.query_row(
127 r#"SELECT max_version, payload FROM "cdc_block"
128 WHERE max_version >= ?1 AND min_version <= ?1
129 ORDER BY max_version ASC LIMIT 1"#,
130 params![v_bytes.as_slice()],
131 |row| Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?)),
132 )
133 .map(Some)
134 .or_else(|e| match e {
135 QueryReturnedNoRows => Ok(None),
136 e => Err(CdcError::Internal(format!("read_from_blocks: {e}"))),
137 })
138 }
139
140 fn load_block_cached(&self, block_max: CommitVersion, payload: &[u8]) -> CdcStorageResult<Arc<Vec<Cdc>>> {
141 if let Some(hit) = self.inner.block_cache.get(block_max) {
142 return Ok(hit);
143 }
144 let entries = block::decode(payload)?;
145 let arc = Arc::new(entries);
146 self.inner.block_cache.put(block_max, arc.clone());
147 Ok(arc)
148 }
149
150 fn read_range_live(
151 &self,
152 start: Bound<CommitVersion>,
153 end: Bound<CommitVersion>,
154 batch_size: u64,
155 ) -> CdcStorageResult<CdcBatch> {
156 let (lower_sql, lower_bytes) = lower_bind_clause(start);
157 let (upper_sql, upper_bytes) = upper_bind_clause(end);
158 let sql = format!(
159 r#"SELECT payload FROM "cdc" WHERE 1=1{lower_sql}{upper_sql} ORDER BY version ASC LIMIT ?"#
160 );
161 let limit = (batch_size as i64).saturating_add(1);
162
163 let conn = self.inner.conn.lock();
164 let mut stmt = conn.prepare(&sql).map_err(|e| CdcError::Internal(format!("range prepare: {e}")))?;
165 let values = build_range_params(lower_bytes, upper_bytes, limit);
166 let rows = stmt
167 .query_map(params_from_iter(values.iter()), |row| row.get::<_, Vec<u8>>(0))
168 .map_err(|e| CdcError::Internal(format!("range rows: {e}")))?;
169
170 let (items, has_more) = decode_payload_rows(rows, batch_size as usize)?;
171 Ok(CdcBatch {
172 items,
173 has_more,
174 })
175 }
176
177 fn min_version_live(&self) -> CdcStorageResult<Option<CommitVersion>> {
178 let conn = self.inner.conn.lock();
179 let r: Option<Vec<u8>> = conn
180 .query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
181 .ok()
182 .flatten();
183 r.map(|b| bytes_to_version(&b)).transpose()
184 }
185
186 fn max_version_blocks(&self) -> CdcStorageResult<Option<CommitVersion>> {
187 let conn = self.inner.conn.lock();
188 let r: Option<Vec<u8>> = conn
189 .query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| {
190 row.get::<_, Option<Vec<u8>>>(0)
191 })
192 .ok()
193 .flatten();
194 r.map(|b| bytes_to_version(&b)).transpose()
195 }
196
197 pub fn compact_oldest(
198 &self,
199 target_size: usize,
200 safety_lag: u64,
201 zstd_level: u8,
202 producer_watermark: CommitVersion,
203 ) -> CdcStorageResult<Option<CompactBlockSummary>> {
204 self.compact_oldest_inner(target_size, safety_lag, false, zstd_level, producer_watermark)
205 }
206
207 pub fn compact_all(
208 &self,
209 target_size: usize,
210 zstd_level: u8,
211 producer_watermark: CommitVersion,
212 ) -> CdcStorageResult<Vec<CompactBlockSummary>> {
213 let mut out = Vec::new();
214 while let Some(s) = self.compact_oldest_inner(target_size, 0, false, zstd_level, producer_watermark)? {
215 out.push(s);
216 }
217 if let Some(tail) = self.compact_oldest_inner(target_size, 0, true, zstd_level, producer_watermark)? {
218 out.push(tail);
219 }
220 Ok(out)
221 }
222
223 fn compact_oldest_inner(
224 &self,
225 target_size: usize,
226 safety_lag: u64,
227 allow_partial: bool,
228 zstd_level: u8,
229 producer_watermark: CommitVersion,
230 ) -> CdcStorageResult<Option<CompactBlockSummary>> {
231 if target_size == 0 {
232 return Ok(None);
233 }
234 self.inner.last_zstd_level.store(zstd_level, Ordering::Relaxed);
235
236 let Some((entries, version_blobs)) =
237 self.select_oldest_eligible(target_size, safety_lag, allow_partial, producer_watermark)?
238 else {
239 return Ok(None);
240 };
241
242 let payload = block::encode(&entries, zstd_level)?;
243 let compressed_bytes = payload.len();
244 let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(&entries);
245 let min_version = entries.first().unwrap().version;
246 let max_version = entries.last().unwrap().version;
247
248 let committed = self.commit_block_swap(
249 &version_blobs,
250 &payload,
251 min_version,
252 max_version,
253 min_ts_nanos,
254 max_ts_nanos,
255 entries.len(),
256 )?;
257 if !committed {
258 return Ok(None);
259 }
260 Ok(Some(build_block_summary(&entries, min_version, max_version, compressed_bytes)))
261 }
262
263 fn select_oldest_eligible(
264 &self,
265 target_size: usize,
266 safety_lag: u64,
267 allow_partial: bool,
268 producer_watermark: CommitVersion,
269 ) -> CdcStorageResult<Option<CompactionCandidates>> {
270 let conn = self.inner.conn.lock();
271 let Some(max_v) = query_max_live_version(&conn)? else {
272 return Ok(None);
273 };
274 let Some(eligible_max) = compute_eligible_max(max_v, safety_lag, producer_watermark) else {
275 return Ok(None);
276 };
277 let (entries, version_blobs) = query_oldest_candidates(&conn, eligible_max, target_size)?;
278 if entries.is_empty() {
279 return Ok(None);
280 }
281 if !allow_partial && entries.len() < target_size {
282 return Ok(None);
283 }
284 Ok(Some((entries, version_blobs)))
285 }
286
287 #[allow(clippy::too_many_arguments)]
288 fn commit_block_swap(
289 &self,
290 version_blobs: &[Vec<u8>],
291 payload: &[u8],
292 min_version: CommitVersion,
293 max_version: CommitVersion,
294 min_ts_nanos: i64,
295 max_ts_nanos: i64,
296 num_entries: usize,
297 ) -> CdcStorageResult<bool> {
298 let conn = self.inner.conn.lock();
299 let tx = conn
300 .unchecked_transaction()
301 .map_err(|e| CdcError::Internal(format!("compact tx begin: {e}")))?;
302
303 if !delete_compacted_versions(&tx, version_blobs, num_entries)? {
304 tx.rollback().map_err(|e| CdcError::Internal(format!("compact rollback: {e}")))?;
305 return Ok(false);
306 }
307 insert_compacted_block(
308 &tx,
309 payload,
310 min_version,
311 max_version,
312 min_ts_nanos,
313 max_ts_nanos,
314 num_entries,
315 )?;
316 tx.commit().map_err(|e| CdcError::Internal(format!("compact commit: {e}")))?;
317 Ok(true)
318 }
319
320 #[inline]
321 fn snapshot_block_and_live(
322 &self,
323 lo_inc: CommitVersion,
324 hi_inc: CommitVersion,
325 batch_size: u64,
326 ) -> CdcStorageResult<RangeSnapshot> {
327 let lo_b = version_to_bytes(lo_inc);
328 let hi_b = version_to_bytes(hi_inc);
329 let limit = (batch_size as i64).saturating_add(1);
330 let conn = self.inner.conn.lock();
331
332 let block_rows = read_block_index_rows(&conn, &lo_b, &hi_b)?;
333 let live_payloads = read_live_payloads(&conn, &lo_b, &hi_b, limit)?;
334 Ok((block_rows, live_payloads))
335 }
336
337 #[inline]
338 fn decode_block_rows(
339 &self,
340 block_rows: Vec<(Vec<u8>, Vec<u8>)>,
341 lo_inc: CommitVersion,
342 hi_inc: CommitVersion,
343 ) -> CdcStorageResult<Vec<Cdc>> {
344 let mut block_items: Vec<Cdc> = Vec::new();
345 for (max_bytes, payload) in block_rows {
346 let block_max = bytes_to_version(&max_bytes)?;
347 let entries = self.load_block_cached(block_max, &payload)?;
348 for cdc in entries.iter() {
349 if cdc.version >= lo_inc && cdc.version <= hi_inc {
350 block_items.push(cdc.clone());
351 }
352 }
353 }
354 Ok(block_items)
355 }
356
357 #[inline]
358 fn scan_full_blocks_below(
359 &self,
360 conn: &Connection,
361 version_bytes: &[u8; 8],
362 ) -> CdcStorageResult<FullBlockScan> {
363 let mut stmt = conn
364 .prepare(
365 r#"SELECT max_version, payload FROM "cdc_block"
366 WHERE max_version < ?1 ORDER BY max_version ASC"#,
367 )
368 .map_err(|e| CdcError::Internal(format!("drop blocks prepare: {e}")))?;
369 let rows = stmt
370 .query_map(params![version_bytes.as_slice()], |row| {
371 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
372 })
373 .map_err(|e| CdcError::Internal(format!("drop blocks rows: {e}")))?;
374 let mut entries = Vec::new();
375 let mut pks = Vec::new();
376 let mut cdc_count = 0;
377 for row in rows {
378 let (max_bytes, payload) =
379 row.map_err(|e| CdcError::Internal(format!("drop blocks row: {e}")))?;
380 let block_max = bytes_to_version(&max_bytes)?;
381 for cdc in &block::decode(&payload)? {
382 cdc_count += 1;
383 extend_dropped_entries(&mut entries, &cdc.system_changes);
384 }
385 self.inner.block_cache.remove(block_max);
386 pks.push(max_bytes);
387 }
388 Ok(FullBlockScan {
389 cdc_count,
390 entries,
391 pks,
392 })
393 }
394
395 #[inline]
396 fn scan_straddle_blocks(
397 &self,
398 conn: &Connection,
399 version: CommitVersion,
400 version_bytes: &[u8; 8],
401 ) -> CdcStorageResult<StraddleScan> {
402 let mut stmt = conn
403 .prepare(
404 r#"SELECT max_version, payload FROM "cdc_block"
405 WHERE min_version < ?1 AND max_version >= ?1
406 ORDER BY max_version ASC"#,
407 )
408 .map_err(|e| CdcError::Internal(format!("drop straddle prepare: {e}")))?;
409 let rows = stmt
410 .query_map(params![version_bytes.as_slice()], |row| {
411 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
412 })
413 .map_err(|e| CdcError::Internal(format!("drop straddle rows: {e}")))?;
414 let mut entries = Vec::new();
415 let mut actions = Vec::new();
416 let mut cdc_count = 0;
417 for row in rows {
418 let (max_bytes, payload) =
419 row.map_err(|e| CdcError::Internal(format!("drop straddle row: {e}")))?;
420 let block_max = bytes_to_version(&max_bytes)?;
421 let decoded = block::decode(&payload)?;
422 let mut survivors: Vec<Cdc> = Vec::with_capacity(decoded.len());
423 for cdc in decoded {
424 if cdc.version < version {
425 cdc_count += 1;
426 extend_dropped_entries(&mut entries, &cdc.system_changes);
427 } else {
428 survivors.push(cdc);
429 }
430 }
431 self.inner.block_cache.remove(block_max);
432 let outcome = if survivors.is_empty() {
433 BlockOutcome::Delete
434 } else {
435 BlockOutcome::Rewrite {
436 survivors,
437 }
438 };
439 actions.push((max_bytes, outcome));
440 }
441 Ok(StraddleScan {
442 cdc_count,
443 entries,
444 actions,
445 })
446 }
447}
448
449fn open_connection(config: &SqliteConfig) -> Connection {
450 let db_path = resolve_db_path(config.path.clone(), "cdc.db");
451 let flags = convert_flags(&config.flags);
452 let conn = connect(&db_path, flags).expect("Failed to connect to CDC SQLite database");
453 pragma::apply(&conn, config).expect("Failed to configure CDC SQLite pragmas");
454 SqliteCdcStorage::ensure_schema(&conn);
455 conn
456}
457
458fn create_cdc_table(conn: &Connection) {
459 conn.execute(
460 r#"CREATE TABLE IF NOT EXISTS "cdc" (
461 version BLOB PRIMARY KEY,
462 payload BLOB NOT NULL
463 ) WITHOUT ROWID"#,
464 [],
465 )
466 .expect("Failed to create cdc table");
467}
468
469fn create_cdc_block_table(conn: &Connection) {
470 conn.execute(
471 r#"CREATE TABLE IF NOT EXISTS "cdc_block" (
472 max_version BLOB PRIMARY KEY,
473 min_version BLOB NOT NULL,
474 min_timestamp INTEGER NOT NULL,
475 max_timestamp INTEGER NOT NULL,
476 num_entries INTEGER NOT NULL,
477 payload BLOB NOT NULL
478 ) WITHOUT ROWID"#,
479 [],
480 )
481 .expect("Failed to create cdc_block table");
482}
483
484fn create_block_timestamp_index(conn: &Connection) {
485 conn.execute(
486 r#"CREATE INDEX IF NOT EXISTS "cdc_block_max_ts_idx"
487 ON "cdc_block"(max_timestamp)"#,
488 [],
489 )
490 .expect("Failed to create cdc_block_max_ts index");
491}
492
493#[inline]
494fn lower_bind_clause(start: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
495 match start {
496 Bound::Included(v) => (" AND version >= ?", Some(version_to_bytes(v))),
497 Bound::Excluded(v) => (" AND version > ?", Some(version_to_bytes(v))),
498 Bound::Unbounded => ("", None),
499 }
500}
501
502#[inline]
503fn upper_bind_clause(end: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
504 match end {
505 Bound::Included(v) => (" AND version <= ?", Some(version_to_bytes(v))),
506 Bound::Excluded(v) => (" AND version < ?", Some(version_to_bytes(v))),
507 Bound::Unbounded => ("", None),
508 }
509}
510
511#[inline]
512fn build_range_params(lower_bytes: Option<[u8; 8]>, upper_bytes: Option<[u8; 8]>, limit: i64) -> Vec<SqlValue> {
513 let mut values: Vec<SqlValue> = Vec::new();
514 if let Some(b) = lower_bytes {
515 values.push(SqlValue::Blob(b.to_vec()));
516 }
517 if let Some(b) = upper_bytes {
518 values.push(SqlValue::Blob(b.to_vec()));
519 }
520 values.push(SqlValue::Integer(limit));
521 values
522}
523
524#[inline]
525fn decode_payload_rows<I>(rows: I, batch_size: usize) -> CdcStorageResult<(Vec<Cdc>, bool)>
526where
527 I: IntoIterator<Item = RusqliteResult<Vec<u8>>>,
528{
529 let mut items: Vec<Cdc> = Vec::new();
530 for row in rows {
531 let bytes = row.map_err(|e| CdcError::Internal(format!("range row: {e}")))?;
532 let cdc: Cdc =
533 from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode range: {e}")))?;
534 items.push(cdc);
535 }
536 let has_more = items.len() > batch_size;
537 if has_more {
538 items.truncate(batch_size);
539 }
540 Ok((items, has_more))
541}
542
543#[inline]
544fn query_max_live_version(conn: &Connection) -> CdcStorageResult<Option<u64>> {
545 let max_live: Option<Vec<u8>> = conn
546 .query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
547 .ok()
548 .flatten();
549 max_live.map(|b| bytes_to_version(&b).map(|v| v.0)).transpose()
550}
551
552#[inline]
553fn compute_eligible_max(max_v: u64, safety_lag: u64, producer_watermark: CommitVersion) -> Option<CommitVersion> {
554 if max_v < safety_lag {
555 return None;
556 }
557 let safety_capped = max_v.saturating_sub(safety_lag);
558 Some(CommitVersion(safety_capped.min(producer_watermark.0)))
559}
560
561#[inline]
562fn query_oldest_candidates(
563 conn: &Connection,
564 eligible_max: CommitVersion,
565 target_size: usize,
566) -> CdcStorageResult<(Vec<Cdc>, Vec<Vec<u8>>)> {
567 let eligible_max_bytes = version_to_bytes(eligible_max);
568 let mut stmt = conn
569 .prepare(
570 r#"SELECT version, payload FROM "cdc"
571 WHERE version <= ?1 ORDER BY version ASC LIMIT ?2"#,
572 )
573 .map_err(|e| CdcError::Internal(format!("compact prepare: {e}")))?;
574 let limit = (target_size as i64).saturating_add(1);
575 let rows = stmt
576 .query_map(params![eligible_max_bytes.as_slice(), limit], |row| {
577 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
578 })
579 .map_err(|e| CdcError::Internal(format!("compact rows: {e}")))?;
580
581 let mut version_blobs: Vec<Vec<u8>> = Vec::with_capacity(target_size);
582 let mut entries: Vec<Cdc> = Vec::with_capacity(target_size);
583 for row in rows {
584 if entries.len() == target_size {
585 break;
586 }
587 let (vb, pb) = row.map_err(|e| CdcError::Internal(format!("compact row: {e}")))?;
588 let cdc: Cdc =
589 from_bytes(&pb).map_err(|e| CdcError::Codec(format!("postcard decode in compact: {e}")))?;
590 version_blobs.push(vb);
591 entries.push(cdc);
592 }
593 Ok((entries, version_blobs))
594}
595
596#[inline]
597fn build_block_summary(
598 entries: &[Cdc],
599 min_version: CommitVersion,
600 max_version: CommitVersion,
601 compressed_bytes: usize,
602) -> CompactBlockSummary {
603 CompactBlockSummary {
604 min_version,
605 max_version,
606 num_entries: entries.len(),
607 compressed_bytes,
608 }
609}
610
611#[inline]
612fn query_min_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
613 let r: Option<Vec<u8>> = conn
614 .query_row(r#"SELECT MIN(min_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
615 .ok()
616 .flatten();
617 r.map(|b| bytes_to_version(&b)).transpose()
618}
619
620#[inline]
621fn query_min_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
622 let r: Option<Vec<u8>> = conn
623 .query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
624 .ok()
625 .flatten();
626 r.map(|b| bytes_to_version(&b)).transpose()
627}
628
629#[inline]
630fn query_max_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
631 let r: Option<Vec<u8>> = conn
632 .query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
633 .ok()
634 .flatten();
635 r.map(|b| bytes_to_version(&b)).transpose()
636}
637
638#[inline]
639fn query_max_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
640 let r: Option<Vec<u8>> = conn
641 .query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
642 .ok()
643 .flatten();
644 r.map(|b| bytes_to_version(&b)).transpose()
645}
646
647fn version_to_bytes(v: CommitVersion) -> [u8; 8] {
648 v.0.to_be_bytes()
649}
650
651fn bytes_to_version(bytes: &[u8]) -> CdcStorageResult<CommitVersion> {
652 let arr: [u8; 8] = bytes.try_into().map_err(|_| CdcError::Internal("bad version bytes".to_string()))?;
653 Ok(CommitVersion(u64::from_be_bytes(arr)))
654}
655
656fn datetime_to_nanos(dt: &DateTime) -> i64 {
657 dt.to_nanos() as i64
658}
659
660fn summarize_timestamps(entries: &[Cdc]) -> (i64, i64) {
661 entries.iter().fold((i64::MAX, i64::MIN), |(lo, hi), c| {
662 let n = datetime_to_nanos(&c.timestamp);
663 (lo.min(n), hi.max(n))
664 })
665}
666
667#[inline]
668fn read_block_index_rows(
669 conn: &Connection,
670 lo_b: &[u8; 8],
671 hi_b: &[u8; 8],
672) -> CdcStorageResult<Vec<(Vec<u8>, Vec<u8>)>> {
673 let mut stmt = conn
674 .prepare(
675 r#"SELECT max_version, payload FROM "cdc_block"
676 WHERE max_version >= ?1 AND min_version <= ?2
677 ORDER BY max_version ASC"#,
678 )
679 .map_err(|e| CdcError::Internal(format!("range blocks prepare: {e}")))?;
680 let rows = stmt
681 .query_map(params![lo_b.as_slice(), hi_b.as_slice()], |row| {
682 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
683 })
684 .map_err(|e| CdcError::Internal(format!("range blocks rows: {e}")))?;
685 let mut out = Vec::new();
686 for r in rows {
687 out.push(r.map_err(|e| CdcError::Internal(format!("range blocks row: {e}")))?);
688 }
689 Ok(out)
690}
691
692#[inline]
693fn read_live_payloads(conn: &Connection, lo_b: &[u8; 8], hi_b: &[u8; 8], limit: i64) -> CdcStorageResult<Vec<Vec<u8>>> {
694 let mut stmt = conn
695 .prepare(
696 r#"SELECT payload FROM "cdc"
697 WHERE version >= ?1 AND version <= ?2
698 ORDER BY version ASC LIMIT ?3"#,
699 )
700 .map_err(|e| CdcError::Internal(format!("range live prepare: {e}")))?;
701 let rows = stmt
702 .query_map(params![lo_b.as_slice(), hi_b.as_slice(), limit], |row| row.get::<_, Vec<u8>>(0))
703 .map_err(|e| CdcError::Internal(format!("range live rows: {e}")))?;
704 let mut out = Vec::new();
705 for r in rows {
706 out.push(r.map_err(|e| CdcError::Internal(format!("range live row: {e}")))?);
707 }
708 Ok(out)
709}
710
711#[inline]
712fn decode_live_payloads(payloads: Vec<Vec<u8>>) -> CdcStorageResult<Vec<Cdc>> {
713 let mut live_items = Vec::with_capacity(payloads.len());
714 for payload in payloads {
715 let cdc: Cdc = from_bytes(&payload)
716 .map_err(|e| CdcError::Codec(format!("postcard decode range live: {e}")))?;
717 live_items.push(cdc);
718 }
719 Ok(live_items)
720}
721
722#[inline]
723fn merge_block_and_live(block_items: Vec<Cdc>, live_items: Vec<Cdc>) -> Vec<Cdc> {
724 let mut merged: Vec<Cdc> = Vec::with_capacity(block_items.len() + live_items.len());
725 let (mut bi, mut li) = (0usize, 0usize);
726 while bi < block_items.len() && li < live_items.len() {
727 let bv = block_items[bi].version;
728 let lv = live_items[li].version;
729 if bv < lv {
730 merged.push(block_items[bi].clone());
731 bi += 1;
732 } else if bv > lv {
733 merged.push(live_items[li].clone());
734 li += 1;
735 } else {
736 merged.push(block_items[bi].clone());
737 bi += 1;
738 li += 1;
739 }
740 }
741 while bi < block_items.len() {
742 merged.push(block_items[bi].clone());
743 bi += 1;
744 }
745 while li < live_items.len() {
746 merged.push(live_items[li].clone());
747 li += 1;
748 }
749 merged
750}
751
752#[inline]
753fn scan_live_rows_below(conn: &Connection, version_bytes: &[u8; 8]) -> CdcStorageResult<LiveScan> {
754 let mut stmt = conn
755 .prepare(r#"SELECT payload FROM "cdc" WHERE version < ?1 ORDER BY version ASC"#)
756 .map_err(|e| CdcError::Internal(format!("drop_before prepare: {e}")))?;
757 let rows = stmt
758 .query_map(params![version_bytes.as_slice()], |row| row.get::<_, Vec<u8>>(0))
759 .map_err(|e| CdcError::Internal(format!("drop_before rows: {e}")))?;
760 let mut entries = Vec::new();
761 let mut cdc_count = 0;
762 for row in rows {
763 let bytes = row.map_err(|e| CdcError::Internal(format!("drop_before row: {e}")))?;
764 let cdc: Cdc =
765 from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode drop_before: {e}")))?;
766 cdc_count += 1;
767 extend_dropped_entries(&mut entries, &cdc.system_changes);
768 }
769 Ok(LiveScan {
770 cdc_count,
771 entries,
772 })
773}
774
775#[inline]
776fn apply_drop_before(
777 conn: &Connection,
778 full_block_pks: &[Vec<u8>],
779 straddle_actions: &[(Vec<u8>, BlockOutcome)],
780 version_bytes: &[u8; 8],
781 zstd_level: u8,
782) -> CdcStorageResult<()> {
783 let tx = conn.unchecked_transaction().map_err(|e| CdcError::Internal(format!("drop_before tx begin: {e}")))?;
784
785 for pk in full_block_pks {
786 tx.execute(r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#, params![pk.as_slice()])
787 .map_err(|e| CdcError::Internal(format!("drop block delete: {e}")))?;
788 }
789
790 for (max_bytes, action) in straddle_actions {
791 match action {
792 BlockOutcome::Delete => {
793 tx.execute(
794 r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#,
795 params![max_bytes.as_slice()],
796 )
797 .map_err(|e| CdcError::Internal(format!("drop straddle delete: {e}")))?;
798 }
799 BlockOutcome::Rewrite {
800 survivors,
801 } => {
802 rewrite_straddle_block(&tx, max_bytes, survivors, zstd_level)?;
803 }
804 }
805 }
806
807 tx.execute(r#"DELETE FROM "cdc" WHERE version < ?1"#, params![version_bytes.as_slice()])
808 .map_err(|e| CdcError::Internal(format!("drop_before delete: {e}")))?;
809 tx.commit().map_err(|e| CdcError::Internal(format!("drop_before commit: {e}")))?;
810 Ok(())
811}
812
813#[inline]
814fn rewrite_straddle_block(
815 tx: &Transaction<'_>,
816 max_bytes: &[u8],
817 survivors: &[Cdc],
818 zstd_level: u8,
819) -> CdcStorageResult<()> {
820 let new_min = survivors.first().unwrap().version;
821 let new_max = survivors.last().unwrap().version;
822 debug_assert_eq!(new_max, bytes_to_version(max_bytes)?, "max_version is the block PK and must be preserved");
823 let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(survivors);
824 let payload = block::encode(survivors, zstd_level)?;
825 tx.execute(
826 r#"INSERT OR REPLACE INTO "cdc_block"
827 (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
828 VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
829 params![
830 max_bytes,
831 version_to_bytes(new_min).as_slice(),
832 min_ts_nanos,
833 max_ts_nanos,
834 survivors.len() as i64,
835 payload.as_slice(),
836 ],
837 )
838 .map_err(|e| CdcError::Internal(format!("drop straddle rewrite: {e}")))?;
839 Ok(())
840}
841
842#[inline]
843fn extend_dropped_entries(out: &mut Vec<DroppedCdcEntry>, system_changes: &[SystemChange]) {
844 for sys_change in system_changes {
845 out.push(DroppedCdcEntry {
846 key: sys_change.key().clone(),
847 value_bytes: sys_change.value_bytes() as u64,
848 });
849 }
850}
851
852#[inline]
853fn delete_compacted_versions(
854 tx: &Transaction<'_>,
855 version_blobs: &[Vec<u8>],
856 expected_count: usize,
857) -> CdcStorageResult<bool> {
858 let placeholders = repeat_n("?", version_blobs.len()).collect::<Vec<_>>().join(",");
859 let del_sql = format!(r#"DELETE FROM "cdc" WHERE version IN ({})"#, placeholders);
860 let del_params: Vec<SqlValue> = version_blobs.iter().map(|b| SqlValue::Blob(b.clone())).collect();
861 let mut del_stmt =
862 tx.prepare(&del_sql).map_err(|e| CdcError::Internal(format!("compact delete prepare: {e}")))?;
863 let rows_deleted = del_stmt
864 .execute(params_from_iter(del_params.iter()))
865 .map_err(|e| CdcError::Internal(format!("compact delete execute: {e}")))?;
866 Ok(rows_deleted == expected_count)
867}
868
869#[inline]
870fn insert_compacted_block(
871 tx: &Transaction<'_>,
872 payload: &[u8],
873 min_version: CommitVersion,
874 max_version: CommitVersion,
875 min_ts_nanos: i64,
876 max_ts_nanos: i64,
877 num_entries: usize,
878) -> CdcStorageResult<()> {
879 tx.execute(
880 r#"INSERT INTO "cdc_block"
881 (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
882 VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
883 params![
884 version_to_bytes(max_version).as_slice(),
885 version_to_bytes(min_version).as_slice(),
886 min_ts_nanos,
887 max_ts_nanos,
888 num_entries as i64,
889 payload,
890 ],
891 )
892 .map_err(|e| CdcError::Internal(format!("compact insert block: {e}")))?;
893 Ok(())
894}
895
896impl CdcStorage for SqliteCdcStorage {
897 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
898 let bytes = to_stdvec(cdc).map_err(|e| CdcError::Codec(format!("postcard encode: {e}")))?;
899 let conn = self.inner.conn.lock();
900 conn.execute(
901 r#"INSERT OR REPLACE INTO "cdc" (version, payload) VALUES (?1, ?2)"#,
902 params![version_to_bytes(cdc.version).as_slice(), bytes.as_slice()],
903 )
904 .map_err(|e| CdcError::Internal(format!("insert cdc: {e}")))?;
905 Ok(())
906 }
907
908 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
909 if let Some(cdc) = self.read_live(version)? {
910 return Ok(Some(cdc));
911 }
912 self.read_from_blocks(version)
913 }
914
915 fn read_range(
916 &self,
917 start: Bound<CommitVersion>,
918 end: Bound<CommitVersion>,
919 batch_size: u64,
920 ) -> CdcStorageResult<CdcBatch> {
921 let Some((lo_inc, hi_inc)) = normalize_range_inclusive(start, end) else {
922 return Ok(CdcBatch {
923 items: Vec::new(),
924 has_more: false,
925 });
926 };
927 let want = batch_size as usize;
928
929 let (block_rows, live_payloads) = self.snapshot_block_and_live(lo_inc, hi_inc, batch_size)?;
930 let block_items = self.decode_block_rows(block_rows, lo_inc, hi_inc)?;
931 let live_items = decode_live_payloads(live_payloads)?;
932 let mut merged = merge_block_and_live(block_items, live_items);
933
934 let has_more = merged.len() > want;
935 merged.truncate(want);
936 Ok(CdcBatch {
937 items: merged,
938 has_more,
939 })
940 }
941
942 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
943 Ok(self.read(version)?.map(|c| c.system_changes.len()).unwrap_or(0))
944 }
945
946 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
947 let conn = self.inner.conn.lock();
948 let block_min = query_min_block(&conn)?;
949 let live_min = query_min_live(&conn)?;
950 Ok([block_min, live_min].into_iter().flatten().min())
951 }
952
953 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
954 let conn = self.inner.conn.lock();
955 if let Some(v) = query_max_live(&conn)? {
956 return Ok(Some(v));
957 }
958 query_max_block(&conn)
959 }
960
961 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
962 let conn = self.inner.conn.lock();
963 let version_bytes = version_to_bytes(version);
964 let zstd_level = self.inner.last_zstd_level.load(Ordering::Relaxed);
965
966 let full_blocks = self.scan_full_blocks_below(&conn, &version_bytes)?;
967 let straddle = self.scan_straddle_blocks(&conn, version, &version_bytes)?;
968 let live = scan_live_rows_below(&conn, &version_bytes)?;
969
970 apply_drop_before(&conn, &full_blocks.pks, &straddle.actions, &version_bytes, zstd_level)?;
971 let _ = pragma::incremental_vacuum(&conn);
972
973 let mut entries = full_blocks.entries;
974 entries.extend(straddle.entries);
975 entries.extend(live.entries);
976 Ok(DropBeforeResult {
977 count: full_blocks.cdc_count + straddle.cdc_count + live.cdc_count,
978 entries,
979 })
980 }
981
982 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
983 let cutoff_nanos = datetime_to_nanos(&cutoff);
984 if let Some(v) = self.try_block_index_cutoff(cutoff_nanos)? {
985 return Ok(Some(v));
986 }
987 let Some(start) = self.pick_scan_start()? else {
988 return self.max_version_blocks().map(|opt| opt.map(|v| CommitVersion(v.0.saturating_add(1))));
989 };
990 self.scan_live_for_cutoff(start, cutoff_nanos)
991 }
992}
993
994impl SqliteCdcStorage {
995 #[inline]
996 fn try_block_index_cutoff(&self, cutoff_nanos: i64) -> CdcStorageResult<Option<CommitVersion>> {
997 let block_hit: Option<Vec<u8>> = {
998 let conn = self.inner.conn.lock();
999 conn.query_row(
1000 r#"SELECT min_version FROM "cdc_block"
1001 WHERE max_timestamp >= ?1 ORDER BY max_timestamp ASC LIMIT 1"#,
1002 params![cutoff_nanos],
1003 |row| row.get::<_, Vec<u8>>(0),
1004 )
1005 .ok()
1006 };
1007 block_hit.map(|b| bytes_to_version(&b)).transpose()
1008 }
1009
1010 #[inline]
1011 fn pick_scan_start(&self) -> CdcStorageResult<Option<CommitVersion>> {
1012 self.min_version_live()
1013 }
1014
1015 #[inline]
1016 fn scan_live_for_cutoff(
1017 &self,
1018 start: CommitVersion,
1019 cutoff_nanos: i64,
1020 ) -> CdcStorageResult<Option<CommitVersion>> {
1021 let mut next_start = Bound::Included(start);
1022 loop {
1023 let batch = self.read_range_live(next_start, Bound::Unbounded, 256)?;
1024 if batch.items.is_empty() {
1025 let last = self.max_version()?.unwrap_or(CommitVersion(0));
1026 return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1027 }
1028 for cdc in &batch.items {
1029 if datetime_to_nanos(&cdc.timestamp) >= cutoff_nanos {
1030 return Ok(Some(cdc.version));
1031 }
1032 }
1033 if !batch.has_more {
1034 let last = batch.items.last().unwrap().version;
1035 return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1036 }
1037 next_start = Bound::Excluded(batch.items.last().unwrap().version);
1038 }
1039 }
1040
1041 #[inline]
1042 fn read_live(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
1043 let conn = self.inner.conn.lock();
1044 let result = conn.query_row(
1045 r#"SELECT payload FROM "cdc" WHERE version = ?1"#,
1046 params![version_to_bytes(version).as_slice()],
1047 |row| row.get::<_, Vec<u8>>(0),
1048 );
1049 match result {
1050 Ok(bytes) => {
1051 let cdc: Cdc = from_bytes(&bytes)
1052 .map_err(|e| CdcError::Codec(format!("postcard decode: {e}")))?;
1053 Ok(Some(cdc))
1054 }
1055 Err(QueryReturnedNoRows) => Ok(None),
1056 Err(e) => Err(CdcError::Internal(format!("read cdc: {e}"))),
1057 }
1058 }
1059}