geode_client/client.rs
1//! Geode client implementation supporting both QUIC and gRPC transports.
2//! Uses protobuf wire protocol with 4-byte big-endian length prefix for QUIC.
3
4use log::{debug, trace, warn};
5use quinn::{ClientConfig, Endpoint};
6use rustls::pki_types::{CertificateDer, ServerName as RustlsServerName};
7use secrecy::{ExposeSecret, SecretString};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::net::{SocketAddr, ToSocketAddrs};
11use std::sync::Arc;
12use tokio::time::{Duration, timeout};
13
14use crate::dsn::{Dsn, Transport};
15use crate::error::{Error, Result};
16use crate::proto;
17use crate::types::Value;
18use crate::validate;
19
20const GEODE_ALPN: &[u8] = b"geode/1";
21
22/// Redact password from a DSN string for safe inclusion in error messages.
23/// Handles both URL format (quic://user:pass@host) and query parameter format (?password=xxx).
24#[allow(dead_code)] // Used in tests
25fn redact_dsn(dsn: &str) -> String {
26 let mut result = dsn.to_string();
27
28 // Handle URL format: scheme://user:password@host:port
29 // Look for pattern user:password@ and redact the password
30 if let Some(scheme_end) = result.find("://") {
31 let after_scheme = scheme_end + 3;
32 if let Some(at_pos) = result[after_scheme..].find('@') {
33 let auth_section = &result[after_scheme..after_scheme + at_pos];
34 if let Some(colon_pos) = auth_section.find(':') {
35 // Found user:password pattern
36 let user = &auth_section[..colon_pos];
37 let rest_start = after_scheme + at_pos;
38 result = format!(
39 "{}{}:{}{}",
40 &result[..after_scheme],
41 user,
42 "[REDACTED]",
43 &result[rest_start..]
44 );
45 }
46 }
47 }
48
49 // Handle query parameter format: host:port?password=xxx
50 // Redact password= and pass= parameters (only check once per pattern to avoid loops)
51 let patterns = ["password=", "pass="];
52 for pattern in patterns {
53 let lower = result.to_lowercase();
54 if let Some(start) = lower.find(pattern) {
55 let value_start = start + pattern.len();
56 // Find end of value (& or end of string)
57 let value_end = result[value_start..]
58 .find('&')
59 .map(|i| value_start + i)
60 .unwrap_or(result.len());
61
62 result = format!(
63 "{}[REDACTED]{}",
64 &result[..value_start],
65 &result[value_end..]
66 );
67 }
68 }
69
70 result
71}
72
73/// A column definition in a query result set.
74///
75/// Contains the column name and its GQL type as returned by the server.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct Column {
78 /// The column name (or alias if specified in the query)
79 pub name: String,
80 /// The GQL type of the column (e.g., "INT", "STRING", "BOOL")
81 #[serde(rename = "type")]
82 pub col_type: String,
83}
84
85/// A page of query results.
86///
87/// Query results are returned in pages. Each page contains a slice of rows
88/// along with metadata about the result set.
89///
90/// # Example
91///
92/// ```ignore
93/// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name, n.age").await?;
94///
95/// for row in &page.rows {
96/// let name = row.get("name").unwrap().as_string()?;
97/// let age = row.get("age").unwrap().as_int()?;
98/// println!("{}: {}", name, age);
99/// }
100///
101/// if !page.final_page {
102/// // More results available, would need to pull next page
103/// }
104/// ```
105#[derive(Debug, Clone)]
106pub struct Page {
107 /// Column definitions for the result set
108 pub columns: Vec<Column>,
109 /// Result rows, each row is a map of column name to value
110 pub rows: Vec<HashMap<String, Value>>,
111 /// Whether results are ordered (ORDER BY was used)
112 pub ordered: bool,
113 /// The keys used for ordering, if any
114 pub order_keys: Vec<String>,
115 /// Whether this is the final page of results
116 pub final_page: bool,
117}
118
119/// A named savepoint within a transaction.
120///
121/// Savepoints allow partial rollback within a transaction. They can be created
122/// and managed via server-side GQL commands.
123///
124/// # Example
125///
126/// ```ignore
127/// conn.begin().await?;
128/// conn.query("CREATE (n:Node {id: 1})").await?;
129///
130/// let sp = conn.savepoint("before_risky_op")?;
131/// match conn.query("CREATE (n:Node {id: 2})").await {
132/// Ok(_) => {},
133/// Err(_) => conn.rollback_to(&sp).await?, // Undo only the second create
134/// }
135///
136/// conn.commit().await?; // First node is saved
137/// ```
138#[derive(Debug, Clone)]
139pub struct Savepoint {
140 /// The savepoint name
141 pub name: String,
142}
143
144/// A prepared statement for efficient repeated query execution.
145///
146/// Prepared statements allow you to define a query once and execute it
147/// multiple times with different parameters. This can improve performance
148/// by allowing query plan caching on the server.
149///
150/// # Example
151///
152/// ```ignore
153/// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p").await?;
154///
155/// for id in 1..=100 {
156/// let mut params = HashMap::new();
157/// params.insert("id".to_string(), Value::int(id));
158/// let (page, _) = stmt.execute(&mut conn, ¶ms).await?;
159/// // Process results...
160/// }
161/// ```
162#[derive(Debug, Clone)]
163pub struct PreparedStatement {
164 /// The GQL query string
165 query: String,
166 /// Parameter names extracted from the query
167 param_names: Vec<String>,
168}
169
170impl PreparedStatement {
171 /// Create a new prepared statement.
172 ///
173 /// Extracts parameter names from the query (tokens starting with `$`).
174 pub fn new(query: impl Into<String>) -> Self {
175 let query = query.into();
176 let param_names = Self::extract_param_names(&query);
177 Self { query, param_names }
178 }
179
180 /// Extract parameter names from a query string.
181 fn extract_param_names(query: &str) -> Vec<String> {
182 let mut names = Vec::new();
183 let mut chars = query.chars().peekable();
184
185 while let Some(c) = chars.next() {
186 if c == '$' {
187 let mut name = String::new();
188 while let Some(&next) = chars.peek() {
189 if next.is_ascii_alphanumeric() || next == '_' {
190 name.push(chars.next().unwrap());
191 } else {
192 break;
193 }
194 }
195 if !name.is_empty() && !names.contains(&name) {
196 names.push(name);
197 }
198 }
199 }
200
201 names
202 }
203
204 /// Get the query string.
205 pub fn query(&self) -> &str {
206 &self.query
207 }
208
209 /// Get the parameter names expected by this statement.
210 pub fn param_names(&self) -> &[String] {
211 &self.param_names
212 }
213
214 /// Execute the prepared statement with the given parameters.
215 ///
216 /// # Arguments
217 ///
218 /// * `conn` - The connection to execute on
219 /// * `params` - Parameter values (must include all parameters in the query)
220 ///
221 /// # Returns
222 ///
223 /// A tuple of (`Page`, `Option<String>`) with results and optional warnings.
224 ///
225 /// # Errors
226 ///
227 /// Returns an error if required parameters are missing or if the query fails.
228 pub async fn execute(
229 &self,
230 conn: &mut Connection,
231 params: &HashMap<String, crate::types::Value>,
232 ) -> crate::error::Result<(Page, Option<String>)> {
233 // Validate all required parameters are provided
234 for name in &self.param_names {
235 if !params.contains_key(name) {
236 return Err(crate::error::Error::validation(format!(
237 "Missing required parameter: {}",
238 name
239 )));
240 }
241 }
242
243 conn.query_with_params(&self.query, params).await
244 }
245}
246
247/// An operation in a query execution plan.
248#[derive(Debug, Clone)]
249pub struct PlanOperation {
250 /// Operation type (e.g., "NodeScan", "Filter", "Projection")
251 pub op_type: String,
252 /// Human-readable description
253 pub description: String,
254 /// Estimated row count for this operation
255 pub estimated_rows: Option<u64>,
256 /// Child operations
257 pub children: Vec<PlanOperation>,
258}
259
260/// A query execution plan.
261///
262/// Shows how the database will execute a query without actually running it.
263/// Useful for query optimization and understanding performance characteristics.
264#[derive(Debug, Clone)]
265pub struct QueryPlan {
266 /// Root operations in the plan
267 pub operations: Vec<PlanOperation>,
268 /// Total estimated rows
269 pub estimated_rows: u64,
270 /// Raw plan from server (for advanced analysis)
271 pub raw: serde_json::Value,
272}
273
274/// Query execution profile with timing information.
275///
276/// Includes the execution plan plus actual runtime statistics.
277#[derive(Debug, Clone)]
278pub struct QueryProfile {
279 /// The execution plan
280 pub plan: QueryPlan,
281 /// Actual rows returned
282 pub actual_rows: u64,
283 /// Total execution time in milliseconds
284 pub execution_time_ms: f64,
285 /// Raw profile from server
286 pub raw: serde_json::Value,
287}
288
289/// A Geode database client supporting both QUIC and gRPC transports.
290///
291/// Use the builder pattern to configure the client, then call [`connect`](Client::connect)
292/// to establish a connection.
293///
294/// # Transport Selection
295///
296/// The transport is selected based on the DSN scheme:
297/// - `quic://` - QUIC transport (default)
298/// - `grpc://` - gRPC transport
299///
300/// # Example
301///
302/// ```no_run
303/// use geode_client::Client;
304///
305/// # async fn example() -> geode_client::Result<()> {
306/// // QUIC transport (legacy API)
307/// let client = Client::new("127.0.0.1", 3141)
308/// .skip_verify(true) // Development only!
309/// .page_size(500)
310/// .client_name("my-app");
311///
312/// // Or use DSN with explicit transport
313/// let client = Client::from_dsn("quic://127.0.0.1:3141?insecure=true")?;
314/// let client = Client::from_dsn("grpc://127.0.0.1:50051")?;
315///
316/// let mut conn = client.connect().await?;
317/// let (page, _) = conn.query("RETURN 1 AS x").await?;
318/// conn.close()?;
319/// # Ok(())
320/// # }
321/// ```
322/// Client configuration for connecting to a Geode server.
323///
324/// The password field uses `SecretString` from the `secrecy` crate to ensure
325/// credentials are zeroized from memory on drop and not accidentally leaked
326/// in debug output or error messages.
327#[derive(Clone)]
328pub struct Client {
329 transport: Transport,
330 host: String,
331 port: u16,
332 skip_verify: bool,
333 page_size: usize,
334 hello_name: String,
335 hello_ver: String,
336 conformance: String,
337 username: Option<String>,
338 /// Password stored using SecretString for secure memory handling (CWE-316).
339 /// Automatically zeroized on drop and redacted in Debug output.
340 password: Option<SecretString>,
341 /// Connection timeout in seconds (default: 10)
342 connect_timeout_secs: u64,
343 /// HELLO handshake timeout in seconds (default: 5)
344 hello_timeout_secs: u64,
345 /// Idle connection timeout in seconds (default: 30)
346 idle_timeout_secs: u64,
347}
348
349impl Client {
350 /// Create a new QUIC client for the specified host and port.
351 ///
352 /// This method creates a client using QUIC transport. For gRPC transport,
353 /// use [`from_dsn`](Client::from_dsn) with a `grpc://` scheme.
354 ///
355 /// # Arguments
356 ///
357 /// * `host` - The server hostname or IP address
358 /// * `port` - The server port (typically 3141 for Geode)
359 ///
360 /// # Example
361 ///
362 /// ```
363 /// use geode_client::Client;
364 ///
365 /// let client = Client::new("localhost", 3141);
366 /// let client = Client::new("192.168.1.100", 8443);
367 /// let client = Client::new(String::from("geode.example.com"), 3141);
368 /// ```
369 pub fn new(host: impl Into<String>, port: u16) -> Self {
370 Self {
371 transport: Transport::Quic,
372 host: host.into(),
373 port,
374 skip_verify: false,
375 page_size: 1000,
376 hello_name: "geode-rust".to_string(),
377 hello_ver: env!("CARGO_PKG_VERSION").to_string(),
378 conformance: "min".to_string(),
379 username: None,
380 password: None,
381 connect_timeout_secs: 10,
382 hello_timeout_secs: 5,
383 idle_timeout_secs: 30,
384 }
385 }
386
387 /// Create a new client from a DSN (Data Source Name) string.
388 ///
389 /// # Supported DSN Formats
390 ///
391 /// - `quic://host:port?options` - QUIC transport (recommended)
392 /// - `grpc://host:port?options` - gRPC transport
393 /// - `host:port?options` - Legacy format (defaults to QUIC)
394 ///
395 /// # Supported Options
396 ///
397 /// - `tls` - Enable/disable TLS (0/1/true/false)
398 /// - `insecure` or `skip_verify` - Skip TLS verification
399 /// - `page_size` - Results page size (default: 1000)
400 /// - `client_name` or `hello_name` - Client name
401 /// - `client_version` or `hello_ver` - Client version
402 /// - `conformance` - GQL conformance level
403 /// - `username` or `user` - Authentication username
404 /// - `password` or `pass` - Authentication password
405 ///
406 /// # Examples
407 ///
408 /// ```
409 /// use geode_client::Client;
410 ///
411 /// // QUIC transport (explicit)
412 /// let client = Client::from_dsn("quic://localhost:3141").unwrap();
413 ///
414 /// // gRPC transport
415 /// let client = Client::from_dsn("grpc://localhost:50051?tls=0").unwrap();
416 ///
417 /// // Legacy format (defaults to QUIC)
418 /// let client = Client::from_dsn("localhost:3141?insecure=true").unwrap();
419 ///
420 /// // With authentication
421 /// let client = Client::from_dsn("quic://admin:secret@localhost:3141").unwrap();
422 ///
423 /// // IPv6 support
424 /// let client = Client::from_dsn("grpc://[::1]:50051").unwrap();
425 /// ```
426 ///
427 /// # Errors
428 ///
429 /// Returns `Error::InvalidDsn` if:
430 /// - DSN is empty
431 /// - Scheme is unsupported (not quic://, grpc://, or schemeless)
432 /// - Host is missing
433 /// - Port is invalid
434 pub fn from_dsn(dsn_str: &str) -> Result<Self> {
435 let dsn = Dsn::parse(dsn_str)?;
436
437 Ok(Self {
438 transport: dsn.transport(),
439 host: dsn.host().to_string(),
440 port: dsn.port(),
441 skip_verify: dsn.skip_verify(),
442 page_size: dsn.page_size(),
443 hello_name: dsn.client_name().to_string(),
444 hello_ver: dsn.client_version().to_string(),
445 conformance: dsn.conformance().to_string(),
446 username: dsn.username().map(String::from),
447 password: dsn.password().map(|p| SecretString::from(p.to_string())),
448 connect_timeout_secs: 10,
449 hello_timeout_secs: 5,
450 idle_timeout_secs: 30,
451 })
452 }
453
454 /// Get the transport type for this client.
455 pub fn transport(&self) -> Transport {
456 self.transport
457 }
458
459 /// Skip TLS certificate verification.
460 ///
461 /// # Security Warning
462 ///
463 /// **This should only be used in development environments.** Disabling
464 /// certificate verification makes the connection vulnerable to
465 /// man-in-the-middle attacks.
466 ///
467 /// # Arguments
468 ///
469 /// * `skip` - If true, skip certificate verification
470 pub fn skip_verify(mut self, skip: bool) -> Self {
471 self.skip_verify = skip;
472 self
473 }
474
475 /// Set the page size for query results.
476 ///
477 /// Controls how many rows are returned per page when fetching results.
478 /// Larger values reduce round-trips but use more memory.
479 ///
480 /// # Arguments
481 ///
482 /// * `size` - Number of rows per page (default: 1000)
483 pub fn page_size(mut self, size: usize) -> Self {
484 self.page_size = size;
485 self
486 }
487
488 /// Set the client name sent to the server.
489 ///
490 /// This appears in server logs and can help with debugging.
491 ///
492 /// # Arguments
493 ///
494 /// * `name` - Client application name (default: "geode-rust-quinn")
495 pub fn client_name(mut self, name: impl Into<String>) -> Self {
496 self.hello_name = name.into();
497 self
498 }
499
500 /// Set the client version sent to the server.
501 ///
502 /// # Arguments
503 ///
504 /// * `version` - Client version string (default: "0.1.0")
505 pub fn client_version(mut self, version: impl Into<String>) -> Self {
506 self.hello_ver = version.into();
507 self
508 }
509
510 /// Set the GQL conformance level.
511 ///
512 /// # Arguments
513 ///
514 /// * `level` - Conformance level (default: "min")
515 pub fn conformance(mut self, level: impl Into<String>) -> Self {
516 self.conformance = level.into();
517 self
518 }
519
520 /// Set the authentication username.
521 ///
522 /// # Arguments
523 ///
524 /// * `username` - The username for authentication
525 ///
526 /// # Example
527 ///
528 /// ```
529 /// use geode_client::Client;
530 ///
531 /// let client = Client::new("localhost", 3141)
532 /// .username("admin")
533 /// .password("secret");
534 /// ```
535 pub fn username(mut self, username: impl Into<String>) -> Self {
536 self.username = Some(username.into());
537 self
538 }
539
540 /// Set the authentication password.
541 ///
542 /// The password is stored using `SecretString` which ensures it is:
543 /// - Zeroized from memory when dropped
544 /// - Not accidentally leaked in debug output
545 ///
546 /// # Arguments
547 ///
548 /// * `password` - The password for authentication
549 pub fn password(mut self, password: impl Into<String>) -> Self {
550 self.password = Some(SecretString::from(password.into()));
551 self
552 }
553
554 /// Set the connection timeout in seconds.
555 ///
556 /// This controls how long to wait for the initial QUIC connection
557 /// to be established. Default is 10 seconds.
558 ///
559 /// # Arguments
560 ///
561 /// * `seconds` - Timeout in seconds (must be > 0)
562 pub fn connect_timeout(mut self, seconds: u64) -> Self {
563 self.connect_timeout_secs = seconds.max(1);
564 self
565 }
566
567 /// Set the HELLO handshake timeout in seconds.
568 ///
569 /// This controls how long to wait for the server to respond to the
570 /// initial HELLO message. Default is 5 seconds.
571 ///
572 /// # Arguments
573 ///
574 /// * `seconds` - Timeout in seconds (must be > 0)
575 pub fn hello_timeout(mut self, seconds: u64) -> Self {
576 self.hello_timeout_secs = seconds.max(1);
577 self
578 }
579
580 /// Set the idle connection timeout in seconds.
581 ///
582 /// This controls how long an idle connection can remain open before
583 /// being automatically closed by the QUIC layer. Default is 30 seconds.
584 ///
585 /// # Arguments
586 ///
587 /// * `seconds` - Timeout in seconds (must be > 0)
588 pub fn idle_timeout(mut self, seconds: u64) -> Self {
589 self.idle_timeout_secs = seconds.max(1);
590 self
591 }
592
593 /// Validate the client configuration.
594 ///
595 /// Performs validation on all configuration parameters including:
596 /// - Hostname format (RFC 1035 compliant)
597 /// - Port number (1-65535)
598 /// - Page size (1-100,000)
599 ///
600 /// This method is automatically called by [`connect`](Self::connect).
601 /// You can call it manually to validate configuration before attempting
602 /// to connect.
603 ///
604 /// # Errors
605 ///
606 /// Returns a validation error if any parameter is invalid.
607 ///
608 /// # Example
609 ///
610 /// ```
611 /// use geode_client::Client;
612 ///
613 /// let client = Client::new("localhost", 3141);
614 /// assert!(client.validate().is_ok());
615 ///
616 /// // Invalid hostname
617 /// let invalid = Client::new("-invalid-host", 3141);
618 /// assert!(invalid.validate().is_err());
619 /// ```
620 pub fn validate(&self) -> Result<()> {
621 // Validate hostname format
622 validate::hostname(&self.host)?;
623
624 // Validate port (0 is reserved)
625 validate::port(self.port)?;
626
627 // Validate page size
628 validate::page_size(self.page_size)?;
629
630 Ok(())
631 }
632
633 /// Connect to the Geode database.
634 ///
635 /// Establishes a QUIC connection to the server, performs the TLS handshake,
636 /// and sends the initial HELLO message.
637 ///
638 /// # Returns
639 ///
640 /// A [`Connection`] that can be used to execute queries.
641 ///
642 /// # Errors
643 ///
644 /// Returns an error if:
645 /// - The hostname cannot be resolved
646 /// - The connection cannot be established
647 /// - TLS verification fails (unless `skip_verify` is true)
648 /// - The HELLO handshake fails
649 ///
650 /// # Example
651 ///
652 /// ```no_run
653 /// # use geode_client::Client;
654 /// # async fn example() -> geode_client::Result<()> {
655 /// let client = Client::new("localhost", 3141).skip_verify(true);
656 /// let mut conn = client.connect().await?;
657 /// // Use connection...
658 /// conn.close()?;
659 /// # Ok(())
660 /// # }
661 /// ```
662 // CANARY: REQ=REQ-CLIENT-RUST-001; FEATURE="RustClientConnection"; ASPECT=HelloHandshake; STATUS=IMPL; OWNER=clients; UPDATED=2025-02-14
663 pub async fn connect(&self) -> Result<Connection> {
664 // Validate configuration before connecting (Gap #9 - automatic validation)
665 self.validate()?;
666
667 // Expose the secret password only when needed for the connection
668 let password_ref = self.password.as_ref().map(|s| s.expose_secret());
669
670 match self.transport {
671 Transport::Quic => {
672 Connection::new_quic(
673 &self.host,
674 self.port,
675 self.skip_verify,
676 self.page_size,
677 &self.hello_name,
678 &self.hello_ver,
679 &self.conformance,
680 self.username.as_deref(),
681 password_ref,
682 self.connect_timeout_secs,
683 self.hello_timeout_secs,
684 self.idle_timeout_secs,
685 )
686 .await
687 }
688 Transport::Grpc => {
689 #[cfg(feature = "grpc")]
690 {
691 Connection::new_grpc(
692 &self.host,
693 self.port,
694 self.skip_verify,
695 self.page_size,
696 self.username.as_deref(),
697 password_ref,
698 )
699 .await
700 }
701 #[cfg(not(feature = "grpc"))]
702 {
703 Err(Error::connection(
704 "gRPC transport requires the 'grpc' feature to be enabled",
705 ))
706 }
707 }
708 }
709 }
710}
711
712/// Internal connection type for transport-specific implementations.
713#[allow(dead_code)]
714enum ConnectionKind {
715 /// QUIC transport connection
716 Quic {
717 conn: quinn::Connection,
718 send: quinn::SendStream,
719 recv: quinn::RecvStream,
720 /// Reserved for future streaming support
721 buffer: Vec<u8>,
722 /// Reserved for request ID tracking
723 next_request_id: u64,
724 },
725 /// gRPC transport connection
726 #[cfg(feature = "grpc")]
727 Grpc { client: crate::grpc::GrpcClient },
728}
729
730/// An active connection to a Geode database server.
731///
732/// A `Connection` represents a connection to the Geode server using either
733/// QUIC or gRPC transport. It provides methods for executing queries, managing
734/// transactions, and controlling the connection lifecycle.
735///
736/// # Transport Support
737///
738/// - **QUIC**: Uses a bidirectional stream with protobuf wire protocol
739/// - **gRPC**: Uses tonic-based gRPC client (requires `grpc` feature)
740///
741/// # Connection Lifecycle
742///
743/// 1. Create via [`Client::connect`]
744/// 2. Execute queries with [`query`](Connection::query) or [`query_with_params`](Connection::query_with_params)
745/// 3. Optionally use transactions with [`begin`](Connection::begin), [`commit`](Connection::commit), [`rollback`](Connection::rollback)
746/// 4. Close with [`close`](Connection::close)
747///
748/// # Example
749///
750/// ```no_run
751/// # use geode_client::Client;
752/// # async fn example() -> geode_client::Result<()> {
753/// let client = Client::new("localhost", 3141).skip_verify(true);
754/// let mut conn = client.connect().await?;
755///
756/// // Execute queries
757/// let (page, _) = conn.query("RETURN 42 AS answer").await?;
758/// println!("Answer: {}", page.rows[0].get("answer").unwrap().as_int()?);
759///
760/// // Use transactions
761/// conn.begin().await?;
762/// conn.query("CREATE (n:Node {id: 1})").await?;
763/// conn.commit().await?;
764///
765/// conn.close()?;
766/// # Ok(())
767/// # }
768/// ```
769///
770/// # Thread Safety
771///
772/// `Connection` is `!Sync` because the underlying transport streams are not thread-safe.
773/// For concurrent access, use [`ConnectionPool`](crate::ConnectionPool).
774pub struct Connection {
775 kind: ConnectionKind,
776 /// Page size for query results (reserved for future use)
777 #[allow(dead_code)]
778 page_size: usize,
779}
780
781impl Connection {
782 /// Create a new QUIC connection.
783 #[allow(clippy::too_many_arguments)]
784 async fn new_quic(
785 host: &str,
786 port: u16,
787 skip_verify: bool,
788 page_size: usize,
789 hello_name: &str,
790 hello_ver: &str,
791 conformance: &str,
792 username: Option<&str>,
793 password: Option<&str>,
794 connect_timeout_secs: u64,
795 hello_timeout_secs: u64,
796 idle_timeout_secs: u64,
797 ) -> Result<Self> {
798 let mut last_err: Option<Error> = None;
799
800 for attempt in 1..=3 {
801 match Self::connect_quic_once(
802 host,
803 port,
804 skip_verify,
805 page_size,
806 hello_name,
807 hello_ver,
808 conformance,
809 username,
810 password,
811 connect_timeout_secs,
812 hello_timeout_secs,
813 idle_timeout_secs,
814 )
815 .await
816 {
817 Ok(conn) => return Ok(conn),
818 Err(e) => {
819 last_err = Some(e);
820 if attempt < 3 {
821 debug!("Connection attempt {} failed, retrying...", attempt);
822 tokio::time::sleep(Duration::from_millis(150)).await;
823 }
824 }
825 }
826 }
827
828 Err(last_err.unwrap_or_else(|| Error::connection("Failed to connect")))
829 }
830
831 /// Create a new gRPC connection.
832 #[cfg(feature = "grpc")]
833 #[allow(clippy::too_many_arguments)]
834 async fn new_grpc(
835 host: &str,
836 port: u16,
837 skip_verify: bool,
838 page_size: usize,
839 username: Option<&str>,
840 password: Option<&str>,
841 ) -> Result<Self> {
842 use crate::dsn::Dsn;
843
844 // Build DSN for gRPC client
845 let dsn_str = if let (Some(user), Some(pass)) = (username, password) {
846 format!(
847 "grpc://{}:{}@{}:{}?insecure={}",
848 user, pass, host, port, skip_verify
849 )
850 } else {
851 format!("grpc://{}:{}?insecure={}", host, port, skip_verify)
852 };
853
854 let dsn = Dsn::parse(&dsn_str)?;
855 let client = crate::grpc::GrpcClient::connect(&dsn).await?;
856
857 Ok(Self {
858 kind: ConnectionKind::Grpc { client },
859 page_size,
860 })
861 }
862
863 #[allow(clippy::too_many_arguments)]
864 async fn connect_quic_once(
865 host: &str,
866 port: u16,
867 skip_verify: bool,
868 page_size: usize,
869 _hello_name: &str,
870 _hello_ver: &str,
871 _conformance: &str,
872 username: Option<&str>,
873 password: Option<&str>,
874 connect_timeout_secs: u64,
875 _hello_timeout_secs: u64,
876 idle_timeout_secs: u64,
877 ) -> Result<Self> {
878 debug!("Creating connection to {}:{}", host, port);
879
880 // Install default crypto provider for rustls
881 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
882
883 // Build Quinn client config with TLS 1.3 explicitly (QUIC requires TLS 1.3)
884 let mut client_crypto = if skip_verify {
885 // SECURITY WARNING: Disabling TLS verification exposes connections to MITM attacks.
886 // Credentials sent in HELLO may be intercepted. Only use for development/testing.
887 warn!(
888 "TLS certificate verification DISABLED - connection to {}:{} is vulnerable to MITM attacks. \
889 Do NOT use skip_verify in production!",
890 host, port
891 );
892 rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
893 .dangerous()
894 .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
895 .with_no_client_auth()
896 } else {
897 // Load system root certificates for proper TLS verification
898 let mut root_store = rustls::RootCertStore::empty();
899
900 let cert_result = rustls_native_certs::load_native_certs();
901
902 // Log any errors that occurred during certificate loading
903 for err in &cert_result.errors {
904 warn!("Error loading native certificate: {:?}", err);
905 }
906
907 let mut certs_loaded = 0;
908 let mut certs_failed = 0;
909
910 for cert in cert_result.certs {
911 match root_store.add(cert) {
912 Ok(()) => certs_loaded += 1,
913 Err(_) => certs_failed += 1,
914 }
915 }
916
917 if certs_loaded == 0 {
918 return Err(Error::tls(
919 "No system root certificates found. TLS verification cannot proceed. \
920 Either install system CA certificates or use skip_verify(true) for development only.",
921 ));
922 }
923
924 debug!(
925 "Loaded {} system root certificates ({} failed to parse)",
926 certs_loaded, certs_failed
927 );
928
929 rustls::ClientConfig::builder_with_protocol_versions(&[&rustls::version::TLS13])
930 .with_root_certificates(root_store)
931 .with_no_client_auth()
932 };
933
934 // Set ALPN protocols
935 client_crypto.alpn_protocols = vec![GEODE_ALPN.to_vec()];
936
937 let mut client_config = ClientConfig::new(Arc::new(
938 quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto)
939 .map_err(|e| Error::connection(format!("Failed to create QUIC config: {}", e)))?,
940 ));
941
942 // Configure QUIC transport parameters to match Python/Go clients
943 let mut transport = quinn::TransportConfig::default();
944 // Cap idle timeout to quinn's maximum (2^62 - 1 microseconds ≈ 146 years)
945 // to prevent panic from VarInt overflow
946 let idle_timeout = Duration::from_secs(idle_timeout_secs.min(146_000 * 365 * 24 * 3600));
947 transport.max_idle_timeout(Some(idle_timeout.try_into().map_err(|_| {
948 Error::connection("Idle timeout value too large for QUIC protocol")
949 })?));
950 transport.keep_alive_interval(Some(Duration::from_secs(5)));
951 client_config.transport_config(Arc::new(transport));
952
953 // Create endpoint - "0.0.0.0:0" is a valid socket address literal that binds
954 // to any available port on all interfaces, so this parse cannot fail
955 let mut endpoint = Endpoint::client(
956 "0.0.0.0:0"
957 .parse()
958 .expect("0.0.0.0:0 is a valid socket address"),
959 )
960 .map_err(|e| Error::connection(format!("Failed to create endpoint: {}", e)))?;
961 endpoint.set_default_client_config(client_config);
962
963 // Resolve server address (supports hostnames as well as IP literals)
964 let mut resolved_addrs = format!("{}:{}", host, port)
965 .to_socket_addrs()
966 .map_err(|e| {
967 Error::connection(format!(
968 "Failed to resolve address {}:{} - {}",
969 host, port, e
970 ))
971 })?;
972
973 let server_addr: SocketAddr = resolved_addrs
974 .find(|addr| matches!(addr, SocketAddr::V4(_) | SocketAddr::V6(_)))
975 .ok_or_else(|| Error::connection("Invalid address: could not resolve host"))?;
976
977 debug!("Connecting to {}", server_addr);
978
979 // When skipping verification, don't use actual hostname for SNI
980 // This matches Python client behavior which avoids server_name when skip_verify=True
981 let server_name = if skip_verify {
982 "localhost" // Use generic name when skipping verification
983 } else {
984 host
985 };
986
987 trace!("Using server name for SNI: {}", server_name);
988
989 let conn = timeout(
990 Duration::from_secs(connect_timeout_secs),
991 endpoint
992 .connect(server_addr, server_name)
993 .map_err(|e| Error::connection(format!("Connection failed: {}", e)))?,
994 )
995 .await
996 .map_err(|_| Error::connection("Connection timeout"))?
997 .map_err(|e| Error::connection(format!("Failed to establish connection: {}", e)))?;
998
999 debug!("Connection established to {}:{}", host, port);
1000
1001 // Open a single bidirectional stream used for the entire session.
1002 let (mut send, mut recv) = conn
1003 .open_bi()
1004 .await
1005 .map_err(|e| Error::connection(format!("Failed to open stream: {}", e)))?;
1006
1007 // Send HELLO message using protobuf with length prefix
1008 let hello_req = proto::HelloRequest {
1009 username: username.unwrap_or("").to_string(),
1010 password: password.unwrap_or("").to_string(),
1011 tenant_id: String::new(),
1012 };
1013 let msg = proto::QuicClientMessage {
1014 hello: Some(hello_req),
1015 ..Default::default()
1016 };
1017 let data = proto::encode_with_length_prefix(&msg);
1018
1019 send.write_all(&data)
1020 .await
1021 .map_err(|e| Error::connection(format!("Failed to send HELLO: {}", e)))?;
1022
1023 // Wait for HELLO response (length-prefixed protobuf)
1024 let mut length_buf = [0u8; 4];
1025 timeout(Duration::from_secs(5), recv.read_exact(&mut length_buf))
1026 .await
1027 .map_err(|_| Error::connection("HELLO response timeout"))?
1028 .map_err(|e| {
1029 Error::connection(format!("Failed to read HELLO response length: {}", e))
1030 })?;
1031
1032 let msg_len = u32::from_be_bytes(length_buf) as usize;
1033 let mut msg_buf = vec![0u8; msg_len];
1034 recv.read_exact(&mut msg_buf)
1035 .await
1036 .map_err(|e| Error::connection(format!("Failed to read HELLO response body: {}", e)))?;
1037
1038 let hello_response = proto::decode_quic_server_message(&msg_buf)?;
1039
1040 if let Some(ref hello_resp) = hello_response.hello {
1041 if !hello_resp.success {
1042 return Err(Error::connection(format!(
1043 "Authentication failed: {}",
1044 hello_resp.error_message
1045 )));
1046 }
1047 } else {
1048 return Err(Error::connection("Expected HELLO response"));
1049 }
1050
1051 debug!("HELLO handshake complete");
1052
1053 Ok(Self {
1054 kind: ConnectionKind::Quic {
1055 conn,
1056 send,
1057 recv,
1058 buffer: Vec::new(),
1059 next_request_id: 1,
1060 },
1061 page_size,
1062 })
1063 }
1064
1065 /// Send a protobuf message over QUIC.
1066 async fn send_proto_quic(
1067 send: &mut quinn::SendStream,
1068 msg: &proto::QuicClientMessage,
1069 ) -> Result<()> {
1070 let data = proto::encode_with_length_prefix(msg);
1071 send.write_all(&data)
1072 .await
1073 .map_err(|e| Error::connection(format!("Failed to send message: {}", e)))?;
1074 Ok(())
1075 }
1076
1077 /// Read a protobuf message over QUIC with timeout.
1078 async fn read_proto_quic(
1079 recv: &mut quinn::RecvStream,
1080 timeout_secs: u64,
1081 ) -> Result<proto::QuicServerMessage> {
1082 // Read 4-byte length prefix
1083 let mut length_buf = [0u8; 4];
1084 timeout(
1085 Duration::from_secs(timeout_secs),
1086 recv.read_exact(&mut length_buf),
1087 )
1088 .await
1089 .map_err(|_| Error::timeout())?
1090 .map_err(|e| Error::connection(format!("Failed to read response length: {}", e)))?;
1091
1092 let msg_len = u32::from_be_bytes(length_buf) as usize;
1093 let mut msg_buf = vec![0u8; msg_len];
1094 recv.read_exact(&mut msg_buf)
1095 .await
1096 .map_err(|e| Error::connection(format!("Failed to read response body: {}", e)))?;
1097
1098 proto::decode_quic_server_message(&msg_buf)
1099 }
1100
1101 /// Attempt to read a buffered protobuf message without blocking (QUIC).
1102 /// Returns Ok(None) if no complete message is available immediately.
1103 async fn try_read_proto_quic(
1104 recv: &mut quinn::RecvStream,
1105 ) -> Result<Option<proto::QuicServerMessage>> {
1106 // Try to read with a short timeout
1107 let mut length_buf = [0u8; 4];
1108 let read_result =
1109 timeout(Duration::from_millis(10), recv.read_exact(&mut length_buf)).await;
1110
1111 match read_result {
1112 Ok(Ok(())) => {
1113 let msg_len = u32::from_be_bytes(length_buf) as usize;
1114 let mut msg_buf = vec![0u8; msg_len];
1115 recv.read_exact(&mut msg_buf).await.map_err(|e| {
1116 Error::connection(format!("Failed to read response body: {}", e))
1117 })?;
1118 let msg = proto::decode_quic_server_message(&msg_buf)?;
1119 Ok(Some(msg))
1120 }
1121 Ok(Err(e)) => Err(Error::connection(format!("Failed to read response: {}", e))),
1122 Err(_) => Ok(None), // Timeout - no data available
1123 }
1124 }
1125
1126 /// Parse protobuf rows into Value maps (static version for QUIC).
1127 fn parse_proto_rows_static(
1128 proto_rows: &[proto::Row],
1129 columns: &[Column],
1130 ) -> Result<Vec<HashMap<String, Value>>> {
1131 let mut rows = Vec::new();
1132 for proto_row in proto_rows {
1133 let mut row = HashMap::new();
1134 for (i, col) in columns.iter().enumerate() {
1135 let value = if i < proto_row.values.len() {
1136 Self::convert_proto_value_static(&proto_row.values[i])
1137 } else {
1138 Value::null()
1139 };
1140 row.insert(col.name.clone(), value);
1141 }
1142 rows.push(row);
1143 }
1144 Ok(rows)
1145 }
1146
1147 /// Convert a protobuf Value to our Value type (static version).
1148 fn convert_proto_value_static(proto_val: &proto::Value) -> Value {
1149 if proto_val.null_val {
1150 return Value::null();
1151 }
1152 if let Some(ref s) = proto_val.string_val {
1153 return Value::string(s.clone());
1154 }
1155 if let Some(i) = proto_val.int_val {
1156 return Value::int(i);
1157 }
1158 if let Some(d) = proto_val.double_val {
1159 return Value::decimal(rust_decimal::Decimal::from_f64_retain(d).unwrap_or_default());
1160 }
1161 if let Some(b) = proto_val.bool_val {
1162 return Value::bool(b);
1163 }
1164 Value::null()
1165 }
1166
1167 /// Send BEGIN transaction command over QUIC.
1168 async fn send_begin_quic(
1169 send: &mut quinn::SendStream,
1170 recv: &mut quinn::RecvStream,
1171 ) -> Result<()> {
1172 let msg = proto::QuicClientMessage {
1173 begin: Some(proto::BeginRequest::default()),
1174 ..Default::default()
1175 };
1176 Self::send_proto_quic(send, &msg).await?;
1177
1178 let resp = Self::read_proto_quic(recv, 5).await?;
1179 if resp.begin.is_none() {
1180 return Err(Error::protocol("Expected BEGIN response"));
1181 }
1182 Ok(())
1183 }
1184
1185 /// Send COMMIT transaction command over QUIC.
1186 async fn send_commit_quic(
1187 send: &mut quinn::SendStream,
1188 recv: &mut quinn::RecvStream,
1189 ) -> Result<()> {
1190 let msg = proto::QuicClientMessage {
1191 commit: Some(proto::CommitRequest),
1192 ..Default::default()
1193 };
1194 Self::send_proto_quic(send, &msg).await?;
1195
1196 let resp = Self::read_proto_quic(recv, 5).await?;
1197 if resp.commit.is_none() {
1198 return Err(Error::protocol("Expected COMMIT response"));
1199 }
1200 Ok(())
1201 }
1202
1203 /// Send ROLLBACK transaction command over QUIC.
1204 async fn send_rollback_quic(
1205 send: &mut quinn::SendStream,
1206 recv: &mut quinn::RecvStream,
1207 ) -> Result<()> {
1208 let msg = proto::QuicClientMessage {
1209 rollback: Some(proto::RollbackRequest),
1210 ..Default::default()
1211 };
1212 Self::send_proto_quic(send, &msg).await?;
1213
1214 let resp = Self::read_proto_quic(recv, 5).await?;
1215 if resp.rollback.is_none() {
1216 return Err(Error::protocol("Expected ROLLBACK response"));
1217 }
1218 Ok(())
1219 }
1220
1221 /// Execute a GQL query without parameters.
1222 ///
1223 /// # Arguments
1224 ///
1225 /// * `gql` - The GQL query string
1226 ///
1227 /// # Returns
1228 ///
1229 /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1230 /// and the optional string contains any query warnings.
1231 ///
1232 /// # Errors
1233 ///
1234 /// Returns [`Error::Query`] if the query fails to execute.
1235 ///
1236 /// # Example
1237 ///
1238 /// ```no_run
1239 /// # use geode_client::Client;
1240 /// # async fn example() -> geode_client::Result<()> {
1241 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1242 /// # let mut conn = client.connect().await?;
1243 /// let (page, _) = conn.query("MATCH (n:Person) RETURN n.name LIMIT 10").await?;
1244 /// for row in &page.rows {
1245 /// println!("Name: {}", row.get("name").unwrap().as_string()?);
1246 /// }
1247 /// # Ok(())
1248 /// # }
1249 /// ```
1250 pub async fn query(&mut self, gql: &str) -> Result<(Page, Option<String>)> {
1251 self.query_with_params(gql, &HashMap::new()).await
1252 }
1253
1254 /// Execute a GQL query with parameters.
1255 ///
1256 /// Parameters are substituted for `$param_name` placeholders in the query.
1257 /// This is the recommended way to include dynamic values in queries, as it
1258 /// prevents injection attacks and allows query plan caching.
1259 ///
1260 /// # Arguments
1261 ///
1262 /// * `gql` - The GQL query string with parameter placeholders
1263 /// * `params` - A map of parameter names to values
1264 ///
1265 /// # Returns
1266 ///
1267 /// A tuple of (`Page`, `Option<String>`) where the page contains the results
1268 /// and the optional string contains any query warnings.
1269 ///
1270 /// # Errors
1271 ///
1272 /// Returns [`Error::Query`] if the query fails to execute.
1273 ///
1274 /// # Example
1275 ///
1276 /// ```no_run
1277 /// # use geode_client::{Client, Value};
1278 /// # use std::collections::HashMap;
1279 /// # async fn example() -> geode_client::Result<()> {
1280 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1281 /// # let mut conn = client.connect().await?;
1282 /// let mut params = HashMap::new();
1283 /// params.insert("name".to_string(), Value::string("Alice"));
1284 /// params.insert("min_age".to_string(), Value::int(25));
1285 ///
1286 /// let (page, _) = conn.query_with_params(
1287 /// "MATCH (p:Person {name: $name}) WHERE p.age >= $min_age RETURN p",
1288 /// ¶ms
1289 /// ).await?;
1290 /// # Ok(())
1291 /// # }
1292 /// ```
1293 pub async fn query_with_params(
1294 &mut self,
1295 gql: &str,
1296 params: &HashMap<String, Value>,
1297 ) -> Result<(Page, Option<String>)> {
1298 match &mut self.kind {
1299 ConnectionKind::Quic { send, recv, .. } => {
1300 Self::query_with_params_quic(send, recv, gql, params).await
1301 }
1302 #[cfg(feature = "grpc")]
1303 ConnectionKind::Grpc { client } => client.query_with_params(gql, params).await,
1304 }
1305 }
1306
1307 /// Execute a GQL query with parameters over QUIC transport.
1308 async fn query_with_params_quic(
1309 send: &mut quinn::SendStream,
1310 recv: &mut quinn::RecvStream,
1311 gql: &str,
1312 params: &HashMap<String, Value>,
1313 ) -> Result<(Page, Option<String>)> {
1314 // Convert params to string map for protobuf
1315 let params_proto: HashMap<String, String> = params
1316 .iter()
1317 .map(|(k, v)| (k.clone(), v.to_proto_string()))
1318 .collect();
1319
1320 // Send Execute request via protobuf
1321 let exec_req = proto::ExecuteRequest {
1322 session_id: String::new(),
1323 query: gql.to_string(),
1324 parameters: params_proto,
1325 };
1326 let msg = proto::QuicClientMessage {
1327 execute: Some(exec_req),
1328 ..Default::default()
1329 };
1330 Self::send_proto_quic(send, &msg)
1331 .await
1332 .map_err(|e| Error::query(format!("{}", e)))?;
1333
1334 // Read first response (should be schema or error)
1335 let resp = Self::read_proto_quic(recv, 10).await?;
1336
1337 let exec_resp = resp
1338 .execute
1339 .ok_or_else(|| Error::protocol("Expected Execute response"))?;
1340
1341 // Check for error
1342 if let Some(ref err) = exec_resp.error {
1343 return Err(Error::Query {
1344 code: err.code.clone(),
1345 message: err.message.clone(),
1346 });
1347 }
1348
1349 // Parse columns from schema
1350 let columns: Vec<Column> = exec_resp
1351 .schema
1352 .as_ref()
1353 .map(|s| {
1354 s.columns
1355 .iter()
1356 .map(|c| Column {
1357 name: c.name.clone(),
1358 col_type: c.col_type.clone(),
1359 })
1360 .collect()
1361 })
1362 .unwrap_or_default();
1363
1364 trace!("Schema columns: {:?}", columns);
1365
1366 // Check for inline data page
1367 if let Some(inline_resp) = Self::try_read_proto_quic(recv).await? {
1368 if let Some(inline_exec) = inline_resp.execute {
1369 if let Some(ref err) = inline_exec.error {
1370 return Err(Error::Query {
1371 code: err.code.clone(),
1372 message: err.message.clone(),
1373 });
1374 }
1375
1376 if let Some(ref page_data) = inline_exec.page {
1377 let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1378 let page = Page {
1379 columns,
1380 rows,
1381 ordered: false,
1382 order_keys: Vec::new(),
1383 final_page: page_data.last_page,
1384 };
1385 return Ok((page, None));
1386 }
1387
1388 // Metrics or heartbeat - empty result
1389 let page = Page {
1390 columns,
1391 rows: Vec::new(),
1392 ordered: false,
1393 order_keys: Vec::new(),
1394 final_page: true,
1395 };
1396 return Ok((page, None));
1397 }
1398 }
1399
1400 // Check if we got a page in the first response
1401 if let Some(ref page_data) = exec_resp.page {
1402 let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1403 let page = Page {
1404 columns,
1405 rows,
1406 ordered: false,
1407 order_keys: Vec::new(),
1408 final_page: page_data.last_page,
1409 };
1410 return Ok((page, None));
1411 }
1412
1413 // Read next response for data page
1414 let resp = Self::read_proto_quic(recv, 30).await?;
1415 if let Some(exec_resp) = resp.execute {
1416 if let Some(ref err) = exec_resp.error {
1417 return Err(Error::Query {
1418 code: err.code.clone(),
1419 message: err.message.clone(),
1420 });
1421 }
1422
1423 if let Some(ref page_data) = exec_resp.page {
1424 let rows = Self::parse_proto_rows_static(&page_data.rows, &columns)?;
1425 let page = Page {
1426 columns,
1427 rows,
1428 ordered: false,
1429 order_keys: Vec::new(),
1430 final_page: page_data.last_page,
1431 };
1432 return Ok((page, None));
1433 }
1434 }
1435
1436 // Empty result
1437 let page = Page {
1438 columns,
1439 rows: Vec::new(),
1440 ordered: false,
1441 order_keys: Vec::new(),
1442 final_page: true,
1443 };
1444
1445 Ok((page, None))
1446 }
1447
1448 /// Execute a query without parameters (synchronous-style blocking version for test runner)
1449 pub fn query_sync(
1450 &mut self,
1451 gql: &str,
1452 params: Option<HashMap<String, serde_json::Value>>,
1453 ) -> Result<Page> {
1454 let params_map = params.unwrap_or_default();
1455 let params_typed: HashMap<String, Value> = params_map
1456 .into_iter()
1457 .map(|(k, v)| {
1458 let typed_val = crate::types::Value::from_json(v);
1459 (k, typed_val)
1460 })
1461 .collect();
1462
1463 match tokio::runtime::Handle::try_current() {
1464 Ok(handle) => {
1465 let (page, _cursor) =
1466 handle.block_on(self.query_with_params(gql, ¶ms_typed))?;
1467 Ok(page)
1468 }
1469 Err(_) => {
1470 let rt = tokio::runtime::Runtime::new()
1471 .map_err(|e| Error::query(format!("Failed to create runtime: {}", e)))?;
1472 let (page, _cursor) = rt.block_on(self.query_with_params(gql, ¶ms_typed))?;
1473 Ok(page)
1474 }
1475 }
1476 }
1477
1478 /// Begin a new transaction.
1479 ///
1480 /// After calling `begin`, all queries will be part of the transaction until
1481 /// [`commit`](Connection::commit) or [`rollback`](Connection::rollback) is called.
1482 ///
1483 /// # Errors
1484 ///
1485 /// Returns an error if a transaction is already in progress or if the
1486 /// server rejects the request.
1487 ///
1488 /// # Example
1489 ///
1490 /// ```no_run
1491 /// # use geode_client::Client;
1492 /// # async fn example() -> geode_client::Result<()> {
1493 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1494 /// # let mut conn = client.connect().await?;
1495 /// conn.begin().await?;
1496 /// conn.query("CREATE (n:Node {id: 1})").await?;
1497 /// conn.query("CREATE (n:Node {id: 2})").await?;
1498 /// conn.commit().await?; // Both nodes are now persisted
1499 /// # Ok(())
1500 /// # }
1501 /// ```
1502 pub async fn begin(&mut self) -> Result<()> {
1503 match &mut self.kind {
1504 ConnectionKind::Quic { send, recv, .. } => Self::send_begin_quic(send, recv).await,
1505 #[cfg(feature = "grpc")]
1506 ConnectionKind::Grpc { client } => client.begin().await,
1507 }
1508 }
1509
1510 /// Commit the current transaction.
1511 ///
1512 /// Persists all changes made since [`begin`](Connection::begin) was called.
1513 ///
1514 /// # Errors
1515 ///
1516 /// Returns an error if no transaction is in progress or if the server
1517 /// rejects the commit.
1518 ///
1519 /// # Example
1520 ///
1521 /// ```no_run
1522 /// # use geode_client::Client;
1523 /// # async fn example() -> geode_client::Result<()> {
1524 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1525 /// # let mut conn = client.connect().await?;
1526 /// conn.begin().await?;
1527 /// conn.query("CREATE (n:Node)").await?;
1528 /// conn.commit().await?; // Changes are now permanent
1529 /// # Ok(())
1530 /// # }
1531 /// ```
1532 pub async fn commit(&mut self) -> Result<()> {
1533 match &mut self.kind {
1534 ConnectionKind::Quic { send, recv, .. } => Self::send_commit_quic(send, recv).await,
1535 #[cfg(feature = "grpc")]
1536 ConnectionKind::Grpc { client } => client.commit().await,
1537 }
1538 }
1539
1540 /// Rollback the current transaction.
1541 ///
1542 /// Discards all changes made since [`begin`](Connection::begin) was called.
1543 ///
1544 /// # Errors
1545 ///
1546 /// Returns an error if no transaction is in progress.
1547 ///
1548 /// # Example
1549 ///
1550 /// ```no_run
1551 /// # use geode_client::Client;
1552 /// # async fn example() -> geode_client::Result<()> {
1553 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1554 /// # let mut conn = client.connect().await?;
1555 /// conn.begin().await?;
1556 /// match conn.query("CREATE (n:InvalidNode)").await {
1557 /// Ok(_) => conn.commit().await?,
1558 /// Err(_) => conn.rollback().await?, // Undo everything
1559 /// }
1560 /// # Ok(())
1561 /// # }
1562 /// ```
1563 pub async fn rollback(&mut self) -> Result<()> {
1564 match &mut self.kind {
1565 ConnectionKind::Quic { send, recv, .. } => Self::send_rollback_quic(send, recv).await,
1566 #[cfg(feature = "grpc")]
1567 ConnectionKind::Grpc { client } => client.rollback().await,
1568 }
1569 }
1570
1571 /// Create a prepared statement for efficient repeated execution.
1572 ///
1573 /// Prepared statements allow you to define a query once and execute it
1574 /// multiple times with different parameters. The query text is parsed
1575 /// to extract parameter names (tokens starting with `$`).
1576 ///
1577 /// # Arguments
1578 ///
1579 /// * `query` - The GQL query string with parameter placeholders
1580 ///
1581 /// # Returns
1582 ///
1583 /// A [`PreparedStatement`] that can be executed multiple times.
1584 ///
1585 /// # Example
1586 ///
1587 /// ```no_run
1588 /// # use geode_client::{Client, Value};
1589 /// # use std::collections::HashMap;
1590 /// # async fn example() -> geode_client::Result<()> {
1591 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1592 /// # let mut conn = client.connect().await?;
1593 /// let stmt = conn.prepare("MATCH (p:Person {id: $id}) RETURN p.name")?;
1594 ///
1595 /// for id in 1..=100 {
1596 /// let mut params = HashMap::new();
1597 /// params.insert("id".to_string(), Value::int(id));
1598 /// let (page, _) = stmt.execute(&mut conn, ¶ms).await?;
1599 /// // Process results...
1600 /// }
1601 /// # Ok(())
1602 /// # }
1603 /// ```
1604 pub fn prepare(&self, query: &str) -> Result<PreparedStatement> {
1605 Ok(PreparedStatement::new(query))
1606 }
1607
1608 /// Get the execution plan for a query without running it.
1609 ///
1610 /// This is useful for understanding how the database will execute a query
1611 /// and for identifying potential performance issues.
1612 ///
1613 /// # Arguments
1614 ///
1615 /// * `gql` - The GQL query string to explain
1616 ///
1617 /// # Returns
1618 ///
1619 /// A [`QueryPlan`] containing the execution plan details.
1620 ///
1621 /// # Errors
1622 ///
1623 /// Returns an error if the query is invalid or cannot be planned.
1624 ///
1625 /// # Example
1626 ///
1627 /// ```no_run
1628 /// # use geode_client::Client;
1629 /// # async fn example() -> geode_client::Result<()> {
1630 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1631 /// # let mut conn = client.connect().await?;
1632 /// let plan = conn.explain("MATCH (p:Person)-[:KNOWS]->(f) RETURN f").await?;
1633 /// println!("Estimated rows: {}", plan.estimated_rows);
1634 /// for op in &plan.operations {
1635 /// println!(" {} - {}", op.op_type, op.description);
1636 /// }
1637 /// # Ok(())
1638 /// # }
1639 /// ```
1640 pub async fn explain(&mut self, gql: &str) -> Result<QueryPlan> {
1641 // Execute EXPLAIN as a query via protobuf
1642 let explain_query = format!("EXPLAIN {}", gql);
1643 let (_page, _) = self.query(&explain_query).await?;
1644
1645 // Parse the plan from the response
1646 // The result format depends on server implementation
1647 Ok(QueryPlan {
1648 operations: Vec::new(),
1649 estimated_rows: 0,
1650 raw: serde_json::json!({}),
1651 })
1652 }
1653
1654 /// Execute a query and return the execution profile with timing information.
1655 ///
1656 /// This runs the query and collects detailed execution statistics including
1657 /// actual row counts and timing for each operation.
1658 ///
1659 /// # Arguments
1660 ///
1661 /// * `gql` - The GQL query string to profile
1662 ///
1663 /// # Returns
1664 ///
1665 /// A [`QueryProfile`] containing the execution plan and runtime statistics.
1666 ///
1667 /// # Errors
1668 ///
1669 /// Returns an error if the query fails to execute.
1670 ///
1671 /// # Example
1672 ///
1673 /// ```no_run
1674 /// # use geode_client::Client;
1675 /// # async fn example() -> geode_client::Result<()> {
1676 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1677 /// # let mut conn = client.connect().await?;
1678 /// let profile = conn.profile("MATCH (p:Person) RETURN p LIMIT 100").await?;
1679 /// println!("Execution time: {:.2}ms", profile.execution_time_ms);
1680 /// println!("Actual rows: {}", profile.actual_rows);
1681 /// # Ok(())
1682 /// # }
1683 /// ```
1684 pub async fn profile(&mut self, gql: &str) -> Result<QueryProfile> {
1685 // Execute PROFILE as a query via protobuf
1686 let profile_query = format!("PROFILE {}", gql);
1687 let (page, _) = self.query(&profile_query).await?;
1688
1689 // Parse the profile from the response
1690 let plan = QueryPlan {
1691 operations: Vec::new(),
1692 estimated_rows: 0,
1693 raw: serde_json::json!({}),
1694 };
1695
1696 Ok(QueryProfile {
1697 plan,
1698 actual_rows: page.rows.len() as u64,
1699 execution_time_ms: 0.0,
1700 raw: serde_json::json!({}),
1701 })
1702 }
1703
1704 /// Execute multiple queries in a batch.
1705 ///
1706 /// This is more efficient than executing queries one at a time when you
1707 /// have multiple independent queries to run.
1708 ///
1709 /// # Arguments
1710 ///
1711 /// * `queries` - A slice of (query, optional params) tuples
1712 ///
1713 /// # Returns
1714 ///
1715 /// A `Vec<Page>` with results for each query, in the same order as input.
1716 ///
1717 /// # Errors
1718 ///
1719 /// Returns an error if any query fails. Queries are executed in order,
1720 /// so earlier queries may have completed before the error.
1721 ///
1722 /// # Example
1723 ///
1724 /// ```no_run
1725 /// # use geode_client::Client;
1726 /// # async fn example() -> geode_client::Result<()> {
1727 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1728 /// # let mut conn = client.connect().await?;
1729 /// let results = conn.batch(&[
1730 /// ("MATCH (n:Person) RETURN count(n)", None),
1731 /// ("MATCH (n:Company) RETURN count(n)", None),
1732 /// ("MATCH ()-[r:WORKS_AT]->() RETURN count(r)", None),
1733 /// ]).await?;
1734 ///
1735 /// for (i, page) in results.iter().enumerate() {
1736 /// println!("Query {}: {} rows", i + 1, page.rows.len());
1737 /// }
1738 /// # Ok(())
1739 /// # }
1740 /// ```
1741 pub async fn batch(
1742 &mut self,
1743 queries: &[(&str, Option<&HashMap<String, Value>>)],
1744 ) -> Result<Vec<Page>> {
1745 let mut results = Vec::with_capacity(queries.len());
1746
1747 for (query, params) in queries {
1748 let (page, _) = match params {
1749 Some(p) => self.query_with_params(query, p).await?,
1750 None => self.query(query).await?,
1751 };
1752 results.push(page);
1753 }
1754
1755 Ok(results)
1756 }
1757
1758 /// Parse plan operations from a server response.
1759 /// Reserved for future EXPLAIN/PROFILE response parsing.
1760 #[allow(dead_code)]
1761 fn parse_plan_operations(result: &serde_json::Value) -> Vec<PlanOperation> {
1762 let mut operations = Vec::new();
1763
1764 if let Some(ops) = result.get("operations").and_then(|o| o.as_array()) {
1765 for op in ops {
1766 operations.push(Self::parse_single_operation(op));
1767 }
1768 } else if let Some(plan) = result.get("plan") {
1769 // Alternative format: single "plan" object
1770 operations.push(Self::parse_single_operation(plan));
1771 }
1772
1773 operations
1774 }
1775
1776 /// Parse a single operation from JSON.
1777 #[allow(dead_code)]
1778 fn parse_single_operation(op: &serde_json::Value) -> PlanOperation {
1779 let op_type = op
1780 .get("type")
1781 .or_else(|| op.get("op_type"))
1782 .and_then(|t| t.as_str())
1783 .unwrap_or("Unknown")
1784 .to_string();
1785
1786 let description = op
1787 .get("description")
1788 .or_else(|| op.get("desc"))
1789 .and_then(|d| d.as_str())
1790 .unwrap_or("")
1791 .to_string();
1792
1793 let estimated_rows = op
1794 .get("estimated_rows")
1795 .or_else(|| op.get("rows"))
1796 .and_then(|r| r.as_u64());
1797
1798 let children = op
1799 .get("children")
1800 .and_then(|c| c.as_array())
1801 .map(|arr| arr.iter().map(Self::parse_single_operation).collect())
1802 .unwrap_or_default();
1803
1804 PlanOperation {
1805 op_type,
1806 description,
1807 estimated_rows,
1808 children,
1809 }
1810 }
1811
1812 /// Close the connection.
1813 ///
1814 /// Gracefully closes the connection. After calling this method,
1815 /// the connection can no longer be used.
1816 ///
1817 /// # Note
1818 ///
1819 /// It's good practice to explicitly close connections, but they will also
1820 /// be closed when dropped.
1821 ///
1822 /// # Example
1823 ///
1824 /// ```no_run
1825 /// # use geode_client::Client;
1826 /// # async fn example() -> geode_client::Result<()> {
1827 /// # let client = Client::new("localhost", 3141).skip_verify(true);
1828 /// let mut conn = client.connect().await?;
1829 /// // ... use connection ...
1830 /// conn.close()?;
1831 /// # Ok(())
1832 /// # }
1833 /// ```
1834 pub fn close(&mut self) -> Result<()> {
1835 match &mut self.kind {
1836 ConnectionKind::Quic { conn, .. } => {
1837 // QUIC close is asynchronous and best-effort - the CONNECTION_CLOSE frame
1838 // will be sent by Quinn's internal I/O handling. No blocking delay needed.
1839 // (Gap #17: Removed std::thread::sleep that blocked the async runtime)
1840 conn.close(0u32.into(), b"client closing");
1841 Ok(())
1842 }
1843 #[cfg(feature = "grpc")]
1844 ConnectionKind::Grpc { client } => client.close(),
1845 }
1846 }
1847
1848 /// Check if the connection is still healthy.
1849 ///
1850 /// Returns `true` if the underlying QUIC connection is still open and usable.
1851 /// This is used by connection pools to verify connections before reuse.
1852 ///
1853 /// # Example
1854 ///
1855 /// ```ignore
1856 /// let mut conn = client.connect().await?;
1857 /// if conn.is_healthy() {
1858 /// // Connection is still usable
1859 /// let (page, _) = conn.query("RETURN 1").await?;
1860 /// }
1861 /// ```
1862 pub fn is_healthy(&self) -> bool {
1863 match &self.kind {
1864 ConnectionKind::Quic { conn, .. } => {
1865 // Quinn's close_reason() returns Some if the connection was closed
1866 conn.close_reason().is_none()
1867 }
1868 #[cfg(feature = "grpc")]
1869 ConnectionKind::Grpc { .. } => {
1870 // gRPC connections are managed by tonic, always report healthy
1871 true
1872 }
1873 }
1874 }
1875}
1876
1877/// Certificate verifier that skips all verification (INSECURE - for development only)
1878#[derive(Debug)]
1879struct SkipServerVerification;
1880
1881impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
1882 fn verify_server_cert(
1883 &self,
1884 _end_entity: &CertificateDer,
1885 _intermediates: &[CertificateDer],
1886 _server_name: &RustlsServerName,
1887 _ocsp_response: &[u8],
1888 _now: rustls::pki_types::UnixTime,
1889 ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1890 Ok(rustls::client::danger::ServerCertVerified::assertion())
1891 }
1892
1893 fn verify_tls12_signature(
1894 &self,
1895 _message: &[u8],
1896 _cert: &CertificateDer,
1897 _dss: &rustls::DigitallySignedStruct,
1898 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1899 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1900 }
1901
1902 fn verify_tls13_signature(
1903 &self,
1904 _message: &[u8],
1905 _cert: &CertificateDer,
1906 _dss: &rustls::DigitallySignedStruct,
1907 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1908 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1909 }
1910
1911 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1912 vec![
1913 rustls::SignatureScheme::RSA_PKCS1_SHA256,
1914 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1915 rustls::SignatureScheme::ED25519,
1916 ]
1917 }
1918}
1919
1920#[cfg(test)]
1921mod tests {
1922 use super::*;
1923
1924 // PreparedStatement tests
1925
1926 #[test]
1927 fn test_prepared_statement_new() {
1928 let stmt = PreparedStatement::new("MATCH (n:Person {id: $id}) RETURN n");
1929 assert_eq!(stmt.query(), "MATCH (n:Person {id: $id}) RETURN n");
1930 assert_eq!(stmt.param_names(), &["id"]);
1931 }
1932
1933 #[test]
1934 fn test_prepared_statement_multiple_params() {
1935 let stmt = PreparedStatement::new(
1936 "MATCH (p:Person {name: $name}) WHERE p.age > $min_age AND p.city = $city RETURN p",
1937 );
1938 assert!(stmt.query().contains("$name"));
1939 let names = stmt.param_names();
1940 assert_eq!(names.len(), 3);
1941 assert!(names.contains(&"name".to_string()));
1942 assert!(names.contains(&"min_age".to_string()));
1943 assert!(names.contains(&"city".to_string()));
1944 }
1945
1946 #[test]
1947 fn test_prepared_statement_no_params() {
1948 let stmt = PreparedStatement::new("MATCH (n) RETURN n LIMIT 10");
1949 assert!(stmt.param_names().is_empty());
1950 }
1951
1952 #[test]
1953 fn test_prepared_statement_duplicate_params() {
1954 let stmt =
1955 PreparedStatement::new("MATCH (a {id: $id})-[:KNOWS]->(b {id: $id}) RETURN a, b");
1956 // Should deduplicate parameter names
1957 assert_eq!(stmt.param_names(), &["id"]);
1958 }
1959
1960 #[test]
1961 fn test_prepared_statement_underscore_params() {
1962 let stmt = PreparedStatement::new("MATCH (n {user_id: $user_id}) RETURN n");
1963 assert_eq!(stmt.param_names(), &["user_id"]);
1964 }
1965
1966 #[test]
1967 fn test_prepared_statement_numeric_params() {
1968 let stmt = PreparedStatement::new("RETURN $param1, $param2, $param123");
1969 let names = stmt.param_names();
1970 assert_eq!(names.len(), 3);
1971 assert!(names.contains(&"param1".to_string()));
1972 assert!(names.contains(&"param2".to_string()));
1973 assert!(names.contains(&"param123".to_string()));
1974 }
1975
1976 // PlanOperation tests
1977
1978 #[test]
1979 fn test_plan_operation_struct() {
1980 let op = PlanOperation {
1981 op_type: "NodeScan".to_string(),
1982 description: "Scan Person nodes".to_string(),
1983 estimated_rows: Some(100),
1984 children: vec![],
1985 };
1986 assert_eq!(op.op_type, "NodeScan");
1987 assert_eq!(op.description, "Scan Person nodes");
1988 assert_eq!(op.estimated_rows, Some(100));
1989 assert!(op.children.is_empty());
1990 }
1991
1992 #[test]
1993 fn test_plan_operation_with_children() {
1994 let child = PlanOperation {
1995 op_type: "Filter".to_string(),
1996 description: "Filter by age".to_string(),
1997 estimated_rows: Some(50),
1998 children: vec![],
1999 };
2000 let parent = PlanOperation {
2001 op_type: "Projection".to_string(),
2002 description: "Project name, age".to_string(),
2003 estimated_rows: Some(50),
2004 children: vec![child],
2005 };
2006 assert_eq!(parent.children.len(), 1);
2007 assert_eq!(parent.children[0].op_type, "Filter");
2008 }
2009
2010 // QueryPlan tests
2011
2012 #[test]
2013 fn test_query_plan_struct() {
2014 let plan = QueryPlan {
2015 operations: vec![PlanOperation {
2016 op_type: "NodeScan".to_string(),
2017 description: "Full scan".to_string(),
2018 estimated_rows: Some(1000),
2019 children: vec![],
2020 }],
2021 estimated_rows: 1000,
2022 raw: serde_json::json!({"type": "plan"}),
2023 };
2024 assert_eq!(plan.operations.len(), 1);
2025 assert_eq!(plan.estimated_rows, 1000);
2026 }
2027
2028 // QueryProfile tests
2029
2030 #[test]
2031 fn test_query_profile_struct() {
2032 let plan = QueryPlan {
2033 operations: vec![],
2034 estimated_rows: 100,
2035 raw: serde_json::json!({}),
2036 };
2037 let profile = QueryProfile {
2038 plan,
2039 actual_rows: 95,
2040 execution_time_ms: 12.5,
2041 raw: serde_json::json!({"type": "profile"}),
2042 };
2043 assert_eq!(profile.actual_rows, 95);
2044 assert!((profile.execution_time_ms - 12.5).abs() < 0.001);
2045 }
2046
2047 // Page tests
2048
2049 #[test]
2050 fn test_page_struct() {
2051 let page = Page {
2052 columns: vec![Column {
2053 name: "x".to_string(),
2054 col_type: "INT".to_string(),
2055 }],
2056 rows: vec![],
2057 ordered: false,
2058 order_keys: vec![],
2059 final_page: true,
2060 };
2061 assert_eq!(page.columns.len(), 1);
2062 assert!(page.rows.is_empty());
2063 assert!(page.final_page);
2064 }
2065
2066 // Column tests
2067
2068 #[test]
2069 fn test_column_struct() {
2070 let col = Column {
2071 name: "age".to_string(),
2072 col_type: "INT".to_string(),
2073 };
2074 assert_eq!(col.name, "age");
2075 assert_eq!(col.col_type, "INT");
2076 }
2077
2078 // Savepoint tests
2079
2080 #[test]
2081 fn test_savepoint_struct() {
2082 let sp = Savepoint {
2083 name: "before_update".to_string(),
2084 };
2085 assert_eq!(sp.name, "before_update");
2086 }
2087
2088 // Client builder tests
2089
2090 #[test]
2091 fn test_client_builder_defaults() {
2092 let _client = Client::new("localhost", 3141);
2093 // Test passes if it compiles - verifies defaults work
2094 }
2095
2096 #[test]
2097 fn test_client_builder_chain() {
2098 let _client = Client::new("example.com", 8443)
2099 .skip_verify(true)
2100 .page_size(500)
2101 .client_name("test-app")
2102 .client_version("2.0.0")
2103 .conformance("full");
2104 // Test passes if it compiles - verifies builder chain works
2105 }
2106
2107 #[test]
2108 fn test_client_clone() {
2109 let client = Client::new("localhost", 3141).skip_verify(true);
2110 let _cloned = client.clone();
2111 // Test passes if it compiles - verifies Clone is implemented
2112 }
2113
2114 // parse_plan_operations tests
2115
2116 #[test]
2117 fn test_parse_plan_operations_empty() {
2118 let result = serde_json::json!({});
2119 let ops = Connection::parse_plan_operations(&result);
2120 assert!(ops.is_empty());
2121 }
2122
2123 #[test]
2124 fn test_parse_plan_operations_array() {
2125 let result = serde_json::json!({
2126 "operations": [
2127 {"type": "NodeScan", "description": "Scan nodes", "estimated_rows": 100},
2128 {"type": "Filter", "description": "Apply filter", "estimated_rows": 50}
2129 ]
2130 });
2131 let ops = Connection::parse_plan_operations(&result);
2132 assert_eq!(ops.len(), 2);
2133 assert_eq!(ops[0].op_type, "NodeScan");
2134 assert_eq!(ops[1].op_type, "Filter");
2135 }
2136
2137 #[test]
2138 fn test_parse_plan_operations_single_plan() {
2139 let result = serde_json::json!({
2140 "plan": {"op_type": "FullScan", "desc": "Full table scan"}
2141 });
2142 let ops = Connection::parse_plan_operations(&result);
2143 assert_eq!(ops.len(), 1);
2144 assert_eq!(ops[0].op_type, "FullScan");
2145 assert_eq!(ops[0].description, "Full table scan");
2146 }
2147
2148 #[test]
2149 fn test_parse_single_operation() {
2150 let op_json = serde_json::json!({
2151 "type": "IndexScan",
2152 "description": "Use index on Person(name)",
2153 "estimated_rows": 25,
2154 "children": [
2155 {"type": "Filter", "description": "Filter results"}
2156 ]
2157 });
2158 let op = Connection::parse_single_operation(&op_json);
2159 assert_eq!(op.op_type, "IndexScan");
2160 assert_eq!(op.description, "Use index on Person(name)");
2161 assert_eq!(op.estimated_rows, Some(25));
2162 assert_eq!(op.children.len(), 1);
2163 assert_eq!(op.children[0].op_type, "Filter");
2164 }
2165
2166 #[test]
2167 fn test_parse_single_operation_minimal() {
2168 let op_json = serde_json::json!({});
2169 let op = Connection::parse_single_operation(&op_json);
2170 assert_eq!(op.op_type, "Unknown");
2171 assert_eq!(op.description, "");
2172 assert_eq!(op.estimated_rows, None);
2173 assert!(op.children.is_empty());
2174 }
2175
2176 #[test]
2177 fn test_parse_single_operation_alt_fields() {
2178 let op_json = serde_json::json!({
2179 "op_type": "Sort",
2180 "desc": "Sort by name ASC",
2181 "rows": 100
2182 });
2183 let op = Connection::parse_single_operation(&op_json);
2184 assert_eq!(op.op_type, "Sort");
2185 assert_eq!(op.description, "Sort by name ASC");
2186 assert_eq!(op.estimated_rows, Some(100));
2187 }
2188
2189 // redact_dsn tests - Gap #7 (DSN Password Exposure)
2190
2191 #[test]
2192 fn test_redact_dsn_url_with_password() {
2193 let dsn = "quic://admin:secret123@localhost:3141";
2194 let redacted = redact_dsn(dsn);
2195 assert!(redacted.contains("[REDACTED]"));
2196 assert!(!redacted.contains("secret123"));
2197 assert!(redacted.contains("admin"));
2198 assert!(redacted.contains("localhost"));
2199 }
2200
2201 #[test]
2202 fn test_redact_dsn_url_without_password() {
2203 let dsn = "quic://admin@localhost:3141";
2204 let redacted = redact_dsn(dsn);
2205 assert!(!redacted.contains("[REDACTED]"));
2206 assert!(redacted.contains("admin"));
2207 assert!(redacted.contains("localhost"));
2208 }
2209
2210 #[test]
2211 fn test_redact_dsn_url_no_auth() {
2212 let dsn = "quic://localhost:3141";
2213 let redacted = redact_dsn(dsn);
2214 assert_eq!(redacted, dsn);
2215 }
2216
2217 #[test]
2218 fn test_redact_dsn_query_param_password() {
2219 let dsn = "localhost:3141?username=admin&password=secret123";
2220 let redacted = redact_dsn(dsn);
2221 assert!(redacted.contains("[REDACTED]"));
2222 assert!(!redacted.contains("secret123"));
2223 assert!(redacted.contains("username=admin"));
2224 }
2225
2226 #[test]
2227 fn test_redact_dsn_query_param_pass() {
2228 let dsn = "localhost:3141?user=admin&pass=mysecret";
2229 let redacted = redact_dsn(dsn);
2230 assert!(redacted.contains("[REDACTED]"));
2231 assert!(!redacted.contains("mysecret"));
2232 }
2233
2234 #[test]
2235 fn test_redact_dsn_simple_no_password() {
2236 let dsn = "localhost:3141?insecure=true";
2237 let redacted = redact_dsn(dsn);
2238 assert_eq!(redacted, dsn);
2239 }
2240
2241 #[test]
2242 fn test_redact_dsn_url_with_query_and_password() {
2243 let dsn = "quic://user:pass@localhost:3141?insecure=true";
2244 let redacted = redact_dsn(dsn);
2245 assert!(redacted.contains("[REDACTED]"));
2246 assert!(!redacted.contains(":pass@"));
2247 assert!(redacted.contains("insecure=true"));
2248 }
2249
2250 // validate() tests - Gap #9 (Validation Not Automatic)
2251
2252 #[test]
2253 fn test_client_validate_valid() {
2254 let client = Client::new("localhost", 3141);
2255 assert!(client.validate().is_ok());
2256 }
2257
2258 #[test]
2259 fn test_client_validate_valid_hostname() {
2260 let client = Client::new("geode.example.com", 3141);
2261 assert!(client.validate().is_ok());
2262 }
2263
2264 #[test]
2265 fn test_client_validate_valid_ipv4() {
2266 let client = Client::new("192.168.1.1", 8443);
2267 assert!(client.validate().is_ok());
2268 }
2269
2270 #[test]
2271 fn test_client_validate_invalid_hostname_hyphen_start() {
2272 let client = Client::new("-invalid", 3141);
2273 assert!(client.validate().is_err());
2274 }
2275
2276 #[test]
2277 fn test_client_validate_invalid_hostname_hyphen_end() {
2278 let client = Client::new("invalid-", 3141);
2279 assert!(client.validate().is_err());
2280 }
2281
2282 #[test]
2283 fn test_client_validate_invalid_port_zero() {
2284 let client = Client::new("localhost", 0);
2285 assert!(client.validate().is_err());
2286 }
2287
2288 #[test]
2289 fn test_client_validate_invalid_page_size_zero() {
2290 let client = Client::new("localhost", 3141).page_size(0);
2291 assert!(client.validate().is_err());
2292 }
2293
2294 #[test]
2295 fn test_client_validate_invalid_page_size_too_large() {
2296 let client = Client::new("localhost", 3141).page_size(200_000);
2297 assert!(client.validate().is_err());
2298 }
2299
2300 #[test]
2301 fn test_client_validate_with_all_options() {
2302 let client = Client::new("geode.example.com", 8443)
2303 .skip_verify(true)
2304 .page_size(500)
2305 .username("admin")
2306 .password("secret")
2307 .connect_timeout(15)
2308 .hello_timeout(10)
2309 .idle_timeout(60);
2310 assert!(client.validate().is_ok());
2311 }
2312
2313 // Gap #13: Test that extreme timeout values don't cause builder panics
2314 #[test]
2315 fn test_client_extreme_timeout_values() {
2316 // These should not panic - the actual validation/capping happens at connect time
2317 let _client = Client::new("localhost", 3141)
2318 .connect_timeout(u64::MAX)
2319 .hello_timeout(u64::MAX)
2320 .idle_timeout(u64::MAX);
2321 // Builder should accept any u64 value without panicking
2322 }
2323}