Skip to main content

cvkg_cli/
ws_server.rs

1//! WebSocket Server
2//! Multiplexed WebSocket server for runtime communication, DevTools, hot reload, and agent streams
3
4use axum::{
5    Router,
6    extract::ws::{Message, WebSocket, WebSocketUpgrade},
7    response::IntoResponse,
8    routing::get,
9};
10use futures_util::StreamExt;
11use std::net::SocketAddr;
12use tracing::{error, info};
13
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use tokio::sync::broadcast;
17
18/// Shared application state for the WebSocket server
19#[derive(Clone)]
20pub struct AppState {
21    pub patch_tx: broadcast::Sender<WsMessage>,
22}
23
24/// WebSocket message types
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum WsMessage {
27    Patch(super::patch_engine::RuntimePatch),
28    State(super::dev_runtime::RuntimeStateSnapshot),
29    Event(super::dev_runtime::RuntimeEvent),
30    Devtools(serde_json::Value),
31}
32
33/// WebSocket handler for runtime communication
34async fn runtime_ws(
35    axum::extract::State(_state): axum::extract::State<AppState>,
36    ws: WebSocketUpgrade,
37) -> impl IntoResponse {
38    ws.on_upgrade(handle_runtime_socket)
39}
40
41/// WebSocket handler for DevTools
42async fn devtools_ws(
43    axum::extract::State(_state): axum::extract::State<AppState>,
44    ws: WebSocketUpgrade,
45) -> impl IntoResponse {
46    ws.on_upgrade(handle_devtools_socket)
47}
48
49/// WebSocket handler for hot reload
50async fn hotreload_ws(
51    axum::extract::State(state): axum::extract::State<AppState>,
52    ws: WebSocketUpgrade,
53) -> impl IntoResponse {
54    ws.on_upgrade(move |socket| handle_hotreload_socket(socket, state))
55}
56
57/// WebSocket handler for agent streams
58async fn agent_ws(
59    axum::extract::State(_state): axum::extract::State<AppState>,
60    ws: WebSocketUpgrade,
61) -> impl IntoResponse {
62    ws.on_upgrade(handle_agent_socket)
63}
64
65/// Handle runtime WebSocket connection
66async fn handle_runtime_socket(mut ws: WebSocket) {
67    info!("Runtime WebSocket client connected");
68
69    // Send initial handshake
70    let _ = ws
71        .send(Message::Text(
72            serde_json::to_string(&serde_json::json!({
73                "type": "handshake",
74                "payload": {
75                    "client": "runtime",
76                    "capabilities": ["patch", "state", "event"]
77                }
78            }))
79            .unwrap()
80            .into(),
81        ))
82        .await;
83
84    while let Some(result) = ws.next().await {
85        match result {
86            Ok(Message::Text(text)) => {
87                // Handle incoming runtime messages
88                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
89                    // Process runtime messages
90                    info!("Received runtime message: {}", message);
91                }
92            }
93            Ok(Message::Binary(bin)) => {
94                // Handle binary messages if needed
95                info!("Received binary message of {} bytes", bin.len());
96            }
97            Ok(Message::Close(_)) => {
98                info!("Runtime WebSocket client disconnected");
99                break;
100            }
101            Err(e) => {
102                error!("WebSocket error: {}", e);
103                break;
104            }
105            _ => {}
106        }
107    }
108}
109
110/// Handle DevTools WebSocket connection
111async fn handle_devtools_socket(mut ws: WebSocket) {
112    info!("DevTools WebSocket client connected");
113
114    while let Some(result) = ws.next().await {
115        match result {
116            Ok(Message::Text(text)) => {
117                // Handle DevTools messages
118                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
119                    info!("Received DevTools message: {}", message);
120                }
121            }
122            Ok(Message::Close(_)) => {
123                info!("DevTools WebSocket client disconnected");
124                break;
125            }
126            Err(e) => {
127                error!("DevTools WebSocket error: {}", e);
128                break;
129            }
130            _ => {}
131        }
132    }
133}
134
135/// Handle hot reload WebSocket connection
136async fn handle_hotreload_socket(mut ws: WebSocket, state: AppState) {
137    info!("Hot reload WebSocket client connected");
138
139    let mut patch_rx = state.patch_tx.subscribe();
140
141    loop {
142        tokio::select! {
143            // Listen for broadcasted patches from the build pipeline
144            Ok(msg) = patch_rx.recv() => {
145                if let Ok(serialized) = serde_json::to_string(&msg) {
146                    if let Err(e) = ws.send(Message::Text(serialized.into())).await {
147                        error!("Failed to send patch to client: {}", e);
148                        break;
149                    }
150                }
151            }
152            // Listen for client messages or disconnects
153            Some(result) = ws.next() => {
154                match result {
155                    Ok(Message::Close(_)) => {
156                        info!("Hot reload WebSocket client disconnected");
157                        break;
158                    }
159                    Err(e) => {
160                        error!("Hot reload WebSocket error: {}", e);
161                        break;
162                    }
163                    _ => {}
164                }
165            }
166        }
167    }
168}
169
170/// Handle agent stream WebSocket connection
171async fn handle_agent_socket(mut ws: WebSocket) {
172    info!("Agent stream WebSocket client connected");
173
174    while let Some(result) = ws.next().await {
175        match result {
176            Ok(Message::Text(text)) => {
177                // Handle agent stream messages
178                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
179                    info!("Received agent message: {}", message);
180                }
181            }
182            Ok(Message::Close(_)) => {
183                info!("Agent stream WebSocket client disconnected");
184                break;
185            }
186            Err(e) => {
187                error!("Agent stream WebSocket error: {}", e);
188                break;
189            }
190            _ => {}
191        }
192    }
193}
194
195/// Create the WebSocket router with all endpoints
196pub fn create_router(state: AppState) -> Router {
197    Router::new()
198        .route("/ws/runtime", get(runtime_ws))
199        .route("/ws/devtools", get(devtools_ws))
200        .route("/ws/hotreload", get(hotreload_ws))
201        .route("/ws/agent", get(agent_ws))
202        .with_state(state)
203}
204
205/// Start the WebSocket server
206pub async fn start_server(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
207    let (tx, _) = broadcast::channel(100);
208    let state = AppState {
209        patch_tx: tx.clone(),
210    };
211
212    // Start the build pipeline watcher
213    let tx_clone = tx.clone();
214    let patch_engine = Arc::new(tokio::sync::Mutex::new(
215        super::patch_engine::PatchEngine::new(),
216    ));
217
218    super::build_pipeline::BuildPipeline::watch_changes(".", move |artifact| {
219        let mut engine = patch_engine.blocking_lock();
220        let patch = engine.generate_patch(artifact);
221        let _ = tx_clone.send(WsMessage::Patch(patch));
222    });
223
224    let app = create_router(state);
225    info!("Starting WebSocket server on {}", addr);
226
227    let listener = tokio::net::TcpListener::bind(addr).await?;
228    axum::serve(listener, app).await?;
229
230    Ok(())
231}