use crate::dashboard::config::ValidatedDashboardIpcConfig;
use crate::dashboard::error::DashboardError;
use crate::dashboard::model::{SupportedCommand, TargetProcessRegistration};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio::time::{Duration, sleep};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegistrationAck {
pub ok: bool,
pub target_id: Option<String>,
pub status: Option<String>,
pub error: Option<RegistrationAckError>,
pub retryable: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegistrationAckError {
pub code: String,
pub message: String,
}
pub fn build_registration_payload(
config: &ValidatedDashboardIpcConfig,
) -> Result<TargetProcessRegistration, DashboardError> {
let registration = config.registration.as_ref().ok_or_else(|| {
DashboardError::validation(
"registration",
Some(config.target_id.clone()),
"dynamic registration is not enabled",
)
})?;
Ok(TargetProcessRegistration {
target_id: config.target_id.clone(),
display_name: registration.display_name.clone(),
ipc_path: config.path.to_string_lossy().into_owned(),
lease_seconds: registration.lease_seconds,
supported_commands: default_supported_commands(),
})
}
pub fn registration_to_line(
registration: &TargetProcessRegistration,
) -> Result<String, DashboardError> {
let mut line = serde_json::to_string(registration).map_err(|error| {
DashboardError::new(
"serialization_failed",
"registration_write",
Some(registration.target_id.clone()),
format!("failed to serialize registration: {error}"),
false,
)
})?;
line.push('\n');
Ok(line)
}
pub async fn send_registration_upsert(
config: &ValidatedDashboardIpcConfig,
) -> Result<RegistrationAck, DashboardError> {
let registration = config.registration.as_ref().ok_or_else(|| {
DashboardError::validation(
"registration",
Some(config.target_id.clone()),
"dynamic registration is not enabled",
)
})?;
let payload = build_registration_payload(config)?;
let line = registration_to_line(&payload)?;
let stream = UnixStream::connect(®istration.relay_registration_path)
.await
.map_err(|error| {
DashboardError::new(
"registration_connect_failed",
"registration_send",
Some(config.target_id.clone()),
format!("failed to connect relay registration socket: {error}"),
true,
)
})?;
let mut stream = BufReader::new(stream);
stream
.get_mut()
.write_all(line.as_bytes())
.await
.map_err(|error| {
DashboardError::new(
"registration_write_failed",
"registration_send",
Some(config.target_id.clone()),
format!("failed to write registration upsert: {error}"),
true,
)
})?;
let mut ack_line = String::new();
stream.read_line(&mut ack_line).await.map_err(|error| {
DashboardError::new(
"registration_ack_read_failed",
"registration_send",
Some(config.target_id.clone()),
format!("failed to read registration ack: {error}"),
true,
)
})?;
let ack: RegistrationAck = serde_json::from_str(ack_line.trim()).map_err(|error| {
DashboardError::new(
"registration_ack_decode_failed",
"registration_send",
Some(config.target_id.clone()),
format!("failed to decode registration ack: {error}"),
false,
)
})?;
Ok(ack)
}
pub async fn run_registration_heartbeat(
config: ValidatedDashboardIpcConfig,
) -> Result<(), DashboardError> {
loop {
match send_registration_upsert(&config).await {
Ok(ack) if ack.ok => {}
Ok(ack) if !ack.retryable => {
let detail = ack
.error
.map(|error| format!("{}: {}", error.code, error.message))
.unwrap_or_else(|| "registration failed".to_owned());
return Err(DashboardError::new(
"registration_failed",
"registration_heartbeat",
Some(config.target_id.clone()),
detail,
false,
));
}
Ok(_) | Err(_) => {}
}
let interval = config
.registration
.as_ref()
.map(|registration| registration.registration_heartbeat_interval_seconds)
.unwrap_or(15);
sleep(Duration::from_secs(interval)).await;
}
}
fn default_supported_commands() -> Vec<SupportedCommand> {
[
"restart_child",
"pause_child",
"resume_child",
"quarantine_child",
"remove_child",
"add_child",
"shutdown_tree",
]
.into_iter()
.map(|name| SupportedCommand {
name: name.to_owned(),
idempotent: false,
timeout_seconds: 30,
})
.collect()
}