1use std::collections::{HashMap, HashSet};
13use std::fmt;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::{Arc, LazyLock, Mutex, RwLock};
17
18use serde::{Deserialize, Serialize};
19use serde_json::{Map, Value as Json};
20use thiserror::Error;
21
22use crate::api::registry::{
23 deregister_llm_conditional_execution_guardrail, deregister_llm_execution_intercept,
24 deregister_llm_request_intercept, deregister_llm_sanitize_request_guardrail,
25 deregister_llm_sanitize_response_guardrail, deregister_llm_stream_execution_intercept,
26 deregister_tool_conditional_execution_guardrail, deregister_tool_execution_intercept,
27 deregister_tool_request_intercept, deregister_tool_sanitize_request_guardrail,
28 deregister_tool_sanitize_response_guardrail, register_llm_conditional_execution_guardrail,
29 register_llm_execution_intercept, register_llm_request_intercept,
30 register_llm_sanitize_request_guardrail, register_llm_sanitize_response_guardrail,
31 register_llm_stream_execution_intercept, register_tool_conditional_execution_guardrail,
32 register_tool_execution_intercept, register_tool_request_intercept,
33 register_tool_sanitize_request_guardrail, register_tool_sanitize_response_guardrail,
34};
35use crate::api::runtime::{
36 EventSubscriberFn, LlmConditionalFn, LlmExecutionFn, LlmRequestInterceptFn,
37 LlmSanitizeRequestFn, LlmSanitizeResponseFn, LlmStreamExecutionFn, ToolConditionalFn,
38 ToolExecutionFn, ToolInterceptFn, ToolSanitizeFn,
39};
40use crate::api::subscriber::{deregister_subscriber, register_subscriber};
41
/// Maps a plugin kind string to the handler registered for it.
type PluginMap = HashMap<String, Arc<dyn Plugin>>;

/// Process-wide registry of plugin handlers, keyed by plugin kind.
static PLUGIN_HANDLERS: LazyLock<RwLock<PluginMap>> = LazyLock::new(|| RwLock::new(HashMap::new()));
/// The currently stored plugin configuration and its live registrations.
/// Starts as `None` and is `None` again after the configuration is taken or cleared.
static ACTIVE_PLUGIN_CONFIGURATION: LazyLock<Mutex<Option<ActivePluginConfiguration>>> =
    LazyLock::new(|| Mutex::new(None));
47
/// Errors produced by the plugin registry and the plugin configuration lifecycle.
#[derive(Debug, Error)]
pub enum PluginError {
    /// Configuration validation reported at least one error-level diagnostic.
    #[error("invalid config: {0}")]
    InvalidConfig(String),

    /// A component referenced a plugin kind that is not registered.
    #[error("not found: {0}")]
    NotFound(String),

    /// JSON (de)serialization failed; converted automatically from `serde_json::Error`.
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// An internal failure such as a poisoned lock.
    #[error("internal error: {0}")]
    Internal(String),

    /// Registering or deregistering a plugin, or one of its hooks, failed.
    #[error("registration failed: {0}")]
    RegistrationFailed(String),
}
71
/// Result alias used throughout this module, with [`PluginError`] as the error type.
pub type Result<T> = std::result::Result<T, PluginError>;
74
/// Top-level plugin configuration document.
///
/// Every field has a serde default, so missing fields are filled in on
/// deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginConfig {
    /// Schema version of the document; defaults to `1`. Validation reports any
    /// other value under the `unsupported_value` policy.
    #[serde(default = "default_plugin_config_version")]
    pub version: u32,
    /// Component specifications, in the order they are validated and initialized.
    #[serde(default)]
    pub components: Vec<PluginComponentSpec>,
    /// Controls how validation reports unsupported input.
    #[serde(default)]
    pub policy: ConfigPolicy,
}
88
89impl Default for PluginConfig {
90 fn default() -> Self {
91 Self {
92 version: default_plugin_config_version(),
93 components: vec![],
94 policy: ConfigPolicy::default(),
95 }
96 }
97}
98
/// One configured plugin component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginComponentSpec {
    /// Plugin kind; used to look up the registered plugin handler.
    pub kind: String,
    /// Whether the component is initialized; defaults to `true`. Disabled
    /// components are skipped during initialization.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
    /// Plugin-specific settings passed to the plugin's `validate` and `register`.
    #[serde(default)]
    pub config: Map<String, Json>,
}
114
115impl PluginComponentSpec {
116 pub fn new(kind: impl Into<String>) -> Self {
118 Self {
119 kind: kind.into(),
120 enabled: true,
121 config: Map::new(),
122 }
123 }
124}
125
/// Outcome of validating a [`PluginConfig`]: the diagnostics that were collected.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConfigReport {
    /// Diagnostics in the order they were produced; empty when nothing was reported.
    #[serde(default)]
    pub diagnostics: Vec<ConfigDiagnostic>,
}
133
134impl ConfigReport {
135 pub fn has_errors(&self) -> bool {
137 self.diagnostics
138 .iter()
139 .any(|diag| diag.level == DiagnosticLevel::Error)
140 }
141}
142
/// A single finding produced while validating a plugin configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigDiagnostic {
    /// Severity of the finding.
    pub level: DiagnosticLevel,
    /// Machine-readable identifier, e.g. `plugin.unknown_component`.
    pub code: String,
    /// Kind of the component the finding refers to, if any. Omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    /// Name of the offending configuration field, if any. Omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub field: Option<String>,
    /// Human-readable description of the finding.
    pub message: String,
}
159
/// Severity of a [`ConfigDiagnostic`]; serialized in lowercase.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DiagnosticLevel {
    /// Reported, but does not make `ConfigReport::has_errors` return `true`.
    Warning,
    /// Makes `ConfigReport::has_errors` return `true`.
    Error,
}
169
/// Chooses how validation reports each category of unsupported input.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ConfigPolicy {
    /// Applied when a component's kind has no registered plugin; defaults to `Warn`.
    #[serde(default = "default_warn")]
    pub unknown_component: UnsupportedBehavior,
    /// Defaults to `Warn`.
    // NOTE(review): not read anywhere in this file; presumably consulted by
    // individual plugins for unrecognized config keys — confirm.
    #[serde(default = "default_warn")]
    pub unknown_field: UnsupportedBehavior,
    /// Applied when the config `version` is not supported; defaults to `Error`.
    #[serde(default = "default_error")]
    pub unsupported_value: UnsupportedBehavior,
}
183
184impl Default for ConfigPolicy {
185 fn default() -> Self {
186 Self {
187 unknown_component: default_warn(),
188 unknown_field: default_warn(),
189 unsupported_value: default_error(),
190 }
191 }
192}
193
/// How a policy-controlled validation finding is reported; serialized in lowercase.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum UnsupportedBehavior {
    /// Emit no diagnostic.
    Ignore,
    /// Emit a warning-level diagnostic (the default).
    #[default]
    Warn,
    /// Emit an error-level diagnostic.
    Error,
}
206
/// Serde default for policy fields that warn unless configured otherwise.
fn default_warn() -> UnsupportedBehavior {
    UnsupportedBehavior::Warn
}
210
/// Serde default for policy fields that error unless configured otherwise.
fn default_error() -> UnsupportedBehavior {
    UnsupportedBehavior::Error
}
214
/// Serde default for `PluginConfig::version`.
fn default_plugin_config_version() -> u32 {
    1
}
218
/// Serde default for `PluginComponentSpec::enabled`: components are on unless disabled.
fn default_enabled() -> bool {
    true
}
222
/// A record of something a plugin registered, together with the action that undoes it.
pub struct PluginRegistration {
    /// Label describing what kind of registration this is.
    pub kind: String,
    /// Name the registration was made under.
    pub name: String,
    /// Undo action, invoked by `rollback_registrations`.
    deregister: Box<dyn FnMut() -> Result<()> + Send>,
}
231
232impl fmt::Debug for PluginRegistration {
233 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
234 f.debug_struct("PluginRegistration")
235 .field("kind", &self.kind)
236 .field("name", &self.name)
237 .finish_non_exhaustive()
238 }
239}
240
241impl PluginRegistration {
242 pub fn new(
244 kind: impl Into<String>,
245 name: impl Into<String>,
246 deregister: Box<dyn FnMut() -> Result<()> + Send>,
247 ) -> Self {
248 Self {
249 kind: kind.into(),
250 name: name.into(),
251 deregister,
252 }
253 }
254}
255
/// Collects the registrations a plugin makes so they can be undone later.
#[derive(Default)]
pub struct PluginRegistrationContext {
    /// Registrations recorded so far, in registration order.
    registrations: Vec<PluginRegistration>,
    /// Optional prefix prepended to every name registered through this context.
    namespace: Option<String>,
}
266
267impl PluginRegistrationContext {
268 pub fn new() -> Self {
270 Self::default()
271 }
272
273 pub fn with_namespace(namespace: impl Into<String>) -> Self {
275 Self {
276 registrations: vec![],
277 namespace: Some(namespace.into()),
278 }
279 }
280
281 pub fn qualify_name(&self, name: &str) -> String {
287 match &self.namespace {
288 Some(namespace) => format!("{namespace}{name}"),
289 None => name.to_string(),
290 }
291 }
292
293 pub fn register_subscriber(&mut self, name: &str, callback: EventSubscriberFn) -> Result<()> {
295 let qualified_name = self.qualify_name(name);
296 register_subscriber(&qualified_name, callback)
297 .map_err(|err| PluginError::RegistrationFailed(format!("subscriber: {err}")))?;
298
299 let name_owned = qualified_name;
300 self.registrations.push(PluginRegistration::new(
301 "plugin",
302 name_owned.clone(),
303 Box::new(move || {
304 deregister_subscriber(&name_owned)
305 .map(|_| ())
306 .map_err(|err| {
307 PluginError::RegistrationFailed(format!(
308 "subscriber deregistration failed: {err}"
309 ))
310 })
311 }),
312 ));
313 Ok(())
314 }
315
316 pub fn register_llm_request_intercept(
318 &mut self,
319 name: &str,
320 priority: i32,
321 break_chain: bool,
322 callback: LlmRequestInterceptFn,
323 ) -> Result<()> {
324 let qualified_name = self.qualify_name(name);
325 register_llm_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
326 |err| PluginError::RegistrationFailed(format!("llm request intercept: {err}")),
327 )?;
328
329 let name_owned = qualified_name;
330 self.registrations.push(PluginRegistration::new(
331 "plugin",
332 name_owned.clone(),
333 Box::new(move || {
334 deregister_llm_request_intercept(&name_owned)
335 .map(|_| ())
336 .map_err(|err| {
337 PluginError::RegistrationFailed(format!(
338 "llm request intercept deregistration failed: {err}"
339 ))
340 })
341 }),
342 ));
343 Ok(())
344 }
345
346 pub fn register_tool_sanitize_request_guardrail(
348 &mut self,
349 name: &str,
350 priority: i32,
351 callback: ToolSanitizeFn,
352 ) -> Result<()> {
353 let qualified_name = self.qualify_name(name);
354 register_tool_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
355 |err| {
356 PluginError::RegistrationFailed(format!("tool sanitize request guardrail: {err}"))
357 },
358 )?;
359
360 let name_owned = qualified_name;
361 self.registrations.push(PluginRegistration::new(
362 "plugin",
363 name_owned.clone(),
364 Box::new(move || {
365 deregister_tool_sanitize_request_guardrail(&name_owned)
366 .map(|_| ())
367 .map_err(|err| {
368 PluginError::RegistrationFailed(format!(
369 "tool sanitize request guardrail deregistration failed: {err}"
370 ))
371 })
372 }),
373 ));
374 Ok(())
375 }
376
377 pub fn register_tool_sanitize_response_guardrail(
379 &mut self,
380 name: &str,
381 priority: i32,
382 callback: ToolSanitizeFn,
383 ) -> Result<()> {
384 let qualified_name = self.qualify_name(name);
385 register_tool_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
386 |err| {
387 PluginError::RegistrationFailed(format!("tool sanitize response guardrail: {err}"))
388 },
389 )?;
390
391 let name_owned = qualified_name;
392 self.registrations.push(PluginRegistration::new(
393 "plugin",
394 name_owned.clone(),
395 Box::new(move || {
396 deregister_tool_sanitize_response_guardrail(&name_owned)
397 .map(|_| ())
398 .map_err(|err| {
399 PluginError::RegistrationFailed(format!(
400 "tool sanitize response guardrail deregistration failed: {err}"
401 ))
402 })
403 }),
404 ));
405 Ok(())
406 }
407
408 pub fn register_tool_conditional_execution_guardrail(
410 &mut self,
411 name: &str,
412 priority: i32,
413 callback: ToolConditionalFn,
414 ) -> Result<()> {
415 let qualified_name = self.qualify_name(name);
416 register_tool_conditional_execution_guardrail(&qualified_name, priority, callback)
417 .map_err(|err| {
418 PluginError::RegistrationFailed(format!(
419 "tool conditional execution guardrail: {err}"
420 ))
421 })?;
422
423 let name_owned = qualified_name;
424 self.registrations.push(PluginRegistration::new(
425 "plugin",
426 name_owned.clone(),
427 Box::new(move || {
428 deregister_tool_conditional_execution_guardrail(&name_owned)
429 .map(|_| ())
430 .map_err(|err| {
431 PluginError::RegistrationFailed(format!(
432 "tool conditional execution guardrail deregistration failed: {err}"
433 ))
434 })
435 }),
436 ));
437 Ok(())
438 }
439
440 pub fn register_llm_sanitize_request_guardrail(
442 &mut self,
443 name: &str,
444 priority: i32,
445 callback: LlmSanitizeRequestFn,
446 ) -> Result<()> {
447 let qualified_name = self.qualify_name(name);
448 register_llm_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
449 |err| PluginError::RegistrationFailed(format!("llm sanitize request guardrail: {err}")),
450 )?;
451
452 let name_owned = qualified_name;
453 self.registrations.push(PluginRegistration::new(
454 "plugin",
455 name_owned.clone(),
456 Box::new(move || {
457 deregister_llm_sanitize_request_guardrail(&name_owned)
458 .map(|_| ())
459 .map_err(|err| {
460 PluginError::RegistrationFailed(format!(
461 "llm sanitize request guardrail deregistration failed: {err}"
462 ))
463 })
464 }),
465 ));
466 Ok(())
467 }
468
469 pub fn register_llm_sanitize_response_guardrail(
471 &mut self,
472 name: &str,
473 priority: i32,
474 callback: LlmSanitizeResponseFn,
475 ) -> Result<()> {
476 let qualified_name = self.qualify_name(name);
477 register_llm_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
478 |err| {
479 PluginError::RegistrationFailed(format!("llm sanitize response guardrail: {err}"))
480 },
481 )?;
482
483 let name_owned = qualified_name;
484 self.registrations.push(PluginRegistration::new(
485 "plugin",
486 name_owned.clone(),
487 Box::new(move || {
488 deregister_llm_sanitize_response_guardrail(&name_owned)
489 .map(|_| ())
490 .map_err(|err| {
491 PluginError::RegistrationFailed(format!(
492 "llm sanitize response guardrail deregistration failed: {err}"
493 ))
494 })
495 }),
496 ));
497 Ok(())
498 }
499
500 pub fn register_llm_conditional_execution_guardrail(
502 &mut self,
503 name: &str,
504 priority: i32,
505 callback: LlmConditionalFn,
506 ) -> Result<()> {
507 let qualified_name = self.qualify_name(name);
508 register_llm_conditional_execution_guardrail(&qualified_name, priority, callback).map_err(
509 |err| {
510 PluginError::RegistrationFailed(format!(
511 "llm conditional execution guardrail: {err}"
512 ))
513 },
514 )?;
515
516 let name_owned = qualified_name;
517 self.registrations.push(PluginRegistration::new(
518 "plugin",
519 name_owned.clone(),
520 Box::new(move || {
521 deregister_llm_conditional_execution_guardrail(&name_owned)
522 .map(|_| ())
523 .map_err(|err| {
524 PluginError::RegistrationFailed(format!(
525 "llm conditional execution guardrail deregistration failed: {err}"
526 ))
527 })
528 }),
529 ));
530 Ok(())
531 }
532
533 pub fn register_llm_execution_intercept(
535 &mut self,
536 name: &str,
537 priority: i32,
538 callback: LlmExecutionFn,
539 ) -> Result<()> {
540 let qualified_name = self.qualify_name(name);
541 register_llm_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
542 PluginError::RegistrationFailed(format!("llm execution intercept: {err}"))
543 })?;
544
545 let name_owned = qualified_name;
546 self.registrations.push(PluginRegistration::new(
547 "plugin",
548 name_owned.clone(),
549 Box::new(move || {
550 deregister_llm_execution_intercept(&name_owned)
551 .map(|_| ())
552 .map_err(|err| {
553 PluginError::RegistrationFailed(format!(
554 "llm execution intercept deregistration failed: {err}"
555 ))
556 })
557 }),
558 ));
559 Ok(())
560 }
561
562 pub fn register_llm_stream_execution_intercept(
564 &mut self,
565 name: &str,
566 priority: i32,
567 callback: LlmStreamExecutionFn,
568 ) -> Result<()> {
569 let qualified_name = self.qualify_name(name);
570 register_llm_stream_execution_intercept(&qualified_name, priority, callback).map_err(
571 |err| PluginError::RegistrationFailed(format!("llm stream execution intercept: {err}")),
572 )?;
573
574 let name_owned = qualified_name;
575 self.registrations.push(PluginRegistration::new(
576 "plugin",
577 name_owned.clone(),
578 Box::new(move || {
579 deregister_llm_stream_execution_intercept(&name_owned)
580 .map(|_| ())
581 .map_err(|err| {
582 PluginError::RegistrationFailed(format!(
583 "llm stream execution intercept deregistration failed: {err}"
584 ))
585 })
586 }),
587 ));
588 Ok(())
589 }
590
591 pub fn register_tool_request_intercept(
593 &mut self,
594 name: &str,
595 priority: i32,
596 break_chain: bool,
597 callback: ToolInterceptFn,
598 ) -> Result<()> {
599 let qualified_name = self.qualify_name(name);
600 register_tool_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
601 |err| PluginError::RegistrationFailed(format!("tool request intercept: {err}")),
602 )?;
603
604 let name_owned = qualified_name;
605 self.registrations.push(PluginRegistration::new(
606 "plugin",
607 name_owned.clone(),
608 Box::new(move || {
609 deregister_tool_request_intercept(&name_owned)
610 .map(|_| ())
611 .map_err(|err| {
612 PluginError::RegistrationFailed(format!(
613 "tool request intercept deregistration failed: {err}"
614 ))
615 })
616 }),
617 ));
618 Ok(())
619 }
620
621 pub fn register_tool_execution_intercept(
623 &mut self,
624 name: &str,
625 priority: i32,
626 callback: ToolExecutionFn,
627 ) -> Result<()> {
628 let qualified_name = self.qualify_name(name);
629 register_tool_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
630 PluginError::RegistrationFailed(format!("tool execution intercept: {err}"))
631 })?;
632
633 let name_owned = qualified_name;
634 self.registrations.push(PluginRegistration::new(
635 "plugin",
636 name_owned.clone(),
637 Box::new(move || {
638 deregister_tool_execution_intercept(&name_owned)
639 .map(|_| ())
640 .map_err(|err| {
641 PluginError::RegistrationFailed(format!(
642 "tool execution intercept deregistration failed: {err}"
643 ))
644 })
645 }),
646 ));
647 Ok(())
648 }
649
650 pub fn add_registration(&mut self, registration: PluginRegistration) {
652 self.registrations.push(registration);
653 }
654
655 pub fn extend_registrations(&mut self, registrations: Vec<PluginRegistration>) {
657 self.registrations.extend(registrations);
658 }
659
660 pub fn into_registrations(self) -> Vec<PluginRegistration> {
662 self.registrations
663 }
664}
665
/// Handler for one kind of plugin component.
///
/// Implementations are stored in the global registry behind `Arc<dyn Plugin>`.
pub trait Plugin: Send + Sync + 'static {
    /// Kind identifier. Used as the registry key and matched against
    /// `PluginComponentSpec::kind`.
    fn plugin_kind(&self) -> &str;

    /// Whether a configuration may list this kind more than once.
    ///
    /// Defaults to `true`. When this returns `false`, validation reports a
    /// `plugin.duplicate_component` error for configs that repeat the kind.
    fn allows_multiple_components(&self) -> bool {
        true
    }

    /// Checks one component's settings and returns any diagnostics, which are
    /// appended to the validation report.
    fn validate(&self, plugin_config: &Map<String, Json>) -> Vec<ConfigDiagnostic>;

    /// Performs the component's registrations through `ctx`.
    ///
    /// The returned future may borrow `self` and `ctx` (both `'a`) but not
    /// `plugin_config`, whose lifetime is not tied to `'a`. If the future
    /// resolves to `Err`, the caller rolls back whatever was recorded in `ctx`.
    fn register<'a>(
        &'a self,
        plugin_config: &Map<String, Json>,
        ctx: &'a mut PluginRegistrationContext,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}
696
697pub fn register_plugin(plugin: Arc<dyn Plugin>) -> Result<()> {
719 let mut guard = PLUGIN_HANDLERS
720 .write()
721 .map_err(|err| PluginError::Internal(format!("plugin registry lock poisoned: {err}")))?;
722 let plugin_kind = plugin.plugin_kind().to_string();
723 if guard.contains_key(&plugin_kind) {
724 return Err(PluginError::RegistrationFailed(format!(
725 "plugin '{plugin_kind}' is already registered"
726 )));
727 }
728 guard.insert(plugin_kind, plugin);
729 Ok(())
730}
731
732pub fn deregister_plugin(plugin_kind: &str) -> bool {
748 PLUGIN_HANDLERS
749 .write()
750 .ok()
751 .and_then(|mut guard| guard.remove(plugin_kind))
752 .is_some()
753}
754
755pub fn list_plugin_kinds() -> Vec<String> {
767 let mut kinds = PLUGIN_HANDLERS
768 .read()
769 .map(|guard| guard.keys().cloned().collect::<Vec<_>>())
770 .unwrap_or_default();
771 kinds.sort();
772 kinds
773}
774
775pub fn lookup_plugin(plugin_kind: &str) -> Option<Arc<dyn Plugin>> {
787 PLUGIN_HANDLERS
788 .read()
789 .ok()
790 .and_then(|guard| guard.get(plugin_kind).cloned())
791}
792
793pub fn validate_plugin_config(config: &PluginConfig) -> ConfigReport {
809 let mut report = ConfigReport::default();
810
811 if config.version != 1 {
812 push_policy_diag(
813 &mut report.diagnostics,
814 config.policy.unsupported_value,
815 "plugin.unsupported_config_version",
816 None,
817 Some("version".to_string()),
818 format!("plugin config version {} is unsupported", config.version),
819 );
820 }
821
822 validate_plugin_multiplicity(&mut report, config);
823
824 for component in &config.components {
825 let Some(plugin) = lookup_plugin(&component.kind) else {
826 push_policy_diag(
827 &mut report.diagnostics,
828 config.policy.unknown_component,
829 "plugin.unknown_component",
830 Some(component.kind.clone()),
831 None,
832 format!("plugin component kind '{}' is unsupported", component.kind),
833 );
834 continue;
835 };
836 report
837 .diagnostics
838 .extend(plugin.validate(&component.config));
839 }
840
841 report
842}
843
/// Validates `config` and makes it the active plugin configuration.
///
/// Any previously active configuration is deregistered first. If the new
/// configuration fails to initialize, the previous one is re-initialized and
/// stored again before the error is returned.
///
/// # Errors
/// - [`PluginError::InvalidConfig`] if validation reports error-level
///   diagnostics; the active configuration is left untouched.
/// - [`PluginError::Internal`] if the active-configuration lock is poisoned.
/// - The component initialization error if the new configuration fails and the
///   previous one was restored.
/// - [`PluginError::RegistrationFailed`] combining both errors if restoring the
///   previous configuration also fails; no configuration is active afterwards.
pub async fn initialize_plugins(config: PluginConfig) -> Result<ConfigReport> {
    let report = validate_plugin_config(&config);
    if report.has_errors() {
        return Err(PluginError::InvalidConfig(join_error_messages(&report)));
    }

    // Take the previous state inside a block so the mutex guard is dropped
    // before any `.await` below.
    let previous = {
        let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
            PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
        })?;
        guard.take()
    };

    if let Some(mut previous_state) = previous {
        // Deregister the old hooks before registering the new ones.
        rollback_registrations(&mut previous_state.registrations);
        match initialize_plugin_components(&config).await {
            Ok(registrations) => {
                // NOTE(review): if storing fails here, `registrations` is dropped
                // without its deregister closures being run — confirm this is acceptable.
                store_active_plugin_configuration(config, report.clone(), registrations)?;
                Ok(report)
            }
            // The new configuration failed: try to bring the previous one back.
            Err(err) => match initialize_plugin_components(&previous_state.config).await {
                Ok(registrations) => {
                    // The report is recomputed rather than reusing the stored one.
                    let previous_report = validate_plugin_config(&previous_state.config);
                    store_active_plugin_configuration(
                        previous_state.config,
                        previous_report,
                        registrations,
                    )?;
                    Err(err)
                }
                Err(restore_err) => Err(PluginError::RegistrationFailed(format!(
                    "{err}; previous plugin configuration could not be restored: {restore_err}"
                ))),
            },
        }
    } else {
        let registrations = initialize_plugin_components(&config).await?;
        store_active_plugin_configuration(config, report.clone(), registrations)?;
        Ok(report)
    }
}
905
906pub fn clear_plugin_configuration() -> Result<()> {
922 let previous = {
923 let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
924 PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
925 })?;
926 guard.take()
927 };
928 if let Some(mut previous_state) = previous {
929 rollback_registrations(&mut previous_state.registrations);
930 }
931 Ok(())
932}
933
934pub fn active_plugin_report() -> Option<ConfigReport> {
946 ACTIVE_PLUGIN_CONFIGURATION
947 .lock()
948 .ok()
949 .and_then(|guard| guard.as_ref().map(|state| state.report.clone()))
950}
951
952pub fn rollback_registrations(registrations: &mut Vec<PluginRegistration>) {
957 for registration in registrations.iter_mut().rev() {
958 let _ = (registration.deregister)();
959 }
960 registrations.clear();
961}
962
/// Snapshot of the active plugin configuration held in `ACTIVE_PLUGIN_CONFIGURATION`.
struct ActivePluginConfiguration {
    /// The configuration that was initialized; used to restore it if a later
    /// configuration fails to initialize.
    config: PluginConfig,
    /// Validation report stored alongside the configuration.
    report: ConfigReport,
    /// Live registrations to deregister when this configuration is replaced or cleared.
    registrations: Vec<PluginRegistration>,
}
968
969async fn initialize_plugin_components(config: &PluginConfig) -> Result<Vec<PluginRegistration>> {
970 let totals = plugin_component_totals(config);
971 let mut ordinals: HashMap<&str, usize> = HashMap::new();
972 let mut registrations = vec![];
973
974 for component in config
975 .components
976 .iter()
977 .filter(|component| component.enabled)
978 {
979 let Some(plugin) = lookup_plugin(&component.kind) else {
980 rollback_registrations(&mut registrations);
981 return Err(PluginError::NotFound(format!(
982 "plugin component '{}' is not registered",
983 component.kind
984 )));
985 };
986
987 let ordinal = ordinals
988 .entry(component.kind.as_str())
989 .and_modify(|value| *value += 1)
990 .or_insert(1);
991 let namespace = component_namespace(
992 &component.kind,
993 *ordinal,
994 totals.get(component.kind.as_str()).copied().unwrap_or(1),
995 );
996
997 let mut ctx = PluginRegistrationContext::with_namespace(namespace);
998 if let Err(err) = plugin.register(&component.config, &mut ctx).await {
999 let mut just_registered = ctx.into_registrations();
1000 rollback_registrations(&mut just_registered);
1001 rollback_registrations(&mut registrations);
1002 return Err(err);
1003 }
1004 registrations.extend(ctx.into_registrations());
1005 }
1006
1007 Ok(registrations)
1008}
1009
1010fn store_active_plugin_configuration(
1011 config: PluginConfig,
1012 report: ConfigReport,
1013 registrations: Vec<PluginRegistration>,
1014) -> Result<()> {
1015 let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
1016 PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
1017 })?;
1018 *guard = Some(ActivePluginConfiguration {
1019 config,
1020 report,
1021 registrations,
1022 });
1023 Ok(())
1024}
1025
1026fn plugin_component_totals(config: &PluginConfig) -> HashMap<&str, usize> {
1027 let mut totals = HashMap::new();
1028 for component in &config.components {
1029 *totals.entry(component.kind.as_str()).or_insert(0) += 1;
1030 }
1031 totals
1032}
1033
/// Builds the name prefix for a component's registrations.
///
/// When a kind appears once (or `total` is 0), the prefix contains only the
/// kind. When it appears several times, the `ordinal` is included so each
/// instance gets a distinct prefix.
fn component_namespace(kind: &str, ordinal: usize, total: usize) -> String {
    match total {
        0 | 1 => format!("__nemo_flow_plugin__{kind}__"),
        _ => format!("__nemo_flow_plugin__{kind}__{ordinal}__"),
    }
}
1041
1042fn validate_plugin_multiplicity(report: &mut ConfigReport, config: &PluginConfig) {
1043 let totals = plugin_component_totals(config);
1044 let mut emitted = HashSet::new();
1045
1046 for component in &config.components {
1047 let count = totals
1048 .get(component.kind.as_str())
1049 .copied()
1050 .unwrap_or_default();
1051 if count <= 1 || !emitted.insert(component.kind.clone()) {
1052 continue;
1053 }
1054
1055 let allows_multiple = lookup_plugin(&component.kind)
1056 .map(|plugin| plugin.allows_multiple_components())
1057 .unwrap_or(true);
1058 if !allows_multiple {
1059 report.diagnostics.push(ConfigDiagnostic {
1060 level: DiagnosticLevel::Error,
1061 code: "plugin.duplicate_component".to_string(),
1062 component: Some(component.kind.clone()),
1063 field: None,
1064 message: format!(
1065 "plugin component kind '{}' may only appear once",
1066 component.kind
1067 ),
1068 });
1069 }
1070 }
1071}
1072
1073fn push_policy_diag(
1074 diagnostics: &mut Vec<ConfigDiagnostic>,
1075 behavior: UnsupportedBehavior,
1076 code: &str,
1077 component: Option<String>,
1078 field: Option<String>,
1079 message: String,
1080) {
1081 let level = match behavior {
1082 UnsupportedBehavior::Ignore => return,
1083 UnsupportedBehavior::Warn => DiagnosticLevel::Warning,
1084 UnsupportedBehavior::Error => DiagnosticLevel::Error,
1085 };
1086
1087 diagnostics.push(ConfigDiagnostic {
1088 level,
1089 code: code.to_string(),
1090 component,
1091 field,
1092 message,
1093 });
1094}
1095
1096fn join_error_messages(report: &ConfigReport) -> String {
1097 report
1098 .diagnostics
1099 .iter()
1100 .filter(|diag| diag.level == DiagnosticLevel::Error)
1101 .map(|diag| diag.message.as_str())
1102 .collect::<Vec<_>>()
1103 .join("; ")
1104}
1105
// Unit tests are kept in a separate file and compiled only for test builds.
#[cfg(test)]
#[path = "../tests/unit/plugin_tests.rs"]
mod tests;