use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;

use anyhow::{Context, Result, anyhow};
use mangle_common::Value;
use sha2::{Digest, Sha256};

use crate::simplerow;
use crate::source::{EdbSource, Fingerprint, RelationInfo};
27
28pub struct FileEdbSource {
33 name: String,
34 dir: PathBuf,
35 cache: Mutex<Option<HashMap<String, Vec<Vec<Value>>>>>,
37}
38
39impl FileEdbSource {
40 pub fn new(name: impl Into<String>, dir: impl Into<PathBuf>) -> Self {
41 Self {
42 name: name.into(),
43 dir: dir.into(),
44 cache: Mutex::new(None),
45 }
46 }
47
48 fn load_data(&self) -> Result<HashMap<String, Vec<Vec<Value>>>> {
49 let mut tables = HashMap::new();
50
51 if !self.dir.exists() {
52 return Err(anyhow!(
53 "EDB source directory does not exist: {:?}",
54 self.dir
55 ));
56 }
57
58 for entry in std::fs::read_dir(&self.dir)? {
59 let entry = entry?;
60 let path = entry.path();
61
62 if path.extension().is_some_and(|ext| ext == "mgr") {
63 let data = std::fs::read(&path)?;
65 let sr_data = simplerow::read_from_bytes(&data)?;
66 for (name, facts) in sr_data.tables {
67 tables.entry(name).or_insert_with(Vec::new).extend(facts);
68 }
69 } else if path.extension().is_some_and(|ext| ext == "mg") {
70 let source = std::fs::read_to_string(&path)?;
72 let facts = execute_source_for_facts(&source)?;
73 for (name, tuples) in facts {
74 tables.entry(name).or_insert_with(Vec::new).extend(tuples);
75 }
76 }
77 }
78
79 let retract_prefix = "__retract__";
81 let retract_keys: Vec<String> = tables
82 .keys()
83 .filter(|k| k.starts_with(retract_prefix))
84 .cloned()
85 .collect();
86 for retract_key in retract_keys {
87 let base_name = &retract_key[retract_prefix.len()..];
88 if let Some(retractions) = tables.remove(&retract_key) {
89 if let Some(base_facts) = tables.get_mut(base_name) {
90 base_facts.retain(|fact| !retractions.contains(fact));
91 }
92 }
93 }
94
95 Ok(tables)
96 }
97
98 fn ensure_loaded(&self) -> Result<()> {
99 let mut cache = self.cache.lock().map_err(|_| anyhow!("lock poisoned"))?;
100 if cache.is_none() {
101 *cache = Some(self.load_data()?);
102 }
103 Ok(())
104 }
105}
106
107impl EdbSource for FileEdbSource {
108 fn name(&self) -> &str {
109 &self.name
110 }
111
112 fn relations(&self) -> Result<Vec<RelationInfo>> {
113 self.ensure_loaded()?;
114 let cache = self.cache.lock().map_err(|_| anyhow!("lock poisoned"))?;
115 let data = cache.as_ref().unwrap();
116 Ok(data
117 .iter()
118 .map(|(name, facts)| RelationInfo {
119 name: name.clone(),
120 estimated_rows: facts.len(),
121 })
122 .collect())
123 }
124
125 fn scan(&self, relation: &str) -> Result<Vec<Vec<Value>>> {
126 self.ensure_loaded()?;
127 let cache = self.cache.lock().map_err(|_| anyhow!("lock poisoned"))?;
128 let data = cache.as_ref().unwrap();
129 Ok(data.get(relation).cloned().unwrap_or_default())
130 }
131
132 fn fingerprint(&self) -> Result<Option<Fingerprint>> {
133 if !self.dir.exists() {
134 return Ok(None);
135 }
136
137 let mut hasher = Sha256::new();
138 let mut entries: Vec<_> = std::fs::read_dir(&self.dir)?
139 .filter_map(|e| e.ok())
140 .filter(|e| {
141 let path = e.path();
142 path.extension()
143 .is_some_and(|ext| ext == "mgr" || ext == "mg")
144 })
145 .collect();
146 entries.sort_by_key(|e| e.file_name());
147
148 for entry in entries {
149 let path = entry.path();
150 hasher.update(path.file_name().unwrap().as_encoded_bytes());
151 let meta = std::fs::metadata(&path)?;
152 hasher.update(meta.len().to_le_bytes());
153 if let Ok(mtime) = meta.modified() {
154 let secs = mtime
155 .duration_since(std::time::UNIX_EPOCH)
156 .unwrap_or_default()
157 .as_secs();
158 hasher.update(secs.to_le_bytes());
159 }
160 }
161
162 Ok(Some(Fingerprint(hasher.finalize().to_vec())))
163 }
164}
165
166fn execute_source_for_facts(source: &str) -> Result<HashMap<String, Vec<Vec<Value>>>> {
168 use mangle_ast::Arena;
169 use mangle_interpreter::MemStore;
170
171 let arena = Arena::new_with_global_interner();
172 let (mut ir, stratified) = mangle_driver::compile(source, &arena)?;
173
174 let mut idb_names = Vec::new();
176 for stratum in stratified.strata() {
177 for pred in &stratum {
178 if let Some(name) = arena.predicate_name(*pred) {
179 idb_names.push(name.to_string());
180 }
181 }
182 }
183
184 let store = Box::new(MemStore::new());
185 let interpreter = mangle_driver::execute(&mut ir, &stratified, store)?;
186
187 let mut result = HashMap::new();
188 let mut all_names: Vec<String> = interpreter.store().relation_names();
191 for name in &idb_names {
192 if !all_names.contains(name) {
193 all_names.push(name.clone());
194 }
195 }
196
197 for name in &all_names {
198 if let Ok(iter) = interpreter.store().scan(name) {
199 let facts: Vec<Vec<Value>> = iter.collect();
200 if !facts.is_empty() {
201 result.insert(name.clone(), facts);
202 }
203 }
204 }
205 Ok(result)
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_edb_source_mgr() -> Result<()> {
        let dir = tempfile::tempdir()?;

        let mgr_path = dir.path().join("edges.mgr");
        {
            // Scoped so the file handle is closed before the source reads it.
            let mut f = std::fs::File::create(&mgr_path)?;
            let tables = vec![(
                "edge".to_string(),
                vec![
                    vec![Value::Number(1), Value::Number(2)],
                    vec![Value::Number(2), Value::Number(3)],
                ],
            )];
            simplerow::write_simple_row(&mut f, &tables)?;
        }

        let source = FileEdbSource::new("test", dir.path());

        let relations = source.relations()?;
        assert_eq!(relations.len(), 1);
        assert_eq!(relations[0].name, "edge");
        assert_eq!(relations[0].estimated_rows, 2);

        let facts = source.scan("edge")?;
        assert_eq!(facts.len(), 2);
        assert_eq!(facts[0], vec![Value::Number(1), Value::Number(2)]);

        let fp = source.fingerprint()?;
        assert!(fp.is_some());

        Ok(())
    }

    #[test]
    fn test_file_edb_source_mg() -> Result<()> {
        let dir = tempfile::tempdir()?;

        let mg_path = dir.path().join("data.mg");
        std::fs::write(&mg_path, "p(1).\np(2).\np(3).\n")?;

        let source = FileEdbSource::new("test", dir.path());

        let relations = source.relations()?;
        assert!(relations.iter().any(|r| r.name == "p"));

        let facts = source.scan("p")?;
        assert_eq!(facts.len(), 3);

        Ok(())
    }

    #[test]
    fn test_file_edb_retract_processing() -> Result<()> {
        let dir = tempfile::tempdir()?;

        std::fs::write(
            dir.path().join("data.mg"),
            "container(\"web\", \"running\").\ncontainer(\"db\", \"running\").\n",
        )?;

        std::fs::write(
            dir.path().join("mutations.mg"),
            concat!(
                "container(\"api\", \"running\").\n",
                "__retract__container(\"web\", \"running\").\n",
            ),
        )?;

        let source = FileEdbSource::new("test", dir.path());

        let facts = source.scan("container")?;
        assert_eq!(facts.len(), 2);

        let names: Vec<&str> = facts
            .iter()
            .map(|f| match &f[0] {
                Value::String(s) => s.as_str(),
                _ => panic!("expected string"),
            })
            .collect();
        assert!(names.contains(&"db"));
        assert!(names.contains(&"api"));
        assert!(!names.contains(&"web"));

        // Retraction relations are consumed and never exposed.
        let relations = source.relations()?;
        assert!(!relations.iter().any(|r| r.name.starts_with("__retract__")));

        Ok(())
    }

    #[test]
    fn test_file_edb_source_missing_dir() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let source = FileEdbSource::new("test", dir.path().join("does-not-exist"));

        assert!(source.relations().is_err());
        assert!(source.scan("anything").is_err());
        assert!(source.fingerprint()?.is_none());

        Ok(())
    }

    #[test]
    fn test_file_edb_source_unknown_relation_is_empty() -> Result<()> {
        let dir = tempfile::tempdir()?;
        std::fs::write(dir.path().join("data.mg"), "p(1).\n")?;

        let source = FileEdbSource::new("test", dir.path());
        assert!(source.scan("no_such_relation")?.is_empty());

        Ok(())
    }

    #[test]
    fn test_fingerprint_tracks_data_files() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let source = FileEdbSource::new("test", dir.path());

        let empty = source.fingerprint()?.expect("directory exists").0;

        let mg_path = dir.path().join("data.mg");
        std::fs::write(&mg_path, "p(1).\n")?;
        let one = source.fingerprint()?.expect("directory exists").0;
        assert_ne!(empty, one);

        // A size change must be reflected.
        std::fs::write(&mg_path, "p(1).\np(2).\n")?;
        let two = source.fingerprint()?.expect("directory exists").0;
        assert_ne!(one, two);

        // Files without a .mg/.mgr extension do not participate.
        std::fs::write(dir.path().join("notes.txt"), "ignored")?;
        let with_txt = source.fingerprint()?.expect("directory exists").0;
        assert_eq!(two, with_txt);

        Ok(())
    }
}