Skip to main content

oxgraph_postgres/
topology.rs

1//! Typed forward CSR and inbound CSC views opened once per engine.
2
3use alloc::vec::Vec;
4
5use oxgraph_csc::CscSnapshotGraph;
6use oxgraph_csr::{CsrNodeId, CsrSnapshotGraph};
7use oxgraph_graph::{EdgeTargetGraph, OutgoingGraph, OutgoingNeighborsGraph, TopologyCounts};
8use oxgraph_snapshot::Snapshot;
9
10use crate::{
11    artifact::read_metadata,
12    error::{BuildError, PostgresGraphError},
13    overlay::OverlayState,
14};
15
/// Outgoing adjacency — foundation CSR topology sections only.
///
/// Newtype over [`CsrSnapshotGraph`] instantiated with `u32` for both of its type parameters.
/// The wrapped graph is visible only inside this crate; the wrapper is `Copy`, so it is passed
/// around by value.
#[derive(Clone, Copy, Debug)]
pub struct ForwardCsr<'view>(pub(crate) CsrSnapshotGraph<'view, u32, u32>);
19
/// Incoming adjacency — Postgres inbound CSC sections only.
///
/// Newtype over [`CscSnapshotGraph`]. The wrapped graph is visible only inside this crate; the
/// wrapper is `Copy`, so it is passed around by value.
#[derive(Clone, Copy, Debug)]
pub struct InboundCsc<'view>(pub(crate) CscSnapshotGraph<'view>);
23
/// Both topology views borrowing the same snapshot backing.
///
/// The two fields share the `'view` lifetime, so neither can outlive the bytes they were opened
/// from. The `yoke::Yokeable` derive presumably lets the pair travel together with the owned
/// snapshot storage — NOTE(review): confirm against the engine module that holds the yoke.
#[derive(Clone, Copy, Debug, yoke::Yokeable)]
#[yoke(prove_covariant)]
pub struct GraphTopology<'view> {
    /// Forward CSR over outgoing edges.
    pub forward: ForwardCsr<'view>,
    /// Inbound CSC over predecessor lists.
    pub inbound: InboundCsc<'view>,
}
33
34/// Hot topology slice views for BFS (assembled once per query).
35#[derive(Clone, Copy, Debug)]
36#[expect(
37    clippy::redundant_pub_crate,
38    reason = "shared with traverse session module"
39)]
40pub(crate) struct TopologyHot<'view> {
41    /// Forward CSR for outgoing expansion.
42    pub forward: ForwardCsr<'view>,
43    /// Inbound CSC for incoming expansion.
44    pub inbound: InboundCsc<'view>,
45}
46
47impl<'view> TopologyHot<'view> {
48    /// Builds hot views from opened engine topology.
49    ///
50    /// # Performance
51    ///
52    /// This method is `O(1)`.
53    #[must_use]
54    pub(crate) const fn from_topology(topology: &GraphTopology<'view>) -> Self {
55        Self {
56            forward: topology.forward,
57            inbound: topology.inbound,
58        }
59    }
60}
61
62/// Engine-local node-unique adjacency derived from the parallel base topology.
63#[derive(Clone, Debug, Default, PartialEq, Eq)]
64#[expect(
65    clippy::redundant_pub_crate,
66    reason = "shared with engine and traversal modules"
67)]
68pub(crate) struct UniqueAdjacency {
69    /// Outgoing row offsets into [`Self::outgoing_targets`].
70    outgoing_offsets: Vec<usize>,
71    /// Sorted, deduplicated outgoing target ids.
72    outgoing_targets: Vec<u32>,
73    /// Incoming row offsets into [`Self::incoming_sources`].
74    incoming_offsets: Vec<usize>,
75    /// Sorted, deduplicated incoming predecessor ids.
76    incoming_sources: Vec<u32>,
77}
78
79impl UniqueAdjacency {
80    /// Builds node-unique outgoing and incoming adjacency from base topology views.
81    ///
82    /// # Performance
83    ///
84    /// This method is `O(n + m log d)` for `n` nodes, `m` parallel edge slots, and maximum
85    /// per-node degree `d`; it allocates `O(n + u)` memory for `u` unique adjacency slots.
86    #[must_use]
87    pub(crate) fn from_topology(forward: &ForwardCsr<'_>, inbound: &InboundCsc<'_>) -> Self {
88        let node_count = forward.node_count();
89        let (outgoing_offsets, outgoing_targets) =
90            Self::build_unique_rows(node_count, |node| forward.successors(node));
91        let (incoming_offsets, incoming_sources) =
92            Self::build_unique_rows(node_count, |node| inbound.predecessors(node));
93        Self {
94            outgoing_offsets,
95            outgoing_targets,
96            incoming_offsets,
97            incoming_sources,
98        }
99    }
100
101    /// Builds sorted, deduplicated rows from a parallel adjacency row iterator.
102    fn build_unique_rows<I>(
103        node_count: usize,
104        mut neighbors: impl FnMut(u32) -> I,
105    ) -> (Vec<usize>, Vec<u32>)
106    where
107        I: Iterator<Item = u32>,
108    {
109        let mut offsets = Vec::with_capacity(node_count.saturating_add(1));
110        let mut targets = Vec::new();
111        let mut scratch = Vec::new();
112        offsets.push(0);
113        let Ok(node_bound) = u32::try_from(node_count) else {
114            return (offsets, targets);
115        };
116        for node_id in 0..node_bound {
117            scratch.clear();
118            scratch.extend(neighbors(node_id));
119            scratch.sort_unstable();
120            scratch.dedup();
121            targets.extend_from_slice(&scratch);
122            offsets.push(targets.len());
123        }
124        (offsets, targets)
125    }
126
127    /// Returns the row slice for `node`, or an empty slice when out of bounds.
128    fn row<'a>(offsets: &[usize], targets: &'a [u32], node: u32) -> &'a [u32] {
129        let index = node as usize;
130        let Some((&start, &end)) = offsets.get(index).zip(offsets.get(index.saturating_add(1)))
131        else {
132            return &[];
133        };
134        &targets[start..end]
135    }
136
137    /// Returns sorted, node-unique outgoing targets for `source`.
138    ///
139    /// # Performance
140    ///
141    /// This method is `O(1)` to borrow the row.
142    #[must_use]
143    pub(crate) fn outgoing(&self, source: u32) -> &[u32] {
144        Self::row(&self.outgoing_offsets, &self.outgoing_targets, source)
145    }
146
147    /// Returns sorted, node-unique incoming predecessors for `target`.
148    ///
149    /// # Performance
150    ///
151    /// This method is `O(1)` to borrow the row.
152    #[must_use]
153    pub(crate) fn incoming(&self, target: u32) -> &[u32] {
154        Self::row(&self.incoming_offsets, &self.incoming_sources, target)
155    }
156}
157
158impl GraphTopology<'_> {
159    /// Returns whether `node` is visible for traversal under `direction`.
160    #[must_use]
161    pub(crate) fn node_visible(
162        &self,
163        node: u32,
164        direction: crate::traverse::TraversalDirection,
165        overlay: &OverlayState,
166    ) -> bool {
167        match direction {
168            crate::traverse::TraversalDirection::Out => self.forward.node_visible(node, overlay),
169            crate::traverse::TraversalDirection::In => self.inbound.node_visible(node, overlay),
170        }
171    }
172}
173
174impl<'view> GraphTopology<'view> {
175    /// Opens both layouts from validated snapshot bytes.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`PostgresGraphError`] when metadata, sections, or cross-layout counts disagree.
180    ///
181    /// # Performance
182    ///
183    /// This function is `O(s + n + m)`.
184    pub fn open(snapshot: &Snapshot<'view>) -> Result<Self, PostgresGraphError> {
185        let metadata = read_metadata(snapshot)?;
186        if !metadata.has_reverse_index() {
187            return Err(PostgresGraphError::Build(BuildError::MissingReverseIndex));
188        }
189        let forward = ForwardCsr(CsrSnapshotGraph::from_snapshot(snapshot)?);
190        let inbound = InboundCsc(CscSnapshotGraph::from_snapshot(snapshot)?);
191        if forward.0.element_count() != inbound.0.node_count() {
192            return Err(PostgresGraphError::Build(
193                BuildError::TopologyNodeCountMismatch,
194            ));
195        }
196        if forward.0.relation_count() != inbound.0.relation_count() {
197            return Err(PostgresGraphError::Build(
198                BuildError::TopologyEdgeCountMismatch,
199            ));
200        }
201        if u32::try_from(forward.0.element_count()).ok() != Some(metadata.node_count.get()) {
202            return Err(PostgresGraphError::Build(
203                BuildError::MetadataNodeCountMismatch,
204            ));
205        }
206        if u32::try_from(forward.0.relation_count()).ok() != Some(metadata.edge_count.get()) {
207            return Err(PostgresGraphError::Build(
208                BuildError::MetadataEdgeCountMismatch,
209            ));
210        }
211        Ok(Self { forward, inbound })
212    }
213}
214
215impl ForwardCsr<'_> {
216    /// Returns the node count in this forward view.
217    ///
218    /// # Performance
219    ///
220    /// This method is `O(1)`.
221    #[must_use]
222    pub fn node_count(&self) -> usize {
223        self.0.element_count()
224    }
225
226    /// Returns successor node ids for `source`.
227    ///
228    /// # Performance
229    ///
230    /// This method is `O(1)` to create and `O(k)` to yield `k` successors.
231    #[must_use]
232    pub fn successors(&self, source: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
233        self.0.outgoing_neighbors(CsrNodeId(source)).map(|id| id.0)
234    }
235
236    /// Returns whether `node` is visible for traversal seeds and results.
237    #[must_use]
238    pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
239        (node as usize) < self.node_count()
240            && (!overlay.has_node_tombstones() || overlay.node_visible(node))
241    }
242
243    /// Walks outgoing target node ids for `source` via the CSR target slice.
244    pub(crate) fn for_each_out_target(
245        &self,
246        source: u32,
247        mut visit: impl FnMut(u32) -> bool,
248    ) -> bool {
249        self.0
250            .for_each_out_target(CsrNodeId(source), |id| visit(id.0))
251    }
252
253    /// Walks parallel outgoing `(target, edge_id)` slots for `source`.
254    ///
255    /// Stops early when `visit` returns `true`.
256    ///
257    /// # Performance
258    ///
259    /// This method is `O(k)` for `k` outgoing edges.
260    pub(crate) fn for_each_out_edge(
261        &self,
262        source: u32,
263        mut visit: impl FnMut(u32, u32) -> bool,
264    ) -> bool {
265        let graph = &self.0;
266        for edge in OutgoingGraph::outgoing_edges(graph, CsrNodeId(source)) {
267            let target = EdgeTargetGraph::target(graph, edge).0;
268            if visit(target, edge.0) {
269                return true;
270            }
271        }
272        false
273    }
274}
275
276impl InboundCsc<'_> {
277    /// Returns the node count in this inbound view.
278    ///
279    /// # Performance
280    ///
281    /// This method is `O(1)`.
282    #[must_use]
283    pub fn node_count(&self) -> usize {
284        self.0.node_count()
285    }
286
287    /// Returns predecessor node ids for `target`.
288    ///
289    /// # Performance
290    ///
291    /// This method is `O(1)` to create and `O(k)` to yield `k` predecessors.
292    #[must_use]
293    pub fn predecessors(&self, target: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
294        self.0.predecessors(target)
295    }
296
297    /// Returns whether `node` is visible for traversal seeds and results.
298    #[must_use]
299    pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
300        (node as usize) < self.node_count()
301            && (!overlay.has_node_tombstones() || overlay.node_visible(node))
302    }
303
304    /// Walks predecessor node ids for `target` via the CSC source slice.
305    ///
306    /// Stops early when `visit` returns `true`.
307    ///
308    /// # Performance
309    ///
310    /// This method is `O(k)` for `k` predecessors with no iterator adapters.
311    pub(crate) fn for_each_in_source(&self, target: u32, visit: impl FnMut(u32) -> bool) -> bool {
312        self.0.for_each_predecessor(target, visit)
313    }
314}