Skip to main content

reddb_server/runtime/
impl_graph.rs

1use super::*;
2
3impl RedDBRuntime {
4    pub fn graph_neighborhood(
5        &self,
6        node: &str,
7        direction: RuntimeGraphDirection,
8        max_depth: usize,
9        edge_labels: Option<Vec<String>>,
10        projection: Option<RuntimeGraphProjection>,
11    ) -> RedDBResult<RuntimeGraphNeighborhoodResult> {
12        let graph =
13            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
14        ensure_graph_node(&graph, node)?;
15        let edge_filters = merge_edge_filters(edge_labels, projection.as_ref());
16
17        let mut visited: HashMap<String, usize> = HashMap::new();
18        let mut queue = VecDeque::new();
19        let mut nodes = Vec::new();
20        let mut edges = Vec::new();
21        let mut seen_edges = HashSet::new();
22
23        visited.insert(node.to_string(), 0);
24        queue.push_back((node.to_string(), 0usize));
25
26        while let Some((current, depth)) = queue.pop_front() {
27            if let Some(stored) = graph.get_node(&current) {
28                nodes.push(RuntimeGraphVisit {
29                    depth,
30                    node: stored_node_to_runtime(stored),
31                });
32            }
33
34            if depth >= max_depth {
35                continue;
36            }
37
38            let mut adjacent =
39                graph_adjacent_edges(&graph, &current, direction, edge_filters.as_ref());
40            adjacent.sort_by(|left, right| left.0.cmp(&right.0));
41
42            for (neighbor, edge) in adjacent {
43                push_runtime_edge(&mut edges, &mut seen_edges, edge);
44                if !visited.contains_key(&neighbor) {
45                    visited.insert(neighbor.clone(), depth + 1);
46                    queue.push_back((neighbor, depth + 1));
47                }
48            }
49        }
50
51        Ok(RuntimeGraphNeighborhoodResult {
52            source: node.to_string(),
53            direction,
54            max_depth,
55            nodes,
56            edges,
57        })
58    }
59
60    pub fn graph_traverse(
61        &self,
62        source: &str,
63        direction: RuntimeGraphDirection,
64        max_depth: usize,
65        strategy: RuntimeGraphTraversalStrategy,
66        edge_labels: Option<Vec<String>>,
67        projection: Option<RuntimeGraphProjection>,
68    ) -> RedDBResult<RuntimeGraphTraversalResult> {
69        let graph =
70            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
71        ensure_graph_node(&graph, source)?;
72        let edge_filters = merge_edge_filters(edge_labels, projection.as_ref());
73
74        let mut visits = Vec::new();
75        let mut edges = Vec::new();
76        let mut seen_nodes = HashSet::new();
77        let mut seen_edges = HashSet::new();
78
79        match strategy {
80            RuntimeGraphTraversalStrategy::Bfs => {
81                let mut queue = VecDeque::new();
82                queue.push_back((source.to_string(), 0usize));
83                seen_nodes.insert(source.to_string());
84
85                while let Some((current, depth)) = queue.pop_front() {
86                    if let Some(stored) = graph.get_node(&current) {
87                        visits.push(RuntimeGraphVisit {
88                            depth,
89                            node: stored_node_to_runtime(stored),
90                        });
91                    }
92
93                    if depth >= max_depth {
94                        continue;
95                    }
96
97                    let mut adjacent =
98                        graph_adjacent_edges(&graph, &current, direction, edge_filters.as_ref());
99                    adjacent.sort_by(|left, right| left.0.cmp(&right.0));
100                    for (neighbor, edge) in adjacent {
101                        push_runtime_edge(&mut edges, &mut seen_edges, edge);
102                        if seen_nodes.insert(neighbor.clone()) {
103                            queue.push_back((neighbor, depth + 1));
104                        }
105                    }
106                }
107            }
108            RuntimeGraphTraversalStrategy::Dfs => {
109                let mut stack = vec![(source.to_string(), 0usize)];
110                while let Some((current, depth)) = stack.pop() {
111                    if !seen_nodes.insert(current.clone()) {
112                        continue;
113                    }
114
115                    if let Some(stored) = graph.get_node(&current) {
116                        visits.push(RuntimeGraphVisit {
117                            depth,
118                            node: stored_node_to_runtime(stored),
119                        });
120                    }
121
122                    if depth >= max_depth {
123                        continue;
124                    }
125
126                    let mut adjacent =
127                        graph_adjacent_edges(&graph, &current, direction, edge_filters.as_ref());
128                    adjacent.sort_by(|left, right| right.0.cmp(&left.0));
129                    for (neighbor, edge) in adjacent {
130                        push_runtime_edge(&mut edges, &mut seen_edges, edge);
131                        if !seen_nodes.contains(&neighbor) {
132                            stack.push((neighbor, depth + 1));
133                        }
134                    }
135                }
136            }
137        }
138
139        Ok(RuntimeGraphTraversalResult {
140            source: source.to_string(),
141            direction,
142            strategy,
143            max_depth,
144            visits,
145            edges,
146        })
147    }
148
149    pub fn graph_shortest_path(
150        &self,
151        source: &str,
152        target: &str,
153        direction: RuntimeGraphDirection,
154        algorithm: RuntimeGraphPathAlgorithm,
155        edge_labels: Option<Vec<String>>,
156        projection: Option<RuntimeGraphProjection>,
157    ) -> RedDBResult<RuntimeGraphPathResult> {
158        let graph =
159            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
160        ensure_graph_node(&graph, source)?;
161        ensure_graph_node(&graph, target)?;
162
163        let merged_edge_filters = merge_edge_filters(edge_labels, projection.as_ref());
164        let path = match (direction, merged_edge_filters.as_ref()) {
165            (RuntimeGraphDirection::Outgoing, None) => match algorithm {
166                RuntimeGraphPathAlgorithm::Bfs => {
167                    let result = BFS::shortest_path(&graph, source, target);
168                    RuntimeGraphPathResult {
169                        source: source.to_string(),
170                        target: target.to_string(),
171                        direction,
172                        algorithm,
173                        nodes_visited: result.nodes_visited,
174                        negative_cycle_detected: None,
175                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
176                    }
177                }
178                RuntimeGraphPathAlgorithm::Dijkstra => {
179                    let result = Dijkstra::shortest_path(&graph, source, target);
180                    RuntimeGraphPathResult {
181                        source: source.to_string(),
182                        target: target.to_string(),
183                        direction,
184                        algorithm,
185                        nodes_visited: result.nodes_visited,
186                        negative_cycle_detected: None,
187                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
188                    }
189                }
190                RuntimeGraphPathAlgorithm::AStar => {
191                    let result = AStar::shortest_path_no_heuristic(&graph, source, target);
192                    RuntimeGraphPathResult {
193                        source: source.to_string(),
194                        target: target.to_string(),
195                        direction,
196                        algorithm,
197                        nodes_visited: result.nodes_visited,
198                        negative_cycle_detected: None,
199                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
200                    }
201                }
202                RuntimeGraphPathAlgorithm::BellmanFord => {
203                    let result = BellmanFord::shortest_path(&graph, source, target);
204                    RuntimeGraphPathResult {
205                        source: source.to_string(),
206                        target: target.to_string(),
207                        direction,
208                        algorithm,
209                        nodes_visited: result.nodes_visited,
210                        negative_cycle_detected: Some(result.has_negative_cycle),
211                        path: result.path.map(|path| path_to_runtime(&graph, &path)),
212                    }
213                }
214            },
215            _ => shortest_path_runtime(
216                &graph,
217                source,
218                target,
219                direction,
220                algorithm,
221                merged_edge_filters.as_ref(),
222            )?,
223        };
224
225        Ok(path)
226    }
227
228    pub fn graph_components(
229        &self,
230        mode: RuntimeGraphComponentsMode,
231        min_size: usize,
232        projection: Option<RuntimeGraphProjection>,
233    ) -> RedDBResult<RuntimeGraphComponentsResult> {
234        let graph =
235            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
236        let min_size = min_size.max(1);
237        let components = match mode {
238            RuntimeGraphComponentsMode::Connected => ConnectedComponents::find(&graph)
239                .components
240                .into_iter()
241                .filter(|component| component.size >= min_size)
242                .map(|component| RuntimeGraphComponent {
243                    id: component.id,
244                    size: component.size,
245                    nodes: component.nodes,
246                })
247                .collect::<Vec<_>>(),
248            RuntimeGraphComponentsMode::Weak => WeaklyConnectedComponents::find(&graph)
249                .components
250                .into_iter()
251                .filter(|component| component.len() >= min_size)
252                .enumerate()
253                .map(|(index, nodes)| RuntimeGraphComponent {
254                    id: format!("wcc:{index}"),
255                    size: nodes.len(),
256                    nodes,
257                })
258                .collect::<Vec<_>>(),
259            RuntimeGraphComponentsMode::Strong => StronglyConnectedComponents::find(&graph)
260                .components
261                .into_iter()
262                .filter(|component| component.len() >= min_size)
263                .enumerate()
264                .map(|(index, nodes)| RuntimeGraphComponent {
265                    id: format!("scc:{index}"),
266                    size: nodes.len(),
267                    nodes,
268                })
269                .collect::<Vec<_>>(),
270        };
271
272        Ok(RuntimeGraphComponentsResult {
273            mode,
274            count: components.len(),
275            components,
276        })
277    }
278
279    pub fn graph_centrality(
280        &self,
281        algorithm: RuntimeGraphCentralityAlgorithm,
282        top_k: usize,
283        normalize: bool,
284        max_iterations: Option<usize>,
285        epsilon: Option<f64>,
286        alpha: Option<f64>,
287        projection: Option<RuntimeGraphProjection>,
288    ) -> RedDBResult<RuntimeGraphCentralityResult> {
289        let graph =
290            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
291        let top_k = top_k.max(1);
292
293        match algorithm {
294            RuntimeGraphCentralityAlgorithm::Degree => {
295                let result = DegreeCentrality::compute(&graph);
296                let mut degree_scores = Vec::new();
297                let mut pairs: Vec<_> = result
298                    .total_degree
299                    .iter()
300                    .map(|(node_id, total_degree)| (node_id.clone(), *total_degree))
301                    .collect();
302                pairs
303                    .sort_by(|left, right| right.1.cmp(&left.1).then_with(|| left.0.cmp(&right.0)));
304                pairs.truncate(top_k);
305
306                for (node_id, total_degree) in pairs {
307                    if let Some(node) = graph.get_node(&node_id) {
308                        degree_scores.push(RuntimeGraphDegreeScore {
309                            node: stored_node_to_runtime(node),
310                            in_degree: result.in_degree.get(&node_id).copied().unwrap_or(0),
311                            out_degree: result.out_degree.get(&node_id).copied().unwrap_or(0),
312                            total_degree,
313                        });
314                    }
315                }
316
317                Ok(RuntimeGraphCentralityResult {
318                    algorithm,
319                    normalized: None,
320                    iterations: None,
321                    converged: None,
322                    scores: Vec::new(),
323                    degree_scores,
324                })
325            }
326            RuntimeGraphCentralityAlgorithm::Closeness => {
327                let result = ClosenessCentrality::compute(&graph);
328                Ok(RuntimeGraphCentralityResult {
329                    algorithm,
330                    normalized: None,
331                    iterations: None,
332                    converged: None,
333                    scores: top_runtime_scores(&graph, result.scores, top_k),
334                    degree_scores: Vec::new(),
335                })
336            }
337            RuntimeGraphCentralityAlgorithm::Betweenness => {
338                let result = BetweennessCentrality::compute(&graph, normalize);
339                Ok(RuntimeGraphCentralityResult {
340                    algorithm,
341                    normalized: Some(result.normalized),
342                    iterations: None,
343                    converged: None,
344                    scores: top_runtime_scores(&graph, result.scores, top_k),
345                    degree_scores: Vec::new(),
346                })
347            }
348            RuntimeGraphCentralityAlgorithm::Eigenvector => {
349                let mut runner = EigenvectorCentrality::new();
350                if let Some(max_iterations) = max_iterations {
351                    runner.max_iterations = max_iterations.max(1);
352                }
353                if let Some(epsilon) = epsilon {
354                    runner.epsilon = epsilon.max(0.0);
355                }
356                let result = runner.compute(&graph);
357                Ok(RuntimeGraphCentralityResult {
358                    algorithm,
359                    normalized: None,
360                    iterations: Some(result.iterations),
361                    converged: Some(result.converged),
362                    scores: top_runtime_scores(&graph, result.scores, top_k),
363                    degree_scores: Vec::new(),
364                })
365            }
366            RuntimeGraphCentralityAlgorithm::PageRank => {
367                let mut runner = PageRank::new();
368                if let Some(max_iterations) = max_iterations {
369                    runner = runner.max_iterations(max_iterations.max(1));
370                }
371                if let Some(alpha) = alpha {
372                    runner = runner.alpha(alpha);
373                }
374                if let Some(epsilon) = epsilon {
375                    runner = runner.epsilon(epsilon);
376                }
377                let result = runner.run(&graph);
378                Ok(RuntimeGraphCentralityResult {
379                    algorithm,
380                    normalized: None,
381                    iterations: Some(result.iterations),
382                    converged: Some(result.converged),
383                    scores: top_runtime_scores(&graph, result.scores, top_k),
384                    degree_scores: Vec::new(),
385                })
386            }
387        }
388    }
389
390    pub fn graph_communities(
391        &self,
392        algorithm: RuntimeGraphCommunityAlgorithm,
393        min_size: usize,
394        max_iterations: Option<usize>,
395        resolution: Option<f64>,
396        projection: Option<RuntimeGraphProjection>,
397    ) -> RedDBResult<RuntimeGraphCommunityResult> {
398        let graph =
399            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
400        let min_size = min_size.max(1);
401
402        match algorithm {
403            RuntimeGraphCommunityAlgorithm::LabelPropagation => {
404                let mut runner = LabelPropagation::new();
405                if let Some(max_iterations) = max_iterations {
406                    runner = runner.max_iterations(max_iterations.max(1));
407                }
408                let result = runner.run(&graph);
409                let communities = result
410                    .communities
411                    .into_iter()
412                    .filter(|community| community.size >= min_size)
413                    .map(|community| RuntimeGraphCommunity {
414                        id: community.label,
415                        size: community.size,
416                        nodes: community.nodes,
417                    })
418                    .collect::<Vec<_>>();
419                Ok(RuntimeGraphCommunityResult {
420                    algorithm,
421                    count: communities.len(),
422                    iterations: Some(result.iterations),
423                    converged: Some(result.converged),
424                    modularity: None,
425                    passes: None,
426                    communities,
427                })
428            }
429            RuntimeGraphCommunityAlgorithm::Louvain => {
430                let mut runner = Louvain::new();
431                if let Some(max_iterations) = max_iterations {
432                    runner = runner.max_iterations(max_iterations.max(1));
433                }
434                if let Some(resolution) = resolution {
435                    runner = runner.resolution(resolution.max(0.0));
436                }
437                let result = runner.run(&graph);
438                let mut communities = result
439                    .community_sizes()
440                    .into_iter()
441                    .filter(|(_, size)| *size >= min_size)
442                    .map(|(id, size)| RuntimeGraphCommunity {
443                        id: format!("community:{id}"),
444                        size,
445                        nodes: result.get_community(id),
446                    })
447                    .collect::<Vec<_>>();
448                communities.sort_by(|left, right| {
449                    right
450                        .size
451                        .cmp(&left.size)
452                        .then_with(|| left.id.cmp(&right.id))
453                });
454                Ok(RuntimeGraphCommunityResult {
455                    algorithm,
456                    count: communities.len(),
457                    iterations: None,
458                    converged: None,
459                    modularity: Some(result.modularity),
460                    passes: Some(result.passes),
461                    communities,
462                })
463            }
464        }
465    }
466
467    pub fn graph_clustering(
468        &self,
469        top_k: usize,
470        include_triangles: bool,
471        projection: Option<RuntimeGraphProjection>,
472    ) -> RedDBResult<RuntimeGraphClusteringResult> {
473        let graph =
474            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
475        let top_k = top_k.max(1);
476        let result = ClusteringCoefficient::compute(&graph);
477        let triangle_count = if include_triangles {
478            Some(crate::storage::engine::TriangleCounting::count(&graph).count)
479        } else {
480            None
481        };
482
483        Ok(RuntimeGraphClusteringResult {
484            global: result.global,
485            local: top_runtime_scores(&graph, result.local, top_k),
486            triangle_count,
487        })
488    }
489
490    pub fn graph_personalized_pagerank(
491        &self,
492        seeds: Vec<String>,
493        top_k: usize,
494        alpha: Option<f64>,
495        epsilon: Option<f64>,
496        max_iterations: Option<usize>,
497        projection: Option<RuntimeGraphProjection>,
498    ) -> RedDBResult<RuntimeGraphCentralityResult> {
499        let graph =
500            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
501        if seeds.is_empty() {
502            return Err(RedDBError::Query(
503                "personalized pagerank requires at least one seed".to_string(),
504            ));
505        }
506        for seed in &seeds {
507            ensure_graph_node(&graph, seed)?;
508        }
509
510        let mut runner = PersonalizedPageRank::new(seeds);
511        if let Some(alpha) = alpha {
512            runner = runner.alpha(alpha);
513        }
514        if let Some(epsilon) = epsilon {
515            runner = runner.epsilon(epsilon);
516        }
517        if let Some(max_iterations) = max_iterations {
518            runner = runner.max_iterations(max_iterations.max(1));
519        }
520        let result = runner.run(&graph);
521
522        Ok(RuntimeGraphCentralityResult {
523            algorithm: RuntimeGraphCentralityAlgorithm::PageRank,
524            normalized: None,
525            iterations: Some(result.iterations),
526            converged: Some(result.converged),
527            scores: top_runtime_scores(&graph, result.scores, top_k.max(1)),
528            degree_scores: Vec::new(),
529        })
530    }
531
532    pub fn graph_hits(
533        &self,
534        top_k: usize,
535        epsilon: Option<f64>,
536        max_iterations: Option<usize>,
537        projection: Option<RuntimeGraphProjection>,
538    ) -> RedDBResult<RuntimeGraphHitsResult> {
539        let graph =
540            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
541        let mut runner = HITS::new();
542        if let Some(epsilon) = epsilon {
543            runner.epsilon = epsilon.max(0.0);
544        }
545        if let Some(max_iterations) = max_iterations {
546            runner.max_iterations = max_iterations.max(1);
547        }
548        let result = runner.compute(&graph);
549
550        Ok(RuntimeGraphHitsResult {
551            iterations: result.iterations,
552            converged: result.converged,
553            hubs: top_runtime_scores(&graph, result.hub_scores, top_k.max(1)),
554            authorities: top_runtime_scores(&graph, result.authority_scores, top_k.max(1)),
555        })
556    }
557
558    pub fn graph_cycles(
559        &self,
560        max_length: usize,
561        max_cycles: usize,
562        projection: Option<RuntimeGraphProjection>,
563    ) -> RedDBResult<RuntimeGraphCyclesResult> {
564        let graph =
565            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
566        let result = CycleDetector::new()
567            .max_length(max_length.max(2))
568            .max_cycles(max_cycles.max(1))
569            .find(&graph);
570
571        Ok(RuntimeGraphCyclesResult {
572            limit_reached: result.limit_reached,
573            cycles: result
574                .cycles
575                .into_iter()
576                .map(|cycle| cycle_to_runtime(&graph, cycle))
577                .collect(),
578        })
579    }
580
581    pub fn graph_topological_sort(
582        &self,
583        projection: Option<RuntimeGraphProjection>,
584    ) -> RedDBResult<RuntimeGraphTopologicalSortResult> {
585        let graph =
586            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
587        let ordered_nodes = match DFS::topological_sort(&graph) {
588            Some(order) => order
589                .into_iter()
590                .filter_map(|id| graph.get_node(&id))
591                .map(stored_node_to_runtime)
592                .collect(),
593            None => Vec::new(),
594        };
595
596        Ok(RuntimeGraphTopologicalSortResult {
597            acyclic: !ordered_nodes.is_empty() || graph.node_count() == 0,
598            ordered_nodes,
599        })
600    }
601
602    pub fn graph_properties(
603        &self,
604        projection: Option<RuntimeGraphProjection>,
605    ) -> RedDBResult<RuntimeGraphPropertiesResult> {
606        let graph =
607            materialize_graph_with_projection(self.inner.db.store().as_ref(), projection.as_ref())?;
608        let node_count = graph.node_count() as usize;
609        let edges = graph.iter_all_edges();
610        let edge_count = edges.len();
611
612        let connected = ConnectedComponents::find(&graph);
613        let weak = WeaklyConnectedComponents::find(&graph);
614        let strong = StronglyConnectedComponents::find(&graph);
615        let cycle_result = CycleDetector::new()
616            .max_length(node_count.max(2))
617            .max_cycles(1)
618            .find(&graph);
619
620        let mut self_loop_count = 0usize;
621        let mut negative_edge_count = 0usize;
622        let mut directed_pairs = HashSet::new();
623        let mut undirected_pairs = HashSet::new();
624
625        for edge in &edges {
626            if edge.weight < 0.0 {
627                negative_edge_count += 1;
628            }
629            if edge.source_id == edge.target_id {
630                self_loop_count += 1;
631                continue;
632            }
633
634            directed_pairs.insert((edge.source_id.clone(), edge.target_id.clone()));
635            let (left, right) = if edge.source_id <= edge.target_id {
636                (edge.source_id.clone(), edge.target_id.clone())
637            } else {
638                (edge.target_id.clone(), edge.source_id.clone())
639            };
640            undirected_pairs.insert((left, right));
641        }
642
643        let expected_undirected_pairs = node_count.saturating_mul(node_count.saturating_sub(1)) / 2;
644        let expected_directed_pairs = node_count.saturating_mul(node_count.saturating_sub(1));
645        let density = if expected_undirected_pairs == 0 {
646            0.0
647        } else {
648            undirected_pairs.len() as f64 / expected_undirected_pairs as f64
649        };
650        let density_directed = if expected_directed_pairs == 0 {
651            0.0
652        } else {
653            directed_pairs.len() as f64 / expected_directed_pairs as f64
654        };
655
656        let is_empty = node_count == 0;
657        let is_connected = node_count <= 1 || connected.count == 1;
658        let is_weakly_connected = node_count <= 1 || weak.count == 1;
659        let is_strongly_connected = node_count <= 1 || strong.count == 1;
660        let is_cyclic = !cycle_result.cycles.is_empty();
661
662        Ok(RuntimeGraphPropertiesResult {
663            node_count,
664            edge_count,
665            self_loop_count,
666            negative_edge_count,
667            connected_component_count: connected.count,
668            weak_component_count: weak.count,
669            strong_component_count: strong.count,
670            is_empty,
671            is_connected,
672            is_weakly_connected,
673            is_strongly_connected,
674            is_complete: node_count <= 1 || undirected_pairs.len() == expected_undirected_pairs,
675            is_complete_directed: node_count <= 1
676                || directed_pairs.len() == expected_directed_pairs,
677            is_cyclic,
678            is_circular: is_cyclic,
679            is_acyclic: !is_cyclic,
680            is_tree: node_count > 0 && is_connected && undirected_pairs.len() + 1 == node_count,
681            density,
682            density_directed,
683        })
684    }
685}