1use std::collections::HashMap;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25use tokio::sync::RwLock;
26
27use crate::bus::MessageBus;
28use crate::error::{KernelError, Result};
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33pub enum ModuleState {
34 Loaded,
36 Starting,
38 Running,
40 Stopping,
42 Stopped,
44 Failed,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct ModuleMetadata {
51 pub id: String,
53 pub name: String,
55 pub version: String,
57 pub kind: ModuleKind,
59 pub description: Option<String>,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "snake_case")]
66pub enum ModuleKind {
67 Native,
69 Wasm,
71}
72
73#[async_trait]
79pub trait Module: Send + Sync + 'static {
80 fn metadata(&self) -> ModuleMetadata;
82
83 async fn init(&mut self, _bus: MessageBus) -> Result<()> {
88 Ok(())
89 }
90
91 async fn start(&mut self) -> Result<()>;
93
94 async fn stop(&mut self) -> Result<()>;
97}
98
99#[async_trait]
106pub trait WasmModule: Send + Sync + 'static {
107 fn metadata(&self) -> ModuleMetadata;
110
111 async fn instantiate(&mut self, _bus: MessageBus) -> Result<()> {
113 Ok(())
114 }
115
116 async fn start(&mut self) -> Result<()>;
118
119 async fn stop(&mut self) -> Result<()>;
121}
122
123enum AnyModule {
125 Native(Box<dyn Module>),
126 Wasm(Box<dyn WasmModule>),
127}
128
129impl AnyModule {
130 fn metadata(&self) -> ModuleMetadata {
131 match self {
132 AnyModule::Native(m) => m.metadata(),
133 AnyModule::Wasm(m) => m.metadata(),
134 }
135 }
136
137 async fn init(&mut self, bus: MessageBus) -> Result<()> {
138 match self {
139 AnyModule::Native(m) => m.init(bus).await,
140 AnyModule::Wasm(m) => m.instantiate(bus).await,
141 }
142 }
143
144 async fn start(&mut self) -> Result<()> {
145 match self {
146 AnyModule::Native(m) => m.start().await,
147 AnyModule::Wasm(m) => m.start().await,
148 }
149 }
150
151 async fn stop(&mut self) -> Result<()> {
152 match self {
153 AnyModule::Native(m) => m.stop().await,
154 AnyModule::Wasm(m) => m.stop().await,
155 }
156 }
157}
158
159struct ModuleEntry {
160 module: AnyModule,
161 state: ModuleState,
162}
163
164#[derive(Clone)]
170pub struct ModuleManager {
171 bus: MessageBus,
172 inner: Arc<RwLock<HashMap<String, ModuleEntry>>>,
173}
174
175impl ModuleManager {
176 pub fn new(bus: MessageBus) -> Self {
178 Self {
179 bus,
180 inner: Arc::new(RwLock::new(HashMap::new())),
181 }
182 }
183
184 pub async fn register_native<M: Module>(&self, module: M) -> Result<()> {
188 let metadata = module.metadata();
189 self.insert(metadata.id.clone(), AnyModule::Native(Box::new(module)))
190 .await
191 }
192
193 pub async fn register_wasm<M: WasmModule>(&self, module: M) -> Result<()> {
195 let metadata = module.metadata();
196 self.insert(metadata.id.clone(), AnyModule::Wasm(Box::new(module)))
197 .await
198 }
199
200 async fn insert(&self, id: String, module: AnyModule) -> Result<()> {
201 let mut map = self.inner.write().await;
202 if map.contains_key(&id) {
203 return Err(KernelError::DuplicateModule(id));
204 }
205 map.insert(
206 id,
207 ModuleEntry {
208 module,
209 state: ModuleState::Loaded,
210 },
211 );
212 Ok(())
213 }
214
215 pub async fn init(&self, id: &str) -> Result<()> {
217 let mut map = self.inner.write().await;
218 let entry = map
219 .get_mut(id)
220 .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
221 entry.module.init(self.bus.clone()).await
222 }
223
224 pub async fn start(&self, id: &str) -> Result<()> {
226 let mut map = self.inner.write().await;
227 let entry = map
228 .get_mut(id)
229 .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
230
231 entry.state = ModuleState::Starting;
232 match entry.module.start().await {
233 Ok(()) => {
234 entry.state = ModuleState::Running;
235 let id_owned = id.to_string();
236 drop(map);
237 self.bus
238 .emit_event(
239 "kernel",
240 crate::bus::Event::ModuleStarted {
241 module_id: id_owned,
242 },
243 )
244 .await?;
245 Ok(())
246 }
247 Err(e) => {
248 entry.state = ModuleState::Failed;
249 Err(e)
250 }
251 }
252 }
253
254 pub async fn stop(&self, id: &str) -> Result<()> {
256 let mut map = self.inner.write().await;
257 let entry = map
258 .get_mut(id)
259 .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
260
261 entry.state = ModuleState::Stopping;
262 match entry.module.stop().await {
263 Ok(()) => {
264 entry.state = ModuleState::Stopped;
265 let id_owned = id.to_string();
266 drop(map);
267 self.bus
268 .emit_event(
269 "kernel",
270 crate::bus::Event::ModuleStopped {
271 module_id: id_owned,
272 },
273 )
274 .await?;
275 Ok(())
276 }
277 Err(e) => {
278 entry.state = ModuleState::Failed;
279 Err(e)
280 }
281 }
282 }
283
284 pub async fn unload(&self, id: &str) -> Result<()> {
286 let _ = self.stop(id).await;
289 let mut map = self.inner.write().await;
290 map.remove(id)
291 .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
292 Ok(())
293 }
294
295 pub async fn state(&self, id: &str) -> Option<ModuleState> {
297 self.inner.read().await.get(id).map(|e| e.state)
298 }
299
300 pub async fn list(&self) -> Vec<(ModuleMetadata, ModuleState)> {
302 self.inner
303 .read()
304 .await
305 .values()
306 .map(|e| (e.module.metadata(), e.state))
307 .collect()
308 }
309}
310
311impl Debug for ModuleManager {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 f.debug_struct("ModuleManager").finish_non_exhaustive()
314 }
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Native test module that counts how often `start` and `stop` are called.
    struct EchoModule {
        meta: ModuleMetadata,
        start_count: Arc<AtomicUsize>,
        stop_count: Arc<AtomicUsize>,
    }

    impl EchoModule {
        /// Builds a module with the given id and returns it together with
        /// shared handles to its start and stop counters.
        fn new(id: &str) -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
            let start_count = Arc::new(AtomicUsize::new(0));
            let stop_count = Arc::new(AtomicUsize::new(0));
            let meta = ModuleMetadata {
                id: id.to_string(),
                name: format!("Echo {id}"),
                version: "0.1.0".into(),
                kind: ModuleKind::Native,
                description: None,
            };
            let module = Self {
                meta,
                start_count: Arc::clone(&start_count),
                stop_count: Arc::clone(&stop_count),
            };
            (module, start_count, stop_count)
        }
    }

    #[async_trait]
    impl Module for EchoModule {
        fn metadata(&self) -> ModuleMetadata {
            self.meta.clone()
        }

        async fn start(&mut self) -> Result<()> {
            self.start_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn stop(&mut self) -> Result<()> {
            self.stop_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Wasm test module whose lifecycle hooks always succeed.
    struct StubWasm {
        meta: ModuleMetadata,
    }

    #[async_trait]
    impl WasmModule for StubWasm {
        fn metadata(&self) -> ModuleMetadata {
            self.meta.clone()
        }

        async fn start(&mut self) -> Result<()> {
            Ok(())
        }

        async fn stop(&mut self) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn register_start_stop_native_module() {
        let manager = ModuleManager::new(MessageBus::new());
        let (echo, start_calls, stop_calls) = EchoModule::new("echo");

        manager.register_native(echo).await.unwrap();
        assert_eq!(manager.state("echo").await, Some(ModuleState::Loaded));

        manager.start("echo").await.unwrap();
        assert_eq!(manager.state("echo").await, Some(ModuleState::Running));
        assert_eq!(start_calls.load(Ordering::SeqCst), 1);

        manager.stop("echo").await.unwrap();
        assert_eq!(manager.state("echo").await, Some(ModuleState::Stopped));
        assert_eq!(stop_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn duplicate_registration_is_rejected() {
        let manager = ModuleManager::new(MessageBus::new());
        let (first, _, _) = EchoModule::new("dup");
        let (second, _, _) = EchoModule::new("dup");

        manager.register_native(first).await.unwrap();
        let rejection = manager.register_native(second).await.unwrap_err();
        assert!(matches!(rejection, KernelError::DuplicateModule(_)));
    }

    #[tokio::test]
    async fn unknown_module_returns_error() {
        let manager = ModuleManager::new(MessageBus::new());
        let failure = manager.start("missing").await.unwrap_err();
        assert!(matches!(failure, KernelError::UnknownModule(_)));
    }

    #[tokio::test]
    async fn register_wasm_module() {
        let manager = ModuleManager::new(MessageBus::new());
        let meta = ModuleMetadata {
            id: "wasm-stub".into(),
            name: "Stub".into(),
            version: "0.0.1".into(),
            kind: ModuleKind::Wasm,
            description: None,
        };

        manager.register_wasm(StubWasm { meta }).await.unwrap();
        manager.start("wasm-stub").await.unwrap();
        assert_eq!(
            manager.state("wasm-stub").await,
            Some(ModuleState::Running)
        );

        let listed = manager.list().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].0.kind, ModuleKind::Wasm);
    }

    #[tokio::test]
    async fn unload_removes_module() {
        let manager = ModuleManager::new(MessageBus::new());
        let (echo, _, _) = EchoModule::new("echo");
        manager.register_native(echo).await.unwrap();
        manager.start("echo").await.unwrap();

        manager.unload("echo").await.unwrap();
        assert!(manager.state("echo").await.is_none());
    }

    #[tokio::test]
    async fn start_emits_module_started_event() {
        let bus = MessageBus::new();
        // Subscribe before the manager takes ownership of the bus.
        let mut subscription = bus.subscribe().await;
        let manager = ModuleManager::new(bus);

        let (echo, _, _) = EchoModule::new("echo");
        manager.register_native(echo).await.unwrap();
        manager.start("echo").await.unwrap();

        let envelope = subscription.receiver.recv().await.unwrap();
        match envelope.message {
            crate::bus::Message::Event(crate::bus::Event::ModuleStarted { module_id }) => {
                assert_eq!(module_id, "echo");
            }
            other => panic!("unexpected message: {other:?}"),
        }
    }
}