1use std::sync::Arc;
9
10use smol_str::SmolStr;
11
12use crate::capability::{Capability, CapabilitySet};
13use crate::errors::PluginError;
14use crate::plugin::PluginId;
15use crate::qname::QName;
16use crate::registry::PluginRegistry;
17use crate::surfaces::{
18 AggregateSurface, AlgorithmSurface, AppendReg, AuthSurface, AuthzSurface, BackgroundJobSurface,
19 CatalogSurface, CdcSurface, CollationSurface, ConnectorSurface, CrdtSurface,
20 DynPendingRegistration, HookSurface, IndexKindSurface, KeyedUniqueReg, LabelStorageSurface,
21 LocyAggregateSurface, LocyPredicateSurface, LogicalTypeSurface, NamedUniqueReg,
22 OperatorSurface, OptimizerRuleSurface, PregelSurface, ProcedureSurface, ReplacementScanSurface,
23 ScalarSurface, StorageBackendSurface, TriggerSurface, VersionedReg, WindowSurface,
24};
25use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
26use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
27use crate::traits::background::BackgroundJobProvider;
28use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
29use crate::traits::cdc::CdcOutputProvider;
30use crate::traits::collation::CollationProvider;
31use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
32use crate::traits::crdt::{CrdtKind, CrdtKindProvider};
33use crate::traits::hook::SessionHook;
34use crate::traits::index::{IndexKind, IndexKindProvider};
35use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
36use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
37use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
38use crate::traits::scalar::{FnSignature, ScalarPluginFn};
39use crate::traits::storage::StorageBackend;
40use crate::traits::trigger::TriggerPlugin;
41use crate::traits::types::LogicalTypeProvider;
42use crate::traits::window::{WindowPluginFn, WindowSignature};
43
44pub struct PluginRegistrar<'a> {
55 plugin_id: PluginId,
56 effective_caps: &'a CapabilitySet,
57 registry: &'a PluginRegistry,
58 pending: Vec<Box<dyn DynPendingRegistration>>,
59 aggregate_qnames: Vec<QName>,
66}
67
68impl<'a> std::fmt::Debug for PluginRegistrar<'a> {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("PluginRegistrar")
71 .field("plugin_id", &self.plugin_id)
72 .field("pending", &self.pending.len())
73 .finish_non_exhaustive()
74 }
75}
76
77impl<'a> PluginRegistrar<'a> {
78 #[must_use]
83 pub fn new(
84 plugin_id: PluginId,
85 effective_caps: &'a CapabilitySet,
86 registry: &'a PluginRegistry,
87 ) -> Self {
88 Self {
89 plugin_id,
90 effective_caps,
91 registry,
92 pending: Vec::new(),
93 aggregate_qnames: Vec::new(),
94 }
95 }
96
97 #[must_use]
103 pub fn staged_aggregate_qnames(&self) -> &[QName] {
104 &self.aggregate_qnames
105 }
106
107 #[must_use]
109 pub fn plugin_id(&self) -> &PluginId {
110 &self.plugin_id
111 }
112
113 pub fn set_plugin_id(&mut self, plugin_id: PluginId) {
121 self.plugin_id = plugin_id;
122 }
123
124 fn require(&self, cap: &Capability) -> Result<(), PluginError> {
125 if self.effective_caps.contains_variant(cap) {
126 Ok(())
127 } else {
128 Err(PluginError::CapabilityRequired(cap.clone()))
129 }
130 }
131
132 fn validate_qname(&self, qname: &QName) -> Result<(), PluginError> {
133 if !qname.is_builtin() && qname.namespace() != self.plugin_id.as_str() {
134 return Err(PluginError::internal(format!(
135 "plugin `{}` cannot register qname `{}` outside its namespace",
136 self.plugin_id, qname
137 )));
138 }
139 Ok(())
140 }
141
142 pub fn scalar_fn(
150 &mut self,
151 qname: QName,
152 sig: FnSignature,
153 f: Arc<dyn ScalarPluginFn>,
154 ) -> Result<&mut Self, PluginError> {
155 self.require(&Capability::ScalarFn)?;
156 self.validate_qname(&qname)?;
157 self.pending.push(Box::new(NamedUniqueReg::<ScalarSurface> {
158 q: qname,
159 sig,
160 provider: f,
161 }));
162 Ok(self)
163 }
164
165 pub fn aggregate_fn(
171 &mut self,
172 qname: QName,
173 sig: AggSignature,
174 f: Arc<dyn AggregatePluginFn>,
175 ) -> Result<&mut Self, PluginError> {
176 self.require(&Capability::AggregateFn)?;
177 self.validate_qname(&qname)?;
178 self.aggregate_qnames.push(qname.clone());
179 self.pending
180 .push(Box::new(NamedUniqueReg::<AggregateSurface> {
181 q: qname,
182 sig,
183 provider: f,
184 }));
185 Ok(self)
186 }
187
188 pub fn window_fn(
194 &mut self,
195 qname: QName,
196 sig: WindowSignature,
197 f: Arc<dyn WindowPluginFn>,
198 ) -> Result<&mut Self, PluginError> {
199 self.require(&Capability::WindowFn)?;
200 self.validate_qname(&qname)?;
201 self.pending.push(Box::new(NamedUniqueReg::<WindowSurface> {
202 q: qname,
203 sig,
204 provider: f,
205 }));
206 Ok(self)
207 }
208
209 pub fn procedure(
216 &mut self,
217 qname: QName,
218 sig: ProcedureSignature,
219 p: Arc<dyn ProcedurePlugin>,
220 ) -> Result<&mut Self, PluginError> {
221 use crate::traits::procedure::ProcedureMode;
222 self.require(&Capability::Procedure)?;
223 match sig.mode {
224 ProcedureMode::Write => self.require(&Capability::ProcedureWrites)?,
225 ProcedureMode::Schema => self.require(&Capability::ProcedureSchema)?,
226 ProcedureMode::Dbms => self.require(&Capability::ProcedureDbms)?,
227 ProcedureMode::Read => {}
228 }
229 self.validate_qname(&qname)?;
230 self.pending
231 .push(Box::new(VersionedReg::<ProcedureSurface> {
232 q: qname,
233 sig,
234 provider: p,
235 }));
236 Ok(self)
237 }
238
239 pub fn locy_aggregate(
245 &mut self,
246 qname: QName,
247 a: Arc<dyn LocyAggregate>,
248 ) -> Result<&mut Self, PluginError> {
249 self.require(&Capability::LocyAggregate)?;
250 self.validate_qname(&qname)?;
251 self.pending
252 .push(Box::new(NamedUniqueReg::<LocyAggregateSurface> {
253 q: qname,
254 sig: (),
255 provider: a,
256 }));
257 Ok(self)
258 }
259
260 pub fn locy_predicate(
266 &mut self,
267 qname: QName,
268 sig: PredSignature,
269 p: Arc<dyn LocyPredicate>,
270 ) -> Result<&mut Self, PluginError> {
271 self.require(&Capability::LocyPredicate)?;
272 self.validate_qname(&qname)?;
273 self.pending
274 .push(Box::new(NamedUniqueReg::<LocyPredicateSurface> {
275 q: qname,
276 sig,
277 provider: p,
278 }));
279 Ok(self)
280 }
281
282 pub fn operator(
288 &mut self,
289 qname: QName,
290 p: Arc<dyn OperatorProvider>,
291 ) -> Result<&mut Self, PluginError> {
292 self.require(&Capability::Operator)?;
293 self.validate_qname(&qname)?;
294 self.pending
295 .push(Box::new(NamedUniqueReg::<OperatorSurface> {
296 q: qname,
297 sig: (),
298 provider: p,
299 }));
300 Ok(self)
301 }
302
303 pub fn optimizer_rule(
309 &mut self,
310 r: Arc<dyn OptimizerRuleProvider>,
311 ) -> Result<&mut Self, PluginError> {
312 self.require(&Capability::Operator)?;
313 self.pending
314 .push(Box::new(AppendReg::<OptimizerRuleSurface> { provider: r }));
315 Ok(self)
316 }
317
318 pub fn index_kind(
324 &mut self,
325 kind: IndexKind,
326 p: Arc<dyn IndexKindProvider>,
327 ) -> Result<&mut Self, PluginError> {
328 self.require(&Capability::Index)?;
329 self.pending
330 .push(Box::new(KeyedUniqueReg::<IndexKindSurface> {
331 key_override: Some(kind),
332 provider: p,
333 }));
334 Ok(self)
335 }
336
337 pub fn storage_backend(
343 &mut self,
344 scheme: &'static str,
345 b: Arc<dyn StorageBackend>,
346 ) -> Result<&mut Self, PluginError> {
347 self.require(&Capability::Storage)?;
348 self.pending
349 .push(Box::new(KeyedUniqueReg::<StorageBackendSurface> {
350 key_override: Some(SmolStr::new(scheme)),
351 provider: b,
352 }));
353 Ok(self)
354 }
355
356 pub fn label_storage(
368 &mut self,
369 label: impl Into<SmolStr>,
370 storage: Arc<dyn crate::traits::storage::Storage>,
371 ) -> Result<&mut Self, PluginError> {
372 self.require(&Capability::Storage)?;
373 self.pending
374 .push(Box::new(KeyedUniqueReg::<LabelStorageSurface> {
375 key_override: Some(label.into()),
376 provider: storage,
377 }));
378 Ok(self)
379 }
380
381 pub fn algorithm(
387 &mut self,
388 qname: QName,
389 p: Arc<dyn AlgorithmProvider>,
390 ) -> Result<&mut Self, PluginError> {
391 self.require(&Capability::Algorithm)?;
392 self.validate_qname(&qname)?;
393 self.pending
394 .push(Box::new(NamedUniqueReg::<AlgorithmSurface> {
395 q: qname,
396 sig: (),
397 provider: p,
398 }));
399 Ok(self)
400 }
401
402 pub fn pregel(
408 &mut self,
409 qname: QName,
410 p: Arc<dyn PregelProgramProvider>,
411 ) -> Result<&mut Self, PluginError> {
412 self.require(&Capability::Algorithm)?;
413 self.validate_qname(&qname)?;
414 self.pending.push(Box::new(NamedUniqueReg::<PregelSurface> {
415 q: qname,
416 sig: (),
417 provider: p,
418 }));
419 Ok(self)
420 }
421
422 pub fn crdt_kind(
428 &mut self,
429 kind: CrdtKind,
430 p: Arc<dyn CrdtKindProvider>,
431 ) -> Result<&mut Self, PluginError> {
432 self.require(&Capability::Crdt)?;
433 self.pending.push(Box::new(KeyedUniqueReg::<CrdtSurface> {
434 key_override: Some(kind),
435 provider: p,
436 }));
437 Ok(self)
438 }
439
440 pub fn hook(&mut self, h: Arc<dyn SessionHook>) -> Result<&mut Self, PluginError> {
446 self.require(&Capability::Hook)?;
447 self.pending
448 .push(Box::new(AppendReg::<HookSurface> { provider: h }));
449 Ok(self)
450 }
451
452 pub fn logical_type(
458 &mut self,
459 t: Arc<dyn LogicalTypeProvider>,
460 ) -> Result<&mut Self, PluginError> {
461 self.require(&Capability::Type)?;
462 self.pending
463 .push(Box::new(KeyedUniqueReg::<LogicalTypeSurface> {
464 key_override: None,
465 provider: t,
466 }));
467 Ok(self)
468 }
469
470 pub fn auth_provider(&mut self, p: Arc<dyn AuthProvider>) -> Result<&mut Self, PluginError> {
476 self.require(&Capability::Auth)?;
477 self.pending
478 .push(Box::new(AppendReg::<AuthSurface> { provider: p }));
479 Ok(self)
480 }
481
482 pub fn authz_policy(&mut self, p: Arc<dyn AuthzPolicy>) -> Result<&mut Self, PluginError> {
488 self.require(&Capability::Authz)?;
489 self.pending
490 .push(Box::new(AppendReg::<AuthzSurface> { provider: p }));
491 Ok(self)
492 }
493
494 pub fn connector(&mut self, c: Arc<dyn Connector>) -> Result<&mut Self, PluginError> {
500 self.require(&Capability::Connector)?;
501 self.pending
502 .push(Box::new(AppendReg::<ConnectorSurface> { provider: c }));
503 Ok(self)
504 }
505
506 pub fn trigger(&mut self, t: Arc<dyn TriggerPlugin>) -> Result<&mut Self, PluginError> {
512 self.require(&Capability::Trigger)?;
513 self.pending
514 .push(Box::new(AppendReg::<TriggerSurface> { provider: t }));
515 Ok(self)
516 }
517
518 pub fn collation(&mut self, c: Arc<dyn CollationProvider>) -> Result<&mut Self, PluginError> {
524 self.require(&Capability::Collation)?;
525 self.pending
526 .push(Box::new(KeyedUniqueReg::<CollationSurface> {
527 key_override: None,
528 provider: c,
529 }));
530 Ok(self)
531 }
532
533 pub fn cdc_output(&mut self, c: Arc<dyn CdcOutputProvider>) -> Result<&mut Self, PluginError> {
539 self.require(&Capability::Cdc)?;
540 self.pending.push(Box::new(KeyedUniqueReg::<CdcSurface> {
541 key_override: None,
542 provider: c,
543 }));
544 Ok(self)
545 }
546
547 pub fn catalog(&mut self, c: Arc<dyn CatalogProvider>) -> Result<&mut Self, PluginError> {
553 self.require(&Capability::Catalog)?;
554 self.pending
555 .push(Box::new(KeyedUniqueReg::<CatalogSurface> {
556 key_override: None,
557 provider: c,
558 }));
559 Ok(self)
560 }
561
562 pub fn replacement_scan(
568 &mut self,
569 r: Arc<dyn ReplacementScanProvider>,
570 ) -> Result<&mut Self, PluginError> {
571 self.require(&Capability::Catalog)?;
572 self.pending
573 .push(Box::new(AppendReg::<ReplacementScanSurface> {
574 provider: r,
575 }));
576 Ok(self)
577 }
578
579 pub fn background_job(
586 &mut self,
587 j: Arc<dyn BackgroundJobProvider>,
588 ) -> Result<&mut Self, PluginError> {
589 self.require(&Capability::BackgroundJob { max_concurrent: 0 })?;
590 self.pending
591 .push(Box::new(AppendReg::<BackgroundJobSurface> { provider: j }));
592 Ok(self)
593 }
594
595 pub fn commit_to_registry(self) -> Result<(), PluginError> {
606 self.registry.apply_pending(&self.plugin_id, self.pending)
607 }
608
609 #[must_use]
615 pub fn pending_len(&self) -> usize {
616 self.pending.len()
617 }
618}