1use std::{collections::HashMap, ops::Bound, sync::Arc};
5
6use reifydb_core::{
7 common::CommitVersion, encoded::key::EncodedKey, error::diagnostic::internal::internal,
8 interface::store::EntryKind,
9};
10use reifydb_runtime::sync::mutex::Mutex;
11use reifydb_sqlite::{
12 SqliteConfig,
13 connection::{connect, convert_flags, resolve_db_path},
14 pragma,
15};
16use reifydb_type::{Result, error, util::cowvec::CowVec};
17use rusqlite::{Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, params, params_from_iter};
18use tracing::instrument;
19
20use super::{
21 entry::warm_current_table_name,
22 query::{
23 build_create_warm_current_sql, build_get_warm_current_sql, build_range_warm_current_sql,
24 build_upsert_warm_current_sql, version_from_bytes, version_to_bytes,
25 },
26};
27use crate::tier::{HistoricalCursor, RangeBatch, RangeCursor, RawEntry, TierBackend, TierBatch, TierStorage};
28
29#[derive(Clone)]
30pub struct SqlitePersistentStorage {
31 inner: Arc<SqlitePersistentStorageInner>,
32}
33
34struct SqlitePersistentStorageInner {
35 conn: Mutex<Connection>,
36}
37
38impl SqlitePersistentStorage {
39 #[instrument(name = "store::multi::persistent::sqlite::new", level = "debug", skip(config), fields(
40 db_path = ?config.path,
41 page_size = config.page_size,
42 journal_mode = %config.journal_mode.as_str()
43 ))]
44 pub fn new(config: SqliteConfig) -> Self {
45 let db_path = resolve_db_path(config.path.clone(), "persistent.db");
46 let flags = convert_flags(&config.flags);
47
48 let conn = connect(&db_path, flags).expect("Failed to connect to persistent database");
49 pragma::apply(&conn, &config).expect("Failed to configure persistent SQLite pragmas");
50
51 Self {
52 inner: Arc::new(SqlitePersistentStorageInner {
53 conn: Mutex::new(conn),
54 }),
55 }
56 }
57
58 pub fn in_memory() -> Self {
59 Self::new(SqliteConfig::in_memory())
60 }
61
62 pub fn count_current(&self, table: EntryKind) -> Result<u64> {
63 let table_name = warm_current_table_name(table);
64 let conn = self.inner.conn.lock();
65 let sql = format!("SELECT COUNT(*) FROM \"{}\"", table_name);
66 match conn.query_row(&sql, [], |row| row.get::<_, i64>(0)) {
67 Ok(c) => Ok(c as u64),
68 Err(e) if e.to_string().contains("no such table") => Ok(0),
69 Err(e) => Err(error!(internal(format!("Failed to count persistent current: {}", e)))),
70 }
71 }
72
73 fn create_table_if_needed(conn: &Connection, table_name: &str) -> SqliteResult<()> {
74 conn.execute(&build_create_warm_current_sql(table_name), [])?;
75 Ok(())
76 }
77
78 fn range_chunk(&self, cursor: &mut RangeCursor, req: RangeChunkRequest<'_>) -> Result<RangeBatch> {
79 if cursor.exhausted {
80 return Ok(RangeBatch::empty());
81 }
82
83 let table_name = warm_current_table_name(req.table);
84 let conn = self.inner.conn.lock();
85
86 let sql = build_range_warm_current_sql(
87 &table_name,
88 bound_shape(req.start),
89 bound_shape(req.end),
90 cursor.last_key.is_some(),
91 req.descending,
92 );
93
94 let mut stmt = match conn.prepare_cached(&sql) {
95 Ok(s) => s,
96 Err(e) if e.to_string().contains("no such table") => {
97 cursor.exhausted = true;
98 return Ok(RangeBatch::empty());
99 }
100 Err(e) => return Err(error!(internal(format!("Failed to prepare persistent range: {}", e)))),
101 };
102
103 let version_bytes = version_to_bytes(req.version).to_vec();
104 let limit_i64 = req.batch_size as i64;
105 let mut params: Vec<Box<dyn ToSql>> = Vec::new();
106 match req.start {
107 Bound::Included(s) | Bound::Excluded(s) => params.push(Box::new(s.to_vec())),
108 Bound::Unbounded => {}
109 }
110 match req.end {
111 Bound::Included(e) | Bound::Excluded(e) => params.push(Box::new(e.to_vec())),
112 Bound::Unbounded => {}
113 }
114 if let Some(k) = cursor.last_key.as_deref() {
115 params.push(Box::new(k.to_vec()));
116 }
117 params.push(Box::new(version_bytes));
118 params.push(Box::new(limit_i64));
119
120 let entries = match stmt.query_map(params_from_iter(params), |row| {
121 let key: Vec<u8> = row.get(0)?;
122 let version_blob: Vec<u8> = row.get(1)?;
123 let value: Option<Vec<u8>> = row.get(2)?;
124 Ok(RawEntry {
125 key: EncodedKey::new(key),
126 version: version_from_bytes(&version_blob),
127 value: value.map(CowVec::new),
128 })
129 }) {
130 Ok(rows) => rows
131 .collect::<SqliteResult<Vec<_>>>()
132 .map_err(|e| error!(internal(format!("Failed to read persistent row: {}", e))))?,
133 Err(e) if e.to_string().contains("no such table") => {
134 cursor.exhausted = true;
135 return Ok(RangeBatch::empty());
136 }
137 Err(e) => return Err(error!(internal(format!("Failed to scan persistent range: {}", e)))),
138 };
139
140 if entries.len() < req.batch_size {
141 cursor.exhausted = true;
142 }
143 if let Some(last) = entries.last() {
144 cursor.last_key = Some(last.key.clone());
145 }
146
147 let has_more = !cursor.exhausted;
148 Ok(RangeBatch {
149 entries,
150 has_more,
151 })
152 }
153}
154
155fn bound_shape(b: Bound<&[u8]>) -> Bound<()> {
156 match b {
157 Bound::Included(_) => Bound::Included(()),
158 Bound::Excluded(_) => Bound::Excluded(()),
159 Bound::Unbounded => Bound::Unbounded,
160 }
161}
162
163struct RangeChunkRequest<'a> {
164 table: EntryKind,
165 start: Bound<&'a [u8]>,
166 end: Bound<&'a [u8]>,
167 version: CommitVersion,
168 batch_size: usize,
169 descending: bool,
170}
171
172impl TierStorage for SqlitePersistentStorage {
173 #[instrument(name = "store::multi::persistent::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
174 fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
175 let table_name = warm_current_table_name(table);
176 let conn = self.inner.conn.lock();
177 let sql = build_get_warm_current_sql(&table_name);
178
179 let result = match conn.prepare_cached(&sql) {
180 Ok(mut stmt) => stmt.query_row(params![key], |row| {
181 let version_bytes: Vec<u8> = row.get(0)?;
182 let value: Option<Vec<u8>> = row.get(1)?;
183 Ok((version_from_bytes(&version_bytes), value))
184 }),
185 Err(e) if e.to_string().contains("no such table") => Err(QueryReturnedNoRows),
186 Err(e) => return Err(error!(internal(format!("Failed to prepare persistent get: {}", e)))),
187 };
188
189 match result {
190 Ok((stored_version, value)) if stored_version <= version => Ok(value.map(CowVec::new)),
191 Ok(_) => Ok(None),
192 Err(QueryReturnedNoRows) => Ok(None),
193 Err(e) if e.to_string().contains("no such table") => Ok(None),
194 Err(e) => Err(error!(internal(format!("Failed to read persistent: {}", e)))),
195 }
196 }
197
198 #[instrument(name = "store::multi::persistent::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
199 fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()> {
200 if batches.is_empty() {
201 return Ok(());
202 }
203
204 let conn = self.inner.conn.lock();
205 let tx = conn
206 .unchecked_transaction()
207 .map_err(|e| error!(internal(format!("Failed to start persistent transaction: {}", e))))?;
208
209 let new_version_bytes = version_to_bytes(version);
210
211 for (table, entries) in batches {
212 let table_name = warm_current_table_name(table);
213 Self::create_table_if_needed(&tx, &table_name)
214 .map_err(|e| error!(internal(format!("Failed to ensure persistent table: {}", e))))?;
215
216 let upsert_sql = build_upsert_warm_current_sql(&table_name);
217 let mut stmt = tx
218 .prepare_cached(&upsert_sql)
219 .map_err(|e| error!(internal(format!("Failed to prepare persistent upsert: {}", e))))?;
220
221 for (key, value) in entries {
222 let key_slice = key.as_slice();
223 let value_slice = value.as_ref().map(|v| v.as_slice());
224 stmt.execute(params![key_slice, new_version_bytes.as_slice(), value_slice]).map_err(
225 |e| error!(internal(format!("Failed to upsert persistent row: {}", e))),
226 )?;
227 }
228 }
229
230 tx.commit().map_err(|e| error!(internal(format!("Failed to commit persistent transaction: {}", e))))
231 }
232
233 fn range_next(
234 &self,
235 table: EntryKind,
236 cursor: &mut RangeCursor,
237 start: Bound<&[u8]>,
238 end: Bound<&[u8]>,
239 version: CommitVersion,
240 batch_size: usize,
241 ) -> Result<RangeBatch> {
242 self.range_chunk(
243 cursor,
244 RangeChunkRequest {
245 table,
246 start,
247 end,
248 version,
249 batch_size,
250 descending: false,
251 },
252 )
253 }
254
255 fn range_rev_next(
256 &self,
257 table: EntryKind,
258 cursor: &mut RangeCursor,
259 start: Bound<&[u8]>,
260 end: Bound<&[u8]>,
261 version: CommitVersion,
262 batch_size: usize,
263 ) -> Result<RangeBatch> {
264 self.range_chunk(
265 cursor,
266 RangeChunkRequest {
267 table,
268 start,
269 end,
270 version,
271 batch_size,
272 descending: true,
273 },
274 )
275 }
276
277 fn ensure_table(&self, table: EntryKind) -> Result<()> {
278 let table_name = warm_current_table_name(table);
279 let conn = self.inner.conn.lock();
280 Self::create_table_if_needed(&conn, &table_name)
281 .map_err(|e| error!(internal(format!("Failed to ensure persistent table: {}", e))))
282 }
283
284 fn clear_table(&self, table: EntryKind) -> Result<()> {
285 let table_name = warm_current_table_name(table);
286 let conn = self.inner.conn.lock();
287 let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
288 if let Err(e) = result
289 && !e.to_string().contains("no such table")
290 {
291 return Err(error!(internal(format!("Failed to clear persistent {}: {}", table_name, e))));
292 }
293 Ok(())
294 }
295
296 fn drop(&self, _batches: HashMap<EntryKind, Vec<(EncodedKey, CommitVersion)>>) -> Result<()> {
297 panic!("SqlitePersistentStorage::drop: persistent tier has no historical chain to drop versions from");
300 }
301
302 fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
303 let table_name = warm_current_table_name(table);
306 let conn = self.inner.conn.lock();
307 let sql = build_get_warm_current_sql(&table_name);
308
309 let result = match conn.prepare_cached(&sql) {
310 Ok(mut stmt) => stmt.query_row(params![key], |row| {
311 let version_bytes: Vec<u8> = row.get(0)?;
312 let value: Option<Vec<u8>> = row.get(1)?;
313 Ok((version_from_bytes(&version_bytes), value.map(CowVec::new)))
314 }),
315 Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
316 Err(e) => {
317 return Err(error!(internal(format!(
318 "Failed to prepare persistent get_all_versions: {}",
319 e
320 ))));
321 }
322 };
323
324 match result {
325 Ok(row) => Ok(vec![row]),
326 Err(QueryReturnedNoRows) => Ok(Vec::new()),
327 Err(e) if e.to_string().contains("no such table") => Ok(Vec::new()),
328 Err(e) => Err(error!(internal(format!("Failed to read persistent versions: {}", e)))),
329 }
330 }
331
332 fn scan_historical_below(
333 &self,
334 _table: EntryKind,
335 _cutoff: CommitVersion,
336 _cursor: &mut HistoricalCursor,
337 _batch_size: usize,
338 ) -> Result<Vec<(EncodedKey, CommitVersion)>> {
339 panic!("SqlitePersistentStorage::scan_historical_below: persistent tier has no historical chain");
342 }
343}
344
345impl TierBackend for SqlitePersistentStorage {}