Skip to main content

uni_query/query/df_graph/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Custom graph operators for DataFusion execution.
5//!
6//! This module provides DataFusion `ExecutionPlan` implementations for graph-specific
7//! operations that cannot be expressed in standard relational algebra:
8//!
9//! - [`GraphScanExec`]: Scans vertices/edges with property materialization
10//! - [`GraphExtIdLookupExec`]: Looks up a vertex by external ID
11//! - [`GraphTraverseExec`]: Single-hop edge traversal using CSR adjacency
12//! - `GraphVariableLengthTraverseExec`: Multi-hop BFS traversal
13//! - [`GraphShortestPathExec`]: Shortest path computation
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────────────────────────────────────┐
19//! │      DataFusion ExecutionPlan Tree      │
20//! ├─────────────────────────────────────────┤
21//! │  ProjectionExec (DataFusion)            │
22//! │       │                                 │
23//! │  FilterExec (DataFusion)                │
24//! │       │                                 │
25//! │  GraphTraverseExec (CUSTOM)             │
26//! │       │                                 │
27//! │  GraphScanExec (CUSTOM)                 │
28//! │       │                                 │
29//! │  UniTableProvider + UniMergeExec        │
30//! └─────────────────────────────────────────┘
31//! ```
32//!
33//! Graph operators use [`GraphExecutionContext`] to access:
34//! - AdjacencyManager for O(1) neighbor lookups
35//! - L0 buffers for uncommitted edge visibility
36//! - Property manager for lazy property loading
37
38pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod common;
43pub mod comprehension;
44pub mod expr_compiler;
45pub mod ext_id_lookup;
46pub mod locy_abduce;
47pub mod locy_assume;
48pub mod locy_ast_builder;
49pub(crate) mod locy_bdd;
50pub mod locy_best_by;
51pub mod locy_delta;
52pub mod locy_derive;
53pub mod locy_errors;
54pub mod locy_eval;
55pub mod locy_explain;
56pub mod locy_fixpoint;
57pub mod locy_fold;
58pub mod locy_priority;
59pub mod locy_program;
60pub mod locy_query;
61pub mod locy_slg;
62pub mod locy_traits;
63pub mod mutation_common;
64pub mod mutation_create;
65pub mod mutation_delete;
66pub mod mutation_foreach;
67pub mod mutation_merge;
68pub mod mutation_remove;
69pub mod mutation_set;
70pub mod nfa;
71pub mod optional_filter;
72pub mod pattern_comprehension;
73pub mod pred_dag;
74pub mod procedure_call;
75pub mod quantifier;
76pub mod recursive_cte;
77pub mod reduce;
78pub mod scan;
79pub mod shortest_path;
80pub(crate) mod similar_to_expr;
81pub mod traverse;
82pub mod unwind;
83pub mod vector_knn;
84
85use crate::query::executor::procedure::ProcedureRegistry;
86use parking_lot::RwLock;
87use std::sync::{Arc, Mutex};
88use std::time::Instant;
89use uni_algo::algo::AlgorithmRegistry;
90use uni_common::core::id::{Eid, Vid};
91use uni_store::runtime::context::QueryContext;
92use uni_store::runtime::l0::L0Buffer;
93use uni_store::runtime::property_manager::PropertyManager;
94use uni_store::storage::adjacency_manager::AdjacencyManager;
95use uni_store::storage::direction::Direction;
96use uni_store::storage::manager::StorageManager;
97use uni_xervo::runtime::ModelRuntime;
98
99use crate::types::QueryWarning;
100
101pub use apply::GraphApplyExec;
102pub use ext_id_lookup::GraphExtIdLookupExec;
103pub use mutation_common::{MutationContext, MutationExec};
104pub use mutation_create::MutationCreateExec;
105pub use mutation_delete::MutationDeleteExec;
106pub use mutation_foreach::ForeachExec;
107pub use mutation_merge::MutationMergeExec;
108pub use mutation_remove::MutationRemoveExec;
109pub use mutation_set::MutationSetExec;
110pub use optional_filter::OptionalFilterExec;
111pub use procedure_call::GraphProcedureCallExec;
112pub use scan::GraphScanExec;
113pub use shortest_path::GraphShortestPathExec;
114pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
115pub use unwind::GraphUnwindExec;
116pub use vector_knn::GraphVectorKnnExec;
117
118pub use locy_best_by::BestByExec;
119pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
120pub use locy_fixpoint::{
121    DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
122    FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
123};
124pub use locy_fold::FoldExec;
125pub use locy_priority::PriorityExec;
126pub use locy_program::{DerivedStore, LocyProgramExec};
127pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
128
129/// Shared context for graph operators.
130///
131/// Provides access to graph-specific resources needed during query execution:
132/// - CSR adjacency cache for fast neighbor lookups
133/// - L0 buffers for MVCC visibility of uncommitted changes
134/// - Property manager for lazy-loading vertex/edge properties
135/// - Storage manager for schema and dataset access
136///
137/// # Example
138///
139/// ```ignore
140/// let ctx = GraphExecutionContext::new(
141///     storage_manager,
142///     l0_buffer,
143///     property_manager,
144/// );
145///
146/// // Get neighbors with L0 overlay
147/// let neighbors = ctx.get_neighbors(vid, edge_type_id, Direction::Outgoing);
148/// ```
149pub struct GraphExecutionContext {
150    /// Storage manager for schema and dataset access.
151    storage: Arc<StorageManager>,
152
153    /// L0 visibility context for MVCC.
154    l0_context: L0Context,
155
156    /// Property manager for lazy property loading.
157    property_manager: Arc<PropertyManager>,
158
159    /// Query timeout deadline.
160    deadline: Option<Instant>,
161
162    /// Algorithm registry for `uni.algo.*` procedure dispatch.
163    algo_registry: Option<Arc<AlgorithmRegistry>>,
164
165    /// External procedure registry for test/user-defined procedures.
166    procedure_registry: Option<Arc<ProcedureRegistry>>,
167    /// Uni-Xervo runtime used by vector auto-embedding paths.
168    xervo_runtime: Option<Arc<ModelRuntime>>,
169
170    /// Runtime warnings collected during query execution.
171    warnings: Arc<Mutex<Vec<QueryWarning>>>,
172}
173
174impl std::fmt::Debug for GraphExecutionContext {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        f.debug_struct("GraphExecutionContext")
177            .field("l0_context", &self.l0_context)
178            .field("deadline", &self.deadline)
179            .finish_non_exhaustive()
180    }
181}
182
183/// L0 buffer visibility context for MVCC reads.
184///
185/// Maintains references to all L0 buffers that should be visible to a query:
186/// - Current L0: The active write buffer
187/// - Transaction L0: Buffer for the current transaction (if any)
188/// - Pending flush L0s: Buffers being flushed to disk (still visible to reads)
189///
190/// The visibility order is: pending flush L0s (oldest first) → current L0 → transaction L0.
191#[derive(Clone, Default)]
192pub struct L0Context {
193    /// Current active L0 buffer.
194    pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
195
196    /// Transaction-local L0 buffer (if in a transaction).
197    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
198
199    /// L0 buffers pending flush to disk.
200    /// These remain visible until flush completes.
201    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
202}
203
204impl std::fmt::Debug for L0Context {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206        f.debug_struct("L0Context")
207            .field("current_l0", &self.current_l0.is_some())
208            .field("transaction_l0", &self.transaction_l0.is_some())
209            .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
210            .finish()
211    }
212}
213
214impl L0Context {
215    /// Create an empty L0 context with no buffers.
216    pub fn empty() -> Self {
217        Self::default()
218    }
219
220    /// Create L0 context with just a current buffer.
221    pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
222        Self {
223            current_l0: Some(l0),
224            ..Self::default()
225        }
226    }
227
228    /// Create L0 context from a query context.
229    pub fn from_query_context(ctx: &QueryContext) -> Self {
230        Self {
231            current_l0: Some(ctx.l0.clone()),
232            transaction_l0: ctx.transaction_l0.clone(),
233            pending_flush_l0s: ctx.pending_flush_l0s.clone(),
234        }
235    }
236
237    /// Iterate over all L0 buffers in visibility order.
238    /// Order: pending flush L0s (oldest first), then current L0, then transaction L0.
239    pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
240        self.pending_flush_l0s
241            .iter()
242            .chain(self.current_l0.iter())
243            .chain(self.transaction_l0.iter())
244    }
245}
246
247impl GraphExecutionContext {
248    /// Create a new graph execution context.
249    ///
250    /// # Arguments
251    ///
252    /// * `storage` - Storage manager for schema and dataset access
253    /// * `l0` - Current L0 buffer for MVCC visibility
254    /// * `property_manager` - Manager for lazy property loading
255    pub fn new(
256        storage: Arc<StorageManager>,
257        l0: Arc<RwLock<L0Buffer>>,
258        property_manager: Arc<PropertyManager>,
259    ) -> Self {
260        Self {
261            storage,
262            l0_context: L0Context::with_current(l0),
263            property_manager,
264            deadline: None,
265            algo_registry: None,
266            procedure_registry: None,
267            xervo_runtime: None,
268            warnings: Arc::new(Mutex::new(Vec::new())),
269        }
270    }
271
272    /// Create context with full L0 visibility.
273    ///
274    /// # Arguments
275    ///
276    /// * `storage` - Storage manager for schema and dataset access
277    /// * `l0_context` - L0 visibility context with all buffers
278    /// * `property_manager` - Manager for lazy property loading
279    pub fn with_l0_context(
280        storage: Arc<StorageManager>,
281        l0_context: L0Context,
282        property_manager: Arc<PropertyManager>,
283    ) -> Self {
284        Self {
285            storage,
286            l0_context,
287            property_manager,
288            deadline: None,
289            algo_registry: None,
290            procedure_registry: None,
291            xervo_runtime: None,
292            warnings: Arc::new(Mutex::new(Vec::new())),
293        }
294    }
295
296    /// Create context from a query context.
297    pub fn from_query_context(
298        storage: Arc<StorageManager>,
299        query_ctx: &QueryContext,
300        property_manager: Arc<PropertyManager>,
301    ) -> Self {
302        Self {
303            storage,
304            l0_context: L0Context::from_query_context(query_ctx),
305            property_manager,
306            deadline: query_ctx.deadline,
307            algo_registry: None,
308            procedure_registry: None,
309            xervo_runtime: None,
310            warnings: Arc::new(Mutex::new(Vec::new())),
311        }
312    }
313
314    /// Set query timeout deadline.
315    pub fn with_deadline(mut self, deadline: Instant) -> Self {
316        self.deadline = Some(deadline);
317        self
318    }
319
320    /// Set the algorithm registry for `uni.algo.*` procedure dispatch.
321    pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
322        self.algo_registry = Some(registry);
323        self
324    }
325
326    /// Get a reference to the algorithm registry, if set.
327    pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
328        self.algo_registry.as_ref()
329    }
330
331    /// Set the external procedure registry for test/user-defined procedures.
332    pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
333        self.procedure_registry = Some(registry);
334        self
335    }
336
337    /// Set Uni-Xervo runtime for query-time auto-embedding.
338    pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
339        self.xervo_runtime = Some(runtime);
340        self
341    }
342
343    /// Get a reference to the procedure registry, if set.
344    pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
345        self.procedure_registry.as_ref()
346    }
347
348    pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
349        self.xervo_runtime.as_ref()
350    }
351
352    /// Record a runtime warning.
353    pub fn push_warning(&self, warning: QueryWarning) {
354        if let Ok(mut w) = self.warnings.lock() {
355            w.push(warning);
356        }
357    }
358
359    /// Take all collected warnings, leaving the collector empty.
360    pub fn take_warnings(&self) -> Vec<QueryWarning> {
361        self.warnings
362            .lock()
363            .map(|mut w| std::mem::take(&mut *w))
364            .unwrap_or_default()
365    }
366
367    /// Check if the query has timed out.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if the deadline has passed.
372    pub fn check_timeout(&self) -> anyhow::Result<()> {
373        if let Some(deadline) = self.deadline
374            && Instant::now() > deadline
375        {
376            return Err(anyhow::anyhow!("Query timed out"));
377        }
378        Ok(())
379    }
380
381    /// Get a reference to the storage manager.
382    pub fn storage(&self) -> &Arc<StorageManager> {
383        &self.storage
384    }
385
386    /// Get a reference to the adjacency manager.
387    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
388        self.storage.adjacency_manager()
389    }
390
391    /// Get a reference to the property manager.
392    pub fn property_manager(&self) -> &Arc<PropertyManager> {
393        &self.property_manager
394    }
395
396    /// Get a reference to the L0 context.
397    pub fn l0_context(&self) -> &L0Context {
398        &self.l0_context
399    }
400
401    /// Create a query context for property manager calls.
402    ///
403    /// If there is no current L0 buffer (e.g., for snapshot queries), creates an empty one.
404    pub fn query_context(&self) -> QueryContext {
405        let l0 = self
406            .l0_context
407            .current_l0
408            .clone()
409            .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
410
411        let mut ctx = QueryContext::new_with_pending(
412            l0,
413            self.l0_context.transaction_l0.clone(),
414            self.l0_context.pending_flush_l0s.clone(),
415        );
416        if let Some(deadline) = self.deadline {
417            ctx.set_deadline(deadline);
418        }
419        ctx
420    }
421
422    /// Ensure adjacency CSRs are warmed for the given edge types and direction.
423    ///
424    /// This loads any missing CSR data from storage into the adjacency manager
425    /// so that subsequent `get_neighbors` calls return complete results.
426    /// Skips warming if the adjacency manager already has data (Main CSR or
427    /// active overlay) for the edge type, avoiding duplicate entries.
428    pub async fn ensure_adjacency_warmed(
429        &self,
430        edge_type_ids: &[u32],
431        direction: Direction,
432    ) -> anyhow::Result<()> {
433        let am = self.adjacency_manager();
434        let version = self.storage.version_high_water_mark();
435        for &etype_id in edge_type_ids {
436            // Skip if AM already has data (CSR or overlay) for this edge type.
437            // The overlay contains edges from dual-write (Writer), so warming
438            // would duplicate them.
439            if !am.is_active_for(etype_id, direction) {
440                for &dir in direction.expand() {
441                    // Use coalesced warming to prevent cache stampede (Issue #13)
442                    self.storage
443                        .warm_adjacency_coalesced(etype_id, dir, version)
444                        .await?;
445                }
446            }
447        }
448        Ok(())
449    }
450
451    /// Create a boxed warming future for use in DataFusion stream state machines.
452    ///
453    /// Wraps `ensure_adjacency_warmed` into a `Pin<Box<dyn Future<Output = DFResult<()>> + Send>>`
454    /// suitable for polling in stream `poll_next` implementations.
455    pub fn warming_future(
456        self: &Arc<Self>,
457        edge_type_ids: Vec<u32>,
458        direction: Direction,
459    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
460    {
461        let ctx = self.clone();
462        Box::pin(async move {
463            ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
464                .await
465                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
466        })
467    }
468
469    /// Get neighbors for a vertex, merging CSR and all L0 buffers.
470    ///
471    /// This implements the MVCC visibility rules:
472    /// 1. Load from CSR (L2 + L1 merged, auto-warms on cache miss)
473    /// 2. Overlay pending flush L0s (oldest to newest)
474    /// 3. Overlay current L0
475    /// 4. Overlay transaction L0 (if present)
476    /// 5. Filter tombstones (handled by overlay)
477    ///
478    /// # Arguments
479    ///
480    /// * `vid` - Source vertex ID
481    /// * `edge_type` - Edge type ID to traverse
482    /// * `direction` - Traversal direction (Outgoing, Incoming, or Both)
483    ///
484    /// # Returns
485    ///
486    /// Vector of (neighbor VID, edge ID) pairs.
487    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
488        let am = self.adjacency_manager();
489        let version_hwm = self.storage.version_high_water_mark();
490
491        // Use AdjacencyManager which reads Main CSR + overlay (dual-write).
492        // For snapshot queries, filter by version via StorageManager delegate.
493        let mut neighbors = if let Some(hwm) = version_hwm {
494            self.storage
495                .get_neighbors_at_version(vid, edge_type, direction, hwm)
496        } else {
497            am.get_neighbors(vid, edge_type, direction)
498        };
499
500        // Overlay transaction L0 if present (transaction edges bypass Writer/AM).
501        if version_hwm.is_none()
502            && let Some(tx_l0) = &self.l0_context.transaction_l0
503        {
504            let tx_guard = tx_l0.read();
505            overlay_l0_neighbors(
506                vid,
507                edge_type,
508                direction,
509                &tx_guard,
510                &mut neighbors,
511                version_hwm,
512            );
513        }
514
515        neighbors
516    }
517
518    /// Get neighbors for multiple vertices in batch.
519    ///
520    /// More efficient than calling `get_neighbors` repeatedly as it amortizes
521    /// lock acquisition for L0 buffers.
522    ///
523    /// # Arguments
524    ///
525    /// * `vids` - Source vertex IDs
526    /// * `edge_type` - Edge type ID to traverse
527    /// * `direction` - Traversal direction
528    ///
529    /// # Returns
530    ///
531    /// Vector of (source VID, neighbor VID, edge ID) triples.
532    pub fn get_neighbors_batch(
533        &self,
534        vids: &[Vid],
535        edge_type: u32,
536        direction: Direction,
537    ) -> Vec<(Vid, Vid, Eid)> {
538        let am = self.adjacency_manager();
539        let version_hwm = self.storage.version_high_water_mark();
540
541        let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
542
543        let mut results = Vec::new();
544
545        for &vid in vids {
546            let mut neighbors = if let Some(hwm) = version_hwm {
547                self.storage
548                    .get_neighbors_at_version(vid, edge_type, direction, hwm)
549            } else {
550                am.get_neighbors(vid, edge_type, direction)
551            };
552
553            // Overlay transaction L0 if present
554            if version_hwm.is_none()
555                && let Some(ref tx_guard) = tx_guard
556            {
557                overlay_l0_neighbors(
558                    vid,
559                    edge_type,
560                    direction,
561                    tx_guard,
562                    &mut neighbors,
563                    version_hwm,
564                );
565            }
566
567            results.extend(
568                neighbors
569                    .into_iter()
570                    .map(|(neighbor, eid)| (vid, neighbor, eid)),
571            );
572        }
573
574        results
575    }
576}
577
578/// Overlay L0 buffer neighbors onto existing neighbor list.
579///
580/// Adds new edges from L0 and removes tombstoned edges.
581/// Filters by version if a snapshot boundary is provided.
582fn overlay_l0_neighbors(
583    vid: Vid,
584    edge_type: u32,
585    direction: Direction,
586    l0: &L0Buffer,
587    neighbors: &mut Vec<(Vid, Eid)>,
588    version_hwm: Option<u64>,
589) {
590    use std::collections::HashMap;
591
592    // Convert to map for efficient updates
593    let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
594
595    // Query L0 for each direction
596    for &simple_dir in direction.to_simple_directions() {
597        for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
598            // Skip edges beyond snapshot boundary
599            if version_hwm.is_some_and(|hwm| version > hwm) {
600                continue;
601            }
602
603            // Apply insert or remove tombstone
604            if l0.is_tombstoned(eid) {
605                neighbor_map.remove(&eid);
606            } else {
607                neighbor_map.insert(eid, neighbor);
608            }
609        }
610    }
611
612    // Remove edges tombstoned in this L0 but originating from other layers
613    for eid in l0.tombstones.keys() {
614        neighbor_map.remove(eid);
615    }
616
617    // Convert back to vec
618    *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
619}
620
621/// Extension trait to convert storage Direction to SimpleGraph directions.
622trait DirectionExt {
623    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
624}
625
626impl DirectionExt for Direction {
627    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
628        use uni_common::graph::simple_graph::Direction as SimpleDirection;
629        match self {
630            Direction::Outgoing => &[SimpleDirection::Outgoing],
631            Direction::Incoming => &[SimpleDirection::Incoming],
632            Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
633        }
634    }
635}
636
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, empty L0 buffer wrapped the same way the context stores it.
    fn new_l0() -> Arc<RwLock<L0Buffer>> {
        Arc::new(RwLock::new(L0Buffer::new(0, None)))
    }

    #[test]
    fn test_l0_context_empty() {
        let ctx = L0Context::empty();
        assert!(ctx.current_l0.is_none());
        assert!(ctx.transaction_l0.is_none());
        assert!(ctx.pending_flush_l0s.is_empty());
        assert_eq!(ctx.iter_l0_buffers().count(), 0);
    }

    #[test]
    fn test_l0_context_with_current() {
        let l0 = new_l0();
        let ctx = L0Context::with_current(l0.clone());
        assert!(
            ctx.current_l0
                .as_ref()
                .is_some_and(|current| Arc::ptr_eq(current, &l0))
        );
        assert!(ctx.transaction_l0.is_none());
        assert!(ctx.pending_flush_l0s.is_empty());
        assert_eq!(ctx.iter_l0_buffers().count(), 1);
    }

    #[test]
    fn test_iter_l0_buffers_visibility_order() {
        // Documented order: pending flush L0s (oldest first) -> current -> transaction.
        let pending_old = new_l0();
        let pending_new = new_l0();
        let current = new_l0();
        let tx = new_l0();

        let ctx = L0Context {
            current_l0: Some(current.clone()),
            transaction_l0: Some(tx.clone()),
            pending_flush_l0s: vec![pending_old.clone(), pending_new.clone()],
        };

        let visited: Vec<_> = ctx.iter_l0_buffers().collect();
        let expected = [&pending_old, &pending_new, &current, &tx];
        assert_eq!(visited.len(), expected.len());
        for (got, want) in visited.into_iter().zip(expected) {
            assert!(Arc::ptr_eq(got, want));
        }
    }
}