Skip to main content

uni_query/query/df_graph/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────────────────────────────────────┐
19//! │      DataFusion ExecutionPlan Tree      │
20//! ├─────────────────────────────────────────┤
21//! │  ProjectionExec (DataFusion)            │
22//! │       │                                 │
23//! │  FilterExec (DataFusion)                │
24//! │       │                                 │
25//! │  GraphTraverseExec (CUSTOM)             │
26//! │       │                                 │
27//! │  GraphScanExec (CUSTOM)                 │
28//! │       │                                 │
29//! │  UniTableProvider + UniMergeExec        │
30//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod catalog_scan;
43pub mod common;
44pub mod comprehension;
45pub mod expr_compiler;
46pub mod ext_id_lookup;
47pub mod locy_abduce;
48pub mod locy_assume;
49pub mod locy_ast_builder;
50pub(crate) mod locy_bdd;
51pub mod locy_best_by;
52pub mod locy_calibrate;
53pub mod locy_delta;
54pub mod locy_derive;
55pub mod locy_errors;
56pub mod locy_eval;
57pub mod locy_explain;
58pub mod locy_fixpoint;
59pub mod locy_fold;
60pub mod locy_model_invoke;
61pub mod locy_priority;
62pub mod locy_program;
63pub mod locy_query;
64pub mod locy_slg;
65pub mod locy_traits;
66pub mod locy_validate;
67pub mod mutation_common;
68pub mod mutation_delete;
69pub mod mutation_foreach;
70pub mod mutation_remove;
71pub mod mutation_set;
72pub mod nfa;
73pub mod optional_filter;
74pub mod pattern_comprehension;
75pub mod pattern_exists;
76pub mod pred_dag;
77pub mod procedure_call;
78pub mod quantifier;
79mod read_set_exec;
80pub mod recursive_cte;
81pub mod reduce;
82pub mod scan;
83pub mod shortest_path;
84pub(crate) mod similar_to_expr;
85pub mod traverse;
86pub mod unwind;
87pub mod vector_knn;
88pub mod vid_lookup_join;
89
90use crate::query::executor::procedure::ProcedureRegistry;
91use parking_lot::RwLock;
92use std::sync::{Arc, Mutex};
93use std::time::Instant;
94use uni_algo::algo::AlgorithmRegistry;
95use uni_common::core::id::{Eid, Vid};
96use uni_store::runtime::context::QueryContext;
97use uni_store::runtime::l0::L0Buffer;
98use uni_store::runtime::property_manager::PropertyManager;
99use uni_store::storage::adjacency_manager::AdjacencyManager;
100use uni_store::storage::direction::Direction;
101use uni_store::storage::manager::StorageManager;
102
103pub mod search_procedures;
104use uni_xervo::runtime::ModelRuntime;
105
106use crate::types::QueryWarning;
107
108pub use apply::GraphApplyExec;
109pub use ext_id_lookup::GraphExtIdLookupExec;
110// CREATE and MERGE both execute through `MutationExec`; these aliases and
111// the `new_*_exec` builders are re-exported here so the planner can refer to
112// them by their clause-specific names without a dedicated module each.
113pub use mutation_common::{
114    MutationContext, MutationExec, MutationExec as MutationCreateExec,
115    MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
116};
117pub use mutation_delete::MutationDeleteExec;
118pub use mutation_foreach::ForeachExec;
119pub use mutation_remove::MutationRemoveExec;
120pub use mutation_set::MutationSetExec;
121pub use optional_filter::OptionalFilterExec;
122pub use procedure_call::GraphProcedureCallExec;
123pub use read_set_exec::ReadSetRecordingExec;
124pub use scan::GraphScanExec;
125pub use shortest_path::GraphShortestPathExec;
126pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
127pub use unwind::GraphUnwindExec;
128pub use vector_knn::GraphVectorKnnExec;
129
130pub use locy_best_by::BestByExec;
131pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
132pub use locy_fixpoint::{
133    DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
134    FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
135};
136pub use locy_fold::FoldExec;
137pub use locy_priority::PriorityExec;
138pub use locy_program::{DerivedStore, LocyProgramExec};
139pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
140
141/// Shared context for graph operators.
142///
143/// Provides access to graph-specific resources needed during query execution:
144/// - CSR adjacency cache for fast neighbor lookups
145/// - L0 buffers for MVCC visibility of uncommitted changes
146/// - Property manager for lazy-loading vertex/edge properties
147/// - Storage manager for schema and dataset access
148///
149/// # Example
150///
151/// ```ignore
152/// let ctx = GraphExecutionContext::new(
153///     storage_manager,
154///     l0_buffer,
155///     property_manager,
156/// );
157///
158/// // Get neighbors with L0 overlay
159/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
160/// ```
161pub struct GraphExecutionContext {
162    /// Storage manager for schema and dataset access.
163    storage: Arc<StorageManager>,
164
165    /// L0 visibility context for MVCC.
166    l0_context: L0Context,
167
168    /// Property manager for lazy property loading.
169    property_manager: Arc<PropertyManager>,
170
171    /// Query timeout deadline.
172    deadline: Option<Instant>,
173
174    /// Algorithm registry for `uni.algo.*` procedure dispatch.
175    algo_registry: Option<Arc<AlgorithmRegistry>>,
176
177    /// External procedure registry for test/user-defined procedures.
178    procedure_registry: Option<Arc<ProcedureRegistry>>,
179    /// Plugin registry — used by the native-label scan dispatcher
180    /// (M5h.2) to route a label's reads through plugin `Storage` when
181    /// one is registered via `PluginRegistry::register_label_storage`.
182    plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
183    /// Uni-Xervo runtime used by vector auto-embedding paths.
184    xervo_runtime: Option<Arc<ModelRuntime>>,
185
186    /// Runtime warnings collected during query execution.
187    warnings: Arc<Mutex<Vec<QueryWarning>>>,
188
189    /// Cooperative cancellation token, threaded from `QueryContext`.
190    cancellation_token: Option<tokio_util::sync::CancellationToken>,
191
192    /// Outer transaction's writer handle (FU-1 / M11 #6). Threaded
193    /// from the [`crate::Executor`] when the query is running inside a
194    /// write-mode transaction; consumed by
195    /// `QueryProcedureHost::with_writer` at procedure invocation time
196    /// so a declared `WRITE`-mode procedure's Cypher body can mutate
197    /// the outer transaction's L0. `Arc<Writer>` (interior-mutable,
198    /// no outer lock) matches the executor's writer handle type.
199    writer: Option<Arc<uni_store::Writer>>,
200}
201
202impl std::fmt::Debug for GraphExecutionContext {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.debug_struct("GraphExecutionContext")
205            .field("l0_context", &self.l0_context)
206            .field("deadline", &self.deadline)
207            .finish_non_exhaustive()
208    }
209}
210
211/// L0 buffer visibility context for MVCC reads.
212///
213/// Maintains references to all L0 buffers that should be visible to a query:
214/// - Current L0: The active write buffer
215/// - Transaction L0: Buffer for the current transaction (if any)
216/// - Pending flush L0s: Buffers being flushed to disk (still visible to reads)
217///
218/// The visibility order is: pending flush L0s (oldest first) → current L0 → transaction L0.
219#[derive(Clone, Default)]
220pub struct L0Context {
221    /// Current active L0 buffer.
222    pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
223
224    /// Transaction-local L0 buffer (if in a transaction).
225    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
226
227    /// L0 buffers pending flush to disk.
228    /// These remain visible until flush completes.
229    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
230}
231
232impl std::fmt::Debug for L0Context {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("L0Context")
235            .field("current_l0", &self.current_l0.is_some())
236            .field("transaction_l0", &self.transaction_l0.is_some())
237            .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
238            .finish()
239    }
240}
241
242impl L0Context {
243    /// Create an empty L0 context with no buffers.
244    pub fn empty() -> Self {
245        Self::default()
246    }
247
248    /// Create L0 context with just a current buffer.
249    pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
250        Self {
251            current_l0: Some(l0),
252            ..Self::default()
253        }
254    }
255
256    /// Create L0 context from a query context.
257    pub fn from_query_context(ctx: &QueryContext) -> Self {
258        Self {
259            current_l0: Some(ctx.l0.clone()),
260            transaction_l0: ctx.transaction_l0.clone(),
261            pending_flush_l0s: ctx.pending_flush_l0s.clone(),
262        }
263    }
264
265    /// Iterate over all L0 buffers in visibility order.
266    /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
267    pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
268        self.pending_flush_l0s
269            .iter()
270            .chain(self.current_l0.iter())
271            .chain(self.transaction_l0.iter())
272    }
273}
274
275impl GraphExecutionContext {
276    /// Shared constructor for the public entry points. The three public
277    /// constructors differ only in the L0 visibility context, deadline,
278    /// and cancellation token; every other field starts at its default.
279    fn with_parts(
280        storage: Arc<StorageManager>,
281        l0_context: L0Context,
282        property_manager: Arc<PropertyManager>,
283        deadline: Option<Instant>,
284        cancellation_token: Option<tokio_util::sync::CancellationToken>,
285    ) -> Self {
286        Self {
287            storage,
288            l0_context,
289            property_manager,
290            deadline,
291            algo_registry: None,
292            procedure_registry: None,
293            plugin_registry: None,
294            xervo_runtime: None,
295            warnings: Arc::new(Mutex::new(Vec::new())),
296            cancellation_token,
297            writer: None,
298        }
299    }
300
301    /// Create a new graph execution context.
302    ///
303    /// # Arguments
304    ///
305    /// * `storage` - Storage manager for schema and dataset access
306    /// * `l0` - Current L0 buffer for MVCC visibility
307    /// * `property_manager` - Manager for lazy property loading
308    pub fn new(
309        storage: Arc<StorageManager>,
310        l0: Arc<RwLock<L0Buffer>>,
311        property_manager: Arc<PropertyManager>,
312    ) -> Self {
313        Self::with_parts(
314            storage,
315            L0Context::with_current(l0),
316            property_manager,
317            None,
318            None,
319        )
320    }
321
322    /// Create context with full L0 visibility.
323    ///
324    /// # Arguments
325    ///
326    /// * `storage` - Storage manager for schema and dataset access
327    /// * `l0_context` - L0 visibility context with all buffers
328    /// * `property_manager` - Manager for lazy property loading
329    pub fn with_l0_context(
330        storage: Arc<StorageManager>,
331        l0_context: L0Context,
332        property_manager: Arc<PropertyManager>,
333    ) -> Self {
334        Self::with_parts(storage, l0_context, property_manager, None, None)
335    }
336
337    /// Create context from a query context.
338    pub fn from_query_context(
339        storage: Arc<StorageManager>,
340        query_ctx: &QueryContext,
341        property_manager: Arc<PropertyManager>,
342    ) -> Self {
343        Self::with_parts(
344            storage,
345            L0Context::from_query_context(query_ctx),
346            property_manager,
347            query_ctx.deadline,
348            query_ctx.cancellation_token.clone(),
349        )
350    }
351
352    /// Set query timeout deadline.
353    pub fn with_deadline(mut self, deadline: Instant) -> Self {
354        self.deadline = Some(deadline);
355        self
356    }
357
358    /// Attach the outer transaction's writer handle so declared
359    /// `WRITE`-mode procedures invoked through this context can run
360    /// their Cypher bodies via the write-enabled inner-query host.
361    #[must_use]
362    pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
363        self.writer = Some(writer);
364        self
365    }
366
367    /// Borrow the outer transaction's writer handle, if any.
368    #[must_use]
369    pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
370        self.writer.as_ref()
371    }
372
373    /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
374    pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
375        self.algo_registry = Some(registry);
376        self
377    }
378
379    /// Get a reference to the algorithm registry, if set.
380    pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
381        self.algo_registry.as_ref()
382    }
383
384    /// Set the external procedure registry for test/user-defined procedures.
385    pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
386        self.procedure_registry = Some(registry);
387        self
388    }
389
390    /// Set Uni-Xervo runtime for query-time auto-embedding.
391    pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
392        self.xervo_runtime = Some(runtime);
393        self
394    }
395
396    /// Get a reference to the procedure registry, if set.
397    pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
398        self.procedure_registry.as_ref()
399    }
400
401    /// Attach the plugin registry. Required by the M5h.2 native-label
402    /// plugin-storage routing in `columnar_scan_vertex_batch_static`.
403    pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
404        self.plugin_registry = Some(registry);
405        self
406    }
407
408    /// Reference to the plugin registry (if set).
409    pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
410        self.plugin_registry.as_ref()
411    }
412
413    pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
414        self.xervo_runtime.as_ref()
415    }
416
417    /// Record a runtime warning.
418    pub fn push_warning(&self, warning: QueryWarning) {
419        if let Ok(mut w) = self.warnings.lock() {
420            w.push(warning);
421        }
422    }
423
424    /// Take all collected warnings, leaving the collector empty.
425    pub fn take_warnings(&self) -> Vec<QueryWarning> {
426        self.warnings
427            .lock()
428            .map(|mut w| std::mem::take(&mut *w))
429            .unwrap_or_default()
430    }
431
432    /// Check if the query has timed out.
433    ///
434    /// # Errors
435    ///
436    /// Returns an error if the deadline has passed.
437    pub fn check_timeout(&self) -> anyhow::Result<()> {
438        if let Some(ref token) = self.cancellation_token
439            && token.is_cancelled()
440        {
441            return Err(anyhow::anyhow!("Query cancelled"));
442        }
443        if let Some(deadline) = self.deadline
444            && Instant::now() > deadline
445        {
446            return Err(anyhow::anyhow!("Query timed out"));
447        }
448        Ok(())
449    }
450
451    /// Get a reference to the storage manager.
452    pub fn storage(&self) -> &Arc<StorageManager> {
453        &self.storage
454    }
455
456    /// Get a reference to the adjacency manager.
457    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
458        self.storage.adjacency_manager()
459    }
460
461    /// Get a reference to the property manager.
462    pub fn property_manager(&self) -> &Arc<PropertyManager> {
463        &self.property_manager
464    }
465
466    /// Get a reference to the L0 context.
467    pub fn l0_context(&self) -> &L0Context {
468        &self.l0_context
469    }
470
471    /// Wall-clock deadline for the surrounding query, if any.
472    ///
473    /// Internal accessor used by [`crate::query::executor::procedure_host::QueryProcedureHost`]
474    /// to snapshot the deadline so procedure plugins can implement
475    /// `check_timeout` without holding a borrow on this context.
476    #[must_use]
477    pub fn deadline_for_host(&self) -> Option<Instant> {
478        self.deadline
479    }
480
481    /// Cancellation token clone for the surrounding query, if any.
482    ///
483    /// Internal accessor for [`crate::query::executor::procedure_host::QueryProcedureHost`];
484    /// see [`Self::deadline_for_host`].
485    #[must_use]
486    pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
487        self.cancellation_token.clone()
488    }
489
490    /// Create a query context for property manager calls.
491    ///
492    /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
493    pub fn query_context(&self) -> QueryContext {
494        let l0 = self
495            .l0_context
496            .current_l0
497            .clone()
498            .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
499
500        let mut ctx = QueryContext::new_with_pending(
501            l0,
502            self.l0_context.transaction_l0.clone(),
503            self.l0_context.pending_flush_l0s.clone(),
504        );
505        if let Some(deadline) = self.deadline {
506            ctx.set_deadline(deadline);
507        }
508        ctx
509    }
510
511    /// Ensure adjacency CSRs are warmed for the given edge types and direction.
512    ///
513    /// This loads any missing CSR data from storage into the adjacency manager
514    /// so that subsequent `get_neighbors` calls return complete results.
515    /// Skips warming if the adjacency manager already has data (Main CSR or
516    /// active overlay) for the edge type, avoiding duplicate entries.
517    pub async fn ensure_adjacency_warmed(
518        &self,
519        edge_type_ids: &[u32],
520        direction: Direction,
521    ) -> anyhow::Result<()> {
522        let am = self.adjacency_manager();
523        let version = self.storage.version_high_water_mark();
524        for &etype_id in edge_type_ids {
525            // Skip if AM already has data (CSR or overlay) for this edge type.
526            // The overlay contains edges from dual-write (Writer), so warming
527            // would duplicate them.
528            if !am.is_active_for(etype_id, direction) {
529                for &dir in direction.expand() {
530                    // Use coalesced warming to prevent cache stampede (Issue #13)
531                    self.storage
532                        .warm_adjacency_coalesced(etype_id, dir, version)
533                        .await?;
534                }
535            }
536        }
537        Ok(())
538    }
539
540    /// Create a boxed warming future for use in DataFusion stream state machines.
541    ///
542    /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
543    /// suitable for polling in stream `poll_next` implementations.
544    pub fn warming_future(
545        self: &Arc<Self>,
546        edge_type_ids: Vec<u32>,
547        direction: Direction,
548    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
549    {
550        let ctx = self.clone();
551        Box::pin(async move {
552            ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
553                .await
554                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
555        })
556    }
557
558    /// Get neighbors for a vertex, merging CSR and all L0 buffers.
559    ///
560    /// This implements the MVCC visibility rules:
561    /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
562    /// 2. Overlay pending flush L0s (oldest to newest)
563    /// 3. Overlay current L0
564    /// 4. Overlay transaction L0 (if present)
565    /// 5. Filter tombstones (handled by overlay)
566    ///
567    /// # Arguments
568    ///
569    /// * `vid` - Source vertex ID
570    /// * `edge_type` - Edge type ID to traverse
571    /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
572    ///
573    /// # Returns
574    ///
575    /// Vector of (neighbor VID, edge ID) pairs.
576    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
577        let version_hwm = self.storage.version_high_water_mark();
578        // Single-vid case: acquire the transaction-L0 guard once for this
579        // vertex (the batch path amortizes it across many vertices).
580        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
581        self.neighbors_for_vid(vid, edge_type, direction, version_hwm, tx_guard.as_deref())
582    }
583
584    /// Get neighbors for multiple vertices in batch.
585    ///
586    /// More efficient than calling `get_neighbors` repeatedly as it amortizes
587    /// lock acquisition for L0 buffers.
588    ///
589    /// # Arguments
590    ///
591    /// * `vids` - Source vertex IDs
592    /// * `edge_type` - Edge type ID to traverse
593    /// * `direction` - Traversal direction
594    ///
595    /// # Returns
596    ///
597    /// Vector of (source VID, neighbor VID, edge ID) triples.
598    pub fn get_neighbors_batch(
599        &self,
600        vids: &[Vid],
601        edge_type: u32,
602        direction: Direction,
603    ) -> Vec<(Vid, Vid, Eid)> {
604        let version_hwm = self.storage.version_high_water_mark();
605        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
606
607        let mut results = Vec::new();
608        for &vid in vids {
609            let neighbors =
610                self.neighbors_for_vid(vid, edge_type, direction, version_hwm, tx_guard.as_deref());
611            results.extend(
612                neighbors
613                    .into_iter()
614                    .map(|(neighbor, eid)| (vid, neighbor, eid)),
615            );
616        }
617        results
618    }
619
620    /// Resolve a single vertex's neighbours, overlaying the transaction L0
621    /// (if visible) and recording the traversal into the SSI read-set.
622    ///
623    /// `tx_guard` is the already-acquired read guard over the transaction
624    /// L0 buffer (if any), so batch callers acquire the lock once and pass
625    /// the borrow in for every vertex.
626    fn neighbors_for_vid(
627        &self,
628        vid: Vid,
629        edge_type: u32,
630        direction: Direction,
631        version_hwm: Option<u64>,
632        tx_guard: Option<&L0Buffer>,
633    ) -> Vec<(Vid, Eid)> {
634        // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
635        // For snapshot queries, filter by version via StorageManager delegate.
636        let mut neighbors = if let Some(hwm) = version_hwm {
637            self.storage
638                .get_neighbors_at_version(vid, edge_type, direction, hwm)
639        } else {
640            self.adjacency_manager()
641                .get_neighbors(vid, edge_type, direction)
642        };
643
644        // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
645        if version_hwm.is_none()
646            && let Some(tx_guard) = tx_guard
647        {
648            overlay_l0_neighbors(
649                vid,
650                edge_type,
651                direction,
652                tx_guard,
653                &mut neighbors,
654                version_hwm,
655            );
656        }
657
658        self.record_neighbor_reads(vid, &neighbors);
659
660        neighbors
661    }
662
663    /// Records traversed edges and discovered neighbours into the SSI read-set.
664    ///
665    /// No-op unless this is a read-write transaction (`occ_read_set` is `Some`
666    /// only then), so read-only and analytical traversals pay nothing. Recording
667    /// the source plus each neighbour vid and edge id gives item-level
668    /// antidependency coverage for traversals, matching the keyed read paths.
669    fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
670        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
671            return;
672        };
673        let guard = tx_l0.read();
674        let Some(read_set) = &guard.occ_read_set else {
675            return;
676        };
677        let mut rs = read_set.lock();
678        rs.vertices.insert(src);
679        for (nbr, eid) in neighbors {
680            rs.vertices.insert(*nbr);
681            rs.edges.insert(*eid);
682        }
683    }
684
685    /// Records the vertex/edge ids in the given batch columns into the read-set.
686    ///
687    /// Used by [`ReadSetRecordingExec`] to capture the identities of rows that
688    /// survived a scan's filters. No-op when there is no transaction read-set
689    /// (read-only / analytical contexts).
690    ///
691    /// [`ReadSetRecordingExec`]: crate::query::df_graph::ReadSetRecordingExec
692    pub(crate) fn record_batch_ids(
693        &self,
694        batch: &arrow_array::RecordBatch,
695        vertex_cols: &[usize],
696        edge_cols: &[usize],
697    ) {
698        use arrow_array::{Array, UInt64Array};
699
700        if vertex_cols.is_empty() && edge_cols.is_empty() {
701            return;
702        }
703        let Some(tx_l0) = &self.l0_context.transaction_l0 else {
704            return;
705        };
706        let guard = tx_l0.read();
707        let Some(read_set) = &guard.occ_read_set else {
708            return;
709        };
710        let mut rs = read_set.lock();
711        for &col in vertex_cols {
712            if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
713                for i in 0..arr.len() {
714                    if !arr.is_null(i) {
715                        rs.vertices.insert(Vid::from(arr.value(i)));
716                    }
717                }
718            }
719        }
720        for &col in edge_cols {
721            if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
722                for i in 0..arr.len() {
723                    if !arr.is_null(i) {
724                        rs.edges.insert(Eid::from(arr.value(i)));
725                    }
726                }
727            }
728        }
729    }
730}
731
732/// Overlay L0 buffer neighbors onto existing neighbor list.
733///
734/// Adds new edges from L0 and removes tombstoned edges.
735/// Filters by version if a snapshot boundary is provided.
736fn overlay_l0_neighbors(
737    vid: Vid,
738    edge_type: u32,
739    direction: Direction,
740    l0: &L0Buffer,
741    neighbors: &mut Vec<(Vid, Eid)>,
742    version_hwm: Option<u64>,
743) {
744    use std::collections::HashMap;
745
746    // Convert to map for efficient updates
747    let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
748
749    // Query L0 for each direction
750    for &simple_dir in direction.to_simple_directions() {
751        for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
752            // Skip edges beyond snapshot boundary
753            if version_hwm.is_some_and(|hwm| version > hwm) {
754                continue;
755            }
756
757            // Apply insert or remove tombstone
758            if l0.is_tombstoned(eid) {
759                neighbor_map.remove(&eid);
760            } else {
761                neighbor_map.insert(eid, neighbor);
762            }
763        }
764    }
765
766    // Remove edges tombstoned in this L0 but originating from other layers
767    for eid in l0.tombstones.keys() {
768        neighbor_map.remove(eid);
769    }
770
771    // Convert back to vec
772    *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
773}
774
775/// Extension trait to convert storage Direction to SimpleGraph directions.
776trait DirectionExt {
777    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
778}
779
780impl DirectionExt for Direction {
781    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
782        use uni_common::graph::simple_graph::Direction as SimpleDirection;
783        match self {
784            Direction::Outgoing => &[SimpleDirection::Outgoing],
785            Direction::Incoming => &[SimpleDirection::Incoming],
786            Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
787        }
788    }
789}
790
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty context exposes no L0 buffer of any kind.
    #[test]
    fn test_l0_context_empty() {
        let empty = L0Context::empty();

        assert!(empty.current_l0.is_none());
        assert!(empty.transaction_l0.is_none());
        assert!(empty.pending_flush_l0s.is_empty());
    }
}