Skip to main content

oxgraph_postgres/
build.rs

1//! SPI-agnostic relational ingest into OXGTOPO snapshots.
2
3use alloc::{
4    collections::{BTreeMap, BTreeSet},
5    vec::Vec,
6};
7
8use oxgraph_csr::build::{GraphBuilder, GraphNodeId, export_csr_snapshot};
9use oxgraph_graph::OutgoingNeighborsGraph;
10
11use crate::{
12    artifact::{PostgresMetadata, attach_postgres_sections},
13    catalog::{NodeKey, RegisteredEdge},
14    error::{BuildError, PostgresGraphError},
15};
16
17/// Builds the dense node-id assignment used by [`DualTopologySnapshot::from_edge_rows`].
18///
19/// Keys are sorted in ascending [`NodeKey`] order; dense ids are `0..key_count`.
20///
21/// # Errors
22///
23/// Returns [`BuildError::NodeCountOverflow`] when the distinct key count does not fit in `u32`.
24///
25/// # Performance
26///
27/// This function is `O(n log n + m)` for `n` distinct keys and `m` edges.
28pub fn dense_node_map_from_edges(edges: &[EdgeRow]) -> Result<BTreeMap<NodeKey, u32>, BuildError> {
29    let mut keys = BTreeSet::new();
30    for edge in edges {
31        keys.insert(edge.source);
32        keys.insert(edge.target);
33    }
34    let mut map = BTreeMap::new();
35    for (index, key) in keys.into_iter().enumerate() {
36        let dense = u32::try_from(index).map_err(|_| BuildError::NodeCountOverflow)?;
37        map.insert(key, dense);
38    }
39    Ok(map)
40}
41
42/// Maps scanned SQL primary-key values to an [`EdgeRow`] for one registered edge mapping.
43///
44/// # Performance
45///
46/// This function is `O(1)`.
47#[must_use]
48pub const fn edge_row_from_scan(edge: &RegisteredEdge, source_pk: u64, target_pk: u64) -> EdgeRow {
49    EdgeRow {
50        source: NodeKey::registered(edge.source_table, source_pk),
51        target: NodeKey::registered(edge.target_table, target_pk),
52    }
53}
54
55/// One edge endpoint pair supplied by a caller scanning source tables.
56#[derive(Clone, Copy, Debug, PartialEq, Eq)]
57pub struct EdgeRow {
58    /// Source node key.
59    pub source: NodeKey,
60    /// Target node key.
61    pub target: NodeKey,
62}
63
/// Builder namespace for snapshot bytes that carry a forward CSR export, an inbound (reversed)
/// export, and Postgres metadata.
///
/// The type itself holds no data; its associated functions return the encoded bytes.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DualTopologySnapshot;
67
68impl DualTopologySnapshot {
69    /// Builds OXGTOPO bytes from scanned edge rows.
70    ///
71    /// Node indices are assigned in ascending [`NodeKey`] order. Catalog registration is
72    /// validated separately via [`crate::Catalog::validate_for_build`] or
73    /// [`crate::SnapshotRebuild::from_catalog_and_edges`].
74    ///
75    /// # Errors
76    ///
77    /// Returns [`PostgresGraphError::Build`] when edge rows are empty or graph construction fails.
78    ///
79    /// # Performance
80    ///
81    /// This function is `O(n log n + m)` where `n` is distinct nodes and `m` is edges.
82    pub fn from_edge_rows(
83        edges: &[EdgeRow],
84        built_at_unix: u64,
85    ) -> Result<Vec<u8>, PostgresGraphError> {
86        if edges.is_empty() {
87            return Err(BuildError::EmptyEdges.into());
88        }
89
90        let mut keys: BTreeSet<NodeKey> = BTreeSet::new();
91        for edge in edges {
92            keys.insert(edge.source);
93            keys.insert(edge.target);
94        }
95
96        let mut builder = GraphBuilder::<u32, u32>::new();
97        let mut key_to_node: BTreeMap<NodeKey, GraphNodeId<u32>> = BTreeMap::new();
98        for key in &keys {
99            let node = builder.add_node()?;
100            key_to_node.insert(*key, node);
101        }
102        for edge in edges {
103            let source = *key_to_node
104                .get(&edge.source)
105                .ok_or(BuildError::MissingNodeKey)?;
106            let target = *key_to_node
107                .get(&edge.target)
108                .ok_or(BuildError::MissingNodeKey)?;
109            builder.add_edge(source, target)?;
110        }
111
112        let frozen = builder.freeze()?;
113        let node_count = u32::try_from(keys.len()).map_err(|_| BuildError::NodeCountOverflow)?;
114        let edge_count =
115            u32::try_from(frozen.edge_ids().len()).map_err(|_| BuildError::EdgeCountOverflow)?;
116
117        let mut inbound_builder = GraphBuilder::<u32, u32>::new();
118        for _ in 0..keys.len() {
119            inbound_builder.add_node()?;
120        }
121        for source in 0..keys.len() {
122            let source_id =
123                GraphNodeId(u32::try_from(source).map_err(|_| BuildError::NodeCountOverflow)?);
124            for target in frozen.outgoing_neighbors(source_id) {
125                inbound_builder.add_edge(target, source_id)?;
126            }
127        }
128        let inbound_frozen = inbound_builder.freeze()?;
129        if inbound_frozen.edge_ids().len() != frozen.edge_ids().len() {
130            return Err(BuildError::EdgeCountMismatch.into());
131        }
132
133        let forward_bytes = export_csr_snapshot(&frozen)?;
134        let inbound_bytes = export_csr_snapshot(&inbound_frozen)?;
135        let metadata =
136            PostgresMetadata::new(node_count, edge_count, built_at_unix, true).with_reverse_index();
137        attach_postgres_sections(&forward_bytes, Some(&inbound_bytes), &metadata)
138    }
139
140    /// Builds OXGTOPO bytes from dense `0..node_count-1` node ids (tests and benches).
141    ///
142    /// # Errors
143    ///
144    /// Returns [`PostgresGraphError::Build`] when `edges` is empty or construction fails.
145    ///
146    /// # Performance
147    ///
148    /// This function is `O(n + m)` for `n` nodes and `m` edges.
149    pub fn from_dense_u32_edges(
150        edges: &[(u32, u32)],
151        built_at_unix: u64,
152    ) -> Result<Vec<u8>, PostgresGraphError> {
153        if edges.is_empty() {
154            return Err(BuildError::EmptyEdges.into());
155        }
156        let max_index = edges
157            .iter()
158            .flat_map(|(source, target)| [*source, *target])
159            .max()
160            .unwrap_or(0);
161        Self::from_dense_u32_edges_with_node_count(max_index + 1, edges, built_at_unix)
162    }
163
164    /// Builds OXGTOPO bytes with an explicit node count (tests with isolated vertices).
165    ///
166    /// # Errors
167    ///
168    /// Returns [`PostgresGraphError::Build`] when `node_count` is zero, an edge endpoint is out of
169    /// range, or construction fails.
170    ///
171    /// # Performance
172    ///
173    /// This function is `O(n + m)` for `n` nodes and `m` edges.
174    pub fn from_dense_u32_edges_with_node_count(
175        node_count: u32,
176        edges: &[(u32, u32)],
177        built_at_unix: u64,
178    ) -> Result<Vec<u8>, PostgresGraphError> {
179        if node_count == 0 {
180            return Err(BuildError::EmptyEdges.into());
181        }
182        let node_count_usize = node_count as usize;
183        for &(source, target) in edges {
184            if source >= node_count || target >= node_count {
185                return Err(BuildError::MissingNodeKey.into());
186            }
187        }
188
189        let mut forward_builder = GraphBuilder::<u32, u32>::new();
190        for _ in 0..node_count_usize {
191            forward_builder.add_node()?;
192        }
193        for &(source, target) in edges {
194            forward_builder.add_edge(GraphNodeId(source), GraphNodeId(target))?;
195        }
196        let forward_frozen = forward_builder.freeze()?;
197        let edge_count = u32::try_from(forward_frozen.edge_ids().len())
198            .map_err(|_| BuildError::EdgeCountOverflow)?;
199
200        let mut inbound_builder = GraphBuilder::<u32, u32>::new();
201        for _ in 0..node_count_usize {
202            inbound_builder.add_node()?;
203        }
204        for &(source, target) in edges {
205            inbound_builder.add_edge(GraphNodeId(target), GraphNodeId(source))?;
206        }
207        let inbound_frozen = inbound_builder.freeze()?;
208        if inbound_frozen.edge_ids().len() != forward_frozen.edge_ids().len() {
209            return Err(BuildError::EdgeCountMismatch.into());
210        }
211
212        let forward_bytes = export_csr_snapshot(&forward_frozen)?;
213        let inbound_bytes = export_csr_snapshot(&inbound_frozen)?;
214        let metadata =
215            PostgresMetadata::new(node_count, edge_count, built_at_unix, true).with_reverse_index();
216        attach_postgres_sections(&forward_bytes, Some(&inbound_bytes), &metadata)
217    }
218}
219
220/// Estimates node and edge counts for a build without exporting a snapshot.
221///
222/// # Performance
223///
224/// This function is `O(m)`.
225#[must_use]
226pub fn estimate_build(edges: &[EdgeRow]) -> BuildEstimate {
227    let mut keys = BTreeSet::new();
228    for edge in edges {
229        keys.insert(edge.source);
230        keys.insert(edge.target);
231    }
232    BuildEstimate {
233        node_count: keys.len(),
234        edge_count: edges.len(),
235    }
236}
237
/// Node and edge counts reported by [`estimate_build`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BuildEstimate {
    /// Number of distinct node keys seen across all edge endpoints.
    pub node_count: usize,
    /// Number of edge rows supplied, duplicates included.
    pub edge_count: usize,
}