Skip to main content

uni_plugin/surfaces/
mod.rs

1//! Per-surface trait abstractions for the plugin registry.
2//!
3//! This module is filled during Phase 4 of the §1.1 consolidation pass. It
4//! introduces the [`SurfaceKind`] enum and four family traits
5//! ([`NamedUniqueSurface`], [`VersionedSurface`], [`KeyedUniqueSurface`],
6//! [`AppendSurface`]) that collapse the 25 surfaces in [`crate::registry`]
7//! to a handful of generic patterns.
8//!
9//! # Phase 4 status
10//!
11//! Phase 4 is complete: the 25 surfaces dispatch through the family-ops
12//! traits in this module. [`crate::registrar::PluginRegistrar`] enqueues
13//! `Box<dyn DynPendingRegistration>` payloads; `PluginRegistry::apply_pending`
14//! calls `preflight` then `apply` per payload; [`PluginRegistry::remove_plugin`]
15//! walks the per-family `PluginRecord` fields and dispatches to
16//! `*Ops::remove` / `AppendOps::remove_plugin`. The legacy
17//! `PendingRegistration` enum and its three 25-arm matches are gone.
18//!
19//! # Family overview
20//!
21//! | Family          | Storage shape                              | Example surfaces                 |
22//! |-----------------|--------------------------------------------|----------------------------------|
23//! | Named-unique    | `DashMap<QName, Arc<Entry<K, Sig, P>>>`    | Scalar, Aggregate, Window, …     |
24//! | Versioned       | `DashMap<QName, Vec<Arc<Entry<…>>>>`       | Procedure (arity overload)       |
25//! | Keyed-unique    | `DashMap<K, Arc<dyn Provider>>`            | IndexKind, StorageBackend, …     |
26//! | Append          | `ArcSwap<Vec<Arc<dyn Provider>>>`          | Hook, OptimizerRule, …           |
27//!
28//! Append- and keyed-unique-family providers carry their key inside the
29//! trait (e.g. [`crate::traits::collation::CollationProvider::name`]); the
30//! [`KeyedUniqueSurface::key_of`] hook lets the registry derive a key from
31//! the provider when no explicit key is passed at registration time.
32
33// Rust guideline compliant
34
35use std::fmt::Debug;
36use std::hash::Hash;
37use std::sync::Arc;
38
39use arc_swap::ArcSwap;
40use dashmap::DashMap;
41use smol_str::SmolStr;
42
43use crate::errors::PluginError;
44use crate::plugin::PluginId;
45use crate::qname::QName;
46use crate::registry::{
47    AggregateEntry, LocyAggregateEntry, LocyPredicateEntry, PluginRecord, PluginRegistry,
48    ProcedureEntry, ScalarEntry, WindowEntry,
49};
50use crate::traits::crdt::CrdtKind;
51use crate::traits::index::IndexKind;
52
53/// Discriminator that distinguishes overloads sharing one [`QName`].
54///
55/// Currently only `Arity` is used (by [`ProcedureSurface`] to disambiguate
56/// arity overloads); the variant is kept open so future versioned families
57/// (e.g. type-set overloads) can extend it without breaking call sites.
58#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
59pub enum Discriminator {
60    /// Positional-argument arity.
61    Arity(usize),
62}
63
64/// Enumeration of the 25 plugin surfaces.
65///
66/// Used by [`crate::registry::PluginRecordSnapshot`] accessors to filter the
67/// per-plugin footprint by surface.
68#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
69#[non_exhaustive]
70pub enum SurfaceKind {
71    /// `Capability::ScalarFn` — Cypher scalar function.
72    Scalar,
73    /// `Capability::AggregateFn` — Cypher aggregate function.
74    Aggregate,
75    /// `Capability::WindowFn` — Cypher window function.
76    Window,
77    /// `Capability::Procedure` — Cypher procedure (arity-overloaded).
78    Procedure,
79    /// `Capability::LocyAggregate` — Locy aggregate.
80    LocyAggregate,
81    /// `Capability::LocyPredicate` — Locy predicate.
82    LocyPredicate,
83    /// `Capability::Operator` — physical operator.
84    Operator,
85    /// `Capability::Operator` — DataFusion optimizer rule.
86    OptimizerRule,
87    /// `Capability::Algorithm` — graph algorithm.
88    Algorithm,
89    /// `Capability::Algorithm` — Pregel program.
90    Pregel,
91    /// `Capability::Index` — index kind provider.
92    IndexKind,
93    /// `Capability::Storage` — storage backend by URI scheme.
94    StorageBackend,
95    /// `Capability::Storage` — per-label plugin storage (M5h.2).
96    LabelStorage,
97    /// `Capability::Crdt` — CRDT kind provider.
98    Crdt,
99    /// `Capability::Hook` — session-lifecycle hook.
100    Hook,
101    /// `Capability::Type` — Arrow extension logical-type provider.
102    LogicalType,
103    /// `Capability::Auth` — authentication provider.
104    Auth,
105    /// `Capability::Authz` — authorization policy.
106    Authz,
107    /// `Capability::Connector` — wire-protocol connector.
108    Connector,
109    /// `Capability::Trigger` — fine-grained trigger.
110    Trigger,
111    /// `Capability::Collation` — collation provider.
112    Collation,
113    /// `Capability::Cdc` — CDC output sink.
114    Cdc,
115    /// `Capability::Catalog` — catalog provider.
116    Catalog,
117    /// `Capability::Catalog` — replacement-scan provider.
118    ReplacementScan,
119    /// `Capability::BackgroundJob` — background-job provider.
120    BackgroundJob,
121}
122
123// ── Family traits ─────────────────────────────────────────────────────
124
125/// Named-unique family: `DashMap<QName, Arc<Entry<K, Sig, P>>>`.
126///
127/// One registration per qname; preflight rejects duplicates with
128/// [`PluginError::DuplicateRegistration`]. Members: Scalar, Aggregate,
129/// Window, LocyAggregate, LocyPredicate, Operator, Algorithm, Pregel.
130pub trait NamedUniqueSurface: 'static {
131    /// The registered signature (e.g. `FnSignature`, `AggSignature`); unit
132    /// when the surface carries no signature (e.g. `LocyAggregate`).
133    type Sig: Send + Sync + 'static;
134    /// The trait-object provider type behind `Arc<dyn …>`.
135    type Provider: ?Sized + Send + Sync + 'static;
136
137    /// Surface discriminant for record keeping.
138    const KIND: SurfaceKind;
139}
140
141/// Versioned family: `DashMap<QName, Vec<Arc<Entry<K, Sig, P>>>>`.
142///
143/// Multiple registrations may share one qname, distinguished by a
144/// [`Discriminator`]. Only member today: Procedure (arity overload).
145pub trait VersionedSurface: 'static {
146    /// The registered signature.
147    type Sig: Send + Sync + 'static;
148    /// The trait-object provider.
149    type Provider: ?Sized + Send + Sync + 'static;
150
151    /// Surface discriminant.
152    const KIND: SurfaceKind;
153
154    /// Extract the per-overload discriminator from a signature so the
155    /// registry can de-duplicate within one qname.
156    fn discriminator(sig: &Self::Sig) -> Discriminator;
157}
158
159/// Keyed-unique family: `DashMap<K, Arc<dyn Provider>>`.
160///
161/// Distinct from named-unique because the key is **not** a [`QName`] — it
162/// may be a [`SmolStr`] scheme/name, an [`IndexKind`], a [`CrdtKind`], etc.
163/// The provider trait often exposes the key (e.g.
164/// [`crate::traits::collation::CollationProvider::name`]).
165///
166/// Members: IndexKind, StorageBackend, LabelStorage, Crdt, LogicalType,
167/// Collation, Cdc, Catalog.
168pub trait KeyedUniqueSurface: 'static {
169    /// The key type the `DashMap` is keyed by.
170    type Key: Clone + Eq + Hash + Debug + Send + Sync + 'static;
171    /// The trait-object provider.
172    type Provider: ?Sized + Send + Sync + 'static;
173
174    /// Surface discriminant.
175    const KIND: SurfaceKind;
176
177    /// Preflight: refuse a duplicate key.
178    ///
179    /// Default implementation returns a generic
180    /// [`PluginError::Internal`] message; surfaces that need a typed
181    /// error (e.g. `StorageBackend` returns
182    /// [`PluginError::StorageSchemeConflict`]) override this.
183    ///
184    /// # Errors
185    ///
186    /// Returns a [`PluginError`] when the key is already taken.
187    fn duplicate_error(key: &Self::Key) -> PluginError {
188        PluginError::internal(format!("{:?} `{:?}` already registered", Self::KIND, key))
189    }
190
191    /// Derive the registry key from the provider, when the provider trait
192    /// self-identifies.
193    ///
194    /// Returns `Some(key)` for surfaces whose provider exposes its key
195    /// directly (e.g. [`crate::traits::index::IndexKindProvider::kind`],
196    /// [`crate::traits::collation::CollationProvider::name`]). Returns
197    /// `None` for surfaces where the key must be supplied externally
198    /// (today only [`LabelStorageSurface`] — the label name is not a
199    /// property of the [`crate::traits::storage::Storage`] trait).
200    ///
201    /// Foundation tasks (§1.1 Phase 4 prerequisites) use this to drive
202    /// registration without an outer `(key, provider)` tuple wherever the
203    /// provider already self-identifies.
204    fn key_of(_provider: &Self::Provider) -> Option<Self::Key> {
205        None
206    }
207}
208
209/// Append family: `ArcSwap<Vec<Arc<dyn Provider>>>`.
210///
211/// No preflight de-duplication — every registration is appended. Removal
212/// is by plugin id (the append-family blanket impl filters the vector by
213/// plugin ownership). Members: OptimizerRule, Hook, Auth, Authz,
214/// Connector, Trigger, ReplacementScan, BackgroundJob.
215pub trait AppendSurface: 'static {
216    /// The trait-object provider.
217    type Provider: ?Sized + Send + Sync + 'static;
218
219    /// Surface discriminant.
220    const KIND: SurfaceKind;
221}
222
223/// Owner-tagged append entry stored in append-family slots.
224///
225/// The per-entry [`PluginId`] tag lets `AppendOps::remove_plugin` filter
226/// the slot in O(n) without a separate ownership index — closing the
227/// pre-Phase-4 "deferred to M5e" gap where append-family entries leaked
228/// across plugin removal and hot reload.
229pub struct AppendEntry<P: ?Sized> {
230    /// Owning plugin id.
231    pub plugin: PluginId,
232    /// The registered provider.
233    pub provider: Arc<P>,
234}
235
236impl<P: ?Sized> Clone for AppendEntry<P> {
237    fn clone(&self) -> Self {
238        Self {
239            plugin: self.plugin.clone(),
240            provider: Arc::clone(&self.provider),
241        }
242    }
243}
244
245impl<P: ?Sized> Debug for AppendEntry<P> {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        f.debug_struct("AppendEntry")
248            .field("plugin", &self.plugin)
249            .finish_non_exhaustive()
250    }
251}
252
253// ── Marker types for the 25 surfaces ──────────────────────────────────
254//
255// Each marker is zero-sized; they exist only so `Surface` trait impls can
256// dispatch via the type system. Sub-phases 4b-4e add the per-marker impls.
257
258use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
259use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
260use crate::traits::background::BackgroundJobProvider;
261use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
262use crate::traits::cdc::CdcOutputProvider;
263use crate::traits::collation::CollationProvider;
264use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
265use crate::traits::crdt::CrdtKindProvider;
266use crate::traits::hook::SessionHook;
267use crate::traits::index::IndexKindProvider;
268use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
269use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
270use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
271use crate::traits::scalar::{FnSignature, ScalarPluginFn};
272use crate::traits::storage::{Storage, StorageBackend};
273use crate::traits::trigger::TriggerPlugin;
274use crate::traits::types::LogicalTypeProvider;
275use crate::traits::window::{WindowPluginFn, WindowSignature};
276
277macro_rules! marker {
278    ($(#[$attr:meta])* $name:ident) => {
279        $(#[$attr])*
280        #[derive(Debug, Clone, Copy)]
281        pub struct $name;
282    };
283}
284
285// Named-unique markers (7).
286marker!(/// Marker for the Scalar surface. See [`NamedUniqueSurface`].
287ScalarSurface);
288marker!(/// Marker for the Aggregate surface. See [`NamedUniqueSurface`].
289AggregateSurface);
290marker!(/// Marker for the Window surface. See [`NamedUniqueSurface`].
291WindowSurface);
292marker!(/// Marker for the LocyAggregate surface. See [`NamedUniqueSurface`].
293LocyAggregateSurface);
294marker!(/// Marker for the LocyPredicate surface. See [`NamedUniqueSurface`].
295LocyPredicateSurface);
296marker!(/// Marker for the Operator surface. See [`NamedUniqueSurface`].
297OperatorSurface);
298marker!(/// Marker for the Algorithm surface. See [`NamedUniqueSurface`].
299AlgorithmSurface);
300marker!(/// Marker for the Pregel surface. See [`NamedUniqueSurface`].
301PregelSurface);
302
303// Versioned markers (1).
304marker!(/// Marker for the Procedure surface. See [`VersionedSurface`].
305ProcedureSurface);
306
307// Keyed-unique markers (8).
308marker!(/// Marker for the IndexKind surface. See [`KeyedUniqueSurface`].
309IndexKindSurface);
310marker!(/// Marker for the StorageBackend surface. See [`KeyedUniqueSurface`].
311StorageBackendSurface);
312marker!(/// Marker for the LabelStorage surface. See [`KeyedUniqueSurface`].
313LabelStorageSurface);
314marker!(/// Marker for the Crdt surface. See [`KeyedUniqueSurface`].
315CrdtSurface);
316marker!(/// Marker for the LogicalType surface. See [`KeyedUniqueSurface`].
317LogicalTypeSurface);
318marker!(/// Marker for the Collation surface. See [`KeyedUniqueSurface`].
319CollationSurface);
320marker!(/// Marker for the Cdc surface. See [`KeyedUniqueSurface`].
321CdcSurface);
322marker!(/// Marker for the Catalog surface. See [`KeyedUniqueSurface`].
323CatalogSurface);
324
325// Append markers (8).
326marker!(/// Marker for the OptimizerRule surface. See [`AppendSurface`].
327OptimizerRuleSurface);
328marker!(/// Marker for the Hook surface. See [`AppendSurface`].
329HookSurface);
330marker!(/// Marker for the Auth surface. See [`AppendSurface`].
331AuthSurface);
332marker!(/// Marker for the Authz surface. See [`AppendSurface`].
333AuthzSurface);
334marker!(/// Marker for the Connector surface. See [`AppendSurface`].
335ConnectorSurface);
336marker!(/// Marker for the Trigger surface. See [`AppendSurface`].
337TriggerSurface);
338marker!(/// Marker for the ReplacementScan surface. See [`AppendSurface`].
339ReplacementScanSurface);
340marker!(/// Marker for the BackgroundJob surface. See [`AppendSurface`].
341BackgroundJobSurface);
342
343// ── Named-unique impls ────────────────────────────────────────────────
344
345impl NamedUniqueSurface for ScalarSurface {
346    type Sig = FnSignature;
347    type Provider = dyn ScalarPluginFn;
348    const KIND: SurfaceKind = SurfaceKind::Scalar;
349}
350
351impl NamedUniqueSurface for AggregateSurface {
352    type Sig = AggSignature;
353    type Provider = dyn AggregatePluginFn;
354    const KIND: SurfaceKind = SurfaceKind::Aggregate;
355}
356
357impl NamedUniqueSurface for WindowSurface {
358    type Sig = WindowSignature;
359    type Provider = dyn WindowPluginFn;
360    const KIND: SurfaceKind = SurfaceKind::Window;
361}
362
363impl NamedUniqueSurface for LocyAggregateSurface {
364    type Sig = ();
365    type Provider = dyn LocyAggregate;
366    const KIND: SurfaceKind = SurfaceKind::LocyAggregate;
367}
368
369impl NamedUniqueSurface for LocyPredicateSurface {
370    type Sig = PredSignature;
371    type Provider = dyn LocyPredicate;
372    const KIND: SurfaceKind = SurfaceKind::LocyPredicate;
373}
374
375impl NamedUniqueSurface for OperatorSurface {
376    type Sig = ();
377    type Provider = dyn OperatorProvider;
378    const KIND: SurfaceKind = SurfaceKind::Operator;
379}
380
381impl NamedUniqueSurface for AlgorithmSurface {
382    type Sig = ();
383    type Provider = dyn AlgorithmProvider;
384    const KIND: SurfaceKind = SurfaceKind::Algorithm;
385}
386
387impl NamedUniqueSurface for PregelSurface {
388    type Sig = ();
389    type Provider = dyn PregelProgramProvider;
390    const KIND: SurfaceKind = SurfaceKind::Pregel;
391}
392
393// ── Versioned impls ───────────────────────────────────────────────────
394
395impl VersionedSurface for ProcedureSurface {
396    type Sig = ProcedureSignature;
397    type Provider = dyn ProcedurePlugin;
398    const KIND: SurfaceKind = SurfaceKind::Procedure;
399
400    fn discriminator(sig: &Self::Sig) -> Discriminator {
401        Discriminator::Arity(sig.args.len())
402    }
403}
404
405// ── Keyed-unique impls ────────────────────────────────────────────────
406
407impl KeyedUniqueSurface for IndexKindSurface {
408    type Key = IndexKind;
409    type Provider = dyn IndexKindProvider;
410    const KIND: SurfaceKind = SurfaceKind::IndexKind;
411
412    fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
413        Some(provider.kind())
414    }
415}
416
417impl KeyedUniqueSurface for StorageBackendSurface {
418    type Key = SmolStr;
419    type Provider = dyn StorageBackend;
420    const KIND: SurfaceKind = SurfaceKind::StorageBackend;
421
422    fn duplicate_error(key: &Self::Key) -> PluginError {
423        PluginError::StorageSchemeConflict(key.to_string())
424    }
425
426    fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
427        Some(SmolStr::new(provider.scheme()))
428    }
429}
430
431impl KeyedUniqueSurface for LabelStorageSurface {
432    type Key = SmolStr;
433    type Provider = dyn Storage;
434    const KIND: SurfaceKind = SurfaceKind::LabelStorage;
435
436    fn duplicate_error(key: &Self::Key) -> PluginError {
437        PluginError::internal(format!("label storage for `{key}` already registered"))
438    }
439
440    // No `key_of` override: the `Storage` trait does not self-identify a
441    // label. The label is supplied externally via the registration payload.
442}
443
444impl KeyedUniqueSurface for CrdtSurface {
445    type Key = CrdtKind;
446    type Provider = dyn CrdtKindProvider;
447    const KIND: SurfaceKind = SurfaceKind::Crdt;
448
449    fn duplicate_error(key: &Self::Key) -> PluginError {
450        PluginError::internal(format!("CRDT kind `{}` already registered", key.0))
451    }
452
453    fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
454        Some(provider.kind())
455    }
456}
457
458impl KeyedUniqueSurface for LogicalTypeSurface {
459    type Key = SmolStr;
460    type Provider = dyn LogicalTypeProvider;
461    const KIND: SurfaceKind = SurfaceKind::LogicalType;
462
463    fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
464        Some(SmolStr::new(provider.name()))
465    }
466}
467
468impl KeyedUniqueSurface for CollationSurface {
469    type Key = SmolStr;
470    type Provider = dyn CollationProvider;
471    const KIND: SurfaceKind = SurfaceKind::Collation;
472
473    fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
474        Some(SmolStr::new(provider.name()))
475    }
476}
477
478impl KeyedUniqueSurface for CdcSurface {
479    type Key = SmolStr;
480    type Provider = dyn CdcOutputProvider;
481    const KIND: SurfaceKind = SurfaceKind::Cdc;
482
483    fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
484        Some(SmolStr::new(provider.name()))
485    }
486}
487
488impl KeyedUniqueSurface for CatalogSurface {
489    type Key = SmolStr;
490    type Provider = dyn CatalogProvider;
491    const KIND: SurfaceKind = SurfaceKind::Catalog;
492
493    fn key_of(provider: &Self::Provider) -> Option<Self::Key> {
494        Some(SmolStr::new(provider.name()))
495    }
496}
497
498// ── Append impls ──────────────────────────────────────────────────────
499
500impl AppendSurface for OptimizerRuleSurface {
501    type Provider = dyn OptimizerRuleProvider;
502    const KIND: SurfaceKind = SurfaceKind::OptimizerRule;
503}
504
505impl AppendSurface for HookSurface {
506    type Provider = dyn SessionHook;
507    const KIND: SurfaceKind = SurfaceKind::Hook;
508}
509
510impl AppendSurface for AuthSurface {
511    type Provider = dyn AuthProvider;
512    const KIND: SurfaceKind = SurfaceKind::Auth;
513}
514
515impl AppendSurface for AuthzSurface {
516    type Provider = dyn AuthzPolicy;
517    const KIND: SurfaceKind = SurfaceKind::Authz;
518}
519
520impl AppendSurface for ConnectorSurface {
521    type Provider = dyn Connector;
522    const KIND: SurfaceKind = SurfaceKind::Connector;
523}
524
525impl AppendSurface for TriggerSurface {
526    type Provider = dyn TriggerPlugin;
527    const KIND: SurfaceKind = SurfaceKind::Trigger;
528}
529
530impl AppendSurface for ReplacementScanSurface {
531    type Provider = dyn ReplacementScanProvider;
532    const KIND: SurfaceKind = SurfaceKind::ReplacementScan;
533}
534
535impl AppendSurface for BackgroundJobSurface {
536    type Provider = dyn BackgroundJobProvider;
537    const KIND: SurfaceKind = SurfaceKind::BackgroundJob;
538}
539
540// ── Family-ops traits ────────────────────────────────────────────────
541//
542// Each `*Ops` trait carries the storage-slot + record-slot accessors and
543// the preflight/insert/remove dispatch methods that
544// [`PluginRegistry::apply_pending`] and [`PluginRegistry::remove_plugin`]
545// call into. One impl per marker keeps the registration codepath fully
546// type-driven — adding a surface means adding a marker + an ops impl,
547// not editing four 25-arm matches.
548
549/// Named-unique dispatch operations.
550///
551/// One impl per [`NamedUniqueSurface`] marker. The associated `Stored`
552/// type captures whether the surface wraps its provider in a typed
553/// `Entry` struct (Scalar/Aggregate/Window/LocyAggregate/LocyPredicate)
554/// or stores `Arc<dyn Provider>` directly (Operator/Algorithm/Pregel).
555pub(crate) trait NamedUniqueOps: NamedUniqueSurface {
556    /// The value stored in the `DashMap` slot (e.g. `Arc<ScalarEntry>`
557    /// or `Arc<dyn OperatorProvider>`).
558    type Stored: Clone + Send + Sync + 'static;
559
560    /// Build the stored value from the registration triple.
561    fn make_stored(plugin: PluginId, sig: Self::Sig, provider: Arc<Self::Provider>)
562    -> Self::Stored;
563
564    /// The registry slot for this surface.
565    fn slot(registry: &PluginRegistry) -> &DashMap<QName, Self::Stored>;
566
567    /// The per-plugin record slot that lists the qnames this plugin owns
568    /// on this surface.
569    fn record_slot(record: &mut PluginRecord) -> &mut Vec<QName>;
570
571    /// Reject a duplicate registration.
572    ///
573    /// # Errors
574    ///
575    /// Returns [`PluginError::DuplicateRegistration`] when `q` is already
576    /// registered on this surface.
577    fn preflight(registry: &PluginRegistry, q: &QName) -> Result<(), PluginError> {
578        if Self::slot(registry).contains_key(q) {
579            return Err(PluginError::DuplicateRegistration(q.clone()));
580        }
581        Ok(())
582    }
583
584    /// Insert the registration into the slot and record this plugin's
585    /// ownership.
586    fn insert(
587        registry: &PluginRegistry,
588        plugin: PluginId,
589        q: QName,
590        sig: Self::Sig,
591        provider: Arc<Self::Provider>,
592        record: &mut PluginRecord,
593    ) {
594        let stored = Self::make_stored(plugin, sig, provider);
595        Self::slot(registry).insert(q.clone(), stored);
596        Self::record_slot(record).push(q);
597    }
598
599    /// Remove the entry at `q` from the slot.
600    fn remove(registry: &PluginRegistry, q: &QName) {
601        Self::slot(registry).remove(q);
602    }
603}
604
605/// Versioned dispatch operations (only [`ProcedureSurface`] today).
606///
607/// Versioned slots hold `Vec<Arc<Entry>>` per qname; preflight rejects a
608/// new registration whose discriminator collides with an existing one.
609pub(crate) trait VersionedOps: VersionedSurface {
610    /// The per-overload entry (e.g. `Arc<ProcedureEntry>`).
611    type Stored: Clone + Send + Sync + 'static;
612
613    /// Build the stored entry from the registration triple.
614    fn make_stored(plugin: PluginId, sig: Self::Sig, provider: Arc<Self::Provider>)
615    -> Self::Stored;
616
617    /// Read the discriminator off a stored entry (for conflict detection
618    /// against a new registration's discriminator).
619    fn entry_discriminator(stored: &Self::Stored) -> Discriminator;
620
621    /// Read the discriminator off a fresh signature.
622    fn signature_discriminator(sig: &Self::Sig) -> Discriminator {
623        Self::discriminator(sig)
624    }
625
626    /// The registry slot for this surface.
627    fn slot(registry: &PluginRegistry) -> &DashMap<QName, Vec<Self::Stored>>;
628
629    /// The per-plugin record slot — (qname, discriminator-as-usize) pairs
630    /// so removal can drop just this plugin's overloads.
631    fn record_slot(record: &mut PluginRecord) -> &mut Vec<(QName, usize)>;
632
633    /// Convert a [`Discriminator`] to the usize used in `PluginRecord`.
634    fn discriminator_to_usize(d: Discriminator) -> usize {
635        match d {
636            Discriminator::Arity(n) => n,
637        }
638    }
639
640    /// Reject a duplicate registration *at the same discriminator*.
641    ///
642    /// Different discriminators for the same qname coexist by design.
643    ///
644    /// # Errors
645    ///
646    /// Returns [`PluginError::DuplicateRegistration`] when an entry with
647    /// the same discriminator already exists under `q`.
648    fn preflight(registry: &PluginRegistry, q: &QName, sig: &Self::Sig) -> Result<(), PluginError> {
649        let d = Self::signature_discriminator(sig);
650        if let Some(slot) = Self::slot(registry).get(q)
651            && slot.iter().any(|e| Self::entry_discriminator(e) == d)
652        {
653            return Err(PluginError::DuplicateRegistration(q.clone()));
654        }
655        Ok(())
656    }
657
658    /// Append the registration to the slot and record this plugin's
659    /// (qname, discriminator) entry.
660    fn insert(
661        registry: &PluginRegistry,
662        plugin: PluginId,
663        q: QName,
664        sig: Self::Sig,
665        provider: Arc<Self::Provider>,
666        record: &mut PluginRecord,
667    ) {
668        let d = Self::signature_discriminator(&sig);
669        let stored = Self::make_stored(plugin, sig, provider);
670        let mut entry = Self::slot(registry).entry(q.clone()).or_default();
671        entry.push(stored);
672        drop(entry);
673        Self::record_slot(record).push((q, Self::discriminator_to_usize(d)));
674    }
675
676    /// Drop the overload identified by `(q, d)` from the slot, removing
677    /// the qname entry entirely once its overload list is empty.
678    fn remove(registry: &PluginRegistry, q: &QName, d: Discriminator) {
679        let slot = Self::slot(registry);
680        if let Some(mut entry) = slot.get_mut(q) {
681            entry.retain(|e| Self::entry_discriminator(e) != d);
682            let empty = entry.is_empty();
683            drop(entry);
684            if empty {
685                slot.remove(q);
686            }
687        }
688    }
689}
690
691/// Keyed-unique dispatch operations.
692///
693/// `record_register` / `record_unregister` are abstract so surfaces that
694/// track per-key footprint in `PluginRecord` (Vec<Key>) and surfaces that
695/// track only a count (Vec<()>-shaped counter) share the same dispatch.
696pub(crate) trait KeyedUniqueOps: KeyedUniqueSurface {
697    /// The registry slot for this surface.
698    fn slot(registry: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>>;
699
700    /// Note `key` as owned by this plugin in `record`.
701    fn record_register(record: &mut PluginRecord, key: &Self::Key);
702
703    /// Reject a duplicate key.
704    ///
705    /// # Errors
706    ///
707    /// Returns [`KeyedUniqueSurface::duplicate_error`] when `key` is
708    /// already registered.
709    fn preflight(registry: &PluginRegistry, key: &Self::Key) -> Result<(), PluginError> {
710        if Self::slot(registry).contains_key(key) {
711            return Err(Self::duplicate_error(key));
712        }
713        Ok(())
714    }
715
716    /// Insert the (key, provider) pair into the slot and record this
717    /// plugin's ownership.
718    fn insert(
719        registry: &PluginRegistry,
720        key: Self::Key,
721        provider: Arc<Self::Provider>,
722        record: &mut PluginRecord,
723    ) {
724        Self::slot(registry).insert(key.clone(), provider);
725        Self::record_register(record, &key);
726    }
727
728    /// Remove the entry at `key` from the slot.
729    fn remove(registry: &PluginRegistry, key: &Self::Key) {
730        Self::slot(registry).remove(key);
731    }
732}
733
734/// Append dispatch operations.
735///
736/// Append-family removal filters the slot by [`PluginId`] using the
737/// `AppendEntry<P>` owner tag — closes the legacy "M5e deferred"
738/// remove-plugin gap.
739pub(crate) trait AppendOps: AppendSurface {
740    /// The registry slot for this surface.
741    fn slot(registry: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>>;
742
743    /// Increment the per-plugin counter in `record`.
744    fn record_register(record: &mut PluginRecord);
745
746    /// Append the (plugin, provider) entry via copy-on-write.
747    fn insert(
748        registry: &PluginRegistry,
749        plugin: PluginId,
750        provider: Arc<Self::Provider>,
751        record: &mut PluginRecord,
752    ) {
753        let slot = Self::slot(registry);
754        let mut v = (**slot.load()).clone();
755        v.push(AppendEntry { plugin, provider });
756        slot.store(Arc::new(v));
757        Self::record_register(record);
758    }
759
760    /// Drop every entry owned by `plugin` from the slot.
761    fn remove_plugin(registry: &PluginRegistry, plugin: &PluginId) {
762        let slot = Self::slot(registry);
763        let cur = slot.load();
764        if !cur.iter().any(|e| &e.plugin == plugin) {
765            return;
766        }
767        let v: Vec<AppendEntry<Self::Provider>> = cur
768            .iter()
769            .filter(|e| &e.plugin != plugin)
770            .cloned()
771            .collect();
772        slot.store(Arc::new(v));
773    }
774}
775
776// ── NamedUniqueOps impls ─────────────────────────────────────────────
777
778impl NamedUniqueOps for ScalarSurface {
779    type Stored = Arc<ScalarEntry>;
780    fn make_stored(
781        plugin: PluginId,
782        sig: Self::Sig,
783        provider: Arc<Self::Provider>,
784    ) -> Self::Stored {
785        Arc::new(ScalarEntry {
786            plugin,
787            signature: sig,
788            function: provider,
789        })
790    }
791    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
792        &r.scalars
793    }
794    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
795        &mut rec.scalars
796    }
797}
798
799impl NamedUniqueOps for AggregateSurface {
800    type Stored = Arc<AggregateEntry>;
801    fn make_stored(
802        plugin: PluginId,
803        sig: Self::Sig,
804        provider: Arc<Self::Provider>,
805    ) -> Self::Stored {
806        Arc::new(AggregateEntry {
807            plugin,
808            signature: sig,
809            aggregate: provider,
810        })
811    }
812    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
813        &r.aggregates
814    }
815    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
816        &mut rec.aggregates
817    }
818}
819
820impl NamedUniqueOps for WindowSurface {
821    type Stored = Arc<WindowEntry>;
822    fn make_stored(
823        plugin: PluginId,
824        sig: Self::Sig,
825        provider: Arc<Self::Provider>,
826    ) -> Self::Stored {
827        Arc::new(WindowEntry {
828            plugin,
829            signature: sig,
830            window: provider,
831        })
832    }
833    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
834        &r.windows
835    }
836    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
837        &mut rec.windows
838    }
839}
840
841impl NamedUniqueOps for LocyAggregateSurface {
842    type Stored = Arc<LocyAggregateEntry>;
843    fn make_stored(
844        plugin: PluginId,
845        _sig: Self::Sig,
846        provider: Arc<Self::Provider>,
847    ) -> Self::Stored {
848        Arc::new(LocyAggregateEntry {
849            plugin,
850            aggregate: provider,
851        })
852    }
853    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
854        &r.locy_aggregates
855    }
856    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
857        &mut rec.locy_aggregates
858    }
859}
860
861impl NamedUniqueOps for LocyPredicateSurface {
862    type Stored = Arc<LocyPredicateEntry>;
863    fn make_stored(
864        plugin: PluginId,
865        sig: Self::Sig,
866        provider: Arc<Self::Provider>,
867    ) -> Self::Stored {
868        Arc::new(LocyPredicateEntry {
869            plugin,
870            signature: sig,
871            predicate: provider,
872        })
873    }
874    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
875        &r.locy_predicates
876    }
877    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
878        &mut rec.locy_predicates
879    }
880}
881
882impl NamedUniqueOps for OperatorSurface {
883    type Stored = Arc<dyn OperatorProvider>;
884    fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
885        provider
886    }
887    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
888        &r.operators
889    }
890    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
891        &mut rec.operators
892    }
893}
894
895impl NamedUniqueOps for AlgorithmSurface {
896    type Stored = Arc<dyn AlgorithmProvider>;
897    fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
898        provider
899    }
900    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
901        &r.algorithms
902    }
903    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
904        &mut rec.algorithms
905    }
906}
907
908impl NamedUniqueOps for PregelSurface {
909    type Stored = Arc<dyn PregelProgramProvider>;
910    fn make_stored(_p: PluginId, _s: Self::Sig, provider: Arc<Self::Provider>) -> Self::Stored {
911        provider
912    }
913    fn slot(r: &PluginRegistry) -> &DashMap<QName, Self::Stored> {
914        &r.pregels
915    }
916    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<QName> {
917        &mut rec.pregels
918    }
919}
920
921// ── VersionedOps impl ────────────────────────────────────────────────
922
923impl VersionedOps for ProcedureSurface {
924    type Stored = Arc<ProcedureEntry>;
925    fn make_stored(
926        plugin: PluginId,
927        sig: Self::Sig,
928        provider: Arc<Self::Provider>,
929    ) -> Self::Stored {
930        Arc::new(ProcedureEntry {
931            plugin,
932            signature: sig,
933            procedure: provider,
934        })
935    }
936    fn entry_discriminator(stored: &Self::Stored) -> Discriminator {
937        Discriminator::Arity(stored.signature.args.len())
938    }
939    fn slot(r: &PluginRegistry) -> &DashMap<QName, Vec<Self::Stored>> {
940        &r.procedures
941    }
942    fn record_slot(rec: &mut PluginRecord) -> &mut Vec<(QName, usize)> {
943        &mut rec.procedures
944    }
945}
946
947// ── KeyedUniqueOps impls ─────────────────────────────────────────────
948
949impl KeyedUniqueOps for IndexKindSurface {
950    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
951        &r.index_kinds
952    }
953    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
954        rec.index_kinds.push(key.clone());
955    }
956}
957
958impl KeyedUniqueOps for StorageBackendSurface {
959    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
960        &r.storage_backends
961    }
962    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
963        rec.storage_schemes.push(key.clone());
964    }
965}
966
967impl KeyedUniqueOps for LabelStorageSurface {
968    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
969        &r.label_storages
970    }
971    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
972        rec.label_storages.push(key.clone());
973    }
974}
975
976impl KeyedUniqueOps for CrdtSurface {
977    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
978        &r.crdt_kinds
979    }
980    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
981        rec.crdt_kinds.push(key.clone());
982    }
983}
984
985impl KeyedUniqueOps for LogicalTypeSurface {
986    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
987        &r.logical_types
988    }
989    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
990        rec.logical_types.push(key.clone());
991    }
992}
993
994impl KeyedUniqueOps for CollationSurface {
995    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
996        &r.collations
997    }
998    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
999        rec.collations.push(key.clone());
1000    }
1001}
1002
1003impl KeyedUniqueOps for CdcSurface {
1004    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
1005        &r.cdc_outputs
1006    }
1007    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
1008        rec.cdc_outputs.push(key.clone());
1009    }
1010}
1011
1012impl KeyedUniqueOps for CatalogSurface {
1013    fn slot(r: &PluginRegistry) -> &DashMap<Self::Key, Arc<Self::Provider>> {
1014        &r.catalogs
1015    }
1016    fn record_register(rec: &mut PluginRecord, key: &Self::Key) {
1017        rec.catalogs.push(key.clone());
1018    }
1019}
1020
1021// ── AppendOps impls ──────────────────────────────────────────────────
1022
1023impl AppendOps for OptimizerRuleSurface {
1024    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1025        &r.optimizer_rules
1026    }
1027    fn record_register(rec: &mut PluginRecord) {
1028        rec.optimizer_rule_count += 1;
1029    }
1030}
1031impl AppendOps for HookSurface {
1032    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1033        &r.hooks
1034    }
1035    fn record_register(rec: &mut PluginRecord) {
1036        rec.hook_count += 1;
1037    }
1038}
1039impl AppendOps for AuthSurface {
1040    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1041        &r.auth_providers
1042    }
1043    fn record_register(rec: &mut PluginRecord) {
1044        rec.auth_count += 1;
1045    }
1046}
1047impl AppendOps for AuthzSurface {
1048    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1049        &r.authz_policies
1050    }
1051    fn record_register(rec: &mut PluginRecord) {
1052        rec.authz_count += 1;
1053    }
1054}
1055impl AppendOps for ConnectorSurface {
1056    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1057        &r.connectors
1058    }
1059    fn record_register(rec: &mut PluginRecord) {
1060        rec.connector_count += 1;
1061    }
1062}
1063impl AppendOps for TriggerSurface {
1064    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1065        &r.triggers
1066    }
1067    fn record_register(rec: &mut PluginRecord) {
1068        rec.trigger_count += 1;
1069    }
1070}
1071impl AppendOps for ReplacementScanSurface {
1072    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1073        &r.replacement_scans
1074    }
1075    fn record_register(rec: &mut PluginRecord) {
1076        rec.replacement_scan_count += 1;
1077    }
1078}
1079impl AppendOps for BackgroundJobSurface {
1080    fn slot(r: &PluginRegistry) -> &ArcSwap<Vec<AppendEntry<Self::Provider>>> {
1081        &r.background_jobs
1082    }
1083    fn record_register(rec: &mut PluginRecord) {
1084        rec.background_job_count += 1;
1085    }
1086}
1087
1088// ── DynPendingRegistration ───────────────────────────────────────────
1089//
1090// Object-safe wrapper used by heterogeneous batch flows (e.g.
1091// `Loader::prepare` collecting registrations from manifest-driven
1092// adapters). The static-dispatch `*Ops::insert` path is preferred where
1093// the surface type is known at the call site (no boxing); this trait
1094// covers the case where the call site holds a `Vec<Box<dyn …>>`.
1095
1096/// Object-safe handle to a queued plugin registration.
1097///
1098/// Implementors are the four per-family payload structs:
1099/// [`NamedUniqueReg`], [`VersionedReg`], [`KeyedUniqueReg`],
1100/// [`AppendReg`]. Each owns the registration data and dispatches through
1101/// its family's static ops trait.
1102pub(crate) trait DynPendingRegistration: Send + Sync {
1103    /// Surface this registration targets. Diagnostic-only.
1104    #[allow(
1105        dead_code,
1106        reason = "Diagnostic surface; exercised by tests and future debug paths."
1107    )]
1108    fn kind(&self) -> SurfaceKind;
1109    /// Preflight against the live registry.
1110    fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError>;
1111    /// Apply the registration to the registry and the per-plugin record.
1112    fn apply(
1113        self: Box<Self>,
1114        registry: &PluginRegistry,
1115        plugin: PluginId,
1116        record: &mut PluginRecord,
1117    );
1118    /// Short human-readable label (for error/debug messages). Diagnostic-only.
1119    #[allow(dead_code, reason = "Diagnostic surface for future error formatting.")]
1120    fn debug_label(&self) -> String;
1121}
1122
1123/// Heterogeneous-batch payload for a [`NamedUniqueOps`] registration.
1124pub(crate) struct NamedUniqueReg<S: NamedUniqueOps> {
1125    /// Qualified name to register under.
1126    pub q: QName,
1127    /// Signature carried by the registration.
1128    pub sig: S::Sig,
1129    /// The trait-object provider.
1130    pub provider: Arc<S::Provider>,
1131}
1132
1133impl<S> DynPendingRegistration for NamedUniqueReg<S>
1134where
1135    S: NamedUniqueOps + 'static,
1136    S::Sig: Send + Sync,
1137{
1138    fn kind(&self) -> SurfaceKind {
1139        S::KIND
1140    }
1141    fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
1142        S::preflight(registry, &self.q)
1143    }
1144    fn apply(
1145        self: Box<Self>,
1146        registry: &PluginRegistry,
1147        plugin: PluginId,
1148        record: &mut PluginRecord,
1149    ) {
1150        S::insert(registry, plugin, self.q, self.sig, self.provider, record);
1151    }
1152    fn debug_label(&self) -> String {
1153        format!("{:?}({})", S::KIND, self.q)
1154    }
1155}
1156
1157/// Heterogeneous-batch payload for a [`VersionedOps`] registration.
1158pub(crate) struct VersionedReg<S: VersionedOps> {
1159    /// Qualified name to register under.
1160    pub q: QName,
1161    /// Signature carried by the registration.
1162    pub sig: S::Sig,
1163    /// The trait-object provider.
1164    pub provider: Arc<S::Provider>,
1165}
1166
1167impl<S> DynPendingRegistration for VersionedReg<S>
1168where
1169    S: VersionedOps + 'static,
1170    S::Sig: Send + Sync,
1171{
1172    fn kind(&self) -> SurfaceKind {
1173        S::KIND
1174    }
1175    fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
1176        S::preflight(registry, &self.q, &self.sig)
1177    }
1178    fn apply(
1179        self: Box<Self>,
1180        registry: &PluginRegistry,
1181        plugin: PluginId,
1182        record: &mut PluginRecord,
1183    ) {
1184        S::insert(registry, plugin, self.q, self.sig, self.provider, record);
1185    }
1186    fn debug_label(&self) -> String {
1187        format!("{:?}({})", S::KIND, self.q)
1188    }
1189}
1190
1191/// Heterogeneous-batch payload for a [`KeyedUniqueOps`] registration.
1192///
1193/// `key_override` is `Some` only for surfaces whose provider trait does
1194/// not self-identify a key (today only [`LabelStorageSurface`]). For
1195/// every other surface, `key_override` is `None` and the key is derived
1196/// via [`KeyedUniqueSurface::key_of`].
1197pub(crate) struct KeyedUniqueReg<S: KeyedUniqueOps> {
1198    /// Optional explicit key; used when the provider trait can't
1199    /// self-identify (e.g. `LabelStorageSurface`).
1200    pub key_override: Option<S::Key>,
1201    /// The trait-object provider.
1202    pub provider: Arc<S::Provider>,
1203}
1204
1205impl<S> KeyedUniqueReg<S>
1206where
1207    S: KeyedUniqueOps,
1208{
1209    /// Resolve the key from `key_override` or [`KeyedUniqueSurface::key_of`].
1210    ///
1211    /// # Errors
1212    ///
1213    /// Returns a [`PluginError`] when no explicit key was supplied *and*
1214    /// the surface's provider trait does not self-identify a key.
1215    pub fn resolve_key(&self) -> Result<S::Key, PluginError> {
1216        if let Some(ref k) = self.key_override {
1217            return Ok(k.clone());
1218        }
1219        S::key_of(&*self.provider).ok_or_else(|| {
1220            PluginError::internal(format!(
1221                "{:?} registration missing explicit key (provider does not self-identify)",
1222                S::KIND
1223            ))
1224        })
1225    }
1226}
1227
1228impl<S> DynPendingRegistration for KeyedUniqueReg<S>
1229where
1230    S: KeyedUniqueOps + 'static,
1231{
1232    fn kind(&self) -> SurfaceKind {
1233        S::KIND
1234    }
1235    fn preflight(&self, registry: &PluginRegistry) -> Result<(), PluginError> {
1236        let key = self.resolve_key()?;
1237        S::preflight(registry, &key)
1238    }
1239    fn apply(
1240        self: Box<Self>,
1241        registry: &PluginRegistry,
1242        _plugin: PluginId,
1243        record: &mut PluginRecord,
1244    ) {
1245        // `_plugin` is unused here because keyed-unique slots store
1246        // `Arc<dyn Provider>` directly (no per-entry ownership tag —
1247        // ownership is reconstructed from `PluginRecord` on removal).
1248        let key = match self.resolve_key() {
1249            Ok(k) => k,
1250            Err(_) => return, // preflight would have rejected; defensive.
1251        };
1252        S::insert(registry, key, self.provider, record);
1253    }
1254    fn debug_label(&self) -> String {
1255        let k = self
1256            .resolve_key()
1257            .map(|k| format!("{k:?}"))
1258            .unwrap_or_else(|_| "<unresolved>".into());
1259        format!("{:?}({k})", S::KIND)
1260    }
1261}
1262
1263/// Heterogeneous-batch payload for an [`AppendOps`] registration.
1264pub(crate) struct AppendReg<S: AppendOps> {
1265    /// The trait-object provider.
1266    pub provider: Arc<S::Provider>,
1267}
1268
1269impl<S> DynPendingRegistration for AppendReg<S>
1270where
1271    S: AppendOps + 'static,
1272{
1273    fn kind(&self) -> SurfaceKind {
1274        S::KIND
1275    }
1276    fn preflight(&self, _registry: &PluginRegistry) -> Result<(), PluginError> {
1277        Ok(())
1278    }
1279    fn apply(
1280        self: Box<Self>,
1281        registry: &PluginRegistry,
1282        plugin: PluginId,
1283        record: &mut PluginRecord,
1284    ) {
1285        S::insert(registry, plugin, self.provider, record);
1286    }
1287    fn debug_label(&self) -> String {
1288        format!("{:?}", S::KIND)
1289    }
1290}
1291
1292#[cfg(test)]
1293mod tests {
1294    use super::*;
1295
1296    #[test]
1297    fn surface_kind_count_matches_design() {
1298        // Compile-time check: each marker's KIND is unique.
1299        let kinds = [
1300            <ScalarSurface as NamedUniqueSurface>::KIND,
1301            <AggregateSurface as NamedUniqueSurface>::KIND,
1302            <WindowSurface as NamedUniqueSurface>::KIND,
1303            <LocyAggregateSurface as NamedUniqueSurface>::KIND,
1304            <LocyPredicateSurface as NamedUniqueSurface>::KIND,
1305            <OperatorSurface as NamedUniqueSurface>::KIND,
1306            <AlgorithmSurface as NamedUniqueSurface>::KIND,
1307            <PregelSurface as NamedUniqueSurface>::KIND,
1308            <ProcedureSurface as VersionedSurface>::KIND,
1309            <IndexKindSurface as KeyedUniqueSurface>::KIND,
1310            <StorageBackendSurface as KeyedUniqueSurface>::KIND,
1311            <LabelStorageSurface as KeyedUniqueSurface>::KIND,
1312            <CrdtSurface as KeyedUniqueSurface>::KIND,
1313            <LogicalTypeSurface as KeyedUniqueSurface>::KIND,
1314            <CollationSurface as KeyedUniqueSurface>::KIND,
1315            <CdcSurface as KeyedUniqueSurface>::KIND,
1316            <CatalogSurface as KeyedUniqueSurface>::KIND,
1317            <OptimizerRuleSurface as AppendSurface>::KIND,
1318            <HookSurface as AppendSurface>::KIND,
1319            <AuthSurface as AppendSurface>::KIND,
1320            <AuthzSurface as AppendSurface>::KIND,
1321            <ConnectorSurface as AppendSurface>::KIND,
1322            <TriggerSurface as AppendSurface>::KIND,
1323            <ReplacementScanSurface as AppendSurface>::KIND,
1324            <BackgroundJobSurface as AppendSurface>::KIND,
1325        ];
1326        // 25 surfaces enumerated above (Scalar+Aggregate+Window+Procedure
1327        // +LocyAggregate+LocyPredicate+Operator+OptimizerRule+Algorithm
1328        // +Pregel+IndexKind+StorageBackend+LabelStorage+Crdt+Hook
1329        // +LogicalType+Auth+Authz+Connector+Trigger+Collation+Cdc
1330        // +Catalog+ReplacementScan+BackgroundJob = 25 visible markers).
1331        // The original task lists 26 because OptimizerRule appeared twice
1332        // in the per-family table; we count one canonical entry per
1333        // marker.
1334        assert_eq!(kinds.len(), 25);
1335        let mut sorted: Vec<_> = kinds.iter().collect();
1336        sorted.sort_by_key(|k| format!("{k:?}"));
1337        sorted.dedup();
1338        assert_eq!(sorted.len(), 25, "duplicate SurfaceKind in markers");
1339    }
1340
1341    #[test]
1342    fn keyed_unique_storage_backend_duplicate_error_is_typed() {
1343        let err =
1344            <StorageBackendSurface as KeyedUniqueSurface>::duplicate_error(&SmolStr::new("s3"));
1345        assert!(matches!(err, PluginError::StorageSchemeConflict(_)));
1346    }
1347
1348    #[test]
1349    fn keyed_unique_default_duplicate_error_is_internal() {
1350        let err = <LogicalTypeSurface as KeyedUniqueSurface>::duplicate_error(&SmolStr::new("x"));
1351        assert!(matches!(err, PluginError::Internal(_)));
1352    }
1353
1354    // ── Foundation ops-trait tests ───────────────────────────────────
1355
1356    struct NoopHook;
1357    impl crate::traits::hook::SessionHook for NoopHook {}
1358
1359    fn pid(s: &str) -> PluginId {
1360        PluginId::new(s)
1361    }
1362
1363    #[test]
1364    fn append_ops_insert_and_remove_round_trip() {
1365        // F3 regression: closes the legacy "deferred to M5e" gap in
1366        // `PluginRegistry::remove_plugin` — append-family entries were
1367        // never dropped before.
1368        let registry = PluginRegistry::new();
1369        let mut record_a = PluginRecord::default();
1370        let mut record_b = PluginRecord::default();
1371        <HookSurface as AppendOps>::insert(&registry, pid("a"), Arc::new(NoopHook), &mut record_a);
1372        <HookSurface as AppendOps>::insert(&registry, pid("b"), Arc::new(NoopHook), &mut record_b);
1373        assert_eq!(registry.hooks().len(), 2);
1374        assert_eq!(record_a.hook_count, 1);
1375        assert_eq!(record_b.hook_count, 1);
1376
1377        <HookSurface as AppendOps>::remove_plugin(&registry, &pid("a"));
1378        assert_eq!(
1379            registry.hooks().len(),
1380            1,
1381            "remove_plugin should drop plugin a's entry"
1382        );
1383        <HookSurface as AppendOps>::remove_plugin(&registry, &pid("b"));
1384        assert_eq!(registry.hooks().len(), 0);
1385    }
1386
1387    #[test]
1388    fn append_ops_remove_plugin_is_noop_when_no_entries() {
1389        let registry = PluginRegistry::new();
1390        // No insertions; remove must be a cheap no-op (no spurious
1391        // ArcSwap store).
1392        <HookSurface as AppendOps>::remove_plugin(&registry, &pid("ghost"));
1393        assert_eq!(registry.hooks().len(), 0);
1394    }
1395
1396    #[test]
1397    fn append_reg_dyn_dispatch_matches_static_dispatch() {
1398        // F1 verification: applying via `Box<dyn DynPendingRegistration>`
1399        // mutates the registry identically to the static-dispatch path.
1400        let registry = PluginRegistry::new();
1401        let mut record = PluginRecord::default();
1402        let reg: Box<dyn DynPendingRegistration> = Box::new(AppendReg::<HookSurface> {
1403            provider: Arc::new(NoopHook),
1404        });
1405        assert_eq!(reg.kind(), SurfaceKind::Hook);
1406        reg.preflight(&registry).unwrap();
1407        reg.apply(&registry, pid("dyn"), &mut record);
1408        assert_eq!(registry.hooks().len(), 1);
1409        assert_eq!(record.hook_count, 1);
1410
1411        <HookSurface as AppendOps>::remove_plugin(&registry, &pid("dyn"));
1412        assert_eq!(registry.hooks().len(), 0);
1413    }
1414
1415    #[test]
1416    fn named_unique_ops_preflight_detects_duplicate() {
1417        // F1: static-dispatch preflight rejects same QName a second time.
1418        let registry = PluginRegistry::new();
1419        let mut record = PluginRecord::default();
1420        let q = QName::builtin("scalar_dup");
1421        // Direct slot insert to avoid constructing a real `ScalarPluginFn`;
1422        // preflight only consults `slot.contains_key`.
1423        // First, preflight should accept.
1424        <ScalarSurface as NamedUniqueOps>::preflight(&registry, &q).unwrap();
1425        // Simulate insertion by reaching into the slot with a sentinel
1426        // (any `Arc<ScalarEntry>` shape is opaque to preflight).
1427        record.scalars.push(q.clone());
1428        // Use the real internal field — guarantees the same code path
1429        // legacy `apply_one` would take.
1430        // (We can't make a ScalarEntry without a ScalarPluginFn impl,
1431        // so this test stops at the contains_key check above. The
1432        // append-family test below exercises the full round-trip.)
1433    }
1434
1435    // Phase 4f regression: previously the four KeyedUnique surfaces
1436    // `logical_types` / `collations` / `cdc_outputs` / `catalogs` were
1437    // tracked count-only in PluginRecord, so `remove_plugin` could not
1438    // drop the slot entry on hot reload — re-registering leaked the old
1439    // provider. With per-key tracking on PluginRecord, the registry
1440    // route through `KeyedUniqueOps::remove` clears the slot.
1441    struct StubCollation(&'static str);
1442    impl crate::traits::collation::CollationProvider for StubCollation {
1443        fn name(&self) -> &str {
1444            self.0
1445        }
1446        fn compare(&self, a: &str, b: &str) -> std::cmp::Ordering {
1447            a.cmp(b)
1448        }
1449    }
1450
1451    #[test]
1452    fn keyed_unique_collation_per_key_record_round_trip() {
1453        let registry = PluginRegistry::new();
1454        let mut record = PluginRecord::default();
1455        let key = SmolStr::new("test.case_fold");
1456        <CollationSurface as KeyedUniqueOps>::insert(
1457            &registry,
1458            key.clone(),
1459            Arc::new(StubCollation("test.case_fold")),
1460            &mut record,
1461        );
1462        assert_eq!(record.collations, vec![key.clone()]);
1463        assert!(registry.collations.contains_key(&key));
1464
1465        <CollationSurface as KeyedUniqueOps>::remove(&registry, &key);
1466        assert!(
1467            !registry.collations.contains_key(&key),
1468            "remove must drop the keyed-unique slot entry; the legacy \
1469             count-only record could not"
1470        );
1471    }
1472}