liminal/routing/function/
loader.rs1use std::collections::HashMap;
12use std::sync::{Arc, Mutex, MutexGuard};
13
14use crate::routing::function::execute::{ConsumerStateView, RoutingDecision, RoutingMessage};
15
16pub(super) type RoutingLogic =
18 dyn Fn(&RoutingMessage, &[ConsumerStateView]) -> RoutingDecision + Send + Sync + 'static;
19
20#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
25pub struct ContentHash(u64);
26
27impl ContentHash {
28 #[must_use]
30 pub fn of(bytecode: &[u8]) -> Self {
31 const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
32 const PRIME: u64 = 0x0000_0100_0000_01b3;
33
34 let mut hash = OFFSET;
35 for byte in bytecode {
36 hash ^= u64::from(*byte);
37 hash = hash.wrapping_mul(PRIME);
38 }
39 Self(hash)
40 }
41}
42
43impl std::fmt::Display for ContentHash {
44 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(formatter, "{:016x}", self.0)
46 }
47}
48
49pub struct RoutingModule {
54 content_hash: ContentHash,
55 logic: Arc<RoutingLogic>,
56}
57
58impl RoutingModule {
59 #[must_use]
61 pub fn new<F>(bytecode: &[u8], logic: F) -> Self
62 where
63 F: Fn(&RoutingMessage, &[ConsumerStateView]) -> RoutingDecision + Send + Sync + 'static,
64 {
65 Self {
66 content_hash: ContentHash::of(bytecode),
67 logic: Arc::new(logic),
68 }
69 }
70
71 #[must_use]
73 pub const fn content_hash(&self) -> ContentHash {
74 self.content_hash
75 }
76}
77
78impl std::fmt::Debug for RoutingModule {
79 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 formatter
81 .debug_struct("RoutingModule")
82 .field("content_hash", &self.content_hash)
83 .finish_non_exhaustive()
84 }
85}
86
87struct LoadedModule {
88 content_hash: ContentHash,
89 logic: Arc<RoutingLogic>,
90}
91
92impl std::fmt::Debug for LoadedModule {
93 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 formatter
95 .debug_struct("LoadedModule")
96 .field("content_hash", &self.content_hash)
97 .finish_non_exhaustive()
98 }
99}
100
101#[derive(Clone)]
103pub struct RoutingFunction {
104 module: Arc<LoadedModule>,
105}
106
107impl RoutingFunction {
108 #[must_use]
110 pub fn content_hash(&self) -> ContentHash {
111 self.module.content_hash
112 }
113
114 pub(super) fn logic(&self) -> Arc<RoutingLogic> {
116 Arc::clone(&self.module.logic)
117 }
118}
119
120impl std::fmt::Debug for RoutingFunction {
121 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 formatter
123 .debug_struct("RoutingFunction")
124 .field("content_hash", &self.module.content_hash)
125 .finish_non_exhaustive()
126 }
127}
128
129#[derive(Debug, Default)]
136pub struct ModuleLoader {
137 loaded: Mutex<HashMap<ContentHash, Arc<LoadedModule>>>,
138}
139
140impl ModuleLoader {
141 #[must_use]
143 pub fn new() -> Self {
144 Self::default()
145 }
146
147 #[must_use]
152 pub fn load(&self, module: RoutingModule) -> RoutingFunction {
153 let loaded_module = {
154 let mut loaded = lock(&self.loaded);
155 Arc::clone(loaded.entry(module.content_hash).or_insert_with(|| {
156 Arc::new(LoadedModule {
157 content_hash: module.content_hash,
158 logic: module.logic,
159 })
160 }))
161 };
162 RoutingFunction {
163 module: loaded_module,
164 }
165 }
166
167 #[must_use]
169 pub fn loaded_count(&self) -> usize {
170 lock(&self.loaded).len()
171 }
172
173 #[must_use]
175 pub fn is_loaded(&self, hash: ContentHash) -> bool {
176 lock(&self.loaded).contains_key(&hash)
177 }
178}
179
180#[derive(Debug)]
186pub struct RoutingSlot {
187 current: Mutex<RoutingFunction>,
188}
189
190impl RoutingSlot {
191 #[must_use]
193 pub const fn new(initial: RoutingFunction) -> Self {
194 Self {
195 current: Mutex::new(initial),
196 }
197 }
198
199 #[must_use]
201 pub fn current(&self) -> RoutingFunction {
202 lock(&self.current).clone()
203 }
204
205 pub fn deploy(&self, next: RoutingFunction) {
207 *lock(&self.current) = next;
208 }
209
210 #[must_use]
212 pub fn active_hash(&self) -> ContentHash {
213 lock(&self.current).content_hash()
214 }
215}
216
217pub(super) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
218 match mutex.lock() {
219 Ok(guard) => guard,
220 Err(poisoned) => poisoned.into_inner(),
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::{ContentHash, ModuleLoader, RoutingDecision, RoutingModule, RoutingSlot};
227
228 fn noop_module(bytecode: &[u8]) -> RoutingModule {
229 RoutingModule::new(bytecode, |_message, _consumers| RoutingDecision::none())
230 }
231
232 #[test]
233 fn content_hash_is_stable_and_distinguishes_bytecode() {
234 assert_eq!(ContentHash::of(b"module-a"), ContentHash::of(b"module-a"));
235 assert_ne!(ContentHash::of(b"module-a"), ContentHash::of(b"module-b"));
236 }
237
238 #[test]
239 fn load_returns_executable_function_keyed_by_content_hash() {
240 let loader = ModuleLoader::new();
241 let module = noop_module(b"v1");
242 let hash = module.content_hash();
243
244 let function = loader.load(module);
245
246 assert_eq!(function.content_hash(), hash);
247 assert!(loader.is_loaded(hash));
248 }
249
250 #[test]
251 fn loading_same_content_hash_twice_does_not_duplicate() {
252 let loader = ModuleLoader::new();
253
254 let _first = loader.load(noop_module(b"v1"));
255 let _second = loader.load(noop_module(b"v1"));
256
257 assert_eq!(loader.loaded_count(), 1);
258 }
259
260 #[test]
261 fn slot_deploy_swaps_active_function() {
262 let loader = ModuleLoader::new();
263 let old = loader.load(noop_module(b"v1"));
264 let new = loader.load(noop_module(b"v2"));
265 let new_hash = new.content_hash();
266
267 let slot = RoutingSlot::new(old);
268 slot.deploy(new);
269
270 assert_eq!(slot.active_hash(), new_hash);
271 }
272}