1use std::path::Path;
23use std::sync::Arc;
24
25use crate::error::SqliteError;
26use crate::pool::{ConnectionPool, PoolConfig};
27use crate::sql_bridge::SqlBridge;
28use crate::stores::{entity, event, graph, note, text, vectors};
29
30pub struct StorageBackend {
32 pool: Arc<ConnectionPool>,
33 is_file_backed: bool,
34}
35
36impl StorageBackend {
37 pub fn sqlite(path: impl AsRef<Path>) -> Result<Self, SqliteError> {
43 crate::extension::ensure_extensions_loaded();
44 let config = PoolConfig {
45 path: Some(path.as_ref().to_path_buf()),
46 ..PoolConfig::default()
47 };
48 let pool = ConnectionPool::new(config)?;
49 Ok(Self {
50 pool: Arc::new(pool),
51 is_file_backed: true,
52 })
53 }
54
55 pub fn memory() -> Result<Self, SqliteError> {
61 crate::extension::ensure_extensions_loaded();
62 let config = PoolConfig {
63 path: None,
64 ..PoolConfig::default()
65 };
66 let pool = ConnectionPool::new(config)?;
67 Ok(Self {
68 pool: Arc::new(pool),
69 is_file_backed: false,
70 })
71 }
72
73 pub fn sql(&self) -> Arc<dyn khive_storage::SqlAccess> {
77 Arc::new(SqlBridge::new(Arc::clone(&self.pool), self.is_file_backed))
78 }
79
80 pub fn apply_schema(
86 &self,
87 plan: &crate::migrations::ServiceSchemaPlan,
88 ) -> Result<(), SqliteError> {
89 let writer = self.pool.try_writer()?;
90 crate::migrations::apply_schema_plan(writer.conn(), plan)
91 }
92
93 pub fn entities(&self) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
97 self.entities_for_namespace("local")
98 }
99
100 pub fn entities_for_namespace(
104 &self,
105 namespace: &str,
106 ) -> Result<Arc<dyn khive_storage::EntityStore>, SqliteError> {
107 if namespace.trim().is_empty() {
108 return Err(SqliteError::InvalidData(
109 "entities namespace must be non-empty".to_string(),
110 ));
111 }
112 let writer = self.pool.try_writer()?;
113 entity::ensure_entities_schema(writer.conn())?;
114
115 Ok(Arc::new(entity::SqlEntityStore::new(
116 Arc::clone(&self.pool),
117 self.is_file_backed,
118 )))
119 }
120
121 pub fn graph(&self) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
126 self.graph_for_namespace("local")
127 }
128
129 pub fn graph_for_namespace(
131 &self,
132 namespace: &str,
133 ) -> Result<Arc<dyn khive_storage::GraphStore>, SqliteError> {
134 if namespace.trim().is_empty() {
135 return Err(SqliteError::InvalidData(
136 "graph namespace must be non-empty".to_string(),
137 ));
138 }
139 let writer = self.pool.try_writer()?;
140 graph::ensure_graph_schema(writer.conn())?;
141
142 Ok(Arc::new(graph::SqlGraphStore::new_scoped(
143 Arc::clone(&self.pool),
144 self.is_file_backed,
145 namespace.trim().to_string(),
146 )))
147 }
148
149 pub fn notes(&self) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
153 self.notes_for_namespace("local")
154 }
155
156 pub fn notes_for_namespace(
160 &self,
161 namespace: &str,
162 ) -> Result<Arc<dyn khive_storage::NoteStore>, SqliteError> {
163 if namespace.trim().is_empty() {
164 return Err(SqliteError::InvalidData(
165 "notes namespace must be non-empty".to_string(),
166 ));
167 }
168 let writer = self.pool.try_writer()?;
169 note::ensure_notes_schema(writer.conn())?;
170
171 Ok(Arc::new(note::SqlNoteStore::new(
172 Arc::clone(&self.pool),
173 self.is_file_backed,
174 )))
175 }
176
177 pub fn events(&self) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
182 self.events_for_namespace("local")
183 }
184
185 pub fn events_for_namespace(
187 &self,
188 namespace: &str,
189 ) -> Result<Arc<dyn khive_storage::EventStore>, SqliteError> {
190 if namespace.trim().is_empty() {
191 return Err(SqliteError::InvalidData(
192 "events namespace must be non-empty".to_string(),
193 ));
194 }
195 let writer = self.pool.try_writer()?;
196 event::ensure_events_schema(writer.conn())?;
197
198 Ok(Arc::new(event::SqlEventStore::new_scoped(
199 Arc::clone(&self.pool),
200 self.is_file_backed,
201 namespace.trim().to_string(),
202 )))
203 }
204
205 pub fn vectors(
210 &self,
211 model_key: &str,
212 dimensions: usize,
213 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
214 self.vectors_for_namespace(model_key, dimensions, "local")
215 }
216
217 pub fn vectors_for_namespace(
225 &self,
226 model_key: &str,
227 dimensions: usize,
228 namespace: &str,
229 ) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
230 if model_key.is_empty()
231 || !model_key
232 .chars()
233 .all(|c| c.is_ascii_alphanumeric() || c == '_')
234 {
235 return Err(SqliteError::InvalidData(format!(
236 "invalid model_key '{}': must be non-empty and contain only \
237 alphanumeric/underscore characters",
238 model_key
239 )));
240 }
241 if namespace.trim().is_empty() {
242 return Err(SqliteError::InvalidData(
243 "vector store namespace must be non-empty".to_string(),
244 ));
245 }
246
247 crate::extension::ensure_extensions_loaded();
249
250 let ddl = format!(
252 "CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
253 subject_id TEXT PRIMARY KEY, \
254 namespace TEXT NOT NULL, \
255 kind TEXT NOT NULL, \
256 embedding float[{}] distance_metric=cosine\
257 )",
258 model_key, dimensions
259 );
260 let writer = self.pool.try_writer()?;
261 writer.conn().execute_batch(&ddl)?;
262
263 Ok(Arc::new(vectors::SqliteVecStore::new(
264 Arc::clone(&self.pool),
265 self.is_file_backed,
266 model_key.to_string(),
267 dimensions,
268 namespace.trim().to_string(),
269 )?))
270 }
271
272 pub fn text(&self, table_key: &str) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
279 self.text_with_tokenizer(table_key, "trigram")
280 }
281
282 pub fn text_with_tokenizer(
290 &self,
291 table_key: &str,
292 tokenizer: &str,
293 ) -> Result<Arc<dyn khive_storage::TextSearch>, SqliteError> {
294 if table_key.is_empty()
295 || !table_key
296 .chars()
297 .all(|c| c.is_ascii_alphanumeric() || c == '_')
298 {
299 return Err(SqliteError::InvalidData(format!(
300 "invalid table_key '{}': must be non-empty and contain only \
301 alphanumeric/underscore characters",
302 table_key
303 )));
304 }
305 if tokenizer.is_empty()
306 || !tokenizer
307 .chars()
308 .all(|c| c.is_ascii_alphanumeric() || c == '_')
309 {
310 return Err(SqliteError::InvalidData(format!(
311 "invalid tokenizer '{}': must be non-empty and contain only \
312 alphanumeric/underscore characters",
313 tokenizer
314 )));
315 }
316
317 let ddl = format!(
318 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5(\
319 subject_id UNINDEXED, \
320 kind UNINDEXED, \
321 title, \
322 body, \
323 tags UNINDEXED, \
324 namespace UNINDEXED, \
325 metadata UNINDEXED, \
326 updated_at UNINDEXED, \
327 tokenize = '{}'\
328 )",
329 table_key, tokenizer
330 );
331 let writer = self.pool.try_writer()?;
332 writer.conn().execute_batch(&ddl)?;
333
334 Ok(Arc::new(text::Fts5TextSearch::new(
335 Arc::clone(&self.pool),
336 self.is_file_backed,
337 table_key.to_string(),
338 )))
339 }
340
341 pub fn is_file_backed(&self) -> bool {
343 self.is_file_backed
344 }
345
346 pub fn pool(&self) -> &ConnectionPool {
348 &self.pool
349 }
350
351 pub fn pool_arc(&self) -> Arc<ConnectionPool> {
353 Arc::clone(&self.pool)
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use khive_storage::types::{SqlStatement, SqlValue};

    /// The path-less constructor succeeds and is not reported as file-backed.
    #[test]
    fn memory_backend_creates_successfully() {
        let created = StorageBackend::memory();
        let backend = created.expect("memory backend should create");
        assert!(!backend.is_file_backed());
    }

    /// The path-based constructor succeeds, is file-backed, and leaves a file
    /// at the requested path.
    #[test]
    fn file_backend_creates_successfully() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.db");
        let created = StorageBackend::sqlite(&db_path);
        let backend = created.expect("file backend should create");
        assert!(backend.is_file_backed());
        assert!(db_path.exists());
    }

    /// Create a table, insert one row, and read it back through `sql()` on a
    /// memory backend.
    #[tokio::test]
    async fn sql_access_memory_roundtrip() {
        let backend = StorageBackend::memory().unwrap();
        let access = backend.sql();

        let mut write_handle = access.writer().await.unwrap();
        write_handle
            .execute_script(
                "CREATE TABLE test_rt (id TEXT PRIMARY KEY, value INTEGER NOT NULL)".into(),
            )
            .await
            .unwrap();

        let insert = SqlStatement {
            sql: "INSERT INTO test_rt (id, value) VALUES (?1, ?2)".into(),
            params: vec![SqlValue::Text("row1".into()), SqlValue::Integer(42)],
            label: None,
        };
        let rows_changed = write_handle.execute(insert).await.unwrap();
        assert_eq!(rows_changed, 1);

        let mut read_handle = access.reader().await.unwrap();
        let select = SqlStatement {
            sql: "SELECT id, value FROM test_rt WHERE id = ?1".into(),
            params: vec![SqlValue::Text("row1".into())],
            label: None,
        };
        let fetched = read_handle.query_row(select).await.unwrap();

        let found = fetched.expect("should find the inserted row");
        assert_eq!(found.columns.len(), 2);

        let id_cell = &found.columns[0].value;
        match id_cell {
            SqlValue::Text(text) => assert_eq!(text, "row1"),
            unexpected => panic!("expected Text, got {unexpected:?}"),
        }
        let value_cell = &found.columns[1].value;
        match value_cell {
            SqlValue::Integer(number) => assert_eq!(*number, 42),
            unexpected => panic!("expected Integer, got {unexpected:?}"),
        }
    }

    /// Same round trip as above, but against a file-backed database.
    #[tokio::test]
    async fn sql_access_file_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test_roundtrip.db");
        let backend = StorageBackend::sqlite(&db_path).unwrap();
        let access = backend.sql();

        let mut write_handle = access.writer().await.unwrap();
        write_handle
            .execute_script("CREATE TABLE test_f (k TEXT PRIMARY KEY, v TEXT)".into())
            .await
            .unwrap();

        let insert = SqlStatement {
            sql: "INSERT INTO test_f (k, v) VALUES (?1, ?2)".into(),
            params: vec![
                SqlValue::Text("hello".into()),
                SqlValue::Text("world".into()),
            ],
            label: None,
        };
        write_handle.execute(insert).await.unwrap();

        let mut read_handle = access.reader().await.unwrap();
        let select_all = SqlStatement {
            sql: "SELECT k, v FROM test_f".into(),
            params: vec![],
            label: None,
        };
        let fetched = read_handle.query_all(select_all).await.unwrap();
        assert_eq!(fetched.len(), 1);

        let v_cell = &fetched[0].columns[1].value;
        match v_cell {
            SqlValue::Text(text) => assert_eq!(text, "world"),
            unexpected => panic!("expected Text, got {unexpected:?}"),
        }
    }

    /// Insert one embedding and find it again with an identical query vector.
    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_roundtrip_via_public_api() {
        let backend = StorageBackend::memory().unwrap();
        let store = backend.vectors("test_api", 3).unwrap();

        let subject = uuid::Uuid::new_v4();
        let kind = khive_types::SubstrateKind::Entity;
        store
            .insert(subject, kind, "local", vec![1.0, 0.0, 0.0])
            .await
            .unwrap();

        let request = khive_storage::types::VectorSearchRequest {
            query_embedding: vec![1.0, 0.0, 0.0],
            top_k: 1,
            namespace: None,
            kind: None,
        };
        let hits = store.search(request).await.unwrap();

        assert_eq!(hits.len(), 1);
        let best = &hits[0];
        assert_eq!(best.subject_id, subject);
        assert!(best.score.to_f64() > 0.99);
    }

    /// Requesting the same vector store twice shares one underlying table.
    #[tokio::test]
    #[cfg(feature = "vectors")]
    async fn vectors_creates_table_idempotently() {
        let backend = StorageBackend::memory().unwrap();

        let first = backend.vectors("idempotent", 3).unwrap();
        let second = backend.vectors("idempotent", 3).unwrap();

        let subject = uuid::Uuid::new_v4();
        let kind = khive_types::SubstrateKind::Entity;
        first
            .insert(subject, kind, "local", vec![1.0, 0.0, 0.0])
            .await
            .unwrap();

        let stored = second.count().await.unwrap();
        assert_eq!(stored, 1);
    }

    /// Upsert one document and find it with a plain-mode query filtered by
    /// namespace.
    #[tokio::test]
    async fn text_roundtrip_via_public_api() {
        use khive_storage::types::{TextDocument, TextFilter, TextQueryMode, TextSearchRequest};

        let backend = StorageBackend::memory().unwrap();
        let store = backend.text("test_api").unwrap();

        let subject = uuid::Uuid::new_v4();
        let document = TextDocument {
            subject_id: subject,
            kind: khive_types::SubstrateKind::Entity,
            title: Some("Test Title".to_string()),
            body: "This is a searchable document about Rust.".to_string(),
            tags: vec!["rust".to_string()],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        store.upsert_document(document).await.unwrap();

        let namespace_filter = TextFilter {
            namespaces: vec!["test_ns".to_string()],
            ..Default::default()
        };
        let request = TextSearchRequest {
            query: "Rust".to_string(),
            mode: TextQueryMode::Plain,
            filter: Some(namespace_filter),
            top_k: 1,
            snippet_chars: 64,
        };
        let hits = store.search(request).await.unwrap();

        assert_eq!(hits.len(), 1);
        let best = &hits[0];
        assert_eq!(best.subject_id, subject);
        assert!(best.score.to_f64() > 0.0);
    }

    /// Requesting the same text index twice shares one underlying table.
    #[tokio::test]
    async fn text_creates_table_idempotently() {
        use khive_storage::types::{TextDocument, TextFilter};

        let backend = StorageBackend::memory().unwrap();

        let first = backend.text("idempotent_fts").unwrap();
        let second = backend.text("idempotent_fts").unwrap();

        let subject = uuid::Uuid::new_v4();
        let document = TextDocument {
            subject_id: subject,
            kind: khive_types::SubstrateKind::Note,
            title: None,
            body: "Hello world.".to_string(),
            tags: vec![],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: chrono::Utc::now(),
        };
        first.upsert_document(document).await.unwrap();

        let namespace_filter = TextFilter {
            namespaces: vec!["test_ns".to_string()],
            ..Default::default()
        };
        let stored = second.count(namespace_filter).await.unwrap();
        assert_eq!(stored, 1);
    }

    /// Model keys with disallowed characters, or empty ones, are refused.
    #[test]
    fn invalid_model_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        for bad_key in ["bad key!", ""] {
            assert!(backend.vectors(bad_key, 3).is_err());
        }
    }

    /// Table keys with disallowed characters, or empty ones, are refused.
    #[test]
    fn invalid_table_key_rejected() {
        let backend = StorageBackend::memory().unwrap();
        for bad_key in ["bad key!", ""] {
            assert!(backend.text(bad_key).is_err());
        }
    }

    /// Applying the same plan twice succeeds and leaves exactly one
    /// `schema_test` table.
    #[test]
    fn apply_schema_runs_migrations_idempotently() {
        static MIGRATIONS: &[crate::migrations::Migration] = &[crate::migrations::Migration {
            id: "001_init",
            up_sql: "CREATE TABLE IF NOT EXISTS schema_test (id TEXT PRIMARY KEY);",
            down_sql: None,
            is_already_applied: None,
        }];
        let plan = crate::migrations::ServiceSchemaPlan {
            service: "schema_test_svc",
            sqlite: MIGRATIONS,
            postgres: &[],
        };

        let backend = StorageBackend::memory().unwrap();
        for _ in 0..2 {
            backend.apply_schema(&plan).unwrap();
        }

        let read_handle = backend.pool().reader().unwrap();
        let table_count: i64 = read_handle
            .conn()
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_test'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(table_count, 1);
    }
}