Skip to main content

liminal/routing/function/
loader.rs

1//! Content-hash module loading and hot deployment for routing functions.
2//!
3//! Status (2026-06): this is an in-memory `HashMap` keyed by content hash, not a
4//! beamr module registry. Module bytecode is only hashed for dedup; it is never
5//! executed — routing logic is supplied as a native Rust closure. Modules are
6//! content-addressed: loading the same bytecode twice reuses the already-loaded
7//! module, and hot deployment atomically swaps the active function while
8//! in-flight executions keep their own reference to the previous version until
9//! they complete.
10
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex, MutexGuard};
13
14use crate::routing::function::execute::{ConsumerStateView, RoutingDecision, RoutingMessage};
15
/// Native entrypoint a loaded routing module exposes to the supervisor.
///
/// This is an unsized trait-object type, so it is always held behind a pointer
/// (this file stores it as `Arc<RoutingLogic>`). The callable receives a
/// [`RoutingMessage`] together with a slice of [`ConsumerStateView`]s and
/// returns a [`RoutingDecision`]. The `Send + Sync + 'static` bounds allow a
/// single instance to be shared between threads.
pub(super) type RoutingLogic =
    dyn Fn(&RoutingMessage, &[ConsumerStateView]) -> RoutingDecision + Send + Sync + 'static;
19
/// Identifier for a routing function module, derived from its bytecode.
///
/// Identical bytecode always produces the same hash, which is what allows
/// [`ModuleLoader`] to load such modules only once.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ContentHash(u64);

impl ContentHash {
    /// Hashes `bytecode` with 64-bit FNV-1a and wraps the digest.
    ///
    /// Each byte is XOR-ed into the running state, which is then multiplied
    /// (wrapping on overflow) by the FNV prime. Empty input yields the FNV
    /// offset basis unchanged.
    #[must_use]
    pub fn of(bytecode: &[u8]) -> Self {
        const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

        let digest = bytecode.iter().fold(FNV_OFFSET_BASIS, |state, &byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
        Self(digest)
    }
}

impl std::fmt::Display for ContentHash {
    /// Writes the hash as exactly 16 zero-padded, lowercase hexadecimal digits.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = *self;
        write!(formatter, "{:016x}", raw)
    }
}
48
49/// A routing module submitted for loading.
50///
51/// Carries the module bytecode (used to derive its [`ContentHash`]) and the
52/// native entrypoint the loaded module executes.
53pub struct RoutingModule {
54    content_hash: ContentHash,
55    logic: Arc<RoutingLogic>,
56}
57
58impl RoutingModule {
59    /// Creates a routing module from its bytecode and native entrypoint.
60    #[must_use]
61    pub fn new<F>(bytecode: &[u8], logic: F) -> Self
62    where
63        F: Fn(&RoutingMessage, &[ConsumerStateView]) -> RoutingDecision + Send + Sync + 'static,
64    {
65        Self {
66            content_hash: ContentHash::of(bytecode),
67            logic: Arc::new(logic),
68        }
69    }
70
71    /// Returns the content hash of the module.
72    #[must_use]
73    pub const fn content_hash(&self) -> ContentHash {
74        self.content_hash
75    }
76}
77
78impl std::fmt::Debug for RoutingModule {
79    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        formatter
81            .debug_struct("RoutingModule")
82            .field("content_hash", &self.content_hash)
83            .finish_non_exhaustive()
84    }
85}
86
87struct LoadedModule {
88    content_hash: ContentHash,
89    logic: Arc<RoutingLogic>,
90}
91
92impl std::fmt::Debug for LoadedModule {
93    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        formatter
95            .debug_struct("LoadedModule")
96            .field("content_hash", &self.content_hash)
97            .finish_non_exhaustive()
98    }
99}
100
101/// An executable routing function referencing a loaded module.
102#[derive(Clone)]
103pub struct RoutingFunction {
104    module: Arc<LoadedModule>,
105}
106
107impl RoutingFunction {
108    /// Returns the content hash of the underlying module.
109    #[must_use]
110    pub fn content_hash(&self) -> ContentHash {
111        self.module.content_hash
112    }
113
114    /// Returns a handle to the module's native entrypoint for the supervisor.
115    pub(super) fn logic(&self) -> Arc<RoutingLogic> {
116        Arc::clone(&self.module.logic)
117    }
118}
119
120impl std::fmt::Debug for RoutingFunction {
121    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        formatter
123            .debug_struct("RoutingFunction")
124            .field("content_hash", &self.module.content_hash)
125            .finish_non_exhaustive()
126    }
127}
128
129/// Loads routing modules by content hash, deduplicating identical bytecode.
130///
131/// Backed by an in-memory `HashMap` keyed by content hash, not a beamr module
132/// registry; the bytecode is hashed for dedup only and is never executed.
133/// Loading the same content hash twice returns a handle to the already-loaded
134/// module rather than loading it again.
135#[derive(Debug, Default)]
136pub struct ModuleLoader {
137    loaded: Mutex<HashMap<ContentHash, Arc<LoadedModule>>>,
138}
139
140impl ModuleLoader {
141    /// Creates an empty module loader.
142    #[must_use]
143    pub fn new() -> Self {
144        Self::default()
145    }
146
147    /// Loads `module`, returning an executable routing function.
148    ///
149    /// If a module with the same content hash is already loaded, the existing
150    /// module is reused and no duplicate is loaded.
151    #[must_use]
152    pub fn load(&self, module: RoutingModule) -> RoutingFunction {
153        let loaded_module = {
154            let mut loaded = lock(&self.loaded);
155            Arc::clone(loaded.entry(module.content_hash).or_insert_with(|| {
156                Arc::new(LoadedModule {
157                    content_hash: module.content_hash,
158                    logic: module.logic,
159                })
160            }))
161        };
162        RoutingFunction {
163            module: loaded_module,
164        }
165    }
166
167    /// Returns the number of distinct modules currently loaded.
168    #[must_use]
169    pub fn loaded_count(&self) -> usize {
170        lock(&self.loaded).len()
171    }
172
173    /// Returns true when a module with `hash` is loaded.
174    #[must_use]
175    pub fn is_loaded(&self, hash: ContentHash) -> bool {
176        lock(&self.loaded).contains_key(&hash)
177    }
178}
179
180/// Holds the active routing function for a channel and supports hot deployment.
181///
182/// Deploying a new version atomically swaps the active reference. In-flight
183/// executions hold their own clone of the previous function and complete
184/// normally; the previous module stays loaded until those clones are dropped.
185#[derive(Debug)]
186pub struct RoutingSlot {
187    current: Mutex<RoutingFunction>,
188}
189
190impl RoutingSlot {
191    /// Creates a slot holding `initial` as the active routing function.
192    #[must_use]
193    pub const fn new(initial: RoutingFunction) -> Self {
194        Self {
195            current: Mutex::new(initial),
196        }
197    }
198
199    /// Returns a handle to the currently active routing function.
200    #[must_use]
201    pub fn current(&self) -> RoutingFunction {
202        lock(&self.current).clone()
203    }
204
205    /// Hot-deploys `next` as the active routing function.
206    pub fn deploy(&self, next: RoutingFunction) {
207        *lock(&self.current) = next;
208    }
209
210    /// Returns the content hash of the active routing function.
211    #[must_use]
212    pub fn active_hash(&self) -> ContentHash {
213        lock(&self.current).content_hash()
214    }
215}
216
217pub(super) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
218    match mutex.lock() {
219        Ok(guard) => guard,
220        Err(poisoned) => poisoned.into_inner(),
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::{ContentHash, ModuleLoader, RoutingDecision, RoutingModule, RoutingSlot};

    /// Builds a module from `bytecode` whose entrypoint always returns
    /// `RoutingDecision::none()`.
    fn noop_module(bytecode: &[u8]) -> RoutingModule {
        RoutingModule::new(bytecode, |_message, _consumers| RoutingDecision::none())
    }

    #[test]
    fn content_hash_is_stable_and_distinguishes_bytecode() {
        let first = ContentHash::of(b"module-a");
        let repeat = ContentHash::of(b"module-a");
        let other = ContentHash::of(b"module-b");

        assert_eq!(first, repeat);
        assert_ne!(first, other);
    }

    #[test]
    fn load_returns_executable_function_keyed_by_content_hash() {
        let loader = ModuleLoader::new();
        let submitted = noop_module(b"v1");
        let expected_hash = submitted.content_hash();

        let loaded = loader.load(submitted);

        assert_eq!(loaded.content_hash(), expected_hash);
        assert!(loader.is_loaded(expected_hash));
    }

    #[test]
    fn loading_same_content_hash_twice_does_not_duplicate() {
        let loader = ModuleLoader::new();

        // Keep both handles alive while counting, as two live callers would.
        let _handles: Vec<_> = (0..2)
            .map(|_| loader.load(noop_module(b"v1")))
            .collect();

        assert_eq!(loader.loaded_count(), 1);
    }

    #[test]
    fn slot_deploy_swaps_active_function() {
        let loader = ModuleLoader::new();
        let previous = loader.load(noop_module(b"v1"));
        let replacement = loader.load(noop_module(b"v2"));
        let replacement_hash = replacement.content_hash();
        let slot = RoutingSlot::new(previous);

        slot.deploy(replacement);

        assert_eq!(slot.active_hash(), replacement_hash);
    }
}