Skip to main content

khive_db/
backend.rs

1//! Concrete storage backend providing capability traits.
2//!
//! `StorageBackend` owns a `ConnectionPool` and provides factory methods for
//! all storage capability traits (`EntityStore`, `GraphStore`, `NoteStore`,
//! `EventStore`, `VectorStore`, `TextSearch`, `SqlAccess`). Services obtain
//! capability handles without depending on the pool directly.
7//!
8//! # Modes
9//!
10//! - **File-backed** (`StorageBackend::sqlite(path)`): Production use. Opens (or
11//!   creates) the database at the given path. Readers get standalone connections
12//!   for high concurrency.
13//! - **In-memory** (`StorageBackend::memory()`): Testing use. A single shared
14//!   connection through the pool. All data is lost when the backend is dropped.
15//!
16//! # Schema ownership
17//!
18//! `StorageBackend` creates a **bare** pool with no global schema. Each factory
19//! method (`graph()`, `notes()`, etc.) applies only the DDL for its own tables.
20//! Call `apply_schema()` to run service-specific migrations.
21
22use std::path::Path;
23use std::sync::Arc;
24
25use crate::error::SqliteError;
26use crate::pool::{ConnectionPool, PoolConfig};
27use crate::sql_bridge::SqlBridge;
28use crate::stores::{entity, event, graph, note, text, vectors};
29
30/// Concrete storage backend providing capability traits.
31pub struct StorageBackend {
32    pool: Arc<ConnectionPool>,
33    is_file_backed: bool,
34}
35
36impl StorageBackend {
37    /// File-backed SQLite database.
38    ///
39    /// Opens (or creates) the database at `path`. The underlying pool provides
40    /// 1 writer + N readers in WAL mode for concurrent access.
41    /// No schema is applied — call `apply_schema()` for each service.
42    pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
43        crate::extension::ensure_extensions_loaded();
44        let config = PoolConfig {
45            path: Some(path.as_ref().to_path_buf()),
46            ..PoolConfig::default()
47        };
48        let pool = ConnectionPool::new(config)?;
49        Ok(Self {
50            pool: Arc::new(pool),
51            is_file_backed: true,
52        })
53    }
54
55    /// In-memory SQLite database (for tests).
56    ///
57    /// All data is lost when the backend is dropped. The pool degrades to
58    /// single-connection mode since in-memory databases cannot be shared
59    /// across multiple connections.
60    pub fn memory() -> Result<Self, SqliteError> {
61        crate::extension::ensure_extensions_loaded();
62        let config = PoolConfig {
63            path: None,
64            ..PoolConfig::default()
65        };
66        let pool = ConnectionPool::new(config)?;
67        Ok(Self {
68            pool: Arc::new(pool),
69            is_file_backed: false,
70        })
71    }
72
73    /// Get the SQL access capability.
74    ///
75    /// Returns an `Arc<dyn SqlAccess>` suitable for passing to services.
76    pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
77        Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
78    }
79
80    /// Apply a service's schema plan (run migrations).
81    ///
82    /// Each migration in the plan's `sqlite` list is applied idempotently.
83    /// Already-applied migrations are skipped. The `_schema_versions` table
84    /// tracks which migrations have been run.
85    pub fn apply_schema(
86        &self,
87        plan: &crate::migrations::ServiceSchemaPlan,
88    ) -> Result<(), SqliteError> {
89        let writer = self.pool.try_writer()?;
90        crate::migrations::apply_schema_plan(writer.conn(), plan)
91    }
92
93    /// Get an EntityStore. Applies the entities DDL if not already present.
94    ///
95    /// Idempotent — safe to call multiple times.
96    pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
97        self.entities_for_namespace("local")
98    }
99
100    /// Get an EntityStore. The namespace parameter is validated (non-empty) and
101    /// the entities schema is applied, but the store itself is unscoped — namespace
102    /// is the caller's responsibility on each query/delete call.
103    pub fn entities_for_namespace(
104        &self,
105        namespace: &str,
106    ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
107        if namespace.trim().is_empty() {
108            return Err(SqliteError::InvalidData(
109                "entities namespace must be non-empty".to_string(),
110            ));
111        }
112        let writer = self.pool.try_writer()?;
113        entity::ensure_entities_schema(writer.conn())?;
114
115        Ok(Arc::new(entity::SqlEntityStore::new(
116            Arc::clone(&self.pool),
117            self.is_file_backed,
118        )))
119    }
120
121    /// Get a GraphStore for the default namespace.
122    ///
123    /// Creates the `graph_edges` table (with indexes) if it does not already
124    /// exist. Idempotent — safe to call multiple times.
125    pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
126        self.graph_for_namespace("local")
127    }
128
129    /// Get a GraphStore scoped to a namespace.
130    pub fn graph_for_namespace(
131        &self,
132        namespace: &str,
133    ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
134        if namespace.trim().is_empty() {
135            return Err(SqliteError::InvalidData(
136                "graph namespace must be non-empty".to_string(),
137            ));
138        }
139        let writer = self.pool.try_writer()?;
140        graph::ensure_graph_schema(writer.conn())?;
141
142        Ok(Arc::new(graph::SqlGraphStore::new_scoped(
143            Arc::clone(&self.pool),
144            self.is_file_backed,
145            namespace.trim().to_string(),
146        )))
147    }
148
149    /// Get a NoteStore. Applies the notes DDL if not already present.
150    ///
151    /// Idempotent — safe to call multiple times.
152    pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
153        self.notes_for_namespace("local")
154    }
155
156    /// Get a NoteStore. The namespace parameter is validated (non-empty) and
157    /// the notes schema is applied, but the store itself is unscoped — namespace
158    /// is the caller's responsibility on each query/delete call.
159    pub fn notes_for_namespace(
160        &self,
161        namespace: &str,
162    ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
163        if namespace.trim().is_empty() {
164            return Err(SqliteError::InvalidData(
165                "notes namespace must be non-empty".to_string(),
166            ));
167        }
168        let writer = self.pool.try_writer()?;
169        note::ensure_notes_schema(writer.conn())?;
170
171        Ok(Arc::new(note::SqlNoteStore::new(
172            Arc::clone(&self.pool),
173            self.is_file_backed,
174        )))
175    }
176
177    /// Get an EventStore for the default namespace.
178    ///
179    /// Creates the `events` table (with indexes) if it does not already exist.
180    /// Idempotent — safe to call multiple times.
181    pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
182        self.events_for_namespace("local")
183    }
184
185    /// Get an EventStore scoped to a namespace.
186    pub fn events_for_namespace(
187        &self,
188        namespace: &str,
189    ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
190        if namespace.trim().is_empty() {
191            return Err(SqliteError::InvalidData(
192                "events namespace must be non-empty".to_string(),
193            ));
194        }
195        let writer = self.pool.try_writer()?;
196        event::ensure_events_schema(writer.conn())?;
197
198        Ok(Arc::new(event::SqlEventStore::new_scoped(
199            Arc::clone(&self.pool),
200            self.is_file_backed,
201            namespace.trim().to_string(),
202        )))
203    }
204
205    /// Get a VectorStore for a specific embedding model, scoped to the default namespace.
206    ///
207    /// Creates the vec0 virtual table if it does not already exist. The `model_key`
208    /// must contain only ASCII alphanumeric/underscore characters.
209    pub fn vectors(
210        &self,
211        model_key: &str,
212        dimensions: usize,
213    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
214        self.vectors_for_namespace(model_key, dimensions, "local")
215    }
216
217    /// Get a VectorStore for a specific embedding model with a default namespace.
218    ///
219    /// Creates the vec0 virtual table if it does not already exist. The `namespace`
220    /// is a default for trait methods that lack a per-call namespace parameter
221    /// (count, delete, info). Access control is enforced at the runtime layer.
222    ///
223    /// The `model_key` must contain only ASCII alphanumeric/underscore characters.
224    pub fn vectors_for_namespace(
225        &self,
226        model_key: &str,
227        dimensions: usize,
228        namespace: &str,
229    ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
230        if model_key.is_empty()
231            || !model_key
232                .chars()
233                .all(|c| c.is_ascii_alphanumeric() || c == '_')
234        {
235            return Err(SqliteError::InvalidData(format!(
236                "invalid model_key '{}': must be non-empty and contain only \
237                 alphanumeric/underscore characters",
238                model_key
239            )));
240        }
241        if namespace.trim().is_empty() {
242            return Err(SqliteError::InvalidData(
243                "vector store namespace must be non-empty".to_string(),
244            ));
245        }
246
247        // Ensure sqlite-vec is registered before creating vec0 tables.
248        crate::extension::ensure_extensions_loaded();
249
250        // Create the vec0 virtual table. Idempotent.
251        let ddl = format!(
252            "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
253             subject_id TEXT PRIMARY KEY, \
254             namespace TEXT NOT NULL, \
255             kind TEXT NOT NULL, \
256             embedding float[{}] distance_metric=cosine\
257             )",
258            model_key, dimensions
259        );
260        let writer = self.pool.try_writer()?;
261        writer.conn().execute_batch(&ddl)?;
262
263        Ok(Arc::new(vectors::SqliteVecStore::new(
264            Arc::clone(&self.pool),
265            self.is_file_backed,
266            model_key.to_string(),
267            dimensions,
268            namespace.trim().to_string(),
269        )?))
270    }
271
272    /// Get a TextSearch for a specific table key.
273    ///
274    /// Creates the FTS5 virtual table if it does not already exist. Uses the
275    /// `trigram` tokenizer by default (CJK-safe, ADR-013).
276    ///
277    /// The `table_key` must contain only ASCII alphanumeric/underscore characters.
278    pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
279        self.text_with_tokenizer(table_key, "trigram")
280    }
281
282    /// Get a TextSearch with an explicit FTS5 tokenizer.
283    ///
284    /// Use when you need a tokenizer other than the default `trigram` — for
285    /// example `unicode61` for Latin-only corpora.
286    ///
287    /// Both `table_key` and `tokenizer` must contain only ASCII
288    /// alphanumeric/underscore characters.
289    pub fn text_with_tokenizer(
290        &self,
291        table_key: &str,
292        tokenizer: &str,
293    ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
294        if table_key.is_empty()
295            || !table_key
296                .chars()
297                .all(|c| c.is_ascii_alphanumeric() || c == '_')
298        {
299            return Err(SqliteError::InvalidData(format!(
300                "invalid table_key '{}': must be non-empty and contain only \
301                 alphanumeric/underscore characters",
302                table_key
303            )));
304        }
305        if tokenizer.is_empty()
306            || !tokenizer
307                .chars()
308                .all(|c| c.is_ascii_alphanumeric() || c == '_')
309        {
310            return Err(SqliteError::InvalidData(format!(
311                "invalid tokenizer '{}': must be non-empty and contain only \
312                 alphanumeric/underscore characters",
313                tokenizer
314            )));
315        }
316
317        let ddl = format!(
318            "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
319             subject_id UNINDEXED, \
320             kind UNINDEXED, \
321             title, \
322             body, \
323             tags UNINDEXED, \
324             namespace UNINDEXED, \
325             metadata UNINDEXED, \
326             updated_at UNINDEXED, \
327             tokenize = '{}'\
328             )",
329            table_key, tokenizer
330        );
331        let writer = self.pool.try_writer()?;
332        writer.conn().execute_batch(&ddl)?;
333
334        Ok(Arc::new(text::Fts5TextSearch::new(
335            Arc::clone(&self.pool),
336            self.is_file_backed,
337            table_key.to_string(),
338        )))
339    }
340
341    /// Is this a file-backed backend?
342    pub fn is_file_backed(&self) -> bool {
343        self.is_file_backed
344    }
345
346    /// Access the underlying pool (escape hatch).
347    pub fn pool(&self) -> &ConnectionPool {
348        &self.pool
349    }
350
351    /// Clone the underlying pool Arc.
352    pub fn pool_arc(&self) -> Arc<ConnectionPool> {
353        Arc::clone(&self.pool)
354    }
355}
356
#[cfg(test)]
mod tests {
    //! Tests for `StorageBackend`: backend construction, SQL round-trips,
    //! store factories, input validation, and schema application.

    use super::*;
    use khive_storage::types::{SqlStatement, SqlValue};

    #[test]
    fn memory_backend_creates_successfully() {
        let backend = StorageBackend::memory().expect("memory backend should create");
        assert!(!backend.is_file_backed());
    }

    #[test]
    fn file_backend_creates_successfully() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let backend = StorageBackend::sqlite(&path).expect("file backend should create");
        assert!(backend.is_file_backed());
        assert!(path.exists());
    }

    #[tokio::test]
    async fn sql_access_memory_roundtrip() {
        let backend = StorageBackend::memory().unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script(
                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
            )
            .await
            .unwrap();

        let affected = writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
                params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(affected, 1);

        let mut reader = sql.reader().await.unwrap();
        let row = reader
            .query_row(SqlStatement {
                sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
                params: vec![SqlValue::Text("row1".into())],
                label: None,
            })
            .await
            .unwrap();

        let row = row.expect("should find the inserted row");
        assert_eq!(row.columns.len(), 2);
        match &row.columns[0].value {
            SqlValue::Text(s) => assert_eq!(s, "row1"),
            other => panic!("expected Text, got {other:?}"),
        }
        match &row.columns[1].value {
            SqlValue::Integer(v) => assert_eq!(*v, 42),
            other => panic!("expected Integer, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn sql_access_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test_roundtrip.db");
        let backend = StorageBackend::sqlite(&path).unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
            .await
            .unwrap();
        writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
                params: vec![
                    SqlValue::Text("hello".into()),
                    SqlValue::Text("world".into()),
                ],
                label: None,
            })
            .await
            .unwrap();

        let mut reader = sql.reader().await.unwrap();
        let rows = reader
            .query_all(SqlStatement {
                sql: "SELECT k, v FROM test_f".into(),
                params: vec![],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0].columns[1].value {
            SqlValue::Text(s) => assert_eq!(s, "world"),
            other => panic!("expected Text, got {other:?}"),
        }
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.vectors("test_api", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                vec![1.0, 0.0, 0.0],
            )
            .await
            .unwrap();

        let hits = store
            .search(khive_storage::types::VectorSearchRequest {
                query_embedding: vec![1.0, 0.0, 0.0],
                top_k: 1,
                namespace: None,
                kind: None,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.99);
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.vectors("idempotent", 3).unwrap();
        let store2 = backend.vectors("idempotent", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store1
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                vec![1.0, 0.0, 0.0],
            )
            .await
            .unwrap();

        let count = store2.count().await.unwrap();
        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn text_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.text("test_api").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Entity,
            title: Some("Test Title".to_string()),
            body: "This is a searchable document about Rust.".to_string(),
            tags: vec!["rust".to_string()],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        let hits = store
            .search(khive_storage::types::TextSearchRequest {
                query: "Rust".to_string(),
                mode: khive_storage::types::TextQueryMode::Plain,
                filter: Some(khive_storage::types::TextFilter {
                    namespaces: vec!["test_ns".to_string()],
                    ..Default::default()
                }),
                top_k: 1,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.0);
    }

    #[tokio::test]
    async fn text_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.text("idempotent_fts").unwrap();
        let store2 = backend.text("idempotent_fts").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Note,
            title: None,
            body: "Hello world.".to_string(),
            tags: vec![],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store1.upsert_document(doc).await.unwrap();

        let count = store2
            .count(khive_storage::types::TextFilter {
                namespaces: vec!["test_ns".to_string()],
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn invalid_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.vectors("bad key!", 3).is_err());
        assert!(backend.vectors("", 3).is_err());
    }

    #[test]
    fn invalid_table_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text("bad key!").is_err());
        assert!(backend.text("").is_err());
    }

    /// Tokenizers go into the DDL as a quoted literal, so anything outside
    /// `[A-Za-z0-9_]` must be refused before it reaches SQLite.
    #[test]
    fn invalid_tokenizer_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(matches!(
            backend.text_with_tokenizer("fts_tok", "bad tok").err(),
            Some(SqliteError::InvalidData(_))
        ));
        assert!(matches!(
            backend.text_with_tokenizer("fts_tok", "").err(),
            Some(SqliteError::InvalidData(_))
        ));
        assert!(matches!(
            backend
                .text_with_tokenizer("fts_tok", "x'); DROP TABLE t; --")
                .err(),
            Some(SqliteError::InvalidData(_))
        ));
        assert!(backend.text_with_tokenizer("fts_latin", "unicode61").is_ok());
    }

    /// Every `*_for_namespace` factory rejects empty and whitespace-only
    /// namespaces with `InvalidData`, before touching the database.
    #[test]
    fn blank_namespace_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(matches!(
            backend.entities_for_namespace("").err(),
            Some(SqliteError::InvalidData(_))
        ));
        assert!(matches!(
            backend.graph_for_namespace("   ").err(),
            Some(SqliteError::InvalidData(_))
        ));
        assert!(matches!(
            backend.notes_for_namespace("\t").err(),
            Some(SqliteError::InvalidData(_))
        ));
        assert!(matches!(
            backend.events_for_namespace("").err(),
            Some(SqliteError::InvalidData(_))
        ));
        assert!(matches!(
            backend.vectors_for_namespace("model", 3, " ").err(),
            Some(SqliteError::InvalidData(_))
        ));
    }

    /// The default-namespace factories are documented as idempotent.
    #[test]
    fn default_factories_are_idempotent() {
        let backend = StorageBackend::memory().unwrap();
        for _ in 0..2 {
            assert!(backend.entities().is_ok());
            assert!(backend.graph().is_ok());
            assert!(backend.notes().is_ok());
            assert!(backend.events().is_ok());
        }
    }

    #[test]
    fn apply_schema_runs_migrations_idempotently() {
        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
            id: "001_init",
            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
            down_sql: None,
            is_already_applied: None,
        }];
        let plan = crate::migrations::ServiceSchemaPlan {
            service: "schema_test_svc",
            sqlite: MIGRATIONS,
            postgres: &[],
        };

        let backend = StorageBackend::memory().unwrap();
        backend.apply_schema(&plan).unwrap();
        backend.apply_schema(&plan).unwrap();

        let reader = backend.pool().reader().unwrap();
        let count: i64 = reader
            .conn()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }
}