Skip to main content

mangle_db/
database.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The `Database` abstraction: compiles a Mangle program, loads EDB facts,
16//! executes the program, and serves queries from the resulting store.
17
18use std::collections::HashSet;
19use std::path::PathBuf;
20use std::sync::{Arc, RwLock};
21
22use anyhow::{Result, anyhow};
23use fxhash::FxHashSet;
24use mangle_analysis::{LoweringContext, Program, StratifiedProgram, rewrite_unit};
25use mangle_ast::{self as ast, Arena};
26use mangle_common::{Store, Value};
27use mangle_interpreter::MemStore;
28use mangle_ir::Ir;
29use mangle_parse::Parser;
30use sha2::{Digest, Sha256};
31
32use crate::backend::IdbBackend;
33use crate::provenance::ProvenanceIndex;
34use crate::source::{EdbSource, Fingerprint};
35
36/// How IDB (derived facts) are handled across restarts.
37pub enum IdbMode {
38    /// IDB is purely in-memory. Lost on drop, recomputed on open.
39    InMemory,
40    /// IDB is cached by the given backend. Loaded if valid, else recomputed.
41    Cached(Arc<dyn IdbBackend>),
42}
43
44/// How IDB is recomputed when EDB changes.
45pub enum RecomputeStrategy {
46    /// Clear all IDB and recompute from scratch. Simple, always correct.
47    Full,
48    /// Track provenance during execution. On EDB mutation, use DRed
49    /// to incrementally maintain IDB.
50    Incremental,
51}
52
53/// Where the working store lives during and after execution.
54pub enum StoreBackend {
55    /// All facts in memory (HashMap-based MemStore).
56    InMemory,
57    /// Disk-backed store for large datasets (requires `disk` feature).
58    Disk(PathBuf),
59}
60
61/// Configuration for opening a `Database`.
62pub struct DatabaseConfig {
63    pub name: String,
64    pub source: String,
65    pub edb_sources: Vec<Arc<dyn EdbSource>>,
66    pub idb_mode: IdbMode,
67    pub recompute: RecomputeStrategy,
68    pub store_backend: StoreBackend,
69}
70
71struct DatabaseState {
72    /// The working store — holds all facts (EDB + IDB) after execution.
73    store: Box<dyn Store + Send + Sync>,
74    edb_relations: HashSet<String>,
75    idb_relations: HashSet<String>,
76    edb_fingerprint: Option<Fingerprint>,
77    program_hash: [u8; 32],
78    /// Provenance data, populated only when RecomputeStrategy::Incremental.
79    provenance: Option<ProvenanceIndex>,
80}
81
82/// A compiled and executed Mangle database.
83///
84/// Thread-safe: queries take a read lock, mutations take a write lock.
85pub struct Database {
86    config_name: String,
87    config_source: String,
88    edb_sources: Vec<Arc<dyn EdbSource>>,
89    idb_mode_is_cached: bool,
90    idb_backend: Option<Arc<dyn IdbBackend>>,
91    recompute_is_incremental: bool,
92    state: RwLock<DatabaseState>,
93}
94
95impl Database {
96    /// Open a database: compile the program, load EDB, execute, serve queries.
97    pub fn open(config: DatabaseConfig) -> Result<Self> {
98        let program_hash = compute_program_hash(&config.source);
99        let edb_fingerprint = compute_edb_fingerprint(&config.edb_sources)?;
100
101        let (idb_mode_is_cached, idb_backend) = match &config.idb_mode {
102            IdbMode::InMemory => (false, None),
103            IdbMode::Cached(backend) => (true, Some(Arc::clone(backend))),
104        };
105        let recompute_is_incremental = matches!(config.recompute, RecomputeStrategy::Incremental);
106
107        // Create the working store
108        let mut store: Box<dyn Store + Send + Sync> = match &config.store_backend {
109            StoreBackend::InMemory => Box::new(MemStore::new()),
110            #[cfg(feature = "disk")]
111            StoreBackend::Disk(path) => Box::new(crate::disk_store::DiskStore::open(path)?),
112            #[cfg(not(feature = "disk"))]
113            StoreBackend::Disk(_) => {
114                return Err(anyhow!("Disk store requires the 'disk' feature"));
115            }
116        };
117
118        // Try loading cached IDB
119        let mut cache_hit = false;
120        if let Some(ref backend) = idb_backend {
121            if let Some((meta, snapshot)) = backend.load(&config.name)? {
122                if meta.program_hash == program_hash
123                    && edb_fingerprint
124                        .as_ref()
125                        .is_some_and(|fp| fp.0 == meta.edb_fingerprint)
126                {
127                    // Cache is valid — load EDB from sources, then IDB from cache
128                    load_edb_into_store(&config.edb_sources, &mut *store)?;
129                    for (rel_name, facts) in snapshot.relations {
130                        store.create_relation(&rel_name);
131                        for tuple in facts {
132                            store.insert(&rel_name, tuple)?;
133                        }
134                    }
135                    store.merge_deltas();
136                    cache_hit = true;
137                }
138            }
139        }
140
141        let (edb_relations, idb_relations, provenance) = if !cache_hit {
142            // Load EDB
143            load_edb_into_store(&config.edb_sources, &mut *store)?;
144            // Compile and execute
145            full_recompute(&config.source, &mut *store)?
146        } else {
147            // We loaded from cache — figure out relation sets from the store
148            // For now, we re-derive them by compiling (without executing)
149            let (edb_rels, idb_rels) = extract_relation_names(&config.source)?;
150            (edb_rels, idb_rels, None)
151        };
152
153        // Save to cache if needed and we just computed
154        if !cache_hit {
155            if let Some(ref backend) = idb_backend {
156                if let Some(ref fp) = edb_fingerprint {
157                    let meta = crate::backend::CacheMeta {
158                        program_hash,
159                        edb_fingerprint: fp.0.clone(),
160                        created_at: std::time::SystemTime::now()
161                            .duration_since(std::time::UNIX_EPOCH)
162                            .unwrap_or_default()
163                            .as_secs(),
164                    };
165                    let snapshot = extract_idb_snapshot(&*store, &idb_relations);
166                    backend.save(&config.name, &meta, &snapshot)?;
167                }
168            }
169        }
170
171        let state = DatabaseState {
172            store,
173            edb_relations,
174            idb_relations,
175            edb_fingerprint,
176            program_hash,
177            provenance,
178        };
179
180        Ok(Database {
181            config_name: config.name,
182            config_source: config.source,
183            edb_sources: config.edb_sources,
184            idb_mode_is_cached: idb_mode_is_cached,
185            idb_backend,
186            recompute_is_incremental,
187            state: RwLock::new(state),
188        })
189    }
190
191    /// Query all tuples in a relation.
192    pub fn query(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
193        let state = self.state.read().map_err(|_| anyhow!("lock poisoned"))?;
194        let iter = state.store.scan(relation)?;
195        Ok(iter.collect())
196    }
197
198    /// Insert a fact into an EDB relation and recompute IDB.
199    pub fn insert(&self, relation: &str, tuple: Vec<Value>) -> Result<()> {
200        let mut state = self.state.write().map_err(|_| anyhow!("lock poisoned"))?;
201        state.store.insert(relation, tuple)?;
202        state.store.merge_deltas();
203
204        if state.edb_relations.contains(relation) {
205            // EDB changed — recompute IDB
206            self.recompute_idb(&mut state)?;
207        }
208        Ok(())
209    }
210
211    /// Retract a fact from an EDB relation and recompute IDB.
212    pub fn retract(&self, relation: &str, tuple: &[Value]) -> Result<()> {
213        let mut state = self.state.write().map_err(|_| anyhow!("lock poisoned"))?;
214        state.store.retract(relation, tuple)?;
215
216        if state.edb_relations.contains(relation) {
217            self.recompute_idb(&mut state)?;
218        }
219        Ok(())
220    }
221
222    /// Create a batch for deferred recomputation.
223    pub fn batch(&self) -> Batch<'_> {
224        Batch { db: self }
225    }
226
227    /// Force reload from sources and recompute everything.
228    pub fn reload(&self) -> Result<()> {
229        let mut state = self.state.write().map_err(|_| anyhow!("lock poisoned"))?;
230
231        // Clear everything
232        let all_rels: Vec<String> = state.store.relation_names();
233        for rel in &all_rels {
234            state.store.clear(rel);
235        }
236
237        // Reload EDB
238        load_edb_into_store(&self.edb_sources, &mut *state.store)?;
239
240        // Recompute
241        let (edb_rels, idb_rels, provenance) =
242            full_recompute(&self.config_source, &mut *state.store)?;
243        state.edb_relations = edb_rels;
244        state.idb_relations = idb_rels;
245        state.provenance = provenance;
246        state.edb_fingerprint = compute_edb_fingerprint(&self.edb_sources)?;
247        state.program_hash = compute_program_hash(&self.config_source);
248
249        // Update cache
250        if let Some(ref backend) = self.idb_backend {
251            if let Some(ref fp) = state.edb_fingerprint {
252                let meta = crate::backend::CacheMeta {
253                    program_hash: state.program_hash,
254                    edb_fingerprint: fp.0.clone(),
255                    created_at: std::time::SystemTime::now()
256                        .duration_since(std::time::UNIX_EPOCH)
257                        .unwrap_or_default()
258                        .as_secs(),
259                };
260                let snapshot = extract_idb_snapshot(&*state.store, &state.idb_relations);
261                backend.save(&self.config_name, &meta, &snapshot)?;
262            }
263        }
264
265        Ok(())
266    }
267
268    /// Returns the names of all relations in the store.
269    pub fn relation_names(&self) -> Result<Vec<String>> {
270        let state = self.state.read().map_err(|_| anyhow!("lock poisoned"))?;
271        Ok(state.store.relation_names())
272    }
273
274    fn recompute_idb(&self, state: &mut DatabaseState) -> Result<()> {
275        // Clear IDB relations
276        for rel in &state.idb_relations {
277            state.store.clear(rel);
278        }
279
280        // Re-execute
281        let (_, idb_rels, provenance) = full_recompute(&self.config_source, &mut *state.store)?;
282        state.idb_relations = idb_rels;
283        state.provenance = provenance;
284
285        // Update cache
286        if let Some(ref backend) = self.idb_backend {
287            if let Some(ref fp) = state.edb_fingerprint {
288                let meta = crate::backend::CacheMeta {
289                    program_hash: state.program_hash,
290                    edb_fingerprint: fp.0.clone(),
291                    created_at: std::time::SystemTime::now()
292                        .duration_since(std::time::UNIX_EPOCH)
293                        .unwrap_or_default()
294                        .as_secs(),
295                };
296                let snapshot = extract_idb_snapshot(&*state.store, &state.idb_relations);
297                backend.save(&self.config_name, &meta, &snapshot)?;
298            }
299        }
300
301        Ok(())
302    }
303}
304
305/// Batch operations with deferred recomputation.
306pub struct Batch<'a> {
307    db: &'a Database,
308}
309
310impl<'a> Batch<'a> {
311    pub fn insert(&self, relation: &str, tuple: Vec<Value>) -> Result<()> {
312        let mut state = self
313            .db
314            .state
315            .write()
316            .map_err(|_| anyhow!("lock poisoned"))?;
317        state.store.insert(relation, tuple)?;
318        state.store.merge_deltas();
319        Ok(())
320    }
321
322    pub fn retract(&self, relation: &str, tuple: &[Value]) -> Result<()> {
323        let mut state = self
324            .db
325            .state
326            .write()
327            .map_err(|_| anyhow!("lock poisoned"))?;
328        state.store.retract(relation, tuple)?;
329        Ok(())
330    }
331
332    /// Apply all batched changes and recompute IDB.
333    pub fn commit(self) -> Result<()> {
334        let mut state = self
335            .db
336            .state
337            .write()
338            .map_err(|_| anyhow!("lock poisoned"))?;
339        self.db.recompute_idb(&mut state)
340    }
341}
342
343// --- Internal helpers ---
344
345fn compute_program_hash(source: &str) -> [u8; 32] {
346    let mut hasher = Sha256::new();
347    hasher.update(source.as_bytes());
348    hasher.finalize().into()
349}
350
351fn compute_edb_fingerprint(sources: &[Arc<dyn EdbSource>]) -> Result<Option<Fingerprint>> {
352    let mut hasher = Sha256::new();
353    for source in sources {
354        match source.fingerprint()? {
355            Some(fp) => hasher.update(&fp.0),
356            None => return Ok(None), // Any source without fingerprint → always recompute
357        }
358    }
359    Ok(Some(Fingerprint(hasher.finalize().to_vec())))
360}
361
362fn load_edb_into_store(sources: &[Arc<dyn EdbSource>], store: &mut dyn Store) -> Result<()> {
363    for source in sources {
364        let relations = source.relations()?;
365        for rel_info in &relations {
366            store.create_relation(&rel_info.name);
367            let tuples = source.scan(&rel_info.name)?;
368            for tuple in tuples {
369                store.insert(&rel_info.name, tuple)?;
370            }
371        }
372    }
373    store.merge_deltas();
374    Ok(())
375}
376
377fn extract_idb_snapshot(
378    store: &dyn Store,
379    idb_relations: &HashSet<String>,
380) -> crate::backend::IdbSnapshot {
381    let mut relations = Vec::new();
382    for rel in idb_relations {
383        if let Ok(iter) = store.scan(rel) {
384            let facts: Vec<Vec<Value>> = iter.collect();
385            if !facts.is_empty() {
386                relations.push((rel.clone(), facts));
387            }
388        }
389    }
390    crate::backend::IdbSnapshot { relations }
391}
392
393/// Extract EDB and IDB relation names by compiling (but not executing) the program.
394fn extract_relation_names(source: &str) -> Result<(HashSet<String>, HashSet<String>)> {
395    let arena = Arena::new_with_global_interner();
396    let (_ir, stratified) = compile_source(source, &arena)?;
397
398    let mut edb_names = HashSet::new();
399    for pred in stratified.extensional_preds() {
400        if let Some(name) = arena.predicate_name(pred) {
401            edb_names.insert(name.to_string());
402        }
403    }
404
405    let mut idb_names = HashSet::new();
406    for stratum in stratified.strata() {
407        for pred in &stratum {
408            if let Some(name) = arena.predicate_name(*pred) {
409                idb_names.insert(name.to_string());
410            }
411        }
412    }
413
414    Ok((edb_names, idb_names))
415}
416
417fn compile_source<'a>(source: &str, arena: &'a Arena) -> Result<(Ir, StratifiedProgram<'a>)> {
418    let mut parser = Parser::new(arena, source.as_bytes(), "source");
419    parser.next_token().map_err(|e| anyhow!(e))?;
420    let unit = parser.parse_unit()?;
421
422    let rewritten_unit = rewrite_unit(arena, unit);
423    let unit = &rewritten_unit;
424
425    let mut program = Program::new(arena);
426    let mut all_preds = FxHashSet::default();
427    let mut idb_preds = FxHashSet::default();
428
429    for clause in unit.clauses {
430        program.add_clause(arena, clause);
431        idb_preds.insert(clause.head.sym);
432        all_preds.insert(clause.head.sym);
433        for premise in clause.premises {
434            if let ast::Term::Atom(atom) = premise {
435                all_preds.insert(atom.sym);
436            } else if let ast::Term::NegAtom(atom) = premise {
437                all_preds.insert(atom.sym);
438            }
439        }
440    }
441
442    for pred in all_preds {
443        if !idb_preds.contains(&pred) {
444            program.ext_preds.push(pred);
445        }
446    }
447
448    let stratified = program.stratify().map_err(|e| anyhow!(e))?;
449    let ctx = LoweringContext::new(arena);
450    let ir = ctx.lower_unit(unit);
451
452    Ok((ir, stratified))
453}
454
455/// Compile and execute the program, transferring results into the target store.
456///
457/// Uses `mangle_driver::execute()` with a fresh MemStore, then copies
458/// the resulting IDB facts into the target store.
459fn full_recompute(
460    source: &str,
461    store: &mut dyn Store,
462) -> Result<(HashSet<String>, HashSet<String>, Option<ProvenanceIndex>)> {
463    let arena = Arena::new_with_global_interner();
464    let (mut ir, stratified) = compile_source(source, &arena)?;
465
466    // Extract predicate names before they go out of scope with the arena
467    let mut edb_names = HashSet::new();
468    for pred in stratified.extensional_preds() {
469        if let Some(name) = arena.predicate_name(pred) {
470            edb_names.insert(name.to_string());
471        }
472    }
473
474    let mut idb_names = HashSet::new();
475    for stratum in stratified.strata() {
476        for pred in &stratum {
477            if let Some(name) = arena.predicate_name(*pred) {
478                idb_names.insert(name.to_string());
479            }
480        }
481    }
482
483    // Build a MemStore with EDB facts copied from the target store
484    let mut exec_store = MemStore::new();
485    for rel in &edb_names {
486        exec_store.create_relation(rel);
487        if let Ok(iter) = store.scan(rel) {
488            for tuple in iter {
489                exec_store.insert(rel, tuple)?;
490            }
491        }
492    }
493    exec_store.merge_deltas();
494
495    // Execute using the driver
496    let interpreter = mangle_driver::execute(&mut ir, &stratified, Box::new(exec_store))?;
497
498    // Copy IDB results back into the target store
499    for rel in &idb_names {
500        store.create_relation(rel);
501        if let Ok(iter) = interpreter.store().scan(rel) {
502            for tuple in iter {
503                store.insert(rel, tuple)?;
504            }
505        }
506    }
507    store.merge_deltas();
508
509    Ok((edb_names, idb_names, None))
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515
516    #[test]
517    fn test_database_basic() -> Result<()> {
518        let config = DatabaseConfig {
519            name: "test".to_string(),
520            source: r#"
521                p(1). p(2).
522                q(X) :- p(X).
523            "#
524            .to_string(),
525            edb_sources: vec![],
526            idb_mode: IdbMode::InMemory,
527            recompute: RecomputeStrategy::Full,
528            store_backend: StoreBackend::InMemory,
529        };
530
531        let db = Database::open(config)?;
532
533        let facts = db.query("q")?;
534        let mut values: Vec<i64> = facts
535            .iter()
536            .map(|t| match t[0] {
537                Value::Number(n) => n,
538                _ => panic!("expected number"),
539            })
540            .collect();
541        values.sort();
542        assert_eq!(values, vec![1, 2]);
543
544        Ok(())
545    }
546
547    #[test]
548    fn test_database_reachability() -> Result<()> {
549        let config = DatabaseConfig {
550            name: "test".to_string(),
551            source: r#"
552                edge(1, 2). edge(2, 3). edge(3, 4).
553                reachable(X, Y) :- edge(X, Y).
554                reachable(X, Z) :- reachable(X, Y), edge(Y, Z).
555            "#
556            .to_string(),
557            edb_sources: vec![],
558            idb_mode: IdbMode::InMemory,
559            recompute: RecomputeStrategy::Full,
560            store_backend: StoreBackend::InMemory,
561        };
562
563        let db = Database::open(config)?;
564
565        let facts = db.query("reachable")?;
566        assert_eq!(facts.len(), 6); // (1,2),(1,3),(1,4),(2,3),(2,4),(3,4)
567
568        Ok(())
569    }
570
571    #[test]
572    fn test_database_insert_recompute() -> Result<()> {
573        let config = DatabaseConfig {
574            name: "test".to_string(),
575            source: r#"
576                q(X) :- p(X).
577            "#
578            .to_string(),
579            edb_sources: vec![],
580            idb_mode: IdbMode::InMemory,
581            recompute: RecomputeStrategy::Full,
582            store_backend: StoreBackend::InMemory,
583        };
584
585        let db = Database::open(config)?;
586
587        // Initially q is empty (p has no facts from source, but we can insert)
588        let facts = db.query("q")?;
589        assert!(facts.is_empty());
590
591        // Insert into EDB relation p
592        db.insert("p", vec![Value::Number(42)])?;
593
594        // q should now contain 42
595        let facts = db.query("q")?;
596        assert_eq!(facts.len(), 1);
597        assert_eq!(facts[0], vec![Value::Number(42)]);
598
599        Ok(())
600    }
601
602    #[test]
603    fn test_database_with_edb_source() -> Result<()> {
604        // Create a programmatic EDB source
605        struct TestSource {
606            facts: Vec<Vec<Value>>,
607        }
608        impl crate::source::EdbSource for TestSource {
609            fn name(&self) -> &str {
610                "test_source"
611            }
612            fn relations(&self) -> Result<Vec<crate::source::RelationInfo>> {
613                Ok(vec![crate::source::RelationInfo {
614                    name: "edge".to_string(),
615                    estimated_rows: self.facts.len(),
616                }])
617            }
618            fn scan(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
619                if relation == "edge" {
620                    Ok(self.facts.clone())
621                } else {
622                    Ok(vec![])
623                }
624            }
625            fn fingerprint(&self) -> Result<Option<crate::source::Fingerprint>> {
626                Ok(Some(crate::source::Fingerprint(vec![1, 2, 3])))
627            }
628        }
629
630        let source = TestSource {
631            facts: vec![
632                vec![Value::Number(1), Value::Number(2)],
633                vec![Value::Number(2), Value::Number(3)],
634                vec![Value::Number(3), Value::Number(4)],
635            ],
636        };
637
638        let config = DatabaseConfig {
639            name: "test".to_string(),
640            source: r#"
641                reachable(X, Y) :- edge(X, Y).
642                reachable(X, Z) :- reachable(X, Y), edge(Y, Z).
643            "#
644            .to_string(),
645            edb_sources: vec![Arc::new(source)],
646            idb_mode: IdbMode::InMemory,
647            recompute: RecomputeStrategy::Full,
648            store_backend: StoreBackend::InMemory,
649        };
650
651        let db = Database::open(config)?;
652
653        let facts = db.query("reachable")?;
654        assert_eq!(facts.len(), 6);
655
656        // Check edge facts are accessible too
657        let edges = db.query("edge")?;
658        assert_eq!(edges.len(), 3);
659
660        Ok(())
661    }
662
663    #[test]
664    fn test_database_with_file_idb_cache() -> Result<()> {
665        let cache_dir = tempfile::tempdir()?;
666        let backend = Arc::new(crate::file_backend::FileIdbBackend::new(cache_dir.path()));
667
668        // First open: computes and caches
669        let config1 = DatabaseConfig {
670            name: "cached_test".to_string(),
671            source: r#"
672                p(1). p(2). p(3).
673                q(X) :- p(X).
674            "#
675            .to_string(),
676            edb_sources: vec![],
677            idb_mode: IdbMode::Cached(backend.clone()),
678            recompute: RecomputeStrategy::Full,
679            store_backend: StoreBackend::InMemory,
680        };
681
682        let db1 = Database::open(config1)?;
683        let facts1 = db1.query("q")?;
684        assert_eq!(facts1.len(), 3);
685        drop(db1);
686
687        // Verify cache files exist
688        assert!(cache_dir.path().join("cached_test.meta.json").exists());
689        assert!(cache_dir.path().join("cached_test.idb.mgr").exists());
690
691        // Second open: should load from cache
692        let config2 = DatabaseConfig {
693            name: "cached_test".to_string(),
694            source: r#"
695                p(1). p(2). p(3).
696                q(X) :- p(X).
697            "#
698            .to_string(),
699            edb_sources: vec![],
700            idb_mode: IdbMode::Cached(backend.clone()),
701            recompute: RecomputeStrategy::Full,
702            store_backend: StoreBackend::InMemory,
703        };
704
705        let db2 = Database::open(config2)?;
706        let facts2 = db2.query("q")?;
707        assert_eq!(facts2.len(), 3);
708
709        Ok(())
710    }
711
712    #[test]
713    fn test_database_retract_recompute() -> Result<()> {
714        // Use an EDB source so that retracted facts don't come back from source text
715        struct TestEdgeSource;
716        impl crate::source::EdbSource for TestEdgeSource {
717            fn name(&self) -> &str {
718                "edges"
719            }
720            fn relations(&self) -> Result<Vec<crate::source::RelationInfo>> {
721                Ok(vec![crate::source::RelationInfo {
722                    name: "edge".to_string(),
723                    estimated_rows: 2,
724                }])
725            }
726            fn scan(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
727                if relation == "edge" {
728                    Ok(vec![
729                        vec![Value::Number(1), Value::Number(2)],
730                        vec![Value::Number(2), Value::Number(3)],
731                    ])
732                } else {
733                    Ok(vec![])
734                }
735            }
736            fn fingerprint(&self) -> Result<Option<crate::source::Fingerprint>> {
737                Ok(None) // Always recompute
738            }
739        }
740
741        let config = DatabaseConfig {
742            name: "test".to_string(),
743            source: r#"
744                reachable(X, Y) :- edge(X, Y).
745                reachable(X, Z) :- reachable(X, Y), edge(Y, Z).
746            "#
747            .to_string(),
748            edb_sources: vec![Arc::new(TestEdgeSource)],
749            idb_mode: IdbMode::InMemory,
750            recompute: RecomputeStrategy::Full,
751            store_backend: StoreBackend::InMemory,
752        };
753
754        let db = Database::open(config)?;
755
756        // Initially: reachable(1,2), reachable(2,3), reachable(1,3)
757        let facts = db.query("reachable")?;
758        assert_eq!(facts.len(), 3);
759
760        // Retract edge(2,3) from the store
761        db.retract("edge", &[Value::Number(2), Value::Number(3)])?;
762
763        // After retract + recompute, only edge(1,2) remains → reachable(1,2) only
764        let facts = db.query("reachable")?;
765        assert_eq!(facts.len(), 1);
766        assert_eq!(facts[0], vec![Value::Number(1), Value::Number(2)]);
767
768        Ok(())
769    }
770
771    #[test]
772    fn test_database_reload() -> Result<()> {
773        let config = DatabaseConfig {
774            name: "test".to_string(),
775            source: r#"
776                p(1). p(2).
777                q(X) :- p(X).
778            "#
779            .to_string(),
780            edb_sources: vec![],
781            idb_mode: IdbMode::InMemory,
782            recompute: RecomputeStrategy::Full,
783            store_backend: StoreBackend::InMemory,
784        };
785
786        let db = Database::open(config)?;
787
788        let facts = db.query("q")?;
789        assert_eq!(facts.len(), 2);
790
791        // Reload should re-derive the same results
792        db.reload()?;
793        let facts = db.query("q")?;
794        assert_eq!(facts.len(), 2);
795
796        Ok(())
797    }
798
799    #[test]
800    fn test_database_relation_names() -> Result<()> {
801        let config = DatabaseConfig {
802            name: "test".to_string(),
803            source: r#"
804                edge(1, 2).
805                reachable(X, Y) :- edge(X, Y).
806            "#
807            .to_string(),
808            edb_sources: vec![],
809            idb_mode: IdbMode::InMemory,
810            recompute: RecomputeStrategy::Full,
811            store_backend: StoreBackend::InMemory,
812        };
813
814        let db = Database::open(config)?;
815
816        let mut names = db.relation_names()?;
817        names.sort();
818        assert!(names.contains(&"edge".to_string()));
819        assert!(names.contains(&"reachable".to_string()));
820
821        Ok(())
822    }
823
824    #[test]
825    fn test_database_empty_sources() -> Result<()> {
826        let config = DatabaseConfig {
827            name: "test".to_string(),
828            source: r#"
829                q(X) :- p(X).
830            "#
831            .to_string(),
832            edb_sources: vec![],
833            idb_mode: IdbMode::InMemory,
834            recompute: RecomputeStrategy::Full,
835            store_backend: StoreBackend::InMemory,
836        };
837
838        let db = Database::open(config)?;
839
840        let facts = db.query("q")?;
841        assert!(facts.is_empty());
842
843        Ok(())
844    }
845}