use std::error::Error;
use std::time::Duration;
use beamr::process::ExitReason;
use super::{DispatchConversation, DispatchError};
use crate::conversation::ParticipantPid;
use crate::routing::function::loader::{ModuleLoader, RoutingModule};
use crate::routing::group::{ConsumerGroup, ConsumerRegistration};
use crate::routing::{
ConsumerId, ConsumerStateView, RoutingDecision, RoutingFunction, RoutingMessage,
};
fn select_first() -> RoutingFunction {
ModuleLoader::new().load(RoutingModule::new(
b"dispatch-first",
|_message, consumers| {
consumers
.first()
.map_or_else(RoutingDecision::none, |state| {
RoutingDecision::select(state.consumer.clone())
})
},
))
}
fn select_by_target() -> RoutingFunction {
ModuleLoader::new().load(RoutingModule::new(
b"dispatch-by-target",
|message, consumers| {
let Some(crate::routing::FieldValue::Text(target)) = message.get("target") else {
return RoutingDecision::none();
};
consumers
.iter()
.find(|state| state.consumer.as_str() == target)
.map_or_else(RoutingDecision::none, |state| {
RoutingDecision::select(state.consumer.clone())
})
},
))
}
fn registration(id: &str, pid: u64) -> ConsumerRegistration {
ConsumerRegistration::new(
ParticipantPid::new(pid),
ConsumerStateView::new(ConsumerId::new(id), 0, 4, 0, Vec::new()),
)
}
fn has_link_to(scheduler: &beamr::scheduler::Scheduler, consumer_pid: u64) -> bool {
(consumer_pid.saturating_sub(256)..consumer_pid + 256)
.filter(|candidate| *candidate != consumer_pid && *candidate != 0)
.any(|candidate| scheduler.is_linked(candidate, consumer_pid))
}
fn wait_for_link(scheduler: &beamr::scheduler::Scheduler, consumer_pid: u64) -> bool {
let deadline = std::time::Instant::now() + Duration::from_secs(3);
while std::time::Instant::now() < deadline {
if has_link_to(scheduler, consumer_pid) {
return true;
}
std::thread::sleep(Duration::from_micros(100));
}
false
}
#[test]
fn dispatch_selects_consumer_via_routing_function() -> Result<(), Box<dyn Error>> {
let dispatch = DispatchConversation::new(ConsumerGroup::new(select_by_target()))?;
let scheduler = dispatch.supervisor().scheduler();
let a = ParticipantPid::new(scheduler.spawn_test_process(false));
let b = ParticipantPid::new(scheduler.spawn_test_process(false));
let c = ParticipantPid::new(scheduler.spawn_test_process(false));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
a,
ConsumerStateView::new(ConsumerId::new("A"), 0, 4, 0, Vec::new()),
));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
b,
ConsumerStateView::new(ConsumerId::new("B"), 0, 4, 0, Vec::new()),
));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
c,
ConsumerStateView::new(ConsumerId::new("C"), 0, 4, 0, Vec::new()),
));
let message =
RoutingMessage::new().with("target", crate::routing::FieldValue::Text("B".to_owned()));
let outcome = dispatch.dispatch(&message)?;
assert_eq!(outcome.delivered_to(), &ConsumerId::new("B"));
assert!(outcome.delivered_first_try());
dispatch.supervisor().shutdown();
Ok(())
}
#[test]
fn dispatch_links_to_selected_consumer_before_completion() -> Result<(), Box<dyn Error>> {
let dispatch = DispatchConversation::new(ConsumerGroup::new(select_first()))?;
let scheduler = dispatch.supervisor().scheduler();
let consumer = ParticipantPid::new(scheduler.spawn_test_process(false));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
consumer,
ConsumerStateView::new(ConsumerId::new("solo"), 0, 4, 0, Vec::new()),
));
let dispatch_for_thread = dispatch.clone();
let worker = std::thread::spawn(move || dispatch_for_thread.dispatch(&RoutingMessage::new()));
let linked = wait_for_link(&scheduler, consumer.get());
assert!(linked, "dispatch must link to the selected consumer");
let outcome = worker.join().map_err(|_| "dispatch thread panicked")??;
assert_eq!(outcome.delivered_to(), &ConsumerId::new("solo"));
dispatch.supervisor().shutdown();
Ok(())
}
#[test]
fn consumer_crash_reroutes_to_backup_with_real_sub_millisecond_latency()
-> Result<(), Box<dyn Error>> {
let dispatch = DispatchConversation::new(ConsumerGroup::new(select_first()))?;
let scheduler = dispatch.supervisor().scheduler();
let primary = ParticipantPid::new(scheduler.spawn_test_process(false));
let backup = ParticipantPid::new(scheduler.spawn_test_process(false));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
primary,
ConsumerStateView::new(ConsumerId::new("a-primary"), 0, 4, 0, Vec::new()),
));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
backup,
ConsumerStateView::new(ConsumerId::new("b-backup"), 0, 4, 0, Vec::new()),
));
let crasher = std::thread::spawn(move || {
let _ = wait_for_link(&scheduler, primary.get());
scheduler.terminate_process(primary.get(), ExitReason::Error);
});
let outcome = dispatch.dispatch(&RoutingMessage::new())?;
crasher.join().map_err(|_| "crasher thread panicked")?;
assert_eq!(outcome.delivered_to(), &ConsumerId::new("b-backup"));
assert_eq!(outcome.rerouted_from(), &[ConsumerId::new("a-primary")]);
let timings = outcome.reroute_timings();
assert_eq!(timings.len(), 1, "exactly one reroute timing expected");
let latency = timings[0].detection_to_reroute();
assert!(
latency < Duration::from_millis(1),
"crash-to-reroute latency {latency:?} exceeded one millisecond"
);
assert!(
timings[0].crash_observed() <= std::time::Instant::now(),
"crash instant must lie in the past"
);
dispatch.supervisor().shutdown();
Ok(())
}
#[test]
fn reroute_timing_spans_real_exit_to_reroute_via_event_path() -> Result<(), Box<dyn Error>> {
let dispatch = DispatchConversation::new(ConsumerGroup::new(select_first()))?;
let scheduler = dispatch.supervisor().scheduler();
let primary = ParticipantPid::new(scheduler.spawn_test_process(false));
let backup = ParticipantPid::new(scheduler.spawn_test_process(false));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
primary,
ConsumerStateView::new(ConsumerId::new("a-primary"), 0, 4, 0, Vec::new()),
));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
backup,
ConsumerStateView::new(ConsumerId::new("b-backup"), 0, 4, 0, Vec::new()),
));
let before_dispatch = std::time::Instant::now();
let crasher = std::thread::spawn(move || {
let _ = wait_for_link(&scheduler, primary.get());
scheduler.terminate_process(primary.get(), ExitReason::Error);
});
let outcome = dispatch.dispatch(&RoutingMessage::new())?;
crasher.join().map_err(|_| "crasher thread panicked")?;
let timing = outcome
.reroute_timings()
.first()
.ok_or("a reroute timing must be recorded after a crash")?;
assert!(
timing.crash_observed() >= before_dispatch,
"EXIT instant must fall after dispatch started"
);
assert!(timing.detection_to_reroute() < Duration::from_millis(1));
dispatch.supervisor().shutdown();
Ok(())
}
#[test]
fn no_remaining_consumer_after_crash_is_an_error() -> Result<(), Box<dyn Error>> {
let dispatch = DispatchConversation::new(ConsumerGroup::new(select_first()))?;
let scheduler = dispatch.supervisor().scheduler();
let only = ParticipantPid::new(scheduler.spawn_test_process(false));
let _ = dispatch.group().add_consumer(ConsumerRegistration::new(
only,
ConsumerStateView::new(ConsumerId::new("only"), 0, 4, 0, Vec::new()),
));
let crasher = std::thread::spawn(move || {
let _ = wait_for_link(&scheduler, only.get());
scheduler.terminate_process(only.get(), ExitReason::Error);
});
let result = dispatch.dispatch(&RoutingMessage::new());
crasher.join().map_err(|_| "crasher thread panicked")?;
assert_eq!(result, Err(DispatchError::NoConsumerAvailable));
dispatch.supervisor().shutdown();
Ok(())
}
#[test]
fn empty_group_yields_no_consumer_available() -> Result<(), Box<dyn Error>> {
let dispatch = DispatchConversation::new(ConsumerGroup::new(select_first()))?;
let result = dispatch.dispatch(&RoutingMessage::new());
assert_eq!(result, Err(DispatchError::NoConsumerAvailable));
dispatch.supervisor().shutdown();
Ok(())
}
#[test]
fn registration_helper_constructs_consumer_state() {
let registration = registration("helper", 1);
assert_eq!(registration.consumer(), &ConsumerId::new("helper"));
}