Skip to main content

nemo_flow/
plugin.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Generic plugin infrastructure for NeMo Flow runtimes.
5//!
6//! This module owns:
7//! - config diagnostics and policy enums used by plugin systems
8//! - a global plugin registry
9//! - plugin registration contexts for middleware/subscriber installation
10//! - rollback bookkeeping for registrations created during plugin setup
11
12use std::collections::{HashMap, HashSet};
13use std::fmt;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock};
17
18use serde::{Deserialize, Serialize};
19use serde_json::{Map, Value as Json};
20use thiserror::Error;
21
22use crate::api::registry::{
23    deregister_llm_conditional_execution_guardrail, deregister_llm_execution_intercept,
24    deregister_llm_request_intercept, deregister_llm_sanitize_request_guardrail,
25    deregister_llm_sanitize_response_guardrail, deregister_llm_stream_execution_intercept,
26    deregister_tool_conditional_execution_guardrail, deregister_tool_execution_intercept,
27    deregister_tool_request_intercept, deregister_tool_sanitize_request_guardrail,
28    deregister_tool_sanitize_response_guardrail, register_llm_conditional_execution_guardrail,
29    register_llm_execution_intercept, register_llm_request_intercept,
30    register_llm_sanitize_request_guardrail, register_llm_sanitize_response_guardrail,
31    register_llm_stream_execution_intercept, register_tool_conditional_execution_guardrail,
32    register_tool_execution_intercept, register_tool_request_intercept,
33    register_tool_sanitize_request_guardrail, register_tool_sanitize_response_guardrail,
34};
35use crate::api::runtime::{
36    EventSubscriberFn, LlmConditionalFn, LlmExecutionFn, LlmRequestInterceptFn,
37    LlmSanitizeRequestFn, LlmSanitizeResponseFn, LlmStreamExecutionFn, ToolConditionalFn,
38    ToolExecutionFn, ToolInterceptFn, ToolSanitizeFn,
39};
40use crate::api::subscriber::{deregister_subscriber, register_subscriber};
41
42type PluginMap = HashMap<String, Arc<dyn Plugin>>;
43
44static PLUGIN_HANDLERS: LazyLock<RwLock<PluginMap>> = LazyLock::new(|| RwLock::new(HashMap::new()));
45static ACTIVE_PLUGIN_CONFIGURATION: LazyLock<Mutex<Option<ActivePluginConfiguration>>> =
46    LazyLock::new(|| Mutex::new(None));
47static BUILTIN_PLUGIN_REGISTRATION: OnceLock<Result<()>> = OnceLock::new();
48
49/// Error type for generic plugin operations.
50#[derive(Debug, Error)]
51pub enum PluginError {
52    /// Configuration validation failed.
53    #[error("invalid config: {0}")]
54    InvalidConfig(String),
55
56    /// The requested plugin resource was not found.
57    #[error("not found: {0}")]
58    NotFound(String),
59
60    /// A serialization or deserialization operation failed.
61    #[error("serialization error: {0}")]
62    Serialization(#[from] serde_json::Error),
63
64    /// An internal plugin-system error occurred.
65    #[error("internal error: {0}")]
66    Internal(String),
67
68    /// A runtime middleware/subscriber registration failed.
69    #[error("registration failed: {0}")]
70    RegistrationFailed(String),
71}
72
73/// Specialized [`Result`](std::result::Result) type for plugin operations.
74pub type Result<T> = std::result::Result<T, PluginError>;
75
// Deserialization note: every field carries a serde default, so an empty JSON
// object deserializes to version 1 (`default_plugin_config_version`), no
// components, and `ConfigPolicy::default()`.
// NOTE(review): with the `schema` feature on, schemars presumably uses the
// `///` comments below as schema descriptions — treat them as user-facing text.
76/// Canonical plugin configuration document.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
79pub struct PluginConfig {
80    /// Plugin config schema version.
81    #[serde(default = "default_plugin_config_version")]
82    pub version: u32,
83    /// Ordered list of top-level plugin components to validate and activate.
84    #[serde(default)]
85    pub components: Vec<PluginComponentSpec>,
86    /// Plugin-level policy for unsupported plugin kinds, fields, and values.
87    #[serde(default)]
88    pub policy: ConfigPolicy,
89}
90
91impl Default for PluginConfig {
92    fn default() -> Self {
93        Self {
94            version: default_plugin_config_version(),
95            components: vec![],
96            policy: ConfigPolicy::default(),
97        }
98    }
99}
100
// Only `kind` is required when deserializing: `enabled` falls back to
// `default_enabled()` (true) and `config` to an empty map when omitted.
// NOTE(review): with the `schema` feature on, schemars presumably uses the
// `///` comments below as schema descriptions — treat them as user-facing text.
101/// One configured plugin component.
102#[derive(Debug, Clone, Serialize, Deserialize)]
103#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
104pub struct PluginComponentSpec {
105    /// Registered plugin kind string.
106    pub kind: String,
107    /// Whether the component should be activated.
108    ///
109    /// Disabled components are still validated but skipped during runtime
110    /// registration.
111    #[serde(default = "default_enabled")]
112    pub enabled: bool,
113    /// Component-local JSON config object passed to the plugin.
114    #[serde(default)]
115    pub config: Map<String, Json>,
116}
117
118impl PluginComponentSpec {
119    /// Creates a new enabled component spec with empty config.
120    pub fn new(kind: impl Into<String>) -> Self {
121        Self {
122            kind: kind.into(),
123            enabled: true,
124            config: Map::new(),
125        }
126    }
127}
128
// `diagnostics` defaults to an empty list when absent from the serialized
// form, and the derived `Default` likewise yields an empty report.
// NOTE(review): with the `schema` feature on, schemars presumably uses the
// `///` comments below as schema descriptions — treat them as user-facing text.
129/// Structured validation report.
130#[derive(Debug, Clone, Default, Serialize, Deserialize)]
131#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
132pub struct ConfigReport {
133    /// Validation and compatibility diagnostics in evaluation order.
134    #[serde(default)]
135    pub diagnostics: Vec<ConfigDiagnostic>,
136}
137
138impl ConfigReport {
139    /// Returns `true` when the report contains at least one error diagnostic.
140    pub fn has_errors(&self) -> bool {
141        self.diagnostics
142            .iter()
143            .any(|diag| diag.level == DiagnosticLevel::Error)
144    }
145}
146
// `component` and `field` are left out of serialized output when `None` and
// default to `None` when absent on input; `level`, `code`, and `message` carry
// no serde default.
// NOTE(review): with the `schema` feature on, schemars presumably uses the
// `///` comments below as schema descriptions — treat them as user-facing text.
147/// One validation or compatibility diagnostic.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
150pub struct ConfigDiagnostic {
151    /// Severity level for the diagnostic.
152    pub level: DiagnosticLevel,
153    /// Stable diagnostic code suitable for machine checks.
154    pub code: String,
155    /// Optional component identifier associated with the diagnostic.
156    #[serde(default, skip_serializing_if = "Option::is_none")]
157    pub component: Option<String>,
158    /// Optional field path associated with the diagnostic.
159    #[serde(default, skip_serializing_if = "Option::is_none")]
160    pub field: Option<String>,
161    /// Human-readable diagnostic message.
162    pub message: String,
163}
164
// Serialized as lowercase strings ("warning" / "error") because of
// `rename_all = "lowercase"`.
// `PartialEq` is what allows `ConfigReport::has_errors` to compare levels
// with `==`.
165/// Diagnostic severity.
166#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
167#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
168#[serde(rename_all = "lowercase")]
169pub enum DiagnosticLevel {
170    /// Non-fatal compatibility or validation issue.
171    Warning,
172    /// Fatal validation issue that blocks initialization.
173    Error,
174}
175
// Each field names its own serde default function instead of relying on a
// struct-level `#[serde(default)]`. The hand-written `Default` impl calls the
// same functions, so the two must be kept in sync: unknown components and
// fields warn, unsupported values are errors.
176/// Policy for how unsupported plugin/runtime config is handled.
177#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
178#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
179pub struct ConfigPolicy {
180    /// Policy applied when a component kind is unknown to the plugin registry.
181    #[serde(default = "default_warn")]
182    pub unknown_component: UnsupportedBehavior,
183    /// Policy applied when a known component contains an unknown field.
184    #[serde(default = "default_warn")]
185    pub unknown_field: UnsupportedBehavior,
186    /// Policy applied when a known field contains an unsupported value.
187    #[serde(default = "default_error")]
188    pub unsupported_value: UnsupportedBehavior,
189}
190
191impl Default for ConfigPolicy {
192    fn default() -> Self {
193        Self {
194            unknown_component: default_warn(),
195            unknown_field: default_warn(),
196            unsupported_value: default_error(),
197        }
198    }
199}
200
// Editor metadata for `ConfigPolicy`: each of the three policy fields is
// declared as an enum-kind entry whose label is the field name and whose
// allowed values are "warn", "ignore", and "error" — the lowercase serialized
// names of `UnsupportedBehavior`.
// NOTE(review): `editor_config!` is defined elsewhere in the crate; what it
// expands to is not visible from this file.
201crate::editor_config! {
202    impl ConfigPolicy {
203        unknown_component => {
204            label: "unknown_component",
205            kind: Enum,
206            values: ["warn", "ignore", "error"],
207        },
208        unknown_field => {
209            label: "unknown_field",
210            kind: Enum,
211            values: ["warn", "ignore", "error"],
212        },
213        unsupported_value => {
214            label: "unsupported_value",
215            kind: Enum,
216            values: ["warn", "ignore", "error"],
217        },
218    }
219}
220
// Serialized as lowercase strings ("ignore" / "warn" / "error") because of
// `rename_all = "lowercase"`; these are the same value strings listed for
// `ConfigPolicy` in the `editor_config!` block.
// The derived `Default` yields `Warn` (marked with `#[default]`).
221/// Per-policy behavior for unsupported configuration.
222#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
223#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
224#[serde(rename_all = "lowercase")]
225pub enum UnsupportedBehavior {
226    /// Suppress the diagnostic entirely.
227    Ignore,
228    /// Emit a warning diagnostic.
229    #[default]
230    Warn,
231    /// Emit an error diagnostic.
232    Error,
233}
234
235fn default_warn() -> UnsupportedBehavior {
236    UnsupportedBehavior::Warn
237}
238
239fn default_error() -> UnsupportedBehavior {
240    UnsupportedBehavior::Error
241}
242
/// Schema version assumed when a plugin config document omits `version`.
///
/// Always returns `1`.
fn default_plugin_config_version() -> u32 {
    1
}
246
/// Value used for `enabled` when a component spec omits it.
///
/// Always returns `true`, so components are enabled unless stated otherwise.
fn default_enabled() -> bool {
    true
}
250
251/// Bookkeeping for one middleware/subscriber registration.
252pub struct PluginRegistration {
253    /// Registration kind used for bookkeeping.
254    pub kind: String,
255    /// Runtime-qualified registration name.
256    pub name: String,
257    deregister: Box<dyn FnMut() -> Result<()> + Send>,
258}
259
260impl fmt::Debug for PluginRegistration {
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        f.debug_struct("PluginRegistration")
263            .field("kind", &self.kind)
264            .field("name", &self.name)
265            .finish_non_exhaustive()
266    }
267}
268
269impl PluginRegistration {
270    /// Creates a new registration bookkeeping entry.
271    pub fn new(
272        kind: impl Into<String>,
273        name: impl Into<String>,
274        deregister: Box<dyn FnMut() -> Result<()> + Send>,
275    ) -> Self {
276        Self {
277            kind: kind.into(),
278            name: name.into(),
279            deregister,
280        }
281    }
282}
283
284/// Context provided to plugin handlers during runtime registration.
285///
286/// Each `register_*` call both installs the middleware/subscriber into the
287/// NeMo Flow runtime and records the inverse deregistration closure so the host
288/// can roll back partial setup on failure.
289#[derive(Default)]
290pub struct PluginRegistrationContext {
291    registrations: Vec<PluginRegistration>,
292    namespace: Option<String>,
293}
294
295impl PluginRegistrationContext {
296    /// Creates an empty plugin registration context.
297    pub fn new() -> Self {
298        Self::default()
299    }
300
301    /// Creates a plugin registration context that namespaces all registration names.
302    pub fn with_namespace(namespace: impl Into<String>) -> Self {
303        Self {
304            registrations: vec![],
305            namespace: Some(namespace.into()),
306        }
307    }
308
309    /// Returns the runtime-qualified name for a plugin-local registration.
310    ///
311    /// Plugin handlers should pass stable component-local names such as
312    /// `"tool"` or `"subscriber"`. The host applies the namespace so users do
313    /// not have to provide component instance ids.
314    pub fn qualify_name(&self, name: &str) -> String {
315        match &self.namespace {
316            Some(namespace) => format!("{namespace}{name}"),
317            None => name.to_string(),
318        }
319    }
320
321    /// Registers an event subscriber and records its rollback closure.
322    pub fn register_subscriber(&mut self, name: &str, callback: EventSubscriberFn) -> Result<()> {
323        let qualified_name = self.qualify_name(name);
324        register_subscriber(&qualified_name, callback)
325            .map_err(|err| PluginError::RegistrationFailed(format!("subscriber: {err}")))?;
326
327        let name_owned = qualified_name;
328        self.registrations.push(PluginRegistration::new(
329            "plugin",
330            name_owned.clone(),
331            Box::new(move || {
332                deregister_subscriber(&name_owned)
333                    .map(|_| ())
334                    .map_err(|err| {
335                        PluginError::RegistrationFailed(format!(
336                            "subscriber deregistration failed: {err}"
337                        ))
338                    })
339            }),
340        ));
341        Ok(())
342    }
343
344    /// Registers an LLM request intercept and records its rollback closure.
345    pub fn register_llm_request_intercept(
346        &mut self,
347        name: &str,
348        priority: i32,
349        break_chain: bool,
350        callback: LlmRequestInterceptFn,
351    ) -> Result<()> {
352        let qualified_name = self.qualify_name(name);
353        register_llm_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
354            |err| PluginError::RegistrationFailed(format!("llm request intercept: {err}")),
355        )?;
356
357        let name_owned = qualified_name;
358        self.registrations.push(PluginRegistration::new(
359            "plugin",
360            name_owned.clone(),
361            Box::new(move || {
362                deregister_llm_request_intercept(&name_owned)
363                    .map(|_| ())
364                    .map_err(|err| {
365                        PluginError::RegistrationFailed(format!(
366                            "llm request intercept deregistration failed: {err}"
367                        ))
368                    })
369            }),
370        ));
371        Ok(())
372    }
373
374    /// Registers a tool sanitize-request guardrail and records its rollback closure.
375    pub fn register_tool_sanitize_request_guardrail(
376        &mut self,
377        name: &str,
378        priority: i32,
379        callback: ToolSanitizeFn,
380    ) -> Result<()> {
381        let qualified_name = self.qualify_name(name);
382        register_tool_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
383            |err| {
384                PluginError::RegistrationFailed(format!("tool sanitize request guardrail: {err}"))
385            },
386        )?;
387
388        let name_owned = qualified_name;
389        self.registrations.push(PluginRegistration::new(
390            "plugin",
391            name_owned.clone(),
392            Box::new(move || {
393                deregister_tool_sanitize_request_guardrail(&name_owned)
394                    .map(|_| ())
395                    .map_err(|err| {
396                        PluginError::RegistrationFailed(format!(
397                            "tool sanitize request guardrail deregistration failed: {err}"
398                        ))
399                    })
400            }),
401        ));
402        Ok(())
403    }
404
405    /// Registers a tool sanitize-response guardrail and records its rollback closure.
406    pub fn register_tool_sanitize_response_guardrail(
407        &mut self,
408        name: &str,
409        priority: i32,
410        callback: ToolSanitizeFn,
411    ) -> Result<()> {
412        let qualified_name = self.qualify_name(name);
413        register_tool_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
414            |err| {
415                PluginError::RegistrationFailed(format!("tool sanitize response guardrail: {err}"))
416            },
417        )?;
418
419        let name_owned = qualified_name;
420        self.registrations.push(PluginRegistration::new(
421            "plugin",
422            name_owned.clone(),
423            Box::new(move || {
424                deregister_tool_sanitize_response_guardrail(&name_owned)
425                    .map(|_| ())
426                    .map_err(|err| {
427                        PluginError::RegistrationFailed(format!(
428                            "tool sanitize response guardrail deregistration failed: {err}"
429                        ))
430                    })
431            }),
432        ));
433        Ok(())
434    }
435
436    /// Registers a tool conditional-execution guardrail and records its rollback closure.
437    pub fn register_tool_conditional_execution_guardrail(
438        &mut self,
439        name: &str,
440        priority: i32,
441        callback: ToolConditionalFn,
442    ) -> Result<()> {
443        let qualified_name = self.qualify_name(name);
444        register_tool_conditional_execution_guardrail(&qualified_name, priority, callback)
445            .map_err(|err| {
446                PluginError::RegistrationFailed(format!(
447                    "tool conditional execution guardrail: {err}"
448                ))
449            })?;
450
451        let name_owned = qualified_name;
452        self.registrations.push(PluginRegistration::new(
453            "plugin",
454            name_owned.clone(),
455            Box::new(move || {
456                deregister_tool_conditional_execution_guardrail(&name_owned)
457                    .map(|_| ())
458                    .map_err(|err| {
459                        PluginError::RegistrationFailed(format!(
460                            "tool conditional execution guardrail deregistration failed: {err}"
461                        ))
462                    })
463            }),
464        ));
465        Ok(())
466    }
467
468    /// Registers an LLM sanitize-request guardrail and records its rollback closure.
469    pub fn register_llm_sanitize_request_guardrail(
470        &mut self,
471        name: &str,
472        priority: i32,
473        callback: LlmSanitizeRequestFn,
474    ) -> Result<()> {
475        let qualified_name = self.qualify_name(name);
476        register_llm_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
477            |err| PluginError::RegistrationFailed(format!("llm sanitize request guardrail: {err}")),
478        )?;
479
480        let name_owned = qualified_name;
481        self.registrations.push(PluginRegistration::new(
482            "plugin",
483            name_owned.clone(),
484            Box::new(move || {
485                deregister_llm_sanitize_request_guardrail(&name_owned)
486                    .map(|_| ())
487                    .map_err(|err| {
488                        PluginError::RegistrationFailed(format!(
489                            "llm sanitize request guardrail deregistration failed: {err}"
490                        ))
491                    })
492            }),
493        ));
494        Ok(())
495    }
496
497    /// Registers an LLM sanitize-response guardrail and records its rollback closure.
498    pub fn register_llm_sanitize_response_guardrail(
499        &mut self,
500        name: &str,
501        priority: i32,
502        callback: LlmSanitizeResponseFn,
503    ) -> Result<()> {
504        let qualified_name = self.qualify_name(name);
505        register_llm_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
506            |err| {
507                PluginError::RegistrationFailed(format!("llm sanitize response guardrail: {err}"))
508            },
509        )?;
510
511        let name_owned = qualified_name;
512        self.registrations.push(PluginRegistration::new(
513            "plugin",
514            name_owned.clone(),
515            Box::new(move || {
516                deregister_llm_sanitize_response_guardrail(&name_owned)
517                    .map(|_| ())
518                    .map_err(|err| {
519                        PluginError::RegistrationFailed(format!(
520                            "llm sanitize response guardrail deregistration failed: {err}"
521                        ))
522                    })
523            }),
524        ));
525        Ok(())
526    }
527
528    /// Registers an LLM conditional-execution guardrail and records its rollback closure.
529    pub fn register_llm_conditional_execution_guardrail(
530        &mut self,
531        name: &str,
532        priority: i32,
533        callback: LlmConditionalFn,
534    ) -> Result<()> {
535        let qualified_name = self.qualify_name(name);
536        register_llm_conditional_execution_guardrail(&qualified_name, priority, callback).map_err(
537            |err| {
538                PluginError::RegistrationFailed(format!(
539                    "llm conditional execution guardrail: {err}"
540                ))
541            },
542        )?;
543
544        let name_owned = qualified_name;
545        self.registrations.push(PluginRegistration::new(
546            "plugin",
547            name_owned.clone(),
548            Box::new(move || {
549                deregister_llm_conditional_execution_guardrail(&name_owned)
550                    .map(|_| ())
551                    .map_err(|err| {
552                        PluginError::RegistrationFailed(format!(
553                            "llm conditional execution guardrail deregistration failed: {err}"
554                        ))
555                    })
556            }),
557        ));
558        Ok(())
559    }
560
561    /// Registers an LLM execution intercept and records its rollback closure.
562    pub fn register_llm_execution_intercept(
563        &mut self,
564        name: &str,
565        priority: i32,
566        callback: LlmExecutionFn,
567    ) -> Result<()> {
568        let qualified_name = self.qualify_name(name);
569        register_llm_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
570            PluginError::RegistrationFailed(format!("llm execution intercept: {err}"))
571        })?;
572
573        let name_owned = qualified_name;
574        self.registrations.push(PluginRegistration::new(
575            "plugin",
576            name_owned.clone(),
577            Box::new(move || {
578                deregister_llm_execution_intercept(&name_owned)
579                    .map(|_| ())
580                    .map_err(|err| {
581                        PluginError::RegistrationFailed(format!(
582                            "llm execution intercept deregistration failed: {err}"
583                        ))
584                    })
585            }),
586        ));
587        Ok(())
588    }
589
590    /// Registers an LLM stream execution intercept and records its rollback closure.
591    pub fn register_llm_stream_execution_intercept(
592        &mut self,
593        name: &str,
594        priority: i32,
595        callback: LlmStreamExecutionFn,
596    ) -> Result<()> {
597        let qualified_name = self.qualify_name(name);
598        register_llm_stream_execution_intercept(&qualified_name, priority, callback).map_err(
599            |err| PluginError::RegistrationFailed(format!("llm stream execution intercept: {err}")),
600        )?;
601
602        let name_owned = qualified_name;
603        self.registrations.push(PluginRegistration::new(
604            "plugin",
605            name_owned.clone(),
606            Box::new(move || {
607                deregister_llm_stream_execution_intercept(&name_owned)
608                    .map(|_| ())
609                    .map_err(|err| {
610                        PluginError::RegistrationFailed(format!(
611                            "llm stream execution intercept deregistration failed: {err}"
612                        ))
613                    })
614            }),
615        ));
616        Ok(())
617    }
618
619    /// Registers a tool request intercept and records its rollback closure.
620    pub fn register_tool_request_intercept(
621        &mut self,
622        name: &str,
623        priority: i32,
624        break_chain: bool,
625        callback: ToolInterceptFn,
626    ) -> Result<()> {
627        let qualified_name = self.qualify_name(name);
628        register_tool_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
629            |err| PluginError::RegistrationFailed(format!("tool request intercept: {err}")),
630        )?;
631
632        let name_owned = qualified_name;
633        self.registrations.push(PluginRegistration::new(
634            "plugin",
635            name_owned.clone(),
636            Box::new(move || {
637                deregister_tool_request_intercept(&name_owned)
638                    .map(|_| ())
639                    .map_err(|err| {
640                        PluginError::RegistrationFailed(format!(
641                            "tool request intercept deregistration failed: {err}"
642                        ))
643                    })
644            }),
645        ));
646        Ok(())
647    }
648
649    /// Registers a tool execution intercept and records its rollback closure.
650    pub fn register_tool_execution_intercept(
651        &mut self,
652        name: &str,
653        priority: i32,
654        callback: ToolExecutionFn,
655    ) -> Result<()> {
656        let qualified_name = self.qualify_name(name);
657        register_tool_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
658            PluginError::RegistrationFailed(format!("tool execution intercept: {err}"))
659        })?;
660
661        let name_owned = qualified_name;
662        self.registrations.push(PluginRegistration::new(
663            "plugin",
664            name_owned.clone(),
665            Box::new(move || {
666                deregister_tool_execution_intercept(&name_owned)
667                    .map(|_| ())
668                    .map_err(|err| {
669                        PluginError::RegistrationFailed(format!(
670                            "tool execution intercept deregistration failed: {err}"
671                        ))
672                    })
673            }),
674        ));
675        Ok(())
676    }
677
678    /// Adds a prebuilt registration to the context.
679    pub fn add_registration(&mut self, registration: PluginRegistration) {
680        self.registrations.push(registration);
681    }
682
683    /// Extends the context with prebuilt registrations.
684    pub fn extend_registrations(&mut self, registrations: Vec<PluginRegistration>) {
685        self.registrations.extend(registrations);
686    }
687
688    /// Consumes the context and returns the recorded registrations.
689    pub fn into_registrations(self) -> Vec<PluginRegistration> {
690        self.registrations
691    }
692}
693
694/// Implemented by custom plugins that register runtime middleware.
695pub trait Plugin: Send + Sync + 'static {
696    /// Returns the unique plugin kind string.
697    fn plugin_kind(&self) -> &str;
698
699    /// Returns whether the plugin kind can appear multiple times in the config.
700    ///
701    /// Return `false` for singleton components such as the built-in adaptive
702    /// component.
703    fn allows_multiple_components(&self) -> bool {
704        true
705    }
706
707    /// Validates one plugin component config.
708    ///
709    /// Returning error-level diagnostics prevents `initialize_plugins(...)`
710    /// from activating the configuration.
711    fn validate(&self, plugin_config: &Map<String, Json>) -> Vec<ConfigDiagnostic>;
712
713    /// Registers runtime middleware/subscribers for one plugin component.
714    ///
715    /// The provided [`PluginRegistrationContext`] is component-scoped. Any
716    /// error aborts the current initialization and triggers rollback of
717    /// registrations created during the failed activation attempt.
718    fn register<'a>(
719        &'a self,
720        plugin_config: &Map<String, Json>,
721        ctx: &'a mut PluginRegistrationContext,
722    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
723}
724
725/// Registers a plugin by kind.
726///
727/// Registering the same kind twice returns [`PluginError::RegistrationFailed`].
728/// Register a plugin kind with the global plugin registry.
729///
730/// Registered plugins can then participate in validation and initialization of
731/// [`PluginConfig`] documents.
732///
733/// # Parameters
734/// - `plugin`: Plugin implementation to register.
735///
736/// # Returns
737/// A plugin [`Result`] that is `Ok(())` when the plugin kind was added
738/// to the registry.
739///
740/// # Errors
741/// Returns an error when a plugin with the same kind is already registered or
742/// when the registry lock is poisoned.
743///
744/// # Notes
745/// Registration affects future validation and initialization only.
746pub fn register_plugin(plugin: Arc<dyn Plugin>) -> Result<()> {
747    let mut guard = PLUGIN_HANDLERS
748        .write()
749        .map_err(|err| PluginError::Internal(format!("plugin registry lock poisoned: {err}")))?;
750    let plugin_kind = plugin.plugin_kind().to_string();
751    if guard.contains_key(&plugin_kind) {
752        return Err(PluginError::RegistrationFailed(format!(
753            "plugin '{plugin_kind}' is already registered"
754        )));
755    }
756    guard.insert(plugin_kind, plugin);
757    Ok(())
758}
759
760/// Registers core-provided plugin kinds.
761///
762/// Built-in plugins are available to validation and initialization without a
763/// binding or application-specific registration call.
764pub fn ensure_builtin_plugins_registered() -> Result<()> {
765    match BUILTIN_PLUGIN_REGISTRATION
766        .get_or_init(crate::observability::plugin_component::register_observability_component)
767    {
768        Ok(()) => Ok(()),
769        Err(err) => Err(clone_cached_plugin_error(err)),
770    }
771}
772
773fn clone_cached_plugin_error(err: &PluginError) -> PluginError {
774    match err {
775        PluginError::InvalidConfig(message) => PluginError::InvalidConfig(message.clone()),
776        PluginError::NotFound(message) => PluginError::NotFound(message.clone()),
777        PluginError::Serialization(err) => PluginError::Internal(err.to_string()),
778        PluginError::Internal(message) => PluginError::Internal(message.clone()),
779        PluginError::RegistrationFailed(message) => {
780            PluginError::RegistrationFailed(message.clone())
781        }
782    }
783}
784
785/// Removes a previously registered plugin.
786///
787/// This affects future validation and initialization only. Active runtime
788/// registrations remain until cleared or replaced.
789///
790/// # Parameters
791/// - `plugin_kind`: Plugin kind to remove from the registry.
792///
793/// # Returns
794/// `true` when a plugin was removed from the registry and `false` when the
795/// kind was not registered.
796///
797/// # Notes
798/// Active component registrations created by previous initialization calls are
799/// not removed by this function.
800pub fn deregister_plugin(plugin_kind: &str) -> bool {
801    PLUGIN_HANDLERS
802        .write()
803        .ok()
804        .and_then(|mut guard| guard.remove(plugin_kind))
805        .is_some()
806}
807
808/// Lists registered plugin kinds in sorted order.
809///
810/// This returns the currently registered plugin kinds without inspecting the
811/// active runtime configuration.
812///
813/// # Returns
814/// A sorted [`Vec<String>`] of registered plugin kinds.
815///
816/// # Notes
817/// Disabled or inactive components still appear here when their plugin kind is
818/// registered.
819pub fn list_plugin_kinds() -> Vec<String> {
820    let _ = ensure_builtin_plugins_registered();
821    let mut kinds = PLUGIN_HANDLERS
822        .read()
823        .map(|guard| guard.keys().cloned().collect::<Vec<_>>())
824        .unwrap_or_default();
825    kinds.sort();
826    kinds
827}
828
829/// Looks up a registered plugin by kind.
830///
831/// # Parameters
832/// - `plugin_kind`: Plugin kind to resolve.
833///
834/// # Returns
835/// The registered plugin implementation for `plugin_kind`, or `None` when the
836/// kind is unknown.
837///
838/// # Notes
839/// The returned plugin is shared by [`Arc`], so callers receive a cheap clone.
840pub fn lookup_plugin(plugin_kind: &str) -> Option<Arc<dyn Plugin>> {
841    let _ = ensure_builtin_plugins_registered();
842    PLUGIN_HANDLERS
843        .read()
844        .ok()
845        .and_then(|guard| guard.get(plugin_kind).cloned())
846}
847
848/// Validates a plugin configuration document.
849///
850/// This is a pure validation pass. It does not mutate the active runtime
851/// configuration.
852///
853/// # Parameters
854/// - `config`: Plugin configuration to validate.
855///
856/// # Returns
857/// A [`ConfigReport`] describing warnings and errors discovered during
858/// validation.
859///
860/// # Notes
861/// Validation checks host policy, plugin multiplicity rules, unknown component
862/// kinds, and plugin-provided validation hooks.
863pub fn validate_plugin_config(config: &PluginConfig) -> ConfigReport {
864    let _ = ensure_builtin_plugins_registered();
865    let mut report = ConfigReport::default();
866
867    if config.version != 1 {
868        push_policy_diag(
869            &mut report.diagnostics,
870            config.policy.unsupported_value,
871            "plugin.unsupported_config_version",
872            None,
873            Some("version".to_string()),
874            format!("plugin config version {} is unsupported", config.version),
875        );
876    }
877
878    validate_plugin_multiplicity(&mut report, config);
879
880    for component in &config.components {
881        let Some(plugin) = lookup_plugin(&component.kind) else {
882            push_policy_diag(
883                &mut report.diagnostics,
884                config.policy.unknown_component,
885                "plugin.unknown_component",
886                Some(component.kind.clone()),
887                None,
888                format!("plugin component kind '{}' is unsupported", component.kind),
889            );
890            continue;
891        };
892        report
893            .diagnostics
894            .extend(plugin.validate(&component.config));
895    }
896
897    report
898}
899
/// Builds the JSON Schema describing the canonical plugin configuration
/// document.
///
/// Only compiled when the `schema` feature is enabled.
///
/// # Panics
/// Panics if the schema generated for [`PluginConfig`] cannot be converted
/// into a JSON value.
#[cfg(feature = "schema")]
pub fn plugin_config_schema() -> Json {
    let schema = schemars::schema_for!(PluginConfig);
    serde_json::to_value(schema).expect("plugin config schema should serialize")
}
906
907/// Configures the active global plugin components.
908///
909/// Initialization validates the supplied config, replaces the active
910/// configuration, and rolls back partial registration on failure. If a
911/// previous configuration was active, the host attempts to restore it when the
912/// new activation fails.
913///
914/// # Parameters
915/// - `config`: Plugin configuration to validate and activate.
916///
917/// # Returns
918/// A plugin [`Result`] containing the successful [`ConfigReport`].
919///
920/// # Errors
921/// Returns an error when validation fails, when plugin registration fails, or
922/// when the previous configuration cannot be restored after a failed replace.
923///
924/// # Notes
925/// Initialization is replace-with-rollback: the previous active configuration
926/// is removed before the new configuration is activated.
927pub async fn initialize_plugins(config: PluginConfig) -> Result<ConfigReport> {
928    let report = validate_plugin_config(&config);
929    if report.has_errors() {
930        return Err(PluginError::InvalidConfig(join_error_messages(&report)));
931    }
932
933    let previous = {
934        let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
935            PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
936        })?;
937        guard.take()
938    };
939
940    if let Some(mut previous_state) = previous {
941        rollback_registrations(&mut previous_state.registrations);
942        match initialize_plugin_components(&config).await {
943            Ok(registrations) => {
944                store_active_plugin_configuration(config, report.clone(), registrations)?;
945                Ok(report)
946            }
947            Err(err) => match initialize_plugin_components(&previous_state.config).await {
948                Ok(registrations) => {
949                    let previous_report = validate_plugin_config(&previous_state.config);
950                    store_active_plugin_configuration(
951                        previous_state.config,
952                        previous_report,
953                        registrations,
954                    )?;
955                    Err(err)
956                }
957                Err(restore_err) => Err(PluginError::RegistrationFailed(format!(
958                    "{err}; previous plugin configuration could not be restored: {restore_err}"
959                ))),
960            },
961        }
962    } else {
963        let registrations = initialize_plugin_components(&config).await?;
964        store_active_plugin_configuration(config, report.clone(), registrations)?;
965        Ok(report)
966    }
967}
968
969/// Deregisters and clears all configured plugin components.
970///
971/// Registered plugin kinds remain available for future validation and
972/// initialization.
973///
974/// # Returns
975/// A plugin [`Result`] that is `Ok(())` when the active configuration
976/// has been cleared.
977///
978/// # Errors
979/// Returns an error when the active configuration lock is poisoned.
980///
981/// # Notes
982/// Clearing active configuration does not remove plugin kinds from the global
983/// registry.
984pub fn clear_plugin_configuration() -> Result<()> {
985    let previous = {
986        let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
987            PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
988        })?;
989        guard.take()
990    };
991    if let Some(mut previous_state) = previous {
992        rollback_registrations(&mut previous_state.registrations);
993    }
994    Ok(())
995}
996
997/// Returns the last successfully configured plugin report.
998///
999/// `None` indicates that no plugin configuration is currently active.
1000///
1001/// # Returns
1002/// The last successful [`ConfigReport`], or `None` when no configuration is
1003/// active.
1004///
1005/// # Notes
1006/// This is a snapshot of the last successful activation and does not re-run
1007/// validation.
1008pub fn active_plugin_report() -> Option<ConfigReport> {
1009    ACTIVE_PLUGIN_CONFIGURATION
1010        .lock()
1011        .ok()
1012        .and_then(|guard| guard.as_ref().map(|state| state.report.clone()))
1013}
1014
1015/// Rolls back registrations in reverse order, ignoring rollback failures.
1016///
1017/// This is used internally during failed initialization and by
1018/// [`clear_plugin_configuration`].
1019pub fn rollback_registrations(registrations: &mut Vec<PluginRegistration>) {
1020    for registration in registrations.iter_mut().rev() {
1021        let _ = (registration.deregister)();
1022    }
1023    registrations.clear();
1024}
1025
/// State kept for the currently active plugin configuration.
///
/// Written by `store_active_plugin_configuration` and detached again by
/// `initialize_plugins` / `clear_plugin_configuration`.
1026struct ActivePluginConfiguration {
    /// Configuration that was activated; re-used to restore it when a later
    /// replacement fails.
1027    config: PluginConfig,
    /// Report stored alongside the configuration; returned (cloned) by
    /// `active_plugin_report`.
1028    report: ConfigReport,
    /// Registrations created while activating `config`; rolled back when the
    /// configuration is cleared or replaced.
1029    registrations: Vec<PluginRegistration>,
1030}
1031
1032async fn initialize_plugin_components(config: &PluginConfig) -> Result<Vec<PluginRegistration>> {
1033    ensure_builtin_plugins_registered()?;
1034    let totals = plugin_component_totals(config);
1035    let mut ordinals: HashMap<&str, usize> = HashMap::new();
1036    let mut registrations = vec![];
1037
1038    for component in config
1039        .components
1040        .iter()
1041        .filter(|component| component.enabled)
1042    {
1043        let Some(plugin) = lookup_plugin(&component.kind) else {
1044            rollback_registrations(&mut registrations);
1045            return Err(PluginError::NotFound(format!(
1046                "plugin component '{}' is not registered",
1047                component.kind
1048            )));
1049        };
1050
1051        let ordinal = ordinals
1052            .entry(component.kind.as_str())
1053            .and_modify(|value| *value += 1)
1054            .or_insert(1);
1055        let namespace = component_namespace(
1056            &component.kind,
1057            *ordinal,
1058            totals.get(component.kind.as_str()).copied().unwrap_or(1),
1059        );
1060
1061        let mut ctx = PluginRegistrationContext::with_namespace(namespace);
1062        if let Err(err) = plugin.register(&component.config, &mut ctx).await {
1063            let mut just_registered = ctx.into_registrations();
1064            rollback_registrations(&mut just_registered);
1065            rollback_registrations(&mut registrations);
1066            return Err(err);
1067        }
1068        registrations.extend(ctx.into_registrations());
1069    }
1070
1071    Ok(registrations)
1072}
1073
1074fn store_active_plugin_configuration(
1075    config: PluginConfig,
1076    report: ConfigReport,
1077    registrations: Vec<PluginRegistration>,
1078) -> Result<()> {
1079    let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
1080        PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
1081    })?;
1082    *guard = Some(ActivePluginConfiguration {
1083        config,
1084        report,
1085        registrations,
1086    });
1087    Ok(())
1088}
1089
1090fn plugin_component_totals(config: &PluginConfig) -> HashMap<&str, usize> {
1091    let mut totals = HashMap::new();
1092    for component in &config.components {
1093        *totals.entry(component.kind.as_str()).or_insert(0) += 1;
1094    }
1095    totals
1096}
1097
/// Builds the registration namespace for one plugin component.
///
/// When `total` is greater than one the namespace embeds `ordinal` so that
/// several components of the same kind get distinct namespaces; otherwise the
/// namespace is derived from `kind` alone and `ordinal` is ignored.
fn component_namespace(kind: &str, ordinal: usize, total: usize) -> String {
    match total {
        0 | 1 => format!("__nemo_flow_plugin__{kind}__"),
        _ => format!("__nemo_flow_plugin__{kind}__{ordinal}__"),
    }
}
1105
1106fn validate_plugin_multiplicity(report: &mut ConfigReport, config: &PluginConfig) {
1107    let totals = plugin_component_totals(config);
1108    let mut emitted = HashSet::new();
1109
1110    for component in &config.components {
1111        let count = totals
1112            .get(component.kind.as_str())
1113            .copied()
1114            .unwrap_or_default();
1115        if count <= 1 || !emitted.insert(component.kind.clone()) {
1116            continue;
1117        }
1118
1119        let allows_multiple = lookup_plugin(&component.kind)
1120            .map(|plugin| plugin.allows_multiple_components())
1121            .unwrap_or(true);
1122        if !allows_multiple {
1123            report.diagnostics.push(ConfigDiagnostic {
1124                level: DiagnosticLevel::Error,
1125                code: "plugin.duplicate_component".to_string(),
1126                component: Some(component.kind.clone()),
1127                field: None,
1128                message: format!(
1129                    "plugin component kind '{}' may only appear once",
1130                    component.kind
1131                ),
1132            });
1133        }
1134    }
1135}
1136
1137fn push_policy_diag(
1138    diagnostics: &mut Vec<ConfigDiagnostic>,
1139    behavior: UnsupportedBehavior,
1140    code: &str,
1141    component: Option<String>,
1142    field: Option<String>,
1143    message: String,
1144) {
1145    let level = match behavior {
1146        UnsupportedBehavior::Ignore => return,
1147        UnsupportedBehavior::Warn => DiagnosticLevel::Warning,
1148        UnsupportedBehavior::Error => DiagnosticLevel::Error,
1149    };
1150
1151    diagnostics.push(ConfigDiagnostic {
1152        level,
1153        code: code.to_string(),
1154        component,
1155        field,
1156        message,
1157    });
1158}
1159
1160fn join_error_messages(report: &ConfigReport) -> String {
1161    report
1162        .diagnostics
1163        .iter()
1164        .filter(|diag| diag.level == DiagnosticLevel::Error)
1165        .map(|diag| diag.message.as_str())
1166        .collect::<Vec<_>>()
1167        .join("; ")
1168}
1169
// Unit tests for this module are kept in a separate file (see `#[path]`) and
// are only compiled for test builds.
1170#[cfg(test)]
1171#[path = "../tests/unit/plugin_tests.rs"]
1172mod tests;