Skip to main content

conclavelib/server/
wss.rs

1//! The production transport: the axum WebSocket endpoint and the `conclave serve` entrypoint.
2//!
3//! TLS terminates at cloudflared and the origin hop is local loopback (DESIGN.md §11/§12), so this
4//! is a plain-HTTP axum server whose single route upgrades to a WebSocket. Each accepted socket is
5//! split into reader / writer pumps that translate between WS binary messages and
6//! [`ProtocolMessage`](crate::protocol::ProtocolMessage) frames, then driven by the shared
7//! [`run_session`]. A background reaper enforces the idle-heartbeat timeout (DESIGN.md §10).
8
9use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration};
10
11use anyhow::Context as _;
12use axum::{
13    Router,
14    extract::{
15        State,
16        ws::{Message, WebSocket, WebSocketUpgrade},
17    },
18    response::Response,
19    routing::get,
20};
21use tokio::{net::TcpListener, sync::mpsc};
22
23use crate::{base::Void, protocol, store::Store};
24
25use super::{hub::Hub, session::run_session};
26
27/// How often the heartbeat reaper sweeps for idle sessions.
28const REAP_INTERVAL: Duration = Duration::from_secs(15);
29/// How long a session may go without any inbound frame before it is reaped (DESIGN.md §10).
30const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
31
32/// The operator-supplied `serve` configuration (DESIGN.md §7, §13).
33pub struct ServerConfig {
34    /// Address to bind the WebSocket endpoint to (e.g. `127.0.0.1:4390`).
35    pub bind: String,
36    /// Data directory for the embedded store; `None` runs a purely in-memory store.
37    pub data_dir: Option<PathBuf>,
38    /// The server-admin allowlist — usernames that may administer server-wide (§7).
39    pub admins: HashSet<String>,
40}
41
42/// Runs the central server until a shutdown signal (Ctrl-C) is received.
43///
44/// # Errors
45///
46/// Returns an error if the store cannot be opened, the bind address is unavailable, or the
47/// underlying HTTP server fails.
48pub async fn serve(config: ServerConfig) -> Void {
49    let store = match &config.data_dir {
50        Some(path) => Store::open(path).await?,
51        None => Store::open_in_memory().await?,
52    };
53    let hub = Hub::new(store, config.admins);
54
55    spawn_reaper(Arc::clone(&hub));
56
57    let app = Router::new().route("/", get(ws_handler)).with_state(hub);
58    let listener = TcpListener::bind(&config.bind).await.with_context(|| format!("failed to bind `{}`", config.bind))?;
59    let addr = listener.local_addr().context("failed to read the bound address")?;
60    tracing::info!(%addr, "conclave server listening");
61
62    axum::serve(listener, app).with_graceful_shutdown(shutdown_signal()).await.context("server terminated with an error")?;
63    Ok(())
64}
65
66/// Spawns the background heartbeat reaper (DESIGN.md §10).
67fn spawn_reaper(hub: Arc<Hub>) {
68    tokio::spawn(async move {
69        let mut ticker = tokio::time::interval(REAP_INTERVAL);
70        loop {
71            ticker.tick().await;
72            let reaped = hub.reap_idle(IDLE_TIMEOUT);
73            if reaped > 0 {
74                tracing::debug!(reaped, "reaped idle sessions");
75            }
76        }
77    });
78}
79
80/// The WebSocket upgrade handler; every connection is dispatched to [`handle_ws`].
81async fn ws_handler(ws: WebSocketUpgrade, State(hub): State<Arc<Hub>>) -> Response {
82    ws.on_upgrade(move |socket| handle_ws(hub, socket))
83}
84
85/// Bridges a WebSocket to [`run_session`]: each WS binary message is one protocol frame.
86async fn handle_ws(hub: Arc<Hub>, socket: WebSocket) {
87    use futures_util::{SinkExt as _, StreamExt as _};
88
89    let (mut sink, mut stream) = socket.split();
90    let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
91    let (outbound_tx, mut outbound_rx) = mpsc::unbounded_channel();
92
93    let read_task = tokio::spawn(async move {
94        while let Some(Ok(message)) = stream.next().await {
95            match message {
96                Message::Binary(data) => match protocol::decode(&data) {
97                    Ok(frame) => {
98                        if inbound_tx.send(frame).is_err() {
99                            break;
100                        }
101                    }
102                    Err(_) => break,
103                },
104                Message::Close(_) => break,
105                // Text / ping / pong are ignored: the heartbeat is an app-level Ping/Pong frame.
106                _ => {}
107            }
108        }
109    });
110
111    let write_task = tokio::spawn(async move {
112        while let Some(frame) = outbound_rx.recv().await {
113            let Ok(bytes) = protocol::encode(&frame) else { break };
114            if sink.send(Message::Binary(bytes.into())).await.is_err() {
115                break;
116            }
117        }
118        let _ = sink.close().await;
119    });
120
121    run_session(hub, inbound_rx, outbound_tx).await;
122    // Await the writer so a final handshake-failure / force-drop frame is flushed and the socket
123    // closed cleanly; abort the reader, which may be parked on an idle-but-open socket.
124    read_task.abort();
125    let _ = write_task.await;
126}
127
128/// Resolves when the process receives Ctrl-C, driving the graceful shutdown.
129async fn shutdown_signal() {
130    let _ = tokio::signal::ctrl_c().await;
131    tracing::info!("shutdown signal received; draining connections");
132}