use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use arc_swap::ArcSwap;
use dashmap::DashMap;
use smol_str::SmolStr;
use crate::errors::PluginError;
use crate::plugin::PluginId;
use crate::qname::QName;
use crate::registry::{
AggregateEntry, LocyAggregateEntry, LocyPredicateEntry, PluginRecord, PluginRegistry,
ProcedureEntry, ScalarEntry, WindowEntry,
};
use crate::traits::crdt::CrdtKind;
use crate::traits::index::IndexKind;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Discriminator {
Arity(usize),
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum SurfaceKind {
Scalar,
Aggregate,
Window,
Procedure,
LocyAggregate,
LocyPredicate,
Operator,
OptimizerRule,
Algorithm,
Pregel,
IndexKind,
StorageBackend,
LabelStorage,
Crdt,
Hook,
LogicalType,
Auth,
Authz,
Connector,
Trigger,
Collation,
Cdc,
Catalog,
ReplacementScan,
BackgroundJob,
}
pub trait NamedUniqueSurface: 'static {
type Sig: Send + Sync + 'static;
type Provider: ?Sized + Send + Sync + 'static;
const KIND: SurfaceKind;
}
pub trait VersionedSurface: 'static {
type Sig: Send + Sync + 'static;
type Provider: ?Sized + Send + Sync + 'static;
const KIND: SurfaceKind;
fn discriminator(sig: &Self::Sig) -> Discriminator;
}
pub trait KeyedUniqueSurface: 'static {
type Key: Clone + Eq + Hash + Debug + Send + Sync + 'static;
type Provider: ?Sized + Send + Sync + 'static;
const KIND: SurfaceKind;
fn duplicate_error(key: &Self::Key) -> PluginError {
PluginError::internal(format!("{:?} `{:?}` already registered", Self::KIND, key))
}
fn key_of(_provider: &Self::Provider) -> Option<Self::Key> {
None
}
}
pub trait AppendSurface: 'static {
type Provider: ?Sized + Send + Sync + 'static;
const KIND: SurfaceKind;
}
pub struct AppendEntry<P: ?Sized> {
pub plugin: PluginId,
pub provider: Arc<P>,
}
impl<P: ?Sized> Clone for AppendEntry<P> {
fn clone(&self) -> Self {
Self {
plugin: self.plugin.clone(),
provider: Arc::clone(&self.provider),
}
}
}
impl<P: ?Sized> Debug for AppendEntry<P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppendEntry")
.field("plugin", &self.plugin)
.finish_non_exhaustive()
}
}
use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
use crate::traits::background::BackgroundJobProvider;
use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
use crate::traits::cdc::CdcOutputProvider;
use crate::traits::collation::CollationProvider;
use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
use crate::traits::crdt::CrdtKindProvider;
use crate::traits::hook::SessionHook;
use crate::traits::index::IndexKindProvider;
use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
use crate::traits::scalar::{FnSignature, ScalarPluginFn};
use crate::traits::storage::{Storage, StorageBackend};
use crate::traits::trigger::TriggerPlugin;
use crate::traits::types::LogicalTypeProvider;
use crate::traits::window::{WindowPluginFn, WindowSignature};
macro_rules! marker {
($(#[$attr:meta])* $name:ident) => {
$(#[$attr])*
#[derive(Debug, Clone, Copy)]
pub struct $name;
};
}
// Named-unique markers (7).
marker!(ScalarSurface);
marker!(AggregateSurface);
marker!(WindowSurface);
marker!(LocyAggregateSurface);
marker!(LocyPredicateSurface);
marker!(OperatorSurface);
marker!(AlgorithmSurface);
marker!(PregelSurface);
marker!(ProcedureSurface);
marker!(IndexKindSurface);
marker!(StorageBackendSurface);
marker!(LabelStorageSurface);
marker!(CrdtSurface);
marker!(LogicalTypeSurface);
marker!(CollationSurface);
marker!(CdcSurface);
marker!(CatalogSurface);
marker!(OptimizerRuleSurface);
marker!(HookSurface);
marker!(AuthSurface);
marker!(AuthzSurface);
marker!(ConnectorSurface);
marker!(TriggerSurface);
marker!(ReplacementScanSurface);
marker!(BackgroundJobSurface);
impl NamedUniqueSurface for ScalarSurface {
type Sig = FnSignature;
type Provider = dyn ScalarPluginFn;
const KIND: SurfaceKind = SurfaceKind::Scalar;
}
impl NamedUniqueSurface for AggregateSurface {
type Sig = AggSignature;
type Provider = dyn AggregatePluginFn;
const KIND: SurfaceKind = SurfaceKind::Aggregate;
}
impl NamedUniqueSurface for WindowSurface {
type Sig = WindowSignature;
type Provider = dyn WindowPluginFn;
const KIND: SurfaceKind = SurfaceKind::Window;
}
impl NamedUniqueSurface for LocyAggregateSurface {
type Sig = ();
type Provider = dyn LocyAggregate;
const KIND: SurfaceKind = SurfaceKind::LocyAggregate;
}
impl NamedUniqueSurface for LocyPredicateSurface {
type Sig = PredSignature;
type Provider = dyn LocyPredicate;
const KIND: SurfaceKind = SurfaceKind::LocyPredicate;
}
impl NamedUniqueSurface for OperatorSurface {
type Sig = ();
type Provider = dyn OperatorProvider;
const KIND: SurfaceKind = SurfaceKind::Operator;
}
impl NamedUniqueSurface for AlgorithmSurface {
type Sig = ();
type Provider = dyn AlgorithmProvider;
const KIND: SurfaceKind = SurfaceKind::Algorithm;
}
impl NamedUniqueSurface for PregelSurface {
type Sig = ();
type Provider = dyn PregelProgramProvider;
const KIND: SurfaceKind = SurfaceKind::Pregel;
}
impl VersionedSurface for ProcedureSurface {
type Sig = ProcedureSignature;
type Provider = dyn ProcedurePlugin;
const KIND: SurfaceKind = SurfaceKind::Procedure;
fn discriminator(sig: &Self::Sig) -> Discriminator {
Discriminator::Arity(sig.args.len())
}
}
impl KeyedUniqueSurface for IndexKindSurface {
type Key = IndexKind;
type Provider = dyn IndexKindProvider;
const KIND: SurfaceKind = SurfaceKind::IndexKind;
fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
Some(provider.kind())
}
}
impl KeyedUniqueSurface for StorageBackendSurface {
type Key = SmolStr;
type Provider = dyn StorageBackend;
const KIND: SurfaceKind = SurfaceKind::StorageBackend;
fn duplicate_error(key: &Self::Key) -> PluginError {
PluginError::StorageSchemeConflict(key.to_string())
}
fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
Some(SmolStr::new(provider.scheme()))
}
}
impl KeyedUniqueSurface for LabelStorageSurface {
type Key = SmolStr;
type Provider = dyn Storage;
const KIND: SurfaceKind = SurfaceKind::LabelStorage;
fn duplicate_error(key: &Self::Key) -> PluginError {
PluginError::internal(format!("label storage for `{key}` already registered"))
}
}
impl KeyedUniqueSurface for CrdtSurface {
type Key = CrdtKind;
type Provider = dyn CrdtKindProvider;
const KIND: SurfaceKind = SurfaceKind::Crdt;
fn duplicate_error(key: &Self::Key) -> PluginError {
PluginError::internal(format!("CRDT kind `{}` already registered", key.0))
}
fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
Some(provider.kind())
}
}
impl KeyedUniqueSurface for LogicalTypeSurface {
type Key = SmolStr;
type Provider = dyn LogicalTypeProvider;
const KIND: SurfaceKind = SurfaceKind::LogicalType;
fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
Some(SmolStr::new(provider.name()))
}
}
impl KeyedUniqueSurface for CollationSurface {
type Key = SmolStr;
type Provider = dyn CollationProvider;
const KIND: SurfaceKind = SurfaceKind::Collation;
fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
Some(SmolStr::new(provider.name()))
}
}
impl KeyedUniqueSurface for CdcSurface {
type Key = SmolStr;
type Provider = dyn CdcOutputProvider;
const KIND: SurfaceKind = SurfaceKind::Cdc;
fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
Some(SmolStr::new(provider.name()))
}
}
impl KeyedUniqueSurface for CatalogSurface {
type Key = SmolStr;
type Provider = dyn CatalogProvider;
const KIND: SurfaceKind = SurfaceKind::Catalog;
fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
Some(SmolStr::new(provider.name()))
}
}
impl AppendSurface for OptimizerRuleSurface {
type Provider = dyn OptimizerRuleProvider;
const KIND: SurfaceKind = SurfaceKind::OptimizerRule;
}
impl AppendSurface for HookSurface {
type Provider = dyn SessionHook;
const KIND: SurfaceKind = SurfaceKind::Hook;
}
impl AppendSurface for AuthSurface {
type Provider = dyn AuthProvider;
const KIND: SurfaceKind = SurfaceKind::Auth;
}
impl AppendSurface for AuthzSurface {
type Provider = dyn AuthzPolicy;
const KIND: SurfaceKind = SurfaceKind::Authz;
}
impl AppendSurface for ConnectorSurface {
type Provider = dyn Connector;
const KIND: SurfaceKind = SurfaceKind::Connector;
}
impl AppendSurface for TriggerSurface {
type Provider = dyn TriggerPlugin;
const KIND: SurfaceKind = SurfaceKind::Trigger;
}
impl AppendSurface for ReplacementScanSurface {
type Provider = dyn ReplacementScanProvider;
const KIND: SurfaceKind = SurfaceKind::ReplacementScan;
}
impl AppendSurface for BackgroundJobSurface {
type Provider = dyn BackgroundJobProvider;
const KIND: SurfaceKind = SurfaceKind::BackgroundJob;
}
pub(crate) trait NamedUniqueOps: NamedUniqueSurface {
type Stored: Clone + Send + Sync + 'static;
fn make_stored(plugin: PluginId, sig: Self::Sig, provider: Arc<Self::Provider>)
-> Self::Stored;
fn slot(registry: &PluginRegistry) -> &DashMap<QName, Self::Stored>;
fn record_slot(record: &mut PluginRecord) -> &mut Vec<QName>;
fn preflight(registry: &PluginRegistry, q: &QName) -> Result<(), PluginError> {
if Self::slot(registry).contains_key(q) {
return Err(PluginError::DuplicateRegistration(q.clone()));
}
Ok(())
}
fn insert(
registry: &PluginRegistry,
plugin: PluginId,
q: QName,
sig: Self::Sig,
provider: Arc<Self::Provider>,
record: &mut PluginRecord,
) {
let stored = Self::make_stored(plugin, sig, provider);
Self::slot(registry).insert(q.clone(), stored);
Self::record_slot(record).push(q);
}
fn remove(registry: &PluginRegistry, q: &QName) {
Self::slot(registry).remove(q);
}
}
pub(crate) trait VersionedOps: VersionedSurface {
type Stored: Clone + Send + Sync + 'static;
fn make_stored(plugin: PluginId, sig: Self::Sig, provider: Arc<Self::Provider>)
-> Self::Stored;
fn entry_discriminator(stored: &Self::Stored) -> Discriminator;
fn signature_discriminator(sig: &Self::Sig) -> Discriminator {
Self::discriminator(sig)
}
fn slot(registry: &PluginRegistry) -> &DashMap<QName, Vec<Self::Stored>>;
fn record_slot(record: &mut PluginRecord) -> &mut Vec<(QName, usize)>;
fn discriminator_to_usize(d: Discriminator) -> usize {
match d {
Discriminator::Arity(n) => n,
}
}
fn preflight(registry: &PluginRegistry, q: &QName, sig: &Self::Sig) -> Result<(), PluginError> {
let d = Self::signature_discriminator(sig);
if let Some(slot) = Self::slot(registry).get(q)
&& slot.iter().any(|e| Self::entry_discriminator(e) == d)
{
return Err(PluginError::DuplicateRegistration(q.clone()));
}
Ok(())
}
fn insert(
registry: &PluginRegistry,
plugin: PluginId,
q: QName,
sig: Self::Sig,
provider: Arc<Self::Provider>,
record: &mut PluginRecord,
) {
let d = Self::signature_discriminator(&sig);
let stored = Self::make_stored(plugin, sig, provider);
let mut entry = Self::slot(registry).entry(q.clone()).or_default();
entry.push(stored);
drop(entry);
Self::record_slot(record).push((q, Self::discriminator_to_usize(d)));
}
fn remove(registry: &PluginRegistry, q: &QName, d: Discriminator) {
let slot = Self::slot(registry);
if let Some(mut entry) = slot.get_mut(q) {
entry.retain(|e| Self::entry_discriminator(e) != d);
let empty = entry.is_empty();
drop(entry);
if empty {
slot.remove(q);
}
}
}
}
pub(crate) trait KeyedUniqueOps: KeyedUniqueSurface {
fn slot(registry: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>>;
fn record_register(record: &mut PluginRecord, key: &Self::Key);
fn preflight(registry: &PluginRegistry, key: &Self::Key) -> Result<(), PluginError> {
if Self::slot(registry).contains_key(key) {
return Err(Self::duplicate_error(key));
}
Ok(())
}
fn insert(
registry: &PluginRegistry,
key: Self::Key,
provider: Arc<Self::Provider>,
record: &mut PluginRecord,
) {
Self::slot(registry).insert(key.clone(), provider);
Self::record_register(record, &key);
}
fn remove(registry: &PluginRegistry, key: &Self::Key) {
Self::slot(registry).remove(key);
}
}
pub(crate) trait AppendOps: AppendSurface {
fn slot(registry: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>>;
fn record_register(record: &mut PluginRecord);
fn insert(
registry: &PluginRegistry,
plugin: PluginId,
provider: Arc<Self::Provider>,
record: &mut PluginRecord,
) {
let slot = Self::slot(registry);
let mut v = (**slot.load()).clone();
v.push(AppendEntry { plugin, provider });
slot.store(Arc::new(v));
Self::record_register(record);
}
fn remove_plugin(registry: &PluginRegistry, plugin: &PluginId) {
let slot = Self::slot(registry);
let cur = slot.load();
if !cur.iter().any(|e| &e.plugin == plugin) {
return;
}
let v: Vec<AppendEntry<Self::Provider>> = cur
.iter()
.filter(|e| &e.plugin != plugin)
.cloned()
.collect();
slot.store(Arc::new(v));
}
}
impl NamedUniqueOps for ScalarSurface {
type Stored = Arc<ScalarEntry>;
fn make_stored(
plugin: PluginId,
sig: Self::Sig,
provider: Arc<Self::Provider>,
) -> Self::Stored {
Arc::new(ScalarEntry {
plugin,
signature: sig,
function: provider,
})
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.scalars
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.scalars
}
}
impl NamedUniqueOps for AggregateSurface {
type Stored = Arc<AggregateEntry>;
fn make_stored(
plugin: PluginId,
sig: Self::Sig,
provider: Arc<Self::Provider>,
) -> Self::Stored {
Arc::new(AggregateEntry {
plugin,
signature: sig,
aggregate: provider,
})
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.aggregates
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.aggregates
}
}
impl NamedUniqueOps for WindowSurface {
type Stored = Arc<WindowEntry>;
fn make_stored(
plugin: PluginId,
sig: Self::Sig,
provider: Arc<Self::Provider>,
) -> Self::Stored {
Arc::new(WindowEntry {
plugin,
signature: sig,
window: provider,
})
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.windows
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.windows
}
}
impl NamedUniqueOps for LocyAggregateSurface {
type Stored = Arc<LocyAggregateEntry>;
fn make_stored(
plugin: PluginId,
_sig: Self::Sig,
provider: Arc<Self::Provider>,
) -> Self::Stored {
Arc::new(LocyAggregateEntry {
plugin,
aggregate: provider,
})
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.locy_aggregates
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.locy_aggregates
}
}
impl NamedUniqueOps for LocyPredicateSurface {
type Stored = Arc<LocyPredicateEntry>;
fn make_stored(
plugin: PluginId,
sig: Self::Sig,
provider: Arc<Self::Provider>,
) -> Self::Stored {
Arc::new(LocyPredicateEntry {
plugin,
signature: sig,
predicate: provider,
})
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.locy_predicates
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.locy_predicates
}
}
impl NamedUniqueOps for OperatorSurface {
type Stored = Arc<dyn OperatorProvider>;
fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
provider
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.operators
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.operators
}
}
impl NamedUniqueOps for AlgorithmSurface {
type Stored = Arc<dyn AlgorithmProvider>;
fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
provider
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.algorithms
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.algorithms
}
}
impl NamedUniqueOps for PregelSurface {
type Stored = Arc<dyn PregelProgramProvider>;
fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
provider
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
&r.pregels
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
&mut rec.pregels
}
}
impl VersionedOps for ProcedureSurface {
type Stored = Arc<ProcedureEntry>;
fn make_stored(
plugin: PluginId,
sig: Self::Sig,
provider: Arc<Self::Provider>,
) -> Self::Stored {
Arc::new(ProcedureEntry {
plugin,
signature: sig,
procedure: provider,
})
}
fn entry_discriminator(stored: &Self::Stored) -> Discriminator {
Discriminator::Arity(stored.signature.args.len())
}
fn slot(r: &PluginRegistry) -> &DashMap<QName, Vec<Self::Stored>> {
&r.procedures
}
fn record_slot(rec: &mut PluginRecord) -> &mut Vec<(QName, usize)> {
&mut rec.procedures
}
}
impl KeyedUniqueOps for IndexKindSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.index_kinds
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.index_kinds.push(key.clone());
}
}
impl KeyedUniqueOps for StorageBackendSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.storage_backends
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.storage_schemes.push(key.clone());
}
}
impl KeyedUniqueOps for LabelStorageSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.label_storages
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.label_storages.push(key.clone());
}
}
impl KeyedUniqueOps for CrdtSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.crdt_kinds
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.crdt_kinds.push(key.clone());
}
}
impl KeyedUniqueOps for LogicalTypeSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.logical_types
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.logical_types.push(key.clone());
}
}
impl KeyedUniqueOps for CollationSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.collations
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.collations.push(key.clone());
}
}
impl KeyedUniqueOps for CdcSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.cdc_outputs
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.cdc_outputs.push(key.clone());
}
}
impl KeyedUniqueOps for CatalogSurface {
fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
&r.catalogs
}
fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
rec.catalogs.push(key.clone());
}
}
impl AppendOps for OptimizerRuleSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.optimizer_rules
}
fn record_register(rec: &mut PluginRecord) {
rec.optimizer_rule_count += 1;
}
}
impl AppendOps for HookSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.hooks
}
fn record_register(rec: &mut PluginRecord) {
rec.hook_count += 1;
}
}
impl AppendOps for AuthSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.auth_providers
}
fn record_register(rec: &mut PluginRecord) {
rec.auth_count += 1;
}
}
impl AppendOps for AuthzSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.authz_policies
}
fn record_register(rec: &mut PluginRecord) {
rec.authz_count += 1;
}
}
impl AppendOps for ConnectorSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.connectors
}
fn record_register(rec: &mut PluginRecord) {
rec.connector_count += 1;
}
}
impl AppendOps for TriggerSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.triggers
}
fn record_register(rec: &mut PluginRecord) {
rec.trigger_count += 1;
}
}
impl AppendOps for ReplacementScanSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.replacement_scans
}
fn record_register(rec: &mut PluginRecord) {
rec.replacement_scan_count += 1;
}
}
impl AppendOps for BackgroundJobSurface {
fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
&r.background_jobs
}
fn record_register(rec: &mut PluginRecord) {
rec.background_job_count += 1;
}
}
pub(crate) trait DynPendingRegistration: Send + Sync {
#[allow(
dead_code,
reason = "Diagnostic surface; exercised by tests and future debug paths."
)]
fn kind(&self) -> SurfaceKind;
fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError>;
fn apply(
self: Box<Self>,
registry: &PluginRegistry,
plugin: PluginId,
record: &mut PluginRecord,
);
#[allow(dead_code, reason = "Diagnostic surface for future error formatting.")]
fn debug_label(&self) -> String;
}
pub(crate) struct NamedUniqueReg<S: NamedUniqueOps> {
pub q: QName,
pub sig: S::Sig,
pub provider: Arc<S::Provider>,
}
impl<S> DynPendingRegistration for NamedUniqueReg<S>
where
S: NamedUniqueOps + 'static,
S::Sig: Send + Sync,
{
fn kind(&self) -> SurfaceKind {
S::KIND
}
fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
S::preflight(registry, &self.q)
}
fn apply(
self: Box<Self>,
registry: &PluginRegistry,
plugin: PluginId,
record: &mut PluginRecord,
) {
S::insert(registry, plugin, self.q, self.sig, self.provider, record);
}
fn debug_label(&self) -> String {
format!("{:?}({})", S::KIND, self.q)
}
}
pub(crate) struct VersionedReg<S: VersionedOps> {
pub q: QName,
pub sig: S::Sig,
pub provider: Arc<S::Provider>,
}
impl<S> DynPendingRegistration for VersionedReg<S>
where
S: VersionedOps + 'static,
S::Sig: Send + Sync,
{
fn kind(&self) -> SurfaceKind {
S::KIND
}
fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
S::preflight(registry, &self.q, &self.sig)
}
fn apply(
self: Box<Self>,
registry: &PluginRegistry,
plugin: PluginId,
record: &mut PluginRecord,
) {
S::insert(registry, plugin, self.q, self.sig, self.provider, record);
}
fn debug_label(&self) -> String {
format!("{:?}({})", S::KIND, self.q)
}
}
pub(crate) struct KeyedUniqueReg<S: KeyedUniqueOps> {
pub key_override: Option<S::Key>,
pub provider: Arc<S::Provider>,
}
impl<S> KeyedUniqueReg<S>
where
S: KeyedUniqueOps,
{
pub fn resolve_key(&self) -> Result<S::Key, PluginError> {
if let Some(ref k) = self.key_override {
return Ok(k.clone());
}
S::key_of(&*self.provider).ok_or_else(|| {
PluginError::internal(format!(
"{:?} registration missing explicit key (provider does not self-identify)",
S::KIND
))
})
}
}
impl<S> DynPendingRegistration for KeyedUniqueReg<S>
where
S: KeyedUniqueOps + 'static,
{
fn kind(&self) -> SurfaceKind {
S::KIND
}
fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
let key = self.resolve_key()?;
S::preflight(registry, &key)
}
fn apply(
self: Box<Self>,
registry: &PluginRegistry,
_plugin: PluginId,
record: &mut PluginRecord,
) {
let key = match self.resolve_key() {
Ok(k) => k,
Err(_) => return, };
S::insert(registry, key, self.provider, record);
}
fn debug_label(&self) -> String {
let k = self
.resolve_key()
.map(|k| format!("{k:?}"))
.unwrap_or_else(|_| "<unresolved>".into());
format!("{:?}({k})", S::KIND)
}
}
pub(crate) struct AppendReg<S: AppendOps> {
pub provider: Arc<S::Provider>,
}
impl<S> DynPendingRegistration for AppendReg<S>
where
S: AppendOps + 'static,
{
fn kind(&self) -> SurfaceKind {
S::KIND
}
fn preflight(&self, _registry: &PluginRegistry) -> Result<(), PluginError> {
Ok(())
}
fn apply(
self: Box<Self>,
registry: &PluginRegistry,
plugin: PluginId,
record: &mut PluginRecord,
) {
S::insert(registry, plugin, self.provider, record);
}
fn debug_label(&self) -> String {
format!("{:?}", S::KIND)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn surface_kind_count_matches_design() {
let kinds = [
<ScalarSurface as NamedUniqueSurface>::KIND,
<AggregateSurface as NamedUniqueSurface>::KIND,
<WindowSurface as NamedUniqueSurface>::KIND,
<LocyAggregateSurface as NamedUniqueSurface>::KIND,
<LocyPredicateSurface as NamedUniqueSurface>::KIND,
<OperatorSurface as NamedUniqueSurface>::KIND,
<AlgorithmSurface as NamedUniqueSurface>::KIND,
<PregelSurface as NamedUniqueSurface>::KIND,
<ProcedureSurface as VersionedSurface>::KIND,
<IndexKindSurface as KeyedUniqueSurface>::KIND,
<StorageBackendSurface as KeyedUniqueSurface>::KIND,
<LabelStorageSurface as KeyedUniqueSurface>::KIND,
<CrdtSurface as KeyedUniqueSurface>::KIND,
<LogicalTypeSurface as KeyedUniqueSurface>::KIND,
<CollationSurface as KeyedUniqueSurface>::KIND,
<CdcSurface as KeyedUniqueSurface>::KIND,
<CatalogSurface as KeyedUniqueSurface>::KIND,
<OptimizerRuleSurface as AppendSurface>::KIND,
<HookSurface as AppendSurface>::KIND,
<AuthSurface as AppendSurface>::KIND,
<AuthzSurface as AppendSurface>::KIND,
<ConnectorSurface as AppendSurface>::KIND,
<TriggerSurface as AppendSurface>::KIND,
<ReplacementScanSurface as AppendSurface>::KIND,
<BackgroundJobSurface as AppendSurface>::KIND,
];
assert_eq!(kinds.len(), 25);
let mut sorted: Vec<_> = kinds.iter().collect();
sorted.sort_by_key(|k| format!("{k:?}"));
sorted.dedup();
assert_eq!(sorted.len(), 25, "duplicate SurfaceKind in markers");
}
#[test]
fn keyed_unique_storage_backend_duplicate_error_is_typed() {
let err =
<StorageBackendSurface as KeyedUniqueSurface>::duplicate_error(&SmolStr::new("s3"));
assert!(matches!(err, PluginError::StorageSchemeConflict(_)));
}
#[test]
fn keyed_unique_default_duplicate_error_is_internal() {
let err = <LogicalTypeSurface as KeyedUniqueSurface>::duplicate_error(&SmolStr::new("x"));
assert!(matches!(err, PluginError::Internal(_)));
}
struct NoopHook;
impl crate::traits::hook::SessionHook for NoopHook {}
fn pid(s: &str) -> PluginId {
PluginId::new(s)
}
#[test]
fn append_ops_insert_and_remove_round_trip() {
let registry = PluginRegistry::new();
let mut record_a = PluginRecord::default();
let mut record_b = PluginRecord::default();
<HookSurface as AppendOps>::insert(®istry, pid("a"), Arc::new(NoopHook), &mut record_a);
<HookSurface as AppendOps>::insert(®istry, pid("b"), Arc::new(NoopHook), &mut record_b);
assert_eq!(registry.hooks().len(), 2);
assert_eq!(record_a.hook_count, 1);
assert_eq!(record_b.hook_count, 1);
<HookSurface as AppendOps>::remove_plugin(®istry, &pid("a"));
assert_eq!(
registry.hooks().len(),
1,
"remove_plugin should drop plugin a's entry"
);
<HookSurface as AppendOps>::remove_plugin(®istry, &pid("b"));
assert_eq!(registry.hooks().len(), 0);
}
#[test]
fn append_ops_remove_plugin_is_noop_when_no_entries() {
let registry = PluginRegistry::new();
<HookSurface as AppendOps>::remove_plugin(®istry, &pid("ghost"));
assert_eq!(registry.hooks().len(), 0);
}
#[test]
fn append_reg_dyn_dispatch_matches_static_dispatch() {
let registry = PluginRegistry::new();
let mut record = PluginRecord::default();
let reg: Box<dyn DynPendingRegistration> = Box::new(AppendReg::<HookSurface> {
provider: Arc::new(NoopHook),
});
assert_eq!(reg.kind(), SurfaceKind::Hook);
reg.preflight(®istry).unwrap();
reg.apply(®istry, pid("dyn"), &mut record);
assert_eq!(registry.hooks().len(), 1);
assert_eq!(record.hook_count, 1);
<HookSurface as AppendOps>::remove_plugin(®istry, &pid("dyn"));
assert_eq!(registry.hooks().len(), 0);
}
#[test]
fn named_unique_ops_preflight_detects_duplicate() {
let registry = PluginRegistry::new();
let mut record = PluginRecord::default();
let q = QName::builtin("scalar_dup");
<ScalarSurface as NamedUniqueOps>::preflight(®istry, &q).unwrap();
record.scalars.push(q.clone());
}
struct StubCollation(&'static str);
impl crate::traits::collation::CollationProvider for StubCollation {
fn name(&self) -> &str {
self.0
}
fn compare(&self, a: &str, b: &str) -> std::cmp::Ordering {
a.cmp(b)
}
}
#[test]
fn keyed_unique_collation_per_key_record_round_trip() {
let registry = PluginRegistry::new();
let mut record = PluginRecord::default();
let key = SmolStr::new("test.case_fold");
<CollationSurface as KeyedUniqueOps>::insert(
®istry,
key.clone(),
Arc::new(StubCollation("test.case_fold")),
&mut record,
);
assert_eq!(record.collations, vec![key.clone()]);
assert!(registry.collations.contains_key(&key));
<CollationSurface as KeyedUniqueOps>::remove(®istry, &key);
assert!(
!registry.collations.contains_key(&key),
"remove must drop the keyed-unique slot entry; the legacy \
count-only record could not"
);
}
}