// memlink_runtime/runtime.rs

//! High-level module runtime API.
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
5
6use dashmap::DashMap;
7
8use crate::error::{Error, Result};
9use crate::ffi::loader::ModuleLoader;
10use crate::instance::ModuleInstance;
11use crate::profile::ModuleProfile;
12use crate::reload::{ReloadConfig, ReloadState};
13use crate::resolver::{ModuleRef, ModuleResolver, local::LocalResolver};
14
/// Opaque identifier for a module registered with a runtime.
///
/// The wrapped `u64` is public; the handle is `Copy`, comparable and hashable,
/// so it can be passed by value and used directly as a map key.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ModuleHandle(pub u64);
17
/// Snapshot of a loaded module's resource usage, as returned by
/// `ModuleRuntime::get_usage`.
///
/// `Default` yields an all-zero snapshot.
#[derive(Clone, Debug, Default)]
pub struct ModuleUsage {
    /// Value reported by the module arena's `usage()`.
    /// NOTE(review): presumably a utilisation ratio — confirm against the arena type.
    pub arena_usage: f32,
    /// Value reported by the module arena's `used()`.
    pub arena_bytes: usize,
    /// Number of calls the runtime has counted for this module.
    pub call_count: u64,
}
24
25struct LoadedModule {
26    instance: ModuleInstance,
27    profile: ModuleProfile,
28    call_count: AtomicU64,
29    in_flight: AtomicUsize,
30    draining: AtomicBool,
31}
32
33pub struct Runtime {
34    resolver: Arc<dyn ModuleResolver>,
35    loader: ModuleLoader,
36    instances: DashMap<ModuleHandle, LoadedModule>,
37    handle_counter: AtomicU64,
38}
39
40impl Runtime {
41    pub fn new(resolver: Arc<dyn ModuleResolver>) -> Self {
42        Runtime {
43            resolver,
44            loader: ModuleLoader::new(),
45            instances: DashMap::new(),
46            handle_counter: AtomicU64::new(1),
47        }
48    }
49
50    pub fn with_local_resolver() -> Self {
51        Self::new(Arc::new(LocalResolver::new()))
52    }
53
54    fn next_handle(&self) -> ModuleHandle {
55        let id = self.handle_counter.fetch_add(1, Ordering::Relaxed);
56        ModuleHandle(id)
57    }
58
59    pub fn loaded_count(&self) -> usize {
60        self.instances.len()
61    }
62
63    pub fn is_loaded(&self, handle: ModuleHandle) -> bool {
64        self.instances.contains_key(&handle)
65    }
66
67    pub fn loaded_handles(&self) -> Vec<ModuleHandle> {
68        self.instances.iter().map(|r| *r.key()).collect()
69    }
70}
71
72pub trait ModuleRuntime: Send + Sync {
73    fn load(&self, reference: ModuleRef) -> Result<ModuleHandle>;
74    fn call(&self, handle: ModuleHandle, method: &str, args: &[u8]) -> Result<Vec<u8>>;
75    fn unload(&self, handle: ModuleHandle) -> Result<()>;
76    fn get_usage(&self, handle: ModuleHandle) -> Option<ModuleUsage>;
77    fn get_profile(&self, handle: ModuleHandle) -> Option<ModuleProfile>;
78    fn reload(&self, handle: ModuleHandle, reference: ModuleRef) -> Result<ReloadState>;
79    fn reload_with_config(
80        &self,
81        handle: ModuleHandle,
82        reference: ModuleRef,
83        config: ReloadConfig,
84    ) -> Result<ReloadState>;
85}
86
87impl ModuleRuntime for Runtime {
88    fn load(&self, reference: ModuleRef) -> Result<ModuleHandle> {
89        let artifact = self.resolver.resolve(reference)?;
90        let instance = self.loader.load(artifact)?;
91
92        let path_str = instance.path()
93            .file_stem()
94            .and_then(|s| s.to_str())
95            .unwrap_or("unknown")
96            .to_string();
97
98        let profile = ModuleProfile::new(path_str);
99        let handle = self.next_handle();
100
101        self.instances.insert(handle, LoadedModule {
102            instance,
103            profile,
104            call_count: AtomicU64::new(0),
105            in_flight: AtomicUsize::new(0),
106            draining: AtomicBool::new(false),
107        });
108
109        Ok(handle)
110    }
111
112    fn call(&self, handle: ModuleHandle, method: &str, args: &[u8]) -> Result<Vec<u8>> {
113        let module = self.instances.get(&handle)
114            .ok_or_else(|| Error::FileNotFound(
115                std::path::PathBuf::from(format!("Module handle {}", handle.0))
116            ))?;
117
118        if module.draining.load(Ordering::Relaxed) {
119            return Err(Error::ReloadInProgress(handle.0));
120        }
121
122        module.in_flight.fetch_add(1, Ordering::Relaxed);
123        module.call_count.fetch_add(1, Ordering::Relaxed);
124
125        let result = module.instance.call(method, args);
126
127        module.in_flight.fetch_sub(1, Ordering::Relaxed);
128
129        result
130    }
131
132    fn unload(&self, handle: ModuleHandle) -> Result<()> {
133        let removed = self.instances.remove(&handle);
134
135        if removed.is_some() {
136            Ok(())
137        } else {
138            Err(Error::FileNotFound(
139                std::path::PathBuf::from(format!("Module handle {}", handle.0))
140            ))
141        }
142    }
143
144    fn get_usage(&self, handle: ModuleHandle) -> Option<ModuleUsage> {
145        let module = self.instances.get(&handle)?;
146
147        let arena = module.instance.arena();
148        let usage = arena.usage();
149        let arena_bytes = arena.used();
150        let call_count = module.call_count.load(Ordering::Relaxed);
151
152        Some(ModuleUsage {
153            arena_usage: usage,
154            arena_bytes,
155            call_count,
156        })
157    }
158
159    fn get_profile(&self, handle: ModuleHandle) -> Option<ModuleProfile> {
160        let module = self.instances.get(&handle)?;
161        Some(module.profile.clone())
162    }
163
164    fn reload(&self, handle: ModuleHandle, reference: ModuleRef) -> Result<ReloadState> {
165        self.reload_with_config(handle, reference, ReloadConfig::default())
166    }
167
168    fn reload_with_config(
169        &self,
170        handle: ModuleHandle,
171        reference: ModuleRef,
172        config: ReloadConfig,
173    ) -> Result<ReloadState> {
174        let old_module = self.instances.get(&handle)
175            .ok_or_else(|| Error::FileNotFound(
176                std::path::PathBuf::from(format!("Module handle {}", handle.0))
177            ))?;
178
179        if old_module.draining.load(Ordering::Relaxed) {
180            return Err(Error::ReloadInProgress(handle.0));
181        }
182
183        let in_flight_count = old_module.in_flight.load(Ordering::Relaxed);
184        old_module.draining.store(true, Ordering::Relaxed);
185
186        let new_artifact = self.resolver.resolve(reference)?;
187        let new_instance = self.loader.load(new_artifact)?;
188
189        let path_str = new_instance.path()
190            .file_stem()
191            .and_then(|s| s.to_str())
192            .unwrap_or("unknown")
193            .to_string();
194        let new_profile = ModuleProfile::new(path_str);
195
196        let new_handle = self.next_handle();
197
198        self.instances.insert(new_handle, LoadedModule {
199            instance: new_instance,
200            profile: new_profile,
201            call_count: AtomicU64::new(0),
202            in_flight: AtomicUsize::new(0),
203            draining: AtomicBool::new(false),
204        });
205
206        let reload_state = ReloadState::new(handle.0, new_handle.0, in_flight_count);
207
208        if in_flight_count > 0 {
209            reload_state.wait_for_drain(config.drain_timeout)?;
210        }
211
212        self.instances.remove(&handle);
213
214        Ok(reload_state)
215    }
216}