Skip to main content

atomr_agents_coding_cli_harness_web/
ws.rs

1//! WebSocket terminal bridge for interactive tmux-wrapped sessions.
2
3use atomr_agents_coding_cli_core::CliSessionId;
4use atomr_agents_coding_cli_harness::SessionEvent;
5use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::{Path, State};
7use axum::response::Response;
8use serde::Deserialize;
9use tracing::warn;
10
11use crate::AppState;
12
13/// `GET /api/cli/sessions/:id/io` — WebSocket upgrade. Binary frames
14/// flow in both directions for PTY bytes; text frames carry a small
15/// control protocol (resize).
16pub async fn session_ws(
17    ws: WebSocketUpgrade,
18    Path(id): Path<String>,
19    State(state): State<AppState>,
20) -> Response {
21    ws.on_upgrade(move |socket| handle_socket(socket, CliSessionId::from(id), state))
22}
23
24#[derive(Debug, Deserialize)]
25#[serde(tag = "kind", rename_all = "snake_case")]
26enum ControlFrame {
27    Resize { cols: u16, rows: u16 },
28    Detach,
29}
30
31async fn handle_socket(mut socket: WebSocket, id: CliSessionId, state: AppState) {
32    let Some(handle) = state.harness.sessions().get(&id) else {
33        let _ = socket
34            .send(Message::Close(Some(axum::extract::ws::CloseFrame {
35                code: 1011,
36                reason: "session not found".into(),
37            })))
38            .await;
39        return;
40    };
41
42    let mut rx = handle.subscribe();
43    let input = handle.input.clone();
44
45    loop {
46        tokio::select! {
47            // PTY → client
48            msg = rx.recv() => {
49                match msg {
50                    Ok(SessionEvent::Bytes(bytes)) => {
51                        if socket.send(Message::Binary(bytes)).await.is_err() {
52                            break;
53                        }
54                    }
55                    Ok(SessionEvent::Exited { code }) => {
56                        let body = serde_json::json!({"kind":"exited","code":code}).to_string();
57                        let _ = socket.send(Message::Text(body)).await;
58                        let _ = socket.send(Message::Close(None)).await;
59                        break;
60                    }
61                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
62                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
63                }
64            }
65            // client → PTY
66            client = socket.recv() => {
67                match client {
68                    Some(Ok(Message::Binary(bytes))) => {
69                        let _ = input.send(atomr_agents_coding_cli_harness::SessionTransport::Stdin(bytes)).await;
70                    }
71                    Some(Ok(Message::Text(s))) => {
72                        match serde_json::from_str::<ControlFrame>(&s) {
73                            Ok(ControlFrame::Resize { cols, rows }) => {
74                                let _ = handle.resize(cols, rows).await;
75                            }
76                            Ok(ControlFrame::Detach) => {
77                                let _ = handle.detach().await;
78                                break;
79                            }
80                            Err(e) => warn!(error = %e, "bad control frame; ignoring"),
81                        }
82                    }
83                    Some(Ok(Message::Close(_))) | None => break,
84                    Some(Ok(_)) => continue,
85                    Some(Err(e)) => {
86                        warn!(error = %e, "websocket error");
87                        break;
88                    }
89                }
90            }
91        }
92    }
93}