Skip to main content

running_process/broker/server/
control_socket.rs

1//! Shared broker control socket dispatch for Hello and admin frames.
2//!
3//! The v1 broker uses one local socket for both client Hello negotiation and
4//! admin verbs. This module keeps the bounded synchronous serve helpers aligned
5//! with that contract while the long-lived daemon loop is still being built.
6
7use std::io::{Read, Write};
8use std::num::NonZeroUsize;
9
10use interprocess::local_socket::traits::Listener;
11use prost::Message;
12
13use crate::broker::protocol::{
14    read_frame, write_frame, AdminReply, ErrorCode, Frame, FramingError, HelloReply,
15    MAX_HELLO_BYTES,
16};
17
18use super::admin::{handle_admin_frame, AdminFrameError, AdminSnapshot, ADMIN_PAYLOAD_PROTOCOL};
19use super::connection::{
20    bind_local_socket, peer_identity_from_stream, refused_reply, reply_for_framing_error,
21    write_response_frame, BrokerConnectionError, HelloResponder, LocalSocketCleanup,
22    PeerCredentialPolicy,
23};
24use super::fd_pressure::{FdPressureDecision, FdPressureGuard};
25use super::hello_handler::PeerIdentity;
26
27/// Result of handling one control socket connection.
28#[derive(Clone, Debug, PartialEq)]
29pub enum ControlSocketReply {
30    /// Peer was rejected by credential policy before any bytes were read.
31    DroppedPeer,
32    /// The connection was handled as a Hello exchange.
33    Hello(HelloReply),
34    /// The connection was handled as an admin request.
35    Admin(AdminReply),
36}
37
/// How many connections a broker control-socket accept loop may count before
/// it returns on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlSocketConnectionLimit {
    /// Stop once this many connections have been counted by the accept loop.
    Bounded(NonZeroUsize),
    /// Never stop voluntarily; the loop only ends when the process exits or
    /// binding/accepting fails.
    Unbounded,
}

impl ControlSocketConnectionLimit {
    /// Report whether the accept loop should run another iteration given the
    /// number of connections it has `accepted` (counted) so far.
    fn should_continue(self, accepted: usize) -> bool {
        match self {
            Self::Unbounded => true,
            // Strictly greater: reaching the ceiling itself ends the loop.
            Self::Bounded(ceiling) => ceiling.get() > accepted,
        }
    }
}
55
56/// Handle one already-accepted broker control connection.
57pub fn handle_control_connection_with_peer_policy<S, R, F>(
58    stream: &mut S,
59    hello_responder: &R,
60    snapshot_provider: &F,
61    peer: PeerIdentity,
62    peer_policy: &PeerCredentialPolicy,
63) -> Result<ControlSocketReply, ControlSocketError>
64where
65    S: Read + Write,
66    R: HelloResponder + ?Sized,
67    F: Fn() -> AdminSnapshot + ?Sized,
68{
69    handle_control_connection_with_peer_policy_and_fd_guard(
70        stream,
71        hello_responder,
72        snapshot_provider,
73        peer,
74        peer_policy,
75        None,
76    )
77}
78
79/// Handle one already-accepted broker control connection, refusing Hello
80/// frames with `ERROR_FD_PRESSURE` while `fd_guard` reports a demotion
81/// (#390). Admin frames are always served so `status` can surface the
82/// demoted state.
83pub fn handle_control_connection_with_peer_policy_and_fd_guard<S, R, F>(
84    stream: &mut S,
85    hello_responder: &R,
86    snapshot_provider: &F,
87    peer: PeerIdentity,
88    peer_policy: &PeerCredentialPolicy,
89    fd_guard: Option<&FdPressureGuard>,
90) -> Result<ControlSocketReply, ControlSocketError>
91where
92    S: Read + Write,
93    R: HelloResponder + ?Sized,
94    F: Fn() -> AdminSnapshot + ?Sized,
95{
96    if !peer_policy.allows(&peer) {
97        return Ok(ControlSocketReply::DroppedPeer);
98    }
99
100    let request_bytes = match read_frame(stream) {
101        Ok(bytes) => bytes,
102        Err(err) => {
103            let reply = reply_for_framing_error(&err);
104            write_response_frame(stream, None, &reply)?;
105            return Ok(ControlSocketReply::Hello(reply));
106        }
107    };
108
109    let request_frame = match Frame::decode(request_bytes.as_slice()) {
110        Ok(frame) => frame,
111        Err(_) => {
112            let reply = refused_reply(ErrorCode::ErrorPeerRejected, "malformed broker Frame", 0);
113            write_response_frame(stream, None, &reply)?;
114            return Ok(ControlSocketReply::Hello(reply));
115        }
116    };
117
118    if request_frame.payload_protocol == ADMIN_PAYLOAD_PROTOCOL {
119        let snapshot = snapshot_provider();
120        let response_frame = handle_admin_frame(request_frame, &snapshot)?;
121        let reply = write_admin_response_frame(stream, &response_frame)?;
122        return Ok(ControlSocketReply::Admin(reply));
123    }
124
125    let reply = if request_bytes.len() > MAX_HELLO_BYTES {
126        refused_reply(
127            ErrorCode::ErrorPeerRejected,
128            "initial Hello frame exceeds 64 KiB",
129            0,
130        )
131    } else if let Some(guard) = fd_guard.filter(|guard| guard.is_demoted()) {
132        guard.refusal_reply()
133    } else {
134        hello_responder.handle_frame(request_frame.clone(), peer)
135    };
136    write_response_frame(stream, Some(&request_frame), &reply)?;
137    Ok(ControlSocketReply::Hello(reply))
138}
139
140/// Run a bounded local-socket accept loop that dispatches Hello and admin
141/// frames on the same endpoint.
142pub fn serve_control_socket_connections_with_policy<R, F>(
143    socket_path: &str,
144    hello_responder: &R,
145    snapshot_provider: F,
146    connection_count: usize,
147    peer_policy: &PeerCredentialPolicy,
148) -> Result<(), ControlSocketError>
149where
150    R: HelloResponder + ?Sized,
151    F: Fn() -> AdminSnapshot,
152{
153    let Some(connection_count) = NonZeroUsize::new(connection_count) else {
154        return Ok(());
155    };
156
157    serve_control_socket_connections_with_limit_and_policy(
158        socket_path,
159        hello_responder,
160        snapshot_provider,
161        ControlSocketConnectionLimit::Bounded(connection_count),
162        peer_policy,
163    )
164}
165
166/// Run a broker control-socket accept loop that dispatches Hello and admin
167/// frames on the same endpoint.
168pub fn serve_control_socket_connections_with_limit_and_policy<R, F>(
169    socket_path: &str,
170    hello_responder: &R,
171    snapshot_provider: F,
172    connection_limit: ControlSocketConnectionLimit,
173    peer_policy: &PeerCredentialPolicy,
174) -> Result<(), ControlSocketError>
175where
176    R: HelloResponder + ?Sized,
177    F: Fn() -> AdminSnapshot,
178{
179    serve_control_socket_connections_with_limit_policy_and_post_hello(
180        socket_path,
181        hello_responder,
182        snapshot_provider,
183        connection_limit,
184        peer_policy,
185        |_stream, _reply| {},
186    )
187}
188
189/// Run a broker control-socket accept loop with a post-Hello connection hook.
190///
191/// `post_hello` runs after a Hello reply has been written, with the client
192/// connection still open. The production serve path uses it to attempt the
193/// optional handle-passing handoff (#387) when negotiation issued a handoff
194/// token; the hook must stay silent toward the client on failure.
195pub fn serve_control_socket_connections_with_limit_policy_and_post_hello<R, F, H>(
196    socket_path: &str,
197    hello_responder: &R,
198    snapshot_provider: F,
199    connection_limit: ControlSocketConnectionLimit,
200    peer_policy: &PeerCredentialPolicy,
201    post_hello: H,
202) -> Result<(), ControlSocketError>
203where
204    R: HelloResponder + ?Sized,
205    F: Fn() -> AdminSnapshot,
206    H: FnMut(&mut interprocess::local_socket::Stream, &HelloReply),
207{
208    let fd_guard = FdPressureGuard::default();
209    serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
210        socket_path,
211        hello_responder,
212        snapshot_provider,
213        connection_limit,
214        peer_policy,
215        post_hello,
216        &fd_guard,
217    )
218}
219
220/// Run a broker control-socket accept loop with fd-pressure self-demotion
221/// (#390).
222///
223/// `fd_guard` is shared so callers can surface the demotion state in admin
224/// snapshots. When `accept()` fails with EMFILE/ENFILE the loop demotes
225/// instead of returning the error: subsequent Hello connections receive a
226/// structured `ERROR_FD_PRESSURE` refusal (admin verbs keep working), and
227/// the guard recovers automatically after a streak of successful accepts.
228#[allow(clippy::too_many_arguments)]
229pub fn serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard<R, F, H>(
230    socket_path: &str,
231    hello_responder: &R,
232    snapshot_provider: F,
233    connection_limit: ControlSocketConnectionLimit,
234    peer_policy: &PeerCredentialPolicy,
235    mut post_hello: H,
236    fd_guard: &FdPressureGuard,
237) -> Result<(), ControlSocketError>
238where
239    R: HelloResponder + ?Sized,
240    F: Fn() -> AdminSnapshot,
241    H: FnMut(&mut interprocess::local_socket::Stream, &HelloReply),
242{
243    /// Back-off between accepts while demoted so a hard fd-exhaustion loop
244    /// cannot spin the broker's CPU at 100%.
245    const FD_PRESSURE_ACCEPT_BACKOFF: std::time::Duration = std::time::Duration::from_millis(50);
246
247    let listener = bind_local_socket(socket_path)?;
248    let cleanup = LocalSocketCleanup(socket_path);
249    let result = (|| {
250        let mut accepted = 0;
251        while connection_limit.should_continue(accepted) {
252            let mut stream = match listener.accept() {
253                Ok(stream) => {
254                    fd_guard.on_accept_ok();
255                    stream
256                }
257                Err(err) => {
258                    let was_demoted = fd_guard.is_demoted();
259                    if fd_guard.on_accept_error(&err) == FdPressureDecision::Demoted {
260                        if !was_demoted {
261                            eprintln!(
262                                "running-process-broker: accept on {socket_path} demoted \
263                                 under fd pressure: {err}"
264                            );
265                        }
266                        accepted += 1;
267                        std::thread::sleep(FD_PRESSURE_ACCEPT_BACKOFF);
268                        continue;
269                    }
270                    return Err(BrokerConnectionError::Io(err).into());
271                }
272            };
273            accepted += 1;
274            let peer = peer_identity_from_stream(&stream)?;
275            let reply = handle_control_connection_with_peer_policy_and_fd_guard(
276                &mut stream,
277                hello_responder,
278                &snapshot_provider,
279                peer.clone(),
280                peer_policy,
281                Some(fd_guard),
282            )?;
283            if reply == ControlSocketReply::DroppedPeer {
284                eprintln!(
285                    "running-process-broker: dropped connection on {socket_path} from peer \
286                     pid={} uid_or_sid={:?}: credential policy refused",
287                    peer.pid, peer.uid_or_sid
288                );
289            }
290            if let ControlSocketReply::Hello(hello_reply) = &reply {
291                post_hello(&mut stream, hello_reply);
292            }
293        }
294        Ok(())
295    })();
296    drop(listener);
297    drop(cleanup);
298    result
299}
300
301fn write_admin_response_frame<W: Write>(
302    writer: &mut W,
303    response_frame: &Frame,
304) -> Result<AdminReply, ControlSocketError> {
305    let mut response_bytes = Vec::new();
306    response_frame
307        .encode(&mut response_bytes)
308        .map_err(ControlSocketError::EncodeFrame)?;
309    write_frame(writer, &response_bytes)?;
310    AdminReply::decode(response_frame.payload.as_slice())
311        .map_err(ControlSocketError::DecodeAdminReply)
312}
313
314/// Errors raised while dispatching a shared broker control socket frame.
315#[derive(Debug, thiserror::Error)]
316pub enum ControlSocketError {
317    /// Hello/local-socket connection handling failed.
318    #[error(transparent)]
319    Connection(#[from] BrokerConnectionError),
320    /// Frame read/write failed.
321    #[error(transparent)]
322    Framing(#[from] FramingError),
323    /// Admin frame validation or dispatch failed.
324    #[error(transparent)]
325    AdminFrame(#[from] AdminFrameError),
326    /// The response frame could not be encoded.
327    #[error("failed to encode broker control response Frame: {0}")]
328    EncodeFrame(prost::EncodeError),
329    /// The admin response payload could not be decoded after dispatch.
330    #[error("failed to decode admin reply payload: {0}")]
331    DecodeAdminReply(prost::DecodeError),
332}