Skip to main content

aimdb_client/
connection.rs

//! AimX Client Connection
//!
//! Async client for connecting to AimDB instances via Unix domain sockets.
4
5use crate::error::{ClientError, ClientResult};
6use crate::protocol::{
7    cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
8    RequestExt, Response, ResponseExt, WelcomeMessage,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::path::{Path, PathBuf};
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
16use tokio::net::UnixStream;
17
/// Upper bound on how long a single connection-establishment step may take.
///
/// Applied via `tokio::time::timeout` when opening the Unix socket.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
20
21/// AimX protocol client
22pub struct AimxClient {
23    socket_path: PathBuf,
24    stream: OwnedWriteHalf,
25    reader: BufReader<OwnedReadHalf>,
26    request_id_counter: u64,
27    server_info: WelcomeMessage,
28}
29
30impl AimxClient {
31    /// Connect to an AimDB instance
32    pub async fn connect(socket_path: impl AsRef<Path>) -> ClientResult<Self> {
33        let socket_path = socket_path.as_ref().to_path_buf();
34
35        // Connect with timeout
36        let stream = tokio::time::timeout(CONNECTION_TIMEOUT, UnixStream::connect(&socket_path))
37            .await
38            .map_err(|_| {
39                ClientError::connection_failed(
40                    socket_path.display().to_string(),
41                    "connection timeout",
42                )
43            })?
44            .map_err(|e| {
45                ClientError::connection_failed(socket_path.display().to_string(), e.to_string())
46            })?;
47
48        // Split into reader and writer
49        let (reader_stream, writer_stream) = stream.into_split();
50
51        let reader = BufReader::new(reader_stream);
52        let mut client = Self {
53            socket_path,
54            stream: writer_stream,
55            reader,
56            request_id_counter: 0,
57            server_info: WelcomeMessage {
58                version: String::new(),
59                server: String::new(),
60                permissions: Vec::new(),
61                writable_records: Vec::new(),
62                max_subscriptions: None,
63                authenticated: None,
64            },
65        };
66
67        // Perform handshake
68        client.handshake().await?;
69
70        Ok(client)
71    }
72
73    /// Perform protocol handshake
74    async fn handshake(&mut self) -> ClientResult<()> {
75        // Send Hello
76        let hello = cli_hello();
77        self.write_message(&hello).await?;
78
79        // Receive Welcome
80        let welcome: WelcomeMessage = self.read_message().await?;
81        self.server_info = welcome;
82
83        Ok(())
84    }
85
86    /// Get server information
87    pub fn server_info(&self) -> &WelcomeMessage {
88        &self.server_info
89    }
90
91    /// Send a request and wait for response
92    async fn send_request(
93        &mut self,
94        method: &str,
95        params: Option<serde_json::Value>,
96    ) -> ClientResult<serde_json::Value> {
97        self.request_id_counter += 1;
98        let id = self.request_id_counter;
99
100        let request = if let Some(params) = params {
101            Request::with_params(id, method, params)
102        } else {
103            Request::new(id, method)
104        };
105
106        self.write_message(&request).await?;
107
108        let response: Response = self.read_message().await?;
109
110        match response.into_result() {
111            Ok(result) => Ok(result),
112            Err(error) => Err(ClientError::server_error(
113                error.code,
114                error.message,
115                error.details,
116            )),
117        }
118    }
119
120    /// List all registered records
121    pub async fn list_records(&mut self) -> ClientResult<Vec<RecordMetadata>> {
122        let result = self.send_request("record.list", None).await?;
123        let records: Vec<RecordMetadata> = serde_json::from_value(result)?;
124        Ok(records)
125    }
126
127    /// Get current value of a record
128    pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
129        let params = json!({ "record": name });
130        self.send_request("record.get", Some(params)).await
131    }
132
133    /// Set value of a writable record
134    pub async fn set_record(
135        &mut self,
136        name: &str,
137        value: serde_json::Value,
138    ) -> ClientResult<serde_json::Value> {
139        let params = json!({
140            "name": name,
141            "value": value
142        });
143        self.send_request("record.set", Some(params)).await
144    }
145
146    /// Subscribe to record updates
147    pub async fn subscribe(&mut self, name: &str, queue_size: usize) -> ClientResult<String> {
148        let params = json!({
149            "name": name,
150            "queue_size": queue_size
151        });
152        let result = self.send_request("record.subscribe", Some(params)).await?;
153
154        let subscription_id = result["subscription_id"]
155            .as_str()
156            .ok_or_else(|| {
157                ClientError::Other(anyhow::anyhow!("Missing subscription_id in response"))
158            })?
159            .to_string();
160
161        Ok(subscription_id)
162    }
163
164    /// Unsubscribe from record updates
165    pub async fn unsubscribe(&mut self, subscription_id: &str) -> ClientResult<()> {
166        let params = json!({ "subscription_id": subscription_id });
167        self.send_request("record.unsubscribe", Some(params))
168            .await?;
169        Ok(())
170    }
171
172    /// Receive next event from subscription
173    pub async fn receive_event(&mut self) -> ClientResult<Event> {
174        let event_msg: EventMessage = self.read_message().await?;
175        Ok(event_msg.event)
176    }
177
178    /// Drain all pending values from a record's drain reader.
179    ///
180    /// Returns all values accumulated since the last drain call,
181    /// in chronological order. This is a destructive read — drained
182    /// values will not be returned again.
183    ///
184    /// The first call for a given record creates the drain reader and
185    /// returns empty (cold start). Subsequent calls return accumulated values.
186    pub async fn drain_record(&mut self, name: &str) -> ClientResult<DrainResponse> {
187        let params = json!({ "name": name });
188        let result = self.send_request("record.drain", Some(params)).await?;
189        let response: DrainResponse = serde_json::from_value(result)?;
190        Ok(response)
191    }
192
193    /// Drain with a limit on the number of values returned.
194    pub async fn drain_record_with_limit(
195        &mut self,
196        name: &str,
197        limit: u32,
198    ) -> ClientResult<DrainResponse> {
199        let params = json!({
200            "name": name,
201            "limit": limit,
202        });
203        let result = self.send_request("record.drain", Some(params)).await?;
204        let response: DrainResponse = serde_json::from_value(result)?;
205        Ok(response)
206    }
207
208    // ========================================================================
209    // Graph Introspection Methods
210    // ========================================================================
211
212    /// Get all nodes in the dependency graph.
213    ///
214    /// Returns a list of GraphNode objects representing all records
215    /// and their connections in the database.
216    pub async fn graph_nodes(&mut self) -> ClientResult<Vec<serde_json::Value>> {
217        let result = self.send_request("graph.nodes", None).await?;
218        let nodes: Vec<serde_json::Value> = serde_json::from_value(result)?;
219        Ok(nodes)
220    }
221
222    /// Get all edges in the dependency graph.
223    ///
224    /// Returns a list of GraphEdge objects representing data flow
225    /// connections between records.
226    pub async fn graph_edges(&mut self) -> ClientResult<Vec<serde_json::Value>> {
227        let result = self.send_request("graph.edges", None).await?;
228        let edges: Vec<serde_json::Value> = serde_json::from_value(result)?;
229        Ok(edges)
230    }
231
232    /// Get the topological ordering of records.
233    ///
234    /// Returns the record keys in topological order, ensuring all
235    /// dependencies are listed before dependents. Useful for understanding
236    /// data flow and initialization order.
237    pub async fn graph_topo_order(&mut self) -> ClientResult<Vec<String>> {
238        let result = self.send_request("graph.topo_order", None).await?;
239        let order: Vec<String> = serde_json::from_value(result)?;
240        Ok(order)
241    }
242
243    /// Write a message to the stream
244    async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
245        let data = serialize_message(msg)?;
246        self.stream.write_all(data.as_bytes()).await?;
247        self.stream.flush().await?;
248        Ok(())
249    }
250
251    /// Read a message from the stream
252    async fn read_message<T: for<'de> serde::Deserialize<'de>>(&mut self) -> ClientResult<T> {
253        let mut line = String::new();
254        self.reader.read_line(&mut line).await?;
255
256        if line.is_empty() {
257            return Err(ClientError::connection_failed(
258                self.socket_path.display().to_string(),
259                "connection closed by server",
260            ));
261        }
262
263        parse_message(&line).map_err(|e| e.into())
264    }
265}
266
267/// Response from a record.drain call
268#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct DrainResponse {
270    /// Echo of the queried record name
271    pub record_name: String,
272    /// Chronologically ordered values (raw JSON, as written by the producer)
273    pub values: Vec<serde_json::Value>,
274    /// Number of values returned
275    pub count: usize,
276}