Skip to main content

smg_wasm/
module_manager.rs

1//! WASM Module Manager
2
3use std::{
4    collections::HashMap,
5    sync::{
6        atomic::{AtomicU64, Ordering},
7        Arc, RwLock,
8    },
9};
10
11use uuid::Uuid;
12
13use crate::{
14    config::WasmRuntimeConfig,
15    errors::{Result, WasmError, WasmManagerError, WasmModuleError},
16    module::{WasmModule, WasmModuleAttachPoint},
17    runtime::WasmRuntime,
18    spec::smg::gateway::middleware_types::Action as MiddlewareAction,
19    types::{WasmComponentInput, WasmComponentOutput},
20};
21
22pub struct WasmModuleManager {
23    modules: Arc<RwLock<HashMap<Uuid, WasmModule>>>,
24    runtime: Arc<WasmRuntime>,
25    // Metrics
26    total_executions: AtomicU64,
27    successful_executions: AtomicU64,
28    failed_executions: AtomicU64,
29    total_execution_time_ms: AtomicU64,
30    max_execution_time_ms: AtomicU64,
31}
32
33impl WasmModuleManager {
34    pub fn new(config: WasmRuntimeConfig) -> Self {
35        let runtime = Arc::new(WasmRuntime::new(config));
36        Self {
37            modules: Arc::new(RwLock::new(HashMap::new())),
38            runtime,
39            total_executions: AtomicU64::new(0),
40            successful_executions: AtomicU64::new(0),
41            failed_executions: AtomicU64::new(0),
42            total_execution_time_ms: AtomicU64::new(0),
43            max_execution_time_ms: AtomicU64::new(0),
44        }
45    }
46
47    pub fn with_default_config() -> Self {
48        Self::new(WasmRuntimeConfig::default())
49    }
50
51    /// Register a module (for workflow steps)
52    pub fn register_module_internal(&self, module: WasmModule) -> Result<()> {
53        let mut modules = self
54            .modules
55            .write()
56            .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
57        modules.insert(module.module_uuid, module);
58        Ok(())
59    }
60
61    /// Remove a module (for workflow steps)
62    pub fn remove_module_internal(&self, module_uuid: Uuid) -> Result<()> {
63        let mut modules = self
64            .modules
65            .write()
66            .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
67        if !modules.contains_key(&module_uuid) {
68            return Err(WasmManagerError::ModuleNotFound(module_uuid).into());
69        }
70        modules.remove(&module_uuid);
71        Ok(())
72    }
73
74    pub fn check_duplicate_sha256_hash(&self, sha256_hash: &[u8; 32]) -> Result<()> {
75        let modules = self
76            .modules
77            .read()
78            .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
79        if modules
80            .values()
81            .any(|module: &WasmModule| module.module_meta.sha256_hash == *sha256_hash)
82        {
83            return Err(WasmModuleError::DuplicateSha256((*sha256_hash).into()).into());
84        }
85        Ok(())
86    }
87
88    pub fn get_all_modules(&self) -> Result<Vec<WasmModule>> {
89        let modules = self
90            .modules
91            .read()
92            .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
93        Ok(modules.values().cloned().collect())
94    }
95
96    pub fn get_module(&self, module_uuid: Uuid) -> Result<Option<WasmModule>> {
97        let modules = self
98            .modules
99            .read()
100            .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
101        Ok(modules.get(&module_uuid).cloned())
102    }
103
104    pub fn get_modules(&self) -> Result<Vec<WasmModule>> {
105        let modules = self
106            .modules
107            .read()
108            .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
109        Ok(modules.values().cloned().collect())
110    }
111
112    /// get modules by attach point
113    pub fn get_modules_by_attach_point(
114        &self,
115        attach_point: WasmModuleAttachPoint,
116    ) -> Result<Vec<WasmModule>> {
117        let modules = self
118            .modules
119            .read()
120            .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
121        Ok(modules
122            .values()
123            .filter(|module| module.module_meta.attach_points.contains(&attach_point))
124            .cloned()
125            .collect())
126    }
127
128    pub fn get_runtime(&self) -> &Arc<WasmRuntime> {
129        &self.runtime
130    }
131
132    /// Get the configured maximum body size for HTTP request/response processing
133    pub fn get_max_body_size(&self) -> usize {
134        self.runtime.get_config().max_body_size
135    }
136
137    /// Execute WASM module using WebAssembly component model based on attach_point
138    pub async fn execute_module_interface(
139        &self,
140        module_uuid: Uuid,
141        attach_point: WasmModuleAttachPoint,
142        input: WasmComponentInput,
143    ) -> Result<WasmComponentOutput> {
144        let start_time = std::time::Instant::now();
145
146        // Get the SHA256 hash and Arc-wrapped WASM bytes with a read lock.
147        // The Arc clone is ~1ns (atomic increment) instead of cloning the full bytes.
148        let (sha256_hash, wasm_bytes) = {
149            let modules = self
150                .modules
151                .read()
152                .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
153            let module = modules
154                .get(&module_uuid)
155                .ok_or_else(|| WasmError::from(WasmManagerError::ModuleNotFound(module_uuid)))?;
156
157            (
158                module.module_meta.sha256_hash,
159                module.module_meta.wasm_bytes.clone(), // Arc clone (cheap)
160            )
161        };
162
163        {
164            let mut modules = self
165                .modules
166                .write()
167                .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
168            if let Some(module) = modules.get_mut(&module_uuid) {
169                // SystemTime::duration_since only fails if the system time is before UNIX_EPOCH,
170                // which should never happen in normal operation. If it does, use current time as fallback.
171                let now = std::time::SystemTime::now()
172                    .duration_since(std::time::UNIX_EPOCH)
173                    .unwrap_or_else(|_| {
174                        // Fallback to a reasonable timestamp if system time is invalid
175                        // This should never occur in practice, but provides a safe fallback
176                        std::time::Duration::from_nanos(0)
177                    })
178                    .as_nanos() as u64;
179                module.module_meta.last_accessed_at = now;
180                module.module_meta.access_count += 1;
181            }
182        }
183
184        let result = self
185            .runtime
186            .execute_component_async(sha256_hash, wasm_bytes, attach_point, input)
187            .await;
188
189        // Record metrics
190        let execution_time_ms = start_time.elapsed().as_millis() as u64;
191        self.total_executions.fetch_add(1, Ordering::Relaxed);
192        self.total_execution_time_ms
193            .fetch_add(execution_time_ms, Ordering::Relaxed);
194        // Update max execution time
195        self.max_execution_time_ms
196            .fetch_max(execution_time_ms, Ordering::Relaxed);
197
198        if result.is_ok() {
199            self.successful_executions.fetch_add(1, Ordering::Relaxed);
200        } else {
201            self.failed_executions.fetch_add(1, Ordering::Relaxed);
202        }
203
204        result
205    }
206
207    /// Execute WASM module using WebAssembly component model (sync version)
208    pub fn execute_module_interface_sync(
209        &self,
210        module_uuid: Uuid,
211        attach_point: WasmModuleAttachPoint,
212        input: WasmComponentInput,
213    ) -> Result<WasmComponentOutput> {
214        let handle = tokio::runtime::Handle::current();
215        handle.block_on(self.execute_module_interface(module_uuid, attach_point, input))
216    }
217
218    /// Get current metrics
219    pub fn get_metrics(&self) -> (u64, u64, u64, u64, u64) {
220        (
221            self.total_executions.load(Ordering::Relaxed),
222            self.successful_executions.load(Ordering::Relaxed),
223            self.failed_executions.load(Ordering::Relaxed),
224            self.total_execution_time_ms.load(Ordering::Relaxed),
225            self.max_execution_time_ms.load(Ordering::Relaxed),
226        )
227    }
228
229    /// Execute a WASM module for a given attach point
230    /// Returns the Action if successful, or None if execution failed
231    ///
232    /// This is a convenience method that wraps execute_module_interface and handles
233    /// error logging automatically.
234    pub async fn execute_module_for_attach_point(
235        &self,
236        module: &WasmModule,
237        attach_point: WasmModuleAttachPoint,
238        input: WasmComponentInput,
239    ) -> Option<MiddlewareAction> {
240        use tracing::error;
241
242        let action_result = self
243            .execute_module_interface(module.module_uuid, attach_point, input)
244            .await;
245
246        match action_result {
247            Ok(output) => match output {
248                WasmComponentOutput::MiddlewareAction(action) => Some(action),
249            },
250            Err(e) => {
251                error!(
252                    "Failed to execute WASM module {}: {}",
253                    module.module_meta.name, e
254                );
255                None
256            }
257        }
258    }
259}
260
261impl Default for WasmModuleManager {
262    fn default() -> Self {
263        Self::with_default_config()
264    }
265}