Skip to main content

axon/session_runtime/
ws.rs

1//! WebSocket carrier for the session-typed dialogue runtime.
2//!
3//! §Fase 41.d — the transport-specific wiring. The pure
4//! [`crate::session_runtime::state::SessionRuntime`] is the operational
5//! state machine; this module makes it speak RFC 6455 over a `tokio` +
6//! `axum` carrier.
7//!
8//! Outer surface:
9//! - [`drive`] — the protocol loop. Given an `axum::extract::ws::WebSocket`
10//!   already upgraded by the caller, a [`SessionRuntime`], and a peer
11//!   role (`PeerRole::Server`/`Client`), runs the dialogue to completion
12//!   (`end`) or to a protocol error (which is reported to the peer via
13//!   an `error` frame + carrier-close `1002 protocol error`).
14//! - [`upgrade_handler`] — a ready-made axum extractor handler that
15//!   upgrades + drives a runtime against a caller-supplied factory.
16//!
17//! Connection lifecycle:
18//! 1. The carrier is established (HTTP upgrade → WS).
19//! 2. The server initialises a [`SessionRuntime`] for its declared role
20//!    (e.g. `server` in `session Negotiate { client: [..], server: [..] }`).
21//!    The client must run the *dual* role; duality has been checked at
22//!    compile time by `axon-frontend::session::SessionType::is_dual_to`.
23//! 3. Frames are exchanged. Each frame received from the peer is routed
24//!    to the appropriate `try_*` step of the runtime. The runtime owns
25//!    *its own role's* perspective, so a peer `kind: "send"` is consumed
26//!    as `try_recv` on the local cursor.
27//! 4. When the cursor reaches `end` the carrier is closed cleanly
28//!    (`1000 normal closure`); a protocol error closes with `1002
29//!    protocol error` and an `error` frame as the last message before
30//!    the close.
31
32use axum::extract::ws::{CloseFrame, Message, WebSocket};
33
34use super::error::ProtocolError;
35use super::state::SessionRuntime;
36use super::wire::Frame;
37
/// Which side of the dialogue this runtime is hosting.
///
/// The role is fixed when [`drive`] is called and is never observed by the
/// algebra (`SessionType` is direction-free; duality folds the direction in
/// via the connection law). Within this module it is threaded into the
/// frame-application helpers, which currently accept it as `_role` without
/// branching on it: a peer's `send` is consumed as our `recv` regardless of
/// which side we host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerRole {
    /// The local runtime is the server-side endpoint of the session.
    Server,
    /// The local runtime is the client-side endpoint.
    Client,
}

impl PeerRole {
    /// Returns the opposite endpoint (`Server` ↔ `Client`).
    ///
    /// Applying it twice yields the original role; useful for symmetry
    /// checks in tests.
    pub fn flip(self) -> Self {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Server => Self::Client,
            Self::Client => Self::Server,
        }
    }
}
61
// WebSocket close status codes emitted by this module (RFC 6455 §7.4.1).
// Named so that every close site in the protocol loop states its intent.

/// `1000` normal closure — the dialogue finished successfully.
const CLOSE_NORMAL: u16 = 1000;
/// `1002` protocol error — a frame violated the session discipline
/// (locally or from the peer); sent after an `error` frame.
const CLOSE_PROTOCOL_ERROR: u16 = 1002;
/// `1011` internal error — the carrier itself failed underneath the session.
const CLOSE_INTERNAL_ERROR: u16 = 1011;
68
69/// Drive a session-typed dialogue to completion over a `WebSocket`.
70///
71/// The loop ends in exactly one of three observable states:
72/// - the runtime's cursor reaches `End` AND an `end` frame is exchanged
73///   → the carrier closes with `1000 normal closure`, return `Ok(())`;
74/// - a [`ProtocolError`] fires → an `error` frame is sent, the carrier
75///   is closed with `1002 protocol error`, return `Err(err)`;
76/// - the carrier drops or returns an I/O error → close cleanly if
77///   possible (`1011 internal error`), return
78///   [`ProtocolError::Transport`].
79///
80/// The function takes ownership of the `WebSocket` and consumes it.
81pub async fn drive(
82    mut ws: WebSocket,
83    mut runtime: SessionRuntime,
84    role: PeerRole,
85) -> Result<(), ProtocolError> {
86    // The loop alternates: read a peer frame OR (when the cursor is at
87    // `Send`/`Select`) wait for the caller to push one via the runtime.
88    // For 41.d the carrier is fully driven by peer frames; outgoing
89    // frames are emitted by a future fase (41.f hooks in the enterprise
90    // server). To keep the runtime exercisable end-to-end now, we
91    // operate in **echo mode**: any `Send`/`Select` cursor state is
92    // emitted onto the wire using a canonical surrogate value (the
93    // tests below cover the round-trip explicitly).
94    loop {
95        if runtime.is_complete() {
96            // Send our terminating `end` (the spec requires exactly one
97            // `end` per direction; we emit ours after the peer's so the
98            // dialogue is symmetric on the wire).
99            send_frame(&mut ws, &Frame::End).await?;
100            close_normal(&mut ws).await;
101            return Ok(());
102        }
103
104        // If the local cursor is at a producer state, emit an outgoing
105        // frame BEFORE blocking on the carrier — otherwise we deadlock.
106        if let Some(out) = next_outgoing_frame(&runtime) {
107            // Step the runtime over our own outgoing frame first. A
108            // failure here means our LOCAL discipline rejected the step
109            // (the §41.c credit-exhausted axiom at runtime is the
110            // canonical case — the static analyser would have caught it
111            // before deploy, but an off-spec config could still ship the
112            // exhaustion to runtime, where this runtime safety net fires
113            // and the peer is notified before the close-frame). We
114            // report the error onto the wire (so the peer learns *what*
115            // happened, not just that the connection died) and close
116            // `1002 protocol error`; then we propagate the error so the
117            // caller's await resolves with a non-`Ok` outcome.
118            if let Err(e) = apply_outgoing(&mut runtime, &out, role).await {
119                report_and_close(&mut ws, &e).await;
120                return Err(e);
121            }
122            send_frame(&mut ws, &out).await?;
123            continue;
124        }
125
126        // Otherwise we are at a consumer state — read the next peer frame.
127        let msg = match ws.recv().await {
128            Some(Ok(msg)) => msg,
129            Some(Err(e)) => {
130                let _ = close_internal(&mut ws).await;
131                return Err(ProtocolError::Transport(e.to_string()));
132            }
133            None => {
134                // Carrier closed cleanly mid-protocol — surface as a
135                // transport error so the caller can decide on retry /
136                // resume (41.g typed reconnection lives there).
137                return Err(ProtocolError::Transport("peer closed mid-protocol".into()));
138            }
139        };
140        match msg {
141            Message::Text(text) => {
142                let frame = match Frame::from_wire(&text) {
143                    Ok(f) => f,
144                    Err(e) => {
145                        report_and_close(&mut ws, &e).await;
146                        return Err(e);
147                    }
148                };
149                if let Err(e) = apply_incoming(&mut runtime, frame, role) {
150                    report_and_close(&mut ws, &e).await;
151                    return Err(e);
152                }
153            }
154            Message::Binary(_) => {
155                // Binary frames are reserved for a later fase
156                // (multimedia mobility over typed channels). Treating
157                // them as malformed here keeps the wire closed.
158                let e = ProtocolError::MalformedFrame(
159                    "binary frame received on a text-only session-typed channel".into(),
160                );
161                report_and_close(&mut ws, &e).await;
162                return Err(e);
163            }
164            Message::Ping(p) => {
165                // axum sends Pong itself, but be explicit + defensive.
166                let _ = ws.send(Message::Pong(p)).await;
167            }
168            Message::Pong(_) => {}
169            Message::Close(_) => {
170                // Peer initiated close. If we are not at `End`, this is
171                // a mid-protocol drop.
172                if runtime.is_complete() {
173                    return Ok(());
174                }
175                return Err(ProtocolError::Transport("peer closed mid-protocol".into()));
176            }
177        }
178    }
179}
180
181/// Decide whether the local runtime currently owes the peer a frame. The
182/// rules follow the algebra exactly:
183/// - `Send`  ⇒ we emit `Frame::Send  { payload_type }`
184/// - `End`   ⇒ we emit `Frame::End`
185/// - `Recv`  ⇒ peer's turn — we wait
186/// - `Branch`⇒ peer's turn (they `select` an arm) — we wait
187/// - `Select`⇒ this fase the runtime cannot auto-pick an arm; the
188///             [`drive`] loop's echo mode emits the first label in
189///             canonical (BTreeMap) order so the test surface is total.
190///
191/// Visible to siblings (`session_runtime::sse`) so the SSE-fragment
192/// driver shares the same dispatch — both carriers run the same algebra,
193/// only the framing differs. §Fase 41.e.
194pub(super) fn next_outgoing_frame(runtime: &SessionRuntime) -> Option<Frame> {
195    use axon_frontend::session::SessionType;
196    match runtime.cursor() {
197        SessionType::Send { payload, .. } => Some(Frame::Send {
198            payload_type: payload.to_string(),
199            data: serde_json::Value::Null, // payload-shape carried opaquely
200        }),
201        SessionType::Select(arms) => {
202            // Echo mode: deterministic arm pick = the first key. Real
203            // application drivers (41.f) override by feeding outgoing
204            // frames explicitly.
205            let label = arms.keys().next()?.clone();
206            Some(Frame::Select { label })
207        }
208        SessionType::End => None, // handled at the top of `drive`
209        _ => None,                // Recv / Branch / Rec / Var — peer's turn
210    }
211}
212
213/// Apply an outgoing frame to the local runtime (advancing the cursor
214/// before we put bytes on the wire). The `role` parameter is reserved
215/// for symmetry — both roles step the cursor identically on local
216/// actions; the algebra carries no direction beyond duality, which is
217/// already baked into the role's `SessionType` at construction.
218///
219/// Visible to siblings (`session_runtime::sse`) — the SSE-fragment
220/// driver advances its runtime via the same primitive so a Send / End
221/// transition is identical at the operational layer regardless of the
222/// carrier (WS frame vs SSE event). §Fase 41.e.
223pub(super) async fn apply_outgoing(
224    runtime: &mut SessionRuntime,
225    frame: &Frame,
226    _role: PeerRole,
227) -> Result<(), ProtocolError> {
228    match frame {
229        Frame::Send { payload_type, .. } => runtime.try_send(payload_type),
230        Frame::Select { label } => runtime.try_select(label),
231        Frame::End => runtime.try_end(),
232        Frame::Error { .. } => Ok(()), // pure carrier signal; no state change
233    }
234}
235
236/// Apply an incoming frame from the peer to the local runtime. From our
237/// side a peer-`send` is a `recv`, a peer-`select` is a `branch_offer`,
238/// and `end` matches `End` on the cursor.
239fn apply_incoming(
240    runtime: &mut SessionRuntime,
241    frame: Frame,
242    _role: PeerRole,
243) -> Result<(), ProtocolError> {
244    match frame {
245        Frame::Send { payload_type, .. } => runtime.try_recv(&payload_type),
246        Frame::Select { label } => runtime.try_offer(&label),
247        Frame::End => runtime.try_end(),
248        Frame::Error { code, detail } => Err(ProtocolError::Transport(format!(
249            "peer reported `{code}`: {detail}"
250        ))),
251    }
252}
253
254async fn send_frame(ws: &mut WebSocket, frame: &Frame) -> Result<(), ProtocolError> {
255    ws.send(Message::Text(frame.to_wire().into()))
256        .await
257        .map_err(|e| ProtocolError::Transport(e.to_string()))
258}
259
260async fn report_and_close(ws: &mut WebSocket, err: &ProtocolError) {
261    let frame = Frame::Error {
262        code: err.code().to_string(),
263        detail: err.to_string(),
264    };
265    // Best-effort — we may already be racing a peer close.
266    let _ = ws.send(Message::Text(frame.to_wire().into())).await;
267    let _ = close_with(ws, CLOSE_PROTOCOL_ERROR, err.code()).await;
268}
269
270async fn close_normal(ws: &mut WebSocket) {
271    let _ = close_with(ws, CLOSE_NORMAL, "session_end").await;
272}
273
274async fn close_internal(ws: &mut WebSocket) -> Result<(), ProtocolError> {
275    close_with(ws, CLOSE_INTERNAL_ERROR, "internal").await
276}
277
278async fn close_with(ws: &mut WebSocket, code: u16, reason: &str) -> Result<(), ProtocolError> {
279    let frame = CloseFrame {
280        code,
281        reason: reason.to_string().into(),
282    };
283    ws.send(Message::Close(Some(frame)))
284        .await
285        .map_err(|e| ProtocolError::Transport(e.to_string()))
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use axon_frontend::session::SessionType;

    /// `flip` swaps the two endpoints and is its own inverse.
    #[test]
    fn peer_role_flip_is_involutive() {
        let pairs = [
            (PeerRole::Server, PeerRole::Client),
            (PeerRole::Client, PeerRole::Server),
        ];
        for (role, dual) in pairs {
            assert_eq!(role.flip(), dual);
        }
        assert_eq!(PeerRole::Server.flip().flip(), PeerRole::Server);
    }

    /// A `Send` cursor owes the peer a `send` frame naming its payload type.
    #[test]
    fn next_outgoing_frame_for_send_cursor() {
        let runtime = SessionRuntime::new(SessionType::send("Msg", SessionType::End), None);
        let payload_type = match next_outgoing_frame(&runtime) {
            Some(Frame::Send { payload_type, .. }) => payload_type,
            other => panic!("expected Send frame for Send cursor, got {other:?}"),
        };
        assert_eq!(payload_type, "Msg");
    }

    /// A `Recv` cursor means it is the peer's turn: nothing to emit.
    #[test]
    fn next_outgoing_frame_for_recv_cursor_is_none() {
        let runtime = SessionRuntime::new(SessionType::recv("Msg", SessionType::End), None);
        let emitted = next_outgoing_frame(&runtime);
        assert!(emitted.is_none());
    }

    /// Echo mode picks the first label in canonical key order.
    #[test]
    fn next_outgoing_frame_for_select_picks_first_label() {
        let runtime = SessionRuntime::new(
            SessionType::select([
                ("zeta".into(), SessionType::End),
                ("alpha".into(), SessionType::End),
            ]),
            None,
        );
        let label = match next_outgoing_frame(&runtime) {
            Some(Frame::Select { label }) => label,
            other => panic!("expected Select(alpha), got {other:?}"),
        };
        // BTreeMap keys in canonical order ⇒ "alpha" before "zeta".
        assert_eq!(label, "alpha");
    }
}