1use std::fmt::Debug;
36use std::hash::Hash;
37use std::sync::Arc;
38
39use arc_swap::ArcSwap;
40use dashmap::DashMap;
41use smol_str::SmolStr;
42
43use crate::errors::PluginError;
44use crate::plugin::PluginId;
45use crate::qname::QName;
46use crate::registry::{
47 AggregateEntry, LocyAggregateEntry, LocyPredicateEntry, PluginRecord, PluginRegistry,
48 ProcedureEntry, ScalarEntry, WindowEntry,
49};
50use crate::traits::crdt::CrdtKind;
51use crate::traits::index::IndexKind;
52
53#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
59pub enum Discriminator {
60 Arity(usize),
62}
63
64#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
69#[non_exhaustive]
70pub enum SurfaceKind {
71 Scalar,
73 Aggregate,
75 Window,
77 Procedure,
79 LocyAggregate,
81 LocyPredicate,
83 Operator,
85 OptimizerRule,
87 Algorithm,
89 Pregel,
91 IndexKind,
93 StorageBackend,
95 LabelStorage,
97 Crdt,
99 Hook,
101 LogicalType,
103 Auth,
105 Authz,
107 Connector,
109 Trigger,
111 Collation,
113 Cdc,
115 Catalog,
117 ReplacementScan,
119 BackgroundJob,
121}
122
123pub trait NamedUniqueSurface: 'static {
131 type Sig: Send + Sync + 'static;
134 type Provider: ?Sized + Send + Sync + 'static;
136
137 const KIND: SurfaceKind;
139}
140
141pub trait VersionedSurface: 'static {
146 type Sig: Send + Sync + 'static;
148 type Provider: ?Sized + Send + Sync + 'static;
150
151 const KIND: SurfaceKind;
153
154 fn discriminator(sig: &Self::Sig) -> Discriminator;
157}
158
159pub trait KeyedUniqueSurface: 'static {
169 type Key: Clone + Eq + Hash + Debug + Send + Sync + 'static;
171 type Provider: ?Sized + Send + Sync + 'static;
173
174 const KIND: SurfaceKind;
176
177 fn duplicate_error(key: &Self::Key) -> PluginError {
188 PluginError::internal(format!("{:?} `{:?}` already registered", Self::KIND, key))
189 }
190
191 fn key_of(_provider: &Self::Provider) -> Option<Self::Key> {
205 None
206 }
207}
208
209pub trait AppendSurface: 'static {
216 type Provider: ?Sized + Send + Sync + 'static;
218
219 const KIND: SurfaceKind;
221}
222
223pub struct AppendEntry<P: ?Sized> {
230 pub plugin: PluginId,
232 pub provider: Arc<P>,
234}
235
236impl<P: ?Sized> Clone for AppendEntry<P> {
237 fn clone(&self) -> Self {
238 Self {
239 plugin: self.plugin.clone(),
240 provider: Arc::clone(&self.provider),
241 }
242 }
243}
244
245impl<P: ?Sized> Debug for AppendEntry<P> {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 f.debug_struct("AppendEntry")
248 .field("plugin", &self.plugin)
249 .finish_non_exhaustive()
250 }
251}
252
253use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
259use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
260use crate::traits::background::BackgroundJobProvider;
261use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
262use crate::traits::cdc::CdcOutputProvider;
263use crate::traits::collation::CollationProvider;
264use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
265use crate::traits::crdt::CrdtKindProvider;
266use crate::traits::hook::SessionHook;
267use crate::traits::index::IndexKindProvider;
268use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
269use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
270use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
271use crate::traits::scalar::{FnSignature, ScalarPluginFn};
272use crate::traits::storage::{Storage, StorageBackend};
273use crate::traits::trigger::TriggerPlugin;
274use crate::traits::types::LogicalTypeProvider;
275use crate::traits::window::{WindowPluginFn, WindowSignature};
276
277macro_rules! marker {
278 ($(#[$attr:meta])* $name:ident) => {
279 $(#[$attr])*
280 #[derive(Debug, Clone, Copy)]
281 pub struct $name;
282 };
283}
284
285marker!(ScalarSurface);
288marker!(AggregateSurface);
290marker!(WindowSurface);
292marker!(LocyAggregateSurface);
294marker!(LocyPredicateSurface);
296marker!(OperatorSurface);
298marker!(AlgorithmSurface);
300marker!(PregelSurface);
302
303marker!(ProcedureSurface);
306
307marker!(IndexKindSurface);
310marker!(StorageBackendSurface);
312marker!(LabelStorageSurface);
314marker!(CrdtSurface);
316marker!(LogicalTypeSurface);
318marker!(CollationSurface);
320marker!(CdcSurface);
322marker!(CatalogSurface);
324
325marker!(OptimizerRuleSurface);
328marker!(HookSurface);
330marker!(AuthSurface);
332marker!(AuthzSurface);
334marker!(ConnectorSurface);
336marker!(TriggerSurface);
338marker!(ReplacementScanSurface);
340marker!(BackgroundJobSurface);
342
343impl NamedUniqueSurface for ScalarSurface {
346 type Sig = FnSignature;
347 type Provider = dyn ScalarPluginFn;
348 const KIND: SurfaceKind = SurfaceKind::Scalar;
349}
350
351impl NamedUniqueSurface for AggregateSurface {
352 type Sig = AggSignature;
353 type Provider = dyn AggregatePluginFn;
354 const KIND: SurfaceKind = SurfaceKind::Aggregate;
355}
356
357impl NamedUniqueSurface for WindowSurface {
358 type Sig = WindowSignature;
359 type Provider = dyn WindowPluginFn;
360 const KIND: SurfaceKind = SurfaceKind::Window;
361}
362
363impl NamedUniqueSurface for LocyAggregateSurface {
364 type Sig = ();
365 type Provider = dyn LocyAggregate;
366 const KIND: SurfaceKind = SurfaceKind::LocyAggregate;
367}
368
369impl NamedUniqueSurface for LocyPredicateSurface {
370 type Sig = PredSignature;
371 type Provider = dyn LocyPredicate;
372 const KIND: SurfaceKind = SurfaceKind::LocyPredicate;
373}
374
375impl NamedUniqueSurface for OperatorSurface {
376 type Sig = ();
377 type Provider = dyn OperatorProvider;
378 const KIND: SurfaceKind = SurfaceKind::Operator;
379}
380
381impl NamedUniqueSurface for AlgorithmSurface {
382 type Sig = ();
383 type Provider = dyn AlgorithmProvider;
384 const KIND: SurfaceKind = SurfaceKind::Algorithm;
385}
386
387impl NamedUniqueSurface for PregelSurface {
388 type Sig = ();
389 type Provider = dyn PregelProgramProvider;
390 const KIND: SurfaceKind = SurfaceKind::Pregel;
391}
392
393impl VersionedSurface for ProcedureSurface {
396 type Sig = ProcedureSignature;
397 type Provider = dyn ProcedurePlugin;
398 const KIND: SurfaceKind = SurfaceKind::Procedure;
399
400 fn discriminator(sig: &Self::Sig) -> Discriminator {
401 Discriminator::Arity(sig.args.len())
402 }
403}
404
405impl KeyedUniqueSurface for IndexKindSurface {
408 type Key = IndexKind;
409 type Provider = dyn IndexKindProvider;
410 const KIND: SurfaceKind = SurfaceKind::IndexKind;
411
412 fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
413 Some(provider.kind())
414 }
415}
416
417impl KeyedUniqueSurface for StorageBackendSurface {
418 type Key = SmolStr;
419 type Provider = dyn StorageBackend;
420 const KIND: SurfaceKind = SurfaceKind::StorageBackend;
421
422 fn duplicate_error(key: &Self::Key) -> PluginError {
423 PluginError::StorageSchemeConflict(key.to_string())
424 }
425
426 fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
427 Some(SmolStr::new(provider.scheme()))
428 }
429}
430
431impl KeyedUniqueSurface for LabelStorageSurface {
432 type Key = SmolStr;
433 type Provider = dyn Storage;
434 const KIND: SurfaceKind = SurfaceKind::LabelStorage;
435
436 fn duplicate_error(key: &Self::Key) -> PluginError {
437 PluginError::internal(format!("label storage for `{key}` already registered"))
438 }
439
440 }
443
444impl KeyedUniqueSurface for CrdtSurface {
445 type Key = CrdtKind;
446 type Provider = dyn CrdtKindProvider;
447 const KIND: SurfaceKind = SurfaceKind::Crdt;
448
449 fn duplicate_error(key: &Self::Key) -> PluginError {
450 PluginError::internal(format!("CRDT kind `{}` already registered", key.0))
451 }
452
453 fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
454 Some(provider.kind())
455 }
456}
457
458impl KeyedUniqueSurface for LogicalTypeSurface {
459 type Key = SmolStr;
460 type Provider = dyn LogicalTypeProvider;
461 const KIND: SurfaceKind = SurfaceKind::LogicalType;
462
463 fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
464 Some(SmolStr::new(provider.name()))
465 }
466}
467
468impl KeyedUniqueSurface for CollationSurface {
469 type Key = SmolStr;
470 type Provider = dyn CollationProvider;
471 const KIND: SurfaceKind = SurfaceKind::Collation;
472
473 fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
474 Some(SmolStr::new(provider.name()))
475 }
476}
477
478impl KeyedUniqueSurface for CdcSurface {
479 type Key = SmolStr;
480 type Provider = dyn CdcOutputProvider;
481 const KIND: SurfaceKind = SurfaceKind::Cdc;
482
483 fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
484 Some(SmolStr::new(provider.name()))
485 }
486}
487
488impl KeyedUniqueSurface for CatalogSurface {
489 type Key = SmolStr;
490 type Provider = dyn CatalogProvider;
491 const KIND: SurfaceKind = SurfaceKind::Catalog;
492
493 fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
494 Some(SmolStr::new(provider.name()))
495 }
496}
497
498impl AppendSurface for OptimizerRuleSurface {
501 type Provider = dyn OptimizerRuleProvider;
502 const KIND: SurfaceKind = SurfaceKind::OptimizerRule;
503}
504
505impl AppendSurface for HookSurface {
506 type Provider = dyn SessionHook;
507 const KIND: SurfaceKind = SurfaceKind::Hook;
508}
509
510impl AppendSurface for AuthSurface {
511 type Provider = dyn AuthProvider;
512 const KIND: SurfaceKind = SurfaceKind::Auth;
513}
514
515impl AppendSurface for AuthzSurface {
516 type Provider = dyn AuthzPolicy;
517 const KIND: SurfaceKind = SurfaceKind::Authz;
518}
519
520impl AppendSurface for ConnectorSurface {
521 type Provider = dyn Connector;
522 const KIND: SurfaceKind = SurfaceKind::Connector;
523}
524
525impl AppendSurface for TriggerSurface {
526 type Provider = dyn TriggerPlugin;
527 const KIND: SurfaceKind = SurfaceKind::Trigger;
528}
529
530impl AppendSurface for ReplacementScanSurface {
531 type Provider = dyn ReplacementScanProvider;
532 const KIND: SurfaceKind = SurfaceKind::ReplacementScan;
533}
534
535impl AppendSurface for BackgroundJobSurface {
536 type Provider = dyn BackgroundJobProvider;
537 const KIND: SurfaceKind = SurfaceKind::BackgroundJob;
538}
539
540pub(crate) trait NamedUniqueOps: NamedUniqueSurface {
556 type Stored: Clone + Send + Sync + 'static;
559
560 fn make_stored(plugin: PluginId, sig: Self::Sig, provider: Arc<Self::Provider>)
562 -> Self::Stored;
563
564 fn slot(registry: &PluginRegistry) -> &DashMap<QName, Self::Stored>;
566
567 fn record_slot(record: &mut PluginRecord) -> &mut Vec<QName>;
570
571 fn preflight(registry: &PluginRegistry, q: &QName) -> Result<(), PluginError> {
578 if Self::slot(registry).contains_key(q) {
579 return Err(PluginError::DuplicateRegistration(q.clone()));
580 }
581 Ok(())
582 }
583
584 fn insert(
587 registry: &PluginRegistry,
588 plugin: PluginId,
589 q: QName,
590 sig: Self::Sig,
591 provider: Arc<Self::Provider>,
592 record: &mut PluginRecord,
593 ) {
594 let stored = Self::make_stored(plugin, sig, provider);
595 Self::slot(registry).insert(q.clone(), stored);
596 Self::record_slot(record).push(q);
597 }
598
599 fn remove(registry: &PluginRegistry, q: &QName) {
601 Self::slot(registry).remove(q);
602 }
603}
604
605pub(crate) trait VersionedOps: VersionedSurface {
610 type Stored: Clone + Send + Sync + 'static;
612
613 fn make_stored(plugin: PluginId, sig: Self::Sig, provider: Arc<Self::Provider>)
615 -> Self::Stored;
616
617 fn entry_discriminator(stored: &Self::Stored) -> Discriminator;
620
621 fn signature_discriminator(sig: &Self::Sig) -> Discriminator {
623 Self::discriminator(sig)
624 }
625
626 fn slot(registry: &PluginRegistry) -> &DashMap<QName, Vec<Self::Stored>>;
628
629 fn record_slot(record: &mut PluginRecord) -> &mut Vec<(QName, usize)>;
632
633 fn discriminator_to_usize(d: Discriminator) -> usize {
635 match d {
636 Discriminator::Arity(n) => n,
637 }
638 }
639
640 fn preflight(registry: &PluginRegistry, q: &QName, sig: &Self::Sig) -> Result<(), PluginError> {
649 let d = Self::signature_discriminator(sig);
650 if let Some(slot) = Self::slot(registry).get(q)
651 && slot.iter().any(|e| Self::entry_discriminator(e) == d)
652 {
653 return Err(PluginError::DuplicateRegistration(q.clone()));
654 }
655 Ok(())
656 }
657
658 fn insert(
661 registry: &PluginRegistry,
662 plugin: PluginId,
663 q: QName,
664 sig: Self::Sig,
665 provider: Arc<Self::Provider>,
666 record: &mut PluginRecord,
667 ) {
668 let d = Self::signature_discriminator(&sig);
669 let stored = Self::make_stored(plugin, sig, provider);
670 let mut entry = Self::slot(registry).entry(q.clone()).or_default();
671 entry.push(stored);
672 drop(entry);
673 Self::record_slot(record).push((q, Self::discriminator_to_usize(d)));
674 }
675
676 fn remove(registry: &PluginRegistry, q: &QName, d: Discriminator) {
679 let slot = Self::slot(registry);
680 if let Some(mut entry) = slot.get_mut(q) {
681 entry.retain(|e| Self::entry_discriminator(e) != d);
682 let empty = entry.is_empty();
683 drop(entry);
684 if empty {
685 slot.remove(q);
686 }
687 }
688 }
689}
690
691pub(crate) trait KeyedUniqueOps: KeyedUniqueSurface {
697 fn slot(registry: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>>;
699
700 fn record_register(record: &mut PluginRecord, key: &Self::Key);
702
703 fn preflight(registry: &PluginRegistry, key: &Self::Key) -> Result<(), PluginError> {
710 if Self::slot(registry).contains_key(key) {
711 return Err(Self::duplicate_error(key));
712 }
713 Ok(())
714 }
715
716 fn insert(
719 registry: &PluginRegistry,
720 key: Self::Key,
721 provider: Arc<Self::Provider>,
722 record: &mut PluginRecord,
723 ) {
724 Self::slot(registry).insert(key.clone(), provider);
725 Self::record_register(record, &key);
726 }
727
728 fn remove(registry: &PluginRegistry, key: &Self::Key) {
730 Self::slot(registry).remove(key);
731 }
732}
733
734pub(crate) trait AppendOps: AppendSurface {
740 fn slot(registry: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>>;
742
743 fn record_register(record: &mut PluginRecord);
745
746 fn insert(
748 registry: &PluginRegistry,
749 plugin: PluginId,
750 provider: Arc<Self::Provider>,
751 record: &mut PluginRecord,
752 ) {
753 let slot = Self::slot(registry);
754 let mut v = (**slot.load()).clone();
755 v.push(AppendEntry { plugin, provider });
756 slot.store(Arc::new(v));
757 Self::record_register(record);
758 }
759
760 fn remove_plugin(registry: &PluginRegistry, plugin: &PluginId) {
762 let slot = Self::slot(registry);
763 let cur = slot.load();
764 if !cur.iter().any(|e| &e.plugin == plugin) {
765 return;
766 }
767 let v: Vec<AppendEntry<Self::Provider>> = cur
768 .iter()
769 .filter(|e| &e.plugin != plugin)
770 .cloned()
771 .collect();
772 slot.store(Arc::new(v));
773 }
774}
775
776impl NamedUniqueOps for ScalarSurface {
779 type Stored = Arc<ScalarEntry>;
780 fn make_stored(
781 plugin: PluginId,
782 sig: Self::Sig,
783 provider: Arc<Self::Provider>,
784 ) -> Self::Stored {
785 Arc::new(ScalarEntry {
786 plugin,
787 signature: sig,
788 function: provider,
789 })
790 }
791 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
792 &r.scalars
793 }
794 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
795 &mut rec.scalars
796 }
797}
798
799impl NamedUniqueOps for AggregateSurface {
800 type Stored = Arc<AggregateEntry>;
801 fn make_stored(
802 plugin: PluginId,
803 sig: Self::Sig,
804 provider: Arc<Self::Provider>,
805 ) -> Self::Stored {
806 Arc::new(AggregateEntry {
807 plugin,
808 signature: sig,
809 aggregate: provider,
810 })
811 }
812 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
813 &r.aggregates
814 }
815 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
816 &mut rec.aggregates
817 }
818}
819
820impl NamedUniqueOps for WindowSurface {
821 type Stored = Arc<WindowEntry>;
822 fn make_stored(
823 plugin: PluginId,
824 sig: Self::Sig,
825 provider: Arc<Self::Provider>,
826 ) -> Self::Stored {
827 Arc::new(WindowEntry {
828 plugin,
829 signature: sig,
830 window: provider,
831 })
832 }
833 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
834 &r.windows
835 }
836 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
837 &mut rec.windows
838 }
839}
840
841impl NamedUniqueOps for LocyAggregateSurface {
842 type Stored = Arc<LocyAggregateEntry>;
843 fn make_stored(
844 plugin: PluginId,
845 _sig: Self::Sig,
846 provider: Arc<Self::Provider>,
847 ) -> Self::Stored {
848 Arc::new(LocyAggregateEntry {
849 plugin,
850 aggregate: provider,
851 })
852 }
853 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
854 &r.locy_aggregates
855 }
856 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
857 &mut rec.locy_aggregates
858 }
859}
860
861impl NamedUniqueOps for LocyPredicateSurface {
862 type Stored = Arc<LocyPredicateEntry>;
863 fn make_stored(
864 plugin: PluginId,
865 sig: Self::Sig,
866 provider: Arc<Self::Provider>,
867 ) -> Self::Stored {
868 Arc::new(LocyPredicateEntry {
869 plugin,
870 signature: sig,
871 predicate: provider,
872 })
873 }
874 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
875 &r.locy_predicates
876 }
877 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
878 &mut rec.locy_predicates
879 }
880}
881
882impl NamedUniqueOps for OperatorSurface {
883 type Stored = Arc<dyn OperatorProvider>;
884 fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
885 provider
886 }
887 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
888 &r.operators
889 }
890 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
891 &mut rec.operators
892 }
893}
894
895impl NamedUniqueOps for AlgorithmSurface {
896 type Stored = Arc<dyn AlgorithmProvider>;
897 fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
898 provider
899 }
900 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
901 &r.algorithms
902 }
903 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
904 &mut rec.algorithms
905 }
906}
907
908impl NamedUniqueOps for PregelSurface {
909 type Stored = Arc<dyn PregelProgramProvider>;
910 fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
911 provider
912 }
913 fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
914 &r.pregels
915 }
916 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
917 &mut rec.pregels
918 }
919}
920
921impl VersionedOps for ProcedureSurface {
924 type Stored = Arc<ProcedureEntry>;
925 fn make_stored(
926 plugin: PluginId,
927 sig: Self::Sig,
928 provider: Arc<Self::Provider>,
929 ) -> Self::Stored {
930 Arc::new(ProcedureEntry {
931 plugin,
932 signature: sig,
933 procedure: provider,
934 })
935 }
936 fn entry_discriminator(stored: &Self::Stored) -> Discriminator {
937 Discriminator::Arity(stored.signature.args.len())
938 }
939 fn slot(r: &PluginRegistry) -> &DashMap<QName, Vec<Self::Stored>> {
940 &r.procedures
941 }
942 fn record_slot(rec: &mut PluginRecord) -> &mut Vec<(QName, usize)> {
943 &mut rec.procedures
944 }
945}
946
947impl KeyedUniqueOps for IndexKindSurface {
950 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
951 &r.index_kinds
952 }
953 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
954 rec.index_kinds.push(key.clone());
955 }
956}
957
958impl KeyedUniqueOps for StorageBackendSurface {
959 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
960 &r.storage_backends
961 }
962 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
963 rec.storage_schemes.push(key.clone());
964 }
965}
966
967impl KeyedUniqueOps for LabelStorageSurface {
968 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
969 &r.label_storages
970 }
971 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
972 rec.label_storages.push(key.clone());
973 }
974}
975
976impl KeyedUniqueOps for CrdtSurface {
977 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
978 &r.crdt_kinds
979 }
980 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
981 rec.crdt_kinds.push(key.clone());
982 }
983}
984
985impl KeyedUniqueOps for LogicalTypeSurface {
986 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
987 &r.logical_types
988 }
989 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
990 rec.logical_types.push(key.clone());
991 }
992}
993
994impl KeyedUniqueOps for CollationSurface {
995 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
996 &r.collations
997 }
998 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
999 rec.collations.push(key.clone());
1000 }
1001}
1002
1003impl KeyedUniqueOps for CdcSurface {
1004 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
1005 &r.cdc_outputs
1006 }
1007 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
1008 rec.cdc_outputs.push(key.clone());
1009 }
1010}
1011
1012impl KeyedUniqueOps for CatalogSurface {
1013 fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
1014 &r.catalogs
1015 }
1016 fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
1017 rec.catalogs.push(key.clone());
1018 }
1019}
1020
1021impl AppendOps for OptimizerRuleSurface {
1024 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1025 &r.optimizer_rules
1026 }
1027 fn record_register(rec: &mut PluginRecord) {
1028 rec.optimizer_rule_count += 1;
1029 }
1030}
1031impl AppendOps for HookSurface {
1032 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1033 &r.hooks
1034 }
1035 fn record_register(rec: &mut PluginRecord) {
1036 rec.hook_count += 1;
1037 }
1038}
1039impl AppendOps for AuthSurface {
1040 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1041 &r.auth_providers
1042 }
1043 fn record_register(rec: &mut PluginRecord) {
1044 rec.auth_count += 1;
1045 }
1046}
1047impl AppendOps for AuthzSurface {
1048 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1049 &r.authz_policies
1050 }
1051 fn record_register(rec: &mut PluginRecord) {
1052 rec.authz_count += 1;
1053 }
1054}
1055impl AppendOps for ConnectorSurface {
1056 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1057 &r.connectors
1058 }
1059 fn record_register(rec: &mut PluginRecord) {
1060 rec.connector_count += 1;
1061 }
1062}
1063impl AppendOps for TriggerSurface {
1064 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1065 &r.triggers
1066 }
1067 fn record_register(rec: &mut PluginRecord) {
1068 rec.trigger_count += 1;
1069 }
1070}
1071impl AppendOps for ReplacementScanSurface {
1072 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1073 &r.replacement_scans
1074 }
1075 fn record_register(rec: &mut PluginRecord) {
1076 rec.replacement_scan_count += 1;
1077 }
1078}
1079impl AppendOps for BackgroundJobSurface {
1080 fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1081 &r.background_jobs
1082 }
1083 fn record_register(rec: &mut PluginRecord) {
1084 rec.background_job_count += 1;
1085 }
1086}
1087
1088pub(crate) trait DynPendingRegistration: Send + Sync {
1103 #[allow(
1105 dead_code,
1106 reason = "Diagnostic surface; exercised by tests and future debug paths."
1107 )]
1108 fn kind(&self) -> SurfaceKind;
1109 fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError>;
1111 fn apply(
1113 self: Box<Self>,
1114 registry: &PluginRegistry,
1115 plugin: PluginId,
1116 record: &mut PluginRecord,
1117 );
1118 #[allow(dead_code, reason = "Diagnostic surface for future error formatting.")]
1120 fn debug_label(&self) -> String;
1121}
1122
1123pub(crate) struct NamedUniqueReg<S: NamedUniqueOps> {
1125 pub q: QName,
1127 pub sig: S::Sig,
1129 pub provider: Arc<S::Provider>,
1131}
1132
1133impl<S> DynPendingRegistration for NamedUniqueReg<S>
1134where
1135 S: NamedUniqueOps + 'static,
1136 S::Sig: Send + Sync,
1137{
1138 fn kind(&self) -> SurfaceKind {
1139 S::KIND
1140 }
1141 fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
1142 S::preflight(registry, &self.q)
1143 }
1144 fn apply(
1145 self: Box<Self>,
1146 registry: &PluginRegistry,
1147 plugin: PluginId,
1148 record: &mut PluginRecord,
1149 ) {
1150 S::insert(registry, plugin, self.q, self.sig, self.provider, record);
1151 }
1152 fn debug_label(&self) -> String {
1153 format!("{:?}({})", S::KIND, self.q)
1154 }
1155}
1156
1157pub(crate) struct VersionedReg<S: VersionedOps> {
1159 pub q: QName,
1161 pub sig: S::Sig,
1163 pub provider: Arc<S::Provider>,
1165}
1166
1167impl<S> DynPendingRegistration for VersionedReg<S>
1168where
1169 S: VersionedOps + 'static,
1170 S::Sig: Send + Sync,
1171{
1172 fn kind(&self) -> SurfaceKind {
1173 S::KIND
1174 }
1175 fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
1176 S::preflight(registry, &self.q, &self.sig)
1177 }
1178 fn apply(
1179 self: Box<Self>,
1180 registry: &PluginRegistry,
1181 plugin: PluginId,
1182 record: &mut PluginRecord,
1183 ) {
1184 S::insert(registry, plugin, self.q, self.sig, self.provider, record);
1185 }
1186 fn debug_label(&self) -> String {
1187 format!("{:?}({})", S::KIND, self.q)
1188 }
1189}
1190
1191pub(crate) struct KeyedUniqueReg<S: KeyedUniqueOps> {
1198 pub key_override: Option<S::Key>,
1201 pub provider: Arc<S::Provider>,
1203}
1204
1205impl<S> KeyedUniqueReg<S>
1206where
1207 S: KeyedUniqueOps,
1208{
1209 pub fn resolve_key(&self) -> Result<S::Key, PluginError> {
1216 if let Some(ref k) = self.key_override {
1217 return Ok(k.clone());
1218 }
1219 S::key_of(&*self.provider).ok_or_else(|| {
1220 PluginError::internal(format!(
1221 "{:?} registration missing explicit key (provider does not self-identify)",
1222 S::KIND
1223 ))
1224 })
1225 }
1226}
1227
1228impl<S> DynPendingRegistration for KeyedUniqueReg<S>
1229where
1230 S: KeyedUniqueOps + 'static,
1231{
1232 fn kind(&self) -> SurfaceKind {
1233 S::KIND
1234 }
1235 fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
1236 let key = self.resolve_key()?;
1237 S::preflight(registry, &key)
1238 }
1239 fn apply(
1240 self: Box<Self>,
1241 registry: &PluginRegistry,
1242 _plugin: PluginId,
1243 record: &mut PluginRecord,
1244 ) {
1245 let key = match self.resolve_key() {
1249 Ok(k) => k,
1250 Err(_) => return, };
1252 S::insert(registry, key, self.provider, record);
1253 }
1254 fn debug_label(&self) -> String {
1255 let k = self
1256 .resolve_key()
1257 .map(|k| format!("{k:?}"))
1258 .unwrap_or_else(|_| "<unresolved>".into());
1259 format!("{:?}({k})", S::KIND)
1260 }
1261}
1262
1263pub(crate) struct AppendReg<S: AppendOps> {
1265 pub provider: Arc<S::Provider>,
1267}
1268
1269impl<S> DynPendingRegistration for AppendReg<S>
1270where
1271 S: AppendOps + 'static,
1272{
1273 fn kind(&self) -> SurfaceKind {
1274 S::KIND
1275 }
1276 fn preflight(&self, _registry: &PluginRegistry) -> Result<(), PluginError> {
1277 Ok(())
1278 }
1279 fn apply(
1280 self: Box<Self>,
1281 registry: &PluginRegistry,
1282 plugin: PluginId,
1283 record: &mut PluginRecord,
1284 ) {
1285 S::insert(registry, plugin, self.provider, record);
1286 }
1287 fn debug_label(&self) -> String {
1288 format!("{:?}", S::KIND)
1289 }
1290}
1291
1292#[cfg(test)]
1293mod tests {
1294 use super::*;
1295
1296 #[test]
1297 fn surface_kind_count_matches_design() {
1298 let kinds = [
1300 <ScalarSurface as NamedUniqueSurface>::KIND,
1301 <AggregateSurface as NamedUniqueSurface>::KIND,
1302 <WindowSurface as NamedUniqueSurface>::KIND,
1303 <LocyAggregateSurface as NamedUniqueSurface>::KIND,
1304 <LocyPredicateSurface as NamedUniqueSurface>::KIND,
1305 <OperatorSurface as NamedUniqueSurface>::KIND,
1306 <AlgorithmSurface as NamedUniqueSurface>::KIND,
1307 <PregelSurface as NamedUniqueSurface>::KIND,
1308 <ProcedureSurface as VersionedSurface>::KIND,
1309 <IndexKindSurface as KeyedUniqueSurface>::KIND,
1310 <StorageBackendSurface as KeyedUniqueSurface>::KIND,
1311 <LabelStorageSurface as KeyedUniqueSurface>::KIND,
1312 <CrdtSurface as KeyedUniqueSurface>::KIND,
1313 <LogicalTypeSurface as KeyedUniqueSurface>::KIND,
1314 <CollationSurface as KeyedUniqueSurface>::KIND,
1315 <CdcSurface as KeyedUniqueSurface>::KIND,
1316 <CatalogSurface as KeyedUniqueSurface>::KIND,
1317 <OptimizerRuleSurface as AppendSurface>::KIND,
1318 <HookSurface as AppendSurface>::KIND,
1319 <AuthSurface as AppendSurface>::KIND,
1320 <AuthzSurface as AppendSurface>::KIND,
1321 <ConnectorSurface as AppendSurface>::KIND,
1322 <TriggerSurface as AppendSurface>::KIND,
1323 <ReplacementScanSurface as AppendSurface>::KIND,
1324 <BackgroundJobSurface as AppendSurface>::KIND,
1325 ];
1326 assert_eq!(kinds.len(), 25);
1335 let mut sorted: Vec<_> = kinds.iter().collect();
1336 sorted.sort_by_key(|k| format!("{k:?}"));
1337 sorted.dedup();
1338 assert_eq!(sorted.len(), 25, "duplicate SurfaceKind in markers");
1339 }
1340
1341 #[test]
1342 fn keyed_unique_storage_backend_duplicate_error_is_typed() {
1343 let err =
1344 <StorageBackendSurface as KeyedUniqueSurface>::duplicate_error(&SmolStr::new("s3"));
1345 assert!(matches!(err, PluginError::StorageSchemeConflict(_)));
1346 }
1347
1348 #[test]
1349 fn keyed_unique_default_duplicate_error_is_internal() {
1350 let err = <LogicalTypeSurface as KeyedUniqueSurface>::duplicate_error(&SmolStr::new("x"));
1351 assert!(matches!(err, PluginError::Internal(_)));
1352 }
1353
1354 struct NoopHook;
1357 impl crate::traits::hook::SessionHook for NoopHook {}
1358
1359 fn pid(s: &str) -> PluginId {
1360 PluginId::new(s)
1361 }
1362
1363 #[test]
1364 fn append_ops_insert_and_remove_round_trip() {
1365 let registry = PluginRegistry::new();
1369 let mut record_a = PluginRecord::default();
1370 let mut record_b = PluginRecord::default();
1371 <HookSurface as AppendOps>::insert(®istry, pid("a"), Arc::new(NoopHook), &mut record_a);
1372 <HookSurface as AppendOps>::insert(®istry, pid("b"), Arc::new(NoopHook), &mut record_b);
1373 assert_eq!(registry.hooks().len(), 2);
1374 assert_eq!(record_a.hook_count, 1);
1375 assert_eq!(record_b.hook_count, 1);
1376
1377 <HookSurface as AppendOps>::remove_plugin(®istry, &pid("a"));
1378 assert_eq!(
1379 registry.hooks().len(),
1380 1,
1381 "remove_plugin should drop plugin a's entry"
1382 );
1383 <HookSurface as AppendOps>::remove_plugin(®istry, &pid("b"));
1384 assert_eq!(registry.hooks().len(), 0);
1385 }
1386
1387 #[test]
1388 fn append_ops_remove_plugin_is_noop_when_no_entries() {
1389 let registry = PluginRegistry::new();
1390 <HookSurface as AppendOps>::remove_plugin(®istry, &pid("ghost"));
1393 assert_eq!(registry.hooks().len(), 0);
1394 }
1395
1396 #[test]
1397 fn append_reg_dyn_dispatch_matches_static_dispatch() {
1398 let registry = PluginRegistry::new();
1401 let mut record = PluginRecord::default();
1402 let reg: Box<dyn DynPendingRegistration> = Box::new(AppendReg::<HookSurface> {
1403 provider: Arc::new(NoopHook),
1404 });
1405 assert_eq!(reg.kind(), SurfaceKind::Hook);
1406 reg.preflight(®istry).unwrap();
1407 reg.apply(®istry, pid("dyn"), &mut record);
1408 assert_eq!(registry.hooks().len(), 1);
1409 assert_eq!(record.hook_count, 1);
1410
1411 <HookSurface as AppendOps>::remove_plugin(®istry, &pid("dyn"));
1412 assert_eq!(registry.hooks().len(), 0);
1413 }
1414
1415 #[test]
1416 fn named_unique_ops_preflight_detects_duplicate() {
1417 let registry = PluginRegistry::new();
1419 let mut record = PluginRecord::default();
1420 let q = QName::builtin("scalar_dup");
1421 <ScalarSurface as NamedUniqueOps>::preflight(®istry, &q).unwrap();
1425 record.scalars.push(q.clone());
1428 }
1434
1435 struct StubCollation(&'static str);
1442 impl crate::traits::collation::CollationProvider for StubCollation {
1443 fn name(&self) -> &str {
1444 self.0
1445 }
1446 fn compare(&self, a: &str, b: &str) -> std::cmp::Ordering {
1447 a.cmp(b)
1448 }
1449 }
1450
1451 #[test]
1452 fn keyed_unique_collation_per_key_record_round_trip() {
1453 let registry = PluginRegistry::new();
1454 let mut record = PluginRecord::default();
1455 let key = SmolStr::new("test.case_fold");
1456 <CollationSurface as KeyedUniqueOps>::insert(
1457 ®istry,
1458 key.clone(),
1459 Arc::new(StubCollation("test.case_fold")),
1460 &mut record,
1461 );
1462 assert_eq!(record.collations, vec![key.clone()]);
1463 assert!(registry.collations.contains_key(&key));
1464
1465 <CollationSurface as KeyedUniqueOps>::remove(®istry, &key);
1466 assert!(
1467 !registry.collations.contains_key(&key),
1468 "remove must drop the keyed-unique slot entry; the legacy \
1469 count-only record could not"
1470 );
1471 }
1472}