Skip to main content

grafeo_core/graph/lpg/store/
traversal.rs

1//! Traversal and iteration methods for the LPG store.
2
3use super::LpgStore;
4use crate::graph::Direction;
5use crate::graph::lpg::{Edge, Node};
6use grafeo_common::types::{EdgeId, NodeId};
7
8impl LpgStore {
9    // === Traversal ===
10
11    /// Iterates over neighbors of a node in the specified direction.
12    ///
13    /// This is the fast path for graph traversal - goes straight to the
14    /// adjacency index without loading full node data.
15    pub fn neighbors(
16        &self,
17        node: NodeId,
18        direction: Direction,
19    ) -> impl Iterator<Item = NodeId> + '_ {
20        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
21            Direction::Outgoing | Direction::Both => {
22                Box::new(self.forward_adj.neighbors(node).into_iter())
23            }
24            Direction::Incoming => Box::new(std::iter::empty()),
25        };
26
27        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
28            Direction::Incoming | Direction::Both => {
29                if let Some(ref adj) = self.backward_adj {
30                    Box::new(adj.neighbors(node).into_iter())
31                } else {
32                    Box::new(std::iter::empty())
33                }
34            }
35            Direction::Outgoing => Box::new(std::iter::empty()),
36        };
37
38        forward.chain(backward)
39    }
40
41    /// Returns edges from a node with their targets.
42    ///
43    /// Returns an iterator of (target_node, edge_id) pairs.
44    pub fn edges_from(
45        &self,
46        node: NodeId,
47        direction: Direction,
48    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
49        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
50            Direction::Outgoing | Direction::Both => {
51                Box::new(self.forward_adj.edges_from(node).into_iter())
52            }
53            Direction::Incoming => Box::new(std::iter::empty()),
54        };
55
56        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
57            Direction::Incoming | Direction::Both => {
58                if let Some(ref adj) = self.backward_adj {
59                    Box::new(adj.edges_from(node).into_iter())
60                } else {
61                    Box::new(std::iter::empty())
62                }
63            }
64            Direction::Outgoing => Box::new(std::iter::empty()),
65        };
66
67        forward.chain(backward)
68    }
69
70    /// Returns edges to a node (where the node is the destination).
71    ///
72    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
73    /// Uses the backward adjacency index for O(degree) lookup.
74    ///
75    /// # Example
76    ///
77    /// ```
78    /// # use grafeo_core::graph::lpg::LpgStore;
79    /// # use grafeo_common::types::Value;
80    /// let store = LpgStore::new();
81    /// let a = store.create_node(&["Node"]);
82    /// let b = store.create_node(&["Node"]);
83    /// let c = store.create_node(&["Node"]);
84    /// let _e1 = store.create_edge(a, b, "LINKS");
85    /// let _e2 = store.create_edge(c, b, "LINKS");
86    ///
87    /// // For edges: A->B, C->B
88    /// let incoming = store.edges_to(b);
89    /// assert_eq!(incoming.len(), 2);
90    /// ```
91    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
92        if let Some(ref backward) = self.backward_adj {
93            backward.edges_from(node)
94        } else {
95            // Fallback: scan all edges (slow but correct)
96            self.all_edges()
97                .filter_map(|edge| {
98                    if edge.dst == node {
99                        Some((edge.src, edge.id))
100                    } else {
101                        None
102                    }
103                })
104                .collect()
105        }
106    }
107
108    /// Returns the out-degree of a node (number of outgoing edges).
109    ///
110    /// Uses the forward adjacency index for O(1) lookup.
111    #[must_use]
112    pub fn out_degree(&self, node: NodeId) -> usize {
113        self.forward_adj.out_degree(node)
114    }
115
116    /// Returns the in-degree of a node (number of incoming edges).
117    ///
118    /// Uses the backward adjacency index for O(1) lookup if available,
119    /// otherwise falls back to scanning edges.
120    #[must_use]
121    pub fn in_degree(&self, node: NodeId) -> usize {
122        if let Some(ref backward) = self.backward_adj {
123            backward.in_degree(node)
124        } else {
125            // Fallback: count edges (slow)
126            self.all_edges().filter(|edge| edge.dst == node).count()
127        }
128    }
129
130    // === Admin API: Iteration ===
131
132    /// Returns an iterator over all nodes in the database.
133    ///
134    /// This creates a snapshot of all visible nodes at the current epoch.
135    /// Useful for dump/export operations.
136    #[cfg(not(feature = "tiered-storage"))]
137    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
138        let epoch = self.current_epoch();
139        let node_ids: Vec<NodeId> = self
140            .nodes
141            .read()
142            .iter()
143            .filter_map(|(id, chain)| {
144                chain
145                    .visible_at(epoch)
146                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
147            })
148            .collect();
149
150        node_ids.into_iter().filter_map(move |id| self.get_node(id))
151    }
152
153    /// Returns an iterator over all nodes in the database.
154    /// (Tiered storage version)
155    #[cfg(feature = "tiered-storage")]
156    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
157        let node_ids = self.node_ids();
158        node_ids.into_iter().filter_map(move |id| self.get_node(id))
159    }
160
161    /// Returns an iterator over all edges in the database.
162    ///
163    /// This creates a snapshot of all visible edges at the current epoch.
164    /// Useful for dump/export operations.
165    #[cfg(not(feature = "tiered-storage"))]
166    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
167        let epoch = self.current_epoch();
168        let edge_ids: Vec<EdgeId> = self
169            .edges
170            .read()
171            .iter()
172            .filter_map(|(id, chain)| {
173                chain
174                    .visible_at(epoch)
175                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
176            })
177            .collect();
178
179        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
180    }
181
182    /// Returns an iterator over all edges in the database.
183    /// (Tiered storage version)
184    #[cfg(feature = "tiered-storage")]
185    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
186        let epoch = self.current_epoch();
187        let versions = self.edge_versions.read();
188        let edge_ids: Vec<EdgeId> = versions
189            .iter()
190            .filter_map(|(id, index)| {
191                index.visible_at(epoch).and_then(|vref| {
192                    self.read_edge_record(&vref)
193                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
194                })
195            })
196            .collect();
197
198        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
199    }
200
201    /// Returns an iterator over nodes with a specific label.
202    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
203        let node_ids = self.nodes_by_label(label);
204        node_ids.into_iter().filter_map(move |id| self.get_node(id))
205    }
206
207    /// Returns an iterator over edges with a specific type.
208    #[cfg(not(feature = "tiered-storage"))]
209    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
210        let epoch = self.current_epoch();
211        let type_to_id = self.edge_type_to_id.read();
212
213        if let Some(&type_id) = type_to_id.get(edge_type) {
214            let edge_ids: Vec<EdgeId> = self
215                .edges
216                .read()
217                .iter()
218                .filter_map(|(id, chain)| {
219                    chain.visible_at(epoch).and_then(|r| {
220                        if !r.is_deleted() && r.type_id == type_id {
221                            Some(*id)
222                        } else {
223                            None
224                        }
225                    })
226                })
227                .collect();
228
229            // Return a boxed iterator for the found edges
230            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
231                as Box<dyn Iterator<Item = Edge> + 'a>
232        } else {
233            // Return empty iterator
234            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
235        }
236    }
237
238    /// Returns an iterator over edges with a specific type.
239    /// (Tiered storage version)
240    #[cfg(feature = "tiered-storage")]
241    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
242        let epoch = self.current_epoch();
243        let type_to_id = self.edge_type_to_id.read();
244
245        if let Some(&type_id) = type_to_id.get(edge_type) {
246            let versions = self.edge_versions.read();
247            let edge_ids: Vec<EdgeId> = versions
248                .iter()
249                .filter_map(|(id, index)| {
250                    index.visible_at(epoch).and_then(|vref| {
251                        self.read_edge_record(&vref).and_then(|r| {
252                            if !r.is_deleted() && r.type_id == type_id {
253                                Some(*id)
254                            } else {
255                                None
256                            }
257                        })
258                    })
259                })
260                .collect();
261
262            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
263                as Box<dyn Iterator<Item = Edge> + 'a>
264        } else {
265            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
266        }
267    }
268}