Skip to main content

reifydb_store_multi/hot/sqlite/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::ops::Bound;
5
6use reifydb_core::common::CommitVersion;
7use rusqlite::{Result as SqliteResult, ToSql, types};
8
9#[inline]
10pub(super) fn version_to_bytes(version: CommitVersion) -> [u8; 8] {
11	version.0.to_be_bytes()
12}
13
14/// Build a range query that returns the latest version <= requested version for each key.
15///
16/// Uses a subquery with window function to get the most recent version per key:
17/// ```sql
18/// SELECT key, version, value FROM (
19///     SELECT key, version, value,
20///            ROW_NUMBER() OVER (PARTITION BY key ORDER BY version DESC) as rn
21///     FROM "{table}" WHERE key >= ?1 AND key < ?2 AND version <= ?3
22/// ) WHERE rn = 1 ORDER BY key LIMIT ?4
23/// ```
24pub(super) fn build_versioned_range_query(
25	table_name: &str,
26	start: Bound<&[u8]>,
27	end: Bound<&[u8]>,
28	version: CommitVersion,
29	reverse: bool,
30	limit: usize,
31) -> (String, Vec<QueryParam>) {
32	let mut conditions = Vec::new();
33	let mut params: Vec<QueryParam> = Vec::new();
34
35	match start {
36		Bound::Included(v) => {
37			conditions.push(format!("key >= ?{}", params.len() + 1));
38			params.push(QueryParam::Blob(v.to_vec()));
39		}
40		Bound::Excluded(v) => {
41			conditions.push(format!("key > ?{}", params.len() + 1));
42			params.push(QueryParam::Blob(v.to_vec()));
43		}
44		Bound::Unbounded => {}
45	}
46
47	match end {
48		Bound::Included(v) => {
49			conditions.push(format!("key <= ?{}", params.len() + 1));
50			params.push(QueryParam::Blob(v.to_vec()));
51		}
52		Bound::Excluded(v) => {
53			conditions.push(format!("key < ?{}", params.len() + 1));
54			params.push(QueryParam::Blob(v.to_vec()));
55		}
56		Bound::Unbounded => {}
57	}
58
59	conditions.push(format!("version <= ?{}", params.len() + 1));
60	params.push(QueryParam::Version(version_to_bytes(version)));
61
62	let where_clause = format!(" WHERE {}", conditions.join(" AND "));
63
64	let order = if reverse {
65		"DESC"
66	} else {
67		"ASC"
68	};
69
70	// Use window function to get the most recent version per key
71	let query = format!(
72		"SELECT key, version, value FROM (\
73			SELECT key, version, value, \
74				ROW_NUMBER() OVER (PARTITION BY key ORDER BY version DESC) as rn \
75			FROM \"{}\"{}\
76		) WHERE rn = 1 ORDER BY key {} LIMIT {}",
77		table_name, where_clause, order, limit
78	);
79
80	(query, params)
81}
82
83/// Query parameter type for SQLite queries.
84#[derive(Debug, Clone)]
85pub(super) enum QueryParam {
86	Blob(Vec<u8>),
87	Version([u8; 8]),
88}
89
90impl ToSql for QueryParam {
91	fn to_sql(&self) -> SqliteResult<types::ToSqlOutput<'_>> {
92		match self {
93			QueryParam::Blob(v) => v.to_sql(),
94			QueryParam::Version(v) => v.as_slice().to_sql(),
95		}
96	}
97}