use std::sync::Arc;
use smol_str::SmolStr;
use crate::capability::{Capability, CapabilitySet};
use crate::errors::PluginError;
use crate::plugin::PluginId;
use crate::qname::QName;
use crate::registry::PluginRegistry;
use crate::surfaces::{
AggregateSurface, AlgorithmSurface, AppendReg, AuthSurface, AuthzSurface, BackgroundJobSurface,
CatalogSurface, CdcSurface, CollationSurface, ConnectorSurface, CrdtSurface,
DynPendingRegistration, HookSurface, IndexKindSurface, KeyedUniqueReg, LabelStorageSurface,
LocyAggregateSurface, LocyPredicateSurface, LogicalTypeSurface, NamedUniqueReg,
OperatorSurface, OptimizerRuleSurface, PregelSurface, ProcedureSurface, ReplacementScanSurface,
ScalarSurface, StorageBackendSurface, TriggerSurface, VersionedReg, WindowSurface,
};
use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
use crate::traits::background::BackgroundJobProvider;
use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
use crate::traits::cdc::CdcOutputProvider;
use crate::traits::collation::CollationProvider;
use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
use crate::traits::crdt::{CrdtKind, CrdtKindProvider};
use crate::traits::hook::SessionHook;
use crate::traits::index::{IndexKind, IndexKindProvider};
use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
use crate::traits::scalar::{FnSignature, ScalarPluginFn};
use crate::traits::storage::StorageBackend;
use crate::traits::trigger::TriggerPlugin;
use crate::traits::types::LogicalTypeProvider;
use crate::traits::window::{WindowPluginFn, WindowSignature};
/// Per-plugin staging handle for registrations.
///
/// Each registration method checks the plugin's effective capabilities (and,
/// for qname-addressed surfaces, the qname's namespace) and then appends an
/// entry to `pending`. Nothing is handed to the registry until
/// `commit_to_registry` is called.
pub struct PluginRegistrar<'a> {
/// Identifier of the plugin on whose behalf registrations are staged; also
/// the namespace that non-builtin qnames are compared against.
plugin_id: PluginId,
/// Capability set consulted by `require` before an entry is staged.
effective_caps: &'a CapabilitySet,
/// Registry that `commit_to_registry` applies the pending entries to.
registry: &'a PluginRegistry,
/// Registrations staged so far, in the order the methods were called.
pending: Vec<Box<dyn DynPendingRegistration>>,
}
/// Compact debug view: prints the plugin id and the number of staged
/// registrations, and marks the remaining fields as elided.
impl std::fmt::Debug for PluginRegistrar<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the count is shown for the pending list, not its entries.
        let staged_count = self.pending.len();
        let mut out = f.debug_struct("PluginRegistrar");
        out.field("plugin_id", &self.plugin_id);
        out.field("pending", &staged_count);
        out.finish_non_exhaustive()
    }
}
impl<'a> PluginRegistrar<'a> {
/// Creates a registrar for `plugin_id` with an empty pending list.
///
/// `effective_caps` is the capability set later consulted by every
/// registration method; `registry` is the target of `commit_to_registry`.
#[must_use]
pub fn new(
    plugin_id: PluginId,
    effective_caps: &'a CapabilitySet,
    registry: &'a PluginRegistry,
) -> Self {
    // Start with nothing staged.
    let pending = Vec::new();
    Self {
        pending,
        registry,
        effective_caps,
        plugin_id,
    }
}
#[must_use]
pub fn plugin_id(&self) -> &PluginId {
&self.plugin_id
}
/// Replaces the plugin identifier used for subsequent namespace checks and
/// for the final `commit_to_registry` call.
///
/// Entries already staged are left untouched.
pub fn set_plugin_id(&mut self, plugin_id: PluginId) {
    // The previous identifier is discarded.
    drop(std::mem::replace(&mut self.plugin_id, plugin_id));
}
/// Checks that the effective capability set contains `cap`.
///
/// The check goes through `CapabilitySet::contains_variant`.
/// NOTE(review): presumably that matches on the variant only and ignores any
/// payload — confirm against `CapabilitySet`.
///
/// # Errors
///
/// Returns `PluginError::CapabilityRequired` carrying a clone of `cap` when
/// the capability is absent.
fn require(&self, cap: &Capability) -> Result<(), PluginError> {
    if !self.effective_caps.contains_variant(cap) {
        return Err(PluginError::CapabilityRequired(cap.clone()));
    }
    Ok(())
}
/// Verifies that this plugin is allowed to register `qname`.
///
/// A qname is accepted when it is builtin, or when its namespace equals this
/// registrar's plugin id.
///
/// # Errors
///
/// Returns an internal `PluginError` naming the plugin and the offending
/// qname otherwise.
fn validate_qname(&self, qname: &QName) -> Result<(), PluginError> {
    // Builtin names are checked first; the namespace comparison only runs
    // for non-builtin qnames.
    if qname.is_builtin() || qname.namespace() == self.plugin_id.as_str() {
        return Ok(());
    }
    Err(PluginError::internal(format!(
        "plugin `{}` cannot register qname `{}` outside its namespace",
        self.plugin_id, qname
    )))
}
/// Stages a scalar function `f` with signature `sig` under `qname`.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::ScalarFn` is missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn scalar_fn(
    &mut self,
    qname: QName,
    sig: FnSignature,
    f: Arc<dyn ScalarPluginFn>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::ScalarFn)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<ScalarSurface> {
        q: qname,
        sig,
        provider: f,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages an aggregate function `f` with signature `sig` under `qname`.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::AggregateFn` is
///   missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn aggregate_fn(
    &mut self,
    qname: QName,
    sig: AggSignature,
    f: Arc<dyn AggregatePluginFn>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::AggregateFn)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<AggregateSurface> {
        q: qname,
        sig,
        provider: f,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a window function `f` with signature `sig` under `qname`.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::WindowFn` is missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn window_fn(
    &mut self,
    qname: QName,
    sig: WindowSignature,
    f: Arc<dyn WindowPluginFn>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::WindowFn)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<WindowSurface> {
        q: qname,
        sig,
        provider: f,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a procedure `p` with signature `sig` under `qname`.
///
/// Besides the base `Capability::Procedure`, the procedure's mode selects an
/// additional capability that must be held: `ProcedureWrites` for `Write`,
/// `ProcedureSchema` for `Schema`, `ProcedureDbms` for `Dbms`, and none for
/// `Read`. The entry is staged as a `VersionedReg` and reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if the base or mode-specific
///   capability is missing (the base capability is checked first).
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn procedure(
    &mut self,
    qname: QName,
    sig: ProcedureSignature,
    p: Arc<dyn ProcedurePlugin>,
) -> Result<&mut Self, PluginError> {
    use crate::traits::procedure::ProcedureMode;

    self.require(&Capability::Procedure)?;

    // Map the mode to the extra capability it demands, if any.
    let mode_cap = match sig.mode {
        ProcedureMode::Read => None,
        ProcedureMode::Write => Some(Capability::ProcedureWrites),
        ProcedureMode::Schema => Some(Capability::ProcedureSchema),
        ProcedureMode::Dbms => Some(Capability::ProcedureDbms),
    };
    if let Some(extra) = mode_cap {
        self.require(&extra)?;
    }

    // Capability checks come before the namespace check.
    self.validate_qname(&qname)?;

    let staged = VersionedReg::<ProcedureSurface> {
        q: qname,
        sig,
        provider: p,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a Locy aggregate `a` under `qname`.
///
/// This surface carries no signature (the `sig` slot is `()`). The entry is
/// only appended to the pending list; it reaches the registry when
/// `commit_to_registry` is called. Returns `self` so calls can be chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::LocyAggregate` is
///   missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn locy_aggregate(
    &mut self,
    qname: QName,
    a: Arc<dyn LocyAggregate>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::LocyAggregate)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<LocyAggregateSurface> {
        q: qname,
        sig: (),
        provider: a,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a Locy predicate `p` with signature `sig` under `qname`.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::LocyPredicate` is
///   missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn locy_predicate(
    &mut self,
    qname: QName,
    sig: PredSignature,
    p: Arc<dyn LocyPredicate>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::LocyPredicate)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<LocyPredicateSurface> {
        q: qname,
        sig,
        provider: p,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages an operator provider `p` under `qname`.
///
/// This surface carries no signature (the `sig` slot is `()`). The entry is
/// only appended to the pending list; it reaches the registry when
/// `commit_to_registry` is called. Returns `self` so calls can be chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::Operator` is missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn operator(
    &mut self,
    qname: QName,
    p: Arc<dyn OperatorProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Operator)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<OperatorSurface> {
        q: qname,
        sig: (),
        provider: p,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages an optimizer rule provider `r` as an append-style entry.
///
/// Gated by the same `Capability::Operator` as `operator`. No qname is
/// involved, so no namespace check runs. The entry reaches the registry when
/// `commit_to_registry` is called. Returns `self` so calls can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Operator` is missing.
pub fn optimizer_rule(
    &mut self,
    r: Arc<dyn OptimizerRuleProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Operator)?;
    let staged = AppendReg::<OptimizerRuleSurface> { provider: r };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages an index-kind provider `p`, keyed explicitly by `kind`.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Index` is missing.
pub fn index_kind(
    &mut self,
    kind: IndexKind,
    p: Arc<dyn IndexKindProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Index)?;
    let staged = KeyedUniqueReg::<IndexKindSurface> {
        key_override: Some(kind),
        provider: p,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a storage backend `b`, keyed explicitly by `scheme`.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Storage` is missing.
pub fn storage_backend(
    &mut self,
    scheme: &'static str,
    b: Arc<dyn StorageBackend>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Storage)?;
    // The key is built only after the capability check has passed.
    let key = SmolStr::new(scheme);
    let staged = KeyedUniqueReg::<StorageBackendSurface> {
        key_override: Some(key),
        provider: b,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a `storage` implementation, keyed explicitly by `label`.
///
/// Gated by the same `Capability::Storage` as `storage_backend`. The entry
/// is only appended to the pending list; it reaches the registry when
/// `commit_to_registry` is called. Returns `self` so calls can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Storage` is missing.
pub fn label_storage(
    &mut self,
    label: impl Into<SmolStr>,
    storage: Arc<dyn crate::traits::storage::Storage>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Storage)?;
    // Convert the label only after the capability check has passed.
    let key = label.into();
    let staged = KeyedUniqueReg::<LabelStorageSurface> {
        key_override: Some(key),
        provider: storage,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages an algorithm provider `p` under `qname`.
///
/// This surface carries no signature (the `sig` slot is `()`). The entry is
/// only appended to the pending list; it reaches the registry when
/// `commit_to_registry` is called. Returns `self` so calls can be chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::Algorithm` is missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn algorithm(
    &mut self,
    qname: QName,
    p: Arc<dyn AlgorithmProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Algorithm)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<AlgorithmSurface> {
        q: qname,
        sig: (),
        provider: p,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a Pregel program provider `p` under `qname`.
///
/// Gated by the same `Capability::Algorithm` as `algorithm`. This surface
/// carries no signature (the `sig` slot is `()`). The entry reaches the
/// registry when `commit_to_registry` is called. Returns `self` so calls can
/// be chained.
///
/// # Errors
///
/// - `PluginError::CapabilityRequired` if `Capability::Algorithm` is missing.
/// - The namespace error from `validate_qname` if `qname` is non-builtin and
///   outside this plugin's namespace.
pub fn pregel(
    &mut self,
    qname: QName,
    p: Arc<dyn PregelProgramProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Algorithm)?;
    self.validate_qname(&qname)?;
    let staged = NamedUniqueReg::<PregelSurface> {
        q: qname,
        sig: (),
        provider: p,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a CRDT kind provider `p`, keyed explicitly by `kind`.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Crdt` is missing.
pub fn crdt_kind(
    &mut self,
    kind: CrdtKind,
    p: Arc<dyn CrdtKindProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Crdt)?;
    let staged = KeyedUniqueReg::<CrdtSurface> {
        key_override: Some(kind),
        provider: p,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a session hook `h` as an append-style entry.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Hook` is missing.
pub fn hook(&mut self, h: Arc<dyn SessionHook>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Hook)?;
    let staged = AppendReg::<HookSurface> { provider: h };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a logical type provider `t` with no explicit key.
///
/// NOTE(review): with `key_override: None` the key is presumably derived
/// from the provider itself — confirm against `KeyedUniqueReg`.
/// The entry reaches the registry when `commit_to_registry` is called.
/// Returns `self` so calls can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Type` is missing.
pub fn logical_type(
    &mut self,
    t: Arc<dyn LogicalTypeProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Type)?;
    let staged = KeyedUniqueReg::<LogicalTypeSurface> {
        key_override: None,
        provider: t,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages an authentication provider `p` as an append-style entry.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Auth` is missing.
pub fn auth_provider(&mut self, p: Arc<dyn AuthProvider>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Auth)?;
    let staged = AppendReg::<AuthSurface> { provider: p };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages an authorization policy `p` as an append-style entry.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Authz` is missing.
pub fn authz_policy(&mut self, p: Arc<dyn AuthzPolicy>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Authz)?;
    let staged = AppendReg::<AuthzSurface> { provider: p };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a connector `c` as an append-style entry.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Connector` is missing.
pub fn connector(&mut self, c: Arc<dyn Connector>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Connector)?;
    let staged = AppendReg::<ConnectorSurface> { provider: c };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a trigger plugin `t` as an append-style entry.
///
/// The entry is only appended to the pending list; it reaches the registry
/// when `commit_to_registry` is called. Returns `self` so calls can be
/// chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Trigger` is missing.
pub fn trigger(&mut self, t: Arc<dyn TriggerPlugin>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Trigger)?;
    let staged = AppendReg::<TriggerSurface> { provider: t };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a collation provider `c` with no explicit key.
///
/// NOTE(review): with `key_override: None` the key is presumably derived
/// from the provider itself — confirm against `KeyedUniqueReg`.
/// The entry reaches the registry when `commit_to_registry` is called.
/// Returns `self` so calls can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Collation` is missing.
pub fn collation(&mut self, c: Arc<dyn CollationProvider>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Collation)?;
    let staged = KeyedUniqueReg::<CollationSurface> {
        key_override: None,
        provider: c,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a CDC output provider `c` with no explicit key.
///
/// NOTE(review): with `key_override: None` the key is presumably derived
/// from the provider itself — confirm against `KeyedUniqueReg`.
/// The entry reaches the registry when `commit_to_registry` is called.
/// Returns `self` so calls can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Cdc` is missing.
pub fn cdc_output(&mut self, c: Arc<dyn CdcOutputProvider>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Cdc)?;
    let staged = KeyedUniqueReg::<CdcSurface> {
        key_override: None,
        provider: c,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a catalog provider `c` with no explicit key.
///
/// NOTE(review): with `key_override: None` the key is presumably derived
/// from the provider itself — confirm against `KeyedUniqueReg`.
/// The entry reaches the registry when `commit_to_registry` is called.
/// Returns `self` so calls can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Catalog` is missing.
pub fn catalog(&mut self, c: Arc<dyn CatalogProvider>) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Catalog)?;
    let staged = KeyedUniqueReg::<CatalogSurface> {
        key_override: None,
        provider: c,
    };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a replacement-scan provider `r` as an append-style entry.
///
/// Gated by the same `Capability::Catalog` as `catalog`. The entry reaches
/// the registry when `commit_to_registry` is called. Returns `self` so calls
/// can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` if `Capability::Catalog` is missing.
pub fn replacement_scan(
    &mut self,
    r: Arc<dyn ReplacementScanProvider>,
) -> Result<&mut Self, PluginError> {
    self.require(&Capability::Catalog)?;
    let staged = AppendReg::<ReplacementScanSurface> { provider: r };
    self.pending.push(Box::new(staged));
    Ok(self)
}
/// Stages a background job provider `j` as an append-style entry.
///
/// The capability probe is built with `max_concurrent: 0`.
/// NOTE(review): that value looks like a placeholder, on the assumption
/// that `contains_variant` ignores the payload — confirm against
/// `CapabilitySet`.
/// The entry reaches the registry when `commit_to_registry` is called.
/// Returns `self` so calls can be chained.
///
/// # Errors
///
/// `PluginError::CapabilityRequired` (carrying the probe value) if no
/// `Capability::BackgroundJob` is held.
pub fn background_job(
    &mut self,
    j: Arc<dyn BackgroundJobProvider>,
) -> Result<&mut Self, PluginError> {
    let probe = Capability::BackgroundJob { max_concurrent: 0 };
    self.require(&probe)?;
    let staged = AppendReg::<BackgroundJobSurface> { provider: j };
    self.pending.push(Box::new(staged));
    Ok(self)
}
pub fn commit_to_registry(self) -> Result<(), PluginError> {
self.registry.apply_pending(&self.plugin_id, self.pending)
}
#[must_use]
pub fn pending_len(&self) -> usize {
self.pending.len()
}
}