aimdb_client/
connection.rs1use crate::error::{ClientError, ClientResult};
6use crate::protocol::{
7 cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
8 RequestExt, Response, ResponseExt, WelcomeMessage,
9};
10use serde_json::json;
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
15use tokio::net::UnixStream;
16
17const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
19
20pub struct AimxClient {
22 socket_path: PathBuf,
23 stream: OwnedWriteHalf,
24 reader: BufReader<OwnedReadHalf>,
25 request_id_counter: u64,
26 server_info: WelcomeMessage,
27}
28
29impl AimxClient {
30 pub async fn connect(socket_path: impl AsRef<Path>) -> ClientResult<Self> {
32 let socket_path = socket_path.as_ref().to_path_buf();
33
34 let stream = tokio::time::timeout(CONNECTION_TIMEOUT, UnixStream::connect(&socket_path))
36 .await
37 .map_err(|_| {
38 ClientError::connection_failed(
39 socket_path.display().to_string(),
40 "connection timeout",
41 )
42 })?
43 .map_err(|e| {
44 ClientError::connection_failed(socket_path.display().to_string(), e.to_string())
45 })?;
46
47 let (reader_stream, writer_stream) = stream.into_split();
49
50 let reader = BufReader::new(reader_stream);
51 let mut client = Self {
52 socket_path,
53 stream: writer_stream,
54 reader,
55 request_id_counter: 0,
56 server_info: WelcomeMessage {
57 version: String::new(),
58 server: String::new(),
59 permissions: Vec::new(),
60 writable_records: Vec::new(),
61 max_subscriptions: None,
62 authenticated: None,
63 },
64 };
65
66 client.handshake().await?;
68
69 Ok(client)
70 }
71
72 async fn handshake(&mut self) -> ClientResult<()> {
74 let hello = cli_hello();
76 self.write_message(&hello).await?;
77
78 let welcome: WelcomeMessage = self.read_message().await?;
80 self.server_info = welcome;
81
82 Ok(())
83 }
84
85 pub fn server_info(&self) -> &WelcomeMessage {
87 &self.server_info
88 }
89
90 async fn send_request(
92 &mut self,
93 method: &str,
94 params: Option<serde_json::Value>,
95 ) -> ClientResult<serde_json::Value> {
96 self.request_id_counter += 1;
97 let id = self.request_id_counter;
98
99 let request = if let Some(params) = params {
100 Request::with_params(id, method, params)
101 } else {
102 Request::new(id, method)
103 };
104
105 self.write_message(&request).await?;
106
107 let response: Response = self.read_message().await?;
108
109 match response.into_result() {
110 Ok(result) => Ok(result),
111 Err(error) => Err(ClientError::server_error(
112 error.code,
113 error.message,
114 error.details,
115 )),
116 }
117 }
118
119 pub async fn list_records(&mut self) -> ClientResult<Vec<RecordMetadata>> {
121 let result = self.send_request("record.list", None).await?;
122 let records: Vec<RecordMetadata> = serde_json::from_value(result)?;
123 Ok(records)
124 }
125
126 pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
128 let params = json!({ "record": name });
129 self.send_request("record.get", Some(params)).await
130 }
131
132 pub async fn set_record(
134 &mut self,
135 name: &str,
136 value: serde_json::Value,
137 ) -> ClientResult<serde_json::Value> {
138 let params = json!({
139 "name": name,
140 "value": value
141 });
142 self.send_request("record.set", Some(params)).await
143 }
144
145 pub async fn subscribe(&mut self, name: &str, queue_size: usize) -> ClientResult<String> {
147 let params = json!({
148 "name": name,
149 "queue_size": queue_size
150 });
151 let result = self.send_request("record.subscribe", Some(params)).await?;
152
153 let subscription_id = result["subscription_id"]
154 .as_str()
155 .ok_or_else(|| {
156 ClientError::Other(anyhow::anyhow!("Missing subscription_id in response"))
157 })?
158 .to_string();
159
160 Ok(subscription_id)
161 }
162
163 pub async fn unsubscribe(&mut self, subscription_id: &str) -> ClientResult<()> {
165 let params = json!({ "subscription_id": subscription_id });
166 self.send_request("record.unsubscribe", Some(params))
167 .await?;
168 Ok(())
169 }
170
171 pub async fn receive_event(&mut self) -> ClientResult<Event> {
173 let event_msg: EventMessage = self.read_message().await?;
174 Ok(event_msg.event)
175 }
176
177 async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
179 let data = serialize_message(msg)?;
180 self.stream.write_all(data.as_bytes()).await?;
181 self.stream.flush().await?;
182 Ok(())
183 }
184
185 async fn read_message<T: for<'de> serde::Deserialize<'de>>(&mut self) -> ClientResult<T> {
187 let mut line = String::new();
188 self.reader.read_line(&mut line).await?;
189
190 if line.is_empty() {
191 return Err(ClientError::connection_failed(
192 self.socket_path.display().to_string(),
193 "connection closed by server",
194 ));
195 }
196
197 parse_message(&line).map_err(|e| e.into())
198 }
199}