1use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
5
6use dashmap::DashMap;
7
8use crate::error::{Error, Result};
9use crate::ffi::loader::ModuleLoader;
10use crate::instance::ModuleInstance;
11use crate::profile::ModuleProfile;
12use crate::reload::{ReloadConfig, ReloadState};
13use crate::resolver::{ModuleRef, ModuleResolver, local::LocalResolver};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
16pub struct ModuleHandle(pub u64);
17
18#[derive(Debug, Clone, Default)]
19pub struct ModuleUsage {
20 pub arena_usage: f32,
21 pub arena_bytes: usize,
22 pub call_count: u64,
23}
24
25struct LoadedModule {
26 instance: ModuleInstance,
27 profile: ModuleProfile,
28 call_count: AtomicU64,
29 in_flight: AtomicUsize,
30 draining: AtomicBool,
31}
32
33pub struct Runtime {
34 resolver: Arc<dyn ModuleResolver>,
35 loader: ModuleLoader,
36 instances: DashMap<ModuleHandle, LoadedModule>,
37 handle_counter: AtomicU64,
38}
39
40impl Runtime {
41 pub fn new(resolver: Arc<dyn ModuleResolver>) -> Self {
42 Runtime {
43 resolver,
44 loader: ModuleLoader::new(),
45 instances: DashMap::new(),
46 handle_counter: AtomicU64::new(1),
47 }
48 }
49
50 pub fn with_local_resolver() -> Self {
51 Self::new(Arc::new(LocalResolver::new()))
52 }
53
54 fn next_handle(&self) -> ModuleHandle {
55 let id = self.handle_counter.fetch_add(1, Ordering::Relaxed);
56 ModuleHandle(id)
57 }
58
59 pub fn loaded_count(&self) -> usize {
60 self.instances.len()
61 }
62
63 pub fn is_loaded(&self, handle: ModuleHandle) -> bool {
64 self.instances.contains_key(&handle)
65 }
66
67 pub fn loaded_handles(&self) -> Vec<ModuleHandle> {
68 self.instances.iter().map(|r| *r.key()).collect()
69 }
70}
71
72pub trait ModuleRuntime: Send + Sync {
73 fn load(&self, reference: ModuleRef) -> Result<ModuleHandle>;
74 fn call(&self, handle: ModuleHandle, method: &str, args: &[u8]) -> Result<Vec<u8>>;
75 fn unload(&self, handle: ModuleHandle) -> Result<()>;
76 fn get_usage(&self, handle: ModuleHandle) -> Option<ModuleUsage>;
77 fn get_profile(&self, handle: ModuleHandle) -> Option<ModuleProfile>;
78 fn reload(&self, handle: ModuleHandle, reference: ModuleRef) -> Result<ReloadState>;
79 fn reload_with_config(
80 &self,
81 handle: ModuleHandle,
82 reference: ModuleRef,
83 config: ReloadConfig,
84 ) -> Result<ReloadState>;
85}
86
87impl ModuleRuntime for Runtime {
88 fn load(&self, reference: ModuleRef) -> Result<ModuleHandle> {
89 let artifact = self.resolver.resolve(reference)?;
90 let instance = self.loader.load(artifact)?;
91
92 let path_str = instance.path()
93 .file_stem()
94 .and_then(|s| s.to_str())
95 .unwrap_or("unknown")
96 .to_string();
97
98 let profile = ModuleProfile::new(path_str);
99 let handle = self.next_handle();
100
101 self.instances.insert(handle, LoadedModule {
102 instance,
103 profile,
104 call_count: AtomicU64::new(0),
105 in_flight: AtomicUsize::new(0),
106 draining: AtomicBool::new(false),
107 });
108
109 Ok(handle)
110 }
111
112 fn call(&self, handle: ModuleHandle, method: &str, args: &[u8]) -> Result<Vec<u8>> {
113 let module = self.instances.get(&handle)
114 .ok_or_else(|| Error::FileNotFound(
115 std::path::PathBuf::from(format!("Module handle {}", handle.0))
116 ))?;
117
118 if module.draining.load(Ordering::Relaxed) {
119 return Err(Error::ReloadInProgress(handle.0));
120 }
121
122 module.in_flight.fetch_add(1, Ordering::Relaxed);
123 module.call_count.fetch_add(1, Ordering::Relaxed);
124
125 let result = module.instance.call(method, args);
126
127 module.in_flight.fetch_sub(1, Ordering::Relaxed);
128
129 result
130 }
131
132 fn unload(&self, handle: ModuleHandle) -> Result<()> {
133 let removed = self.instances.remove(&handle);
134
135 if removed.is_some() {
136 Ok(())
137 } else {
138 Err(Error::FileNotFound(
139 std::path::PathBuf::from(format!("Module handle {}", handle.0))
140 ))
141 }
142 }
143
144 fn get_usage(&self, handle: ModuleHandle) -> Option<ModuleUsage> {
145 let module = self.instances.get(&handle)?;
146
147 let arena = module.instance.arena();
148 let usage = arena.usage();
149 let arena_bytes = arena.used();
150 let call_count = module.call_count.load(Ordering::Relaxed);
151
152 Some(ModuleUsage {
153 arena_usage: usage,
154 arena_bytes,
155 call_count,
156 })
157 }
158
159 fn get_profile(&self, handle: ModuleHandle) -> Option<ModuleProfile> {
160 let module = self.instances.get(&handle)?;
161 Some(module.profile.clone())
162 }
163
164 fn reload(&self, handle: ModuleHandle, reference: ModuleRef) -> Result<ReloadState> {
165 self.reload_with_config(handle, reference, ReloadConfig::default())
166 }
167
168 fn reload_with_config(
169 &self,
170 handle: ModuleHandle,
171 reference: ModuleRef,
172 config: ReloadConfig,
173 ) -> Result<ReloadState> {
174 let old_module = self.instances.get(&handle)
175 .ok_or_else(|| Error::FileNotFound(
176 std::path::PathBuf::from(format!("Module handle {}", handle.0))
177 ))?;
178
179 if old_module.draining.load(Ordering::Relaxed) {
180 return Err(Error::ReloadInProgress(handle.0));
181 }
182
183 let in_flight_count = old_module.in_flight.load(Ordering::Relaxed);
184 old_module.draining.store(true, Ordering::Relaxed);
185
186 let new_artifact = self.resolver.resolve(reference)?;
187 let new_instance = self.loader.load(new_artifact)?;
188
189 let path_str = new_instance.path()
190 .file_stem()
191 .and_then(|s| s.to_str())
192 .unwrap_or("unknown")
193 .to_string();
194 let new_profile = ModuleProfile::new(path_str);
195
196 let new_handle = self.next_handle();
197
198 self.instances.insert(new_handle, LoadedModule {
199 instance: new_instance,
200 profile: new_profile,
201 call_count: AtomicU64::new(0),
202 in_flight: AtomicUsize::new(0),
203 draining: AtomicBool::new(false),
204 });
205
206 let reload_state = ReloadState::new(handle.0, new_handle.0, in_flight_count);
207
208 if in_flight_count > 0 {
209 reload_state.wait_for_drain(config.drain_timeout)?;
210 }
211
212 self.instances.remove(&handle);
213
214 Ok(reload_state)
215 }
216}