// aimdb_client/connection.rs

//! AimX Client Connection
//!
//! Async client for connecting to AimDB instances via Unix domain sockets.
4
5use crate::error::{ClientError, ClientResult};
6use crate::protocol::{
7    cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
8    RequestExt, Response, ResponseExt, WelcomeMessage,
9};
10use serde_json::json;
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
15use tokio::net::UnixStream;
16
/// Upper bound on how long a connection-establishment step may take before
/// the attempt is reported as failed.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
19
20/// AimX protocol client
21pub struct AimxClient {
22    socket_path: PathBuf,
23    stream: OwnedWriteHalf,
24    reader: BufReader<OwnedReadHalf>,
25    request_id_counter: u64,
26    server_info: WelcomeMessage,
27}
28
29impl AimxClient {
30    /// Connect to an AimDB instance
31    pub async fn connect(socket_path: impl AsRef<Path>) -> ClientResult<Self> {
32        let socket_path = socket_path.as_ref().to_path_buf();
33
34        // Connect with timeout
35        let stream = tokio::time::timeout(CONNECTION_TIMEOUT, UnixStream::connect(&socket_path))
36            .await
37            .map_err(|_| {
38                ClientError::connection_failed(
39                    socket_path.display().to_string(),
40                    "connection timeout",
41                )
42            })?
43            .map_err(|e| {
44                ClientError::connection_failed(socket_path.display().to_string(), e.to_string())
45            })?;
46
47        // Split into reader and writer
48        let (reader_stream, writer_stream) = stream.into_split();
49
50        let reader = BufReader::new(reader_stream);
51        let mut client = Self {
52            socket_path,
53            stream: writer_stream,
54            reader,
55            request_id_counter: 0,
56            server_info: WelcomeMessage {
57                version: String::new(),
58                server: String::new(),
59                permissions: Vec::new(),
60                writable_records: Vec::new(),
61                max_subscriptions: None,
62                authenticated: None,
63            },
64        };
65
66        // Perform handshake
67        client.handshake().await?;
68
69        Ok(client)
70    }
71
72    /// Perform protocol handshake
73    async fn handshake(&mut self) -> ClientResult<()> {
74        // Send Hello
75        let hello = cli_hello();
76        self.write_message(&hello).await?;
77
78        // Receive Welcome
79        let welcome: WelcomeMessage = self.read_message().await?;
80        self.server_info = welcome;
81
82        Ok(())
83    }
84
85    /// Get server information
86    pub fn server_info(&self) -> &WelcomeMessage {
87        &self.server_info
88    }
89
90    /// Send a request and wait for response
91    async fn send_request(
92        &mut self,
93        method: &str,
94        params: Option<serde_json::Value>,
95    ) -> ClientResult<serde_json::Value> {
96        self.request_id_counter += 1;
97        let id = self.request_id_counter;
98
99        let request = if let Some(params) = params {
100            Request::with_params(id, method, params)
101        } else {
102            Request::new(id, method)
103        };
104
105        self.write_message(&request).await?;
106
107        let response: Response = self.read_message().await?;
108
109        match response.into_result() {
110            Ok(result) => Ok(result),
111            Err(error) => Err(ClientError::server_error(
112                error.code,
113                error.message,
114                error.details,
115            )),
116        }
117    }
118
119    /// List all registered records
120    pub async fn list_records(&mut self) -> ClientResult<Vec<RecordMetadata>> {
121        let result = self.send_request("record.list", None).await?;
122        let records: Vec<RecordMetadata> = serde_json::from_value(result)?;
123        Ok(records)
124    }
125
126    /// Get current value of a record
127    pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
128        let params = json!({ "record": name });
129        self.send_request("record.get", Some(params)).await
130    }
131
132    /// Set value of a writable record
133    pub async fn set_record(
134        &mut self,
135        name: &str,
136        value: serde_json::Value,
137    ) -> ClientResult<serde_json::Value> {
138        let params = json!({
139            "name": name,
140            "value": value
141        });
142        self.send_request("record.set", Some(params)).await
143    }
144
145    /// Subscribe to record updates
146    pub async fn subscribe(&mut self, name: &str, queue_size: usize) -> ClientResult<String> {
147        let params = json!({
148            "name": name,
149            "queue_size": queue_size
150        });
151        let result = self.send_request("record.subscribe", Some(params)).await?;
152
153        let subscription_id = result["subscription_id"]
154            .as_str()
155            .ok_or_else(|| {
156                ClientError::Other(anyhow::anyhow!("Missing subscription_id in response"))
157            })?
158            .to_string();
159
160        Ok(subscription_id)
161    }
162
163    /// Unsubscribe from record updates
164    pub async fn unsubscribe(&mut self, subscription_id: &str) -> ClientResult<()> {
165        let params = json!({ "subscription_id": subscription_id });
166        self.send_request("record.unsubscribe", Some(params))
167            .await?;
168        Ok(())
169    }
170
171    /// Receive next event from subscription
172    pub async fn receive_event(&mut self) -> ClientResult<Event> {
173        let event_msg: EventMessage = self.read_message().await?;
174        Ok(event_msg.event)
175    }
176
177    /// Write a message to the stream
178    async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
179        let data = serialize_message(msg)?;
180        self.stream.write_all(data.as_bytes()).await?;
181        self.stream.flush().await?;
182        Ok(())
183    }
184
185    /// Read a message from the stream
186    async fn read_message<T: for<'de> serde::Deserialize<'de>>(&mut self) -> ClientResult<T> {
187        let mut line = String::new();
188        self.reader.read_line(&mut line).await?;
189
190        if line.is_empty() {
191            return Err(ClientError::connection_failed(
192                self.socket_path.display().to_string(),
193                "connection closed by server",
194            ));
195        }
196
197        parse_message(&line).map_err(|e| e.into())
198    }
199}