uni_query/query/executor/procedure_host.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Concrete [`ProcedureHost`] implementation backed by a snapshot of
//! [`GraphExecutionContext`].
//!
//! `ProcedurePlugin` impls living in `crates/uni-query/src/procedures_plugin/`
//! (host-coupled plugins for `uni.schema.*`, `uni.vector.*`, `uni.fts.*`,
//! `uni.search`, `uni.algo.*`) downcast a `&dyn ProcedureHost` to
//! [`QueryProcedureHost`] to reach the storage, schema, algorithm
//! registry, L0 visibility, query context, deadline, and other host
//! facilities that the `uni-plugin` ABI cannot expose without a cyclic
//! dependency. This is the interim bridge while the proposal-spec
//! `session` / `tx` plumbing waits on the M6 ABI freeze.
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Instant;
21
22use arrow_schema::SchemaRef;
23use tokio_util::sync::CancellationToken;
24use uni_algo::algo::AlgorithmRegistry;
25use uni_plugin::traits::procedure::{ProcedureHost, ProcedureMode};
26use uni_store::runtime::context::QueryContext;
27use uni_store::runtime::property_manager::PropertyManager;
28use uni_store::runtime::writer::Writer;
29use uni_store::storage::manager::StorageManager;
30use uni_xervo::runtime::ModelRuntime;
31
32use crate::query::df_graph::{GraphExecutionContext, L0Context};
33use crate::query::executor::procedure::ProcedureRegistry;
34
// Rust guideline compliant

37/// Host facade exposing a snapshot of [`GraphExecutionContext`] to
38/// in-tree procedure plugins.
39///
40/// Built-in host-coupled procedures invoked via `CALL uni.X` receive a
41/// `ProcedureContext` whose `host` field points at a `QueryProcedureHost`
42/// constructed by the dispatch sites
43/// (`procedure_call::execute_plugin_procedure` and
44/// `executor::procedure`). Plugins downcast to recover the concrete
45/// type, then call the typed accessors below.
46///
47/// All fields are owned (Arc-shared) rather than borrowed, so the host
48/// is `'static`-friendly — which is the constraint
49/// [`std::any::Any`] imposes for downcasting. Construction is a small
50/// number of Arc-clones; the per-call cost is negligible.
51#[derive(Clone)]
52pub struct QueryProcedureHost {
53 storage: Arc<StorageManager>,
54 algo_registry: Option<Arc<AlgorithmRegistry>>,
55 procedure_registry: Option<Arc<ProcedureRegistry>>,
56 xervo_runtime: Option<Arc<ModelRuntime>>,
57 property_manager: Option<Arc<PropertyManager>>,
58 l0_context: L0Context,
59 deadline: Option<Instant>,
60 cancellation_token: Option<CancellationToken>,
61 /// Per-request projection map: output variable name → requested
62 /// property names. Populated from the surrounding query's plan in
63 /// `procedure_call.rs::execute_plugin_procedure`. Empty if the
64 /// procedure is invoked without surrounding projection context
65 /// (simple-executor path).
66 target_properties: HashMap<String, Vec<String>>,
67 /// Per-request YIELD list: `(yield_name, alias)`. Search procedures
68 /// (`uni.vector.query` / `.fts.query` / `.search`) need this to
69 /// expand `node` yields into the planner-expected
70 /// `{alias}._vid` / `{alias}` / `{alias}._labels` / `{alias}.X`
71 /// column shape; other plugins ignore it.
72 yield_items: Vec<(String, Option<String>)>,
73 /// Per-request planner-expected output schema. When the plugin
74 /// produces a batch whose schema matches this, the dispatcher
75 /// passes it through without reprojection.
76 expected_schema: Option<SchemaRef>,
77 /// Monotonic per-query counter feeding `allocate_transient_id`
78 /// (M5g). Shared across `Clone`s of the host so all dispatches
79 /// within the same procedure invocation draw from one stream.
80 /// Bottom 63 bits become a `Vid`/`Eid` after OR-ing with
81 /// `Vid::EPHEMERAL_BIT`.
82 transient_counter: Arc<AtomicU64>,
83 /// Outer transaction's writer handle, threaded through when the
84 /// host is constructed inside a write transaction. Required for
85 /// `Write`/`Schema`/`Dbms`-mode invocations of
86 /// [`Self::execute_inner_query`]; when `None`, write-mode inner
87 /// queries fail with a clear "no writer available" error.
88 writer: Option<Arc<Writer>>,
89}
90
91impl QueryProcedureHost {
92 /// Snapshot the host-shaped components of `graph_ctx`. The
93 /// per-request fields start empty; use
94 /// [`Self::from_graph_ctx_with_request`] when the surrounding query
95 /// has projection / yield context.
96 #[must_use]
97 pub fn from_graph_ctx(graph_ctx: &GraphExecutionContext) -> Self {
98 Self::from_graph_ctx_with_request(graph_ctx, HashMap::new(), Vec::new(), None)
99 }
100
101 /// Snapshot the host-shaped components of `graph_ctx` along with
102 /// the per-request projection map, YIELD list, and planner-expected
103 /// output schema. Used by the DataFusion procedure dispatcher
104 /// (`procedure_call.rs::execute_plugin_procedure`) to feed search
105 /// procedures (`uni.vector.query` etc.) everything they need to
106 /// expand `node` yields into the planner-expected column shape.
107 #[must_use]
108 pub fn from_graph_ctx_with_request(
109 graph_ctx: &GraphExecutionContext,
110 target_properties: HashMap<String, Vec<String>>,
111 yield_items: Vec<(String, Option<String>)>,
112 expected_schema: Option<SchemaRef>,
113 ) -> Self {
114 Self {
115 storage: Arc::clone(graph_ctx.storage()),
116 algo_registry: graph_ctx.algo_registry().cloned(),
117 procedure_registry: graph_ctx.procedure_registry().cloned(),
118 xervo_runtime: graph_ctx.xervo_runtime().cloned(),
119 property_manager: Some(Arc::clone(graph_ctx.property_manager())),
120 l0_context: graph_ctx.l0_context().clone(),
121 deadline: graph_ctx.deadline_for_host(),
122 cancellation_token: graph_ctx.cancellation_token_for_host(),
123 target_properties,
124 yield_items,
125 expected_schema,
126 transient_counter: Arc::new(AtomicU64::new(0)),
127 writer: None,
128 }
129 }
130
131 /// Construct a host from raw components (used by the simple
132 /// executor, which holds these directly rather than via a
133 /// `GraphExecutionContext`).
134 #[must_use]
135 pub fn from_components(
136 storage: Arc<StorageManager>,
137 algo_registry: Option<Arc<AlgorithmRegistry>>,
138 procedure_registry: Option<Arc<ProcedureRegistry>>,
139 ) -> Self {
140 Self {
141 storage,
142 algo_registry,
143 procedure_registry,
144 xervo_runtime: None,
145 property_manager: None,
146 l0_context: L0Context::empty(),
147 deadline: None,
148 cancellation_token: None,
149 target_properties: HashMap::new(),
150 yield_items: Vec::new(),
151 expected_schema: None,
152 transient_counter: Arc::new(AtomicU64::new(0)),
153 writer: None,
154 }
155 }
156
157 /// Attach the outer transaction's writer handle to this host.
158 ///
159 /// Required for `Write`/`Schema`/`Dbms`-mode invocations of
160 /// [`Self::execute_inner_query`]. Call sites that construct a host
161 /// inside a write transaction should thread the writer through; the
162 /// inner-query path otherwise has no path to mutate the graph.
163 #[must_use]
164 pub fn with_writer(mut self, writer: Arc<Writer>) -> Self {
165 self.writer = Some(writer);
166 self
167 }
168
169 /// Allocate a fresh transient id, unique within this host's
170 /// lifetime. Wraps the bottom 63 bits and OR-s in the ephemeral
171 /// bit before returning. Use `Vid::ephemeral` / `Eid::ephemeral`
172 /// when you want the typed `Vid` / `Eid` form.
173 ///
174 /// Always available — no capability is required. Per proposal
175 /// §4.13.1, IDs are stable only within a single query execution.
176 #[must_use]
177 pub fn allocate_transient_id(&self) -> u64 {
178 // Bottom 63 bits only (mask in case of wraparound on a long
179 // run); the high bit is OR'd by `Vid::ephemeral` / `Eid::ephemeral`.
180 self.transient_counter.fetch_add(1, Ordering::Relaxed) & !(1u64 << 63)
181 }
182
183 /// Storage manager — schema, datasets, vector / fts search.
184 #[must_use]
185 pub fn storage(&self) -> &Arc<StorageManager> {
186 &self.storage
187 }
188
189 /// Algorithm registry, if the host wired one in.
190 #[must_use]
191 pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
192 self.algo_registry.as_ref()
193 }
194
195 /// Procedure registry, if the host wired one in.
196 #[must_use]
197 pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
198 self.procedure_registry.as_ref()
199 }
200
201 /// Uni-Xervo runtime for query-time auto-embedding, if wired.
202 #[must_use]
203 pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
204 self.xervo_runtime.as_ref()
205 }
206
207 /// Property manager for lazy property loading, if the host wired
208 /// one in. Returns `None` on the simple-executor path
209 /// (`from_components` does not have access to it).
210 #[must_use]
211 pub fn property_manager(&self) -> Option<&Arc<PropertyManager>> {
212 self.property_manager.as_ref()
213 }
214
215 /// Per-request projection map (output variable name → requested
216 /// property names). Empty unless the host was constructed via
217 /// [`Self::from_graph_ctx_with_request`] with non-empty data.
218 #[must_use]
219 pub fn target_properties(&self) -> &HashMap<String, Vec<String>> {
220 &self.target_properties
221 }
222
223 /// Per-request YIELD list as `(yield_name, alias)` pairs.
224 #[must_use]
225 pub fn yield_items(&self) -> &[(String, Option<String>)] {
226 &self.yield_items
227 }
228
229 /// Planner-expected output schema. Used by search procedures to
230 /// emit columns matching the schema the surrounding query plan
231 /// expects, avoiding a name-mismatch reprojection step in the
232 /// dispatcher.
233 #[must_use]
234 pub fn expected_schema(&self) -> Option<&SchemaRef> {
235 self.expected_schema.as_ref()
236 }
237
238 /// L0 visibility context (current / pending / transaction buffers).
239 #[must_use]
240 pub fn l0_context(&self) -> &L0Context {
241 &self.l0_context
242 }
243
244 /// Build a `QueryContext` for property-manager calls. Mirrors
245 /// [`GraphExecutionContext::query_context`].
246 #[must_use]
247 pub fn query_context(&self) -> QueryContext {
248 use parking_lot::RwLock;
249 use uni_store::runtime::l0::L0Buffer;
250
251 let l0 = self
252 .l0_context
253 .current_l0
254 .clone()
255 .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
256 let mut ctx = QueryContext::new_with_pending(
257 l0,
258 self.l0_context.transaction_l0.clone(),
259 self.l0_context.pending_flush_l0s.clone(),
260 );
261 if let Some(deadline) = self.deadline {
262 ctx.set_deadline(deadline);
263 }
264 ctx
265 }
266
267 /// Run an inner Cypher query against the same storage / L0
268 /// snapshot the outer procedure sees, returning the materialised
269 /// row vector.
270 ///
271 /// Used by:
272 ///
273 /// - the V2 algorithm adapter (M5c.3) to materialise
274 /// `ProjectionInput::Cypher { node_query, edge_query, ... }`;
275 /// - the meta-plugin persistence backend (M9 cutover) to issue
276 /// `MERGE (:_DeclaredPlugin {...})` through Cypher;
277 /// - the synthetic-procedure plugin (M9 cutover) to evaluate the
278 /// stored body of a `CALL uni.plugin.declareProcedure(...)`.
279 ///
280 /// `mode` controls which Cypher operations are accepted:
281 ///
282 /// - [`ProcedureMode::Read`] constructs the inner executor without
283 /// a writer; mutation clauses (`CREATE`, `SET`, `MERGE`,
284 /// `DELETE`, `REMOVE`) fail with "Database is in read-only mode".
285 /// - [`ProcedureMode::Write`] / [`ProcedureMode::Schema`] /
286 /// [`ProcedureMode::Dbms`] construct the inner executor with the
287 /// outer transaction's writer handle (set via
288 /// [`Self::with_writer`]); mutations land in the outer
289 /// transaction's L0 buffer. If no writer was threaded through,
290 /// write-mode invocations error with `"inner write requires a
291 /// writer-enabled procedure host"`.
292 ///
293 /// L0 visibility mirrors the outer query's snapshot
294 /// (`l0_context.current_l0` / `transaction_l0` /
295 /// `pending_flush_l0s`) so recently-written rows are visible. The
296 /// `PropertyManager` is reused from the outer host when present;
297 /// otherwise a fresh per-call one is constructed.
298 ///
299 /// `params` are bound into the inner executor by name, exactly as
300 /// `session.query(cypher, params)` would for a top-level Cypher
301 /// query.
302 ///
303 /// # Errors
304 ///
305 /// Returns any parse / plan / execution error from the inner
306 /// query. Write-attempt errors in `Read` mode propagate as the
307 /// host's "Database is in read-only mode" string. Write-mode
308 /// invocations without a writer attached return a clear error
309 /// rather than silently downgrading to read-only.
310 pub async fn execute_inner_query(
311 &self,
312 cypher: &str,
313 params: &HashMap<String, uni_common::Value>,
314 mode: ProcedureMode,
315 ) -> anyhow::Result<Vec<HashMap<String, uni_common::Value>>> {
316 use uni_store::runtime::l0_manager::L0Manager;
317 use uni_store::runtime::property_manager::PropertyManager as PM;
318
319 use crate::query::executor::Executor;
320 use crate::query::planner::QueryPlanner;
321
322 let needs_writer = !matches!(mode, ProcedureMode::Read);
323 let mut executor = if needs_writer {
324 let writer = self.writer.as_ref().ok_or_else(|| {
325 anyhow::anyhow!(
326 "inner write requires a writer-enabled procedure host \
327 (mode = {mode:?}); call QueryProcedureHost::with_writer \
328 at construction time"
329 )
330 })?;
331 Executor::new_with_writer(Arc::clone(&self.storage), Arc::clone(writer))
332 } else {
333 Executor::new(Arc::clone(&self.storage))
334 };
335
336 // Mirror outer L0 visibility into the inner executor.
337 if let Some(current) = self.l0_context.current_l0.as_ref() {
338 let mut pending = self.l0_context.pending_flush_l0s.clone();
339 if let Some(tx_l0) = &self.l0_context.transaction_l0 {
340 pending.push(tx_l0.clone());
341 }
342 executor.l0_manager =
343 Some(Arc::new(L0Manager::from_snapshot(current.clone(), pending)));
344 }
345
346 let schema_manager_arc = self.storage.schema_manager_arc();
347 let schema = self.storage.schema_manager().schema();
348 let planner = QueryPlanner::new(schema);
349 let ast = uni_cypher::parse(cypher)?;
350 let plan = planner.plan(ast)?;
351
352 let prop_manager = if let Some(pm) = &self.property_manager {
353 Arc::clone(pm)
354 } else {
355 Arc::new(PM::new(Arc::clone(&self.storage), schema_manager_arc, 100))
356 };
357
358 executor.execute(plan, &prop_manager, params).await
359 }
360
361 /// Check whether the query has timed out or been cancelled.
362 ///
363 /// # Errors
364 ///
365 /// Returns an error if the deadline has passed or the cancellation
366 /// token has been triggered.
367 pub fn check_timeout(&self) -> anyhow::Result<()> {
368 if let Some(ref token) = self.cancellation_token
369 && token.is_cancelled()
370 {
371 return Err(anyhow::anyhow!("Query cancelled"));
372 }
373 if let Some(deadline) = self.deadline
374 && Instant::now() > deadline
375 {
376 return Err(anyhow::anyhow!("Query timed out"));
377 }
378 Ok(())
379 }
380}
381
382impl ProcedureHost for QueryProcedureHost {
383 fn as_any(&self) -> &dyn Any {
384 self
385 }
386}