use std::net::SocketAddr;
use std::path::Path;
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::{TcpStream, UnixStream};
use super::config::ServiceConfig;
use super::protocol::{RpcError, RpcRequest, RpcResponse};
use super::responses::{
AddServiceResult, BulkDeleteResult, BulkStartResult, BulkStopResult, OkResponse, PingResponse,
PrepareRestartResult, ReloadResult, ServiceStats, ServiceStatus, WhyBlocked,
};
use super::socket;
use super::xinet::{ProxyStatus, XinetConfig};
/// Errors returned by the async client's methods.
///
/// Each variant wraps its source error and is created automatically through
/// `From`, so `?` can be used on I/O, JSON and RPC results alike.
#[derive(Debug, thiserror::Error)]
pub enum AsyncClientError {
/// Transport-level failure, or an invalid connection URI (reported as `InvalidInput`).
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// A request could not be serialized or a response could not be parsed as JSON.
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
/// An `RpcError` produced while converting a response into its result.
#[error("RPC error: {0}")]
Rpc(#[from] RpcError),
}
/// The connected stream, stored as separately buffered read and write halves.
///
/// There is one variant per socket kind because the owned halves of Unix and
/// TCP streams are distinct types.
enum Transport {
/// Halves obtained from a `UnixStream`.
Unix {
reader: BufReader<tokio::net::unix::OwnedReadHalf>,
writer: BufWriter<tokio::net::unix::OwnedWriteHalf>,
},
/// Halves obtained from a `TcpStream`.
Tcp {
reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
writer: BufWriter<tokio::net::tcp::OwnedWriteHalf>,
},
}
/// Asynchronous RPC client that exchanges newline-delimited JSON messages
/// over a Unix or TCP stream.
pub struct AsyncZinitClient {
// Buffered connection used for every request/response exchange.
transport: Transport,
// Id assigned to the next outgoing request; starts at 1 and is incremented per call.
next_id: u64,
}
impl AsyncZinitClient {
/// Connect to the Unix domain socket at `path`.
///
/// The stream is split into owned halves, each wrapped in a buffer, and the
/// request id counter starts at 1.
///
/// # Errors
///
/// Returns [`AsyncClientError::Io`] if the socket cannot be connected.
pub async fn connect_unix(path: impl AsRef<Path>) -> Result<Self, AsyncClientError> {
    let (rx, tx) = UnixStream::connect(path.as_ref()).await?.into_split();
    let transport = Transport::Unix {
        reader: BufReader::new(rx),
        writer: BufWriter::new(tx),
    };
    Ok(Self {
        transport,
        next_id: 1,
    })
}
/// Connect to the TCP endpoint at `addr`.
///
/// The stream is split into owned halves, each wrapped in a buffer, and the
/// request id counter starts at 1.
///
/// # Errors
///
/// Returns [`AsyncClientError::Io`] if the connection cannot be established.
pub async fn connect_tcp(addr: SocketAddr) -> Result<Self, AsyncClientError> {
    let (rx, tx) = TcpStream::connect(addr).await?.into_split();
    let transport = Transport::Tcp {
        reader: BufReader::new(rx),
        writer: BufWriter::new(tx),
    };
    Ok(Self {
        transport,
        next_id: 1,
    })
}
/// Connect over a Unix socket at the path returned by `socket::default_path()`.
///
/// # Errors
///
/// Same as [`Self::connect_unix`].
pub async fn connect_default() -> Result<Self, AsyncClientError> {
    let default_socket = socket::default_path();
    Self::connect_unix(default_socket).await
}
/// Connect to the endpoint described by `uri`.
///
/// Accepted forms, checked in this order:
/// - `unix:<path>`: Unix socket at everything after the `unix:` prefix.
/// - `tcp://<ip>:<port>`: TCP; the remainder must parse as a `SocketAddr`.
/// - any string starting with `/`: treated as a Unix socket path.
///
/// # Errors
///
/// Returns [`AsyncClientError::Io`] with kind `InvalidInput` when the TCP
/// address does not parse or the URI matches none of the forms above, and
/// otherwise whatever the selected connect function returns.
pub async fn connect(uri: &str) -> Result<Self, AsyncClientError> {
    if let Some(path) = uri.strip_prefix("unix:") {
        return Self::connect_unix(path).await;
    }

    if let Some(addr_str) = uri.strip_prefix("tcp://") {
        let addr = match addr_str.parse::<SocketAddr>() {
            Ok(parsed) => parsed,
            Err(e) => {
                let invalid = std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!("Invalid TCP address: {}", e),
                );
                return Err(AsyncClientError::Io(invalid));
            }
        };
        return Self::connect_tcp(addr).await;
    }

    if uri.starts_with('/') {
        return Self::connect_unix(uri).await;
    }

    let unknown = std::io::Error::new(
        std::io::ErrorKind::InvalidInput,
        format!("Unknown URI scheme: {}", uri),
    );
    Err(AsyncClientError::Io(unknown))
}
/// Send one RPC request and return the raw response.
///
/// The request is serialized as a single JSON document terminated by `\n`,
/// written and flushed to the transport, and then exactly one line is read
/// back and parsed as an [`RpcResponse`]. Each call consumes one request id.
///
/// # Errors
///
/// - [`AsyncClientError::Json`] if the request cannot be serialized or the
///   response line is not a valid `RpcResponse`.
/// - [`AsyncClientError::Io`] on any transport failure, including
///   `UnexpectedEof` when the peer closes the connection before sending a
///   response.
pub async fn call(
    &mut self,
    method: &str,
    params: Value,
) -> Result<RpcResponse, AsyncClientError> {
    let id = self.next_id;
    self.next_id += 1;

    let request = RpcRequest::new(id, method, params);
    let mut request_json = serde_json::to_string(&request)?;
    request_json.push('\n');

    // NOTE(review): the response is not checked against `id`; this assumes the
    // peer answers requests strictly in order on this connection. Confirm.
    let mut response_line = String::new();
    let bytes_read = match &mut self.transport {
        Transport::Unix { reader, writer } => {
            writer.write_all(request_json.as_bytes()).await?;
            writer.flush().await?;
            reader.read_line(&mut response_line).await?
        }
        Transport::Tcp { reader, writer } => {
            writer.write_all(request_json.as_bytes()).await?;
            writer.flush().await?;
            reader.read_line(&mut response_line).await?
        }
    };

    // `read_line` yields 0 only at end of stream. Report that as an I/O error
    // instead of letting the empty string surface as a confusing JSON error.
    if bytes_read == 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "connection closed before a response was received",
        )
        .into());
    }

    let response: RpcResponse = serde_json::from_str(&response_line)?;
    Ok(response)
}
pub async fn ping(&mut self) -> Result<String, AsyncClientError> {
let response = self.call("system.ping", Value::Null).await?;
let ping: PingResponse = response.into_result()?;
Ok(ping.version)
}
/// Call `system.shutdown` and wait for its `OkResponse` acknowledgement.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn shutdown(&mut self) -> Result<(), AsyncClientError> {
    let reply = self.call("system.shutdown", Value::Null).await?;
    reply
        .into_result()
        .map(|_: OkResponse| ())
        .map_err(Into::into)
}
/// Call `system.prepare_restart` and return the decoded result.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn prepare_restart(&mut self) -> Result<PrepareRestartResult, AsyncClientError> {
    let reply = self.call("system.prepare_restart", Value::Null).await?;
    let outcome: PrepareRestartResult = reply.into_result()?;
    Ok(outcome)
}
/// Call `service.list` and return the decoded list of strings.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn list(&mut self) -> Result<Vec<String>, AsyncClientError> {
    let reply = self.call("service.list", Value::Null).await?;
    let names: Vec<String> = reply.into_result()?;
    Ok(names)
}
/// Call `service.status` for `name` and return the decoded status.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn status(&mut self, name: &str) -> Result<ServiceStatus, AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("service.status", params).await?;
    let status: ServiceStatus = reply.into_result()?;
    Ok(status)
}
/// Call `service.start` for `name` and wait for the `OkResponse` acknowledgement.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn start(&mut self, name: &str) -> Result<(), AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("service.start", params).await?;
    reply
        .into_result()
        .map(|_: OkResponse| ())
        .map_err(Into::into)
}
/// Call `service.stop` for `name` and wait for the `OkResponse` acknowledgement.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn stop(&mut self, name: &str) -> Result<(), AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("service.stop", params).await?;
    reply
        .into_result()
        .map(|_: OkResponse| ())
        .map_err(Into::into)
}
/// Call `service.restart` for `name` and wait for the `OkResponse` acknowledgement.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn restart(&mut self, name: &str) -> Result<(), AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("service.restart", params).await?;
    reply
        .into_result()
        .map(|_: OkResponse| ())
        .map_err(Into::into)
}
pub async fn kill(&mut self, name: &str, signal: Option<&str>) -> Result<(), AsyncClientError> {
let params = match signal {
Some(sig) => serde_json::json!({ "name": name, "signal": sig }),
None => serde_json::json!({ "name": name }),
};
let response = self.call("service.kill", params).await?;
let _: OkResponse = response.into_result()?;
Ok(())
}
/// Call `service.why` for `name` and return the decoded `WhyBlocked` result.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn why(&mut self, name: &str) -> Result<WhyBlocked, AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("service.why", params).await?;
    let explanation: WhyBlocked = reply.into_result()?;
    Ok(explanation)
}
/// Call `service.tree` and return the `ascii` field of the reply.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn tree(&mut self) -> Result<String, AsyncClientError> {
    let reply = self.call("service.tree", Value::Null).await?;
    reply
        .into_result()
        .map(|tree: super::responses::TreeResponse| tree.ascii)
        .map_err(Into::into)
}
/// Call `service.add` with `config` and the `persist` flag.
///
/// Both values are sent as the `config` and `persist` request parameters.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn add_service(
    &mut self,
    config: &ServiceConfig,
    persist: bool,
) -> Result<AddServiceResult, AsyncClientError> {
    let params = serde_json::json!({
        "config": config,
        "persist": persist
    });
    let reply = self.call("service.add", params).await?;
    let added: AddServiceResult = reply.into_result()?;
    Ok(added)
}
/// Shorthand for [`Self::add_service`] with `persist` set to `false`.
///
/// # Errors
///
/// Same as [`Self::add_service`].
pub async fn add(
    &mut self,
    config: ServiceConfig,
) -> Result<AddServiceResult, AsyncClientError> {
    let persist = false;
    self.add_service(&config, persist).await
}
/// Call `service.remove` for `name` and wait for the `OkResponse` acknowledgement.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn remove(&mut self, name: &str) -> Result<(), AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("service.remove", params).await?;
    reply
        .into_result()
        .map(|_: OkResponse| ())
        .map_err(Into::into)
}
/// Call `service.reload` and return the decoded result.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn reload(&mut self) -> Result<ReloadResult, AsyncClientError> {
    let reply = self.call("service.reload", Value::Null).await?;
    let outcome: ReloadResult = reply.into_result()?;
    Ok(outcome)
}
pub async fn logs(
&mut self,
name: &str,
lines: Option<usize>,
) -> Result<Vec<String>, AsyncClientError> {
let params = match lines {
Some(n) => serde_json::json!({ "name": name, "lines": n }),
None => serde_json::json!({ "name": name }),
};
let response = self.call("logs.get", params).await?;
response.into_result().map_err(Into::into)
}
/// Call `service.start_all` and return the decoded result.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn start_all(&mut self) -> Result<BulkStartResult, AsyncClientError> {
    let reply = self.call("service.start_all", Value::Null).await?;
    let outcome: BulkStartResult = reply.into_result()?;
    Ok(outcome)
}
/// Call `service.stop_all` and return the decoded result.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn stop_all(&mut self) -> Result<BulkStopResult, AsyncClientError> {
    let reply = self.call("service.stop_all", Value::Null).await?;
    let outcome: BulkStopResult = reply.into_result()?;
    Ok(outcome)
}
/// Call `service.delete_all` and return the decoded result.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn delete_all(&mut self) -> Result<BulkDeleteResult, AsyncClientError> {
    let reply = self.call("service.delete_all", Value::Null).await?;
    let outcome: BulkDeleteResult = reply.into_result()?;
    Ok(outcome)
}
/// Call `service.stats` for `name` and return the decoded statistics.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn stats(&mut self, name: &str) -> Result<ServiceStats, AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("service.stats", params).await?;
    let stats: ServiceStats = reply.into_result()?;
    Ok(stats)
}
/// Fetch the status of `name` and report whether its state is running.
///
/// This is [`Self::status`] followed by `state.is_running()` on the result.
///
/// # Errors
///
/// Same as [`Self::status`].
pub async fn is_running(&mut self, name: &str) -> Result<bool, AsyncClientError> {
    self.status(name)
        .await
        .map(|status| status.state.is_running())
}
/// Call `xinet.register`, sending the serialized `config` as the request parameters.
///
/// # Errors
///
/// Returns [`AsyncClientError::Json`] if `config` cannot be converted to a
/// JSON value; otherwise propagates failures from [`Self::call`] and from
/// decoding the result.
pub async fn xinet_register(&mut self, config: &XinetConfig) -> Result<(), AsyncClientError> {
    let params = serde_json::to_value(config)?;
    let reply = self.call("xinet.register", params).await?;
    reply
        .into_result()
        .map(|_: OkResponse| ())
        .map_err(Into::into)
}
/// Call `xinet.unregister` for `name` and wait for the `OkResponse` acknowledgement.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn xinet_unregister(&mut self, name: &str) -> Result<(), AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("xinet.unregister", params).await?;
    reply
        .into_result()
        .map(|_: OkResponse| ())
        .map_err(Into::into)
}
/// Call `xinet.list` and return the decoded list of strings.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn xinet_list(&mut self) -> Result<Vec<String>, AsyncClientError> {
    let reply = self.call("xinet.list", Value::Null).await?;
    let names: Vec<String> = reply.into_result()?;
    Ok(names)
}
/// Call `xinet.status` for `name` and return the decoded `ProxyStatus`.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn xinet_status(&mut self, name: &str) -> Result<ProxyStatus, AsyncClientError> {
    let params = serde_json::json!({ "name": name });
    let reply = self.call("xinet.status", params).await?;
    let status: ProxyStatus = reply.into_result()?;
    Ok(status)
}
/// Call `xinet.status_all` and return the decoded list of `ProxyStatus` values.
///
/// # Errors
///
/// Propagates failures from [`Self::call`] and from decoding the result.
pub async fn xinet_status_all(&mut self) -> Result<Vec<ProxyStatus>, AsyncClientError> {
    let reply = self.call("xinet.status_all", Value::Null).await?;
    let statuses: Vec<ProxyStatus> = reply.into_result()?;
    Ok(statuses)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sdk::protocol::error_codes;

    /// The `Display` output of the I/O and RPC variants carries the category prefix.
    #[test]
    fn test_async_client_error_display() {
        let missing_socket =
            std::io::Error::new(std::io::ErrorKind::NotFound, "socket not found");
        let rendered_io = AsyncClientError::Io(missing_socket).to_string();
        assert!(rendered_io.contains("IO error"));

        let service_missing = RpcError {
            code: error_codes::SERVICE_NOT_FOUND,
            message: "not found".to_string(),
            data: None,
        };
        let rendered_rpc = AsyncClientError::Rpc(service_missing).to_string();
        assert!(rendered_rpc.contains("RPC error"));
    }
}