use std::collections::{HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use thiserror::Error;
use crate::api::registry::{
deregister_llm_conditional_execution_guardrail, deregister_llm_execution_intercept,
deregister_llm_request_intercept, deregister_llm_sanitize_request_guardrail,
deregister_llm_sanitize_response_guardrail, deregister_llm_stream_execution_intercept,
deregister_tool_conditional_execution_guardrail, deregister_tool_execution_intercept,
deregister_tool_request_intercept, deregister_tool_sanitize_request_guardrail,
deregister_tool_sanitize_response_guardrail, register_llm_conditional_execution_guardrail,
register_llm_execution_intercept, register_llm_request_intercept,
register_llm_sanitize_request_guardrail, register_llm_sanitize_response_guardrail,
register_llm_stream_execution_intercept, register_tool_conditional_execution_guardrail,
register_tool_execution_intercept, register_tool_request_intercept,
register_tool_sanitize_request_guardrail, register_tool_sanitize_response_guardrail,
};
use crate::api::runtime::{
EventSubscriberFn, LlmConditionalFn, LlmExecutionFn, LlmRequestInterceptFn,
LlmSanitizeRequestFn, LlmSanitizeResponseFn, LlmStreamExecutionFn, ToolConditionalFn,
ToolExecutionFn, ToolInterceptFn, ToolSanitizeFn,
};
use crate::api::subscriber::{deregister_subscriber, register_subscriber};
type PluginMap = HashMap<String, Arc<dyn Plugin>>;
static PLUGIN_HANDLERS: LazyLock<RwLock<PluginMap>> = LazyLock::new(|| RwLock::new(HashMap::new()));
static ACTIVE_PLUGIN_CONFIGURATION: LazyLock<Mutex<Option<ActivePluginConfiguration>>> =
LazyLock::new(|| Mutex::new(None));
static BUILTIN_PLUGIN_REGISTRATION: OnceLock<Result<()>> = OnceLock::new();
#[derive(Debug, Error)]
pub enum PluginError {
#[error("invalid config: {0}")]
InvalidConfig(String),
#[error("not found: {0}")]
NotFound(String),
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("internal error: {0}")]
Internal(String),
#[error("registration failed: {0}")]
RegistrationFailed(String),
}
pub type Result<T> = std::result::Result<T, PluginError>;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct PluginConfig {
#[serde(default = "default_plugin_config_version")]
pub version: u32,
#[serde(default)]
pub components: Vec<PluginComponentSpec>,
#[serde(default)]
pub policy: ConfigPolicy,
}
impl Default for PluginConfig {
fn default() -> Self {
Self {
version: default_plugin_config_version(),
components: vec![],
policy: ConfigPolicy::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct PluginComponentSpec {
pub kind: String,
#[serde(default = "default_enabled")]
pub enabled: bool,
#[serde(default)]
pub config: Map<String, Json>,
}
impl PluginComponentSpec {
pub fn new(kind: impl Into<String>) -> Self {
Self {
kind: kind.into(),
enabled: true,
config: Map::new(),
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ConfigReport {
#[serde(default)]
pub diagnostics: Vec<ConfigDiagnostic>,
}
impl ConfigReport {
pub fn has_errors(&self) -> bool {
self.diagnostics
.iter()
.any(|diag| diag.level == DiagnosticLevel::Error)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ConfigDiagnostic {
pub level: DiagnosticLevel,
pub code: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub component: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub field: Option<String>,
pub message: String,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum DiagnosticLevel {
Warning,
Error,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ConfigPolicy {
#[serde(default = "default_warn")]
pub unknown_component: UnsupportedBehavior,
#[serde(default = "default_warn")]
pub unknown_field: UnsupportedBehavior,
#[serde(default = "default_error")]
pub unsupported_value: UnsupportedBehavior,
}
impl Default for ConfigPolicy {
fn default() -> Self {
Self {
unknown_component: default_warn(),
unknown_field: default_warn(),
unsupported_value: default_error(),
}
}
}
crate::editor_config! {
impl ConfigPolicy {
unknown_component => {
label: "unknown_component",
kind: Enum,
values: ["warn", "ignore", "error"],
},
unknown_field => {
label: "unknown_field",
kind: Enum,
values: ["warn", "ignore", "error"],
},
unsupported_value => {
label: "unsupported_value",
kind: Enum,
values: ["warn", "ignore", "error"],
},
}
}
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum UnsupportedBehavior {
Ignore,
#[default]
Warn,
Error,
}
fn default_warn() -> UnsupportedBehavior {
UnsupportedBehavior::Warn
}
fn default_error() -> UnsupportedBehavior {
UnsupportedBehavior::Error
}
fn default_plugin_config_version() -> u32 {
1
}
fn default_enabled() -> bool {
true
}
pub struct PluginRegistration {
pub kind: String,
pub name: String,
deregister: Box<dyn FnMut() -> Result<()> + Send>,
}
impl fmt::Debug for PluginRegistration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PluginRegistration")
.field("kind", &self.kind)
.field("name", &self.name)
.finish_non_exhaustive()
}
}
impl PluginRegistration {
pub fn new(
kind: impl Into<String>,
name: impl Into<String>,
deregister: Box<dyn FnMut() -> Result<()> + Send>,
) -> Self {
Self {
kind: kind.into(),
name: name.into(),
deregister,
}
}
}
#[derive(Default)]
pub struct PluginRegistrationContext {
registrations: Vec<PluginRegistration>,
namespace: Option<String>,
}
impl PluginRegistrationContext {
pub fn new() -> Self {
Self::default()
}
pub fn with_namespace(namespace: impl Into<String>) -> Self {
Self {
registrations: vec![],
namespace: Some(namespace.into()),
}
}
pub fn qualify_name(&self, name: &str) -> String {
match &self.namespace {
Some(namespace) => format!("{namespace}{name}"),
None => name.to_string(),
}
}
pub fn register_subscriber(&mut self, name: &str, callback: EventSubscriberFn) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_subscriber(&qualified_name, callback)
.map_err(|err| PluginError::RegistrationFailed(format!("subscriber: {err}")))?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_subscriber(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"subscriber deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_llm_request_intercept(
&mut self,
name: &str,
priority: i32,
break_chain: bool,
callback: LlmRequestInterceptFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_llm_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
|err| PluginError::RegistrationFailed(format!("llm request intercept: {err}")),
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_llm_request_intercept(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"llm request intercept deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_tool_sanitize_request_guardrail(
&mut self,
name: &str,
priority: i32,
callback: ToolSanitizeFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_tool_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
|err| {
PluginError::RegistrationFailed(format!("tool sanitize request guardrail: {err}"))
},
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_tool_sanitize_request_guardrail(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"tool sanitize request guardrail deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_tool_sanitize_response_guardrail(
&mut self,
name: &str,
priority: i32,
callback: ToolSanitizeFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_tool_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
|err| {
PluginError::RegistrationFailed(format!("tool sanitize response guardrail: {err}"))
},
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_tool_sanitize_response_guardrail(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"tool sanitize response guardrail deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_tool_conditional_execution_guardrail(
&mut self,
name: &str,
priority: i32,
callback: ToolConditionalFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_tool_conditional_execution_guardrail(&qualified_name, priority, callback)
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"tool conditional execution guardrail: {err}"
))
})?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_tool_conditional_execution_guardrail(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"tool conditional execution guardrail deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_llm_sanitize_request_guardrail(
&mut self,
name: &str,
priority: i32,
callback: LlmSanitizeRequestFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_llm_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
|err| PluginError::RegistrationFailed(format!("llm sanitize request guardrail: {err}")),
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_llm_sanitize_request_guardrail(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"llm sanitize request guardrail deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_llm_sanitize_response_guardrail(
&mut self,
name: &str,
priority: i32,
callback: LlmSanitizeResponseFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_llm_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
|err| {
PluginError::RegistrationFailed(format!("llm sanitize response guardrail: {err}"))
},
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_llm_sanitize_response_guardrail(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"llm sanitize response guardrail deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_llm_conditional_execution_guardrail(
&mut self,
name: &str,
priority: i32,
callback: LlmConditionalFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_llm_conditional_execution_guardrail(&qualified_name, priority, callback).map_err(
|err| {
PluginError::RegistrationFailed(format!(
"llm conditional execution guardrail: {err}"
))
},
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_llm_conditional_execution_guardrail(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"llm conditional execution guardrail deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_llm_execution_intercept(
&mut self,
name: &str,
priority: i32,
callback: LlmExecutionFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_llm_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
PluginError::RegistrationFailed(format!("llm execution intercept: {err}"))
})?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_llm_execution_intercept(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"llm execution intercept deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_llm_stream_execution_intercept(
&mut self,
name: &str,
priority: i32,
callback: LlmStreamExecutionFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_llm_stream_execution_intercept(&qualified_name, priority, callback).map_err(
|err| PluginError::RegistrationFailed(format!("llm stream execution intercept: {err}")),
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_llm_stream_execution_intercept(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"llm stream execution intercept deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_tool_request_intercept(
&mut self,
name: &str,
priority: i32,
break_chain: bool,
callback: ToolInterceptFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_tool_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
|err| PluginError::RegistrationFailed(format!("tool request intercept: {err}")),
)?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_tool_request_intercept(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"tool request intercept deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn register_tool_execution_intercept(
&mut self,
name: &str,
priority: i32,
callback: ToolExecutionFn,
) -> Result<()> {
let qualified_name = self.qualify_name(name);
register_tool_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
PluginError::RegistrationFailed(format!("tool execution intercept: {err}"))
})?;
let name_owned = qualified_name;
self.registrations.push(PluginRegistration::new(
"plugin",
name_owned.clone(),
Box::new(move || {
deregister_tool_execution_intercept(&name_owned)
.map(|_| ())
.map_err(|err| {
PluginError::RegistrationFailed(format!(
"tool execution intercept deregistration failed: {err}"
))
})
}),
));
Ok(())
}
pub fn add_registration(&mut self, registration: PluginRegistration) {
self.registrations.push(registration);
}
pub fn extend_registrations(&mut self, registrations: Vec<PluginRegistration>) {
self.registrations.extend(registrations);
}
pub fn into_registrations(self) -> Vec<PluginRegistration> {
self.registrations
}
}
pub trait Plugin: Send + Sync + 'static {
fn plugin_kind(&self) -> &str;
fn allows_multiple_components(&self) -> bool {
true
}
fn validate(&self, plugin_config: &Map<String, Json>) -> Vec<ConfigDiagnostic>;
fn register<'a>(
&'a self,
plugin_config: &Map<String, Json>,
ctx: &'a mut PluginRegistrationContext,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}
pub fn register_plugin(plugin: Arc<dyn Plugin>) -> Result<()> {
let mut guard = PLUGIN_HANDLERS
.write()
.map_err(|err| PluginError::Internal(format!("plugin registry lock poisoned: {err}")))?;
let plugin_kind = plugin.plugin_kind().to_string();
if guard.contains_key(&plugin_kind) {
return Err(PluginError::RegistrationFailed(format!(
"plugin '{plugin_kind}' is already registered"
)));
}
guard.insert(plugin_kind, plugin);
Ok(())
}
pub fn ensure_builtin_plugins_registered() -> Result<()> {
match BUILTIN_PLUGIN_REGISTRATION
.get_or_init(crate::observability::plugin_component::register_observability_component)
{
Ok(()) => Ok(()),
Err(err) => Err(clone_cached_plugin_error(err)),
}
}
fn clone_cached_plugin_error(err: &PluginError) -> PluginError {
match err {
PluginError::InvalidConfig(message) => PluginError::InvalidConfig(message.clone()),
PluginError::NotFound(message) => PluginError::NotFound(message.clone()),
PluginError::Serialization(err) => PluginError::Internal(err.to_string()),
PluginError::Internal(message) => PluginError::Internal(message.clone()),
PluginError::RegistrationFailed(message) => {
PluginError::RegistrationFailed(message.clone())
}
}
}
pub fn deregister_plugin(plugin_kind: &str) -> bool {
PLUGIN_HANDLERS
.write()
.ok()
.and_then(|mut guard| guard.remove(plugin_kind))
.is_some()
}
pub fn list_plugin_kinds() -> Vec<String> {
let _ = ensure_builtin_plugins_registered();
let mut kinds = PLUGIN_HANDLERS
.read()
.map(|guard| guard.keys().cloned().collect::<Vec<_>>())
.unwrap_or_default();
kinds.sort();
kinds
}
pub fn lookup_plugin(plugin_kind: &str) -> Option<Arc<dyn Plugin>> {
let _ = ensure_builtin_plugins_registered();
PLUGIN_HANDLERS
.read()
.ok()
.and_then(|guard| guard.get(plugin_kind).cloned())
}
pub fn validate_plugin_config(config: &PluginConfig) -> ConfigReport {
let _ = ensure_builtin_plugins_registered();
let mut report = ConfigReport::default();
if config.version != 1 {
push_policy_diag(
&mut report.diagnostics,
config.policy.unsupported_value,
"plugin.unsupported_config_version",
None,
Some("version".to_string()),
format!("plugin config version {} is unsupported", config.version),
);
}
validate_plugin_multiplicity(&mut report, config);
for component in &config.components {
let Some(plugin) = lookup_plugin(&component.kind) else {
push_policy_diag(
&mut report.diagnostics,
config.policy.unknown_component,
"plugin.unknown_component",
Some(component.kind.clone()),
None,
format!("plugin component kind '{}' is unsupported", component.kind),
);
continue;
};
report
.diagnostics
.extend(plugin.validate(&component.config));
}
report
}
#[cfg(feature = "schema")]
pub fn plugin_config_schema() -> Json {
serde_json::to_value(schemars::schema_for!(PluginConfig))
.expect("plugin config schema should serialize")
}
pub async fn initialize_plugins(config: PluginConfig) -> Result<ConfigReport> {
let report = validate_plugin_config(&config);
if report.has_errors() {
return Err(PluginError::InvalidConfig(join_error_messages(&report)));
}
let previous = {
let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
})?;
guard.take()
};
if let Some(mut previous_state) = previous {
rollback_registrations(&mut previous_state.registrations);
match initialize_plugin_components(&config).await {
Ok(registrations) => {
store_active_plugin_configuration(config, report.clone(), registrations)?;
Ok(report)
}
Err(err) => match initialize_plugin_components(&previous_state.config).await {
Ok(registrations) => {
let previous_report = validate_plugin_config(&previous_state.config);
store_active_plugin_configuration(
previous_state.config,
previous_report,
registrations,
)?;
Err(err)
}
Err(restore_err) => Err(PluginError::RegistrationFailed(format!(
"{err}; previous plugin configuration could not be restored: {restore_err}"
))),
},
}
} else {
let registrations = initialize_plugin_components(&config).await?;
store_active_plugin_configuration(config, report.clone(), registrations)?;
Ok(report)
}
}
pub fn clear_plugin_configuration() -> Result<()> {
let previous = {
let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
})?;
guard.take()
};
if let Some(mut previous_state) = previous {
rollback_registrations(&mut previous_state.registrations);
}
Ok(())
}
pub fn active_plugin_report() -> Option<ConfigReport> {
ACTIVE_PLUGIN_CONFIGURATION
.lock()
.ok()
.and_then(|guard| guard.as_ref().map(|state| state.report.clone()))
}
pub fn rollback_registrations(registrations: &mut Vec<PluginRegistration>) {
for registration in registrations.iter_mut().rev() {
let _ = (registration.deregister)();
}
registrations.clear();
}
struct ActivePluginConfiguration {
config: PluginConfig,
report: ConfigReport,
registrations: Vec<PluginRegistration>,
}
async fn initialize_plugin_components(config: &PluginConfig) -> Result<Vec<PluginRegistration>> {
ensure_builtin_plugins_registered()?;
let totals = plugin_component_totals(config);
let mut ordinals: HashMap<&str, usize> = HashMap::new();
let mut registrations = vec![];
for component in config
.components
.iter()
.filter(|component| component.enabled)
{
let Some(plugin) = lookup_plugin(&component.kind) else {
rollback_registrations(&mut registrations);
return Err(PluginError::NotFound(format!(
"plugin component '{}' is not registered",
component.kind
)));
};
let ordinal = ordinals
.entry(component.kind.as_str())
.and_modify(|value| *value += 1)
.or_insert(1);
let namespace = component_namespace(
&component.kind,
*ordinal,
totals.get(component.kind.as_str()).copied().unwrap_or(1),
);
let mut ctx = PluginRegistrationContext::with_namespace(namespace);
if let Err(err) = plugin.register(&component.config, &mut ctx).await {
let mut just_registered = ctx.into_registrations();
rollback_registrations(&mut just_registered);
rollback_registrations(&mut registrations);
return Err(err);
}
registrations.extend(ctx.into_registrations());
}
Ok(registrations)
}
fn store_active_plugin_configuration(
config: PluginConfig,
report: ConfigReport,
registrations: Vec<PluginRegistration>,
) -> Result<()> {
let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
})?;
*guard = Some(ActivePluginConfiguration {
config,
report,
registrations,
});
Ok(())
}
fn plugin_component_totals(config: &PluginConfig) -> HashMap<&str, usize> {
let mut totals = HashMap::new();
for component in &config.components {
*totals.entry(component.kind.as_str()).or_insert(0) += 1;
}
totals
}
fn component_namespace(kind: &str, ordinal: usize, total: usize) -> String {
if total > 1 {
format!("__nemo_flow_plugin__{kind}__{ordinal}__")
} else {
format!("__nemo_flow_plugin__{kind}__")
}
}
fn validate_plugin_multiplicity(report: &mut ConfigReport, config: &PluginConfig) {
let totals = plugin_component_totals(config);
let mut emitted = HashSet::new();
for component in &config.components {
let count = totals
.get(component.kind.as_str())
.copied()
.unwrap_or_default();
if count <= 1 || !emitted.insert(component.kind.clone()) {
continue;
}
let allows_multiple = lookup_plugin(&component.kind)
.map(|plugin| plugin.allows_multiple_components())
.unwrap_or(true);
if !allows_multiple {
report.diagnostics.push(ConfigDiagnostic {
level: DiagnosticLevel::Error,
code: "plugin.duplicate_component".to_string(),
component: Some(component.kind.clone()),
field: None,
message: format!(
"plugin component kind '{}' may only appear once",
component.kind
),
});
}
}
}
fn push_policy_diag(
diagnostics: &mut Vec<ConfigDiagnostic>,
behavior: UnsupportedBehavior,
code: &str,
component: Option<String>,
field: Option<String>,
message: String,
) {
let level = match behavior {
UnsupportedBehavior::Ignore => return,
UnsupportedBehavior::Warn => DiagnosticLevel::Warning,
UnsupportedBehavior::Error => DiagnosticLevel::Error,
};
diagnostics.push(ConfigDiagnostic {
level,
code: code.to_string(),
component,
field,
message,
});
}
fn join_error_messages(report: &ConfigReport) -> String {
report
.diagnostics
.iter()
.filter(|diag| diag.level == DiagnosticLevel::Error)
.map(|diag| diag.message.as_str())
.collect::<Vec<_>>()
.join("; ")
}
#[cfg(test)]
#[path = "../tests/unit/plugin_tests.rs"]
mod tests;