running_process/broker/server/
control_socket.rs1use std::io::{Read, Write};
8use std::num::NonZeroUsize;
9
10use interprocess::local_socket::traits::Listener;
11use prost::Message;
12
13use crate::broker::protocol::{
14 read_frame, write_frame, AdminReply, ErrorCode, Frame, FramingError, HelloReply,
15 MAX_HELLO_BYTES,
16};
17
18use super::admin::{handle_admin_frame, AdminFrameError, AdminSnapshot, ADMIN_PAYLOAD_PROTOCOL};
19use super::connection::{
20 bind_local_socket, peer_identity_from_stream, refused_reply, reply_for_framing_error,
21 write_response_frame, BrokerConnectionError, HelloResponder, LocalSocketCleanup,
22 PeerCredentialPolicy,
23};
24use super::hello_handler::PeerIdentity;
25
26#[derive(Clone, Debug, PartialEq)]
28pub enum ControlSocketReply {
29 DroppedPeer,
31 Hello(HelloReply),
33 Admin(AdminReply),
35}
36
/// How many connections a control-socket serve loop accepts before stopping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlSocketConnectionLimit {
    /// Stop once this many connections have been accepted.
    Bounded(NonZeroUsize),
    /// Keep accepting connections with no upper bound.
    Unbounded,
}

impl ControlSocketConnectionLimit {
    /// Returns `true` while another connection may be accepted, given that
    /// `accepted` connections have been accepted so far.
    fn should_continue(self, accepted: usize) -> bool {
        if let Self::Bounded(limit) = self {
            return accepted < limit.get();
        }
        // No bound configured: the loop never stops on its own.
        true
    }
}
54
55pub fn handle_control_connection_with_peer_policy<S, R, F>(
57 stream: &mut S,
58 hello_responder: &R,
59 snapshot_provider: &F,
60 peer: PeerIdentity,
61 peer_policy: &PeerCredentialPolicy,
62) -> Result<ControlSocketReply, ControlSocketError>
63where
64 S: Read + Write,
65 R: HelloResponder + ?Sized,
66 F: Fn() -> AdminSnapshot + ?Sized,
67{
68 if !peer_policy.allows(&peer) {
69 return Ok(ControlSocketReply::DroppedPeer);
70 }
71
72 let request_bytes = match read_frame(stream) {
73 Ok(bytes) => bytes,
74 Err(err) => {
75 let reply = reply_for_framing_error(&err);
76 write_response_frame(stream, None, &reply)?;
77 return Ok(ControlSocketReply::Hello(reply));
78 }
79 };
80
81 let request_frame = match Frame::decode(request_bytes.as_slice()) {
82 Ok(frame) => frame,
83 Err(_) => {
84 let reply = refused_reply(ErrorCode::ErrorPeerRejected, "malformed broker Frame", 0);
85 write_response_frame(stream, None, &reply)?;
86 return Ok(ControlSocketReply::Hello(reply));
87 }
88 };
89
90 if request_frame.payload_protocol == ADMIN_PAYLOAD_PROTOCOL {
91 let snapshot = snapshot_provider();
92 let response_frame = handle_admin_frame(request_frame, &snapshot)?;
93 let reply = write_admin_response_frame(stream, &response_frame)?;
94 return Ok(ControlSocketReply::Admin(reply));
95 }
96
97 let reply = if request_bytes.len() > MAX_HELLO_BYTES {
98 refused_reply(
99 ErrorCode::ErrorPeerRejected,
100 "initial Hello frame exceeds 64 KiB",
101 0,
102 )
103 } else {
104 hello_responder.handle_frame(request_frame.clone(), peer)
105 };
106 write_response_frame(stream, Some(&request_frame), &reply)?;
107 Ok(ControlSocketReply::Hello(reply))
108}
109
110pub fn serve_control_socket_connections_with_policy<R, F>(
113 socket_path: &str,
114 hello_responder: &R,
115 snapshot_provider: F,
116 connection_count: usize,
117 peer_policy: &PeerCredentialPolicy,
118) -> Result<(), ControlSocketError>
119where
120 R: HelloResponder + ?Sized,
121 F: Fn() -> AdminSnapshot,
122{
123 let Some(connection_count) = NonZeroUsize::new(connection_count) else {
124 return Ok(());
125 };
126
127 serve_control_socket_connections_with_limit_and_policy(
128 socket_path,
129 hello_responder,
130 snapshot_provider,
131 ControlSocketConnectionLimit::Bounded(connection_count),
132 peer_policy,
133 )
134}
135
136pub fn serve_control_socket_connections_with_limit_and_policy<R, F>(
139 socket_path: &str,
140 hello_responder: &R,
141 snapshot_provider: F,
142 connection_limit: ControlSocketConnectionLimit,
143 peer_policy: &PeerCredentialPolicy,
144) -> Result<(), ControlSocketError>
145where
146 R: HelloResponder + ?Sized,
147 F: Fn() -> AdminSnapshot,
148{
149 let listener = bind_local_socket(socket_path)?;
150 let cleanup = LocalSocketCleanup(socket_path);
151 let result = (|| {
152 let mut accepted = 0;
153 while connection_limit.should_continue(accepted) {
154 let mut stream = listener.accept().map_err(BrokerConnectionError::Io)?;
155 accepted += 1;
156 let peer = peer_identity_from_stream(&stream)?;
157 let _reply = handle_control_connection_with_peer_policy(
158 &mut stream,
159 hello_responder,
160 &snapshot_provider,
161 peer,
162 peer_policy,
163 )?;
164 }
165 Ok(())
166 })();
167 drop(listener);
168 drop(cleanup);
169 result
170}
171
172fn write_admin_response_frame<W: Write>(
173 writer: &mut W,
174 response_frame: &Frame,
175) -> Result<AdminReply, ControlSocketError> {
176 let mut response_bytes = Vec::new();
177 response_frame
178 .encode(&mut response_bytes)
179 .map_err(ControlSocketError::EncodeFrame)?;
180 write_frame(writer, &response_bytes)?;
181 AdminReply::decode(response_frame.payload.as_slice())
182 .map_err(ControlSocketError::DecodeAdminReply)
183}
184
185#[derive(Debug, thiserror::Error)]
187pub enum ControlSocketError {
188 #[error(transparent)]
190 Connection(#[from] BrokerConnectionError),
191 #[error(transparent)]
193 Framing(#[from] FramingError),
194 #[error(transparent)]
196 AdminFrame(#[from] AdminFrameError),
197 #[error("failed to encode broker control response Frame: {0}")]
199 EncodeFrame(prost::EncodeError),
200 #[error("failed to decode admin reply payload: {0}")]
202 DecodeAdminReply(prost::DecodeError),
203}