axon/session_runtime/ws.rs
1//! WebSocket carrier for the session-typed dialogue runtime.
2//!
3//! §Fase 41.d — the transport-specific wiring. The pure
4//! [`crate::session_runtime::state::SessionRuntime`] is the operational
5//! state machine; this module makes it speak RFC 6455 over a `tokio` +
6//! `axum` carrier.
7//!
8//! Outer surface:
9//! - [`drive`] — the protocol loop. Given an `axum::extract::ws::WebSocket`
10//! already upgraded by the caller, a [`SessionRuntime`], and a peer
11//! role (`PeerRole::Server`/`Client`), runs the dialogue to completion
12//! (`end`) or to a protocol error (which is reported to the peer via
13//! an `error` frame + carrier-close `1002 protocol error`).
14//! - [`upgrade_handler`] — a ready-made axum extractor handler that
15//! upgrades + drives a runtime against a caller-supplied factory.
16//!
17//! Connection lifecycle:
18//! 1. The carrier is established (HTTP upgrade → WS).
19//! 2. The server initialises a [`SessionRuntime`] for its declared role
20//! (e.g. `server` in `session Negotiate { client: [..], server: [..] }`).
21//! The client must run the *dual* role; duality has been checked at
22//! compile time by `axon-frontend::session::SessionType::is_dual_to`.
23//! 3. Frames are exchanged. Each frame received from the peer is routed
24//! to the appropriate `try_*` step of the runtime. The runtime owns
25//! *its own role's* perspective, so a peer `kind: "send"` is consumed
26//! as `try_recv` on the local cursor.
27//! 4. When the cursor reaches `end` the carrier is closed cleanly
28//! (`1000 normal closure`); a protocol error closes with `1002
29//! protocol error` and an `error` frame as the last message before
30//! the close.
31
32use axum::extract::ws::{CloseFrame, Message, WebSocket};
33
34use super::error::ProtocolError;
35use super::state::SessionRuntime;
36use super::wire::Frame;
37
/// Which side of the dialogue the local runtime is hosting.
///
/// The role informs the receive-vs-send dispatch on incoming `Send`
/// frames (a peer's `send` is our `recv`) and the attribution of the
/// carrier close on protocol errors. It is chosen at upgrade time and is
/// never observed by the algebra: `SessionType` is direction-free, and
/// duality folds the direction in via the connection law.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerRole {
    /// The local runtime is the server-side endpoint of the session.
    Server,
    /// The local runtime is the client-side endpoint.
    Client,
}

impl PeerRole {
    /// Returns the dual role (`Server` ↔ `Client`).
    ///
    /// Flipping twice yields the original role. The function is `const`
    /// so the dual can be computed in constant contexts, and `#[must_use]`
    /// because it has no side effects — discarding the result is always a
    /// mistake.
    #[must_use]
    pub const fn flip(self) -> Self {
        match self {
            Self::Server => Self::Client,
            Self::Client => Self::Server,
        }
    }
}
61
// WebSocket close codes used by the runtime (RFC 6455 §7.4). They are
// named constants so the protocol loop documents its own closure
// semantics at each call site.

/// `1000` — normal closure; sent by `close_normal` once the dialogue has
/// run to completion.
const CLOSE_NORMAL: u16 = 1000;
/// `1002` — protocol error; sent by `report_and_close` right after the
/// `error` frame.
const CLOSE_PROTOCOL_ERROR: u16 = 1002;
/// `1011` — internal error; sent by `close_internal` when the carrier
/// itself fails.
const CLOSE_INTERNAL_ERROR: u16 = 1011;
68
69/// Drive a session-typed dialogue to completion over a `WebSocket`.
70///
71/// The loop ends in exactly one of three observable states:
72/// - the runtime's cursor reaches `End` AND an `end` frame is exchanged
73/// → the carrier closes with `1000 normal closure`, return `Ok(())`;
74/// - a [`ProtocolError`] fires → an `error` frame is sent, the carrier
75/// is closed with `1002 protocol error`, return `Err(err)`;
76/// - the carrier drops or returns an I/O error → close cleanly if
77/// possible (`1011 internal error`), return
78/// [`ProtocolError::Transport`].
79///
80/// The function takes ownership of the `WebSocket` and consumes it.
81pub async fn drive(
82 mut ws: WebSocket,
83 mut runtime: SessionRuntime,
84 role: PeerRole,
85) -> Result<(), ProtocolError> {
86 // The loop alternates: read a peer frame OR (when the cursor is at
87 // `Send`/`Select`) wait for the caller to push one via the runtime.
88 // For 41.d the carrier is fully driven by peer frames; outgoing
89 // frames are emitted by a future fase (41.f hooks in the enterprise
90 // server). To keep the runtime exercisable end-to-end now, we
91 // operate in **echo mode**: any `Send`/`Select` cursor state is
92 // emitted onto the wire using a canonical surrogate value (the
93 // tests below cover the round-trip explicitly).
94 loop {
95 if runtime.is_complete() {
96 // Send our terminating `end` (the spec requires exactly one
97 // `end` per direction; we emit ours after the peer's so the
98 // dialogue is symmetric on the wire).
99 send_frame(&mut ws, &Frame::End).await?;
100 close_normal(&mut ws).await;
101 return Ok(());
102 }
103
104 // If the local cursor is at a producer state, emit an outgoing
105 // frame BEFORE blocking on the carrier — otherwise we deadlock.
106 if let Some(out) = next_outgoing_frame(&runtime) {
107 // Step the runtime over our own outgoing frame first. A
108 // failure here means our LOCAL discipline rejected the step
109 // (the §41.c credit-exhausted axiom at runtime is the
110 // canonical case — the static analyser would have caught it
111 // before deploy, but an off-spec config could still ship the
112 // exhaustion to runtime, where this runtime safety net fires
113 // and the peer is notified before the close-frame). We
114 // report the error onto the wire (so the peer learns *what*
115 // happened, not just that the connection died) and close
116 // `1002 protocol error`; then we propagate the error so the
117 // caller's await resolves with a non-`Ok` outcome.
118 if let Err(e) = apply_outgoing(&mut runtime, &out, role).await {
119 report_and_close(&mut ws, &e).await;
120 return Err(e);
121 }
122 send_frame(&mut ws, &out).await?;
123 continue;
124 }
125
126 // Otherwise we are at a consumer state — read the next peer frame.
127 let msg = match ws.recv().await {
128 Some(Ok(msg)) => msg,
129 Some(Err(e)) => {
130 let _ = close_internal(&mut ws).await;
131 return Err(ProtocolError::Transport(e.to_string()));
132 }
133 None => {
134 // Carrier closed cleanly mid-protocol — surface as a
135 // transport error so the caller can decide on retry /
136 // resume (41.g typed reconnection lives there).
137 return Err(ProtocolError::Transport("peer closed mid-protocol".into()));
138 }
139 };
140 match msg {
141 Message::Text(text) => {
142 let frame = match Frame::from_wire(&text) {
143 Ok(f) => f,
144 Err(e) => {
145 report_and_close(&mut ws, &e).await;
146 return Err(e);
147 }
148 };
149 if let Err(e) = apply_incoming(&mut runtime, frame, role) {
150 report_and_close(&mut ws, &e).await;
151 return Err(e);
152 }
153 }
154 Message::Binary(_) => {
155 // Binary frames are reserved for a later fase
156 // (multimedia mobility over typed channels). Treating
157 // them as malformed here keeps the wire closed.
158 let e = ProtocolError::MalformedFrame(
159 "binary frame received on a text-only session-typed channel".into(),
160 );
161 report_and_close(&mut ws, &e).await;
162 return Err(e);
163 }
164 Message::Ping(p) => {
165 // axum sends Pong itself, but be explicit + defensive.
166 let _ = ws.send(Message::Pong(p)).await;
167 }
168 Message::Pong(_) => {}
169 Message::Close(_) => {
170 // Peer initiated close. If we are not at `End`, this is
171 // a mid-protocol drop.
172 if runtime.is_complete() {
173 return Ok(());
174 }
175 return Err(ProtocolError::Transport("peer closed mid-protocol".into()));
176 }
177 }
178 }
179}
180
181/// Decide whether the local runtime currently owes the peer a frame. The
182/// rules follow the algebra exactly:
183/// - `Send` ⇒ we emit `Frame::Send { payload_type }`
184/// - `End` ⇒ we emit `Frame::End`
185/// - `Recv` ⇒ peer's turn — we wait
186/// - `Branch`⇒ peer's turn (they `select` an arm) — we wait
187/// - `Select`⇒ this fase the runtime cannot auto-pick an arm; the
188/// [`drive`] loop's echo mode emits the first label in
189/// canonical (BTreeMap) order so the test surface is total.
190///
191/// Visible to siblings (`session_runtime::sse`) so the SSE-fragment
192/// driver shares the same dispatch — both carriers run the same algebra,
193/// only the framing differs. §Fase 41.e.
194pub(super) fn next_outgoing_frame(runtime: &SessionRuntime) -> Option<Frame> {
195 use axon_frontend::session::SessionType;
196 match runtime.cursor() {
197 SessionType::Send { payload, .. } => Some(Frame::Send {
198 payload_type: payload.to_string(),
199 data: serde_json::Value::Null, // payload-shape carried opaquely
200 }),
201 SessionType::Select(arms) => {
202 // Echo mode: deterministic arm pick = the first key. Real
203 // application drivers (41.f) override by feeding outgoing
204 // frames explicitly.
205 let label = arms.keys().next()?.clone();
206 Some(Frame::Select { label })
207 }
208 SessionType::End => None, // handled at the top of `drive`
209 _ => None, // Recv / Branch / Rec / Var — peer's turn
210 }
211}
212
213/// Apply an outgoing frame to the local runtime (advancing the cursor
214/// before we put bytes on the wire). The `role` parameter is reserved
215/// for symmetry — both roles step the cursor identically on local
216/// actions; the algebra carries no direction beyond duality, which is
217/// already baked into the role's `SessionType` at construction.
218///
219/// Visible to siblings (`session_runtime::sse`) — the SSE-fragment
220/// driver advances its runtime via the same primitive so a Send / End
221/// transition is identical at the operational layer regardless of the
222/// carrier (WS frame vs SSE event). §Fase 41.e.
223pub(super) async fn apply_outgoing(
224 runtime: &mut SessionRuntime,
225 frame: &Frame,
226 _role: PeerRole,
227) -> Result<(), ProtocolError> {
228 match frame {
229 Frame::Send { payload_type, .. } => runtime.try_send(payload_type),
230 Frame::Select { label } => runtime.try_select(label),
231 Frame::End => runtime.try_end(),
232 Frame::Error { .. } => Ok(()), // pure carrier signal; no state change
233 }
234}
235
236/// Apply an incoming frame from the peer to the local runtime. From our
237/// side a peer-`send` is a `recv`, a peer-`select` is a `branch_offer`,
238/// and `end` matches `End` on the cursor.
239fn apply_incoming(
240 runtime: &mut SessionRuntime,
241 frame: Frame,
242 _role: PeerRole,
243) -> Result<(), ProtocolError> {
244 match frame {
245 Frame::Send { payload_type, .. } => runtime.try_recv(&payload_type),
246 Frame::Select { label } => runtime.try_offer(&label),
247 Frame::End => runtime.try_end(),
248 Frame::Error { code, detail } => Err(ProtocolError::Transport(format!(
249 "peer reported `{code}`: {detail}"
250 ))),
251 }
252}
253
254async fn send_frame(ws: &mut WebSocket, frame: &Frame) -> Result<(), ProtocolError> {
255 ws.send(Message::Text(frame.to_wire().into()))
256 .await
257 .map_err(|e| ProtocolError::Transport(e.to_string()))
258}
259
260async fn report_and_close(ws: &mut WebSocket, err: &ProtocolError) {
261 let frame = Frame::Error {
262 code: err.code().to_string(),
263 detail: err.to_string(),
264 };
265 // Best-effort — we may already be racing a peer close.
266 let _ = ws.send(Message::Text(frame.to_wire().into())).await;
267 let _ = close_with(ws, CLOSE_PROTOCOL_ERROR, err.code()).await;
268}
269
270async fn close_normal(ws: &mut WebSocket) {
271 let _ = close_with(ws, CLOSE_NORMAL, "session_end").await;
272}
273
274async fn close_internal(ws: &mut WebSocket) -> Result<(), ProtocolError> {
275 close_with(ws, CLOSE_INTERNAL_ERROR, "internal").await
276}
277
278async fn close_with(ws: &mut WebSocket, code: u16, reason: &str) -> Result<(), ProtocolError> {
279 let frame = CloseFrame {
280 code,
281 reason: reason.to_string().into(),
282 };
283 ws.send(Message::Close(Some(frame)))
284 .await
285 .map_err(|e| ProtocolError::Transport(e.to_string()))
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use axon_frontend::session::SessionType;

    /// `flip` swaps the two roles and is its own inverse.
    #[test]
    fn peer_role_flip_is_involutive() {
        let (server, client) = (PeerRole::Server, PeerRole::Client);
        assert_eq!(server.flip(), client);
        assert_eq!(client.flip(), server);
        assert_eq!(server.flip().flip(), server);
    }

    /// A `Send` cursor owes the peer a `Send` frame naming the payload.
    #[test]
    fn next_outgoing_frame_for_send_cursor() {
        let runtime = SessionRuntime::new(SessionType::send("Msg", SessionType::End), None);
        let emitted = next_outgoing_frame(&runtime);
        let Some(Frame::Send { payload_type, .. }) = &emitted else {
            panic!("expected Send frame for Send cursor, got {emitted:?}");
        };
        assert_eq!(payload_type, "Msg");
    }

    /// A `Recv` cursor is the peer's turn, so nothing is owed.
    #[test]
    fn next_outgoing_frame_for_recv_cursor_is_none() {
        let runtime = SessionRuntime::new(SessionType::recv("Msg", SessionType::End), None);
        let emitted = next_outgoing_frame(&runtime);
        assert!(emitted.is_none());
    }

    /// Echo mode picks the first label in canonical key order, which is
    /// "alpha" here even though "zeta" was supplied first.
    #[test]
    fn next_outgoing_frame_for_select_picks_first_label() {
        let arms = [
            ("zeta".into(), SessionType::End),
            ("alpha".into(), SessionType::End),
        ];
        let runtime = SessionRuntime::new(SessionType::select(arms), None);
        let emitted = next_outgoing_frame(&runtime);
        let Some(Frame::Select { label }) = &emitted else {
            panic!("expected Select(alpha), got {emitted:?}");
        };
        assert_eq!(label, "alpha");
    }
}