1use super::*;
7use mockforge_plugin_core::{
8 PluginCapabilities, PluginContext, PluginHealth, PluginId, PluginMetrics, PluginResult,
9 PluginState,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use wasmtime::{Engine, Linker, Module, Store};
15use wasmtime_wasi::p2::{WasiCtx, WasiCtxBuilder};
16
17pub struct PluginSandbox {
19 engine: Option<Arc<Engine>>,
21 _config: PluginLoaderConfig,
23 active_sandboxes: RwLock<HashMap<PluginId, SandboxInstance>>,
25}
26
27impl PluginSandbox {
28 pub fn new(config: PluginLoaderConfig) -> Self {
30 let engine = Some(Arc::new(Engine::default()));
32
33 Self {
34 engine,
35 _config: config,
36 active_sandboxes: RwLock::new(HashMap::new()),
37 }
38 }
39
40 pub async fn create_plugin_instance(
42 &self,
43 context: &PluginLoadContext,
44 ) -> LoaderResult<PluginInstance> {
45 let plugin_id = &context.plugin_id;
46
47 {
49 let sandboxes = self.active_sandboxes.read().await;
50 if sandboxes.contains_key(plugin_id) {
51 return Err(PluginLoaderError::already_loaded(plugin_id.clone()));
52 }
53 }
54
55 let sandbox = if let Some(ref engine) = self.engine {
57 SandboxInstance::new(engine, context).await?
58 } else {
59 SandboxInstance::stub_new(context).await?
61 };
62
63 let mut sandboxes = self.active_sandboxes.write().await;
65 sandboxes.insert(plugin_id.clone(), sandbox);
66
67 let mut core_instance =
69 mockforge_plugin_core::PluginInstance::new(plugin_id.clone(), context.manifest.clone());
70 core_instance.set_state(PluginState::Ready);
71
72 Ok(core_instance)
73 }
74
75 pub async fn execute_plugin_function(
77 &self,
78 plugin_id: &PluginId,
79 function_name: &str,
80 context: &PluginContext,
81 input: &[u8],
82 ) -> LoaderResult<PluginResult<serde_json::Value>> {
83 let mut sandboxes = self.active_sandboxes.write().await;
84 let sandbox = sandboxes
85 .get_mut(plugin_id)
86 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
87
88 sandbox.execute_function(function_name, context, input).await
89 }
90
91 pub async fn get_plugin_health(&self, plugin_id: &PluginId) -> LoaderResult<PluginHealth> {
93 let sandboxes = self.active_sandboxes.read().await;
94 let sandbox = sandboxes
95 .get(plugin_id)
96 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
97
98 Ok(sandbox.get_health().await)
99 }
100
101 pub async fn destroy_sandbox(&self, plugin_id: &PluginId) -> LoaderResult<()> {
103 let mut sandboxes = self.active_sandboxes.write().await;
104 if let Some(mut sandbox) = sandboxes.remove(plugin_id) {
105 sandbox.destroy().await?;
106 }
107 Ok(())
108 }
109
110 pub async fn list_active_sandboxes(&self) -> Vec<PluginId> {
112 let sandboxes = self.active_sandboxes.read().await;
113 sandboxes.keys().cloned().collect()
114 }
115
116 pub async fn get_sandbox_resources(
118 &self,
119 plugin_id: &PluginId,
120 ) -> LoaderResult<SandboxResources> {
121 let sandboxes = self.active_sandboxes.read().await;
122 let sandbox = sandboxes
123 .get(plugin_id)
124 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
125
126 Ok(sandbox.get_resources().await)
127 }
128
129 pub async fn check_sandbox_health(&self, plugin_id: &PluginId) -> LoaderResult<SandboxHealth> {
131 let sandboxes = self.active_sandboxes.read().await;
132 let sandbox = sandboxes
133 .get(plugin_id)
134 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
135
136 Ok(sandbox.check_health().await)
137 }
138}
139
140pub struct SandboxInstance {
142 _plugin_id: PluginId,
144 _module: Module,
146 store: Store<WasiCtx>,
148 linker: Linker<WasiCtx>,
150 resources: SandboxResources,
152 health: SandboxHealth,
154 limits: ExecutionLimits,
156}
157
158impl SandboxInstance {
159 async fn new(engine: &Engine, context: &PluginLoadContext) -> LoaderResult<Self> {
161 let plugin_id = &context.plugin_id;
162
163 let module = Module::from_file(engine, &context.plugin_path)
165 .map_err(|e| PluginLoaderError::wasm(format!("Failed to load WASM module: {}", e)))?;
166
167 let wasi_ctx = WasiCtxBuilder::new().inherit_stderr().inherit_stdout().build();
169
170 let mut store = Store::new(engine, wasi_ctx);
172
173 let linker = Linker::new(engine);
175
176 linker
182 .instantiate(&mut store, &module)
183 .map_err(|e| PluginLoaderError::wasm(format!("Failed to instantiate module: {}", e)))?;
184
185 let plugin_capabilities = PluginCapabilities::default();
187 let limits = ExecutionLimits::from_capabilities(&plugin_capabilities);
188
189 Ok(Self {
190 _plugin_id: plugin_id.clone(),
191 _module: module,
192 store,
193 linker,
194 resources: SandboxResources::default(),
195 health: SandboxHealth::healthy(),
196 limits,
197 })
198 }
199
200 async fn stub_new(context: &PluginLoadContext) -> LoaderResult<Self> {
202 let plugin_id = &context.plugin_id;
203
204 let module = Module::new(&Engine::default(), [])
206 .map_err(|e| PluginLoaderError::wasm(format!("Failed to create stub module: {}", e)))?;
207
208 let wasi_ctx = WasiCtxBuilder::new().inherit_stderr().inherit_stdout().build();
209
210 let store = Store::new(&Engine::default(), wasi_ctx);
211 let linker = Linker::new(&Engine::default());
212
213 let plugin_capabilities = PluginCapabilities::default();
214 let limits = ExecutionLimits::from_capabilities(&plugin_capabilities);
215
216 Ok(Self {
217 _plugin_id: plugin_id.clone(),
218 _module: module,
219 store,
220 linker,
221 resources: SandboxResources::default(),
222 health: SandboxHealth::healthy(),
223 limits,
224 })
225 }
226
227 async fn execute_function(
229 &mut self,
230 function_name: &str,
231 context: &PluginContext,
232 input: &[u8],
233 ) -> LoaderResult<PluginResult<serde_json::Value>> {
234 self.resources.execution_count += 1;
236 self.resources.last_execution = chrono::Utc::now();
237
238 if self.resources.execution_count > self.limits.max_executions {
240 return Err(PluginLoaderError::resource_limit(format!(
241 "Maximum executions exceeded: {} allowed, {} used",
242 self.limits.max_executions, self.resources.execution_count
243 )));
244 }
245
246 let time_since_last = chrono::Utc::now().signed_duration_since(self.resources.created_at);
248 let time_since_last_std =
249 std::time::Duration::from_secs(time_since_last.num_seconds() as u64);
250 if time_since_last_std > self.limits.max_lifetime {
251 return Err(PluginLoaderError::resource_limit(format!(
252 "Maximum lifetime exceeded: {}s allowed, {}s used",
253 self.limits.max_lifetime.as_secs(),
254 time_since_last_std.as_secs()
255 )));
256 }
257
258 let start_time = std::time::Instant::now();
260
261 let _func = self.linker.get(&mut self.store, "", function_name).ok_or_else(|| {
263 PluginLoaderError::execution(format!("Function '{}' not found", function_name))
264 })?;
265
266 let result = self.call_wasm_function(function_name, context, input).await;
268
269 let execution_time = start_time.elapsed();
271 self.resources.total_execution_time += execution_time;
272 self.resources.last_execution_time = execution_time;
273
274 if execution_time > self.resources.max_execution_time {
275 self.resources.max_execution_time = execution_time;
276 }
277
278 match result {
279 Ok(data) => {
280 self.resources.success_count += 1;
281 Ok(PluginResult::success(data, execution_time.as_millis() as u64))
282 }
283 Err(e) => {
284 self.resources.error_count += 1;
285 Ok(PluginResult::failure(e.to_string(), execution_time.as_millis() as u64))
286 }
287 }
288 }
289
290 async fn call_wasm_function(
292 &mut self,
293 function_name: &str,
294 context: &PluginContext,
295 input: &[u8],
296 ) -> Result<serde_json::Value, String> {
297 let context_json = serde_json::to_string(context)
299 .map_err(|e| format!("Failed to serialize context: {}", e))?;
300 let combined_input = format!("{}\n{}", context_json, String::from_utf8_lossy(input));
301
302 let func_extern = self
304 .linker
305 .get(&mut self.store, "", function_name)
306 .ok_or_else(|| format!("Function '{}' not found in WASM module", function_name))?;
307 let func = func_extern
308 .into_func()
309 .ok_or_else(|| format!("Export '{}' is not a function", function_name))?;
310
311 let input_bytes = combined_input.as_bytes();
313 let input_len = input_bytes.len() as i32;
314
315 let alloc_extern = self.linker.get(&mut self.store, "", "alloc").ok_or_else(|| {
317 "WASM module must export an 'alloc' function for memory allocation".to_string()
318 })?;
319 let alloc_func = alloc_extern
320 .into_func()
321 .ok_or_else(|| "Export 'alloc' is not a function".to_string())?;
322
323 let mut alloc_result = [wasmtime::Val::I32(0)];
324 alloc_func
325 .call(&mut self.store, &[wasmtime::Val::I32(input_len)], &mut alloc_result)
326 .map_err(|e| format!("Failed to allocate memory for input: {}", e))?;
327
328 let input_ptr = match alloc_result[0] {
329 wasmtime::Val::I32(ptr) => ptr,
330 _ => return Err("alloc function did not return a valid pointer".to_string()),
331 };
332
333 let memory_extern = self
335 .linker
336 .get(&mut self.store, "", "memory")
337 .ok_or_else(|| "WASM module must export a 'memory'".to_string())?;
338 let memory = memory_extern
339 .into_memory()
340 .ok_or_else(|| "Export 'memory' is not a memory".to_string())?;
341
342 memory
343 .write(&mut self.store, input_ptr as usize, input_bytes)
344 .map_err(|e| format!("Failed to write input to WASM memory: {}", e))?;
345
346 let mut func_result = [wasmtime::Val::I32(0), wasmtime::Val::I32(0)];
348 func.call(
349 &mut self.store,
350 &[wasmtime::Val::I32(input_ptr), wasmtime::Val::I32(input_len)],
351 &mut func_result,
352 )
353 .map_err(|e| format!("Failed to call WASM function '{}': {}", function_name, e))?;
354
355 let output_ptr = match func_result[0] {
357 wasmtime::Val::I32(ptr) => ptr,
358 _ => {
359 return Err(format!(
360 "Function '{}' did not return a valid output pointer",
361 function_name
362 ))
363 }
364 };
365
366 let output_len = match func_result[1] {
367 wasmtime::Val::I32(len) => len,
368 _ => {
369 return Err(format!(
370 "Function '{}' did not return a valid output length",
371 function_name
372 ))
373 }
374 };
375
376 let mut output_bytes = vec![0u8; output_len as usize];
378 memory
379 .read(&mut self.store, output_ptr as usize, &mut output_bytes)
380 .map_err(|e| format!("Failed to read output from WASM memory: {}", e))?;
381
382 if let Some(dealloc_extern) = self.linker.get(&mut self.store, "", "dealloc") {
384 if let Some(dealloc_func) = dealloc_extern.into_func() {
385 let _ = dealloc_func.call(
386 &mut self.store,
387 &[wasmtime::Val::I32(input_ptr), wasmtime::Val::I32(input_len)],
388 &mut [],
389 );
390 let _ = dealloc_func.call(
391 &mut self.store,
392 &[
393 wasmtime::Val::I32(output_ptr),
394 wasmtime::Val::I32(output_len),
395 ],
396 &mut [],
397 );
398 }
399 }
400
401 let output_str = String::from_utf8(output_bytes)
403 .map_err(|e| format!("Failed to convert output to string: {}", e))?;
404
405 serde_json::from_str(&output_str)
406 .map_err(|e| format!("Failed to parse WASM output as JSON: {}", e))
407 }
408
409 async fn get_health(&self) -> PluginHealth {
411 if self.health.is_healthy {
412 PluginHealth::healthy(
413 "Sandbox is healthy".to_string(),
414 PluginMetrics {
415 total_executions: self.resources.execution_count,
416 successful_executions: self.resources.success_count,
417 failed_executions: self.resources.error_count,
418 avg_execution_time_ms: self.resources.avg_execution_time_ms(),
419 max_execution_time_ms: self.resources.max_execution_time.as_millis() as u64,
420 memory_usage_bytes: self.resources.memory_usage,
421 peak_memory_usage_bytes: self.resources.peak_memory_usage,
422 },
423 )
424 } else {
425 PluginHealth::unhealthy(
426 PluginState::Error,
427 self.health.last_error.clone(),
428 PluginMetrics::default(),
429 )
430 }
431 }
432
433 async fn get_resources(&self) -> SandboxResources {
435 self.resources.clone()
436 }
437
438 async fn check_health(&self) -> SandboxHealth {
440 self.health.clone()
441 }
442
443 async fn destroy(&mut self) -> LoaderResult<()> {
445 self.health.is_healthy = false;
447 self.health.last_error = "Sandbox destroyed".to_string();
448 Ok(())
449 }
450}
451
452#[derive(Debug, Clone, Default)]
454pub struct SandboxResources {
455 pub execution_count: u64,
457 pub success_count: u64,
459 pub error_count: u64,
461 pub total_execution_time: std::time::Duration,
463 pub last_execution_time: std::time::Duration,
465 pub max_execution_time: std::time::Duration,
467 pub memory_usage: usize,
469 pub peak_memory_usage: usize,
471 pub created_at: chrono::DateTime<chrono::Utc>,
473 pub last_execution: chrono::DateTime<chrono::Utc>,
475}
476
477impl SandboxResources {
478 pub fn avg_execution_time_ms(&self) -> f64 {
480 if self.execution_count == 0 {
481 0.0
482 } else {
483 self.total_execution_time.as_millis() as f64 / self.execution_count as f64
484 }
485 }
486
487 pub fn success_rate(&self) -> f64 {
489 if self.execution_count == 0 {
490 0.0
491 } else {
492 (self.success_count as f64 / self.execution_count as f64) * 100.0
493 }
494 }
495
496 pub fn check_limits(&self, limits: &ExecutionLimits) -> bool {
498 self.execution_count <= limits.max_executions
499 && self.memory_usage <= limits.max_memory_bytes
500 && self.total_execution_time <= limits.max_total_time
501 }
502}
503
504#[derive(Debug, Clone)]
506pub struct SandboxHealth {
507 pub is_healthy: bool,
509 pub last_check: chrono::DateTime<chrono::Utc>,
511 pub last_error: String,
513 pub checks: Vec<HealthCheck>,
515}
516
517impl SandboxHealth {
518 pub fn healthy() -> Self {
520 Self {
521 is_healthy: true,
522 last_check: chrono::Utc::now(),
523 last_error: String::new(),
524 checks: Vec::new(),
525 }
526 }
527
528 pub fn unhealthy<S: Into<String>>(error: S) -> Self {
530 Self {
531 is_healthy: false,
532 last_check: chrono::Utc::now(),
533 last_error: error.into(),
534 checks: Vec::new(),
535 }
536 }
537
538 pub fn add_check(&mut self, check: HealthCheck) {
540 let failed = !check.passed;
541 let error_message = if failed {
542 Some(check.message.clone())
543 } else {
544 None
545 };
546
547 self.checks.push(check);
548 self.last_check = chrono::Utc::now();
549
550 if failed {
552 self.is_healthy = false;
553 if let Some(msg) = error_message {
554 self.last_error = msg;
555 }
556 }
557 }
558
559 pub async fn run_checks(&mut self, resources: &SandboxResources, limits: &ExecutionLimits) {
561 self.checks.clear();
562
563 let memory_check = if resources.memory_usage <= limits.max_memory_bytes {
565 HealthCheck::pass("Memory usage within limits")
566 } else {
567 HealthCheck::fail(format!(
568 "Memory usage {} exceeds limit {}",
569 resources.memory_usage, limits.max_memory_bytes
570 ))
571 };
572 self.add_check(memory_check);
573
574 let execution_check = if resources.execution_count <= limits.max_executions {
576 HealthCheck::pass("Execution count within limits")
577 } else {
578 HealthCheck::fail(format!(
579 "Execution count {} exceeds limit {}",
580 resources.execution_count, limits.max_executions
581 ))
582 };
583 self.add_check(execution_check);
584
585 let success_rate = resources.success_rate();
587 let success_check = if success_rate >= 90.0 {
588 HealthCheck::pass(format!("Success rate: {:.1}%", success_rate))
589 } else {
590 HealthCheck::fail(format!("Low success rate: {:.1}%", success_rate))
591 };
592 self.add_check(success_check);
593 }
594}
595
596#[derive(Debug, Clone)]
598pub struct HealthCheck {
599 pub name: String,
601 pub passed: bool,
603 pub message: String,
605 pub timestamp: chrono::DateTime<chrono::Utc>,
607}
608
609impl HealthCheck {
610 pub fn pass<S: Into<String>>(message: S) -> Self {
612 Self {
613 name: "health_check".to_string(),
614 passed: true,
615 message: message.into(),
616 timestamp: chrono::Utc::now(),
617 }
618 }
619
620 pub fn fail<S: Into<String>>(message: S) -> Self {
622 Self {
623 name: "health_check".to_string(),
624 passed: false,
625 message: message.into(),
626 timestamp: chrono::Utc::now(),
627 }
628 }
629}
630
631#[derive(Debug, Clone)]
633pub struct ExecutionLimits {
634 pub max_executions: u64,
636 pub max_total_time: std::time::Duration,
638 pub max_lifetime: std::time::Duration,
640 pub max_memory_bytes: usize,
642 pub max_cpu_time_per_execution: std::time::Duration,
644}
645
646impl Default for ExecutionLimits {
647 fn default() -> Self {
648 Self {
649 max_executions: 1000,
650 max_total_time: std::time::Duration::from_secs(300), max_lifetime: std::time::Duration::from_secs(3600), max_memory_bytes: 10 * 1024 * 1024, max_cpu_time_per_execution: std::time::Duration::from_secs(5),
654 }
655 }
656}
657
658impl ExecutionLimits {
659 pub fn from_capabilities(capabilities: &PluginCapabilities) -> Self {
661 Self {
662 max_executions: 10000, max_total_time: std::time::Duration::from_secs(600), max_lifetime: std::time::Duration::from_secs(86400), max_memory_bytes: capabilities.resources.max_memory_bytes,
666 max_cpu_time_per_execution: std::time::Duration::from_millis(
667 (capabilities.resources.max_cpu_percent * 1000.0) as u64,
668 ),
669 }
670 }
671}
672
673#[cfg(test)]
674mod tests {
675 use super::*;
676
677 #[tokio::test]
678 async fn test_sandbox_resources() {
679 let resources = SandboxResources {
680 execution_count: 10,
681 success_count: 8,
682 error_count: 2,
683 total_execution_time: std::time::Duration::from_millis(1000),
684 ..Default::default()
685 };
686
687 assert_eq!(resources.avg_execution_time_ms(), 100.0);
688 assert_eq!(resources.success_rate(), 80.0);
689 }
690
691 #[tokio::test]
692 async fn test_execution_limits() {
693 let limits = ExecutionLimits::default();
694 assert_eq!(limits.max_executions, 1000);
695 assert_eq!(limits.max_memory_bytes, 10 * 1024 * 1024);
696 }
697
698 #[tokio::test]
699 async fn test_health_checks() {
700 let mut health = SandboxHealth::healthy();
701 assert!(health.is_healthy);
702
703 health.add_check(HealthCheck::fail("Test failure"));
704 assert!(!health.is_healthy);
705 assert_eq!(health.last_error, "Test failure");
706 }
707
708 #[tokio::test]
709 async fn test_plugin_sandbox_creation() {
710 let config = PluginLoaderConfig::default();
711 let sandbox = PluginSandbox::new(config);
712
713 let active = sandbox.list_active_sandboxes().await;
714 assert!(active.is_empty());
715 }
716}