use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use futures::{FutureExt, StreamExt, stream};
use skyzen::hyper::Hyper;
use skyzen::routing::{CreateRouteNode, Route, Router};
use skyzen::websocket::{WebSocketMessage, WebSocketUpgrade};
use skyzen::{Responder, Server};
use smol::Task;
use smol::channel::{self, Receiver, Sender};
use smol::lock::Mutex;
use smol::net::TcpListener;
/// Port number exported as the default for the hot reload server.
/// NOTE(review): not referenced in this section; presumably callers pass it to `HotReloadServer::launch` — confirm.
pub const DEFAULT_PORT: u16 = 2006;
/// Maximum number of consecutive ports `HotReloadServer::launch` tries, counting from its starting port.
pub const PORT_RETRY_COUNT: u16 = 50;
/// Delay the timer started by `BuildManager::request_rebuild` waits before signalling that a build may start.
pub const DEBOUNCE_DURATION: Duration = Duration::from_millis(250);
/// A message fanned out to every connected hot reload client.
#[derive(Debug, Clone)]
pub enum BroadcastMessage {
/// Text payload, forwarded to clients as a WebSocket text message (e.g. the `"building"` notice).
Text(String),
/// Raw bytes, forwarded to clients as a WebSocket binary message (used for library contents).
Binary(Vec<u8>),
}
/// Handle to a running hot reload server.
///
/// Created by `HotReloadServer::launch`; used to query the bound address and to queue
/// messages for broadcast to connected clients.
#[derive(Debug)]
pub struct HotReloadServer {
/// Port of the address the listener is bound to.
port: u16,
/// Local address reported by the bound listener.
addr: SocketAddr,
/// Sending side of the channel whose messages are forwarded to every registered client.
broadcast_tx: Sender<BroadcastMessage>,
/// Task running the accept/serve loop; stored so the task is owned by this handle.
/// NOTE(review): smol cancels a task when its `Task` handle is dropped, so this presumably
/// ties the server's lifetime to the handle — confirm.
_server_task: Task<()>,
}
/// Errors returned when the hot reload server cannot be started.
#[derive(Debug, thiserror::Error)]
pub enum FailToLaunch {
/// No port in the scanned range could be bound; fields are the starting port and the end of the range.
#[error("No available port found (tried ports {0}..{1})")]
NoAvailablePort(u16, u16),
/// Binding a specific port failed; fields are the port and the underlying I/O error.
/// Also produced when the bound listener's local address cannot be read.
#[error("Failed to bind to port {0}: {1}")]
BindError(u16, std::io::Error),
}
/// Registry of connected clients, shared behind a mutex between the broadcast task and the
/// WebSocket handlers.
struct ServerState {
/// One channel sender per registered client; broadcasts are pushed into each of them.
clients: Vec<Sender<BroadcastMessage>>,
}
impl ServerState {
    /// Creates a state with no registered clients.
    const fn new() -> Self {
        Self { clients: Vec::new() }
    }

    /// Registers `sender` so that later broadcasts are delivered to it.
    fn add_client(&mut self, sender: Sender<BroadcastMessage>) {
        self.clients.push(sender);
    }

    /// Queues a copy of `message` for every registered client.
    ///
    /// Any client whose channel rejects the message is removed from the registry.
    fn broadcast(&mut self, message: &BroadcastMessage) {
        self.clients.retain(|client| {
            let payload = message.clone();
            client.try_send(payload).is_ok()
        });
    }
}
impl HotReloadServer {
    /// Starts a hot reload server on the first bindable port at or above `starting_port`.
    ///
    /// Up to [`PORT_RETRY_COUNT`] consecutive ports are attempted, never going past
    /// `u16::MAX`. A port that fails to bind is logged at debug level and skipped.
    ///
    /// # Errors
    ///
    /// Returns [`FailToLaunch::NoAvailablePort`] when none of the candidate ports could be
    /// bound. Any launch error other than a bind failure is returned immediately.
    pub async fn launch(starting_port: u16) -> Result<Self, FailToLaunch> {
        // Exclusive upper bound of the scan; only used for the error message.
        let end_port = starting_port.saturating_add(PORT_RETRY_COUNT);
        // An inclusive range capped with `take` keeps `u16::MAX` reachable. The previous
        // half-open range silently skipped it, and was empty for `starting_port == u16::MAX`.
        let candidates = (starting_port..=u16::MAX).take(usize::from(PORT_RETRY_COUNT));
        for port in candidates {
            match Self::try_launch_on_port(port).await {
                Ok(server) => return Ok(server),
                Err(FailToLaunch::BindError(failed_port, err)) => {
                    // Keep the reason visible instead of dropping it on the floor.
                    tracing::debug!("Hot reload port {failed_port} unavailable: {err}");
                }
                Err(e) => return Err(e),
            }
        }
        Err(FailToLaunch::NoAvailablePort(starting_port, end_port))
    }

    /// Binds `127.0.0.1:port` and spawns the tasks that make up the server.
    ///
    /// # Errors
    ///
    /// Returns [`FailToLaunch::BindError`] if binding fails or the listener's local address
    /// cannot be read.
    async fn try_launch_on_port(port: u16) -> Result<Self, FailToLaunch> {
        let addr = SocketAddr::from(([127, 0, 0, 1], port));
        let listener = TcpListener::bind(addr)
            .await
            .map_err(|e| FailToLaunch::BindError(port, e))?;
        // Report the address the listener says it is bound to rather than the requested one.
        let actual_addr = listener
            .local_addr()
            .map_err(|e| FailToLaunch::BindError(port, e))?;

        let (broadcast_tx, broadcast_rx) = channel::unbounded::<BroadcastMessage>();
        let state = Arc::new(Mutex::new(ServerState::new()));

        // Fan-out task: forwards every message queued on `broadcast_tx` to all registered
        // clients. It ends once `recv` reports an error.
        let state_for_broadcast = state.clone();
        let broadcast_task = smol::spawn(async move {
            while let Ok(message) = broadcast_rx.recv().await {
                let mut state = state_for_broadcast.lock().await;
                state.broadcast(&message);
            }
        });

        let router = build_router(state);
        // Endless stream of accept results; accept errors are yielded as items, not swallowed.
        let connections = Box::pin(stream::unfold(listener, |listener| async move {
            let result = listener.accept().await;
            Some((result.map(|(stream, _addr)| stream), listener))
        }));

        let server_task = smol::spawn(async move {
            // The executor is leaked to obtain the `'static` reference handed to `serve`.
            // NOTE(review): this allocation is never freed, so every successful launch leaks
            // one executor.
            let executor: &'static smol::Executor<'static> =
                Box::leak(Box::new(smol::Executor::new()));
            // `run(pending())` never completes by itself; presumably it is here to drive the
            // tasks `serve` spawns on `executor` — confirm against skyzen.
            futures::future::join(
                executor.run(std::future::pending::<()>()),
                Hyper.serve(
                    executor,
                    |err| tracing::warn!("Hot reload connection error: {err}"),
                    connections,
                    router,
                ),
            )
            .await;
            // Referencing the handle here moves it into this task, so the broadcast task is
            // owned by (and dropped together with) the server task.
            drop(broadcast_task);
        });

        Ok(Self {
            port: actual_addr.port(),
            addr: actual_addr,
            broadcast_tx,
            _server_task: server_task,
        })
    }

    /// Returns the port the server is bound to.
    #[must_use]
    pub const fn port(&self) -> u16 {
        self.port
    }

    /// Returns the socket address the server is bound to.
    #[must_use]
    pub const fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Returns the IP part of the bound address rendered as a string.
    #[must_use]
    pub fn host(&self) -> String {
        self.addr.ip().to_string()
    }

    /// Queues the text message `"building"` for all clients.
    ///
    /// Best effort: a failure to queue the message is ignored.
    pub fn send_building(&self) {
        let _ = self
            .broadcast_tx
            .try_send(BroadcastMessage::Text("building".to_string()));
    }

    /// Queues `data` as a binary message for all clients.
    ///
    /// Best effort: a failure to queue the message is ignored.
    pub fn send_library(&self, data: Vec<u8>) {
        let _ = self.broadcast_tx.try_send(BroadcastMessage::Binary(data));
    }

    /// Reads the file at `path` and queues its contents as a binary message.
    ///
    /// Takes `&Path` rather than `&PathBuf`; existing `&PathBuf` arguments still work through
    /// deref coercion.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the file cannot be read; nothing is queued in that case.
    pub async fn send_library_file(&self, path: &std::path::Path) -> std::io::Result<()> {
        let data = smol::fs::read(path).await?;
        self.send_library(data);
        Ok(())
    }

    /// Returns a clone of the broadcast sender so other parts of the crate can queue messages.
    pub(crate) fn broadcast_sender(&self) -> Sender<BroadcastMessage> {
        self.broadcast_tx.clone()
    }
}
/// Builds the router for the hot reload server: a single `/` route that upgrades the
/// request to a WebSocket and hands it to `handle_websocket`.
fn build_router(state: Arc<Mutex<ServerState>>) -> Router {
    let upgrade_handler = move |ws: WebSocketUpgrade| {
        // Each request gets its own handle to the shared client registry.
        let shared = state.clone();
        // `None` is passed as the maximum message size.
        // NOTE(review): presumably this lifts the size limit so large library payloads fit — confirm.
        let upgrade = ws.max_message_size(None);
        async move { handle_websocket(upgrade, shared) }
    };
    Route::new("/".at(upgrade_handler)).build()
}
/// Builds the responder that upgrades a request to a WebSocket and serves one hot reload client.
///
/// After the upgrade the client gets its own unbounded channel whose sender is registered in
/// `state`. The loop then forwards broadcast messages to the socket while also watching the
/// socket for incoming frames. It ends on a send failure, a closed client channel, a close
/// frame, a socket error, or the end of the socket stream.
fn handle_websocket(upgrade: WebSocketUpgrade, state: Arc<Mutex<ServerState>>) -> impl Responder {
upgrade.on_upgrade(move |mut socket| async move {
tracing::info!("Hot reload client connected");
// Per-client queue: broadcasts are pushed into `client_tx`, this loop drains `client_rx`.
let (client_tx, client_rx) = channel::unbounded::<BroadcastMessage>();
// Scoped so the lock guard is released before the event loop starts.
{
let mut state = state.lock().await;
state.add_client(client_tx);
}
tracing::debug!("Hot reload client registered, entering event loop");
loop {
// Wait for whichever is ready first: a broadcast to forward or a frame from the client.
futures::select! {
message = client_rx.recv().fuse() => {
match message {
Ok(BroadcastMessage::Text(text)) => {
tracing::debug!("Sending text message to client: {text}");
if let Err(e) = socket.send_text(text).await {
tracing::warn!("Failed to send text to client: {e}");
break;
}
}
Ok(BroadcastMessage::Binary(data)) => {
tracing::debug!("Sending {} bytes to client", data.len());
if let Err(e) = socket.send_binary(data).await {
tracing::warn!("Failed to send binary to client: {e}");
break;
}
}
// NOTE(review): presumably only reached once the sender registered in `ServerState` is gone — confirm.
Err(e) => {
tracing::debug!("Client channel closed: {e}");
break;
}
}
}
msg = socket.next().fuse() => {
match msg {
Some(Ok(WebSocketMessage::Close)) => {
tracing::debug!("Client sent close frame");
break;
}
Some(Err(e)) => {
tracing::debug!("WebSocket error: {e}");
break;
}
None => {
tracing::debug!("WebSocket stream ended");
break;
}
// Answer a ping with a pong carrying the same payload; stop if the reply cannot be sent.
Some(Ok(WebSocketMessage::Ping(data))) => {
tracing::debug!("Received ping, sending pong");
if socket.send_pong(data).await.is_err() {
break;
}
}
// Any other client message is only logged; nothing is sent back.
Some(Ok(msg)) => {
tracing::debug!("Received message: {msg:?}");
}
}
}
}
}
tracing::info!("Hot reload client disconnected");
})
}
/// Tracks the current build task and the debounce timer that gates starting a new build.
#[derive(Debug)]
pub struct BuildManager {
/// Task running the current dev build, if one was started and its result not yet collected.
current_build: Option<Task<Result<PathBuf, crate::build::RustBuildError>>>,
/// Timer task started by `request_rebuild`; sends on the debounce channel after `DEBOUNCE_DURATION`.
debounce_task: Option<Task<()>>,
/// Receiving end of the debounce channel; a received message means the debounce delay has elapsed.
debounce_rx: Option<Receiver<()>>,
}
impl BuildManager {
    /// Creates a manager with no build running and no debounce pending.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            current_build: None,
            debounce_task: None,
            debounce_rx: None,
        }
    }

    /// Records a rebuild request and restarts the debounce timer.
    ///
    /// The handle of any build in progress and any earlier debounce timer are dropped first
    /// (smol cancels a task whose handle is dropped). A fresh timer task is then spawned that
    /// signals once [`DEBOUNCE_DURATION`] has passed.
    pub fn request_rebuild(&mut self) {
        self.current_build = None;
        self.debounce_task = None;
        self.debounce_rx = None;

        let (elapsed_tx, elapsed_rx) = channel::bounded(1);
        let timer = smol::spawn(async move {
            smol::Timer::after(DEBOUNCE_DURATION).await;
            let _ = elapsed_tx.send(()).await;
        });
        self.debounce_task = Some(timer);
        self.debounce_rx = Some(elapsed_rx);
    }

    /// Reports whether the debounce delay has elapsed since the last rebuild request.
    ///
    /// Returns `true` at most once per request: when the timer's signal is received, the
    /// debounce state is cleared.
    pub fn should_start_build(&mut self) -> bool {
        let elapsed = matches!(&self.debounce_rx, Some(signal) if signal.try_recv().is_ok());
        if elapsed {
            self.debounce_task = None;
            self.debounce_rx = None;
        }
        elapsed
    }

    /// Spawns a dev build of `rust_build` and stores its task as the current build.
    pub fn start_build(&mut self, rust_build: crate::build::RustBuild) {
        let build = smol::spawn(async move { rust_build.dev_build().await });
        self.current_build = Some(build);
    }

    /// Collects the result of the current build if it has finished.
    ///
    /// Returns `None` when no build is tracked or the tracked build is still running. Once
    /// a result is returned, the build is no longer tracked.
    pub async fn poll_build(&mut self) -> Option<Result<PathBuf, crate::build::RustBuildError>> {
        let finished = matches!(&self.current_build, Some(build) if build.is_finished());
        if !finished {
            return None;
        }
        match self.current_build.take() {
            Some(build) => Some(build.await),
            None => None,
        }
    }

    /// Returns `true` while a build task is being tracked.
    #[must_use]
    pub const fn is_building(&self) -> bool {
        self.current_build.is_some()
    }

    /// Returns `true` while a debounce signal is still pending collection.
    #[must_use]
    pub const fn is_debouncing(&self) -> bool {
        self.debounce_rx.is_some()
    }
}
impl Default for BuildManager {
/// Equivalent to `BuildManager::new()`: no build running and no debounce pending.
fn default() -> Self {
Self::new()
}
}