smg_wasm/
module_manager.rs1use std::{
4 collections::HashMap,
5 sync::{
6 atomic::{AtomicU64, Ordering},
7 Arc, RwLock,
8 },
9};
10
11use uuid::Uuid;
12
13use crate::{
14 config::WasmRuntimeConfig,
15 errors::{Result, WasmError, WasmManagerError, WasmModuleError},
16 module::{WasmModule, WasmModuleAttachPoint},
17 runtime::WasmRuntime,
18 spec::smg::gateway::middleware_types::Action as MiddlewareAction,
19 types::{WasmComponentInput, WasmComponentOutput},
20};
21
22pub struct WasmModuleManager {
23 modules: Arc<RwLock<HashMap<Uuid, WasmModule>>>,
24 runtime: Arc<WasmRuntime>,
25 total_executions: AtomicU64,
27 successful_executions: AtomicU64,
28 failed_executions: AtomicU64,
29 total_execution_time_ms: AtomicU64,
30 max_execution_time_ms: AtomicU64,
31}
32
33impl WasmModuleManager {
34 pub fn new(config: WasmRuntimeConfig) -> Self {
35 let runtime = Arc::new(WasmRuntime::new(config));
36 Self {
37 modules: Arc::new(RwLock::new(HashMap::new())),
38 runtime,
39 total_executions: AtomicU64::new(0),
40 successful_executions: AtomicU64::new(0),
41 failed_executions: AtomicU64::new(0),
42 total_execution_time_ms: AtomicU64::new(0),
43 max_execution_time_ms: AtomicU64::new(0),
44 }
45 }
46
47 pub fn with_default_config() -> Self {
48 Self::new(WasmRuntimeConfig::default())
49 }
50
51 pub fn register_module_internal(&self, module: WasmModule) -> Result<()> {
53 let mut modules = self
54 .modules
55 .write()
56 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
57 modules.insert(module.module_uuid, module);
58 Ok(())
59 }
60
61 pub fn remove_module_internal(&self, module_uuid: Uuid) -> Result<()> {
63 let mut modules = self
64 .modules
65 .write()
66 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
67 if !modules.contains_key(&module_uuid) {
68 return Err(WasmManagerError::ModuleNotFound(module_uuid).into());
69 }
70 modules.remove(&module_uuid);
71 Ok(())
72 }
73
74 pub fn check_duplicate_sha256_hash(&self, sha256_hash: &[u8; 32]) -> Result<()> {
75 let modules = self
76 .modules
77 .read()
78 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
79 if modules
80 .values()
81 .any(|module: &WasmModule| module.module_meta.sha256_hash == *sha256_hash)
82 {
83 return Err(WasmModuleError::DuplicateSha256((*sha256_hash).into()).into());
84 }
85 Ok(())
86 }
87
88 pub fn get_all_modules(&self) -> Result<Vec<WasmModule>> {
89 let modules = self
90 .modules
91 .read()
92 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
93 Ok(modules.values().cloned().collect())
94 }
95
96 pub fn get_module(&self, module_uuid: Uuid) -> Result<Option<WasmModule>> {
97 let modules = self
98 .modules
99 .read()
100 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
101 Ok(modules.get(&module_uuid).cloned())
102 }
103
104 pub fn get_modules(&self) -> Result<Vec<WasmModule>> {
105 let modules = self
106 .modules
107 .read()
108 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
109 Ok(modules.values().cloned().collect())
110 }
111
112 pub fn get_modules_by_attach_point(
114 &self,
115 attach_point: WasmModuleAttachPoint,
116 ) -> Result<Vec<WasmModule>> {
117 let modules = self
118 .modules
119 .read()
120 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
121 Ok(modules
122 .values()
123 .filter(|module| module.module_meta.attach_points.contains(&attach_point))
124 .cloned()
125 .collect())
126 }
127
128 pub fn get_runtime(&self) -> &Arc<WasmRuntime> {
129 &self.runtime
130 }
131
132 pub fn get_max_body_size(&self) -> usize {
134 self.runtime.get_config().max_body_size
135 }
136
137 pub async fn execute_module_interface(
139 &self,
140 module_uuid: Uuid,
141 attach_point: WasmModuleAttachPoint,
142 input: WasmComponentInput,
143 ) -> Result<WasmComponentOutput> {
144 let start_time = std::time::Instant::now();
145
146 let (sha256_hash, wasm_bytes) = {
149 let modules = self
150 .modules
151 .read()
152 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
153 let module = modules
154 .get(&module_uuid)
155 .ok_or_else(|| WasmError::from(WasmManagerError::ModuleNotFound(module_uuid)))?;
156
157 (
158 module.module_meta.sha256_hash,
159 module.module_meta.wasm_bytes.clone(), )
161 };
162
163 {
164 let mut modules = self
165 .modules
166 .write()
167 .map_err(|e| WasmManagerError::LockFailed(e.to_string()))?;
168 if let Some(module) = modules.get_mut(&module_uuid) {
169 let now = std::time::SystemTime::now()
172 .duration_since(std::time::UNIX_EPOCH)
173 .unwrap_or_else(|_| {
174 std::time::Duration::from_nanos(0)
177 })
178 .as_nanos() as u64;
179 module.module_meta.last_accessed_at = now;
180 module.module_meta.access_count += 1;
181 }
182 }
183
184 let result = self
185 .runtime
186 .execute_component_async(sha256_hash, wasm_bytes, attach_point, input)
187 .await;
188
189 let execution_time_ms = start_time.elapsed().as_millis() as u64;
191 self.total_executions.fetch_add(1, Ordering::Relaxed);
192 self.total_execution_time_ms
193 .fetch_add(execution_time_ms, Ordering::Relaxed);
194 self.max_execution_time_ms
196 .fetch_max(execution_time_ms, Ordering::Relaxed);
197
198 if result.is_ok() {
199 self.successful_executions.fetch_add(1, Ordering::Relaxed);
200 } else {
201 self.failed_executions.fetch_add(1, Ordering::Relaxed);
202 }
203
204 result
205 }
206
207 pub fn execute_module_interface_sync(
209 &self,
210 module_uuid: Uuid,
211 attach_point: WasmModuleAttachPoint,
212 input: WasmComponentInput,
213 ) -> Result<WasmComponentOutput> {
214 let handle = tokio::runtime::Handle::current();
215 handle.block_on(self.execute_module_interface(module_uuid, attach_point, input))
216 }
217
218 pub fn get_metrics(&self) -> (u64, u64, u64, u64, u64) {
220 (
221 self.total_executions.load(Ordering::Relaxed),
222 self.successful_executions.load(Ordering::Relaxed),
223 self.failed_executions.load(Ordering::Relaxed),
224 self.total_execution_time_ms.load(Ordering::Relaxed),
225 self.max_execution_time_ms.load(Ordering::Relaxed),
226 )
227 }
228
229 pub async fn execute_module_for_attach_point(
235 &self,
236 module: &WasmModule,
237 attach_point: WasmModuleAttachPoint,
238 input: WasmComponentInput,
239 ) -> Option<MiddlewareAction> {
240 use tracing::error;
241
242 let action_result = self
243 .execute_module_interface(module.module_uuid, attach_point, input)
244 .await;
245
246 match action_result {
247 Ok(output) => match output {
248 WasmComponentOutput::MiddlewareAction(action) => Some(action),
249 },
250 Err(e) => {
251 error!(
252 "Failed to execute WASM module {}: {}",
253 module.module_meta.name, e
254 );
255 None
256 }
257 }
258 }
259}
260
261impl Default for WasmModuleManager {
262 fn default() -> Self {
263 Self::with_default_config()
264 }
265}