Skip to main content

rust_supervisor/dashboard/
runtime.rs

1//! Dashboard IPC runtime lifecycle.
2//!
3//! The runtime owns the target-side Unix socket accept loop and the dynamic
4//! registration heartbeat used by the relay integration.
5
6use crate::config::audit::AuditConfig;
7use crate::control::handle::SupervisorHandle;
8use crate::dashboard::config::ValidatedDashboardIpcConfig;
9use crate::dashboard::error::DashboardError;
10use crate::dashboard::ipc_server::{DashboardIpcService, bind_dashboard_listener};
11use crate::dashboard::protocol::{IpcResponse, parse_request_line, response_to_line};
12use crate::dashboard::registration::run_registration_heartbeat;
13use crate::dashboard::state::declared_state_from_spec;
14use crate::ipc::security::IpcSecurityPipeline;
15use crate::ipc::security::peer_identity::{PeerIdentity, extract_peer_identity};
16use crate::journal::ring::EventJournal;
17use crate::spec::supervisor::SupervisorSpec;
18use std::fmt;
19use std::os::unix::io::AsRawFd;
20use std::path::PathBuf;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23use tokio::io::{AsyncReadExt, AsyncWriteExt};
24use tokio::net::{UnixListener, UnixStream};
25use tokio::task::{JoinHandle, JoinSet};
26
/// Upper bound, in bytes, applied to a single frame by the bounded frame
/// reader unless another limit is supplied (1 MiB).
const DEFAULT_MAX_FRAME_BYTES: usize = 1024 * 1024;

/// Process-wide sequence number mixed into every generated `connection_id`.
static CONNECTION_COUNTER: AtomicU64 = AtomicU64::new(0);
32
33/// Guard that owns dashboard IPC background tasks and socket cleanup.
34pub struct DashboardIpcRuntimeGuard {
35    /// Socket path created by this runtime.
36    ipc_path: PathBuf,
37    /// Target-side IPC accept task.
38    ipc_task: JoinHandle<()>,
39    /// Optional registration heartbeat task.
40    heartbeat_task: Option<JoinHandle<()>>,
41}
42
43impl fmt::Debug for DashboardIpcRuntimeGuard {
44    /// Formats guard diagnostics without exposing task internals.
45    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
46        formatter
47            .debug_struct("DashboardIpcRuntimeGuard")
48            .field("ipc_path", &self.ipc_path)
49            .field("has_heartbeat_task", &self.heartbeat_task.is_some())
50            .finish_non_exhaustive()
51    }
52}
53
54impl Drop for DashboardIpcRuntimeGuard {
55    /// Stops background tasks and removes the socket created by this runtime.
56    fn drop(&mut self) {
57        self.ipc_task.abort();
58        if let Some(task) = self.heartbeat_task.as_ref() {
59            task.abort();
60        }
61        if let Err(error) = std::fs::remove_file(&self.ipc_path)
62            && error.kind() != std::io::ErrorKind::NotFound
63        {
64            tracing::warn!(
65                ipc_path = %self.ipc_path.display(),
66                ?error,
67                "failed to remove dashboard IPC socket"
68            );
69        }
70    }
71}
72
73/// Starts the dashboard IPC runtime for an enabled target configuration.
74///
75/// # Arguments
76///
77/// - `config`: Validated dashboard IPC configuration.
78/// - `audit_config`: Root audit persistence configuration.
79/// - `spec`: Supervisor declaration used to build dashboard state.
80/// - `handle`: Runtime control handle used by command requests.
81///
82/// # Returns
83///
84/// Returns a guard that stops runtime tasks and removes the socket on drop.
85pub fn start_dashboard_ipc_runtime(
86    config: ValidatedDashboardIpcConfig,
87    audit_config: AuditConfig,
88    spec: SupervisorSpec,
89    handle: SupervisorHandle,
90) -> Result<Arc<DashboardIpcRuntimeGuard>, DashboardError> {
91    let listener = bind_dashboard_listener(&config)?;
92    let ipc_path = config.path.clone();
93    let target_id = config.target_id.clone();
94    let service = dashboard_service(config.clone(), audit_config, spec, handle);
95    let ipc_task = tokio::spawn(run_accept_loop(listener, service, target_id));
96    let heartbeat_task = start_heartbeat_task(config);
97
98    Ok(Arc::new(DashboardIpcRuntimeGuard {
99        ipc_path,
100        ipc_task,
101        heartbeat_task,
102    }))
103}
104
105/// Builds the service used by all socket connections.
106///
107/// When `config.security_config` is present, an IPC security pipeline is
108/// constructed with the root audit config and wired into the service via
109/// `with_security_pipeline`.
110fn dashboard_service(
111    config: ValidatedDashboardIpcConfig,
112    audit_config: AuditConfig,
113    spec: SupervisorSpec,
114    handle: SupervisorHandle,
115) -> Arc<DashboardIpcService> {
116    let state = declared_state_from_spec(&spec);
117    let journal = EventJournal::new(spec.event_channel_capacity);
118    let mut service =
119        DashboardIpcService::new(config.clone(), spec, state, journal).with_handle(handle);
120    if let Some(security_config) = config.security_config {
121        let pipeline = IpcSecurityPipeline::new(security_config, audit_config);
122        service = service.with_security_pipeline(pipeline);
123    }
124    Arc::new(service)
125}
126
127/// Starts the dynamic registration heartbeat when registration is enabled.
128fn start_heartbeat_task(config: ValidatedDashboardIpcConfig) -> Option<JoinHandle<()>> {
129    config.registration.as_ref()?;
130    Some(tokio::spawn(async move {
131        if let Err(error) = run_registration_heartbeat(config).await {
132            tracing::warn!(?error, "dashboard registration heartbeat stopped");
133        }
134    }))
135}
136
137/// Accepts target-side IPC connections until the listener fails or is aborted.
138async fn run_accept_loop(
139    listener: UnixListener,
140    service: Arc<DashboardIpcService>,
141    target_id: String,
142) {
143    let mut connections = JoinSet::new();
144    loop {
145        tokio::select! {
146            accepted = listener.accept() => {
147                match accepted {
148                    Ok((stream, _)) => {
149                        let service = Arc::clone(&service);
150                        let target_id = target_id.clone();
151                        connections.spawn(async move {
152                            handle_connection(stream, service, target_id).await
153                        });
154                    }
155                    Err(error) => {
156                        tracing::warn!(?error, "dashboard IPC accept loop stopped");
157                        break;
158                    }
159                }
160            }
161            Some(joined) = connections.join_next() => {
162                match joined {
163                    Ok(Ok(())) => {}
164                    Ok(Err(error)) => {
165                        tracing::warn!(?error, "dashboard IPC connection ended with error");
166                    }
167                    Err(error) => {
168                        tracing::warn!(?error, "dashboard IPC connection task failed");
169                    }
170                }
171            }
172        }
173    }
174}
175
176/// Handles one IPC connection with bounded frame reading, real peer
177/// credential extraction, and per-connection unique identifier.
178async fn handle_connection(
179    stream: UnixStream,
180    service: Arc<DashboardIpcService>,
181    target_id: String,
182) -> Result<(), DashboardError> {
183    // ---- extract real peer credential before wrapping into tokio ----
184    let std_stream = stream.into_std().map_err(|error| {
185        io_error(
186            "ipc_into_std_failed",
187            "ipc_connect",
188            Some(target_id.clone()),
189            error,
190        )
191    })?;
192    let peer = extract_peer_identity(&std_stream)?;
193    let raw_fd = std_stream.as_raw_fd();
194    let connection_id = format!(
195        "conn-{raw_fd}-{}",
196        CONNECTION_COUNTER.fetch_add(1, Ordering::Relaxed)
197    );
198    let stream = UnixStream::from_std(std_stream).map_err(|error| {
199        io_error(
200            "ipc_from_std_failed",
201            "ipc_connect",
202            Some(target_id.clone()),
203            error,
204        )
205    })?;
206
207    let mut reader = BoundedFrameReader::new(stream, DEFAULT_MAX_FRAME_BYTES);
208    loop {
209        match reader.read_frame().await {
210            Ok(Some(raw_frame)) => {
211                let raw_body_len = raw_frame.len();
212                let response =
213                    response_for_line(&service, &raw_frame, &peer, &connection_id, raw_body_len)
214                        .await;
215                write_response(&mut reader, &response, &target_id).await?;
216            }
217            Ok(None) => {
218                // EOF — peer closed connection gracefully
219                return Ok(());
220            }
221            Err(error) => {
222                return Err(error);
223            }
224        }
225    }
226}
227
228/// Bounded frame reader that limits each frame to `max_bytes` before
229/// allocating the target buffer.
230struct BoundedFrameReader {
231    /// Inner tokio stream.
232    stream: UnixStream,
233    /// Maximum frame size in bytes.
234    max_bytes: usize,
235    /// Read buffer reused across frames.
236    buf: Vec<u8>,
237}
238
239impl BoundedFrameReader {
240    /// Creates a new bounded frame reader.
241    fn new(stream: UnixStream, max_bytes: usize) -> Self {
242        Self {
243            stream,
244            max_bytes,
245            buf: Vec::with_capacity(max_bytes.min(4096)),
246        }
247    }
248
249    /// Reads one newline-delimited frame.
250    ///
251    /// Returns `Ok(Some(frame))` for a complete frame, `Ok(None)` for EOF
252    /// before any data, or `Err` when the frame exceeds `max_bytes` or a
253    /// read error occurs.
254    async fn read_frame(&mut self) -> Result<Option<String>, DashboardError> {
255        self.buf.clear();
256        loop {
257            let mut byte = [0u8; 1];
258            match self.stream.read_exact(&mut byte).await {
259                Ok(_bytes_read) => {
260                    if byte[0] == b'\n' {
261                        let frame = String::from_utf8(self.buf.clone()).map_err(|_| {
262                            DashboardError::new(
263                                "invalid_utf8",
264                                "ipc_read",
265                                None,
266                                "frame is not valid UTF-8".to_owned(),
267                                false,
268                            )
269                        })?;
270                        return Ok(Some(frame));
271                    }
272                    self.buf.push(byte[0]);
273                    if self.buf.len() > self.max_bytes {
274                        return Err(DashboardError::new(
275                            "frame_too_large",
276                            "ipc_read",
277                            None,
278                            format!("frame exceeded maximum size of {} bytes", self.max_bytes),
279                            false,
280                        ));
281                    }
282                }
283                Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
284                    if self.buf.is_empty() {
285                        return Ok(None);
286                    }
287                    return Err(DashboardError::new(
288                        "incomplete_frame",
289                        "ipc_read",
290                        None,
291                        "connection closed before newline delimiter".to_owned(),
292                        false,
293                    ));
294                }
295                Err(err) => {
296                    return Err(io_error("ipc_read_failed", "ipc_read", None, err));
297                }
298            }
299        }
300    }
301
302    /// Returns a mutable reference to the inner stream for writing.
303    fn stream_mut(&mut self) -> &mut UnixStream {
304        &mut self.stream
305    }
306}
307
308impl std::os::unix::io::AsRawFd for BoundedFrameReader {
309    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
310        self.stream.as_raw_fd()
311    }
312}
313
314/// Converts one request line into a response, passing connection context.
315async fn response_for_line(
316    service: &DashboardIpcService,
317    line: &str,
318    peer: &PeerIdentity,
319    connection_id: &str,
320    raw_body_len: usize,
321) -> IpcResponse {
322    match parse_request_line(line) {
323        Ok(request) => {
324            service
325                .handle_request(request, peer, connection_id, raw_body_len)
326                .await
327        }
328        Err(error) => IpcResponse::error("invalid-request", error),
329    }
330}
331
332/// Writes one response line to the socket.
333async fn write_response(
334    reader: &mut BoundedFrameReader,
335    response: &IpcResponse,
336    target_id: &str,
337) -> Result<(), DashboardError> {
338    let line = response_to_line(response)?;
339    reader
340        .stream_mut()
341        .write_all(line.as_bytes())
342        .await
343        .map_err(|error| {
344            io_error(
345                "ipc_write_failed",
346                "ipc_write",
347                Some(target_id.to_owned()),
348                error,
349            )
350        })
351}
352
353/// Creates a structured IPC runtime I/O error.
354fn io_error(
355    code: &str,
356    stage: &str,
357    target_id: Option<String>,
358    error: std::io::Error,
359) -> DashboardError {
360    DashboardError::new(code, stage, target_id, error.to_string(), true)
361}