atomr_agents_coding_cli_harness_web/
ws.rs1use atomr_agents_coding_cli_core::CliSessionId;
4use atomr_agents_coding_cli_harness::SessionEvent;
5use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::{Path, State};
7use axum::response::Response;
8use serde::Deserialize;
9use tracing::warn;
10
11use crate::AppState;
12
13pub async fn session_ws(
17 ws: WebSocketUpgrade,
18 Path(id): Path<String>,
19 State(state): State<AppState>,
20) -> Response {
21 ws.on_upgrade(move |socket| handle_socket(socket, CliSessionId::from(id), state))
22}
23
24#[derive(Debug, Deserialize)]
25#[serde(tag = "kind", rename_all = "snake_case")]
26enum ControlFrame {
27 Resize { cols: u16, rows: u16 },
28 Detach,
29}
30
31async fn handle_socket(mut socket: WebSocket, id: CliSessionId, state: AppState) {
32 let Some(handle) = state.harness.sessions().get(&id) else {
33 let _ = socket
34 .send(Message::Close(Some(axum::extract::ws::CloseFrame {
35 code: 1011,
36 reason: "session not found".into(),
37 })))
38 .await;
39 return;
40 };
41
42 let mut rx = handle.subscribe();
43 let input = handle.input.clone();
44
45 loop {
46 tokio::select! {
47 msg = rx.recv() => {
49 match msg {
50 Ok(SessionEvent::Bytes(bytes)) => {
51 if socket.send(Message::Binary(bytes)).await.is_err() {
52 break;
53 }
54 }
55 Ok(SessionEvent::Exited { code }) => {
56 let body = serde_json::json!({"kind":"exited","code":code}).to_string();
57 let _ = socket.send(Message::Text(body)).await;
58 let _ = socket.send(Message::Close(None)).await;
59 break;
60 }
61 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
62 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
63 }
64 }
65 client = socket.recv() => {
67 match client {
68 Some(Ok(Message::Binary(bytes))) => {
69 let _ = input.send(atomr_agents_coding_cli_harness::SessionTransport::Stdin(bytes)).await;
70 }
71 Some(Ok(Message::Text(s))) => {
72 match serde_json::from_str::<ControlFrame>(&s) {
73 Ok(ControlFrame::Resize { cols, rows }) => {
74 let _ = handle.resize(cols, rows).await;
75 }
76 Ok(ControlFrame::Detach) => {
77 let _ = handle.detach().await;
78 break;
79 }
80 Err(e) => warn!(error = %e, "bad control frame; ignoring"),
81 }
82 }
83 Some(Ok(Message::Close(_))) | None => break,
84 Some(Ok(_)) => continue,
85 Some(Err(e)) => {
86 warn!(error = %e, "websocket error");
87 break;
88 }
89 }
90 }
91 }
92 }
93}