Skip to main content

oxgraph_postgres/
build.rs

1//! SPI-agnostic relational ingest into OXGTOPO snapshots.
2
3use alloc::{
4    collections::{BTreeMap, BTreeSet},
5    vec::Vec,
6};
7
8use oxgraph_csr::build::{GraphBuilder, GraphNodeId, export_csr_snapshot};
9
10use crate::{
11    artifact::{PostgresMetadata, attach_postgres_sections},
12    catalog::{NodeKey, RegisteredEdge},
13    error::{BuildError, PostgresGraphError},
14};
15
16/// Collects the distinct endpoint [`NodeKey`]s referenced by `edges`.
17///
18/// Shared by [`dense_node_map_from_edges`], [`DualTopologySnapshot::from_edge_rows`],
19/// and [`estimate_build`] so the endpoint-collection step lives in one place.
20///
21/// # Performance
22///
23/// This function is `O(m log n)` for `m` edges and `n` distinct keys.
24#[expect(
25    clippy::redundant_pub_crate,
26    reason = "shared build/sync helper in a private module"
27)]
28pub(crate) fn distinct_node_keys(edges: &[EdgeRow]) -> BTreeSet<NodeKey> {
29    let mut keys = BTreeSet::new();
30    for edge in edges {
31        keys.insert(edge.source);
32        keys.insert(edge.target);
33    }
34    keys
35}
36
37/// Assigns dense `0..key_count` ids to a sorted set of distinct node keys.
38///
39/// This is the single dense-assignment primitive: [`dense_node_map_from_edges`]
40/// and [`crate::sync::dense_node_map_for_sync_resolution`] both funnel their key
41/// sets through it (the sync path seeds extra keys from keyed sync rows first).
42///
43/// # Errors
44///
45/// Returns [`BuildError::NodeCountOverflow`] when the distinct key count does not fit in `u32`.
46///
47/// # Performance
48///
49/// This function is `O(n)` for `n` distinct keys.
50#[expect(
51    clippy::redundant_pub_crate,
52    reason = "shared build/sync dense-assignment primitive in a private module"
53)]
54pub(crate) fn dense_node_map_from_keys(
55    keys: BTreeSet<NodeKey>,
56) -> Result<BTreeMap<NodeKey, u32>, BuildError> {
57    let mut map = BTreeMap::new();
58    for (index, key) in keys.into_iter().enumerate() {
59        let dense = u32::try_from(index).map_err(|_| BuildError::NodeCountOverflow)?;
60        map.insert(key, dense);
61    }
62    Ok(map)
63}
64
65/// Builds the dense node-id assignment used by [`DualTopologySnapshot::from_edge_rows`].
66///
67/// Keys are sorted in ascending [`NodeKey`] order; dense ids are `0..key_count`.
68///
69/// # Errors
70///
71/// Returns [`BuildError::NodeCountOverflow`] when the distinct key count does not fit in `u32`.
72///
73/// # Performance
74///
75/// This function is `O(n log n + m)` for `n` distinct keys and `m` edges.
76pub fn dense_node_map_from_edges(edges: &[EdgeRow]) -> Result<BTreeMap<NodeKey, u32>, BuildError> {
77    dense_node_map_from_keys(distinct_node_keys(edges))
78}
79
80/// Maps scanned SQL primary-key values to an [`EdgeRow`] for one registered edge mapping.
81///
82/// # Performance
83///
84/// This function is `O(1)`.
85#[must_use]
86pub const fn edge_row_from_scan(edge: &RegisteredEdge, source_pk: u64, target_pk: u64) -> EdgeRow {
87    EdgeRow {
88        source: NodeKey::registered(edge.source_table, source_pk),
89        target: NodeKey::registered(edge.target_table, target_pk),
90    }
91}
92
93/// One edge endpoint pair supplied by a caller scanning source tables.
94#[derive(Clone, Copy, Debug, PartialEq, Eq)]
95pub struct EdgeRow {
96    /// Source node key.
97    pub source: NodeKey,
98    /// Target node key.
99    pub target: NodeKey,
100}
101
102/// Forward CSR plus inbound CSC snapshot bytes with Postgres metadata.
103#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
104pub struct DualTopologySnapshot;
105
106impl DualTopologySnapshot {
107    /// Builds OXGTOPO bytes from scanned edge rows.
108    ///
109    /// Node indices are assigned in ascending [`NodeKey`] order. Catalog registration is
110    /// validated separately via [`crate::Catalog::validate_for_build`] or
111    /// [`crate::SnapshotRebuild::from_catalog_and_edges`].
112    ///
113    /// # Errors
114    ///
115    /// Returns [`PostgresGraphError::Build`] when edge rows are empty or graph construction fails.
116    ///
117    /// # Performance
118    ///
119    /// This function is `O(n log n + m)` where `n` is distinct nodes and `m` is edges.
120    pub fn from_edge_rows(
121        edges: &[EdgeRow],
122        built_at_unix: u64,
123    ) -> Result<Vec<u8>, PostgresGraphError> {
124        if edges.is_empty() {
125            return Err(BuildError::EmptyEdges.into());
126        }
127
128        let key_to_dense = dense_node_map_from_keys(distinct_node_keys(edges))?;
129        let key_count = key_to_dense.len();
130
131        let mut builder = GraphBuilder::<u32, u32>::new();
132        for _ in 0..key_count {
133            builder.add_node()?;
134        }
135        for edge in edges {
136            let source = *key_to_dense
137                .get(&edge.source)
138                .ok_or(BuildError::MissingNodeKey)?;
139            let target = *key_to_dense
140                .get(&edge.target)
141                .ok_or(BuildError::MissingNodeKey)?;
142            builder.add_edge(GraphNodeId::new(source), GraphNodeId::new(target))?;
143        }
144
145        let frozen = builder.freeze()?;
146        let node_count = u32::try_from(key_count).map_err(|_| BuildError::NodeCountOverflow)?;
147        let edge_count =
148            u32::try_from(frozen.edge_ids().len()).map_err(|_| BuildError::EdgeCountOverflow)?;
149
150        let inbound_frozen = frozen.transpose()?;
151        let forward_bytes = export_csr_snapshot(&frozen)?;
152        let inbound_bytes = export_csr_snapshot(&inbound_frozen)?;
153        let metadata =
154            PostgresMetadata::new(node_count, edge_count, built_at_unix, true).with_reverse_index();
155        attach_postgres_sections(&forward_bytes, Some(&inbound_bytes), &metadata)
156    }
157
158    /// Builds OXGTOPO bytes from dense `0..node_count-1` node ids (tests and benches).
159    ///
160    /// # Errors
161    ///
162    /// Returns [`PostgresGraphError::Build`] when `edges` is empty or construction fails.
163    ///
164    /// # Performance
165    ///
166    /// This function is `O(n + m)` for `n` nodes and `m` edges.
167    pub fn from_dense_u32_edges(
168        edges: &[(u32, u32)],
169        built_at_unix: u64,
170    ) -> Result<Vec<u8>, PostgresGraphError> {
171        if edges.is_empty() {
172            return Err(BuildError::EmptyEdges.into());
173        }
174        let max_index = edges
175            .iter()
176            .flat_map(|(source, target)| [*source, *target])
177            .max()
178            .unwrap_or(0);
179        let node_count = max_index
180            .checked_add(1)
181            .ok_or(BuildError::NodeCountOverflow)?;
182        Self::from_dense_u32_edges_with_node_count(node_count, edges, built_at_unix)
183    }
184
185    /// Builds OXGTOPO bytes with an explicit node count (tests with isolated vertices).
186    ///
187    /// # Errors
188    ///
189    /// Returns [`PostgresGraphError::Build`] when `node_count` is zero, an edge endpoint is out of
190    /// range, or construction fails.
191    ///
192    /// # Performance
193    ///
194    /// This function is `O(n + m)` for `n` nodes and `m` edges.
195    pub fn from_dense_u32_edges_with_node_count(
196        node_count: u32,
197        edges: &[(u32, u32)],
198        built_at_unix: u64,
199    ) -> Result<Vec<u8>, PostgresGraphError> {
200        if node_count == 0 {
201            return Err(BuildError::EmptyEdges.into());
202        }
203        let node_count_usize = node_count as usize;
204        for &(source, target) in edges {
205            if source >= node_count || target >= node_count {
206                return Err(BuildError::MissingNodeKey.into());
207            }
208        }
209
210        let mut forward_builder = GraphBuilder::<u32, u32>::new();
211        for _ in 0..node_count_usize {
212            forward_builder.add_node()?;
213        }
214        for &(source, target) in edges {
215            forward_builder.add_edge(GraphNodeId::new(source), GraphNodeId::new(target))?;
216        }
217        let forward_frozen = forward_builder.freeze()?;
218        let edge_count = u32::try_from(forward_frozen.edge_ids().len())
219            .map_err(|_| BuildError::EdgeCountOverflow)?;
220
221        let inbound_frozen = forward_frozen.transpose()?;
222        let forward_bytes = export_csr_snapshot(&forward_frozen)?;
223        let inbound_bytes = export_csr_snapshot(&inbound_frozen)?;
224        let metadata =
225            PostgresMetadata::new(node_count, edge_count, built_at_unix, true).with_reverse_index();
226        attach_postgres_sections(&forward_bytes, Some(&inbound_bytes), &metadata)
227    }
228}
229
230/// Estimates node and edge counts for a build without exporting a snapshot.
231///
232/// # Performance
233///
234/// This function is `O(m)`.
235#[must_use]
236pub fn estimate_build(edges: &[EdgeRow]) -> BuildEstimate {
237    BuildEstimate {
238        node_count: distinct_node_keys(edges).len(),
239        edge_count: edges.len(),
240    }
241}
242
/// Size estimate for a prospective build, intended for registration UIs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BuildEstimate {
    /// Number of distinct node keys seen across the scanned edge rows.
    pub node_count: usize,
    /// Number of scanned edge rows.
    pub edge_count: usize,
}