Skip to main content

rust_supervisor/dashboard/
ipc_server.rs

//! Target-side dashboard IPC service.
//!
//! This module provides the target-process dispatcher that sits behind a local
//! Unix domain socket. The service can be tested without a socket, and runtime
//! code that owns the process lifecycle can bind it to a socket.
6
7use crate::control::command::CommandResult;
8use crate::control::handle::SupervisorHandle;
9use crate::dashboard::config::ValidatedDashboardIpcConfig;
10use crate::dashboard::error::DashboardError;
11use crate::dashboard::model::{
12    ControlCommandKind, ControlCommandRequest, ControlCommandResult, DashboardState,
13    TargetProcessRegistration,
14};
15use crate::dashboard::protocol::{
16    DASHBOARD_IPC_PROTOCOL_VERSION, IpcMethod, IpcRequest, IpcResponse, IpcResult,
17    decode_command_params,
18};
19use crate::dashboard::registration::build_registration_payload;
20use crate::dashboard::state::{DashboardStateInput, build_dashboard_state};
21use crate::id::types::{ChildId, SupervisorPath};
22use crate::journal::ring::EventJournal;
23use crate::spec::supervisor::SupervisorSpec;
24use crate::state::supervisor::SupervisorState;
25use serde_json::json;
26use std::os::unix::fs::FileTypeExt;
27use std::os::unix::net::UnixStream as StdUnixStream;
28use tokio::net::UnixListener;
29
30/// Target-side dashboard IPC service.
31#[derive(Clone)]
32pub struct DashboardIpcService {
33    /// Validated IPC configuration.
34    config: ValidatedDashboardIpcConfig,
35    /// Supervisor declaration used for topology payloads.
36    spec: SupervisorSpec,
37    /// Current supervisor state payload.
38    state: SupervisorState,
39    /// Recent event journal.
40    journal: EventJournal,
41    /// Optional runtime control handle.
42    handle: Option<SupervisorHandle>,
43    /// Monotonic state generation.
44    state_generation: u64,
45}
46
47impl DashboardIpcService {
48    /// Creates a dashboard IPC service.
49    ///
50    /// # Arguments
51    ///
52    /// - `config`: Validated target-side IPC configuration.
53    /// - `spec`: Supervisor declaration used for topology state.
54    /// - `state`: Current supervisor state.
55    /// - `journal`: Recent event journal.
56    ///
57    /// # Returns
58    ///
59    /// Returns a [`DashboardIpcService`] without a control handle.
60    pub fn new(
61        config: ValidatedDashboardIpcConfig,
62        spec: SupervisorSpec,
63        state: SupervisorState,
64        journal: EventJournal,
65    ) -> Self {
66        Self {
67            config,
68            spec,
69            state,
70            journal,
71            handle: None,
72            state_generation: 1,
73        }
74    }
75
76    /// Adds a runtime control handle to the service.
77    ///
78    /// # Arguments
79    ///
80    /// - `handle`: Runtime supervisor handle used for control commands.
81    ///
82    /// # Returns
83    ///
84    /// Returns the updated service.
85    pub fn with_handle(mut self, handle: SupervisorHandle) -> Self {
86        self.handle = Some(handle);
87        self
88    }
89
90    /// Returns the target registration payload.
91    ///
92    /// # Arguments
93    ///
94    /// This function has no arguments.
95    ///
96    /// # Returns
97    ///
98    /// Returns the registration payload or a validation error.
99    pub fn registration_payload(&self) -> Result<TargetProcessRegistration, DashboardError> {
100        build_registration_payload(&self.config)
101    }
102
103    /// Handles one parsed IPC request.
104    ///
105    /// # Arguments
106    ///
107    /// - `request`: Parsed IPC request.
108    ///
109    /// # Returns
110    ///
111    /// Returns a response that preserves the request identifier.
112    pub async fn handle_request(&self, request: IpcRequest) -> IpcResponse {
113        match self.dispatch(&request).await {
114            Ok(result) => IpcResponse::ok(request.request_id, result),
115            Err(error) => IpcResponse::error(request.request_id, error),
116        }
117    }
118
119    /// Dispatches one request by method.
120    ///
121    /// # Arguments
122    ///
123    /// - `request`: Parsed IPC request.
124    ///
125    /// # Returns
126    ///
127    /// Returns a typed IPC result.
128    async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
129        let method = IpcMethod::parse(&request.method)?;
130        match method {
131            IpcMethod::Hello => Ok(IpcResult::Hello {
132                protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
133                registration: self.registration_payload()?,
134            }),
135            IpcMethod::CurrentState => {
136                let state = self.current_dashboard_state();
137                Ok(IpcResult::State {
138                    target_id: state.target.target_id.clone(),
139                    state: Box::new(state),
140                })
141            }
142            IpcMethod::EventsSubscribe => {
143                require_session_trigger(request, &self.config.target_id)?;
144                Ok(IpcResult::Subscription {
145                    target_id: self.config.target_id.clone(),
146                    subscription: "events".to_owned(),
147                })
148            }
149            IpcMethod::LogsTail => {
150                require_session_trigger(request, &self.config.target_id)?;
151                Ok(IpcResult::Subscription {
152                    target_id: self.config.target_id.clone(),
153                    subscription: "logs".to_owned(),
154                })
155            }
156            IpcMethod::CommandRestartChild
157            | IpcMethod::CommandPauseChild
158            | IpcMethod::CommandResumeChild
159            | IpcMethod::CommandQuarantineChild
160            | IpcMethod::CommandRemoveChild
161            | IpcMethod::CommandAddChild
162            | IpcMethod::CommandShutdownTree => self.command_result(request).await,
163        }
164    }
165
166    /// Builds the current dashboard state.
167    ///
168    /// # Arguments
169    ///
170    /// This function has no arguments.
171    ///
172    /// # Returns
173    ///
174    /// Returns the current [`DashboardState`].
175    pub fn current_dashboard_state(&self) -> DashboardState {
176        let registration = self.registration_payload().ok();
177        build_dashboard_state(
178            DashboardStateInput {
179                target_id: self.config.target_id.clone(),
180                display_name: registration
181                    .as_ref()
182                    .map(|registration| registration.display_name.clone())
183                    .unwrap_or_else(|| self.config.target_id.clone()),
184                state_generation: self.state_generation,
185                recent_limit: 128,
186            },
187            &self.spec,
188            &self.state,
189            &self.journal,
190        )
191    }
192
193    /// Executes a control command request.
194    ///
195    /// # Arguments
196    ///
197    /// - `request`: IPC request carrying command parameters.
198    ///
199    /// # Returns
200    ///
201    /// Returns a typed command result IPC payload.
202    async fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
203        let command = decode_command_params(request)?;
204        validate_command(&command)?;
205        if command.target_id != self.config.target_id {
206            return Err(DashboardError::validation(
207                "command_validate",
208                Some(self.config.target_id.clone()),
209                "command target_id must match target process",
210            ));
211        }
212        let result = if let Some(handle) = self.handle.as_ref() {
213            execute_command(handle, &command).await
214        } else {
215            Err(DashboardError::target_unavailable(
216                "command_dispatch",
217                command.target_id.clone(),
218                "runtime control handle is not attached",
219            ))
220        };
221        let result = match result {
222            Ok(result) => ControlCommandResult {
223                command_id: command.command_id.clone(),
224                target_id: command.target_id.clone(),
225                accepted: true,
226                status: "completed".to_owned(),
227                error: None,
228                state_delta: Some(json!(result)),
229                completed_at_unix_nanos: Some(unix_nanos_now()),
230            },
231            Err(error) => ControlCommandResult {
232                command_id: command.command_id.clone(),
233                target_id: command.target_id.clone(),
234                accepted: false,
235                status: "failed".to_owned(),
236                error: Some(error),
237                state_delta: None,
238                completed_at_unix_nanos: Some(unix_nanos_now()),
239            },
240        };
241        Ok(IpcResult::CommandResult {
242            target_id: command.target_id,
243            result,
244        })
245    }
246}
247
248/// Binds a target-side Unix domain socket listener.
249///
250/// # Arguments
251///
252/// - `config`: Validated IPC configuration.
253///
254/// # Returns
255///
256/// Returns a bound [`UnixListener`].
257pub fn bind_dashboard_listener(
258    config: &ValidatedDashboardIpcConfig,
259) -> Result<UnixListener, DashboardError> {
260    prepare_socket_path(config)?;
261    UnixListener::bind(&config.path).map_err(|error| {
262        DashboardError::new(
263            "ipc_bind_failed",
264            "ipc_bind",
265            Some(config.target_id.clone()),
266            format!("failed to bind target IPC socket: {error}"),
267            true,
268        )
269    })
270}
271
272/// Prepares the configured socket path before binding.
273///
274/// # Arguments
275///
276/// - `config`: Validated IPC configuration.
277///
278/// # Returns
279///
280/// Returns `Ok(())` when binding may continue.
281fn prepare_socket_path(config: &ValidatedDashboardIpcConfig) -> Result<(), DashboardError> {
282    let metadata = match std::fs::symlink_metadata(&config.path) {
283        Ok(metadata) => metadata,
284        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
285        Err(error) => {
286            return Err(DashboardError::new(
287                "ipc_path_metadata_failed",
288                "ipc_bind",
289                Some(config.target_id.clone()),
290                format!("failed to inspect IPC path: {error}"),
291                false,
292            ));
293        }
294    };
295    match config.bind_mode {
296        crate::config::configurable::DashboardIpcBindMode::CreateNew => {
297            Err(DashboardError::validation(
298                "ipc_bind",
299                Some(config.target_id.clone()),
300                "IPC path already exists and bind_mode is create_new",
301            ))
302        }
303        crate::config::configurable::DashboardIpcBindMode::ReplaceStale => {
304            if metadata.file_type().is_symlink() {
305                return Err(DashboardError::validation(
306                    "ipc_bind",
307                    Some(config.target_id.clone()),
308                    "IPC path must not be a symlink",
309                ));
310            }
311            if !metadata.file_type().is_socket() {
312                return Err(DashboardError::validation(
313                    "ipc_bind",
314                    Some(config.target_id.clone()),
315                    "IPC path must be a Unix socket before stale replacement",
316                ));
317            }
318            if StdUnixStream::connect(&config.path).is_ok() {
319                return Err(DashboardError::validation(
320                    "ipc_bind",
321                    Some(config.target_id.clone()),
322                    "IPC path is served by a live process",
323                ));
324            }
325            std::fs::remove_file(&config.path).map_err(|error| {
326                DashboardError::new(
327                    "ipc_stale_remove_failed",
328                    "ipc_bind",
329                    Some(config.target_id.clone()),
330                    format!("failed to remove stale IPC path: {error}"),
331                    true,
332                )
333            })
334        }
335    }
336}
337
338/// Validates that subscription was triggered by an established session.
339///
340/// # Arguments
341///
342/// - `request`: Subscription request parameters.
343/// - `target_id`: Target process identifier.
344///
345/// # Returns
346///
347/// Returns `Ok(())` when the relay provided the session trigger flag.
348fn require_session_trigger(request: &IpcRequest, target_id: &str) -> Result<(), DashboardError> {
349    let established = request
350        .params
351        .get("session_established")
352        .and_then(serde_json::Value::as_bool)
353        .unwrap_or(false);
354    if established {
355        Ok(())
356    } else {
357        Err(DashboardError::new(
358            "session_required",
359            "subscription",
360            Some(target_id.to_owned()),
361            "event and log subscription must be triggered by an established dashboard session",
362            false,
363        ))
364    }
365}
366
367/// Validates dashboard control command rules.
368///
369/// # Arguments
370///
371/// - `command`: Command request supplied by relay.
372///
373/// # Returns
374///
375/// Returns `Ok(())` when command input is acceptable.
376pub fn validate_command(command: &ControlCommandRequest) -> Result<(), DashboardError> {
377    if command.reason.trim().is_empty() {
378        return Err(DashboardError::validation(
379            "command_validate",
380            Some(command.target_id.clone()),
381            "command reason must not be empty",
382        ));
383    }
384    if command.requested_by.trim().is_empty() {
385        return Err(DashboardError::validation(
386            "command_validate",
387            Some(command.target_id.clone()),
388            "requested_by must be derived by relay",
389        ));
390    }
391    if matches!(
392        command.command,
393        ControlCommandKind::ShutdownTree
394            | ControlCommandKind::RemoveChild
395            | ControlCommandKind::AddChild
396    ) && !command.confirmed
397    {
398        return Err(DashboardError::validation(
399            "command_validate",
400            Some(command.target_id.clone()),
401            "dangerous command requires confirmation",
402        ));
403    }
404    Ok(())
405}
406
407/// Executes a validated command through a runtime handle.
408///
409/// # Arguments
410///
411/// - `handle`: Runtime control handle.
412/// - `command`: Validated command request.
413///
414/// # Returns
415///
416/// Returns a runtime command result or dashboard error.
417async fn execute_command(
418    handle: &SupervisorHandle,
419    command: &ControlCommandRequest,
420) -> Result<CommandResult, DashboardError> {
421    let result = match command.command {
422        ControlCommandKind::RestartChild => {
423            handle
424                .restart_child(child_id(command)?, &command.requested_by, &command.reason)
425                .await
426        }
427        ControlCommandKind::PauseChild => {
428            handle
429                .pause_child(child_id(command)?, &command.requested_by, &command.reason)
430                .await
431        }
432        ControlCommandKind::ResumeChild => {
433            handle
434                .resume_child(child_id(command)?, &command.requested_by, &command.reason)
435                .await
436        }
437        ControlCommandKind::QuarantineChild => {
438            handle
439                .quarantine_child(child_id(command)?, &command.requested_by, &command.reason)
440                .await
441        }
442        ControlCommandKind::RemoveChild => {
443            handle
444                .remove_child(child_id(command)?, &command.requested_by, &command.reason)
445                .await
446        }
447        ControlCommandKind::AddChild => {
448            handle
449                .add_child(
450                    SupervisorPath::root(),
451                    command.target.child_manifest.clone().unwrap_or_default(),
452                    &command.requested_by,
453                    &command.reason,
454                )
455                .await
456        }
457        ControlCommandKind::ShutdownTree => {
458            handle
459                .shutdown_tree(&command.requested_by, &command.reason)
460                .await
461        }
462    };
463    result.map_err(|error| {
464        DashboardError::new(
465            "command_failed",
466            "command_dispatch",
467            Some(command.target_id.clone()),
468            error.to_string(),
469            true,
470        )
471    })
472}
473
474/// Extracts a child identifier from a command target.
475///
476/// # Arguments
477///
478/// - `command`: Command request with child path target.
479///
480/// # Returns
481///
482/// Returns the final child path segment as [`ChildId`].
483fn child_id(command: &ControlCommandRequest) -> Result<ChildId, DashboardError> {
484    let child_path = command.target.child_path.as_deref().ok_or_else(|| {
485        DashboardError::validation(
486            "command_validate",
487            Some(command.target_id.clone()),
488            "child_path is required for child command",
489        )
490    })?;
491    let value = child_path
492        .rsplit('/')
493        .find(|segment| !segment.is_empty())
494        .unwrap_or(child_path);
495    Ok(ChildId::new(value))
496}
497
/// Reports the current wall-clock time in nanoseconds since the Unix epoch.
///
/// # Arguments
///
/// This function has no arguments.
///
/// # Returns
///
/// Returns the elapsed nanoseconds, or zero when the system clock reads
/// earlier than the Unix epoch.
fn unix_nanos_now() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        // A clock set before the epoch collapses to zero.
        Err(_) => 0,
    }
}