1use axum::{
5 Router,
6 extract::ws::{Message, WebSocket, WebSocketUpgrade},
7 response::IntoResponse,
8 routing::get,
9};
10use futures_util::StreamExt;
11use std::net::SocketAddr;
12use tracing::{error, info};
13
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use tokio::sync::broadcast;
17
18#[derive(Clone)]
20pub struct AppState {
21 pub patch_tx: broadcast::Sender<WsMessage>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum WsMessage {
27 Patch(super::patch_engine::RuntimePatch),
28 State(super::dev_runtime::RuntimeStateSnapshot),
29 Event(super::dev_runtime::RuntimeEvent),
30 Devtools(serde_json::Value),
31}
32
33async fn runtime_ws(
35 axum::extract::State(_state): axum::extract::State<AppState>,
36 ws: WebSocketUpgrade,
37) -> impl IntoResponse {
38 ws.on_upgrade(handle_runtime_socket)
39}
40
41async fn devtools_ws(
43 axum::extract::State(_state): axum::extract::State<AppState>,
44 ws: WebSocketUpgrade,
45) -> impl IntoResponse {
46 ws.on_upgrade(handle_devtools_socket)
47}
48
49async fn hotreload_ws(
51 axum::extract::State(state): axum::extract::State<AppState>,
52 ws: WebSocketUpgrade,
53) -> impl IntoResponse {
54 ws.on_upgrade(move |socket| handle_hotreload_socket(socket, state))
55}
56
57async fn agent_ws(
59 axum::extract::State(_state): axum::extract::State<AppState>,
60 ws: WebSocketUpgrade,
61) -> impl IntoResponse {
62 ws.on_upgrade(handle_agent_socket)
63}
64
65async fn handle_runtime_socket(mut ws: WebSocket) {
67 info!("Runtime WebSocket client connected");
68
69 let _ = ws
71 .send(Message::Text(
72 serde_json::to_string(&serde_json::json!({
73 "type": "handshake",
74 "payload": {
75 "client": "runtime",
76 "capabilities": ["patch", "state", "event"]
77 }
78 }))
79 .unwrap()
80 .into(),
81 ))
82 .await;
83
84 while let Some(result) = ws.next().await {
85 match result {
86 Ok(Message::Text(text)) => {
87 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
89 info!("Received runtime message: {}", message);
91 }
92 }
93 Ok(Message::Binary(bin)) => {
94 info!("Received binary message of {} bytes", bin.len());
96 }
97 Ok(Message::Close(_)) => {
98 info!("Runtime WebSocket client disconnected");
99 break;
100 }
101 Err(e) => {
102 error!("WebSocket error: {}", e);
103 break;
104 }
105 _ => {}
106 }
107 }
108}
109
110async fn handle_devtools_socket(mut ws: WebSocket) {
112 info!("DevTools WebSocket client connected");
113
114 while let Some(result) = ws.next().await {
115 match result {
116 Ok(Message::Text(text)) => {
117 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
119 info!("Received DevTools message: {}", message);
120 }
121 }
122 Ok(Message::Close(_)) => {
123 info!("DevTools WebSocket client disconnected");
124 break;
125 }
126 Err(e) => {
127 error!("DevTools WebSocket error: {}", e);
128 break;
129 }
130 _ => {}
131 }
132 }
133}
134
135async fn handle_hotreload_socket(mut ws: WebSocket, state: AppState) {
137 info!("Hot reload WebSocket client connected");
138
139 let mut patch_rx = state.patch_tx.subscribe();
140
141 loop {
142 tokio::select! {
143 Ok(msg) = patch_rx.recv() => {
145 if let Ok(serialized) = serde_json::to_string(&msg) {
146 if let Err(e) = ws.send(Message::Text(serialized.into())).await {
147 error!("Failed to send patch to client: {}", e);
148 break;
149 }
150 }
151 }
152 Some(result) = ws.next() => {
154 match result {
155 Ok(Message::Close(_)) => {
156 info!("Hot reload WebSocket client disconnected");
157 break;
158 }
159 Err(e) => {
160 error!("Hot reload WebSocket error: {}", e);
161 break;
162 }
163 _ => {}
164 }
165 }
166 }
167 }
168}
169
170async fn handle_agent_socket(mut ws: WebSocket) {
172 info!("Agent stream WebSocket client connected");
173
174 while let Some(result) = ws.next().await {
175 match result {
176 Ok(Message::Text(text)) => {
177 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
179 info!("Received agent message: {}", message);
180 }
181 }
182 Ok(Message::Close(_)) => {
183 info!("Agent stream WebSocket client disconnected");
184 break;
185 }
186 Err(e) => {
187 error!("Agent stream WebSocket error: {}", e);
188 break;
189 }
190 _ => {}
191 }
192 }
193}
194
195pub fn create_router(state: AppState) -> Router {
197 Router::new()
198 .route("/ws/runtime", get(runtime_ws))
199 .route("/ws/devtools", get(devtools_ws))
200 .route("/ws/hotreload", get(hotreload_ws))
201 .route("/ws/agent", get(agent_ws))
202 .with_state(state)
203}
204
205pub async fn start_server(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
207 let (tx, _) = broadcast::channel(100);
208 let state = AppState {
209 patch_tx: tx.clone(),
210 };
211
212 let tx_clone = tx.clone();
214 let patch_engine = Arc::new(tokio::sync::Mutex::new(
215 super::patch_engine::PatchEngine::new(),
216 ));
217
218 super::build_pipeline::BuildPipeline::watch_changes(".", move |artifact| {
219 let mut engine = patch_engine.blocking_lock();
220 let patch = engine.generate_patch(artifact);
221 let _ = tx_clone.send(WsMessage::Patch(patch));
222 });
223
224 let app = create_router(state);
225 info!("Starting WebSocket server on {}", addr);
226
227 let listener = tokio::net::TcpListener::bind(addr).await?;
228 axum::serve(listener, app).await?;
229
230 Ok(())
231}