liminal_server/server/connection/conversation.rs
1//! Connection-owned conversation resources.
2//!
3//! A connection process owns a [`ConnectionConversation`] per open conversation.
4//! The default implementation ([`LiminalConversationResource`]) wraps a real
5//! beamr-backed supervised conversation actor: messages are forwarded over its
6//! handle, and a participant crash is surfaced structurally through the actor's
7//! trapped linked-EXIT notifier — never by polling, sleeping, or a heartbeat.
8
9use std::sync::{Mutex, mpsc};
10use std::time::{Duration, Instant};
11
12use liminal::channel::SchemaId;
13use liminal::conversation::{ConversationActor, ConversationPhase, ParticipantPid};
14use liminal::envelope::{Envelope, PublisherId};
15use liminal::protocol::{
16 CausalContext as ProtocolCausalContext, MessageEnvelope, SchemaId as ProtocolSchemaId,
17};
18
19use crate::ServerError;
20
21/// Marker for library conversation state owned by a single connection process.
22pub trait ConversationResource: std::fmt::Debug + Send {
23 /// Delegates one conversation message to the library resource.
24 ///
25 /// # Errors
26 /// Returns [`ServerError`] when the liminal library rejects the conversation message.
27 fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError>;
28
29 /// Returns the participant PIDs linked to the supervised conversation, if any.
30 ///
31 /// A trace-only conversation has no participant process and returns an empty
32 /// slice; a real supervised conversation returns the linked participant PIDs.
33 fn participant_pids(&self) -> Vec<u64>;
34
35 /// Returns true if the conversation has structurally detected a participant
36 /// crash via the trapped linked-EXIT path (never by polling/sleeping).
37 ///
38 /// This is non-blocking: it observes whether the actor's exit notifier has
39 /// already fired (the link-EXIT event landed) and falls back to the actor's
40 /// structurally-set `Failed` phase. It does not sample liveness.
41 fn has_detected_crash(&self) -> bool;
42
43 /// Blocks up to `timeout` waiting for a structural linked-EXIT crash signal,
44 /// returning the [`Instant`] the EXIT was observed inside the actor's link
45 /// handler, or `None` if no crash is detected within the bound.
46 ///
47 /// The wait is event-driven (parks on the exit notifier and is woken by the
48 /// EXIT handler), not a poll loop. Used by tests to prove real detection.
49 fn await_crash(&self, timeout: Duration) -> Option<Instant>;
50
51 /// Receives the next reply the participant produced for this conversation,
52 /// bounded by `timeout`.
53 ///
54 /// A real participant processes each forwarded message and delivers a reply
55 /// back through the conversation; this drains that reply. A trace-only or
56 /// non-replying resource times out.
57 ///
58 /// # Errors
59 /// Returns [`ServerError`] when no reply arrives within `timeout`, the
60 /// participant crashed, or the conversation is unavailable.
61 fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError>;
62
63 /// Releases or finishes the library conversation resource.
64 ///
65 /// # Errors
66 /// Returns [`ServerError`] when the liminal library reports a close failure.
67 fn close(self: Box<Self>) -> Result<(), ServerError>;
68}
69
70/// Library conversation resource owned by a single connection process.
71#[derive(Debug)]
72pub struct ConnectionConversation {
73 resource: Box<dyn ConversationResource>,
74}
75
76impl ConnectionConversation {
77 /// Creates an owned conversation resource for one connection process.
78 #[must_use]
79 pub fn new(resource: Box<dyn ConversationResource>) -> Self {
80 Self { resource }
81 }
82
83 pub(super) fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError> {
84 self.resource.message(envelope)
85 }
86
87 /// Returns the participant PIDs linked to the supervised conversation.
88 #[must_use]
89 pub fn participant_pids(&self) -> Vec<u64> {
90 self.resource.participant_pids()
91 }
92
93 /// Returns true once a participant crash has been structurally detected
94 /// through the linked-EXIT mechanism.
95 #[must_use]
96 pub fn has_detected_crash(&self) -> bool {
97 self.resource.has_detected_crash()
98 }
99
100 /// Blocks (event-driven) up to `timeout` for a structural crash signal.
101 #[must_use]
102 pub fn await_crash(&self, timeout: Duration) -> Option<Instant> {
103 self.resource.await_crash(timeout)
104 }
105
106 /// Receives the next participant reply for this conversation, bounded by
107 /// `timeout`.
108 ///
109 /// # Errors
110 /// Returns [`ServerError`] when no reply arrives in time or the conversation
111 /// is unavailable.
112 pub fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError> {
113 self.resource.receive_reply(timeout)
114 }
115
116 pub(super) fn close(self) -> Result<(), ServerError> {
117 self.resource.close()
118 }
119}
120
121/// A real supervised conversation owned by one connection process.
122///
123/// Wraps a beamr-backed [`ConversationActor`] (a genuine supervised process that
124/// traps its participants' EXITs) rather than a trace-only span. Messages are
125/// forwarded to the actor over its handle, and a participant crash is surfaced
126/// structurally through the link-EXIT notifier — never by polling.
127#[derive(Debug)]
128pub(super) struct LiminalConversationResource {
129 actor: ConversationActor,
130 participant: ParticipantPid,
131 /// Receives the link-EXIT instant from the actor's trapped-EXIT handler. The
132 /// single observed instant is cached in `crash_observed` once drained so the
133 /// (one-shot) signal is not lost across repeated observations.
134 exit_rx: Mutex<mpsc::Receiver<Instant>>,
135 crash_observed: Mutex<Option<Instant>>,
136}
137
138impl LiminalConversationResource {
139 /// Creates a resource around a booted, crash-armed supervised actor.
140 pub(super) const fn new(
141 actor: ConversationActor,
142 participant: ParticipantPid,
143 exit_rx: mpsc::Receiver<Instant>,
144 ) -> Self {
145 Self {
146 actor,
147 participant,
148 exit_rx: Mutex::new(exit_rx),
149 crash_observed: Mutex::new(None),
150 }
151 }
152
153 /// Returns the cached crash instant or, non-blocking, the one already sent by
154 /// the EXIT handler. This reads an already-fired structural event; it never
155 /// sleeps or samples participant liveness.
156 fn poll_exit_signal(&self) -> Option<Instant> {
157 if let Ok(cached) = self.crash_observed.lock() {
158 if let Some(instant) = *cached {
159 return Some(instant);
160 }
161 }
162 let received = self.exit_rx.lock().map_or(None, |rx| rx.try_recv().ok());
163 self.cache(received);
164 received
165 }
166
167 /// Caches an observed crash instant so the one-shot signal is replayable.
168 fn cache(&self, instant: Option<Instant>) {
169 if let Some(instant) = instant {
170 if let Ok(mut cached) = self.crash_observed.lock() {
171 *cached = Some(instant);
172 }
173 }
174 }
175
176 /// True when the actor's structurally-tracked phase is `Failed`, which the
177 /// trapped-EXIT handler sets under `CrashPolicy::Fail`. This is a structural
178 /// state read, not a liveness sample.
179 fn actor_phase_failed(&self) -> bool {
180 matches!(
181 self.actor.state().map(|state| state.current_phase),
182 Ok(ConversationPhase::Failed)
183 )
184 }
185}
186
187impl ConversationResource for LiminalConversationResource {
188 fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError> {
189 // If the participant has already crashed (structural EXIT observed),
190 // refuse the message rather than forwarding into a failed conversation.
191 if self.poll_exit_signal().is_some() || self.actor_phase_failed() {
192 return Err(ServerError::ListenerAccept {
193 message: format!(
194 "conversation participant {} crashed; message rejected",
195 self.participant.get()
196 ),
197 });
198 }
199 let payload = envelope.payload.clone();
200 let message = Envelope::new(payload, None, SchemaId::new(), PublisherId::default());
201 self.actor
202 .handle()
203 .send(message)
204 .map_err(|error| ServerError::ListenerAccept {
205 message: format!("conversation message delivery failed: {error}"),
206 })
207 }
208
209 fn participant_pids(&self) -> Vec<u64> {
210 vec![self.participant.get()]
211 }
212
213 fn has_detected_crash(&self) -> bool {
214 self.poll_exit_signal().is_some() || self.actor_phase_failed()
215 }
216
217 fn await_crash(&self, timeout: Duration) -> Option<Instant> {
218 if let Some(instant) = self.poll_exit_signal() {
219 return Some(instant);
220 }
221 // Event-driven: park on the exit notifier; the actor's trapped-EXIT
222 // handler wakes us the instant the participant's link fires. No polling.
223 let received = self
224 .exit_rx
225 .lock()
226 .map_or(None, |rx| rx.recv_timeout(timeout).ok());
227 self.cache(received);
228 received
229 }
230
231 fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError> {
232 // The participant produced a reply that the conversation actor delivered
233 // back into the conversation; drain it (bounded). This is the reply leg
234 // of the request-reply path — proof the participant genuinely processed
235 // the forwarded message, not just that it was linked.
236 let reply =
237 self.actor
238 .receive_timeout(timeout)
239 .map_err(|error| ServerError::ListenerAccept {
240 message: format!("conversation reply receive failed: {error}"),
241 })?;
242 Ok(MessageEnvelope::new(
243 ProtocolSchemaId::new([0; ProtocolSchemaId::WIRE_LEN]),
244 ProtocolCausalContext::independent(),
245 reply.payload,
246 ))
247 }
248
249 fn close(self: Box<Self>) -> Result<(), ServerError> {
250 let Self { actor, .. } = *self;
251 // A crashed (Failed) conversation cannot transition to Closed; tearing
252 // down its handle is sufficient and is not an error.
253 if matches!(
254 actor.state().map(|state| state.current_phase),
255 Ok(ConversationPhase::Failed)
256 ) {
257 actor.handle().close().ok();
258 return Ok(());
259 }
260 actor
261 .handle()
262 .close()
263 .map_err(|error| ServerError::ListenerAccept {
264 message: format!("conversation close failed: {error}"),
265 })
266 }
267}