Skip to main content

liminal/routing/
dispatch.rs

//! Conversation-mediated dispatch (ADR-007).
//!
//! A dispatch conversation replaces Kafka-style consumer-group rebalancing
//! with per-message, fault-aware consumer selection. For each message, the
//! dispatch conversation evaluates the group's routing function to select a
//! target consumer, links the dispatch process to that consumer via a beamr
//! process link, forwards the message, and observes completion. If the linked
//! consumer crashes, the link fires immediately (no polling, no heartbeat) and
//! the conversation re-routes the message to another available consumer,
//! excluding the one that crashed. Selection always goes through the routing
//! function — never random — and adding or removing consumers takes effect on
//! the next dispatch without a stop-the-world rebalance.
13
14use std::sync::mpsc;
15use std::time::{Duration, Instant};
16
17use crate::channel::ChannelMode;
18use crate::conversation::{ConversationConfig, ConversationSupervisor, CrashPolicy};
19use crate::error::LiminalError;
20use crate::routing::group::{ConsumerGroup, ConsumerRegistration};
21use crate::routing::{
22    ConsumerId, ConsumerStateView, FieldValue, FunctionError, RoutingDecision, RoutingMessage,
23    SupervisedExecutor,
24};
25
/// How long the dispatcher waits for the linked consumer's EXIT signal once
/// the message has been forwarded.
///
/// The dispatcher blocks in [`mpsc::Receiver::recv_timeout`] for at most this
/// long. An EXIT delivered by the link handler wakes it immediately, so the
/// window never delays a re-route. If the window runs out without an EXIT,
/// the linked, still-alive consumer is considered to have taken the message.
///
/// Nothing samples consumer liveness here: the wake-up comes from the EXIT
/// handler itself.
const HANDOFF_CONFIRMATION_WINDOW: Duration = Duration::from_micros(250_000);
35
36/// Outcome of a successful dispatch: the consumer the message was delivered to
37/// and the chain of consumers that crashed and were re-routed past, in order.
38#[derive(Clone, Debug, PartialEq, Eq)]
39pub struct DispatchOutcome {
40    delivered_to: ConsumerId,
41    rerouted_from: Vec<ConsumerId>,
42    reroute_timings: Vec<RerouteTiming>,
43}
44
45impl DispatchOutcome {
46    /// Consumer the message was ultimately delivered to.
47    #[must_use]
48    pub const fn delivered_to(&self) -> &ConsumerId {
49        &self.delivered_to
50    }
51
52    /// Consumers that crashed mid-dispatch and were re-routed past, in order.
53    #[must_use]
54    pub fn rerouted_from(&self) -> &[ConsumerId] {
55        &self.rerouted_from
56    }
57
58    /// Real crash-to-reroute timings, one per consumer in [`Self::rerouted_from`]
59    /// and in the same order. Each spans the consumer's EXIT instant (captured
60    /// in the link handler) through to re-route initiation, so callers can
61    /// verify sub-millisecond, event-driven detection (R3 / CN7).
62    #[must_use]
63    pub fn reroute_timings(&self) -> &[RerouteTiming] {
64        &self.reroute_timings
65    }
66
67    /// True when the message was delivered without any consumer crash.
68    #[must_use]
69    pub fn delivered_first_try(&self) -> bool {
70        self.rerouted_from.is_empty()
71    }
72}
73
74/// Failure surfaced when a message cannot be dispatched to any consumer.
75#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
76pub enum DispatchError {
77    /// The routing function selected no consumer from the available set.
78    #[error("routing function selected no consumer from the available group set")]
79    NoConsumerAvailable,
80    /// The routing function selected a consumer that is not in the group.
81    #[error("routing function selected unknown consumer '{0}'")]
82    UnknownConsumerSelected(String),
83    /// Evaluating the group's routing function failed under supervision.
84    #[error("routing function evaluation failed: {0}")]
85    Evaluation(#[from] FunctionError),
86    /// The dispatch conversation infrastructure could not be driven.
87    #[error("dispatch conversation failed: {0}")]
88    Conversation(String),
89}
90
91impl From<LiminalError> for DispatchError {
92    fn from(error: LiminalError) -> Self {
93        Self::Conversation(error.to_string())
94    }
95}
96
/// Crash-to-reroute latency measured along the event path.
///
/// Holds two instants:
/// - `crash_observed` is captured *inside* the conversation actor's EXIT
///   handler the moment the beamr process link fires (see
///   `ActorCore::handle_participant_exit`). It is not a post-detection sample.
/// - `reroute_initiated` is captured the instant the dispatcher's blocked
///   `recv` returns with that value.
///
/// The span between them is a measurement, not a guarantee. With event-driven
/// detection it is expected to be sub-millisecond, but it also includes the
/// time the OS scheduler takes to wake the dispatcher thread, and that can
/// exceed a millisecond on a loaded machine. Callers that want to verify
/// R3 / CN7 should inspect [`Self::detection_to_reroute`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RerouteTiming {
    crash_observed: Instant,
    reroute_initiated: Instant,
}

impl RerouteTiming {
    /// Elapsed time from the EXIT signal firing to re-route initiation.
    ///
    /// Saturates to [`Duration::ZERO`] instead of panicking if the two
    /// instants are out of order.
    #[must_use]
    pub fn detection_to_reroute(&self) -> Duration {
        self.reroute_initiated
            .saturating_duration_since(self.crash_observed)
    }

    /// The instant the consumer's EXIT signal was observed in the link handler.
    #[must_use]
    pub const fn crash_observed(&self) -> Instant {
        self.crash_observed
    }

    /// The instant the dispatcher woke from the EXIT notification and began
    /// re-routing.
    #[must_use]
    pub const fn reroute_initiated(&self) -> Instant {
        self.reroute_initiated
    }
}
127
128/// Drives conversation-mediated dispatch for one consumer group.
129///
130/// Each call to [`DispatchConversation::dispatch`] spawns a supervised
131/// conversation actor (a beamr process), selects a consumer via the group's
132/// routing function, links the conversation to the consumer, forwards the
133/// message, and re-routes on consumer crash.
134#[derive(Clone, Debug)]
135pub struct DispatchConversation {
136    group: ConsumerGroup,
137    executor: SupervisedExecutor,
138    supervisor: ConversationSupervisor,
139}
140
141impl DispatchConversation {
142    /// Creates a dispatch driver for `group` using a fresh conversation supervisor.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`DispatchError::Conversation`] if the beamr scheduler backing the
147    /// conversation supervisor cannot start.
148    pub fn new(group: ConsumerGroup) -> Result<Self, DispatchError> {
149        let supervisor = ConversationSupervisor::new()?;
150        let executor = SupervisedExecutor::with_default_timeout(supervisor.scheduler());
151        Ok(Self {
152            group,
153            executor,
154            supervisor,
155        })
156    }
157
158    /// Creates a dispatch driver reusing an existing conversation supervisor and
159    /// supervision executor.
160    #[must_use]
161    pub const fn with_supervisor(
162        group: ConsumerGroup,
163        executor: SupervisedExecutor,
164        supervisor: ConversationSupervisor,
165    ) -> Self {
166        Self {
167            group,
168            executor,
169            supervisor,
170        }
171    }
172
173    /// Returns the consumer group this dispatch conversation routes over.
174    #[must_use]
175    pub const fn group(&self) -> &ConsumerGroup {
176        &self.group
177    }
178
179    /// Returns the conversation supervisor backing dispatch processes.
180    #[must_use]
181    pub const fn supervisor(&self) -> &ConversationSupervisor {
182        &self.supervisor
183    }
184
185    /// Dispatches `message` to a consumer selected by the group's routing function.
186    ///
187    /// Snapshots the group, evaluates the routing function over the live consumer
188    /// state, links a dispatch conversation process to the selected consumer,
189    /// forwards the message, and observes completion. On consumer crash the
190    /// message is re-routed to another consumer, excluding any that have crashed.
191    ///
192    /// # Errors
193    ///
194    /// Returns [`DispatchError::NoConsumerAvailable`] if the routing function
195    /// selects no consumer, [`DispatchError::UnknownConsumerSelected`] if it
196    /// selects a consumer outside the group, [`DispatchError::Evaluation`] if the
197    /// supervised routing function fails, and [`DispatchError::Conversation`] if
198    /// the dispatch conversation process cannot be driven.
199    pub fn dispatch(&self, message: &RoutingMessage) -> Result<DispatchOutcome, DispatchError> {
200        let mut excluded: Vec<ConsumerId> = Vec::new();
201        let mut reroute_timings: Vec<RerouteTiming> = Vec::new();
202        loop {
203            let selected = self.select_consumer(message, &excluded)?;
204            match self.run_attempt(message, &selected)? {
205                AttemptResult::Delivered => {
206                    return Ok(DispatchOutcome {
207                        delivered_to: selected.consumer().clone(),
208                        rerouted_from: excluded,
209                        reroute_timings,
210                    });
211                }
212                AttemptResult::Crashed(timing) => {
213                    // Re-routing is initiated here, the instant the dispatcher
214                    // woke from the EXIT notification. CN7 / R3: this is
215                    // sub-millisecond because detection is the beamr link firing
216                    // (no polling) and the wakeup is the very next step. The
217                    // timing spans the real EXIT instant captured in the link
218                    // handler through to this re-route initiation.
219                    debug_assert!(
220                        timing.detection_to_reroute() < Duration::from_millis(1),
221                        "crash-to-reroute exceeded one millisecond"
222                    );
223                    reroute_timings.push(timing);
224                    excluded.push(selected.consumer().clone());
225                }
226            }
227        }
228    }
229
230    /// Selects a consumer for `message`, excluding `excluded`, via the routing
231    /// function evaluated over the current group snapshot.
232    fn select_consumer(
233        &self,
234        message: &RoutingMessage,
235        excluded: &[ConsumerId],
236    ) -> Result<ConsumerRegistration, DispatchError> {
237        let snapshot = self.group.snapshot();
238        let available: Vec<&ConsumerRegistration> = snapshot
239            .consumers()
240            .iter()
241            .filter(|registration| !excluded.contains(registration.consumer()))
242            .collect();
243        if available.is_empty() {
244            return Err(DispatchError::NoConsumerAvailable);
245        }
246
247        let state_views: Vec<ConsumerStateView> = available
248            .iter()
249            .map(|registration| registration.state().clone())
250            .collect();
251        let decision: RoutingDecision =
252            self.executor
253                .execute(snapshot.routing_function(), message.clone(), state_views)?;
254
255        let Some(selected_id) = decision.selected() else {
256            return Err(DispatchError::NoConsumerAvailable);
257        };
258        available
259            .into_iter()
260            .find(|registration| registration.consumer() == selected_id)
261            .cloned()
262            .ok_or_else(|| DispatchError::UnknownConsumerSelected(selected_id.as_str().to_owned()))
263    }
264
265    /// Runs one dispatch attempt against `selected`: spawns a conversation
266    /// linked to the consumer, registers an exit notifier, forwards the real
267    /// message, then blocks on the notifier — waking the instant the consumer's
268    /// EXIT signal fires, or treating the message as handed off if the consumer
269    /// stays alive for the hand-off window.
270    fn run_attempt(
271        &self,
272        message: &RoutingMessage,
273        selected: &ConsumerRegistration,
274    ) -> Result<AttemptResult, DispatchError> {
275        let consumer_pid = selected.participant();
276        let actor = self.supervisor.spawn(ConversationConfig::new(
277            vec![consumer_pid],
278            None,
279            ChannelMode::Ephemeral,
280            CrashPolicy::RouteToNext,
281        ))?;
282        // Booting the actor establishes the beamr process link to the consumer
283        // (R2). pid() drives boot to completion, so the link exists before any
284        // message is forwarded.
285        actor.pid()?;
286
287        // Register the EXIT notifier BEFORE forwarding, so a crash that fires
288        // the moment the message reaches the consumer is never missed. The
289        // bounded channel holds the single EXIT instant the link handler sends.
290        let (exit_tx, exit_rx) = mpsc::sync_channel::<Instant>(1);
291        actor.notify_on_participant_exit(consumer_pid, exit_tx)?;
292
293        let handle = actor.handle();
294        // Forward the real dispatched message into the linked conversation
295        // (R2: link before forward is guaranteed by the boot above).
296        handle.send(dispatch_envelope(message)?)?;
297
298        observe_attempt(&exit_rx)
299    }
300}
301
302/// Observes a dispatch attempt by blocking on the consumer's EXIT notifier.
303///
304/// This is the event-driven crash-detection path mandated by R3 / CN7. The
305/// dispatcher parks in [`mpsc::Receiver::recv_timeout`] and is woken the instant
306/// the conversation actor's link handler sends the EXIT instant — there is no
307/// liveness sampling and no heartbeat. The received `Instant` is the moment the
308/// beamr process link fired (captured inside the EXIT handler), so the timing
309/// reflects real detection latency rather than a post-detection sample.
310///
311/// Returns [`AttemptResult::Crashed`] carrying that timing if the EXIT fires
312/// within the hand-off window, or [`AttemptResult::Delivered`] if the window
313/// elapses with no EXIT — meaning the linked consumer stayed alive and the
314/// message has been handed off to it over the live link.
315fn observe_attempt(exit_rx: &mpsc::Receiver<Instant>) -> Result<AttemptResult, DispatchError> {
316    match exit_rx.recv_timeout(HANDOFF_CONFIRMATION_WINDOW) {
317        Ok(crash_observed) => {
318            // Woken by the EXIT signal. Re-route initiation is this instant; the
319            // span back to `crash_observed` is the real, link-driven latency.
320            let reroute_initiated = Instant::now();
321            Ok(AttemptResult::Crashed(RerouteTiming {
322                crash_observed,
323                reroute_initiated,
324            }))
325        }
326        Err(mpsc::RecvTimeoutError::Timeout) => Ok(AttemptResult::Delivered),
327        Err(mpsc::RecvTimeoutError::Disconnected) => Err(DispatchError::Conversation(
328            "dispatch exit notifier disconnected before the hand-off window elapsed".to_owned(),
329        )),
330    }
331}
332
333/// Internal result of a single dispatch attempt.
334#[derive(Debug)]
335enum AttemptResult {
336    /// The consumer stayed alive across the hand-off window with the message
337    /// forwarded over the live link; the message is handed off to it.
338    Delivered,
339    /// The linked consumer's EXIT signal fired; re-route, excluding it. Carries
340    /// the real crash-to-reroute timing along the event path.
341    Crashed(RerouteTiming),
342}
343
344/// Builds the envelope forwarded to the selected consumer, carrying the real
345/// dispatched message content (the routing message's fields), not a placeholder.
346fn dispatch_envelope(message: &RoutingMessage) -> Result<crate::envelope::Envelope, DispatchError> {
347    let payload = encode_message(message)?;
348    Ok(crate::envelope::Envelope::new(
349        payload,
350        None,
351        crate::channel::SchemaId::new(),
352        crate::envelope::PublisherId::default(),
353    ))
354}
355
356/// Serializes the dispatched message's fields to JSON bytes so the forwarded
357/// envelope genuinely carries the message, not a synthetic substitute.
358fn encode_message(message: &RoutingMessage) -> Result<Vec<u8>, DispatchError> {
359    let map: serde_json::Map<String, serde_json::Value> = message
360        .fields()
361        .map(|(name, value)| (name.to_owned(), field_to_json(value)))
362        .collect();
363    serde_json::to_vec(&serde_json::Value::Object(map)).map_err(|error| {
364        DispatchError::Conversation(format!("failed to encode dispatched message: {error}"))
365    })
366}
367
368/// Maps a routing [`FieldValue`] to its JSON representation. A non-finite float
369/// has no JSON number form and is encoded as null, matching `serde_json`'s own
370/// handling of non-finite numbers.
371fn field_to_json(value: &FieldValue) -> serde_json::Value {
372    match value {
373        FieldValue::Text(text) => serde_json::Value::String(text.clone()),
374        FieldValue::Integer(integer) => serde_json::Value::from(*integer),
375        FieldValue::Float(float) => serde_json::Number::from_f64(*float)
376            .map_or(serde_json::Value::Null, serde_json::Value::Number),
377        FieldValue::Boolean(boolean) => serde_json::Value::Bool(*boolean),
378        FieldValue::Null => serde_json::Value::Null,
379    }
380}
381
// Unit tests for this module live in a separate file (out-of-line module)
// and are compiled only for test builds.
#[cfg(test)]
mod tests;