use axum::{
Router,
extract::State,
extract::ws::{Message, WebSocket, WebSocketUpgrade},
response::IntoResponse,
routing::get,
};
use futures_util::StreamExt;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use crate::patch_engine::{PatchEngine, RuntimePatch};
#[derive(Clone)]
pub struct AppState {
pub patch_tx: broadcast::Sender<WsMessage>,
pub patch_engine: Arc<std::sync::Mutex<PatchEngine>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsMessage {
Patch(RuntimePatch),
State(crate::dev_runtime::RuntimeStateSnapshot),
Event(crate::dev_runtime::RuntimeEvent),
Devtools(DevtoolsMessage),
Handshake {
client: String,
capabilities: Vec<String>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DevtoolsMessage {
Command(DevtoolsCommand),
Response(serde_json::Value),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "command", rename_all = "snake_case")]
pub enum DevtoolsCommand {
QueryMetrics,
ToggleOverlay { show: bool },
QueryGraph,
QueryAccessibility {
path: String,
},
Ping,
}
async fn runtime_ws(State(state): State<AppState>, ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_runtime_socket(socket, state))
}
async fn devtools_ws(State(state): State<AppState>, ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_devtools_socket(socket, state))
}
async fn hotreload_ws(State(state): State<AppState>, ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_hotreload_socket(socket, state))
}
async fn agent_ws(State(state): State<AppState>, ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_agent_socket(socket, state))
}
async fn send_ws(ws: &mut WebSocket, msg: &WsMessage) {
match serde_json::to_string(msg) {
Ok(json) => {
if let Err(e) = ws.send(Message::Text(json.into())).await {
error!("Failed to send WS message: {}", e);
}
}
Err(e) => error!("Failed to serialize WS message: {}", e),
}
}
async fn handle_runtime_socket(mut ws: WebSocket, state: AppState) {
info!("Runtime WebSocket client connected");
send_ws(
&mut ws,
&WsMessage::Handshake {
client: "runtime".to_string(),
capabilities: vec!["patch".into(), "state".into(), "event".into()],
},
)
.await;
while let Some(result) = ws.next().await {
match result {
Ok(Message::Text(text)) => {
match serde_json::from_str::<WsMessage>(&text) {
Ok(WsMessage::Patch(patch)) => {
info!(
"Runtime patch received: {:?}",
std::mem::discriminant(&patch)
);
let _ = state.patch_tx.send(WsMessage::Patch(patch));
}
Ok(WsMessage::Event(event)) => {
info!("Runtime event received: {:?}", event);
let _ = state.patch_tx.send(WsMessage::Event(event));
}
Ok(WsMessage::State(_snapshot)) => {
info!("Runtime state snapshot received");
}
Ok(other) => {
warn!("Unexpected message type on runtime WS: {:?}", other);
}
Err(e) => {
warn!("Failed to parse runtime message: {}", e);
}
}
}
Ok(Message::Binary(bin)) => {
info!(
"Received binary message of {} bytes on runtime WS",
bin.len()
);
}
Ok(Message::Close(_)) => {
info!("Runtime WebSocket client disconnected");
break;
}
Err(e) => {
error!("Runtime WebSocket error: {}", e);
break;
}
_ => {}
}
}
}
async fn handle_devtools_socket(mut ws: WebSocket, _state: AppState) {
info!("DevTools WebSocket client connected");
send_ws(
&mut ws,
&WsMessage::Handshake {
client: "devtools".to_string(),
capabilities: vec!["metrics".into(), "overlay".into(), "graph".into()],
},
)
.await;
while let Some(result) = ws.next().await {
match result {
Ok(Message::Text(text)) => {
match serde_json::from_str::<DevtoolsCommand>(&text) {
Ok(DevtoolsCommand::QueryMetrics) => {
let metrics = crate::devtools::capture_metrics();
let response = serde_json::json!({
"type": "metrics",
"fps": metrics.fps,
"frame_time_ms": metrics.frame_time_ms,
"node_count": metrics.node_count,
"edge_count": metrics.edge_count,
"gpu_memory_mb": metrics.gpu_memory_mb,
});
send_ws(
&mut ws,
&WsMessage::Devtools(DevtoolsMessage::Response(response)),
)
.await;
}
Ok(DevtoolsCommand::ToggleOverlay { show }) => {
info!("DevTools overlay toggled: {}", show);
let response = serde_json::json!({
"type": "overlay_toggled",
"show": show,
});
send_ws(
&mut ws,
&WsMessage::Devtools(DevtoolsMessage::Response(response)),
)
.await;
}
Ok(DevtoolsCommand::QueryGraph) => {
let response = serde_json::json!({
"type": "graph",
"nodes": [],
"edges": [],
});
send_ws(
&mut ws,
&WsMessage::Devtools(DevtoolsMessage::Response(response)),
)
.await;
}
Ok(DevtoolsCommand::QueryAccessibility { path }) => {
let response = serde_json::json!({
"type": "accessibility",
"path": path,
"properties": {
"role": "button",
"label": "Sample Button",
"description": None::<String>,
"disabled": false,
"checked": None::<bool>,
"expanded": None::<bool>,
"hidden": false,
"shortcut": None::<String>,
},
});
send_ws(
&mut ws,
&WsMessage::Devtools(DevtoolsMessage::Response(response)),
)
.await;
}
Ok(DevtoolsCommand::Ping) => {
let response = serde_json::json!({ "type": "pong" });
send_ws(
&mut ws,
&WsMessage::Devtools(DevtoolsMessage::Response(response)),
)
.await;
}
Err(e) => {
warn!("Failed to parse DevTools message: {}", e);
let error = serde_json::json!({
"type": "error",
"message": format!("Invalid command: {}", e),
});
send_ws(
&mut ws,
&WsMessage::Devtools(DevtoolsMessage::Response(error)),
)
.await;
}
}
}
Ok(Message::Close(_)) => {
info!("DevTools WebSocket client disconnected");
break;
}
Err(e) => {
error!("DevTools WebSocket error: {}", e);
break;
}
_ => {}
}
}
}
async fn handle_hotreload_socket(mut ws: WebSocket, state: AppState) {
info!("Hot reload WebSocket client connected");
let mut patch_rx = state.patch_tx.subscribe();
send_ws(
&mut ws,
&WsMessage::Handshake {
client: "hotreload".to_string(),
capabilities: vec!["patch".into()],
},
)
.await;
loop {
tokio::select! {
Ok(msg) = patch_rx.recv() => {
send_ws(&mut ws, &msg).await;
}
Some(result) = ws.next() => {
match result {
Ok(Message::Close(_)) => {
info!("Hot reload WebSocket client disconnected");
break;
}
Err(e) => {
error!("Hot reload WebSocket error: {}", e);
break;
}
_ => {}
}
}
}
}
}
async fn handle_agent_socket(mut ws: WebSocket, state: AppState) {
info!("Agent stream WebSocket client connected");
send_ws(
&mut ws,
&WsMessage::Handshake {
client: "agent".to_string(),
capabilities: vec!["event".into()],
},
)
.await;
while let Some(result) = ws.next().await {
match result {
Ok(Message::Text(text)) => {
match serde_json::from_str::<crate::dev_runtime::AgentEvent>(&text) {
Ok(event) => {
let runtime_event = crate::dev_runtime::RuntimeEvent::Agent(event);
let _ = state.patch_tx.send(WsMessage::Event(runtime_event));
}
Err(e) => {
match serde_json::from_str::<crate::dev_runtime::RuntimeEvent>(&text) {
Ok(event) => {
let _ = state.patch_tx.send(WsMessage::Event(event));
}
Err(e2) => {
warn!(
"Failed to parse agent message as AgentEvent ({}) or RuntimeEvent ({})",
e, e2
);
}
}
}
}
}
Ok(Message::Close(_)) => {
info!("Agent stream WebSocket client disconnected");
break;
}
Err(e) => {
error!("Agent stream WebSocket error: {}", e);
break;
}
_ => {}
}
}
}
pub fn create_router(state: AppState) -> Router {
Router::new()
.route("/ws/runtime", get(runtime_ws))
.route("/ws/devtools", get(devtools_ws))
.route("/ws/hotreload", get(hotreload_ws))
.route("/ws/agent", get(agent_ws))
.route("/health", get(|| async { "OK" }))
.route("/", get(serve_shell))
.layer(tower_http::trace::TraceLayer::new_for_http())
.with_state(state)
}
async fn serve_shell() -> impl IntoResponse {
axum::response::Html(
r#"<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>CVKG Dev Server</title>
<style>
body { margin: 0; background: #0b0b14; color: #c0c0c8; font-family: 'JetBrains Mono', monospace; display: flex; align-items: center; justify-content: center; height: 100vh; }
.status { text-align: center; }
.status h1 { font-size: 24px; color: #00cccc; margin-bottom: 8px; }
.status p { font-size: 14px; color: #6a6a8a; }
.status .indicator { display: inline-block; width: 8px; height: 8px; border-radius: 50%; background: #4a8a4a; margin-right: 6px; }
</style>
</head>
<body>
<div class="status">
<h1>⚡ CVKG Dev Server</h1>
<p><span class="indicator"></span>Connected — WebSocket hot reload active</p>
<p style="margin-top: 16px; font-size: 12px;">Waiting for changes...</p>
</div>
</body>
</html>"#,
)
}
const HOT_RELOAD_STATE_PATH: &str = ".cvkg/hot_reload_state.json";
pub type DashboardState = Arc<std::sync::Mutex<crate::devtools_dashboard::GraphState>>;
pub fn start_file_watcher(
path: &str,
patch_engine: Arc<std::sync::Mutex<crate::patch_engine::PatchEngine>>,
) -> broadcast::Sender<WsMessage> {
use crate::build_pipeline::BuildPipeline;
let (tx, _) = broadcast::channel(100);
let tx_clone = tx.clone();
let patch_engine = Arc::clone(&patch_engine);
let _ = std::fs::create_dir_all(".cvkg");
BuildPipeline::watch_changes(path, move |artifact| {
if let Some(ds) = crate::devtools_dashboard::dashboard_state() {
let guard = ds.lock().unwrap_or_else(|e| e.into_inner());
crate::devtools::update_metrics(crate::devtools::PerfMetrics {
frame_time_ms: guard.frame_time_ms,
fps: if guard.frame_time_ms > 0.0 {
1000.0 / guard.frame_time_ms
} else {
0.0
},
node_count: guard.nodes.len(),
edge_count: guard.edges.len(),
gpu_memory_mb: guard.gpu_memory_mb,
});
}
let state = crate::dev_runtime::HotReloadState {
theme_mode: "dark".to_string(),
window_size: (1200.0, 800.0),
scroll_positions: std::collections::HashMap::new(),
input_text: std::collections::HashMap::new(),
expanded_nodes: std::collections::HashMap::new(),
saved_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64(),
};
if let Err(e) = state.save(std::path::Path::new(HOT_RELOAD_STATE_PATH)) {
warn!("Failed to save hot-reload state: {}", e);
}
let mut engine = match patch_engine.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
let patch = engine.generate_patch(artifact);
let _ = tx_clone.send(WsMessage::Patch(patch));
});
if std::path::Path::new(HOT_RELOAD_STATE_PATH).exists() {
match crate::dev_runtime::HotReloadState::load(std::path::Path::new(HOT_RELOAD_STATE_PATH))
{
Ok(state) => {
info!(
"Loaded hot-reload state from {} (theme: {}, saved_at: {})",
HOT_RELOAD_STATE_PATH, state.theme_mode, state.saved_at
);
}
Err(e) => {
debug!("No previous hot-reload state found: {}", e);
}
}
}
tx
}
pub async fn start_server(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
let patch_engine = Arc::new(std::sync::Mutex::new(PatchEngine::new()));
let patch_tx = start_file_watcher(".", Arc::clone(&patch_engine));
let state = AppState {
patch_tx: patch_tx.clone(),
patch_engine: Arc::clone(&patch_engine),
};
let app = create_router(state);
info!("Starting WebSocket server on {} (Ctrl+C to stop)", addr);
let animation_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(16)); let mut solver = cvkg_anim::SpringSolver::new(cvkg_anim::SpringParams::default(), 0.0, 0.0);
let mut physics_world =
cvkg_physics::PhysicsWorld::new(cvkg_physics::WorldConfig::default());
loop {
interval.tick().await;
let dt = 0.016;
let _value = solver.tick(dt);
physics_world.step(dt);
}
});
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
animation_handle.abort();
info!("CVKG dev server shut down gracefully.");
Ok(())
}
async fn shutdown_signal() {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
info!("Ctrl+C received, shutting down gracefully...");
},
_ = terminate => {
info!("SIGTERM received, shutting down gracefully...");
},
}
}