1use std::{
5 collections::Bound,
6 iter::repeat_n,
7 sync::{
8 Arc,
9 atomic::{AtomicU8, Ordering},
10 },
11};
12
13use postcard::{from_bytes, to_stdvec};
14use reifydb_core::{
15 common::CommitVersion,
16 interface::cdc::{Cdc, CdcBatch, SystemChange},
17};
18use reifydb_runtime::sync::mutex::Mutex;
19use reifydb_sqlite::{
20 SqliteConfig,
21 connection::{connect, convert_flags, resolve_db_path},
22 pragma,
23};
24use reifydb_type::value::datetime::DateTime;
25use rusqlite::{
26 Connection, Error::QueryReturnedNoRows, Result as RusqliteResult, Transaction, params, params_from_iter,
27 types::Value as SqlValue,
28};
29
30use crate::{
31 compact::{block, block::CompactBlockSummary, cache::BlockCache},
32 error::CdcError,
33 storage::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry, normalize_range_inclusive},
34};
35
/// SQLite-backed CDC storage.
///
/// Entries live in two tables: `cdc` holds individual ("live") entries keyed by
/// big-endian version bytes, and `cdc_block` holds compacted, encoded blocks of entries
/// keyed by their maximum version. Clones share the same connection and block cache.
#[derive(Clone)]
pub struct SqliteCdcStorage {
	inner: Arc<Inner>,
}

/// State shared by every clone of `SqliteCdcStorage`.
struct Inner {
	// The single SQLite connection; every query in this module goes through this lock.
	conn: Mutex<Connection>,
	// Decoded compacted blocks, keyed by the block's max version.
	block_cache: BlockCache,
	// zstd level passed to the most recent compaction call; `drop_before` reuses it when
	// re-encoding a block that straddles the drop boundary.
	last_zstd_level: AtomicU8,
}
46
/// Input to one compaction pass: the decoded entries, paired (same order) with the raw
/// `version` key blobs of the `cdc` rows they were read from.
type CompactionCandidates = (Vec<Cdc>, Vec<Vec<u8>>);

/// Raw rows read for a range query under a single lock: `(max_version, payload)` pairs
/// from `cdc_block`, followed by the still-encoded payloads of matching `cdc` rows.
type RangeSnapshot = (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>);
54
/// Result of scanning the compacted blocks that lie entirely below a drop boundary.
struct FullBlockScan {
	// Number of CDC entries contained in those blocks.
	cdc_count: usize,
	// One entry per system change of every dropped CDC entry.
	entries: Vec<DroppedCdcEntry>,
	// Primary keys (`max_version` blobs) of the blocks to delete.
	pks: Vec<Vec<u8>>,
}

/// Result of scanning blocks that straddle a drop boundary
/// (`min_version < boundary <= max_version`).
struct StraddleScan {
	// Number of CDC entries below the boundary.
	cdc_count: usize,
	// One entry per system change of every dropped CDC entry.
	entries: Vec<DroppedCdcEntry>,
	// What to do with each straddling block, keyed by its `max_version` blob.
	actions: Vec<(Vec<u8>, BlockOutcome)>,
}

/// Result of scanning live (`cdc` table) rows below a drop boundary.
struct LiveScan {
	// Number of live CDC rows below the boundary.
	cdc_count: usize,
	// One entry per system change of every dropped row.
	entries: Vec<DroppedCdcEntry>,
}

/// Action for a block that straddles a drop boundary.
enum BlockOutcome {
	// No entry is at or above the boundary: delete the whole block.
	Delete,
	// Re-encode the block keeping only these entries.
	Rewrite {
		survivors: Vec<Cdc>,
	},
}
78
79impl SqliteCdcStorage {
80 pub fn new(config: SqliteConfig) -> Self {
81 Self::new_with_cache_capacity(config, BlockCache::DEFAULT_CAPACITY)
82 }
83
84 pub fn new_with_cache_capacity(config: SqliteConfig, cache_capacity: usize) -> Self {
85 let conn = open_connection(&config);
86 Self {
87 inner: Arc::new(Inner {
88 conn: Mutex::new(conn),
89 block_cache: BlockCache::new(cache_capacity),
90 last_zstd_level: AtomicU8::new(3),
91 }),
92 }
93 }
94
95 pub fn in_memory() -> Self {
96 Self::new(SqliteConfig::in_memory())
97 }
98
99 fn ensure_schema(conn: &Connection) {
100 create_cdc_table(conn);
101 create_cdc_block_table(conn);
102 create_block_timestamp_index(conn);
103 }
104
105 pub fn incremental_vacuum(&self) {
106 let _ = pragma::incremental_vacuum(&self.inner.conn.lock());
107 }
108
109 pub fn shrink_memory(&self) {
110 let _ = pragma::shrink_memory(&self.inner.conn.lock());
111 }
112
113 pub fn shutdown(&self) {
114 let _ = pragma::shutdown(&self.inner.conn.lock());
115 }
116
117 fn read_from_blocks(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
118 let v_bytes = version_to_bytes(version);
119 let Some((max_bytes, payload)) = self.find_block_for_version(&v_bytes)? else {
120 return Ok(None);
121 };
122 let block_max = bytes_to_version(&max_bytes)?;
123 let entries = self.load_block_cached(block_max, &payload)?;
124 Ok(entries.iter().find(|c| c.version == version).cloned())
125 }
126
127 #[inline]
128 fn find_block_for_version(&self, v_bytes: &[u8; 8]) -> CdcStorageResult<Option<(Vec<u8>, Vec<u8>)>> {
129 let conn = self.inner.conn.lock();
130 conn.query_row(
131 r#"SELECT max_version, payload FROM "cdc_block"
132 WHERE max_version >= ?1 AND min_version <= ?1
133 ORDER BY max_version ASC LIMIT 1"#,
134 params![v_bytes.as_slice()],
135 |row| Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?)),
136 )
137 .map(Some)
138 .or_else(|e| match e {
139 QueryReturnedNoRows => Ok(None),
140 e => Err(CdcError::Internal(format!("read_from_blocks: {e}"))),
141 })
142 }
143
144 fn load_block_cached(&self, block_max: CommitVersion, payload: &[u8]) -> CdcStorageResult<Arc<Vec<Cdc>>> {
145 if let Some(hit) = self.inner.block_cache.get(block_max) {
146 return Ok(hit);
147 }
148 let entries = block::decode(payload)?;
149 let arc = Arc::new(entries);
150 self.inner.block_cache.put(block_max, arc.clone());
151 Ok(arc)
152 }
153
154 fn read_range_live(
155 &self,
156 start: Bound<CommitVersion>,
157 end: Bound<CommitVersion>,
158 batch_size: u64,
159 ) -> CdcStorageResult<CdcBatch> {
160 let (lower_sql, lower_bytes) = lower_bind_clause(start);
161 let (upper_sql, upper_bytes) = upper_bind_clause(end);
162 let sql = format!(
163 r#"SELECT payload FROM "cdc" WHERE 1=1{lower_sql}{upper_sql} ORDER BY version ASC LIMIT ?"#
164 );
165 let limit = (batch_size as i64).saturating_add(1);
166
167 let conn = self.inner.conn.lock();
168 let mut stmt = conn.prepare(&sql).map_err(|e| CdcError::Internal(format!("range prepare: {e}")))?;
169 let values = build_range_params(lower_bytes, upper_bytes, limit);
170 let rows = stmt
171 .query_map(params_from_iter(values.iter()), |row| row.get::<_, Vec<u8>>(0))
172 .map_err(|e| CdcError::Internal(format!("range rows: {e}")))?;
173
174 let (items, has_more) = decode_payload_rows(rows, batch_size as usize)?;
175 Ok(CdcBatch {
176 items,
177 has_more,
178 })
179 }
180
181 fn min_version_live(&self) -> CdcStorageResult<Option<CommitVersion>> {
182 let conn = self.inner.conn.lock();
183 let r: Option<Vec<u8>> = conn
184 .query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
185 .ok()
186 .flatten();
187 r.map(|b| bytes_to_version(&b)).transpose()
188 }
189
190 fn max_version_blocks(&self) -> CdcStorageResult<Option<CommitVersion>> {
191 let conn = self.inner.conn.lock();
192 let r: Option<Vec<u8>> = conn
193 .query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| {
194 row.get::<_, Option<Vec<u8>>>(0)
195 })
196 .ok()
197 .flatten();
198 r.map(|b| bytes_to_version(&b)).transpose()
199 }
200
201 pub fn compact_oldest(
202 &self,
203 target_size: usize,
204 safety_lag: u64,
205 zstd_level: u8,
206 producer_watermark: CommitVersion,
207 ) -> CdcStorageResult<Option<CompactBlockSummary>> {
208 self.compact_oldest_inner(target_size, safety_lag, false, zstd_level, producer_watermark)
209 }
210
211 pub fn compact_all(
212 &self,
213 target_size: usize,
214 zstd_level: u8,
215 producer_watermark: CommitVersion,
216 ) -> CdcStorageResult<Vec<CompactBlockSummary>> {
217 let mut out = Vec::new();
218 while let Some(s) = self.compact_oldest_inner(target_size, 0, false, zstd_level, producer_watermark)? {
219 out.push(s);
220 }
221 if let Some(tail) = self.compact_oldest_inner(target_size, 0, true, zstd_level, producer_watermark)? {
222 out.push(tail);
223 }
224 Ok(out)
225 }
226
227 fn compact_oldest_inner(
228 &self,
229 target_size: usize,
230 safety_lag: u64,
231 allow_partial: bool,
232 zstd_level: u8,
233 producer_watermark: CommitVersion,
234 ) -> CdcStorageResult<Option<CompactBlockSummary>> {
235 if target_size == 0 {
236 return Ok(None);
237 }
238 self.inner.last_zstd_level.store(zstd_level, Ordering::Relaxed);
239
240 let Some((entries, version_blobs)) =
241 self.select_oldest_eligible(target_size, safety_lag, allow_partial, producer_watermark)?
242 else {
243 return Ok(None);
244 };
245
246 let payload = block::encode(&entries, zstd_level)?;
247 let compressed_bytes = payload.len();
248 let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(&entries);
249 let min_version = entries.first().unwrap().version;
250 let max_version = entries.last().unwrap().version;
251
252 let committed = self.commit_block_swap(
253 &version_blobs,
254 &payload,
255 min_version,
256 max_version,
257 min_ts_nanos,
258 max_ts_nanos,
259 entries.len(),
260 )?;
261 if !committed {
262 return Ok(None);
263 }
264 Ok(Some(build_block_summary(&entries, min_version, max_version, compressed_bytes)))
265 }
266
267 fn select_oldest_eligible(
274 &self,
275 target_size: usize,
276 safety_lag: u64,
277 allow_partial: bool,
278 producer_watermark: CommitVersion,
279 ) -> CdcStorageResult<Option<CompactionCandidates>> {
280 let conn = self.inner.conn.lock();
281 let Some(max_v) = query_max_live_version(&conn)? else {
282 return Ok(None);
283 };
284 let Some(eligible_max) = compute_eligible_max(max_v, safety_lag, producer_watermark) else {
285 return Ok(None);
286 };
287 let (entries, version_blobs) = query_oldest_candidates(&conn, eligible_max, target_size)?;
288 if entries.is_empty() {
289 return Ok(None);
290 }
291 if !allow_partial && entries.len() < target_size {
292 return Ok(None);
293 }
294 Ok(Some((entries, version_blobs)))
295 }
296
297 #[allow(clippy::too_many_arguments)]
304 fn commit_block_swap(
305 &self,
306 version_blobs: &[Vec<u8>],
307 payload: &[u8],
308 min_version: CommitVersion,
309 max_version: CommitVersion,
310 min_ts_nanos: i64,
311 max_ts_nanos: i64,
312 num_entries: usize,
313 ) -> CdcStorageResult<bool> {
314 let conn = self.inner.conn.lock();
315 let tx = conn
316 .unchecked_transaction()
317 .map_err(|e| CdcError::Internal(format!("compact tx begin: {e}")))?;
318
319 if !delete_compacted_versions(&tx, version_blobs, num_entries)? {
320 tx.rollback().map_err(|e| CdcError::Internal(format!("compact rollback: {e}")))?;
321 return Ok(false);
322 }
323 insert_compacted_block(
324 &tx,
325 payload,
326 min_version,
327 max_version,
328 min_ts_nanos,
329 max_ts_nanos,
330 num_entries,
331 )?;
332 tx.commit().map_err(|e| CdcError::Internal(format!("compact commit: {e}")))?;
333 Ok(true)
334 }
335
336 #[inline]
343 fn snapshot_block_and_live(
344 &self,
345 lo_inc: CommitVersion,
346 hi_inc: CommitVersion,
347 batch_size: u64,
348 ) -> CdcStorageResult<RangeSnapshot> {
349 let lo_b = version_to_bytes(lo_inc);
350 let hi_b = version_to_bytes(hi_inc);
351 let limit = (batch_size as i64).saturating_add(1);
352 let conn = self.inner.conn.lock();
353
354 let block_rows = read_block_index_rows(&conn, &lo_b, &hi_b)?;
355 let live_payloads = read_live_payloads(&conn, &lo_b, &hi_b, limit)?;
356 Ok((block_rows, live_payloads))
357 }
358
359 #[inline]
360 fn decode_block_rows(
361 &self,
362 block_rows: Vec<(Vec<u8>, Vec<u8>)>,
363 lo_inc: CommitVersion,
364 hi_inc: CommitVersion,
365 ) -> CdcStorageResult<Vec<Cdc>> {
366 let mut block_items: Vec<Cdc> = Vec::new();
367 for (max_bytes, payload) in block_rows {
368 let block_max = bytes_to_version(&max_bytes)?;
369 let entries = self.load_block_cached(block_max, &payload)?;
370 for cdc in entries.iter() {
371 if cdc.version >= lo_inc && cdc.version <= hi_inc {
372 block_items.push(cdc.clone());
373 }
374 }
375 }
376 Ok(block_items)
377 }
378
379 #[inline]
380 fn scan_full_blocks_below(
381 &self,
382 conn: &Connection,
383 version_bytes: &[u8; 8],
384 ) -> CdcStorageResult<FullBlockScan> {
385 let mut stmt = conn
386 .prepare(
387 r#"SELECT max_version, payload FROM "cdc_block"
388 WHERE max_version < ?1 ORDER BY max_version ASC"#,
389 )
390 .map_err(|e| CdcError::Internal(format!("drop blocks prepare: {e}")))?;
391 let rows = stmt
392 .query_map(params![version_bytes.as_slice()], |row| {
393 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
394 })
395 .map_err(|e| CdcError::Internal(format!("drop blocks rows: {e}")))?;
396 let mut entries = Vec::new();
397 let mut pks = Vec::new();
398 let mut cdc_count = 0;
399 for row in rows {
400 let (max_bytes, payload) =
401 row.map_err(|e| CdcError::Internal(format!("drop blocks row: {e}")))?;
402 let block_max = bytes_to_version(&max_bytes)?;
403 for cdc in &block::decode(&payload)? {
404 cdc_count += 1;
405 extend_dropped_entries(&mut entries, &cdc.system_changes);
406 }
407 self.inner.block_cache.remove(block_max);
408 pks.push(max_bytes);
409 }
410 Ok(FullBlockScan {
411 cdc_count,
412 entries,
413 pks,
414 })
415 }
416
417 #[inline]
418 fn scan_straddle_blocks(
419 &self,
420 conn: &Connection,
421 version: CommitVersion,
422 version_bytes: &[u8; 8],
423 ) -> CdcStorageResult<StraddleScan> {
424 let mut stmt = conn
425 .prepare(
426 r#"SELECT max_version, payload FROM "cdc_block"
427 WHERE min_version < ?1 AND max_version >= ?1
428 ORDER BY max_version ASC"#,
429 )
430 .map_err(|e| CdcError::Internal(format!("drop straddle prepare: {e}")))?;
431 let rows = stmt
432 .query_map(params![version_bytes.as_slice()], |row| {
433 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
434 })
435 .map_err(|e| CdcError::Internal(format!("drop straddle rows: {e}")))?;
436 let mut entries = Vec::new();
437 let mut actions = Vec::new();
438 let mut cdc_count = 0;
439 for row in rows {
440 let (max_bytes, payload) =
441 row.map_err(|e| CdcError::Internal(format!("drop straddle row: {e}")))?;
442 let block_max = bytes_to_version(&max_bytes)?;
443 let decoded = block::decode(&payload)?;
444 let mut survivors: Vec<Cdc> = Vec::with_capacity(decoded.len());
445 for cdc in decoded {
446 if cdc.version < version {
447 cdc_count += 1;
448 extend_dropped_entries(&mut entries, &cdc.system_changes);
449 } else {
450 survivors.push(cdc);
451 }
452 }
453 self.inner.block_cache.remove(block_max);
454 let outcome = if survivors.is_empty() {
455 BlockOutcome::Delete
456 } else {
457 BlockOutcome::Rewrite {
458 survivors,
459 }
460 };
461 actions.push((max_bytes, outcome));
462 }
463 Ok(StraddleScan {
464 cdc_count,
465 entries,
466 actions,
467 })
468 }
469}
470
471fn open_connection(config: &SqliteConfig) -> Connection {
472 let db_path = resolve_db_path(config.path.clone(), "cdc.db");
473 let flags = convert_flags(&config.flags);
474 let conn = connect(&db_path, flags).expect("Failed to connect to CDC SQLite database");
475 pragma::apply(&conn, config).expect("Failed to configure CDC SQLite pragmas");
476 SqliteCdcStorage::ensure_schema(&conn);
477 conn
478}
479
480fn create_cdc_table(conn: &Connection) {
481 conn.execute(
482 r#"CREATE TABLE IF NOT EXISTS "cdc" (
483 version BLOB PRIMARY KEY,
484 payload BLOB NOT NULL
485 ) WITHOUT ROWID"#,
486 [],
487 )
488 .expect("Failed to create cdc table");
489}
490
491fn create_cdc_block_table(conn: &Connection) {
492 conn.execute(
493 r#"CREATE TABLE IF NOT EXISTS "cdc_block" (
494 max_version BLOB PRIMARY KEY,
495 min_version BLOB NOT NULL,
496 min_timestamp INTEGER NOT NULL,
497 max_timestamp INTEGER NOT NULL,
498 num_entries INTEGER NOT NULL,
499 payload BLOB NOT NULL
500 ) WITHOUT ROWID"#,
501 [],
502 )
503 .expect("Failed to create cdc_block table");
504}
505
506fn create_block_timestamp_index(conn: &Connection) {
507 conn.execute(
508 r#"CREATE INDEX IF NOT EXISTS "cdc_block_max_ts_idx"
509 ON "cdc_block"(max_timestamp)"#,
510 [],
511 )
512 .expect("Failed to create cdc_block_max_ts index");
513}
514
515#[inline]
516fn lower_bind_clause(start: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
517 match start {
518 Bound::Included(v) => (" AND version >= ?", Some(version_to_bytes(v))),
519 Bound::Excluded(v) => (" AND version > ?", Some(version_to_bytes(v))),
520 Bound::Unbounded => ("", None),
521 }
522}
523
524#[inline]
525fn upper_bind_clause(end: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
526 match end {
527 Bound::Included(v) => (" AND version <= ?", Some(version_to_bytes(v))),
528 Bound::Excluded(v) => (" AND version < ?", Some(version_to_bytes(v))),
529 Bound::Unbounded => ("", None),
530 }
531}
532
533#[inline]
534fn build_range_params(lower_bytes: Option<[u8; 8]>, upper_bytes: Option<[u8; 8]>, limit: i64) -> Vec<SqlValue> {
535 let mut values: Vec<SqlValue> = Vec::new();
536 if let Some(b) = lower_bytes {
537 values.push(SqlValue::Blob(b.to_vec()));
538 }
539 if let Some(b) = upper_bytes {
540 values.push(SqlValue::Blob(b.to_vec()));
541 }
542 values.push(SqlValue::Integer(limit));
543 values
544}
545
546#[inline]
547fn decode_payload_rows<I>(rows: I, batch_size: usize) -> CdcStorageResult<(Vec<Cdc>, bool)>
548where
549 I: IntoIterator<Item = RusqliteResult<Vec<u8>>>,
550{
551 let mut items: Vec<Cdc> = Vec::new();
552 for row in rows {
553 let bytes = row.map_err(|e| CdcError::Internal(format!("range row: {e}")))?;
554 let cdc: Cdc =
555 from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode range: {e}")))?;
556 items.push(cdc);
557 }
558 let has_more = items.len() > batch_size;
559 if has_more {
560 items.truncate(batch_size);
561 }
562 Ok((items, has_more))
563}
564
565#[inline]
566fn query_max_live_version(conn: &Connection) -> CdcStorageResult<Option<u64>> {
567 let max_live: Option<Vec<u8>> = conn
568 .query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
569 .ok()
570 .flatten();
571 max_live.map(|b| bytes_to_version(&b).map(|v| v.0)).transpose()
572}
573
574#[inline]
583fn compute_eligible_max(max_v: u64, safety_lag: u64, producer_watermark: CommitVersion) -> Option<CommitVersion> {
584 if max_v < safety_lag {
585 return None;
586 }
587 let safety_capped = max_v.saturating_sub(safety_lag);
588 Some(CommitVersion(safety_capped.min(producer_watermark.0)))
589}
590
591#[inline]
592fn query_oldest_candidates(
593 conn: &Connection,
594 eligible_max: CommitVersion,
595 target_size: usize,
596) -> CdcStorageResult<(Vec<Cdc>, Vec<Vec<u8>>)> {
597 let eligible_max_bytes = version_to_bytes(eligible_max);
598 let mut stmt = conn
599 .prepare(
600 r#"SELECT version, payload FROM "cdc"
601 WHERE version <= ?1 ORDER BY version ASC LIMIT ?2"#,
602 )
603 .map_err(|e| CdcError::Internal(format!("compact prepare: {e}")))?;
604 let limit = (target_size as i64).saturating_add(1);
605 let rows = stmt
606 .query_map(params![eligible_max_bytes.as_slice(), limit], |row| {
607 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
608 })
609 .map_err(|e| CdcError::Internal(format!("compact rows: {e}")))?;
610
611 let mut version_blobs: Vec<Vec<u8>> = Vec::with_capacity(target_size);
612 let mut entries: Vec<Cdc> = Vec::with_capacity(target_size);
613 for row in rows {
614 if entries.len() == target_size {
615 break;
616 }
617 let (vb, pb) = row.map_err(|e| CdcError::Internal(format!("compact row: {e}")))?;
618 let cdc: Cdc =
619 from_bytes(&pb).map_err(|e| CdcError::Codec(format!("postcard decode in compact: {e}")))?;
620 version_blobs.push(vb);
621 entries.push(cdc);
622 }
623 Ok((entries, version_blobs))
624}
625
626#[inline]
627fn build_block_summary(
628 entries: &[Cdc],
629 min_version: CommitVersion,
630 max_version: CommitVersion,
631 compressed_bytes: usize,
632) -> CompactBlockSummary {
633 CompactBlockSummary {
634 min_version,
635 max_version,
636 num_entries: entries.len(),
637 compressed_bytes,
638 }
639}
640
641#[inline]
642fn query_min_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
643 let r: Option<Vec<u8>> = conn
644 .query_row(r#"SELECT MIN(min_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
645 .ok()
646 .flatten();
647 r.map(|b| bytes_to_version(&b)).transpose()
648}
649
650#[inline]
651fn query_min_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
652 let r: Option<Vec<u8>> = conn
653 .query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
654 .ok()
655 .flatten();
656 r.map(|b| bytes_to_version(&b)).transpose()
657}
658
659#[inline]
660fn query_max_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
661 let r: Option<Vec<u8>> = conn
662 .query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
663 .ok()
664 .flatten();
665 r.map(|b| bytes_to_version(&b)).transpose()
666}
667
668#[inline]
669fn query_max_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
670 let r: Option<Vec<u8>> = conn
671 .query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
672 .ok()
673 .flatten();
674 r.map(|b| bytes_to_version(&b)).transpose()
675}
676
677fn version_to_bytes(v: CommitVersion) -> [u8; 8] {
678 v.0.to_be_bytes()
679}
680
681fn bytes_to_version(bytes: &[u8]) -> CdcStorageResult<CommitVersion> {
682 let arr: [u8; 8] = bytes.try_into().map_err(|_| CdcError::Internal("bad version bytes".to_string()))?;
683 Ok(CommitVersion(u64::from_be_bytes(arr)))
684}
685
686fn datetime_to_nanos(dt: &DateTime) -> i64 {
687 dt.to_nanos() as i64
688}
689
690fn summarize_timestamps(entries: &[Cdc]) -> (i64, i64) {
694 entries.iter().fold((i64::MAX, i64::MIN), |(lo, hi), c| {
695 let n = datetime_to_nanos(&c.timestamp);
696 (lo.min(n), hi.max(n))
697 })
698}
699
700#[inline]
701fn read_block_index_rows(
702 conn: &Connection,
703 lo_b: &[u8; 8],
704 hi_b: &[u8; 8],
705) -> CdcStorageResult<Vec<(Vec<u8>, Vec<u8>)>> {
706 let mut stmt = conn
707 .prepare(
708 r#"SELECT max_version, payload FROM "cdc_block"
709 WHERE max_version >= ?1 AND min_version <= ?2
710 ORDER BY max_version ASC"#,
711 )
712 .map_err(|e| CdcError::Internal(format!("range blocks prepare: {e}")))?;
713 let rows = stmt
714 .query_map(params![lo_b.as_slice(), hi_b.as_slice()], |row| {
715 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
716 })
717 .map_err(|e| CdcError::Internal(format!("range blocks rows: {e}")))?;
718 let mut out = Vec::new();
719 for r in rows {
720 out.push(r.map_err(|e| CdcError::Internal(format!("range blocks row: {e}")))?);
721 }
722 Ok(out)
723}
724
725#[inline]
726fn read_live_payloads(conn: &Connection, lo_b: &[u8; 8], hi_b: &[u8; 8], limit: i64) -> CdcStorageResult<Vec<Vec<u8>>> {
727 let mut stmt = conn
728 .prepare(
729 r#"SELECT payload FROM "cdc"
730 WHERE version >= ?1 AND version <= ?2
731 ORDER BY version ASC LIMIT ?3"#,
732 )
733 .map_err(|e| CdcError::Internal(format!("range live prepare: {e}")))?;
734 let rows = stmt
735 .query_map(params![lo_b.as_slice(), hi_b.as_slice(), limit], |row| row.get::<_, Vec<u8>>(0))
736 .map_err(|e| CdcError::Internal(format!("range live rows: {e}")))?;
737 let mut out = Vec::new();
738 for r in rows {
739 out.push(r.map_err(|e| CdcError::Internal(format!("range live row: {e}")))?);
740 }
741 Ok(out)
742}
743
744#[inline]
745fn decode_live_payloads(payloads: Vec<Vec<u8>>) -> CdcStorageResult<Vec<Cdc>> {
746 let mut live_items = Vec::with_capacity(payloads.len());
747 for payload in payloads {
748 let cdc: Cdc = from_bytes(&payload)
749 .map_err(|e| CdcError::Codec(format!("postcard decode range live: {e}")))?;
750 live_items.push(cdc);
751 }
752 Ok(live_items)
753}
754
755#[inline]
756fn merge_block_and_live(block_items: Vec<Cdc>, live_items: Vec<Cdc>) -> Vec<Cdc> {
757 let mut merged: Vec<Cdc> = Vec::with_capacity(block_items.len() + live_items.len());
758 let (mut bi, mut li) = (0usize, 0usize);
759 while bi < block_items.len() && li < live_items.len() {
760 let bv = block_items[bi].version;
761 let lv = live_items[li].version;
762 if bv < lv {
763 merged.push(block_items[bi].clone());
764 bi += 1;
765 } else if bv > lv {
766 merged.push(live_items[li].clone());
767 li += 1;
768 } else {
769 merged.push(block_items[bi].clone());
772 bi += 1;
773 li += 1;
774 }
775 }
776 while bi < block_items.len() {
777 merged.push(block_items[bi].clone());
778 bi += 1;
779 }
780 while li < live_items.len() {
781 merged.push(live_items[li].clone());
782 li += 1;
783 }
784 merged
785}
786
787#[inline]
788fn scan_live_rows_below(conn: &Connection, version_bytes: &[u8; 8]) -> CdcStorageResult<LiveScan> {
789 let mut stmt = conn
790 .prepare(r#"SELECT payload FROM "cdc" WHERE version < ?1 ORDER BY version ASC"#)
791 .map_err(|e| CdcError::Internal(format!("drop_before prepare: {e}")))?;
792 let rows = stmt
793 .query_map(params![version_bytes.as_slice()], |row| row.get::<_, Vec<u8>>(0))
794 .map_err(|e| CdcError::Internal(format!("drop_before rows: {e}")))?;
795 let mut entries = Vec::new();
796 let mut cdc_count = 0;
797 for row in rows {
798 let bytes = row.map_err(|e| CdcError::Internal(format!("drop_before row: {e}")))?;
799 let cdc: Cdc =
800 from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode drop_before: {e}")))?;
801 cdc_count += 1;
802 extend_dropped_entries(&mut entries, &cdc.system_changes);
803 }
804 Ok(LiveScan {
805 cdc_count,
806 entries,
807 })
808}
809
810#[inline]
811fn apply_drop_before(
812 conn: &Connection,
813 full_block_pks: &[Vec<u8>],
814 straddle_actions: &[(Vec<u8>, BlockOutcome)],
815 version_bytes: &[u8; 8],
816 zstd_level: u8,
817) -> CdcStorageResult<()> {
818 let tx = conn.unchecked_transaction().map_err(|e| CdcError::Internal(format!("drop_before tx begin: {e}")))?;
819
820 for pk in full_block_pks {
821 tx.execute(r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#, params![pk.as_slice()])
822 .map_err(|e| CdcError::Internal(format!("drop block delete: {e}")))?;
823 }
824
825 for (max_bytes, action) in straddle_actions {
826 match action {
827 BlockOutcome::Delete => {
828 tx.execute(
829 r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#,
830 params![max_bytes.as_slice()],
831 )
832 .map_err(|e| CdcError::Internal(format!("drop straddle delete: {e}")))?;
833 }
834 BlockOutcome::Rewrite {
835 survivors,
836 } => {
837 rewrite_straddle_block(&tx, max_bytes, survivors, zstd_level)?;
838 }
839 }
840 }
841
842 tx.execute(r#"DELETE FROM "cdc" WHERE version < ?1"#, params![version_bytes.as_slice()])
843 .map_err(|e| CdcError::Internal(format!("drop_before delete: {e}")))?;
844 tx.commit().map_err(|e| CdcError::Internal(format!("drop_before commit: {e}")))?;
845 Ok(())
846}
847
848#[inline]
849fn rewrite_straddle_block(
850 tx: &Transaction<'_>,
851 max_bytes: &[u8],
852 survivors: &[Cdc],
853 zstd_level: u8,
854) -> CdcStorageResult<()> {
855 let new_min = survivors.first().unwrap().version;
856 let new_max = survivors.last().unwrap().version;
857 debug_assert_eq!(new_max, bytes_to_version(max_bytes)?, "max_version is the block PK and must be preserved");
858 let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(survivors);
859 let payload = block::encode(survivors, zstd_level)?;
860 tx.execute(
861 r#"INSERT OR REPLACE INTO "cdc_block"
862 (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
863 VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
864 params![
865 max_bytes,
866 version_to_bytes(new_min).as_slice(),
867 min_ts_nanos,
868 max_ts_nanos,
869 survivors.len() as i64,
870 payload.as_slice(),
871 ],
872 )
873 .map_err(|e| CdcError::Internal(format!("drop straddle rewrite: {e}")))?;
874 Ok(())
875}
876
877#[inline]
878fn extend_dropped_entries(out: &mut Vec<DroppedCdcEntry>, system_changes: &[SystemChange]) {
879 for sys_change in system_changes {
880 out.push(DroppedCdcEntry {
881 key: sys_change.key().clone(),
882 value_bytes: sys_change.value_bytes() as u64,
883 });
884 }
885}
886
887#[inline]
892fn delete_compacted_versions(
893 tx: &Transaction<'_>,
894 version_blobs: &[Vec<u8>],
895 expected_count: usize,
896) -> CdcStorageResult<bool> {
897 let placeholders = repeat_n("?", version_blobs.len()).collect::<Vec<_>>().join(",");
898 let del_sql = format!(r#"DELETE FROM "cdc" WHERE version IN ({})"#, placeholders);
899 let del_params: Vec<SqlValue> = version_blobs.iter().map(|b| SqlValue::Blob(b.clone())).collect();
900 let mut del_stmt =
901 tx.prepare(&del_sql).map_err(|e| CdcError::Internal(format!("compact delete prepare: {e}")))?;
902 let rows_deleted = del_stmt
903 .execute(params_from_iter(del_params.iter()))
904 .map_err(|e| CdcError::Internal(format!("compact delete execute: {e}")))?;
905 Ok(rows_deleted == expected_count)
906}
907
908#[inline]
909fn insert_compacted_block(
910 tx: &Transaction<'_>,
911 payload: &[u8],
912 min_version: CommitVersion,
913 max_version: CommitVersion,
914 min_ts_nanos: i64,
915 max_ts_nanos: i64,
916 num_entries: usize,
917) -> CdcStorageResult<()> {
918 tx.execute(
919 r#"INSERT INTO "cdc_block"
920 (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
921 VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
922 params![
923 version_to_bytes(max_version).as_slice(),
924 version_to_bytes(min_version).as_slice(),
925 min_ts_nanos,
926 max_ts_nanos,
927 num_entries as i64,
928 payload,
929 ],
930 )
931 .map_err(|e| CdcError::Internal(format!("compact insert block: {e}")))?;
932 Ok(())
933}
934
935impl CdcStorage for SqliteCdcStorage {
936 fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
937 let bytes = to_stdvec(cdc).map_err(|e| CdcError::Codec(format!("postcard encode: {e}")))?;
938 let conn = self.inner.conn.lock();
939 conn.execute(
940 r#"INSERT OR REPLACE INTO "cdc" (version, payload) VALUES (?1, ?2)"#,
941 params![version_to_bytes(cdc.version).as_slice(), bytes.as_slice()],
942 )
943 .map_err(|e| CdcError::Internal(format!("insert cdc: {e}")))?;
944 Ok(())
945 }
946
947 fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
948 if let Some(cdc) = self.read_live(version)? {
949 return Ok(Some(cdc));
950 }
951 self.read_from_blocks(version)
952 }
953
954 fn read_range(
955 &self,
956 start: Bound<CommitVersion>,
957 end: Bound<CommitVersion>,
958 batch_size: u64,
959 ) -> CdcStorageResult<CdcBatch> {
960 let Some((lo_inc, hi_inc)) = normalize_range_inclusive(start, end) else {
961 return Ok(CdcBatch {
962 items: Vec::new(),
963 has_more: false,
964 });
965 };
966 let want = batch_size as usize;
967
968 let (block_rows, live_payloads) = self.snapshot_block_and_live(lo_inc, hi_inc, batch_size)?;
969 let block_items = self.decode_block_rows(block_rows, lo_inc, hi_inc)?;
970 let live_items = decode_live_payloads(live_payloads)?;
971 let mut merged = merge_block_and_live(block_items, live_items);
972
973 let has_more = merged.len() > want;
974 merged.truncate(want);
975 Ok(CdcBatch {
976 items: merged,
977 has_more,
978 })
979 }
980
981 fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
982 Ok(self.read(version)?.map(|c| c.system_changes.len()).unwrap_or(0))
983 }
984
985 fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
986 let conn = self.inner.conn.lock();
987 let block_min = query_min_block(&conn)?;
988 let live_min = query_min_live(&conn)?;
989 Ok([block_min, live_min].into_iter().flatten().min())
990 }
991
992 fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
993 let conn = self.inner.conn.lock();
994 if let Some(v) = query_max_live(&conn)? {
995 return Ok(Some(v));
996 }
997 query_max_block(&conn)
998 }
999
1000 fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
1001 let conn = self.inner.conn.lock();
1002 let version_bytes = version_to_bytes(version);
1003 let zstd_level = self.inner.last_zstd_level.load(Ordering::Relaxed);
1004
1005 let full_blocks = self.scan_full_blocks_below(&conn, &version_bytes)?;
1006 let straddle = self.scan_straddle_blocks(&conn, version, &version_bytes)?;
1007 let live = scan_live_rows_below(&conn, &version_bytes)?;
1008
1009 apply_drop_before(&conn, &full_blocks.pks, &straddle.actions, &version_bytes, zstd_level)?;
1010 let _ = conn.execute("PRAGMA incremental_vacuum", []);
1011
1012 let mut entries = full_blocks.entries;
1013 entries.extend(straddle.entries);
1014 entries.extend(live.entries);
1015 Ok(DropBeforeResult {
1016 count: full_blocks.cdc_count + straddle.cdc_count + live.cdc_count,
1017 entries,
1018 })
1019 }
1020
1021 fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
1022 let cutoff_nanos = datetime_to_nanos(&cutoff);
1023 if let Some(v) = self.try_block_index_cutoff(cutoff_nanos)? {
1024 return Ok(Some(v));
1025 }
1026 let Some(start) = self.pick_scan_start()? else {
1027 return self.max_version_blocks().map(|opt| opt.map(|v| CommitVersion(v.0.saturating_add(1))));
1028 };
1029 self.scan_live_for_cutoff(start, cutoff_nanos)
1030 }
1031}
1032
1033impl SqliteCdcStorage {
1034 #[inline]
1038 fn try_block_index_cutoff(&self, cutoff_nanos: i64) -> CdcStorageResult<Option<CommitVersion>> {
1039 let block_hit: Option<Vec<u8>> = {
1040 let conn = self.inner.conn.lock();
1041 conn.query_row(
1042 r#"SELECT min_version FROM "cdc_block"
1043 WHERE max_timestamp >= ?1 ORDER BY max_timestamp ASC LIMIT 1"#,
1044 params![cutoff_nanos],
1045 |row| row.get::<_, Vec<u8>>(0),
1046 )
1047 .ok()
1048 };
1049 block_hit.map(|b| bytes_to_version(&b)).transpose()
1050 }
1051
1052 #[inline]
1055 fn pick_scan_start(&self) -> CdcStorageResult<Option<CommitVersion>> {
1056 self.min_version_live()
1057 }
1058
1059 #[inline]
1060 fn scan_live_for_cutoff(
1061 &self,
1062 start: CommitVersion,
1063 cutoff_nanos: i64,
1064 ) -> CdcStorageResult<Option<CommitVersion>> {
1065 let mut next_start = Bound::Included(start);
1066 loop {
1067 let batch = self.read_range_live(next_start, Bound::Unbounded, 256)?;
1068 if batch.items.is_empty() {
1069 let last = self.max_version()?.unwrap_or(CommitVersion(0));
1070 return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1071 }
1072 for cdc in &batch.items {
1073 if datetime_to_nanos(&cdc.timestamp) >= cutoff_nanos {
1074 return Ok(Some(cdc.version));
1075 }
1076 }
1077 if !batch.has_more {
1078 let last = batch.items.last().unwrap().version;
1079 return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1080 }
1081 next_start = Bound::Excluded(batch.items.last().unwrap().version);
1082 }
1083 }
1084
1085 #[inline]
1086 fn read_live(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
1087 let conn = self.inner.conn.lock();
1088 let result = conn.query_row(
1089 r#"SELECT payload FROM "cdc" WHERE version = ?1"#,
1090 params![version_to_bytes(version).as_slice()],
1091 |row| row.get::<_, Vec<u8>>(0),
1092 );
1093 match result {
1094 Ok(bytes) => {
1095 let cdc: Cdc = from_bytes(&bytes)
1096 .map_err(|e| CdcError::Codec(format!("postcard decode: {e}")))?;
1097 Ok(Some(cdc))
1098 }
1099 Err(QueryReturnedNoRows) => Ok(None),
1100 Err(e) => Err(CdcError::Internal(format!("read cdc: {e}"))),
1101 }
1102 }
1103}