pub mod apply;
pub mod bind_fixed_path;
pub mod bind_zero_length_path;
pub mod bitmap;
pub mod common;
pub mod comprehension;
pub mod expr_compiler;
pub mod ext_id_lookup;
pub mod locy_abduce;
pub mod locy_assume;
pub mod locy_ast_builder;
pub(crate) mod locy_bdd;
pub mod locy_best_by;
pub mod locy_delta;
pub mod locy_derive;
pub mod locy_errors;
pub mod locy_eval;
pub mod locy_explain;
pub mod locy_fixpoint;
pub mod locy_fold;
pub mod locy_priority;
pub mod locy_program;
pub mod locy_query;
pub mod locy_slg;
pub mod locy_traits;
pub mod mutation_common;
pub mod mutation_create;
pub mod mutation_delete;
pub mod mutation_foreach;
pub mod mutation_merge;
pub mod mutation_remove;
pub mod mutation_set;
pub mod nfa;
pub mod optional_filter;
pub mod pattern_comprehension;
pub mod pred_dag;
pub mod procedure_call;
pub mod quantifier;
pub mod recursive_cte;
pub mod reduce;
pub mod scan;
pub mod shortest_path;
pub(crate) mod similar_to_expr;
pub mod traverse;
pub mod unwind;
pub mod vector_knn;
use crate::query::executor::procedure::ProcedureRegistry;
use parking_lot::RwLock;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use uni_algo::algo::AlgorithmRegistry;
use uni_common::core::id::{Eid, Vid};
use uni_store::runtime::context::QueryContext;
use uni_store::runtime::l0::L0Buffer;
use uni_store::runtime::property_manager::PropertyManager;
use uni_store::storage::adjacency_manager::AdjacencyManager;
use uni_store::storage::direction::Direction;
use uni_store::storage::manager::StorageManager;
use uni_xervo::runtime::ModelRuntime;
use crate::types::QueryWarning;
pub use apply::GraphApplyExec;
pub use ext_id_lookup::GraphExtIdLookupExec;
pub use mutation_common::{MutationContext, MutationExec};
pub use mutation_create::MutationCreateExec;
pub use mutation_delete::MutationDeleteExec;
pub use mutation_foreach::ForeachExec;
pub use mutation_merge::MutationMergeExec;
pub use mutation_remove::MutationRemoveExec;
pub use mutation_set::MutationSetExec;
pub use optional_filter::OptionalFilterExec;
pub use procedure_call::GraphProcedureCallExec;
pub use scan::GraphScanExec;
pub use shortest_path::GraphShortestPathExec;
pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
pub use unwind::GraphUnwindExec;
pub use vector_knn::GraphVectorKnnExec;
pub use locy_best_by::BestByExec;
pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
pub use locy_fixpoint::{
DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
};
pub use locy_fold::FoldExec;
pub use locy_priority::PriorityExec;
pub use locy_program::{DerivedStore, LocyProgramExec};
pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
pub struct GraphExecutionContext {
storage: Arc<StorageManager>,
l0_context: L0Context,
property_manager: Arc<PropertyManager>,
deadline: Option<Instant>,
algo_registry: Option<Arc<AlgorithmRegistry>>,
procedure_registry: Option<Arc<ProcedureRegistry>>,
xervo_runtime: Option<Arc<ModelRuntime>>,
warnings: Arc<Mutex<Vec<QueryWarning>>>,
cancellation_token: Option<tokio_util::sync::CancellationToken>,
}
impl std::fmt::Debug for GraphExecutionContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GraphExecutionContext")
.field("l0_context", &self.l0_context)
.field("deadline", &self.deadline)
.finish_non_exhaustive()
}
}
#[derive(Clone, Default)]
pub struct L0Context {
pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
}
impl std::fmt::Debug for L0Context {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("L0Context")
.field("current_l0", &self.current_l0.is_some())
.field("transaction_l0", &self.transaction_l0.is_some())
.field("pending_flush_l0s_count", &self.pending_flush_l0s.len())
.finish()
}
}
impl L0Context {
pub fn empty() -> Self {
Self::default()
}
pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
Self {
current_l0: Some(l0),
..Self::default()
}
}
pub fn from_query_context(ctx: &QueryContext) -> Self {
Self {
current_l0: Some(ctx.l0.clone()),
transaction_l0: ctx.transaction_l0.clone(),
pending_flush_l0s: ctx.pending_flush_l0s.clone(),
}
}
pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
self.pending_flush_l0s
.iter()
.chain(self.current_l0.iter())
.chain(self.transaction_l0.iter())
}
}
impl GraphExecutionContext {
pub fn new(
storage: Arc<StorageManager>,
l0: Arc<RwLock<L0Buffer>>,
property_manager: Arc<PropertyManager>,
) -> Self {
Self {
storage,
l0_context: L0Context::with_current(l0),
property_manager,
deadline: None,
algo_registry: None,
procedure_registry: None,
xervo_runtime: None,
warnings: Arc::new(Mutex::new(Vec::new())),
cancellation_token: None,
}
}
pub fn with_l0_context(
storage: Arc<StorageManager>,
l0_context: L0Context,
property_manager: Arc<PropertyManager>,
) -> Self {
Self {
storage,
l0_context,
property_manager,
deadline: None,
algo_registry: None,
procedure_registry: None,
xervo_runtime: None,
warnings: Arc::new(Mutex::new(Vec::new())),
cancellation_token: None,
}
}
pub fn from_query_context(
storage: Arc<StorageManager>,
query_ctx: &QueryContext,
property_manager: Arc<PropertyManager>,
) -> Self {
Self {
storage,
l0_context: L0Context::from_query_context(query_ctx),
property_manager,
deadline: query_ctx.deadline,
algo_registry: None,
procedure_registry: None,
xervo_runtime: None,
warnings: Arc::new(Mutex::new(Vec::new())),
cancellation_token: query_ctx.cancellation_token.clone(),
}
}
pub fn with_deadline(mut self, deadline: Instant) -> Self {
self.deadline = Some(deadline);
self
}
pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
self.algo_registry = Some(registry);
self
}
pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
self.algo_registry.as_ref()
}
pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
self.procedure_registry = Some(registry);
self
}
pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
self.xervo_runtime = Some(runtime);
self
}
pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
self.procedure_registry.as_ref()
}
pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
self.xervo_runtime.as_ref()
}
pub fn push_warning(&self, warning: QueryWarning) {
if let Ok(mut w) = self.warnings.lock() {
w.push(warning);
}
}
pub fn take_warnings(&self) -> Vec<QueryWarning> {
self.warnings
.lock()
.map(|mut w| std::mem::take(&mut *w))
.unwrap_or_default()
}
pub fn check_timeout(&self) -> anyhow::Result<()> {
if let Some(ref token) = self.cancellation_token
&& token.is_cancelled()
{
return Err(anyhow::anyhow!("Query cancelled"));
}
if let Some(deadline) = self.deadline
&& Instant::now() > deadline
{
return Err(anyhow::anyhow!("Query timed out"));
}
Ok(())
}
pub fn storage(&self) -> &Arc<StorageManager> {
&self.storage
}
pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
self.storage.adjacency_manager()
}
pub fn property_manager(&self) -> &Arc<PropertyManager> {
&self.property_manager
}
pub fn l0_context(&self) -> &L0Context {
&self.l0_context
}
pub fn query_context(&self) -> QueryContext {
let l0 = self
.l0_context
.current_l0
.clone()
.unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
let mut ctx = QueryContext::new_with_pending(
l0,
self.l0_context.transaction_l0.clone(),
self.l0_context.pending_flush_l0s.clone(),
);
if let Some(deadline) = self.deadline {
ctx.set_deadline(deadline);
}
ctx
}
pub async fn ensure_adjacency_warmed(
&self,
edge_type_ids: &[u32],
direction: Direction,
) -> anyhow::Result<()> {
let am = self.adjacency_manager();
let version = self.storage.version_high_water_mark();
for &etype_id in edge_type_ids {
if !am.is_active_for(etype_id, direction) {
for &dir in direction.expand() {
self.storage
.warm_adjacency_coalesced(etype_id, dir, version)
.await?;
}
}
}
Ok(())
}
pub fn warming_future(
self: &Arc<Self>,
edge_type_ids: Vec<u32>,
direction: Direction,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
{
let ctx = self.clone();
Box::pin(async move {
ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
.await
.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
})
}
pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
let am = self.adjacency_manager();
let version_hwm = self.storage.version_high_water_mark();
let mut neighbors = if let Some(hwm) = version_hwm {
self.storage
.get_neighbors_at_version(vid, edge_type, direction, hwm)
} else {
am.get_neighbors(vid, edge_type, direction)
};
if version_hwm.is_none()
&& let Some(tx_l0) = &self.l0_context.transaction_l0
{
let tx_guard = tx_l0.read();
overlay_l0_neighbors(
vid,
edge_type,
direction,
&tx_guard,
&mut neighbors,
version_hwm,
);
}
neighbors
}
pub fn get_neighbors_batch(
&self,
vids: &[Vid],
edge_type: u32,
direction: Direction,
) -> Vec<(Vid, Vid, Eid)> {
let am = self.adjacency_manager();
let version_hwm = self.storage.version_high_water_mark();
let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
let mut results = Vec::new();
for &vid in vids {
let mut neighbors = if let Some(hwm) = version_hwm {
self.storage
.get_neighbors_at_version(vid, edge_type, direction, hwm)
} else {
am.get_neighbors(vid, edge_type, direction)
};
if version_hwm.is_none()
&& let Some(ref tx_guard) = tx_guard
{
overlay_l0_neighbors(
vid,
edge_type,
direction,
tx_guard,
&mut neighbors,
version_hwm,
);
}
results.extend(
neighbors
.into_iter()
.map(|(neighbor, eid)| (vid, neighbor, eid)),
);
}
results
}
}
fn overlay_l0_neighbors(
vid: Vid,
edge_type: u32,
direction: Direction,
l0: &L0Buffer,
neighbors: &mut Vec<(Vid, Eid)>,
version_hwm: Option<u64>,
) {
use std::collections::HashMap;
let mut neighbor_map: HashMap<Eid, Vid> = neighbors.drain(..).map(|(v, e)| (e, v)).collect();
for &simple_dir in direction.to_simple_directions() {
for (neighbor, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
if version_hwm.is_some_and(|hwm| version > hwm) {
continue;
}
if l0.is_tombstoned(eid) {
neighbor_map.remove(&eid);
} else {
neighbor_map.insert(eid, neighbor);
}
}
}
for eid in l0.tombstones.keys() {
neighbor_map.remove(eid);
}
*neighbors = neighbor_map.into_iter().map(|(e, v)| (v, e)).collect();
}
trait DirectionExt {
fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
}
impl DirectionExt for Direction {
fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
use uni_common::graph::simple_graph::Direction as SimpleDirection;
match self {
Direction::Outgoing => &[SimpleDirection::Outgoing],
Direction::Incoming => &[SimpleDirection::Incoming],
Direction::Both => &[SimpleDirection::Outgoing, SimpleDirection::Incoming],
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_l0_context_empty() {
let ctx = L0Context::empty();
assert!(ctx.current_l0.is_none());
assert!(ctx.transaction_l0.is_none());
assert!(ctx.pending_flush_l0s.is_empty());
}
}