Skip to main content

running_process/broker/server/
control_socket.rs

1//! Shared broker control socket dispatch for Hello and admin frames.
2//!
3//! The v1 broker uses one local socket for both client Hello negotiation and
4//! admin verbs. This module keeps the bounded synchronous serve helpers aligned
5//! with that contract while the long-lived daemon loop is still being built.
6
7use std::io::{Read, Write};
8use std::num::NonZeroUsize;
9
10use interprocess::local_socket::traits::Listener;
11use prost::Message;
12
13use crate::broker::protocol::{
14    read_frame, write_frame, AdminReply, ErrorCode, Frame, FramingError, HelloReply,
15    MAX_HELLO_BYTES,
16};
17
18use super::admin::{handle_admin_frame, AdminFrameError, AdminSnapshot, ADMIN_PAYLOAD_PROTOCOL};
19use super::connection::{
20    bind_local_socket, peer_identity_from_stream, refused_reply, reply_for_framing_error,
21    write_response_frame, BrokerConnectionError, HelloResponder, LocalSocketCleanup,
22    PeerCredentialPolicy,
23};
24use super::hello_handler::PeerIdentity;
25
26/// Result of handling one control socket connection.
27#[derive(Clone, Debug, PartialEq)]
28pub enum ControlSocketReply {
29    /// Peer was rejected by credential policy before any bytes were read.
30    DroppedPeer,
31    /// The connection was handled as a Hello exchange.
32    Hello(HelloReply),
33    /// The connection was handled as an admin request.
34    Admin(AdminReply),
35}
36
37/// Connection limit for a broker control-socket accept loop.
38#[derive(Clone, Copy, Debug, PartialEq, Eq)]
39pub enum ControlSocketConnectionLimit {
40    /// Accept exactly this many connections, then return.
41    Bounded(NonZeroUsize),
42    /// Continue accepting until the process exits or binding/accepting fails.
43    Unbounded,
44}
45
46impl ControlSocketConnectionLimit {
47    fn should_continue(self, accepted: usize) -> bool {
48        match self {
49            Self::Bounded(limit) => accepted < limit.get(),
50            Self::Unbounded => true,
51        }
52    }
53}
54
55/// Handle one already-accepted broker control connection.
56pub fn handle_control_connection_with_peer_policy<S, R, F>(
57    stream: &mut S,
58    hello_responder: &R,
59    snapshot_provider: &F,
60    peer: PeerIdentity,
61    peer_policy: &PeerCredentialPolicy,
62) -> Result<ControlSocketReply, ControlSocketError>
63where
64    S: Read + Write,
65    R: HelloResponder + ?Sized,
66    F: Fn() -> AdminSnapshot + ?Sized,
67{
68    if !peer_policy.allows(&peer) {
69        return Ok(ControlSocketReply::DroppedPeer);
70    }
71
72    let request_bytes = match read_frame(stream) {
73        Ok(bytes) => bytes,
74        Err(err) => {
75            let reply = reply_for_framing_error(&err);
76            write_response_frame(stream, None, &reply)?;
77            return Ok(ControlSocketReply::Hello(reply));
78        }
79    };
80
81    let request_frame = match Frame::decode(request_bytes.as_slice()) {
82        Ok(frame) => frame,
83        Err(_) => {
84            let reply = refused_reply(ErrorCode::ErrorPeerRejected, "malformed broker Frame", 0);
85            write_response_frame(stream, None, &reply)?;
86            return Ok(ControlSocketReply::Hello(reply));
87        }
88    };
89
90    if request_frame.payload_protocol == ADMIN_PAYLOAD_PROTOCOL {
91        let snapshot = snapshot_provider();
92        let response_frame = handle_admin_frame(request_frame, &snapshot)?;
93        let reply = write_admin_response_frame(stream, &response_frame)?;
94        return Ok(ControlSocketReply::Admin(reply));
95    }
96
97    let reply = if request_bytes.len() > MAX_HELLO_BYTES {
98        refused_reply(
99            ErrorCode::ErrorPeerRejected,
100            "initial Hello frame exceeds 64 KiB",
101            0,
102        )
103    } else {
104        hello_responder.handle_frame(request_frame.clone(), peer)
105    };
106    write_response_frame(stream, Some(&request_frame), &reply)?;
107    Ok(ControlSocketReply::Hello(reply))
108}
109
110/// Run a bounded local-socket accept loop that dispatches Hello and admin
111/// frames on the same endpoint.
112pub fn serve_control_socket_connections_with_policy<R, F>(
113    socket_path: &str,
114    hello_responder: &R,
115    snapshot_provider: F,
116    connection_count: usize,
117    peer_policy: &PeerCredentialPolicy,
118) -> Result<(), ControlSocketError>
119where
120    R: HelloResponder + ?Sized,
121    F: Fn() -> AdminSnapshot,
122{
123    let Some(connection_count) = NonZeroUsize::new(connection_count) else {
124        return Ok(());
125    };
126
127    serve_control_socket_connections_with_limit_and_policy(
128        socket_path,
129        hello_responder,
130        snapshot_provider,
131        ControlSocketConnectionLimit::Bounded(connection_count),
132        peer_policy,
133    )
134}
135
136/// Run a broker control-socket accept loop that dispatches Hello and admin
137/// frames on the same endpoint.
138pub fn serve_control_socket_connections_with_limit_and_policy<R, F>(
139    socket_path: &str,
140    hello_responder: &R,
141    snapshot_provider: F,
142    connection_limit: ControlSocketConnectionLimit,
143    peer_policy: &PeerCredentialPolicy,
144) -> Result<(), ControlSocketError>
145where
146    R: HelloResponder + ?Sized,
147    F: Fn() -> AdminSnapshot,
148{
149    let listener = bind_local_socket(socket_path)?;
150    let cleanup = LocalSocketCleanup(socket_path);
151    let result = (|| {
152        let mut accepted = 0;
153        while connection_limit.should_continue(accepted) {
154            let mut stream = listener.accept().map_err(BrokerConnectionError::Io)?;
155            accepted += 1;
156            let peer = peer_identity_from_stream(&stream)?;
157            let _reply = handle_control_connection_with_peer_policy(
158                &mut stream,
159                hello_responder,
160                &snapshot_provider,
161                peer,
162                peer_policy,
163            )?;
164        }
165        Ok(())
166    })();
167    drop(listener);
168    drop(cleanup);
169    result
170}
171
172fn write_admin_response_frame<W: Write>(
173    writer: &mut W,
174    response_frame: &Frame,
175) -> Result<AdminReply, ControlSocketError> {
176    let mut response_bytes = Vec::new();
177    response_frame
178        .encode(&mut response_bytes)
179        .map_err(ControlSocketError::EncodeFrame)?;
180    write_frame(writer, &response_bytes)?;
181    AdminReply::decode(response_frame.payload.as_slice())
182        .map_err(ControlSocketError::DecodeAdminReply)
183}
184
185/// Errors raised while dispatching a shared broker control socket frame.
186#[derive(Debug, thiserror::Error)]
187pub enum ControlSocketError {
188    /// Hello/local-socket connection handling failed.
189    #[error(transparent)]
190    Connection(#[from] BrokerConnectionError),
191    /// Frame read/write failed.
192    #[error(transparent)]
193    Framing(#[from] FramingError),
194    /// Admin frame validation or dispatch failed.
195    #[error(transparent)]
196    AdminFrame(#[from] AdminFrameError),
197    /// The response frame could not be encoded.
198    #[error("failed to encode broker control response Frame: {0}")]
199    EncodeFrame(prost::EncodeError),
200    /// The admin response payload could not be decoded after dispatch.
201    #[error("failed to decode admin reply payload: {0}")]
202    DecodeAdminReply(prost::DecodeError),
203}