Skip to main content

mangle_db/
file_source.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! File-based EDB source: reads `.mgr` (simplerow) and `.mg` (Mangle source) files.
16
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;

use anyhow::{Context, Result, anyhow};
use mangle_common::Value;
use sha2::{Digest, Sha256};

use crate::simplerow;
use crate::source::{EdbSource, Fingerprint, RelationInfo};
27
28/// An EDB source that reads facts from a directory of `.mgr` and `.mg` files.
29///
30/// - `.mgr` files are SimpleRow data files (fast loading).
31/// - `.mg` files are Mangle programs (compiled + executed to extract facts).
32pub struct FileEdbSource {
33    name: String,
34    dir: PathBuf,
35    /// Cached table data (relation_name -> facts). Populated on first access.
36    cache: Mutex<Option<HashMap<String, Vec<Vec<Value>>>>>,
37}
38
39impl FileEdbSource {
40    pub fn new(name: impl Into<String>, dir: impl Into<PathBuf>) -> Self {
41        Self {
42            name: name.into(),
43            dir: dir.into(),
44            cache: Mutex::new(None),
45        }
46    }
47
48    fn load_data(&self) -> Result<HashMap<String, Vec<Vec<Value>>>> {
49        let mut tables = HashMap::new();
50
51        if !self.dir.exists() {
52            return Err(anyhow!(
53                "EDB source directory does not exist: {:?}",
54                self.dir
55            ));
56        }
57
58        for entry in std::fs::read_dir(&self.dir)? {
59            let entry = entry?;
60            let path = entry.path();
61
62            if path.extension().is_some_and(|ext| ext == "mgr") {
63                // SimpleRow file
64                let data = std::fs::read(&path)?;
65                let sr_data = simplerow::read_from_bytes(&data)?;
66                for (name, facts) in sr_data.tables {
67                    tables.entry(name).or_insert_with(Vec::new).extend(facts);
68                }
69            } else if path.extension().is_some_and(|ext| ext == "mg") {
70                // Mangle source file — compile and execute to extract facts
71                let source = std::fs::read_to_string(&path)?;
72                let facts = execute_source_for_facts(&source)?;
73                for (name, tuples) in facts {
74                    tables.entry(name).or_insert_with(Vec::new).extend(tuples);
75                }
76            }
77        }
78
79        // Post-process __retract__ relations: remove matching facts from base relations
80        let retract_prefix = "__retract__";
81        let retract_keys: Vec<String> = tables
82            .keys()
83            .filter(|k| k.starts_with(retract_prefix))
84            .cloned()
85            .collect();
86        for retract_key in retract_keys {
87            let base_name = &retract_key[retract_prefix.len()..];
88            if let Some(retractions) = tables.remove(&retract_key) {
89                if let Some(base_facts) = tables.get_mut(base_name) {
90                    base_facts.retain(|fact| !retractions.contains(fact));
91                }
92            }
93        }
94
95        Ok(tables)
96    }
97
98    fn ensure_loaded(&self) -> Result<()> {
99        let mut cache = self.cache.lock().map_err(|_| anyhow!("lock poisoned"))?;
100        if cache.is_none() {
101            *cache = Some(self.load_data()?);
102        }
103        Ok(())
104    }
105}
106
107impl EdbSource for FileEdbSource {
108    fn name(&self) -> &str {
109        &self.name
110    }
111
112    fn relations(&self) -> Result<Vec<RelationInfo>> {
113        self.ensure_loaded()?;
114        let cache = self.cache.lock().map_err(|_| anyhow!("lock poisoned"))?;
115        let data = cache.as_ref().unwrap();
116        Ok(data
117            .iter()
118            .map(|(name, facts)| RelationInfo {
119                name: name.clone(),
120                estimated_rows: facts.len(),
121            })
122            .collect())
123    }
124
125    fn scan(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
126        self.ensure_loaded()?;
127        let cache = self.cache.lock().map_err(|_| anyhow!("lock poisoned"))?;
128        let data = cache.as_ref().unwrap();
129        Ok(data.get(relation).cloned().unwrap_or_default())
130    }
131
132    fn fingerprint(&self) -> Result<Option<Fingerprint>> {
133        if !self.dir.exists() {
134            return Ok(None);
135        }
136
137        let mut hasher = Sha256::new();
138        let mut entries: Vec<_> = std::fs::read_dir(&self.dir)?
139            .filter_map(|e| e.ok())
140            .filter(|e| {
141                let path = e.path();
142                path.extension()
143                    .is_some_and(|ext| ext == "mgr" || ext == "mg")
144            })
145            .collect();
146        entries.sort_by_key(|e| e.file_name());
147
148        for entry in entries {
149            let path = entry.path();
150            hasher.update(path.file_name().unwrap().as_encoded_bytes());
151            let meta = std::fs::metadata(&path)?;
152            hasher.update(meta.len().to_le_bytes());
153            if let Ok(mtime) = meta.modified() {
154                let secs = mtime
155                    .duration_since(std::time::UNIX_EPOCH)
156                    .unwrap_or_default()
157                    .as_secs();
158                hasher.update(secs.to_le_bytes());
159            }
160        }
161
162        Ok(Some(Fingerprint(hasher.finalize().to_vec())))
163    }
164}
165
166/// Execute a Mangle source string and return all derived facts.
167fn execute_source_for_facts(source: &str) -> Result<HashMap<String, Vec<Vec<Value>>>> {
168    use mangle_ast::Arena;
169    use mangle_interpreter::MemStore;
170
171    let arena = Arena::new_with_global_interner();
172    let (mut ir, stratified) = mangle_driver::compile(source, &arena)?;
173
174    // Collect IDB predicate names while we still have the arena
175    let mut idb_names = Vec::new();
176    for stratum in stratified.strata() {
177        for pred in &stratum {
178            if let Some(name) = arena.predicate_name(*pred) {
179                idb_names.push(name.to_string());
180            }
181        }
182    }
183
184    let store = Box::new(MemStore::new());
185    let interpreter = mangle_driver::execute(&mut ir, &stratified, store)?;
186
187    let mut result = HashMap::new();
188    // Use IDB names (which include facts defined by unit clauses)
189    // plus any relation_names from the store
190    let mut all_names: Vec<String> = interpreter.store().relation_names();
191    for name in &idb_names {
192        if !all_names.contains(name) {
193            all_names.push(name.clone());
194        }
195    }
196
197    for name in &all_names {
198        if let Ok(iter) = interpreter.store().scan(name) {
199            let facts: Vec<Vec<Value>> = iter.collect();
200            if !facts.is_empty() {
201                result.insert(name.clone(), facts);
202            }
203        }
204    }
205    Ok(result)
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_edb_source_mgr() -> Result<()> {
        let dir = tempfile::tempdir()?;

        // Write a .mgr file. The handle is closed at the end of this block, before the source
        // reads the directory.
        {
            let mut f = std::fs::File::create(dir.path().join("edges.mgr"))?;
            let tables = vec![(
                "edge".to_string(),
                vec![
                    vec![Value::Number(1), Value::Number(2)],
                    vec![Value::Number(2), Value::Number(3)],
                ],
            )];
            simplerow::write_simple_row(&mut f, &tables)?;
        }

        let source = FileEdbSource::new("test", dir.path());

        let relations = source.relations()?;
        assert_eq!(relations.len(), 1);
        assert_eq!(relations[0].name, "edge");
        assert_eq!(relations[0].estimated_rows, 2);

        let facts = source.scan("edge")?;
        assert_eq!(facts.len(), 2);
        assert_eq!(facts[0], vec![Value::Number(1), Value::Number(2)]);

        let fp = source.fingerprint()?;
        assert!(fp.is_some());

        Ok(())
    }

    #[test]
    fn test_file_edb_source_mg() -> Result<()> {
        let dir = tempfile::tempdir()?;

        // Write a .mg file — use separate lines so parsing is unambiguous
        let mg_path = dir.path().join("data.mg");
        std::fs::write(&mg_path, "p(1).\np(2).\np(3).\n")?;

        let source = FileEdbSource::new("test", dir.path());

        let relations = source.relations()?;
        assert!(relations.iter().any(|r| r.name == "p"));

        let facts = source.scan("p")?;
        assert_eq!(facts.len(), 3);

        Ok(())
    }

    #[test]
    fn test_file_edb_source_merges_files_and_ignores_others() -> Result<()> {
        let dir = tempfile::tempdir()?;
        std::fs::write(dir.path().join("a.mg"), "p(1).\np(2).\n")?;
        std::fs::write(dir.path().join("b.mg"), "p(3).\nq(4).\n")?;
        // Files with other extensions must not be read at all.
        std::fs::write(dir.path().join("notes.txt"), "this is not mangle")?;

        let source = FileEdbSource::new("test", dir.path());

        assert_eq!(source.scan("p")?.len(), 3);
        assert_eq!(source.scan("q")?.len(), 1);
        assert!(source.scan("missing")?.is_empty());

        Ok(())
    }

    #[test]
    fn test_file_edb_source_missing_dir() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let source = FileEdbSource::new("test", dir.path().join("does-not-exist"));

        // Loading fails loudly, while fingerprinting reports "nothing to fingerprint".
        assert!(source.scan("anything").is_err());
        assert!(source.relations().is_err());
        assert!(source.fingerprint()?.is_none());

        Ok(())
    }

    #[test]
    fn test_file_edb_fingerprint_tracks_file_set() -> Result<()> {
        let dir = tempfile::tempdir()?;
        std::fs::write(dir.path().join("a.mg"), "p(1).\n")?;
        let source = FileEdbSource::new("test", dir.path());

        let before = source.fingerprint()?.map(|fp| fp.0);
        assert!(before.is_some());
        // Unchanged directory => identical fingerprint.
        assert_eq!(before, source.fingerprint()?.map(|fp| fp.0));

        // Adding a matching file changes the fingerprint.
        std::fs::write(dir.path().join("b.mg"), "p(2).\n")?;
        assert_ne!(before, source.fingerprint()?.map(|fp| fp.0));

        Ok(())
    }

    #[test]
    fn test_file_edb_retract_processing() -> Result<()> {
        let dir = tempfile::tempdir()?;

        // Write initial facts
        std::fs::write(
            dir.path().join("data.mg"),
            "container(\"web\", \"running\").\ncontainer(\"db\", \"running\").\n",
        )?;

        // Write mutations with a retraction
        std::fs::write(
            dir.path().join("mutations.mg"),
            concat!(
                "container(\"api\", \"running\").\n",
                "__retract__container(\"web\", \"running\").\n",
            ),
        )?;

        let source = FileEdbSource::new("test", dir.path());

        let facts = source.scan("container")?;
        // "web" was retracted, "db" and "api" remain
        assert_eq!(facts.len(), 2);

        let names: Vec<&str> = facts
            .iter()
            .map(|f| match &f[0] {
                Value::String(s) => s.as_str(),
                _ => panic!("expected string"),
            })
            .collect();
        assert!(names.contains(&"db"));
        assert!(names.contains(&"api"));
        assert!(!names.contains(&"web"));

        // __retract__ relation should not appear in the output
        let relations = source.relations()?;
        assert!(!relations.iter().any(|r| r.name.starts_with("__retract__")));

        Ok(())
    }
}