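//! `silk/engine.rs` — graph traversal and query algorithms (BFS/DFS, shortest path,
//! impact analysis, subgraph extraction, pattern matching, topological sort) over a
//! [`MaterializedGraph`].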

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::graph::MaterializedGraph;
4
5/// BFS traversal result — node IDs in visit order.
6pub fn bfs(
7    graph: &MaterializedGraph,
8    start: &str,
9    max_depth: Option<usize>,
10    edge_type_filter: Option<&str>,
11) -> Vec<String> {
12    let mut visited = HashSet::new();
13    let mut result = Vec::new();
14    let mut queue: VecDeque<(String, usize)> = VecDeque::new();
15
16    if graph.get_node(start).is_none() {
17        return result;
18    }
19
20    visited.insert(start.to_string());
21    queue.push_back((start.to_string(), 0));
22
23    while let Some((node_id, depth)) = queue.pop_front() {
24        result.push(node_id.clone());
25
26        if let Some(max) = max_depth {
27            if depth >= max {
28                continue;
29            }
30        }
31
32        let edges = graph.outgoing_edges(&node_id);
33        for edge in edges {
34            if let Some(filter) = edge_type_filter {
35                if edge.edge_type != filter {
36                    continue;
37                }
38            }
39            if !visited.contains(&edge.target_id) {
40                visited.insert(edge.target_id.clone());
41                queue.push_back((edge.target_id.clone(), depth + 1));
42            }
43        }
44    }
45
46    result
47}
48
49/// DFS traversal result — node IDs in visit order (depth-first).
50pub fn dfs(
51    graph: &MaterializedGraph,
52    start: &str,
53    max_depth: Option<usize>,
54    edge_type_filter: Option<&str>,
55) -> Vec<String> {
56    let mut visited = HashSet::new();
57    let mut result = Vec::new();
58    let mut stack: Vec<(String, usize)> = Vec::new();
59
60    if graph.get_node(start).is_none() {
61        return result;
62    }
63
64    visited.insert(start.to_string());
65    stack.push((start.to_string(), 0));
66
67    while let Some((node_id, depth)) = stack.pop() {
68        result.push(node_id.clone());
69
70        if let Some(max) = max_depth {
71            if depth >= max {
72                continue;
73            }
74        }
75
76        let edges = graph.outgoing_edges(&node_id);
77        for edge in edges {
78            if let Some(filter) = edge_type_filter {
79                if edge.edge_type != filter {
80                    continue;
81                }
82            }
83            if !visited.contains(&edge.target_id) {
84                visited.insert(edge.target_id.clone());
85                stack.push((edge.target_id.clone(), depth + 1));
86            }
87        }
88    }
89
90    result
91}
92
93/// Shortest path between two nodes (unweighted BFS).
94/// Returns the path as a list of node IDs (including start and end),
95/// or None if no path exists.
96pub fn shortest_path(graph: &MaterializedGraph, start: &str, end: &str) -> Option<Vec<String>> {
97    if graph.get_node(start).is_none() || graph.get_node(end).is_none() {
98        return None;
99    }
100    if start == end {
101        return Some(vec![start.to_string()]);
102    }
103
104    let mut visited = HashSet::new();
105    let mut parent: HashMap<String, String> = HashMap::new();
106    let mut queue: VecDeque<String> = VecDeque::new();
107
108    visited.insert(start.to_string());
109    queue.push_back(start.to_string());
110
111    while let Some(current) = queue.pop_front() {
112        for edge in graph.outgoing_edges(&current) {
113            if !visited.contains(&edge.target_id) {
114                visited.insert(edge.target_id.clone());
115                parent.insert(edge.target_id.clone(), current.clone());
116                if edge.target_id == end {
117                    // Reconstruct path.
118                    let mut path = vec![end.to_string()];
119                    let mut cur = end.to_string();
120                    while let Some(p) = parent.get(&cur) {
121                        path.push(p.clone());
122                        cur = p.clone();
123                    }
124                    path.reverse();
125                    return Some(path);
126                }
127                queue.push_back(edge.target_id.clone());
128            }
129        }
130    }
131
132    None
133}
134
135/// Impact analysis: reverse BFS from a node — "what depends on this?"
136/// Traverses incoming edges to find all nodes that transitively depend on `node_id`.
137pub fn impact_analysis(
138    graph: &MaterializedGraph,
139    node_id: &str,
140    max_depth: Option<usize>,
141) -> Vec<String> {
142    let mut visited = HashSet::new();
143    let mut result = Vec::new();
144    let mut queue: VecDeque<(String, usize)> = VecDeque::new();
145
146    if graph.get_node(node_id).is_none() {
147        return result;
148    }
149
150    visited.insert(node_id.to_string());
151    queue.push_back((node_id.to_string(), 0));
152
153    while let Some((current, depth)) = queue.pop_front() {
154        result.push(current.clone());
155
156        if let Some(max) = max_depth {
157            if depth >= max {
158                continue;
159            }
160        }
161
162        for edge in graph.incoming_edges(&current) {
163            if !visited.contains(&edge.source_id) {
164                visited.insert(edge.source_id.clone());
165                queue.push_back((edge.source_id.clone(), depth + 1));
166            }
167        }
168    }
169
170    result
171}
172
173/// Extract subgraph: all nodes and edges within N hops of a start node.
174/// Returns (node_ids, edge_ids).
175pub fn subgraph(graph: &MaterializedGraph, start: &str, hops: usize) -> (Vec<String>, Vec<String>) {
176    let mut visited_nodes = HashSet::new();
177    let mut visited_edges = HashSet::new();
178    let mut queue: VecDeque<(String, usize)> = VecDeque::new();
179
180    if graph.get_node(start).is_none() {
181        return (vec![], vec![]);
182    }
183
184    visited_nodes.insert(start.to_string());
185    queue.push_back((start.to_string(), 0));
186
187    while let Some((node_id, depth)) = queue.pop_front() {
188        if depth >= hops {
189            continue;
190        }
191
192        // Outgoing.
193        for edge in graph.outgoing_edges(&node_id) {
194            visited_edges.insert(edge.edge_id.clone());
195            if !visited_nodes.contains(&edge.target_id) {
196                visited_nodes.insert(edge.target_id.clone());
197                queue.push_back((edge.target_id.clone(), depth + 1));
198            }
199        }
200        // Incoming.
201        for edge in graph.incoming_edges(&node_id) {
202            visited_edges.insert(edge.edge_id.clone());
203            if !visited_nodes.contains(&edge.source_id) {
204                visited_nodes.insert(edge.source_id.clone());
205                queue.push_back((edge.source_id.clone(), depth + 1));
206            }
207        }
208    }
209
210    (
211        visited_nodes.into_iter().collect(),
212        visited_edges.into_iter().collect(),
213    )
214}
215
216/// Pattern match: find chains matching a sequence of node types connected by edges.
217/// E.g., `["source", "processor", "sink"]` finds all matching chains.
218/// Returns list of chains, each chain being a list of node_ids.
219/// Find chains of nodes matching a type sequence (e.g., `["source", "processor", "sink"]`).
220///
221/// Complexity: O(n * b^d) where n = nodes of first type, b = average branching,
222/// d = sequence length. Bounded by `max_results` to prevent runaway expansion on
223/// dense graphs. Cycle-safe: a node cannot appear twice in the same chain.
224pub fn pattern_match(
225    graph: &MaterializedGraph,
226    type_sequence: &[&str],
227    max_results: usize,
228) -> Vec<Vec<String>> {
229    if type_sequence.is_empty() {
230        return vec![];
231    }
232
233    let mut results = Vec::new();
234
235    // Start from all nodes of the first type.
236    let start_nodes = graph.nodes_by_type(type_sequence[0]);
237    for start in start_nodes {
238        let mut chains = vec![vec![start.node_id.clone()]];
239
240        for &next_type in &type_sequence[1..] {
241            let mut extended = Vec::new();
242            for chain in &chains {
243                let last = chain.last().unwrap();
244                for edge in graph.outgoing_edges(last) {
245                    if let Some(target_node) = graph.get_node(&edge.target_id) {
246                        if target_node.node_type == next_type && !chain.contains(&edge.target_id) {
247                            let mut new_chain = chain.clone();
248                            new_chain.push(edge.target_id.clone());
249                            extended.push(new_chain);
250                            if results.len() + extended.len() >= max_results {
251                                results.extend(extended);
252                                results.truncate(max_results);
253                                return results;
254                            }
255                        }
256                    }
257                }
258            }
259            chains = extended;
260        }
261
262        results.extend(chains);
263        if results.len() >= max_results {
264            results.truncate(max_results);
265            return results;
266        }
267    }
268
269    results
270}
271
272/// Topological sort of nodes connected by directed edges.
273/// For DAGs only — returns None if a cycle is detected.
274pub fn topological_sort(graph: &MaterializedGraph) -> Option<Vec<String>> {
275    let nodes = graph.all_nodes();
276    let node_ids: HashSet<String> = nodes.iter().map(|n| n.node_id.clone()).collect();
277
278    // Compute in-degrees.
279    let mut in_degree: HashMap<String, usize> = node_ids.iter().map(|id| (id.clone(), 0)).collect();
280    for edge in graph.all_edges() {
281        if node_ids.contains(&edge.target_id) && node_ids.contains(&edge.source_id) {
282            *in_degree.entry(edge.target_id.clone()).or_default() += 1;
283        }
284    }
285
286    let mut queue: VecDeque<String> = in_degree
287        .iter()
288        .filter(|(_, &deg)| deg == 0)
289        .map(|(id, _)| id.clone())
290        .collect();
291
292    // Sort for determinism.
293    let mut sorted: Vec<String> = queue.drain(..).collect();
294    sorted.sort();
295    queue.extend(sorted);
296
297    let mut result = Vec::new();
298    while let Some(node_id) = queue.pop_front() {
299        result.push(node_id.clone());
300        for edge in graph.outgoing_edges(&node_id) {
301            if let Some(deg) = in_degree.get_mut(&edge.target_id) {
302                *deg -= 1;
303                if *deg == 0 {
304                    queue.push_back(edge.target_id.clone());
305                }
306            }
307        }
308    }
309
310    if result.len() == node_ids.len() {
311        Some(result)
312    } else {
313        None // Cycle detected.
314    }
315}
316
317/// Cycle detection: returns true if the graph contains a cycle.
318pub fn has_cycle(graph: &MaterializedGraph) -> bool {
319    topological_sort(graph).is_none()
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{Entry, GraphOp};
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// A node type with no description, properties, or subtype hierarchy.
    fn plain_node_type() -> NodeTypeDef {
        NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
            parent_type: None,
        }
    }

    /// Ontology shared by all tests:
    /// * an `entity` type linked by `DEPENDS_ON`;
    /// * a `source`/`processor`/`queue`/`sink` pipeline linked by `FEEDS` / `ROUTES`.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([
                ("entity".into(), plain_node_type()),
                ("source".into(), plain_node_type()),
                ("processor".into(), plain_node_type()),
                ("queue".into(), plain_node_type()),
                ("sink".into(), plain_node_type()),
            ]),
            edge_types: BTreeMap::from([
                (
                    "DEPENDS_ON".into(),
                    EdgeTypeDef {
                        description: None,
                        source_types: vec!["entity".into()],
                        target_types: vec!["entity".into()],
                        properties: BTreeMap::new(),
                    },
                ),
                (
                    "FEEDS".into(),
                    EdgeTypeDef {
                        description: None,
                        source_types: vec!["source".into()],
                        target_types: vec!["processor".into()],
                        properties: BTreeMap::new(),
                    },
                ),
                (
                    "ROUTES".into(),
                    EdgeTypeDef {
                        description: None,
                        source_types: vec!["processor".into(), "queue".into(), "sink".into()],
                        target_types: vec!["queue".into(), "sink".into(), "source".into()],
                        properties: BTreeMap::new(),
                    },
                ),
            ]),
        }
    }

    fn make_entry(op: GraphOp, clock_time: u64) -> Entry {
        Entry::new(
            op,
            vec![],
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    fn add_node(id: &str, ntype: &str, clock: u64) -> Entry {
        make_entry(
            GraphOp::AddNode {
                node_id: id.into(),
                node_type: ntype.into(),
                label: id.into(),
                properties: BTreeMap::new(),
                subtype: None,
            },
            clock,
        )
    }

    fn add_edge(id: &str, etype: &str, src: &str, tgt: &str, clock: u64) -> Entry {
        make_entry(
            GraphOp::AddEdge {
                edge_id: id.into(),
                edge_type: etype.into(),
                source_id: src.into(),
                target_id: tgt.into(),
                properties: BTreeMap::new(),
            },
            clock,
        )
    }

    /// Build a linear chain: A → B → C → D
    fn linear_graph() -> MaterializedGraph {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_node("d", "entity", 4));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 5));
        g.apply(&add_edge("bc", "DEPENDS_ON", "b", "c", 6));
        g.apply(&add_edge("cd", "DEPENDS_ON", "c", "d", 7));
        g
    }

    #[test]
    fn bfs_traversal_from_node() {
        let g = linear_graph();
        let visited = bfs(&g, "a", None, None);
        assert_eq!(visited, vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn bfs_respects_depth_limit() {
        let g = linear_graph();
        let visited = bfs(&g, "a", Some(2), None);
        // depth 0: a, depth 1: b, depth 2: c (but c's children not explored)
        assert_eq!(visited, vec!["a", "b", "c"]);
    }

    #[test]
    fn bfs_filters_edge_types() {
        // src --FEEDS--> proc --ROUTES--> q
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("src", "source", 1));
        g.apply(&add_node("proc", "processor", 2));
        g.apply(&add_node("q", "queue", 3));
        g.apply(&add_edge("feeds", "FEEDS", "src", "proc", 4));
        g.apply(&add_edge("routes", "ROUTES", "proc", "q", 5));

        // Unfiltered: everything reachable is visited.
        assert_eq!(bfs(&g, "src", None, None), vec!["src", "proc", "q"]);
        // FEEDS only: the ROUTES hop out of `proc` is not followed.
        assert_eq!(bfs(&g, "src", None, Some("FEEDS")), vec!["src", "proc"]);
        // ROUTES only: `src` has no ROUTES edge, but `proc` does.
        assert_eq!(bfs(&g, "src", None, Some("ROUTES")), vec!["src"]);
        assert_eq!(bfs(&g, "proc", None, Some("ROUTES")), vec!["proc", "q"]);
        // Filter by nonexistent type → only start node.
        assert_eq!(bfs(&g, "src", None, Some("NONEXISTENT")), vec!["src"]);
    }

    #[test]
    fn dfs_traversal_from_node() {
        let g = linear_graph();
        // On a simple chain, depth-first order is the chain itself.
        assert_eq!(dfs(&g, "a", None, None), vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn dfs_respects_depth_limit() {
        let g = linear_graph();
        assert_eq!(dfs(&g, "a", Some(2), None), vec!["a", "b", "c"]);
    }

    #[test]
    fn dfs_visits_deep_before_wide() {
        // Build a graph: a -> b -> d, a -> c
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_node("d", "entity", 4));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 5));
        g.apply(&add_edge("ac", "DEPENDS_ON", "a", "c", 6));
        g.apply(&add_edge("bd", "DEPENDS_ON", "b", "d", 7));

        let bfs_result = bfs(&g, "a", None, None);
        let dfs_result = dfs(&g, "a", None, None);

        // Both visit all 4 nodes, starting with a.
        assert_eq!(bfs_result.len(), 4);
        assert_eq!(dfs_result.len(), 4);
        assert_eq!(bfs_result[0], "a");
        assert_eq!(dfs_result[0], "a");

        // BFS finishes depth 1 (b, c) before reaching d.
        assert_eq!(bfs_result[3], "d");
        // DFS: once b is visited, its unvisited child d must come next, whichever
        // of b/c happens to be explored first.
        let pos = |id: &str| dfs_result.iter().position(|x| x == id).unwrap();
        assert_eq!(pos("d"), pos("b") + 1);
    }

    #[test]
    fn shortest_path_finds_path() {
        let g = linear_graph();
        let path = shortest_path(&g, "a", "d").unwrap();
        assert_eq!(path, vec!["a", "b", "c", "d"]);
        assert_eq!(shortest_path(&g, "b", "b").unwrap(), vec!["b"]);
    }

    #[test]
    fn shortest_path_prefers_fewest_hops() {
        // a -> b -> c plus a direct shortcut a -> c.
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 4));
        g.apply(&add_edge("bc", "DEPENDS_ON", "b", "c", 5));
        g.apply(&add_edge("ac", "DEPENDS_ON", "a", "c", 6));
        assert_eq!(shortest_path(&g, "a", "c").unwrap(), vec!["a", "c"]);
    }

    #[test]
    fn shortest_path_no_path() {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        // No edge between them.
        assert!(shortest_path(&g, "a", "b").is_none());
    }

    #[test]
    fn impact_analysis_reverse_traversal() {
        let g = linear_graph(); // a → b → c → d
        // "What depends on d?" → walking edges backwards: c, then b, then a.
        assert_eq!(impact_analysis(&g, "d", None), vec!["d", "c", "b", "a"]);
        assert_eq!(impact_analysis(&g, "d", Some(1)), vec!["d", "c"]);
    }

    #[test]
    fn unknown_start_yields_nothing() {
        let g = linear_graph();
        assert!(bfs(&g, "zz", None, None).is_empty());
        assert!(dfs(&g, "zz", None, None).is_empty());
        assert!(impact_analysis(&g, "zz", None).is_empty());
        let (nodes, edges) = subgraph(&g, "zz", 3);
        assert!(nodes.is_empty() && edges.is_empty());
        assert!(shortest_path(&g, "zz", "a").is_none());
        assert!(shortest_path(&g, "a", "zz").is_none());
    }

    #[test]
    fn subgraph_extraction() {
        let g = linear_graph(); // a → b → c → d
        let (nodes, edges) = subgraph(&g, "b", 1);
        // 1 hop from b: a (incoming), c (outgoing)
        assert!(nodes.contains(&"b".to_string()));
        assert!(nodes.contains(&"a".to_string()));
        assert!(nodes.contains(&"c".to_string()));
        assert!(!nodes.contains(&"d".to_string())); // 2 hops away
        assert_eq!(edges.len(), 2); // ab, bc
    }

    #[test]
    fn pattern_match_type_chain() {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("src1", "source", 1));
        g.apply(&add_node("proc1", "processor", 2));
        g.apply(&add_node("q1", "queue", 3));
        g.apply(&add_node("snk1", "sink", 4));
        g.apply(&add_edge("e1", "FEEDS", "src1", "proc1", 5));
        g.apply(&add_edge("e2", "ROUTES", "proc1", "q1", 6));
        g.apply(&add_edge("e3", "ROUTES", "q1", "snk1", 7));

        let chains = pattern_match(&g, &["source", "processor", "queue", "sink"], 1000);
        assert_eq!(chains.len(), 1);
        assert_eq!(chains[0], vec!["src1", "proc1", "q1", "snk1"]);
    }

    #[test]
    fn topological_sort_dependency_order() {
        let g = linear_graph(); // a → b → c → d
        let sorted = topological_sort(&g).unwrap();
        // a must come before b, b before c, c before d.
        let pos = |id: &str| sorted.iter().position(|x| x == id).unwrap();
        assert!(pos("a") < pos("b"));
        assert!(pos("b") < pos("c"));
        assert!(pos("c") < pos("d"));
        assert!(!has_cycle(&g));
    }

    #[test]
    fn cycle_detection() {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 4));
        g.apply(&add_edge("bc", "DEPENDS_ON", "b", "c", 5));
        g.apply(&add_edge("ca", "DEPENDS_ON", "c", "a", 6)); // cycle!

        assert!(has_cycle(&g));
        assert!(topological_sort(&g).is_none());
    }
}