Skip to main content

aimdb_client/
connection.rs

1//! AimX Client Connection
2//!
3//! Async client for connecting to AimDB instances via Unix domain sockets.
4
5use crate::error::{ClientError, ClientResult};
6use crate::protocol::{
7    cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
8    RequestExt, Response, ResponseExt, WelcomeMessage,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::path::{Path, PathBuf};
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
16use tokio::net::UnixStream;
17
18/// Timeout for connection operations
19const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
20
21/// AimX protocol client
22pub struct AimxClient {
23    socket_path: PathBuf,
24    stream: OwnedWriteHalf,
25    reader: BufReader<OwnedReadHalf>,
26    request_id_counter: u64,
27    server_info: WelcomeMessage,
28}
29
30impl AimxClient {
31    /// Connect to an AimDB instance
32    pub async fn connect(socket_path: impl AsRef<Path>) -> ClientResult<Self> {
33        let socket_path = socket_path.as_ref().to_path_buf();
34
35        // Connect with timeout
36        let stream = tokio::time::timeout(CONNECTION_TIMEOUT, UnixStream::connect(&socket_path))
37            .await
38            .map_err(|_| {
39                ClientError::connection_failed(
40                    socket_path.display().to_string(),
41                    "connection timeout",
42                )
43            })?
44            .map_err(|e| {
45                ClientError::connection_failed(socket_path.display().to_string(), e.to_string())
46            })?;
47
48        // Split into reader and writer
49        let (reader_stream, writer_stream) = stream.into_split();
50
51        let reader = BufReader::new(reader_stream);
52        let mut client = Self {
53            socket_path,
54            stream: writer_stream,
55            reader,
56            request_id_counter: 0,
57            server_info: WelcomeMessage {
58                version: String::new(),
59                server: String::new(),
60                permissions: Vec::new(),
61                writable_records: Vec::new(),
62                max_subscriptions: None,
63                authenticated: None,
64            },
65        };
66
67        // Perform handshake
68        client.handshake().await?;
69
70        Ok(client)
71    }
72
73    /// Perform protocol handshake
74    async fn handshake(&mut self) -> ClientResult<()> {
75        // Send Hello
76        let hello = cli_hello();
77        self.write_message(&hello).await?;
78
79        // Receive Welcome
80        let welcome: WelcomeMessage = self.read_message().await?;
81        self.server_info = welcome;
82
83        Ok(())
84    }
85
86    /// Get server information
87    pub fn server_info(&self) -> &WelcomeMessage {
88        &self.server_info
89    }
90
91    /// Send a request and wait for response
92    async fn send_request(
93        &mut self,
94        method: &str,
95        params: Option<serde_json::Value>,
96    ) -> ClientResult<serde_json::Value> {
97        self.request_id_counter += 1;
98        let id = self.request_id_counter;
99
100        let request = if let Some(params) = params {
101            Request::with_params(id, method, params)
102        } else {
103            Request::new(id, method)
104        };
105
106        self.write_message(&request).await?;
107
108        let response: Response = self.read_message().await?;
109
110        match response.into_result() {
111            Ok(result) => Ok(result),
112            Err(error) => Err(ClientError::server_error(
113                error.code,
114                error.message,
115                error.details,
116            )),
117        }
118    }
119
120    /// List all registered records
121    pub async fn list_records(&mut self) -> ClientResult<Vec<RecordMetadata>> {
122        let result = self.send_request("record.list", None).await?;
123        let records: Vec<RecordMetadata> = serde_json::from_value(result)?;
124        Ok(records)
125    }
126
127    /// Reset stage profiling counters for every record on the server.
128    ///
129    /// Requires the server to be built with the `profiling` feature and the
130    /// connection to have write permission.
131    pub async fn reset_stage_profiling(&mut self) -> ClientResult<serde_json::Value> {
132        self.send_request("profiling.reset", None).await
133    }
134
135    /// Reset buffer introspection counters for every record on the server.
136    ///
137    /// Requires the server to be built with the `metrics` feature and the
138    /// connection to have write permission.
139    pub async fn reset_buffer_metrics(&mut self) -> ClientResult<serde_json::Value> {
140        self.send_request("buffer_metrics.reset", None).await
141    }
142
143    /// Get current value of a record
144    pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
145        let params = json!({ "record": name });
146        self.send_request("record.get", Some(params)).await
147    }
148
149    /// Set value of a writable record
150    pub async fn set_record(
151        &mut self,
152        name: &str,
153        value: serde_json::Value,
154    ) -> ClientResult<serde_json::Value> {
155        let params = json!({
156            "name": name,
157            "value": value
158        });
159        self.send_request("record.set", Some(params)).await
160    }
161
162    /// Subscribe to record updates
163    pub async fn subscribe(&mut self, name: &str, queue_size: usize) -> ClientResult<String> {
164        let params = json!({
165            "name": name,
166            "queue_size": queue_size
167        });
168        let result = self.send_request("record.subscribe", Some(params)).await?;
169
170        let subscription_id = result["subscription_id"]
171            .as_str()
172            .ok_or_else(|| {
173                ClientError::Other(anyhow::anyhow!("Missing subscription_id in response"))
174            })?
175            .to_string();
176
177        Ok(subscription_id)
178    }
179
180    /// Unsubscribe from record updates
181    pub async fn unsubscribe(&mut self, subscription_id: &str) -> ClientResult<()> {
182        let params = json!({ "subscription_id": subscription_id });
183        self.send_request("record.unsubscribe", Some(params))
184            .await?;
185        Ok(())
186    }
187
188    /// Receive next event from subscription
189    pub async fn receive_event(&mut self) -> ClientResult<Event> {
190        let event_msg: EventMessage = self.read_message().await?;
191        Ok(event_msg.event)
192    }
193
194    /// Drain all pending values from a record's drain reader.
195    ///
196    /// Returns all values accumulated since the last drain call,
197    /// in chronological order. This is a destructive read — drained
198    /// values will not be returned again.
199    ///
200    /// The first call for a given record creates the drain reader and
201    /// returns empty (cold start). Subsequent calls return accumulated values.
202    pub async fn drain_record(&mut self, name: &str) -> ClientResult<DrainResponse> {
203        let params = json!({ "name": name });
204        let result = self.send_request("record.drain", Some(params)).await?;
205        let response: DrainResponse = serde_json::from_value(result)?;
206        Ok(response)
207    }
208
209    /// Drain with a limit on the number of values returned.
210    pub async fn drain_record_with_limit(
211        &mut self,
212        name: &str,
213        limit: u32,
214    ) -> ClientResult<DrainResponse> {
215        let params = json!({
216            "name": name,
217            "limit": limit,
218        });
219        let result = self.send_request("record.drain", Some(params)).await?;
220        let response: DrainResponse = serde_json::from_value(result)?;
221        Ok(response)
222    }
223
224    // ========================================================================
225    // Graph Introspection Methods
226    // ========================================================================
227
228    /// Get all nodes in the dependency graph.
229    ///
230    /// Returns a list of GraphNode objects representing all records
231    /// and their connections in the database.
232    pub async fn graph_nodes(&mut self) -> ClientResult<Vec<serde_json::Value>> {
233        let result = self.send_request("graph.nodes", None).await?;
234        let nodes: Vec<serde_json::Value> = serde_json::from_value(result)?;
235        Ok(nodes)
236    }
237
238    /// Get all edges in the dependency graph.
239    ///
240    /// Returns a list of GraphEdge objects representing data flow
241    /// connections between records.
242    pub async fn graph_edges(&mut self) -> ClientResult<Vec<serde_json::Value>> {
243        let result = self.send_request("graph.edges", None).await?;
244        let edges: Vec<serde_json::Value> = serde_json::from_value(result)?;
245        Ok(edges)
246    }
247
248    /// Get the topological ordering of records.
249    ///
250    /// Returns the record keys in topological order, ensuring all
251    /// dependencies are listed before dependents. Useful for understanding
252    /// data flow and initialization order.
253    pub async fn graph_topo_order(&mut self) -> ClientResult<Vec<String>> {
254        let result = self.send_request("graph.topo_order", None).await?;
255        let order: Vec<String> = serde_json::from_value(result)?;
256        Ok(order)
257    }
258
259    /// Write a message to the stream
260    async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
261        let data = serialize_message(msg)?;
262        self.stream.write_all(data.as_bytes()).await?;
263        self.stream.flush().await?;
264        Ok(())
265    }
266
267    /// Read a message from the stream
268    async fn read_message<T: for<'de> serde::Deserialize<'de>>(&mut self) -> ClientResult<T> {
269        let mut line = String::new();
270        self.reader.read_line(&mut line).await?;
271
272        if line.is_empty() {
273            return Err(ClientError::connection_failed(
274                self.socket_path.display().to_string(),
275                "connection closed by server",
276            ));
277        }
278
279        parse_message(&line).map_err(|e| e.into())
280    }
281}
282
283/// Response from a record.drain call
284#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct DrainResponse {
286    /// Echo of the queried record name
287    pub record_name: String,
288    /// Chronologically ordered values (raw JSON, as written by the producer)
289    pub values: Vec<serde_json::Value>,
290    /// Number of values returned
291    pub count: usize,
292}