Skip to main content

reifydb_store_multi/hot/sqlite/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! SQL query builders for SQLite backend with MVCC versioning.
5//!
6//! The table schema uses (key, version) composite primary key:
7//! ```sql
8//! CREATE TABLE IF NOT EXISTS "{table_name}" (
9//!     key BLOB NOT NULL,
10//!     version BLOB NOT NULL,
11//!     value BLOB,
12//!     PRIMARY KEY (key, version)
13//! ) WITHOUT ROWID;
14//! ```
15
16use std::ops::Bound;
17
18use reifydb_core::common::CommitVersion;
19use rusqlite::{Result as SqliteResult, ToSql, types};
20
21#[inline]
22pub(super) fn version_to_bytes(version: CommitVersion) -> [u8; 8] {
23	version.0.to_be_bytes()
24}
25
26/// Build a range query that returns the latest version <= requested version for each key.
27///
28/// Uses a subquery with window function to get the most recent version per key:
29/// ```sql
30/// SELECT key, version, value FROM (
31///     SELECT key, version, value,
32///            ROW_NUMBER() OVER (PARTITION BY key ORDER BY version DESC) as rn
33///     FROM "{table}" WHERE key >= ?1 AND key < ?2 AND version <= ?3
34/// ) WHERE rn = 1 ORDER BY key LIMIT ?4
35/// ```
36pub(super) fn build_versioned_range_query(
37	table_name: &str,
38	start: Bound<&[u8]>,
39	end: Bound<&[u8]>,
40	version: CommitVersion,
41	reverse: bool,
42	limit: usize,
43) -> (String, Vec<QueryParam>) {
44	let mut conditions = Vec::new();
45	let mut params: Vec<QueryParam> = Vec::new();
46
47	match start {
48		Bound::Included(v) => {
49			conditions.push(format!("key >= ?{}", params.len() + 1));
50			params.push(QueryParam::Blob(v.to_vec()));
51		}
52		Bound::Excluded(v) => {
53			conditions.push(format!("key > ?{}", params.len() + 1));
54			params.push(QueryParam::Blob(v.to_vec()));
55		}
56		Bound::Unbounded => {}
57	}
58
59	match end {
60		Bound::Included(v) => {
61			conditions.push(format!("key <= ?{}", params.len() + 1));
62			params.push(QueryParam::Blob(v.to_vec()));
63		}
64		Bound::Excluded(v) => {
65			conditions.push(format!("key < ?{}", params.len() + 1));
66			params.push(QueryParam::Blob(v.to_vec()));
67		}
68		Bound::Unbounded => {}
69	}
70
71	conditions.push(format!("version <= ?{}", params.len() + 1));
72	params.push(QueryParam::Version(version_to_bytes(version)));
73
74	let where_clause = format!(" WHERE {}", conditions.join(" AND "));
75
76	let order = if reverse {
77		"DESC"
78	} else {
79		"ASC"
80	};
81
82	// Use window function to get the most recent version per key
83	let query = format!(
84		"SELECT key, version, value FROM (\
85			SELECT key, version, value, \
86				ROW_NUMBER() OVER (PARTITION BY key ORDER BY version DESC) as rn \
87			FROM \"{}\"{}\
88		) WHERE rn = 1 ORDER BY key {} LIMIT {}",
89		table_name, where_clause, order, limit
90	);
91
92	(query, params)
93}
94
95/// Query parameter type for SQLite queries.
96#[derive(Debug, Clone)]
97pub(super) enum QueryParam {
98	Blob(Vec<u8>),
99	Version([u8; 8]),
100}
101
102impl ToSql for QueryParam {
103	fn to_sql(&self) -> SqliteResult<types::ToSqlOutput<'_>> {
104		match self {
105			QueryParam::Blob(v) => v.to_sql(),
106			QueryParam::Version(v) => v.as_slice().to_sql(),
107		}
108	}
109}