1use std::collections::BTreeMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use beamr::scheduler::Scheduler;
13
14use crate::routing::FieldValue;
15use crate::routing::function::loader::{ContentHash, RoutingFunction};
16mod actor;
17
/// Opaque identifier for a message consumer.
///
/// Wraps an owned `String`; equality, ordering and hashing are all derived
/// and therefore follow the wrapped string.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConsumerId(String);

impl ConsumerId {
    /// Creates an identifier from anything convertible into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        Self(raw)
    }

    /// Borrows the identifier text without allocating.
    #[must_use]
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
35
36#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct ConsumerStateView {
39 pub consumer: ConsumerId,
41 pub current_in_flight: u32,
43 pub max_in_flight: u32,
45 pub buffer_depth: u32,
47 pub affinity_tags: Vec<String>,
49}
50
51impl ConsumerStateView {
52 #[must_use]
54 pub const fn new(
55 consumer: ConsumerId,
56 current_in_flight: u32,
57 max_in_flight: u32,
58 buffer_depth: u32,
59 affinity_tags: Vec<String>,
60 ) -> Self {
61 Self {
62 consumer,
63 current_in_flight,
64 max_in_flight,
65 buffer_depth,
66 affinity_tags,
67 }
68 }
69
70 #[must_use]
72 pub const fn available_capacity(&self) -> u32 {
73 self.max_in_flight.saturating_sub(self.current_in_flight)
74 }
75
76 #[must_use]
78 pub const fn has_capacity(&self) -> bool {
79 self.available_capacity() > 0
80 }
81
82 #[must_use]
84 pub fn has_affinity(&self, tag: &str) -> bool {
85 self.affinity_tags
86 .iter()
87 .any(|advertised| advertised == tag)
88 }
89}
90
91#[derive(Clone, Debug, Default, PartialEq, Eq)]
93pub struct RoutingDecision {
94 selected: Option<ConsumerId>,
95}
96
97impl RoutingDecision {
98 #[must_use]
100 pub const fn select(consumer: ConsumerId) -> Self {
101 Self {
102 selected: Some(consumer),
103 }
104 }
105
106 #[must_use]
108 pub const fn none() -> Self {
109 Self { selected: None }
110 }
111
112 #[must_use]
114 pub const fn selected(&self) -> Option<&ConsumerId> {
115 self.selected.as_ref()
116 }
117
118 #[must_use]
120 pub const fn is_selected(&self) -> bool {
121 self.selected.is_some()
122 }
123}
124
125#[derive(Clone, Debug, Default, PartialEq)]
127pub struct RoutingMessage {
128 fields: BTreeMap<String, FieldValue>,
129}
130
131impl RoutingMessage {
132 #[must_use]
134 pub fn new() -> Self {
135 Self::default()
136 }
137
138 #[must_use]
140 pub fn with(mut self, field: impl Into<String>, value: FieldValue) -> Self {
141 self.fields.insert(field.into(), value);
142 self
143 }
144
145 #[must_use]
147 pub fn get(&self, field: &str) -> Option<&FieldValue> {
148 self.fields.get(field)
149 }
150
151 pub fn fields(&self) -> impl Iterator<Item = (&str, &FieldValue)> {
153 self.fields
154 .iter()
155 .map(|(name, value)| (name.as_str(), value))
156 }
157}
158
159#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
161pub enum FunctionError {
162 #[error("routing function '{0}' panicked during execution")]
164 Crashed(ContentHash),
165 #[error("routing function '{0}' exceeded the supervision timeout")]
167 TimedOut(ContentHash),
168 #[error("routing function execution process could not be started: {0}")]
170 SpawnFailed(String),
171}
172
173#[derive(Clone)]
180pub struct SupervisedExecutor {
181 scheduler: Arc<Scheduler>,
182 timeout: Duration,
183}
184
185impl SupervisedExecutor {
186 pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
188
189 #[must_use]
191 pub const fn new(scheduler: Arc<Scheduler>, timeout: Duration) -> Self {
192 Self { scheduler, timeout }
193 }
194
195 #[must_use]
197 pub const fn with_default_timeout(scheduler: Arc<Scheduler>) -> Self {
198 Self::new(scheduler, Self::DEFAULT_TIMEOUT)
199 }
200
201 #[must_use]
203 pub fn scheduler(&self) -> Arc<Scheduler> {
204 Arc::clone(&self.scheduler)
205 }
206
207 pub fn execute(
218 &self,
219 function: &RoutingFunction,
220 message: RoutingMessage,
221 consumers: Vec<ConsumerStateView>,
222 ) -> Result<RoutingDecision, FunctionError> {
223 let invocation = actor::BeamrInvocation::new(Arc::clone(&self.scheduler), self.timeout);
224 let hash = function.content_hash();
225 match invocation.execute(function.clone(), message, consumers) {
226 Ok(decision) => Ok(decision),
227 Err(actor::InvocationError::Crashed) => Err(FunctionError::Crashed(hash)),
228 Err(actor::InvocationError::TimedOut(timed_out_hash)) => {
229 Err(FunctionError::TimedOut(timed_out_hash))
230 }
231 Err(actor::InvocationError::SpawnFailed(message)) => {
232 Err(FunctionError::SpawnFailed(message))
233 }
234 }
235 }
236}
237
238impl std::fmt::Debug for SupervisedExecutor {
239 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 formatter
241 .debug_struct("SupervisedExecutor")
242 .field("timeout", &self.timeout)
243 .finish_non_exhaustive()
244 }
245}
246
// Tests for `SupervisedExecutor`: decision plumbing, panic containment,
// timeout enforcement, and hot deploy through `RoutingSlot`.
#[cfg(test)]
mod tests {
    use std::error::Error;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;
    use std::time::Duration;

    use super::{
        ConsumerId, ConsumerStateView, FunctionError, RoutingDecision, RoutingMessage,
        SupervisedExecutor,
    };
    use crate::conversation::ConversationSupervisor;
    use crate::routing::FieldValue;
    use crate::routing::function::loader::{ModuleLoader, RoutingModule, RoutingSlot};

    /// Builds a consumer snapshot with `buffer_depth` fixed at 0.
    fn consumer(id: &str, current: u32, max: u32, tags: &[&str]) -> ConsumerStateView {
        ConsumerStateView::new(
            ConsumerId::new(id),
            current,
            max,
            0,
            tags.iter().map(|tag| (*tag).to_owned()).collect(),
        )
    }

    /// Module whose logic selects the first consumer with spare capacity, or
    /// no consumer if every one is saturated. `bytecode` is passed straight to
    /// `RoutingModule::new`; presumably it drives the content hash, so distinct
    /// byte strings yield distinct modules — TODO confirm in the loader.
    fn select_first_with_capacity_module(bytecode: &[u8]) -> RoutingModule {
        RoutingModule::new(bytecode, |_message, consumers| {
            consumers
                .iter()
                .find(|state| state.has_capacity())
                .map_or_else(RoutingDecision::none, |state| {
                    RoutingDecision::select(state.consumer.clone())
                })
        })
    }

    /// Selected consumer id as `&str`, or `None` for an empty decision.
    fn selected_name(decision: &RoutingDecision) -> Option<&str> {
        decision.selected().map(ConsumerId::as_str)
    }

    /// Creates a supervisor plus an executor on its scheduler with the default
    /// timeout. Callers bind the supervisor to `_supervisor` (not `_`) so it is
    /// not dropped before the test ends — presumably the scheduler relies on it
    /// staying alive; verify against `ConversationSupervisor`.
    fn supervised_executor() -> Result<(ConversationSupervisor, SupervisedExecutor), Box<dyn Error>>
    {
        let supervisor = ConversationSupervisor::new()?;
        let executor = SupervisedExecutor::with_default_timeout(supervisor.scheduler());
        Ok((supervisor, executor))
    }

    // The saturated consumer (5/5) must be skipped in favour of "ready" (1/4).
    #[test]
    fn execution_returns_decision_using_consumer_state_view() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let function = loader.load(select_first_with_capacity_module(b"v1"));
        let (_supervisor, executor) = supervised_executor()?;
        let consumers = vec![
            consumer("saturated", 5, 5, &["fast"]),
            consumer("ready", 1, 4, &["fast"]),
        ];

        let decision = executor.execute(&function, RoutingMessage::new(), consumers);

        assert!(matches!(decision, Ok(ref outcome) if selected_name(outcome) == Some("ready")));
        Ok(())
    }

    // The function only selects when `amount` is an integer above 1_000, so a
    // selection proves the message field reached the function.
    #[test]
    fn message_fields_are_visible_to_routing_function() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let function = loader.load(RoutingModule::new(
            b"amount-router",
            |message, consumers| {
                let high_value = matches!(
                    message.get("amount"),
                    Some(FieldValue::Integer(amount)) if *amount > 1_000
                );
                if high_value {
                    consumers
                        .first()
                        .map_or_else(RoutingDecision::none, |state| {
                            RoutingDecision::select(state.consumer.clone())
                        })
                } else {
                    RoutingDecision::none()
                }
            },
        ));
        let (_supervisor, executor) = supervised_executor()?;
        let message = RoutingMessage::new().with("amount", FieldValue::Integer(5_000));

        let decision = executor.execute(&function, message, vec![consumer("priority", 0, 1, &[])]);

        assert!(matches!(decision, Ok(ref outcome) if selected_name(outcome) == Some("priority")));
        Ok(())
    }

    // A panicking function must surface as `Crashed` with its own hash, and the
    // same executor must still serve a different, healthy function afterwards.
    #[test]
    fn panic_in_function_is_contained_and_other_channels_proceed() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        // `resume_unwind` unwinds without invoking the panic hook, which
        // presumably keeps the intentional panic from printing a backtrace.
        let crashing = loader.load(RoutingModule::new(b"channel-a", |_message, _consumers| {
            std::panic::resume_unwind(Box::new(
                "intentional crash for fault-isolation test".to_owned(),
            ))
        }));
        let healthy = loader.load(select_first_with_capacity_module(b"channel-b"));
        let (_supervisor, executor) = supervised_executor()?;

        let crashed = executor.execute(&crashing, RoutingMessage::new(), Vec::new());
        assert_eq!(
            crashed,
            Err(FunctionError::Crashed(crashing.content_hash()))
        );

        let recovered = executor.execute(
            &healthy,
            RoutingMessage::new(),
            vec![consumer("ready", 0, 1, &[])],
        );
        assert!(matches!(recovered, Ok(ref outcome) if selected_name(outcome) == Some("ready")));
        Ok(())
    }

    // Alternates crash / healthy calls 16 times on one executor to check that
    // contained panics do not accumulate into a broken scheduler.
    #[test]
    fn repeated_panics_do_not_poison_the_shared_supervisor() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let crashing = loader.load(RoutingModule::new(b"flaky", |_message, _consumers| {
            std::panic::resume_unwind(Box::new("repeated intentional crash".to_owned()))
        }));
        let healthy = loader.load(select_first_with_capacity_module(b"steady"));
        let (_supervisor, executor) = supervised_executor()?;

        for _ in 0..16 {
            let crashed = executor.execute(&crashing, RoutingMessage::new(), Vec::new());
            assert_eq!(
                crashed,
                Err(FunctionError::Crashed(crashing.content_hash()))
            );

            let served = executor.execute(
                &healthy,
                RoutingMessage::new(),
                vec![consumer("ready", 0, 1, &[])],
            );
            assert!(
                matches!(served, Ok(ref outcome) if selected_name(outcome) == Some("ready")),
                "scheduler must keep serving healthy invocations after a contained panic"
            );
        }
        Ok(())
    }

    // The function sleeps 200 ms against a 20 ms timeout, so `TimedOut` with
    // the function's hash is expected.
    #[test]
    fn function_exceeding_timeout_is_terminated_with_error() -> Result<(), Box<dyn Error>> {
        let loader = ModuleLoader::new();
        let slow = loader.load(RoutingModule::new(b"slow", |_message, _consumers| {
            thread::sleep(Duration::from_millis(200));
            RoutingDecision::none()
        }));
        let supervisor = ConversationSupervisor::new()?;
        let executor = SupervisedExecutor::new(supervisor.scheduler(), Duration::from_millis(20));

        let result = executor.execute(&slow, RoutingMessage::new(), Vec::new());

        assert_eq!(result, Err(FunctionError::TimedOut(slow.content_hash())));
        Ok(())
    }

    // Handshake:
    // 1. The background thread runs v1, which sets `entered`.
    // 2. It then spins until `release` is set.
    // 3. While v1 is blocked, the test deploys v2 and checks that both modules
    //    are still loaded.
    // 4. It releases v1, expects v1's result ("old"), and checks that a fresh
    //    call through the slot now runs v2 ("new").
    #[test]
    fn hot_deploy_does_not_interrupt_in_flight_and_swaps_next_version() -> Result<(), Box<dyn Error>>
    {
        let loader = ModuleLoader::new();
        let entered = Arc::new(AtomicBool::new(false));
        let release = Arc::new(AtomicBool::new(false));
        let entered_for_logic = Arc::clone(&entered);
        let release_for_logic = Arc::clone(&release);

        let old = loader.load(RoutingModule::new(b"v1", move |_message, _consumers| {
            entered_for_logic.store(true, Ordering::SeqCst);
            while !release_for_logic.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(1));
            }
            RoutingDecision::select(ConsumerId::new("old"))
        }));
        let new = loader.load(RoutingModule::new(b"v2", |_message, _consumers| {
            RoutingDecision::select(ConsumerId::new("new"))
        }));
        let old_hash = old.content_hash();
        let new_hash = new.content_hash();

        let slot = Arc::new(RoutingSlot::new(old));
        let (_supervisor, executor) = supervised_executor()?;
        let slot_for_thread = Arc::clone(&slot);
        let executor_for_thread = executor.clone();

        // Resolves the slot's current version (v1) before the deploy happens.
        let in_flight = thread::spawn(move || {
            let function = slot_for_thread.current();
            executor_for_thread.execute(&function, RoutingMessage::new(), Vec::new())
        });

        // Wait until v1 is actually executing before deploying v2.
        while !entered.load(Ordering::SeqCst) {
            thread::sleep(Duration::from_millis(1));
        }

        slot.deploy(new);
        assert_eq!(slot.active_hash(), new_hash);
        assert!(
            loader.is_loaded(old_hash),
            "old module must remain loaded while in flight"
        );
        assert_eq!(loader.loaded_count(), 2);

        release.store(true, Ordering::SeqCst);

        // `join` yields Ok(result) unless the thread panicked.
        let in_flight_result = in_flight.join();
        assert!(matches!(
            in_flight_result,
            Ok(Ok(ref outcome)) if selected_name(outcome) == Some("old")
        ));

        let next = executor.execute(&slot.current(), RoutingMessage::new(), Vec::new());
        assert!(matches!(next, Ok(ref outcome) if selected_name(outcome) == Some("new")));
        Ok(())
    }
}