uni_query/query/df_graph/
mod.rs1pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod common;
43pub mod comprehension;
44pub mod expr_compiler;
45pub mod ext_id_lookup;
46pub mod locy_abduce;
47pub mod locy_assume;
48pub mod locy_ast_builder;
49pub(crate) mod locy_bdd;
50pub mod locy_best_by;
51pub mod locy_delta;
52pub mod locy_derive;
53pub mod locy_errors;
54pub mod locy_eval;
55pub mod locy_explain;
56pub mod locy_fixpoint;
57pub mod locy_fold;
58pub mod locy_priority;
59pub mod locy_program;
60pub mod locy_query;
61pub mod locy_slg;
62pub mod locy_traits;
63pub mod mutation_common;
64pub mod mutation_create;
65pub mod mutation_delete;
66pub mod mutation_foreach;
67pub mod mutation_merge;
68pub mod mutation_remove;
69pub mod mutation_set;
70pub mod nfa;
71pub mod optional_filter;
72pub mod pattern_comprehension;
73pub mod pred_dag;
74pub mod procedure_call;
75pub mod quantifier;
76pub mod recursive_cte;
77pub mod reduce;
78pub mod scan;
79pub mod shortest_path;
80pub(crate) mod similar_to_expr;
81pub mod traverse;
82pub mod unwind;
83pub mod vector_knn;
84
85use crate::query::executor::procedure::ProcedureRegistry;
86use parking_lot::RwLock;
87use std::sync::{Arc, Mutex};
88use std::time::Instant;
89use uni_algo::algo::AlgorithmRegistry;
90use uni_common::core::id::{Eid, Vid};
91use uni_store::runtime::context::QueryContext;
92use uni_store::runtime::l0::L0Buffer;
93use uni_store::runtime::property_manager::PropertyManager;
94use uni_store::storage::adjacency_manager::AdjacencyManager;
95use uni_store::storage::direction::Direction;
96use uni_store::storage::manager::StorageManager;
97use uni_xervo::runtime::ModelRuntime;
98
99use crate::types::QueryWarning;
100
101pub use apply::GraphApplyExec;
102pub use ext_id_lookup::GraphExtIdLookupExec;
103pub use mutation_common::{MutationContext, MutationExec};
104pub use mutation_create::MutationCreateExec;
105pub use mutation_delete::MutationDeleteExec;
106pub use mutation_foreach::ForeachExec;
107pub use mutation_merge::MutationMergeExec;
108pub use mutation_remove::MutationRemoveExec;
109pub use mutation_set::MutationSetExec;
110pub use optional_filter::OptionalFilterExec;
111pub use procedure_call::GraphProcedureCallExec;
112pub use scan::GraphScanExec;
113pub use shortest_path::GraphShortestPathExec;
114pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
115pub use unwind::GraphUnwindExec;
116pub use vector_knn::GraphVectorKnnExec;
117
118pub use locy_best_by::BestByExec;
119pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
120pub use locy_fixpoint::{
121 DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
122 FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
123};
124pub use locy_fold::FoldExec;
125pub use locy_priority::PriorityExec;
126pub use locy_program::{DerivedStore, LocyProgramExec};
127pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
128
129pub struct GraphExecutionContext {
150 storage: Arc<StorageManager>,
152
153 l0_context: L0Context,
155
156 property_manager: Arc<PropertyManager>,
158
159 deadline: Option<Instant>,
161
162 algo_registry: Option<Arc<AlgorithmRegistry>>,
164
165 procedure_registry: Option<Arc<ProcedureRegistry>>,
167 xervo_runtime: Option<Arc<ModelRuntime>>,
169
170 warnings: Arc<Mutex<Vec<QueryWarning>>>,
172}
173
174impl std::fmt::Debug for GraphExecutionContext {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 f.debug_struct("GraphExecutionContext")
177 .field("l0_context", &self.l0_context)
178 .field("deadline", &self.deadline)
179 .finish_non_exhaustive()
180 }
181}
182
183#[derive(Clone, Default)]
192pub struct L0Context {
193 pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
195
196 pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
198
199 pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
202}
203
204impl std::fmt::Debug for L0Context {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 f.debug_struct("L0Context")
207 .field("current_l0", &self.current_l0.is_some())
208 .field("transaction_l0", &self.transaction_l0.is_some())
209 .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
210 .finish()
211 }
212}
213
214impl L0Context {
215 pub fn empty() -> Self {
217 Self::default()
218 }
219
220 pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
222 Self {
223 current_l0: Some(l0),
224 ..Self::default()
225 }
226 }
227
228 pub fn from_query_context(ctx: &QueryContext) -> Self {
230 Self {
231 current_l0: Some(ctx.l0.clone()),
232 transaction_l0: ctx.transaction_l0.clone(),
233 pending_flush_l0s: ctx.pending_flush_l0s.clone(),
234 }
235 }
236
237 pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
240 self.pending_flush_l0s
241 .iter()
242 .chain(self.current_l0.iter())
243 .chain(self.transaction_l0.iter())
244 }
245}
246
247impl GraphExecutionContext {
248 pub fn new(
256 storage: Arc<StorageManager>,
257 l0: Arc<RwLock<L0Buffer>>,
258 property_manager: Arc<PropertyManager>,
259 ) -> Self {
260 Self {
261 storage,
262 l0_context: L0Context::with_current(l0),
263 property_manager,
264 deadline: None,
265 algo_registry: None,
266 procedure_registry: None,
267 xervo_runtime: None,
268 warnings: Arc::new(Mutex::new(Vec::new())),
269 }
270 }
271
272 pub fn with_l0_context(
280 storage: Arc<StorageManager>,
281 l0_context: L0Context,
282 property_manager: Arc<PropertyManager>,
283 ) -> Self {
284 Self {
285 storage,
286 l0_context,
287 property_manager,
288 deadline: None,
289 algo_registry: None,
290 procedure_registry: None,
291 xervo_runtime: None,
292 warnings: Arc::new(Mutex::new(Vec::new())),
293 }
294 }
295
296 pub fn from_query_context(
298 storage: Arc<StorageManager>,
299 query_ctx: &QueryContext,
300 property_manager: Arc<PropertyManager>,
301 ) -> Self {
302 Self {
303 storage,
304 l0_context: L0Context::from_query_context(query_ctx),
305 property_manager,
306 deadline: query_ctx.deadline,
307 algo_registry: None,
308 procedure_registry: None,
309 xervo_runtime: None,
310 warnings: Arc::new(Mutex::new(Vec::new())),
311 }
312 }
313
314 pub fn with_deadline(mut self, deadline: Instant) -> Self {
316 self.deadline = Some(deadline);
317 self
318 }
319
320 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
322 self.algo_registry = Some(registry);
323 self
324 }
325
326 pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
328 self.algo_registry.as_ref()
329 }
330
331 pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
333 self.procedure_registry = Some(registry);
334 self
335 }
336
337 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
339 self.xervo_runtime = Some(runtime);
340 self
341 }
342
343 pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
345 self.procedure_registry.as_ref()
346 }
347
348 pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
349 self.xervo_runtime.as_ref()
350 }
351
352 pub fn push_warning(&self, warning: QueryWarning) {
354 if let Ok(mut w) = self.warnings.lock() {
355 w.push(warning);
356 }
357 }
358
359 pub fn take_warnings(&self) -> Vec<QueryWarning> {
361 self.warnings
362 .lock()
363 .map(|mut w| std::mem::take(&mut *w))
364 .unwrap_or_default()
365 }
366
367 pub fn check_timeout(&self) -> anyhow::Result<()> {
373 if let Some(deadline) = self.deadline
374 && Instant::now() > deadline
375 {
376 return Err(anyhow::anyhow!("Query timed out"));
377 }
378 Ok(())
379 }
380
381 pub fn storage(&self) -> &Arc<StorageManager> {
383 &self.storage
384 }
385
386 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
388 self.storage.adjacency_manager()
389 }
390
391 pub fn property_manager(&self) -> &Arc<PropertyManager> {
393 &self.property_manager
394 }
395
396 pub fn l0_context(&self) -> &L0Context {
398 &self.l0_context
399 }
400
401 pub fn query_context(&self) -> QueryContext {
405 let l0 = self
406 .l0_context
407 .current_l0
408 .clone()
409 .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
410
411 let mut ctx = QueryContext::new_with_pending(
412 l0,
413 self.l0_context.transaction_l0.clone(),
414 self.l0_context.pending_flush_l0s.clone(),
415 );
416 if let Some(deadline) = self.deadline {
417 ctx.set_deadline(deadline);
418 }
419 ctx
420 }
421
422 pub async fn ensure_adjacency_warmed(
429 &self,
430 edge_type_ids: &[u32],
431 direction: Direction,
432 ) -> anyhow::Result<()> {
433 let am = self.adjacency_manager();
434 let version = self.storage.version_high_water_mark();
435 for &etype_id in edge_type_ids {
436 if !am.is_active_for(etype_id, direction) {
440 for &dir in direction.expand() {
441 self.storage
443 .warm_adjacency_coalesced(etype_id, dir, version)
444 .await?;
445 }
446 }
447 }
448 Ok(())
449 }
450
451 pub fn warming_future(
456 self: &Arc<Self>,
457 edge_type_ids: Vec<u32>,
458 direction: Direction,
459 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
460 {
461 let ctx = self.clone();
462 Box::pin(async move {
463 ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
464 .await
465 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
466 })
467 }
468
469 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
488 let am = self.adjacency_manager();
489 let version_hwm = self.storage.version_high_water_mark();
490
491 let mut neighbors = if let Some(hwm) = version_hwm {
494 self.storage
495 .get_neighbors_at_version(vid, edge_type, direction, hwm)
496 } else {
497 am.get_neighbors(vid, edge_type, direction)
498 };
499
500 if version_hwm.is_none()
502 && let Some(tx_l0) = &self.l0_context.transaction_l0
503 {
504 let tx_guard = tx_l0.read();
505 overlay_l0_neighbors(
506 vid,
507 edge_type,
508 direction,
509 &tx_guard,
510 &mut neighbors,
511 version_hwm,
512 );
513 }
514
515 neighbors
516 }
517
518 pub fn get_neighbors_batch(
533 &self,
534 vids: &[Vid],
535 edge_type: u32,
536 direction: Direction,
537 ) -> Vec<(Vid, Vid, Eid)> {
538 let am = self.adjacency_manager();
539 let version_hwm = self.storage.version_high_water_mark();
540
541 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
542
543 let mut results = Vec::new();
544
545 for &vid in vids {
546 let mut neighbors = if let Some(hwm) = version_hwm {
547 self.storage
548 .get_neighbors_at_version(vid, edge_type, direction, hwm)
549 } else {
550 am.get_neighbors(vid, edge_type, direction)
551 };
552
553 if version_hwm.is_none()
555 && let Some(ref tx_guard) = tx_guard
556 {
557 overlay_l0_neighbors(
558 vid,
559 edge_type,
560 direction,
561 tx_guard,
562 &mut neighbors,
563 version_hwm,
564 );
565 }
566
567 results.extend(
568 neighbors
569 .into_iter()
570 .map(|(neighbor, eid)| (vid, neighbor, eid)),
571 );
572 }
573
574 results
575 }
576}
577
578fn overlay_l0_neighbors(
583 vid: Vid,
584 edge_type: u32,
585 direction: Direction,
586 l0: &L0Buffer,
587 neighbors: &mut Vec<(Vid, Eid)>,
588 version_hwm: Option<u64>,
589) {
590 use std::collections::HashMap;
591
592 let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
594
595 for &simple_dir in direction.to_simple_directions() {
597 for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
598 if version_hwm.is_some_and(|hwm| version > hwm) {
600 continue;
601 }
602
603 if l0.is_tombstoned(eid) {
605 neighbor_map.remove(&eid);
606 } else {
607 neighbor_map.insert(eid, neighbor);
608 }
609 }
610 }
611
612 for eid in l0.tombstones.keys() {
614 neighbor_map.remove(eid);
615 }
616
617 *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
619}
620
621trait DirectionExt {
623 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
624}
625
626impl DirectionExt for Direction {
627 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
628 use uni_common::graph::simple_graph::Direction as SimpleDirection;
629 match self {
630 Direction::Outgoing => &[SimpleDirection::Outgoing],
631 Direction::Incoming => &[SimpleDirection::Incoming],
632 Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
633 }
634 }
635}
636
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty `L0Context` must carry no current, transaction or pending
    /// buffers.
    #[test]
    fn test_l0_context_empty() {
        let L0Context {
            current_l0,
            transaction_l0,
            pending_flush_l0s,
        } = L0Context::empty();

        assert!(current_l0.is_none());
        assert!(transaction_l0.is_none());
        assert!(pending_flush_l0s.is_empty());
    }
}