1use anyhow::{anyhow, Result};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, Semaphore};
12use tracing::{debug, info, warn};
13
14#[cfg(feature = "wasm")]
15use wasmtime::{Engine, Instance, Module, Store, TypedFunc};
16
17use crate::event::StreamEvent;
18
19pub mod wasm_edge_computing_runtime;
21pub mod wasm_edge_computing_sandbox;
22
23pub use wasm_edge_computing_runtime::{
25 AllocationConstraints, AllocationEvent, AllocationPlan, CacheOptimizer, CachedModule,
26 DependencyGraph, EdgeResourceOptimizer, ExecutionProfile, ModelType, NodeAssignment,
27 OptimizationMetrics, OptimizationStrategy, PredictionEngine, PrefetchPredictor, PriorityLevel,
28 ResourceAllocation, ResourceModel, ResourcePrediction, SeasonalityType, TemporalPattern,
29 WasmIntelligentCache, WorkloadDescription, WorkloadFeatures,
30};
31
32pub use wasm_edge_computing_sandbox::{
34 AdaptivePolicy, AdaptiveSecuritySandbox, BehaviorAnomaly, BehaviorProfile, BehavioralAnalyzer,
35 ExecutionBehavior, ImpactLevel, MemoryAccessPattern, NetworkActivityLevel, Priority,
36 SecurityAssessment, SecurityMetrics, SecurityRecommendation, ThreatDetector, ThreatIndicator,
37 ThreatSignature, ThreatType,
38};
39
/// Configuration consumed by [`WasmEdgeProcessor::new`].
///
/// Fields annotated with `#[serde(default)]` may be omitted from serialized
/// input. When omitted they take the *type's* default (`0`, `false`, or an
/// empty `Vec`), which differs from the values produced by
/// `WasmEdgeConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WasmEdgeConfig {
    /// Requested optimization level.
    /// NOTE(review): not read by the processor code visible in this file.
    pub optimization_level: OptimizationLevel,
    /// Resource limits for WASM modules.
    /// NOTE(review): not applied to the wasmtime engine in the visible code.
    pub resource_limits: WasmResourceLimits,
    /// Caching toggle (presumably for compiled modules; confirm against callers).
    pub enable_caching: bool,
    /// JIT toggle (not consulted when the engine is built in this file).
    pub enable_jit: bool,
    /// Sandbox toggle (not consulted in the visible code).
    pub security_sandbox: bool,
    /// Names of import modules plugins may use (not enforced in the visible code).
    pub allowed_imports: Vec<String>,
    /// Number of permits given to the execution semaphore, i.e. how many
    /// `execute_plugin` calls may run at once. Deserializes to `0` when absent.
    #[serde(default)]
    pub max_concurrent_instances: usize,
    /// Memory limit in MiB, going by the name (not read in the visible code).
    #[serde(default)]
    pub memory_limit_mb: u64,
    /// Execution timeout in milliseconds, going by the name (not read in the
    /// visible code).
    #[serde(default)]
    pub execution_timeout_ms: u64,
    /// When `false`, `hot_reload_plugin` refuses with an error.
    #[serde(default)]
    pub enable_hot_reload: bool,
    /// Edge locations supplied with the configuration.
    /// NOTE(review): the visible code never copies these into the processor's
    /// edge registry.
    #[serde(default)]
    pub edge_locations: Vec<EdgeLocation>,
}
61
/// Numeric and feature limits describing what a WASM module may consume.
///
/// The `#[serde(default)]` fields deserialize to `0` when absent, not to the
/// values from `WasmResourceLimits::default()`.
/// NOTE(review): nothing in the visible part of this file enforces these
/// limits; confirm where they are applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WasmResourceLimits {
    /// Maximum linear memory, in bytes.
    pub max_memory_bytes: u64,
    /// Maximum execution time, in milliseconds.
    pub max_execution_time_ms: u64,
    /// Maximum stack size, in bytes.
    pub max_stack_size_bytes: u64,
    /// Maximum number of table elements.
    pub max_table_elements: u32,
    /// Whether SIMD is allowed.
    pub enable_simd: bool,
    /// Whether threads are allowed.
    pub enable_threads: bool,
    /// Maximum number of memory pages.
    #[serde(default)]
    pub max_memory_pages: u32,
    /// Maximum number of instances.
    #[serde(default)]
    pub max_instances: u32,
    /// Maximum number of tables.
    #[serde(default)]
    pub max_tables: u32,
    /// Maximum number of memories.
    #[serde(default)]
    pub max_memories: u32,
    /// Maximum number of globals.
    #[serde(default)]
    pub max_globals: u32,
    /// Maximum number of functions.
    #[serde(default)]
    pub max_functions: u32,
    /// Maximum number of imports.
    #[serde(default)]
    pub max_imports: u32,
    /// Maximum number of exports.
    #[serde(default)]
    pub max_exports: u32,
}
89
/// Description of one edge node that plugins can be executed on.
///
/// `calculate_edge_score` ranks locations using `latency_ms`,
/// `capacity_factor` and `available_resources.cpu_cores`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeLocation {
    /// Identifier; reported back as `EdgeExecutionResult::edge_location`.
    pub id: String,
    /// Region label (not used for scoring in the visible code).
    pub region: String,
    /// Latency in milliseconds; lower values score higher.
    pub latency_ms: f64,
    /// Capacity factor; enters the score linearly with weight 0.3.
    pub capacity_factor: f64,
    /// Resources available at this location.
    pub available_resources: ResourceMetrics,
    /// Workload categories advertised by this location (not used for scoring
    /// in the visible code).
    pub specializations: Vec<ProcessingSpecialization>,
}
100
/// Workload categories an edge location can advertise through
/// `EdgeLocation::specializations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProcessingSpecialization {
    RdfProcessing,
    SparqlOptimization,
    GraphAnalytics,
    MachineLearning,
    Cryptography,
    ComputerVision,
    NaturalLanguage,
    QuantumSimulation,
}
113
/// Optimization level selectable through `WasmEdgeConfig::optimization_level`.
///
/// NOTE(review): the engine built in `WasmEdgeProcessor::new` always uses
/// `wasmtime::OptLevel::Speed`; this value is not consulted in the visible code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptimizationLevel {
    Debug,
    Release,
    Maximum,
    Adaptive,
}
122
/// Snapshot of compute resources, used both for edge locations
/// (`EdgeLocation::available_resources`) and for execution contexts
/// (`WasmExecutionContext::resource_usage`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceMetrics {
    /// CPU core count; the only field read by `calculate_edge_score`.
    pub cpu_cores: u32,
    /// Memory in MiB.
    pub memory_mb: u64,
    /// Storage in GiB.
    pub storage_gb: u64,
    /// Network bandwidth in Mbit/s.
    pub network_mbps: f64,
    /// Number of GPU units.
    pub gpu_units: u32,
    /// Number of qubits.
    pub quantum_qubits: u32,
}
133
/// A WASM plugin together with its descriptive metadata.
///
/// The processor keys every internal map by `id`. With the `wasm` feature,
/// `wasm_bytes` is compiled and instantiated on registration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WasmPlugin {
    /// Unique identifier; registering a second plugin with the same id
    /// overwrites the first.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Version string.
    pub version: String,
    /// Free-form description.
    pub description: String,
    /// Author.
    pub author: String,
    /// Declared capabilities; the count feeds `estimate_memory_usage`.
    pub capabilities: Vec<PluginCapability>,
    /// Raw module bytes handed to `wasmtime::Module::new`.
    pub wasm_bytes: Vec<u8>,
    /// Declared input/output schema.
    pub schema: PluginSchema,
    /// Declared performance characteristics.
    pub performance_profile: PerformanceProfile,
    /// Declared security level.
    pub security_level: SecurityLevel,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Last-modified timestamp; refreshed by `hot_reload_plugin`.
    pub updated_at: DateTime<Utc>,
}
150
/// Capabilities a plugin can declare in `WasmPlugin::capabilities`.
///
/// The visible code only uses the *number* of declared capabilities (in
/// `estimate_memory_usage`), never the individual variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PluginCapability {
    EventProcessing,
    DataTransformation,
    Filtering,
    Aggregation,
    Enrichment,
    Validation,
    Compression,
    Encryption,
    Analytics,
    MachineLearning,
}
165
/// Declarative description of a plugin's interface.
///
/// NOTE(review): the visible execution path does not check a module against
/// this schema; it always looks up the `process_events`, `allocate` and
/// `memory` exports by name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginSchema {
    /// Names of accepted input types.
    pub input_types: Vec<String>,
    /// Names of produced output types.
    pub output_types: Vec<String>,
    /// JSON description of the plugin's configuration.
    pub configuration_schema: serde_json::Value,
    /// Import modules the plugin needs.
    pub required_imports: Vec<String>,
    /// Functions the plugin exports.
    pub exported_functions: Vec<String>,
}
175
176impl Default for PluginSchema {
177 fn default() -> Self {
178 Self {
179 input_types: vec!["StreamEvent".to_string()],
180 output_types: vec!["StreamEvent".to_string()],
181 configuration_schema: serde_json::json!({}),
182 required_imports: vec![],
183 exported_functions: vec!["process_events".to_string()],
184 }
185 }
186}
187
/// Performance characteristics declared for a plugin.
///
/// These are declared values carried on `WasmPlugin`; the visible code never
/// updates them from measurements (measured data lives in `PerformanceMetrics`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceProfile {
    /// Typical execution time, in microseconds.
    pub average_execution_time_us: u64,
    /// Typical memory usage, in MiB.
    pub memory_usage_mb: f64,
    /// CPU intensity (the default is 0.5; valid range not established here).
    pub cpu_intensity: f64,
    /// Typical throughput, in events per second.
    pub throughput_events_per_second: u64,
    /// Scalability factor (the default is 1.0).
    pub scalability_factor: f64,
}
197
198impl Default for PerformanceProfile {
199 fn default() -> Self {
200 Self {
201 average_execution_time_us: 100,
202 memory_usage_mb: 1.0,
203 cpu_intensity: 0.5,
204 throughput_events_per_second: 1000,
205 scalability_factor: 1.0,
206 }
207 }
208}
209
/// Security level assigned to a plugin; also the key of
/// `SecurityManager`'s execution-policy table.
///
/// The enum derives `Eq` and `Hash` (needed for use as a map key) but not
/// `Ord`, so the variants carry no built-in ordering.
/// `SecurityManager::new` installs policies only for `Untrusted` and
/// `TrustedVerified`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum SecurityLevel {
    Untrusted,
    BasicSandbox,
    Standard,
    Enhanced,
    TrustedVerified,
    CriticalSecurity,
    High,
}
221
/// Per-plugin execution state.
///
/// With the `wasm` feature enabled this owns the wasmtime store and the
/// instantiated module; the remaining fields are bookkeeping. `Debug` is
/// implemented by hand and prints only the bookkeeping fields.
pub struct WasmExecutionContext {
    /// Engine the module was compiled with (a clone of the processor's engine).
    #[cfg(feature = "wasm")]
    pub engine: Engine,
    /// Store holding the instance's state and the host-side `WasmState`.
    #[cfg(feature = "wasm")]
    pub store: Store<WasmState>,
    /// The instantiated plugin module.
    #[cfg(feature = "wasm")]
    pub instance: Instance,
    /// Id of the plugin this context belongs to.
    pub plugin_id: String,
    /// Number of successful executions through this context.
    pub execution_count: u64,
    /// Accumulated execution time in microseconds.
    /// NOTE(review): initialized to 0 and never updated in the visible code.
    pub total_execution_time_us: u64,
    /// Time of the last successful execution (creation time initially).
    pub last_execution: DateTime<Utc>,
    /// Resource usage; set to `ResourceMetrics::default()` on creation.
    pub resource_usage: ResourceMetrics,
}
236
237impl std::fmt::Debug for WasmExecutionContext {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 f.debug_struct("WasmExecutionContext")
240 .field("plugin_id", &self.plugin_id)
241 .field("execution_count", &self.execution_count)
242 .field("total_execution_time_us", &self.total_execution_time_us)
243 .field("last_execution", &self.last_execution)
244 .field("resource_usage", &self.resource_usage)
245 .finish_non_exhaustive()
246 }
247}
248
/// Host-side data stored inside each plugin's wasmtime `Store`.
#[derive(Debug, Default)]
pub struct WasmState {
    /// Event counter; reset to 0 at the start of every execution.
    pub event_count: u64,
    /// Allocation counter (not modified in the visible code).
    pub memory_allocations: u64,
    /// External-call counter (not modified in the visible code).
    pub external_calls: u64,
    /// Wall-clock start of the current execution; `None` before the first one.
    pub start_time: Option<DateTime<Utc>>,
}
257
/// Outcome of one `WasmEdgeProcessor::execute_plugin` call.
///
/// A plugin failure is reported through `success == false` and
/// `error_message`, not through an `Err` return.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeExecutionResult {
    /// Id of the executed plugin.
    pub plugin_id: String,
    /// Random UUID (v4) generated for this execution.
    pub execution_id: String,
    /// The events passed in by the caller.
    pub input_events: Vec<StreamEvent>,
    /// Events returned by the plugin; empty when the execution failed.
    pub output_events: Vec<StreamEvent>,
    /// Elapsed time in microseconds, measured after the concurrency permit
    /// has been acquired.
    pub execution_time_us: u64,
    /// Heuristic estimate from `estimate_memory_usage`, not a measurement.
    pub memory_used_mb: f64,
    /// Id of the edge location the execution was attributed to.
    pub edge_location: String,
    /// `true` when the plugin ran without error.
    pub success: bool,
    /// The error's display text when `success` is `false`.
    pub error_message: Option<String>,
    /// Additional metadata; `execute_plugin` always leaves this empty.
    pub metadata: HashMap<String, serde_json::Value>,
}
272
/// Result of the single-event helpers `process` and `process_at_location`.
#[derive(Debug, Clone)]
pub struct WasmProcessingResult {
    /// First event produced by the plugin, or `None` when there was no output
    /// or no plugin is registered.
    pub output: Option<StreamEvent>,
    /// Execution time in milliseconds (`execution_time_us / 1000`).
    pub latency_ms: f64,
}
279
/// Aggregate statistics returned by `WasmEdgeProcessor::get_stats`.
#[derive(Debug, Clone)]
pub struct WasmProcessorStats {
    /// Sum of `total_executions` over all plugins with recorded metrics.
    pub total_processed: u64,
    /// Average latency in milliseconds, as computed by `get_stats`.
    pub average_latency_ms: f64,
    /// Number of currently registered plugins.
    pub active_plugins: usize,
}
287
/// Measured per-plugin execution statistics, maintained by
/// `WasmEdgeProcessor::update_performance_metrics`.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Number of recorded executions (successful or not).
    pub total_executions: u64,
    /// Sum of all recorded execution times, in microseconds.
    pub total_execution_time_us: u64,
    /// `total_execution_time_us / total_executions`.
    pub average_execution_time_us: f64,
    /// Longest recorded execution, in microseconds.
    pub max_execution_time_us: u64,
    /// Shortest recorded execution, in microseconds; 0 before the first sample.
    pub min_execution_time_us: u64,
    /// Fraction of recorded executions that succeeded, in `[0, 1]`.
    pub success_rate: f64,
    /// Throughput in events per second.
    /// NOTE(review): stays at its initial 0.0 in the visible code.
    pub throughput_events_per_second: f64,
    /// Memory efficiency.
    /// NOTE(review): stays at its initial 0.0 in the visible code.
    pub memory_efficiency: f64,
    /// Time of the most recent update (creation time initially).
    pub last_updated: DateTime<Utc>,
}
301
302impl PerformanceMetrics {
303 fn new() -> Self {
304 Self {
305 total_executions: 0,
306 total_execution_time_us: 0,
307 average_execution_time_us: 0.0,
308 max_execution_time_us: 0,
309 min_execution_time_us: 0,
310 success_rate: 0.0,
311 throughput_events_per_second: 0.0,
312 memory_efficiency: 0.0,
313 last_updated: Utc::now(),
314 }
315 }
316}
317
/// Risk classification attached to a `SecurityAuditEntry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RiskLevel {
    Low,
    Medium,
    High,
    Critical,
}
326
/// Holds plugin trust information, per-level execution policies and an audit
/// log, each behind its own async `RwLock`.
#[derive(Debug)]
pub struct SecurityManager {
    /// Security level recorded per plugin id.
    /// NOTE(review): created empty and never touched in the visible code.
    trusted_plugins: RwLock<HashMap<String, SecurityLevel>>,
    /// Execution policy per security level; seeded by `SecurityManager::new`.
    execution_policies: RwLock<HashMap<SecurityLevel, ExecutionPolicy>>,
    /// Audit trail.
    /// NOTE(review): created empty and never appended to in the visible code.
    audit_log: RwLock<Vec<SecurityAuditEntry>>,
}
334
/// Limits and permissions associated with one `SecurityLevel`.
///
/// NOTE(review): the visible execution path stores these policies but does
/// not consult them.
#[derive(Debug, Clone)]
pub struct ExecutionPolicy {
    /// Maximum number of memory pages.
    pub max_memory_pages: u32,
    /// Maximum execution time, in milliseconds.
    pub max_execution_time_ms: u64,
    /// Import modules the plugin may use.
    pub allowed_imports: Vec<String>,
    /// Whether network access is permitted.
    pub network_access: bool,
    /// Whether file-system access is permitted.
    pub file_system_access: bool,
    /// Whether cryptographic operations are permitted.
    pub crypto_operations: bool,
    /// Whether plugins may communicate with each other.
    pub inter_plugin_communication: bool,
}
346
/// One record in `SecurityManager`'s audit log.
#[derive(Debug, Clone)]
pub struct SecurityAuditEntry {
    /// When the entry was recorded.
    pub timestamp: DateTime<Utc>,
    /// Plugin the entry refers to.
    pub plugin_id: String,
    /// Short description of the audited action.
    pub action: String,
    /// Risk classification of the action.
    pub risk_level: RiskLevel,
    /// Free-form details.
    pub details: String,
}
356
357impl Default for SecurityManager {
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
363impl SecurityManager {
364 pub fn new() -> Self {
365 let mut execution_policies = HashMap::new();
366
367 execution_policies.insert(
368 SecurityLevel::Untrusted,
369 ExecutionPolicy {
370 max_memory_pages: 1,
371 max_execution_time_ms: 100,
372 allowed_imports: vec![],
373 network_access: false,
374 file_system_access: false,
375 crypto_operations: false,
376 inter_plugin_communication: false,
377 },
378 );
379
380 execution_policies.insert(
381 SecurityLevel::TrustedVerified,
382 ExecutionPolicy {
383 max_memory_pages: 64,
384 max_execution_time_ms: 5000,
385 allowed_imports: vec!["env".to_string()],
386 network_access: true,
387 file_system_access: false,
388 crypto_operations: true,
389 inter_plugin_communication: true,
390 },
391 );
392
393 Self {
394 trusted_plugins: RwLock::new(HashMap::new()),
395 execution_policies: RwLock::new(execution_policies),
396 audit_log: RwLock::new(Vec::new()),
397 }
398 }
399
400 pub async fn validate_plugin(&self, plugin: &WasmPlugin) -> Result<()> {
401 self.validate_plugin_metadata(plugin).await?;
402 self.scan_wasm_bytecode(&plugin.wasm_bytes).await?;
403 self.check_plugin_reputation(&plugin.id).await?;
404 Ok(())
405 }
406
407 async fn validate_plugin_metadata(&self, _plugin: &WasmPlugin) -> Result<()> {
408 Ok(())
409 }
410
411 async fn scan_wasm_bytecode(&self, _wasm_bytes: &[u8]) -> Result<()> {
412 Ok(())
413 }
414
415 async fn check_plugin_reputation(&self, _plugin_id: &str) -> Result<()> {
416 Ok(())
417 }
418}
419
420impl Default for ResourceMetrics {
423 fn default() -> Self {
424 Self {
425 cpu_cores: 4,
426 memory_mb: 8192,
427 storage_gb: 256,
428 network_mbps: 1000.0,
429 gpu_units: 0,
430 quantum_qubits: 0,
431 }
432 }
433}
434
435impl Default for WasmEdgeConfig {
436 fn default() -> Self {
437 Self {
438 optimization_level: OptimizationLevel::Release,
439 resource_limits: WasmResourceLimits::default(),
440 enable_caching: true,
441 enable_jit: true,
442 security_sandbox: true,
443 allowed_imports: vec!["env".to_string(), "wasi_snapshot_preview1".to_string()],
444 max_concurrent_instances: 10,
445 memory_limit_mb: 512,
446 execution_timeout_ms: 5000,
447 enable_hot_reload: true,
448 edge_locations: vec![],
449 }
450 }
451}
452
453impl Default for WasmResourceLimits {
454 fn default() -> Self {
455 Self {
456 max_memory_bytes: 512 * 1024 * 1024, max_execution_time_ms: 5000,
458 max_stack_size_bytes: 2 * 1024 * 1024, max_table_elements: 10000,
460 enable_simd: true,
461 enable_threads: false,
462 max_memory_pages: 8192, max_instances: 10,
464 max_tables: 10,
465 max_memories: 10,
466 max_globals: 100,
467 max_functions: 1000,
468 max_imports: 100,
469 max_exports: 100,
470 }
471 }
472}
473
/// Registers WASM plugins and executes them against batches of stream events.
///
/// All mutable state sits behind async `RwLock`s, so most methods take
/// `&self`. Concurrent executions are bounded by `execution_semaphore`.
pub struct WasmEdgeProcessor {
    /// Configuration supplied at construction.
    config: WasmEdgeConfig,
    /// Registered plugins by id; the map consulted for execution and listing.
    plugins: RwLock<HashMap<String, WasmPlugin>>,
    /// Per-plugin execution contexts (populated only with the `wasm` feature).
    execution_contexts: RwLock<HashMap<String, Arc<RwLock<WasmExecutionContext>>>>,
    /// Limits the number of simultaneous `execute_plugin` calls.
    execution_semaphore: Semaphore,
    /// Known edge locations by id.
    /// NOTE(review): created empty; no visible method inserts into it.
    edge_registry: RwLock<HashMap<String, EdgeLocation>>,
    /// Second copy of each registered plugin, written by `register_plugin`.
    /// NOTE(review): not read or pruned in the visible code.
    plugin_registry: RwLock<HashMap<String, WasmPlugin>>,
    /// Measured execution statistics by plugin id.
    performance_metrics: RwLock<HashMap<String, PerformanceMetrics>>,
    /// Validates plugins before registration.
    security_manager: SecurityManager,
    /// Shared wasmtime engine used to compile every plugin.
    #[cfg(feature = "wasm")]
    wasm_engine: Engine,
}
491
492impl WasmEdgeProcessor {
493 pub fn new(config: WasmEdgeConfig) -> Result<Self> {
495 #[cfg(feature = "wasm")]
496 let wasm_engine = {
497 let mut wasm_config = wasmtime::Config::new();
498 wasm_config.debug_info(true);
499 wasm_config.wasm_simd(true);
500 wasm_config.wasm_bulk_memory(true);
501 wasm_config.wasm_reference_types(true);
502 wasm_config.wasm_multi_value(true);
503 wasm_config.cranelift_opt_level(wasmtime::OptLevel::Speed);
504 Engine::new(&wasm_config)?
505 };
506
507 #[cfg(not(feature = "wasm"))]
508 let _wasm_engine = ();
509
510 let execution_semaphore = Semaphore::new(config.max_concurrent_instances);
511 let security_manager = SecurityManager::new();
512
513 Ok(Self {
514 config,
515 plugins: RwLock::new(HashMap::new()),
516 execution_contexts: RwLock::new(HashMap::new()),
517 execution_semaphore,
518 edge_registry: RwLock::new(HashMap::new()),
519 plugin_registry: RwLock::new(HashMap::new()),
520 performance_metrics: RwLock::new(HashMap::new()),
521 security_manager,
522 #[cfg(feature = "wasm")]
523 wasm_engine,
524 })
525 }
526
527 pub async fn register_plugin(&self, plugin: WasmPlugin) -> Result<()> {
529 self.security_manager.validate_plugin(&plugin).await?;
530
531 let plugin_id = plugin.id.clone();
532
533 #[cfg(feature = "wasm")]
534 {
535 let module = Module::new(&self.wasm_engine, &plugin.wasm_bytes)
536 .map_err(|e| anyhow!("Failed to compile WASM module: {}", e))?;
537
538 let mut store = Store::new(&self.wasm_engine, WasmState::default());
539 let instance = Instance::new(&mut store, &module, &[])
540 .map_err(|e| anyhow!("Failed to instantiate WASM module: {}", e))?;
541
542 let context = WasmExecutionContext {
543 engine: self.wasm_engine.clone(),
544 store,
545 instance,
546 plugin_id: plugin_id.clone(),
547 execution_count: 0,
548 total_execution_time_us: 0,
549 last_execution: Utc::now(),
550 resource_usage: ResourceMetrics::default(),
551 };
552
553 self.execution_contexts
554 .write()
555 .await
556 .insert(plugin_id.clone(), Arc::new(RwLock::new(context)));
557 }
558
559 self.plugins
560 .write()
561 .await
562 .insert(plugin_id.clone(), plugin.clone());
563 self.plugin_registry
564 .write()
565 .await
566 .insert(plugin_id.clone(), plugin);
567
568 self.performance_metrics
569 .write()
570 .await
571 .insert(plugin_id.clone(), PerformanceMetrics::new());
572
573 info!("Registered WASM plugin: {}", plugin_id);
574 Ok(())
575 }
576
577 pub async fn execute_plugin(
579 &self,
580 plugin_id: &str,
581 events: Vec<StreamEvent>,
582 edge_location: Option<String>,
583 ) -> Result<EdgeExecutionResult> {
584 let _permit = self
585 .execution_semaphore
586 .acquire()
587 .await
588 .map_err(|_| anyhow!("Failed to acquire execution permit"))?;
589
590 let start_time = std::time::Instant::now();
591 let execution_id = uuid::Uuid::new_v4().to_string();
592
593 let edge_id = if let Some(location) = edge_location {
594 location
595 } else {
596 self.select_optimal_edge_location(plugin_id, &events)
597 .await?
598 };
599
600 let plugin = {
601 let plugins = self.plugins.read().await;
602 plugins
603 .get(plugin_id)
604 .ok_or_else(|| anyhow!("Plugin not found: {}", plugin_id))?
605 .clone()
606 };
607
608 let result = self
609 .execute_plugin_internal(&plugin, events.clone(), &edge_id, &execution_id)
610 .await;
611
612 let execution_time = start_time.elapsed();
613
614 self.update_performance_metrics(plugin_id, &result, execution_time.as_micros() as u64)
615 .await;
616
617 let execution_result = EdgeExecutionResult {
618 plugin_id: plugin_id.to_string(),
619 execution_id,
620 input_events: events,
621 output_events: result.as_ref().map(|r| r.clone()).unwrap_or_default(),
622 execution_time_us: execution_time.as_micros() as u64,
623 memory_used_mb: self.estimate_memory_usage(&plugin).await,
624 edge_location: edge_id,
625 success: result.is_ok(),
626 error_message: result.as_ref().err().map(|e| e.to_string()),
627 metadata: HashMap::new(),
628 };
629
630 match result {
631 Ok(output_events) => {
632 debug!(
633 "Plugin execution successful: {} events processed in {:?}",
634 output_events.len(),
635 execution_time
636 );
637 Ok(EdgeExecutionResult {
638 output_events,
639 ..execution_result
640 })
641 }
642 Err(e) => {
643 warn!("Plugin execution failed: {}", e);
644 Ok(execution_result)
645 }
646 }
647 }
648
649 async fn execute_plugin_internal(
651 &self,
652 plugin: &WasmPlugin,
653 events: Vec<StreamEvent>,
654 _edge_id: &str,
655 _execution_id: &str,
656 ) -> Result<Vec<StreamEvent>> {
657 #[cfg(not(feature = "wasm"))]
658 let _ = plugin;
659
660 #[cfg(feature = "wasm")]
661 {
662 let context_arc = {
663 let contexts = self.execution_contexts.read().await;
664 contexts
665 .get(&plugin.id)
666 .ok_or_else(|| {
667 anyhow!("Execution context not found for plugin: {}", plugin.id)
668 })?
669 .clone()
670 };
671
672 let mut context = context_arc.write().await;
673
674 context.store.data_mut().start_time = Some(Utc::now());
675 context.store.data_mut().event_count = 0;
676
677 let process_events: TypedFunc<(i32, i32), i32> = {
678 let WasmExecutionContext {
679 ref instance,
680 ref mut store,
681 ..
682 } = *context;
683 instance
684 .get_typed_func(store, "process_events")
685 .map_err(|e| anyhow!("Failed to get process_events function: {}", e))?
686 };
687
688 let input_json = serde_json::to_string(&events)?;
689 let input_ptr = self.allocate_memory(&mut context, input_json.as_bytes())?;
690
691 let output_ptr = process_events
692 .call(&mut context.store, (input_ptr, input_json.len() as i32))
693 .map_err(|e| anyhow!("WASM execution failed: {}", e))?;
694
695 let output_data = self.read_memory(&mut context, output_ptr)?;
696 let output_json = String::from_utf8(output_data)?;
697 let output_events: Vec<StreamEvent> = serde_json::from_str(&output_json)?;
698
699 context.execution_count += 1;
700 context.last_execution = Utc::now();
701
702 Ok(output_events)
703 }
704
705 #[cfg(not(feature = "wasm"))]
706 {
707 warn!("WASM feature disabled, returning input events unchanged");
708 Ok(events)
709 }
710 }
711
712 async fn select_optimal_edge_location(
714 &self,
715 plugin_id: &str,
716 events: &[StreamEvent],
717 ) -> Result<String> {
718 let edge_registry = self.edge_registry.read().await;
719
720 if edge_registry.is_empty() {
721 return Ok("default".to_string());
722 }
723
724 let mut best_edge = None;
725 let mut best_score = f64::NEG_INFINITY;
726
727 for (edge_id, edge_location) in edge_registry.iter() {
728 let score = self
729 .calculate_edge_score(plugin_id, events, edge_location)
730 .await;
731 if score > best_score {
732 best_score = score;
733 best_edge = Some(edge_id.clone());
734 }
735 }
736
737 best_edge.ok_or_else(|| anyhow!("No suitable edge location found"))
738 }
739
740 pub(crate) async fn calculate_edge_score(
742 &self,
743 _plugin_id: &str,
744 _events: &[StreamEvent],
745 edge: &EdgeLocation,
746 ) -> f64 {
747 let latency_score = 1.0 / (1.0 + edge.latency_ms / 100.0);
748 let capacity_score = edge.capacity_factor;
749 let resource_score = (edge.available_resources.cpu_cores as f64 / 32.0).min(1.0);
750
751 latency_score * 0.4 + capacity_score * 0.3 + resource_score * 0.3
752 }
753
754 #[cfg(feature = "wasm")]
756 fn allocate_memory(&self, context: &mut WasmExecutionContext, data: &[u8]) -> Result<i32> {
757 let allocate: TypedFunc<i32, i32> = {
758 let instance = &context.instance;
759 let store = &mut context.store;
760 instance
761 .get_typed_func(store, "allocate")
762 .map_err(|e| anyhow!("Failed to get allocate function: {}", e))?
763 };
764
765 let ptr = allocate
766 .call(&mut context.store, data.len() as i32)
767 .map_err(|e| anyhow!("Memory allocation failed: {}", e))?;
768
769 let memory = {
770 let instance = &context.instance;
771 let store = &mut context.store;
772 instance
773 .get_memory(store, "memory")
774 .ok_or_else(|| anyhow!("Failed to get WASM memory"))?
775 };
776
777 memory
778 .write(&mut context.store, ptr as usize, data)
779 .map_err(|e| anyhow!("Failed to write to WASM memory: {}", e))?;
780
781 Ok(ptr)
782 }
783
784 #[cfg(feature = "wasm")]
786 fn read_memory(&self, context: &mut WasmExecutionContext, ptr: i32) -> Result<Vec<u8>> {
787 let memory = {
788 let instance = &context.instance;
789 let store = &mut context.store;
790 instance
791 .get_memory(store, "memory")
792 .ok_or_else(|| anyhow!("Failed to get WASM memory"))?
793 };
794
795 let mut len_bytes = [0u8; 4];
796 memory
797 .read(&context.store, ptr as usize, &mut len_bytes)
798 .map_err(|e| anyhow!("Failed to read length from WASM memory: {}", e))?;
799
800 let len = u32::from_le_bytes(len_bytes) as usize;
801
802 let mut data = vec![0u8; len];
803 memory
804 .read(&context.store, (ptr + 4) as usize, &mut data)
805 .map_err(|e| anyhow!("Failed to read data from WASM memory: {}", e))?;
806
807 Ok(data)
808 }
809
810 async fn estimate_memory_usage(&self, plugin: &WasmPlugin) -> f64 {
812 let base_memory = plugin.wasm_bytes.len() as f64 / (1024.0 * 1024.0);
813 let complexity_factor = plugin.capabilities.len() as f64 * 0.1;
814 base_memory + complexity_factor
815 }
816
817 async fn update_performance_metrics(
819 &self,
820 plugin_id: &str,
821 result: &Result<Vec<StreamEvent>>,
822 execution_time_us: u64,
823 ) {
824 let mut metrics_guard = self.performance_metrics.write().await;
825 let metrics = metrics_guard
826 .entry(plugin_id.to_string())
827 .or_insert_with(PerformanceMetrics::new);
828
829 metrics.total_executions += 1;
830 metrics.total_execution_time_us += execution_time_us;
831 metrics.average_execution_time_us =
832 metrics.total_execution_time_us as f64 / metrics.total_executions as f64;
833
834 if execution_time_us > metrics.max_execution_time_us {
835 metrics.max_execution_time_us = execution_time_us;
836 }
837
838 if metrics.min_execution_time_us == 0 || execution_time_us < metrics.min_execution_time_us {
839 metrics.min_execution_time_us = execution_time_us;
840 }
841
842 let success = result.is_ok();
843 let success_count = if success { 1.0 } else { 0.0 };
844 metrics.success_rate = (metrics.success_rate * (metrics.total_executions - 1) as f64
845 + success_count)
846 / metrics.total_executions as f64;
847
848 metrics.last_updated = Utc::now();
849 }
850
851 pub async fn get_plugin_metrics(&self, plugin_id: &str) -> Option<PerformanceMetrics> {
853 self.performance_metrics
854 .read()
855 .await
856 .get(plugin_id)
857 .cloned()
858 }
859
860 pub async fn list_plugins(&self) -> Vec<WasmPlugin> {
862 self.plugins.read().await.values().cloned().collect()
863 }
864
865 pub async fn hot_reload_plugin(&self, plugin_id: &str, new_wasm_bytes: Vec<u8>) -> Result<()> {
867 if !self.config.enable_hot_reload {
868 return Err(anyhow!("Hot reload is disabled"));
869 }
870
871 let mut plugins = self.plugins.write().await;
872 if let Some(plugin) = plugins.get_mut(plugin_id) {
873 plugin.wasm_bytes = new_wasm_bytes;
874 plugin.updated_at = Utc::now();
875
876 #[cfg(feature = "wasm")]
877 {
878 let module = Module::new(&self.wasm_engine, &plugin.wasm_bytes)?;
879 let mut store = Store::new(&self.wasm_engine, WasmState::default());
880 let instance = Instance::new(&mut store, &module, &[])?;
881
882 let context = WasmExecutionContext {
883 engine: self.wasm_engine.clone(),
884 store,
885 instance,
886 plugin_id: plugin_id.to_string(),
887 execution_count: 0,
888 total_execution_time_us: 0,
889 last_execution: Utc::now(),
890 resource_usage: ResourceMetrics::default(),
891 };
892
893 self.execution_contexts
894 .write()
895 .await
896 .insert(plugin_id.to_string(), Arc::new(RwLock::new(context)));
897 }
898
899 info!("Hot reloaded plugin: {}", plugin_id);
900 Ok(())
901 } else {
902 Err(anyhow!("Plugin not found: {}", plugin_id))
903 }
904 }
905
906 pub async fn unregister_plugin(&self, plugin_id: &str) -> Result<()> {
908 self.plugins.write().await.remove(plugin_id);
909 self.execution_contexts.write().await.remove(plugin_id);
910 self.performance_metrics.write().await.remove(plugin_id);
911 info!("Unregistered plugin: {}", plugin_id);
912 Ok(())
913 }
914
915 pub async fn load_plugin(&self, plugin: WasmPlugin) -> Result<()> {
917 self.register_plugin(plugin).await
918 }
919
920 pub async fn process(&mut self, event: StreamEvent) -> Result<WasmProcessingResult> {
922 let plugin_id = {
923 let plugins = self.plugins.read().await;
924 plugins.keys().next().cloned()
925 };
926
927 if let Some(pid) = plugin_id {
928 let result = self.execute_plugin(&pid, vec![event], None).await?;
929 Ok(WasmProcessingResult {
930 output: if result.output_events.is_empty() {
931 None
932 } else {
933 Some(result.output_events[0].clone())
934 },
935 latency_ms: result.execution_time_us as f64 / 1000.0,
936 })
937 } else {
938 Ok(WasmProcessingResult {
939 output: None,
940 latency_ms: 0.0,
941 })
942 }
943 }
944
945 pub async fn process_at_location(
947 &self,
948 event: StreamEvent,
949 location: &EdgeLocation,
950 ) -> Result<WasmProcessingResult> {
951 let plugin_id = {
952 let plugins = self.plugins.read().await;
953 plugins.keys().next().cloned()
954 };
955
956 if let Some(pid) = plugin_id {
957 let result = self
958 .execute_plugin(&pid, vec![event], Some(location.id.clone()))
959 .await?;
960 Ok(WasmProcessingResult {
961 output: if result.output_events.is_empty() {
962 None
963 } else {
964 Some(result.output_events[0].clone())
965 },
966 latency_ms: result.execution_time_us as f64 / 1000.0,
967 })
968 } else {
969 Ok(WasmProcessingResult {
970 output: None,
971 latency_ms: 0.0,
972 })
973 }
974 }
975
976 pub async fn hot_swap_plugin(&self, old_plugin_id: &str, new_plugin: WasmPlugin) -> Result<()> {
978 self.unregister_plugin(old_plugin_id).await?;
979 self.register_plugin(new_plugin).await?;
980 info!("Hot-swapped plugin {} with new version", old_plugin_id);
981 Ok(())
982 }
983
984 pub async fn get_stats(&self) -> WasmProcessorStats {
986 let plugins = self.plugins.read().await;
987 let metrics = self.performance_metrics.read().await;
988
989 let total_processed = metrics.values().map(|m| m.total_executions).sum();
990
991 let average_latency_ms = if metrics.is_empty() {
992 0.0
993 } else {
994 metrics
995 .values()
996 .map(|m| m.average_execution_time_us / 1000.0)
997 .sum::<f64>()
998 / metrics.len() as f64
999 };
1000
1001 WasmProcessorStats {
1002 total_processed,
1003 average_latency_ms,
1004 active_plugins: plugins.len(),
1005 }
1006 }
1007}
1008
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a processor from the default configuration.
    fn default_processor() -> WasmEdgeProcessor {
        WasmEdgeProcessor::new(WasmEdgeConfig::default()).unwrap()
    }

    /// A freshly created processor has no plugins registered.
    #[tokio::test]
    async fn test_wasm_edge_processor_creation() {
        let processor = default_processor();

        let registered = processor.list_plugins().await;
        assert_eq!(registered.len(), 0);
    }

    /// A plausible edge location scores within `(0, 1]`.
    #[tokio::test]
    async fn test_edge_location_scoring() {
        let processor = default_processor();

        let candidate = EdgeLocation {
            id: String::from("test-edge"),
            region: String::from("us-west"),
            latency_ms: 50.0,
            capacity_factor: 0.8,
            available_resources: ResourceMetrics::default(),
            specializations: vec![ProcessingSpecialization::RdfProcessing],
        };

        let score = processor
            .calculate_edge_score("test-plugin", &[], &candidate)
            .await;
        assert!(score > 0.0 && score <= 1.0);
    }

    /// The policy table of a new manager can be read-locked without waiting.
    #[test]
    fn test_security_manager_creation() {
        let manager = SecurityManager::new();
        let policies = manager.execution_policies.try_read();
        assert!(policies.is_ok());
    }
}