1use std::io::{BufRead, BufReader, Read, Write};
18use std::net::{SocketAddr, TcpListener, TcpStream};
19use std::sync::Arc;
20
21use crate::broker::broker_http_port::{BrokerHttpPort, ResolvedHttpBind};
22use crate::broker::http_endpoint_registry::HttpEndpointRegistry;
23
24#[derive(Debug, thiserror::Error)]
26pub enum BrokerHttpServerError {
27 #[error("bind {addr}:{port} failed: {source}")]
29 Bind {
30 addr: std::net::IpAddr,
32 port: u16,
34 #[source]
36 source: std::io::Error,
37 },
38}
39
40pub struct BrokerHttpServer {
43 listener: TcpListener,
44 local: SocketAddr,
45 registry: Arc<HttpEndpointRegistry>,
46}
47
48impl BrokerHttpServer {
49 pub fn bind(
58 config: BrokerHttpPort,
59 registry: Arc<HttpEndpointRegistry>,
60 ) -> Result<Self, BrokerHttpServerError> {
61 let resolved = BrokerHttpPort::resolve(config);
62 let listener = match resolved.port {
63 BrokerHttpPort::Static { port } => try_bind(resolved, port)?,
64 BrokerHttpPort::Dynamic => try_bind(resolved, 0)?,
65 BrokerHttpPort::StaticOrFallback { preferred } => {
66 match try_bind(resolved, preferred) {
67 Ok(l) => l,
68 Err(BrokerHttpServerError::Bind { source, .. })
69 if source.kind() == std::io::ErrorKind::AddrInUse =>
70 {
71 try_bind(resolved, 0)?
72 }
73 Err(other) => return Err(other),
74 }
75 }
76 };
77 let local = listener.local_addr().map_err(|source| {
78 BrokerHttpServerError::Bind {
79 addr: resolved.addr,
80 port: 0,
81 source,
82 }
83 })?;
84 Ok(Self {
85 listener,
86 local,
87 registry,
88 })
89 }
90
91 pub fn local_addr(&self) -> SocketAddr {
95 self.local
96 }
97
98 pub fn serve_once(&self) -> std::io::Result<()> {
101 let (stream, _peer) = self.listener.accept()?;
102 handle_one(stream, &self.registry)
103 }
104}
105
106fn try_bind(
107 resolved: ResolvedHttpBind,
108 port: u16,
109) -> Result<TcpListener, BrokerHttpServerError> {
110 let bind_addr = SocketAddr::new(resolved.addr, port);
111 TcpListener::bind(bind_addr).map_err(|source| BrokerHttpServerError::Bind {
112 addr: resolved.addr,
113 port,
114 source,
115 })
116}
117
118fn handle_one(mut stream: TcpStream, registry: &HttpEndpointRegistry) -> std::io::Result<()> {
119 let mut reader = BufReader::new(stream.try_clone()?);
122 let mut request_line = String::new();
123 let _ = reader.read_line(&mut request_line);
124 let mut headers_done = false;
125 while !headers_done {
126 let mut buf = [0u8; 1];
127 if reader.read(&mut buf)? == 0 {
128 break;
129 }
130 if buf[0] == b'\r' {
131 let mut peek = [0u8; 3];
132 let n = reader.read(&mut peek)?;
133 if n >= 3 && peek == [b'\n', b'\r', b'\n'] {
134 headers_done = true;
135 }
136 }
137 }
140
141 let path = request_line
142 .split_whitespace()
143 .nth(1)
144 .unwrap_or("/")
145 .to_string();
146
147 let (status_line, content_type, body) = if request_line.starts_with("GET ")
148 && (path == "/" || path.is_empty())
149 {
150 (
151 "HTTP/1.1 200 OK",
152 "text/html; charset=utf-8",
153 render_aggregator_page(registry),
154 )
155 } else {
156 (
157 "HTTP/1.1 404 Not Found",
158 "text/plain; charset=utf-8",
159 "not found\n".to_string(),
160 )
161 };
162
163 let response = format!(
164 "{status_line}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
165 body.len(),
166 body,
167 );
168 stream.write_all(response.as_bytes())?;
169 stream.flush()?;
170 Ok(())
171}
172
173fn render_aggregator_page(registry: &HttpEndpointRegistry) -> String {
184 let mut snap = registry.snapshot();
185 snap.sort_by(|a, b| a.0.cmp(&b.0));
186
187 let mut buttons = String::new();
188 let mut initial_src = String::new();
189 if snap.is_empty() {
190 buttons.push_str(
191 r#"<span class="empty">no backends registered yet</span>"#,
192 );
193 } else {
194 for (id, port) in &snap {
195 match port {
196 Some(p) => {
197 let url = format!("http://127.0.0.1:{p}/");
198 if initial_src.is_empty() {
199 initial_src.clone_from(&url);
200 }
201 buttons.push_str(&format!(
202 r#"<button onclick="document.getElementById('agg').src={url:?}">{}</button>"#,
203 html_escape(id),
204 ));
205 }
206 None => {
207 buttons.push_str(&format!(
208 r#"<button disabled title="backend has not reported a port yet">{} (starting…)</button>"#,
209 html_escape(id),
210 ));
211 }
212 }
213 }
214 }
215
216 let initial_src_attr = if initial_src.is_empty() {
217 "about:blank".to_string()
218 } else {
219 initial_src
220 };
221
222 format!(
223 r#"<!doctype html>
224<html lang="en">
225<head>
226<meta charset="utf-8">
227<title>running-process broker-v2 aggregator</title>
228<style>
229 html, body {{ margin: 0; padding: 0; height: 100%; font-family: system-ui, sans-serif; }}
230 #bar {{ display: flex; gap: 0.4rem; padding: 0.4rem; border-bottom: 1px solid #ccc; background: #f5f5f5; }}
231 #bar button {{ padding: 0.3rem 0.8rem; }}
232 #agg {{ width: 100%; height: calc(100% - 3rem); border: 0; }}
233 .empty {{ color: #888; font-style: italic; }}
234</style>
235</head>
236<body>
237<nav id="bar">{buttons}</nav>
238<iframe id="agg" src="{initial_src_attr}"></iframe>
239</body>
240</html>
241"#
242 )
243}
244
/// Returns `s` with the HTML-significant characters replaced by entities.
///
/// `<`, `>`, `&`, `"` and `'` become `&lt;`, `&gt;`, `&amp;`, `&quot;` and
/// `&#39;` respectively; every other character is copied unchanged. The
/// result is safe to place in element content or in a quoted attribute value.
fn html_escape(s: &str) -> String {
    // Lower bound only: each escaped character grows the output.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&#39;"),
            _ => out.push(c),
        }
    }
    out
}
259
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Server on an OS-assigned port whose registry holds one backend without
    /// a port (`zccache`) and one with port 8002 (`fbuild`).
    fn make_server() -> BrokerHttpServer {
        let registry = Arc::new(HttpEndpointRegistry::new());
        registry.track("zccache".to_string());
        registry.register_backend_http_endpoint("fbuild".to_string(), 8002);
        BrokerHttpServer::bind(BrokerHttpPort::Dynamic, registry).expect("dynamic bind succeeds")
    }

    /// Runs `serve_once` on a background thread, sends `GET /` and returns the
    /// raw response together with the server thread's handle (to be joined by
    /// the caller after its assertions).
    fn request_root(server: BrokerHttpServer) -> (String, thread::JoinHandle<()>) {
        let target = server.local_addr();
        let worker = thread::spawn(move || {
            server.serve_once().expect("serve_once succeeds");
        });

        let mut client = TcpStream::connect(target).expect("connect");
        client
            .write_all(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
            .expect("write request");
        client
            .set_read_timeout(Some(Duration::from_secs(2)))
            .expect("set_read_timeout");

        let mut buf = String::new();
        client.read_to_string(&mut buf).expect("read response");
        (buf, worker)
    }

    #[test]
    fn dynamic_bind_yields_nonzero_port() {
        let bound = make_server().local_addr();
        assert_ne!(bound.port(), 0, "OS should have assigned a real port");
    }

    #[test]
    fn placeholder_renders_registered_backends() {
        let (buf, worker) = request_root(make_server());

        assert!(
            buf.contains("200 OK"),
            "expected 200 OK in response, got:\n{buf}"
        );
        assert!(
            buf.contains("text/html"),
            "expected HTML content-type, got:\n{buf}"
        );
        assert!(
            buf.contains("<iframe id=\"agg\""),
            "expected aggregator iframe element, got:\n{buf}"
        );
        assert!(
            buf.contains("http://127.0.0.1:8002/"),
            "expected fbuild URL wired into selector, got:\n{buf}"
        );
        assert!(
            buf.contains("zccache (starting"),
            "expected zccache pending-state button, got:\n{buf}"
        );
        assert!(
            buf.contains("src=\"http://127.0.0.1:8002/\""),
            "expected fbuild URL as initial iframe src, got:\n{buf}"
        );

        worker.join().expect("server thread joins");
    }

    #[test]
    fn aggregator_page_with_no_backends_shows_empty_state() {
        let empty_registry = Arc::new(HttpEndpointRegistry::new());
        let server = BrokerHttpServer::bind(BrokerHttpPort::Dynamic, empty_registry)
            .expect("dynamic bind succeeds");
        let (buf, worker) = request_root(server);

        assert!(buf.contains("no backends registered yet"), "got:\n{buf}");
        assert!(
            buf.contains("src=\"about:blank\""),
            "empty selector should default the iframe to about:blank, got:\n{buf}"
        );
        worker.join().expect("server thread joins");
    }

    #[test]
    fn static_or_fallback_falls_back_on_eaddrinuse() {
        // Keep a listener alive on some port so the preferred bind collides.
        let blocker = TcpListener::bind("127.0.0.1:0").expect("blocker bind");
        let preferred = blocker.local_addr().expect("blocker addr").port();

        let server = BrokerHttpServer::bind(
            BrokerHttpPort::StaticOrFallback { preferred },
            Arc::new(HttpEndpointRegistry::new()),
        )
        .expect("StaticOrFallback should fall back to OS-allocated");

        let fallback_port = server.local_addr().port();
        assert_ne!(
            fallback_port, preferred,
            "StaticOrFallback should have picked a different port"
        );
        assert_ne!(fallback_port, 0, "OS should have assigned a real port");
        drop(blocker);
    }
}