1use std::path::Path;
8use std::sync::Arc;
9
10use rusqlite::OptionalExtension;
11
12use crate::error::SqliteError;
13use crate::pool::{ConnectionPool, PoolConfig};
14use crate::sql_bridge::SqlBridge;
15use crate::stores::{entity, event, graph, note, sparse, text, vectors};
16
17pub struct StorageBackend {
19 pool: Arc<ConnectionPool>,
20 is_file_backed: bool,
21}
22
23impl StorageBackend {
24 pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
30 crate::extension::ensure_extensions_loaded();
31 let config = PoolConfig {
32 path: Some(path.as_ref().to_path_buf()),
33 ..PoolConfig::default()
34 };
35 let pool = ConnectionPool::new(config)?;
36 Ok(Self {
37 pool: Arc::new(pool),
38 is_file_backed: true,
39 })
40 }
41
42 pub fn memory() -> Result<Self, SqliteError> {
48 crate::extension::ensure_extensions_loaded();
49 let config = PoolConfig {
50 path: None,
51 ..PoolConfig::default()
52 };
53 let pool = ConnectionPool::new(config)?;
54 Ok(Self {
55 pool: Arc::new(pool),
56 is_file_backed: false,
57 })
58 }
59
60 pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
64 Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
65 }
66
67 pub fn apply_schema(
73 &self,
74 plan: &crate::migrations::ServiceSchemaPlan,
75 ) -> Result<(), SqliteError> {
76 let writer = self.pool.try_writer()?;
77 crate::migrations::apply_schema_plan(writer.conn(), plan)
78 }
79
80 pub fn apply_pack_ddl_statements(
97 &self,
98 statements: &[&'static str],
99 ) -> Result<(), SqliteError> {
100 let writer = self.pool.try_writer()?;
101 for &stmt in statements {
102 writer.conn().execute_batch(stmt)?;
103 }
104 Ok(())
105 }
106
107 pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
111 self.entities_for_namespace("local")
112 }
113
114 pub fn entities_for_namespace(
118 &self,
119 namespace: &str,
120 ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
121 if namespace.trim().is_empty() {
122 return Err(SqliteError::InvalidData(
123 "entities namespace must be non-empty".to_string(),
124 ));
125 }
126 let writer = self.pool.try_writer()?;
127 entity::ensure_entities_schema(writer.conn())?;
128
129 Ok(Arc::new(entity::SqlEntityStore::new(
130 Arc::clone(&self.pool),
131 self.is_file_backed,
132 )))
133 }
134
135 pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
140 self.graph_for_namespace("local")
141 }
142
143 pub fn graph_for_namespace(
145 &self,
146 namespace: &str,
147 ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
148 if namespace.trim().is_empty() {
149 return Err(SqliteError::InvalidData(
150 "graph namespace must be non-empty".to_string(),
151 ));
152 }
153 let writer = self.pool.try_writer()?;
154 graph::ensure_graph_schema(writer.conn())?;
155
156 Ok(Arc::new(graph::SqlGraphStore::new_scoped(
157 Arc::clone(&self.pool),
158 self.is_file_backed,
159 namespace.trim().to_string(),
160 )))
161 }
162
163 pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
167 self.notes_for_namespace("local")
168 }
169
170 pub fn notes_for_namespace(
174 &self,
175 namespace: &str,
176 ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
177 if namespace.trim().is_empty() {
178 return Err(SqliteError::InvalidData(
179 "notes namespace must be non-empty".to_string(),
180 ));
181 }
182 let writer = self.pool.try_writer()?;
183 note::ensure_notes_schema(writer.conn())?;
184
185 Ok(Arc::new(note::SqlNoteStore::new(
186 Arc::clone(&self.pool),
187 self.is_file_backed,
188 )))
189 }
190
191 pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
196 self.events_for_namespace("local")
197 }
198
199 pub fn events_for_namespace(
201 &self,
202 namespace: &str,
203 ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
204 if namespace.trim().is_empty() {
205 return Err(SqliteError::InvalidData(
206 "events namespace must be non-empty".to_string(),
207 ));
208 }
209 let writer = self.pool.try_writer()?;
210 event::ensure_events_schema(writer.conn())?;
211
212 Ok(Arc::new(event::SqlEventStore::new_scoped(
213 Arc::clone(&self.pool),
214 self.is_file_backed,
215 namespace.trim().to_string(),
216 )))
217 }
218
219 pub fn vectors(
225 &self,
226 model_key: &str,
227 embedding_model: &str,
228 dimensions: usize,
229 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
230 self.vectors_for_namespace(model_key, embedding_model, dimensions, "local")
231 }
232
233 pub fn vectors_for_namespace(
243 &self,
244 model_key: &str,
245 embedding_model: &str,
246 dimensions: usize,
247 namespace: &str,
248 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
249 if model_key.is_empty()
250 || !model_key
251 .chars()
252 .all(|c| c.is_ascii_alphanumeric() || c == '_')
253 {
254 return Err(SqliteError::InvalidData(format!(
255 "invalid model_key '{}': must be non-empty and contain only \
256 alphanumeric/underscore characters",
257 model_key
258 )));
259 }
260 if namespace.trim().is_empty() {
261 return Err(SqliteError::InvalidData(
262 "vector store namespace must be non-empty".to_string(),
263 ));
264 }
265
266 crate::extension::ensure_extensions_loaded();
268
269 let table = format!("vec_{}", model_key);
270 let writer = self.pool.try_writer()?;
271
272 let table_exists: bool = writer
279 .conn()
280 .query_row(
281 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
282 rusqlite::params![&table],
283 |row| row.get::<_, i64>(0),
284 )
285 .optional()
286 .map_err(SqliteError::Rusqlite)?
287 .is_some();
288
289 if table_exists {
290 let pragma = format!("PRAGMA table_xinfo({})", table);
296 let mut stmt = writer.conn().prepare(&pragma)?;
297 let mut rows = stmt.query([])?;
298 let mut has_field = false;
299 let mut has_embedding_model = false;
300 while let Some(row) = rows.next()? {
301 let name: String = row.get(1)?;
302 if name == "field" {
303 has_field = true;
304 }
305 if name == "embedding_model" {
306 has_embedding_model = true;
307 }
308 }
309 if !has_field || !has_embedding_model {
310 return Err(SqliteError::InvalidData(format!(
311 "vec0 table '{}' is missing required column(s) (field={}, \
312 embedding_model={}); run `kkernel db migrate` to apply V17 \
313 (vector_embedding_model_tag_preserving_rebuild) before opening \
314 this vector store",
315 table, has_field, has_embedding_model,
316 )));
317 }
318 }
319
320 writer
329 .conn()
330 .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
331
332 let ddl = format!(
335 "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
336 subject_id TEXT PRIMARY KEY, \
337 namespace TEXT NOT NULL, \
338 kind TEXT NOT NULL, \
339 field TEXT NOT NULL, \
340 embedding_model TEXT NOT NULL, \
341 embedding float[{}] distance_metric=cosine\
342 )",
343 model_key, dimensions
344 );
345 writer.conn().execute_batch(&ddl)?;
346
347 Ok(Arc::new(vectors::SqliteVecStore::new(
348 Arc::clone(&self.pool),
349 self.is_file_backed,
350 model_key.to_string(),
351 embedding_model.to_string(),
352 dimensions,
353 namespace.trim().to_string(),
354 )?))
355 }
356
357 pub fn register_embedding_model(
362 &self,
363 engine_name: &str,
364 model_id: &str,
365 key_version: &str,
366 dimensions: u32,
367 ) -> Result<(), SqliteError> {
368 let writer = self.pool.try_writer()?;
369 writer
370 .conn()
371 .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
372
373 let now = chrono::Utc::now().timestamp_micros();
374 let canonical_key =
375 format!("{engine_name}:{model_id}:{key_version}:{dimensions}").into_bytes();
376 let id = uuid::Uuid::new_v4();
377 writer.conn().execute(
378 "INSERT INTO _embedding_models \
379 (id, engine_name, model_id, key_version, dim, output_dim, status, \
380 activated_at, superseded_at, superseded_by, canonical_key, created_at) \
381 VALUES (?1, ?2, ?3, ?4, ?5, NULL, 'active', ?6, NULL, NULL, ?7, ?8) \
382 ON CONFLICT(canonical_key) DO UPDATE SET \
383 status = 'active', \
384 activated_at = COALESCE(_embedding_models.activated_at, excluded.activated_at)",
385 rusqlite::params![
386 id.as_bytes().as_slice(),
387 engine_name,
388 model_id,
389 key_version,
390 dimensions as i64,
391 now,
392 canonical_key,
393 now,
394 ],
395 )?;
396 Ok(())
397 }
398
399 pub fn sparse(
403 &self,
404 model_key: &str,
405 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
406 self.sparse_for_namespace(model_key, "local")
407 }
408
409 pub fn sparse_for_namespace(
413 &self,
414 model_key: &str,
415 namespace: &str,
416 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
417 if model_key.is_empty()
418 || !model_key
419 .chars()
420 .all(|c| c.is_ascii_alphanumeric() || c == '_')
421 {
422 return Err(SqliteError::InvalidData(format!(
423 "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
424 model_key
425 )));
426 }
427 if namespace.trim().is_empty() {
428 return Err(SqliteError::InvalidData(
429 "sparse store namespace must be non-empty".to_string(),
430 ));
431 }
432
433 let writer = self.pool.try_writer()?;
434 sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
435
436 Ok(Arc::new(sparse::SqliteSparseStore::new(
437 Arc::clone(&self.pool),
438 self.is_file_backed,
439 model_key.to_string(),
440 namespace.trim().to_string(),
441 )?))
442 }
443
444 pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
451 self.text_with_tokenizer(table_key, "trigram")
452 }
453
454 pub fn text_with_tokenizer(
462 &self,
463 table_key: &str,
464 tokenizer: &str,
465 ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
466 if table_key.is_empty()
467 || !table_key
468 .chars()
469 .all(|c| c.is_ascii_alphanumeric() || c == '_')
470 {
471 return Err(SqliteError::InvalidData(format!(
472 "invalid table_key '{}': must be non-empty and contain only \
473 alphanumeric/underscore characters",
474 table_key
475 )));
476 }
477 if tokenizer.is_empty()
478 || !tokenizer
479 .chars()
480 .all(|c| c.is_ascii_alphanumeric() || c == '_')
481 {
482 return Err(SqliteError::InvalidData(format!(
483 "invalid tokenizer '{}': must be non-empty and contain only \
484 alphanumeric/underscore characters",
485 tokenizer
486 )));
487 }
488
489 let ddl = format!(
490 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
491 subject_id UNINDEXED, \
492 kind UNINDEXED, \
493 title, \
494 body, \
495 tags UNINDEXED, \
496 namespace UNINDEXED, \
497 metadata UNINDEXED, \
498 updated_at UNINDEXED, \
499 tokenize = '{}'\
500 )",
501 table_key, tokenizer
502 );
503 let writer = self.pool.try_writer()?;
504 writer.conn().execute_batch(&ddl)?;
505
506 Ok(Arc::new(text::Fts5TextSearch::new(
507 Arc::clone(&self.pool),
508 self.is_file_backed,
509 table_key.to_string(),
510 )))
511 }
512
513 pub fn is_file_backed(&self) -> bool {
515 self.is_file_backed
516 }
517
518 pub fn pool(&self) -> &ConnectionPool {
520 &self.pool
521 }
522
523 pub fn pool_arc(&self) -> Arc<ConnectionPool> {
525 Arc::clone(&self.pool)
526 }
527}
528
529#[cfg(test)]
530mod tests {
531 use super::*;
532 use khive_storage::types::{SqlStatement, SqlValue};
533
// A backend built by `memory()` must report that it is not file-backed.
#[test]
fn memory_backend_creates_successfully() {
    let in_memory = StorageBackend::memory().expect("memory backend should create");
    let file_backed = in_memory.is_file_backed();
    assert!(!file_backed);
}
539
// A backend built by `sqlite(path)` must report file-backed and leave a file at `path`.
#[test]
fn file_backend_creates_successfully() {
    let tmp = tempfile::tempdir().unwrap();
    let db_path = tmp.path().join("test.db");

    let on_disk = StorageBackend::sqlite(&db_path).expect("file backend should create");

    assert!(on_disk.is_file_backed());
    assert!(db_path.exists());
}
548
549 #[tokio::test]
550 async fn sql_access_memory_roundtrip() {
551 let backend = StorageBackend::memory().unwrap();
552 let sql = backend.sql();
553
554 let mut writer = sql.writer().await.unwrap();
555 writer
556 .execute_script(
557 "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
558 )
559 .await
560 .unwrap();
561
562 let affected = writer
563 .execute(SqlStatement {
564 sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
565 params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
566 label: None,
567 })
568 .await
569 .unwrap();
570 assert_eq!(affected, 1);
571
572 let mut reader = sql.reader().await.unwrap();
573 let row = reader
574 .query_row(SqlStatement {
575 sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
576 params: vec![SqlValue::Text("row1".into())],
577 label: None,
578 })
579 .await
580 .unwrap();
581
582 let row = row.expect("should find the inserted row");
583 assert_eq!(row.columns.len(), 2);
584 match &row.columns[0].value {
585 SqlValue::Text(s) => assert_eq!(s, "row1"),
586 other => panic!("expected Text, got {other:?}"),
587 }
588 match &row.columns[1].value {
589 SqlValue::Integer(v) => assert_eq!(*v, 42),
590 other => panic!("expected Integer, got {other:?}"),
591 }
592 }
593
594 #[tokio::test]
595 async fn sql_access_file_roundtrip() {
596 let dir = tempfile::tempdir().unwrap();
597 let path = dir.path().join("test_roundtrip.db");
598 let backend = StorageBackend::sqlite(&path).unwrap();
599 let sql = backend.sql();
600
601 let mut writer = sql.writer().await.unwrap();
602 writer
603 .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
604 .await
605 .unwrap();
606 writer
607 .execute(SqlStatement {
608 sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
609 params: vec![
610 SqlValue::Text("hello".into()),
611 SqlValue::Text("world".into()),
612 ],
613 label: None,
614 })
615 .await
616 .unwrap();
617
618 let mut reader = sql.reader().await.unwrap();
619 let rows = reader
620 .query_all(SqlStatement {
621 sql: "SELECT k, v FROM test_f".into(),
622 params: vec![],
623 label: None,
624 })
625 .await
626 .unwrap();
627 assert_eq!(rows.len(), 1);
628 match &rows[0].columns[1].value {
629 SqlValue::Text(s) => assert_eq!(s, "world"),
630 other => panic!("expected Text, got {other:?}"),
631 }
632 }
633
// Insert one unit vector and expect it back as the top hit for the same query vector.
#[tokio::test]
#[cfg(feature = "vectors")]
async fn vectors_roundtrip_via_public_api() {
    let backend = StorageBackend::memory().unwrap();
    let store = backend.vectors("test_api", "test_api", 3).unwrap();

    let subject = uuid::Uuid::new_v4();
    let embedding = vec![vec![1.0, 0.0, 0.0]];
    store
        .insert(
            subject,
            khive_types::SubstrateKind::Entity,
            "local",
            "content",
            embedding,
        )
        .await
        .unwrap();

    let request = khive_storage::types::VectorSearchRequest {
        query_vectors: vec![vec![1.0, 0.0, 0.0]],
        top_k: 1,
        namespace: None,
        kind: None,
        embedding_model: None,
        filter: None,
        backend_hints: None,
    };
    let results = store.search(request).await.unwrap();

    assert_eq!(results.len(), 1);
    let best = &results[0];
    assert_eq!(best.subject_id, subject);
    assert!(best.score.to_f64() > 0.99);
}
669
// Opening the same vector store twice must succeed, and both handles must see
// the same underlying table.
#[tokio::test]
#[cfg(feature = "vectors")]
async fn vectors_creates_table_idempotently() {
    let backend = StorageBackend::memory().unwrap();

    let first = backend.vectors("idempotent", "idempotent", 3).unwrap();
    let second = backend.vectors("idempotent", "idempotent", 3).unwrap();

    let subject = uuid::Uuid::new_v4();
    let embedding = vec![vec![1.0, 0.0, 0.0]];
    first
        .insert(
            subject,
            khive_types::SubstrateKind::Entity,
            "local",
            "content",
            embedding,
        )
        .await
        .unwrap();

    let seen_by_second = second.count().await.unwrap();
    assert_eq!(seen_by_second, 1);
}
693
694 #[tokio::test]
695 async fn text_roundtrip_via_public_api() {
696 let backend = StorageBackend::memory().unwrap();
697 let store = backend.text("test_api").unwrap();
698
699 let id = uuid::Uuid::new_v4();
700 let doc = khive_storage::types::TextDocument {
701 subject_id: id,
702 kind: khive_types::SubstrateKind::Entity,
703 title: Some("Test Title".to_string()),
704 body: "This is a searchable document about Rust.".to_string(),
705 tags: vec!["rust".to_string()],
706 namespace: "test_ns".to_string(),
707 metadata: None,
708 updated_at: chrono::Utc::now(),
709 };
710 store.upsert_document(doc).await.unwrap();
711
712 let hits = store
713 .search(khive_storage::types::TextSearchRequest {
714 query: "Rust".to_string(),
715 mode: khive_storage::types::TextQueryMode::Plain,
716 filter: Some(khive_storage::types::TextFilter {
717 namespaces: vec!["test_ns".to_string()],
718 ..Default::default()
719 }),
720 top_k: 1,
721 snippet_chars: 64,
722 })
723 .await
724 .unwrap();
725
726 assert_eq!(hits.len(), 1);
727 assert_eq!(hits[0].subject_id, id);
728 assert!(hits[0].score.to_f64() > 0.0);
729 }
730
731 #[tokio::test]
732 async fn text_creates_table_idempotently() {
733 let backend = StorageBackend::memory().unwrap();
734
735 let store1 = backend.text("idempotent_fts").unwrap();
736 let store2 = backend.text("idempotent_fts").unwrap();
737
738 let id = uuid::Uuid::new_v4();
739 let doc = khive_storage::types::TextDocument {
740 subject_id: id,
741 kind: khive_types::SubstrateKind::Note,
742 title: None,
743 body: "Hello world.".to_string(),
744 tags: vec![],
745 namespace: "test_ns".to_string(),
746 metadata: None,
747 updated_at: chrono::Utc::now(),
748 };
749 store1.upsert_document(doc).await.unwrap();
750
751 let count = store2
752 .count(khive_storage::types::TextFilter {
753 namespaces: vec!["test_ns".to_string()],
754 ..Default::default()
755 })
756 .await
757 .unwrap();
758 assert_eq!(count, 1);
759 }
760
// Model keys containing disallowed characters, or empty keys, must be rejected.
#[test]
fn invalid_model_key_rejected() {
    let backend = StorageBackend::memory().unwrap();
    for bad_key in ["bad key!", ""] {
        let outcome = backend.vectors(bad_key, bad_key, 3);
        assert!(outcome.is_err());
    }
}
767
// Table keys containing disallowed characters, or empty keys, must be rejected.
#[test]
fn invalid_table_key_rejected() {
    let backend = StorageBackend::memory().unwrap();
    for bad_key in ["bad key!", ""] {
        let outcome = backend.text(bad_key);
        assert!(outcome.is_err());
    }
}
774
// Applying the same schema plan twice must succeed and leave exactly one
// `schema_test` table behind.
#[test]
fn apply_schema_runs_migrations_idempotently() {
    static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
        id: "001_init",
        up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
        down_sql: None,
        is_already_applied: None,
    }];
    let plan = crate::migrations::ServiceSchemaPlan {
        service: "schema_test_svc",
        sqlite: MIGRATIONS,
        postgres: &[],
    };

    let backend = StorageBackend::memory().unwrap();
    for _ in 0..2 {
        backend.apply_schema(&plan).unwrap();
    }

    let reader = backend.pool().reader().unwrap();
    let table_count: i64 = reader
        .conn()
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(table_count, 1);
}
804}