geode_client/client.rs
1//! Geode client implementation supporting both QUIC and gRPC transports.
2//! Uses protobuf wire protocol with 4-byte big-endian length prefix for QUIC.
3
4use log::{debug, trace, warn};
5use quinn::{ClientConfig, Endpoint};
6use rustls::pki_types::{CertificateDer, ServerName as RustlsServerName};
7use secrecy::{ExposeSecret, SecretString};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::net::{SocketAddr, ToSocketAddrs};
11use std::sync::Arc;
12use tokio::time::{Duration, timeout};
13
14use crate::dsn::{Dsn, Transport};
15use crate::error::{Error, Result};
16use crate::proto;
17use crate::types::Value;
18use crate::validate;
19
/// ALPN protocol identifier offered by the client when configuring TLS for QUIC.
const GEODE_ALPN: &[u8] = b"geode/1";
21
/// Redact passwords from a DSN string so it can be embedded in error messages.
///
/// Two credential styles are handled:
///
/// - URL userinfo (`scheme://user:password@host`): the text between the first
///   `:` after the scheme and the first `@` is replaced.
/// - Query parameters (`?password=xxx` / `?pass=xxx`): every occurrence is
///   replaced, matching the key ASCII case-insensitively; the value runs up to
///   the next `&` or the end of the string.
///
/// Returns a new string with each secret replaced by `[REDACTED]`. Input that
/// contains no recognizable credentials is returned unchanged.
#[allow(dead_code)] // Used in tests
fn redact_dsn(dsn: &str) -> String {
    const MARKER: &str = "[REDACTED]";
    let mut result = dsn.to_string();

    // URL format: scheme://user:password@host:port
    if let Some(scheme_end) = result.find("://") {
        let after_scheme = scheme_end + 3;
        if let Some(at_rel) = result[after_scheme..].find('@') {
            let at_pos = after_scheme + at_rel;
            if let Some(colon_rel) = result[after_scheme..at_pos].find(':') {
                // Keep "user:" and the '@'; replace only what lies between them.
                let secret_start = after_scheme + colon_rel + 1;
                result.replace_range(secret_start..at_pos, MARKER);
            }
        }
    }

    // Query parameter format: host:port?password=xxx
    for pattern in ["password=", "pass="] {
        // ASCII-only lowercasing never changes byte lengths, so offsets found in
        // `lower` are valid char boundaries in `result`. Full Unicode lowercasing
        // can grow or shrink the string and would shift the offsets.
        let lower = result.to_ascii_lowercase();
        let mut redacted = String::with_capacity(result.len());
        let mut cursor = 0;

        while let Some(rel) = lower[cursor..].find(pattern) {
            let value_start = cursor + rel + pattern.len();
            let value_end = result[value_start..]
                .find('&')
                .map_or(result.len(), |i| value_start + i);

            redacted.push_str(&result[cursor..value_start]);
            redacted.push_str(MARKER);
            // Resume after the secret so repeated parameters are redacted too.
            cursor = value_end;
        }

        redacted.push_str(&result[cursor..]);
        result = redacted;
    }

    result
}
72
73/// A column definition in a query result set.
74///
75/// Contains the column name and its GQL type as returned by the server.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct Column {
78 /// The column name (or alias if specified in the query)
79 pub name: String,
80 /// The GQL type of the column (e.g., "INT", "STRING", "BOOL")
81 #[serde(rename = "type")]
82 pub col_type: String,
83}
84
85/// A page of query results.
86///
87/// Query results are returned in pages. Each page contains a slice of rows
88/// along with metadata about the result set.
89///
90/// # Example
91///
92/// ```ignore
93/// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name, n.age").await?;
94///
95/// for row in &page.rows {
96/// let name = row.get("name").unwrap().as_string()?;
97/// let age = row.get("age").unwrap().as_int()?;
98/// println!("{}: {}", name, age);
99/// }
100///
101/// if !page.final_page {
102/// // More results available, would need to pull next page
103/// }
104/// ```
105#[derive(Debug, Clone)]
106pub struct Page {
107 /// Column definitions for the result set
108 pub columns: Vec<Column>,
109 /// Result rows, each row is a map of column name to value
110 pub rows: Vec<HashMap<String, Value>>,
111 /// Whether results are ordered (ORDER BY was used)
112 pub ordered: bool,
113 /// The keys used for ordering, if any
114 pub order_keys: Vec<String>,
115 /// Whether this is the final page of results
116 pub final_page: bool,
117}
118
/// Handle for a named savepoint inside a transaction.
///
/// A savepoint marks a position in a transaction that can later be rolled back
/// to without abandoning the whole transaction. Savepoints are created and
/// managed through server-side GQL commands.
///
/// # Example
///
/// ```ignore
/// conn.begin().await?;
/// conn.query("CREATE (n:Node {id: 1})").await?;
///
/// let sp = conn.savepoint("before_risky_op")?;
/// match conn.query("CREATE (n:Node {id: 2})").await {
///     Ok(_) => {},
///     Err(_) => conn.rollback_to(&sp).await?, // Undo only the second create
/// }
///
/// conn.commit().await?; // First node is saved
/// ```
#[derive(Clone, Debug)]
pub struct Savepoint {
    /// Name identifying the savepoint.
    pub name: String,
}
143
/// A reusable query whose parameter names are known up front.
///
/// Define the query once, then execute it repeatedly with different parameter
/// values. This can improve performance by allowing the server to cache the
/// query plan.
///
/// # Example
///
/// ```ignore
/// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p").await?;
///
/// for id in 1..=100 {
///     let mut params = HashMap::new();
///     params.insert("id".to_string(), Value::int(id));
///     let (page, _) = stmt.execute(&mut conn, &params).await?;
///     // Process results...
/// }
/// ```
#[derive(Clone, Debug)]
pub struct PreparedStatement {
    /// GQL text of the statement.
    query: String,
    /// Distinct `$name` parameter names found in `query`, in order of first appearance.
    param_names: Vec<String>,
}
169
170impl PreparedStatement {
171 /// Create a new prepared statement.
172 ///
173 /// Extracts parameter names from the query (tokens starting with `$`).
174 pub fn new(query: impl Into<String>) -> Self {
175 let query = query.into();
176 let param_names = Self::extract_param_names(&query);
177 Self { query, param_names }
178 }
179
180 /// Extract parameter names from a query string.
181 fn extract_param_names(query: &str) -> Vec<String> {
182 let mut names = Vec::new();
183 let mut chars = query.chars().peekable();
184
185 while let Some(c) = chars.next() {
186 if c == '$' {
187 let mut name = String::new();
188 while let Some(&next) = chars.peek() {
189 if next.is_ascii_alphanumeric() || next == '_' {
190 name.push(chars.next().unwrap());
191 } else {
192 break;
193 }
194 }
195 if !name.is_empty() && !names.contains(&name) {
196 names.push(name);
197 }
198 }
199 }
200
201 names
202 }
203
204 /// Get the query string.
205 pub fn query(&self) -> &str {
206 &self.query
207 }
208
209 /// Get the parameter names expected by this statement.
210 pub fn param_names(&self) -> &[String] {
211 &self.param_names
212 }
213
214 /// Execute the prepared statement with the given parameters.
215 ///
216 /// # Arguments
217 ///
218 /// * `conn` - The connection to execute on
219 /// * `params` - Parameter values (must include all parameters in the query)
220 ///
221 /// # Returns
222 ///
223 /// A tuple of (`Page`, `Option<String>`) with results and optional warnings.
224 ///
225 /// # Errors
226 ///
227 /// Returns an error if required parameters are missing or if the query fails.
228 pub async fn execute(
229 &self,
230 conn: &mut Connection,
231 params: &HashMap<String, crate::types::Value>,
232 ) -> crate::error::Result<(Page, Option<String>)> {
233 // Validate all required parameters are provided
234 for name in &self.param_names {
235 if !params.contains_key(name) {
236 return Err(crate::error::Error::validation(format!(
237 "Missing required parameter: {}",
238 name
239 )));
240 }
241 }
242
243 conn.query_with_params(&self.query, params).await
244 }
245}
246
/// A single node in a query execution plan tree.
#[derive(Clone, Debug)]
pub struct PlanOperation {
    /// Kind of operation (e.g., "NodeScan", "Filter", "Projection").
    pub op_type: String,
    /// Description of the operation intended for human readers.
    pub description: String,
    /// Estimated number of rows for this operation, when available.
    pub estimated_rows: Option<u64>,
    /// Operations nested beneath this one.
    pub children: Vec<PlanOperation>,
}
259
260/// A query execution plan.
261///
262/// Shows how the database will execute a query without actually running it.
263/// Useful for query optimization and understanding performance characteristics.
264#[derive(Debug, Clone)]
265pub struct QueryPlan {
266 /// Root operations in the plan
267 pub operations: Vec<PlanOperation>,
268 /// Total estimated rows
269 pub estimated_rows: u64,
270 /// Raw plan from server (for advanced analysis)
271 pub raw: serde_json::Value,
272}
273
274/// Query execution profile with timing information.
275///
276/// Includes the execution plan plus actual runtime statistics.
277#[derive(Debug, Clone)]
278pub struct QueryProfile {
279 /// The execution plan
280 pub plan: QueryPlan,
281 /// Actual rows returned
282 pub actual_rows: u64,
283 /// Total execution time in milliseconds
284 pub execution_time_ms: f64,
285 /// Raw profile from server
286 pub raw: serde_json::Value,
287}
288
289/// A Geode database client supporting both QUIC and gRPC transports.
290///
291/// Use the builder pattern to configure the client, then call [`connect`](Client::connect)
292/// to establish a connection.
293///
294/// # Transport Selection
295///
296/// The transport is selected based on the DSN scheme:
297/// - `quic://` - QUIC transport (default)
298/// - `grpc://` - gRPC transport
299///
300/// # Example
301///
302/// ```no_run
303/// use geode_client::Client;
304///
305/// # async fn example() -> geode_client::Result<()> {
306/// // QUIC transport (legacy API)
307/// let client = Client::new("127.0.0.1", 3141)
308/// .skip_verify(true) // Development only!
309/// .page_size(500)
310/// .client_name("my-app");
311///
312/// // Or use DSN with explicit transport
313/// let client = Client::from_dsn("quic://127.0.0.1:3141?insecure=true")?;
314/// let client = Client::from_dsn("grpc://127.0.0.1:50051")?;
315///
316/// let mut conn = client.connect().await?;
317/// let (page, _) = conn.query("RETURN 1 AS x").await?;
318/// conn.close()?;
319/// # Ok(())
320/// # }
321/// ```
322/// Client configuration for connecting to a Geode server.
323///
324/// The password field uses `SecretString` from the `secrecy` crate to ensure
325/// credentials are zeroized from memory on drop and not accidentally leaked
326/// in debug output or error messages.
327#[derive(Clone)]
328pub struct Client {
329 transport: Transport,
330 host: String,
331 port: u16,
332 tls_enabled: bool,
333 skip_verify: bool,
334 page_size: usize,
335 hello_name: String,
336 hello_ver: String,
337 conformance: String,
338 username: Option<String>,
339 /// Password stored using SecretString for secure memory handling (CWE-316).
340 /// Automatically zeroized on drop and redacted in Debug output.
341 password: Option<SecretString>,
342 /// Connection timeout in seconds (default: 10)
343 connect_timeout_secs: u64,
344 /// HELLO handshake timeout in seconds (default: 5)
345 hello_timeout_secs: u64,
346 /// Idle connection timeout in seconds (default: 30)
347 idle_timeout_secs: u64,
348}
349
350impl Client {
351 /// Create a new QUIC client for the specified host and port.
352 ///
353 /// This method creates a client using QUIC transport. For gRPC transport,
354 /// use [`from_dsn`](Client::from_dsn) with a `grpc://` scheme.
355 ///
356 /// # Arguments
357 ///
358 /// * `host` - The server hostname or IP address
359 /// * `port` - The server port (typically 3141 for Geode)
360 ///
361 /// # Example
362 ///
363 /// ```
364 /// use geode_client::Client;
365 ///
366 /// let client = Client::new("localhost", 3141);
367 /// let client = Client::new("192.168.1.100", 8443);
368 /// let client = Client::new(String::from("geode.example.com"), 3141);
369 /// ```
370 pub fn new(host: impl Into<String>, port: u16) -> Self {
371 Self {
372 transport: Transport::Quic,
373 host: host.into(),
374 port,
375 tls_enabled: true,
376 skip_verify: false,
377 page_size: 1000,
378 hello_name: "geode-rust".to_string(),
379 hello_ver: env!("CARGO_PKG_VERSION").to_string(),
380 conformance: "min".to_string(),
381 username: None,
382 password: None,
383 connect_timeout_secs: 10,
384 hello_timeout_secs: 5,
385 idle_timeout_secs: 30,
386 }
387 }
388
389 /// Create a new client from a DSN (Data Source Name) string.
390 ///
391 /// # Supported DSN Formats
392 ///
393 /// - `quic://host:port?options` - QUIC transport (recommended)
394 /// - `grpc://host:port?options` - gRPC transport
395 /// - `host:port?options` - Legacy format (defaults to QUIC)
396 ///
397 /// # Supported Options
398 ///
399 /// - `tls` - Enable/disable TLS (0/1/true/false)
400 /// - `insecure` or `skip_verify` - Skip TLS verification
401 /// - `page_size` - Results page size (default: 1000)
402 /// - `client_name` or `hello_name` - Client name
403 /// - `client_version` or `hello_ver` - Client version
404 /// - `conformance` - GQL conformance level
405 /// - `username` or `user` - Authentication username
406 /// - `password` or `pass` - Authentication password
407 ///
408 /// # Examples
409 ///
410 /// ```
411 /// use geode_client::Client;
412 ///
413 /// // QUIC transport (explicit)
414 /// let client = Client::from_dsn("quic://localhost:3141").unwrap();
415 ///
416 /// // gRPC transport
417 /// let client = Client::from_dsn("grpc://localhost:50051?tls=0").unwrap();
418 ///
419 /// // Legacy format (defaults to QUIC)
420 /// let client = Client::from_dsn("localhost:3141?insecure=true").unwrap();
421 ///
422 /// // With authentication
423 /// let client = Client::from_dsn("quic://admin:secret@localhost:3141").unwrap();
424 ///
425 /// // IPv6 support
426 /// let client = Client::from_dsn("grpc://[::1]:50051").unwrap();
427 /// ```
428 ///
429 /// # Errors
430 ///
431 /// Returns `Error::InvalidDsn` if:
432 /// - DSN is empty
433 /// - Scheme is unsupported (not quic://, grpc://, or schemeless)
434 /// - Host is missing
435 /// - Port is invalid
436 pub fn from_dsn(dsn_str: &str) -> Result<Self> {
437 let dsn = Dsn::parse(dsn_str)?;
438
439 Ok(Self {
440 transport: dsn.transport(),
441 host: dsn.host().to_string(),
442 port: dsn.port(),
443 tls_enabled: dsn.tls_enabled(),
444 skip_verify: dsn.skip_verify(),
445 page_size: dsn.page_size(),
446 hello_name: dsn.client_name().to_string(),
447 hello_ver: dsn.client_version().to_string(),
448 conformance: dsn.conformance().to_string(),
449 username: dsn.username().map(String::from),
450 password: dsn.password().map(|p| SecretString::from(p.to_string())),
451 connect_timeout_secs: 10,
452 hello_timeout_secs: 5,
453 idle_timeout_secs: 30,
454 })
455 }
456
457 /// Get the transport type for this client.
458 pub fn transport(&self) -> Transport {
459 self.transport
460 }
461
462 /// Skip TLS certificate verification.
463 ///
464 /// # Security Warning
465 ///
466 /// **This should only be used in development environments.** Disabling
467 /// certificate verification makes the connection vulnerable to
468 /// man-in-the-middle attacks.
469 ///
470 /// # Arguments
471 ///
472 /// * `skip` - If true, skip certificate verification
473 pub fn skip_verify(mut self, skip: bool) -> Self {
474 self.skip_verify = skip;
475 self
476 }
477
478 /// Set the page size for query results.
479 ///
480 /// Controls how many rows are returned per page when fetching results.
481 /// Larger values reduce round-trips but use more memory.
482 ///
483 /// # Arguments
484 ///
485 /// * `size` - Number of rows per page (default: 1000)
486 pub fn page_size(mut self, size: usize) -> Self {
487 self.page_size = size;
488 self
489 }
490
491 /// Set the client name sent to the server.
492 ///
493 /// This appears in server logs and can help with debugging.
494 ///
495 /// # Arguments
496 ///
497 /// * `name` - Client application name (default: "geode-rust-quinn")
498 pub fn client_name(mut self, name: impl Into<String>) -> Self {
499 self.hello_name = name.into();
500 self
501 }
502
503 /// Set the client version sent to the server.
504 ///
505 /// # Arguments
506 ///
507 /// * `version` - Client version string (default: "0.1.0")
508 pub fn client_version(mut self, version: impl Into<String>) -> Self {
509 self.hello_ver = version.into();
510 self
511 }
512
513 /// Set the GQL conformance level.
514 ///
515 /// # Arguments
516 ///
517 /// * `level` - Conformance level (default: "min")
518 pub fn conformance(mut self, level: impl Into<String>) -> Self {
519 self.conformance = level.into();
520 self
521 }
522
523 /// Set the authentication username.
524 ///
525 /// # Arguments
526 ///
527 /// * `username` - The username for authentication
528 ///
529 /// # Example
530 ///
531 /// ```
532 /// use geode_client::Client;
533 ///
534 /// let client = Client::new("localhost", 3141)
535 /// .username("admin")
536 /// .password("secret");
537 /// ```
538 pub fn username(mut self, username: impl Into<String>) -> Self {
539 self.username = Some(username.into());
540 self
541 }
542
543 /// Set the authentication password.
544 ///
545 /// The password is stored using `SecretString` which ensures it is:
546 /// - Zeroized from memory when dropped
547 /// - Not accidentally leaked in debug output
548 ///
549 /// # Arguments
550 ///
551 /// * `password` - The password for authentication
552 pub fn password(mut self, password: impl Into<String>) -> Self {
553 self.password = Some(SecretString::from(password.into()));
554 self
555 }
556
557 /// Set the connection timeout in seconds.
558 ///
559 /// This controls how long to wait for the initial QUIC connection
560 /// to be established. Default is 10 seconds.
561 ///
562 /// # Arguments
563 ///
564 /// * `seconds` - Timeout in seconds (must be > 0)
565 pub fn connect_timeout(mut self, seconds: u64) -> Self {
566 self.connect_timeout_secs = seconds.max(1);
567 self
568 }
569
570 /// Set the HELLO handshake timeout in seconds.
571 ///
572 /// This controls how long to wait for the server to respond to the
573 /// initial HELLO message. Default is 5 seconds.
574 ///
575 /// # Arguments
576 ///
577 /// * `seconds` - Timeout in seconds (must be > 0)
578 pub fn hello_timeout(mut self, seconds: u64) -> Self {
579 self.hello_timeout_secs = seconds.max(1);
580 self
581 }
582
583 /// Set the idle connection timeout in seconds.
584 ///
585 /// This controls how long an idle connection can remain open before
586 /// being automatically closed by the QUIC layer. Default is 30 seconds.
587 ///
588 /// # Arguments
589 ///
590 /// * `seconds` - Timeout in seconds (must be > 0)
591 pub fn idle_timeout(mut self, seconds: u64) -> Self {
592 self.idle_timeout_secs = seconds.max(1);
593 self
594 }
595
596 /// Validate the client configuration.
597 ///
598 /// Performs validation on all configuration parameters including:
599 /// - Hostname format (RFC 1035 compliant)
600 /// - Port number (1-65535)
601 /// - Page size (1-100,000)
602 ///
603 /// This method is automatically called by [`connect`](Self::connect).
604 /// You can call it manually to validate configuration before attempting
605 /// to connect.
606 ///
607 /// # Errors
608 ///
609 /// Returns a validation error if any parameter is invalid.
610 ///
611 /// # Example
612 ///
613 /// ```
614 /// use geode_client::Client;
615 ///
616 /// let client = Client::new("localhost", 3141);
617 /// assert!(client.validate().is_ok());
618 ///
619 /// // Invalid hostname
620 /// let invalid = Client::new("-invalid-host", 3141);
621 /// assert!(invalid.validate().is_err());
622 /// ```
623 pub fn validate(&self) -> Result<()> {
624 // Validate hostname format
625 validate::hostname(&self.host)?;
626
627 // Validate port (0 is reserved)
628 validate::port(self.port)?;
629
630 // Validate page size
631 validate::page_size(self.page_size)?;
632
633 Ok(())
634 }
635
636 /// Connect to the Geode database.
637 ///
638 /// Establishes a QUIC connection to the server, performs the TLS handshake,
639 /// and sends the initial HELLO message.
640 ///
641 /// # Returns
642 ///
643 /// A [`Connection`] that can be used to execute queries.
644 ///
645 /// # Errors
646 ///
647 /// Returns an error if:
648 /// - The hostname cannot be resolved
649 /// - The connection cannot be established
650 /// - TLS verification fails (unless `skip_verify` is true)
651 /// - The HELLO handshake fails
652 ///
653 /// # Example
654 ///
655 /// ```no_run
656 /// # use geode_client::Client;
657 /// # async fn example() -> geode_client::Result<()> {
658 /// let client = Client::new("localhost", 3141).skip_verify(true);
659 /// let mut conn = client.connect().await?;
660 /// // Use connection...
661 /// conn.close()?;
662 /// # Ok(())
663 /// # }
664 /// ```
665 // CANARY: REQ=REQ-CLIENT-RUST-001; FEATURE="RustClientConnection"; ASPECT=HelloHandshake; STATUS=IMPL; OWNER=clients; UPDATED=2025-02-14
666 pub async fn connect(&self) -> Result<Connection> {
667 // Validate configuration before connecting (Gap #9 - automatic validation)
668 self.validate()?;
669
670 // Expose the secret password only when needed for the connection
671 let password_ref = self.password.as_ref().map(|s| s.expose_secret());
672
673 match self.transport {
674 Transport::Quic => {
675 Connection::new_quic(
676 &self.host,
677 self.port,
678 self.skip_verify,
679 self.page_size,
680 &self.hello_name,
681 &self.hello_ver,
682 &self.conformance,
683 self.username.as_deref(),
684 password_ref,
685 self.connect_timeout_secs,
686 self.hello_timeout_secs,
687 self.idle_timeout_secs,
688 )
689 .await
690 }
691 Transport::Grpc => {
692 #[cfg(feature = "grpc")]
693 {
694 Connection::new_grpc(
695 &self.host,
696 self.port,
697 self.tls_enabled,
698 self.skip_verify,
699 self.page_size,
700 self.username.as_deref(),
701 password_ref,
702 )
703 .await
704 }
705 #[cfg(not(feature = "grpc"))]
706 {
707 Err(Error::connection(
708 "gRPC transport requires the 'grpc' feature to be enabled",
709 ))
710 }
711 }
712 }
713 }
714}
715
716/// Internal connection type for transport-specific implementations.
717#[allow(dead_code)]
718enum ConnectionKind {
719 /// QUIC transport connection
720 Quic {
721 conn: quinn::Connection,
722 send: quinn::SendStream,
723 recv: quinn::RecvStream,
724 /// Reserved for future streaming support
725 buffer: Vec<u8>,
726 /// Reserved for request ID tracking
727 next_request_id: u64,
728 /// Session ID from HELLO handshake
729 session_id: String,
730 },
731 /// gRPC transport connection
732 #[cfg(feature = "grpc")]
733 Grpc { client: crate::grpc::GrpcClient },
734}
735
736/// An active connection to a Geode database server.
737///
738/// A `Connection` represents a connection to the Geode server using either
739/// QUIC or gRPC transport. It provides methods for executing queries, managing
740/// transactions, and controlling the connection lifecycle.
741///
742/// # Transport Support
743///
744/// - **QUIC**: Uses a bidirectional stream with protobuf wire protocol
745/// - **gRPC**: Uses tonic-based gRPC client (requires `grpc` feature)
746///
747/// # Connection Lifecycle
748///
749/// 1. Create via [`Client::connect`]
750/// 2. Execute queries with [`query`](Connection::query) or [`query_with_params`](Connection::query_with_params)
751/// 3. Optionally use transactions with [`begin`](Connection::begin), [`commit`](Connection::commit), [`rollback`](Connection::rollback)
752/// 4. Close with [`close`](Connection::close)
753///
754/// # Example
755///
756/// ```no_run
757/// # use geode_client::Client;
758/// # async fn example() -> geode_client::Result<()> {
759/// let client = Client::new("localhost", 3141).skip_verify(true);
760/// let mut conn = client.connect().await?;
761///
762/// // Execute queries
763/// let (page, _) = conn.query("RETURN 42 AS answer").await?;
764/// println!("Answer: {}", page.rows[0].get("answer").unwrap().as_int()?);
765///
766/// // Use transactions
767/// conn.begin().await?;
768/// conn.query("CREATE (n:Node {id: 1})").await?;
769/// conn.commit().await?;
770///
771/// conn.close()?;
772/// # Ok(())
773/// # }
774/// ```
775///
776/// # Thread Safety
777///
778/// `Connection` is `!Sync` because the underlying transport streams are not thread-safe.
779/// For concurrent access, use [`ConnectionPool`](crate::ConnectionPool).
780pub struct Connection {
781 kind: ConnectionKind,
782 /// Page size for query results (reserved for future use)
783 #[allow(dead_code)]
784 page_size: usize,
785}
786
787impl Connection {
788 /// Create a new QUIC connection.
789 #[allow(clippy::too_many_arguments)]
790 async fn new_quic(
791 host: &str,
792 port: u16,
793 skip_verify: bool,
794 page_size: usize,
795 hello_name: &str,
796 hello_ver: &str,
797 conformance: &str,
798 username: Option<&str>,
799 password: Option<&str>,
800 connect_timeout_secs: u64,
801 hello_timeout_secs: u64,
802 idle_timeout_secs: u64,
803 ) -> Result<Self> {
804 let mut last_err: Option<Error> = None;
805
806 for attempt in 1..=3 {
807 match Self::connect_quic_once(
808 host,
809 port,
810 skip_verify,
811 page_size,
812 hello_name,
813 hello_ver,
814 conformance,
815 username,
816 password,
817 connect_timeout_secs,
818 hello_timeout_secs,
819 idle_timeout_secs,
820 )
821 .await
822 {
823 Ok(conn) => return Ok(conn),
824 Err(e) => {
825 last_err = Some(e);
826 if attempt < 3 {
827 debug!("Connection attempt {} failed, retrying...", attempt);
828 tokio::time::sleep(Duration::from_millis(150)).await;
829 }
830 }
831 }
832 }
833
834 Err(last_err.unwrap_or_else(|| Error::connection("Failed to connect")))
835 }
836
/// Open a gRPC connection.
///
/// Builds a `grpc://` DSN string from the arguments, parses it with
/// `Dsn::parse`, and hands the result to `GrpcClient::connect`.
///
/// Credentials are included in the DSN only when both `username` and
/// `password` are given.
///
/// # Errors
///
/// Returns an error if the assembled DSN fails to parse or the gRPC client
/// cannot connect.
#[cfg(feature = "grpc")]
#[allow(clippy::too_many_arguments)]
async fn new_grpc(
    host: &str,
    port: u16,
    tls_enabled: bool,
    skip_verify: bool,
    page_size: usize,
    username: Option<&str>,
    password: Option<&str>,
) -> Result<Self> {
    use crate::dsn::Dsn;

    let tls_flag = if tls_enabled { "1" } else { "0" };

    // NOTE(review): user and password are interpolated verbatim, so characters
    // such as '@', ':', '/' or '?' in them may change how the DSN parses —
    // confirm whether Dsn::parse expects percent-encoded credentials.
    // NOTE(review): when only one of username/password is set, it is dropped
    // without any warning.
    let dsn_text = match (username, password) {
        (Some(user), Some(pass)) => format!(
            "grpc://{}:{}@{}:{}?tls={}&insecure={}",
            user, pass, host, port, tls_flag, skip_verify
        ),
        _ => format!(
            "grpc://{}:{}?tls={}&insecure={}",
            host, port, tls_flag, skip_verify
        ),
    };

    let parsed = Dsn::parse(&dsn_text)?;
    let client = crate::grpc::GrpcClient::connect(&parsed).await?;

    Ok(Self {
        kind: ConnectionKind::Grpc { client },
        page_size,
    })
}
873
874 #[allow(clippy::too_many_arguments)]
875 async fn connect_quic_once(
876 host: &str,
877 port: u16,
878 skip_verify: bool,
879 page_size: usize,
880 _hello_name: &str,
881 _hello_ver: &str,
882 _conformance: &str,
883 username: Option<&str>,
884 password: Option<&str>,
885 connect_timeout_secs: u64,
886 _hello_timeout_secs: u64,
887 idle_timeout_secs: u64,
888 ) -> Result<Self> {
889 debug!("Creating connection to {}:{}", host, port);
890
891 // Install default crypto provider for rustls
892 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
893
894 // Build Quinn client config with TLS 1.3 explicitly (QUIC requires TLS 1.3)
895 let mut client_crypto = if skip_verify {
896 // SECURITY WARNING: Disabling TLS verification exposes connections to MITM attacks.
897 // Credentials sent in HELLO may be intercepted. Only use for development/testing.
898 warn!(
899 "TLS certificate verification DISABLED - connection to {}:{} is vulnerable to MITM attacks. \
900 Do NOT use skip_verify in production!",
901 host, port
902 );
903 rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
904 .dangerous()
905 .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
906 .with_no_client_auth()
907 } else {
908 // Load system root certificates for proper TLS verification
909 let mut root_store = rustls::RootCertStore::empty();
910
911 let cert_result = rustls_native_certs::load_native_certs();
912
913 // Log any errors that occurred during certificate loading
914 for err in &cert_result.errors {
915 warn!("Error loading native certificate: {:?}", err);
916 }
917
918 let mut certs_loaded = 0;
919 let mut certs_failed = 0;
920
921 for cert in cert_result.certs {
922 match root_store.add(cert) {
923 Ok(()) => certs_loaded += 1,
924 Err(_) => certs_failed += 1,
925 }
926 }
927
928 if certs_loaded == 0 {
929 return Err(Error::tls(
930 "No system root certificates found. TLS verification cannot proceed. \
931 Either install system CA certificates or use skip_verify(true) for development only.",
932 ));
933 }
934
935 debug!(
936 "Loaded {} system root certificates ({} failed to parse)",
937 certs_loaded, certs_failed
938 );
939
940 rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
941 .with_root_certificates(root_store)
942 .with_no_client_auth()
943 };
944
945 // Set ALPN protocols
946 client_crypto.alpn_protocols = vec![GEODE_ALPN.to_vec()];
947
948 let mut client_config = ClientConfig::new(Arc::new(
949 quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto)
950 .map_err(|e| Error::connection(format!("Failed to create QUIC config: {}", e)))?,
951 ));
952
953 // Configure QUIC transport parameters to match Python/Go clients
954 let mut transport = quinn::TransportConfig::default();
955 // Cap idle timeout to quinn's maximum (2^62 - 1 microseconds ≈ 146 years)
956 // to prevent panic from VarInt overflow
957 let idle_timeout = Duration::from_secs(idle_timeout_secs.min(146_000 * 365 * 24 * 3600));
958 transport.max_idle_timeout(Some(idle_timeout.try_into().map_err(|_| {
959 Error::connection("Idle timeout value too large for QUIC protocol")
960 })?));
961 transport.keep_alive_interval(Some(Duration::from_secs(5)));
962 client_config.transport_config(Arc::new(transport));
963
964 // Create endpoint - "0.0.0.0:0" is a valid socket address literal that binds
965 // to any available port on all interfaces, so this parse cannot fail
966 let mut endpoint = Endpoint::client(
967 "0.0.0.0:0"
968 .parse()
969 .expect("0.0.0.0:0 is a valid socket address"),
970 )
971 .map_err(|e| Error::connection(format!("Failed to create endpoint: {}", e)))?;
972 endpoint.set_default_client_config(client_config);
973
974 // Resolve server address (supports hostnames as well as IP literals)
975 let mut resolved_addrs = format!("{}:{}", host, port)
976 .to_socket_addrs()
977 .map_err(|e| {
978 Error::connection(format!(
979 "Failed to resolve address {}:{} - {}",
980 host, port, e
981 ))
982 })?;
983
984 let server_addr: SocketAddr = resolved_addrs
985 .find(|addr| matches!(addr, SocketAddr::V4(_) | SocketAddr::V6(_)))
986 .ok_or_else(|| Error::connection("Invalid address: could not resolve host"))?;
987
988 debug!("Connecting to {}", server_addr);
989
990 // When skipping verification, don't use actual hostname for SNI
991 // This matches Python client behavior which avoids server_name when skip_verify=True
992 let server_name = if skip_verify {
993 "localhost" // Use generic name when skipping verification
994 } else {
995 host
996 };
997
998 trace!("Using server name for SNI: {}", server_name);
999
1000 let conn = timeout(
1001 Duration::from_secs(connect_timeout_secs),
1002 endpoint
1003 .connect(server_addr, server_name)
1004 .map_err(|e| Error::connection(format!("Connection failed: {}", e)))?,
1005 )
1006 .await
1007 .map_err(|_| Error::connection("Connection timeout"))?
1008 .map_err(|e| Error::connection(format!("Failed to establish connection: {}", e)))?;
1009
1010 debug!("Connection established to {}:{}", host, port);
1011
1012 // Open a single bidirectional stream used for the entire session.
1013 let (mut send, mut recv) = conn
1014 .open_bi()
1015 .await
1016 .map_err(|e| Error::connection(format!("Failed to open stream: {}", e)))?;
1017
1018 // Send HELLO message using protobuf with length prefix
1019 let hello_req = proto::HelloRequest {
1020 username: username.unwrap_or("").to_string(),
1021 password: password.unwrap_or("").to_string(),
1022 tenant_id: None,
1023 client_name: String::new(),
1024 client_version: String::new(),
1025 wanted_conformance: String::new(),
1026 };
1027 let msg = proto::QuicClientMessage {
1028 msg: Some(proto::quic_client_message::Msg::Hello(hello_req)),
1029 };
1030 let data = proto::encode_with_length_prefix(&msg);
1031
1032 send.write_all(&data)
1033 .await
1034 .map_err(|e| Error::connection(format!("Failed to send HELLO: {}", e)))?;
1035
1036 // Wait for HELLO response (length-prefixed protobuf)
1037 let mut length_buf = [0u8; 4];
1038 timeout(Duration::from_secs(5), recv.read_exact(&mut length_buf))
1039 .await
1040 .map_err(|_| Error::connection("HELLO response timeout"))?
1041 .map_err(|e| {
1042 Error::connection(format!("Failed to read HELLO response length: {}", e))
1043 })?;
1044
1045 let msg_len = u32::from_be_bytes(length_buf) as usize;
1046 let mut msg_buf = vec![0u8; msg_len];
1047 recv.read_exact(&mut msg_buf)
1048 .await
1049 .map_err(|e| Error::connection(format!("Failed to read HELLO response body: {}", e)))?;
1050
1051 let hello_response = proto::decode_quic_server_message(&msg_buf)?;
1052
1053 let session_id = match hello_response.msg {
1054 Some(proto::quic_server_message::Msg::Hello(ref hello_resp)) => {
1055 if !hello_resp.success {
1056 return Err(Error::connection(format!(
1057 "Authentication failed: {}",
1058 hello_resp.error_message
1059 )));
1060 }
1061 hello_resp.session_id.clone()
1062 }
1063 _ => {
1064 return Err(Error::connection("Expected HELLO response"));
1065 }
1066 };
1067
1068 debug!("HELLO handshake complete, session_id={}", session_id);
1069
1070 Ok(Self {
1071 kind: ConnectionKind::Quic {
1072 conn,
1073 send,
1074 recv,
1075 buffer: Vec::new(),
1076 next_request_id: 1,
1077 session_id,
1078 },
1079 page_size,
1080 })
1081 }
1082
1083 /// Send a protobuf message over QUIC.
1084 async fn send_proto_quic(
1085 send: &mut quinn::SendStream,
1086 msg: &proto::QuicClientMessage,
1087 ) -> Result<()> {
1088 let data = proto::encode_with_length_prefix(msg);
1089 send.write_all(&data)
1090 .await
1091 .map_err(|e| Error::connection(format!("Failed to send message: {}", e)))?;
1092 Ok(())
1093 }
1094
1095 /// Read a protobuf message over QUIC with timeout.
1096 async fn read_proto_quic(
1097 recv: &mut quinn::RecvStream,
1098 timeout_secs: u64,
1099 ) -> Result<proto::QuicServerMessage> {
1100 // Read 4-byte length prefix
1101 let mut length_buf = [0u8; 4];
1102 timeout(
1103 Duration::from_secs(timeout_secs),
1104 recv.read_exact(&mut length_buf),
1105 )
1106 .await
1107 .map_err(|_| Error::timeout())?
1108 .map_err(|e| Error::connection(format!("Failed to read response length: {}", e)))?;
1109
1110 let msg_len = u32::from_be_bytes(length_buf) as usize;
1111 let mut msg_buf = vec![0u8; msg_len];
1112 recv.read_exact(&mut msg_buf)
1113 .await
1114 .map_err(|e| Error::connection(format!("Failed to read response body: {}", e)))?;
1115
1116 proto::decode_quic_server_message(&msg_buf)
1117 }
1118
1119 /// Attempt to read a buffered protobuf message without blocking (QUIC).
1120 /// Returns Ok(None) if no complete message is available immediately.
1121 async fn try_read_proto_quic(
1122 recv: &mut quinn::RecvStream,
1123 ) -> Result<Option<proto::QuicServerMessage>> {
1124 // Try to read with a short timeout
1125 let mut length_buf = [0u8; 4];
1126 let read_result =
1127 timeout(Duration::from_millis(10), recv.read_exact(&mut length_buf)).await;
1128
1129 match read_result {
1130 Ok(Ok(())) => {
1131 let msg_len = u32::from_be_bytes(length_buf) as usize;
1132 let mut msg_buf = vec![0u8; msg_len];
1133 recv.read_exact(&mut msg_buf).await.map_err(|e| {
1134 Error::connection(format!("Failed to read response body: {}", e))
1135 })?;
1136 let msg = proto::decode_quic_server_message(&msg_buf)?;
1137 Ok(Some(msg))
1138 }
1139 Ok(Err(e)) => Err(Error::connection(format!("Failed to read response: {}", e))),
1140 Err(_) => Ok(None), // Timeout - no data available
1141 }
1142 }
1143
1144 /// Parse protobuf rows into Value maps (static version for QUIC).
1145 fn parse_proto_rows_static(
1146 proto_rows: &[proto::Row],
1147 columns: &[Column],
1148 ) -> Result<Vec<HashMap<String, Value>>> {
1149 let mut rows = Vec::new();
1150 for proto_row in proto_rows {
1151 let mut row = HashMap::new();
1152 for (i, col) in columns.iter().enumerate() {
1153 let value = if i < proto_row.values.len() {
1154 Self::convert_proto_value_static(&proto_row.values[i])
1155 } else {
1156 Value::null()
1157 };
1158 row.insert(col.name.clone(), value);
1159 }
1160 rows.push(row);
1161 }
1162 Ok(rows)
1163 }
1164
1165 /// Convert a protobuf Value to our Value type (static version).
1166 fn convert_proto_value_static(proto_val: &proto::Value) -> Value {
1167 match &proto_val.kind {
1168 Some(proto::value::Kind::NullVal(_)) => Value::null(),
1169 Some(proto::value::Kind::StringVal(s)) => Value::string(s.value.clone()),
1170 Some(proto::value::Kind::IntVal(i)) => Value::int(i.value),
1171 Some(proto::value::Kind::DoubleVal(d)) => {
1172 Value::decimal(rust_decimal::Decimal::from_f64_retain(d.value).unwrap_or_default())
1173 }
1174 Some(proto::value::Kind::BoolVal(b)) => Value::bool(*b),
1175 Some(proto::value::Kind::ListVal(list)) => {
1176 let values: Vec<Value> = list
1177 .values
1178 .iter()
1179 .map(Self::convert_proto_value_static)
1180 .collect();
1181 Value::array(values)
1182 }
1183 Some(proto::value::Kind::MapVal(map)) => {
1184 let mut obj = std::collections::HashMap::new();
1185 for entry in &map.entries {
1186 let val = entry
1187 .value
1188 .as_ref()
1189 .map(Self::convert_proto_value_static)
1190 .unwrap_or_else(Value::null);
1191 obj.insert(entry.key.clone(), val);
1192 }
1193 Value::object(obj)
1194 }
1195 Some(proto::value::Kind::NodeVal(node)) => {
1196 let mut obj = std::collections::HashMap::new();
1197 obj.insert("id".to_string(), Value::int(node.id as i64));
1198 let labels: Vec<Value> = node
1199 .labels
1200 .iter()
1201 .map(|l| Value::string(l.clone()))
1202 .collect();
1203 obj.insert("labels".to_string(), Value::array(labels));
1204 let mut props = std::collections::HashMap::new();
1205 for entry in &node.properties {
1206 let val = entry
1207 .value
1208 .as_ref()
1209 .map(Self::convert_proto_value_static)
1210 .unwrap_or_else(Value::null);
1211 props.insert(entry.key.clone(), val);
1212 }
1213 obj.insert("properties".to_string(), Value::object(props));
1214 Value::object(obj)
1215 }
1216 Some(proto::value::Kind::EdgeVal(edge)) => {
1217 let mut obj = std::collections::HashMap::new();
1218 obj.insert("id".to_string(), Value::int(edge.id as i64));
1219 obj.insert("from_id".to_string(), Value::int(edge.from_id as i64));
1220 obj.insert("to_id".to_string(), Value::int(edge.to_id as i64));
1221 obj.insert("label".to_string(), Value::string(edge.label.clone()));
1222 let mut props = std::collections::HashMap::new();
1223 for entry in &edge.properties {
1224 let val = entry
1225 .value
1226 .as_ref()
1227 .map(Self::convert_proto_value_static)
1228 .unwrap_or_else(Value::null);
1229 props.insert(entry.key.clone(), val);
1230 }
1231 obj.insert("properties".to_string(), Value::object(props));
1232 Value::object(obj)
1233 }
1234 Some(proto::value::Kind::DecimalVal(d)) => {
1235 // Try to parse the decimal from coeff + scale
1236 if let Ok(dec) = d.coeff.parse::<rust_decimal::Decimal>() {
1237 Value::decimal(dec)
1238 } else {
1239 Value::string(d.orig_repr.clone())
1240 }
1241 }
1242 Some(proto::value::Kind::BytesVal(b)) => {
1243 Value::string(format!("\\x{}", hex::encode(&b.value)))
1244 }
1245 _ => Value::null(),
1246 }
1247 }
1248
1249 /// Send BEGIN transaction command over QUIC.
1250 async fn send_begin_quic(
1251 send: &mut quinn::SendStream,
1252 recv: &mut quinn::RecvStream,
1253 session_id: &str,
1254 ) -> Result<()> {
1255 let msg = proto::QuicClientMessage {
1256 msg: Some(proto::quic_client_message::Msg::Begin(
1257 proto::BeginRequest {
1258 session_id: session_id.to_string(),
1259 ..Default::default()
1260 },
1261 )),
1262 };
1263 Self::send_proto_quic(send, &msg).await?;
1264
1265 let resp = Self::read_proto_quic(recv, 5).await?;
1266 if !matches!(resp.msg, Some(proto::quic_server_message::Msg::Begin(_))) {
1267 return Err(Error::protocol("Expected BEGIN response"));
1268 }
1269 Ok(())
1270 }
1271
1272 /// Send COMMIT transaction command over QUIC.
1273 async fn send_commit_quic(
1274 send: &mut quinn::SendStream,
1275 recv: &mut quinn::RecvStream,
1276 session_id: &str,
1277 ) -> Result<()> {
1278 let msg = proto::QuicClientMessage {
1279 msg: Some(proto::quic_client_message::Msg::Commit(
1280 proto::CommitRequest {
1281 session_id: session_id.to_string(),
1282 },
1283 )),
1284 };
1285 Self::send_proto_quic(send, &msg).await?;
1286
1287 let resp = Self::read_proto_quic(recv, 5).await?;
1288 if !matches!(resp.msg, Some(proto::quic_server_message::Msg::Commit(_))) {
1289 return Err(Error::protocol("Expected COMMIT response"));
1290 }
1291 Ok(())
1292 }
1293
1294 /// Send ROLLBACK transaction command over QUIC.
1295 async fn send_rollback_quic(
1296 send: &mut quinn::SendStream,
1297 recv: &mut quinn::RecvStream,
1298 session_id: &str,
1299 ) -> Result<()> {
1300 let msg = proto::QuicClientMessage {
1301 msg: Some(proto::quic_client_message::Msg::Rollback(
1302 proto::RollbackRequest {
1303 session_id: session_id.to_string(),
1304 },
1305 )),
1306 };
1307 Self::send_proto_quic(send, &msg).await?;
1308
1309 let resp = Self::read_proto_quic(recv, 5).await?;
1310 if !matches!(resp.msg, Some(proto::quic_server_message::Msg::Rollback(_))) {
1311 return Err(Error::protocol("Expected ROLLBACK response"));
1312 }
1313 Ok(())
1314 }
1315
1316 /// Execute a GQL query without parameters.
1317 ///
1318 /// # Arguments
1319 ///
1320 /// * `gql` - The GQL query string
1321 ///
1322 /// # Returns
1323 ///
1324 /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1325 /// and the optional string contains any query warnings.
1326 ///
1327 /// # Errors
1328 ///
1329 /// Returns [`Error::Query`] if the query fails to execute.
1330 ///
1331 /// # Example
1332 ///
1333 /// ```no_run
1334 /// # use geode_client::Client;
1335 /// # async fn example() -> geode_client::Result<()> {
1336 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1337 /// # let mut conn = client.connect().await?;
1338 /// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name LIMIT 10").await?;
1339 /// for row in &page.rows {
1340 /// println!("Name: {}", row.get("name").unwrap().as_string()?);
1341 /// }
1342 /// # Ok(())
1343 /// # }
1344 /// ```
1345 pub async fn query(&mut self, gql: &str) -> Result<(Page, Option<String>)> {
1346 self.query_with_params(gql, &HashMap::new()).await
1347 }
1348
1349 /// Execute a GQL query with parameters.
1350 ///
1351 /// Parameters are substituted for `$param_name` placeholders in the query.
1352 /// This is the recommended way to include dynamic values in queries, as it
1353 /// prevents injection attacks and allows query plan caching.
1354 ///
1355 /// # Arguments
1356 ///
1357 /// * `gql` - The GQL query string with parameter placeholders
1358 /// * `params` - A map of parameter names to values
1359 ///
1360 /// # Returns
1361 ///
1362 /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1363 /// and the optional string contains any query warnings.
1364 ///
1365 /// # Errors
1366 ///
1367 /// Returns [`Error::Query`] if the query fails to execute.
1368 ///
1369 /// # Example
1370 ///
1371 /// ```no_run
1372 /// # use geode_client::{Client, Value};
1373 /// # use std::collections::HashMap;
1374 /// # async fn example() -> geode_client::Result<()> {
1375 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1376 /// # let mut conn = client.connect().await?;
1377 /// let mut params = HashMap::new();
1378 /// params.insert("name".to_string(), Value::string("Alice"));
1379 /// params.insert("min_age".to_string(), Value::int(25));
1380 ///
1381 /// let (page, _) = conn.query_with_params(
1382 /// "MATCH (p:Person {name: $name}) WHERE p.age >= $min_age RETURN p",
1383 /// ¶ms
1384 /// ).await?;
1385 /// # Ok(())
1386 /// # }
1387 /// ```
1388 pub async fn query_with_params(
1389 &mut self,
1390 gql: &str,
1391 params: &HashMap<String, Value>,
1392 ) -> Result<(Page, Option<String>)> {
1393 match &mut self.kind {
1394 ConnectionKind::Quic {
1395 send,
1396 recv,
1397 session_id,
1398 ..
1399 } => Self::query_with_params_quic(send, recv, gql, params, session_id).await,
1400 #[cfg(feature = "grpc")]
1401 ConnectionKind::Grpc { client } => client.query_with_params(gql, params).await,
1402 }
1403 }
1404
1405 /// Execute a GQL query with parameters over QUIC transport.
1406 async fn query_with_params_quic(
1407 send: &mut quinn::SendStream,
1408 recv: &mut quinn::RecvStream,
1409 gql: &str,
1410 params: &HashMap<String, Value>,
1411 session_id: &str,
1412 ) -> Result<(Page, Option<String>)> {
1413 // Convert types::Value to proto::Param entries
1414 let params_proto: Vec<proto::Param> = params
1415 .iter()
1416 .map(|(k, v)| proto::Param {
1417 name: k.clone(),
1418 value: Some(v.to_proto_value()),
1419 })
1420 .collect();
1421
1422 // Send Execute request via protobuf
1423 let exec_req = proto::ExecuteRequest {
1424 session_id: session_id.to_string(),
1425 query: gql.to_string(),
1426 params: params_proto,
1427 };
1428 let msg = proto::QuicClientMessage {
1429 msg: Some(proto::quic_client_message::Msg::Execute(exec_req)),
1430 };
1431 Self::send_proto_quic(send, &msg)
1432 .await
1433 .map_err(|e| Error::query(format!("{}", e)))?;
1434
1435 // Read first response (should be schema or error)
1436 let resp = Self::read_proto_quic(recv, 10).await?;
1437
1438 let exec_resp = match resp.msg {
1439 Some(proto::quic_server_message::Msg::Execute(e)) => e,
1440 _ => return Err(Error::protocol("Expected Execute response")),
1441 };
1442
1443 // Check for error in payload
1444 if let Some(proto::execution_response::Payload::Error(ref err)) = exec_resp.payload {
1445 // Drain any follow-up messages (e.g., data page) the server may send after error
1446 let _ = Self::try_read_proto_quic(recv).await;
1447 return Err(Error::Query {
1448 code: err.code.clone(),
1449 message: err.message.clone(),
1450 });
1451 }
1452
1453 // Parse columns from schema payload
1454 let columns: Vec<Column> = match exec_resp.payload {
1455 Some(proto::execution_response::Payload::Schema(ref s)) => s
1456 .columns
1457 .iter()
1458 .map(|c| Column {
1459 name: c.name.clone(),
1460 col_type: c.r#type.clone(),
1461 })
1462 .collect(),
1463 _ => Vec::new(),
1464 };
1465
1466 trace!("Schema columns: {:?}", columns);
1467
1468 // Check for inline data page
1469 if let Some(inline_resp) = Self::try_read_proto_quic(recv).await? {
1470 if let Some(proto::quic_server_message::Msg::Execute(inline_exec)) = inline_resp.msg {
1471 if let Some(proto::execution_response::Payload::Error(ref err)) =
1472 inline_exec.payload
1473 {
1474 return Err(Error::Query {
1475 code: err.code.clone(),
1476 message: err.message.clone(),
1477 });
1478 }
1479
1480 if let Some(proto::execution_response::Payload::Page(ref page_data)) =
1481 inline_exec.payload
1482 {
1483 let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1484 let page = Page {
1485 columns,
1486 rows,
1487 ordered: page_data.ordered,
1488 order_keys: page_data.order_keys.clone(),
1489 final_page: page_data.r#final,
1490 };
1491 return Ok((page, None));
1492 }
1493
1494 // Metrics or heartbeat - empty result
1495 let page = Page {
1496 columns,
1497 rows: Vec::new(),
1498 ordered: false,
1499 order_keys: Vec::new(),
1500 final_page: true,
1501 };
1502 return Ok((page, None));
1503 }
1504 }
1505
1506 // Check if we got a page in the first response
1507 if let Some(proto::execution_response::Payload::Page(ref page_data)) = exec_resp.payload {
1508 let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1509 let page = Page {
1510 columns,
1511 rows,
1512 ordered: page_data.ordered,
1513 order_keys: page_data.order_keys.clone(),
1514 final_page: page_data.r#final,
1515 };
1516 return Ok((page, None));
1517 }
1518
1519 // Read next response for data page
1520 let resp = Self::read_proto_quic(recv, 30).await?;
1521 if let Some(proto::quic_server_message::Msg::Execute(exec_resp)) = resp.msg {
1522 if let Some(proto::execution_response::Payload::Error(ref err)) = exec_resp.payload {
1523 return Err(Error::Query {
1524 code: err.code.clone(),
1525 message: err.message.clone(),
1526 });
1527 }
1528
1529 if let Some(proto::execution_response::Payload::Page(ref page_data)) = exec_resp.payload
1530 {
1531 let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1532 let page = Page {
1533 columns,
1534 rows,
1535 ordered: page_data.ordered,
1536 order_keys: page_data.order_keys.clone(),
1537 final_page: page_data.r#final,
1538 };
1539 return Ok((page, None));
1540 }
1541 }
1542
1543 // Empty result
1544 let page = Page {
1545 columns,
1546 rows: Vec::new(),
1547 ordered: false,
1548 order_keys: Vec::new(),
1549 final_page: true,
1550 };
1551
1552 Ok((page, None))
1553 }
1554
1555 /// Execute a query without parameters (synchronous-style blocking version for test runner)
1556 pub fn query_sync(
1557 &mut self,
1558 gql: &str,
1559 params: Option<HashMap<String, serde_json::Value>>,
1560 ) -> Result<Page> {
1561 let params_map = params.unwrap_or_default();
1562 let params_typed: HashMap<String, Value> = params_map
1563 .into_iter()
1564 .map(|(k, v)| {
1565 let typed_val = crate::types::Value::from_json(v);
1566 (k, typed_val)
1567 })
1568 .collect();
1569
1570 match tokio::runtime::Handle::try_current() {
1571 Ok(handle) => {
1572 let (page, _cursor) =
1573 handle.block_on(self.query_with_params(gql, ¶ms_typed))?;
1574 Ok(page)
1575 }
1576 Err(_) => {
1577 let rt = tokio::runtime::Runtime::new()
1578 .map_err(|e| Error::query(format!("Failed to create runtime: {}", e)))?;
1579 let (page, _cursor) = rt.block_on(self.query_with_params(gql, ¶ms_typed))?;
1580 Ok(page)
1581 }
1582 }
1583 }
1584
1585 /// Begin a new transaction.
1586 ///
1587 /// After calling `begin`, all queries will be part of the transaction until
1588 /// [`commit`](Connection::commit) or [`rollback`](Connection::rollback) is called.
1589 ///
1590 /// # Errors
1591 ///
1592 /// Returns an error if a transaction is already in progress or if the
1593 /// server rejects the request.
1594 ///
1595 /// # Example
1596 ///
1597 /// ```no_run
1598 /// # use geode_client::Client;
1599 /// # async fn example() -> geode_client::Result<()> {
1600 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1601 /// # let mut conn = client.connect().await?;
1602 /// conn.begin().await?;
1603 /// conn.query("CREATE (n:Node {id: 1})").await?;
1604 /// conn.query("CREATE (n:Node {id: 2})").await?;
1605 /// conn.commit().await?; // Both nodes are now persisted
1606 /// # Ok(())
1607 /// # }
1608 /// ```
1609 pub async fn begin(&mut self) -> Result<()> {
1610 match &mut self.kind {
1611 ConnectionKind::Quic {
1612 send,
1613 recv,
1614 session_id,
1615 ..
1616 } => Self::send_begin_quic(send, recv, session_id).await,
1617 #[cfg(feature = "grpc")]
1618 ConnectionKind::Grpc { client } => client.begin().await,
1619 }
1620 }
1621
1622 /// Commit the current transaction.
1623 ///
1624 /// Persists all changes made since [`begin`](Connection::begin) was called.
1625 ///
1626 /// # Errors
1627 ///
1628 /// Returns an error if no transaction is in progress or if the server
1629 /// rejects the commit.
1630 ///
1631 /// # Example
1632 ///
1633 /// ```no_run
1634 /// # use geode_client::Client;
1635 /// # async fn example() -> geode_client::Result<()> {
1636 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1637 /// # let mut conn = client.connect().await?;
1638 /// conn.begin().await?;
1639 /// conn.query("CREATE (n:Node)").await?;
1640 /// conn.commit().await?; // Changes are now permanent
1641 /// # Ok(())
1642 /// # }
1643 /// ```
1644 pub async fn commit(&mut self) -> Result<()> {
1645 match &mut self.kind {
1646 ConnectionKind::Quic {
1647 send,
1648 recv,
1649 session_id,
1650 ..
1651 } => Self::send_commit_quic(send, recv, session_id).await,
1652 #[cfg(feature = "grpc")]
1653 ConnectionKind::Grpc { client } => client.commit().await,
1654 }
1655 }
1656
1657 /// Rollback the current transaction.
1658 ///
1659 /// Discards all changes made since [`begin`](Connection::begin) was called.
1660 ///
1661 /// # Errors
1662 ///
1663 /// Returns an error if no transaction is in progress.
1664 ///
1665 /// # Example
1666 ///
1667 /// ```no_run
1668 /// # use geode_client::Client;
1669 /// # async fn example() -> geode_client::Result<()> {
1670 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1671 /// # let mut conn = client.connect().await?;
1672 /// conn.begin().await?;
1673 /// match conn.query("CREATE (n:InvalidNode)").await {
1674 /// Ok(_) => conn.commit().await?,
1675 /// Err(_) => conn.rollback().await?, // Undo everything
1676 /// }
1677 /// # Ok(())
1678 /// # }
1679 /// ```
1680 pub async fn rollback(&mut self) -> Result<()> {
1681 match &mut self.kind {
1682 ConnectionKind::Quic {
1683 send,
1684 recv,
1685 session_id,
1686 ..
1687 } => Self::send_rollback_quic(send, recv, session_id).await,
1688 #[cfg(feature = "grpc")]
1689 ConnectionKind::Grpc { client } => client.rollback().await,
1690 }
1691 }
1692
1693 /// Create a prepared statement for efficient repeated execution.
1694 ///
1695 /// Prepared statements allow you to define a query once and execute it
1696 /// multiple times with different parameters. The query text is parsed
1697 /// to extract parameter names (tokens starting with `$`).
1698 ///
1699 /// # Arguments
1700 ///
1701 /// * `query` - The GQL query string with parameter placeholders
1702 ///
1703 /// # Returns
1704 ///
1705 /// A [`PreparedStatement`] that can be executed multiple times.
1706 ///
1707 /// # Example
1708 ///
1709 /// ```no_run
1710 /// # use geode_client::{Client, Value};
1711 /// # use std::collections::HashMap;
1712 /// # async fn example() -> geode_client::Result<()> {
1713 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1714 /// # let mut conn = client.connect().await?;
1715 /// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p.name")?;
1716 ///
1717 /// for id in 1..=100 {
1718 /// let mut params = HashMap::new();
1719 /// params.insert("id".to_string(), Value::int(id));
1720 /// let (page, _) = stmt.execute(&mut conn, ¶ms).await?;
1721 /// // Process results...
1722 /// }
1723 /// # Ok(())
1724 /// # }
1725 /// ```
1726 pub fn prepare(&self, query: &str) -> Result<PreparedStatement> {
1727 Ok(PreparedStatement::new(query))
1728 }
1729
1730 /// Get the execution plan for a query without running it.
1731 ///
1732 /// This is useful for understanding how the database will execute a query
1733 /// and for identifying potential performance issues.
1734 ///
1735 /// # Arguments
1736 ///
1737 /// * `gql` - The GQL query string to explain
1738 ///
1739 /// # Returns
1740 ///
1741 /// A [`QueryPlan`] containing the execution plan details.
1742 ///
1743 /// # Errors
1744 ///
1745 /// Returns an error if the query is invalid or cannot be planned.
1746 ///
1747 /// # Example
1748 ///
1749 /// ```no_run
1750 /// # use geode_client::Client;
1751 /// # async fn example() -> geode_client::Result<()> {
1752 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1753 /// # let mut conn = client.connect().await?;
1754 /// let plan = conn.explain("MATCH (p:Person)-[:KNOWS]->(f) RETURN f").await?;
1755 /// println!("Estimated rows: {}", plan.estimated_rows);
1756 /// for op in &plan.operations {
1757 /// println!(" {} - {}", op.op_type, op.description);
1758 /// }
1759 /// # Ok(())
1760 /// # }
1761 /// ```
1762 pub async fn explain(&mut self, gql: &str) -> Result<QueryPlan> {
1763 // Execute EXPLAIN as a query via protobuf
1764 let explain_query = format!("EXPLAIN {}", gql);
1765 let (_page, _) = self.query(&explain_query).await?;
1766
1767 // Parse the plan from the response
1768 // The result format depends on server implementation
1769 Ok(QueryPlan {
1770 operations: Vec::new(),
1771 estimated_rows: 0,
1772 raw: serde_json::json!({}),
1773 })
1774 }
1775
1776 /// Execute a query and return the execution profile with timing information.
1777 ///
1778 /// This runs the query and collects detailed execution statistics including
1779 /// actual row counts and timing for each operation.
1780 ///
1781 /// # Arguments
1782 ///
1783 /// * `gql` - The GQL query string to profile
1784 ///
1785 /// # Returns
1786 ///
1787 /// A [`QueryProfile`] containing the execution plan and runtime statistics.
1788 ///
1789 /// # Errors
1790 ///
1791 /// Returns an error if the query fails to execute.
1792 ///
1793 /// # Example
1794 ///
1795 /// ```no_run
1796 /// # use geode_client::Client;
1797 /// # async fn example() -> geode_client::Result<()> {
1798 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1799 /// # let mut conn = client.connect().await?;
1800 /// let profile = conn.profile("MATCH (p:Person) RETURN p LIMIT 100").await?;
1801 /// println!("Execution time: {:.2}ms", profile.execution_time_ms);
1802 /// println!("Actual rows: {}", profile.actual_rows);
1803 /// # Ok(())
1804 /// # }
1805 /// ```
1806 pub async fn profile(&mut self, gql: &str) -> Result<QueryProfile> {
1807 // Execute PROFILE as a query via protobuf
1808 let profile_query = format!("PROFILE {}", gql);
1809 let (page, _) = self.query(&profile_query).await?;
1810
1811 // Parse the profile from the response
1812 let plan = QueryPlan {
1813 operations: Vec::new(),
1814 estimated_rows: 0,
1815 raw: serde_json::json!({}),
1816 };
1817
1818 Ok(QueryProfile {
1819 plan,
1820 actual_rows: page.rows.len() as u64,
1821 execution_time_ms: 0.0,
1822 raw: serde_json::json!({}),
1823 })
1824 }
1825
1826 /// Execute multiple queries in a batch.
1827 ///
1828 /// This is more efficient than executing queries one at a time when you
1829 /// have multiple independent queries to run.
1830 ///
1831 /// # Arguments
1832 ///
1833 /// * `queries` - A slice of (query, optional params) tuples
1834 ///
1835 /// # Returns
1836 ///
1837 /// A `Vec<Page>` with results for each query, in the same order as input.
1838 ///
1839 /// # Errors
1840 ///
1841 /// Returns an error if any query fails. Queries are executed in order,
1842 /// so earlier queries may have completed before the error.
1843 ///
1844 /// # Example
1845 ///
1846 /// ```no_run
1847 /// # use geode_client::Client;
1848 /// # async fn example() -> geode_client::Result<()> {
1849 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1850 /// # let mut conn = client.connect().await?;
1851 /// let results = conn.batch(&[
1852 /// ("MATCH (n:Person) RETURN count(n)", None),
1853 /// ("MATCH (n:Company) RETURN count(n)", None),
1854 /// ("MATCH ()-[r:WORKS_AT]->() RETURN count(r)", None),
1855 /// ]).await?;
1856 ///
1857 /// for (i, page) in results.iter().enumerate() {
1858 /// println!("Query {}: {} rows", i + 1, page.rows.len());
1859 /// }
1860 /// # Ok(())
1861 /// # }
1862 /// ```
1863 pub async fn batch(
1864 &mut self,
1865 queries: &[(&str, Option<&HashMap<String, Value>>)],
1866 ) -> Result<Vec<Page>> {
1867 let mut results = Vec::with_capacity(queries.len());
1868
1869 for (query, params) in queries {
1870 let (page, _) = match params {
1871 Some(p) => self.query_with_params(query, p).await?,
1872 None => self.query(query).await?,
1873 };
1874 results.push(page);
1875 }
1876
1877 Ok(results)
1878 }
1879
1880 /// Parse plan operations from a server response.
1881 /// Reserved for future EXPLAIN/PROFILE response parsing.
1882 #[allow(dead_code)]
1883 fn parse_plan_operations(result: &serde_json::Value) -> Vec<PlanOperation> {
1884 let mut operations = Vec::new();
1885
1886 if let Some(ops) = result.get("operations").and_then(|o| o.as_array()) {
1887 for op in ops {
1888 operations.push(Self::parse_single_operation(op));
1889 }
1890 } else if let Some(plan) = result.get("plan") {
1891 // Alternative format: single "plan" object
1892 operations.push(Self::parse_single_operation(plan));
1893 }
1894
1895 operations
1896 }
1897
1898 /// Parse a single operation from JSON.
1899 #[allow(dead_code)]
1900 fn parse_single_operation(op: &serde_json::Value) -> PlanOperation {
1901 let op_type = op
1902 .get("type")
1903 .or_else(|| op.get("op_type"))
1904 .and_then(|t| t.as_str())
1905 .unwrap_or("Unknown")
1906 .to_string();
1907
1908 let description = op
1909 .get("description")
1910 .or_else(|| op.get("desc"))
1911 .and_then(|d| d.as_str())
1912 .unwrap_or("")
1913 .to_string();
1914
1915 let estimated_rows = op
1916 .get("estimated_rows")
1917 .or_else(|| op.get("rows"))
1918 .and_then(|r| r.as_u64());
1919
1920 let children = op
1921 .get("children")
1922 .and_then(|c| c.as_array())
1923 .map(|arr| arr.iter().map(Self::parse_single_operation).collect())
1924 .unwrap_or_default();
1925
1926 PlanOperation {
1927 op_type,
1928 description,
1929 estimated_rows,
1930 children,
1931 }
1932 }
1933
1934 /// Close the connection.
1935 ///
1936 /// Gracefully closes the connection. After calling this method,
1937 /// the connection can no longer be used.
1938 ///
1939 /// # Note
1940 ///
1941 /// It's good practice to explicitly close connections, but they will also
1942 /// be closed when dropped.
1943 ///
1944 /// # Example
1945 ///
1946 /// ```no_run
1947 /// # use geode_client::Client;
1948 /// # async fn example() -> geode_client::Result<()> {
1949 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1950 /// let mut conn = client.connect().await?;
1951 /// // ... use connection ...
1952 /// conn.close()?;
1953 /// # Ok(())
1954 /// # }
1955 /// ```
1956 pub fn close(&mut self) -> Result<()> {
1957 match &mut self.kind {
1958 ConnectionKind::Quic { conn, .. } => {
1959 // QUIC close is asynchronous and best-effort - the CONNECTION_CLOSE frame
1960 // will be sent by Quinn's internal I/O handling. No blocking delay needed.
1961 // (Gap #17: Removed std::thread::sleep that blocked the async runtime)
1962 conn.close(0u32.into(), b"client closing");
1963 Ok(())
1964 }
1965 #[cfg(feature = "grpc")]
1966 ConnectionKind::Grpc { client } => client.close(),
1967 }
1968 }
1969
1970 /// Check if the connection is still healthy.
1971 ///
1972 /// Returns `true` if the underlying QUIC connection is still open and usable.
1973 /// This is used by connection pools to verify connections before reuse.
1974 ///
1975 /// # Example
1976 ///
1977 /// ```ignore
1978 /// let mut conn = client.connect().await?;
1979 /// if conn.is_healthy() {
1980 /// // Connection is still usable
1981 /// let (page, _) = conn.query("RETURN 1").await?;
1982 /// }
1983 /// ```
1984 pub fn is_healthy(&self) -> bool {
1985 match &self.kind {
1986 ConnectionKind::Quic { conn, .. } => {
1987 // Quinn's close_reason() returns Some if the connection was closed
1988 conn.close_reason().is_none()
1989 }
1990 #[cfg(feature = "grpc")]
1991 ConnectionKind::Grpc { .. } => {
1992 // gRPC connections are managed by tonic, always report healthy
1993 true
1994 }
1995 }
1996 }
1997}
1998
1999/// Certificate verifier that skips all verification (INSECURE - for development only)
2000#[derive(Debug)]
2001struct SkipServerVerification;
2002
2003impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
2004 fn verify_server_cert(
2005 &self,
2006 _end_entity: &CertificateDer,
2007 _intermediates: &[CertificateDer],
2008 _server_name: &RustlsServerName,
2009 _ocsp_response: &[u8],
2010 _now: rustls::pki_types::UnixTime,
2011 ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
2012 Ok(rustls::client::danger::ServerCertVerified::assertion())
2013 }
2014
2015 fn verify_tls12_signature(
2016 &self,
2017 _message: &[u8],
2018 _cert: &CertificateDer,
2019 _dss: &rustls::DigitallySignedStruct,
2020 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
2021 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
2022 }
2023
2024 fn verify_tls13_signature(
2025 &self,
2026 _message: &[u8],
2027 _cert: &CertificateDer,
2028 _dss: &rustls::DigitallySignedStruct,
2029 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
2030 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
2031 }
2032
2033 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
2034 vec![
2035 rustls::SignatureScheme::RSA_PKCS1_SHA256,
2036 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
2037 rustls::SignatureScheme::ED25519,
2038 ]
2039 }
2040}
2041
#[cfg(test)]
mod tests {
    //! Unit tests for the client module: prepared statements, plan/profile
    //! data structures, the `Client` builder and its validation, plan parsing,
    //! and DSN password redaction.

    use super::*;

    // ------------------------------------------------------------------
    // PreparedStatement
    // ------------------------------------------------------------------

    /// The query text is kept verbatim and the single `$id` placeholder is reported.
    #[test]
    fn test_prepared_statement_new() {
        let text = "MATCH (n:Person {id: $id}) RETURN n";
        let prepared = PreparedStatement::new(text);
        assert_eq!(prepared.query(), text);
        assert_eq!(prepared.param_names(), &["id"]);
    }

    /// Three distinct placeholders yield three parameter names.
    #[test]
    fn test_prepared_statement_multiple_params() {
        let prepared = PreparedStatement::new(
            "MATCH (p:Person {name: $name}) WHERE p.age > $min_age AND p.city = $city RETURN p",
        );
        assert!(prepared.query().contains("$name"));

        let params = prepared.param_names();
        assert_eq!(params.len(), 3);
        for expected in ["name", "min_age", "city"] {
            assert!(params.contains(&expected.to_string()));
        }
    }

    /// A query without placeholders has no parameter names.
    #[test]
    fn test_prepared_statement_no_params() {
        let prepared = PreparedStatement::new("MATCH (n) RETURN n LIMIT 10");
        let params = prepared.param_names();
        assert!(params.is_empty());
    }

    /// A placeholder used twice is listed only once.
    #[test]
    fn test_prepared_statement_duplicate_params() {
        let text = "MATCH (a {id: $id})-[:KNOWS]->(b {id: $id}) RETURN a, b";
        let prepared = PreparedStatement::new(text);
        // Should deduplicate parameter names
        assert_eq!(prepared.param_names(), &["id"]);
    }

    /// Underscores are part of a parameter name.
    #[test]
    fn test_prepared_statement_underscore_params() {
        let prepared = PreparedStatement::new("MATCH (n {user_id: $user_id}) RETURN n");
        assert_eq!(prepared.param_names(), &["user_id"]);
    }

    /// Trailing digits are part of a parameter name.
    #[test]
    fn test_prepared_statement_numeric_params() {
        let prepared = PreparedStatement::new("RETURN $param1, $param2, $param123");
        let params = prepared.param_names();
        assert_eq!(params.len(), 3);
        for expected in ["param1", "param2", "param123"] {
            assert!(params.contains(&expected.to_string()));
        }
    }

    // ------------------------------------------------------------------
    // PlanOperation
    // ------------------------------------------------------------------

    /// Fields of a leaf operation are stored as given.
    #[test]
    fn test_plan_operation_struct() {
        let scan = PlanOperation {
            op_type: String::from("NodeScan"),
            description: String::from("Scan Person nodes"),
            estimated_rows: Some(100),
            children: Vec::new(),
        };
        assert_eq!(scan.op_type, "NodeScan");
        assert_eq!(scan.description, "Scan Person nodes");
        assert_eq!(scan.estimated_rows, Some(100));
        assert!(scan.children.is_empty());
    }

    /// A parent operation owns its child operations.
    #[test]
    fn test_plan_operation_with_children() {
        let filter = PlanOperation {
            op_type: String::from("Filter"),
            description: String::from("Filter by age"),
            estimated_rows: Some(50),
            children: Vec::new(),
        };
        let projection = PlanOperation {
            op_type: String::from("Projection"),
            description: String::from("Project name, age"),
            estimated_rows: Some(50),
            children: vec![filter],
        };
        assert_eq!(projection.children.len(), 1);
        let only_child = &projection.children[0];
        assert_eq!(only_child.op_type, "Filter");
    }

    // ------------------------------------------------------------------
    // QueryPlan
    // ------------------------------------------------------------------

    /// A plan keeps its operations and row estimate.
    #[test]
    fn test_query_plan_struct() {
        let full_scan = PlanOperation {
            op_type: String::from("NodeScan"),
            description: String::from("Full scan"),
            estimated_rows: Some(1000),
            children: Vec::new(),
        };
        let plan = QueryPlan {
            operations: vec![full_scan],
            estimated_rows: 1000,
            raw: serde_json::json!({"type": "plan"}),
        };
        assert_eq!(plan.operations.len(), 1);
        assert_eq!(plan.estimated_rows, 1000);
    }

    // ------------------------------------------------------------------
    // QueryProfile
    // ------------------------------------------------------------------

    /// A profile keeps the actual row count and execution time.
    #[test]
    fn test_query_profile_struct() {
        let profile = QueryProfile {
            plan: QueryPlan {
                operations: Vec::new(),
                estimated_rows: 100,
                raw: serde_json::json!({}),
            },
            actual_rows: 95,
            execution_time_ms: 12.5,
            raw: serde_json::json!({"type": "profile"}),
        };
        assert_eq!(profile.actual_rows, 95);
        // Floating-point field: compare with a tolerance rather than exactly.
        let drift = (profile.execution_time_ms - 12.5).abs();
        assert!(drift < 0.001);
    }

    // ------------------------------------------------------------------
    // Page
    // ------------------------------------------------------------------

    /// A page with one column and no rows, marked as final.
    #[test]
    fn test_page_struct() {
        let only_column = Column {
            name: String::from("x"),
            col_type: String::from("INT"),
        };
        let page = Page {
            columns: vec![only_column],
            rows: Vec::new(),
            ordered: false,
            order_keys: Vec::new(),
            final_page: true,
        };
        assert_eq!(page.columns.len(), 1);
        assert!(page.rows.is_empty());
        assert!(page.final_page);
    }

    // ------------------------------------------------------------------
    // Column
    // ------------------------------------------------------------------

    /// Column name and type are stored as given.
    #[test]
    fn test_column_struct() {
        let column = Column {
            name: String::from("age"),
            col_type: String::from("INT"),
        };
        assert_eq!(column.name, "age");
        assert_eq!(column.col_type, "INT");
    }

    // ------------------------------------------------------------------
    // Savepoint
    // ------------------------------------------------------------------

    /// The savepoint name is stored as given.
    #[test]
    fn test_savepoint_struct() {
        let savepoint = Savepoint {
            name: String::from("before_update"),
        };
        assert_eq!(savepoint.name, "before_update");
    }

    // ------------------------------------------------------------------
    // Client builder
    // ------------------------------------------------------------------

    /// Compile-time check: a client can be built from host and port alone.
    #[test]
    fn test_client_builder_defaults() {
        // Test passes if it compiles - verifies defaults work
        let _built = Client::new("localhost", 3141);
    }

    /// Compile-time check: builder methods can be chained.
    #[test]
    fn test_client_builder_chain() {
        // Test passes if it compiles - verifies builder chain works
        let base = Client::new("example.com", 8443);
        let _built = base
            .skip_verify(true)
            .page_size(500)
            .client_name("test-app")
            .client_version("2.0.0")
            .conformance("full");
    }

    /// Compile-time check: `Client` implements `Clone`.
    #[test]
    fn test_client_clone() {
        // Test passes if it compiles - verifies Clone is implemented
        let original = Client::new("localhost", 3141).skip_verify(true);
        let _copy = original.clone();
    }

    // ------------------------------------------------------------------
    // parse_plan_operations / parse_single_operation
    // ------------------------------------------------------------------

    /// An empty JSON object yields no operations.
    #[test]
    fn test_parse_plan_operations_empty() {
        let payload = serde_json::json!({});
        assert!(Connection::parse_plan_operations(&payload).is_empty());
    }

    /// An `operations` array yields one operation per element, in order.
    #[test]
    fn test_parse_plan_operations_array() {
        let payload = serde_json::json!({
            "operations": [
                {"type": "NodeScan", "description": "Scan nodes", "estimated_rows": 100},
                {"type": "Filter", "description": "Apply filter", "estimated_rows": 50}
            ]
        });
        let parsed = Connection::parse_plan_operations(&payload);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].op_type, "NodeScan");
        assert_eq!(parsed[1].op_type, "Filter");
    }

    /// A single `plan` object yields exactly one operation.
    #[test]
    fn test_parse_plan_operations_single_plan() {
        let payload = serde_json::json!({
            "plan": {"op_type": "FullScan", "desc": "Full table scan"}
        });
        let parsed = Connection::parse_plan_operations(&payload);
        assert_eq!(parsed.len(), 1);
        let only = &parsed[0];
        assert_eq!(only.op_type, "FullScan");
        assert_eq!(only.description, "Full table scan");
    }

    /// All primary field names are read, including nested children.
    #[test]
    fn test_parse_single_operation() {
        let payload = serde_json::json!({
            "type": "IndexScan",
            "description": "Use index on Person(name)",
            "estimated_rows": 25,
            "children": [
                {"type": "Filter", "description": "Filter results"}
            ]
        });
        let parsed = Connection::parse_single_operation(&payload);
        assert_eq!(parsed.op_type, "IndexScan");
        assert_eq!(parsed.description, "Use index on Person(name)");
        assert_eq!(parsed.estimated_rows, Some(25));
        assert_eq!(parsed.children.len(), 1);
        assert_eq!(parsed.children[0].op_type, "Filter");
    }

    /// Missing fields fall back to "Unknown", empty description, no estimate, no children.
    #[test]
    fn test_parse_single_operation_minimal() {
        let payload = serde_json::json!({});
        let parsed = Connection::parse_single_operation(&payload);
        assert_eq!(parsed.op_type, "Unknown");
        assert_eq!(parsed.description, "");
        assert_eq!(parsed.estimated_rows, None);
        assert!(parsed.children.is_empty());
    }

    /// The alternative field names `op_type`, `desc` and `rows` are accepted.
    #[test]
    fn test_parse_single_operation_alt_fields() {
        let payload = serde_json::json!({
            "op_type": "Sort",
            "desc": "Sort by name ASC",
            "rows": 100
        });
        let parsed = Connection::parse_single_operation(&payload);
        assert_eq!(parsed.op_type, "Sort");
        assert_eq!(parsed.description, "Sort by name ASC");
        assert_eq!(parsed.estimated_rows, Some(100));
    }

    // ------------------------------------------------------------------
    // redact_dsn - Gap #7 (DSN Password Exposure)
    // ------------------------------------------------------------------

    /// `user:password@` in a URL has its password replaced, user and host kept.
    #[test]
    fn test_redact_dsn_url_with_password() {
        let safe = redact_dsn("quic://admin:secret123@localhost:3141");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains("secret123"));
        assert!(safe.contains("admin"));
        assert!(safe.contains("localhost"));
    }

    /// A URL with a user but no password is not marked as redacted.
    #[test]
    fn test_redact_dsn_url_without_password() {
        let safe = redact_dsn("quic://admin@localhost:3141");
        assert!(!safe.contains("[REDACTED]"));
        assert!(safe.contains("admin"));
        assert!(safe.contains("localhost"));
    }

    /// A URL without credentials is returned unchanged.
    #[test]
    fn test_redact_dsn_url_no_auth() {
        let input = "quic://localhost:3141";
        assert_eq!(redact_dsn(input), input);
    }

    /// The `password=` query parameter is redacted, other parameters are kept.
    #[test]
    fn test_redact_dsn_query_param_password() {
        let safe = redact_dsn("localhost:3141?username=admin&password=secret123");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains("secret123"));
        assert!(safe.contains("username=admin"));
    }

    /// The short `pass=` query parameter is redacted as well.
    #[test]
    fn test_redact_dsn_query_param_pass() {
        let safe = redact_dsn("localhost:3141?user=admin&pass=mysecret");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains("mysecret"));
    }

    /// A DSN with unrelated query parameters is returned unchanged.
    #[test]
    fn test_redact_dsn_simple_no_password() {
        let input = "localhost:3141?insecure=true";
        assert_eq!(redact_dsn(input), input);
    }

    /// URL credentials are redacted while the query string survives.
    #[test]
    fn test_redact_dsn_url_with_query_and_password() {
        let safe = redact_dsn("quic://user:pass@localhost:3141?insecure=true");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains(":pass@"));
        assert!(safe.contains("insecure=true"));
    }

    // ------------------------------------------------------------------
    // validate() - Gap #9 (Validation Not Automatic)
    // ------------------------------------------------------------------

    /// `localhost` with a non-zero port validates.
    #[test]
    fn test_client_validate_valid() {
        assert!(Client::new("localhost", 3141).validate().is_ok());
    }

    /// A dotted hostname validates.
    #[test]
    fn test_client_validate_valid_hostname() {
        assert!(Client::new("geode.example.com", 3141).validate().is_ok());
    }

    /// An IPv4 literal validates.
    #[test]
    fn test_client_validate_valid_ipv4() {
        assert!(Client::new("192.168.1.1", 8443).validate().is_ok());
    }

    /// A hostname starting with a hyphen is rejected.
    #[test]
    fn test_client_validate_invalid_hostname_hyphen_start() {
        assert!(Client::new("-invalid", 3141).validate().is_err());
    }

    /// A hostname ending with a hyphen is rejected.
    #[test]
    fn test_client_validate_invalid_hostname_hyphen_end() {
        assert!(Client::new("invalid-", 3141).validate().is_err());
    }

    /// Port 0 is rejected.
    #[test]
    fn test_client_validate_invalid_port_zero() {
        assert!(Client::new("localhost", 0).validate().is_err());
    }

    /// A page size of 0 is rejected.
    #[test]
    fn test_client_validate_invalid_page_size_zero() {
        let configured = Client::new("localhost", 3141).page_size(0);
        assert!(configured.validate().is_err());
    }

    /// A page size of 200_000 is rejected.
    #[test]
    fn test_client_validate_invalid_page_size_too_large() {
        let configured = Client::new("localhost", 3141).page_size(200_000);
        assert!(configured.validate().is_err());
    }

    /// A fully configured client with sane values validates.
    #[test]
    fn test_client_validate_with_all_options() {
        let configured = Client::new("geode.example.com", 8443)
            .skip_verify(true)
            .page_size(500)
            .username("admin")
            .password("secret")
            .connect_timeout(15)
            .hello_timeout(10)
            .idle_timeout(60);
        assert!(configured.validate().is_ok());
    }

    // Gap #13: Test that extreme timeout values don't cause builder panics
    #[test]
    fn test_client_extreme_timeout_values() {
        // Builder should accept any u64 value without panicking;
        // the actual validation/capping happens at connect time.
        let huge = u64::MAX;
        let _built = Client::new("localhost", 3141)
            .connect_timeout(huge)
            .hello_timeout(huge)
            .idle_timeout(huge);
    }
}