uni_query/query/df_graph/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
//! ┌─────────────────────────────────────────┐
//! │      DataFusion ExecutionPlan Tree      │
//! ├─────────────────────────────────────────┤
//! │  ProjectionExec (DataFusion)            │
//! │       │                                 │
//! │  FilterExec (DataFusion)                │
//! │       │                                 │
//! │  GraphTraverseExec (CUSTOM)             │
//! │       │                                 │
//! │  GraphScanExec (CUSTOM)                 │
//! │       │                                 │
//! │  UniTableProvider + UniMergeExec        │
//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod catalog_scan;
43pub mod common;
44pub mod comprehension;
45pub mod expr_compiler;
46pub mod ext_id_lookup;
47pub mod locy_abduce;
48pub mod locy_assume;
49pub mod locy_ast_builder;
50pub(crate) mod locy_bdd;
51pub mod locy_best_by;
52pub mod locy_calibrate;
53pub mod locy_delta;
54pub mod locy_derive;
55pub mod locy_errors;
56pub mod locy_eval;
57pub mod locy_explain;
58pub mod locy_fixpoint;
59pub mod locy_fold;
60pub mod locy_model_invoke;
61pub mod locy_priority;
62pub mod locy_program;
63pub mod locy_query;
64pub mod locy_slg;
65pub mod locy_traits;
66pub mod locy_validate;
67pub mod mutation_common;
68pub mod mutation_delete;
69pub mod mutation_foreach;
70pub mod mutation_remove;
71pub mod mutation_set;
72pub mod nfa;
73pub mod optional_filter;
74pub mod pattern_comprehension;
75pub mod pattern_exists;
76pub mod pred_dag;
77pub mod procedure_call;
78pub mod quantifier;
79mod read_set_exec;
80pub mod recursive_cte;
81pub mod reduce;
82pub mod scan;
83pub mod shortest_path;
84pub(crate) mod similar_to_expr;
85pub mod traverse;
86pub mod unwind;
87pub mod vector_knn;
88pub mod vid_lookup_join;
89
90use crate::query::executor::procedure::ProcedureRegistry;
91use parking_lot::RwLock;
92use std::sync::{Arc, Mutex};
93use std::time::Instant;
94use uni_algo::algo::AlgorithmRegistry;
95use uni_common::core::id::{Eid, Vid};
96use uni_store::runtime::context::QueryContext;
97use uni_store::runtime::l0::L0Buffer;
98use uni_store::runtime::property_manager::PropertyManager;
99use uni_store::storage::adjacency_manager::AdjacencyManager;
100use uni_store::storage::direction::Direction;
101use uni_store::storage::manager::StorageManager;
102
103pub mod search_procedures;
104use uni_xervo::runtime::ModelRuntime;
105
106use crate::types::QueryWarning;
107
108pub use apply::GraphApplyExec;
109pub use ext_id_lookup::GraphExtIdLookupExec;
110// CREATE and MERGE both execute through `MutationExec`; these aliases and
111// the `new_*_exec` builders are re-exported here so the planner can refer to
112// them by their clause-specific names without a dedicated module each.
113pub use mutation_common::{
114 MutationContext, MutationExec, MutationExec as MutationCreateExec,
115 MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
116};
117pub use mutation_delete::MutationDeleteExec;
118pub use mutation_foreach::ForeachExec;
119pub use mutation_remove::MutationRemoveExec;
120pub use mutation_set::MutationSetExec;
121pub use optional_filter::OptionalFilterExec;
122pub use procedure_call::GraphProcedureCallExec;
123pub use read_set_exec::ReadSetRecordingExec;
124pub use scan::GraphScanExec;
125pub use shortest_path::GraphShortestPathExec;
126pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
127pub use unwind::GraphUnwindExec;
128pub use vector_knn::GraphVectorKnnExec;
129
130pub use locy_best_by::BestByExec;
131pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
132pub use locy_fixpoint::{
133 DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
134 FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
135};
136pub use locy_fold::FoldExec;
137pub use locy_priority::PriorityExec;
138pub use locy_program::{DerivedStore, LocyProgramExec};
139pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
140
/// Shared context for graph operators.
///
/// Bundles the graph-specific resources an operator needs while a query runs:
/// - the storage manager (schema, datasets, and the adjacency manager it hands out)
/// - the L0 buffers that make uncommitted changes visible (MVCC)
/// - the property manager for lazy-loading vertex/edge properties
/// - per-query state: deadline, cancellation token, and collected warnings
/// - optional registries, runtime, and writer attached through the `with_*` builders
///
/// # Example
///
/// ```ignore
/// let ctx = GraphExecutionContext::new(
///     storage_manager,
///     l0_buffer,
///     property_manager,
/// );
///
/// // Get neighbors with L0 overlay
/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
/// ```
pub struct GraphExecutionContext {
    /// Storage manager for schema and dataset access.
    storage: Arc<StorageManager>,

    /// L0 visibility context for MVCC.
    l0_context: L0Context,

    /// Property manager for lazy property loading.
    property_manager: Arc<PropertyManager>,

    /// Query timeout deadline; `None` means the query has no time limit.
    deadline: Option<Instant>,

    /// Algorithm registry for `uni.algo.*` procedure dispatch.
    algo_registry: Option<Arc<AlgorithmRegistry>>,

    /// External procedure registry for test/user-defined procedures.
    procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Plugin registry — used by the native-label scan dispatcher
    /// (M5h.2) to route a label's reads through plugin `Storage` when
    /// one is registered via `PluginRegistry::register_label_storage`.
    plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
    /// Uni-Xervo runtime used by vector auto-embedding paths.
    xervo_runtime: Option<Arc<ModelRuntime>>,

    /// Runtime warnings collected during query execution. Shared behind a
    /// mutex so operators can record warnings through `&self`.
    warnings: Arc<Mutex<Vec<QueryWarning>>>,

    /// Cooperative cancellation token, threaded from `QueryContext`.
    cancellation_token: Option<tokio_util::sync::CancellationToken>,

    /// Outer transaction's writer handle (FU-1 / M11 #6). Threaded
    /// from the [`crate::Executor`] when the query is running inside a
    /// write-mode transaction; consumed by
    /// `QueryProcedureHost::with_writer` at procedure invocation time
    /// so a declared `WRITE`-mode procedure's Cypher body can mutate
    /// the outer transaction's L0. `Arc<Writer>` (interior-mutable,
    /// no outer lock) matches the executor's writer handle type.
    writer: Option<Arc<uni_store::Writer>>,
}
201
202impl std::fmt::Debug for GraphExecutionContext {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("GraphExecutionContext")
205 .field("l0_context", &self.l0_context)
206 .field("deadline", &self.deadline)
207 .finish_non_exhaustive()
208 }
209}
210
/// L0 buffer visibility context for MVCC reads.
///
/// Maintains references to all L0 buffers that should be visible to a query:
/// - Current L0: The active write buffer
/// - Transaction L0: Buffer for the current transaction (if any)
/// - Pending flush L0s: Buffers being flushed to disk (still visible to reads)
///
/// The visibility order is: pending flush L0s (oldest first) → current L0 → transaction L0.
/// [`L0Context::iter_l0_buffers`] yields the buffers in exactly that order.
///
/// The derived `Default` is the empty context: no current or transaction
/// buffer and an empty pending-flush list.
#[derive(Clone, Default)]
pub struct L0Context {
    /// Current active L0 buffer.
    pub current_l0: Option<Arc<RwLock<L0Buffer>>>,

    /// Transaction-local L0 buffer (if in a transaction).
    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,

    /// L0 buffers pending flush to disk.
    /// These remain visible until flush completes.
    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
}
231
232impl std::fmt::Debug for L0Context {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct("L0Context")
235 .field("current_l0", &self.current_l0.is_some())
236 .field("transaction_l0", &self.transaction_l0.is_some())
237 .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
238 .finish()
239 }
240}
241
242impl L0Context {
243 /// Create an empty L0 context with no buffers.
244 pub fn empty() -> Self {
245 Self::default()
246 }
247
248 /// Create L0 context with just a current buffer.
249 pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
250 Self {
251 current_l0: Some(l0),
252 ..Self::default()
253 }
254 }
255
256 /// Create L0 context from a query context.
257 pub fn from_query_context(ctx: &QueryContext) -> Self {
258 Self {
259 current_l0: Some(ctx.l0.clone()),
260 transaction_l0: ctx.transaction_l0.clone(),
261 pending_flush_l0s: ctx.pending_flush_l0s.clone(),
262 }
263 }
264
265 /// Iterate over all L0 buffers in visibility order.
266 /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
267 pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
268 self.pending_flush_l0s
269 .iter()
270 .chain(self.current_l0.iter())
271 .chain(self.transaction_l0.iter())
272 }
273}
274
275impl GraphExecutionContext {
276 /// Shared constructor for the public entry points. The three public
277 /// constructors differ only in the L0 visibility context, deadline,
278 /// and cancellation token; every other field starts at its default.
279 fn with_parts(
280 storage: Arc<StorageManager>,
281 l0_context: L0Context,
282 property_manager: Arc<PropertyManager>,
283 deadline: Option<Instant>,
284 cancellation_token: Option<tokio_util::sync::CancellationToken>,
285 ) -> Self {
286 Self {
287 storage,
288 l0_context,
289 property_manager,
290 deadline,
291 algo_registry: None,
292 procedure_registry: None,
293 plugin_registry: None,
294 xervo_runtime: None,
295 warnings: Arc::new(Mutex::new(Vec::new())),
296 cancellation_token,
297 writer: None,
298 }
299 }
300
301 /// Create a new graph execution context.
302 ///
303 /// # Arguments
304 ///
305 /// * `storage` - Storage manager for schema and dataset access
306 /// * `l0` - Current L0 buffer for MVCC visibility
307 /// * `property_manager` - Manager for lazy property loading
308 pub fn new(
309 storage: Arc<StorageManager>,
310 l0: Arc<RwLock<L0Buffer>>,
311 property_manager: Arc<PropertyManager>,
312 ) -> Self {
313 Self::with_parts(
314 storage,
315 L0Context::with_current(l0),
316 property_manager,
317 None,
318 None,
319 )
320 }
321
322 /// Create context with full L0 visibility.
323 ///
324 /// # Arguments
325 ///
326 /// * `storage` - Storage manager for schema and dataset access
327 /// * `l0_context` - L0 visibility context with all buffers
328 /// * `property_manager` - Manager for lazy property loading
329 pub fn with_l0_context(
330 storage: Arc<StorageManager>,
331 l0_context: L0Context,
332 property_manager: Arc<PropertyManager>,
333 ) -> Self {
334 Self::with_parts(storage, l0_context, property_manager, None, None)
335 }
336
337 /// Create context from a query context.
338 pub fn from_query_context(
339 storage: Arc<StorageManager>,
340 query_ctx: &QueryContext,
341 property_manager: Arc<PropertyManager>,
342 ) -> Self {
343 Self::with_parts(
344 storage,
345 L0Context::from_query_context(query_ctx),
346 property_manager,
347 query_ctx.deadline,
348 query_ctx.cancellation_token.clone(),
349 )
350 }
351
352 /// Set query timeout deadline.
353 pub fn with_deadline(mut self, deadline: Instant) -> Self {
354 self.deadline = Some(deadline);
355 self
356 }
357
358 /// Attach the outer transaction's writer handle so declared
359 /// `WRITE`-mode procedures invoked through this context can run
360 /// their Cypher bodies via the write-enabled inner-query host.
361 #[must_use]
362 pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
363 self.writer = Some(writer);
364 self
365 }
366
367 /// Borrow the outer transaction's writer handle, if any.
368 #[must_use]
369 pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
370 self.writer.as_ref()
371 }
372
373 /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
374 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
375 self.algo_registry = Some(registry);
376 self
377 }
378
379 /// Get a reference to the algorithm registry, if set.
380 pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
381 self.algo_registry.as_ref()
382 }
383
384 /// Set the external procedure registry for test/user-defined procedures.
385 pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
386 self.procedure_registry = Some(registry);
387 self
388 }
389
390 /// Set Uni-Xervo runtime for query-time auto-embedding.
391 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
392 self.xervo_runtime = Some(runtime);
393 self
394 }
395
396 /// Get a reference to the procedure registry, if set.
397 pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
398 self.procedure_registry.as_ref()
399 }
400
401 /// Attach the plugin registry. Required by the M5h.2 native-label
402 /// plugin-storage routing in `columnar_scan_vertex_batch_static`.
403 pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
404 self.plugin_registry = Some(registry);
405 self
406 }
407
408 /// Reference to the plugin registry (if set).
409 pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
410 self.plugin_registry.as_ref()
411 }
412
413 pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
414 self.xervo_runtime.as_ref()
415 }
416
417 /// Record a runtime warning.
418 pub fn push_warning(&self, warning: QueryWarning) {
419 if let Ok(mut w) = self.warnings.lock() {
420 w.push(warning);
421 }
422 }
423
424 /// Take all collected warnings, leaving the collector empty.
425 pub fn take_warnings(&self) -> Vec<QueryWarning> {
426 self.warnings
427 .lock()
428 .map(|mut w| std::mem::take(&mut *w))
429 .unwrap_or_default()
430 }
431
432 /// Check if the query has timed out.
433 ///
434 /// # Errors
435 ///
436 /// Returns an error if the deadline has passed.
437 pub fn check_timeout(&self) -> anyhow::Result<()> {
438 if let Some(ref token) = self.cancellation_token
439 && token.is_cancelled()
440 {
441 return Err(anyhow::anyhow!("Query cancelled"));
442 }
443 if let Some(deadline) = self.deadline
444 && Instant::now() > deadline
445 {
446 return Err(anyhow::anyhow!("Query timed out"));
447 }
448 Ok(())
449 }
450
451 /// Get a reference to the storage manager.
452 pub fn storage(&self) -> &Arc<StorageManager> {
453 &self.storage
454 }
455
456 /// Get a reference to the adjacency manager.
457 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
458 self.storage.adjacency_manager()
459 }
460
461 /// Get a reference to the property manager.
462 pub fn property_manager(&self) -> &Arc<PropertyManager> {
463 &self.property_manager
464 }
465
466 /// Get a reference to the L0 context.
467 pub fn l0_context(&self) -> &L0Context {
468 &self.l0_context
469 }
470
471 /// Wall-clock deadline for the surrounding query, if any.
472 ///
473 /// Internal accessor used by [`crate::query::executor::procedure_host::QueryProcedureHost`]
474 /// to snapshot the deadline so procedure plugins can implement
475 /// `check_timeout` without holding a borrow on this context.
476 #[must_use]
477 pub fn deadline_for_host(&self) -> Option<Instant> {
478 self.deadline
479 }
480
481 /// Cancellation token clone for the surrounding query, if any.
482 ///
483 /// Internal accessor for [`crate::query::executor::procedure_host::QueryProcedureHost`];
484 /// see [`Self::deadline_for_host`].
485 #[must_use]
486 pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
487 self.cancellation_token.clone()
488 }
489
490 /// Create a query context for property manager calls.
491 ///
492 /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
493 pub fn query_context(&self) -> QueryContext {
494 let l0 = self
495 .l0_context
496 .current_l0
497 .clone()
498 .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
499
500 let mut ctx = QueryContext::new_with_pending(
501 l0,
502 self.l0_context.transaction_l0.clone(),
503 self.l0_context.pending_flush_l0s.clone(),
504 );
505 if let Some(deadline) = self.deadline {
506 ctx.set_deadline(deadline);
507 }
508 ctx
509 }
510
511 /// Ensure adjacency CSRs are warmed for the given edge types and direction.
512 ///
513 /// This loads any missing CSR data from storage into the adjacency manager
514 /// so that subsequent `get_neighbors` calls return complete results.
515 /// Skips warming if the adjacency manager already has data (Main CSR or
516 /// active overlay) for the edge type, avoiding duplicate entries.
517 pub async fn ensure_adjacency_warmed(
518 &self,
519 edge_type_ids: &[u32],
520 direction: Direction,
521 ) -> anyhow::Result<()> {
522 let am = self.adjacency_manager();
523 // Manifest pin only: tx version pins must NOT filter adjacency
524 // warming/reads (see StorageManager::snapshot_version_hwm).
525 let version = self.storage.snapshot_version_hwm();
526 for &etype_id in edge_type_ids {
527 // Skip if AM already has data (CSR or overlay) for this edge type.
528 // The overlay contains edges from dual-write (Writer), so warming
529 // would duplicate them.
530 if !am.is_active_for(etype_id, direction) {
531 for &dir in direction.expand() {
532 // Use coalesced warming to prevent cache stampede (Issue #13)
533 self.storage
534 .warm_adjacency_coalesced(etype_id, dir, version)
535 .await?;
536 }
537 }
538 }
539 Ok(())
540 }
541
542 /// Create a boxed warming future for use in DataFusion stream state machines.
543 ///
544 /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
545 /// suitable for polling in stream `poll_next` implementations.
546 pub fn warming_future(
547 self: &Arc<Self>,
548 edge_type_ids: Vec<u32>,
549 direction: Direction,
550 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
551 {
552 let ctx = self.clone();
553 Box::pin(async move {
554 ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
555 .await
556 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
557 })
558 }
559
560 /// Get neighbors for a vertex, merging CSR and all L0 buffers.
561 ///
562 /// This implements the MVCC visibility rules:
563 /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
564 /// 2. Overlay pending flush L0s (oldest to newest)
565 /// 3. Overlay current L0
566 /// 4. Overlay transaction L0 (if present)
567 /// 5. Filter tombstones (handled by overlay)
568 ///
569 /// # Arguments
570 ///
571 /// * `vid` - Source vertex ID
572 /// * `edge_type` - Edge type ID to traverse
573 /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
574 ///
575 /// # Returns
576 ///
577 /// Vector of (neighbor VID, edge ID) pairs.
578 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
579 // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
580 let version_hwm = self.storage.snapshot_version_hwm();
581 // Single-vid case: acquire the transaction-L0 guard once for this
582 // vertex (the batch path amortizes it across many vertices).
583 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
584 self.neighbors_for_vid(
585 vid,
586 edge_type,
587 direction,
588 version_hwm,
589 tx_guard.as_deref(),
590 true,
591 )
592 }
593
594 /// Get neighbors for multiple vertices in batch.
595 ///
596 /// More efficient than calling `get_neighbors` repeatedly as it amortizes
597 /// lock acquisition for L0 buffers.
598 ///
599 /// # Arguments
600 ///
601 /// * `vids` - Source vertex IDs
602 /// * `edge_type` - Edge type ID to traverse
603 /// * `direction` - Traversal direction
604 ///
605 /// # Returns
606 ///
607 /// Vector of (source VID, neighbor VID, edge ID) triples.
608 pub fn get_neighbors_batch(
609 &self,
610 vids: &[Vid],
611 edge_type: u32,
612 direction: Direction,
613 ) -> Vec<(Vid, Vid, Eid)> {
614 // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
615 let version_hwm = self.storage.snapshot_version_hwm();
616 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
617
618 let mut results = Vec::new();
619 for &vid in vids {
620 // record_reads=false: the whole batch is recorded in one read-set
621 // lock below instead of two lock acquisitions per source vertex.
622 let neighbors = self.neighbors_for_vid(
623 vid,
624 edge_type,
625 direction,
626 version_hwm,
627 tx_guard.as_deref(),
628 false,
629 );
630 results.extend(
631 neighbors
632 .into_iter()
633 .map(|(neighbor, eid)| (vid, neighbor, eid)),
634 );
635 }
636 drop(tx_guard);
637 self.record_neighbor_reads_batch(vids, &results);
638 results
639 }
640
641 /// Resolve an edge's STORED `(src, dst)` orientation given a traversed hop.
642 ///
643 /// A relationship in a path must report its stored (start -> end) direction
644 /// even when the path traversed it backward (undirected `-[r]-` or incoming
645 /// `<-[r]-`). This first consults the L0 visibility chain (exact for
646 /// in-memory edges); if the edge has been flushed to durable storage and is
647 /// no longer L0-resident, it recovers the orientation with a bounded
648 /// directed-outgoing adjacency probe: the edge is stored
649 /// `(traversal_src -> traversal_dst)` iff `eid` appears among
650 /// `traversal_src`'s outgoing neighbours for one of `edge_type_ids`,
651 /// otherwise it is the reverse. Falls back to the traversal order only when
652 /// the probe is inconclusive (e.g. the type carries no CSR adjacency).
653 ///
654 /// The probe costs at most the out-degree of one vertex per candidate edge
655 /// type — never a full edge scan — and reads the adjacency manager directly,
656 /// so it does not perturb the SSI read-set.
657 ///
658 /// # Examples
659 ///
660 /// ```ignore
661 /// let (src, dst) =
662 /// ctx.resolve_stored_edge_endpoints(eid, node_path[i], node_path[i + 1], &edge_type_ids);
663 /// ```
664 #[must_use]
665 pub fn resolve_stored_edge_endpoints(
666 &self,
667 eid: Eid,
668 traversal_src: Vid,
669 traversal_dst: Vid,
670 edge_type_ids: &[u32],
671 ) -> (u64, u64) {
672 // 1. L0 visibility chain — exact stored endpoints for in-memory edges.
673 let query_ctx = self.query_context();
674 if let Some((src, dst)) =
675 uni_store::runtime::l0_visibility::get_edge_endpoints(eid, &query_ctx)
676 {
677 return (src.as_u64(), dst.as_u64());
678 }
679
680 // 2. Flushed (L1-resident) edge: recover orientation via a directed
681 // outgoing adjacency probe. Read the adjacency manager / versioned
682 // snapshot directly so the probe stays out of the SSI read-set.
683 //
684 // When the caller could not supply the edge's type ids (e.g. an
685 // anonymous `-[]-` relationship reaching BindFixedPath without a
686 // `_type` column), fall back to the adjacency manager's warmed types
687 // — exactly the set traversed by this query, so still a bounded probe.
688 let adjacency_manager = self.adjacency_manager();
689 let warmed_fallback: Vec<u32>;
690 let probe_types: &[u32] = if edge_type_ids.is_empty() {
691 warmed_fallback = adjacency_manager.known_edge_type_ids();
692 &warmed_fallback
693 } else {
694 edge_type_ids
695 };
696 let version_hwm = self.storage.snapshot_version_hwm();
697 let outgoing_contains = |vid: Vid| -> bool {
698 probe_types.iter().any(|&etype| {
699 let neighbors = match version_hwm {
700 Some(hwm) => {
701 self.storage
702 .get_neighbors_at_version(vid, etype, Direction::Outgoing, hwm)
703 }
704 None => adjacency_manager.get_neighbors(vid, etype, Direction::Outgoing),
705 };
706 neighbors.iter().any(|&(_, e)| e == eid)
707 })
708 };
709
710 if outgoing_contains(traversal_src) {
711 (traversal_src.as_u64(), traversal_dst.as_u64())
712 } else if outgoing_contains(traversal_dst) {
713 (traversal_dst.as_u64(), traversal_src.as_u64())
714 } else {
715 // 3. Inconclusive (no CSR adjacency for this type): preserve the
716 // long-standing traversal-order behaviour.
717 (traversal_src.as_u64(), traversal_dst.as_u64())
718 }
719 }
720
721 /// Resolve a single vertex's neighbours, overlaying the transaction L0
722 /// (if visible) and recording the traversal into the SSI read-set.
723 ///
724 /// `tx_guard` is the already-acquired read guard over the transaction
725 /// L0 buffer (if any), so batch callers acquire the lock once and pass
726 /// the borrow in for every vertex.
727 /// When `record_reads` is false the caller takes responsibility for
728 /// recording the traversal into the SSI read-set (the batch path records
729 /// once per batch via [`record_neighbor_reads_batch`]).
730 ///
731 /// [`record_neighbor_reads_batch`]: Self::record_neighbor_reads_batch
732 fn neighbors_for_vid(
733 &self,
734 vid: Vid,
735 edge_type: u32,
736 direction: Direction,
737 version_hwm: Option<u64>,
738 tx_guard: Option<&L0Buffer>,
739 record_reads: bool,
740 ) -> Vec<(Vid, Eid)> {
741 // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
742 // For snapshot queries, filter by version via StorageManager delegate.
743 let mut neighbors = if let Some(hwm) = version_hwm {
744 self.storage
745 .get_neighbors_at_version(vid, edge_type, direction, hwm)
746 } else {
747 self.adjacency_manager()
748 .get_neighbors(vid, edge_type, direction)
749 };
750
751 // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
752 if version_hwm.is_none()
753 && let Some(tx_guard) = tx_guard
754 {
755 overlay_l0_neighbors(
756 vid,
757 edge_type,
758 direction,
759 tx_guard,
760 &mut neighbors,
761 version_hwm,
762 );
763 }
764
765 if record_reads {
766 self.record_neighbor_reads(vid, &neighbors);
767 }
768
769 neighbors
770 }
771
772 /// Records traversed edges and discovered neighbours into the SSI read-set.
773 ///
774 /// No-op unless this is a read-write transaction (`occ_read_set` is `Some`
775 /// only then), so read-only and analytical traversals pay nothing. Recording
776 /// the source plus each neighbour vid and edge id gives item-level
777 /// antidependency coverage for traversals, matching the keyed read paths.
778 fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
779 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
780 return;
781 };
782 let guard = tx_l0.read();
783 let Some(read_set) = &guard.occ_read_set else {
784 return;
785 };
786 let mut rs = read_set.lock();
787 rs.vertices.insert(src);
788 for (nbr, eid) in neighbors {
789 rs.vertices.insert(*nbr);
790 rs.edges.insert(*eid);
791 }
792 }
793
794 /// Batch variant of [`record_neighbor_reads`](Self::record_neighbor_reads):
795 /// records an entire expansion batch under ONE read-set lock instead of
796 /// two lock acquisitions per source vertex.
797 ///
798 /// `srcs` is recorded in full — a source with zero neighbours is still a
799 /// read ("no edges here") that a concurrent edge insert must conflict
800 /// with, exactly as the per-vertex recorder behaves.
801 fn record_neighbor_reads_batch(&self, srcs: &[Vid], triples: &[(Vid, Vid, Eid)]) {
802 if srcs.is_empty() && triples.is_empty() {
803 return;
804 }
805 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
806 return;
807 };
808 let guard = tx_l0.read();
809 let Some(read_set) = &guard.occ_read_set else {
810 return;
811 };
812 let mut rs = read_set.lock();
813 for src in srcs {
814 rs.vertices.insert(*src);
815 }
816 for (_, nbr, eid) in triples {
817 rs.vertices.insert(*nbr);
818 rs.edges.insert(*eid);
819 }
820 }
821
822 /// Records the vertex/edge ids in the given batch columns into the read-set.
823 ///
824 /// Used by [`ReadSetRecordingExec`] to capture the identities of rows that
825 /// survived a scan's filters. No-op when there is no transaction read-set
826 /// (read-only / analytical contexts).
827 ///
828 /// [`ReadSetRecordingExec`]: crate::query::df_graph::ReadSetRecordingExec
829 pub(crate) fn record_batch_ids(
830 &self,
831 batch: &arrow_array::RecordBatch,
832 vertex_cols: &[usize],
833 edge_cols: &[usize],
834 ) {
835 use arrow_array::{Array, UInt64Array};
836
837 if vertex_cols.is_empty() && edge_cols.is_empty() {
838 return;
839 }
840 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
841 return;
842 };
843 let guard = tx_l0.read();
844 let Some(read_set) = &guard.occ_read_set else {
845 return;
846 };
847 let mut rs = read_set.lock();
848 for &col in vertex_cols {
849 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
850 for i in 0..arr.len() {
851 if !arr.is_null(i) {
852 rs.vertices.insert(Vid::from(arr.value(i)));
853 }
854 }
855 }
856 }
857 for &col in edge_cols {
858 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
859 for i in 0..arr.len() {
860 if !arr.is_null(i) {
861 rs.edges.insert(Eid::from(arr.value(i)));
862 }
863 }
864 }
865 }
866 }
867}
868
869/// Overlay L0 buffer neighbors onto existing neighbor list.
870///
871/// Adds new edges from L0 and removes tombstoned edges.
872/// Filters by version if a snapshot boundary is provided.
873fn overlay_l0_neighbors(
874 vid: Vid,
875 edge_type: u32,
876 direction: Direction,
877 l0: &L0Buffer,
878 neighbors: &mut Vec<(Vid, Eid)>,
879 version_hwm: Option<u64>,
880) {
881 use std::collections::HashMap;
882
883 // Convert to map for efficient updates
884 let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
885
886 // Query L0 for each direction
887 for &simple_dir in direction.to_simple_directions() {
888 for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
889 // Skip edges beyond snapshot boundary
890 if version_hwm.is_some_and(|hwm| version > hwm) {
891 continue;
892 }
893
894 // Apply insert or remove tombstone
895 if l0.is_tombstoned(eid) {
896 neighbor_map.remove(&eid);
897 } else {
898 neighbor_map.insert(eid, neighbor);
899 }
900 }
901 }
902
903 // Remove edges tombstoned in this L0 but originating from other layers
904 for eid in l0.tombstones.keys() {
905 neighbor_map.remove(eid);
906 }
907
908 // Convert back to vec
909 *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
910}
911
/// Extension trait to convert storage Direction to SimpleGraph directions.
trait DirectionExt {
    /// Returns the simple-graph direction(s) to query for this storage
    /// direction, as a static slice.
    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
}
916
917impl DirectionExt for Direction {
918 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
919 use uni_common::graph::simple_graph::Direction as SimpleDirection;
920 match self {
921 Direction::Outgoing => &[SimpleDirection::Outgoing],
922 Direction::Incoming => &[SimpleDirection::Incoming],
923 Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
924 }
925 }
926}
927
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty context exposes no L0 buffer of any kind.
    #[test]
    fn test_l0_context_empty() {
        let L0Context {
            current_l0,
            transaction_l0,
            pending_flush_l0s,
        } = L0Context::empty();

        assert!(current_l0.is_none());
        assert!(transaction_l0.is_none());
        assert!(pending_flush_l0s.is_empty());
    }
}