Skip to main content

uni_plugin_host/
hooks.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Session hooks — before/after interception for queries and commits.
5//!
6//! Hooks allow cross-cutting concerns (audit logging, authorization, metrics)
7//! to be injected into the query and commit lifecycle without modifying
8//! individual query call sites.
9
10use std::collections::HashMap;
11
12use uni_common::{Result, Value};
13use uni_query::QueryMetrics;
14
15use crate::commit_result::CommitResult;
16
/// Which kind of statement a hook is observing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryType {
    /// Cypher query text; may be a read or a write.
    Cypher,
    /// Evaluation of a Locy program.
    Locy,
    /// An execute (mutation) statement.
    Execute,
}
27
28/// Context passed to query hooks.
29#[derive(Debug, Clone)]
30pub struct HookContext {
31    /// The session ID that initiated the query.
32    pub session_id: String,
33    /// The query text (Cypher or Locy program).
34    pub query_text: String,
35    /// The type of query.
36    pub query_type: QueryType,
37    /// Parameters bound to the query.
38    pub params: HashMap<String, Value>,
39}
40
/// Per-commit information handed to the commit hooks
/// (`before_commit` / `after_commit`).
#[derive(Debug, Clone)]
pub struct CommitHookContext {
    /// ID of the session that owns the transaction.
    pub session_id: String,
    /// ID of the transaction being committed.
    pub tx_id: String,
    /// How many mutations the transaction contains.
    pub mutation_count: usize,
}
51
52/// Trait for session lifecycle hooks.
53///
54/// Implement this trait to intercept queries and commits at the session level.
55/// Hooks are stored as `Arc<dyn SessionHook>` and can be shared across sessions
56/// and templates.
57///
58/// # Failure Semantics
59///
60/// - `before_query`: Returning `Err` aborts the query with `HookRejected`.
61/// - `after_query`: Infallible — panics are caught and logged.
62/// - `before_commit`: Returning `Err` aborts the commit with `HookRejected`.
63/// - `after_commit`: Infallible — panics are caught and logged.
64pub trait SessionHook: Send + Sync {
65    /// Called before a query is executed. Return `Err` to reject the query.
66    fn before_query(&self, _ctx: &HookContext) -> Result<()> {
67        Ok(())
68    }
69
70    /// Called after a query completes. Panics are caught and logged.
71    fn after_query(&self, _ctx: &HookContext, _metrics: &QueryMetrics) {}
72
73    /// Called before a transaction is committed. Return `Err` to reject the commit.
74    fn before_commit(&self, _ctx: &CommitHookContext) -> Result<()> {
75        Ok(())
76    }
77
78    /// Called after a transaction is successfully committed. Panics are caught and logged.
79    fn after_commit(&self, _ctx: &CommitHookContext, _result: &CommitResult) {}
80}
81
82// ============================================================================
83// M5e — Phased-hook bridge.
84//
85// The plugin framework ships a richer, Postgres-style phased SessionHook
86// (`uni_plugin::traits::hook::SessionHook`) with on_parse / on_analyze /
87// on_plan / on_execute_start / on_execute_end / before_commit /
88// after_commit / on_abort. [`LegacyHookAdapter`] wraps a legacy
89// 4-method [`SessionHook`] into the phased trait so existing hooks can
90// be registered through `PluginRegistrar::hook()` without being
91// rewritten.
92//
93// Routing chosen to match the legacy semantics:
94// - legacy `before_query` → phased `on_parse` (earliest pre-execution
95//   phase that can reject).
96// - legacy `after_query`  → phased `on_execute_end`.
97// - legacy `before_commit`→ phased `before_commit`.
98// - legacy `after_commit` → phased `after_commit`.
99//
100// Phases the legacy trait does not model (analyze, plan,
101// execute_start, abort) are pass-through `Continue`s.
102//
103// `Session::add_hook` continues to dispatch through its in-process
104// HashMap for legacy compatibility; the bridge enables hooks to ALSO
105// participate in the plugin registry's phased dispatch when the host
106// chooses to surface them that way (the migration of Session's own
107// dispatch onto the plugin registry is a separate, larger change).
108// ============================================================================
109
110use std::sync::Arc;
111
112use datafusion::scalar::ScalarValue;
113use uni_plugin::errors::HookOutcome;
114use uni_plugin::traits::hook::{
115    AbortContext, AnalyzeContext, CommitContext as PluginCommitContext, ExecuteContext,
116    ParseContext, PlanContext, QueryMetrics as PluginQueryMetrics, QueryType as PluginQueryType,
117    SessionHook as PluginSessionHook,
118};
119
120/// Adapter: wraps a legacy [`SessionHook`] so it satisfies the phased
121/// [`uni_plugin::traits::hook::SessionHook`] contract and can be
122/// registered through [`uni_plugin::PluginRegistrar::hook`].
123///
124/// The adapter holds the legacy hook by `Arc` so multiple registrations
125/// (legacy `Session::add_hook` and a phased `Uni::add_plugin`) can share
126/// the same underlying implementation without duplicating its state.
127pub struct LegacyHookAdapter {
128    name: String,
129    inner: Arc<dyn SessionHook>,
130}
131
132impl LegacyHookAdapter {
133    /// Construct an adapter from any legacy [`SessionHook`].
134    ///
135    /// The `name` is used only for diagnostics — phased dispatch keys
136    /// off the registered hook entry, not the legacy name.
137    #[must_use]
138    pub fn new(name: impl Into<String>, inner: Arc<dyn SessionHook>) -> Self {
139        Self {
140            name: name.into(),
141            inner,
142        }
143    }
144
145    /// Diagnostic name of the wrapped legacy hook.
146    #[must_use]
147    pub fn name(&self) -> &str {
148        &self.name
149    }
150}
151
152impl std::fmt::Debug for LegacyHookAdapter {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("LegacyHookAdapter")
155            .field("name", &self.name)
156            .finish_non_exhaustive()
157    }
158}
159
160impl PluginSessionHook for LegacyHookAdapter {
161    fn on_parse(&self, ctx: &ParseContext<'_>) -> HookOutcome {
162        // Synthesize a legacy HookContext at the earliest pre-execution
163        // phase. v1.1 fields on `ParseContext` (query_type, params) now
164        // flow through; hosts that leave them at default still see the
165        // pre-v1.1 behavior (Cypher / empty params).
166        let legacy_ctx = HookContext {
167            session_id: ctx.session_id.to_owned(),
168            query_text: ctx.source.to_owned(),
169            query_type: plugin_query_type_to_legacy(ctx.query_type),
170            params: params_to_legacy(ctx.params),
171        };
172        match self.inner.before_query(&legacy_ctx) {
173            Ok(()) => HookOutcome::Continue,
174            Err(e) => HookOutcome::Reject {
175                reason: e.to_string(),
176            },
177        }
178    }
179
180    fn on_execute_end(&self, ctx: &ExecuteContext<'_>, metrics: &PluginQueryMetrics) {
181        // Legacy after_query expects a `uni_query::QueryMetrics`. The
182        // plugin-side `QueryMetrics` is structurally different; surface
183        // the wall-clock + row count via a fresh, zero-filled legacy
184        // instance so the legacy hook still observes timing data.
185        let legacy_ctx = HookContext {
186            session_id: ctx.session_id.to_owned(),
187            query_text: String::new(),
188            query_type: QueryType::Cypher,
189            params: HashMap::new(),
190        };
191        let legacy_metrics = uni_query::QueryMetrics {
192            total_time: metrics.elapsed,
193            rows_returned: metrics.rows_out as usize,
194            bytes_read: metrics.bytes_read as usize,
195            ..Default::default()
196        };
197        self.inner.after_query(&legacy_ctx, &legacy_metrics);
198    }
199
200    fn before_commit(&self, ctx: &PluginCommitContext<'_>) -> HookOutcome {
201        let legacy_ctx = CommitHookContext {
202            session_id: ctx.session_id.to_owned(),
203            tx_id: String::new(),
204            mutation_count: 0,
205        };
206        match self.inner.before_commit(&legacy_ctx) {
207            Ok(()) => HookOutcome::Continue,
208            Err(e) => HookOutcome::Reject {
209                reason: e.to_string(),
210            },
211        }
212    }
213
214    fn after_commit(&self, ctx: &PluginCommitContext<'_>) {
215        let legacy_ctx = CommitHookContext {
216            session_id: ctx.session_id.to_owned(),
217            tx_id: String::new(),
218            mutation_count: ctx.commit_result.map(|r| r.mutations as usize).unwrap_or(0),
219        };
220        // v1.1 path: if the host populated `commit_result`, mirror its
221        // fields into the legacy `CommitResult`. Otherwise keep the
222        // pre-v1.1 zero-stub behavior for backward compat (the all-zero
223        // `CommitResult::default()`).
224        let result = ctx
225            .commit_result
226            .map(|r| CommitResult {
227                mutations_committed: r.mutations as usize,
228                version: r.version,
229                wal_lsn: r.wal_lsn,
230                duration: r.duration,
231                ..CommitResult::default()
232            })
233            .unwrap_or_default();
234        self.inner.after_commit(&legacy_ctx, &result);
235    }
236
237    // Phases not modeled by the legacy trait pass through with no-op
238    // defaults; `#[non_exhaustive]` context types are tolerated.
239    fn on_analyze(&self, _ctx: &AnalyzeContext<'_>) -> HookOutcome {
240        HookOutcome::Continue
241    }
242    fn on_plan(&self, _ctx: &PlanContext<'_>) -> HookOutcome {
243        HookOutcome::Continue
244    }
245    fn on_execute_start(&self, _ctx: &ExecuteContext<'_>) -> HookOutcome {
246        HookOutcome::Continue
247    }
248    fn on_abort(&self, _ctx: &AbortContext<'_>) {}
249}
250
251/// Translate `uni-plugin`'s phased [`PluginQueryType`] enum to the
252/// host's legacy [`QueryType`]. Both enums carry the same three
253/// variants; we keep them separate so `uni-plugin` doesn't need a
254/// `uni-db` dep.
255fn plugin_query_type_to_legacy(t: PluginQueryType) -> QueryType {
256    match t {
257        PluginQueryType::Cypher => QueryType::Cypher,
258        PluginQueryType::Locy => QueryType::Locy,
259        PluginQueryType::Execute => QueryType::Execute,
260    }
261}
262
263/// Best-effort conversion from the Arrow-shaped phased
264/// `&[(SmolStr, ScalarValue)]` params slice to the legacy
265/// `HashMap<String, Value>` shape. Primitive types (bool, int, float,
266/// string, bytes) map directly; everything else (lists, structs, …)
267/// is surfaced as [`Value::Null`] with a tracing warning so the legacy
268/// hook still sees the key.
269fn params_to_legacy<S: AsRef<str>>(
270    params: &[(S, ScalarValue)],
271) -> HashMap<String, uni_common::Value> {
272    params
273        .iter()
274        .map(|(k, v)| (k.as_ref().to_owned(), scalar_to_value(v)))
275        .collect()
276}
277
278// ============================================================================
279// M5e — `BuiltinHookPlugin`: turns a legacy [`SessionHook`] into a
280// [`uni_plugin::Plugin`] so it can be installed through
281// [`crate::api::Uni::add_plugin`]. This is the public sugar the
282// `Session::add_hook` deprecation note has been pointing at.
283// ============================================================================
284
285use std::sync::atomic::{AtomicU64, Ordering};
286
287use uni_plugin::{
288    AbiRange, Capability, CapabilitySet, Determinism, Plugin, PluginError, PluginId,
289    PluginManifest, PluginRegistrar, ProvidedSurfaces, Scope, SideEffects as PluginSideEffects,
290};
291
292/// Wraps a legacy [`SessionHook`] in a [`Plugin`] so the host can install
293/// it through `Uni::add_plugin`. The wrapped hook is registered via
294/// [`LegacyHookAdapter`] so it participates in the phased
295/// `uni_plugin::SessionHook` dispatch chain.
296///
297/// Each `BuiltinHookPlugin::new` call mints a unique plugin id from a
298/// monotonic atomic counter so repeated `add_hook`-style registrations
299/// never collide on the registry. The id is not meant to be addressable
300/// externally — it's an implementation detail of the registration path.
301pub struct BuiltinHookPlugin {
302    manifest: PluginManifest,
303    adapter: Arc<LegacyHookAdapter>,
304}
305
306static BUILTIN_HOOK_PLUGIN_SEQ: AtomicU64 = AtomicU64::new(0);
307
308impl BuiltinHookPlugin {
309    /// Wrap a legacy [`SessionHook`] in a plugin wrapper. `name` is used
310    /// as the diagnostic label on the wrapped adapter; it is not the
311    /// plugin id (which is generated from a static counter for
312    /// uniqueness).
313    #[must_use]
314    pub fn new(name: impl Into<String>, hook: Arc<dyn SessionHook>) -> Self {
315        let name = name.into();
316        let seq = BUILTIN_HOOK_PLUGIN_SEQ.fetch_add(1, Ordering::Relaxed);
317        let id = PluginId::new(format!("builtin.hook.{seq}"));
318        let manifest = PluginManifest {
319            id,
320            version: "1.0.0".parse().expect("static version parses"),
321            abi: AbiRange::parse("^1").expect("static ABI range parses"),
322            depends_on: vec![],
323            capabilities: CapabilitySet::from_iter_of([Capability::Hook]),
324            determinism: Determinism::Nondeterministic,
325            side_effects: PluginSideEffects::ReadOnly,
326            scope: Scope::Instance,
327            hash: None,
328            signature: None,
329            provides: ProvidedSurfaces::default(),
330            docs: "BuiltinHookPlugin — legacy SessionHook adapter".to_owned(),
331            metadata: std::collections::BTreeMap::new(),
332        };
333        Self {
334            manifest,
335            adapter: Arc::new(LegacyHookAdapter::new(name, hook)),
336        }
337    }
338}
339
340impl Plugin for BuiltinHookPlugin {
341    fn manifest(&self) -> &PluginManifest {
342        &self.manifest
343    }
344
345    fn register(&self, r: &mut PluginRegistrar<'_>) -> std::result::Result<(), PluginError> {
346        r.hook(Arc::clone(&self.adapter) as Arc<dyn PluginSessionHook>)?;
347        Ok(())
348    }
349}
350
351fn scalar_to_value(v: &ScalarValue) -> uni_common::Value {
352    use uni_common::Value;
353    match v {
354        ScalarValue::Null => Value::Null,
355        ScalarValue::Boolean(Some(b)) => Value::Bool(*b),
356        ScalarValue::Int8(Some(i)) => Value::Int(i64::from(*i)),
357        ScalarValue::Int16(Some(i)) => Value::Int(i64::from(*i)),
358        ScalarValue::Int32(Some(i)) => Value::Int(i64::from(*i)),
359        ScalarValue::Int64(Some(i)) => Value::Int(*i),
360        ScalarValue::UInt8(Some(i)) => Value::Int(i64::from(*i)),
361        ScalarValue::UInt16(Some(i)) => Value::Int(i64::from(*i)),
362        ScalarValue::UInt32(Some(i)) => Value::Int(i64::from(*i)),
363        ScalarValue::UInt64(Some(i)) => {
364            // Truncating cast — values above i64::MAX (extremely rare for
365            // user-supplied params) saturate to i64::MAX.
366            Value::Int(i64::try_from(*i).unwrap_or(i64::MAX))
367        }
368        ScalarValue::Float32(Some(f)) => Value::Float(f64::from(*f)),
369        ScalarValue::Float64(Some(f)) => Value::Float(*f),
370        ScalarValue::Utf8(Some(s))
371        | ScalarValue::LargeUtf8(Some(s))
372        | ScalarValue::Utf8View(Some(s)) => Value::String(s.clone()),
373        ScalarValue::Binary(Some(b))
374        | ScalarValue::LargeBinary(Some(b))
375        | ScalarValue::BinaryView(Some(b)) => Value::Bytes(b.clone()),
376        other => {
377            tracing::warn!(
378                "LegacyHookAdapter::params_to_legacy: unsupported ScalarValue \
379                 variant {other:?}; surfacing as Value::Null. Hooks needing \
380                 typed access should register against the phased trait."
381            );
382            Value::Null
383        }
384    }
385}