1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use beamr::process::registry::ProcessHandle;
5
6use crate::channel::ChannelMode;
7use crate::envelope::Envelope;
8use crate::error::LiminalError;
9use crate::tracing::{ConversationSpan, FinishedSpan, TraceContext};
10
11#[derive(Debug)]
12pub struct Conversation {
13 span: ConversationSpan,
14}
15
16impl Conversation {
17 #[must_use]
18 pub fn start(conversation_id: impl Into<String>) -> Self {
19 Self {
20 span: ConversationSpan::root(conversation_id),
21 }
22 }
23
24 #[must_use]
25 pub fn spawn_child(&self, conversation_id: impl Into<String>) -> Self {
26 Self {
27 span: self.span.child(conversation_id),
28 }
29 }
30
31 #[must_use]
32 pub const fn message<Payload>(&self, payload: Payload) -> ConversationMessage<Payload> {
33 ConversationMessage::new(payload, self.span.message_context())
34 }
35
36 #[must_use]
37 pub fn name(&self) -> &str {
38 self.span.name()
39 }
40
41 #[must_use]
42 pub const fn trace_context(&self) -> TraceContext {
43 self.span.context()
44 }
45
46 #[must_use]
47 pub const fn parent_trace_context(&self) -> Option<TraceContext> {
48 self.span.parent()
49 }
50
51 #[must_use]
52 pub fn finish(self) -> FinishedSpan {
53 self.span.finish()
54 }
55}
56
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub struct ConversationMessage<Payload> {
59 payload: Payload,
60 trace_context: TraceContext,
61}
62
63impl<Payload> ConversationMessage<Payload> {
64 const fn new(payload: Payload, trace_context: TraceContext) -> Self {
65 Self {
66 payload,
67 trace_context,
68 }
69 }
70
71 #[must_use]
72 pub const fn trace_context(&self) -> TraceContext {
73 self.trace_context
74 }
75
76 #[must_use]
77 pub const fn payload(&self) -> &Payload {
78 &self.payload
79 }
80
81 #[must_use]
82 pub fn into_payload(self) -> Payload {
83 self.payload
84 }
85
86 #[must_use]
87 pub fn map<NextPayload>(
88 self,
89 map_payload: impl FnOnce(Payload) -> NextPayload,
90 ) -> ConversationMessage<NextPayload> {
91 ConversationMessage {
92 payload: map_payload(self.payload),
93 trace_context: self.trace_context,
94 }
95 }
96}
97
/// Newtype around the raw `u64` pid that identifies a conversation participant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ParticipantPid(u64);

impl ParticipantPid {
    /// Wraps a raw pid.
    #[must_use]
    pub const fn new(pid: u64) -> Self {
        ParticipantPid(pid)
    }

    /// Returns the raw pid.
    #[must_use]
    pub const fn get(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}

/// Allows a bare `u64` to be used wherever a [`ParticipantPid`] is expected via `.into()`.
impl From<u64> for ParticipantPid {
    fn from(pid: u64) -> Self {
        Self(pid)
    }
}
121
122impl From<ProcessHandle> for ParticipantPid {
123 fn from(handle: ProcessHandle) -> Self {
124 Self::new(handle.pid())
125 }
126}
127
/// Policy associated with a participant crash.
///
/// Within this file the policy is only stored: it is configured through
/// `ConversationConfig::on_crash` and logged in
/// `ConversationContextEntry::ParticipantCrashed`.
// NOTE(review): the code that interprets these variants is not visible here; the per-variant
// descriptions below are inferred from the names only — confirm against the crash handler.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CrashPolicy {
    /// Presumably: fail the conversation when a participant crashes — TODO confirm.
    Fail,
    /// Presumably: continue by routing to the next participant — TODO confirm.
    RouteToNext,
    /// Presumably: run compensating work after the crash — TODO confirm.
    Compensate,
}
138
/// Static settings used to create a conversation.
///
/// `ConversationState::from_config` reads `participants`, `timeout` and `mode` from this value;
/// `on_crash` is not read anywhere in this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConversationConfig {
    /// Pids of the participants; each one starts out with an alive status.
    pub participants: Vec<ParticipantPid>,
    /// Optional timeout; when present it is added to the creation instant to form the deadline.
    pub timeout: Option<Duration>,
    /// Channel mode, copied into the conversation state.
    pub mode: ChannelMode,
    /// Policy to associate with participant crashes.
    // NOTE(review): consumed outside this file — confirm how it is applied.
    pub on_crash: CrashPolicy,
}

impl ConversationConfig {
    /// Builds a configuration from its four fields, stored as given without validation.
    #[must_use]
    pub const fn new(
        participants: Vec<ParticipantPid>,
        timeout: Option<Duration>,
        mode: ChannelMode,
        on_crash: CrashPolicy,
    ) -> Self {
        Self {
            participants,
            timeout,
            mode,
            on_crash,
        }
    }
}
169
/// Lifecycle phase of a conversation.
///
/// The transition methods on `ConversationState` allow
/// `Created -> Active -> Completing -> Closed`, and `ConversationState::fail` sets `Failed`
/// from any phase.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConversationPhase {
    /// Initial phase assigned by `ConversationState::from_config`.
    Created,
    /// Entered from `Created` by `ConversationState::activate`.
    Active,
    /// Entered from `Active` by `ConversationState::begin_completing`.
    Completing,
    /// Entered from `Completing` by `ConversationState::close`.
    Closed,
    /// Set unconditionally by `ConversationState::fail`.
    Failed,
}
184
/// Liveness of a participant as tracked in `ParticipantStatus`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ParticipantHealth {
    /// Health assigned by `ParticipantStatus::alive`.
    Alive,
    /// Health assigned by `ParticipantStatus::mark_dead_at`.
    Dead,
}
193
194#[derive(Clone, Copy, Debug, PartialEq, Eq)]
196pub struct ParticipantStatus {
197 pub participant: ParticipantPid,
199 pub health: ParticipantHealth,
201 pub exited_at: Option<Instant>,
205}
206
207impl ParticipantStatus {
208 #[must_use]
210 pub const fn alive(participant: ParticipantPid) -> Self {
211 Self {
212 participant,
213 health: ParticipantHealth::Alive,
214 exited_at: None,
215 }
216 }
217
218 pub const fn mark_dead_at(&mut self, at: Instant) {
220 self.health = ParticipantHealth::Dead;
221 self.exited_at = Some(at);
222 }
223}
224
/// One entry in the log kept in `ConversationState::context`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConversationContextEntry {
    /// An envelope logged through `ConversationState::record_sent`.
    Sent(Envelope),
    /// An envelope logged through `ConversationState::record_received`.
    Received(Envelope),
    /// A crash logged through `ConversationState::record_participant_crash`.
    ParticipantCrashed {
        /// Pid of the participant reported as crashed.
        participant: ParticipantPid,
        /// Crash policy supplied alongside the crash report.
        policy: CrashPolicy,
    },
}
240
241#[derive(Clone, Debug, PartialEq, Eq)]
243pub struct ConversationState {
244 pub current_phase: ConversationPhase,
246 pub context: Vec<ConversationContextEntry>,
248 pub deadline: Option<Instant>,
250 pub participants: Vec<ParticipantStatus>,
252 pub mode: ChannelMode,
254}
255
256impl ConversationState {
257 #[must_use]
259 pub fn from_config(config: &ConversationConfig, now: Instant) -> Self {
260 let deadline = config.timeout.map(|timeout| now + timeout);
261 let participants = config
262 .participants
263 .iter()
264 .copied()
265 .map(ParticipantStatus::alive)
266 .collect();
267
268 Self {
269 current_phase: ConversationPhase::Created,
270 context: Vec::new(),
271 deadline,
272 participants,
273 mode: config.mode,
274 }
275 }
276
277 pub fn activate(&mut self) -> Result<(), LiminalError> {
283 match self.current_phase {
284 ConversationPhase::Created => {
285 self.current_phase = ConversationPhase::Active;
286 Ok(())
287 }
288 ConversationPhase::Active => Ok(()),
289 phase => Err(invalid_transition(phase, ConversationPhase::Active)),
290 }
291 }
292
293 pub fn begin_completing(&mut self) -> Result<(), LiminalError> {
299 match self.current_phase {
300 ConversationPhase::Active => {
301 self.current_phase = ConversationPhase::Completing;
302 Ok(())
303 }
304 ConversationPhase::Completing => Ok(()),
305 phase => Err(invalid_transition(phase, ConversationPhase::Completing)),
306 }
307 }
308
309 pub fn close(&mut self) -> Result<(), LiminalError> {
315 match self.current_phase {
316 ConversationPhase::Completing => {
317 self.current_phase = ConversationPhase::Closed;
318 Ok(())
319 }
320 ConversationPhase::Closed => Ok(()),
321 phase => Err(invalid_transition(phase, ConversationPhase::Closed)),
322 }
323 }
324
325 pub const fn fail(&mut self) {
327 self.current_phase = ConversationPhase::Failed;
328 }
329
330 pub fn record_sent(&mut self, envelope: Envelope) {
332 self.context.push(ConversationContextEntry::Sent(envelope));
333 }
334
335 pub fn record_received(&mut self, envelope: Envelope) {
337 self.context
338 .push(ConversationContextEntry::Received(envelope));
339 }
340
341 pub fn record_participant_crash(
344 &mut self,
345 participant: ParticipantPid,
346 policy: CrashPolicy,
347 exited_at: Instant,
348 ) {
349 for status in &mut self.participants {
350 if status.participant == participant {
351 status.mark_dead_at(exited_at);
352 }
353 }
354 self.context
355 .push(ConversationContextEntry::ParticipantCrashed {
356 participant,
357 policy,
358 });
359 }
360}
361
/// Cloneable handle to a conversation.
///
/// The handle holds its backend behind an [`Arc`], so clones share the same backend. Every
/// method forwards to the corresponding `ConversationHandleBackend` method.
#[derive(Clone, Debug)]
pub struct ConversationHandle {
    backend: Arc<dyn ConversationHandleBackend>,
}

impl ConversationHandle {
    /// Wraps an already constructed backend; crate-internal.
    pub(crate) fn new(backend: Arc<dyn ConversationHandleBackend>) -> Self {
        Self { backend }
    }

    /// Converts `message` into an [`Envelope`] and passes it to the backend's `send`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LiminalError`] the backend reports.
    pub fn send(&self, message: impl Into<Envelope>) -> Result<(), LiminalError> {
        self.backend.send(message.into())
    }

    /// Returns the envelope produced by the backend's `receive`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LiminalError`] the backend reports.
    // NOTE(review): whether this blocks until a message arrives depends on the backend,
    // which is not visible here.
    pub fn receive(&self) -> Result<Envelope, LiminalError> {
        self.backend.receive()
    }

    /// Forwards to the backend's `close`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LiminalError`] the backend reports.
    pub fn close(&self) -> Result<(), LiminalError> {
        self.backend.close()
    }

    /// Returns the [`ConversationState`] produced by the backend's `query_state`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LiminalError`] the backend reports.
    pub fn query_state(&self) -> Result<ConversationState, LiminalError> {
        self.backend.query_state()
    }

    /// Returns the pid produced by the backend's `actor_pid`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`LiminalError`] the backend reports.
    pub fn actor_pid(&self) -> Result<ParticipantPid, LiminalError> {
        self.backend.actor_pid()
    }
}
418
/// Operations that `ConversationHandle` delegates to.
///
/// Implementors must be `Debug + Send + Sync`; the handle stores the backend as
/// `Arc<dyn ConversationHandleBackend>`.
pub(crate) trait ConversationHandleBackend: std::fmt::Debug + Send + Sync {
    /// Accepts an envelope to send on the conversation.
    fn send(&self, message: Envelope) -> Result<(), LiminalError>;
    /// Produces an envelope received on the conversation.
    fn receive(&self) -> Result<Envelope, LiminalError>;
    /// Closes the conversation.
    fn close(&self) -> Result<(), LiminalError>;
    /// Produces the conversation state as an owned value.
    fn query_state(&self) -> Result<ConversationState, LiminalError>;
    /// Produces a pid for the conversation.
    // NOTE(review): presumably the pid of the actor that owns the conversation — confirm
    // against the implementors.
    fn actor_pid(&self) -> Result<ParticipantPid, LiminalError>;
}
426
427fn invalid_transition(from: ConversationPhase, to: ConversationPhase) -> LiminalError {
428 LiminalError::ConversationFailed {
429 message: format!("invalid conversation phase transition from {from:?} to {to:?}"),
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::{Conversation, ConversationHandle};

    /// A root conversation keeps its name, has no parent, and gets non-zero ids plus a
    /// trace id distinct from another root conversation.
    #[test]
    fn starting_conversation_creates_named_span_with_fresh_trace_context() {
        let first = Conversation::start("conversation-1");
        let second = Conversation::start("conversation-2");
        let first_context = first.trace_context();
        let second_context = second.trace_context();

        assert_eq!(first.name(), "conversation-1");
        assert_eq!(first.parent_trace_context(), None);
        assert_ne!(first_context.trace_id(), 0);
        assert_ne!(first_context.span_id(), 0);
        assert_ne!(first_context.trace_id(), second_context.trace_id());
    }

    /// Messages created by a conversation carry that conversation's trace context.
    #[test]
    fn messages_inherit_conversation_trace_context_automatically() {
        let conversation = Conversation::start("conversation");
        let expected_context = conversation.trace_context();
        let message = conversation.message("payload");

        assert_eq!(*message.payload(), "payload");
        assert_eq!(message.trace_context(), expected_context);
    }

    /// A child shares the parent's trace id, points back at the parent, and has its own span id.
    #[test]
    fn child_conversation_references_parent_trace_context() {
        let parent = Conversation::start("parent");
        let child = parent.spawn_child("child");
        let parent_context = parent.trace_context();
        let child_context = child.trace_context();

        assert_eq!(child.name(), "child");
        assert_eq!(child.parent_trace_context(), Some(parent_context));
        assert_eq!(child_context.trace_id(), parent_context.trace_id());
        assert_ne!(child_context.span_id(), parent_context.span_id());
    }

    /// Mapping the payload changes its type and value but not the trace context.
    #[test]
    fn message_mapping_preserves_trace_context() {
        let conversation = Conversation::start("conversation");
        let expected_context = conversation.trace_context();
        let original = conversation.message(1_u8);
        let mapped = original.map(u16::from);

        assert_eq!(*mapped.payload(), 1_u16);
        assert_eq!(mapped.trace_context(), expected_context);
    }

    /// Compile-time check: the handle can be cloned and shared across threads.
    #[test]
    fn conversation_handle_is_clone_send_sync() {
        fn require_clone_send_sync<Handle: Clone + Send + Sync>() {}

        require_clone_send_sync::<ConversationHandle>();
    }
}