Skip to main content

oxide_k/
module.rs

1//! # Module Orchestration
2//!
3//! Defines the traits and the registry that the kernel uses to manage the
4//! lifecycle of native Rust modules and WebAssembly (WASM) plugins.
5//!
6//! The micro-kernel does not bake in a single module ABI. Instead it exposes
7//! two complementary traits:
8//!
9//! * [`Module`] – implemented by native, in-process Rust modules. Used for the
10//!   Layer 1 "Native Core" components in the Rust Oxide architecture (network
11//!   stack, SQLite engine, etc.).
12//! * [`WasmModule`] – implemented by WebAssembly plugins. Used for the Layer 2
13//!   sandboxed components (extractors, parsers, `oxide-compress`, etc.).
14//!
//! The [`ModuleManager`] keeps a map of loaded modules keyed by id and drives
//! them through a small state machine (`Loaded -> Starting -> Running ->
//! Stopping -> Stopped`). A module whose `start` or `stop` hook returns an
//! error is moved to `Failed` instead.
18
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25use tokio::sync::RwLock;
26
27use crate::bus::MessageBus;
28use crate::error::{KernelError, Result};
29
30/// The lifecycle state of a module known to the [`ModuleManager`].
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33pub enum ModuleState {
34    /// The module has been registered but not yet started.
35    Loaded,
36    /// The module is transitioning from `Loaded` to `Running`.
37    Starting,
38    /// The module is fully started and accepting work.
39    Running,
40    /// The module is transitioning from `Running` to `Stopped`.
41    Stopping,
42    /// The module has been stopped.
43    Stopped,
44    /// The module crashed or its `start`/`stop` hook returned an error.
45    Failed,
46}
47
48/// Static information about a module supplied at registration time.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct ModuleMetadata {
51    /// Stable, unique module id (e.g. `"oxide-mirror"`).
52    pub id: String,
53    /// Human-readable module name.
54    pub name: String,
55    /// Semantic version string.
56    pub version: String,
57    /// Module kind / execution layer.
58    pub kind: ModuleKind,
59    /// Free-form description.
60    pub description: Option<String>,
61}
62
63/// The execution layer / runtime that hosts a given module.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "snake_case")]
66pub enum ModuleKind {
67    /// A native Rust module ([`Module`]). Layer 1 in the architecture.
68    Native,
69    /// A WebAssembly plugin ([`WasmModule`]). Layer 2 in the architecture.
70    Wasm,
71}
72
73/// Trait implemented by all native, in-process modules.
74///
75/// Methods are `async` because most kernel work flows through Tokio and many
76/// modules need to await I/O during `start`/`stop`. Implementors should not
77/// block the runtime in these hooks.
78#[async_trait]
79pub trait Module: Send + Sync + 'static {
80    /// Static module metadata.
81    fn metadata(&self) -> ModuleMetadata;
82
83    /// Initialize the module. Called once, before `start`.
84    ///
85    /// The `bus` is provided so the module can subscribe to events or publish
86    /// initial state. The default implementation is a no-op.
87    async fn init(&mut self, _bus: MessageBus) -> Result<()> {
88        Ok(())
89    }
90
91    /// Start the module. Must be idempotent.
92    async fn start(&mut self) -> Result<()>;
93
94    /// Stop the module. Must be idempotent and quick; long-running cleanup
95    /// belongs in a separate task signalled from here.
96    async fn stop(&mut self) -> Result<()>;
97}
98
99/// Trait implemented by WebAssembly plugins.
100///
101/// Right now the trait is a thin placeholder: it mirrors [`Module`] but uses a
102/// distinct identity so the manager can keep native and WASM modules separate
103/// and apply different sandboxing/security policies later (e.g. wasmtime
104/// engine configuration, capability gating).
105#[async_trait]
106pub trait WasmModule: Send + Sync + 'static {
107    /// Static module metadata. The `kind` field should be
108    /// [`ModuleKind::Wasm`].
109    fn metadata(&self) -> ModuleMetadata;
110
111    /// Instantiate the WASM module inside the host runtime.
112    async fn instantiate(&mut self, _bus: MessageBus) -> Result<()> {
113        Ok(())
114    }
115
116    /// Start the WASM module's main entry point.
117    async fn start(&mut self) -> Result<()>;
118
119    /// Stop and tear down the WASM instance.
120    async fn stop(&mut self) -> Result<()>;
121}
122
123/// Internal enum that lets the manager hold both module flavours in a single map.
124enum AnyModule {
125    Native(Box<dyn Module>),
126    Wasm(Box<dyn WasmModule>),
127}
128
129impl AnyModule {
130    fn metadata(&self) -> ModuleMetadata {
131        match self {
132            AnyModule::Native(m) => m.metadata(),
133            AnyModule::Wasm(m) => m.metadata(),
134        }
135    }
136
137    async fn init(&mut self, bus: MessageBus) -> Result<()> {
138        match self {
139            AnyModule::Native(m) => m.init(bus).await,
140            AnyModule::Wasm(m) => m.instantiate(bus).await,
141        }
142    }
143
144    async fn start(&mut self) -> Result<()> {
145        match self {
146            AnyModule::Native(m) => m.start().await,
147            AnyModule::Wasm(m) => m.start().await,
148        }
149    }
150
151    async fn stop(&mut self) -> Result<()> {
152        match self {
153            AnyModule::Native(m) => m.stop().await,
154            AnyModule::Wasm(m) => m.stop().await,
155        }
156    }
157}
158
159struct ModuleEntry {
160    module: AnyModule,
161    state: ModuleState,
162}
163
164/// Manages the set of modules loaded into the kernel.
165///
166/// `ModuleManager` is cheaply cloneable (it wraps its state in an `Arc`) so it
167/// can be handed to subsystems that need to inspect module status, e.g. the
168/// future CLI.
169#[derive(Clone)]
170pub struct ModuleManager {
171    bus: MessageBus,
172    inner: Arc<RwLock<HashMap<String, ModuleEntry>>>,
173}
174
175impl ModuleManager {
176    /// Construct a new manager bound to the given message bus.
177    pub fn new(bus: MessageBus) -> Self {
178        Self {
179            bus,
180            inner: Arc::new(RwLock::new(HashMap::new())),
181        }
182    }
183
184    /// Register a native Rust [`Module`] with the manager.
185    ///
186    /// The module is moved into the manager and held until it is unloaded.
187    pub async fn register_native<M: Module>(&self, module: M) -> Result<()> {
188        let metadata = module.metadata();
189        self.insert(metadata.id.clone(), AnyModule::Native(Box::new(module)))
190            .await
191    }
192
193    /// Register a [`WasmModule`] plugin with the manager.
194    pub async fn register_wasm<M: WasmModule>(&self, module: M) -> Result<()> {
195        let metadata = module.metadata();
196        self.insert(metadata.id.clone(), AnyModule::Wasm(Box::new(module)))
197            .await
198    }
199
200    async fn insert(&self, id: String, module: AnyModule) -> Result<()> {
201        let mut map = self.inner.write().await;
202        if map.contains_key(&id) {
203            return Err(KernelError::DuplicateModule(id));
204        }
205        map.insert(
206            id,
207            ModuleEntry {
208                module,
209                state: ModuleState::Loaded,
210            },
211        );
212        Ok(())
213    }
214
215    /// Drive a registered module's `init` hook.
216    pub async fn init(&self, id: &str) -> Result<()> {
217        let mut map = self.inner.write().await;
218        let entry = map
219            .get_mut(id)
220            .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
221        entry.module.init(self.bus.clone()).await
222    }
223
224    /// Start a registered module.
225    pub async fn start(&self, id: &str) -> Result<()> {
226        let mut map = self.inner.write().await;
227        let entry = map
228            .get_mut(id)
229            .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
230
231        entry.state = ModuleState::Starting;
232        match entry.module.start().await {
233            Ok(()) => {
234                entry.state = ModuleState::Running;
235                let id_owned = id.to_string();
236                drop(map);
237                self.bus
238                    .emit_event(
239                        "kernel",
240                        crate::bus::Event::ModuleStarted {
241                            module_id: id_owned,
242                        },
243                    )
244                    .await?;
245                Ok(())
246            }
247            Err(e) => {
248                entry.state = ModuleState::Failed;
249                Err(e)
250            }
251        }
252    }
253
254    /// Stop a registered module.
255    pub async fn stop(&self, id: &str) -> Result<()> {
256        let mut map = self.inner.write().await;
257        let entry = map
258            .get_mut(id)
259            .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
260
261        entry.state = ModuleState::Stopping;
262        match entry.module.stop().await {
263            Ok(()) => {
264                entry.state = ModuleState::Stopped;
265                let id_owned = id.to_string();
266                drop(map);
267                self.bus
268                    .emit_event(
269                        "kernel",
270                        crate::bus::Event::ModuleStopped {
271                            module_id: id_owned,
272                        },
273                    )
274                    .await?;
275                Ok(())
276            }
277            Err(e) => {
278                entry.state = ModuleState::Failed;
279                Err(e)
280            }
281        }
282    }
283
284    /// Stop and remove a module from the manager entirely.
285    pub async fn unload(&self, id: &str) -> Result<()> {
286        // Best-effort stop first; ignore the error if the module is already
287        // stopped — the goal is to remove it.
288        let _ = self.stop(id).await;
289        let mut map = self.inner.write().await;
290        map.remove(id)
291            .ok_or_else(|| KernelError::UnknownModule(id.to_string()))?;
292        Ok(())
293    }
294
295    /// Return the current state of a module, if it is registered.
296    pub async fn state(&self, id: &str) -> Option<ModuleState> {
297        self.inner.read().await.get(id).map(|e| e.state)
298    }
299
300    /// Return a snapshot of all registered modules and their states.
301    pub async fn list(&self) -> Vec<(ModuleMetadata, ModuleState)> {
302        self.inner
303            .read()
304            .await
305            .values()
306            .map(|e| (e.module.metadata(), e.state))
307            .collect()
308    }
309}
310
311impl Debug for ModuleManager {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        f.debug_struct("ModuleManager").finish_non_exhaustive()
314    }
315}
316
317#[cfg(test)]
318mod tests {
319    use super::*;
320    use std::sync::atomic::{AtomicUsize, Ordering};
321    use std::sync::Arc;
322
323    /// Minimal native module used to exercise the manager.
324    struct EchoModule {
325        meta: ModuleMetadata,
326        start_count: Arc<AtomicUsize>,
327        stop_count: Arc<AtomicUsize>,
328    }
329
330    impl EchoModule {
331        fn new(id: &str) -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
332            let start = Arc::new(AtomicUsize::new(0));
333            let stop = Arc::new(AtomicUsize::new(0));
334            (
335                Self {
336                    meta: ModuleMetadata {
337                        id: id.to_string(),
338                        name: format!("Echo {id}"),
339                        version: "0.1.0".into(),
340                        kind: ModuleKind::Native,
341                        description: None,
342                    },
343                    start_count: start.clone(),
344                    stop_count: stop.clone(),
345                },
346                start,
347                stop,
348            )
349        }
350    }
351
352    #[async_trait]
353    impl Module for EchoModule {
354        fn metadata(&self) -> ModuleMetadata {
355            self.meta.clone()
356        }
357        async fn start(&mut self) -> Result<()> {
358            self.start_count.fetch_add(1, Ordering::SeqCst);
359            Ok(())
360        }
361        async fn stop(&mut self) -> Result<()> {
362            self.stop_count.fetch_add(1, Ordering::SeqCst);
363            Ok(())
364        }
365    }
366
367    /// Minimal WASM module stub.
368    struct StubWasm {
369        meta: ModuleMetadata,
370    }
371
372    #[async_trait]
373    impl WasmModule for StubWasm {
374        fn metadata(&self) -> ModuleMetadata {
375            self.meta.clone()
376        }
377        async fn start(&mut self) -> Result<()> {
378            Ok(())
379        }
380        async fn stop(&mut self) -> Result<()> {
381            Ok(())
382        }
383    }
384
385    #[tokio::test]
386    async fn register_start_stop_native_module() {
387        let bus = MessageBus::new();
388        let mgr = ModuleManager::new(bus);
389        let (module, started, stopped) = EchoModule::new("echo");
390
391        mgr.register_native(module).await.unwrap();
392        assert_eq!(mgr.state("echo").await, Some(ModuleState::Loaded));
393
394        mgr.start("echo").await.unwrap();
395        assert_eq!(mgr.state("echo").await, Some(ModuleState::Running));
396        assert_eq!(started.load(Ordering::SeqCst), 1);
397
398        mgr.stop("echo").await.unwrap();
399        assert_eq!(mgr.state("echo").await, Some(ModuleState::Stopped));
400        assert_eq!(stopped.load(Ordering::SeqCst), 1);
401    }
402
403    #[tokio::test]
404    async fn duplicate_registration_is_rejected() {
405        let mgr = ModuleManager::new(MessageBus::new());
406        let (m1, _, _) = EchoModule::new("dup");
407        let (m2, _, _) = EchoModule::new("dup");
408
409        mgr.register_native(m1).await.unwrap();
410        let err = mgr.register_native(m2).await.unwrap_err();
411        assert!(matches!(err, KernelError::DuplicateModule(_)));
412    }
413
414    #[tokio::test]
415    async fn unknown_module_returns_error() {
416        let mgr = ModuleManager::new(MessageBus::new());
417        let err = mgr.start("missing").await.unwrap_err();
418        assert!(matches!(err, KernelError::UnknownModule(_)));
419    }
420
421    #[tokio::test]
422    async fn register_wasm_module() {
423        let mgr = ModuleManager::new(MessageBus::new());
424        let stub = StubWasm {
425            meta: ModuleMetadata {
426                id: "wasm-stub".into(),
427                name: "Stub".into(),
428                version: "0.0.1".into(),
429                kind: ModuleKind::Wasm,
430                description: None,
431            },
432        };
433
434        mgr.register_wasm(stub).await.unwrap();
435        mgr.start("wasm-stub").await.unwrap();
436        assert_eq!(mgr.state("wasm-stub").await, Some(ModuleState::Running));
437
438        let list = mgr.list().await;
439        assert_eq!(list.len(), 1);
440        assert_eq!(list[0].0.kind, ModuleKind::Wasm);
441    }
442
443    #[tokio::test]
444    async fn unload_removes_module() {
445        let mgr = ModuleManager::new(MessageBus::new());
446        let (m, _, _) = EchoModule::new("echo");
447        mgr.register_native(m).await.unwrap();
448        mgr.start("echo").await.unwrap();
449
450        mgr.unload("echo").await.unwrap();
451        assert!(mgr.state("echo").await.is_none());
452    }
453
454    #[tokio::test]
455    async fn start_emits_module_started_event() {
456        let bus = MessageBus::new();
457        let mut sub = bus.subscribe().await;
458        let mgr = ModuleManager::new(bus);
459
460        let (m, _, _) = EchoModule::new("echo");
461        mgr.register_native(m).await.unwrap();
462        mgr.start("echo").await.unwrap();
463
464        let env = sub.receiver.recv().await.unwrap();
465        match env.message {
466            crate::bus::Message::Event(crate::bus::Event::ModuleStarted { module_id }) => {
467                assert_eq!(module_id, "echo");
468            }
469            other => panic!("unexpected message: {other:?}"),
470        }
471    }
472}