uni_query/query/df_graph/
mod.rs1pub mod apply;
39pub mod bind_fixed_path;
40pub mod bind_zero_length_path;
41pub mod bitmap;
42pub mod common;
43pub mod comprehension;
44pub mod expr_compiler;
45pub mod ext_id_lookup;
46pub mod locy_abduce;
47pub mod locy_assume;
48pub mod locy_ast_builder;
49pub(crate) mod locy_bdd;
50pub mod locy_best_by;
51pub mod locy_delta;
52pub mod locy_derive;
53pub mod locy_errors;
54pub mod locy_eval;
55pub mod locy_explain;
56pub mod locy_fixpoint;
57pub mod locy_fold;
58pub mod locy_priority;
59pub mod locy_program;
60pub mod locy_query;
61pub mod locy_slg;
62pub mod locy_traits;
63pub mod mutation_common;
64pub mod mutation_create;
65pub mod mutation_delete;
66pub mod mutation_foreach;
67pub mod mutation_merge;
68pub mod mutation_remove;
69pub mod mutation_set;
70pub mod nfa;
71pub mod optional_filter;
72pub mod pattern_comprehension;
73pub mod pred_dag;
74pub mod procedure_call;
75pub mod quantifier;
76pub mod recursive_cte;
77pub mod reduce;
78pub mod scan;
79pub mod shortest_path;
80pub(crate) mod similar_to_expr;
81pub mod traverse;
82pub mod unwind;
83pub mod vector_knn;
84
85use crate::query::executor::procedure::ProcedureRegistry;
86use parking_lot::RwLock;
87use std::sync::{Arc, Mutex};
88use std::time::Instant;
89use uni_algo::algo::AlgorithmRegistry;
90use uni_common::core::id::{Eid, Vid};
91use uni_store::runtime::context::QueryContext;
92use uni_store::runtime::l0::L0Buffer;
93use uni_store::runtime::property_manager::PropertyManager;
94use uni_store::storage::adjacency_manager::AdjacencyManager;
95use uni_store::storage::direction::Direction;
96use uni_store::storage::manager::StorageManager;
97use uni_xervo::runtime::ModelRuntime;
98
99use crate::types::QueryWarning;
100
101pub use apply::GraphApplyExec;
102pub use ext_id_lookup::GraphExtIdLookupExec;
103pub use mutation_common::{MutationContext, MutationExec};
104pub use mutation_create::MutationCreateExec;
105pub use mutation_delete::MutationDeleteExec;
106pub use mutation_foreach::ForeachExec;
107pub use mutation_merge::MutationMergeExec;
108pub use mutation_remove::MutationRemoveExec;
109pub use mutation_set::MutationSetExec;
110pub use optional_filter::OptionalFilterExec;
111pub use procedure_call::GraphProcedureCallExec;
112pub use scan::GraphScanExec;
113pub use shortest_path::GraphShortestPathExec;
114pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
115pub use unwind::GraphUnwindExec;
116pub use vector_knn::GraphVectorKnnExec;
117
118pub use locy_best_by::BestByExec;
119pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
120pub use locy_fixpoint::{
121 DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
122 FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
123};
124pub use locy_fold::FoldExec;
125pub use locy_priority::PriorityExec;
126pub use locy_program::{DerivedStore, LocyProgramExec};
127pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
128
129pub struct GraphExecutionContext {
150 storage: Arc<StorageManager>,
152
153 l0_context: L0Context,
155
156 property_manager: Arc<PropertyManager>,
158
159 deadline: Option<Instant>,
161
162 algo_registry: Option<Arc<AlgorithmRegistry>>,
164
165 procedure_registry: Option<Arc<ProcedureRegistry>>,
167 xervo_runtime: Option<Arc<ModelRuntime>>,
169
170 warnings: Arc<Mutex<Vec<QueryWarning>>>,
172
173 cancellation_token: Option<tokio_util::sync::CancellationToken>,
175}
176
177impl std::fmt::Debug for GraphExecutionContext {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 f.debug_struct("GraphExecutionContext")
180 .field("l0_context", &self.l0_context)
181 .field("deadline", &self.deadline)
182 .finish_non_exhaustive()
183 }
184}
185
186#[derive(Clone, Default)]
195pub struct L0Context {
196 pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
198
199 pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
201
202 pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
205}
206
207impl std::fmt::Debug for L0Context {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 f.debug_struct("L0Context")
210 .field("current_l0", &self.current_l0.is_some())
211 .field("transaction_l0", &self.transaction_l0.is_some())
212 .field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
213 .finish()
214 }
215}
216
217impl L0Context {
218 pub fn empty() -> Self {
220 Self::default()
221 }
222
223 pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
225 Self {
226 current_l0: Some(l0),
227 ..Self::default()
228 }
229 }
230
231 pub fn from_query_context(ctx: &QueryContext) -> Self {
233 Self {
234 current_l0: Some(ctx.l0.clone()),
235 transaction_l0: ctx.transaction_l0.clone(),
236 pending_flush_l0s: ctx.pending_flush_l0s.clone(),
237 }
238 }
239
240 pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
243 self.pending_flush_l0s
244 .iter()
245 .chain(self.current_l0.iter())
246 .chain(self.transaction_l0.iter())
247 }
248}
249
250impl GraphExecutionContext {
251 pub fn new(
259 storage: Arc<StorageManager>,
260 l0: Arc<RwLock<L0Buffer>>,
261 property_manager: Arc<PropertyManager>,
262 ) -> Self {
263 Self {
264 storage,
265 l0_context: L0Context::with_current(l0),
266 property_manager,
267 deadline: None,
268 algo_registry: None,
269 procedure_registry: None,
270 xervo_runtime: None,
271 warnings: Arc::new(Mutex::new(Vec::new())),
272 cancellation_token: None,
273 }
274 }
275
276 pub fn with_l0_context(
284 storage: Arc<StorageManager>,
285 l0_context: L0Context,
286 property_manager: Arc<PropertyManager>,
287 ) -> Self {
288 Self {
289 storage,
290 l0_context,
291 property_manager,
292 deadline: None,
293 algo_registry: None,
294 procedure_registry: None,
295 xervo_runtime: None,
296 warnings: Arc::new(Mutex::new(Vec::new())),
297 cancellation_token: None,
298 }
299 }
300
301 pub fn from_query_context(
303 storage: Arc<StorageManager>,
304 query_ctx: &QueryContext,
305 property_manager: Arc<PropertyManager>,
306 ) -> Self {
307 Self {
308 storage,
309 l0_context: L0Context::from_query_context(query_ctx),
310 property_manager,
311 deadline: query_ctx.deadline,
312 algo_registry: None,
313 procedure_registry: None,
314 xervo_runtime: None,
315 warnings: Arc::new(Mutex::new(Vec::new())),
316 cancellation_token: query_ctx.cancellation_token.clone(),
317 }
318 }
319
320 pub fn with_deadline(mut self, deadline: Instant) -> Self {
322 self.deadline = Some(deadline);
323 self
324 }
325
326 pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
328 self.algo_registry = Some(registry);
329 self
330 }
331
332 pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
334 self.algo_registry.as_ref()
335 }
336
337 pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
339 self.procedure_registry = Some(registry);
340 self
341 }
342
343 pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
345 self.xervo_runtime = Some(runtime);
346 self
347 }
348
349 pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
351 self.procedure_registry.as_ref()
352 }
353
354 pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
355 self.xervo_runtime.as_ref()
356 }
357
358 pub fn push_warning(&self, warning: QueryWarning) {
360 if let Ok(mut w) = self.warnings.lock() {
361 w.push(warning);
362 }
363 }
364
365 pub fn take_warnings(&self) -> Vec<QueryWarning> {
367 self.warnings
368 .lock()
369 .map(|mut w| std::mem::take(&mut *w))
370 .unwrap_or_default()
371 }
372
373 pub fn check_timeout(&self) -> anyhow::Result<()> {
379 if let Some(ref token) = self.cancellation_token
380 && token.is_cancelled()
381 {
382 return Err(anyhow::anyhow!("Query cancelled"));
383 }
384 if let Some(deadline) = self.deadline
385 && Instant::now() > deadline
386 {
387 return Err(anyhow::anyhow!("Query timed out"));
388 }
389 Ok(())
390 }
391
392 pub fn storage(&self) -> &Arc<StorageManager> {
394 &self.storage
395 }
396
397 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
399 self.storage.adjacency_manager()
400 }
401
402 pub fn property_manager(&self) -> &Arc<PropertyManager> {
404 &self.property_manager
405 }
406
407 pub fn l0_context(&self) -> &L0Context {
409 &self.l0_context
410 }
411
412 pub fn query_context(&self) -> QueryContext {
416 let l0 = self
417 .l0_context
418 .current_l0
419 .clone()
420 .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
421
422 let mut ctx = QueryContext::new_with_pending(
423 l0,
424 self.l0_context.transaction_l0.clone(),
425 self.l0_context.pending_flush_l0s.clone(),
426 );
427 if let Some(deadline) = self.deadline {
428 ctx.set_deadline(deadline);
429 }
430 ctx
431 }
432
433 pub async fn ensure_adjacency_warmed(
440 &self,
441 edge_type_ids: &[u32],
442 direction: Direction,
443 ) -> anyhow::Result<()> {
444 let am = self.adjacency_manager();
445 let version = self.storage.version_high_water_mark();
446 for &etype_id in edge_type_ids {
447 if !am.is_active_for(etype_id, direction) {
451 for &dir in direction.expand() {
452 self.storage
454 .warm_adjacency_coalesced(etype_id, dir, version)
455 .await?;
456 }
457 }
458 }
459 Ok(())
460 }
461
462 pub fn warming_future(
467 self: &Arc<Self>,
468 edge_type_ids: Vec<u32>,
469 direction: Direction,
470 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
471 {
472 let ctx = self.clone();
473 Box::pin(async move {
474 ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
475 .await
476 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
477 })
478 }
479
480 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
499 let am = self.adjacency_manager();
500 let version_hwm = self.storage.version_high_water_mark();
501
502 let mut neighbors = if let Some(hwm) = version_hwm {
505 self.storage
506 .get_neighbors_at_version(vid, edge_type, direction, hwm)
507 } else {
508 am.get_neighbors(vid, edge_type, direction)
509 };
510
511 if version_hwm.is_none()
513 && let Some(tx_l0) = &self.l0_context.transaction_l0
514 {
515 let tx_guard = tx_l0.read();
516 overlay_l0_neighbors(
517 vid,
518 edge_type,
519 direction,
520 &tx_guard,
521 &mut neighbors,
522 version_hwm,
523 );
524 }
525
526 neighbors
527 }
528
529 pub fn get_neighbors_batch(
544 &self,
545 vids: &[Vid],
546 edge_type: u32,
547 direction: Direction,
548 ) -> Vec<(Vid, Vid, Eid)> {
549 let am = self.adjacency_manager();
550 let version_hwm = self.storage.version_high_water_mark();
551
552 let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
553
554 let mut results = Vec::new();
555
556 for &vid in vids {
557 let mut neighbors = if let Some(hwm) = version_hwm {
558 self.storage
559 .get_neighbors_at_version(vid, edge_type, direction, hwm)
560 } else {
561 am.get_neighbors(vid, edge_type, direction)
562 };
563
564 if version_hwm.is_none()
566 && let Some(ref tx_guard) = tx_guard
567 {
568 overlay_l0_neighbors(
569 vid,
570 edge_type,
571 direction,
572 tx_guard,
573 &mut neighbors,
574 version_hwm,
575 );
576 }
577
578 results.extend(
579 neighbors
580 .into_iter()
581 .map(|(neighbor, eid)| (vid, neighbor, eid)),
582 );
583 }
584
585 results
586 }
587}
588
589fn overlay_l0_neighbors(
594 vid: Vid,
595 edge_type: u32,
596 direction: Direction,
597 l0: &L0Buffer,
598 neighbors: &mut Vec<(Vid, Eid)>,
599 version_hwm: Option<u64>,
600) {
601 use std::collections::HashMap;
602
603 let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
605
606 for &simple_dir in direction.to_simple_directions() {
608 for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
609 if version_hwm.is_some_and(|hwm| version > hwm) {
611 continue;
612 }
613
614 if l0.is_tombstoned(eid) {
616 neighbor_map.remove(&eid);
617 } else {
618 neighbor_map.insert(eid, neighbor);
619 }
620 }
621 }
622
623 for eid in l0.tombstones.keys() {
625 neighbor_map.remove(eid);
626 }
627
628 *neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
630}
631
632trait DirectionExt {
634 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
635}
636
637impl DirectionExt for Direction {
638 fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
639 use uni_common::graph::simple_graph::Direction as SimpleDirection;
640 match self {
641 Direction::Outgoing => &[SimpleDirection::Outgoing],
642 Direction::Incoming => &[SimpleDirection::Incoming],
643 Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
644 }
645 }
646}
647
648#[cfg(test)]
649mod tests {
650 use super::*;
651
652 #[test]
653 fn test_l0_context_empty() {
654 let ctx = L0Context::empty();
655 assert!(ctx.current_l0.is_none());
656 assert!(ctx.transaction_l0.is_none());
657 assert!(ctx.pending_flush_l0s.is_empty());
658 }
659}