Skip to main content

dynomite/stats/
rest.rs

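//! Hand-rolled HTTP/1.1 server exposing the stats snapshot over a few
//! read-only routes.
//!
//! The C reference handles a handful of query commands. This module
//! implements a minimal subset of `GET` routes:
//!
//! * `/`, `/info` and `/stats` return the latest snapshot as JSON;
//! * `/metrics` returns the snapshot in the Prometheus text format;
//! * `/cluster-info.txt` returns a plain-text cluster description when a
//!   cluster-info provider is registered, and `503` otherwise.
//!
//! Any other `GET` path returns `200 OK` with the body `OK\r\n`, matching
//! the C fallback path. Other methods get `405 Method Not Allowed`, and
//! malformed or oversized requests get `400 Bad Request`.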
7
8#![allow(clippy::needless_continue)]
9
10use std::io;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Duration;
14
15use parking_lot::Mutex;
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::{TcpListener, TcpStream};
18use tracing::Instrument as _;
19
20use crate::admin::cluster_info::{format_text, ClusterInfoSnapshot};
21use crate::stats::prometheus::render_prometheus;
22use crate::stats::snapshot::Snapshot;
23
24/// Type alias for a closure that produces a fresh
25/// [`ClusterInfoSnapshot`] every time the `/cluster-info.txt`
26/// route is hit.
27///
28/// The closure must be `Send + Sync` so a clone can be moved
29/// into each accept-handler task. The runtime owns the closure;
30/// embedders set it via
31/// [`StatsServer::with_cluster_info_provider`].
32pub type ClusterInfoProvider = Arc<dyn Fn() -> ClusterInfoSnapshot + Send + Sync>;
33
/// Upper bound, in bytes, on the request line plus headers that the
/// server buffers for one request. A request whose head does not fit is
/// rejected with `400 Bad Request`.
///
/// # Examples
///
/// ```
/// assert!(dynomite::stats::MAX_REQUEST_BYTES >= 1024);
/// ```
pub const MAX_REQUEST_BYTES: usize = 8_192;

/// Number of header slots handed to the HTTP parser for one request.
/// A request carrying more headers than this fails to parse and is
/// rejected.
///
/// # Examples
///
/// ```
/// assert!(dynomite::stats::MAX_HEADERS > 0);
/// ```
pub const MAX_HEADERS: usize = 32;

/// Idle timeout applied to each individual read from a connected peer.
/// When a read does not finish in time, the socket is closed without a
/// response, mirroring the implicit blocking-recv behavior of the
/// reference engine. The limit applies per read: it bounds how long a
/// silent peer can hold a handler task, not the total time a slow
/// sender may take.
const READ_TIMEOUT: Duration = Duration::from_millis(5_000);
58
59/// A bound TCP listener serving the stats endpoint.
60///
61/// Construct via [`StatsServer::bind`] then call
62/// [`StatsServer::run`] to accept connections in a loop, or
63/// [`StatsServer::accept_one`] for one-shot tests.
64///
65/// # Examples
66///
67/// ```no_run
68/// use std::sync::Arc;
69/// use dynomite::stats::{Snapshot, StatsServer};
70/// use parking_lot::Mutex;
71///
72/// # async fn _example() -> std::io::Result<()> {
73/// let sink = Arc::new(Mutex::new(Snapshot::default()));
74/// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
75/// let _addr = server.local_addr()?;
76/// # Ok(())
77/// # }
78/// ```
79pub struct StatsServer {
80    listener: TcpListener,
81    source: Arc<Mutex<Snapshot>>,
82    cluster_info: Option<ClusterInfoProvider>,
83}
84
85impl StatsServer {
86    /// Bind a listener at `addr`. Returns the bound server alongside
87    /// its actual local address (useful when binding to port 0).
88    ///
89    /// # Examples
90    ///
91    /// ```no_run
92    /// use std::sync::Arc;
93    /// use dynomite::stats::{Snapshot, StatsServer};
94    /// use parking_lot::Mutex;
95    ///
96    /// # async fn _example() -> std::io::Result<()> {
97    /// let sink = Arc::new(Mutex::new(Snapshot::default()));
98    /// let _server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
99    /// # Ok(())
100    /// # }
101    /// ```
102    pub async fn bind(addr: SocketAddr, source: Arc<Mutex<Snapshot>>) -> io::Result<Self> {
103        let listener = TcpListener::bind(addr).await?;
104        Ok(Self {
105            listener,
106            source,
107            cluster_info: None,
108        })
109    }
110
111    /// Attach a [`ClusterInfoProvider`] so the server answers
112    /// `GET /cluster-info.txt` with a freshly assembled
113    /// snapshot. When no provider is registered the route
114    /// returns `503 Service Unavailable`.
115    ///
116    /// # Examples
117    ///
118    /// ```no_run
119    /// use std::sync::Arc;
120    /// use dynomite::admin::cluster_info::ClusterInfoSnapshot;
121    /// use dynomite::stats::{Snapshot, StatsServer};
122    /// use parking_lot::Mutex;
123    ///
124    /// # async fn _example() -> std::io::Result<()> {
125    /// let sink = Arc::new(Mutex::new(Snapshot::default()));
126    /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink)
127    ///     .await?
128    ///     .with_cluster_info_provider(Arc::new(ClusterInfoSnapshot::synthetic));
129    /// drop(server);
130    /// # Ok(())
131    /// # }
132    /// ```
133    #[must_use]
134    pub fn with_cluster_info_provider(mut self, provider: ClusterInfoProvider) -> Self {
135        self.cluster_info = Some(provider);
136        self
137    }
138
139    /// Returns the local socket address the server is listening on.
140    ///
141    /// # Examples
142    ///
143    /// ```no_run
144    /// use std::sync::Arc;
145    /// use dynomite::stats::{Snapshot, StatsServer};
146    /// use parking_lot::Mutex;
147    ///
148    /// # async fn _example() -> std::io::Result<()> {
149    /// let sink = Arc::new(Mutex::new(Snapshot::default()));
150    /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
151    /// let addr = server.local_addr()?;
152    /// assert!(addr.port() != 0);
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub fn local_addr(&self) -> io::Result<SocketAddr> {
157        self.listener.local_addr()
158    }
159
160    /// Accept a single connection, serve one HTTP/1.1 request, and
161    /// return.
162    ///
163    /// # Examples
164    ///
165    /// ```no_run
166    /// use std::sync::Arc;
167    /// use dynomite::stats::{Snapshot, StatsServer};
168    /// use parking_lot::Mutex;
169    ///
170    /// # async fn _example() -> std::io::Result<()> {
171    /// let sink = Arc::new(Mutex::new(Snapshot::default()));
172    /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
173    /// server.accept_one().await?;
174    /// # Ok(())
175    /// # }
176    /// ```
177    pub async fn accept_one(&self) -> io::Result<()> {
178        let (sock, _peer) = self.listener.accept().await?;
179        let snapshot = self.source.lock().clone();
180        let cluster_info = self.cluster_info.clone();
181        serve_connection(sock, snapshot, cluster_info).await
182    }
183
184    /// Run the accept loop until cancelled. Each connection is handled
185    /// on a fresh task so a slow client cannot stall the listener.
186    ///
187    /// # Examples
188    ///
189    /// ```no_run
190    /// use std::sync::Arc;
191    /// use dynomite::stats::{Snapshot, StatsServer};
192    /// use parking_lot::Mutex;
193    ///
194    /// # async fn _example() -> std::io::Result<()> {
195    /// let sink = Arc::new(Mutex::new(Snapshot::default()));
196    /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
197    /// let _ = tokio::spawn(async move { server.run().await });
198    /// # Ok(())
199    /// # }
200    /// ```
201    pub async fn run(self) -> io::Result<()> {
202        let span = tracing::info_span!(
203            "stats_server.run",
204            local = %self.listener.local_addr().map_or_else(|_| String::from("?"), |a| a.to_string()),
205        );
206        let listener = self.listener;
207        let source = self.source;
208        let cluster_info = self.cluster_info;
209        async move {
210            loop {
211                let (sock, _peer) = listener.accept().await?;
212                let snapshot = source.lock().clone();
213                let ci = cluster_info.clone();
214                tokio::spawn(async move {
215                    let _ = serve_connection(sock, snapshot, ci).await;
216                });
217            }
218        }
219        .instrument(span)
220        .await
221    }
222}
223
224async fn serve_connection(
225    mut sock: TcpStream,
226    snapshot: Snapshot,
227    cluster_info: Option<ClusterInfoProvider>,
228) -> io::Result<()> {
229    let mut buf = vec![0u8; MAX_REQUEST_BYTES];
230    let mut filled = 0usize;
231    loop {
232        if filled == buf.len() {
233            return write_response(&mut sock, 400, "Bad Request", b"").await;
234        }
235        let read_result = tokio::time::timeout(READ_TIMEOUT, sock.read(&mut buf[filled..])).await;
236        let Ok(Ok(n)) = read_result else {
237            // Read error or timeout: close silently, matching the
238            // reference error path which drops the connection without
239            // writing a response.
240            let _ = sock.shutdown().await;
241            return Ok(());
242        };
243        if n == 0 {
244            break;
245        }
246        filled += n;
247        let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
248        let mut req = httparse::Request::new(&mut headers);
249        match req.parse(&buf[..filled]) {
250            Ok(httparse::Status::Complete(_)) => {
251                return handle_parsed(&mut sock, &req, snapshot, cluster_info).await;
252            }
253            Ok(httparse::Status::Partial) => continue,
254            Err(_) => {
255                return write_response(&mut sock, 400, "Bad Request", b"").await;
256            }
257        }
258    }
259    Ok(())
260}
261
262async fn handle_parsed(
263    sock: &mut TcpStream,
264    req: &httparse::Request<'_, '_>,
265    snapshot: Snapshot,
266    cluster_info: Option<ClusterInfoProvider>,
267) -> io::Result<()> {
268    let path = req.path.unwrap_or("/");
269    if !matches!(req.method, Some("GET")) {
270        return write_response(sock, 405, "Method Not Allowed", b"").await;
271    }
272    match path {
273        "/" | "/info" | "/stats" => {
274            let body = snapshot.to_json();
275            write_json_response(sock, body.as_bytes()).await
276        }
277        "/metrics" => {
278            let body = render_prometheus(&snapshot);
279            write_metrics_response(sock, body.as_bytes()).await
280        }
281        "/cluster-info.txt" => match cluster_info {
282            Some(provider) => {
283                let snap = provider();
284                let mut body: Vec<u8> = Vec::with_capacity(4096);
285                if format_text(&snap, &mut body).is_err() {
286                    return write_response(sock, 500, "Internal Server Error", b"").await;
287                }
288                write_text_response(sock, &body).await
289            }
290            None => write_response(sock, 503, "Service Unavailable", b"").await,
291        },
292        _ => write_response(sock, 200, "OK", b"OK\r\n").await,
293    }
294}
295
296async fn write_text_response(sock: &mut TcpStream, body: &[u8]) -> io::Result<()> {
297    let header = format!(
298        "HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=us-ascii\r\n\
299         Content-Length: {}\r\nConnection: close\r\n\r\n",
300        body.len()
301    );
302    sock.write_all(header.as_bytes()).await?;
303    sock.write_all(body).await?;
304    sock.shutdown().await?;
305    Ok(())
306}
307
308async fn write_response(
309    sock: &mut TcpStream,
310    code: u16,
311    reason: &str,
312    body: &[u8],
313) -> io::Result<()> {
314    let header = format!(
315        "HTTP/1.1 {code} {reason}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
316        body.len()
317    );
318    sock.write_all(header.as_bytes()).await?;
319    if !body.is_empty() {
320        sock.write_all(body).await?;
321    }
322    sock.shutdown().await?;
323    Ok(())
324}
325
326async fn write_json_response(sock: &mut TcpStream, body: &[u8]) -> io::Result<()> {
327    let header = format!(
328        "HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\n\
329         Content-Length: {}\r\nConnection: close\r\n\r\n",
330        body.len()
331    );
332    sock.write_all(header.as_bytes()).await?;
333    sock.write_all(body).await?;
334    sock.shutdown().await?;
335    Ok(())
336}
337
338async fn write_metrics_response(sock: &mut TcpStream, body: &[u8]) -> io::Result<()> {
339    let header = format!(
340        "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4; charset=utf-8\r\n\
341         Content-Length: {}\r\nConnection: close\r\n\r\n",
342        body.len()
343    );
344    sock.write_all(header.as_bytes()).await?;
345    sock.write_all(body).await?;
346    sock.shutdown().await?;
347    Ok(())
348}