dynomite/stats/rest.rs
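//! Hand-rolled HTTP/1.1 server exposing the stats snapshot over HTTP.
//!
//! The C reference handles a handful of query commands. This module
//! implements a minimal subset of `GET` routes:
//!
//! - `GET /`, `GET /info`, `GET /stats`: the latest snapshot as JSON.
//! - `GET /metrics`: the latest snapshot in Prometheus text format.
//! - `GET /cluster-info.txt`: a cluster-info report (`503` when no
//!   provider is registered).
//!
//! Any other `GET` path returns `200 OK` with the body `OK\r\n`, matching
//! the C fallback path. Other methods get `405 Method Not Allowed`.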
7
8#![allow(clippy::needless_continue)]
9
10use std::io;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Duration;
14
15use parking_lot::Mutex;
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::{TcpListener, TcpStream};
18use tracing::Instrument as _;
19
20use crate::admin::cluster_info::{format_text, ClusterInfoSnapshot};
21use crate::stats::prometheus::render_prometheus;
22use crate::stats::snapshot::Snapshot;
23
/// Closure that builds a fresh [`ClusterInfoSnapshot`] every time the
/// `/cluster-info.txt` route is requested.
///
/// The server keeps it behind an [`Arc`] and clones that `Arc` (not the
/// closure itself) into each accepted connection's task, where it is
/// invoked. That cross-task sharing is why the closure must be
/// `Send + Sync`. Register one with
/// [`StatsServer::with_cluster_info_provider`].
pub type ClusterInfoProvider = Arc<dyn Fn() -> ClusterInfoSnapshot + Send + Sync>;
33
/// Maximum number of bytes the server will read for an HTTP request
/// line plus headers.
///
/// A request head that has not fully parsed by the time this many bytes
/// have been buffered is answered with `400 Bad Request`. Request bodies
/// are never read.
///
/// # Examples
///
/// ```
/// assert!(dynomite::stats::MAX_REQUEST_BYTES >= 1024);
/// ```
pub const MAX_REQUEST_BYTES: usize = 8 * 1024;

/// Maximum number of headers parsed in a single request.
///
/// This sizes the fixed header array handed to the parser. A request
/// carrying more headers fails to parse and is answered with
/// `400 Bad Request`.
///
/// # Examples
///
/// ```
/// assert!(dynomite::stats::MAX_HEADERS > 0);
/// ```
pub const MAX_HEADERS: usize = 32;

/// Maximum time the server waits for any single read from a connected
/// peer before closing the socket without a response.
///
/// This stands in for the blocking `recv` of the reference engine while
/// keeping an idle peer from holding a tokio task forever. It bounds each
/// read individually. On its own it does not cap the total time a peer can
/// spend trickling in a request head one small read at a time.
const READ_TIMEOUT: Duration = Duration::from_secs(5);
58
/// A bound TCP listener serving the stats endpoint.
///
/// Construct via [`StatsServer::bind`], optionally attach a cluster-info
/// provider with [`StatsServer::with_cluster_info_provider`], then call
/// [`StatsServer::run`] to accept connections in a loop, or
/// [`StatsServer::accept_one`] for one-shot tests.
///
/// # Examples
///
/// ```no_run
/// use std::sync::Arc;
/// use dynomite::stats::{Snapshot, StatsServer};
/// use parking_lot::Mutex;
///
/// # async fn _example() -> std::io::Result<()> {
/// let sink = Arc::new(Mutex::new(Snapshot::default()));
/// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
/// let _addr = server.local_addr()?;
/// # Ok(())
/// # }
/// ```
pub struct StatsServer {
    /// Listening socket. `run` consumes it; `accept_one` borrows it.
    listener: TcpListener,
    /// Shared latest snapshot. It is cloned under the lock once per
    /// accepted connection.
    source: Arc<Mutex<Snapshot>>,
    /// Provider for `/cluster-info.txt`. `None` makes that route answer
    /// `503 Service Unavailable`.
    cluster_info: Option<ClusterInfoProvider>,
}
83}
84
85impl StatsServer {
86 /// Bind a listener at `addr`. Returns the bound server alongside
87 /// its actual local address (useful when binding to port 0).
88 ///
89 /// # Examples
90 ///
91 /// ```no_run
92 /// use std::sync::Arc;
93 /// use dynomite::stats::{Snapshot, StatsServer};
94 /// use parking_lot::Mutex;
95 ///
96 /// # async fn _example() -> std::io::Result<()> {
97 /// let sink = Arc::new(Mutex::new(Snapshot::default()));
98 /// let _server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
99 /// # Ok(())
100 /// # }
101 /// ```
102 pub async fn bind(addr: SocketAddr, source: Arc<Mutex<Snapshot>>) -> io::Result<Self> {
103 let listener = TcpListener::bind(addr).await?;
104 Ok(Self {
105 listener,
106 source,
107 cluster_info: None,
108 })
109 }
110
111 /// Attach a [`ClusterInfoProvider`] so the server answers
112 /// `GET /cluster-info.txt` with a freshly assembled
113 /// snapshot. When no provider is registered the route
114 /// returns `503 Service Unavailable`.
115 ///
116 /// # Examples
117 ///
118 /// ```no_run
119 /// use std::sync::Arc;
120 /// use dynomite::admin::cluster_info::ClusterInfoSnapshot;
121 /// use dynomite::stats::{Snapshot, StatsServer};
122 /// use parking_lot::Mutex;
123 ///
124 /// # async fn _example() -> std::io::Result<()> {
125 /// let sink = Arc::new(Mutex::new(Snapshot::default()));
126 /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink)
127 /// .await?
128 /// .with_cluster_info_provider(Arc::new(ClusterInfoSnapshot::synthetic));
129 /// drop(server);
130 /// # Ok(())
131 /// # }
132 /// ```
133 #[must_use]
134 pub fn with_cluster_info_provider(mut self, provider: ClusterInfoProvider) -> Self {
135 self.cluster_info = Some(provider);
136 self
137 }
138
139 /// Returns the local socket address the server is listening on.
140 ///
141 /// # Examples
142 ///
143 /// ```no_run
144 /// use std::sync::Arc;
145 /// use dynomite::stats::{Snapshot, StatsServer};
146 /// use parking_lot::Mutex;
147 ///
148 /// # async fn _example() -> std::io::Result<()> {
149 /// let sink = Arc::new(Mutex::new(Snapshot::default()));
150 /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
151 /// let addr = server.local_addr()?;
152 /// assert!(addr.port() != 0);
153 /// # Ok(())
154 /// # }
155 /// ```
156 pub fn local_addr(&self) -> io::Result<SocketAddr> {
157 self.listener.local_addr()
158 }
159
160 /// Accept a single connection, serve one HTTP/1.1 request, and
161 /// return.
162 ///
163 /// # Examples
164 ///
165 /// ```no_run
166 /// use std::sync::Arc;
167 /// use dynomite::stats::{Snapshot, StatsServer};
168 /// use parking_lot::Mutex;
169 ///
170 /// # async fn _example() -> std::io::Result<()> {
171 /// let sink = Arc::new(Mutex::new(Snapshot::default()));
172 /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
173 /// server.accept_one().await?;
174 /// # Ok(())
175 /// # }
176 /// ```
177 pub async fn accept_one(&self) -> io::Result<()> {
178 let (sock, _peer) = self.listener.accept().await?;
179 let snapshot = self.source.lock().clone();
180 let cluster_info = self.cluster_info.clone();
181 serve_connection(sock, snapshot, cluster_info).await
182 }
183
184 /// Run the accept loop until cancelled. Each connection is handled
185 /// on a fresh task so a slow client cannot stall the listener.
186 ///
187 /// # Examples
188 ///
189 /// ```no_run
190 /// use std::sync::Arc;
191 /// use dynomite::stats::{Snapshot, StatsServer};
192 /// use parking_lot::Mutex;
193 ///
194 /// # async fn _example() -> std::io::Result<()> {
195 /// let sink = Arc::new(Mutex::new(Snapshot::default()));
196 /// let server = StatsServer::bind("127.0.0.1:0".parse().unwrap(), sink).await?;
197 /// let _ = tokio::spawn(async move { server.run().await });
198 /// # Ok(())
199 /// # }
200 /// ```
201 pub async fn run(self) -> io::Result<()> {
202 let span = tracing::info_span!(
203 "stats_server.run",
204 local = %self.listener.local_addr().map_or_else(|_| String::from("?"), |a| a.to_string()),
205 );
206 let listener = self.listener;
207 let source = self.source;
208 let cluster_info = self.cluster_info;
209 async move {
210 loop {
211 let (sock, _peer) = listener.accept().await?;
212 let snapshot = source.lock().clone();
213 let ci = cluster_info.clone();
214 tokio::spawn(async move {
215 let _ = serve_connection(sock, snapshot, ci).await;
216 });
217 }
218 }
219 .instrument(span)
220 .await
221 }
222}
223
224async fn serve_connection(
225 mut sock: TcpStream,
226 snapshot: Snapshot,
227 cluster_info: Option<ClusterInfoProvider>,
228) -> io::Result<()> {
229 let mut buf = vec![0u8; MAX_REQUEST_BYTES];
230 let mut filled = 0usize;
231 loop {
232 if filled == buf.len() {
233 return write_response(&mut sock, 400, "Bad Request", b"").await;
234 }
235 let read_result = tokio::time::timeout(READ_TIMEOUT, sock.read(&mut buf[filled..])).await;
236 let Ok(Ok(n)) = read_result else {
237 // Read error or timeout: close silently, matching the
238 // reference error path which drops the connection without
239 // writing a response.
240 let _ = sock.shutdown().await;
241 return Ok(());
242 };
243 if n == 0 {
244 break;
245 }
246 filled += n;
247 let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
248 let mut req = httparse::Request::new(&mut headers);
249 match req.parse(&buf[..filled]) {
250 Ok(httparse::Status::Complete(_)) => {
251 return handle_parsed(&mut sock, &req, snapshot, cluster_info).await;
252 }
253 Ok(httparse::Status::Partial) => continue,
254 Err(_) => {
255 return write_response(&mut sock, 400, "Bad Request", b"").await;
256 }
257 }
258 }
259 Ok(())
260}
261
262async fn handle_parsed(
263 sock: &mut TcpStream,
264 req: &httparse::Request<'_, '_>,
265 snapshot: Snapshot,
266 cluster_info: Option<ClusterInfoProvider>,
267) -> io::Result<()> {
268 let path = req.path.unwrap_or("/");
269 if !matches!(req.method, Some("GET")) {
270 return write_response(sock, 405, "Method Not Allowed", b"").await;
271 }
272 match path {
273 "/" | "/info" | "/stats" => {
274 let body = snapshot.to_json();
275 write_json_response(sock, body.as_bytes()).await
276 }
277 "/metrics" => {
278 let body = render_prometheus(&snapshot);
279 write_metrics_response(sock, body.as_bytes()).await
280 }
281 "/cluster-info.txt" => match cluster_info {
282 Some(provider) => {
283 let snap = provider();
284 let mut body: Vec<u8> = Vec::with_capacity(4096);
285 if format_text(&snap, &mut body).is_err() {
286 return write_response(sock, 500, "Internal Server Error", b"").await;
287 }
288 write_text_response(sock, &body).await
289 }
290 None => write_response(sock, 503, "Service Unavailable", b"").await,
291 },
292 _ => write_response(sock, 200, "OK", b"OK\r\n").await,
293 }
294}
295
296async fn write_text_response(sock: &mut TcpStream, body: &[u8]) -> io::Result<()> {
297 let header = format!(
298 "HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=us-ascii\r\n\
299 Content-Length: {}\r\nConnection: close\r\n\r\n",
300 body.len()
301 );
302 sock.write_all(header.as_bytes()).await?;
303 sock.write_all(body).await?;
304 sock.shutdown().await?;
305 Ok(())
306}
307
308async fn write_response(
309 sock: &mut TcpStream,
310 code: u16,
311 reason: &str,
312 body: &[u8],
313) -> io::Result<()> {
314 let header = format!(
315 "HTTP/1.1 {code} {reason}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
316 body.len()
317 );
318 sock.write_all(header.as_bytes()).await?;
319 if !body.is_empty() {
320 sock.write_all(body).await?;
321 }
322 sock.shutdown().await?;
323 Ok(())
324}
325
326async fn write_json_response(sock: &mut TcpStream, body: &[u8]) -> io::Result<()> {
327 let header = format!(
328 "HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\n\
329 Content-Length: {}\r\nConnection: close\r\n\r\n",
330 body.len()
331 );
332 sock.write_all(header.as_bytes()).await?;
333 sock.write_all(body).await?;
334 sock.shutdown().await?;
335 Ok(())
336}
337
338async fn write_metrics_response(sock: &mut TcpStream, body: &[u8]) -> io::Result<()> {
339 let header = format!(
340 "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4; charset=utf-8\r\n\
341 Content-Length: {}\r\nConnection: close\r\n\r\n",
342 body.len()
343 );
344 sock.write_all(header.as_bytes()).await?;
345 sock.write_all(body).await?;
346 sock.shutdown().await?;
347 Ok(())
348}