Skip to main content

nedb_engine/
graph.rs

//! DAG edge store — typed directed edges between node hashes.
//!
//! On-disk layout: `graph/{from_hash}/{edge_type}/{to_hash}`
//! The existence of the file means the edge exists. No file content is needed.
//! (A store can also be created fully in memory, in which case no files are used.)
//!
//! This makes TRACE queries pure filesystem traversal:
//!   FROM nodes TRACE caused_by → list dir graph/{hash}/caused_by/
//!   Each entry is the hash of a causal predecessor node.
//!   Follow recursively until the limit is reached or there are no more edges.
//!   Reverse traversal reads the `{edge_type}_rev` directory instead.
//!
//! A write is atomic (it creates a single file). A read is a readdir. Both are O(degree).
//! No global lock. Multiple threads can add edges concurrently.
13
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use anyhow::Result;
18
19pub struct GraphStore {
20    root: PathBuf,
21    /// In-memory edges: (from, edge_type) → Set<to>. None = disk-backed.
22    mem:  Option<Arc<dashmap::DashMap<(String, String), std::collections::HashSet<String>>>>,
23}
24
25impl GraphStore {
26    pub fn new(db_root: &Path) -> Result<Self> {
27        let root = db_root.join("graph");
28        fs::create_dir_all(&root)?;
29        Ok(Self { root, mem: None })
30    }
31
32    /// Create a pure in-memory graph store — no disk I/O.
33    pub fn in_memory() -> Self {
34        Self {
35            root: PathBuf::from(":memory:"),
36            mem:  Some(Arc::new(dashmap::DashMap::new())),
37        }
38    }
39
40    fn edge_path(&self, from: &str, edge_type: &str, to: &str) -> PathBuf {
41        self.root.join(from).join(edge_type).join(to)
42    }
43
44    /// Add a directed edge: from → to with the given type label.
45    pub fn add_edge(&self, from: &str, edge_type: &str, to: &str) -> Result<()> {
46        if let Some(ref mem) = self.mem {
47            mem.entry((from.to_string(), edge_type.to_string()))
48               .or_default()
49               .insert(to.to_string());
50            return Ok(());
51        }
52        let path = self.edge_path(from, edge_type, to);
53        fs::create_dir_all(path.parent().unwrap())?;
54        if !path.exists() {
55            fs::write(&path, b"")?;
56        }
57        Ok(())
58    }
59
60    /// Get all outgoing edges of a given type from a node.
61    pub fn outgoing(&self, from: &str, edge_type: &str) -> Vec<String> {
62        if let Some(ref mem) = self.mem {
63            return mem.get(&(from.to_string(), edge_type.to_string()))
64                .map(|s| s.iter().cloned().collect())
65                .unwrap_or_default();
66        }
67        let dir = self.root.join(from).join(edge_type);
68        fs::read_dir(&dir)
69            .into_iter()
70            .flatten()
71            .filter_map(|e| e.ok())
72            .map(|e| e.file_name().to_string_lossy().to_string())
73            .collect()
74    }
75
76    /// Get all incoming edges of a given type to a node (reverse lookup).
77    /// This requires scanning the `{reverse_edge_type}` edges stored when writing.
78    pub fn incoming(&self, to: &str, reverse_edge_type: &str) -> Vec<String> {
79        self.outgoing(to, reverse_edge_type)
80    }
81
82    /// TRACE: walk the DAG from `start` following `edge_type` edges.
83    /// Returns hashes in BFS order, up to `limit`.
84    pub fn trace(
85        &self,
86        start: &str,
87        edge_type: &str,
88        reverse: bool,
89        limit: usize,
90    ) -> Vec<String> {
91        let mut result = Vec::new();
92        let mut queue  = vec![start.to_string()];
93        let mut seen   = std::collections::HashSet::new();
94        seen.insert(start.to_string());
95
96        while !queue.is_empty() && result.len() < limit {
97            let current = queue.remove(0);
98            result.push(current.clone());
99
100            let next_hashes = if reverse {
101                // "reverse" means traverse the reverse-edge (e.g. "caused" instead of "caused_by")
102                let rev_type = format!("{}_rev", edge_type);
103                self.outgoing(&current, &rev_type)
104            } else {
105                self.outgoing(&current, edge_type)
106            };
107
108            for next in next_hashes {
109                if !seen.contains(&next) {
110                    seen.insert(next.clone());
111                    queue.push(next);
112                }
113            }
114        }
115        result
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A two-edge chain c → b → a is walked from its head in BFS order.
    #[test]
    fn add_and_traverse_edge() {
        let scratch = tempdir().unwrap();
        let store = GraphStore::new(scratch.path()).unwrap();

        store.add_edge("hash_c", "caused_by", "hash_b").unwrap();
        store.add_edge("hash_b", "caused_by", "hash_a").unwrap();

        let walked = store.trace("hash_c", "caused_by", false, 10);
        assert_eq!(walked, vec!["hash_c", "hash_b", "hash_a"]);
    }

    /// Adding the same edge twice leaves exactly one edge behind.
    #[test]
    fn idempotent_edge() {
        let scratch = tempdir().unwrap();
        let store = GraphStore::new(scratch.path()).unwrap();

        // The second insertion of an identical edge must be a no-op.
        for _ in 0..2 {
            store.add_edge("a", "caused_by", "b").unwrap();
        }

        let targets = store.outgoing("a", "caused_by");
        assert_eq!(targets.len(), 1);
    }
}