Skip to main content

uni_plugin/
registrar.rs

1//! The [`PluginRegistrar`] a plugin's `register()` method calls.
2//!
3//! Every registration method is capability-gated against the effective
4//! capability set computed at load time (manifest-declared ∩ host-granted).
5//! Registrations claiming a `QName` that is already taken fail with
6//! [`crate::PluginError::DuplicateRegistration`].
7
8use std::sync::Arc;
9
10use smol_str::SmolStr;
11
12use crate::capability::{Capability, CapabilitySet};
13use crate::errors::PluginError;
14use crate::plugin::PluginId;
15use crate::qname::QName;
16use crate::registry::PluginRegistry;
17use crate::surfaces::{
18    AggregateSurface, AlgorithmSurface, AppendReg, AuthSurface, AuthzSurface, BackgroundJobSurface,
19    CatalogSurface, CdcSurface, CollationSurface, ConnectorSurface, CrdtSurface,
20    DynPendingRegistration, HookSurface, IndexKindSurface, KeyedUniqueReg, LabelStorageSurface,
21    LocyAggregateSurface, LocyPredicateSurface, LogicalTypeSurface, NamedUniqueReg,
22    OperatorSurface, OptimizerRuleSurface, PregelSurface, ProcedureSurface, ReplacementScanSurface,
23    ScalarSurface, StorageBackendSurface, TriggerSurface, VersionedReg, WindowSurface,
24};
25use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
26use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
27use crate::traits::background::BackgroundJobProvider;
28use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
29use crate::traits::cdc::CdcOutputProvider;
30use crate::traits::collation::CollationProvider;
31use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
32use crate::traits::crdt::{CrdtKind, CrdtKindProvider};
33use crate::traits::hook::SessionHook;
34use crate::traits::index::{IndexKind, IndexKindProvider};
35use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
36use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
37use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
38use crate::traits::scalar::{FnSignature, ScalarPluginFn};
39use crate::traits::storage::StorageBackend;
40use crate::traits::trigger::TriggerPlugin;
41use crate::traits::types::LogicalTypeProvider;
42use crate::traits::window::{WindowPluginFn, WindowSignature};
43
44/// The builder passed to [`crate::Plugin::register`].
45///
46/// Each registration method takes a [`QName`] and a trait-object
47/// implementation. The registrar verifies the corresponding capability is
48/// present in the effective set, rejects duplicate qnames, and forwards the
49/// registration to the [`PluginRegistry`].
50///
51/// The registrar is short-lived: one is created per `register()` call;
52/// changes flush to the [`PluginRegistry`] when `register()` returns
53/// successfully. A failed `register()` rolls back any partial state.
54pub struct PluginRegistrar<'a> {
55    plugin_id: PluginId,
56    effective_caps: &'a CapabilitySet,
57    registry: &'a PluginRegistry,
58    pending: Vec<Box<dyn DynPendingRegistration>>,
59    /// QNames of aggregate functions staged via [`Self::aggregate_fn`]. The
60    /// pending registrations are type-erased, so we record aggregate qnames
61    /// separately to let the host loader publish each one's Cypher
62    /// routing hint (`uni_cypher::register_plugin_aggregate`) after a
63    /// successful commit — without this, the Cypher planner classifies
64    /// `RETURN myAgg(x)` as a scalar UDF and fails to resolve it.
65    aggregate_qnames: Vec<QName>,
66}
67
68impl<'a> std::fmt::Debug for PluginRegistrar<'a> {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("PluginRegistrar")
71            .field("plugin_id", &self.plugin_id)
72            .field("pending", &self.pending.len())
73            .finish_non_exhaustive()
74    }
75}
76
77impl<'a> PluginRegistrar<'a> {
78    /// Construct a registrar for the given plugin.
79    ///
80    /// Created by the host loader; plugin authors never construct these
81    /// directly.
82    #[must_use]
83    pub fn new(
84        plugin_id: PluginId,
85        effective_caps: &'a CapabilitySet,
86        registry: &'a PluginRegistry,
87    ) -> Self {
88        Self {
89            plugin_id,
90            effective_caps,
91            registry,
92            pending: Vec::new(),
93            aggregate_qnames: Vec::new(),
94        }
95    }
96
97    /// QNames of aggregate functions staged on this registrar (in registration
98    /// order). The host loader uses these, after a successful
99    /// [`Self::commit_to_registry`], to publish each aggregate's Cypher
100    /// routing hint so `RETURN myAgg(x)` is planned as an aggregate rather
101    /// than a scalar UDF. Empty until [`Self::aggregate_fn`] is called.
102    #[must_use]
103    pub fn staged_aggregate_qnames(&self) -> &[QName] {
104        &self.aggregate_qnames
105    }
106
107    /// Returns the plugin id being registered.
108    #[must_use]
109    pub fn plugin_id(&self) -> &PluginId {
110        &self.plugin_id
111    }
112
113    /// Override the plugin id mid-registration.
114    ///
115    /// Used by external loaders (`uni-plugin-extism`, `uni-plugin-wasm`)
116    /// during their two-pass dance: pass 1 reads the plugin's
117    /// `manifest` export to learn the canonical id, then sets it here
118    /// so that `validate_qname` accepts qnames in the plugin's
119    /// declared namespace.
120    pub fn set_plugin_id(&mut self, plugin_id: PluginId) {
121        self.plugin_id = plugin_id;
122    }
123
124    fn require(&self, cap: &Capability) -> Result<(), PluginError> {
125        if self.effective_caps.contains_variant(cap) {
126            Ok(())
127        } else {
128            Err(PluginError::CapabilityRequired(cap.clone()))
129        }
130    }
131
132    fn validate_qname(&self, qname: &QName) -> Result<(), PluginError> {
133        if !qname.is_builtin() && qname.namespace() != self.plugin_id.as_str() {
134            return Err(PluginError::internal(format!(
135                "plugin `{}` cannot register qname `{}` outside its namespace",
136                self.plugin_id, qname
137            )));
138        }
139        Ok(())
140    }
141
142    /// Register a Cypher scalar function.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::ScalarFn`]
147    /// is absent, or [`PluginError::DuplicateRegistration`] (raised at
148    /// commit time) on qname collision.
149    pub fn scalar_fn(
150        &mut self,
151        qname: QName,
152        sig: FnSignature,
153        f: Arc<dyn ScalarPluginFn>,
154    ) -> Result<&mut Self, PluginError> {
155        self.require(&Capability::ScalarFn)?;
156        self.validate_qname(&qname)?;
157        self.pending.push(Box::new(NamedUniqueReg::<ScalarSurface> {
158            q: qname,
159            sig,
160            provider: f,
161        }));
162        Ok(self)
163    }
164
165    /// Register a Cypher aggregate function.
166    ///
167    /// # Errors
168    ///
169    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::AggregateFn`] is absent.
170    pub fn aggregate_fn(
171        &mut self,
172        qname: QName,
173        sig: AggSignature,
174        f: Arc<dyn AggregatePluginFn>,
175    ) -> Result<&mut Self, PluginError> {
176        self.require(&Capability::AggregateFn)?;
177        self.validate_qname(&qname)?;
178        self.aggregate_qnames.push(qname.clone());
179        self.pending
180            .push(Box::new(NamedUniqueReg::<AggregateSurface> {
181                q: qname,
182                sig,
183                provider: f,
184            }));
185        Ok(self)
186    }
187
188    /// Register a Cypher window function.
189    ///
190    /// # Errors
191    ///
192    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::WindowFn`] is absent.
193    pub fn window_fn(
194        &mut self,
195        qname: QName,
196        sig: WindowSignature,
197        f: Arc<dyn WindowPluginFn>,
198    ) -> Result<&mut Self, PluginError> {
199        self.require(&Capability::WindowFn)?;
200        self.validate_qname(&qname)?;
201        self.pending.push(Box::new(NamedUniqueReg::<WindowSurface> {
202            q: qname,
203            sig,
204            provider: f,
205        }));
206        Ok(self)
207    }
208
209    /// Register a Cypher procedure.
210    ///
211    /// # Errors
212    ///
213    /// Returns [`PluginError::CapabilityRequired`] if the procedure's mode's
214    /// required capability is absent.
215    pub fn procedure(
216        &mut self,
217        qname: QName,
218        sig: ProcedureSignature,
219        p: Arc<dyn ProcedurePlugin>,
220    ) -> Result<&mut Self, PluginError> {
221        use crate::traits::procedure::ProcedureMode;
222        self.require(&Capability::Procedure)?;
223        match sig.mode {
224            ProcedureMode::Write => self.require(&Capability::ProcedureWrites)?,
225            ProcedureMode::Schema => self.require(&Capability::ProcedureSchema)?,
226            ProcedureMode::Dbms => self.require(&Capability::ProcedureDbms)?,
227            ProcedureMode::Read => {}
228        }
229        self.validate_qname(&qname)?;
230        self.pending
231            .push(Box::new(VersionedReg::<ProcedureSurface> {
232                q: qname,
233                sig,
234                provider: p,
235            }));
236        Ok(self)
237    }
238
239    /// Register a Locy aggregate.
240    ///
241    /// # Errors
242    ///
243    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::LocyAggregate`] is absent.
244    pub fn locy_aggregate(
245        &mut self,
246        qname: QName,
247        a: Arc<dyn LocyAggregate>,
248    ) -> Result<&mut Self, PluginError> {
249        self.require(&Capability::LocyAggregate)?;
250        self.validate_qname(&qname)?;
251        self.pending
252            .push(Box::new(NamedUniqueReg::<LocyAggregateSurface> {
253                q: qname,
254                sig: (),
255                provider: a,
256            }));
257        Ok(self)
258    }
259
260    /// Register a Locy predicate.
261    ///
262    /// # Errors
263    ///
264    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::LocyPredicate`] is absent.
265    pub fn locy_predicate(
266        &mut self,
267        qname: QName,
268        sig: PredSignature,
269        p: Arc<dyn LocyPredicate>,
270    ) -> Result<&mut Self, PluginError> {
271        self.require(&Capability::LocyPredicate)?;
272        self.validate_qname(&qname)?;
273        self.pending
274            .push(Box::new(NamedUniqueReg::<LocyPredicateSurface> {
275                q: qname,
276                sig,
277                provider: p,
278            }));
279        Ok(self)
280    }
281
282    /// Register a physical operator.
283    ///
284    /// # Errors
285    ///
286    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Operator`] is absent.
287    pub fn operator(
288        &mut self,
289        qname: QName,
290        p: Arc<dyn OperatorProvider>,
291    ) -> Result<&mut Self, PluginError> {
292        self.require(&Capability::Operator)?;
293        self.validate_qname(&qname)?;
294        self.pending
295            .push(Box::new(NamedUniqueReg::<OperatorSurface> {
296                q: qname,
297                sig: (),
298                provider: p,
299            }));
300        Ok(self)
301    }
302
303    /// Register an optimizer rule.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Operator`] is absent.
308    pub fn optimizer_rule(
309        &mut self,
310        r: Arc<dyn OptimizerRuleProvider>,
311    ) -> Result<&mut Self, PluginError> {
312        self.require(&Capability::Operator)?;
313        self.pending
314            .push(Box::new(AppendReg::<OptimizerRuleSurface> { provider: r }));
315        Ok(self)
316    }
317
318    /// Register an index kind.
319    ///
320    /// # Errors
321    ///
322    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Index`] is absent.
323    pub fn index_kind(
324        &mut self,
325        kind: IndexKind,
326        p: Arc<dyn IndexKindProvider>,
327    ) -> Result<&mut Self, PluginError> {
328        self.require(&Capability::Index)?;
329        self.pending
330            .push(Box::new(KeyedUniqueReg::<IndexKindSurface> {
331                key_override: Some(kind),
332                provider: p,
333            }));
334        Ok(self)
335    }
336
337    /// Register a storage backend by URI scheme.
338    ///
339    /// # Errors
340    ///
341    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Storage`] is absent.
342    pub fn storage_backend(
343        &mut self,
344        scheme: &'static str,
345        b: Arc<dyn StorageBackend>,
346    ) -> Result<&mut Self, PluginError> {
347        self.require(&Capability::Storage)?;
348        self.pending
349            .push(Box::new(KeyedUniqueReg::<StorageBackendSurface> {
350                key_override: Some(SmolStr::new(scheme)),
351                provider: b,
352            }));
353        Ok(self)
354    }
355
356    /// Register a per-label plugin storage (M5h.2).
357    ///
358    /// Native-schema label scans for `label` will be routed through
359    /// `storage` instead of the host's native backend. Distinct from
360    /// [`Self::storage_backend`], which is keyed by URI scheme and
361    /// opens new `Storage` instances on demand.
362    ///
363    /// # Errors
364    ///
365    /// Returns [`PluginError::CapabilityRequired`] if
366    /// [`Capability::Storage`] is absent.
367    pub fn label_storage(
368        &mut self,
369        label: impl Into<SmolStr>,
370        storage: Arc<dyn crate::traits::storage::Storage>,
371    ) -> Result<&mut Self, PluginError> {
372        self.require(&Capability::Storage)?;
373        self.pending
374            .push(Box::new(KeyedUniqueReg::<LabelStorageSurface> {
375                key_override: Some(label.into()),
376                provider: storage,
377            }));
378        Ok(self)
379    }
380
381    /// Register a graph algorithm.
382    ///
383    /// # Errors
384    ///
385    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Algorithm`] is absent.
386    pub fn algorithm(
387        &mut self,
388        qname: QName,
389        p: Arc<dyn AlgorithmProvider>,
390    ) -> Result<&mut Self, PluginError> {
391        self.require(&Capability::Algorithm)?;
392        self.validate_qname(&qname)?;
393        self.pending
394            .push(Box::new(NamedUniqueReg::<AlgorithmSurface> {
395                q: qname,
396                sig: (),
397                provider: p,
398            }));
399        Ok(self)
400    }
401
402    /// Register a Pregel-style algorithm.
403    ///
404    /// # Errors
405    ///
406    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Algorithm`] is absent.
407    pub fn pregel(
408        &mut self,
409        qname: QName,
410        p: Arc<dyn PregelProgramProvider>,
411    ) -> Result<&mut Self, PluginError> {
412        self.require(&Capability::Algorithm)?;
413        self.validate_qname(&qname)?;
414        self.pending.push(Box::new(NamedUniqueReg::<PregelSurface> {
415            q: qname,
416            sig: (),
417            provider: p,
418        }));
419        Ok(self)
420    }
421
422    /// Register a CRDT kind.
423    ///
424    /// # Errors
425    ///
426    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Crdt`] is absent.
427    pub fn crdt_kind(
428        &mut self,
429        kind: CrdtKind,
430        p: Arc<dyn CrdtKindProvider>,
431    ) -> Result<&mut Self, PluginError> {
432        self.require(&Capability::Crdt)?;
433        self.pending.push(Box::new(KeyedUniqueReg::<CrdtSurface> {
434            key_override: Some(kind),
435            provider: p,
436        }));
437        Ok(self)
438    }
439
440    /// Register a session-lifecycle hook.
441    ///
442    /// # Errors
443    ///
444    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Hook`] is absent.
445    pub fn hook(&mut self, h: Arc<dyn SessionHook>) -> Result<&mut Self, PluginError> {
446        self.require(&Capability::Hook)?;
447        self.pending
448            .push(Box::new(AppendReg::<HookSurface> { provider: h }));
449        Ok(self)
450    }
451
452    /// Register a logical type.
453    ///
454    /// # Errors
455    ///
456    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Type`] is absent.
457    pub fn logical_type(
458        &mut self,
459        t: Arc<dyn LogicalTypeProvider>,
460    ) -> Result<&mut Self, PluginError> {
461        self.require(&Capability::Type)?;
462        self.pending
463            .push(Box::new(KeyedUniqueReg::<LogicalTypeSurface> {
464                key_override: None,
465                provider: t,
466            }));
467        Ok(self)
468    }
469
470    /// Register an authentication provider.
471    ///
472    /// # Errors
473    ///
474    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Auth`] is absent.
475    pub fn auth_provider(&mut self, p: Arc<dyn AuthProvider>) -> Result<&mut Self, PluginError> {
476        self.require(&Capability::Auth)?;
477        self.pending
478            .push(Box::new(AppendReg::<AuthSurface> { provider: p }));
479        Ok(self)
480    }
481
482    /// Register an authorization policy.
483    ///
484    /// # Errors
485    ///
486    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Authz`] is absent.
487    pub fn authz_policy(&mut self, p: Arc<dyn AuthzPolicy>) -> Result<&mut Self, PluginError> {
488        self.require(&Capability::Authz)?;
489        self.pending
490            .push(Box::new(AppendReg::<AuthzSurface> { provider: p }));
491        Ok(self)
492    }
493
494    /// Register a wire-protocol connector.
495    ///
496    /// # Errors
497    ///
498    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Connector`] is absent.
499    pub fn connector(&mut self, c: Arc<dyn Connector>) -> Result<&mut Self, PluginError> {
500        self.require(&Capability::Connector)?;
501        self.pending
502            .push(Box::new(AppendReg::<ConnectorSurface> { provider: c }));
503        Ok(self)
504    }
505
506    /// Register a fine-grained trigger.
507    ///
508    /// # Errors
509    ///
510    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Trigger`] is absent.
511    pub fn trigger(&mut self, t: Arc<dyn TriggerPlugin>) -> Result<&mut Self, PluginError> {
512        self.require(&Capability::Trigger)?;
513        self.pending
514            .push(Box::new(AppendReg::<TriggerSurface> { provider: t }));
515        Ok(self)
516    }
517
518    /// Register a collation.
519    ///
520    /// # Errors
521    ///
522    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Collation`] is absent.
523    pub fn collation(&mut self, c: Arc<dyn CollationProvider>) -> Result<&mut Self, PluginError> {
524        self.require(&Capability::Collation)?;
525        self.pending
526            .push(Box::new(KeyedUniqueReg::<CollationSurface> {
527                key_override: None,
528                provider: c,
529            }));
530        Ok(self)
531    }
532
533    /// Register a CDC output sink.
534    ///
535    /// # Errors
536    ///
537    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Cdc`] is absent.
538    pub fn cdc_output(&mut self, c: Arc<dyn CdcOutputProvider>) -> Result<&mut Self, PluginError> {
539        self.require(&Capability::Cdc)?;
540        self.pending.push(Box::new(KeyedUniqueReg::<CdcSurface> {
541            key_override: None,
542            provider: c,
543        }));
544        Ok(self)
545    }
546
547    /// Register a catalog provider.
548    ///
549    /// # Errors
550    ///
551    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Catalog`] is absent.
552    pub fn catalog(&mut self, c: Arc<dyn CatalogProvider>) -> Result<&mut Self, PluginError> {
553        self.require(&Capability::Catalog)?;
554        self.pending
555            .push(Box::new(KeyedUniqueReg::<CatalogSurface> {
556                key_override: None,
557                provider: c,
558            }));
559        Ok(self)
560    }
561
562    /// Register a replacement-scan provider.
563    ///
564    /// # Errors
565    ///
566    /// Returns [`PluginError::CapabilityRequired`] if [`Capability::Catalog`] is absent.
567    pub fn replacement_scan(
568        &mut self,
569        r: Arc<dyn ReplacementScanProvider>,
570    ) -> Result<&mut Self, PluginError> {
571        self.require(&Capability::Catalog)?;
572        self.pending
573            .push(Box::new(AppendReg::<ReplacementScanSurface> {
574                provider: r,
575            }));
576        Ok(self)
577    }
578
579    /// Register a background-job provider.
580    ///
581    /// # Errors
582    ///
583    /// Returns [`PluginError::CapabilityRequired`] if no `BackgroundJob`
584    /// capability variant is present in the effective set.
585    pub fn background_job(
586        &mut self,
587        j: Arc<dyn BackgroundJobProvider>,
588    ) -> Result<&mut Self, PluginError> {
589        self.require(&Capability::BackgroundJob { max_concurrent: 0 })?;
590        self.pending
591            .push(Box::new(AppendReg::<BackgroundJobSurface> { provider: j }));
592        Ok(self)
593    }
594
595    /// Commit batched registrations to the registry.
596    ///
597    /// Called by the host loader after the plugin's `register()` returns
598    /// successfully; failures during `register()` are rolled back by simply
599    /// dropping the registrar without committing.
600    ///
601    /// # Errors
602    ///
603    /// Returns [`PluginError::DuplicateRegistration`] if any pending qname
604    /// is already taken in the registry.
605    pub fn commit_to_registry(self) -> Result<(), PluginError> {
606        self.registry.apply_pending(&self.plugin_id, self.pending)
607    }
608
609    /// Returns the number of pending registrations.
610    ///
611    /// Exposed for diagnostics and integration tests that want to verify
612    /// a plugin's `register()` queued the expected number of items before
613    /// the registrar commits.
614    #[must_use]
615    pub fn pending_len(&self) -> usize {
616        self.pending.len()
617    }
618}