Skip to main content

reifydb_cdc/storage/sqlite/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::Bound,
6	iter::repeat_n,
7	sync::{
8		Arc,
9		atomic::{AtomicU8, Ordering},
10	},
11};
12
13use postcard::{from_bytes, to_stdvec};
14use reifydb_core::{
15	common::CommitVersion,
16	interface::cdc::{Cdc, CdcBatch, SystemChange},
17};
18use reifydb_runtime::sync::mutex::Mutex;
19use reifydb_sqlite::{
20	SqliteConfig,
21	connection::{connect, convert_flags, resolve_db_path},
22	pragma,
23};
24use reifydb_type::value::datetime::DateTime;
25use rusqlite::{
26	Connection, Error::QueryReturnedNoRows, Result as RusqliteResult, Transaction, params, params_from_iter,
27	types::Value as SqlValue,
28};
29
30use crate::{
31	compact::{block, block::CompactBlockSummary, cache::BlockCache},
32	error::CdcError,
33	storage::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry, normalize_range_inclusive},
34};
35
/// SQLite-backed CDC storage handle.
///
/// All state lives behind an `Arc`, so cloning the handle yields another
/// reference to the same connection, block cache and settings.
36#[derive(Clone)]
37pub struct SqliteCdcStorage {
	// Shared state; see `Inner`.
38	inner: Arc<Inner>,
39}
40
/// State shared by every clone of `SqliteCdcStorage`.
41struct Inner {
	// The single SQLite connection; every query in this file locks it first.
42	conn: Mutex<Connection>,
	// Cache of decoded blocks, keyed by the block's max version.
43	block_cache: BlockCache,
	// zstd level passed to the most recent compaction call (starts at 3);
	// `drop_before` reads it when re-encoding a partially dropped block.
44	last_zstd_level: AtomicU8,
45}
46
47/// `(decoded entries, raw version blobs)` returned from `select_oldest_eligible`.
48type CompactionCandidates = (Vec<Cdc>, Vec<Vec<u8>>);
49
50/// `(block index rows: (max_version_blob, payload), live row payloads)` returned
51/// from `snapshot_block_and_live`. Block rows carry the PK so the caller can key
52/// the cache; live rows are bare payloads.
53type RangeSnapshot = (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>);
54
/// Result of `scan_full_blocks_below`: blocks whose `max_version` is below the
/// drop cutoff and can therefore be deleted outright.
55struct FullBlockScan {
	// Number of CDC entries decoded from those blocks.
56	cdc_count: usize,
	// One `DroppedCdcEntry` per system change contained in those entries.
57	entries: Vec<DroppedCdcEntry>,
	// `max_version` blobs (the `cdc_block` primary keys) of the blocks to delete.
58	pks: Vec<Vec<u8>>,
59}
60
/// Result of `scan_straddle_blocks`: blocks that start below the drop cutoff
/// but end at or above it.
61struct StraddleScan {
	// Number of CDC entries inside those blocks that fall below the cutoff.
62	cdc_count: usize,
	// One `DroppedCdcEntry` per system change of the entries being dropped.
63	entries: Vec<DroppedCdcEntry>,
	// Per block: its `max_version` blob (primary key) and what to do with it.
64	actions: Vec<(Vec<u8>, BlockOutcome)>,
65}
66
/// Result of `scan_live_rows_below`: uncompacted `cdc` rows below the drop cutoff.
67struct LiveScan {
	// Number of live CDC rows below the cutoff.
68	cdc_count: usize,
	// One `DroppedCdcEntry` per system change contained in those rows.
69	entries: Vec<DroppedCdcEntry>,
70}
71
/// What `drop_before` does with a block that straddles the cutoff.
72enum BlockOutcome {
	// No entry survives the cutoff: the block row is deleted.
73	Delete,
	// Some entries survive: the block is re-encoded with only these entries.
74	Rewrite {
75		survivors: Vec<Cdc>,
76	},
77}
78
79impl SqliteCdcStorage {
80	pub fn new(config: SqliteConfig) -> Self {
81		Self::new_with_cache_capacity(config, BlockCache::DEFAULT_CAPACITY)
82	}
83
84	pub fn new_with_cache_capacity(config: SqliteConfig, cache_capacity: usize) -> Self {
85		let conn = open_connection(&config);
86		Self {
87			inner: Arc::new(Inner {
88				conn: Mutex::new(conn),
89				block_cache: BlockCache::new(cache_capacity),
90				last_zstd_level: AtomicU8::new(3),
91			}),
92		}
93	}
94
95	pub fn in_memory() -> Self {
96		Self::new(SqliteConfig::in_memory())
97	}
98
99	fn ensure_schema(conn: &Connection) {
100		create_cdc_table(conn);
101		create_cdc_block_table(conn);
102		create_block_timestamp_index(conn);
103	}
104
105	pub fn incremental_vacuum(&self) {
106		let _ = pragma::incremental_vacuum(&self.inner.conn.lock());
107	}
108
109	pub fn shrink_memory(&self) {
110		let _ = pragma::shrink_memory(&self.inner.conn.lock());
111	}
112
113	pub fn shutdown(&self) {
114		let _ = pragma::shutdown(&self.inner.conn.lock());
115	}
116
117	fn read_from_blocks(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
118		let v_bytes = version_to_bytes(version);
119		let Some((max_bytes, payload)) = self.find_block_for_version(&v_bytes)? else {
120			return Ok(None);
121		};
122		let block_max = bytes_to_version(&max_bytes)?;
123		let entries = self.load_block_cached(block_max, &payload)?;
124		Ok(entries.iter().find(|c| c.version == version).cloned())
125	}
126
127	#[inline]
128	fn find_block_for_version(&self, v_bytes: &[u8; 8]) -> CdcStorageResult<Option<(Vec<u8>, Vec<u8>)>> {
129		let conn = self.inner.conn.lock();
130		conn.query_row(
131			r#"SELECT max_version, payload FROM "cdc_block"
132			   WHERE max_version >= ?1 AND min_version <= ?1
133			   ORDER BY max_version ASC LIMIT 1"#,
134			params![v_bytes.as_slice()],
135			|row| Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?)),
136		)
137		.map(Some)
138		.or_else(|e| match e {
139			QueryReturnedNoRows => Ok(None),
140			e => Err(CdcError::Internal(format!("read_from_blocks: {e}"))),
141		})
142	}
143
144	fn load_block_cached(&self, block_max: CommitVersion, payload: &[u8]) -> CdcStorageResult<Arc<Vec<Cdc>>> {
145		if let Some(hit) = self.inner.block_cache.get(block_max) {
146			return Ok(hit);
147		}
148		let entries = block::decode(payload)?;
149		let arc = Arc::new(entries);
150		self.inner.block_cache.put(block_max, arc.clone());
151		Ok(arc)
152	}
153
154	fn read_range_live(
155		&self,
156		start: Bound<CommitVersion>,
157		end: Bound<CommitVersion>,
158		batch_size: u64,
159	) -> CdcStorageResult<CdcBatch> {
160		let (lower_sql, lower_bytes) = lower_bind_clause(start);
161		let (upper_sql, upper_bytes) = upper_bind_clause(end);
162		let sql = format!(
163			r#"SELECT payload FROM "cdc" WHERE 1=1{lower_sql}{upper_sql} ORDER BY version ASC LIMIT ?"#
164		);
165		let limit = (batch_size as i64).saturating_add(1);
166
167		let conn = self.inner.conn.lock();
168		let mut stmt = conn.prepare(&sql).map_err(|e| CdcError::Internal(format!("range prepare: {e}")))?;
169		let values = build_range_params(lower_bytes, upper_bytes, limit);
170		let rows = stmt
171			.query_map(params_from_iter(values.iter()), |row| row.get::<_, Vec<u8>>(0))
172			.map_err(|e| CdcError::Internal(format!("range rows: {e}")))?;
173
174		let (items, has_more) = decode_payload_rows(rows, batch_size as usize)?;
175		Ok(CdcBatch {
176			items,
177			has_more,
178		})
179	}
180
181	fn min_version_live(&self) -> CdcStorageResult<Option<CommitVersion>> {
182		let conn = self.inner.conn.lock();
183		let r: Option<Vec<u8>> = conn
184			.query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
185			.ok()
186			.flatten();
187		r.map(|b| bytes_to_version(&b)).transpose()
188	}
189
190	fn max_version_blocks(&self) -> CdcStorageResult<Option<CommitVersion>> {
191		let conn = self.inner.conn.lock();
192		let r: Option<Vec<u8>> = conn
193			.query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| {
194				row.get::<_, Option<Vec<u8>>>(0)
195			})
196			.ok()
197			.flatten();
198		r.map(|b| bytes_to_version(&b)).transpose()
199	}
200
201	pub fn compact_oldest(
202		&self,
203		target_size: usize,
204		safety_lag: u64,
205		zstd_level: u8,
206		producer_watermark: CommitVersion,
207	) -> CdcStorageResult<Option<CompactBlockSummary>> {
208		self.compact_oldest_inner(target_size, safety_lag, false, zstd_level, producer_watermark)
209	}
210
211	pub fn compact_all(
212		&self,
213		target_size: usize,
214		zstd_level: u8,
215		producer_watermark: CommitVersion,
216	) -> CdcStorageResult<Vec<CompactBlockSummary>> {
217		let mut out = Vec::new();
218		while let Some(s) = self.compact_oldest_inner(target_size, 0, false, zstd_level, producer_watermark)? {
219			out.push(s);
220		}
221		if let Some(tail) = self.compact_oldest_inner(target_size, 0, true, zstd_level, producer_watermark)? {
222			out.push(tail);
223		}
224		Ok(out)
225	}
226
227	fn compact_oldest_inner(
228		&self,
229		target_size: usize,
230		safety_lag: u64,
231		allow_partial: bool,
232		zstd_level: u8,
233		producer_watermark: CommitVersion,
234	) -> CdcStorageResult<Option<CompactBlockSummary>> {
235		if target_size == 0 {
236			return Ok(None);
237		}
238		self.inner.last_zstd_level.store(zstd_level, Ordering::Relaxed);
239
240		let Some((entries, version_blobs)) =
241			self.select_oldest_eligible(target_size, safety_lag, allow_partial, producer_watermark)?
242		else {
243			return Ok(None);
244		};
245
246		let payload = block::encode(&entries, zstd_level)?;
247		let compressed_bytes = payload.len();
248		let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(&entries);
249		let min_version = entries.first().unwrap().version;
250		let max_version = entries.last().unwrap().version;
251
252		let committed = self.commit_block_swap(
253			&version_blobs,
254			&payload,
255			min_version,
256			max_version,
257			min_ts_nanos,
258			max_ts_nanos,
259			entries.len(),
260		)?;
261		if !committed {
262			return Ok(None);
263		}
264		Ok(Some(build_block_summary(&entries, min_version, max_version, compressed_bytes)))
265	}
266
267	/// Phase A: short-lived read under the connection mutex. No txn.
268	///
269	/// Returns `Some((entries, version_blobs))` if a viable batch exists,
270	/// or `None` when there is nothing to compact (no live entries, max
271	/// below safety_lag, or fewer than target_size eligible rows when
272	/// partial blocks are not allowed).
273	fn select_oldest_eligible(
274		&self,
275		target_size: usize,
276		safety_lag: u64,
277		allow_partial: bool,
278		producer_watermark: CommitVersion,
279	) -> CdcStorageResult<Option<CompactionCandidates>> {
280		let conn = self.inner.conn.lock();
281		let Some(max_v) = query_max_live_version(&conn)? else {
282			return Ok(None);
283		};
284		let Some(eligible_max) = compute_eligible_max(max_v, safety_lag, producer_watermark) else {
285			return Ok(None);
286		};
287		let (entries, version_blobs) = query_oldest_candidates(&conn, eligible_max, target_size)?;
288		if entries.is_empty() {
289			return Ok(None);
290		}
291		if !allow_partial && entries.len() < target_size {
292			return Ok(None);
293		}
294		Ok(Some((entries, version_blobs)))
295	}
296
297	/// Phase C: short-lived commit under the connection mutex.
298	///
299	/// DELETE first so we can detect a concurrent `drop_before` via
300	/// rows_affected; rolls back and returns `Ok(false)` if the row count
301	/// mismatches (next tick retries on a fresh snapshot). Returns
302	/// `Ok(true)` after a successful swap.
303	#[allow(clippy::too_many_arguments)]
304	fn commit_block_swap(
305		&self,
306		version_blobs: &[Vec<u8>],
307		payload: &[u8],
308		min_version: CommitVersion,
309		max_version: CommitVersion,
310		min_ts_nanos: i64,
311		max_ts_nanos: i64,
312		num_entries: usize,
313	) -> CdcStorageResult<bool> {
314		let conn = self.inner.conn.lock();
315		let tx = conn
316			.unchecked_transaction()
317			.map_err(|e| CdcError::Internal(format!("compact tx begin: {e}")))?;
318
319		if !delete_compacted_versions(&tx, version_blobs, num_entries)? {
320			tx.rollback().map_err(|e| CdcError::Internal(format!("compact rollback: {e}")))?;
321			return Ok(false);
322		}
323		insert_compacted_block(
324			&tx,
325			payload,
326			min_version,
327			max_version,
328			min_ts_nanos,
329			max_ts_nanos,
330			num_entries,
331		)?;
332		tx.commit().map_err(|e| CdcError::Internal(format!("compact commit: {e}")))?;
333		Ok(true)
334	}
335
336	/// Snapshot both `cdc_block` and `cdc` under a single connection lock so the
337	/// two reads are consistent. Without this, a concurrent compactor can move
338	/// an entry from `cdc` into a new block between the two reads and the row
339	/// goes missing in the merged output: we miss it in the block read (block
340	/// didn't exist yet) and miss it in the live read (row already deleted from
341	/// cdc).
342	#[inline]
343	fn snapshot_block_and_live(
344		&self,
345		lo_inc: CommitVersion,
346		hi_inc: CommitVersion,
347		batch_size: u64,
348	) -> CdcStorageResult<RangeSnapshot> {
349		let lo_b = version_to_bytes(lo_inc);
350		let hi_b = version_to_bytes(hi_inc);
351		let limit = (batch_size as i64).saturating_add(1);
352		let conn = self.inner.conn.lock();
353
354		let block_rows = read_block_index_rows(&conn, &lo_b, &hi_b)?;
355		let live_payloads = read_live_payloads(&conn, &lo_b, &hi_b, limit)?;
356		Ok((block_rows, live_payloads))
357	}
358
359	#[inline]
360	fn decode_block_rows(
361		&self,
362		block_rows: Vec<(Vec<u8>, Vec<u8>)>,
363		lo_inc: CommitVersion,
364		hi_inc: CommitVersion,
365	) -> CdcStorageResult<Vec<Cdc>> {
366		let mut block_items: Vec<Cdc> = Vec::new();
367		for (max_bytes, payload) in block_rows {
368			let block_max = bytes_to_version(&max_bytes)?;
369			let entries = self.load_block_cached(block_max, &payload)?;
370			for cdc in entries.iter() {
371				if cdc.version >= lo_inc && cdc.version <= hi_inc {
372					block_items.push(cdc.clone());
373				}
374			}
375		}
376		Ok(block_items)
377	}
378
379	#[inline]
380	fn scan_full_blocks_below(
381		&self,
382		conn: &Connection,
383		version_bytes: &[u8; 8],
384	) -> CdcStorageResult<FullBlockScan> {
385		let mut stmt = conn
386			.prepare(
387				r#"SELECT max_version, payload FROM "cdc_block"
388				   WHERE max_version < ?1 ORDER BY max_version ASC"#,
389			)
390			.map_err(|e| CdcError::Internal(format!("drop blocks prepare: {e}")))?;
391		let rows = stmt
392			.query_map(params![version_bytes.as_slice()], |row| {
393				Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
394			})
395			.map_err(|e| CdcError::Internal(format!("drop blocks rows: {e}")))?;
396		let mut entries = Vec::new();
397		let mut pks = Vec::new();
398		let mut cdc_count = 0;
399		for row in rows {
400			let (max_bytes, payload) =
401				row.map_err(|e| CdcError::Internal(format!("drop blocks row: {e}")))?;
402			let block_max = bytes_to_version(&max_bytes)?;
403			for cdc in &block::decode(&payload)? {
404				cdc_count += 1;
405				extend_dropped_entries(&mut entries, &cdc.system_changes);
406			}
407			self.inner.block_cache.remove(block_max);
408			pks.push(max_bytes);
409		}
410		Ok(FullBlockScan {
411			cdc_count,
412			entries,
413			pks,
414		})
415	}
416
417	#[inline]
418	fn scan_straddle_blocks(
419		&self,
420		conn: &Connection,
421		version: CommitVersion,
422		version_bytes: &[u8; 8],
423	) -> CdcStorageResult<StraddleScan> {
424		let mut stmt = conn
425			.prepare(
426				r#"SELECT max_version, payload FROM "cdc_block"
427				   WHERE min_version < ?1 AND max_version >= ?1
428				   ORDER BY max_version ASC"#,
429			)
430			.map_err(|e| CdcError::Internal(format!("drop straddle prepare: {e}")))?;
431		let rows = stmt
432			.query_map(params![version_bytes.as_slice()], |row| {
433				Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
434			})
435			.map_err(|e| CdcError::Internal(format!("drop straddle rows: {e}")))?;
436		let mut entries = Vec::new();
437		let mut actions = Vec::new();
438		let mut cdc_count = 0;
439		for row in rows {
440			let (max_bytes, payload) =
441				row.map_err(|e| CdcError::Internal(format!("drop straddle row: {e}")))?;
442			let block_max = bytes_to_version(&max_bytes)?;
443			let decoded = block::decode(&payload)?;
444			let mut survivors: Vec<Cdc> = Vec::with_capacity(decoded.len());
445			for cdc in decoded {
446				if cdc.version < version {
447					cdc_count += 1;
448					extend_dropped_entries(&mut entries, &cdc.system_changes);
449				} else {
450					survivors.push(cdc);
451				}
452			}
453			self.inner.block_cache.remove(block_max);
454			let outcome = if survivors.is_empty() {
455				BlockOutcome::Delete
456			} else {
457				BlockOutcome::Rewrite {
458					survivors,
459				}
460			};
461			actions.push((max_bytes, outcome));
462		}
463		Ok(StraddleScan {
464			cdc_count,
465			entries,
466			actions,
467		})
468	}
469}
470
471fn open_connection(config: &SqliteConfig) -> Connection {
472	let db_path = resolve_db_path(config.path.clone(), "cdc.db");
473	let flags = convert_flags(&config.flags);
474	let conn = connect(&db_path, flags).expect("Failed to connect to CDC SQLite database");
475	pragma::apply(&conn, config).expect("Failed to configure CDC SQLite pragmas");
476	SqliteCdcStorage::ensure_schema(&conn);
477	conn
478}
479
480fn create_cdc_table(conn: &Connection) {
481	conn.execute(
482		r#"CREATE TABLE IF NOT EXISTS "cdc" (
483			version BLOB PRIMARY KEY,
484			payload BLOB NOT NULL
485		) WITHOUT ROWID"#,
486		[],
487	)
488	.expect("Failed to create cdc table");
489}
490
491fn create_cdc_block_table(conn: &Connection) {
492	conn.execute(
493		r#"CREATE TABLE IF NOT EXISTS "cdc_block" (
494			max_version BLOB PRIMARY KEY,
495			min_version BLOB NOT NULL,
496			min_timestamp INTEGER NOT NULL,
497			max_timestamp INTEGER NOT NULL,
498			num_entries INTEGER NOT NULL,
499			payload BLOB NOT NULL
500		) WITHOUT ROWID"#,
501		[],
502	)
503	.expect("Failed to create cdc_block table");
504}
505
506fn create_block_timestamp_index(conn: &Connection) {
507	conn.execute(
508		r#"CREATE INDEX IF NOT EXISTS "cdc_block_max_ts_idx"
509		   ON "cdc_block"(max_timestamp)"#,
510		[],
511	)
512	.expect("Failed to create cdc_block_max_ts index");
513}
514
515#[inline]
516fn lower_bind_clause(start: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
517	match start {
518		Bound::Included(v) => (" AND version >= ?", Some(version_to_bytes(v))),
519		Bound::Excluded(v) => (" AND version > ?", Some(version_to_bytes(v))),
520		Bound::Unbounded => ("", None),
521	}
522}
523
524#[inline]
525fn upper_bind_clause(end: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
526	match end {
527		Bound::Included(v) => (" AND version <= ?", Some(version_to_bytes(v))),
528		Bound::Excluded(v) => (" AND version < ?", Some(version_to_bytes(v))),
529		Bound::Unbounded => ("", None),
530	}
531}
532
533#[inline]
534fn build_range_params(lower_bytes: Option<[u8; 8]>, upper_bytes: Option<[u8; 8]>, limit: i64) -> Vec<SqlValue> {
535	let mut values: Vec<SqlValue> = Vec::new();
536	if let Some(b) = lower_bytes {
537		values.push(SqlValue::Blob(b.to_vec()));
538	}
539	if let Some(b) = upper_bytes {
540		values.push(SqlValue::Blob(b.to_vec()));
541	}
542	values.push(SqlValue::Integer(limit));
543	values
544}
545
546#[inline]
547fn decode_payload_rows<I>(rows: I, batch_size: usize) -> CdcStorageResult<(Vec<Cdc>, bool)>
548where
549	I: IntoIterator<Item = RusqliteResult<Vec<u8>>>,
550{
551	let mut items: Vec<Cdc> = Vec::new();
552	for row in rows {
553		let bytes = row.map_err(|e| CdcError::Internal(format!("range row: {e}")))?;
554		let cdc: Cdc =
555			from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode range: {e}")))?;
556		items.push(cdc);
557	}
558	let has_more = items.len() > batch_size;
559	if has_more {
560		items.truncate(batch_size);
561	}
562	Ok((items, has_more))
563}
564
565#[inline]
566fn query_max_live_version(conn: &Connection) -> CdcStorageResult<Option<u64>> {
567	let max_live: Option<Vec<u8>> = conn
568		.query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
569		.ok()
570		.flatten();
571	max_live.map(|b| bytes_to_version(&b).map(|v| v.0)).transpose()
572}
573
574/// Cap eligibility at the CDC producer's commit watermark. Below this watermark,
575/// every PostCommitEvent has been fully processed by the producer actor, so the
576/// cdc table contains the complete set of entries for those versions. Above the
577/// watermark, an in-flight producer write could still land at a version we are
578/// about to pack, breaking the invariant that for any block, every CDC version
579/// in [block.min, block.max] is contained in that block.
580///
581/// Returns `None` if `max_v < safety_lag` (nothing eligible yet).
582#[inline]
583fn compute_eligible_max(max_v: u64, safety_lag: u64, producer_watermark: CommitVersion) -> Option<CommitVersion> {
584	if max_v < safety_lag {
585		return None;
586	}
587	let safety_capped = max_v.saturating_sub(safety_lag);
588	Some(CommitVersion(safety_capped.min(producer_watermark.0)))
589}
590
591#[inline]
592fn query_oldest_candidates(
593	conn: &Connection,
594	eligible_max: CommitVersion,
595	target_size: usize,
596) -> CdcStorageResult<(Vec<Cdc>, Vec<Vec<u8>>)> {
597	let eligible_max_bytes = version_to_bytes(eligible_max);
598	let mut stmt = conn
599		.prepare(
600			r#"SELECT version, payload FROM "cdc"
601			   WHERE version <= ?1 ORDER BY version ASC LIMIT ?2"#,
602		)
603		.map_err(|e| CdcError::Internal(format!("compact prepare: {e}")))?;
604	let limit = (target_size as i64).saturating_add(1);
605	let rows = stmt
606		.query_map(params![eligible_max_bytes.as_slice(), limit], |row| {
607			Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
608		})
609		.map_err(|e| CdcError::Internal(format!("compact rows: {e}")))?;
610
611	let mut version_blobs: Vec<Vec<u8>> = Vec::with_capacity(target_size);
612	let mut entries: Vec<Cdc> = Vec::with_capacity(target_size);
613	for row in rows {
614		if entries.len() == target_size {
615			break;
616		}
617		let (vb, pb) = row.map_err(|e| CdcError::Internal(format!("compact row: {e}")))?;
618		let cdc: Cdc =
619			from_bytes(&pb).map_err(|e| CdcError::Codec(format!("postcard decode in compact: {e}")))?;
620		version_blobs.push(vb);
621		entries.push(cdc);
622	}
623	Ok((entries, version_blobs))
624}
625
626#[inline]
627fn build_block_summary(
628	entries: &[Cdc],
629	min_version: CommitVersion,
630	max_version: CommitVersion,
631	compressed_bytes: usize,
632) -> CompactBlockSummary {
633	CompactBlockSummary {
634		min_version,
635		max_version,
636		num_entries: entries.len(),
637		compressed_bytes,
638	}
639}
640
641#[inline]
642fn query_min_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
643	let r: Option<Vec<u8>> = conn
644		.query_row(r#"SELECT MIN(min_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
645		.ok()
646		.flatten();
647	r.map(|b| bytes_to_version(&b)).transpose()
648}
649
650#[inline]
651fn query_min_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
652	let r: Option<Vec<u8>> = conn
653		.query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
654		.ok()
655		.flatten();
656	r.map(|b| bytes_to_version(&b)).transpose()
657}
658
659#[inline]
660fn query_max_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
661	let r: Option<Vec<u8>> = conn
662		.query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
663		.ok()
664		.flatten();
665	r.map(|b| bytes_to_version(&b)).transpose()
666}
667
668#[inline]
669fn query_max_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
670	let r: Option<Vec<u8>> = conn
671		.query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
672		.ok()
673		.flatten();
674	r.map(|b| bytes_to_version(&b)).transpose()
675}
676
677fn version_to_bytes(v: CommitVersion) -> [u8; 8] {
678	v.0.to_be_bytes()
679}
680
681fn bytes_to_version(bytes: &[u8]) -> CdcStorageResult<CommitVersion> {
682	let arr: [u8; 8] = bytes.try_into().map_err(|_| CdcError::Internal("bad version bytes".to_string()))?;
683	Ok(CommitVersion(u64::from_be_bytes(arr)))
684}
685
686fn datetime_to_nanos(dt: &DateTime) -> i64 {
687	dt.to_nanos() as i64
688}
689
690/// Fold an entry slice's timestamps into `(min, max)` nanos. Timestamps are
691/// not guaranteed monotonic with version (clock skew, batched commits) so we
692/// compute the range explicitly rather than taking first/last.
693fn summarize_timestamps(entries: &[Cdc]) -> (i64, i64) {
694	entries.iter().fold((i64::MAX, i64::MIN), |(lo, hi), c| {
695		let n = datetime_to_nanos(&c.timestamp);
696		(lo.min(n), hi.max(n))
697	})
698}
699
700#[inline]
701fn read_block_index_rows(
702	conn: &Connection,
703	lo_b: &[u8; 8],
704	hi_b: &[u8; 8],
705) -> CdcStorageResult<Vec<(Vec<u8>, Vec<u8>)>> {
706	let mut stmt = conn
707		.prepare(
708			r#"SELECT max_version, payload FROM "cdc_block"
709			   WHERE max_version >= ?1 AND min_version <= ?2
710			   ORDER BY max_version ASC"#,
711		)
712		.map_err(|e| CdcError::Internal(format!("range blocks prepare: {e}")))?;
713	let rows = stmt
714		.query_map(params![lo_b.as_slice(), hi_b.as_slice()], |row| {
715			Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
716		})
717		.map_err(|e| CdcError::Internal(format!("range blocks rows: {e}")))?;
718	let mut out = Vec::new();
719	for r in rows {
720		out.push(r.map_err(|e| CdcError::Internal(format!("range blocks row: {e}")))?);
721	}
722	Ok(out)
723}
724
725#[inline]
726fn read_live_payloads(conn: &Connection, lo_b: &[u8; 8], hi_b: &[u8; 8], limit: i64) -> CdcStorageResult<Vec<Vec<u8>>> {
727	let mut stmt = conn
728		.prepare(
729			r#"SELECT payload FROM "cdc"
730			   WHERE version >= ?1 AND version <= ?2
731			   ORDER BY version ASC LIMIT ?3"#,
732		)
733		.map_err(|e| CdcError::Internal(format!("range live prepare: {e}")))?;
734	let rows = stmt
735		.query_map(params![lo_b.as_slice(), hi_b.as_slice(), limit], |row| row.get::<_, Vec<u8>>(0))
736		.map_err(|e| CdcError::Internal(format!("range live rows: {e}")))?;
737	let mut out = Vec::new();
738	for r in rows {
739		out.push(r.map_err(|e| CdcError::Internal(format!("range live row: {e}")))?);
740	}
741	Ok(out)
742}
743
744#[inline]
745fn decode_live_payloads(payloads: Vec<Vec<u8>>) -> CdcStorageResult<Vec<Cdc>> {
746	let mut live_items = Vec::with_capacity(payloads.len());
747	for payload in payloads {
748		let cdc: Cdc = from_bytes(&payload)
749			.map_err(|e| CdcError::Codec(format!("postcard decode range live: {e}")))?;
750		live_items.push(cdc);
751	}
752	Ok(live_items)
753}
754
755#[inline]
756fn merge_block_and_live(block_items: Vec<Cdc>, live_items: Vec<Cdc>) -> Vec<Cdc> {
757	let mut merged: Vec<Cdc> = Vec::with_capacity(block_items.len() + live_items.len());
758	let (mut bi, mut li) = (0usize, 0usize);
759	while bi < block_items.len() && li < live_items.len() {
760		let bv = block_items[bi].version;
761		let lv = live_items[li].version;
762		if bv < lv {
763			merged.push(block_items[bi].clone());
764			bi += 1;
765		} else if bv > lv {
766			merged.push(live_items[li].clone());
767			li += 1;
768		} else {
769			// Same version in both (compactor swap raced with our read);
770			// keep the block copy and skip the live duplicate.
771			merged.push(block_items[bi].clone());
772			bi += 1;
773			li += 1;
774		}
775	}
776	while bi < block_items.len() {
777		merged.push(block_items[bi].clone());
778		bi += 1;
779	}
780	while li < live_items.len() {
781		merged.push(live_items[li].clone());
782		li += 1;
783	}
784	merged
785}
786
787#[inline]
788fn scan_live_rows_below(conn: &Connection, version_bytes: &[u8; 8]) -> CdcStorageResult<LiveScan> {
789	let mut stmt = conn
790		.prepare(r#"SELECT payload FROM "cdc" WHERE version < ?1 ORDER BY version ASC"#)
791		.map_err(|e| CdcError::Internal(format!("drop_before prepare: {e}")))?;
792	let rows = stmt
793		.query_map(params![version_bytes.as_slice()], |row| row.get::<_, Vec<u8>>(0))
794		.map_err(|e| CdcError::Internal(format!("drop_before rows: {e}")))?;
795	let mut entries = Vec::new();
796	let mut cdc_count = 0;
797	for row in rows {
798		let bytes = row.map_err(|e| CdcError::Internal(format!("drop_before row: {e}")))?;
799		let cdc: Cdc =
800			from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode drop_before: {e}")))?;
801		cdc_count += 1;
802		extend_dropped_entries(&mut entries, &cdc.system_changes);
803	}
804	Ok(LiveScan {
805		cdc_count,
806		entries,
807	})
808}
809
810#[inline]
811fn apply_drop_before(
812	conn: &Connection,
813	full_block_pks: &[Vec<u8>],
814	straddle_actions: &[(Vec<u8>, BlockOutcome)],
815	version_bytes: &[u8; 8],
816	zstd_level: u8,
817) -> CdcStorageResult<()> {
818	let tx = conn.unchecked_transaction().map_err(|e| CdcError::Internal(format!("drop_before tx begin: {e}")))?;
819
820	for pk in full_block_pks {
821		tx.execute(r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#, params![pk.as_slice()])
822			.map_err(|e| CdcError::Internal(format!("drop block delete: {e}")))?;
823	}
824
825	for (max_bytes, action) in straddle_actions {
826		match action {
827			BlockOutcome::Delete => {
828				tx.execute(
829					r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#,
830					params![max_bytes.as_slice()],
831				)
832				.map_err(|e| CdcError::Internal(format!("drop straddle delete: {e}")))?;
833			}
834			BlockOutcome::Rewrite {
835				survivors,
836			} => {
837				rewrite_straddle_block(&tx, max_bytes, survivors, zstd_level)?;
838			}
839		}
840	}
841
842	tx.execute(r#"DELETE FROM "cdc" WHERE version < ?1"#, params![version_bytes.as_slice()])
843		.map_err(|e| CdcError::Internal(format!("drop_before delete: {e}")))?;
844	tx.commit().map_err(|e| CdcError::Internal(format!("drop_before commit: {e}")))?;
845	Ok(())
846}
847
848#[inline]
849fn rewrite_straddle_block(
850	tx: &Transaction<'_>,
851	max_bytes: &[u8],
852	survivors: &[Cdc],
853	zstd_level: u8,
854) -> CdcStorageResult<()> {
855	let new_min = survivors.first().unwrap().version;
856	let new_max = survivors.last().unwrap().version;
857	debug_assert_eq!(new_max, bytes_to_version(max_bytes)?, "max_version is the block PK and must be preserved");
858	let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(survivors);
859	let payload = block::encode(survivors, zstd_level)?;
860	tx.execute(
861		r#"INSERT OR REPLACE INTO "cdc_block"
862		   (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
863		   VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
864		params![
865			max_bytes,
866			version_to_bytes(new_min).as_slice(),
867			min_ts_nanos,
868			max_ts_nanos,
869			survivors.len() as i64,
870			payload.as_slice(),
871		],
872	)
873	.map_err(|e| CdcError::Internal(format!("drop straddle rewrite: {e}")))?;
874	Ok(())
875}
876
877#[inline]
878fn extend_dropped_entries(out: &mut Vec<DroppedCdcEntry>, system_changes: &[SystemChange]) {
879	for sys_change in system_changes {
880		out.push(DroppedCdcEntry {
881			key: sys_change.key().clone(),
882			value_bytes: sys_change.value_bytes() as u64,
883		});
884	}
885}
886
887/// Delete the live CDC rows whose payloads are now folded into a block.
888/// Returns `true` if the row count matched `expected_count` (caller commits),
889/// `false` if a concurrent `drop_before` already removed some (caller rolls
890/// back so the next compactor tick retries on a fresh snapshot).
891#[inline]
892fn delete_compacted_versions(
893	tx: &Transaction<'_>,
894	version_blobs: &[Vec<u8>],
895	expected_count: usize,
896) -> CdcStorageResult<bool> {
897	let placeholders = repeat_n("?", version_blobs.len()).collect::<Vec<_>>().join(",");
898	let del_sql = format!(r#"DELETE FROM "cdc" WHERE version IN ({})"#, placeholders);
899	let del_params: Vec<SqlValue> = version_blobs.iter().map(|b| SqlValue::Blob(b.clone())).collect();
900	let mut del_stmt =
901		tx.prepare(&del_sql).map_err(|e| CdcError::Internal(format!("compact delete prepare: {e}")))?;
902	let rows_deleted = del_stmt
903		.execute(params_from_iter(del_params.iter()))
904		.map_err(|e| CdcError::Internal(format!("compact delete execute: {e}")))?;
905	Ok(rows_deleted == expected_count)
906}
907
908#[inline]
909fn insert_compacted_block(
910	tx: &Transaction<'_>,
911	payload: &[u8],
912	min_version: CommitVersion,
913	max_version: CommitVersion,
914	min_ts_nanos: i64,
915	max_ts_nanos: i64,
916	num_entries: usize,
917) -> CdcStorageResult<()> {
918	tx.execute(
919		r#"INSERT INTO "cdc_block"
920		   (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
921		   VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
922		params![
923			version_to_bytes(max_version).as_slice(),
924			version_to_bytes(min_version).as_slice(),
925			min_ts_nanos,
926			max_ts_nanos,
927			num_entries as i64,
928			payload,
929		],
930	)
931	.map_err(|e| CdcError::Internal(format!("compact insert block: {e}")))?;
932	Ok(())
933}
934
935impl CdcStorage for SqliteCdcStorage {
936	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
937		let bytes = to_stdvec(cdc).map_err(|e| CdcError::Codec(format!("postcard encode: {e}")))?;
938		let conn = self.inner.conn.lock();
939		conn.execute(
940			r#"INSERT OR REPLACE INTO "cdc" (version, payload) VALUES (?1, ?2)"#,
941			params![version_to_bytes(cdc.version).as_slice(), bytes.as_slice()],
942		)
943		.map_err(|e| CdcError::Internal(format!("insert cdc: {e}")))?;
944		Ok(())
945	}
946
947	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
948		if let Some(cdc) = self.read_live(version)? {
949			return Ok(Some(cdc));
950		}
951		self.read_from_blocks(version)
952	}
953
954	fn read_range(
955		&self,
956		start: Bound<CommitVersion>,
957		end: Bound<CommitVersion>,
958		batch_size: u64,
959	) -> CdcStorageResult<CdcBatch> {
960		let Some((lo_inc, hi_inc)) = normalize_range_inclusive(start, end) else {
961			return Ok(CdcBatch {
962				items: Vec::new(),
963				has_more: false,
964			});
965		};
966		let want = batch_size as usize;
967
968		let (block_rows, live_payloads) = self.snapshot_block_and_live(lo_inc, hi_inc, batch_size)?;
969		let block_items = self.decode_block_rows(block_rows, lo_inc, hi_inc)?;
970		let live_items = decode_live_payloads(live_payloads)?;
971		let mut merged = merge_block_and_live(block_items, live_items);
972
973		let has_more = merged.len() > want;
974		merged.truncate(want);
975		Ok(CdcBatch {
976			items: merged,
977			has_more,
978		})
979	}
980
981	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
982		Ok(self.read(version)?.map(|c| c.system_changes.len()).unwrap_or(0))
983	}
984
985	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
986		let conn = self.inner.conn.lock();
987		let block_min = query_min_block(&conn)?;
988		let live_min = query_min_live(&conn)?;
989		Ok([block_min, live_min].into_iter().flatten().min())
990	}
991
992	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
993		let conn = self.inner.conn.lock();
994		if let Some(v) = query_max_live(&conn)? {
995			return Ok(Some(v));
996		}
997		query_max_block(&conn)
998	}
999
1000	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
1001		let conn = self.inner.conn.lock();
1002		let version_bytes = version_to_bytes(version);
1003		let zstd_level = self.inner.last_zstd_level.load(Ordering::Relaxed);
1004
1005		let full_blocks = self.scan_full_blocks_below(&conn, &version_bytes)?;
1006		let straddle = self.scan_straddle_blocks(&conn, version, &version_bytes)?;
1007		let live = scan_live_rows_below(&conn, &version_bytes)?;
1008
1009		apply_drop_before(&conn, &full_blocks.pks, &straddle.actions, &version_bytes, zstd_level)?;
1010		let _ = conn.execute("PRAGMA incremental_vacuum", []);
1011
1012		let mut entries = full_blocks.entries;
1013		entries.extend(straddle.entries);
1014		entries.extend(live.entries);
1015		Ok(DropBeforeResult {
1016			count: full_blocks.cdc_count + straddle.cdc_count + live.cdc_count,
1017			entries,
1018		})
1019	}
1020
1021	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
1022		let cutoff_nanos = datetime_to_nanos(&cutoff);
1023		if let Some(v) = self.try_block_index_cutoff(cutoff_nanos)? {
1024			return Ok(Some(v));
1025		}
1026		let Some(start) = self.pick_scan_start()? else {
1027			return self.max_version_blocks().map(|opt| opt.map(|v| CommitVersion(v.0.saturating_add(1))));
1028		};
1029		self.scan_live_for_cutoff(start, cutoff_nanos)
1030	}
1031}
1032
1033impl SqliteCdcStorage {
1034	/// Try the indexed `cdc_block.max_timestamp` lookup. Returns the smallest
1035	/// `min_version` of any block whose `max_timestamp >= cutoff`, or `None` if
1036	/// no block straddles the cutoff (caller falls back to scanning live rows).
1037	#[inline]
1038	fn try_block_index_cutoff(&self, cutoff_nanos: i64) -> CdcStorageResult<Option<CommitVersion>> {
1039		let block_hit: Option<Vec<u8>> = {
1040			let conn = self.inner.conn.lock();
1041			conn.query_row(
1042				r#"SELECT min_version FROM "cdc_block"
1043				   WHERE max_timestamp >= ?1 ORDER BY max_timestamp ASC LIMIT 1"#,
1044				params![cutoff_nanos],
1045				|row| row.get::<_, Vec<u8>>(0),
1046			)
1047			.ok()
1048		};
1049		block_hit.map(|b| bytes_to_version(&b)).transpose()
1050	}
1051
1052	/// Pick the start of the live-row scan: the smallest live version, or
1053	/// `None` if the live table is empty (caller returns `block_max + 1`).
1054	#[inline]
1055	fn pick_scan_start(&self) -> CdcStorageResult<Option<CommitVersion>> {
1056		self.min_version_live()
1057	}
1058
1059	#[inline]
1060	fn scan_live_for_cutoff(
1061		&self,
1062		start: CommitVersion,
1063		cutoff_nanos: i64,
1064	) -> CdcStorageResult<Option<CommitVersion>> {
1065		let mut next_start = Bound::Included(start);
1066		loop {
1067			let batch = self.read_range_live(next_start, Bound::Unbounded, 256)?;
1068			if batch.items.is_empty() {
1069				let last = self.max_version()?.unwrap_or(CommitVersion(0));
1070				return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1071			}
1072			for cdc in &batch.items {
1073				if datetime_to_nanos(&cdc.timestamp) >= cutoff_nanos {
1074					return Ok(Some(cdc.version));
1075				}
1076			}
1077			if !batch.has_more {
1078				let last = batch.items.last().unwrap().version;
1079				return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1080			}
1081			next_start = Bound::Excluded(batch.items.last().unwrap().version);
1082		}
1083	}
1084
1085	#[inline]
1086	fn read_live(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
1087		let conn = self.inner.conn.lock();
1088		let result = conn.query_row(
1089			r#"SELECT payload FROM "cdc" WHERE version = ?1"#,
1090			params![version_to_bytes(version).as_slice()],
1091			|row| row.get::<_, Vec<u8>>(0),
1092		);
1093		match result {
1094			Ok(bytes) => {
1095				let cdc: Cdc = from_bytes(&bytes)
1096					.map_err(|e| CdcError::Codec(format!("postcard decode: {e}")))?;
1097				Ok(Some(cdc))
1098			}
1099			Err(QueryReturnedNoRows) => Ok(None),
1100			Err(e) => Err(CdcError::Internal(format!("read cdc: {e}"))),
1101		}
1102	}
1103}