1use alloc::{boxed::Box, vec::Vec};
4use core::{
5 cell::{Cell, RefCell},
6 num::NonZeroUsize,
7};
8
9use yoke::Yoke;
10
11use crate::{
12 artifact::PostgresMetadata,
13 build::EdgeRow,
14 builder::EngineBuilder,
15 catalog::Catalog,
16 config::Config,
17 error::{ConfigError, PostgresGraphError, QueryError},
18 overlay::OverlayState,
19 rebuild::SnapshotRebuild,
20 search::SearchPredicate,
21 sync::{SyncHealth, SyncRow},
22 topology::{GraphTopology, TopologyHot, UniqueAdjacency},
23 traverse::{TraversalDirection, TraverseLimits, traverse_core_collect, traverse_core_count},
24};
25
/// Point-in-time summary of an [`Engine`], as produced by [`Engine::status`].
///
/// The value is plain `Copy` data; it does not borrow from the engine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EngineStatus {
    /// Node count recorded in the snapshot metadata.
    pub node_count: u32,
    /// Edge count recorded in the snapshot metadata.
    pub edge_count: u32,
    /// Whether the snapshot metadata reports the snapshot as read-only.
    pub read_only: bool,
    /// Number of entries in the overlay's added-edge collection.
    pub overlay_edge_count: usize,
    /// Number of entries in the overlay's tombstoned-edge collection.
    pub tombstoned_edges: usize,
}
40
41#[expect(
43 clippy::redundant_pub_crate,
44 reason = "shared with private builder module"
45)]
46pub(crate) struct EngineCart {
47 pub backing: Vec<u8>,
49 pub metadata: PostgresMetadata,
51}
52
53#[derive(yoke::Yokeable)]
55#[yoke(prove_covariant)]
56#[expect(
57 clippy::redundant_pub_crate,
58 reason = "shared with private builder module"
59)]
60pub(crate) struct EngineState<'a> {
61 pub topology: GraphTopology<'a>,
63}
64
65pub struct Engine {
67 inner: Yoke<EngineState<'static>, Box<EngineCart>>,
69 overlay: OverlayState,
71 config: Config,
73 traverse_scratch: crate::traverse::TraverseScratch,
75 unique_adjacency: RefCell<UniqueAdjacency>,
77 unique_cache_built: Cell<bool>,
79}
80
81impl Engine {
82 #[expect(clippy::missing_const_for_fn, reason = "Yoke attachment is not const")]
84 #[expect(
85 clippy::too_many_arguments,
86 reason = "constructs the validated Engine field set without intermediate mutation"
87 )]
88 pub(crate) fn from_parts(
89 inner: Yoke<EngineState<'static>, Box<EngineCart>>,
90 overlay: OverlayState,
91 config: Config,
92 traverse_scratch: crate::traverse::TraverseScratch,
93 unique_adjacency: RefCell<UniqueAdjacency>,
94 unique_cache_built: Cell<bool>,
95 ) -> Self {
96 Self {
97 inner,
98 overlay,
99 config,
100 traverse_scratch,
101 unique_adjacency,
102 unique_cache_built,
103 }
104 }
105
106 #[must_use]
108 pub fn node_count(&self) -> u32 {
109 self.inner.backing_cart().metadata.node_count.get()
110 }
111
112 pub(crate) fn traverse_workspace_mut(
117 &mut self,
118 ) -> (
119 TopologyHot<'_>,
120 core::cell::Ref<'_, UniqueAdjacency>,
121 &OverlayState,
122 &mut crate::traverse::TraverseScratch,
123 ) {
124 if !self.unique_cache_built.get() {
125 let forward = self.inner.get().topology.forward;
126 let inbound = self.inner.get().topology.inbound;
127 *self.unique_adjacency.borrow_mut() =
128 UniqueAdjacency::from_topology(&forward, &inbound);
129 self.unique_cache_built.set(true);
130 }
131 let unique = self.unique_adjacency.borrow();
132 let topology = &self.inner.get().topology;
133 let hot = TopologyHot::from_topology(topology);
134 (hot, unique, &self.overlay, &mut self.traverse_scratch)
135 }
136
137 pub fn from_snapshot_bytes(bytes: &[u8]) -> Result<Self, PostgresGraphError> {
143 EngineBuilder::new().snapshot_owned(bytes.to_vec()).build()
144 }
145
146 #[must_use]
148 pub fn status(&self) -> EngineStatus {
149 let metadata = &self.inner.backing_cart().metadata;
150 EngineStatus {
151 node_count: metadata.node_count.get(),
152 edge_count: metadata.edge_count.get(),
153 read_only: metadata.is_read_only(),
154 overlay_edge_count: self.overlay.added_edges.len(),
155 tombstoned_edges: self.overlay.tombstoned_edges.len(),
156 }
157 }
158
159 #[must_use]
161 pub const fn config(&self) -> &Config {
162 &self.config
163 }
164
165 pub fn set_config(&mut self, config: Config) -> Result<(), PostgresGraphError> {
171 config.validate()?;
172 self.config = config;
173 Ok(())
174 }
175
176 #[must_use]
178 pub const fn overlay(&self) -> &OverlayState {
179 &self.overlay
180 }
181
182 pub const fn overlay_mut(&mut self) -> &mut OverlayState {
184 &mut self.overlay
185 }
186
187 #[must_use]
189 pub fn snapshot_bytes(&self) -> &[u8] {
190 self.inner.backing_cart().backing.as_slice()
191 }
192
193 #[must_use]
195 pub fn topology(&self) -> &GraphTopology<'_> {
196 &self.inner.get().topology
197 }
198
199 #[must_use]
201 pub fn forward(&self) -> &crate::topology::ForwardCsr<'_> {
202 &self.topology().forward
203 }
204
205 #[must_use]
207 pub fn inbound(&self) -> &crate::topology::InboundCsc<'_> {
208 &self.topology().inbound
209 }
210
211 pub fn replace_snapshot_bytes(&mut self, bytes: &[u8]) -> Result<(), PostgresGraphError> {
217 let engine = EngineBuilder::new()
218 .snapshot_owned(bytes.to_vec())
219 .config(self.config.clone())
220 .overlay(OverlayState::default())
221 .build()?;
222 self.inner = engine.inner;
223 self.overlay.clear();
224 *self.unique_adjacency.borrow_mut() = UniqueAdjacency::default();
225 self.unique_cache_built.set(false);
226 self.traverse_scratch
227 .reset_after_snapshot(self.node_count() as usize);
228 Ok(())
229 }
230
231 pub fn traverse(
242 &mut self,
243 seed: u32,
244 limits: TraverseLimits,
245 direction: TraversalDirection,
246 ) -> Result<Vec<u32>, PostgresGraphError> {
247 let limits = limits.capped_by(self.config())?;
248 traverse_core_collect(self, &[seed], limits, direction)
249 }
250
251 pub fn traverse_from_seeds(
261 &mut self,
262 seeds: &[u32],
263 limits: TraverseLimits,
264 direction: TraversalDirection,
265 ) -> Result<Vec<u32>, PostgresGraphError> {
266 let limits = limits.capped_by(self.config())?;
267 traverse_core_collect(self, seeds, limits, direction)
268 }
269
270 pub fn visited_count(
280 &mut self,
281 seed: u32,
282 limits: TraverseLimits,
283 direction: TraversalDirection,
284 ) -> Result<usize, PostgresGraphError> {
285 let limits = limits.capped_by(self.config())?;
286 traverse_core_count(self, &[seed], limits, direction)
287 }
288
289 pub fn search(
299 &self,
300 predicate: SearchPredicate,
301 limit: NonZeroUsize,
302 ) -> Result<Vec<u32>, PostgresGraphError> {
303 let effective_limit = core::cmp::min(limit.get(), self.config().search_limit as usize);
304 let effective_limit = NonZeroUsize::new(effective_limit).ok_or(QueryError::LimitZero)?;
305 let node_bound = self.forward().node_count();
306 let mut matches = Vec::new();
307 for node in 0..node_bound {
308 let node_u32 = u32::try_from(node).map_err(|_| QueryError::NodeIndexOverflow)?;
309 if !self.node_visible(node_u32) || !predicate.matches(node_u32) {
310 continue;
311 }
312 matches.push(node_u32);
313 if matches.len() >= effective_limit.get() {
314 break;
315 }
316 }
317 Ok(matches)
318 }
319
320 pub fn apply_sync_rows(&mut self, rows: &[SyncRow]) -> Result<usize, PostgresGraphError> {
326 SyncRow::apply_in_order(rows, self.overlay_mut())
327 }
328
329 #[must_use]
331 pub fn sync_health(&self) -> SyncHealth {
332 let status = self.status();
333 SyncHealth {
334 overlay_edges: status.overlay_edge_count,
335 tombstoned_edges: status.tombstoned_edges,
336 tombstoned_nodes: self.overlay().tombstoned_nodes.len(),
337 }
338 }
339
340 pub fn rebuild_from_catalog(
347 &mut self,
348 catalog: &Catalog,
349 edges: &[EdgeRow],
350 built_at_unix: u64,
351 ) -> Result<(), PostgresGraphError> {
352 if !self.config().maintenance_enabled {
353 return Err(ConfigError::MaintenanceDisabled.into());
354 }
355 let bytes = SnapshotRebuild::from_catalog_and_edges(catalog, edges, built_at_unix)?;
356 self.replace_snapshot_bytes(&bytes)
357 }
358
359 fn node_visible(&self, node: u32) -> bool {
361 self.inner
362 .get()
363 .topology
364 .node_visible(node, TraversalDirection::Out, self.overlay())
365 }
366}