Skip to main content

uni_query/query/df_graph/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────────────────────────────────────┐
19//! │      DataFusion ExecutionPlan Tree      │
20//! ├─────────────────────────────────────────┤
21//! │  ProjectionExec (DataFusion)            │
22//! │       │                                 │
23//! │  FilterExec (DataFusion)                │
24//! │       │                                 │
25//! │  GraphTraverseExec (CUSTOM)             │
26//! │       │                                 │
27//! │  GraphScanExec (CUSTOM)                 │
28//! │       │                                 │
29//! │  UniTableProvider + UniMergeExec        │
30//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod catalog_scan;
43pub mod common;
44pub mod comprehension;
45pub mod expr_compiler;
46pub mod ext_id_lookup;
47pub mod locy_abduce;
48pub mod locy_assume;
49pub mod locy_ast_builder;
50pub(crate) mod locy_bdd;
51pub mod locy_best_by;
52pub mod locy_calibrate;
53pub mod locy_delta;
54pub mod locy_derive;
55pub mod locy_errors;
56pub mod locy_eval;
57pub mod locy_explain;
58pub mod locy_fixpoint;
59pub mod locy_fold;
60pub mod locy_model_invoke;
61pub mod locy_priority;
62pub mod locy_profile;
63pub mod locy_program;
64pub mod locy_query;
65pub mod locy_slg;
66pub mod locy_traits;
67pub mod locy_validate;
68pub mod mutation_common;
69pub mod mutation_delete;
70pub mod mutation_foreach;
71pub mod mutation_remove;
72pub mod mutation_set;
73pub mod nfa;
74pub mod optional_filter;
75pub mod pattern_comprehension;
76pub mod pattern_exists;
77pub mod pred_dag;
78pub mod procedure_call;
79pub mod quantifier;
80mod read_set_exec;
81pub mod recursive_cte;
82pub mod reduce;
83pub mod scan;
84pub mod shortest_path;
85pub(crate) mod similar_to_expr;
86pub mod traverse;
87pub mod unwind;
88pub mod vector_knn;
89pub mod vid_lookup_join;
90
91use crate::query::executor::procedure::ProcedureRegistry;
92use parking_lot::RwLock;
93use std::sync::{Arc, Mutex};
94use std::time::Instant;
95use uni_algo::algo::AlgorithmRegistry;
96use uni_common::core::id::{Eid, Vid};
97use uni_store::runtime::context::QueryContext;
98use uni_store::runtime::l0::L0Buffer;
99use uni_store::runtime::property_manager::PropertyManager;
100use uni_store::storage::adjacency_manager::AdjacencyManager;
101use uni_store::storage::direction::Direction;
102use uni_store::storage::manager::StorageManager;
103
104pub mod search_procedures;
105use uni_xervo::runtime::ModelRuntime;
106
107use crate::types::QueryWarning;
108
109pub use apply::GraphApplyExec;
110pub use ext_id_lookup::GraphExtIdLookupExec;
111// CREATE and MERGE both execute through `MutationExec`; these aliases and
112// the `new_*_exec` builders are re-exported here so the planner can refer to
113// them by their clause-specific names without a dedicated module each.
114pub use mutation_common::{
115    MutationContext, MutationExec, MutationExec as MutationCreateExec,
116    MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
117};
118pub use mutation_delete::MutationDeleteExec;
119pub use mutation_foreach::ForeachExec;
120pub use mutation_remove::MutationRemoveExec;
121pub use mutation_set::MutationSetExec;
122pub use optional_filter::OptionalFilterExec;
123pub use procedure_call::GraphProcedureCallExec;
124pub use read_set_exec::ReadSetRecordingExec;
125pub use scan::GraphScanExec;
126pub use shortest_path::GraphShortestPathExec;
127pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
128pub use unwind::GraphUnwindExec;
129pub use vector_knn::GraphVectorKnnExec;
130
131pub use locy_best_by::BestByExec;
132pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
133pub use locy_fixpoint::{
134    DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
135    FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
136};
137pub use locy_fold::FoldExec;
138pub use locy_priority::PriorityExec;
139pub use locy_program::{DerivedStore, LocyProgramExec};
140pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
141
/// Shared context for graph operators.
///
/// Bundles the resources that the custom graph `ExecutionPlan` operators need
/// during query execution:
/// - Storage manager for schema and dataset access; it also hands out the
///   CSR adjacency manager used for neighbour lookups
/// - L0 buffers for MVCC visibility of uncommitted changes (see [`L0Context`])
/// - Property manager for lazy-loading vertex/edge properties
/// - Optional extras attached through the `with_*` builders: algorithm,
///   procedure and plugin registries, the Uni-Xervo model runtime, a query
///   deadline, and the outer transaction's writer handle
///
/// The deadline and the cancellation token are polled by
/// [`GraphExecutionContext::check_timeout`].
///
/// # Example
///
/// ```ignore
/// let ctx = GraphExecutionContext::new(
///     storage_manager,
///     l0_buffer,
///     property_manager,
/// );
///
/// // Get neighbors with L0 overlay
/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
/// ```
pub struct GraphExecutionContext {
    /// Storage manager for schema and dataset access (also the source of the
    /// adjacency manager).
    storage: Arc<StorageManager>,

    /// L0 visibility context for MVCC: which in-memory buffers reads see.
    l0_context: L0Context,

    /// Property manager for lazy property loading.
    property_manager: Arc<PropertyManager>,

    /// Query timeout deadline; `None` means no timeout.
    deadline: Option<Instant>,

    /// Algorithm registry for `uni.algo.*` procedure dispatch.
    algo_registry: Option<Arc<AlgorithmRegistry>>,

    /// External procedure registry for test/user-defined procedures.
    procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Plugin registry — used by the native-label scan dispatcher
    /// (M5h.2) to route a label's reads through plugin `Storage` when
    /// one is registered via `PluginRegistry::register_label_storage`.
    plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
    /// Uni-Xervo runtime used by vector auto-embedding paths.
    xervo_runtime: Option<Arc<ModelRuntime>>,

    /// Runtime warnings collected during query execution. Behind a mutex
    /// because `push_warning` / `take_warnings` only take `&self`.
    warnings: Arc<Mutex<Vec<QueryWarning>>>,

    /// Cooperative cancellation token, threaded from `QueryContext`.
    cancellation_token: Option<tokio_util::sync::CancellationToken>,

    /// Outer transaction's writer handle (FU-1 / M11 #6). Threaded
    /// from the [`crate::Executor`] when the query is running inside a
    /// write-mode transaction; consumed by
    /// `QueryProcedureHost::with_writer` at procedure invocation time
    /// so a declared `WRITE`-mode procedure's Cypher body can mutate
    /// the outer transaction's L0. `Arc<Writer>` (interior-mutable,
    /// no outer lock) matches the executor's writer handle type.
    writer: Option<Arc<uni_store::Writer>>,
}
202
203impl std::fmt::Debug for GraphExecutionContext {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        f.debug_struct("GraphExecutionContext")
206            .field("l0_context", &self.l0_context)
207            .field("deadline", &self.deadline)
208            .finish_non_exhaustive()
209    }
210}
211
/// L0 buffer visibility context for MVCC reads.
///
/// Holds shared handles to every L0 buffer that should be visible to a query:
/// - Current L0: the active write buffer
/// - Transaction L0: the buffer of the current transaction (if any)
/// - Pending flush L0s: buffers being flushed to disk (still visible to reads)
///
/// The visibility order is: pending flush L0s (oldest first) → current L0 →
/// transaction L0, as produced by [`L0Context::iter_l0_buffers`].
/// `Default` yields an empty context with no buffers.
#[derive(Clone, Default)]
pub struct L0Context {
    /// Current active L0 buffer.
    pub current_l0: Option<Arc<RwLock<L0Buffer>>>,

    /// Transaction-local L0 buffer (if in a transaction).
    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,

    /// L0 buffers pending flush to disk, oldest first.
    /// These remain visible until flush completes.
    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
}
232
233impl std::fmt::Debug for L0Context {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        f.debug_struct("L0Context")
236            .field("current_l0", &self.current_l0.is_some())
237            .field("transaction_l0", &self.transaction_l0.is_some())
238            .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
239            .finish()
240    }
241}
242
243impl L0Context {
244    /// Create an empty L0 context with no buffers.
245    pub fn empty() -> Self {
246        Self::default()
247    }
248
249    /// Create L0 context with just a current buffer.
250    pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
251        Self {
252            current_l0: Some(l0),
253            ..Self::default()
254        }
255    }
256
257    /// Create L0 context from a query context.
258    pub fn from_query_context(ctx: &QueryContext) -> Self {
259        Self {
260            current_l0: Some(ctx.l0.clone()),
261            transaction_l0: ctx.transaction_l0.clone(),
262            pending_flush_l0s: ctx.pending_flush_l0s.clone(),
263        }
264    }
265
266    /// Iterate over all L0 buffers in visibility order.
267    /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
268    pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
269        self.pending_flush_l0s
270            .iter()
271            .chain(self.current_l0.iter())
272            .chain(self.transaction_l0.iter())
273    }
274}
275
276impl GraphExecutionContext {
277    /// Shared constructor for the public entry points. The three public
278    /// constructors differ only in the L0 visibility context, deadline,
279    /// and cancellation token; every other field starts at its default.
280    fn with_parts(
281        storage: Arc<StorageManager>,
282        l0_context: L0Context,
283        property_manager: Arc<PropertyManager>,
284        deadline: Option<Instant>,
285        cancellation_token: Option<tokio_util::sync::CancellationToken>,
286    ) -> Self {
287        Self {
288            storage,
289            l0_context,
290            property_manager,
291            deadline,
292            algo_registry: None,
293            procedure_registry: None,
294            plugin_registry: None,
295            xervo_runtime: None,
296            warnings: Arc::new(Mutex::new(Vec::new())),
297            cancellation_token,
298            writer: None,
299        }
300    }
301
302    /// Create a new graph execution context.
303    ///
304    /// # Arguments
305    ///
306    /// * `storage` - Storage manager for schema and dataset access
307    /// * `l0` - Current L0 buffer for MVCC visibility
308    /// * `property_manager` - Manager for lazy property loading
309    pub fn new(
310        storage: Arc<StorageManager>,
311        l0: Arc<RwLock<L0Buffer>>,
312        property_manager: Arc<PropertyManager>,
313    ) -> Self {
314        Self::with_parts(
315            storage,
316            L0Context::with_current(l0),
317            property_manager,
318            None,
319            None,
320        )
321    }
322
323    /// Create context with full L0 visibility.
324    ///
325    /// # Arguments
326    ///
327    /// * `storage` - Storage manager for schema and dataset access
328    /// * `l0_context` - L0 visibility context with all buffers
329    /// * `property_manager` - Manager for lazy property loading
330    pub fn with_l0_context(
331        storage: Arc<StorageManager>,
332        l0_context: L0Context,
333        property_manager: Arc<PropertyManager>,
334    ) -> Self {
335        Self::with_parts(storage, l0_context, property_manager, None, None)
336    }
337
338    /// Create context from a query context.
339    pub fn from_query_context(
340        storage: Arc<StorageManager>,
341        query_ctx: &QueryContext,
342        property_manager: Arc<PropertyManager>,
343    ) -> Self {
344        Self::with_parts(
345            storage,
346            L0Context::from_query_context(query_ctx),
347            property_manager,
348            query_ctx.deadline,
349            query_ctx.cancellation_token.clone(),
350        )
351    }
352
353    /// Set query timeout deadline.
354    pub fn with_deadline(mut self, deadline: Instant) -> Self {
355        self.deadline = Some(deadline);
356        self
357    }
358
359    /// Attach the outer transaction's writer handle so declared
360    /// `WRITE`-mode procedures invoked through this context can run
361    /// their Cypher bodies via the write-enabled inner-query host.
362    #[must_use]
363    pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
364        self.writer = Some(writer);
365        self
366    }
367
368    /// Borrow the outer transaction's writer handle, if any.
369    #[must_use]
370    pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
371        self.writer.as_ref()
372    }
373
374    /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
375    pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
376        self.algo_registry = Some(registry);
377        self
378    }
379
380    /// Get a reference to the algorithm registry, if set.
381    pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
382        self.algo_registry.as_ref()
383    }
384
385    /// Set the external procedure registry for test/user-defined procedures.
386    pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
387        self.procedure_registry = Some(registry);
388        self
389    }
390
391    /// Set Uni-Xervo runtime for query-time auto-embedding.
392    pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
393        self.xervo_runtime = Some(runtime);
394        self
395    }
396
397    /// Get a reference to the procedure registry, if set.
398    pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
399        self.procedure_registry.as_ref()
400    }
401
402    /// Attach the plugin registry. Required by the M5h.2 native-label
403    /// plugin-storage routing in `columnar_scan_vertex_batch_static`.
404    pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
405        self.plugin_registry = Some(registry);
406        self
407    }
408
409    /// Reference to the plugin registry (if set).
410    pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
411        self.plugin_registry.as_ref()
412    }
413
414    pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
415        self.xervo_runtime.as_ref()
416    }
417
418    /// Record a runtime warning.
419    pub fn push_warning(&self, warning: QueryWarning) {
420        if let Ok(mut w) = self.warnings.lock() {
421            w.push(warning);
422        }
423    }
424
425    /// Take all collected warnings, leaving the collector empty.
426    pub fn take_warnings(&self) -> Vec<QueryWarning> {
427        self.warnings
428            .lock()
429            .map(|mut w| std::mem::take(&mut *w))
430            .unwrap_or_default()
431    }
432
433    /// Check if the query has timed out.
434    ///
435    /// # Errors
436    ///
437    /// Returns an error if the deadline has passed.
438    pub fn check_timeout(&self) -> anyhow::Result<()> {
439        if let Some(ref token) = self.cancellation_token
440            && token.is_cancelled()
441        {
442            return Err(anyhow::anyhow!("Query cancelled"));
443        }
444        if let Some(deadline) = self.deadline
445            && Instant::now() > deadline
446        {
447            return Err(anyhow::anyhow!("Query timed out"));
448        }
449        Ok(())
450    }
451
452    /// Get a reference to the storage manager.
453    pub fn storage(&self) -> &Arc<StorageManager> {
454        &self.storage
455    }
456
457    /// Get a reference to the adjacency manager.
458    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
459        self.storage.adjacency_manager()
460    }
461
462    /// Get a reference to the property manager.
463    pub fn property_manager(&self) -> &Arc<PropertyManager> {
464        &self.property_manager
465    }
466
467    /// Get a reference to the L0 context.
468    pub fn l0_context(&self) -> &L0Context {
469        &self.l0_context
470    }
471
472    /// Wall-clock deadline for the surrounding query, if any.
473    ///
474    /// Internal accessor used by [`crate::query::executor::procedure_host::QueryProcedureHost`]
475    /// to snapshot the deadline so procedure plugins can implement
476    /// `check_timeout` without holding a borrow on this context.
477    #[must_use]
478    pub fn deadline_for_host(&self) -> Option<Instant> {
479        self.deadline
480    }
481
482    /// Cancellation token clone for the surrounding query, if any.
483    ///
484    /// Internal accessor for [`crate::query::executor::procedure_host::QueryProcedureHost`];
485    /// see [`Self::deadline_for_host`].
486    #[must_use]
487    pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
488        self.cancellation_token.clone()
489    }
490
491    /// Create a query context for property manager calls.
492    ///
493    /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
494    pub fn query_context(&self) -> QueryContext {
495        let l0 = self
496            .l0_context
497            .current_l0
498            .clone()
499            .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
500
501        let mut ctx = QueryContext::new_with_pending(
502            l0,
503            self.l0_context.transaction_l0.clone(),
504            self.l0_context.pending_flush_l0s.clone(),
505        );
506        if let Some(deadline) = self.deadline {
507            ctx.set_deadline(deadline);
508        }
509        ctx
510    }
511
512    /// Ensure adjacency CSRs are warmed for the given edge types and direction.
513    ///
514    /// This loads any missing CSR data from storage into the adjacency manager
515    /// so that subsequent `get_neighbors` calls return complete results.
516    /// Skips warming if the adjacency manager already has data (Main CSR or
517    /// active overlay) for the edge type, avoiding duplicate entries.
518    pub async fn ensure_adjacency_warmed(
519        &self,
520        edge_type_ids: &[u32],
521        direction: Direction,
522    ) -> anyhow::Result<()> {
523        let am = self.adjacency_manager();
524        // Manifest pin only: tx version pins must NOT filter adjacency
525        // warming/reads (see StorageManager::snapshot_version_hwm).
526        let version = self.storage.snapshot_version_hwm();
527        for &etype_id in edge_type_ids {
528            // Skip if AM already has data (CSR or overlay) for this edge type.
529            // The overlay contains edges from dual-write (Writer), so warming
530            // would duplicate them.
531            if !am.is_active_for(etype_id, direction) {
532                for &dir in direction.expand() {
533                    // Use coalesced warming to prevent cache stampede (Issue #13)
534                    self.storage
535                        .warm_adjacency_coalesced(etype_id, dir, version)
536                        .await?;
537                }
538            }
539        }
540        Ok(())
541    }
542
543    /// Create a boxed warming future for use in DataFusion stream state machines.
544    ///
545    /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
546    /// suitable for polling in stream `poll_next` implementations.
547    pub fn warming_future(
548        self: &Arc<Self>,
549        edge_type_ids: Vec<u32>,
550        direction: Direction,
551    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
552    {
553        let ctx = self.clone();
554        Box::pin(async move {
555            ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
556                .await
557                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
558        })
559    }
560
561    /// Get neighbors for a vertex, merging CSR and all L0 buffers.
562    ///
563    /// This implements the MVCC visibility rules:
564    /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
565    /// 2. Overlay pending flush L0s (oldest to newest)
566    /// 3. Overlay current L0
567    /// 4. Overlay transaction L0 (if present)
568    /// 5. Filter tombstones (handled by overlay)
569    ///
570    /// # Arguments
571    ///
572    /// * `vid` - Source vertex ID
573    /// * `edge_type` - Edge type ID to traverse
574    /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
575    ///
576    /// # Returns
577    ///
578    /// Vector of (neighbor VID, edge ID) pairs.
579    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
580        // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
581        let version_hwm = self.storage.snapshot_version_hwm();
582        // Single-vid case: acquire the transaction-L0 guard once for this
583        // vertex (the batch path amortizes it across many vertices).
584        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
585        self.neighbors_for_vid(
586            vid,
587            edge_type,
588            direction,
589            version_hwm,
590            tx_guard.as_deref(),
591            true,
592        )
593    }
594
595    /// Get neighbors for multiple vertices in batch.
596    ///
597    /// More efficient than calling `get_neighbors` repeatedly as it amortizes
598    /// lock acquisition for L0 buffers.
599    ///
600    /// # Arguments
601    ///
602    /// * `vids` - Source vertex IDs
603    /// * `edge_type` - Edge type ID to traverse
604    /// * `direction` - Traversal direction
605    ///
606    /// # Returns
607    ///
608    /// Vector of (source VID, neighbor VID, edge ID) triples.
609    pub fn get_neighbors_batch(
610        &self,
611        vids: &[Vid],
612        edge_type: u32,
613        direction: Direction,
614    ) -> Vec<(Vid, Vid, Eid)> {
615        // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
616        let version_hwm = self.storage.snapshot_version_hwm();
617        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
618
619        let mut results = Vec::new();
620        for &vid in vids {
621            // record_reads=false: the whole batch is recorded in one read-set
622            // lock below instead of two lock acquisitions per source vertex.
623            let neighbors = self.neighbors_for_vid(
624                vid,
625                edge_type,
626                direction,
627                version_hwm,
628                tx_guard.as_deref(),
629                false,
630            );
631            results.extend(
632                neighbors
633                    .into_iter()
634                    .map(|(neighbor, eid)| (vid, neighbor, eid)),
635            );
636        }
637        drop(tx_guard);
638        self.record_neighbor_reads_batch(vids, &results);
639        results
640    }
641
642    /// Resolve an edge's STORED `(src, dst)` orientation given a traversed hop.
643    ///
644    /// A relationship in a path must report its stored (start -> end) direction
645    /// even when the path traversed it backward (undirected `-[r]-` or incoming
646    /// `<-[r]-`). This first consults the L0 visibility chain (exact for
647    /// in-memory edges); if the edge has been flushed to durable storage and is
648    /// no longer L0-resident, it recovers the orientation with a bounded
649    /// directed-outgoing adjacency probe: the edge is stored
650    /// `(traversal_src -> traversal_dst)` iff `eid` appears among
651    /// `traversal_src`'s outgoing neighbours for one of `edge_type_ids`,
652    /// otherwise it is the reverse. Falls back to the traversal order only when
653    /// the probe is inconclusive (e.g. the type carries no CSR adjacency).
654    ///
655    /// The probe costs at most the out-degree of one vertex per candidate edge
656    /// type — never a full edge scan — and reads the adjacency manager directly,
657    /// so it does not perturb the SSI read-set.
658    ///
659    /// # Examples
660    ///
661    /// ```ignore
662    /// let (src, dst) =
663    ///     ctx.resolve_stored_edge_endpoints(eid, node_path[i], node_path[i + 1], &edge_type_ids);
664    /// ```
665    #[must_use]
666    pub fn resolve_stored_edge_endpoints(
667        &self,
668        eid: Eid,
669        traversal_src: Vid,
670        traversal_dst: Vid,
671        edge_type_ids: &[u32],
672    ) -> (u64, u64) {
673        // 1. L0 visibility chain — exact stored endpoints for in-memory edges.
674        let query_ctx = self.query_context();
675        if let Some((src, dst)) =
676            uni_store::runtime::l0_visibility::get_edge_endpoints(eid, &query_ctx)
677        {
678            return (src.as_u64(), dst.as_u64());
679        }
680
681        // 2. Flushed (L1-resident) edge: recover orientation via a directed
682        //    outgoing adjacency probe. Read the adjacency manager / versioned
683        //    snapshot directly so the probe stays out of the SSI read-set.
684        //
685        //    When the caller could not supply the edge's type ids (e.g. an
686        //    anonymous `-[]-` relationship reaching BindFixedPath without a
687        //    `_type` column), fall back to the adjacency manager's warmed types
688        //    — exactly the set traversed by this query, so still a bounded probe.
689        let adjacency_manager = self.adjacency_manager();
690        let warmed_fallback: Vec<u32>;
691        let probe_types: &[u32] = if edge_type_ids.is_empty() {
692            warmed_fallback = adjacency_manager.known_edge_type_ids();
693            &warmed_fallback
694        } else {
695            edge_type_ids
696        };
697        let version_hwm = self.storage.snapshot_version_hwm();
698        let outgoing_contains = |vid: Vid| -> bool {
699            probe_types.iter().any(|&etype| {
700                let neighbors = match version_hwm {
701                    Some(hwm) => {
702                        self.storage
703                            .get_neighbors_at_version(vid, etype, Direction::Outgoing, hwm)
704                    }
705                    None => adjacency_manager.get_neighbors(vid, etype, Direction::Outgoing),
706                };
707                neighbors.iter().any(|&(_, e)| e == eid)
708            })
709        };
710
711        if outgoing_contains(traversal_src) {
712            (traversal_src.as_u64(), traversal_dst.as_u64())
713        } else if outgoing_contains(traversal_dst) {
714            (traversal_dst.as_u64(), traversal_src.as_u64())
715        } else {
716            // 3. Inconclusive (no CSR adjacency for this type): preserve the
717            //    long-standing traversal-order behaviour.
718            (traversal_src.as_u64(), traversal_dst.as_u64())
719        }
720    }
721
722    /// Resolve a single vertex's neighbours, overlaying the transaction L0
723    /// (if visible) and recording the traversal into the SSI read-set.
724    ///
725    /// `tx_guard` is the already-acquired read guard over the transaction
726    /// L0 buffer (if any), so batch callers acquire the lock once and pass
727    /// the borrow in for every vertex.
728    /// When `record_reads` is false the caller takes responsibility for
729    /// recording the traversal into the SSI read-set (the batch path records
730    /// once per batch via [`record_neighbor_reads_batch`]).
731    ///
732    /// [`record_neighbor_reads_batch`]: Self::record_neighbor_reads_batch
733    fn neighbors_for_vid(
734        &self,
735        vid: Vid,
736        edge_type: u32,
737        direction: Direction,
738        version_hwm: Option<u64>,
739        tx_guard: Option<&L0Buffer>,
740        record_reads: bool,
741    ) -> Vec<(Vid, Eid)> {
742        // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
743        // For snapshot queries, filter by version via StorageManager delegate.
744        let mut neighbors = if let Some(hwm) = version_hwm {
745            self.storage
746                .get_neighbors_at_version(vid, edge_type, direction, hwm)
747        } else {
748            self.adjacency_manager()
749                .get_neighbors(vid, edge_type, direction)
750        };
751
752        // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
753        if version_hwm.is_none()
754            && let Some(tx_guard) = tx_guard
755        {
756            overlay_l0_neighbors(
757                vid,
758                edge_type,
759                direction,
760                tx_guard,
761                &mut neighbors,
762                version_hwm,
763            );
764        }
765
766        if record_reads {
767            self.record_neighbor_reads(vid, &neighbors);
768        }
769
770        neighbors
771    }
772
773    /// Records traversed edges and discovered neighbours into the SSI read-set.
774    ///
775    /// No-op unless this is a read-write transaction (`occ_read_set` is `Some`
776    /// only then), so read-only and analytical traversals pay nothing. Recording
777    /// the source plus each neighbour vid and edge id gives item-level
778    /// antidependency coverage for traversals, matching the keyed read paths.
779    fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
780        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
781            return;
782        };
783        let guard = tx_l0.read();
784        let Some(read_set) = &guard.occ_read_set else {
785            return;
786        };
787        let mut rs = read_set.lock();
788        rs.vertices.insert(src);
789        for (nbr, eid) in neighbors {
790            rs.vertices.insert(*nbr);
791            rs.edges.insert(*eid);
792        }
793    }
794
795    /// Batch variant of [`record_neighbor_reads`](Self::record_neighbor_reads):
796    /// records an entire expansion batch under ONE read-set lock instead of
797    /// two lock acquisitions per source vertex.
798    ///
799    /// `srcs` is recorded in full — a source with zero neighbours is still a
800    /// read ("no edges here") that a concurrent edge insert must conflict
801    /// with, exactly as the per-vertex recorder behaves.
802    fn record_neighbor_reads_batch(&self, srcs: &[Vid], triples: &[(Vid, Vid, Eid)]) {
803        if srcs.is_empty() && triples.is_empty() {
804            return;
805        }
806        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
807            return;
808        };
809        let guard = tx_l0.read();
810        let Some(read_set) = &guard.occ_read_set else {
811            return;
812        };
813        let mut rs = read_set.lock();
814        for src in srcs {
815            rs.vertices.insert(*src);
816        }
817        for (_, nbr, eid) in triples {
818            rs.vertices.insert(*nbr);
819            rs.edges.insert(*eid);
820        }
821    }
822
823    /// Records the vertex/edge ids in the given batch columns into the read-set.
824    ///
825    /// Used by [`ReadSetRecordingExec`] to capture the identities of rows that
826    /// survived a scan's filters. No-op when there is no transaction read-set
827    /// (read-only / analytical contexts).
828    ///
829    /// [`ReadSetRecordingExec`]: crate::query::df_graph::ReadSetRecordingExec
830    pub(crate) fn record_batch_ids(
831        &self,
832        batch: &arrow_array::RecordBatch,
833        vertex_cols: &[usize],
834        edge_cols: &[usize],
835    ) {
836        use arrow_array::{Array, UInt64Array};
837
838        if vertex_cols.is_empty() && edge_cols.is_empty() {
839            return;
840        }
841        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
842            return;
843        };
844        let guard = tx_l0.read();
845        let Some(read_set) = &guard.occ_read_set else {
846            return;
847        };
848        let mut rs = read_set.lock();
849        for &col in vertex_cols {
850            if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
851                for i in 0..arr.len() {
852                    if !arr.is_null(i) {
853                        rs.vertices.insert(Vid::from(arr.value(i)));
854                    }
855                }
856            }
857        }
858        for &col in edge_cols {
859            if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
860                for i in 0..arr.len() {
861                    if !arr.is_null(i) {
862                        rs.edges.insert(Eid::from(arr.value(i)));
863                    }
864                }
865            }
866        }
867    }
868}
869
870/// Overlay L0 buffer neighbors onto existing neighbor list.
871///
872/// Adds new edges from L0 and removes tombstoned edges.
873/// Filters by version if a snapshot boundary is provided.
874fn overlay_l0_neighbors(
875    vid: Vid,
876    edge_type: u32,
877    direction: Direction,
878    l0: &L0Buffer,
879    neighbors: &mut Vec<(Vid, Eid)>,
880    version_hwm: Option<u64>,
881) {
882    use std::collections::HashMap;
883
884    // Convert to map for efficient updates
885    let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
886
887    // Query L0 for each direction
888    for &simple_dir in direction.to_simple_directions() {
889        for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
890            // Skip edges beyond snapshot boundary
891            if version_hwm.is_some_and(|hwm| version > hwm) {
892                continue;
893            }
894
895            // Apply insert or remove tombstone
896            if l0.is_tombstoned(eid) {
897                neighbor_map.remove(&eid);
898            } else {
899                neighbor_map.insert(eid, neighbor);
900            }
901        }
902    }
903
904    // Remove edges tombstoned in this L0 but originating from other layers
905    for eid in l0.tombstones.keys() {
906        neighbor_map.remove(eid);
907    }
908
909    // Convert back to vec
910    *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
911}
912
913/// Extension trait to convert storage Direction to SimpleGraph directions.
914trait DirectionExt {
915    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
916}
917
918impl DirectionExt for Direction {
919    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
920        use uni_common::graph::simple_graph::Direction as SimpleDirection;
921        match self {
922            Direction::Outgoing => &[SimpleDirection::Outgoing],
923            Direction::Incoming => &[SimpleDirection::Incoming],
924            Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
925        }
926    }
927}
928
#[cfg(test)]
mod tests {
    use super::*;

    /// `L0Context::empty()` starts with no current L0, no transaction L0, and no
    /// L0 buffers pending flush.
    #[test]
    fn test_l0_context_empty() {
        let empty = L0Context::empty();

        let has_current = empty.current_l0.is_some();
        let has_txn = empty.transaction_l0.is_some();
        assert!(!has_current);
        assert!(!has_txn);
        assert!(empty.pending_flush_l0s.is_empty());
    }
}