aimdb_client/
connection.rs1use crate::error::{ClientError, ClientResult};
6use crate::protocol::{
7 cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
8 RequestExt, Response, ResponseExt, WelcomeMessage,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::path::{Path, PathBuf};
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
16use tokio::net::UnixStream;
17
/// Timeout applied while establishing the connection to the server.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
20
21pub struct AimxClient {
23 socket_path: PathBuf,
24 stream: OwnedWriteHalf,
25 reader: BufReader<OwnedReadHalf>,
26 request_id_counter: u64,
27 server_info: WelcomeMessage,
28}
29
30impl AimxClient {
31 pub async fn connect(socket_path: impl AsRef<Path>) -> ClientResult<Self> {
33 let socket_path = socket_path.as_ref().to_path_buf();
34
35 let stream = tokio::time::timeout(CONNECTION_TIMEOUT, UnixStream::connect(&socket_path))
37 .await
38 .map_err(|_| {
39 ClientError::connection_failed(
40 socket_path.display().to_string(),
41 "connection timeout",
42 )
43 })?
44 .map_err(|e| {
45 ClientError::connection_failed(socket_path.display().to_string(), e.to_string())
46 })?;
47
48 let (reader_stream, writer_stream) = stream.into_split();
50
51 let reader = BufReader::new(reader_stream);
52 let mut client = Self {
53 socket_path,
54 stream: writer_stream,
55 reader,
56 request_id_counter: 0,
57 server_info: WelcomeMessage {
58 version: String::new(),
59 server: String::new(),
60 permissions: Vec::new(),
61 writable_records: Vec::new(),
62 max_subscriptions: None,
63 authenticated: None,
64 },
65 };
66
67 client.handshake().await?;
69
70 Ok(client)
71 }
72
73 async fn handshake(&mut self) -> ClientResult<()> {
75 let hello = cli_hello();
77 self.write_message(&hello).await?;
78
79 let welcome: WelcomeMessage = self.read_message().await?;
81 self.server_info = welcome;
82
83 Ok(())
84 }
85
86 pub fn server_info(&self) -> &WelcomeMessage {
88 &self.server_info
89 }
90
91 async fn send_request(
93 &mut self,
94 method: &str,
95 params: Option<serde_json::Value>,
96 ) -> ClientResult<serde_json::Value> {
97 self.request_id_counter += 1;
98 let id = self.request_id_counter;
99
100 let request = if let Some(params) = params {
101 Request::with_params(id, method, params)
102 } else {
103 Request::new(id, method)
104 };
105
106 self.write_message(&request).await?;
107
108 let response: Response = self.read_message().await?;
109
110 match response.into_result() {
111 Ok(result) => Ok(result),
112 Err(error) => Err(ClientError::server_error(
113 error.code,
114 error.message,
115 error.details,
116 )),
117 }
118 }
119
120 pub async fn list_records(&mut self) -> ClientResult<Vec<RecordMetadata>> {
122 let result = self.send_request("record.list", None).await?;
123 let records: Vec<RecordMetadata> = serde_json::from_value(result)?;
124 Ok(records)
125 }
126
127 pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
129 let params = json!({ "record": name });
130 self.send_request("record.get", Some(params)).await
131 }
132
133 pub async fn set_record(
135 &mut self,
136 name: &str,
137 value: serde_json::Value,
138 ) -> ClientResult<serde_json::Value> {
139 let params = json!({
140 "name": name,
141 "value": value
142 });
143 self.send_request("record.set", Some(params)).await
144 }
145
146 pub async fn subscribe(&mut self, name: &str, queue_size: usize) -> ClientResult<String> {
148 let params = json!({
149 "name": name,
150 "queue_size": queue_size
151 });
152 let result = self.send_request("record.subscribe", Some(params)).await?;
153
154 let subscription_id = result["subscription_id"]
155 .as_str()
156 .ok_or_else(|| {
157 ClientError::Other(anyhow::anyhow!("Missing subscription_id in response"))
158 })?
159 .to_string();
160
161 Ok(subscription_id)
162 }
163
164 pub async fn unsubscribe(&mut self, subscription_id: &str) -> ClientResult<()> {
166 let params = json!({ "subscription_id": subscription_id });
167 self.send_request("record.unsubscribe", Some(params))
168 .await?;
169 Ok(())
170 }
171
172 pub async fn receive_event(&mut self) -> ClientResult<Event> {
174 let event_msg: EventMessage = self.read_message().await?;
175 Ok(event_msg.event)
176 }
177
178 pub async fn drain_record(&mut self, name: &str) -> ClientResult<DrainResponse> {
187 let params = json!({ "name": name });
188 let result = self.send_request("record.drain", Some(params)).await?;
189 let response: DrainResponse = serde_json::from_value(result)?;
190 Ok(response)
191 }
192
193 pub async fn drain_record_with_limit(
195 &mut self,
196 name: &str,
197 limit: u32,
198 ) -> ClientResult<DrainResponse> {
199 let params = json!({
200 "name": name,
201 "limit": limit,
202 });
203 let result = self.send_request("record.drain", Some(params)).await?;
204 let response: DrainResponse = serde_json::from_value(result)?;
205 Ok(response)
206 }
207
208 pub async fn graph_nodes(&mut self) -> ClientResult<Vec<serde_json::Value>> {
217 let result = self.send_request("graph.nodes", None).await?;
218 let nodes: Vec<serde_json::Value> = serde_json::from_value(result)?;
219 Ok(nodes)
220 }
221
222 pub async fn graph_edges(&mut self) -> ClientResult<Vec<serde_json::Value>> {
227 let result = self.send_request("graph.edges", None).await?;
228 let edges: Vec<serde_json::Value> = serde_json::from_value(result)?;
229 Ok(edges)
230 }
231
232 pub async fn graph_topo_order(&mut self) -> ClientResult<Vec<String>> {
238 let result = self.send_request("graph.topo_order", None).await?;
239 let order: Vec<String> = serde_json::from_value(result)?;
240 Ok(order)
241 }
242
243 async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
245 let data = serialize_message(msg)?;
246 self.stream.write_all(data.as_bytes()).await?;
247 self.stream.flush().await?;
248 Ok(())
249 }
250
251 async fn read_message<T: for<'de> serde::Deserialize<'de>>(&mut self) -> ClientResult<T> {
253 let mut line = String::new();
254 self.reader.read_line(&mut line).await?;
255
256 if line.is_empty() {
257 return Err(ClientError::connection_failed(
258 self.socket_path.display().to_string(),
259 "connection closed by server",
260 ));
261 }
262
263 parse_message(&line).map_err(|e| e.into())
264 }
265}
266
267#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct DrainResponse {
270 pub record_name: String,
272 pub values: Vec<serde_json::Value>,
274 pub count: usize,
276}