aimdb_client/
connection.rs1use crate::error::{ClientError, ClientResult};
6use crate::protocol::{
7 cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
8 RequestExt, Response, ResponseExt, WelcomeMessage,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::path::{Path, PathBuf};
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
16use tokio::net::UnixStream;
17
18const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
20
21pub struct AimxClient {
23 socket_path: PathBuf,
24 stream: OwnedWriteHalf,
25 reader: BufReader<OwnedReadHalf>,
26 request_id_counter: u64,
27 server_info: WelcomeMessage,
28}
29
30impl AimxClient {
31 pub async fn connect(socket_path: impl AsRef<Path>) -> ClientResult<Self> {
33 let socket_path = socket_path.as_ref().to_path_buf();
34
35 let stream = tokio::time::timeout(CONNECTION_TIMEOUT, UnixStream::connect(&socket_path))
37 .await
38 .map_err(|_| {
39 ClientError::connection_failed(
40 socket_path.display().to_string(),
41 "connection timeout",
42 )
43 })?
44 .map_err(|e| {
45 ClientError::connection_failed(socket_path.display().to_string(), e.to_string())
46 })?;
47
48 let (reader_stream, writer_stream) = stream.into_split();
50
51 let reader = BufReader::new(reader_stream);
52 let mut client = Self {
53 socket_path,
54 stream: writer_stream,
55 reader,
56 request_id_counter: 0,
57 server_info: WelcomeMessage {
58 version: String::new(),
59 server: String::new(),
60 permissions: Vec::new(),
61 writable_records: Vec::new(),
62 max_subscriptions: None,
63 authenticated: None,
64 },
65 };
66
67 client.handshake().await?;
69
70 Ok(client)
71 }
72
73 async fn handshake(&mut self) -> ClientResult<()> {
75 let hello = cli_hello();
77 self.write_message(&hello).await?;
78
79 let welcome: WelcomeMessage = self.read_message().await?;
81 self.server_info = welcome;
82
83 Ok(())
84 }
85
86 pub fn server_info(&self) -> &WelcomeMessage {
88 &self.server_info
89 }
90
91 async fn send_request(
93 &mut self,
94 method: &str,
95 params: Option<serde_json::Value>,
96 ) -> ClientResult<serde_json::Value> {
97 self.request_id_counter += 1;
98 let id = self.request_id_counter;
99
100 let request = if let Some(params) = params {
101 Request::with_params(id, method, params)
102 } else {
103 Request::new(id, method)
104 };
105
106 self.write_message(&request).await?;
107
108 let response: Response = self.read_message().await?;
109
110 match response.into_result() {
111 Ok(result) => Ok(result),
112 Err(error) => Err(ClientError::server_error(
113 error.code,
114 error.message,
115 error.details,
116 )),
117 }
118 }
119
120 pub async fn list_records(&mut self) -> ClientResult<Vec<RecordMetadata>> {
122 let result = self.send_request("record.list", None).await?;
123 let records: Vec<RecordMetadata> = serde_json::from_value(result)?;
124 Ok(records)
125 }
126
127 pub async fn reset_stage_profiling(&mut self) -> ClientResult<serde_json::Value> {
132 self.send_request("profiling.reset", None).await
133 }
134
135 pub async fn reset_buffer_metrics(&mut self) -> ClientResult<serde_json::Value> {
140 self.send_request("buffer_metrics.reset", None).await
141 }
142
143 pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
145 let params = json!({ "record": name });
146 self.send_request("record.get", Some(params)).await
147 }
148
149 pub async fn set_record(
151 &mut self,
152 name: &str,
153 value: serde_json::Value,
154 ) -> ClientResult<serde_json::Value> {
155 let params = json!({
156 "name": name,
157 "value": value
158 });
159 self.send_request("record.set", Some(params)).await
160 }
161
162 pub async fn subscribe(&mut self, name: &str, queue_size: usize) -> ClientResult<String> {
164 let params = json!({
165 "name": name,
166 "queue_size": queue_size
167 });
168 let result = self.send_request("record.subscribe", Some(params)).await?;
169
170 let subscription_id = result["subscription_id"]
171 .as_str()
172 .ok_or_else(|| {
173 ClientError::Other(anyhow::anyhow!("Missing subscription_id in response"))
174 })?
175 .to_string();
176
177 Ok(subscription_id)
178 }
179
180 pub async fn unsubscribe(&mut self, subscription_id: &str) -> ClientResult<()> {
182 let params = json!({ "subscription_id": subscription_id });
183 self.send_request("record.unsubscribe", Some(params))
184 .await?;
185 Ok(())
186 }
187
188 pub async fn receive_event(&mut self) -> ClientResult<Event> {
190 let event_msg: EventMessage = self.read_message().await?;
191 Ok(event_msg.event)
192 }
193
194 pub async fn drain_record(&mut self, name: &str) -> ClientResult<DrainResponse> {
203 let params = json!({ "name": name });
204 let result = self.send_request("record.drain", Some(params)).await?;
205 let response: DrainResponse = serde_json::from_value(result)?;
206 Ok(response)
207 }
208
209 pub async fn drain_record_with_limit(
211 &mut self,
212 name: &str,
213 limit: u32,
214 ) -> ClientResult<DrainResponse> {
215 let params = json!({
216 "name": name,
217 "limit": limit,
218 });
219 let result = self.send_request("record.drain", Some(params)).await?;
220 let response: DrainResponse = serde_json::from_value(result)?;
221 Ok(response)
222 }
223
224 pub async fn graph_nodes(&mut self) -> ClientResult<Vec<serde_json::Value>> {
233 let result = self.send_request("graph.nodes", None).await?;
234 let nodes: Vec<serde_json::Value> = serde_json::from_value(result)?;
235 Ok(nodes)
236 }
237
238 pub async fn graph_edges(&mut self) -> ClientResult<Vec<serde_json::Value>> {
243 let result = self.send_request("graph.edges", None).await?;
244 let edges: Vec<serde_json::Value> = serde_json::from_value(result)?;
245 Ok(edges)
246 }
247
248 pub async fn graph_topo_order(&mut self) -> ClientResult<Vec<String>> {
254 let result = self.send_request("graph.topo_order", None).await?;
255 let order: Vec<String> = serde_json::from_value(result)?;
256 Ok(order)
257 }
258
259 async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
261 let data = serialize_message(msg)?;
262 self.stream.write_all(data.as_bytes()).await?;
263 self.stream.flush().await?;
264 Ok(())
265 }
266
267 async fn read_message<T: for<'de> serde::Deserialize<'de>>(&mut self) -> ClientResult<T> {
269 let mut line = String::new();
270 self.reader.read_line(&mut line).await?;
271
272 if line.is_empty() {
273 return Err(ClientError::connection_failed(
274 self.socket_path.display().to_string(),
275 "connection closed by server",
276 ));
277 }
278
279 parse_message(&line).map_err(|e| e.into())
280 }
281}
282
283#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct DrainResponse {
286 pub record_name: String,
288 pub values: Vec<serde_json::Value>,
290 pub count: usize,
292}