1use reddb_grpc_proto::red_db_client::RedDbClient;
14use reddb_grpc_proto::*;
15use tonic::transport::Channel;
16use tonic::Request;
17
/// Snapshot of the server health as returned by the `health` RPC.
///
/// Built by `RedDBClient::health_status`, which copies every field verbatim
/// from the server reply.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HealthStatus {
    /// Health flag copied from the reply.
    pub healthy: bool,
    /// State string copied from the reply.
    pub state: String,
    /// Timestamp copied from the reply; presumably milliseconds since the
    /// Unix epoch, going by the field name (confirm against the server).
    pub checked_at_unix_ms: u64,
}
24
/// Result of a query RPC.
///
/// Built by `RedDBClient::query_reply_with_params`, which copies every field
/// verbatim from the server reply.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueryResponse {
    /// Success flag copied from the reply.
    pub ok: bool,
    /// `mode` string copied from the reply.
    pub mode: String,
    /// `statement` string copied from the reply.
    pub statement: String,
    /// `engine` string copied from the reply.
    pub engine: String,
    /// Column names copied from the reply.
    pub columns: Vec<String>,
    /// Record count copied from the reply.
    pub record_count: u64,
    /// Result payload copied from the reply; presumably JSON text, going by
    /// the field name.
    pub result_json: String,
}
35
/// Outcome of creating a single row.
///
/// Built by `RedDBClient::create_row_entity`, which copies every field
/// verbatim from the server reply.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CreatedEntity {
    /// Success flag copied from the reply.
    pub ok: bool,
    /// Identifier copied from the reply.
    pub id: u64,
    /// Entity payload copied from the reply; presumably JSON text, going by
    /// the field name.
    pub entity_json: String,
}
42
/// Outcome of a bulk row creation.
///
/// Built by `RedDBClient::bulk_create_rows` from the server reply.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BulkCreateStatus {
    /// Success flag copied from the reply.
    pub ok: bool,
    /// `count` value copied from the reply.
    pub count: u64,
    /// The `id` of every item listed in the reply, in reply order.
    pub ids: Vec<u64>,
}
49
/// Generic success/message pair returned by mutating operations.
///
/// Built by `RedDBClient::delete_entity`, which copies both fields verbatim
/// from the server reply.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OperationStatus {
    /// Success flag copied from the reply.
    pub ok: bool,
    /// Message string copied from the reply.
    pub message: String,
}
55
56#[derive(Clone)]
57pub struct RedDBClient {
58 inner: RedDbClient<Channel>,
59 token: Option<String>,
60 pub addr: String,
61}
62
63impl RedDBClient {
64 pub async fn connect(
65 addr: &str,
66 token: Option<String>,
67 ) -> Result<Self, Box<dyn std::error::Error>> {
68 let endpoint = if addr.starts_with("http") {
69 addr.to_string()
70 } else {
71 format!("http://{}", addr)
72 };
73 let inner = RedDbClient::connect(endpoint.clone()).await?;
74 Ok(Self {
75 inner,
76 token,
77 addr: endpoint,
78 })
79 }
80
81 fn auth_request<T>(&self, inner: T) -> Request<T> {
82 let mut req = Request::new(inner);
83 if let Some(ref token) = self.token {
84 if let Ok(value) = format!("Bearer {}", token).parse() {
85 req.metadata_mut().insert("authorization", value);
86 }
87 }
88 req
89 }
90
91 pub fn set_token(&mut self, token: String) {
93 self.token = Some(token);
94 }
95
96 pub async fn health_status(&mut self) -> Result<HealthStatus, Box<dyn std::error::Error>> {
97 let req = self.auth_request(Empty {});
98 let resp = self.inner.health(req).await?;
99 let reply = resp.into_inner();
100 Ok(HealthStatus {
101 healthy: reply.healthy,
102 state: reply.state,
103 checked_at_unix_ms: reply.checked_at_unix_ms,
104 })
105 }
106
107 pub async fn health(&mut self) -> Result<String, Box<dyn std::error::Error>> {
108 let reply = self.health_status().await?;
109 Ok(format!(
110 "state: {}, healthy: {}",
111 reply.state, reply.healthy
112 ))
113 }
114
115 pub async fn query_reply(
116 &mut self,
117 sql: &str,
118 ) -> Result<QueryResponse, Box<dyn std::error::Error>> {
119 self.query_reply_with_params(sql, Vec::new()).await
120 }
121
122 pub async fn query_reply_with_params(
123 &mut self,
124 sql: &str,
125 params: Vec<QueryValue>,
126 ) -> Result<QueryResponse, Box<dyn std::error::Error>> {
127 let req = self.auth_request(QueryRequest {
128 query: sql.to_string(),
129 entity_types: vec![],
130 capabilities: vec![],
131 params,
132 });
133 let resp = self.inner.query(req).await?;
134 let reply = resp.into_inner();
135 Ok(QueryResponse {
136 ok: reply.ok,
137 mode: reply.mode,
138 statement: reply.statement,
139 engine: reply.engine,
140 columns: reply.columns,
141 record_count: reply.record_count,
142 result_json: reply.result_json,
143 })
144 }
145
146 pub async fn query(&mut self, sql: &str) -> Result<String, Box<dyn std::error::Error>> {
147 let reply = self.query_reply(sql).await?;
148 Ok(reply.result_json)
149 }
150
151 pub async fn collections(&mut self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
152 let req = self.auth_request(Empty {});
153 let resp = self.inner.collections(req).await?;
154 Ok(resp.into_inner().collections)
155 }
156
157 pub async fn scan(
158 &mut self,
159 collection: &str,
160 limit: u64,
161 ) -> Result<String, Box<dyn std::error::Error>> {
162 let req = self.auth_request(ScanRequest {
163 collection: collection.to_string(),
164 offset: 0,
165 limit,
166 });
167 let resp = self.inner.scan(req).await?;
168 let reply = resp.into_inner();
169 let items: Vec<String> = reply.items.iter().map(|e| e.json.clone()).collect();
170 Ok(format!(
171 "total: {}, items: [{}]",
172 reply.total,
173 items.join(", ")
174 ))
175 }
176
177 pub async fn stats(&mut self) -> Result<String, Box<dyn std::error::Error>> {
178 let req = self.auth_request(Empty {});
179 let resp = self.inner.stats(req).await?;
180 let reply = resp.into_inner();
181 Ok(format!(
182 "collections: {}, entities: {}, memory: {} bytes, started_at: {}",
183 reply.collection_count,
184 reply.total_entities,
185 reply.total_memory_bytes,
186 reply.started_at_unix_ms
187 ))
188 }
189
190 pub async fn create_row(
191 &mut self,
192 collection: &str,
193 json: &str,
194 ) -> Result<String, Box<dyn std::error::Error>> {
195 let reply = self.create_row_entity(collection, json).await?;
196 Ok(format!("id: {}, entity: {}", reply.id, reply.entity_json))
197 }
198
199 pub async fn create_row_entity(
200 &mut self,
201 collection: &str,
202 json: &str,
203 ) -> Result<CreatedEntity, Box<dyn std::error::Error>> {
204 let req = self.auth_request(JsonCreateRequest {
205 collection: collection.to_string(),
206 payload_json: json.to_string(),
207 });
208 let resp = self.inner.create_row(req).await?;
209 let reply = resp.into_inner();
210 Ok(CreatedEntity {
211 ok: reply.ok,
212 id: reply.id,
213 entity_json: reply.entity_json,
214 })
215 }
216
217 pub async fn bulk_create_rows(
218 &mut self,
219 collection: &str,
220 payload_json: Vec<String>,
221 ) -> Result<BulkCreateStatus, Box<dyn std::error::Error>> {
222 let req = self.auth_request(JsonBulkCreateRequest {
223 collection: collection.to_string(),
224 payload_json,
225 });
226 let resp = self.inner.bulk_create_rows(req).await?;
227 let reply = resp.into_inner();
228 Ok(BulkCreateStatus {
229 ok: reply.ok,
230 count: reply.count,
231 ids: reply.items.into_iter().map(|item| item.id).collect(),
232 })
233 }
234
235 pub async fn explain(&mut self, sql: &str) -> Result<String, Box<dyn std::error::Error>> {
236 let req = self.auth_request(QueryRequest {
237 query: sql.to_string(),
238 entity_types: vec![],
239 capabilities: vec![],
240 params: vec![],
241 });
242 let resp = self.inner.explain_query(req).await?;
243 Ok(resp.into_inner().payload)
244 }
245
246 pub async fn login(
247 &mut self,
248 username: &str,
249 password: &str,
250 ) -> Result<String, Box<dyn std::error::Error>> {
251 let payload = format!(
252 "{{\"username\":\"{}\",\"password\":\"{}\"}}",
253 username, password
254 );
255 let req = self.auth_request(JsonPayloadRequest {
256 payload_json: payload,
257 });
258 let resp = self.inner.auth_login(req).await?;
259 let reply = resp.into_inner();
260 Ok(reply.payload)
261 }
262
263 pub async fn replication_status(&mut self) -> Result<String, Box<dyn std::error::Error>> {
264 let req = self.auth_request(Empty {});
265 let resp = self.inner.replication_status(req).await?;
266 Ok(resp.into_inner().payload)
267 }
268
269 pub async fn topology(&mut self) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
275 let req = self.auth_request(TopologyRequest {});
276 let resp = self.inner.topology(req).await?;
277 Ok(resp.into_inner().topology_bytes)
278 }
279
280 pub async fn delete_entity(
281 &mut self,
282 collection: &str,
283 id: u64,
284 ) -> Result<OperationStatus, Box<dyn std::error::Error>> {
285 let req = self.auth_request(DeleteEntityRequest {
286 collection: collection.to_string(),
287 id,
288 });
289 let resp = self.inner.delete_entity(req).await?;
290 let reply = resp.into_inner();
291 Ok(OperationStatus {
292 ok: reply.ok,
293 message: reply.message,
294 })
295 }
296}