1use axum::{
5 Router,
6 extract::ws::{Message, WebSocket, WebSocketUpgrade},
7 response::IntoResponse,
8 routing::get,
9};
10use futures_util::StreamExt;
11use std::net::SocketAddr;
12use tracing::{error, info};
13
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use tokio::sync::broadcast;
17
18#[derive(Clone)]
20pub struct AppState {
21 pub patch_tx: broadcast::Sender<WsMessage>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum WsMessage {
27 Patch(super::patch_engine::RuntimePatch),
28 State(super::dev_runtime::RuntimeStateSnapshot),
29 Event(super::dev_runtime::RuntimeEvent),
30 Devtools(serde_json::Value),
31}
32
33async fn runtime_ws(
35 axum::extract::State(_state): axum::extract::State<AppState>,
36 ws: WebSocketUpgrade,
37) -> impl IntoResponse {
38 ws.on_upgrade(handle_runtime_socket)
39}
40
41async fn devtools_ws(
43 axum::extract::State(_state): axum::extract::State<AppState>,
44 ws: WebSocketUpgrade,
45) -> impl IntoResponse {
46 ws.on_upgrade(handle_devtools_socket)
47}
48
49async fn hotreload_ws(
51 axum::extract::State(state): axum::extract::State<AppState>,
52 ws: WebSocketUpgrade,
53) -> impl IntoResponse {
54 ws.on_upgrade(move |socket| handle_hotreload_socket(socket, state))
55}
56
57async fn agent_ws(
59 axum::extract::State(_state): axum::extract::State<AppState>,
60 ws: WebSocketUpgrade,
61) -> impl IntoResponse {
62 ws.on_upgrade(handle_agent_socket)
63}
64
65async fn handle_runtime_socket(mut ws: WebSocket) {
67 info!("Runtime WebSocket client connected");
68
69 let _ = ws
71 .send(Message::Text(
72 serde_json::to_string(&serde_json::json!({
73 "type": "handshake",
74 "payload": {
75 "client": "runtime",
76 "capabilities": ["patch", "state", "event"]
77 }
78 }))
79 .unwrap(),
80 ))
81 .await;
82
83 while let Some(result) = ws.next().await {
84 match result {
85 Ok(Message::Text(text)) => {
86 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
88 info!("Received runtime message: {}", message);
90 }
91 }
92 Ok(Message::Binary(bin)) => {
93 info!("Received binary message of {} bytes", bin.len());
95 }
96 Ok(Message::Close(_)) => {
97 info!("Runtime WebSocket client disconnected");
98 break;
99 }
100 Err(e) => {
101 error!("WebSocket error: {}", e);
102 break;
103 }
104 _ => {}
105 }
106 }
107}
108
109async fn handle_devtools_socket(mut ws: WebSocket) {
111 info!("DevTools WebSocket client connected");
112
113 while let Some(result) = ws.next().await {
114 match result {
115 Ok(Message::Text(text)) => {
116 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
118 info!("Received DevTools message: {}", message);
119 }
120 }
121 Ok(Message::Close(_)) => {
122 info!("DevTools WebSocket client disconnected");
123 break;
124 }
125 Err(e) => {
126 error!("DevTools WebSocket error: {}", e);
127 break;
128 }
129 _ => {}
130 }
131 }
132}
133
134async fn handle_hotreload_socket(mut ws: WebSocket, state: AppState) {
136 info!("Hot reload WebSocket client connected");
137
138 let mut patch_rx = state.patch_tx.subscribe();
139
140 loop {
141 tokio::select! {
142 Ok(msg) = patch_rx.recv() => {
144 if let Ok(serialized) = serde_json::to_string(&msg)
145 && let Err(e) = ws.send(Message::Text(serialized)).await {
146 error!("Failed to send patch to client: {}", e);
147 break;
148 }
149 }
150 Some(result) = ws.next() => {
152 match result {
153 Ok(Message::Close(_)) => {
154 info!("Hot reload WebSocket client disconnected");
155 break;
156 }
157 Err(e) => {
158 error!("Hot reload WebSocket error: {}", e);
159 break;
160 }
161 _ => {}
162 }
163 }
164 }
165 }
166}
167
168async fn handle_agent_socket(mut ws: WebSocket) {
170 info!("Agent stream WebSocket client connected");
171
172 while let Some(result) = ws.next().await {
173 match result {
174 Ok(Message::Text(text)) => {
175 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
177 info!("Received agent message: {}", message);
178 }
179 }
180 Ok(Message::Close(_)) => {
181 info!("Agent stream WebSocket client disconnected");
182 break;
183 }
184 Err(e) => {
185 error!("Agent stream WebSocket error: {}", e);
186 break;
187 }
188 _ => {}
189 }
190 }
191}
192
193pub fn create_router(state: AppState) -> Router {
195 Router::new()
196 .route("/ws/runtime", get(runtime_ws))
197 .route("/ws/devtools", get(devtools_ws))
198 .route("/ws/hotreload", get(hotreload_ws))
199 .route("/ws/agent", get(agent_ws))
200 .with_state(state)
201}
202
203pub async fn start_server(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
205 let (tx, _) = broadcast::channel(100);
206 let state = AppState {
207 patch_tx: tx.clone(),
208 };
209
210 let tx_clone = tx.clone();
212 let patch_engine = Arc::new(tokio::sync::Mutex::new(
213 super::patch_engine::PatchEngine::new(),
214 ));
215
216 super::build_pipeline::BuildPipeline::watch_changes(".", move |artifact| {
217 let mut engine = patch_engine.blocking_lock();
218 let patch = engine.generate_patch(artifact);
219 let _ = tx_clone.send(WsMessage::Patch(patch));
220 });
221
222 let app = create_router(state);
223 info!("Starting WebSocket server on {}", addr);
224
225 let listener = tokio::net::TcpListener::bind(addr).await?;
226 axum::serve(listener, app).await?;
227
228 Ok(())
229}