running_process/broker/
http_endpoint_registry.rs1use std::collections::HashMap;
15use std::sync::Mutex;
16
17use prost::Message;
18
19use crate::broker::protocol_v2::BackendHttpReady;
20
21pub type BackendId = String;
25
26#[derive(Debug, Default)]
32pub struct HttpEndpointRegistry {
33 inner: Mutex<HashMap<BackendId, Option<u16>>>,
34}
35
36impl HttpEndpointRegistry {
37 pub fn new() -> Self {
39 Self::default()
40 }
41
42 pub fn track(&self, backend_id: BackendId) {
44 let mut map = self.inner.lock().expect("registry mutex poisoned");
45 map.entry(backend_id).or_insert(None);
46 }
47
48 pub fn register_backend_http_endpoint(
53 &self,
54 backend_id: BackendId,
55 port: u16,
56 ) -> Option<u16> {
57 let mut map = self.inner.lock().expect("registry mutex poisoned");
58 map.insert(backend_id, Some(port)).flatten()
59 }
60
61 pub fn lookup(&self, backend_id: &str) -> Option<u16> {
67 let map = self.inner.lock().expect("registry mutex poisoned");
68 map.get(backend_id).copied().flatten()
69 }
70
71 pub fn state(&self, backend_id: &str) -> Option<Option<u16>> {
76 let map = self.inner.lock().expect("registry mutex poisoned");
77 map.get(backend_id).copied()
78 }
79
80 pub fn snapshot(&self) -> Vec<(BackendId, Option<u16>)> {
83 let map = self.inner.lock().expect("registry mutex poisoned");
84 map.iter().map(|(k, v)| (k.clone(), *v)).collect()
85 }
86}
87
88#[derive(Debug, thiserror::Error)]
90pub enum BackendHttpReadyError {
91 #[error("decode BackendHttpReady: {0}")]
93 Decode(#[from] prost::DecodeError),
94
95 #[error("BackendHttpReady.port = {0} is out of u16 range")]
97 PortOutOfRange(u32),
98}
99
100pub fn decode_and_register(
107 registry: &HttpEndpointRegistry,
108 backend_id: BackendId,
109 frame_body: &[u8],
110) -> Result<u16, BackendHttpReadyError> {
111 let ready = BackendHttpReady::decode(frame_body)?;
112 let port: u16 = ready
113 .port
114 .try_into()
115 .map_err(|_| BackendHttpReadyError::PortOutOfRange(ready.port))?;
116 registry.register_backend_http_endpoint(backend_id, port);
117 Ok(port)
118}
119
120#[cfg(test)]
121mod tests {
122 use super::*;
123
124 #[test]
125 fn empty_registry_has_no_state_for_unknown_backend() {
126 let reg = HttpEndpointRegistry::new();
127 assert!(reg.state("zccache").is_none());
128 assert!(reg.lookup("zccache").is_none());
129 }
130
131 #[test]
132 fn track_then_lookup_returns_none_for_pending_port() {
133 let reg = HttpEndpointRegistry::new();
134 reg.track("zccache".to_string());
135 assert!(reg.lookup("zccache").is_none());
137 assert_eq!(reg.state("zccache"), Some(None));
139 }
140
141 #[test]
142 fn register_endpoint_makes_port_available() {
143 let reg = HttpEndpointRegistry::new();
144 reg.track("zccache".to_string());
145 let prev = reg.register_backend_http_endpoint("zccache".to_string(), 8765);
146 assert_eq!(prev, None);
147 assert_eq!(reg.lookup("zccache"), Some(8765));
148 assert_eq!(reg.state("zccache"), Some(Some(8765)));
149 }
150
151 #[test]
152 fn register_endpoint_updates_existing_port_and_returns_previous() {
153 let reg = HttpEndpointRegistry::new();
154 reg.register_backend_http_endpoint("fbuild".to_string(), 8001);
155 let prev = reg.register_backend_http_endpoint("fbuild".to_string(), 8002);
156 assert_eq!(prev, Some(8001));
157 assert_eq!(reg.lookup("fbuild"), Some(8002));
158 }
159
160 #[test]
161 fn snapshot_reflects_all_tracked_backends() {
162 let reg = HttpEndpointRegistry::new();
163 reg.track("zccache".to_string());
164 reg.register_backend_http_endpoint("fbuild".to_string(), 8002);
165
166 let mut snap = reg.snapshot();
167 snap.sort();
168 assert_eq!(
169 snap,
170 vec![("fbuild".to_string(), Some(8002)), ("zccache".to_string(), None)]
171 );
172 }
173
174 #[test]
175 fn decode_and_register_happy_path() {
176 let reg = HttpEndpointRegistry::new();
177 let msg = BackendHttpReady { port: 49_152 };
178 let mut body = Vec::with_capacity(msg.encoded_len());
179 msg.encode(&mut body).expect("encode BackendHttpReady");
180
181 let port = decode_and_register(®, "zccache".to_string(), &body)
182 .expect("decode_and_register succeeds");
183 assert_eq!(port, 49_152);
184 assert_eq!(reg.lookup("zccache"), Some(49_152));
185 }
186
187 #[test]
188 fn decode_and_register_rejects_oversized_port() {
189 let reg = HttpEndpointRegistry::new();
190 let msg = BackendHttpReady { port: 70_000 };
192 let mut body = Vec::with_capacity(msg.encoded_len());
193 msg.encode(&mut body).expect("encode BackendHttpReady");
194
195 let err = decode_and_register(®, "zccache".to_string(), &body)
196 .expect_err("port=70000 should be rejected");
197 match err {
198 BackendHttpReadyError::PortOutOfRange(70_000) => {}
199 other => panic!("expected PortOutOfRange(70000), got: {other:?}"),
200 }
201 assert!(reg.lookup("zccache").is_none());
203 }
204
205 #[test]
206 fn decode_and_register_rejects_malformed_frame() {
207 let reg = HttpEndpointRegistry::new();
208 let err = decode_and_register(®, "zccache".to_string(), &[0xFF; 8])
210 .expect_err("malformed frame should be rejected");
211 match err {
212 BackendHttpReadyError::Decode(_) => {}
213 other => panic!("expected Decode, got: {other:?}"),
214 }
215 assert!(reg.lookup("zccache").is_none());
216 }
217}