use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
use serde_json::Value;
use super::config::ServiceConfig;
use super::protocol::{RpcError, RpcRequest, RpcResponse};
use super::responses::{
AddServiceResult, BulkDeleteResult, BulkStartResult, BulkStopResult, OkResponse, PingResponse,
PrepareRestartResult, ReloadResult, ServiceStats, ServiceStatus, WhyBlocked,
};
use super::socket;
use super::xinet::{ProxyStatus, XinetConfig};
/// Errors returned by [`ZinitClient`] operations.
///
/// Each variant wraps an underlying error type and, through `#[from]`, can be
/// produced from it with the `?` operator.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
/// An I/O failure, e.g. while connecting to, writing to, or reading from the socket.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// A `serde_json` failure while serializing a request or parsing a response.
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
/// An [`RpcError`] value.
// NOTE(review): presumably this is the error reported by the server inside a
// response and surfaced by `RpcResponse::into_result` — confirm against `protocol`.
#[error("RPC error: {0}")]
Rpc(#[from] RpcError),
}
/// Blocking RPC client that talks to a server over a Unix domain socket.
///
/// Requests are written as one JSON document per line and each response is
/// read back as one line.
#[derive(Debug)]
pub struct ZinitClient {
/// Handle used to write requests.
stream: UnixStream,
/// Buffered reader over a `try_clone` of `stream`, used to read response lines.
reader: BufReader<UnixStream>,
/// Id assigned to the next outgoing request; starts at 1 and is incremented per call.
next_id: u64,
}
impl ZinitClient {
    /// Connects to the Unix domain socket at `path`.
    ///
    /// The connected stream is duplicated with `try_clone`: requests are written
    /// through one handle and responses are read through a buffered reader over
    /// the other. Request ids start at 1.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Io`] if connecting or cloning the stream fails.
    pub fn connect(path: &Path) -> Result<Self, ClientError> {
        let stream = UnixStream::connect(path)?;
        let reader = BufReader::new(stream.try_clone()?);
        Ok(Self {
            stream,
            reader,
            next_id: 1,
        })
    }

    /// Connects to the socket path returned by `socket::default_path()`.
    ///
    /// # Errors
    ///
    /// Same as [`ZinitClient::connect`].
    pub fn connect_default() -> Result<Self, ClientError> {
        Self::connect(&socket::default_path())
    }

    /// Sends one request and blocks until one response line has been read.
    ///
    /// The request is serialized to JSON, terminated with `\n` and written to
    /// the socket; the reply is the next line read from the socket, parsed as an
    /// [`RpcResponse`]. The response is returned undecoded — callers extract the
    /// payload with `into_result`.
    ///
    /// # Errors
    ///
    /// * [`ClientError::Io`] if writing or reading fails, or if the peer closes
    ///   the connection before sending any response bytes
    ///   (`ErrorKind::UnexpectedEof`).
    /// * [`ClientError::Json`] if the request cannot be serialized or the
    ///   response line is not a valid [`RpcResponse`].
    pub fn call(&mut self, method: &str, params: Value) -> Result<RpcResponse, ClientError> {
        let id = self.next_id;
        self.next_id += 1;

        let request = RpcRequest::new(id, method, params);
        let mut request_json = serde_json::to_string(&request)?;
        request_json.push('\n');
        self.stream.write_all(request_json.as_bytes())?;
        self.stream.flush()?;

        let mut response_line = String::new();
        let bytes_read = self.reader.read_line(&mut response_line)?;
        // `read_line` signals end-of-stream with `Ok(0)`. Report that as an I/O
        // error instead of handing an empty string to the JSON parser, which
        // would produce a misleading "EOF while parsing" JSON error.
        if bytes_read == 0 {
            return Err(ClientError::Io(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "connection closed before a response was received",
            )));
        }

        let response: RpcResponse = serde_json::from_str(&response_line)?;
        Ok(response)
    }

    /// Performs a call whose successful payload is an `OkResponse` and discards it.
    fn call_expect_ok(&mut self, method: &str, params: Value) -> Result<(), ClientError> {
        let response = self.call(method, params)?;
        let _: OkResponse = response.into_result()?;
        Ok(())
    }

    /// Calls `system.ping` and returns the `version` field of the reply.
    pub fn ping(&mut self) -> Result<String, ClientError> {
        let response = self.call("system.ping", Value::Null)?;
        let ping: PingResponse = response.into_result()?;
        Ok(ping.version)
    }

    /// Calls `system.shutdown` and expects an `OkResponse`.
    pub fn shutdown(&mut self) -> Result<(), ClientError> {
        self.call_expect_ok("system.shutdown", Value::Null)
    }

    /// Calls `system.prepare_restart` and returns the decoded result.
    pub fn prepare_restart(&mut self) -> Result<PrepareRestartResult, ClientError> {
        let response = self.call("system.prepare_restart", Value::Null)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.list` and returns the decoded list of strings.
    pub fn list(&mut self) -> Result<Vec<String>, ClientError> {
        let response = self.call("service.list", Value::Null)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.status` for `name` and returns the decoded status.
    pub fn status(&mut self, name: &str) -> Result<ServiceStatus, ClientError> {
        let response = self.call("service.status", serde_json::json!({ "name": name }))?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.start` for `name` and expects an `OkResponse`.
    pub fn start(&mut self, name: &str) -> Result<(), ClientError> {
        self.call_expect_ok("service.start", serde_json::json!({ "name": name }))
    }

    /// Calls `service.stop` for `name` and expects an `OkResponse`.
    pub fn stop(&mut self, name: &str) -> Result<(), ClientError> {
        self.call_expect_ok("service.stop", serde_json::json!({ "name": name }))
    }

    /// Calls `service.restart` for `name` and expects an `OkResponse`.
    pub fn restart(&mut self, name: &str) -> Result<(), ClientError> {
        self.call_expect_ok("service.restart", serde_json::json!({ "name": name }))
    }

    /// Calls `service.kill` for `name` and expects an `OkResponse`.
    ///
    /// The `signal` key is included in the request only when `signal` is `Some`.
    pub fn kill(&mut self, name: &str, signal: Option<&str>) -> Result<(), ClientError> {
        let params = match signal {
            Some(sig) => serde_json::json!({ "name": name, "signal": sig }),
            None => serde_json::json!({ "name": name }),
        };
        self.call_expect_ok("service.kill", params)
    }

    /// Calls `service.why` for `name` and returns the decoded result.
    pub fn why(&mut self, name: &str) -> Result<WhyBlocked, ClientError> {
        let response = self.call("service.why", serde_json::json!({ "name": name }))?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.tree` and returns the `ascii` field of the reply.
    pub fn tree(&mut self) -> Result<String, ClientError> {
        let response = self.call("service.tree", Value::Null)?;
        let tree: super::responses::TreeResponse = response.into_result()?;
        Ok(tree.ascii)
    }

    /// Calls `service.add` with the given `config` and `persist` flag.
    pub fn add_service(
        &mut self,
        config: &ServiceConfig,
        persist: bool,
    ) -> Result<AddServiceResult, ClientError> {
        let response = self.call(
            "service.add",
            serde_json::json!({
                "config": config,
                "persist": persist
            }),
        )?;
        response.into_result().map_err(Into::into)
    }

    /// Shorthand for [`ZinitClient::add_service`] with `persist` set to `false`.
    pub fn add(&mut self, config: ServiceConfig) -> Result<AddServiceResult, ClientError> {
        self.add_service(&config, false)
    }

    /// Calls `service.remove` for `name` and expects an `OkResponse`.
    pub fn remove(&mut self, name: &str) -> Result<(), ClientError> {
        self.call_expect_ok("service.remove", serde_json::json!({ "name": name }))
    }

    /// Calls `service.start_all` and returns the decoded result.
    pub fn start_all(&mut self) -> Result<BulkStartResult, ClientError> {
        let response = self.call("service.start_all", Value::Null)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.stop_all` and returns the decoded result.
    pub fn stop_all(&mut self) -> Result<BulkStopResult, ClientError> {
        let response = self.call("service.stop_all", Value::Null)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.delete_all` and returns the decoded result.
    pub fn delete_all(&mut self) -> Result<BulkDeleteResult, ClientError> {
        let response = self.call("service.delete_all", Value::Null)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.reload` and returns the decoded result.
    pub fn reload(&mut self) -> Result<ReloadResult, ClientError> {
        let response = self.call("service.reload", Value::Null)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `logs.get` for `name` and returns the decoded lines.
    ///
    /// The `lines` key is included in the request only when `lines` is `Some`.
    pub fn logs(&mut self, name: &str, lines: Option<usize>) -> Result<Vec<String>, ClientError> {
        let params = match lines {
            Some(n) => serde_json::json!({ "name": name, "lines": n }),
            None => serde_json::json!({ "name": name }),
        };
        let response = self.call("logs.get", params)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `service.stats` for `name` and returns the decoded statistics.
    pub fn stats(&mut self, name: &str) -> Result<ServiceStats, ClientError> {
        let response = self.call("service.stats", serde_json::json!({ "name": name }))?;
        response.into_result().map_err(Into::into)
    }

    /// Fetches the status of `name` and returns `status.state.is_running()`.
    pub fn is_running(&mut self, name: &str) -> Result<bool, ClientError> {
        let status = self.status(name)?;
        Ok(status.state.is_running())
    }

    /// Calls `xinet.register` with `config` serialized as the request parameters.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Json`] if `config` cannot be converted to a JSON
    /// value, in addition to the errors of [`ZinitClient::call`].
    pub fn xinet_register(&mut self, config: &XinetConfig) -> Result<(), ClientError> {
        let params = serde_json::to_value(config)?;
        self.call_expect_ok("xinet.register", params)
    }

    /// Calls `xinet.unregister` for `name` and expects an `OkResponse`.
    pub fn xinet_unregister(&mut self, name: &str) -> Result<(), ClientError> {
        self.call_expect_ok("xinet.unregister", serde_json::json!({ "name": name }))
    }

    /// Calls `xinet.list` and returns the decoded list of strings.
    pub fn xinet_list(&mut self) -> Result<Vec<String>, ClientError> {
        let response = self.call("xinet.list", Value::Null)?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `xinet.status` for `name` and returns the decoded status.
    pub fn xinet_status(&mut self, name: &str) -> Result<ProxyStatus, ClientError> {
        let response = self.call("xinet.status", serde_json::json!({ "name": name }))?;
        response.into_result().map_err(Into::into)
    }

    /// Calls `xinet.status_all` and returns the decoded list of statuses.
    pub fn xinet_status_all(&mut self) -> Result<Vec<ProxyStatus>, ClientError> {
        let response = self.call("xinet.status_all", Value::Null)?;
        response.into_result().map_err(Into::into)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sdk::protocol::error_codes;

    /// The `Display` output of each wrapped error carries its variant prefix.
    #[test]
    fn test_client_error_display() {
        let source = std::io::Error::new(std::io::ErrorKind::NotFound, "socket not found");
        let rendered_io = ClientError::Io(source).to_string();
        assert!(rendered_io.contains("IO error"));

        let rpc = RpcError {
            code: error_codes::SERVICE_NOT_FOUND,
            message: "not found".to_string(),
            data: None,
        };
        let rendered_rpc = ClientError::Rpc(rpc).to_string();
        assert!(rendered_rpc.contains("RPC error"));
    }

    /// Connecting to a path that does not exist fails with an I/O error.
    #[test]
    fn test_connect_nonexistent_socket() {
        let missing = Path::new("/nonexistent/socket.sock");
        assert!(matches!(
            ZinitClient::connect(missing),
            Err(ClientError::Io(_))
        ));
    }
}