1use std::collections::{HashMap, HashSet};
13use std::fmt;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock};
17
18use serde::{Deserialize, Serialize};
19use serde_json::{Map, Value as Json};
20use thiserror::Error;
21
22use crate::api::registry::{
23 deregister_llm_conditional_execution_guardrail, deregister_llm_execution_intercept,
24 deregister_llm_request_intercept, deregister_llm_sanitize_request_guardrail,
25 deregister_llm_sanitize_response_guardrail, deregister_llm_stream_execution_intercept,
26 deregister_tool_conditional_execution_guardrail, deregister_tool_execution_intercept,
27 deregister_tool_request_intercept, deregister_tool_sanitize_request_guardrail,
28 deregister_tool_sanitize_response_guardrail, register_llm_conditional_execution_guardrail,
29 register_llm_execution_intercept, register_llm_request_intercept,
30 register_llm_sanitize_request_guardrail, register_llm_sanitize_response_guardrail,
31 register_llm_stream_execution_intercept, register_tool_conditional_execution_guardrail,
32 register_tool_execution_intercept, register_tool_request_intercept,
33 register_tool_sanitize_request_guardrail, register_tool_sanitize_response_guardrail,
34};
35use crate::api::runtime::{
36 EventSubscriberFn, LlmConditionalFn, LlmExecutionFn, LlmRequestInterceptFn,
37 LlmSanitizeRequestFn, LlmSanitizeResponseFn, LlmStreamExecutionFn, ToolConditionalFn,
38 ToolExecutionFn, ToolInterceptFn, ToolSanitizeFn,
39};
40use crate::api::subscriber::{deregister_subscriber, register_subscriber};
41
/// Plugin handlers keyed by the string returned from [`Plugin::plugin_kind`].
type PluginMap = HashMap<String, Arc<dyn Plugin>>;

/// Process-wide table of registered plugin handlers.
static PLUGIN_HANDLERS: LazyLock<RwLock<PluginMap>> = LazyLock::new(|| RwLock::new(HashMap::new()));
/// The plugin configuration that is currently applied, or `None` when nothing
/// has been initialized (or the configuration was cleared).
static ACTIVE_PLUGIN_CONFIGURATION: LazyLock<Mutex<Option<ActivePluginConfiguration>>> =
    LazyLock::new(|| Mutex::new(None));
/// Cached outcome of the one-time built-in plugin registration performed by
/// [`ensure_builtin_plugins_registered`].
static BUILTIN_PLUGIN_REGISTRATION: OnceLock<Result<()>> = OnceLock::new();
48
/// Errors produced by the plugin registry and the plugin configuration lifecycle.
#[derive(Debug, Error)]
pub enum PluginError {
    /// The supplied configuration failed validation; carries the joined error messages.
    #[error("invalid config: {0}")]
    InvalidConfig(String),

    /// A referenced item (for example a plugin component kind) is not registered.
    #[error("not found: {0}")]
    NotFound(String),

    /// A `serde_json` (de)serialization failure.
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// An internal failure such as a poisoned lock.
    #[error("internal error: {0}")]
    Internal(String),

    /// Registering or deregistering a plugin or one of its callbacks failed.
    #[error("registration failed: {0}")]
    RegistrationFailed(String),
}
72
/// Result alias whose error type is [`PluginError`].
pub type Result<T> = std::result::Result<T, PluginError>;
75
/// Top-level plugin configuration document.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct PluginConfig {
    /// Configuration format version; defaults to `1`, the only version that
    /// `validate_plugin_config` accepts without a diagnostic.
    #[serde(default = "default_plugin_config_version")]
    pub version: u32,
    /// Plugin components to validate and register, in declaration order.
    #[serde(default)]
    pub components: Vec<PluginComponentSpec>,
    /// How unsupported configuration content is reported.
    #[serde(default)]
    pub policy: ConfigPolicy,
}
90
91impl Default for PluginConfig {
92 fn default() -> Self {
93 Self {
94 version: default_plugin_config_version(),
95 components: vec![],
96 policy: ConfigPolicy::default(),
97 }
98 }
99}
100
/// One plugin component entry inside a [`PluginConfig`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct PluginComponentSpec {
    /// Plugin kind used to look up the registered [`Plugin`] handler.
    pub kind: String,
    /// Whether the component is registered during initialization; defaults to
    /// `true`. Disabled components are still validated.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
    /// Plugin-specific settings passed to [`Plugin::validate`] and [`Plugin::register`].
    #[serde(default)]
    pub config: Map<String, Json>,
}
117
118impl PluginComponentSpec {
119 pub fn new(kind: impl Into<String>) -> Self {
121 Self {
122 kind: kind.into(),
123 enabled: true,
124 config: Map::new(),
125 }
126 }
127}
128
/// Diagnostics collected while validating a [`PluginConfig`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ConfigReport {
    /// Every warning and error produced during validation, in emission order.
    #[serde(default)]
    pub diagnostics: Vec<ConfigDiagnostic>,
}
137
138impl ConfigReport {
139 pub fn has_errors(&self) -> bool {
141 self.diagnostics
142 .iter()
143 .any(|diag| diag.level == DiagnosticLevel::Error)
144 }
145}
146
/// A single validation finding.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ConfigDiagnostic {
    /// Severity of the finding.
    pub level: DiagnosticLevel,
    /// Machine-readable identifier, e.g. `plugin.unknown_component`.
    pub code: String,
    /// Component kind the finding refers to, when applicable; omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    /// Configuration field the finding refers to, when applicable; omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub field: Option<String>,
    /// Human-readable description of the finding.
    pub message: String,
}
164
/// Severity of a [`ConfigDiagnostic`]; serialized in lowercase.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum DiagnosticLevel {
    /// Reported, but does not make `ConfigReport::has_errors` return `true`.
    Warning,
    /// Makes `ConfigReport::has_errors` return `true`.
    Error,
}
175
/// Controls how unsupported configuration content is reported during validation.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ConfigPolicy {
    /// Behavior for a component whose kind has no registered plugin; defaults to warn.
    #[serde(default = "default_warn")]
    pub unknown_component: UnsupportedBehavior,
    /// Behavior for an unknown field; defaults to warn. This file does not read
    /// it directly — presumably plugins consult it; confirm against callers.
    #[serde(default = "default_warn")]
    pub unknown_field: UnsupportedBehavior,
    /// Behavior for an unsupported value (such as an unsupported config
    /// version); defaults to error.
    #[serde(default = "default_error")]
    pub unsupported_value: UnsupportedBehavior,
}
190
191impl Default for ConfigPolicy {
192 fn default() -> Self {
193 Self {
194 unknown_component: default_warn(),
195 unknown_field: default_warn(),
196 unsupported_value: default_error(),
197 }
198 }
199}
200
// Describes each `ConfigPolicy` field (label, kind, accepted values) through the
// crate's `editor_config!` macro.
// NOTE(review): the macro is defined elsewhere; presumably the value lists must
// stay in sync with the lowercase serde names of `UnsupportedBehavior` — confirm.
crate::editor_config! {
    impl ConfigPolicy {
        unknown_component => {
            label: "unknown_component",
            kind: Enum,
            values: ["warn", "ignore", "error"],
        },
        unknown_field => {
            label: "unknown_field",
            kind: Enum,
            values: ["warn", "ignore", "error"],
        },
        unsupported_value => {
            label: "unsupported_value",
            kind: Enum,
            values: ["warn", "ignore", "error"],
        },
    }
}
220
/// What to do when unsupported configuration content is encountered;
/// serialized in lowercase.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum UnsupportedBehavior {
    /// Emit no diagnostic.
    Ignore,
    /// Emit a warning diagnostic (the `Default` value).
    #[default]
    Warn,
    /// Emit an error diagnostic.
    Error,
}
234
235fn default_warn() -> UnsupportedBehavior {
236 UnsupportedBehavior::Warn
237}
238
239fn default_error() -> UnsupportedBehavior {
240 UnsupportedBehavior::Error
241}
242
/// Serde default for `PluginConfig::version`.
fn default_plugin_config_version() -> u32 {
    const DEFAULT_VERSION: u32 = 1;
    DEFAULT_VERSION
}
246
/// Serde default for `PluginComponentSpec::enabled`: components are enabled
/// unless the configuration says otherwise.
fn default_enabled() -> bool {
    const ENABLED_WHEN_OMITTED: bool = true;
    ENABLED_WHEN_OMITTED
}
250
/// A record of something a plugin registered, together with the action that
/// undoes it.
pub struct PluginRegistration {
    /// Category label of the registration.
    pub kind: String,
    /// Name under which the item was registered.
    pub name: String,
    /// Teardown callback invoked by `rollback_registrations`.
    deregister: Box<dyn FnMut() -> Result<()> + Send>,
}
259
260impl fmt::Debug for PluginRegistration {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 f.debug_struct("PluginRegistration")
263 .field("kind", &self.kind)
264 .field("name", &self.name)
265 .finish_non_exhaustive()
266 }
267}
268
269impl PluginRegistration {
270 pub fn new(
272 kind: impl Into<String>,
273 name: impl Into<String>,
274 deregister: Box<dyn FnMut() -> Result<()> + Send>,
275 ) -> Self {
276 Self {
277 kind: kind.into(),
278 name: name.into(),
279 deregister,
280 }
281 }
282}
283
/// Collects the registrations a plugin makes so they can be rolled back later.
#[derive(Default)]
pub struct PluginRegistrationContext {
    /// Registrations recorded so far, in registration order.
    registrations: Vec<PluginRegistration>,
    /// Optional prefix prepended to every name registered through this context.
    namespace: Option<String>,
}
294
295impl PluginRegistrationContext {
296 pub fn new() -> Self {
298 Self::default()
299 }
300
301 pub fn with_namespace(namespace: impl Into<String>) -> Self {
303 Self {
304 registrations: vec![],
305 namespace: Some(namespace.into()),
306 }
307 }
308
309 pub fn qualify_name(&self, name: &str) -> String {
315 match &self.namespace {
316 Some(namespace) => format!("{namespace}{name}"),
317 None => name.to_string(),
318 }
319 }
320
321 pub fn register_subscriber(&mut self, name: &str, callback: EventSubscriberFn) -> Result<()> {
323 let qualified_name = self.qualify_name(name);
324 register_subscriber(&qualified_name, callback)
325 .map_err(|err| PluginError::RegistrationFailed(format!("subscriber: {err}")))?;
326
327 let name_owned = qualified_name;
328 self.registrations.push(PluginRegistration::new(
329 "plugin",
330 name_owned.clone(),
331 Box::new(move || {
332 deregister_subscriber(&name_owned)
333 .map(|_| ())
334 .map_err(|err| {
335 PluginError::RegistrationFailed(format!(
336 "subscriber deregistration failed: {err}"
337 ))
338 })
339 }),
340 ));
341 Ok(())
342 }
343
344 pub fn register_llm_request_intercept(
346 &mut self,
347 name: &str,
348 priority: i32,
349 break_chain: bool,
350 callback: LlmRequestInterceptFn,
351 ) -> Result<()> {
352 let qualified_name = self.qualify_name(name);
353 register_llm_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
354 |err| PluginError::RegistrationFailed(format!("llm request intercept: {err}")),
355 )?;
356
357 let name_owned = qualified_name;
358 self.registrations.push(PluginRegistration::new(
359 "plugin",
360 name_owned.clone(),
361 Box::new(move || {
362 deregister_llm_request_intercept(&name_owned)
363 .map(|_| ())
364 .map_err(|err| {
365 PluginError::RegistrationFailed(format!(
366 "llm request intercept deregistration failed: {err}"
367 ))
368 })
369 }),
370 ));
371 Ok(())
372 }
373
374 pub fn register_tool_sanitize_request_guardrail(
376 &mut self,
377 name: &str,
378 priority: i32,
379 callback: ToolSanitizeFn,
380 ) -> Result<()> {
381 let qualified_name = self.qualify_name(name);
382 register_tool_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
383 |err| {
384 PluginError::RegistrationFailed(format!("tool sanitize request guardrail: {err}"))
385 },
386 )?;
387
388 let name_owned = qualified_name;
389 self.registrations.push(PluginRegistration::new(
390 "plugin",
391 name_owned.clone(),
392 Box::new(move || {
393 deregister_tool_sanitize_request_guardrail(&name_owned)
394 .map(|_| ())
395 .map_err(|err| {
396 PluginError::RegistrationFailed(format!(
397 "tool sanitize request guardrail deregistration failed: {err}"
398 ))
399 })
400 }),
401 ));
402 Ok(())
403 }
404
405 pub fn register_tool_sanitize_response_guardrail(
407 &mut self,
408 name: &str,
409 priority: i32,
410 callback: ToolSanitizeFn,
411 ) -> Result<()> {
412 let qualified_name = self.qualify_name(name);
413 register_tool_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
414 |err| {
415 PluginError::RegistrationFailed(format!("tool sanitize response guardrail: {err}"))
416 },
417 )?;
418
419 let name_owned = qualified_name;
420 self.registrations.push(PluginRegistration::new(
421 "plugin",
422 name_owned.clone(),
423 Box::new(move || {
424 deregister_tool_sanitize_response_guardrail(&name_owned)
425 .map(|_| ())
426 .map_err(|err| {
427 PluginError::RegistrationFailed(format!(
428 "tool sanitize response guardrail deregistration failed: {err}"
429 ))
430 })
431 }),
432 ));
433 Ok(())
434 }
435
436 pub fn register_tool_conditional_execution_guardrail(
438 &mut self,
439 name: &str,
440 priority: i32,
441 callback: ToolConditionalFn,
442 ) -> Result<()> {
443 let qualified_name = self.qualify_name(name);
444 register_tool_conditional_execution_guardrail(&qualified_name, priority, callback)
445 .map_err(|err| {
446 PluginError::RegistrationFailed(format!(
447 "tool conditional execution guardrail: {err}"
448 ))
449 })?;
450
451 let name_owned = qualified_name;
452 self.registrations.push(PluginRegistration::new(
453 "plugin",
454 name_owned.clone(),
455 Box::new(move || {
456 deregister_tool_conditional_execution_guardrail(&name_owned)
457 .map(|_| ())
458 .map_err(|err| {
459 PluginError::RegistrationFailed(format!(
460 "tool conditional execution guardrail deregistration failed: {err}"
461 ))
462 })
463 }),
464 ));
465 Ok(())
466 }
467
468 pub fn register_llm_sanitize_request_guardrail(
470 &mut self,
471 name: &str,
472 priority: i32,
473 callback: LlmSanitizeRequestFn,
474 ) -> Result<()> {
475 let qualified_name = self.qualify_name(name);
476 register_llm_sanitize_request_guardrail(&qualified_name, priority, callback).map_err(
477 |err| PluginError::RegistrationFailed(format!("llm sanitize request guardrail: {err}")),
478 )?;
479
480 let name_owned = qualified_name;
481 self.registrations.push(PluginRegistration::new(
482 "plugin",
483 name_owned.clone(),
484 Box::new(move || {
485 deregister_llm_sanitize_request_guardrail(&name_owned)
486 .map(|_| ())
487 .map_err(|err| {
488 PluginError::RegistrationFailed(format!(
489 "llm sanitize request guardrail deregistration failed: {err}"
490 ))
491 })
492 }),
493 ));
494 Ok(())
495 }
496
497 pub fn register_llm_sanitize_response_guardrail(
499 &mut self,
500 name: &str,
501 priority: i32,
502 callback: LlmSanitizeResponseFn,
503 ) -> Result<()> {
504 let qualified_name = self.qualify_name(name);
505 register_llm_sanitize_response_guardrail(&qualified_name, priority, callback).map_err(
506 |err| {
507 PluginError::RegistrationFailed(format!("llm sanitize response guardrail: {err}"))
508 },
509 )?;
510
511 let name_owned = qualified_name;
512 self.registrations.push(PluginRegistration::new(
513 "plugin",
514 name_owned.clone(),
515 Box::new(move || {
516 deregister_llm_sanitize_response_guardrail(&name_owned)
517 .map(|_| ())
518 .map_err(|err| {
519 PluginError::RegistrationFailed(format!(
520 "llm sanitize response guardrail deregistration failed: {err}"
521 ))
522 })
523 }),
524 ));
525 Ok(())
526 }
527
528 pub fn register_llm_conditional_execution_guardrail(
530 &mut self,
531 name: &str,
532 priority: i32,
533 callback: LlmConditionalFn,
534 ) -> Result<()> {
535 let qualified_name = self.qualify_name(name);
536 register_llm_conditional_execution_guardrail(&qualified_name, priority, callback).map_err(
537 |err| {
538 PluginError::RegistrationFailed(format!(
539 "llm conditional execution guardrail: {err}"
540 ))
541 },
542 )?;
543
544 let name_owned = qualified_name;
545 self.registrations.push(PluginRegistration::new(
546 "plugin",
547 name_owned.clone(),
548 Box::new(move || {
549 deregister_llm_conditional_execution_guardrail(&name_owned)
550 .map(|_| ())
551 .map_err(|err| {
552 PluginError::RegistrationFailed(format!(
553 "llm conditional execution guardrail deregistration failed: {err}"
554 ))
555 })
556 }),
557 ));
558 Ok(())
559 }
560
561 pub fn register_llm_execution_intercept(
563 &mut self,
564 name: &str,
565 priority: i32,
566 callback: LlmExecutionFn,
567 ) -> Result<()> {
568 let qualified_name = self.qualify_name(name);
569 register_llm_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
570 PluginError::RegistrationFailed(format!("llm execution intercept: {err}"))
571 })?;
572
573 let name_owned = qualified_name;
574 self.registrations.push(PluginRegistration::new(
575 "plugin",
576 name_owned.clone(),
577 Box::new(move || {
578 deregister_llm_execution_intercept(&name_owned)
579 .map(|_| ())
580 .map_err(|err| {
581 PluginError::RegistrationFailed(format!(
582 "llm execution intercept deregistration failed: {err}"
583 ))
584 })
585 }),
586 ));
587 Ok(())
588 }
589
590 pub fn register_llm_stream_execution_intercept(
592 &mut self,
593 name: &str,
594 priority: i32,
595 callback: LlmStreamExecutionFn,
596 ) -> Result<()> {
597 let qualified_name = self.qualify_name(name);
598 register_llm_stream_execution_intercept(&qualified_name, priority, callback).map_err(
599 |err| PluginError::RegistrationFailed(format!("llm stream execution intercept: {err}")),
600 )?;
601
602 let name_owned = qualified_name;
603 self.registrations.push(PluginRegistration::new(
604 "plugin",
605 name_owned.clone(),
606 Box::new(move || {
607 deregister_llm_stream_execution_intercept(&name_owned)
608 .map(|_| ())
609 .map_err(|err| {
610 PluginError::RegistrationFailed(format!(
611 "llm stream execution intercept deregistration failed: {err}"
612 ))
613 })
614 }),
615 ));
616 Ok(())
617 }
618
619 pub fn register_tool_request_intercept(
621 &mut self,
622 name: &str,
623 priority: i32,
624 break_chain: bool,
625 callback: ToolInterceptFn,
626 ) -> Result<()> {
627 let qualified_name = self.qualify_name(name);
628 register_tool_request_intercept(&qualified_name, priority, break_chain, callback).map_err(
629 |err| PluginError::RegistrationFailed(format!("tool request intercept: {err}")),
630 )?;
631
632 let name_owned = qualified_name;
633 self.registrations.push(PluginRegistration::new(
634 "plugin",
635 name_owned.clone(),
636 Box::new(move || {
637 deregister_tool_request_intercept(&name_owned)
638 .map(|_| ())
639 .map_err(|err| {
640 PluginError::RegistrationFailed(format!(
641 "tool request intercept deregistration failed: {err}"
642 ))
643 })
644 }),
645 ));
646 Ok(())
647 }
648
649 pub fn register_tool_execution_intercept(
651 &mut self,
652 name: &str,
653 priority: i32,
654 callback: ToolExecutionFn,
655 ) -> Result<()> {
656 let qualified_name = self.qualify_name(name);
657 register_tool_execution_intercept(&qualified_name, priority, callback).map_err(|err| {
658 PluginError::RegistrationFailed(format!("tool execution intercept: {err}"))
659 })?;
660
661 let name_owned = qualified_name;
662 self.registrations.push(PluginRegistration::new(
663 "plugin",
664 name_owned.clone(),
665 Box::new(move || {
666 deregister_tool_execution_intercept(&name_owned)
667 .map(|_| ())
668 .map_err(|err| {
669 PluginError::RegistrationFailed(format!(
670 "tool execution intercept deregistration failed: {err}"
671 ))
672 })
673 }),
674 ));
675 Ok(())
676 }
677
678 pub fn add_registration(&mut self, registration: PluginRegistration) {
680 self.registrations.push(registration);
681 }
682
683 pub fn extend_registrations(&mut self, registrations: Vec<PluginRegistration>) {
685 self.registrations.extend(registrations);
686 }
687
688 pub fn into_registrations(self) -> Vec<PluginRegistration> {
690 self.registrations
691 }
692}
693
/// A plugin handler that can validate and register components of one kind.
pub trait Plugin: Send + Sync + 'static {
    /// The component kind this plugin handles; used as the registry key.
    fn plugin_kind(&self) -> &str;

    /// Whether a configuration may contain more than one component of this
    /// kind. Defaults to `true`; when `false`, validation reports a
    /// `plugin.duplicate_component` error for repeated kinds.
    fn allows_multiple_components(&self) -> bool {
        true
    }

    /// Validates one component's settings and returns any diagnostics found.
    fn validate(&self, plugin_config: &Map<String, Json>) -> Vec<ConfigDiagnostic>;

    /// Registers the component's callbacks through `ctx`.
    ///
    /// Returns a boxed future; when it resolves to `Err`, the caller rolls back
    /// whatever was already recorded in `ctx`.
    fn register<'a>(
        &'a self,
        plugin_config: &Map<String, Json>,
        ctx: &'a mut PluginRegistrationContext,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}
724
725pub fn register_plugin(plugin: Arc<dyn Plugin>) -> Result<()> {
747 let mut guard = PLUGIN_HANDLERS
748 .write()
749 .map_err(|err| PluginError::Internal(format!("plugin registry lock poisoned: {err}")))?;
750 let plugin_kind = plugin.plugin_kind().to_string();
751 if guard.contains_key(&plugin_kind) {
752 return Err(PluginError::RegistrationFailed(format!(
753 "plugin '{plugin_kind}' is already registered"
754 )));
755 }
756 guard.insert(plugin_kind, plugin);
757 Ok(())
758}
759
760pub fn ensure_builtin_plugins_registered() -> Result<()> {
765 match BUILTIN_PLUGIN_REGISTRATION
766 .get_or_init(crate::observability::plugin_component::register_observability_component)
767 {
768 Ok(()) => Ok(()),
769 Err(err) => Err(clone_cached_plugin_error(err)),
770 }
771}
772
773fn clone_cached_plugin_error(err: &PluginError) -> PluginError {
774 match err {
775 PluginError::InvalidConfig(message) => PluginError::InvalidConfig(message.clone()),
776 PluginError::NotFound(message) => PluginError::NotFound(message.clone()),
777 PluginError::Serialization(err) => PluginError::Internal(err.to_string()),
778 PluginError::Internal(message) => PluginError::Internal(message.clone()),
779 PluginError::RegistrationFailed(message) => {
780 PluginError::RegistrationFailed(message.clone())
781 }
782 }
783}
784
785pub fn deregister_plugin(plugin_kind: &str) -> bool {
801 PLUGIN_HANDLERS
802 .write()
803 .ok()
804 .and_then(|mut guard| guard.remove(plugin_kind))
805 .is_some()
806}
807
808pub fn list_plugin_kinds() -> Vec<String> {
820 let _ = ensure_builtin_plugins_registered();
821 let mut kinds = PLUGIN_HANDLERS
822 .read()
823 .map(|guard| guard.keys().cloned().collect::<Vec<_>>())
824 .unwrap_or_default();
825 kinds.sort();
826 kinds
827}
828
829pub fn lookup_plugin(plugin_kind: &str) -> Option<Arc<dyn Plugin>> {
841 let _ = ensure_builtin_plugins_registered();
842 PLUGIN_HANDLERS
843 .read()
844 .ok()
845 .and_then(|guard| guard.get(plugin_kind).cloned())
846}
847
848pub fn validate_plugin_config(config: &PluginConfig) -> ConfigReport {
864 let _ = ensure_builtin_plugins_registered();
865 let mut report = ConfigReport::default();
866
867 if config.version != 1 {
868 push_policy_diag(
869 &mut report.diagnostics,
870 config.policy.unsupported_value,
871 "plugin.unsupported_config_version",
872 None,
873 Some("version".to_string()),
874 format!("plugin config version {} is unsupported", config.version),
875 );
876 }
877
878 validate_plugin_multiplicity(&mut report, config);
879
880 for component in &config.components {
881 let Some(plugin) = lookup_plugin(&component.kind) else {
882 push_policy_diag(
883 &mut report.diagnostics,
884 config.policy.unknown_component,
885 "plugin.unknown_component",
886 Some(component.kind.clone()),
887 None,
888 format!("plugin component kind '{}' is unsupported", component.kind),
889 );
890 continue;
891 };
892 report
893 .diagnostics
894 .extend(plugin.validate(&component.config));
895 }
896
897 report
898}
899
/// Returns the JSON Schema of [`PluginConfig`] as a JSON value.
///
/// # Panics
/// Panics if the generated schema cannot be converted to JSON.
#[cfg(feature = "schema")]
pub fn plugin_config_schema() -> Json {
    let schema = schemars::schema_for!(PluginConfig);
    serde_json::to_value(schema).expect("plugin config schema should serialize")
}
906
907pub async fn initialize_plugins(config: PluginConfig) -> Result<ConfigReport> {
928 let report = validate_plugin_config(&config);
929 if report.has_errors() {
930 return Err(PluginError::InvalidConfig(join_error_messages(&report)));
931 }
932
933 let previous = {
934 let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
935 PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
936 })?;
937 guard.take()
938 };
939
940 if let Some(mut previous_state) = previous {
941 rollback_registrations(&mut previous_state.registrations);
942 match initialize_plugin_components(&config).await {
943 Ok(registrations) => {
944 store_active_plugin_configuration(config, report.clone(), registrations)?;
945 Ok(report)
946 }
947 Err(err) => match initialize_plugin_components(&previous_state.config).await {
948 Ok(registrations) => {
949 let previous_report = validate_plugin_config(&previous_state.config);
950 store_active_plugin_configuration(
951 previous_state.config,
952 previous_report,
953 registrations,
954 )?;
955 Err(err)
956 }
957 Err(restore_err) => Err(PluginError::RegistrationFailed(format!(
958 "{err}; previous plugin configuration could not be restored: {restore_err}"
959 ))),
960 },
961 }
962 } else {
963 let registrations = initialize_plugin_components(&config).await?;
964 store_active_plugin_configuration(config, report.clone(), registrations)?;
965 Ok(report)
966 }
967}
968
969pub fn clear_plugin_configuration() -> Result<()> {
985 let previous = {
986 let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
987 PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
988 })?;
989 guard.take()
990 };
991 if let Some(mut previous_state) = previous {
992 rollback_registrations(&mut previous_state.registrations);
993 }
994 Ok(())
995}
996
997pub fn active_plugin_report() -> Option<ConfigReport> {
1009 ACTIVE_PLUGIN_CONFIGURATION
1010 .lock()
1011 .ok()
1012 .and_then(|guard| guard.as_ref().map(|state| state.report.clone()))
1013}
1014
1015pub fn rollback_registrations(registrations: &mut Vec<PluginRegistration>) {
1020 for registration in registrations.iter_mut().rev() {
1021 let _ = (registration.deregister)();
1022 }
1023 registrations.clear();
1024}
1025
/// Everything remembered about the currently applied plugin configuration.
struct ActivePluginConfiguration {
    /// The configuration that was applied; re-used to restore it after a failed replacement.
    config: PluginConfig,
    /// Validation report stored alongside `config`.
    report: ConfigReport,
    /// Registrations made while applying `config`, kept for rollback.
    registrations: Vec<PluginRegistration>,
}
1031
1032async fn initialize_plugin_components(config: &PluginConfig) -> Result<Vec<PluginRegistration>> {
1033 ensure_builtin_plugins_registered()?;
1034 let totals = plugin_component_totals(config);
1035 let mut ordinals: HashMap<&str, usize> = HashMap::new();
1036 let mut registrations = vec![];
1037
1038 for component in config
1039 .components
1040 .iter()
1041 .filter(|component| component.enabled)
1042 {
1043 let Some(plugin) = lookup_plugin(&component.kind) else {
1044 rollback_registrations(&mut registrations);
1045 return Err(PluginError::NotFound(format!(
1046 "plugin component '{}' is not registered",
1047 component.kind
1048 )));
1049 };
1050
1051 let ordinal = ordinals
1052 .entry(component.kind.as_str())
1053 .and_modify(|value| *value += 1)
1054 .or_insert(1);
1055 let namespace = component_namespace(
1056 &component.kind,
1057 *ordinal,
1058 totals.get(component.kind.as_str()).copied().unwrap_or(1),
1059 );
1060
1061 let mut ctx = PluginRegistrationContext::with_namespace(namespace);
1062 if let Err(err) = plugin.register(&component.config, &mut ctx).await {
1063 let mut just_registered = ctx.into_registrations();
1064 rollback_registrations(&mut just_registered);
1065 rollback_registrations(&mut registrations);
1066 return Err(err);
1067 }
1068 registrations.extend(ctx.into_registrations());
1069 }
1070
1071 Ok(registrations)
1072}
1073
1074fn store_active_plugin_configuration(
1075 config: PluginConfig,
1076 report: ConfigReport,
1077 registrations: Vec<PluginRegistration>,
1078) -> Result<()> {
1079 let mut guard = ACTIVE_PLUGIN_CONFIGURATION.lock().map_err(|err| {
1080 PluginError::Internal(format!("active plugin configuration lock poisoned: {err}"))
1081 })?;
1082 *guard = Some(ActivePluginConfiguration {
1083 config,
1084 report,
1085 registrations,
1086 });
1087 Ok(())
1088}
1089
1090fn plugin_component_totals(config: &PluginConfig) -> HashMap<&str, usize> {
1091 let mut totals = HashMap::new();
1092 for component in &config.components {
1093 *totals.entry(component.kind.as_str()).or_insert(0) += 1;
1094 }
1095 totals
1096}
1097
/// Builds the name prefix for a component's registrations.
///
/// The ordinal is included only when more than one component of `kind` exists
/// (`total > 1`), so a lone component gets the shorter prefix.
fn component_namespace(kind: &str, ordinal: usize, total: usize) -> String {
    match total {
        0 | 1 => format!("__nemo_flow_plugin__{kind}__"),
        _ => format!("__nemo_flow_plugin__{kind}__{ordinal}__"),
    }
}
1105
1106fn validate_plugin_multiplicity(report: &mut ConfigReport, config: &PluginConfig) {
1107 let totals = plugin_component_totals(config);
1108 let mut emitted = HashSet::new();
1109
1110 for component in &config.components {
1111 let count = totals
1112 .get(component.kind.as_str())
1113 .copied()
1114 .unwrap_or_default();
1115 if count <= 1 || !emitted.insert(component.kind.clone()) {
1116 continue;
1117 }
1118
1119 let allows_multiple = lookup_plugin(&component.kind)
1120 .map(|plugin| plugin.allows_multiple_components())
1121 .unwrap_or(true);
1122 if !allows_multiple {
1123 report.diagnostics.push(ConfigDiagnostic {
1124 level: DiagnosticLevel::Error,
1125 code: "plugin.duplicate_component".to_string(),
1126 component: Some(component.kind.clone()),
1127 field: None,
1128 message: format!(
1129 "plugin component kind '{}' may only appear once",
1130 component.kind
1131 ),
1132 });
1133 }
1134 }
1135}
1136
1137fn push_policy_diag(
1138 diagnostics: &mut Vec<ConfigDiagnostic>,
1139 behavior: UnsupportedBehavior,
1140 code: &str,
1141 component: Option<String>,
1142 field: Option<String>,
1143 message: String,
1144) {
1145 let level = match behavior {
1146 UnsupportedBehavior::Ignore => return,
1147 UnsupportedBehavior::Warn => DiagnosticLevel::Warning,
1148 UnsupportedBehavior::Error => DiagnosticLevel::Error,
1149 };
1150
1151 diagnostics.push(ConfigDiagnostic {
1152 level,
1153 code: code.to_string(),
1154 component,
1155 field,
1156 message,
1157 });
1158}
1159
1160fn join_error_messages(report: &ConfigReport) -> String {
1161 report
1162 .diagnostics
1163 .iter()
1164 .filter(|diag| diag.level == DiagnosticLevel::Error)
1165 .map(|diag| diag.message.as_str())
1166 .collect::<Vec<_>>()
1167 .join("; ")
1168}
1169
1170#[cfg(test)]
1171#[path = "../tests/unit/plugin_tests.rs"]
1172mod tests;