1use std::path::Path;
23use std::sync::Arc;
24
25use rusqlite::OptionalExtension;
26
27use crate::error::SqliteError;
28use crate::pool::{ConnectionPool, PoolConfig};
29use crate::sql_bridge::SqlBridge;
30use crate::stores::{entity, event, graph, note, sparse, text, vectors};
31
32pub struct StorageBackend {
34 pool: Arc<ConnectionPool>,
35 is_file_backed: bool,
36}
37
38impl StorageBackend {
39 pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
45 crate::extension::ensure_extensions_loaded();
46 let config = PoolConfig {
47 path: Some(path.as_ref().to_path_buf()),
48 ..PoolConfig::default()
49 };
50 let pool = ConnectionPool::new(config)?;
51 Ok(Self {
52 pool: Arc::new(pool),
53 is_file_backed: true,
54 })
55 }
56
57 pub fn memory() -> Result<Self, SqliteError> {
63 crate::extension::ensure_extensions_loaded();
64 let config = PoolConfig {
65 path: None,
66 ..PoolConfig::default()
67 };
68 let pool = ConnectionPool::new(config)?;
69 Ok(Self {
70 pool: Arc::new(pool),
71 is_file_backed: false,
72 })
73 }
74
75 pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
79 Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
80 }
81
82 pub fn apply_schema(
88 &self,
89 plan: &crate::migrations::ServiceSchemaPlan,
90 ) -> Result<(), SqliteError> {
91 let writer = self.pool.try_writer()?;
92 crate::migrations::apply_schema_plan(writer.conn(), plan)
93 }
94
95 pub fn apply_pack_ddl_statements(
113 &self,
114 statements: &[&'static str],
115 ) -> Result<(), SqliteError> {
116 let writer = self.pool.try_writer()?;
117 for &stmt in statements {
118 writer.conn().execute_batch(stmt)?;
119 }
120 Ok(())
121 }
122
123 pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
127 self.entities_for_namespace("local")
128 }
129
130 pub fn entities_for_namespace(
134 &self,
135 namespace: &str,
136 ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
137 if namespace.trim().is_empty() {
138 return Err(SqliteError::InvalidData(
139 "entities namespace must be non-empty".to_string(),
140 ));
141 }
142 let writer = self.pool.try_writer()?;
143 entity::ensure_entities_schema(writer.conn())?;
144
145 Ok(Arc::new(entity::SqlEntityStore::new(
146 Arc::clone(&self.pool),
147 self.is_file_backed,
148 )))
149 }
150
151 pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
156 self.graph_for_namespace("local")
157 }
158
159 pub fn graph_for_namespace(
161 &self,
162 namespace: &str,
163 ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
164 if namespace.trim().is_empty() {
165 return Err(SqliteError::InvalidData(
166 "graph namespace must be non-empty".to_string(),
167 ));
168 }
169 let writer = self.pool.try_writer()?;
170 graph::ensure_graph_schema(writer.conn())?;
171
172 Ok(Arc::new(graph::SqlGraphStore::new_scoped(
173 Arc::clone(&self.pool),
174 self.is_file_backed,
175 namespace.trim().to_string(),
176 )))
177 }
178
179 pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
183 self.notes_for_namespace("local")
184 }
185
186 pub fn notes_for_namespace(
190 &self,
191 namespace: &str,
192 ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
193 if namespace.trim().is_empty() {
194 return Err(SqliteError::InvalidData(
195 "notes namespace must be non-empty".to_string(),
196 ));
197 }
198 let writer = self.pool.try_writer()?;
199 note::ensure_notes_schema(writer.conn())?;
200
201 Ok(Arc::new(note::SqlNoteStore::new(
202 Arc::clone(&self.pool),
203 self.is_file_backed,
204 )))
205 }
206
207 pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
212 self.events_for_namespace("local")
213 }
214
215 pub fn events_for_namespace(
217 &self,
218 namespace: &str,
219 ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
220 if namespace.trim().is_empty() {
221 return Err(SqliteError::InvalidData(
222 "events namespace must be non-empty".to_string(),
223 ));
224 }
225 let writer = self.pool.try_writer()?;
226 event::ensure_events_schema(writer.conn())?;
227
228 Ok(Arc::new(event::SqlEventStore::new_scoped(
229 Arc::clone(&self.pool),
230 self.is_file_backed,
231 namespace.trim().to_string(),
232 )))
233 }
234
235 pub fn vectors(
241 &self,
242 model_key: &str,
243 embedding_model: &str,
244 dimensions: usize,
245 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
246 self.vectors_for_namespace(model_key, embedding_model, dimensions, "local")
247 }
248
249 pub fn vectors_for_namespace(
259 &self,
260 model_key: &str,
261 embedding_model: &str,
262 dimensions: usize,
263 namespace: &str,
264 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
265 if model_key.is_empty()
266 || !model_key
267 .chars()
268 .all(|c| c.is_ascii_alphanumeric() || c == '_')
269 {
270 return Err(SqliteError::InvalidData(format!(
271 "invalid model_key '{}': must be non-empty and contain only \
272 alphanumeric/underscore characters",
273 model_key
274 )));
275 }
276 if namespace.trim().is_empty() {
277 return Err(SqliteError::InvalidData(
278 "vector store namespace must be non-empty".to_string(),
279 ));
280 }
281
282 crate::extension::ensure_extensions_loaded();
284
285 let table = format!("vec_{}", model_key);
286 let writer = self.pool.try_writer()?;
287
288 let table_exists: bool = writer
295 .conn()
296 .query_row(
297 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
298 rusqlite::params![&table],
299 |row| row.get::<_, i64>(0),
300 )
301 .optional()
302 .map_err(SqliteError::Rusqlite)?
303 .is_some();
304
305 if table_exists {
306 let pragma = format!("PRAGMA table_xinfo({})", table);
312 let mut stmt = writer.conn().prepare(&pragma)?;
313 let mut rows = stmt.query([])?;
314 let mut has_field = false;
315 let mut has_embedding_model = false;
316 while let Some(row) = rows.next()? {
317 let name: String = row.get(1)?;
318 if name == "field" {
319 has_field = true;
320 }
321 if name == "embedding_model" {
322 has_embedding_model = true;
323 }
324 }
325 if !has_field || !has_embedding_model {
326 return Err(SqliteError::InvalidData(format!(
327 "vec0 table '{}' is missing required column(s) (field={}, \
328 embedding_model={}); run `kkernel db migrate` to apply V17 \
329 (vector_embedding_model_tag_preserving_rebuild) before opening \
330 this vector store",
331 table, has_field, has_embedding_model,
332 )));
333 }
334 }
335
336 writer
345 .conn()
346 .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
347
348 let ddl = format!(
351 "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
352 subject_id TEXT PRIMARY KEY, \
353 namespace TEXT NOT NULL, \
354 kind TEXT NOT NULL, \
355 field TEXT NOT NULL, \
356 embedding_model TEXT NOT NULL, \
357 embedding float[{}] distance_metric=cosine\
358 )",
359 model_key, dimensions
360 );
361 writer.conn().execute_batch(&ddl)?;
362
363 Ok(Arc::new(vectors::SqliteVecStore::new(
364 Arc::clone(&self.pool),
365 self.is_file_backed,
366 model_key.to_string(),
367 embedding_model.to_string(),
368 dimensions,
369 namespace.trim().to_string(),
370 )?))
371 }
372
373 pub fn register_embedding_model(
378 &self,
379 engine_name: &str,
380 model_id: &str,
381 key_version: &str,
382 dimensions: u32,
383 ) -> Result<(), SqliteError> {
384 let writer = self.pool.try_writer()?;
385 writer
386 .conn()
387 .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
388
389 let now = chrono::Utc::now().timestamp_micros();
390 let canonical_key =
391 format!("{engine_name}:{model_id}:{key_version}:{dimensions}").into_bytes();
392 let id = uuid::Uuid::new_v4();
393 writer.conn().execute(
394 "INSERT INTO _embedding_models \
395 (id, engine_name, model_id, key_version, dim, output_dim, status, \
396 activated_at, superseded_at, superseded_by, canonical_key, created_at) \
397 VALUES (?1, ?2, ?3, ?4, ?5, NULL, 'active', ?6, NULL, NULL, ?7, ?8) \
398 ON CONFLICT(canonical_key) DO UPDATE SET \
399 status = 'active', \
400 activated_at = COALESCE(_embedding_models.activated_at, excluded.activated_at)",
401 rusqlite::params![
402 id.as_bytes().as_slice(),
403 engine_name,
404 model_id,
405 key_version,
406 dimensions as i64,
407 now,
408 canonical_key,
409 now,
410 ],
411 )?;
412 Ok(())
413 }
414
415 pub fn sparse(
419 &self,
420 model_key: &str,
421 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
422 self.sparse_for_namespace(model_key, "local")
423 }
424
425 pub fn sparse_for_namespace(
429 &self,
430 model_key: &str,
431 namespace: &str,
432 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
433 if model_key.is_empty()
434 || !model_key
435 .chars()
436 .all(|c| c.is_ascii_alphanumeric() || c == '_')
437 {
438 return Err(SqliteError::InvalidData(format!(
439 "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
440 model_key
441 )));
442 }
443 if namespace.trim().is_empty() {
444 return Err(SqliteError::InvalidData(
445 "sparse store namespace must be non-empty".to_string(),
446 ));
447 }
448
449 let writer = self.pool.try_writer()?;
450 sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
451
452 Ok(Arc::new(sparse::SqliteSparseStore::new(
453 Arc::clone(&self.pool),
454 self.is_file_backed,
455 model_key.to_string(),
456 namespace.trim().to_string(),
457 )?))
458 }
459
460 pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
467 self.text_with_tokenizer(table_key, "trigram")
468 }
469
470 pub fn text_with_tokenizer(
478 &self,
479 table_key: &str,
480 tokenizer: &str,
481 ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
482 if table_key.is_empty()
483 || !table_key
484 .chars()
485 .all(|c| c.is_ascii_alphanumeric() || c == '_')
486 {
487 return Err(SqliteError::InvalidData(format!(
488 "invalid table_key '{}': must be non-empty and contain only \
489 alphanumeric/underscore characters",
490 table_key
491 )));
492 }
493 if tokenizer.is_empty()
494 || !tokenizer
495 .chars()
496 .all(|c| c.is_ascii_alphanumeric() || c == '_')
497 {
498 return Err(SqliteError::InvalidData(format!(
499 "invalid tokenizer '{}': must be non-empty and contain only \
500 alphanumeric/underscore characters",
501 tokenizer
502 )));
503 }
504
505 let ddl = format!(
506 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
507 subject_id UNINDEXED, \
508 kind UNINDEXED, \
509 title, \
510 body, \
511 tags UNINDEXED, \
512 namespace UNINDEXED, \
513 metadata UNINDEXED, \
514 updated_at UNINDEXED, \
515 tokenize = '{}'\
516 )",
517 table_key, tokenizer
518 );
519 let writer = self.pool.try_writer()?;
520 writer.conn().execute_batch(&ddl)?;
521
522 Ok(Arc::new(text::Fts5TextSearch::new(
523 Arc::clone(&self.pool),
524 self.is_file_backed,
525 table_key.to_string(),
526 )))
527 }
528
529 pub fn is_file_backed(&self) -> bool {
531 self.is_file_backed
532 }
533
534 pub fn pool(&self) -> &ConnectionPool {
536 &self.pool
537 }
538
539 pub fn pool_arc(&self) -> Arc<ConnectionPool> {
541 Arc::clone(&self.pool)
542 }
543}
544
545#[cfg(test)]
546mod tests {
547 use super::*;
548 use khive_storage::types::{SqlStatement, SqlValue};
549
/// A memory backend can be constructed and reports itself as not file-backed.
#[test]
fn memory_backend_creates_successfully() {
    let file_backed = StorageBackend::memory()
        .expect("memory backend should create")
        .is_file_backed();
    assert!(!file_backed);
}
555
/// A file backend can be constructed, reports itself as file-backed, and
/// leaves a database file on disk.
#[test]
fn file_backend_creates_successfully() {
    let tmp = tempfile::tempdir().unwrap();
    let db_file = tmp.path().join("test.db");

    let backend = StorageBackend::sqlite(&db_file).expect("file backend should create");

    assert!(backend.is_file_backed());
    assert!(db_file.exists());
}
564
565 #[tokio::test]
566 async fn sql_access_memory_roundtrip() {
567 let backend = StorageBackend::memory().unwrap();
568 let sql = backend.sql();
569
570 let mut writer = sql.writer().await.unwrap();
571 writer
572 .execute_script(
573 "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
574 )
575 .await
576 .unwrap();
577
578 let affected = writer
579 .execute(SqlStatement {
580 sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
581 params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
582 label: None,
583 })
584 .await
585 .unwrap();
586 assert_eq!(affected, 1);
587
588 let mut reader = sql.reader().await.unwrap();
589 let row = reader
590 .query_row(SqlStatement {
591 sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
592 params: vec![SqlValue::Text("row1".into())],
593 label: None,
594 })
595 .await
596 .unwrap();
597
598 let row = row.expect("should find the inserted row");
599 assert_eq!(row.columns.len(), 2);
600 match &row.columns[0].value {
601 SqlValue::Text(s) => assert_eq!(s, "row1"),
602 other => panic!("expected Text, got {other:?}"),
603 }
604 match &row.columns[1].value {
605 SqlValue::Integer(v) => assert_eq!(*v, 42),
606 other => panic!("expected Integer, got {other:?}"),
607 }
608 }
609
610 #[tokio::test]
611 async fn sql_access_file_roundtrip() {
612 let dir = tempfile::tempdir().unwrap();
613 let path = dir.path().join("test_roundtrip.db");
614 let backend = StorageBackend::sqlite(&path).unwrap();
615 let sql = backend.sql();
616
617 let mut writer = sql.writer().await.unwrap();
618 writer
619 .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
620 .await
621 .unwrap();
622 writer
623 .execute(SqlStatement {
624 sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
625 params: vec![
626 SqlValue::Text("hello".into()),
627 SqlValue::Text("world".into()),
628 ],
629 label: None,
630 })
631 .await
632 .unwrap();
633
634 let mut reader = sql.reader().await.unwrap();
635 let rows = reader
636 .query_all(SqlStatement {
637 sql: "SELECT k, v FROM test_f".into(),
638 params: vec![],
639 label: None,
640 })
641 .await
642 .unwrap();
643 assert_eq!(rows.len(), 1);
644 match &rows[0].columns[1].value {
645 SqlValue::Text(s) => assert_eq!(s, "world"),
646 other => panic!("expected Text, got {other:?}"),
647 }
648 }
649
/// Inserts one embedding through the public vector store API and finds it
/// again when searching with the same vector.
#[tokio::test]
#[cfg(feature = "vectors")]
async fn vectors_roundtrip_via_public_api() {
    let backend = StorageBackend::memory().unwrap();
    let store = backend.vectors("test_api", "test_api", 3).unwrap();

    let subject = uuid::Uuid::new_v4();
    let embedding = vec![vec![1.0, 0.0, 0.0]];
    store
        .insert(
            subject,
            khive_types::SubstrateKind::Entity,
            "local",
            "content",
            embedding,
        )
        .await
        .unwrap();

    let request = khive_storage::types::VectorSearchRequest {
        query_vectors: vec![vec![1.0, 0.0, 0.0]],
        top_k: 1,
        namespace: None,
        kind: None,
        embedding_model: None,
        filter: None,
        backend_hints: None,
    };
    let hits = store.search(request).await.unwrap();

    assert_eq!(hits.len(), 1);
    let best = &hits[0];
    assert_eq!(best.subject_id, subject);
    assert!(best.score.to_f64() > 0.99);
}
685
/// Opening the same vector store twice succeeds, and a row inserted through
/// the first handle is counted by the second.
#[tokio::test]
#[cfg(feature = "vectors")]
async fn vectors_creates_table_idempotently() {
    let backend = StorageBackend::memory().unwrap();

    let first = backend.vectors("idempotent", "idempotent", 3).unwrap();
    let second = backend.vectors("idempotent", "idempotent", 3).unwrap();

    let subject = uuid::Uuid::new_v4();
    let embedding = vec![vec![1.0, 0.0, 0.0]];
    first
        .insert(
            subject,
            khive_types::SubstrateKind::Entity,
            "local",
            "content",
            embedding,
        )
        .await
        .unwrap();

    let stored = second.count().await.unwrap();
    assert_eq!(stored, 1);
}
709
710 #[tokio::test]
711 async fn text_roundtrip_via_public_api() {
712 let backend = StorageBackend::memory().unwrap();
713 let store = backend.text("test_api").unwrap();
714
715 let id = uuid::Uuid::new_v4();
716 let doc = khive_storage::types::TextDocument {
717 subject_id: id,
718 kind: khive_types::SubstrateKind::Entity,
719 title: Some("Test Title".to_string()),
720 body: "This is a searchable document about Rust.".to_string(),
721 tags: vec!["rust".to_string()],
722 namespace: "test_ns".to_string(),
723 metadata: None,
724 updated_at: chrono::Utc::now(),
725 };
726 store.upsert_document(doc).await.unwrap();
727
728 let hits = store
729 .search(khive_storage::types::TextSearchRequest {
730 query: "Rust".to_string(),
731 mode: khive_storage::types::TextQueryMode::Plain,
732 filter: Some(khive_storage::types::TextFilter {
733 namespaces: vec!["test_ns".to_string()],
734 ..Default::default()
735 }),
736 top_k: 1,
737 snippet_chars: 64,
738 })
739 .await
740 .unwrap();
741
742 assert_eq!(hits.len(), 1);
743 assert_eq!(hits[0].subject_id, id);
744 assert!(hits[0].score.to_f64() > 0.0);
745 }
746
747 #[tokio::test]
748 async fn text_creates_table_idempotently() {
749 let backend = StorageBackend::memory().unwrap();
750
751 let store1 = backend.text("idempotent_fts").unwrap();
752 let store2 = backend.text("idempotent_fts").unwrap();
753
754 let id = uuid::Uuid::new_v4();
755 let doc = khive_storage::types::TextDocument {
756 subject_id: id,
757 kind: khive_types::SubstrateKind::Note,
758 title: None,
759 body: "Hello world.".to_string(),
760 tags: vec![],
761 namespace: "test_ns".to_string(),
762 metadata: None,
763 updated_at: chrono::Utc::now(),
764 };
765 store1.upsert_document(doc).await.unwrap();
766
767 let count = store2
768 .count(khive_storage::types::TextFilter {
769 namespaces: vec!["test_ns".to_string()],
770 ..Default::default()
771 })
772 .await
773 .unwrap();
774 assert_eq!(count, 1);
775 }
776
/// Vector stores refuse a model key containing disallowed characters and an
/// empty model key.
#[test]
fn invalid_model_key_rejected() {
    let backend = StorageBackend::memory().unwrap();
    for bad_key in ["bad key!", ""] {
        assert!(backend.vectors(bad_key, bad_key, 3).is_err());
    }
}
783
/// Text stores refuse a table key containing disallowed characters and an
/// empty table key.
#[test]
fn invalid_table_key_rejected() {
    let backend = StorageBackend::memory().unwrap();
    for bad_key in ["bad key!", ""] {
        assert!(backend.text(bad_key).is_err());
    }
}
790
/// Applying the same schema plan twice succeeds and leaves exactly one
/// `schema_test` table behind.
#[test]
fn apply_schema_runs_migrations_idempotently() {
    static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
        id: "001_init",
        up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
        down_sql: None,
        is_already_applied: None,
    }];
    let plan = crate::migrations::ServiceSchemaPlan {
        service: "schema_test_svc",
        sqlite: MIGRATIONS,
        postgres: &[],
    };

    let backend = StorageBackend::memory().unwrap();
    // The second pass is the actual idempotency check.
    for _ in 0..2 {
        backend.apply_schema(&plan).unwrap();
    }

    let reader = backend.pool().reader().unwrap();
    let table_count: i64 = reader
        .conn()
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(table_count, 1);
}
820}