Skip to main content

uni_plugin/
registry.rs

1//! The [`PluginRegistry`] — per-surface trait-object tables.
2//!
3//! All registrations land here. Reads are wait-free via `arc-swap`; writes
4//! are CAS-style. Hot reload swaps a per-plugin entry; queries holding an
5//! `Arc::clone()` of the old entry continue against the old version until
6//! their reference is dropped.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use arc_swap::ArcSwap;
12use dashmap::DashMap;
13use parking_lot::{Mutex, RwLock};
14use smol_str::SmolStr;
15
16use crate::errors::PluginError;
17use crate::plugin::PluginId;
18use crate::qname::QName;
19use crate::traits::aggregate::{AggSignature, AggregatePluginFn};
20use crate::traits::algorithm::{AlgorithmProvider, PregelProgramProvider};
21use crate::traits::background::BackgroundJobProvider;
22use crate::traits::catalog::{CatalogProvider, ReplacementScanProvider};
23use crate::traits::cdc::CdcOutputProvider;
24use crate::traits::collation::CollationProvider;
25use crate::traits::connector::{AuthProvider, AuthzPolicy, Connector};
26use crate::traits::crdt::{CrdtKind, CrdtKindProvider};
27use crate::traits::hook::SessionHook;
28use crate::traits::index::{IndexHandle, IndexKind, IndexKindProvider};
29use crate::traits::locy::{LocyAggregate, LocyPredicate, PredSignature};
30use crate::traits::operator::{OperatorProvider, OptimizerRuleProvider};
31use crate::traits::procedure::{ProcedurePlugin, ProcedureSignature};
32use crate::traits::scalar::{FnSignature, ScalarPluginFn};
33use crate::traits::storage::StorageBackend;
34use crate::traits::trigger::TriggerPlugin;
35use crate::traits::types::LogicalTypeProvider;
36use crate::traits::window::{WindowPluginFn, WindowSignature};
37
38/// A single scalar-fn registry entry.
39pub struct ScalarEntry {
40    /// Owning plugin id.
41    pub plugin: PluginId,
42    /// Function signature.
43    pub signature: FnSignature,
44    /// The registered function.
45    pub function: Arc<dyn ScalarPluginFn>,
46}
47
48impl std::fmt::Debug for ScalarEntry {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("ScalarEntry")
51            .field("plugin", &self.plugin)
52            .field("signature", &self.signature)
53            .finish_non_exhaustive()
54    }
55}
56
57/// A single aggregate-fn registry entry.
58pub struct AggregateEntry {
59    /// Owning plugin id.
60    pub plugin: PluginId,
61    /// Aggregate signature.
62    pub signature: AggSignature,
63    /// The registered aggregate.
64    pub aggregate: Arc<dyn AggregatePluginFn>,
65}
66
67impl std::fmt::Debug for AggregateEntry {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("AggregateEntry")
70            .field("plugin", &self.plugin)
71            .field("signature", &self.signature)
72            .finish_non_exhaustive()
73    }
74}
75
76/// A single window-fn registry entry.
77pub struct WindowEntry {
78    /// Owning plugin id.
79    pub plugin: PluginId,
80    /// Window signature.
81    pub signature: WindowSignature,
82    /// The registered window function.
83    pub window: Arc<dyn WindowPluginFn>,
84}
85
86impl std::fmt::Debug for WindowEntry {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("WindowEntry")
89            .field("plugin", &self.plugin)
90            .field("signature", &self.signature)
91            .finish_non_exhaustive()
92    }
93}
94
95/// A single procedure registry entry.
96pub struct ProcedureEntry {
97    /// Owning plugin id.
98    pub plugin: PluginId,
99    /// Procedure signature.
100    pub signature: ProcedureSignature,
101    /// The registered procedure.
102    pub procedure: Arc<dyn ProcedurePlugin>,
103}
104
105impl std::fmt::Debug for ProcedureEntry {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct("ProcedureEntry")
108            .field("plugin", &self.plugin)
109            .field("signature", &self.signature)
110            .finish_non_exhaustive()
111    }
112}
113
114/// A Locy aggregate entry.
115pub struct LocyAggregateEntry {
116    /// Owning plugin id.
117    pub plugin: PluginId,
118    /// The registered aggregate.
119    pub aggregate: Arc<dyn LocyAggregate>,
120}
121
122impl std::fmt::Debug for LocyAggregateEntry {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_struct("LocyAggregateEntry")
125            .field("plugin", &self.plugin)
126            .finish_non_exhaustive()
127    }
128}
129
130/// A Locy predicate entry.
131pub struct LocyPredicateEntry {
132    /// Owning plugin id.
133    pub plugin: PluginId,
134    /// Predicate signature.
135    pub signature: PredSignature,
136    /// The registered predicate.
137    pub predicate: Arc<dyn LocyPredicate>,
138}
139
140impl std::fmt::Debug for LocyPredicateEntry {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("LocyPredicateEntry")
143            .field("plugin", &self.plugin)
144            .field("signature", &self.signature)
145            .finish_non_exhaustive()
146    }
147}
148
149/// A live index handle keyed by index *name* (e.g., `"vec_idx_embedding"`).
150///
151/// Unlike `IndexKindProvider`, which is plugin-registered via the
152/// `PluginRegistrar` and describes a *kind* of index, an `IndexHandleEntry`
153/// represents a *specific* live index — the runtime object produced by
154/// `IndexKindProvider::build().finalize()` (or `IndexKindProvider::open()`).
155/// Handles are inserted by the host (not by the plugin's `register()` call)
156/// because their lifetime tracks the storage layer rather than plugin
157/// metadata.
158///
159/// The planner consults this table by index name when dispatching a vector
160/// KNN query (see `plan_vector_knn`). When `Some`, the planner routes the
161/// probe through the plugin handle; when `None`, the native storage path
162/// runs (preserving the "no behavior change for built-ins" invariant).
163#[derive(Clone)]
164pub struct IndexHandleEntry {
165    /// Kind that produced this handle (informational; matches the
166    /// `IndexKindProvider::kind` that built it).
167    pub kind: IndexKind,
168    /// The live handle.
169    pub handle: Arc<dyn IndexHandle>,
170}
171
172impl std::fmt::Debug for IndexHandleEntry {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        f.debug_struct("IndexHandleEntry")
175            .field("kind", &self.kind)
176            .finish_non_exhaustive()
177    }
178}
179
180/// One slot in the virtual label / edge-type allocation table — bundles
181/// the name the planner saw with the `CatalogTable` that owns its rows.
182///
183/// Used by [`PluginRegistry::register_virtual_label`] / `_edge_type`.
184/// Lookups by ID (via `virtual_label_by_id`) return a cheap clone of
185/// this entry so the planner's physical-scan layer can route directly
186/// to `table.scan(...)` without re-consulting the providers.
187#[derive(Clone)]
188pub struct VirtualEntry {
189    /// The user-typed name (e.g. `"External"`).
190    pub name: SmolStr,
191    /// The catalog table that owns the rows for this virtual identifier.
192    pub table: Arc<dyn crate::traits::catalog::CatalogTable>,
193}
194
195impl std::fmt::Debug for VirtualEntry {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        f.debug_struct("VirtualEntry")
198            .field("name", &self.name)
199            .finish_non_exhaustive()
200    }
201}
202
/// Integer type usable as a virtual identifier: `u16` for labels, `u32`
/// for edge types.
///
/// Each implementor supplies its own `START` / `SENTINEL` bounds, a
/// display label, and a non-overflowing increment, so the allocator logic
/// can be written a single time over a generic `Id`.
trait VirtualId
where
    Self: Copy + Eq + Ord + std::hash::Hash + std::fmt::Debug + std::fmt::LowerHex + 'static,
{
    /// Inclusive lower bound of the virtual range; the first ID handed out.
    const START: Self;
    /// Exclusive, reserved upper bound. An allocator whose cursor reaches
    /// this value has run out of IDs.
    const SENTINEL: Self;
    /// Human-readable name of the identifier kind (e.g. `"label"`,
    /// `"edge-type"`), interpolated into the exhaustion error message.
    const KIND_LABEL: &'static str;

    /// Returns the following ID without overflowing. The allocator stops
    /// at `SENTINEL` before it could ever depend on a wrapped value.
    fn next(self) -> Self;
}
223
224impl VirtualId for u16 {
225    const START: Self = uni_common::core::schema::VIRTUAL_LABEL_ID_START;
226    const SENTINEL: Self = uni_common::core::schema::VIRTUAL_LABEL_ID_SENTINEL;
227    const KIND_LABEL: &'static str = "label";
228
229    fn next(self) -> Self {
230        self.saturating_add(1)
231    }
232}
233
234impl VirtualId for u32 {
235    const START: Self = uni_common::core::edge_type::VIRTUAL_EDGE_TYPE_ID_START;
236    const SENTINEL: Self = uni_common::core::edge_type::VIRTUAL_EDGE_TYPE_ID_SENTINEL;
237    const KIND_LABEL: &'static str = "edge-type";
238
239    fn next(self) -> Self {
240        self.saturating_add(1)
241    }
242}
243
244/// Inner mutable state for a virtual-ID allocator (labels use `u16`,
245/// edge-types use `u32`). Held behind a `parking_lot::Mutex` because
246/// allocations are rare (one per first reference to a previously-unseen
247/// name) and the contention surface is tiny.
248#[derive(Debug)]
249struct VirtualIdSpace<Id: VirtualId> {
250    name_to_id: HashMap<SmolStr, Id>,
251    id_to_entry: HashMap<Id, VirtualEntry>,
252    next_id: Id,
253}
254
255impl<Id: VirtualId> Default for VirtualIdSpace<Id> {
256    fn default() -> Self {
257        Self {
258            name_to_id: HashMap::new(),
259            id_to_entry: HashMap::new(),
260            next_id: Id::START,
261        }
262    }
263}
264
265impl<Id: VirtualId> VirtualIdSpace<Id> {
266    /// Allocate (or look up) an ID for `name`, replacing the stored
267    /// table on re-registration. Returns `Err` when the virtual range is
268    /// exhausted.
269    fn register(
270        &mut self,
271        name: SmolStr,
272        table: Arc<dyn crate::traits::catalog::CatalogTable>,
273    ) -> Result<Id, PluginError> {
274        if let Some(&id) = self.name_to_id.get(&name) {
275            self.id_to_entry.insert(
276                id,
277                VirtualEntry {
278                    name: name.clone(),
279                    table,
280                },
281            );
282            return Ok(id);
283        }
284        if self.next_id >= Id::SENTINEL {
285            return Err(PluginError::Internal(format!(
286                "virtual {}-ID space exhausted ({} slots taken; sentinel {:#x})",
287                Id::KIND_LABEL,
288                self.id_to_entry.len(),
289                Id::SENTINEL,
290            )));
291        }
292        let id = self.next_id;
293        self.next_id = self.next_id.next();
294        self.name_to_id.insert(name.clone(), id);
295        self.id_to_entry.insert(id, VirtualEntry { name, table });
296        Ok(id)
297    }
298}
299
300/// Per-plugin record of *what* this plugin registered, for unregister /
301/// hot-reload.
302///
303/// `pub(crate)` (with `pub(crate)` fields) so the family-ops traits in
304/// [`crate::surfaces`] can update the record without an accessor for each
305/// surface during the Phase 4 migration.
306#[derive(Default, Debug)]
307pub(crate) struct PluginRecord {
308    pub(crate) scalars: Vec<QName>,
309    pub(crate) aggregates: Vec<QName>,
310    pub(crate) windows: Vec<QName>,
311    /// Procedures are arity-overloaded: a given `QName` may be registered
312    /// multiple times with different arities (see `procedure_with_arity`).
313    /// The `usize` is the procedure's positional argument count, used by
314    /// `remove_plugin` to drop the exact overload this plugin owns.
315    pub(crate) procedures: Vec<(QName, usize)>,
316    pub(crate) locy_aggregates: Vec<QName>,
317    pub(crate) locy_predicates: Vec<QName>,
318    pub(crate) operators: Vec<QName>,
319    pub(crate) algorithms: Vec<QName>,
320    pub(crate) pregels: Vec<QName>,
321    pub(crate) index_kinds: Vec<IndexKind>,
322    pub(crate) storage_schemes: Vec<SmolStr>,
323    pub(crate) label_storages: Vec<SmolStr>,
324    pub(crate) crdt_kinds: Vec<CrdtKind>,
325    /// Logical-type extension names this plugin registered. Tracked
326    /// per-key (not count-only) so `remove_plugin` can drop the entries
327    /// on hot reload.
328    pub(crate) logical_types: Vec<SmolStr>,
329    /// Collation names this plugin registered.
330    pub(crate) collations: Vec<SmolStr>,
331    /// CDC output sink names this plugin registered.
332    pub(crate) cdc_outputs: Vec<SmolStr>,
333    /// Catalog names this plugin registered.
334    pub(crate) catalogs: Vec<SmolStr>,
335    pub(crate) hook_count: usize,
336    pub(crate) auth_count: usize,
337    pub(crate) authz_count: usize,
338    pub(crate) connector_count: usize,
339    pub(crate) trigger_count: usize,
340    pub(crate) replacement_scan_count: usize,
341    pub(crate) optimizer_rule_count: usize,
342    pub(crate) background_job_count: usize,
343}
344
345/// A deep-clone snapshot of one plugin's registry footprint.
346///
347/// Produced by [`PluginRegistry::iter_for_plugin`] and consumed by
348/// [`crate::reload::ReloadDispatcher`]. The snapshot is **not** kept
349/// in sync with the live registry; it represents the surfaces a
350/// plugin owned at the moment the snapshot was taken.
351#[derive(Clone, Debug, Default)]
352pub struct PluginRecordSnapshot {
353    /// Scalar fns this plugin registered.
354    pub scalars: Vec<QName>,
355    /// Aggregate fns this plugin registered.
356    pub aggregates: Vec<QName>,
357    /// Window fns this plugin registered.
358    pub windows: Vec<QName>,
359    /// Procedures (qname + arity) this plugin registered.
360    pub procedures: Vec<(QName, usize)>,
361    /// Locy aggregates this plugin registered.
362    pub locy_aggregates: Vec<QName>,
363    /// Locy predicates this plugin registered.
364    pub locy_predicates: Vec<QName>,
365    /// Physical operators this plugin registered.
366    pub operators: Vec<QName>,
367    /// Algorithms this plugin registered.
368    pub algorithms: Vec<QName>,
369    /// Pregel programs this plugin registered.
370    pub pregels: Vec<QName>,
371    /// Index kinds this plugin registered.
372    pub index_kinds: Vec<IndexKind>,
373    /// Storage URI schemes this plugin registered.
374    pub storage_schemes: Vec<SmolStr>,
375    /// Label storages this plugin registered.
376    pub label_storages: Vec<SmolStr>,
377    /// CRDT kinds this plugin registered.
378    pub crdt_kinds: Vec<CrdtKind>,
379    /// Logical-type extension names this plugin registered.
380    pub logical_types: Vec<SmolStr>,
381    /// Collation names this plugin registered.
382    pub collations: Vec<SmolStr>,
383    /// CDC output sink names this plugin registered.
384    pub cdc_outputs: Vec<SmolStr>,
385    /// Catalog names this plugin registered.
386    pub catalogs: Vec<SmolStr>,
387    /// Number of `SessionHook`s this plugin registered.
388    pub hook_count: usize,
389    /// Number of `AuthProvider`s this plugin registered.
390    pub auth_count: usize,
391    /// Number of `AuthzPolicy`s this plugin registered.
392    pub authz_count: usize,
393    /// Number of `Connector`s this plugin registered.
394    pub connector_count: usize,
395    /// Number of `TriggerPlugin`s this plugin registered.
396    pub trigger_count: usize,
397    /// Number of `ReplacementScanProvider`s this plugin registered.
398    pub replacement_scan_count: usize,
399    /// Number of `OptimizerRuleProvider`s this plugin registered.
400    pub optimizer_rule_count: usize,
401    /// Number of `BackgroundJobProvider`s this plugin registered.
402    pub background_job_count: usize,
403}
404
405impl From<&PluginRecord> for PluginRecordSnapshot {
406    /// Deep-clone a live `PluginRecord` into a standalone snapshot. The
407    /// field list lives only on the two struct definitions; this clones
408    /// each (`Vec`s deep-clone their elements, counts are `Copy`).
409    fn from(r: &PluginRecord) -> Self {
410        Self {
411            scalars: r.scalars.clone(),
412            aggregates: r.aggregates.clone(),
413            windows: r.windows.clone(),
414            procedures: r.procedures.clone(),
415            locy_aggregates: r.locy_aggregates.clone(),
416            locy_predicates: r.locy_predicates.clone(),
417            operators: r.operators.clone(),
418            algorithms: r.algorithms.clone(),
419            pregels: r.pregels.clone(),
420            index_kinds: r.index_kinds.clone(),
421            storage_schemes: r.storage_schemes.clone(),
422            label_storages: r.label_storages.clone(),
423            crdt_kinds: r.crdt_kinds.clone(),
424            logical_types: r.logical_types.clone(),
425            collations: r.collations.clone(),
426            cdc_outputs: r.cdc_outputs.clone(),
427            catalogs: r.catalogs.clone(),
428            hook_count: r.hook_count,
429            auth_count: r.auth_count,
430            authz_count: r.authz_count,
431            connector_count: r.connector_count,
432            trigger_count: r.trigger_count,
433            replacement_scan_count: r.replacement_scan_count,
434            optimizer_rule_count: r.optimizer_rule_count,
435            background_job_count: r.background_job_count,
436        }
437    }
438}
439
440/// All-surfaces plugin registry.
441///
442/// Per-surface tables wrapped in `arc-swap` for wait-free reads. The
443/// registry tracks per-plugin ownership so `remove_plugin` can clean up
444/// all of a plugin's registrations in one pass.
445#[derive(Default)]
446pub struct PluginRegistry {
447    pub(crate) scalars: DashMap<QName, Arc<ScalarEntry>>,
448    pub(crate) aggregates: DashMap<QName, Arc<AggregateEntry>>,
449    pub(crate) windows: DashMap<QName, Arc<WindowEntry>>,
450    /// Procedures keyed by qname. Each qname may carry multiple overload
451    /// entries discriminated by `entry.signature.args.len()` so callers can
452    /// register two registrations under the same name with different
453    /// arities (M5c.2: legacy 5-arg + new 2-arg algorithm signatures).
454    /// `procedure(&q)` returns the first registration; arity-aware callers
455    /// use `procedure_with_arity(&q, arity)`.
456    pub(crate) procedures: DashMap<QName, Vec<Arc<ProcedureEntry>>>,
457    pub(crate) locy_aggregates: DashMap<QName, Arc<LocyAggregateEntry>>,
458    pub(crate) locy_predicates: DashMap<QName, Arc<LocyPredicateEntry>>,
459    pub(crate) operators: DashMap<QName, Arc<dyn OperatorProvider>>,
460    pub(crate) optimizer_rules:
461        ArcSwap<Vec<crate::surfaces::AppendEntry<dyn OptimizerRuleProvider>>>,
462    pub(crate) algorithms: DashMap<QName, Arc<dyn AlgorithmProvider>>,
463    pub(crate) pregels: DashMap<QName, Arc<dyn PregelProgramProvider>>,
464    pub(crate) index_kinds: DashMap<IndexKind, Arc<dyn IndexKindProvider>>,
465    index_handles: DashMap<SmolStr, IndexHandleEntry>,
466    pub(crate) storage_backends: DashMap<SmolStr, Arc<dyn StorageBackend>>,
467    /// Per-label plugin storage (M5h.2). Distinct from
468    /// `storage_backends` — the latter is keyed by URI *scheme* and
469    /// opens new `Storage` instances; this map is keyed by *label name*
470    /// and resolves to an already-open `Storage`. The host's
471    /// `StorageManager::scan_vertex_table` consults this map before
472    /// falling through to the native backend so a third-party plugin
473    /// can serve a native-schema label from its own storage.
474    pub(crate) label_storages: DashMap<SmolStr, Arc<dyn crate::traits::storage::Storage>>,
475    pub(crate) crdt_kinds: DashMap<CrdtKind, Arc<dyn CrdtKindProvider>>,
476    pub(crate) hooks: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn SessionHook>>>,
477    pub(crate) logical_types: DashMap<SmolStr, Arc<dyn LogicalTypeProvider>>,
478    pub(crate) auth_providers: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn AuthProvider>>>,
479    pub(crate) authz_policies: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn AuthzPolicy>>>,
480    pub(crate) connectors: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn Connector>>>,
481    pub(crate) triggers: ArcSwap<Vec<crate::surfaces::AppendEntry<dyn TriggerPlugin>>>,
482    pub(crate) collations: DashMap<SmolStr, Arc<dyn CollationProvider>>,
483    pub(crate) cdc_outputs: DashMap<SmolStr, Arc<dyn CdcOutputProvider>>,
484    pub(crate) catalogs: DashMap<SmolStr, Arc<dyn CatalogProvider>>,
485    pub(crate) replacement_scans:
486        ArcSwap<Vec<crate::surfaces::AppendEntry<dyn ReplacementScanProvider>>>,
487    pub(crate) background_jobs:
488        ArcSwap<Vec<crate::surfaces::AppendEntry<dyn BackgroundJobProvider>>>,
489    /// Virtual label-ID allocator. Allocates IDs in the schema's reserved
490    /// virtual range (`uni_common::core::schema::VIRTUAL_LABEL_ID_START..
491    /// VIRTUAL_LABEL_ID_SENTINEL`) on first observation of an unknown label
492    /// name that a `CatalogProvider` or `ReplacementScanProvider` claims.
493    /// See [`Self::register_virtual_label`] / [`Self::virtual_label_by_id`].
494    virtual_labels: Mutex<VirtualIdSpace<u16>>,
495    /// Virtual edge-type allocator. Allocates IDs in
496    /// `uni_common::core::edge_type::VIRTUAL_EDGE_TYPE_ID_START..
497    /// VIRTUAL_EDGE_TYPE_ID_SENTINEL`. Same first-observation semantics.
498    virtual_edge_types: Mutex<VirtualIdSpace<u32>>,
499    per_plugin: RwLock<dashmap::DashMap<PluginId, PluginRecord>>,
500}
501
502impl std::fmt::Debug for PluginRegistry {
503    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504        f.debug_struct("PluginRegistry")
505            .field("scalar_fns", &self.scalars.len())
506            .field("aggregates", &self.aggregates.len())
507            .field("procedures", &self.procedures.len())
508            .field("locy_aggregates", &self.locy_aggregates.len())
509            .field("algorithms", &self.algorithms.len())
510            .field("storage_backends", &self.storage_backends.len())
511            .field("index_kinds", &self.index_kinds.len())
512            .field("hooks", &self.hooks.load().len())
513            .field("plugins", &self.per_plugin.read().len())
514            .finish()
515    }
516}
517
518impl PluginRegistry {
519    /// Construct an empty registry.
520    #[must_use]
521    pub fn new() -> Self {
522        Self::default()
523    }
524
525    /// Look up a registered scalar function by qname.
526    #[must_use]
527    pub fn scalar_fn(&self, q: &QName) -> Option<Arc<ScalarEntry>> {
528        self.scalars.get(q).map(|e| Arc::clone(e.value()))
529    }
530
531    /// Iterate every registered scalar function — `(QName, ScalarEntry)`.
532    ///
533    /// Collects into a `Vec` so the iteration does not hold a long-lived
534    /// reference to the underlying `DashMap` (avoids subtle aliasing
535    /// hazards when callers register or remove plugins mid-iteration).
536    ///
537    /// # Examples
538    ///
539    /// ```ignore
540    /// for (qname, entry) in registry.iter_scalars() {
541    ///     ctx.register_udf(ScalarUDF::new_from_impl(adapt(qname, entry)));
542    /// }
543    /// ```
544    #[must_use]
545    pub fn iter_scalars(&self) -> Vec<(QName, Arc<ScalarEntry>)> {
546        self.scalars
547            .iter()
548            .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
549            .collect()
550    }
551
552    /// Iterate every registered procedure — `(QName, ProcedureEntry)`.
553    ///
554    /// Arity-overloaded names yield one tuple per registered overload.
555    #[must_use]
556    pub fn iter_procedures(&self) -> Vec<(QName, Arc<ProcedureEntry>)> {
557        self.procedures
558            .iter()
559            .flat_map(|kv| {
560                let q = kv.key().clone();
561                kv.value()
562                    .iter()
563                    .map(move |e| (q.clone(), Arc::clone(e)))
564                    .collect::<Vec<_>>()
565            })
566            .collect()
567    }
568
569    /// Iterate every registered Locy aggregate — `(QName, LocyAggregateEntry)`.
570    #[must_use]
571    pub fn iter_locy_aggregates(&self) -> Vec<(QName, Arc<LocyAggregateEntry>)> {
572        self.locy_aggregates
573            .iter()
574            .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
575            .collect()
576    }
577
578    /// Iterate every registered algorithm — `(QName, AlgorithmProvider)`.
579    #[must_use]
580    pub fn iter_algorithms(&self) -> Vec<(QName, Arc<dyn AlgorithmProvider>)> {
581        self.algorithms
582            .iter()
583            .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
584            .collect()
585    }
586
587    /// Iterate every registered index kind — `(IndexKind, IndexKindProvider)`.
588    #[must_use]
589    pub fn iter_index_kinds(&self) -> Vec<(IndexKind, Arc<dyn IndexKindProvider>)> {
590        self.index_kinds
591            .iter()
592            .map(|kv| (kv.key().clone(), Arc::clone(kv.value())))
593            .collect()
594    }
595
596    /// Snapshot the registered catalog providers.
597    ///
598    /// Returns a `Vec` so the iteration does not hold a long-lived reference
599    /// to the underlying `DashMap`.
600    #[must_use]
601    pub fn catalogs(&self) -> Vec<Arc<dyn CatalogProvider>> {
602        self.catalogs
603            .iter()
604            .map(|kv| Arc::clone(kv.value()))
605            .collect()
606    }
607
608    /// Look up a registered aggregate by qname.
609    #[must_use]
610    pub fn aggregate(&self, q: &QName) -> Option<Arc<AggregateEntry>> {
611        self.aggregates.get(q).map(|e| Arc::clone(e.value()))
612    }
613
614    /// Look up a registered window function by qname.
615    #[must_use]
616    pub fn window(&self, q: &QName) -> Option<Arc<WindowEntry>> {
617        self.windows.get(q).map(|e| Arc::clone(e.value()))
618    }
619
620    /// Look up a registered procedure by qname.
621    ///
622    /// If the qname carries multiple arity overloads (M5c.2), this returns
623    /// the *first* registered entry, which preserves the legacy
624    /// single-arity lookup contract. Arity-aware callers should use
625    /// [`Self::procedure_with_arity`] instead.
626    #[must_use]
627    pub fn procedure(&self, q: &QName) -> Option<Arc<ProcedureEntry>> {
628        self.procedures
629            .get(q)
630            .and_then(|e| e.value().first().map(Arc::clone))
631    }
632
633    /// Look up a registered procedure by qname *and* positional argument
634    /// count. Returns the entry whose signature has exactly `arity`
635    /// arguments, or `None` if no overload matches.
636    ///
637    /// Procedures may be registered with the same qname under multiple
638    /// arities (e.g. an algorithm's legacy 5-arg form alongside the new
639    /// `(graphRef, config)` 2-arg form). Resolution sites that know the
640    /// call's argument count should prefer this method; the bare
641    /// [`Self::procedure`] is preserved for callers that only need the
642    /// first registration.
643    #[must_use]
644    pub fn procedure_with_arity(&self, q: &QName, arity: usize) -> Option<Arc<ProcedureEntry>> {
645        self.procedures.get(q).and_then(|e| {
646            e.value()
647                .iter()
648                .find(|entry| entry.signature.args.len() == arity)
649                .map(Arc::clone)
650        })
651    }
652
653    /// Return all arity overloads registered under `q`.
654    ///
655    /// The returned `Vec` is empty when nothing is registered. Useful for
656    /// diagnostic surfaces (e.g. `EXPLAIN` of an ambiguous call) and for
657    /// listing API.
658    #[must_use]
659    pub fn procedure_overloads(&self, q: &QName) -> Vec<Arc<ProcedureEntry>> {
660        self.procedures
661            .get(q)
662            .map(|e| e.value().iter().map(Arc::clone).collect())
663            .unwrap_or_default()
664    }
665
666    /// Look up a registered Locy aggregate by qname.
667    #[must_use]
668    pub fn locy_aggregate(&self, q: &QName) -> Option<Arc<LocyAggregateEntry>> {
669        self.locy_aggregates.get(q).map(|e| Arc::clone(e.value()))
670    }
671
672    /// Look up a registered Locy predicate by qname.
673    #[must_use]
674    pub fn locy_predicate(&self, q: &QName) -> Option<Arc<LocyPredicateEntry>> {
675        self.locy_predicates.get(q).map(|e| Arc::clone(e.value()))
676    }
677
678    /// Look up a registered storage backend by scheme.
679    #[must_use]
680    pub fn storage_backend(&self, scheme: &str) -> Option<Arc<dyn StorageBackend>> {
681        self.storage_backends
682            .get(&SmolStr::new(scheme))
683            .map(|e| Arc::clone(e.value()))
684    }
685
686    /// Look up the plugin `Storage` (if any) registered to serve the
687    /// given native label name (M5h.2). Consulted by the host's
688    /// `StorageManager::scan_vertex_table` before the native backend
689    /// fallback — when this returns `Some`, the planner's graph-scan
690    /// path is routed through plugin storage instead of Lance.
691    #[must_use]
692    pub fn lookup_label_storage(
693        &self,
694        label: &str,
695    ) -> Option<Arc<dyn crate::traits::storage::Storage>> {
696        self.label_storages
697            .get(&SmolStr::new(label))
698            .map(|e| Arc::clone(e.value()))
699    }
700
701    /// Look up a registered index-kind by kind.
702    #[must_use]
703    pub fn index_kind(&self, k: &IndexKind) -> Option<Arc<dyn IndexKindProvider>> {
704        self.index_kinds.get(k).map(|e| Arc::clone(e.value()))
705    }
706
707    /// Register a live `IndexHandle` under an index name.
708    ///
709    /// The host calls this after building a handle from a custom
710    /// `IndexKindProvider` (or after `open()` from persisted bytes). The
711    /// planner consults this table from `plan_vector_knn` to route probes
712    /// through the plugin handle instead of the native storage path.
713    ///
714    /// If an entry already exists under the same name, it is replaced.
715    pub fn register_index_handle(
716        &self,
717        name: impl Into<SmolStr>,
718        kind: IndexKind,
719        handle: Arc<dyn IndexHandle>,
720    ) {
721        self.index_handles
722            .insert(name.into(), IndexHandleEntry { kind, handle });
723    }
724
725    /// Look up a live `IndexHandle` by index name. Returns a cheap clone
726    /// (the inner handle is `Arc`-wrapped).
727    #[must_use]
728    pub fn index_handle(&self, name: &str) -> Option<IndexHandleEntry> {
729        self.index_handles
730            .get(&SmolStr::new(name))
731            .map(|e| e.value().clone())
732    }
733
734    /// Remove a live `IndexHandle`. Returns the removed entry if one
735    /// existed.
736    pub fn deregister_index_handle(&self, name: &str) -> Option<IndexHandleEntry> {
737        self.index_handles
738            .remove(&SmolStr::new(name))
739            .map(|(_, v)| v)
740    }
741
742    /// Allocate (or look up) a virtual label ID for `name`, owned by
743    /// `table`. The host's `QueryPlanner` calls this when an unknown
744    /// label name is claimed by a `CatalogProvider` or
745    /// `ReplacementScanProvider`; subsequent references to the same name
746    /// return the cached ID without re-running discovery.
747    ///
748    /// Idempotent: a second call with the same name returns the
749    /// previously-allocated ID and *replaces* the stored `CatalogTable`
750    /// (so cached `LogicalPlan`s naturally pick up the latest table on
751    /// next execute). Returns `Err` if the virtual range is exhausted
752    /// (255 slots, see `uni_common::core::schema`).
753    pub fn register_virtual_label(
754        &self,
755        name: impl Into<SmolStr>,
756        table: Arc<dyn crate::traits::catalog::CatalogTable>,
757    ) -> Result<u16, PluginError> {
758        self.virtual_labels.lock().register(name.into(), table)
759    }
760
761    /// Look up a virtual label by name. Returns `None` if no provider
762    /// has claimed it yet (the caller hasn't called
763    /// `register_virtual_label`).
764    #[must_use]
765    pub fn virtual_label_by_name(&self, name: &str) -> Option<u16> {
766        let inner = self.virtual_labels.lock();
767        inner.name_to_id.get(&SmolStr::new(name)).copied()
768    }
769
770    /// Look up the catalog table behind a virtual label ID. Returns the
771    /// entry cheaply cloned (inner `Arc<dyn CatalogTable>`).
772    #[must_use]
773    pub fn virtual_label_by_id(&self, id: u16) -> Option<VirtualEntry> {
774        self.virtual_labels.lock().id_to_entry.get(&id).cloned()
775    }
776
777    /// Allocate (or look up) a virtual edge-type ID for `name`. Same
778    /// semantics as [`Self::register_virtual_label`] but for the
779    /// `u32` edge-type ID space.
780    pub fn register_virtual_edge_type(
781        &self,
782        name: impl Into<SmolStr>,
783        table: Arc<dyn crate::traits::catalog::CatalogTable>,
784    ) -> Result<u32, PluginError> {
785        self.virtual_edge_types.lock().register(name.into(), table)
786    }
787
788    /// Look up a virtual edge type by name.
789    #[must_use]
790    pub fn virtual_edge_type_by_name(&self, name: &str) -> Option<u32> {
791        let inner = self.virtual_edge_types.lock();
792        inner.name_to_id.get(&SmolStr::new(name)).copied()
793    }
794
795    /// Look up the catalog table behind a virtual edge-type ID.
796    #[must_use]
797    pub fn virtual_edge_type_by_id(&self, id: u32) -> Option<VirtualEntry> {
798        self.virtual_edge_types.lock().id_to_entry.get(&id).cloned()
799    }
800
801    /// Look up a registered algorithm by qname.
802    #[must_use]
803    pub fn algorithm(&self, q: &QName) -> Option<Arc<dyn AlgorithmProvider>> {
804        self.algorithms.get(q).map(|e| Arc::clone(e.value()))
805    }
806
807    /// Look up a registered CRDT kind.
808    #[must_use]
809    pub fn crdt_kind(&self, k: &CrdtKind) -> Option<Arc<dyn CrdtKindProvider>> {
810        self.crdt_kinds.get(k).map(|e| Arc::clone(e.value()))
811    }
812
813    /// Look up a registered logical type by its Arrow extension name.
814    #[must_use]
815    pub fn logical_type(&self, name: &SmolStr) -> Option<Arc<dyn LogicalTypeProvider>> {
816        self.logical_types.get(name).map(|e| Arc::clone(e.value()))
817    }
818
819    /// Snapshot the registered hook chain.
820    #[must_use]
821    pub fn hooks(&self) -> Arc<Vec<Arc<dyn SessionHook>>> {
822        Self::project_append(&self.hooks)
823    }
824
825    /// Snapshot the registered optimizer-rule providers (M5h).
826    #[must_use]
827    pub fn optimizer_rules(&self) -> Arc<Vec<Arc<dyn OptimizerRuleProvider>>> {
828        Self::project_append(&self.optimizer_rules)
829    }
830
831    /// Snapshot the registered trigger chain.
832    #[must_use]
833    pub fn triggers(&self) -> Arc<Vec<Arc<dyn TriggerPlugin>>> {
834        Self::project_append(&self.triggers)
835    }
836
837    /// Snapshot every registered [`CdcOutputProvider`] keyed by name (FU-4).
838    ///
839    /// Used by `Uni::build` to start a CDC stream per provider before
840    /// the commit broadcaster begins pushing `CdcBatch`es.
841    #[must_use]
842    pub fn cdc_outputs_snapshot(&self) -> Vec<(SmolStr, Arc<dyn CdcOutputProvider>)> {
843        self.cdc_outputs
844            .iter()
845            .map(|e| (e.key().clone(), Arc::clone(e.value())))
846            .collect()
847    }
848
849    /// `true` when no [`CdcOutputProvider`] is registered.
850    ///
851    /// Used by the commit hot-path to skip mutation-row materialization
852    /// when there are no CDC subscribers — preserves the empty-registry
853    /// fast path.
854    #[must_use]
855    pub fn cdc_outputs_is_empty(&self) -> bool {
856        self.cdc_outputs.is_empty()
857    }
858
859    /// Snapshot the registered authentication providers (M5i).
860    #[must_use]
861    pub fn auth_providers(&self) -> Arc<Vec<Arc<dyn AuthProvider>>> {
862        Self::project_append(&self.auth_providers)
863    }
864
865    /// Snapshot the registered authorization policies (M5i).
866    #[must_use]
867    pub fn authz_policies(&self) -> Arc<Vec<Arc<dyn AuthzPolicy>>> {
868        Self::project_append(&self.authz_policies)
869    }
870
871    /// Snapshot the registered wire-protocol connectors (M5i).
872    #[must_use]
873    pub fn connectors(&self) -> Arc<Vec<Arc<dyn Connector>>> {
874        Self::project_append(&self.connectors)
875    }
876
877    /// Snapshot the registered replacement-scan providers.
878    #[must_use]
879    pub fn replacement_scans(&self) -> Arc<Vec<Arc<dyn ReplacementScanProvider>>> {
880        Self::project_append(&self.replacement_scans)
881    }
882
883    /// Apply a batch of pending registrations atomically.
884    ///
885    /// Preflights every entry against the live registry first, then
886    /// applies them in order. Dispatch is per-family (see
887    /// [`crate::surfaces`]): static-typed `*Ops` impls handle storage and
888    /// per-plugin record-keeping; the `DynPendingRegistration` boxes
889    /// erase the family type so a heterogeneous batch can be queued.
890    ///
891    /// # Errors
892    ///
893    /// Returns the first preflight failure (e.g.
894    /// [`PluginError::DuplicateRegistration`] or
895    /// [`PluginError::StorageSchemeConflict`]); nothing in the batch is
896    /// applied in that case.
897    pub(crate) fn apply_pending(
898        &self,
899        plugin_id: &PluginId,
900        pending: Vec<Box<dyn crate::surfaces::DynPendingRegistration>>,
901    ) -> Result<(), PluginError> {
902        for reg in &pending {
903            reg.preflight(self)?;
904        }
905
906        let mut record = PluginRecord::default();
907        for reg in pending {
908            reg.apply(self, plugin_id.clone(), &mut record);
909        }
910
911        self.per_plugin.read().insert(plugin_id.clone(), record);
912
913        Ok(())
914    }
915
916    /// Snapshot the registered background jobs.
917    #[must_use]
918    pub fn background_jobs(&self) -> Arc<Vec<Arc<dyn BackgroundJobProvider>>> {
919        Self::project_append(&self.background_jobs)
920    }
921
922    /// Materialize an `Arc<Vec<Arc<dyn P>>>` view of an append-family slot,
923    /// stripping the per-entry `AppendEntry` ownership tag.
924    ///
925    /// The legacy public read-accessor signature returns `Arc<Vec<Arc<dyn
926    /// P>>>` for wait-free callers (`hooks()`, `triggers()`, …). The
927    /// owner-tagged storage required for proper `remove_plugin`
928    /// implementation (closes the M5e gap; see [`crate::surfaces`]
929    /// foundation work) carries the plugin id inline, so projecting back to
930    /// the legacy shape costs one allocation + N `Arc` clones per call.
931    /// Phase 4f will retire this helper in favour of returning the typed
932    /// `AppendEntry` slice directly.
933    fn project_append<P: ?Sized>(
934        slot: &ArcSwap<Vec<crate::surfaces::AppendEntry<P>>>,
935    ) -> Arc<Vec<Arc<P>>> {
936        let snap = slot.load();
937        let v: Vec<Arc<P>> = snap.iter().map(|e| Arc::clone(&e.provider)).collect();
938        Arc::new(v)
939    }
940
941    /// Snapshot the surfaces a plugin currently owns.
942    ///
943    /// Returns `None` when the plugin has never registered anything (or
944    /// has already been removed). Used by
945    /// [`crate::reload::ReloadDispatcher`] to determine which per-kind
946    /// reload protocols to invoke for the old plugin.
947    ///
948    /// The snapshot is a deep clone of the registry's internal
949    /// `PluginRecord`; mutating the registry afterward does not affect
950    /// the snapshot.
951    #[must_use]
952    pub fn iter_for_plugin(&self, plugin: &PluginId) -> Option<PluginRecordSnapshot> {
953        let guard = self.per_plugin.read();
954        guard.get(plugin).map(|r| PluginRecordSnapshot::from(&*r))
955    }
956
957    /// Remove all registrations for the given plugin.
958    ///
959    /// Used by `Uni::remove_plugin` and as part of hot reload's drain step.
960    /// Dispatches per family via the `*Ops` traits in [`crate::surfaces`];
961    /// the label-storage / logical-type / collation / cdc / catalog
962    /// surfaces are dropped here too (the per-key tracking lifts the old
963    /// count-only gap where hot reload leaked entries on those slots).
964    pub fn remove_plugin(&self, plugin: &PluginId) {
965        use crate::surfaces::{
966            AggregateSurface, AlgorithmSurface, AppendOps, AuthSurface, AuthzSurface,
967            BackgroundJobSurface, CatalogSurface, CdcSurface, CollationSurface, ConnectorSurface,
968            CrdtSurface, Discriminator, HookSurface, IndexKindSurface, KeyedUniqueOps,
969            LabelStorageSurface, LocyAggregateSurface, LocyPredicateSurface, LogicalTypeSurface,
970            NamedUniqueOps, OperatorSurface, OptimizerRuleSurface, PregelSurface, ProcedureSurface,
971            ReplacementScanSurface, ScalarSurface, StorageBackendSurface, TriggerSurface,
972            VersionedOps, WindowSurface,
973        };
974
975        let record = self.per_plugin.read().remove(plugin).map(|(_, r)| r);
976        let Some(record) = record else { return };
977
978        for q in record.scalars {
979            <ScalarSurface as NamedUniqueOps>::remove(self, &q);
980        }
981        for q in record.aggregates {
982            <AggregateSurface as NamedUniqueOps>::remove(self, &q);
983        }
984        for q in record.windows {
985            <WindowSurface as NamedUniqueOps>::remove(self, &q);
986        }
987        for (q, arity) in record.procedures {
988            <ProcedureSurface as VersionedOps>::remove(self, &q, Discriminator::Arity(arity));
989        }
990        for q in record.locy_aggregates {
991            <LocyAggregateSurface as NamedUniqueOps>::remove(self, &q);
992        }
993        for q in record.locy_predicates {
994            <LocyPredicateSurface as NamedUniqueOps>::remove(self, &q);
995        }
996        for q in record.operators {
997            <OperatorSurface as NamedUniqueOps>::remove(self, &q);
998        }
999        for q in record.algorithms {
1000            <AlgorithmSurface as NamedUniqueOps>::remove(self, &q);
1001        }
1002        for q in record.pregels {
1003            <PregelSurface as NamedUniqueOps>::remove(self, &q);
1004        }
1005        for k in record.index_kinds {
1006            <IndexKindSurface as KeyedUniqueOps>::remove(self, &k);
1007        }
1008        for s in record.storage_schemes {
1009            <StorageBackendSurface as KeyedUniqueOps>::remove(self, &s);
1010        }
1011        for l in record.label_storages {
1012            <LabelStorageSurface as KeyedUniqueOps>::remove(self, &l);
1013        }
1014        for k in record.crdt_kinds {
1015            <CrdtSurface as KeyedUniqueOps>::remove(self, &k);
1016        }
1017        for k in record.logical_types {
1018            <LogicalTypeSurface as KeyedUniqueOps>::remove(self, &k);
1019        }
1020        for k in record.collations {
1021            <CollationSurface as KeyedUniqueOps>::remove(self, &k);
1022        }
1023        for k in record.cdc_outputs {
1024            <CdcSurface as KeyedUniqueOps>::remove(self, &k);
1025        }
1026        for k in record.catalogs {
1027            <CatalogSurface as KeyedUniqueOps>::remove(self, &k);
1028        }
1029
1030        <OptimizerRuleSurface as AppendOps>::remove_plugin(self, plugin);
1031        <HookSurface as AppendOps>::remove_plugin(self, plugin);
1032        <AuthSurface as AppendOps>::remove_plugin(self, plugin);
1033        <AuthzSurface as AppendOps>::remove_plugin(self, plugin);
1034        <ConnectorSurface as AppendOps>::remove_plugin(self, plugin);
1035        <TriggerSurface as AppendOps>::remove_plugin(self, plugin);
1036        <ReplacementScanSurface as AppendOps>::remove_plugin(self, plugin);
1037        <BackgroundJobSurface as AppendOps>::remove_plugin(self, plugin);
1038    }
1039}
1040
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed registry resolves no scalar function or
    /// procedure and has an empty hook chain.
    #[test]
    fn registry_default_is_empty() {
        let registry = PluginRegistry::new();
        let probe = QName::builtin("anything");
        assert!(registry.scalar_fn(&probe).is_none());
        assert!(registry.procedure(&probe).is_none());
        assert!(registry.hooks().is_empty());
    }

    /// The `Debug` output of the registry mentions the type name.
    #[test]
    fn debug_smoke() {
        let registry = PluginRegistry::new();
        let rendered = format!("{:?}", registry);
        assert!(rendered.contains("PluginRegistry"));
    }
}