Skip to main content

nemo_flow/
plugin.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Generic plugin infrastructure for NeMo Flow runtimes.
5//!
6//! This module owns:
7//! - config diagnostics and policy enums used by plugin systems
8//! - a global plugin registry
9//! - plugin registration contexts for middleware/subscriber installation
10//! - rollback bookkeeping for registrations created during plugin setup
11
12use std::collections::{HashMap, HashSet};
13use std::fmt;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::{Arc, LazyLock, Mutex, RwLock};
17
18use serde::{Deserialize, Serialize};
19use serde_json::{Map, Value as Json};
20use thiserror::Error;
21
22use crate::api::registry::{
23    deregister_llm_conditional_execution_guardrail, deregister_llm_execution_intercept,
24    deregister_llm_request_intercept, deregister_llm_sanitize_request_guardrail,
25    deregister_llm_sanitize_response_guardrail, deregister_llm_stream_execution_intercept,
26    deregister_tool_conditional_execution_guardrail, deregister_tool_execution_intercept,
27    deregister_tool_request_intercept, deregister_tool_sanitize_request_guardrail,
28    deregister_tool_sanitize_response_guardrail, register_llm_conditional_execution_guardrail,
29    register_llm_execution_intercept, register_llm_request_intercept,
30    register_llm_sanitize_request_guardrail, register_llm_sanitize_response_guardrail,
31    register_llm_stream_execution_intercept, register_tool_conditional_execution_guardrail,
32    register_tool_execution_intercept, register_tool_request_intercept,
33    register_tool_sanitize_request_guardrail, register_tool_sanitize_response_guardrail,
34};
35use crate::api::runtime::{
36    EventSubscriberFn, LlmConditionalFn, LlmExecutionFn, LlmRequestInterceptFn,
37    LlmSanitizeRequestFn, LlmSanitizeResponseFn, LlmStreamExecutionFn, ToolConditionalFn,
38    ToolExecutionFn, ToolInterceptFn, ToolSanitizeFn,
39};
40use crate::api::subscriber::{deregister_subscriber, register_subscriber};
41
42type PluginMap = HashMap<String, Arc<dyn Plugin>>;
43
44static PLUGIN_HANDLERS: LazyLock<RwLock<PluginMap>> = LazyLock::new(|| RwLock::new(HashMap::new()));
45static ACTIVE_PLUGIN_CONFIGURATION: LazyLock<Mutex<Option<ActivePluginConfiguration>>> =
46    LazyLock::new(|| Mutex::new(None));
47
48/// Error type for generic plugin operations.
49#[derive(Debug, Error)]
50pub enum PluginError {
51    /// Configuration validation failed.
52    #[error("invalid config: {0}")]
53    InvalidConfig(String),
54
55    /// The requested plugin resource was not found.
56    #[error("not found: {0}")]
57    NotFound(String),
58
59    /// A serialization or deserialization operation failed.
60    #[error("serialization error: {0}")]
61    Serialization(#[from] serde_json::Error),
62
63    /// An internal plugin-system error occurred.
64    #[error("internal error: {0}")]
65    Internal(String),
66
67    /// A runtime middleware/subscriber registration failed.
68    #[error("registration failed: {0}")]
69    RegistrationFailed(String),
70}
71
72/// Specialized [`Result`](std::result::Result) type for plugin operations.
73pub type Result<T> = std::result::Result<T, PluginError>;
74
75/// Canonical plugin configuration document.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct PluginConfig {
78    /// Plugin config schema version.
79    #[serde(default = "default_plugin_config_version")]
80    pub version: u32,
81    /// Ordered list of top-level plugin components to validate and activate.
82    #[serde(default)]
83    pub components: Vec<PluginComponentSpec>,
84    /// Plugin-level policy for unsupported plugin kinds, fields, and values.
85    #[serde(default)]
86    pub policy: ConfigPolicy,
87}
88
89impl Default for PluginConfig {
90    fn default() -> Self {
91        Self {
92            version: default_plugin_config_version(),
93            components: vec![],
94            policy: ConfigPolicy::default(),
95        }
96    }
97}
98
99/// One configured plugin component.
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct PluginComponentSpec {
102    /// Registered plugin kind string.
103    pub kind: String,
104    /// Whether the component should be activated.
105    ///
106    /// Disabled components are still validated but skipped during runtime
107    /// registration.
108    #[serde(default = "default_enabled")]
109    pub enabled: bool,
110    /// Component-local JSON config object passed to the plugin.
111    #[serde(default)]
112    pub config: Map<String, Json>,
113}
114
115impl PluginComponentSpec {
116    /// Creates a new enabled component spec with empty config.
117    pub fn new(kind: impl Into<String>) -> Self {
118        Self {
119            kind: kind.into(),
120            enabled: true,
121            config: Map::new(),
122        }
123    }
124}
125
126/// Structured validation report.
127#[derive(Debug, Clone, Default, Serialize, Deserialize)]
128pub struct ConfigReport {
129    /// Validation and compatibility diagnostics in evaluation order.
130    #[serde(default)]
131    pub diagnostics: Vec<ConfigDiagnostic>,
132}
133
134impl ConfigReport {
135    /// Returns `true` when the report contains at least one error diagnostic.
136    pub fn has_errors(&self) -> bool {
137        self.diagnostics
138            .iter()
139            .any(|diag| diag.level == DiagnosticLevel::Error)
140    }
141}
142
143/// One validation or compatibility diagnostic.
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct ConfigDiagnostic {
146    /// Severity level for the diagnostic.
147    pub level: DiagnosticLevel,
148    /// Stable diagnostic code suitable for machine checks.
149    pub code: String,
150    /// Optional component identifier associated with the diagnostic.
151    #[serde(default, skip_serializing_if = "Option::is_none")]
152    pub component: Option<String>,
153    /// Optional field path associated with the diagnostic.
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    pub field: Option<String>,
156    /// Human-readable diagnostic message.
157    pub message: String,
158}
159
160/// Diagnostic severity.
161#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
162#[serde(rename_all = "lowercase")]
163pub enum DiagnosticLevel {
164    /// Non-fatal compatibility or validation issue.
165    Warning,
166    /// Fatal validation issue that blocks initialization.
167    Error,
168}
169
170/// Policy for how unsupported plugin/runtime config is handled.
171#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
172pub struct ConfigPolicy {
173    /// Policy applied when a component kind is unknown to the plugin registry.
174    #[serde(default = "default_warn")]
175    pub unknown_component: UnsupportedBehavior,
176    /// Policy applied when a known component contains an unknown field.
177    #[serde(default = "default_warn")]
178    pub unknown_field: UnsupportedBehavior,
179    /// Policy applied when a known field contains an unsupported value.
180    #[serde(default = "default_error")]
181    pub unsupported_value: UnsupportedBehavior,
182}
183
184impl Default for ConfigPolicy {
185    fn default() -> Self {
186        Self {
187            unknown_component: default_warn(),
188            unknown_field: default_warn(),
189            unsupported_value: default_error(),
190        }
191    }
192}
193
194/// Per-policy behavior for unsupported configuration.
195#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
196#[serde(rename_all = "lowercase")]
197pub enum UnsupportedBehavior {
198    /// Suppress the diagnostic entirely.
199    Ignore,
200    /// Emit a warning diagnostic.
201    #[default]
202    Warn,
203    /// Emit an error diagnostic.
204    Error,
205}
206
207fn default_warn() -> UnsupportedBehavior {
208    UnsupportedBehavior::Warn
209}
210
211fn default_error() -> UnsupportedBehavior {
212    UnsupportedBehavior::Error
213}
214
/// Serde default provider for the plugin config schema version (`1`).
fn default_plugin_config_version() -> u32 {
    const DEFAULT_SCHEMA_VERSION: u32 = 1;
    DEFAULT_SCHEMA_VERSION
}
218
/// Serde default provider for a component's `enabled` flag: on unless stated otherwise.
fn default_enabled() -> bool {
    true
}
222
223/// Bookkeeping for one middleware/subscriber registration.
224pub struct PluginRegistration {
225    /// Registration kind used for bookkeeping.
226    pub kind: String,
227    /// Runtime-qualified registration name.
228    pub name: String,
229    deregister: Box<dyn FnMut() -> Result<()> + Send>,
230}
231
232impl fmt::Debug for PluginRegistration {
233    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
234        f.debug_struct("PluginRegistration")
235            .field("kind", &self.kind)
236            .field("name", &self.name)
237            .finish_non_exhaustive()
238    }
239}
240
241impl PluginRegistration {
242    /// Creates a new registration bookkeeping entry.
243    pub fn new(
244        kind: impl Into<String>,
245        name: impl Into<String>,
246        deregister: Box<dyn FnMut() -> Result<()> + Send>,
247    ) -> Self {
248        Self {
249            kind: kind.into(),
250            name: name.into(),
251            deregister,
252        }
253    }
254}
255
256/// Context provided to plugin handlers during runtime registration.
257///
258/// Each `register_*` call both installs the middleware/subscriber into the
259/// NeMo Flow runtime and records the inverse deregistration closure so the host
260/// can roll back partial setup on failure.
261#[derive(Default)]
262pub struct PluginRegistrationContext {
263    registrations: Vec<PluginRegistration>,
264    namespace: Option<String>,
265}
266
267impl PluginRegistrationContext {
268    /// Creates an empty plugin registration context.
269    pub fn new() -> Self {
270        Self::default()
271    }
272
273    /// Creates a plugin registration context that namespaces all registration names.
274    pub fn with_namespace(namespace: impl Into<String>) -> Self {
275        Self {
276            registrations: vec![],
277            namespace: Some(namespace.into()),
278        }
279    }
280
281    /// Returns the runtime-qualified name for a plugin-local registration.
282    ///
283    /// Plugin handlers should pass stable component-local names such as
284    /// `"tool"` or `"subscriber"`. The host applies the namespace so users do
285    /// not have to provide component instance ids.
286    pub fn qualify_name(&self, name: &str) -> String {
287        match &self.namespace {
288            Some(namespace) => format!("{namespace}{name}"),
289            None => name.to_string(),
290        }
291    }
292
293    /// Registers an event subscriber and records its rollback closure.
294    pub fn register_subscriber(&mut self, name: &str, callback: EventSubscriberFn) -> Result<()> {
295        let qualified_name = self.qualify_name(name);
296        register_subscriber(&qualified_name, callback)
297            .map_err(|err| PluginError::RegistrationFailed(format!("subscriber: {err}")))?;
298
299        let name_owned = qualified_name;
300        self.registrations.push(PluginRegistration::new(
301            "plugin",
302            name_owned.clone(),
303            Box::new(move || {
304                deregister_subscriber(&name_owned)
305                    .map(|_| ())
306                    .map_err(|err| {
307                        PluginError::RegistrationFailed(format!(
308                            "subscriber deregistration failed: {err}"
309                        ))
310                    })
311            }),
312        ));
313        Ok(())
314    }
315
316    /// Registers an LLM request intercept and records its rollback closure.
317    pub fn register_llm_request_intercept(
318        &mut self,
319        name: &str,
320        priority: i32,
321        break_chain: bool,
322        callback: LlmRequestInterceptFn,
323    ) -> Result<()> {
324        let qualified_name = self.qualify_name(name);
325        register_llm_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
326            |err| PluginError::RegistrationFailed(format!("llm request intercept: {err}")),
327        )?;
328
329        let name_owned = qualified_name;
330        self.registrations.push(PluginRegistration::new(
331            "plugin",
332            name_owned.clone(),
333            Box::new(move || {
334                deregister_llm_request_intercept(&name_owned)
335                    .map(|_| ())
336                    .map_err(|err| {
337                        PluginError::RegistrationFailed(format!(
338                            "llm request intercept deregistration failed: {err}"
339                        ))
340                    })
341            }),
342        ));
343        Ok(())
344    }
345
346    /// Registers a tool sanitize-request guardrail and records its rollback closure.
347    pub fn register_tool_sanitize_request_guardrail(
348        &mut self,
349        name: &str,
350        priority: i32,
351        callback: ToolSanitizeFn,
352    ) -> Result<()> {
353        let qualified_name = self.qualify_name(name);
354        register_tool_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
355            |err| {
356                PluginError::RegistrationFailed(format!("tool sanitize request guardrail: {err}"))
357            },
358        )?;
359
360        let name_owned = qualified_name;
361        self.registrations.push(PluginRegistration::new(
362            "plugin",
363            name_owned.clone(),
364            Box::new(move || {
365                deregister_tool_sanitize_request_guardrail(&name_owned)
366                    .map(|_| ())
367                    .map_err(|err| {
368                        PluginError::RegistrationFailed(format!(
369                            "tool sanitize request guardrail deregistration failed: {err}"
370                        ))
371                    })
372            }),
373        ));
374        Ok(())
375    }
376
377    /// Registers a tool sanitize-response guardrail and records its rollback closure.
378    pub fn register_tool_sanitize_response_guardrail(
379        &mut self,
380        name: &str,
381        priority: i32,
382        callback: ToolSanitizeFn,
383    ) -> Result<()> {
384        let qualified_name = self.qualify_name(name);
385        register_tool_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
386            |err| {
387                PluginError::RegistrationFailed(format!("tool sanitize response guardrail: {err}"))
388            },
389        )?;
390
391        let name_owned = qualified_name;
392        self.registrations.push(PluginRegistration::new(
393            "plugin",
394            name_owned.clone(),
395            Box::new(move || {
396                deregister_tool_sanitize_response_guardrail(&name_owned)
397                    .map(|_| ())
398                    .map_err(|err| {
399                        PluginError::RegistrationFailed(format!(
400                            "tool sanitize response guardrail deregistration failed: {err}"
401                        ))
402                    })
403            }),
404        ));
405        Ok(())
406    }
407
408    /// Registers a tool conditional-execution guardrail and records its rollback closure.
409    pub fn register_tool_conditional_execution_guardrail(
410        &mut self,
411        name: &str,
412        priority: i32,
413        callback: ToolConditionalFn,
414    ) -> Result<()> {
415        let qualified_name = self.qualify_name(name);
416        register_tool_conditional_execution_guardrail(&qualified_name, priority, callback)
417            .map_err(|err| {
418                PluginError::RegistrationFailed(format!(
419                    "tool conditional execution guardrail: {err}"
420                ))
421            })?;
422
423        let name_owned = qualified_name;
424        self.registrations.push(PluginRegistration::new(
425            "plugin",
426            name_owned.clone(),
427            Box::new(move || {
428                deregister_tool_conditional_execution_guardrail(&name_owned)
429                    .map(|_| ())
430                    .map_err(|err| {
431                        PluginError::RegistrationFailed(format!(
432                            "tool conditional execution guardrail deregistration failed: {err}"
433                        ))
434                    })
435            }),
436        ));
437        Ok(())
438    }
439
440    /// Registers an LLM sanitize-request guardrail and records its rollback closure.
441    pub fn register_llm_sanitize_request_guardrail(
442        &mut self,
443        name: &str,
444        priority: i32,
445        callback: LlmSanitizeRequestFn,
446    ) -> Result<()> {
447        let qualified_name = self.qualify_name(name);
448        register_llm_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
449            |err| PluginError::RegistrationFailed(format!("llm sanitize request guardrail: {err}")),
450        )?;
451
452        let name_owned = qualified_name;
453        self.registrations.push(PluginRegistration::new(
454            "plugin",
455            name_owned.clone(),
456            Box::new(move || {
457                deregister_llm_sanitize_request_guardrail(&name_owned)
458                    .map(|_| ())
459                    .map_err(|err| {
460                        PluginError::RegistrationFailed(format!(
461                            "llm sanitize request guardrail deregistration failed: {err}"
462                        ))
463                    })
464            }),
465        ));
466        Ok(())
467    }
468
469    /// Registers an LLM sanitize-response guardrail and records its rollback closure.
470    pub fn register_llm_sanitize_response_guardrail(
471        &mut self,
472        name: &str,
473        priority: i32,
474        callback: LlmSanitizeResponseFn,
475    ) -> Result<()> {
476        let qualified_name = self.qualify_name(name);
477        register_llm_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
478            |err| {
479                PluginError::RegistrationFailed(format!("llm sanitize response guardrail: {err}"))
480            },
481        )?;
482
483        let name_owned = qualified_name;
484        self.registrations.push(PluginRegistration::new(
485            "plugin",
486            name_owned.clone(),
487            Box::new(move || {
488                deregister_llm_sanitize_response_guardrail(&name_owned)
489                    .map(|_| ())
490                    .map_err(|err| {
491                        PluginError::RegistrationFailed(format!(
492                            "llm sanitize response guardrail deregistration failed: {err}"
493                        ))
494                    })
495            }),
496        ));
497        Ok(())
498    }
499
500    /// Registers an LLM conditional-execution guardrail and records its rollback closure.
501    pub fn register_llm_conditional_execution_guardrail(
502        &mut self,
503        name: &str,
504        priority: i32,
505        callback: LlmConditionalFn,
506    ) -> Result<()> {
507        let qualified_name = self.qualify_name(name);
508        register_llm_conditional_execution_guardrail(&qualified_name, priority, callback).map_err(
509            |err| {
510                PluginError::RegistrationFailed(format!(
511                    "llm conditional execution guardrail: {err}"
512                ))
513            },
514        )?;
515
516        let name_owned = qualified_name;
517        self.registrations.push(PluginRegistration::new(
518            "plugin",
519            name_owned.clone(),
520            Box::new(move || {
521                deregister_llm_conditional_execution_guardrail(&name_owned)
522                    .map(|_| ())
523                    .map_err(|err| {
524                        PluginError::RegistrationFailed(format!(
525                            "llm conditional execution guardrail deregistration failed: {err}"
526                        ))
527                    })
528            }),
529        ));
530        Ok(())
531    }
532
533    /// Registers an LLM execution intercept and records its rollback closure.
534    pub fn register_llm_execution_intercept(
535        &mut self,
536        name: &str,
537        priority: i32,
538        callback: LlmExecutionFn,
539    ) -> Result<()> {
540        let qualified_name = self.qualify_name(name);
541        register_llm_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
542            PluginError::RegistrationFailed(format!("llm execution intercept: {err}"))
543        })?;
544
545        let name_owned = qualified_name;
546        self.registrations.push(PluginRegistration::new(
547            "plugin",
548            name_owned.clone(),
549            Box::new(move || {
550                deregister_llm_execution_intercept(&name_owned)
551                    .map(|_| ())
552                    .map_err(|err| {
553                        PluginError::RegistrationFailed(format!(
554                            "llm execution intercept deregistration failed: {err}"
555                        ))
556                    })
557            }),
558        ));
559        Ok(())
560    }
561
562    /// Registers an LLM stream execution intercept and records its rollback closure.
563    pub fn register_llm_stream_execution_intercept(
564        &mut self,
565        name: &str,
566        priority: i32,
567        callback: LlmStreamExecutionFn,
568    ) -> Result<()> {
569        let qualified_name = self.qualify_name(name);
570        register_llm_stream_execution_intercept(&qualified_name, priority, callback).map_err(
571            |err| PluginError::RegistrationFailed(format!("llm stream execution intercept: {err}")),
572        )?;
573
574        let name_owned = qualified_name;
575        self.registrations.push(PluginRegistration::new(
576            "plugin",
577            name_owned.clone(),
578            Box::new(move || {
579                deregister_llm_stream_execution_intercept(&name_owned)
580                    .map(|_| ())
581                    .map_err(|err| {
582                        PluginError::RegistrationFailed(format!(
583                            "llm stream execution intercept deregistration failed: {err}"
584                        ))
585                    })
586            }),
587        ));
588        Ok(())
589    }
590
591    /// Registers a tool request intercept and records its rollback closure.
592    pub fn register_tool_request_intercept(
593        &mut self,
594        name: &str,
595        priority: i32,
596        break_chain: bool,
597        callback: ToolInterceptFn,
598    ) -> Result<()> {
599        let qualified_name = self.qualify_name(name);
600        register_tool_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
601            |err| PluginError::RegistrationFailed(format!("tool request intercept: {err}")),
602        )?;
603
604        let name_owned = qualified_name;
605        self.registrations.push(PluginRegistration::new(
606            "plugin",
607            name_owned.clone(),
608            Box::new(move || {
609                deregister_tool_request_intercept(&name_owned)
610                    .map(|_| ())
611                    .map_err(|err| {
612                        PluginError::RegistrationFailed(format!(
613                            "tool request intercept deregistration failed: {err}"
614                        ))
615                    })
616            }),
617        ));
618        Ok(())
619    }
620
621    /// Registers a tool execution intercept and records its rollback closure.
622    pub fn register_tool_execution_intercept(
623        &mut self,
624        name: &str,
625        priority: i32,
626        callback: ToolExecutionFn,
627    ) -> Result<()> {
628        let qualified_name = self.qualify_name(name);
629        register_tool_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
630            PluginError::RegistrationFailed(format!("tool execution intercept: {err}"))
631        })?;
632
633        let name_owned = qualified_name;
634        self.registrations.push(PluginRegistration::new(
635            "plugin",
636            name_owned.clone(),
637            Box::new(move || {
638                deregister_tool_execution_intercept(&name_owned)
639                    .map(|_| ())
640                    .map_err(|err| {
641                        PluginError::RegistrationFailed(format!(
642                            "tool execution intercept deregistration failed: {err}"
643                        ))
644                    })
645            }),
646        ));
647        Ok(())
648    }
649
650    /// Adds a prebuilt registration to the context.
651    pub fn add_registration(&mut self, registration: PluginRegistration) {
652        self.registrations.push(registration);
653    }
654
655    /// Extends the context with prebuilt registrations.
656    pub fn extend_registrations(&mut self, registrations: Vec<PluginRegistration>) {
657        self.registrations.extend(registrations);
658    }
659
660    /// Consumes the context and returns the recorded registrations.
661    pub fn into_registrations(self) -> Vec<PluginRegistration> {
662        self.registrations
663    }
664}
665
666/// Implemented by custom plugins that register runtime middleware.
667pub trait Plugin: Send + Sync + 'static {
668    /// Returns the unique plugin kind string.
669    fn plugin_kind(&self) -> &str;
670
671    /// Returns whether the plugin kind can appear multiple times in the config.
672    ///
673    /// Return `false` for singleton components such as the built-in adaptive
674    /// component.
675    fn allows_multiple_components(&self) -> bool {
676        true
677    }
678
679    /// Validates one plugin component config.
680    ///
681    /// Returning error-level diagnostics prevents `initialize_plugins(...)`
682    /// from activating the configuration.
683    fn validate(&self, plugin_config: &Map<String, Json>) -> Vec<ConfigDiagnostic>;
684
685    /// Registers runtime middleware/subscribers for one plugin component.
686    ///
687    /// The provided [`PluginRegistrationContext`] is component-scoped. Any
688    /// error aborts the current initialization and triggers rollback of
689    /// registrations created during the failed activation attempt.
690    fn register<'a>(
691        &'a self,
692        plugin_config: &Map<String, Json>,
693        ctx: &'a mut PluginRegistrationContext,
694    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
695}
696
697/// Registers a plugin by kind.
698///
699/// Registering the same kind twice returns [`PluginError::RegistrationFailed`].
700/// Register a plugin kind with the global plugin registry.
701///
702/// Registered plugins can then participate in validation and initialization of
703/// [`PluginConfig`] documents.
704///
705/// # Parameters
706/// - `plugin`: Plugin implementation to register.
707///
708/// # Returns
709/// A plugin [`Result`] that is `Ok(())` when the plugin kind was added
710/// to the registry.
711///
712/// # Errors
713/// Returns an error when a plugin with the same kind is already registered or
714/// when the registry lock is poisoned.
715///
716/// # Notes
717/// Registration affects future validation and initialization only.
718pub fn register_plugin(plugin: Arc<dyn Plugin>) -> Result<()> {
719    let mut guard = PLUGIN_HANDLERS
720        .write()
721        .map_err(|err| PluginError::Internal(format!("plugin registry lock poisoned: {err}")))?;
722    let plugin_kind = plugin.plugin_kind().to_string();
723    if guard.contains_key(&plugin_kind) {
724        return Err(PluginError::RegistrationFailed(format!(
725            "plugin '{plugin_kind}' is already registered"
726        )));
727    }
728    guard.insert(plugin_kind, plugin);
729    Ok(())
730}
731
732/// Removes a previously registered plugin.
733///
734/// This affects future validation and initialization only. Active runtime
735/// registrations remain until cleared or replaced.
736///
737/// # Parameters
738/// - `plugin_kind`: Plugin kind to remove from the registry.
739///
740/// # Returns
741/// `true` when a plugin was removed from the registry and `false` when the
742/// kind was not registered.
743///
744/// # Notes
745/// Active component registrations created by previous initialization calls are
746/// not removed by this function.
747pub fn deregister_plugin(plugin_kind: &str) -> bool {
748    PLUGIN_HANDLERS
749        .write()
750        .ok()
751        .and_then(|mut guard| guard.remove(plugin_kind))
752        .is_some()
753}
754
755/// Lists registered plugin kinds in sorted order.
756///
757/// This returns the currently registered plugin kinds without inspecting the
758/// active runtime configuration.
759///
760/// # Returns
761/// A sorted [`Vec<String>`] of registered plugin kinds.
762///
763/// # Notes
764/// Disabled or inactive components still appear here when their plugin kind is
765/// registered.
766pub fn list_plugin_kinds() -> Vec<String> {
767    let mut kinds = PLUGIN_HANDLERS
768        .read()
769        .map(|guard| guard.keys().cloned().collect::<Vec<_>>())
770        .unwrap_or_default();
771    kinds.sort();
772    kinds
773}
774
775/// Looks up a registered plugin by kind.
776///
777/// # Parameters
778/// - `plugin_kind`: Plugin kind to resolve.
779///
780/// # Returns
781/// The registered plugin implementation for `plugin_kind`, or `None` when the
782/// kind is unknown.
783///
784/// # Notes
785/// The returned plugin is shared by [`Arc`], so callers receive a cheap clone.
786pub fn lookup_plugin(plugin_kind: &str) -> Option<Arc<dyn Plugin>> {
787    PLUGIN_HANDLERS
788        .read()
789        .ok()
790        .and_then(|guard| guard.get(plugin_kind).cloned())
791}
792
793/// Validates a plugin configuration document.
794///
795/// This is a pure validation pass. It does not mutate the active runtime
796/// configuration.
797///
798/// # Parameters
799/// - `config`: Plugin configuration to validate.
800///
801/// # Returns
802/// A [`ConfigReport`] describing warnings and errors discovered during
803/// validation.
804///
805/// # Notes
806/// Validation checks host policy, plugin multiplicity rules, unknown component
807/// kinds, and plugin-provided validation hooks.
808pub fn validate_plugin_config(config: &PluginConfig) -> ConfigReport {
809    let mut report = ConfigReport::default();
810
811    if config.version != 1 {
812        push_policy_diag(
813            &mut report.diagnostics,
814            config.policy.unsupported_value,
815            "plugin.unsupported_config_version",
816            None,
817            Some("version".to_string()),
818            format!("plugin config version {} is unsupported", config.version),
819        );
820    }
821
822    validate_plugin_multiplicity(&mut report, config);
823
824    for component in &config.components {
825        let Some(plugin) = lookup_plugin(&component.kind) else {
826            push_policy_diag(
827                &mut report.diagnostics,
828                config.policy.unknown_component,
829                "plugin.unknown_component",
830                Some(component.kind.clone()),
831                None,
832                format!("plugin component kind '{}' is unsupported", component.kind),
833            );
834            continue;
835        };
836        report
837            .diagnostics
838            .extend(plugin.validate(&component.config));
839    }
840
841    report
842}
843
844/// Configures the active global plugin components.
845///
846/// Initialization validates the supplied config, replaces the active
847/// configuration, and rolls back partial registration on failure. If a
848/// previous configuration was active, the host attempts to restore it when the
849/// new activation fails.
850///
851/// # Parameters
852/// - `config`: Plugin configuration to validate and activate.
853///
854/// # Returns
855/// A plugin [`Result`] containing the successful [`ConfigReport`].
856///
857/// # Errors
858/// Returns an error when validation fails, when plugin registration fails, or
859/// when the previous configuration cannot be restored after a failed replace.
860///
861/// # Notes
862/// Initialization is replace-with-rollback: the previous active configuration
863/// is removed before the new configuration is activated.
864pub async fn initialize_plugins(config: PluginConfig) -> Result<ConfigReport> {
865    let report = validate_plugin_config(&config);
866    if report.has_errors() {
867        return Err(PluginError::InvalidConfig(join_error_messages(&report)));
868    }
869
870    let previous = {
871        let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
872            PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
873        })?;
874        guard.take()
875    };
876
877    if let Some(mut previous_state) = previous {
878        rollback_registrations(&mut previous_state.registrations);
879        match initialize_plugin_components(&config).await {
880            Ok(registrations) => {
881                store_active_plugin_configuration(config, report.clone(), registrations)?;
882                Ok(report)
883            }
884            Err(err) => match initialize_plugin_components(&previous_state.config).await {
885                Ok(registrations) => {
886                    let previous_report = validate_plugin_config(&previous_state.config);
887                    store_active_plugin_configuration(
888                        previous_state.config,
889                        previous_report,
890                        registrations,
891                    )?;
892                    Err(err)
893                }
894                Err(restore_err) => Err(PluginError::RegistrationFailed(format!(
895                    "{err}; previous plugin configuration could not be restored: {restore_err}"
896                ))),
897            },
898        }
899    } else {
900        let registrations = initialize_plugin_components(&config).await?;
901        store_active_plugin_configuration(config, report.clone(), registrations)?;
902        Ok(report)
903    }
904}
905
906/// Deregisters and clears all configured plugin components.
907///
908/// Registered plugin kinds remain available for future validation and
909/// initialization.
910///
911/// # Returns
912/// A plugin [`Result`] that is `Ok(())` when the active configuration
913/// has been cleared.
914///
915/// # Errors
916/// Returns an error when the active configuration lock is poisoned.
917///
918/// # Notes
919/// Clearing active configuration does not remove plugin kinds from the global
920/// registry.
921pub fn clear_plugin_configuration() -> Result<()> {
922    let previous = {
923        let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
924            PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
925        })?;
926        guard.take()
927    };
928    if let Some(mut previous_state) = previous {
929        rollback_registrations(&mut previous_state.registrations);
930    }
931    Ok(())
932}
933
934/// Returns the last successfully configured plugin report.
935///
936/// `None` indicates that no plugin configuration is currently active.
937///
938/// # Returns
939/// The last successful [`ConfigReport`], or `None` when no configuration is
940/// active.
941///
942/// # Notes
943/// This is a snapshot of the last successful activation and does not re-run
944/// validation.
945pub fn active_plugin_report() -> Option<ConfigReport> {
946    ACTIVE_PLUGIN_CONFIGURATION
947        .lock()
948        .ok()
949        .and_then(|guard| guard.as_ref().map(|state| state.report.clone()))
950}
951
952/// Rolls back registrations in reverse order, ignoring rollback failures.
953///
954/// This is used internally during failed initialization and by
955/// [`clear_plugin_configuration`].
956pub fn rollback_registrations(registrations: &mut Vec<PluginRegistration>) {
957    for registration in registrations.iter_mut().rev() {
958        let _ = (registration.deregister)();
959    }
960    registrations.clear();
961}
962
/// State stored in the global active-configuration slot after a successful
/// activation.
struct ActivePluginConfiguration {
    /// The configuration that was activated; re-used to re-register the
    /// components when a later replacement fails.
    config: PluginConfig,
    /// Validation report stored alongside the configuration; returned by
    /// [`active_plugin_report`].
    report: ConfigReport,
    /// Registrations created while activating `config`; rolled back when the
    /// configuration is cleared or replaced.
    registrations: Vec<PluginRegistration>,
}
968
969async fn initialize_plugin_components(config: &PluginConfig) -> Result<Vec<PluginRegistration>> {
970    let totals = plugin_component_totals(config);
971    let mut ordinals: HashMap<&str, usize> = HashMap::new();
972    let mut registrations = vec![];
973
974    for component in config
975        .components
976        .iter()
977        .filter(|component| component.enabled)
978    {
979        let Some(plugin) = lookup_plugin(&component.kind) else {
980            rollback_registrations(&mut registrations);
981            return Err(PluginError::NotFound(format!(
982                "plugin component '{}' is not registered",
983                component.kind
984            )));
985        };
986
987        let ordinal = ordinals
988            .entry(component.kind.as_str())
989            .and_modify(|value| *value += 1)
990            .or_insert(1);
991        let namespace = component_namespace(
992            &component.kind,
993            *ordinal,
994            totals.get(component.kind.as_str()).copied().unwrap_or(1),
995        );
996
997        let mut ctx = PluginRegistrationContext::with_namespace(namespace);
998        if let Err(err) = plugin.register(&component.config, &mut ctx).await {
999            let mut just_registered = ctx.into_registrations();
1000            rollback_registrations(&mut just_registered);
1001            rollback_registrations(&mut registrations);
1002            return Err(err);
1003        }
1004        registrations.extend(ctx.into_registrations());
1005    }
1006
1007    Ok(registrations)
1008}
1009
1010fn store_active_plugin_configuration(
1011    config: PluginConfig,
1012    report: ConfigReport,
1013    registrations: Vec<PluginRegistration>,
1014) -> Result<()> {
1015    let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
1016        PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
1017    })?;
1018    *guard = Some(ActivePluginConfiguration {
1019        config,
1020        report,
1021        registrations,
1022    });
1023    Ok(())
1024}
1025
1026fn plugin_component_totals(config: &PluginConfig) -> HashMap<&str, usize> {
1027    let mut totals = HashMap::new();
1028    for component in &config.components {
1029        *totals.entry(component.kind.as_str()).or_insert(0) += 1;
1030    }
1031    totals
1032}
1033
/// Builds the registration namespace for one plugin component.
///
/// When a kind appears more than once (`total > 1`) the `ordinal` is appended
/// so each instance gets its own namespace; otherwise the namespace contains
/// only the kind.
fn component_namespace(kind: &str, ordinal: usize, total: usize) -> String {
    match total {
        // Zero or one component of this kind: no ordinal suffix.
        0 | 1 => format!("__nemo_flow_plugin__{kind}__"),
        _ => format!("__nemo_flow_plugin__{kind}__{ordinal}__"),
    }
}
1041
1042fn validate_plugin_multiplicity(report: &mut ConfigReport, config: &PluginConfig) {
1043    let totals = plugin_component_totals(config);
1044    let mut emitted = HashSet::new();
1045
1046    for component in &config.components {
1047        let count = totals
1048            .get(component.kind.as_str())
1049            .copied()
1050            .unwrap_or_default();
1051        if count <= 1 || !emitted.insert(component.kind.clone()) {
1052            continue;
1053        }
1054
1055        let allows_multiple = lookup_plugin(&component.kind)
1056            .map(|plugin| plugin.allows_multiple_components())
1057            .unwrap_or(true);
1058        if !allows_multiple {
1059            report.diagnostics.push(ConfigDiagnostic {
1060                level: DiagnosticLevel::Error,
1061                code: "plugin.duplicate_component".to_string(),
1062                component: Some(component.kind.clone()),
1063                field: None,
1064                message: format!(
1065                    "plugin component kind '{}' may only appear once",
1066                    component.kind
1067                ),
1068            });
1069        }
1070    }
1071}
1072
1073fn push_policy_diag(
1074    diagnostics: &mut Vec<ConfigDiagnostic>,
1075    behavior: UnsupportedBehavior,
1076    code: &str,
1077    component: Option<String>,
1078    field: Option<String>,
1079    message: String,
1080) {
1081    let level = match behavior {
1082        UnsupportedBehavior::Ignore => return,
1083        UnsupportedBehavior::Warn => DiagnosticLevel::Warning,
1084        UnsupportedBehavior::Error => DiagnosticLevel::Error,
1085    };
1086
1087    diagnostics.push(ConfigDiagnostic {
1088        level,
1089        code: code.to_string(),
1090        component,
1091        field,
1092        message,
1093    });
1094}
1095
1096fn join_error_messages(report: &ConfigReport) -> String {
1097    report
1098        .diagnostics
1099        .iter()
1100        .filter(|diag| diag.level == DiagnosticLevel::Error)
1101        .map(|diag| diag.message.as_str())
1102        .collect::<Vec<_>>()
1103        .join("; ")
1104}
1105
1106#[cfg(test)]
1107#[path = "../tests/unit/plugin_tests.rs"]
1108mod tests;