pub mod apply;
pub mod bind_fixed_path;
pub mod bind_zero_length_path;
pub mod bitmap;
pub mod catalog_scan;
pub mod common;
pub mod comprehension;
pub mod expr_compiler;
pub mod ext_id_lookup;
pub mod locy_abduce;
pub mod locy_assume;
pub mod locy_ast_builder;
pub(crate) mod locy_bdd;
pub mod locy_best_by;
pub mod locy_calibrate;
pub mod locy_delta;
pub mod locy_derive;
pub mod locy_errors;
pub mod locy_eval;
pub mod locy_explain;
pub mod locy_fixpoint;
pub mod locy_fold;
pub mod locy_model_invoke;
pub mod locy_priority;
pub mod locy_program;
pub mod locy_query;
pub mod locy_slg;
pub mod locy_traits;
pub mod locy_validate;
pub mod mutation_common;
pub mod mutation_delete;
pub mod mutation_foreach;
pub mod mutation_remove;
pub mod mutation_set;
pub mod nfa;
pub mod optional_filter;
pub mod pattern_comprehension;
pub mod pattern_exists;
pub mod pred_dag;
pub mod procedure_call;
pub mod quantifier;
mod read_set_exec;
pub mod recursive_cte;
pub mod reduce;
pub mod scan;
pub mod shortest_path;
pub(crate) mod similar_to_expr;
pub mod traverse;
pub mod unwind;
pub mod vector_knn;
pub mod vid_lookup_join;
use crate::query::executor::procedure::ProcedureRegistry;
use parking_lot::RwLock;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use uni_algo::algo::AlgorithmRegistry;
use uni_common::core::id::{Eid, Vid};
use uni_store::runtime::context::QueryContext;
use uni_store::runtime::l0::L0Buffer;
use uni_store::runtime::property_manager::PropertyManager;
use uni_store::storage::adjacency_manager::AdjacencyManager;
use uni_store::storage::direction::Direction;
use uni_store::storage::manager::StorageManager;
pub mod search_procedures;
use uni_xervo::runtime::ModelRuntime;
use crate::types::QueryWarning;
pub use apply::GraphApplyExec;
pub use ext_id_lookup::GraphExtIdLookupExec;
pub use mutation_common::{
MutationContext, MutationExec, MutationExec as MutationCreateExec,
MutationExec as MutationMergeExec, new_create_exec, new_merge_exec,
};
pub use mutation_delete::MutationDeleteExec;
pub use mutation_foreach::ForeachExec;
pub use mutation_remove::MutationRemoveExec;
pub use mutation_set::MutationSetExec;
pub use optional_filter::OptionalFilterExec;
pub use procedure_call::GraphProcedureCallExec;
pub use read_set_exec::ReadSetRecordingExec;
pub use scan::GraphScanExec;
pub use shortest_path::GraphShortestPathExec;
pub use traverse::{GraphTraverseExec, GraphTraverseMainExec};
pub use unwind::GraphUnwindExec;
pub use vector_knn::GraphVectorKnnExec;
pub use locy_best_by::BestByExec;
pub use locy_explain::{ProofTerm, ProvenanceAnnotation, ProvenanceStore};
pub use locy_fixpoint::{
DerivedScanEntry, DerivedScanExec, DerivedScanRegistry, FixpointClausePlan, FixpointExec,
FixpointRulePlan, FixpointState, IsRefBinding, MonotonicFoldBinding,
};
pub use locy_fold::FoldExec;
pub use locy_priority::PriorityExec;
pub use locy_program::{DerivedStore, LocyProgramExec};
pub use locy_traits::{DerivedFactSource, LocyExecutionContext};
/// Execution-time state shared by the graph operators declared in this module.
///
/// A context bundles the storage handles, the L0 buffers visible to the query,
/// optional registries/runtimes, and the limits (deadline, cancellation token)
/// that [`GraphExecutionContext::check_timeout`] enforces.
pub struct GraphExecutionContext {
/// Storage manager; source of adjacency data and the snapshot version high-water mark.
storage: Arc<StorageManager>,
/// L0 buffers (current, transaction, pending flush) visible to this query.
l0_context: L0Context,
/// Property manager handle exposed through `property_manager()`.
property_manager: Arc<PropertyManager>,
/// Point in time after which `check_timeout` reports "Query timed out"; `None` disables the check.
deadline: Option<Instant>,
/// Optional algorithm registry; unset until `with_algo_registry` is called.
algo_registry: Option<Arc<AlgorithmRegistry>>,
/// Optional procedure registry; unset until `with_procedure_registry` is called.
procedure_registry: Option<Arc<ProcedureRegistry>>,
/// Optional plugin registry; unset until `with_plugin_registry` is called.
plugin_registry: Option<Arc<uni_plugin::PluginRegistry>>,
/// Optional model runtime; unset until `with_xervo_runtime` is called.
xervo_runtime: Option<Arc<ModelRuntime>>,
/// Warnings accumulated via `push_warning` and drained by `take_warnings`.
warnings: Arc<Mutex<Vec<QueryWarning>>>,
/// Token that makes `check_timeout` report "Query cancelled" once it is cancelled.
cancellation_token: Option<tokio_util::sync::CancellationToken>,
/// Optional storage writer; unset until `with_writer` is called.
writer: Option<Arc<uni_store::Writer>>,
}
impl std::fmt::Debug for GraphExecutionContext {
    /// Prints only the L0 context and the deadline; every other field is
    /// elided, which the trailing `..` in the output signals.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("GraphExecutionContext");
        out.field("l0_context", &self.l0_context);
        out.field("deadline", &self.deadline);
        out.finish_non_exhaustive()
    }
}
/// The set of L0 buffers a query reads from.
///
/// All three fields are optional/empty by default; `iter_l0_buffers` walks
/// them in the order pending-flush, current, transaction.
#[derive(Clone, Default)]
pub struct L0Context {
/// The current L0 buffer, if any.
pub current_l0: Option<Arc<RwLock<L0Buffer>>>,
/// The L0 buffer of the enclosing transaction, if any. Neighbor lookups
/// overlay it and read-set recording goes through it.
pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
/// L0 buffers copied from `QueryContext::pending_flush_l0s`
/// (presumably buffers awaiting flush; confirm against `uni_store`).
pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
}
impl std::fmt::Debug for L0Context {
    /// Summarises the context without touching any buffer: presence flags for
    /// the current and transaction buffers, and the number of pending ones.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_current = self.current_l0.is_some();
        let has_transaction = self.transaction_l0.is_some();
        let pending_count = self.pending_flush_l0s.len();

        let mut out = f.debug_struct("L0Context");
        out.field("current_l0", &has_current);
        out.field("transaction_l0", &has_transaction);
        out.field("pending_flush_l0s_count", &pending_count);
        out.finish()
    }
}
impl L0Context {
    /// Creates a context that references no L0 buffer at all.
    pub fn empty() -> Self {
        Self {
            current_l0: None,
            transaction_l0: None,
            pending_flush_l0s: Vec::new(),
        }
    }

    /// Creates a context whose only buffer is `l0`, installed as the current one.
    pub fn with_current(l0: Arc<RwLock<L0Buffer>>) -> Self {
        Self {
            current_l0: Some(l0),
            transaction_l0: None,
            pending_flush_l0s: Vec::new(),
        }
    }

    /// Copies the buffer handles out of a storage-level [`QueryContext`].
    ///
    /// `ctx.l0` always becomes the current buffer; the transaction buffer and
    /// the pending-flush list are cloned as they are.
    pub fn from_query_context(ctx: &QueryContext) -> Self {
        let current_l0 = Some(Arc::clone(&ctx.l0));
        let transaction_l0 = ctx.transaction_l0.clone();
        let pending_flush_l0s = ctx.pending_flush_l0s.clone();
        Self {
            current_l0,
            transaction_l0,
            pending_flush_l0s,
        }
    }

    /// Iterates over every buffer in the context: pending-flush buffers first,
    /// then the current buffer, then the transaction buffer. Missing buffers
    /// are skipped.
    pub fn iter_l0_buffers(&self) -> impl Iterator<Item = &Arc<RwLock<L0Buffer>>> {
        let pending = self.pending_flush_l0s.iter();
        pending.chain(&self.current_l0).chain(&self.transaction_l0)
    }
}
impl GraphExecutionContext {
/// Internal constructor that every public constructor funnels through.
///
/// The supplied parts are stored as given. The algorithm, procedure and plugin
/// registries, the model runtime and the writer start unset, and the warning
/// list starts empty.
fn with_parts(
    storage: Arc<StorageManager>,
    l0_context: L0Context,
    property_manager: Arc<PropertyManager>,
    deadline: Option<Instant>,
    cancellation_token: Option<tokio_util::sync::CancellationToken>,
) -> Self {
    let warnings = Arc::new(Mutex::new(Vec::new()));
    Self {
        warnings,
        writer: None,
        xervo_runtime: None,
        plugin_registry: None,
        procedure_registry: None,
        algo_registry: None,
        cancellation_token,
        deadline,
        property_manager,
        l0_context,
        storage,
    }
}
/// Creates a context over a single current L0 buffer, with no deadline and no
/// cancellation token.
pub fn new(
    storage: Arc<StorageManager>,
    l0: Arc<RwLock<L0Buffer>>,
    property_manager: Arc<PropertyManager>,
) -> Self {
    let l0_context = L0Context::with_current(l0);
    let (deadline, cancellation_token) = (None, None);
    Self::with_parts(
        storage,
        l0_context,
        property_manager,
        deadline,
        cancellation_token,
    )
}
/// Creates a context over an already assembled [`L0Context`], with no deadline
/// and no cancellation token.
pub fn with_l0_context(
    storage: Arc<StorageManager>,
    l0_context: L0Context,
    property_manager: Arc<PropertyManager>,
) -> Self {
    let deadline = None;
    let cancellation_token = None;
    Self::with_parts(
        storage,
        l0_context,
        property_manager,
        deadline,
        cancellation_token,
    )
}
/// Creates a context that mirrors a storage-level [`QueryContext`]: its L0
/// buffers, its deadline and a clone of its cancellation token.
pub fn from_query_context(
    storage: Arc<StorageManager>,
    query_ctx: &QueryContext,
    property_manager: Arc<PropertyManager>,
) -> Self {
    let l0_context = L0Context::from_query_context(query_ctx);
    let deadline = query_ctx.deadline;
    let cancellation_token = query_ctx.cancellation_token.clone();
    Self::with_parts(
        storage,
        l0_context,
        property_manager,
        deadline,
        cancellation_token,
    )
}
pub fn with_deadline(mut self, deadline: Instant) -> Self {
self.deadline = Some(deadline);
self
}
#[must_use]
pub fn with_writer(mut self, writer: Arc<uni_store::Writer>) -> Self {
self.writer = Some(writer);
self
}
#[must_use]
pub fn writer(&self) -> Option<&Arc<uni_store::Writer>> {
self.writer.as_ref()
}
pub fn with_algo_registry(mut self, registry: Arc<AlgorithmRegistry>) -> Self {
self.algo_registry = Some(registry);
self
}
pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
self.algo_registry.as_ref()
}
pub fn with_procedure_registry(mut self, registry: Arc<ProcedureRegistry>) -> Self {
self.procedure_registry = Some(registry);
self
}
pub fn with_xervo_runtime(mut self, runtime: Arc<ModelRuntime>) -> Self {
self.xervo_runtime = Some(runtime);
self
}
pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
self.procedure_registry.as_ref()
}
pub fn with_plugin_registry(mut self, registry: Arc<uni_plugin::PluginRegistry>) -> Self {
self.plugin_registry = Some(registry);
self
}
pub fn plugin_registry(&self) -> Option<&Arc<uni_plugin::PluginRegistry>> {
self.plugin_registry.as_ref()
}
pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
self.xervo_runtime.as_ref()
}
/// Appends `warning` to the list later returned by `take_warnings`.
///
/// The warning is recorded even if the mutex was poisoned by a panic in
/// another thread: the guarded `Vec` is still structurally valid, so the
/// guard is recovered instead of silently discarding the warning.
pub fn push_warning(&self, warning: QueryWarning) {
    self.warnings
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .push(warning);
}
/// Removes and returns every warning recorded so far, leaving the list empty.
///
/// A poisoned mutex is recovered rather than treated as "no warnings", so
/// warnings collected before a panic in another thread are still returned.
pub fn take_warnings(&self) -> Vec<QueryWarning> {
    let mut pending = self
        .warnings
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    std::mem::take(&mut *pending)
}
pub fn check_timeout(&self) -> anyhow::Result<()> {
if let Some(ref token) = self.cancellation_token
&& token.is_cancelled()
{
return Err(anyhow::anyhow!("Query cancelled"));
}
if let Some(deadline) = self.deadline
&& Instant::now() > deadline
{
return Err(anyhow::anyhow!("Query timed out"));
}
Ok(())
}
pub fn storage(&self) -> &Arc<StorageManager> {
&self.storage
}
pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
self.storage.adjacency_manager()
}
pub fn property_manager(&self) -> &Arc<PropertyManager> {
&self.property_manager
}
pub fn l0_context(&self) -> &L0Context {
&self.l0_context
}
#[must_use]
pub fn deadline_for_host(&self) -> Option<Instant> {
self.deadline
}
#[must_use]
pub fn cancellation_token_for_host(&self) -> Option<tokio_util::sync::CancellationToken> {
self.cancellation_token.clone()
}
/// Builds a storage-level [`QueryContext`] equivalent to this context.
///
/// The current L0 buffer is reused when present; otherwise a freshly
/// constructed `L0Buffer::new(0, None)` stands in for it. The transaction
/// buffer and the pending-flush list are cloned as they are. The deadline and
/// the cancellation token are carried over when set, so the result observes
/// the same limits as `check_timeout`.
pub fn query_context(&self) -> QueryContext {
    let l0 = self
        .l0_context
        .current_l0
        .clone()
        .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
    let mut ctx = QueryContext::new_with_pending(
        l0,
        self.l0_context.transaction_l0.clone(),
        self.l0_context.pending_flush_l0s.clone(),
    );
    if let Some(deadline) = self.deadline {
        ctx.set_deadline(deadline);
    }
    // `from_query_context` captures the token together with the deadline;
    // hand it back too, otherwise cancellation is lost on the round trip.
    if let Some(token) = &self.cancellation_token {
        ctx.cancellation_token = Some(token.clone());
    }
    ctx
}
/// Warms the adjacency data of every listed edge type that is not yet active
/// for `direction`.
///
/// Edge types the adjacency manager already reports as active are skipped.
/// For the others, `warm_adjacency_coalesced` is awaited once per direction
/// yielded by `direction.expand()`, using the snapshot version high-water
/// mark read at the start of the call.
///
/// # Errors
///
/// Stops at, and returns, the first warming error.
pub async fn ensure_adjacency_warmed(
    &self,
    edge_type_ids: &[u32],
    direction: Direction,
) -> anyhow::Result<()> {
    let adjacency = self.adjacency_manager();
    let snapshot_version = self.storage.snapshot_version_hwm();

    for &etype_id in edge_type_ids {
        if adjacency.is_active_for(etype_id, direction) {
            continue;
        }
        for &dir in direction.expand() {
            self.storage
                .warm_adjacency_coalesced(etype_id, dir, snapshot_version)
                .await?;
        }
    }

    Ok(())
}
pub fn warming_future(
self: &Arc<Self>,
edge_type_ids: Vec<u32>,
direction: Direction,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = datafusion::common::Result<()>> + Send>>
{
let ctx = self.clone();
Box::pin(async move {
ctx.ensure_adjacency_warmed(&edge_type_ids, direction)
.await
.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
})
}
/// Returns the `(neighbor, edge)` pairs of `vid` for one edge type and
/// direction, and records the touched ids in the transaction's read set when
/// one is being tracked.
///
/// The transaction L0 read guard is held only while the neighbors are
/// computed. It is released before the read set is updated, because
/// `record_neighbor_reads` takes its own read lock on the same buffer and a
/// nested read acquisition on a `parking_lot::RwLock` can deadlock once a
/// writer is queued. `get_neighbors_batch` follows the same order.
pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
    let version_hwm = self.storage.snapshot_version_hwm();
    let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());
    let neighbors = self.neighbors_for_vid(
        vid,
        edge_type,
        direction,
        version_hwm,
        tx_guard.as_deref(),
        // Recording is done below, after the guard is gone.
        false,
    );
    drop(tx_guard);
    self.record_neighbor_reads(vid, &neighbors);
    neighbors
}
/// Returns `(source, neighbor, edge)` triples for every vertex in `vids`, for
/// one edge type and direction, in the order the sources are given.
///
/// A single read guard on the transaction L0 (if any) is shared by all
/// lookups. It is dropped before the read set is updated, since
/// `record_neighbor_reads_batch` locks that buffer again itself.
pub fn get_neighbors_batch(
    &self,
    vids: &[Vid],
    edge_type: u32,
    direction: Direction,
) -> Vec<(Vid, Vid, Eid)> {
    let version_hwm = self.storage.snapshot_version_hwm();
    let tx_guard = self.l0_context.transaction_l0.as_ref().map(|l0| l0.read());

    let mut triples: Vec<(Vid, Vid, Eid)> = Vec::new();
    for &src in vids {
        let found = self.neighbors_for_vid(
            src,
            edge_type,
            direction,
            version_hwm,
            tx_guard.as_deref(),
            false,
        );
        for (nbr, eid) in found {
            triples.push((src, nbr, eid));
        }
    }

    drop(tx_guard);
    self.record_neighbor_reads_batch(vids, &triples);
    triples
}
/// Works out the stored `(source, destination)` orientation of edge `eid`,
/// which a traversal reached going from `traversal_src` to `traversal_dst`.
///
/// Resolution order:
/// 1. If L0 visibility knows the edge's endpoints, those are returned.
/// 2. Otherwise the outgoing adjacency of `traversal_src` is probed, then that
///    of `traversal_dst`, over `edge_type_ids` (or, when that slice is empty,
///    over every edge type id the adjacency manager knows). The vertex whose
///    outgoing list contains `eid` is the source.
/// 3. If neither lists the edge, the traversal orientation is returned as is.
///
/// Adjacency is read at the snapshot version high-water mark when there is
/// one, and from the adjacency manager directly otherwise.
#[must_use]
pub fn resolve_stored_edge_endpoints(
    &self,
    eid: Eid,
    traversal_src: Vid,
    traversal_dst: Vid,
    edge_type_ids: &[u32],
) -> (u64, u64) {
    let query_ctx = self.query_context();
    let from_l0 = uni_store::runtime::l0_visibility::get_edge_endpoints(eid, &query_ctx);
    if let Some((src, dst)) = from_l0 {
        return (src.as_u64(), dst.as_u64());
    }

    let adjacency_manager = self.adjacency_manager();
    // Declared up front so the fallback list outlives the borrowed slice.
    let all_known_types: Vec<u32>;
    let probe_types: &[u32] = if !edge_type_ids.is_empty() {
        edge_type_ids
    } else {
        all_known_types = adjacency_manager.known_edge_type_ids();
        all_known_types.as_slice()
    };

    let version_hwm = self.storage.snapshot_version_hwm();
    let has_outgoing_edge = |vid: Vid| -> bool {
        probe_types.iter().any(|&etype| {
            let outgoing = if let Some(hwm) = version_hwm {
                self.storage
                    .get_neighbors_at_version(vid, etype, Direction::Outgoing, hwm)
            } else {
                adjacency_manager.get_neighbors(vid, etype, Direction::Outgoing)
            };
            outgoing.iter().any(|&(_, candidate)| candidate == eid)
        })
    };

    // Only flip when the traversal source does not own the edge but the
    // traversal destination does; every other case keeps traversal order.
    let stored_reversed = !has_outgoing_edge(traversal_src) && has_outgoing_edge(traversal_dst);
    if stored_reversed {
        (traversal_dst.as_u64(), traversal_src.as_u64())
    } else {
        (traversal_src.as_u64(), traversal_dst.as_u64())
    }
}
/// Looks up the neighbors of one vertex.
///
/// With a snapshot version (`version_hwm` is `Some`) the result comes from
/// `get_neighbors_at_version` and the transaction buffer is not overlaid.
/// Without one, the adjacency manager is queried and, if `tx_guard` is
/// supplied, the transaction L0 is overlaid on the result.
///
/// When `record_reads` is set, the source and result ids are added to the
/// read set through `record_neighbor_reads`.
///
/// NOTE(review): `record_neighbor_reads` takes its own read lock on the
/// transaction L0. If `tx_guard` borrows from a guard on that same lock that
/// is still alive, enabling `record_reads` nests two read acquisitions on one
/// thread. Confirm this is acceptable for the lock in use.
fn neighbors_for_vid(
    &self,
    vid: Vid,
    edge_type: u32,
    direction: Direction,
    version_hwm: Option<u64>,
    tx_guard: Option<&L0Buffer>,
    record_reads: bool,
) -> Vec<(Vid, Eid)> {
    let mut neighbors = match version_hwm {
        Some(hwm) => self
            .storage
            .get_neighbors_at_version(vid, edge_type, direction, hwm),
        None => self
            .adjacency_manager()
            .get_neighbors(vid, edge_type, direction),
    };

    // The overlay only applies to non-snapshot reads inside a transaction.
    if let (None, Some(tx_l0)) = (version_hwm, tx_guard) {
        overlay_l0_neighbors(
            vid,
            edge_type,
            direction,
            tx_l0,
            &mut neighbors,
            version_hwm,
        );
    }

    if record_reads {
        self.record_neighbor_reads(vid, &neighbors);
    }
    neighbors
}
/// Adds `src`, every neighbor vertex and every edge in `neighbors` to the
/// transaction's read set.
///
/// Does nothing when there is no transaction L0 or when that buffer has no
/// `occ_read_set`. Takes a read lock on the transaction L0 for the duration.
fn record_neighbor_reads(&self, src: Vid, neighbors: &[(Vid, Eid)]) {
    let Some(tx_l0) = self.l0_context.transaction_l0.as_ref() else {
        return;
    };
    let buffer = tx_l0.read();
    let Some(read_set) = buffer.occ_read_set.as_ref() else {
        return;
    };

    let mut tracked = read_set.lock();
    tracked.vertices.insert(src);
    for &(nbr, eid) in neighbors {
        tracked.vertices.insert(nbr);
        tracked.edges.insert(eid);
    }
}
/// Batch form of `record_neighbor_reads`: adds every source vertex plus the
/// neighbor vertex and edge of every triple to the transaction's read set.
///
/// Returns early when both inputs are empty, when there is no transaction L0,
/// or when that buffer has no `occ_read_set`. The source component of each
/// triple is ignored; sources are taken from `srcs`.
fn record_neighbor_reads_batch(&self, srcs: &[Vid], triples: &[(Vid, Vid, Eid)]) {
    let nothing_to_record = srcs.is_empty() && triples.is_empty();
    if nothing_to_record {
        return;
    }
    let Some(tx_l0) = self.l0_context.transaction_l0.as_ref() else {
        return;
    };
    let buffer = tx_l0.read();
    let Some(read_set) = buffer.occ_read_set.as_ref() else {
        return;
    };

    let mut tracked = read_set.lock();
    for &src in srcs {
        tracked.vertices.insert(src);
    }
    for &(_, nbr, eid) in triples {
        tracked.vertices.insert(nbr);
        tracked.edges.insert(eid);
    }
}
/// Adds the ids found in selected columns of `batch` to the transaction's
/// read set: `vertex_cols` are read as vertex ids, `edge_cols` as edge ids.
///
/// Only columns that downcast to `UInt64Array` contribute; other columns are
/// skipped, as are null slots. Nothing happens when both index lists are
/// empty, when there is no transaction L0, or when that buffer has no
/// `occ_read_set`. Column indices are passed to `RecordBatch::column`
/// unchecked, so callers are expected to supply indices valid for `batch`.
pub(crate) fn record_batch_ids(
    &self,
    batch: &arrow_array::RecordBatch,
    vertex_cols: &[usize],
    edge_cols: &[usize],
) {
    use arrow_array::{Array, UInt64Array};

    let nothing_requested = vertex_cols.is_empty() && edge_cols.is_empty();
    if nothing_requested {
        return;
    }
    let Some(tx_l0) = self.l0_context.transaction_l0.as_ref() else {
        return;
    };
    let buffer = tx_l0.read();
    let Some(read_set) = buffer.occ_read_set.as_ref() else {
        return;
    };
    let mut tracked = read_set.lock();

    for &col in vertex_cols {
        let Some(ids) = batch.column(col).as_any().downcast_ref::<UInt64Array>() else {
            continue;
        };
        for row in (0..ids.len()).filter(|&row| !ids.is_null(row)) {
            tracked.vertices.insert(Vid::from(ids.value(row)));
        }
    }

    for &col in edge_cols {
        let Some(ids) = batch.column(col).as_any().downcast_ref::<UInt64Array>() else {
            continue;
        };
        for row in (0..ids.len()).filter(|&row| !ids.is_null(row)) {
            tracked.edges.insert(Eid::from(ids.value(row)));
        }
    }
}
}
/// Applies the edges of one L0 buffer on top of an existing neighbor list.
///
/// `neighbors` is re-keyed by edge id, then for each simple direction covered
/// by `direction` the buffer's entries for `vid`/`edge_type` are merged in:
/// entries newer than `version_hwm` (when given) are ignored, tombstoned edges
/// are removed, and all others are inserted or overwrite the existing entry.
/// Finally every edge id present in `l0.tombstones` is removed. The list is
/// rebuilt from a `HashMap`, so its resulting order is unspecified.
fn overlay_l0_neighbors(
    vid: Vid,
    edge_type: u32,
    direction: Direction,
    l0: &L0Buffer,
    neighbors: &mut Vec<(Vid, Eid)>,
    version_hwm: Option<u64>,
) {
    use std::collections::HashMap;

    let mut by_edge: HashMap<Eid, Vid> = std::mem::take(neighbors)
        .into_iter()
        .map(|(nbr, eid)| (eid, nbr))
        .collect();

    for &simple_dir in direction.to_simple_directions() {
        for (nbr, eid, version) in l0.get_neighbors(vid, edge_type, simple_dir) {
            let newer_than_snapshot = matches!(version_hwm, Some(hwm) if version > hwm);
            if newer_than_snapshot {
                continue;
            }
            if !l0.is_tombstoned(eid) {
                by_edge.insert(eid, nbr);
            } else {
                by_edge.remove(&eid);
            }
        }
    }

    // Tombstones also hide edges that came from the base list only.
    for dead in l0.tombstones.keys() {
        by_edge.remove(dead);
    }

    *neighbors = by_edge.into_iter().map(|(eid, nbr)| (nbr, eid)).collect();
}
/// Module-private extension trait that converts a storage direction into the
/// `simple_graph` directions it covers.
trait DirectionExt {
/// Returns the `simple_graph::Direction` values that correspond to `self`,
/// as a static slice.
fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction];
}
impl DirectionExt for Direction {
    /// `Outgoing` and `Incoming` map to their single counterpart; `Both` maps
    /// to outgoing followed by incoming.
    fn to_simple_directions(&self) -> &'static [uni_common::graph::simple_graph::Direction] {
        use uni_common::graph::simple_graph::Direction as Simple;

        const OUT_ONLY: &[Simple] = &[Simple::Outgoing];
        const IN_ONLY: &[Simple] = &[Simple::Incoming];
        const OUT_THEN_IN: &[Simple] = &[Simple::Outgoing, Simple::Incoming];

        match *self {
            Direction::Both => OUT_THEN_IN,
            Direction::Incoming => IN_ONLY,
            Direction::Outgoing => OUT_ONLY,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty context must reference no buffer of any kind.
    #[test]
    fn test_l0_context_empty() {
        let L0Context {
            current_l0,
            transaction_l0,
            pending_flush_l0s,
        } = L0Context::empty();

        assert!(current_l0.is_none());
        assert!(transaction_l0.is_none());
        assert!(pending_flush_l0s.is_empty());
    }
}