// khive_db/backend.rs

//! Concrete storage backend providing capability traits.
//!
//! `StorageBackend` owns a `ConnectionPool` and provides factory methods for
//! all storage capability traits (`EntityStore`, `GraphStore`, `NoteStore`,
//! `EventStore`, `VectorStore`, `SparseStore`, `TextSearch`, `SqlAccess`).
//! Services obtain capability handles without depending on the pool directly.
//!
//! # Modes
//!
//! - **File-backed** (`StorageBackend::sqlite(path)`): Production use. Opens (or
//!   creates) the database at the given path. Readers get standalone connections
//!   for high concurrency.
//! - **In-memory** (`StorageBackend::memory()`): Testing use. A single shared
//!   connection through the pool. All data is lost when the backend is dropped.
//!
//! # Schema ownership
//!
//! `StorageBackend` creates a **bare** pool with no global schema. Each factory
//! method (`graph()`, `notes()`, etc.) applies only the DDL for its own tables.
//! Call `apply_schema()` to run service-specific migrations.
21
22use std::path::Path;
23use std::sync::Arc;
24
25use rusqlite::OptionalExtension;
26
27use crate::error::SqliteError;
28use crate::pool::{ConnectionPool, PoolConfig};
29use crate::sql_bridge::SqlBridge;
30use crate::stores::{entity, event, graph, note, sparse, text, vectors};
31
32/// Concrete storage backend providing capability traits.
33pub struct StorageBackend {
34    pool: Arc<ConnectionPool>,
35    is_file_backed: bool,
36}
37
38impl StorageBackend {
39    /// File-backed SQLite database.
40    ///
41    /// Opens (or creates) the database at `path`. The underlying pool provides
42    /// 1 writer + N readers in WAL mode for concurrent access.
43    /// No schema is applied — call `apply_schema()` for each service.
44    pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
45        crate::extension::ensure_extensions_loaded();
46        let config = PoolConfig {
47            path: Some(path.as_ref().to_path_buf()),
48            ..PoolConfig::default()
49        };
50        let pool = ConnectionPool::new(config)?;
51        Ok(Self {
52            pool: Arc::new(pool),
53            is_file_backed: true,
54        })
55    }
56
57    /// In-memory SQLite database (for tests).
58    ///
59    /// All data is lost when the backend is dropped. The pool degrades to
60    /// single-connection mode since in-memory databases cannot be shared
61    /// across multiple connections.
62    pub fn memory() -> Result<Self, SqliteError> {
63        crate::extension::ensure_extensions_loaded();
64        let config = PoolConfig {
65            path: None,
66            ..PoolConfig::default()
67        };
68        let pool = ConnectionPool::new(config)?;
69        Ok(Self {
70            pool: Arc::new(pool),
71            is_file_backed: false,
72        })
73    }
74
75    /// Get the SQL access capability.
76    ///
77    /// Returns an `Arc<dyn SqlAccess>` suitable for passing to services.
78    pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
79        Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
80    }
81
82    /// Apply a service's schema plan (run migrations).
83    ///
84    /// Each migration in the plan's `sqlite` list is applied idempotently.
85    /// Already-applied migrations are skipped. The `_schema_versions` table
86    /// tracks which migrations have been run.
87    pub fn apply_schema(
88        &self,
89        plan: &crate::migrations::ServiceSchemaPlan,
90    ) -> Result<(), SqliteError> {
91        let writer = self.pool.try_writer()?;
92        crate::migrations::apply_schema_plan(writer.conn(), plan)
93    }
94
95    /// Apply pack-auxiliary DDL statements (ADR-017 §Storage profile and
96    /// pack-auxiliary schema).
97    ///
98    /// Executes each DDL statement idempotently via `execute_batch`. Each
99    /// statement MUST be self-contained and use `CREATE TABLE IF NOT EXISTS`
100    /// (or equivalent idempotent DDL) so that calling this method more than
101    /// once does not fail.
102    ///
103    /// Pack auxiliary tables are NOT tracked in `_schema_versions` — they are
104    /// non-versioned in v1 (ADR-017). Use `apply_schema` with a
105    /// `ServiceSchemaPlan` when version tracking is needed.
106    ///
107    /// This method is lower-level than `PackRuntime::schema_plan()` — the
108    /// runtime bootstrap calls `pack.schema_plan().statements` and passes the
109    /// slice here. The `SchemaPlan` type lives in `khive-runtime` (above this
110    /// crate in the dep chain); this method accepts a plain `&[&'static str]`
111    /// to avoid a circular dependency.
112    pub fn apply_pack_ddl_statements(
113        &self,
114        statements: &[&'static str],
115    ) -> Result<(), SqliteError> {
116        let writer = self.pool.try_writer()?;
117        for &stmt in statements {
118            writer.conn().execute_batch(stmt)?;
119        }
120        Ok(())
121    }
122
123    /// Get an EntityStore. Applies the entities DDL if not already present.
124    ///
125    /// Idempotent — safe to call multiple times.
126    pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
127        self.entities_for_namespace("local")
128    }
129
130    /// Get an EntityStore. The namespace parameter is validated (non-empty) and
131    /// the entities schema is applied, but the store itself is unscoped — namespace
132    /// is the caller's responsibility on each query/delete call.
133    pub fn entities_for_namespace(
134        &self,
135        namespace: &str,
136    ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
137        if namespace.trim().is_empty() {
138            return Err(SqliteError::InvalidData(
139                "entities namespace must be non-empty".to_string(),
140            ));
141        }
142        let writer = self.pool.try_writer()?;
143        entity::ensure_entities_schema(writer.conn())?;
144
145        Ok(Arc::new(entity::SqlEntityStore::new(
146            Arc::clone(&self.pool),
147            self.is_file_backed,
148        )))
149    }
150
151    /// Get a GraphStore for the default namespace.
152    ///
153    /// Creates the `graph_edges` table (with indexes) if it does not already
154    /// exist. Idempotent — safe to call multiple times.
155    pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
156        self.graph_for_namespace("local")
157    }
158
159    /// Get a GraphStore scoped to a namespace.
160    pub fn graph_for_namespace(
161        &self,
162        namespace: &str,
163    ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
164        if namespace.trim().is_empty() {
165            return Err(SqliteError::InvalidData(
166                "graph namespace must be non-empty".to_string(),
167            ));
168        }
169        let writer = self.pool.try_writer()?;
170        graph::ensure_graph_schema(writer.conn())?;
171
172        Ok(Arc::new(graph::SqlGraphStore::new_scoped(
173            Arc::clone(&self.pool),
174            self.is_file_backed,
175            namespace.trim().to_string(),
176        )))
177    }
178
179    /// Get a NoteStore. Applies the notes DDL if not already present.
180    ///
181    /// Idempotent — safe to call multiple times.
182    pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
183        self.notes_for_namespace("local")
184    }
185
186    /// Get a NoteStore. The namespace parameter is validated (non-empty) and
187    /// the notes schema is applied, but the store itself is unscoped — namespace
188    /// is the caller's responsibility on each query/delete call.
189    pub fn notes_for_namespace(
190        &self,
191        namespace: &str,
192    ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
193        if namespace.trim().is_empty() {
194            return Err(SqliteError::InvalidData(
195                "notes namespace must be non-empty".to_string(),
196            ));
197        }
198        let writer = self.pool.try_writer()?;
199        note::ensure_notes_schema(writer.conn())?;
200
201        Ok(Arc::new(note::SqlNoteStore::new(
202            Arc::clone(&self.pool),
203            self.is_file_backed,
204        )))
205    }
206
207    /// Get an EventStore for the default namespace.
208    ///
209    /// Creates the `events` table (with indexes) if it does not already exist.
210    /// Idempotent — safe to call multiple times.
211    pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
212        self.events_for_namespace("local")
213    }
214
215    /// Get an EventStore scoped to a namespace.
216    pub fn events_for_namespace(
217        &self,
218        namespace: &str,
219    ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
220        if namespace.trim().is_empty() {
221            return Err(SqliteError::InvalidData(
222                "events namespace must be non-empty".to_string(),
223            ));
224        }
225        let writer = self.pool.try_writer()?;
226        event::ensure_events_schema(writer.conn())?;
227
228        Ok(Arc::new(event::SqlEventStore::new_scoped(
229            Arc::clone(&self.pool),
230            self.is_file_backed,
231            namespace.trim().to_string(),
232        )))
233    }
234
235    /// Get a VectorStore for a specific embedding model, scoped to the default namespace.
236    ///
237    /// Creates the vec0 virtual table if it does not already exist. The `model_key`
238    /// must contain only ASCII alphanumeric/underscore characters.
239    pub fn vectors(
240        &self,
241        model_key: &str,
242        dimensions: usize,
243    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
244        self.vectors_for_namespace(model_key, dimensions, "local")
245    }
246
247    /// Get a VectorStore for a specific embedding model with a default namespace.
248    ///
249    /// Creates the vec0 virtual table if it does not already exist. The `namespace`
250    /// is a default for trait methods that lack a per-call namespace parameter
251    /// (count, delete, info). Access control is enforced at the runtime layer.
252    ///
253    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
254    pub fn vectors_for_namespace(
255        &self,
256        model_key: &str,
257        dimensions: usize,
258        namespace: &str,
259    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
260        if model_key.is_empty()
261            || !model_key
262                .chars()
263                .all(|c| c.is_ascii_alphanumeric() || c == '_')
264        {
265            return Err(SqliteError::InvalidData(format!(
266                "invalid model_key '{}': must be non-empty and contain only \
267                 alphanumeric/underscore characters",
268                model_key
269            )));
270        }
271        if namespace.trim().is_empty() {
272            return Err(SqliteError::InvalidData(
273                "vector store namespace must be non-empty".to_string(),
274            ));
275        }
276
277        // Ensure sqlite-vec is registered before creating vec0 tables.
278        crate::extension::ensure_extensions_loaded();
279
280        let table = format!("vec_{}", model_key);
281        let writer = self.pool.try_writer()?;
282
283        // Detect old-schema vec0 tables that predate the `field` column (ADR-044).
284        // vec0 virtual tables do not support ALTER TABLE, so we must drop and recreate
285        // the table if it exists without the `field` column. Vector data is a cache —
286        // callers can re-embed from the source record after the table is rebuilt.
287        // Use pragma_table_info to check columns directly; substring matching on the
288        // CREATE DDL is fragile (a model_key containing "field" would false-match).
289        let table_exists: bool = writer
290            .conn()
291            .query_row(
292                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
293                rusqlite::params![&table],
294                |row| row.get::<_, i64>(0),
295            )
296            .optional()
297            .map_err(SqliteError::Rusqlite)?
298            .is_some();
299
300        if table_exists {
301            let has_field: bool = {
302                let pragma = format!("PRAGMA table_xinfo({})", table);
303                let mut stmt = writer.conn().prepare(&pragma)?;
304                let mut rows = stmt.query([])?;
305                let mut found = false;
306                while let Some(row) = rows.next()? {
307                    let name: String = row.get(1)?;
308                    if name == "field" {
309                        found = true;
310                        break;
311                    }
312                }
313                found
314            };
315            if !has_field {
316                let drop_ddl = format!("DROP TABLE IF EXISTS {}", table);
317                writer.conn().execute_batch(&drop_ddl)?;
318            }
319        }
320
321        // Ensure the _embedding_models registry table exists (ADR-043 §1).
322        // This is a no-op when the table already exists. Running it here ensures
323        // the registry is present for any caller that opens a vector store without
324        // first calling run_migrations() (e.g., tests that create stores directly).
325        // Production callers are expected to call run_migrations() at startup, which
326        // creates the registry via V14; this is a belt-and-suspenders fallback.
327        // Schema is defined in `migrations::EMBEDDING_MODELS_DDL` (single source of
328        // truth) to prevent the two copies from silently drifting.
329        writer
330            .conn()
331            .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
332
333        // Create the vec0 virtual table. Idempotent on fresh databases and after the
334        // old-schema rebuild above.
335        //
336        // NOTE: `embedding_model_id` is NOT included in this DDL because sqlite-vec
337        // enforces NOT NULL on TEXT metadata columns at insert time, so the column
338        // cannot be added at virtual-table creation as a nullable FK.  The column will
339        // be present after the ADR-043 §8 startup backfill rebuild (steps 2-4), which
340        // is deferred to a follow-up PR — see the tracking issue filed against MAJ-2
341        // of codex round-1 review of PR #374.
342        let ddl = format!(
343            "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
344             subject_id TEXT PRIMARY KEY, \
345             namespace TEXT NOT NULL, \
346             kind TEXT NOT NULL, \
347             field TEXT NOT NULL, \
348             embedding float[{}] distance_metric=cosine\
349             )",
350            model_key, dimensions
351        );
352        writer.conn().execute_batch(&ddl)?;
353
354        Ok(Arc::new(vectors::SqliteVecStore::new(
355            Arc::clone(&self.pool),
356            self.is_file_backed,
357            model_key.to_string(),
358            dimensions,
359            namespace.trim().to_string(),
360        )?))
361    }
362
363    /// Get a SparseStore for a specific model key, scoped to the default namespace.
364    ///
365    /// Creates the sparse table if it does not already exist.
366    pub fn sparse(
367        &self,
368        model_key: &str,
369    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
370        self.sparse_for_namespace(model_key, "local")
371    }
372
373    /// Get a SparseStore for a specific model key with an explicit default namespace.
374    ///
375    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
376    pub fn sparse_for_namespace(
377        &self,
378        model_key: &str,
379        namespace: &str,
380    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
381        if model_key.is_empty()
382            || !model_key
383                .chars()
384                .all(|c| c.is_ascii_alphanumeric() || c == '_')
385        {
386            return Err(SqliteError::InvalidData(format!(
387                "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
388                model_key
389            )));
390        }
391        if namespace.trim().is_empty() {
392            return Err(SqliteError::InvalidData(
393                "sparse store namespace must be non-empty".to_string(),
394            ));
395        }
396
397        let writer = self.pool.try_writer()?;
398        sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
399
400        Ok(Arc::new(sparse::SqliteSparseStore::new(
401            Arc::clone(&self.pool),
402            self.is_file_backed,
403            model_key.to_string(),
404            namespace.trim().to_string(),
405        )?))
406    }
407
408    /// Get a TextSearch for a specific table key.
409    ///
410    /// Creates the FTS5 virtual table if it does not already exist. Uses the
411    /// `trigram` tokenizer by default (CJK-safe, ADR-013).
412    ///
413    /// The `table_key` must contain only ASCII alphanumeric/underscore characters.
414    pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
415        self.text_with_tokenizer(table_key, "trigram")
416    }
417
418    /// Get a TextSearch with an explicit FTS5 tokenizer.
419    ///
420    /// Use when you need a tokenizer other than the default `trigram` — for
421    /// example `unicode61` for Latin-only corpora.
422    ///
423    /// Both `table_key` and `tokenizer` must contain only ASCII
424    /// alphanumeric/underscore characters.
425    pub fn text_with_tokenizer(
426        &self,
427        table_key: &str,
428        tokenizer: &str,
429    ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
430        if table_key.is_empty()
431            || !table_key
432                .chars()
433                .all(|c| c.is_ascii_alphanumeric() || c == '_')
434        {
435            return Err(SqliteError::InvalidData(format!(
436                "invalid table_key '{}': must be non-empty and contain only \
437                 alphanumeric/underscore characters",
438                table_key
439            )));
440        }
441        if tokenizer.is_empty()
442            || !tokenizer
443                .chars()
444                .all(|c| c.is_ascii_alphanumeric() || c == '_')
445        {
446            return Err(SqliteError::InvalidData(format!(
447                "invalid tokenizer '{}': must be non-empty and contain only \
448                 alphanumeric/underscore characters",
449                tokenizer
450            )));
451        }
452
453        let ddl = format!(
454            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
455             subject_id UNINDEXED, \
456             kind UNINDEXED, \
457             title, \
458             body, \
459             tags UNINDEXED, \
460             namespace UNINDEXED, \
461             metadata UNINDEXED, \
462             updated_at UNINDEXED, \
463             tokenize = '{}'\
464             )",
465            table_key, tokenizer
466        );
467        let writer = self.pool.try_writer()?;
468        writer.conn().execute_batch(&ddl)?;
469
470        Ok(Arc::new(text::Fts5TextSearch::new(
471            Arc::clone(&self.pool),
472            self.is_file_backed,
473            table_key.to_string(),
474        )))
475    }
476
477    /// Is this a file-backed backend?
478    pub fn is_file_backed(&self) -> bool {
479        self.is_file_backed
480    }
481
482    /// Access the underlying pool (escape hatch).
483    pub fn pool(&self) -> &ConnectionPool {
484        &self.pool
485    }
486
487    /// Clone the underlying pool Arc.
488    pub fn pool_arc(&self) -> Arc<ConnectionPool> {
489        Arc::clone(&self.pool)
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496    use khive_storage::types::{SqlStatement, SqlValue};
497
498    #[test]
499    fn memory_backend_creates_successfully() {
500        let backend = StorageBackend::memory().expect("memory backend should create");
501        assert!(!backend.is_file_backed());
502    }
503
504    #[test]
505    fn file_backend_creates_successfully() {
506        let dir = tempfile::tempdir().unwrap();
507        let path = dir.path().join("test.db");
508        let backend = StorageBackend::sqlite(&path).expect("file backend should create");
509        assert!(backend.is_file_backed());
510        assert!(path.exists());
511    }
512
513    #[tokio::test]
514    async fn sql_access_memory_roundtrip() {
515        let backend = StorageBackend::memory().unwrap();
516        let sql = backend.sql();
517
518        let mut writer = sql.writer().await.unwrap();
519        writer
520            .execute_script(
521                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
522            )
523            .await
524            .unwrap();
525
526        let affected = writer
527            .execute(SqlStatement {
528                sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
529                params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
530                label: None,
531            })
532            .await
533            .unwrap();
534        assert_eq!(affected, 1);
535
536        let mut reader = sql.reader().await.unwrap();
537        let row = reader
538            .query_row(SqlStatement {
539                sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
540                params: vec![SqlValue::Text("row1".into())],
541                label: None,
542            })
543            .await
544            .unwrap();
545
546        let row = row.expect("should find the inserted row");
547        assert_eq!(row.columns.len(), 2);
548        match &row.columns[0].value {
549            SqlValue::Text(s) => assert_eq!(s, "row1"),
550            other => panic!("expected Text, got {other:?}"),
551        }
552        match &row.columns[1].value {
553            SqlValue::Integer(v) => assert_eq!(*v, 42),
554            other => panic!("expected Integer, got {other:?}"),
555        }
556    }
557
558    #[tokio::test]
559    async fn sql_access_file_roundtrip() {
560        let dir = tempfile::tempdir().unwrap();
561        let path = dir.path().join("test_roundtrip.db");
562        let backend = StorageBackend::sqlite(&path).unwrap();
563        let sql = backend.sql();
564
565        let mut writer = sql.writer().await.unwrap();
566        writer
567            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
568            .await
569            .unwrap();
570        writer
571            .execute(SqlStatement {
572                sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
573                params: vec![
574                    SqlValue::Text("hello".into()),
575                    SqlValue::Text("world".into()),
576                ],
577                label: None,
578            })
579            .await
580            .unwrap();
581
582        let mut reader = sql.reader().await.unwrap();
583        let rows = reader
584            .query_all(SqlStatement {
585                sql: "SELECT k, v FROM test_f".into(),
586                params: vec![],
587                label: None,
588            })
589            .await
590            .unwrap();
591        assert_eq!(rows.len(), 1);
592        match &rows[0].columns[1].value {
593            SqlValue::Text(s) => assert_eq!(s, "world"),
594            other => panic!("expected Text, got {other:?}"),
595        }
596    }
597
598    #[tokio::test]
599    #[cfg(feature = "vectors")]
600    async fn vectors_roundtrip_via_public_api() {
601        let backend = StorageBackend::memory().unwrap();
602        let store = backend.vectors("test_api", 3).unwrap();
603
604        let id = uuid::Uuid::new_v4();
605        store
606            .insert(
607                id,
608                khive_types::SubstrateKind::Entity,
609                "local",
610                "content",
611                vec![vec![1.0, 0.0, 0.0]],
612            )
613            .await
614            .unwrap();
615
616        let hits = store
617            .search(khive_storage::types::VectorSearchRequest {
618                query_vectors: vec![vec![1.0, 0.0, 0.0]],
619                top_k: 1,
620                namespace: None,
621                kind: None,
622                filter: None,
623                backend_hints: None,
624            })
625            .await
626            .unwrap();
627
628        assert_eq!(hits.len(), 1);
629        assert_eq!(hits[0].subject_id, id);
630        assert!(hits[0].score.to_f64() > 0.99);
631    }
632
633    #[tokio::test]
634    #[cfg(feature = "vectors")]
635    async fn vectors_creates_table_idempotently() {
636        let backend = StorageBackend::memory().unwrap();
637
638        let store1 = backend.vectors("idempotent", 3).unwrap();
639        let store2 = backend.vectors("idempotent", 3).unwrap();
640
641        let id = uuid::Uuid::new_v4();
642        store1
643            .insert(
644                id,
645                khive_types::SubstrateKind::Entity,
646                "local",
647                "content",
648                vec![vec![1.0, 0.0, 0.0]],
649            )
650            .await
651            .unwrap();
652
653        let count = store2.count().await.unwrap();
654        assert_eq!(count, 1);
655    }
656
657    #[tokio::test]
658    async fn text_roundtrip_via_public_api() {
659        let backend = StorageBackend::memory().unwrap();
660        let store = backend.text("test_api").unwrap();
661
662        let id = uuid::Uuid::new_v4();
663        let doc = khive_storage::types::TextDocument {
664            subject_id: id,
665            kind: khive_types::SubstrateKind::Entity,
666            title: Some("Test Title".to_string()),
667            body: "This is a searchable document about Rust.".to_string(),
668            tags: vec!["rust".to_string()],
669            namespace: "test_ns".to_string(),
670            metadata: None,
671            updated_at: chrono::Utc::now(),
672        };
673        store.upsert_document(doc).await.unwrap();
674
675        let hits = store
676            .search(khive_storage::types::TextSearchRequest {
677                query: "Rust".to_string(),
678                mode: khive_storage::types::TextQueryMode::Plain,
679                filter: Some(khive_storage::types::TextFilter {
680                    namespaces: vec!["test_ns".to_string()],
681                    ..Default::default()
682                }),
683                top_k: 1,
684                snippet_chars: 64,
685            })
686            .await
687            .unwrap();
688
689        assert_eq!(hits.len(), 1);
690        assert_eq!(hits[0].subject_id, id);
691        assert!(hits[0].score.to_f64() > 0.0);
692    }
693
694    #[tokio::test]
695    async fn text_creates_table_idempotently() {
696        let backend = StorageBackend::memory().unwrap();
697
698        let store1 = backend.text("idempotent_fts").unwrap();
699        let store2 = backend.text("idempotent_fts").unwrap();
700
701        let id = uuid::Uuid::new_v4();
702        let doc = khive_storage::types::TextDocument {
703            subject_id: id,
704            kind: khive_types::SubstrateKind::Note,
705            title: None,
706            body: "Hello world.".to_string(),
707            tags: vec![],
708            namespace: "test_ns".to_string(),
709            metadata: None,
710            updated_at: chrono::Utc::now(),
711        };
712        store1.upsert_document(doc).await.unwrap();
713
714        let count = store2
715            .count(khive_storage::types::TextFilter {
716                namespaces: vec!["test_ns".to_string()],
717                ..Default::default()
718            })
719            .await
720            .unwrap();
721        assert_eq!(count, 1);
722    }
723
724    #[test]
725    fn invalid_model_key_rejected() {
726        let backend = StorageBackend::memory().unwrap();
727        assert!(backend.vectors("bad key!", 3).is_err());
728        assert!(backend.vectors("", 3).is_err());
729    }
730
731    #[test]
732    fn invalid_table_key_rejected() {
733        let backend = StorageBackend::memory().unwrap();
734        assert!(backend.text("bad key!").is_err());
735        assert!(backend.text("").is_err());
736    }
737
738    #[test]
739    fn apply_schema_runs_migrations_idempotently() {
740        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
741            id: "001_init",
742            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
743            down_sql: None,
744            is_already_applied: None,
745        }];
746        let plan = crate::migrations::ServiceSchemaPlan {
747            service: "schema_test_svc",
748            sqlite: MIGRATIONS,
749            postgres: &[],
750        };
751
752        let backend = StorageBackend::memory().unwrap();
753        backend.apply_schema(&plan).unwrap();
754        backend.apply_schema(&plan).unwrap();
755
756        let reader = backend.pool().reader().unwrap();
757        let count: i64 = reader
758            .conn()
759            .query_row(
760                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
761                [],
762                |row| row.get(0),
763            )
764            .unwrap();
765        assert_eq!(count, 1);
766    }
767}