Skip to main content

liminal_server/server/connection/
conversation.rs

1//! Connection-owned conversation resources.
2//!
3//! A connection process owns a [`ConnectionConversation`] per open conversation.
4//! The default implementation ([`LiminalConversationResource`]) wraps a real
5//! beamr-backed supervised conversation actor: messages are forwarded over its
6//! handle, and a participant crash is surfaced structurally through the actor's
7//! trapped linked-EXIT notifier — never by polling, sleeping, or a heartbeat.
8
9use std::sync::{Mutex, mpsc};
10use std::time::{Duration, Instant};
11
12use liminal::channel::SchemaId;
13use liminal::conversation::{ConversationActor, ConversationPhase, ParticipantPid};
14use liminal::envelope::{Envelope, PublisherId};
15use liminal::protocol::{
16    CausalContext as ProtocolCausalContext, MessageEnvelope, SchemaId as ProtocolSchemaId,
17};
18
19use crate::ServerError;
20
/// Interface to library conversation state owned by a single connection process.
///
/// Implementations must be [`std::fmt::Debug`] and [`Send`]; a
/// [`ConnectionConversation`] holds one behind a `Box<dyn ConversationResource>`.
pub trait ConversationResource: std::fmt::Debug + Send {
    /// Delegates one conversation message to the library resource.
    ///
    /// # Errors
    /// Returns [`ServerError`] when the liminal library rejects the conversation message.
    fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError>;

    /// Returns the participant PIDs linked to the supervised conversation, if any.
    ///
    /// A trace-only conversation has no participant process and returns an empty
    /// `Vec`; a real supervised conversation returns the linked participant PIDs.
    fn participant_pids(&self) -> Vec<u64>;

    /// Returns true if the conversation has structurally detected a participant
    /// crash via the trapped linked-EXIT path (never by polling/sleeping).
    ///
    /// This is non-blocking: it observes whether the actor's exit notifier has
    /// already fired (the link-EXIT event landed) and falls back to the actor's
    /// structurally-set `Failed` phase. It does not sample liveness.
    fn has_detected_crash(&self) -> bool;

    /// Blocks up to `timeout` waiting for a structural linked-EXIT crash signal,
    /// returning the [`Instant`] the EXIT was observed inside the actor's link
    /// handler, or `None` if no crash is detected within the bound.
    ///
    /// The wait is event-driven (parks on the exit notifier and is woken by the
    /// EXIT handler), not a poll loop. Used by tests to prove real detection.
    fn await_crash(&self, timeout: Duration) -> Option<Instant>;

    /// Receives the next reply the participant produced for this conversation,
    /// bounded by `timeout`.
    ///
    /// A real participant processes each forwarded message and delivers a reply
    /// back through the conversation; this drains that reply. A trace-only or
    /// non-replying resource times out.
    ///
    /// # Errors
    /// Returns [`ServerError`] when no reply arrives within `timeout`, the
    /// participant crashed, or the conversation is unavailable.
    fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError>;

    /// Releases or finishes the library conversation resource.
    ///
    /// Takes `self: Box<Self>`, so the resource is consumed and cannot be used
    /// after closing.
    ///
    /// # Errors
    /// Returns [`ServerError`] when the liminal library reports a close failure.
    fn close(self: Box<Self>) -> Result<(), ServerError>;
}
69
/// Library conversation resource owned by a single connection process.
///
/// A thin owner around a boxed [`ConversationResource`]; every method on this
/// type forwards to that resource.
#[derive(Debug)]
pub struct ConnectionConversation {
    /// The boxed resource all calls are delegated to.
    resource: Box<dyn ConversationResource>,
}
75
impl ConnectionConversation {
    /// Creates an owned conversation resource for one connection process.
    #[must_use]
    pub fn new(resource: Box<dyn ConversationResource>) -> Self {
        Self { resource }
    }

    /// Forwards one conversation message to the owned resource.
    ///
    /// # Errors
    /// Returns whatever [`ServerError`] the resource's
    /// [`ConversationResource::message`] reports.
    pub(super) fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError> {
        self.resource.message(envelope)
    }

    /// Returns the participant PIDs linked to the supervised conversation.
    #[must_use]
    pub fn participant_pids(&self) -> Vec<u64> {
        self.resource.participant_pids()
    }

    /// Returns true once a participant crash has been structurally detected
    /// through the linked-EXIT mechanism.
    #[must_use]
    pub fn has_detected_crash(&self) -> bool {
        self.resource.has_detected_crash()
    }

    /// Blocks (event-driven) up to `timeout` for a structural crash signal.
    ///
    /// Returns the instant reported by the resource's
    /// [`ConversationResource::await_crash`], or `None` if it reports none.
    #[must_use]
    pub fn await_crash(&self, timeout: Duration) -> Option<Instant> {
        self.resource.await_crash(timeout)
    }

    /// Receives the next participant reply for this conversation, bounded by
    /// `timeout`.
    ///
    /// # Errors
    /// Returns [`ServerError`] when no reply arrives in time or the conversation
    /// is unavailable.
    pub fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError> {
        self.resource.receive_reply(timeout)
    }

    /// Consumes this conversation and closes the owned resource.
    ///
    /// # Errors
    /// Returns whatever [`ServerError`] the resource's
    /// [`ConversationResource::close`] reports.
    pub(super) fn close(self) -> Result<(), ServerError> {
        self.resource.close()
    }
}
120
/// A real supervised conversation owned by one connection process.
///
/// Wraps a beamr-backed [`ConversationActor`] (a genuine supervised process that
/// traps its participants' EXITs) rather than a trace-only span. Messages are
/// forwarded to the actor over its handle, and a participant crash is surfaced
/// structurally through the link-EXIT notifier — never by polling.
#[derive(Debug)]
pub(super) struct LiminalConversationResource {
    /// The conversation actor: messages are sent through its handle, replies
    /// are received from it, and its state exposes the current phase.
    actor: ConversationActor,
    /// PID of the participant; reported by `participant_pids` and named in the
    /// error returned when a message is rejected after a crash.
    participant: ParticipantPid,
    /// Receives the link-EXIT instant from the actor's trapped-EXIT handler.
    /// Wrapped in a [`Mutex`] so it can be read through `&self`.
    exit_rx: Mutex<mpsc::Receiver<Instant>>,
    /// The crash instant once it has been drained from `exit_rx`, cached so the
    /// (one-shot) signal is not lost across repeated observations. `None` until
    /// a signal has been drained.
    crash_observed: Mutex<Option<Instant>>,
}
137
138impl LiminalConversationResource {
139    /// Creates a resource around a booted, crash-armed supervised actor.
140    pub(super) const fn new(
141        actor: ConversationActor,
142        participant: ParticipantPid,
143        exit_rx: mpsc::Receiver<Instant>,
144    ) -> Self {
145        Self {
146            actor,
147            participant,
148            exit_rx: Mutex::new(exit_rx),
149            crash_observed: Mutex::new(None),
150        }
151    }
152
153    /// Returns the cached crash instant or, non-blocking, the one already sent by
154    /// the EXIT handler. This reads an already-fired structural event; it never
155    /// sleeps or samples participant liveness.
156    fn poll_exit_signal(&self) -> Option<Instant> {
157        if let Ok(cached) = self.crash_observed.lock() {
158            if let Some(instant) = *cached {
159                return Some(instant);
160            }
161        }
162        let received = self.exit_rx.lock().map_or(None, |rx| rx.try_recv().ok());
163        self.cache(received);
164        received
165    }
166
167    /// Caches an observed crash instant so the one-shot signal is replayable.
168    fn cache(&self, instant: Option<Instant>) {
169        if let Some(instant) = instant {
170            if let Ok(mut cached) = self.crash_observed.lock() {
171                *cached = Some(instant);
172            }
173        }
174    }
175
176    /// True when the actor's structurally-tracked phase is `Failed`, which the
177    /// trapped-EXIT handler sets under `CrashPolicy::Fail`. This is a structural
178    /// state read, not a liveness sample.
179    fn actor_phase_failed(&self) -> bool {
180        matches!(
181            self.actor.state().map(|state| state.current_phase),
182            Ok(ConversationPhase::Failed)
183        )
184    }
185}
186
187impl ConversationResource for LiminalConversationResource {
188    fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError> {
189        // If the participant has already crashed (structural EXIT observed),
190        // refuse the message rather than forwarding into a failed conversation.
191        if self.poll_exit_signal().is_some() || self.actor_phase_failed() {
192            return Err(ServerError::ListenerAccept {
193                message: format!(
194                    "conversation participant {} crashed; message rejected",
195                    self.participant.get()
196                ),
197            });
198        }
199        let payload = envelope.payload.clone();
200        let message = Envelope::new(payload, None, SchemaId::new(), PublisherId::default());
201        self.actor
202            .handle()
203            .send(message)
204            .map_err(|error| ServerError::ListenerAccept {
205                message: format!("conversation message delivery failed: {error}"),
206            })
207    }
208
209    fn participant_pids(&self) -> Vec<u64> {
210        vec![self.participant.get()]
211    }
212
213    fn has_detected_crash(&self) -> bool {
214        self.poll_exit_signal().is_some() || self.actor_phase_failed()
215    }
216
217    fn await_crash(&self, timeout: Duration) -> Option<Instant> {
218        if let Some(instant) = self.poll_exit_signal() {
219            return Some(instant);
220        }
221        // Event-driven: park on the exit notifier; the actor's trapped-EXIT
222        // handler wakes us the instant the participant's link fires. No polling.
223        let received = self
224            .exit_rx
225            .lock()
226            .map_or(None, |rx| rx.recv_timeout(timeout).ok());
227        self.cache(received);
228        received
229    }
230
231    fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError> {
232        // The participant produced a reply that the conversation actor delivered
233        // back into the conversation; drain it (bounded). This is the reply leg
234        // of the request-reply path — proof the participant genuinely processed
235        // the forwarded message, not just that it was linked.
236        let reply =
237            self.actor
238                .receive_timeout(timeout)
239                .map_err(|error| ServerError::ListenerAccept {
240                    message: format!("conversation reply receive failed: {error}"),
241                })?;
242        Ok(MessageEnvelope::new(
243            ProtocolSchemaId::new([0; ProtocolSchemaId::WIRE_LEN]),
244            ProtocolCausalContext::independent(),
245            reply.payload,
246        ))
247    }
248
249    fn close(self: Box<Self>) -> Result<(), ServerError> {
250        let Self { actor, .. } = *self;
251        // A crashed (Failed) conversation cannot transition to Closed; tearing
252        // down its handle is sufficient and is not an error.
253        if matches!(
254            actor.state().map(|state| state.current_phase),
255            Ok(ConversationPhase::Failed)
256        ) {
257            actor.handle().close().ok();
258            return Ok(());
259        }
260        actor
261            .handle()
262            .close()
263            .map_err(|error| ServerError::ListenerAccept {
264                message: format!("conversation close failed: {error}"),
265            })
266    }
267}