use std::sync::mpsc;
use std::time::{Duration, Instant};
use crate::channel::ChannelMode;
use crate::conversation::{ConversationConfig, ConversationSupervisor, CrashPolicy};
use crate::error::LiminalError;
use crate::routing::group::{ConsumerGroup, ConsumerRegistration};
use crate::routing::{
ConsumerId, ConsumerStateView, FieldValue, FunctionError, RoutingDecision, RoutingMessage,
SupervisedExecutor,
};
/// How long `observe_attempt` waits for a participant-exit notification.
/// If nothing arrives within this window the attempt is reported as delivered.
const HANDOFF_CONFIRMATION_WINDOW: Duration = Duration::from_millis(250);
/// Summary of a dispatch that ended with a delivered attempt.
///
/// Built by `DispatchConversation::dispatch` once an attempt finishes without
/// an exit notification for the selected consumer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DispatchOutcome {
/// Consumer selected for the attempt that was reported as delivered.
delivered_to: ConsumerId,
/// Consumers whose attempts were reported as crashed, in the order they were tried.
rerouted_from: Vec<ConsumerId>,
/// One timing record per crashed attempt, pushed in the same order as `rerouted_from`.
reroute_timings: Vec<RerouteTiming>,
}
impl DispatchOutcome {
    /// Returns the consumer recorded as the final delivery target.
    #[must_use]
    pub const fn delivered_to(&self) -> &ConsumerId {
        &self.delivered_to
    }

    /// Returns the consumers that were tried before the final target and
    /// abandoned, in the order they were recorded.
    #[must_use]
    pub fn rerouted_from(&self) -> &[ConsumerId] {
        self.rerouted_from.as_slice()
    }

    /// Returns the timing records captured for the abandoned attempts.
    #[must_use]
    pub fn reroute_timings(&self) -> &[RerouteTiming] {
        self.reroute_timings.as_slice()
    }

    /// Returns `true` when no consumer had to be abandoned, i.e. the list of
    /// rerouted consumers is empty.
    #[must_use]
    pub fn delivered_first_try(&self) -> bool {
        self.rerouted_from().is_empty()
    }
}
/// Failures that can end a dispatch before any attempt is reported as delivered.
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum DispatchError {
/// No non-excluded consumer is left in the group snapshot, or the routing
/// decision carried no selection.
#[error("routing function selected no consumer from the available group set")]
NoConsumerAvailable,
/// The routing decision named a consumer id that is not among the candidates
/// offered to it; the payload is that id as text.
#[error("routing function selected unknown consumer '{0}'")]
UnknownConsumerSelected(String),
/// Executing the routing function returned a `FunctionError`.
#[error("routing function evaluation failed: {0}")]
Evaluation(#[from] FunctionError),
/// A conversation-level failure, carried as text: a stringified
/// `LiminalError`, a message-encoding failure, or a disconnected exit notifier.
#[error("dispatch conversation failed: {0}")]
Conversation(String),
}
/// Lets `?` turn a [`LiminalError`] into a [`DispatchError`].
///
/// Only the error's `Display` text is kept; it is stored in
/// [`DispatchError::Conversation`].
impl From<LiminalError> for DispatchError {
    fn from(source: LiminalError) -> Self {
        let description = source.to_string();
        Self::Conversation(description)
    }
}
/// Pair of instants recorded for one crashed delivery attempt.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RerouteTiming {
    /// Instant delivered through the exit-notification channel.
    crash_observed: Instant,
    /// Instant taken by the dispatcher right after it received the notification.
    reroute_initiated: Instant,
}

impl RerouteTiming {
    /// Returns the instant at which the crash was observed.
    #[must_use]
    pub const fn crash_observed(&self) -> Instant {
        self.crash_observed
    }

    /// Returns the time elapsed between observing the crash and starting the
    /// reroute.
    ///
    /// Yields a zero duration if the reroute instant is earlier than the
    /// crash instant.
    #[must_use]
    pub fn detection_to_reroute(&self) -> Duration {
        let Self {
            crash_observed,
            reroute_initiated,
        } = *self;
        reroute_initiated
            .checked_duration_since(crash_observed)
            .unwrap_or(Duration::ZERO)
    }
}
/// Dispatches routing messages to one consumer of a group, re-selecting when
/// the chosen consumer's exit is reported during the hand-off window.
#[derive(Clone, Debug)]
pub struct DispatchConversation {
/// Group whose snapshot supplies the candidate consumers and the routing function.
group: ConsumerGroup,
/// Executor used to run the group's routing function.
executor: SupervisedExecutor,
/// Supervisor used to spawn one conversation per delivery attempt.
supervisor: ConversationSupervisor,
}
impl DispatchConversation {
pub fn new(group: ConsumerGroup) -> Result<Self, DispatchError> {
let supervisor = ConversationSupervisor::new()?;
let executor = SupervisedExecutor::with_default_timeout(supervisor.scheduler());
Ok(Self {
group,
executor,
supervisor,
})
}
#[must_use]
pub const fn with_supervisor(
group: ConsumerGroup,
executor: SupervisedExecutor,
supervisor: ConversationSupervisor,
) -> Self {
Self {
group,
executor,
supervisor,
}
}
#[must_use]
pub const fn group(&self) -> &ConsumerGroup {
&self.group
}
#[must_use]
pub const fn supervisor(&self) -> &ConversationSupervisor {
&self.supervisor
}
pub fn dispatch(&self, message: &RoutingMessage) -> Result<DispatchOutcome, DispatchError> {
let mut excluded: Vec<ConsumerId> = Vec::new();
let mut reroute_timings: Vec<RerouteTiming> = Vec::new();
loop {
let selected = self.select_consumer(message, &excluded)?;
match self.run_attempt(message, &selected)? {
AttemptResult::Delivered => {
return Ok(DispatchOutcome {
delivered_to: selected.consumer().clone(),
rerouted_from: excluded,
reroute_timings,
});
}
AttemptResult::Crashed(timing) => {
debug_assert!(
timing.detection_to_reroute() < Duration::from_millis(1),
"crash-to-reroute exceeded one millisecond"
);
reroute_timings.push(timing);
excluded.push(selected.consumer().clone());
}
}
}
}
fn select_consumer(
&self,
message: &RoutingMessage,
excluded: &[ConsumerId],
) -> Result<ConsumerRegistration, DispatchError> {
let snapshot = self.group.snapshot();
let available: Vec<&ConsumerRegistration> = snapshot
.consumers()
.iter()
.filter(|registration| !excluded.contains(registration.consumer()))
.collect();
if available.is_empty() {
return Err(DispatchError::NoConsumerAvailable);
}
let state_views: Vec<ConsumerStateView> = available
.iter()
.map(|registration| registration.state().clone())
.collect();
let decision: RoutingDecision =
self.executor
.execute(snapshot.routing_function(), message.clone(), state_views)?;
let Some(selected_id) = decision.selected() else {
return Err(DispatchError::NoConsumerAvailable);
};
available
.into_iter()
.find(|registration| registration.consumer() == selected_id)
.cloned()
.ok_or_else(|| DispatchError::UnknownConsumerSelected(selected_id.as_str().to_owned()))
}
fn run_attempt(
&self,
message: &RoutingMessage,
selected: &ConsumerRegistration,
) -> Result<AttemptResult, DispatchError> {
let consumer_pid = selected.participant();
let actor = self.supervisor.spawn(ConversationConfig::new(
vec![consumer_pid],
None,
ChannelMode::Ephemeral,
CrashPolicy::RouteToNext,
))?;
actor.pid()?;
let (exit_tx, exit_rx) = mpsc::sync_channel::<Instant>(1);
actor.notify_on_participant_exit(consumer_pid, exit_tx)?;
let handle = actor.handle();
handle.send(dispatch_envelope(message)?)?;
observe_attempt(&exit_rx)
}
}
/// Waits up to `HANDOFF_CONFIRMATION_WINDOW` for an exit notification and
/// classifies the attempt.
///
/// * A received instant means the participant exited: the attempt is reported
///   as crashed, stamped with the received instant and the current time.
/// * A timeout means no exit was reported: the attempt counts as delivered.
///
/// # Errors
///
/// Returns [`DispatchError::Conversation`] if the sending side of the channel
/// is gone before the window elapses.
fn observe_attempt(exit_rx: &mpsc::Receiver<Instant>) -> Result<AttemptResult, DispatchError> {
    let crash_observed = match exit_rx.recv_timeout(HANDOFF_CONFIRMATION_WINDOW) {
        Ok(instant) => instant,
        Err(mpsc::RecvTimeoutError::Timeout) => return Ok(AttemptResult::Delivered),
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            return Err(DispatchError::Conversation(
                "dispatch exit notifier disconnected before the hand-off window elapsed".to_owned(),
            ));
        }
    };
    // Take the reroute timestamp immediately after the notification arrives.
    let reroute_initiated = Instant::now();
    let timing = RerouteTiming {
        crash_observed,
        reroute_initiated,
    };
    Ok(AttemptResult::Crashed(timing))
}
/// Outcome of a single delivery attempt, as classified by `observe_attempt`.
#[derive(Debug)]
enum AttemptResult {
/// No exit notification arrived within the hand-off window.
Delivered,
/// An exit notification arrived; carries the recorded timing.
Crashed(RerouteTiming),
}
/// Wraps the JSON-encoded `message` in an envelope.
///
/// The envelope is built from the encoded payload, `None` for the second
/// constructor argument, a new `SchemaId`, and the default `PublisherId`.
///
/// # Errors
///
/// Returns the error from `encode_message` if the message cannot be encoded.
fn dispatch_envelope(message: &RoutingMessage) -> Result<crate::envelope::Envelope, DispatchError> {
    encode_message(message).map(|payload| {
        crate::envelope::Envelope::new(
            payload,
            None,
            crate::channel::SchemaId::new(),
            crate::envelope::PublisherId::default(),
        )
    })
}
fn encode_message(message: &RoutingMessage) -> Result<Vec<u8>, DispatchError> {
let map: serde_json::Map<String, serde_json::Value> = message
.fields()
.map(|(name, value)| (name.to_owned(), field_to_json(value)))
.collect();
serde_json::to_vec(&serde_json::Value::Object(map)).map_err(|error| {
DispatchError::Conversation(format!("failed to encode dispatched message: {error}"))
})
}
/// Converts one routing field value into its JSON counterpart.
///
/// Text, integers, booleans and null map to the matching JSON kinds. A float
/// becomes a JSON number when `serde_json::Number::from_f64` accepts it and
/// JSON null otherwise.
fn field_to_json(value: &FieldValue) -> serde_json::Value {
    type Json = serde_json::Value;

    match value {
        FieldValue::Null => Json::Null,
        FieldValue::Boolean(flag) => Json::Bool(*flag),
        FieldValue::Integer(number) => Json::from(*number),
        FieldValue::Float(number) => match serde_json::Number::from_f64(*number) {
            Some(finite) => Json::Number(finite),
            // No JSON number exists for this float; encode it as null.
            None => Json::Null,
        },
        FieldValue::Text(text) => Json::String(text.clone()),
    }
}
#[cfg(test)]
mod tests;