Skip to main content

oxgraph_postgres/
topology.rs

1//! Typed forward CSR and inbound CSC views opened once per engine.
2
3use alloc::vec::Vec;
4
5use oxgraph_csc::{CscNodeId, CscSnapshotGraph};
6use oxgraph_csr::{CsrNodeId, CsrSnapshotGraph};
7use oxgraph_graph::{
8    ContainsElement, EdgeTargetGraph, ElementIndex, ElementPredecessors, ElementSuccessors,
9    OutgoingGraph, OutgoingNeighborsGraph, TopologyBase, TopologyCounts,
10};
11use oxgraph_snapshot::Snapshot;
12
13use crate::{
14    artifact::{
15        SNAPSHOT_KIND_PG_INBOUND_OFFSETS_U32, SNAPSHOT_KIND_PG_INBOUND_TARGETS_U32, read_metadata,
16    },
17    error::{BuildError, PostgresGraphError},
18    overlay::OverlayState,
19};
20
21/// Section version the Postgres inbound CSC sections are written under, matching
22/// the CSR section version the layout export uses.
23const INBOUND_SECTION_VERSION: u32 = oxgraph_csr::SNAPSHOT_CSR_SECTION_VERSION;
24
25/// Outgoing adjacency — foundation CSR topology sections only.
26#[derive(Clone, Copy, Debug)]
27pub struct ForwardCsr<'view>(pub(crate) CsrSnapshotGraph<'view, u32, u32>);
28
29/// Incoming adjacency — Postgres inbound CSC sections only.
30#[derive(Clone, Copy, Debug)]
31pub struct InboundCsc<'view>(pub(crate) CscSnapshotGraph<'view>);
32
33/// Both topology views borrowing the same snapshot backing.
34#[derive(Clone, Copy, Debug, yoke::Yokeable)]
35#[yoke(prove_covariant)]
36pub struct GraphTopology<'view> {
37    /// Forward CSR over outgoing edges.
38    pub forward: ForwardCsr<'view>,
39    /// Inbound CSC over predecessor lists.
40    pub inbound: InboundCsc<'view>,
41}
42
/// Node-unique adjacency owned by the engine and derived from the parallel
/// base topology.
///
/// Each direction is stored as a row-offset table plus a flat id array; row
/// `i` occupies `offsets[i]..offsets[i + 1]` of the matching id array.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[expect(
    clippy::redundant_pub_crate,
    reason = "shared with engine and traversal modules"
)]
pub(crate) struct UniqueAdjacency {
    /// Row offsets indexing into [`Self::outgoing_targets`].
    outgoing_offsets: Vec<usize>,
    /// Outgoing target ids, sorted and deduplicated within each row.
    outgoing_targets: Vec<u32>,
    /// Row offsets indexing into [`Self::incoming_sources`].
    incoming_offsets: Vec<usize>,
    /// Incoming predecessor ids, sorted and deduplicated within each row.
    incoming_sources: Vec<u32>,
}
59
60impl UniqueAdjacency {
61    /// Builds node-unique outgoing and incoming adjacency from base topology views.
62    ///
63    /// # Performance
64    ///
65    /// This method is `O(n + m log d)` for `n` nodes, `m` parallel edge slots, and maximum
66    /// per-node degree `d`; it allocates `O(n + u)` memory for `u` unique adjacency slots.
67    #[must_use]
68    pub(crate) fn from_topology(forward: &ForwardCsr<'_>, inbound: &InboundCsc<'_>) -> Self {
69        let node_count = forward.node_count();
70        let (outgoing_offsets, outgoing_targets) =
71            Self::build_unique_rows(node_count, |node| forward.successors(node));
72        let (incoming_offsets, incoming_sources) =
73            Self::build_unique_rows(node_count, |node| inbound.predecessors(node));
74        Self {
75            outgoing_offsets,
76            outgoing_targets,
77            incoming_offsets,
78            incoming_sources,
79        }
80    }
81
82    /// Builds sorted, deduplicated rows from a parallel adjacency row iterator.
83    fn build_unique_rows<I>(
84        node_count: usize,
85        mut neighbors: impl FnMut(u32) -> I,
86    ) -> (Vec<usize>, Vec<u32>)
87    where
88        I: Iterator<Item = u32>,
89    {
90        let mut offsets = Vec::with_capacity(node_count.saturating_add(1));
91        let mut targets = Vec::new();
92        let mut scratch = Vec::new();
93        offsets.push(0);
94        let Ok(node_bound) = u32::try_from(node_count) else {
95            return (offsets, targets);
96        };
97        for node_id in 0..node_bound {
98            scratch.clear();
99            scratch.extend(neighbors(node_id));
100            scratch.sort_unstable();
101            scratch.dedup();
102            targets.extend_from_slice(&scratch);
103            offsets.push(targets.len());
104        }
105        (offsets, targets)
106    }
107
108    /// Returns the row slice for `node`, or an empty slice when out of bounds.
109    fn row<'a>(offsets: &[usize], targets: &'a [u32], node: u32) -> &'a [u32] {
110        let index = node as usize;
111        let Some((&start, &end)) = offsets.get(index).zip(offsets.get(index.saturating_add(1)))
112        else {
113            return &[];
114        };
115        &targets[start..end]
116    }
117
118    /// Returns sorted, node-unique outgoing targets for `source`.
119    ///
120    /// # Performance
121    ///
122    /// This method is `O(1)` to borrow the row.
123    #[must_use]
124    pub(crate) fn outgoing(&self, source: u32) -> &[u32] {
125        Self::row(&self.outgoing_offsets, &self.outgoing_targets, source)
126    }
127
128    /// Returns sorted, node-unique incoming predecessors for `target`.
129    ///
130    /// # Performance
131    ///
132    /// This method is `O(1)` to borrow the row.
133    #[must_use]
134    pub(crate) fn incoming(&self, target: u32) -> &[u32] {
135        Self::row(&self.incoming_offsets, &self.incoming_sources, target)
136    }
137}
138
139impl GraphTopology<'_> {
140    /// Returns whether `node` is visible for traversal under `direction`.
141    #[must_use]
142    pub(crate) fn node_visible(
143        &self,
144        node: u32,
145        direction: crate::traverse::TraversalDirection,
146        overlay: &OverlayState,
147    ) -> bool {
148        match direction {
149            crate::traverse::TraversalDirection::Out => self.forward.node_visible(node, overlay),
150            crate::traverse::TraversalDirection::In => self.inbound.node_visible(node, overlay),
151        }
152    }
153}
154
155impl<'view> GraphTopology<'view> {
156    /// Opens both layouts from validated snapshot bytes.
157    ///
158    /// # Errors
159    ///
160    /// Returns [`PostgresGraphError`] when metadata, sections, or cross-layout counts disagree.
161    ///
162    /// # Performance
163    ///
164    /// This function is `O(s + n + m)`.
165    pub fn open(snapshot: &Snapshot<'view>) -> Result<Self, PostgresGraphError> {
166        let metadata = read_metadata(snapshot)?;
167        if !metadata.has_reverse_index() {
168            return Err(PostgresGraphError::Build(BuildError::MissingReverseIndex));
169        }
170        let forward = ForwardCsr(CsrSnapshotGraph::from_snapshot(snapshot)?);
171        let inbound = InboundCsc(CscSnapshotGraph::from_snapshot_with_kinds(
172            snapshot,
173            SNAPSHOT_KIND_PG_INBOUND_OFFSETS_U32,
174            SNAPSHOT_KIND_PG_INBOUND_TARGETS_U32,
175            INBOUND_SECTION_VERSION,
176        )?);
177        if forward.0.element_count() != inbound.0.node_count() {
178            return Err(PostgresGraphError::Build(
179                BuildError::TopologyNodeCountMismatch,
180            ));
181        }
182        if forward.0.relation_count() != inbound.0.relation_count() {
183            return Err(PostgresGraphError::Build(
184                BuildError::TopologyEdgeCountMismatch,
185            ));
186        }
187        if u32::try_from(forward.0.element_count()).ok() != Some(metadata.node_count.get()) {
188            return Err(PostgresGraphError::Build(
189                BuildError::MetadataNodeCountMismatch,
190            ));
191        }
192        if u32::try_from(forward.0.relation_count()).ok() != Some(metadata.edge_count.get()) {
193            return Err(PostgresGraphError::Build(
194                BuildError::MetadataEdgeCountMismatch,
195            ));
196        }
197        Ok(Self { forward, inbound })
198    }
199}
200
201impl ForwardCsr<'_> {
202    /// Returns the node count in this forward view.
203    ///
204    /// # Performance
205    ///
206    /// This method is `O(1)`.
207    #[must_use]
208    pub fn node_count(&self) -> usize {
209        self.0.element_count()
210    }
211
212    /// Returns successor node ids for `source`.
213    ///
214    /// # Performance
215    ///
216    /// This method is `O(1)` to create and `O(k)` to yield `k` successors.
217    #[must_use]
218    pub fn successors(&self, source: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
219        self.0
220            .outgoing_neighbors(CsrNodeId::new(source))
221            .map(CsrNodeId::get)
222    }
223
224    /// Returns whether `node` is visible for traversal seeds and results.
225    #[must_use]
226    pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
227        (node as usize) < self.node_count()
228            && (!overlay.has_node_tombstones() || overlay.node_visible(node))
229    }
230}
231
232impl InboundCsc<'_> {
233    /// Returns the node count in this inbound view.
234    ///
235    /// # Performance
236    ///
237    /// This method is `O(1)`.
238    #[must_use]
239    pub fn node_count(&self) -> usize {
240        self.0.node_count()
241    }
242
243    /// Returns predecessor node ids for `target`.
244    ///
245    /// # Performance
246    ///
247    /// This method is `O(1)` to create and `O(k)` to yield `k` predecessors.
248    #[must_use]
249    pub fn predecessors(&self, target: u32) -> impl ExactSizeIterator<Item = u32> + '_ {
250        self.0
251            .predecessors(CscNodeId::new(target))
252            .map(CscNodeId::get)
253    }
254
255    /// Returns whether `node` is visible for traversal seeds and results.
256    #[must_use]
257    pub(crate) fn node_visible(&self, node: u32, overlay: &OverlayState) -> bool {
258        (node as usize) < self.node_count()
259            && (!overlay.has_node_tombstones() || overlay.node_visible(node))
260    }
261}
262
/// Neighbor-resolution switches shared by both overlay topology views.
///
/// The traverse profile resolves them once per query. They choose between the
/// node-unique and the parallel base walk, decide whether overlay adjacency is
/// appended, and decide whether node tombstones are filtered during expansion.
#[derive(Clone, Copy, Debug)]
#[expect(
    clippy::redundant_pub_crate,
    reason = "shared with the traverse view-construction path"
)]
pub(crate) struct OverlayViewFlags {
    /// Use the node-unique (sorted, deduplicated) base rows rather than the
    /// parallel CSR/CSC walk.
    pub use_unique: bool,
    /// Append overlay adjacency after the base neighbors.
    pub merge_overlay: bool,
    /// Drop tombstoned nodes from both yielded and expanded elements.
    pub check_nodes: bool,
}
282
283/// Outgoing overlay topology view binding the algo forward bounded BFS.
284///
285/// Wraps the base forward CSR plus the live overlay and resolves neighbors per
286/// [`OverlayViewFlags`]: node-unique deduplicated rows (the `UniqueAdjacency`
287/// semantics) or the parallel CSR walk with per-edge tombstone filtering, with
288/// overlay targets chained after the base row. Tombstoned nodes are neither
289/// yielded nor expanded.
290#[derive(Clone, Copy, Debug)]
291#[expect(
292    clippy::redundant_pub_crate,
293    reason = "constructed by the traverse module from engine-owned state"
294)]
295pub(crate) struct OverlayOutView<'view> {
296    /// Base forward CSR adjacency.
297    forward: ForwardCsr<'view>,
298    /// Live overlay edges and tombstones.
299    overlay: &'view OverlayState,
300    /// Engine-cached node-unique rows (borrowed for the unique path).
301    unique: &'view UniqueAdjacency,
302    /// Resolved neighbor-resolution flags.
303    flags: OverlayViewFlags,
304    /// Canonical exclusive node bound.
305    node_count: usize,
306}
307
308/// Inbound overlay topology view binding the algo reverse bounded BFS.
309///
310/// Wraps the base inbound CSC plus the forward CSR (needed for the edge
311/// tombstone slow path) and the live overlay. Predecessors are resolved per
312/// [`OverlayViewFlags`]: node-unique deduplicated rows or the parallel CSC walk,
313/// with overlay sources chained after the base row. When edge tombstones are
314/// active a predecessor `p` of `current` is visible only when the forward edge
315/// `p -> current` is not tombstoned.
316#[derive(Clone, Copy, Debug)]
317#[expect(
318    clippy::redundant_pub_crate,
319    reason = "constructed by the traverse module from engine-owned state"
320)]
321pub(crate) struct OverlayInView<'view> {
322    /// Base inbound CSC adjacency.
323    inbound: InboundCsc<'view>,
324    /// Base forward CSR (anchors the edge-tombstone forward-edge check).
325    forward: ForwardCsr<'view>,
326    /// Live overlay edges and tombstones.
327    overlay: &'view OverlayState,
328    /// Engine-cached node-unique rows (borrowed for the unique path).
329    unique: &'view UniqueAdjacency,
330    /// Resolved neighbor-resolution flags.
331    flags: OverlayViewFlags,
332    /// Canonical exclusive node bound.
333    node_count: usize,
334}
335
336impl<'view> OverlayOutView<'view> {
337    /// Builds the outgoing overlay view from engine-owned topology and overlay.
338    ///
339    /// # Performance
340    ///
341    /// This method is `O(1)`.
342    #[must_use]
343    pub(crate) const fn new(
344        topology: &GraphTopology<'view>,
345        overlay: &'view OverlayState,
346        unique: &'view UniqueAdjacency,
347        flags: OverlayViewFlags,
348        node_count: usize,
349    ) -> Self {
350        Self {
351            forward: topology.forward,
352            overlay,
353            unique,
354            flags,
355            node_count,
356        }
357    }
358}
359
360impl<'view> OverlayInView<'view> {
361    /// Builds the inbound overlay view from engine-owned topology and overlay.
362    ///
363    /// # Performance
364    ///
365    /// This method is `O(1)`.
366    #[must_use]
367    pub(crate) const fn new(
368        topology: &GraphTopology<'view>,
369        overlay: &'view OverlayState,
370        unique: &'view UniqueAdjacency,
371        flags: OverlayViewFlags,
372        node_count: usize,
373    ) -> Self {
374        Self {
375            inbound: topology.inbound,
376            forward: topology.forward,
377            overlay,
378            unique,
379            flags,
380            node_count,
381        }
382    }
383}
384
385/// Returns whether a forward edge `source -> target` is visible under edge
386/// tombstones (the documented inbound parallel slow path).
387fn has_visible_forward_edge(
388    forward: &CsrSnapshotGraph<'_, u32, u32>,
389    overlay: &OverlayState,
390    source: u32,
391    target: u32,
392) -> bool {
393    let target_id = CsrNodeId::new(target);
394    for edge in OutgoingGraph::outgoing_edges(forward, CsrNodeId::new(source)) {
395        if EdgeTargetGraph::target(forward, edge) == target_id && overlay.edge_visible(edge.get()) {
396            return true;
397        }
398    }
399    false
400}
401
402/// Base outgoing-neighbor source for [`OverlayOutSuccessors`].
403enum OutBaseSource<'view> {
404    /// Node-unique sorted, deduplicated row slice iterator.
405    Unique(core::slice::Iter<'view, u32>),
406    /// Parallel CSR edge walk with optional per-edge tombstone filtering.
407    Parallel {
408        /// Copy of the forward CSR graph for target/edge resolution.
409        graph: CsrSnapshotGraph<'view, u32, u32>,
410        /// Remaining outgoing edge slots for the expanded node.
411        edges: oxgraph_csr::CsrOutEdges<u32>,
412        /// Whether per-edge tombstone filtering is active.
413        edge_tombstones: bool,
414    },
415}
416
417impl OutBaseSource<'_> {
418    /// Yields the next base target, applying per-edge tombstone filtering on the
419    /// parallel path. Returns `None` when the base source is exhausted.
420    fn next_raw(&mut self, overlay: &OverlayState) -> Option<u32> {
421        match self {
422            Self::Unique(iter) => iter.next().copied(),
423            Self::Parallel {
424                graph,
425                edges,
426                edge_tombstones,
427            } => loop {
428                let edge = edges.next()?;
429                if *edge_tombstones && !overlay.edge_visible(edge.get()) {
430                    continue;
431                }
432                return Some(EdgeTargetGraph::target(graph, edge).get());
433            },
434        }
435    }
436}
437
438/// Successor iterator for [`OverlayOutView`] chaining base then overlay rows.
439///
440/// Yields only in-bound, node-visible targets so tombstoned nodes are neither
441/// emitted nor expanded by the algo BFS.
442///
443/// # Performance
444///
445/// Advancing is amortized `O(1)`; the edge-tombstone parallel path is the
446/// documented slow path (`perf: unspecified`).
447#[expect(
448    clippy::redundant_pub_crate,
449    reason = "associated successor type of a crate-internal topology view"
450)]
451pub(crate) struct OverlayOutSuccessors<'view> {
452    /// Base neighbor source (unique row or parallel walk).
453    base: OutBaseSource<'view>,
454    /// Overlay target row iterator chained after the base source.
455    overlay: core::slice::Iter<'view, u32>,
456    /// Live overlay for tombstone filtering.
457    overlay_state: &'view OverlayState,
458    /// Whether node tombstones must be filtered.
459    check_nodes: bool,
460    /// Exclusive node bound.
461    node_count: u32,
462}
463
464impl OverlayOutSuccessors<'_> {
465    /// Returns whether `node` is in bounds and node-visible.
466    fn admit(&self, node: u32) -> bool {
467        node < self.node_count && (!self.check_nodes || self.overlay_state.node_visible(node))
468    }
469
470    /// Pulls the next admitted base target, advancing past filtered slots.
471    fn next_base(&mut self) -> Option<u32> {
472        while let Some(candidate) = self.base.next_raw(self.overlay_state) {
473            if self.admit(candidate) {
474                return Some(candidate);
475            }
476        }
477        None
478    }
479}
480
481impl Iterator for OverlayOutSuccessors<'_> {
482    type Item = u32;
483
484    fn next(&mut self) -> Option<u32> {
485        if let Some(target) = self.next_base() {
486            return Some(target);
487        }
488        loop {
489            let candidate = *self.overlay.next()?;
490            if self.admit(candidate) {
491                return Some(candidate);
492            }
493        }
494    }
495}
496
497impl TopologyBase for OverlayOutView<'_> {
498    type ElementId = u32;
499    type RelationId = u32;
500}
501
502impl ContainsElement for OverlayOutView<'_> {
503    fn contains_element(&self, element: u32) -> bool {
504        (element as usize) < self.node_count
505            && (!self.flags.check_nodes || self.overlay.node_visible(element))
506    }
507}
508
509impl ElementIndex for OverlayOutView<'_> {
510    fn element_bound(&self) -> usize {
511        self.node_count
512    }
513
514    fn element_index(&self, element: u32) -> usize {
515        element as usize
516    }
517}
518
519impl ElementSuccessors for OverlayOutView<'_> {
520    type Successors<'iter>
521        = OverlayOutSuccessors<'iter>
522    where
523        Self: 'iter;
524
525    fn element_successors(&self, element: u32) -> Self::Successors<'_> {
526        let base = if self.flags.use_unique {
527            OutBaseSource::Unique(self.unique.outgoing(element).iter())
528        } else {
529            OutBaseSource::Parallel {
530                graph: self.forward.0,
531                edges: OutgoingGraph::outgoing_edges(&self.forward.0, CsrNodeId::new(element)),
532                edge_tombstones: self.overlay.has_edge_tombstones(),
533            }
534        };
535        let overlay_row: &[u32] = if self.flags.merge_overlay {
536            self.overlay.overlay_targets(element)
537        } else {
538            &[]
539        };
540        OverlayOutSuccessors {
541            base,
542            overlay: overlay_row.iter(),
543            overlay_state: self.overlay,
544            check_nodes: self.flags.check_nodes,
545            node_count: u32::try_from(self.node_count).unwrap_or(u32::MAX),
546        }
547    }
548}
549
550/// Base incoming-neighbor source for [`OverlayInPredecessors`].
551enum InBaseSource<'view> {
552    /// Node-unique sorted, deduplicated row slice iterator.
553    Unique(core::slice::Iter<'view, u32>),
554    /// Parallel CSC predecessor walk with optional forward-edge tombstone check.
555    Parallel {
556        /// Remaining predecessor slots for the expanded node.
557        preds: oxgraph_csc::CscPredecessors<'view, u32, u32>,
558        /// Copy of the forward CSR graph for the forward-edge tombstone check.
559        forward: CsrSnapshotGraph<'view, u32, u32>,
560        /// The expanded node; predecessors are filtered against `p -> current`.
561        current: u32,
562        /// Whether the forward-edge tombstone check is active.
563        edge_tombstones: bool,
564    },
565}
566
567impl InBaseSource<'_> {
568    /// Yields the next base source, applying the forward-edge tombstone check on
569    /// the parallel path. Returns `None` when the base source is exhausted.
570    fn next_raw(&mut self, overlay: &OverlayState) -> Option<u32> {
571        match self {
572            Self::Unique(iter) => iter.next().copied(),
573            Self::Parallel {
574                preds,
575                forward,
576                current,
577                edge_tombstones,
578            } => loop {
579                let pred = preds.next()?.get();
580                if *edge_tombstones && !has_visible_forward_edge(forward, overlay, pred, *current) {
581                    continue;
582                }
583                return Some(pred);
584            },
585        }
586    }
587}
588
589/// Predecessor iterator for [`OverlayInView`] chaining base then overlay rows.
590///
591/// Yields only in-bound, node-visible sources. The edge-tombstone slow path
592/// keeps a predecessor `p` of `current` only when the forward edge
593/// `p -> current` is not tombstoned.
594///
595/// # Performance
596///
597/// Advancing is amortized `O(1)`; the edge-tombstone path is the documented
598/// slow path (`perf: unspecified`, worst case `O(degree)` per predecessor).
599#[expect(
600    clippy::redundant_pub_crate,
601    reason = "associated predecessor type of a crate-internal topology view"
602)]
603pub(crate) struct OverlayInPredecessors<'view> {
604    /// Base predecessor source (unique row or parallel walk).
605    base: InBaseSource<'view>,
606    /// Overlay source row iterator chained after the base source.
607    overlay: core::slice::Iter<'view, u32>,
608    /// Live overlay for tombstone filtering.
609    overlay_state: &'view OverlayState,
610    /// Whether node tombstones must be filtered.
611    check_nodes: bool,
612    /// Exclusive node bound.
613    node_count: u32,
614}
615
616impl OverlayInPredecessors<'_> {
617    /// Returns whether `node` is in bounds and node-visible.
618    fn admit(&self, node: u32) -> bool {
619        node < self.node_count && (!self.check_nodes || self.overlay_state.node_visible(node))
620    }
621
622    /// Pulls the next admitted base source, advancing past filtered slots.
623    fn next_base(&mut self) -> Option<u32> {
624        while let Some(candidate) = self.base.next_raw(self.overlay_state) {
625            if self.admit(candidate) {
626                return Some(candidate);
627            }
628        }
629        None
630    }
631}
632
633impl Iterator for OverlayInPredecessors<'_> {
634    type Item = u32;
635
636    fn next(&mut self) -> Option<u32> {
637        if let Some(source) = self.next_base() {
638            return Some(source);
639        }
640        loop {
641            let candidate = *self.overlay.next()?;
642            if self.admit(candidate) {
643                return Some(candidate);
644            }
645        }
646    }
647}
648
649impl TopologyBase for OverlayInView<'_> {
650    type ElementId = u32;
651    type RelationId = u32;
652}
653
654impl ContainsElement for OverlayInView<'_> {
655    fn contains_element(&self, element: u32) -> bool {
656        (element as usize) < self.node_count
657            && (!self.flags.check_nodes || self.overlay.node_visible(element))
658    }
659}
660
661impl ElementIndex for OverlayInView<'_> {
662    fn element_bound(&self) -> usize {
663        self.node_count
664    }
665
666    fn element_index(&self, element: u32) -> usize {
667        element as usize
668    }
669}
670
671impl ElementPredecessors for OverlayInView<'_> {
672    type Predecessors<'iter>
673        = OverlayInPredecessors<'iter>
674    where
675        Self: 'iter;
676
677    fn element_predecessors(&self, element: u32) -> Self::Predecessors<'_> {
678        let base = if self.flags.use_unique {
679            InBaseSource::Unique(self.unique.incoming(element).iter())
680        } else {
681            InBaseSource::Parallel {
682                preds: self.inbound.0.element_predecessors(CscNodeId::new(element)),
683                forward: self.forward.0,
684                current: element,
685                edge_tombstones: self.overlay.has_edge_tombstones(),
686            }
687        };
688        let overlay_row: &[u32] = if self.flags.merge_overlay {
689            self.overlay.overlay_sources(element)
690        } else {
691            &[]
692        };
693        OverlayInPredecessors {
694            base,
695            overlay: overlay_row.iter(),
696            overlay_state: self.overlay,
697            check_nodes: self.flags.check_nodes,
698            node_count: u32::try_from(self.node_count).unwrap_or(u32::MAX),
699        }
700    }
701}