Skip to main content

grafeo_core/graph/lpg/store/
traversal.rs

1//! Traversal and iteration methods for the LPG store.
2
3use super::LpgStore;
4use crate::graph::Direction;
5use crate::graph::lpg::{Edge, Node};
6use grafeo_common::types::{EdgeId, NodeId};
7
8impl LpgStore {
9    // === Traversal ===
10
11    /// Iterates over neighbors of a node in the specified direction.
12    ///
13    /// This is the fast path for graph traversal - goes straight to the
14    /// adjacency index without loading full node data.
15    pub fn neighbors(
16        &self,
17        node: NodeId,
18        direction: Direction,
19    ) -> impl Iterator<Item = NodeId> + '_ {
20        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
21            Direction::Outgoing | Direction::Both => {
22                Box::new(self.forward_adj.neighbors(node).into_iter())
23            }
24            Direction::Incoming => Box::new(std::iter::empty()),
25        };
26
27        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
28            Direction::Incoming | Direction::Both => {
29                if let Some(ref adj) = self.backward_adj {
30                    Box::new(adj.neighbors(node).into_iter())
31                } else {
32                    Box::new(std::iter::empty())
33                }
34            }
35            Direction::Outgoing => Box::new(std::iter::empty()),
36        };
37
38        forward.chain(backward)
39    }
40
41    /// Returns edges from a node with their targets.
42    ///
43    /// Returns an iterator of (target_node, edge_id) pairs.
44    pub fn edges_from(
45        &self,
46        node: NodeId,
47        direction: Direction,
48    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
49        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
50            Direction::Outgoing | Direction::Both => {
51                Box::new(self.forward_adj.edges_from(node).into_iter())
52            }
53            Direction::Incoming => Box::new(std::iter::empty()),
54        };
55
56        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
57            Direction::Incoming | Direction::Both => {
58                if let Some(ref adj) = self.backward_adj {
59                    Box::new(adj.edges_from(node).into_iter())
60                } else {
61                    Box::new(std::iter::empty())
62                }
63            }
64            Direction::Outgoing => Box::new(std::iter::empty()),
65        };
66
67        forward.chain(backward)
68    }
69
70    /// Returns edges to a node (where the node is the destination).
71    ///
72    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
73    /// Uses the backward adjacency index for O(degree) lookup.
74    ///
75    /// # Example
76    ///
77    /// ```ignore
78    /// // For edges: A->B, C->B
79    /// let incoming = store.edges_to(B);
80    /// // Returns: [(A, edge1), (C, edge2)]
81    /// ```
82    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
83        if let Some(ref backward) = self.backward_adj {
84            backward.edges_from(node)
85        } else {
86            // Fallback: scan all edges (slow but correct)
87            self.all_edges()
88                .filter_map(|edge| {
89                    if edge.dst == node {
90                        Some((edge.src, edge.id))
91                    } else {
92                        None
93                    }
94                })
95                .collect()
96        }
97    }
98
99    /// Returns the out-degree of a node (number of outgoing edges).
100    ///
101    /// Uses the forward adjacency index for O(1) lookup.
102    #[must_use]
103    pub fn out_degree(&self, node: NodeId) -> usize {
104        self.forward_adj.out_degree(node)
105    }
106
107    /// Returns the in-degree of a node (number of incoming edges).
108    ///
109    /// Uses the backward adjacency index for O(1) lookup if available,
110    /// otherwise falls back to scanning edges.
111    #[must_use]
112    pub fn in_degree(&self, node: NodeId) -> usize {
113        if let Some(ref backward) = self.backward_adj {
114            backward.in_degree(node)
115        } else {
116            // Fallback: count edges (slow)
117            self.all_edges().filter(|edge| edge.dst == node).count()
118        }
119    }
120
121    // === Admin API: Iteration ===
122
123    /// Returns an iterator over all nodes in the database.
124    ///
125    /// This creates a snapshot of all visible nodes at the current epoch.
126    /// Useful for dump/export operations.
127    #[cfg(not(feature = "tiered-storage"))]
128    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
129        let epoch = self.current_epoch();
130        let node_ids: Vec<NodeId> = self
131            .nodes
132            .read()
133            .iter()
134            .filter_map(|(id, chain)| {
135                chain
136                    .visible_at(epoch)
137                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
138            })
139            .collect();
140
141        node_ids.into_iter().filter_map(move |id| self.get_node(id))
142    }
143
144    /// Returns an iterator over all nodes in the database.
145    /// (Tiered storage version)
146    #[cfg(feature = "tiered-storage")]
147    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
148        let node_ids = self.node_ids();
149        node_ids.into_iter().filter_map(move |id| self.get_node(id))
150    }
151
152    /// Returns an iterator over all edges in the database.
153    ///
154    /// This creates a snapshot of all visible edges at the current epoch.
155    /// Useful for dump/export operations.
156    #[cfg(not(feature = "tiered-storage"))]
157    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
158        let epoch = self.current_epoch();
159        let edge_ids: Vec<EdgeId> = self
160            .edges
161            .read()
162            .iter()
163            .filter_map(|(id, chain)| {
164                chain
165                    .visible_at(epoch)
166                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
167            })
168            .collect();
169
170        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
171    }
172
173    /// Returns an iterator over all edges in the database.
174    /// (Tiered storage version)
175    #[cfg(feature = "tiered-storage")]
176    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
177        let epoch = self.current_epoch();
178        let versions = self.edge_versions.read();
179        let edge_ids: Vec<EdgeId> = versions
180            .iter()
181            .filter_map(|(id, index)| {
182                index.visible_at(epoch).and_then(|vref| {
183                    self.read_edge_record(&vref)
184                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
185                })
186            })
187            .collect();
188
189        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
190    }
191
192    /// Returns an iterator over nodes with a specific label.
193    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
194        let node_ids = self.nodes_by_label(label);
195        node_ids.into_iter().filter_map(move |id| self.get_node(id))
196    }
197
198    /// Returns an iterator over edges with a specific type.
199    #[cfg(not(feature = "tiered-storage"))]
200    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
201        let epoch = self.current_epoch();
202        let type_to_id = self.edge_type_to_id.read();
203
204        if let Some(&type_id) = type_to_id.get(edge_type) {
205            let edge_ids: Vec<EdgeId> = self
206                .edges
207                .read()
208                .iter()
209                .filter_map(|(id, chain)| {
210                    chain.visible_at(epoch).and_then(|r| {
211                        if !r.is_deleted() && r.type_id == type_id {
212                            Some(*id)
213                        } else {
214                            None
215                        }
216                    })
217                })
218                .collect();
219
220            // Return a boxed iterator for the found edges
221            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
222                as Box<dyn Iterator<Item = Edge> + 'a>
223        } else {
224            // Return empty iterator
225            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
226        }
227    }
228
229    /// Returns an iterator over edges with a specific type.
230    /// (Tiered storage version)
231    #[cfg(feature = "tiered-storage")]
232    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
233        let epoch = self.current_epoch();
234        let type_to_id = self.edge_type_to_id.read();
235
236        if let Some(&type_id) = type_to_id.get(edge_type) {
237            let versions = self.edge_versions.read();
238            let edge_ids: Vec<EdgeId> = versions
239                .iter()
240                .filter_map(|(id, index)| {
241                    index.visible_at(epoch).and_then(|vref| {
242                        self.read_edge_record(&vref).and_then(|r| {
243                            if !r.is_deleted() && r.type_id == type_id {
244                                Some(*id)
245                            } else {
246                                None
247                            }
248                        })
249                    })
250                })
251                .collect();
252
253            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
254                as Box<dyn Iterator<Item = Edge> + 'a>
255        } else {
256            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
257        }
258    }
259}