Skip to main content

khive_db/
backend.rs

//! Concrete storage backend providing capability traits.
//!
//! `StorageBackend` owns a `ConnectionPool` and provides factory methods for all
//! capability traits (`EntityStore`, `GraphStore`, `NoteStore`, `EventStore`,
//! `VectorStore`, `SparseStore`, `TextSearch`, `SqlAccess`). File-backed for
//! production; in-memory for tests.
6
7use std::path::Path;
8use std::sync::Arc;
9
10use rusqlite::OptionalExtension;
11
12use crate::error::SqliteError;
13use crate::pool::{ConnectionPool, PoolConfig};
14use crate::sql_bridge::SqlBridge;
15use crate::stores::{entity, event, graph, note, sparse, text, vectors};
16
17/// Concrete storage backend providing capability traits.
18pub struct StorageBackend {
19    pool: Arc<ConnectionPool>,
20    is_file_backed: bool,
21}
22
23impl StorageBackend {
24    /// File-backed SQLite database.
25    ///
26    /// Opens (or creates) the database at `path`. The underlying pool provides
27    /// 1 writer + N readers in WAL mode for concurrent access.
28    /// No schema is applied — call `apply_schema()` for each service.
29    pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
30        crate::extension::ensure_extensions_loaded();
31        let config = PoolConfig {
32            path: Some(path.as_ref().to_path_buf()),
33            ..PoolConfig::default()
34        };
35        let pool = ConnectionPool::new(config)?;
36        Ok(Self {
37            pool: Arc::new(pool),
38            is_file_backed: true,
39        })
40    }
41
42    /// In-memory SQLite database (for tests).
43    ///
44    /// All data is lost when the backend is dropped. The pool degrades to
45    /// single-connection mode since in-memory databases cannot be shared
46    /// across multiple connections.
47    pub fn memory() -> Result<Self, SqliteError> {
48        crate::extension::ensure_extensions_loaded();
49        let config = PoolConfig {
50            path: None,
51            ..PoolConfig::default()
52        };
53        let pool = ConnectionPool::new(config)?;
54        Ok(Self {
55            pool: Arc::new(pool),
56            is_file_backed: false,
57        })
58    }
59
60    /// Get the SQL access capability.
61    ///
62    /// Returns an `Arc<dyn SqlAccess>` suitable for passing to services.
63    pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
64        Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
65    }
66
67    /// Apply a service's schema plan (run migrations).
68    ///
69    /// Each migration in the plan's `sqlite` list is applied idempotently.
70    /// Already-applied migrations are skipped. The `_schema_versions` table
71    /// tracks which migrations have been run.
72    pub fn apply_schema(
73        &self,
74        plan: &crate::migrations::ServiceSchemaPlan,
75    ) -> Result<(), SqliteError> {
76        let writer = self.pool.try_writer()?;
77        crate::migrations::apply_schema_plan(writer.conn(), plan)
78    }
79
80    /// Apply pack-auxiliary DDL statements.
81    ///
82    /// Executes each DDL statement idempotently via `execute_batch`. Each
83    /// statement MUST be self-contained and use `CREATE TABLE IF NOT EXISTS`
84    /// (or equivalent idempotent DDL) so that calling this method more than
85    /// once does not fail.
86    ///
87    /// Pack auxiliary tables are NOT tracked in `_schema_versions` — they are
88    /// non-versioned. Use `apply_schema` with a `ServiceSchemaPlan` when version
89    /// tracking is needed.
90    ///
91    /// This method is lower-level than `PackRuntime::schema_plan()` — the
92    /// runtime bootstrap calls `pack.schema_plan().statements` and passes the
93    /// slice here. The `SchemaPlan` type lives in `khive-runtime` (above this
94    /// crate in the dep chain); this method accepts a plain `&[&'static str]`
95    /// to avoid a circular dependency.
96    pub fn apply_pack_ddl_statements(
97        &self,
98        statements: &[&'static str],
99    ) -> Result<(), SqliteError> {
100        let writer = self.pool.try_writer()?;
101        for &stmt in statements {
102            writer.conn().execute_batch(stmt)?;
103        }
104        Ok(())
105    }
106
107    /// Get an EntityStore. Applies the entities DDL if not already present.
108    ///
109    /// Idempotent — safe to call multiple times.
110    pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
111        self.entities_for_namespace("local")
112    }
113
114    /// Get an EntityStore. The namespace parameter is validated (non-empty) and
115    /// the entities schema is applied, but the store itself is unscoped — namespace
116    /// is the caller's responsibility on each query/delete call.
117    pub fn entities_for_namespace(
118        &self,
119        namespace: &str,
120    ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
121        if namespace.trim().is_empty() {
122            return Err(SqliteError::InvalidData(
123                "entities namespace must be non-empty".to_string(),
124            ));
125        }
126        let writer = self.pool.try_writer()?;
127        entity::ensure_entities_schema(writer.conn())?;
128
129        Ok(Arc::new(entity::SqlEntityStore::new(
130            Arc::clone(&self.pool),
131            self.is_file_backed,
132        )))
133    }
134
135    /// Get a GraphStore for the default namespace.
136    ///
137    /// Creates the `graph_edges` table (with indexes) if it does not already
138    /// exist. Idempotent — safe to call multiple times.
139    pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
140        self.graph_for_namespace("local")
141    }
142
143    /// Get a GraphStore scoped to a namespace.
144    pub fn graph_for_namespace(
145        &self,
146        namespace: &str,
147    ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
148        if namespace.trim().is_empty() {
149            return Err(SqliteError::InvalidData(
150                "graph namespace must be non-empty".to_string(),
151            ));
152        }
153        let writer = self.pool.try_writer()?;
154        graph::ensure_graph_schema(writer.conn())?;
155
156        Ok(Arc::new(graph::SqlGraphStore::new_scoped(
157            Arc::clone(&self.pool),
158            self.is_file_backed,
159            namespace.trim().to_string(),
160        )))
161    }
162
163    /// Get a NoteStore. Applies the notes DDL if not already present.
164    ///
165    /// Idempotent — safe to call multiple times.
166    pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
167        self.notes_for_namespace("local")
168    }
169
170    /// Get a NoteStore. The namespace parameter is validated (non-empty) and
171    /// the notes schema is applied, but the store itself is unscoped — namespace
172    /// is the caller's responsibility on each query/delete call.
173    pub fn notes_for_namespace(
174        &self,
175        namespace: &str,
176    ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
177        if namespace.trim().is_empty() {
178            return Err(SqliteError::InvalidData(
179                "notes namespace must be non-empty".to_string(),
180            ));
181        }
182        let writer = self.pool.try_writer()?;
183        note::ensure_notes_schema(writer.conn())?;
184
185        Ok(Arc::new(note::SqlNoteStore::new(
186            Arc::clone(&self.pool),
187            self.is_file_backed,
188        )))
189    }
190
191    /// Get an EventStore for the default namespace.
192    ///
193    /// Creates the `events` table (with indexes) if it does not already exist.
194    /// Idempotent — safe to call multiple times.
195    pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
196        self.events_for_namespace("local")
197    }
198
199    /// Get an EventStore scoped to a namespace.
200    pub fn events_for_namespace(
201        &self,
202        namespace: &str,
203    ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
204        if namespace.trim().is_empty() {
205            return Err(SqliteError::InvalidData(
206                "events namespace must be non-empty".to_string(),
207            ));
208        }
209        let writer = self.pool.try_writer()?;
210        event::ensure_events_schema(writer.conn())?;
211
212        Ok(Arc::new(event::SqlEventStore::new_scoped(
213            Arc::clone(&self.pool),
214            self.is_file_backed,
215            namespace.trim().to_string(),
216        )))
217    }
218
219    /// Get a VectorStore for a specific embedding model, scoped to the default namespace.
220    ///
221    /// Creates the vec0 virtual table if it does not already exist. The `model_key`
222    /// must contain only ASCII alphanumeric/underscore characters. The `embedding_model`
223    /// is the canonical display name stored in each vector row.
224    pub fn vectors(
225        &self,
226        model_key: &str,
227        embedding_model: &str,
228        dimensions: usize,
229    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
230        self.vectors_for_namespace(model_key, embedding_model, dimensions, "local")
231    }
232
233    /// Get a VectorStore for a specific embedding model with a default namespace.
234    ///
235    /// Creates the vec0 virtual table if it does not already exist. The `namespace`
236    /// is a default for trait methods that lack a per-call namespace parameter
237    /// (count, delete, info). Access control is enforced at the runtime layer.
238    ///
239    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
240    /// The `embedding_model` is the canonical display name stored in the `embedding_model`
241    /// column of each vector row (e.g. `"all-minilm-l6-v2"`).
242    pub fn vectors_for_namespace(
243        &self,
244        model_key: &str,
245        embedding_model: &str,
246        dimensions: usize,
247        namespace: &str,
248    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
249        if model_key.is_empty()
250            || !model_key
251                .chars()
252                .all(|c| c.is_ascii_alphanumeric() || c == '_')
253        {
254            return Err(SqliteError::InvalidData(format!(
255                "invalid model_key '{}': must be non-empty and contain only \
256                 alphanumeric/underscore characters",
257                model_key
258            )));
259        }
260        if namespace.trim().is_empty() {
261            return Err(SqliteError::InvalidData(
262                "vector store namespace must be non-empty".to_string(),
263            ));
264        }
265
266        // Ensure sqlite-vec is registered before creating vec0 tables.
267        crate::extension::ensure_extensions_loaded();
268
269        let table = format!("vec_{}", model_key);
270        let writer = self.pool.try_writer()?;
271
272        // Detect old-schema vec0 tables that predate the `field` column.
273        // vec0 virtual tables do not support ALTER TABLE, so we must drop and recreate
274        // the table if it exists without the `field` column. Vector data is a cache —
275        // callers can re-embed from the source record after the table is rebuilt.
276        // Use pragma_table_info to check columns directly; substring matching on the
277        // CREATE DDL is fragile (a model_key containing "field" would false-match).
278        let table_exists: bool = writer
279            .conn()
280            .query_row(
281                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
282                rusqlite::params![&table],
283                |row| row.get::<_, i64>(0),
284            )
285            .optional()
286            .map_err(SqliteError::Rusqlite)?
287            .is_some();
288
289        if table_exists {
290            // V17 migration (vector_embedding_model_tag_preserving_rebuild) adds
291            // `field` and `embedding_model` to all pre-existing vec0 tables at
292            // migration time.  If this table still lacks either column post-migration
293            // that indicates the database was not migrated — return a hard error
294            // rather than silently dropping data.
295            let pragma = format!("PRAGMA table_xinfo({})", table);
296            let mut stmt = writer.conn().prepare(&pragma)?;
297            let mut rows = stmt.query([])?;
298            let mut has_field = false;
299            let mut has_embedding_model = false;
300            while let Some(row) = rows.next()? {
301                let name: String = row.get(1)?;
302                if name == "field" {
303                    has_field = true;
304                }
305                if name == "embedding_model" {
306                    has_embedding_model = true;
307                }
308            }
309            if !has_field || !has_embedding_model {
310                return Err(SqliteError::InvalidData(format!(
311                    "vec0 table '{}' is missing required column(s) (field={}, \
312                     embedding_model={}); run `kkernel db migrate` to apply V17 \
313                     (vector_embedding_model_tag_preserving_rebuild) before opening \
314                     this vector store",
315                    table, has_field, has_embedding_model,
316                )));
317            }
318        }
319
320        // Ensure the _embedding_models registry table exists.
321        // This is a no-op when the table already exists. Running it here ensures
322        // the registry is present for any caller that opens a vector store without
323        // first calling run_migrations() (e.g., tests that create stores directly).
324        // Production callers are expected to call run_migrations() at startup, which
325        // creates the registry via V14; this is a belt-and-suspenders fallback.
326        // Schema is defined in `migrations::EMBEDDING_MODELS_DDL` (single source of
327        // truth) to prevent the two copies from silently drifting.
328        writer
329            .conn()
330            .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
331
332        // Create the vec0 virtual table. Idempotent on fresh databases and after the
333        // old-schema rebuild above.
334        let ddl = format!(
335            "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
336             subject_id TEXT PRIMARY KEY, \
337             namespace TEXT NOT NULL, \
338             kind TEXT NOT NULL, \
339             field TEXT NOT NULL, \
340             embedding_model TEXT NOT NULL, \
341             embedding float[{}] distance_metric=cosine\
342             )",
343            model_key, dimensions
344        );
345        writer.conn().execute_batch(&ddl)?;
346
347        Ok(Arc::new(vectors::SqliteVecStore::new(
348            Arc::clone(&self.pool),
349            self.is_file_backed,
350            model_key.to_string(),
351            embedding_model.to_string(),
352            dimensions,
353            namespace.trim().to_string(),
354        )?))
355    }
356
357    /// Register an embedding model in the `_embedding_models` registry table.
358    ///
359    /// Idempotent: if a row with the same `canonical_key` already exists, updates its
360    /// status back to `'active'` without changing other fields.
361    pub fn register_embedding_model(
362        &self,
363        engine_name: &str,
364        model_id: &str,
365        key_version: &str,
366        dimensions: u32,
367    ) -> Result<(), SqliteError> {
368        let writer = self.pool.try_writer()?;
369        writer
370            .conn()
371            .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
372
373        let now = chrono::Utc::now().timestamp_micros();
374        let canonical_key =
375            format!("{engine_name}:{model_id}:{key_version}:{dimensions}").into_bytes();
376        let id = uuid::Uuid::new_v4();
377        writer.conn().execute(
378            "INSERT INTO _embedding_models \
379             (id, engine_name, model_id, key_version, dim, output_dim, status, \
380              activated_at, superseded_at, superseded_by, canonical_key, created_at) \
381             VALUES (?1, ?2, ?3, ?4, ?5, NULL, 'active', ?6, NULL, NULL, ?7, ?8) \
382             ON CONFLICT(canonical_key) DO UPDATE SET \
383                status = 'active', \
384                activated_at = COALESCE(_embedding_models.activated_at, excluded.activated_at)",
385            rusqlite::params![
386                id.as_bytes().as_slice(),
387                engine_name,
388                model_id,
389                key_version,
390                dimensions as i64,
391                now,
392                canonical_key,
393                now,
394            ],
395        )?;
396        Ok(())
397    }
398
399    /// Get a SparseStore for a specific model key, scoped to the default namespace.
400    ///
401    /// Creates the sparse table if it does not already exist.
402    pub fn sparse(
403        &self,
404        model_key: &str,
405    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
406        self.sparse_for_namespace(model_key, "local")
407    }
408
409    /// Get a SparseStore for a specific model key with an explicit default namespace.
410    ///
411    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
412    pub fn sparse_for_namespace(
413        &self,
414        model_key: &str,
415        namespace: &str,
416    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
417        if model_key.is_empty()
418            || !model_key
419                .chars()
420                .all(|c| c.is_ascii_alphanumeric() || c == '_')
421        {
422            return Err(SqliteError::InvalidData(format!(
423                "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
424                model_key
425            )));
426        }
427        if namespace.trim().is_empty() {
428            return Err(SqliteError::InvalidData(
429                "sparse store namespace must be non-empty".to_string(),
430            ));
431        }
432
433        let writer = self.pool.try_writer()?;
434        sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
435
436        Ok(Arc::new(sparse::SqliteSparseStore::new(
437            Arc::clone(&self.pool),
438            self.is_file_backed,
439            model_key.to_string(),
440            namespace.trim().to_string(),
441        )?))
442    }
443
444    /// Get a TextSearch for a specific table key.
445    ///
446    /// Creates the FTS5 virtual table if it does not already exist. Uses the
447    /// `trigram` tokenizer by default (CJK-safe).
448    ///
449    /// The `table_key` must contain only ASCII alphanumeric/underscore characters.
450    pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
451        self.text_with_tokenizer(table_key, "trigram")
452    }
453
454    /// Get a TextSearch with an explicit FTS5 tokenizer.
455    ///
456    /// Use when you need a tokenizer other than the default `trigram` — for
457    /// example `unicode61` for Latin-only corpora.
458    ///
459    /// Both `table_key` and `tokenizer` must contain only ASCII
460    /// alphanumeric/underscore characters.
461    pub fn text_with_tokenizer(
462        &self,
463        table_key: &str,
464        tokenizer: &str,
465    ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
466        if table_key.is_empty()
467            || !table_key
468                .chars()
469                .all(|c| c.is_ascii_alphanumeric() || c == '_')
470        {
471            return Err(SqliteError::InvalidData(format!(
472                "invalid table_key '{}': must be non-empty and contain only \
473                 alphanumeric/underscore characters",
474                table_key
475            )));
476        }
477        if tokenizer.is_empty()
478            || !tokenizer
479                .chars()
480                .all(|c| c.is_ascii_alphanumeric() || c == '_')
481        {
482            return Err(SqliteError::InvalidData(format!(
483                "invalid tokenizer '{}': must be non-empty and contain only \
484                 alphanumeric/underscore characters",
485                tokenizer
486            )));
487        }
488
489        let ddl = format!(
490            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
491             subject_id UNINDEXED, \
492             kind UNINDEXED, \
493             title, \
494             body, \
495             tags UNINDEXED, \
496             namespace UNINDEXED, \
497             metadata UNINDEXED, \
498             updated_at UNINDEXED, \
499             tokenize = '{}'\
500             )",
501            table_key, tokenizer
502        );
503        let writer = self.pool.try_writer()?;
504        writer.conn().execute_batch(&ddl)?;
505
506        Ok(Arc::new(text::Fts5TextSearch::new(
507            Arc::clone(&self.pool),
508            self.is_file_backed,
509            table_key.to_string(),
510        )))
511    }
512
513    /// Is this a file-backed backend?
514    pub fn is_file_backed(&self) -> bool {
515        self.is_file_backed
516    }
517
518    /// Access the underlying pool (escape hatch).
519    pub fn pool(&self) -> &ConnectionPool {
520        &self.pool
521    }
522
523    /// Clone the underlying pool Arc.
524    pub fn pool_arc(&self) -> Arc<ConnectionPool> {
525        Arc::clone(&self.pool)
526    }
527}
528
#[cfg(test)]
mod tests {
    use super::*;
    use khive_storage::types::{SqlStatement, SqlValue};

    #[test]
    fn memory_backend_creates_successfully() {
        let backend = StorageBackend::memory().expect("memory backend should create");
        assert!(!backend.is_file_backed());
    }

    #[test]
    fn file_backend_creates_successfully() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let backend = StorageBackend::sqlite(&path).expect("file backend should create");
        assert!(backend.is_file_backed());
        assert!(path.exists());
    }

    #[tokio::test]
    async fn sql_access_memory_roundtrip() {
        let backend = StorageBackend::memory().unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script(
                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
            )
            .await
            .unwrap();

        let affected = writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
                params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(affected, 1);

        let mut reader = sql.reader().await.unwrap();
        let row = reader
            .query_row(SqlStatement {
                sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
                params: vec![SqlValue::Text("row1".into())],
                label: None,
            })
            .await
            .unwrap();

        let row = row.expect("should find the inserted row");
        assert_eq!(row.columns.len(), 2);
        match &row.columns[0].value {
            SqlValue::Text(s) => assert_eq!(s, "row1"),
            other => panic!("expected Text, got {other:?}"),
        }
        match &row.columns[1].value {
            SqlValue::Integer(v) => assert_eq!(*v, 42),
            other => panic!("expected Integer, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn sql_access_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test_roundtrip.db");
        let backend = StorageBackend::sqlite(&path).unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
            .await
            .unwrap();
        writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
                params: vec![
                    SqlValue::Text("hello".into()),
                    SqlValue::Text("world".into()),
                ],
                label: None,
            })
            .await
            .unwrap();

        let mut reader = sql.reader().await.unwrap();
        let rows = reader
            .query_all(SqlStatement {
                sql: "SELECT k, v FROM test_f".into(),
                params: vec![],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0].columns[1].value {
            SqlValue::Text(s) => assert_eq!(s, "world"),
            other => panic!("expected Text, got {other:?}"),
        }
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.vectors("test_api", "test_api", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                "content",
                vec![vec![1.0, 0.0, 0.0]],
            )
            .await
            .unwrap();

        let hits = store
            .search(khive_storage::types::VectorSearchRequest {
                query_vectors: vec![vec![1.0, 0.0, 0.0]],
                top_k: 1,
                namespace: None,
                kind: None,
                embedding_model: None,
                filter: None,
                backend_hints: None,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.99);
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.vectors("idempotent", "idempotent", 3).unwrap();
        let store2 = backend.vectors("idempotent", "idempotent", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store1
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                "content",
                vec![vec![1.0, 0.0, 0.0]],
            )
            .await
            .unwrap();

        let count = store2.count().await.unwrap();
        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn text_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.text("test_api").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Entity,
            title: Some("Test Title".to_string()),
            body: "This is a searchable document about Rust.".to_string(),
            tags: vec!["rust".to_string()],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        let hits = store
            .search(khive_storage::types::TextSearchRequest {
                query: "Rust".to_string(),
                mode: khive_storage::types::TextQueryMode::Plain,
                filter: Some(khive_storage::types::TextFilter {
                    namespaces: vec!["test_ns".to_string()],
                    ..Default::default()
                }),
                top_k: 1,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.0);
    }

    #[tokio::test]
    async fn text_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.text("idempotent_fts").unwrap();
        let store2 = backend.text("idempotent_fts").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Note,
            title: None,
            body: "Hello world.".to_string(),
            tags: vec![],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store1.upsert_document(doc).await.unwrap();

        let count = store2
            .count(khive_storage::types::TextFilter {
                namespaces: vec!["test_ns".to_string()],
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn invalid_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.vectors("bad key!", "bad key!", 3).is_err());
        assert!(backend.vectors("", "", 3).is_err());
    }

    #[test]
    fn invalid_sparse_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.sparse("bad key!").is_err());
        assert!(backend.sparse("").is_err());
    }

    #[test]
    fn invalid_table_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text("bad key!").is_err());
        assert!(backend.text("").is_err());
    }

    #[test]
    fn invalid_tokenizer_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text_with_tokenizer("tok_test", "bad tok").is_err());
        assert!(backend.text_with_tokenizer("tok_test", "").is_err());
        // Tokenizer names are interpolated into DDL; quoting must be refused.
        assert!(backend
            .text_with_tokenizer("tok_test", "trigram'); DROP TABLE x; --")
            .is_err());
    }

    #[test]
    fn text_with_alternate_tokenizer_creates_table() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text_with_tokenizer("latin_fts", "unicode61").is_ok());
    }

    #[test]
    fn identifier_error_message_names_argument() {
        let backend = StorageBackend::memory().unwrap();
        match backend.text("bad key!") {
            Err(crate::error::SqliteError::InvalidData(msg)) => assert_eq!(
                msg,
                "invalid table_key 'bad key!': must be non-empty and contain only \
                 alphanumeric/underscore characters"
            ),
            Err(_) => panic!("expected SqliteError::InvalidData"),
            Ok(_) => panic!("expected an error for an invalid table_key"),
        }
    }

    #[test]
    fn empty_namespace_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.entities_for_namespace("").is_err());
        assert!(backend.graph_for_namespace("   ").is_err());
        assert!(backend.notes_for_namespace("\t").is_err());
        assert!(backend.events_for_namespace("").is_err());
        assert!(backend
            .vectors_for_namespace("ns_check", "ns_check", 3, " ")
            .is_err());
        assert!(backend.sparse_for_namespace("ns_check", "").is_err());
    }

    #[test]
    fn apply_schema_runs_migrations_idempotently() {
        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
            id: "001_init",
            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
            down_sql: None,
            is_already_applied: None,
        }];
        let plan = crate::migrations::ServiceSchemaPlan {
            service: "schema_test_svc",
            sqlite: MIGRATIONS,
            postgres: &[],
        };

        let backend = StorageBackend::memory().unwrap();
        backend.apply_schema(&plan).unwrap();
        backend.apply_schema(&plan).unwrap();

        let reader = backend.pool().reader().unwrap();
        let count: i64 = reader
            .conn()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }
}