Skip to main content

oxgraph_postgres/
engine.rs

1//! Active graph engine: snapshot backing, dual topology, overlays, and config.
2
3use alloc::{boxed::Box, vec::Vec};
4use core::{
5    cell::{Cell, RefCell},
6    num::NonZeroUsize,
7};
8
9use yoke::Yoke;
10
11use crate::{
12    artifact::PostgresMetadata,
13    build::EdgeRow,
14    builder::EngineBuilder,
15    catalog::Catalog,
16    config::Config,
17    error::{ConfigError, PostgresGraphError, QueryError},
18    overlay::OverlayState,
19    rebuild::SnapshotRebuild,
20    search::SearchPredicate,
21    sync::{SyncHealth, SyncRow},
22    topology::{GraphTopology, TopologyHot, UniqueAdjacency},
23    traverse::{TraversalDirection, TraverseLimits, traverse_core_collect, traverse_core_count},
24};
25
26/// Runtime status returned by admin/discovery surfaces.
27#[derive(Clone, Copy, Debug, PartialEq, Eq)]
28pub struct EngineStatus {
29    /// Node count recorded in Postgres metadata.
30    pub node_count: u32,
31    /// Edge count recorded in Postgres metadata.
32    pub edge_count: u32,
33    /// Whether the artifact is marked read-only.
34    pub read_only: bool,
35    /// Number of overlay edge insertions not yet compacted.
36    pub overlay_edge_count: usize,
37    /// Number of tombstoned base edges.
38    pub tombstoned_edges: usize,
39}
40
41/// Yoke cart owning snapshot bytes and parsed metadata.
42#[expect(
43    clippy::redundant_pub_crate,
44    reason = "shared with private builder module"
45)]
46pub(crate) struct EngineCart {
47    /// Owned OXGTOPO bytes backing both topology views.
48    pub backing: Vec<u8>,
49    /// Parsed Postgres metadata section.
50    pub metadata: PostgresMetadata,
51}
52
53/// Topology views borrowing the cart backing (lifetime-erased in [`Engine`]).
54#[derive(yoke::Yokeable)]
55#[yoke(prove_covariant)]
56#[expect(
57    clippy::redundant_pub_crate,
58    reason = "shared with private builder module"
59)]
60pub(crate) struct EngineState<'a> {
61    /// Forward CSR and inbound CSC opened once at build.
62    pub topology: GraphTopology<'a>,
63}
64
65/// Active [`OxGraph`] backend state loaded from OXGTOPO bytes.
66pub struct Engine {
67    /// Yoke-attached topology views and owned snapshot cart.
68    inner: Yoke<EngineState<'static>, Box<EngineCart>>,
69    /// Overlay buffers applied on top of the base artifact.
70    overlay: OverlayState,
71    /// Operational config mirrored from extension GUCs.
72    config: Config,
73    /// Reused BFS scratch (epoch marks, dual frontier waves, collect buffer).
74    traverse_scratch: crate::traverse::TraverseScratch,
75    /// Node-unique adjacency (placeholder until first traverse builds it).
76    unique_adjacency: RefCell<UniqueAdjacency>,
77    /// Whether [`Self::unique_adjacency`] has been populated from topology.
78    unique_cache_built: Cell<bool>,
79}
80
81impl Engine {
82    /// Constructs an engine from validated yoke state and runtime buffers.
83    #[expect(clippy::missing_const_for_fn, reason = "Yoke attachment is not const")]
84    #[expect(
85        clippy::too_many_arguments,
86        reason = "constructs the validated Engine field set without intermediate mutation"
87    )]
88    pub(crate) fn from_parts(
89        inner: Yoke<EngineState<'static>, Box<EngineCart>>,
90        overlay: OverlayState,
91        config: Config,
92        traverse_scratch: crate::traverse::TraverseScratch,
93        unique_adjacency: RefCell<UniqueAdjacency>,
94        unique_cache_built: Cell<bool>,
95    ) -> Self {
96        Self {
97            inner,
98            overlay,
99            config,
100            traverse_scratch,
101            unique_adjacency,
102            unique_cache_built,
103        }
104    }
105
106    /// Returns the canonical node count from artifact metadata.
107    #[must_use]
108    pub fn node_count(&self) -> u32 {
109        self.inner.backing_cart().metadata.node_count.get()
110    }
111
112    /// Disjoint hot topology, unique cache, overlay, and scratch for one BFS query.
113    ///
114    /// Unique adjacency is built lazily on the first traverse (`O(n + m log d)`); engine open
115    /// stays `O(n + m)` for topology attach only.
116    pub(crate) fn traverse_workspace_mut(
117        &mut self,
118    ) -> (
119        TopologyHot<'_>,
120        core::cell::Ref<'_, UniqueAdjacency>,
121        &OverlayState,
122        &mut crate::traverse::TraverseScratch,
123    ) {
124        if !self.unique_cache_built.get() {
125            let forward = self.inner.get().topology.forward;
126            let inbound = self.inner.get().topology.inbound;
127            *self.unique_adjacency.borrow_mut() =
128                UniqueAdjacency::from_topology(&forward, &inbound);
129            self.unique_cache_built.set(true);
130        }
131        let unique = self.unique_adjacency.borrow();
132        let topology = &self.inner.get().topology;
133        let hot = TopologyHot::from_topology(topology);
134        (hot, unique, &self.overlay, &mut self.traverse_scratch)
135    }
136
137    /// Loads an engine via [`EngineBuilder`].
138    ///
139    /// # Errors
140    ///
141    /// Returns [`PostgresGraphError`] when snapshot validation or topology attach fails.
142    pub fn from_snapshot_bytes(bytes: &[u8]) -> Result<Self, PostgresGraphError> {
143        EngineBuilder::new().snapshot_owned(bytes.to_vec()).build()
144    }
145
146    /// Returns operational status for admin surfaces.
147    #[must_use]
148    pub fn status(&self) -> EngineStatus {
149        let metadata = &self.inner.backing_cart().metadata;
150        EngineStatus {
151            node_count: metadata.node_count.get(),
152            edge_count: metadata.edge_count.get(),
153            read_only: metadata.is_read_only(),
154            overlay_edge_count: self.overlay.added_edges.len(),
155            tombstoned_edges: self.overlay.tombstoned_edges.len(),
156        }
157    }
158
159    /// Returns the active configuration mirror.
160    #[must_use]
161    pub const fn config(&self) -> &Config {
162        &self.config
163    }
164
165    /// Updates configuration after validation.
166    ///
167    /// # Errors
168    ///
169    /// Returns [`PostgresGraphError::Config`] when limits or freshness settings are invalid.
170    pub fn set_config(&mut self, config: Config) -> Result<(), PostgresGraphError> {
171        config.validate()?;
172        self.config = config;
173        Ok(())
174    }
175
176    /// Borrows overlay state for read-only query visibility checks.
177    #[must_use]
178    pub const fn overlay(&self) -> &OverlayState {
179        &self.overlay
180    }
181
182    /// Borrows the overlay buffer for sync replay.
183    pub const fn overlay_mut(&mut self) -> &mut OverlayState {
184        &mut self.overlay
185    }
186
187    /// Returns immutable snapshot bytes backing the engine.
188    #[must_use]
189    pub fn snapshot_bytes(&self) -> &[u8] {
190        self.inner.backing_cart().backing.as_slice()
191    }
192
193    /// Returns both topology views opened at engine build.
194    #[must_use]
195    pub fn topology(&self) -> &GraphTopology<'_> {
196        &self.inner.get().topology
197    }
198
199    /// Returns the forward CSR view (outgoing adjacency only).
200    #[must_use]
201    pub fn forward(&self) -> &crate::topology::ForwardCsr<'_> {
202        &self.topology().forward
203    }
204
205    /// Returns the inbound CSC view (incoming adjacency only).
206    #[must_use]
207    pub fn inbound(&self) -> &crate::topology::InboundCsc<'_> {
208        &self.topology().inbound
209    }
210
211    /// Replaces artifact bytes after maintenance rebuild.
212    ///
213    /// # Errors
214    ///
215    /// Returns [`PostgresGraphError`] when the replacement snapshot fails validation.
216    pub fn replace_snapshot_bytes(&mut self, bytes: &[u8]) -> Result<(), PostgresGraphError> {
217        let engine = EngineBuilder::new()
218            .snapshot_owned(bytes.to_vec())
219            .config(self.config.clone())
220            .overlay(OverlayState::default())
221            .build()?;
222        self.inner = engine.inner;
223        self.overlay.clear();
224        *self.unique_adjacency.borrow_mut() = UniqueAdjacency::default();
225        self.unique_cache_built.set(false);
226        self.traverse_scratch
227            .reset_after_snapshot(self.node_count() as usize);
228        Ok(())
229    }
230
231    /// Breadth-first traversal from one seed node.
232    ///
233    /// # Errors
234    ///
235    /// Returns [`PostgresGraphError::Query`] when the seed is out of bounds or limits are zero.
236    ///
237    /// # Performance
238    ///
239    /// This method is `O(r + e)` where `r` is nodes discovered up to `result_limit` and `e` is
240    /// edges examined along the chosen profile; `≤ 1ms` for `n ≤ 10k` on typical chain fixtures.
241    pub fn traverse(
242        &mut self,
243        seed: u32,
244        limits: TraverseLimits,
245        direction: TraversalDirection,
246    ) -> Result<Vec<u32>, PostgresGraphError> {
247        let limits = limits.capped_by(self.config())?;
248        traverse_core_collect(self, &[seed], limits, direction)
249    }
250
251    /// Breadth-first traversal from multiple seed nodes in one kernel run.
252    ///
253    /// # Errors
254    ///
255    /// Returns [`PostgresGraphError::Query`] when every seed is out of bounds or limits are zero.
256    ///
257    /// # Performance
258    ///
259    /// Same as [`Self::traverse`] with multiple seeds; one kernel run.
260    pub fn traverse_from_seeds(
261        &mut self,
262        seeds: &[u32],
263        limits: TraverseLimits,
264        direction: TraversalDirection,
265    ) -> Result<Vec<u32>, PostgresGraphError> {
266        let limits = limits.capped_by(self.config())?;
267        traverse_core_collect(self, seeds, limits, direction)
268    }
269
270    /// Returns visited-node count for one seed without collecting ids.
271    ///
272    /// # Errors
273    ///
274    /// Returns [`PostgresGraphError::Query`] when the seed is out of bounds or limits are zero.
275    ///
276    /// # Performance
277    ///
278    /// This method is `O(r + e)` without output allocation; matches collect cardinality.
279    pub fn visited_count(
280        &mut self,
281        seed: u32,
282        limits: TraverseLimits,
283        direction: TraversalDirection,
284    ) -> Result<usize, PostgresGraphError> {
285        let limits = limits.capped_by(self.config())?;
286        traverse_core_count(self, &[seed], limits, direction)
287    }
288
289    /// Searches dense node ids using a simple predicate.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`PostgresGraphError::Query`] when the effective limit is zero.
294    ///
295    /// # Performance
296    ///
297    /// This method is `O(n)` for `n` canonical nodes until the effective limit is reached.
298    pub fn search(
299        &self,
300        predicate: SearchPredicate,
301        limit: NonZeroUsize,
302    ) -> Result<Vec<u32>, PostgresGraphError> {
303        let effective_limit = core::cmp::min(limit.get(), self.config().search_limit as usize);
304        let effective_limit = NonZeroUsize::new(effective_limit).ok_or(QueryError::LimitZero)?;
305        let node_bound = self.forward().node_count();
306        let mut matches = Vec::new();
307        for node in 0..node_bound {
308            let node_u32 = u32::try_from(node).map_err(|_| QueryError::NodeIndexOverflow)?;
309            if !self.node_visible(node_u32) || !predicate.matches(node_u32) {
310                continue;
311            }
312            matches.push(node_u32);
313            if matches.len() >= effective_limit.get() {
314                break;
315            }
316        }
317        Ok(matches)
318    }
319
320    /// Applies sync rows to the overlay in sequence order.
321    ///
322    /// # Errors
323    ///
324    /// Returns [`PostgresGraphError::Sync`] when row sequence numbers are not monotonic.
325    pub fn apply_sync_rows(&mut self, rows: &[SyncRow]) -> Result<usize, PostgresGraphError> {
326        SyncRow::apply_in_order(rows, self.overlay_mut())
327    }
328
329    /// Returns sync overlay health for admin surfaces.
330    #[must_use]
331    pub fn sync_health(&self) -> SyncHealth {
332        let status = self.status();
333        SyncHealth {
334            overlay_edges: status.overlay_edge_count,
335            tombstoned_edges: status.tombstoned_edges,
336            tombstoned_nodes: self.overlay().tombstoned_nodes.len(),
337        }
338    }
339
340    /// Rebuilds the base artifact from catalog metadata and freshly scanned edge rows.
341    ///
342    /// # Errors
343    ///
344    /// Returns [`PostgresGraphError::Config`] when maintenance is disabled, or build/validation
345    /// errors from catalog planning and snapshot encoding.
346    pub fn rebuild_from_catalog(
347        &mut self,
348        catalog: &Catalog,
349        edges: &[EdgeRow],
350        built_at_unix: u64,
351    ) -> Result<(), PostgresGraphError> {
352        if !self.config().maintenance_enabled {
353            return Err(ConfigError::MaintenanceDisabled.into());
354        }
355        let bytes = SnapshotRebuild::from_catalog_and_edges(catalog, edges, built_at_unix)?;
356        self.replace_snapshot_bytes(&bytes)
357    }
358
359    /// Returns whether `node` is visible under the active freshness policy.
360    fn node_visible(&self, node: u32) -> bool {
361        self.inner
362            .get()
363            .topology
364            .node_visible(node, TraversalDirection::Out, self.overlay())
365    }
366}