1use alloc::vec::Vec;
4
5use oxgraph_csc::{CscNodeId, CscSnapshotGraph};
6use oxgraph_csr::{CsrNodeId, CsrSnapshotGraph};
7use oxgraph_graph::{
8 ContainsElement, EdgeTargetGraph, ElementIndex, ElementPredecessors, ElementSuccessors,
9 OutgoingGraph, OutgoingNeighborsGraph, TopologyBase, TopologyCounts,
10};
11use oxgraph_snapshot::Snapshot;
12
13use crate::{
14 artifact::{
15 SNAPSHOT_KIND_PG_INBOUND_OFFSETS_U32, SNAPSHOT_KIND_PG_INBOUND_TARGETS_U32, read_metadata,
16 },
17 error::{BuildError, PostgresGraphError},
18 overlay::OverlayState,
19};
20
/// Section version passed to `CscSnapshotGraph::from_snapshot_with_kinds`
/// when opening the inbound offsets/targets sections.
///
/// Defined as the CSR section version exported by `oxgraph_csr`.
const INBOUND_SECTION_VERSION: u32 = oxgraph_csr::SNAPSHOT_CSR_SECTION_VERSION;
24
/// Newtype around the forward CSR snapshot view.
///
/// Wraps a `CsrSnapshotGraph` instantiated with `u32` for both type
/// parameters and is queried for node counts and successors.
#[derive(Clone, Copy, Debug)]
pub struct ForwardCsr<'view>(pub(crate) CsrSnapshotGraph<'view, u32, u32>);
28
/// Newtype around the inbound CSC snapshot view.
///
/// Queried for node counts and predecessors.
#[derive(Clone, Copy, Debug)]
pub struct InboundCsc<'view>(pub(crate) CscSnapshotGraph<'view>);
32
/// Forward and inbound adjacency views borrowed from one snapshot.
///
/// Both halves are cheap `Copy` views tied to the `'view` lifetime.
#[derive(Clone, Copy, Debug, yoke::Yokeable)]
#[yoke(prove_covariant)]
pub struct GraphTopology<'view> {
    /// Forward adjacency, used for successor queries.
    pub forward: ForwardCsr<'view>,
    /// Inbound adjacency, used for predecessor queries.
    pub inbound: InboundCsc<'view>,
}
42
/// Owned adjacency rows in offsets/targets form, one table per direction.
///
/// Row `i` of a table is `targets[offsets[i]..offsets[i + 1]]`. The
/// `Default` value has empty offset tables, for which every row lookup
/// yields an empty slice.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[expect(
    clippy::redundant_pub_crate,
    reason = "shared with engine and traversal modules"
)]
pub(crate) struct UniqueAdjacency {
    /// Row boundaries into `outgoing_targets`.
    outgoing_offsets: Vec<usize>,
    /// Concatenated successor rows.
    outgoing_targets: Vec<u32>,
    /// Row boundaries into `incoming_sources`.
    incoming_offsets: Vec<usize>,
    /// Concatenated predecessor rows.
    incoming_sources: Vec<u32>,
}
59
60impl UniqueAdjacency {
61 #[must_use]
68 pub(crate) fn from_topology(forward: &ForwardCsr<'_>, inbound: &InboundCsc<'_>) -> Self {
69 let node_count = forward.node_count();
70 let (outgoing_offsets, outgoing_targets) =
71 Self::build_unique_rows(node_count, |node| forward.successors(node));
72 let (incoming_offsets, incoming_sources) =
73 Self::build_unique_rows(node_count, |node| inbound.predecessors(node));
74 Self {
75 outgoing_offsets,
76 outgoing_targets,
77 incoming_offsets,
78 incoming_sources,
79 }
80 }
81
82 fn build_unique_rows<I>(
84 node_count: usize,
85 mut neighbors: impl FnMut(u32) -> I,
86 ) -> (Vec<usize>, Vec<u32>)
87 where
88 I: Iterator<Item = u32>,
89 {
90 let mut offsets = Vec::with_capacity(node_count.saturating_add(1));
91 let mut targets = Vec::new();
92 let mut scratch = Vec::new();
93 offsets.push(0);
94 let Ok(node_bound) = u32::try_from(node_count) else {
95 return (offsets, targets);
96 };
97 for node_id in 0..node_bound {
98 scratch.clear();
99 scratch.extend(neighbors(node_id));
100 scratch.sort_unstable();
101 scratch.dedup();
102 targets.extend_from_slice(&scratch);
103 offsets.push(targets.len());
104 }
105 (offsets, targets)
106 }
107
108 fn row<'a>(offsets: &[usize], targets: &'a [u32], node: u32) -> &'a [u32] {
110 let index = node as usize;
111 let Some((&start, &end)) = offsets.get(index).zip(offsets.get(index.saturating_add(1)))
112 else {
113 return &[];
114 };
115 &targets[start..end]
116 }
117
118 #[must_use]
124 pub(crate) fn outgoing(&self, source: u32) -> &[u32] {
125 Self::row(&self.outgoing_offsets, &self.outgoing_targets, source)
126 }
127
128 #[must_use]
134 pub(crate) fn incoming(&self, target: u32) -> &[u32] {
135 Self::row(&self.incoming_offsets, &self.incoming_sources, target)
136 }
137}
138
139impl GraphTopology<'_> {
140 #[must_use]
142 pub(crate) fn node_visible(
143 &self,
144 node: u32,
145 direction: crate::traverse::TraversalDirection,
146 overlay: &OverlayState,
147 ) -> bool {
148 match direction {
149 crate::traverse::TraversalDirection::Out => self.forward.node_visible(node, overlay),
150 crate::traverse::TraversalDirection::In => self.inbound.node_visible(node, overlay),
151 }
152 }
153}
154
155impl<'view> GraphTopology<'view> {
156 pub fn open(snapshot: &Snapshot<'view>) -> Result<Self, PostgresGraphError> {
166 let metadata = read_metadata(snapshot)?;
167 if !metadata.has_reverse_index() {
168 return Err(PostgresGraphError::Build(BuildError::MissingReverseIndex));
169 }
170 let forward = ForwardCsr(CsrSnapshotGraph::from_snapshot(snapshot)?);
171 let inbound = InboundCsc(CscSnapshotGraph::from_snapshot_with_kinds(
172 snapshot,
173 SNAPSHOT_KIND_PG_INBOUND_OFFSETS_U32,
174 SNAPSHOT_KIND_PG_INBOUND_TARGETS_U32,
175 INBOUND_SECTION_VERSION,
176 )?);
177 if forward.0.element_count() != inbound.0.node_count() {
178 return Err(PostgresGraphError::Build(
179 BuildError::TopologyNodeCountMismatch,
180 ));
181 }
182 if forward.0.relation_count() != inbound.0.relation_count() {
183 return Err(PostgresGraphError::Build(
184 BuildError::TopologyEdgeCountMismatch,
185 ));
186 }
187 if u32::try_from(forward.0.element_count()).ok() != Some(metadata.node_count.get()) {
188 return Err(PostgresGraphError::Build(
189 BuildError::MetadataNodeCountMismatch,
190 ));
191 }
192 if u32::try_from(forward.0.relation_count()).ok() != Some(metadata.edge_count.get()) {
193 return Err(PostgresGraphError::Build(
194 BuildError::MetadataEdgeCountMismatch,
195 ));
196 }
197 Ok(Self { forward, inbound })
198 }
199}
200
201impl ForwardCsr<'_> {
202 #[must_use]
208 pub fn node_count(&self) -> usize {
209 self.0.element_count()
210 }
211
212 #[must_use]
218 pub fn successors(&self, source: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
219 self.0
220 .outgoing_neighbors(CsrNodeId::new(source))
221 .map(CsrNodeId::get)
222 }
223
224 #[must_use]
226 pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
227 (node as usize) < self.node_count()
228 && (!overlay.has_node_tombstones() || overlay.node_visible(node))
229 }
230}
231
232impl InboundCsc<'_> {
233 #[must_use]
239 pub fn node_count(&self) -> usize {
240 self.0.node_count()
241 }
242
243 #[must_use]
249 pub fn predecessors(&self, target: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
250 self.0
251 .predecessors(CscNodeId::new(target))
252 .map(CscNodeId::get)
253 }
254
255 #[must_use]
257 pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
258 (node as usize) < self.node_count()
259 && (!overlay.has_node_tombstones() || overlay.node_visible(node))
260 }
261}
262
/// Switches that control how the overlay views produce neighbors.
#[derive(Clone, Copy, Debug)]
#[expect(
    clippy::redundant_pub_crate,
    reason = "shared with the traverse view-construction path"
)]
pub(crate) struct OverlayViewFlags {
    /// Read base neighbors from the `UniqueAdjacency` rows instead of
    /// walking the snapshot graph directly.
    pub use_unique: bool,
    /// Append the overlay's own neighbor row after the base neighbors.
    pub merge_overlay: bool,
    /// Filter nodes through `OverlayState::node_visible`.
    pub check_nodes: bool,
}
282
/// Successor view combining the forward graph with overlay state.
#[derive(Clone, Copy, Debug)]
#[expect(
    clippy::redundant_pub_crate,
    reason = "constructed by the traverse module from engine-owned state"
)]
pub(crate) struct OverlayOutView<'view> {
    /// Forward snapshot graph walked when `flags.use_unique` is off.
    forward: ForwardCsr<'view>,
    /// Overlay consulted for visibility and for extra targets.
    overlay: &'view OverlayState,
    /// Precomputed rows used when `flags.use_unique` is on.
    unique: &'view UniqueAdjacency,
    /// Behavior switches for this view.
    flags: OverlayViewFlags,
    /// Exclusive upper bound on node ids accepted by this view.
    node_count: usize,
}
307
/// Predecessor view combining the inbound graph with overlay state.
#[derive(Clone, Copy, Debug)]
#[expect(
    clippy::redundant_pub_crate,
    reason = "constructed by the traverse module from engine-owned state"
)]
pub(crate) struct OverlayInView<'view> {
    /// Inbound snapshot graph walked when `flags.use_unique` is off.
    inbound: InboundCsc<'view>,
    /// Forward graph, used to check that a predecessor still has a visible
    /// edge to the queried node when the overlay has edge tombstones.
    forward: ForwardCsr<'view>,
    /// Overlay consulted for visibility and for extra sources.
    overlay: &'view OverlayState,
    /// Precomputed rows used when `flags.use_unique` is on.
    unique: &'view UniqueAdjacency,
    /// Behavior switches for this view.
    flags: OverlayViewFlags,
    /// Exclusive upper bound on node ids accepted by this view.
    node_count: usize,
}
335
336impl<'view> OverlayOutView<'view> {
337 #[must_use]
343 pub(crate) const fn new(
344 topology: &GraphTopology<'view>,
345 overlay: &'view OverlayState,
346 unique: &'view UniqueAdjacency,
347 flags: OverlayViewFlags,
348 node_count: usize,
349 ) -> Self {
350 Self {
351 forward: topology.forward,
352 overlay,
353 unique,
354 flags,
355 node_count,
356 }
357 }
358}
359
360impl<'view> OverlayInView<'view> {
361 #[must_use]
367 pub(crate) const fn new(
368 topology: &GraphTopology<'view>,
369 overlay: &'view OverlayState,
370 unique: &'view UniqueAdjacency,
371 flags: OverlayViewFlags,
372 node_count: usize,
373 ) -> Self {
374 Self {
375 inbound: topology.inbound,
376 forward: topology.forward,
377 overlay,
378 unique,
379 flags,
380 node_count,
381 }
382 }
383}
384
385fn has_visible_forward_edge(
388 forward: &CsrSnapshotGraph<'_, u32, u32>,
389 overlay: &OverlayState,
390 source: u32,
391 target: u32,
392) -> bool {
393 let target_id = CsrNodeId::new(target);
394 for edge in OutgoingGraph::outgoing_edges(forward, CsrNodeId::new(source)) {
395 if EdgeTargetGraph::target(forward, edge) == target_id && overlay.edge_visible(edge.get()) {
396 return true;
397 }
398 }
399 false
400}
401
/// Source of base (non-overlay) successors for one node.
enum OutBaseSource<'view> {
    /// Iterates a precomputed `UniqueAdjacency` row.
    Unique(core::slice::Iter<'view, u32>),
    /// Walks the node's outgoing edges in the snapshot graph.
    Parallel {
        /// Graph used to resolve each edge's target.
        graph: CsrSnapshotGraph<'view, u32, u32>,
        /// Remaining outgoing edges of the node.
        edges: oxgraph_csr::CsrOutEdges<u32>,
        /// When set, edges the overlay reports as not visible are skipped.
        edge_tombstones: bool,
    },
}
416
417impl OutBaseSource<'_> {
418 fn next_raw(&mut self, overlay: &OverlayState) -> Option<u32> {
421 match self {
422 Self::Unique(iter) => iter.next().copied(),
423 Self::Parallel {
424 graph,
425 edges,
426 edge_tombstones,
427 } => loop {
428 let edge = edges.next()?;
429 if *edge_tombstones && !overlay.edge_visible(edge.get()) {
430 continue;
431 }
432 return Some(EdgeTargetGraph::target(graph, edge).get());
433 },
434 }
435 }
436}
437
/// Iterator over the successors of one node in an `OverlayOutView`.
///
/// Yields base successors first and overlay targets afterwards; every
/// candidate must pass the node filter.
#[expect(
    clippy::redundant_pub_crate,
    reason = "associated successor type of a crate-internal topology view"
)]
pub(crate) struct OverlayOutSuccessors<'view> {
    /// Base successors from the snapshot or the unique rows.
    base: OutBaseSource<'view>,
    /// Overlay targets yielded after the base is exhausted.
    overlay: core::slice::Iter<'view, u32>,
    /// Overlay used for edge and node visibility checks.
    overlay_state: &'view OverlayState,
    /// When set, candidates must pass `OverlayState::node_visible`.
    check_nodes: bool,
    /// Exclusive upper bound on yielded node ids.
    node_count: u32,
}
463
464impl OverlayOutSuccessors<'_> {
465 fn admit(&self, node: u32) -> bool {
467 node < self.node_count && (!self.check_nodes || self.overlay_state.node_visible(node))
468 }
469
470 fn next_base(&mut self) -> Option<u32> {
472 while let Some(candidate) = self.base.next_raw(self.overlay_state) {
473 if self.admit(candidate) {
474 return Some(candidate);
475 }
476 }
477 None
478 }
479}
480
481impl Iterator for OverlayOutSuccessors<'_> {
482 type Item = u32;
483
484 fn next(&mut self) -> Option<u32> {
485 if let Some(target) = self.next_base() {
486 return Some(target);
487 }
488 loop {
489 let candidate = *self.overlay.next()?;
490 if self.admit(candidate) {
491 return Some(candidate);
492 }
493 }
494 }
495}
496
// Both nodes and edges of the successor view are identified by raw `u32`s.
impl TopologyBase for OverlayOutView<'_> {
    type ElementId = u32;
    type RelationId = u32;
}
501
502impl ContainsElement for OverlayOutView<'_> {
503 fn contains_element(&self, element: u32) -> bool {
504 (element as usize) < self.node_count
505 && (!self.flags.check_nodes || self.overlay.node_visible(element))
506 }
507}
508
// Node ids are used directly as indices.
impl ElementIndex for OverlayOutView<'_> {
    /// Index bound: the view's `node_count`.
    fn element_bound(&self) -> usize {
        self.node_count
    }

    /// Index of `element`: the id itself, cast to `usize`.
    fn element_index(&self, element: u32) -> usize {
        element as usize
    }
}
518
519impl ElementSuccessors for OverlayOutView<'_> {
520 type Successors<'iter>
521 = OverlayOutSuccessors<'iter>
522 where
523 Self: 'iter;
524
525 fn element_successors(&self, element: u32) -> Self::Successors<'_> {
526 let base = if self.flags.use_unique {
527 OutBaseSource::Unique(self.unique.outgoing(element).iter())
528 } else {
529 OutBaseSource::Parallel {
530 graph: self.forward.0,
531 edges: OutgoingGraph::outgoing_edges(&self.forward.0, CsrNodeId::new(element)),
532 edge_tombstones: self.overlay.has_edge_tombstones(),
533 }
534 };
535 let overlay_row: &[u32] = if self.flags.merge_overlay {
536 self.overlay.overlay_targets(element)
537 } else {
538 &[]
539 };
540 OverlayOutSuccessors {
541 base,
542 overlay: overlay_row.iter(),
543 overlay_state: self.overlay,
544 check_nodes: self.flags.check_nodes,
545 node_count: u32::try_from(self.node_count).unwrap_or(u32::MAX),
546 }
547 }
548}
549
/// Source of base (non-overlay) predecessors for one node.
enum InBaseSource<'view> {
    /// Iterates a precomputed `UniqueAdjacency` row.
    Unique(core::slice::Iter<'view, u32>),
    /// Walks the node's predecessors in the inbound snapshot graph.
    Parallel {
        /// Remaining predecessors of the node.
        preds: oxgraph_csc::CscPredecessors<'view, u32, u32>,
        /// Forward graph used to look for a visible edge back to `current`.
        forward: CsrSnapshotGraph<'view, u32, u32>,
        /// Node whose predecessors are being produced.
        current: u32,
        /// When set, a predecessor is kept only if it still has a visible
        /// forward edge to `current`.
        edge_tombstones: bool,
    },
}
566
567impl InBaseSource<'_> {
568 fn next_raw(&mut self, overlay: &OverlayState) -> Option<u32> {
571 match self {
572 Self::Unique(iter) => iter.next().copied(),
573 Self::Parallel {
574 preds,
575 forward,
576 current,
577 edge_tombstones,
578 } => loop {
579 let pred = preds.next()?.get();
580 if *edge_tombstones && !has_visible_forward_edge(forward, overlay, pred, *current) {
581 continue;
582 }
583 return Some(pred);
584 },
585 }
586 }
587}
588
/// Iterator over the predecessors of one node in an `OverlayInView`.
///
/// Yields base predecessors first and overlay sources afterwards; every
/// candidate must pass the node filter.
#[expect(
    clippy::redundant_pub_crate,
    reason = "associated predecessor type of a crate-internal topology view"
)]
pub(crate) struct OverlayInPredecessors<'view> {
    /// Base predecessors from the snapshot or the unique rows.
    base: InBaseSource<'view>,
    /// Overlay sources yielded after the base is exhausted.
    overlay: core::slice::Iter<'view, u32>,
    /// Overlay used for edge and node visibility checks.
    overlay_state: &'view OverlayState,
    /// When set, candidates must pass `OverlayState::node_visible`.
    check_nodes: bool,
    /// Exclusive upper bound on yielded node ids.
    node_count: u32,
}
615
616impl OverlayInPredecessors<'_> {
617 fn admit(&self, node: u32) -> bool {
619 node < self.node_count && (!self.check_nodes || self.overlay_state.node_visible(node))
620 }
621
622 fn next_base(&mut self) -> Option<u32> {
624 while let Some(candidate) = self.base.next_raw(self.overlay_state) {
625 if self.admit(candidate) {
626 return Some(candidate);
627 }
628 }
629 None
630 }
631}
632
633impl Iterator for OverlayInPredecessors<'_> {
634 type Item = u32;
635
636 fn next(&mut self) -> Option<u32> {
637 if let Some(source) = self.next_base() {
638 return Some(source);
639 }
640 loop {
641 let candidate = *self.overlay.next()?;
642 if self.admit(candidate) {
643 return Some(candidate);
644 }
645 }
646 }
647}
648
// Both nodes and edges of the predecessor view are identified by raw `u32`s.
impl TopologyBase for OverlayInView<'_> {
    type ElementId = u32;
    type RelationId = u32;
}
653
654impl ContainsElement for OverlayInView<'_> {
655 fn contains_element(&self, element: u32) -> bool {
656 (element as usize) < self.node_count
657 && (!self.flags.check_nodes || self.overlay.node_visible(element))
658 }
659}
660
// Node ids are used directly as indices.
impl ElementIndex for OverlayInView<'_> {
    /// Index bound: the view's `node_count`.
    fn element_bound(&self) -> usize {
        self.node_count
    }

    /// Index of `element`: the id itself, cast to `usize`.
    fn element_index(&self, element: u32) -> usize {
        element as usize
    }
}
670
671impl ElementPredecessors for OverlayInView<'_> {
672 type Predecessors<'iter>
673 = OverlayInPredecessors<'iter>
674 where
675 Self: 'iter;
676
677 fn element_predecessors(&self, element: u32) -> Self::Predecessors<'_> {
678 let base = if self.flags.use_unique {
679 InBaseSource::Unique(self.unique.incoming(element).iter())
680 } else {
681 InBaseSource::Parallel {
682 preds: self.inbound.0.element_predecessors(CscNodeId::new(element)),
683 forward: self.forward.0,
684 current: element,
685 edge_tombstones: self.overlay.has_edge_tombstones(),
686 }
687 };
688 let overlay_row: &[u32] = if self.flags.merge_overlay {
689 self.overlay.overlay_sources(element)
690 } else {
691 &[]
692 };
693 OverlayInPredecessors {
694 base,
695 overlay: overlay_row.iter(),
696 overlay_state: self.overlay,
697 check_nodes: self.flags.check_nodes,
698 node_count: u32::try_from(self.node_count).unwrap_or(u32::MAX),
699 }
700 }
701}