use crate::error::{ClientError, ClientResult};
use crate::protocol::{
cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
RequestExt, Response, ResponseExt, WelcomeMessage,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::UnixStream;
/// Timeout applied by `AimxClient::connect` while establishing the connection
/// to the server socket (five seconds).
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
/// Client for a server reachable over a Unix domain socket.
///
/// Messages are exchanged one per line: requests are written through the write
/// half of the socket and replies/events are read line-by-line from the
/// buffered read half.
pub struct AimxClient {
/// Path of the Unix socket this client connected to; also used in error messages.
socket_path: PathBuf,
/// Write half of the split socket, used for outgoing messages.
stream: OwnedWriteHalf,
/// Buffered read half of the split socket, used for incoming messages.
reader: BufReader<OwnedReadHalf>,
/// Id of the most recently issued request; starts at 0 and is incremented
/// before each request, so the first request carries id 1.
request_id_counter: u64,
/// Welcome message received from the server during the handshake.
server_info: WelcomeMessage,
}
impl AimxClient {
/// Connects to the server listening on the Unix socket at `socket_path` and
/// performs the protocol handshake.
///
/// Both phases are bounded by `CONNECTION_TIMEOUT`: establishing the socket
/// connection, and the subsequent hello/welcome exchange. Without the second
/// bound a peer that accepts the connection but never sends its welcome
/// message would block the caller indefinitely.
///
/// # Errors
///
/// Returns a connection-failed error when the socket cannot be connected, when
/// connecting times out (`"connection timeout"`), or when the handshake does
/// not complete in time (`"handshake timeout"`). Any error produced by the
/// handshake itself is propagated unchanged.
pub async fn connect(socket_path: impl AsRef<Path>) -> ClientResult<Self> {
    let socket_path = socket_path.as_ref().to_path_buf();

    let stream = tokio::time::timeout(CONNECTION_TIMEOUT, UnixStream::connect(&socket_path))
        .await
        .map_err(|_| {
            ClientError::connection_failed(
                socket_path.display().to_string(),
                "connection timeout",
            )
        })?
        .map_err(|e| {
            ClientError::connection_failed(socket_path.display().to_string(), e.to_string())
        })?;

    let (reader_stream, writer_stream) = stream.into_split();
    let reader = BufReader::new(reader_stream);

    // `server_info` starts out as an empty placeholder; the handshake below
    // replaces it with the welcome message actually sent by the server.
    let mut client = Self {
        socket_path,
        stream: writer_stream,
        reader,
        request_id_counter: 0,
        server_info: WelcomeMessage {
            version: String::new(),
            server: String::new(),
            permissions: Vec::new(),
            writable_records: Vec::new(),
            max_subscriptions: None,
            authenticated: None,
        },
    };

    // Bound the handshake as well: the connect timeout above only covers the
    // socket connection, not the wait for the server's welcome message.
    let handshake = tokio::time::timeout(CONNECTION_TIMEOUT, client.handshake()).await;
    match handshake {
        Ok(outcome) => outcome?,
        Err(_elapsed) => {
            return Err(ClientError::connection_failed(
                client.socket_path.display().to_string(),
                "handshake timeout",
            ));
        }
    }

    Ok(client)
}
/// Runs the opening exchange with the server: sends the message built by
/// `cli_hello()` and stores the `WelcomeMessage` read back in `server_info`.
///
/// # Errors
///
/// Propagates any failure from writing the hello or reading/parsing the welcome.
async fn handshake(&mut self) -> ClientResult<()> {
    self.write_message(&cli_hello()).await?;
    self.server_info = self.read_message::<WelcomeMessage>().await?;
    Ok(())
}
/// Returns the welcome message stored by the handshake that `connect` performs.
pub fn server_info(&self) -> &WelcomeMessage {
&self.server_info
}
async fn send_request(
&mut self,
method: &str,
params: Option<serde_json::Value>,
) -> ClientResult<serde_json::Value> {
self.request_id_counter += 1;
let id = self.request_id_counter;
let request = if let Some(params) = params {
Request::with_params(id, method, params)
} else {
Request::new(id, method)
};
self.write_message(&request).await?;
let response: Response = self.read_message().await?;
match response.into_result() {
Ok(result) => Ok(result),
Err(error) => Err(ClientError::server_error(
error.code,
error.message,
error.details,
)),
}
}
/// Issues a `record.list` request and deserializes the result into a list of
/// `RecordMetadata`.
///
/// # Errors
///
/// Fails if the request fails or the result does not deserialize.
pub async fn list_records(&mut self) -> ClientResult<Vec<RecordMetadata>> {
    let payload = self.send_request("record.list", None).await?;
    serde_json::from_value::<Vec<RecordMetadata>>(payload).map_err(Into::into)
}
/// Issues a parameterless `profiling.reset` request and returns the raw JSON
/// result sent back by the server.
pub async fn reset_stage_profiling(&mut self) -> ClientResult<serde_json::Value> {
    const METHOD: &str = "profiling.reset";
    self.send_request(METHOD, None).await
}
/// Issues a parameterless `buffer_metrics.reset` request and returns the raw
/// JSON result sent back by the server.
pub async fn reset_buffer_metrics(&mut self) -> ClientResult<serde_json::Value> {
    const METHOD: &str = "buffer_metrics.reset";
    self.send_request(METHOD, None).await
}
/// Issues a `record.get` request for the record called `name` and returns the
/// raw JSON result.
pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
    // NOTE(review): this request sends the record name under the key "record",
    // while the other record.* requests in this client use "name" — confirm
    // against the server-side protocol that this difference is intentional.
    let request_params = json!({ "record": name });
    self.send_request("record.get", Some(request_params)).await
}
/// Issues a `record.set` request carrying the record `name` and the new
/// `value`, returning the raw JSON result.
pub async fn set_record(
    &mut self,
    name: &str,
    value: serde_json::Value,
) -> ClientResult<serde_json::Value> {
    let request_params = json!({ "name": name, "value": value });
    let outcome = self
        .send_request("record.set", Some(request_params))
        .await?;
    Ok(outcome)
}
/// Issues a `record.subscribe` request for `name` with the given `queue_size`
/// and returns the `subscription_id` string found in the result.
///
/// # Errors
///
/// Fails if the request fails, or with `ClientError::Other` when the result
/// has no string-valued `subscription_id` entry.
pub async fn subscribe(&mut self, name: &str, queue_size: usize) -> ClientResult<String> {
    let request_params = json!({ "name": name, "queue_size": queue_size });
    let reply = self
        .send_request("record.subscribe", Some(request_params))
        .await?;

    match reply["subscription_id"].as_str() {
        Some(id) => Ok(id.to_string()),
        None => Err(ClientError::Other(anyhow::anyhow!(
            "Missing subscription_id in response"
        ))),
    }
}
/// Issues a `record.unsubscribe` request for `subscription_id`; the result
/// payload is discarded.
pub async fn unsubscribe(&mut self, subscription_id: &str) -> ClientResult<()> {
    let request_params = json!({ "subscription_id": subscription_id });
    self.send_request("record.unsubscribe", Some(request_params))
        .await
        .map(|_ack| ())
}
/// Reads the next message from the server as an `EventMessage` and returns the
/// `Event` it carries.
///
/// # Errors
///
/// Fails if reading fails, the connection is closed, or the message does not
/// parse as an `EventMessage`.
pub async fn receive_event(&mut self) -> ClientResult<Event> {
    self.read_message::<EventMessage>()
        .await
        .map(|message| message.event)
}
/// Issues a `record.drain` request for `name` (without a limit) and
/// deserializes the result into a `DrainResponse`.
///
/// # Errors
///
/// Fails if the request fails or the result does not deserialize.
pub async fn drain_record(&mut self, name: &str) -> ClientResult<DrainResponse> {
    let drained = self
        .send_request("record.drain", Some(json!({ "name": name })))
        .await?;
    serde_json::from_value::<DrainResponse>(drained).map_err(Into::into)
}
/// Issues a `record.drain` request for `name` that additionally carries
/// `limit`, and deserializes the result into a `DrainResponse`.
///
/// # Errors
///
/// Fails if the request fails or the result does not deserialize.
pub async fn drain_record_with_limit(
    &mut self,
    name: &str,
    limit: u32,
) -> ClientResult<DrainResponse> {
    let request_params = json!({ "name": name, "limit": limit });
    let drained = self
        .send_request("record.drain", Some(request_params))
        .await?;
    serde_json::from_value::<DrainResponse>(drained).map_err(Into::into)
}
/// Issues a `graph.nodes` request and deserializes the result into a list of
/// raw JSON values.
///
/// # Errors
///
/// Fails if the request fails or the result is not a JSON array.
pub async fn graph_nodes(&mut self) -> ClientResult<Vec<serde_json::Value>> {
    let payload = self.send_request("graph.nodes", None).await?;
    serde_json::from_value::<Vec<serde_json::Value>>(payload).map_err(Into::into)
}
/// Issues a `graph.edges` request and deserializes the result into a list of
/// raw JSON values.
///
/// # Errors
///
/// Fails if the request fails or the result is not a JSON array.
pub async fn graph_edges(&mut self) -> ClientResult<Vec<serde_json::Value>> {
    let payload = self.send_request("graph.edges", None).await?;
    serde_json::from_value::<Vec<serde_json::Value>>(payload).map_err(Into::into)
}
/// Issues a `graph.topo_order` request and deserializes the result into a list
/// of strings.
///
/// # Errors
///
/// Fails if the request fails or the result is not an array of strings.
pub async fn graph_topo_order(&mut self) -> ClientResult<Vec<String>> {
    let payload = self.send_request("graph.topo_order", None).await?;
    serde_json::from_value::<Vec<String>>(payload).map_err(Into::into)
}
/// Serializes `msg` with `serialize_message`, writes the resulting bytes to
/// the socket's write half and flushes it.
///
/// # Errors
///
/// Propagates serialization, write and flush failures.
async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
    let payload = serialize_message(msg)?;
    let writer = &mut self.stream;
    writer.write_all(payload.as_bytes()).await?;
    writer.flush().await.map_err(Into::into)
}
/// Reads one line from the socket and parses it into `T` with `parse_message`.
///
/// # Errors
///
/// Propagates read and parse failures. If nothing at all was read (the line
/// buffer stays empty), reports a connection-failed error stating that the
/// server closed the connection.
async fn read_message<T: for<'de> serde::Deserialize<'de>>(&mut self) -> ClientResult<T> {
    let mut buffer = String::new();
    self.reader.read_line(&mut buffer).await?;

    if !buffer.is_empty() {
        return parse_message(&buffer).map_err(Into::into);
    }

    // An empty buffer after a successful read means no more data is coming.
    Err(ClientError::connection_failed(
        self.socket_path.display().to_string(),
        "connection closed by server",
    ))
}
}
/// Result payload of a `record.drain` request, as deserialized by
/// `AimxClient::drain_record` and `AimxClient::drain_record_with_limit`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DrainResponse {
/// Name of the record reported by the server.
/// NOTE(review): presumably the record that was drained — confirm.
pub record_name: String,
/// Values returned by the drain, as raw JSON.
pub values: Vec<serde_json::Value>,
/// Count reported by the server.
/// NOTE(review): presumably equal to `values.len()` — confirm against the server.
pub count: usize,
}