Skip to main content

vex_daemon/
lib.rs

1mod protocol;
2mod state;
3mod ws;
4
5use std::net::SocketAddr;
6use std::sync::Arc;
7
8use axum::Router;
9use axum::extract::{State, WebSocketUpgrade};
10use axum::response::IntoResponse;
11use axum::routing::get;
12use tokio::sync::RwLock;
13use tower_http::cors::CorsLayer;
14use vex_app::VexHome;
15
16use crate::state::{DaemonState, SharedState};
17
18/// Run the daemon server. This blocks until the server shuts down.
19/// Should be called from within a tokio runtime.
20pub async fn run(vex_home: VexHome) -> Result<(), String> {
21    // Write PID file for test teardown and external process management
22    let pid_path = vex_home.root().join("daemon.pid");
23    let _ = std::fs::write(&pid_path, std::process::id().to_string());
24
25    let daemon_state = DaemonState::new(vex_home)?;
26
27    let port = daemon_state.config.daemon_port;
28    let token = daemon_state.token.clone();
29    let shutdown_notify = Arc::clone(&daemon_state.shutdown);
30    let shared_state: SharedState = Arc::new(RwLock::new(daemon_state));
31
32    let app = Router::new()
33        .route("/ws", get(ws_handler))
34        .route("/health", get(health_handler))
35        .layer(CorsLayer::permissive())
36        .with_state(shared_state);
37
38    let addr = SocketAddr::from(([0, 0, 0, 0], port));
39    println!("vex daemon API on ws://{addr}/ws");
40    println!("Auth token: {token}");
41    println!("Run `vex hub start-web` to launch the web UI");
42
43    let listener = tokio::net::TcpListener::bind(addr)
44        .await
45        .map_err(|e| format!("failed to bind port {port}: {e}"))?;
46    axum::serve(listener, app)
47        .with_graceful_shutdown(shutdown_signal(shutdown_notify))
48        .await
49        .map_err(|e| format!("server error: {e}"))?;
50
51    Ok(())
52}
53
54/// Future that resolves when a shutdown signal is received.
55/// Triggers on SIGTERM/SIGINT or when the daemon.shutdown RPC is called.
56async fn shutdown_signal(shutdown_notify: Arc<tokio::sync::Notify>) {
57    use tokio::signal;
58
59    let ctrl_c = async {
60        signal::ctrl_c().await.ok();
61    };
62
63    let rpc_shutdown = shutdown_notify.notified();
64
65    #[cfg(unix)]
66    {
67        let mut sigterm =
68            signal::unix::signal(signal::unix::SignalKind::terminate()).expect("SIGTERM handler");
69        tokio::select! {
70            () = ctrl_c => {}
71            _ = sigterm.recv() => {}
72            () = rpc_shutdown => {}
73        }
74    }
75
76    #[cfg(not(unix))]
77    {
78        tokio::select! {
79            () = ctrl_c => {}
80            () = rpc_shutdown => {}
81        }
82    }
83}
84
85async fn ws_handler(ws: WebSocketUpgrade, State(state): State<SharedState>) -> impl IntoResponse {
86    ws.on_upgrade(move |socket| ws::handle_socket(socket, state))
87}
88
89async fn health_handler() -> &'static str {
90    "ok"
91}