// uni_plugin/traits/procedure.rs

//! Cypher procedure plugins — `CALL ... YIELD ...`.
//!
//! Procedures differ from scalar functions in three ways: they can perform
//! writes, they return streams of rows (`YIELD a, b, c`), and they may
//! take optional input streams (`CALL ... { } IN TRANSACTIONS OF N`).

use std::any::Any;
use std::time::Duration;

use arrow_schema::Field;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::ColumnarValue;
use datafusion::scalar::ScalarValue;
use smol_str::SmolStr;

use crate::capability::SideEffects;
use crate::errors::FnError;
use crate::traits::connector::Principal;
use crate::traits::scalar::ArgType;

21/// A Cypher procedure plugin — `CALL uni.foo.bar(args) YIELD ...`.
22///
23/// Procedures return a stream of `RecordBatch`es; the host attaches the
24/// stream to the surrounding query plan via a `ProcedureCallExec` node.
25pub trait ProcedurePlugin: Send + Sync {
26    /// Static signature.
27    fn signature(&self) -> &ProcedureSignature;
28
29    /// Invoke the procedure with the given arguments and execution context.
30    ///
31    /// The returned stream is consumed lazily by downstream `YIELD`. The
32    /// procedure is responsible for cooperatively yielding to the executor
33    /// (no long blocking calls; use `tokio::task::yield_now` between batches).
34    ///
35    /// # Errors
36    ///
37    /// Returns [`FnError`] if the procedure cannot start (validation
38    /// failure, capability check). Errors raised *during* stream production
39    /// are signaled via `Err` items in the stream.
40    fn invoke(
41        &self,
42        ctx: ProcedureContext<'_>,
43        args: &[ColumnarValue],
44    ) -> Result<SendableRecordBatchStream, FnError>;
45}
46
47/// Static signature of a procedure.
48#[derive(Clone, Debug)]
49pub struct ProcedureSignature {
50    /// Named arguments, in declaration order.
51    pub args: Vec<NamedArgType>,
52    /// Schema of the `YIELD` columns.
53    pub yields: Vec<Field>,
54    /// Mode declaration — drives capability requirements.
55    pub mode: ProcedureMode,
56    /// Declared side-effects.
57    pub side_effects: SideEffects,
58    /// Optional retry contract for atomic / CAS-style procedures.
59    pub retry_contract: Option<RetryContract>,
60    /// Optional batch-input shape for `CALL { } IN TRANSACTIONS OF N`.
61    pub batch_input: Option<BatchInputShape>,
62    /// Markdown docs surfaced via `uni.plugin.help`.
63    pub docs: String,
64}
65
66/// Named procedure argument.
67#[derive(Clone, Debug)]
68pub struct NamedArgType {
69    /// Argument name (as `CALL fn(name => value)`).
70    pub name: SmolStr,
71    /// Argument type.
72    pub ty: ArgType,
73    /// Default value if omitted at call site.
74    pub default: Option<ScalarValue>,
75    /// Human-readable description.
76    pub doc: String,
77}
78
/// Mode a procedure declares for itself.
///
/// Each variant documents the capabilities it requires.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProcedureMode {
    /// Read-only. Requires `Capability::Procedure`.
    Read,
    /// May mutate the graph. Requires
    /// `Capability::Procedure + ProcedureWrites`.
    Write,
    /// May issue DDL. Requires `Capability::Procedure + ProcedureSchema`.
    Schema,
    /// Administrative. Requires `Capability::Procedure + ProcedureDbms`.
    Dbms,
}

/// Retry contract for procedures with optimistic-CAS semantics.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RetryContract {
    /// On a retryable conflict the host re-runs the procedure, at most
    /// `max_retries` times.
    Atomic {
        /// Upper bound on retries before the host gives up.
        max_retries: u32,
    },
}

/// Shape of the optional input stream used by
/// `CALL { } IN TRANSACTIONS OF N`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BatchInputShape {
    /// Plain rows, which the host groups into batches of N rows.
    Rows,
}

/// Marker trait for the host's procedure execution facilities.
///
/// Concrete hosts (for example `uni-query`'s `QueryProcedureHost`)
/// implement this trait and put their typed accessors on the concrete
/// type. A plugin that needs host-specific facilities (snapshot, schema
/// manager, vector search, algorithm registry) downcasts through
/// [`ProcedureHost::as_any`]. The trait is kept deliberately small so that
/// adding a host accessor does NOT touch the plugin ABI.
///
/// The proposal-spec `session: &Session` / `tx: Option<&Transaction>`
/// fields land in M6 once the public `Session` trait stabilizes; until
/// then the host pointer is the interim bridge for in-tree built-ins.
pub trait ProcedureHost: Send + Sync + Any {
    /// Exposes the host as `&dyn Any` so callers can downcast it to the
    /// concrete host type.
    fn as_any(&self) -> &dyn Any;
}

130/// Per-call context passed to [`ProcedurePlugin::invoke`].
131///
132/// Carries an optional host pointer (for in-tree built-ins that need
133/// snapshot / schema / algorithm access), an optional principal (for
134/// capability gating), and an optional wall-clock deadline. All fields
135/// are `Option` so pure procedures and unit tests can construct a
136/// context with [`ProcedureContext::default`].
137#[derive(Default)]
138#[non_exhaustive]
139pub struct ProcedureContext<'a> {
140    /// Host services pointer; `None` in pure procedure tests.
141    pub host: Option<&'a dyn ProcedureHost>,
142    /// Optional wall-clock deadline for the procedure invocation.
143    pub deadline: Option<Duration>,
144    /// Authenticated principal, if any.
145    pub principal: Option<&'a Principal>,
146    /// Lifetime marker. The plugin ABI keeps `'a` exposed so future
147    /// fields (session / transaction) can borrow without a breaking
148    /// change.
149    pub _marker: std::marker::PhantomData<&'a ()>,
150}
151
152impl<'a> ProcedureContext<'a> {
153    /// Construct a context with every field set to `None`.
154    #[must_use]
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Attach a host pointer.
160    #[must_use]
161    pub fn with_host(mut self, host: &'a dyn ProcedureHost) -> Self {
162        self.host = Some(host);
163        self
164    }
165
166    /// Attach a wall-clock deadline.
167    #[must_use]
168    pub fn with_deadline(mut self, deadline: Duration) -> Self {
169        self.deadline = Some(deadline);
170        self
171    }
172
173    /// Attach an authenticated principal.
174    #[must_use]
175    pub fn with_principal(mut self, principal: &'a Principal) -> Self {
176        self.principal = Some(principal);
177        self
178    }
179}
180
181impl std::fmt::Debug for ProcedureContext<'_> {
182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        f.debug_struct("ProcedureContext")
184            .field("host", &self.host.map(|_| "<host>"))
185            .field("deadline", &self.deadline)
186            .field("principal", &self.principal)
187            .finish()
188    }
189}