conclavelib/server/
wss.rs1use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration};
10
11use anyhow::Context as _;
12use axum::{
13 Router,
14 extract::{
15 State,
16 ws::{Message, WebSocket, WebSocketUpgrade},
17 },
18 response::Response,
19 routing::get,
20};
21use tokio::{net::TcpListener, sync::mpsc};
22
23use crate::{base::Void, protocol, store::Store};
24
25use super::{hub::Hub, session::run_session};
26
27const REAP_INTERVAL: Duration = Duration::from_secs(15);
29const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
31
32pub struct ServerConfig {
34 pub bind: String,
36 pub data_dir: Option<PathBuf>,
38 pub admins: HashSet<String>,
40}
41
42pub async fn serve(config: ServerConfig) -> Void {
49 let store = match &config.data_dir {
50 Some(path) => Store::open(path).await?,
51 None => Store::open_in_memory().await?,
52 };
53 let hub = Hub::new(store, config.admins);
54
55 spawn_reaper(Arc::clone(&hub));
56
57 let app = Router::new().route("/", get(ws_handler)).with_state(hub);
58 let listener = TcpListener::bind(&config.bind).await.with_context(|| format!("failed to bind `{}`", config.bind))?;
59 let addr = listener.local_addr().context("failed to read the bound address")?;
60 tracing::info!(%addr, "conclave server listening");
61
62 axum::serve(listener, app).with_graceful_shutdown(shutdown_signal()).await.context("server terminated with an error")?;
63 Ok(())
64}
65
66fn spawn_reaper(hub: Arc<Hub>) {
68 tokio::spawn(async move {
69 let mut ticker = tokio::time::interval(REAP_INTERVAL);
70 loop {
71 ticker.tick().await;
72 let reaped = hub.reap_idle(IDLE_TIMEOUT);
73 if reaped > 0 {
74 tracing::debug!(reaped, "reaped idle sessions");
75 }
76 }
77 });
78}
79
80async fn ws_handler(ws: WebSocketUpgrade, State(hub): State<Arc<Hub>>) -> Response {
82 ws.on_upgrade(move |socket| handle_ws(hub, socket))
83}
84
85async fn handle_ws(hub: Arc<Hub>, socket: WebSocket) {
87 use futures_util::{SinkExt as _, StreamExt as _};
88
89 let (mut sink, mut stream) = socket.split();
90 let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
91 let (outbound_tx, mut outbound_rx) = mpsc::unbounded_channel();
92
93 let read_task = tokio::spawn(async move {
94 while let Some(Ok(message)) = stream.next().await {
95 match message {
96 Message::Binary(data) => match protocol::decode(&data) {
97 Ok(frame) => {
98 if inbound_tx.send(frame).is_err() {
99 break;
100 }
101 }
102 Err(_) => break,
103 },
104 Message::Close(_) => break,
105 _ => {}
107 }
108 }
109 });
110
111 let write_task = tokio::spawn(async move {
112 while let Some(frame) = outbound_rx.recv().await {
113 let Ok(bytes) = protocol::encode(&frame) else { break };
114 if sink.send(Message::Binary(bytes.into())).await.is_err() {
115 break;
116 }
117 }
118 let _ = sink.close().await;
119 });
120
121 run_session(hub, inbound_rx, outbound_tx).await;
122 read_task.abort();
125 let _ = write_task.await;
126}
127
128async fn shutdown_signal() {
130 let _ = tokio::signal::ctrl_c().await;
131 tracing::info!("shutdown signal received; draining connections");
132}