// rust_supervisor/dashboard/runtime.rs
use crate::config::audit::AuditConfig;
use crate::control::handle::SupervisorHandle;
use crate::dashboard::config::ValidatedDashboardIpcConfig;
use crate::dashboard::error::DashboardError;
use crate::dashboard::ipc_server::{DashboardIpcService, bind_dashboard_listener};
use crate::dashboard::protocol::{IpcResponse, parse_request_line, response_to_line};
use crate::dashboard::registration::run_registration_heartbeat;
use crate::dashboard::state::declared_state_from_spec;
use crate::ipc::security::IpcSecurityPipeline;
use crate::ipc::security::peer_identity::{PeerIdentity, extract_peer_identity};
use crate::journal::ring::EventJournal;
use crate::spec::supervisor::SupervisorSpec;
use std::fmt;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::task::{JoinHandle, JoinSet};

/// Largest request frame (excluding the `\n` delimiter) accepted from a
/// dashboard client: 1 MiB.
const DEFAULT_MAX_FRAME_BYTES: usize = 1024 * 1024;

/// Process-wide sequence number appended to every connection ID, so two
/// connections that happen to reuse the same file descriptor still get
/// distinct IDs.
static CONNECTION_COUNTER: AtomicU64 = AtomicU64::new(0);
32
33pub struct DashboardIpcRuntimeGuard {
35 ipc_path: PathBuf,
37 ipc_task: JoinHandle<()>,
39 heartbeat_task: Option<JoinHandle<()>>,
41}
42
43impl fmt::Debug for DashboardIpcRuntimeGuard {
44 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
46 formatter
47 .debug_struct("DashboardIpcRuntimeGuard")
48 .field("ipc_path", &self.ipc_path)
49 .field("has_heartbeat_task", &self.heartbeat_task.is_some())
50 .finish_non_exhaustive()
51 }
52}
53
54impl Drop for DashboardIpcRuntimeGuard {
55 fn drop(&mut self) {
57 self.ipc_task.abort();
58 if let Some(task) = self.heartbeat_task.as_ref() {
59 task.abort();
60 }
61 if let Err(error) = std::fs::remove_file(&self.ipc_path)
62 && error.kind() != std::io::ErrorKind::NotFound
63 {
64 tracing::warn!(
65 ipc_path = %self.ipc_path.display(),
66 ?error,
67 "failed to remove dashboard IPC socket"
68 );
69 }
70 }
71}
72
73pub fn start_dashboard_ipc_runtime(
86 config: ValidatedDashboardIpcConfig,
87 audit_config: AuditConfig,
88 spec: SupervisorSpec,
89 handle: SupervisorHandle,
90) -> Result<Arc<DashboardIpcRuntimeGuard>, DashboardError> {
91 let listener = bind_dashboard_listener(&config)?;
92 let ipc_path = config.path.clone();
93 let target_id = config.target_id.clone();
94 let service = dashboard_service(config.clone(), audit_config, spec, handle);
95 let ipc_task = tokio::spawn(run_accept_loop(listener, service, target_id));
96 let heartbeat_task = start_heartbeat_task(config);
97
98 Ok(Arc::new(DashboardIpcRuntimeGuard {
99 ipc_path,
100 ipc_task,
101 heartbeat_task,
102 }))
103}
104
105fn dashboard_service(
111 config: ValidatedDashboardIpcConfig,
112 audit_config: AuditConfig,
113 spec: SupervisorSpec,
114 handle: SupervisorHandle,
115) -> Arc<DashboardIpcService> {
116 let state = declared_state_from_spec(&spec);
117 let journal = EventJournal::new(spec.event_channel_capacity);
118 let mut service =
119 DashboardIpcService::new(config.clone(), spec, state, journal).with_handle(handle);
120 if let Some(security_config) = config.security_config {
121 let pipeline = IpcSecurityPipeline::new(security_config, audit_config);
122 service = service.with_security_pipeline(pipeline);
123 }
124 Arc::new(service)
125}
126
127fn start_heartbeat_task(config: ValidatedDashboardIpcConfig) -> Option<JoinHandle<()>> {
129 config.registration.as_ref()?;
130 Some(tokio::spawn(async move {
131 if let Err(error) = run_registration_heartbeat(config).await {
132 tracing::warn!(?error, "dashboard registration heartbeat stopped");
133 }
134 }))
135}
136
137async fn run_accept_loop(
139 listener: UnixListener,
140 service: Arc<DashboardIpcService>,
141 target_id: String,
142) {
143 let mut connections = JoinSet::new();
144 loop {
145 tokio::select! {
146 accepted = listener.accept() => {
147 match accepted {
148 Ok((stream, _)) => {
149 let service = Arc::clone(&service);
150 let target_id = target_id.clone();
151 connections.spawn(async move {
152 handle_connection(stream, service, target_id).await
153 });
154 }
155 Err(error) => {
156 tracing::warn!(?error, "dashboard IPC accept loop stopped");
157 break;
158 }
159 }
160 }
161 Some(joined) = connections.join_next() => {
162 match joined {
163 Ok(Ok(())) => {}
164 Ok(Err(error)) => {
165 tracing::warn!(?error, "dashboard IPC connection ended with error");
166 }
167 Err(error) => {
168 tracing::warn!(?error, "dashboard IPC connection task failed");
169 }
170 }
171 }
172 }
173 }
174}
175
176async fn handle_connection(
179 stream: UnixStream,
180 service: Arc<DashboardIpcService>,
181 target_id: String,
182) -> Result<(), DashboardError> {
183 let std_stream = stream.into_std().map_err(|error| {
185 io_error(
186 "ipc_into_std_failed",
187 "ipc_connect",
188 Some(target_id.clone()),
189 error,
190 )
191 })?;
192 let peer = extract_peer_identity(&std_stream)?;
193 let raw_fd = std_stream.as_raw_fd();
194 let connection_id = format!(
195 "conn-{raw_fd}-{}",
196 CONNECTION_COUNTER.fetch_add(1, Ordering::Relaxed)
197 );
198 let stream = UnixStream::from_std(std_stream).map_err(|error| {
199 io_error(
200 "ipc_from_std_failed",
201 "ipc_connect",
202 Some(target_id.clone()),
203 error,
204 )
205 })?;
206
207 let mut reader = BoundedFrameReader::new(stream, DEFAULT_MAX_FRAME_BYTES);
208 loop {
209 match reader.read_frame().await {
210 Ok(Some(raw_frame)) => {
211 let raw_body_len = raw_frame.len();
212 let response =
213 response_for_line(&service, &raw_frame, &peer, &connection_id, raw_body_len)
214 .await;
215 write_response(&mut reader, &response, &target_id).await?;
216 }
217 Ok(None) => {
218 return Ok(());
220 }
221 Err(error) => {
222 return Err(error);
223 }
224 }
225 }
226}
227
228struct BoundedFrameReader {
231 stream: UnixStream,
233 max_bytes: usize,
235 buf: Vec<u8>,
237}
238
239impl BoundedFrameReader {
240 fn new(stream: UnixStream, max_bytes: usize) -> Self {
242 Self {
243 stream,
244 max_bytes,
245 buf: Vec::with_capacity(max_bytes.min(4096)),
246 }
247 }
248
249 async fn read_frame(&mut self) -> Result<Option<String>, DashboardError> {
255 self.buf.clear();
256 loop {
257 let mut byte = [0u8; 1];
258 match self.stream.read_exact(&mut byte).await {
259 Ok(_bytes_read) => {
260 if byte[0] == b'\n' {
261 let frame = String::from_utf8(self.buf.clone()).map_err(|_| {
262 DashboardError::new(
263 "invalid_utf8",
264 "ipc_read",
265 None,
266 "frame is not valid UTF-8".to_owned(),
267 false,
268 )
269 })?;
270 return Ok(Some(frame));
271 }
272 self.buf.push(byte[0]);
273 if self.buf.len() > self.max_bytes {
274 return Err(DashboardError::new(
275 "frame_too_large",
276 "ipc_read",
277 None,
278 format!("frame exceeded maximum size of {} bytes", self.max_bytes),
279 false,
280 ));
281 }
282 }
283 Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
284 if self.buf.is_empty() {
285 return Ok(None);
286 }
287 return Err(DashboardError::new(
288 "incomplete_frame",
289 "ipc_read",
290 None,
291 "connection closed before newline delimiter".to_owned(),
292 false,
293 ));
294 }
295 Err(err) => {
296 return Err(io_error("ipc_read_failed", "ipc_read", None, err));
297 }
298 }
299 }
300 }
301
302 fn stream_mut(&mut self) -> &mut UnixStream {
304 &mut self.stream
305 }
306}
307
308impl std::os::unix::io::AsRawFd for BoundedFrameReader {
309 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
310 self.stream.as_raw_fd()
311 }
312}
313
314async fn response_for_line(
316 service: &DashboardIpcService,
317 line: &str,
318 peer: &PeerIdentity,
319 connection_id: &str,
320 raw_body_len: usize,
321) -> IpcResponse {
322 match parse_request_line(line) {
323 Ok(request) => {
324 service
325 .handle_request(request, peer, connection_id, raw_body_len)
326 .await
327 }
328 Err(error) => IpcResponse::error("invalid-request", error),
329 }
330}
331
332async fn write_response(
334 reader: &mut BoundedFrameReader,
335 response: &IpcResponse,
336 target_id: &str,
337) -> Result<(), DashboardError> {
338 let line = response_to_line(response)?;
339 reader
340 .stream_mut()
341 .write_all(line.as_bytes())
342 .await
343 .map_err(|error| {
344 io_error(
345 "ipc_write_failed",
346 "ipc_write",
347 Some(target_id.to_owned()),
348 error,
349 )
350 })
351}
352
353fn io_error(
355 code: &str,
356 stage: &str,
357 target_id: Option<String>,
358 error: std::io::Error,
359) -> DashboardError {
360 DashboardError::new(code, stage, target_id, error.to_string(), true)
361}