1use std::collections::HashSet;
19use std::path::PathBuf;
20use std::sync::{Arc, RwLock};
21
22use anyhow::{Result, anyhow};
23use fxhash::FxHashSet;
24use mangle_analysis::{LoweringContext, Program, StratifiedProgram, rewrite_unit};
25use mangle_ast::{self as ast, Arena};
26use mangle_common::{Store, Value};
27use mangle_interpreter::MemStore;
28use mangle_ir::Ir;
29use mangle_parse::Parser;
30use sha2::{Digest, Sha256};
31
32use crate::backend::IdbBackend;
33use crate::provenance::ProvenanceIndex;
34use crate::source::{EdbSource, Fingerprint};
35
36pub enum IdbMode {
38 InMemory,
40 Cached(Arc<dyn IdbBackend>),
42}
43
/// Requested strategy for refreshing derived facts after the base facts change.
///
/// NOTE(review): the code in this file only records whether `Incremental` was
/// requested; every recompute path visible here performs a full evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecomputeStrategy {
    /// Re-evaluate the whole program.
    Full,
    /// Request incremental maintenance of derived facts.
    Incremental,
}
52
/// Selects the fact store that backs a [`Database`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StoreBackend {
    /// Keep all facts in an in-memory store.
    InMemory,
    /// Keep facts in a disk store opened at the given path.
    ///
    /// Opening a database with this variant fails unless the crate is built
    /// with the `disk` feature.
    Disk(PathBuf),
}
60
61pub struct DatabaseConfig {
63 pub name: String,
64 pub source: String,
65 pub edb_sources: Vec<Arc<dyn EdbSource>>,
66 pub idb_mode: IdbMode,
67 pub recompute: RecomputeStrategy,
68 pub store_backend: StoreBackend,
69}
70
71struct DatabaseState {
72 store: Box<dyn Store + Send + Sync>,
74 edb_relations: HashSet<String>,
75 idb_relations: HashSet<String>,
76 edb_fingerprint: Option<Fingerprint>,
77 program_hash: [u8; 32],
78 provenance: Option<ProvenanceIndex>,
80}
81
82pub struct Database {
86 config_name: String,
87 config_source: String,
88 edb_sources: Vec<Arc<dyn EdbSource>>,
89 idb_mode_is_cached: bool,
90 idb_backend: Option<Arc<dyn IdbBackend>>,
91 recompute_is_incremental: bool,
92 state: RwLock<DatabaseState>,
93}
94
95impl Database {
96 pub fn open(config: DatabaseConfig) -> Result<Self> {
98 let program_hash = compute_program_hash(&config.source);
99 let edb_fingerprint = compute_edb_fingerprint(&config.edb_sources)?;
100
101 let (idb_mode_is_cached, idb_backend) = match &config.idb_mode {
102 IdbMode::InMemory => (false, None),
103 IdbMode::Cached(backend) => (true, Some(Arc::clone(backend))),
104 };
105 let recompute_is_incremental = matches!(config.recompute, RecomputeStrategy::Incremental);
106
107 let mut store: Box<dyn Store + Send + Sync> = match &config.store_backend {
109 StoreBackend::InMemory => Box::new(MemStore::new()),
110 #[cfg(feature = "disk")]
111 StoreBackend::Disk(path) => Box::new(crate::disk_store::DiskStore::open(path)?),
112 #[cfg(not(feature = "disk"))]
113 StoreBackend::Disk(_) => {
114 return Err(anyhow!("Disk store requires the 'disk' feature"));
115 }
116 };
117
118 let mut cache_hit = false;
120 if let Some(ref backend) = idb_backend {
121 if let Some((meta, snapshot)) = backend.load(&config.name)? {
122 if meta.program_hash == program_hash
123 && edb_fingerprint
124 .as_ref()
125 .is_some_and(|fp| fp.0 == meta.edb_fingerprint)
126 {
127 load_edb_into_store(&config.edb_sources, &mut *store)?;
129 for (rel_name, facts) in snapshot.relations {
130 store.create_relation(&rel_name);
131 for tuple in facts {
132 store.insert(&rel_name, tuple)?;
133 }
134 }
135 store.merge_deltas();
136 cache_hit = true;
137 }
138 }
139 }
140
141 let (edb_relations, idb_relations, provenance) = if !cache_hit {
142 load_edb_into_store(&config.edb_sources, &mut *store)?;
144 full_recompute(&config.source, &mut *store)?
146 } else {
147 let (edb_rels, idb_rels) = extract_relation_names(&config.source)?;
150 (edb_rels, idb_rels, None)
151 };
152
153 if !cache_hit {
155 if let Some(ref backend) = idb_backend {
156 if let Some(ref fp) = edb_fingerprint {
157 let meta = crate::backend::CacheMeta {
158 program_hash,
159 edb_fingerprint: fp.0.clone(),
160 created_at: std::time::SystemTime::now()
161 .duration_since(std::time::UNIX_EPOCH)
162 .unwrap_or_default()
163 .as_secs(),
164 };
165 let snapshot = extract_idb_snapshot(&*store, &idb_relations);
166 backend.save(&config.name, &meta, &snapshot)?;
167 }
168 }
169 }
170
171 let state = DatabaseState {
172 store,
173 edb_relations,
174 idb_relations,
175 edb_fingerprint,
176 program_hash,
177 provenance,
178 };
179
180 Ok(Database {
181 config_name: config.name,
182 config_source: config.source,
183 edb_sources: config.edb_sources,
184 idb_mode_is_cached: idb_mode_is_cached,
185 idb_backend,
186 recompute_is_incremental,
187 state: RwLock::new(state),
188 })
189 }
190
191 pub fn query(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
193 let state = self.state.read().map_err(|_| anyhow!("lock poisoned"))?;
194 let iter = state.store.scan(relation)?;
195 Ok(iter.collect())
196 }
197
198 pub fn insert(&self, relation: &str, tuple: Vec<Value>) -> Result<()> {
200 let mut state = self.state.write().map_err(|_| anyhow!("lock poisoned"))?;
201 state.store.insert(relation, tuple)?;
202 state.store.merge_deltas();
203
204 if state.edb_relations.contains(relation) {
205 self.recompute_idb(&mut state)?;
207 }
208 Ok(())
209 }
210
211 pub fn retract(&self, relation: &str, tuple: &[Value]) -> Result<()> {
213 let mut state = self.state.write().map_err(|_| anyhow!("lock poisoned"))?;
214 state.store.retract(relation, tuple)?;
215
216 if state.edb_relations.contains(relation) {
217 self.recompute_idb(&mut state)?;
218 }
219 Ok(())
220 }
221
222 pub fn batch(&self) -> Batch<'_> {
224 Batch { db: self }
225 }
226
227 pub fn reload(&self) -> Result<()> {
229 let mut state = self.state.write().map_err(|_| anyhow!("lock poisoned"))?;
230
231 let all_rels: Vec<String> = state.store.relation_names();
233 for rel in &all_rels {
234 state.store.clear(rel);
235 }
236
237 load_edb_into_store(&self.edb_sources, &mut *state.store)?;
239
240 let (edb_rels, idb_rels, provenance) =
242 full_recompute(&self.config_source, &mut *state.store)?;
243 state.edb_relations = edb_rels;
244 state.idb_relations = idb_rels;
245 state.provenance = provenance;
246 state.edb_fingerprint = compute_edb_fingerprint(&self.edb_sources)?;
247 state.program_hash = compute_program_hash(&self.config_source);
248
249 if let Some(ref backend) = self.idb_backend {
251 if let Some(ref fp) = state.edb_fingerprint {
252 let meta = crate::backend::CacheMeta {
253 program_hash: state.program_hash,
254 edb_fingerprint: fp.0.clone(),
255 created_at: std::time::SystemTime::now()
256 .duration_since(std::time::UNIX_EPOCH)
257 .unwrap_or_default()
258 .as_secs(),
259 };
260 let snapshot = extract_idb_snapshot(&*state.store, &state.idb_relations);
261 backend.save(&self.config_name, &meta, &snapshot)?;
262 }
263 }
264
265 Ok(())
266 }
267
268 pub fn relation_names(&self) -> Result<Vec<String>> {
270 let state = self.state.read().map_err(|_| anyhow!("lock poisoned"))?;
271 Ok(state.store.relation_names())
272 }
273
274 fn recompute_idb(&self, state: &mut DatabaseState) -> Result<()> {
275 for rel in &state.idb_relations {
277 state.store.clear(rel);
278 }
279
280 let (_, idb_rels, provenance) = full_recompute(&self.config_source, &mut *state.store)?;
282 state.idb_relations = idb_rels;
283 state.provenance = provenance;
284
285 if let Some(ref backend) = self.idb_backend {
287 if let Some(ref fp) = state.edb_fingerprint {
288 let meta = crate::backend::CacheMeta {
289 program_hash: state.program_hash,
290 edb_fingerprint: fp.0.clone(),
291 created_at: std::time::SystemTime::now()
292 .duration_since(std::time::UNIX_EPOCH)
293 .unwrap_or_default()
294 .as_secs(),
295 };
296 let snapshot = extract_idb_snapshot(&*state.store, &state.idb_relations);
297 backend.save(&self.config_name, &meta, &snapshot)?;
298 }
299 }
300
301 Ok(())
302 }
303}
304
305pub struct Batch<'a> {
307 db: &'a Database,
308}
309
310impl<'a> Batch<'a> {
311 pub fn insert(&self, relation: &str, tuple: Vec<Value>) -> Result<()> {
312 let mut state = self
313 .db
314 .state
315 .write()
316 .map_err(|_| anyhow!("lock poisoned"))?;
317 state.store.insert(relation, tuple)?;
318 state.store.merge_deltas();
319 Ok(())
320 }
321
322 pub fn retract(&self, relation: &str, tuple: &[Value]) -> Result<()> {
323 let mut state = self
324 .db
325 .state
326 .write()
327 .map_err(|_| anyhow!("lock poisoned"))?;
328 state.store.retract(relation, tuple)?;
329 Ok(())
330 }
331
332 pub fn commit(self) -> Result<()> {
334 let mut state = self
335 .db
336 .state
337 .write()
338 .map_err(|_| anyhow!("lock poisoned"))?;
339 self.db.recompute_idb(&mut state)
340 }
341}
342
343fn compute_program_hash(source: &str) -> [u8; 32] {
346 let mut hasher = Sha256::new();
347 hasher.update(source.as_bytes());
348 hasher.finalize().into()
349}
350
351fn compute_edb_fingerprint(sources: &[Arc<dyn EdbSource>]) -> Result<Option<Fingerprint>> {
352 let mut hasher = Sha256::new();
353 for source in sources {
354 match source.fingerprint()? {
355 Some(fp) => hasher.update(&fp.0),
356 None => return Ok(None), }
358 }
359 Ok(Some(Fingerprint(hasher.finalize().to_vec())))
360}
361
362fn load_edb_into_store(sources: &[Arc<dyn EdbSource>], store: &mut dyn Store) -> Result<()> {
363 for source in sources {
364 let relations = source.relations()?;
365 for rel_info in &relations {
366 store.create_relation(&rel_info.name);
367 let tuples = source.scan(&rel_info.name)?;
368 for tuple in tuples {
369 store.insert(&rel_info.name, tuple)?;
370 }
371 }
372 }
373 store.merge_deltas();
374 Ok(())
375}
376
377fn extract_idb_snapshot(
378 store: &dyn Store,
379 idb_relations: &HashSet<String>,
380) -> crate::backend::IdbSnapshot {
381 let mut relations = Vec::new();
382 for rel in idb_relations {
383 if let Ok(iter) = store.scan(rel) {
384 let facts: Vec<Vec<Value>> = iter.collect();
385 if !facts.is_empty() {
386 relations.push((rel.clone(), facts));
387 }
388 }
389 }
390 crate::backend::IdbSnapshot { relations }
391}
392
393fn extract_relation_names(source: &str) -> Result<(HashSet<String>, HashSet<String>)> {
395 let arena = Arena::new_with_global_interner();
396 let (_ir, stratified) = compile_source(source, &arena)?;
397
398 let mut edb_names = HashSet::new();
399 for pred in stratified.extensional_preds() {
400 if let Some(name) = arena.predicate_name(pred) {
401 edb_names.insert(name.to_string());
402 }
403 }
404
405 let mut idb_names = HashSet::new();
406 for stratum in stratified.strata() {
407 for pred in &stratum {
408 if let Some(name) = arena.predicate_name(*pred) {
409 idb_names.insert(name.to_string());
410 }
411 }
412 }
413
414 Ok((edb_names, idb_names))
415}
416
417fn compile_source<'a>(source: &str, arena: &'a Arena) -> Result<(Ir, StratifiedProgram<'a>)> {
418 let mut parser = Parser::new(arena, source.as_bytes(), "source");
419 parser.next_token().map_err(|e| anyhow!(e))?;
420 let unit = parser.parse_unit()?;
421
422 let rewritten_unit = rewrite_unit(arena, unit);
423 let unit = &rewritten_unit;
424
425 let mut program = Program::new(arena);
426 let mut all_preds = FxHashSet::default();
427 let mut idb_preds = FxHashSet::default();
428
429 for clause in unit.clauses {
430 program.add_clause(arena, clause);
431 idb_preds.insert(clause.head.sym);
432 all_preds.insert(clause.head.sym);
433 for premise in clause.premises {
434 if let ast::Term::Atom(atom) = premise {
435 all_preds.insert(atom.sym);
436 } else if let ast::Term::NegAtom(atom) = premise {
437 all_preds.insert(atom.sym);
438 }
439 }
440 }
441
442 for pred in all_preds {
443 if !idb_preds.contains(&pred) {
444 program.ext_preds.push(pred);
445 }
446 }
447
448 let stratified = program.stratify().map_err(|e| anyhow!(e))?;
449 let ctx = LoweringContext::new(arena);
450 let ir = ctx.lower_unit(unit);
451
452 Ok((ir, stratified))
453}
454
455fn full_recompute(
460 source: &str,
461 store: &mut dyn Store,
462) -> Result<(HashSet<String>, HashSet<String>, Option<ProvenanceIndex>)> {
463 let arena = Arena::new_with_global_interner();
464 let (mut ir, stratified) = compile_source(source, &arena)?;
465
466 let mut edb_names = HashSet::new();
468 for pred in stratified.extensional_preds() {
469 if let Some(name) = arena.predicate_name(pred) {
470 edb_names.insert(name.to_string());
471 }
472 }
473
474 let mut idb_names = HashSet::new();
475 for stratum in stratified.strata() {
476 for pred in &stratum {
477 if let Some(name) = arena.predicate_name(*pred) {
478 idb_names.insert(name.to_string());
479 }
480 }
481 }
482
483 let mut exec_store = MemStore::new();
485 for rel in &edb_names {
486 exec_store.create_relation(rel);
487 if let Ok(iter) = store.scan(rel) {
488 for tuple in iter {
489 exec_store.insert(rel, tuple)?;
490 }
491 }
492 }
493 exec_store.merge_deltas();
494
495 let interpreter = mangle_driver::execute(&mut ir, &stratified, Box::new(exec_store))?;
497
498 for rel in &idb_names {
500 store.create_relation(rel);
501 if let Ok(iter) = interpreter.store().scan(rel) {
502 for tuple in iter {
503 store.insert(rel, tuple)?;
504 }
505 }
506 }
507 store.merge_deltas();
508
509 Ok((edb_names, idb_names, None))
510}
511
512#[cfg(test)]
513mod tests {
514 use super::*;
515
516 #[test]
517 fn test_database_basic() -> Result<()> {
518 let config = DatabaseConfig {
519 name: "test".to_string(),
520 source: r#"
521 p(1). p(2).
522 q(X) :- p(X).
523 "#
524 .to_string(),
525 edb_sources: vec![],
526 idb_mode: IdbMode::InMemory,
527 recompute: RecomputeStrategy::Full,
528 store_backend: StoreBackend::InMemory,
529 };
530
531 let db = Database::open(config)?;
532
533 let facts = db.query("q")?;
534 let mut values: Vec<i64> = facts
535 .iter()
536 .map(|t| match t[0] {
537 Value::Number(n) => n,
538 _ => panic!("expected number"),
539 })
540 .collect();
541 values.sort();
542 assert_eq!(values, vec![1, 2]);
543
544 Ok(())
545 }
546
547 #[test]
548 fn test_database_reachability() -> Result<()> {
549 let config = DatabaseConfig {
550 name: "test".to_string(),
551 source: r#"
552 edge(1, 2). edge(2, 3). edge(3, 4).
553 reachable(X, Y) :- edge(X, Y).
554 reachable(X, Z) :- reachable(X, Y), edge(Y, Z).
555 "#
556 .to_string(),
557 edb_sources: vec![],
558 idb_mode: IdbMode::InMemory,
559 recompute: RecomputeStrategy::Full,
560 store_backend: StoreBackend::InMemory,
561 };
562
563 let db = Database::open(config)?;
564
565 let facts = db.query("reachable")?;
566 assert_eq!(facts.len(), 6); Ok(())
569 }
570
571 #[test]
572 fn test_database_insert_recompute() -> Result<()> {
573 let config = DatabaseConfig {
574 name: "test".to_string(),
575 source: r#"
576 q(X) :- p(X).
577 "#
578 .to_string(),
579 edb_sources: vec![],
580 idb_mode: IdbMode::InMemory,
581 recompute: RecomputeStrategy::Full,
582 store_backend: StoreBackend::InMemory,
583 };
584
585 let db = Database::open(config)?;
586
587 let facts = db.query("q")?;
589 assert!(facts.is_empty());
590
591 db.insert("p", vec![Value::Number(42)])?;
593
594 let facts = db.query("q")?;
596 assert_eq!(facts.len(), 1);
597 assert_eq!(facts[0], vec![Value::Number(42)]);
598
599 Ok(())
600 }
601
602 #[test]
603 fn test_database_with_edb_source() -> Result<()> {
604 struct TestSource {
606 facts: Vec<Vec<Value>>,
607 }
608 impl crate::source::EdbSource for TestSource {
609 fn name(&self) -> &str {
610 "test_source"
611 }
612 fn relations(&self) -> Result<Vec<crate::source::RelationInfo>> {
613 Ok(vec![crate::source::RelationInfo {
614 name: "edge".to_string(),
615 estimated_rows: self.facts.len(),
616 }])
617 }
618 fn scan(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
619 if relation == "edge" {
620 Ok(self.facts.clone())
621 } else {
622 Ok(vec![])
623 }
624 }
625 fn fingerprint(&self) -> Result<Option<crate::source::Fingerprint>> {
626 Ok(Some(crate::source::Fingerprint(vec![1, 2, 3])))
627 }
628 }
629
630 let source = TestSource {
631 facts: vec![
632 vec![Value::Number(1), Value::Number(2)],
633 vec![Value::Number(2), Value::Number(3)],
634 vec![Value::Number(3), Value::Number(4)],
635 ],
636 };
637
638 let config = DatabaseConfig {
639 name: "test".to_string(),
640 source: r#"
641 reachable(X, Y) :- edge(X, Y).
642 reachable(X, Z) :- reachable(X, Y), edge(Y, Z).
643 "#
644 .to_string(),
645 edb_sources: vec![Arc::new(source)],
646 idb_mode: IdbMode::InMemory,
647 recompute: RecomputeStrategy::Full,
648 store_backend: StoreBackend::InMemory,
649 };
650
651 let db = Database::open(config)?;
652
653 let facts = db.query("reachable")?;
654 assert_eq!(facts.len(), 6);
655
656 let edges = db.query("edge")?;
658 assert_eq!(edges.len(), 3);
659
660 Ok(())
661 }
662
663 #[test]
664 fn test_database_with_file_idb_cache() -> Result<()> {
665 let cache_dir = tempfile::tempdir()?;
666 let backend = Arc::new(crate::file_backend::FileIdbBackend::new(cache_dir.path()));
667
668 let config1 = DatabaseConfig {
670 name: "cached_test".to_string(),
671 source: r#"
672 p(1). p(2). p(3).
673 q(X) :- p(X).
674 "#
675 .to_string(),
676 edb_sources: vec![],
677 idb_mode: IdbMode::Cached(backend.clone()),
678 recompute: RecomputeStrategy::Full,
679 store_backend: StoreBackend::InMemory,
680 };
681
682 let db1 = Database::open(config1)?;
683 let facts1 = db1.query("q")?;
684 assert_eq!(facts1.len(), 3);
685 drop(db1);
686
687 assert!(cache_dir.path().join("cached_test.meta.json").exists());
689 assert!(cache_dir.path().join("cached_test.idb.mgr").exists());
690
691 let config2 = DatabaseConfig {
693 name: "cached_test".to_string(),
694 source: r#"
695 p(1). p(2). p(3).
696 q(X) :- p(X).
697 "#
698 .to_string(),
699 edb_sources: vec![],
700 idb_mode: IdbMode::Cached(backend.clone()),
701 recompute: RecomputeStrategy::Full,
702 store_backend: StoreBackend::InMemory,
703 };
704
705 let db2 = Database::open(config2)?;
706 let facts2 = db2.query("q")?;
707 assert_eq!(facts2.len(), 3);
708
709 Ok(())
710 }
711
712 #[test]
713 fn test_database_retract_recompute() -> Result<()> {
714 struct TestEdgeSource;
716 impl crate::source::EdbSource for TestEdgeSource {
717 fn name(&self) -> &str {
718 "edges"
719 }
720 fn relations(&self) -> Result<Vec<crate::source::RelationInfo>> {
721 Ok(vec![crate::source::RelationInfo {
722 name: "edge".to_string(),
723 estimated_rows: 2,
724 }])
725 }
726 fn scan(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
727 if relation == "edge" {
728 Ok(vec![
729 vec![Value::Number(1), Value::Number(2)],
730 vec![Value::Number(2), Value::Number(3)],
731 ])
732 } else {
733 Ok(vec![])
734 }
735 }
736 fn fingerprint(&self) -> Result<Option<crate::source::Fingerprint>> {
737 Ok(None) }
739 }
740
741 let config = DatabaseConfig {
742 name: "test".to_string(),
743 source: r#"
744 reachable(X, Y) :- edge(X, Y).
745 reachable(X, Z) :- reachable(X, Y), edge(Y, Z).
746 "#
747 .to_string(),
748 edb_sources: vec![Arc::new(TestEdgeSource)],
749 idb_mode: IdbMode::InMemory,
750 recompute: RecomputeStrategy::Full,
751 store_backend: StoreBackend::InMemory,
752 };
753
754 let db = Database::open(config)?;
755
756 let facts = db.query("reachable")?;
758 assert_eq!(facts.len(), 3);
759
760 db.retract("edge", &[Value::Number(2), Value::Number(3)])?;
762
763 let facts = db.query("reachable")?;
765 assert_eq!(facts.len(), 1);
766 assert_eq!(facts[0], vec![Value::Number(1), Value::Number(2)]);
767
768 Ok(())
769 }
770
771 #[test]
772 fn test_database_reload() -> Result<()> {
773 let config = DatabaseConfig {
774 name: "test".to_string(),
775 source: r#"
776 p(1). p(2).
777 q(X) :- p(X).
778 "#
779 .to_string(),
780 edb_sources: vec![],
781 idb_mode: IdbMode::InMemory,
782 recompute: RecomputeStrategy::Full,
783 store_backend: StoreBackend::InMemory,
784 };
785
786 let db = Database::open(config)?;
787
788 let facts = db.query("q")?;
789 assert_eq!(facts.len(), 2);
790
791 db.reload()?;
793 let facts = db.query("q")?;
794 assert_eq!(facts.len(), 2);
795
796 Ok(())
797 }
798
799 #[test]
800 fn test_database_relation_names() -> Result<()> {
801 let config = DatabaseConfig {
802 name: "test".to_string(),
803 source: r#"
804 edge(1, 2).
805 reachable(X, Y) :- edge(X, Y).
806 "#
807 .to_string(),
808 edb_sources: vec![],
809 idb_mode: IdbMode::InMemory,
810 recompute: RecomputeStrategy::Full,
811 store_backend: StoreBackend::InMemory,
812 };
813
814 let db = Database::open(config)?;
815
816 let mut names = db.relation_names()?;
817 names.sort();
818 assert!(names.contains(&"edge".to_string()));
819 assert!(names.contains(&"reachable".to_string()));
820
821 Ok(())
822 }
823
824 #[test]
825 fn test_database_empty_sources() -> Result<()> {
826 let config = DatabaseConfig {
827 name: "test".to_string(),
828 source: r#"
829 q(X) :- p(X).
830 "#
831 .to_string(),
832 edb_sources: vec![],
833 idb_mode: IdbMode::InMemory,
834 recompute: RecomputeStrategy::Full,
835 store_backend: StoreBackend::InMemory,
836 };
837
838 let db = Database::open(config)?;
839
840 let facts = db.query("q")?;
841 assert!(facts.is_empty());
842
843 Ok(())
844 }
845}