use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use arrow_schema::SchemaRef;
use tokio_util::sync::CancellationToken;
use uni_algo::algo::AlgorithmRegistry;
use uni_plugin::traits::procedure::{ProcedureHost, ProcedureMode};
use uni_store::runtime::context::QueryContext;
use uni_store::runtime::property_manager::PropertyManager;
use uni_store::runtime::writer::Writer;
use uni_store::storage::manager::StorageManager;
use uni_xervo::runtime::ModelRuntime;
use crate::query::df_graph::{GraphExecutionContext, L0Context};
use crate::query::executor::procedure::ProcedureRegistry;
#[derive(Clone)]
pub struct QueryProcedureHost {
storage: Arc<StorageManager>,
algo_registry: Option<Arc<AlgorithmRegistry>>,
procedure_registry: Option<Arc<ProcedureRegistry>>,
xervo_runtime: Option<Arc<ModelRuntime>>,
property_manager: Option<Arc<PropertyManager>>,
l0_context: L0Context,
deadline: Option<Instant>,
cancellation_token: Option<CancellationToken>,
target_properties: HashMap<String, Vec<String>>,
yield_items: Vec<(String, Option<String>)>,
expected_schema: Option<SchemaRef>,
transient_counter: Arc<AtomicU64>,
writer: Option<Arc<Writer>>,
}
impl QueryProcedureHost {
#[must_use]
pub fn from_graph_ctx(graph_ctx: &GraphExecutionContext) -> Self {
Self::from_graph_ctx_with_request(graph_ctx, HashMap::new(), Vec::new(), None)
}
#[must_use]
pub fn from_graph_ctx_with_request(
graph_ctx: &GraphExecutionContext,
target_properties: HashMap<String, Vec<String>>,
yield_items: Vec<(String, Option<String>)>,
expected_schema: Option<SchemaRef>,
) -> Self {
Self {
storage: Arc::clone(graph_ctx.storage()),
algo_registry: graph_ctx.algo_registry().cloned(),
procedure_registry: graph_ctx.procedure_registry().cloned(),
xervo_runtime: graph_ctx.xervo_runtime().cloned(),
property_manager: Some(Arc::clone(graph_ctx.property_manager())),
l0_context: graph_ctx.l0_context().clone(),
deadline: graph_ctx.deadline_for_host(),
cancellation_token: graph_ctx.cancellation_token_for_host(),
target_properties,
yield_items,
expected_schema,
transient_counter: Arc::new(AtomicU64::new(0)),
writer: None,
}
}
#[must_use]
pub fn from_components(
storage: Arc<StorageManager>,
algo_registry: Option<Arc<AlgorithmRegistry>>,
procedure_registry: Option<Arc<ProcedureRegistry>>,
) -> Self {
Self {
storage,
algo_registry,
procedure_registry,
xervo_runtime: None,
property_manager: None,
l0_context: L0Context::empty(),
deadline: None,
cancellation_token: None,
target_properties: HashMap::new(),
yield_items: Vec::new(),
expected_schema: None,
transient_counter: Arc::new(AtomicU64::new(0)),
writer: None,
}
}
#[must_use]
pub fn with_writer(mut self, writer: Arc<Writer>) -> Self {
self.writer = Some(writer);
self
}
#[must_use]
pub fn allocate_transient_id(&self) -> u64 {
self.transient_counter.fetch_add(1, Ordering::Relaxed) & !(1u64 << 63)
}
#[must_use]
pub fn storage(&self) -> &Arc<StorageManager> {
&self.storage
}
#[must_use]
pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
self.algo_registry.as_ref()
}
#[must_use]
pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
self.procedure_registry.as_ref()
}
#[must_use]
pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
self.xervo_runtime.as_ref()
}
#[must_use]
pub fn property_manager(&self) -> Option<&Arc<PropertyManager>> {
self.property_manager.as_ref()
}
#[must_use]
pub fn target_properties(&self) -> &HashMap<String, Vec<String>> {
&self.target_properties
}
#[must_use]
pub fn yield_items(&self) -> &[(String, Option<String>)] {
&self.yield_items
}
#[must_use]
pub fn expected_schema(&self) -> Option<&SchemaRef> {
self.expected_schema.as_ref()
}
#[must_use]
pub fn l0_context(&self) -> &L0Context {
&self.l0_context
}
#[must_use]
pub fn query_context(&self) -> QueryContext {
use parking_lot::RwLock;
use uni_store::runtime::l0::L0Buffer;
let l0 = self
.l0_context
.current_l0
.clone()
.unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
let mut ctx = QueryContext::new_with_pending(
l0,
self.l0_context.transaction_l0.clone(),
self.l0_context.pending_flush_l0s.clone(),
);
if let Some(deadline) = self.deadline {
ctx.set_deadline(deadline);
}
ctx
}
pub async fn execute_inner_query(
&self,
cypher: &str,
params: &HashMap<String, uni_common::Value>,
mode: ProcedureMode,
) -> anyhow::Result<Vec<HashMap<String, uni_common::Value>>> {
use uni_store::runtime::l0_manager::L0Manager;
use uni_store::runtime::property_manager::PropertyManager as PM;
use crate::query::executor::Executor;
use crate::query::planner::QueryPlanner;
let needs_writer = !matches!(mode, ProcedureMode::Read);
let mut executor = if needs_writer {
let writer = self.writer.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"inner write requires a writer-enabled procedure host \
(mode = {mode:?}); call QueryProcedureHost::with_writer \
at construction time"
)
})?;
Executor::new_with_writer(Arc::clone(&self.storage), Arc::clone(writer))
} else {
Executor::new(Arc::clone(&self.storage))
};
if let Some(current) = self.l0_context.current_l0.as_ref() {
let mut pending = self.l0_context.pending_flush_l0s.clone();
if let Some(tx_l0) = &self.l0_context.transaction_l0 {
pending.push(tx_l0.clone());
}
executor.l0_manager =
Some(Arc::new(L0Manager::from_snapshot(current.clone(), pending)));
}
let schema_manager_arc = self.storage.schema_manager_arc();
let schema = self.storage.schema_manager().schema();
let planner = QueryPlanner::new(schema);
let ast = uni_cypher::parse(cypher)?;
let plan = planner.plan(ast)?;
let prop_manager = if let Some(pm) = &self.property_manager {
Arc::clone(pm)
} else {
Arc::new(PM::new(Arc::clone(&self.storage), schema_manager_arc, 100))
};
executor.execute(plan, &prop_manager, params).await
}
pub fn check_timeout(&self) -> anyhow::Result<()> {
if let Some(ref token) = self.cancellation_token
&& token.is_cancelled()
{
return Err(anyhow::anyhow!("Query cancelled"));
}
if let Some(deadline) = self.deadline
&& Instant::now() > deadline
{
return Err(anyhow::anyhow!("Query timed out"));
}
Ok(())
}
}
impl ProcedureHost for QueryProcedureHost {
fn as_any(&self) -> &dyn Any {
self
}
}