Skip to main content

geode_client/
grpc.rs

1//! gRPC transport implementation for Geode.
2//!
3//! This module provides gRPC client functionality using tonic.
4//! It connects to the Geode server's gRPC service as defined in geode.proto.
5
6use std::collections::HashMap;
7use tonic::transport::{Channel, Endpoint};
8use tonic::{Request, Streaming};
9
10use crate::client::{Column, Page};
11use crate::dsn::Dsn;
12use crate::error::{Error, Result};
13use crate::types::Value;
14
15/// gRPC client for Geode.
16///
17/// Provides gRPC-based connection to the Geode database server.
18pub struct GrpcClient {
19    channel: Channel,
20    session_id: String,
21}
22
23impl GrpcClient {
24    /// Connect to a Geode server using gRPC.
25    ///
26    /// # Arguments
27    ///
28    /// * `dsn` - Parsed DSN with gRPC transport
29    ///
30    /// # Example
31    ///
32    /// ```no_run
33    /// use geode_client::dsn::Dsn;
34    /// use geode_client::grpc::GrpcClient;
35    ///
36    /// # async fn example() -> geode_client::Result<()> {
37    /// let dsn = Dsn::parse("grpc://localhost:50051")?;
38    /// let client = GrpcClient::connect(&dsn).await?;
39    /// # Ok(())
40    /// # }
41    /// ```
42    pub async fn connect(dsn: &Dsn) -> Result<Self> {
43        let addr = if dsn.tls_enabled() {
44            format!("https://{}", dsn.address())
45        } else {
46            format!("http://{}", dsn.address())
47        };
48
49        let endpoint = Endpoint::from_shared(addr.clone())
50            .map_err(|e| Error::connection(format!("Invalid endpoint: {}", e)))?;
51
52        // Configure TLS if needed
53        let endpoint = if dsn.tls_enabled() && dsn.skip_verify() {
54            // Skip TLS verification (insecure - for development only)
55            endpoint
56                .tls_config(tonic::transport::ClientTlsConfig::new().with_enabled_roots())
57                .map_err(|e| Error::tls(format!("TLS config error: {}", e)))?
58        } else {
59            endpoint
60        };
61
62        let channel = endpoint
63            .connect()
64            .await
65            .map_err(|e| Error::connection(format!("gRPC connection failed to {}: {}", addr, e)))?;
66
67        let mut client = Self {
68            channel,
69            session_id: String::new(),
70        };
71
72        // Perform handshake
73        client.handshake(dsn.username(), dsn.password()).await?;
74
75        Ok(client)
76    }
77
78    /// Perform authentication handshake.
79    async fn handshake(&mut self, username: Option<&str>, password: Option<&str>) -> Result<()> {
80        let request = GrpcHelloRequest {
81            username: username.unwrap_or("").to_string(),
82            password: password.unwrap_or("").to_string(),
83            tenant_id: None,
84        };
85
86        // Send handshake via gRPC
87        // Note: This is a simplified implementation. The actual gRPC call
88        // depends on the server's service definition.
89        let mut grpc_client = GeodeServiceClient::new(self.channel.clone());
90
91        let response = grpc_client
92            .handshake(Request::new(request))
93            .await
94            .map_err(|e| Error::connection(format!("Handshake failed: {}", e)))?;
95
96        let resp = response.into_inner();
97        if !resp.success {
98            return Err(Error::auth(resp.error_message));
99        }
100
101        self.session_id = resp.session_id;
102        Ok(())
103    }
104
105    /// Execute a GQL query.
106    pub async fn query(&mut self, gql: &str) -> Result<(Page, Option<String>)> {
107        self.query_with_params(gql, &HashMap::new()).await
108    }
109
110    /// Execute a GQL query with parameters.
111    pub async fn query_with_params(
112        &mut self,
113        gql: &str,
114        params: &HashMap<String, Value>,
115    ) -> Result<(Page, Option<String>)> {
116        let params_map: HashMap<String, String> = params
117            .iter()
118            .map(|(k, v)| (k.clone(), v.to_proto_string()))
119            .collect();
120
121        let request = GrpcExecuteRequest {
122            session_id: self.session_id.clone(),
123            query: gql.to_string(),
124            parameters: params_map,
125        };
126
127        let mut grpc_client = GeodeServiceClient::new(self.channel.clone());
128
129        let response = grpc_client
130            .execute(Request::new(request))
131            .await
132            .map_err(|e| Error::query(format!("Query execution failed: {}", e)))?;
133
134        // Process streaming response
135        let mut stream = response.into_inner();
136        let mut columns = Vec::new();
137        let mut rows = Vec::new();
138        let mut final_page = true;
139
140        while let Some(exec_resp) = stream
141            .message()
142            .await
143            .map_err(|e| Error::query(format!("Failed to read response: {}", e)))?
144        {
145            match exec_resp.result {
146                Some(GrpcExecutionResult::Schema(schema)) => {
147                    columns = schema
148                        .columns
149                        .into_iter()
150                        .map(|c| Column {
151                            name: c.name,
152                            col_type: c.r#type,
153                        })
154                        .collect();
155                }
156                Some(GrpcExecutionResult::Page(page)) => {
157                    for row in page.rows {
158                        let mut row_map = HashMap::new();
159                        for (i, col) in columns.iter().enumerate() {
160                            let value = if i < row.values.len() {
161                                Self::convert_proto_value(&row.values[i])
162                            } else {
163                                Value::null()
164                            };
165                            row_map.insert(col.name.clone(), value);
166                        }
167                        rows.push(row_map);
168                    }
169                    final_page = page.last_page;
170                }
171                Some(GrpcExecutionResult::Error(err)) => {
172                    return Err(Error::Query {
173                        code: err.code,
174                        message: err.message,
175                    });
176                }
177                Some(GrpcExecutionResult::Metrics(_)) => {
178                    // Metrics received, continue
179                }
180                Some(GrpcExecutionResult::Heartbeat(_)) => {
181                    // Heartbeat received, continue
182                }
183                None => {}
184            }
185        }
186
187        Ok((
188            Page {
189                columns,
190                rows,
191                ordered: false,
192                order_keys: Vec::new(),
193                final_page,
194            },
195            None,
196        ))
197    }
198
199    /// Convert a proto Value to our Value type.
200    fn convert_proto_value(proto_val: &GrpcValue) -> Value {
201        match &proto_val.kind {
202            Some(GrpcValueKind::StringVal(s)) => Value::string(s.clone()),
203            Some(GrpcValueKind::IntVal(i)) => Value::int(*i),
204            Some(GrpcValueKind::DoubleVal(d)) => {
205                Value::decimal(rust_decimal::Decimal::from_f64_retain(*d).unwrap_or_default())
206            }
207            Some(GrpcValueKind::BoolVal(b)) => Value::bool(*b),
208            Some(GrpcValueKind::NullVal(true)) => Value::null(),
209            _ => Value::null(),
210        }
211    }
212
213    /// Begin a transaction.
214    pub async fn begin(&mut self) -> Result<()> {
215        // gRPC transaction support would go here
216        // For now, return an error as gRPC transactions may need different handling
217        Err(Error::connection("gRPC transactions not yet implemented"))
218    }
219
220    /// Commit a transaction.
221    pub async fn commit(&mut self) -> Result<()> {
222        Err(Error::connection("gRPC transactions not yet implemented"))
223    }
224
225    /// Rollback a transaction.
226    pub async fn rollback(&mut self) -> Result<()> {
227        Err(Error::connection("gRPC transactions not yet implemented"))
228    }
229
230    /// Send a ping request.
231    pub async fn ping(&mut self) -> Result<bool> {
232        let mut grpc_client = GeodeServiceClient::new(self.channel.clone());
233
234        let response = grpc_client
235            .ping(Request::new(GrpcPingRequest {}))
236            .await
237            .map_err(|e| Error::connection(format!("Ping failed: {}", e)))?;
238
239        Ok(response.into_inner().ok)
240    }
241
242    /// Close the connection.
243    pub fn close(&mut self) -> Result<()> {
244        // gRPC channels are automatically closed when dropped
245        // No explicit close action needed
246        Ok(())
247    }
248}
249
250// =============================================================================
251// gRPC Message Types (matching geode.proto service definitions)
252// =============================================================================
253
/// Handshake request (`HelloRequest`) sent when a connection is opened.
#[derive(Debug, Clone, PartialEq)]
pub struct GrpcHelloRequest {
    /// User name to authenticate as.
    pub username: String,
    /// Password for `username`.
    pub password: String,
    /// Optional tenant identifier.
    pub tenant_id: Option<String>,
}
261
/// Handshake reply (`HelloResponse`) returned by the server.
#[derive(Debug, Clone, PartialEq)]
pub struct GrpcHelloResponse {
    /// Whether the handshake was accepted.
    pub success: bool,
    /// Session identifier to attach to later requests.
    pub session_id: String,
    /// Server-provided failure description.
    pub error_message: String,
    /// Capability strings reported by the server.
    pub capabilities: Vec<String>,
}
270
/// Query request (`ExecuteRequest`) for a single GQL statement.
#[derive(Debug, Clone, PartialEq)]
pub struct GrpcExecuteRequest {
    /// Session the query runs in.
    pub session_id: String,
    /// GQL query text.
    pub query: String,
    /// Query parameters, keyed by name, with string-encoded values.
    pub parameters: HashMap<String, String>,
}
278
279/// gRPC ExecutionResponse result variants
280#[derive(Clone, PartialEq, Debug)]
281pub enum GrpcExecutionResult {
282    Schema(GrpcSchemaDefinition),
283    Page(GrpcDataPage),
284    Error(GrpcError),
285    Metrics(GrpcExecutionMetrics),
286    Heartbeat(GrpcHeartbeat),
287}
288
289/// gRPC ExecutionResponse message
290#[derive(Clone, PartialEq, Debug)]
291pub struct GrpcExecutionResponse {
292    pub result: Option<GrpcExecutionResult>,
293}
294
295/// gRPC SchemaDefinition message
296#[derive(Clone, PartialEq, Debug)]
297pub struct GrpcSchemaDefinition {
298    pub columns: Vec<GrpcColumnDefinition>,
299}
300
/// A single column of a result set (`ColumnDefinition`).
#[derive(Debug, Clone, PartialEq)]
pub struct GrpcColumnDefinition {
    /// Column name.
    pub name: String,
    /// Column type, as a string.
    pub r#type: String,
}
307
308/// gRPC DataPage message
309#[derive(Clone, PartialEq, Debug)]
310pub struct GrpcDataPage {
311    pub rows: Vec<GrpcRow>,
312    pub last_page: bool,
313}
314
315/// gRPC Row message
316#[derive(Clone, PartialEq, Debug)]
317pub struct GrpcRow {
318    pub values: Vec<GrpcValue>,
319}
320
/// The concrete payload of a [`GrpcValue`].
#[derive(Debug, Clone, PartialEq)]
pub enum GrpcValueKind {
    /// Text value.
    StringVal(String),
    /// 64-bit signed integer value.
    IntVal(i64),
    /// 64-bit floating-point value.
    DoubleVal(f64),
    /// Boolean value.
    BoolVal(bool),
    /// Null marker.
    NullVal(bool),
}
330
331/// gRPC Value message
332#[derive(Clone, PartialEq, Debug, Default)]
333pub struct GrpcValue {
334    pub kind: Option<GrpcValueKind>,
335}
336
/// Server-reported failure (`Error`).
#[derive(Debug, Clone, PartialEq)]
pub struct GrpcError {
    /// Error code.
    pub code: String,
    /// Error description.
    pub message: String,
    /// Error type, as a string.
    pub r#type: String,
}
344
/// Timing figures for one query execution (`ExecutionMetrics`).
///
/// The `_ns` suffix suggests all durations are in nanoseconds; confirm
/// against geode.proto.
#[derive(Debug, Clone, PartialEq)]
pub struct GrpcExecutionMetrics {
    /// Duration of the parse phase.
    pub parse_duration_ns: i64,
    /// Duration of the planning phase.
    pub plan_duration_ns: i64,
    /// Duration of the execution phase.
    pub execute_duration_ns: i64,
    /// Total duration.
    pub total_duration_ns: i64,
}
353
/// Empty heartbeat message (`Heartbeat`); it has no fields.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct GrpcHeartbeat;
357
/// Empty ping request (`PingRequest`); it has no fields.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct GrpcPingRequest;
361
/// Reply to a ping (`PingResponse`).
#[derive(Debug, Clone, PartialEq)]
pub struct GrpcPingResponse {
    /// Result flag reported by the server.
    pub ok: bool,
}
367
368// =============================================================================
369// gRPC Service Client (manual implementation matching geode.proto)
370// =============================================================================
371
/// Client for the `GeodeService` gRPC service.
///
/// Note: this is a stub. The wrapped transport in `inner` is stored but not
/// yet used; it is kept for when full gRPC support is implemented.
// `inner` is never read while the RPC methods are stubs.
#[allow(dead_code)]
pub struct GeodeServiceClient<T> {
    /// Transport the RPCs will eventually be issued on.
    inner: T,
}
380
381impl GeodeServiceClient<Channel> {
382    /// Create a new GeodeServiceClient from a channel
383    pub fn new(channel: Channel) -> Self {
384        Self { inner: channel }
385    }
386
387    /// Perform handshake RPC
388    ///
389    /// Note: This is a stub implementation. Full gRPC support requires
390    /// proper protobuf code generation with prost-build.
391    pub async fn handshake(
392        &mut self,
393        _request: Request<GrpcHelloRequest>,
394    ) -> std::result::Result<tonic::Response<GrpcHelloResponse>, tonic::Status> {
395        // TODO: Implement proper gRPC handshake using prost-generated types
396        Err(tonic::Status::unimplemented(
397            "gRPC client not fully implemented",
398        ))
399    }
400
401    /// Execute query RPC (streaming response)
402    ///
403    /// Note: This is a stub implementation.
404    pub async fn execute(
405        &mut self,
406        _request: Request<GrpcExecuteRequest>,
407    ) -> std::result::Result<tonic::Response<Streaming<GrpcExecutionResponse>>, tonic::Status> {
408        Err(tonic::Status::unimplemented(
409            "gRPC client not fully implemented",
410        ))
411    }
412
413    /// Ping RPC
414    ///
415    /// Note: This is a stub implementation.
416    pub async fn ping(
417        &mut self,
418        _request: Request<GrpcPingRequest>,
419    ) -> std::result::Result<tonic::Response<GrpcPingResponse>, tonic::Status> {
420        Err(tonic::Status::unimplemented(
421            "gRPC client not fully implemented",
422        ))
423    }
424}
425
426/// Helper to encode HelloRequest for gRPC.
427/// Reserved for future use when full gRPC encoding is implemented.
428#[allow(dead_code)]
429fn encode_grpc_hello_request(req: &GrpcHelloRequest, buf: &mut Vec<u8>) {
430    // Field 1: username
431    if !req.username.is_empty() {
432        buf.push(0x0a); // tag 1, wire type 2 (length-delimited)
433        encode_varint(req.username.len() as u64, buf);
434        buf.extend(req.username.as_bytes());
435    }
436    // Field 2: password
437    if !req.password.is_empty() {
438        buf.push(0x12); // tag 2, wire type 2
439        encode_varint(req.password.len() as u64, buf);
440        buf.extend(req.password.as_bytes());
441    }
442    // Field 3: tenant_id (optional)
443    if let Some(ref tenant) = req.tenant_id {
444        if !tenant.is_empty() {
445            buf.push(0x1a); // tag 3, wire type 2
446            encode_varint(tenant.len() as u64, buf);
447            buf.extend(tenant.as_bytes());
448        }
449    }
450}
451
/// Append `v` to `buf` as a protobuf base-128 varint.
///
/// Seven bits are emitted per byte, least-significant group first; every byte
/// except the last has its high bit set as a continuation marker.
// Only used by the not-yet-wired hello-request encoder.
#[allow(dead_code)]
fn encode_varint(mut v: u64, buf: &mut Vec<u8>) {
    loop {
        let low_bits = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            // Final group: no continuation bit.
            buf.push(low_bits);
            return;
        }
        buf.push(low_bits | 0x80);
    }
}
461
#[cfg(test)]
mod tests {
    //! Unit tests for value conversion and the hand-written protobuf encoders.

    use super::*;

    /// Build a `GrpcValue` carrying the given kind and convert it.
    fn convert(kind: Option<GrpcValueKind>) -> Value {
        GrpcClient::convert_proto_value(&GrpcValue { kind })
    }

    /// Encode `v` as a varint into a fresh buffer.
    fn varint(v: u64) -> Vec<u8> {
        let mut buf = Vec::new();
        encode_varint(v, &mut buf);
        buf
    }

    /// Encode a hello request into a fresh buffer.
    fn hello_bytes(username: &str, password: &str, tenant_id: Option<&str>) -> Vec<u8> {
        let req = GrpcHelloRequest {
            username: username.to_string(),
            password: password.to_string(),
            tenant_id: tenant_id.map(str::to_string),
        };
        let mut buf = Vec::new();
        encode_grpc_hello_request(&req, &mut buf);
        buf
    }

    #[test]
    fn test_grpc_value_default() {
        let val = GrpcValue::default();
        assert!(val.kind.is_none());
    }

    #[test]
    fn test_convert_proto_value_string() {
        let val = convert(Some(GrpcValueKind::StringVal("hello".to_string())));
        assert_eq!(val.as_string().unwrap(), "hello");
    }

    #[test]
    fn test_convert_proto_value_int() {
        let val = convert(Some(GrpcValueKind::IntVal(42)));
        assert_eq!(val.as_int().unwrap(), 42);
    }

    #[test]
    fn test_convert_proto_value_bool() {
        let val = convert(Some(GrpcValueKind::BoolVal(true)));
        assert!(val.as_bool().unwrap());
    }

    #[test]
    fn test_convert_proto_value_null() {
        let val = convert(Some(GrpcValueKind::NullVal(true)));
        assert!(val.is_null());
    }

    #[test]
    fn test_convert_proto_value_null_false_flag() {
        // The flag's value does not matter: any NullVal converts to null.
        let val = convert(Some(GrpcValueKind::NullVal(false)));
        assert!(val.is_null());
    }

    #[test]
    fn test_convert_proto_value_none() {
        let val = convert(None);
        assert!(val.is_null());
    }

    #[test]
    fn test_encode_varint_single_byte() {
        assert_eq!(varint(0), vec![0x00]);
        assert_eq!(varint(1), vec![0x01]);
        assert_eq!(varint(127), vec![0x7f]);
    }

    #[test]
    fn test_encode_varint_multi_byte() {
        assert_eq!(varint(128), vec![0x80, 0x01]);
        assert_eq!(varint(300), vec![0xac, 0x02]);
        // u64::MAX needs ten bytes: nine full groups plus the top bit.
        let mut expected = vec![0xff; 9];
        expected.push(0x01);
        assert_eq!(varint(u64::MAX), expected);
    }

    #[test]
    fn test_encode_grpc_hello_request_all_fields() {
        assert_eq!(
            hello_bytes("ab", "c", Some("t")),
            vec![0x0a, 0x02, b'a', b'b', 0x12, 0x01, b'c', 0x1a, 0x01, b't']
        );
    }

    #[test]
    fn test_encode_grpc_hello_request_skips_empty_fields() {
        assert!(hello_bytes("", "", None).is_empty());
        assert!(hello_bytes("", "", Some("")).is_empty());
        assert_eq!(hello_bytes("", "p", None), vec![0x12, 0x01, b'p']);
    }
}