Skip to main content

liminal/routing/function/
execute.rs

1//! Supervised, isolated execution of routing functions.
2//!
3//! Each invocation runs in its own supervised process, never on the calling
4//! thread. A panic is contained and surfaced as [`FunctionError::Crashed`]; a
5//! function running past the supervision timeout is abandoned and surfaced as
6//! [`FunctionError::TimedOut`]. Neither outcome affects any other channel.
7
8use std::collections::BTreeMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use beamr::scheduler::Scheduler;
13
14use crate::routing::FieldValue;
15use crate::routing::function::loader::{ContentHash, RoutingFunction};
16mod actor;
17
/// Identifier of a consumer that a routing function may select.
///
/// Equality, ordering, and hashing are all derived from the wrapped string.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConsumerId(String);

impl ConsumerId {
    /// Builds an identifier from any value convertible into a `String`
    /// (for example a `&str` or an owned `String`).
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let owned: String = id.into();
        Self(owned)
    }

    /// Borrows the identifier's underlying text.
    #[must_use]
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
35
36/// Per-consumer state presented to a routing function during execution.
37#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct ConsumerStateView {
39    /// Consumer the state describes.
40    pub consumer: ConsumerId,
41    /// Messages currently in flight to the consumer.
42    pub current_in_flight: u32,
43    /// Maximum messages the consumer will accept in flight.
44    pub max_in_flight: u32,
45    /// Depth of the consumer's pending buffer.
46    pub buffer_depth: u32,
47    /// Affinity tags advertised by the consumer.
48    pub affinity_tags: Vec<String>,
49}
50
51impl ConsumerStateView {
52    /// Creates a consumer state view.
53    #[must_use]
54    pub const fn new(
55        consumer: ConsumerId,
56        current_in_flight: u32,
57        max_in_flight: u32,
58        buffer_depth: u32,
59        affinity_tags: Vec<String>,
60    ) -> Self {
61        Self {
62            consumer,
63            current_in_flight,
64            max_in_flight,
65            buffer_depth,
66            affinity_tags,
67        }
68    }
69
70    /// Returns the remaining in-flight capacity of the consumer.
71    #[must_use]
72    pub const fn available_capacity(&self) -> u32 {
73        self.max_in_flight.saturating_sub(self.current_in_flight)
74    }
75
76    /// Returns true when the consumer can accept at least one more message.
77    #[must_use]
78    pub const fn has_capacity(&self) -> bool {
79        self.available_capacity() > 0
80    }
81
82    /// Returns true when the consumer advertises `tag`.
83    #[must_use]
84    pub fn has_affinity(&self, tag: &str) -> bool {
85        self.affinity_tags
86            .iter()
87            .any(|advertised| advertised == tag)
88    }
89}
90
91/// Routing decision produced by a routing function.
92#[derive(Clone, Debug, Default, PartialEq, Eq)]
93pub struct RoutingDecision {
94    selected: Option<ConsumerId>,
95}
96
97impl RoutingDecision {
98    /// A decision that selects `consumer`.
99    #[must_use]
100    pub const fn select(consumer: ConsumerId) -> Self {
101        Self {
102            selected: Some(consumer),
103        }
104    }
105
106    /// A decision that selects no consumer.
107    #[must_use]
108    pub const fn none() -> Self {
109        Self { selected: None }
110    }
111
112    /// Returns the selected consumer, if any.
113    #[must_use]
114    pub const fn selected(&self) -> Option<&ConsumerId> {
115        self.selected.as_ref()
116    }
117
118    /// Returns true when a consumer was selected.
119    #[must_use]
120    pub const fn is_selected(&self) -> bool {
121        self.selected.is_some()
122    }
123}
124
125/// Owned, supervisor-marshalled view of a message a routing function evaluates.
126#[derive(Clone, Debug, Default, PartialEq)]
127pub struct RoutingMessage {
128    fields: BTreeMap<String, FieldValue>,
129}
130
131impl RoutingMessage {
132    /// Creates an empty routing message.
133    #[must_use]
134    pub fn new() -> Self {
135        Self::default()
136    }
137
138    /// Adds or replaces a field, returning the updated message.
139    #[must_use]
140    pub fn with(mut self, field: impl Into<String>, value: FieldValue) -> Self {
141        self.fields.insert(field.into(), value);
142        self
143    }
144
145    /// Returns the value of `field`, if present.
146    #[must_use]
147    pub fn get(&self, field: &str) -> Option<&FieldValue> {
148        self.fields.get(field)
149    }
150
151    /// Iterates the message's fields in deterministic (key-sorted) order.
152    pub fn fields(&self) -> impl Iterator<Item = (&str, &FieldValue)> {
153        self.fields
154            .iter()
155            .map(|(name, value)| (name.as_str(), value))
156    }
157}
158
159/// Failure surfaced when a supervised routing function does not complete.
160#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
161pub enum FunctionError {
162    /// The routing function panicked; the supervisor contained the crash.
163    #[error("routing function '{0}' panicked during execution")]
164    Crashed(ContentHash),
165    /// The routing function exceeded the supervision timeout and was abandoned.
166    #[error("routing function '{0}' exceeded the supervision timeout")]
167    TimedOut(ContentHash),
168    /// The supervised execution process could not be started.
169    #[error("routing function execution process could not be started: {0}")]
170    SpawnFailed(String),
171}
172
173/// Executes routing functions in supervised, isolated processes.
174///
175/// Each invocation runs in its own supervised process. A panic is contained and
176/// returned as [`FunctionError::Crashed`]; a function that runs past the
177/// supervision timeout is abandoned and returns [`FunctionError::TimedOut`].
178/// Neither outcome affects the execution of any other channel's functions.
179#[derive(Clone)]
180pub struct SupervisedExecutor {
181    scheduler: Arc<Scheduler>,
182    timeout: Duration,
183}
184
185impl SupervisedExecutor {
186    /// Default supervision timeout applied by [`SupervisedExecutor::with_default_timeout`].
187    pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
188
189    /// Creates an executor with the given supervision timeout.
190    #[must_use]
191    pub const fn new(scheduler: Arc<Scheduler>, timeout: Duration) -> Self {
192        Self { scheduler, timeout }
193    }
194
195    /// Creates an executor using [`SupervisedExecutor::DEFAULT_TIMEOUT`].
196    #[must_use]
197    pub const fn with_default_timeout(scheduler: Arc<Scheduler>) -> Self {
198        Self::new(scheduler, Self::DEFAULT_TIMEOUT)
199    }
200
201    /// Returns the beamr scheduler backing supervised invocations.
202    #[must_use]
203    pub fn scheduler(&self) -> Arc<Scheduler> {
204        Arc::clone(&self.scheduler)
205    }
206
207    /// Executes `function` against `message` and `consumers` under supervision.
208    ///
209    /// The function runs in a dedicated supervised process, never on the calling
210    /// thread, and receives the message and per-consumer state views.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`FunctionError::Crashed`] if the function panics,
215    /// [`FunctionError::TimedOut`] if it exceeds the supervision timeout, and
216    /// [`FunctionError::SpawnFailed`] if the supervised process cannot start.
217    pub fn execute(
218        &self,
219        function: &RoutingFunction,
220        message: RoutingMessage,
221        consumers: Vec<ConsumerStateView>,
222    ) -> Result<RoutingDecision, FunctionError> {
223        let invocation = actor::BeamrInvocation::new(Arc::clone(&self.scheduler), self.timeout);
224        let hash = function.content_hash();
225        match invocation.execute(function.clone(), message, consumers) {
226            Ok(decision) => Ok(decision),
227            Err(actor::InvocationError::Crashed) => Err(FunctionError::Crashed(hash)),
228            Err(actor::InvocationError::TimedOut(timed_out_hash)) => {
229                Err(FunctionError::TimedOut(timed_out_hash))
230            }
231            Err(actor::InvocationError::SpawnFailed(message)) => {
232                Err(FunctionError::SpawnFailed(message))
233            }
234        }
235    }
236}
237
238impl std::fmt::Debug for SupervisedExecutor {
239    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        formatter
241            .debug_struct("SupervisedExecutor")
242            .field("timeout", &self.timeout)
243            .finish_non_exhaustive()
244    }
245}
246
#[cfg(test)]
mod tests {
    use std::error::Error;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;
    use std::time::Duration;

    use super::{
        ConsumerId, ConsumerStateView, FunctionError, RoutingDecision, RoutingMessage,
        SupervisedExecutor,
    };
    use crate::conversation::ConversationSupervisor;
    use crate::routing::FieldValue;
    use crate::routing::function::loader::{ModuleLoader, RoutingModule, RoutingSlot};

    /// Builds a consumer state view with an empty pending buffer.
    fn consumer(id: &str, current: u32, max: u32, tags: &[&str]) -> ConsumerStateView {
        ConsumerStateView::new(
            ConsumerId::new(id),
            current,
            max,
            0,
            tags.iter().map(|tag| (*tag).to_owned()).collect(),
        )
    }

    /// Builds a module whose logic selects the first consumer with spare capacity.
    fn select_first_with_capacity_module(bytecode: &[u8]) -> RoutingModule {
        RoutingModule::new(bytecode, |_message, consumers| {
            consumers
                .iter()
                .find(|state| state.has_capacity())
                .map_or_else(RoutingDecision::none, |state| {
                    RoutingDecision::select(state.consumer.clone())
                })
        })
    }

    /// Returns the selected consumer's name, if the decision selected one.
    fn selected_name(decision: &RoutingDecision) -> Option<&str> {
        decision.selected().map(ConsumerId::as_str)
    }

    /// Creates a fresh supervisor and an executor on its scheduler with the
    /// default timeout. The supervisor is returned so the caller can keep it in
    /// scope alongside the executor.
    fn supervised_executor() -> Result<(ConversationSupervisor, SupervisedExecutor), Box<dyn Error>>
    {
        let supervisor = ConversationSupervisor::new()?;
        let executor = SupervisedExecutor::with_default_timeout(supervisor.scheduler());
        Ok((supervisor, executor))
    }

    /// Sets the wrapped flag when dropped. If a test panics, the drop during
    /// unwinding releases routing logic that is polling the flag.
    struct ReleaseOnDrop(Arc<AtomicBool>);

    impl Drop for ReleaseOnDrop {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    #[test]
    fn execution_returns_decision_using_consumer_state_view() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let function = loader.load(select_first_with_capacity_module(b"v1"));
        let (_supervisor, executor) = supervised_executor()?;
        let consumers = vec![
            consumer("saturated", 5, 5, &["fast"]),
            consumer("ready", 1, 4, &["fast"]),
        ];

        let decision = executor.execute(&function, RoutingMessage::new(), consumers);

        assert!(matches!(decision, Ok(ref outcome) if selected_name(outcome) == Some("ready")));
        Ok(())
    }

    #[test]
    fn message_fields_are_visible_to_routing_function() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let function = loader.load(RoutingModule::new(
            b"amount-router",
            |message, consumers| {
                let high_value = matches!(
                    message.get("amount"),
                    Some(FieldValue::Integer(amount)) if *amount > 1_000
                );
                if high_value {
                    consumers
                        .first()
                        .map_or_else(RoutingDecision::none, |state| {
                            RoutingDecision::select(state.consumer.clone())
                        })
                } else {
                    RoutingDecision::none()
                }
            },
        ));
        let (_supervisor, executor) = supervised_executor()?;
        let message = RoutingMessage::new().with("amount", FieldValue::Integer(5_000));

        let decision = executor.execute(&function, message, vec![consumer("priority", 0, 1, &[])]);

        assert!(matches!(decision, Ok(ref outcome) if selected_name(outcome) == Some("priority")));
        Ok(())
    }

    #[test]
    fn panic_in_function_is_contained_and_other_channels_proceed() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let crashing = loader.load(RoutingModule::new(b"channel-a", |_message, _consumers| {
            std::panic::resume_unwind(Box::new(
                "intentional crash for fault-isolation test".to_owned(),
            ))
        }));
        let healthy = loader.load(select_first_with_capacity_module(b"channel-b"));
        let (_supervisor, executor) = supervised_executor()?;

        let crashed = executor.execute(&crashing, RoutingMessage::new(), Vec::new());
        assert_eq!(
            crashed,
            Err(FunctionError::Crashed(crashing.content_hash()))
        );

        let recovered = executor.execute(
            &healthy,
            RoutingMessage::new(),
            vec![consumer("ready", 0, 1, &[])],
        );
        assert!(matches!(recovered, Ok(ref outcome) if selected_name(outcome) == Some("ready")));
        Ok(())
    }

    #[test]
    fn repeated_panics_do_not_poison_the_shared_supervisor() -> Result<(), Box<dyn Error>> {
        // Real beamr supervision (not an ad-hoc panic catch on the calling
        // thread) must keep the shared scheduler healthy under a sustained
        // fault load: every crashing invocation is contained as `Crashed`, and
        // a healthy invocation spawned on the *same* scheduler immediately
        // after each crash still runs to a correct decision. If a panic escaped
        // the supervised actor it would unwind the scheduler worker and the
        // interleaved healthy invocations would hang or fail.
        let loader = ModuleLoader::new();
        let crashing = loader.load(RoutingModule::new(b"flaky", |_message, _consumers| {
            std::panic::resume_unwind(Box::new("repeated intentional crash".to_owned()))
        }));
        let healthy = loader.load(select_first_with_capacity_module(b"steady"));
        let (_supervisor, executor) = supervised_executor()?;

        for _ in 0..16 {
            let crashed = executor.execute(&crashing, RoutingMessage::new(), Vec::new());
            assert_eq!(
                crashed,
                Err(FunctionError::Crashed(crashing.content_hash()))
            );

            let served = executor.execute(
                &healthy,
                RoutingMessage::new(),
                vec![consumer("ready", 0, 1, &[])],
            );
            assert!(
                matches!(served, Ok(ref outcome) if selected_name(outcome) == Some("ready")),
                "scheduler must keep serving healthy invocations after a contained panic"
            );
        }
        Ok(())
    }

    #[test]
    fn function_exceeding_timeout_is_terminated_with_error() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let slow = loader.load(RoutingModule::new(b"slow", |_message, _consumers| {
            thread::sleep(Duration::from_millis(200));
            RoutingDecision::none()
        }));
        let supervisor = ConversationSupervisor::new()?;
        let executor = SupervisedExecutor::new(supervisor.scheduler(), Duration::from_millis(20));

        let result = executor.execute(&slow, RoutingMessage::new(), Vec::new());

        assert_eq!(result, Err(FunctionError::TimedOut(slow.content_hash())));
        Ok(())
    }

    #[test]
    fn hot_deploy_does_not_interrupt_in_flight_and_swaps_next_version() -> Result<(), Box<dyn Error>>
    {
        let loader = ModuleLoader::new();
        let entered = Arc::new(AtomicBool::new(false));
        let release = Arc::new(AtomicBool::new(false));
        let entered_for_logic = Arc::clone(&entered);
        let release_for_logic = Arc::clone(&release);

        let old = loader.load(RoutingModule::new(b"v1", move |_message, _consumers| {
            entered_for_logic.store(true, Ordering::SeqCst);
            while !release_for_logic.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(1));
            }
            RoutingDecision::select(ConsumerId::new("old"))
        }));
        let new = loader.load(RoutingModule::new(b"v2", |_message, _consumers| {
            RoutingDecision::select(ConsumerId::new("new"))
        }));
        let old_hash = old.content_hash();
        let new_hash = new.content_hash();

        let slot = Arc::new(RoutingSlot::new(old));
        let (_supervisor, executor) = supervised_executor()?;
        // Declared after the supervisor and executor so it is dropped before
        // them during unwinding. A failed assertion below therefore still
        // unblocks the in-flight routing logic instead of leaving it spinning.
        let _release_on_failure = ReleaseOnDrop(Arc::clone(&release));
        let slot_for_thread = Arc::clone(&slot);
        let executor_for_thread = executor.clone();

        let in_flight = thread::spawn(move || {
            let function = slot_for_thread.current();
            executor_for_thread.execute(&function, RoutingMessage::new(), Vec::new())
        });

        while !entered.load(Ordering::SeqCst) {
            // If the invocation ends without ever entering the routing logic
            // (e.g. it failed to spawn or timed out), fail instead of spinning
            // forever.
            assert!(
                !in_flight.is_finished(),
                "in-flight invocation finished before entering the routing function"
            );
            thread::sleep(Duration::from_millis(1));
        }

        slot.deploy(new);
        assert_eq!(slot.active_hash(), new_hash);
        assert!(
            loader.is_loaded(old_hash),
            "old module must remain loaded while in flight"
        );
        assert_eq!(loader.loaded_count(), 2);

        release.store(true, Ordering::SeqCst);

        let in_flight_result = in_flight.join();
        assert!(matches!(
            in_flight_result,
            Ok(Ok(ref outcome)) if selected_name(outcome) == Some("old")
        ));

        let next = executor.execute(&slot.current(), RoutingMessage::new(), Vec::new());
        assert!(matches!(next, Ok(ref outcome) if selected_name(outcome) == Some("new")));
        Ok(())
    }
}