use alloc::vec::Vec;
use oxgraph_csc::{CscNodeId, CscSnapshotGraph};
use oxgraph_csr::{CsrNodeId, CsrSnapshotGraph};
use oxgraph_graph::{
ContainsElement, EdgeTargetGraph, ElementIndex, ElementPredecessors, ElementSuccessors,
OutgoingGraph, OutgoingNeighborsGraph, TopologyBase, TopologyCounts,
};
use oxgraph_snapshot::Snapshot;
use crate::{
artifact::{
SNAPSHOT_KIND_PG_INBOUND_OFFSETS_U32, SNAPSHOT_KIND_PG_INBOUND_TARGETS_U32, read_metadata,
},
error::{BuildError, PostgresGraphError},
overlay::OverlayState,
};
/// Section version passed when opening the inbound offset/target sections.
///
/// Pinned to the CSR section version exported by `oxgraph_csr`.
const INBOUND_SECTION_VERSION: u32 = oxgraph_csr::SNAPSHOT_CSR_SECTION_VERSION;
/// Forward (outgoing-edge) view: a newtype around a borrowed `CsrSnapshotGraph`
/// instantiated with `u32` for both of its type parameters.
#[derive(Clone, Copy, Debug)]
pub struct ForwardCsr<'view>(pub(crate) CsrSnapshotGraph<'view, u32, u32>);
/// Inbound (incoming-edge) view: a newtype around a borrowed `CscSnapshotGraph`.
#[derive(Clone, Copy, Debug)]
pub struct InboundCsc<'view>(pub(crate) CscSnapshotGraph<'view>);
/// Pair of forward and inbound adjacency views borrowed from the same snapshot.
///
/// Both halves are `Copy`, so the whole topology is cheap to pass by value.
#[derive(Clone, Copy, Debug, yoke::Yokeable)]
#[yoke(prove_covariant)]
pub struct GraphTopology<'view> {
/// Outgoing-edge (CSR) half of the topology.
pub forward: ForwardCsr<'view>,
/// Incoming-edge (CSC) half of the topology.
pub inbound: InboundCsc<'view>,
}
/// Owned adjacency table in which every per-node row is sorted ascending and
/// free of duplicates, stored as an offsets array plus a flat neighbour array
/// for each direction.
///
/// Row `n` of a direction occupies `offsets[n]..offsets[n + 1]` in the matching
/// flat array.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[expect(
clippy::redundant_pub_crate,
reason = "shared with engine and traversal modules"
)]
pub(crate) struct UniqueAdjacency {
// Row boundaries into `outgoing_targets`; one more entry than there are rows.
outgoing_offsets: Vec<usize>,
// Concatenated, per-row sorted and deduplicated successor ids.
outgoing_targets: Vec<u32>,
// Row boundaries into `incoming_sources`; one more entry than there are rows.
incoming_offsets: Vec<usize>,
// Concatenated, per-row sorted and deduplicated predecessor ids.
incoming_sources: Vec<u32>,
}
impl UniqueAdjacency {
/// Builds the deduplicated adjacency table for both directions.
///
/// The row count for *both* directions is taken from `forward.node_count()`;
/// outgoing rows are filled from `forward.successors` and incoming rows from
/// `inbound.predecessors`.
#[must_use]
pub(crate) fn from_topology(forward: &ForwardCsr<'_>, inbound: &InboundCsc<'_>) -> Self {
    let node_count = forward.node_count();
    let outgoing = Self::build_unique_rows(node_count, |source| forward.successors(source));
    let incoming = Self::build_unique_rows(node_count, |target| inbound.predecessors(target));
    Self {
        outgoing_offsets: outgoing.0,
        outgoing_targets: outgoing.1,
        incoming_offsets: incoming.0,
        incoming_sources: incoming.1,
    }
}
/// Builds an offsets/targets table with one sorted, duplicate-free row per node.
///
/// `neighbors` is called once for every node id in `0..node_count`, in
/// ascending order. Its output is sorted, deduplicated and appended to the
/// flat target list; the running length is recorded as the row's end offset.
///
/// Returns `(offsets, targets)` where `offsets` has `node_count + 1` entries
/// and row `n` is `targets[offsets[n]..offsets[n + 1]]`.
///
/// If `node_count` does not fit in `u32` no rows can be addressed, so the
/// result is the empty table `([0], [])` and `neighbors` is never called.
fn build_unique_rows<I>(
    node_count: usize,
    mut neighbors: impl FnMut(u32) -> I,
) -> (Vec<usize>, Vec<u32>)
where
    I: Iterator<Item = u32>,
{
    // Validate the bound *before* reserving capacity: an unrepresentable node
    // count must not trigger a huge (or overflowing) allocation for a table
    // that is about to be returned empty.
    let Ok(node_bound) = u32::try_from(node_count) else {
        let mut offsets = Vec::with_capacity(1);
        offsets.push(0);
        return (offsets, Vec::new());
    };
    let mut offsets = Vec::with_capacity(node_count.saturating_add(1));
    let mut targets = Vec::new();
    // Reused per-row buffer so each row is sorted/deduplicated without a fresh allocation.
    let mut scratch = Vec::new();
    offsets.push(0);
    for node_id in 0..node_bound {
        scratch.clear();
        scratch.extend(neighbors(node_id));
        scratch.sort_unstable();
        scratch.dedup();
        targets.extend_from_slice(&scratch);
        offsets.push(targets.len());
    }
    (offsets, targets)
}
/// Returns the slice of `targets` belonging to `node`.
///
/// The row is delimited by `offsets[node]` and `offsets[node + 1]`; when either
/// boundary is missing from `offsets` the row is empty.
fn row<'a>(offsets: &[usize], targets: &'a [u32], node: u32) -> &'a [u32] {
    let lower = node as usize;
    // Saturating so the largest possible index cannot wrap around to 0.
    let upper = lower.saturating_add(1);
    match (offsets.get(lower), offsets.get(upper)) {
        (Some(&begin), Some(&finish)) => &targets[begin..finish],
        _ => &[],
    }
}
#[must_use]
pub(crate) fn outgoing(&self, source: u32) -> &[u32] {
Self::row(&self.outgoing_offsets, &self.outgoing_targets, source)
}
#[must_use]
pub(crate) fn incoming(&self, target: u32) -> &[u32] {
Self::row(&self.incoming_offsets, &self.incoming_sources, target)
}
}
impl GraphTopology<'_> {
    /// Reports whether `node` is visible for a traversal in `direction`.
    ///
    /// Delegates to the forward view for `Out` and to the inbound view for
    /// `In`; each view checks the node id against its own node count and the
    /// overlay's node tombstones.
    #[must_use]
    pub(crate) fn node_visible(
        &self,
        node: u32,
        direction: crate::traverse::TraversalDirection,
        overlay: &OverlayState,
    ) -> bool {
        let Self { forward, inbound } = self;
        match direction {
            crate::traverse::TraversalDirection::Out => forward.node_visible(node, overlay),
            crate::traverse::TraversalDirection::In => inbound.node_visible(node, overlay),
        }
    }
}
impl<'view> GraphTopology<'view> {
    /// Opens the forward and inbound views stored in `snapshot` and checks
    /// that they agree with each other and with the snapshot metadata.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading the metadata or opening either view.
    /// Otherwise returns a `PostgresGraphError::Build` carrying, in this
    /// order of precedence:
    /// - `MissingReverseIndex` when the metadata reports no reverse index;
    /// - `TopologyNodeCountMismatch` / `TopologyEdgeCountMismatch` when the
    ///   forward and inbound views disagree on node or relation counts;
    /// - `MetadataNodeCountMismatch` / `MetadataEdgeCountMismatch` when the
    ///   forward counts do not fit in `u32` or differ from the metadata.
    pub fn open(snapshot: &Snapshot<'view>) -> Result<Self, PostgresGraphError> {
        let metadata = read_metadata(snapshot)?;
        if !metadata.has_reverse_index() {
            return Err(PostgresGraphError::Build(BuildError::MissingReverseIndex));
        }

        let forward = ForwardCsr(CsrSnapshotGraph::from_snapshot(snapshot)?);
        let inbound = InboundCsc(CscSnapshotGraph::from_snapshot_with_kinds(
            snapshot,
            SNAPSHOT_KIND_PG_INBOUND_OFFSETS_U32,
            SNAPSHOT_KIND_PG_INBOUND_TARGETS_U32,
            INBOUND_SECTION_VERSION,
        )?);

        let forward_nodes = forward.0.element_count();
        let forward_edges = forward.0.relation_count();

        // First failing consistency check wins; the order matches the error precedence above.
        let mismatch = if forward_nodes != inbound.0.node_count() {
            Some(BuildError::TopologyNodeCountMismatch)
        } else if forward_edges != inbound.0.relation_count() {
            Some(BuildError::TopologyEdgeCountMismatch)
        } else if u32::try_from(forward_nodes).ok() != Some(metadata.node_count.get()) {
            Some(BuildError::MetadataNodeCountMismatch)
        } else if u32::try_from(forward_edges).ok() != Some(metadata.edge_count.get()) {
            Some(BuildError::MetadataEdgeCountMismatch)
        } else {
            None
        };

        match mismatch {
            Some(reason) => Err(PostgresGraphError::Build(reason)),
            None => Ok(Self { forward, inbound }),
        }
    }
}
impl ForwardCsr<'_> {
#[must_use]
pub fn node_count(&self) -> usize {
self.0.element_count()
}
#[must_use]
pub fn successors(&self, source: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
self.0
.outgoing_neighbors(CsrNodeId::new(source))
.map(CsrNodeId::get)
}
#[must_use]
pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
(node as usize) < self.node_count()
&& (!overlay.has_node_tombstones() || overlay.node_visible(node))
}
}
impl InboundCsc<'_> {
#[must_use]
pub fn node_count(&self) -> usize {
self.0.node_count()
}
#[must_use]
pub fn predecessors(&self, target: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
self.0
.predecessors(CscNodeId::new(target))
.map(CscNodeId::get)
}
#[must_use]
pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
(node as usize) < self.node_count()
&& (!overlay.has_node_tombstones() || overlay.node_visible(node))
}
}
/// Switches selecting how an overlay view enumerates and filters neighbours.
#[derive(Clone, Copy, Debug)]
#[expect(
clippy::redundant_pub_crate,
reason = "shared with the traverse view-construction path"
)]
pub(crate) struct OverlayViewFlags {
/// When set, base neighbours come from the `UniqueAdjacency` rows instead of
/// the snapshot's own edge lists.
pub use_unique: bool,
/// When set, neighbours recorded in the overlay state are yielded after the
/// base neighbours; otherwise the overlay contributes none.
pub merge_overlay: bool,
/// When set, every node is additionally filtered through
/// `OverlayState::node_visible`.
pub check_nodes: bool,
}
/// Outgoing-direction graph view combining the snapshot's forward adjacency
/// with overlay state.
#[derive(Clone, Copy, Debug)]
#[expect(
clippy::redundant_pub_crate,
reason = "constructed by the traverse module from engine-owned state"
)]
pub(crate) struct OverlayOutView<'view> {
// Snapshot forward adjacency; walked directly when `flags.use_unique` is off.
forward: ForwardCsr<'view>,
// Overlay consulted for node/edge visibility and extra targets.
overlay: &'view OverlayState,
// Deduplicated adjacency used when `flags.use_unique` is on.
unique: &'view UniqueAdjacency,
// Enumeration/filtering switches for this view.
flags: OverlayViewFlags,
// Node ids at or above this value are never contained in or yielded by the view.
node_count: usize,
}
/// Incoming-direction graph view combining the snapshot's inbound adjacency
/// with overlay state.
#[derive(Clone, Copy, Debug)]
#[expect(
clippy::redundant_pub_crate,
reason = "constructed by the traverse module from engine-owned state"
)]
pub(crate) struct OverlayInView<'view> {
// Snapshot inbound adjacency; walked directly when `flags.use_unique` is off.
inbound: InboundCsc<'view>,
// Forward adjacency, used to look up edge visibility for a predecessor.
forward: ForwardCsr<'view>,
// Overlay consulted for node/edge visibility and extra sources.
overlay: &'view OverlayState,
// Deduplicated adjacency used when `flags.use_unique` is on.
unique: &'view UniqueAdjacency,
// Enumeration/filtering switches for this view.
flags: OverlayViewFlags,
// Node ids at or above this value are never contained in or yielded by the view.
node_count: usize,
}
impl<'view> OverlayOutView<'view> {
    /// Assembles an outgoing view from the topology's forward half plus the
    /// supplied overlay, unique adjacency, flags and node count.
    ///
    /// Only `topology.forward` is copied out; nothing is validated here.
    #[must_use]
    pub(crate) const fn new(
        topology: &GraphTopology<'view>,
        overlay: &'view OverlayState,
        unique: &'view UniqueAdjacency,
        flags: OverlayViewFlags,
        node_count: usize,
    ) -> Self {
        let forward = topology.forward;
        Self {
            forward,
            overlay,
            unique,
            flags,
            node_count,
        }
    }
}
impl<'view> OverlayInView<'view> {
    /// Assembles an incoming view from both halves of the topology plus the
    /// supplied overlay, unique adjacency, flags and node count.
    ///
    /// Both `topology.inbound` and `topology.forward` are copied out; nothing
    /// is validated here.
    #[must_use]
    pub(crate) const fn new(
        topology: &GraphTopology<'view>,
        overlay: &'view OverlayState,
        unique: &'view UniqueAdjacency,
        flags: OverlayViewFlags,
        node_count: usize,
    ) -> Self {
        let inbound = topology.inbound;
        let forward = topology.forward;
        Self {
            inbound,
            forward,
            overlay,
            unique,
            flags,
            node_count,
        }
    }
}
/// Returns `true` when at least one outgoing edge of `source` ends at `target`
/// and is reported visible by `overlay`.
///
/// Scans the outgoing edges of `source` in order and stops at the first match.
fn has_visible_forward_edge(
    forward: &CsrSnapshotGraph<'_, u32, u32>,
    overlay: &OverlayState,
    source: u32,
    target: u32,
) -> bool {
    let target_id = CsrNodeId::new(target);
    OutgoingGraph::outgoing_edges(forward, CsrNodeId::new(source)).any(|edge| {
        EdgeTargetGraph::target(forward, edge) == target_id && overlay.edge_visible(edge.get())
    })
}
/// Source of base (snapshot-derived) successors for one node.
enum OutBaseSource<'view> {
/// Iterates a precomputed row of successor ids.
Unique(core::slice::Iter<'view, u32>),
/// Walks the snapshot's outgoing edges one by one, so repeated targets are
/// yielded as often as they occur.
Parallel {
// Graph used to resolve each edge to its target node.
graph: CsrSnapshotGraph<'view, u32, u32>,
// Remaining outgoing edges of the node.
edges: oxgraph_csr::CsrOutEdges<u32>,
// Whether each edge must be checked against the overlay's edge visibility.
edge_tombstones: bool,
},
}
impl OutBaseSource<'_> {
    /// Produces the next base successor id, before any node-level filtering.
    ///
    /// In `Unique` mode this is simply the next id of the row. In `Parallel`
    /// mode edges the overlay reports as invisible are skipped (only when
    /// `edge_tombstones` is set) and the target of the first remaining edge
    /// is returned.
    fn next_raw(&mut self, overlay: &OverlayState) -> Option<u32> {
        match self {
            Self::Unique(row) => row.next().copied(),
            Self::Parallel {
                graph,
                edges,
                edge_tombstones,
            } => {
                let filter_edges = *edge_tombstones;
                // `find` advances `edges` past every skipped edge, like the manual loop would.
                let edge =
                    edges.find(|edge| !filter_edges || overlay.edge_visible(edge.get()))?;
                Some(EdgeTargetGraph::target(graph, edge).get())
            }
        }
    }
}
/// Iterator over the successors of one node in an `OverlayOutView`: base
/// successors first, then overlay-provided targets.
#[expect(
clippy::redundant_pub_crate,
reason = "associated successor type of a crate-internal topology view"
)]
pub(crate) struct OverlayOutSuccessors<'view> {
// Snapshot-derived successors.
base: OutBaseSource<'view>,
// Remaining overlay-provided targets (empty when overlay merging is off).
overlay: core::slice::Iter<'view, u32>,
// Overlay consulted for edge and node visibility.
overlay_state: &'view OverlayState,
// Whether candidates must also pass `OverlayState::node_visible`.
check_nodes: bool,
// Candidates with an id at or above this value are dropped.
node_count: u32,
}
impl OverlayOutSuccessors<'_> {
    /// Decides whether `node` may be yielded: it must be below `node_count`
    /// and, when node checking is on, visible in the overlay.
    fn admit(&self, node: u32) -> bool {
        if node >= self.node_count {
            return false;
        }
        !self.check_nodes || self.overlay_state.node_visible(node)
    }

    /// Pulls base successors until one is admitted; `None` once the base
    /// source is exhausted.
    fn next_base(&mut self) -> Option<u32> {
        loop {
            let candidate = self.base.next_raw(self.overlay_state)?;
            if self.admit(candidate) {
                return Some(candidate);
            }
        }
    }
}
impl Iterator for OverlayOutSuccessors<'_> {
    type Item = u32;

    /// Yields admitted base successors first, then admitted overlay targets.
    ///
    /// No deduplication is performed between the two sources.
    fn next(&mut self) -> Option<u32> {
        let from_base = self.next_base();
        if from_base.is_some() {
            return from_base;
        }
        while let Some(&candidate) = self.overlay.next() {
            if self.admit(candidate) {
                return Some(candidate);
            }
        }
        None
    }
}
// Both element (node) and relation (edge) identifiers of this view are plain `u32`.
impl TopologyBase for OverlayOutView<'_> {
type ElementId = u32;
type RelationId = u32;
}
impl ContainsElement for OverlayOutView<'_> {
    /// An element is contained when its id is below the view's node count
    /// and, if node checking is enabled, the overlay reports it visible.
    fn contains_element(&self, element: u32) -> bool {
        if (element as usize) >= self.node_count {
            return false;
        }
        !self.flags.check_nodes || self.overlay.node_visible(element)
    }
}
impl ElementIndex for OverlayOutView<'_> {
// Returns the node count the view was constructed with.
fn element_bound(&self) -> usize {
self.node_count
}
// The index of an element is its id itself, widened to `usize`.
fn element_index(&self, element: u32) -> usize {
element as usize
}
}
impl ElementSuccessors for OverlayOutView<'_> {
    type Successors<'iter>
        = OverlayOutSuccessors<'iter>
    where
        Self: 'iter;

    /// Builds the successor iterator for `element`.
    ///
    /// The base source is the deduplicated row when `use_unique` is set,
    /// otherwise a walk over the snapshot's outgoing edges (filtered by edge
    /// visibility only if the overlay has edge tombstones). Overlay targets
    /// are included only when `merge_overlay` is set.
    fn element_successors(&self, element: u32) -> Self::Successors<'_> {
        let flags = self.flags;
        let overlay_state = self.overlay;

        let base = if flags.use_unique {
            OutBaseSource::Unique(self.unique.outgoing(element).iter())
        } else {
            let graph = self.forward.0;
            OutBaseSource::Parallel {
                graph,
                edges: OutgoingGraph::outgoing_edges(&self.forward.0, CsrNodeId::new(element)),
                edge_tombstones: overlay_state.has_edge_tombstones(),
            }
        };

        let overlay_row: &[u32] = if flags.merge_overlay {
            overlay_state.overlay_targets(element)
        } else {
            &[]
        };

        // A node count that does not fit in `u32` is clamped to `u32::MAX`.
        let node_count = match u32::try_from(self.node_count) {
            Ok(bound) => bound,
            Err(_) => u32::MAX,
        };

        OverlayOutSuccessors {
            base,
            overlay: overlay_row.iter(),
            overlay_state,
            check_nodes: flags.check_nodes,
            node_count,
        }
    }
}
/// Source of base (snapshot-derived) predecessors for one node.
enum InBaseSource<'view> {
/// Iterates a precomputed row of predecessor ids.
Unique(core::slice::Iter<'view, u32>),
/// Walks the snapshot's predecessor list for `current`.
Parallel {
// Remaining predecessors of `current` from the inbound view.
preds: oxgraph_csc::CscPredecessors<'view, u32, u32>,
// Forward graph, used to look for a visible edge from a predecessor to `current`.
forward: CsrSnapshotGraph<'view, u32, u32>,
// Node whose predecessors are being enumerated.
current: u32,
// Whether predecessors must be checked for a visible forward edge.
edge_tombstones: bool,
},
}
impl InBaseSource<'_> {
    /// Produces the next base predecessor id, before any node-level filtering.
    ///
    /// In `Unique` mode this is simply the next id of the row. In `Parallel`
    /// mode, when `edge_tombstones` is set, a predecessor is skipped unless
    /// at least one forward edge from it to `current` is visible in `overlay`.
    ///
    /// NOTE(review): a predecessor passes as soon as *any* forward edge to
    /// `current` is visible. If the predecessor iterator repeats a node once
    /// per parallel edge (not visible from this file), that node would be
    /// yielded once per repetition even when only some of those edges are
    /// visible — confirm this matches the outgoing view's behaviour.
    fn next_raw(&mut self, overlay: &OverlayState) -> Option<u32> {
        match self {
            Self::Unique(row) => row.next().copied(),
            Self::Parallel {
                preds,
                forward,
                current,
                edge_tombstones,
            } => {
                let target = *current;
                let filter_edges = *edge_tombstones;
                for pred_id in preds.by_ref() {
                    let pred = pred_id.get();
                    if !filter_edges || has_visible_forward_edge(forward, overlay, pred, target) {
                        return Some(pred);
                    }
                }
                None
            }
        }
    }
}
/// Iterator over the predecessors of one node in an `OverlayInView`: base
/// predecessors first, then overlay-provided sources.
#[expect(
clippy::redundant_pub_crate,
reason = "associated predecessor type of a crate-internal topology view"
)]
pub(crate) struct OverlayInPredecessors<'view> {
// Snapshot-derived predecessors.
base: InBaseSource<'view>,
// Remaining overlay-provided sources (empty when overlay merging is off).
overlay: core::slice::Iter<'view, u32>,
// Overlay consulted for edge and node visibility.
overlay_state: &'view OverlayState,
// Whether candidates must also pass `OverlayState::node_visible`.
check_nodes: bool,
// Candidates with an id at or above this value are dropped.
node_count: u32,
}
impl OverlayInPredecessors<'_> {
    /// Decides whether `node` may be yielded: it must be below `node_count`
    /// and, when node checking is on, visible in the overlay.
    fn admit(&self, node: u32) -> bool {
        if node >= self.node_count {
            return false;
        }
        !self.check_nodes || self.overlay_state.node_visible(node)
    }

    /// Pulls base predecessors until one is admitted; `None` once the base
    /// source is exhausted.
    fn next_base(&mut self) -> Option<u32> {
        loop {
            let candidate = self.base.next_raw(self.overlay_state)?;
            if self.admit(candidate) {
                return Some(candidate);
            }
        }
    }
}
impl Iterator for OverlayInPredecessors<'_> {
    type Item = u32;

    /// Yields admitted base predecessors first, then admitted overlay sources.
    ///
    /// No deduplication is performed between the two sources.
    fn next(&mut self) -> Option<u32> {
        let from_base = self.next_base();
        if from_base.is_some() {
            return from_base;
        }
        while let Some(&candidate) = self.overlay.next() {
            if self.admit(candidate) {
                return Some(candidate);
            }
        }
        None
    }
}
// Both element (node) and relation (edge) identifiers of this view are plain `u32`.
impl TopologyBase for OverlayInView<'_> {
type ElementId = u32;
type RelationId = u32;
}
impl ContainsElement for OverlayInView<'_> {
    /// An element is contained when its id is below the view's node count
    /// and, if node checking is enabled, the overlay reports it visible.
    fn contains_element(&self, element: u32) -> bool {
        if (element as usize) >= self.node_count {
            return false;
        }
        !self.flags.check_nodes || self.overlay.node_visible(element)
    }
}
impl ElementIndex for OverlayInView<'_> {
// Returns the node count the view was constructed with.
fn element_bound(&self) -> usize {
self.node_count
}
// The index of an element is its id itself, widened to `usize`.
fn element_index(&self, element: u32) -> usize {
element as usize
}
}
impl ElementPredecessors for OverlayInView<'_> {
    type Predecessors<'iter>
        = OverlayInPredecessors<'iter>
    where
        Self: 'iter;

    /// Builds the predecessor iterator for `element`.
    ///
    /// The base source is the deduplicated row when `use_unique` is set,
    /// otherwise a walk over the snapshot's predecessor list (filtered by
    /// forward-edge visibility only if the overlay has edge tombstones).
    /// Overlay sources are included only when `merge_overlay` is set.
    fn element_predecessors(&self, element: u32) -> Self::Predecessors<'_> {
        let flags = self.flags;
        let overlay_state = self.overlay;

        let base = if flags.use_unique {
            InBaseSource::Unique(self.unique.incoming(element).iter())
        } else {
            let preds = self.inbound.0.element_predecessors(CscNodeId::new(element));
            InBaseSource::Parallel {
                preds,
                forward: self.forward.0,
                current: element,
                edge_tombstones: overlay_state.has_edge_tombstones(),
            }
        };

        let overlay_row: &[u32] = if flags.merge_overlay {
            overlay_state.overlay_sources(element)
        } else {
            &[]
        };

        // A node count that does not fit in `u32` is clamped to `u32::MAX`.
        let node_count = match u32::try_from(self.node_count) {
            Ok(bound) => bound,
            Err(_) => u32::MAX,
        };

        OverlayInPredecessors {
            base,
            overlay: overlay_row.iter(),
            overlay_state,
            check_nodes: flags.check_nodes,
            node_count,
        }
    }
}