dexterous_developer_manager 0.3.0

A modular hot reload system for rust
Documentation
use std::{
    convert::Infallible,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::Arc,
    time::Duration,
};

use axum::{
    body::Body,
    extract::{
        ws::{self, WebSocket},
        Path, Request, State, WebSocketUpgrade,
    },
    http::StatusCode,
    response::{IntoResponse, Response},
    routing::get,
    Router,
};
use camino::Utf8PathBuf;
use dexterous_developer_builder::types::{
    BuildOutputMessages, CurrentBuildState, HashedFileRecord,
};
use dexterous_developer_types::{HotReloadMessage, Target, TargetParseError};
use futures_util::{SinkExt, StreamExt};
use thiserror::Error;
use tokio::sync::broadcast;
use tower::ServiceExt;
use tower_http::services::ServeFile;
use tracing::{error, info};

use crate::{Manager, ManagerError};

pub async fn run_server(port: u16, manager: Manager) -> Result<(), Error> {
    let app = Router::new()
        .route("/targets", get(list_targets))
        .route("/target/:target", get(connect_to_target))
        .route("/files/:target/*file", get(target_file_loader));

    let app = app.with_state(ServerState {
        manager: Arc::new(manager),
    });

    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let port = listener.local_addr()?.port();

    info!("Listening on http://127.0.0.1:{port}");

    axum::serve(listener, app).await?;

    Ok(())
}

#[cfg(feature = "test")]
pub async fn run_test_server(
    port: u16,
    manager: Manager,
    port_return: tokio::sync::oneshot::Sender<u16>,
) -> Result<(), Error> {
    let app = Router::new()
        .route("/targets", get(list_targets))
        .route("/target/:target", get(connect_to_target))
        .route("/files/:target/*file", get(target_file_loader));

    let app = app.with_state(ServerState {
        manager: Arc::new(manager),
    });

    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let port = listener.local_addr()?.port();

    port_return.send(port).unwrap();

    eprintln!("Listening on http://127.0.0.1:{port}");

    axum::serve(listener, app).await?;
    eprintln!("Ending");

    Ok(())
}

#[derive(Clone)]
pub struct ServerState {
    manager: Arc<Manager>,
}

#[derive(Error, Debug)]
pub enum Error {
    #[error("Server IO Failed {0}")]
    IoError(#[from] std::io::Error),
    #[error("Failed to serialize {0}")]
    SerdeError(#[from] rmp_serde::encode::Error),
    #[error("Couldn't parse target {0}")]
    TargetParseError(#[from] TargetParseError),
    #[error("Internal Manager Error {0}")]
    ManagerError(#[from] ManagerError),
    #[error("The Impossible Happened {0}")]
    Infallible(#[from] Infallible),
}

impl IntoResponse for Error {
    fn into_response(self) -> axum::response::Response {
        (StatusCode::INTERNAL_SERVER_ERROR, format!("{self}")).into_response()
    }
}

async fn list_targets(state: State<ServerState>) -> Result<Vec<u8>, Error> {
    let targets = state.manager.targets();
    let targets = targets.iter().collect::<Vec<_>>();
    let body_value = rmp_serde::to_vec(&targets)?;
    Ok(body_value)
}

async fn connect_to_target(
    target: Path<String>,
    ws: WebSocketUpgrade,
    state: State<ServerState>,
) -> Result<Response, Error> {
    let id = uuid::Uuid::new_v4();
    info!("Client {id} Connecting to Target: {target:?}");
    let target: Target = target.0.parse()?;

    let (initial_build_state, builder_rx) = state.manager.watch_target(&target).await?;
    Ok(ws.on_upgrade(move |socket| {
        connected_to_target(id, socket, target, initial_build_state, builder_rx)
    }))
}

async fn connected_to_target(
    id: uuid::Uuid,
    socket: WebSocket,
    _target: Target,
    initial_build_state: CurrentBuildState,
    mut builder_rx: broadcast::Receiver<BuildOutputMessages>,
) {
    info!("Client {id} Connected");
    let (mut ws_sender, _) = socket.split();

    {
        let initial_state_message = HotReloadMessage::InitialState {
            id,
            root_lib: {
                {
                    let lock = initial_build_state.root_library.lock().await;
                    (*lock).as_ref().cloned()
                }
            },
            libraries: initial_build_state
                .libraries
                .iter()
                .map(|asset| (asset.key().clone(), asset.hash))
                .collect(),
            assets: initial_build_state
                .assets
                .iter()
                .map(|asset| (asset.key().clone(), asset.hash))
                .collect(),
            most_recent_started_build: initial_build_state
                .most_recent_started_build
                .load(std::sync::atomic::Ordering::SeqCst),
            most_recent_completed_build: initial_build_state
                .most_recent_completed_build
                .load(std::sync::atomic::Ordering::SeqCst),
        };
        let Ok(message) = rmp_serde::to_vec(&initial_state_message) else {
            error!("Failed to serialize initial state message for {id}");
            let _ = ws_sender.close().await;
            return;
        };

        if let Err(e) = ws_sender.send(ws::Message::Binary(message)).await {
            error!("Failed to send initial state to {id} - {e}");
            let _ = ws_sender.close().await;
            return;
        }
    }

    while let Ok(msg) = tokio::select! {
        val = builder_rx.recv() => {
            val.map(|msg| match &msg {
                BuildOutputMessages::RootLibraryName(name) => Some(HotReloadMessage::RootLibPath(name.clone())),
                BuildOutputMessages::LibraryUpdated(HashedFileRecord {  name, hash, dependencies, .. }) => Some(HotReloadMessage::UpdatedLibs(name.clone(), *hash, dependencies.clone())),
                BuildOutputMessages::AssetUpdated(HashedFileRecord {  relative_path, hash, .. }) => Some(HotReloadMessage::UpdatedAssets(relative_path.clone(), *hash)),
                BuildOutputMessages::KeepAlive => None,
                BuildOutputMessages::StartedBuild(id) => Some(HotReloadMessage::BuildStarted(*id)),
                BuildOutputMessages::EndedBuild(id) => Some(HotReloadMessage::BuildCompleted(*id)),
            })
        }
        _ = tokio::time::sleep(Duration::from_secs(5)) => Ok(Some(HotReloadMessage::KeepAlive))
    } {
        let Some(msg) = msg else {
            continue;
        };
        let Ok(msg) = rmp_serde::to_vec(&msg) else {
            error!("Failed to serialize update for {id}");
            let _ = ws_sender.close().await;
            return;
        };

        if let Err(e) = ws_sender.send(ws::Message::Binary(msg)).await {
            error!("Failed to send update to {id} - {e}");
            let _ = ws_sender.close().await;
            return;
        }
    }

    info!("Connection closed for {id}");
}

async fn target_file_loader(
    Path((target, file)): Path<(String, Utf8PathBuf)>,
    state: State<ServerState>,
    request: Request<Body>,
) -> Result<Response, Error> {
    let file = Utf8PathBuf::from("./").join(file);
    let target: Target = target.parse()?;
    info!("Requested file {file:?} from {target}");
    let file = match state.manager.get_filepath(&target, &file) {
        Ok(file) => file,
        Err(e) => {
            error!("Couldn't Find File For Download {e:?}");
            return Ok(StatusCode::NOT_FOUND.into_response());
        }
    };
    info!("Found File path: {file:?}");
    let serve = ServeFile::new(file);
    let result = serve.oneshot(request).await?;
    info!("Result has status {:?}", result.status());
    Ok(result.into_response())
}