Skip to main content

khive_db/
backend.rs

1//! Concrete storage backend providing capability traits.
2//!
//! `StorageBackend` owns a `ConnectionPool` and provides factory methods for all
//! capability traits (`EntityStore`, `GraphStore`, `NoteStore`, `EventStore`,
//! `VectorStore`, `SparseStore`, `TextSearch`, `SqlAccess`). File-backed for
//! production; in-memory for tests.
6
7use std::path::Path;
8use std::sync::Arc;
9
10use rusqlite::OptionalExtension;
11
12use crate::error::SqliteError;
13use crate::pool::{ConnectionPool, PoolConfig};
14use crate::sql_bridge::SqlBridge;
15use crate::stores::{entity, event, graph, note, sparse, text, vectors};
16
/// Concrete storage backend providing capability traits.
///
/// Holds the shared connection pool and remembers whether it was opened
/// against a file or in memory; both values are handed to every store the
/// factory methods construct.
pub struct StorageBackend {
    /// Shared pool; cloned (by `Arc`) into each store created by this backend.
    pool: Arc<ConnectionPool>,
    /// `true` when constructed via `sqlite()`, `false` when via `memory()`.
    is_file_backed: bool,
}
22
23impl StorageBackend {
24    /// File-backed SQLite database.
25    ///
26    /// Opens (or creates) the database at `path`. The underlying pool provides
27    /// 1 writer + N readers in WAL mode for concurrent access.
28    /// No schema is applied — call `apply_schema()` for each service.
29    pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
30        crate::extension::ensure_extensions_loaded();
31        let config = PoolConfig {
32            path: Some(path.as_ref().to_path_buf()),
33            ..PoolConfig::default()
34        };
35        let pool = ConnectionPool::new(config)?;
36        Ok(Self {
37            pool: Arc::new(pool),
38            is_file_backed: true,
39        })
40    }
41
42    /// In-memory SQLite database (for tests).
43    ///
44    /// All data is lost when the backend is dropped. The pool degrades to
45    /// single-connection mode since in-memory databases cannot be shared
46    /// across multiple connections.
47    pub fn memory() -> Result<Self, SqliteError> {
48        crate::extension::ensure_extensions_loaded();
49        let config = PoolConfig {
50            path: None,
51            ..PoolConfig::default()
52        };
53        let pool = ConnectionPool::new(config)?;
54        Ok(Self {
55            pool: Arc::new(pool),
56            is_file_backed: false,
57        })
58    }
59
60    /// Get the SQL access capability.
61    ///
62    /// Returns an `Arc<dyn SqlAccess>` suitable for passing to services.
63    pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
64        Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
65    }
66
67    /// Apply a service's schema plan (run migrations).
68    ///
69    /// Each migration in the plan's `sqlite` list is applied idempotently.
70    /// Already-applied migrations are skipped. The `_schema_versions` table
71    /// tracks which migrations have been run.
72    pub fn apply_schema(
73        &self,
74        plan: &crate::migrations::ServiceSchemaPlan,
75    ) -> Result<(), SqliteError> {
76        let writer = self.pool.try_writer()?;
77        crate::migrations::apply_schema_plan(writer.conn(), plan)
78    }
79
80    /// Apply pack-auxiliary DDL statements.
81    ///
82    /// Executes each DDL statement idempotently via `execute_batch`. Each
83    /// statement MUST be self-contained and use `CREATE TABLE IF NOT EXISTS`
84    /// (or equivalent idempotent DDL) so that calling this method more than
85    /// once does not fail.
86    ///
87    /// Pack auxiliary tables are NOT tracked in `_schema_versions` — they are
88    /// non-versioned. Use `apply_schema` with a `ServiceSchemaPlan` when version
89    /// tracking is needed.
90    ///
91    /// This method is lower-level than `PackRuntime::schema_plan()` — the
92    /// runtime bootstrap calls `pack.schema_plan().statements` and passes the
93    /// slice here. The `SchemaPlan` type lives in `khive-runtime` (above this
94    /// crate in the dep chain); this method accepts a plain `&[&'static str]`
95    /// to avoid a circular dependency.
96    pub fn apply_pack_ddl_statements(
97        &self,
98        statements: &[&'static str],
99    ) -> Result<(), SqliteError> {
100        let writer = self.pool.try_writer()?;
101        for &stmt in statements {
102            writer.conn().execute_batch(stmt)?;
103        }
104        Ok(())
105    }
106
107    /// Get an EntityStore. Applies the entities DDL if not already present.
108    ///
109    /// Idempotent — safe to call multiple times.
110    pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
111        self.entities_for_namespace("local")
112    }
113
114    /// Get an EntityStore. The namespace parameter is validated (non-empty) and
115    /// the entities schema is applied, but the store itself is unscoped — namespace
116    /// is the caller's responsibility on each query/delete call.
117    pub fn entities_for_namespace(
118        &self,
119        namespace: &str,
120    ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
121        if namespace.trim().is_empty() {
122            return Err(SqliteError::InvalidData(
123                "entities namespace must be non-empty".to_string(),
124            ));
125        }
126        let writer = self.pool.try_writer()?;
127        entity::ensure_entities_schema(writer.conn())?;
128
129        Ok(Arc::new(entity::SqlEntityStore::new(
130            Arc::clone(&self.pool),
131            self.is_file_backed,
132        )))
133    }
134
135    /// Get a GraphStore for the default namespace.
136    ///
137    /// Creates the `graph_edges` table (with indexes) if it does not already
138    /// exist. Idempotent — safe to call multiple times.
139    pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
140        self.graph_for_namespace("local")
141    }
142
143    /// Get a GraphStore scoped to a namespace.
144    pub fn graph_for_namespace(
145        &self,
146        namespace: &str,
147    ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
148        if namespace.trim().is_empty() {
149            return Err(SqliteError::InvalidData(
150                "graph namespace must be non-empty".to_string(),
151            ));
152        }
153        let writer = self.pool.try_writer()?;
154        graph::ensure_graph_schema(writer.conn())?;
155
156        Ok(Arc::new(graph::SqlGraphStore::new_scoped(
157            Arc::clone(&self.pool),
158            self.is_file_backed,
159            namespace.trim().to_string(),
160        )))
161    }
162
163    /// Get a NoteStore. Applies the notes DDL if not already present.
164    ///
165    /// Idempotent — safe to call multiple times.
166    pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
167        self.notes_for_namespace("local")
168    }
169
170    /// Get a NoteStore. The namespace parameter is validated (non-empty) and
171    /// the notes schema is applied, but the store itself is unscoped — namespace
172    /// is the caller's responsibility on each query/delete call.
173    pub fn notes_for_namespace(
174        &self,
175        namespace: &str,
176    ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
177        if namespace.trim().is_empty() {
178            return Err(SqliteError::InvalidData(
179                "notes namespace must be non-empty".to_string(),
180            ));
181        }
182        let writer = self.pool.try_writer()?;
183        note::ensure_notes_schema(writer.conn())?;
184
185        Ok(Arc::new(note::SqlNoteStore::new(
186            Arc::clone(&self.pool),
187            self.is_file_backed,
188        )))
189    }
190
191    /// Get an EventStore for the default namespace.
192    ///
193    /// Creates the `events` table (with indexes) if it does not already exist.
194    /// Idempotent — safe to call multiple times.
195    pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
196        self.events_for_namespace("local")
197    }
198
199    /// Get an EventStore scoped to a namespace.
200    pub fn events_for_namespace(
201        &self,
202        namespace: &str,
203    ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
204        if namespace.trim().is_empty() {
205            return Err(SqliteError::InvalidData(
206                "events namespace must be non-empty".to_string(),
207            ));
208        }
209        let writer = self.pool.try_writer()?;
210        event::ensure_events_schema(writer.conn())?;
211
212        Ok(Arc::new(event::SqlEventStore::new_scoped(
213            Arc::clone(&self.pool),
214            self.is_file_backed,
215            namespace.trim().to_string(),
216        )))
217    }
218
219    /// Get a VectorStore for a specific embedding model, scoped to the default namespace.
220    ///
221    /// Creates the vec0 virtual table if it does not already exist. The `model_key`
222    /// must contain only ASCII alphanumeric/underscore characters. The `embedding_model`
223    /// is the canonical display name stored in each vector row.
224    pub fn vectors(
225        &self,
226        model_key: &str,
227        embedding_model: &str,
228        dimensions: usize,
229    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
230        self.vectors_for_namespace(model_key, embedding_model, dimensions, "local")
231    }
232
233    /// Get a VectorStore for a specific embedding model with a default namespace.
234    ///
235    /// Creates the vec0 virtual table if it does not already exist. The `namespace`
236    /// is a default for trait methods that lack a per-call namespace parameter
237    /// (count, delete, info). Access control is enforced at the runtime layer.
238    ///
239    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
240    /// The `embedding_model` is the canonical display name stored in the `embedding_model`
241    /// column of each vector row (e.g. `"all-minilm-l6-v2"`).
242    pub fn vectors_for_namespace(
243        &self,
244        model_key: &str,
245        embedding_model: &str,
246        dimensions: usize,
247        namespace: &str,
248    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
249        if model_key.is_empty()
250            || !model_key
251                .chars()
252                .all(|c| c.is_ascii_alphanumeric() || c == '_')
253        {
254            return Err(SqliteError::InvalidData(format!(
255                "invalid model_key '{}': must be non-empty and contain only \
256                 alphanumeric/underscore characters",
257                model_key
258            )));
259        }
260        if namespace.trim().is_empty() {
261            return Err(SqliteError::InvalidData(
262                "vector store namespace must be non-empty".to_string(),
263            ));
264        }
265
266        // Ensure sqlite-vec is registered before creating vec0 tables.
267        crate::extension::ensure_extensions_loaded();
268
269        let table = format!("vec_{}", model_key);
270        let writer = self.pool.try_writer()?;
271
272        // Detect old-schema vec0 tables that predate the `field` column.
273        // vec0 virtual tables do not support ALTER TABLE, so we must drop and recreate
274        // the table if it exists without the `field` column. Vector data is a cache —
275        // callers can re-embed from the source record after the table is rebuilt.
276        // Use pragma_table_info to check columns directly; substring matching on the
277        // CREATE DDL is fragile (a model_key containing "field" would false-match).
278        let table_exists: bool = writer
279            .conn()
280            .query_row(
281                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
282                rusqlite::params![&table],
283                |row| row.get::<_, i64>(0),
284            )
285            .optional()
286            .map_err(SqliteError::Rusqlite)?
287            .is_some();
288
289        if table_exists {
290            // V17 migration (vector_embedding_model_tag_preserving_rebuild) adds
291            // `field` and `embedding_model` to all pre-existing vec0 tables at
292            // migration time.  If this table still lacks either column post-migration
293            // that indicates the database was not migrated — return a hard error
294            // rather than silently dropping data.
295            let pragma = format!("PRAGMA table_xinfo({})", table);
296            let mut stmt = writer.conn().prepare(&pragma)?;
297            let mut rows = stmt.query([])?;
298            let mut has_field = false;
299            let mut has_embedding_model = false;
300            while let Some(row) = rows.next()? {
301                let name: String = row.get(1)?;
302                if name == "field" {
303                    has_field = true;
304                }
305                if name == "embedding_model" {
306                    has_embedding_model = true;
307                }
308            }
309            if !has_field || !has_embedding_model {
310                return Err(SqliteError::InvalidData(format!(
311                    "vec0 table '{}' is missing required column(s) (field={}, \
312                     embedding_model={}); this is a pre-v0.2.8 vector schema and is \
313                     not supported — recreate the database",
314                    table, has_field, has_embedding_model,
315                )));
316            }
317        }
318
319        // Ensure the _embedding_models registry table exists.
320        // This is a no-op when the table already exists. Running it here ensures
321        // the registry is present for any caller that opens a vector store without
322        // first calling run_migrations() (e.g., tests that create stores directly).
323        // Production callers are expected to call run_migrations() at startup, which
324        // creates the registry via V14; this is a belt-and-suspenders fallback.
325        // Schema is defined in `migrations::EMBEDDING_MODELS_DDL` (single source of
326        // truth) to prevent the two copies from silently drifting.
327        writer
328            .conn()
329            .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
330
331        // Create the vec0 virtual table. Idempotent on fresh databases and after the
332        // old-schema rebuild above.
333        let ddl = format!(
334            "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
335             subject_id TEXT PRIMARY KEY, \
336             namespace TEXT NOT NULL, \
337             kind TEXT NOT NULL, \
338             field TEXT NOT NULL, \
339             embedding_model TEXT NOT NULL, \
340             embedding float[{}] distance_metric=cosine\
341             )",
342            model_key, dimensions
343        );
344        writer.conn().execute_batch(&ddl)?;
345
346        Ok(Arc::new(vectors::SqliteVecStore::new(
347            Arc::clone(&self.pool),
348            self.is_file_backed,
349            model_key.to_string(),
350            embedding_model.to_string(),
351            dimensions,
352            namespace.trim().to_string(),
353        )?))
354    }
355
356    /// Register an embedding model in the `_embedding_models` registry table.
357    ///
358    /// Idempotent: if a row with the same `canonical_key` already exists, updates its
359    /// status back to `'active'` without changing other fields.
360    pub fn register_embedding_model(
361        &self,
362        engine_name: &str,
363        model_id: &str,
364        key_version: &str,
365        dimensions: u32,
366    ) -> Result<(), SqliteError> {
367        let writer = self.pool.try_writer()?;
368        writer
369            .conn()
370            .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
371
372        let now = chrono::Utc::now().timestamp_micros();
373        let canonical_key =
374            format!("{engine_name}:{model_id}:{key_version}:{dimensions}").into_bytes();
375        let id = uuid::Uuid::new_v4();
376        writer.conn().execute(
377            "INSERT INTO _embedding_models \
378             (id, engine_name, model_id, key_version, dim, output_dim, status, \
379              activated_at, superseded_at, superseded_by, canonical_key, created_at) \
380             VALUES (?1, ?2, ?3, ?4, ?5, NULL, 'active', ?6, NULL, NULL, ?7, ?8) \
381             ON CONFLICT(canonical_key) DO UPDATE SET \
382                status = 'active', \
383                activated_at = COALESCE(_embedding_models.activated_at, excluded.activated_at)",
384            rusqlite::params![
385                id.as_bytes().as_slice(),
386                engine_name,
387                model_id,
388                key_version,
389                dimensions as i64,
390                now,
391                canonical_key,
392                now,
393            ],
394        )?;
395        Ok(())
396    }
397
398    /// Get a SparseStore for a specific model key, scoped to the default namespace.
399    ///
400    /// Creates the sparse table if it does not already exist.
401    pub fn sparse(
402        &self,
403        model_key: &str,
404    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
405        self.sparse_for_namespace(model_key, "local")
406    }
407
408    /// Get a SparseStore for a specific model key with an explicit default namespace.
409    ///
410    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
411    pub fn sparse_for_namespace(
412        &self,
413        model_key: &str,
414        namespace: &str,
415    ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
416        if model_key.is_empty()
417            || !model_key
418                .chars()
419                .all(|c| c.is_ascii_alphanumeric() || c == '_')
420        {
421            return Err(SqliteError::InvalidData(format!(
422                "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
423                model_key
424            )));
425        }
426        if namespace.trim().is_empty() {
427            return Err(SqliteError::InvalidData(
428                "sparse store namespace must be non-empty".to_string(),
429            ));
430        }
431
432        let writer = self.pool.try_writer()?;
433        sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
434
435        Ok(Arc::new(sparse::SqliteSparseStore::new(
436            Arc::clone(&self.pool),
437            self.is_file_backed,
438            model_key.to_string(),
439            namespace.trim().to_string(),
440        )?))
441    }
442
443    /// Get a TextSearch for a specific table key.
444    ///
445    /// Creates the FTS5 virtual table if it does not already exist. Uses the
446    /// `trigram` tokenizer by default (CJK-safe).
447    ///
448    /// The `table_key` must contain only ASCII alphanumeric/underscore characters.
449    pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
450        self.text_with_tokenizer(table_key, "trigram")
451    }
452
453    /// Get a TextSearch with an explicit FTS5 tokenizer.
454    ///
455    /// Use when you need a tokenizer other than the default `trigram` — for
456    /// example `unicode61` for Latin-only corpora.
457    ///
458    /// Both `table_key` and `tokenizer` must contain only ASCII
459    /// alphanumeric/underscore characters.
460    pub fn text_with_tokenizer(
461        &self,
462        table_key: &str,
463        tokenizer: &str,
464    ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
465        if table_key.is_empty()
466            || !table_key
467                .chars()
468                .all(|c| c.is_ascii_alphanumeric() || c == '_')
469        {
470            return Err(SqliteError::InvalidData(format!(
471                "invalid table_key '{}': must be non-empty and contain only \
472                 alphanumeric/underscore characters",
473                table_key
474            )));
475        }
476        if tokenizer.is_empty()
477            || !tokenizer
478                .chars()
479                .all(|c| c.is_ascii_alphanumeric() || c == '_')
480        {
481            return Err(SqliteError::InvalidData(format!(
482                "invalid tokenizer '{}': must be non-empty and contain only \
483                 alphanumeric/underscore characters",
484                tokenizer
485            )));
486        }
487
488        let ddl = format!(
489            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
490             subject_id UNINDEXED, \
491             kind UNINDEXED, \
492             title, \
493             body, \
494             tags UNINDEXED, \
495             namespace UNINDEXED, \
496             metadata UNINDEXED, \
497             updated_at UNINDEXED, \
498             tokenize = '{}'\
499             )",
500            table_key, tokenizer
501        );
502        let writer = self.pool.try_writer()?;
503        writer.conn().execute_batch(&ddl)?;
504
505        Ok(Arc::new(text::Fts5TextSearch::new(
506            Arc::clone(&self.pool),
507            self.is_file_backed,
508            table_key.to_string(),
509        )))
510    }
511
    /// Is this a file-backed backend?
    ///
    /// Returns `true` for backends created with `sqlite()` and `false` for
    /// those created with `memory()`.
    pub fn is_file_backed(&self) -> bool {
        self.is_file_backed
    }
516
517    /// Access the underlying pool (escape hatch).
518    pub fn pool(&self) -> &ConnectionPool {
519        &self.pool
520    }
521
522    /// Clone the underlying pool Arc.
523    pub fn pool_arc(&self) -> Arc<ConnectionPool> {
524        Arc::clone(&self.pool)
525    }
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531    use khive_storage::types::{SqlStatement, SqlValue};
532
533    #[test]
534    fn memory_backend_creates_successfully() {
535        let backend = StorageBackend::memory().expect("memory backend should create");
536        assert!(!backend.is_file_backed());
537    }
538
539    #[test]
540    fn file_backend_creates_successfully() {
541        let dir = tempfile::tempdir().unwrap();
542        let path = dir.path().join("test.db");
543        let backend = StorageBackend::sqlite(&path).expect("file backend should create");
544        assert!(backend.is_file_backed());
545        assert!(path.exists());
546    }
547
548    #[tokio::test]
549    async fn sql_access_memory_roundtrip() {
550        let backend = StorageBackend::memory().unwrap();
551        let sql = backend.sql();
552
553        let mut writer = sql.writer().await.unwrap();
554        writer
555            .execute_script(
556                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
557            )
558            .await
559            .unwrap();
560
561        let affected = writer
562            .execute(SqlStatement {
563                sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
564                params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
565                label: None,
566            })
567            .await
568            .unwrap();
569        assert_eq!(affected, 1);
570
571        let mut reader = sql.reader().await.unwrap();
572        let row = reader
573            .query_row(SqlStatement {
574                sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
575                params: vec![SqlValue::Text("row1".into())],
576                label: None,
577            })
578            .await
579            .unwrap();
580
581        let row = row.expect("should find the inserted row");
582        assert_eq!(row.columns.len(), 2);
583        match &row.columns[0].value {
584            SqlValue::Text(s) => assert_eq!(s, "row1"),
585            other => panic!("expected Text, got {other:?}"),
586        }
587        match &row.columns[1].value {
588            SqlValue::Integer(v) => assert_eq!(*v, 42),
589            other => panic!("expected Integer, got {other:?}"),
590        }
591    }
592
593    #[tokio::test]
594    async fn sql_access_file_roundtrip() {
595        let dir = tempfile::tempdir().unwrap();
596        let path = dir.path().join("test_roundtrip.db");
597        let backend = StorageBackend::sqlite(&path).unwrap();
598        let sql = backend.sql();
599
600        let mut writer = sql.writer().await.unwrap();
601        writer
602            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
603            .await
604            .unwrap();
605        writer
606            .execute(SqlStatement {
607                sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
608                params: vec![
609                    SqlValue::Text("hello".into()),
610                    SqlValue::Text("world".into()),
611                ],
612                label: None,
613            })
614            .await
615            .unwrap();
616
617        let mut reader = sql.reader().await.unwrap();
618        let rows = reader
619            .query_all(SqlStatement {
620                sql: "SELECT k, v FROM test_f".into(),
621                params: vec![],
622                label: None,
623            })
624            .await
625            .unwrap();
626        assert_eq!(rows.len(), 1);
627        match &rows[0].columns[1].value {
628            SqlValue::Text(s) => assert_eq!(s, "world"),
629            other => panic!("expected Text, got {other:?}"),
630        }
631    }
632
633    #[tokio::test]
634    #[cfg(feature = "vectors")]
635    async fn vectors_roundtrip_via_public_api() {
636        let backend = StorageBackend::memory().unwrap();
637        let store = backend.vectors("test_api", "test_api", 3).unwrap();
638
639        let id = uuid::Uuid::new_v4();
640        store
641            .insert(
642                id,
643                khive_types::SubstrateKind::Entity,
644                "local",
645                "content",
646                vec![vec![1.0, 0.0, 0.0]],
647            )
648            .await
649            .unwrap();
650
651        let hits = store
652            .search(khive_storage::types::VectorSearchRequest {
653                query_vectors: vec![vec![1.0, 0.0, 0.0]],
654                top_k: 1,
655                namespace: None,
656                kind: None,
657                embedding_model: None,
658                filter: None,
659                backend_hints: None,
660            })
661            .await
662            .unwrap();
663
664        assert_eq!(hits.len(), 1);
665        assert_eq!(hits[0].subject_id, id);
666        assert!(hits[0].score.to_f64() > 0.99);
667    }
668
669    #[tokio::test]
670    #[cfg(feature = "vectors")]
671    async fn vectors_creates_table_idempotently() {
672        let backend = StorageBackend::memory().unwrap();
673
674        let store1 = backend.vectors("idempotent", "idempotent", 3).unwrap();
675        let store2 = backend.vectors("idempotent", "idempotent", 3).unwrap();
676
677        let id = uuid::Uuid::new_v4();
678        store1
679            .insert(
680                id,
681                khive_types::SubstrateKind::Entity,
682                "local",
683                "content",
684                vec![vec![1.0, 0.0, 0.0]],
685            )
686            .await
687            .unwrap();
688
689        let count = store2.count().await.unwrap();
690        assert_eq!(count, 1);
691    }
692
693    #[tokio::test]
694    async fn text_roundtrip_via_public_api() {
695        let backend = StorageBackend::memory().unwrap();
696        let store = backend.text("test_api").unwrap();
697
698        let id = uuid::Uuid::new_v4();
699        let doc = khive_storage::types::TextDocument {
700            subject_id: id,
701            kind: khive_types::SubstrateKind::Entity,
702            title: Some("Test Title".to_string()),
703            body: "This is a searchable document about Rust.".to_string(),
704            tags: vec!["rust".to_string()],
705            namespace: "test_ns".to_string(),
706            metadata: None,
707            updated_at: chrono::Utc::now(),
708        };
709        store.upsert_document(doc).await.unwrap();
710
711        let hits = store
712            .search(khive_storage::types::TextSearchRequest {
713                query: "Rust".to_string(),
714                mode: khive_storage::types::TextQueryMode::Plain,
715                filter: Some(khive_storage::types::TextFilter {
716                    namespaces: vec!["test_ns".to_string()],
717                    ..Default::default()
718                }),
719                top_k: 1,
720                snippet_chars: 64,
721            })
722            .await
723            .unwrap();
724
725        assert_eq!(hits.len(), 1);
726        assert_eq!(hits[0].subject_id, id);
727        assert!(hits[0].score.to_f64() > 0.0);
728    }
729
730    #[tokio::test]
731    async fn text_creates_table_idempotently() {
732        let backend = StorageBackend::memory().unwrap();
733
734        let store1 = backend.text("idempotent_fts").unwrap();
735        let store2 = backend.text("idempotent_fts").unwrap();
736
737        let id = uuid::Uuid::new_v4();
738        let doc = khive_storage::types::TextDocument {
739            subject_id: id,
740            kind: khive_types::SubstrateKind::Note,
741            title: None,
742            body: "Hello world.".to_string(),
743            tags: vec![],
744            namespace: "test_ns".to_string(),
745            metadata: None,
746            updated_at: chrono::Utc::now(),
747        };
748        store1.upsert_document(doc).await.unwrap();
749
750        let count = store2
751            .count(khive_storage::types::TextFilter {
752                namespaces: vec!["test_ns".to_string()],
753                ..Default::default()
754            })
755            .await
756            .unwrap();
757        assert_eq!(count, 1);
758    }
759
760    #[test]
761    fn invalid_model_key_rejected() {
762        let backend = StorageBackend::memory().unwrap();
763        assert!(backend.vectors("bad key!", "bad key!", 3).is_err());
764        assert!(backend.vectors("", "", 3).is_err());
765    }
766
767    #[test]
768    fn invalid_table_key_rejected() {
769        let backend = StorageBackend::memory().unwrap();
770        assert!(backend.text("bad key!").is_err());
771        assert!(backend.text("").is_err());
772    }
773
774    #[test]
775    fn apply_schema_runs_migrations_idempotently() {
776        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
777            id: "001_init",
778            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
779            down_sql: None,
780            is_already_applied: None,
781        }];
782        let plan = crate::migrations::ServiceSchemaPlan {
783            service: "schema_test_svc",
784            sqlite: MIGRATIONS,
785            postgres: &[],
786        };
787
788        let backend = StorageBackend::memory().unwrap();
789        backend.apply_schema(&plan).unwrap();
790        backend.apply_schema(&plan).unwrap();
791
792        let reader = backend.pool().reader().unwrap();
793        let count: i64 = reader
794            .conn()
795            .query_row(
796                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
797                [],
798                |row| row.get(0),
799            )
800            .unwrap();
801        assert_eq!(count, 1);
802    }
803}