waterui_cli/debug/
hot_reload.rs

//! Hot reload server for the `WaterUI` CLI.
//!
//! Provides a WebSocket server that broadcasts dylib updates to connected apps,
//! plus a `BuildManager` that debounces and tracks rebuilds.
4
use std::net::SocketAddr;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use futures::{FutureExt, StreamExt, stream};
use skyzen::hyper::Hyper;
use skyzen::routing::{CreateRouteNode, Route, Router};
use skyzen::websocket::{WebSocketMessage, WebSocketUpgrade};
use skyzen::{Responder, Server};
use smol::Task;
use smol::channel::{self, Receiver, Sender};
use smol::lock::Mutex;
use smol::net::TcpListener;
19
/// Default starting port for hot reload server.
///
/// Presumably what callers pass to [`HotReloadServer::launch`] as
/// `starting_port` — confirm at the call sites.
pub const DEFAULT_PORT: u16 = 2006;

/// Number of ports to try before giving up.
///
/// [`HotReloadServer::launch`] adds this to its starting port to bound the
/// scan of consecutive ports.
pub const PORT_RETRY_COUNT: u16 = 50;

/// Debounce duration for file changes before triggering a rebuild.
///
/// [`BuildManager::request_rebuild`] arms a timer of this length; every new
/// request replaces the timer, so the signal only fires after this much quiet.
pub const DEBOUNCE_DURATION: Duration = Duration::from_millis(250);
28
/// Message types that can be broadcast to connected clients.
///
/// Each variant maps onto one WebSocket frame kind when the per-client
/// handler forwards it.
#[derive(Debug, Clone)]
pub enum BroadcastMessage {
    /// Text message (e.g., "building" status); forwarded as a text frame.
    Text(String),
    /// Binary message (e.g., compiled library); forwarded as a binary frame.
    Binary(Vec<u8>),
}
37
/// Hot reload server that broadcasts dylib updates to connected apps.
///
/// Created with [`HotReloadServer::launch`]. Messages passed to the `send_*`
/// methods are queued on an internal channel and forwarded to every registered
/// WebSocket client by a background task.
#[derive(Debug)]
pub struct HotReloadServer {
    /// Port reported by the bound listener.
    port: u16,
    /// Full socket address reported by the bound listener.
    addr: SocketAddr,
    /// Sending half of the broadcast queue drained by the background task.
    broadcast_tx: Sender<BroadcastMessage>,
    /// Handle of the serving task. Never read; it is stored so the task lives
    /// as long as the server (smol cancels a task when its handle is dropped).
    _server_task: Task<()>,
}
46
/// Errors that can occur when launching the hot reload server.
#[derive(Debug, thiserror::Error)]
pub enum FailToLaunch {
    /// No available port found after trying all candidates.
    ///
    /// Carries the first port tried and the scan's upper bound (the starting
    /// port plus `PORT_RETRY_COUNT`, saturating at `u16::MAX`).
    #[error("No available port found (tried ports {0}..{1})")]
    NoAvailablePort(u16, u16),

    /// Failed to bind to a specific port.
    ///
    /// Carries the port and the underlying I/O error. `HotReloadServer::launch`
    /// treats this variant as "port busy" and moves on to the next candidate.
    #[error("Failed to bind to port {0}: {1}")]
    BindError(u16, std::io::Error),
}
58
/// Shared state for managing connected WebSocket clients.
///
/// Wrapped in `Arc<Mutex<_>>` by the server so that the broadcast task and
/// the per-connection handlers can both reach it.
struct ServerState {
    /// Senders to all connected clients.
    ///
    /// One entry per registered connection; entries are pruned lazily during
    /// `broadcast` when sending to them fails.
    clients: Vec<Sender<BroadcastMessage>>,
}
64
65impl ServerState {
66    const fn new() -> Self {
67        Self {
68            clients: Vec::new(),
69        }
70    }
71
72    fn add_client(&mut self, sender: Sender<BroadcastMessage>) {
73        self.clients.push(sender);
74    }
75
76    fn broadcast(&mut self, message: &BroadcastMessage) {
77        // Remove disconnected clients and send to remaining ones
78        self.clients
79            .retain(|sender| sender.try_send(message.clone()).is_ok());
80    }
81}
82
83impl HotReloadServer {
84    /// Launch the hot reload server, trying ports starting from `starting_port`.
85    ///
86    /// Will try up to `PORT_RETRY_COUNT` consecutive ports if the initial port is busy.
87    ///
88    /// # Errors
89    /// Returns `FailToLaunch::NoAvailablePort` if no port could be bound.
90    pub async fn launch(starting_port: u16) -> Result<Self, FailToLaunch> {
91        let end_port = starting_port.saturating_add(PORT_RETRY_COUNT);
92
93        for port in starting_port..end_port {
94            match Self::try_launch_on_port(port).await {
95                Ok(server) => return Ok(server),
96                Err(FailToLaunch::BindError(_, _)) => {}
97                Err(e) => return Err(e),
98            }
99        }
100
101        Err(FailToLaunch::NoAvailablePort(starting_port, end_port))
102    }
103
104    /// Try to launch the server on a specific port.
105    async fn try_launch_on_port(port: u16) -> Result<Self, FailToLaunch> {
106        let addr = SocketAddr::from(([127, 0, 0, 1], port));
107        let listener = TcpListener::bind(addr)
108            .await
109            .map_err(|e| FailToLaunch::BindError(port, e))?;
110
111        let actual_addr = listener
112            .local_addr()
113            .map_err(|e| FailToLaunch::BindError(port, e))?;
114
115        // Channel for broadcasting messages to the server task
116        let (broadcast_tx, broadcast_rx) = channel::unbounded::<BroadcastMessage>();
117
118        // Shared state for managing clients
119        let state = Arc::new(Mutex::new(ServerState::new()));
120
121        // Spawn background task to handle broadcasts
122        let state_for_broadcast = state.clone();
123        let broadcast_task = smol::spawn(async move {
124            while let Ok(message) = broadcast_rx.recv().await {
125                let mut state = state_for_broadcast.lock().await;
126                state.broadcast(&message);
127            }
128        });
129
130        // Build the router with WebSocket endpoint
131        let router = build_router(state);
132
133        // Convert TcpListener to an owned Stream of connections
134        let connections = Box::pin(stream::unfold(listener, |listener| async move {
135            let result = listener.accept().await;
136            Some((result.map(|(stream, _addr)| stream), listener))
137        }));
138
139        // Spawn the server task using smol's global executor
140        let server_task = smol::spawn(async move {
141            // Create a static executor for the server (leaked to satisfy 'static requirement)
142            let executor: &'static smol::Executor<'static> =
143                Box::leak(Box::new(smol::Executor::new()));
144
145            // Run the executor in parallel with serving
146            // The executor must be driven to process connection handlers
147            futures::future::join(
148                // Drive the executor (runs forever until dropped)
149                executor.run(std::future::pending::<()>()),
150                // Serve connections
151                Hyper.serve(
152                    executor,
153                    |err| tracing::warn!("Hot reload connection error: {err}"),
154                    connections,
155                    router,
156                ),
157            )
158            .await;
159
160            drop(broadcast_task);
161        });
162
163        Ok(Self {
164            port: actual_addr.port(),
165            addr: actual_addr,
166            broadcast_tx,
167            _server_task: server_task,
168        })
169    }
170
171    /// Get the port the server is listening on.
172    #[must_use]
173    pub const fn port(&self) -> u16 {
174        self.port
175    }
176
177    /// Get the address the server is listening on.
178    #[must_use]
179    pub const fn addr(&self) -> SocketAddr {
180        self.addr
181    }
182
183    /// Get the host string for environment variable.
184    #[must_use]
185    pub fn host(&self) -> String {
186        self.addr.ip().to_string()
187    }
188
189    /// Notify all connected clients that a build is starting.
190    ///
191    /// This provides instant feedback to the user before compilation completes.
192    pub fn send_building(&self) {
193        let _ = self
194            .broadcast_tx
195            .try_send(BroadcastMessage::Text("building".to_string()));
196    }
197
198    /// Broadcast a library binary to all connected clients.
199    ///
200    /// Returns immediately; the broadcast happens asynchronously.
201    pub fn send_library(&self, data: Vec<u8>) {
202        let _ = self.broadcast_tx.try_send(BroadcastMessage::Binary(data));
203    }
204
205    /// Broadcast a library file to all connected clients.
206    ///
207    /// Reads the file and sends its contents to all connected apps.
208    ///
209    /// # Errors
210    /// Returns an error if the file cannot be read.
211    pub async fn send_library_file(&self, path: &PathBuf) -> std::io::Result<()> {
212        let data = smol::fs::read(path).await?;
213        self.send_library(data);
214        Ok(())
215    }
216
217    /// Get a clone of the broadcast sender for sending library data.
218    pub(crate) fn broadcast_sender(&self) -> Sender<BroadcastMessage> {
219        self.broadcast_tx.clone()
220    }
221}
222
223/// Build the skyzen router with WebSocket endpoint.
224fn build_router(state: Arc<Mutex<ServerState>>) -> Router {
225    Route::new("/".at(move |ws: WebSocketUpgrade| {
226        let ws = ws.max_message_size(None);
227        let state = state.clone();
228        async move { handle_websocket(ws, state) }
229    }))
230    .build()
231}
232
233/// Handle a single WebSocket connection.
234fn handle_websocket(upgrade: WebSocketUpgrade, state: Arc<Mutex<ServerState>>) -> impl Responder {
235    upgrade.on_upgrade(move |mut socket| async move {
236        tracing::info!("Hot reload client connected");
237
238        // Create a channel for this client to receive broadcasts
239        let (client_tx, client_rx) = channel::unbounded::<BroadcastMessage>();
240
241        // Register this client
242        {
243            let mut state = state.lock().await;
244            state.add_client(client_tx);
245        }
246
247        tracing::debug!("Hot reload client registered, entering event loop");
248
249        // Handle the WebSocket connection - interleave sending and receiving
250        loop {
251            futures::select! {
252                // Check for data to send to client
253                message = client_rx.recv().fuse() => {
254                    match message {
255                        Ok(BroadcastMessage::Text(text)) => {
256                            tracing::debug!("Sending text message to client: {text}");
257                            if let Err(e) = socket.send_text(text).await {
258                                tracing::warn!("Failed to send text to client: {e}");
259                                break;
260                            }
261                        }
262                        Ok(BroadcastMessage::Binary(data)) => {
263                            tracing::debug!("Sending {} bytes to client", data.len());
264                            if let Err(e) = socket.send_binary(data).await {
265                                tracing::warn!("Failed to send binary to client: {e}");
266                                break;
267                            }
268                        }
269                        Err(e) => {
270                            tracing::debug!("Client channel closed: {e}");
271                            break;
272                        }
273                    }
274                }
275                // Check for messages from client
276                msg = socket.next().fuse() => {
277                    match msg {
278                        Some(Ok(WebSocketMessage::Close)) => {
279                            tracing::debug!("Client sent close frame");
280                            break;
281                        }
282                        Some(Err(e)) => {
283                            tracing::debug!("WebSocket error: {e}");
284                            break;
285                        }
286                        None => {
287                            tracing::debug!("WebSocket stream ended");
288                            break;
289                        }
290                        Some(Ok(WebSocketMessage::Ping(data))) => {
291                            tracing::debug!("Received ping, sending pong");
292                            if socket.send_pong(data).await.is_err() {
293                                break;
294                            }
295                        }
296                        Some(Ok(msg)) => {
297                            tracing::debug!("Received message: {msg:?}");
298                        }
299                    }
300                }
301            }
302        }
303
304        tracing::info!("Hot reload client disconnected");
305    })
306}
307
/// Manages hot reload builds with debouncing and cancellation.
///
/// The manager is poll-driven: the owner calls `request_rebuild` on changes,
/// then polls `should_start_build` and `poll_build` to advance the state.
#[derive(Debug)]
pub struct BuildManager {
    /// Currently running build task (can be cancelled by dropping).
    /// `None` when no build has been started or the result was already taken.
    current_build: Option<Task<Result<PathBuf, crate::build::RustBuildError>>>,
    /// Debounce timer task.
    /// Replaced on every rebuild request, which cancels the previous timer.
    debounce_task: Option<Task<()>>,
    /// Channel to signal debounce completion.
    /// Receives a single `()` once the timer task finishes waiting.
    debounce_rx: Option<Receiver<()>>,
}
318
319impl BuildManager {
320    /// Create a new build manager.
321    #[must_use]
322    pub const fn new() -> Self {
323        Self {
324            current_build: None,
325            debounce_task: None,
326            debounce_rx: None,
327        }
328    }
329
330    /// Request a rebuild, cancelling any in-flight build and resetting debounce.
331    ///
332    /// This method should be called when a file change is detected.
333    /// The actual build will start after `DEBOUNCE_DURATION` of no further changes.
334    pub fn request_rebuild(&mut self) {
335        // Cancel any in-flight build by dropping
336        self.current_build.take();
337
338        // Cancel previous debounce timer by dropping
339        self.debounce_task.take();
340        self.debounce_rx.take();
341
342        // Start new debounce timer
343        let (tx, rx) = channel::bounded(1);
344        self.debounce_task = Some(smol::spawn(async move {
345            smol::Timer::after(DEBOUNCE_DURATION).await;
346            let _ = tx.send(()).await;
347        }));
348        self.debounce_rx = Some(rx);
349    }
350
351    /// Check if the debounce timer has fired and a build should start.
352    ///
353    /// Returns `true` if a build should be started.
354    pub fn should_start_build(&mut self) -> bool {
355        if let Some(rx) = &self.debounce_rx {
356            if rx.try_recv().is_ok() {
357                self.debounce_task.take();
358                self.debounce_rx.take();
359                return true;
360            }
361        }
362        false
363    }
364
365    /// Start a build for the given rust build configuration.
366    pub fn start_build(&mut self, rust_build: crate::build::RustBuild) {
367        self.current_build = Some(smol::spawn(async move { rust_build.dev_build().await }));
368    }
369
370    /// Check if the current build has completed.
371    ///
372    /// Returns the build result if it completed.
373    pub async fn poll_build(&mut self) -> Option<Result<PathBuf, crate::build::RustBuildError>> {
374        if let Some(task) = &self.current_build {
375            // Check if task is done without blocking.
376            if task.is_finished() {
377                if let Some(task) = self.current_build.take() {
378                    return Some(task.await);
379                }
380            }
381        }
382        None
383    }
384
385    /// Check if a build is currently in progress.
386    #[must_use]
387    pub const fn is_building(&self) -> bool {
388        self.current_build.is_some()
389    }
390
391    /// Check if we're waiting for debounce.
392    #[must_use]
393    pub const fn is_debouncing(&self) -> bool {
394        self.debounce_rx.is_some()
395    }
396}
397
impl Default for BuildManager {
    /// Same as [`BuildManager::new`]: no build running and no debounce pending.
    fn default() -> Self {
        Self::new()
    }
}