use std::path::{Path, PathBuf};
use std::sync::Arc;
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{RwLock, mpsc, oneshot};
use crate::sdk::{
AddServiceResult, DepType, DependencyInfo, LegacyServiceStatus, OkResponse, PingResponse,
PrepareRestartResult, ReloadResult, RpcRequest, RpcResponse, ServiceConfig, ServiceInfo,
ServiceState, ServiceStats, ServiceStatus, TreeResponse, WhyBlocked, XinetConfig, XinetStatus,
error_codes,
};
fn success_response<T: Serialize>(id: u64, value: T) -> RpcResponse {
match serde_json::to_value(value) {
Ok(v) => RpcResponse::success(id, v),
Err(e) => {
tracing::error!(error = %e, "failed to serialize RPC response");
RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
format!("serialization failed: {}", e),
)
}
}
}
use super::xinet::XinetManager;
use super::debug;
use super::graph::ServiceGraph;
use super::log::LogBuffers;
/// Commands forwarded from the IPC request handlers to the supervisor over an `mpsc` channel.
///
/// Most variants carry a `response_tx` oneshot sender; the handlers in this file await the
/// matching receiver to learn the outcome of the command.
// NOTE(review): the lint is presumably silenced because `AddService` embeds a whole
// `ServiceConfig` — confirm before boxing the field or removing the allow.
#[allow(clippy::large_enum_variant)]
pub enum IpcCommand {
/// Sent by `service.start` with the name of the service to start.
StartService {
name: String,
response_tx: oneshot::Sender<Result<(), String>>,
},
/// Sent by `service.stop`, and by `service.set` before replacing an existing service.
StopService {
name: String,
response_tx: oneshot::Sender<Result<(), String>>,
},
/// Sent by `service.restart` with the name of the service to restart.
RestartService {
name: String,
response_tx: oneshot::Sender<Result<(), String>>,
},
/// Sent by `service.kill`; `signal` is the optional "signal" string from the request params.
KillService {
name: String,
signal: Option<String>,
response_tx: oneshot::Sender<Result<(), String>>,
},
/// Sent by `service.set` after the config file has been written (with `persist: false`).
AddService {
config: ServiceConfig,
persist: bool,
response_tx: oneshot::Sender<Result<(), String>>,
},
/// Sent by `service.delete` / `service.remove`, and by `service.set` when replacing a service.
RemoveService {
name: String,
response_tx: oneshot::Sender<Result<(), String>>,
},
/// Replies with a `ReloadResult`. Not constructed by any handler visible in this file.
Reload {
response_tx: oneshot::Sender<Result<ReloadResult, String>>,
},
/// Sent by `system.shutdown`; carries no response channel.
Shutdown,
/// Sent by `system.prepare_restart`; replies with a `PrepareRestartResult`.
PrepareRestart {
response_tx: oneshot::Sender<Result<PrepareRestartResult, String>>,
},
}
/// Manual `Debug` so the oneshot response channels are left out of the output.
impl std::fmt::Debug for IpcCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Renders a variant whose only printable field is the service name.
        fn name_only(
            f: &mut std::fmt::Formatter<'_>,
            variant: &str,
            name: &str,
        ) -> std::fmt::Result {
            f.debug_struct(variant).field("name", &name).finish()
        }

        match self {
            // Variants without printable payload are rendered as their bare name.
            Self::Reload { .. } => f.write_str("Reload"),
            Self::Shutdown => f.write_str("Shutdown"),
            Self::PrepareRestart { .. } => f.write_str("PrepareRestart"),
            Self::StartService { name, .. } => name_only(f, "StartService", name),
            Self::StopService { name, .. } => name_only(f, "StopService", name),
            Self::RestartService { name, .. } => name_only(f, "RestartService", name),
            Self::RemoveService { name, .. } => name_only(f, "RemoveService", name),
            Self::KillService { name, signal, .. } => {
                let mut out = f.debug_struct("KillService");
                out.field("name", name);
                out.field("signal", signal);
                out.finish()
            }
            Self::AddService {
                config, persist, ..
            } => {
                // Only the service name is shown, not the whole configuration.
                let mut out = f.debug_struct("AddService");
                out.field("config", &config.service.name);
                out.field("persist", persist);
                out.finish()
            }
        }
    }
}
/// Binds the IPC Unix socket at `socket_path` and serves connections forever.
///
/// Any stale entry at `socket_path` is removed first, the parent directory is created if
/// needed, and the socket is restricted to mode `0o660` after binding. Every accepted
/// connection is handled on its own spawned task; accept failures are logged and the loop
/// keeps running.
///
/// # Errors
///
/// Returns an I/O error if the stale socket cannot be removed, the parent directory cannot be
/// created, the bind fails, or the permissions cannot be set. Once listening, the function
/// does not return.
pub async fn run_ipc_server(
    socket_path: &Path,
    config_dir: PathBuf,
    graph: Arc<RwLock<ServiceGraph>>,
    log_buffers: LogBuffers,
    command_tx: mpsc::Sender<IpcCommand>,
    xinet_manager: Option<Arc<XinetManager>>,
) -> std::io::Result<()> {
    // Remove unconditionally and tolerate "not found": this avoids the check-then-remove race
    // and also clears dangling symlinks, which `Path::exists` reports as absent even though
    // they would make the bind below fail.
    match std::fs::remove_file(socket_path) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }
    if let Some(parent) = socket_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let listener = UnixListener::bind(socket_path)?;
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Owner and group may read/write; everyone else is denied.
        std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o660))?;
    }
    tracing::info!(path = %socket_path.display(), "IPC server listening");
    loop {
        match listener.accept().await {
            Ok((stream, _addr)) => {
                // Each connection task gets its own handles to the shared state.
                let config_dir = config_dir.clone();
                let graph = Arc::clone(&graph);
                let log_buffers = Arc::clone(&log_buffers);
                let command_tx = command_tx.clone();
                let xinet_manager = xinet_manager.clone();
                tokio::spawn(async move {
                    if let Err(e) = handle_connection(
                        stream,
                        config_dir,
                        graph,
                        log_buffers,
                        command_tx,
                        xinet_manager,
                    )
                    .await
                    {
                        tracing::debug!(error = %e, "connection handler error");
                    }
                });
            }
            Err(e) => {
                tracing::error!(error = %e, "failed to accept connection");
            }
        }
    }
}
async fn handle_connection(
stream: UnixStream,
config_dir: PathBuf,
graph: Arc<RwLock<ServiceGraph>>,
log_buffers: LogBuffers,
command_tx: mpsc::Sender<IpcCommand>,
xinet_manager: Option<Arc<XinetManager>>,
) -> std::io::Result<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
let bytes_read = reader.read_line(&mut line).await?;
if bytes_read == 0 {
break;
}
let response = match serde_json::from_str::<RpcRequest>(&line) {
Ok(request) => {
dispatch(
&request,
&config_dir,
&graph,
&log_buffers,
&command_tx,
&xinet_manager,
)
.await
}
Err(e) => RpcResponse::error(0, error_codes::PARSE_ERROR, e.to_string()),
};
let mut response_json = serde_json::to_string(&response)?;
response_json.push('\n');
writer.write_all(response_json.as_bytes()).await?;
writer.flush().await?;
}
Ok(())
}
/// Routes a parsed request to the handler registered for its `method` string.
///
/// Each handler receives the request id plus only the shared state it needs. Methods that are
/// not listed below are answered with a `METHOD_NOT_FOUND` error.
async fn dispatch(
request: &RpcRequest,
config_dir: &Path,
graph: &Arc<RwLock<ServiceGraph>>,
log_buffers: &LogBuffers,
command_tx: &mpsc::Sender<IpcCommand>,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let id = request.id;
match request.method.as_str() {
// Introspection and system-level methods.
"rpc.discover" => handle_discover(id),
"system.ping" => handle_ping(id),
"system.shutdown" => handle_shutdown(id, command_tx).await,
"system.reboot" => handle_reboot(id).await,
"system.prepare_restart" => handle_prepare_restart(id, command_tx).await,
// Service definition and read-only queries answered from the graph.
"service.set" => handle_set(id, &request.params, config_dir, graph, command_tx).await,
"service.list" => handle_list_simple(id, graph).await,
"service.status" => handle_status_simple(id, &request.params, graph).await,
"service.get" => handle_get_config(id, &request.params, graph).await,
"service.stats" => handle_stats_simple(id, &request.params, graph).await,
"service.children" => handle_children(id, &request.params, graph).await,
"service.list_full" => handle_list(id, graph).await,
"service.status_full" => handle_status(id, &request.params, graph).await,
"service.is_running" => handle_is_running(id, &request.params, graph).await,
"service.why" => handle_why(id, &request.params, graph).await,
"service.tree" => handle_tree(id, graph).await,
// Lifecycle commands forwarded to the supervisor through `command_tx`.
"service.start" => handle_start(id, &request.params, command_tx).await,
"service.stop" => handle_stop(id, &request.params, command_tx).await,
"service.restart" => handle_restart(id, &request.params, command_tx).await,
"service.kill" => handle_kill(id, &request.params, command_tx).await,
// `service.remove` is accepted as an alias of `service.delete`.
"service.delete" | "service.remove" => handle_delete(id, &request.params, command_tx).await,
// Log retrieval.
"logs.get" => handle_logs_simple(id, &request.params, graph, log_buffers).await,
"logs.tail" => handle_logs(id, &request.params, graph, log_buffers, true).await,
"logs.filter" => handle_logs_filter(id, &request.params, graph, log_buffers).await,
// Debug output.
"debug.state" => handle_debug_state(id, graph).await,
"debug.process_tree" => handle_debug_process_tree(id, &request.params, graph).await,
// xinet methods; each handler answers with an error when no manager is configured.
"xinet.set" => handle_xinet_set(id, &request.params, xinet_manager).await,
"xinet.delete" => handle_xinet_delete(id, &request.params, xinet_manager).await,
"xinet.list" => handle_xinet_list(id, xinet_manager).await,
"xinet.status" => handle_xinet_status_simple(id, &request.params, xinet_manager).await,
"xinet.status_all" => handle_xinet_status_all(id, xinet_manager).await,
_ => RpcResponse::error(id, error_codes::METHOD_NOT_FOUND, "unknown method"),
}
}
/// Raw text of the OpenRPC document, embedded at compile time and served by `rpc.discover`.
const OPENRPC_SPEC: &str = include_str!("../../docs/reference/openrpc.json");
fn handle_discover(id: u64) -> RpcResponse {
match serde_json::from_str::<serde_json::Value>(OPENRPC_SPEC) {
Ok(spec) => RpcResponse::success(id, spec),
Err(e) => RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
format!("Failed to parse OpenRPC spec: {}", e),
),
}
}
/// Answers `system.ping` with the crate version baked in at compile time.
fn handle_ping(id: u64) -> RpcResponse {
    let version = String::from(env!("CARGO_PKG_VERSION"));
    success_response(id, PingResponse { version })
}
async fn handle_reboot(id: u64) -> RpcResponse {
#[cfg(target_os = "linux")]
{
use nix::sys::reboot::{RebootMode, reboot};
if std::process::id() == 1 {
nix::unistd::sync();
match reboot(RebootMode::RB_AUTOBOOT) {
Err(e) => {
return RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
format!("Reboot failed: {}", e),
);
}
Ok(_) => {}
}
success_response(id, OkResponse::default())
} else {
RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
"Reboot only available when running as PID 1",
)
}
}
#[cfg(not(target_os = "linux"))]
{
RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
"Reboot only available on Linux",
)
}
}
async fn handle_shutdown(id: u64, command_tx: &mpsc::Sender<IpcCommand>) -> RpcResponse {
let _ = command_tx.send(IpcCommand::Shutdown).await;
success_response(id, OkResponse::default())
}
async fn handle_prepare_restart(id: u64, command_tx: &mpsc::Sender<IpcCommand>) -> RpcResponse {
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::PrepareRestart { response_tx })
.await
.is_err()
{
return RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
"Failed to send command to supervisor",
);
}
match response_rx.await {
Ok(Ok(result)) => success_response(id, result),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(
id,
error_codes::INTERNAL_ERROR,
"Supervisor dropped response channel",
),
}
}
/// Answers `service.list` with the names of all services currently in the graph.
async fn handle_list_simple(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let guard = graph.read().await;
    let mut names: Vec<String> = Vec::new();
    for sid in guard.all_services() {
        // Ids that no longer resolve to a service are skipped.
        if let Some(service) = guard.get(sid) {
            names.push(service.name.clone());
        }
    }
    success_response(id, names)
}
async fn handle_status_simple(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let status = ServiceStatus::from_state(service.name.clone(), &service.state);
success_response(id, status)
}
async fn handle_stats_simple(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let pid = service.state.pid().unwrap_or(0);
let (memory_bytes, cpu_percent) = if pid > 0 {
get_process_stats(pid)
} else {
(0, 0.0)
};
let stats = ServiceStats {
pid,
memory_bytes,
cpu_percent,
};
success_response(id, stats)
}
async fn handle_children(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
use crate::server::process::get_child_processes;
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let pid = service.state.pid().unwrap_or(0);
if pid == 0 {
return success_response(id, serde_json::json!({ "children": [] }));
}
let children = get_child_processes(pid);
success_response(id, serde_json::json!({ "children": children }))
}
/// Answers `logs.get` with the most recent log lines of a service, formatted as strings.
///
/// `lines` defaults to 100. When no `name` is supplied the result is an empty JSON array;
/// an unknown name yields `SERVICE_NOT_FOUND`.
async fn handle_logs_simple(
    id: u64,
    params: &serde_json::Value,
    graph: &Arc<RwLock<ServiceGraph>>,
    log_buffers: &LogBuffers,
) -> RpcResponse {
    let limit = match params.get("lines").and_then(|v| v.as_u64()) {
        Some(n) => n as usize,
        None => 100,
    };
    let Some(name) = params.get("name").and_then(|v| v.as_str()) else {
        return RpcResponse::success(id, serde_json::json!([]));
    };
    // The read lock is only held for the name lookup.
    let lookup = {
        let guard = graph.read().await;
        guard.get_by_name(name)
    };
    let Some(service_id) = lookup else {
        return RpcResponse::error(
            id,
            error_codes::SERVICE_NOT_FOUND,
            "service not found",
        );
    };
    let entries = crate::server::log::get_logs(log_buffers, service_id, Some(limit)).await;
    let rendered: Vec<String> = entries
        .iter()
        .map(|entry| entry.to_string_format())
        .collect();
    success_response(id, rendered)
}
async fn handle_xinet_status_simple(
id: u64,
params: &serde_json::Value,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
match manager.status(name).await {
Some(status) => {
let simple = XinetStatus {
name: status.name,
running: status.running,
connections: status.active_connections as u32,
};
success_response(id, simple)
}
None => RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "proxy not found"),
}
}
/// Answers `service.list_full` with name, state, and target flag for every service.
async fn handle_list(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let guard = graph.read().await;
    let mut services: Vec<ServiceInfo> = Vec::new();
    for sid in guard.all_services() {
        // Ids that no longer resolve to a service are skipped.
        let Some(entry) = guard.get(sid) else {
            continue;
        };
        services.push(ServiceInfo {
            name: entry.name.clone(),
            state: entry.state.clone(),
            is_target: entry.is_target(),
        });
    }
    success_response(id, services)
}
async fn handle_status(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let dependencies: Vec<DependencyInfo> = graph
.dependencies(service_id)
.iter()
.filter_map(|(dep_id, dep_type)| {
graph.get(*dep_id).map(|dep| {
let satisfied = match dep_type {
DepType::Requires => dep.state.is_satisfied(),
DepType::After => !matches!(
dep.state,
ServiceState::Inactive | ServiceState::Blocked { .. }
),
DepType::Wants => true,
DepType::Conflicts => !dep.state.is_active(),
};
DependencyInfo {
name: dep.name.clone(),
dep_type: *dep_type,
state: dep.state.clone(),
satisfied,
}
})
})
.collect();
let status = LegacyServiceStatus {
name: service.name.clone(),
state: service.state.clone(),
is_target: service.is_target(),
dependencies,
uptime_secs: None,
};
success_response(id, status)
}
async fn handle_get_config(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
match service.service_config() {
Some(config) => success_response(id, config),
None => {
RpcResponse::success(
id,
serde_json::json!({
"name": service.name,
"is_target": true
}),
)
}
}
}
/// Returns `(memory, cpu_percent)` for `pid` as reported by `sysinfo`.
///
/// The CPU figure is always `0.0`; only memory is sampled. A process that cannot be found
/// yields `(0, 0.0)`.
fn get_process_stats(pid: u32) -> (u64, f32) {
    use sysinfo::{Pid, ProcessesToUpdate, System};
    let target = Pid::from_u32(pid);
    let mut sys = System::new();
    // Refresh only the one process of interest.
    sys.refresh_processes(ProcessesToUpdate::Some(&[target]), true);
    let memory = sys.process(target).map_or(0, |proc_info| proc_info.memory());
    (memory, 0.0)
}
/// Answers `service.is_running` with a JSON boolean.
///
/// The result is `true` only when the named service is in the `Running` state; unknown
/// services yield `false` rather than an error. A missing `name` yields `INVALID_PARAMS`.
async fn handle_is_running(
    id: u64,
    params: &serde_json::Value,
    graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
    let Some(name) = params.get("name").and_then(|v| v.as_str()) else {
        return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
    };
    let guard = graph.read().await;
    let is_running = match guard.get_by_name(name).and_then(|sid| guard.get(sid)) {
        Some(service) => matches!(service.state, ServiceState::Running { .. }),
        None => false,
    };
    RpcResponse::success(id, serde_json::json!(is_running))
}
async fn handle_why(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = graph.get(service_id).unwrap();
let blocked = matches!(service.state, ServiceState::Blocked { .. });
let (waiting_on, conflicts_with, port_conflict) = match graph.can_start(service_id) {
Ok(()) => (vec![], vec![], None),
Err(reason) => {
let port_msg = match &reason {
crate::server::error::BlockedReason::PortConflict { ports, services } => {
Some(format!(
"port {} in use by service {}",
ports
.iter()
.map(|p| p.to_string())
.collect::<Vec<_>>()
.join(", "),
services.join(", ")
))
}
crate::server::error::BlockedReason::ExternalPortConflict {
port,
pid,
process_name,
} => {
let proc_info = match (pid, process_name) {
(Some(p), Some(name)) => format!("{} (PID {})", name, p),
(Some(p), None) => format!("PID {}", p),
(None, Some(name)) => name.clone(),
(None, None) => "unknown process".to_string(),
};
Some(format!(
"port {} in use by external process: {}",
port, proc_info
))
}
_ => None,
};
(reason.waiting_on(), reason.conflicts_with(), port_msg)
}
};
let ascii = graph
.format_why_blocked(name)
.unwrap_or_else(|| "Unknown service".to_string());
let why = WhyBlocked {
name: name.to_string(),
blocked,
waiting_on,
conflicts_with,
port_conflict,
process_conflict: None,
ascii,
};
success_response(id, why)
}
/// Answers `service.tree` with the graph's formatted tree wrapped in a `TreeResponse`.
async fn handle_tree(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let guard = graph.read().await;
    let ascii = guard.format_tree();
    success_response(id, TreeResponse { ascii })
}
async fn handle_start(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::StartService {
name: name.to_string(),
response_tx,
})
.await
.is_err()
{
return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
}
match response_rx.await {
Ok(Ok(())) => success_response(id, OkResponse::default()),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, "start response dropped"),
}
}
async fn handle_stop(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::StopService {
name: name.to_string(),
response_tx,
})
.await
.is_err()
{
return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
}
match response_rx.await {
Ok(Ok(())) => success_response(id, OkResponse::default()),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, "stop response dropped"),
}
}
async fn handle_restart(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::RestartService {
name: name.to_string(),
response_tx,
})
.await
.is_err()
{
return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
}
match response_rx.await {
Ok(Ok(())) => success_response(id, OkResponse::default()),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, "restart response dropped"),
}
}
async fn handle_kill(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let signal = params
.get("signal")
.and_then(|v| v.as_str())
.map(String::from);
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::KillService {
name: name.to_string(),
signal,
response_tx,
})
.await
.is_err()
{
return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
}
match response_rx.await {
Ok(Ok(())) => success_response(id, OkResponse::default()),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, "kill response dropped"),
}
}
/// Answers `service.set`: validates a service configuration, replaces any existing service
/// of the same name, writes `<config_dir>/<name>.toml`, and registers the service with the
/// supervisor.
///
/// The configuration is taken from `params["config"]` when present, otherwise from `params`
/// itself. Steps, in order:
///
/// 1. Deserialize and validate the config (`INVALID_PARAMS` / `VALIDATION_FAILED`).
/// 2. Check that the first word of `exec` exists, either as an absolute path or somewhere on
///    `PATH` (`EXEC_NOT_FOUND`).
/// 3. Check that every dependency other than the service itself is known
///    (`DEPENDENCY_MISSING`).
/// 4. If the service already exists, ask the supervisor to stop and then remove it; the
///    results of those two commands are ignored.
/// 5. Write the TOML file (`PERSIST_FAILED`).
/// 6. Send `AddService` with `persist: false` and relay the outcome.
///
/// The graph read lock is never held while waiting on the supervisor: the supervisor needs
/// to modify the same graph to carry out the commands, so holding a read guard across those
/// awaits could deadlock.
async fn handle_set(
    id: u64,
    params: &serde_json::Value,
    config_dir: &Path,
    graph: &Arc<RwLock<ServiceGraph>>,
    command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
    // Accept either `{ "config": {...} }` or the bare config object; clone only the one used.
    let raw_config = params.get("config").unwrap_or(params).clone();
    let config: ServiceConfig = match serde_json::from_value(raw_config) {
        Ok(c) => c,
        Err(e) => return RpcResponse::error(id, error_codes::INVALID_PARAMS, e.to_string()),
    };
    let name = config.service.name.clone();
    let errors = crate::sdk::validate::validate_service(&config);
    if !errors.is_empty() {
        return RpcResponse::error_with_data(
            id,
            error_codes::VALIDATION_FAILED,
            "Validation failed",
            serde_json::json!({ "errors": errors }),
        );
    }

    // The executable is the first whitespace-separated word of the command line.
    let exec_path = config
        .service
        .exec
        .split_whitespace()
        .next()
        .unwrap_or(&config.service.exec);
    let exec_exists = if exec_path.starts_with('/') {
        std::path::Path::new(exec_path).exists()
    } else {
        std::env::var("PATH")
            .unwrap_or_default()
            .split(':')
            .any(|dir| std::path::Path::new(dir).join(exec_path).exists())
    };
    if !exec_exists {
        return RpcResponse::error(
            id,
            error_codes::EXEC_NOT_FOUND,
            format!("Executable not found: {}", exec_path),
        );
    }

    // Every referenced dependency (other than a self-reference) must already be known.
    {
        let graph = graph.read().await;
        let all_deps = config
            .dependencies
            .after
            .iter()
            .chain(config.dependencies.requires.iter())
            .chain(config.dependencies.wants.iter())
            .chain(config.dependencies.conflicts.iter());
        for dep in all_deps {
            if dep != &name && graph.get_by_name(dep).is_none() {
                return RpcResponse::error(
                    id,
                    error_codes::DEPENDENCY_MISSING,
                    format!("Dependency '{}' not found", dep),
                );
            }
        }
    }

    // The guard is a temporary here, so it is released before any command is sent.
    let service_exists = graph.read().await.get_by_name(&name).is_some();
    if service_exists {
        let (response_tx, response_rx) = oneshot::channel();
        if command_tx
            .send(IpcCommand::StopService {
                name: name.clone(),
                response_tx,
            })
            .await
            .is_err()
        {
            return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
        }
        // Best effort: the stop result does not affect the replacement.
        let _ = response_rx.await;
    }

    // Re-check after the stop, again without keeping the lock across the supervisor round trip.
    let still_registered = graph.read().await.get_by_name(&name).is_some();
    if still_registered {
        let (response_tx, response_rx) = oneshot::channel();
        if command_tx
            .send(IpcCommand::RemoveService {
                name: name.clone(),
                response_tx,
            })
            .await
            .is_err()
        {
            return RpcResponse::error(
                id,
                error_codes::INTERNAL_ERROR,
                "supervisor not available",
            );
        }
        // Best effort: the remove result does not affect the replacement.
        let _ = response_rx.await;
    }

    // NOTE(review): `name` is interpolated into the file path; presumably `validate_service`
    // rejects names containing path separators — confirm.
    let file_path = config_dir.join(format!("{}.toml", name));
    let _ = std::fs::remove_file(&file_path);
    match toml::to_string_pretty(&config) {
        Ok(toml_str) => match std::fs::write(&file_path, toml_str) {
            Ok(()) => {}
            Err(e) => {
                return RpcResponse::error(
                    id,
                    error_codes::PERSIST_FAILED,
                    format!("Failed to write {}: {}", file_path.display(), e),
                );
            }
        },
        Err(e) => {
            return RpcResponse::error(
                id,
                error_codes::PERSIST_FAILED,
                format!("Failed to serialize config: {}", e),
            );
        }
    }

    // The file was written above, so the supervisor is told not to persist again.
    let (add_tx, add_rx) = oneshot::channel();
    if command_tx
        .send(IpcCommand::AddService {
            config,
            persist: false,
            response_tx: add_tx,
        })
        .await
        .is_err()
    {
        return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
    }
    match add_rx.await {
        Ok(Ok(())) => {
            let result = AddServiceResult {
                name,
                path: Some(file_path.display().to_string()),
                warnings: vec![],
            };
            success_response(id, result)
        }
        Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
        Err(_) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, "add response dropped"),
    }
}
async fn handle_delete(
id: u64,
params: &serde_json::Value,
command_tx: &mpsc::Sender<IpcCommand>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(IpcCommand::RemoveService {
name: name.to_string(),
response_tx,
})
.await
.is_err()
{
return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "supervisor not available");
}
match response_rx.await {
Ok(Ok(())) => success_response(id, OkResponse::default()),
Ok(Err(e)) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
Err(_) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, "remove response dropped"),
}
}
/// Answers a log request with the raw log entries of the named service.
///
/// `lines` limits how many entries are requested from the log buffer. When `tail` is true
/// and no `lines` is given, the limit defaults to 100; when `tail` is false the limit stays
/// unset. A missing `name` yields `INVALID_PARAMS`, an unknown one `SERVICE_NOT_FOUND`.
async fn handle_logs(
    id: u64,
    params: &serde_json::Value,
    graph: &Arc<RwLock<ServiceGraph>>,
    log_buffers: &LogBuffers,
    tail: bool,
) -> RpcResponse {
    let Some(name) = params.get("name").and_then(|v| v.as_str()) else {
        return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
    };
    let requested = params
        .get("lines")
        .and_then(|v| v.as_u64())
        .map(|n| n as usize);
    let lines = match (requested, tail) {
        (None, true) => Some(100),
        (explicit, _) => explicit,
    };
    // The read lock is only held for the name lookup.
    let lookup = {
        let guard = graph.read().await;
        guard.get_by_name(name)
    };
    let Some(service_id) = lookup else {
        return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found");
    };
    let entries = crate::server::log::get_logs(log_buffers, service_id, lines).await;
    success_response(id, entries)
}
/// Answers `logs.filter` with every buffered log entry of a service, each rendered as
/// `"<timestamp_ms> [<stream>] <content>"`.
///
/// The service is named by the `service` parameter, falling back to `name`. When neither is
/// a string the response is `INVALID_PARAMS`; an unknown service yields `SERVICE_NOT_FOUND`.
async fn handle_logs_filter(
    id: u64,
    params: &serde_json::Value,
    graph: &Arc<RwLock<ServiceGraph>>,
    log_buffers: &LogBuffers,
) -> RpcResponse {
    let selector = match params.get("service") {
        Some(value) => Some(value),
        None => params.get("name"),
    };
    let Some(name) = selector.and_then(|v| v.as_str()) else {
        return RpcResponse::error(
            id,
            error_codes::INVALID_PARAMS,
            "missing 'service' parameter",
        );
    };
    // The read lock is only held for the name lookup.
    let lookup = {
        let guard = graph.read().await;
        guard.get_by_name(name)
    };
    let Some(service_id) = lookup else {
        return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found");
    };
    let entries = crate::server::log::get_logs(log_buffers, service_id, None).await;
    let mut rendered: Vec<String> = Vec::new();
    for entry in entries.iter() {
        rendered.push(format!(
            "{} [{}] {}",
            entry.timestamp_ms, entry.stream, entry.content
        ));
    }
    success_response(id, rendered)
}
/// Answers `debug.state` with `{ "output": ... }` containing the formatted graph state.
///
/// The formatter is given an empty timer map.
async fn handle_debug_state(id: u64, graph: &Arc<RwLock<ServiceGraph>>) -> RpcResponse {
    let no_timers = std::collections::HashMap::new();
    let guard = graph.read().await;
    let rendered = debug::format_graph_state(&guard, &no_timers);
    RpcResponse::success(id, serde_json::json!({ "output": rendered }))
}
async fn handle_debug_process_tree(
id: u64,
params: &serde_json::Value,
graph: &Arc<RwLock<ServiceGraph>>,
) -> RpcResponse {
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
let graph = graph.read().await;
let service_id = match graph.get_by_name(name) {
Some(id) => id,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let service = match graph.get(service_id) {
Some(s) => s,
None => return RpcResponse::error(id, error_codes::SERVICE_NOT_FOUND, "service not found"),
};
let pid = match service.state.pid() {
Some(p) if p > 0 => p,
_ => {
return RpcResponse::success(
id,
serde_json::json!({ "output": format!("{} is not running", name) }),
);
}
};
let output = debug::format_process_tree(pid, name);
RpcResponse::success(id, serde_json::json!({ "output": output }))
}
async fn handle_xinet_set(
id: u64,
params: &serde_json::Value,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let config: XinetConfig = match serde_json::from_value(params.clone()) {
Ok(c) => c,
Err(e) => {
return RpcResponse::error(
id,
error_codes::INVALID_PARAMS,
format!("Invalid xinet config: {}", e),
);
}
};
match manager.register(config).await {
Ok(()) => success_response(id, OkResponse::default()),
Err(e) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
}
}
async fn handle_xinet_delete(
id: u64,
params: &serde_json::Value,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return RpcResponse::error(id, error_codes::INVALID_PARAMS, "missing 'name' parameter");
}
};
match manager.unregister(name).await {
Ok(()) => success_response(id, OkResponse::default()),
Err(e) => RpcResponse::error(id, error_codes::INTERNAL_ERROR, e),
}
}
async fn handle_xinet_list(id: u64, xinet_manager: &Option<Arc<XinetManager>>) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let names = manager.list().await;
success_response(id, names)
}
async fn handle_xinet_status_all(
id: u64,
xinet_manager: &Option<Arc<XinetManager>>,
) -> RpcResponse {
let manager = match xinet_manager {
Some(m) => m,
None => return RpcResponse::error(id, error_codes::INTERNAL_ERROR, "xinet not enabled"),
};
let statuses = manager.status_all().await;
success_response(id, statuses)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sdk::{DependencyDef, LifecycleDef, LoggingDef, ServiceConfig, ServiceDef};
    use crate::server::graph::{Service, ServiceGraph};
    use std::collections::HashMap;

    /// Builds a minimal service configuration named `name` with default settings.
    fn make_test_config(name: &str) -> ServiceConfig {
        ServiceConfig {
            service: ServiceDef {
                name: name.to_string(),
                exec: "/bin/test".to_string(),
                dir: None,
                oneshot: false,
                env: HashMap::new(),
                status: crate::sdk::Status::default(),
                class: crate::sdk::ServiceClass::default(),
                critical: false,
                ports: Vec::new(),
                kill_others: false,
                process_filters: Vec::new(),
            },
            dependencies: DependencyDef::default(),
            lifecycle: LifecycleDef::default(),
            health: None,
            logging: LoggingDef::default(),
        }
    }

    #[test]
    fn test_handle_ping() {
        let response = handle_ping(1);
        assert!(response.is_ok());
    }

    #[test]
    fn test_handle_discover() {
        let response = handle_discover(1);
        assert!(response.is_ok());
        if let Some(result) = &response.result {
            assert!(
                result.get("openrpc").is_some(),
                "should have openrpc version"
            );
            assert!(result.get("info").is_some(), "should have info section");
            assert!(result.get("methods").is_some(), "should have methods array");
            let info = result.get("info").unwrap();
            assert_eq!(
                info.get("title").and_then(|v| v.as_str()),
                Some("Zinit RPC API")
            );
            let methods = result.get("methods").unwrap().as_array().unwrap();
            assert!(!methods.is_empty(), "should have methods defined");
            let has_discover = methods
                .iter()
                .any(|m| m.get("name").and_then(|n| n.as_str()) == Some("rpc.discover"));
            assert!(has_discover, "should include rpc.discover method");
        } else {
            panic!("Expected result in response");
        }
    }

    /// An unknown service is reported as not running rather than as an error.
    #[tokio::test]
    async fn test_handle_is_running_not_found() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let params = serde_json::json!({"name": "nonexistent"});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(response.is_ok());
        assert_eq!(response.result, Some(serde_json::json!(false)));
    }

    /// A freshly added service that was never started is not running.
    #[tokio::test]
    async fn test_handle_is_running_inactive() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        {
            let mut g = graph.write().await;
            let service = Service::from_service(make_test_config("test-service"));
            g.add_service(service).unwrap();
        }
        let params = serde_json::json!({"name": "test-service"});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(response.is_ok());
        assert_eq!(response.result, Some(serde_json::json!(false)));
    }

    /// A service in the `Running` state is reported as running.
    #[tokio::test]
    async fn test_handle_is_running_running() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        {
            let mut g = graph.write().await;
            let service = Service::from_service(make_test_config("test-service"));
            let id = g.add_service(service).unwrap();
            g.set_state(id, ServiceState::Running { pid: 1234 });
        }
        let params = serde_json::json!({"name": "test-service"});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(response.is_ok());
        assert_eq!(response.result, Some(serde_json::json!(true)));
    }

    /// Omitting the `name` parameter is an error.
    #[tokio::test]
    async fn test_handle_is_running_missing_param() {
        let graph = Arc::new(RwLock::new(ServiceGraph::new()));
        let params = serde_json::json!({});
        let response = handle_is_running(1, &params, &graph).await;
        assert!(!response.is_ok());
    }
}