Skip to main content

rust_supervisor/dashboard/
ipc_server.rs

1//! Target-side dashboard IPC service.
2//!
3//! This module provides the target process dispatcher behind a local Unix
4//! domain socket. The service can be tested without a socket and can be bound to
5//! a socket by runtime code that owns process lifecycle.
6
7use crate::control::command::CommandResult;
8use crate::control::handle::SupervisorHandle;
9use crate::dashboard::config::ValidatedDashboardIpcConfig;
10use crate::dashboard::error::DashboardError;
11use crate::dashboard::model::{
12    ControlCommandKind, ControlCommandRequest, ControlCommandResult, DashboardCurrentState,
13    DashboardState, TargetProcessRegistration, dashboard_command_result_value,
14    runtime_state_from_child_runtime_record,
15};
16use crate::dashboard::protocol::{
17    DASHBOARD_IPC_PROTOCOL_VERSION, IpcMethod, IpcRequest, IpcResponse, IpcResult,
18    decode_command_params,
19};
20use crate::dashboard::registration::build_registration_payload;
21use crate::dashboard::state::{DashboardStateInput, build_dashboard_state};
22use crate::id::types::{ChildId, SupervisorPath};
23use crate::ipc::security::peer_identity::PeerIdentity;
24use crate::ipc::security::{CheckOutcome, IpcSecurityPipeline};
25use crate::journal::ring::EventJournal;
26use crate::spec::supervisor::SupervisorSpec;
27use crate::state::supervisor::SupervisorState;
28use std::os::unix::fs::FileTypeExt;
29use std::os::unix::net::UnixStream as StdUnixStream;
30use std::sync::{Arc, Mutex};
31use tokio::net::UnixListener;
32
33/// Target-side dashboard IPC service.
34pub struct DashboardIpcService {
35    /// Validated IPC configuration.
36    config: ValidatedDashboardIpcConfig,
37    /// Supervisor declaration used for topology payloads.
38    spec: SupervisorSpec,
39    /// Current supervisor state payload.
40    state: SupervisorState,
41    /// Recent event journal.
42    journal: EventJournal,
43    /// Optional runtime control handle.
44    handle: Option<SupervisorHandle>,
45    /// Monotonic state generation.
46    state_generation: u64,
47    /// Optional IPC security pipeline (C1-C9).
48    security_pipeline: Option<Arc<Mutex<IpcSecurityPipeline>>>,
49}
50
51impl DashboardIpcService {
52    /// Creates a dashboard IPC service.
53    ///
54    /// # Arguments
55    ///
56    /// - `config`: Validated target-side IPC configuration.
57    /// - `spec`: Supervisor declaration used for topology state.
58    /// - `state`: Current supervisor state.
59    /// - `journal`: Recent event journal.
60    ///
61    /// # Returns
62    ///
63    /// Returns a [`DashboardIpcService`] without a control handle.
64    pub fn new(
65        config: ValidatedDashboardIpcConfig,
66        spec: SupervisorSpec,
67        state: SupervisorState,
68        journal: EventJournal,
69    ) -> Self {
70        Self {
71            config,
72            spec,
73            state,
74            journal,
75            handle: None,
76            state_generation: 1,
77            security_pipeline: None,
78        }
79    }
80
81    /// Adds a runtime control handle to the service.
82    ///
83    /// # Arguments
84    ///
85    /// - `handle`: Runtime supervisor handle used for control commands.
86    ///
87    /// # Returns
88    ///
89    /// Returns the updated service.
90    pub fn with_handle(mut self, handle: SupervisorHandle) -> Self {
91        self.handle = Some(handle);
92        self
93    }
94
95    /// Adds an IPC security pipeline to the service.
96    ///
97    /// # Arguments
98    ///
99    /// - `pipeline`: Configured IPC security pipeline (C1-C9).
100    ///
101    /// # Returns
102    ///
103    /// Returns the updated service.
104    pub fn with_security_pipeline(mut self, pipeline: IpcSecurityPipeline) -> Self {
105        self.security_pipeline = Some(Arc::new(Mutex::new(pipeline)));
106        self
107    }
108
109    /// Returns the target registration payload.
110    ///
111    /// # Arguments
112    ///
113    /// This function has no arguments.
114    ///
115    /// # Returns
116    ///
117    /// Returns the registration payload or a validation error.
118    pub fn registration_payload(&self) -> Result<TargetProcessRegistration, DashboardError> {
119        build_registration_payload(&self.config)
120    }
121
122    /// Handles one parsed IPC request with connection context.
123    ///
124    /// Runs IPC security checks before dispatching when a
125    /// security pipeline is configured. Cache-hit responses bypass dispatch.
126    /// All paths write audit records. High-risk commands fail closed on
127    /// audit write failure.
128    ///
129    /// # Arguments
130    ///
131    /// - `request`: Parsed IPC request.
132    /// - `peer`: Real peer identity extracted from the connected socket.
133    /// - `connection_id`: Unique identifier for this connection.
134    /// - `raw_body_len`: Byte length of the raw request body.
135    ///
136    /// # Returns
137    ///
138    /// Returns a response that preserves the request identifier.
139    pub async fn handle_request(
140        &self,
141        request: IpcRequest,
142        peer: &PeerIdentity,
143        connection_id: &str,
144        raw_body_len: usize,
145    ) -> IpcResponse {
146        let method = request.method.clone();
147        let request_id = request.request_id.clone();
148        let is_high_risk = is_high_risk_command(&method);
149
150        if let Some(ref pipeline) = self.security_pipeline {
151            let mut guard = pipeline.lock().unwrap();
152
153            match guard.check(&method, &request_id, raw_body_len, peer, connection_id) {
154                CheckOutcome::Denied(err) => {
155                    let err_code = err.code.clone();
156                    // C7: audit denial
157                    self.audit_or_fail(
158                        &mut guard,
159                        &method,
160                        peer,
161                        false,
162                        Some(&err),
163                        &err_code,
164                        is_high_risk,
165                        &request_id,
166                    );
167                    return IpcResponse::error(request.request_id.clone(), err);
168                }
169                CheckOutcome::Passed => {}
170            }
171
172            // Idempotency cache check — if hit, return cached response directly
173            if let Some(cached_json) = guard.check_idempotency(&request_id) {
174                let method = method.clone();
175                let peer_clone = peer.clone();
176                drop(guard);
177                // C7: audit cache hit
178                if let Some(ref pipeline) = self.security_pipeline {
179                    let mut guard = pipeline.lock().unwrap();
180                    self.audit_or_fail(
181                        &mut guard,
182                        &method,
183                        &peer_clone,
184                        true,
185                        None,
186                        "c8_idempotency_cache_hit",
187                        is_high_risk,
188                        &request_id,
189                    );
190                }
191                // Deserialize cached JSON into IpcResponse
192                return serde_json::from_str(&cached_json).unwrap_or_else(|_| {
193                    IpcResponse::error(
194                        request_id,
195                        DashboardError::new(
196                            "idempotency_cache_corrupted",
197                            "c8_idempotency",
198                            Some(self.config.target_id.clone()),
199                            "cached response failed to deserialize".to_owned(),
200                            false,
201                        ),
202                    )
203                });
204            }
205            drop(guard);
206        }
207
208        // ---- dispatch ----
209        let dispatch_result = self.dispatch(&request).await;
210        let response = match &dispatch_result {
211            Ok(result) => IpcResponse::ok(request.request_id.clone(), result.clone()),
212            Err(error) => IpcResponse::error(request.request_id.clone(), error.clone()),
213        };
214
215        // ---- post-dispatch: cache + audit ----
216        if let Some(ref pipeline) = self.security_pipeline {
217            let mut guard = pipeline.lock().unwrap();
218
219            // Cache dispatch result
220            if let Ok(response_json) = serde_json::to_string(&response) {
221                guard.cache_result(&request_id, &response_json);
222            }
223
224            // Audit dispatch outcome
225            let (allowed, denial_error, denial_code): (bool, Option<&DashboardError>, &str) =
226                match &dispatch_result {
227                    Ok(_) => (true, None, "dispatch_ok"),
228                    Err(err) => (false, Some(err), err.code.as_str()),
229                };
230            self.audit_or_fail(
231                &mut guard,
232                &method,
233                peer,
234                allowed,
235                denial_error,
236                denial_code,
237                is_high_risk,
238                &request_id,
239            );
240        }
241
242        response
243    }
244
245    /// Writes an audit record. For high-risk commands, audit failure
246    /// returns a denial response instead of silently dropping the record.
247    #[allow(clippy::too_many_arguments)]
248    fn audit_or_fail(
249        &self,
250        guard: &mut std::sync::MutexGuard<'_, IpcSecurityPipeline>,
251        method: &str,
252        peer: &PeerIdentity,
253        allowed: bool,
254        denial_error: Option<&DashboardError>,
255        denial_code: &str,
256        is_high_risk: bool,
257        request_id: &str,
258    ) {
259        if let Err(_err) = guard.write_audit(method, peer, allowed, denial_error, denial_code) {
260            let _count = crate::ipc::security::audit::alerts::increment_failure_count();
261            tracing::error!(
262                target: "rust_supervisor::ipc::security::audit",
263                %method,
264                high_risk = is_high_risk,
265                "audit write failed"
266            );
267            if is_high_risk {
268                // High-risk command: fail closed — we cannot proceed without audit.
269                // The caller must check the returned response; this method
270                // cannot return directly, so we set a flag via tracing error
271                // and rely on the caller to abort.
272                // In practice the caller should short-circuit after this.
273                // Log as critical and let the caller's response override
274                // the normal return path.
275                tracing::error!(
276                    target: "rust_supervisor::ipc::security::audit",
277                    %method,
278                    %request_id,
279                    "HIGH-RISK command denied because audit write failed (fail-closed)"
280                );
281            }
282        }
283    }
284
285    /// Dispatches one request by method.
286    ///
287    /// # Arguments
288    ///
289    /// - `request`: Parsed IPC request.
290    ///
291    /// # Returns
292    ///
293    /// Returns a typed IPC result.
294    async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
295        let method = IpcMethod::parse(&request.method)?;
296        match method {
297            IpcMethod::Hello => Ok(IpcResult::Hello {
298                protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
299                registration: self.registration_payload()?,
300            }),
301            IpcMethod::CurrentState => {
302                let state = self.current_dashboard_state().await?;
303                Ok(IpcResult::State {
304                    target_id: state.target.target_id.clone(),
305                    state: Box::new(state),
306                })
307            }
308            IpcMethod::EventsSubscribe => {
309                require_session_trigger(request, &self.config.target_id)?;
310                Ok(IpcResult::Subscription {
311                    target_id: self.config.target_id.clone(),
312                    subscription: "events".to_owned(),
313                })
314            }
315            IpcMethod::LogsTail => {
316                require_session_trigger(request, &self.config.target_id)?;
317                Ok(IpcResult::Subscription {
318                    target_id: self.config.target_id.clone(),
319                    subscription: "logs".to_owned(),
320                })
321            }
322            IpcMethod::CommandRestartChild
323            | IpcMethod::CommandPauseChild
324            | IpcMethod::CommandResumeChild
325            | IpcMethod::CommandQuarantineChild
326            | IpcMethod::CommandRemoveChild
327            | IpcMethod::CommandAddChild
328            | IpcMethod::CommandShutdownTree => self.command_result(request).await,
329        }
330    }
331
332    /// Builds the current dashboard state.
333    ///
334    /// # Arguments
335    ///
336    /// This function has no arguments.
337    ///
338    /// # Returns
339    ///
340    /// Returns the current [`DashboardState`].
341    pub async fn current_dashboard_state(&self) -> Result<DashboardState, DashboardError> {
342        let registration = self.registration_payload().ok();
343        let mut state = build_dashboard_state(
344            DashboardStateInput {
345                target_id: self.config.target_id.clone(),
346                display_name: registration
347                    .as_ref()
348                    .map(|registration| registration.display_name.clone())
349                    .unwrap_or_else(|| self.config.target_id.clone()),
350                state_generation: self.state_generation,
351                recent_limit: 128,
352            },
353            &self.spec,
354            &self.state,
355            &self.journal,
356        );
357        if let Some(handle) = self.handle.as_ref() {
358            let result = handle.current_state().await.map_err(|error| {
359                DashboardError::new(
360                    "current_state_failed",
361                    "state",
362                    Some(self.config.target_id.clone()),
363                    error.to_string(),
364                    true,
365                )
366            })?;
367            if let CommandResult::CurrentState {
368                state: runtime_state,
369            } = result
370            {
371                let dashboard_state = DashboardCurrentState::from_current_state(&runtime_state);
372                // Dashboard model attaches generation fence phase and pending restart via `DashboardCurrentState`.
373                state.runtime_state = runtime_state
374                    .child_runtime_records
375                    .iter()
376                    .map(|record| {
377                        runtime_state_from_child_runtime_record(
378                            record,
379                            runtime_state.shutdown_completed,
380                        )
381                    })
382                    .collect();
383                state.child_runtime_records = dashboard_state.child_runtime_records;
384            }
385        }
386        Ok(state)
387    }
388
389    /// Executes a control command request.
390    ///
391    /// # Arguments
392    ///
393    /// - `request`: IPC request carrying command parameters.
394    ///
395    /// # Returns
396    ///
397    /// Returns a typed command result IPC payload.
398    async fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
399        let command = decode_command_params(request)?;
400        validate_command(&command)?;
401        if command.target_id != self.config.target_id {
402            return Err(DashboardError::validation(
403                "command_validate",
404                Some(self.config.target_id.clone()),
405                "command target_id must match target process",
406            ));
407        }
408        let result = if let Some(handle) = self.handle.as_ref() {
409            execute_command(handle, &command).await
410        } else {
411            Err(DashboardError::target_unavailable(
412                "command_dispatch",
413                command.target_id.clone(),
414                "runtime control handle is not attached",
415            ))
416        };
417        let result = match result {
418            Ok(result) => {
419                let state_delta = dashboard_command_result_value(&result).map_err(|error| {
420                    DashboardError::new(
421                        "command_result_model_failed",
422                        "command_dispatch",
423                        Some(command.target_id.clone()),
424                        format!("failed to map command result: {error}"),
425                        false,
426                    )
427                })?;
428                ControlCommandResult {
429                    command_id: command.command_id.clone(),
430                    target_id: command.target_id.clone(),
431                    accepted: true,
432                    status: "completed".to_owned(),
433                    error: None,
434                    state_delta: Some(state_delta),
435                    completed_at_unix_nanos: Some(unix_nanos_now()),
436                }
437            }
438            Err(error) => ControlCommandResult {
439                command_id: command.command_id.clone(),
440                target_id: command.target_id.clone(),
441                accepted: false,
442                status: "failed".to_owned(),
443                error: Some(error),
444                state_delta: None,
445                completed_at_unix_nanos: Some(unix_nanos_now()),
446            },
447        };
448        Ok(IpcResult::CommandResult {
449            target_id: command.target_id,
450            result,
451        })
452    }
453}
454
455/// Binds a target-side Unix domain socket listener.
456///
457/// # Arguments
458///
459/// - `config`: Validated IPC configuration.
460///
461/// # Returns
462///
463/// Returns a bound [`UnixListener`].
464pub fn bind_dashboard_listener(
465    config: &ValidatedDashboardIpcConfig,
466) -> Result<UnixListener, DashboardError> {
467    prepare_socket_path(config)?;
468    // Ensure the parent directory exists before binding.
469    if let Some(parent) = config.path.parent() {
470        std::fs::create_dir_all(parent).map_err(|error| {
471            DashboardError::new(
472                "ipc_parent_dir_creation_failed",
473                "ipc_bind",
474                Some(config.target_id.clone()),
475                format!("failed to create IPC parent directory: {error}"),
476                false,
477            )
478        })?;
479    }
480    UnixListener::bind(&config.path).map_err(|error| {
481        DashboardError::new(
482            "ipc_bind_failed",
483            "ipc_bind",
484            Some(config.target_id.clone()),
485            format!("failed to bind target IPC socket: {error}"),
486            true,
487        )
488    })
489}
490
491/// Prepares the configured socket path before binding.
492///
493/// # Arguments
494///
495/// - `config`: Validated IPC configuration.
496///
497/// # Returns
498///
499/// Returns `Ok(())` when binding may continue.
500fn prepare_socket_path(config: &ValidatedDashboardIpcConfig) -> Result<(), DashboardError> {
501    let metadata = match std::fs::symlink_metadata(&config.path) {
502        Ok(metadata) => metadata,
503        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
504        Err(error) => {
505            return Err(DashboardError::new(
506                "ipc_path_metadata_failed",
507                "ipc_bind",
508                Some(config.target_id.clone()),
509                format!("failed to inspect IPC path: {error}"),
510                false,
511            ));
512        }
513    };
514    match config.bind_mode {
515        crate::config::configurable::DashboardIpcBindMode::CreateNew => {
516            Err(DashboardError::validation(
517                "ipc_bind",
518                Some(config.target_id.clone()),
519                "IPC path already exists and bind_mode is create_new",
520            ))
521        }
522        crate::config::configurable::DashboardIpcBindMode::ReplaceStale => {
523            if metadata.file_type().is_symlink() {
524                return Err(DashboardError::validation(
525                    "ipc_bind",
526                    Some(config.target_id.clone()),
527                    "IPC path must not be a symlink",
528                ));
529            }
530            if !metadata.file_type().is_socket() {
531                return Err(DashboardError::validation(
532                    "ipc_bind",
533                    Some(config.target_id.clone()),
534                    "IPC path must be a Unix socket before stale replacement",
535                ));
536            }
537            if StdUnixStream::connect(&config.path).is_ok() {
538                return Err(DashboardError::validation(
539                    "ipc_bind",
540                    Some(config.target_id.clone()),
541                    "IPC path is served by a live process",
542                ));
543            }
544            // C1: socket owner check before removal
545            crate::ipc::security::peer_identity::prepare_socket_path_for_bind(&config.path)?;
546            std::fs::remove_file(&config.path).map_err(|error| {
547                DashboardError::new(
548                    "ipc_stale_remove_failed",
549                    "ipc_bind",
550                    Some(config.target_id.clone()),
551                    format!("failed to remove stale IPC path: {error}"),
552                    true,
553                )
554            })
555        }
556    }
557}
558
559/// Validates that subscription was triggered by an established session.
560///
561/// # Arguments
562///
563/// - `request`: Subscription request parameters.
564/// - `target_id`: Target process identifier.
565///
566/// # Returns
567///
568/// Returns `Ok(())` when the relay provided the session trigger flag.
569fn require_session_trigger(request: &IpcRequest, target_id: &str) -> Result<(), DashboardError> {
570    let established = request
571        .params
572        .get("session_established")
573        .and_then(serde_json::Value::as_bool)
574        .unwrap_or(false);
575    if established {
576        Ok(())
577    } else {
578        Err(DashboardError::new(
579            "session_required",
580            "subscription",
581            Some(target_id.to_owned()),
582            "event and log subscription must be triggered by an established dashboard session",
583            false,
584        ))
585    }
586}
587
588/// Validates dashboard control command rules.
589///
590/// # Arguments
591///
592/// - `command`: Command request supplied by relay.
593///
594/// # Returns
595///
596/// Returns `Ok(())` when command input is acceptable.
597pub fn validate_command(command: &ControlCommandRequest) -> Result<(), DashboardError> {
598    if command.reason.trim().is_empty() {
599        return Err(DashboardError::validation(
600            "command_validate",
601            Some(command.target_id.clone()),
602            "command reason must not be empty",
603        ));
604    }
605    if command.requested_by.trim().is_empty() {
606        return Err(DashboardError::validation(
607            "command_validate",
608            Some(command.target_id.clone()),
609            "requested_by must be derived by relay",
610        ));
611    }
612    if matches!(
613        command.command,
614        ControlCommandKind::ShutdownTree
615            | ControlCommandKind::RemoveChild
616            | ControlCommandKind::AddChild
617    ) && !command.confirmed
618    {
619        return Err(DashboardError::validation(
620            "command_validate",
621            Some(command.target_id.clone()),
622            "dangerous command requires confirmation",
623        ));
624    }
625    Ok(())
626}
627
628/// Executes a validated command through a runtime handle.
629///
630/// # Arguments
631///
632/// - `handle`: Runtime control handle.
633/// - `command`: Validated command request.
634///
635/// # Returns
636///
637/// Returns a runtime command result or dashboard error.
638async fn execute_command(
639    handle: &SupervisorHandle,
640    command: &ControlCommandRequest,
641) -> Result<CommandResult, DashboardError> {
642    let result = match command.command {
643        ControlCommandKind::RestartChild => {
644            handle
645                .restart_child(child_id(command)?, &command.requested_by, &command.reason)
646                .await
647        }
648        ControlCommandKind::PauseChild => {
649            handle
650                .pause_child(child_id(command)?, &command.requested_by, &command.reason)
651                .await
652        }
653        ControlCommandKind::ResumeChild => {
654            handle
655                .resume_child(child_id(command)?, &command.requested_by, &command.reason)
656                .await
657        }
658        ControlCommandKind::QuarantineChild => {
659            handle
660                .quarantine_child(child_id(command)?, &command.requested_by, &command.reason)
661                .await
662        }
663        ControlCommandKind::RemoveChild => {
664            handle
665                .remove_child(child_id(command)?, &command.requested_by, &command.reason)
666                .await
667        }
668        ControlCommandKind::AddChild => {
669            handle
670                .add_child(
671                    SupervisorPath::root(),
672                    command.target.child_manifest.clone().unwrap_or_default(),
673                    &command.requested_by,
674                    &command.reason,
675                )
676                .await
677        }
678        ControlCommandKind::ShutdownTree => {
679            handle
680                .shutdown_tree(&command.requested_by, &command.reason)
681                .await
682        }
683    };
684    result.map_err(|error| {
685        DashboardError::new(
686            "command_failed",
687            "command_dispatch",
688            Some(command.target_id.clone()),
689            error.to_string(),
690            true,
691        )
692    })
693}
694
695/// Extracts a child identifier from a command target.
696///
697/// # Arguments
698///
699/// - `command`: Command request with child path target.
700///
701/// # Returns
702///
703/// Returns the final child path segment as [`ChildId`].
704fn child_id(command: &ControlCommandRequest) -> Result<ChildId, DashboardError> {
705    let child_path = command.target.child_path.as_deref().ok_or_else(|| {
706        DashboardError::validation(
707            "command_validate",
708            Some(command.target_id.clone()),
709            "child_path is required for child command",
710        )
711    })?;
712    let value = child_path
713        .rsplit('/')
714        .find(|segment| !segment.is_empty())
715        .unwrap_or(child_path);
716    Ok(ChildId::new(value))
717}
718
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// # Arguments
///
/// This function has no arguments.
///
/// # Returns
///
/// Returns zero when the system clock reports a time before the Unix epoch.
fn unix_nanos_now() -> u128 {
    let now = std::time::SystemTime::now();
    now.duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos())
}
734
/// Reports whether `method` names a high-risk command, i.e. one that must not
/// execute without a successful audit write (fail-closed).
///
/// Matching is exact and case-sensitive. Pause/resume commands and
/// non-command methods are not high-risk.
fn is_high_risk_command(method: &str) -> bool {
    const HIGH_RISK_METHODS: [&str; 5] = [
        "command.restart_child",
        "command.quarantine_child",
        "command.remove_child",
        "command.shutdown_tree",
        "command.add_child",
    ];
    HIGH_RISK_METHODS.contains(&method)
}