1use std::sync::Arc;
9
10use smol_str::SmolStr;
11
12use crate::capability::{Capability, CapabilitySet};
13use crate::errors::PluginError;
14use crate::plugin::PluginId;
15use crate::qname::QName;
16use crate::registry::PluginRegistry;
17use crate::surfaces::{
18 AggregateSurface, AlgorithmSurface, AppendReg, AuthSurface, AuthzSurface, BackgroundJobSurface,
19 CatalogSurface, CdcSurface, CollationSurface, ConnectorSurface, CrdtSurface,
20 DynPendingRegistration, HookSurface, IndexKindSurface, KeyedUniqueReg, LabelStorageSurface,
21 LocyAggregateSurface, LocyPredicateSurface, LogicalTypeSurface, NamedUniqueReg,
22 OperatorSurface, OptimizerRuleSurface, PregelSurface, ProcedureSurface, ReplacementScanSurface,
23 ScalarSurface, StorageBackendSurface, TriggerSurface, VersionedReg, WindowSurface,
24};
25use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
26use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
27use crate::traits::background::BackgroundJobProvider;
28use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
29use crate::traits::cdc::CdcOutputProvider;
30use crate::traits::collation::CollationProvider;
31use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
32use crate::traits::crdt::{CrdtKind, CrdtKindProvider};
33use crate::traits::hook::SessionHook;
34use crate::traits::index::{IndexKind, IndexKindProvider};
35use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
36use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
37use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
38use crate::traits::scalar::{FnSignature, ScalarPluginFn};
39use crate::traits::storage::StorageBackend;
40use crate::traits::trigger::TriggerPlugin;
41use crate::traits::types::LogicalTypeProvider;
42use crate::traits::window::{WindowPluginFn, WindowSignature};
43
44pub struct PluginRegistrar<'a> {
55 plugin_id: PluginId,
56 effective_caps: &'a CapabilitySet,
57 registry: &'a PluginRegistry,
58 pending: Vec<Box<dyn DynPendingRegistration>>,
59}
60
61impl<'a> std::fmt::Debug for PluginRegistrar<'a> {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("PluginRegistrar")
64 .field("plugin_id", &self.plugin_id)
65 .field("pending", &self.pending.len())
66 .finish_non_exhaustive()
67 }
68}
69
70impl<'a> PluginRegistrar<'a> {
71 #[must_use]
76 pub fn new(
77 plugin_id: PluginId,
78 effective_caps: &'a CapabilitySet,
79 registry: &'a PluginRegistry,
80 ) -> Self {
81 Self {
82 plugin_id,
83 effective_caps,
84 registry,
85 pending: Vec::new(),
86 }
87 }
88
89 #[must_use]
91 pub fn plugin_id(&self) -> &PluginId {
92 &self.plugin_id
93 }
94
95 pub fn set_plugin_id(&mut self, plugin_id: PluginId) {
103 self.plugin_id = plugin_id;
104 }
105
106 fn require(&self, cap: &Capability) -> Result<(), PluginError> {
107 if self.effective_caps.contains_variant(cap) {
108 Ok(())
109 } else {
110 Err(PluginError::CapabilityRequired(cap.clone()))
111 }
112 }
113
114 fn validate_qname(&self, qname: &QName) -> Result<(), PluginError> {
115 if !qname.is_builtin() && qname.namespace() != self.plugin_id.as_str() {
116 return Err(PluginError::internal(format!(
117 "plugin `{}` cannot register qname `{}` outside its namespace",
118 self.plugin_id, qname
119 )));
120 }
121 Ok(())
122 }
123
124 pub fn scalar_fn(
132 &mut self,
133 qname: QName,
134 sig: FnSignature,
135 f: Arc<dyn ScalarPluginFn>,
136 ) -> Result<&mut Self, PluginError> {
137 self.require(&Capability::ScalarFn)?;
138 self.validate_qname(&qname)?;
139 self.pending.push(Box::new(NamedUniqueReg::<ScalarSurface> {
140 q: qname,
141 sig,
142 provider: f,
143 }));
144 Ok(self)
145 }
146
147 pub fn aggregate_fn(
153 &mut self,
154 qname: QName,
155 sig: AggSignature,
156 f: Arc<dyn AggregatePluginFn>,
157 ) -> Result<&mut Self, PluginError> {
158 self.require(&Capability::AggregateFn)?;
159 self.validate_qname(&qname)?;
160 self.pending
161 .push(Box::new(NamedUniqueReg::<AggregateSurface> {
162 q: qname,
163 sig,
164 provider: f,
165 }));
166 Ok(self)
167 }
168
169 pub fn window_fn(
175 &mut self,
176 qname: QName,
177 sig: WindowSignature,
178 f: Arc<dyn WindowPluginFn>,
179 ) -> Result<&mut Self, PluginError> {
180 self.require(&Capability::WindowFn)?;
181 self.validate_qname(&qname)?;
182 self.pending.push(Box::new(NamedUniqueReg::<WindowSurface> {
183 q: qname,
184 sig,
185 provider: f,
186 }));
187 Ok(self)
188 }
189
190 pub fn procedure(
197 &mut self,
198 qname: QName,
199 sig: ProcedureSignature,
200 p: Arc<dyn ProcedurePlugin>,
201 ) -> Result<&mut Self, PluginError> {
202 use crate::traits::procedure::ProcedureMode;
203 self.require(&Capability::Procedure)?;
204 match sig.mode {
205 ProcedureMode::Write => self.require(&Capability::ProcedureWrites)?,
206 ProcedureMode::Schema => self.require(&Capability::ProcedureSchema)?,
207 ProcedureMode::Dbms => self.require(&Capability::ProcedureDbms)?,
208 ProcedureMode::Read => {}
209 }
210 self.validate_qname(&qname)?;
211 self.pending
212 .push(Box::new(VersionedReg::<ProcedureSurface> {
213 q: qname,
214 sig,
215 provider: p,
216 }));
217 Ok(self)
218 }
219
220 pub fn locy_aggregate(
226 &mut self,
227 qname: QName,
228 a: Arc<dyn LocyAggregate>,
229 ) -> Result<&mut Self, PluginError> {
230 self.require(&Capability::LocyAggregate)?;
231 self.validate_qname(&qname)?;
232 self.pending
233 .push(Box::new(NamedUniqueReg::<LocyAggregateSurface> {
234 q: qname,
235 sig: (),
236 provider: a,
237 }));
238 Ok(self)
239 }
240
241 pub fn locy_predicate(
247 &mut self,
248 qname: QName,
249 sig: PredSignature,
250 p: Arc<dyn LocyPredicate>,
251 ) -> Result<&mut Self, PluginError> {
252 self.require(&Capability::LocyPredicate)?;
253 self.validate_qname(&qname)?;
254 self.pending
255 .push(Box::new(NamedUniqueReg::<LocyPredicateSurface> {
256 q: qname,
257 sig,
258 provider: p,
259 }));
260 Ok(self)
261 }
262
263 pub fn operator(
269 &mut self,
270 qname: QName,
271 p: Arc<dyn OperatorProvider>,
272 ) -> Result<&mut Self, PluginError> {
273 self.require(&Capability::Operator)?;
274 self.validate_qname(&qname)?;
275 self.pending
276 .push(Box::new(NamedUniqueReg::<OperatorSurface> {
277 q: qname,
278 sig: (),
279 provider: p,
280 }));
281 Ok(self)
282 }
283
284 pub fn optimizer_rule(
290 &mut self,
291 r: Arc<dyn OptimizerRuleProvider>,
292 ) -> Result<&mut Self, PluginError> {
293 self.require(&Capability::Operator)?;
294 self.pending
295 .push(Box::new(AppendReg::<OptimizerRuleSurface> { provider: r }));
296 Ok(self)
297 }
298
299 pub fn index_kind(
305 &mut self,
306 kind: IndexKind,
307 p: Arc<dyn IndexKindProvider>,
308 ) -> Result<&mut Self, PluginError> {
309 self.require(&Capability::Index)?;
310 self.pending
311 .push(Box::new(KeyedUniqueReg::<IndexKindSurface> {
312 key_override: Some(kind),
313 provider: p,
314 }));
315 Ok(self)
316 }
317
318 pub fn storage_backend(
324 &mut self,
325 scheme: &'static str,
326 b: Arc<dyn StorageBackend>,
327 ) -> Result<&mut Self, PluginError> {
328 self.require(&Capability::Storage)?;
329 self.pending
330 .push(Box::new(KeyedUniqueReg::<StorageBackendSurface> {
331 key_override: Some(SmolStr::new(scheme)),
332 provider: b,
333 }));
334 Ok(self)
335 }
336
337 pub fn label_storage(
349 &mut self,
350 label: impl Into<SmolStr>,
351 storage: Arc<dyn crate::traits::storage::Storage>,
352 ) -> Result<&mut Self, PluginError> {
353 self.require(&Capability::Storage)?;
354 self.pending
355 .push(Box::new(KeyedUniqueReg::<LabelStorageSurface> {
356 key_override: Some(label.into()),
357 provider: storage,
358 }));
359 Ok(self)
360 }
361
362 pub fn algorithm(
368 &mut self,
369 qname: QName,
370 p: Arc<dyn AlgorithmProvider>,
371 ) -> Result<&mut Self, PluginError> {
372 self.require(&Capability::Algorithm)?;
373 self.validate_qname(&qname)?;
374 self.pending
375 .push(Box::new(NamedUniqueReg::<AlgorithmSurface> {
376 q: qname,
377 sig: (),
378 provider: p,
379 }));
380 Ok(self)
381 }
382
383 pub fn pregel(
389 &mut self,
390 qname: QName,
391 p: Arc<dyn PregelProgramProvider>,
392 ) -> Result<&mut Self, PluginError> {
393 self.require(&Capability::Algorithm)?;
394 self.validate_qname(&qname)?;
395 self.pending.push(Box::new(NamedUniqueReg::<PregelSurface> {
396 q: qname,
397 sig: (),
398 provider: p,
399 }));
400 Ok(self)
401 }
402
403 pub fn crdt_kind(
409 &mut self,
410 kind: CrdtKind,
411 p: Arc<dyn CrdtKindProvider>,
412 ) -> Result<&mut Self, PluginError> {
413 self.require(&Capability::Crdt)?;
414 self.pending.push(Box::new(KeyedUniqueReg::<CrdtSurface> {
415 key_override: Some(kind),
416 provider: p,
417 }));
418 Ok(self)
419 }
420
421 pub fn hook(&mut self, h: Arc<dyn SessionHook>) -> Result<&mut Self, PluginError> {
427 self.require(&Capability::Hook)?;
428 self.pending
429 .push(Box::new(AppendReg::<HookSurface> { provider: h }));
430 Ok(self)
431 }
432
433 pub fn logical_type(
439 &mut self,
440 t: Arc<dyn LogicalTypeProvider>,
441 ) -> Result<&mut Self, PluginError> {
442 self.require(&Capability::Type)?;
443 self.pending
444 .push(Box::new(KeyedUniqueReg::<LogicalTypeSurface> {
445 key_override: None,
446 provider: t,
447 }));
448 Ok(self)
449 }
450
451 pub fn auth_provider(&mut self, p: Arc<dyn AuthProvider>) -> Result<&mut Self, PluginError> {
457 self.require(&Capability::Auth)?;
458 self.pending
459 .push(Box::new(AppendReg::<AuthSurface> { provider: p }));
460 Ok(self)
461 }
462
463 pub fn authz_policy(&mut self, p: Arc<dyn AuthzPolicy>) -> Result<&mut Self, PluginError> {
469 self.require(&Capability::Authz)?;
470 self.pending
471 .push(Box::new(AppendReg::<AuthzSurface> { provider: p }));
472 Ok(self)
473 }
474
475 pub fn connector(&mut self, c: Arc<dyn Connector>) -> Result<&mut Self, PluginError> {
481 self.require(&Capability::Connector)?;
482 self.pending
483 .push(Box::new(AppendReg::<ConnectorSurface> { provider: c }));
484 Ok(self)
485 }
486
487 pub fn trigger(&mut self, t: Arc<dyn TriggerPlugin>) -> Result<&mut Self, PluginError> {
493 self.require(&Capability::Trigger)?;
494 self.pending
495 .push(Box::new(AppendReg::<TriggerSurface> { provider: t }));
496 Ok(self)
497 }
498
499 pub fn collation(&mut self, c: Arc<dyn CollationProvider>) -> Result<&mut Self, PluginError> {
505 self.require(&Capability::Collation)?;
506 self.pending
507 .push(Box::new(KeyedUniqueReg::<CollationSurface> {
508 key_override: None,
509 provider: c,
510 }));
511 Ok(self)
512 }
513
514 pub fn cdc_output(&mut self, c: Arc<dyn CdcOutputProvider>) -> Result<&mut Self, PluginError> {
520 self.require(&Capability::Cdc)?;
521 self.pending.push(Box::new(KeyedUniqueReg::<CdcSurface> {
522 key_override: None,
523 provider: c,
524 }));
525 Ok(self)
526 }
527
528 pub fn catalog(&mut self, c: Arc<dyn CatalogProvider>) -> Result<&mut Self, PluginError> {
534 self.require(&Capability::Catalog)?;
535 self.pending
536 .push(Box::new(KeyedUniqueReg::<CatalogSurface> {
537 key_override: None,
538 provider: c,
539 }));
540 Ok(self)
541 }
542
543 pub fn replacement_scan(
549 &mut self,
550 r: Arc<dyn ReplacementScanProvider>,
551 ) -> Result<&mut Self, PluginError> {
552 self.require(&Capability::Catalog)?;
553 self.pending
554 .push(Box::new(AppendReg::<ReplacementScanSurface> {
555 provider: r,
556 }));
557 Ok(self)
558 }
559
560 pub fn background_job(
567 &mut self,
568 j: Arc<dyn BackgroundJobProvider>,
569 ) -> Result<&mut Self, PluginError> {
570 self.require(&Capability::BackgroundJob { max_concurrent: 0 })?;
571 self.pending
572 .push(Box::new(AppendReg::<BackgroundJobSurface> { provider: j }));
573 Ok(self)
574 }
575
576 pub fn commit_to_registry(self) -> Result<(), PluginError> {
587 self.registry.apply_pending(&self.plugin_id, self.pending)
588 }
589
590 #[must_use]
596 pub fn pending_len(&self) -> usize {
597 self.pending.len()
598 }
599}