use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Instant;
use tokio::task::JoinSet;
use crate::core::{
system_events, Event, EventBus, Plugin, PluginContext, PluginError, PluginOutput,
PluginRegistry, PluginResult, PluginSystemConfig, RetryConfig, SandboxManager, SecurityManager,
};
use std::sync::Arc;
/// Runs registered plugins, either individually or as a dependency-ordered pipeline.
///
/// For each plugin the executor validates and grants permissions, creates a sandbox,
/// drives the plugin's lifecycle hooks and publishes lifecycle events on the event bus.
pub struct PluginExecutor {
// Source of plugins; a plugin is taken out of the registry while it runs.
registry: PluginRegistry,
// Workspace path handed to every plugin context.
workspace: PathBuf,
// Validates and grants permissions and creates per-plugin security contexts.
security_manager: SecurityManager,
// Creates a sandbox before a plugin runs and removes it afterwards.
sandbox_manager: SandboxManager,
// System-wide settings plus per-plugin config, permissions and retry policy.
config: PluginSystemConfig,
// Shared bus used for pipeline-level and plugin-level events.
event_bus: Arc<EventBus>,
}
/// Summary returned by `PluginExecutor::execute_pipeline`.
#[derive(Debug)]
pub struct ExecutionResult {
/// `true` only when every executed batch completed without errors.
pub success: bool,
/// Output per plugin name; failed plugins get a failure output describing the error.
pub plugin_outputs: HashMap<String, PluginOutput>,
/// Batches of plugin names as resolved by the registry, in execution order.
pub execution_order: Vec<Vec<String>>,
/// Time elapsed between the start of the pipeline run and the end of the last batch.
pub total_duration: std::time::Duration,
/// Names recorded for the errors of the batch that failed (empty on success).
pub failed_plugins: Vec<String>,
}
impl PluginExecutor {
pub fn new(registry: PluginRegistry, workspace: PathBuf) -> Self {
Self {
registry,
workspace,
security_manager: SecurityManager::new(),
sandbox_manager: SandboxManager::new(),
config: PluginSystemConfig::default(),
event_bus: Arc::new(EventBus::new()),
}
}
/// Creates an executor driven by `config`, with default security and sandbox managers.
///
/// The workspace is `config.system.workspace` when set; otherwise it falls back to
/// `pluggable-workspace` under the OS temporary directory.
pub fn with_config(registry: PluginRegistry, config: PluginSystemConfig) -> Self {
    let workspace = match &config.system.workspace {
        Some(configured) => configured.clone(),
        None => std::env::temp_dir().join("pluggable-workspace"),
    };
    let security_manager = SecurityManager::new();
    let sandbox_manager = SandboxManager::new();
    let event_bus = Arc::new(EventBus::new());

    Self {
        registry,
        workspace,
        security_manager,
        sandbox_manager,
        config,
        event_bus,
    }
}
pub fn with_security_manager(
registry: PluginRegistry,
workspace: PathBuf,
security_manager: SecurityManager,
) -> Self {
Self {
registry,
workspace,
security_manager,
sandbox_manager: SandboxManager::new(),
config: PluginSystemConfig::default(),
event_bus: Arc::new(EventBus::new()),
}
}
/// Creates an executor with caller-provided security and sandbox managers,
/// the default system configuration and a fresh event bus.
pub fn with_managers(
    registry: PluginRegistry,
    workspace: PathBuf,
    security_manager: SecurityManager,
    sandbox_manager: SandboxManager,
) -> Self {
    let config = PluginSystemConfig::default();
    let event_bus = Arc::new(EventBus::new());

    Self {
        registry,
        workspace,
        security_manager,
        sandbox_manager,
        config,
        event_bus,
    }
}
/// Creates an executor from an explicit configuration and explicit managers.
///
/// The workspace is `config.system.workspace` when set; otherwise it falls back to
/// `pluggable-workspace` under the OS temporary directory.
pub fn with_full_config(
    registry: PluginRegistry,
    config: PluginSystemConfig,
    security_manager: SecurityManager,
    sandbox_manager: SandboxManager,
) -> Self {
    let workspace = config.system.workspace.as_ref().map_or_else(
        || std::env::temp_dir().join("pluggable-workspace"),
        |configured| configured.clone(),
    );
    let event_bus = Arc::new(EventBus::new());

    Self {
        registry,
        workspace,
        security_manager,
        sandbox_manager,
        config,
        event_bus,
    }
}
pub async fn execute_pipeline(&mut self) -> PluginResult<ExecutionResult> {
let start_time = Instant::now();
if let Err(e) = self
.event_bus
.publish(Event::new(
system_events::PIPELINE_STARTED,
serde_json::json!({
"plugin_count": self.registry.len()
}),
))
.await
{
eprintln!("Warning: Failed to publish pipeline started event: {e}");
}
self.registry.validate_dependencies()?;
let execution_order = self.registry.resolve_execution_order()?;
let mut plugin_outputs = HashMap::new();
let mut failed_plugins = Vec::new();
let mut global_success = true;
for batch in &execution_order {
let batch_result = self.execute_batch(batch, &plugin_outputs).await;
match batch_result {
Ok(batch_outputs) => {
for (plugin_name, output) in batch_outputs {
plugin_outputs.insert(plugin_name, output);
}
}
Err(batch_errors) => {
global_success = false;
for (plugin_name, error) in batch_errors {
failed_plugins.push(plugin_name.clone());
plugin_outputs.insert(
plugin_name,
PluginOutput::failure(format!("Execution failed: {error}")),
);
}
break;
}
}
}
let total_duration = start_time.elapsed();
let pipeline_event = if global_success {
Event::new(
system_events::PIPELINE_COMPLETED,
serde_json::json!({
"success": true,
"duration_ms": total_duration.as_millis(),
"plugin_count": plugin_outputs.len(),
"failed_plugins": failed_plugins.len()
}),
)
} else {
Event::new(
system_events::PIPELINE_FAILED,
serde_json::json!({
"success": false,
"duration_ms": total_duration.as_millis(),
"plugin_count": plugin_outputs.len(),
"failed_plugins": failed_plugins.clone()
}),
)
.with_priority(crate::core::events::EventPriority::High)
};
if let Err(e) = self.event_bus.publish(pipeline_event).await {
eprintln!("Warning: Failed to publish pipeline completion event: {e}");
}
Ok(ExecutionResult {
success: global_success,
plugin_outputs,
execution_order,
total_duration,
failed_plugins,
})
}
/// Runs one batch of mutually independent plugins concurrently.
///
/// The batch is processed in waves of at most `max_parallel_plugins` plugins
/// (a configured limit of 0 is treated as 1), so every plugin in the batch is
/// executed rather than only the first `max_parallel_plugins`.
///
/// For each plugin this takes it from the registry, resolves its permissions
/// (a non-empty configured list overrides the plugin's own), validates and grants
/// them, creates a sandbox, builds the context with the outputs of its declared
/// dependencies and spawns the retrying execution task.
///
/// If setting up a plugin fails, the plugin is put back into the registry, no
/// further plugins are launched, and the tasks already running are still awaited
/// so their plugins return to the registry and their sandboxes are removed.
/// Waves after the first failing one are not started.
///
/// Returns the outputs of all plugins on success, or the map of errors keyed by
/// plugin name otherwise. A task that cannot be joined (for example after a
/// panic) is reported under the key `"unknown"`.
async fn execute_batch(
    &mut self,
    batch: &[String],
    previous_outputs: &HashMap<String, PluginOutput>,
) -> Result<HashMap<String, PluginOutput>, HashMap<String, PluginError>> {
    // `chunks` panics on a size of zero, so a misconfigured limit of 0 degrades
    // to sequential execution instead of running nothing.
    let wave_size = self.config.system.max_parallel_plugins.max(1);

    let mut batch_outputs = HashMap::new();
    let mut batch_errors = HashMap::new();

    for wave in batch.chunks(wave_size) {
        let mut join_set = JoinSet::new();

        for plugin_name in wave {
            let plugin = match self.registry.take_plugin(plugin_name) {
                Some(plugin) => plugin,
                None => {
                    batch_errors.insert(
                        plugin_name.clone(),
                        PluginError::PluginNotFound(plugin_name.clone()),
                    );
                    break;
                }
            };

            // Configured permissions win over the plugin's own declaration.
            let plugin_permissions = match self.config.plugins.get(plugin_name) {
                Some(plugin_config) if !plugin_config.permissions.is_empty() => {
                    plugin_config.permissions.clone()
                }
                _ => plugin.permissions(),
            };

            let authorized = match self
                .security_manager
                .validate_plugin_permissions(plugin_name, &plugin_permissions)
            {
                Ok(_) => {
                    self.security_manager
                        .grant_permissions(plugin_name, plugin_permissions);
                    self.security_manager.create_context(plugin_name)
                }
                Err(e) => Err(e),
            };
            let sandboxed = match authorized {
                Ok(security_context) => self
                    .sandbox_manager
                    .create_sandbox(plugin_name.clone(), security_context.clone())
                    .await
                    .map(|_| security_context),
                Err(e) => Err(e),
            };
            let security_context = match sandboxed {
                Ok(security_context) => security_context,
                Err(e) => {
                    // The plugin never started; hand it back so it is not lost.
                    if let Err(restore_error) = self.registry.put_plugin(plugin) {
                        eprintln!(
                            "Warning: Failed to return plugin '{plugin_name}' to the registry: {restore_error}"
                        );
                    }
                    batch_errors.insert(plugin_name.clone(), e);
                    break;
                }
            };

            let mut context = PluginContext::with_security_and_events(
                plugin_name.clone(),
                self.workspace.clone(),
                security_context,
                self.event_bus.clone(),
            );
            for dependency in &plugin.metadata().dependencies {
                if let Some(dep_output) = previous_outputs.get(dependency) {
                    context.add_dependency_output(dependency.clone(), dep_output.clone());
                }
            }

            let configured = self.config.plugins.get(plugin_name);
            let plugin_config = configured
                .map(|pc| pc.config.clone())
                .unwrap_or_else(|| serde_json::json!({}));
            let retry_config = configured
                .map(|pc| pc.retry.clone())
                .unwrap_or_default();

            join_set.spawn(async move {
                Self::execute_single_plugin_with_retry(
                    plugin,
                    plugin_config,
                    context,
                    retry_config,
                )
                .await
            });
        }

        // Always drain the wave, even after a setup failure, so running plugins
        // finish, return to the registry and have their sandboxes removed.
        while let Some(joined) = join_set.join_next().await {
            match joined {
                Ok((plugin_name, plugin_result, plugin_box)) => {
                    if let Err(e) = self.sandbox_manager.remove_sandbox(&plugin_name).await {
                        eprintln!("Warning: Failed to cleanup sandbox for '{plugin_name}': {e}");
                    }
                    if let Err(e) = self.registry.put_plugin(plugin_box) {
                        batch_errors.insert(plugin_name.clone(), e);
                        continue;
                    }
                    match plugin_result {
                        Ok(output) => {
                            batch_outputs.insert(plugin_name, output);
                        }
                        Err(error) => {
                            batch_errors.insert(plugin_name, error);
                        }
                    }
                }
                Err(join_error) => {
                    batch_errors.insert(
                        "unknown".to_string(),
                        PluginError::ExecutionFailed(format!("Task join error: {join_error}")),
                    );
                }
            }
        }

        if !batch_errors.is_empty() {
            break;
        }
    }

    if batch_errors.is_empty() {
        Ok(batch_outputs)
    } else {
        Err(batch_errors)
    }
}
/// Runs a plugin's full lifecycle, retrying on failure according to `retry_config`.
///
/// The plugin is always executed at least once, even when
/// `retry_config.max_attempts` is 0. After a failed attempt that is not the last
/// one, the task sleeps for `delay_ms * backoff_multiplier^attempt` milliseconds
/// (attempt counted from 0), capped at `max_delay_ms`. Every attempt receives its
/// own clone of `config` and `context`.
///
/// Returns the plugin name, the outcome of the last attempt and the plugin itself
/// so the caller can put it back into the registry.
async fn execute_single_plugin_with_retry(
    plugin: Box<dyn Plugin>,
    config: serde_json::Value,
    context: PluginContext,
    retry_config: RetryConfig,
) -> (String, PluginResult<PluginOutput>, Box<dyn Plugin>) {
    // A configured value of 0 would otherwise leave us with nothing to return.
    let max_attempts = retry_config.max_attempts.max(1);
    let mut current_plugin = plugin;
    let mut attempt = 0;

    loop {
        let (plugin_name, outcome, returned_plugin) =
            Self::execute_single_plugin(current_plugin, config.clone(), context.clone()).await;

        let is_last_attempt = attempt + 1 >= max_attempts;
        if outcome.is_ok() || is_last_attempt {
            return (plugin_name, outcome, returned_plugin);
        }
        current_plugin = returned_plugin;

        // Float-to-integer `as` casts saturate, so a huge backoff simply ends up
        // clamped to `max_delay_ms`.
        let delay = std::cmp::min(
            (retry_config.delay_ms as f64 * retry_config.backoff_multiplier.powi(attempt as i32))
                as u64,
            retry_config.max_delay_ms,
        );
        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
        attempt += 1;
    }
}
async fn execute_single_plugin(
mut plugin: Box<dyn Plugin>,
config: serde_json::Value,
mut context: PluginContext,
) -> (String, PluginResult<PluginOutput>, Box<dyn Plugin>) {
let plugin_name = plugin.metadata().name.clone();
let start_time = Instant::now();
if let Err(e) = context
.publish_event(system_events::plugin_execution_started(&plugin_name))
.await
{
eprintln!("Warning: Failed to publish execution started event: {e}");
}
let result = async {
plugin.before_initialize(&context).await?;
plugin.initialize(config, &context).await?;
plugin.after_initialize(&context).await?;
if let Err(e) = context
.publish_event(system_events::plugin_initialized(&plugin_name))
.await
{
eprintln!("Warning: Failed to publish initialized event: {e}");
}
plugin.before_execute(&context).await?;
let mut output = plugin.execute(&mut context).await?;
output.execution_time = start_time.elapsed();
plugin.after_execute(&context, &output).await?;
plugin.on_success(&context, &output).await?;
Ok(output)
}
.await;
let final_result = match &result {
Ok(_output) => {
if let Err(e) = context
.publish_event(system_events::plugin_execution_completed(
&plugin_name,
start_time.elapsed(),
))
.await
{
eprintln!("Warning: Failed to publish execution completed event: {e}");
}
result
}
Err(error) => {
if let Err(hook_error) = plugin.on_error(&context, error).await {
eprintln!("Warning: on_error hook failed for {plugin_name}: {hook_error}");
}
if let Err(e) = context
.publish_event(system_events::plugin_execution_failed(&plugin_name, error))
.await
{
eprintln!("Warning: Failed to publish execution failed event: {e}");
}
result
}
};
if let Err(e) = context
.publish_event(system_events::plugin_cleanup_started(&plugin_name))
.await
{
eprintln!("Warning: Failed to publish cleanup started event: {e}");
}
if let Err(hook_error) = plugin.before_cleanup(&context).await {
eprintln!("Warning: before_cleanup hook failed for {plugin_name}: {hook_error}");
}
let cleanup_result = plugin.cleanup(&context).await;
if cleanup_result.is_ok() {
if let Err(hook_error) = plugin.after_cleanup(&context).await {
eprintln!("Warning: after_cleanup hook failed for {plugin_name}: {hook_error}");
}
}
if let Err(e) = context
.publish_event(system_events::plugin_cleanup_completed(&plugin_name))
.await
{
eprintln!("Warning: Failed to publish cleanup completed event: {e}");
}
if let Err(cleanup_error) = cleanup_result {
if final_result.is_ok() {
return (
plugin_name,
Err(PluginError::CleanupFailed(cleanup_error.to_string())),
plugin,
);
}
eprintln!("Warning: Plugin cleanup failed: {cleanup_error}");
}
(plugin_name, final_result, plugin)
}
/// Executes a single registered plugin by name, outside of the pipeline.
///
/// Permissions are resolved the same way as in batch execution: a non-empty
/// permission list configured for the plugin overrides the permissions the plugin
/// declares itself. The permissions are validated and granted, a sandbox is
/// created, and the plugin is run with its configured settings and retry policy.
/// No dependency outputs are attached to the context. The sandbox is removed and
/// the plugin is put back into the registry afterwards.
///
/// # Errors
///
/// * `PluginError::PluginNotFound` when no plugin with that name is registered.
/// * The error from permission validation, security-context creation or sandbox
///   creation; in these cases the plugin is returned to the registry untouched.
/// * The error from returning the plugin to the registry after it ran.
/// * The plugin's own execution error after all retry attempts.
pub async fn execute_plugin(&mut self, plugin_name: &str) -> PluginResult<PluginOutput> {
    let plugin = self
        .registry
        .take_plugin(plugin_name)
        .ok_or_else(|| PluginError::PluginNotFound(plugin_name.to_string()))?;

    // Configured permissions win over the plugin's own declaration.
    let plugin_permissions = match self.config.plugins.get(plugin_name) {
        Some(plugin_config) if !plugin_config.permissions.is_empty() => {
            plugin_config.permissions.clone()
        }
        _ => plugin.permissions(),
    };

    let authorized = match self
        .security_manager
        .validate_plugin_permissions(plugin_name, &plugin_permissions)
    {
        Ok(_) => {
            self.security_manager
                .grant_permissions(plugin_name, plugin_permissions);
            self.security_manager.create_context(plugin_name)
        }
        Err(e) => Err(e),
    };
    let sandboxed = match authorized {
        Ok(security_context) => self
            .sandbox_manager
            .create_sandbox(plugin_name.to_string(), security_context.clone())
            .await
            .map(|_| security_context),
        Err(e) => Err(e),
    };
    let security_context = match sandboxed {
        Ok(security_context) => security_context,
        Err(e) => {
            // The plugin never ran; hand it back so a rejected call does not
            // silently unregister it.
            if let Err(restore_error) = self.registry.put_plugin(plugin) {
                eprintln!(
                    "Warning: Failed to return plugin '{plugin_name}' to the registry: {restore_error}"
                );
            }
            return Err(e);
        }
    };

    let context = PluginContext::with_security_and_events(
        plugin_name,
        self.workspace.clone(),
        security_context,
        self.event_bus.clone(),
    );

    let configured = self.config.plugins.get(plugin_name);
    let plugin_config = configured
        .map(|pc| pc.config.clone())
        .unwrap_or_else(|| serde_json::json!({}));
    let retry_config = configured
        .map(|pc| pc.retry.clone())
        .unwrap_or_default();

    let (_, result, plugin_box) =
        Self::execute_single_plugin_with_retry(plugin, plugin_config, context, retry_config)
            .await;

    if let Err(e) = self.sandbox_manager.remove_sandbox(plugin_name).await {
        eprintln!("Warning: Failed to cleanup sandbox for '{plugin_name}': {e}");
    }
    self.registry.put_plugin(plugin_box)?;
    result
}
/// Returns a shared reference to the plugin registry.
pub fn registry(&self) -> &PluginRegistry {
&self.registry
}
/// Returns a mutable reference to the plugin registry.
pub fn registry_mut(&mut self) -> &mut PluginRegistry {
&mut self.registry
}
/// Returns a mutable reference to the security manager, e.g. to add global
/// restrictions before plugins are executed.
pub fn security_manager_mut(&mut self) -> &mut SecurityManager {
&mut self.security_manager
}
/// Returns a shared reference to the sandbox manager.
pub fn sandbox_manager(&self) -> &SandboxManager {
&self.sandbox_manager
}
/// Returns a mutable reference to the sandbox manager.
pub fn sandbox_manager_mut(&mut self) -> &mut SandboxManager {
&mut self.sandbox_manager
}
/// Returns the shared event bus on which pipeline and plugin events are published.
pub fn event_bus(&self) -> &Arc<EventBus> {
&self.event_bus
}
}
// Unit tests for `PluginExecutor`: single-plugin runs, pipeline runs with
// dependencies and failures, permission handling, sandbox cleanup and
// configuration-driven execution.
#[cfg(test)]
mod tests {
use super::*;
use crate::core::{Permission, PluginMetadata, PluginRegistry, SecurityManager};
use async_trait::async_trait;
use serde_json::json;
use tempfile::TempDir;
// Plugin "plugin-a": declares no dependencies and bumps a shared counter on
// every execution.
struct TestPluginA {
metadata: PluginMetadata,
execution_count: std::sync::Arc<std::sync::Mutex<u32>>,
}
impl TestPluginA {
fn new(execution_count: std::sync::Arc<std::sync::Mutex<u32>>) -> Self {
let mut metadata = PluginMetadata::new("plugin-a", "1.0.0");
metadata.dependencies = vec![];
Self {
metadata,
execution_count,
}
}
}
#[async_trait]
impl Plugin for TestPluginA {
fn metadata(&self) -> &PluginMetadata {
&self.metadata
}
fn schema(&self) -> serde_json::Value {
json!({})
}
async fn initialize(
&mut self,
_config: serde_json::Value,
_context: &PluginContext,
) -> PluginResult<()> {
Ok(())
}
async fn execute(&mut self, _context: &mut PluginContext) -> PluginResult<PluginOutput> {
// The output carries the counter value after this execution.
let mut count = self.execution_count.lock().unwrap();
*count += 1;
Ok(PluginOutput::success(
json!({"plugin": "a", "count": *count}),
))
}
async fn cleanup(&mut self, _context: &PluginContext) -> PluginResult<()> {
Ok(())
}
}
// Plugin "plugin-b": depends on "plugin-a" and asserts that its output was
// made available through the context.
struct TestPluginB {
metadata: PluginMetadata,
execution_count: std::sync::Arc<std::sync::Mutex<u32>>,
}
impl TestPluginB {
fn new(execution_count: std::sync::Arc<std::sync::Mutex<u32>>) -> Self {
let mut metadata = PluginMetadata::new("plugin-b", "1.0.0");
metadata.dependencies = vec!["plugin-a".to_string()];
Self {
metadata,
execution_count,
}
}
}
#[async_trait]
impl Plugin for TestPluginB {
fn metadata(&self) -> &PluginMetadata {
&self.metadata
}
fn schema(&self) -> serde_json::Value {
json!({})
}
async fn initialize(
&mut self,
_config: serde_json::Value,
_context: &PluginContext,
) -> PluginResult<()> {
Ok(())
}
async fn execute(&mut self, context: &mut PluginContext) -> PluginResult<PluginOutput> {
// Fails the test if the executor did not forward plugin-a's output.
let dep_output = context.get_dependency_output("plugin-a");
assert!(dep_output.is_some());
let mut count = self.execution_count.lock().unwrap();
*count += 1;
Ok(PluginOutput::success(
json!({"plugin": "b", "count": *count}),
))
}
async fn cleanup(&mut self, _context: &PluginContext) -> PluginResult<()> {
Ok(())
}
}
// Plugin "failing-plugin": initialization and cleanup succeed, execution
// always returns `ExecutionFailed`.
struct FailingPlugin {
metadata: PluginMetadata,
}
impl FailingPlugin {
fn new() -> Self {
let metadata = PluginMetadata::new("failing-plugin", "1.0.0");
Self { metadata }
}
}
#[async_trait]
impl Plugin for FailingPlugin {
fn metadata(&self) -> &PluginMetadata {
&self.metadata
}
fn schema(&self) -> serde_json::Value {
json!({})
}
async fn initialize(
&mut self,
_config: serde_json::Value,
_context: &PluginContext,
) -> PluginResult<()> {
Ok(())
}
async fn execute(&mut self, _context: &mut PluginContext) -> PluginResult<PluginOutput> {
Err(PluginError::ExecutionFailed(
"Intentional failure".to_string(),
))
}
async fn cleanup(&mut self, _context: &PluginContext) -> PluginResult<()> {
Ok(())
}
}
// An executor built from an empty registry exposes that empty registry.
#[tokio::test]
async fn test_executor_creation() {
let registry = PluginRegistry::new();
let temp_dir = TempDir::new().unwrap();
let executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
assert_eq!(executor.registry().len(), 0);
}
// `execute_plugin` runs the named plugin exactly once and returns its output.
#[tokio::test]
async fn test_single_plugin_execution() {
let mut registry = PluginRegistry::new();
let execution_count = std::sync::Arc::new(std::sync::Mutex::new(0));
let plugin = TestPluginA::new(execution_count.clone());
registry.register(plugin).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
let result = executor.execute_plugin("plugin-a").await.unwrap();
assert!(result.success);
assert_eq!(result.data["plugin"], "a");
assert_eq!(*execution_count.lock().unwrap(), 1);
}
// The pipeline runs plugin-a before plugin-b (two batches) and both share
// the same counter, so it ends at 2.
#[tokio::test]
async fn test_pipeline_execution_with_dependencies() {
let mut registry = PluginRegistry::new();
let execution_count = std::sync::Arc::new(std::sync::Mutex::new(0));
let plugin_a = TestPluginA::new(execution_count.clone());
let plugin_b = TestPluginB::new(execution_count.clone());
registry.register(plugin_a).unwrap();
registry.register(plugin_b).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
let result = executor.execute_pipeline().await.unwrap();
assert!(result.success);
assert_eq!(result.plugin_outputs.len(), 2);
assert_eq!(result.execution_order.len(), 2); assert_eq!(result.execution_order[0], vec!["plugin-a"]);
assert_eq!(result.execution_order[1], vec!["plugin-b"]);
assert_eq!(*execution_count.lock().unwrap(), 2);
}
// A failing plugin makes the pipeline report failure (not an `Err`) and is
// listed in `failed_plugins`.
#[tokio::test]
async fn test_pipeline_execution_with_failure() {
let mut registry = PluginRegistry::new();
let execution_count = std::sync::Arc::new(std::sync::Mutex::new(0));
let plugin_a = TestPluginA::new(execution_count.clone());
let failing_plugin = FailingPlugin::new();
registry.register(plugin_a).unwrap();
registry.register(failing_plugin).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
let result = executor.execute_pipeline().await.unwrap();
assert!(!result.success);
assert_eq!(result.failed_plugins.len(), 1);
assert!(result
.failed_plugins
.contains(&"failing-plugin".to_string()));
}
// Checks `PluginSystemConfig` defaults and field mutation only; no executor
// is constructed in this test.
#[tokio::test]
async fn test_executor_with_config() {
let config = PluginSystemConfig::default();
assert_eq!(config.system.max_parallel_plugins, 4);
assert!(config.plugins.is_empty());
let mut custom_config = PluginSystemConfig::default();
custom_config.system.max_parallel_plugins = 8;
let plugin_config = crate::core::config::PluginConfig {
config: json!({"key": "value"}),
..Default::default()
};
custom_config
.plugins
.insert("test".to_string(), plugin_config);
assert_eq!(custom_config.system.max_parallel_plugins, 8);
assert_eq!(custom_config.plugins.len(), 1);
}
// Plugin with a configurable name and declared permission list; when a
// security context is present it must carry the plugin's own name.
struct SecuredTestPlugin {
metadata: PluginMetadata,
permissions: Vec<Permission>,
}
impl SecuredTestPlugin {
fn new(name: &str, permissions: Vec<Permission>) -> Self {
let metadata = PluginMetadata::new(name, "1.0.0");
Self {
metadata,
permissions,
}
}
}
#[async_trait]
impl Plugin for SecuredTestPlugin {
fn metadata(&self) -> &PluginMetadata {
&self.metadata
}
fn schema(&self) -> serde_json::Value {
json!({})
}
fn permissions(&self) -> Vec<Permission> {
self.permissions.clone()
}
async fn initialize(
&mut self,
_config: serde_json::Value,
_context: &PluginContext,
) -> PluginResult<()> {
Ok(())
}
async fn execute(&mut self, context: &mut PluginContext) -> PluginResult<PluginOutput> {
if let Some(security_ctx) = context.security_context() {
assert_eq!(security_ctx.plugin_name(), self.metadata.name);
}
Ok(PluginOutput::success(
json!({"message": "Secured execution"}),
))
}
async fn cleanup(&mut self, _context: &PluginContext) -> PluginResult<()> {
Ok(())
}
}
// A plugin declaring permissions runs successfully under the default
// security manager.
#[tokio::test]
async fn test_plugin_execution_with_permissions() {
let mut registry = PluginRegistry::new();
let permissions = vec![Permission::fs_read("/tmp"), Permission::TempDir];
let plugin = SecuredTestPlugin::new("secured-plugin", permissions);
registry.register(plugin).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
let result = executor.execute_plugin("secured-plugin").await.unwrap();
assert!(result.success);
assert_eq!(result.data["message"], "Secured execution");
}
// A globally restricted permission causes `PermissionDenied` before the
// plugin is executed.
#[tokio::test]
async fn test_plugin_execution_with_restricted_permissions() {
let mut registry = PluginRegistry::new();
let permissions = vec![Permission::fs_read("/etc")];
let plugin = SecuredTestPlugin::new("secured-plugin", permissions);
registry.register(plugin).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
executor
.security_manager_mut()
.add_global_restriction(Permission::fs_read("/etc"));
let result = executor.execute_plugin("secured-plugin").await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
PluginError::PermissionDenied { .. }
));
}
// Two permission-declaring plugins both complete in a pipeline run.
#[tokio::test]
async fn test_pipeline_execution_with_security() {
let mut registry = PluginRegistry::new();
// NOTE(review): this counter is never used by the plugins in this test.
let _execution_count = std::sync::Arc::new(std::sync::Mutex::new(0));
let plugin_a = SecuredTestPlugin::new("secured-plugin-a", vec![Permission::TempDir]);
let plugin_b =
SecuredTestPlugin::new("secured-plugin-b", vec![Permission::fs_read("/tmp")]);
registry.register(plugin_a).unwrap();
registry.register(plugin_b).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
let result = executor.execute_pipeline().await.unwrap();
assert!(result.success);
assert_eq!(result.plugin_outputs.len(), 2);
}
// An executor built around a pre-configured security manager still runs
// the plugin successfully.
#[tokio::test]
async fn test_executor_with_custom_security_manager() {
let mut registry = PluginRegistry::new();
let plugin = SecuredTestPlugin::new("secured-plugin", vec![Permission::fs_read("/tmp")]);
registry.register(plugin).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut security_manager = SecurityManager::new();
security_manager.grant_permissions("secured-plugin", vec![Permission::fs_read("/tmp")]);
let mut executor = PluginExecutor::with_security_manager(
registry,
temp_dir.path().to_path_buf(),
security_manager,
);
let result = executor.execute_plugin("secured-plugin").await.unwrap();
assert!(result.success);
}
// No sandbox is active before or after a single-plugin execution.
#[tokio::test]
async fn test_executor_with_sandbox_integration() {
let mut registry = PluginRegistry::new();
let plugin = SecuredTestPlugin::new("sandbox-test-plugin", vec![Permission::TempDir]);
registry.register(plugin).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
assert_eq!(executor.sandbox_manager().active_sandboxes().await.len(), 0);
let result = executor
.execute_plugin("sandbox-test-plugin")
.await
.unwrap();
assert!(result.success);
assert_eq!(result.data["message"], "Secured execution");
assert_eq!(executor.sandbox_manager().active_sandboxes().await.len(), 0);
}
// No sandbox is left active after a pipeline run.
#[tokio::test]
async fn test_pipeline_execution_with_sandbox_cleanup() {
let mut registry = PluginRegistry::new();
let plugin_a = SecuredTestPlugin::new("sandbox-plugin-a", vec![Permission::TempDir]);
let plugin_b =
SecuredTestPlugin::new("sandbox-plugin-b", vec![Permission::fs_read("/tmp")]);
registry.register(plugin_a).unwrap();
registry.register(plugin_b).unwrap();
let temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::new(registry, temp_dir.path().to_path_buf());
assert_eq!(executor.sandbox_manager().active_sandboxes().await.len(), 0);
let result = executor.execute_pipeline().await.unwrap();
assert!(result.success);
assert_eq!(result.plugin_outputs.len(), 2);
assert_eq!(executor.sandbox_manager().active_sandboxes().await.len(), 0);
}
// An executor built via `with_config` keeps the supplied configuration and
// can run the same plugin first directly and then through the pipeline.
#[tokio::test]
async fn test_configuration_driven_execution() {
let mut registry = PluginRegistry::new();
let execution_count = std::sync::Arc::new(std::sync::Mutex::new(0));
let plugin_a = TestPluginA::new(execution_count.clone());
registry.register(plugin_a).unwrap();
let mut config = PluginSystemConfig::default();
config.system.max_parallel_plugins = 2;
config.system.debug = true;
let plugin_config = crate::core::config::PluginConfig {
enabled: true,
config: json!({"custom_setting": "test_value"}),
permissions: vec![Permission::TempDir],
retry: crate::core::config::RetryConfig {
max_attempts: 2,
..Default::default()
},
..Default::default()
};
config.plugins.insert("plugin-a".to_string(), plugin_config);
let mut executor = PluginExecutor::with_config(registry, config.clone());
// The plugin succeeds on the first attempt, so the retry budget of 2 is
// not used and the counter advances by exactly one per run.
let result = executor.execute_plugin("plugin-a").await.unwrap();
assert!(result.success);
assert_eq!(*execution_count.lock().unwrap(), 1);
let pipeline_result = executor.execute_pipeline().await.unwrap();
assert!(pipeline_result.success);
assert_eq!(pipeline_result.plugin_outputs.len(), 1);
assert_eq!(*execution_count.lock().unwrap(), 2);
assert_eq!(executor.config.system.max_parallel_plugins, 2);
assert!(executor.config.system.debug);
assert!(executor.config.plugins.contains_key("plugin-a"));
}
// A plugin whose configuration lists different permissions than it declares
// itself still executes successfully.
// NOTE(review): no global restriction is installed here, so the test passes
// whichever permission set is used — it does not prove the override applies.
#[tokio::test]
async fn test_configuration_permission_override() {
let mut registry = PluginRegistry::new();
let plugin = SecuredTestPlugin::new("test-plugin", vec![Permission::fs_read("/etc")]);
registry.register(plugin).unwrap();
let mut config = PluginSystemConfig::default();
let plugin_config = crate::core::config::PluginConfig {
permissions: vec![Permission::TempDir], ..Default::default()
};
config
.plugins
.insert("test-plugin".to_string(), plugin_config);
// NOTE(review): unused; `with_config` derives the workspace from `config`.
let _temp_dir = TempDir::new().unwrap();
let mut executor = PluginExecutor::with_config(registry, config);
let result = executor.execute_plugin("test-plugin").await.unwrap();
assert!(result.success);
}
}