Skip to main content

rustio_core/
server.rs

1//! The HTTP server. Binds a TCP listener, runs each connection on its
2//! own Tokio task, and shuts down gracefully on Ctrl-C.
3
4use std::collections::HashMap;
5use std::net::SocketAddr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use http_body_util::{BodyExt, Full};
10use hyper::body::Incoming;
11use hyper::service::service_fn;
12use hyper::StatusCode;
13use hyper_util::rt::TokioIo;
14use tokio::net::TcpListener;
15
16use crate::error::Result;
17use crate::http::{Request, Response};
18use crate::router::Router;
19
20pub struct Server {
21    router: Arc<Router>,
22    addr: SocketAddr,
23}
24
25impl Server {
26    pub fn new(router: Router, addr: SocketAddr) -> Self {
27        Self {
28            router: Arc::new(router),
29            addr,
30        }
31    }
32
33    /// Run until Ctrl-C. Active connections get ~30s to drain before
34    /// we hard-stop.
35    pub async fn run(self) -> Result<()> {
36        let listener = TcpListener::bind(self.addr).await?;
37        log::info!("rustio listening on http://{}", self.addr);
38
39        let shutdown = shutdown_signal();
40        tokio::pin!(shutdown);
41
42        loop {
43            tokio::select! {
44                accept = listener.accept() => {
45                    let (stream, peer) = accept?;
46                    let io = TokioIo::new(stream);
47                    let router = self.router.clone();
48                    tokio::spawn(async move {
49                        let svc = service_fn(move |req: hyper::Request<Incoming>| {
50                            let router = router.clone();
51                            async move { handle(router, req, peer).await }
52                        });
53                        let conn = hyper::server::conn::http1::Builder::new()
54                            .keep_alive(true)
55                            .serve_connection(io, svc);
56                        if let Err(e) = conn.await {
57                            // Normal client disconnects produce noisy errors;
58                            // only log at warn level.
59                            log::debug!("connection error: {e}");
60                        }
61                    });
62                }
63                _ = &mut shutdown => {
64                    log::info!("shutdown signal received, stopping accept loop");
65                    break;
66                }
67            }
68        }
69
70        // Give in-flight requests 30s to finish. Spawned tasks that
71        // don't finish in time are dropped by the runtime.
72        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
73        Ok(())
74    }
75}
76
77async fn handle(
78    router: Arc<Router>,
79    hyper_req: hyper::Request<Incoming>,
80    _peer: SocketAddr,
81) -> std::result::Result<hyper::Response<Full<Bytes>>, hyper::Error> {
82    let method = hyper_req.method().clone();
83    let uri = hyper_req.uri().clone();
84    let path = uri.path().to_string();
85    let query = uri.query().unwrap_or("").to_string();
86
87    let mut headers = HashMap::new();
88    for (name, value) in hyper_req.headers() {
89        if let Ok(v) = value.to_str() {
90            headers.insert(name.as_str().to_ascii_lowercase(), v.to_string());
91        }
92    }
93
94    let body = match hyper_req.into_body().collect().await {
95        Ok(b) => b.to_bytes(),
96        Err(_) => {
97            return Ok(simple_response(
98                StatusCode::BAD_REQUEST,
99                "could not read body",
100            ))
101        }
102    };
103
104    let our_req = Request::new(method, path, query, headers, body);
105    let our_resp = router.dispatch(our_req).await;
106    Ok(to_hyper(our_resp))
107}
108
109fn to_hyper(resp: Response) -> hyper::Response<Full<Bytes>> {
110    let mut builder = hyper::Response::builder().status(resp.status);
111    for (name, value) in resp.headers {
112        builder = builder.header(name, value);
113    }
114    builder.body(Full::new(resp.body)).unwrap_or_else(|_| {
115        hyper::Response::builder()
116            .status(StatusCode::INTERNAL_SERVER_ERROR)
117            .body(Full::new(Bytes::from("internal error")))
118            .unwrap()
119    })
120}
121
122fn simple_response(status: StatusCode, body: &str) -> hyper::Response<Full<Bytes>> {
123    hyper::Response::builder()
124        .status(status)
125        .header("content-type", "text/plain; charset=utf-8")
126        .body(Full::new(Bytes::from(body.to_string())))
127        .unwrap()
128}
129
130async fn shutdown_signal() {
131    let ctrl_c = async {
132        tokio::signal::ctrl_c().await.ok();
133    };
134
135    #[cfg(unix)]
136    let terminate = async {
137        if let Ok(mut sig) =
138            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
139        {
140            sig.recv().await;
141        }
142    };
143
144    #[cfg(not(unix))]
145    let terminate = std::future::pending::<()>();
146
147    tokio::select! {
148        _ = ctrl_c => {}
149        _ = terminate => {}
150    }
151}
152
153/// Serve a static file from disk. Tiny, safe-ish — strips separators
154/// and rejects `..`.
155pub async fn serve_static(root: std::path::PathBuf, name: &str) -> Result<Response> {
156    let safe: String = name
157        .chars()
158        .filter(|c| *c != '/' && *c != '\\' && *c != '\0')
159        .collect();
160    if safe.contains("..") {
161        return Err(crate::error::Error::BadRequest("invalid path".into()));
162    }
163    let path = root.join(&safe);
164    if !path.is_file() {
165        return Err(crate::error::Error::NotFound(safe));
166    }
167    let bytes = tokio::fs::read(&path).await?;
168    Ok(Response::new(StatusCode::OK, Bytes::from(bytes))
169        .with_header("content-type", guess_content_type(&safe)))
170}
171
/// Pick a `Content-Type` value from a file name's extension.
///
/// The extension is everything after the last `.` and is compared
/// case-insensitively, so `LOGO.PNG` and `logo.png` get the same answer.
/// Names without a `.` and unknown extensions fall back to
/// `application/octet-stream`.
fn guess_content_type(name: &str) -> &'static str {
    const FALLBACK: &str = "application/octet-stream";

    let ext = match name.rsplit_once('.') {
        Some((_, ext)) => ext.to_ascii_lowercase(),
        None => return FALLBACK,
    };

    match ext.as_str() {
        "css" => "text/css; charset=utf-8",
        "js" => "application/javascript; charset=utf-8",
        "json" => "application/json; charset=utf-8",
        "html" | "htm" => "text/html; charset=utf-8",
        "txt" => "text/plain; charset=utf-8",
        "png" => "image/png",
        "jpg" | "jpeg" => "image/jpeg",
        "gif" => "image/gif",
        "webp" => "image/webp",
        "svg" => "image/svg+xml",
        "ico" => "image/x-icon",
        "woff" => "font/woff",
        "woff2" => "font/woff2",
        _ => FALLBACK,
    }
}
186
187pub fn embedded_rustio_css() -> &'static str {
188    include_str!("../assets/static/css/rustio.css")
189}
190
191pub fn embedded_admin_css() -> &'static str {
192    include_str!("../assets/static/css/admin.css")
193}
194
195pub fn embedded_search_js() -> &'static str {
196    include_str!("../assets/static/js/search.js")
197}
198
199// Phase 7a/2 — self-hosted Inter (Latin subset) for the redesigned
200// admin UI. Each weight is a separate woff2 file ~24KB; total ~95KB
201// added to the binary. Served from `/static/fonts/Inter-{Weight}.woff2`
202// by `register_admin_routes`. Single-binary deploy invariant intact.
203pub fn embedded_inter_regular() -> &'static [u8] {
204    include_bytes!("../assets/static/fonts/Inter-Regular.woff2")
205}
206pub fn embedded_inter_medium() -> &'static [u8] {
207    include_bytes!("../assets/static/fonts/Inter-Medium.woff2")
208}
209pub fn embedded_inter_semibold() -> &'static [u8] {
210    include_bytes!("../assets/static/fonts/Inter-SemiBold.woff2")
211}
212pub fn embedded_inter_bold() -> &'static [u8] {
213    include_bytes!("../assets/static/fonts/Inter-Bold.woff2")
214}