Skip to main content

liminal/conversation/
types.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use beamr::process::registry::ProcessHandle;
5
6use crate::channel::ChannelMode;
7use crate::envelope::Envelope;
8use crate::error::LiminalError;
9use crate::tracing::{ConversationSpan, FinishedSpan, TraceContext};
10
11#[derive(Debug)]
12pub struct Conversation {
13    span: ConversationSpan,
14}
15
16impl Conversation {
17    #[must_use]
18    pub fn start(conversation_id: impl Into<String>) -> Self {
19        Self {
20            span: ConversationSpan::root(conversation_id),
21        }
22    }
23
24    #[must_use]
25    pub fn spawn_child(&self, conversation_id: impl Into<String>) -> Self {
26        Self {
27            span: self.span.child(conversation_id),
28        }
29    }
30
31    #[must_use]
32    pub const fn message<Payload>(&self, payload: Payload) -> ConversationMessage<Payload> {
33        ConversationMessage::new(payload, self.span.message_context())
34    }
35
36    #[must_use]
37    pub fn name(&self) -> &str {
38        self.span.name()
39    }
40
41    #[must_use]
42    pub const fn trace_context(&self) -> TraceContext {
43        self.span.context()
44    }
45
46    #[must_use]
47    pub const fn parent_trace_context(&self) -> Option<TraceContext> {
48        self.span.parent()
49    }
50
51    #[must_use]
52    pub fn finish(self) -> FinishedSpan {
53        self.span.finish()
54    }
55}
56
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub struct ConversationMessage<Payload> {
59    payload: Payload,
60    trace_context: TraceContext,
61}
62
63impl<Payload> ConversationMessage<Payload> {
64    const fn new(payload: Payload, trace_context: TraceContext) -> Self {
65        Self {
66            payload,
67            trace_context,
68        }
69    }
70
71    #[must_use]
72    pub const fn trace_context(&self) -> TraceContext {
73        self.trace_context
74    }
75
76    #[must_use]
77    pub const fn payload(&self) -> &Payload {
78        &self.payload
79    }
80
81    #[must_use]
82    pub fn into_payload(self) -> Payload {
83        self.payload
84    }
85
86    #[must_use]
87    pub fn map<NextPayload>(
88        self,
89        map_payload: impl FnOnce(Payload) -> NextPayload,
90    ) -> ConversationMessage<NextPayload> {
91        ConversationMessage {
92            payload: map_payload(self.payload),
93            trace_context: self.trace_context,
94        }
95    }
96}
97
98/// PID of a beamr process participating in a conversation.
99#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
100pub struct ParticipantPid(u64);
101
102impl ParticipantPid {
103    /// Creates a participant PID wrapper from a raw beamr PID.
104    #[must_use]
105    pub const fn new(pid: u64) -> Self {
106        Self(pid)
107    }
108
109    /// Returns the raw beamr PID.
110    #[must_use]
111    pub const fn get(self) -> u64 {
112        self.0
113    }
114}
115
116impl From<u64> for ParticipantPid {
117    fn from(pid: u64) -> Self {
118        Self::new(pid)
119    }
120}
121
122impl From<ProcessHandle> for ParticipantPid {
123    fn from(handle: ProcessHandle) -> Self {
124        Self::new(handle.pid())
125    }
126}
127
128/// Policy applied when a linked participant exits unexpectedly.
129#[derive(Clone, Copy, Debug, PartialEq, Eq)]
130pub enum CrashPolicy {
131    /// Fail the conversation immediately.
132    Fail,
133    /// Record that routing should select another participant in a later brief.
134    RouteToNext,
135    /// Record that compensation should run in a later brief.
136    Compensate,
137}
138
139/// Required configuration for a conversation actor.
140#[derive(Clone, Debug, PartialEq, Eq)]
141pub struct ConversationConfig {
142    /// Participant process identifiers linked to the conversation actor.
143    pub participants: Vec<ParticipantPid>,
144    /// Optional in-memory deadline for the conversation.
145    pub timeout: Option<Duration>,
146    /// Durability mode marker; durable persistence is implemented elsewhere.
147    pub mode: ChannelMode,
148    /// Participant-crash policy. This has no default and must be provided.
149    pub on_crash: CrashPolicy,
150}
151
152impl ConversationConfig {
153    /// Creates conversation configuration from all required fields.
154    #[must_use]
155    pub const fn new(
156        participants: Vec<ParticipantPid>,
157        timeout: Option<Duration>,
158        mode: ChannelMode,
159        on_crash: CrashPolicy,
160    ) -> Self {
161        Self {
162            participants,
163            timeout,
164            mode,
165            on_crash,
166        }
167    }
168}
169
170/// Lifecycle phase of a conversation actor.
171#[derive(Clone, Copy, Debug, PartialEq, Eq)]
172pub enum ConversationPhase {
173    /// Actor exists but no exchange has started.
174    Created,
175    /// Messages are being exchanged.
176    Active,
177    /// Normal close has begun.
178    Completing,
179    /// Normal close finished.
180    Closed,
181    /// The conversation failed.
182    Failed,
183}
184
185/// Liveness recorded for one linked participant.
186#[derive(Clone, Copy, Debug, PartialEq, Eq)]
187pub enum ParticipantHealth {
188    /// The participant has not emitted an EXIT signal.
189    Alive,
190    /// The participant emitted an EXIT signal.
191    Dead,
192}
193
194/// Participant liveness snapshot.
195#[derive(Clone, Copy, Debug, PartialEq, Eq)]
196pub struct ParticipantStatus {
197    /// Participant PID.
198    pub participant: ParticipantPid,
199    /// Last known participant liveness.
200    pub health: ParticipantHealth,
201    /// Instant the participant's EXIT signal was observed, set when the
202    /// participant is marked dead. Replayed to a late exit-notifier registrant
203    /// so a crash that lands before registration is not lost.
204    pub exited_at: Option<Instant>,
205}
206
207impl ParticipantStatus {
208    /// Creates an alive participant status.
209    #[must_use]
210    pub const fn alive(participant: ParticipantPid) -> Self {
211        Self {
212            participant,
213            health: ParticipantHealth::Alive,
214            exited_at: None,
215        }
216    }
217
218    /// Marks this participant dead, recording the instant its EXIT was observed.
219    pub const fn mark_dead_at(&mut self, at: Instant) {
220        self.health = ParticipantHealth::Dead;
221        self.exited_at = Some(at);
222    }
223}
224
225/// Opaque conversation context accumulated by the actor.
226#[derive(Clone, Debug, PartialEq, Eq)]
227pub enum ConversationContextEntry {
228    /// Envelope delivered to the conversation.
229    Sent(Envelope),
230    /// Envelope delivered from the conversation to a receiver.
231    Received(Envelope),
232    /// Participant crash handled according to the configured policy.
233    ParticipantCrashed {
234        /// Crashed participant PID.
235        participant: ParticipantPid,
236        /// Policy applied by the actor.
237        policy: CrashPolicy,
238    },
239}
240
241/// Queryable state snapshot owned by a conversation actor.
242#[derive(Clone, Debug, PartialEq, Eq)]
243pub struct ConversationState {
244    /// Current lifecycle phase.
245    pub current_phase: ConversationPhase,
246    /// Accumulated exchanged messages and lifecycle references.
247    pub context: Vec<ConversationContextEntry>,
248    /// Optional in-memory deadline.
249    pub deadline: Option<Instant>,
250    /// Last known participant liveness.
251    pub participants: Vec<ParticipantStatus>,
252    /// Durability mode marker retained for diagnostics and future resume work.
253    pub mode: ChannelMode,
254}
255
256impl ConversationState {
257    /// Creates initial state for a conversation config.
258    #[must_use]
259    pub fn from_config(config: &ConversationConfig, now: Instant) -> Self {
260        let deadline = config.timeout.map(|timeout| now + timeout);
261        let participants = config
262            .participants
263            .iter()
264            .copied()
265            .map(ParticipantStatus::alive)
266            .collect();
267
268        Self {
269            current_phase: ConversationPhase::Created,
270            context: Vec::new(),
271            deadline,
272            participants,
273            mode: config.mode,
274        }
275    }
276
277    /// Transitions Created to Active. Active is accepted as idempotent.
278    ///
279    /// # Errors
280    ///
281    /// Returns [`LiminalError::ConversationFailed`] when the state cannot become active.
282    pub fn activate(&mut self) -> Result<(), LiminalError> {
283        match self.current_phase {
284            ConversationPhase::Created => {
285                self.current_phase = ConversationPhase::Active;
286                Ok(())
287            }
288            ConversationPhase::Active => Ok(()),
289            phase => Err(invalid_transition(phase, ConversationPhase::Active)),
290        }
291    }
292
293    /// Transitions Active to Completing.
294    ///
295    /// # Errors
296    ///
297    /// Returns [`LiminalError::ConversationFailed`] when the state is not active.
298    pub fn begin_completing(&mut self) -> Result<(), LiminalError> {
299        match self.current_phase {
300            ConversationPhase::Active => {
301                self.current_phase = ConversationPhase::Completing;
302                Ok(())
303            }
304            ConversationPhase::Completing => Ok(()),
305            phase => Err(invalid_transition(phase, ConversationPhase::Completing)),
306        }
307    }
308
309    /// Transitions Completing to Closed.
310    ///
311    /// # Errors
312    ///
313    /// Returns [`LiminalError::ConversationFailed`] when completion has not begun.
314    pub fn close(&mut self) -> Result<(), LiminalError> {
315        match self.current_phase {
316            ConversationPhase::Completing => {
317                self.current_phase = ConversationPhase::Closed;
318                Ok(())
319            }
320            ConversationPhase::Closed => Ok(()),
321            phase => Err(invalid_transition(phase, ConversationPhase::Closed)),
322        }
323    }
324
325    /// Transitions any phase to Failed.
326    pub const fn fail(&mut self) {
327        self.current_phase = ConversationPhase::Failed;
328    }
329
330    /// Records an envelope sent into the conversation.
331    pub fn record_sent(&mut self, envelope: Envelope) {
332        self.context.push(ConversationContextEntry::Sent(envelope));
333    }
334
335    /// Records an envelope received from the conversation.
336    pub fn record_received(&mut self, envelope: Envelope) {
337        self.context
338            .push(ConversationContextEntry::Received(envelope));
339    }
340
341    /// Records participant crash handling and marks that participant dead,
342    /// stamping `exited_at` as the instant the EXIT signal was observed.
343    pub fn record_participant_crash(
344        &mut self,
345        participant: ParticipantPid,
346        policy: CrashPolicy,
347        exited_at: Instant,
348    ) {
349        for status in &mut self.participants {
350            if status.participant == participant {
351                status.mark_dead_at(exited_at);
352            }
353        }
354        self.context
355            .push(ConversationContextEntry::ParticipantCrashed {
356                participant,
357                policy,
358            });
359    }
360}
361
362/// Cloneable handle for interacting with a conversation actor.
363#[derive(Clone, Debug)]
364pub struct ConversationHandle {
365    backend: Arc<dyn ConversationHandleBackend>,
366}
367
368impl ConversationHandle {
369    pub(crate) fn new(backend: Arc<dyn ConversationHandleBackend>) -> Self {
370        Self { backend }
371    }
372
373    /// Sends a message envelope to the conversation actor.
374    ///
375    /// # Errors
376    ///
377    /// Returns a [`LiminalError`] when the actor cannot accept or process the message.
378    pub fn send(&self, message: impl Into<Envelope>) -> Result<(), LiminalError> {
379        self.backend.send(message.into())
380    }
381
382    /// Receives the next available envelope from the conversation actor.
383    ///
384    /// # Errors
385    ///
386    /// Returns a [`LiminalError`] when the actor is closed, failed, or unavailable.
387    pub fn receive(&self) -> Result<Envelope, LiminalError> {
388        self.backend.receive()
389    }
390
391    /// Closes the conversation normally.
392    ///
393    /// # Errors
394    ///
395    /// Returns a [`LiminalError`] when the actor cannot close normally.
396    pub fn close(&self) -> Result<(), LiminalError> {
397        self.backend.close()
398    }
399
400    /// Queries the actor state for diagnostics.
401    ///
402    /// # Errors
403    ///
404    /// Returns a [`LiminalError`] when the state cannot be queried.
405    pub fn query_state(&self) -> Result<ConversationState, LiminalError> {
406        self.backend.query_state()
407    }
408
409    /// Returns the current actor PID.
410    ///
411    /// # Errors
412    ///
413    /// Returns a [`LiminalError`] when the supervisor cannot inspect the actor.
414    pub fn actor_pid(&self) -> Result<ParticipantPid, LiminalError> {
415        self.backend.actor_pid()
416    }
417}
418
419pub(crate) trait ConversationHandleBackend: std::fmt::Debug + Send + Sync {
420    fn send(&self, message: Envelope) -> Result<(), LiminalError>;
421    fn receive(&self) -> Result<Envelope, LiminalError>;
422    fn close(&self) -> Result<(), LiminalError>;
423    fn query_state(&self) -> Result<ConversationState, LiminalError>;
424    fn actor_pid(&self) -> Result<ParticipantPid, LiminalError>;
425}
426
427fn invalid_transition(from: ConversationPhase, to: ConversationPhase) -> LiminalError {
428    LiminalError::ConversationFailed {
429        message: format!("invalid conversation phase transition from {from:?} to {to:?}"),
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use super::{Conversation, ConversationHandle};
436
437    #[test]
438    fn starting_conversation_creates_named_span_with_fresh_trace_context() {
439        let first = Conversation::start("conversation-1");
440        let second = Conversation::start("conversation-2");
441
442        assert_eq!(first.name(), "conversation-1");
443        assert_eq!(first.parent_trace_context(), None);
444        assert_ne!(first.trace_context().trace_id(), 0);
445        assert_ne!(first.trace_context().span_id(), 0);
446        assert_ne!(
447            first.trace_context().trace_id(),
448            second.trace_context().trace_id()
449        );
450    }
451
452    #[test]
453    fn messages_inherit_conversation_trace_context_automatically() {
454        let conversation = Conversation::start("conversation");
455        let message = conversation.message("payload");
456
457        assert_eq!(message.payload(), &"payload");
458        assert_eq!(message.trace_context(), conversation.trace_context());
459    }
460
461    #[test]
462    fn child_conversation_references_parent_trace_context() {
463        let parent = Conversation::start("parent");
464        let child = parent.spawn_child("child");
465
466        assert_eq!(child.name(), "child");
467        assert_eq!(child.parent_trace_context(), Some(parent.trace_context()));
468        assert_eq!(
469            child.trace_context().trace_id(),
470            parent.trace_context().trace_id()
471        );
472        assert_ne!(
473            child.trace_context().span_id(),
474            parent.trace_context().span_id()
475        );
476    }
477
478    #[test]
479    fn message_mapping_preserves_trace_context() {
480        let conversation = Conversation::start("conversation");
481        let context = conversation.trace_context();
482        let mapped = conversation.message(1_u8).map(u16::from);
483
484        assert_eq!(mapped.payload(), &1_u16);
485        assert_eq!(mapped.trace_context(), context);
486    }
487
488    #[test]
489    fn conversation_handle_is_clone_send_sync() {
490        fn assert_clone_send_sync<T: Clone + Send + Sync>() {}
491
492        assert_clone_send_sync::<ConversationHandle>();
493    }
494}