use crate::control::command::CommandResult;
use crate::control::handle::SupervisorHandle;
use crate::dashboard::config::ValidatedDashboardIpcConfig;
use crate::dashboard::error::DashboardError;
use crate::dashboard::model::{
ControlCommandKind, ControlCommandRequest, ControlCommandResult, DashboardState,
TargetProcessRegistration,
};
use crate::dashboard::protocol::{
DASHBOARD_IPC_PROTOCOL_VERSION, IpcMethod, IpcRequest, IpcResponse, IpcResult,
decode_command_params,
};
use crate::dashboard::registration::build_registration_payload;
use crate::dashboard::state::{DashboardStateInput, build_dashboard_state};
use crate::id::types::{ChildId, SupervisorPath};
use crate::journal::ring::EventJournal;
use crate::spec::supervisor::SupervisorSpec;
use crate::state::supervisor::SupervisorState;
use serde_json::json;
use std::os::unix::fs::FileTypeExt;
use std::os::unix::net::UnixStream as StdUnixStream;
use tokio::net::UnixListener;
/// Service object that answers dashboard IPC requests for one target process.
///
/// `Clone` is derived, so cloning the service clones every field below.
#[derive(Clone)]
pub struct DashboardIpcService {
/// Validated IPC settings; the code in this file reads `target_id`, `path`
/// and `bind_mode` from it.
config: ValidatedDashboardIpcConfig,
/// Supervisor specification handed to `build_dashboard_state`.
spec: SupervisorSpec,
/// Supervisor state handed to `build_dashboard_state`.
state: SupervisorState,
/// Event journal handed to `build_dashboard_state`.
journal: EventJournal,
/// Runtime control handle; `None` until `with_handle` is called, in which
/// case control commands are reported as failed ("target unavailable").
handle: Option<SupervisorHandle>,
/// Generation number copied into every dashboard state snapshot.
/// Initialised to 1 by `new`; nothing visible in this file changes it afterwards.
state_generation: u64,
}
impl DashboardIpcService {
pub fn new(
config: ValidatedDashboardIpcConfig,
spec: SupervisorSpec,
state: SupervisorState,
journal: EventJournal,
) -> Self {
Self {
config,
spec,
state,
journal,
handle: None,
state_generation: 1,
}
}
pub fn with_handle(mut self, handle: SupervisorHandle) -> Self {
self.handle = Some(handle);
self
}
pub fn registration_payload(&self) -> Result<TargetProcessRegistration, DashboardError> {
build_registration_payload(&self.config)
}
pub async fn handle_request(&self, request: IpcRequest) -> IpcResponse {
match self.dispatch(&request).await {
Ok(result) => IpcResponse::ok(request.request_id, result),
Err(error) => IpcResponse::error(request.request_id, error),
}
}
async fn dispatch(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
let method = IpcMethod::parse(&request.method)?;
match method {
IpcMethod::Hello => Ok(IpcResult::Hello {
protocol_version: DASHBOARD_IPC_PROTOCOL_VERSION.to_owned(),
registration: self.registration_payload()?,
}),
IpcMethod::CurrentState => {
let state = self.current_dashboard_state();
Ok(IpcResult::State {
target_id: state.target.target_id.clone(),
state: Box::new(state),
})
}
IpcMethod::EventsSubscribe => {
require_session_trigger(request, &self.config.target_id)?;
Ok(IpcResult::Subscription {
target_id: self.config.target_id.clone(),
subscription: "events".to_owned(),
})
}
IpcMethod::LogsTail => {
require_session_trigger(request, &self.config.target_id)?;
Ok(IpcResult::Subscription {
target_id: self.config.target_id.clone(),
subscription: "logs".to_owned(),
})
}
IpcMethod::CommandRestartChild
| IpcMethod::CommandPauseChild
| IpcMethod::CommandResumeChild
| IpcMethod::CommandQuarantineChild
| IpcMethod::CommandRemoveChild
| IpcMethod::CommandAddChild
| IpcMethod::CommandShutdownTree => self.command_result(request).await,
}
}
pub fn current_dashboard_state(&self) -> DashboardState {
let registration = self.registration_payload().ok();
build_dashboard_state(
DashboardStateInput {
target_id: self.config.target_id.clone(),
display_name: registration
.as_ref()
.map(|registration| registration.display_name.clone())
.unwrap_or_else(|| self.config.target_id.clone()),
state_generation: self.state_generation,
recent_limit: 128,
},
&self.spec,
&self.state,
&self.journal,
)
}
async fn command_result(&self, request: &IpcRequest) -> Result<IpcResult, DashboardError> {
let command = decode_command_params(request)?;
validate_command(&command)?;
if command.target_id != self.config.target_id {
return Err(DashboardError::validation(
"command_validate",
Some(self.config.target_id.clone()),
"command target_id must match target process",
));
}
let result = if let Some(handle) = self.handle.as_ref() {
execute_command(handle, &command).await
} else {
Err(DashboardError::target_unavailable(
"command_dispatch",
command.target_id.clone(),
"runtime control handle is not attached",
))
};
let result = match result {
Ok(result) => ControlCommandResult {
command_id: command.command_id.clone(),
target_id: command.target_id.clone(),
accepted: true,
status: "completed".to_owned(),
error: None,
state_delta: Some(json!(result)),
completed_at_unix_nanos: Some(unix_nanos_now()),
},
Err(error) => ControlCommandResult {
command_id: command.command_id.clone(),
target_id: command.target_id.clone(),
accepted: false,
status: "failed".to_owned(),
error: Some(error),
state_delta: None,
completed_at_unix_nanos: Some(unix_nanos_now()),
},
};
Ok(IpcResult::CommandResult {
target_id: command.target_id,
result,
})
}
}
pub fn bind_dashboard_listener(
config: &ValidatedDashboardIpcConfig,
) -> Result<UnixListener, DashboardError> {
prepare_socket_path(config)?;
UnixListener::bind(&config.path).map_err(|error| {
DashboardError::new(
"ipc_bind_failed",
"ipc_bind",
Some(config.target_id.clone()),
format!("failed to bind target IPC socket: {error}"),
true,
)
})
}
fn prepare_socket_path(config: &ValidatedDashboardIpcConfig) -> Result<(), DashboardError> {
let metadata = match std::fs::symlink_metadata(&config.path) {
Ok(metadata) => metadata,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(error) => {
return Err(DashboardError::new(
"ipc_path_metadata_failed",
"ipc_bind",
Some(config.target_id.clone()),
format!("failed to inspect IPC path: {error}"),
false,
));
}
};
match config.bind_mode {
crate::config::configurable::DashboardIpcBindMode::CreateNew => {
Err(DashboardError::validation(
"ipc_bind",
Some(config.target_id.clone()),
"IPC path already exists and bind_mode is create_new",
))
}
crate::config::configurable::DashboardIpcBindMode::ReplaceStale => {
if metadata.file_type().is_symlink() {
return Err(DashboardError::validation(
"ipc_bind",
Some(config.target_id.clone()),
"IPC path must not be a symlink",
));
}
if !metadata.file_type().is_socket() {
return Err(DashboardError::validation(
"ipc_bind",
Some(config.target_id.clone()),
"IPC path must be a Unix socket before stale replacement",
));
}
if StdUnixStream::connect(&config.path).is_ok() {
return Err(DashboardError::validation(
"ipc_bind",
Some(config.target_id.clone()),
"IPC path is served by a live process",
));
}
std::fs::remove_file(&config.path).map_err(|error| {
DashboardError::new(
"ipc_stale_remove_failed",
"ipc_bind",
Some(config.target_id.clone()),
format!("failed to remove stale IPC path: {error}"),
true,
)
})
}
}
}
/// Accepts a subscription request only when its params carry
/// `"session_established": true`.
///
/// A missing key, a non-boolean value, or `false` all produce a
/// `"session_required"` error tagged with `target_id`.
///
/// NOTE(review): the flag is read straight from the request parameters;
/// presumably a trusted relay sets it — verify against the caller.
fn require_session_trigger(request: &IpcRequest, target_id: &str) -> Result<(), DashboardError> {
    let session_flag = request
        .params
        .get("session_established")
        .and_then(serde_json::Value::as_bool);
    if session_flag != Some(true) {
        return Err(DashboardError::new(
            "session_required",
            "subscription",
            Some(target_id.to_owned()),
            "event and log subscription must be triggered by an established dashboard session",
            false,
        ));
    }
    Ok(())
}
/// Checks the caller-supplied fields of a control command.
///
/// # Errors
///
/// Returns a validation error (stage `"command_validate"`, tagged with the
/// command's `target_id`) when, in this order:
/// * `reason` is empty or whitespace only,
/// * `requested_by` is empty or whitespace only,
/// * the command is `ShutdownTree`, `RemoveChild` or `AddChild` and
///   `confirmed` is not set.
pub fn validate_command(command: &ControlCommandRequest) -> Result<(), DashboardError> {
    // Every rejection shares the same stage and target; only the message varies.
    let reject = |message: &'static str| -> Result<(), DashboardError> {
        Err(DashboardError::validation(
            "command_validate",
            Some(command.target_id.clone()),
            message,
        ))
    };

    if command.reason.trim().is_empty() {
        return reject("command reason must not be empty");
    }
    if command.requested_by.trim().is_empty() {
        return reject("requested_by must be derived by relay");
    }

    let needs_confirmation = matches!(
        command.command,
        ControlCommandKind::ShutdownTree
            | ControlCommandKind::RemoveChild
            | ControlCommandKind::AddChild
    );
    if needs_confirmation && !command.confirmed {
        return reject("dangerous command requires confirmation");
    }
    Ok(())
}
async fn execute_command(
handle: &SupervisorHandle,
command: &ControlCommandRequest,
) -> Result<CommandResult, DashboardError> {
let result = match command.command {
ControlCommandKind::RestartChild => {
handle
.restart_child(child_id(command)?, &command.requested_by, &command.reason)
.await
}
ControlCommandKind::PauseChild => {
handle
.pause_child(child_id(command)?, &command.requested_by, &command.reason)
.await
}
ControlCommandKind::ResumeChild => {
handle
.resume_child(child_id(command)?, &command.requested_by, &command.reason)
.await
}
ControlCommandKind::QuarantineChild => {
handle
.quarantine_child(child_id(command)?, &command.requested_by, &command.reason)
.await
}
ControlCommandKind::RemoveChild => {
handle
.remove_child(child_id(command)?, &command.requested_by, &command.reason)
.await
}
ControlCommandKind::AddChild => {
handle
.add_child(
SupervisorPath::root(),
command.target.child_manifest.clone().unwrap_or_default(),
&command.requested_by,
&command.reason,
)
.await
}
ControlCommandKind::ShutdownTree => {
handle
.shutdown_tree(&command.requested_by, &command.reason)
.await
}
};
result.map_err(|error| {
DashboardError::new(
"command_failed",
"command_dispatch",
Some(command.target_id.clone()),
error.to_string(),
true,
)
})
}
/// Derives the child identifier from the command's `child_path`.
///
/// The identifier is the last non-empty `/`-separated segment of the path,
/// so `"a/b/"` and `"a/b"` both yield `b`.
///
/// NOTE(review): when the path has no non-empty segment (e.g. `""` or `"/"`)
/// the whole path string is used as the identifier — confirm that fallback
/// is intended rather than a validation error.
///
/// # Errors
///
/// Returns a validation error when the command carries no `child_path`.
fn child_id(command: &ControlCommandRequest) -> Result<ChildId, DashboardError> {
    let child_path = match command.target.child_path.as_deref() {
        Some(path) => path,
        None => {
            return Err(DashboardError::validation(
                "command_validate",
                Some(command.target_id.clone()),
                "child_path is required for child command",
            ));
        }
    };
    // Scan from the right so trailing slashes are skipped.
    let leaf = match child_path.rsplit('/').find(|segment| !segment.is_empty()) {
        Some(segment) => segment,
        None => child_path,
    };
    Ok(ChildId::new(leaf))
}
/// Returns the current system time as nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, 0 is returned.
fn unix_nanos_now() -> u128 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        // Clock set before 1970: report the epoch itself.
        Err(_) => 0,
    }
}