Skip to main content

geode_client/
client.rs

1//! Geode client implementation supporting both QUIC and gRPC transports.
2//! Uses protobuf wire protocol with 4-byte big-endian length prefix for QUIC.
3
4use log::{debug, trace, warn};
5use quinn::{ClientConfig, Endpoint};
6use rustls::pki_types::{CertificateDer, ServerName as RustlsServerName};
7use secrecy::{ExposeSecret, SecretString};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::net::{SocketAddr, ToSocketAddrs};
11use std::sync::Arc;
12use tokio::time::{Duration, timeout};
13
14use crate::dsn::{Dsn, Transport};
15use crate::error::{Error, Result};
16use crate::proto;
17use crate::types::Value;
18use crate::validate;
19
/// ALPN protocol identifier placed in the TLS client configuration's
/// `alpn_protocols` list when setting up a QUIC connection.
const GEODE_ALPN: &[u8] = b"geode/1";
21
/// Redact passwords from a DSN string for safe inclusion in error messages.
///
/// Two credential locations are handled:
///
/// * **URL userinfo** (`scheme://user:password@host`): everything between the
///   first `:` of the userinfo and the *last* `@` that precedes the query
///   string is replaced, so passwords that themselves contain `@` are fully
///   covered. The search stops at the first `?`, so an `@` inside a query
///   value is never mistaken for the userinfo delimiter.
/// * **Query parameters** (`?password=xxx`, `?pass=xxx`): every occurrence is
///   replaced, matched ASCII-case-insensitively. The value runs up to the next
///   `&` or the end of the string.
///
/// A raw (not percent-encoded) `?` inside a userinfo password is treated as the
/// start of the query string, following normal URL syntax.
///
/// # Arguments
///
/// * `dsn` - The DSN text to sanitize.
///
/// # Returns
///
/// A copy of `dsn` with each detected password replaced by `[REDACTED]`.
#[allow(dead_code)] // Used in tests
fn redact_dsn(dsn: &str) -> String {
    const REDACTED: &str = "[REDACTED]";

    let mut result = dsn.to_string();

    // URL format: scheme://user:password@host:port
    if let Some(scheme_end) = result.find("://") {
        let after_scheme = scheme_end + 3;

        // Userinfo can only appear before the query string.
        let authority_end = result[after_scheme..]
            .find('?')
            .map_or(result.len(), |i| after_scheme + i);

        // Use the LAST '@' so that a password containing '@' is not cut short.
        if let Some(at_rel) = result[after_scheme..authority_end].rfind('@') {
            let at_pos = after_scheme + at_rel;
            if let Some(colon_rel) = result[after_scheme..at_pos].find(':') {
                let password_start = after_scheme + colon_rel + 1;
                result.replace_range(password_start..at_pos, REDACTED);
            }
        }
    }

    // Query parameter format: host:port?password=xxx
    for pattern in ["password=", "pass="] {
        let mut search_from = 0;
        loop {
            // ASCII-only lowercasing keeps byte offsets identical to `result`;
            // full Unicode lowercasing can change the byte length of the text.
            let lower = result.to_ascii_lowercase();
            let start = match lower[search_from..].find(pattern) {
                Some(offset) => search_from + offset,
                None => break,
            };

            let value_start = start + pattern.len();
            let value_end = result[value_start..]
                .find('&')
                .map_or(result.len(), |i| value_start + i);

            result.replace_range(value_start..value_end, REDACTED);

            // Resume after the inserted marker so the loop always advances.
            search_from = value_start + REDACTED.len();
        }
    }

    result
}
72
/// A column definition in a query result set.
///
/// Contains the column name and its GQL type as returned by the server.
///
/// The type derives `Serialize`/`Deserialize`; in serialized form the type
/// string is carried under the key `type` (see the `rename` on `col_type`),
/// since `type` is a reserved word and cannot be a Rust field name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Column {
    /// The column name (or alias if specified in the query)
    pub name: String,
    /// The GQL type of the column (e.g., "INT", "STRING", "BOOL").
    /// Serialized and deserialized under the key `type`.
    #[serde(rename = "type")]
    pub col_type: String,
}
84
/// A page of query results.
///
/// Query results are returned in pages. Each page contains a slice of rows
/// along with metadata about the result set.
///
/// Rows are stored as `HashMap`s, so iterating a single row does not yield
/// values in any particular column order; look values up by column name.
///
/// # Example
///
/// ```ignore
/// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name, n.age").await?;
///
/// for row in &page.rows {
///     let name = row.get("name").unwrap().as_string()?;
///     let age = row.get("age").unwrap().as_int()?;
///     println!("{}: {}", name, age);
/// }
///
/// if !page.final_page {
///     // More results available, would need to pull next page
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Page {
    /// Column definitions for the result set
    pub columns: Vec<Column>,
    /// Result rows, each row is a map of column name to value
    pub rows: Vec<HashMap<String, Value>>,
    /// Whether results are ordered (ORDER BY was used)
    pub ordered: bool,
    /// The keys used for ordering, if any
    pub order_keys: Vec<String>,
    /// Whether this is the final page of results; `false` means more rows
    /// remain to be fetched
    pub final_page: bool,
}
118
/// A named savepoint within a transaction.
///
/// Savepoints allow partial rollback within a transaction. They can be created
/// and managed via server-side GQL commands.
///
/// The value itself is only a handle carrying the savepoint's name; it holds
/// no connection or transaction state.
///
/// # Example
///
/// ```ignore
/// conn.begin().await?;
/// conn.query("CREATE (n:Node {id: 1})").await?;
///
/// let sp = conn.savepoint("before_risky_op")?;
/// match conn.query("CREATE (n:Node {id: 2})").await {
///     Ok(_) => {},
///     Err(_) => conn.rollback_to(&sp).await?,  // Undo only the second create
/// }
///
/// conn.commit().await?;  // First node is saved
/// ```
#[derive(Debug, Clone)]
pub struct Savepoint {
    /// The savepoint name
    pub name: String,
}
143
/// A prepared statement for efficient repeated query execution.
///
/// Prepared statements allow you to define a query once and execute it
/// multiple times with different parameters. This can improve performance
/// by allowing query plan caching on the server.
///
/// The value stores the query text plus the list of `$name` parameters found
/// in it. `execute` checks that every listed parameter was supplied before
/// forwarding the query to `Connection::query_with_params`.
///
/// # Example
///
/// ```ignore
/// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p").await?;
///
/// for id in 1..=100 {
///     let mut params = HashMap::new();
///     params.insert("id".to_string(), Value::int(id));
///     let (page, _) = stmt.execute(&mut conn, &params).await?;
///     // Process results...
/// }
/// ```
#[derive(Debug, Clone)]
pub struct PreparedStatement {
    /// The GQL query string
    query: String,
    /// Parameter names extracted from the query (without the leading `$`),
    /// de-duplicated and kept in order of first appearance
    param_names: Vec<String>,
}
169
170impl PreparedStatement {
171    /// Create a new prepared statement.
172    ///
173    /// Extracts parameter names from the query (tokens starting with `$`).
174    pub fn new(query: impl Into<String>) -> Self {
175        let query = query.into();
176        let param_names = Self::extract_param_names(&query);
177        Self { query, param_names }
178    }
179
180    /// Extract parameter names from a query string.
181    fn extract_param_names(query: &str) -> Vec<String> {
182        let mut names = Vec::new();
183        let mut chars = query.chars().peekable();
184
185        while let Some(c) = chars.next() {
186            if c == '$' {
187                let mut name = String::new();
188                while let Some(&next) = chars.peek() {
189                    if next.is_ascii_alphanumeric() || next == '_' {
190                        name.push(chars.next().unwrap());
191                    } else {
192                        break;
193                    }
194                }
195                if !name.is_empty() && !names.contains(&name) {
196                    names.push(name);
197                }
198            }
199        }
200
201        names
202    }
203
204    /// Get the query string.
205    pub fn query(&self) -> &str {
206        &self.query
207    }
208
209    /// Get the parameter names expected by this statement.
210    pub fn param_names(&self) -> &[String] {
211        &self.param_names
212    }
213
214    /// Execute the prepared statement with the given parameters.
215    ///
216    /// # Arguments
217    ///
218    /// * `conn` - The connection to execute on
219    /// * `params` - Parameter values (must include all parameters in the query)
220    ///
221    /// # Returns
222    ///
223    /// A tuple of (`Page`, `Option<String>`) with results and optional warnings.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if required parameters are missing or if the query fails.
228    pub async fn execute(
229        &self,
230        conn: &mut Connection,
231        params: &HashMap<String, crate::types::Value>,
232    ) -> crate::error::Result<(Page, Option<String>)> {
233        // Validate all required parameters are provided
234        for name in &self.param_names {
235            if !params.contains_key(name) {
236                return Err(crate::error::Error::validation(format!(
237                    "Missing required parameter: {}",
238                    name
239                )));
240            }
241        }
242
243        conn.query_with_params(&self.query, params).await
244    }
245}
246
/// An operation in a query execution plan.
///
/// Operations nest through `children`, so a plan forms a tree of
/// `PlanOperation` values.
#[derive(Debug, Clone)]
pub struct PlanOperation {
    /// Operation type (e.g., "NodeScan", "Filter", "Projection")
    pub op_type: String,
    /// Human-readable description
    pub description: String,
    /// Estimated row count for this operation, if one is available
    pub estimated_rows: Option<u64>,
    /// Child operations (empty for a leaf operation)
    pub children: Vec<PlanOperation>,
}
259
/// A query execution plan.
///
/// Shows how the database will execute a query without actually running it.
/// Useful for query optimization and understanding performance characteristics.
///
/// The structured view (`operations`, `estimated_rows`) is accompanied by the
/// unprocessed JSON in `raw`.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Root operations in the plan
    pub operations: Vec<PlanOperation>,
    /// Total estimated rows
    pub estimated_rows: u64,
    /// Raw plan from server (for advanced analysis)
    pub raw: serde_json::Value,
}
273
/// Query execution profile with timing information.
///
/// Includes the execution plan plus actual runtime statistics, along with the
/// unprocessed JSON in `raw`.
#[derive(Debug, Clone)]
pub struct QueryProfile {
    /// The execution plan
    pub plan: QueryPlan,
    /// Actual rows returned
    pub actual_rows: u64,
    /// Total execution time in milliseconds
    pub execution_time_ms: f64,
    /// Raw profile from server
    pub raw: serde_json::Value,
}
288
/// A Geode database client supporting both QUIC and gRPC transports.
///
/// `Client` holds the configuration for connecting to a Geode server. Use the
/// builder pattern to configure the client, then call
/// [`connect`](Client::connect) to establish a connection.
///
/// The password field uses `SecretString` from the `secrecy` crate to ensure
/// credentials are zeroized from memory on drop and not accidentally leaked
/// in debug output or error messages.
///
/// # Transport Selection
///
/// The transport is selected based on the DSN scheme:
/// - `quic://` - QUIC transport (default)
/// - `grpc://` - gRPC transport
///
/// # Example
///
/// ```no_run
/// use geode_client::Client;
///
/// # async fn example() -> geode_client::Result<()> {
/// // QUIC transport (legacy API)
/// let client = Client::new("127.0.0.1", 3141)
///     .skip_verify(true)  // Development only!
///     .page_size(500)
///     .client_name("my-app");
///
/// // Or use DSN with explicit transport
/// let client = Client::from_dsn("quic://127.0.0.1:3141?insecure=true")?;
/// let client = Client::from_dsn("grpc://127.0.0.1:50051")?;
///
/// let mut conn = client.connect().await?;
/// let (page, _) = conn.query("RETURN 1 AS x").await?;
/// conn.close()?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct Client {
    /// Transport used by `connect` (QUIC or gRPC)
    transport: Transport,
    /// Server hostname or IP address
    host: String,
    /// Server port
    port: u16,
    /// When true, TLS certificate verification is skipped (development only)
    skip_verify: bool,
    /// Number of rows requested per result page
    page_size: usize,
    /// Client name handed to the QUIC connection setup
    hello_name: String,
    /// Client version handed to the QUIC connection setup
    hello_ver: String,
    /// GQL conformance level handed to the QUIC connection setup
    conformance: String,
    /// Authentication username, if any
    username: Option<String>,
    /// Password stored using SecretString for secure memory handling (CWE-316).
    /// Automatically zeroized on drop and redacted in Debug output.
    password: Option<SecretString>,
    /// Connection timeout in seconds (default: 10)
    connect_timeout_secs: u64,
    /// HELLO handshake timeout in seconds (default: 5)
    hello_timeout_secs: u64,
    /// Idle connection timeout in seconds (default: 30)
    idle_timeout_secs: u64,
}
348
349impl Client {
350    /// Create a new QUIC client for the specified host and port.
351    ///
352    /// This method creates a client using QUIC transport. For gRPC transport,
353    /// use [`from_dsn`](Client::from_dsn) with a `grpc://` scheme.
354    ///
355    /// # Arguments
356    ///
357    /// * `host` - The server hostname or IP address
358    /// * `port` - The server port (typically 3141 for Geode)
359    ///
360    /// # Example
361    ///
362    /// ```
363    /// use geode_client::Client;
364    ///
365    /// let client = Client::new("localhost", 3141);
366    /// let client = Client::new("192.168.1.100", 8443);
367    /// let client = Client::new(String::from("geode.example.com"), 3141);
368    /// ```
369    pub fn new(host: impl Into<String>, port: u16) -> Self {
370        Self {
371            transport: Transport::Quic,
372            host: host.into(),
373            port,
374            skip_verify: false,
375            page_size: 1000,
376            hello_name: "geode-rust".to_string(),
377            hello_ver: env!("CARGO_PKG_VERSION").to_string(),
378            conformance: "min".to_string(),
379            username: None,
380            password: None,
381            connect_timeout_secs: 10,
382            hello_timeout_secs: 5,
383            idle_timeout_secs: 30,
384        }
385    }
386
387    /// Create a new client from a DSN (Data Source Name) string.
388    ///
389    /// # Supported DSN Formats
390    ///
391    /// - `quic://host:port?options` - QUIC transport (recommended)
392    /// - `grpc://host:port?options` - gRPC transport
393    /// - `host:port?options` - Legacy format (defaults to QUIC)
394    ///
395    /// # Supported Options
396    ///
397    /// - `tls` - Enable/disable TLS (0/1/true/false)
398    /// - `insecure` or `skip_verify` - Skip TLS verification
399    /// - `page_size` - Results page size (default: 1000)
400    /// - `client_name` or `hello_name` - Client name
401    /// - `client_version` or `hello_ver` - Client version
402    /// - `conformance` - GQL conformance level
403    /// - `username` or `user` - Authentication username
404    /// - `password` or `pass` - Authentication password
405    ///
406    /// # Examples
407    ///
408    /// ```
409    /// use geode_client::Client;
410    ///
411    /// // QUIC transport (explicit)
412    /// let client = Client::from_dsn("quic://localhost:3141").unwrap();
413    ///
414    /// // gRPC transport
415    /// let client = Client::from_dsn("grpc://localhost:50051?tls=0").unwrap();
416    ///
417    /// // Legacy format (defaults to QUIC)
418    /// let client = Client::from_dsn("localhost:3141?insecure=true").unwrap();
419    ///
420    /// // With authentication
421    /// let client = Client::from_dsn("quic://admin:secret@localhost:3141").unwrap();
422    ///
423    /// // IPv6 support
424    /// let client = Client::from_dsn("grpc://[::1]:50051").unwrap();
425    /// ```
426    ///
427    /// # Errors
428    ///
429    /// Returns `Error::InvalidDsn` if:
430    /// - DSN is empty
431    /// - Scheme is unsupported (not quic://, grpc://, or schemeless)
432    /// - Host is missing
433    /// - Port is invalid
434    pub fn from_dsn(dsn_str: &str) -> Result<Self> {
435        let dsn = Dsn::parse(dsn_str)?;
436
437        Ok(Self {
438            transport: dsn.transport(),
439            host: dsn.host().to_string(),
440            port: dsn.port(),
441            skip_verify: dsn.skip_verify(),
442            page_size: dsn.page_size(),
443            hello_name: dsn.client_name().to_string(),
444            hello_ver: dsn.client_version().to_string(),
445            conformance: dsn.conformance().to_string(),
446            username: dsn.username().map(String::from),
447            password: dsn.password().map(|p| SecretString::from(p.to_string())),
448            connect_timeout_secs: 10,
449            hello_timeout_secs: 5,
450            idle_timeout_secs: 30,
451        })
452    }
453
454    /// Get the transport type for this client.
455    pub fn transport(&self) -> Transport {
456        self.transport
457    }
458
459    /// Skip TLS certificate verification.
460    ///
461    /// # Security Warning
462    ///
463    /// **This should only be used in development environments.** Disabling
464    /// certificate verification makes the connection vulnerable to
465    /// man-in-the-middle attacks.
466    ///
467    /// # Arguments
468    ///
469    /// * `skip` - If true, skip certificate verification
470    pub fn skip_verify(mut self, skip: bool) -> Self {
471        self.skip_verify = skip;
472        self
473    }
474
475    /// Set the page size for query results.
476    ///
477    /// Controls how many rows are returned per page when fetching results.
478    /// Larger values reduce round-trips but use more memory.
479    ///
480    /// # Arguments
481    ///
482    /// * `size` - Number of rows per page (default: 1000)
483    pub fn page_size(mut self, size: usize) -> Self {
484        self.page_size = size;
485        self
486    }
487
488    /// Set the client name sent to the server.
489    ///
490    /// This appears in server logs and can help with debugging.
491    ///
492    /// # Arguments
493    ///
494    /// * `name` - Client application name (default: "geode-rust-quinn")
495    pub fn client_name(mut self, name: impl Into<String>) -> Self {
496        self.hello_name = name.into();
497        self
498    }
499
500    /// Set the client version sent to the server.
501    ///
502    /// # Arguments
503    ///
504    /// * `version` - Client version string (default: "0.1.0")
505    pub fn client_version(mut self, version: impl Into<String>) -> Self {
506        self.hello_ver = version.into();
507        self
508    }
509
510    /// Set the GQL conformance level.
511    ///
512    /// # Arguments
513    ///
514    /// * `level` - Conformance level (default: "min")
515    pub fn conformance(mut self, level: impl Into<String>) -> Self {
516        self.conformance = level.into();
517        self
518    }
519
520    /// Set the authentication username.
521    ///
522    /// # Arguments
523    ///
524    /// * `username` - The username for authentication
525    ///
526    /// # Example
527    ///
528    /// ```
529    /// use geode_client::Client;
530    ///
531    /// let client = Client::new("localhost", 3141)
532    ///     .username("admin")
533    ///     .password("secret");
534    /// ```
535    pub fn username(mut self, username: impl Into<String>) -> Self {
536        self.username = Some(username.into());
537        self
538    }
539
540    /// Set the authentication password.
541    ///
542    /// The password is stored using `SecretString` which ensures it is:
543    /// - Zeroized from memory when dropped
544    /// - Not accidentally leaked in debug output
545    ///
546    /// # Arguments
547    ///
548    /// * `password` - The password for authentication
549    pub fn password(mut self, password: impl Into<String>) -> Self {
550        self.password = Some(SecretString::from(password.into()));
551        self
552    }
553
554    /// Set the connection timeout in seconds.
555    ///
556    /// This controls how long to wait for the initial QUIC connection
557    /// to be established. Default is 10 seconds.
558    ///
559    /// # Arguments
560    ///
561    /// * `seconds` - Timeout in seconds (must be > 0)
562    pub fn connect_timeout(mut self, seconds: u64) -> Self {
563        self.connect_timeout_secs = seconds.max(1);
564        self
565    }
566
567    /// Set the HELLO handshake timeout in seconds.
568    ///
569    /// This controls how long to wait for the server to respond to the
570    /// initial HELLO message. Default is 5 seconds.
571    ///
572    /// # Arguments
573    ///
574    /// * `seconds` - Timeout in seconds (must be > 0)
575    pub fn hello_timeout(mut self, seconds: u64) -> Self {
576        self.hello_timeout_secs = seconds.max(1);
577        self
578    }
579
580    /// Set the idle connection timeout in seconds.
581    ///
582    /// This controls how long an idle connection can remain open before
583    /// being automatically closed by the QUIC layer. Default is 30 seconds.
584    ///
585    /// # Arguments
586    ///
587    /// * `seconds` - Timeout in seconds (must be > 0)
588    pub fn idle_timeout(mut self, seconds: u64) -> Self {
589        self.idle_timeout_secs = seconds.max(1);
590        self
591    }
592
593    /// Validate the client configuration.
594    ///
595    /// Performs validation on all configuration parameters including:
596    /// - Hostname format (RFC 1035 compliant)
597    /// - Port number (1-65535)
598    /// - Page size (1-100,000)
599    ///
600    /// This method is automatically called by [`connect`](Self::connect).
601    /// You can call it manually to validate configuration before attempting
602    /// to connect.
603    ///
604    /// # Errors
605    ///
606    /// Returns a validation error if any parameter is invalid.
607    ///
608    /// # Example
609    ///
610    /// ```
611    /// use geode_client::Client;
612    ///
613    /// let client = Client::new("localhost", 3141);
614    /// assert!(client.validate().is_ok());
615    ///
616    /// // Invalid hostname
617    /// let invalid = Client::new("-invalid-host", 3141);
618    /// assert!(invalid.validate().is_err());
619    /// ```
620    pub fn validate(&self) -> Result<()> {
621        // Validate hostname format
622        validate::hostname(&self.host)?;
623
624        // Validate port (0 is reserved)
625        validate::port(self.port)?;
626
627        // Validate page size
628        validate::page_size(self.page_size)?;
629
630        Ok(())
631    }
632
633    /// Connect to the Geode database.
634    ///
635    /// Establishes a QUIC connection to the server, performs the TLS handshake,
636    /// and sends the initial HELLO message.
637    ///
638    /// # Returns
639    ///
640    /// A [`Connection`] that can be used to execute queries.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if:
645    /// - The hostname cannot be resolved
646    /// - The connection cannot be established
647    /// - TLS verification fails (unless `skip_verify` is true)
648    /// - The HELLO handshake fails
649    ///
650    /// # Example
651    ///
652    /// ```no_run
653    /// # use geode_client::Client;
654    /// # async fn example() -> geode_client::Result<()> {
655    /// let client = Client::new("localhost", 3141).skip_verify(true);
656    /// let mut conn = client.connect().await?;
657    /// // Use connection...
658    /// conn.close()?;
659    /// # Ok(())
660    /// # }
661    /// ```
662    // CANARY: REQ=REQ-CLIENT-RUST-001; FEATURE="RustClientConnection"; ASPECT=HelloHandshake; STATUS=IMPL; OWNER=clients; UPDATED=2025-02-14
663    pub async fn connect(&self) -> Result<Connection> {
664        // Validate configuration before connecting (Gap #9 - automatic validation)
665        self.validate()?;
666
667        // Expose the secret password only when needed for the connection
668        let password_ref = self.password.as_ref().map(|s| s.expose_secret());
669
670        match self.transport {
671            Transport::Quic => {
672                Connection::new_quic(
673                    &self.host,
674                    self.port,
675                    self.skip_verify,
676                    self.page_size,
677                    &self.hello_name,
678                    &self.hello_ver,
679                    &self.conformance,
680                    self.username.as_deref(),
681                    password_ref,
682                    self.connect_timeout_secs,
683                    self.hello_timeout_secs,
684                    self.idle_timeout_secs,
685                )
686                .await
687            }
688            Transport::Grpc => {
689                #[cfg(feature = "grpc")]
690                {
691                    Connection::new_grpc(
692                        &self.host,
693                        self.port,
694                        self.skip_verify,
695                        self.page_size,
696                        self.username.as_deref(),
697                        password_ref,
698                    )
699                    .await
700                }
701                #[cfg(not(feature = "grpc"))]
702                {
703                    Err(Error::connection(
704                        "gRPC transport requires the 'grpc' feature to be enabled",
705                    ))
706                }
707            }
708        }
709    }
710}
711
/// Internal connection type for transport-specific implementations.
///
/// Each variant owns the state needed to talk to the server over one
/// transport. The `Grpc` variant only exists when the `grpc` feature is
/// enabled.
// dead_code is allowed because some fields below are reserved for future use.
#[allow(dead_code)]
enum ConnectionKind {
    /// QUIC transport connection
    Quic {
        /// The underlying QUIC connection handle
        conn: quinn::Connection,
        /// Sending half of the stream
        send: quinn::SendStream,
        /// Receiving half of the stream
        recv: quinn::RecvStream,
        /// Reserved for future streaming support
        buffer: Vec<u8>,
        /// Reserved for request ID tracking
        next_request_id: u64,
    },
    /// gRPC transport connection
    #[cfg(feature = "grpc")]
    Grpc { client: crate::grpc::GrpcClient },
}
729
/// An active connection to a Geode database server.
///
/// A `Connection` represents a connection to the Geode server using either
/// QUIC or gRPC transport. It provides methods for executing queries, managing
/// transactions, and controlling the connection lifecycle.
///
/// # Transport Support
///
/// - **QUIC**: Uses a bidirectional stream with protobuf wire protocol
/// - **gRPC**: Uses tonic-based gRPC client (requires `grpc` feature)
///
/// # Connection Lifecycle
///
/// 1. Create via [`Client::connect`]
/// 2. Execute queries with [`query`](Connection::query) or [`query_with_params`](Connection::query_with_params)
/// 3. Optionally use transactions with [`begin`](Connection::begin), [`commit`](Connection::commit), [`rollback`](Connection::rollback)
/// 4. Close with [`close`](Connection::close)
///
/// # Example
///
/// ```no_run
/// # use geode_client::Client;
/// # async fn example() -> geode_client::Result<()> {
/// let client = Client::new("localhost", 3141).skip_verify(true);
/// let mut conn = client.connect().await?;
///
/// // Execute queries
/// let (page, _) = conn.query("RETURN 42 AS answer").await?;
/// println!("Answer: {}", page.rows[0].get("answer").unwrap().as_int()?);
///
/// // Use transactions
/// conn.begin().await?;
/// conn.query("CREATE (n:Node {id: 1})").await?;
/// conn.commit().await?;
///
/// conn.close()?;
/// # Ok(())
/// # }
/// ```
///
/// # Thread Safety
///
/// `Connection` is `!Sync` because the underlying transport streams are not thread-safe.
/// For concurrent access, use [`ConnectionPool`](crate::ConnectionPool).
pub struct Connection {
    /// Transport-specific state (QUIC streams, or the gRPC client when the
    /// `grpc` feature is enabled)
    kind: ConnectionKind,
    /// Page size for query results (reserved for future use)
    #[allow(dead_code)]
    page_size: usize,
}
780
781impl Connection {
782    /// Create a new QUIC connection.
783    #[allow(clippy::too_many_arguments)]
784    async fn new_quic(
785        host: &str,
786        port: u16,
787        skip_verify: bool,
788        page_size: usize,
789        hello_name: &str,
790        hello_ver: &str,
791        conformance: &str,
792        username: Option<&str>,
793        password: Option<&str>,
794        connect_timeout_secs: u64,
795        hello_timeout_secs: u64,
796        idle_timeout_secs: u64,
797    ) -> Result<Self> {
798        let mut last_err: Option<Error> = None;
799
800        for attempt in 1..=3 {
801            match Self::connect_quic_once(
802                host,
803                port,
804                skip_verify,
805                page_size,
806                hello_name,
807                hello_ver,
808                conformance,
809                username,
810                password,
811                connect_timeout_secs,
812                hello_timeout_secs,
813                idle_timeout_secs,
814            )
815            .await
816            {
817                Ok(conn) => return Ok(conn),
818                Err(e) => {
819                    last_err = Some(e);
820                    if attempt < 3 {
821                        debug!("Connection attempt {} failed, retrying...", attempt);
822                        tokio::time::sleep(Duration::from_millis(150)).await;
823                    }
824                }
825            }
826        }
827
828        Err(last_err.unwrap_or_else(|| Error::connection("Failed to connect")))
829    }
830
831    /// Create a new gRPC connection.
832    #[cfg(feature = "grpc")]
833    #[allow(clippy::too_many_arguments)]
834    async fn new_grpc(
835        host: &str,
836        port: u16,
837        skip_verify: bool,
838        page_size: usize,
839        username: Option<&str>,
840        password: Option<&str>,
841    ) -> Result<Self> {
842        use crate::dsn::Dsn;
843
844        // Build DSN for gRPC client
845        let dsn_str = if let (Some(user), Some(pass)) = (username, password) {
846            format!(
847                "grpc://{}:{}@{}:{}?insecure={}",
848                user, pass, host, port, skip_verify
849            )
850        } else {
851            format!("grpc://{}:{}?insecure={}", host, port, skip_verify)
852        };
853
854        let dsn = Dsn::parse(&dsn_str)?;
855        let client = crate::grpc::GrpcClient::connect(&dsn).await?;
856
857        Ok(Self {
858            kind: ConnectionKind::Grpc { client },
859            page_size,
860        })
861    }
862
863    #[allow(clippy::too_many_arguments)]
864    async fn connect_quic_once(
865        host: &str,
866        port: u16,
867        skip_verify: bool,
868        page_size: usize,
869        _hello_name: &str,
870        _hello_ver: &str,
871        _conformance: &str,
872        username: Option<&str>,
873        password: Option<&str>,
874        connect_timeout_secs: u64,
875        _hello_timeout_secs: u64,
876        idle_timeout_secs: u64,
877    ) -> Result<Self> {
878        debug!("Creating connection to {}:{}", host, port);
879
880        // Install default crypto provider for rustls
881        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
882
883        // Build Quinn client config with TLS 1.3 explicitly (QUIC requires TLS 1.3)
884        let mut client_crypto = if skip_verify {
885            // SECURITY WARNING: Disabling TLS verification exposes connections to MITM attacks.
886            // Credentials sent in HELLO may be intercepted. Only use for development/testing.
887            warn!(
888                "TLS certificate verification DISABLED - connection to {}:{} is vulnerable to MITM attacks. \
889                 Do NOT use skip_verify in production!",
890                host, port
891            );
892            rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
893                .dangerous()
894                .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
895                .with_no_client_auth()
896        } else {
897            // Load system root certificates for proper TLS verification
898            let mut root_store = rustls::RootCertStore::empty();
899
900            let cert_result = rustls_native_certs::load_native_certs();
901
902            // Log any errors that occurred during certificate loading
903            for err in &cert_result.errors {
904                warn!("Error loading native certificate: {:?}", err);
905            }
906
907            let mut certs_loaded = 0;
908            let mut certs_failed = 0;
909
910            for cert in cert_result.certs {
911                match root_store.add(cert) {
912                    Ok(()) => certs_loaded += 1,
913                    Err(_) => certs_failed += 1,
914                }
915            }
916
917            if certs_loaded == 0 {
918                return Err(Error::tls(
919                    "No system root certificates found. TLS verification cannot proceed. \
920                     Either install system CA certificates or use skip_verify(true) for development only.",
921                ));
922            }
923
924            debug!(
925                "Loaded {} system root certificates ({} failed to parse)",
926                certs_loaded, certs_failed
927            );
928
929            rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
930                .with_root_certificates(root_store)
931                .with_no_client_auth()
932        };
933
934        // Set ALPN protocols
935        client_crypto.alpn_protocols = vec![GEODE_ALPN.to_vec()];
936
937        let mut client_config = ClientConfig::new(Arc::new(
938            quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto)
939                .map_err(|e| Error::connection(format!("Failed to create QUIC config: {}", e)))?,
940        ));
941
942        // Configure QUIC transport parameters to match Python/Go clients
943        let mut transport = quinn::TransportConfig::default();
944        // Cap idle timeout to quinn's maximum (2^62 - 1 microseconds ≈ 146 years)
945        // to prevent panic from VarInt overflow
946        let idle_timeout = Duration::from_secs(idle_timeout_secs.min(146_000 * 365 * 24 * 3600));
947        transport.max_idle_timeout(Some(idle_timeout.try_into().map_err(|_| {
948            Error::connection("Idle timeout value too large for QUIC protocol")
949        })?));
950        transport.keep_alive_interval(Some(Duration::from_secs(5)));
951        client_config.transport_config(Arc::new(transport));
952
953        // Create endpoint - "0.0.0.0:0" is a valid socket address literal that binds
954        // to any available port on all interfaces, so this parse cannot fail
955        let mut endpoint = Endpoint::client(
956            "0.0.0.0:0"
957                .parse()
958                .expect("0.0.0.0:0 is a valid socket address"),
959        )
960        .map_err(|e| Error::connection(format!("Failed to create endpoint: {}", e)))?;
961        endpoint.set_default_client_config(client_config);
962
963        // Resolve server address (supports hostnames as well as IP literals)
964        let mut resolved_addrs = format!("{}:{}", host, port)
965            .to_socket_addrs()
966            .map_err(|e| {
967                Error::connection(format!(
968                    "Failed to resolve address {}:{} - {}",
969                    host, port, e
970                ))
971            })?;
972
973        let server_addr: SocketAddr = resolved_addrs
974            .find(|addr| matches!(addr, SocketAddr::V4(_) | SocketAddr::V6(_)))
975            .ok_or_else(|| Error::connection("Invalid address: could not resolve host"))?;
976
977        debug!("Connecting to {}", server_addr);
978
979        // When skipping verification, don't use actual hostname for SNI
980        // This matches Python client behavior which avoids server_name when skip_verify=True
981        let server_name = if skip_verify {
982            "localhost" // Use generic name when skipping verification
983        } else {
984            host
985        };
986
987        trace!("Using server name for SNI: {}", server_name);
988
989        let conn = timeout(
990            Duration::from_secs(connect_timeout_secs),
991            endpoint
992                .connect(server_addr, server_name)
993                .map_err(|e| Error::connection(format!("Connection failed: {}", e)))?,
994        )
995        .await
996        .map_err(|_| Error::connection("Connection timeout"))?
997        .map_err(|e| Error::connection(format!("Failed to establish connection: {}", e)))?;
998
999        debug!("Connection established to {}:{}", host, port);
1000
1001        // Open a single bidirectional stream used for the entire session.
1002        let (mut send, mut recv) = conn
1003            .open_bi()
1004            .await
1005            .map_err(|e| Error::connection(format!("Failed to open stream: {}", e)))?;
1006
1007        // Send HELLO message using protobuf with length prefix
1008        let hello_req = proto::HelloRequest {
1009            username: username.unwrap_or("").to_string(),
1010            password: password.unwrap_or("").to_string(),
1011            tenant_id: String::new(),
1012        };
1013        let msg = proto::QuicClientMessage {
1014            hello: Some(hello_req),
1015            ..Default::default()
1016        };
1017        let data = proto::encode_with_length_prefix(&msg);
1018
1019        send.write_all(&data)
1020            .await
1021            .map_err(|e| Error::connection(format!("Failed to send HELLO: {}", e)))?;
1022
1023        // Wait for HELLO response (length-prefixed protobuf)
1024        let mut length_buf = [0u8; 4];
1025        timeout(Duration::from_secs(5), recv.read_exact(&mut length_buf))
1026            .await
1027            .map_err(|_| Error::connection("HELLO response timeout"))?
1028            .map_err(|e| {
1029                Error::connection(format!("Failed to read HELLO response length: {}", e))
1030            })?;
1031
1032        let msg_len = u32::from_be_bytes(length_buf) as usize;
1033        let mut msg_buf = vec![0u8; msg_len];
1034        recv.read_exact(&mut msg_buf)
1035            .await
1036            .map_err(|e| Error::connection(format!("Failed to read HELLO response body: {}", e)))?;
1037
1038        let hello_response = proto::decode_quic_server_message(&msg_buf)?;
1039
1040        if let Some(ref hello_resp) = hello_response.hello {
1041            if !hello_resp.success {
1042                return Err(Error::connection(format!(
1043                    "Authentication failed: {}",
1044                    hello_resp.error_message
1045                )));
1046            }
1047        } else {
1048            return Err(Error::connection("Expected HELLO response"));
1049        }
1050
1051        debug!("HELLO handshake complete");
1052
1053        Ok(Self {
1054            kind: ConnectionKind::Quic {
1055                conn,
1056                send,
1057                recv,
1058                buffer: Vec::new(),
1059                next_request_id: 1,
1060            },
1061            page_size,
1062        })
1063    }
1064
1065    /// Send a protobuf message over QUIC.
1066    async fn send_proto_quic(
1067        send: &mut quinn::SendStream,
1068        msg: &proto::QuicClientMessage,
1069    ) -> Result<()> {
1070        let data = proto::encode_with_length_prefix(msg);
1071        send.write_all(&data)
1072            .await
1073            .map_err(|e| Error::connection(format!("Failed to send message: {}", e)))?;
1074        Ok(())
1075    }
1076
1077    /// Read a protobuf message over QUIC with timeout.
1078    async fn read_proto_quic(
1079        recv: &mut quinn::RecvStream,
1080        timeout_secs: u64,
1081    ) -> Result<proto::QuicServerMessage> {
1082        // Read 4-byte length prefix
1083        let mut length_buf = [0u8; 4];
1084        timeout(
1085            Duration::from_secs(timeout_secs),
1086            recv.read_exact(&mut length_buf),
1087        )
1088        .await
1089        .map_err(|_| Error::timeout())?
1090        .map_err(|e| Error::connection(format!("Failed to read response length: {}", e)))?;
1091
1092        let msg_len = u32::from_be_bytes(length_buf) as usize;
1093        let mut msg_buf = vec![0u8; msg_len];
1094        recv.read_exact(&mut msg_buf)
1095            .await
1096            .map_err(|e| Error::connection(format!("Failed to read response body: {}", e)))?;
1097
1098        proto::decode_quic_server_message(&msg_buf)
1099    }
1100
1101    /// Attempt to read a buffered protobuf message without blocking (QUIC).
1102    /// Returns Ok(None) if no complete message is available immediately.
1103    async fn try_read_proto_quic(
1104        recv: &mut quinn::RecvStream,
1105    ) -> Result<Option<proto::QuicServerMessage>> {
1106        // Try to read with a short timeout
1107        let mut length_buf = [0u8; 4];
1108        let read_result =
1109            timeout(Duration::from_millis(10), recv.read_exact(&mut length_buf)).await;
1110
1111        match read_result {
1112            Ok(Ok(())) => {
1113                let msg_len = u32::from_be_bytes(length_buf) as usize;
1114                let mut msg_buf = vec![0u8; msg_len];
1115                recv.read_exact(&mut msg_buf).await.map_err(|e| {
1116                    Error::connection(format!("Failed to read response body: {}", e))
1117                })?;
1118                let msg = proto::decode_quic_server_message(&msg_buf)?;
1119                Ok(Some(msg))
1120            }
1121            Ok(Err(e)) => Err(Error::connection(format!("Failed to read response: {}", e))),
1122            Err(_) => Ok(None), // Timeout - no data available
1123        }
1124    }
1125
1126    /// Parse protobuf rows into Value maps (static version for QUIC).
1127    fn parse_proto_rows_static(
1128        proto_rows: &[proto::Row],
1129        columns: &[Column],
1130    ) -> Result<Vec<HashMap<String, Value>>> {
1131        let mut rows = Vec::new();
1132        for proto_row in proto_rows {
1133            let mut row = HashMap::new();
1134            for (i, col) in columns.iter().enumerate() {
1135                let value = if i < proto_row.values.len() {
1136                    Self::convert_proto_value_static(&proto_row.values[i])
1137                } else {
1138                    Value::null()
1139                };
1140                row.insert(col.name.clone(), value);
1141            }
1142            rows.push(row);
1143        }
1144        Ok(rows)
1145    }
1146
1147    /// Convert a protobuf Value to our Value type (static version).
1148    fn convert_proto_value_static(proto_val: &proto::Value) -> Value {
1149        if proto_val.null_val {
1150            return Value::null();
1151        }
1152        if let Some(ref s) = proto_val.string_val {
1153            return Value::string(s.clone());
1154        }
1155        if let Some(i) = proto_val.int_val {
1156            return Value::int(i);
1157        }
1158        if let Some(d) = proto_val.double_val {
1159            return Value::decimal(rust_decimal::Decimal::from_f64_retain(d).unwrap_or_default());
1160        }
1161        if let Some(b) = proto_val.bool_val {
1162            return Value::bool(b);
1163        }
1164        Value::null()
1165    }
1166
1167    /// Send BEGIN transaction command over QUIC.
1168    async fn send_begin_quic(
1169        send: &mut quinn::SendStream,
1170        recv: &mut quinn::RecvStream,
1171    ) -> Result<()> {
1172        let msg = proto::QuicClientMessage {
1173            begin: Some(proto::BeginRequest::default()),
1174            ..Default::default()
1175        };
1176        Self::send_proto_quic(send, &msg).await?;
1177
1178        let resp = Self::read_proto_quic(recv, 5).await?;
1179        if resp.begin.is_none() {
1180            return Err(Error::protocol("Expected BEGIN response"));
1181        }
1182        Ok(())
1183    }
1184
1185    /// Send COMMIT transaction command over QUIC.
1186    async fn send_commit_quic(
1187        send: &mut quinn::SendStream,
1188        recv: &mut quinn::RecvStream,
1189    ) -> Result<()> {
1190        let msg = proto::QuicClientMessage {
1191            commit: Some(proto::CommitRequest),
1192            ..Default::default()
1193        };
1194        Self::send_proto_quic(send, &msg).await?;
1195
1196        let resp = Self::read_proto_quic(recv, 5).await?;
1197        if resp.commit.is_none() {
1198            return Err(Error::protocol("Expected COMMIT response"));
1199        }
1200        Ok(())
1201    }
1202
1203    /// Send ROLLBACK transaction command over QUIC.
1204    async fn send_rollback_quic(
1205        send: &mut quinn::SendStream,
1206        recv: &mut quinn::RecvStream,
1207    ) -> Result<()> {
1208        let msg = proto::QuicClientMessage {
1209            rollback: Some(proto::RollbackRequest),
1210            ..Default::default()
1211        };
1212        Self::send_proto_quic(send, &msg).await?;
1213
1214        let resp = Self::read_proto_quic(recv, 5).await?;
1215        if resp.rollback.is_none() {
1216            return Err(Error::protocol("Expected ROLLBACK response"));
1217        }
1218        Ok(())
1219    }
1220
1221    /// Execute a GQL query without parameters.
1222    ///
1223    /// # Arguments
1224    ///
1225    /// * `gql` - The GQL query string
1226    ///
1227    /// # Returns
1228    ///
1229    /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1230    /// and the optional string contains any query warnings.
1231    ///
1232    /// # Errors
1233    ///
1234    /// Returns [`Error::Query`] if the query fails to execute.
1235    ///
1236    /// # Example
1237    ///
1238    /// ```no_run
1239    /// # use geode_client::Client;
1240    /// # async fn example() -> geode_client::Result<()> {
1241    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1242    /// # let mut conn = client.connect().await?;
1243    /// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name LIMIT 10").await?;
1244    /// for row in &page.rows {
1245    ///     println!("Name: {}", row.get("name").unwrap().as_string()?);
1246    /// }
1247    /// # Ok(())
1248    /// # }
1249    /// ```
1250    pub async fn query(&mut self, gql: &str) -> Result<(Page, Option<String>)> {
1251        self.query_with_params(gql, &HashMap::new()).await
1252    }
1253
1254    /// Execute a GQL query with parameters.
1255    ///
1256    /// Parameters are substituted for `$param_name` placeholders in the query.
1257    /// This is the recommended way to include dynamic values in queries, as it
1258    /// prevents injection attacks and allows query plan caching.
1259    ///
1260    /// # Arguments
1261    ///
1262    /// * `gql` - The GQL query string with parameter placeholders
1263    /// * `params` - A map of parameter names to values
1264    ///
1265    /// # Returns
1266    ///
1267    /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1268    /// and the optional string contains any query warnings.
1269    ///
1270    /// # Errors
1271    ///
1272    /// Returns [`Error::Query`] if the query fails to execute.
1273    ///
1274    /// # Example
1275    ///
1276    /// ```no_run
1277    /// # use geode_client::{Client, Value};
1278    /// # use std::collections::HashMap;
1279    /// # async fn example() -> geode_client::Result<()> {
1280    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1281    /// # let mut conn = client.connect().await?;
1282    /// let mut params = HashMap::new();
1283    /// params.insert("name".to_string(), Value::string("Alice"));
1284    /// params.insert("min_age".to_string(), Value::int(25));
1285    ///
1286    /// let (page, _) = conn.query_with_params(
1287    ///     "MATCH (p:Person {name: $name}) WHERE p.age >= $min_age RETURN p",
1288    ///     &params
1289    /// ).await?;
1290    /// # Ok(())
1291    /// # }
1292    /// ```
1293    pub async fn query_with_params(
1294        &mut self,
1295        gql: &str,
1296        params: &HashMap<String, Value>,
1297    ) -> Result<(Page, Option<String>)> {
1298        match &mut self.kind {
1299            ConnectionKind::Quic { send, recv, .. } => {
1300                Self::query_with_params_quic(send, recv, gql, params).await
1301            }
1302            #[cfg(feature = "grpc")]
1303            ConnectionKind::Grpc { client } => client.query_with_params(gql, params).await,
1304        }
1305    }
1306
1307    /// Execute a GQL query with parameters over QUIC transport.
1308    async fn query_with_params_quic(
1309        send: &mut quinn::SendStream,
1310        recv: &mut quinn::RecvStream,
1311        gql: &str,
1312        params: &HashMap<String, Value>,
1313    ) -> Result<(Page, Option<String>)> {
1314        // Convert params to string map for protobuf
1315        let params_proto: HashMap<String, String> = params
1316            .iter()
1317            .map(|(k, v)| (k.clone(), v.to_proto_string()))
1318            .collect();
1319
1320        // Send Execute request via protobuf
1321        let exec_req = proto::ExecuteRequest {
1322            session_id: String::new(),
1323            query: gql.to_string(),
1324            parameters: params_proto,
1325        };
1326        let msg = proto::QuicClientMessage {
1327            execute: Some(exec_req),
1328            ..Default::default()
1329        };
1330        Self::send_proto_quic(send, &msg)
1331            .await
1332            .map_err(|e| Error::query(format!("{}", e)))?;
1333
1334        // Read first response (should be schema or error)
1335        let resp = Self::read_proto_quic(recv, 10).await?;
1336
1337        let exec_resp = resp
1338            .execute
1339            .ok_or_else(|| Error::protocol("Expected Execute response"))?;
1340
1341        // Check for error
1342        if let Some(ref err) = exec_resp.error {
1343            return Err(Error::Query {
1344                code: err.code.clone(),
1345                message: err.message.clone(),
1346            });
1347        }
1348
1349        // Parse columns from schema
1350        let columns: Vec<Column> = exec_resp
1351            .schema
1352            .as_ref()
1353            .map(|s| {
1354                s.columns
1355                    .iter()
1356                    .map(|c| Column {
1357                        name: c.name.clone(),
1358                        col_type: c.col_type.clone(),
1359                    })
1360                    .collect()
1361            })
1362            .unwrap_or_default();
1363
1364        trace!("Schema columns: {:?}", columns);
1365
1366        // Check for inline data page
1367        if let Some(inline_resp) = Self::try_read_proto_quic(recv).await? {
1368            if let Some(inline_exec) = inline_resp.execute {
1369                if let Some(ref err) = inline_exec.error {
1370                    return Err(Error::Query {
1371                        code: err.code.clone(),
1372                        message: err.message.clone(),
1373                    });
1374                }
1375
1376                if let Some(ref page_data) = inline_exec.page {
1377                    let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1378                    let page = Page {
1379                        columns,
1380                        rows,
1381                        ordered: false,
1382                        order_keys: Vec::new(),
1383                        final_page: page_data.last_page,
1384                    };
1385                    return Ok((page, None));
1386                }
1387
1388                // Metrics or heartbeat - empty result
1389                let page = Page {
1390                    columns,
1391                    rows: Vec::new(),
1392                    ordered: false,
1393                    order_keys: Vec::new(),
1394                    final_page: true,
1395                };
1396                return Ok((page, None));
1397            }
1398        }
1399
1400        // Check if we got a page in the first response
1401        if let Some(ref page_data) = exec_resp.page {
1402            let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1403            let page = Page {
1404                columns,
1405                rows,
1406                ordered: false,
1407                order_keys: Vec::new(),
1408                final_page: page_data.last_page,
1409            };
1410            return Ok((page, None));
1411        }
1412
1413        // Read next response for data page
1414        let resp = Self::read_proto_quic(recv, 30).await?;
1415        if let Some(exec_resp) = resp.execute {
1416            if let Some(ref err) = exec_resp.error {
1417                return Err(Error::Query {
1418                    code: err.code.clone(),
1419                    message: err.message.clone(),
1420                });
1421            }
1422
1423            if let Some(ref page_data) = exec_resp.page {
1424                let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1425                let page = Page {
1426                    columns,
1427                    rows,
1428                    ordered: false,
1429                    order_keys: Vec::new(),
1430                    final_page: page_data.last_page,
1431                };
1432                return Ok((page, None));
1433            }
1434        }
1435
1436        // Empty result
1437        let page = Page {
1438            columns,
1439            rows: Vec::new(),
1440            ordered: false,
1441            order_keys: Vec::new(),
1442            final_page: true,
1443        };
1444
1445        Ok((page, None))
1446    }
1447
1448    /// Execute a query without parameters (synchronous-style blocking version for test runner)
1449    pub fn query_sync(
1450        &mut self,
1451        gql: &str,
1452        params: Option<HashMap<String, serde_json::Value>>,
1453    ) -> Result<Page> {
1454        let params_map = params.unwrap_or_default();
1455        let params_typed: HashMap<String, Value> = params_map
1456            .into_iter()
1457            .map(|(k, v)| {
1458                let typed_val = crate::types::Value::from_json(v);
1459                (k, typed_val)
1460            })
1461            .collect();
1462
1463        match tokio::runtime::Handle::try_current() {
1464            Ok(handle) => {
1465                let (page, _cursor) =
1466                    handle.block_on(self.query_with_params(gql, &params_typed))?;
1467                Ok(page)
1468            }
1469            Err(_) => {
1470                let rt = tokio::runtime::Runtime::new()
1471                    .map_err(|e| Error::query(format!("Failed to create runtime: {}", e)))?;
1472                let (page, _cursor) = rt.block_on(self.query_with_params(gql, &params_typed))?;
1473                Ok(page)
1474            }
1475        }
1476    }
1477
1478    /// Begin a new transaction.
1479    ///
1480    /// After calling `begin`, all queries will be part of the transaction until
1481    /// [`commit`](Connection::commit) or [`rollback`](Connection::rollback) is called.
1482    ///
1483    /// # Errors
1484    ///
1485    /// Returns an error if a transaction is already in progress or if the
1486    /// server rejects the request.
1487    ///
1488    /// # Example
1489    ///
1490    /// ```no_run
1491    /// # use geode_client::Client;
1492    /// # async fn example() -> geode_client::Result<()> {
1493    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1494    /// # let mut conn = client.connect().await?;
1495    /// conn.begin().await?;
1496    /// conn.query("CREATE (n:Node {id: 1})").await?;
1497    /// conn.query("CREATE (n:Node {id: 2})").await?;
1498    /// conn.commit().await?;  // Both nodes are now persisted
1499    /// # Ok(())
1500    /// # }
1501    /// ```
1502    pub async fn begin(&mut self) -> Result<()> {
1503        match &mut self.kind {
1504            ConnectionKind::Quic { send, recv, .. } => Self::send_begin_quic(send, recv).await,
1505            #[cfg(feature = "grpc")]
1506            ConnectionKind::Grpc { client } => client.begin().await,
1507        }
1508    }
1509
1510    /// Commit the current transaction.
1511    ///
1512    /// Persists all changes made since [`begin`](Connection::begin) was called.
1513    ///
1514    /// # Errors
1515    ///
1516    /// Returns an error if no transaction is in progress or if the server
1517    /// rejects the commit.
1518    ///
1519    /// # Example
1520    ///
1521    /// ```no_run
1522    /// # use geode_client::Client;
1523    /// # async fn example() -> geode_client::Result<()> {
1524    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1525    /// # let mut conn = client.connect().await?;
1526    /// conn.begin().await?;
1527    /// conn.query("CREATE (n:Node)").await?;
1528    /// conn.commit().await?;  // Changes are now permanent
1529    /// # Ok(())
1530    /// # }
1531    /// ```
1532    pub async fn commit(&mut self) -> Result<()> {
1533        match &mut self.kind {
1534            ConnectionKind::Quic { send, recv, .. } => Self::send_commit_quic(send, recv).await,
1535            #[cfg(feature = "grpc")]
1536            ConnectionKind::Grpc { client } => client.commit().await,
1537        }
1538    }
1539
1540    /// Rollback the current transaction.
1541    ///
1542    /// Discards all changes made since [`begin`](Connection::begin) was called.
1543    ///
1544    /// # Errors
1545    ///
1546    /// Returns an error if no transaction is in progress.
1547    ///
1548    /// # Example
1549    ///
1550    /// ```no_run
1551    /// # use geode_client::Client;
1552    /// # async fn example() -> geode_client::Result<()> {
1553    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1554    /// # let mut conn = client.connect().await?;
1555    /// conn.begin().await?;
1556    /// match conn.query("CREATE (n:InvalidNode)").await {
1557    ///     Ok(_) => conn.commit().await?,
1558    ///     Err(_) => conn.rollback().await?,  // Undo everything
1559    /// }
1560    /// # Ok(())
1561    /// # }
1562    /// ```
1563    pub async fn rollback(&mut self) -> Result<()> {
1564        match &mut self.kind {
1565            ConnectionKind::Quic { send, recv, .. } => Self::send_rollback_quic(send, recv).await,
1566            #[cfg(feature = "grpc")]
1567            ConnectionKind::Grpc { client } => client.rollback().await,
1568        }
1569    }
1570
1571    /// Create a prepared statement for efficient repeated execution.
1572    ///
1573    /// Prepared statements allow you to define a query once and execute it
1574    /// multiple times with different parameters. The query text is parsed
1575    /// to extract parameter names (tokens starting with `$`).
1576    ///
1577    /// # Arguments
1578    ///
1579    /// * `query` - The GQL query string with parameter placeholders
1580    ///
1581    /// # Returns
1582    ///
1583    /// A [`PreparedStatement`] that can be executed multiple times.
1584    ///
1585    /// # Example
1586    ///
1587    /// ```no_run
1588    /// # use geode_client::{Client, Value};
1589    /// # use std::collections::HashMap;
1590    /// # async fn example() -> geode_client::Result<()> {
1591    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1592    /// # let mut conn = client.connect().await?;
1593    /// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p.name")?;
1594    ///
1595    /// for id in 1..=100 {
1596    ///     let mut params = HashMap::new();
1597    ///     params.insert("id".to_string(), Value::int(id));
1598    ///     let (page, _) = stmt.execute(&mut conn, &params).await?;
1599    ///     // Process results...
1600    /// }
1601    /// # Ok(())
1602    /// # }
1603    /// ```
1604    pub fn prepare(&self, query: &str) -> Result<PreparedStatement> {
1605        Ok(PreparedStatement::new(query))
1606    }
1607
1608    /// Get the execution plan for a query without running it.
1609    ///
1610    /// This is useful for understanding how the database will execute a query
1611    /// and for identifying potential performance issues.
1612    ///
1613    /// # Arguments
1614    ///
1615    /// * `gql` - The GQL query string to explain
1616    ///
1617    /// # Returns
1618    ///
1619    /// A [`QueryPlan`] containing the execution plan details.
1620    ///
1621    /// # Errors
1622    ///
1623    /// Returns an error if the query is invalid or cannot be planned.
1624    ///
1625    /// # Example
1626    ///
1627    /// ```no_run
1628    /// # use geode_client::Client;
1629    /// # async fn example() -> geode_client::Result<()> {
1630    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1631    /// # let mut conn = client.connect().await?;
1632    /// let plan = conn.explain("MATCH (p:Person)-[:KNOWS]->(f) RETURN f").await?;
1633    /// println!("Estimated rows: {}", plan.estimated_rows);
1634    /// for op in &plan.operations {
1635    ///     println!("  {} - {}", op.op_type, op.description);
1636    /// }
1637    /// # Ok(())
1638    /// # }
1639    /// ```
1640    pub async fn explain(&mut self, gql: &str) -> Result<QueryPlan> {
1641        // Execute EXPLAIN as a query via protobuf
1642        let explain_query = format!("EXPLAIN {}", gql);
1643        let (_page, _) = self.query(&explain_query).await?;
1644
1645        // Parse the plan from the response
1646        // The result format depends on server implementation
1647        Ok(QueryPlan {
1648            operations: Vec::new(),
1649            estimated_rows: 0,
1650            raw: serde_json::json!({}),
1651        })
1652    }
1653
1654    /// Execute a query and return the execution profile with timing information.
1655    ///
1656    /// This runs the query and collects detailed execution statistics including
1657    /// actual row counts and timing for each operation.
1658    ///
1659    /// # Arguments
1660    ///
1661    /// * `gql` - The GQL query string to profile
1662    ///
1663    /// # Returns
1664    ///
1665    /// A [`QueryProfile`] containing the execution plan and runtime statistics.
1666    ///
1667    /// # Errors
1668    ///
1669    /// Returns an error if the query fails to execute.
1670    ///
1671    /// # Example
1672    ///
1673    /// ```no_run
1674    /// # use geode_client::Client;
1675    /// # async fn example() -> geode_client::Result<()> {
1676    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1677    /// # let mut conn = client.connect().await?;
1678    /// let profile = conn.profile("MATCH (p:Person) RETURN p LIMIT 100").await?;
1679    /// println!("Execution time: {:.2}ms", profile.execution_time_ms);
1680    /// println!("Actual rows: {}", profile.actual_rows);
1681    /// # Ok(())
1682    /// # }
1683    /// ```
1684    pub async fn profile(&mut self, gql: &str) -> Result<QueryProfile> {
1685        // Execute PROFILE as a query via protobuf
1686        let profile_query = format!("PROFILE {}", gql);
1687        let (page, _) = self.query(&profile_query).await?;
1688
1689        // Parse the profile from the response
1690        let plan = QueryPlan {
1691            operations: Vec::new(),
1692            estimated_rows: 0,
1693            raw: serde_json::json!({}),
1694        };
1695
1696        Ok(QueryProfile {
1697            plan,
1698            actual_rows: page.rows.len() as u64,
1699            execution_time_ms: 0.0,
1700            raw: serde_json::json!({}),
1701        })
1702    }
1703
1704    /// Execute multiple queries in a batch.
1705    ///
1706    /// This is more efficient than executing queries one at a time when you
1707    /// have multiple independent queries to run.
1708    ///
1709    /// # Arguments
1710    ///
1711    /// * `queries` - A slice of (query, optional params) tuples
1712    ///
1713    /// # Returns
1714    ///
1715    /// A `Vec<Page>` with results for each query, in the same order as input.
1716    ///
1717    /// # Errors
1718    ///
1719    /// Returns an error if any query fails. Queries are executed in order,
1720    /// so earlier queries may have completed before the error.
1721    ///
1722    /// # Example
1723    ///
1724    /// ```no_run
1725    /// # use geode_client::Client;
1726    /// # async fn example() -> geode_client::Result<()> {
1727    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1728    /// # let mut conn = client.connect().await?;
1729    /// let results = conn.batch(&[
1730    ///     ("MATCH (n:Person) RETURN count(n)", None),
1731    ///     ("MATCH (n:Company) RETURN count(n)", None),
1732    ///     ("MATCH ()-[r:WORKS_AT]->() RETURN count(r)", None),
1733    /// ]).await?;
1734    ///
1735    /// for (i, page) in results.iter().enumerate() {
1736    ///     println!("Query {}: {} rows", i + 1, page.rows.len());
1737    /// }
1738    /// # Ok(())
1739    /// # }
1740    /// ```
1741    pub async fn batch(
1742        &mut self,
1743        queries: &[(&str, Option<&HashMap<String, Value>>)],
1744    ) -> Result<Vec<Page>> {
1745        let mut results = Vec::with_capacity(queries.len());
1746
1747        for (query, params) in queries {
1748            let (page, _) = match params {
1749                Some(p) => self.query_with_params(query, p).await?,
1750                None => self.query(query).await?,
1751            };
1752            results.push(page);
1753        }
1754
1755        Ok(results)
1756    }
1757
1758    /// Parse plan operations from a server response.
1759    /// Reserved for future EXPLAIN/PROFILE response parsing.
1760    #[allow(dead_code)]
1761    fn parse_plan_operations(result: &serde_json::Value) -> Vec<PlanOperation> {
1762        let mut operations = Vec::new();
1763
1764        if let Some(ops) = result.get("operations").and_then(|o| o.as_array()) {
1765            for op in ops {
1766                operations.push(Self::parse_single_operation(op));
1767            }
1768        } else if let Some(plan) = result.get("plan") {
1769            // Alternative format: single "plan" object
1770            operations.push(Self::parse_single_operation(plan));
1771        }
1772
1773        operations
1774    }
1775
1776    /// Parse a single operation from JSON.
1777    #[allow(dead_code)]
1778    fn parse_single_operation(op: &serde_json::Value) -> PlanOperation {
1779        let op_type = op
1780            .get("type")
1781            .or_else(|| op.get("op_type"))
1782            .and_then(|t| t.as_str())
1783            .unwrap_or("Unknown")
1784            .to_string();
1785
1786        let description = op
1787            .get("description")
1788            .or_else(|| op.get("desc"))
1789            .and_then(|d| d.as_str())
1790            .unwrap_or("")
1791            .to_string();
1792
1793        let estimated_rows = op
1794            .get("estimated_rows")
1795            .or_else(|| op.get("rows"))
1796            .and_then(|r| r.as_u64());
1797
1798        let children = op
1799            .get("children")
1800            .and_then(|c| c.as_array())
1801            .map(|arr| arr.iter().map(Self::parse_single_operation).collect())
1802            .unwrap_or_default();
1803
1804        PlanOperation {
1805            op_type,
1806            description,
1807            estimated_rows,
1808            children,
1809        }
1810    }
1811
1812    /// Close the connection.
1813    ///
1814    /// Gracefully closes the connection. After calling this method,
1815    /// the connection can no longer be used.
1816    ///
1817    /// # Note
1818    ///
1819    /// It's good practice to explicitly close connections, but they will also
1820    /// be closed when dropped.
1821    ///
1822    /// # Example
1823    ///
1824    /// ```no_run
1825    /// # use geode_client::Client;
1826    /// # async fn example() -> geode_client::Result<()> {
1827    /// # let client = Client::new("localhost", 3141).skip_verify(true);
1828    /// let mut conn = client.connect().await?;
1829    /// // ... use connection ...
1830    /// conn.close()?;
1831    /// # Ok(())
1832    /// # }
1833    /// ```
1834    pub fn close(&mut self) -> Result<()> {
1835        match &mut self.kind {
1836            ConnectionKind::Quic { conn, .. } => {
1837                // QUIC close is asynchronous and best-effort - the CONNECTION_CLOSE frame
1838                // will be sent by Quinn's internal I/O handling. No blocking delay needed.
1839                // (Gap #17: Removed std::thread::sleep that blocked the async runtime)
1840                conn.close(0u32.into(), b"client closing");
1841                Ok(())
1842            }
1843            #[cfg(feature = "grpc")]
1844            ConnectionKind::Grpc { client } => client.close(),
1845        }
1846    }
1847
1848    /// Check if the connection is still healthy.
1849    ///
1850    /// Returns `true` if the underlying QUIC connection is still open and usable.
1851    /// This is used by connection pools to verify connections before reuse.
1852    ///
1853    /// # Example
1854    ///
1855    /// ```ignore
1856    /// let mut conn = client.connect().await?;
1857    /// if conn.is_healthy() {
1858    ///     // Connection is still usable
1859    ///     let (page, _) = conn.query("RETURN 1").await?;
1860    /// }
1861    /// ```
1862    pub fn is_healthy(&self) -> bool {
1863        match &self.kind {
1864            ConnectionKind::Quic { conn, .. } => {
1865                // Quinn's close_reason() returns Some if the connection was closed
1866                conn.close_reason().is_none()
1867            }
1868            #[cfg(feature = "grpc")]
1869            ConnectionKind::Grpc { .. } => {
1870                // gRPC connections are managed by tonic, always report healthy
1871                true
1872            }
1873        }
1874    }
1875}
1876
1877/// Certificate verifier that skips all verification (INSECURE - for development only)
1878#[derive(Debug)]
1879struct SkipServerVerification;
1880
1881impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
1882    fn verify_server_cert(
1883        &self,
1884        _end_entity: &CertificateDer,
1885        _intermediates: &[CertificateDer],
1886        _server_name: &RustlsServerName,
1887        _ocsp_response: &[u8],
1888        _now: rustls::pki_types::UnixTime,
1889    ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1890        Ok(rustls::client::danger::ServerCertVerified::assertion())
1891    }
1892
1893    fn verify_tls12_signature(
1894        &self,
1895        _message: &[u8],
1896        _cert: &CertificateDer,
1897        _dss: &rustls::DigitallySignedStruct,
1898    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1899        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1900    }
1901
1902    fn verify_tls13_signature(
1903        &self,
1904        _message: &[u8],
1905        _cert: &CertificateDer,
1906        _dss: &rustls::DigitallySignedStruct,
1907    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1908        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1909    }
1910
1911    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1912        vec![
1913            rustls::SignatureScheme::RSA_PKCS1_SHA256,
1914            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1915            rustls::SignatureScheme::ED25519,
1916        ]
1917    }
1918}
1919
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `PlanOperation` fixture from its four fields.
    fn plan_op(
        op_type: &str,
        description: &str,
        estimated_rows: Option<u64>,
        children: Vec<PlanOperation>,
    ) -> PlanOperation {
        PlanOperation {
            op_type: op_type.to_string(),
            description: description.to_string(),
            estimated_rows,
            children,
        }
    }

    // PreparedStatement tests

    /// The query text is kept verbatim and the single placeholder is reported.
    #[test]
    fn test_prepared_statement_new() {
        let text = "MATCH (n:Person {id: $id}) RETURN n";
        let prepared = PreparedStatement::new(text);
        assert_eq!(prepared.query(), text);
        assert_eq!(prepared.param_names(), &["id"]);
    }

    /// Every distinct placeholder in the query is reported.
    #[test]
    fn test_prepared_statement_multiple_params() {
        let prepared = PreparedStatement::new(
            "MATCH (p:Person {name: $name}) WHERE p.age > $min_age AND p.city = $city RETURN p",
        );
        assert!(prepared.query().contains("$name"));

        let found = prepared.param_names();
        assert_eq!(found.len(), 3);
        for expected in ["name", "min_age", "city"] {
            assert!(found.contains(&expected.to_string()));
        }
    }

    /// A query without placeholders reports no parameter names.
    #[test]
    fn test_prepared_statement_no_params() {
        let prepared = PreparedStatement::new("MATCH (n) RETURN n LIMIT 10");
        assert!(prepared.param_names().is_empty());
    }

    /// A placeholder used twice is reported only once.
    #[test]
    fn test_prepared_statement_duplicate_params() {
        let prepared =
            PreparedStatement::new("MATCH (a {id: $id})-[:KNOWS]->(b {id: $id}) RETURN a, b");
        // Should deduplicate parameter names
        assert_eq!(prepared.param_names(), &["id"]);
    }

    /// Underscores are part of a placeholder name.
    #[test]
    fn test_prepared_statement_underscore_params() {
        let prepared = PreparedStatement::new("MATCH (n {user_id: $user_id}) RETURN n");
        assert_eq!(prepared.param_names(), &["user_id"]);
    }

    /// Digits are part of a placeholder name.
    #[test]
    fn test_prepared_statement_numeric_params() {
        let prepared = PreparedStatement::new("RETURN $param1, $param2, $param123");
        let found = prepared.param_names();
        assert_eq!(found.len(), 3);
        for expected in ["param1", "param2", "param123"] {
            assert!(found.contains(&expected.to_string()));
        }
    }

    // PlanOperation tests

    /// All four fields of a leaf operation are readable after construction.
    #[test]
    fn test_plan_operation_struct() {
        let leaf = plan_op("NodeScan", "Scan Person nodes", Some(100), vec![]);
        assert_eq!(leaf.op_type, "NodeScan");
        assert_eq!(leaf.description, "Scan Person nodes");
        assert_eq!(leaf.estimated_rows, Some(100));
        assert!(leaf.children.is_empty());
    }

    /// A nested operation is reachable through its parent's `children`.
    #[test]
    fn test_plan_operation_with_children() {
        let filter = plan_op("Filter", "Filter by age", Some(50), vec![]);
        let projection = plan_op("Projection", "Project name, age", Some(50), vec![filter]);
        assert_eq!(projection.children.len(), 1);
        assert_eq!(projection.children[0].op_type, "Filter");
    }

    // QueryPlan tests

    /// A plan keeps its operations and estimated row count.
    #[test]
    fn test_query_plan_struct() {
        let scan = plan_op("NodeScan", "Full scan", Some(1000), vec![]);
        let query_plan = QueryPlan {
            operations: vec![scan],
            estimated_rows: 1000,
            raw: serde_json::json!({"type": "plan"}),
        };
        assert_eq!(query_plan.operations.len(), 1);
        assert_eq!(query_plan.estimated_rows, 1000);
    }

    // QueryProfile tests

    /// A profile keeps its actual row count and execution time.
    #[test]
    fn test_query_profile_struct() {
        let measured = QueryProfile {
            plan: QueryPlan {
                operations: vec![],
                estimated_rows: 100,
                raw: serde_json::json!({}),
            },
            actual_rows: 95,
            execution_time_ms: 12.5,
            raw: serde_json::json!({"type": "profile"}),
        };
        assert_eq!(measured.actual_rows, 95);
        assert!((measured.execution_time_ms - 12.5).abs() < 0.001);
    }

    // Page tests

    /// A page with one column and no rows reports both correctly.
    #[test]
    fn test_page_struct() {
        let only_column = Column {
            name: "x".to_string(),
            col_type: "INT".to_string(),
        };
        let empty_page = Page {
            columns: vec![only_column],
            rows: vec![],
            ordered: false,
            order_keys: vec![],
            final_page: true,
        };
        assert_eq!(empty_page.columns.len(), 1);
        assert!(empty_page.rows.is_empty());
        assert!(empty_page.final_page);
    }

    // Column tests

    /// A column keeps its name and type string.
    #[test]
    fn test_column_struct() {
        let age = Column {
            name: "age".to_string(),
            col_type: "INT".to_string(),
        };
        assert_eq!(age.name, "age");
        assert_eq!(age.col_type, "INT");
    }

    // Savepoint tests

    /// A savepoint keeps its name.
    #[test]
    fn test_savepoint_struct() {
        let marker = Savepoint {
            name: "before_update".to_string(),
        };
        assert_eq!(marker.name, "before_update");
    }

    // Client builder tests

    /// Compile-level check: a client can be built from host and port alone.
    #[test]
    fn test_client_builder_defaults() {
        // Test passes if it compiles - verifies defaults work
        let _built = Client::new("localhost", 3141);
    }

    /// Compile-level check: the builder methods chain.
    #[test]
    fn test_client_builder_chain() {
        // Test passes if it compiles - verifies builder chain works
        let _built = Client::new("example.com", 8443)
            .skip_verify(true)
            .page_size(500)
            .client_name("test-app")
            .client_version("2.0.0")
            .conformance("full");
    }

    /// Compile-level check: `Client` implements `Clone`.
    #[test]
    fn test_client_clone() {
        // Test passes if it compiles - verifies Clone is implemented
        let original = Client::new("localhost", 3141).skip_verify(true);
        let _copy = original.clone();
    }

    // parse_plan_operations tests

    /// An empty response yields no operations.
    #[test]
    fn test_parse_plan_operations_empty() {
        let response = serde_json::json!({});
        assert!(Connection::parse_plan_operations(&response).is_empty());
    }

    /// Each element of the `operations` array becomes one operation, in order.
    #[test]
    fn test_parse_plan_operations_array() {
        let response = serde_json::json!({
            "operations": [
                {"type": "NodeScan", "description": "Scan nodes", "estimated_rows": 100},
                {"type": "Filter", "description": "Apply filter", "estimated_rows": 50}
            ]
        });
        let parsed = Connection::parse_plan_operations(&response);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].op_type, "NodeScan");
        assert_eq!(parsed[1].op_type, "Filter");
    }

    /// A lone `plan` object becomes the single operation.
    #[test]
    fn test_parse_plan_operations_single_plan() {
        let response = serde_json::json!({
            "plan": {"op_type": "FullScan", "desc": "Full table scan"}
        });
        let parsed = Connection::parse_plan_operations(&response);
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].op_type, "FullScan");
        assert_eq!(parsed[0].description, "Full table scan");
    }

    /// Primary field names and nested children are all picked up.
    #[test]
    fn test_parse_single_operation() {
        let node = serde_json::json!({
            "type": "IndexScan",
            "description": "Use index on Person(name)",
            "estimated_rows": 25,
            "children": [
                {"type": "Filter", "description": "Filter results"}
            ]
        });
        let parsed = Connection::parse_single_operation(&node);
        assert_eq!(parsed.op_type, "IndexScan");
        assert_eq!(parsed.description, "Use index on Person(name)");
        assert_eq!(parsed.estimated_rows, Some(25));
        assert_eq!(parsed.children.len(), 1);
        assert_eq!(parsed.children[0].op_type, "Filter");
    }

    /// An empty node falls back to the defaults for every field.
    #[test]
    fn test_parse_single_operation_minimal() {
        let node = serde_json::json!({});
        let parsed = Connection::parse_single_operation(&node);
        assert_eq!(parsed.op_type, "Unknown");
        assert_eq!(parsed.description, "");
        assert_eq!(parsed.estimated_rows, None);
        assert!(parsed.children.is_empty());
    }

    /// The alternative field names are honoured when the primary ones are absent.
    #[test]
    fn test_parse_single_operation_alt_fields() {
        let node = serde_json::json!({
            "op_type": "Sort",
            "desc": "Sort by name ASC",
            "rows": 100
        });
        let parsed = Connection::parse_single_operation(&node);
        assert_eq!(parsed.op_type, "Sort");
        assert_eq!(parsed.description, "Sort by name ASC");
        assert_eq!(parsed.estimated_rows, Some(100));
    }

    // redact_dsn tests - Gap #7 (DSN Password Exposure)

    /// The password in `user:password@` is replaced; user and host survive.
    #[test]
    fn test_redact_dsn_url_with_password() {
        let safe = redact_dsn("quic://admin:secret123@localhost:3141");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains("secret123"));
        for kept in ["admin", "localhost"] {
            assert!(safe.contains(kept));
        }
    }

    /// A user without a password leaves nothing to redact.
    #[test]
    fn test_redact_dsn_url_without_password() {
        let safe = redact_dsn("quic://admin@localhost:3141");
        assert!(!safe.contains("[REDACTED]"));
        for kept in ["admin", "localhost"] {
            assert!(safe.contains(kept));
        }
    }

    /// A URL without credentials is returned unchanged.
    #[test]
    fn test_redact_dsn_url_no_auth() {
        let input = "quic://localhost:3141";
        assert_eq!(redact_dsn(input), input);
    }

    /// A `password=` query parameter is redacted; other parameters survive.
    #[test]
    fn test_redact_dsn_query_param_password() {
        let safe = redact_dsn("localhost:3141?username=admin&password=secret123");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains("secret123"));
        assert!(safe.contains("username=admin"));
    }

    /// The short `pass=` query parameter is redacted as well.
    #[test]
    fn test_redact_dsn_query_param_pass() {
        let safe = redact_dsn("localhost:3141?user=admin&pass=mysecret");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains("mysecret"));
    }

    /// Query parameters unrelated to credentials are left untouched.
    #[test]
    fn test_redact_dsn_simple_no_password() {
        let input = "localhost:3141?insecure=true";
        assert_eq!(redact_dsn(input), input);
    }

    /// URL credentials are redacted while the query string is preserved.
    #[test]
    fn test_redact_dsn_url_with_query_and_password() {
        let safe = redact_dsn("quic://user:pass@localhost:3141?insecure=true");
        assert!(safe.contains("[REDACTED]"));
        assert!(!safe.contains(":pass@"));
        assert!(safe.contains("insecure=true"));
    }

    // validate() tests - Gap #9 (Validation Not Automatic)

    /// The default local configuration validates.
    #[test]
    fn test_client_validate_valid() {
        assert!(Client::new("localhost", 3141).validate().is_ok());
    }

    /// A dotted hostname validates.
    #[test]
    fn test_client_validate_valid_hostname() {
        assert!(Client::new("geode.example.com", 3141).validate().is_ok());
    }

    /// An IPv4 literal validates.
    #[test]
    fn test_client_validate_valid_ipv4() {
        assert!(Client::new("192.168.1.1", 8443).validate().is_ok());
    }

    /// A hostname starting with a hyphen is rejected.
    #[test]
    fn test_client_validate_invalid_hostname_hyphen_start() {
        assert!(Client::new("-invalid", 3141).validate().is_err());
    }

    /// A hostname ending with a hyphen is rejected.
    #[test]
    fn test_client_validate_invalid_hostname_hyphen_end() {
        assert!(Client::new("invalid-", 3141).validate().is_err());
    }

    /// Port zero is rejected.
    #[test]
    fn test_client_validate_invalid_port_zero() {
        assert!(Client::new("localhost", 0).validate().is_err());
    }

    /// A page size of zero is rejected.
    #[test]
    fn test_client_validate_invalid_page_size_zero() {
        let configured = Client::new("localhost", 3141).page_size(0);
        assert!(configured.validate().is_err());
    }

    /// A page size of 200,000 is rejected as too large.
    #[test]
    fn test_client_validate_invalid_page_size_too_large() {
        let configured = Client::new("localhost", 3141).page_size(200_000);
        assert!(configured.validate().is_err());
    }

    /// A fully configured client validates.
    #[test]
    fn test_client_validate_with_all_options() {
        let configured = Client::new("geode.example.com", 8443)
            .skip_verify(true)
            .page_size(500)
            .username("admin")
            .password("secret")
            .connect_timeout(15)
            .hello_timeout(10)
            .idle_timeout(60);
        assert!(configured.validate().is_ok());
    }

    // Gap #13: Test that extreme timeout values don't cause builder panics
    #[test]
    fn test_client_extreme_timeout_values() {
        // These should not panic - the actual validation/capping happens at connect time.
        // Builder should accept any u64 value without panicking.
        let _built = Client::new("localhost", 3141)
            .connect_timeout(u64::MAX)
            .hello_timeout(u64::MAX)
            .idle_timeout(u64::MAX);
    }
}