Skip to main content

reifydb_cdc/storage/sqlite/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::Bound,
6	iter::repeat_n,
7	sync::{
8		Arc,
9		atomic::{AtomicU8, Ordering},
10	},
11};
12
13use postcard::{from_bytes, to_stdvec};
14use reifydb_core::{
15	common::CommitVersion,
16	interface::cdc::{Cdc, CdcBatch, SystemChange},
17};
18use reifydb_runtime::sync::mutex::Mutex;
19use reifydb_sqlite::{
20	SqliteConfig,
21	connection::{connect, convert_flags, resolve_db_path},
22	pragma,
23};
24use reifydb_type::value::datetime::DateTime;
25use rusqlite::{
26	Connection, Error::QueryReturnedNoRows, Result as RusqliteResult, Transaction, params, params_from_iter,
27	types::Value as SqlValue,
28};
29
30use crate::{
31	compact::{block, block::CompactBlockSummary, cache::BlockCache},
32	error::CdcError,
33	storage::{CdcStorage, CdcStorageResult, DropBeforeResult, DroppedCdcEntry, normalize_range_inclusive},
34};
35
36#[derive(Clone)]
37pub struct SqliteCdcStorage {
38	inner: Arc<Inner>,
39}
40
41struct Inner {
42	conn: Mutex<Connection>,
43	block_cache: BlockCache,
44	last_zstd_level: AtomicU8,
45}
46
47type CompactionCandidates = (Vec<Cdc>, Vec<Vec<u8>>);
48
/// Raw rows gathered for a range read: `(max_version, payload)` pairs from `cdc_block`,
/// followed by the payloads read from `cdc`.
type RangeSnapshot = (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>);
50
51struct FullBlockScan {
52	cdc_count: usize,
53	entries: Vec<DroppedCdcEntry>,
54	pks: Vec<Vec<u8>>,
55}
56
57struct StraddleScan {
58	cdc_count: usize,
59	entries: Vec<DroppedCdcEntry>,
60	actions: Vec<(Vec<u8>, BlockOutcome)>,
61}
62
63struct LiveScan {
64	cdc_count: usize,
65	entries: Vec<DroppedCdcEntry>,
66}
67
68enum BlockOutcome {
69	Delete,
70	Rewrite {
71		survivors: Vec<Cdc>,
72	},
73}
74
75impl SqliteCdcStorage {
76	pub fn new(config: SqliteConfig) -> Self {
77		Self::new_with_cache_capacity(config, BlockCache::DEFAULT_CAPACITY)
78	}
79
80	pub fn new_with_cache_capacity(config: SqliteConfig, cache_capacity: usize) -> Self {
81		let conn = open_connection(&config);
82		Self {
83			inner: Arc::new(Inner {
84				conn: Mutex::new(conn),
85				block_cache: BlockCache::new(cache_capacity),
86				last_zstd_level: AtomicU8::new(3),
87			}),
88		}
89	}
90
91	pub fn in_memory() -> Self {
92		Self::new(SqliteConfig::in_memory())
93	}
94
95	fn ensure_schema(conn: &Connection) {
96		create_cdc_table(conn);
97		create_cdc_block_table(conn);
98		create_block_timestamp_index(conn);
99	}
100
101	pub fn incremental_vacuum(&self) {
102		let _ = pragma::incremental_vacuum(&self.inner.conn.lock());
103	}
104
105	pub fn shrink_memory(&self) {
106		let _ = pragma::shrink_memory(&self.inner.conn.lock());
107	}
108
109	pub fn shutdown(&self) {
110		let _ = pragma::shutdown(&self.inner.conn.lock());
111	}
112
113	fn read_from_blocks(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
114		let v_bytes = version_to_bytes(version);
115		let Some((max_bytes, payload)) = self.find_block_for_version(&v_bytes)? else {
116			return Ok(None);
117		};
118		let block_max = bytes_to_version(&max_bytes)?;
119		let entries = self.load_block_cached(block_max, &payload)?;
120		Ok(entries.iter().find(|c| c.version == version).cloned())
121	}
122
123	#[inline]
124	fn find_block_for_version(&self, v_bytes: &[u8; 8]) -> CdcStorageResult<Option<(Vec<u8>, Vec<u8>)>> {
125		let conn = self.inner.conn.lock();
126		conn.query_row(
127			r#"SELECT max_version, payload FROM "cdc_block"
128			   WHERE max_version >= ?1 AND min_version <= ?1
129			   ORDER BY max_version ASC LIMIT 1"#,
130			params![v_bytes.as_slice()],
131			|row| Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?)),
132		)
133		.map(Some)
134		.or_else(|e| match e {
135			QueryReturnedNoRows => Ok(None),
136			e => Err(CdcError::Internal(format!("read_from_blocks: {e}"))),
137		})
138	}
139
140	fn load_block_cached(&self, block_max: CommitVersion, payload: &[u8]) -> CdcStorageResult<Arc<Vec<Cdc>>> {
141		if let Some(hit) = self.inner.block_cache.get(block_max) {
142			return Ok(hit);
143		}
144		let entries = block::decode(payload)?;
145		let arc = Arc::new(entries);
146		self.inner.block_cache.put(block_max, arc.clone());
147		Ok(arc)
148	}
149
150	fn read_range_live(
151		&self,
152		start: Bound<CommitVersion>,
153		end: Bound<CommitVersion>,
154		batch_size: u64,
155	) -> CdcStorageResult<CdcBatch> {
156		let (lower_sql, lower_bytes) = lower_bind_clause(start);
157		let (upper_sql, upper_bytes) = upper_bind_clause(end);
158		let sql = format!(
159			r#"SELECT payload FROM "cdc" WHERE 1=1{lower_sql}{upper_sql} ORDER BY version ASC LIMIT ?"#
160		);
161		let limit = (batch_size as i64).saturating_add(1);
162
163		let conn = self.inner.conn.lock();
164		let mut stmt = conn.prepare(&sql).map_err(|e| CdcError::Internal(format!("range prepare: {e}")))?;
165		let values = build_range_params(lower_bytes, upper_bytes, limit);
166		let rows = stmt
167			.query_map(params_from_iter(values.iter()), |row| row.get::<_, Vec<u8>>(0))
168			.map_err(|e| CdcError::Internal(format!("range rows: {e}")))?;
169
170		let (items, has_more) = decode_payload_rows(rows, batch_size as usize)?;
171		Ok(CdcBatch {
172			items,
173			has_more,
174		})
175	}
176
177	fn min_version_live(&self) -> CdcStorageResult<Option<CommitVersion>> {
178		let conn = self.inner.conn.lock();
179		let r: Option<Vec<u8>> = conn
180			.query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
181			.ok()
182			.flatten();
183		r.map(|b| bytes_to_version(&b)).transpose()
184	}
185
186	fn max_version_blocks(&self) -> CdcStorageResult<Option<CommitVersion>> {
187		let conn = self.inner.conn.lock();
188		let r: Option<Vec<u8>> = conn
189			.query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| {
190				row.get::<_, Option<Vec<u8>>>(0)
191			})
192			.ok()
193			.flatten();
194		r.map(|b| bytes_to_version(&b)).transpose()
195	}
196
197	pub fn compact_oldest(
198		&self,
199		target_size: usize,
200		safety_lag: u64,
201		zstd_level: u8,
202		producer_watermark: CommitVersion,
203	) -> CdcStorageResult<Option<CompactBlockSummary>> {
204		self.compact_oldest_inner(target_size, safety_lag, false, zstd_level, producer_watermark)
205	}
206
207	pub fn compact_all(
208		&self,
209		target_size: usize,
210		zstd_level: u8,
211		producer_watermark: CommitVersion,
212	) -> CdcStorageResult<Vec<CompactBlockSummary>> {
213		let mut out = Vec::new();
214		while let Some(s) = self.compact_oldest_inner(target_size, 0, false, zstd_level, producer_watermark)? {
215			out.push(s);
216		}
217		if let Some(tail) = self.compact_oldest_inner(target_size, 0, true, zstd_level, producer_watermark)? {
218			out.push(tail);
219		}
220		Ok(out)
221	}
222
223	fn compact_oldest_inner(
224		&self,
225		target_size: usize,
226		safety_lag: u64,
227		allow_partial: bool,
228		zstd_level: u8,
229		producer_watermark: CommitVersion,
230	) -> CdcStorageResult<Option<CompactBlockSummary>> {
231		if target_size == 0 {
232			return Ok(None);
233		}
234		self.inner.last_zstd_level.store(zstd_level, Ordering::Relaxed);
235
236		let Some((entries, version_blobs)) =
237			self.select_oldest_eligible(target_size, safety_lag, allow_partial, producer_watermark)?
238		else {
239			return Ok(None);
240		};
241
242		let payload = block::encode(&entries, zstd_level)?;
243		let compressed_bytes = payload.len();
244		let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(&entries);
245		let min_version = entries.first().unwrap().version;
246		let max_version = entries.last().unwrap().version;
247
248		let committed = self.commit_block_swap(
249			&version_blobs,
250			&payload,
251			min_version,
252			max_version,
253			min_ts_nanos,
254			max_ts_nanos,
255			entries.len(),
256		)?;
257		if !committed {
258			return Ok(None);
259		}
260		Ok(Some(build_block_summary(&entries, min_version, max_version, compressed_bytes)))
261	}
262
263	fn select_oldest_eligible(
264		&self,
265		target_size: usize,
266		safety_lag: u64,
267		allow_partial: bool,
268		producer_watermark: CommitVersion,
269	) -> CdcStorageResult<Option<CompactionCandidates>> {
270		let conn = self.inner.conn.lock();
271		let Some(max_v) = query_max_live_version(&conn)? else {
272			return Ok(None);
273		};
274		let Some(eligible_max) = compute_eligible_max(max_v, safety_lag, producer_watermark) else {
275			return Ok(None);
276		};
277		let (entries, version_blobs) = query_oldest_candidates(&conn, eligible_max, target_size)?;
278		if entries.is_empty() {
279			return Ok(None);
280		}
281		if !allow_partial && entries.len() < target_size {
282			return Ok(None);
283		}
284		Ok(Some((entries, version_blobs)))
285	}
286
287	#[allow(clippy::too_many_arguments)]
288	fn commit_block_swap(
289		&self,
290		version_blobs: &[Vec<u8>],
291		payload: &[u8],
292		min_version: CommitVersion,
293		max_version: CommitVersion,
294		min_ts_nanos: i64,
295		max_ts_nanos: i64,
296		num_entries: usize,
297	) -> CdcStorageResult<bool> {
298		let conn = self.inner.conn.lock();
299		let tx = conn
300			.unchecked_transaction()
301			.map_err(|e| CdcError::Internal(format!("compact tx begin: {e}")))?;
302
303		if !delete_compacted_versions(&tx, version_blobs, num_entries)? {
304			tx.rollback().map_err(|e| CdcError::Internal(format!("compact rollback: {e}")))?;
305			return Ok(false);
306		}
307		insert_compacted_block(
308			&tx,
309			payload,
310			min_version,
311			max_version,
312			min_ts_nanos,
313			max_ts_nanos,
314			num_entries,
315		)?;
316		tx.commit().map_err(|e| CdcError::Internal(format!("compact commit: {e}")))?;
317		Ok(true)
318	}
319
320	#[inline]
321	fn snapshot_block_and_live(
322		&self,
323		lo_inc: CommitVersion,
324		hi_inc: CommitVersion,
325		batch_size: u64,
326	) -> CdcStorageResult<RangeSnapshot> {
327		let lo_b = version_to_bytes(lo_inc);
328		let hi_b = version_to_bytes(hi_inc);
329		let limit = (batch_size as i64).saturating_add(1);
330		let conn = self.inner.conn.lock();
331
332		let block_rows = read_block_index_rows(&conn, &lo_b, &hi_b)?;
333		let live_payloads = read_live_payloads(&conn, &lo_b, &hi_b, limit)?;
334		Ok((block_rows, live_payloads))
335	}
336
337	#[inline]
338	fn decode_block_rows(
339		&self,
340		block_rows: Vec<(Vec<u8>, Vec<u8>)>,
341		lo_inc: CommitVersion,
342		hi_inc: CommitVersion,
343	) -> CdcStorageResult<Vec<Cdc>> {
344		let mut block_items: Vec<Cdc> = Vec::new();
345		for (max_bytes, payload) in block_rows {
346			let block_max = bytes_to_version(&max_bytes)?;
347			let entries = self.load_block_cached(block_max, &payload)?;
348			for cdc in entries.iter() {
349				if cdc.version >= lo_inc && cdc.version <= hi_inc {
350					block_items.push(cdc.clone());
351				}
352			}
353		}
354		Ok(block_items)
355	}
356
357	#[inline]
358	fn scan_full_blocks_below(
359		&self,
360		conn: &Connection,
361		version_bytes: &[u8; 8],
362	) -> CdcStorageResult<FullBlockScan> {
363		let mut stmt = conn
364			.prepare(
365				r#"SELECT max_version, payload FROM "cdc_block"
366				   WHERE max_version < ?1 ORDER BY max_version ASC"#,
367			)
368			.map_err(|e| CdcError::Internal(format!("drop blocks prepare: {e}")))?;
369		let rows = stmt
370			.query_map(params![version_bytes.as_slice()], |row| {
371				Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
372			})
373			.map_err(|e| CdcError::Internal(format!("drop blocks rows: {e}")))?;
374		let mut entries = Vec::new();
375		let mut pks = Vec::new();
376		let mut cdc_count = 0;
377		for row in rows {
378			let (max_bytes, payload) =
379				row.map_err(|e| CdcError::Internal(format!("drop blocks row: {e}")))?;
380			let block_max = bytes_to_version(&max_bytes)?;
381			for cdc in &block::decode(&payload)? {
382				cdc_count += 1;
383				extend_dropped_entries(&mut entries, &cdc.system_changes);
384			}
385			self.inner.block_cache.remove(block_max);
386			pks.push(max_bytes);
387		}
388		Ok(FullBlockScan {
389			cdc_count,
390			entries,
391			pks,
392		})
393	}
394
395	#[inline]
396	fn scan_straddle_blocks(
397		&self,
398		conn: &Connection,
399		version: CommitVersion,
400		version_bytes: &[u8; 8],
401	) -> CdcStorageResult<StraddleScan> {
402		let mut stmt = conn
403			.prepare(
404				r#"SELECT max_version, payload FROM "cdc_block"
405				   WHERE min_version < ?1 AND max_version >= ?1
406				   ORDER BY max_version ASC"#,
407			)
408			.map_err(|e| CdcError::Internal(format!("drop straddle prepare: {e}")))?;
409		let rows = stmt
410			.query_map(params![version_bytes.as_slice()], |row| {
411				Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
412			})
413			.map_err(|e| CdcError::Internal(format!("drop straddle rows: {e}")))?;
414		let mut entries = Vec::new();
415		let mut actions = Vec::new();
416		let mut cdc_count = 0;
417		for row in rows {
418			let (max_bytes, payload) =
419				row.map_err(|e| CdcError::Internal(format!("drop straddle row: {e}")))?;
420			let block_max = bytes_to_version(&max_bytes)?;
421			let decoded = block::decode(&payload)?;
422			let mut survivors: Vec<Cdc> = Vec::with_capacity(decoded.len());
423			for cdc in decoded {
424				if cdc.version < version {
425					cdc_count += 1;
426					extend_dropped_entries(&mut entries, &cdc.system_changes);
427				} else {
428					survivors.push(cdc);
429				}
430			}
431			self.inner.block_cache.remove(block_max);
432			let outcome = if survivors.is_empty() {
433				BlockOutcome::Delete
434			} else {
435				BlockOutcome::Rewrite {
436					survivors,
437				}
438			};
439			actions.push((max_bytes, outcome));
440		}
441		Ok(StraddleScan {
442			cdc_count,
443			entries,
444			actions,
445		})
446	}
447}
448
449fn open_connection(config: &SqliteConfig) -> Connection {
450	let db_path = resolve_db_path(config.path.clone(), "cdc.db");
451	let flags = convert_flags(&config.flags);
452	let conn = connect(&db_path, flags).expect("Failed to connect to CDC SQLite database");
453	pragma::apply(&conn, config).expect("Failed to configure CDC SQLite pragmas");
454	SqliteCdcStorage::ensure_schema(&conn);
455	conn
456}
457
458fn create_cdc_table(conn: &Connection) {
459	conn.execute(
460		r#"CREATE TABLE IF NOT EXISTS "cdc" (
461			version BLOB PRIMARY KEY,
462			payload BLOB NOT NULL
463		) WITHOUT ROWID"#,
464		[],
465	)
466	.expect("Failed to create cdc table");
467}
468
469fn create_cdc_block_table(conn: &Connection) {
470	conn.execute(
471		r#"CREATE TABLE IF NOT EXISTS "cdc_block" (
472			max_version BLOB PRIMARY KEY,
473			min_version BLOB NOT NULL,
474			min_timestamp INTEGER NOT NULL,
475			max_timestamp INTEGER NOT NULL,
476			num_entries INTEGER NOT NULL,
477			payload BLOB NOT NULL
478		) WITHOUT ROWID"#,
479		[],
480	)
481	.expect("Failed to create cdc_block table");
482}
483
484fn create_block_timestamp_index(conn: &Connection) {
485	conn.execute(
486		r#"CREATE INDEX IF NOT EXISTS "cdc_block_max_ts_idx"
487		   ON "cdc_block"(max_timestamp)"#,
488		[],
489	)
490	.expect("Failed to create cdc_block_max_ts index");
491}
492
493#[inline]
494fn lower_bind_clause(start: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
495	match start {
496		Bound::Included(v) => (" AND version >= ?", Some(version_to_bytes(v))),
497		Bound::Excluded(v) => (" AND version > ?", Some(version_to_bytes(v))),
498		Bound::Unbounded => ("", None),
499	}
500}
501
502#[inline]
503fn upper_bind_clause(end: Bound<CommitVersion>) -> (&'static str, Option<[u8; 8]>) {
504	match end {
505		Bound::Included(v) => (" AND version <= ?", Some(version_to_bytes(v))),
506		Bound::Excluded(v) => (" AND version < ?", Some(version_to_bytes(v))),
507		Bound::Unbounded => ("", None),
508	}
509}
510
511#[inline]
512fn build_range_params(lower_bytes: Option<[u8; 8]>, upper_bytes: Option<[u8; 8]>, limit: i64) -> Vec<SqlValue> {
513	let mut values: Vec<SqlValue> = Vec::new();
514	if let Some(b) = lower_bytes {
515		values.push(SqlValue::Blob(b.to_vec()));
516	}
517	if let Some(b) = upper_bytes {
518		values.push(SqlValue::Blob(b.to_vec()));
519	}
520	values.push(SqlValue::Integer(limit));
521	values
522}
523
524#[inline]
525fn decode_payload_rows<I>(rows: I, batch_size: usize) -> CdcStorageResult<(Vec<Cdc>, bool)>
526where
527	I: IntoIterator<Item = RusqliteResult<Vec<u8>>>,
528{
529	let mut items: Vec<Cdc> = Vec::new();
530	for row in rows {
531		let bytes = row.map_err(|e| CdcError::Internal(format!("range row: {e}")))?;
532		let cdc: Cdc =
533			from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode range: {e}")))?;
534		items.push(cdc);
535	}
536	let has_more = items.len() > batch_size;
537	if has_more {
538		items.truncate(batch_size);
539	}
540	Ok((items, has_more))
541}
542
543#[inline]
544fn query_max_live_version(conn: &Connection) -> CdcStorageResult<Option<u64>> {
545	let max_live: Option<Vec<u8>> = conn
546		.query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
547		.ok()
548		.flatten();
549	max_live.map(|b| bytes_to_version(&b).map(|v| v.0)).transpose()
550}
551
552#[inline]
553fn compute_eligible_max(max_v: u64, safety_lag: u64, producer_watermark: CommitVersion) -> Option<CommitVersion> {
554	if max_v < safety_lag {
555		return None;
556	}
557	let safety_capped = max_v.saturating_sub(safety_lag);
558	Some(CommitVersion(safety_capped.min(producer_watermark.0)))
559}
560
561#[inline]
562fn query_oldest_candidates(
563	conn: &Connection,
564	eligible_max: CommitVersion,
565	target_size: usize,
566) -> CdcStorageResult<(Vec<Cdc>, Vec<Vec<u8>>)> {
567	let eligible_max_bytes = version_to_bytes(eligible_max);
568	let mut stmt = conn
569		.prepare(
570			r#"SELECT version, payload FROM "cdc"
571			   WHERE version <= ?1 ORDER BY version ASC LIMIT ?2"#,
572		)
573		.map_err(|e| CdcError::Internal(format!("compact prepare: {e}")))?;
574	let limit = (target_size as i64).saturating_add(1);
575	let rows = stmt
576		.query_map(params![eligible_max_bytes.as_slice(), limit], |row| {
577			Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
578		})
579		.map_err(|e| CdcError::Internal(format!("compact rows: {e}")))?;
580
581	let mut version_blobs: Vec<Vec<u8>> = Vec::with_capacity(target_size);
582	let mut entries: Vec<Cdc> = Vec::with_capacity(target_size);
583	for row in rows {
584		if entries.len() == target_size {
585			break;
586		}
587		let (vb, pb) = row.map_err(|e| CdcError::Internal(format!("compact row: {e}")))?;
588		let cdc: Cdc =
589			from_bytes(&pb).map_err(|e| CdcError::Codec(format!("postcard decode in compact: {e}")))?;
590		version_blobs.push(vb);
591		entries.push(cdc);
592	}
593	Ok((entries, version_blobs))
594}
595
596#[inline]
597fn build_block_summary(
598	entries: &[Cdc],
599	min_version: CommitVersion,
600	max_version: CommitVersion,
601	compressed_bytes: usize,
602) -> CompactBlockSummary {
603	CompactBlockSummary {
604		min_version,
605		max_version,
606		num_entries: entries.len(),
607		compressed_bytes,
608	}
609}
610
611#[inline]
612fn query_min_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
613	let r: Option<Vec<u8>> = conn
614		.query_row(r#"SELECT MIN(min_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
615		.ok()
616		.flatten();
617	r.map(|b| bytes_to_version(&b)).transpose()
618}
619
620#[inline]
621fn query_min_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
622	let r: Option<Vec<u8>> = conn
623		.query_row(r#"SELECT MIN(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
624		.ok()
625		.flatten();
626	r.map(|b| bytes_to_version(&b)).transpose()
627}
628
629#[inline]
630fn query_max_live(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
631	let r: Option<Vec<u8>> = conn
632		.query_row(r#"SELECT MAX(version) FROM "cdc""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
633		.ok()
634		.flatten();
635	r.map(|b| bytes_to_version(&b)).transpose()
636}
637
638#[inline]
639fn query_max_block(conn: &Connection) -> CdcStorageResult<Option<CommitVersion>> {
640	let r: Option<Vec<u8>> = conn
641		.query_row(r#"SELECT MAX(max_version) FROM "cdc_block""#, [], |row| row.get::<_, Option<Vec<u8>>>(0))
642		.ok()
643		.flatten();
644	r.map(|b| bytes_to_version(&b)).transpose()
645}
646
647fn version_to_bytes(v: CommitVersion) -> [u8; 8] {
648	v.0.to_be_bytes()
649}
650
651fn bytes_to_version(bytes: &[u8]) -> CdcStorageResult<CommitVersion> {
652	let arr: [u8; 8] = bytes.try_into().map_err(|_| CdcError::Internal("bad version bytes".to_string()))?;
653	Ok(CommitVersion(u64::from_be_bytes(arr)))
654}
655
656fn datetime_to_nanos(dt: &DateTime) -> i64 {
657	dt.to_nanos() as i64
658}
659
660fn summarize_timestamps(entries: &[Cdc]) -> (i64, i64) {
661	entries.iter().fold((i64::MAX, i64::MIN), |(lo, hi), c| {
662		let n = datetime_to_nanos(&c.timestamp);
663		(lo.min(n), hi.max(n))
664	})
665}
666
667#[inline]
668fn read_block_index_rows(
669	conn: &Connection,
670	lo_b: &[u8; 8],
671	hi_b: &[u8; 8],
672) -> CdcStorageResult<Vec<(Vec<u8>, Vec<u8>)>> {
673	let mut stmt = conn
674		.prepare(
675			r#"SELECT max_version, payload FROM "cdc_block"
676			   WHERE max_version >= ?1 AND min_version <= ?2
677			   ORDER BY max_version ASC"#,
678		)
679		.map_err(|e| CdcError::Internal(format!("range blocks prepare: {e}")))?;
680	let rows = stmt
681		.query_map(params![lo_b.as_slice(), hi_b.as_slice()], |row| {
682			Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
683		})
684		.map_err(|e| CdcError::Internal(format!("range blocks rows: {e}")))?;
685	let mut out = Vec::new();
686	for r in rows {
687		out.push(r.map_err(|e| CdcError::Internal(format!("range blocks row: {e}")))?);
688	}
689	Ok(out)
690}
691
692#[inline]
693fn read_live_payloads(conn: &Connection, lo_b: &[u8; 8], hi_b: &[u8; 8], limit: i64) -> CdcStorageResult<Vec<Vec<u8>>> {
694	let mut stmt = conn
695		.prepare(
696			r#"SELECT payload FROM "cdc"
697			   WHERE version >= ?1 AND version <= ?2
698			   ORDER BY version ASC LIMIT ?3"#,
699		)
700		.map_err(|e| CdcError::Internal(format!("range live prepare: {e}")))?;
701	let rows = stmt
702		.query_map(params![lo_b.as_slice(), hi_b.as_slice(), limit], |row| row.get::<_, Vec<u8>>(0))
703		.map_err(|e| CdcError::Internal(format!("range live rows: {e}")))?;
704	let mut out = Vec::new();
705	for r in rows {
706		out.push(r.map_err(|e| CdcError::Internal(format!("range live row: {e}")))?);
707	}
708	Ok(out)
709}
710
711#[inline]
712fn decode_live_payloads(payloads: Vec<Vec<u8>>) -> CdcStorageResult<Vec<Cdc>> {
713	let mut live_items = Vec::with_capacity(payloads.len());
714	for payload in payloads {
715		let cdc: Cdc = from_bytes(&payload)
716			.map_err(|e| CdcError::Codec(format!("postcard decode range live: {e}")))?;
717		live_items.push(cdc);
718	}
719	Ok(live_items)
720}
721
722#[inline]
723fn merge_block_and_live(block_items: Vec<Cdc>, live_items: Vec<Cdc>) -> Vec<Cdc> {
724	let mut merged: Vec<Cdc> = Vec::with_capacity(block_items.len() + live_items.len());
725	let (mut bi, mut li) = (0usize, 0usize);
726	while bi < block_items.len() && li < live_items.len() {
727		let bv = block_items[bi].version;
728		let lv = live_items[li].version;
729		if bv < lv {
730			merged.push(block_items[bi].clone());
731			bi += 1;
732		} else if bv > lv {
733			merged.push(live_items[li].clone());
734			li += 1;
735		} else {
736			merged.push(block_items[bi].clone());
737			bi += 1;
738			li += 1;
739		}
740	}
741	while bi < block_items.len() {
742		merged.push(block_items[bi].clone());
743		bi += 1;
744	}
745	while li < live_items.len() {
746		merged.push(live_items[li].clone());
747		li += 1;
748	}
749	merged
750}
751
752#[inline]
753fn scan_live_rows_below(conn: &Connection, version_bytes: &[u8; 8]) -> CdcStorageResult<LiveScan> {
754	let mut stmt = conn
755		.prepare(r#"SELECT payload FROM "cdc" WHERE version < ?1 ORDER BY version ASC"#)
756		.map_err(|e| CdcError::Internal(format!("drop_before prepare: {e}")))?;
757	let rows = stmt
758		.query_map(params![version_bytes.as_slice()], |row| row.get::<_, Vec<u8>>(0))
759		.map_err(|e| CdcError::Internal(format!("drop_before rows: {e}")))?;
760	let mut entries = Vec::new();
761	let mut cdc_count = 0;
762	for row in rows {
763		let bytes = row.map_err(|e| CdcError::Internal(format!("drop_before row: {e}")))?;
764		let cdc: Cdc =
765			from_bytes(&bytes).map_err(|e| CdcError::Codec(format!("postcard decode drop_before: {e}")))?;
766		cdc_count += 1;
767		extend_dropped_entries(&mut entries, &cdc.system_changes);
768	}
769	Ok(LiveScan {
770		cdc_count,
771		entries,
772	})
773}
774
775#[inline]
776fn apply_drop_before(
777	conn: &Connection,
778	full_block_pks: &[Vec<u8>],
779	straddle_actions: &[(Vec<u8>, BlockOutcome)],
780	version_bytes: &[u8; 8],
781	zstd_level: u8,
782) -> CdcStorageResult<()> {
783	let tx = conn.unchecked_transaction().map_err(|e| CdcError::Internal(format!("drop_before tx begin: {e}")))?;
784
785	for pk in full_block_pks {
786		tx.execute(r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#, params![pk.as_slice()])
787			.map_err(|e| CdcError::Internal(format!("drop block delete: {e}")))?;
788	}
789
790	for (max_bytes, action) in straddle_actions {
791		match action {
792			BlockOutcome::Delete => {
793				tx.execute(
794					r#"DELETE FROM "cdc_block" WHERE max_version = ?1"#,
795					params![max_bytes.as_slice()],
796				)
797				.map_err(|e| CdcError::Internal(format!("drop straddle delete: {e}")))?;
798			}
799			BlockOutcome::Rewrite {
800				survivors,
801			} => {
802				rewrite_straddle_block(&tx, max_bytes, survivors, zstd_level)?;
803			}
804		}
805	}
806
807	tx.execute(r#"DELETE FROM "cdc" WHERE version < ?1"#, params![version_bytes.as_slice()])
808		.map_err(|e| CdcError::Internal(format!("drop_before delete: {e}")))?;
809	tx.commit().map_err(|e| CdcError::Internal(format!("drop_before commit: {e}")))?;
810	Ok(())
811}
812
813#[inline]
814fn rewrite_straddle_block(
815	tx: &Transaction<'_>,
816	max_bytes: &[u8],
817	survivors: &[Cdc],
818	zstd_level: u8,
819) -> CdcStorageResult<()> {
820	let new_min = survivors.first().unwrap().version;
821	let new_max = survivors.last().unwrap().version;
822	debug_assert_eq!(new_max, bytes_to_version(max_bytes)?, "max_version is the block PK and must be preserved");
823	let (min_ts_nanos, max_ts_nanos) = summarize_timestamps(survivors);
824	let payload = block::encode(survivors, zstd_level)?;
825	tx.execute(
826		r#"INSERT OR REPLACE INTO "cdc_block"
827		   (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
828		   VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
829		params![
830			max_bytes,
831			version_to_bytes(new_min).as_slice(),
832			min_ts_nanos,
833			max_ts_nanos,
834			survivors.len() as i64,
835			payload.as_slice(),
836		],
837	)
838	.map_err(|e| CdcError::Internal(format!("drop straddle rewrite: {e}")))?;
839	Ok(())
840}
841
842#[inline]
843fn extend_dropped_entries(out: &mut Vec<DroppedCdcEntry>, system_changes: &[SystemChange]) {
844	for sys_change in system_changes {
845		out.push(DroppedCdcEntry {
846			key: sys_change.key().clone(),
847			value_bytes: sys_change.value_bytes() as u64,
848		});
849	}
850}
851
852#[inline]
853fn delete_compacted_versions(
854	tx: &Transaction<'_>,
855	version_blobs: &[Vec<u8>],
856	expected_count: usize,
857) -> CdcStorageResult<bool> {
858	let placeholders = repeat_n("?", version_blobs.len()).collect::<Vec<_>>().join(",");
859	let del_sql = format!(r#"DELETE FROM "cdc" WHERE version IN ({})"#, placeholders);
860	let del_params: Vec<SqlValue> = version_blobs.iter().map(|b| SqlValue::Blob(b.clone())).collect();
861	let mut del_stmt =
862		tx.prepare(&del_sql).map_err(|e| CdcError::Internal(format!("compact delete prepare: {e}")))?;
863	let rows_deleted = del_stmt
864		.execute(params_from_iter(del_params.iter()))
865		.map_err(|e| CdcError::Internal(format!("compact delete execute: {e}")))?;
866	Ok(rows_deleted == expected_count)
867}
868
869#[inline]
870fn insert_compacted_block(
871	tx: &Transaction<'_>,
872	payload: &[u8],
873	min_version: CommitVersion,
874	max_version: CommitVersion,
875	min_ts_nanos: i64,
876	max_ts_nanos: i64,
877	num_entries: usize,
878) -> CdcStorageResult<()> {
879	tx.execute(
880		r#"INSERT INTO "cdc_block"
881		   (max_version, min_version, min_timestamp, max_timestamp, num_entries, payload)
882		   VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
883		params![
884			version_to_bytes(max_version).as_slice(),
885			version_to_bytes(min_version).as_slice(),
886			min_ts_nanos,
887			max_ts_nanos,
888			num_entries as i64,
889			payload,
890		],
891	)
892	.map_err(|e| CdcError::Internal(format!("compact insert block: {e}")))?;
893	Ok(())
894}
895
896impl CdcStorage for SqliteCdcStorage {
897	fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
898		let bytes = to_stdvec(cdc).map_err(|e| CdcError::Codec(format!("postcard encode: {e}")))?;
899		let conn = self.inner.conn.lock();
900		conn.execute(
901			r#"INSERT OR REPLACE INTO "cdc" (version, payload) VALUES (?1, ?2)"#,
902			params![version_to_bytes(cdc.version).as_slice(), bytes.as_slice()],
903		)
904		.map_err(|e| CdcError::Internal(format!("insert cdc: {e}")))?;
905		Ok(())
906	}
907
908	fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
909		if let Some(cdc) = self.read_live(version)? {
910			return Ok(Some(cdc));
911		}
912		self.read_from_blocks(version)
913	}
914
915	fn read_range(
916		&self,
917		start: Bound<CommitVersion>,
918		end: Bound<CommitVersion>,
919		batch_size: u64,
920	) -> CdcStorageResult<CdcBatch> {
921		let Some((lo_inc, hi_inc)) = normalize_range_inclusive(start, end) else {
922			return Ok(CdcBatch {
923				items: Vec::new(),
924				has_more: false,
925			});
926		};
927		let want = batch_size as usize;
928
929		let (block_rows, live_payloads) = self.snapshot_block_and_live(lo_inc, hi_inc, batch_size)?;
930		let block_items = self.decode_block_rows(block_rows, lo_inc, hi_inc)?;
931		let live_items = decode_live_payloads(live_payloads)?;
932		let mut merged = merge_block_and_live(block_items, live_items);
933
934		let has_more = merged.len() > want;
935		merged.truncate(want);
936		Ok(CdcBatch {
937			items: merged,
938			has_more,
939		})
940	}
941
942	fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
943		Ok(self.read(version)?.map(|c| c.system_changes.len()).unwrap_or(0))
944	}
945
946	fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
947		let conn = self.inner.conn.lock();
948		let block_min = query_min_block(&conn)?;
949		let live_min = query_min_live(&conn)?;
950		Ok([block_min, live_min].into_iter().flatten().min())
951	}
952
953	fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
954		let conn = self.inner.conn.lock();
955		if let Some(v) = query_max_live(&conn)? {
956			return Ok(Some(v));
957		}
958		query_max_block(&conn)
959	}
960
961	fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
962		let conn = self.inner.conn.lock();
963		let version_bytes = version_to_bytes(version);
964		let zstd_level = self.inner.last_zstd_level.load(Ordering::Relaxed);
965
966		let full_blocks = self.scan_full_blocks_below(&conn, &version_bytes)?;
967		let straddle = self.scan_straddle_blocks(&conn, version, &version_bytes)?;
968		let live = scan_live_rows_below(&conn, &version_bytes)?;
969
970		apply_drop_before(&conn, &full_blocks.pks, &straddle.actions, &version_bytes, zstd_level)?;
971		let _ = pragma::incremental_vacuum(&conn);
972
973		let mut entries = full_blocks.entries;
974		entries.extend(straddle.entries);
975		entries.extend(live.entries);
976		Ok(DropBeforeResult {
977			count: full_blocks.cdc_count + straddle.cdc_count + live.cdc_count,
978			entries,
979		})
980	}
981
982	fn find_ttl_cutoff(&self, cutoff: DateTime) -> CdcStorageResult<Option<CommitVersion>> {
983		let cutoff_nanos = datetime_to_nanos(&cutoff);
984		if let Some(v) = self.try_block_index_cutoff(cutoff_nanos)? {
985			return Ok(Some(v));
986		}
987		let Some(start) = self.pick_scan_start()? else {
988			return self.max_version_blocks().map(|opt| opt.map(|v| CommitVersion(v.0.saturating_add(1))));
989		};
990		self.scan_live_for_cutoff(start, cutoff_nanos)
991	}
992}
993
994impl SqliteCdcStorage {
995	#[inline]
996	fn try_block_index_cutoff(&self, cutoff_nanos: i64) -> CdcStorageResult<Option<CommitVersion>> {
997		let block_hit: Option<Vec<u8>> = {
998			let conn = self.inner.conn.lock();
999			conn.query_row(
1000				r#"SELECT min_version FROM "cdc_block"
1001				   WHERE max_timestamp >= ?1 ORDER BY max_timestamp ASC LIMIT 1"#,
1002				params![cutoff_nanos],
1003				|row| row.get::<_, Vec<u8>>(0),
1004			)
1005			.ok()
1006		};
1007		block_hit.map(|b| bytes_to_version(&b)).transpose()
1008	}
1009
1010	#[inline]
1011	fn pick_scan_start(&self) -> CdcStorageResult<Option<CommitVersion>> {
1012		self.min_version_live()
1013	}
1014
1015	#[inline]
1016	fn scan_live_for_cutoff(
1017		&self,
1018		start: CommitVersion,
1019		cutoff_nanos: i64,
1020	) -> CdcStorageResult<Option<CommitVersion>> {
1021		let mut next_start = Bound::Included(start);
1022		loop {
1023			let batch = self.read_range_live(next_start, Bound::Unbounded, 256)?;
1024			if batch.items.is_empty() {
1025				let last = self.max_version()?.unwrap_or(CommitVersion(0));
1026				return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1027			}
1028			for cdc in &batch.items {
1029				if datetime_to_nanos(&cdc.timestamp) >= cutoff_nanos {
1030					return Ok(Some(cdc.version));
1031				}
1032			}
1033			if !batch.has_more {
1034				let last = batch.items.last().unwrap().version;
1035				return Ok(Some(CommitVersion(last.0.saturating_add(1))));
1036			}
1037			next_start = Bound::Excluded(batch.items.last().unwrap().version);
1038		}
1039	}
1040
1041	#[inline]
1042	fn read_live(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
1043		let conn = self.inner.conn.lock();
1044		let result = conn.query_row(
1045			r#"SELECT payload FROM "cdc" WHERE version = ?1"#,
1046			params![version_to_bytes(version).as_slice()],
1047			|row| row.get::<_, Vec<u8>>(0),
1048		);
1049		match result {
1050			Ok(bytes) => {
1051				let cdc: Cdc = from_bytes(&bytes)
1052					.map_err(|e| CdcError::Codec(format!("postcard decode: {e}")))?;
1053				Ok(Some(cdc))
1054			}
1055			Err(QueryReturnedNoRows) => Ok(None),
1056			Err(e) => Err(CdcError::Internal(format!("read cdc: {e}"))),
1057		}
1058	}
1059}