1use std::collections::HashMap;
9use std::sync::Arc;
10
11use arc_swap::ArcSwap;
12use dashmap::DashMap;
13use parking_lot::{Mutex, RwLock};
14use smol_str::SmolStr;
15
16use crate::errors::PluginError;
17use crate::plugin::PluginId;
18use crate::qname::QName;
19use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
20use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
21use crate::traits::background::BackgroundJobProvider;
22use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
23use crate::traits::cdc::CdcOutputProvider;
24use crate::traits::collation::CollationProvider;
25use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
26use crate::traits::crdt::{CrdtKind, CrdtKindProvider};
27use crate::traits::hook::SessionHook;
28use crate::traits::index::{IndexHandle, IndexKind, IndexKindProvider};
29use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
30use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
31use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
32use crate::traits::scalar::{FnSignature, ScalarPluginFn};
33use crate::traits::storage::StorageBackend;
34use crate::traits::trigger::TriggerPlugin;
35use crate::traits::types::LogicalTypeProvider;
36use crate::traits::window::{WindowPluginFn, WindowSignature};
37
/// Registry record for one scalar function: the implementation together with
/// its signature and the plugin id stored alongside it.
pub struct ScalarEntry {
    /// Plugin id recorded with this entry (presumably the plugin that
    /// registered it — confirm at the registration site).
    pub plugin: PluginId,
    /// Declared signature of the function.
    pub signature: FnSignature,
    /// Shared handle to the function implementation.
    pub function: Arc<dyn ScalarPluginFn>,
}
47
48impl std::fmt::Debug for ScalarEntry {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("ScalarEntry")
51 .field("plugin", &self.plugin)
52 .field("signature", &self.signature)
53 .finish_non_exhaustive()
54 }
55}
56
/// Registry record for one aggregate function: the implementation together
/// with its signature and the plugin id stored alongside it.
pub struct AggregateEntry {
    /// Plugin id recorded with this entry (presumably the plugin that
    /// registered it — confirm at the registration site).
    pub plugin: PluginId,
    /// Declared signature of the aggregate.
    pub signature: AggSignature,
    /// Shared handle to the aggregate implementation.
    pub aggregate: Arc<dyn AggregatePluginFn>,
}
66
67impl std::fmt::Debug for AggregateEntry {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("AggregateEntry")
70 .field("plugin", &self.plugin)
71 .field("signature", &self.signature)
72 .finish_non_exhaustive()
73 }
74}
75
/// Registry record for one window function: the implementation together with
/// its signature and the plugin id stored alongside it.
pub struct WindowEntry {
    /// Plugin id recorded with this entry (presumably the plugin that
    /// registered it — confirm at the registration site).
    pub plugin: PluginId,
    /// Declared signature of the window function.
    pub signature: WindowSignature,
    /// Shared handle to the window-function implementation.
    pub window: Arc<dyn WindowPluginFn>,
}
85
86impl std::fmt::Debug for WindowEntry {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 f.debug_struct("WindowEntry")
89 .field("plugin", &self.plugin)
90 .field("signature", &self.signature)
91 .finish_non_exhaustive()
92 }
93}
94
/// Registry record for one procedure overload: the implementation together
/// with its signature and the plugin id stored alongside it.
pub struct ProcedureEntry {
    /// Plugin id recorded with this entry (presumably the plugin that
    /// registered it — confirm at the registration site).
    pub plugin: PluginId,
    /// Declared signature; the length of `signature.args` is what
    /// arity-based lookup compares against.
    pub signature: ProcedureSignature,
    /// Shared handle to the procedure implementation.
    pub procedure: Arc<dyn ProcedurePlugin>,
}
104
105impl std::fmt::Debug for ProcedureEntry {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("ProcedureEntry")
108 .field("plugin", &self.plugin)
109 .field("signature", &self.signature)
110 .finish_non_exhaustive()
111 }
112}
113
/// Registry record for one Locy aggregate: the implementation plus the plugin
/// id stored alongside it. Unlike the other entry types it carries no
/// separate signature field.
pub struct LocyAggregateEntry {
    /// Plugin id recorded with this entry (presumably the plugin that
    /// registered it — confirm at the registration site).
    pub plugin: PluginId,
    /// Shared handle to the aggregate implementation.
    pub aggregate: Arc<dyn LocyAggregate>,
}
121
122impl std::fmt::Debug for LocyAggregateEntry {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 f.debug_struct("LocyAggregateEntry")
125 .field("plugin", &self.plugin)
126 .finish_non_exhaustive()
127 }
128}
129
/// Registry record for one Locy predicate: the implementation together with
/// its signature and the plugin id stored alongside it.
pub struct LocyPredicateEntry {
    /// Plugin id recorded with this entry (presumably the plugin that
    /// registered it — confirm at the registration site).
    pub plugin: PluginId,
    /// Declared signature of the predicate.
    pub signature: PredSignature,
    /// Shared handle to the predicate implementation.
    pub predicate: Arc<dyn LocyPredicate>,
}
139
140impl std::fmt::Debug for LocyPredicateEntry {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.debug_struct("LocyPredicateEntry")
143 .field("plugin", &self.plugin)
144 .field("signature", &self.signature)
145 .finish_non_exhaustive()
146 }
147}
148
/// A live index handle paired with the kind it was registered under.
///
/// Cloning copies the `kind` and bumps the `Arc` refcount on the handle.
#[derive(Clone)]
pub struct IndexHandleEntry {
    /// Index kind supplied when the handle was registered.
    pub kind: IndexKind,
    /// Shared handle to the index itself.
    pub handle: Arc<dyn IndexHandle>,
}
171
172impl std::fmt::Debug for IndexHandleEntry {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct("IndexHandleEntry")
175 .field("kind", &self.kind)
176 .finish_non_exhaustive()
177 }
178}
179
/// One virtual label or edge type: the name it was registered under and the
/// catalog table bound to it.
///
/// Cloning copies the name and bumps the `Arc` refcount on the table.
#[derive(Clone)]
pub struct VirtualEntry {
    /// Name the entry was registered under.
    pub name: SmolStr,
    /// Catalog table bound to this name.
    pub table: Arc<dyn crate::traits::catalog::CatalogTable>,
}
194
195impl std::fmt::Debug for VirtualEntry {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 f.debug_struct("VirtualEntry")
198 .field("name", &self.name)
199 .finish_non_exhaustive()
200 }
201}
202
/// Integer ID type usable as the key of a `VirtualIdSpace`.
///
/// The supertraits cover what the allocator needs from an ID: `Ord` for the
/// exhaustion comparison against `SENTINEL`, `Hash + Eq` for use as a map
/// key, and `LowerHex` for the `{:#x}` in the exhaustion error message.
trait VirtualId:
    Copy + Eq + Ord + std::hash::Hash + std::fmt::Debug + std::fmt::LowerHex + 'static
{
    /// First ID handed out by a freshly created space.
    const START: Self;
    /// Upper bound: allocation fails once the next ID reaches this value.
    const SENTINEL: Self;
    /// Short kind name interpolated into the exhaustion error message.
    const KIND_LABEL: &'static str;

    /// Returns the ID that follows `self`.
    fn next(self) -> Self;
}
223
224impl VirtualId for u16 {
225 const START: Self = uni_common::core::schema::VIRTUAL_LABEL_ID_START;
226 const SENTINEL: Self = uni_common::core::schema::VIRTUAL_LABEL_ID_SENTINEL;
227 const KIND_LABEL: &'static str = "label";
228
229 fn next(self) -> Self {
230 self.saturating_add(1)
231 }
232}
233
234impl VirtualId for u32 {
235 const START: Self = uni_common::core::edge_type::VIRTUAL_EDGE_TYPE_ID_START;
236 const SENTINEL: Self = uni_common::core::edge_type::VIRTUAL_EDGE_TYPE_ID_SENTINEL;
237 const KIND_LABEL: &'static str = "edge-type";
238
239 fn next(self) -> Self {
240 self.saturating_add(1)
241 }
242}
243
/// Allocator and two-way lookup table for one family of virtual IDs.
///
/// Names map to IDs and IDs map to their `VirtualEntry`; `next_id` is the ID
/// the next new name will receive.
#[derive(Debug)]
struct VirtualIdSpace<Id: VirtualId> {
    /// Registered name -> allocated ID.
    name_to_id: HashMap<SmolStr, Id>,
    /// Allocated ID -> entry (name plus bound table).
    id_to_entry: HashMap<Id, VirtualEntry>,
    /// ID that the next previously-unseen name will be given.
    next_id: Id,
}
254
255impl<Id: VirtualId> Default for VirtualIdSpace<Id> {
256 fn default() -> Self {
257 Self {
258 name_to_id: HashMap::new(),
259 id_to_entry: HashMap::new(),
260 next_id: Id::START,
261 }
262 }
263}
264
265impl<Id: VirtualId> VirtualIdSpace<Id> {
266 fn register(
270 &mut self,
271 name: SmolStr,
272 table: Arc<dyn crate::traits::catalog::CatalogTable>,
273 ) -> Result<Id, PluginError> {
274 if let Some(&id) = self.name_to_id.get(&name) {
275 self.id_to_entry.insert(
276 id,
277 VirtualEntry {
278 name: name.clone(),
279 table,
280 },
281 );
282 return Ok(id);
283 }
284 if self.next_id >= Id::SENTINEL {
285 return Err(PluginError::Internal(format!(
286 "virtual {}-ID space exhausted ({} slots taken; sentinel {:#x})",
287 Id::KIND_LABEL,
288 self.id_to_entry.len(),
289 Id::SENTINEL,
290 )));
291 }
292 let id = self.next_id;
293 self.next_id = self.next_id.next();
294 self.name_to_id.insert(name.clone(), id);
295 self.id_to_entry.insert(id, VirtualEntry { name, table });
296 Ok(id)
297 }
298}
299
/// Per-plugin bookkeeping of what a plugin registered.
///
/// Keyed surfaces are tracked as lists of keys; `remove_plugin` walks those
/// lists to take each key out of its map. Append-style surfaces are tracked
/// only as counts.
#[derive(Default, Debug)]
pub(crate) struct PluginRecord {
    /// Scalar function names.
    pub(crate) scalars: Vec<QName>,
    /// Aggregate function names.
    pub(crate) aggregates: Vec<QName>,
    /// Window function names.
    pub(crate) windows: Vec<QName>,
    /// Procedure overloads as `(name, arity)` pairs.
    pub(crate) procedures: Vec<(QName, usize)>,
    /// Locy aggregate names.
    pub(crate) locy_aggregates: Vec<QName>,
    /// Locy predicate names.
    pub(crate) locy_predicates: Vec<QName>,
    /// Operator names.
    pub(crate) operators: Vec<QName>,
    /// Algorithm names.
    pub(crate) algorithms: Vec<QName>,
    /// Pregel program names.
    pub(crate) pregels: Vec<QName>,
    /// Index kinds.
    pub(crate) index_kinds: Vec<IndexKind>,
    /// Storage backend scheme keys.
    pub(crate) storage_schemes: Vec<SmolStr>,
    /// Label-storage keys.
    pub(crate) label_storages: Vec<SmolStr>,
    /// CRDT kinds.
    pub(crate) crdt_kinds: Vec<CrdtKind>,
    /// Logical type names.
    pub(crate) logical_types: Vec<SmolStr>,
    /// Collation names.
    pub(crate) collations: Vec<SmolStr>,
    /// CDC output names.
    pub(crate) cdc_outputs: Vec<SmolStr>,
    /// Catalog names.
    pub(crate) catalogs: Vec<SmolStr>,
    /// Number of session hooks.
    pub(crate) hook_count: usize,
    /// Number of auth providers.
    pub(crate) auth_count: usize,
    /// Number of authorization policies.
    pub(crate) authz_count: usize,
    /// Number of connectors.
    pub(crate) connector_count: usize,
    /// Number of triggers.
    pub(crate) trigger_count: usize,
    /// Number of replacement-scan providers.
    pub(crate) replacement_scan_count: usize,
    /// Number of optimizer rules.
    pub(crate) optimizer_rule_count: usize,
    /// Number of background jobs.
    pub(crate) background_job_count: usize,
}
344
/// Public, owned copy of a `PluginRecord`.
///
/// Field-for-field mirror of the crate-private record, produced by the
/// `From<&PluginRecord>` conversion.
#[derive(Clone, Debug, Default)]
pub struct PluginRecordSnapshot {
    /// Scalar function names.
    pub scalars: Vec<QName>,
    /// Aggregate function names.
    pub aggregates: Vec<QName>,
    /// Window function names.
    pub windows: Vec<QName>,
    /// Procedure overloads as `(name, arity)` pairs.
    pub procedures: Vec<(QName, usize)>,
    /// Locy aggregate names.
    pub locy_aggregates: Vec<QName>,
    /// Locy predicate names.
    pub locy_predicates: Vec<QName>,
    /// Operator names.
    pub operators: Vec<QName>,
    /// Algorithm names.
    pub algorithms: Vec<QName>,
    /// Pregel program names.
    pub pregels: Vec<QName>,
    /// Index kinds.
    pub index_kinds: Vec<IndexKind>,
    /// Storage backend scheme keys.
    pub storage_schemes: Vec<SmolStr>,
    /// Label-storage keys.
    pub label_storages: Vec<SmolStr>,
    /// CRDT kinds.
    pub crdt_kinds: Vec<CrdtKind>,
    /// Logical type names.
    pub logical_types: Vec<SmolStr>,
    /// Collation names.
    pub collations: Vec<SmolStr>,
    /// CDC output names.
    pub cdc_outputs: Vec<SmolStr>,
    /// Catalog names.
    pub catalogs: Vec<SmolStr>,
    /// Number of session hooks.
    pub hook_count: usize,
    /// Number of auth providers.
    pub auth_count: usize,
    /// Number of authorization policies.
    pub authz_count: usize,
    /// Number of connectors.
    pub connector_count: usize,
    /// Number of triggers.
    pub trigger_count: usize,
    /// Number of replacement-scan providers.
    pub replacement_scan_count: usize,
    /// Number of optimizer rules.
    pub optimizer_rule_count: usize,
    /// Number of background jobs.
    pub background_job_count: usize,
}
404
405impl From<&PluginRecord> for PluginRecordSnapshot {
406 fn from(r: &PluginRecord) -> Self {
410 Self {
411 scalars: r.scalars.clone(),
412 aggregates: r.aggregates.clone(),
413 windows: r.windows.clone(),
414 procedures: r.procedures.clone(),
415 locy_aggregates: r.locy_aggregates.clone(),
416 locy_predicates: r.locy_predicates.clone(),
417 operators: r.operators.clone(),
418 algorithms: r.algorithms.clone(),
419 pregels: r.pregels.clone(),
420 index_kinds: r.index_kinds.clone(),
421 storage_schemes: r.storage_schemes.clone(),
422 label_storages: r.label_storages.clone(),
423 crdt_kinds: r.crdt_kinds.clone(),
424 logical_types: r.logical_types.clone(),
425 collations: r.collations.clone(),
426 cdc_outputs: r.cdc_outputs.clone(),
427 catalogs: r.catalogs.clone(),
428 hook_count: r.hook_count,
429 auth_count: r.auth_count,
430 authz_count: r.authz_count,
431 connector_count: r.connector_count,
432 trigger_count: r.trigger_count,
433 replacement_scan_count: r.replacement_scan_count,
434 optimizer_rule_count: r.optimizer_rule_count,
435 background_job_count: r.background_job_count,
436 }
437 }
438}
439
/// Central store of everything plugins have registered.
///
/// Keyed surfaces live in `DashMap`s; append-style surfaces are whole vectors
/// of `AppendEntry` held in an `ArcSwap`; the two virtual-ID spaces sit
/// behind `Mutex`es. All methods take `&self`.
#[derive(Default)]
pub struct PluginRegistry {
    /// Scalar functions by qualified name.
    pub(crate) scalars: DashMap<QName, Arc<ScalarEntry>>,
    /// Aggregate functions by qualified name.
    pub(crate) aggregates: DashMap<QName, Arc<AggregateEntry>>,
    /// Window functions by qualified name.
    pub(crate) windows: DashMap<QName, Arc<WindowEntry>>,
    /// Procedures by qualified name; each name holds a list of overloads.
    pub(crate) procedures: DashMap<QName, Vec<Arc<ProcedureEntry>>>,
    /// Locy aggregates by qualified name.
    pub(crate) locy_aggregates: DashMap<QName, Arc<LocyAggregateEntry>>,
    /// Locy predicates by qualified name.
    pub(crate) locy_predicates: DashMap<QName, Arc<LocyPredicateEntry>>,
    /// Operator providers by qualified name.
    pub(crate) operators: DashMap<QName, Arc<dyn OperatorProvider>>,
    /// Optimizer rule providers (append-style surface).
    pub(crate) optimizer_rules:
        ArcSwap<Vec<crate::surfaces::AppendEntry<dyn OptimizerRuleProvider>>>,
    /// Algorithm providers by qualified name.
    pub(crate) algorithms: DashMap<QName, Arc<dyn AlgorithmProvider>>,
    /// Pregel program providers by qualified name.
    pub(crate) pregels: DashMap<QName, Arc<dyn PregelProgramProvider>>,
    /// Index kind providers by kind.
    pub(crate) index_kinds: DashMap<IndexKind, Arc<dyn IndexKindProvider>>,
    /// Live index handles by index name; not tracked in `PluginRecord`.
    index_handles: DashMap<SmolStr, IndexHandleEntry>,
    /// Storage backends by scheme string.
    pub(crate) storage_backends: DashMap<SmolStr, Arc<dyn StorageBackend>>,
    /// Per-label storage implementations by label string.
    pub(crate) label_storages: DashMap<SmolStr, Arc<dyn crate::traits::storage::Storage>>,
    /// CRDT kind providers by kind.
    pub(crate) crdt_kinds: DashMap<CrdtKind, Arc<dyn CrdtKindProvider>>,
    /// Session hooks (append-style surface).
    pub(crate) hooks: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn SessionHook>>>,
    /// Logical type providers by type name.
    pub(crate) logical_types: DashMap<SmolStr, Arc<dyn LogicalTypeProvider>>,
    /// Authentication providers (append-style surface).
    pub(crate) auth_providers: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn AuthProvider>>>,
    /// Authorization policies (append-style surface).
    pub(crate) authz_policies: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn AuthzPolicy>>>,
    /// Connectors (append-style surface).
    pub(crate) connectors: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn Connector>>>,
    /// Triggers (append-style surface).
    pub(crate) triggers: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn TriggerPlugin>>>,
    /// Collation providers by name.
    pub(crate) collations: DashMap<SmolStr, Arc<dyn CollationProvider>>,
    /// CDC output providers by name.
    pub(crate) cdc_outputs: DashMap<SmolStr, Arc<dyn CdcOutputProvider>>,
    /// Catalog providers by name.
    pub(crate) catalogs: DashMap<SmolStr, Arc<dyn CatalogProvider>>,
    /// Replacement-scan providers (append-style surface).
    pub(crate) replacement_scans:
        ArcSwap<Vec<crate::surfaces::AppendEntry<dyn ReplacementScanProvider>>>,
    /// Background job providers (append-style surface).
    pub(crate) background_jobs:
        ArcSwap<Vec<crate::surfaces::AppendEntry<dyn BackgroundJobProvider>>>,
    /// Virtual label ID space (`u16` IDs).
    virtual_labels: Mutex<VirtualIdSpace<u16>>,
    /// Virtual edge-type ID space (`u32` IDs).
    virtual_edge_types: Mutex<VirtualIdSpace<u32>>,
    /// Per-plugin registration records.
    // NOTE(review): every access visible in this file goes through `.read()`,
    // including inserts and removes (the inner `DashMap` provides the
    // interior mutability). Confirm whether the outer `RwLock` is still
    // needed, e.g. for a writer elsewhere.
    per_plugin: RwLock<dashmap::DashMap<PluginId, PluginRecord>>,
}
501
502impl std::fmt::Debug for PluginRegistry {
503 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504 f.debug_struct("PluginRegistry")
505 .field("scalar_fns", &self.scalars.len())
506 .field("aggregates", &self.aggregates.len())
507 .field("procedures", &self.procedures.len())
508 .field("locy_aggregates", &self.locy_aggregates.len())
509 .field("algorithms", &self.algorithms.len())
510 .field("storage_backends", &self.storage_backends.len())
511 .field("index_kinds", &self.index_kinds.len())
512 .field("hooks", &self.hooks.load().len())
513 .field("plugins", &self.per_plugin.read().len())
514 .finish()
515 }
516}
517
518impl PluginRegistry {
519 #[must_use]
521 pub fn new() -> Self {
522 Self::default()
523 }
524
525 #[must_use]
527 pub fn scalar_fn(&self, q: &QName) -> Option<Arc<ScalarEntry>> {
528 self.scalars.get(q).map(|e| Arc::clone(e.value()))
529 }
530
531 #[must_use]
545 pub fn iter_scalars(&self) -> Vec<(QName, Arc<ScalarEntry>)> {
546 self.scalars
547 .iter()
548 .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
549 .collect()
550 }
551
552 #[must_use]
556 pub fn iter_procedures(&self) -> Vec<(QName, Arc<ProcedureEntry>)> {
557 self.procedures
558 .iter()
559 .flat_map(|kv| {
560 let q = kv.key().clone();
561 kv.value()
562 .iter()
563 .map(move |e| (q.clone(), Arc::clone(e)))
564 .collect::<Vec<_>>()
565 })
566 .collect()
567 }
568
569 #[must_use]
571 pub fn iter_locy_aggregates(&self) -> Vec<(QName, Arc<LocyAggregateEntry>)> {
572 self.locy_aggregates
573 .iter()
574 .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
575 .collect()
576 }
577
578 #[must_use]
580 pub fn iter_algorithms(&self) -> Vec<(QName, Arc<dyn AlgorithmProvider>)> {
581 self.algorithms
582 .iter()
583 .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
584 .collect()
585 }
586
587 #[must_use]
589 pub fn iter_index_kinds(&self) -> Vec<(IndexKind, Arc<dyn IndexKindProvider>)> {
590 self.index_kinds
591 .iter()
592 .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
593 .collect()
594 }
595
596 #[must_use]
601 pub fn catalogs(&self) -> Vec<Arc<dyn CatalogProvider>> {
602 self.catalogs
603 .iter()
604 .map(|kv| Arc::clone(kv.value()))
605 .collect()
606 }
607
608 #[must_use]
610 pub fn aggregate(&self, q: &QName) -> Option<Arc<AggregateEntry>> {
611 self.aggregates.get(q).map(|e| Arc::clone(e.value()))
612 }
613
614 #[must_use]
616 pub fn window(&self, q: &QName) -> Option<Arc<WindowEntry>> {
617 self.windows.get(q).map(|e| Arc::clone(e.value()))
618 }
619
620 #[must_use]
627 pub fn procedure(&self, q: &QName) -> Option<Arc<ProcedureEntry>> {
628 self.procedures
629 .get(q)
630 .and_then(|e| e.value().first().map(Arc::clone))
631 }
632
633 #[must_use]
644 pub fn procedure_with_arity(&self, q: &QName, arity: usize) -> Option<Arc<ProcedureEntry>> {
645 self.procedures.get(q).and_then(|e| {
646 e.value()
647 .iter()
648 .find(|entry| entry.signature.args.len() == arity)
649 .map(Arc::clone)
650 })
651 }
652
653 #[must_use]
659 pub fn procedure_overloads(&self, q: &QName) -> Vec<Arc<ProcedureEntry>> {
660 self.procedures
661 .get(q)
662 .map(|e| e.value().iter().map(Arc::clone).collect())
663 .unwrap_or_default()
664 }
665
666 #[must_use]
668 pub fn locy_aggregate(&self, q: &QName) -> Option<Arc<LocyAggregateEntry>> {
669 self.locy_aggregates.get(q).map(|e| Arc::clone(e.value()))
670 }
671
672 #[must_use]
674 pub fn locy_predicate(&self, q: &QName) -> Option<Arc<LocyPredicateEntry>> {
675 self.locy_predicates.get(q).map(|e| Arc::clone(e.value()))
676 }
677
678 #[must_use]
680 pub fn storage_backend(&self, scheme: &str) -> Option<Arc<dyn StorageBackend>> {
681 self.storage_backends
682 .get(&SmolStr::new(scheme))
683 .map(|e| Arc::clone(e.value()))
684 }
685
686 #[must_use]
692 pub fn lookup_label_storage(
693 &self,
694 label: &str,
695 ) -> Option<Arc<dyn crate::traits::storage::Storage>> {
696 self.label_storages
697 .get(&SmolStr::new(label))
698 .map(|e| Arc::clone(e.value()))
699 }
700
701 #[must_use]
703 pub fn index_kind(&self, k: &IndexKind) -> Option<Arc<dyn IndexKindProvider>> {
704 self.index_kinds.get(k).map(|e| Arc::clone(e.value()))
705 }
706
707 pub fn register_index_handle(
716 &self,
717 name: impl Into<SmolStr>,
718 kind: IndexKind,
719 handle: Arc<dyn IndexHandle>,
720 ) {
721 self.index_handles
722 .insert(name.into(), IndexHandleEntry { kind, handle });
723 }
724
725 #[must_use]
728 pub fn index_handle(&self, name: &str) -> Option<IndexHandleEntry> {
729 self.index_handles
730 .get(&SmolStr::new(name))
731 .map(|e| e.value().clone())
732 }
733
734 pub fn deregister_index_handle(&self, name: &str) -> Option<IndexHandleEntry> {
737 self.index_handles
738 .remove(&SmolStr::new(name))
739 .map(|(_, v)| v)
740 }
741
742 pub fn register_virtual_label(
754 &self,
755 name: impl Into<SmolStr>,
756 table: Arc<dyn crate::traits::catalog::CatalogTable>,
757 ) -> Result<u16, PluginError> {
758 self.virtual_labels.lock().register(name.into(), table)
759 }
760
761 #[must_use]
765 pub fn virtual_label_by_name(&self, name: &str) -> Option<u16> {
766 let inner = self.virtual_labels.lock();
767 inner.name_to_id.get(&SmolStr::new(name)).copied()
768 }
769
770 #[must_use]
773 pub fn virtual_label_by_id(&self, id: u16) -> Option<VirtualEntry> {
774 self.virtual_labels.lock().id_to_entry.get(&id).cloned()
775 }
776
777 pub fn register_virtual_edge_type(
781 &self,
782 name: impl Into<SmolStr>,
783 table: Arc<dyn crate::traits::catalog::CatalogTable>,
784 ) -> Result<u32, PluginError> {
785 self.virtual_edge_types.lock().register(name.into(), table)
786 }
787
788 #[must_use]
790 pub fn virtual_edge_type_by_name(&self, name: &str) -> Option<u32> {
791 let inner = self.virtual_edge_types.lock();
792 inner.name_to_id.get(&SmolStr::new(name)).copied()
793 }
794
795 #[must_use]
797 pub fn virtual_edge_type_by_id(&self, id: u32) -> Option<VirtualEntry> {
798 self.virtual_edge_types.lock().id_to_entry.get(&id).cloned()
799 }
800
801 #[must_use]
803 pub fn algorithm(&self, q: &QName) -> Option<Arc<dyn AlgorithmProvider>> {
804 self.algorithms.get(q).map(|e| Arc::clone(e.value()))
805 }
806
807 #[must_use]
809 pub fn crdt_kind(&self, k: &CrdtKind) -> Option<Arc<dyn CrdtKindProvider>> {
810 self.crdt_kinds.get(k).map(|e| Arc::clone(e.value()))
811 }
812
813 #[must_use]
815 pub fn logical_type(&self, name: &SmolStr) -> Option<Arc<dyn LogicalTypeProvider>> {
816 self.logical_types.get(name).map(|e| Arc::clone(e.value()))
817 }
818
819 #[must_use]
821 pub fn hooks(&self) -> Arc<Vec<Arc<dyn SessionHook>>> {
822 Self::project_append(&self.hooks)
823 }
824
825 #[must_use]
827 pub fn optimizer_rules(&self) -> Arc<Vec<Arc<dyn OptimizerRuleProvider>>> {
828 Self::project_append(&self.optimizer_rules)
829 }
830
831 #[must_use]
833 pub fn triggers(&self) -> Arc<Vec<Arc<dyn TriggerPlugin>>> {
834 Self::project_append(&self.triggers)
835 }
836
837 #[must_use]
842 pub fn cdc_outputs_snapshot(&self) -> Vec<(SmolStr, Arc<dyn CdcOutputProvider>)> {
843 self.cdc_outputs
844 .iter()
845 .map(|e| (e.key().clone(), Arc::clone(e.value())))
846 .collect()
847 }
848
849 #[must_use]
855 pub fn cdc_outputs_is_empty(&self) -> bool {
856 self.cdc_outputs.is_empty()
857 }
858
859 #[must_use]
861 pub fn auth_providers(&self) -> Arc<Vec<Arc<dyn AuthProvider>>> {
862 Self::project_append(&self.auth_providers)
863 }
864
865 #[must_use]
867 pub fn authz_policies(&self) -> Arc<Vec<Arc<dyn AuthzPolicy>>> {
868 Self::project_append(&self.authz_policies)
869 }
870
871 #[must_use]
873 pub fn connectors(&self) -> Arc<Vec<Arc<dyn Connector>>> {
874 Self::project_append(&self.connectors)
875 }
876
877 #[must_use]
879 pub fn replacement_scans(&self) -> Arc<Vec<Arc<dyn ReplacementScanProvider>>> {
880 Self::project_append(&self.replacement_scans)
881 }
882
883 pub(crate) fn apply_pending(
898 &self,
899 plugin_id: &PluginId,
900 pending: Vec<Box<dyn crate::surfaces::DynPendingRegistration>>,
901 ) -> Result<(), PluginError> {
902 for reg in &pending {
903 reg.preflight(self)?;
904 }
905
906 let mut record = PluginRecord::default();
907 for reg in pending {
908 reg.apply(self, plugin_id.clone(), &mut record);
909 }
910
911 self.per_plugin.read().insert(plugin_id.clone(), record);
912
913 Ok(())
914 }
915
916 #[must_use]
918 pub fn background_jobs(&self) -> Arc<Vec<Arc<dyn BackgroundJobProvider>>> {
919 Self::project_append(&self.background_jobs)
920 }
921
922 fn project_append<P: ?Sized>(
934 slot: &ArcSwap<Vec<crate::surfaces::AppendEntry<P>>>,
935 ) -> Arc<Vec<Arc<P>>> {
936 let snap = slot.load();
937 let v: Vec<Arc<P>> = snap.iter().map(|e| Arc::clone(&e.provider)).collect();
938 Arc::new(v)
939 }
940
941 #[must_use]
952 pub fn iter_for_plugin(&self, plugin: &PluginId) -> Option<PluginRecordSnapshot> {
953 let guard = self.per_plugin.read();
954 guard.get(plugin).map(|r| PluginRecordSnapshot::from(&*r))
955 }
956
957 pub fn remove_plugin(&self, plugin: &PluginId) {
965 use crate::surfaces::{
966 AggregateSurface, AlgorithmSurface, AppendOps, AuthSurface, AuthzSurface,
967 BackgroundJobSurface, CatalogSurface, CdcSurface, CollationSurface, ConnectorSurface,
968 CrdtSurface, Discriminator, HookSurface, IndexKindSurface, KeyedUniqueOps,
969 LabelStorageSurface, LocyAggregateSurface, LocyPredicateSurface, LogicalTypeSurface,
970 NamedUniqueOps, OperatorSurface, OptimizerRuleSurface, PregelSurface, ProcedureSurface,
971 ReplacementScanSurface, ScalarSurface, StorageBackendSurface, TriggerSurface,
972 VersionedOps, WindowSurface,
973 };
974
975 let record = self.per_plugin.read().remove(plugin).map(|(_, r)| r);
976 let Some(record) = record else { return };
977
978 for q in record.scalars {
979 <ScalarSurface as NamedUniqueOps>::remove(self, &q);
980 }
981 for q in record.aggregates {
982 <AggregateSurface as NamedUniqueOps>::remove(self, &q);
983 }
984 for q in record.windows {
985 <WindowSurface as NamedUniqueOps>::remove(self, &q);
986 }
987 for (q, arity) in record.procedures {
988 <ProcedureSurface as VersionedOps>::remove(self, &q, Discriminator::Arity(arity));
989 }
990 for q in record.locy_aggregates {
991 <LocyAggregateSurface as NamedUniqueOps>::remove(self, &q);
992 }
993 for q in record.locy_predicates {
994 <LocyPredicateSurface as NamedUniqueOps>::remove(self, &q);
995 }
996 for q in record.operators {
997 <OperatorSurface as NamedUniqueOps>::remove(self, &q);
998 }
999 for q in record.algorithms {
1000 <AlgorithmSurface as NamedUniqueOps>::remove(self, &q);
1001 }
1002 for q in record.pregels {
1003 <PregelSurface as NamedUniqueOps>::remove(self, &q);
1004 }
1005 for k in record.index_kinds {
1006 <IndexKindSurface as KeyedUniqueOps>::remove(self, &k);
1007 }
1008 for s in record.storage_schemes {
1009 <StorageBackendSurface as KeyedUniqueOps>::remove(self, &s);
1010 }
1011 for l in record.label_storages {
1012 <LabelStorageSurface as KeyedUniqueOps>::remove(self, &l);
1013 }
1014 for k in record.crdt_kinds {
1015 <CrdtSurface as KeyedUniqueOps>::remove(self, &k);
1016 }
1017 for k in record.logical_types {
1018 <LogicalTypeSurface as KeyedUniqueOps>::remove(self, &k);
1019 }
1020 for k in record.collations {
1021 <CollationSurface as KeyedUniqueOps>::remove(self, &k);
1022 }
1023 for k in record.cdc_outputs {
1024 <CdcSurface as KeyedUniqueOps>::remove(self, &k);
1025 }
1026 for k in record.catalogs {
1027 <CatalogSurface as KeyedUniqueOps>::remove(self, &k);
1028 }
1029
1030 <OptimizerRuleSurface as AppendOps>::remove_plugin(self, plugin);
1031 <HookSurface as AppendOps>::remove_plugin(self, plugin);
1032 <AuthSurface as AppendOps>::remove_plugin(self, plugin);
1033 <AuthzSurface as AppendOps>::remove_plugin(self, plugin);
1034 <ConnectorSurface as AppendOps>::remove_plugin(self, plugin);
1035 <TriggerSurface as AppendOps>::remove_plugin(self, plugin);
1036 <ReplacementScanSurface as AppendOps>::remove_plugin(self, plugin);
1037 <BackgroundJobSurface as AppendOps>::remove_plugin(self, plugin);
1038 }
1039}
1040
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed registry resolves no names and has no hooks.
    #[test]
    fn registry_default_is_empty() {
        let registry = PluginRegistry::new();
        let probe = QName::builtin("anything");

        assert!(registry.scalar_fn(&probe).is_none());
        assert!(registry.procedure(&probe).is_none());
        assert_eq!(registry.hooks().len(), 0);
    }

    /// The `Debug` rendering names the type.
    #[test]
    fn debug_smoke() {
        let rendered = format!("{:?}", PluginRegistry::new());
        assert!(rendered.contains("PluginRegistry"));
    }
}