use std::collections::HashMap;
use std::sync::Mutex;
use prost::Message;
use crate::broker::protocol_v2::BackendHttpReady;
/// Identifier used as the registry key for a backend.
pub type BackendId = String;

/// Registry mapping each backend id to the HTTP port recorded for it.
///
/// A backend can be present with `None` (tracked via [`HttpEndpointRegistry::track`],
/// no port registered yet) or with `Some(port)` once a port has been registered.
/// The map sits behind a [`Mutex`], so every method takes `&self`.
///
/// # Panics
///
/// Every method panics with `"registry mutex poisoned"` if the internal mutex
/// is poisoned.
#[derive(Debug, Default)]
pub struct HttpEndpointRegistry {
    inner: Mutex<HashMap<BackendId, Option<u16>>>,
}

impl HttpEndpointRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// Acquires the map lock, panicking if the mutex is poisoned.
    fn locked(&self) -> std::sync::MutexGuard<'_, HashMap<BackendId, Option<u16>>> {
        self.inner.lock().expect("registry mutex poisoned")
    }

    /// Starts tracking `backend_id` with no port.
    ///
    /// If the backend is already present its current entry is left untouched.
    pub fn track(&self, backend_id: BackendId) {
        let mut endpoints = self.locked();
        endpoints.entry(backend_id).or_default();
    }

    /// Records `port` for `backend_id`, inserting the backend if it is unknown.
    ///
    /// Returns the previously registered port, or `None` when the backend was
    /// unknown or had no port yet.
    pub fn register_backend_http_endpoint(
        &self,
        backend_id: BackendId,
        port: u16,
    ) -> Option<u16> {
        let replaced = self.locked().insert(backend_id, Some(port));
        // `None` (unknown backend) and `Some(None)` (tracked, no port) both
        // collapse to "no previous port".
        replaced.unwrap_or_default()
    }

    /// Returns the registered port for `backend_id`, if any.
    ///
    /// Unknown backends and tracked backends without a port both yield `None`;
    /// use [`HttpEndpointRegistry::state`] to tell them apart.
    pub fn lookup(&self, backend_id: &str) -> Option<u16> {
        match self.state(backend_id) {
            Some(Some(port)) => Some(port),
            Some(None) | None => None,
        }
    }

    /// Returns the raw entry for `backend_id`.
    ///
    /// * `None` — the backend is not in the registry.
    /// * `Some(None)` — the backend is tracked but has no port.
    /// * `Some(Some(port))` — a port has been registered.
    pub fn state(&self, backend_id: &str) -> Option<Option<u16>> {
        let endpoints = self.locked();
        endpoints.get(backend_id).copied()
    }

    /// Returns a copy of every `(backend id, port)` entry currently stored.
    ///
    /// Entries come out in the map's iteration order, which is unspecified.
    pub fn snapshot(&self) -> Vec<(BackendId, Option<u16>)> {
        let endpoints = self.locked();
        let mut entries = Vec::with_capacity(endpoints.len());
        for (backend_id, port) in endpoints.iter() {
            entries.push((backend_id.clone(), *port));
        }
        entries
    }
}
/// Errors produced while decoding and validating a `BackendHttpReady` frame body.
#[derive(Debug, thiserror::Error)]
pub enum BackendHttpReadyError {
/// The bytes could not be decoded as a `BackendHttpReady` message; wraps the
/// underlying `prost::DecodeError` (convertible via `From`, so `?` works).
#[error("decode BackendHttpReady: {0}")]
Decode(#[from] prost::DecodeError),
/// The decoded `port` field does not fit in a `u16`; carries the rejected value.
#[error("BackendHttpReady.port = {0} is out of u16 range")]
PortOutOfRange(u32),
}
/// Decodes a `BackendHttpReady` message from `frame_body` and records its port
/// for `backend_id` in `registry`.
///
/// Returns the registered port on success. Any port previously stored for the
/// backend is overwritten and discarded.
///
/// # Errors
///
/// * [`BackendHttpReadyError::Decode`] if `frame_body` is not a valid
///   `BackendHttpReady` message.
/// * [`BackendHttpReadyError::PortOutOfRange`] if the decoded port does not
///   fit in a `u16`.
///
/// The registry is left untouched in both error cases.
pub fn decode_and_register(
    registry: &HttpEndpointRegistry,
    backend_id: BackendId,
    frame_body: &[u8],
) -> Result<u16, BackendHttpReadyError> {
    let message = BackendHttpReady::decode(frame_body)?;
    let raw_port = message.port;

    match u16::try_from(raw_port) {
        Ok(port) => {
            // The previous port (if any) is intentionally not reported to the caller.
            let _previous = registry.register_backend_http_endpoint(backend_id, port);
            Ok(port)
        }
        Err(_) => Err(BackendHttpReadyError::PortOutOfRange(raw_port)),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Encodes a `BackendHttpReady` message and feeds it through
    // `decode_and_register` in the decode tests below; the registry tests
    // exercise `HttpEndpointRegistry` directly.

    #[test]
    fn empty_registry_has_no_state_for_unknown_backend() {
        let reg = HttpEndpointRegistry::new();
        assert!(reg.state("zccache").is_none());
        assert!(reg.lookup("zccache").is_none());
    }

    #[test]
    fn track_then_lookup_returns_none_for_pending_port() {
        let reg = HttpEndpointRegistry::new();
        reg.track("zccache".to_string());
        // Tracked but portless: `lookup` is `None`, `state` is `Some(None)`.
        assert!(reg.lookup("zccache").is_none());
        assert_eq!(reg.state("zccache"), Some(None));
    }

    #[test]
    fn register_endpoint_makes_port_available() {
        let reg = HttpEndpointRegistry::new();
        reg.track("zccache".to_string());
        let prev = reg.register_backend_http_endpoint("zccache".to_string(), 8765);
        assert_eq!(prev, None);
        assert_eq!(reg.lookup("zccache"), Some(8765));
        assert_eq!(reg.state("zccache"), Some(Some(8765)));
    }

    #[test]
    fn register_endpoint_updates_existing_port_and_returns_previous() {
        let reg = HttpEndpointRegistry::new();
        reg.register_backend_http_endpoint("fbuild".to_string(), 8001);
        let prev = reg.register_backend_http_endpoint("fbuild".to_string(), 8002);
        assert_eq!(prev, Some(8001));
        assert_eq!(reg.lookup("fbuild"), Some(8002));
    }

    #[test]
    fn snapshot_reflects_all_tracked_backends() {
        let reg = HttpEndpointRegistry::new();
        reg.track("zccache".to_string());
        reg.register_backend_http_endpoint("fbuild".to_string(), 8002);
        // Sort because the snapshot order follows the map's iteration order.
        let mut snap = reg.snapshot();
        snap.sort();
        assert_eq!(
            snap,
            vec![("fbuild".to_string(), Some(8002)), ("zccache".to_string(), None)]
        );
    }

    #[test]
    fn decode_and_register_happy_path() {
        let reg = HttpEndpointRegistry::new();
        let msg = BackendHttpReady { port: 49_152 };
        let mut body = Vec::with_capacity(msg.encoded_len());
        msg.encode(&mut body).expect("encode BackendHttpReady");
        let port = decode_and_register(&reg, "zccache".to_string(), &body)
            .expect("decode_and_register succeeds");
        assert_eq!(port, 49_152);
        assert_eq!(reg.lookup("zccache"), Some(49_152));
    }

    #[test]
    fn decode_and_register_rejects_oversized_port() {
        let reg = HttpEndpointRegistry::new();
        let msg = BackendHttpReady { port: 70_000 };
        let mut body = Vec::with_capacity(msg.encoded_len());
        msg.encode(&mut body).expect("encode BackendHttpReady");
        let err = decode_and_register(&reg, "zccache".to_string(), &body)
            .expect_err("port=70000 should be rejected");
        match err {
            BackendHttpReadyError::PortOutOfRange(70_000) => {}
            other => panic!("expected PortOutOfRange(70000), got: {other:?}"),
        }
        // A rejected frame must not leave anything registered.
        assert!(reg.lookup("zccache").is_none());
    }

    #[test]
    fn decode_and_register_rejects_malformed_frame() {
        let reg = HttpEndpointRegistry::new();
        let err = decode_and_register(&reg, "zccache".to_string(), &[0xFF; 8])
            .expect_err("malformed frame should be rejected");
        match err {
            BackendHttpReadyError::Decode(_) => {}
            other => panic!("expected Decode, got: {other:?}"),
        }
        // A rejected frame must not leave anything registered.
        assert!(reg.lookup("zccache").is_none());
    }
}