rust_supervisor/dashboard/
ipc_server.rs1use crate::control::command::CommandResult;
8use crate::control::handle::SupervisorHandle;
9use crate::dashboard::config::ValidatedDashboardIpcConfig;
10use crate::dashboard::error::DashboardError;
11use crate::dashboard::model::{
12 ControlCommandKind, ControlCommandRequest, ControlCommandResult, DashboardState,
13 TargetProcessRegistration,
14};
15use crate::dashboard::protocol::{
16 DASHBOARD_IPC_PROTOCOL_VERSION, IpcMethod, IpcRequest, IpcResponse, IpcResult,
17 decode_command_params,
18};
19use crate::dashboard::registration::build_registration_payload;
20use crate::dashboard::state::{DashboardStateInput, build_dashboard_state};
21use crate::id::types::{ChildId, SupervisorPath};
22use crate::journal::ring::EventJournal;
23use crate::spec::supervisor::SupervisorSpec;
24use crate::state::supervisor::SupervisorState;
25use serde_json::json;
26use std::os::unix::fs::FileTypeExt;
27use std::os::unix::net::UnixStream as StdUnixStream;
28use tokio::net::UnixListener;
29
30#[derive(Clone)]
32pub struct DashboardIpcService {
33 config: ValidatedDashboardIpcConfig,
35 spec: SupervisorSpec,
37 state: SupervisorState,
39 journal: EventJournal,
41 handle: Option<SupervisorHandle>,
43 state_generation: u64,
45}
46
47impl DashboardIpcService {
48 pub fn new(
61 config: ValidatedDashboardIpcConfig,
62 spec: SupervisorSpec,
63 state: SupervisorState,
64 journal: EventJournal,
65 ) -> Self {
66 Self {
67 config,
68 spec,
69 state,
70 journal,
71 handle: None,
72 state_generation: 1,
73 }
74 }
75
76 pub fn with_handle(mut self, handle: SupervisorHandle) -> Self {
86 self.handle = Some(handle);
87 self
88 }
89
90 pub fn registration_payload(&self) -> Result<TargetProcessRegistration, DashboardError> {
100 build_registration_payload(&self.config)
101 }
102
103 pub async fn handle_request(&self, request: IpcRequest) -> IpcResponse {
113 match self.dispatch(&request).await {
114 Ok(result) => IpcResponse::ok(request.request_id, result),
115 Err(error) => IpcResponse::error(request.request_id, error),
116 }
117 }
118
119 async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
129 let method = IpcMethod::parse(&request.method)?;
130 match method {
131 IpcMethod::Hello => Ok(IpcResult::Hello {
132 protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
133 registration: self.registration_payload()?,
134 }),
135 IpcMethod::CurrentState => {
136 let state = self.current_dashboard_state();
137 Ok(IpcResult::State {
138 target_id: state.target.target_id.clone(),
139 state: Box::new(state),
140 })
141 }
142 IpcMethod::EventsSubscribe => {
143 require_session_trigger(request, &self.config.target_id)?;
144 Ok(IpcResult::Subscription {
145 target_id: self.config.target_id.clone(),
146 subscription: "events".to_owned(),
147 })
148 }
149 IpcMethod::LogsTail => {
150 require_session_trigger(request, &self.config.target_id)?;
151 Ok(IpcResult::Subscription {
152 target_id: self.config.target_id.clone(),
153 subscription: "logs".to_owned(),
154 })
155 }
156 IpcMethod::CommandRestartChild
157 | IpcMethod::CommandPauseChild
158 | IpcMethod::CommandResumeChild
159 | IpcMethod::CommandQuarantineChild
160 | IpcMethod::CommandRemoveChild
161 | IpcMethod::CommandAddChild
162 | IpcMethod::CommandShutdownTree => self.command_result(request).await,
163 }
164 }
165
166 pub fn current_dashboard_state(&self) -> DashboardState {
176 let registration = self.registration_payload().ok();
177 build_dashboard_state(
178 DashboardStateInput {
179 target_id: self.config.target_id.clone(),
180 display_name: registration
181 .as_ref()
182 .map(|registration| registration.display_name.clone())
183 .unwrap_or_else(|| self.config.target_id.clone()),
184 state_generation: self.state_generation,
185 recent_limit: 128,
186 },
187 &self.spec,
188 &self.state,
189 &self.journal,
190 )
191 }
192
193 async fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
203 let command = decode_command_params(request)?;
204 validate_command(&command)?;
205 if command.target_id != self.config.target_id {
206 return Err(DashboardError::validation(
207 "command_validate",
208 Some(self.config.target_id.clone()),
209 "command target_id must match target process",
210 ));
211 }
212 let result = if let Some(handle) = self.handle.as_ref() {
213 execute_command(handle, &command).await
214 } else {
215 Err(DashboardError::target_unavailable(
216 "command_dispatch",
217 command.target_id.clone(),
218 "runtime control handle is not attached",
219 ))
220 };
221 let result = match result {
222 Ok(result) => ControlCommandResult {
223 command_id: command.command_id.clone(),
224 target_id: command.target_id.clone(),
225 accepted: true,
226 status: "completed".to_owned(),
227 error: None,
228 state_delta: Some(json!(result)),
229 completed_at_unix_nanos: Some(unix_nanos_now()),
230 },
231 Err(error) => ControlCommandResult {
232 command_id: command.command_id.clone(),
233 target_id: command.target_id.clone(),
234 accepted: false,
235 status: "failed".to_owned(),
236 error: Some(error),
237 state_delta: None,
238 completed_at_unix_nanos: Some(unix_nanos_now()),
239 },
240 };
241 Ok(IpcResult::CommandResult {
242 target_id: command.target_id,
243 result,
244 })
245 }
246}
247
248pub fn bind_dashboard_listener(
258 config: &ValidatedDashboardIpcConfig,
259) -> Result<UnixListener, DashboardError> {
260 prepare_socket_path(config)?;
261 UnixListener::bind(&config.path).map_err(|error| {
262 DashboardError::new(
263 "ipc_bind_failed",
264 "ipc_bind",
265 Some(config.target_id.clone()),
266 format!("failed to bind target IPC socket: {error}"),
267 true,
268 )
269 })
270}
271
272fn prepare_socket_path(config: &ValidatedDashboardIpcConfig) -> Result<(), DashboardError> {
282 let metadata = match std::fs::symlink_metadata(&config.path) {
283 Ok(metadata) => metadata,
284 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
285 Err(error) => {
286 return Err(DashboardError::new(
287 "ipc_path_metadata_failed",
288 "ipc_bind",
289 Some(config.target_id.clone()),
290 format!("failed to inspect IPC path: {error}"),
291 false,
292 ));
293 }
294 };
295 match config.bind_mode {
296 crate::config::configurable::DashboardIpcBindMode::CreateNew => {
297 Err(DashboardError::validation(
298 "ipc_bind",
299 Some(config.target_id.clone()),
300 "IPC path already exists and bind_mode is create_new",
301 ))
302 }
303 crate::config::configurable::DashboardIpcBindMode::ReplaceStale => {
304 if metadata.file_type().is_symlink() {
305 return Err(DashboardError::validation(
306 "ipc_bind",
307 Some(config.target_id.clone()),
308 "IPC path must not be a symlink",
309 ));
310 }
311 if !metadata.file_type().is_socket() {
312 return Err(DashboardError::validation(
313 "ipc_bind",
314 Some(config.target_id.clone()),
315 "IPC path must be a Unix socket before stale replacement",
316 ));
317 }
318 if StdUnixStream::connect(&config.path).is_ok() {
319 return Err(DashboardError::validation(
320 "ipc_bind",
321 Some(config.target_id.clone()),
322 "IPC path is served by a live process",
323 ));
324 }
325 std::fs::remove_file(&config.path).map_err(|error| {
326 DashboardError::new(
327 "ipc_stale_remove_failed",
328 "ipc_bind",
329 Some(config.target_id.clone()),
330 format!("failed to remove stale IPC path: {error}"),
331 true,
332 )
333 })
334 }
335 }
336}
337
338fn require_session_trigger(request: &IpcRequest, target_id: &str) -> Result<(), DashboardError> {
349 let established = request
350 .params
351 .get("session_established")
352 .and_then(serde_json::Value::as_bool)
353 .unwrap_or(false);
354 if established {
355 Ok(())
356 } else {
357 Err(DashboardError::new(
358 "session_required",
359 "subscription",
360 Some(target_id.to_owned()),
361 "event and log subscription must be triggered by an established dashboard session",
362 false,
363 ))
364 }
365}
366
367pub fn validate_command(command: &ControlCommandRequest) -> Result<(), DashboardError> {
377 if command.reason.trim().is_empty() {
378 return Err(DashboardError::validation(
379 "command_validate",
380 Some(command.target_id.clone()),
381 "command reason must not be empty",
382 ));
383 }
384 if command.requested_by.trim().is_empty() {
385 return Err(DashboardError::validation(
386 "command_validate",
387 Some(command.target_id.clone()),
388 "requested_by must be derived by relay",
389 ));
390 }
391 if matches!(
392 command.command,
393 ControlCommandKind::ShutdownTree
394 | ControlCommandKind::RemoveChild
395 | ControlCommandKind::AddChild
396 ) && !command.confirmed
397 {
398 return Err(DashboardError::validation(
399 "command_validate",
400 Some(command.target_id.clone()),
401 "dangerous command requires confirmation",
402 ));
403 }
404 Ok(())
405}
406
407async fn execute_command(
418 handle: &SupervisorHandle,
419 command: &ControlCommandRequest,
420) -> Result<CommandResult, DashboardError> {
421 let result = match command.command {
422 ControlCommandKind::RestartChild => {
423 handle
424 .restart_child(child_id(command)?, &command.requested_by, &command.reason)
425 .await
426 }
427 ControlCommandKind::PauseChild => {
428 handle
429 .pause_child(child_id(command)?, &command.requested_by, &command.reason)
430 .await
431 }
432 ControlCommandKind::ResumeChild => {
433 handle
434 .resume_child(child_id(command)?, &command.requested_by, &command.reason)
435 .await
436 }
437 ControlCommandKind::QuarantineChild => {
438 handle
439 .quarantine_child(child_id(command)?, &command.requested_by, &command.reason)
440 .await
441 }
442 ControlCommandKind::RemoveChild => {
443 handle
444 .remove_child(child_id(command)?, &command.requested_by, &command.reason)
445 .await
446 }
447 ControlCommandKind::AddChild => {
448 handle
449 .add_child(
450 SupervisorPath::root(),
451 command.target.child_manifest.clone().unwrap_or_default(),
452 &command.requested_by,
453 &command.reason,
454 )
455 .await
456 }
457 ControlCommandKind::ShutdownTree => {
458 handle
459 .shutdown_tree(&command.requested_by, &command.reason)
460 .await
461 }
462 };
463 result.map_err(|error| {
464 DashboardError::new(
465 "command_failed",
466 "command_dispatch",
467 Some(command.target_id.clone()),
468 error.to_string(),
469 true,
470 )
471 })
472}
473
474fn child_id(command: &ControlCommandRequest) -> Result<ChildId, DashboardError> {
484 let child_path = command.target.child_path.as_deref().ok_or_else(|| {
485 DashboardError::validation(
486 "command_validate",
487 Some(command.target_id.clone()),
488 "child_path is required for child command",
489 )
490 })?;
491 let value = child_path
492 .rsplit('/')
493 .find(|segment| !segment.is_empty())
494 .unwrap_or(child_path);
495 Ok(ChildId::new(value))
496}
497
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn unix_nanos_now() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}