// uni_query/query/df_graph/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
//! ┌─────────────────────────────────────────┐
//! │      DataFusion ExecutionPlan Tree      │
//! ├─────────────────────────────────────────┤
//! │  ProjectionExec (DataFusion)            │
//! │         │                               │
//! │  FilterExec (DataFusion)                │
//! │         │                               │
//! │  GraphTraverseExec (CUSTOM)             │
//! │         │                               │
//! │  GraphScanExec (CUSTOM)                 │
//! │         │                               │
//! │  UniTableProvider + UniMergeExec        │
//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod catalog_scan;
43pub mod common;
44pub mod comprehension;
45pub mod expr_compiler;
46pub mod ext_id_lookup;
47pub mod locy_abduce;
48pub mod locy_assume;
49pub mod locy_ast_builder;
50pub(crate) mod locy_bdd;
51pub mod locy_best_by;
52pub mod locy_calibrate;
53pub mod locy_delta;
54pub mod locy_derive;
55pub mod locy_errors;
56pub mod locy_eval;
57pub mod locy_explain;
58pub mod locy_fixpoint;
59pub mod locy_fold;
60pub mod locy_model_invoke;
61pub mod locy_priority;
62pub mod locy_profile;
63pub mod locy_program;
64pub mod locy_query;
65pub mod locy_slg;
66pub mod locy_traits;
67pub mod locy_validate;
68pub mod mutation_common;
69pub mod mutation_delete;
70pub mod mutation_foreach;
71pub mod mutation_remove;
72pub mod mutation_set;
73pub mod nfa;
74pub mod optional_filter;
75pub mod pattern_comprehension;
76pub mod pattern_exists;
77pub mod pred_dag;
78pub mod procedure_call;
79pub mod quantifier;
80pub mod raw_bytes_marker;
81mod read_set_exec;
82pub mod recursive_cte;
83pub mod reduce;
84pub mod scan;
85pub mod shortest_path;
86pub(crate) mod similar_to_expr;
87pub mod traverse;
88pub mod unwind;
89pub mod vector_knn;
90pub mod vid_lookup_join;
91
92use crate::query::executor::procedure::ProcedureRegistry;
93use parking_lot::RwLock;
94use std::sync::{Arc, Mutex};
95use std::time::Instant;
96use uni_algo::algo::AlgorithmRegistry;
97use uni_common::core::id::{Eid, Vid};
98use uni_store::runtime::context::QueryContext;
99use uni_store::runtime::l0::L0Buffer;
100use uni_store::runtime::property_manager::PropertyManager;
101use uni_store::storage::adjacency_manager::AdjacencyManager;
102use uni_store::storage::direction::Direction;
103use uni_store::storage::manager::StorageManager;
104
105pub mod search_procedures;
106use uni_xervo::runtime::ModelRuntime;
107
108use crate::types::QueryWarning;
109
110pub use apply::GraphApplyExec;
111pub use ext_id_lookup::GraphExtIdLookupExec;
112// CREATE and MERGE both execute through `MutationExec`; these aliases and
113// the `new_*_exec` builders are re-exported here so the planner can refer to
114// them by their clause-specific names without a dedicated module each.
115pub use mutation_common::{
116 MutationContext, MutationExec, MutationExec as MutationCreateExec,
117 MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
118};
119pub use mutation_delete::MutationDeleteExec;
120pub use mutation_foreach::ForeachExec;
121pub use mutation_remove::MutationRemoveExec;
122pub use mutation_set::MutationSetExec;
123pub use optional_filter::OptionalFilterExec;
124pub use procedure_call::GraphProcedureCallExec;
125pub use read_set_exec::ReadSetRecordingExec;
126pub use scan::GraphScanExec;
127pub use shortest_path::GraphShortestPathExec;
128pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
129pub use unwind::GraphUnwindExec;
130pub use vector_knn::GraphVectorKnnExec;
131
132pub use locy_best_by::BestByExec;
133pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
134pub use locy_fixpoint::{
135 DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
136 FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
137};
138pub use locy_fold::FoldExec;
139pub use locy_priority::PriorityExec;
140pub use locy_program::{DerivedStore, LocyProgramExec};
141pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
142
/// Shared context for graph operators.
///
/// Bundles the handles graph operators need during query execution:
/// - Storage manager for schema and dataset access; the adjacency manager
///   used for neighbor lookups is obtained through it
/// - L0 buffers for MVCC visibility of uncommitted changes
/// - Property manager for lazy-loading vertex/edge properties
/// - Optional deadline and cancellation token, both consulted by
///   `check_timeout`
/// - Optional registries (algorithms, procedures, plugins), model runtime and
///   transaction writer, attached through the `with_*` builder methods
///
/// # Example
///
/// ```ignore
/// let ctx = GraphExecutionContext::new(
///     storage_manager,
///     l0_buffer,
///     property_manager,
/// );
///
/// // Get neighbors with L0 overlay
/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
/// ```
pub struct GraphExecutionContext {
    /// Storage manager for schema and dataset access.
    storage: Arc<StorageManager>,

    /// L0 visibility context for MVCC.
    l0_context: L0Context,

    /// Property manager for lazy property loading.
    property_manager: Arc<PropertyManager>,

    /// Query timeout deadline; `None` means no deadline is enforced.
    deadline: Option<Instant>,

    /// Algorithm registry for `uni.algo.*` procedure dispatch.
    algo_registry: Option<Arc<AlgorithmRegistry>>,

    /// External procedure registry for test/user-defined procedures.
    procedure_registry: Option<Arc<ProcedureRegistry>>,

    /// Plugin registry — used by the native-label scan dispatcher
    /// (M5h.2) to route a label's reads through plugin `Storage` when
    /// one is registered via `PluginRegistry::register_label_storage`.
    plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,

    /// Uni-Xervo runtime used by vector auto-embedding paths.
    xervo_runtime: Option<Arc<ModelRuntime>>,

    /// Runtime warnings collected during query execution. Appended to by
    /// `push_warning` and emptied by `take_warnings`.
    warnings: Arc<Mutex<Vec<QueryWarning>>>,

    /// Cooperative cancellation token, threaded from `QueryContext`.
    cancellation_token: Option<tokio_util::sync::CancellationToken>,

    /// Outer transaction's writer handle (FU-1 / M11 #6). Threaded
    /// from the [`crate::Executor`] when the query is running inside a
    /// write-mode transaction; consumed by
    /// `QueryProcedureHost::with_writer` at procedure invocation time
    /// so a declared `WRITE`-mode procedure's Cypher body can mutate
    /// the outer transaction's L0. `Arc<Writer>` (interior-mutable,
    /// no outer lock) matches the executor's writer handle type.
    writer: Option<Arc<uni_store::Writer>>,
}
203
204impl std::fmt::Debug for GraphExecutionContext {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 f.debug_struct("GraphExecutionContext")
207 .field("l0_context", &self.l0_context)
208 .field("deadline", &self.deadline)
209 .finish_non_exhaustive()
210 }
211}
212
/// L0 buffer visibility context for MVCC reads.
///
/// Maintains references to all L0 buffers that should be visible to a query:
/// - Current L0: The active write buffer
/// - Transaction L0: Buffer for the current transaction (if any)
/// - Pending flush L0s: Buffers being flushed to disk (still visible to reads)
///
/// The visibility order is: pending flush L0s (oldest first) → current L0 → transaction L0.
///
/// Cloning copies the `Arc` handles, so a clone refers to the same underlying
/// buffers. The `Default` value holds no buffers at all.
#[derive(Clone, Default)]
pub struct L0Context {
    /// Current active L0 buffer.
    pub current_l0: Option<Arc<RwLock<L0Buffer>>>,

    /// Transaction-local L0 buffer (if in a transaction).
    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,

    /// L0 buffers pending flush to disk.
    /// These remain visible until flush completes.
    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
}
233
234impl std::fmt::Debug for L0Context {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 f.debug_struct("L0Context")
237 .field("current_l0", &self.current_l0.is_some())
238 .field("transaction_l0", &self.transaction_l0.is_some())
239 .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
240 .finish()
241 }
242}
243
244impl L0Context {
245 /// Create an empty L0 context with no buffers.
246 pub fn empty() -> Self {
247 Self::default()
248 }
249
250 /// Create L0 context with just a current buffer.
251 pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
252 Self {
253 current_l0: Some(l0),
254 ..Self::default()
255 }
256 }
257
258 /// Create L0 context from a query context.
259 pub fn from_query_context(ctx: &QueryContext) -> Self {
260 Self {
261 current_l0: Some(ctx.l0.clone()),
262 transaction_l0: ctx.transaction_l0.clone(),
263 pending_flush_l0s: ctx.pending_flush_l0s.clone(),
264 }
265 }
266
267 /// Iterate over all L0 buffers in visibility order.
268 /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
269 pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
270 self.pending_flush_l0s
271 .iter()
272 .chain(self.current_l0.iter())
273 .chain(self.transaction_l0.iter())
274 }
275}
276
277impl GraphExecutionContext {
278 /// Shared constructor for the public entry points. The three public
279 /// constructors differ only in the L0 visibility context, deadline,
280 /// and cancellation token; every other field starts at its default.
281 fn with_parts(
282 storage: Arc<StorageManager>,
283 l0_context: L0Context,
284 property_manager: Arc<PropertyManager>,
285 deadline: Option<Instant>,
286 cancellation_token: Option<tokio_util::sync::CancellationToken>,
287 ) -> Self {
288 Self {
289 storage,
290 l0_context,
291 property_manager,
292 deadline,
293 algo_registry: None,
294 procedure_registry: None,
295 plugin_registry: None,
296 xervo_runtime: None,
297 warnings: Arc::new(Mutex::new(Vec::new())),
298 cancellation_token,
299 writer: None,
300 }
301 }
302
303 /// Create a new graph execution context.
304 ///
305 /// # Arguments
306 ///
307 /// * `storage` - Storage manager for schema and dataset access
308 /// * `l0` - Current L0 buffer for MVCC visibility
309 /// * `property_manager` - Manager for lazy property loading
310 pub fn new(
311 storage: Arc<StorageManager>,
312 l0: Arc<RwLock<L0Buffer>>,
313 property_manager: Arc<PropertyManager>,
314 ) -> Self {
315 Self::with_parts(
316 storage,
317 L0Context::with_current(l0),
318 property_manager,
319 None,
320 None,
321 )
322 }
323
324 /// Create context with full L0 visibility.
325 ///
326 /// # Arguments
327 ///
328 /// * `storage` - Storage manager for schema and dataset access
329 /// * `l0_context` - L0 visibility context with all buffers
330 /// * `property_manager` - Manager for lazy property loading
331 pub fn with_l0_context(
332 storage: Arc<StorageManager>,
333 l0_context: L0Context,
334 property_manager: Arc<PropertyManager>,
335 ) -> Self {
336 Self::with_parts(storage, l0_context, property_manager, None, None)
337 }
338
339 /// Create context from a query context.
340 pub fn from_query_context(
341 storage: Arc<StorageManager>,
342 query_ctx: &QueryContext,
343 property_manager: Arc<PropertyManager>,
344 ) -> Self {
345 Self::with_parts(
346 storage,
347 L0Context::from_query_context(query_ctx),
348 property_manager,
349 query_ctx.deadline,
350 query_ctx.cancellation_token.clone(),
351 )
352 }
353
    /// Set the query timeout deadline, replacing any deadline already set.
    ///
    /// Builder-style: consumes the context and returns it with the deadline
    /// applied. `check_timeout` fails once `Instant::now()` is past it.
    pub fn with_deadline(mut self, deadline: Instant) -> Self {
        self.deadline = Some(deadline);
        self
    }
359
    /// Attach the outer transaction's writer handle so declared
    /// `WRITE`-mode procedures invoked through this context can run
    /// their Cypher bodies via the write-enabled inner-query host.
    ///
    /// Builder-style: consumes the context and returns it with the writer
    /// stored, replacing any previously attached writer.
    #[must_use]
    pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
        self.writer = Some(writer);
        self
    }
368
    /// Borrow the outer transaction's writer handle, if any.
    ///
    /// Returns `None` unless a writer was attached with `with_writer`.
    #[must_use]
    pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
        self.writer.as_ref()
    }
374
    /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
    ///
    /// Builder-style: consumes the context and returns it with the registry
    /// stored, replacing any previously set registry.
    pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
        self.algo_registry = Some(registry);
        self
    }
380
    /// Get a reference to the algorithm registry, if set.
    ///
    /// Returns `None` unless one was attached with `with_algo_registry`.
    pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
        self.algo_registry.as_ref()
    }
385
    /// Set the external procedure registry for test/user-defined procedures.
    ///
    /// Builder-style: consumes the context and returns it with the registry
    /// stored, replacing any previously set registry.
    pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
        self.procedure_registry = Some(registry);
        self
    }
391
    /// Set Uni-Xervo runtime for query-time auto-embedding.
    ///
    /// Builder-style: consumes the context and returns it with the runtime
    /// stored, replacing any previously set runtime.
    pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
        self.xervo_runtime = Some(runtime);
        self
    }
397
    /// Get a reference to the procedure registry, if set.
    ///
    /// Returns `None` unless one was attached with `with_procedure_registry`.
    pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
        self.procedure_registry.as_ref()
    }
402
    /// Attach the plugin registry. Required by the M5h.2 native-label
    /// plugin-storage routing in `columnar_scan_vertex_batch_static`.
    ///
    /// Builder-style: consumes the context and returns it with the registry
    /// stored, replacing any previously attached registry.
    pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
        self.plugin_registry = Some(registry);
        self
    }
409
    /// Reference to the plugin registry (if set).
    ///
    /// Returns `None` unless one was attached with `with_plugin_registry`.
    pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
        self.plugin_registry.as_ref()
    }
414
    /// Get a reference to the Uni-Xervo runtime, if set.
    ///
    /// Returns `None` unless one was attached with `with_xervo_runtime`.
    pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
        self.xervo_runtime.as_ref()
    }
418
419 /// Record a runtime warning.
420 pub fn push_warning(&self, warning: QueryWarning) {
421 if let Ok(mut w) = self.warnings.lock() {
422 w.push(warning);
423 }
424 }
425
426 /// Take all collected warnings, leaving the collector empty.
427 pub fn take_warnings(&self) -> Vec<QueryWarning> {
428 self.warnings
429 .lock()
430 .map(|mut w| std::mem::take(&mut *w))
431 .unwrap_or_default()
432 }
433
434 /// Check if the query has timed out.
435 ///
436 /// # Errors
437 ///
438 /// Returns an error if the deadline has passed.
439 pub fn check_timeout(&self) -> anyhow::Result<()> {
440 if let Some(ref token) = self.cancellation_token
441 && token.is_cancelled()
442 {
443 return Err(anyhow::anyhow!("Query cancelled"));
444 }
445 if let Some(deadline) = self.deadline
446 && Instant::now() > deadline
447 {
448 return Err(anyhow::anyhow!("Query timed out"));
449 }
450 Ok(())
451 }
452
    /// Get a reference to the storage manager this context was built with.
    pub fn storage(&self) -> &Arc<StorageManager> {
        &self.storage
    }
457
    /// Get a shared handle to the adjacency manager.
    ///
    /// The handle is an owned `Arc` obtained from the storage manager on each
    /// call, not a borrow of a field on this context.
    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
        self.storage.adjacency_manager()
    }
462
    /// Get a reference to the property manager this context was built with.
    pub fn property_manager(&self) -> &Arc<PropertyManager> {
        &self.property_manager
    }
467
    /// Get a reference to the L0 visibility context (current, transaction and
    /// pending-flush buffers).
    pub fn l0_context(&self) -> &L0Context {
        &self.l0_context
    }
472
473 /// Resolve a vertex's labels for a traversal-time label predicate.
474 ///
475 /// Checks the L0 chain first (which also records the read for SSI), then
476 /// the persisted `VidLabelsIndex`. The L0 chain covers fork-local and
477 /// committed-but-unflushed writes; the index covers vertices that live
478 /// only in Lance storage — notably every vertex on a fork, whose data is
479 /// flushed to Lance before branching. Returns `None` only when the vertex
480 /// is absent from both, in which case callers fall back to the label that
481 /// scoped the traversal.
482 ///
483 /// # Examples
484 ///
485 /// ```ignore
486 /// if let Some(labels) = ctx.resolve_vertex_labels(target_vid, &query_ctx) {
487 /// keep = labels.iter().any(|l| l == "B");
488 /// }
489 /// ```
490 pub fn resolve_vertex_labels(&self, vid: Vid, query_ctx: &QueryContext) -> Option<Vec<String>> {
491 uni_store::runtime::l0_visibility::get_vertex_labels_optional(vid, query_ctx)
492 .or_else(|| self.storage.get_labels_from_index(vid))
493 }
494
    /// Wall-clock deadline for the surrounding query, if any.
    ///
    /// Internal accessor used by [`crate::query::executor::procedure_host::QueryProcedureHost`]
    /// to snapshot the deadline so procedure plugins can implement
    /// `check_timeout` without holding a borrow on this context.
    ///
    /// Returns a copy of the stored deadline; `None` when no deadline is set.
    #[must_use]
    pub fn deadline_for_host(&self) -> Option<Instant> {
        self.deadline
    }
504
    /// Cancellation token clone for the surrounding query, if any.
    ///
    /// Internal accessor for [`crate::query::executor::procedure_host::QueryProcedureHost`];
    /// see [`Self::deadline_for_host`].
    ///
    /// Returns `None` when the context was built without a token.
    #[must_use]
    pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
        self.cancellation_token.clone()
    }
513
514 /// Create a query context for property manager calls.
515 ///
516 /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
517 pub fn query_context(&self) -> QueryContext {
518 let l0 = self
519 .l0_context
520 .current_l0
521 .clone()
522 .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
523
524 let mut ctx = QueryContext::new_with_pending(
525 l0,
526 self.l0_context.transaction_l0.clone(),
527 self.l0_context.pending_flush_l0s.clone(),
528 );
529 if let Some(deadline) = self.deadline {
530 ctx.set_deadline(deadline);
531 }
532 ctx
533 }
534
535 /// Ensure adjacency CSRs are warmed for the given edge types and direction.
536 ///
537 /// This loads any missing CSR data from storage into the adjacency manager
538 /// so that subsequent `get_neighbors` calls return complete results.
539 /// Skips warming if the adjacency manager already has data (Main CSR or
540 /// active overlay) for the edge type, avoiding duplicate entries.
541 pub async fn ensure_adjacency_warmed(
542 &self,
543 edge_type_ids: &[u32],
544 direction: Direction,
545 ) -> anyhow::Result<()> {
546 let am = self.adjacency_manager();
547 // Manifest pin only: tx version pins must NOT filter adjacency
548 // warming/reads (see StorageManager::snapshot_version_hwm).
549 let version = self.storage.snapshot_version_hwm();
550 for &etype_id in edge_type_ids {
551 // Skip if AM already has data (CSR or overlay) for this edge type.
552 // The overlay contains edges from dual-write (Writer), so warming
553 // would duplicate them.
554 if !am.is_active_for(etype_id, direction) {
555 for &dir in direction.expand() {
556 // Use coalesced warming to prevent cache stampede (Issue #13)
557 self.storage
558 .warm_adjacency_coalesced(etype_id, dir, version)
559 .await?;
560 }
561 }
562 }
563 Ok(())
564 }
565
566 /// Create a boxed warming future for use in DataFusion stream state machines.
567 ///
568 /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
569 /// suitable for polling in stream `poll_next` implementations.
570 pub fn warming_future(
571 self: &Arc<Self>,
572 edge_type_ids: Vec<u32>,
573 direction: Direction,
574 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
575 {
576 let ctx = self.clone();
577 Box::pin(async move {
578 ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
579 .await
580 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
581 })
582 }
583
584 /// Get neighbors for a vertex, merging CSR and all L0 buffers.
585 ///
586 /// This implements the MVCC visibility rules:
587 /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
588 /// 2. Overlay pending flush L0s (oldest to newest)
589 /// 3. Overlay current L0
590 /// 4. Overlay transaction L0 (if present)
591 /// 5. Filter tombstones (handled by overlay)
592 ///
593 /// # Arguments
594 ///
595 /// * `vid` - Source vertex ID
596 /// * `edge_type` - Edge type ID to traverse
597 /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
598 ///
599 /// # Returns
600 ///
601 /// Vector of (neighbor VID, edge ID) pairs.
602 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
603 // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
604 let version_hwm = self.storage.snapshot_version_hwm();
605 // Single-vid case: acquire the transaction-L0 guard once for this
606 // vertex (the batch path amortizes it across many vertices).
607 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
608 self.neighbors_for_vid(
609 vid,
610 edge_type,
611 direction,
612 version_hwm,
613 tx_guard.as_deref(),
614 true,
615 )
616 }
617
618 /// Get neighbors for multiple vertices in batch.
619 ///
620 /// More efficient than calling `get_neighbors` repeatedly as it amortizes
621 /// lock acquisition for L0 buffers.
622 ///
623 /// # Arguments
624 ///
625 /// * `vids` - Source vertex IDs
626 /// * `edge_type` - Edge type ID to traverse
627 /// * `direction` - Traversal direction
628 ///
629 /// # Returns
630 ///
631 /// Vector of (source VID, neighbor VID, edge ID) triples.
632 pub fn get_neighbors_batch(
633 &self,
634 vids: &[Vid],
635 edge_type: u32,
636 direction: Direction,
637 ) -> Vec<(Vid, Vid, Eid)> {
638 // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
639 let version_hwm = self.storage.snapshot_version_hwm();
640 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
641
642 let mut results = Vec::new();
643 for &vid in vids {
644 // record_reads=false: the whole batch is recorded in one read-set
645 // lock below instead of two lock acquisitions per source vertex.
646 let neighbors = self.neighbors_for_vid(
647 vid,
648 edge_type,
649 direction,
650 version_hwm,
651 tx_guard.as_deref(),
652 false,
653 );
654 results.extend(
655 neighbors
656 .into_iter()
657 .map(|(neighbor, eid)| (vid, neighbor, eid)),
658 );
659 }
660 drop(tx_guard);
661 self.record_neighbor_reads_batch(vids, &results);
662 results
663 }
664
665 /// Resolve an edge's STORED `(src, dst)` orientation given a traversed hop.
666 ///
667 /// A relationship in a path must report its stored (start -> end) direction
668 /// even when the path traversed it backward (undirected `-[r]-` or incoming
669 /// `<-[r]-`). This first consults the L0 visibility chain (exact for
670 /// in-memory edges); if the edge has been flushed to durable storage and is
671 /// no longer L0-resident, it recovers the orientation with a bounded
672 /// directed-outgoing adjacency probe: the edge is stored
673 /// `(traversal_src -> traversal_dst)` iff `eid` appears among
674 /// `traversal_src`'s outgoing neighbours for one of `edge_type_ids`,
675 /// otherwise it is the reverse. Falls back to the traversal order only when
676 /// the probe is inconclusive (e.g. the type carries no CSR adjacency).
677 ///
678 /// The probe costs at most the out-degree of one vertex per candidate edge
679 /// type — never a full edge scan — and reads the adjacency manager directly,
680 /// so it does not perturb the SSI read-set.
681 ///
682 /// # Examples
683 ///
684 /// ```ignore
685 /// let (src, dst) =
686 /// ctx.resolve_stored_edge_endpoints(eid, node_path[i], node_path[i + 1], &edge_type_ids);
687 /// ```
688 #[must_use]
689 pub fn resolve_stored_edge_endpoints(
690 &self,
691 eid: Eid,
692 traversal_src: Vid,
693 traversal_dst: Vid,
694 edge_type_ids: &[u32],
695 ) -> (u64, u64) {
696 // 1. L0 visibility chain — exact stored endpoints for in-memory edges.
697 let query_ctx = self.query_context();
698 if let Some((src, dst)) =
699 uni_store::runtime::l0_visibility::get_edge_endpoints(eid, &query_ctx)
700 {
701 return (src.as_u64(), dst.as_u64());
702 }
703
704 // 2. Flushed (L1-resident) edge: recover orientation via a directed
705 // outgoing adjacency probe. Read the adjacency manager / versioned
706 // snapshot directly so the probe stays out of the SSI read-set.
707 //
708 // When the caller could not supply the edge's type ids (e.g. an
709 // anonymous `-[]-` relationship reaching BindFixedPath without a
710 // `_type` column), fall back to the adjacency manager's warmed types
711 // — exactly the set traversed by this query, so still a bounded probe.
712 let adjacency_manager = self.adjacency_manager();
713 let warmed_fallback: Vec<u32>;
714 let probe_types: &[u32] = if edge_type_ids.is_empty() {
715 warmed_fallback = adjacency_manager.known_edge_type_ids();
716 &warmed_fallback
717 } else {
718 edge_type_ids
719 };
720 let version_hwm = self.storage.snapshot_version_hwm();
721 let outgoing_contains = |vid: Vid| -> bool {
722 probe_types.iter().any(|&etype| {
723 let neighbors = match version_hwm {
724 Some(hwm) => {
725 self.storage
726 .get_neighbors_at_version(vid, etype, Direction::Outgoing, hwm)
727 }
728 None => adjacency_manager.get_neighbors(vid, etype, Direction::Outgoing),
729 };
730 neighbors.iter().any(|&(_, e)| e == eid)
731 })
732 };
733
734 if outgoing_contains(traversal_src) {
735 (traversal_src.as_u64(), traversal_dst.as_u64())
736 } else if outgoing_contains(traversal_dst) {
737 (traversal_dst.as_u64(), traversal_src.as_u64())
738 } else {
739 // 3. Inconclusive (no CSR adjacency for this type): preserve the
740 // long-standing traversal-order behaviour.
741 (traversal_src.as_u64(), traversal_dst.as_u64())
742 }
743 }
744
745 /// Resolve a single vertex's neighbours, overlaying the transaction L0
746 /// (if visible) and recording the traversal into the SSI read-set.
747 ///
748 /// `tx_guard` is the already-acquired read guard over the transaction
749 /// L0 buffer (if any), so batch callers acquire the lock once and pass
750 /// the borrow in for every vertex.
751 /// When `record_reads` is false the caller takes responsibility for
752 /// recording the traversal into the SSI read-set (the batch path records
753 /// once per batch via [`record_neighbor_reads_batch`]).
754 ///
755 /// [`record_neighbor_reads_batch`]: Self::record_neighbor_reads_batch
756 fn neighbors_for_vid(
757 &self,
758 vid: Vid,
759 edge_type: u32,
760 direction: Direction,
761 version_hwm: Option<u64>,
762 tx_guard: Option<&L0Buffer>,
763 record_reads: bool,
764 ) -> Vec<(Vid, Eid)> {
765 // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
766 // For snapshot queries, filter by version via StorageManager delegate.
767 let mut neighbors = if let Some(hwm) = version_hwm {
768 self.storage
769 .get_neighbors_at_version(vid, edge_type, direction, hwm)
770 } else {
771 self.adjacency_manager()
772 .get_neighbors(vid, edge_type, direction)
773 };
774
775 // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
776 if version_hwm.is_none()
777 && let Some(tx_guard) = tx_guard
778 {
779 overlay_l0_neighbors(
780 vid,
781 edge_type,
782 direction,
783 tx_guard,
784 &mut neighbors,
785 version_hwm,
786 );
787 }
788
789 if record_reads {
790 self.record_neighbor_reads(vid, &neighbors);
791 }
792
793 neighbors
794 }
795
796 /// Records traversed edges and discovered neighbours into the SSI read-set.
797 ///
798 /// No-op unless this is a read-write transaction (`occ_read_set` is `Some`
799 /// only then), so read-only and analytical traversals pay nothing. Recording
800 /// the source plus each neighbour vid and edge id gives item-level
801 /// antidependency coverage for traversals, matching the keyed read paths.
802 fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
803 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
804 return;
805 };
806 let guard = tx_l0.read();
807 let Some(read_set) = &guard.occ_read_set else {
808 return;
809 };
810 let mut rs = read_set.lock();
811 rs.vertices.insert(src);
812 for (nbr, eid) in neighbors {
813 rs.vertices.insert(*nbr);
814 rs.edges.insert(*eid);
815 }
816 }
817
818 /// Batch variant of [`record_neighbor_reads`](Self::record_neighbor_reads):
819 /// records an entire expansion batch under ONE read-set lock instead of
820 /// two lock acquisitions per source vertex.
821 ///
822 /// `srcs` is recorded in full — a source with zero neighbours is still a
823 /// read ("no edges here") that a concurrent edge insert must conflict
824 /// with, exactly as the per-vertex recorder behaves.
825 fn record_neighbor_reads_batch(&self, srcs: &[Vid], triples: &[(Vid, Vid, Eid)]) {
826 if srcs.is_empty() && triples.is_empty() {
827 return;
828 }
829 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
830 return;
831 };
832 let guard = tx_l0.read();
833 let Some(read_set) = &guard.occ_read_set else {
834 return;
835 };
836 let mut rs = read_set.lock();
837 for src in srcs {
838 rs.vertices.insert(*src);
839 }
840 for (_, nbr, eid) in triples {
841 rs.vertices.insert(*nbr);
842 rs.edges.insert(*eid);
843 }
844 }
845
846 /// Records the vertex/edge ids in the given batch columns into the read-set.
847 ///
848 /// Used by [`ReadSetRecordingExec`] to capture the identities of rows that
849 /// survived a scan's filters. No-op when there is no transaction read-set
850 /// (read-only / analytical contexts).
851 ///
852 /// [`ReadSetRecordingExec`]: crate::query::df_graph::ReadSetRecordingExec
853 pub(crate) fn record_batch_ids(
854 &self,
855 batch: &arrow_array::RecordBatch,
856 vertex_cols: &[usize],
857 edge_cols: &[usize],
858 ) {
859 use arrow_array::{Array, UInt64Array};
860
861 if vertex_cols.is_empty() && edge_cols.is_empty() {
862 return;
863 }
864 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
865 return;
866 };
867 let guard = tx_l0.read();
868 let Some(read_set) = &guard.occ_read_set else {
869 return;
870 };
871 let mut rs = read_set.lock();
872 for &col in vertex_cols {
873 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
874 for i in 0..arr.len() {
875 if !arr.is_null(i) {
876 rs.vertices.insert(Vid::from(arr.value(i)));
877 }
878 }
879 }
880 }
881 for &col in edge_cols {
882 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
883 for i in 0..arr.len() {
884 if !arr.is_null(i) {
885 rs.edges.insert(Eid::from(arr.value(i)));
886 }
887 }
888 }
889 }
890 }
891}
892
893/// Overlay L0 buffer neighbors onto existing neighbor list.
894///
895/// Adds new edges from L0 and removes tombstoned edges.
896/// Filters by version if a snapshot boundary is provided.
897fn overlay_l0_neighbors(
898 vid: Vid,
899 edge_type: u32,
900 direction: Direction,
901 l0: &L0Buffer,
902 neighbors: &mut Vec<(Vid, Eid)>,
903 version_hwm: Option<u64>,
904) {
905 use std::collections::HashMap;
906
907 // Convert to map for efficient updates
908 let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
909
910 // Query L0 for each direction
911 for &simple_dir in direction.to_simple_directions() {
912 for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
913 // Skip edges beyond snapshot boundary
914 if version_hwm.is_some_and(|hwm| version > hwm) {
915 continue;
916 }
917
918 // Apply insert or remove tombstone
919 if l0.is_tombstoned(eid) {
920 neighbor_map.remove(&eid);
921 } else {
922 neighbor_map.insert(eid, neighbor);
923 }
924 }
925 }
926
927 // Remove edges tombstoned in this L0 but originating from other layers
928 for eid in l0.tombstones.keys() {
929 neighbor_map.remove(eid);
930 }
931
932 // Convert back to vec
933 *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
934}
935
/// Extension trait to convert storage Direction to SimpleGraph directions.
///
/// Private to this module; used when querying an L0 buffer, whose neighbor
/// lookup takes a simple-graph direction.
trait DirectionExt {
    /// Returns the simple-graph direction(s) this direction covers, as a
    /// static slice. For the storage `Direction`, `Both` yields outgoing
    /// followed by incoming; the other variants yield a single element.
    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
}
940
941impl DirectionExt for Direction {
942 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
943 use uni_common::graph::simple_graph::Direction as SimpleDirection;
944 match self {
945 Direction::Outgoing => &[SimpleDirection::Outgoing],
946 Direction::Incoming => &[SimpleDirection::Incoming],
947 Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
948 }
949 }
950}
951
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty context must expose no buffers of any kind.
    #[test]
    fn test_l0_context_empty() {
        let L0Context {
            current_l0,
            transaction_l0,
            pending_flush_l0s,
        } = L0Context::empty();

        assert!(current_l0.is_none());
        assert!(transaction_l0.is_none());
        assert!(pending_flush_l0s.is_empty());
    }
}