Skip to main content

running_process/broker/
get_http_endpoint_dispatch.rs

1//! `GetBrokerHttpEndpoint` RPC dispatch (slice 6 of #488).
2//!
3//! The CLI calls `GetBrokerHttpEndpoint` over the v2 broker control
4//! channel to discover the broker's HTTP endpoint (per #483 §4 — the
5//! single discovery surface). This module implements the broker side
6//! of that RPC: given the broker's currently-resolved HTTP port + its
7//! own pid, build a `GetBrokerHttpEndpointResponse` and serialize it.
8//!
9//! The real plumbing (read incoming frame → dispatch on payload type →
10//! write response frame) lives in the broker's connection loop, which
11//! is filled in by later slices. This slice exposes the typed
12//! request/response handler so subsequent slices have a pinned API to
13//! call.
14
15use prost::Message;
16
17use crate::broker::protocol_v2::{
18    GetBrokerHttpEndpointRequest, GetBrokerHttpEndpointResponse,
19};
20
/// The broker's resolved HTTP endpoint, as carried by the
/// `GetBrokerHttpEndpoint` RPC response.
///
/// Set at boot per #483 §3 via `BrokerHttpPort::resolve(config, env)`.
///
/// This is a small `Copy` value. It derives `PartialEq`, `Eq` and `Hash`
/// so endpoints can be compared directly (for example with `assert_eq!`)
/// and used as set or map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BrokerHttpEndpoint {
    /// The port the broker's own HTTP server bound. Slice 7 actually
    /// binds it; before then the broker can stub this to its
    /// configured-static port for early consumer testing.
    pub port: u16,
    /// The broker's process id. Used by consumers to disambiguate a
    /// fresh response from a stale one mid-restart (#483 §4 rationale).
    pub pid: u32,
}
33
34impl BrokerHttpEndpoint {
35    /// Build a `GetBrokerHttpEndpointResponse` carrying this endpoint.
36    pub fn to_response(self) -> GetBrokerHttpEndpointResponse {
37        GetBrokerHttpEndpointResponse {
38            port: self.port as u32,
39            pid: self.pid,
40        }
41    }
42}
43
44/// Errors from [`decode_request_and_dispatch`].
45#[derive(Debug, thiserror::Error)]
46pub enum GetHttpEndpointError {
47    /// The incoming frame body did not decode as `GetBrokerHttpEndpointRequest`.
48    #[error("decode GetBrokerHttpEndpointRequest: {0}")]
49    Decode(#[from] prost::DecodeError),
50
51    /// Encoding the response failed.
52    #[error("encode GetBrokerHttpEndpointResponse: {0}")]
53    Encode(#[from] prost::EncodeError),
54}
55
56/// Decode an incoming `GetBrokerHttpEndpointRequest` frame body and
57/// produce a serialized `GetBrokerHttpEndpointResponse` body the
58/// connection loop can write back via `protocol::write_frame`.
59///
60/// The request currently has no fields (`GetBrokerHttpEndpointRequest`
61/// is an empty marker per #483 §4) — decoding is purely validation
62/// that the peer sent a structurally well-formed proto message of the
63/// expected type.
64pub fn decode_request_and_dispatch(
65    request_body: &[u8],
66    endpoint: BrokerHttpEndpoint,
67) -> Result<Vec<u8>, GetHttpEndpointError> {
68    let _request = GetBrokerHttpEndpointRequest::decode(request_body)?;
69    let response = endpoint.to_response();
70    let mut body = Vec::with_capacity(response.encoded_len());
71    response.encode(&mut body)?;
72    Ok(body)
73}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint fixture shared by the dispatch tests.
    const DISPATCH_ENDPOINT: BrokerHttpEndpoint = BrokerHttpEndpoint {
        port: 4242,
        pid: 99_999,
    };

    /// `to_response` copies both fields into the response message.
    #[test]
    fn to_response_carries_port_and_pid() {
        let endpoint = BrokerHttpEndpoint {
            port: 8765,
            pid: 12_345,
        };
        let resp = endpoint.to_response();
        assert_eq!(resp.port, 8765);
        assert_eq!(resp.pid, 12_345);
    }

    /// An encoded default (empty) request yields a decodable response
    /// that carries the endpoint's port and pid.
    #[test]
    fn dispatch_round_trip_with_empty_request() {
        let request = GetBrokerHttpEndpointRequest::default();
        let mut request_bytes = Vec::with_capacity(request.encoded_len());
        request.encode(&mut request_bytes).expect("encode request");

        let response_bytes = decode_request_and_dispatch(&request_bytes, DISPATCH_ENDPOINT)
            .expect("dispatch succeeds");

        let resp = GetBrokerHttpEndpointResponse::decode(response_bytes.as_slice())
            .expect("decode response");
        assert_eq!(resp.port, 4242);
        assert_eq!(resp.pid, 99_999);
    }

    /// A body that is not a valid proto message is rejected with `Decode`.
    #[test]
    fn dispatch_rejects_malformed_request_body() {
        let outcome = decode_request_and_dispatch(&[0xFF; 4], DISPATCH_ENDPOINT);
        let err = outcome.expect_err("malformed request body should be rejected");
        assert!(
            matches!(err, GetHttpEndpointError::Decode(_)),
            "expected Decode error, got: {err:?}"
        );
    }
}