liminal/routing/dispatch.rs
1//! Conversation-mediated dispatch (ADR-007).
2//!
3//! A dispatch conversation replaces Kafka-style consumer group rebalancing with
4//! per-message, fault-aware consumer selection. For each message the dispatch
5//! conversation evaluates the group's routing function to select a target
6//! consumer, links the dispatch process to that consumer via a beamr process
7//! link, forwards the message, and observes completion. If the linked consumer
8//! crashes, the link fires immediately (no polling, no heartbeat) and the
9//! conversation re-routes the message to another available consumer, excluding
10//! the one that crashed. Selection is always via the routing function — never
11//! random — and adding or removing consumers takes effect on the next dispatch
12//! without a stop-the-world rebalance.
13
14use std::sync::mpsc;
15use std::time::{Duration, Instant};
16
17use crate::channel::ChannelMode;
18use crate::conversation::{ConversationConfig, ConversationSupervisor, CrashPolicy};
19use crate::error::LiminalError;
20use crate::routing::group::{ConsumerGroup, ConsumerRegistration};
21use crate::routing::{
22 ConsumerId, ConsumerStateView, FieldValue, FunctionError, RoutingDecision, RoutingMessage,
23 SupervisedExecutor,
24};
25
/// How long the dispatcher waits for the linked consumer's EXIT signal after
/// forwarding a message, before treating the message as handed off.
///
/// The wait never slows re-routing down: the dispatcher parks in
/// [`mpsc::Receiver::recv_timeout`] and is woken by the EXIT handler the moment
/// the link fires. Only when this whole window passes without an EXIT is the
/// still-alive, linked consumer considered to have taken the message. No
/// consumer liveness is sampled while waiting.
const HANDOFF_CONFIRMATION_WINDOW: Duration = Duration::from_millis(250);
35
36/// Outcome of a successful dispatch: the consumer the message was delivered to
37/// and the chain of consumers that crashed and were re-routed past, in order.
38#[derive(Clone, Debug, PartialEq, Eq)]
39pub struct DispatchOutcome {
40 delivered_to: ConsumerId,
41 rerouted_from: Vec<ConsumerId>,
42 reroute_timings: Vec<RerouteTiming>,
43}
44
45impl DispatchOutcome {
46 /// Consumer the message was ultimately delivered to.
47 #[must_use]
48 pub const fn delivered_to(&self) -> &ConsumerId {
49 &self.delivered_to
50 }
51
52 /// Consumers that crashed mid-dispatch and were re-routed past, in order.
53 #[must_use]
54 pub fn rerouted_from(&self) -> &[ConsumerId] {
55 &self.rerouted_from
56 }
57
58 /// Real crash-to-reroute timings, one per consumer in [`Self::rerouted_from`]
59 /// and in the same order. Each spans the consumer's EXIT instant (captured
60 /// in the link handler) through to re-route initiation, so callers can
61 /// verify sub-millisecond, event-driven detection (R3 / CN7).
62 #[must_use]
63 pub fn reroute_timings(&self) -> &[RerouteTiming] {
64 &self.reroute_timings
65 }
66
67 /// True when the message was delivered without any consumer crash.
68 #[must_use]
69 pub fn delivered_first_try(&self) -> bool {
70 self.rerouted_from.is_empty()
71 }
72}
73
74/// Failure surfaced when a message cannot be dispatched to any consumer.
75#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
76pub enum DispatchError {
77 /// The routing function selected no consumer from the available set.
78 #[error("routing function selected no consumer from the available group set")]
79 NoConsumerAvailable,
80 /// The routing function selected a consumer that is not in the group.
81 #[error("routing function selected unknown consumer '{0}'")]
82 UnknownConsumerSelected(String),
83 /// Evaluating the group's routing function failed under supervision.
84 #[error("routing function evaluation failed: {0}")]
85 Evaluation(#[from] FunctionError),
86 /// The dispatch conversation infrastructure could not be driven.
87 #[error("dispatch conversation failed: {0}")]
88 Conversation(String),
89}
90
91impl From<LiminalError> for DispatchError {
92 fn from(error: LiminalError) -> Self {
93 Self::Conversation(error.to_string())
94 }
95}
96
/// Crash-to-reroute latency measured along the event path.
///
/// Holds two instants:
/// - `crash_observed`: when the consumer's trapped EXIT signal was observed
///   inside the conversation actor's link handler.
/// - `reroute_initiated`: when the dispatcher woke from its blocked
///   `recv_timeout` with that value and began re-routing.
///
/// The target for the span between them is sub-millisecond (R3 / CN7). The span
/// includes an OS thread wakeup, so it is measured rather than guaranteed.
/// Check it with [`RerouteTiming::detection_to_reroute`].
///
/// `crash_observed` is captured *inside* the EXIT handler when the beamr process
/// link fires (see `ActorCore::handle_participant_exit`). It is not a
/// post-detection sample.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RerouteTiming {
    /// Instant the EXIT signal was observed in the link handler.
    crash_observed: Instant,
    /// Instant the dispatcher woke from the EXIT notification and re-routed.
    reroute_initiated: Instant,
}

impl RerouteTiming {
    /// Elapsed time between the EXIT signal firing and re-route initiation.
    ///
    /// Saturates to [`Duration::ZERO`] if `reroute_initiated` would precede
    /// `crash_observed`.
    #[must_use]
    pub fn detection_to_reroute(&self) -> Duration {
        self.reroute_initiated
            .saturating_duration_since(self.crash_observed)
    }

    /// The instant the consumer's EXIT signal was observed in the link handler.
    #[must_use]
    pub const fn crash_observed(&self) -> Instant {
        self.crash_observed
    }

    /// The instant the dispatcher woke from the EXIT notification and initiated
    /// re-routing. This is the other endpoint of
    /// [`RerouteTiming::detection_to_reroute`].
    #[must_use]
    pub const fn reroute_initiated(&self) -> Instant {
        self.reroute_initiated
    }
}
127
128/// Drives conversation-mediated dispatch for one consumer group.
129///
130/// Each call to [`DispatchConversation::dispatch`] spawns a supervised
131/// conversation actor (a beamr process), selects a consumer via the group's
132/// routing function, links the conversation to the consumer, forwards the
133/// message, and re-routes on consumer crash.
134#[derive(Clone, Debug)]
135pub struct DispatchConversation {
136 group: ConsumerGroup,
137 executor: SupervisedExecutor,
138 supervisor: ConversationSupervisor,
139}
140
141impl DispatchConversation {
142 /// Creates a dispatch driver for `group` using a fresh conversation supervisor.
143 ///
144 /// # Errors
145 ///
146 /// Returns [`DispatchError::Conversation`] if the beamr scheduler backing the
147 /// conversation supervisor cannot start.
148 pub fn new(group: ConsumerGroup) -> Result<Self, DispatchError> {
149 let supervisor = ConversationSupervisor::new()?;
150 let executor = SupervisedExecutor::with_default_timeout(supervisor.scheduler());
151 Ok(Self {
152 group,
153 executor,
154 supervisor,
155 })
156 }
157
158 /// Creates a dispatch driver reusing an existing conversation supervisor and
159 /// supervision executor.
160 #[must_use]
161 pub const fn with_supervisor(
162 group: ConsumerGroup,
163 executor: SupervisedExecutor,
164 supervisor: ConversationSupervisor,
165 ) -> Self {
166 Self {
167 group,
168 executor,
169 supervisor,
170 }
171 }
172
173 /// Returns the consumer group this dispatch conversation routes over.
174 #[must_use]
175 pub const fn group(&self) -> &ConsumerGroup {
176 &self.group
177 }
178
179 /// Returns the conversation supervisor backing dispatch processes.
180 #[must_use]
181 pub const fn supervisor(&self) -> &ConversationSupervisor {
182 &self.supervisor
183 }
184
185 /// Dispatches `message` to a consumer selected by the group's routing function.
186 ///
187 /// Snapshots the group, evaluates the routing function over the live consumer
188 /// state, links a dispatch conversation process to the selected consumer,
189 /// forwards the message, and observes completion. On consumer crash the
190 /// message is re-routed to another consumer, excluding any that have crashed.
191 ///
192 /// # Errors
193 ///
194 /// Returns [`DispatchError::NoConsumerAvailable`] if the routing function
195 /// selects no consumer, [`DispatchError::UnknownConsumerSelected`] if it
196 /// selects a consumer outside the group, [`DispatchError::Evaluation`] if the
197 /// supervised routing function fails, and [`DispatchError::Conversation`] if
198 /// the dispatch conversation process cannot be driven.
199 pub fn dispatch(&self, message: &RoutingMessage) -> Result<DispatchOutcome, DispatchError> {
200 let mut excluded: Vec<ConsumerId> = Vec::new();
201 let mut reroute_timings: Vec<RerouteTiming> = Vec::new();
202 loop {
203 let selected = self.select_consumer(message, &excluded)?;
204 match self.run_attempt(message, &selected)? {
205 AttemptResult::Delivered => {
206 return Ok(DispatchOutcome {
207 delivered_to: selected.consumer().clone(),
208 rerouted_from: excluded,
209 reroute_timings,
210 });
211 }
212 AttemptResult::Crashed(timing) => {
213 // Re-routing is initiated here, the instant the dispatcher
214 // woke from the EXIT notification. CN7 / R3: this is
215 // sub-millisecond because detection is the beamr link firing
216 // (no polling) and the wakeup is the very next step. The
217 // timing spans the real EXIT instant captured in the link
218 // handler through to this re-route initiation.
219 debug_assert!(
220 timing.detection_to_reroute() < Duration::from_millis(1),
221 "crash-to-reroute exceeded one millisecond"
222 );
223 reroute_timings.push(timing);
224 excluded.push(selected.consumer().clone());
225 }
226 }
227 }
228 }
229
230 /// Selects a consumer for `message`, excluding `excluded`, via the routing
231 /// function evaluated over the current group snapshot.
232 fn select_consumer(
233 &self,
234 message: &RoutingMessage,
235 excluded: &[ConsumerId],
236 ) -> Result<ConsumerRegistration, DispatchError> {
237 let snapshot = self.group.snapshot();
238 let available: Vec<&ConsumerRegistration> = snapshot
239 .consumers()
240 .iter()
241 .filter(|registration| !excluded.contains(registration.consumer()))
242 .collect();
243 if available.is_empty() {
244 return Err(DispatchError::NoConsumerAvailable);
245 }
246
247 let state_views: Vec<ConsumerStateView> = available
248 .iter()
249 .map(|registration| registration.state().clone())
250 .collect();
251 let decision: RoutingDecision =
252 self.executor
253 .execute(snapshot.routing_function(), message.clone(), state_views)?;
254
255 let Some(selected_id) = decision.selected() else {
256 return Err(DispatchError::NoConsumerAvailable);
257 };
258 available
259 .into_iter()
260 .find(|registration| registration.consumer() == selected_id)
261 .cloned()
262 .ok_or_else(|| DispatchError::UnknownConsumerSelected(selected_id.as_str().to_owned()))
263 }
264
265 /// Runs one dispatch attempt against `selected`: spawns a conversation
266 /// linked to the consumer, registers an exit notifier, forwards the real
267 /// message, then blocks on the notifier — waking the instant the consumer's
268 /// EXIT signal fires, or treating the message as handed off if the consumer
269 /// stays alive for the hand-off window.
270 fn run_attempt(
271 &self,
272 message: &RoutingMessage,
273 selected: &ConsumerRegistration,
274 ) -> Result<AttemptResult, DispatchError> {
275 let consumer_pid = selected.participant();
276 let actor = self.supervisor.spawn(ConversationConfig::new(
277 vec![consumer_pid],
278 None,
279 ChannelMode::Ephemeral,
280 CrashPolicy::RouteToNext,
281 ))?;
282 // Booting the actor establishes the beamr process link to the consumer
283 // (R2). pid() drives boot to completion, so the link exists before any
284 // message is forwarded.
285 actor.pid()?;
286
287 // Register the EXIT notifier BEFORE forwarding, so a crash that fires
288 // the moment the message reaches the consumer is never missed. The
289 // bounded channel holds the single EXIT instant the link handler sends.
290 let (exit_tx, exit_rx) = mpsc::sync_channel::<Instant>(1);
291 actor.notify_on_participant_exit(consumer_pid, exit_tx)?;
292
293 let handle = actor.handle();
294 // Forward the real dispatched message into the linked conversation
295 // (R2: link before forward is guaranteed by the boot above).
296 handle.send(dispatch_envelope(message)?)?;
297
298 observe_attempt(&exit_rx)
299 }
300}
301
302/// Observes a dispatch attempt by blocking on the consumer's EXIT notifier.
303///
304/// This is the event-driven crash-detection path mandated by R3 / CN7. The
305/// dispatcher parks in [`mpsc::Receiver::recv_timeout`] and is woken the instant
306/// the conversation actor's link handler sends the EXIT instant — there is no
307/// liveness sampling and no heartbeat. The received `Instant` is the moment the
308/// beamr process link fired (captured inside the EXIT handler), so the timing
309/// reflects real detection latency rather than a post-detection sample.
310///
311/// Returns [`AttemptResult::Crashed`] carrying that timing if the EXIT fires
312/// within the hand-off window, or [`AttemptResult::Delivered`] if the window
313/// elapses with no EXIT — meaning the linked consumer stayed alive and the
314/// message has been handed off to it over the live link.
315fn observe_attempt(exit_rx: &mpsc::Receiver<Instant>) -> Result<AttemptResult, DispatchError> {
316 match exit_rx.recv_timeout(HANDOFF_CONFIRMATION_WINDOW) {
317 Ok(crash_observed) => {
318 // Woken by the EXIT signal. Re-route initiation is this instant; the
319 // span back to `crash_observed` is the real, link-driven latency.
320 let reroute_initiated = Instant::now();
321 Ok(AttemptResult::Crashed(RerouteTiming {
322 crash_observed,
323 reroute_initiated,
324 }))
325 }
326 Err(mpsc::RecvTimeoutError::Timeout) => Ok(AttemptResult::Delivered),
327 Err(mpsc::RecvTimeoutError::Disconnected) => Err(DispatchError::Conversation(
328 "dispatch exit notifier disconnected before the hand-off window elapsed".to_owned(),
329 )),
330 }
331}
332
333/// Internal result of a single dispatch attempt.
334#[derive(Debug)]
335enum AttemptResult {
336 /// The consumer stayed alive across the hand-off window with the message
337 /// forwarded over the live link; the message is handed off to it.
338 Delivered,
339 /// The linked consumer's EXIT signal fired; re-route, excluding it. Carries
340 /// the real crash-to-reroute timing along the event path.
341 Crashed(RerouteTiming),
342}
343
344/// Builds the envelope forwarded to the selected consumer, carrying the real
345/// dispatched message content (the routing message's fields), not a placeholder.
346fn dispatch_envelope(message: &RoutingMessage) -> Result<crate::envelope::Envelope, DispatchError> {
347 let payload = encode_message(message)?;
348 Ok(crate::envelope::Envelope::new(
349 payload,
350 None,
351 crate::channel::SchemaId::new(),
352 crate::envelope::PublisherId::default(),
353 ))
354}
355
356/// Serializes the dispatched message's fields to JSON bytes so the forwarded
357/// envelope genuinely carries the message, not a synthetic substitute.
358fn encode_message(message: &RoutingMessage) -> Result<Vec<u8>, DispatchError> {
359 let map: serde_json::Map<String, serde_json::Value> = message
360 .fields()
361 .map(|(name, value)| (name.to_owned(), field_to_json(value)))
362 .collect();
363 serde_json::to_vec(&serde_json::Value::Object(map)).map_err(|error| {
364 DispatchError::Conversation(format!("failed to encode dispatched message: {error}"))
365 })
366}
367
368/// Maps a routing [`FieldValue`] to its JSON representation. A non-finite float
369/// has no JSON number form and is encoded as null, matching `serde_json`'s own
370/// handling of non-finite numbers.
371fn field_to_json(value: &FieldValue) -> serde_json::Value {
372 match value {
373 FieldValue::Text(text) => serde_json::Value::String(text.clone()),
374 FieldValue::Integer(integer) => serde_json::Value::from(*integer),
375 FieldValue::Float(float) => serde_json::Number::from_f64(*float)
376 .map_or(serde_json::Value::Null, serde_json::Value::Number),
377 FieldValue::Boolean(boolean) => serde_json::Value::Bool(*boolean),
378 FieldValue::Null => serde_json::Value::Null,
379 }
380}
381
382#[cfg(test)]
383mod tests;