Skip to main content

oxirs_stream/wasm_edge_computing/
mod.rs

1//! # WebAssembly Edge Computing Module
2//!
3//! Ultra-low latency edge processing using WebAssembly with hot-swappable plugins,
4//! distributed execution, and advanced resource management for next-generation streaming.
5
6use anyhow::{anyhow, Result};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, Semaphore};
12use tracing::{debug, info, warn};
13
14#[cfg(feature = "wasm")]
15use wasmtime::{Engine, Instance, Module, Store, TypedFunc};
16
17use crate::event::StreamEvent;
18
19// Sibling modules
20pub mod wasm_edge_computing_runtime;
21pub mod wasm_edge_computing_sandbox;
22
23// Re-export runtime types
24pub use wasm_edge_computing_runtime::{
25    AllocationConstraints, AllocationEvent, AllocationPlan, CacheOptimizer, CachedModule,
26    DependencyGraph, EdgeResourceOptimizer, ExecutionProfile, ModelType, NodeAssignment,
27    OptimizationMetrics, OptimizationStrategy, PredictionEngine, PrefetchPredictor, PriorityLevel,
28    ResourceAllocation, ResourceModel, ResourcePrediction, SeasonalityType, TemporalPattern,
29    WasmIntelligentCache, WorkloadDescription, WorkloadFeatures,
30};
31
32// Re-export sandbox types
33pub use wasm_edge_computing_sandbox::{
34    AdaptivePolicy, AdaptiveSecuritySandbox, BehaviorAnomaly, BehaviorProfile, BehavioralAnalyzer,
35    ExecutionBehavior, ImpactLevel, MemoryAccessPattern, NetworkActivityLevel, Priority,
36    SecurityAssessment, SecurityMetrics, SecurityRecommendation, ThreatDetector, ThreatIndicator,
37    ThreatSignature, ThreatType,
38};
39
40/// WASM edge processor configuration
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct WasmEdgeConfig {
43    pub optimization_level: OptimizationLevel,
44    pub resource_limits: WasmResourceLimits,
45    pub enable_caching: bool,
46    pub enable_jit: bool,
47    pub security_sandbox: bool,
48    pub allowed_imports: Vec<String>,
49    // Optional legacy fields for backward compatibility
50    #[serde(default)]
51    pub max_concurrent_instances: usize,
52    #[serde(default)]
53    pub memory_limit_mb: u64,
54    #[serde(default)]
55    pub execution_timeout_ms: u64,
56    #[serde(default)]
57    pub enable_hot_reload: bool,
58    #[serde(default)]
59    pub edge_locations: Vec<EdgeLocation>,
60}
61
62/// WASM resource limits
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct WasmResourceLimits {
65    pub max_memory_bytes: u64,
66    pub max_execution_time_ms: u64,
67    pub max_stack_size_bytes: u64,
68    pub max_table_elements: u32,
69    pub enable_simd: bool,
70    pub enable_threads: bool,
71    // Legacy fields
72    #[serde(default)]
73    pub max_memory_pages: u32,
74    #[serde(default)]
75    pub max_instances: u32,
76    #[serde(default)]
77    pub max_tables: u32,
78    #[serde(default)]
79    pub max_memories: u32,
80    #[serde(default)]
81    pub max_globals: u32,
82    #[serde(default)]
83    pub max_functions: u32,
84    #[serde(default)]
85    pub max_imports: u32,
86    #[serde(default)]
87    pub max_exports: u32,
88}
89
90/// Edge computing location
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct EdgeLocation {
93    pub id: String,
94    pub region: String,
95    pub latency_ms: f64,
96    pub capacity_factor: f64,
97    pub available_resources: ResourceMetrics,
98    pub specializations: Vec<ProcessingSpecialization>,
99}
100
101/// Processing specializations for edge nodes
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum ProcessingSpecialization {
104    RdfProcessing,
105    SparqlOptimization,
106    GraphAnalytics,
107    MachineLearning,
108    Cryptography,
109    ComputerVision,
110    NaturalLanguage,
111    QuantumSimulation,
112}
113
114/// WASM optimization levels
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub enum OptimizationLevel {
117    Debug,
118    Release,
119    Maximum,
120    Adaptive,
121}
122
123/// Resource usage metrics
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct ResourceMetrics {
126    pub cpu_cores: u32,
127    pub memory_mb: u64,
128    pub storage_gb: u64,
129    pub network_mbps: f64,
130    pub gpu_units: u32,
131    pub quantum_qubits: u32,
132}
133
134/// WASM plugin metadata
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct WasmPlugin {
137    pub id: String,
138    pub name: String,
139    pub version: String,
140    pub description: String,
141    pub author: String,
142    pub capabilities: Vec<PluginCapability>,
143    pub wasm_bytes: Vec<u8>,
144    pub schema: PluginSchema,
145    pub performance_profile: PerformanceProfile,
146    pub security_level: SecurityLevel,
147    pub created_at: DateTime<Utc>,
148    pub updated_at: DateTime<Utc>,
149}
150
151/// Plugin capabilities
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum PluginCapability {
154    EventProcessing,
155    DataTransformation,
156    Filtering,
157    Aggregation,
158    Enrichment,
159    Validation,
160    Compression,
161    Encryption,
162    Analytics,
163    MachineLearning,
164}
165
166/// Plugin input/output schema
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct PluginSchema {
169    pub input_types: Vec<String>,
170    pub output_types: Vec<String>,
171    pub configuration_schema: serde_json::Value,
172    pub required_imports: Vec<String>,
173    pub exported_functions: Vec<String>,
174}
175
176impl Default for PluginSchema {
177    fn default() -> Self {
178        Self {
179            input_types: vec!["StreamEvent".to_string()],
180            output_types: vec!["StreamEvent".to_string()],
181            configuration_schema: serde_json::json!({}),
182            required_imports: vec![],
183            exported_functions: vec!["process_events".to_string()],
184        }
185    }
186}
187
188/// Performance characteristics
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct PerformanceProfile {
191    pub average_execution_time_us: u64,
192    pub memory_usage_mb: f64,
193    pub cpu_intensity: f64,
194    pub throughput_events_per_second: u64,
195    pub scalability_factor: f64,
196}
197
198impl Default for PerformanceProfile {
199    fn default() -> Self {
200        Self {
201            average_execution_time_us: 100,
202            memory_usage_mb: 1.0,
203            cpu_intensity: 0.5,
204            throughput_events_per_second: 1000,
205            scalability_factor: 1.0,
206        }
207    }
208}
209
210/// Security levels for plugins
211#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
212pub enum SecurityLevel {
213    Untrusted,
214    BasicSandbox,
215    Standard,
216    Enhanced,
217    TrustedVerified,
218    CriticalSecurity,
219    High,
220}
221
222/// WASM execution context
223pub struct WasmExecutionContext {
224    #[cfg(feature = "wasm")]
225    pub engine: Engine,
226    #[cfg(feature = "wasm")]
227    pub store: Store<WasmState>,
228    #[cfg(feature = "wasm")]
229    pub instance: Instance,
230    pub plugin_id: String,
231    pub execution_count: u64,
232    pub total_execution_time_us: u64,
233    pub last_execution: DateTime<Utc>,
234    pub resource_usage: ResourceMetrics,
235}
236
237impl std::fmt::Debug for WasmExecutionContext {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        f.debug_struct("WasmExecutionContext")
240            .field("plugin_id", &self.plugin_id)
241            .field("execution_count", &self.execution_count)
242            .field("total_execution_time_us", &self.total_execution_time_us)
243            .field("last_execution", &self.last_execution)
244            .field("resource_usage", &self.resource_usage)
245            .finish_non_exhaustive()
246    }
247}
248
249/// WASM execution state
250#[derive(Debug, Default)]
251pub struct WasmState {
252    pub event_count: u64,
253    pub memory_allocations: u64,
254    pub external_calls: u64,
255    pub start_time: Option<DateTime<Utc>>,
256}
257
258/// Edge execution result
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct EdgeExecutionResult {
261    pub plugin_id: String,
262    pub execution_id: String,
263    pub input_events: Vec<StreamEvent>,
264    pub output_events: Vec<StreamEvent>,
265    pub execution_time_us: u64,
266    pub memory_used_mb: f64,
267    pub edge_location: String,
268    pub success: bool,
269    pub error_message: Option<String>,
270    pub metadata: HashMap<String, serde_json::Value>,
271}
272
273/// WASM processing result for single event processing
274#[derive(Debug, Clone)]
275pub struct WasmProcessingResult {
276    pub output: Option<StreamEvent>,
277    pub latency_ms: f64,
278}
279
280/// WASM processor statistics
281#[derive(Debug, Clone)]
282pub struct WasmProcessorStats {
283    pub total_processed: u64,
284    pub average_latency_ms: f64,
285    pub active_plugins: usize,
286}
287
288/// Performance tracking for plugins
289#[derive(Debug, Clone)]
290pub struct PerformanceMetrics {
291    pub total_executions: u64,
292    pub total_execution_time_us: u64,
293    pub average_execution_time_us: f64,
294    pub max_execution_time_us: u64,
295    pub min_execution_time_us: u64,
296    pub success_rate: f64,
297    pub throughput_events_per_second: f64,
298    pub memory_efficiency: f64,
299    pub last_updated: DateTime<Utc>,
300}
301
302impl PerformanceMetrics {
303    fn new() -> Self {
304        Self {
305            total_executions: 0,
306            total_execution_time_us: 0,
307            average_execution_time_us: 0.0,
308            max_execution_time_us: 0,
309            min_execution_time_us: 0,
310            success_rate: 0.0,
311            throughput_events_per_second: 0.0,
312            memory_efficiency: 0.0,
313            last_updated: Utc::now(),
314        }
315    }
316}
317
318/// Risk assessment levels
319#[derive(Debug, Clone, Serialize, Deserialize)]
320pub enum RiskLevel {
321    Low,
322    Medium,
323    High,
324    Critical,
325}
326
327/// Security manager for WASM execution
328#[derive(Debug)]
329pub struct SecurityManager {
330    trusted_plugins: RwLock<HashMap<String, SecurityLevel>>,
331    execution_policies: RwLock<HashMap<SecurityLevel, ExecutionPolicy>>,
332    audit_log: RwLock<Vec<SecurityAuditEntry>>,
333}
334
335/// Execution policies based on security level
336#[derive(Debug, Clone)]
337pub struct ExecutionPolicy {
338    pub max_memory_pages: u32,
339    pub max_execution_time_ms: u64,
340    pub allowed_imports: Vec<String>,
341    pub network_access: bool,
342    pub file_system_access: bool,
343    pub crypto_operations: bool,
344    pub inter_plugin_communication: bool,
345}
346
347/// Security audit entry
348#[derive(Debug, Clone)]
349pub struct SecurityAuditEntry {
350    pub timestamp: DateTime<Utc>,
351    pub plugin_id: String,
352    pub action: String,
353    pub risk_level: RiskLevel,
354    pub details: String,
355}
356
357impl Default for SecurityManager {
358    fn default() -> Self {
359        Self::new()
360    }
361}
362
363impl SecurityManager {
364    pub fn new() -> Self {
365        let mut execution_policies = HashMap::new();
366
367        execution_policies.insert(
368            SecurityLevel::Untrusted,
369            ExecutionPolicy {
370                max_memory_pages: 1,
371                max_execution_time_ms: 100,
372                allowed_imports: vec![],
373                network_access: false,
374                file_system_access: false,
375                crypto_operations: false,
376                inter_plugin_communication: false,
377            },
378        );
379
380        execution_policies.insert(
381            SecurityLevel::TrustedVerified,
382            ExecutionPolicy {
383                max_memory_pages: 64,
384                max_execution_time_ms: 5000,
385                allowed_imports: vec!["env".to_string()],
386                network_access: true,
387                file_system_access: false,
388                crypto_operations: true,
389                inter_plugin_communication: true,
390            },
391        );
392
393        Self {
394            trusted_plugins: RwLock::new(HashMap::new()),
395            execution_policies: RwLock::new(execution_policies),
396            audit_log: RwLock::new(Vec::new()),
397        }
398    }
399
400    pub async fn validate_plugin(&self, plugin: &WasmPlugin) -> Result<()> {
401        self.validate_plugin_metadata(plugin).await?;
402        self.scan_wasm_bytecode(&plugin.wasm_bytes).await?;
403        self.check_plugin_reputation(&plugin.id).await?;
404        Ok(())
405    }
406
407    async fn validate_plugin_metadata(&self, _plugin: &WasmPlugin) -> Result<()> {
408        Ok(())
409    }
410
411    async fn scan_wasm_bytecode(&self, _wasm_bytes: &[u8]) -> Result<()> {
412        Ok(())
413    }
414
415    async fn check_plugin_reputation(&self, _plugin_id: &str) -> Result<()> {
416        Ok(())
417    }
418}
419
420// Default implementations
421
422impl Default for ResourceMetrics {
423    fn default() -> Self {
424        Self {
425            cpu_cores: 4,
426            memory_mb: 8192,
427            storage_gb: 256,
428            network_mbps: 1000.0,
429            gpu_units: 0,
430            quantum_qubits: 0,
431        }
432    }
433}
434
435impl Default for WasmEdgeConfig {
436    fn default() -> Self {
437        Self {
438            optimization_level: OptimizationLevel::Release,
439            resource_limits: WasmResourceLimits::default(),
440            enable_caching: true,
441            enable_jit: true,
442            security_sandbox: true,
443            allowed_imports: vec!["env".to_string(), "wasi_snapshot_preview1".to_string()],
444            max_concurrent_instances: 10,
445            memory_limit_mb: 512,
446            execution_timeout_ms: 5000,
447            enable_hot_reload: true,
448            edge_locations: vec![],
449        }
450    }
451}
452
453impl Default for WasmResourceLimits {
454    fn default() -> Self {
455        Self {
456            max_memory_bytes: 512 * 1024 * 1024, // 512 MB
457            max_execution_time_ms: 5000,
458            max_stack_size_bytes: 2 * 1024 * 1024, // 2 MB
459            max_table_elements: 10000,
460            enable_simd: true,
461            enable_threads: false,
462            max_memory_pages: 8192, // 512 MB / 64KB per page
463            max_instances: 10,
464            max_tables: 10,
465            max_memories: 10,
466            max_globals: 100,
467            max_functions: 1000,
468            max_imports: 100,
469            max_exports: 100,
470        }
471    }
472}
473
474// ============================================================
475// WasmEdgeProcessor
476// ============================================================
477
478/// Advanced WASM edge computing processor
479pub struct WasmEdgeProcessor {
480    config: WasmEdgeConfig,
481    plugins: RwLock<HashMap<String, WasmPlugin>>,
482    execution_contexts: RwLock<HashMap<String, Arc<RwLock<WasmExecutionContext>>>>,
483    execution_semaphore: Semaphore,
484    edge_registry: RwLock<HashMap<String, EdgeLocation>>,
485    plugin_registry: RwLock<HashMap<String, WasmPlugin>>,
486    performance_metrics: RwLock<HashMap<String, PerformanceMetrics>>,
487    security_manager: SecurityManager,
488    #[cfg(feature = "wasm")]
489    wasm_engine: Engine,
490}
491
492impl WasmEdgeProcessor {
493    /// Create new WASM edge processor
494    pub fn new(config: WasmEdgeConfig) -> Result<Self> {
495        #[cfg(feature = "wasm")]
496        let wasm_engine = {
497            let mut wasm_config = wasmtime::Config::new();
498            wasm_config.debug_info(true);
499            wasm_config.wasm_simd(true);
500            wasm_config.wasm_bulk_memory(true);
501            wasm_config.wasm_reference_types(true);
502            wasm_config.wasm_multi_value(true);
503            wasm_config.cranelift_opt_level(wasmtime::OptLevel::Speed);
504            Engine::new(&wasm_config)?
505        };
506
507        #[cfg(not(feature = "wasm"))]
508        let _wasm_engine = ();
509
510        let execution_semaphore = Semaphore::new(config.max_concurrent_instances);
511        let security_manager = SecurityManager::new();
512
513        Ok(Self {
514            config,
515            plugins: RwLock::new(HashMap::new()),
516            execution_contexts: RwLock::new(HashMap::new()),
517            execution_semaphore,
518            edge_registry: RwLock::new(HashMap::new()),
519            plugin_registry: RwLock::new(HashMap::new()),
520            performance_metrics: RwLock::new(HashMap::new()),
521            security_manager,
522            #[cfg(feature = "wasm")]
523            wasm_engine,
524        })
525    }
526
527    /// Register a WASM plugin
528    pub async fn register_plugin(&self, plugin: WasmPlugin) -> Result<()> {
529        self.security_manager.validate_plugin(&plugin).await?;
530
531        let plugin_id = plugin.id.clone();
532
533        #[cfg(feature = "wasm")]
534        {
535            let module = Module::new(&self.wasm_engine, &plugin.wasm_bytes)
536                .map_err(|e| anyhow!("Failed to compile WASM module: {}", e))?;
537
538            let mut store = Store::new(&self.wasm_engine, WasmState::default());
539            let instance = Instance::new(&mut store, &module, &[])
540                .map_err(|e| anyhow!("Failed to instantiate WASM module: {}", e))?;
541
542            let context = WasmExecutionContext {
543                engine: self.wasm_engine.clone(),
544                store,
545                instance,
546                plugin_id: plugin_id.clone(),
547                execution_count: 0,
548                total_execution_time_us: 0,
549                last_execution: Utc::now(),
550                resource_usage: ResourceMetrics::default(),
551            };
552
553            self.execution_contexts
554                .write()
555                .await
556                .insert(plugin_id.clone(), Arc::new(RwLock::new(context)));
557        }
558
559        self.plugins
560            .write()
561            .await
562            .insert(plugin_id.clone(), plugin.clone());
563        self.plugin_registry
564            .write()
565            .await
566            .insert(plugin_id.clone(), plugin);
567
568        self.performance_metrics
569            .write()
570            .await
571            .insert(plugin_id.clone(), PerformanceMetrics::new());
572
573        info!("Registered WASM plugin: {}", plugin_id);
574        Ok(())
575    }
576
577    /// Execute plugin on edge location
578    pub async fn execute_plugin(
579        &self,
580        plugin_id: &str,
581        events: Vec<StreamEvent>,
582        edge_location: Option<String>,
583    ) -> Result<EdgeExecutionResult> {
584        let _permit = self
585            .execution_semaphore
586            .acquire()
587            .await
588            .map_err(|_| anyhow!("Failed to acquire execution permit"))?;
589
590        let start_time = std::time::Instant::now();
591        let execution_id = uuid::Uuid::new_v4().to_string();
592
593        let edge_id = if let Some(location) = edge_location {
594            location
595        } else {
596            self.select_optimal_edge_location(plugin_id, &events)
597                .await?
598        };
599
600        let plugin = {
601            let plugins = self.plugins.read().await;
602            plugins
603                .get(plugin_id)
604                .ok_or_else(|| anyhow!("Plugin not found: {}", plugin_id))?
605                .clone()
606        };
607
608        let result = self
609            .execute_plugin_internal(&plugin, events.clone(), &edge_id, &execution_id)
610            .await;
611
612        let execution_time = start_time.elapsed();
613
614        self.update_performance_metrics(plugin_id, &result, execution_time.as_micros() as u64)
615            .await;
616
617        let execution_result = EdgeExecutionResult {
618            plugin_id: plugin_id.to_string(),
619            execution_id,
620            input_events: events,
621            output_events: result.as_ref().map(|r| r.clone()).unwrap_or_default(),
622            execution_time_us: execution_time.as_micros() as u64,
623            memory_used_mb: self.estimate_memory_usage(&plugin).await,
624            edge_location: edge_id,
625            success: result.is_ok(),
626            error_message: result.as_ref().err().map(|e| e.to_string()),
627            metadata: HashMap::new(),
628        };
629
630        match result {
631            Ok(output_events) => {
632                debug!(
633                    "Plugin execution successful: {} events processed in {:?}",
634                    output_events.len(),
635                    execution_time
636                );
637                Ok(EdgeExecutionResult {
638                    output_events,
639                    ..execution_result
640                })
641            }
642            Err(e) => {
643                warn!("Plugin execution failed: {}", e);
644                Ok(execution_result)
645            }
646        }
647    }
648
649    /// Internal plugin execution
650    async fn execute_plugin_internal(
651        &self,
652        plugin: &WasmPlugin,
653        events: Vec<StreamEvent>,
654        _edge_id: &str,
655        _execution_id: &str,
656    ) -> Result<Vec<StreamEvent>> {
657        #[cfg(not(feature = "wasm"))]
658        let _ = plugin;
659
660        #[cfg(feature = "wasm")]
661        {
662            let context_arc = {
663                let contexts = self.execution_contexts.read().await;
664                contexts
665                    .get(&plugin.id)
666                    .ok_or_else(|| {
667                        anyhow!("Execution context not found for plugin: {}", plugin.id)
668                    })?
669                    .clone()
670            };
671
672            let mut context = context_arc.write().await;
673
674            context.store.data_mut().start_time = Some(Utc::now());
675            context.store.data_mut().event_count = 0;
676
677            let process_events: TypedFunc<(i32, i32), i32> = {
678                let WasmExecutionContext {
679                    ref instance,
680                    ref mut store,
681                    ..
682                } = *context;
683                instance
684                    .get_typed_func(store, "process_events")
685                    .map_err(|e| anyhow!("Failed to get process_events function: {}", e))?
686            };
687
688            let input_json = serde_json::to_string(&events)?;
689            let input_ptr = self.allocate_memory(&mut context, input_json.as_bytes())?;
690
691            let output_ptr = process_events
692                .call(&mut context.store, (input_ptr, input_json.len() as i32))
693                .map_err(|e| anyhow!("WASM execution failed: {}", e))?;
694
695            let output_data = self.read_memory(&mut context, output_ptr)?;
696            let output_json = String::from_utf8(output_data)?;
697            let output_events: Vec<StreamEvent> = serde_json::from_str(&output_json)?;
698
699            context.execution_count += 1;
700            context.last_execution = Utc::now();
701
702            Ok(output_events)
703        }
704
705        #[cfg(not(feature = "wasm"))]
706        {
707            warn!("WASM feature disabled, returning input events unchanged");
708            Ok(events)
709        }
710    }
711
712    /// Select optimal edge location for execution
713    async fn select_optimal_edge_location(
714        &self,
715        plugin_id: &str,
716        events: &[StreamEvent],
717    ) -> Result<String> {
718        let edge_registry = self.edge_registry.read().await;
719
720        if edge_registry.is_empty() {
721            return Ok("default".to_string());
722        }
723
724        let mut best_edge = None;
725        let mut best_score = f64::NEG_INFINITY;
726
727        for (edge_id, edge_location) in edge_registry.iter() {
728            let score = self
729                .calculate_edge_score(plugin_id, events, edge_location)
730                .await;
731            if score > best_score {
732                best_score = score;
733                best_edge = Some(edge_id.clone());
734            }
735        }
736
737        best_edge.ok_or_else(|| anyhow!("No suitable edge location found"))
738    }
739
740    /// Calculate edge location suitability score
741    pub(crate) async fn calculate_edge_score(
742        &self,
743        _plugin_id: &str,
744        _events: &[StreamEvent],
745        edge: &EdgeLocation,
746    ) -> f64 {
747        let latency_score = 1.0 / (1.0 + edge.latency_ms / 100.0);
748        let capacity_score = edge.capacity_factor;
749        let resource_score = (edge.available_resources.cpu_cores as f64 / 32.0).min(1.0);
750
751        latency_score * 0.4 + capacity_score * 0.3 + resource_score * 0.3
752    }
753
754    /// Allocate memory in WASM instance
755    #[cfg(feature = "wasm")]
756    fn allocate_memory(&self, context: &mut WasmExecutionContext, data: &[u8]) -> Result<i32> {
757        let allocate: TypedFunc<i32, i32> = {
758            let instance = &context.instance;
759            let store = &mut context.store;
760            instance
761                .get_typed_func(store, "allocate")
762                .map_err(|e| anyhow!("Failed to get allocate function: {}", e))?
763        };
764
765        let ptr = allocate
766            .call(&mut context.store, data.len() as i32)
767            .map_err(|e| anyhow!("Memory allocation failed: {}", e))?;
768
769        let memory = {
770            let instance = &context.instance;
771            let store = &mut context.store;
772            instance
773                .get_memory(store, "memory")
774                .ok_or_else(|| anyhow!("Failed to get WASM memory"))?
775        };
776
777        memory
778            .write(&mut context.store, ptr as usize, data)
779            .map_err(|e| anyhow!("Failed to write to WASM memory: {}", e))?;
780
781        Ok(ptr)
782    }
783
784    /// Read memory from WASM instance
785    #[cfg(feature = "wasm")]
786    fn read_memory(&self, context: &mut WasmExecutionContext, ptr: i32) -> Result<Vec<u8>> {
787        let memory = {
788            let instance = &context.instance;
789            let store = &mut context.store;
790            instance
791                .get_memory(store, "memory")
792                .ok_or_else(|| anyhow!("Failed to get WASM memory"))?
793        };
794
795        let mut len_bytes = [0u8; 4];
796        memory
797            .read(&context.store, ptr as usize, &mut len_bytes)
798            .map_err(|e| anyhow!("Failed to read length from WASM memory: {}", e))?;
799
800        let len = u32::from_le_bytes(len_bytes) as usize;
801
802        let mut data = vec![0u8; len];
803        memory
804            .read(&context.store, (ptr + 4) as usize, &mut data)
805            .map_err(|e| anyhow!("Failed to read data from WASM memory: {}", e))?;
806
807        Ok(data)
808    }
809
810    /// Estimate memory usage for plugin
811    async fn estimate_memory_usage(&self, plugin: &WasmPlugin) -> f64 {
812        let base_memory = plugin.wasm_bytes.len() as f64 / (1024.0 * 1024.0);
813        let complexity_factor = plugin.capabilities.len() as f64 * 0.1;
814        base_memory + complexity_factor
815    }
816
817    /// Update performance metrics
818    async fn update_performance_metrics(
819        &self,
820        plugin_id: &str,
821        result: &Result<Vec<StreamEvent>>,
822        execution_time_us: u64,
823    ) {
824        let mut metrics_guard = self.performance_metrics.write().await;
825        let metrics = metrics_guard
826            .entry(plugin_id.to_string())
827            .or_insert_with(PerformanceMetrics::new);
828
829        metrics.total_executions += 1;
830        metrics.total_execution_time_us += execution_time_us;
831        metrics.average_execution_time_us =
832            metrics.total_execution_time_us as f64 / metrics.total_executions as f64;
833
834        if execution_time_us > metrics.max_execution_time_us {
835            metrics.max_execution_time_us = execution_time_us;
836        }
837
838        if metrics.min_execution_time_us == 0 || execution_time_us < metrics.min_execution_time_us {
839            metrics.min_execution_time_us = execution_time_us;
840        }
841
842        let success = result.is_ok();
843        let success_count = if success { 1.0 } else { 0.0 };
844        metrics.success_rate = (metrics.success_rate * (metrics.total_executions - 1) as f64
845            + success_count)
846            / metrics.total_executions as f64;
847
848        metrics.last_updated = Utc::now();
849    }
850
851    /// Get plugin performance metrics
852    pub async fn get_plugin_metrics(&self, plugin_id: &str) -> Option<PerformanceMetrics> {
853        self.performance_metrics
854            .read()
855            .await
856            .get(plugin_id)
857            .cloned()
858    }
859
860    /// List all registered plugins
861    pub async fn list_plugins(&self) -> Vec<WasmPlugin> {
862        self.plugins.read().await.values().cloned().collect()
863    }
864
865    /// Hot reload plugin
866    pub async fn hot_reload_plugin(&self, plugin_id: &str, new_wasm_bytes: Vec<u8>) -> Result<()> {
867        if !self.config.enable_hot_reload {
868            return Err(anyhow!("Hot reload is disabled"));
869        }
870
871        let mut plugins = self.plugins.write().await;
872        if let Some(plugin) = plugins.get_mut(plugin_id) {
873            plugin.wasm_bytes = new_wasm_bytes;
874            plugin.updated_at = Utc::now();
875
876            #[cfg(feature = "wasm")]
877            {
878                let module = Module::new(&self.wasm_engine, &plugin.wasm_bytes)?;
879                let mut store = Store::new(&self.wasm_engine, WasmState::default());
880                let instance = Instance::new(&mut store, &module, &[])?;
881
882                let context = WasmExecutionContext {
883                    engine: self.wasm_engine.clone(),
884                    store,
885                    instance,
886                    plugin_id: plugin_id.to_string(),
887                    execution_count: 0,
888                    total_execution_time_us: 0,
889                    last_execution: Utc::now(),
890                    resource_usage: ResourceMetrics::default(),
891                };
892
893                self.execution_contexts
894                    .write()
895                    .await
896                    .insert(plugin_id.to_string(), Arc::new(RwLock::new(context)));
897            }
898
899            info!("Hot reloaded plugin: {}", plugin_id);
900            Ok(())
901        } else {
902            Err(anyhow!("Plugin not found: {}", plugin_id))
903        }
904    }
905
906    /// Unregister plugin
907    pub async fn unregister_plugin(&self, plugin_id: &str) -> Result<()> {
908        self.plugins.write().await.remove(plugin_id);
909        self.execution_contexts.write().await.remove(plugin_id);
910        self.performance_metrics.write().await.remove(plugin_id);
911        info!("Unregistered plugin: {}", plugin_id);
912        Ok(())
913    }
914
915    /// Load plugin (alias for register_plugin for API compatibility)
916    pub async fn load_plugin(&self, plugin: WasmPlugin) -> Result<()> {
917        self.register_plugin(plugin).await
918    }
919
920    /// Process a single event using the processor
921    pub async fn process(&mut self, event: StreamEvent) -> Result<WasmProcessingResult> {
922        let plugin_id = {
923            let plugins = self.plugins.read().await;
924            plugins.keys().next().cloned()
925        };
926
927        if let Some(pid) = plugin_id {
928            let result = self.execute_plugin(&pid, vec![event], None).await?;
929            Ok(WasmProcessingResult {
930                output: if result.output_events.is_empty() {
931                    None
932                } else {
933                    Some(result.output_events[0].clone())
934                },
935                latency_ms: result.execution_time_us as f64 / 1000.0,
936            })
937        } else {
938            Ok(WasmProcessingResult {
939                output: None,
940                latency_ms: 0.0,
941            })
942        }
943    }
944
945    /// Process event at a specific edge location
946    pub async fn process_at_location(
947        &self,
948        event: StreamEvent,
949        location: &EdgeLocation,
950    ) -> Result<WasmProcessingResult> {
951        let plugin_id = {
952            let plugins = self.plugins.read().await;
953            plugins.keys().next().cloned()
954        };
955
956        if let Some(pid) = plugin_id {
957            let result = self
958                .execute_plugin(&pid, vec![event], Some(location.id.clone()))
959                .await?;
960            Ok(WasmProcessingResult {
961                output: if result.output_events.is_empty() {
962                    None
963                } else {
964                    Some(result.output_events[0].clone())
965                },
966                latency_ms: result.execution_time_us as f64 / 1000.0,
967            })
968        } else {
969            Ok(WasmProcessingResult {
970                output: None,
971                latency_ms: 0.0,
972            })
973        }
974    }
975
976    /// Hot-swap plugin (alias for hot_reload_plugin for API compatibility)
977    pub async fn hot_swap_plugin(&self, old_plugin_id: &str, new_plugin: WasmPlugin) -> Result<()> {
978        self.unregister_plugin(old_plugin_id).await?;
979        self.register_plugin(new_plugin).await?;
980        info!("Hot-swapped plugin {} with new version", old_plugin_id);
981        Ok(())
982    }
983
984    /// Get processor statistics
985    pub async fn get_stats(&self) -> WasmProcessorStats {
986        let plugins = self.plugins.read().await;
987        let metrics = self.performance_metrics.read().await;
988
989        let total_processed = metrics.values().map(|m| m.total_executions).sum();
990
991        let average_latency_ms = if metrics.is_empty() {
992            0.0
993        } else {
994            metrics
995                .values()
996                .map(|m| m.average_execution_time_us / 1000.0)
997                .sum::<f64>()
998                / metrics.len() as f64
999        };
1000
1001        WasmProcessorStats {
1002            total_processed,
1003            average_latency_ms,
1004            active_plugins: plugins.len(),
1005        }
1006    }
1007}
1008
1009// ============================================================
1010// Tests
1011// ============================================================
1012
1013#[cfg(test)]
1014mod tests {
1015    use super::*;
1016
1017    #[tokio::test]
1018    async fn test_wasm_edge_processor_creation() {
1019        let config = WasmEdgeConfig::default();
1020        let processor = WasmEdgeProcessor::new(config).unwrap();
1021
1022        let plugins = processor.list_plugins().await;
1023        assert_eq!(plugins.len(), 0);
1024    }
1025
1026    #[tokio::test]
1027    async fn test_edge_location_scoring() {
1028        let config = WasmEdgeConfig::default();
1029        let processor = WasmEdgeProcessor::new(config).unwrap();
1030
1031        let edge = EdgeLocation {
1032            id: "test-edge".to_string(),
1033            region: "us-west".to_string(),
1034            latency_ms: 50.0,
1035            capacity_factor: 0.8,
1036            available_resources: ResourceMetrics::default(),
1037            specializations: vec![ProcessingSpecialization::RdfProcessing],
1038        };
1039
1040        let score = processor
1041            .calculate_edge_score("test-plugin", &[], &edge)
1042            .await;
1043        assert!(score > 0.0 && score <= 1.0);
1044    }
1045
1046    #[test]
1047    fn test_security_manager_creation() {
1048        let security_manager = SecurityManager::new();
1049        assert!(security_manager.execution_policies.try_read().is_ok());
1050    }
1051}