Skip to main content

uni_query/query/executor/
procedure_host.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Concrete [`ProcedureHost`] implementation backed by a snapshot of
//! [`GraphExecutionContext`].
//!
//! `ProcedurePlugin` impls living in `crates/uni-query/src/procedures_plugin/`
//! (host-coupled plugins for `uni.schema.*`, `uni.vector.*`, `uni.fts.*`,
//! `uni.search`, `uni.algo.*`) downcast a `&dyn ProcedureHost` to
//! [`QueryProcedureHost`] to reach the storage, schema, algorithm
//! registry, L0 visibility, query context, deadline, and other host
//! facilities that the `uni-plugin` ABI cannot expose without a cyclic
//! dependency. This is the interim bridge while the proposal-spec
//! `session` / `tx` plumbing waits on the M6 ABI freeze.
15
16use std::any::Any;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Instant;
21
22use arrow_schema::SchemaRef;
23use tokio_util::sync::CancellationToken;
24use uni_algo::algo::AlgorithmRegistry;
25use uni_plugin::traits::procedure::{ProcedureHost, ProcedureMode};
26use uni_store::runtime::context::QueryContext;
27use uni_store::runtime::property_manager::PropertyManager;
28use uni_store::runtime::writer::Writer;
29use uni_store::storage::manager::StorageManager;
30use uni_xervo::runtime::ModelRuntime;
31
32use crate::query::df_graph::{GraphExecutionContext, L0Context};
33use crate::query::executor::procedure::ProcedureRegistry;
34
35// Rust guideline compliant
36
37/// Host facade exposing a snapshot of [`GraphExecutionContext`] to
38/// in-tree procedure plugins.
39///
40/// Built-in host-coupled procedures invoked via `CALL uni.X` receive a
41/// `ProcedureContext` whose `host` field points at a `QueryProcedureHost`
42/// constructed by the dispatch sites
43/// (`procedure_call::execute_plugin_procedure` and
44/// `executor::procedure`). Plugins downcast to recover the concrete
45/// type, then call the typed accessors below.
46///
47/// All fields are owned (Arc-shared) rather than borrowed, so the host
48/// is `'static`-friendly — which is the constraint
49/// [`std::any::Any`] imposes for downcasting. Construction is a small
50/// number of Arc-clones; the per-call cost is negligible.
51#[derive(Clone)]
52pub struct QueryProcedureHost {
53    storage: Arc<StorageManager>,
54    algo_registry: Option<Arc<AlgorithmRegistry>>,
55    procedure_registry: Option<Arc<ProcedureRegistry>>,
56    xervo_runtime: Option<Arc<ModelRuntime>>,
57    property_manager: Option<Arc<PropertyManager>>,
58    l0_context: L0Context,
59    deadline: Option<Instant>,
60    cancellation_token: Option<CancellationToken>,
61    /// Per-request projection map: output variable name → requested
62    /// property names. Populated from the surrounding query's plan in
63    /// `procedure_call.rs::execute_plugin_procedure`. Empty if the
64    /// procedure is invoked without surrounding projection context
65    /// (simple-executor path).
66    target_properties: HashMap<String, Vec<String>>,
67    /// Per-request YIELD list: `(yield_name, alias)`. Search procedures
68    /// (`uni.vector.query` / `.fts.query` / `.search`) need this to
69    /// expand `node` yields into the planner-expected
70    /// `{alias}._vid` / `{alias}` / `{alias}._labels` / `{alias}.X`
71    /// column shape; other plugins ignore it.
72    yield_items: Vec<(String, Option<String>)>,
73    /// Per-request planner-expected output schema. When the plugin
74    /// produces a batch whose schema matches this, the dispatcher
75    /// passes it through without reprojection.
76    expected_schema: Option<SchemaRef>,
77    /// Monotonic per-query counter feeding `allocate_transient_id`
78    /// (M5g). Shared across `Clone`s of the host so all dispatches
79    /// within the same procedure invocation draw from one stream.
80    /// Bottom 63 bits become a `Vid`/`Eid` after OR-ing with
81    /// `Vid::EPHEMERAL_BIT`.
82    transient_counter: Arc<AtomicU64>,
83    /// Outer transaction's writer handle, threaded through when the
84    /// host is constructed inside a write transaction. Required for
85    /// `Write`/`Schema`/`Dbms`-mode invocations of
86    /// [`Self::execute_inner_query`]; when `None`, write-mode inner
87    /// queries fail with a clear "no writer available" error.
88    writer: Option<Arc<Writer>>,
89}
90
91impl QueryProcedureHost {
92    /// Snapshot the host-shaped components of `graph_ctx`. The
93    /// per-request fields start empty; use
94    /// [`Self::from_graph_ctx_with_request`] when the surrounding query
95    /// has projection / yield context.
96    #[must_use]
97    pub fn from_graph_ctx(graph_ctx: &GraphExecutionContext) -> Self {
98        Self::from_graph_ctx_with_request(graph_ctx, HashMap::new(), Vec::new(), None)
99    }
100
101    /// Snapshot the host-shaped components of `graph_ctx` along with
102    /// the per-request projection map, YIELD list, and planner-expected
103    /// output schema. Used by the DataFusion procedure dispatcher
104    /// (`procedure_call.rs::execute_plugin_procedure`) to feed search
105    /// procedures (`uni.vector.query` etc.) everything they need to
106    /// expand `node` yields into the planner-expected column shape.
107    #[must_use]
108    pub fn from_graph_ctx_with_request(
109        graph_ctx: &GraphExecutionContext,
110        target_properties: HashMap<String, Vec<String>>,
111        yield_items: Vec<(String, Option<String>)>,
112        expected_schema: Option<SchemaRef>,
113    ) -> Self {
114        Self {
115            storage: Arc::clone(graph_ctx.storage()),
116            algo_registry: graph_ctx.algo_registry().cloned(),
117            procedure_registry: graph_ctx.procedure_registry().cloned(),
118            xervo_runtime: graph_ctx.xervo_runtime().cloned(),
119            property_manager: Some(Arc::clone(graph_ctx.property_manager())),
120            l0_context: graph_ctx.l0_context().clone(),
121            deadline: graph_ctx.deadline_for_host(),
122            cancellation_token: graph_ctx.cancellation_token_for_host(),
123            target_properties,
124            yield_items,
125            expected_schema,
126            transient_counter: Arc::new(AtomicU64::new(0)),
127            writer: None,
128        }
129    }
130
131    /// Construct a host from raw components (used by the simple
132    /// executor, which holds these directly rather than via a
133    /// `GraphExecutionContext`).
134    #[must_use]
135    pub fn from_components(
136        storage: Arc<StorageManager>,
137        algo_registry: Option<Arc<AlgorithmRegistry>>,
138        procedure_registry: Option<Arc<ProcedureRegistry>>,
139    ) -> Self {
140        Self {
141            storage,
142            algo_registry,
143            procedure_registry,
144            xervo_runtime: None,
145            property_manager: None,
146            l0_context: L0Context::empty(),
147            deadline: None,
148            cancellation_token: None,
149            target_properties: HashMap::new(),
150            yield_items: Vec::new(),
151            expected_schema: None,
152            transient_counter: Arc::new(AtomicU64::new(0)),
153            writer: None,
154        }
155    }
156
157    /// Attach the outer transaction's writer handle to this host.
158    ///
159    /// Required for `Write`/`Schema`/`Dbms`-mode invocations of
160    /// [`Self::execute_inner_query`]. Call sites that construct a host
161    /// inside a write transaction should thread the writer through; the
162    /// inner-query path otherwise has no path to mutate the graph.
163    #[must_use]
164    pub fn with_writer(mut self, writer: Arc<Writer>) -> Self {
165        self.writer = Some(writer);
166        self
167    }
168
169    /// Allocate a fresh transient id, unique within this host's
170    /// lifetime. Wraps the bottom 63 bits and OR-s in the ephemeral
171    /// bit before returning. Use `Vid::ephemeral` / `Eid::ephemeral`
172    /// when you want the typed `Vid` / `Eid` form.
173    ///
174    /// Always available — no capability is required. Per proposal
175    /// §4.13.1, IDs are stable only within a single query execution.
176    #[must_use]
177    pub fn allocate_transient_id(&self) -> u64 {
178        // Bottom 63 bits only (mask in case of wraparound on a long
179        // run); the high bit is OR'd by `Vid::ephemeral` / `Eid::ephemeral`.
180        self.transient_counter.fetch_add(1, Ordering::Relaxed) & !(1u64 << 63)
181    }
182
183    /// Storage manager — schema, datasets, vector / fts search.
184    #[must_use]
185    pub fn storage(&self) -> &Arc<StorageManager> {
186        &self.storage
187    }
188
189    /// Algorithm registry, if the host wired one in.
190    #[must_use]
191    pub fn algo_registry(&self) -> Option<&Arc<AlgorithmRegistry>> {
192        self.algo_registry.as_ref()
193    }
194
195    /// Procedure registry, if the host wired one in.
196    #[must_use]
197    pub fn procedure_registry(&self) -> Option<&Arc<ProcedureRegistry>> {
198        self.procedure_registry.as_ref()
199    }
200
201    /// Uni-Xervo runtime for query-time auto-embedding, if wired.
202    #[must_use]
203    pub fn xervo_runtime(&self) -> Option<&Arc<ModelRuntime>> {
204        self.xervo_runtime.as_ref()
205    }
206
207    /// Property manager for lazy property loading, if the host wired
208    /// one in. Returns `None` on the simple-executor path
209    /// (`from_components` does not have access to it).
210    #[must_use]
211    pub fn property_manager(&self) -> Option<&Arc<PropertyManager>> {
212        self.property_manager.as_ref()
213    }
214
215    /// Per-request projection map (output variable name → requested
216    /// property names). Empty unless the host was constructed via
217    /// [`Self::from_graph_ctx_with_request`] with non-empty data.
218    #[must_use]
219    pub fn target_properties(&self) -> &HashMap<String, Vec<String>> {
220        &self.target_properties
221    }
222
223    /// Per-request YIELD list as `(yield_name, alias)` pairs.
224    #[must_use]
225    pub fn yield_items(&self) -> &[(String, Option<String>)] {
226        &self.yield_items
227    }
228
229    /// Planner-expected output schema. Used by search procedures to
230    /// emit columns matching the schema the surrounding query plan
231    /// expects, avoiding a name-mismatch reprojection step in the
232    /// dispatcher.
233    #[must_use]
234    pub fn expected_schema(&self) -> Option<&SchemaRef> {
235        self.expected_schema.as_ref()
236    }
237
238    /// L0 visibility context (current / pending / transaction buffers).
239    #[must_use]
240    pub fn l0_context(&self) -> &L0Context {
241        &self.l0_context
242    }
243
244    /// Build a `QueryContext` for property-manager calls. Mirrors
245    /// [`GraphExecutionContext::query_context`].
246    #[must_use]
247    pub fn query_context(&self) -> QueryContext {
248        use parking_lot::RwLock;
249        use uni_store::runtime::l0::L0Buffer;
250
251        let l0 = self
252            .l0_context
253            .current_l0
254            .clone()
255            .unwrap_or_else(|| Arc::new(RwLock::new(L0Buffer::new(0, None))));
256        let mut ctx = QueryContext::new_with_pending(
257            l0,
258            self.l0_context.transaction_l0.clone(),
259            self.l0_context.pending_flush_l0s.clone(),
260        );
261        if let Some(deadline) = self.deadline {
262            ctx.set_deadline(deadline);
263        }
264        ctx
265    }
266
267    /// Run an inner Cypher query against the same storage / L0
268    /// snapshot the outer procedure sees, returning the materialised
269    /// row vector.
270    ///
271    /// Used by:
272    ///
273    /// - the V2 algorithm adapter (M5c.3) to materialise
274    ///   `ProjectionInput::Cypher { node_query, edge_query, ... }`;
275    /// - the meta-plugin persistence backend (M9 cutover) to issue
276    ///   `MERGE (:_DeclaredPlugin {...})` through Cypher;
277    /// - the synthetic-procedure plugin (M9 cutover) to evaluate the
278    ///   stored body of a `CALL uni.plugin.declareProcedure(...)`.
279    ///
280    /// `mode` controls which Cypher operations are accepted:
281    ///
282    /// - [`ProcedureMode::Read`] constructs the inner executor without
283    ///   a writer; mutation clauses (`CREATE`, `SET`, `MERGE`,
284    ///   `DELETE`, `REMOVE`) fail with "Database is in read-only mode".
285    /// - [`ProcedureMode::Write`] / [`ProcedureMode::Schema`] /
286    ///   [`ProcedureMode::Dbms`] construct the inner executor with the
287    ///   outer transaction's writer handle (set via
288    ///   [`Self::with_writer`]); mutations land in the outer
289    ///   transaction's L0 buffer. If no writer was threaded through,
290    ///   write-mode invocations error with `"inner write requires a
291    ///   writer-enabled procedure host"`.
292    ///
293    /// L0 visibility mirrors the outer query's snapshot
294    /// (`l0_context.current_l0` / `transaction_l0` /
295    /// `pending_flush_l0s`) so recently-written rows are visible. The
296    /// `PropertyManager` is reused from the outer host when present;
297    /// otherwise a fresh per-call one is constructed.
298    ///
299    /// `params` are bound into the inner executor by name, exactly as
300    /// `session.query(cypher, params)` would for a top-level Cypher
301    /// query.
302    ///
303    /// # Errors
304    ///
305    /// Returns any parse / plan / execution error from the inner
306    /// query. Write-attempt errors in `Read` mode propagate as the
307    /// host's "Database is in read-only mode" string. Write-mode
308    /// invocations without a writer attached return a clear error
309    /// rather than silently downgrading to read-only.
310    pub async fn execute_inner_query(
311        &self,
312        cypher: &str,
313        params: &HashMap<String, uni_common::Value>,
314        mode: ProcedureMode,
315    ) -> anyhow::Result<Vec<HashMap<String, uni_common::Value>>> {
316        use uni_store::runtime::l0_manager::L0Manager;
317        use uni_store::runtime::property_manager::PropertyManager as PM;
318
319        use crate::query::executor::Executor;
320        use crate::query::planner::QueryPlanner;
321
322        let needs_writer = !matches!(mode, ProcedureMode::Read);
323        let mut executor = if needs_writer {
324            let writer = self.writer.as_ref().ok_or_else(|| {
325                anyhow::anyhow!(
326                    "inner write requires a writer-enabled procedure host \
327                     (mode = {mode:?}); call QueryProcedureHost::with_writer \
328                     at construction time"
329                )
330            })?;
331            Executor::new_with_writer(Arc::clone(&self.storage), Arc::clone(writer))
332        } else {
333            Executor::new(Arc::clone(&self.storage))
334        };
335
336        // Mirror outer L0 visibility into the inner executor.
337        if let Some(current) = self.l0_context.current_l0.as_ref() {
338            let mut pending = self.l0_context.pending_flush_l0s.clone();
339            if let Some(tx_l0) = &self.l0_context.transaction_l0 {
340                pending.push(tx_l0.clone());
341            }
342            executor.l0_manager =
343                Some(Arc::new(L0Manager::from_snapshot(current.clone(), pending)));
344        }
345
346        let schema_manager_arc = self.storage.schema_manager_arc();
347        let schema = self.storage.schema_manager().schema();
348        let planner = QueryPlanner::new(schema);
349        let ast = uni_cypher::parse(cypher)?;
350        let plan = planner.plan(ast)?;
351
352        let prop_manager = if let Some(pm) = &self.property_manager {
353            Arc::clone(pm)
354        } else {
355            Arc::new(PM::new(Arc::clone(&self.storage), schema_manager_arc, 100))
356        };
357
358        executor.execute(plan, &prop_manager, params).await
359    }
360
361    /// Check whether the query has timed out or been cancelled.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if the deadline has passed or the cancellation
366    /// token has been triggered.
367    pub fn check_timeout(&self) -> anyhow::Result<()> {
368        if let Some(ref token) = self.cancellation_token
369            && token.is_cancelled()
370        {
371            return Err(anyhow::anyhow!("Query cancelled"));
372        }
373        if let Some(deadline) = self.deadline
374            && Instant::now() > deadline
375        {
376            return Err(anyhow::anyhow!("Query timed out"));
377        }
378        Ok(())
379    }
380}
381
382impl ProcedureHost for QueryProcedureHost {
383    fn as_any(&self) -> &dyn Any {
384        self
385    }
386}