Skip to main content

reddb_server/runtime/
impl_graph.rs

1use super::*;
2
3impl RedDBRuntime {
4    pub fn graph_neighborhood(
5        &self,
6        node: &str,
7        direction: RuntimeGraphDirection,
8        max_depth: usize,
9        edge_labels: Option<Vec<String>>,
10        projection: Option<RuntimeGraphProjection>,
11    ) -> RedDBResult<RuntimeGraphNeighborhoodResult> {
12        let graph =
13            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
14        let node = resolve_graph_node_id(&graph, node)?;
15        let edge_filters = merge_edge_filters(edge_labels, projection.as_ref());
16
17        let mut visited: HashMap<String, usize> = HashMap::new();
18        let mut queue = VecDeque::new();
19        let mut nodes = Vec::new();
20        let mut edges = Vec::new();
21        let mut seen_edges = HashSet::new();
22
23        visited.insert(node.clone(), 0);
24        queue.push_back((node.clone(), 0usize));
25
26        while let Some((current, depth)) = queue.pop_front() {
27            if let Some(stored) = graph.get_node(&current) {
28                nodes.push(RuntimeGraphVisit {
29                    depth,
30                    node: stored_node_to_runtime(stored),
31                });
32            }
33
34            if depth >= max_depth {
35                continue;
36            }
37
38            let mut adjacent =
39                graph_adjacent_edges(&graph, &current, direction, edge_filters.as_ref());
40            adjacent.sort_by(|left, right| left.0.cmp(&right.0));
41
42            for (neighbor, edge) in adjacent {
43                push_runtime_edge(&mut edges, &mut seen_edges, edge);
44                if !visited.contains_key(&neighbor) {
45                    visited.insert(neighbor.clone(), depth + 1);
46                    queue.push_back((neighbor, depth + 1));
47                }
48            }
49        }
50
51        Ok(RuntimeGraphNeighborhoodResult {
52            source: node,
53            direction,
54            max_depth,
55            nodes,
56            edges,
57        })
58    }
59
60    pub fn graph_traverse(
61        &self,
62        source: &str,
63        direction: RuntimeGraphDirection,
64        max_depth: usize,
65        strategy: RuntimeGraphTraversalStrategy,
66        edge_labels: Option<Vec<String>>,
67        projection: Option<RuntimeGraphProjection>,
68    ) -> RedDBResult<RuntimeGraphTraversalResult> {
69        let graph =
70            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
71        let source = resolve_graph_node_id(&graph, source)?;
72        let edge_filters = merge_edge_filters(edge_labels, projection.as_ref());
73
74        let mut visits = Vec::new();
75        let mut edges = Vec::new();
76        let mut seen_nodes = HashSet::new();
77        let mut seen_edges = HashSet::new();
78
79        match strategy {
80            RuntimeGraphTraversalStrategy::Bfs => {
81                let mut queue = VecDeque::new();
82                queue.push_back((source.clone(), 0usize));
83                seen_nodes.insert(source.clone());
84
85                while let Some((current, depth)) = queue.pop_front() {
86                    if let Some(stored) = graph.get_node(&current) {
87                        visits.push(RuntimeGraphVisit {
88                            depth,
89                            node: stored_node_to_runtime(stored),
90                        });
91                    }
92
93                    if depth >= max_depth {
94                        continue;
95                    }
96
97                    let mut adjacent =
98                        graph_adjacent_edges(&graph, &current, direction, edge_filters.as_ref());
99                    adjacent.sort_by(|left, right| left.0.cmp(&right.0));
100                    for (neighbor, edge) in adjacent {
101                        push_runtime_edge(&mut edges, &mut seen_edges, edge);
102                        if seen_nodes.insert(neighbor.clone()) {
103                            queue.push_back((neighbor, depth + 1));
104                        }
105                    }
106                }
107            }
108            RuntimeGraphTraversalStrategy::Dfs => {
109                let mut stack = vec![(source.clone(), 0usize)];
110                while let Some((current, depth)) = stack.pop() {
111                    if !seen_nodes.insert(current.clone()) {
112                        continue;
113                    }
114
115                    if let Some(stored) = graph.get_node(&current) {
116                        visits.push(RuntimeGraphVisit {
117                            depth,
118                            node: stored_node_to_runtime(stored),
119                        });
120                    }
121
122                    if depth >= max_depth {
123                        continue;
124                    }
125
126                    let mut adjacent =
127                        graph_adjacent_edges(&graph, &current, direction, edge_filters.as_ref());
128                    adjacent.sort_by(|left, right| right.0.cmp(&left.0));
129                    for (neighbor, edge) in adjacent {
130                        push_runtime_edge(&mut edges, &mut seen_edges, edge);
131                        if !seen_nodes.contains(&neighbor) {
132                            stack.push((neighbor, depth + 1));
133                        }
134                    }
135                }
136            }
137        }
138
139        Ok(RuntimeGraphTraversalResult {
140            source,
141            direction,
142            strategy,
143            max_depth,
144            visits,
145            edges,
146        })
147    }
148
149    pub fn graph_shortest_path(
150        &self,
151        source: &str,
152        target: &str,
153        direction: RuntimeGraphDirection,
154        algorithm: RuntimeGraphPathAlgorithm,
155        edge_labels: Option<Vec<String>>,
156        projection: Option<RuntimeGraphProjection>,
157    ) -> RedDBResult<RuntimeGraphPathResult> {
158        let graph =
159            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
160        let source_owned = resolve_graph_node_id(&graph, source)?;
161        let target_owned = resolve_graph_node_id(&graph, target)?;
162        let source = source_owned.as_str();
163        let target = target_owned.as_str();
164
165        let merged_edge_filters = merge_edge_filters(edge_labels, projection.as_ref());
166        let path = match (direction, merged_edge_filters.as_ref()) {
167            (RuntimeGraphDirection::Outgoing, None) => match algorithm {
168                RuntimeGraphPathAlgorithm::Bfs => {
169                    let result = BFS::shortest_path(&graph, source, target);
170                    RuntimeGraphPathResult {
171                        source: source.to_string(),
172                        target: target.to_string(),
173                        direction,
174                        algorithm,
175                        nodes_visited: result.nodes_visited,
176                        negative_cycle_detected: None,
177                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
178                    }
179                }
180                RuntimeGraphPathAlgorithm::Dijkstra => {
181                    let result = Dijkstra::shortest_path(&graph, source, target);
182                    RuntimeGraphPathResult {
183                        source: source.to_string(),
184                        target: target.to_string(),
185                        direction,
186                        algorithm,
187                        nodes_visited: result.nodes_visited,
188                        negative_cycle_detected: None,
189                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
190                    }
191                }
192                RuntimeGraphPathAlgorithm::AStar => {
193                    let result = AStar::shortest_path_no_heuristic(&graph, source, target);
194                    RuntimeGraphPathResult {
195                        source: source.to_string(),
196                        target: target.to_string(),
197                        direction,
198                        algorithm,
199                        nodes_visited: result.nodes_visited,
200                        negative_cycle_detected: None,
201                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
202                    }
203                }
204                RuntimeGraphPathAlgorithm::BellmanFord => {
205                    let result = BellmanFord::shortest_path(&graph, source, target);
206                    RuntimeGraphPathResult {
207                        source: source.to_string(),
208                        target: target.to_string(),
209                        direction,
210                        algorithm,
211                        nodes_visited: result.nodes_visited,
212                        negative_cycle_detected: Some(result.has_negative_cycle),
213                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
214                    }
215                }
216            },
217            _ => shortest_path_runtime(
218                &graph,
219                source,
220                target,
221                direction,
222                algorithm,
223                merged_edge_filters.as_ref(),
224            )?,
225        };
226
227        Ok(path)
228    }
229
230    pub fn graph_components(
231        &self,
232        mode: RuntimeGraphComponentsMode,
233        min_size: usize,
234        projection: Option<RuntimeGraphProjection>,
235    ) -> RedDBResult<RuntimeGraphComponentsResult> {
236        let graph =
237            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
238        let min_size = min_size.max(1);
239        let components = match mode {
240            RuntimeGraphComponentsMode::Connected => ConnectedComponents::find(&graph)
241                .components
242                .into_iter()
243                .filter(|component| component.size >= min_size)
244                .map(|component| RuntimeGraphComponent {
245                    id: component.id,
246                    size: component.size,
247                    nodes: component.nodes,
248                })
249                .collect::<Vec<_>>(),
250            RuntimeGraphComponentsMode::Weak => WeaklyConnectedComponents::find(&graph)
251                .components
252                .into_iter()
253                .filter(|component| component.len() >= min_size)
254                .enumerate()
255                .map(|(index, nodes)| RuntimeGraphComponent {
256                    id: format!("wcc:{index}"),
257                    size: nodes.len(),
258                    nodes,
259                })
260                .collect::<Vec<_>>(),
261            RuntimeGraphComponentsMode::Strong => StronglyConnectedComponents::find(&graph)
262                .components
263                .into_iter()
264                .filter(|component| component.len() >= min_size)
265                .enumerate()
266                .map(|(index, nodes)| RuntimeGraphComponent {
267                    id: format!("scc:{index}"),
268                    size: nodes.len(),
269                    nodes,
270                })
271                .collect::<Vec<_>>(),
272        };
273
274        Ok(RuntimeGraphComponentsResult {
275            mode,
276            count: components.len(),
277            components,
278        })
279    }
280
281    pub fn graph_centrality(
282        &self,
283        algorithm: RuntimeGraphCentralityAlgorithm,
284        top_k: usize,
285        normalize: bool,
286        max_iterations: Option<usize>,
287        epsilon: Option<f64>,
288        alpha: Option<f64>,
289        projection: Option<RuntimeGraphProjection>,
290    ) -> RedDBResult<RuntimeGraphCentralityResult> {
291        let graph =
292            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
293        let top_k = top_k.max(1);
294
295        match algorithm {
296            RuntimeGraphCentralityAlgorithm::Degree => {
297                let result = DegreeCentrality::compute(&graph);
298                let mut degree_scores = Vec::new();
299                let mut pairs: Vec<_> = result
300                    .total_degree
301                    .iter()
302                    .map(|(node_id, total_degree)| (node_id.clone(), *total_degree))
303                    .collect();
304                pairs
305                    .sort_by(|left, right| right.1.cmp(&left.1).then_with(|| left.0.cmp(&right.0)));
306                pairs.truncate(top_k);
307
308                for (node_id, total_degree) in pairs {
309                    if let Some(node) = graph.get_node(&node_id) {
310                        degree_scores.push(RuntimeGraphDegreeScore {
311                            node: stored_node_to_runtime(node),
312                            in_degree: result.in_degree.get(&node_id).copied().unwrap_or(0),
313                            out_degree: result.out_degree.get(&node_id).copied().unwrap_or(0),
314                            total_degree,
315                        });
316                    }
317                }
318
319                Ok(RuntimeGraphCentralityResult {
320                    algorithm,
321                    normalized: None,
322                    iterations: None,
323                    converged: None,
324                    scores: Vec::new(),
325                    degree_scores,
326                })
327            }
328            RuntimeGraphCentralityAlgorithm::Closeness => {
329                let result = ClosenessCentrality::compute(&graph);
330                Ok(RuntimeGraphCentralityResult {
331                    algorithm,
332                    normalized: None,
333                    iterations: None,
334                    converged: None,
335                    scores: top_runtime_scores(&graph, result.scores, top_k),
336                    degree_scores: Vec::new(),
337                })
338            }
339            RuntimeGraphCentralityAlgorithm::Betweenness => {
340                let result = BetweennessCentrality::compute(&graph, normalize);
341                Ok(RuntimeGraphCentralityResult {
342                    algorithm,
343                    normalized: Some(result.normalized),
344                    iterations: None,
345                    converged: None,
346                    scores: top_runtime_scores(&graph, result.scores, top_k),
347                    degree_scores: Vec::new(),
348                })
349            }
350            RuntimeGraphCentralityAlgorithm::Eigenvector => {
351                let mut runner = EigenvectorCentrality::new();
352                if let Some(max_iterations) = max_iterations {
353                    runner.max_iterations = max_iterations.max(1);
354                }
355                if let Some(epsilon) = epsilon {
356                    runner.epsilon = epsilon.max(0.0);
357                }
358                let result = runner.compute(&graph);
359                Ok(RuntimeGraphCentralityResult {
360                    algorithm,
361                    normalized: None,
362                    iterations: Some(result.iterations),
363                    converged: Some(result.converged),
364                    scores: top_runtime_scores(&graph, result.scores, top_k),
365                    degree_scores: Vec::new(),
366                })
367            }
368            RuntimeGraphCentralityAlgorithm::PageRank => {
369                let mut runner = PageRank::new();
370                if let Some(max_iterations) = max_iterations {
371                    runner = runner.max_iterations(max_iterations.max(1));
372                }
373                if let Some(alpha) = alpha {
374                    runner = runner.alpha(alpha);
375                }
376                if let Some(epsilon) = epsilon {
377                    runner = runner.epsilon(epsilon);
378                }
379                let result = runner.run(&graph);
380                Ok(RuntimeGraphCentralityResult {
381                    algorithm,
382                    normalized: None,
383                    iterations: Some(result.iterations),
384                    converged: Some(result.converged),
385                    scores: top_runtime_scores(&graph, result.scores, top_k),
386                    degree_scores: Vec::new(),
387                })
388            }
389        }
390    }
391
392    pub fn graph_communities(
393        &self,
394        algorithm: RuntimeGraphCommunityAlgorithm,
395        min_size: usize,
396        max_iterations: Option<usize>,
397        resolution: Option<f64>,
398        projection: Option<RuntimeGraphProjection>,
399    ) -> RedDBResult<RuntimeGraphCommunityResult> {
400        let graph =
401            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
402        let min_size = min_size.max(1);
403
404        match algorithm {
405            RuntimeGraphCommunityAlgorithm::LabelPropagation => {
406                let mut runner = LabelPropagation::new();
407                if let Some(max_iterations) = max_iterations {
408                    runner = runner.max_iterations(max_iterations.max(1));
409                }
410                let result = runner.run(&graph);
411                let communities = result
412                    .communities
413                    .into_iter()
414                    .filter(|community| community.size >= min_size)
415                    .map(|community| RuntimeGraphCommunity {
416                        id: community.label,
417                        size: community.size,
418                        nodes: community.nodes,
419                    })
420                    .collect::<Vec<_>>();
421                Ok(RuntimeGraphCommunityResult {
422                    algorithm,
423                    count: communities.len(),
424                    iterations: Some(result.iterations),
425                    converged: Some(result.converged),
426                    modularity: None,
427                    passes: None,
428                    communities,
429                })
430            }
431            RuntimeGraphCommunityAlgorithm::Louvain => {
432                let mut runner = Louvain::new();
433                if let Some(max_iterations) = max_iterations {
434                    runner = runner.max_iterations(max_iterations.max(1));
435                }
436                if let Some(resolution) = resolution {
437                    runner = runner.resolution(resolution.max(0.0));
438                }
439                let result = runner.run(&graph);
440                let mut communities = result
441                    .community_sizes()
442                    .into_iter()
443                    .filter(|(_, size)| *size >= min_size)
444                    .map(|(id, size)| RuntimeGraphCommunity {
445                        id: format!("community:{id}"),
446                        size,
447                        nodes: result.get_community(id),
448                    })
449                    .collect::<Vec<_>>();
450                communities.sort_by(|left, right| {
451                    right
452                        .size
453                        .cmp(&left.size)
454                        .then_with(|| left.id.cmp(&right.id))
455                });
456                Ok(RuntimeGraphCommunityResult {
457                    algorithm,
458                    count: communities.len(),
459                    iterations: None,
460                    converged: None,
461                    modularity: Some(result.modularity),
462                    passes: Some(result.passes),
463                    communities,
464                })
465            }
466        }
467    }
468
469    pub fn graph_clustering(
470        &self,
471        top_k: usize,
472        include_triangles: bool,
473        projection: Option<RuntimeGraphProjection>,
474    ) -> RedDBResult<RuntimeGraphClusteringResult> {
475        let graph =
476            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
477        let top_k = top_k.max(1);
478        let result = ClusteringCoefficient::compute(&graph);
479        let triangle_count = if include_triangles {
480            Some(crate::storage::engine::TriangleCounting::count(&graph).count)
481        } else {
482            None
483        };
484
485        Ok(RuntimeGraphClusteringResult {
486            global: result.global,
487            local: top_runtime_scores(&graph, result.local, top_k),
488            triangle_count,
489        })
490    }
491
492    pub fn graph_personalized_pagerank(
493        &self,
494        seeds: Vec<String>,
495        top_k: usize,
496        alpha: Option<f64>,
497        epsilon: Option<f64>,
498        max_iterations: Option<usize>,
499        projection: Option<RuntimeGraphProjection>,
500    ) -> RedDBResult<RuntimeGraphCentralityResult> {
501        let graph =
502            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
503        if seeds.is_empty() {
504            return Err(RedDBError::Query(
505                "personalized pagerank requires at least one seed".to_string(),
506            ));
507        }
508        for seed in &seeds {
509            ensure_graph_node(&graph, seed)?;
510        }
511
512        let mut runner = PersonalizedPageRank::new(seeds);
513        if let Some(alpha) = alpha {
514            runner = runner.alpha(alpha);
515        }
516        if let Some(epsilon) = epsilon {
517            runner = runner.epsilon(epsilon);
518        }
519        if let Some(max_iterations) = max_iterations {
520            runner = runner.max_iterations(max_iterations.max(1));
521        }
522        let result = runner.run(&graph);
523
524        Ok(RuntimeGraphCentralityResult {
525            algorithm: RuntimeGraphCentralityAlgorithm::PageRank,
526            normalized: None,
527            iterations: Some(result.iterations),
528            converged: Some(result.converged),
529            scores: top_runtime_scores(&graph, result.scores, top_k.max(1)),
530            degree_scores: Vec::new(),
531        })
532    }
533
534    pub fn graph_hits(
535        &self,
536        top_k: usize,
537        epsilon: Option<f64>,
538        max_iterations: Option<usize>,
539        projection: Option<RuntimeGraphProjection>,
540    ) -> RedDBResult<RuntimeGraphHitsResult> {
541        let graph =
542            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
543        let mut runner = HITS::new();
544        if let Some(epsilon) = epsilon {
545            runner.epsilon = epsilon.max(0.0);
546        }
547        if let Some(max_iterations) = max_iterations {
548            runner.max_iterations = max_iterations.max(1);
549        }
550        let result = runner.compute(&graph);
551
552        Ok(RuntimeGraphHitsResult {
553            iterations: result.iterations,
554            converged: result.converged,
555            hubs: top_runtime_scores(&graph, result.hub_scores, top_k.max(1)),
556            authorities: top_runtime_scores(&graph, result.authority_scores, top_k.max(1)),
557        })
558    }
559
560    pub fn graph_cycles(
561        &self,
562        max_length: usize,
563        max_cycles: usize,
564        projection: Option<RuntimeGraphProjection>,
565    ) -> RedDBResult<RuntimeGraphCyclesResult> {
566        let graph =
567            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
568        let result = CycleDetector::new()
569            .max_length(max_length.max(2))
570            .max_cycles(max_cycles.max(1))
571            .find(&graph);
572
573        Ok(RuntimeGraphCyclesResult {
574            limit_reached: result.limit_reached,
575            cycles: result
576                .cycles
577                .into_iter()
578                .map(|cycle| cycle_to_runtime(&graph, cycle))
579                .collect(),
580        })
581    }
582
583    pub fn graph_topological_sort(
584        &self,
585        projection: Option<RuntimeGraphProjection>,
586    ) -> RedDBResult<RuntimeGraphTopologicalSortResult> {
587        let graph =
588            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
589        let ordered_nodes = match DFS::topological_sort(&graph) {
590            Some(order) => order
591                .into_iter()
592                .filter_map(|id| graph.get_node(&id))
593                .map(stored_node_to_runtime)
594                .collect(),
595            None => Vec::new(),
596        };
597
598        Ok(RuntimeGraphTopologicalSortResult {
599            acyclic: !ordered_nodes.is_empty() || graph.node_count() == 0,
600            ordered_nodes,
601        })
602    }
603
604    pub fn graph_properties(
605        &self,
606        projection: Option<RuntimeGraphProjection>,
607    ) -> RedDBResult<RuntimeGraphPropertiesResult> {
608        let graph =
609            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
610        let node_count = graph.node_count() as usize;
611        let edges = graph.iter_all_edges();
612        let edge_count = edges.len();
613
614        let connected = ConnectedComponents::find(&graph);
615        let weak = WeaklyConnectedComponents::find(&graph);
616        let strong = StronglyConnectedComponents::find(&graph);
617        let cycle_result = CycleDetector::new()
618            .max_length(node_count.max(2))
619            .max_cycles(1)
620            .find(&graph);
621
622        let mut self_loop_count = 0usize;
623        let mut negative_edge_count = 0usize;
624        let mut directed_pairs = HashSet::new();
625        let mut undirected_pairs = HashSet::new();
626
627        for edge in &edges {
628            if edge.weight < 0.0 {
629                negative_edge_count += 1;
630            }
631            if edge.source_id == edge.target_id {
632                self_loop_count += 1;
633                continue;
634            }
635
636            directed_pairs.insert((edge.source_id.clone(), edge.target_id.clone()));
637            let (left, right) = if edge.source_id <= edge.target_id {
638                (edge.source_id.clone(), edge.target_id.clone())
639            } else {
640                (edge.target_id.clone(), edge.source_id.clone())
641            };
642            undirected_pairs.insert((left, right));
643        }
644
645        let expected_undirected_pairs = node_count.saturating_mul(node_count.saturating_sub(1)) / 2;
646        let expected_directed_pairs = node_count.saturating_mul(node_count.saturating_sub(1));
647        let density = if expected_undirected_pairs == 0 {
648            0.0
649        } else {
650            undirected_pairs.len() as f64 / expected_undirected_pairs as f64
651        };
652        let density_directed = if expected_directed_pairs == 0 {
653            0.0
654        } else {
655            directed_pairs.len() as f64 / expected_directed_pairs as f64
656        };
657
658        let is_empty = node_count == 0;
659        let is_connected = node_count <= 1 || connected.count == 1;
660        let is_weakly_connected = node_count <= 1 || weak.count == 1;
661        let is_strongly_connected = node_count <= 1 || strong.count == 1;
662        let is_cyclic = !cycle_result.cycles.is_empty();
663
664        Ok(RuntimeGraphPropertiesResult {
665            node_count,
666            edge_count,
667            self_loop_count,
668            negative_edge_count,
669            connected_component_count: connected.count,
670            weak_component_count: weak.count,
671            strong_component_count: strong.count,
672            is_empty,
673            is_connected,
674            is_weakly_connected,
675            is_strongly_connected,
676            is_complete: node_count <= 1 || undirected_pairs.len() == expected_undirected_pairs,
677            is_complete_directed: node_count <= 1
678                || directed_pairs.len() == expected_directed_pairs,
679            is_cyclic,
680            is_circular: is_cyclic,
681            is_acyclic: !is_cyclic,
682            is_tree: node_count > 0 && is_connected && undirected_pairs.len() + 1 == node_count,
683            density,
684            density_directed,
685        })
686    }
687}