1use super::*;
7use mockforge_plugin_core::{
8 PluginCapabilities, PluginContext, PluginHealth, PluginId, PluginMetrics, PluginResult,
9 PluginState,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use wasmtime::{Engine, Linker, Module, Store};
15use wasmtime_wasi::{WasiCtx, WasiCtxBuilder};
16
17pub struct PluginSandbox {
19 engine: Option<Arc<Engine>>,
21 _config: PluginLoaderConfig,
23 active_sandboxes: RwLock<HashMap<PluginId, SandboxInstance>>,
25}
26
27impl PluginSandbox {
28 pub fn new(config: PluginLoaderConfig) -> Self {
30 let engine = Some(Arc::new(Engine::default()));
32
33 Self {
34 engine,
35 _config: config,
36 active_sandboxes: RwLock::new(HashMap::new()),
37 }
38 }
39
40 pub async fn create_plugin_instance(
42 &self,
43 context: &PluginLoadContext,
44 ) -> LoaderResult<PluginInstance> {
45 let plugin_id = &context.plugin_id;
46
47 {
49 let sandboxes = self.active_sandboxes.read().await;
50 if sandboxes.contains_key(plugin_id) {
51 return Err(PluginLoaderError::already_loaded(plugin_id.clone()));
52 }
53 }
54
55 let sandbox = if let Some(ref engine) = self.engine {
57 SandboxInstance::new(engine, context).await?
58 } else {
59 SandboxInstance::stub_new(context).await?
61 };
62
63 let mut sandboxes = self.active_sandboxes.write().await;
65 sandboxes.insert(plugin_id.clone(), sandbox);
66
67 let mut core_instance = PluginInstance::new(plugin_id.clone(), context.manifest.clone());
69 core_instance.set_state(PluginState::Ready);
70
71 Ok(core_instance)
72 }
73
74 pub async fn execute_plugin_function(
76 &self,
77 plugin_id: &PluginId,
78 function_name: &str,
79 context: &PluginContext,
80 input: &[u8],
81 ) -> LoaderResult<PluginResult<serde_json::Value>> {
82 let mut sandboxes = self.active_sandboxes.write().await;
83 let sandbox = sandboxes
84 .get_mut(plugin_id)
85 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
86
87 sandbox.execute_function(function_name, context, input).await
88 }
89
90 pub async fn get_plugin_health(&self, plugin_id: &PluginId) -> LoaderResult<PluginHealth> {
92 let sandboxes = self.active_sandboxes.read().await;
93 let sandbox = sandboxes
94 .get(plugin_id)
95 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
96
97 Ok(sandbox.get_health().await)
98 }
99
100 pub async fn destroy_sandbox(&self, plugin_id: &PluginId) -> LoaderResult<()> {
102 let mut sandboxes = self.active_sandboxes.write().await;
103 if let Some(mut sandbox) = sandboxes.remove(plugin_id) {
104 sandbox.destroy().await?;
105 }
106 Ok(())
107 }
108
109 pub async fn list_active_sandboxes(&self) -> Vec<PluginId> {
111 let sandboxes = self.active_sandboxes.read().await;
112 sandboxes.keys().cloned().collect()
113 }
114
115 pub async fn get_sandbox_resources(
117 &self,
118 plugin_id: &PluginId,
119 ) -> LoaderResult<SandboxResources> {
120 let sandboxes = self.active_sandboxes.read().await;
121 let sandbox = sandboxes
122 .get(plugin_id)
123 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
124
125 Ok(sandbox.get_resources().await)
126 }
127
128 pub async fn check_sandbox_health(&self, plugin_id: &PluginId) -> LoaderResult<SandboxHealth> {
130 let sandboxes = self.active_sandboxes.read().await;
131 let sandbox = sandboxes
132 .get(plugin_id)
133 .ok_or_else(|| PluginLoaderError::not_found(plugin_id.clone()))?;
134
135 Ok(sandbox.check_health().await)
136 }
137}
138
139pub struct SandboxInstance {
141 _plugin_id: PluginId,
143 _module: Module,
145 store: Store<WasiCtx>,
147 linker: Linker<WasiCtx>,
149 resources: SandboxResources,
151 health: SandboxHealth,
153 limits: ExecutionLimits,
155}
156
157impl SandboxInstance {
158 async fn new(engine: &Engine, context: &PluginLoadContext) -> LoaderResult<Self> {
160 let plugin_id = &context.plugin_id;
161
162 let module = Module::from_file(engine, &context.plugin_path)
164 .map_err(|e| PluginLoaderError::wasm(format!("Failed to load WASM module: {}", e)))?;
165
166 let wasi_ctx = WasiCtxBuilder::new().inherit_stderr().inherit_stdout().build();
168
169 let mut store = Store::new(engine, wasi_ctx);
171
172 let linker = Linker::new(engine);
174
175 linker
181 .instantiate(&mut store, &module)
182 .map_err(|e| PluginLoaderError::wasm(format!("Failed to instantiate module: {}", e)))?;
183
184 let plugin_capabilities = PluginCapabilities::default();
186 let limits = ExecutionLimits::from_capabilities(&plugin_capabilities);
187
188 Ok(Self {
189 _plugin_id: plugin_id.clone(),
190 _module: module,
191 store,
192 linker,
193 resources: SandboxResources::default(),
194 health: SandboxHealth::healthy(),
195 limits,
196 })
197 }
198
199 async fn stub_new(context: &PluginLoadContext) -> LoaderResult<Self> {
201 let plugin_id = &context.plugin_id;
202
203 let module = Module::new(&Engine::default(), [])
205 .map_err(|e| PluginLoaderError::wasm(format!("Failed to create stub module: {}", e)))?;
206
207 let wasi_ctx = WasiCtxBuilder::new().inherit_stderr().inherit_stdout().build();
208
209 let store = Store::new(&Engine::default(), wasi_ctx);
210 let linker = Linker::new(&Engine::default());
211
212 let plugin_capabilities = PluginCapabilities::default();
213 let limits = ExecutionLimits::from_capabilities(&plugin_capabilities);
214
215 Ok(Self {
216 _plugin_id: plugin_id.clone(),
217 _module: module,
218 store,
219 linker,
220 resources: SandboxResources::default(),
221 health: SandboxHealth::healthy(),
222 limits,
223 })
224 }
225
226 async fn execute_function(
228 &mut self,
229 function_name: &str,
230 context: &PluginContext,
231 input: &[u8],
232 ) -> LoaderResult<PluginResult<serde_json::Value>> {
233 self.resources.execution_count += 1;
235 self.resources.last_execution = chrono::Utc::now();
236
237 if self.resources.execution_count > self.limits.max_executions {
239 return Err(PluginLoaderError::resource_limit(format!(
240 "Maximum executions exceeded: {} allowed, {} used",
241 self.limits.max_executions, self.resources.execution_count
242 )));
243 }
244
245 let time_since_last = chrono::Utc::now().signed_duration_since(self.resources.created_at);
247 let time_since_last_std =
248 std::time::Duration::from_secs(time_since_last.num_seconds() as u64);
249 if time_since_last_std > self.limits.max_lifetime {
250 return Err(PluginLoaderError::resource_limit(format!(
251 "Maximum lifetime exceeded: {}s allowed, {}s used",
252 self.limits.max_lifetime.as_secs(),
253 time_since_last_std.as_secs()
254 )));
255 }
256
257 let start_time = std::time::Instant::now();
259
260 let _func = self.linker.get(&mut self.store, "", function_name).ok_or_else(|| {
262 PluginLoaderError::execution(format!("Function '{}' not found", function_name))
263 })?;
264
265 let result = self.call_wasm_function(function_name, context, input).await;
267
268 let execution_time = start_time.elapsed();
270 self.resources.total_execution_time += execution_time;
271 self.resources.last_execution_time = execution_time;
272
273 if execution_time > self.resources.max_execution_time {
274 self.resources.max_execution_time = execution_time;
275 }
276
277 match result {
278 Ok(data) => {
279 self.resources.success_count += 1;
280 Ok(PluginResult::success(data, execution_time.as_millis() as u64))
281 }
282 Err(e) => {
283 self.resources.error_count += 1;
284 Ok(PluginResult::failure(e.to_string(), execution_time.as_millis() as u64))
285 }
286 }
287 }
288
289 async fn call_wasm_function(
291 &mut self,
292 function_name: &str,
293 context: &PluginContext,
294 input: &[u8],
295 ) -> Result<serde_json::Value, String> {
296 let context_json = serde_json::to_string(context)
298 .map_err(|e| format!("Failed to serialize context: {}", e))?;
299 let combined_input = format!("{}\n{}", context_json, String::from_utf8_lossy(input));
300
301 let func_extern = self
303 .linker
304 .get(&mut self.store, "", function_name)
305 .ok_or_else(|| format!("Function '{}' not found in WASM module", function_name))?;
306 let func = func_extern
307 .into_func()
308 .ok_or_else(|| format!("Export '{}' is not a function", function_name))?;
309
310 let input_bytes = combined_input.as_bytes();
312 let input_len = input_bytes.len() as i32;
313
314 let alloc_extern = self.linker.get(&mut self.store, "", "alloc").ok_or_else(|| {
316 "WASM module must export an 'alloc' function for memory allocation".to_string()
317 })?;
318 let alloc_func = alloc_extern
319 .into_func()
320 .ok_or_else(|| "Export 'alloc' is not a function".to_string())?;
321
322 let mut alloc_result = [wasmtime::Val::I32(0)];
323 alloc_func
324 .call(&mut self.store, &[wasmtime::Val::I32(input_len)], &mut alloc_result)
325 .map_err(|e| format!("Failed to allocate memory for input: {}", e))?;
326
327 let input_ptr = match alloc_result[0] {
328 wasmtime::Val::I32(ptr) => ptr,
329 _ => return Err("alloc function did not return a valid pointer".to_string()),
330 };
331
332 let memory_extern = self
334 .linker
335 .get(&mut self.store, "", "memory")
336 .ok_or_else(|| "WASM module must export a 'memory'".to_string())?;
337 let memory = memory_extern
338 .into_memory()
339 .ok_or_else(|| "Export 'memory' is not a memory".to_string())?;
340
341 memory
342 .write(&mut self.store, input_ptr as usize, input_bytes)
343 .map_err(|e| format!("Failed to write input to WASM memory: {}", e))?;
344
345 let mut func_result = [wasmtime::Val::I32(0), wasmtime::Val::I32(0)];
347 func.call(
348 &mut self.store,
349 &[wasmtime::Val::I32(input_ptr), wasmtime::Val::I32(input_len)],
350 &mut func_result,
351 )
352 .map_err(|e| format!("Failed to call WASM function '{}': {}", function_name, e))?;
353
354 let output_ptr = match func_result[0] {
356 wasmtime::Val::I32(ptr) => ptr,
357 _ => {
358 return Err(format!(
359 "Function '{}' did not return a valid output pointer",
360 function_name
361 ))
362 }
363 };
364
365 let output_len = match func_result[1] {
366 wasmtime::Val::I32(len) => len,
367 _ => {
368 return Err(format!(
369 "Function '{}' did not return a valid output length",
370 function_name
371 ))
372 }
373 };
374
375 let mut output_bytes = vec![0u8; output_len as usize];
377 memory
378 .read(&mut self.store, output_ptr as usize, &mut output_bytes)
379 .map_err(|e| format!("Failed to read output from WASM memory: {}", e))?;
380
381 if let Some(dealloc_extern) = self.linker.get(&mut self.store, "", "dealloc") {
383 if let Some(dealloc_func) = dealloc_extern.into_func() {
384 let _ = dealloc_func.call(
385 &mut self.store,
386 &[wasmtime::Val::I32(input_ptr), wasmtime::Val::I32(input_len)],
387 &mut [],
388 );
389 let _ = dealloc_func.call(
390 &mut self.store,
391 &[
392 wasmtime::Val::I32(output_ptr),
393 wasmtime::Val::I32(output_len),
394 ],
395 &mut [],
396 );
397 }
398 }
399
400 let output_str = String::from_utf8(output_bytes)
402 .map_err(|e| format!("Failed to convert output to string: {}", e))?;
403
404 serde_json::from_str(&output_str)
405 .map_err(|e| format!("Failed to parse WASM output as JSON: {}", e))
406 }
407
408 async fn get_health(&self) -> PluginHealth {
410 if self.health.is_healthy {
411 PluginHealth::healthy(
412 "Sandbox is healthy".to_string(),
413 PluginMetrics {
414 total_executions: self.resources.execution_count,
415 successful_executions: self.resources.success_count,
416 failed_executions: self.resources.error_count,
417 avg_execution_time_ms: self.resources.avg_execution_time_ms(),
418 max_execution_time_ms: self.resources.max_execution_time.as_millis() as u64,
419 memory_usage_bytes: self.resources.memory_usage,
420 peak_memory_usage_bytes: self.resources.peak_memory_usage,
421 },
422 )
423 } else {
424 PluginHealth::unhealthy(
425 PluginState::Error,
426 self.health.last_error.clone(),
427 PluginMetrics::default(),
428 )
429 }
430 }
431
432 async fn get_resources(&self) -> SandboxResources {
434 self.resources.clone()
435 }
436
437 async fn check_health(&self) -> SandboxHealth {
439 self.health.clone()
440 }
441
442 async fn destroy(&mut self) -> LoaderResult<()> {
444 self.health.is_healthy = false;
446 self.health.last_error = "Sandbox destroyed".to_string();
447 Ok(())
448 }
449}
450
451#[derive(Debug, Clone, Default)]
453pub struct SandboxResources {
454 pub execution_count: u64,
456 pub success_count: u64,
458 pub error_count: u64,
460 pub total_execution_time: std::time::Duration,
462 pub last_execution_time: std::time::Duration,
464 pub max_execution_time: std::time::Duration,
466 pub memory_usage: usize,
468 pub peak_memory_usage: usize,
470 pub created_at: chrono::DateTime<chrono::Utc>,
472 pub last_execution: chrono::DateTime<chrono::Utc>,
474}
475
476impl SandboxResources {
477 pub fn avg_execution_time_ms(&self) -> f64 {
479 if self.execution_count == 0 {
480 0.0
481 } else {
482 self.total_execution_time.as_millis() as f64 / self.execution_count as f64
483 }
484 }
485
486 pub fn success_rate(&self) -> f64 {
488 if self.execution_count == 0 {
489 0.0
490 } else {
491 (self.success_count as f64 / self.execution_count as f64) * 100.0
492 }
493 }
494
495 pub fn check_limits(&self, limits: &ExecutionLimits) -> bool {
497 self.execution_count <= limits.max_executions
498 && self.memory_usage <= limits.max_memory_bytes
499 && self.total_execution_time <= limits.max_total_time
500 }
501}
502
503#[derive(Debug, Clone)]
505pub struct SandboxHealth {
506 pub is_healthy: bool,
508 pub last_check: chrono::DateTime<chrono::Utc>,
510 pub last_error: String,
512 pub checks: Vec<HealthCheck>,
514}
515
516impl SandboxHealth {
517 pub fn healthy() -> Self {
519 Self {
520 is_healthy: true,
521 last_check: chrono::Utc::now(),
522 last_error: String::new(),
523 checks: Vec::new(),
524 }
525 }
526
527 pub fn unhealthy<S: Into<String>>(error: S) -> Self {
529 Self {
530 is_healthy: false,
531 last_check: chrono::Utc::now(),
532 last_error: error.into(),
533 checks: Vec::new(),
534 }
535 }
536
537 pub fn add_check(&mut self, check: HealthCheck) {
539 let failed = !check.passed;
540 let error_message = if failed {
541 Some(check.message.clone())
542 } else {
543 None
544 };
545
546 self.checks.push(check);
547 self.last_check = chrono::Utc::now();
548
549 if failed {
551 self.is_healthy = false;
552 if let Some(msg) = error_message {
553 self.last_error = msg;
554 }
555 }
556 }
557
558 pub async fn run_checks(&mut self, resources: &SandboxResources, limits: &ExecutionLimits) {
560 self.checks.clear();
561
562 let memory_check = if resources.memory_usage <= limits.max_memory_bytes {
564 HealthCheck::pass("Memory usage within limits")
565 } else {
566 HealthCheck::fail(format!(
567 "Memory usage {} exceeds limit {}",
568 resources.memory_usage, limits.max_memory_bytes
569 ))
570 };
571 self.add_check(memory_check);
572
573 let execution_check = if resources.execution_count <= limits.max_executions {
575 HealthCheck::pass("Execution count within limits")
576 } else {
577 HealthCheck::fail(format!(
578 "Execution count {} exceeds limit {}",
579 resources.execution_count, limits.max_executions
580 ))
581 };
582 self.add_check(execution_check);
583
584 let success_rate = resources.success_rate();
586 let success_check = if success_rate >= 90.0 {
587 HealthCheck::pass(format!("Success rate: {:.1}%", success_rate))
588 } else {
589 HealthCheck::fail(format!("Low success rate: {:.1}%", success_rate))
590 };
591 self.add_check(success_check);
592 }
593}
594
595#[derive(Debug, Clone)]
597pub struct HealthCheck {
598 pub name: String,
600 pub passed: bool,
602 pub message: String,
604 pub timestamp: chrono::DateTime<chrono::Utc>,
606}
607
608impl HealthCheck {
609 pub fn pass<S: Into<String>>(message: S) -> Self {
611 Self {
612 name: "health_check".to_string(),
613 passed: true,
614 message: message.into(),
615 timestamp: chrono::Utc::now(),
616 }
617 }
618
619 pub fn fail<S: Into<String>>(message: S) -> Self {
621 Self {
622 name: "health_check".to_string(),
623 passed: false,
624 message: message.into(),
625 timestamp: chrono::Utc::now(),
626 }
627 }
628}
629
630#[derive(Debug, Clone)]
632pub struct ExecutionLimits {
633 pub max_executions: u64,
635 pub max_total_time: std::time::Duration,
637 pub max_lifetime: std::time::Duration,
639 pub max_memory_bytes: usize,
641 pub max_cpu_time_per_execution: std::time::Duration,
643}
644
645impl Default for ExecutionLimits {
646 fn default() -> Self {
647 Self {
648 max_executions: 1000,
649 max_total_time: std::time::Duration::from_secs(300), max_lifetime: std::time::Duration::from_secs(3600), max_memory_bytes: 10 * 1024 * 1024, max_cpu_time_per_execution: std::time::Duration::from_secs(5),
653 }
654 }
655}
656
657impl ExecutionLimits {
658 pub fn from_capabilities(capabilities: &PluginCapabilities) -> Self {
660 Self {
661 max_executions: 10000, max_total_time: std::time::Duration::from_secs(600), max_lifetime: std::time::Duration::from_secs(86400), max_memory_bytes: capabilities.resources.max_memory_bytes,
665 max_cpu_time_per_execution: std::time::Duration::from_millis(
666 (capabilities.resources.max_cpu_percent * 1000.0) as u64,
667 ),
668 }
669 }
670}
671
672#[cfg(test)]
673mod tests {
674 use super::*;
675
676 #[tokio::test]
677 async fn test_sandbox_resources() {
678 let resources = SandboxResources {
679 execution_count: 10,
680 success_count: 8,
681 error_count: 2,
682 total_execution_time: std::time::Duration::from_millis(1000),
683 ..Default::default()
684 };
685
686 assert_eq!(resources.avg_execution_time_ms(), 100.0);
687 assert_eq!(resources.success_rate(), 80.0);
688 }
689
690 #[tokio::test]
691 async fn test_execution_limits() {
692 let limits = ExecutionLimits::default();
693 assert_eq!(limits.max_executions, 1000);
694 assert_eq!(limits.max_memory_bytes, 10 * 1024 * 1024);
695 }
696
697 #[tokio::test]
698 async fn test_health_checks() {
699 let mut health = SandboxHealth::healthy();
700 assert!(health.is_healthy);
701
702 health.add_check(HealthCheck::fail("Test failure"));
703 assert!(!health.is_healthy);
704 assert_eq!(health.last_error, "Test failure");
705 }
706
707 #[tokio::test]
708 async fn test_plugin_sandbox_creation() {
709 let config = PluginLoaderConfig::default();
710 let sandbox = PluginSandbox::new(config);
711
712 let active = sandbox.list_active_sandboxes().await;
713 assert!(active.is_empty());
714 }
715}