1use std::io::{BufRead, BufReader, Read, Write};
18use std::net::{SocketAddr, TcpListener, TcpStream};
19use std::sync::Arc;
20
21use crate::broker::broker_http_port::{BrokerHttpPort, ResolvedHttpBind};
22use crate::broker::http_endpoint_registry::HttpEndpointRegistry;
23
/// Errors reported while bringing up a [`BrokerHttpServer`].
#[derive(Debug, thiserror::Error)]
pub enum BrokerHttpServerError {
    /// Setting up the listening socket on `addr:port` failed.
    ///
    /// Also used by `BrokerHttpServer::bind` when the bound listener cannot
    /// report its local address; in that case `port` is `0`.
    #[error("bind {addr}:{port} failed: {source}")]
    Bind {
        /// IP address the bind was attempted on.
        addr: std::net::IpAddr,
        /// Port passed to the bind attempt (`0` asks the OS to pick one).
        port: u16,
        /// Underlying I/O error returned by the operating system.
        #[source]
        source: std::io::Error,
    },
}
39
/// A bound, blocking HTTP listener that serves the aggregator page rendered
/// from a shared [`HttpEndpointRegistry`].
pub struct BrokerHttpServer {
    /// Listening socket that connections are accepted from.
    listener: TcpListener,
    /// Local address of `listener`, captured once when the server is bound.
    local: SocketAddr,
    /// Shared registry whose snapshot is rendered into the served page.
    registry: Arc<HttpEndpointRegistry>,
}
47
48impl BrokerHttpServer {
49 pub fn bind(
58 config: BrokerHttpPort,
59 registry: Arc<HttpEndpointRegistry>,
60 ) -> Result<Self, BrokerHttpServerError> {
61 let resolved = BrokerHttpPort::resolve(config);
62 let listener = match resolved.port {
63 BrokerHttpPort::Static { port } => try_bind(resolved, port)?,
64 BrokerHttpPort::Dynamic => try_bind(resolved, 0)?,
65 BrokerHttpPort::StaticOrFallback { preferred } => {
66 match try_bind(resolved, preferred) {
67 Ok(l) => l,
68 Err(BrokerHttpServerError::Bind { source, .. })
69 if source.kind() == std::io::ErrorKind::AddrInUse =>
70 {
71 try_bind(resolved, 0)?
72 }
73 Err(other) => return Err(other),
74 }
75 }
76 };
77 let local = listener.local_addr().map_err(|source| {
78 BrokerHttpServerError::Bind {
79 addr: resolved.addr,
80 port: 0,
81 source,
82 }
83 })?;
84 Ok(Self {
85 listener,
86 local,
87 registry,
88 })
89 }
90
91 pub fn local_addr(&self) -> SocketAddr {
95 self.local
96 }
97
98 pub fn serve_once(&self) -> std::io::Result<()> {
101 let (stream, _peer) = self.listener.accept()?;
102 handle_one(stream, &self.registry)
103 }
104}
105
106fn try_bind(
107 resolved: ResolvedHttpBind,
108 port: u16,
109) -> Result<TcpListener, BrokerHttpServerError> {
110 let bind_addr = SocketAddr::new(resolved.addr, port);
111 TcpListener::bind(bind_addr).map_err(|source| BrokerHttpServerError::Bind {
112 addr: resolved.addr,
113 port,
114 source,
115 })
116}
117
118fn handle_one(mut stream: TcpStream, registry: &HttpEndpointRegistry) -> std::io::Result<()> {
119 let mut reader = BufReader::new(stream.try_clone()?);
122 let mut request_line = String::new();
123 let _ = reader.read_line(&mut request_line);
124 let mut headers_done = false;
125 while !headers_done {
126 let mut buf = [0u8; 1];
127 if reader.read(&mut buf)? == 0 {
128 break;
129 }
130 if buf[0] == b'\r' {
131 let mut peek = [0u8; 3];
132 let n = reader.read(&mut peek)?;
133 if n >= 3 && peek == [b'\n', b'\r', b'\n'] {
134 headers_done = true;
135 }
136 }
137 }
140
141 let path = request_line
142 .split_whitespace()
143 .nth(1)
144 .unwrap_or("/")
145 .to_string();
146
147 let (status_line, content_type, body) = if request_line.starts_with("GET ")
148 && (path == "/" || path.is_empty())
149 {
150 (
151 "HTTP/1.1 200 OK",
152 "text/html; charset=utf-8",
153 render_aggregator_page(registry),
154 )
155 } else {
156 (
157 "HTTP/1.1 404 Not Found",
158 "text/plain; charset=utf-8",
159 "not found\n".to_string(),
160 )
161 };
162
163 let response = format!(
164 "{status_line}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
165 body.len(),
166 body,
167 );
168 stream.write_all(response.as_bytes())?;
169 stream.flush()?;
170 Ok(())
171}
172
173fn render_aggregator_page(registry: &HttpEndpointRegistry) -> String {
184 let mut snap = registry.snapshot();
185 snap.sort_by(|a, b| a.0.cmp(&b.0));
186
187 let mut buttons = String::new();
188 let mut initial_src = String::new();
189 if snap.is_empty() {
190 buttons.push_str(
191 r#"<span class="empty">no backends registered yet</span>"#,
192 );
193 } else {
194 for (id, port) in &snap {
195 match port {
196 Some(p) => {
197 let url = format!("http://127.0.0.1:{p}/");
198 if initial_src.is_empty() {
199 initial_src.clone_from(&url);
200 }
201 buttons.push_str(&format!(
202 r#"<button onclick="document.getElementById('agg').src={url:?}">{}</button>"#,
203 html_escape(id),
204 ));
205 }
206 None => {
207 buttons.push_str(&format!(
208 r#"<button disabled title="backend has not reported a port yet">{} (starting…)</button>"#,
209 html_escape(id),
210 ));
211 }
212 }
213 }
214 }
215
216 let initial_src_attr = if initial_src.is_empty() {
217 "about:blank".to_string()
218 } else {
219 initial_src
220 };
221
222 format!(
223 r#"<!doctype html>
224<html lang="en">
225<head>
226<meta charset="utf-8">
227<title>running-process broker-v2 aggregator</title>
228<style>
229 html, body {{ margin: 0; padding: 0; height: 100%; font-family: system-ui, sans-serif; }}
230 #bar {{ display: flex; gap: 0.4rem; padding: 0.4rem; border-bottom: 1px solid #ccc; background: #f5f5f5; }}
231 #bar button {{ padding: 0.3rem 0.8rem; }}
232 #agg {{ width: 100%; height: calc(100% - 3rem); border: 0; }}
233 .empty {{ color: #888; font-style: italic; }}
234</style>
235</head>
236<body>
237<nav id="bar">{buttons}</nav>
238<iframe id="agg" src="{initial_src_attr}"></iframe>
239</body>
240</html>
241"#
242 )
243}
244
/// Escapes `s` for safe inclusion in HTML text or a quoted attribute value.
///
/// The five characters with special meaning in HTML are replaced by
/// character references (`<` → `&lt;`, `>` → `&gt;`, `&` → `&amp;`,
/// `"` → `&quot;`, `'` → `&#39;`); every other character is copied through
/// unchanged. Input is processed one character at a time, so an `&` that is
/// already part of an entity in `s` is escaped again rather than preserved.
fn html_escape(s: &str) -> String {
    // The output is at least as long as the input; reserve that much up front.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&#39;"),
            _ => out.push(c),
        }
    }
    out
}
259
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read as _;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Builds a dynamically bound server whose registry holds one pending
    /// backend (`zccache`) and one backend with a reported port (`fbuild`).
    fn make_server() -> BrokerHttpServer {
        let registry = Arc::new(HttpEndpointRegistry::new());
        registry.track("zccache".to_string());
        registry.register_backend_http_endpoint("fbuild".to_string(), 8002);
        BrokerHttpServer::bind(BrokerHttpPort::Dynamic, registry).expect("dynamic bind succeeds")
    }

    /// Serves one connection on a background thread, sends `GET /` to it and
    /// returns the raw response together with the server thread's handle so
    /// the caller can join it after its assertions.
    fn request_root(server: BrokerHttpServer) -> (String, thread::JoinHandle<()>) {
        let target = server.local_addr();
        let worker = thread::spawn(move || {
            server.serve_once().expect("serve_once succeeds");
        });

        let mut client = TcpStream::connect(target).expect("connect");
        client
            .write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
            .expect("write request");
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .expect("set_read_timeout");

        let mut response = String::new();
        client
            .read_to_string(&mut response)
            .expect("read response");
        (response, worker)
    }

    #[test]
    fn dynamic_bind_yields_nonzero_port() {
        let port = make_server().local_addr().port();
        assert_ne!(port, 0, "OS should have assigned a real port");
    }

    #[test]
    fn placeholder_renders_registered_backends() {
        let (page, worker) = request_root(make_server());

        // (substring that must be present, failure message prefix)
        let expectations = [
            ("200 OK", "expected 200 OK in response"),
            ("text/html", "expected HTML content-type"),
            ("<iframe id=\"agg\"", "expected aggregator iframe element"),
            (
                "http://127.0.0.1:8002/",
                "expected fbuild URL wired into selector",
            ),
            (
                "zccache (starting",
                "expected zccache pending-state button",
            ),
            (
                "src=\"http://127.0.0.1:8002/\"",
                "expected fbuild URL as initial iframe src",
            ),
        ];
        for (needle, complaint) in expectations {
            assert!(page.contains(needle), "{complaint}, got:\n{page}");
        }

        worker.join().expect("server thread joins");
    }

    #[test]
    fn aggregator_page_with_no_backends_shows_empty_state() {
        let empty_registry = Arc::new(HttpEndpointRegistry::new());
        let server = BrokerHttpServer::bind(BrokerHttpPort::Dynamic, empty_registry)
            .expect("dynamic bind succeeds");
        let (page, worker) = request_root(server);

        assert!(page.contains("no backends registered yet"), "got:\n{page}");
        assert!(
            page.contains("src=\"about:blank\""),
            "empty selector should default the iframe to about:blank, got:\n{page}"
        );

        worker.join().expect("server thread joins");
    }

    #[test]
    fn static_or_fallback_falls_back_on_eaddrinuse() {
        // Hold a port open so the preferred bind is guaranteed to collide.
        let blocker = TcpListener::bind("127.0.0.1:0").expect("blocker bind");
        let preferred = blocker.local_addr().expect("blocker addr").port();

        let registry = Arc::new(HttpEndpointRegistry::new());
        let server =
            BrokerHttpServer::bind(BrokerHttpPort::StaticOrFallback { preferred }, registry)
                .expect("StaticOrFallback should fall back to OS-allocated");

        let fallback_port = server.local_addr().port();
        assert_ne!(
            fallback_port, preferred,
            "StaticOrFallback should have picked a different port"
        );
        assert_ne!(fallback_port, 0, "OS should have assigned a real port");

        // Keep the blocker alive until after the bind under test has run.
        drop(blocker);
    }
}