1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::graph::MaterializedGraph;
4
5pub fn bfs(
7 graph: &MaterializedGraph,
8 start: &str,
9 max_depth: Option<usize>,
10 edge_type_filter: Option<&str>,
11) -> Vec<String> {
12 let mut visited = HashSet::new();
13 let mut result = Vec::new();
14 let mut queue: VecDeque<(String, usize)> = VecDeque::new();
15
16 if graph.get_node(start).is_none() {
17 return result;
18 }
19
20 visited.insert(start.to_string());
21 queue.push_back((start.to_string(), 0));
22
23 while let Some((node_id, depth)) = queue.pop_front() {
24 result.push(node_id.clone());
25
26 if let Some(max) = max_depth {
27 if depth >= max {
28 continue;
29 }
30 }
31
32 let edges = graph.outgoing_edges(&node_id);
33 for edge in edges {
34 if let Some(filter) = edge_type_filter {
35 if edge.edge_type != filter {
36 continue;
37 }
38 }
39 if !visited.contains(&edge.target_id) {
40 visited.insert(edge.target_id.clone());
41 queue.push_back((edge.target_id.clone(), depth + 1));
42 }
43 }
44 }
45
46 result
47}
48
49pub fn dfs(
51 graph: &MaterializedGraph,
52 start: &str,
53 max_depth: Option<usize>,
54 edge_type_filter: Option<&str>,
55) -> Vec<String> {
56 let mut visited = HashSet::new();
57 let mut result = Vec::new();
58 let mut stack: Vec<(String, usize)> = Vec::new();
59
60 if graph.get_node(start).is_none() {
61 return result;
62 }
63
64 visited.insert(start.to_string());
65 stack.push((start.to_string(), 0));
66
67 while let Some((node_id, depth)) = stack.pop() {
68 result.push(node_id.clone());
69
70 if let Some(max) = max_depth {
71 if depth >= max {
72 continue;
73 }
74 }
75
76 let edges = graph.outgoing_edges(&node_id);
77 for edge in edges {
78 if let Some(filter) = edge_type_filter {
79 if edge.edge_type != filter {
80 continue;
81 }
82 }
83 if !visited.contains(&edge.target_id) {
84 visited.insert(edge.target_id.clone());
85 stack.push((edge.target_id.clone(), depth + 1));
86 }
87 }
88 }
89
90 result
91}
92
93pub fn shortest_path(graph: &MaterializedGraph, start: &str, end: &str) -> Option<Vec<String>> {
97 if graph.get_node(start).is_none() || graph.get_node(end).is_none() {
98 return None;
99 }
100 if start == end {
101 return Some(vec![start.to_string()]);
102 }
103
104 let mut visited = HashSet::new();
105 let mut parent: HashMap<String, String> = HashMap::new();
106 let mut queue: VecDeque<String> = VecDeque::new();
107
108 visited.insert(start.to_string());
109 queue.push_back(start.to_string());
110
111 while let Some(current) = queue.pop_front() {
112 for edge in graph.outgoing_edges(¤t) {
113 if !visited.contains(&edge.target_id) {
114 visited.insert(edge.target_id.clone());
115 parent.insert(edge.target_id.clone(), current.clone());
116 if edge.target_id == end {
117 let mut path = vec![end.to_string()];
119 let mut cur = end.to_string();
120 while let Some(p) = parent.get(&cur) {
121 path.push(p.clone());
122 cur = p.clone();
123 }
124 path.reverse();
125 return Some(path);
126 }
127 queue.push_back(edge.target_id.clone());
128 }
129 }
130 }
131
132 None
133}
134
135pub fn impact_analysis(
138 graph: &MaterializedGraph,
139 node_id: &str,
140 max_depth: Option<usize>,
141) -> Vec<String> {
142 let mut visited = HashSet::new();
143 let mut result = Vec::new();
144 let mut queue: VecDeque<(String, usize)> = VecDeque::new();
145
146 if graph.get_node(node_id).is_none() {
147 return result;
148 }
149
150 visited.insert(node_id.to_string());
151 queue.push_back((node_id.to_string(), 0));
152
153 while let Some((current, depth)) = queue.pop_front() {
154 result.push(current.clone());
155
156 if let Some(max) = max_depth {
157 if depth >= max {
158 continue;
159 }
160 }
161
162 for edge in graph.incoming_edges(¤t) {
163 if !visited.contains(&edge.source_id) {
164 visited.insert(edge.source_id.clone());
165 queue.push_back((edge.source_id.clone(), depth + 1));
166 }
167 }
168 }
169
170 result
171}
172
173pub fn subgraph(graph: &MaterializedGraph, start: &str, hops: usize) -> (Vec<String>, Vec<String>) {
176 let mut visited_nodes = HashSet::new();
177 let mut visited_edges = HashSet::new();
178 let mut queue: VecDeque<(String, usize)> = VecDeque::new();
179
180 if graph.get_node(start).is_none() {
181 return (vec![], vec![]);
182 }
183
184 visited_nodes.insert(start.to_string());
185 queue.push_back((start.to_string(), 0));
186
187 while let Some((node_id, depth)) = queue.pop_front() {
188 if depth >= hops {
189 continue;
190 }
191
192 for edge in graph.outgoing_edges(&node_id) {
194 visited_edges.insert(edge.edge_id.clone());
195 if !visited_nodes.contains(&edge.target_id) {
196 visited_nodes.insert(edge.target_id.clone());
197 queue.push_back((edge.target_id.clone(), depth + 1));
198 }
199 }
200 for edge in graph.incoming_edges(&node_id) {
202 visited_edges.insert(edge.edge_id.clone());
203 if !visited_nodes.contains(&edge.source_id) {
204 visited_nodes.insert(edge.source_id.clone());
205 queue.push_back((edge.source_id.clone(), depth + 1));
206 }
207 }
208 }
209
210 (
211 visited_nodes.into_iter().collect(),
212 visited_edges.into_iter().collect(),
213 )
214}
215
216pub fn pattern_match(
225 graph: &MaterializedGraph,
226 type_sequence: &[&str],
227 max_results: usize,
228) -> Vec<Vec<String>> {
229 if type_sequence.is_empty() {
230 return vec![];
231 }
232
233 let mut results = Vec::new();
234
235 let start_nodes = graph.nodes_by_type(type_sequence[0]);
237 for start in start_nodes {
238 let mut chains = vec![vec![start.node_id.clone()]];
239
240 for &next_type in &type_sequence[1..] {
241 let mut extended = Vec::new();
242 for chain in &chains {
243 let last = chain.last().unwrap();
244 for edge in graph.outgoing_edges(last) {
245 if let Some(target_node) = graph.get_node(&edge.target_id) {
246 if target_node.node_type == next_type && !chain.contains(&edge.target_id) {
247 let mut new_chain = chain.clone();
248 new_chain.push(edge.target_id.clone());
249 extended.push(new_chain);
250 if results.len() + extended.len() >= max_results {
251 results.extend(extended);
252 results.truncate(max_results);
253 return results;
254 }
255 }
256 }
257 }
258 }
259 chains = extended;
260 }
261
262 results.extend(chains);
263 if results.len() >= max_results {
264 results.truncate(max_results);
265 return results;
266 }
267 }
268
269 results
270}
271
272pub fn topological_sort(graph: &MaterializedGraph) -> Option<Vec<String>> {
275 let nodes = graph.all_nodes();
276 let node_ids: HashSet<String> = nodes.iter().map(|n| n.node_id.clone()).collect();
277
278 let mut in_degree: HashMap<String, usize> = node_ids.iter().map(|id| (id.clone(), 0)).collect();
280 for edge in graph.all_edges() {
281 if node_ids.contains(&edge.target_id) && node_ids.contains(&edge.source_id) {
282 *in_degree.entry(edge.target_id.clone()).or_default() += 1;
283 }
284 }
285
286 let mut queue: VecDeque<String> = in_degree
287 .iter()
288 .filter(|(_, °)| deg == 0)
289 .map(|(id, _)| id.clone())
290 .collect();
291
292 let mut sorted: Vec<String> = queue.drain(..).collect();
294 sorted.sort();
295 queue.extend(sorted);
296
297 let mut result = Vec::new();
298 while let Some(node_id) = queue.pop_front() {
299 result.push(node_id.clone());
300 for edge in graph.outgoing_edges(&node_id) {
301 if let Some(deg) = in_degree.get_mut(&edge.target_id) {
302 *deg -= 1;
303 if *deg == 0 {
304 queue.push_back(edge.target_id.clone());
305 }
306 }
307 }
308 }
309
310 if result.len() == node_ids.len() {
311 Some(result)
312 } else {
313 None }
315}
316
317pub fn has_cycle(graph: &MaterializedGraph) -> bool {
319 topological_sort(graph).is_none()
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{Entry, GraphOp};
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Node type definition with every optional part left empty.
    fn plain_node_type() -> NodeTypeDef {
        NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
            parent_type: None,
        }
    }

    /// Edge type definition that allows the given source and target node types.
    fn edge_type(sources: &[&str], targets: &[&str]) -> EdgeTypeDef {
        EdgeTypeDef {
            description: None,
            source_types: sources.iter().map(|s| (*s).into()).collect(),
            target_types: targets.iter().map(|t| (*t).into()).collect(),
            properties: BTreeMap::new(),
        }
    }

    /// Ontology shared by all tests: five node types and three edge types.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: ["entity", "source", "processor", "queue", "sink"]
                .iter()
                .map(|name| ((*name).into(), plain_node_type()))
                .collect(),
            edge_types: BTreeMap::from([
                ("DEPENDS_ON".into(), edge_type(&["entity"], &["entity"])),
                ("FEEDS".into(), edge_type(&["source"], &["processor"])),
                (
                    "ROUTES".into(),
                    edge_type(
                        &["processor", "queue", "sink"],
                        &["queue", "sink", "source"],
                    ),
                ),
            ]),
        }
    }

    fn make_entry(op: GraphOp, clock_time: u64) -> Entry {
        Entry::new(
            op,
            vec![],
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    /// Builds an `AddNode` entry whose label equals its id.
    fn add_node(id: &str, ntype: &str, clock: u64) -> Entry {
        make_entry(
            GraphOp::AddNode {
                node_id: id.into(),
                node_type: ntype.into(),
                label: id.into(),
                properties: BTreeMap::new(),
                subtype: None,
            },
            clock,
        )
    }

    /// Builds an `AddEdge` entry from `src` to `tgt`.
    fn add_edge(id: &str, etype: &str, src: &str, tgt: &str, clock: u64) -> Entry {
        make_entry(
            GraphOp::AddEdge {
                edge_id: id.into(),
                edge_type: etype.into(),
                source_id: src.into(),
                target_id: tgt.into(),
                properties: BTreeMap::new(),
            },
            clock,
        )
    }

    /// Chain a -> b -> c -> d of `entity` nodes joined by `DEPENDS_ON` edges.
    fn linear_graph() -> MaterializedGraph {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_node("d", "entity", 4));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 5));
        g.apply(&add_edge("bc", "DEPENDS_ON", "b", "c", 6));
        g.apply(&add_edge("cd", "DEPENDS_ON", "c", "d", 7));
        g
    }

    /// Index of `id` within a traversal result; panics when it is absent.
    fn position_of(order: &[String], id: &str) -> usize {
        order.iter().position(|x| x == id).unwrap()
    }

    #[test]
    fn bfs_traversal_from_node() {
        let g = linear_graph();
        let visited = bfs(&g, "a", None, None);
        assert_eq!(visited, vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn bfs_respects_depth_limit() {
        let g = linear_graph();
        let visited = bfs(&g, "a", Some(2), None);
        assert_eq!(visited, vec!["a", "b", "c"]);
    }

    #[test]
    fn bfs_unknown_start_is_empty() {
        let g = linear_graph();
        assert!(bfs(&g, "missing", None, None).is_empty());
        assert!(dfs(&g, "missing", None, None).is_empty());
    }

    #[test]
    fn bfs_filters_edge_types() {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 4));
        g.apply(&add_edge("ac", "DEPENDS_ON", "a", "c", 5));

        let visited = bfs(&g, "a", None, Some("DEPENDS_ON"));
        assert!(visited.contains(&"a".to_string()));
        assert!(visited.contains(&"b".to_string()));
        assert!(visited.contains(&"c".to_string()));

        // No edge has this type, so only the start node is reported.
        let visited2 = bfs(&g, "a", None, Some("NONEXISTENT"));
        assert_eq!(visited2, vec!["a"]);
    }

    #[test]
    fn dfs_traversal_from_node() {
        let g = linear_graph();
        let visited = dfs(&g, "a", None, None);
        assert_eq!(visited.len(), 4);
        assert_eq!(visited[0], "a");
        assert!(visited.contains(&"d".to_string()));
    }

    #[test]
    fn dfs_respects_depth_limit() {
        let g = linear_graph();
        let visited = dfs(&g, "a", Some(2), None);
        // On a chain the order is fully determined: depth 0, 1 and 2 only.
        assert_eq!(visited, vec!["a", "b", "c"]);
    }

    #[test]
    fn dfs_visits_deep_before_wide() {
        // a -> b -> d and a -> c: `d` is deeper than `c`.
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_node("d", "entity", 4));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 5));
        g.apply(&add_edge("ac", "DEPENDS_ON", "a", "c", 6));
        g.apply(&add_edge("bd", "DEPENDS_ON", "b", "d", 7));

        let bfs_result = bfs(&g, "a", None, None);
        let dfs_result = dfs(&g, "a", None, None);

        assert_eq!(bfs_result.len(), 4);
        assert_eq!(dfs_result.len(), 4);
        assert_eq!(bfs_result[0], "a");
        assert_eq!(dfs_result[0], "a");

        // These hold whichever order the edges of `a` are returned in.
        // BFS finishes depth 1 (`b`, `c`) before reaching `d` at depth 2.
        assert_eq!(bfs_result[3], "d");
        // DFS descends into `d` immediately after visiting its parent `b`.
        assert_eq!(position_of(&dfs_result, "d"), position_of(&dfs_result, "b") + 1);
    }

    #[test]
    fn shortest_path_finds_path() {
        let g = linear_graph();
        let path = shortest_path(&g, "a", "d").unwrap();
        assert_eq!(path, vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn shortest_path_same_node() {
        let g = linear_graph();
        assert_eq!(shortest_path(&g, "a", "a"), Some(vec!["a".to_string()]));
    }

    #[test]
    fn shortest_path_no_path() {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        assert!(shortest_path(&g, "a", "b").is_none());
    }

    #[test]
    fn impact_analysis_reverse_traversal() {
        let g = linear_graph();
        let impact = impact_analysis(&g, "d", None);
        // Walking the chain backwards yields each predecessor in turn.
        assert_eq!(impact, vec!["d", "c", "b", "a"]);
    }

    #[test]
    fn subgraph_extraction() {
        let g = linear_graph();
        let (nodes, edges) = subgraph(&g, "b", 1);
        assert!(nodes.contains(&"b".to_string()));
        assert!(nodes.contains(&"a".to_string()));
        assert!(nodes.contains(&"c".to_string()));
        // `d` is two hops away from `b`.
        assert!(!nodes.contains(&"d".to_string()));
        assert_eq!(edges.len(), 2);
    }

    #[test]
    fn pattern_match_type_chain() {
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("src1", "source", 1));
        g.apply(&add_node("proc1", "processor", 2));
        g.apply(&add_node("q1", "queue", 3));
        g.apply(&add_node("snk1", "sink", 4));
        g.apply(&add_edge("e1", "FEEDS", "src1", "proc1", 5));
        g.apply(&add_edge("e2", "ROUTES", "proc1", "q1", 6));
        g.apply(&add_edge("e3", "ROUTES", "q1", "snk1", 7));

        let chains = pattern_match(&g, &["source", "processor", "queue", "sink"], 1000);
        assert_eq!(chains.len(), 1);
        assert_eq!(chains[0], vec!["src1", "proc1", "q1", "snk1"]);
    }

    #[test]
    fn topological_sort_dependency_order() {
        let g = linear_graph();
        let sorted = topological_sort(&g).unwrap();
        assert!(position_of(&sorted, "a") < position_of(&sorted, "b"));
        assert!(position_of(&sorted, "b") < position_of(&sorted, "c"));
        assert!(position_of(&sorted, "c") < position_of(&sorted, "d"));
    }

    #[test]
    fn cycle_detection() {
        // a -> b -> c -> a
        let mut g = MaterializedGraph::new(test_ontology());
        g.apply(&add_node("a", "entity", 1));
        g.apply(&add_node("b", "entity", 2));
        g.apply(&add_node("c", "entity", 3));
        g.apply(&add_edge("ab", "DEPENDS_ON", "a", "b", 4));
        g.apply(&add_edge("bc", "DEPENDS_ON", "b", "c", 5));
        g.apply(&add_edge("ca", "DEPENDS_ON", "c", "a", 6));
        assert!(has_cycle(&g));
        assert!(topological_sort(&g).is_none());
    }
}