uni_plugin/traits/procedure.rs
1//! Cypher procedure plugins — `CALL ... YIELD ...`.
2//!
3//! Procedures differ from scalar functions in three ways: they can perform
4//! writes, they return streams of rows (`YIELD a, b, c`), and they may
5//! take optional input streams (`CALL ... { } IN TRANSACTIONS OF N`).
6
7use std::any::Any;
8use std::time::Duration;
9
10use arrow_schema::Field;
11use datafusion::execution::SendableRecordBatchStream;
12use datafusion::logical_expr::ColumnarValue;
13use datafusion::scalar::ScalarValue;
14use smol_str::SmolStr;
15
16use crate::capability::SideEffects;
17use crate::errors::FnError;
18use crate::traits::connector::Principal;
19use crate::traits::scalar::ArgType;
20
21/// A Cypher procedure plugin — `CALL uni.foo.bar(args) YIELD ...`.
22///
23/// Procedures return a stream of `RecordBatch`es; the host attaches the
24/// stream to the surrounding query plan via a `ProcedureCallExec` node.
25pub trait ProcedurePlugin: Send + Sync {
26 /// Static signature.
27 fn signature(&self) -> &ProcedureSignature;
28
29 /// Invoke the procedure with the given arguments and execution context.
30 ///
31 /// The returned stream is consumed lazily by downstream `YIELD`. The
32 /// procedure is responsible for cooperatively yielding to the executor
33 /// (no long blocking calls; use `tokio::task::yield_now` between batches).
34 ///
35 /// # Errors
36 ///
37 /// Returns [`FnError`] if the procedure cannot start (validation
38 /// failure, capability check). Errors raised *during* stream production
39 /// are signaled via `Err` items in the stream.
40 fn invoke(
41 &self,
42 ctx: ProcedureContext<'_>,
43 args: &[ColumnarValue],
44 ) -> Result<SendableRecordBatchStream, FnError>;
45}
46
47/// Static signature of a procedure.
48#[derive(Clone, Debug)]
49pub struct ProcedureSignature {
50 /// Named arguments, in declaration order.
51 pub args: Vec<NamedArgType>,
52 /// Schema of the `YIELD` columns.
53 pub yields: Vec<Field>,
54 /// Mode declaration — drives capability requirements.
55 pub mode: ProcedureMode,
56 /// Declared side-effects.
57 pub side_effects: SideEffects,
58 /// Optional retry contract for atomic / CAS-style procedures.
59 pub retry_contract: Option<RetryContract>,
60 /// Optional batch-input shape for `CALL { } IN TRANSACTIONS OF N`.
61 pub batch_input: Option<BatchInputShape>,
62 /// Markdown docs surfaced via `uni.plugin.help`.
63 pub docs: String,
64}
65
66/// Named procedure argument.
67#[derive(Clone, Debug)]
68pub struct NamedArgType {
69 /// Argument name (as `CALL fn(name => value)`).
70 pub name: SmolStr,
71 /// Argument type.
72 pub ty: ArgType,
73 /// Default value if omitted at call site.
74 pub default: Option<ScalarValue>,
75 /// Human-readable description.
76 pub doc: String,
77}
78
79/// Procedure-mode declaration.
80#[derive(Clone, Copy, Debug, PartialEq, Eq)]
81#[non_exhaustive]
82pub enum ProcedureMode {
83 /// Read-only; requires `Capability::Procedure`.
84 Read,
85 /// May mutate graph; requires `Capability::Procedure + ProcedureWrites`.
86 Write,
87 /// May issue DDL; requires `Capability::Procedure + ProcedureSchema`.
88 Schema,
89 /// Administrative; requires `Capability::Procedure + ProcedureDbms`.
90 Dbms,
91}
92
93/// Retry contract for procedures with optimistic-CAS semantics.
94#[derive(Clone, Copy, Debug, PartialEq, Eq)]
95#[non_exhaustive]
96pub enum RetryContract {
97 /// Host will re-run the procedure on retryable conflict up to
98 /// `max_retries` times.
99 Atomic {
100 /// Maximum retry count before giving up.
101 max_retries: u32,
102 },
103}
104
105/// Shape of an optional input stream for `CALL { } IN TRANSACTIONS OF N`.
106#[derive(Clone, Copy, Debug, PartialEq, Eq)]
107#[non_exhaustive]
108pub enum BatchInputShape {
109 /// Plain rows; the host batches them into N-row groups.
110 Rows,
111}
112
113/// Marker trait for the host's procedure execution facilities.
114///
115/// Concrete hosts (such as `uni-query`'s `QueryProcedureHost`) implement
116/// this and expose typed accessors on the concrete type. Plugins
117/// downcast through [`ProcedureHost::as_any`] when they need
118/// host-specific facilities (snapshot, schema manager, vector search,
119/// algorithm registry). The trait is intentionally tiny — adding a new
120/// host accessor does NOT touch the plugin ABI.
121///
122/// The proposal-spec `session: &Session` / `tx: Option<&Transaction>`
123/// fields land in M6 once the public `Session` trait stabilizes; until
124/// then the host pointer is the interim bridge for in-tree built-ins.
125pub trait ProcedureHost: Send + Sync + Any {
126 /// Returns the host as a downcastable `&dyn Any`.
127 fn as_any(&self) -> &dyn Any;
128}
129
130/// Per-call context passed to [`ProcedurePlugin::invoke`].
131///
132/// Carries an optional host pointer (for in-tree built-ins that need
133/// snapshot / schema / algorithm access), an optional principal (for
134/// capability gating), and an optional wall-clock deadline. All fields
135/// are `Option` so pure procedures and unit tests can construct a
136/// context with [`ProcedureContext::default`].
137#[derive(Default)]
138#[non_exhaustive]
139pub struct ProcedureContext<'a> {
140 /// Host services pointer; `None` in pure procedure tests.
141 pub host: Option<&'a dyn ProcedureHost>,
142 /// Optional wall-clock deadline for the procedure invocation.
143 pub deadline: Option<Duration>,
144 /// Authenticated principal, if any.
145 pub principal: Option<&'a Principal>,
146 /// Lifetime marker. The plugin ABI keeps `'a` exposed so future
147 /// fields (session / transaction) can borrow without a breaking
148 /// change.
149 pub _marker: std::marker::PhantomData<&'a ()>,
150}
151
152impl<'a> ProcedureContext<'a> {
153 /// Construct a context with every field set to `None`.
154 #[must_use]
155 pub fn new() -> Self {
156 Self::default()
157 }
158
159 /// Attach a host pointer.
160 #[must_use]
161 pub fn with_host(mut self, host: &'a dyn ProcedureHost) -> Self {
162 self.host = Some(host);
163 self
164 }
165
166 /// Attach a wall-clock deadline.
167 #[must_use]
168 pub fn with_deadline(mut self, deadline: Duration) -> Self {
169 self.deadline = Some(deadline);
170 self
171 }
172
173 /// Attach an authenticated principal.
174 #[must_use]
175 pub fn with_principal(mut self, principal: &'a Principal) -> Self {
176 self.principal = Some(principal);
177 self
178 }
179}
180
181impl std::fmt::Debug for ProcedureContext<'_> {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 f.debug_struct("ProcedureContext")
184 .field("host", &self.host.map(|_| "<host>"))
185 .field("deadline", &self.deadline)
186 .field("principal", &self.principal)
187 .finish()
188 }
189}