1use std::collections::BTreeMap;
4use std::path::Path;
5use std::sync::Arc;
6
7use ankurah_core::entity::TemporaryEntity;
8use ankurah_core::error::{MutationError, RetrievalError};
9use ankurah_core::property::backend::backend_from_string;
10use ankurah_core::selection::filter::evaluate_predicate;
11use ankurah_core::storage::{StorageCollection, StorageEngine};
12use ankurah_proto::{
13 AttestationSet, Attested, Clock, CollectionId, EntityId, EntityState, Event, EventId, OperationSet, State, StateBuffers,
14};
15use async_trait::async_trait;
16use rusqlite::{params_from_iter, Connection};
17use tracing::{debug, warn};
18
19use crate::connection::{PooledConnection, SqliteConnectionManager};
20use crate::error::SqliteError;
21use crate::sql_builder::{split_predicate_for_sqlite, SqlBuilder};
22use crate::value::SqliteValue;
23
/// Default maximum number of pooled connections for file-backed databases.
pub const DEFAULT_POOL_SIZE: u32 = 10;

/// SQLite-backed implementation of [`StorageEngine`], holding a bb8 connection pool.
pub struct SqliteStorageEngine {
    pool: bb8::Pool<SqliteConnectionManager>,
}
31
32impl SqliteStorageEngine {
33 pub fn new(pool: bb8::Pool<SqliteConnectionManager>) -> Self { Self { pool } }
35
36 pub async fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
38 let manager = SqliteConnectionManager::file(path.as_ref());
39 let pool = bb8::Pool::builder().max_size(DEFAULT_POOL_SIZE).build(manager).await?;
40 Ok(Self::new(pool))
41 }
42
43 pub async fn open_in_memory() -> anyhow::Result<Self> {
45 let manager = SqliteConnectionManager::memory();
46 let pool = bb8::Pool::builder().max_size(1).build(manager).await?;
48 Ok(Self::new(pool))
49 }
50
51 pub fn sane_name(collection: &str) -> bool {
53 for char in collection.chars() {
54 match char {
55 c if c.is_alphanumeric() => {}
56 '_' | '.' | ':' => {}
57 _ => return false,
58 }
59 }
60 true
61 }
62
63 pub fn pool(&self) -> &bb8::Pool<SqliteConnectionManager> { &self.pool }
65}
66
67#[async_trait]
68impl StorageEngine for SqliteStorageEngine {
69 type Value = SqliteValue;
70
71 async fn collection(&self, collection_id: &CollectionId) -> Result<Arc<dyn StorageCollection>, RetrievalError> {
72 if !Self::sane_name(collection_id.as_str()) {
73 return Err(RetrievalError::InvalidBucketName);
74 }
75
76 let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;
77
78 let bucket = SqliteBucket::new(self.pool.clone(), collection_id.clone());
79
80 let collection_id_clone = collection_id.clone();
82 conn.with_connection(move |c| {
83 create_state_table(c, &collection_id_clone)?;
84 create_event_table(c, &collection_id_clone)?;
85 Ok(())
86 })
87 .await?;
88
89 bucket.rebuild_columns_cache(&conn).await?;
91
92 Ok(Arc::new(bucket))
93 }
94
95 async fn delete_all_collections(&self) -> Result<bool, MutationError> {
96 let conn = self.pool.get().await.map_err(|e| MutationError::General(Box::new(SqliteError::Pool(e.to_string()))))?;
97
98 conn.with_connection(|c| {
99 let mut stmt = c.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")?;
101 let tables: Vec<String> = stmt.query_map([], |row| row.get(0))?.filter_map(|r| r.ok()).collect();
102
103 if tables.is_empty() {
104 return Ok(false);
105 }
106
107 for table in tables {
108 c.execute(&format!(r#"DROP TABLE IF EXISTS "{}""#, table), [])?;
109 }
110
111 Ok(true)
112 })
113 .await
114 .map_err(|e| MutationError::General(Box::new(e)))
115 }
116}
117
118fn create_state_table(conn: &Connection, collection_id: &CollectionId) -> Result<(), SqliteError> {
119 let table_name = collection_id.as_str();
120 let query = format!(
121 r#"CREATE TABLE IF NOT EXISTS "{}"(
122 "id" TEXT PRIMARY KEY,
123 "state_buffer" BLOB NOT NULL,
124 "head" TEXT NOT NULL,
125 "attestations" BLOB
126 )"#,
127 table_name
128 );
129 debug!("Creating state table: {}", query);
130 conn.execute(&query, [])?;
131 Ok(())
132}
133
134fn create_event_table(conn: &Connection, collection_id: &CollectionId) -> Result<(), SqliteError> {
135 let table_name = format!("{}_event", collection_id.as_str());
136 let query = format!(
137 r#"CREATE TABLE IF NOT EXISTS "{}"(
138 "id" TEXT PRIMARY KEY,
139 "entity_id" TEXT,
140 "operations" BLOB,
141 "parent" TEXT,
142 "attestations" BLOB
143 )"#,
144 table_name
145 );
146 debug!("Creating event table: {}", query);
147 conn.execute(&query, [])?;
148
149 let index_query = format!(r#"CREATE INDEX IF NOT EXISTS "{}_entity_id_idx" ON "{}"("entity_id")"#, table_name, table_name);
151 conn.execute(&index_query, [])?;
152
153 Ok(())
154}
155
/// A single column of a state table, as reported by `PRAGMA table_info`.
#[derive(Clone, Debug)]
pub struct SqliteColumn {
    /// Column name.
    pub name: String,
    /// Declared SQLite type (kept for debugging; not currently read).
    #[allow(dead_code)]
    pub data_type: String,
}
163
/// Per-collection storage handle over the shared connection pool.
pub struct SqliteBucket {
    pool: bb8::Pool<SqliteConnectionManager>,
    collection_id: CollectionId,
    // Cached table names so the hot paths avoid re-formatting them.
    state_table_name: String,
    event_table_name: String,
    // In-memory cache of the state table's columns; refreshed from
    // PRAGMA table_info when unknown columns are encountered.
    columns: Arc<std::sync::RwLock<Vec<SqliteColumn>>>,
    // Serializes ALTER TABLE so concurrent writers don't race on DDL.
    // NOTE(review): the lock is per-bucket instance; two buckets for the same
    // collection would not share it — confirm `collection()` callers reuse handles.
    ddl_lock: Arc<tokio::sync::Mutex<()>>,
}
175
176impl SqliteBucket {
177 fn new(pool: bb8::Pool<SqliteConnectionManager>, collection_id: CollectionId) -> Self {
179 let state_table_name = collection_id.as_str().to_string();
180 let event_table_name = format!("{}_event", collection_id.as_str());
181 Self {
182 pool,
183 collection_id,
184 state_table_name,
185 event_table_name,
186 columns: Arc::new(std::sync::RwLock::new(Vec::new())),
187 ddl_lock: Arc::new(tokio::sync::Mutex::new(())),
188 }
189 }
190
191 #[inline]
192 fn state_table(&self) -> &str { &self.state_table_name }
193
194 #[inline]
195 fn event_table(&self) -> &str { &self.event_table_name }
196
197 pub fn existing_columns(&self) -> Vec<String> {
199 let columns = self.columns.read().expect("RwLock poisoned");
200 columns.iter().map(|c| c.name.clone()).collect()
201 }
202
203 pub fn has_column(&self, name: &str) -> bool {
205 let columns = self.columns.read().expect("RwLock poisoned");
206 columns.iter().any(|c| c.name == name)
207 }
208
209 async fn rebuild_columns_cache(&self, conn: &PooledConnection) -> Result<(), SqliteError> {
210 let table_name = self.state_table().to_owned();
211 let new_columns = conn
212 .with_connection(move |c| {
213 let mut stmt = c.prepare(&format!("PRAGMA table_info(\"{}\")", table_name))?;
214 let columns: Vec<SqliteColumn> = stmt
215 .query_map([], |row| Ok(SqliteColumn { name: row.get(1)?, data_type: row.get(2)? }))?
216 .filter_map(|r| r.ok())
217 .collect();
218 Ok(columns)
219 })
220 .await?;
221
222 let mut columns = self.columns.write().expect("RwLock poisoned");
223 *columns = new_columns;
224 Ok(())
225 }
226
227 async fn add_missing_columns(&self, conn: &PooledConnection, missing: Vec<(String, &'static str)>) -> Result<(), SqliteError> {
228 if missing.is_empty() {
229 return Ok(());
230 }
231
232 let _lock = self.ddl_lock.lock().await;
234
235 self.rebuild_columns_cache(conn).await?;
237
238 let table_name = self.state_table();
239 for (column, datatype) in missing {
240 if SqliteStorageEngine::sane_name(&column) && !self.has_column(&column) {
241 let alter_query = format!(r#"ALTER TABLE "{}" ADD COLUMN "{}" {}"#, table_name, column, datatype);
242 debug!("Adding column: {}", alter_query);
243
244 let query = alter_query.clone();
245 conn.with_connection(move |c| {
246 c.execute(&query, [])?;
247 Ok(())
248 })
249 .await?;
250 }
251 }
252
253 self.rebuild_columns_cache(conn).await?;
254 Ok(())
255 }
256}
257
#[async_trait]
impl StorageCollection for SqliteBucket {
    /// Upsert the full state row for an entity.
    ///
    /// Besides the opaque `state_buffer`/`head`/`attestations` columns, each
    /// property value exposed by the state's backends is materialized into its
    /// own column (created on demand) so SQL predicates can filter on it.
    ///
    /// Returns `true` when the row is new or its stored head clock changed.
    async fn set_state(&self, state: Attested<EntityState>) -> Result<bool, MutationError> {
        let conn = self.pool.get().await.map_err(|e| MutationError::General(Box::new(SqliteError::Pool(e.to_string()))))?;

        if state.payload.state.head.is_empty() {
            warn!("Warning: Empty head detected for entity {}", state.payload.entity_id);
        }

        // Opaque payloads: buffers/attestations via bincode, head clock as JSON
        // text (the JSON form is also what gets compared for change detection).
        let state_buffers = bincode::serialize(&state.payload.state.state_buffers)?;
        let head_json = serde_json::to_string(&state.payload.state.head).map_err(|e| MutationError::General(Box::new(e)))?;
        let attestations_blob = bincode::serialize(&state.attestations)?;
        let id = state.payload.entity_id.to_base64();
        let id_clone = id.clone();
        // (column name, value, value-is-jsonb) triples for materialized columns.
        let mut materialized: Vec<(String, Option<SqliteValue>, bool)> = Vec::new();
        // Deduplicate property names across backends: the first backend wins.
        let mut seen_properties = std::collections::HashSet::new();

        for (name, state_buffer) in state.payload.state.state_buffers.iter() {
            let backend = backend_from_string(name, Some(state_buffer))?;
            for (column, value) in backend.property_values() {
                if !seen_properties.insert(column.clone()) {
                    continue;
                }

                let sqlite_value: Option<SqliteValue> = value.map(|v| v.into());
                let is_jsonb = sqlite_value.as_ref().is_some_and(|v| v.is_jsonb());

                if !self.has_column(&column) {
                    if let Some(ref sv) = sqlite_value {
                        // Column type is inferred from the first non-null value seen.
                        self.add_missing_columns(&conn, vec![(column.clone(), sv.sqlite_type())]).await?;
                    } else {
                        // NULL value and no existing column: nothing to store.
                        continue;
                    }
                }

                materialized.push((column, sqlite_value, is_jsonb));
            }
        }

        // Fixed columns present in every state row, in insertion order.
        const BASE_COLUMNS: &[&str] = &["id", "state_buffer", "head", "attestations"];

        let table_name = self.state_table();
        let num_columns = BASE_COLUMNS.len() + materialized.len();
        let mut columns: Vec<&str> = Vec::with_capacity(num_columns);
        columns.extend_from_slice(BASE_COLUMNS);

        let mut values: Vec<rusqlite::types::Value> = Vec::with_capacity(num_columns);
        values.push(rusqlite::types::Value::Text(id));
        values.push(rusqlite::types::Value::Blob(state_buffers));
        values.push(rusqlite::types::Value::Text(head_json));
        values.push(rusqlite::types::Value::Blob(attestations_blob));

        // Marks which placeholders must be wrapped in jsonb(?) so those values
        // are stored in SQLite's binary JSON representation.
        let mut placeholder_is_jsonb: Vec<bool> = Vec::with_capacity(num_columns);
        placeholder_is_jsonb.resize(BASE_COLUMNS.len(), false);

        for (name, value, is_jsonb) in &materialized {
            columns.push(name.as_str());
            values.push(match value {
                Some(v) => v.to_sql(),
                None => rusqlite::types::Value::Null,
            });
            placeholder_is_jsonb.push(*is_jsonb);
        }

        let columns_str = columns.iter().map(|c| format!(r#""{}""#, c)).collect::<Vec<_>>().join(", ");
        let placeholders =
            placeholder_is_jsonb.iter().map(|is_jsonb| if *is_jsonb { "jsonb(?)" } else { "?" }).collect::<Vec<_>>().join(", ");
        // Upsert: on id conflict, overwrite every column except the primary key.
        let update_str = columns.iter().skip(1).map(|c| format!(r#""{}" = excluded."{}""#, c, c)).collect::<Vec<_>>().join(", ");

        let query = format!(
            r#"INSERT INTO "{}"({}) VALUES({})
            ON CONFLICT("id") DO UPDATE SET {}"#,
            table_name, columns_str, placeholders, update_str
        );

        debug!("set_state query: {}", query);

        let new_head = state.payload.state.head.clone();
        let table_name_clone = table_name.to_string();
        let query_clone = query.clone();
        let values_clone = values.clone();
        let changed = conn
            .with_connection(move |c| {
                // Read the previous head first so we can report whether this write
                // actually advanced the clock.
                // NOTE(review): read and upsert are not wrapped in a transaction;
                // a concurrent writer could interleave between them — confirm
                // whether connection-level serialization makes this safe.
                let old_head_json: Option<String> =
                    match c
                        .query_row(&format!(r#"SELECT "head" FROM "{}" WHERE "id" = ?"#, table_name_clone), [&id_clone], |row| row.get(0))
                    {
                        Ok(json) => Some(json),
                        Err(rusqlite::Error::QueryReturnedNoRows) => None,
                        Err(e) => return Err(SqliteError::Rusqlite(e)),
                    };

                c.execute(&query_clone, params_from_iter(values_clone.iter())).map_err(|e| SqliteError::Rusqlite(e))?;

                let changed = match old_head_json {
                    Some(json) => {
                        let old_head: Clock = serde_json::from_str(&json).map_err(|e| SqliteError::Json(e))?;
                        old_head != new_head
                    }
                    None => {
                        // Row did not exist before: always counts as a change.
                        true
                    }
                };

                Ok(changed)
            })
            .await?;

        debug!("set_state: Changed: {}", changed);
        Ok(changed)
    }

    /// Load a single entity's state by id.
    ///
    /// Returns `RetrievalError::EntityNotFound` when no row exists; as a side
    /// effect of that miss, the state table is (re)created if it was missing.
    async fn get_state(&self, id: EntityId) -> Result<Attested<EntityState>, RetrievalError> {
        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        let table_name = self.state_table().to_owned();
        let id_str = id.to_base64();
        let collection_id = self.collection_id.clone();

        let result = conn
            .with_connection(move |c| {
                let query = format!(r#"SELECT "id", "state_buffer", "head", "attestations" FROM "{}" WHERE "id" = ?"#, table_name);

                let result = c.query_row(&query, [&id_str], |row| {
                    let _row_id: String = row.get(0)?;
                    let state_buffer: Vec<u8> = row.get(1)?;
                    let head_json: String = row.get(2)?;
                    let attestations_blob: Vec<u8> = row.get(3)?;
                    Ok((state_buffer, head_json, attestations_blob))
                });

                match result {
                    Ok((state_buffer, head_json, attestations_blob)) => {
                        // Decode with the same formats used by set_state.
                        let state_buffers: BTreeMap<String, Vec<u8>> =
                            bincode::deserialize(&state_buffer).map_err(|e| SqliteError::Serialization(e))?;
                        let head: Clock = serde_json::from_str(&head_json).map_err(|e| SqliteError::Json(e))?;
                        let attestations: AttestationSet =
                            bincode::deserialize(&attestations_blob).map_err(|e| SqliteError::Serialization(e))?;

                        Ok(Attested {
                            payload: EntityState {
                                entity_id: id,
                                collection: collection_id,
                                state: State { state_buffers: StateBuffers(state_buffers), head },
                            },
                            attestations,
                        })
                    }
                    Err(rusqlite::Error::QueryReturnedNoRows) => {
                        // Lazily ensure the table exists (errors intentionally
                        // ignored), then surface the miss to the caller.
                        let _ = create_state_table(c, &collection_id);
                        Err(SqliteError::Rusqlite(rusqlite::Error::QueryReturnedNoRows))
                    }
                    Err(e) => Err(SqliteError::Rusqlite(e)),
                }
            })
            .await
            .map_err(|e| match e {
                SqliteError::Rusqlite(rusqlite::Error::QueryReturnedNoRows) => RetrievalError::EntityNotFound(id),
                _ => RetrievalError::StorageError(Box::new(e)),
            })?;

        Ok(result)
    }

    /// Fetch every entity state matching `selection`.
    ///
    /// Columns referenced by the predicate but absent from the schema are
    /// treated as NULL. Predicate parts SQLite cannot evaluate are split out
    /// and applied in memory after the query; in that case LIMIT is also
    /// applied only after post-filtering.
    async fn fetch_states(&self, selection: &ankql::ast::Selection) -> Result<Vec<Attested<EntityState>>, RetrievalError> {
        debug!("SqliteBucket({}).fetch_states: {:?}", self.collection_id, selection);

        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        // If the predicate references columns the cache doesn't know, the cache
        // may simply be stale — refresh it once before deciding they're missing.
        let referenced = selection.referenced_columns();
        let cached = self.existing_columns();
        let unknown_to_cache: Vec<&String> = referenced.iter().filter(|col| !cached.contains(col)).collect();

        if !unknown_to_cache.is_empty() {
            debug!("SqliteBucket({}).fetch_states: Unknown columns {:?}, refreshing schema cache", self.collection_id, unknown_to_cache);
            self.rebuild_columns_cache(&conn).await?;
        }

        // Columns still missing after the refresh genuinely don't exist.
        let existing = self.existing_columns();
        let missing: Vec<String> = referenced.into_iter().filter(|col| !existing.contains(col)).collect();

        let effective_selection = if missing.is_empty() {
            selection.clone()
        } else {
            debug!("SqliteBucket({}).fetch_states: Columns {:?} don't exist, treating as NULL", self.collection_id, missing);
            selection.assume_null(&missing)
        };

        // Split into a SQL-evaluable predicate and an in-memory remainder.
        let split = split_predicate_for_sqlite(&effective_selection.predicate);
        let needs_post_filter = split.needs_post_filter();
        let remaining_predicate = split.remaining_predicate.clone();

        let sql_selection = ankql::ast::Selection {
            predicate: split.sql_predicate,
            order_by: effective_selection.order_by.clone(),
            // LIMIT must wait until after post-filtering, or SQL could truncate
            // rows that would have survived the in-memory predicate.
            limit: if needs_post_filter { None } else { effective_selection.limit },
        };

        let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer", "head", "attestations"]);
        builder.table_name(self.state_table());
        builder.selection(&sql_selection).map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;

        let (sql, params) = builder.build().map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;
        debug!("fetch_states SQL: {} with {} params", sql, params.len());

        let collection_id = self.collection_id.clone();

        let mut results = conn
            .with_connection(move |c| {
                let mut stmt = c.prepare(&sql)?;
                let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
                    let id_str: String = row.get(0)?;
                    let state_buffer: Vec<u8> = row.get(1)?;
                    let head_json: String = row.get(2)?;
                    let attestations_blob: Vec<u8> = row.get(3)?;
                    Ok((id_str, state_buffer, head_json, attestations_blob))
                })?;

                let mut results = Vec::new();
                for row in rows {
                    let (id_str, state_buffer, head_json, attestations_blob) = row?;

                    // Decode failures are mapped onto rusqlite conversion errors
                    // (indexed by column) so they propagate through query_map's
                    // error channel.
                    let id = EntityId::from_base64(&id_str).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                    })?;
                    let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&state_buffer).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                    })?;
                    let head: Clock = serde_json::from_str(&head_json).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                    })?;
                    let attestations: AttestationSet = bincode::deserialize(&attestations_blob).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                    })?;

                    results.push(Attested {
                        payload: EntityState {
                            entity_id: id,
                            collection: collection_id.clone(),
                            state: State { state_buffers: StateBuffers(state_buffers), head },
                        },
                        attestations,
                    });
                }

                Ok(results)
            })
            .await?;

        if needs_post_filter {
            debug!("Post-filtering {} results", results.len());
            results = post_filter_states(&results, &remaining_predicate, &self.collection_id);

            // Apply the deferred LIMIT now that filtering is complete.
            if let Some(limit) = effective_selection.limit {
                results.truncate(limit as usize);
            }
        }

        Ok(results)
    }

    /// Insert an event row; duplicate event ids are ignored via DO NOTHING.
    /// Returns `true` only when a new row was actually inserted.
    async fn add_event(&self, entity_event: &Attested<Event>) -> Result<bool, MutationError> {
        let conn = self.pool.get().await.map_err(|e| MutationError::General(Box::new(SqliteError::Pool(e.to_string()))))?;

        let operations = bincode::serialize(&entity_event.payload.operations)?;
        let attestations = bincode::serialize(&entity_event.attestations)?;
        let parent_json = serde_json::to_string(&entity_event.payload.parent).map_err(|e| MutationError::General(Box::new(e)))?;

        let table_name = self.event_table();
        let event_id = entity_event.payload.id().to_base64();
        let entity_id = entity_event.payload.entity_id.to_base64();

        let query = format!(
            r#"INSERT INTO "{}"("id", "entity_id", "operations", "parent", "attestations") VALUES(?, ?, ?, ?, ?)
            ON CONFLICT ("id") DO NOTHING"#,
            table_name
        );

        conn.with_connection(move |c| {
            // execute() reports affected rows; 0 means the id already existed.
            let affected = c.execute(&query, rusqlite::params![event_id, entity_id, operations, parent_json, attestations])?;
            Ok(affected > 0)
        })
        .await
        .map_err(|e| MutationError::General(Box::new(e)))
    }

    /// Load the given events by id.
    ///
    /// Ids with no matching row are silently omitted, so the result may be
    /// shorter than the input. Result order follows the database, not the input.
    async fn get_events(&self, event_ids: Vec<EventId>) -> Result<Vec<Attested<Event>>, RetrievalError> {
        if event_ids.is_empty() {
            return Ok(Vec::new());
        }

        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        let table_name = self.event_table().to_owned();
        let collection_id = self.collection_id.clone();
        let id_strings: Vec<String> = event_ids.iter().map(|id| id.to_base64()).collect();
        let num_ids = id_strings.len();

        conn.with_connection(move |c| {
            // One positional placeholder per requested id.
            let placeholders = (0..num_ids).map(|_| "?").collect::<Vec<_>>().join(", ");
            let query = format!(
                r#"SELECT "id", "entity_id", "operations", "parent", "attestations" FROM "{}" WHERE "id" IN ({})"#,
                table_name, placeholders
            );

            let mut stmt = c.prepare(&query)?;
            let params: Vec<&dyn rusqlite::ToSql> = id_strings.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
            let rows = stmt.query_map(params.as_slice(), |row| {
                let _event_id: String = row.get(0)?;
                let entity_id_str: String = row.get(1)?;
                let operations: Vec<u8> = row.get(2)?;
                let parent_json: String = row.get(3)?;
                let attestations_blob: Vec<u8> = row.get(4)?;
                Ok((entity_id_str, operations, parent_json, attestations_blob))
            })?;

            let mut events = Vec::with_capacity(num_ids);
            for row in rows {
                let (entity_id_str, operations_blob, parent_json, attestations_blob) = row?;

                let entity_id = EntityId::from_base64(&entity_id_str).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                })?;
                let operations: OperationSet = bincode::deserialize(&operations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;
                let parent: Clock = serde_json::from_str(&parent_json).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                })?;
                let attestations: AttestationSet = bincode::deserialize(&attestations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;

                events.push(Attested { payload: Event { collection: collection_id.clone(), entity_id, operations, parent }, attestations });
            }

            Ok(events)
        })
        .await
        .map_err(|e| RetrievalError::StorageError(Box::new(e)))
    }

    /// Return every stored event for `entity_id` (no ordering guaranteed).
    /// Served by the `<table>_entity_id_idx` index created alongside the table.
    async fn dump_entity_events(&self, entity_id: EntityId) -> Result<Vec<Attested<Event>>, RetrievalError> {
        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        let table_name = self.event_table().to_owned();
        let collection_id = self.collection_id.clone();
        let entity_id_str = entity_id.to_base64();

        conn.with_connection(move |c| {
            let query = format!(r#"SELECT "id", "operations", "parent", "attestations" FROM "{}" WHERE "entity_id" = ?"#, table_name);

            let mut stmt = c.prepare(&query)?;
            let rows = stmt.query_map([&entity_id_str], |row| {
                let _event_id: String = row.get(0)?;
                let operations: Vec<u8> = row.get(1)?;
                let parent_json: String = row.get(2)?;
                let attestations_blob: Vec<u8> = row.get(3)?;
                Ok((operations, parent_json, attestations_blob))
            })?;

            let mut events = Vec::new();
            for row in rows {
                let (operations_blob, parent_json, attestations_blob) = row?;

                let operations: OperationSet = bincode::deserialize(&operations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;
                let parent: Clock = serde_json::from_str(&parent_json).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                })?;
                let attestations: AttestationSet = bincode::deserialize(&attestations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;

                events.push(Attested { payload: Event { collection: collection_id.clone(), entity_id, operations, parent }, attestations });
            }

            Ok(events)
        })
        .await
        .map_err(|e| RetrievalError::StorageError(Box::new(e)))
    }
}
671
672fn post_filter_states(
674 states: &[Attested<EntityState>],
675 predicate: &ankql::ast::Predicate,
676 collection_id: &CollectionId,
677) -> Vec<Attested<EntityState>> {
678 states
679 .iter()
680 .filter(|attested| match TemporaryEntity::new(attested.payload.entity_id, collection_id.clone(), &attested.payload.state) {
681 Ok(temp_entity) => match evaluate_predicate(&temp_entity, predicate) {
682 Ok(result) => result,
683 Err(e) => {
684 warn!("Post-filter evaluation error for entity {}: {}", attested.payload.entity_id, e);
685 false
686 }
687 },
688 Err(e) => {
689 warn!("Failed to create TemporaryEntity for post-filtering {}: {}", attested.payload.entity_id, e);
690 false
691 }
692 })
693 .cloned()
694 .collect()
695}
696
697#[cfg(test)]
698mod tests {
699 use super::*;
700
    /// An in-memory engine should yield a usable, initially empty collection.
    #[tokio::test]
    async fn test_open_in_memory() {
        let engine = SqliteStorageEngine::open_in_memory().await.unwrap();
        let collection = engine.collection(&"test_collection".into()).await.unwrap();
        // Unfiltered selection: matches everything, so an empty table returns nothing.
        let all = ankql::ast::Selection { predicate: ankql::ast::Predicate::True, order_by: None, limit: None };
        assert!(collection.fetch_states(&all).await.unwrap().is_empty());
    }
708
    /// Identifier validation: '_', '.', ':' are allowed; quoting/statement
    /// metacharacters are rejected.
    #[tokio::test]
    async fn test_sane_name() {
        assert!(SqliteStorageEngine::sane_name("test_collection"));
        assert!(SqliteStorageEngine::sane_name("test.collection"));
        assert!(SqliteStorageEngine::sane_name("test:collection"));
        assert!(!SqliteStorageEngine::sane_name("test;collection"));
        assert!(!SqliteStorageEngine::sane_name("test'collection"));
    }
717
    /// The storage layer depends on SQLite's JSONB support (jsonb(),
    /// json_extract()); verify the linked SQLite actually provides it.
    #[tokio::test]
    async fn test_jsonb_function_availability() -> Result<(), SqliteError> {
        let engine = SqliteStorageEngine::open_in_memory().await.map_err(|e| SqliteError::DDL(e.to_string()))?;
        let conn = engine.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        // jsonb() should produce the binary JSON encoding as a BLOB.
        let result = conn
            .with_connection(|c| {
                let value: Vec<u8> = c.query_row("SELECT jsonb('{\"key\": \"value\"}')", [], |row| row.get(0))?;
                Ok(value)
            })
            .await?;
        assert!(!result.is_empty(), "jsonb() function should return a non-empty BLOB");

        // json_extract() over a JSONB value should return the plain SQL value.
        let result = conn
            .with_connection(|c| {
                let value: String =
                    c.query_row(r#"SELECT json_extract(jsonb('{"territory": "US", "count": 10}'), '$.territory')"#, [], |row| row.get(0))?;
                Ok(value)
            })
            .await?;
        assert_eq!(result, "US", "JSON path extraction should return the SQL value");

        // Extracted numbers should compare numerically, not lexically.
        let result = conn
            .with_connection(|c| {
                let value: bool = c.query_row(
                    r#"SELECT json_extract(jsonb('{"count": 9}'), '$.count') > json_extract(jsonb('{"count": 10}'), '$.count')"#,
                    [],
                    |row| row.get(0),
                )?;
                Ok(value)
            })
            .await?;
        assert!(!result, "Numeric comparison: 9 > 10 should be false");

        Ok(())
    }
769
    /// A dotted path in the query language (`data.status`) should be compiled
    /// into a SQL json_extract() over the base column.
    #[tokio::test]
    async fn test_json_path_query() -> anyhow::Result<()> {
        use crate::sql_builder::SqlBuilder;
        use ankql::parser::parse_selection;

        let selection = parse_selection(r#"data.status = 'active'"#).expect("Failed to parse query");
        let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer"]);
        builder.table_name("test_table");
        builder.selection(&selection).map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;

        let (sql, _params) = builder.build().map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;

        assert!(sql.contains("json_extract"), "SQL should use json_extract() for JSON path: {}", sql);
        assert!(sql.contains(r#"json_extract("data", '$.status')"#), "SQL should extract from data column with $.status path: {}", sql);

        Ok(())
    }
795
    /// End-to-end: store a value through jsonb(?) with a bound parameter, then
    /// query it back with json_extract() and a bound comparison parameter —
    /// mirroring how set_state/fetch_states use JSONB columns.
    #[tokio::test]
    async fn test_jsonb_storage_and_parameterized_query() -> Result<(), SqliteError> {
        let engine = SqliteStorageEngine::open_in_memory().await.map_err(|e| SqliteError::DDL(e.to_string()))?;
        let conn = engine.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        conn.with_connection(|c| {
            c.execute(r#"CREATE TABLE test_jsonb (id TEXT PRIMARY KEY, data BLOB)"#, [])?;

            // Insert JSON text through jsonb() so it is stored in binary form.
            let json_text = r#"{"territory": "US", "count": 10}"#;
            c.execute(r#"INSERT INTO test_jsonb (id, data) VALUES (?, jsonb(?))"#, rusqlite::params!["1", json_text])?;

            let count: i32 = c.query_row("SELECT COUNT(*) FROM test_jsonb", [], |row| row.get(0))?;
            assert_eq!(count, 1, "Should have 1 row");

            // Diagnostic output only; typeof() result is not asserted.
            let data_type: String = c.query_row("SELECT typeof(data) FROM test_jsonb WHERE id = '1'", [], |row| row.get(0))?;
            eprintln!("Data column type: {}", data_type);

            let extracted: String =
                c.query_row(r#"SELECT json_extract(data, '$.territory') FROM test_jsonb WHERE id = '1'"#, [], |row| row.get(0))?;
            eprintln!("Extracted territory: '{}'", extracted);

            // The comparison value is bound as a parameter, as in real queries.
            let query_param = "US";
            let result: Result<String, _> = c.query_row(
                r#"SELECT id FROM test_jsonb WHERE json_extract(data, '$.territory') = ?"#,
                rusqlite::params![query_param],
                |row| row.get(0),
            );
            eprintln!("Query result: {:?}", result);

            match result {
                Ok(id) => assert_eq!(id, "1", "Should find the row with territory = US"),
                Err(e) => panic!("Query failed: {:?}", e),
            }

            Ok(())
        })
        .await
    }
842}