Skip to main content

geode_client/
client.rs

1//! Geode client implementation supporting both QUIC and gRPC transports.
2//! Uses protobuf wire protocol with 4-byte big-endian length prefix for QUIC.
3
4use log::{debug, trace, warn};
5use quinn::{ClientConfig, Endpoint};
6use rustls::pki_types::{CertificateDer, ServerName as RustlsServerName};
7use secrecy::{ExposeSecret, SecretString};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::net::{SocketAddr, ToSocketAddrs};
11use std::sync::Arc;
12use tokio::time::{Duration, timeout};
13
14use crate::dsn::{Dsn, Transport};
15use crate::error::{Error, Result};
16use crate::proto;
17use crate::types::Value;
18use crate::validate;
19
20const GEODE_ALPN: &[u8] = b"geode/1";
21
22/// Redact password from a DSN string for safe inclusion in error messages.
23/// Handles both URL format (quic://user:pass@host) and query parameter format (?password=xxx).
24#[allow(dead_code)] // Used in tests
25fn redact_dsn(dsn: &str) -> String {
26    let mut result = dsn.to_string();
27
28    // Handle URL format: scheme://user:password@host:port
29    // Look for pattern user:password@ and redact the password
30    if let Some(scheme_end) = result.find("://") {
31        let after_scheme = scheme_end + 3;
32        if let Some(at_pos) = result[after_scheme..].find('@') {
33            let auth_section = &result[after_scheme..after_scheme + at_pos];
34            if let Some(colon_pos) = auth_section.find(':') {
35                // Found user:password pattern
36                let user = &auth_section[..colon_pos];
37                let rest_start = after_scheme + at_pos;
38                result = format!(
39                    "{}{}:{}{}",
40                    &result[..after_scheme],
41                    user,
42                    "[REDACTED]",
43                    &result[rest_start..]
44                );
45            }
46        }
47    }
48
49    // Handle query parameter format: host:port?password=xxx
50    // Redact password= and pass= parameters (only check once per pattern to avoid loops)
51    let patterns = ["password=", "pass="];
52    for pattern in patterns {
53        let lower = result.to_lowercase();
54        if let Some(start) = lower.find(pattern) {
55            let value_start = start + pattern.len();
56            // Find end of value (& or end of string)
57            let value_end = result[value_start..]
58                .find('&')
59                .map(|i| value_start + i)
60                .unwrap_or(result.len());
61
62            result = format!(
63                "{}[REDACTED]{}",
64                &result[..value_start],
65                &result[value_end..]
66            );
67        }
68    }
69
70    result
71}
72
73/// A column definition in a query result set.
74///
75/// Contains the column name and its GQL type as returned by the server.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct Column {
78    /// The column name (or alias if specified in the query)
79    pub name: String,
80    /// The GQL type of the column (e.g., "INT", "STRING", "BOOL")
81    #[serde(rename = "type")]
82    pub col_type: String,
83}
84
85/// A page of query results.
86///
87/// Query results are returned in pages. Each page contains a slice of rows
88/// along with metadata about the result set.
89///
90/// # Example
91///
92/// ```ignore
93/// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name, n.age").await?;
94///
95/// for row in &page.rows {
96///     let name = row.get("name").unwrap().as_string()?;
97///     let age = row.get("age").unwrap().as_int()?;
98///     println!("{}: {}", name, age);
99/// }
100///
101/// if !page.final_page {
102///     // More results available, would need to pull next page
103/// }
104/// ```
105#[derive(Debug, Clone)]
106pub struct Page {
107    /// Column definitions for the result set
108    pub columns: Vec<Column>,
109    /// Result rows, each row is a map of column name to value
110    pub rows: Vec<HashMap<String, Value>>,
111    /// Whether results are ordered (ORDER BY was used)
112    pub ordered: bool,
113    /// The keys used for ordering, if any
114    pub order_keys: Vec<String>,
115    /// Whether this is the final page of results
116    pub final_page: bool,
117}
118
119/// A named savepoint within a transaction.
120///
121/// Savepoints allow partial rollback within a transaction. They can be created
122/// and managed via server-side GQL commands.
123///
124/// # Example
125///
126/// ```ignore
127/// conn.begin().await?;
128/// conn.query("CREATE (n:Node {id: 1})").await?;
129///
130/// let sp = conn.savepoint("before_risky_op")?;
131/// match conn.query("CREATE (n:Node {id: 2})").await {
132///     Ok(_) => {},
133///     Err(_) => conn.rollback_to(&sp).await?,  // Undo only the second create
134/// }
135///
136/// conn.commit().await?;  // First node is saved
137/// ```
138#[derive(Debug, Clone)]
139pub struct Savepoint {
140    /// The savepoint name
141    pub name: String,
142}
143
144/// A prepared statement for efficient repeated query execution.
145///
146/// Prepared statements allow you to define a query once and execute it
147/// multiple times with different parameters. This can improve performance
148/// by allowing query plan caching on the server.
149///
150/// # Example
151///
152/// ```ignore
153/// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p").await?;
154///
155/// for id in 1..=100 {
156///     let mut params = HashMap::new();
157///     params.insert("id".to_string(), Value::int(id));
158///     let (page, _) = stmt.execute(&mut conn, &params).await?;
159///     // Process results...
160/// }
161/// ```
162#[derive(Debug, Clone)]
163pub struct PreparedStatement {
164    /// The GQL query string
165    query: String,
166    /// Parameter names extracted from the query
167    param_names: Vec<String>,
168}
169
170impl PreparedStatement {
171    /// Create a new prepared statement.
172    ///
173    /// Extracts parameter names from the query (tokens starting with `$`).
174    pub fn new(query: impl Into<String>) -> Self {
175        let query = query.into();
176        let param_names = Self::extract_param_names(&query);
177        Self { query, param_names }
178    }
179
180    /// Extract parameter names from a query string.
181    fn extract_param_names(query: &str) -> Vec<String> {
182        let mut names = Vec::new();
183        let mut chars = query.chars().peekable();
184
185        while let Some(c) = chars.next() {
186            if c == '$' {
187                let mut name = String::new();
188                while let Some(&next) = chars.peek() {
189                    if next.is_ascii_alphanumeric() || next == '_' {
190                        name.push(chars.next().unwrap());
191                    } else {
192                        break;
193                    }
194                }
195                if !name.is_empty() && !names.contains(&name) {
196                    names.push(name);
197                }
198            }
199        }
200
201        names
202    }
203
204    /// Get the query string.
205    pub fn query(&self) -> &str {
206        &self.query
207    }
208
209    /// Get the parameter names expected by this statement.
210    pub fn param_names(&self) -> &[String] {
211        &self.param_names
212    }
213
214    /// Execute the prepared statement with the given parameters.
215    ///
216    /// # Arguments
217    ///
218    /// * `conn` - The connection to execute on
219    /// * `params` - Parameter values (must include all parameters in the query)
220    ///
221    /// # Returns
222    ///
223    /// A tuple of (`Page`, `Option<String>`) with results and optional warnings.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if required parameters are missing or if the query fails.
228    pub async fn execute(
229        &self,
230        conn: &mut Connection,
231        params: &HashMap<String, crate::types::Value>,
232    ) -> crate::error::Result<(Page, Option<String>)> {
233        // Validate all required parameters are provided
234        for name in &self.param_names {
235            if !params.contains_key(name) {
236                return Err(crate::error::Error::validation(format!(
237                    "Missing required parameter: {}",
238                    name
239                )));
240            }
241        }
242
243        conn.query_with_params(&self.query, params).await
244    }
245}
246
247/// An operation in a query execution plan.
248#[derive(Debug, Clone)]
249pub struct PlanOperation {
250    /// Operation type (e.g., "NodeScan", "Filter", "Projection")
251    pub op_type: String,
252    /// Human-readable description
253    pub description: String,
254    /// Estimated row count for this operation
255    pub estimated_rows: Option<u64>,
256    /// Child operations
257    pub children: Vec<PlanOperation>,
258}
259
260/// A query execution plan.
261///
262/// Shows how the database will execute a query without actually running it.
263/// Useful for query optimization and understanding performance characteristics.
264#[derive(Debug, Clone)]
265pub struct QueryPlan {
266    /// Root operations in the plan
267    pub operations: Vec<PlanOperation>,
268    /// Total estimated rows
269    pub estimated_rows: u64,
270    /// Raw plan from server (for advanced analysis)
271    pub raw: serde_json::Value,
272}
273
274/// Query execution profile with timing information.
275///
276/// Includes the execution plan plus actual runtime statistics.
277#[derive(Debug, Clone)]
278pub struct QueryProfile {
279    /// The execution plan
280    pub plan: QueryPlan,
281    /// Actual rows returned
282    pub actual_rows: u64,
283    /// Total execution time in milliseconds
284    pub execution_time_ms: f64,
285    /// Raw profile from server
286    pub raw: serde_json::Value,
287}
288
289/// A Geode database client supporting both QUIC and gRPC transports.
290///
291/// Use the builder pattern to configure the client, then call [`connect`](Client::connect)
292/// to establish a connection.
293///
294/// # Transport Selection
295///
296/// The transport is selected based on the DSN scheme:
297/// - `quic://` - QUIC transport (default)
298/// - `grpc://` - gRPC transport
299///
300/// # Example
301///
302/// ```no_run
303/// use geode_client::Client;
304///
305/// # async fn example() -> geode_client::Result<()> {
306/// // QUIC transport (legacy API)
307/// let client = Client::new("127.0.0.1", 3141)
308///     .skip_verify(true)  // Development only!
309///     .page_size(500)
310///     .client_name("my-app");
311///
312/// // Or use DSN with explicit transport
313/// let client = Client::from_dsn("quic://127.0.0.1:3141?insecure=true")?;
314/// let client = Client::from_dsn("grpc://127.0.0.1:50051")?;
315///
316/// let mut conn = client.connect().await?;
317/// let (page, _) = conn.query("RETURN 1 AS x").await?;
318/// conn.close()?;
319/// # Ok(())
320/// # }
321/// ```
322/// Client configuration for connecting to a Geode server.
323///
324/// The password field uses `SecretString` from the `secrecy` crate to ensure
325/// credentials are zeroized from memory on drop and not accidentally leaked
326/// in debug output or error messages.
327#[derive(Clone)]
328pub struct Client {
329    transport: Transport,
330    host: String,
331    port: u16,
332    tls_enabled: bool,
333    skip_verify: bool,
334    page_size: usize,
335    hello_name: String,
336    hello_ver: String,
337    conformance: String,
338    username: Option<String>,
339    /// Password stored using SecretString for secure memory handling (CWE-316).
340    /// Automatically zeroized on drop and redacted in Debug output.
341    password: Option<SecretString>,
342    /// Connection timeout in seconds (default: 10)
343    connect_timeout_secs: u64,
344    /// HELLO handshake timeout in seconds (default: 5)
345    hello_timeout_secs: u64,
346    /// Idle connection timeout in seconds (default: 30)
347    idle_timeout_secs: u64,
348}
349
350impl Client {
351    /// Create a new QUIC client for the specified host and port.
352    ///
353    /// This method creates a client using QUIC transport. For gRPC transport,
354    /// use [`from_dsn`](Client::from_dsn) with a `grpc://` scheme.
355    ///
356    /// # Arguments
357    ///
358    /// * `host` - The server hostname or IP address
359    /// * `port` - The server port (typically 3141 for Geode)
360    ///
361    /// # Example
362    ///
363    /// ```
364    /// use geode_client::Client;
365    ///
366    /// let client = Client::new("localhost", 3141);
367    /// let client = Client::new("192.168.1.100", 8443);
368    /// let client = Client::new(String::from("geode.example.com"), 3141);
369    /// ```
370    pub fn new(host: impl Into<String>, port: u16) -> Self {
371        Self {
372            transport: Transport::Quic,
373            host: host.into(),
374            port,
375            tls_enabled: true,
376            skip_verify: false,
377            page_size: 1000,
378            hello_name: "geode-rust".to_string(),
379            hello_ver: env!("CARGO_PKG_VERSION").to_string(),
380            conformance: "min".to_string(),
381            username: None,
382            password: None,
383            connect_timeout_secs: 10,
384            hello_timeout_secs: 5,
385            idle_timeout_secs: 30,
386        }
387    }
388
389    /// Create a new client from a DSN (Data Source Name) string.
390    ///
391    /// # Supported DSN Formats
392    ///
393    /// - `quic://host:port?options` - QUIC transport (recommended)
394    /// - `grpc://host:port?options` - gRPC transport
395    /// - `host:port?options` - Legacy format (defaults to QUIC)
396    ///
397    /// # Supported Options
398    ///
399    /// - `tls` - Enable/disable TLS (0/1/true/false)
400    /// - `insecure` or `skip_verify` - Skip TLS verification
401    /// - `page_size` - Results page size (default: 1000)
402    /// - `client_name` or `hello_name` - Client name
403    /// - `client_version` or `hello_ver` - Client version
404    /// - `conformance` - GQL conformance level
405    /// - `username` or `user` - Authentication username
406    /// - `password` or `pass` - Authentication password
407    ///
408    /// # Examples
409    ///
410    /// ```
411    /// use geode_client::Client;
412    ///
413    /// // QUIC transport (explicit)
414    /// let client = Client::from_dsn("quic://localhost:3141").unwrap();
415    ///
416    /// // gRPC transport
417    /// let client = Client::from_dsn("grpc://localhost:50051?tls=0").unwrap();
418    ///
419    /// // Legacy format (defaults to QUIC)
420    /// let client = Client::from_dsn("localhost:3141?insecure=true").unwrap();
421    ///
422    /// // With authentication
423    /// let client = Client::from_dsn("quic://admin:secret@localhost:3141").unwrap();
424    ///
425    /// // IPv6 support
426    /// let client = Client::from_dsn("grpc://[::1]:50051").unwrap();
427    /// ```
428    ///
429    /// # Errors
430    ///
431    /// Returns `Error::InvalidDsn` if:
432    /// - DSN is empty
433    /// - Scheme is unsupported (not quic://, grpc://, or schemeless)
434    /// - Host is missing
435    /// - Port is invalid
436    pub fn from_dsn(dsn_str: &str) -> Result<Self> {
437        let dsn = Dsn::parse(dsn_str)?;
438
439        Ok(Self {
440            transport: dsn.transport(),
441            host: dsn.host().to_string(),
442            port: dsn.port(),
443            tls_enabled: dsn.tls_enabled(),
444            skip_verify: dsn.skip_verify(),
445            page_size: dsn.page_size(),
446            hello_name: dsn.client_name().to_string(),
447            hello_ver: dsn.client_version().to_string(),
448            conformance: dsn.conformance().to_string(),
449            username: dsn.username().map(String::from),
450            password: dsn.password().map(|p| SecretString::from(p.to_string())),
451            connect_timeout_secs: 10,
452            hello_timeout_secs: 5,
453            idle_timeout_secs: 30,
454        })
455    }
456
457    /// Get the transport type for this client.
458    pub fn transport(&self) -> Transport {
459        self.transport
460    }
461
462    /// Skip TLS certificate verification.
463    ///
464    /// # Security Warning
465    ///
466    /// **This should only be used in development environments.** Disabling
467    /// certificate verification makes the connection vulnerable to
468    /// man-in-the-middle attacks.
469    ///
470    /// # Arguments
471    ///
472    /// * `skip` - If true, skip certificate verification
473    pub fn skip_verify(mut self, skip: bool) -> Self {
474        self.skip_verify = skip;
475        self
476    }
477
478    /// Set the page size for query results.
479    ///
480    /// Controls how many rows are returned per page when fetching results.
481    /// Larger values reduce round-trips but use more memory.
482    ///
483    /// # Arguments
484    ///
485    /// * `size` - Number of rows per page (default: 1000)
486    pub fn page_size(mut self, size: usize) -> Self {
487        self.page_size = size;
488        self
489    }
490
491    /// Set the client name sent to the server.
492    ///
493    /// This appears in server logs and can help with debugging.
494    ///
495    /// # Arguments
496    ///
497    /// * `name` - Client application name (default: "geode-rust-quinn")
498    pub fn client_name(mut self, name: impl Into<String>) -> Self {
499        self.hello_name = name.into();
500        self
501    }
502
503    /// Set the client version sent to the server.
504    ///
505    /// # Arguments
506    ///
507    /// * `version` - Client version string (default: "0.1.0")
508    pub fn client_version(mut self, version: impl Into<String>) -> Self {
509        self.hello_ver = version.into();
510        self
511    }
512
513    /// Set the GQL conformance level.
514    ///
515    /// # Arguments
516    ///
517    /// * `level` - Conformance level (default: "min")
518    pub fn conformance(mut self, level: impl Into<String>) -> Self {
519        self.conformance = level.into();
520        self
521    }
522
523    /// Set the authentication username.
524    ///
525    /// # Arguments
526    ///
527    /// * `username` - The username for authentication
528    ///
529    /// # Example
530    ///
531    /// ```
532    /// use geode_client::Client;
533    ///
534    /// let client = Client::new("localhost", 3141)
535    ///     .username("admin")
536    ///     .password("secret");
537    /// ```
538    pub fn username(mut self, username: impl Into<String>) -> Self {
539        self.username = Some(username.into());
540        self
541    }
542
543    /// Set the authentication password.
544    ///
545    /// The password is stored using `SecretString` which ensures it is:
546    /// - Zeroized from memory when dropped
547    /// - Not accidentally leaked in debug output
548    ///
549    /// # Arguments
550    ///
551    /// * `password` - The password for authentication
552    pub fn password(mut self, password: impl Into<String>) -> Self {
553        self.password = Some(SecretString::from(password.into()));
554        self
555    }
556
557    /// Set the connection timeout in seconds.
558    ///
559    /// This controls how long to wait for the initial QUIC connection
560    /// to be established. Default is 10 seconds.
561    ///
562    /// # Arguments
563    ///
564    /// * `seconds` - Timeout in seconds (must be > 0)
565    pub fn connect_timeout(mut self, seconds: u64) -> Self {
566        self.connect_timeout_secs = seconds.max(1);
567        self
568    }
569
570    /// Set the HELLO handshake timeout in seconds.
571    ///
572    /// This controls how long to wait for the server to respond to the
573    /// initial HELLO message. Default is 5 seconds.
574    ///
575    /// # Arguments
576    ///
577    /// * `seconds` - Timeout in seconds (must be > 0)
578    pub fn hello_timeout(mut self, seconds: u64) -> Self {
579        self.hello_timeout_secs = seconds.max(1);
580        self
581    }
582
583    /// Set the idle connection timeout in seconds.
584    ///
585    /// This controls how long an idle connection can remain open before
586    /// being automatically closed by the QUIC layer. Default is 30 seconds.
587    ///
588    /// # Arguments
589    ///
590    /// * `seconds` - Timeout in seconds (must be > 0)
591    pub fn idle_timeout(mut self, seconds: u64) -> Self {
592        self.idle_timeout_secs = seconds.max(1);
593        self
594    }
595
596    /// Validate the client configuration.
597    ///
598    /// Performs validation on all configuration parameters including:
599    /// - Hostname format (RFC 1035 compliant)
600    /// - Port number (1-65535)
601    /// - Page size (1-100,000)
602    ///
603    /// This method is automatically called by [`connect`](Self::connect).
604    /// You can call it manually to validate configuration before attempting
605    /// to connect.
606    ///
607    /// # Errors
608    ///
609    /// Returns a validation error if any parameter is invalid.
610    ///
611    /// # Example
612    ///
613    /// ```
614    /// use geode_client::Client;
615    ///
616    /// let client = Client::new("localhost", 3141);
617    /// assert!(client.validate().is_ok());
618    ///
619    /// // Invalid hostname
620    /// let invalid = Client::new("-invalid-host", 3141);
621    /// assert!(invalid.validate().is_err());
622    /// ```
623    pub fn validate(&self) -> Result<()> {
624        // Validate hostname format
625        validate::hostname(&self.host)?;
626
627        // Validate port (0 is reserved)
628        validate::port(self.port)?;
629
630        // Validate page size
631        validate::page_size(self.page_size)?;
632
633        Ok(())
634    }
635
636    /// Connect to the Geode database.
637    ///
638    /// Establishes a QUIC connection to the server, performs the TLS handshake,
639    /// and sends the initial HELLO message.
640    ///
641    /// # Returns
642    ///
643    /// A [`Connection`] that can be used to execute queries.
644    ///
645    /// # Errors
646    ///
647    /// Returns an error if:
648    /// - The hostname cannot be resolved
649    /// - The connection cannot be established
650    /// - TLS verification fails (unless `skip_verify` is true)
651    /// - The HELLO handshake fails
652    ///
653    /// # Example
654    ///
655    /// ```no_run
656    /// # use geode_client::Client;
657    /// # async fn example() -> geode_client::Result<()> {
658    /// let client = Client::new("localhost", 3141).skip_verify(true);
659    /// let mut conn = client.connect().await?;
660    /// // Use connection...
661    /// conn.close()?;
662    /// # Ok(())
663    /// # }
664    /// ```
665    // CANARY: REQ=REQ-CLIENT-RUST-001; FEATURE="RustClientConnection"; ASPECT=HelloHandshake; STATUS=IMPL; OWNER=clients; UPDATED=2025-02-14
666    pub async fn connect(&self) -> Result<Connection> {
667        // Validate configuration before connecting (Gap #9 - automatic validation)
668        self.validate()?;
669
670        // Expose the secret password only when needed for the connection
671        let password_ref = self.password.as_ref().map(|s| s.expose_secret());
672
673        match self.transport {
674            Transport::Quic => {
675                Connection::new_quic(
676                    &self.host,
677                    self.port,
678                    self.skip_verify,
679                    self.page_size,
680                    &self.hello_name,
681                    &self.hello_ver,
682                    &self.conformance,
683                    self.username.as_deref(),
684                    password_ref,
685                    self.connect_timeout_secs,
686                    self.hello_timeout_secs,
687                    self.idle_timeout_secs,
688                )
689                .await
690            }
691            Transport::Grpc => {
692                #[cfg(feature = "grpc")]
693                {
694                    Connection::new_grpc(
695                        &self.host,
696                        self.port,
697                        self.tls_enabled,
698                        self.skip_verify,
699                        self.page_size,
700                        self.username.as_deref(),
701                        password_ref,
702                    )
703                    .await
704                }
705                #[cfg(not(feature = "grpc"))]
706                {
707                    Err(Error::connection(
708                        "gRPC transport requires the 'grpc' feature to be enabled",
709                    ))
710                }
711            }
712        }
713    }
714}
715
716/// Internal connection type for transport-specific implementations.
717#[allow(dead_code)]
718enum ConnectionKind {
719    /// QUIC transport connection
720    Quic {
721        conn: quinn::Connection,
722        send: quinn::SendStream,
723        recv: quinn::RecvStream,
724        /// Reserved for future streaming support
725        buffer: Vec<u8>,
726        /// Reserved for request ID tracking
727        next_request_id: u64,
728        /// Session ID from HELLO handshake
729        session_id: String,
730    },
731    /// gRPC transport connection
732    #[cfg(feature = "grpc")]
733    Grpc { client: crate::grpc::GrpcClient },
734}
735
736/// An active connection to a Geode database server.
737///
738/// A `Connection` represents a connection to the Geode server using either
739/// QUIC or gRPC transport. It provides methods for executing queries, managing
740/// transactions, and controlling the connection lifecycle.
741///
742/// # Transport Support
743///
744/// - **QUIC**: Uses a bidirectional stream with protobuf wire protocol
745/// - **gRPC**: Uses tonic-based gRPC client (requires `grpc` feature)
746///
747/// # Connection Lifecycle
748///
749/// 1. Create via [`Client::connect`]
750/// 2. Execute queries with [`query`](Connection::query) or [`query_with_params`](Connection::query_with_params)
751/// 3. Optionally use transactions with [`begin`](Connection::begin), [`commit`](Connection::commit), [`rollback`](Connection::rollback)
752/// 4. Close with [`close`](Connection::close)
753///
754/// # Example
755///
756/// ```no_run
757/// # use geode_client::Client;
758/// # async fn example() -> geode_client::Result<()> {
759/// let client = Client::new("localhost", 3141).skip_verify(true);
760/// let mut conn = client.connect().await?;
761///
762/// // Execute queries
763/// let (page, _) = conn.query("RETURN 42 AS answer").await?;
764/// println!("Answer: {}", page.rows[0].get("answer").unwrap().as_int()?);
765///
766/// // Use transactions
767/// conn.begin().await?;
768/// conn.query("CREATE (n:Node {id: 1})").await?;
769/// conn.commit().await?;
770///
771/// conn.close()?;
772/// # Ok(())
773/// # }
774/// ```
775///
776/// # Thread Safety
777///
778/// `Connection` is `!Sync` because the underlying transport streams are not thread-safe.
779/// For concurrent access, use [`ConnectionPool`](crate::ConnectionPool).
780pub struct Connection {
781    kind: ConnectionKind,
782    /// Page size for query results (reserved for future use)
783    #[allow(dead_code)]
784    page_size: usize,
785}
786
787impl Connection {
788    /// Create a new QUIC connection.
789    #[allow(clippy::too_many_arguments)]
790    async fn new_quic(
791        host: &str,
792        port: u16,
793        skip_verify: bool,
794        page_size: usize,
795        hello_name: &str,
796        hello_ver: &str,
797        conformance: &str,
798        username: Option<&str>,
799        password: Option<&str>,
800        connect_timeout_secs: u64,
801        hello_timeout_secs: u64,
802        idle_timeout_secs: u64,
803    ) -> Result<Self> {
804        let mut last_err: Option<Error> = None;
805
806        for attempt in 1..=3 {
807            match Self::connect_quic_once(
808                host,
809                port,
810                skip_verify,
811                page_size,
812                hello_name,
813                hello_ver,
814                conformance,
815                username,
816                password,
817                connect_timeout_secs,
818                hello_timeout_secs,
819                idle_timeout_secs,
820            )
821            .await
822            {
823                Ok(conn) => return Ok(conn),
824                Err(e) => {
825                    last_err = Some(e);
826                    if attempt < 3 {
827                        debug!("Connection attempt {} failed, retrying...", attempt);
828                        tokio::time::sleep(Duration::from_millis(150)).await;
829                    }
830                }
831            }
832        }
833
834        Err(last_err.unwrap_or_else(|| Error::connection("Failed to connect")))
835    }
836
837    /// Create a new gRPC connection.
838    #[cfg(feature = "grpc")]
839    #[allow(clippy::too_many_arguments)]
840    async fn new_grpc(
841        host: &str,
842        port: u16,
843        tls_enabled: bool,
844        skip_verify: bool,
845        page_size: usize,
846        username: Option<&str>,
847        password: Option<&str>,
848    ) -> Result<Self> {
849        use crate::dsn::Dsn;
850
851        // Build DSN for gRPC client
852        let tls_val = if tls_enabled { "1" } else { "0" };
853        let dsn_str = if let (Some(user), Some(pass)) = (username, password) {
854            format!(
855                "grpc://{}:{}@{}:{}?tls={}&insecure={}",
856                user, pass, host, port, tls_val, skip_verify
857            )
858        } else {
859            format!(
860                "grpc://{}:{}?tls={}&insecure={}",
861                host, port, tls_val, skip_verify
862            )
863        };
864
865        let dsn = Dsn::parse(&dsn_str)?;
866        let client = crate::grpc::GrpcClient::connect(&dsn).await?;
867
868        Ok(Self {
869            kind: ConnectionKind::Grpc { client },
870            page_size,
871        })
872    }
873
874    #[allow(clippy::too_many_arguments)]
875    async fn connect_quic_once(
876        host: &str,
877        port: u16,
878        skip_verify: bool,
879        page_size: usize,
880        _hello_name: &str,
881        _hello_ver: &str,
882        _conformance: &str,
883        username: Option<&str>,
884        password: Option<&str>,
885        connect_timeout_secs: u64,
886        _hello_timeout_secs: u64,
887        idle_timeout_secs: u64,
888    ) -> Result<Self> {
889        debug!("Creating connection to {}:{}", host, port);
890
891        // Install default crypto provider for rustls
892        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
893
894        // Build Quinn client config with TLS 1.3 explicitly (QUIC requires TLS 1.3)
895        let mut client_crypto = if skip_verify {
896            // SECURITY WARNING: Disabling TLS verification exposes connections to MITM attacks.
897            // Credentials sent in HELLO may be intercepted. Only use for development/testing.
898            warn!(
899                "TLS certificate verification DISABLED - connection to {}:{} is vulnerable to MITM attacks. \
900                 Do NOT use skip_verify in production!",
901                host, port
902            );
903            rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
904                .dangerous()
905                .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
906                .with_no_client_auth()
907        } else {
908            // Load system root certificates for proper TLS verification
909            let mut root_store = rustls::RootCertStore::empty();
910
911            let cert_result = rustls_native_certs::load_native_certs();
912
913            // Log any errors that occurred during certificate loading
914            for err in &cert_result.errors {
915                warn!("Error loading native certificate: {:?}", err);
916            }
917
918            let mut certs_loaded = 0;
919            let mut certs_failed = 0;
920
921            for cert in cert_result.certs {
922                match root_store.add(cert) {
923                    Ok(()) => certs_loaded += 1,
924                    Err(_) => certs_failed += 1,
925                }
926            }
927
928            if certs_loaded == 0 {
929                return Err(Error::tls(
930                    "No system root certificates found. TLS verification cannot proceed. \
931                     Either install system CA certificates or use skip_verify(true) for development only.",
932                ));
933            }
934
935            debug!(
936                "Loaded {} system root certificates ({} failed to parse)",
937                certs_loaded, certs_failed
938            );
939
940            rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
941                .with_root_certificates(root_store)
942                .with_no_client_auth()
943        };
944
945        // Set ALPN protocols
946        client_crypto.alpn_protocols = vec![GEODE_ALPN.to_vec()];
947
948        let mut client_config = ClientConfig::new(Arc::new(
949            quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto)
950                .map_err(|e| Error::connection(format!("Failed to create QUIC config: {}", e)))?,
951        ));
952
953        // Configure QUIC transport parameters to match Python/Go clients
954        let mut transport = quinn::TransportConfig::default();
955        // Cap idle timeout to quinn's maximum (2^62 - 1 microseconds ≈ 146 years)
956        // to prevent panic from VarInt overflow
957        let idle_timeout = Duration::from_secs(idle_timeout_secs.min(146_000 * 365 * 24 * 3600));
958        transport.max_idle_timeout(Some(idle_timeout.try_into().map_err(|_| {
959            Error::connection("Idle timeout value too large for QUIC protocol")
960        })?));
961        transport.keep_alive_interval(Some(Duration::from_secs(5)));
962        client_config.transport_config(Arc::new(transport));
963
964        // Create endpoint - "0.0.0.0:0" is a valid socket address literal that binds
965        // to any available port on all interfaces, so this parse cannot fail
966        let mut endpoint = Endpoint::client(
967            "0.0.0.0:0"
968                .parse()
969                .expect("0.0.0.0:0 is a valid socket address"),
970        )
971        .map_err(|e| Error::connection(format!("Failed to create endpoint: {}", e)))?;
972        endpoint.set_default_client_config(client_config);
973
974        // Resolve server address (supports hostnames as well as IP literals)
975        let mut resolved_addrs = format!("{}:{}", host, port)
976            .to_socket_addrs()
977            .map_err(|e| {
978                Error::connection(format!(
979                    "Failed to resolve address {}:{} - {}",
980                    host, port, e
981                ))
982            })?;
983
984        let server_addr: SocketAddr = resolved_addrs
985            .find(|addr| matches!(addr, SocketAddr::V4(_) | SocketAddr::V6(_)))
986            .ok_or_else(|| Error::connection("Invalid address: could not resolve host"))?;
987
988        debug!("Connecting to {}", server_addr);
989
990        // When skipping verification, don't use actual hostname for SNI
991        // This matches Python client behavior which avoids server_name when skip_verify=True
992        let server_name = if skip_verify {
993            "localhost" // Use generic name when skipping verification
994        } else {
995            host
996        };
997
998        trace!("Using server name for SNI: {}", server_name);
999
1000        let conn = timeout(
1001            Duration::from_secs(connect_timeout_secs),
1002            endpoint
1003                .connect(server_addr, server_name)
1004                .map_err(|e| Error::connection(format!("Connection failed: {}", e)))?,
1005        )
1006        .await
1007        .map_err(|_| Error::connection("Connection timeout"))?
1008        .map_err(|e| Error::connection(format!("Failed to establish connection: {}", e)))?;
1009
1010        debug!("Connection established to {}:{}", host, port);
1011
1012        // Open a single bidirectional stream used for the entire session.
1013        let (mut send, mut recv) = conn
1014            .open_bi()
1015            .await
1016            .map_err(|e| Error::connection(format!("Failed to open stream: {}", e)))?;
1017
1018        // Send HELLO message using protobuf with length prefix
1019        let hello_req = proto::HelloRequest {
1020            username: username.unwrap_or("").to_string(),
1021            password: password.unwrap_or("").to_string(),
1022            tenant_id: None,
1023            client_name: String::new(),
1024            client_version: String::new(),
1025            wanted_conformance: String::new(),
1026        };
1027        let msg = proto::QuicClientMessage {
1028            msg: Some(proto::quic_client_message::Msg::Hello(hello_req)),
1029        };
1030        let data = proto::encode_with_length_prefix(&msg);
1031
1032        send.write_all(&data)
1033            .await
1034            .map_err(|e| Error::connection(format!("Failed to send HELLO: {}", e)))?;
1035
1036        // Wait for HELLO response (length-prefixed protobuf)
1037        let mut length_buf = [0u8; 4];
1038        timeout(Duration::from_secs(5), recv.read_exact(&mut length_buf))
1039            .await
1040            .map_err(|_| Error::connection("HELLO response timeout"))?
1041            .map_err(|e| {
1042                Error::connection(format!("Failed to read HELLO response length: {}", e))
1043            })?;
1044
1045        let msg_len = u32::from_be_bytes(length_buf) as usize;
1046        let mut msg_buf = vec![0u8; msg_len];
1047        recv.read_exact(&mut msg_buf)
1048            .await
1049            .map_err(|e| Error::connection(format!("Failed to read HELLO response body: {}", e)))?;
1050
1051        let hello_response = proto::decode_quic_server_message(&msg_buf)?;
1052
1053        let session_id = match hello_response.msg {
1054            Some(proto::quic_server_message::Msg::Hello(ref hello_resp)) => {
1055                if !hello_resp.success {
1056                    return Err(Error::connection(format!(
1057                        "Authentication failed: {}",
1058                        hello_resp.error_message
1059                    )));
1060                }
1061                hello_resp.session_id.clone()
1062            }
1063            _ => {
1064                return Err(Error::connection("Expected HELLO response"));
1065            }
1066        };
1067
1068        debug!("HELLO handshake complete, session_id={}", session_id);
1069
1070        Ok(Self {
1071            kind: ConnectionKind::Quic {
1072                conn,
1073                send,
1074                recv,
1075                buffer: Vec::new(),
1076                next_request_id: 1,
1077                session_id,
1078            },
1079            page_size,
1080        })
1081    }
1082
1083    /// Send a protobuf message over QUIC.
1084    async fn send_proto_quic(
1085        send: &mut quinn::SendStream,
1086        msg: &proto::QuicClientMessage,
1087    ) -> Result<()> {
1088        let data = proto::encode_with_length_prefix(msg);
1089        send.write_all(&data)
1090            .await
1091            .map_err(|e| Error::connection(format!("Failed to send message: {}", e)))?;
1092        Ok(())
1093    }
1094
1095    /// Read a protobuf message over QUIC with timeout.
1096    async fn read_proto_quic(
1097        recv: &mut quinn::RecvStream,
1098        timeout_secs: u64,
1099    ) -> Result<proto::QuicServerMessage> {
1100        // Read 4-byte length prefix
1101        let mut length_buf = [0u8; 4];
1102        timeout(
1103            Duration::from_secs(timeout_secs),
1104            recv.read_exact(&mut length_buf),
1105        )
1106        .await
1107        .map_err(|_| Error::timeout())?
1108        .map_err(|e| Error::connection(format!("Failed to read response length: {}", e)))?;
1109
1110        let msg_len = u32::from_be_bytes(length_buf) as usize;
1111        let mut msg_buf = vec![0u8; msg_len];
1112        recv.read_exact(&mut msg_buf)
1113            .await
1114            .map_err(|e| Error::connection(format!("Failed to read response body: {}", e)))?;
1115
1116        proto::decode_quic_server_message(&msg_buf)
1117    }
1118
1119    /// Attempt to read a buffered protobuf message without blocking (QUIC).
1120    /// Returns Ok(None) if no complete message is available immediately.
1121    async fn try_read_proto_quic(
1122        recv: &mut quinn::RecvStream,
1123    ) -> Result<Option<proto::QuicServerMessage>> {
1124        // Try to read with a short timeout
1125        let mut length_buf = [0u8; 4];
1126        let read_result =
1127            timeout(Duration::from_millis(10), recv.read_exact(&mut length_buf)).await;
1128
1129        match read_result {
1130            Ok(Ok(())) => {
1131                let msg_len = u32::from_be_bytes(length_buf) as usize;
1132                let mut msg_buf = vec![0u8; msg_len];
1133                recv.read_exact(&mut msg_buf).await.map_err(|e| {
1134                    Error::connection(format!("Failed to read response body: {}", e))
1135                })?;
1136                let msg = proto::decode_quic_server_message(&msg_buf)?;
1137                Ok(Some(msg))
1138            }
1139            Ok(Err(e)) => Err(Error::connection(format!("Failed to read response: {}", e))),
1140            Err(_) => Ok(None), // Timeout - no data available
1141        }
1142    }
1143
1144    /// Parse protobuf rows into Value maps (static version for QUIC).
1145    fn parse_proto_rows_static(
1146        proto_rows: &[proto::Row],
1147        columns: &[Column],
1148    ) -> Result<Vec<HashMap<String, Value>>> {
1149        let mut rows = Vec::new();
1150        for proto_row in proto_rows {
1151            let mut row = HashMap::new();
1152            for (i, col) in columns.iter().enumerate() {
1153                let value = if i < proto_row.values.len() {
1154                    Self::convert_proto_value_static(&proto_row.values[i])
1155                } else {
1156                    Value::null()
1157                };
1158                row.insert(col.name.clone(), value);
1159            }
1160            rows.push(row);
1161        }
1162        Ok(rows)
1163    }
1164
1165    /// Convert a protobuf Value to our Value type (static version).
1166    fn convert_proto_value_static(proto_val: &proto::Value) -> Value {
1167        match &proto_val.kind {
1168            Some(proto::value::Kind::NullVal(_)) => Value::null(),
1169            Some(proto::value::Kind::StringVal(s)) => Value::string(s.value.clone()),
1170            Some(proto::value::Kind::IntVal(i)) => Value::int(i.value),
1171            Some(proto::value::Kind::DoubleVal(d)) => {
1172                Value::decimal(rust_decimal::Decimal::from_f64_retain(d.value).unwrap_or_default())
1173            }
1174            Some(proto::value::Kind::BoolVal(b)) => Value::bool(*b),
1175            Some(proto::value::Kind::ListVal(list)) => {
1176                let values: Vec<Value> = list
1177                    .values
1178                    .iter()
1179                    .map(Self::convert_proto_value_static)
1180                    .collect();
1181                Value::array(values)
1182            }
1183            Some(proto::value::Kind::MapVal(map)) => {
1184                let mut obj = std::collections::HashMap::new();
1185                for entry in &map.entries {
1186                    let val = entry
1187                        .value
1188                        .as_ref()
1189                        .map(Self::convert_proto_value_static)
1190                        .unwrap_or_else(Value::null);
1191                    obj.insert(entry.key.clone(), val);
1192                }
1193                Value::object(obj)
1194            }
1195            Some(proto::value::Kind::NodeVal(node)) => {
1196                let mut obj = std::collections::HashMap::new();
1197                obj.insert("id".to_string(), Value::int(node.id as i64));
1198                let labels: Vec<Value> = node
1199                    .labels
1200                    .iter()
1201                    .map(|l| Value::string(l.clone()))
1202                    .collect();
1203                obj.insert("labels".to_string(), Value::array(labels));
1204                let mut props = std::collections::HashMap::new();
1205                for entry in &node.properties {
1206                    let val = entry
1207                        .value
1208                        .as_ref()
1209                        .map(Self::convert_proto_value_static)
1210                        .unwrap_or_else(Value::null);
1211                    props.insert(entry.key.clone(), val);
1212                }
1213                obj.insert("properties".to_string(), Value::object(props));
1214                Value::object(obj)
1215            }
1216            Some(proto::value::Kind::EdgeVal(edge)) => {
1217                let mut obj = std::collections::HashMap::new();
1218                obj.insert("id".to_string(), Value::int(edge.id as i64));
1219                obj.insert("from_id".to_string(), Value::int(edge.from_id as i64));
1220                obj.insert("to_id".to_string(), Value::int(edge.to_id as i64));
1221                obj.insert("label".to_string(), Value::string(edge.label.clone()));
1222                let mut props = std::collections::HashMap::new();
1223                for entry in &edge.properties {
1224                    let val = entry
1225                        .value
1226                        .as_ref()
1227                        .map(Self::convert_proto_value_static)
1228                        .unwrap_or_else(Value::null);
1229                    props.insert(entry.key.clone(), val);
1230                }
1231                obj.insert("properties".to_string(), Value::object(props));
1232                Value::object(obj)
1233            }
1234            Some(proto::value::Kind::DecimalVal(d)) => {
1235                // Try to parse the decimal from coeff + scale
1236                if let Ok(dec) = d.coeff.parse::<rust_decimal::Decimal>() {
1237                    Value::decimal(dec)
1238                } else {
1239                    Value::string(d.orig_repr.clone())
1240                }
1241            }
1242            Some(proto::value::Kind::BytesVal(b)) => {
1243                Value::string(format!("\\x{}", hex::encode(&b.value)))
1244            }
1245            _ => Value::null(),
1246        }
1247    }
1248
1249    /// Send BEGIN transaction command over QUIC.
1250    async fn send_begin_quic(
1251        send: &mut quinn::SendStream,
1252        recv: &mut quinn::RecvStream,
1253        session_id: &str,
1254    ) -> Result<()> {
1255        let msg = proto::QuicClientMessage {
1256            msg: Some(proto::quic_client_message::Msg::Begin(
1257                proto::BeginRequest {
1258                    session_id: session_id.to_string(),
1259                    ..Default::default()
1260                },
1261            )),
1262        };
1263        Self::send_proto_quic(send, &msg).await?;
1264
1265        let resp = Self::read_proto_quic(recv, 5).await?;
1266        if !matches!(resp.msg, Some(proto::quic_server_message::Msg::Begin(_))) {
1267            return Err(Error::protocol("Expected BEGIN response"));
1268        }
1269        Ok(())
1270    }
1271
1272    /// Send COMMIT transaction command over QUIC.
1273    async fn send_commit_quic(
1274        send: &mut quinn::SendStream,
1275        recv: &mut quinn::RecvStream,
1276        session_id: &str,
1277    ) -> Result<()> {
1278        let msg = proto::QuicClientMessage {
1279            msg: Some(proto::quic_client_message::Msg::Commit(
1280                proto::CommitRequest {
1281                    session_id: session_id.to_string(),
1282                },
1283            )),
1284        };
1285        Self::send_proto_quic(send, &msg).await?;
1286
1287        let resp = Self::read_proto_quic(recv, 5).await?;
1288        if !matches!(resp.msg, Some(proto::quic_server_message::Msg::Commit(_))) {
1289            return Err(Error::protocol("Expected COMMIT response"));
1290        }
1291        Ok(())
1292    }
1293
1294    /// Send ROLLBACK transaction command over QUIC.
1295    async fn send_rollback_quic(
1296        send: &mut quinn::SendStream,
1297        recv: &mut quinn::RecvStream,
1298        session_id: &str,
1299    ) -> Result<()> {
1300        let msg = proto::QuicClientMessage {
1301            msg: Some(proto::quic_client_message::Msg::Rollback(
1302                proto::RollbackRequest {
1303                    session_id: session_id.to_string(),
1304                },
1305            )),
1306        };
1307        Self::send_proto_quic(send, &msg).await?;
1308
1309        let resp = Self::read_proto_quic(recv, 5).await?;
1310        if !matches!(resp.msg, Some(proto::quic_server_message::Msg::Rollback(_))) {
1311            return Err(Error::protocol("Expected ROLLBACK response"));
1312        }
1313        Ok(())
1314    }
1315
1316    /// Execute a GQL query without parameters.
1317    ///
1318    /// # Arguments
1319    ///
1320    /// * `gql` - The GQL query string
1321    ///
1322    /// # Returns
1323    ///
1324    /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1325    /// and the optional string contains any query warnings.
1326    ///
1327    /// # Errors
1328    ///
1329    /// Returns [`Error::Query`] if the query fails to execute.
1330    ///
1331    /// # Example
1332    ///
1333    /// ```no_run
1334    /// # use geode_client::Client;
1335    /// # async fn example() -> geode_client::Result<()> {
1336    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1337    /// # let mut conn = client.connect().await?;
1338    /// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name LIMIT 10").await?;
1339    /// for row in &page.rows {
1340    ///     println!("Name: {}", row.get("name").unwrap().as_string()?);
1341    /// }
1342    /// # Ok(())
1343    /// # }
1344    /// ```
1345    pub async fn query(&mut self, gql: &str) -> Result<(Page, Option<String>)> {
1346        self.query_with_params(gql, &HashMap::new()).await
1347    }
1348
1349    /// Execute a GQL query with parameters.
1350    ///
1351    /// Parameters are substituted for `$param_name` placeholders in the query.
1352    /// This is the recommended way to include dynamic values in queries, as it
1353    /// prevents injection attacks and allows query plan caching.
1354    ///
1355    /// # Arguments
1356    ///
1357    /// * `gql` - The GQL query string with parameter placeholders
1358    /// * `params` - A map of parameter names to values
1359    ///
1360    /// # Returns
1361    ///
1362    /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1363    /// and the optional string contains any query warnings.
1364    ///
1365    /// # Errors
1366    ///
1367    /// Returns [`Error::Query`] if the query fails to execute.
1368    ///
1369    /// # Example
1370    ///
1371    /// ```no_run
1372    /// # use geode_client::{Client, Value};
1373    /// # use std::collections::HashMap;
1374    /// # async fn example() -> geode_client::Result<()> {
1375    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1376    /// # let mut conn = client.connect().await?;
1377    /// let mut params = HashMap::new();
1378    /// params.insert("name".to_string(), Value::string("Alice"));
1379    /// params.insert("min_age".to_string(), Value::int(25));
1380    ///
1381    /// let (page, _) = conn.query_with_params(
1382    ///     "MATCH (p:Person {name: $name}) WHERE p.age >= $min_age RETURN p",
1383    ///     &params
1384    /// ).await?;
1385    /// # Ok(())
1386    /// # }
1387    /// ```
1388    pub async fn query_with_params(
1389        &mut self,
1390        gql: &str,
1391        params: &HashMap<String, Value>,
1392    ) -> Result<(Page, Option<String>)> {
1393        match &mut self.kind {
1394            ConnectionKind::Quic {
1395                send,
1396                recv,
1397                session_id,
1398                ..
1399            } => Self::query_with_params_quic(send, recv, gql, params, session_id).await,
1400            #[cfg(feature = "grpc")]
1401            ConnectionKind::Grpc { client } => client.query_with_params(gql, params).await,
1402        }
1403    }
1404
1405    /// Execute a GQL query with parameters over QUIC transport.
1406    async fn query_with_params_quic(
1407        send: &mut quinn::SendStream,
1408        recv: &mut quinn::RecvStream,
1409        gql: &str,
1410        params: &HashMap<String, Value>,
1411        session_id: &str,
1412    ) -> Result<(Page, Option<String>)> {
1413        // Convert types::Value to proto::Param entries
1414        let params_proto: Vec<proto::Param> = params
1415            .iter()
1416            .map(|(k, v)| proto::Param {
1417                name: k.clone(),
1418                value: Some(v.to_proto_value()),
1419            })
1420            .collect();
1421
1422        // Send Execute request via protobuf
1423        let exec_req = proto::ExecuteRequest {
1424            session_id: session_id.to_string(),
1425            query: gql.to_string(),
1426            params: params_proto,
1427        };
1428        let msg = proto::QuicClientMessage {
1429            msg: Some(proto::quic_client_message::Msg::Execute(exec_req)),
1430        };
1431        Self::send_proto_quic(send, &msg)
1432            .await
1433            .map_err(|e| Error::query(format!("{}", e)))?;
1434
1435        // Read first response (should be schema or error)
1436        let resp = Self::read_proto_quic(recv, 10).await?;
1437
1438        let exec_resp = match resp.msg {
1439            Some(proto::quic_server_message::Msg::Execute(e)) => e,
1440            _ => return Err(Error::protocol("Expected Execute response")),
1441        };
1442
1443        // Check for error in payload
1444        if let Some(proto::execution_response::Payload::Error(ref err)) = exec_resp.payload {
1445            // Drain any follow-up messages (e.g., data page) the server may send after error
1446            let _ = Self::try_read_proto_quic(recv).await;
1447            return Err(Error::Query {
1448                code: err.code.clone(),
1449                message: err.message.clone(),
1450            });
1451        }
1452
1453        // Parse columns from schema payload
1454        let columns: Vec<Column> = match exec_resp.payload {
1455            Some(proto::execution_response::Payload::Schema(ref s)) => s
1456                .columns
1457                .iter()
1458                .map(|c| Column {
1459                    name: c.name.clone(),
1460                    col_type: c.r#type.clone(),
1461                })
1462                .collect(),
1463            _ => Vec::new(),
1464        };
1465
1466        trace!("Schema columns: {:?}", columns);
1467
1468        // Check for inline data page
1469        if let Some(inline_resp) = Self::try_read_proto_quic(recv).await? {
1470            if let Some(proto::quic_server_message::Msg::Execute(inline_exec)) = inline_resp.msg {
1471                if let Some(proto::execution_response::Payload::Error(ref err)) =
1472                    inline_exec.payload
1473                {
1474                    return Err(Error::Query {
1475                        code: err.code.clone(),
1476                        message: err.message.clone(),
1477                    });
1478                }
1479
1480                if let Some(proto::execution_response::Payload::Page(ref page_data)) =
1481                    inline_exec.payload
1482                {
1483                    let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1484                    let page = Page {
1485                        columns,
1486                        rows,
1487                        ordered: page_data.ordered,
1488                        order_keys: page_data.order_keys.clone(),
1489                        final_page: page_data.r#final,
1490                    };
1491                    return Ok((page, None));
1492                }
1493
1494                // Metrics or heartbeat - empty result
1495                let page = Page {
1496                    columns,
1497                    rows: Vec::new(),
1498                    ordered: false,
1499                    order_keys: Vec::new(),
1500                    final_page: true,
1501                };
1502                return Ok((page, None));
1503            }
1504        }
1505
1506        // Check if we got a page in the first response
1507        if let Some(proto::execution_response::Payload::Page(ref page_data)) = exec_resp.payload {
1508            let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1509            let page = Page {
1510                columns,
1511                rows,
1512                ordered: page_data.ordered,
1513                order_keys: page_data.order_keys.clone(),
1514                final_page: page_data.r#final,
1515            };
1516            return Ok((page, None));
1517        }
1518
1519        // Read next response for data page
1520        let resp = Self::read_proto_quic(recv, 30).await?;
1521        if let Some(proto::quic_server_message::Msg::Execute(exec_resp)) = resp.msg {
1522            if let Some(proto::execution_response::Payload::Error(ref err)) = exec_resp.payload {
1523                return Err(Error::Query {
1524                    code: err.code.clone(),
1525                    message: err.message.clone(),
1526                });
1527            }
1528
1529            if let Some(proto::execution_response::Payload::Page(ref page_data)) = exec_resp.payload
1530            {
1531                let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1532                let page = Page {
1533                    columns,
1534                    rows,
1535                    ordered: page_data.ordered,
1536                    order_keys: page_data.order_keys.clone(),
1537                    final_page: page_data.r#final,
1538                };
1539                return Ok((page, None));
1540            }
1541        }
1542
1543        // Empty result
1544        let page = Page {
1545            columns,
1546            rows: Vec::new(),
1547            ordered: false,
1548            order_keys: Vec::new(),
1549            final_page: true,
1550        };
1551
1552        Ok((page, None))
1553    }
1554
1555    /// Execute a query without parameters (synchronous-style blocking version for test runner)
1556    pub fn query_sync(
1557        &mut self,
1558        gql: &str,
1559        params: Option<HashMap<String, serde_json::Value>>,
1560    ) -> Result<Page> {
1561        let params_map = params.unwrap_or_default();
1562        let params_typed: HashMap<String, Value> = params_map
1563            .into_iter()
1564            .map(|(k, v)| {
1565                let typed_val = crate::types::Value::from_json(v);
1566                (k, typed_val)
1567            })
1568            .collect();
1569
1570        match tokio::runtime::Handle::try_current() {
1571            Ok(handle) => {
1572                let (page, _cursor) =
1573                    handle.block_on(self.query_with_params(gql, &params_typed))?;
1574                Ok(page)
1575            }
1576            Err(_) => {
1577                let rt = tokio::runtime::Runtime::new()
1578                    .map_err(|e| Error::query(format!("Failed to create runtime: {}", e)))?;
1579                let (page, _cursor) = rt.block_on(self.query_with_params(gql, &params_typed))?;
1580                Ok(page)
1581            }
1582        }
1583    }
1584
1585    /// Begin a new transaction.
1586    ///
1587    /// After calling `begin`, all queries will be part of the transaction until
1588    /// [`commit`](Connection::commit) or [`rollback`](Connection::rollback) is called.
1589    ///
1590    /// # Errors
1591    ///
1592    /// Returns an error if a transaction is already in progress or if the
1593    /// server rejects the request.
1594    ///
1595    /// # Example
1596    ///
1597    /// ```no_run
1598    /// # use geode_client::Client;
1599    /// # async fn example() -> geode_client::Result<()> {
1600    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1601    /// # let mut conn = client.connect().await?;
1602    /// conn.begin().await?;
1603    /// conn.query("CREATE (n:Node {id: 1})").await?;
1604    /// conn.query("CREATE (n:Node {id: 2})").await?;
1605    /// conn.commit().await?;  // Both nodes are now persisted
1606    /// # Ok(())
1607    /// # }
1608    /// ```
1609    pub async fn begin(&mut self) -> Result<()> {
1610        match &mut self.kind {
1611            ConnectionKind::Quic {
1612                send,
1613                recv,
1614                session_id,
1615                ..
1616            } => Self::send_begin_quic(send, recv, session_id).await,
1617            #[cfg(feature = "grpc")]
1618            ConnectionKind::Grpc { client } => client.begin().await,
1619        }
1620    }
1621
1622    /// Commit the current transaction.
1623    ///
1624    /// Persists all changes made since [`begin`](Connection::begin) was called.
1625    ///
1626    /// # Errors
1627    ///
1628    /// Returns an error if no transaction is in progress or if the server
1629    /// rejects the commit.
1630    ///
1631    /// # Example
1632    ///
1633    /// ```no_run
1634    /// # use geode_client::Client;
1635    /// # async fn example() -> geode_client::Result<()> {
1636    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1637    /// # let mut conn = client.connect().await?;
1638    /// conn.begin().await?;
1639    /// conn.query("CREATE (n:Node)").await?;
1640    /// conn.commit().await?;  // Changes are now permanent
1641    /// # Ok(())
1642    /// # }
1643    /// ```
1644    pub async fn commit(&mut self) -> Result<()> {
1645        match &mut self.kind {
1646            ConnectionKind::Quic {
1647                send,
1648                recv,
1649                session_id,
1650                ..
1651            } => Self::send_commit_quic(send, recv, session_id).await,
1652            #[cfg(feature = "grpc")]
1653            ConnectionKind::Grpc { client } => client.commit().await,
1654        }
1655    }
1656
1657    /// Rollback the current transaction.
1658    ///
1659    /// Discards all changes made since [`begin`](Connection::begin) was called.
1660    ///
1661    /// # Errors
1662    ///
1663    /// Returns an error if no transaction is in progress.
1664    ///
1665    /// # Example
1666    ///
1667    /// ```no_run
1668    /// # use geode_client::Client;
1669    /// # async fn example() -> geode_client::Result<()> {
1670    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1671    /// # let mut conn = client.connect().await?;
1672    /// conn.begin().await?;
1673    /// match conn.query("CREATE (n:InvalidNode)").await {
1674    ///     Ok(_) => conn.commit().await?,
1675    ///     Err(_) => conn.rollback().await?,  // Undo everything
1676    /// }
1677    /// # Ok(())
1678    /// # }
1679    /// ```
1680    pub async fn rollback(&mut self) -> Result<()> {
1681        match &mut self.kind {
1682            ConnectionKind::Quic {
1683                send,
1684                recv,
1685                session_id,
1686                ..
1687            } => Self::send_rollback_quic(send, recv, session_id).await,
1688            #[cfg(feature = "grpc")]
1689            ConnectionKind::Grpc { client } => client.rollback().await,
1690        }
1691    }
1692
1693    /// Create a prepared statement for efficient repeated execution.
1694    ///
1695    /// Prepared statements allow you to define a query once and execute it
1696    /// multiple times with different parameters. The query text is parsed
1697    /// to extract parameter names (tokens starting with `$`).
1698    ///
1699    /// # Arguments
1700    ///
1701    /// * `query` - The GQL query string with parameter placeholders
1702    ///
1703    /// # Returns
1704    ///
1705    /// A [`PreparedStatement`] that can be executed multiple times.
1706    ///
1707    /// # Example
1708    ///
1709    /// ```no_run
1710    /// # use geode_client::{Client, Value};
1711    /// # use std::collections::HashMap;
1712    /// # async fn example() -> geode_client::Result<()> {
1713    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1714    /// # let mut conn = client.connect().await?;
1715    /// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p.name")?;
1716    ///
1717    /// for id in 1..=100 {
1718    ///     let mut params = HashMap::new();
1719    ///     params.insert("id".to_string(), Value::int(id));
1720    ///     let (page, _) = stmt.execute(&mut conn, &params).await?;
1721    ///     // Process results...
1722    /// }
1723    /// # Ok(())
1724    /// # }
1725    /// ```
1726    pub fn prepare(&self, query: &str) -> Result<PreparedStatement> {
1727        Ok(PreparedStatement::new(query))
1728    }
1729
1730    /// Get the execution plan for a query without running it.
1731    ///
1732    /// This is useful for understanding how the database will execute a query
1733    /// and for identifying potential performance issues.
1734    ///
1735    /// # Arguments
1736    ///
1737    /// * `gql` - The GQL query string to explain
1738    ///
1739    /// # Returns
1740    ///
1741    /// A [`QueryPlan`] containing the execution plan details.
1742    ///
1743    /// # Errors
1744    ///
1745    /// Returns an error if the query is invalid or cannot be planned.
1746    ///
1747    /// # Example
1748    ///
1749    /// ```no_run
1750    /// # use geode_client::Client;
1751    /// # async fn example() -> geode_client::Result<()> {
1752    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1753    /// # let mut conn = client.connect().await?;
1754    /// let plan = conn.explain("MATCH (p:Person)-[:KNOWS]->(f) RETURN f").await?;
1755    /// println!("Estimated rows: {}", plan.estimated_rows);
1756    /// for op in &plan.operations {
1757    ///     println!("  {} - {}", op.op_type, op.description);
1758    /// }
1759    /// # Ok(())
1760    /// # }
1761    /// ```
1762    pub async fn explain(&mut self, gql: &str) -> Result<QueryPlan> {
1763        // Execute EXPLAIN as a query via protobuf
1764        let explain_query = format!("EXPLAIN {}", gql);
1765        let (_page, _) = self.query(&explain_query).await?;
1766
1767        // Parse the plan from the response
1768        // The result format depends on server implementation
1769        Ok(QueryPlan {
1770            operations: Vec::new(),
1771            estimated_rows: 0,
1772            raw: serde_json::json!({}),
1773        })
1774    }
1775
1776    /// Execute a query and return the execution profile with timing information.
1777    ///
1778    /// This runs the query and collects detailed execution statistics including
1779    /// actual row counts and timing for each operation.
1780    ///
1781    /// # Arguments
1782    ///
1783    /// * `gql` - The GQL query string to profile
1784    ///
1785    /// # Returns
1786    ///
1787    /// A [`QueryProfile`] containing the execution plan and runtime statistics.
1788    ///
1789    /// # Errors
1790    ///
1791    /// Returns an error if the query fails to execute.
1792    ///
1793    /// # Example
1794    ///
1795    /// ```no_run
1796    /// # use geode_client::Client;
1797    /// # async fn example() -> geode_client::Result<()> {
1798    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1799    /// # let mut conn = client.connect().await?;
1800    /// let profile = conn.profile("MATCH (p:Person) RETURN p LIMIT 100").await?;
1801    /// println!("Execution time: {:.2}ms", profile.execution_time_ms);
1802    /// println!("Actual rows: {}", profile.actual_rows);
1803    /// # Ok(())
1804    /// # }
1805    /// ```
1806    pub async fn profile(&mut self, gql: &str) -> Result<QueryProfile> {
1807        // Execute PROFILE as a query via protobuf
1808        let profile_query = format!("PROFILE {}", gql);
1809        let (page, _) = self.query(&profile_query).await?;
1810
1811        // Parse the profile from the response
1812        let plan = QueryPlan {
1813            operations: Vec::new(),
1814            estimated_rows: 0,
1815            raw: serde_json::json!({}),
1816        };
1817
1818        Ok(QueryProfile {
1819            plan,
1820            actual_rows: page.rows.len() as u64,
1821            execution_time_ms: 0.0,
1822            raw: serde_json::json!({}),
1823        })
1824    }
1825
1826    /// Execute multiple queries in a batch.
1827    ///
1828    /// This is more efficient than executing queries one at a time when you
1829    /// have multiple independent queries to run.
1830    ///
1831    /// # Arguments
1832    ///
1833    /// * `queries` - A slice of (query, optional params) tuples
1834    ///
1835    /// # Returns
1836    ///
1837    /// A `Vec<Page>` with results for each query, in the same order as input.
1838    ///
1839    /// # Errors
1840    ///
1841    /// Returns an error if any query fails. Queries are executed in order,
1842    /// so earlier queries may have completed before the error.
1843    ///
1844    /// # Example
1845    ///
1846    /// ```no_run
1847    /// # use geode_client::Client;
1848    /// # async fn example() -> geode_client::Result<()> {
1849    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1850    /// # let mut conn = client.connect().await?;
1851    /// let results = conn.batch(&[
1852    ///     ("MATCH (n:Person) RETURN count(n)", None),
1853    ///     ("MATCH (n:Company) RETURN count(n)", None),
1854    ///     ("MATCH ()-[r:WORKS_AT]->() RETURN count(r)", None),
1855    /// ]).await?;
1856    ///
1857    /// for (i, page) in results.iter().enumerate() {
1858    ///     println!("Query {}: {} rows", i + 1, page.rows.len());
1859    /// }
1860    /// # Ok(())
1861    /// # }
1862    /// ```
1863    pub async fn batch(
1864        &mut self,
1865        queries: &[(&str, Option<&HashMap<String, Value>>)],
1866    ) -> Result<Vec<Page>> {
1867        let mut results = Vec::with_capacity(queries.len());
1868
1869        for (query, params) in queries {
1870            let (page, _) = match params {
1871                Some(p) => self.query_with_params(query, p).await?,
1872                None => self.query(query).await?,
1873            };
1874            results.push(page);
1875        }
1876
1877        Ok(results)
1878    }
1879
1880    /// Parse plan operations from a server response.
1881    /// Reserved for future EXPLAIN/PROFILE response parsing.
1882    #[allow(dead_code)]
1883    fn parse_plan_operations(result: &serde_json::Value) -> Vec<PlanOperation> {
1884        let mut operations = Vec::new();
1885
1886        if let Some(ops) = result.get("operations").and_then(|o| o.as_array()) {
1887            for op in ops {
1888                operations.push(Self::parse_single_operation(op));
1889            }
1890        } else if let Some(plan) = result.get("plan") {
1891            // Alternative format: single "plan" object
1892            operations.push(Self::parse_single_operation(plan));
1893        }
1894
1895        operations
1896    }
1897
1898    /// Parse a single operation from JSON.
1899    #[allow(dead_code)]
1900    fn parse_single_operation(op: &serde_json::Value) -> PlanOperation {
1901        let op_type = op
1902            .get("type")
1903            .or_else(|| op.get("op_type"))
1904            .and_then(|t| t.as_str())
1905            .unwrap_or("Unknown")
1906            .to_string();
1907
1908        let description = op
1909            .get("description")
1910            .or_else(|| op.get("desc"))
1911            .and_then(|d| d.as_str())
1912            .unwrap_or("")
1913            .to_string();
1914
1915        let estimated_rows = op
1916            .get("estimated_rows")
1917            .or_else(|| op.get("rows"))
1918            .and_then(|r| r.as_u64());
1919
1920        let children = op
1921            .get("children")
1922            .and_then(|c| c.as_array())
1923            .map(|arr| arr.iter().map(Self::parse_single_operation).collect())
1924            .unwrap_or_default();
1925
1926        PlanOperation {
1927            op_type,
1928            description,
1929            estimated_rows,
1930            children,
1931        }
1932    }
1933
1934    /// Close the connection.
1935    ///
1936    /// Gracefully closes the connection. After calling this method,
1937    /// the connection can no longer be used.
1938    ///
1939    /// # Note
1940    ///
1941    /// It's good practice to explicitly close connections, but they will also
1942    /// be closed when dropped.
1943    ///
1944    /// # Example
1945    ///
1946    /// ```no_run
1947    /// # use geode_client::Client;
1948    /// # async fn example() -> geode_client::Result<()> {
1949    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1950    /// let mut conn = client.connect().await?;
1951    /// // ... use connection ...
1952    /// conn.close()?;
1953    /// # Ok(())
1954    /// # }
1955    /// ```
1956    pub fn close(&mut self) -> Result<()> {
1957        match &mut self.kind {
1958            ConnectionKind::Quic { conn, .. } => {
1959                // QUIC close is asynchronous and best-effort - the CONNECTION_CLOSE frame
1960                // will be sent by Quinn's internal I/O handling. No blocking delay needed.
1961                // (Gap #17: Removed std::thread::sleep that blocked the async runtime)
1962                conn.close(0u32.into(), b"client closing");
1963                Ok(())
1964            }
1965            #[cfg(feature = "grpc")]
1966            ConnectionKind::Grpc { client } => client.close(),
1967        }
1968    }
1969
1970    /// Check if the connection is still healthy.
1971    ///
1972    /// Returns `true` if the underlying QUIC connection is still open and usable.
1973    /// This is used by connection pools to verify connections before reuse.
1974    ///
1975    /// # Example
1976    ///
1977    /// ```ignore
1978    /// let mut conn = client.connect().await?;
1979    /// if conn.is_healthy() {
1980    ///     // Connection is still usable
1981    ///     let (page, _) = conn.query("RETURN 1").await?;
1982    /// }
1983    /// ```
1984    pub fn is_healthy(&self) -> bool {
1985        match &self.kind {
1986            ConnectionKind::Quic { conn, .. } => {
1987                // Quinn's close_reason() returns Some if the connection was closed
1988                conn.close_reason().is_none()
1989            }
1990            #[cfg(feature = "grpc")]
1991            ConnectionKind::Grpc { .. } => {
1992                // gRPC connections are managed by tonic, always report healthy
1993                true
1994            }
1995        }
1996    }
1997}
1998
1999/// Certificate verifier that skips all verification (INSECURE - for development only)
2000#[derive(Debug)]
2001struct SkipServerVerification;
2002
2003impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
2004    fn verify_server_cert(
2005        &self,
2006        _end_entity: &CertificateDer,
2007        _intermediates: &[CertificateDer],
2008        _server_name: &RustlsServerName,
2009        _ocsp_response: &[u8],
2010        _now: rustls::pki_types::UnixTime,
2011    ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
2012        Ok(rustls::client::danger::ServerCertVerified::assertion())
2013    }
2014
2015    fn verify_tls12_signature(
2016        &self,
2017        _message: &[u8],
2018        _cert: &CertificateDer,
2019        _dss: &rustls::DigitallySignedStruct,
2020    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
2021        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
2022    }
2023
2024    fn verify_tls13_signature(
2025        &self,
2026        _message: &[u8],
2027        _cert: &CertificateDer,
2028        _dss: &rustls::DigitallySignedStruct,
2029    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
2030        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
2031    }
2032
2033    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
2034        vec![
2035            rustls::SignatureScheme::RSA_PKCS1_SHA256,
2036            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
2037            rustls::SignatureScheme::ED25519,
2038        ]
2039    }
2040}
2041
2042#[cfg(test)]
2043mod tests {
2044    use super::*;
2045
2046    // PreparedStatement tests
2047
2048    #[test]
2049    fn test_prepared_statement_new() {
2050        let stmt = PreparedStatement::new("MATCH (n:Person {id: $id}) RETURN n");
2051        assert_eq!(stmt.query(), "MATCH (n:Person {id: $id}) RETURN n");
2052        assert_eq!(stmt.param_names(), &["id"]);
2053    }
2054
2055    #[test]
2056    fn test_prepared_statement_multiple_params() {
2057        let stmt = PreparedStatement::new(
2058            "MATCH (p:Person {name: $name}) WHERE p.age > $min_age AND p.city = $city RETURN p",
2059        );
2060        assert!(stmt.query().contains("$name"));
2061        let names = stmt.param_names();
2062        assert_eq!(names.len(), 3);
2063        assert!(names.contains(&"name".to_string()));
2064        assert!(names.contains(&"min_age".to_string()));
2065        assert!(names.contains(&"city".to_string()));
2066    }
2067
2068    #[test]
2069    fn test_prepared_statement_no_params() {
2070        let stmt = PreparedStatement::new("MATCH (n) RETURN n LIMIT 10");
2071        assert!(stmt.param_names().is_empty());
2072    }
2073
2074    #[test]
2075    fn test_prepared_statement_duplicate_params() {
2076        let stmt =
2077            PreparedStatement::new("MATCH (a {id: $id})-[:KNOWS]->(b {id: $id}) RETURN a, b");
2078        // Should deduplicate parameter names
2079        assert_eq!(stmt.param_names(), &["id"]);
2080    }
2081
2082    #[test]
2083    fn test_prepared_statement_underscore_params() {
2084        let stmt = PreparedStatement::new("MATCH (n {user_id: $user_id}) RETURN n");
2085        assert_eq!(stmt.param_names(), &["user_id"]);
2086    }
2087
2088    #[test]
2089    fn test_prepared_statement_numeric_params() {
2090        let stmt = PreparedStatement::new("RETURN $param1, $param2, $param123");
2091        let names = stmt.param_names();
2092        assert_eq!(names.len(), 3);
2093        assert!(names.contains(&"param1".to_string()));
2094        assert!(names.contains(&"param2".to_string()));
2095        assert!(names.contains(&"param123".to_string()));
2096    }
2097
2098    // PlanOperation tests
2099
2100    #[test]
2101    fn test_plan_operation_struct() {
2102        let op = PlanOperation {
2103            op_type: "NodeScan".to_string(),
2104            description: "Scan Person nodes".to_string(),
2105            estimated_rows: Some(100),
2106            children: vec![],
2107        };
2108        assert_eq!(op.op_type, "NodeScan");
2109        assert_eq!(op.description, "Scan Person nodes");
2110        assert_eq!(op.estimated_rows, Some(100));
2111        assert!(op.children.is_empty());
2112    }
2113
2114    #[test]
2115    fn test_plan_operation_with_children() {
2116        let child = PlanOperation {
2117            op_type: "Filter".to_string(),
2118            description: "Filter by age".to_string(),
2119            estimated_rows: Some(50),
2120            children: vec![],
2121        };
2122        let parent = PlanOperation {
2123            op_type: "Projection".to_string(),
2124            description: "Project name, age".to_string(),
2125            estimated_rows: Some(50),
2126            children: vec![child],
2127        };
2128        assert_eq!(parent.children.len(), 1);
2129        assert_eq!(parent.children[0].op_type, "Filter");
2130    }
2131
2132    // QueryPlan tests
2133
2134    #[test]
2135    fn test_query_plan_struct() {
2136        let plan = QueryPlan {
2137            operations: vec![PlanOperation {
2138                op_type: "NodeScan".to_string(),
2139                description: "Full scan".to_string(),
2140                estimated_rows: Some(1000),
2141                children: vec![],
2142            }],
2143            estimated_rows: 1000,
2144            raw: serde_json::json!({"type": "plan"}),
2145        };
2146        assert_eq!(plan.operations.len(), 1);
2147        assert_eq!(plan.estimated_rows, 1000);
2148    }
2149
2150    // QueryProfile tests
2151
2152    #[test]
2153    fn test_query_profile_struct() {
2154        let plan = QueryPlan {
2155            operations: vec![],
2156            estimated_rows: 100,
2157            raw: serde_json::json!({}),
2158        };
2159        let profile = QueryProfile {
2160            plan,
2161            actual_rows: 95,
2162            execution_time_ms: 12.5,
2163            raw: serde_json::json!({"type": "profile"}),
2164        };
2165        assert_eq!(profile.actual_rows, 95);
2166        assert!((profile.execution_time_ms - 12.5).abs() < 0.001);
2167    }
2168
2169    // Page tests
2170
2171    #[test]
2172    fn test_page_struct() {
2173        let page = Page {
2174            columns: vec![Column {
2175                name: "x".to_string(),
2176                col_type: "INT".to_string(),
2177            }],
2178            rows: vec![],
2179            ordered: false,
2180            order_keys: vec![],
2181            final_page: true,
2182        };
2183        assert_eq!(page.columns.len(), 1);
2184        assert!(page.rows.is_empty());
2185        assert!(page.final_page);
2186    }
2187
2188    // Column tests
2189
2190    #[test]
2191    fn test_column_struct() {
2192        let col = Column {
2193            name: "age".to_string(),
2194            col_type: "INT".to_string(),
2195        };
2196        assert_eq!(col.name, "age");
2197        assert_eq!(col.col_type, "INT");
2198    }
2199
2200    // Savepoint tests
2201
2202    #[test]
2203    fn test_savepoint_struct() {
2204        let sp = Savepoint {
2205            name: "before_update".to_string(),
2206        };
2207        assert_eq!(sp.name, "before_update");
2208    }
2209
2210    // Client builder tests
2211
2212    #[test]
2213    fn test_client_builder_defaults() {
2214        let _client = Client::new("localhost", 3141);
2215        // Test passes if it compiles - verifies defaults work
2216    }
2217
2218    #[test]
2219    fn test_client_builder_chain() {
2220        let _client = Client::new("example.com", 8443)
2221            .skip_verify(true)
2222            .page_size(500)
2223            .client_name("test-app")
2224            .client_version("2.0.0")
2225            .conformance("full");
2226        // Test passes if it compiles - verifies builder chain works
2227    }
2228
2229    #[test]
2230    fn test_client_clone() {
2231        let client = Client::new("localhost", 3141).skip_verify(true);
2232        let _cloned = client.clone();
2233        // Test passes if it compiles - verifies Clone is implemented
2234    }
2235
2236    // parse_plan_operations tests
2237
2238    #[test]
2239    fn test_parse_plan_operations_empty() {
2240        let result = serde_json::json!({});
2241        let ops = Connection::parse_plan_operations(&result);
2242        assert!(ops.is_empty());
2243    }
2244
2245    #[test]
2246    fn test_parse_plan_operations_array() {
2247        let result = serde_json::json!({
2248            "operations": [
2249                {"type": "NodeScan", "description": "Scan nodes", "estimated_rows": 100},
2250                {"type": "Filter", "description": "Apply filter", "estimated_rows": 50}
2251            ]
2252        });
2253        let ops = Connection::parse_plan_operations(&result);
2254        assert_eq!(ops.len(), 2);
2255        assert_eq!(ops[0].op_type, "NodeScan");
2256        assert_eq!(ops[1].op_type, "Filter");
2257    }
2258
2259    #[test]
2260    fn test_parse_plan_operations_single_plan() {
2261        let result = serde_json::json!({
2262            "plan": {"op_type": "FullScan", "desc": "Full table scan"}
2263        });
2264        let ops = Connection::parse_plan_operations(&result);
2265        assert_eq!(ops.len(), 1);
2266        assert_eq!(ops[0].op_type, "FullScan");
2267        assert_eq!(ops[0].description, "Full table scan");
2268    }
2269
2270    #[test]
2271    fn test_parse_single_operation() {
2272        let op_json = serde_json::json!({
2273            "type": "IndexScan",
2274            "description": "Use index on Person(name)",
2275            "estimated_rows": 25,
2276            "children": [
2277                {"type": "Filter", "description": "Filter results"}
2278            ]
2279        });
2280        let op = Connection::parse_single_operation(&op_json);
2281        assert_eq!(op.op_type, "IndexScan");
2282        assert_eq!(op.description, "Use index on Person(name)");
2283        assert_eq!(op.estimated_rows, Some(25));
2284        assert_eq!(op.children.len(), 1);
2285        assert_eq!(op.children[0].op_type, "Filter");
2286    }
2287
2288    #[test]
2289    fn test_parse_single_operation_minimal() {
2290        let op_json = serde_json::json!({});
2291        let op = Connection::parse_single_operation(&op_json);
2292        assert_eq!(op.op_type, "Unknown");
2293        assert_eq!(op.description, "");
2294        assert_eq!(op.estimated_rows, None);
2295        assert!(op.children.is_empty());
2296    }
2297
2298    #[test]
2299    fn test_parse_single_operation_alt_fields() {
2300        let op_json = serde_json::json!({
2301            "op_type": "Sort",
2302            "desc": "Sort by name ASC",
2303            "rows": 100
2304        });
2305        let op = Connection::parse_single_operation(&op_json);
2306        assert_eq!(op.op_type, "Sort");
2307        assert_eq!(op.description, "Sort by name ASC");
2308        assert_eq!(op.estimated_rows, Some(100));
2309    }
2310
2311    // redact_dsn tests - Gap #7 (DSN Password Exposure)
2312
2313    #[test]
2314    fn test_redact_dsn_url_with_password() {
2315        let dsn = "quic://admin:secret123@localhost:3141";
2316        let redacted = redact_dsn(dsn);
2317        assert!(redacted.contains("[REDACTED]"));
2318        assert!(!redacted.contains("secret123"));
2319        assert!(redacted.contains("admin"));
2320        assert!(redacted.contains("localhost"));
2321    }
2322
2323    #[test]
2324    fn test_redact_dsn_url_without_password() {
2325        let dsn = "quic://admin@localhost:3141";
2326        let redacted = redact_dsn(dsn);
2327        assert!(!redacted.contains("[REDACTED]"));
2328        assert!(redacted.contains("admin"));
2329        assert!(redacted.contains("localhost"));
2330    }
2331
2332    #[test]
2333    fn test_redact_dsn_url_no_auth() {
2334        let dsn = "quic://localhost:3141";
2335        let redacted = redact_dsn(dsn);
2336        assert_eq!(redacted, dsn);
2337    }
2338
2339    #[test]
2340    fn test_redact_dsn_query_param_password() {
2341        let dsn = "localhost:3141?username=admin&password=secret123";
2342        let redacted = redact_dsn(dsn);
2343        assert!(redacted.contains("[REDACTED]"));
2344        assert!(!redacted.contains("secret123"));
2345        assert!(redacted.contains("username=admin"));
2346    }
2347
2348    #[test]
2349    fn test_redact_dsn_query_param_pass() {
2350        let dsn = "localhost:3141?user=admin&pass=mysecret";
2351        let redacted = redact_dsn(dsn);
2352        assert!(redacted.contains("[REDACTED]"));
2353        assert!(!redacted.contains("mysecret"));
2354    }
2355
2356    #[test]
2357    fn test_redact_dsn_simple_no_password() {
2358        let dsn = "localhost:3141?insecure=true";
2359        let redacted = redact_dsn(dsn);
2360        assert_eq!(redacted, dsn);
2361    }
2362
2363    #[test]
2364    fn test_redact_dsn_url_with_query_and_password() {
2365        let dsn = "quic://user:pass@localhost:3141?insecure=true";
2366        let redacted = redact_dsn(dsn);
2367        assert!(redacted.contains("[REDACTED]"));
2368        assert!(!redacted.contains(":pass@"));
2369        assert!(redacted.contains("insecure=true"));
2370    }
2371
2372    // validate() tests - Gap #9 (Validation Not Automatic)
2373
2374    #[test]
2375    fn test_client_validate_valid() {
2376        let client = Client::new("localhost", 3141);
2377        assert!(client.validate().is_ok());
2378    }
2379
2380    #[test]
2381    fn test_client_validate_valid_hostname() {
2382        let client = Client::new("geode.example.com", 3141);
2383        assert!(client.validate().is_ok());
2384    }
2385
2386    #[test]
2387    fn test_client_validate_valid_ipv4() {
2388        let client = Client::new("192.168.1.1", 8443);
2389        assert!(client.validate().is_ok());
2390    }
2391
2392    #[test]
2393    fn test_client_validate_invalid_hostname_hyphen_start() {
2394        let client = Client::new("-invalid", 3141);
2395        assert!(client.validate().is_err());
2396    }
2397
2398    #[test]
2399    fn test_client_validate_invalid_hostname_hyphen_end() {
2400        let client = Client::new("invalid-", 3141);
2401        assert!(client.validate().is_err());
2402    }
2403
2404    #[test]
2405    fn test_client_validate_invalid_port_zero() {
2406        let client = Client::new("localhost", 0);
2407        assert!(client.validate().is_err());
2408    }
2409
2410    #[test]
2411    fn test_client_validate_invalid_page_size_zero() {
2412        let client = Client::new("localhost", 3141).page_size(0);
2413        assert!(client.validate().is_err());
2414    }
2415
2416    #[test]
2417    fn test_client_validate_invalid_page_size_too_large() {
2418        let client = Client::new("localhost", 3141).page_size(200_000);
2419        assert!(client.validate().is_err());
2420    }
2421
2422    #[test]
2423    fn test_client_validate_with_all_options() {
2424        let client = Client::new("geode.example.com", 8443)
2425            .skip_verify(true)
2426            .page_size(500)
2427            .username("admin")
2428            .password("secret")
2429            .connect_timeout(15)
2430            .hello_timeout(10)
2431            .idle_timeout(60);
2432        assert!(client.validate().is_ok());
2433    }
2434
2435    // Gap #13: Test that extreme timeout values don't cause builder panics
2436    #[test]
2437    fn test_client_extreme_timeout_values() {
2438        // These should not panic - the actual validation/capping happens at connect time
2439        let _client = Client::new("localhost", 3141)
2440            .connect_timeout(u64::MAX)
2441            .hello_timeout(u64::MAX)
2442            .idle_timeout(u64::MAX);
2443        // Builder should accept any u64 value without panicking
2444    }
2445}