wavecraft_dev_server/
ws_server.rs1use futures_util::{SinkExt, StreamExt};
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::net::{TcpListener, TcpStream};
11use tokio::sync::broadcast;
12use tokio_tungstenite::{accept_async, tungstenite::protocol::Message};
13use tracing::{debug, error, info, warn};
14use wavecraft_bridge::{IpcHandler, ParameterHost};
15
16pub struct WsServer<H: ParameterHost + 'static> {
18 port: u16,
20 handler: Arc<IpcHandler<H>>,
22 shutdown_tx: broadcast::Sender<()>,
24 verbose: bool,
26}
27
28impl<H: ParameterHost + 'static> WsServer<H> {
29 pub fn new(port: u16, handler: Arc<IpcHandler<H>>, verbose: bool) -> Self {
31 let (shutdown_tx, _) = broadcast::channel(1);
32 Self {
33 port,
34 handler,
35 shutdown_tx,
36 verbose,
37 }
38 }
39
40 pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
42 let addr: SocketAddr = format!("127.0.0.1:{}", self.port).parse()?;
43 let listener = TcpListener::bind(&addr).await?;
44
45 info!("Server listening on ws://{}", addr);
46
47 let handler = Arc::clone(&self.handler);
48 let mut shutdown_rx = self.shutdown_tx.subscribe();
49 let verbose = self.verbose;
50
51 tokio::spawn(async move {
52 loop {
53 tokio::select! {
54 result = listener.accept() => {
55 match result {
56 Ok((stream, addr)) => {
57 info!("Client connected: {}", addr);
58 let handler = Arc::clone(&handler);
59 tokio::spawn(handle_connection(handler, stream, addr, verbose));
60 }
61 Err(e) => {
62 error!("Accept error: {}", e);
63 }
64 }
65 }
66 _ = shutdown_rx.recv() => {
67 info!("Server shutting down");
68 break;
69 }
70 }
71 }
72 });
73
74 Ok(())
75 }
76
77 #[allow(dead_code)]
81 pub fn shutdown(&self) {
82 let _ = self.shutdown_tx.send(());
83 }
84}
85
86async fn handle_connection<H: ParameterHost>(
88 handler: Arc<IpcHandler<H>>,
89 stream: TcpStream,
90 addr: SocketAddr,
91 verbose: bool,
92) {
93 let ws_stream = match accept_async(stream).await {
94 Ok(ws) => ws,
95 Err(e) => {
96 error!("Error during handshake with {}: {}", addr, e);
97 return;
98 }
99 };
100
101 info!("WebSocket connection established: {}", addr);
102
103 let (mut write, mut read) = ws_stream.split();
104
105 while let Some(msg) = read.next().await {
106 match msg {
107 Ok(Message::Text(json)) => {
108 if verbose {
110 debug!("Received from {}: {}", addr, json);
111 }
112
113 let response = handler.handle_json(&json);
115
116 if verbose {
118 debug!("Sending to {}: {}", addr, response);
119 }
120
121 if let Err(e) = write.send(Message::Text(response)).await {
123 error!("Error sending response to {}: {}", addr, e);
124 break;
125 }
126 }
127 Ok(Message::Close(_)) => {
128 info!("Client closed connection: {}", addr);
129 break;
130 }
131 Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {
132 }
134 Ok(Message::Binary(_)) => {
135 warn!("Unexpected binary message from {}", addr);
136 }
137 Ok(Message::Frame(_)) => {
138 }
140 Err(e) => {
141 error!("Error receiving from {}: {}", addr, e);
142 break;
143 }
144 }
145 }
146
147 info!("Connection closed: {}", addr);
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::AppState;

    /// A freshly constructed server keeps the port it was given.
    #[tokio::test]
    async fn test_server_creation() {
        let handler = Arc::new(IpcHandler::new(AppState::new()));
        let server = WsServer::new(9001, handler, false);

        assert_eq!(server.port, 9001);
    }
}