1use std::path::Path;
23use std::sync::Arc;
24
25use rusqlite::OptionalExtension;
26
27use crate::error::SqliteError;
28use crate::pool::{ConnectionPool, PoolConfig};
29use crate::sql_bridge::SqlBridge;
30use crate::stores::{entity, event, graph, note, sparse, text, vectors};
31
32pub struct StorageBackend {
34 pool: Arc<ConnectionPool>,
35 is_file_backed: bool,
36}
37
38impl StorageBackend {
39 pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
45 crate::extension::ensure_extensions_loaded();
46 let config = PoolConfig {
47 path: Some(path.as_ref().to_path_buf()),
48 ..PoolConfig::default()
49 };
50 let pool = ConnectionPool::new(config)?;
51 Ok(Self {
52 pool: Arc::new(pool),
53 is_file_backed: true,
54 })
55 }
56
57 pub fn memory() -> Result<Self, SqliteError> {
63 crate::extension::ensure_extensions_loaded();
64 let config = PoolConfig {
65 path: None,
66 ..PoolConfig::default()
67 };
68 let pool = ConnectionPool::new(config)?;
69 Ok(Self {
70 pool: Arc::new(pool),
71 is_file_backed: false,
72 })
73 }
74
75 pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
79 Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
80 }
81
82 pub fn apply_schema(
88 &self,
89 plan: &crate::migrations::ServiceSchemaPlan,
90 ) -> Result<(), SqliteError> {
91 let writer = self.pool.try_writer()?;
92 crate::migrations::apply_schema_plan(writer.conn(), plan)
93 }
94
95 pub fn apply_pack_ddl_statements(
113 &self,
114 statements: &[&'static str],
115 ) -> Result<(), SqliteError> {
116 let writer = self.pool.try_writer()?;
117 for &stmt in statements {
118 writer.conn().execute_batch(stmt)?;
119 }
120 Ok(())
121 }
122
123 pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
127 self.entities_for_namespace("local")
128 }
129
130 pub fn entities_for_namespace(
134 &self,
135 namespace: &str,
136 ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
137 if namespace.trim().is_empty() {
138 return Err(SqliteError::InvalidData(
139 "entities namespace must be non-empty".to_string(),
140 ));
141 }
142 let writer = self.pool.try_writer()?;
143 entity::ensure_entities_schema(writer.conn())?;
144
145 Ok(Arc::new(entity::SqlEntityStore::new(
146 Arc::clone(&self.pool),
147 self.is_file_backed,
148 )))
149 }
150
151 pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
156 self.graph_for_namespace("local")
157 }
158
159 pub fn graph_for_namespace(
161 &self,
162 namespace: &str,
163 ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
164 if namespace.trim().is_empty() {
165 return Err(SqliteError::InvalidData(
166 "graph namespace must be non-empty".to_string(),
167 ));
168 }
169 let writer = self.pool.try_writer()?;
170 graph::ensure_graph_schema(writer.conn())?;
171
172 Ok(Arc::new(graph::SqlGraphStore::new_scoped(
173 Arc::clone(&self.pool),
174 self.is_file_backed,
175 namespace.trim().to_string(),
176 )))
177 }
178
179 pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
183 self.notes_for_namespace("local")
184 }
185
186 pub fn notes_for_namespace(
190 &self,
191 namespace: &str,
192 ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
193 if namespace.trim().is_empty() {
194 return Err(SqliteError::InvalidData(
195 "notes namespace must be non-empty".to_string(),
196 ));
197 }
198 let writer = self.pool.try_writer()?;
199 note::ensure_notes_schema(writer.conn())?;
200
201 Ok(Arc::new(note::SqlNoteStore::new(
202 Arc::clone(&self.pool),
203 self.is_file_backed,
204 )))
205 }
206
207 pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
212 self.events_for_namespace("local")
213 }
214
215 pub fn events_for_namespace(
217 &self,
218 namespace: &str,
219 ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
220 if namespace.trim().is_empty() {
221 return Err(SqliteError::InvalidData(
222 "events namespace must be non-empty".to_string(),
223 ));
224 }
225 let writer = self.pool.try_writer()?;
226 event::ensure_events_schema(writer.conn())?;
227
228 Ok(Arc::new(event::SqlEventStore::new_scoped(
229 Arc::clone(&self.pool),
230 self.is_file_backed,
231 namespace.trim().to_string(),
232 )))
233 }
234
235 pub fn vectors(
240 &self,
241 model_key: &str,
242 dimensions: usize,
243 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
244 self.vectors_for_namespace(model_key, dimensions, "local")
245 }
246
247 pub fn vectors_for_namespace(
255 &self,
256 model_key: &str,
257 dimensions: usize,
258 namespace: &str,
259 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
260 if model_key.is_empty()
261 || !model_key
262 .chars()
263 .all(|c| c.is_ascii_alphanumeric() || c == '_')
264 {
265 return Err(SqliteError::InvalidData(format!(
266 "invalid model_key '{}': must be non-empty and contain only \
267 alphanumeric/underscore characters",
268 model_key
269 )));
270 }
271 if namespace.trim().is_empty() {
272 return Err(SqliteError::InvalidData(
273 "vector store namespace must be non-empty".to_string(),
274 ));
275 }
276
277 crate::extension::ensure_extensions_loaded();
279
280 let table = format!("vec_{}", model_key);
281 let writer = self.pool.try_writer()?;
282
283 let table_exists: bool = writer
290 .conn()
291 .query_row(
292 "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
293 rusqlite::params![&table],
294 |row| row.get::<_, i64>(0),
295 )
296 .optional()
297 .map_err(SqliteError::Rusqlite)?
298 .is_some();
299
300 if table_exists {
301 let has_field: bool = {
302 let pragma = format!("PRAGMA table_xinfo({})", table);
303 let mut stmt = writer.conn().prepare(&pragma)?;
304 let mut rows = stmt.query([])?;
305 let mut found = false;
306 while let Some(row) = rows.next()? {
307 let name: String = row.get(1)?;
308 if name == "field" {
309 found = true;
310 break;
311 }
312 }
313 found
314 };
315 if !has_field {
316 let drop_ddl = format!("DROP TABLE IF EXISTS {}", table);
317 writer.conn().execute_batch(&drop_ddl)?;
318 }
319 }
320
321 writer
330 .conn()
331 .execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;
332
333 let ddl = format!(
343 "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
344 subject_id TEXT PRIMARY KEY, \
345 namespace TEXT NOT NULL, \
346 kind TEXT NOT NULL, \
347 field TEXT NOT NULL, \
348 embedding float[{}] distance_metric=cosine\
349 )",
350 model_key, dimensions
351 );
352 writer.conn().execute_batch(&ddl)?;
353
354 Ok(Arc::new(vectors::SqliteVecStore::new(
355 Arc::clone(&self.pool),
356 self.is_file_backed,
357 model_key.to_string(),
358 dimensions,
359 namespace.trim().to_string(),
360 )?))
361 }
362
363 pub fn sparse(
367 &self,
368 model_key: &str,
369 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
370 self.sparse_for_namespace(model_key, "local")
371 }
372
373 pub fn sparse_for_namespace(
377 &self,
378 model_key: &str,
379 namespace: &str,
380 ) -> Result<Arc<dyn khive_storage::SparseStore>, SqliteError> {
381 if model_key.is_empty()
382 || !model_key
383 .chars()
384 .all(|c| c.is_ascii_alphanumeric() || c == '_')
385 {
386 return Err(SqliteError::InvalidData(format!(
387 "invalid model_key '{}': must be non-empty and contain only alphanumeric/underscore characters",
388 model_key
389 )));
390 }
391 if namespace.trim().is_empty() {
392 return Err(SqliteError::InvalidData(
393 "sparse store namespace must be non-empty".to_string(),
394 ));
395 }
396
397 let writer = self.pool.try_writer()?;
398 sparse::ensure_sparse_schema(writer.conn(), model_key).map_err(SqliteError::Rusqlite)?;
399
400 Ok(Arc::new(sparse::SqliteSparseStore::new(
401 Arc::clone(&self.pool),
402 self.is_file_backed,
403 model_key.to_string(),
404 namespace.trim().to_string(),
405 )?))
406 }
407
408 pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
415 self.text_with_tokenizer(table_key, "trigram")
416 }
417
418 pub fn text_with_tokenizer(
426 &self,
427 table_key: &str,
428 tokenizer: &str,
429 ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
430 if table_key.is_empty()
431 || !table_key
432 .chars()
433 .all(|c| c.is_ascii_alphanumeric() || c == '_')
434 {
435 return Err(SqliteError::InvalidData(format!(
436 "invalid table_key '{}': must be non-empty and contain only \
437 alphanumeric/underscore characters",
438 table_key
439 )));
440 }
441 if tokenizer.is_empty()
442 || !tokenizer
443 .chars()
444 .all(|c| c.is_ascii_alphanumeric() || c == '_')
445 {
446 return Err(SqliteError::InvalidData(format!(
447 "invalid tokenizer '{}': must be non-empty and contain only \
448 alphanumeric/underscore characters",
449 tokenizer
450 )));
451 }
452
453 let ddl = format!(
454 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
455 subject_id UNINDEXED, \
456 kind UNINDEXED, \
457 title, \
458 body, \
459 tags UNINDEXED, \
460 namespace UNINDEXED, \
461 metadata UNINDEXED, \
462 updated_at UNINDEXED, \
463 tokenize = '{}'\
464 )",
465 table_key, tokenizer
466 );
467 let writer = self.pool.try_writer()?;
468 writer.conn().execute_batch(&ddl)?;
469
470 Ok(Arc::new(text::Fts5TextSearch::new(
471 Arc::clone(&self.pool),
472 self.is_file_backed,
473 table_key.to_string(),
474 )))
475 }
476
477 pub fn is_file_backed(&self) -> bool {
479 self.is_file_backed
480 }
481
482 pub fn pool(&self) -> &ConnectionPool {
484 &self.pool
485 }
486
487 pub fn pool_arc(&self) -> Arc<ConnectionPool> {
489 Arc::clone(&self.pool)
490 }
491}
492
#[cfg(test)]
mod tests {
    use super::*;
    use khive_storage::types::{SqlStatement, SqlValue};

    #[test]
    fn memory_backend_creates_successfully() {
        let backend = StorageBackend::memory().expect("memory backend should create");
        assert!(!backend.is_file_backed());
    }

    #[test]
    fn file_backend_creates_successfully() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let backend = StorageBackend::sqlite(&path).expect("file backend should create");
        assert!(backend.is_file_backed());
        assert!(path.exists());
    }

    #[tokio::test]
    async fn sql_access_memory_roundtrip() {
        let backend = StorageBackend::memory().unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script(
                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
            )
            .await
            .unwrap();

        let affected = writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
                params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(affected, 1);

        let mut reader = sql.reader().await.unwrap();
        let row = reader
            .query_row(SqlStatement {
                sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
                params: vec![SqlValue::Text("row1".into())],
                label: None,
            })
            .await
            .unwrap();

        let row = row.expect("should find the inserted row");
        assert_eq!(row.columns.len(), 2);
        match &row.columns[0].value {
            SqlValue::Text(s) => assert_eq!(s, "row1"),
            other => panic!("expected Text, got {other:?}"),
        }
        match &row.columns[1].value {
            SqlValue::Integer(v) => assert_eq!(*v, 42),
            other => panic!("expected Integer, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn sql_access_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test_roundtrip.db");
        let backend = StorageBackend::sqlite(&path).unwrap();
        let sql = backend.sql();

        let mut writer = sql.writer().await.unwrap();
        writer
            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
            .await
            .unwrap();
        writer
            .execute(SqlStatement {
                sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
                params: vec![
                    SqlValue::Text("hello".into()),
                    SqlValue::Text("world".into()),
                ],
                label: None,
            })
            .await
            .unwrap();

        let mut reader = sql.reader().await.unwrap();
        let rows = reader
            .query_all(SqlStatement {
                sql: "SELECT k, v FROM test_f".into(),
                params: vec![],
                label: None,
            })
            .await
            .unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0].columns[1].value {
            SqlValue::Text(s) => assert_eq!(s, "world"),
            other => panic!("expected Text, got {other:?}"),
        }
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.vectors("test_api", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                "content",
                vec![vec![1.0, 0.0, 0.0]],
            )
            .await
            .unwrap();

        let hits = store
            .search(khive_storage::types::VectorSearchRequest {
                query_vectors: vec![vec![1.0, 0.0, 0.0]],
                top_k: 1,
                namespace: None,
                kind: None,
                filter: None,
                backend_hints: None,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.99);
    }

    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.vectors("idempotent", 3).unwrap();
        let store2 = backend.vectors("idempotent", 3).unwrap();

        let id = uuid::Uuid::new_v4();
        store1
            .insert(
                id,
                khive_types::SubstrateKind::Entity,
                "local",
                "content",
                vec![vec![1.0, 0.0, 0.0]],
            )
            .await
            .unwrap();

        let count = store2.count().await.unwrap();
        assert_eq!(count, 1);
    }

    /// A pre-existing vec table without the `field` column must be rebuilt
    /// with the current layout.
    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_replaces_legacy_table_without_field_column() {
        let backend = StorageBackend::memory().unwrap();
        {
            // Scope the writer so it is released before `vectors()` needs it.
            let writer = backend.pool().try_writer().unwrap();
            writer
                .conn()
                .execute_batch(
                    "CREATE VIRTUAL TABLE vec_legacy USING vec0(\
                     subject_id TEXT PRIMARY KEY, \
                     embedding float[3])",
                )
                .unwrap();
        }

        let _store = backend.vectors("legacy", 3).unwrap();

        let writer = backend.pool().try_writer().unwrap();
        let mut stmt = writer
            .conn()
            .prepare("PRAGMA table_xinfo(vec_legacy)")
            .unwrap();
        let columns: Vec<String> = stmt
            .query_map([], |row| row.get::<_, String>(1))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert!(columns.iter().any(|c| c == "field"));
    }

    #[tokio::test]
    async fn text_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.text("test_api").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Entity,
            title: Some("Test Title".to_string()),
            body: "This is a searchable document about Rust.".to_string(),
            tags: vec!["rust".to_string()],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        let hits = store
            .search(khive_storage::types::TextSearchRequest {
                query: "Rust".to_string(),
                mode: khive_storage::types::TextQueryMode::Plain,
                filter: Some(khive_storage::types::TextFilter {
                    namespaces: vec!["test_ns".to_string()],
                    ..Default::default()
                }),
                top_k: 1,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert!(hits[0].score.to_f64() > 0.0);
    }

    #[tokio::test]
    async fn text_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let store1 = backend.text("idempotent_fts").unwrap();
        let store2 = backend.text("idempotent_fts").unwrap();

        let id = uuid::Uuid::new_v4();
        let doc = khive_storage::types::TextDocument {
            subject_id: id,
            kind: khive_types::SubstrateKind::Note,
            title: None,
            body: "Hello world.".to_string(),
            tags: vec![],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store1.upsert_document(doc).await.unwrap();

        let count = store2
            .count(khive_storage::types::TextFilter {
                namespaces: vec!["test_ns".to_string()],
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn invalid_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.vectors("bad key!", 3).is_err());
        assert!(backend.vectors("", 3).is_err());
    }

    #[test]
    fn invalid_sparse_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.sparse("bad key!").is_err());
        assert!(backend.sparse("").is_err());
    }

    #[test]
    fn invalid_table_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text("bad key!").is_err());
        assert!(backend.text("").is_err());
    }

    #[test]
    fn invalid_tokenizer_rejected() {
        let backend = StorageBackend::memory().unwrap();
        assert!(backend.text_with_tokenizer("tok_check", "").is_err());
        // Quotes/semicolons must never reach the interpolated DDL.
        assert!(backend
            .text_with_tokenizer("tok_check", "trigram'); DROP TABLE x; --")
            .is_err());
    }

    #[test]
    fn blank_namespace_rejected_by_every_store() {
        let backend = StorageBackend::memory().unwrap();
        for ns in ["", "   "] {
            assert!(backend.entities_for_namespace(ns).is_err());
            assert!(backend.graph_for_namespace(ns).is_err());
            assert!(backend.notes_for_namespace(ns).is_err());
            assert!(backend.events_for_namespace(ns).is_err());
            assert!(backend.vectors_for_namespace("ns_check", 3, ns).is_err());
            assert!(backend.sparse_for_namespace("ns_check", ns).is_err());
        }
    }

    #[test]
    fn apply_schema_runs_migrations_idempotently() {
        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
            id: "001_init",
            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
            down_sql: None,
            is_already_applied: None,
        }];
        let plan = crate::migrations::ServiceSchemaPlan {
            service: "schema_test_svc",
            sqlite: MIGRATIONS,
            postgres: &[],
        };

        let backend = StorageBackend::memory().unwrap();
        backend.apply_schema(&plan).unwrap();
        backend.apply_schema(&plan).unwrap();

        let reader = backend.pool().reader().unwrap();
        let count: i64 = reader
            .conn()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }
}