Skip to main content

oxgraph_postgres/
engine.rs

1//! Active graph engine: snapshot backing, dual topology, overlays, and config.
2
3use alloc::{boxed::Box, vec::Vec};
4use core::num::NonZeroUsize;
5
6use yoke::Yoke;
7
8use crate::{
9    artifact::PostgresMetadata,
10    build::EdgeRow,
11    builder::EngineBuilder,
12    catalog::Catalog,
13    config::Config,
14    error::{ConfigError, PostgresGraphError, QueryError},
15    overlay::OverlayState,
16    rebuild::SnapshotRebuild,
17    search::SearchPredicate,
18    sync::{SyncHealth, SyncRow},
19    topology::{GraphTopology, UniqueAdjacency},
20    traverse::{TraversalDirection, TraverseLimits, traverse_core_collect, traverse_core_count},
21};
22
/// Runtime status returned by admin/discovery surfaces.
///
/// A plain `Copy` value: it holds counters only and does not borrow from the
/// engine. [`Engine::stats`] fills the first three fields from artifact
/// metadata and the last two from the overlay.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EngineStatus {
    /// Node count recorded in Postgres metadata.
    pub node_count: u32,
    /// Edge count recorded in Postgres metadata.
    pub edge_count: u32,
    /// Whether the artifact is marked read-only.
    pub read_only: bool,
    /// Number of overlay edge insertions not yet compacted.
    pub overlay_edge_count: usize,
    /// Number of tombstoned base edges.
    pub tombstoned_edges: usize,
}
37
/// Yoke cart owning snapshot bytes and parsed metadata.
///
/// [`Engine`] keeps this boxed as the cart of its `Yoke`, so the bytes live
/// as long as the topology views that are yoked to them.
#[expect(
    clippy::redundant_pub_crate,
    reason = "shared with private builder module"
)]
pub(crate) struct EngineCart {
    /// Owned OXGTOPO bytes backing both topology views.
    pub backing: Vec<u8>,
    /// Parsed Postgres metadata section.
    pub metadata: PostgresMetadata,
}
49
/// Topology views borrowing the cart backing (lifetime-erased in [`Engine`]).
///
/// The `'a` lifetime is the borrow of the cart; `Engine` stores this type as
/// `EngineState<'static>` inside a `Yoke` and reads it back through
/// `Yoke::get`.
#[derive(yoke::Yokeable)]
#[yoke(prove_covariant)]
#[expect(
    clippy::redundant_pub_crate,
    reason = "shared with private builder module"
)]
pub(crate) struct EngineState<'a> {
    /// Forward CSR and inbound CSC opened once at build.
    pub topology: GraphTopology<'a>,
}
61
/// Active [`OxGraph`] backend state loaded from OXGTOPO bytes.
///
/// Bundles the yoked snapshot (bytes, metadata, topology views) with the
/// mutable runtime pieces layered on top of it: the overlay, the config
/// mirror, reusable traversal scratch, and a lazily built unique-adjacency
/// cache.
pub struct Engine {
    /// Yoke-attached topology views and owned snapshot cart.
    inner: Yoke<EngineState<'static>, Box<EngineCart>>,
    /// Overlay buffers applied on top of the base artifact.
    overlay: OverlayState,
    /// Operational config mirrored from extension GUCs.
    config: Config,
    /// Reused BFS scratch (dense epoch marks and frontier queue).
    traverse_scratch: crate::traverse::TraverseScratch,
    /// Node-unique adjacency (empty until the first unique-profile traverse builds it).
    unique_adjacency: UniqueAdjacency,
    /// Whether [`Self::unique_adjacency`] has been populated from topology.
    ///
    /// Cleared again whenever the snapshot is replaced, so the cache is
    /// rebuilt against the new topology on next use.
    unique_cache_built: bool,
}
77
78impl Engine {
79    /// Constructs an engine from validated yoke state and runtime buffers.
80    pub(crate) fn from_parts(
81        inner: Yoke<EngineState<'static>, Box<EngineCart>>,
82        overlay: OverlayState,
83        config: Config,
84        traverse_scratch: crate::traverse::TraverseScratch,
85    ) -> Self {
86        Self {
87            inner,
88            overlay,
89            config,
90            traverse_scratch,
91            unique_adjacency: UniqueAdjacency::default(),
92            unique_cache_built: false,
93        }
94    }
95
96    /// Returns the canonical node count from artifact metadata.
97    #[must_use]
98    pub fn node_count(&self) -> u32 {
99        self.inner.backing_cart().metadata.node_count.get()
100    }
101
102    /// Disjoint topology views, unique cache, overlay, and scratch for one BFS query.
103    ///
104    /// When `needs_unique` is set the node-unique adjacency is built lazily on
105    /// first use (`O(n + m log d)`) and cached for the snapshot's lifetime; engine
106    /// open stays `O(n + m)` for topology attach only. The parallel profile never
107    /// touches the unique cache.
108    ///
109    /// # Performance
110    ///
111    /// This method is `O(1)` once the cache is built, `O(n + m log d)` on the
112    /// first unique-profile query after a snapshot replacement.
113    pub(crate) fn traverse_workspace_mut(
114        &mut self,
115        needs_unique: bool,
116    ) -> (
117        &GraphTopology<'_>,
118        &UniqueAdjacency,
119        &OverlayState,
120        &mut crate::traverse::TraverseScratch,
121    ) {
122        if needs_unique && !self.unique_cache_built {
123            let forward = self.inner.get().topology.forward;
124            let inbound = self.inner.get().topology.inbound;
125            self.unique_adjacency = UniqueAdjacency::from_topology(&forward, &inbound);
126            self.unique_cache_built = true;
127        }
128        (
129            &self.inner.get().topology,
130            &self.unique_adjacency,
131            &self.overlay,
132            &mut self.traverse_scratch,
133        )
134    }
135
136    /// Loads an engine via [`EngineBuilder`].
137    ///
138    /// # Errors
139    ///
140    /// Returns [`PostgresGraphError`] when snapshot validation or topology attach fails.
141    pub fn from_snapshot_bytes(bytes: &[u8]) -> Result<Self, PostgresGraphError> {
142        EngineBuilder::new().snapshot_owned(bytes.to_vec()).build()
143    }
144
145    /// Returns operational status for admin surfaces.
146    #[must_use]
147    pub fn stats(&self) -> EngineStatus {
148        let metadata = &self.inner.backing_cart().metadata;
149        EngineStatus {
150            node_count: metadata.node_count.get(),
151            edge_count: metadata.edge_count.get(),
152            read_only: metadata.is_read_only(),
153            overlay_edge_count: self.overlay.overlay_edge_count(),
154            tombstoned_edges: self.overlay.tombstoned_edge_count(),
155        }
156    }
157
    /// Returns the active configuration mirror.
    ///
    /// This is the value most recently accepted by [`Self::set_config`], or
    /// the one the engine was constructed with.
    #[must_use]
    pub const fn config(&self) -> &Config {
        &self.config
    }
163
    /// Updates configuration after validation.
    ///
    /// Validation runs before the assignment, so a rejected `config` leaves
    /// the previously active configuration in place.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresGraphError::Config`] when limits or freshness settings are invalid.
    pub fn set_config(&mut self, config: Config) -> Result<(), PostgresGraphError> {
        // Bail out before touching `self.config` if the new value is invalid.
        config.validate()?;
        self.config = config;
        Ok(())
    }
174
    /// Borrows overlay state for read-only query visibility checks.
    ///
    /// Use [`Self::overlay_mut`] when the overlay has to be modified.
    #[must_use]
    pub const fn overlay(&self) -> &OverlayState {
        &self.overlay
    }
180
    /// Borrows the overlay buffer for sync replay.
    ///
    /// This is the mutable counterpart of [`Self::overlay`];
    /// [`Self::apply_sync_rows`] writes through it.
    pub const fn overlay_mut(&mut self) -> &mut OverlayState {
        &mut self.overlay
    }
185
186    /// Returns immutable snapshot bytes backing the engine.
187    #[must_use]
188    pub fn snapshot_bytes(&self) -> &[u8] {
189        self.inner.backing_cart().backing.as_slice()
190    }
191
192    /// Returns both topology views opened at engine build.
193    #[must_use]
194    pub fn topology(&self) -> &GraphTopology<'_> {
195        &self.inner.get().topology
196    }
197
198    /// Returns the forward CSR view (outgoing adjacency only).
199    #[must_use]
200    pub fn forward(&self) -> &crate::topology::ForwardCsr<'_> {
201        &self.topology().forward
202    }
203
204    /// Returns the inbound CSC view (incoming adjacency only).
205    #[must_use]
206    pub fn inbound(&self) -> &crate::topology::InboundCsc<'_> {
207        &self.topology().inbound
208    }
209
210    /// Replaces artifact bytes after maintenance rebuild.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`PostgresGraphError`] when the replacement snapshot fails validation.
215    pub fn replace_snapshot_bytes(&mut self, bytes: &[u8]) -> Result<(), PostgresGraphError> {
216        let engine = EngineBuilder::new()
217            .snapshot_owned(bytes.to_vec())
218            .config(self.config.clone())
219            .overlay(OverlayState::default())
220            .build()?;
221        self.inner = engine.inner;
222        self.overlay.clear();
223        self.unique_adjacency = UniqueAdjacency::default();
224        self.unique_cache_built = false;
225        self.traverse_scratch
226            .reset_after_snapshot(self.node_count() as usize);
227        Ok(())
228    }
229
230    /// Breadth-first traversal from one seed node.
231    ///
232    /// # Errors
233    ///
234    /// Returns [`PostgresGraphError::Query`] when the seed is out of bounds or limits are zero.
235    ///
236    /// # Performance
237    ///
238    /// This method is `O(r + e)` where `r` is nodes discovered up to `result_limit` and `e` is
239    /// edges examined along the chosen profile; `≤ 1ms` for `n ≤ 10k` on typical chain fixtures.
240    pub fn traverse(
241        &mut self,
242        seed: u32,
243        limits: TraverseLimits,
244        direction: TraversalDirection,
245    ) -> Result<Vec<u32>, PostgresGraphError> {
246        let limits = limits.capped_by(self.config())?;
247        traverse_core_collect(self, &[seed], limits, direction)
248    }
249
250    /// Breadth-first traversal from multiple seed nodes in one kernel run.
251    ///
252    /// # Errors
253    ///
254    /// Returns [`PostgresGraphError::Query`] when every seed is out of bounds or limits are zero.
255    ///
256    /// # Performance
257    ///
258    /// Same as [`Self::traverse`] with multiple seeds; one kernel run.
259    pub fn traverse_from_seeds(
260        &mut self,
261        seeds: &[u32],
262        limits: TraverseLimits,
263        direction: TraversalDirection,
264    ) -> Result<Vec<u32>, PostgresGraphError> {
265        let limits = limits.capped_by(self.config())?;
266        traverse_core_collect(self, seeds, limits, direction)
267    }
268
269    /// Returns visited-node count for one seed without collecting ids.
270    ///
271    /// # Errors
272    ///
273    /// Returns [`PostgresGraphError::Query`] when the seed is out of bounds or limits are zero.
274    ///
275    /// # Performance
276    ///
277    /// This method is `O(r + e)` without output allocation; matches collect cardinality.
278    pub fn visited_count(
279        &mut self,
280        seed: u32,
281        limits: TraverseLimits,
282        direction: TraversalDirection,
283    ) -> Result<usize, PostgresGraphError> {
284        let limits = limits.capped_by(self.config())?;
285        traverse_core_count(self, &[seed], limits, direction)
286    }
287
288    /// Searches dense node ids using a simple predicate.
289    ///
290    /// # Errors
291    ///
292    /// Returns [`PostgresGraphError::Query`] when the effective limit is zero.
293    ///
294    /// # Performance
295    ///
296    /// This method is `O(n)` for `n` canonical nodes until the effective limit is reached.
297    pub fn search(
298        &self,
299        predicate: SearchPredicate,
300        limit: NonZeroUsize,
301    ) -> Result<Vec<u32>, PostgresGraphError> {
302        let effective_limit = core::cmp::min(limit.get(), self.config().search_limit as usize);
303        let effective_limit = NonZeroUsize::new(effective_limit).ok_or(QueryError::LimitZero)?;
304        let node_bound = self.forward().node_count();
305        let mut matches = Vec::new();
306        for node in 0..node_bound {
307            let node_u32 = u32::try_from(node).map_err(|_| QueryError::NodeIndexOverflow)?;
308            if !self.node_visible(node_u32) || !predicate.matches(node_u32) {
309                continue;
310            }
311            matches.push(node_u32);
312            if matches.len() >= effective_limit.get() {
313                break;
314            }
315        }
316        Ok(matches)
317    }
318
319    /// Applies sync rows to the overlay in sequence order.
320    ///
321    /// # Errors
322    ///
323    /// Returns [`PostgresGraphError::Sync`] when row sequence numbers are not monotonic.
324    pub fn apply_sync_rows(&mut self, rows: &[SyncRow]) -> Result<usize, PostgresGraphError> {
325        SyncRow::apply_in_order(rows, self.overlay_mut())
326    }
327
328    /// Returns sync overlay health for admin surfaces.
329    #[must_use]
330    pub fn sync_health(&self) -> SyncHealth {
331        let status = self.stats();
332        SyncHealth {
333            overlay_edges: status.overlay_edge_count,
334            tombstoned_edges: status.tombstoned_edges,
335            tombstoned_nodes: self.overlay().tombstoned_node_count(),
336        }
337    }
338
339    /// Rebuilds the base artifact from catalog metadata and freshly scanned edge rows.
340    ///
341    /// # Errors
342    ///
343    /// Returns [`PostgresGraphError::Config`] when maintenance is disabled, or build/validation
344    /// errors from catalog planning and snapshot encoding.
345    pub fn rebuild_from_catalog(
346        &mut self,
347        catalog: &Catalog,
348        edges: &[EdgeRow],
349        built_at_unix: u64,
350    ) -> Result<(), PostgresGraphError> {
351        if !self.config().maintenance_enabled {
352            return Err(ConfigError::MaintenanceDisabled.into());
353        }
354        let bytes = SnapshotRebuild::from_catalog_and_edges(catalog, edges, built_at_unix)?;
355        self.replace_snapshot_bytes(&bytes)
356    }
357
358    /// Returns whether `node` is visible under the active freshness policy.
359    fn node_visible(&self, node: u32) -> bool {
360        self.inner
361            .get()
362            .topology
363            .node_visible(node, TraversalDirection::Out, self.overlay())
364    }
365}