// ankurah_storage_sqlite/engine.rs
1//! SQLite storage engine implementation
2
3use std::collections::BTreeMap;
4use std::path::Path;
5use std::sync::Arc;
6
7use ankurah_core::entity::TemporaryEntity;
8use ankurah_core::error::{MutationError, RetrievalError};
9use ankurah_core::property::backend::backend_from_string;
10use ankurah_core::selection::filter::evaluate_predicate;
11use ankurah_core::storage::{StorageCollection, StorageEngine};
12use ankurah_proto::{
13    AttestationSet, Attested, Clock, CollectionId, EntityId, EntityState, Event, EventId, OperationSet, State, StateBuffers,
14};
15use async_trait::async_trait;
16use rusqlite::{params_from_iter, Connection};
17use tracing::{debug, warn};
18
19use crate::connection::{PooledConnection, SqliteConnectionManager};
20use crate::error::SqliteError;
21use crate::sql_builder::{split_predicate_for_sqlite, SqlBuilder};
22use crate::value::SqliteValue;
23
/// Default connection pool size used by [`SqliteStorageEngine::open`]
pub const DEFAULT_POOL_SIZE: u32 = 10;
26
/// SQLite storage engine
pub struct SqliteStorageEngine {
    /// bb8 connection pool over SQLite connections; cloned into each bucket
    pool: bb8::Pool<SqliteConnectionManager>,
}
31
32impl SqliteStorageEngine {
33    /// Create a new storage engine with an existing pool
34    pub fn new(pool: bb8::Pool<SqliteConnectionManager>) -> Self { Self { pool } }
35
36    /// Open a file-based SQLite database
37    pub async fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
38        let manager = SqliteConnectionManager::file(path.as_ref());
39        let pool = bb8::Pool::builder().max_size(DEFAULT_POOL_SIZE).build(manager).await?;
40        Ok(Self::new(pool))
41    }
42
43    /// Open an in-memory SQLite database (for testing)
44    pub async fn open_in_memory() -> anyhow::Result<Self> {
45        let manager = SqliteConnectionManager::memory();
46        // For in-memory, we use a single connection to keep the database alive
47        let pool = bb8::Pool::builder().max_size(1).build(manager).await?;
48        Ok(Self::new(pool))
49    }
50
51    /// Check if a collection name is valid
52    pub fn sane_name(collection: &str) -> bool {
53        for char in collection.chars() {
54            match char {
55                c if c.is_alphanumeric() => {}
56                '_' | '.' | ':' => {}
57                _ => return false,
58            }
59        }
60        true
61    }
62
63    /// Get a reference to the connection pool (for testing/diagnostics)
64    pub fn pool(&self) -> &bb8::Pool<SqliteConnectionManager> { &self.pool }
65}
66
67#[async_trait]
68impl StorageEngine for SqliteStorageEngine {
69    type Value = SqliteValue;
70
71    async fn collection(&self, collection_id: &CollectionId) -> Result<Arc<dyn StorageCollection>, RetrievalError> {
72        if !Self::sane_name(collection_id.as_str()) {
73            return Err(RetrievalError::InvalidBucketName);
74        }
75
76        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;
77
78        let bucket = SqliteBucket::new(self.pool.clone(), collection_id.clone());
79
80        // Create tables if they don't exist
81        let collection_id_clone = collection_id.clone();
82        conn.with_connection(move |c| {
83            create_state_table(c, &collection_id_clone)?;
84            create_event_table(c, &collection_id_clone)?;
85            Ok(())
86        })
87        .await?;
88
89        // Rebuild column cache
90        bucket.rebuild_columns_cache(&conn).await?;
91
92        Ok(Arc::new(bucket))
93    }
94
95    async fn delete_all_collections(&self) -> Result<bool, MutationError> {
96        let conn = self.pool.get().await.map_err(|e| MutationError::General(Box::new(SqliteError::Pool(e.to_string()))))?;
97
98        conn.with_connection(|c| {
99            // Get all table names
100            let mut stmt = c.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")?;
101            let tables: Vec<String> = stmt.query_map([], |row| row.get(0))?.filter_map(|r| r.ok()).collect();
102
103            if tables.is_empty() {
104                return Ok(false);
105            }
106
107            for table in tables {
108                c.execute(&format!(r#"DROP TABLE IF EXISTS "{}""#, table), [])?;
109            }
110
111            Ok(true)
112        })
113        .await
114        .map_err(|e| MutationError::General(Box::new(e)))
115    }
116}
117
118fn create_state_table(conn: &Connection, collection_id: &CollectionId) -> Result<(), SqliteError> {
119    let table_name = collection_id.as_str();
120    let query = format!(
121        r#"CREATE TABLE IF NOT EXISTS "{}"(
122            "id" TEXT PRIMARY KEY,
123            "state_buffer" BLOB NOT NULL,
124            "head" TEXT NOT NULL,
125            "attestations" BLOB
126        )"#,
127        table_name
128    );
129    debug!("Creating state table: {}", query);
130    conn.execute(&query, [])?;
131    Ok(())
132}
133
134fn create_event_table(conn: &Connection, collection_id: &CollectionId) -> Result<(), SqliteError> {
135    let table_name = format!("{}_event", collection_id.as_str());
136    let query = format!(
137        r#"CREATE TABLE IF NOT EXISTS "{}"(
138            "id" TEXT PRIMARY KEY,
139            "entity_id" TEXT,
140            "operations" BLOB,
141            "parent" TEXT,
142            "attestations" BLOB
143        )"#,
144        table_name
145    );
146    debug!("Creating event table: {}", query);
147    conn.execute(&query, [])?;
148
149    // Create index on entity_id for efficient dump_entity_events queries
150    let index_query = format!(r#"CREATE INDEX IF NOT EXISTS "{}_entity_id_idx" ON "{}"("entity_id")"#, table_name, table_name);
151    conn.execute(&index_query, [])?;
152
153    Ok(())
154}
155
/// Column metadata cached from `PRAGMA table_info`
#[derive(Clone, Debug)]
pub struct SqliteColumn {
    /// Column name as reported by the pragma
    pub name: String,
    /// Declared SQL type of the column (kept for diagnostics, currently unread)
    #[allow(dead_code)]
    pub data_type: String,
}
163
/// SQLite storage bucket (collection)
pub struct SqliteBucket {
    /// Shared connection pool (cloned from the engine)
    pool: bb8::Pool<SqliteConnectionManager>,
    /// Identity of the collection this bucket serves
    collection_id: CollectionId,
    /// Cached state table name (avoids repeated allocations)
    state_table_name: String,
    /// Cached event table name (avoids repeated allocations)
    event_table_name: String,
    /// Cached schema of the state table; refreshed by `rebuild_columns_cache`
    columns: Arc<std::sync::RwLock<Vec<SqliteColumn>>>,
    /// Serializes ALTER TABLE statements across concurrent writers
    ddl_lock: Arc<tokio::sync::Mutex<()>>,
}
175
176impl SqliteBucket {
177    /// Create a new bucket with cached table names
178    fn new(pool: bb8::Pool<SqliteConnectionManager>, collection_id: CollectionId) -> Self {
179        let state_table_name = collection_id.as_str().to_string();
180        let event_table_name = format!("{}_event", collection_id.as_str());
181        Self {
182            pool,
183            collection_id,
184            state_table_name,
185            event_table_name,
186            columns: Arc::new(std::sync::RwLock::new(Vec::new())),
187            ddl_lock: Arc::new(tokio::sync::Mutex::new(())),
188        }
189    }
190
191    #[inline]
192    fn state_table(&self) -> &str { &self.state_table_name }
193
194    #[inline]
195    fn event_table(&self) -> &str { &self.event_table_name }
196
197    /// Returns all column names currently in the schema cache
198    pub fn existing_columns(&self) -> Vec<String> {
199        let columns = self.columns.read().expect("RwLock poisoned");
200        columns.iter().map(|c| c.name.clone()).collect()
201    }
202
203    /// Check if a column exists in the schema cache
204    pub fn has_column(&self, name: &str) -> bool {
205        let columns = self.columns.read().expect("RwLock poisoned");
206        columns.iter().any(|c| c.name == name)
207    }
208
209    async fn rebuild_columns_cache(&self, conn: &PooledConnection) -> Result<(), SqliteError> {
210        let table_name = self.state_table().to_owned();
211        let new_columns = conn
212            .with_connection(move |c| {
213                let mut stmt = c.prepare(&format!("PRAGMA table_info(\"{}\")", table_name))?;
214                let columns: Vec<SqliteColumn> = stmt
215                    .query_map([], |row| Ok(SqliteColumn { name: row.get(1)?, data_type: row.get(2)? }))?
216                    .filter_map(|r| r.ok())
217                    .collect();
218                Ok(columns)
219            })
220            .await?;
221
222        let mut columns = self.columns.write().expect("RwLock poisoned");
223        *columns = new_columns;
224        Ok(())
225    }
226
227    async fn add_missing_columns(&self, conn: &PooledConnection, missing: Vec<(String, &'static str)>) -> Result<(), SqliteError> {
228        if missing.is_empty() {
229            return Ok(());
230        }
231
232        // Acquire DDL lock
233        let _lock = self.ddl_lock.lock().await;
234
235        // Re-check columns after acquiring lock
236        self.rebuild_columns_cache(conn).await?;
237
238        let table_name = self.state_table();
239        for (column, datatype) in missing {
240            if SqliteStorageEngine::sane_name(&column) && !self.has_column(&column) {
241                let alter_query = format!(r#"ALTER TABLE "{}" ADD COLUMN "{}" {}"#, table_name, column, datatype);
242                debug!("Adding column: {}", alter_query);
243
244                let query = alter_query.clone();
245                conn.with_connection(move |c| {
246                    c.execute(&query, [])?;
247                    Ok(())
248                })
249                .await?;
250            }
251        }
252
253        self.rebuild_columns_cache(conn).await?;
254        Ok(())
255    }
256}
257
#[async_trait]
impl StorageCollection for SqliteBucket {
    /// Upsert an entity's state row, materializing property values into columns.
    ///
    /// Returns `true` if the entity is new or its head clock changed.
    /// NOTE(review): the old-head SELECT and the UPSERT below run on the same
    /// connection but are not wrapped in an explicit transaction; a writer on
    /// another pool connection could interleave between them — confirm whether
    /// callers serialize writes per entity.
    async fn set_state(&self, state: Attested<EntityState>) -> Result<bool, MutationError> {
        let conn = self.pool.get().await.map_err(|e| MutationError::General(Box::new(SqliteError::Pool(e.to_string()))))?;

        // Ensure head is not empty for new records
        if state.payload.state.head.is_empty() {
            warn!("Warning: Empty head detected for entity {}", state.payload.entity_id);
        }

        // Serialize the stored blobs: state buffers and attestations via bincode,
        // the head clock as JSON text (human-readable, comparable).
        let state_buffers = bincode::serialize(&state.payload.state.state_buffers)?;
        let head_json = serde_json::to_string(&state.payload.state.head).map_err(|e| MutationError::General(Box::new(e)))?;
        let attestations_blob = bincode::serialize(&state.attestations)?;
        let id = state.payload.entity_id.to_base64();
        let id_clone = id.clone(); // Clone for use in closure

        // Collect materialized columns (with JSONB flag for proper SQL generation)
        let mut materialized: Vec<(String, Option<SqliteValue>, bool)> = Vec::new(); // (name, value, is_jsonb)
        let mut seen_properties = std::collections::HashSet::new();

        for (name, state_buffer) in state.payload.state.state_buffers.iter() {
            let backend = backend_from_string(name, Some(state_buffer))?;
            for (column, value) in backend.property_values() {
                // First backend to claim a property name wins; later duplicates are skipped.
                if !seen_properties.insert(column.clone()) {
                    continue;
                }

                let sqlite_value: Option<SqliteValue> = value.map(|v| v.into());
                let is_jsonb = sqlite_value.as_ref().is_some_and(|v| v.is_jsonb());

                if !self.has_column(&column) {
                    if let Some(ref sv) = sqlite_value {
                        // Column doesn't exist yet: create it typed after this first value.
                        self.add_missing_columns(&conn, vec![(column.clone(), sv.sqlite_type())]).await?;
                    } else {
                        // No value and no column: nothing to store, skip entirely.
                        continue;
                    }
                }

                materialized.push((column, sqlite_value, is_jsonb));
            }
        }

        // Build the UPSERT query
        const BASE_COLUMNS: &[&str] = &["id", "state_buffer", "head", "attestations"];

        let table_name = self.state_table();
        let num_columns = BASE_COLUMNS.len() + materialized.len();
        let mut columns: Vec<&str> = Vec::with_capacity(num_columns);
        columns.extend_from_slice(BASE_COLUMNS);

        // Parameter values, positionally matching `columns`.
        let mut values: Vec<rusqlite::types::Value> = Vec::with_capacity(num_columns);
        values.push(rusqlite::types::Value::Text(id));
        values.push(rusqlite::types::Value::Blob(state_buffers));
        values.push(rusqlite::types::Value::Text(head_json));
        values.push(rusqlite::types::Value::Blob(attestations_blob));

        // Track which placeholders need jsonb() wrapper (base columns don't)
        let mut placeholder_is_jsonb: Vec<bool> = Vec::with_capacity(num_columns);
        placeholder_is_jsonb.resize(BASE_COLUMNS.len(), false);

        for (name, value, is_jsonb) in &materialized {
            columns.push(name.as_str());
            values.push(match value {
                Some(v) => v.to_sql(),
                None => rusqlite::types::Value::Null,
            });
            placeholder_is_jsonb.push(*is_jsonb);
        }

        let columns_str = columns.iter().map(|c| format!(r#""{}""#, c)).collect::<Vec<_>>().join(", ");
        // Use jsonb(?) for JSONB columns to convert JSON text to JSONB binary format
        let placeholders =
            placeholder_is_jsonb.iter().map(|is_jsonb| if *is_jsonb { "jsonb(?)" } else { "?" }).collect::<Vec<_>>().join(", ");
        // Every non-id column is overwritten from the incoming row on conflict.
        let update_str = columns.iter().skip(1).map(|c| format!(r#""{}" = excluded."{}""#, c, c)).collect::<Vec<_>>().join(", ");

        // Use SQLite's RETURNING clause (3.35.0+) to get the old head for comparison
        // If RETURNING is not available, we'll fall back to a separate query
        let query = format!(
            r#"INSERT INTO "{}"({}) VALUES({})
               ON CONFLICT("id") DO UPDATE SET {}"#,
            table_name, columns_str, placeholders, update_str
        );

        debug!("set_state query: {}", query);

        let new_head = state.payload.state.head.clone();
        let table_name_clone = table_name.to_string();
        let query_clone = query.clone();
        let values_clone = values.clone();
        let changed = conn
            .with_connection(move |c| {
                // First, get the old head if the entity exists
                let old_head_json: Option<String> =
                    match c
                        .query_row(&format!(r#"SELECT "head" FROM "{}" WHERE "id" = ?"#, table_name_clone), [&id_clone], |row| row.get(0))
                    {
                        Ok(json) => Some(json),
                        Err(rusqlite::Error::QueryReturnedNoRows) => None,
                        Err(e) => return Err(SqliteError::Rusqlite(e)),
                    };

                // Execute the UPSERT
                c.execute(&query_clone, params_from_iter(values_clone.iter())).map_err(|e| SqliteError::Rusqlite(e))?;

                // Determine if state changed
                let changed = match old_head_json {
                    Some(json) => {
                        // Entity existed - compare heads
                        let old_head: Clock = serde_json::from_str(&json).map_err(|e| SqliteError::Json(e))?;
                        old_head != new_head
                    }
                    None => {
                        // New entity
                        true
                    }
                };

                Ok(changed)
            })
            .await?;

        debug!("set_state: Changed: {}", changed);
        Ok(changed)
    }

    /// Fetch a single entity's state by id.
    ///
    /// Returns `RetrievalError::EntityNotFound` when no row exists; as a side
    /// effect of that path the state table is created if missing (matching the
    /// Postgres backend's behavior).
    async fn get_state(&self, id: EntityId) -> Result<Attested<EntityState>, RetrievalError> {
        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        let table_name = self.state_table().to_owned();
        let id_str = id.to_base64();
        let collection_id = self.collection_id.clone();

        let result = conn
            .with_connection(move |c| {
                let query = format!(r#"SELECT "id", "state_buffer", "head", "attestations" FROM "{}" WHERE "id" = ?"#, table_name);

                let result = c.query_row(&query, [&id_str], |row| {
                    let _row_id: String = row.get(0)?;
                    let state_buffer: Vec<u8> = row.get(1)?;
                    let head_json: String = row.get(2)?;
                    let attestations_blob: Vec<u8> = row.get(3)?;
                    Ok((state_buffer, head_json, attestations_blob))
                });

                match result {
                    Ok((state_buffer, head_json, attestations_blob)) => {
                        // Decode the stored blobs back into their in-memory forms.
                        let state_buffers: BTreeMap<String, Vec<u8>> =
                            bincode::deserialize(&state_buffer).map_err(|e| SqliteError::Serialization(e))?;
                        let head: Clock = serde_json::from_str(&head_json).map_err(|e| SqliteError::Json(e))?;
                        let attestations: AttestationSet =
                            bincode::deserialize(&attestations_blob).map_err(|e| SqliteError::Serialization(e))?;

                        Ok(Attested {
                            payload: EntityState {
                                entity_id: id,
                                collection: collection_id,
                                state: State { state_buffers: StateBuffers(state_buffers), head },
                            },
                            attestations,
                        })
                    }
                    Err(rusqlite::Error::QueryReturnedNoRows) => {
                        // Table might not exist - create it and return EntityNotFound
                        // This matches Postgres behavior
                        let _ = create_state_table(c, &collection_id);
                        Err(SqliteError::Rusqlite(rusqlite::Error::QueryReturnedNoRows))
                    }
                    Err(e) => Err(SqliteError::Rusqlite(e)),
                }
            })
            .await
            .map_err(|e| match e {
                // Translate the no-rows sentinel into the typed not-found error.
                SqliteError::Rusqlite(rusqlite::Error::QueryReturnedNoRows) => RetrievalError::EntityNotFound(id),
                _ => RetrievalError::StorageError(Box::new(e)),
            })?;

        Ok(result)
    }

    /// Fetch all entity states matching `selection`.
    ///
    /// Predicates are split into a SQL-pushable part and a remainder evaluated
    /// in memory; columns the selection references but the table lacks are
    /// treated as NULL.
    async fn fetch_states(&self, selection: &ankql::ast::Selection) -> Result<Vec<Attested<EntityState>>, RetrievalError> {
        debug!("SqliteBucket({}).fetch_states: {:?}", self.collection_id, selection);

        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        // Pre-filter selection based on cached schema to avoid undefined column errors.
        // If we see columns not in our cache, refresh it first (they might have been added).
        let referenced = selection.referenced_columns();
        let cached = self.existing_columns();
        let unknown_to_cache: Vec<&String> = referenced.iter().filter(|col| !cached.contains(col)).collect();

        // Refresh cache if we see columns we haven't seen before
        if !unknown_to_cache.is_empty() {
            debug!("SqliteBucket({}).fetch_states: Unknown columns {:?}, refreshing schema cache", self.collection_id, unknown_to_cache);
            self.rebuild_columns_cache(&conn).await?;
        }

        // Now check with (possibly refreshed) cache - columns still missing truly don't exist
        let existing = self.existing_columns();
        let missing: Vec<String> = referenced.into_iter().filter(|col| !existing.contains(col)).collect();

        let effective_selection = if missing.is_empty() {
            selection.clone()
        } else {
            debug!("SqliteBucket({}).fetch_states: Columns {:?} don't exist, treating as NULL", self.collection_id, missing);
            // Note: assume_null() has a limitation with JSON paths - it checks path.property()
            // (last step) instead of path.first() (column name). This means for paths like
            // "licensing.territory", if "licensing" is missing, assume_null() won't match
            // because it checks "territory". However, this should be rare since columns
            // are created on-demand during set_state. If it happens, assume_null() will
            // leave the predicate unchanged, which may cause the query to fail.
            // TODO: Fix assume_null() in ankql to check path.first() for multi-step paths.
            selection.assume_null(&missing)
        };

        // Split predicate for pushdown
        let split = split_predicate_for_sqlite(&effective_selection.predicate);
        let needs_post_filter = split.needs_post_filter();
        let remaining_predicate = split.remaining_predicate.clone();

        // Build SQL
        // When post-filtering, the limit cannot be pushed into SQL — it must be
        // applied after the in-memory filter below.
        let sql_selection = ankql::ast::Selection {
            predicate: split.sql_predicate,
            order_by: effective_selection.order_by.clone(),
            limit: if needs_post_filter { None } else { effective_selection.limit },
        };

        let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer", "head", "attestations"]);
        builder.table_name(self.state_table());
        builder.selection(&sql_selection).map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;

        let (sql, params) = builder.build().map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;
        debug!("fetch_states SQL: {} with {} params", sql, params.len());

        let collection_id = self.collection_id.clone();

        let mut results = conn
            .with_connection(move |c| {
                let mut stmt = c.prepare(&sql)?;
                let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
                    let id_str: String = row.get(0)?;
                    let state_buffer: Vec<u8> = row.get(1)?;
                    let head_json: String = row.get(2)?;
                    let attestations_blob: Vec<u8> = row.get(3)?;
                    Ok((id_str, state_buffer, head_json, attestations_blob))
                })?;

                let mut results = Vec::new();
                for row in rows {
                    let (id_str, state_buffer, head_json, attestations_blob) = row?;

                    // Decode failures are surfaced as FromSqlConversionFailure with the
                    // offending column index so the bad column is identifiable.
                    let id = EntityId::from_base64(&id_str).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                    })?;
                    let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&state_buffer).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                    })?;
                    let head: Clock = serde_json::from_str(&head_json).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                    })?;
                    let attestations: AttestationSet = bincode::deserialize(&attestations_blob).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                    })?;

                    results.push(Attested {
                        payload: EntityState {
                            entity_id: id,
                            collection: collection_id.clone(),
                            state: State { state_buffers: StateBuffers(state_buffers), head },
                        },
                        attestations,
                    });
                }

                Ok(results)
            })
            .await?;

        // Post-filter if needed
        if needs_post_filter {
            debug!("Post-filtering {} results", results.len());
            results = post_filter_states(&results, &remaining_predicate, &self.collection_id);

            if let Some(limit) = effective_selection.limit {
                results.truncate(limit as usize);
            }
        }

        Ok(results)
    }

    /// Insert an event, ignoring duplicates by event id.
    ///
    /// Returns `true` if the event was newly inserted, `false` if it already existed.
    async fn add_event(&self, entity_event: &Attested<Event>) -> Result<bool, MutationError> {
        let conn = self.pool.get().await.map_err(|e| MutationError::General(Box::new(SqliteError::Pool(e.to_string()))))?;

        let operations = bincode::serialize(&entity_event.payload.operations)?;
        let attestations = bincode::serialize(&entity_event.attestations)?;
        let parent_json = serde_json::to_string(&entity_event.payload.parent).map_err(|e| MutationError::General(Box::new(e)))?;

        let table_name = self.event_table();
        let event_id = entity_event.payload.id().to_base64();
        let entity_id = entity_event.payload.entity_id.to_base64();

        let query = format!(
            r#"INSERT INTO "{}"("id", "entity_id", "operations", "parent", "attestations") VALUES(?, ?, ?, ?, ?)
               ON CONFLICT ("id") DO NOTHING"#,
            table_name
        );

        conn.with_connection(move |c| {
            // affected == 0 means the ON CONFLICT DO NOTHING path was taken.
            let affected = c.execute(&query, rusqlite::params![event_id, entity_id, operations, parent_json, attestations])?;
            Ok(affected > 0)
        })
        .await
        .map_err(|e| MutationError::General(Box::new(e)))
    }

    /// Fetch events by id.
    ///
    /// Ids with no matching row are simply absent from the result; order is
    /// whatever SQLite returns for the IN query, not the input order.
    async fn get_events(&self, event_ids: Vec<EventId>) -> Result<Vec<Attested<Event>>, RetrievalError> {
        if event_ids.is_empty() {
            return Ok(Vec::new());
        }

        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        let table_name = self.event_table().to_owned();
        let collection_id = self.collection_id.clone();
        let id_strings: Vec<String> = event_ids.iter().map(|id| id.to_base64()).collect();
        let num_ids = id_strings.len();

        conn.with_connection(move |c| {
            // One positional placeholder per requested id.
            let placeholders = (0..num_ids).map(|_| "?").collect::<Vec<_>>().join(", ");
            let query = format!(
                r#"SELECT "id", "entity_id", "operations", "parent", "attestations" FROM "{}" WHERE "id" IN ({})"#,
                table_name, placeholders
            );

            let mut stmt = c.prepare(&query)?;
            let params: Vec<&dyn rusqlite::ToSql> = id_strings.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
            let rows = stmt.query_map(params.as_slice(), |row| {
                let _event_id: String = row.get(0)?;
                let entity_id_str: String = row.get(1)?;
                let operations: Vec<u8> = row.get(2)?;
                let parent_json: String = row.get(3)?;
                let attestations_blob: Vec<u8> = row.get(4)?;
                Ok((entity_id_str, operations, parent_json, attestations_blob))
            })?;

            let mut events = Vec::with_capacity(num_ids);
            for row in rows {
                let (entity_id_str, operations_blob, parent_json, attestations_blob) = row?;

                let entity_id = EntityId::from_base64(&entity_id_str).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                })?;
                let operations: OperationSet = bincode::deserialize(&operations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;
                let parent: Clock = serde_json::from_str(&parent_json).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                })?;
                let attestations: AttestationSet = bincode::deserialize(&attestations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;

                events.push(Attested { payload: Event { collection: collection_id.clone(), entity_id, operations, parent }, attestations });
            }

            Ok(events)
        })
        .await
        .map_err(|e| RetrievalError::StorageError(Box::new(e)))
    }

    /// Fetch every stored event for one entity (uses the entity_id index).
    async fn dump_entity_events(&self, entity_id: EntityId) -> Result<Vec<Attested<Event>>, RetrievalError> {
        let conn = self.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        let table_name = self.event_table().to_owned();
        let collection_id = self.collection_id.clone();
        let entity_id_str = entity_id.to_base64();

        conn.with_connection(move |c| {
            let query = format!(r#"SELECT "id", "operations", "parent", "attestations" FROM "{}" WHERE "entity_id" = ?"#, table_name);

            let mut stmt = c.prepare(&query)?;
            let rows = stmt.query_map([&entity_id_str], |row| {
                let _event_id: String = row.get(0)?;
                let operations: Vec<u8> = row.get(1)?;
                let parent_json: String = row.get(2)?;
                let attestations_blob: Vec<u8> = row.get(3)?;
                Ok((operations, parent_json, attestations_blob))
            })?;

            let mut events = Vec::new();
            for row in rows {
                let (operations_blob, parent_json, attestations_blob) = row?;

                let operations: OperationSet = bincode::deserialize(&operations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;
                let parent: Clock = serde_json::from_str(&parent_json).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, Box::new(std::io::Error::other(e)))
                })?;
                let attestations: AttestationSet = bincode::deserialize(&attestations_blob).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Blob, Box::new(std::io::Error::other(e)))
                })?;

                events.push(Attested { payload: Event { collection: collection_id.clone(), entity_id, operations, parent }, attestations });
            }

            Ok(events)
        })
        .await
        .map_err(|e| RetrievalError::StorageError(Box::new(e)))
    }
}
671
672/// Post-filter EntityStates using a predicate that couldn't be pushed to SQL.
673fn post_filter_states(
674    states: &[Attested<EntityState>],
675    predicate: &ankql::ast::Predicate,
676    collection_id: &CollectionId,
677) -> Vec<Attested<EntityState>> {
678    states
679        .iter()
680        .filter(|attested| match TemporaryEntity::new(attested.payload.entity_id, collection_id.clone(), &attested.payload.state) {
681            Ok(temp_entity) => match evaluate_predicate(&temp_entity, predicate) {
682                Ok(result) => result,
683                Err(e) => {
684                    warn!("Post-filter evaluation error for entity {}: {}", attested.payload.entity_id, e);
685                    false
686                }
687            },
688            Err(e) => {
689                warn!("Failed to create TemporaryEntity for post-filtering {}: {}", attested.payload.entity_id, e);
690                false
691            }
692        })
693        .cloned()
694        .collect()
695}
696
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an in-memory engine yields an empty collection for a
    /// match-all selection.
    #[tokio::test]
    async fn test_open_in_memory() {
        let engine = SqliteStorageEngine::open_in_memory().await.unwrap();
        let collection = engine.collection(&"test_collection".into()).await.unwrap();
        let all = ankql::ast::Selection { predicate: ankql::ast::Predicate::True, order_by: None, limit: None };
        assert!(collection.fetch_states(&all).await.unwrap().is_empty());
    }

    /// Collection-name sanitization: dots and colons are allowed, SQL
    /// metacharacters (`;`, `'`) are rejected.
    #[tokio::test]
    async fn test_sane_name() {
        assert!(SqliteStorageEngine::sane_name("test_collection"));
        assert!(SqliteStorageEngine::sane_name("test.collection"));
        assert!(SqliteStorageEngine::sane_name("test:collection"));
        assert!(!SqliteStorageEngine::sane_name("test;collection"));
        assert!(!SqliteStorageEngine::sane_name("test'collection"));
    }

    /// Test that SQLite JSONB functions are available and work correctly.
    ///
    /// This test verifies:
    /// 1. The `jsonb()` function exists and can convert JSON text to JSONB
    /// 2. `json_extract()` works for JSON path traversal over JSONB values
    /// 3. Type-aware comparisons work (numeric, not lexicographic)
    #[tokio::test]
    async fn test_jsonb_function_availability() -> Result<(), SqliteError> {
        let engine = SqliteStorageEngine::open_in_memory().await.map_err(|e| SqliteError::DDL(e.to_string()))?;
        let conn = engine.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        // Test 1: Verify jsonb() function exists and works
        // jsonb() returns a BLOB (JSONB binary format), so we query it as BLOB
        let result = conn
            .with_connection(|c| {
                let value: Vec<u8> = c.query_row("SELECT jsonb('{\"key\": \"value\"}')", [], |row| row.get(0))?;
                Ok(value)
            })
            .await?;
        // jsonb() returns JSONB BLOB format - verify it's not empty
        assert!(!result.is_empty(), "jsonb() function should return a non-empty BLOB");

        // Test 2: Verify json_extract() works for path traversal over a JSONB value
        // json_extract returns the SQL value (unquoted string for JSON strings)
        let result = conn
            .with_connection(|c| {
                let value: String =
                    c.query_row(r#"SELECT json_extract(jsonb('{"territory": "US", "count": 10}'), '$.territory')"#, [], |row| row.get(0))?;
                Ok(value)
            })
            .await?;
        // json_extract returns the unquoted SQL value, not the JSON string representation
        assert_eq!(result, "US", "JSON path extraction should return the SQL value");

        // Test 3: Verify numeric comparison is numeric (not lexicographic)
        // Lexicographically "9" > "10", so a false result proves numeric semantics
        let result = conn
            .with_connection(|c| {
                let value: bool = c.query_row(
                    r#"SELECT json_extract(jsonb('{"count": 9}'), '$.count') > json_extract(jsonb('{"count": 10}'), '$.count')"#,
                    [],
                    |row| row.get(0),
                )?;
                Ok(value)
            })
            .await?;
        assert!(!result, "Numeric comparison: 9 > 10 should be false");

        Ok(())
    }

    /// Test that JSON path predicates compile to SQLite `json_extract()` SQL.
    ///
    /// This test verifies that:
    /// 1. JSON properties can be queried using path syntax (e.g., `data.status = 'active'`)
    /// 2. The SQL builder generates `json_extract()` calls with `$.` paths
    #[tokio::test]
    async fn test_json_path_query() -> anyhow::Result<()> {
        // NOTE: SqlBuilder comes from the file-level import; only the parser
        // needs a local `use` here.
        use ankql::parser::parse_selection;

        // Test that the SQL builder generates correct JSONB syntax
        let selection = parse_selection(r#"data.status = 'active'"#).expect("Failed to parse query");
        let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer"]);
        builder.table_name("test_table");
        builder.selection(&selection).map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;

        let (sql, _params) = builder.build().map_err(|e| SqliteError::SqlGeneration(e.to_string()))?;

        // Verify the SQL uses json_extract() for reliable JSON path comparisons
        assert!(sql.contains("json_extract"), "SQL should use json_extract() for JSON path: {}", sql);
        assert!(sql.contains(r#"json_extract("data", '$.status')"#), "SQL should extract from data column with $.status path: {}", sql);

        Ok(())
    }

    /// Test the full cycle: store JSONB via parameter, query via json_extract with parameter.
    /// This mimics exactly what the real code does.
    #[tokio::test]
    async fn test_jsonb_storage_and_parameterized_query() -> Result<(), SqliteError> {
        let engine = SqliteStorageEngine::open_in_memory().await.map_err(|e| SqliteError::DDL(e.to_string()))?;
        let conn = engine.pool.get().await.map_err(|e| SqliteError::Pool(e.to_string()))?;

        conn.with_connection(|c| {
            // Create table with BLOB column for JSONB
            c.execute(r#"CREATE TABLE test_jsonb (id TEXT PRIMARY KEY, data BLOB)"#, [])?;

            // Insert using jsonb(?) - this is what the real code does
            let json_text = r#"{"territory": "US", "count": 10}"#;
            c.execute(r#"INSERT INTO test_jsonb (id, data) VALUES (?, jsonb(?))"#, rusqlite::params!["1", json_text])?;

            // Verify data is stored
            let count: i32 = c.query_row("SELECT COUNT(*) FROM test_jsonb", [], |row| row.get(0))?;
            assert_eq!(count, 1, "Should have 1 row");

            // Check what's in the data column (diagnostic output on failure)
            let data_type: String = c.query_row("SELECT typeof(data) FROM test_jsonb WHERE id = '1'", [], |row| row.get(0))?;
            eprintln!("Data column type: {}", data_type);

            // Check what json_extract returns
            let extracted: String =
                c.query_row(r#"SELECT json_extract(data, '$.territory') FROM test_jsonb WHERE id = '1'"#, [], |row| row.get(0))?;
            eprintln!("Extracted territory: '{}'", extracted);

            // Now try the parameterized query - THIS IS WHAT THE REAL CODE DOES
            let query_param = "US";
            let result: Result<String, _> = c.query_row(
                r#"SELECT id FROM test_jsonb WHERE json_extract(data, '$.territory') = ?"#,
                rusqlite::params![query_param],
                |row| row.get(0),
            );
            eprintln!("Query result: {:?}", result);

            match result {
                Ok(id) => assert_eq!(id, "1", "Should find the row with territory = US"),
                Err(e) => panic!("Query failed: {:?}", e),
            }

            Ok(())
        })
        .await
    }
}
842}