Skip to main content

khive_db/
backend.rs

//! Concrete storage backend providing capability traits.
//!
//! `StorageBackend` owns a `ConnectionPool` and provides factory methods for
//! all storage capability traits (`EntityStore`, `GraphStore`, `NoteStore`,
//! `EventStore`, `VectorStore`, `SparseStore`, `TextSearch`, `SqlAccess`).
//! Services obtain capability handles without depending on the pool directly.
//!
//! # Modes
//!
//! - **File-backed** (`StorageBackend::sqlite(path)`): Production use. Opens (or
//!   creates) the database at the given path. Readers get standalone connections
//!   for high concurrency.
//! - **In-memory** (`StorageBackend::memory()`): Testing use. A single shared
//!   connection through the pool. All data is lost when the backend is dropped.
//!
//! # Schema ownership
//!
//! `StorageBackend` creates a **bare** pool with no global schema. Each factory
//! method (`graph()`, `notes()`, etc.) applies only the DDL for its own tables.
//! Call `apply_schema()` to run service-specific migrations.
21
22use std::path::Path;
23use std::sync::Arc;
24
25use rusqlite::OptionalExtension;
26
27use crate::error::SqliteError;
28use crate::pool::{ConnectionPool, PoolConfig};
29use crate::sql_bridge::SqlBridge;
30use crate::stores::{entity, event, graph, note, sparse, text, vectors};
31
32/// Concrete storage backend providing capability traits.
33pub struct StorageBackend {
34    pool: Arc<ConnectionPool>,
35    is_file_backed: bool,
36}
37
38impl StorageBackend {
39    /// File-backed SQLite database.
40    ///
41    /// Opens (or creates) the database at `path`. The underlying pool provides
42    /// 1 writer + N readers in WAL mode for concurrent access.
43    /// No schema is applied — call `apply_schema()` for each service.
44    pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
45        crate::extension::ensure_extensions_loaded();
46        let config = PoolConfig {
47            path: Some(path.as_ref().to_path_buf()),
48            ..PoolConfig::default()
49        };
50        let pool = ConnectionPool::new(config)?;
51        Ok(Self {
52            pool: Arc::new(pool),
53            is_file_backed: true,
54        })
55    }
56
57    /// In-memory SQLite database (for tests).
58    ///
59    /// All data is lost when the backend is dropped. The pool degrades to
60    /// single-connection mode since in-memory databases cannot be shared
61    /// across multiple connections.
62    pub fn memory() -> Result<Self, SqliteError> {
63        crate::extension::ensure_extensions_loaded();
64        let config = PoolConfig {
65            path: None,
66            ..PoolConfig::default()
67        };
68        let pool = ConnectionPool::new(config)?;
69        Ok(Self {
70            pool: Arc::new(pool),
71            is_file_backed: false,
72        })
73    }
74
75    /// Get the SQL access capability.
76    ///
77    /// Returns an `Arc<dyn SqlAccess>` suitable for passing to services.
78    pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
79        Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
80    }
81
82    /// Apply a service's schema plan (run migrations).
83    ///
84    /// Each migration in the plan's `sqlite` list is applied idempotently.
85    /// Already-applied migrations are skipped. The `_schema_versions` table
86    /// tracks which migrations have been run.
87    pub fn apply_schema(
88        &self,
89        plan: &crate::migrations::ServiceSchemaPlan,
90    ) -> Result<(), SqliteError> {
91        let writer = self.pool.try_writer()?;
92        crate::migrations::apply_schema_plan(writer.conn(), plan)
93    }
94
95    /// Apply pack-auxiliary DDL statements (ADR-017 §Storage profile and
96    /// pack-auxiliary schema).
97    ///
98    /// Executes each DDL statement idempotently via `execute_batch`. Each
99    /// statement MUST be self-contained and use `CREATE TABLE IF NOT EXISTS`
100    /// (or equivalent idempotent DDL) so that calling this method more than
101    /// once does not fail.
102    ///
103    /// Pack auxiliary tables are NOT tracked in `_schema_versions` — they are
104    /// non-versioned in v1 (ADR-017). Use `apply_schema` with a
105    /// `ServiceSchemaPlan` when version tracking is needed.
106    ///
107    /// This method is lower-level than `PackRuntime::schema_plan()` — the
108    /// runtime bootstrap calls `pack.schema_plan().statements` and passes the
109    /// slice here. The `SchemaPlan` type lives in `khive-runtime` (above this
110    /// crate in the dep chain); this method accepts a plain `&[&'static str]`
111    /// to avoid a circular dependency.
112    pub fn apply_pack_ddl_statements(
113        &self,
114        statements: &[&'static str],
115    ) -> Result<(), SqliteError> {
116        let writer = self.pool.try_writer()?;
117        for &stmt in statements {
118            writer.conn().execute_batch(stmt)?;
119        }
120        Ok(())
121    }
122
123    /// Get an EntityStore. Applies the entities DDL if not already present.
124    ///
125    /// Idempotent — safe to call multiple times.
126    pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
127        self.entities_for_namespace("local")
128    }
129
130    /// Get an EntityStore. The namespace parameter is validated (non-empty) and
131    /// the entities schema is applied, but the store itself is unscoped — namespace
132    /// is the caller's responsibility on each query/delete call.
133    pub fn entities_for_namespace(
134        &self,
135        namespace: &str,
136    ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
137        if namespace.trim().is_empty() {
138            return Err(SqliteError::InvalidData(
139                "entities namespace must be non-empty".to_string(),
140            ));
141        }
142        let writer = self.pool.try_writer()?;
143        entity::ensure_entities_schema(writer.conn())?;
144
145        Ok(Arc::new(entity::SqlEntityStore::new(
146            Arc::clone(&self.pool),
147            self.is_file_backed,
148        )))
149    }
150
151    /// Get a GraphStore for the default namespace.
152    ///
153    /// Creates the `graph_edges` table (with indexes) if it does not already
154    /// exist. Idempotent — safe to call multiple times.
155    pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
156        self.graph_for_namespace("local")
157    }
158
159    /// Get a GraphStore scoped to a namespace.
160    pub fn graph_for_namespace(
161        &self,
162        namespace: &str,
163    ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
164        if namespace.trim().is_empty() {
165            return Err(SqliteError::InvalidData(
166                "graph namespace must be non-empty".to_string(),
167            ));
168        }
169        let writer = self.pool.try_writer()?;
170        graph::ensure_graph_schema(writer.conn())?;
171
172        Ok(Arc::new(graph::SqlGraphStore::new_scoped(
173            Arc::clone(&self.pool),
174            self.is_file_backed,
175            namespace.trim().to_string(),
176        )))
177    }
178
179    /// Get a NoteStore. Applies the notes DDL if not already present.
180    ///
181    /// Idempotent — safe to call multiple times.
182    pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
183        self.notes_for_namespace("local")
184    }
185
186    /// Get a NoteStore. The namespace parameter is validated (non-empty) and
187    /// the notes schema is applied, but the store itself is unscoped — namespace
188    /// is the caller's responsibility on each query/delete call.
189    pub fn notes_for_namespace(
190        &self,
191        namespace: &str,
192    ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
193        if namespace.trim().is_empty() {
194            return Err(SqliteError::InvalidData(
195                "notes namespace must be non-empty".to_string(),
196            ));
197        }
198        let writer = self.pool.try_writer()?;
199        note::ensure_notes_schema(writer.conn())?;
200
201        Ok(Arc::new(note::SqlNoteStore::new(
202            Arc::clone(&self.pool),
203            self.is_file_backed,
204        )))
205    }
206
207    /// Get an EventStore for the default namespace.
208    ///
209    /// Creates the `events` table (with indexes) if it does not already exist.
210    /// Idempotent — safe to call multiple times.
211    pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
212        self.events_for_namespace("local")
213    }
214
215    /// Get an EventStore scoped to a namespace.
216    pub fn events_for_namespace(
217        &self,
218        namespace: &str,
219    ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
220        if namespace.trim().is_empty() {
221            return Err(SqliteError::InvalidData(
222                "events namespace must be non-empty".to_string(),
223            ));
224        }
225        let writer = self.pool.try_writer()?;
226        event::ensure_events_schema(writer.conn())?;
227
228        Ok(Arc::new(event::SqlEventStore::new_scoped(
229            Arc::clone(&self.pool),
230            self.is_file_backed,
231            namespace.trim().to_string(),
232        )))
233    }
234
235    /// Get a VectorStore for a specific embedding model, scoped to the default namespace.
236    ///
237    /// Creates the vec0 virtual table if it does not already exist. The `model_key`
238    /// must contain only ASCII alphanumeric/underscore characters. The `embedding_model`
239    /// is the canonical display name stored in each vector row.
240    pub fn vectors(
241        &self,
242        model_key: &str,
243        embedding_model: &str,
244        dimensions: usize,
245    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
246        self.vectors_for_namespace(model_key, embedding_model, dimensions, "local")
247    }
248
249    /// Get a VectorStore for a specific embedding model with a default namespace.
250    ///
251    /// Creates the vec0 virtual table if it does not already exist. The `namespace`
252    /// is a default for trait methods that lack a per-call namespace parameter
253    /// (count, delete, info). Access control is enforced at the runtime layer.
254    ///
255    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
256    /// The `embedding_model` is the canonical display name stored in the `embedding_model`
257    /// column of each vector row (e.g. `"all-minilm-l6-v2"`).
258    pub fn vectors_for_namespace(
259        &self,
260        model_key: &str,
261        embedding_model: &str,
262        dimensions: usize,
263        namespace: &str,
264    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
265        if model_key.is_empty()
266            || !model_key
267                .chars()
268                .all(|c| c.is_ascii_alphanumeric() || c == '_')
269        {
270            return Err(SqliteError::InvalidData(format!(
271                "invalid model_key '{}': must be non-empty and contain only \
272                 alphanumeric/underscore characters",
273                model_key
274            )));
275        }
276        if namespace.trim().is_empty() {
277            return Err(SqliteError::InvalidData(
278                "vector store namespace must be non-empty".to_string(),
279            ));
280        }
281
282        // Ensure sqlite-vec is registered before creating vec0 tables.
283        crate::extension::ensure_extensions_loaded();
284
285        let table = format!("vec_{}", model_key);
286        let writer = self.pool.try_writer()?;
287
288        // Detect old-schema vec0 tables that predate the `field` column (ADR-044).
289        // vec0 virtual tables do not support ALTER TABLE, so we must drop and recreate
290        // the table if it exists without the `field` column. Vector data is a cache —
291        // callers can re-embed from the source record after the table is rebuilt.
292        // Use pragma_table_info to check columns directly; substring matching on the
293        // CREATE DDL is fragile (a model_key containing "field" would false-match).
294        let table_exists: bool = writer
295            .conn()
296            .query_row(
297                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
298                rusqlite::params![&table],
299                |row| row.get::<_, i64>(0),
300            )
301            .optional()
302            .map_err(SqliteError::Rusqlite)?
303            .is_some();
304
305        if table_exists {
306            // V17 migration (vector_embedding_model_tag_preserving_rebuild) adds
307            // `field` and `embedding_model` to all pre-existing vec0 tables at
308            // migration time.  If this table still lacks either column post-migration
309            // that indicates the database was not migrated — return a hard error
310            // rather than silently dropping data.
311            let pragma = format!("PRAGMA table_xinfo({})", table);
312            let mut stmt = writer.conn().prepare(&pragma)?;
313            let mut rows = stmt.query([])?;
314            let mut has_field = false;
315            let mut has_embedding_model = false;
316            while let Some(row) = rows.next()? {
317                let name: String = row.get(1)?;
318                if name == "field" {
319                    has_field = true;
320                }
321                if name == "embedding_model" {
322                    has_embedding_model = true;
323                }
324            }
325            if !has_field || !has_embedding_model {
326                return Err(SqliteError::InvalidData(format!(
327                    "vec0 table '{}' is missing required column(s) (field={}, \
328                     embedding_model={}); run `kkernel db migrate` to apply V17 \
329                     (vector_embedding_model_tag_preserving_rebuild) before opening \
330                     this vector store",
331                    table, has_field, has_embedding_model,
332                )));
333            }
334        }
335
336        // Ensure the _embedding_models registry table exists (ADR-043 §1).
337        // This is a no-op when the table already exists. Running it here ensures
338        // the registry is present for any caller that opens a vector store without
339        // first calling run_migrations() (e.g., tests that create stores directly).
340        // Production callers are expected to call run_migrations() at startup, which
341        // creates the registry via V14; this is a belt-and-suspenders fallback.
342        // Schema is defined in `migrations::EMBEDDING_MODELS_DDL` (single source of
343        // truth) to prevent the two copies from silently drifting.
344        writer
345            .conn()
346            .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
347
348        // Create the vec0 virtual table. Idempotent on fresh databases and after the
349        // old-schema rebuild above.
350        let ddl = format!(
351            "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
352             subject_id TEXT PRIMARY KEY, \
353             namespace TEXT NOT NULL, \
354             kind TEXT NOT NULL, \
355             field TEXT NOT NULL, \
356             embedding_model TEXT NOT NULL, \
357             embedding float[{}] distance_metric=cosine\
358             )",
359            model_key, dimensions
360        );
361        writer.conn().execute_batch(&ddl)?;
362
363        Ok(Arc::new(vectors::SqliteVecStore::new(
364            Arc::clone(&self.pool),
365            self.is_file_backed,
366            model_key.to_string(),
367            embedding_model.to_string(),
368            dimensions,
369            namespace.trim().to_string(),
370        )?))
371    }
372
373    /// Register an embedding model in the `_embedding_models` registry table (ADR-043).
374    ///
375    /// Idempotent: if a row with the same `canonical_key` already exists, updates its
376    /// status back to `'active'` without changing other fields.
377    pub fn register_embedding_model(
378        &self,
379        engine_name: &str,
380        model_id: &str,
381        key_version: &str,
382        dimensions: u32,
383    ) -> Result<(), SqliteError> {
384        let writer = self.pool.try_writer()?;
385        writer
386            .conn()
387            .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
388
389        let now = chrono::Utc::now().timestamp_micros();
390        let canonical_key =
391            format!("{engine_name}:{model_id}:{key_version}:{dimensions}").into_bytes();
392        let id = uuid::Uuid::new_v4();
393        writer.conn().execute(
394            "INSERT INTO _embedding_models \
395             (id, engine_name, model_id, key_version, dim, output_dim, status, \
396              activated_at, superseded_at, superseded_by, canonical_key, created_at) \
397             VALUES (?1, ?2, ?3, ?4, ?5, NULL, 'active', ?6, NULL, NULL, ?7, ?8) \
398             ON CONFLICT(canonical_key) DO UPDATE SET \
399                status = 'active', \
400                activated_at = COALESCE(_embedding_models.activated_at, excluded.activated_at)",
401            rusqlite::params![
402                id.as_bytes().as_slice(),
403                engine_name,
404                model_id,
405                key_version,
406                dimensions as i64,
407                now,
408                canonical_key,
409                now,
410            ],
411        )?;
412        Ok(())
413    }
414
415    /// Get a SparseStore for a specific model key, scoped to the default namespace.
416    ///
417    /// Creates the sparse table if it does not already exist.
418    pub fn sparse(
419        &self,
420        model_key: &str,
421    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
422        self.sparse_for_namespace(model_key, "local")
423    }
424
425    /// Get a SparseStore for a specific model key with an explicit default namespace.
426    ///
427    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
428    pub fn sparse_for_namespace(
429        &self,
430        model_key: &str,
431        namespace: &str,
432    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
433        if model_key.is_empty()
434            || !model_key
435                .chars()
436                .all(|c| c.is_ascii_alphanumeric() || c == '_')
437        {
438            return Err(SqliteError::InvalidData(format!(
439                "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
440                model_key
441            )));
442        }
443        if namespace.trim().is_empty() {
444            return Err(SqliteError::InvalidData(
445                "sparse store namespace must be non-empty".to_string(),
446            ));
447        }
448
449        let writer = self.pool.try_writer()?;
450        sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
451
452        Ok(Arc::new(sparse::SqliteSparseStore::new(
453            Arc::clone(&self.pool),
454            self.is_file_backed,
455            model_key.to_string(),
456            namespace.trim().to_string(),
457        )?))
458    }
459
460    /// Get a TextSearch for a specific table key.
461    ///
462    /// Creates the FTS5 virtual table if it does not already exist. Uses the
463    /// `trigram` tokenizer by default (CJK-safe, ADR-013).
464    ///
465    /// The `table_key` must contain only ASCII alphanumeric/underscore characters.
466    pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
467        self.text_with_tokenizer(table_key, "trigram")
468    }
469
470    /// Get a TextSearch with an explicit FTS5 tokenizer.
471    ///
472    /// Use when you need a tokenizer other than the default `trigram` — for
473    /// example `unicode61` for Latin-only corpora.
474    ///
475    /// Both `table_key` and `tokenizer` must contain only ASCII
476    /// alphanumeric/underscore characters.
477    pub fn text_with_tokenizer(
478        &self,
479        table_key: &str,
480        tokenizer: &str,
481    ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
482        if table_key.is_empty()
483            || !table_key
484                .chars()
485                .all(|c| c.is_ascii_alphanumeric() || c == '_')
486        {
487            return Err(SqliteError::InvalidData(format!(
488                "invalid table_key '{}': must be non-empty and contain only \
489                 alphanumeric/underscore characters",
490                table_key
491            )));
492        }
493        if tokenizer.is_empty()
494            || !tokenizer
495                .chars()
496                .all(|c| c.is_ascii_alphanumeric() || c == '_')
497        {
498            return Err(SqliteError::InvalidData(format!(
499                "invalid tokenizer '{}': must be non-empty and contain only \
500                 alphanumeric/underscore characters",
501                tokenizer
502            )));
503        }
504
505        let ddl = format!(
506            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
507             subject_id UNINDEXED, \
508             kind UNINDEXED, \
509             title, \
510             body, \
511             tags UNINDEXED, \
512             namespace UNINDEXED, \
513             metadata UNINDEXED, \
514             updated_at UNINDEXED, \
515             tokenize = '{}'\
516             )",
517            table_key, tokenizer
518        );
519        let writer = self.pool.try_writer()?;
520        writer.conn().execute_batch(&ddl)?;
521
522        Ok(Arc::new(text::Fts5TextSearch::new(
523            Arc::clone(&self.pool),
524            self.is_file_backed,
525            table_key.to_string(),
526        )))
527    }
528
529    /// Is this a file-backed backend?
530    pub fn is_file_backed(&self) -> bool {
531        self.is_file_backed
532    }
533
534    /// Access the underlying pool (escape hatch).
535    pub fn pool(&self) -> &ConnectionPool {
536        &self.pool
537    }
538
539    /// Clone the underlying pool Arc.
540    pub fn pool_arc(&self) -> Arc<ConnectionPool> {
541        Arc::clone(&self.pool)
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548    use khive_storage::types::{SqlStatement, SqlValue};
549
550    #[test]
551    fn memory_backend_creates_successfully() {
552        let backend = StorageBackend::memory().expect("memory backend should create");
553        assert!(!backend.is_file_backed());
554    }
555
556    #[test]
557    fn file_backend_creates_successfully() {
558        let dir = tempfile::tempdir().unwrap();
559        let path = dir.path().join("test.db");
560        let backend = StorageBackend::sqlite(&path).expect("file backend should create");
561        assert!(backend.is_file_backed());
562        assert!(path.exists());
563    }
564
565    #[tokio::test]
566    async fn sql_access_memory_roundtrip() {
567        let backend = StorageBackend::memory().unwrap();
568        let sql = backend.sql();
569
570        let mut writer = sql.writer().await.unwrap();
571        writer
572            .execute_script(
573                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
574            )
575            .await
576            .unwrap();
577
578        let affected = writer
579            .execute(SqlStatement {
580                sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
581                params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
582                label: None,
583            })
584            .await
585            .unwrap();
586        assert_eq!(affected, 1);
587
588        let mut reader = sql.reader().await.unwrap();
589        let row = reader
590            .query_row(SqlStatement {
591                sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
592                params: vec![SqlValue::Text("row1".into())],
593                label: None,
594            })
595            .await
596            .unwrap();
597
598        let row = row.expect("should find the inserted row");
599        assert_eq!(row.columns.len(), 2);
600        match &row.columns[0].value {
601            SqlValue::Text(s) => assert_eq!(s, "row1"),
602            other => panic!("expected Text, got {other:?}"),
603        }
604        match &row.columns[1].value {
605            SqlValue::Integer(v) => assert_eq!(*v, 42),
606            other => panic!("expected Integer, got {other:?}"),
607        }
608    }
609
610    #[tokio::test]
611    async fn sql_access_file_roundtrip() {
612        let dir = tempfile::tempdir().unwrap();
613        let path = dir.path().join("test_roundtrip.db");
614        let backend = StorageBackend::sqlite(&path).unwrap();
615        let sql = backend.sql();
616
617        let mut writer = sql.writer().await.unwrap();
618        writer
619            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
620            .await
621            .unwrap();
622        writer
623            .execute(SqlStatement {
624                sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
625                params: vec![
626                    SqlValue::Text("hello".into()),
627                    SqlValue::Text("world".into()),
628                ],
629                label: None,
630            })
631            .await
632            .unwrap();
633
634        let mut reader = sql.reader().await.unwrap();
635        let rows = reader
636            .query_all(SqlStatement {
637                sql: "SELECT k, v FROM test_f".into(),
638                params: vec![],
639                label: None,
640            })
641            .await
642            .unwrap();
643        assert_eq!(rows.len(), 1);
644        match &rows[0].columns[1].value {
645            SqlValue::Text(s) => assert_eq!(s, "world"),
646            other => panic!("expected Text, got {other:?}"),
647        }
648    }
649
650    #[tokio::test]
651    #[cfg(feature = "vectors")]
652    async fn vectors_roundtrip_via_public_api() {
653        let backend = StorageBackend::memory().unwrap();
654        let store = backend.vectors("test_api", "test_api", 3).unwrap();
655
656        let id = uuid::Uuid::new_v4();
657        store
658            .insert(
659                id,
660                khive_types::SubstrateKind::Entity,
661                "local",
662                "content",
663                vec![vec![1.0, 0.0, 0.0]],
664            )
665            .await
666            .unwrap();
667
668        let hits = store
669            .search(khive_storage::types::VectorSearchRequest {
670                query_vectors: vec![vec![1.0, 0.0, 0.0]],
671                top_k: 1,
672                namespace: None,
673                kind: None,
674                embedding_model: None,
675                filter: None,
676                backend_hints: None,
677            })
678            .await
679            .unwrap();
680
681        assert_eq!(hits.len(), 1);
682        assert_eq!(hits[0].subject_id, id);
683        assert!(hits[0].score.to_f64() > 0.99);
684    }
685
686    #[tokio::test]
687    #[cfg(feature = "vectors")]
688    async fn vectors_creates_table_idempotently() {
689        let backend = StorageBackend::memory().unwrap();
690
691        let store1 = backend.vectors("idempotent", "idempotent", 3).unwrap();
692        let store2 = backend.vectors("idempotent", "idempotent", 3).unwrap();
693
694        let id = uuid::Uuid::new_v4();
695        store1
696            .insert(
697                id,
698                khive_types::SubstrateKind::Entity,
699                "local",
700                "content",
701                vec![vec![1.0, 0.0, 0.0]],
702            )
703            .await
704            .unwrap();
705
706        let count = store2.count().await.unwrap();
707        assert_eq!(count, 1);
708    }
709
710    #[tokio::test]
711    async fn text_roundtrip_via_public_api() {
712        let backend = StorageBackend::memory().unwrap();
713        let store = backend.text("test_api").unwrap();
714
715        let id = uuid::Uuid::new_v4();
716        let doc = khive_storage::types::TextDocument {
717            subject_id: id,
718            kind: khive_types::SubstrateKind::Entity,
719            title: Some("Test Title".to_string()),
720            body: "This is a searchable document about Rust.".to_string(),
721            tags: vec!["rust".to_string()],
722            namespace: "test_ns".to_string(),
723            metadata: None,
724            updated_at: chrono::Utc::now(),
725        };
726        store.upsert_document(doc).await.unwrap();
727
728        let hits = store
729            .search(khive_storage::types::TextSearchRequest {
730                query: "Rust".to_string(),
731                mode: khive_storage::types::TextQueryMode::Plain,
732                filter: Some(khive_storage::types::TextFilter {
733                    namespaces: vec!["test_ns".to_string()],
734                    ..Default::default()
735                }),
736                top_k: 1,
737                snippet_chars: 64,
738            })
739            .await
740            .unwrap();
741
742        assert_eq!(hits.len(), 1);
743        assert_eq!(hits[0].subject_id, id);
744        assert!(hits[0].score.to_f64() > 0.0);
745    }
746
747    #[tokio::test]
748    async fn text_creates_table_idempotently() {
749        let backend = StorageBackend::memory().unwrap();
750
751        let store1 = backend.text("idempotent_fts").unwrap();
752        let store2 = backend.text("idempotent_fts").unwrap();
753
754        let id = uuid::Uuid::new_v4();
755        let doc = khive_storage::types::TextDocument {
756            subject_id: id,
757            kind: khive_types::SubstrateKind::Note,
758            title: None,
759            body: "Hello world.".to_string(),
760            tags: vec![],
761            namespace: "test_ns".to_string(),
762            metadata: None,
763            updated_at: chrono::Utc::now(),
764        };
765        store1.upsert_document(doc).await.unwrap();
766
767        let count = store2
768            .count(khive_storage::types::TextFilter {
769                namespaces: vec!["test_ns".to_string()],
770                ..Default::default()
771            })
772            .await
773            .unwrap();
774        assert_eq!(count, 1);
775    }
776
777    #[test]
778    fn invalid_model_key_rejected() {
779        let backend = StorageBackend::memory().unwrap();
780        assert!(backend.vectors("bad key!", "bad key!", 3).is_err());
781        assert!(backend.vectors("", "", 3).is_err());
782    }
783
784    #[test]
785    fn invalid_table_key_rejected() {
786        let backend = StorageBackend::memory().unwrap();
787        assert!(backend.text("bad key!").is_err());
788        assert!(backend.text("").is_err());
789    }
790
791    #[test]
792    fn apply_schema_runs_migrations_idempotently() {
793        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
794            id: "001_init",
795            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
796            down_sql: None,
797            is_already_applied: None,
798        }];
799        let plan = crate::migrations::ServiceSchemaPlan {
800            service: "schema_test_svc",
801            sqlite: MIGRATIONS,
802            postgres: &[],
803        };
804
805        let backend = StorageBackend::memory().unwrap();
806        backend.apply_schema(&plan).unwrap();
807        backend.apply_schema(&plan).unwrap();
808
809        let reader = backend.pool().reader().unwrap();
810        let count: i64 = reader
811            .conn()
812            .query_row(
813                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
814                [],
815                |row| row.get(0),
816            )
817            .unwrap();
818        assert_eq!(count, 1);
819    }
820}