1pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod catalog_scan;
43pub mod common;
44pub mod comprehension;
45pub mod expr_compiler;
46pub mod ext_id_lookup;
47pub mod locy_abduce;
48pub mod locy_assume;
49pub mod locy_ast_builder;
50pub(crate) mod locy_bdd;
51pub mod locy_best_by;
52pub mod locy_calibrate;
53pub mod locy_delta;
54pub mod locy_derive;
55pub mod locy_errors;
56pub mod locy_eval;
57pub mod locy_explain;
58pub mod locy_fixpoint;
59pub mod locy_fold;
60pub mod locy_model_invoke;
61pub mod locy_priority;
62pub mod locy_program;
63pub mod locy_query;
64pub mod locy_slg;
65pub mod locy_traits;
66pub mod locy_validate;
67pub mod mutation_common;
68pub mod mutation_delete;
69pub mod mutation_foreach;
70pub mod mutation_remove;
71pub mod mutation_set;
72pub mod nfa;
73pub mod optional_filter;
74pub mod pattern_comprehension;
75pub mod pattern_exists;
76pub mod pred_dag;
77pub mod procedure_call;
78pub mod quantifier;
79mod read_set_exec;
80pub mod recursive_cte;
81pub mod reduce;
82pub mod scan;
83pub mod shortest_path;
84pub(crate) mod similar_to_expr;
85pub mod traverse;
86pub mod unwind;
87pub mod vector_knn;
88pub mod vid_lookup_join;
89
90use crate::query::executor::procedure::ProcedureRegistry;
91use parking_lot::RwLock;
92use std::sync::{Arc, Mutex};
93use std::time::Instant;
94use uni_algo::algo::AlgorithmRegistry;
95use uni_common::core::id::{Eid, Vid};
96use uni_store::runtime::context::QueryContext;
97use uni_store::runtime::l0::L0Buffer;
98use uni_store::runtime::property_manager::PropertyManager;
99use uni_store::storage::adjacency_manager::AdjacencyManager;
100use uni_store::storage::direction::Direction;
101use uni_store::storage::manager::StorageManager;
102
103pub mod search_procedures;
104use uni_xervo::runtime::ModelRuntime;
105
106use crate::types::QueryWarning;
107
108pub use apply::GraphApplyExec;
109pub use ext_id_lookup::GraphExtIdLookupExec;
110pub use mutation_common::{
114 MutationContext, MutationExec, MutationExec as MutationCreateExec,
115 MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
116};
117pub use mutation_delete::MutationDeleteExec;
118pub use mutation_foreach::ForeachExec;
119pub use mutation_remove::MutationRemoveExec;
120pub use mutation_set::MutationSetExec;
121pub use optional_filter::OptionalFilterExec;
122pub use procedure_call::GraphProcedureCallExec;
123pub use read_set_exec::ReadSetRecordingExec;
124pub use scan::GraphScanExec;
125pub use shortest_path::GraphShortestPathExec;
126pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
127pub use unwind::GraphUnwindExec;
128pub use vector_knn::GraphVectorKnnExec;
129
130pub use locy_best_by::BestByExec;
131pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
132pub use locy_fixpoint::{
133 DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
134 FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
135};
136pub use locy_fold::FoldExec;
137pub use locy_priority::PriorityExec;
138pub use locy_program::{DerivedStore, LocyProgramExec};
139pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
140
141pub struct GraphExecutionContext {
162 storage: Arc<StorageManager>,
164
165 l0_context: L0Context,
167
168 property_manager: Arc<PropertyManager>,
170
171 deadline: Option<Instant>,
173
174 algo_registry: Option<Arc<AlgorithmRegistry>>,
176
177 procedure_registry: Option<Arc<ProcedureRegistry>>,
179 plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
183 xervo_runtime: Option<Arc<ModelRuntime>>,
185
186 warnings: Arc<Mutex<Vec<QueryWarning>>>,
188
189 cancellation_token: Option<tokio_util::sync::CancellationToken>,
191
192 writer: Option<Arc<uni_store::Writer>>,
200}
201
202impl std::fmt::Debug for GraphExecutionContext {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("GraphExecutionContext")
205 .field("l0_context", &self.l0_context)
206 .field("deadline", &self.deadline)
207 .finish_non_exhaustive()
208 }
209}
210
211#[derive(Clone, Default)]
220pub struct L0Context {
221 pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
223
224 pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
226
227 pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
230}
231
232impl std::fmt::Debug for L0Context {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct("L0Context")
235 .field("current_l0", &self.current_l0.is_some())
236 .field("transaction_l0", &self.transaction_l0.is_some())
237 .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
238 .finish()
239 }
240}
241
242impl L0Context {
243 pub fn empty() -> Self {
245 Self::default()
246 }
247
248 pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
250 Self {
251 current_l0: Some(l0),
252 ..Self::default()
253 }
254 }
255
256 pub fn from_query_context(ctx: &QueryContext) -> Self {
258 Self {
259 current_l0: Some(ctx.l0.clone()),
260 transaction_l0: ctx.transaction_l0.clone(),
261 pending_flush_l0s: ctx.pending_flush_l0s.clone(),
262 }
263 }
264
265 pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
268 self.pending_flush_l0s
269 .iter()
270 .chain(self.current_l0.iter())
271 .chain(self.transaction_l0.iter())
272 }
273}
274
275impl GraphExecutionContext {
276 fn with_parts(
280 storage: Arc<StorageManager>,
281 l0_context: L0Context,
282 property_manager: Arc<PropertyManager>,
283 deadline: Option<Instant>,
284 cancellation_token: Option<tokio_util::sync::CancellationToken>,
285 ) -> Self {
286 Self {
287 storage,
288 l0_context,
289 property_manager,
290 deadline,
291 algo_registry: None,
292 procedure_registry: None,
293 plugin_registry: None,
294 xervo_runtime: None,
295 warnings: Arc::new(Mutex::new(Vec::new())),
296 cancellation_token,
297 writer: None,
298 }
299 }
300
301 pub fn new(
309 storage: Arc<StorageManager>,
310 l0: Arc<RwLock<L0Buffer>>,
311 property_manager: Arc<PropertyManager>,
312 ) -> Self {
313 Self::with_parts(
314 storage,
315 L0Context::with_current(l0),
316 property_manager,
317 None,
318 None,
319 )
320 }
321
322 pub fn with_l0_context(
330 storage: Arc<StorageManager>,
331 l0_context: L0Context,
332 property_manager: Arc<PropertyManager>,
333 ) -> Self {
334 Self::with_parts(storage, l0_context, property_manager, None, None)
335 }
336
337 pub fn from_query_context(
339 storage: Arc<StorageManager>,
340 query_ctx: &QueryContext,
341 property_manager: Arc<PropertyManager>,
342 ) -> Self {
343 Self::with_parts(
344 storage,
345 L0Context::from_query_context(query_ctx),
346 property_manager,
347 query_ctx.deadline,
348 query_ctx.cancellation_token.clone(),
349 )
350 }
351
352 pub fn with_deadline(mut self, deadline: Instant) -> Self {
354 self.deadline = Some(deadline);
355 self
356 }
357
358 #[must_use]
362 pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
363 self.writer = Some(writer);
364 self
365 }
366
367 #[must_use]
369 pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
370 self.writer.as_ref()
371 }
372
373 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
375 self.algo_registry = Some(registry);
376 self
377 }
378
379 pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
381 self.algo_registry.as_ref()
382 }
383
384 pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
386 self.procedure_registry = Some(registry);
387 self
388 }
389
390 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
392 self.xervo_runtime = Some(runtime);
393 self
394 }
395
396 pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
398 self.procedure_registry.as_ref()
399 }
400
401 pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
404 self.plugin_registry = Some(registry);
405 self
406 }
407
408 pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
410 self.plugin_registry.as_ref()
411 }
412
413 pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
414 self.xervo_runtime.as_ref()
415 }
416
417 pub fn push_warning(&self, warning: QueryWarning) {
419 if let Ok(mut w) = self.warnings.lock() {
420 w.push(warning);
421 }
422 }
423
424 pub fn take_warnings(&self) -> Vec<QueryWarning> {
426 self.warnings
427 .lock()
428 .map(|mut w| std::mem::take(&mut *w))
429 .unwrap_or_default()
430 }
431
432 pub fn check_timeout(&self) -> anyhow::Result<()> {
438 if let Some(ref token) = self.cancellation_token
439 && token.is_cancelled()
440 {
441 return Err(anyhow::anyhow!("Query cancelled"));
442 }
443 if let Some(deadline) = self.deadline
444 && Instant::now() > deadline
445 {
446 return Err(anyhow::anyhow!("Query timed out"));
447 }
448 Ok(())
449 }
450
451 pub fn storage(&self) -> &Arc<StorageManager> {
453 &self.storage
454 }
455
456 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
458 self.storage.adjacency_manager()
459 }
460
461 pub fn property_manager(&self) -> &Arc<PropertyManager> {
463 &self.property_manager
464 }
465
466 pub fn l0_context(&self) -> &L0Context {
468 &self.l0_context
469 }
470
471 #[must_use]
477 pub fn deadline_for_host(&self) -> Option<Instant> {
478 self.deadline
479 }
480
481 #[must_use]
486 pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
487 self.cancellation_token.clone()
488 }
489
490 pub fn query_context(&self) -> QueryContext {
494 let l0 = self
495 .l0_context
496 .current_l0
497 .clone()
498 .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
499
500 let mut ctx = QueryContext::new_with_pending(
501 l0,
502 self.l0_context.transaction_l0.clone(),
503 self.l0_context.pending_flush_l0s.clone(),
504 );
505 if let Some(deadline) = self.deadline {
506 ctx.set_deadline(deadline);
507 }
508 ctx
509 }
510
511 pub async fn ensure_adjacency_warmed(
518 &self,
519 edge_type_ids: &[u32],
520 direction: Direction,
521 ) -> anyhow::Result<()> {
522 let am = self.adjacency_manager();
523 let version = self.storage.version_high_water_mark();
524 for &etype_id in edge_type_ids {
525 if !am.is_active_for(etype_id, direction) {
529 for &dir in direction.expand() {
530 self.storage
532 .warm_adjacency_coalesced(etype_id, dir, version)
533 .await?;
534 }
535 }
536 }
537 Ok(())
538 }
539
540 pub fn warming_future(
545 self: &Arc<Self>,
546 edge_type_ids: Vec<u32>,
547 direction: Direction,
548 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
549 {
550 let ctx = self.clone();
551 Box::pin(async move {
552 ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
553 .await
554 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
555 })
556 }
557
558 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
577 let version_hwm = self.storage.version_high_water_mark();
578 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
581 self.neighbors_for_vid(vid, edge_type, direction, version_hwm, tx_guard.as_deref())
582 }
583
584 pub fn get_neighbors_batch(
599 &self,
600 vids: &[Vid],
601 edge_type: u32,
602 direction: Direction,
603 ) -> Vec<(Vid, Vid, Eid)> {
604 let version_hwm = self.storage.version_high_water_mark();
605 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
606
607 let mut results = Vec::new();
608 for &vid in vids {
609 let neighbors =
610 self.neighbors_for_vid(vid, edge_type, direction, version_hwm, tx_guard.as_deref());
611 results.extend(
612 neighbors
613 .into_iter()
614 .map(|(neighbor, eid)| (vid, neighbor, eid)),
615 );
616 }
617 results
618 }
619
620 fn neighbors_for_vid(
627 &self,
628 vid: Vid,
629 edge_type: u32,
630 direction: Direction,
631 version_hwm: Option<u64>,
632 tx_guard: Option<&L0Buffer>,
633 ) -> Vec<(Vid, Eid)> {
634 let mut neighbors = if let Some(hwm) = version_hwm {
637 self.storage
638 .get_neighbors_at_version(vid, edge_type, direction, hwm)
639 } else {
640 self.adjacency_manager()
641 .get_neighbors(vid, edge_type, direction)
642 };
643
644 if version_hwm.is_none()
646 && let Some(tx_guard) = tx_guard
647 {
648 overlay_l0_neighbors(
649 vid,
650 edge_type,
651 direction,
652 tx_guard,
653 &mut neighbors,
654 version_hwm,
655 );
656 }
657
658 self.record_neighbor_reads(vid, &neighbors);
659
660 neighbors
661 }
662
663 fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
670 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
671 return;
672 };
673 let guard = tx_l0.read();
674 let Some(read_set) = &guard.occ_read_set else {
675 return;
676 };
677 let mut rs = read_set.lock();
678 rs.vertices.insert(src);
679 for (nbr, eid) in neighbors {
680 rs.vertices.insert(*nbr);
681 rs.edges.insert(*eid);
682 }
683 }
684
685 pub(crate) fn record_batch_ids(
693 &self,
694 batch: &arrow_array::RecordBatch,
695 vertex_cols: &[usize],
696 edge_cols: &[usize],
697 ) {
698 use arrow_array::{Array, UInt64Array};
699
700 if vertex_cols.is_empty() && edge_cols.is_empty() {
701 return;
702 }
703 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
704 return;
705 };
706 let guard = tx_l0.read();
707 let Some(read_set) = &guard.occ_read_set else {
708 return;
709 };
710 let mut rs = read_set.lock();
711 for &col in vertex_cols {
712 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
713 for i in 0..arr.len() {
714 if !arr.is_null(i) {
715 rs.vertices.insert(Vid::from(arr.value(i)));
716 }
717 }
718 }
719 }
720 for &col in edge_cols {
721 if let Some(arr) = batch.column(col).as_any().downcast_ref::<UInt64Array>() {
722 for i in 0..arr.len() {
723 if !arr.is_null(i) {
724 rs.edges.insert(Eid::from(arr.value(i)));
725 }
726 }
727 }
728 }
729 }
730}
731
732fn overlay_l0_neighbors(
737 vid: Vid,
738 edge_type: u32,
739 direction: Direction,
740 l0: &L0Buffer,
741 neighbors: &mut Vec<(Vid, Eid)>,
742 version_hwm: Option<u64>,
743) {
744 use std::collections::HashMap;
745
746 let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
748
749 for &simple_dir in direction.to_simple_directions() {
751 for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
752 if version_hwm.is_some_and(|hwm| version > hwm) {
754 continue;
755 }
756
757 if l0.is_tombstoned(eid) {
759 neighbor_map.remove(&eid);
760 } else {
761 neighbor_map.insert(eid, neighbor);
762 }
763 }
764 }
765
766 for eid in l0.tombstones.keys() {
768 neighbor_map.remove(eid);
769 }
770
771 *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
773}
774
775trait DirectionExt {
777 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
778}
779
780impl DirectionExt for Direction {
781 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
782 use uni_common::graph::simple_graph::Direction as SimpleDirection;
783 match self {
784 Direction::Outgoing => &[SimpleDirection::Outgoing],
785 Direction::Incoming => &[SimpleDirection::Incoming],
786 Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
787 }
788 }
789}
790
791#[cfg(test)]
792mod tests {
793 use super::*;
794
795 #[test]
796 fn test_l0_context_empty() {
797 let ctx = L0Context::empty();
798 assert!(ctx.current_l0.is_none());
799 assert!(ctx.transaction_l0.is_none());
800 assert!(ctx.pending_flush_l0s.is_empty());
801 }
802}