Skip to main content

uni_query/query/df_graph/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────────────────────────────────────┐
19//! │      DataFusion ExecutionPlan Tree      │
20//! ├─────────────────────────────────────────┤
21//! │  ProjectionExec (DataFusion)            │
22//! │       │                                 │
23//! │  FilterExec (DataFusion)                │
24//! │       │                                 │
25//! │  GraphTraverseExec (CUSTOM)             │
26//! │       │                                 │
27//! │  GraphScanExec (CUSTOM)                 │
28//! │       │                                 │
29//! │  UniTableProvider + UniMergeExec        │
30//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod catalog_scan;
43pub mod common;
44pub mod comprehension;
45pub mod expr_compiler;
46pub mod ext_id_lookup;
47pub mod locy_abduce;
48pub mod locy_assume;
49pub mod locy_ast_builder;
50pub(crate) mod locy_bdd;
51pub mod locy_best_by;
52pub mod locy_calibrate;
53pub mod locy_delta;
54pub mod locy_derive;
55pub mod locy_errors;
56pub mod locy_eval;
57pub mod locy_explain;
58pub mod locy_fixpoint;
59pub mod locy_fold;
60pub mod locy_model_invoke;
61pub mod locy_priority;
62pub mod locy_profile;
63pub mod locy_program;
64pub mod locy_query;
65pub mod locy_slg;
66pub mod locy_traits;
67pub mod locy_validate;
68pub mod mutation_common;
69pub mod mutation_delete;
70pub mod mutation_foreach;
71pub mod mutation_remove;
72pub mod mutation_set;
73pub mod nfa;
74pub mod optional_filter;
75pub mod pattern_comprehension;
76pub mod pattern_exists;
77pub mod pred_dag;
78pub mod procedure_call;
79pub mod quantifier;
80pub mod raw_bytes_marker;
81mod read_set_exec;
82pub mod recursive_cte;
83pub mod reduce;
84pub mod scan;
85pub mod shortest_path;
86pub(crate) mod similar_to_expr;
87pub mod traverse;
88pub mod unwind;
89pub mod vector_knn;
90pub mod vid_lookup_join;
91
92use crate::query::executor::procedure::ProcedureRegistry;
93use parking_lot::RwLock;
94use std::sync::{Arc, Mutex};
95use std::time::Instant;
96use uni_algo::algo::AlgorithmRegistry;
97use uni_common::core::id::{Eid, Vid};
98use uni_store::runtime::context::QueryContext;
99use uni_store::runtime::l0::L0Buffer;
100use uni_store::runtime::property_manager::PropertyManager;
101use uni_store::storage::adjacency_manager::AdjacencyManager;
102use uni_store::storage::direction::Direction;
103use uni_store::storage::manager::StorageManager;
104
105pub mod search_procedures;
106use uni_xervo::runtime::ModelRuntime;
107
108use crate::types::QueryWarning;
109
110pub use apply::GraphApplyExec;
111pub use ext_id_lookup::GraphExtIdLookupExec;
112// CREATE and MERGE both execute through `MutationExec`; these aliases and
113// the `new_*_exec` builders are re-exported here so the planner can refer to
114// them by their clause-specific names without a dedicated module each.
115pub use mutation_common::{
116    MutationContext, MutationExec, MutationExec as MutationCreateExec,
117    MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
118};
119pub use mutation_delete::MutationDeleteExec;
120pub use mutation_foreach::ForeachExec;
121pub use mutation_remove::MutationRemoveExec;
122pub use mutation_set::MutationSetExec;
123pub use optional_filter::OptionalFilterExec;
124pub use procedure_call::GraphProcedureCallExec;
125pub use read_set_exec::ReadSetRecordingExec;
126pub use scan::GraphScanExec;
127pub use shortest_path::GraphShortestPathExec;
128pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
129pub use unwind::GraphUnwindExec;
130pub use vector_knn::GraphVectorKnnExec;
131
132pub use locy_best_by::BestByExec;
133pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
134pub use locy_fixpoint::{
135    DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
136    FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
137};
138pub use locy_fold::FoldExec;
139pub use locy_priority::PriorityExec;
140pub use locy_program::{DerivedStore, LocyProgramExec};
141pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
142
143/// Shared context for graph operators.
144///
145/// Provides access to graph-specific resources needed during query execution:
146/// - CSR adjacency cache for fast neighbor lookups
147/// - L0 buffers for MVCC visibility of uncommitted changes
148/// - Property manager for lazy-loading vertex/edge properties
149/// - Storage manager for schema and dataset access
150///
151/// # Example
152///
153/// ```ignore
154/// let ctx = GraphExecutionContext::new(
155///     storage_manager,
156///     l0_buffer,
157///     property_manager,
158/// );
159///
160/// // Get neighbors with L0 overlay
161/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
162/// ```
163pub struct GraphExecutionContext {
164    /// Storage manager for schema and dataset access.
165    storage: Arc<StorageManager>,
166
167    /// L0 visibility context for MVCC.
168    l0_context: L0Context,
169
170    /// Property manager for lazy property loading.
171    property_manager: Arc<PropertyManager>,
172
173    /// Query timeout deadline.
174    deadline: Option<Instant>,
175
176    /// Algorithm registry for `uni.algo.*` procedure dispatch.
177    algo_registry: Option<Arc<AlgorithmRegistry>>,
178
179    /// External procedure registry for test/user-defined procedures.
180    procedure_registry: Option<Arc<ProcedureRegistry>>,
181    /// Plugin registry — used by the native-label scan dispatcher
182    /// (M5h.2) to route a label's reads through plugin `Storage` when
183    /// one is registered via `PluginRegistry::register_label_storage`.
184    plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
185    /// Uni-Xervo runtime used by vector auto-embedding paths.
186    xervo_runtime: Option<Arc<ModelRuntime>>,
187
188    /// Runtime warnings collected during query execution.
189    warnings: Arc<Mutex<Vec<QueryWarning>>>,
190
191    /// Cooperative cancellation token, threaded from `QueryContext`.
192    cancellation_token: Option<tokio_util::sync::CancellationToken>,
193
194    /// Outer transaction's writer handle (FU-1 / M11 #6). Threaded
195    /// from the [`crate::Executor`] when the query is running inside a
196    /// write-mode transaction; consumed by
197    /// `QueryProcedureHost::with_writer` at procedure invocation time
198    /// so a declared `WRITE`-mode procedure's Cypher body can mutate
199    /// the outer transaction's L0. `Arc<Writer>` (interior-mutable,
200    /// no outer lock) matches the executor's writer handle type.
201    writer: Option<Arc<uni_store::Writer>>,
202}
203
204impl std::fmt::Debug for GraphExecutionContext {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206        f.debug_struct("GraphExecutionContext")
207            .field("l0_context", &self.l0_context)
208            .field("deadline", &self.deadline)
209            .finish_non_exhaustive()
210    }
211}
212
213/// L0 buffer visibility context for MVCC reads.
214///
215/// Maintains references to all L0 buffers that should be visible to a query:
216/// - Current L0: The active write buffer
217/// - Transaction L0: Buffer for the current transaction (if any)
218/// - Pending flush L0s: Buffers being flushed to disk (still visible to reads)
219///
220/// The visibility order is: pending flush L0s (oldest first) → current L0 → transaction L0.
221#[derive(Clone, Default)]
222pub struct L0Context {
223    /// Current active L0 buffer.
224    pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
225
226    /// Transaction-local L0 buffer (if in a transaction).
227    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
228
229    /// L0 buffers pending flush to disk.
230    /// These remain visible until flush completes.
231    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
232}
233
234impl std::fmt::Debug for L0Context {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        f.debug_struct("L0Context")
237            .field("current_l0", &self.current_l0.is_some())
238            .field("transaction_l0", &self.transaction_l0.is_some())
239            .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
240            .finish()
241    }
242}
243
244impl L0Context {
245    /// Create an empty L0 context with no buffers.
246    pub fn empty() -> Self {
247        Self::default()
248    }
249
250    /// Create L0 context with just a current buffer.
251    pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
252        Self {
253            current_l0: Some(l0),
254            ..Self::default()
255        }
256    }
257
258    /// Create L0 context from a query context.
259    pub fn from_query_context(ctx: &QueryContext) -> Self {
260        Self {
261            current_l0: Some(ctx.l0.clone()),
262            transaction_l0: ctx.transaction_l0.clone(),
263            pending_flush_l0s: ctx.pending_flush_l0s.clone(),
264        }
265    }
266
267    /// Iterate over all L0 buffers in visibility order.
268    /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
269    pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
270        self.pending_flush_l0s
271            .iter()
272            .chain(self.current_l0.iter())
273            .chain(self.transaction_l0.iter())
274    }
275}
276
277impl GraphExecutionContext {
278    /// Shared constructor for the public entry points. The three public
279    /// constructors differ only in the L0 visibility context, deadline,
280    /// and cancellation token; every other field starts at its default.
281    fn with_parts(
282        storage: Arc<StorageManager>,
283        l0_context: L0Context,
284        property_manager: Arc<PropertyManager>,
285        deadline: Option<Instant>,
286        cancellation_token: Option<tokio_util::sync::CancellationToken>,
287    ) -> Self {
288        Self {
289            storage,
290            l0_context,
291            property_manager,
292            deadline,
293            algo_registry: None,
294            procedure_registry: None,
295            plugin_registry: None,
296            xervo_runtime: None,
297            warnings: Arc::new(Mutex::new(Vec::new())),
298            cancellation_token,
299            writer: None,
300        }
301    }
302
303    /// Create a new graph execution context.
304    ///
305    /// # Arguments
306    ///
307    /// * `storage` - Storage manager for schema and dataset access
308    /// * `l0` - Current L0 buffer for MVCC visibility
309    /// * `property_manager` - Manager for lazy property loading
310    pub fn new(
311        storage: Arc<StorageManager>,
312        l0: Arc<RwLock<L0Buffer>>,
313        property_manager: Arc<PropertyManager>,
314    ) -> Self {
315        Self::with_parts(
316            storage,
317            L0Context::with_current(l0),
318            property_manager,
319            None,
320            None,
321        )
322    }
323
324    /// Create context with full L0 visibility.
325    ///
326    /// # Arguments
327    ///
328    /// * `storage` - Storage manager for schema and dataset access
329    /// * `l0_context` - L0 visibility context with all buffers
330    /// * `property_manager` - Manager for lazy property loading
331    pub fn with_l0_context(
332        storage: Arc<StorageManager>,
333        l0_context: L0Context,
334        property_manager: Arc<PropertyManager>,
335    ) -> Self {
336        Self::with_parts(storage, l0_context, property_manager, None, None)
337    }
338
339    /// Create context from a query context.
340    pub fn from_query_context(
341        storage: Arc<StorageManager>,
342        query_ctx: &QueryContext,
343        property_manager: Arc<PropertyManager>,
344    ) -> Self {
345        Self::with_parts(
346            storage,
347            L0Context::from_query_context(query_ctx),
348            property_manager,
349            query_ctx.deadline,
350            query_ctx.cancellation_token.clone(),
351        )
352    }
353
354    /// Set query timeout deadline.
355    pub fn with_deadline(mut self, deadline: Instant) -> Self {
356        self.deadline = Some(deadline);
357        self
358    }
359
360    /// Attach the outer transaction's writer handle so declared
361    /// `WRITE`-mode procedures invoked through this context can run
362    /// their Cypher bodies via the write-enabled inner-query host.
363    #[must_use]
364    pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
365        self.writer = Some(writer);
366        self
367    }
368
369    /// Borrow the outer transaction's writer handle, if any.
370    #[must_use]
371    pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
372        self.writer.as_ref()
373    }
374
375    /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
376    pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
377        self.algo_registry = Some(registry);
378        self
379    }
380
381    /// Get a reference to the algorithm registry, if set.
382    pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
383        self.algo_registry.as_ref()
384    }
385
386    /// Set the external procedure registry for test/user-defined procedures.
387    pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
388        self.procedure_registry = Some(registry);
389        self
390    }
391
392    /// Set Uni-Xervo runtime for query-time auto-embedding.
393    pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
394        self.xervo_runtime = Some(runtime);
395        self
396    }
397
398    /// Get a reference to the procedure registry, if set.
399    pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
400        self.procedure_registry.as_ref()
401    }
402
403    /// Attach the plugin registry. Required by the M5h.2 native-label
404    /// plugin-storage routing in `columnar_scan_vertex_batch_static`.
405    pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
406        self.plugin_registry = Some(registry);
407        self
408    }
409
410    /// Reference to the plugin registry (if set).
411    pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
412        self.plugin_registry.as_ref()
413    }
414
415    pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
416        self.xervo_runtime.as_ref()
417    }
418
419    /// Record a runtime warning.
420    pub fn push_warning(&self, warning: QueryWarning) {
421        if let Ok(mut w) = self.warnings.lock() {
422            w.push(warning);
423        }
424    }
425
426    /// Take all collected warnings, leaving the collector empty.
427    pub fn take_warnings(&self) -> Vec<QueryWarning> {
428        self.warnings
429            .lock()
430            .map(|mut w| std::mem::take(&mut *w))
431            .unwrap_or_default()
432    }
433
434    /// Check if the query has timed out.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if the deadline has passed.
439    pub fn check_timeout(&self) -> anyhow::Result<()> {
440        if let Some(ref token) = self.cancellation_token
441            && token.is_cancelled()
442        {
443            return Err(anyhow::anyhow!("Query cancelled"));
444        }
445        if let Some(deadline) = self.deadline
446            && Instant::now() > deadline
447        {
448            return Err(anyhow::anyhow!("Query timed out"));
449        }
450        Ok(())
451    }
452
453    /// Get a reference to the storage manager.
454    pub fn storage(&self) -> &Arc<StorageManager> {
455        &self.storage
456    }
457
458    /// Get a reference to the adjacency manager.
459    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
460        self.storage.adjacency_manager()
461    }
462
463    /// Get a reference to the property manager.
464    pub fn property_manager(&self) -> &Arc<PropertyManager> {
465        &self.property_manager
466    }
467
468    /// Get a reference to the L0 context.
469    pub fn l0_context(&self) -> &L0Context {
470        &self.l0_context
471    }
472
473    /// Resolve a vertex's labels for a traversal-time label predicate.
474    ///
475    /// Checks the L0 chain first (which also records the read for SSI), then
476    /// the persisted `VidLabelsIndex`. The L0 chain covers fork-local and
477    /// committed-but-unflushed writes; the index covers vertices that live
478    /// only in Lance storage — notably every vertex on a fork, whose data is
479    /// flushed to Lance before branching. Returns `None` only when the vertex
480    /// is absent from both, in which case callers fall back to the label that
481    /// scoped the traversal.
482    ///
483    /// # Examples
484    ///
485    /// ```ignore
486    /// if let Some(labels) = ctx.resolve_vertex_labels(target_vid, &query_ctx) {
487    ///     keep = labels.iter().any(|l| l == "B");
488    /// }
489    /// ```
490    pub fn resolve_vertex_labels(&self, vid: Vid, query_ctx: &QueryContext) -> Option<Vec<String>> {
491        uni_store::runtime::l0_visibility::get_vertex_labels_optional(vid, query_ctx)
492            .or_else(|| self.storage.get_labels_from_index(vid))
493    }
494
495    /// Wall-clock deadline for the surrounding query, if any.
496    ///
497    /// Internal accessor used by [`crate::query::executor::procedure_host::QueryProcedureHost`]
498    /// to snapshot the deadline so procedure plugins can implement
499    /// `check_timeout` without holding a borrow on this context.
500    #[must_use]
501    pub fn deadline_for_host(&self) -> Option<Instant> {
502        self.deadline
503    }
504
505    /// Cancellation token clone for the surrounding query, if any.
506    ///
507    /// Internal accessor for [`crate::query::executor::procedure_host::QueryProcedureHost`];
508    /// see [`Self::deadline_for_host`].
509    #[must_use]
510    pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
511        self.cancellation_token.clone()
512    }
513
514    /// Create a query context for property manager calls.
515    ///
516    /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
517    pub fn query_context(&self) -> QueryContext {
518        let l0 = self
519            .l0_context
520            .current_l0
521            .clone()
522            .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
523
524        let mut ctx = QueryContext::new_with_pending(
525            l0,
526            self.l0_context.transaction_l0.clone(),
527            self.l0_context.pending_flush_l0s.clone(),
528        );
529        if let Some(deadline) = self.deadline {
530            ctx.set_deadline(deadline);
531        }
532        ctx
533    }
534
535    /// Ensure adjacency CSRs are warmed for the given edge types and direction.
536    ///
537    /// This loads any missing CSR data from storage into the adjacency manager
538    /// so that subsequent `get_neighbors` calls return complete results.
539    /// Skips warming if the adjacency manager already has data (Main CSR or
540    /// active overlay) for the edge type, avoiding duplicate entries.
541    pub async fn ensure_adjacency_warmed(
542        &self,
543        edge_type_ids: &[u32],
544        direction: Direction,
545    ) -> anyhow::Result<()> {
546        let am = self.adjacency_manager();
547        // Manifest pin only: tx version pins must NOT filter adjacency
548        // warming/reads (see StorageManager::snapshot_version_hwm).
549        let version = self.storage.snapshot_version_hwm();
550        for &etype_id in edge_type_ids {
551            // Skip if AM already has data (CSR or overlay) for this edge type.
552            // The overlay contains edges from dual-write (Writer), so warming
553            // would duplicate them.
554            if !am.is_active_for(etype_id, direction) {
555                for &dir in direction.expand() {
556                    // Use coalesced warming to prevent cache stampede (Issue #13)
557                    self.storage
558                        .warm_adjacency_coalesced(etype_id, dir, version)
559                        .await?;
560                }
561            }
562        }
563        Ok(())
564    }
565
566    /// Create a boxed warming future for use in DataFusion stream state machines.
567    ///
568    /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
569    /// suitable for polling in stream `poll_next` implementations.
570    pub fn warming_future(
571        self: &Arc<Self>,
572        edge_type_ids: Vec<u32>,
573        direction: Direction,
574    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
575    {
576        let ctx = self.clone();
577        Box::pin(async move {
578            ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
579                .await
580                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
581        })
582    }
583
584    /// Get neighbors for a vertex, merging CSR and all L0 buffers.
585    ///
586    /// This implements the MVCC visibility rules:
587    /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
588    /// 2. Overlay pending flush L0s (oldest to newest)
589    /// 3. Overlay current L0
590    /// 4. Overlay transaction L0 (if present)
591    /// 5. Filter tombstones (handled by overlay)
592    ///
593    /// # Arguments
594    ///
595    /// * `vid` - Source vertex ID
596    /// * `edge_type` - Edge type ID to traverse
597    /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
598    ///
599    /// # Returns
600    ///
601    /// Vector of (neighbor VID, edge ID) pairs.
602    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
603        // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
604        let version_hwm = self.storage.snapshot_version_hwm();
605        // Single-vid case: acquire the transaction-L0 guard once for this
606        // vertex (the batch path amortizes it across many vertices).
607        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
608        self.neighbors_for_vid(
609            vid,
610            edge_type,
611            direction,
612            version_hwm,
613            tx_guard.as_deref(),
614            true,
615        )
616    }
617
618    /// Get neighbors for multiple vertices in batch.
619    ///
620    /// More efficient than calling `get_neighbors` repeatedly as it amortizes
621    /// lock acquisition for L0 buffers.
622    ///
623    /// # Arguments
624    ///
625    /// * `vids` - Source vertex IDs
626    /// * `edge_type` - Edge type ID to traverse
627    /// * `direction` - Traversal direction
628    ///
629    /// # Returns
630    ///
631    /// Vector of (source VID, neighbor VID, edge ID) triples.
632    pub fn get_neighbors_batch(
633        &self,
634        vids: &[Vid],
635        edge_type: u32,
636        direction: Direction,
637    ) -> Vec<(Vid, Vid, Eid)> {
638        // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
639        let version_hwm = self.storage.snapshot_version_hwm();
640        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
641
642        let mut results = Vec::new();
643        for &vid in vids {
644            // record_reads=false: the whole batch is recorded in one read-set
645            // lock below instead of two lock acquisitions per source vertex.
646            let neighbors = self.neighbors_for_vid(
647                vid,
648                edge_type,
649                direction,
650                version_hwm,
651                tx_guard.as_deref(),
652                false,
653            );
654            results.extend(
655                neighbors
656                    .into_iter()
657                    .map(|(neighbor, eid)| (vid, neighbor, eid)),
658            );
659        }
660        drop(tx_guard);
661        self.record_neighbor_reads_batch(vids, &results);
662        results
663    }
664
665    /// Resolve an edge's STORED `(src, dst)` orientation given a traversed hop.
666    ///
667    /// A relationship in a path must report its stored (start -> end) direction
668    /// even when the path traversed it backward (undirected `-[r]-` or incoming
669    /// `<-[r]-`). This first consults the L0 visibility chain (exact for
670    /// in-memory edges); if the edge has been flushed to durable storage and is
671    /// no longer L0-resident, it recovers the orientation with a bounded
672    /// directed-outgoing adjacency probe: the edge is stored
673    /// `(traversal_src -> traversal_dst)` iff `eid` appears among
674    /// `traversal_src`'s outgoing neighbours for one of `edge_type_ids`,
675    /// otherwise it is the reverse. Falls back to the traversal order only when
676    /// the probe is inconclusive (e.g. the type carries no CSR adjacency).
677    ///
678    /// The probe costs at most the out-degree of one vertex per candidate edge
679    /// type — never a full edge scan — and reads the adjacency manager directly,
680    /// so it does not perturb the SSI read-set.
681    ///
682    /// # Examples
683    ///
684    /// ```ignore
685    /// let (src, dst) =
686    ///     ctx.resolve_stored_edge_endpoints(eid, node_path[i], node_path[i + 1], &edge_type_ids);
687    /// ```
688    #[must_use]
689    pub fn resolve_stored_edge_endpoints(
690        &self,
691        eid: Eid,
692        traversal_src: Vid,
693        traversal_dst: Vid,
694        edge_type_ids: &[u32],
695    ) -> (u64, u64) {
696        // 1. L0 visibility chain — exact stored endpoints for in-memory edges.
697        let query_ctx = self.query_context();
698        if let Some((src, dst)) =
699            uni_store::runtime::l0_visibility::get_edge_endpoints(eid, &query_ctx)
700        {
701            return (src.as_u64(), dst.as_u64());
702        }
703
704        // 2. Flushed (L1-resident) edge: recover orientation via a directed
705        //    outgoing adjacency probe. Read the adjacency manager / versioned
706        //    snapshot directly so the probe stays out of the SSI read-set.
707        //
708        //    When the caller could not supply the edge's type ids (e.g. an
709        //    anonymous `-[]-` relationship reaching BindFixedPath without a
710        //    `_type` column), fall back to the adjacency manager's warmed types
711        //    — exactly the set traversed by this query, so still a bounded probe.
712        let adjacency_manager = self.adjacency_manager();
713        let warmed_fallback: Vec<u32>;
714        let probe_types: &[u32] = if edge_type_ids.is_empty() {
715            warmed_fallback = adjacency_manager.known_edge_type_ids();
716            &warmed_fallback
717        } else {
718            edge_type_ids
719        };
720        let version_hwm = self.storage.snapshot_version_hwm();
721        let outgoing_contains = |vid: Vid| -> bool {
722            probe_types.iter().any(|&etype| {
723                let neighbors = match version_hwm {
724                    Some(hwm) => {
725                        self.storage
726                            .get_neighbors_at_version(vid, etype, Direction::Outgoing, hwm)
727                    }
728                    None => adjacency_manager.get_neighbors(vid, etype, Direction::Outgoing),
729                };
730                neighbors.iter().any(|&(_, e)| e == eid)
731            })
732        };
733
734        if outgoing_contains(traversal_src) {
735            (traversal_src.as_u64(), traversal_dst.as_u64())
736        } else if outgoing_contains(traversal_dst) {
737            (traversal_dst.as_u64(), traversal_src.as_u64())
738        } else {
739            // 3. Inconclusive (no CSR adjacency for this type): preserve the
740            //    long-standing traversal-order behaviour.
741            (traversal_src.as_u64(), traversal_dst.as_u64())
742        }
743    }
744
745    /// Resolve a single vertex's neighbours, overlaying the transaction L0
746    /// (if visible) and recording the traversal into the SSI read-set.
747    ///
748    /// `tx_guard` is the already-acquired read guard over the transaction
749    /// L0 buffer (if any), so batch callers acquire the lock once and pass
750    /// the borrow in for every vertex.
751    /// When `record_reads` is false the caller takes responsibility for
752    /// recording the traversal into the SSI read-set (the batch path records
753    /// once per batch via [`record_neighbor_reads_batch`]).
754    ///
755    /// [`record_neighbor_reads_batch`]: Self::record_neighbor_reads_batch
756    fn neighbors_for_vid(
757        &self,
758        vid: Vid,
759        edge_type: u32,
760        direction: Direction,
761        version_hwm: Option<u64>,
762        tx_guard: Option<&L0Buffer>,
763        record_reads: bool,
764    ) -> Vec<(Vid, Eid)> {
765        // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
766        // For snapshot queries, filter by version via StorageManager delegate.
767        let mut neighbors = if let Some(hwm) = version_hwm {
768            self.storage
769                .get_neighbors_at_version(vid, edge_type, direction, hwm)
770        } else {
771            self.adjacency_manager()
772                .get_neighbors(vid, edge_type, direction)
773        };
774
775        // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
776        if version_hwm.is_none()
777            && let Some(tx_guard) = tx_guard
778        {
779            overlay_l0_neighbors(
780                vid,
781                edge_type,
782                direction,
783                tx_guard,
784                &mut neighbors,
785                version_hwm,
786            );
787        }
788
789        if record_reads {
790            self.record_neighbor_reads(vid, &neighbors);
791        }
792
793        neighbors
794    }
795
796    /// Records traversed edges and discovered neighbours into the SSI read-set.
797    ///
798    /// No-op unless this is a read-write transaction (`occ_read_set` is `Some`
799    /// only then), so read-only and analytical traversals pay nothing. Recording
800    /// the source plus each neighbour vid and edge id gives item-level
801    /// antidependency coverage for traversals, matching the keyed read paths.
802    fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
803        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
804            return;
805        };
806        let guard = tx_l0.read();
807        let Some(read_set) = &guard.occ_read_set else {
808            return;
809        };
810        let mut rs = read_set.lock();
811        rs.vertices.insert(src);
812        for (nbr, eid) in neighbors {
813            rs.vertices.insert(*nbr);
814            rs.edges.insert(*eid);
815        }
816    }
817
818    /// Batch variant of [`record_neighbor_reads`](Self::record_neighbor_reads):
819    /// records an entire expansion batch under ONE read-set lock instead of
820    /// two lock acquisitions per source vertex.
821    ///
822    /// `srcs` is recorded in full — a source with zero neighbours is still a
823    /// read ("no edges here") that a concurrent edge insert must conflict
824    /// with, exactly as the per-vertex recorder behaves.
825    fn record_neighbor_reads_batch(&self, srcs: &[Vid], triples: &[(Vid, Vid, Eid)]) {
826        if srcs.is_empty() && triples.is_empty() {
827            return;
828        }
829        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
830            return;
831        };
832        let guard = tx_l0.read();
833        let Some(read_set) = &guard.occ_read_set else {
834            return;
835        };
836        let mut rs = read_set.lock();
837        for src in srcs {
838            rs.vertices.insert(*src);
839        }
840        for (_, nbr, eid) in triples {
841            rs.vertices.insert(*nbr);
842            rs.edges.insert(*eid);
843        }
844    }
845
846    /// Records the vertex/edge ids in the given batch columns into the read-set.
847    ///
848    /// Used by [`ReadSetRecordingExec`] to capture the identities of rows that
849    /// survived a scan's filters. No-op when there is no transaction read-set
850    /// (read-only / analytical contexts).
851    ///
852    /// [`ReadSetRecordingExec`]: crate::query::df_graph::ReadSetRecordingExec
853    pub(crate) fn record_batch_ids(
854        &self,
855        batch: &arrow_array::RecordBatch,
856        vertex_cols: &[usize],
857        edge_cols: &[usize],
858    ) {
859        use arrow_array::{Array, UInt64Array};
860
861        if vertex_cols.is_empty() && edge_cols.is_empty() {
862            return;
863        }
864        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
865            return;
866        };
867        let guard = tx_l0.read();
868        let Some(read_set) = &guard.occ_read_set else {
869            return;
870        };
871        let mut rs = read_set.lock();
872        for &col in vertex_cols {
873            if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
874                for i in 0..arr.len() {
875                    if !arr.is_null(i) {
876                        rs.vertices.insert(Vid::from(arr.value(i)));
877                    }
878                }
879            }
880        }
881        for &col in edge_cols {
882            if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
883                for i in 0..arr.len() {
884                    if !arr.is_null(i) {
885                        rs.edges.insert(Eid::from(arr.value(i)));
886                    }
887                }
888            }
889        }
890    }
891}
892
893/// Overlay L0 buffer neighbors onto existing neighbor list.
894///
895/// Adds new edges from L0 and removes tombstoned edges.
896/// Filters by version if a snapshot boundary is provided.
897fn overlay_l0_neighbors(
898    vid: Vid,
899    edge_type: u32,
900    direction: Direction,
901    l0: &L0Buffer,
902    neighbors: &mut Vec<(Vid, Eid)>,
903    version_hwm: Option<u64>,
904) {
905    use std::collections::HashMap;
906
907    // Convert to map for efficient updates
908    let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
909
910    // Query L0 for each direction
911    for &simple_dir in direction.to_simple_directions() {
912        for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
913            // Skip edges beyond snapshot boundary
914            if version_hwm.is_some_and(|hwm| version > hwm) {
915                continue;
916            }
917
918            // Apply insert or remove tombstone
919            if l0.is_tombstoned(eid) {
920                neighbor_map.remove(&eid);
921            } else {
922                neighbor_map.insert(eid, neighbor);
923            }
924        }
925    }
926
927    // Remove edges tombstoned in this L0 but originating from other layers
928    for eid in l0.tombstones.keys() {
929        neighbor_map.remove(eid);
930    }
931
932    // Convert back to vec
933    *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
934}
935
936/// Extension trait to convert storage Direction to SimpleGraph directions.
937trait DirectionExt {
938    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
939}
940
941impl DirectionExt for Direction {
942    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
943        use uni_common::graph::simple_graph::Direction as SimpleDirection;
944        match self {
945            Direction::Outgoing => &[SimpleDirection::Outgoing],
946            Direction::Incoming => &[SimpleDirection::Incoming],
947            Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
948        }
949    }
950}
951
#[cfg(test)]
mod tests {
    use super::*;

    /// `L0Context::empty()` has no current, transaction, or pending-flush buffers.
    #[test]
    fn test_l0_context_empty() {
        let L0Context {
            current_l0,
            transaction_l0,
            pending_flush_l0s,
            ..
        } = &L0Context::empty();

        assert!(current_l0.is_none());
        assert!(transaction_l0.is_none());
        assert!(pending_flush_l0s.is_empty());
    }
}