// uni_query/query/df_graph/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────────────────────────────────────┐
19//! │      DataFusion ExecutionPlan Tree      │
20//! ├─────────────────────────────────────────┤
21//! │  ProjectionExec (DataFusion)            │
22//! │       │                                 │
23//! │  FilterExec (DataFusion)                │
24//! │       │                                 │
25//! │  GraphTraverseExec (CUSTOM)             │
26//! │       │                                 │
27//! │  GraphScanExec (CUSTOM)                 │
28//! │       │                                 │
29//! │  UniTableProvider + UniMergeExec        │
30//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod common;
43pub mod comprehension;
44pub mod expr_compiler;
45pub mod ext_id_lookup;
46pub mod locy_abduce;
47pub mod locy_assume;
48pub mod locy_ast_builder;
49pub(crate) mod locy_bdd;
50pub mod locy_best_by;
51pub mod locy_delta;
52pub mod locy_derive;
53pub mod locy_errors;
54pub mod locy_eval;
55pub mod locy_explain;
56pub mod locy_fixpoint;
57pub mod locy_fold;
58pub mod locy_priority;
59pub mod locy_program;
60pub mod locy_query;
61pub mod locy_slg;
62pub mod locy_traits;
63pub mod mutation_common;
64pub mod mutation_create;
65pub mod mutation_delete;
66pub mod mutation_foreach;
67pub mod mutation_merge;
68pub mod mutation_remove;
69pub mod mutation_set;
70pub mod nfa;
71pub mod optional_filter;
72pub mod pattern_comprehension;
73pub mod pred_dag;
74pub mod procedure_call;
75pub mod quantifier;
76pub mod recursive_cte;
77pub mod reduce;
78pub mod scan;
79pub mod shortest_path;
80pub(crate) mod similar_to_expr;
81pub mod traverse;
82pub mod unwind;
83pub mod vector_knn;
84
85use crate::query::executor::procedure::ProcedureRegistry;
86use parking_lot::RwLock;
87use std::sync::{Arc, Mutex};
88use std::time::Instant;
89use uni_algo::algo::AlgorithmRegistry;
90use uni_common::core::id::{Eid, Vid};
91use uni_store::runtime::context::QueryContext;
92use uni_store::runtime::l0::L0Buffer;
93use uni_store::runtime::property_manager::PropertyManager;
94use uni_store::storage::adjacency_manager::AdjacencyManager;
95use uni_store::storage::direction::Direction;
96use uni_store::storage::manager::StorageManager;
97use uni_xervo::runtime::ModelRuntime;
98
99use crate::types::QueryWarning;
100
101pub use apply::GraphApplyExec;
102pub use ext_id_lookup::GraphExtIdLookupExec;
103pub use mutation_common::{MutationContext, MutationExec};
104pub use mutation_create::MutationCreateExec;
105pub use mutation_delete::MutationDeleteExec;
106pub use mutation_foreach::ForeachExec;
107pub use mutation_merge::MutationMergeExec;
108pub use mutation_remove::MutationRemoveExec;
109pub use mutation_set::MutationSetExec;
110pub use optional_filter::OptionalFilterExec;
111pub use procedure_call::GraphProcedureCallExec;
112pub use scan::GraphScanExec;
113pub use shortest_path::GraphShortestPathExec;
114pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
115pub use unwind::GraphUnwindExec;
116pub use vector_knn::GraphVectorKnnExec;
117
118pub use locy_best_by::BestByExec;
119pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
120pub use locy_fixpoint::{
121    DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
122    FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
123};
124pub use locy_fold::FoldExec;
125pub use locy_priority::PriorityExec;
126pub use locy_program::{DerivedStore, LocyProgramExec};
127pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
128
129/// Shared context for graph operators.
130///
131/// Provides access to graph-specific resources needed during query execution:
132/// - CSR adjacency cache for fast neighbor lookups
133/// - L0 buffers for MVCC visibility of uncommitted changes
134/// - Property manager for lazy-loading vertex/edge properties
135/// - Storage manager for schema and dataset access
136///
137/// # Example
138///
139/// ```ignore
140/// let ctx = GraphExecutionContext::new(
141///     storage_manager,
142///     l0_buffer,
143///     property_manager,
144/// );
145///
146/// // Get neighbors with L0 overlay
147/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
148/// ```
149pub struct GraphExecutionContext {
150    /// Storage manager for schema and dataset access.
151    storage: Arc<StorageManager>,
152
153    /// L0 visibility context for MVCC.
154    l0_context: L0Context,
155
156    /// Property manager for lazy property loading.
157    property_manager: Arc<PropertyManager>,
158
159    /// Query timeout deadline.
160    deadline: Option<Instant>,
161
162    /// Algorithm registry for `uni.algo.*` procedure dispatch.
163    algo_registry: Option<Arc<AlgorithmRegistry>>,
164
165    /// External procedure registry for test/user-defined procedures.
166    procedure_registry: Option<Arc<ProcedureRegistry>>,
167    /// Uni-Xervo runtime used by vector auto-embedding paths.
168    xervo_runtime: Option<Arc<ModelRuntime>>,
169
170    /// Runtime warnings collected during query execution.
171    warnings: Arc<Mutex<Vec<QueryWarning>>>,
172
173    /// Cooperative cancellation token, threaded from `QueryContext`.
174    cancellation_token: Option<tokio_util::sync::CancellationToken>,
175}
176
177impl std::fmt::Debug for GraphExecutionContext {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        f.debug_struct("GraphExecutionContext")
180            .field("l0_context", &self.l0_context)
181            .field("deadline", &self.deadline)
182            .finish_non_exhaustive()
183    }
184}
185
186/// L0 buffer visibility context for MVCC reads.
187///
188/// Maintains references to all L0 buffers that should be visible to a query:
189/// - Current L0: The active write buffer
190/// - Transaction L0: Buffer for the current transaction (if any)
191/// - Pending flush L0s: Buffers being flushed to disk (still visible to reads)
192///
193/// The visibility order is: pending flush L0s (oldest first) → current L0 → transaction L0.
194#[derive(Clone, Default)]
195pub struct L0Context {
196    /// Current active L0 buffer.
197    pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
198
199    /// Transaction-local L0 buffer (if in a transaction).
200    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
201
202    /// L0 buffers pending flush to disk.
203    /// These remain visible until flush completes.
204    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
205}
206
207impl std::fmt::Debug for L0Context {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        f.debug_struct("L0Context")
210            .field("current_l0", &self.current_l0.is_some())
211            .field("transaction_l0", &self.transaction_l0.is_some())
212            .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
213            .finish()
214    }
215}
216
217impl L0Context {
218    /// Create an empty L0 context with no buffers.
219    pub fn empty() -> Self {
220        Self::default()
221    }
222
223    /// Create L0 context with just a current buffer.
224    pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
225        Self {
226            current_l0: Some(l0),
227            ..Self::default()
228        }
229    }
230
231    /// Create L0 context from a query context.
232    pub fn from_query_context(ctx: &QueryContext) -> Self {
233        Self {
234            current_l0: Some(ctx.l0.clone()),
235            transaction_l0: ctx.transaction_l0.clone(),
236            pending_flush_l0s: ctx.pending_flush_l0s.clone(),
237        }
238    }
239
240    /// Iterate over all L0 buffers in visibility order.
241    /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
242    pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
243        self.pending_flush_l0s
244            .iter()
245            .chain(self.current_l0.iter())
246            .chain(self.transaction_l0.iter())
247    }
248}
249
250impl GraphExecutionContext {
251    /// Create a new graph execution context.
252    ///
253    /// # Arguments
254    ///
255    /// * `storage` - Storage manager for schema and dataset access
256    /// * `l0` - Current L0 buffer for MVCC visibility
257    /// * `property_manager` - Manager for lazy property loading
258    pub fn new(
259        storage: Arc<StorageManager>,
260        l0: Arc<RwLock<L0Buffer>>,
261        property_manager: Arc<PropertyManager>,
262    ) -> Self {
263        Self {
264            storage,
265            l0_context: L0Context::with_current(l0),
266            property_manager,
267            deadline: None,
268            algo_registry: None,
269            procedure_registry: None,
270            xervo_runtime: None,
271            warnings: Arc::new(Mutex::new(Vec::new())),
272            cancellation_token: None,
273        }
274    }
275
276    /// Create context with full L0 visibility.
277    ///
278    /// # Arguments
279    ///
280    /// * `storage` - Storage manager for schema and dataset access
281    /// * `l0_context` - L0 visibility context with all buffers
282    /// * `property_manager` - Manager for lazy property loading
283    pub fn with_l0_context(
284        storage: Arc<StorageManager>,
285        l0_context: L0Context,
286        property_manager: Arc<PropertyManager>,
287    ) -> Self {
288        Self {
289            storage,
290            l0_context,
291            property_manager,
292            deadline: None,
293            algo_registry: None,
294            procedure_registry: None,
295            xervo_runtime: None,
296            warnings: Arc::new(Mutex::new(Vec::new())),
297            cancellation_token: None,
298        }
299    }
300
301    /// Create context from a query context.
302    pub fn from_query_context(
303        storage: Arc<StorageManager>,
304        query_ctx: &QueryContext,
305        property_manager: Arc<PropertyManager>,
306    ) -> Self {
307        Self {
308            storage,
309            l0_context: L0Context::from_query_context(query_ctx),
310            property_manager,
311            deadline: query_ctx.deadline,
312            algo_registry: None,
313            procedure_registry: None,
314            xervo_runtime: None,
315            warnings: Arc::new(Mutex::new(Vec::new())),
316            cancellation_token: query_ctx.cancellation_token.clone(),
317        }
318    }
319
320    /// Set query timeout deadline.
321    pub fn with_deadline(mut self, deadline: Instant) -> Self {
322        self.deadline = Some(deadline);
323        self
324    }
325
326    /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
327    pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
328        self.algo_registry = Some(registry);
329        self
330    }
331
332    /// Get a reference to the algorithm registry, if set.
333    pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
334        self.algo_registry.as_ref()
335    }
336
337    /// Set the external procedure registry for test/user-defined procedures.
338    pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
339        self.procedure_registry = Some(registry);
340        self
341    }
342
343    /// Set Uni-Xervo runtime for query-time auto-embedding.
344    pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
345        self.xervo_runtime = Some(runtime);
346        self
347    }
348
349    /// Get a reference to the procedure registry, if set.
350    pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
351        self.procedure_registry.as_ref()
352    }
353
354    pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
355        self.xervo_runtime.as_ref()
356    }
357
358    /// Record a runtime warning.
359    pub fn push_warning(&self, warning: QueryWarning) {
360        if let Ok(mut w) = self.warnings.lock() {
361            w.push(warning);
362        }
363    }
364
365    /// Take all collected warnings, leaving the collector empty.
366    pub fn take_warnings(&self) -> Vec<QueryWarning> {
367        self.warnings
368            .lock()
369            .map(|mut w| std::mem::take(&mut *w))
370            .unwrap_or_default()
371    }
372
373    /// Check if the query has timed out.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if the deadline has passed.
378    pub fn check_timeout(&self) -> anyhow::Result<()> {
379        if let Some(ref token) = self.cancellation_token
380            && token.is_cancelled()
381        {
382            return Err(anyhow::anyhow!("Query cancelled"));
383        }
384        if let Some(deadline) = self.deadline
385            && Instant::now() > deadline
386        {
387            return Err(anyhow::anyhow!("Query timed out"));
388        }
389        Ok(())
390    }
391
392    /// Get a reference to the storage manager.
393    pub fn storage(&self) -> &Arc<StorageManager> {
394        &self.storage
395    }
396
397    /// Get a reference to the adjacency manager.
398    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
399        self.storage.adjacency_manager()
400    }
401
402    /// Get a reference to the property manager.
403    pub fn property_manager(&self) -> &Arc<PropertyManager> {
404        &self.property_manager
405    }
406
407    /// Get a reference to the L0 context.
408    pub fn l0_context(&self) -> &L0Context {
409        &self.l0_context
410    }
411
412    /// Create a query context for property manager calls.
413    ///
414    /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
415    pub fn query_context(&self) -> QueryContext {
416        let l0 = self
417            .l0_context
418            .current_l0
419            .clone()
420            .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
421
422        let mut ctx = QueryContext::new_with_pending(
423            l0,
424            self.l0_context.transaction_l0.clone(),
425            self.l0_context.pending_flush_l0s.clone(),
426        );
427        if let Some(deadline) = self.deadline {
428            ctx.set_deadline(deadline);
429        }
430        ctx
431    }
432
433    /// Ensure adjacency CSRs are warmed for the given edge types and direction.
434    ///
435    /// This loads any missing CSR data from storage into the adjacency manager
436    /// so that subsequent `get_neighbors` calls return complete results.
437    /// Skips warming if the adjacency manager already has data (Main CSR or
438    /// active overlay) for the edge type, avoiding duplicate entries.
439    pub async fn ensure_adjacency_warmed(
440        &self,
441        edge_type_ids: &[u32],
442        direction: Direction,
443    ) -> anyhow::Result<()> {
444        let am = self.adjacency_manager();
445        let version = self.storage.version_high_water_mark();
446        for &etype_id in edge_type_ids {
447            // Skip if AM already has data (CSR or overlay) for this edge type.
448            // The overlay contains edges from dual-write (Writer), so warming
449            // would duplicate them.
450            if !am.is_active_for(etype_id, direction) {
451                for &dir in direction.expand() {
452                    // Use coalesced warming to prevent cache stampede (Issue #13)
453                    self.storage
454                        .warm_adjacency_coalesced(etype_id, dir, version)
455                        .await?;
456                }
457            }
458        }
459        Ok(())
460    }
461
462    /// Create a boxed warming future for use in DataFusion stream state machines.
463    ///
464    /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
465    /// suitable for polling in stream `poll_next` implementations.
466    pub fn warming_future(
467        self: &Arc<Self>,
468        edge_type_ids: Vec<u32>,
469        direction: Direction,
470    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
471    {
472        let ctx = self.clone();
473        Box::pin(async move {
474            ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
475                .await
476                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
477        })
478    }
479
480    /// Get neighbors for a vertex, merging CSR and all L0 buffers.
481    ///
482    /// This implements the MVCC visibility rules:
483    /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
484    /// 2. Overlay pending flush L0s (oldest to newest)
485    /// 3. Overlay current L0
486    /// 4. Overlay transaction L0 (if present)
487    /// 5. Filter tombstones (handled by overlay)
488    ///
489    /// # Arguments
490    ///
491    /// * `vid` - Source vertex ID
492    /// * `edge_type` - Edge type ID to traverse
493    /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
494    ///
495    /// # Returns
496    ///
497    /// Vector of (neighbor VID, edge ID) pairs.
498    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
499        let am = self.adjacency_manager();
500        let version_hwm = self.storage.version_high_water_mark();
501
502        // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
503        // For snapshot queries, filter by version via StorageManager delegate.
504        let mut neighbors = if let Some(hwm) = version_hwm {
505            self.storage
506                .get_neighbors_at_version(vid, edge_type, direction, hwm)
507        } else {
508            am.get_neighbors(vid, edge_type, direction)
509        };
510
511        // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
512        if version_hwm.is_none()
513            && let Some(tx_l0) = &self.l0_context.transaction_l0
514        {
515            let tx_guard = tx_l0.read();
516            overlay_l0_neighbors(
517                vid,
518                edge_type,
519                direction,
520                &tx_guard,
521                &mut neighbors,
522                version_hwm,
523            );
524        }
525
526        neighbors
527    }
528
529    /// Get neighbors for multiple vertices in batch.
530    ///
531    /// More efficient than calling `get_neighbors` repeatedly as it amortizes
532    /// lock acquisition for L0 buffers.
533    ///
534    /// # Arguments
535    ///
536    /// * `vids` - Source vertex IDs
537    /// * `edge_type` - Edge type ID to traverse
538    /// * `direction` - Traversal direction
539    ///
540    /// # Returns
541    ///
542    /// Vector of (source VID, neighbor VID, edge ID) triples.
543    pub fn get_neighbors_batch(
544        &self,
545        vids: &[Vid],
546        edge_type: u32,
547        direction: Direction,
548    ) -> Vec<(Vid, Vid, Eid)> {
549        let am = self.adjacency_manager();
550        let version_hwm = self.storage.version_high_water_mark();
551
552        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
553
554        let mut results = Vec::new();
555
556        for &vid in vids {
557            let mut neighbors = if let Some(hwm) = version_hwm {
558                self.storage
559                    .get_neighbors_at_version(vid, edge_type, direction, hwm)
560            } else {
561                am.get_neighbors(vid, edge_type, direction)
562            };
563
564            // Overlay transaction L0 if present
565            if version_hwm.is_none()
566                && let Some(ref tx_guard) = tx_guard
567            {
568                overlay_l0_neighbors(
569                    vid,
570                    edge_type,
571                    direction,
572                    tx_guard,
573                    &mut neighbors,
574                    version_hwm,
575                );
576            }
577
578            results.extend(
579                neighbors
580                    .into_iter()
581                    .map(|(neighbor, eid)| (vid, neighbor, eid)),
582            );
583        }
584
585        results
586    }
587}
588
589/// Overlay L0 buffer neighbors onto existing neighbor list.
590///
591/// Adds new edges from L0 and removes tombstoned edges.
592/// Filters by version if a snapshot boundary is provided.
593fn overlay_l0_neighbors(
594    vid: Vid,
595    edge_type: u32,
596    direction: Direction,
597    l0: &L0Buffer,
598    neighbors: &mut Vec<(Vid, Eid)>,
599    version_hwm: Option<u64>,
600) {
601    use std::collections::HashMap;
602
603    // Convert to map for efficient updates
604    let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
605
606    // Query L0 for each direction
607    for &simple_dir in direction.to_simple_directions() {
608        for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
609            // Skip edges beyond snapshot boundary
610            if version_hwm.is_some_and(|hwm| version > hwm) {
611                continue;
612            }
613
614            // Apply insert or remove tombstone
615            if l0.is_tombstoned(eid) {
616                neighbor_map.remove(&eid);
617            } else {
618                neighbor_map.insert(eid, neighbor);
619            }
620        }
621    }
622
623    // Remove edges tombstoned in this L0 but originating from other layers
624    for eid in l0.tombstones.keys() {
625        neighbor_map.remove(eid);
626    }
627
628    // Convert back to vec
629    *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
630}
631
632/// Extension trait to convert storage Direction to SimpleGraph directions.
633trait DirectionExt {
634    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
635}
636
637impl DirectionExt for Direction {
638    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
639        use uni_common::graph::simple_graph::Direction as SimpleDirection;
640        match self {
641            Direction::Outgoing => &[SimpleDirection::Outgoing],
642            Direction::Incoming => &[SimpleDirection::Incoming],
643            Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
644        }
645    }
646}
647
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh, empty L0 buffer wrapped the way `L0Context` stores it.
    fn new_l0() -> Arc<RwLock<L0Buffer>> {
        Arc::new(RwLock::new(L0Buffer::new(0, None)))
    }

    #[test]
    fn test_l0_context_empty() {
        let ctx = L0Context::empty();
        assert!(ctx.current_l0.is_none());
        assert!(ctx.transaction_l0.is_none());
        assert!(ctx.pending_flush_l0s.is_empty());
        assert_eq!(ctx.iter_l0_buffers().count(), 0);
    }

    #[test]
    fn test_l0_context_with_current() {
        let l0 = new_l0();
        let ctx = L0Context::with_current(l0.clone());
        assert!(ctx
            .current_l0
            .as_ref()
            .is_some_and(|current| Arc::ptr_eq(current, &l0)));
        assert!(ctx.transaction_l0.is_none());
        assert!(ctx.pending_flush_l0s.is_empty());
        assert_eq!(ctx.iter_l0_buffers().count(), 1);
    }

    #[test]
    fn test_l0_context_iter_visibility_order() {
        let pending_a = new_l0();
        let pending_b = new_l0();
        let current = new_l0();
        let tx = new_l0();
        let ctx = L0Context {
            current_l0: Some(current.clone()),
            transaction_l0: Some(tx.clone()),
            pending_flush_l0s: vec![pending_a.clone(), pending_b.clone()],
        };

        // Documented order: pending flush (oldest first) -> current -> transaction.
        let expected = [&pending_a, &pending_b, &current, &tx];
        assert_eq!(ctx.iter_l0_buffers().count(), expected.len());
        for (got, want) in ctx.iter_l0_buffers().zip(expected) {
            assert!(Arc::ptr_eq(got, want));
        }
    }

    #[test]
    fn test_l0_context_debug_reports_presence_only() {
        let ctx = L0Context::with_current(new_l0());
        let rendered = format!("{ctx:?}");
        assert!(rendered.contains("current_l0: true"));
        assert!(rendered.contains("transaction_l0: false"));
        assert!(rendered.contains("pending_flush_l0s_count: 0"));
    }
}