solidb_client/client/
mod.rs1mod builder;
2mod bulk;
3mod collection;
4mod database;
5mod document;
6mod index;
7mod query;
8mod transaction;
9
10pub use builder::SoliDBClientBuilder;
11
12use serde_json::Value;
13use tokio::io::{AsyncReadExt, AsyncWriteExt};
14use tokio::net::TcpStream;
15
16use super::protocol::{
17 decode_message, encode_command, Command, DriverError, Response, DRIVER_MAGIC, MAX_MESSAGE_SIZE,
18};
19
20pub struct SoliDBClient {
21 pub(crate) stream: TcpStream,
22 pub(crate) current_tx: Option<String>,
23}
24
25impl SoliDBClient {
26 pub async fn connect(addr: &str) -> Result<Self, DriverError> {
27 let stream = TcpStream::connect(addr).await.map_err(|e| {
28 DriverError::ConnectionError(format!("Failed to connect to {}: {}", addr, e))
29 })?;
30
31 let mut client = Self {
32 stream,
33 current_tx: None,
34 };
35
36 client.stream.write_all(DRIVER_MAGIC).await.map_err(|e| {
37 DriverError::ConnectionError(format!("Failed to send magic header: {}", e))
38 })?;
39 client
40 .stream
41 .flush()
42 .await
43 .map_err(|e| DriverError::ConnectionError(format!("Failed to flush: {}", e)))?;
44
45 Ok(client)
46 }
47
48 pub(crate) async fn send_command(&mut self, command: Command) -> Result<Response, DriverError> {
49 let data = encode_command(&command)?;
50 self.stream
51 .write_all(&data)
52 .await
53 .map_err(|e| DriverError::ConnectionError(format!("Write failed: {}", e)))?;
54 self.stream
55 .flush()
56 .await
57 .map_err(|e| DriverError::ConnectionError(format!("Flush failed: {}", e)))?;
58
59 let mut len_buf = [0u8; 4];
60 self.stream
61 .read_exact(&mut len_buf)
62 .await
63 .map_err(|e| DriverError::ConnectionError(format!("Read length failed: {}", e)))?;
64
65 let msg_len = u32::from_be_bytes(len_buf) as usize;
66 if msg_len > MAX_MESSAGE_SIZE {
67 return Err(DriverError::MessageTooLarge);
68 }
69
70 let mut payload = vec![0u8; msg_len];
71 self.stream
72 .read_exact(&mut payload)
73 .await
74 .map_err(|e| DriverError::ConnectionError(format!("Read payload failed: {}", e)))?;
75
76 decode_message(&payload)
77 }
78
79 pub(crate) fn extract_data(response: Response) -> Result<Option<Value>, DriverError> {
80 match response {
81 Response::Ok { data, .. } => Ok(data),
82 Response::Error { error } => Err(error),
83 Response::Pong { .. } => Ok(None),
84 Response::Batch { .. } => Ok(None),
85 }
86 }
87
88 pub(crate) fn extract_tx_id(response: Response) -> Result<String, DriverError> {
89 match response {
90 Response::Ok {
91 tx_id: Some(id), ..
92 } => Ok(id),
93 Response::Ok { .. } => Err(DriverError::ProtocolError(
94 "Expected transaction ID".to_string(),
95 )),
96 Response::Error { error } => Err(error),
97 _ => Err(DriverError::ProtocolError(
98 "Unexpected response type".to_string(),
99 )),
100 }
101 }
102
103 pub async fn ping(&mut self) -> Result<i64, DriverError> {
104 let response = self.send_command(Command::Ping).await?;
105 match response {
106 Response::Pong { timestamp } => Ok(timestamp),
107 Response::Error { error } => Err(error),
108 _ => Err(DriverError::ProtocolError(
109 "Expected pong response".to_string(),
110 )),
111 }
112 }
113
114 pub async fn auth(
115 &mut self,
116 database: &str,
117 username: &str,
118 password: &str,
119 ) -> Result<(), DriverError> {
120 let response = self
121 .send_command(Command::Auth {
122 database: database.to_string(),
123 username: username.to_string(),
124 password: password.to_string(),
125 })
126 .await?;
127
128 match response {
129 Response::Ok { .. } => Ok(()),
130 Response::Error { error } => Err(error),
131 _ => Err(DriverError::ProtocolError(
132 "Unexpected response".to_string(),
133 )),
134 }
135 }
136}