Skip to main content

running_process/broker/
http_endpoint_registry.rs

1//! Per-backend HTTP endpoint registry for the v2 broker (slice 5 of #488).
2//!
3//! Stores `BackendId → Option<u16>` so the v2 broker knows which port
4//! each registered backend's HTTP server (if any) is listening on.
5//! Plumbed by the broker↔daemon control plane: when a daemon emits a
6//! `BackendHttpReady` frame, the broker decodes it via
7//! `decode_and_register` and stores the port against the backend id
8//! it tracks.
9//!
10//! No HTTP server lives here. That arrives in slice 7. This slice is
11//! purely the registry + frame plumbing — the state every subsequent
12//! HTTP-related slice needs to read from.
13
14use std::collections::HashMap;
15use std::sync::Mutex;
16
17use prost::Message;
18
19use crate::broker::protocol_v2::BackendHttpReady;
20
21/// Identifier for a backend the broker is tracking. The v2 broker uses
22/// a `String` for transparency at this slice; later slices may swap to
23/// a typed wrapper as the registry grows companion fields.
24pub type BackendId = String;
25
26/// Thread-safe map of `BackendId → Option<u16>` per the design in #483 §2.
27///
28/// `None` means the backend exists but has not yet reported a port
29/// (its HTTP server hasn't bound). `Some(port)` means it has, and the
30/// aggregator iframe can resolve a URL.
31#[derive(Debug, Default)]
32pub struct HttpEndpointRegistry {
33    inner: Mutex<HashMap<BackendId, Option<u16>>>,
34}
35
36impl HttpEndpointRegistry {
37    /// Create an empty registry.
38    pub fn new() -> Self {
39        Self::default()
40    }
41
42    /// Mark `backend_id` as tracked with no port yet.
43    pub fn track(&self, backend_id: BackendId) {
44        let mut map = self.inner.lock().expect("registry mutex poisoned");
45        map.entry(backend_id).or_insert(None);
46    }
47
48    /// Record that `backend_id`'s HTTP server has bound `port`.
49    ///
50    /// Inserts the backend if it wasn't already tracked. Returns the
51    /// previous port for that backend if any.
52    pub fn register_backend_http_endpoint(
53        &self,
54        backend_id: BackendId,
55        port: u16,
56    ) -> Option<u16> {
57        let mut map = self.inner.lock().expect("registry mutex poisoned");
58        map.insert(backend_id, Some(port)).flatten()
59    }
60
61    /// Look up the port for `backend_id`, if any.
62    ///
63    /// Returns `None` both when the backend is untracked AND when it
64    /// is tracked but hasn't reported a port yet — the aggregator
65    /// uses the broader `state()` API below when it needs the distinction.
66    pub fn lookup(&self, backend_id: &str) -> Option<u16> {
67        let map = self.inner.lock().expect("registry mutex poisoned");
68        map.get(backend_id).copied().flatten()
69    }
70
71    /// Get the current state for `backend_id`.
72    ///
73    /// `Some(Some(port))` = registered + bound. `Some(None)` = tracked
74    /// but starting. `None` = untracked.
75    pub fn state(&self, backend_id: &str) -> Option<Option<u16>> {
76        let map = self.inner.lock().expect("registry mutex poisoned");
77        map.get(backend_id).copied()
78    }
79
80    /// Snapshot of all currently-tracked backends and their state.
81    /// Used by the aggregator selector and by tests.
82    pub fn snapshot(&self) -> Vec<(BackendId, Option<u16>)> {
83        let map = self.inner.lock().expect("registry mutex poisoned");
84        map.iter().map(|(k, v)| (k.clone(), *v)).collect()
85    }
86}
87
88/// Errors raised by [`decode_and_register`].
89#[derive(Debug, thiserror::Error)]
90pub enum BackendHttpReadyError {
91    /// The incoming bytes did not decode as a `BackendHttpReady`.
92    #[error("decode BackendHttpReady: {0}")]
93    Decode(#[from] prost::DecodeError),
94
95    /// The decoded `port` did not fit in a `u16` (i.e. > 65535).
96    #[error("BackendHttpReady.port = {0} is out of u16 range")]
97    PortOutOfRange(u32),
98}
99
100/// Decode a `BackendHttpReady` frame body and register the port against
101/// `backend_id` in `registry`.
102///
103/// `frame_body` is the prost-encoded message bytes (the body inside the
104/// envelope produced by `protocol::write_frame`). Returns the registered
105/// port on success.
106pub fn decode_and_register(
107    registry: &HttpEndpointRegistry,
108    backend_id: BackendId,
109    frame_body: &[u8],
110) -> Result<u16, BackendHttpReadyError> {
111    let ready = BackendHttpReady::decode(frame_body)?;
112    let port: u16 = ready
113        .port
114        .try_into()
115        .map_err(|_| BackendHttpReadyError::PortOutOfRange(ready.port))?;
116    registry.register_backend_http_endpoint(backend_id, port);
117    Ok(port)
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123
124    #[test]
125    fn empty_registry_has_no_state_for_unknown_backend() {
126        let reg = HttpEndpointRegistry::new();
127        assert!(reg.state("zccache").is_none());
128        assert!(reg.lookup("zccache").is_none());
129    }
130
131    #[test]
132    fn track_then_lookup_returns_none_for_pending_port() {
133        let reg = HttpEndpointRegistry::new();
134        reg.track("zccache".to_string());
135        // Tracked but no port yet — lookup still returns None.
136        assert!(reg.lookup("zccache").is_none());
137        // But state() distinguishes tracked-no-port from untracked.
138        assert_eq!(reg.state("zccache"), Some(None));
139    }
140
141    #[test]
142    fn register_endpoint_makes_port_available() {
143        let reg = HttpEndpointRegistry::new();
144        reg.track("zccache".to_string());
145        let prev = reg.register_backend_http_endpoint("zccache".to_string(), 8765);
146        assert_eq!(prev, None);
147        assert_eq!(reg.lookup("zccache"), Some(8765));
148        assert_eq!(reg.state("zccache"), Some(Some(8765)));
149    }
150
151    #[test]
152    fn register_endpoint_updates_existing_port_and_returns_previous() {
153        let reg = HttpEndpointRegistry::new();
154        reg.register_backend_http_endpoint("fbuild".to_string(), 8001);
155        let prev = reg.register_backend_http_endpoint("fbuild".to_string(), 8002);
156        assert_eq!(prev, Some(8001));
157        assert_eq!(reg.lookup("fbuild"), Some(8002));
158    }
159
160    #[test]
161    fn snapshot_reflects_all_tracked_backends() {
162        let reg = HttpEndpointRegistry::new();
163        reg.track("zccache".to_string());
164        reg.register_backend_http_endpoint("fbuild".to_string(), 8002);
165
166        let mut snap = reg.snapshot();
167        snap.sort();
168        assert_eq!(
169            snap,
170            vec![("fbuild".to_string(), Some(8002)), ("zccache".to_string(), None)]
171        );
172    }
173
174    #[test]
175    fn decode_and_register_happy_path() {
176        let reg = HttpEndpointRegistry::new();
177        let msg = BackendHttpReady { port: 49_152 };
178        let mut body = Vec::with_capacity(msg.encoded_len());
179        msg.encode(&mut body).expect("encode BackendHttpReady");
180
181        let port = decode_and_register(&reg, "zccache".to_string(), &body)
182            .expect("decode_and_register succeeds");
183        assert_eq!(port, 49_152);
184        assert_eq!(reg.lookup("zccache"), Some(49_152));
185    }
186
187    #[test]
188    fn decode_and_register_rejects_oversized_port() {
189        let reg = HttpEndpointRegistry::new();
190        // Encode a BackendHttpReady carrying a port that overflows u16.
191        let msg = BackendHttpReady { port: 70_000 };
192        let mut body = Vec::with_capacity(msg.encoded_len());
193        msg.encode(&mut body).expect("encode BackendHttpReady");
194
195        let err = decode_and_register(&reg, "zccache".to_string(), &body)
196            .expect_err("port=70000 should be rejected");
197        match err {
198            BackendHttpReadyError::PortOutOfRange(70_000) => {}
199            other => panic!("expected PortOutOfRange(70000), got: {other:?}"),
200        }
201        // Registry untouched.
202        assert!(reg.lookup("zccache").is_none());
203    }
204
205    #[test]
206    fn decode_and_register_rejects_malformed_frame() {
207        let reg = HttpEndpointRegistry::new();
208        // 0xFF is not a valid wire-type byte at start of a proto message.
209        let err = decode_and_register(&reg, "zccache".to_string(), &[0xFF; 8])
210            .expect_err("malformed frame should be rejected");
211        match err {
212            BackendHttpReadyError::Decode(_) => {}
213            other => panic!("expected Decode, got: {other:?}"),
214        }
215        assert!(reg.lookup("zccache").is_none());
216    }
217}