uni_query/query/df_graph/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────────────────────────────────────┐
19//! │ DataFusion ExecutionPlan Tree │
20//! ├─────────────────────────────────────────┤
21//! │ ProjectionExec (DataFusion) │
22//! │ │ │
23//! │ FilterExec (DataFusion) │
24//! │ │ │
25//! │ GraphTraverseExec (CUSTOM) │
26//! │ │ │
27//! │ GraphScanExec (CUSTOM) │
28//! │ │ │
29//! │ UniTableProvider + UniMergeExec │
30//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod catalog_scan;
43pub mod common;
44pub mod comprehension;
45pub mod expr_compiler;
46pub mod ext_id_lookup;
47pub mod locy_abduce;
48pub mod locy_assume;
49pub mod locy_ast_builder;
50pub(crate) mod locy_bdd;
51pub mod locy_best_by;
52pub mod locy_calibrate;
53pub mod locy_delta;
54pub mod locy_derive;
55pub mod locy_errors;
56pub mod locy_eval;
57pub mod locy_explain;
58pub mod locy_fixpoint;
59pub mod locy_fold;
60pub mod locy_model_invoke;
61pub mod locy_priority;
62pub mod locy_profile;
63pub mod locy_program;
64pub mod locy_query;
65pub mod locy_slg;
66pub mod locy_traits;
67pub mod locy_validate;
68pub mod mutation_common;
69pub mod mutation_delete;
70pub mod mutation_foreach;
71pub mod mutation_remove;
72pub mod mutation_set;
73pub mod nfa;
74pub mod optional_filter;
75pub mod pattern_comprehension;
76pub mod pattern_exists;
77pub mod pred_dag;
78pub mod procedure_call;
79pub mod quantifier;
80mod read_set_exec;
81pub mod recursive_cte;
82pub mod reduce;
83pub mod scan;
84pub mod shortest_path;
85pub(crate) mod similar_to_expr;
86pub mod traverse;
87pub mod unwind;
88pub mod vector_knn;
89pub mod vid_lookup_join;
90
91use crate::query::executor::procedure::ProcedureRegistry;
92use parking_lot::RwLock;
93use std::sync::{Arc, Mutex};
94use std::time::Instant;
95use uni_algo::algo::AlgorithmRegistry;
96use uni_common::core::id::{Eid, Vid};
97use uni_store::runtime::context::QueryContext;
98use uni_store::runtime::l0::L0Buffer;
99use uni_store::runtime::property_manager::PropertyManager;
100use uni_store::storage::adjacency_manager::AdjacencyManager;
101use uni_store::storage::direction::Direction;
102use uni_store::storage::manager::StorageManager;
103
104pub mod search_procedures;
105use uni_xervo::runtime::ModelRuntime;
106
107use crate::types::QueryWarning;
108
109pub use apply::GraphApplyExec;
110pub use ext_id_lookup::GraphExtIdLookupExec;
111// CREATE and MERGE both execute through `MutationExec`; these aliases and
112// the `new_*_exec` builders are re-exported here so the planner can refer to
113// them by their clause-specific names without a dedicated module each.
114pub use mutation_common::{
115 MutationContext, MutationExec, MutationExec as MutationCreateExec,
116 MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
117};
118pub use mutation_delete::MutationDeleteExec;
119pub use mutation_foreach::ForeachExec;
120pub use mutation_remove::MutationRemoveExec;
121pub use mutation_set::MutationSetExec;
122pub use optional_filter::OptionalFilterExec;
123pub use procedure_call::GraphProcedureCallExec;
124pub use read_set_exec::ReadSetRecordingExec;
125pub use scan::GraphScanExec;
126pub use shortest_path::GraphShortestPathExec;
127pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
128pub use unwind::GraphUnwindExec;
129pub use vector_knn::GraphVectorKnnExec;
130
131pub use locy_best_by::BestByExec;
132pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
133pub use locy_fixpoint::{
134 DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
135 FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
136};
137pub use locy_fold::FoldExec;
138pub use locy_priority::PriorityExec;
139pub use locy_program::{DerivedStore, LocyProgramExec};
140pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
141
142/// Shared context for graph operators.
143///
144/// Provides access to graph-specific resources needed during query execution:
145/// - CSR adjacency cache for fast neighbor lookups
146/// - L0 buffers for MVCC visibility of uncommitted changes
147/// - Property manager for lazy-loading vertex/edge properties
148/// - Storage manager for schema and dataset access
149///
150/// # Example
151///
152/// ```ignore
153/// let ctx = GraphExecutionContext::new(
154/// storage_manager,
155/// l0_buffer,
156/// property_manager,
157/// );
158///
159/// // Get neighbors with L0 overlay
160/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
161/// ```
162pub struct GraphExecutionContext {
163 /// Storage manager for schema and dataset access.
164 storage: Arc<StorageManager>,
165
166 /// L0 visibility context for MVCC.
167 l0_context: L0Context,
168
169 /// Property manager for lazy property loading.
170 property_manager: Arc<PropertyManager>,
171
172 /// Query timeout deadline.
173 deadline: Option<Instant>,
174
175 /// Algorithm registry for `uni.algo.*` procedure dispatch.
176 algo_registry: Option<Arc<AlgorithmRegistry>>,
177
178 /// External procedure registry for test/user-defined procedures.
179 procedure_registry: Option<Arc<ProcedureRegistry>>,
180 /// Plugin registry — used by the native-label scan dispatcher
181 /// (M5h.2) to route a label's reads through plugin `Storage` when
182 /// one is registered via `PluginRegistry::register_label_storage`.
183 plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
184 /// Uni-Xervo runtime used by vector auto-embedding paths.
185 xervo_runtime: Option<Arc<ModelRuntime>>,
186
187 /// Runtime warnings collected during query execution.
188 warnings: Arc<Mutex<Vec<QueryWarning>>>,
189
190 /// Cooperative cancellation token, threaded from `QueryContext`.
191 cancellation_token: Option<tokio_util::sync::CancellationToken>,
192
193 /// Outer transaction's writer handle (FU-1 / M11 #6). Threaded
194 /// from the [`crate::Executor`] when the query is running inside a
195 /// write-mode transaction; consumed by
196 /// `QueryProcedureHost::with_writer` at procedure invocation time
197 /// so a declared `WRITE`-mode procedure's Cypher body can mutate
198 /// the outer transaction's L0. `Arc<Writer>` (interior-mutable,
199 /// no outer lock) matches the executor's writer handle type.
200 writer: Option<Arc<uni_store::Writer>>,
201}
202
203impl std::fmt::Debug for GraphExecutionContext {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 f.debug_struct("GraphExecutionContext")
206 .field("l0_context", &self.l0_context)
207 .field("deadline", &self.deadline)
208 .finish_non_exhaustive()
209 }
210}
211
212/// L0 buffer visibility context for MVCC reads.
213///
214/// Maintains references to all L0 buffers that should be visible to a query:
215/// - Current L0: The active write buffer
216/// - Transaction L0: Buffer for the current transaction (if any)
217/// - Pending flush L0s: Buffers being flushed to disk (still visible to reads)
218///
219/// The visibility order is: pending flush L0s (oldest first) → current L0 → transaction L0.
220#[derive(Clone, Default)]
221pub struct L0Context {
222 /// Current active L0 buffer.
223 pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
224
225 /// Transaction-local L0 buffer (if in a transaction).
226 pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
227
228 /// L0 buffers pending flush to disk.
229 /// These remain visible until flush completes.
230 pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
231}
232
233impl std::fmt::Debug for L0Context {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 f.debug_struct("L0Context")
236 .field("current_l0", &self.current_l0.is_some())
237 .field("transaction_l0", &self.transaction_l0.is_some())
238 .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
239 .finish()
240 }
241}
242
243impl L0Context {
244 /// Create an empty L0 context with no buffers.
245 pub fn empty() -> Self {
246 Self::default()
247 }
248
249 /// Create L0 context with just a current buffer.
250 pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
251 Self {
252 current_l0: Some(l0),
253 ..Self::default()
254 }
255 }
256
257 /// Create L0 context from a query context.
258 pub fn from_query_context(ctx: &QueryContext) -> Self {
259 Self {
260 current_l0: Some(ctx.l0.clone()),
261 transaction_l0: ctx.transaction_l0.clone(),
262 pending_flush_l0s: ctx.pending_flush_l0s.clone(),
263 }
264 }
265
266 /// Iterate over all L0 buffers in visibility order.
267 /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
268 pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
269 self.pending_flush_l0s
270 .iter()
271 .chain(self.current_l0.iter())
272 .chain(self.transaction_l0.iter())
273 }
274}
275
276impl GraphExecutionContext {
277 /// Shared constructor for the public entry points. The three public
278 /// constructors differ only in the L0 visibility context, deadline,
279 /// and cancellation token; every other field starts at its default.
280 fn with_parts(
281 storage: Arc<StorageManager>,
282 l0_context: L0Context,
283 property_manager: Arc<PropertyManager>,
284 deadline: Option<Instant>,
285 cancellation_token: Option<tokio_util::sync::CancellationToken>,
286 ) -> Self {
287 Self {
288 storage,
289 l0_context,
290 property_manager,
291 deadline,
292 algo_registry: None,
293 procedure_registry: None,
294 plugin_registry: None,
295 xervo_runtime: None,
296 warnings: Arc::new(Mutex::new(Vec::new())),
297 cancellation_token,
298 writer: None,
299 }
300 }
301
302 /// Create a new graph execution context.
303 ///
304 /// # Arguments
305 ///
306 /// * `storage` - Storage manager for schema and dataset access
307 /// * `l0` - Current L0 buffer for MVCC visibility
308 /// * `property_manager` - Manager for lazy property loading
309 pub fn new(
310 storage: Arc<StorageManager>,
311 l0: Arc<RwLock<L0Buffer>>,
312 property_manager: Arc<PropertyManager>,
313 ) -> Self {
314 Self::with_parts(
315 storage,
316 L0Context::with_current(l0),
317 property_manager,
318 None,
319 None,
320 )
321 }
322
323 /// Create context with full L0 visibility.
324 ///
325 /// # Arguments
326 ///
327 /// * `storage` - Storage manager for schema and dataset access
328 /// * `l0_context` - L0 visibility context with all buffers
329 /// * `property_manager` - Manager for lazy property loading
330 pub fn with_l0_context(
331 storage: Arc<StorageManager>,
332 l0_context: L0Context,
333 property_manager: Arc<PropertyManager>,
334 ) -> Self {
335 Self::with_parts(storage, l0_context, property_manager, None, None)
336 }
337
338 /// Create context from a query context.
339 pub fn from_query_context(
340 storage: Arc<StorageManager>,
341 query_ctx: &QueryContext,
342 property_manager: Arc<PropertyManager>,
343 ) -> Self {
344 Self::with_parts(
345 storage,
346 L0Context::from_query_context(query_ctx),
347 property_manager,
348 query_ctx.deadline,
349 query_ctx.cancellation_token.clone(),
350 )
351 }
352
353 /// Set query timeout deadline.
354 pub fn with_deadline(mut self, deadline: Instant) -> Self {
355 self.deadline = Some(deadline);
356 self
357 }
358
359 /// Attach the outer transaction's writer handle so declared
360 /// `WRITE`-mode procedures invoked through this context can run
361 /// their Cypher bodies via the write-enabled inner-query host.
362 #[must_use]
363 pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
364 self.writer = Some(writer);
365 self
366 }
367
368 /// Borrow the outer transaction's writer handle, if any.
369 #[must_use]
370 pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
371 self.writer.as_ref()
372 }
373
374 /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
375 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
376 self.algo_registry = Some(registry);
377 self
378 }
379
380 /// Get a reference to the algorithm registry, if set.
381 pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
382 self.algo_registry.as_ref()
383 }
384
385 /// Set the external procedure registry for test/user-defined procedures.
386 pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
387 self.procedure_registry = Some(registry);
388 self
389 }
390
391 /// Set Uni-Xervo runtime for query-time auto-embedding.
392 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
393 self.xervo_runtime = Some(runtime);
394 self
395 }
396
397 /// Get a reference to the procedure registry, if set.
398 pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
399 self.procedure_registry.as_ref()
400 }
401
402 /// Attach the plugin registry. Required by the M5h.2 native-label
403 /// plugin-storage routing in `columnar_scan_vertex_batch_static`.
404 pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
405 self.plugin_registry = Some(registry);
406 self
407 }
408
409 /// Reference to the plugin registry (if set).
410 pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
411 self.plugin_registry.as_ref()
412 }
413
414 pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
415 self.xervo_runtime.as_ref()
416 }
417
418 /// Record a runtime warning.
419 pub fn push_warning(&self, warning: QueryWarning) {
420 if let Ok(mut w) = self.warnings.lock() {
421 w.push(warning);
422 }
423 }
424
425 /// Take all collected warnings, leaving the collector empty.
426 pub fn take_warnings(&self) -> Vec<QueryWarning> {
427 self.warnings
428 .lock()
429 .map(|mut w| std::mem::take(&mut *w))
430 .unwrap_or_default()
431 }
432
433 /// Check if the query has timed out.
434 ///
435 /// # Errors
436 ///
437 /// Returns an error if the deadline has passed.
438 pub fn check_timeout(&self) -> anyhow::Result<()> {
439 if let Some(ref token) = self.cancellation_token
440 && token.is_cancelled()
441 {
442 return Err(anyhow::anyhow!("Query cancelled"));
443 }
444 if let Some(deadline) = self.deadline
445 && Instant::now() > deadline
446 {
447 return Err(anyhow::anyhow!("Query timed out"));
448 }
449 Ok(())
450 }
451
452 /// Get a reference to the storage manager.
453 pub fn storage(&self) -> &Arc<StorageManager> {
454 &self.storage
455 }
456
457 /// Get a reference to the adjacency manager.
458 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
459 self.storage.adjacency_manager()
460 }
461
462 /// Get a reference to the property manager.
463 pub fn property_manager(&self) -> &Arc<PropertyManager> {
464 &self.property_manager
465 }
466
467 /// Get a reference to the L0 context.
468 pub fn l0_context(&self) -> &L0Context {
469 &self.l0_context
470 }
471
472 /// Wall-clock deadline for the surrounding query, if any.
473 ///
474 /// Internal accessor used by [`crate::query::executor::procedure_host::QueryProcedureHost`]
475 /// to snapshot the deadline so procedure plugins can implement
476 /// `check_timeout` without holding a borrow on this context.
477 #[must_use]
478 pub fn deadline_for_host(&self) -> Option<Instant> {
479 self.deadline
480 }
481
482 /// Cancellation token clone for the surrounding query, if any.
483 ///
484 /// Internal accessor for [`crate::query::executor::procedure_host::QueryProcedureHost`];
485 /// see [`Self::deadline_for_host`].
486 #[must_use]
487 pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
488 self.cancellation_token.clone()
489 }
490
491 /// Create a query context for property manager calls.
492 ///
493 /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
494 pub fn query_context(&self) -> QueryContext {
495 let l0 = self
496 .l0_context
497 .current_l0
498 .clone()
499 .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
500
501 let mut ctx = QueryContext::new_with_pending(
502 l0,
503 self.l0_context.transaction_l0.clone(),
504 self.l0_context.pending_flush_l0s.clone(),
505 );
506 if let Some(deadline) = self.deadline {
507 ctx.set_deadline(deadline);
508 }
509 ctx
510 }
511
512 /// Ensure adjacency CSRs are warmed for the given edge types and direction.
513 ///
514 /// This loads any missing CSR data from storage into the adjacency manager
515 /// so that subsequent `get_neighbors` calls return complete results.
516 /// Skips warming if the adjacency manager already has data (Main CSR or
517 /// active overlay) for the edge type, avoiding duplicate entries.
518 pub async fn ensure_adjacency_warmed(
519 &self,
520 edge_type_ids: &[u32],
521 direction: Direction,
522 ) -> anyhow::Result<()> {
523 let am = self.adjacency_manager();
524 // Manifest pin only: tx version pins must NOT filter adjacency
525 // warming/reads (see StorageManager::snapshot_version_hwm).
526 let version = self.storage.snapshot_version_hwm();
527 for &etype_id in edge_type_ids {
528 // Skip if AM already has data (CSR or overlay) for this edge type.
529 // The overlay contains edges from dual-write (Writer), so warming
530 // would duplicate them.
531 if !am.is_active_for(etype_id, direction) {
532 for &dir in direction.expand() {
533 // Use coalesced warming to prevent cache stampede (Issue #13)
534 self.storage
535 .warm_adjacency_coalesced(etype_id, dir, version)
536 .await?;
537 }
538 }
539 }
540 Ok(())
541 }
542
543 /// Create a boxed warming future for use in DataFusion stream state machines.
544 ///
545 /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
546 /// suitable for polling in stream `poll_next` implementations.
547 pub fn warming_future(
548 self: &Arc<Self>,
549 edge_type_ids: Vec<u32>,
550 direction: Direction,
551 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
552 {
553 let ctx = self.clone();
554 Box::pin(async move {
555 ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
556 .await
557 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
558 })
559 }
560
561 /// Get neighbors for a vertex, merging CSR and all L0 buffers.
562 ///
563 /// This implements the MVCC visibility rules:
564 /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
565 /// 2. Overlay pending flush L0s (oldest to newest)
566 /// 3. Overlay current L0
567 /// 4. Overlay transaction L0 (if present)
568 /// 5. Filter tombstones (handled by overlay)
569 ///
570 /// # Arguments
571 ///
572 /// * `vid` - Source vertex ID
573 /// * `edge_type` - Edge type ID to traverse
574 /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
575 ///
576 /// # Returns
577 ///
578 /// Vector of (neighbor VID, edge ID) pairs.
579 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
580 // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
581 let version_hwm = self.storage.snapshot_version_hwm();
582 // Single-vid case: acquire the transaction-L0 guard once for this
583 // vertex (the batch path amortizes it across many vertices).
584 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
585 self.neighbors_for_vid(
586 vid,
587 edge_type,
588 direction,
589 version_hwm,
590 tx_guard.as_deref(),
591 true,
592 )
593 }
594
595 /// Get neighbors for multiple vertices in batch.
596 ///
597 /// More efficient than calling `get_neighbors` repeatedly as it amortizes
598 /// lock acquisition for L0 buffers.
599 ///
600 /// # Arguments
601 ///
602 /// * `vids` - Source vertex IDs
603 /// * `edge_type` - Edge type ID to traverse
604 /// * `direction` - Traversal direction
605 ///
606 /// # Returns
607 ///
608 /// Vector of (source VID, neighbor VID, edge ID) triples.
609 pub fn get_neighbors_batch(
610 &self,
611 vids: &[Vid],
612 edge_type: u32,
613 direction: Direction,
614 ) -> Vec<(Vid, Vid, Eid)> {
615 // Manifest pin only (time-travel); tx pins read live edges + L0 overlays.
616 let version_hwm = self.storage.snapshot_version_hwm();
617 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
618
619 let mut results = Vec::new();
620 for &vid in vids {
621 // record_reads=false: the whole batch is recorded in one read-set
622 // lock below instead of two lock acquisitions per source vertex.
623 let neighbors = self.neighbors_for_vid(
624 vid,
625 edge_type,
626 direction,
627 version_hwm,
628 tx_guard.as_deref(),
629 false,
630 );
631 results.extend(
632 neighbors
633 .into_iter()
634 .map(|(neighbor, eid)| (vid, neighbor, eid)),
635 );
636 }
637 drop(tx_guard);
638 self.record_neighbor_reads_batch(vids, &results);
639 results
640 }
641
642 /// Resolve an edge's STORED `(src, dst)` orientation given a traversed hop.
643 ///
644 /// A relationship in a path must report its stored (start -> end) direction
645 /// even when the path traversed it backward (undirected `-[r]-` or incoming
646 /// `<-[r]-`). This first consults the L0 visibility chain (exact for
647 /// in-memory edges); if the edge has been flushed to durable storage and is
648 /// no longer L0-resident, it recovers the orientation with a bounded
649 /// directed-outgoing adjacency probe: the edge is stored
650 /// `(traversal_src -> traversal_dst)` iff `eid` appears among
651 /// `traversal_src`'s outgoing neighbours for one of `edge_type_ids`,
652 /// otherwise it is the reverse. Falls back to the traversal order only when
653 /// the probe is inconclusive (e.g. the type carries no CSR adjacency).
654 ///
655 /// The probe costs at most the out-degree of one vertex per candidate edge
656 /// type — never a full edge scan — and reads the adjacency manager directly,
657 /// so it does not perturb the SSI read-set.
658 ///
659 /// # Examples
660 ///
661 /// ```ignore
662 /// let (src, dst) =
663 /// ctx.resolve_stored_edge_endpoints(eid, node_path[i], node_path[i + 1], &edge_type_ids);
664 /// ```
665 #[must_use]
666 pub fn resolve_stored_edge_endpoints(
667 &self,
668 eid: Eid,
669 traversal_src: Vid,
670 traversal_dst: Vid,
671 edge_type_ids: &[u32],
672 ) -> (u64, u64) {
673 // 1. L0 visibility chain — exact stored endpoints for in-memory edges.
674 let query_ctx = self.query_context();
675 if let Some((src, dst)) =
676 uni_store::runtime::l0_visibility::get_edge_endpoints(eid, &query_ctx)
677 {
678 return (src.as_u64(), dst.as_u64());
679 }
680
681 // 2. Flushed (L1-resident) edge: recover orientation via a directed
682 // outgoing adjacency probe. Read the adjacency manager / versioned
683 // snapshot directly so the probe stays out of the SSI read-set.
684 //
685 // When the caller could not supply the edge's type ids (e.g. an
686 // anonymous `-[]-` relationship reaching BindFixedPath without a
687 // `_type` column), fall back to the adjacency manager's warmed types
688 // — exactly the set traversed by this query, so still a bounded probe.
689 let adjacency_manager = self.adjacency_manager();
690 let warmed_fallback: Vec<u32>;
691 let probe_types: &[u32] = if edge_type_ids.is_empty() {
692 warmed_fallback = adjacency_manager.known_edge_type_ids();
693 &warmed_fallback
694 } else {
695 edge_type_ids
696 };
697 let version_hwm = self.storage.snapshot_version_hwm();
698 let outgoing_contains = |vid: Vid| -> bool {
699 probe_types.iter().any(|&etype| {
700 let neighbors = match version_hwm {
701 Some(hwm) => {
702 self.storage
703 .get_neighbors_at_version(vid, etype, Direction::Outgoing, hwm)
704 }
705 None => adjacency_manager.get_neighbors(vid, etype, Direction::Outgoing),
706 };
707 neighbors.iter().any(|&(_, e)| e == eid)
708 })
709 };
710
711 if outgoing_contains(traversal_src) {
712 (traversal_src.as_u64(), traversal_dst.as_u64())
713 } else if outgoing_contains(traversal_dst) {
714 (traversal_dst.as_u64(), traversal_src.as_u64())
715 } else {
716 // 3. Inconclusive (no CSR adjacency for this type): preserve the
717 // long-standing traversal-order behaviour.
718 (traversal_src.as_u64(), traversal_dst.as_u64())
719 }
720 }
721
722 /// Resolve a single vertex's neighbours, overlaying the transaction L0
723 /// (if visible) and recording the traversal into the SSI read-set.
724 ///
725 /// `tx_guard` is the already-acquired read guard over the transaction
726 /// L0 buffer (if any), so batch callers acquire the lock once and pass
727 /// the borrow in for every vertex.
728 /// When `record_reads` is false the caller takes responsibility for
729 /// recording the traversal into the SSI read-set (the batch path records
730 /// once per batch via [`record_neighbor_reads_batch`]).
731 ///
732 /// [`record_neighbor_reads_batch`]: Self::record_neighbor_reads_batch
733 fn neighbors_for_vid(
734 &self,
735 vid: Vid,
736 edge_type: u32,
737 direction: Direction,
738 version_hwm: Option<u64>,
739 tx_guard: Option<&L0Buffer>,
740 record_reads: bool,
741 ) -> Vec<(Vid, Eid)> {
742 // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
743 // For snapshot queries, filter by version via StorageManager delegate.
744 let mut neighbors = if let Some(hwm) = version_hwm {
745 self.storage
746 .get_neighbors_at_version(vid, edge_type, direction, hwm)
747 } else {
748 self.adjacency_manager()
749 .get_neighbors(vid, edge_type, direction)
750 };
751
752 // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
753 if version_hwm.is_none()
754 && let Some(tx_guard) = tx_guard
755 {
756 overlay_l0_neighbors(
757 vid,
758 edge_type,
759 direction,
760 tx_guard,
761 &mut neighbors,
762 version_hwm,
763 );
764 }
765
766 if record_reads {
767 self.record_neighbor_reads(vid, &neighbors);
768 }
769
770 neighbors
771 }
772
773 /// Records traversed edges and discovered neighbours into the SSI read-set.
774 ///
775 /// No-op unless this is a read-write transaction (`occ_read_set` is `Some`
776 /// only then), so read-only and analytical traversals pay nothing. Recording
777 /// the source plus each neighbour vid and edge id gives item-level
778 /// antidependency coverage for traversals, matching the keyed read paths.
779 fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
780 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
781 return;
782 };
783 let guard = tx_l0.read();
784 let Some(read_set) = &guard.occ_read_set else {
785 return;
786 };
787 let mut rs = read_set.lock();
788 rs.vertices.insert(src);
789 for (nbr, eid) in neighbors {
790 rs.vertices.insert(*nbr);
791 rs.edges.insert(*eid);
792 }
793 }
794
795 /// Batch variant of [`record_neighbor_reads`](Self::record_neighbor_reads):
796 /// records an entire expansion batch under ONE read-set lock instead of
797 /// two lock acquisitions per source vertex.
798 ///
799 /// `srcs` is recorded in full — a source with zero neighbours is still a
800 /// read ("no edges here") that a concurrent edge insert must conflict
801 /// with, exactly as the per-vertex recorder behaves.
802 fn record_neighbor_reads_batch(&self, srcs: &[Vid], triples: &[(Vid, Vid, Eid)]) {
803 if srcs.is_empty() && triples.is_empty() {
804 return;
805 }
806 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
807 return;
808 };
809 let guard = tx_l0.read();
810 let Some(read_set) = &guard.occ_read_set else {
811 return;
812 };
813 let mut rs = read_set.lock();
814 for src in srcs {
815 rs.vertices.insert(*src);
816 }
817 for (_, nbr, eid) in triples {
818 rs.vertices.insert(*nbr);
819 rs.edges.insert(*eid);
820 }
821 }
822
823 /// Records the vertex/edge ids in the given batch columns into the read-set.
824 ///
825 /// Used by [`ReadSetRecordingExec`] to capture the identities of rows that
826 /// survived a scan's filters. No-op when there is no transaction read-set
827 /// (read-only / analytical contexts).
828 ///
829 /// [`ReadSetRecordingExec`]: crate::query::df_graph::ReadSetRecordingExec
830 pub(crate) fn record_batch_ids(
831 &self,
832 batch: &arrow_array::RecordBatch,
833 vertex_cols: &[usize],
834 edge_cols: &[usize],
835 ) {
836 use arrow_array::{Array, UInt64Array};
837
838 if vertex_cols.is_empty() && edge_cols.is_empty() {
839 return;
840 }
841 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
842 return;
843 };
844 let guard = tx_l0.read();
845 let Some(read_set) = &guard.occ_read_set else {
846 return;
847 };
848 let mut rs = read_set.lock();
849 for &col in vertex_cols {
850 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
851 for i in 0..arr.len() {
852 if !arr.is_null(i) {
853 rs.vertices.insert(Vid::from(arr.value(i)));
854 }
855 }
856 }
857 }
858 for &col in edge_cols {
859 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
860 for i in 0..arr.len() {
861 if !arr.is_null(i) {
862 rs.edges.insert(Eid::from(arr.value(i)));
863 }
864 }
865 }
866 }
867 }
868}
869
870/// Overlay L0 buffer neighbors onto existing neighbor list.
871///
872/// Adds new edges from L0 and removes tombstoned edges.
873/// Filters by version if a snapshot boundary is provided.
874fn overlay_l0_neighbors(
875 vid: Vid,
876 edge_type: u32,
877 direction: Direction,
878 l0: &L0Buffer,
879 neighbors: &mut Vec<(Vid, Eid)>,
880 version_hwm: Option<u64>,
881) {
882 use std::collections::HashMap;
883
884 // Convert to map for efficient updates
885 let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
886
887 // Query L0 for each direction
888 for &simple_dir in direction.to_simple_directions() {
889 for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
890 // Skip edges beyond snapshot boundary
891 if version_hwm.is_some_and(|hwm| version > hwm) {
892 continue;
893 }
894
895 // Apply insert or remove tombstone
896 if l0.is_tombstoned(eid) {
897 neighbor_map.remove(&eid);
898 } else {
899 neighbor_map.insert(eid, neighbor);
900 }
901 }
902 }
903
904 // Remove edges tombstoned in this L0 but originating from other layers
905 for eid in l0.tombstones.keys() {
906 neighbor_map.remove(eid);
907 }
908
909 // Convert back to vec
910 *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
911}
912
913/// Extension trait to convert storage Direction to SimpleGraph directions.
914trait DirectionExt {
915 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
916}
917
918impl DirectionExt for Direction {
919 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
920 use uni_common::graph::simple_graph::Direction as SimpleDirection;
921 match self {
922 Direction::Outgoing => &[SimpleDirection::Outgoing],
923 Direction::Incoming => &[SimpleDirection::Incoming],
924 Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
925 }
926 }
927}
928
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty context exposes no L0 buffer of any kind.
    #[test]
    fn test_l0_context_empty() {
        let L0Context {
            current_l0,
            transaction_l0,
            pending_flush_l0s,
        } = L0Context::empty();
        assert!(current_l0.is_none(), "empty context must have no current L0");
        assert!(
            transaction_l0.is_none(),
            "empty context must have no transaction L0"
        );
        assert!(
            pending_flush_l0s.is_empty(),
            "empty context must have no pending-flush L0s"
        );
    }
}