Skip to main content

reddb_client_connector/
lib.rs

//! Workspace-internal gRPC connector.
//!
//! Thin wrapper around the generated tonic `RedDbClient<Channel>`
//! that adds bearer-token auth metadata + ergonomic typed
//! responses. Lives in its own crate so both `reddb-server`
//! (`rpc_stdio` mode) and `reddb-client` (the published driver)
//! can reach it without setting up a circular path dependency
//! through the `reddb` umbrella.
//!
//! Engine-free by design: only `tonic` + `reddb-grpc-proto` deps.
//! See `crates/reddb-client-connector/Cargo.toml` for rationale.
12
13use reddb_grpc_proto::red_db_client::RedDbClient;
14use reddb_grpc_proto::*;
15use tonic::transport::Channel;
16use tonic::Request;
17
/// Typed view of the reply returned by the `health` RPC.
///
/// Each field is copied verbatim from the same-named field of the
/// server reply; this crate does not interpret the values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthStatus {
    /// The server's `healthy` flag.
    pub healthy: bool,
    /// The server's `state` string, passed through unchanged.
    pub state: String,
    /// The server's `checked_at_unix_ms` value
    /// (presumably a Unix timestamp in milliseconds — confirm against the proto).
    pub checked_at_unix_ms: u64,
}
24
/// Typed view of the reply returned by the `query` RPC.
///
/// Each field is copied verbatim from the same-named field of the
/// server reply; this crate does not parse or validate any of them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryResponse {
    /// The reply's `ok` flag.
    pub ok: bool,
    /// The reply's `mode` string.
    pub mode: String,
    /// The reply's `statement` string.
    pub statement: String,
    /// The reply's `engine` string.
    pub engine: String,
    /// The reply's `columns` list.
    pub columns: Vec<String>,
    /// The reply's `record_count` value.
    pub record_count: u64,
    /// The reply's `result_json` string, passed through unparsed.
    pub result_json: String,
}
35
/// Typed view of the reply returned by the `create_row` RPC.
///
/// Each field is copied verbatim from the same-named field of the
/// server reply.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreatedEntity {
    /// The reply's `ok` flag.
    pub ok: bool,
    /// The reply's `id` value.
    pub id: u64,
    /// The reply's `entity_json` string, passed through unparsed.
    pub entity_json: String,
}
42
/// Typed view of the reply returned by the `bulk_create_rows` RPC.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkCreateStatus {
    /// The reply's `ok` flag.
    pub ok: bool,
    /// The reply's `count` value, as reported by the server.
    pub count: u64,
    /// The `id` of every item in the reply's `items` list, in reply order.
    pub ids: Vec<u64>,
}
49
/// Typed outcome of a mutating RPC; in this crate it is produced by
/// `RedDBClient::delete_entity`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperationStatus {
    /// The reply's `ok` flag.
    pub ok: bool,
    /// The reply's `message` string, passed through unchanged.
    pub message: String,
}
55
56#[derive(Clone)]
57pub struct RedDBClient {
58    inner: RedDbClient<Channel>,
59    token: Option<String>,
60    pub addr: String,
61}
62
63impl RedDBClient {
64    pub async fn connect(
65        addr: &str,
66        token: Option<String>,
67    ) -> Result<Self, Box<dyn std::error::Error>> {
68        let endpoint = if addr.starts_with("http") {
69            addr.to_string()
70        } else {
71            format!("http://{}", addr)
72        };
73        let inner = RedDbClient::connect(endpoint.clone()).await?;
74        Ok(Self {
75            inner,
76            token,
77            addr: endpoint,
78        })
79    }
80
81    fn auth_request<T>(&self, inner: T) -> Request<T> {
82        let mut req = Request::new(inner);
83        if let Some(ref token) = self.token {
84            if let Ok(value) = format!("Bearer {}", token).parse() {
85                req.metadata_mut().insert("authorization", value);
86            }
87        }
88        req
89    }
90
91    /// Update the auth token (e.g. after a successful login).
92    pub fn set_token(&mut self, token: String) {
93        self.token = Some(token);
94    }
95
96    pub async fn health_status(&mut self) -> Result<HealthStatus, Box<dyn std::error::Error>> {
97        let req = self.auth_request(Empty {});
98        let resp = self.inner.health(req).await?;
99        let reply = resp.into_inner();
100        Ok(HealthStatus {
101            healthy: reply.healthy,
102            state: reply.state,
103            checked_at_unix_ms: reply.checked_at_unix_ms,
104        })
105    }
106
107    pub async fn health(&mut self) -> Result<String, Box<dyn std::error::Error>> {
108        let reply = self.health_status().await?;
109        Ok(format!(
110            "state: {}, healthy: {}",
111            reply.state, reply.healthy
112        ))
113    }
114
115    pub async fn query_reply(
116        &mut self,
117        sql: &str,
118    ) -> Result<QueryResponse, Box<dyn std::error::Error>> {
119        self.query_reply_with_params(sql, Vec::new()).await
120    }
121
122    pub async fn query_reply_with_params(
123        &mut self,
124        sql: &str,
125        params: Vec<QueryValue>,
126    ) -> Result<QueryResponse, Box<dyn std::error::Error>> {
127        let req = self.auth_request(QueryRequest {
128            query: sql.to_string(),
129            entity_types: vec![],
130            capabilities: vec![],
131            params,
132        });
133        let resp = self.inner.query(req).await?;
134        let reply = resp.into_inner();
135        Ok(QueryResponse {
136            ok: reply.ok,
137            mode: reply.mode,
138            statement: reply.statement,
139            engine: reply.engine,
140            columns: reply.columns,
141            record_count: reply.record_count,
142            result_json: reply.result_json,
143        })
144    }
145
146    pub async fn query(&mut self, sql: &str) -> Result<String, Box<dyn std::error::Error>> {
147        let reply = self.query_reply(sql).await?;
148        Ok(reply.result_json)
149    }
150
151    pub async fn collections(&mut self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
152        let req = self.auth_request(Empty {});
153        let resp = self.inner.collections(req).await?;
154        Ok(resp.into_inner().collections)
155    }
156
157    pub async fn scan(
158        &mut self,
159        collection: &str,
160        limit: u64,
161    ) -> Result<String, Box<dyn std::error::Error>> {
162        let req = self.auth_request(ScanRequest {
163            collection: collection.to_string(),
164            offset: 0,
165            limit,
166        });
167        let resp = self.inner.scan(req).await?;
168        let reply = resp.into_inner();
169        let items: Vec<String> = reply.items.iter().map(|e| e.json.clone()).collect();
170        Ok(format!(
171            "total: {}, items: [{}]",
172            reply.total,
173            items.join(", ")
174        ))
175    }
176
177    pub async fn stats(&mut self) -> Result<String, Box<dyn std::error::Error>> {
178        let req = self.auth_request(Empty {});
179        let resp = self.inner.stats(req).await?;
180        let reply = resp.into_inner();
181        Ok(format!(
182            "collections: {}, entities: {}, memory: {} bytes, started_at: {}",
183            reply.collection_count,
184            reply.total_entities,
185            reply.total_memory_bytes,
186            reply.started_at_unix_ms
187        ))
188    }
189
190    pub async fn create_row(
191        &mut self,
192        collection: &str,
193        json: &str,
194    ) -> Result<String, Box<dyn std::error::Error>> {
195        let reply = self.create_row_entity(collection, json).await?;
196        Ok(format!("id: {}, entity: {}", reply.id, reply.entity_json))
197    }
198
199    pub async fn create_row_entity(
200        &mut self,
201        collection: &str,
202        json: &str,
203    ) -> Result<CreatedEntity, Box<dyn std::error::Error>> {
204        let req = self.auth_request(JsonCreateRequest {
205            collection: collection.to_string(),
206            payload_json: json.to_string(),
207        });
208        let resp = self.inner.create_row(req).await?;
209        let reply = resp.into_inner();
210        Ok(CreatedEntity {
211            ok: reply.ok,
212            id: reply.id,
213            entity_json: reply.entity_json,
214        })
215    }
216
217    pub async fn bulk_create_rows(
218        &mut self,
219        collection: &str,
220        payload_json: Vec<String>,
221    ) -> Result<BulkCreateStatus, Box<dyn std::error::Error>> {
222        let req = self.auth_request(JsonBulkCreateRequest {
223            collection: collection.to_string(),
224            payload_json,
225        });
226        let resp = self.inner.bulk_create_rows(req).await?;
227        let reply = resp.into_inner();
228        Ok(BulkCreateStatus {
229            ok: reply.ok,
230            count: reply.count,
231            ids: reply.items.into_iter().map(|item| item.id).collect(),
232        })
233    }
234
235    pub async fn explain(&mut self, sql: &str) -> Result<String, Box<dyn std::error::Error>> {
236        let req = self.auth_request(QueryRequest {
237            query: sql.to_string(),
238            entity_types: vec![],
239            capabilities: vec![],
240            params: vec![],
241        });
242        let resp = self.inner.explain_query(req).await?;
243        Ok(resp.into_inner().payload)
244    }
245
246    pub async fn login(
247        &mut self,
248        username: &str,
249        password: &str,
250    ) -> Result<String, Box<dyn std::error::Error>> {
251        let payload = format!(
252            "{{\"username\":\"{}\",\"password\":\"{}\"}}",
253            username, password
254        );
255        let req = self.auth_request(JsonPayloadRequest {
256            payload_json: payload,
257        });
258        let resp = self.inner.auth_login(req).await?;
259        let reply = resp.into_inner();
260        Ok(reply.payload)
261    }
262
263    pub async fn replication_status(&mut self) -> Result<String, Box<dyn std::error::Error>> {
264        let req = self.auth_request(Empty {});
265        let resp = self.inner.replication_status(req).await?;
266        Ok(resp.into_inner().payload)
267    }
268
269    /// Fetch the canonical `Topology` payload (issue #167 / ADR 0008).
270    /// Returns the raw `topology_bytes` so the caller can hand them
271    /// straight to `TopologyConsumer::consume_bytes`. Engine-free —
272    /// this connector knows nothing about the wire schema beyond the
273    /// proto envelope.
274    pub async fn topology(&mut self) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
275        let req = self.auth_request(TopologyRequest {});
276        let resp = self.inner.topology(req).await?;
277        Ok(resp.into_inner().topology_bytes)
278    }
279
280    pub async fn delete_entity(
281        &mut self,
282        collection: &str,
283        id: u64,
284    ) -> Result<OperationStatus, Box<dyn std::error::Error>> {
285        let req = self.auth_request(DeleteEntityRequest {
286            collection: collection.to_string(),
287            id,
288        });
289        let resp = self.inner.delete_entity(req).await?;
290        let reply = resp.into_inner();
291        Ok(OperationStatus {
292            ok: reply.ok,
293            message: reply.message,
294        })
295    }
296}