Skip to main content

running_process/broker/
broker_http_server.rs

1//! Broker HTTP server scaffold (slice 7 of #488).
2//!
3//! Tiny single-threaded HTTP/1.1 server using only `std::net::TcpListener`
4//! — no hyper/axum dep yet, just enough to bind a port, accept a request,
5//! and respond with a placeholder page that lists the currently-registered
6//! backends from [`super::http_endpoint_registry::HttpEndpointRegistry`].
7//!
8//! Honors the resolved bind state from
9//! [`super::broker_http_port::BrokerHttpPort::resolve`]: the port is one of
10//! `Static`, `Dynamic`, or `StaticOrFallback`; the address comes from the
11//! env override or defaults to `127.0.0.1`.
12//!
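//! The aggregator page (slice 8) is rendered by this module as well: `GET /`
//! returns a self-contained HTML document with one selector button per
//! registered backend plus an iframe, so consumers can verify the server is
//! reachable and the registry is wired correctly. Any other request gets a
//! plain-text 404.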
16
17use std::io::{BufRead, BufReader, Read, Write};
18use std::net::{SocketAddr, TcpListener, TcpStream};
19use std::sync::Arc;
20
21use crate::broker::broker_http_port::{BrokerHttpPort, ResolvedHttpBind};
22use crate::broker::http_endpoint_registry::HttpEndpointRegistry;
23
/// Errors raised by [`BrokerHttpServer::bind`].
#[derive(Debug, thiserror::Error)]
pub enum BrokerHttpServerError {
    /// `bind(addr:port)` failed and there was no (further) fallback to try.
    ///
    /// Also produced when the listener was bound but querying its local
    /// address failed afterwards; in that case `port` is reported as `0`.
    #[error("bind {addr}:{port} failed: {source}")]
    Bind {
        /// IP we tried to bind on.
        addr: std::net::IpAddr,
        /// Port we tried to bind on (`0` means "let the OS pick").
        port: u16,
        /// Underlying IO error.
        #[source]
        source: std::io::Error,
    },
}
39
/// A bound but not-yet-serving HTTP listener. Caller decides whether to
/// drive [`BrokerHttpServer::serve_once`] in a blocking thread, behind
/// tokio, etc.
pub struct BrokerHttpServer {
    /// The bound socket that `serve_once` accepts connections on.
    listener: TcpListener,
    /// Address the listener is actually bound to, captured once at bind time.
    local: SocketAddr,
    /// Shared backend registry the served page is rendered from.
    registry: Arc<HttpEndpointRegistry>,
}
47
48impl BrokerHttpServer {
49    /// Resolve the [`BrokerHttpPort`] config + env, then bind a
50    /// `TcpListener` on the resulting address.
51    ///
52    /// Behavior per #483 §3:
53    /// - `Static`: bind exactly that port; bubble up the bind error.
54    /// - `Dynamic`: bind to `port=0` (OS-allocated).
55    /// - `StaticOrFallback`: try the preferred port; on EADDRINUSE
56    ///   retry with `port=0`.
57    pub fn bind(
58        config: BrokerHttpPort,
59        registry: Arc<HttpEndpointRegistry>,
60    ) -> Result<Self, BrokerHttpServerError> {
61        let resolved = BrokerHttpPort::resolve(config);
62        let listener = match resolved.port {
63            BrokerHttpPort::Static { port } => try_bind(resolved, port)?,
64            BrokerHttpPort::Dynamic => try_bind(resolved, 0)?,
65            BrokerHttpPort::StaticOrFallback { preferred } => {
66                match try_bind(resolved, preferred) {
67                    Ok(l) => l,
68                    Err(BrokerHttpServerError::Bind { source, .. })
69                        if source.kind() == std::io::ErrorKind::AddrInUse =>
70                    {
71                        try_bind(resolved, 0)?
72                    }
73                    Err(other) => return Err(other),
74                }
75            }
76        };
77        let local = listener.local_addr().map_err(|source| {
78            BrokerHttpServerError::Bind {
79                addr: resolved.addr,
80                port: 0,
81                source,
82            }
83        })?;
84        Ok(Self {
85            listener,
86            local,
87            registry,
88        })
89    }
90
91    /// The actual bound `SocketAddr` (post-resolution). Use this to
92    /// populate `GetBrokerHttpEndpointResponse.port` and the runtime-file
93    /// shape (slice 9 plumbs the resolved address through).
94    pub fn local_addr(&self) -> SocketAddr {
95        self.local
96    }
97
98    /// Accept ONE connection and respond with the placeholder page,
99    /// then return. Intended for tests + the future slice-7 serve loop.
100    pub fn serve_once(&self) -> std::io::Result<()> {
101        let (stream, _peer) = self.listener.accept()?;
102        handle_one(stream, &self.registry)
103    }
104}
105
106fn try_bind(
107    resolved: ResolvedHttpBind,
108    port: u16,
109) -> Result<TcpListener, BrokerHttpServerError> {
110    let bind_addr = SocketAddr::new(resolved.addr, port);
111    TcpListener::bind(bind_addr).map_err(|source| BrokerHttpServerError::Bind {
112        addr: resolved.addr,
113        port,
114        source,
115    })
116}
117
118fn handle_one(mut stream: TcpStream, registry: &HttpEndpointRegistry) -> std::io::Result<()> {
119    // Minimal HTTP/1.1: read until "\r\n\r\n", grab the request line,
120    // route GET / to the placeholder page, fall through to 404.
121    let mut reader = BufReader::new(stream.try_clone()?);
122    let mut request_line = String::new();
123    let _ = reader.read_line(&mut request_line);
124    let mut headers_done = false;
125    while !headers_done {
126        let mut buf = [0u8; 1];
127        if reader.read(&mut buf)? == 0 {
128            break;
129        }
130        if buf[0] == b'\r' {
131            let mut peek = [0u8; 3];
132            let n = reader.read(&mut peek)?;
133            if n >= 3 && peek == [b'\n', b'\r', b'\n'] {
134                headers_done = true;
135            }
136        }
137        // The placeholder server does not consume request bodies; we
138        // assume the client is a no-body GET.
139    }
140
141    let path = request_line
142        .split_whitespace()
143        .nth(1)
144        .unwrap_or("/")
145        .to_string();
146
147    let (status_line, content_type, body) = if request_line.starts_with("GET ")
148        && (path == "/" || path.is_empty())
149    {
150        (
151            "HTTP/1.1 200 OK",
152            "text/html; charset=utf-8",
153            render_aggregator_page(registry),
154        )
155    } else {
156        (
157            "HTTP/1.1 404 Not Found",
158            "text/plain; charset=utf-8",
159            "not found\n".to_string(),
160        )
161    };
162
163    let response = format!(
164        "{status_line}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
165        body.len(),
166        body,
167    );
168    stream.write_all(response.as_bytes())?;
169    stream.flush()?;
170    Ok(())
171}
172
173/// Render the aggregator page (slice 8 of #488): top-bar selector +
174/// full-bleed iframe. The selector emits one button per registered
175/// backend; clicking flips the iframe's `src` to that backend's HTTP
176/// root. Backends whose registry slot is `None` render as disabled
177/// buttons with `(starting…)` text — they don't accidentally try to
178/// load a URL the broker doesn't have yet.
179///
180/// The page is a single self-contained document: no external CSS,
181/// no external JS, no fonts. Keeps it loadable on locked-down
182/// operator boxes and trivially auditable.
183fn render_aggregator_page(registry: &HttpEndpointRegistry) -> String {
184    let mut snap = registry.snapshot();
185    snap.sort_by(|a, b| a.0.cmp(&b.0));
186
187    let mut buttons = String::new();
188    let mut initial_src = String::new();
189    if snap.is_empty() {
190        buttons.push_str(
191            r#"<span class="empty">no backends registered yet</span>"#,
192        );
193    } else {
194        for (id, port) in &snap {
195            match port {
196                Some(p) => {
197                    let url = format!("http://127.0.0.1:{p}/");
198                    if initial_src.is_empty() {
199                        initial_src.clone_from(&url);
200                    }
201                    buttons.push_str(&format!(
202                        r#"<button onclick="document.getElementById('agg').src={url:?}">{}</button>"#,
203                        html_escape(id),
204                    ));
205                }
206                None => {
207                    buttons.push_str(&format!(
208                        r#"<button disabled title="backend has not reported a port yet">{} (starting…)</button>"#,
209                        html_escape(id),
210                    ));
211                }
212            }
213        }
214    }
215
216    let initial_src_attr = if initial_src.is_empty() {
217        "about:blank".to_string()
218    } else {
219        initial_src
220    };
221
222    format!(
223        r#"<!doctype html>
224<html lang="en">
225<head>
226<meta charset="utf-8">
227<title>running-process broker-v2 aggregator</title>
228<style>
229  html, body {{ margin: 0; padding: 0; height: 100%; font-family: system-ui, sans-serif; }}
230  #bar {{ display: flex; gap: 0.4rem; padding: 0.4rem; border-bottom: 1px solid #ccc; background: #f5f5f5; }}
231  #bar button {{ padding: 0.3rem 0.8rem; }}
232  #agg {{ width: 100%; height: calc(100% - 3rem); border: 0; }}
233  .empty {{ color: #888; font-style: italic; }}
234</style>
235</head>
236<body>
237<nav id="bar">{buttons}</nav>
238<iframe id="agg" src="{initial_src_attr}"></iframe>
239</body>
240</html>
241"#
242    )
243}
244
/// Escape the five HTML-significant characters (`<`, `>`, `&`, `"`, `'`)
/// in `s` so it can be embedded in element text or attribute values.
/// All other characters are copied through unchanged.
fn html_escape(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            let entity = match ch {
                '<' => "&lt;",
                '>' => "&gt;",
                '&' => "&amp;",
                '"' => "&quot;",
                '\'' => "&#39;",
                plain => {
                    escaped.push(plain);
                    return escaped;
                }
            };
            escaped.push_str(entity);
            escaped
        })
}
259
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read as _;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Server on an OS-assigned port with one pending backend (`zccache`)
    /// and one backend that already reported a port (`fbuild` on 8002).
    fn make_server() -> BrokerHttpServer {
        let reg = Arc::new(HttpEndpointRegistry::new());
        reg.track("zccache".to_string());
        reg.register_backend_http_endpoint("fbuild".to_string(), 8002);
        BrokerHttpServer::bind(BrokerHttpPort::Dynamic, reg).expect("dynamic bind succeeds")
    }

    /// Serve exactly one request on a background thread, issue a minimal
    /// `GET /` against it, and return the raw response (head + body).
    fn fetch_root(server: BrokerHttpServer) -> String {
        let local = server.local_addr();
        let handle = thread::spawn(move || {
            server.serve_once().expect("serve_once succeeds");
        });

        let mut client = TcpStream::connect(local).expect("connect");
        // Bound the read so a misbehaving server fails the test instead of
        // hanging it.
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .expect("set_read_timeout");
        client
            .write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
            .expect("write request");
        let mut response = String::new();
        client
            .read_to_string(&mut response)
            .expect("read response");

        handle.join().expect("server thread joins");
        response
    }

    #[test]
    fn dynamic_bind_yields_nonzero_port() {
        let s = make_server();
        let addr = s.local_addr();
        assert_ne!(addr.port(), 0, "OS should have assigned a real port");
    }

    #[test]
    fn placeholder_renders_registered_backends() {
        let buf = fetch_root(make_server());

        assert!(
            buf.contains("200 OK"),
            "expected 200 OK in response, got:\n{buf}"
        );
        assert!(
            buf.contains("text/html"),
            "expected HTML content-type, got:\n{buf}"
        );
        assert!(
            buf.contains("getElementById('agg').src="),
            "expected a selector button wired to the iframe, got:\n{buf}"
        );
        assert!(
            buf.contains("http://127.0.0.1:8002/"),
            "expected fbuild URL wired into selector, got:\n{buf}"
        );
        assert!(
            buf.contains("zccache (starting"),
            "expected zccache pending-state button, got:\n{buf}"
        );
        // Match the whole opening tag: a bare `src="…"` check could also be
        // satisfied by text inside a selector button.
        assert!(
            buf.contains("<iframe id=\"agg\" src=\"http://127.0.0.1:8002/\">"),
            "expected fbuild URL as initial iframe src, got:\n{buf}"
        );
    }

    #[test]
    fn aggregator_page_with_no_backends_shows_empty_state() {
        let reg = Arc::new(HttpEndpointRegistry::new());
        let s = BrokerHttpServer::bind(BrokerHttpPort::Dynamic, reg)
            .expect("dynamic bind succeeds");
        let buf = fetch_root(s);

        assert!(buf.contains("no backends registered yet"), "got:\n{buf}");
        assert!(
            buf.contains("<iframe id=\"agg\" src=\"about:blank\">"),
            "empty selector should default the iframe to about:blank, got:\n{buf}"
        );
    }

    #[test]
    fn static_or_fallback_falls_back_on_eaddrinuse() {
        // Bind a sacrificial listener to force EADDRINUSE on its port.
        let blocker = TcpListener::bind("127.0.0.1:0").expect("blocker bind");
        let preferred = blocker.local_addr().expect("blocker addr").port();

        let reg = Arc::new(HttpEndpointRegistry::new());
        let s = BrokerHttpServer::bind(BrokerHttpPort::StaticOrFallback { preferred }, reg)
            .expect("StaticOrFallback should fall back to OS-allocated");
        let fallback_port = s.local_addr().port();
        assert_ne!(
            fallback_port, preferred,
            "StaticOrFallback should have picked a different port"
        );
        assert_ne!(fallback_port, 0, "OS should have assigned a real port");
        drop(blocker);
    }
}