waterui_cli/debug/
hot_reload.rs1use std::net::SocketAddr;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::{FutureExt, StreamExt, stream};
11use skyzen::hyper::Hyper;
12use skyzen::routing::{CreateRouteNode, Route, Router};
13use skyzen::websocket::{WebSocketMessage, WebSocketUpgrade};
14use skyzen::{Responder, Server};
15use smol::Task;
16use smol::channel::{self, Receiver, Sender};
17use smol::lock::Mutex;
18use smol::net::TcpListener;
19
20pub const DEFAULT_PORT: u16 = 2006;
22
23pub const PORT_RETRY_COUNT: u16 = 50;
25
26pub const DEBOUNCE_DURATION: Duration = Duration::from_millis(250);
28
29#[derive(Debug, Clone)]
31pub enum BroadcastMessage {
32 Text(String),
34 Binary(Vec<u8>),
36}
37
38#[derive(Debug)]
40pub struct HotReloadServer {
41 port: u16,
42 addr: SocketAddr,
43 broadcast_tx: Sender<BroadcastMessage>,
44 _server_task: Task<()>,
45}
46
47#[derive(Debug, thiserror::Error)]
49pub enum FailToLaunch {
50 #[error("No available port found (tried ports {0}..{1})")]
52 NoAvailablePort(u16, u16),
53
54 #[error("Failed to bind to port {0}: {1}")]
56 BindError(u16, std::io::Error),
57}
58
59struct ServerState {
61 clients: Vec<Sender<BroadcastMessage>>,
63}
64
65impl ServerState {
66 const fn new() -> Self {
67 Self {
68 clients: Vec::new(),
69 }
70 }
71
72 fn add_client(&mut self, sender: Sender<BroadcastMessage>) {
73 self.clients.push(sender);
74 }
75
76 fn broadcast(&mut self, message: &BroadcastMessage) {
77 self.clients
79 .retain(|sender| sender.try_send(message.clone()).is_ok());
80 }
81}
82
83impl HotReloadServer {
84 pub async fn launch(starting_port: u16) -> Result<Self, FailToLaunch> {
91 let end_port = starting_port.saturating_add(PORT_RETRY_COUNT);
92
93 for port in starting_port..end_port {
94 match Self::try_launch_on_port(port).await {
95 Ok(server) => return Ok(server),
96 Err(FailToLaunch::BindError(_, _)) => {}
97 Err(e) => return Err(e),
98 }
99 }
100
101 Err(FailToLaunch::NoAvailablePort(starting_port, end_port))
102 }
103
104 async fn try_launch_on_port(port: u16) -> Result<Self, FailToLaunch> {
106 let addr = SocketAddr::from(([127, 0, 0, 1], port));
107 let listener = TcpListener::bind(addr)
108 .await
109 .map_err(|e| FailToLaunch::BindError(port, e))?;
110
111 let actual_addr = listener
112 .local_addr()
113 .map_err(|e| FailToLaunch::BindError(port, e))?;
114
115 let (broadcast_tx, broadcast_rx) = channel::unbounded::<BroadcastMessage>();
117
118 let state = Arc::new(Mutex::new(ServerState::new()));
120
121 let state_for_broadcast = state.clone();
123 let broadcast_task = smol::spawn(async move {
124 while let Ok(message) = broadcast_rx.recv().await {
125 let mut state = state_for_broadcast.lock().await;
126 state.broadcast(&message);
127 }
128 });
129
130 let router = build_router(state);
132
133 let connections = Box::pin(stream::unfold(listener, |listener| async move {
135 let result = listener.accept().await;
136 Some((result.map(|(stream, _addr)| stream), listener))
137 }));
138
139 let server_task = smol::spawn(async move {
141 let executor: &'static smol::Executor<'static> =
143 Box::leak(Box::new(smol::Executor::new()));
144
145 futures::future::join(
148 executor.run(std::future::pending::<()>()),
150 Hyper.serve(
152 executor,
153 |err| tracing::warn!("Hot reload connection error: {err}"),
154 connections,
155 router,
156 ),
157 )
158 .await;
159
160 drop(broadcast_task);
161 });
162
163 Ok(Self {
164 port: actual_addr.port(),
165 addr: actual_addr,
166 broadcast_tx,
167 _server_task: server_task,
168 })
169 }
170
171 #[must_use]
173 pub const fn port(&self) -> u16 {
174 self.port
175 }
176
177 #[must_use]
179 pub const fn addr(&self) -> SocketAddr {
180 self.addr
181 }
182
183 #[must_use]
185 pub fn host(&self) -> String {
186 self.addr.ip().to_string()
187 }
188
189 pub fn send_building(&self) {
193 let _ = self
194 .broadcast_tx
195 .try_send(BroadcastMessage::Text("building".to_string()));
196 }
197
198 pub fn send_library(&self, data: Vec<u8>) {
202 let _ = self.broadcast_tx.try_send(BroadcastMessage::Binary(data));
203 }
204
205 pub async fn send_library_file(&self, path: &PathBuf) -> std::io::Result<()> {
212 let data = smol::fs::read(path).await?;
213 self.send_library(data);
214 Ok(())
215 }
216
217 pub(crate) fn broadcast_sender(&self) -> Sender<BroadcastMessage> {
219 self.broadcast_tx.clone()
220 }
221}
222
223fn build_router(state: Arc<Mutex<ServerState>>) -> Router {
225 Route::new("/".at(move |ws: WebSocketUpgrade| {
226 let ws = ws.max_message_size(None);
227 let state = state.clone();
228 async move { handle_websocket(ws, state) }
229 }))
230 .build()
231}
232
233fn handle_websocket(upgrade: WebSocketUpgrade, state: Arc<Mutex<ServerState>>) -> impl Responder {
235 upgrade.on_upgrade(move |mut socket| async move {
236 tracing::info!("Hot reload client connected");
237
238 let (client_tx, client_rx) = channel::unbounded::<BroadcastMessage>();
240
241 {
243 let mut state = state.lock().await;
244 state.add_client(client_tx);
245 }
246
247 tracing::debug!("Hot reload client registered, entering event loop");
248
249 loop {
251 futures::select! {
252 message = client_rx.recv().fuse() => {
254 match message {
255 Ok(BroadcastMessage::Text(text)) => {
256 tracing::debug!("Sending text message to client: {text}");
257 if let Err(e) = socket.send_text(text).await {
258 tracing::warn!("Failed to send text to client: {e}");
259 break;
260 }
261 }
262 Ok(BroadcastMessage::Binary(data)) => {
263 tracing::debug!("Sending {} bytes to client", data.len());
264 if let Err(e) = socket.send_binary(data).await {
265 tracing::warn!("Failed to send binary to client: {e}");
266 break;
267 }
268 }
269 Err(e) => {
270 tracing::debug!("Client channel closed: {e}");
271 break;
272 }
273 }
274 }
275 msg = socket.next().fuse() => {
277 match msg {
278 Some(Ok(WebSocketMessage::Close)) => {
279 tracing::debug!("Client sent close frame");
280 break;
281 }
282 Some(Err(e)) => {
283 tracing::debug!("WebSocket error: {e}");
284 break;
285 }
286 None => {
287 tracing::debug!("WebSocket stream ended");
288 break;
289 }
290 Some(Ok(WebSocketMessage::Ping(data))) => {
291 tracing::debug!("Received ping, sending pong");
292 if socket.send_pong(data).await.is_err() {
293 break;
294 }
295 }
296 Some(Ok(msg)) => {
297 tracing::debug!("Received message: {msg:?}");
298 }
299 }
300 }
301 }
302 }
303
304 tracing::info!("Hot reload client disconnected");
305 })
306}
307
308#[derive(Debug)]
310pub struct BuildManager {
311 current_build: Option<Task<Result<PathBuf, crate::build::RustBuildError>>>,
313 debounce_task: Option<Task<()>>,
315 debounce_rx: Option<Receiver<()>>,
317}
318
319impl BuildManager {
320 #[must_use]
322 pub const fn new() -> Self {
323 Self {
324 current_build: None,
325 debounce_task: None,
326 debounce_rx: None,
327 }
328 }
329
330 pub fn request_rebuild(&mut self) {
335 self.current_build.take();
337
338 self.debounce_task.take();
340 self.debounce_rx.take();
341
342 let (tx, rx) = channel::bounded(1);
344 self.debounce_task = Some(smol::spawn(async move {
345 smol::Timer::after(DEBOUNCE_DURATION).await;
346 let _ = tx.send(()).await;
347 }));
348 self.debounce_rx = Some(rx);
349 }
350
351 pub fn should_start_build(&mut self) -> bool {
355 if let Some(rx) = &self.debounce_rx {
356 if rx.try_recv().is_ok() {
357 self.debounce_task.take();
358 self.debounce_rx.take();
359 return true;
360 }
361 }
362 false
363 }
364
365 pub fn start_build(&mut self, rust_build: crate::build::RustBuild) {
367 self.current_build = Some(smol::spawn(async move { rust_build.dev_build().await }));
368 }
369
370 pub async fn poll_build(&mut self) -> Option<Result<PathBuf, crate::build::RustBuildError>> {
374 if let Some(task) = &self.current_build {
375 if task.is_finished() {
377 if let Some(task) = self.current_build.take() {
378 return Some(task.await);
379 }
380 }
381 }
382 None
383 }
384
385 #[must_use]
387 pub const fn is_building(&self) -> bool {
388 self.current_build.is_some()
389 }
390
391 #[must_use]
393 pub const fn is_debouncing(&self) -> bool {
394 self.debounce_rx.is_some()
395 }
396}
397
398impl Default for BuildManager {
399 fn default() -> Self {
400 Self::new()
401 }
402}