Skip to main content

cvkg_cli/
ws_server.rs

1//! WebSocket Server
2//! Multiplexed WebSocket server for runtime communication, DevTools, hot reload, and agent streams
3
4use axum::{
5    extract::ws::{WebSocketUpgrade, WebSocket, Message},
6    response::IntoResponse,   routing::get,
7    Router,
8};
9use futures_util::StreamExt;
10use std::net::SocketAddr;
11use tracing::{info, error};
12
13use serde::{Deserialize, Serialize};
14
15/// WebSocket message types
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub enum WsMessage {
18    Patch(super::patch_engine::RuntimePatch),
19    State(super::dev_runtime::RuntimeStateSnapshot),
20    Event(super::dev_runtime::RuntimeEvent),
21    Devtools(serde_json::Value),
22}
23
24/// WebSocket handler for runtime communication
25async fn runtime_ws(ws: WebSocketUpgrade) -> impl IntoResponse {
26    ws.on_upgrade(handle_runtime_socket)
27}
28
29/// WebSocket handler for DevTools
30async fn devtools_ws(ws: WebSocketUpgrade) -> impl IntoResponse {
31    ws.on_upgrade(handle_devtools_socket)
32}
33
34/// WebSocket handler for hot reload
35async fn hotreload_ws(ws: WebSocketUpgrade) -> impl IntoResponse {
36    ws.on_upgrade(handle_hotreload_socket)
37}
38
39/// WebSocket handler for agent streams
40async fn agent_ws(ws: WebSocketUpgrade) -> impl IntoResponse {
41    ws.on_upgrade(handle_agent_socket)
42}
43
44/// Handle runtime WebSocket connection
45async fn handle_runtime_socket(mut ws: WebSocket) {
46    info!("Runtime WebSocket client connected");
47    
48    // Send initial handshake
49    let _ = ws
50        .send(Message::Text(
51            serde_json::to_string(&serde_json::json!({
52                "type": "handshake",
53                "payload": {
54                    "client": "runtime",
55                    "capabilities": ["patch", "state", "event"]
56                }
57            })).unwrap().into(),
58        ))
59        .await;
60    
61    while let Some(result) = ws.next().await {
62        match result {
63            Ok(Message::Text(text)) => {
64                // Handle incoming runtime messages
65                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
66                    // Process runtime messages
67                    info!("Received runtime message: {}", message);
68                }
69            }
70            Ok(Message::Binary(bin)) => {
71                // Handle binary messages if needed
72                info!("Received binary message of {} bytes", bin.len());
73            }
74            Ok(Message::Close(_)) => {
75                info!("Runtime WebSocket client disconnected");
76                break;
77            }
78            Err(e) => {
79                error!("WebSocket error: {}", e);
80                break;
81            }
82            _ => {}
83        }
84    }
85}
86
87/// Handle DevTools WebSocket connection
88async fn handle_devtools_socket(mut ws: WebSocket) {
89    info!("DevTools WebSocket client connected");
90    
91    while let Some(result) = ws.next().await {
92        match result {
93            Ok(Message::Text(text)) => {
94                // Handle DevTools messages
95                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
96                    info!("Received DevTools message: {}", message);
97                }
98            }
99            Ok(Message::Close(_)) => {
100                info!("DevTools WebSocket client disconnected");
101                break;
102            }
103            Err(e) => {
104                error!("DevTools WebSocket error: {}", e);
105                break;
106            }
107            _ => {}
108        }
109    }
110}
111
112/// Handle hot reload WebSocket connection
113async fn handle_hotreload_socket(mut ws: WebSocket) {
114    info!("Hot reload WebSocket client connected");
115    
116    while let Some(result) = ws.next().await {
117        match result {
118            Ok(Message::Text(text)) => {
119                // Handle hot reload messages (patch delivery)
120                if let Ok(message) = serde_json::from_str::<WsMessage>(&text) {
121                    if let WsMessage::Patch(patch) = message {
122                        info!("Received patch for hot reload: {:?}", patch);
123                        // TODO: Apply patch to runtime
124                    }
125                }
126            }
127            Ok(Message::Close(_)) => {
128                info!("Hot reload WebSocket client disconnected");
129                break;
130            }
131            Err(e) => {
132                error!("Hot reload WebSocket error: {}", e);
133                break;
134            }
135            _ => {}
136        }
137    }
138}
139
140/// Handle agent stream WebSocket connection
141async fn handle_agent_socket(mut ws: WebSocket) {
142    info!("Agent stream WebSocket client connected");
143    
144    while let Some(result) = ws.next().await {
145        match result {
146            Ok(Message::Text(text)) => {
147                // Handle agent stream messages
148                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
149                    info!("Received agent message: {}", message);
150                }
151            }
152            Ok(Message::Close(_)) => {
153                info!("Agent stream WebSocket client disconnected");
154                break;
155            }
156            Err(e) => {
157                error!("Agent stream WebSocket error: {}", e);
158                break;
159            }
160            _ => {}
161        }
162    }
163}
164
165/// Create the WebSocket router with all endpoints
166pub fn create_router() -> Router {
167    Router::new()
168        .route("/ws/runtime", get(runtime_ws))
169        .route("/ws/devtools", get(devtools_ws))
170        .route("/ws/hotreload", get(hotreload_ws))
171        .route("/ws/agent", get(agent_ws))
172}
173
174/// Start the WebSocket server
175pub async fn start_server(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
176    let app = create_router();
177    info!("Starting WebSocket server on {}", addr);
178    
179    let listener = tokio::net::TcpListener::bind(addr).await?;
180    axum::serve(listener, app).await?;
181    
182    Ok(())
183}