Skip to main content

reddb_client_connector/
lib.rs

1//! Workspace-internal gRPC connector.
2//!
3//! Thin wrapper around the generated tonic `RedDbClient<Channel>`
4//! that adds bearer-token auth metadata + ergonomic typed
5//! responses. Lives in its own crate so both `reddb-server`
6//! (`rpc_stdio` mode) and `reddb-client` (the published driver)
7//! can reach it without setting up a circular path dependency
8//! through the `reddb` umbrella.
9//!
10//! Engine-free by design: only `tonic` + `reddb-grpc-proto` deps.
11//! See `crates/reddb-client-connector/Cargo.toml` for rationale.
12
13use reddb_grpc_proto::red_db_client::RedDbClient;
14use reddb_grpc_proto::*;
15use tonic::transport::Channel;
16use tonic::Request;
17
18#[derive(Debug, Clone)]
19pub struct HealthStatus {
20    pub healthy: bool,
21    pub state: String,
22    pub checked_at_unix_ms: u64,
23}
24
25#[derive(Debug, Clone)]
26pub struct QueryResponse {
27    pub ok: bool,
28    pub mode: String,
29    pub statement: String,
30    pub engine: String,
31    pub columns: Vec<String>,
32    pub record_count: u64,
33    pub result_json: String,
34}
35
36#[derive(Debug, Clone)]
37pub struct CreatedEntity {
38    pub ok: bool,
39    pub id: u64,
40    pub entity_json: String,
41}
42
43#[derive(Debug, Clone)]
44pub struct BulkCreateStatus {
45    pub ok: bool,
46    pub count: u64,
47}
48
49#[derive(Debug, Clone)]
50pub struct OperationStatus {
51    pub ok: bool,
52    pub message: String,
53}
54
55#[derive(Clone)]
56pub struct RedDBClient {
57    inner: RedDbClient<Channel>,
58    token: Option<String>,
59    pub addr: String,
60}
61
62impl RedDBClient {
63    pub async fn connect(
64        addr: &str,
65        token: Option<String>,
66    ) -> Result<Self, Box<dyn std::error::Error>> {
67        let endpoint = if addr.starts_with("http") {
68            addr.to_string()
69        } else {
70            format!("http://{}", addr)
71        };
72        let inner = RedDbClient::connect(endpoint.clone()).await?;
73        Ok(Self {
74            inner,
75            token,
76            addr: endpoint,
77        })
78    }
79
80    fn auth_request<T>(&self, inner: T) -> Request<T> {
81        let mut req = Request::new(inner);
82        if let Some(ref token) = self.token {
83            if let Ok(value) = format!("Bearer {}", token).parse() {
84                req.metadata_mut().insert("authorization", value);
85            }
86        }
87        req
88    }
89
90    /// Update the auth token (e.g. after a successful login).
91    pub fn set_token(&mut self, token: String) {
92        self.token = Some(token);
93    }
94
95    pub async fn health_status(&mut self) -> Result<HealthStatus, Box<dyn std::error::Error>> {
96        let req = self.auth_request(Empty {});
97        let resp = self.inner.health(req).await?;
98        let reply = resp.into_inner();
99        Ok(HealthStatus {
100            healthy: reply.healthy,
101            state: reply.state,
102            checked_at_unix_ms: reply.checked_at_unix_ms,
103        })
104    }
105
106    pub async fn health(&mut self) -> Result<String, Box<dyn std::error::Error>> {
107        let reply = self.health_status().await?;
108        Ok(format!(
109            "state: {}, healthy: {}",
110            reply.state, reply.healthy
111        ))
112    }
113
114    pub async fn query_reply(
115        &mut self,
116        sql: &str,
117    ) -> Result<QueryResponse, Box<dyn std::error::Error>> {
118        let req = self.auth_request(QueryRequest {
119            query: sql.to_string(),
120            entity_types: vec![],
121            capabilities: vec![],
122        });
123        let resp = self.inner.query(req).await?;
124        let reply = resp.into_inner();
125        Ok(QueryResponse {
126            ok: reply.ok,
127            mode: reply.mode,
128            statement: reply.statement,
129            engine: reply.engine,
130            columns: reply.columns,
131            record_count: reply.record_count,
132            result_json: reply.result_json,
133        })
134    }
135
136    pub async fn query(&mut self, sql: &str) -> Result<String, Box<dyn std::error::Error>> {
137        let reply = self.query_reply(sql).await?;
138        Ok(reply.result_json)
139    }
140
141    pub async fn collections(&mut self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
142        let req = self.auth_request(Empty {});
143        let resp = self.inner.collections(req).await?;
144        Ok(resp.into_inner().collections)
145    }
146
147    pub async fn scan(
148        &mut self,
149        collection: &str,
150        limit: u64,
151    ) -> Result<String, Box<dyn std::error::Error>> {
152        let req = self.auth_request(ScanRequest {
153            collection: collection.to_string(),
154            offset: 0,
155            limit,
156        });
157        let resp = self.inner.scan(req).await?;
158        let reply = resp.into_inner();
159        let items: Vec<String> = reply.items.iter().map(|e| e.json.clone()).collect();
160        Ok(format!(
161            "total: {}, items: [{}]",
162            reply.total,
163            items.join(", ")
164        ))
165    }
166
167    pub async fn stats(&mut self) -> Result<String, Box<dyn std::error::Error>> {
168        let req = self.auth_request(Empty {});
169        let resp = self.inner.stats(req).await?;
170        let reply = resp.into_inner();
171        Ok(format!(
172            "collections: {}, entities: {}, memory: {} bytes, started_at: {}",
173            reply.collection_count,
174            reply.total_entities,
175            reply.total_memory_bytes,
176            reply.started_at_unix_ms
177        ))
178    }
179
180    pub async fn create_row(
181        &mut self,
182        collection: &str,
183        json: &str,
184    ) -> Result<String, Box<dyn std::error::Error>> {
185        let reply = self.create_row_entity(collection, json).await?;
186        Ok(format!("id: {}, entity: {}", reply.id, reply.entity_json))
187    }
188
189    pub async fn create_row_entity(
190        &mut self,
191        collection: &str,
192        json: &str,
193    ) -> Result<CreatedEntity, Box<dyn std::error::Error>> {
194        let req = self.auth_request(JsonCreateRequest {
195            collection: collection.to_string(),
196            payload_json: json.to_string(),
197        });
198        let resp = self.inner.create_row(req).await?;
199        let reply = resp.into_inner();
200        Ok(CreatedEntity {
201            ok: reply.ok,
202            id: reply.id,
203            entity_json: reply.entity_json,
204        })
205    }
206
207    pub async fn bulk_create_rows(
208        &mut self,
209        collection: &str,
210        payload_json: Vec<String>,
211    ) -> Result<BulkCreateStatus, Box<dyn std::error::Error>> {
212        let req = self.auth_request(JsonBulkCreateRequest {
213            collection: collection.to_string(),
214            payload_json,
215        });
216        let resp = self.inner.bulk_create_rows(req).await?;
217        let reply = resp.into_inner();
218        Ok(BulkCreateStatus {
219            ok: reply.ok,
220            count: reply.count,
221        })
222    }
223
224    pub async fn explain(&mut self, sql: &str) -> Result<String, Box<dyn std::error::Error>> {
225        let req = self.auth_request(QueryRequest {
226            query: sql.to_string(),
227            entity_types: vec![],
228            capabilities: vec![],
229        });
230        let resp = self.inner.explain_query(req).await?;
231        Ok(resp.into_inner().payload)
232    }
233
234    pub async fn login(
235        &mut self,
236        username: &str,
237        password: &str,
238    ) -> Result<String, Box<dyn std::error::Error>> {
239        let payload = format!(
240            "{{\"username\":\"{}\",\"password\":\"{}\"}}",
241            username, password
242        );
243        let req = self.auth_request(JsonPayloadRequest {
244            payload_json: payload,
245        });
246        let resp = self.inner.auth_login(req).await?;
247        let reply = resp.into_inner();
248        Ok(reply.payload)
249    }
250
251    pub async fn replication_status(&mut self) -> Result<String, Box<dyn std::error::Error>> {
252        let req = self.auth_request(Empty {});
253        let resp = self.inner.replication_status(req).await?;
254        Ok(resp.into_inner().payload)
255    }
256
257    /// Fetch the canonical `Topology` payload (issue #167 / ADR 0008).
258    /// Returns the raw `topology_bytes` so the caller can hand them
259    /// straight to `TopologyConsumer::consume_bytes`. Engine-free —
260    /// this connector knows nothing about the wire schema beyond the
261    /// proto envelope.
262    pub async fn topology(&mut self) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
263        let req = self.auth_request(TopologyRequest {});
264        let resp = self.inner.topology(req).await?;
265        Ok(resp.into_inner().topology_bytes)
266    }
267
268    pub async fn delete_entity(
269        &mut self,
270        collection: &str,
271        id: u64,
272    ) -> Result<OperationStatus, Box<dyn std::error::Error>> {
273        let req = self.auth_request(DeleteEntityRequest {
274            collection: collection.to_string(),
275            id,
276        });
277        let resp = self.inner.delete_entity(req).await?;
278        let reply = resp.into_inner();
279        Ok(OperationStatus {
280            ok: reply.ok,
281            message: reply.message,
282        })
283    }
284}