1use alloc::{boxed::Box, vec::Vec};
4use core::num::NonZeroUsize;
5
6use yoke::Yoke;
7
8use crate::{
9 artifact::PostgresMetadata,
10 build::EdgeRow,
11 builder::EngineBuilder,
12 catalog::Catalog,
13 config::Config,
14 error::{ConfigError, PostgresGraphError, QueryError},
15 overlay::OverlayState,
16 rebuild::SnapshotRebuild,
17 search::SearchPredicate,
18 sync::{SyncHealth, SyncRow},
19 topology::{GraphTopology, UniqueAdjacency},
20 traverse::{TraversalDirection, TraverseLimits, traverse_core_collect, traverse_core_count},
21};
22
23#[derive(Clone, Copy, Debug, PartialEq, Eq)]
25pub struct EngineStatus {
26 pub node_count: u32,
28 pub edge_count: u32,
30 pub read_only: bool,
32 pub overlay_edge_count: usize,
34 pub tombstoned_edges: usize,
36}
37
38#[expect(
40 clippy::redundant_pub_crate,
41 reason = "shared with private builder module"
42)]
43pub(crate) struct EngineCart {
44 pub backing: Vec<u8>,
46 pub metadata: PostgresMetadata,
48}
49
50#[derive(yoke::Yokeable)]
52#[yoke(prove_covariant)]
53#[expect(
54 clippy::redundant_pub_crate,
55 reason = "shared with private builder module"
56)]
57pub(crate) struct EngineState<'a> {
58 pub topology: GraphTopology<'a>,
60}
61
62pub struct Engine {
64 inner: Yoke<EngineState<'static>, Box<EngineCart>>,
66 overlay: OverlayState,
68 config: Config,
70 traverse_scratch: crate::traverse::TraverseScratch,
72 unique_adjacency: UniqueAdjacency,
74 unique_cache_built: bool,
76}
77
78impl Engine {
79 pub(crate) fn from_parts(
81 inner: Yoke<EngineState<'static>, Box<EngineCart>>,
82 overlay: OverlayState,
83 config: Config,
84 traverse_scratch: crate::traverse::TraverseScratch,
85 ) -> Self {
86 Self {
87 inner,
88 overlay,
89 config,
90 traverse_scratch,
91 unique_adjacency: UniqueAdjacency::default(),
92 unique_cache_built: false,
93 }
94 }
95
96 #[must_use]
98 pub fn node_count(&self) -> u32 {
99 self.inner.backing_cart().metadata.node_count.get()
100 }
101
102 pub(crate) fn traverse_workspace_mut(
114 &mut self,
115 needs_unique: bool,
116 ) -> (
117 &GraphTopology<'_>,
118 &UniqueAdjacency,
119 &OverlayState,
120 &mut crate::traverse::TraverseScratch,
121 ) {
122 if needs_unique && !self.unique_cache_built {
123 let forward = self.inner.get().topology.forward;
124 let inbound = self.inner.get().topology.inbound;
125 self.unique_adjacency = UniqueAdjacency::from_topology(&forward, &inbound);
126 self.unique_cache_built = true;
127 }
128 (
129 &self.inner.get().topology,
130 &self.unique_adjacency,
131 &self.overlay,
132 &mut self.traverse_scratch,
133 )
134 }
135
136 pub fn from_snapshot_bytes(bytes: &[u8]) -> Result<Self, PostgresGraphError> {
142 EngineBuilder::new().snapshot_owned(bytes.to_vec()).build()
143 }
144
145 #[must_use]
147 pub fn stats(&self) -> EngineStatus {
148 let metadata = &self.inner.backing_cart().metadata;
149 EngineStatus {
150 node_count: metadata.node_count.get(),
151 edge_count: metadata.edge_count.get(),
152 read_only: metadata.is_read_only(),
153 overlay_edge_count: self.overlay.overlay_edge_count(),
154 tombstoned_edges: self.overlay.tombstoned_edge_count(),
155 }
156 }
157
158 #[must_use]
160 pub const fn config(&self) -> &Config {
161 &self.config
162 }
163
164 pub fn set_config(&mut self, config: Config) -> Result<(), PostgresGraphError> {
170 config.validate()?;
171 self.config = config;
172 Ok(())
173 }
174
175 #[must_use]
177 pub const fn overlay(&self) -> &OverlayState {
178 &self.overlay
179 }
180
181 pub const fn overlay_mut(&mut self) -> &mut OverlayState {
183 &mut self.overlay
184 }
185
186 #[must_use]
188 pub fn snapshot_bytes(&self) -> &[u8] {
189 self.inner.backing_cart().backing.as_slice()
190 }
191
192 #[must_use]
194 pub fn topology(&self) -> &GraphTopology<'_> {
195 &self.inner.get().topology
196 }
197
198 #[must_use]
200 pub fn forward(&self) -> &crate::topology::ForwardCsr<'_> {
201 &self.topology().forward
202 }
203
204 #[must_use]
206 pub fn inbound(&self) -> &crate::topology::InboundCsc<'_> {
207 &self.topology().inbound
208 }
209
210 pub fn replace_snapshot_bytes(&mut self, bytes: &[u8]) -> Result<(), PostgresGraphError> {
216 let engine = EngineBuilder::new()
217 .snapshot_owned(bytes.to_vec())
218 .config(self.config.clone())
219 .overlay(OverlayState::default())
220 .build()?;
221 self.inner = engine.inner;
222 self.overlay.clear();
223 self.unique_adjacency = UniqueAdjacency::default();
224 self.unique_cache_built = false;
225 self.traverse_scratch
226 .reset_after_snapshot(self.node_count() as usize);
227 Ok(())
228 }
229
230 pub fn traverse(
241 &mut self,
242 seed: u32,
243 limits: TraverseLimits,
244 direction: TraversalDirection,
245 ) -> Result<Vec<u32>, PostgresGraphError> {
246 let limits = limits.capped_by(self.config())?;
247 traverse_core_collect(self, &[seed], limits, direction)
248 }
249
250 pub fn traverse_from_seeds(
260 &mut self,
261 seeds: &[u32],
262 limits: TraverseLimits,
263 direction: TraversalDirection,
264 ) -> Result<Vec<u32>, PostgresGraphError> {
265 let limits = limits.capped_by(self.config())?;
266 traverse_core_collect(self, seeds, limits, direction)
267 }
268
269 pub fn visited_count(
279 &mut self,
280 seed: u32,
281 limits: TraverseLimits,
282 direction: TraversalDirection,
283 ) -> Result<usize, PostgresGraphError> {
284 let limits = limits.capped_by(self.config())?;
285 traverse_core_count(self, &[seed], limits, direction)
286 }
287
288 pub fn search(
298 &self,
299 predicate: SearchPredicate,
300 limit: NonZeroUsize,
301 ) -> Result<Vec<u32>, PostgresGraphError> {
302 let effective_limit = core::cmp::min(limit.get(), self.config().search_limit as usize);
303 let effective_limit = NonZeroUsize::new(effective_limit).ok_or(QueryError::LimitZero)?;
304 let node_bound = self.forward().node_count();
305 let mut matches = Vec::new();
306 for node in 0..node_bound {
307 let node_u32 = u32::try_from(node).map_err(|_| QueryError::NodeIndexOverflow)?;
308 if !self.node_visible(node_u32) || !predicate.matches(node_u32) {
309 continue;
310 }
311 matches.push(node_u32);
312 if matches.len() >= effective_limit.get() {
313 break;
314 }
315 }
316 Ok(matches)
317 }
318
319 pub fn apply_sync_rows(&mut self, rows: &[SyncRow]) -> Result<usize, PostgresGraphError> {
325 SyncRow::apply_in_order(rows, self.overlay_mut())
326 }
327
328 #[must_use]
330 pub fn sync_health(&self) -> SyncHealth {
331 let status = self.stats();
332 SyncHealth {
333 overlay_edges: status.overlay_edge_count,
334 tombstoned_edges: status.tombstoned_edges,
335 tombstoned_nodes: self.overlay().tombstoned_node_count(),
336 }
337 }
338
339 pub fn rebuild_from_catalog(
346 &mut self,
347 catalog: &Catalog,
348 edges: &[EdgeRow],
349 built_at_unix: u64,
350 ) -> Result<(), PostgresGraphError> {
351 if !self.config().maintenance_enabled {
352 return Err(ConfigError::MaintenanceDisabled.into());
353 }
354 let bytes = SnapshotRebuild::from_catalog_and_edges(catalog, edges, built_at_unix)?;
355 self.replace_snapshot_bytes(&bytes)
356 }
357
358 fn node_visible(&self, node: u32) -> bool {
360 self.inner
361 .get()
362 .topology
363 .node_visible(node, TraversalDirection::Out, self.overlay())
364 }
365}