1use std::collections::HashMap;
7use tonic::transport::{Channel, Endpoint};
8use tonic::{Request, Streaming};
9
10use crate::client::{Column, Page};
11use crate::dsn::Dsn;
12use crate::error::{Error, Result};
13use crate::types::Value;
14
15pub struct GrpcClient {
19 channel: Channel,
20 session_id: String,
21}
22
23impl GrpcClient {
24 pub async fn connect(dsn: &Dsn) -> Result<Self> {
43 let addr = if dsn.tls_enabled() {
44 format!("https://{}", dsn.address())
45 } else {
46 format!("http://{}", dsn.address())
47 };
48
49 let endpoint = Endpoint::from_shared(addr.clone())
50 .map_err(|e| Error::connection(format!("Invalid endpoint: {}", e)))?;
51
52 let endpoint = if dsn.tls_enabled() && dsn.skip_verify() {
54 endpoint
56 .tls_config(tonic::transport::ClientTlsConfig::new().with_enabled_roots())
57 .map_err(|e| Error::tls(format!("TLS config error: {}", e)))?
58 } else {
59 endpoint
60 };
61
62 let channel = endpoint
63 .connect()
64 .await
65 .map_err(|e| Error::connection(format!("gRPC connection failed to {}: {}", addr, e)))?;
66
67 let mut client = Self {
68 channel,
69 session_id: String::new(),
70 };
71
72 client.handshake(dsn.username(), dsn.password()).await?;
74
75 Ok(client)
76 }
77
78 async fn handshake(&mut self, username: Option<&str>, password: Option<&str>) -> Result<()> {
80 let request = GrpcHelloRequest {
81 username: username.unwrap_or("").to_string(),
82 password: password.unwrap_or("").to_string(),
83 tenant_id: None,
84 };
85
86 let mut grpc_client = GeodeServiceClient::new(self.channel.clone());
90
91 let response = grpc_client
92 .handshake(Request::new(request))
93 .await
94 .map_err(|e| Error::connection(format!("Handshake failed: {}", e)))?;
95
96 let resp = response.into_inner();
97 if !resp.success {
98 return Err(Error::auth(resp.error_message));
99 }
100
101 self.session_id = resp.session_id;
102 Ok(())
103 }
104
105 pub async fn query(&mut self, gql: &str) -> Result<(Page, Option<String>)> {
107 self.query_with_params(gql, &HashMap::new()).await
108 }
109
110 pub async fn query_with_params(
112 &mut self,
113 gql: &str,
114 params: &HashMap<String, Value>,
115 ) -> Result<(Page, Option<String>)> {
116 let params_map: HashMap<String, String> = params
117 .iter()
118 .map(|(k, v)| (k.clone(), v.to_proto_string()))
119 .collect();
120
121 let request = GrpcExecuteRequest {
122 session_id: self.session_id.clone(),
123 query: gql.to_string(),
124 parameters: params_map,
125 };
126
127 let mut grpc_client = GeodeServiceClient::new(self.channel.clone());
128
129 let response = grpc_client
130 .execute(Request::new(request))
131 .await
132 .map_err(|e| Error::query(format!("Query execution failed: {}", e)))?;
133
134 let mut stream = response.into_inner();
136 let mut columns = Vec::new();
137 let mut rows = Vec::new();
138 let mut final_page = true;
139
140 while let Some(exec_resp) = stream
141 .message()
142 .await
143 .map_err(|e| Error::query(format!("Failed to read response: {}", e)))?
144 {
145 match exec_resp.result {
146 Some(GrpcExecutionResult::Schema(schema)) => {
147 columns = schema
148 .columns
149 .into_iter()
150 .map(|c| Column {
151 name: c.name,
152 col_type: c.r#type,
153 })
154 .collect();
155 }
156 Some(GrpcExecutionResult::Page(page)) => {
157 for row in page.rows {
158 let mut row_map = HashMap::new();
159 for (i, col) in columns.iter().enumerate() {
160 let value = if i < row.values.len() {
161 Self::convert_proto_value(&row.values[i])
162 } else {
163 Value::null()
164 };
165 row_map.insert(col.name.clone(), value);
166 }
167 rows.push(row_map);
168 }
169 final_page = page.last_page;
170 }
171 Some(GrpcExecutionResult::Error(err)) => {
172 return Err(Error::Query {
173 code: err.code,
174 message: err.message,
175 });
176 }
177 Some(GrpcExecutionResult::Metrics(_)) => {
178 }
180 Some(GrpcExecutionResult::Heartbeat(_)) => {
181 }
183 None => {}
184 }
185 }
186
187 Ok((
188 Page {
189 columns,
190 rows,
191 ordered: false,
192 order_keys: Vec::new(),
193 final_page,
194 },
195 None,
196 ))
197 }
198
199 fn convert_proto_value(proto_val: &GrpcValue) -> Value {
201 match &proto_val.kind {
202 Some(GrpcValueKind::StringVal(s)) => Value::string(s.clone()),
203 Some(GrpcValueKind::IntVal(i)) => Value::int(*i),
204 Some(GrpcValueKind::DoubleVal(d)) => {
205 Value::decimal(rust_decimal::Decimal::from_f64_retain(*d).unwrap_or_default())
206 }
207 Some(GrpcValueKind::BoolVal(b)) => Value::bool(*b),
208 Some(GrpcValueKind::NullVal(true)) => Value::null(),
209 _ => Value::null(),
210 }
211 }
212
213 pub async fn begin(&mut self) -> Result<()> {
215 Err(Error::connection("gRPC transactions not yet implemented"))
218 }
219
220 pub async fn commit(&mut self) -> Result<()> {
222 Err(Error::connection("gRPC transactions not yet implemented"))
223 }
224
225 pub async fn rollback(&mut self) -> Result<()> {
227 Err(Error::connection("gRPC transactions not yet implemented"))
228 }
229
230 pub async fn ping(&mut self) -> Result<bool> {
232 let mut grpc_client = GeodeServiceClient::new(self.channel.clone());
233
234 let response = grpc_client
235 .ping(Request::new(GrpcPingRequest {}))
236 .await
237 .map_err(|e| Error::connection(format!("Ping failed: {}", e)))?;
238
239 Ok(response.into_inner().ok)
240 }
241
242 pub fn close(&mut self) -> Result<()> {
244 Ok(())
247 }
248}
249
250#[derive(Clone, PartialEq, Debug)]
256pub struct GrpcHelloRequest {
257 pub username: String,
258 pub password: String,
259 pub tenant_id: Option<String>,
260}
261
262#[derive(Clone, PartialEq, Debug)]
264pub struct GrpcHelloResponse {
265 pub success: bool,
266 pub session_id: String,
267 pub error_message: String,
268 pub capabilities: Vec<String>,
269}
270
271#[derive(Clone, PartialEq, Debug)]
273pub struct GrpcExecuteRequest {
274 pub session_id: String,
275 pub query: String,
276 pub parameters: HashMap<String, String>,
277}
278
279#[derive(Clone, PartialEq, Debug)]
281pub enum GrpcExecutionResult {
282 Schema(GrpcSchemaDefinition),
283 Page(GrpcDataPage),
284 Error(GrpcError),
285 Metrics(GrpcExecutionMetrics),
286 Heartbeat(GrpcHeartbeat),
287}
288
289#[derive(Clone, PartialEq, Debug)]
291pub struct GrpcExecutionResponse {
292 pub result: Option<GrpcExecutionResult>,
293}
294
295#[derive(Clone, PartialEq, Debug)]
297pub struct GrpcSchemaDefinition {
298 pub columns: Vec<GrpcColumnDefinition>,
299}
300
301#[derive(Clone, PartialEq, Debug)]
303pub struct GrpcColumnDefinition {
304 pub name: String,
305 pub r#type: String,
306}
307
308#[derive(Clone, PartialEq, Debug)]
310pub struct GrpcDataPage {
311 pub rows: Vec<GrpcRow>,
312 pub last_page: bool,
313}
314
315#[derive(Clone, PartialEq, Debug)]
317pub struct GrpcRow {
318 pub values: Vec<GrpcValue>,
319}
320
321#[derive(Clone, PartialEq, Debug)]
323pub enum GrpcValueKind {
324 StringVal(String),
325 IntVal(i64),
326 DoubleVal(f64),
327 BoolVal(bool),
328 NullVal(bool),
329}
330
331#[derive(Clone, PartialEq, Debug, Default)]
333pub struct GrpcValue {
334 pub kind: Option<GrpcValueKind>,
335}
336
337#[derive(Clone, PartialEq, Debug)]
339pub struct GrpcError {
340 pub code: String,
341 pub message: String,
342 pub r#type: String,
343}
344
345#[derive(Clone, PartialEq, Debug)]
347pub struct GrpcExecutionMetrics {
348 pub parse_duration_ns: i64,
349 pub plan_duration_ns: i64,
350 pub execute_duration_ns: i64,
351 pub total_duration_ns: i64,
352}
353
354#[derive(Clone, PartialEq, Debug, Default)]
356pub struct GrpcHeartbeat;
357
358#[derive(Clone, PartialEq, Debug, Default)]
360pub struct GrpcPingRequest;
361
362#[derive(Clone, PartialEq, Debug)]
364pub struct GrpcPingResponse {
365 pub ok: bool,
366}
367
368#[allow(dead_code)]
377pub struct GeodeServiceClient<T> {
378 inner: T,
379}
380
381impl GeodeServiceClient<Channel> {
382 pub fn new(channel: Channel) -> Self {
384 Self { inner: channel }
385 }
386
387 pub async fn handshake(
392 &mut self,
393 _request: Request<GrpcHelloRequest>,
394 ) -> std::result::Result<tonic::Response<GrpcHelloResponse>, tonic::Status> {
395 Err(tonic::Status::unimplemented(
397 "gRPC client not fully implemented",
398 ))
399 }
400
401 pub async fn execute(
405 &mut self,
406 _request: Request<GrpcExecuteRequest>,
407 ) -> std::result::Result<tonic::Response<Streaming<GrpcExecutionResponse>>, tonic::Status> {
408 Err(tonic::Status::unimplemented(
409 "gRPC client not fully implemented",
410 ))
411 }
412
413 pub async fn ping(
417 &mut self,
418 _request: Request<GrpcPingRequest>,
419 ) -> std::result::Result<tonic::Response<GrpcPingResponse>, tonic::Status> {
420 Err(tonic::Status::unimplemented(
421 "gRPC client not fully implemented",
422 ))
423 }
424}
425
426#[allow(dead_code)]
429fn encode_grpc_hello_request(req: &GrpcHelloRequest, buf: &mut Vec<u8>) {
430 if !req.username.is_empty() {
432 buf.push(0x0a); encode_varint(req.username.len() as u64, buf);
434 buf.extend(req.username.as_bytes());
435 }
436 if !req.password.is_empty() {
438 buf.push(0x12); encode_varint(req.password.len() as u64, buf);
440 buf.extend(req.password.as_bytes());
441 }
442 if let Some(ref tenant) = req.tenant_id {
444 if !tenant.is_empty() {
445 buf.push(0x1a); encode_varint(tenant.len() as u64, buf);
447 buf.extend(tenant.as_bytes());
448 }
449 }
450}
451
452#[allow(dead_code)]
454fn encode_varint(mut v: u64, buf: &mut Vec<u8>) {
455 while v >= 0x80 {
456 buf.push((v as u8) | 0x80);
457 v >>= 7;
458 }
459 buf.push(v as u8);
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465
466 #[test]
467 fn test_grpc_value_default() {
468 let val = GrpcValue::default();
469 assert!(val.kind.is_none());
470 }
471
472 #[test]
473 fn test_convert_proto_value_string() {
474 let proto_val = GrpcValue {
475 kind: Some(GrpcValueKind::StringVal("hello".to_string())),
476 };
477 let val = GrpcClient::convert_proto_value(&proto_val);
478 assert_eq!(val.as_string().unwrap(), "hello");
479 }
480
481 #[test]
482 fn test_convert_proto_value_int() {
483 let proto_val = GrpcValue {
484 kind: Some(GrpcValueKind::IntVal(42)),
485 };
486 let val = GrpcClient::convert_proto_value(&proto_val);
487 assert_eq!(val.as_int().unwrap(), 42);
488 }
489
490 #[test]
491 fn test_convert_proto_value_bool() {
492 let proto_val = GrpcValue {
493 kind: Some(GrpcValueKind::BoolVal(true)),
494 };
495 let val = GrpcClient::convert_proto_value(&proto_val);
496 assert!(val.as_bool().unwrap());
497 }
498
499 #[test]
500 fn test_convert_proto_value_null() {
501 let proto_val = GrpcValue {
502 kind: Some(GrpcValueKind::NullVal(true)),
503 };
504 let val = GrpcClient::convert_proto_value(&proto_val);
505 assert!(val.is_null());
506 }
507
508 #[test]
509 fn test_convert_proto_value_none() {
510 let proto_val = GrpcValue { kind: None };
511 let val = GrpcClient::convert_proto_value(&proto_val);
512 assert!(val.is_null());
513 }
514}