Skip to main content

hyperdb_api_core/client/
client.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level synchronous client for Hyper database.
5//!
6//! This module provides [`Client`], the primary synchronous interface for
7//! communicating with a Hyper server. It supports three query execution
8//! modes, each using a different level of the `PostgreSQL` wire protocol:
9//!
10//! - **Simple Query** ([`Client::query`]) — Sends a single `Query` message
11//!   and collects all `DataRow` messages into memory. Results are returned
12//!   in text format. Best for small result sets or DDL/DML commands.
13//!
14//! - **Extended Query / `HyperBinary`** ([`Client::query_fast`]) — Uses the
15//!   Extended Query protocol (`Parse` / `Bind` / `Execute`) with
16//!   [`ColumnFormat::HyperBinary`](super::statement::ColumnFormat::HyperBinary)
17//!   (format code 2) for `LittleEndian` binary results. Returns
18//!   [`StreamRow`]s that compute field offsets
19//!   on-demand, avoiding per-row allocation.
20//!
21//! - **Streaming** ([`Client::query_streaming`]) — Like `query_fast` but
22//!   returns a [`QueryStream`] that yields rows in chunks, keeping memory
23//!   usage constant regardless of result set size. The stream holds the
24//!   connection lock for its lifetime; dropping it triggers a cancel request
25//!   to stop the server from streaming the rest.
26//!
27//! # Bulk Insertion (COPY Protocol)
28//!
29//! [`Client::copy_in`] starts a `COPY ... FROM STDIN WITH (FORMAT HYPERBINARY)`
30//! session and returns a [`CopyInWriter`] for streaming binary data. The
31//! caller is responsible for encoding rows in the correct format (typically
32//! done by the higher-level [`hyperdb_api::Inserter`](https://docs.rs/hyperdb-api)).
33//! See also [`copy_in_with_format`](Client::copy_in_with_format) for
34//! alternative formats (CSV, Arrow IPC) and [`copy_in_raw`](Client::copy_in_raw)
35//! for fully custom COPY statements.
36
37use std::net::TcpStream;
38use std::sync::{Arc, Mutex, MutexGuard};
39use std::time::Duration;
40
41use tracing::{debug, info, trace, warn};
42
43/// Enable TCP keepalive on a connection socket so a half-open peer (laptop
44/// sleep, network blip, a hyperd that vanished without a FIN) is detected in
45/// ~90s instead of blocking a blocking `read()` until the OS default idle
46/// timeout (7200s / 2h on macOS and Linux).
47///
48/// This matters most for long-lived idle connections — e.g. an MCP client
49/// holding a connection to a resident daemon's hyperd across a laptop suspend.
50/// Without keepalive, the next query on a silently-dead socket hangs for hours.
51///
52/// Tuning: 60s idle before the first probe, 10s between probes, 3 probes →
53/// the peer is declared dead ~90s after it goes silent. Probe count is only
54/// honored on platforms whose `socket2` build exposes `with_retries`; macOS
55/// honors idle+interval. All calls are best-effort (`.ok()`): a kernel that
56/// rejects a knob leaves the connection working at OS defaults.
57fn apply_tcp_keepalive(sock: &socket2::SockRef<'_>) {
58    let keepalive = socket2::TcpKeepalive::new()
59        .with_time(Duration::from_secs(60))
60        .with_interval(Duration::from_secs(10));
61    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
62    let keepalive = keepalive.with_retries(3);
63    sock.set_tcp_keepalive(&keepalive).ok();
64}
65
66#[cfg(unix)]
67use std::os::unix::net::UnixStream;
68
69use super::cancel::Cancellable;
70use super::config::Config;
71use super::connection::{parse_error_response, RawConnection};
72use super::endpoint::ConnectionEndpoint;
73use super::error::{Error, ErrorKind, Result};
74use super::prepare;
75use super::row::{Row, StreamRow};
76use super::sync_stream::SyncStream;
77
78use crate::protocol::message::Message;
79use crate::types::Oid;
80
81use super::notice::{Notice, NoticeReceiver};
82
83/// A synchronous client for Hyper database.
84///
85/// The client handles connection management and query execution.
86/// It is thread-safe and can be shared between threads using `Arc`.
87///
88/// # Thread Safety
89///
90/// The `Client` is thread-safe and can be shared between threads using `Arc<Client>`.
91/// All methods use internal mutexes to synchronize access to the underlying connection.
92///
93/// # Example
94///
95/// ```no_run
96/// use hyperdb_api_core::client::{Client, Config};
97///
98/// # fn example() -> hyperdb_api_core::client::Result<()> {
99/// let config = Config::new()
100///     .with_host("localhost")
101///     .with_port(7483)
102///     .with_database("test.hyper");
103///
104/// let client = Client::connect(&config)?;
105/// let rows = client.query("SELECT 1")?;
106/// client.close()?;
107/// # Ok(())
108/// # }
109/// ```
110pub struct Client {
111    /// The underlying connection, protected by a mutex for thread safety.
112    connection: Arc<Mutex<RawConnection<SyncStream>>>,
113    /// Backend process ID (for cancel requests).
114    process_id: i32,
115    /// Secret key for authenticating cancel requests.
116    secret_key: i32,
117    /// Connection endpoint for cancel requests and reconnection.
118    endpoint: ConnectionEndpoint,
119    /// Optional notice receiver callback for server notices/warnings.
120    notice_receiver: Option<Arc<NoticeReceiver>>,
121}
122
123// Manual Debug implementation because NoticeReceiver doesn't implement Debug
124impl std::fmt::Debug for Client {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("Client")
127            .field("process_id", &self.process_id)
128            .field("secret_key", &self.secret_key)
129            .field("endpoint", &self.endpoint)
130            .field(
131                "notice_receiver",
132                &self.notice_receiver.as_ref().map(|_| "<callback>"),
133            )
134            .finish_non_exhaustive()
135    }
136}
137
138impl Client {
139    /// Connects to a Hyper server using the given configuration.
140    ///
141    /// Establishes a TCP connection, performs authentication, and initializes
142    /// the client. Returns an error if the connection fails or authentication
143    /// is rejected.
144    ///
145    /// # Arguments
146    ///
147    /// * `config` - Connection configuration (host, port, credentials, etc.)
148    ///
149    /// # Errors
150    ///
151    /// Returns `Error` if:
152    /// - Connection to the server fails
153    /// - Authentication fails
154    /// - Protocol handshake fails
155    ///
156    /// # Example
157    ///
158    /// ```no_run
159    /// # use hyperdb_api_core::client::{Client, Config};
160    /// # fn example() -> hyperdb_api_core::client::Result<()> {
161    /// let config = Config::new()
162    ///     .with_host("localhost")
163    ///     .with_port(7483)
164    ///     .with_user("myuser")
165    ///     .with_password("mypass")
166    ///     .with_database("test.hyper");
167    ///
168    /// let client = Client::connect(&config)?;
169    /// # Ok(())
170    /// # }
171    /// ```
172    pub fn connect(config: &Config) -> Result<Self> {
173        // Log connection parameters (password is intentionally omitted for security)
174        info!(
175            target: "hyperdb_api",
176            host = %config.host(),
177            port = config.port(),
178            user = config.user().unwrap_or("(default)"),
179            database = config.database().unwrap_or("(none)"),
180            "connection-parameters"
181        );
182
183        let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
184        let addr = format!("{}:{}", config.host(), config.port());
185        let tcp_stream = TcpStream::connect(&addr).map_err(|e| {
186            warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
187            Error::connection(format!("failed to connect to {addr}: {e}"))
188        })?;
189
190        // Set TCP options for better performance.
191        //
192        // `TCP_NODELAY` disables Nagle so request bytes flush immediately —
193        // needed for low-latency request/response shapes.
194        //
195        // `SO_RCVBUF` / `SO_SNDBUF` are bumped to 4 MiB. The Windows default
196        // TCP buffers are ~64 KiB, which throttles loopback throughput
197        // because hyperd blocks on `send()` once the kernel buffer fills up.
198        // Linux auto-tunes much higher, so this primarily helps Windows
199        // (and is a marginal win on macOS).
200        //
201        // Empirical knee on Windows i9-10980XE / TCP loopback (100M-row
202        // sync full-scan, single connection):
203        //
204        // |  size | rows/sec |
205        // |------:|---------:|
206        // |  64 K |     2.89 |  (default)
207        // |   1 M |     5.95 |
208        // |   4 M |     6.90 |  <-- knee
209        // |   8 M |     6.68 |  (insert workloads regress 18%)
210        //
211        // 4 MiB hits the throughput plateau without the memory-pressure
212        // regression seen at 8 MiB. We use `.ok()` because the kernel may
213        // clamp to a lower value or refuse the request entirely; either
214        // way the connection still works at the default size.
215        tcp_stream.set_nodelay(true).ok();
216        let sock = socket2::SockRef::from(&tcp_stream);
217        sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
218        sock.set_send_buffer_size(4 * 1024 * 1024).ok();
219        apply_tcp_keepalive(&sock);
220
221        let stream = SyncStream::tcp(tcp_stream);
222        let mut connection = RawConnection::new(stream);
223
224        // Perform startup with authentication
225        let params = config.startup_params();
226        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
227        connection.startup(&params_ref, config.password())?;
228
229        let process_id = connection.process_id();
230        let secret_key = connection.secret_key();
231
232        debug!(
233            target: "hyperdb_api",
234            process_id,
235            "connection-established"
236        );
237
238        Ok(Client {
239            connection: Arc::new(Mutex::new(connection)),
240            process_id,
241            secret_key,
242            endpoint,
243            notice_receiver: None,
244        })
245    }
246
247    /// Connects to a Hyper server via Unix Domain Socket (Unix only).
248    ///
249    /// # Example
250    ///
251    /// ```no_run
252    /// # use hyperdb_api_core::client::{Client, Config};
253    /// # use std::path::Path;
254    /// # fn example() -> hyperdb_api_core::client::Result<()> {
255    /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
256    /// let config = Config::new().with_database("test.hyper");
257    /// let client = Client::connect_unix(socket_path, &config)?;
258    /// # Ok(())
259    /// # }
260    /// ```
261    ///
262    /// # Errors
263    ///
264    /// - Returns [`Error`] (connection) if the Unix domain socket cannot
265    ///   be connected.
266    /// - Propagates any [`Error`] from the startup handshake
267    ///   (authentication, protocol error, I/O error).
268    #[cfg(unix)]
269    pub fn connect_unix(socket_path: impl AsRef<std::path::Path>, config: &Config) -> Result<Self> {
270        use std::path::Path;
271
272        let path = socket_path.as_ref();
273        info!(
274            target: "hyperdb_api",
275            socket_path = %path.display(),
276            user = config.user().unwrap_or("(default)"),
277            database = config.database().unwrap_or("(none)"),
278            "connection-parameters-unix"
279        );
280
281        let unix_stream = UnixStream::connect(path).map_err(|e| {
282            warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
283            Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
284        })?;
285
286        // Parse endpoint from socket path
287        let directory = path.parent().unwrap_or(Path::new("/"));
288        let name = path
289            .file_name()
290            .and_then(|n| n.to_str())
291            .unwrap_or("socket");
292        let endpoint = ConnectionEndpoint::domain_socket(directory, name);
293
294        let stream = SyncStream::unix(unix_stream);
295        let mut connection = RawConnection::new(stream);
296
297        // Perform startup with authentication
298        let params = config.startup_params();
299        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
300        connection.startup(&params_ref, config.password())?;
301
302        let process_id = connection.process_id();
303        let secret_key = connection.secret_key();
304
305        debug!(
306            target: "hyperdb_api",
307            process_id,
308            "connection-established-unix"
309        );
310
311        Ok(Client {
312            connection: Arc::new(Mutex::new(connection)),
313            process_id,
314            secret_key,
315            endpoint,
316            notice_receiver: None,
317        })
318    }
319
320    /// Connects to a Hyper server via Windows Named Pipe (Windows only).
321    ///
322    /// # Arguments
323    ///
324    /// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
325    /// * `config` - Connection configuration
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
330    /// exist, all instances are busy after the retry window, or permission is
331    /// denied) or if the authentication handshake fails.
332    #[cfg(windows)]
333    pub fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
334        use std::fs::OpenOptions;
335        use std::time::{Duration, Instant};
336
337        info!(
338            target: "hyperdb_api",
339            pipe_path = %pipe_path,
340            user = config.user().unwrap_or("(default)"),
341            database = config.database().unwrap_or("(none)"),
342            "connection-parameters-named-pipe"
343        );
344
345        // Windows named pipes have a finite number of server-side instances
346        // (`MaxInstances` on `CreateNamedPipe`). When all are busy, `CreateFile`
347        // returns `ERROR_PIPE_BUSY` (231). The expected client behavior is to
348        // wait briefly and retry — equivalent to `WaitNamedPipe` from Win32.
349        // We poll with a short sleep up to a reasonable deadline so concurrent
350        // clients don't spuriously fail when the pool is momentarily exhausted.
351        const RETRY_INTERVAL: Duration = Duration::from_millis(20);
352        const MAX_WAIT: Duration = Duration::from_secs(10);
353        const ERROR_PIPE_BUSY: i32 = 231;
354
355        let deadline = Instant::now() + MAX_WAIT;
356        let file = loop {
357            match OpenOptions::new().read(true).write(true).open(pipe_path) {
358                Ok(f) => break f,
359                Err(e)
360                    if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
361                {
362                    std::thread::sleep(RETRY_INTERVAL);
363                }
364                Err(e) => {
365                    warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
366                    return Err(Error::connection(format!(
367                        "failed to connect to named pipe {pipe_path}: {e}"
368                    )));
369                }
370            }
371        };
372
373        // Parse endpoint from pipe path
374        let endpoint = ConnectionEndpoint::parse(&format!(
375            "tab.pipe://{}",
376            pipe_path.trim_start_matches(r"\\").replace('\\', "/")
377        ))
378        .unwrap_or_else(|_| {
379            // Fallback: construct from pipe path directly
380            // Expected format: \\<host>\pipe\<name>
381            let parts: Vec<&str> = pipe_path
382                .trim_start_matches(r"\\")
383                .splitn(3, '\\')
384                .collect();
385            if parts.len() >= 3 {
386                ConnectionEndpoint::named_pipe(parts[0], parts[2])
387            } else {
388                ConnectionEndpoint::named_pipe(".", pipe_path)
389            }
390        });
391
392        let stream = SyncStream::named_pipe(file);
393        let mut connection = RawConnection::new(stream);
394
395        // Perform startup with authentication
396        let params = config.startup_params();
397        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
398        connection.startup(&params_ref, config.password())?;
399
400        let process_id = connection.process_id();
401        let secret_key = connection.secret_key();
402
403        debug!(
404            target: "hyperdb_api",
405            process_id,
406            "connection-established-named-pipe"
407        );
408
409        Ok(Client {
410            connection: Arc::new(Mutex::new(connection)),
411            process_id,
412            secret_key,
413            endpoint,
414            notice_receiver: None,
415        })
416    }
417
418    /// Connects to a Hyper server using a `ConnectionEndpoint`.
419    ///
420    /// This is a lower-level method that accepts a pre-parsed endpoint.
421    ///
422    /// # Errors
423    ///
424    /// Delegates to [`Client::connect`], [`Client::connect_unix`], or
425    /// `Client::connect_named_pipe` depending on the endpoint variant,
426    /// and propagates their errors unchanged.
427    pub fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
428        match endpoint {
429            ConnectionEndpoint::Tcp { host, port } => {
430                let mut cfg = config.clone();
431                cfg = cfg.with_host(host.clone()).with_port(*port);
432                Self::connect(&cfg)
433            }
434            #[cfg(unix)]
435            ConnectionEndpoint::DomainSocket { directory, name } => {
436                let socket_path = directory.join(name);
437                Self::connect_unix(&socket_path, config)
438            }
439            #[cfg(windows)]
440            ConnectionEndpoint::NamedPipe { host, name } => {
441                let pipe_path = format!(r"\\{host}\pipe\{name}");
442                Self::connect_named_pipe(&pipe_path, config)
443            }
444        }
445    }
446
447    /// Returns the connection endpoint.
448    #[must_use]
449    pub fn endpoint(&self) -> &ConnectionEndpoint {
450        &self.endpoint
451    }
452
453    /// Returns the server process ID for this connection.
454    #[must_use]
455    pub fn process_id(&self) -> i32 {
456        self.process_id
457    }
458
459    /// Returns the secret key for cancel requests.
460    #[must_use]
461    pub fn secret_key(&self) -> i32 {
462        self.secret_key
463    }
464
465    /// Cancels the currently executing query on this connection.
466    ///
467    /// This method is **thread-safe** and can be called from any thread while
468    /// a query is running on another thread. It works by opening a separate
469    /// TCP connection to the server and sending a cancel request.
470    ///
471    /// # How It Works
472    ///
473    /// 1. Opens a new TCP connection to the same server
474    /// 2. Sends a cancel request containing the process ID and secret key
475    /// 3. The server receives this and cancels the running query
476    /// 4. The original query will fail with error code 57014 (`query_canceled`)
477    ///
478    /// # Thread Safety
479    ///
480    /// This method does NOT acquire the connection mutex, so it can be called
481    /// while another thread is blocked waiting for query results.
482    ///
483    /// # Relation to the [`Cancellable`] trait
484    ///
485    /// This is the **fallible user-facing cancel API**: it returns a
486    /// `Result<()>` so explicit callers can observe transport-level
487    /// failures (network errors, socket issues) and react accordingly —
488    /// e.g. record a metric, show "cancel failed" UX, or retry.
489    ///
490    /// For [`Drop`]-path and other internal cleanup contexts where error
491    /// propagation is impossible, the separate
492    /// [`impl Cancellable for Client`](super::cancel::Cancellable) wraps
493    /// this method and swallows errors (logged via `tracing::warn!`).
494    /// The two coexist by design — each serves a different consumer.
495    ///
496    /// # Example
497    ///
498    /// ```no_run
499    /// use std::thread;
500    /// use std::sync::Arc;
501    /// use std::time::Duration;
502    /// use hyperdb_api_core::client::{Client, Config};
503    ///
504    /// # fn example() -> hyperdb_api_core::client::Result<()> {
505    /// # let config = Config::new().with_host("localhost").with_port(7483);
506    /// let client = Arc::new(Client::connect(&config)?);
507    /// let client_clone = Arc::clone(&client);
508    ///
509    /// // Start a long query in another thread
510    /// let handle = thread::spawn(move || {
511    ///     client_clone.query("SELECT pg_sleep(60)")
512    /// });
513    ///
514    /// // Cancel from the main thread
515    /// thread::sleep(Duration::from_millis(100));
516    /// client.cancel()?;
517    ///
518    /// // The query thread will get a cancellation error
519    /// let result = handle.join().unwrap();
520    /// assert!(result.is_err());
521    /// # Ok(())
522    /// # }
523    /// ```
524    ///
525    /// # Errors
526    ///
527    /// - Returns [`Error`] (connection) if the fresh cancel-side socket
528    ///   cannot be opened (TCP / UDS / named-pipe, depending on
529    ///   [`Self::endpoint`]).
530    /// - Returns [`Error`] (I/O) if writing or flushing the cancel
531    ///   request fails.
532    pub fn cancel(&self) -> Result<()> {
533        use crate::protocol::message::frontend;
534        use bytes::BytesMut;
535        use std::io::Write;
536
537        info!(
538            target: "hyperdb_api",
539            process_id = self.process_id,
540            "query-cancel-request"
541        );
542
543        let endpoint_str = self.endpoint.to_string();
544
545        // Open a new connection specifically for the cancel request
546        match &self.endpoint {
547            ConnectionEndpoint::Tcp { host, port } => {
548                let addr = format!("{host}:{port}");
549                let mut stream = TcpStream::connect(&addr).map_err(|e| {
550                    warn!(
551                        target: "hyperdb_api",
552                        addr = %endpoint_str,
553                        error = %e,
554                        "query-cancel-connect-failed"
555                    );
556                    Error::connection(format!(
557                        "failed to connect for cancel request to {endpoint_str}: {e}"
558                    ))
559                })?;
560                // Cancel is a 16-byte fire-and-forget — disable Nagle so the
561                // request hits the wire without waiting on a coalesce timer.
562                stream.set_nodelay(true).ok();
563
564                // Build and send the cancel request
565                let mut buf = BytesMut::with_capacity(16);
566                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
567
568                stream.write_all(&buf).map_err(|e| {
569                    warn!(
570                        target: "hyperdb_api",
571                        error = %e,
572                        "query-cancel-send-failed"
573                    );
574                    Error::io(e)
575                })?;
576
577                stream.flush().map_err(Error::io)?;
578            }
579            #[cfg(unix)]
580            ConnectionEndpoint::DomainSocket { directory, name } => {
581                let socket_path = directory.join(name);
582                let mut stream = UnixStream::connect(&socket_path).map_err(|e| {
583                    warn!(
584                        target: "hyperdb_api",
585                        addr = %endpoint_str,
586                        error = %e,
587                        "query-cancel-connect-failed"
588                    );
589                    Error::connection(format!(
590                        "failed to connect for cancel request to {endpoint_str}: {e}"
591                    ))
592                })?;
593
594                // Build and send the cancel request
595                let mut buf = BytesMut::with_capacity(16);
596                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
597
598                stream.write_all(&buf).map_err(|e| {
599                    warn!(
600                        target: "hyperdb_api",
601                        error = %e,
602                        "query-cancel-send-failed"
603                    );
604                    Error::io(e)
605                })?;
606
607                stream.flush().map_err(Error::io)?;
608            }
609            #[cfg(windows)]
610            ConnectionEndpoint::NamedPipe { host, name } => {
611                let pipe_path = format!(r"\\{host}\pipe\{name}");
612                let mut file = std::fs::OpenOptions::new()
613                    .read(true)
614                    .write(true)
615                    .open(&pipe_path)
616                    .map_err(|e| {
617                        warn!(
618                            target: "hyperdb_api",
619                            addr = %endpoint_str,
620                            error = %e,
621                            "query-cancel-connect-failed"
622                        );
623                        Error::connection(format!(
624                            "failed to connect for cancel request to {endpoint_str}: {e}"
625                        ))
626                    })?;
627
628                // Build and send the cancel request
629                let mut buf = BytesMut::with_capacity(16);
630                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
631
632                file.write_all(&buf).map_err(|e| {
633                    warn!(
634                        target: "hyperdb_api",
635                        error = %e,
636                        "query-cancel-send-failed"
637                    );
638                    Error::io(e)
639                })?;
640
641                file.flush().map_err(Error::io)?;
642            }
643        }
644
645        debug!(
646            target: "hyperdb_api",
647            process_id = self.process_id,
648            "query-cancel-sent"
649        );
650
651        Ok(())
652    }
653
654    /// Returns a server parameter value by name.
655    ///
656    /// Server parameters are sent by the server during connection startup.
657    /// Common parameters include:
658    /// - `server_version` - The server version string
659    /// - `server_encoding` - The server's character encoding
660    /// - `client_encoding` - The client's character encoding
661    ///
662    /// # Example
663    ///
664    /// ```no_run
665    /// # use hyperdb_api_core::client::{Client, Config};
666    /// # fn example(client: &Client) {
667    /// if let Some(version) = client.parameter_status("server_version") {
668    ///     println!("Connected to Hyper version: {}", version);
669    /// }
670    /// # }
671    /// ```
672    #[must_use]
673    pub fn parameter_status(&self, name: &str) -> Option<String> {
674        let conn = self.connection.lock().ok()?;
675        conn.parameter_status(name)
676            .map(std::string::ToString::to_string)
677    }
678
679    /// Sets the notice receiver for this connection.
680    ///
681    /// Notice and warning messages generated by the server are not returned by
682    /// query execution functions since they don't indicate failure. Instead,
683    /// they are passed to a notice handling function.
684    ///
685    /// The default behavior is to log notices at the `warn` level.
686    ///
687    /// # Arguments
688    ///
689    /// * `receiver` - The callback function that will be called with each notice.
690    ///   Pass `None` to restore default logging behavior.
691    ///
692    /// # Example
693    ///
694    /// ```no_run
695    /// # use hyperdb_api_core::client::Client;
696    /// # use std::sync::{Arc, Mutex};
697    /// # fn example(client: &mut Client) {
698    /// client.set_notice_receiver(Some(Box::new(|notice| {
699    ///     println!("Server notice: {}", notice);
700    /// })));
701    ///
702    /// // Or capture notices in a Vec
703    /// let notices = Arc::new(Mutex::new(Vec::new()));
704    /// let notices_clone = notices.clone();
705    /// client.set_notice_receiver(Some(Box::new(move |notice| {
706    ///     notices_clone.lock().unwrap().push(notice);
707    /// })));
708    /// # }
709    /// ```
710    pub fn set_notice_receiver(&mut self, receiver: Option<NoticeReceiver>) {
711        self.notice_receiver = receiver.map(Arc::new);
712    }
713
714    /// Processes any notices in a list of messages, calling the notice receiver.
715    ///
716    /// This is called internally after receiving messages from the server.
717    pub(crate) fn process_notices(&self, messages: &[Message]) {
718        for msg in messages {
719            if let Message::NoticeResponse(body) = msg {
720                let notice = Notice::from_response_body(body);
721
722                if let Some(ref receiver) = self.notice_receiver {
723                    receiver(notice);
724                } else {
725                    // Default behavior: log at warn level
726                    warn!(
727                        target: "hyperdb_api",
728                        severity = notice.severity().unwrap_or("NOTICE"),
729                        code = notice.code().unwrap_or(""),
730                        message = %notice.message(),
731                        "server-notice"
732                    );
733                }
734            }
735        }
736    }
737
738    /// Acquires a lock on the connection.
739    fn lock_connection(&self) -> Result<MutexGuard<'_, RawConnection<SyncStream>>> {
740        self.connection
741            .lock()
742            .map_err(|_| Error::connection("connection mutex poisoned"))
743    }
744
745    /// Executes a simple query and returns the rows.
746    ///
747    /// # Errors
748    ///
749    /// - Returns [`Error`] (connection) if the connection mutex is
750    ///   poisoned.
751    /// - Returns [`Error`] (server) for any SQL error the server reports
752    ///   (syntax error, constraint violation, type mismatch).
753    /// - Returns [`Error`] (I/O) on wire-protocol I/O failure.
754    /// - Propagates any [`Error`] from row construction (invalid row
755    ///   description or data row bytes).
756    pub fn query(&self, query: &str) -> Result<Vec<Row>> {
757        let mut conn = self.lock_connection()?;
758        let messages = conn.simple_query(query)?;
759        drop(conn); // Release lock before processing notices
760
761        self.process_notices(&messages);
762
763        let mut rows = Vec::new();
764        let mut columns = None;
765
766        for msg in messages {
767            match msg {
768                crate::protocol::message::Message::RowDescription(desc) => {
769                    // Extract column info including format from protocol
770                    let mut cols = Vec::new();
771                    for f in desc.fields().filter_map(|r| {
772                        r.map_err(|e| trace!(target: "hyperdb_api_core::client", error = %e, "dropped error parsing row description field")).ok()
773                    }) {
774                        cols.push(super::statement::Column::new(
775                            f.name().to_string(),
776                            f.type_oid(),
777                            f.type_modifier(),
778                            super::statement::ColumnFormat::from_code(f.format()),
779                        ));
780                    }
781                    columns = Some(Arc::new(cols));
782                }
783                crate::protocol::message::Message::DataRow(data) => {
784                    if let Some(ref cols) = columns {
785                        rows.push(Row::new(Arc::clone(cols), data)?);
786                    }
787                }
788                _ => {}
789            }
790        }
791
792        Ok(rows)
793    }
794
795    /// Executes a query using `HyperBinary` format for maximum performance.
796    ///
797    /// Returns `StreamRow`s which compute offsets on-demand without pre-allocation,
798    /// making them faster for large result sets where each row is processed once.
799    /// Uses the extended query protocol with `HyperBinary` format (format code 2)
800    /// for direct binary access without text parsing overhead.
801    ///
802    /// # Errors
803    ///
804    /// Same as [`Self::query`]: connection-mutex poisoning, SQL errors
805    /// from the server, and wire-protocol I/O failures all surface as
806    /// [`Error`].
807    pub fn query_fast(&self, query: &str) -> Result<Vec<StreamRow>> {
808        let mut conn = self.lock_connection()?;
809        let messages = conn.query_binary(query)?;
810        drop(conn);
811
812        self.process_notices(&messages);
813
814        let mut rows = Vec::new();
815        for msg in messages {
816            if let crate::protocol::message::Message::DataRow(data) = msg {
817                rows.push(StreamRow::new(data));
818            }
819        }
820        Ok(rows)
821    }
822
823    /// Executes a query with streaming results for minimum memory usage.
824    ///
825    /// Combines `HyperBinary` format with incremental row fetching.
826    ///
827    /// # Errors
828    ///
829    /// - Returns [`Error`] (connection) if the connection mutex is
830    ///   poisoned.
831    /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
832    ///   `Parse`/`Bind`/`Execute` sequence for the streaming query
833    ///   fails on the server or on the wire.
834    pub fn query_streaming<'a>(
835        &'a self,
836        query: &str,
837        chunk_size: usize,
838    ) -> Result<QueryStream<'a>> {
839        let mut conn = self.lock_connection()?;
840        conn.start_query_binary(query)?;
841        Ok(QueryStream {
842            conn: Some(conn),
843            // The owning client is the canceller: if the stream is dropped
844            // before being fully drained, its `Drop` impl will call
845            // `self.cancel()` (PG wire `CancelRequest` on a fresh
846            // connection) to stop the server from streaming the rest.
847            canceller: self,
848            finished: false,
849            chunk_size: chunk_size.max(1),
850            schema: None,
851            schema_read: false,
852        })
853    }
854
855    /// Executes a SQL command that doesn't return rows (e.g., INSERT, UPDATE).
856    ///
857    /// # Errors
858    ///
859    /// Same error modes as [`Self::query`] — connection-mutex poisoning,
860    /// server-side SQL errors, and wire-protocol I/O failures all
861    /// surface as [`Error`].
862    pub fn exec(&self, query: &str) -> Result<u64> {
863        let mut conn = self.lock_connection()?;
864        let messages = conn.simple_query(query)?;
865        drop(conn); // Release lock before processing notices
866
867        self.process_notices(&messages);
868
869        let mut affected = 0u64;
870        for msg in messages {
871            if let crate::protocol::message::Message::CommandComplete(body) = msg {
872                if let Ok(tag) = body.tag() {
873                    // Parse affected row count from tag like "INSERT 0 1"
874                    if let Some(count) = parse_affected_rows(tag) {
875                        affected = count;
876                    }
877                }
878            }
879        }
880
881        Ok(affected)
882    }
883
884    /// Executes a batch of statements separated by semicolons.
885    ///
886    /// # Errors
887    ///
888    /// Same error modes as [`Self::query`] — connection-mutex poisoning,
889    /// server-side SQL errors, and wire-protocol I/O failures.
890    pub fn batch_execute(&self, query: &str) -> Result<()> {
891        let mut conn = self.lock_connection()?;
892        let messages = conn.simple_query(query)?;
893        drop(conn); // Release lock before processing notices
894
895        self.process_notices(&messages);
896        Ok(())
897    }
898
899    /// Prepares a statement for execution with the \[`params!`\] macro.
900    ///
901    /// Returns an [`prepare::OwnedPreparedStatement`] that automatically closes when dropped.
902    ///
903    /// # Example
904    ///
905    /// ```no_run
906    /// # use hyperdb_api_core::{params, client::{Client, Config}};
907    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
908    /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1")?;
909    /// let rows = client.execute(&stmt, params![42_i32])?;
910    /// # Ok(())
911    /// # }
912    /// ```
913    ///
914    /// # Errors
915    ///
916    /// Propagates any [`Error`] from [`prepare::prepare_owned`] —
917    /// connection-mutex poisoning, server-side `Parse` failures (SQL
918    /// syntax, type resolution), and wire-protocol I/O failures.
919    pub fn prepare(&self, query: &str) -> Result<prepare::OwnedPreparedStatement> {
920        prepare::prepare_owned(&self.connection, query, &[])
921    }
922
923    /// Prepares a statement with explicit parameter types.
924    ///
925    /// # Errors
926    ///
927    /// Same failure modes as [`Self::prepare`].
928    pub fn prepare_typed(
929        &self,
930        query: &str,
931        param_types: &[Oid],
932    ) -> Result<prepare::OwnedPreparedStatement> {
933        prepare::prepare_owned(&self.connection, query, param_types)
934    }
935
936    /// Executes a prepared statement with parameters.
937    ///
938    /// Use the \[`params!`\] macro for ergonomic parameter encoding:
939    ///
940    /// ```no_run
941    /// # use hyperdb_api_core::{params, client::{Client, Config}};
942    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
943    /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1 AND name = $2")?;
944    /// let rows = client.execute(&stmt, params![42_i32, "Alice"])?;
945    /// # Ok(())
946    /// # }
947    /// ```
948    ///
949    /// # Errors
950    ///
951    /// Propagates any [`Error`] from [`prepare::execute_prepared`] —
952    /// parameter-count or type mismatch, server-side SQL errors, and
953    /// wire-protocol I/O failures.
954    pub fn execute<P: AsRef<[Option<Vec<u8>>]>>(
955        &self,
956        statement: &prepare::OwnedPreparedStatement,
957        params: P,
958    ) -> Result<Vec<Row>> {
959        let params_ref: Vec<Option<&[u8]>> = params
960            .as_ref()
961            .iter()
962            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
963            .collect();
964        prepare::execute_prepared(&self.connection, statement.statement(), &params_ref)
965    }
966
967    /// Executes a prepared statement that doesn't return rows.
968    ///
969    /// # Errors
970    ///
971    /// Same failure modes as [`Self::execute`].
972    pub fn execute_no_result<P: AsRef<[Option<Vec<u8>>]>>(
973        &self,
974        statement: &prepare::OwnedPreparedStatement,
975        params: P,
976    ) -> Result<u64> {
977        let params_ref: Vec<Option<&[u8]>> = params
978            .as_ref()
979            .iter()
980            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
981            .collect();
982        prepare::execute_prepared_no_result(&self.connection, statement.statement(), &params_ref)
983    }
984
985    /// Executes a prepared statement with streaming results.
986    ///
987    /// Returns a [`PreparedQueryStream`](super::prepared_stream::PreparedQueryStream)
988    /// that yields rows in chunks, keeping memory bounded regardless of
989    /// result size. This is the prepared-statement analog of
990    /// [`query_streaming`](Self::query_streaming).
991    ///
992    /// The connection mutex is held for the duration of iteration;
993    /// dropping the stream before completion issues a best-effort cancel.
994    ///
995    /// # Errors
996    ///
997    /// - Returns [`Error`] (connection) if the connection mutex is
998    ///   poisoned.
999    /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
1000    ///   `Bind`/`Execute` sequence for the prepared statement fails on
1001    ///   the server or on the wire.
1002    pub fn execute_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
1003        &'a self,
1004        statement: &prepare::OwnedPreparedStatement,
1005        params: P,
1006        chunk_size: usize,
1007    ) -> Result<super::prepared_stream::PreparedQueryStream<'a>> {
1008        let params_ref: Vec<Option<&[u8]>> = params
1009            .as_ref()
1010            .iter()
1011            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
1012            .collect();
1013
1014        let mut conn = self.lock_connection()?;
1015        conn.start_execute_prepared(statement.name(), &params_ref, statement.columns().len())?;
1016
1017        let columns = std::sync::Arc::new(statement.columns().to_vec());
1018        Ok(super::prepared_stream::PreparedQueryStream::new(
1019            conn, self, chunk_size, columns,
1020        ))
1021    }
1022
1023    /// Closes the connection.
1024    ///
1025    /// # Errors
1026    ///
1027    /// - Returns [`Error`] (connection) if the connection mutex is
1028    ///   poisoned.
1029    /// - Returns [`Error`] (I/O) if writing the `Terminate` message or
1030    ///   flushing the socket fails.
1031    pub fn close(self) -> Result<()> {
1032        let mut conn = self.lock_connection()?;
1033        conn.terminate()
1034    }
1035
1036    /// Starts a COPY IN operation for bulk data insertion.
1037    ///
1038    /// Returns a `CopyInWriter` that can be used to send data in `HyperBinary` format.
1039    ///
1040    /// # Example
1041    ///
1042    /// ```no_run
1043    /// # use hyperdb_api_core::client::Client;
1044    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1045    /// # let binary_data = &[];
1046    /// let mut writer = client.copy_in("\"my_table\"", &["col1", "col2"])?;
1047    /// writer.send(binary_data)?;
1048    /// let rows = writer.finish()?;
1049    /// # Ok(())
1050    /// # }
1051    /// ```
1052    ///
1053    /// # Errors
1054    ///
1055    /// Delegates to [`Self::copy_in_with_format`]; see that method
1056    /// for the concrete failure modes.
1057    pub fn copy_in(&self, table_name: &str, columns: &[&str]) -> Result<CopyInWriter<'_>> {
1058        self.copy_in_with_format(table_name, columns, "HYPERBINARY")
1059    }
1060
1061    /// Starts a COPY IN operation with a specified data format.
1062    ///
1063    /// Returns a `CopyInWriter` that can be used to send data in the specified format.
1064    ///
1065    /// # Arguments
1066    ///
1067    /// * `table_name` - The target table name (should be properly quoted if needed)
1068    /// * `columns` - Column names to insert into
1069    /// * `format` - The data format string: "HYPERBINARY" or "ARROWSTREAM"
1070    ///
1071    /// # Example
1072    ///
1073    /// ```no_run
1074    /// # use hyperdb_api_core::client::Client;
1075    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1076    /// # let arrow_ipc_data = &[];
1077    /// // For Arrow IPC stream format
1078    /// let mut writer = client.copy_in_with_format("\"my_table\"", &["col1", "col2"], "ARROWSTREAM")?;
1079    /// writer.send(arrow_ipc_data)?;
1080    /// let rows = writer.finish()?;
1081    /// # Ok(())
1082    /// # }
1083    /// ```
1084    ///
1085    /// # Errors
1086    ///
1087    /// - Returns [`Error`] (connection) if the connection mutex is
1088    ///   poisoned.
1089    /// - Returns [`Error`] (server) if the server rejects the generated
1090    ///   `COPY ... FROM STDIN` statement (for example, missing table
1091    ///   or mismatched columns).
1092    /// - Returns [`Error`] (I/O) on wire-protocol I/O failure while
1093    ///   initiating the COPY.
1094    pub fn copy_in_with_format(
1095        &self,
1096        table_name: &str,
1097        columns: &[&str],
1098        format: &str,
1099    ) -> Result<CopyInWriter<'_>> {
1100        let mut conn = self.lock_connection()?;
1101        conn.start_copy_in_with_format(table_name, columns, format)?;
1102        Ok(CopyInWriter { connection: conn })
1103    }
1104
1105    /// Starts a COPY IN operation from a raw SQL query string.
1106    ///
1107    /// The query must be a complete `COPY ... FROM STDIN ...` statement.
1108    /// This is useful for text-format imports (CSV, TSV) where you need
1109    /// full control over the COPY options.
1110    ///
1111    /// # Security
1112    ///
1113    /// The query is validated to start with `COPY` (case-insensitive) as a
1114    /// defense-in-depth measure. Callers are still responsible for proper
1115    /// escaping of table names and other identifiers within the query.
1116    /// Prefer [`copy_in()`](Self::copy_in) or
1117    /// [`copy_in_with_format()`](Self::copy_in_with_format) when possible,
1118    /// as they handle escaping automatically.
1119    ///
1120    /// # Example
1121    ///
1122    /// ```no_run
1123    /// # use hyperdb_api_core::client::Client;
1124    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1125    /// let mut writer = client.copy_in_raw(
1126    ///     "COPY \"my_table\" FROM STDIN WITH (FORMAT csv, HEADER true)"
1127    /// )?;
1128    /// writer.send(b"1,Alice\n2,Bob\n")?;
1129    /// let rows = writer.finish()?;
1130    /// # Ok(())
1131    /// # }
1132    /// ```
1133    ///
1134    /// # Errors
1135    ///
1136    /// - Returns [`ErrorKind::Query`] if `query` (trimmed) does not
1137    ///   start with `COPY` (defense-in-depth check against non-COPY
1138    ///   statements).
1139    /// - Returns [`Error`] (connection) if the connection mutex is
1140    ///   poisoned.
1141    /// - Returns [`Error`] (server) or [`Error`] (I/O) if the server rejects
1142    ///   the COPY statement or the wire write fails.
1143    pub fn copy_in_raw(&self, query: &str) -> Result<CopyInWriter<'_>> {
1144        // Defense-in-depth: reject queries that don't look like COPY statements
1145        if !query.trim_start().to_ascii_uppercase().starts_with("COPY") {
1146            return Err(Error::new(
1147                ErrorKind::Query,
1148                "copy_in_raw() requires a COPY statement. \
1149                 The query must start with 'COPY'.",
1150            ));
1151        }
1152        let mut conn = self.lock_connection()?;
1153        conn.start_copy_in_raw(query)?;
1154        Ok(CopyInWriter { connection: conn })
1155    }
1156
1157    /// Returns true if the connection is alive (no error has occurred).
1158    #[must_use]
1159    pub fn is_alive(&self) -> bool {
1160        self.lock_connection().is_ok()
1161    }
1162
1163    /// Executes a COPY ... TO STDOUT query and returns all output data.
1164    ///
1165    /// This is used for queries like:
1166    /// `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
1167    ///
1168    /// # Arguments
1169    ///
1170    /// * `query` - The COPY TO STDOUT query to execute
1171    ///
1172    /// # Returns
1173    ///
1174    /// The raw bytes from all `CopyData` messages concatenated together.
1175    ///
1176    /// # Example
1177    ///
1178    /// ```no_run
1179    /// # use hyperdb_api_core::client::Client;
1180    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1181    /// let arrow_data = client.copy_out(
1182    ///     "COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
1183    /// )?;
1184    /// # Ok(())
1185    /// # }
1186    /// ```
1187    ///
1188    /// # Errors
1189    ///
1190    /// - Returns [`Error`] (connection) if the connection mutex is
1191    ///   poisoned.
1192    /// - Returns [`Error`] (server) if the server rejects the `COPY ... TO
1193    ///   STDOUT` statement.
1194    /// - Returns [`Error`] (I/O) if the wire read fails while collecting
1195    ///   COPY output.
1196    pub fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
1197        let mut conn = self.lock_connection()?;
1198        conn.copy_out(query)
1199    }
1200
1201    /// Executes a COPY ... TO STDOUT query and streams output to a writer.
1202    ///
1203    /// Unlike [`copy_out`](Self::copy_out) which collects all data into memory,
1204    /// this method streams each `CopyData` chunk directly to the provided writer,
1205    /// keeping memory usage constant regardless of result size.
1206    ///
1207    /// Returns the total number of bytes written.
1208    ///
1209    /// # Example
1210    ///
1211    /// ```no_run
1212    /// # use hyperdb_api_core::client::Client;
1213    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1214    /// let mut file = std::fs::File::create("output.csv")?;
1215    /// let bytes_written = client.copy_out_to_writer(
1216    ///     "COPY (SELECT * FROM my_table) TO STDOUT WITH (FORMAT csv, HEADER true)",
1217    ///     &mut file,
1218    /// )?;
1219    /// println!("Wrote {} bytes", bytes_written);
1220    /// # Ok(())
1221    /// # }
1222    /// ```
1223    ///
1224    /// # Errors
1225    ///
1226    /// Same failure modes as [`Self::copy_out`], plus [`Error`] (I/O)
1227    /// when the supplied `writer` returns an error while receiving
1228    /// COPY chunks.
1229    pub fn copy_out_to_writer(&self, query: &str, writer: &mut dyn std::io::Write) -> Result<u64> {
1230        let mut conn = self.lock_connection()?;
1231        conn.copy_out_to_writer(query, writer)
1232    }
1233}
1234
1235impl Cancellable for Client {
1236    /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
1237    /// fresh connection. Swallows errors (logged via `tracing::warn!`)
1238    /// because cancellation is a best-effort signal and callers cannot
1239    /// meaningfully recover from a failed cancel.
1240    fn cancel(&self) {
1241        if let Err(e) = Client::cancel(self) {
1242            warn!(
1243                target: "hyperdb_api_core::client",
1244                error = %e,
1245                process_id = self.process_id,
1246                "cancel request failed (best-effort, swallowed)",
1247            );
1248        }
1249    }
1250}
1251
1252/// A writer for COPY IN operations.
1253///
1254/// This struct holds the connection lock while sending data to ensure
1255/// exclusive access during the COPY operation.
1256#[derive(Debug)]
1257pub struct CopyInWriter<'a> {
1258    connection: MutexGuard<'a, RawConnection<SyncStream>>,
1259}
1260
1261impl CopyInWriter<'_> {
1262    /// Sends a chunk of COPY data.
1263    ///
1264    /// The data should be in `HyperBinary` format. For best performance,
1265    /// batch multiple rows into larger chunks (e.g., 1-16 MB).
1266    ///
1267    /// # Errors
1268    ///
1269    /// Returns [`Error`] (I/O) if the wire write of the `CopyData` frame
1270    /// fails.
1271    pub fn send(&mut self, data: &[u8]) -> Result<()> {
1272        self.connection.send_copy_data(data)
1273    }
1274
1275    /// Flushes any buffered data to the server.
1276    ///
1277    /// # Errors
1278    ///
1279    /// Returns [`Error`] (I/O) if flushing the internal write buffer to
1280    /// the transport fails.
1281    pub fn flush(&mut self) -> Result<()> {
1282        self.connection.flush()
1283    }
1284
1285    /// Sends COPY data directly to the stream without internal buffering.
1286    ///
1287    /// This writes data directly to the TCP stream, letting the kernel handle
1288    /// buffering. More efficient for streaming large amounts of data.
1289    /// Call `flush_stream()` periodically to ensure data is sent.
1290    ///
1291    /// # Errors
1292    ///
1293    /// Returns [`Error`] (I/O) if writing to the underlying stream fails.
1294    pub fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1295        self.connection.send_copy_data_direct(data)
1296    }
1297
1298    /// Flushes the TCP stream.
1299    ///
1300    /// Use with `send_direct()` to periodically ensure data reaches the server.
1301    ///
1302    /// # Errors
1303    ///
1304    /// Returns [`Error`] (I/O) if flushing the underlying transport
1305    /// fails.
1306    pub fn flush_stream(&mut self) -> Result<()> {
1307        self.connection.flush_stream()
1308    }
1309
1310    /// Reserves capacity in the write buffer to avoid reallocations.
1311    ///
1312    /// Call this before bulk operations to pre-allocate buffer space.
1313    pub fn reserve_buffer(&mut self, capacity: usize) {
1314        self.connection.reserve_write_buffer(capacity);
1315    }
1316
1317    /// Finishes the COPY operation successfully.
1318    ///
1319    /// Returns the number of rows inserted.
1320    ///
1321    /// # Errors
1322    ///
1323    /// - Returns [`Error`] (I/O) if writing `CopyDone` or flushing the
1324    ///   transport fails.
1325    /// - Returns [`Error`] (server) if the server reports a COPY-side
1326    ///   failure (constraint violation, type coercion error, etc.)
1327    ///   via an `ErrorResponse` after `CopyDone`.
1328    pub fn finish(mut self) -> Result<u64> {
1329        self.connection.finish_copy()
1330    }
1331
1332    /// Cancels the COPY operation.
1333    ///
1334    /// All data sent so far will be discarded.
1335    ///
1336    /// # Errors
1337    ///
1338    /// Returns [`Error`] (I/O) if writing the `CopyFail` frame or
1339    /// flushing the transport fails, or [`Error`] (server) if the server
1340    /// reports an unexpected status after the cancel.
1341    pub fn cancel(mut self, reason: &str) -> Result<()> {
1342        self.connection.cancel_copy(reason)
1343    }
1344}
1345
1346/// Parses affected row count from a command tag.
1347fn parse_affected_rows(tag: &str) -> Option<u64> {
1348    let parts: Vec<&str> = tag.split_whitespace().collect();
1349
1350    match parts.first()? {
1351        &"INSERT" => {
1352            // INSERT oid count
1353            parts.get(2)?.parse().ok()
1354        }
1355        &"UPDATE" | &"DELETE" | &"SELECT" | &"COPY" => {
1356            // UPDATE/DELETE/SELECT/COPY count
1357            parts.get(1)?.parse().ok()
1358        }
1359        _ => None,
1360    }
1361}
1362
1363/// Streaming iterator for query results without materializing all rows.
1364///
1365/// Holding a `QueryStream` keeps the underlying [`RawConnection`] locked
1366/// via a `MutexGuard`. Dropping the stream before fully iterating triggers
1367/// a server-side cancel (see [`Drop`] below) so the connection is returned
1368/// to the pool cleanly.
1369pub struct QueryStream<'a> {
1370    conn: Option<MutexGuard<'a, RawConnection<SyncStream>>>,
1371    /// Best-effort cancel handle, used in [`Drop`] when the stream is
1372    /// abandoned before completion. For the current sync client this is
1373    /// the owning [`Client`] itself (which implements [`Cancellable`] via
1374    /// a PG wire `CancelRequest` on a fresh connection). When a gRPC
1375    /// equivalent lands it will plug in the same trait with a
1376    /// `cancel_query(query_id)` RPC implementation.
1377    canceller: &'a dyn Cancellable,
1378    finished: bool,
1379    chunk_size: usize,
1380    schema: Option<Vec<super::statement::Column>>,
1381    schema_read: bool,
1382}
1383
1384impl std::fmt::Debug for QueryStream<'_> {
1385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1386        f.debug_struct("QueryStream")
1387            .field("finished", &self.finished)
1388            .field("chunk_size", &self.chunk_size)
1389            .field("schema_read", &self.schema_read)
1390            .finish_non_exhaustive()
1391    }
1392}
1393
1394impl Drop for QueryStream<'_> {
1395    fn drop(&mut self) {
1396        // If the caller exhausted the stream, the connection is already at
1397        // `ReadyForQuery` — nothing to do.
1398        if self.finished {
1399            return;
1400        }
1401
1402        // Otherwise: the server is still happily streaming rows (potentially
1403        // millions of them) for a query we no longer care about. Passively
1404        // draining would waste bandwidth and could block the destructor for
1405        // a very long time. Instead we send a transport-appropriate cancel
1406        // signal — on PG wire this is a `CancelRequest` packet on a fresh
1407        // connection; on gRPC (future) it will be a `cancel_query` RPC on
1408        // the shared channel. `Cancellable::cancel` is fire-and-forget and
1409        // cannot fail.
1410        self.canceller.cancel();
1411
1412        // After cancel, the server stops producing new rows and emits
1413        // `ErrorResponse(QueryCanceled) + ReadyForQuery` promptly. We still
1414        // need to drain those trailing messages off the wire so the
1415        // connection returns to the pool cleanly. The budget here is small
1416        // because we only expect: (a) whatever rows the server had already
1417        // flushed before seeing the cancel, (b) `ErrorResponse`,
1418        // (c) `ReadyForQuery`. A well-behaved server reaches `ReadyForQuery`
1419        // within a handful of messages. If that budget is somehow exceeded
1420        // the bounded drain logs a warning and marks the connection
1421        // desynchronized — downstream users will surface the desync as a
1422        // transport-level failure and reconnect.
1423        const POST_CANCEL_DRAIN_CAP: usize = 1024;
1424        if let Some(ref mut conn) = self.conn {
1425            let _ok = conn.drain_until_ready_bounded(POST_CANCEL_DRAIN_CAP);
1426        }
1427    }
1428}
1429
1430impl QueryStream<'_> {
1431    /// Returns the schema (column metadata) for the result set.
1432    #[must_use]
1433    pub fn schema(&self) -> Option<&[super::statement::Column]> {
1434        self.schema.as_deref()
1435    }
1436
1437    /// Retrieves the next chunk of rows (up to `chunk_size`).
1438    ///
1439    /// # Errors
1440    ///
1441    /// - Returns [`Error`] (I/O) if reading from the underlying transport
1442    ///   fails while awaiting the next protocol message.
1443    /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
1444    ///   during streaming (for example, a server-side execution failure
1445    ///   encountered partway through the result set).
1446    pub fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
1447        if self.finished {
1448            return Ok(None);
1449        }
1450
1451        let Some(conn) = self.conn.as_mut() else {
1452            return Ok(None);
1453        };
1454
1455        let mut rows = Vec::with_capacity(self.chunk_size);
1456        while rows.len() < self.chunk_size {
1457            let msg = conn.read_message()?;
1458            match msg {
1459                Message::RowDescription(desc) if !self.schema_read => {
1460                    let mut cols = Vec::new();
1461                    for f in desc.fields().filter_map(std::result::Result::ok) {
1462                        cols.push(super::statement::Column::new(
1463                            f.name().to_string(),
1464                            f.type_oid(),
1465                            f.type_modifier(),
1466                            super::statement::ColumnFormat::from_code(f.format()),
1467                        ));
1468                    }
1469                    self.schema = Some(cols);
1470                    self.schema_read = true;
1471                }
1472                Message::DataRow(data) => {
1473                    rows.push(StreamRow::new(data));
1474                    if rows.len() >= self.chunk_size {
1475                        return Ok(Some(rows));
1476                    }
1477                }
1478                Message::ReadyForQuery(_) => {
1479                    self.finished = true;
1480                    self.conn = None;
1481                    return if rows.is_empty() {
1482                        Ok(None)
1483                    } else {
1484                        Ok(Some(rows))
1485                    };
1486                }
1487                Message::ErrorResponse(body) => {
1488                    // Mark the stream finished *before* touching the
1489                    // connection so the `Drop` impl's Cancellable-based
1490                    // cleanup path is trivially a no-op regardless of what
1491                    // happens next (normal return, `?`, panic in drain,
1492                    // future refactors that insert early returns, etc).
1493                    //
1494                    // `&mut self` exclusivity means `Drop` can't fire
1495                    // concurrently with this method, so this is purely a
1496                    // defensive-ordering improvement — but it also matches
1497                    // the `ReadyForQuery` arm above, which sets
1498                    // `finished = true` first for the same reason. Keeping
1499                    // both terminal arms in the same order makes the
1500                    // "terminal state is committed before cleanup" rule
1501                    // visible at a glance.
1502                    self.finished = true;
1503                    // Drain through `ReadyForQuery` before releasing the
1504                    // pooled connection so the next user sees a clean wire
1505                    // state. `consume_error` swallows any drain I/O errors
1506                    // via tracing::warn — the caller's original error is
1507                    // more informative than a transport hiccup during
1508                    // cleanup.
1509                    let err = match self.conn {
1510                        Some(ref mut c) => c.consume_error(&body),
1511                        None => parse_error_response(&body),
1512                    };
1513                    self.conn = None;
1514                    return Err(err);
1515                }
1516                _ => {}
1517            }
1518        }
1519        Ok(Some(rows))
1520    }
1521}
1522
1523#[cfg(test)]
1524mod tests {
1525    use super::*;
1526
1527    #[test]
1528    fn test_parse_affected_rows() {
1529        assert_eq!(parse_affected_rows("INSERT 0 5"), Some(5));
1530        assert_eq!(parse_affected_rows("UPDATE 10"), Some(10));
1531        assert_eq!(parse_affected_rows("DELETE 3"), Some(3));
1532        assert_eq!(parse_affected_rows("SELECT 100"), Some(100));
1533        assert_eq!(parse_affected_rows("CREATE TABLE"), None);
1534    }
1535
1536    #[test]
1537    fn test_copy_in_raw_rejects_non_copy_query() {
1538        // We can't test with a real connection, but we can verify the prefix guard
1539        // works by checking the error message. Since Client::connect fails without
1540        // a server, we test the validation logic directly.
1541        let query = "SELECT * FROM users";
1542        assert!(
1543            !query.trim_start().to_ascii_uppercase().starts_with("COPY"),
1544            "Non-COPY query should not pass the COPY prefix check"
1545        );
1546
1547        let copy_query = "COPY \"users\" FROM STDIN WITH (FORMAT csv)";
1548        assert!(
1549            copy_query
1550                .trim_start()
1551                .to_ascii_uppercase()
1552                .starts_with("COPY"),
1553            "COPY query should pass the prefix check"
1554        );
1555
1556        // Leading whitespace should be accepted
1557        let padded = "  COPY \"users\" FROM STDIN";
1558        assert!(padded.trim_start().to_ascii_uppercase().starts_with("COPY"));
1559
1560        // Case-insensitive
1561        let lowercase = "copy \"users\" FROM STDIN";
1562        assert!(lowercase
1563            .trim_start()
1564            .to_ascii_uppercase()
1565            .starts_with("COPY"));
1566    }
1567}