1use std::path::Path;
8use std::sync::Arc;
9
10use rusqlite::OptionalExtension;
11
12use crate::error::SqliteError;
13use crate::pool::{ConnectionPool, PoolConfig};
14use crate::sql_bridge::SqlBridge;
15use crate::stores::{entity, event, graph, note, sparse, text, vectors};
16
17pub struct StorageBackend {
19 pool: Arc<ConnectionPool>,
20 is_file_backed: bool,
21}
22
23impl StorageBackend {
24 pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
30 crate::extension::ensure_extensions_loaded();
31 let config = PoolConfig {
32 path: Some(path.as_ref().to_path_buf()),
33 ..PoolConfig::default()
34 };
35 let pool = ConnectionPool::new(config)?;
36 Ok(Self {
37 pool: Arc::new(pool),
38 is_file_backed: true,
39 })
40 }
41
42 pub fn memory() -> Result<Self, SqliteError> {
48 crate::extension::ensure_extensions_loaded();
49 let config = PoolConfig {
50 path: None,
51 ..PoolConfig::default()
52 };
53 let pool = ConnectionPool::new(config)?;
54 Ok(Self {
55 pool: Arc::new(pool),
56 is_file_backed: false,
57 })
58 }
59
60 pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
64 Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
65 }
66
67 pub fn apply_schema(
73 &self,
74 plan: &crate::migrations::ServiceSchemaPlan,
75 ) -> Result<(), SqliteError> {
76 let writer = self.pool.try_writer()?;
77 crate::migrations::apply_schema_plan(writer.conn(), plan)
78 }
79
80 pub fn apply_pack_ddl_statements(
97 &self,
98 statements: &[&'static str],
99 ) -> Result<(), SqliteError> {
100 let writer = self.pool.try_writer()?;
101 for &stmt in statements {
102 writer.conn().execute_batch(stmt)?;
103 }
104 Ok(())
105 }
106
107 pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
111 self.entities_for_namespace("local")
112 }
113
114 pub fn entities_for_namespace(
118 &self,
119 namespace: &str,
120 ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
121 if namespace.trim().is_empty() {
122 return Err(SqliteError::InvalidData(
123 "entities namespace must be non-empty".to_string(),
124 ));
125 }
126 let writer = self.pool.try_writer()?;
127 entity::ensure_entities_schema(writer.conn())?;
128
129 Ok(Arc::new(entity::SqlEntityStore::new(
130 Arc::clone(&self.pool),
131 self.is_file_backed,
132 )))
133 }
134
135 pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
140 self.graph_for_namespace("local")
141 }
142
143 pub fn graph_for_namespace(
145 &self,
146 namespace: &str,
147 ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
148 if namespace.trim().is_empty() {
149 return Err(SqliteError::InvalidData(
150 "graph namespace must be non-empty".to_string(),
151 ));
152 }
153 let writer = self.pool.try_writer()?;
154 graph::ensure_graph_schema(writer.conn())?;
155
156 Ok(Arc::new(graph::SqlGraphStore::new_scoped(
157 Arc::clone(&self.pool),
158 self.is_file_backed,
159 namespace.trim().to_string(),
160 )))
161 }
162
163 pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
167 self.notes_for_namespace("local")
168 }
169
170 pub fn notes_for_namespace(
174 &self,
175 namespace: &str,
176 ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
177 if namespace.trim().is_empty() {
178 return Err(SqliteError::InvalidData(
179 "notes namespace must be non-empty".to_string(),
180 ));
181 }
182 let writer = self.pool.try_writer()?;
183 note::ensure_notes_schema(writer.conn())?;
184
185 Ok(Arc::new(note::SqlNoteStore::new(
186 Arc::clone(&self.pool),
187 self.is_file_backed,
188 )))
189 }
190
191 pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
196 self.events_for_namespace("local")
197 }
198
199 pub fn events_for_namespace(
201 &self,
202 namespace: &str,
203 ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
204 if namespace.trim().is_empty() {
205 return Err(SqliteError::InvalidData(
206 "events namespace must be non-empty".to_string(),
207 ));
208 }
209 let writer = self.pool.try_writer()?;
210 event::ensure_events_schema(writer.conn())?;
211
212 Ok(Arc::new(event::SqlEventStore::new_scoped(
213 Arc::clone(&self.pool),
214 self.is_file_backed,
215 namespace.trim().to_string(),
216 )))
217 }
218
219 pub fn vectors(
225 &self,
226 model_key: &str,
227 embedding_model: &str,
228 dimensions: usize,
229 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
230 self.vectors_for_namespace(model_key, embedding_model, dimensions, "local")
231 }
232
233 pub fn vectors_for_namespace(
243 &self,
244 model_key: &str,
245 embedding_model: &str,
246 dimensions: usize,
247 namespace: &str,
248 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
249 if model_key.is_empty()
250 || !model_key
251 .chars()
252 .all(|c| c.is_ascii_alphanumeric() || c == '_')
253 {
254 return Err(SqliteError::InvalidData(format!(
255 "invalid model_key '{}': must be non-empty and contain only \
256 alphanumeric/underscore characters",
257 model_key
258 )));
259 }
260 if namespace.trim().is_empty() {
261 return Err(SqliteError::InvalidData(
262 "vector store namespace must be non-empty".to_string(),
263 ));
264 }
265
266 crate::extension::ensure_extensions_loaded();
268
269 let table = format!("vec_{}", model_key);
270 let writer = self.pool.try_writer()?;
271
272 let table_exists: bool = writer
279 .conn()
280 .query_row(
281 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
282 rusqlite::params![&table],
283 |row| row.get::<_, i64>(0),
284 )
285 .optional()
286 .map_err(SqliteError::Rusqlite)?
287 .is_some();
288
289 if table_exists {
290 let pragma = format!("PRAGMA table_xinfo({})", table);
296 let mut stmt = writer.conn().prepare(&pragma)?;
297 let mut rows = stmt.query([])?;
298 let mut has_field = false;
299 let mut has_embedding_model = false;
300 while let Some(row) = rows.next()? {
301 let name: String = row.get(1)?;
302 if name == "field" {
303 has_field = true;
304 }
305 if name == "embedding_model" {
306 has_embedding_model = true;
307 }
308 }
309 if !has_field || !has_embedding_model {
310 return Err(SqliteError::InvalidData(format!(
311 "vec0 table '{}' is missing required column(s) (field={}, \
312 embedding_model={}); this is a pre-v0.2.8 vector schema and is \
313 not supported — recreate the database",
314 table, has_field, has_embedding_model,
315 )));
316 }
317 }
318
319 writer
328 .conn()
329 .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
330
331 let ddl = format!(
334 "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
335 subject_id TEXT PRIMARY KEY, \
336 namespace TEXT NOT NULL, \
337 kind TEXT NOT NULL, \
338 field TEXT NOT NULL, \
339 embedding_model TEXT NOT NULL, \
340 embedding float[{}] distance_metric=cosine\
341 )",
342 model_key, dimensions
343 );
344 writer.conn().execute_batch(&ddl)?;
345
346 Ok(Arc::new(vectors::SqliteVecStore::new(
347 Arc::clone(&self.pool),
348 self.is_file_backed,
349 model_key.to_string(),
350 embedding_model.to_string(),
351 dimensions,
352 namespace.trim().to_string(),
353 )?))
354 }
355
356 pub fn register_embedding_model(
361 &self,
362 engine_name: &str,
363 model_id: &str,
364 key_version: &str,
365 dimensions: u32,
366 ) -> Result<(), SqliteError> {
367 let writer = self.pool.try_writer()?;
368 writer
369 .conn()
370 .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
371
372 let now = chrono::Utc::now().timestamp_micros();
373 let canonical_key =
374 format!("{engine_name}:{model_id}:{key_version}:{dimensions}").into_bytes();
375 let id = uuid::Uuid::new_v4();
376 writer.conn().execute(
377 "INSERT INTO _embedding_models \
378 (id, engine_name, model_id, key_version, dim, output_dim, status, \
379 activated_at, superseded_at, superseded_by, canonical_key, created_at) \
380 VALUES (?1, ?2, ?3, ?4, ?5, NULL, 'active', ?6, NULL, NULL, ?7, ?8) \
381 ON CONFLICT(canonical_key) DO UPDATE SET \
382 status = 'active', \
383 activated_at = COALESCE(_embedding_models.activated_at, excluded.activated_at)",
384 rusqlite::params![
385 id.as_bytes().as_slice(),
386 engine_name,
387 model_id,
388 key_version,
389 dimensions as i64,
390 now,
391 canonical_key,
392 now,
393 ],
394 )?;
395 Ok(())
396 }
397
398 pub fn sparse(
402 &self,
403 model_key: &str,
404 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
405 self.sparse_for_namespace(model_key, "local")
406 }
407
408 pub fn sparse_for_namespace(
412 &self,
413 model_key: &str,
414 namespace: &str,
415 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
416 if model_key.is_empty()
417 || !model_key
418 .chars()
419 .all(|c| c.is_ascii_alphanumeric() || c == '_')
420 {
421 return Err(SqliteError::InvalidData(format!(
422 "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
423 model_key
424 )));
425 }
426 if namespace.trim().is_empty() {
427 return Err(SqliteError::InvalidData(
428 "sparse store namespace must be non-empty".to_string(),
429 ));
430 }
431
432 let writer = self.pool.try_writer()?;
433 sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
434
435 Ok(Arc::new(sparse::SqliteSparseStore::new(
436 Arc::clone(&self.pool),
437 self.is_file_backed,
438 model_key.to_string(),
439 namespace.trim().to_string(),
440 )?))
441 }
442
443 pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
450 self.text_with_tokenizer(table_key, "trigram")
451 }
452
453 pub fn text_with_tokenizer(
461 &self,
462 table_key: &str,
463 tokenizer: &str,
464 ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
465 if table_key.is_empty()
466 || !table_key
467 .chars()
468 .all(|c| c.is_ascii_alphanumeric() || c == '_')
469 {
470 return Err(SqliteError::InvalidData(format!(
471 "invalid table_key '{}': must be non-empty and contain only \
472 alphanumeric/underscore characters",
473 table_key
474 )));
475 }
476 if tokenizer.is_empty()
477 || !tokenizer
478 .chars()
479 .all(|c| c.is_ascii_alphanumeric() || c == '_')
480 {
481 return Err(SqliteError::InvalidData(format!(
482 "invalid tokenizer '{}': must be non-empty and contain only \
483 alphanumeric/underscore characters",
484 tokenizer
485 )));
486 }
487
488 let ddl = format!(
489 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
490 subject_id UNINDEXED, \
491 kind UNINDEXED, \
492 title, \
493 body, \
494 tags UNINDEXED, \
495 namespace UNINDEXED, \
496 metadata UNINDEXED, \
497 updated_at UNINDEXED, \
498 tokenize = '{}'\
499 )",
500 table_key, tokenizer
501 );
502 let writer = self.pool.try_writer()?;
503 writer.conn().execute_batch(&ddl)?;
504
505 Ok(Arc::new(text::Fts5TextSearch::new(
506 Arc::clone(&self.pool),
507 self.is_file_backed,
508 table_key.to_string(),
509 )))
510 }
511
512 pub fn is_file_backed(&self) -> bool {
514 self.is_file_backed
515 }
516
517 pub fn pool(&self) -> &ConnectionPool {
519 &self.pool
520 }
521
522 pub fn pool_arc(&self) -> Arc<ConnectionPool> {
524 Arc::clone(&self.pool)
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use khive_storage::types::{SqlStatement, SqlValue};

    #[test]
    fn memory_backend_creates_successfully() {
        let backend = StorageBackend::memory().expect("memory backend should create");
        assert!(!backend.is_file_backed());
    }

    #[test]
    fn file_backend_creates_successfully() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let backend = StorageBackend::sqlite(&path).expect("file backend should create");
        assert!(backend.is_file_backed());
        assert!(path.exists());
    }

    #[tokio::test]
    async fn sql_access_memory_roundtrip() {
        let backend = StorageBackend::memory().unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script(
                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
            )
            .await
            .unwrap();

        let affected = writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
                params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(affected, 1);

        let mut reader = sql.reader().await.unwrap();
        let row = reader
            .query_row(SqlStatement {
                sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
                params: vec![SqlValue::Text("row1".into())],
                label: None,
            })
            .await
            .unwrap();

        let row = row.expect("should find the inserted row");
        assert_eq!(row.columns.len(), 2);
        match &row.columns[0].value {
            SqlValue::Text(s) => assert_eq!(s, "row1"),
            other => panic!("expected Text, got {other:?}"),
        }
        match &row.columns[1].value {
            SqlValue::Integer(v) => assert_eq!(*v, 42),
            other => panic!("expected Integer, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn sql_access_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test_roundtrip.db");
        let backend = StorageBackend::sqlite(&path).unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
            .await
            .unwrap();
        writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
                params: vec![
                    SqlValue::Text("hello".into()),
                    SqlValue::Text("world".into()),
                ],
                label: None,
            })
            .await
            .unwrap();

        let mut reader = sql.reader().await.unwrap();
        let rows = reader
            .query_all(SqlStatement {
                sql: "SELECT k, v FROM test_f".into(),
                params: vec![],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0].columns[1].value {
            SqlValue::Text(s) => assert_eq!(s, "world"),
            other => panic!("expected Text, got {other:?}"),
        }
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.vectors("test_api", "test_api", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                "content",
                vec![vec![1.0, 0.0, 0.0]],
            )
            .await
            .unwrap();

        let hits = store
            .search(khive_storage::types::VectorSearchRequest {
                query_vectors: vec![vec![1.0, 0.0, 0.0]],
                top_k: 1,
                namespace: None,
                kind: None,
                embedding_model: None,
                filter: None,
                backend_hints: None,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.99);
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.vectors("idempotent", "idempotent", 3).unwrap();
        let store2 = backend.vectors("idempotent", "idempotent", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store1
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                "content",
                vec![vec![1.0, 0.0, 0.0]],
            )
            .await
            .unwrap();

        let count = store2.count().await.unwrap();
        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn text_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.text("test_api").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Entity,
            title: Some("Test Title".to_string()),
            body: "This is a searchable document about Rust.".to_string(),
            tags: vec!["rust".to_string()],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        let hits = store
            .search(khive_storage::types::TextSearchRequest {
                query: "Rust".to_string(),
                mode: khive_storage::types::TextQueryMode::Plain,
                filter: Some(khive_storage::types::TextFilter {
                    namespaces: vec!["test_ns".to_string()],
                    ..Default::default()
                }),
                top_k: 1,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.0);
    }

    #[tokio::test]
    async fn text_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.text("idempotent_fts").unwrap();
        let store2 = backend.text("idempotent_fts").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Note,
            title: None,
            body: "Hello world.".to_string(),
            tags: vec![],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store1.upsert_document(doc).await.unwrap();

        let count = store2
            .count(khive_storage::types::TextFilter {
                namespaces: vec!["test_ns".to_string()],
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn invalid_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.vectors("bad key!", "bad key!", 3).is_err());
        assert!(backend.vectors("", "", 3).is_err());
    }

    #[test]
    fn invalid_model_key_error_names_the_key() {
        let backend = StorageBackend::memory().unwrap();
        assert!(matches!(
            backend.vectors("bad key!", "m", 3),
            Err(SqliteError::InvalidData(ref msg)) if msg
                == "invalid model_key 'bad key!': must be non-empty and contain only \
                    alphanumeric/underscore characters"
        ));
    }

    #[test]
    fn invalid_sparse_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.sparse("bad key!").is_err());
        assert!(backend.sparse("").is_err());
    }

    #[test]
    fn invalid_table_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text("bad key!").is_err());
        assert!(backend.text("").is_err());
    }

    #[test]
    fn invalid_tokenizer_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text_with_tokenizer("tok_fts", "").is_err());
        // Spaces are not allowed, so multi-word tokenizer specs are refused.
        assert!(backend
            .text_with_tokenizer("tok_fts", "porter unicode61")
            .is_err());
        // Quote characters would otherwise break out of `tokenize = '...'`.
        assert!(backend
            .text_with_tokenizer("tok_fts", "trigram'); DROP TABLE x; --")
            .is_err());
    }

    #[test]
    fn blank_namespace_rejected_by_every_scoped_store() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.entities_for_namespace("").is_err());
        assert!(backend.graph_for_namespace("   ").is_err());
        assert!(backend.notes_for_namespace("\t").is_err());
        assert!(backend.events_for_namespace(" \n ").is_err());
        assert!(backend.sparse_for_namespace("sparse_ok", "  ").is_err());
        assert!(backend
            .vectors_for_namespace("vec_ok", "vec_ok", 3, "")
            .is_err());
    }

    #[test]
    fn blank_namespace_error_names_the_store() {
        let backend = StorageBackend::memory().unwrap();
        assert!(matches!(
            backend.graph_for_namespace(" "),
            Err(SqliteError::InvalidData(ref msg)) if msg == "graph namespace must be non-empty"
        ));
        assert!(matches!(
            backend.sparse_for_namespace("sparse_ok", ""),
            Err(SqliteError::InvalidData(ref msg))
                if msg == "sparse store namespace must be non-empty"
        ));
    }

    #[test]
    fn apply_schema_runs_migrations_idempotently() {
        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
            id: "001_init",
            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
            down_sql: None,
            is_already_applied: None,
        }];
        let plan = crate::migrations::ServiceSchemaPlan {
            service: "schema_test_svc",
            sqlite: MIGRATIONS,
            postgres: &[],
        };

        let backend = StorageBackend::memory().unwrap();
        backend.apply_schema(&plan).unwrap();
        backend.apply_schema(&plan).unwrap();

        let reader = backend.pool().reader().unwrap();
        let count: i64 = reader
            .conn()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }
}