use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{RwLock, mpsc, oneshot};
use crate::sdk::{
AddServiceParams, AddServiceResult, DepType, DependencyInfo, LegacyServiceStatus, OkResponse,
PingResponse, PrepareRestartResult, ReloadResult, RpcRequest, RpcResponse, ServiceConfig,
ServiceInfo, ServiceState, ServiceStats, ServiceStatus, TreeResponse, WhyBlocked, XinetConfig,
XinetStatus, error_codes,
};
use super::xinet::XinetManager;
use super::debug;
use super::graph::ServiceGraph;
use super::log::LogBuffers;
/// Commands sent from IPC connection handlers to the supervisor over an `mpsc` channel.
///
/// Variants that need an answer (`Reload`, `PrepareRestart`) carry a `oneshot` sender on
/// which the supervisor replies with `Ok(result)` or `Err(message)`. For the remaining
/// variants, the handlers in this module do not wait for any reply.
// `AddService` carries a whole `ServiceConfig` by value, which presumably makes it much larger
// than the other variants; the lint is silenced rather than boxing it. NOTE(review): confirm.
#[allow(clippy::large_enum_variant)]
pub enum IpcCommand {
/// Start the named service.
StartService {
name: String,
},
/// Stop the named service.
StopService {
name: String,
},
/// Restart the named service.
RestartService {
name: String,
},
/// Signal the named service's process.
KillService {
name: String,
/// Signal name exactly as the client supplied it; how `None` is interpreted is up to the
/// receiver.
signal: Option<String>,
},
/// Add a new service definition to the supervisor.
AddService {
config: ServiceConfig,
/// Client-requested persistence flag, passed through as-is. NOTE(review): the IPC
/// `service.add` path already writes `<name>.toml` itself when this is true.
persist: bool,
},
/// Remove the named service from the supervisor.
RemoveService {
name: String,
},
/// Reload all service configuration; the outcome is sent back on `response_tx`.
Reload {
response_tx: oneshot::Sender<Result<ReloadResult, String>>,
},
/// Ask the supervisor to shut down.
Shutdown,
/// Ask the supervisor to prepare for a restart; the outcome is sent back on `response_tx`.
PrepareRestart {
response_tx: oneshot::Sender<Result<PrepareRestartResult, String>>,
},
}
/// Hand-written `Debug` that keeps log lines compact.
///
/// `AddService` prints only the service name taken from its config rather than the whole
/// config. Variants holding a `oneshot` sender print just their variant name; the sender
/// itself is never formatted.
impl std::fmt::Debug for IpcCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Shared shape for every variant whose only payload is a service name.
        fn name_only(
            f: &mut std::fmt::Formatter<'_>,
            label: &str,
            name: &str,
        ) -> std::fmt::Result {
            f.debug_struct(label).field("name", &name).finish()
        }

        match self {
            Self::StartService { name } => name_only(f, "StartService", name),
            Self::StopService { name } => name_only(f, "StopService", name),
            Self::RestartService { name } => name_only(f, "RestartService", name),
            Self::RemoveService { name } => name_only(f, "RemoveService", name),
            Self::KillService { name, signal } => {
                let mut builder = f.debug_struct("KillService");
                builder.field("name", name);
                builder.field("signal", signal);
                builder.finish()
            }
            Self::AddService { config, persist } => {
                let mut builder = f.debug_struct("AddService");
                builder.field("config", &config.service.name);
                builder.field("persist", persist);
                builder.finish()
            }
            Self::Reload { .. } => f.write_str("Reload"),
            Self::Shutdown => f.write_str("Shutdown"),
            Self::PrepareRestart { .. } => f.write_str("PrepareRestart"),
        }
    }
}
/// Binds the control socket at `socket_path` and serves IPC connections forever.
///
/// Setup steps, in order:
/// - remove anything left at `socket_path` (a missing file is fine);
/// - create the parent directory;
/// - bind the listener;
/// - on Unix, restrict the socket to mode `0o660`.
///
/// Each accepted connection is handled on its own spawned task.
///
/// # Errors
/// Returns an error only if setup fails: removing the old socket, creating the parent
/// directory, binding, or setting permissions. Once listening, accept errors are logged
/// and retried after a short back-off instead of being returned.
pub async fn run_ipc_server(
    socket_path: &Path,
    config_dir: PathBuf,
    graph: Arc<RwLock<ServiceGraph>>,
    log_buffers: LogBuffers,
    command_tx: mpsc::Sender<IpcCommand>,
    xinet_manager: Option<Arc<XinetManager>>,
) -> std::io::Result<()> {
    // Remove unconditionally rather than `exists()` + remove: `exists()` follows symlinks, so a
    // dangling symlink left at the path would be skipped and `bind` would then fail.
    match std::fs::remove_file(socket_path) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }
    if let Some(parent) = socket_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let listener = UnixListener::bind(socket_path)?;
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o660))?;
    }
    tracing::info!(path = %socket_path.display(), "IPC server listening");
    loop {
        match listener.accept().await {
            Ok((stream, _addr)) => {
                let config_dir = config_dir.clone();
                let graph = Arc::clone(&graph);
                let log_buffers = Arc::clone(&log_buffers);
                let command_tx = command_tx.clone();
                let xinet_manager = xinet_manager.clone();
                tokio::spawn(async move {
                    if let Err(e) = handle_connection(
                        stream,
                        config_dir,
                        graph,
                        log_buffers,
                        command_tx,
                        xinet_manager,
                    )
                    .await
                    {
                        tracing::debug!(error = %e, "connection handler error");
                    }
                });
            }
            Err(e) => {
                tracing::error!(error = %e, "failed to accept connection");
                // A persistent accept error (e.g. file-descriptor exhaustion) would otherwise
                // spin this loop at full speed and flood the log; back off briefly first.
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }
    }
}
async fn handle_connection(
stream: UnixStream,
config_dir: PathBuf,
graph: Arc<RwLock<ServiceGraph>>,
log_buffers: LogBuffers,
command_tx: mpsc::Sender<IpcCommand>,
xinet_manager: Option<Arc<XinetManager>>,
) -> std::io::Result<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
let bytes_read = reader.read_line(&mut line).await?;
if bytes_read == 0 {
break;
}
let response = match serde_json::from_str::<RpcRequest>(&line) {
Ok(request) => {
dispatch(
&request,
&config_dir,
&graph,
&log_buffers,
&command_tx,
&xinet_manager,
)
.await
}
Err(e) => RpcResponse::error(0, error_codes::PARSE_ERROR, e.to_string()),
};
let mut response_json = serde_json::to_string(&response)?;
response_json.push('\n');
writer.write_all(response_json.as_bytes()).await?;
writer.flush().await?;
}
Ok(())
}
/// Routes a parsed JSON-RPC request to its handler by method name.
///
/// Some methods are aliases that share one handler:
/// - `service.create` / `service.add`
/// - `service.delete` / `service.remove`
/// - `xinet.create` / `xinet.register`
/// - `xinet.delete` / `xinet.unregister`
///
/// `service.monitor` and `service.register` reuse the add handler with persistence forced
/// off and on, respectively. `service.reload` reloads a single service when a `name` param
/// is present and reloads everything otherwise. Unknown methods yield `METHOD_NOT_FOUND`.
async fn dispatch(
    request: &RpcRequest,
    config_dir: &Path,
    graph: &Arc<RwLock<ServiceGraph>>,
    log_buffers: &LogBuffers,
    command_tx: &mpsc::Sender<IpcCommand>,
    xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
    let id = request.id;
    let params = &request.params;
    match request.method.as_str() {
        // Protocol and whole-system control.
        "rpc.discover" => handle_discover(id),
        "system.ping" => handle_ping(id),
        "system.shutdown" => handle_shutdown(id, command_tx).await,
        "system.reboot" => handle_reboot(id).await,
        "system.prepare_restart" => handle_prepare_restart(id, command_tx).await,

        // Service definitions.
        "service.create" | "service.add" => {
            handle_add(id, params, config_dir, graph, command_tx, None).await
        }
        "service.monitor" => {
            handle_add(id, params, config_dir, graph, command_tx, Some(false)).await
        }
        "service.register" => {
            handle_add(id, params, config_dir, graph, command_tx, Some(true)).await
        }
        "service.update" => handle_update(id, params, config_dir, graph, command_tx).await,
        "service.delete" | "service.remove" => handle_delete(id, params, command_tx).await,
        "service.reload" if params.get("name").is_some() => {
            handle_reload_service(id, params, config_dir, graph, command_tx).await
        }
        "service.reload" => handle_reload(id, command_tx).await,

        // Service queries.
        "service.list" => handle_list_simple(id, graph).await,
        "service.status" => handle_status_simple(id, params, graph).await,
        "service.get" => handle_get_config(id, params, graph).await,
        "service.stats" => handle_stats_simple(id, params, graph).await,
        "service.list_full" => handle_list(id, graph).await,
        "service.status_full" => handle_status(id, params, graph).await,
        "service.is_running" => handle_is_running(id, params, graph).await,
        "service.why" => handle_why(id, params, graph).await,
        "service.tree" => handle_tree(id, graph).await,

        // Service lifecycle.
        "service.start" => handle_start(id, params, command_tx).await,
        "service.stop" => handle_stop(id, params, command_tx).await,
        "service.restart" => handle_restart(id, params, command_tx).await,
        "service.kill" => handle_kill(id, params, command_tx).await,
        "service.start_all" => handle_start_all(id, graph, command_tx).await,
        "service.stop_all" => handle_stop_all(id, graph, command_tx).await,
        "service.delete_all" => handle_delete_all(id, graph, command_tx).await,

        // Logs and debugging.
        "logs.get" => handle_logs_simple(id, params, graph, log_buffers).await,
        "logs.tail" => handle_logs(id, params, graph, log_buffers, true).await,
        "logs.filter" => handle_logs_filter(id, params, graph, log_buffers).await,
        "debug.state" => handle_debug_state(id, graph).await,
        "debug.process_tree" => handle_debug_process_tree(id, params, graph).await,

        // xinet proxies.
        "xinet.create" | "xinet.register" => {
            handle_xinet_register(id, params, xinet_manager).await
        }
        "xinet.delete" | "xinet.unregister" => {
            handle_xinet_unregister(id, params, xinet_manager).await
        }
        "xinet.list" => handle_xinet_list(id, xinet_manager).await,
        "xinet.status" => handle_xinet_status_simple(id, params, xinet_manager).await,
        "xinet.status_all" => handle_xinet_status_all(id, xinet_manager).await,

        _ => RpcResponse::error(id, error_codes::METHOD_NOT_FOUND, "unknown method"),
    }
}
const OPENRPC_SPEC: &str = include_str!("../../docs/reference/openrpc.json");
fn handle_discover(id: u64) -> RpcResponse {
match serde_json::from_str::<serde_json::Value>(OPENRPC_SPEC) {
Ok(spec) => RpcResponse::success(id, spec),
Err(e) => RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
format!("Failed to parse OpenRPC spec: {}", e),
),
}
}
/// Handles `system.ping`, answering with the crate version this daemon was built from.
fn handle_ping(id: u64) -> RpcResponse {
    let version = env!("CARGO_PKG_VERSION").to_owned();
    let payload = serde_json::to_value(PingResponse { version }).unwrap();
    RpcResponse::success(id, payload)
}
/// Handles `system.reboot`: flushes filesystem buffers and reboots the host.
///
/// The reboot is only attempted on Linux when this process is PID 1. In every other case an
/// `INTERNAL_ERROR` explaining why is returned. NOTE(review): a successful reboot presumably
/// does not return, so the `OkResponse` path is unlikely to reach the client.
async fn handle_reboot(id: u64) -> RpcResponse {
    #[cfg(target_os = "linux")]
    {
        use nix::sys::reboot::{RebootMode, reboot};

        // Guard: only the init process is allowed to reboot the host.
        if std::process::id() != 1 {
            return RpcResponse::error(
                id,
                error_codes::INTERNAL_ERROR,
                "Reboot only available when running as PID 1",
            );
        }
        // Flush dirty buffers to disk before the abrupt restart.
        nix::unistd::sync();
        if let Err(err) = reboot(RebootMode::RB_AUTOBOOT) {
            return RpcResponse::error(
                id,
                error_codes::INTERNAL_ERROR,
                format!("Reboot failed: {}", err),
            );
        }
        RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap())
    }
    #[cfg(not(target_os = "linux"))]
    {
        RpcResponse::error(
            id,
            error_codes::INTERNAL_ERROR,
            "Reboot only available on Linux",
        )
    }
}
async fn handle_shutdown(id: u64, command_tx: &mpsc::Sender<IpcCommand>) -> RpcResponse {
let _ = command_tx.send(IpcCommand::Shutdown).await;
RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap())
}
async fn handle_prepare_restart(id: u64, command_tx: &mpsc::Sender<IpcCommand>) -> RpcResponse {
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::PrepareRestart { response_tx })
.await
.is_err()
{
return RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
"Failed to send command to supervisor",
);
}
match response_rx.await {
Ok(Ok(result)) => RpcResponse::success(id, serde_json::to_value(result).unwrap()),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
"Supervisor dropped response channel",
),
}
}
/// Handles `service.list`: returns the names of every service in the graph as a JSON array
/// of strings.
async fn handle_list_simple(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let guard = graph.read().await;
    let names: Vec<String> = guard
        .all_services()
        .filter_map(|sid| guard.get(sid))
        .map(|service| service.name.clone())
        .collect();
    RpcResponse::success(id, serde_json::to_value(names).unwrap())
}
async fn handle_status_simple(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let status = ServiceStatus::from_state(service.name.clone(), &service.state);
RpcResponse::success(id, serde_json::to_value(status).unwrap())
}
async fn handle_stats_simple(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let pid = service.state.pid().unwrap_or(0);
let (memory_bytes, cpu_percent) = if pid > 0 {
get_process_stats(pid)
} else {
(0, 0.0)
};
let stats = ServiceStats {
pid,
memory_bytes,
cpu_percent,
};
RpcResponse::success(id, serde_json::to_value(stats).unwrap())
}
async fn handle_update(
id: u64,
params: &serde_json::Value,
config_dir: &Path,
graph: &Arc<RwLock<ServiceGraph>>,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let config: ServiceConfig =
match serde_json::from_value(params.get("config").cloned().unwrap_or(params.clone())) {
Ok(c) => c,
Err(e) => return RpcResponse::error(id, error_codes::INVALID_PARAMS, e.to_string()),
};
let name = config.service.name.clone();
let was_running = {
let graph = graph.read().await;
match graph.get_by_name(&name) {
Some(service_id) => {
let service = graph.get(service_id).unwrap();
service.state.is_active()
}
None => {
return RpcResponse::error(
id,
error_codes::SERVICE_NOT_FOUND,
format!("Service '{}' not found", name),
);
}
}
};
let _ = command_tx
.send(IpcCommand::RemoveService { name: name.clone() })
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let file_path = config_dir.join(format!("{}.toml", name));
if let Err(e) = std::fs::write(
&file_path,
toml::to_string_pretty(&config).unwrap_or_default(),
) {
return RpcResponse::error(
id,
error_codes::PERSIST_FAILED,
format!("Failed to write config: {}", e),
);
}
let _ = command_tx
.send(IpcCommand::AddService {
config,
persist: false,
})
.await;
if was_running {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let _ = command_tx
.send(IpcCommand::StartService { name: name.clone() })
.await;
}
RpcResponse::success(id, serde_json::json!({ "name": name }))
}
async fn handle_logs_simple(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
log_buffers: &LogBuffers,
) -> RpcResponse {
let lines = params
.get("lines")
.and_then(|v| v.as_u64())
.map(|n| n as usize)
.unwrap_or(100);
if let Some(name) = params.get("name").and_then(|v| v.as_str()) {
let service_id = {
let graph = graph.read().await;
match graph.get_by_name(name) {
Some(id) => id,
None => {
return RpcResponse::error(
id,
error_codes::SERVICE_NOT_FOUND,
"service not found",
);
}
}
};
let logs = crate::server::log::get_logs(log_buffers, service_id, Some(lines)).await;
let log_strings: Vec<String> = logs.iter().map(|l| l.to_string_format()).collect();
RpcResponse::success(id, serde_json::to_value(log_strings).unwrap())
} else {
RpcResponse::success(id, serde_json::json!([]))
}
}
/// Handles `xinet.status`: returns a compact [`XinetStatus`] for one proxy.
///
/// Errors:
/// - `INTERNAL_ERROR` when xinet support is not enabled.
/// - `INVALID_PARAMS` when `name` is missing or not a string.
/// - `SERVICE_NOT_FOUND` when no proxy by that name exists.
async fn handle_xinet_status_simple(
    id: u64,
    params: &serde_json::Value,
    xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
    let manager = match xinet_manager {
        Some(m) => m,
        None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
    };
    let name = match params.get("name").and_then(|v| v.as_str()) {
        Some(n) => n,
        None => {
            return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
        }
    };
    match manager.status(name).await {
        Some(status) => {
            // Saturate rather than truncate: a plain `as u32` cast would wrap a count above
            // `u32::MAX` into a small, misleading number.
            let connections = u32::try_from(status.active_connections).unwrap_or(u32::MAX);
            let simple = XinetStatus {
                name: status.name,
                running: status.running,
                connections,
            };
            RpcResponse::success(id, serde_json::to_value(simple).unwrap())
        }
        None => RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "proxy not found"),
    }
}
/// Handles `service.list_full`: returns a [`ServiceInfo`] (name, state, target flag) for
/// every service in the graph.
async fn handle_list(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let guard = graph.read().await;
    let services: Vec<ServiceInfo> = guard
        .all_services()
        .filter_map(|sid| guard.get(sid))
        .map(|svc| ServiceInfo {
            name: svc.name.clone(),
            state: svc.state.clone(),
            is_target: svc.is_target(),
        })
        .collect();
    RpcResponse::success(id, serde_json::to_value(services).unwrap())
}
async fn handle_status(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let dependencies: Vec<DependencyInfo> = graph
.dependencies(service_id)
.iter()
.filter_map(|(dep_id, dep_type)| {
graph.get(*dep_id).map(|dep| {
let satisfied = match dep_type {
DepType::Requires => dep.state.is_satisfied(),
DepType::After => !matches!(
dep.state,
ServiceState::Inactive | ServiceState::Blocked { .. }
),
DepType::Wants => true,
DepType::Conflicts => !dep.state.is_active(),
};
DependencyInfo {
name: dep.name.clone(),
dep_type: *dep_type,
state: dep.state.clone(),
satisfied,
}
})
})
.collect();
let status = LegacyServiceStatus {
name: service.name.clone(),
state: service.state.clone(),
is_target: service.is_target(),
dependencies,
uptime_secs: None,
};
RpcResponse::success(id, serde_json::to_value(status).unwrap())
}
async fn handle_get_config(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
match service.service_config() {
Some(config) => RpcResponse::success(id, serde_json::to_value(config).unwrap()),
None => {
RpcResponse::success(
id,
serde_json::json!({
"name": service.name,
"is_target": true
}),
)
}
}
}
/// Samples a single process with sysinfo and returns `(memory, cpu_percent)`.
///
/// Memory is whatever sysinfo's `Process::memory()` reports. It is `0` when the process
/// cannot be found. `cpu_percent` is always `0.0`: only one refresh is taken here, and no
/// CPU value is computed.
fn get_process_stats(pid: u32) -> (u64, f32) {
    use sysinfo::{Pid, ProcessesToUpdate, System};

    let target = Pid::from_u32(pid);
    let mut sys = System::new();
    sys.refresh_processes(ProcessesToUpdate::Some(&[target]), true);
    let memory = sys.process(target).map_or(0, |proc_info| proc_info.memory());
    (memory, 0.0)
}
/// Handles `service.is_running`: returns a JSON boolean that is `true` only when the named
/// service exists and is in the `Running` state.
///
/// An unknown service yields `false`, not an error. A missing `name` yields `INVALID_PARAMS`.
async fn handle_is_running(
    id: u64,
    params: &serde_json::Value,
    graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
    let Some(name) = params.get("name").and_then(|v| v.as_str()) else {
        return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
    };
    let guard = graph.read().await;
    let running = guard
        .get_by_name(name)
        .and_then(|sid| guard.get(sid))
        .is_some_and(|svc| matches!(svc.state, ServiceState::Running { .. }));
    RpcResponse::success(id, serde_json::json!(running))
}
async fn handle_why(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let blocked = matches!(service.state, ServiceState::Blocked { .. });
let (waiting_on, conflicts_with) = match graph.can_start(service_id) {
Ok(()) => (vec![], vec![]),
Err(reason) => (reason.waiting_on(), reason.conflicts_with()),
};
let ascii = graph
.format_why_blocked(name)
.unwrap_or_else(|| "Unknown service".to_string());
let why = WhyBlocked {
name: name.to_string(),
blocked,
waiting_on,
conflicts_with,
ascii,
};
RpcResponse::success(id, serde_json::to_value(why).unwrap())
}
/// Handles `service.tree`: returns the graph's ASCII dependency tree wrapped in a
/// [`TreeResponse`].
async fn handle_tree(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let ascii = graph.read().await.format_tree();
    RpcResponse::success(id, serde_json::to_value(TreeResponse { ascii }).unwrap())
}
async fn handle_start(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let _ = command_tx
.send(IpcCommand::StartService {
name: name.to_string(),
})
.await;
RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap())
}
async fn handle_stop(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let _ = command_tx
.send(IpcCommand::StopService {
name: name.to_string(),
})
.await;
RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap())
}
async fn handle_restart(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let _ = command_tx
.send(IpcCommand::RestartService {
name: name.to_string(),
})
.await;
RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap())
}
async fn handle_kill(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let signal = params
.get("signal")
.and_then(|v| v.as_str())
.map(String::from);
let _ = command_tx
.send(IpcCommand::KillService {
name: name.to_string(),
signal,
})
.await;
RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap())
}
async fn handle_add(
id: u64,
params: &serde_json::Value,
config_dir: &Path,
graph: &Arc<RwLock<ServiceGraph>>,
command_tx: &mpsc::Sender<IpcCommand>,
persist_override: Option<bool>,
) -> RpcResponse {
let add_params: AddServiceParams = match serde_json::from_value(params.clone()) {
Ok(p) => p,
Err(e) => return RpcResponse::error(id, error_codes::INVALID_PARAMS, e.to_string()),
};
let config = add_params.config;
let persist = persist_override.unwrap_or(add_params.persist);
let name = config.service.name.clone();
let errors = crate::sdk::validate::validate_service(&config);
if !errors.is_empty() {
return RpcResponse::error_with_data(
id,
error_codes::VALIDATION_FAILED,
"Validation failed",
serde_json::json!({ "errors": errors }),
);
}
{
let graph = graph.read().await;
if graph.get_by_name(&name).is_some() {
return RpcResponse::error(
id,
error_codes::SERVICE_EXISTS,
format!("Service '{}' already exists", name),
);
}
let all_deps = config
.dependencies
.after
.iter()
.chain(config.dependencies.requires.iter())
.chain(config.dependencies.wants.iter())
.chain(config.dependencies.conflicts.iter());
for dep in all_deps {
if graph.get_by_name(dep).is_none() {
return RpcResponse::error(
id,
error_codes::DEPENDENCY_MISSING,
format!("Dependency '{}' not found", dep),
);
}
}
}
let exec_path = config
.service
.exec
.split_whitespace()
.next()
.unwrap_or(&config.service.exec);
if !std::path::Path::new(exec_path).exists() {
return RpcResponse::error(
id,
error_codes::EXEC_NOT_FOUND,
format!("Executable not found: {}", exec_path),
);
}
let path = if persist {
let file_path = config_dir.join(format!("{}.toml", name));
match toml::to_string_pretty(&config) {
Ok(toml_str) => match std::fs::write(&file_path, toml_str) {
Ok(()) => Some(file_path.display().to_string()),
Err(e) => {
return RpcResponse::error(
id,
error_codes::PERSIST_FAILED,
format!("Failed to write {}: {}", file_path.display(), e),
);
}
},
Err(e) => {
return RpcResponse::error(
id,
error_codes::PERSIST_FAILED,
format!("Failed to serialize config: {}", e),
);
}
}
} else {
None
};
let _ = command_tx
.send(IpcCommand::AddService { config, persist })
.await;
let result = AddServiceResult {
name,
path,
warnings: vec![],
};
RpcResponse::success(id, serde_json::to_value(result).unwrap())
}
async fn handle_delete(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let _ = command_tx
.send(IpcCommand::RemoveService {
name: name.to_string(),
})
.await;
RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap())
}
async fn handle_reload(id: u64, command_tx: &mpsc::Sender<IpcCommand>) -> RpcResponse {
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::Reload { response_tx })
.await
.is_err()
{
return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
}
match response_rx.await {
Ok(Ok(result)) => RpcResponse::success(id, serde_json::to_value(result).unwrap()),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, "reload response dropped"),
}
}
async fn handle_reload_service(
id: u64,
params: &serde_json::Value,
config_dir: &Path,
graph: &Arc<RwLock<ServiceGraph>>,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let config_path = config_dir.join(format!("{}.toml", name));
if !config_path.exists() {
return RpcResponse::error(
id,
error_codes::SERVICE_NOT_FOUND,
format!("Config file not found: {}", config_path.display()),
);
}
let content = match std::fs::read_to_string(&config_path) {
Ok(c) => c,
Err(e) => {
return RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
format!("Failed to read config: {}", e),
);
}
};
let new_config: ServiceConfig = match toml::from_str(&content) {
Ok(c) => c,
Err(e) => {
return RpcResponse::error(
id,
error_codes::INVALID_PARAMS,
format!("Failed to parse config: {}", e),
);
}
};
if new_config.service.name != name {
return RpcResponse::error(
id,
error_codes::INVALID_PARAMS,
format!(
"Config name '{}' doesn't match filename '{}'",
new_config.service.name, name
),
);
}
let was_running = {
let graph = graph.read().await;
match graph.get_by_name(name) {
Some(service_id) => {
let service = graph.get(service_id).unwrap();
service.state.is_active()
}
None => {
return RpcResponse::error(
id,
error_codes::SERVICE_NOT_FOUND,
"service not in graph",
);
}
}
};
let _ = command_tx
.send(IpcCommand::RemoveService {
name: name.to_string(),
})
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let _ = command_tx
.send(IpcCommand::AddService {
config: new_config,
persist: false, })
.await;
if was_running {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let _ = command_tx
.send(IpcCommand::StartService {
name: name.to_string(),
})
.await;
}
RpcResponse::success(
id,
serde_json::json!({
"reloaded": name,
"restarted": was_running
}),
)
}
async fn handle_start_all(
id: u64,
graph: &Arc<RwLock<ServiceGraph>>,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let services_to_start: Vec<String> = {
let graph = graph.read().await;
graph
.all_services()
.filter_map(|sid| {
let service = graph.get(sid)?;
if service.is_protected() {
return None;
}
if service.state.is_active() {
return None;
}
Some(service.name.clone())
})
.collect()
};
let count = services_to_start.len();
for name in &services_to_start {
let _ = command_tx
.send(IpcCommand::StartService { name: name.clone() })
.await;
}
RpcResponse::success(
id,
serde_json::json!({
"started": services_to_start,
"count": count
}),
)
}
async fn handle_stop_all(
id: u64,
graph: &Arc<RwLock<ServiceGraph>>,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let services_to_stop: Vec<String> = {
let graph = graph.read().await;
graph
.shutdown_order()
.into_iter()
.filter_map(|sid| {
let service = graph.get(sid)?;
if service.is_protected() {
return None;
}
if !service.state.is_active() && service.state.pid().is_none() {
return None;
}
Some(service.name.clone())
})
.collect()
};
let count = services_to_stop.len();
for name in &services_to_stop {
let _ = command_tx
.send(IpcCommand::StopService { name: name.clone() })
.await;
}
RpcResponse::success(
id,
serde_json::json!({
"stopped": services_to_stop,
"count": count
}),
)
}
async fn handle_delete_all(
id: u64,
graph: &Arc<RwLock<ServiceGraph>>,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let services_to_delete: Vec<String> = {
let graph = graph.read().await;
graph
.shutdown_order()
.into_iter()
.filter_map(|sid| {
let service = graph.get(sid)?;
if service.is_protected() {
return None;
}
Some(service.name.clone())
})
.collect()
};
let count = services_to_delete.len();
for name in &services_to_delete {
let _ = command_tx
.send(IpcCommand::RemoveService { name: name.clone() })
.await;
}
RpcResponse::success(
id,
serde_json::json!({
"deleted": services_to_delete,
"count": count
}),
)
}
/// Returns a service's buffered log entries as structured JSON (used by `logs.tail`).
///
/// With `tail` set, a missing `lines` param defaults to 100. Otherwise a missing `lines`
/// is passed to `get_logs` as `None`. Errors: `INVALID_PARAMS` when `name` is missing;
/// `SERVICE_NOT_FOUND` when the service is unknown.
async fn handle_logs(
    id: u64,
    params: &serde_json::Value,
    graph: &Arc<RwLock<ServiceGraph>>,
    log_buffers: &LogBuffers,
    tail: bool,
) -> RpcResponse {
    let Some(name) = params.get("name").and_then(|v| v.as_str()) else {
        return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
    };
    let requested = params
        .get("lines")
        .and_then(|v| v.as_u64())
        .map(|n| n as usize);
    let lines = if tail { requested.or(Some(100)) } else { requested };
    let found = graph.read().await.get_by_name(name);
    let Some(service_id) = found else {
        return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found");
    };
    let entries = crate::server::log::get_logs(log_buffers, service_id, lines).await;
    RpcResponse::success(id, serde_json::to_value(entries).unwrap())
}
/// Handles `logs.filter`: returns every buffered log line for a service, formatted as
/// `"<timestamp_ms> [<stream>] <content>"`.
///
/// The service is taken from `service`, falling back to `name`. NOTE(review): despite the
/// method name, no filter criteria are read from `params` here.
/// Errors: `INVALID_PARAMS` when neither key holds a string; `SERVICE_NOT_FOUND` when the
/// service is unknown.
async fn handle_logs_filter(
    id: u64,
    params: &serde_json::Value,
    graph: &Arc<RwLock<ServiceGraph>>,
    log_buffers: &LogBuffers,
) -> RpcResponse {
    let Some(name) = params
        .get("service")
        .or_else(|| params.get("name"))
        .and_then(|v| v.as_str())
    else {
        return RpcResponse::error(
            id,
            error_codes::INVALID_PARAMS,
            "missing 'service' parameter",
        );
    };
    let found = graph.read().await.get_by_name(name);
    let Some(service_id) = found else {
        return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found");
    };
    let entries = crate::server::log::get_logs(log_buffers, service_id, None).await;
    let mut rendered: Vec<String> = Vec::with_capacity(entries.len());
    for entry in entries.iter() {
        rendered.push(format!(
            "{} [{}] {}",
            entry.timestamp_ms, entry.stream, entry.content
        ));
    }
    RpcResponse::success(id, serde_json::to_value(rendered).unwrap())
}
/// Handles `debug.state`: returns the graph's debug dump as `{ "output": <text> }`.
///
/// No timer information is available at this layer, so an empty timer map is passed to the
/// formatter.
async fn handle_debug_state(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let guard = graph.read().await;
    let no_timers = std::collections::HashMap::new();
    let dump = debug::format_graph_state(&guard, &no_timers);
    RpcResponse::success(id, serde_json::json!({ "output": dump }))
}
async fn handle_debug_process_tree(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = match graph.get(service_id) {
Some(s) => s,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let pid = match service.state.pid() {
Some(p) if p > 0 => p,
_ => {
return RpcResponse::success(
id,
serde_json::json!({ "output": format!("{} is not running", name) }),
);
}
};
let output = debug::format_process_tree(pid, name);
RpcResponse::success(id, serde_json::json!({ "output": output }))
}
async fn handle_xinet_register(
id: u64,
params: &serde_json::Value,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let config: XinetConfig = match serde_json::from_value(params.clone()) {
Ok(c) => c,
Err(e) => {
return RpcResponse::error(
id,
error_codes::INVALID_PARAMS,
format!("Invalid xinet config: {}", e),
);
}
};
match manager.register(config).await {
Ok(()) => RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap()),
Err(e) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
}
}
async fn handle_xinet_unregister(
id: u64,
params: &serde_json::Value,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
match manager.unregister(name).await {
Ok(()) => RpcResponse::success(id, serde_json::to_value(OkResponse::default()).unwrap()),
Err(e) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
}
}
async fn handle_xinet_list(id: u64, xinet_manager: &Option<Arc<XinetManager>>) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let names = manager.list().await;
RpcResponse::success(id, serde_json::to_value(names).unwrap())
}
async fn handle_xinet_status_all(
id: u64,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let statuses = manager.status_all().await;
RpcResponse::success(id, serde_json::to_value(statuses).unwrap())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sdk::{DependencyDef, LifecycleDef, LoggingDef, ServiceConfig, ServiceDef};
    use crate::server::graph::{Service, ServiceGraph};
    use std::collections::HashMap;

    /// Builds a minimal service config named `name` with the default `ServiceClass`.
    fn make_test_config(name: &str) -> ServiceConfig {
        ServiceConfig {
            service: ServiceDef {
                name: name.to_string(),
                exec: "/bin/test".to_string(),
                dir: None,
                oneshot: false,
                env: HashMap::new(),
                status: crate::sdk::Status::default(),
                class: crate::sdk::ServiceClass::default(),
                critical: false,
            },
            dependencies: DependencyDef::default(),
            lifecycle: LifecycleDef::default(),
            health: None,
            logging: LoggingDef::default(),
        }
    }

    /// Same as `make_test_config`, but with `ServiceClass::System`; the bulk
    /// start/stop/delete handlers are expected to skip such services.
    fn make_system_config(name: &str) -> ServiceConfig {
        let mut config = make_test_config(name);
        config.service.class = crate::sdk::ServiceClass::System;
        config
    }

    #[test]
    fn test_handle_ping() {
        let response = handle_ping(1);
        assert!(response.is_ok());
    }

    #[test]
    fn test_handle_discover() {
        let response = handle_discover(1);
        assert!(response.is_ok());
        let result = response
            .result
            .as_ref()
            .expect("Expected result in response");
        assert!(
            result.get("openrpc").is_some(),
            "should have openrpc version"
        );
        assert!(result.get("info").is_some(), "should have info section");
        assert!(result.get("methods").is_some(), "should have methods array");
        let info = result.get("info").unwrap();
        assert_eq!(
            info.get("title").and_then(|v| v.as_str()),
            Some("Zinit RPC API")
        );
        let methods = result.get("methods").unwrap().as_array().unwrap();
        assert!(!methods.is_empty(), "should have methods defined");
        let has_discover = methods
            .iter()
            .any(|m| m.get("name").and_then(|n| n.as_str()) == Some("rpc.discover"));
        assert!(has_discover, "should include rpc.discover method");
    }

    #[tokio::test]
    async fn test_handle_is_running_not_found() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let params = serde_json::json!({"name": "nonexistent"});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(response.is_ok());
        assert_eq!(response.result, Some(serde_json::json!(false)));
    }

    #[tokio::test]
    async fn test_handle_is_running_inactive() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        {
            let mut g = graph.write().await;
            let service = Service::from_service(make_test_config("test-service"));
            g.add_service(service).unwrap();
        }
        let params = serde_json::json!({"name": "test-service"});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(response.is_ok());
        assert_eq!(response.result, Some(serde_json::json!(false)));
    }

    #[tokio::test]
    async fn test_handle_is_running_running() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        {
            let mut g = graph.write().await;
            let service = Service::from_service(make_test_config("test-service"));
            let id = g.add_service(service).unwrap();
            g.set_state(id, ServiceState::Running { pid: 1234 });
        }
        let params = serde_json::json!({"name": "test-service"});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(response.is_ok());
        assert_eq!(response.result, Some(serde_json::json!(true)));
    }

    #[tokio::test]
    async fn test_handle_is_running_missing_param() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let params = serde_json::json!({});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(!response.is_ok());
    }

    #[tokio::test]
    async fn test_handle_start_all_skips_system() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let (tx, mut rx) = mpsc::channel(10);
        {
            let mut g = graph.write().await;
            let user_svc = Service::from_service(make_test_config("user-service"));
            let system_svc = Service::from_service(make_system_config("system-service"));
            g.add_service(user_svc).unwrap();
            g.add_service(system_svc).unwrap();
        }
        let response = handle_start_all(1, &graph, &tx).await;
        assert!(response.is_ok());
        let result = response.result.unwrap();
        let started: Vec<String> =
            serde_json::from_value(result.get("started").unwrap().clone()).unwrap();
        assert!(started.contains(&"user-service".to_string()));
        assert!(!started.contains(&"system-service".to_string()));
        assert_eq!(result.get("count").unwrap().as_u64().unwrap(), 1);
        let cmd = rx.recv().await.unwrap();
        match cmd {
            IpcCommand::StartService { name } => assert_eq!(name, "user-service"),
            _ => panic!("Expected StartService command"),
        }
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_handle_stop_all_skips_system() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let (tx, mut rx) = mpsc::channel(10);
        {
            let mut g = graph.write().await;
            let user_svc = Service::from_service(make_test_config("user-service"));
            let system_svc = Service::from_service(make_system_config("system-service"));
            let user_id = g.add_service(user_svc).unwrap();
            let system_id = g.add_service(system_svc).unwrap();
            g.set_state(user_id, ServiceState::Running { pid: 1001 });
            g.set_state(system_id, ServiceState::Running { pid: 1002 });
        }
        let response = handle_stop_all(1, &graph, &tx).await;
        assert!(response.is_ok());
        let result = response.result.unwrap();
        let stopped: Vec<String> =
            serde_json::from_value(result.get("stopped").unwrap().clone()).unwrap();
        assert!(stopped.contains(&"user-service".to_string()));
        assert!(!stopped.contains(&"system-service".to_string()));
        let cmd = rx.recv().await.unwrap();
        match cmd {
            IpcCommand::StopService { name } => assert_eq!(name, "user-service"),
            _ => panic!("Expected StopService command"),
        }
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_handle_delete_all_skips_system() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let (tx, mut rx) = mpsc::channel(10);
        {
            let mut g = graph.write().await;
            let user_svc = Service::from_service(make_test_config("user-service"));
            let system_svc = Service::from_service(make_system_config("system-service"));
            g.add_service(user_svc).unwrap();
            g.add_service(system_svc).unwrap();
        }
        let response = handle_delete_all(1, &graph, &tx).await;
        assert!(response.is_ok());
        let result = response.result.unwrap();
        let deleted: Vec<String> =
            serde_json::from_value(result.get("deleted").unwrap().clone()).unwrap();
        assert!(deleted.contains(&"user-service".to_string()));
        assert!(!deleted.contains(&"system-service".to_string()));
        let cmd = rx.recv().await.unwrap();
        match cmd {
            IpcCommand::RemoveService { name } => assert_eq!(name, "user-service"),
            _ => panic!("Expected RemoveService command"),
        }
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_handle_start_all_empty_when_all_system() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let (tx, mut rx) = mpsc::channel(10);
        {
            let mut g = graph.write().await;
            let system_svc = Service::from_service(make_system_config("system-service"));
            g.add_service(system_svc).unwrap();
        }
        let response = handle_start_all(1, &graph, &tx).await;
        assert!(response.is_ok());
        let result = response.result.unwrap();
        assert_eq!(result.get("count").unwrap().as_u64().unwrap(), 0);
        assert!(rx.try_recv().is_err());
    }
}