use crate::control::command::CommandResult;
use crate::control::handle::SupervisorHandle;
use crate::dashboard::config::ValidatedDashboardIpcConfig;
use crate::dashboard::error::DashboardError;
use crate::dashboard::model::{
ControlCommandKind, ControlCommandRequest, ControlCommandResult, DashboardCurrentState,
DashboardState, TargetProcessRegistration, dashboard_command_result_value,
runtime_state_from_child_runtime_record,
};
use crate::dashboard::protocol::{
DASHBOARD_IPC_PROTOCOL_VERSION, IpcMethod, IpcRequest, IpcResponse, IpcResult,
decode_command_params,
};
use crate::dashboard::registration::build_registration_payload;
use crate::dashboard::state::{DashboardStateInput, build_dashboard_state};
use crate::id::types::{ChildId, SupervisorPath};
use crate::ipc::security::peer_identity::PeerIdentity;
use crate::ipc::security::{CheckOutcome, IpcSecurityPipeline};
use crate::journal::ring::EventJournal;
use crate::spec::supervisor::SupervisorSpec;
use crate::state::supervisor::SupervisorState;
use std::os::unix::fs::FileTypeExt;
use std::os::unix::net::UnixStream as StdUnixStream;
use std::sync::{Arc, Mutex};
use tokio::net::UnixListener;
/// Serves dashboard IPC requests for a single target process.
///
/// Handshake, state-snapshot and subscription requests are answered from the
/// supervisor spec/state/journal held here; control commands are forwarded to an
/// attached runtime handle. An optional security pipeline checks, caches
/// (idempotency) and audits every request.
pub struct DashboardIpcService {
/// Validated IPC settings (target id, socket path, bind mode, registration inputs).
config: ValidatedDashboardIpcConfig,
/// Supervisor specification fed into dashboard state snapshots.
spec: SupervisorSpec,
/// Supervisor state fed into dashboard state snapshots.
state: SupervisorState,
/// Event journal fed into dashboard state snapshots.
journal: EventJournal,
/// Runtime control handle. `None` until attached with `with_handle`; without it,
/// control commands are reported as target-unavailable and snapshots are not
/// refreshed from the live runtime.
handle: Option<SupervisorHandle>,
/// Generation number reported in dashboard state snapshots; `new` sets it to 1.
/// NOTE(review): no visible code advances it — confirm whether it should.
state_generation: u64,
/// Optional security pipeline shared behind a mutex. `None` disables request
/// checks, idempotency caching and audit records.
security_pipeline: Option<Arc<Mutex<IpcSecurityPipeline>>>,
}
impl DashboardIpcService {
    /// Creates a service for one target process.
    ///
    /// The service starts without a runtime control handle and without a security
    /// pipeline; attach them with [`Self::with_handle`] and
    /// [`Self::with_security_pipeline`]. The reported state generation starts at `1`.
    pub fn new(
        config: ValidatedDashboardIpcConfig,
        spec: SupervisorSpec,
        state: SupervisorState,
        journal: EventJournal,
    ) -> Self {
        Self {
            config,
            spec,
            state,
            journal,
            handle: None,
            state_generation: 1,
            security_pipeline: None,
        }
    }

    /// Attaches the runtime control handle used for live state and control commands.
    pub fn with_handle(mut self, handle: SupervisorHandle) -> Self {
        self.handle = Some(handle);
        self
    }

    /// Attaches a security pipeline that checks, caches (idempotency) and audits
    /// every request handled by [`Self::handle_request`].
    pub fn with_security_pipeline(mut self, pipeline: IpcSecurityPipeline) -> Self {
        self.security_pipeline = Some(Arc::new(Mutex::new(pipeline)));
        self
    }

    /// Builds the registration payload announced to the dashboard for this target.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the payload out of the IPC config.
    pub fn registration_payload(&self) -> Result<TargetProcessRegistration, DashboardError> {
        build_registration_payload(&self.config)
    }

    /// Handles one decoded IPC request and returns the response to send back.
    ///
    /// With a security pipeline attached, the request is:
    /// 1. checked by the pipeline — a denial is audited and returned immediately;
    /// 2. looked up in the idempotency cache — a hit is audited and the cached
    ///    response is replayed without dispatching;
    /// 3. dispatched, after which the serialized response is cached and the
    ///    outcome is audited.
    ///
    /// Without a pipeline the request is dispatched directly.
    ///
    /// # Panics
    ///
    /// Panics if the security pipeline mutex has been poisoned by an earlier panic.
    pub async fn handle_request(
        &self,
        request: IpcRequest,
        peer: &PeerIdentity,
        connection_id: &str,
        raw_body_len: usize,
    ) -> IpcResponse {
        let method = request.method.clone();
        let request_id = request.request_id.clone();
        let is_high_risk = is_high_risk_command(&method);

        if let Some(pipeline) = self.security_pipeline.as_ref() {
            // A std mutex guard must not be held across the dispatch `.await` below,
            // so every path out of this block releases it first.
            let mut guard = pipeline
                .lock()
                .expect("dashboard IPC security pipeline mutex poisoned");
            match guard.check(&method, &request_id, raw_body_len, peer, connection_id) {
                CheckOutcome::Denied(err) => {
                    self.audit_or_fail(
                        &mut guard,
                        &method,
                        peer,
                        false,
                        Some(&err),
                        &err.code,
                        is_high_risk,
                        &request_id,
                    );
                    return IpcResponse::error(request.request_id.clone(), err);
                }
                CheckOutcome::Passed => {}
            }
            if let Some(cached_json) = guard.check_idempotency(&request_id) {
                // Audit the replay under the guard we already hold instead of
                // releasing and re-acquiring the same mutex.
                self.audit_or_fail(
                    &mut guard,
                    &method,
                    peer,
                    true,
                    None,
                    "c8_idempotency_cache_hit",
                    is_high_risk,
                    &request_id,
                );
                drop(guard);
                return serde_json::from_str(&cached_json).unwrap_or_else(|_| {
                    IpcResponse::error(
                        request_id,
                        DashboardError::new(
                            "idempotency_cache_corrupted",
                            "c8_idempotency",
                            Some(self.config.target_id.clone()),
                            "cached response failed to deserialize".to_owned(),
                            false,
                        ),
                    )
                });
            }
            drop(guard);
        }

        let dispatch_result = self.dispatch(&request).await;
        let response = match &dispatch_result {
            Ok(result) => IpcResponse::ok(request.request_id.clone(), result.clone()),
            Err(error) => IpcResponse::error(request.request_id.clone(), error.clone()),
        };

        if let Some(pipeline) = self.security_pipeline.as_ref() {
            let mut guard = pipeline
                .lock()
                .expect("dashboard IPC security pipeline mutex poisoned");
            // A serialization failure only skips caching; the live response is still returned.
            if let Ok(response_json) = serde_json::to_string(&response) {
                guard.cache_result(&request_id, &response_json);
            }
            let (allowed, denial_error, denial_code): (bool, Option<&DashboardError>, &str) =
                match &dispatch_result {
                    Ok(_) => (true, None, "dispatch_ok"),
                    Err(err) => (false, Some(err), err.code.as_str()),
                };
            self.audit_or_fail(
                &mut guard,
                &method,
                peer,
                allowed,
                denial_error,
                denial_code,
                is_high_risk,
                &request_id,
            );
        }
        response
    }

    /// Writes one audit record through `guard`; failures are logged, not propagated.
    ///
    /// On a write failure this bumps the shared audit-failure counter and emits an
    /// error-level trace. High-risk methods get an additional "fail-closed" trace.
    ///
    /// NOTE(review): despite its name and log text, this function denies nothing — it
    /// returns `()` and every caller returns its response regardless. The post-dispatch
    /// call in `handle_request` runs after the command has already executed, so true
    /// fail-closed behavior would need a pre-dispatch audit for high-risk commands.
    /// Confirm the intended contract.
    // Argument list mirrors `write_audit`'s inputs plus logging context.
    #[allow(clippy::too_many_arguments)]
    fn audit_or_fail(
        &self,
        guard: &mut std::sync::MutexGuard<'_, IpcSecurityPipeline>,
        method: &str,
        peer: &PeerIdentity,
        allowed: bool,
        denial_error: Option<&DashboardError>,
        denial_code: &str,
        is_high_risk: bool,
        request_id: &str,
    ) {
        if guard
            .write_audit(method, peer, allowed, denial_error, denial_code)
            .is_err()
        {
            let _ = crate::ipc::security::audit::alerts::increment_failure_count();
            tracing::error!(
                target: "rust_supervisor::ipc::security::audit",
                %method,
                high_risk = is_high_risk,
                "audit write failed"
            );
            if is_high_risk {
                tracing::error!(
                    target: "rust_supervisor::ipc::security::audit",
                    %method,
                    %request_id,
                    "HIGH-RISK command denied because audit write failed (fail-closed)"
                );
            }
        }
    }

    /// Routes a request to the handler for its method.
    ///
    /// # Errors
    ///
    /// Returns an error if the method name does not parse, or whatever error the
    /// selected handler produces.
    async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
        let method = IpcMethod::parse(&request.method)?;
        match method {
            IpcMethod::Hello => Ok(IpcResult::Hello {
                protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
                registration: self.registration_payload()?,
            }),
            IpcMethod::CurrentState => {
                let state = self.current_dashboard_state().await?;
                Ok(IpcResult::State {
                    target_id: state.target.target_id.clone(),
                    state: Box::new(state),
                })
            }
            IpcMethod::EventsSubscribe => {
                require_session_trigger(request, &self.config.target_id)?;
                Ok(IpcResult::Subscription {
                    target_id: self.config.target_id.clone(),
                    subscription: "events".to_owned(),
                })
            }
            IpcMethod::LogsTail => {
                require_session_trigger(request, &self.config.target_id)?;
                Ok(IpcResult::Subscription {
                    target_id: self.config.target_id.clone(),
                    subscription: "logs".to_owned(),
                })
            }
            IpcMethod::CommandRestartChild
            | IpcMethod::CommandPauseChild
            | IpcMethod::CommandResumeChild
            | IpcMethod::CommandQuarantineChild
            | IpcMethod::CommandRemoveChild
            | IpcMethod::CommandAddChild
            | IpcMethod::CommandShutdownTree => self.command_result(request).await,
        }
    }

    /// Builds a dashboard state snapshot for this target.
    ///
    /// The base snapshot comes from the spec, state and journal. If registration fails,
    /// the target id is used as the display name. When a runtime handle is attached,
    /// the live runtime is queried and its records replace `runtime_state` and
    /// `child_runtime_records` in the snapshot.
    ///
    /// # Errors
    ///
    /// Returns `current_state_failed` if the runtime handle's state query fails.
    pub async fn current_dashboard_state(&self) -> Result<DashboardState, DashboardError> {
        let registration = self.registration_payload().ok();
        let mut state = build_dashboard_state(
            DashboardStateInput {
                target_id: self.config.target_id.clone(),
                display_name: registration
                    .as_ref()
                    .map(|registration| registration.display_name.clone())
                    .unwrap_or_else(|| self.config.target_id.clone()),
                state_generation: self.state_generation,
                // NOTE(review): presumably caps the number of recent journal entries.
                recent_limit: 128,
            },
            &self.spec,
            &self.state,
            &self.journal,
        );
        if let Some(handle) = self.handle.as_ref() {
            let result = handle.current_state().await.map_err(|error| {
                DashboardError::new(
                    "current_state_failed",
                    "state",
                    Some(self.config.target_id.clone()),
                    error.to_string(),
                    true,
                )
            })?;
            if let CommandResult::CurrentState {
                state: runtime_state,
            } = result
            {
                let dashboard_state = DashboardCurrentState::from_current_state(&runtime_state);
                state.runtime_state = runtime_state
                    .child_runtime_records
                    .iter()
                    .map(|record| {
                        runtime_state_from_child_runtime_record(
                            record,
                            runtime_state.shutdown_completed,
                        )
                    })
                    .collect();
                state.child_runtime_records = dashboard_state.child_runtime_records;
            }
        }
        Ok(state)
    }

    /// Decodes, validates and executes a control command.
    ///
    /// Execution failures, including a missing runtime handle, are reported inside
    /// an `accepted: false` / `"failed"` [`ControlCommandResult`] rather than as an
    /// `Err`.
    ///
    /// # Errors
    ///
    /// Returns an error if the params fail to decode or validate, if the command
    /// targets a different process, or if a successful result cannot be mapped
    /// into the dashboard model.
    async fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
        let command = decode_command_params(request)?;
        validate_command(&command)?;
        if command.target_id != self.config.target_id {
            return Err(DashboardError::validation(
                "command_validate",
                Some(self.config.target_id.clone()),
                "command target_id must match target process",
            ));
        }
        let result = if let Some(handle) = self.handle.as_ref() {
            execute_command(handle, &command).await
        } else {
            Err(DashboardError::target_unavailable(
                "command_dispatch",
                command.target_id.clone(),
                "runtime control handle is not attached",
            ))
        };
        let result = match result {
            Ok(result) => {
                let state_delta = dashboard_command_result_value(&result).map_err(|error| {
                    DashboardError::new(
                        "command_result_model_failed",
                        "command_dispatch",
                        Some(command.target_id.clone()),
                        format!("failed to map command result: {error}"),
                        false,
                    )
                })?;
                ControlCommandResult {
                    command_id: command.command_id.clone(),
                    target_id: command.target_id.clone(),
                    accepted: true,
                    status: "completed".to_owned(),
                    error: None,
                    state_delta: Some(state_delta),
                    completed_at_unix_nanos: Some(unix_nanos_now()),
                }
            }
            Err(error) => ControlCommandResult {
                command_id: command.command_id.clone(),
                target_id: command.target_id.clone(),
                accepted: false,
                status: "failed".to_owned(),
                error: Some(error),
                state_delta: None,
                completed_at_unix_nanos: Some(unix_nanos_now()),
            },
        };
        Ok(IpcResult::CommandResult {
            target_id: command.target_id,
            result,
        })
    }
}
/// Binds the target's dashboard IPC listener at `config.path`.
///
/// Any existing path is first reconciled with the configured bind mode. The parent
/// directory is then created if necessary, and the socket is bound.
///
/// # Errors
///
/// * any error from reconciling the existing path with the bind mode;
/// * `ipc_parent_dir_creation_failed` when the parent directory cannot be created;
/// * `ipc_bind_failed` when binding the socket fails.
pub fn bind_dashboard_listener(
    config: &ValidatedDashboardIpcConfig,
) -> Result<UnixListener, DashboardError> {
    prepare_socket_path(config)?;

    config
        .path
        .parent()
        .map(std::fs::create_dir_all)
        .transpose()
        .map_err(|io_error| {
            DashboardError::new(
                "ipc_parent_dir_creation_failed",
                "ipc_bind",
                Some(config.target_id.clone()),
                format!("failed to create IPC parent directory: {io_error}"),
                false,
            )
        })?;

    UnixListener::bind(&config.path).map_err(|io_error| {
        DashboardError::new(
            "ipc_bind_failed",
            "ipc_bind",
            Some(config.target_id.clone()),
            format!("failed to bind target IPC socket: {io_error}"),
            true,
        )
    })
}
/// Makes `config.path` ready for a fresh bind according to `config.bind_mode`.
///
/// * Path absent: nothing to do.
/// * `CreateNew`: an existing path is rejected.
/// * `ReplaceStale`: the existing path is removed only if it is a real Unix socket
///   (not a symlink) and a connection attempt to it is refused, meaning nothing is
///   listening. A live listener, or a probe error that does not prove staleness
///   (e.g. permission denied), leaves the path untouched and returns an error.
///
/// # Errors
///
/// * `ipc_path_metadata_failed` if the path cannot be inspected;
/// * a validation error for `CreateNew` conflicts, symlinks, non-socket files or a
///   live listener;
/// * `ipc_stale_probe_failed` if the liveness probe fails for a reason other than
///   "connection refused";
/// * any error from the pre-bind preparation step;
/// * `ipc_stale_remove_failed` if the stale socket cannot be removed.
fn prepare_socket_path(config: &ValidatedDashboardIpcConfig) -> Result<(), DashboardError> {
    let metadata = match std::fs::symlink_metadata(&config.path) {
        Ok(metadata) => metadata,
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(error) => {
            return Err(DashboardError::new(
                "ipc_path_metadata_failed",
                "ipc_bind",
                Some(config.target_id.clone()),
                format!("failed to inspect IPC path: {error}"),
                false,
            ));
        }
    };
    match config.bind_mode {
        crate::config::configurable::DashboardIpcBindMode::CreateNew => {
            Err(DashboardError::validation(
                "ipc_bind",
                Some(config.target_id.clone()),
                "IPC path already exists and bind_mode is create_new",
            ))
        }
        crate::config::configurable::DashboardIpcBindMode::ReplaceStale => {
            let file_type = metadata.file_type();
            if file_type.is_symlink() {
                return Err(DashboardError::validation(
                    "ipc_bind",
                    Some(config.target_id.clone()),
                    "IPC path must not be a symlink",
                ));
            }
            if !file_type.is_socket() {
                return Err(DashboardError::validation(
                    "ipc_bind",
                    Some(config.target_id.clone()),
                    "IPC path must be a Unix socket before stale replacement",
                ));
            }
            match StdUnixStream::connect(&config.path) {
                // A listener accepted the probe: the socket is in use.
                Ok(_) => {
                    return Err(DashboardError::validation(
                        "ipc_bind",
                        Some(config.target_id.clone()),
                        "IPC path is served by a live process",
                    ));
                }
                // Refused means nobody is listening: the socket file is stale.
                Err(error) if error.kind() == std::io::ErrorKind::ConnectionRefused => {}
                // The path vanished after inspection; there is nothing left to replace.
                Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
                // Any other failure (e.g. permission denied) does not prove the socket
                // is stale, so it must not be removed.
                Err(error) => {
                    return Err(DashboardError::new(
                        "ipc_stale_probe_failed",
                        "ipc_bind",
                        Some(config.target_id.clone()),
                        format!("failed to probe existing IPC socket for liveness: {error}"),
                        false,
                    ));
                }
            }
            crate::ipc::security::peer_identity::prepare_socket_path_for_bind(&config.path)?;
            std::fs::remove_file(&config.path).map_err(|error| {
                DashboardError::new(
                    "ipc_stale_remove_failed",
                    "ipc_bind",
                    Some(config.target_id.clone()),
                    format!("failed to remove stale IPC path: {error}"),
                    true,
                )
            })
        }
    }
}
/// Ensures an event/log subscription request comes from an established dashboard session.
///
/// The request params must contain `session_established` set to the JSON boolean
/// `true`. A missing key, a non-boolean value, or `false` are all rejected.
///
/// # Errors
///
/// Returns a `session_required` error scoped to `target_id` when the flag is not set.
fn require_session_trigger(request: &IpcRequest, target_id: &str) -> Result<(), DashboardError> {
    let session_flag = request
        .params
        .get("session_established")
        .and_then(serde_json::Value::as_bool);
    match session_flag {
        Some(true) => Ok(()),
        Some(false) | None => Err(DashboardError::new(
            "session_required",
            "subscription",
            Some(target_id.to_owned()),
            "event and log subscription must be triggered by an established dashboard session",
            false,
        )),
    }
}
/// Validates the relay-supplied fields of a control command before dispatch.
///
/// Checks run in this order, and the first failure wins:
/// 1. `reason` must contain non-whitespace text;
/// 2. `requested_by` must contain non-whitespace text;
/// 3. `ShutdownTree`, `RemoveChild` and `AddChild` must have `confirmed` set.
///
/// # Errors
///
/// Returns a `command_validate` validation error describing the first failed check.
pub fn validate_command(command: &ControlCommandRequest) -> Result<(), DashboardError> {
    let reject = |message: &'static str| -> Result<(), DashboardError> {
        Err(DashboardError::validation(
            "command_validate",
            Some(command.target_id.clone()),
            message,
        ))
    };

    if command.reason.trim().is_empty() {
        return reject("command reason must not be empty");
    }
    if command.requested_by.trim().is_empty() {
        return reject("requested_by must be derived by relay");
    }

    let needs_confirmation = matches!(
        command.command,
        ControlCommandKind::ShutdownTree
            | ControlCommandKind::RemoveChild
            | ControlCommandKind::AddChild
    );
    if needs_confirmation && !command.confirmed {
        return reject("dangerous command requires confirmation");
    }

    Ok(())
}
/// Runs a validated control command against the runtime control handle.
///
/// Child-scoped commands resolve their target via `child_id`. `AddChild` adds under
/// the root supervisor path, using the request's child manifest or its default when
/// none is supplied. `ShutdownTree` needs no child.
///
/// # Errors
///
/// Returns the `child_id` validation error for child commands that lack a usable
/// child path. Any runtime failure is wrapped as `command_failed`.
async fn execute_command(
    handle: &SupervisorHandle,
    command: &ControlCommandRequest,
) -> Result<CommandResult, DashboardError> {
    let actor = &command.requested_by;
    let reason = &command.reason;

    let outcome = match command.command {
        ControlCommandKind::RestartChild => {
            handle.restart_child(child_id(command)?, actor, reason).await
        }
        ControlCommandKind::PauseChild => {
            handle.pause_child(child_id(command)?, actor, reason).await
        }
        ControlCommandKind::ResumeChild => {
            handle.resume_child(child_id(command)?, actor, reason).await
        }
        ControlCommandKind::QuarantineChild => {
            handle.quarantine_child(child_id(command)?, actor, reason).await
        }
        ControlCommandKind::RemoveChild => {
            handle.remove_child(child_id(command)?, actor, reason).await
        }
        ControlCommandKind::AddChild => {
            handle
                .add_child(
                    SupervisorPath::root(),
                    command.target.child_manifest.clone().unwrap_or_default(),
                    actor,
                    reason,
                )
                .await
        }
        ControlCommandKind::ShutdownTree => handle.shutdown_tree(actor, reason).await,
    };

    outcome.map_err(|failure| {
        DashboardError::new(
            "command_failed",
            "command_dispatch",
            Some(command.target_id.clone()),
            failure.to_string(),
            true,
        )
    })
}
/// Extracts the child id for a child-scoped command from `command.target.child_path`.
///
/// The id is the last non-empty `/`-separated segment. For example, `"root/worker"`,
/// `"/root/worker"` and `"root/worker/"` all use `worker` as the id.
///
/// # Errors
///
/// Returns a `command_validate` validation error when `child_path` is missing. It
/// also returns one when the path has no non-empty segment (e.g. `""` or `"///"`),
/// rather than building an empty or slash-only id.
fn child_id(command: &ControlCommandRequest) -> Result<ChildId, DashboardError> {
    let child_path = command.target.child_path.as_deref().ok_or_else(|| {
        DashboardError::validation(
            "command_validate",
            Some(command.target_id.clone()),
            "child_path is required for child command",
        )
    })?;
    let value = child_path
        .rsplit('/')
        .find(|segment| !segment.is_empty())
        .ok_or_else(|| {
            DashboardError::validation(
                "command_validate",
                Some(command.target_id.clone()),
                "child_path must contain a non-empty child segment",
            )
        })?;
    Ok(ChildId::new(value))
}
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned instead
/// of failing.
fn unix_nanos_now() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos())
}
/// Reports whether an IPC method name is a high-risk control command.
///
/// High-risk methods get an extra error trace when their audit write fails. Pause
/// and resume are deliberately not in the list. Matching is exact and
/// case-sensitive.
fn is_high_risk_command(method: &str) -> bool {
    const HIGH_RISK_METHODS: [&str; 5] = [
        "command.restart_child",
        "command.quarantine_child",
        "command.remove_child",
        "command.shutdown_tree",
        "command.add_child",
    ];
    HIGH_RISK_METHODS.contains(&method)
}