Skip to main content

rust_supervisor/dashboard/
runtime.rs

1//! Dashboard IPC runtime lifecycle.
2//!
3//! The runtime owns the target-side Unix socket accept loop and the dynamic
4//! registration heartbeat used by the relay integration.
5
6use crate::control::handle::SupervisorHandle;
7use crate::dashboard::config::ValidatedDashboardIpcConfig;
8use crate::dashboard::error::DashboardError;
9use crate::dashboard::ipc_server::{DashboardIpcService, bind_dashboard_listener};
10use crate::dashboard::protocol::{IpcResponse, parse_request_line, response_to_line};
11use crate::dashboard::registration::run_registration_heartbeat;
12use crate::dashboard::state::declared_state_from_spec;
13use crate::journal::ring::EventJournal;
14use crate::spec::supervisor::SupervisorSpec;
15use std::fmt;
16use std::path::PathBuf;
17use std::sync::Arc;
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::net::{UnixListener, UnixStream};
20use tokio::task::{JoinHandle, JoinSet};
21
/// Guard that owns dashboard IPC background tasks and socket cleanup.
///
/// The guard's `Drop` implementation aborts the stored tasks and removes the
/// socket file at `ipc_path`, so the runtime lives exactly as long as the
/// guard is kept alive.
pub struct DashboardIpcRuntimeGuard {
    /// Filesystem path of the Unix socket created by this runtime; removed on drop.
    ipc_path: PathBuf,
    /// Handle of the spawned target-side IPC accept loop; aborted on drop.
    ipc_task: JoinHandle<()>,
    /// Handle of the registration heartbeat task, or `None` when no heartbeat
    /// task was started; aborted on drop when present.
    heartbeat_task: Option<JoinHandle<()>>,
}
31
32impl fmt::Debug for DashboardIpcRuntimeGuard {
33    /// Formats guard diagnostics without exposing task internals.
34    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
35        formatter
36            .debug_struct("DashboardIpcRuntimeGuard")
37            .field("ipc_path", &self.ipc_path)
38            .field("has_heartbeat_task", &self.heartbeat_task.is_some())
39            .finish_non_exhaustive()
40    }
41}
42
43impl Drop for DashboardIpcRuntimeGuard {
44    /// Stops background tasks and removes the socket created by this runtime.
45    fn drop(&mut self) {
46        self.ipc_task.abort();
47        if let Some(task) = self.heartbeat_task.as_ref() {
48            task.abort();
49        }
50        if let Err(error) = std::fs::remove_file(&self.ipc_path) {
51            if error.kind() != std::io::ErrorKind::NotFound {
52                tracing::warn!(
53                    ipc_path = %self.ipc_path.display(),
54                    ?error,
55                    "failed to remove dashboard IPC socket"
56                );
57            }
58        }
59    }
60}
61
62/// Starts the dashboard IPC runtime for an enabled target configuration.
63///
64/// # Arguments
65///
66/// - `config`: Validated dashboard IPC configuration.
67/// - `spec`: Supervisor declaration used to build dashboard state.
68/// - `handle`: Runtime control handle used by command requests.
69///
70/// # Returns
71///
72/// Returns a guard that stops runtime tasks and removes the socket on drop.
73pub fn start_dashboard_ipc_runtime(
74    config: ValidatedDashboardIpcConfig,
75    spec: SupervisorSpec,
76    handle: SupervisorHandle,
77) -> Result<Arc<DashboardIpcRuntimeGuard>, DashboardError> {
78    let listener = bind_dashboard_listener(&config)?;
79    let ipc_path = config.path.clone();
80    let target_id = config.target_id.clone();
81    let service = dashboard_service(config.clone(), spec, handle);
82    let ipc_task = tokio::spawn(run_accept_loop(listener, service, target_id));
83    let heartbeat_task = start_heartbeat_task(config);
84
85    Ok(Arc::new(DashboardIpcRuntimeGuard {
86        ipc_path,
87        ipc_task,
88        heartbeat_task,
89    }))
90}
91
92/// Builds the service used by all socket connections.
93fn dashboard_service(
94    config: ValidatedDashboardIpcConfig,
95    spec: SupervisorSpec,
96    handle: SupervisorHandle,
97) -> Arc<DashboardIpcService> {
98    let state = declared_state_from_spec(&spec);
99    let journal = EventJournal::new(spec.event_channel_capacity);
100    Arc::new(DashboardIpcService::new(config, spec, state, journal).with_handle(handle))
101}
102
103/// Starts the dynamic registration heartbeat when registration is enabled.
104fn start_heartbeat_task(config: ValidatedDashboardIpcConfig) -> Option<JoinHandle<()>> {
105    config.registration.as_ref()?;
106    Some(tokio::spawn(async move {
107        if let Err(error) = run_registration_heartbeat(config).await {
108            tracing::warn!(?error, "dashboard registration heartbeat stopped");
109        }
110    }))
111}
112
113/// Accepts target-side IPC connections until the listener fails or is aborted.
114async fn run_accept_loop(
115    listener: UnixListener,
116    service: Arc<DashboardIpcService>,
117    target_id: String,
118) {
119    let mut connections = JoinSet::new();
120    loop {
121        tokio::select! {
122            accepted = listener.accept() => {
123                match accepted {
124                    Ok((stream, _)) => {
125                        let service = Arc::clone(&service);
126                        let target_id = target_id.clone();
127                        connections.spawn(async move {
128                            handle_connection(stream, service, target_id).await
129                        });
130                    }
131                    Err(error) => {
132                        tracing::warn!(?error, "dashboard IPC accept loop stopped");
133                        break;
134                    }
135                }
136            }
137            Some(joined) = connections.join_next() => {
138                match joined {
139                    Ok(Ok(())) => {}
140                    Ok(Err(error)) => {
141                        tracing::warn!(?error, "dashboard IPC connection ended with error");
142                    }
143                    Err(error) => {
144                        tracing::warn!(?error, "dashboard IPC connection task failed");
145                    }
146                }
147            }
148        }
149    }
150}
151
152/// Handles one newline-delimited JSON IPC connection.
153async fn handle_connection(
154    stream: UnixStream,
155    service: Arc<DashboardIpcService>,
156    target_id: String,
157) -> Result<(), DashboardError> {
158    let mut reader = BufReader::new(stream);
159    loop {
160        let mut line = String::new();
161        let bytes = reader.read_line(&mut line).await.map_err(|error| {
162            io_error(
163                "ipc_read_failed",
164                "ipc_read",
165                Some(target_id.clone()),
166                error,
167            )
168        })?;
169        if bytes == 0 {
170            return Ok(());
171        }
172        let response = response_for_line(&service, line.trim_end()).await;
173        write_response(&mut reader, &response, &target_id).await?;
174    }
175}
176
177/// Converts one request line into a response.
178async fn response_for_line(service: &DashboardIpcService, line: &str) -> IpcResponse {
179    match parse_request_line(line) {
180        Ok(request) => service.handle_request(request).await,
181        Err(error) => IpcResponse::error("invalid-request", error),
182    }
183}
184
185/// Writes one response line to the socket.
186async fn write_response(
187    reader: &mut BufReader<UnixStream>,
188    response: &IpcResponse,
189    target_id: &str,
190) -> Result<(), DashboardError> {
191    let line = response_to_line(response)?;
192    reader
193        .get_mut()
194        .write_all(line.as_bytes())
195        .await
196        .map_err(|error| {
197            io_error(
198                "ipc_write_failed",
199                "ipc_write",
200                Some(target_id.to_owned()),
201                error,
202            )
203        })
204}
205
206/// Creates a structured IPC runtime I/O error.
207fn io_error(
208    code: &str,
209    stage: &str,
210    target_id: Option<String>,
211    error: std::io::Error,
212) -> DashboardError {
213    DashboardError::new(code, stage, target_id, error.to_string(), true)
214}