rust_supervisor/dashboard/
runtime.rs1use crate::control::handle::SupervisorHandle;
7use crate::dashboard::config::ValidatedDashboardIpcConfig;
8use crate::dashboard::error::DashboardError;
9use crate::dashboard::ipc_server::{DashboardIpcService, bind_dashboard_listener};
10use crate::dashboard::protocol::{IpcResponse, parse_request_line, response_to_line};
11use crate::dashboard::registration::run_registration_heartbeat;
12use crate::dashboard::state::declared_state_from_spec;
13use crate::journal::ring::EventJournal;
14use crate::spec::supervisor::SupervisorSpec;
15use std::fmt;
16use std::path::PathBuf;
17use std::sync::Arc;
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::net::{UnixListener, UnixStream};
20use tokio::task::{JoinHandle, JoinSet};
21
22pub struct DashboardIpcRuntimeGuard {
24 ipc_path: PathBuf,
26 ipc_task: JoinHandle<()>,
28 heartbeat_task: Option<JoinHandle<()>>,
30}
31
32impl fmt::Debug for DashboardIpcRuntimeGuard {
33 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
35 formatter
36 .debug_struct("DashboardIpcRuntimeGuard")
37 .field("ipc_path", &self.ipc_path)
38 .field("has_heartbeat_task", &self.heartbeat_task.is_some())
39 .finish_non_exhaustive()
40 }
41}
42
43impl Drop for DashboardIpcRuntimeGuard {
44 fn drop(&mut self) {
46 self.ipc_task.abort();
47 if let Some(task) = self.heartbeat_task.as_ref() {
48 task.abort();
49 }
50 if let Err(error) = std::fs::remove_file(&self.ipc_path) {
51 if error.kind() != std::io::ErrorKind::NotFound {
52 tracing::warn!(
53 ipc_path = %self.ipc_path.display(),
54 ?error,
55 "failed to remove dashboard IPC socket"
56 );
57 }
58 }
59 }
60}
61
62pub fn start_dashboard_ipc_runtime(
74 config: ValidatedDashboardIpcConfig,
75 spec: SupervisorSpec,
76 handle: SupervisorHandle,
77) -> Result<Arc<DashboardIpcRuntimeGuard>, DashboardError> {
78 let listener = bind_dashboard_listener(&config)?;
79 let ipc_path = config.path.clone();
80 let target_id = config.target_id.clone();
81 let service = dashboard_service(config.clone(), spec, handle);
82 let ipc_task = tokio::spawn(run_accept_loop(listener, service, target_id));
83 let heartbeat_task = start_heartbeat_task(config);
84
85 Ok(Arc::new(DashboardIpcRuntimeGuard {
86 ipc_path,
87 ipc_task,
88 heartbeat_task,
89 }))
90}
91
92fn dashboard_service(
94 config: ValidatedDashboardIpcConfig,
95 spec: SupervisorSpec,
96 handle: SupervisorHandle,
97) -> Arc<DashboardIpcService> {
98 let state = declared_state_from_spec(&spec);
99 let journal = EventJournal::new(spec.event_channel_capacity);
100 Arc::new(DashboardIpcService::new(config, spec, state, journal).with_handle(handle))
101}
102
103fn start_heartbeat_task(config: ValidatedDashboardIpcConfig) -> Option<JoinHandle<()>> {
105 config.registration.as_ref()?;
106 Some(tokio::spawn(async move {
107 if let Err(error) = run_registration_heartbeat(config).await {
108 tracing::warn!(?error, "dashboard registration heartbeat stopped");
109 }
110 }))
111}
112
113async fn run_accept_loop(
115 listener: UnixListener,
116 service: Arc<DashboardIpcService>,
117 target_id: String,
118) {
119 let mut connections = JoinSet::new();
120 loop {
121 tokio::select! {
122 accepted = listener.accept() => {
123 match accepted {
124 Ok((stream, _)) => {
125 let service = Arc::clone(&service);
126 let target_id = target_id.clone();
127 connections.spawn(async move {
128 handle_connection(stream, service, target_id).await
129 });
130 }
131 Err(error) => {
132 tracing::warn!(?error, "dashboard IPC accept loop stopped");
133 break;
134 }
135 }
136 }
137 Some(joined) = connections.join_next() => {
138 match joined {
139 Ok(Ok(())) => {}
140 Ok(Err(error)) => {
141 tracing::warn!(?error, "dashboard IPC connection ended with error");
142 }
143 Err(error) => {
144 tracing::warn!(?error, "dashboard IPC connection task failed");
145 }
146 }
147 }
148 }
149 }
150}
151
152async fn handle_connection(
154 stream: UnixStream,
155 service: Arc<DashboardIpcService>,
156 target_id: String,
157) -> Result<(), DashboardError> {
158 let mut reader = BufReader::new(stream);
159 loop {
160 let mut line = String::new();
161 let bytes = reader.read_line(&mut line).await.map_err(|error| {
162 io_error(
163 "ipc_read_failed",
164 "ipc_read",
165 Some(target_id.clone()),
166 error,
167 )
168 })?;
169 if bytes == 0 {
170 return Ok(());
171 }
172 let response = response_for_line(&service, line.trim_end()).await;
173 write_response(&mut reader, &response, &target_id).await?;
174 }
175}
176
177async fn response_for_line(service: &DashboardIpcService, line: &str) -> IpcResponse {
179 match parse_request_line(line) {
180 Ok(request) => service.handle_request(request).await,
181 Err(error) => IpcResponse::error("invalid-request", error),
182 }
183}
184
185async fn write_response(
187 reader: &mut BufReader<UnixStream>,
188 response: &IpcResponse,
189 target_id: &str,
190) -> Result<(), DashboardError> {
191 let line = response_to_line(response)?;
192 reader
193 .get_mut()
194 .write_all(line.as_bytes())
195 .await
196 .map_err(|error| {
197 io_error(
198 "ipc_write_failed",
199 "ipc_write",
200 Some(target_id.to_owned()),
201 error,
202 )
203 })
204}
205
206fn io_error(
208 code: &str,
209 stage: &str,
210 target_id: Option<String>,
211 error: std::io::Error,
212) -> DashboardError {
213 DashboardError::new(code, stage, target_id, error.to_string(), true)
214}