1use std::io::{Read, Write};
8use std::num::NonZeroUsize;
9
10use interprocess::local_socket::traits::Listener;
11use prost::Message;
12
13use crate::broker::protocol::{
14 read_frame, write_frame, AdminReply, ErrorCode, Frame, FramingError, HelloReply,
15 MAX_HELLO_BYTES,
16};
17
18use super::admin::{handle_admin_frame, AdminFrameError, AdminSnapshot, ADMIN_PAYLOAD_PROTOCOL};
19use super::connection::{
20 bind_local_socket, peer_identity_from_stream, refused_reply, reply_for_framing_error,
21 write_response_frame, BrokerConnectionError, HelloResponder, LocalSocketCleanup,
22 PeerCredentialPolicy,
23};
24use super::fd_pressure::{FdPressureDecision, FdPressureGuard};
25use super::hello_handler::PeerIdentity;
26
#[derive(Clone, Debug, PartialEq)]
/// Outcome of handling a single control-socket connection.
pub enum ControlSocketReply {
    /// The peer credential policy did not allow the peer; the connection
    /// handler returns this before reading any frame from the stream.
    DroppedPeer,
    /// A `HelloReply` was produced for the connection. The handler also uses
    /// this variant for the refusal replies it sends on framing errors,
    /// undecodable frames, and oversized Hello frames.
    Hello(HelloReply),
    /// The request carried the admin payload protocol and was answered with
    /// this `AdminReply`.
    Admin(AdminReply),
}
37
/// Upper bound on how many connections a control-socket accept loop counts
/// before it stops.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ControlSocketConnectionLimit {
    /// Stop once this many connections have been counted.
    Bounded(NonZeroUsize),
    /// Never stop because of the connection count.
    Unbounded,
}

impl ControlSocketConnectionLimit {
    /// Returns `true` while the accept loop may run another iteration, given
    /// that `accepted` connections have been counted so far.
    fn should_continue(self, accepted: usize) -> bool {
        // Only a bounded limit can ever end the loop.
        let Self::Bounded(cap) = self else {
            return true;
        };
        accepted < cap.get()
    }
}
55
56pub fn handle_control_connection_with_peer_policy<S, R, F>(
58 stream: &mut S,
59 hello_responder: &R,
60 snapshot_provider: &F,
61 peer: PeerIdentity,
62 peer_policy: &PeerCredentialPolicy,
63) -> Result<ControlSocketReply, ControlSocketError>
64where
65 S: Read + Write,
66 R: HelloResponder + ?Sized,
67 F: Fn() -> AdminSnapshot + ?Sized,
68{
69 handle_control_connection_with_peer_policy_and_fd_guard(
70 stream,
71 hello_responder,
72 snapshot_provider,
73 peer,
74 peer_policy,
75 None,
76 )
77}
78
79pub fn handle_control_connection_with_peer_policy_and_fd_guard<S, R, F>(
84 stream: &mut S,
85 hello_responder: &R,
86 snapshot_provider: &F,
87 peer: PeerIdentity,
88 peer_policy: &PeerCredentialPolicy,
89 fd_guard: Option<&FdPressureGuard>,
90) -> Result<ControlSocketReply, ControlSocketError>
91where
92 S: Read + Write,
93 R: HelloResponder + ?Sized,
94 F: Fn() -> AdminSnapshot + ?Sized,
95{
96 if !peer_policy.allows(&peer) {
97 return Ok(ControlSocketReply::DroppedPeer);
98 }
99
100 let request_bytes = match read_frame(stream) {
101 Ok(bytes) => bytes,
102 Err(err) => {
103 let reply = reply_for_framing_error(&err);
104 write_response_frame(stream, None, &reply)?;
105 return Ok(ControlSocketReply::Hello(reply));
106 }
107 };
108
109 let request_frame = match Frame::decode(request_bytes.as_slice()) {
110 Ok(frame) => frame,
111 Err(_) => {
112 let reply = refused_reply(ErrorCode::ErrorPeerRejected, "malformed broker Frame", 0);
113 write_response_frame(stream, None, &reply)?;
114 return Ok(ControlSocketReply::Hello(reply));
115 }
116 };
117
118 if request_frame.payload_protocol == ADMIN_PAYLOAD_PROTOCOL {
119 let snapshot = snapshot_provider();
120 let response_frame = handle_admin_frame(request_frame, &snapshot)?;
121 let reply = write_admin_response_frame(stream, &response_frame)?;
122 return Ok(ControlSocketReply::Admin(reply));
123 }
124
125 let reply = if request_bytes.len() > MAX_HELLO_BYTES {
126 refused_reply(
127 ErrorCode::ErrorPeerRejected,
128 "initial Hello frame exceeds 64 KiB",
129 0,
130 )
131 } else if let Some(guard) = fd_guard.filter(|guard| guard.is_demoted()) {
132 guard.refusal_reply()
133 } else {
134 hello_responder.handle_frame(request_frame.clone(), peer)
135 };
136 write_response_frame(stream, Some(&request_frame), &reply)?;
137 Ok(ControlSocketReply::Hello(reply))
138}
139
140pub fn serve_control_socket_connections_with_policy<R, F>(
143 socket_path: &str,
144 hello_responder: &R,
145 snapshot_provider: F,
146 connection_count: usize,
147 peer_policy: &PeerCredentialPolicy,
148) -> Result<(), ControlSocketError>
149where
150 R: HelloResponder + ?Sized,
151 F: Fn() -> AdminSnapshot,
152{
153 let Some(connection_count) = NonZeroUsize::new(connection_count) else {
154 return Ok(());
155 };
156
157 serve_control_socket_connections_with_limit_and_policy(
158 socket_path,
159 hello_responder,
160 snapshot_provider,
161 ControlSocketConnectionLimit::Bounded(connection_count),
162 peer_policy,
163 )
164}
165
166pub fn serve_control_socket_connections_with_limit_and_policy<R, F>(
169 socket_path: &str,
170 hello_responder: &R,
171 snapshot_provider: F,
172 connection_limit: ControlSocketConnectionLimit,
173 peer_policy: &PeerCredentialPolicy,
174) -> Result<(), ControlSocketError>
175where
176 R: HelloResponder + ?Sized,
177 F: Fn() -> AdminSnapshot,
178{
179 serve_control_socket_connections_with_limit_policy_and_post_hello(
180 socket_path,
181 hello_responder,
182 snapshot_provider,
183 connection_limit,
184 peer_policy,
185 |_stream, _reply| {},
186 )
187}
188
189pub fn serve_control_socket_connections_with_limit_policy_and_post_hello<R, F, H>(
196 socket_path: &str,
197 hello_responder: &R,
198 snapshot_provider: F,
199 connection_limit: ControlSocketConnectionLimit,
200 peer_policy: &PeerCredentialPolicy,
201 post_hello: H,
202) -> Result<(), ControlSocketError>
203where
204 R: HelloResponder + ?Sized,
205 F: Fn() -> AdminSnapshot,
206 H: FnMut(&mut interprocess::local_socket::Stream, &HelloReply),
207{
208 let fd_guard = FdPressureGuard::default();
209 serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
210 socket_path,
211 hello_responder,
212 snapshot_provider,
213 connection_limit,
214 peer_policy,
215 post_hello,
216 &fd_guard,
217 )
218}
219
220#[allow(clippy::too_many_arguments)]
229pub fn serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard<R, F, H>(
230 socket_path: &str,
231 hello_responder: &R,
232 snapshot_provider: F,
233 connection_limit: ControlSocketConnectionLimit,
234 peer_policy: &PeerCredentialPolicy,
235 mut post_hello: H,
236 fd_guard: &FdPressureGuard,
237) -> Result<(), ControlSocketError>
238where
239 R: HelloResponder + ?Sized,
240 F: Fn() -> AdminSnapshot,
241 H: FnMut(&mut interprocess::local_socket::Stream, &HelloReply),
242{
243 const FD_PRESSURE_ACCEPT_BACKOFF: std::time::Duration = std::time::Duration::from_millis(50);
246
247 let listener = bind_local_socket(socket_path)?;
248 let cleanup = LocalSocketCleanup(socket_path);
249 let result = (|| {
250 let mut accepted = 0;
251 while connection_limit.should_continue(accepted) {
252 let mut stream = match listener.accept() {
253 Ok(stream) => {
254 fd_guard.on_accept_ok();
255 stream
256 }
257 Err(err) => {
258 let was_demoted = fd_guard.is_demoted();
259 if fd_guard.on_accept_error(&err) == FdPressureDecision::Demoted {
260 if !was_demoted {
261 eprintln!(
262 "running-process-broker: accept on {socket_path} demoted \
263 under fd pressure: {err}"
264 );
265 }
266 accepted += 1;
267 std::thread::sleep(FD_PRESSURE_ACCEPT_BACKOFF);
268 continue;
269 }
270 return Err(BrokerConnectionError::Io(err).into());
271 }
272 };
273 accepted += 1;
274 let peer = peer_identity_from_stream(&stream)?;
275 let reply = handle_control_connection_with_peer_policy_and_fd_guard(
276 &mut stream,
277 hello_responder,
278 &snapshot_provider,
279 peer.clone(),
280 peer_policy,
281 Some(fd_guard),
282 )?;
283 if reply == ControlSocketReply::DroppedPeer {
284 eprintln!(
285 "running-process-broker: dropped connection on {socket_path} from peer \
286 pid={} uid_or_sid={:?}: credential policy refused",
287 peer.pid, peer.uid_or_sid
288 );
289 }
290 if let ControlSocketReply::Hello(hello_reply) = &reply {
291 post_hello(&mut stream, hello_reply);
292 }
293 }
294 Ok(())
295 })();
296 drop(listener);
297 drop(cleanup);
298 result
299}
300
301fn write_admin_response_frame<W: Write>(
302 writer: &mut W,
303 response_frame: &Frame,
304) -> Result<AdminReply, ControlSocketError> {
305 let mut response_bytes = Vec::new();
306 response_frame
307 .encode(&mut response_bytes)
308 .map_err(ControlSocketError::EncodeFrame)?;
309 write_frame(writer, &response_bytes)?;
310 AdminReply::decode(response_frame.payload.as_slice())
311 .map_err(ControlSocketError::DecodeAdminReply)
312}
313
#[derive(Debug, thiserror::Error)]
/// Errors surfaced by the control-socket handling and serving functions.
pub enum ControlSocketError {
    /// A `BrokerConnectionError`, displayed transparently; the serve loop
    /// wraps non-fd-pressure accept failures in this variant.
    #[error(transparent)]
    Connection(#[from] BrokerConnectionError),
    /// A `FramingError`, displayed transparently.
    #[error(transparent)]
    Framing(#[from] FramingError),
    /// An `AdminFrameError`, displayed transparently.
    #[error(transparent)]
    AdminFrame(#[from] AdminFrameError),
    /// The response `Frame` could not be encoded before being written.
    #[error("failed to encode broker control response Frame: {0}")]
    EncodeFrame(prost::EncodeError),
    /// The payload of an admin response frame could not be decoded as an
    /// `AdminReply`.
    #[error("failed to decode admin reply payload: {0}")]
    DecodeAdminReply(prost::DecodeError),
}