Skip to main content

hyperdb_api_core/client/
client.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level synchronous client for Hyper database.
5//!
6//! This module provides [`Client`], the primary synchronous interface for
7//! communicating with a Hyper server. It supports three query execution
8//! modes, each using a different level of the `PostgreSQL` wire protocol:
9//!
10//! - **Simple Query** ([`Client::query`]) — Sends a single `Query` message
11//!   and collects all `DataRow` messages into memory. Results are returned
12//!   in text format. Best for small result sets or DDL/DML commands.
13//!
14//! - **Extended Query / `HyperBinary`** ([`Client::query_fast`]) — Uses the
15//!   Extended Query protocol (`Parse` / `Bind` / `Execute`) with
16//!   [`ColumnFormat::HyperBinary`](super::statement::ColumnFormat::HyperBinary)
17//!   (format code 2) for `LittleEndian` binary results. Returns
18//!   [`StreamRow`]s that compute field offsets
19//!   on-demand, avoiding per-row allocation.
20//!
21//! - **Streaming** ([`Client::query_streaming`]) — Like `query_fast` but
22//!   returns a [`QueryStream`] that yields rows in chunks, keeping memory
23//!   usage constant regardless of result set size. The stream holds the
24//!   connection lock for its lifetime; dropping it triggers a cancel request
25//!   to stop the server from streaming the rest.
26//!
27//! # Bulk Insertion (COPY Protocol)
28//!
29//! [`Client::copy_in`] starts a `COPY ... FROM STDIN WITH (FORMAT HYPERBINARY)`
30//! session and returns a [`CopyInWriter`] for streaming binary data. The
31//! caller is responsible for encoding rows in the correct format (typically
32//! done by the higher-level [`hyperdb_api::Inserter`](https://docs.rs/hyperdb-api)).
33//! See also [`copy_in_with_format`](Client::copy_in_with_format) for
34//! alternative formats (CSV, Arrow IPC) and [`copy_in_raw`](Client::copy_in_raw)
35//! for fully custom COPY statements.
36
37use std::net::TcpStream;
38use std::sync::{Arc, Mutex, MutexGuard};
39
40use tracing::{debug, info, trace, warn};
41
42#[cfg(unix)]
43use std::os::unix::net::UnixStream;
44
45use super::cancel::Cancellable;
46use super::config::Config;
47use super::connection::{parse_error_response, RawConnection};
48use super::endpoint::ConnectionEndpoint;
49use super::error::{Error, ErrorKind, Result};
50use super::prepare;
51use super::row::{Row, StreamRow};
52use super::sync_stream::SyncStream;
53
54use crate::protocol::message::Message;
55use crate::types::Oid;
56
57use super::notice::{Notice, NoticeReceiver};
58
59/// A synchronous client for Hyper database.
60///
61/// The client handles connection management and query execution.
62/// It is thread-safe and can be shared between threads using `Arc`.
63///
64/// # Thread Safety
65///
66/// The `Client` is thread-safe and can be shared between threads using `Arc<Client>`.
67/// All methods use internal mutexes to synchronize access to the underlying connection.
68///
69/// # Example
70///
71/// ```no_run
72/// use hyperdb_api_core::client::{Client, Config};
73///
74/// # fn example() -> hyperdb_api_core::client::Result<()> {
75/// let config = Config::new()
76///     .with_host("localhost")
77///     .with_port(7483)
78///     .with_database("test.hyper");
79///
80/// let client = Client::connect(&config)?;
81/// let rows = client.query("SELECT 1")?;
82/// client.close()?;
83/// # Ok(())
84/// # }
85/// ```
86pub struct Client {
87    /// The underlying connection, protected by a mutex for thread safety.
88    connection: Arc<Mutex<RawConnection<SyncStream>>>,
89    /// Backend process ID (for cancel requests).
90    process_id: i32,
91    /// Secret key for authenticating cancel requests.
92    secret_key: i32,
93    /// Connection endpoint for cancel requests and reconnection.
94    endpoint: ConnectionEndpoint,
95    /// Optional notice receiver callback for server notices/warnings.
96    notice_receiver: Option<Arc<NoticeReceiver>>,
97}
98
99// Manual Debug implementation because NoticeReceiver doesn't implement Debug
100impl std::fmt::Debug for Client {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("Client")
103            .field("process_id", &self.process_id)
104            .field("secret_key", &self.secret_key)
105            .field("endpoint", &self.endpoint)
106            .field(
107                "notice_receiver",
108                &self.notice_receiver.as_ref().map(|_| "<callback>"),
109            )
110            .finish_non_exhaustive()
111    }
112}
113
114impl Client {
115    /// Connects to a Hyper server using the given configuration.
116    ///
117    /// Establishes a TCP connection, performs authentication, and initializes
118    /// the client. Returns an error if the connection fails or authentication
119    /// is rejected.
120    ///
121    /// # Arguments
122    ///
123    /// * `config` - Connection configuration (host, port, credentials, etc.)
124    ///
125    /// # Errors
126    ///
127    /// Returns `Error` if:
128    /// - Connection to the server fails
129    /// - Authentication fails
130    /// - Protocol handshake fails
131    ///
132    /// # Example
133    ///
134    /// ```no_run
135    /// # use hyperdb_api_core::client::{Client, Config};
136    /// # fn example() -> hyperdb_api_core::client::Result<()> {
137    /// let config = Config::new()
138    ///     .with_host("localhost")
139    ///     .with_port(7483)
140    ///     .with_user("myuser")
141    ///     .with_password("mypass")
142    ///     .with_database("test.hyper");
143    ///
144    /// let client = Client::connect(&config)?;
145    /// # Ok(())
146    /// # }
147    /// ```
148    pub fn connect(config: &Config) -> Result<Self> {
149        // Log connection parameters (password is intentionally omitted for security)
150        info!(
151            target: "hyperdb_api",
152            host = %config.host(),
153            port = config.port(),
154            user = config.user().unwrap_or("(default)"),
155            database = config.database().unwrap_or("(none)"),
156            "connection-parameters"
157        );
158
159        let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
160        let addr = format!("{}:{}", config.host(), config.port());
161        let tcp_stream = TcpStream::connect(&addr).map_err(|e| {
162            warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
163            Error::connection(format!("failed to connect to {addr}: {e}"))
164        })?;
165
166        // Set TCP options for better performance.
167        //
168        // `TCP_NODELAY` disables Nagle so request bytes flush immediately —
169        // needed for low-latency request/response shapes.
170        //
171        // `SO_RCVBUF` / `SO_SNDBUF` are bumped to 4 MiB. The Windows default
172        // TCP buffers are ~64 KiB, which throttles loopback throughput
173        // because hyperd blocks on `send()` once the kernel buffer fills up.
174        // Linux auto-tunes much higher, so this primarily helps Windows
175        // (and is a marginal win on macOS).
176        //
177        // Empirical knee on Windows i9-10980XE / TCP loopback (100M-row
178        // sync full-scan, single connection):
179        //
180        // |  size | rows/sec |
181        // |------:|---------:|
182        // |  64 K |     2.89 |  (default)
183        // |   1 M |     5.95 |
184        // |   4 M |     6.90 |  <-- knee
185        // |   8 M |     6.68 |  (insert workloads regress 18%)
186        //
187        // 4 MiB hits the throughput plateau without the memory-pressure
188        // regression seen at 8 MiB. We use `.ok()` because the kernel may
189        // clamp to a lower value or refuse the request entirely; either
190        // way the connection still works at the default size.
191        tcp_stream.set_nodelay(true).ok();
192        let sock = socket2::SockRef::from(&tcp_stream);
193        sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
194        sock.set_send_buffer_size(4 * 1024 * 1024).ok();
195
196        let stream = SyncStream::tcp(tcp_stream);
197        let mut connection = RawConnection::new(stream);
198
199        // Perform startup with authentication
200        let params = config.startup_params();
201        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
202        connection.startup(&params_ref, config.password())?;
203
204        let process_id = connection.process_id();
205        let secret_key = connection.secret_key();
206
207        debug!(
208            target: "hyperdb_api",
209            process_id,
210            "connection-established"
211        );
212
213        Ok(Client {
214            connection: Arc::new(Mutex::new(connection)),
215            process_id,
216            secret_key,
217            endpoint,
218            notice_receiver: None,
219        })
220    }
221
222    /// Connects to a Hyper server via Unix Domain Socket (Unix only).
223    ///
224    /// # Example
225    ///
226    /// ```no_run
227    /// # use hyperdb_api_core::client::{Client, Config};
228    /// # use std::path::Path;
229    /// # fn example() -> hyperdb_api_core::client::Result<()> {
230    /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
231    /// let config = Config::new().with_database("test.hyper");
232    /// let client = Client::connect_unix(socket_path, &config)?;
233    /// # Ok(())
234    /// # }
235    /// ```
236    ///
237    /// # Errors
238    ///
239    /// - Returns [`Error`] (connection) if the Unix domain socket cannot
240    ///   be connected.
241    /// - Propagates any [`Error`] from the startup handshake
242    ///   (authentication, protocol error, I/O error).
243    #[cfg(unix)]
244    pub fn connect_unix(socket_path: impl AsRef<std::path::Path>, config: &Config) -> Result<Self> {
245        use std::path::Path;
246
247        let path = socket_path.as_ref();
248        info!(
249            target: "hyperdb_api",
250            socket_path = %path.display(),
251            user = config.user().unwrap_or("(default)"),
252            database = config.database().unwrap_or("(none)"),
253            "connection-parameters-unix"
254        );
255
256        let unix_stream = UnixStream::connect(path).map_err(|e| {
257            warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
258            Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
259        })?;
260
261        // Parse endpoint from socket path
262        let directory = path.parent().unwrap_or(Path::new("/"));
263        let name = path
264            .file_name()
265            .and_then(|n| n.to_str())
266            .unwrap_or("socket");
267        let endpoint = ConnectionEndpoint::domain_socket(directory, name);
268
269        let stream = SyncStream::unix(unix_stream);
270        let mut connection = RawConnection::new(stream);
271
272        // Perform startup with authentication
273        let params = config.startup_params();
274        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
275        connection.startup(&params_ref, config.password())?;
276
277        let process_id = connection.process_id();
278        let secret_key = connection.secret_key();
279
280        debug!(
281            target: "hyperdb_api",
282            process_id,
283            "connection-established-unix"
284        );
285
286        Ok(Client {
287            connection: Arc::new(Mutex::new(connection)),
288            process_id,
289            secret_key,
290            endpoint,
291            notice_receiver: None,
292        })
293    }
294
295    /// Connects to a Hyper server via Windows Named Pipe (Windows only).
296    ///
297    /// # Arguments
298    ///
299    /// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
300    /// * `config` - Connection configuration
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
305    /// exist, all instances are busy after the retry window, or permission is
306    /// denied) or if the authentication handshake fails.
307    #[cfg(windows)]
308    pub fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
309        use std::fs::OpenOptions;
310        use std::time::{Duration, Instant};
311
312        info!(
313            target: "hyperdb_api",
314            pipe_path = %pipe_path,
315            user = config.user().unwrap_or("(default)"),
316            database = config.database().unwrap_or("(none)"),
317            "connection-parameters-named-pipe"
318        );
319
320        // Windows named pipes have a finite number of server-side instances
321        // (`MaxInstances` on `CreateNamedPipe`). When all are busy, `CreateFile`
322        // returns `ERROR_PIPE_BUSY` (231). The expected client behavior is to
323        // wait briefly and retry — equivalent to `WaitNamedPipe` from Win32.
324        // We poll with a short sleep up to a reasonable deadline so concurrent
325        // clients don't spuriously fail when the pool is momentarily exhausted.
326        const RETRY_INTERVAL: Duration = Duration::from_millis(20);
327        const MAX_WAIT: Duration = Duration::from_secs(10);
328        const ERROR_PIPE_BUSY: i32 = 231;
329
330        let deadline = Instant::now() + MAX_WAIT;
331        let file = loop {
332            match OpenOptions::new().read(true).write(true).open(pipe_path) {
333                Ok(f) => break f,
334                Err(e)
335                    if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
336                {
337                    std::thread::sleep(RETRY_INTERVAL);
338                }
339                Err(e) => {
340                    warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
341                    return Err(Error::connection(format!(
342                        "failed to connect to named pipe {pipe_path}: {e}"
343                    )));
344                }
345            }
346        };
347
348        // Parse endpoint from pipe path
349        let endpoint = ConnectionEndpoint::parse(&format!(
350            "tab.pipe://{}",
351            pipe_path.trim_start_matches(r"\\").replace('\\', "/")
352        ))
353        .unwrap_or_else(|_| {
354            // Fallback: construct from pipe path directly
355            // Expected format: \\<host>\pipe\<name>
356            let parts: Vec<&str> = pipe_path
357                .trim_start_matches(r"\\")
358                .splitn(3, '\\')
359                .collect();
360            if parts.len() >= 3 {
361                ConnectionEndpoint::named_pipe(parts[0], parts[2])
362            } else {
363                ConnectionEndpoint::named_pipe(".", pipe_path)
364            }
365        });
366
367        let stream = SyncStream::named_pipe(file);
368        let mut connection = RawConnection::new(stream);
369
370        // Perform startup with authentication
371        let params = config.startup_params();
372        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
373        connection.startup(&params_ref, config.password())?;
374
375        let process_id = connection.process_id();
376        let secret_key = connection.secret_key();
377
378        debug!(
379            target: "hyperdb_api",
380            process_id,
381            "connection-established-named-pipe"
382        );
383
384        Ok(Client {
385            connection: Arc::new(Mutex::new(connection)),
386            process_id,
387            secret_key,
388            endpoint,
389            notice_receiver: None,
390        })
391    }
392
393    /// Connects to a Hyper server using a `ConnectionEndpoint`.
394    ///
395    /// This is a lower-level method that accepts a pre-parsed endpoint.
396    ///
397    /// # Errors
398    ///
399    /// Delegates to [`Client::connect`], [`Client::connect_unix`], or
400    /// `Client::connect_named_pipe` depending on the endpoint variant,
401    /// and propagates their errors unchanged.
402    pub fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
403        match endpoint {
404            ConnectionEndpoint::Tcp { host, port } => {
405                let mut cfg = config.clone();
406                cfg = cfg.with_host(host.clone()).with_port(*port);
407                Self::connect(&cfg)
408            }
409            #[cfg(unix)]
410            ConnectionEndpoint::DomainSocket { directory, name } => {
411                let socket_path = directory.join(name);
412                Self::connect_unix(&socket_path, config)
413            }
414            #[cfg(windows)]
415            ConnectionEndpoint::NamedPipe { host, name } => {
416                let pipe_path = format!(r"\\{host}\pipe\{name}");
417                Self::connect_named_pipe(&pipe_path, config)
418            }
419        }
420    }
421
422    /// Returns the connection endpoint.
423    #[must_use]
424    pub fn endpoint(&self) -> &ConnectionEndpoint {
425        &self.endpoint
426    }
427
428    /// Returns the server process ID for this connection.
429    #[must_use]
430    pub fn process_id(&self) -> i32 {
431        self.process_id
432    }
433
434    /// Returns the secret key for cancel requests.
435    #[must_use]
436    pub fn secret_key(&self) -> i32 {
437        self.secret_key
438    }
439
440    /// Cancels the currently executing query on this connection.
441    ///
442    /// This method is **thread-safe** and can be called from any thread while
443    /// a query is running on another thread. It works by opening a separate
444    /// TCP connection to the server and sending a cancel request.
445    ///
446    /// # How It Works
447    ///
448    /// 1. Opens a new TCP connection to the same server
449    /// 2. Sends a cancel request containing the process ID and secret key
450    /// 3. The server receives this and cancels the running query
451    /// 4. The original query will fail with error code 57014 (`query_canceled`)
452    ///
453    /// # Thread Safety
454    ///
455    /// This method does NOT acquire the connection mutex, so it can be called
456    /// while another thread is blocked waiting for query results.
457    ///
458    /// # Relation to the [`Cancellable`] trait
459    ///
460    /// This is the **fallible user-facing cancel API**: it returns a
461    /// `Result<()>` so explicit callers can observe transport-level
462    /// failures (network errors, socket issues) and react accordingly —
463    /// e.g. record a metric, show "cancel failed" UX, or retry.
464    ///
465    /// For [`Drop`]-path and other internal cleanup contexts where error
466    /// propagation is impossible, the separate
467    /// [`impl Cancellable for Client`](super::cancel::Cancellable) wraps
468    /// this method and swallows errors (logged via `tracing::warn!`).
469    /// The two coexist by design — each serves a different consumer.
470    ///
471    /// # Example
472    ///
473    /// ```no_run
474    /// use std::thread;
475    /// use std::sync::Arc;
476    /// use std::time::Duration;
477    /// use hyperdb_api_core::client::{Client, Config};
478    ///
479    /// # fn example() -> hyperdb_api_core::client::Result<()> {
480    /// # let config = Config::new().with_host("localhost").with_port(7483);
481    /// let client = Arc::new(Client::connect(&config)?);
482    /// let client_clone = Arc::clone(&client);
483    ///
484    /// // Start a long query in another thread
485    /// let handle = thread::spawn(move || {
486    ///     client_clone.query("SELECT pg_sleep(60)")
487    /// });
488    ///
489    /// // Cancel from the main thread
490    /// thread::sleep(Duration::from_millis(100));
491    /// client.cancel()?;
492    ///
493    /// // The query thread will get a cancellation error
494    /// let result = handle.join().unwrap();
495    /// assert!(result.is_err());
496    /// # Ok(())
497    /// # }
498    /// ```
499    ///
500    /// # Errors
501    ///
502    /// - Returns [`Error`] (connection) if the fresh cancel-side socket
503    ///   cannot be opened (TCP / UDS / named-pipe, depending on
504    ///   [`Self::endpoint`]).
505    /// - Returns [`Error`] (I/O) if writing or flushing the cancel
506    ///   request fails.
507    pub fn cancel(&self) -> Result<()> {
508        use crate::protocol::message::frontend;
509        use bytes::BytesMut;
510        use std::io::Write;
511
512        info!(
513            target: "hyperdb_api",
514            process_id = self.process_id,
515            "query-cancel-request"
516        );
517
518        let endpoint_str = self.endpoint.to_string();
519
520        // Open a new connection specifically for the cancel request
521        match &self.endpoint {
522            ConnectionEndpoint::Tcp { host, port } => {
523                let addr = format!("{host}:{port}");
524                let mut stream = TcpStream::connect(&addr).map_err(|e| {
525                    warn!(
526                        target: "hyperdb_api",
527                        addr = %endpoint_str,
528                        error = %e,
529                        "query-cancel-connect-failed"
530                    );
531                    Error::connection(format!(
532                        "failed to connect for cancel request to {endpoint_str}: {e}"
533                    ))
534                })?;
535                // Cancel is a 16-byte fire-and-forget — disable Nagle so the
536                // request hits the wire without waiting on a coalesce timer.
537                stream.set_nodelay(true).ok();
538
539                // Build and send the cancel request
540                let mut buf = BytesMut::with_capacity(16);
541                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
542
543                stream.write_all(&buf).map_err(|e| {
544                    warn!(
545                        target: "hyperdb_api",
546                        error = %e,
547                        "query-cancel-send-failed"
548                    );
549                    Error::io(e)
550                })?;
551
552                stream.flush().map_err(Error::io)?;
553            }
554            #[cfg(unix)]
555            ConnectionEndpoint::DomainSocket { directory, name } => {
556                let socket_path = directory.join(name);
557                let mut stream = UnixStream::connect(&socket_path).map_err(|e| {
558                    warn!(
559                        target: "hyperdb_api",
560                        addr = %endpoint_str,
561                        error = %e,
562                        "query-cancel-connect-failed"
563                    );
564                    Error::connection(format!(
565                        "failed to connect for cancel request to {endpoint_str}: {e}"
566                    ))
567                })?;
568
569                // Build and send the cancel request
570                let mut buf = BytesMut::with_capacity(16);
571                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
572
573                stream.write_all(&buf).map_err(|e| {
574                    warn!(
575                        target: "hyperdb_api",
576                        error = %e,
577                        "query-cancel-send-failed"
578                    );
579                    Error::io(e)
580                })?;
581
582                stream.flush().map_err(Error::io)?;
583            }
584            #[cfg(windows)]
585            ConnectionEndpoint::NamedPipe { host, name } => {
586                let pipe_path = format!(r"\\{host}\pipe\{name}");
587                let mut file = std::fs::OpenOptions::new()
588                    .read(true)
589                    .write(true)
590                    .open(&pipe_path)
591                    .map_err(|e| {
592                        warn!(
593                            target: "hyperdb_api",
594                            addr = %endpoint_str,
595                            error = %e,
596                            "query-cancel-connect-failed"
597                        );
598                        Error::connection(format!(
599                            "failed to connect for cancel request to {endpoint_str}: {e}"
600                        ))
601                    })?;
602
603                // Build and send the cancel request
604                let mut buf = BytesMut::with_capacity(16);
605                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
606
607                file.write_all(&buf).map_err(|e| {
608                    warn!(
609                        target: "hyperdb_api",
610                        error = %e,
611                        "query-cancel-send-failed"
612                    );
613                    Error::io(e)
614                })?;
615
616                file.flush().map_err(Error::io)?;
617            }
618        }
619
620        debug!(
621            target: "hyperdb_api",
622            process_id = self.process_id,
623            "query-cancel-sent"
624        );
625
626        Ok(())
627    }
628
629    /// Returns a server parameter value by name.
630    ///
631    /// Server parameters are sent by the server during connection startup.
632    /// Common parameters include:
633    /// - `server_version` - The server version string
634    /// - `server_encoding` - The server's character encoding
635    /// - `client_encoding` - The client's character encoding
636    ///
637    /// # Example
638    ///
639    /// ```no_run
640    /// # use hyperdb_api_core::client::{Client, Config};
641    /// # fn example(client: &Client) {
642    /// if let Some(version) = client.parameter_status("server_version") {
643    ///     println!("Connected to Hyper version: {}", version);
644    /// }
645    /// # }
646    /// ```
647    #[must_use]
648    pub fn parameter_status(&self, name: &str) -> Option<String> {
649        let conn = self.connection.lock().ok()?;
650        conn.parameter_status(name)
651            .map(std::string::ToString::to_string)
652    }
653
654    /// Sets the notice receiver for this connection.
655    ///
656    /// Notice and warning messages generated by the server are not returned by
657    /// query execution functions since they don't indicate failure. Instead,
658    /// they are passed to a notice handling function.
659    ///
660    /// The default behavior is to log notices at the `warn` level.
661    ///
662    /// # Arguments
663    ///
664    /// * `receiver` - The callback function that will be called with each notice.
665    ///   Pass `None` to restore default logging behavior.
666    ///
667    /// # Example
668    ///
669    /// ```no_run
670    /// # use hyperdb_api_core::client::Client;
671    /// # use std::sync::{Arc, Mutex};
672    /// # fn example(client: &mut Client) {
673    /// client.set_notice_receiver(Some(Box::new(|notice| {
674    ///     println!("Server notice: {}", notice);
675    /// })));
676    ///
677    /// // Or capture notices in a Vec
678    /// let notices = Arc::new(Mutex::new(Vec::new()));
679    /// let notices_clone = notices.clone();
680    /// client.set_notice_receiver(Some(Box::new(move |notice| {
681    ///     notices_clone.lock().unwrap().push(notice);
682    /// })));
683    /// # }
684    /// ```
685    pub fn set_notice_receiver(&mut self, receiver: Option<NoticeReceiver>) {
686        self.notice_receiver = receiver.map(Arc::new);
687    }
688
689    /// Processes any notices in a list of messages, calling the notice receiver.
690    ///
691    /// This is called internally after receiving messages from the server.
692    pub(crate) fn process_notices(&self, messages: &[Message]) {
693        for msg in messages {
694            if let Message::NoticeResponse(body) = msg {
695                let notice = Notice::from_response_body(body);
696
697                if let Some(ref receiver) = self.notice_receiver {
698                    receiver(notice);
699                } else {
700                    // Default behavior: log at warn level
701                    warn!(
702                        target: "hyperdb_api",
703                        severity = notice.severity().unwrap_or("NOTICE"),
704                        code = notice.code().unwrap_or(""),
705                        message = %notice.message(),
706                        "server-notice"
707                    );
708                }
709            }
710        }
711    }
712
713    /// Acquires a lock on the connection.
714    fn lock_connection(&self) -> Result<MutexGuard<'_, RawConnection<SyncStream>>> {
715        self.connection
716            .lock()
717            .map_err(|_| Error::connection("connection mutex poisoned"))
718    }
719
720    /// Executes a simple query and returns the rows.
721    ///
722    /// # Errors
723    ///
724    /// - Returns [`Error`] (connection) if the connection mutex is
725    ///   poisoned.
726    /// - Returns [`Error`] (server) for any SQL error the server reports
727    ///   (syntax error, constraint violation, type mismatch).
728    /// - Returns [`Error`] (I/O) on wire-protocol I/O failure.
729    /// - Propagates any [`Error`] from row construction (invalid row
730    ///   description or data row bytes).
731    pub fn query(&self, query: &str) -> Result<Vec<Row>> {
732        let mut conn = self.lock_connection()?;
733        let messages = conn.simple_query(query)?;
734        drop(conn); // Release lock before processing notices
735
736        self.process_notices(&messages);
737
738        let mut rows = Vec::new();
739        let mut columns = None;
740
741        for msg in messages {
742            match msg {
743                crate::protocol::message::Message::RowDescription(desc) => {
744                    // Extract column info including format from protocol
745                    let mut cols = Vec::new();
746                    for f in desc.fields().filter_map(|r| {
747                        r.map_err(|e| trace!(target: "hyperdb_api_core::client", error = %e, "dropped error parsing row description field")).ok()
748                    }) {
749                        cols.push(super::statement::Column::new(
750                            f.name().to_string(),
751                            f.type_oid(),
752                            f.type_modifier(),
753                            super::statement::ColumnFormat::from_code(f.format()),
754                        ));
755                    }
756                    columns = Some(Arc::new(cols));
757                }
758                crate::protocol::message::Message::DataRow(data) => {
759                    if let Some(ref cols) = columns {
760                        rows.push(Row::new(Arc::clone(cols), data)?);
761                    }
762                }
763                _ => {}
764            }
765        }
766
767        Ok(rows)
768    }
769
770    /// Executes a query using `HyperBinary` format for maximum performance.
771    ///
772    /// Returns `StreamRow`s which compute offsets on-demand without pre-allocation,
773    /// making them faster for large result sets where each row is processed once.
774    /// Uses the extended query protocol with `HyperBinary` format (format code 2)
775    /// for direct binary access without text parsing overhead.
776    ///
777    /// # Errors
778    ///
779    /// Same as [`Self::query`]: connection-mutex poisoning, SQL errors
780    /// from the server, and wire-protocol I/O failures all surface as
781    /// [`Error`].
782    pub fn query_fast(&self, query: &str) -> Result<Vec<StreamRow>> {
783        let mut conn = self.lock_connection()?;
784        let messages = conn.query_binary(query)?;
785        drop(conn);
786
787        self.process_notices(&messages);
788
789        let mut rows = Vec::new();
790        for msg in messages {
791            if let crate::protocol::message::Message::DataRow(data) = msg {
792                rows.push(StreamRow::new(data));
793            }
794        }
795        Ok(rows)
796    }
797
798    /// Executes a query with streaming results for minimum memory usage.
799    ///
800    /// Combines `HyperBinary` format with incremental row fetching.
801    ///
802    /// # Errors
803    ///
804    /// - Returns [`Error`] (connection) if the connection mutex is
805    ///   poisoned.
806    /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
807    ///   `Parse`/`Bind`/`Execute` sequence for the streaming query
808    ///   fails on the server or on the wire.
809    pub fn query_streaming<'a>(
810        &'a self,
811        query: &str,
812        chunk_size: usize,
813    ) -> Result<QueryStream<'a>> {
814        let mut conn = self.lock_connection()?;
815        conn.start_query_binary(query)?;
816        Ok(QueryStream {
817            conn: Some(conn),
818            // The owning client is the canceller: if the stream is dropped
819            // before being fully drained, its `Drop` impl will call
820            // `self.cancel()` (PG wire `CancelRequest` on a fresh
821            // connection) to stop the server from streaming the rest.
822            canceller: self,
823            finished: false,
824            chunk_size: chunk_size.max(1),
825            schema: None,
826            schema_read: false,
827        })
828    }
829
830    /// Executes a SQL command that doesn't return rows (e.g., INSERT, UPDATE).
831    ///
832    /// # Errors
833    ///
834    /// Same error modes as [`Self::query`] — connection-mutex poisoning,
835    /// server-side SQL errors, and wire-protocol I/O failures all
836    /// surface as [`Error`].
837    pub fn exec(&self, query: &str) -> Result<u64> {
838        let mut conn = self.lock_connection()?;
839        let messages = conn.simple_query(query)?;
840        drop(conn); // Release lock before processing notices
841
842        self.process_notices(&messages);
843
844        let mut affected = 0u64;
845        for msg in messages {
846            if let crate::protocol::message::Message::CommandComplete(body) = msg {
847                if let Ok(tag) = body.tag() {
848                    // Parse affected row count from tag like "INSERT 0 1"
849                    if let Some(count) = parse_affected_rows(tag) {
850                        affected = count;
851                    }
852                }
853            }
854        }
855
856        Ok(affected)
857    }
858
859    /// Executes a batch of statements separated by semicolons.
860    ///
861    /// # Errors
862    ///
863    /// Same error modes as [`Self::query`] — connection-mutex poisoning,
864    /// server-side SQL errors, and wire-protocol I/O failures.
865    pub fn batch_execute(&self, query: &str) -> Result<()> {
866        let mut conn = self.lock_connection()?;
867        let messages = conn.simple_query(query)?;
868        drop(conn); // Release lock before processing notices
869
870        self.process_notices(&messages);
871        Ok(())
872    }
873
874    /// Prepares a statement for execution with the \[`params!`\] macro.
875    ///
876    /// Returns an [`prepare::OwnedPreparedStatement`] that automatically closes when dropped.
877    ///
878    /// # Example
879    ///
880    /// ```no_run
881    /// # use hyperdb_api_core::{params, client::{Client, Config}};
882    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
883    /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1")?;
884    /// let rows = client.execute(&stmt, params![42_i32])?;
885    /// # Ok(())
886    /// # }
887    /// ```
888    ///
889    /// # Errors
890    ///
891    /// Propagates any [`Error`] from [`prepare::prepare_owned`] —
892    /// connection-mutex poisoning, server-side `Parse` failures (SQL
893    /// syntax, type resolution), and wire-protocol I/O failures.
894    pub fn prepare(&self, query: &str) -> Result<prepare::OwnedPreparedStatement> {
895        prepare::prepare_owned(&self.connection, query, &[])
896    }
897
898    /// Prepares a statement with explicit parameter types.
899    ///
900    /// # Errors
901    ///
902    /// Same failure modes as [`Self::prepare`].
903    pub fn prepare_typed(
904        &self,
905        query: &str,
906        param_types: &[Oid],
907    ) -> Result<prepare::OwnedPreparedStatement> {
908        prepare::prepare_owned(&self.connection, query, param_types)
909    }
910
911    /// Executes a prepared statement with parameters.
912    ///
913    /// Use the \[`params!`\] macro for ergonomic parameter encoding:
914    ///
915    /// ```no_run
916    /// # use hyperdb_api_core::{params, client::{Client, Config}};
917    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
918    /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1 AND name = $2")?;
919    /// let rows = client.execute(&stmt, params![42_i32, "Alice"])?;
920    /// # Ok(())
921    /// # }
922    /// ```
923    ///
924    /// # Errors
925    ///
926    /// Propagates any [`Error`] from [`prepare::execute_prepared`] —
927    /// parameter-count or type mismatch, server-side SQL errors, and
928    /// wire-protocol I/O failures.
929    pub fn execute<P: AsRef<[Option<Vec<u8>>]>>(
930        &self,
931        statement: &prepare::OwnedPreparedStatement,
932        params: P,
933    ) -> Result<Vec<Row>> {
934        let params_ref: Vec<Option<&[u8]>> = params
935            .as_ref()
936            .iter()
937            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
938            .collect();
939        prepare::execute_prepared(&self.connection, statement.statement(), &params_ref)
940    }
941
942    /// Executes a prepared statement that doesn't return rows.
943    ///
944    /// # Errors
945    ///
946    /// Same failure modes as [`Self::execute`].
947    pub fn execute_no_result<P: AsRef<[Option<Vec<u8>>]>>(
948        &self,
949        statement: &prepare::OwnedPreparedStatement,
950        params: P,
951    ) -> Result<u64> {
952        let params_ref: Vec<Option<&[u8]>> = params
953            .as_ref()
954            .iter()
955            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
956            .collect();
957        prepare::execute_prepared_no_result(&self.connection, statement.statement(), &params_ref)
958    }
959
960    /// Executes a prepared statement with streaming results.
961    ///
962    /// Returns a [`PreparedQueryStream`](super::prepared_stream::PreparedQueryStream)
963    /// that yields rows in chunks, keeping memory bounded regardless of
964    /// result size. This is the prepared-statement analog of
965    /// [`query_streaming`](Self::query_streaming).
966    ///
967    /// The connection mutex is held for the duration of iteration;
968    /// dropping the stream before completion issues a best-effort cancel.
969    ///
970    /// # Errors
971    ///
972    /// - Returns [`Error`] (connection) if the connection mutex is
973    ///   poisoned.
974    /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
975    ///   `Bind`/`Execute` sequence for the prepared statement fails on
976    ///   the server or on the wire.
977    pub fn execute_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
978        &'a self,
979        statement: &prepare::OwnedPreparedStatement,
980        params: P,
981        chunk_size: usize,
982    ) -> Result<super::prepared_stream::PreparedQueryStream<'a>> {
983        let params_ref: Vec<Option<&[u8]>> = params
984            .as_ref()
985            .iter()
986            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
987            .collect();
988
989        let mut conn = self.lock_connection()?;
990        conn.start_execute_prepared(statement.name(), &params_ref, statement.columns().len())?;
991
992        let columns = std::sync::Arc::new(statement.columns().to_vec());
993        Ok(super::prepared_stream::PreparedQueryStream::new(
994            conn, self, chunk_size, columns,
995        ))
996    }
997
998    /// Closes the connection.
999    ///
1000    /// # Errors
1001    ///
1002    /// - Returns [`Error`] (connection) if the connection mutex is
1003    ///   poisoned.
1004    /// - Returns [`Error`] (I/O) if writing the `Terminate` message or
1005    ///   flushing the socket fails.
1006    pub fn close(self) -> Result<()> {
1007        let mut conn = self.lock_connection()?;
1008        conn.terminate()
1009    }
1010
1011    /// Starts a COPY IN operation for bulk data insertion.
1012    ///
1013    /// Returns a `CopyInWriter` that can be used to send data in `HyperBinary` format.
1014    ///
1015    /// # Example
1016    ///
1017    /// ```no_run
1018    /// # use hyperdb_api_core::client::Client;
1019    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1020    /// # let binary_data = &[];
1021    /// let mut writer = client.copy_in("\"my_table\"", &["col1", "col2"])?;
1022    /// writer.send(binary_data)?;
1023    /// let rows = writer.finish()?;
1024    /// # Ok(())
1025    /// # }
1026    /// ```
1027    ///
1028    /// # Errors
1029    ///
1030    /// Delegates to [`Self::copy_in_with_format`]; see that method
1031    /// for the concrete failure modes.
1032    pub fn copy_in(&self, table_name: &str, columns: &[&str]) -> Result<CopyInWriter<'_>> {
1033        self.copy_in_with_format(table_name, columns, "HYPERBINARY")
1034    }
1035
1036    /// Starts a COPY IN operation with a specified data format.
1037    ///
1038    /// Returns a `CopyInWriter` that can be used to send data in the specified format.
1039    ///
1040    /// # Arguments
1041    ///
1042    /// * `table_name` - The target table name (should be properly quoted if needed)
1043    /// * `columns` - Column names to insert into
1044    /// * `format` - The data format string: "HYPERBINARY" or "ARROWSTREAM"
1045    ///
1046    /// # Example
1047    ///
1048    /// ```no_run
1049    /// # use hyperdb_api_core::client::Client;
1050    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1051    /// # let arrow_ipc_data = &[];
1052    /// // For Arrow IPC stream format
1053    /// let mut writer = client.copy_in_with_format("\"my_table\"", &["col1", "col2"], "ARROWSTREAM")?;
1054    /// writer.send(arrow_ipc_data)?;
1055    /// let rows = writer.finish()?;
1056    /// # Ok(())
1057    /// # }
1058    /// ```
1059    ///
1060    /// # Errors
1061    ///
1062    /// - Returns [`Error`] (connection) if the connection mutex is
1063    ///   poisoned.
1064    /// - Returns [`Error`] (server) if the server rejects the generated
1065    ///   `COPY ... FROM STDIN` statement (for example, missing table
1066    ///   or mismatched columns).
1067    /// - Returns [`Error`] (I/O) on wire-protocol I/O failure while
1068    ///   initiating the COPY.
1069    pub fn copy_in_with_format(
1070        &self,
1071        table_name: &str,
1072        columns: &[&str],
1073        format: &str,
1074    ) -> Result<CopyInWriter<'_>> {
1075        let mut conn = self.lock_connection()?;
1076        conn.start_copy_in_with_format(table_name, columns, format)?;
1077        Ok(CopyInWriter { connection: conn })
1078    }
1079
1080    /// Starts a COPY IN operation from a raw SQL query string.
1081    ///
1082    /// The query must be a complete `COPY ... FROM STDIN ...` statement.
1083    /// This is useful for text-format imports (CSV, TSV) where you need
1084    /// full control over the COPY options.
1085    ///
1086    /// # Security
1087    ///
1088    /// The query is validated to start with `COPY` (case-insensitive) as a
1089    /// defense-in-depth measure. Callers are still responsible for proper
1090    /// escaping of table names and other identifiers within the query.
1091    /// Prefer [`copy_in()`](Self::copy_in) or
1092    /// [`copy_in_with_format()`](Self::copy_in_with_format) when possible,
1093    /// as they handle escaping automatically.
1094    ///
1095    /// # Example
1096    ///
1097    /// ```no_run
1098    /// # use hyperdb_api_core::client::Client;
1099    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1100    /// let mut writer = client.copy_in_raw(
1101    ///     "COPY \"my_table\" FROM STDIN WITH (FORMAT csv, HEADER true)"
1102    /// )?;
1103    /// writer.send(b"1,Alice\n2,Bob\n")?;
1104    /// let rows = writer.finish()?;
1105    /// # Ok(())
1106    /// # }
1107    /// ```
1108    ///
1109    /// # Errors
1110    ///
1111    /// - Returns [`ErrorKind::Query`] if `query` (trimmed) does not
1112    ///   start with `COPY` (defense-in-depth check against non-COPY
1113    ///   statements).
1114    /// - Returns [`Error`] (connection) if the connection mutex is
1115    ///   poisoned.
1116    /// - Returns [`Error`] (server) or [`Error`] (I/O) if the server rejects
1117    ///   the COPY statement or the wire write fails.
1118    pub fn copy_in_raw(&self, query: &str) -> Result<CopyInWriter<'_>> {
1119        // Defense-in-depth: reject queries that don't look like COPY statements
1120        if !query.trim_start().to_ascii_uppercase().starts_with("COPY") {
1121            return Err(Error::new(
1122                ErrorKind::Query,
1123                "copy_in_raw() requires a COPY statement. \
1124                 The query must start with 'COPY'.",
1125            ));
1126        }
1127        let mut conn = self.lock_connection()?;
1128        conn.start_copy_in_raw(query)?;
1129        Ok(CopyInWriter { connection: conn })
1130    }
1131
1132    /// Returns true if the connection is alive (no error has occurred).
1133    #[must_use]
1134    pub fn is_alive(&self) -> bool {
1135        self.lock_connection().is_ok()
1136    }
1137
1138    /// Executes a COPY ... TO STDOUT query and returns all output data.
1139    ///
1140    /// This is used for queries like:
1141    /// `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
1142    ///
1143    /// # Arguments
1144    ///
1145    /// * `query` - The COPY TO STDOUT query to execute
1146    ///
1147    /// # Returns
1148    ///
1149    /// The raw bytes from all `CopyData` messages concatenated together.
1150    ///
1151    /// # Example
1152    ///
1153    /// ```no_run
1154    /// # use hyperdb_api_core::client::Client;
1155    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1156    /// let arrow_data = client.copy_out(
1157    ///     "COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
1158    /// )?;
1159    /// # Ok(())
1160    /// # }
1161    /// ```
1162    ///
1163    /// # Errors
1164    ///
1165    /// - Returns [`Error`] (connection) if the connection mutex is
1166    ///   poisoned.
1167    /// - Returns [`Error`] (server) if the server rejects the `COPY ... TO
1168    ///   STDOUT` statement.
1169    /// - Returns [`Error`] (I/O) if the wire read fails while collecting
1170    ///   COPY output.
1171    pub fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
1172        let mut conn = self.lock_connection()?;
1173        conn.copy_out(query)
1174    }
1175
1176    /// Executes a COPY ... TO STDOUT query and streams output to a writer.
1177    ///
1178    /// Unlike [`copy_out`](Self::copy_out) which collects all data into memory,
1179    /// this method streams each `CopyData` chunk directly to the provided writer,
1180    /// keeping memory usage constant regardless of result size.
1181    ///
1182    /// Returns the total number of bytes written.
1183    ///
1184    /// # Example
1185    ///
1186    /// ```no_run
1187    /// # use hyperdb_api_core::client::Client;
1188    /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1189    /// let mut file = std::fs::File::create("output.csv")?;
1190    /// let bytes_written = client.copy_out_to_writer(
1191    ///     "COPY (SELECT * FROM my_table) TO STDOUT WITH (FORMAT csv, HEADER true)",
1192    ///     &mut file,
1193    /// )?;
1194    /// println!("Wrote {} bytes", bytes_written);
1195    /// # Ok(())
1196    /// # }
1197    /// ```
1198    ///
1199    /// # Errors
1200    ///
1201    /// Same failure modes as [`Self::copy_out`], plus [`Error`] (I/O)
1202    /// when the supplied `writer` returns an error while receiving
1203    /// COPY chunks.
1204    pub fn copy_out_to_writer(&self, query: &str, writer: &mut dyn std::io::Write) -> Result<u64> {
1205        let mut conn = self.lock_connection()?;
1206        conn.copy_out_to_writer(query, writer)
1207    }
1208}
1209
1210impl Cancellable for Client {
1211    /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
1212    /// fresh connection. Swallows errors (logged via `tracing::warn!`)
1213    /// because cancellation is a best-effort signal and callers cannot
1214    /// meaningfully recover from a failed cancel.
1215    fn cancel(&self) {
1216        if let Err(e) = Client::cancel(self) {
1217            warn!(
1218                target: "hyperdb_api_core::client",
1219                error = %e,
1220                process_id = self.process_id,
1221                "cancel request failed (best-effort, swallowed)",
1222            );
1223        }
1224    }
1225}
1226
1227/// A writer for COPY IN operations.
1228///
1229/// This struct holds the connection lock while sending data to ensure
1230/// exclusive access during the COPY operation.
1231#[derive(Debug)]
1232pub struct CopyInWriter<'a> {
1233    connection: MutexGuard<'a, RawConnection<SyncStream>>,
1234}
1235
1236impl CopyInWriter<'_> {
1237    /// Sends a chunk of COPY data.
1238    ///
1239    /// The data should be in `HyperBinary` format. For best performance,
1240    /// batch multiple rows into larger chunks (e.g., 1-16 MB).
1241    ///
1242    /// # Errors
1243    ///
1244    /// Returns [`Error`] (I/O) if the wire write of the `CopyData` frame
1245    /// fails.
1246    pub fn send(&mut self, data: &[u8]) -> Result<()> {
1247        self.connection.send_copy_data(data)
1248    }
1249
1250    /// Flushes any buffered data to the server.
1251    ///
1252    /// # Errors
1253    ///
1254    /// Returns [`Error`] (I/O) if flushing the internal write buffer to
1255    /// the transport fails.
1256    pub fn flush(&mut self) -> Result<()> {
1257        self.connection.flush()
1258    }
1259
1260    /// Sends COPY data directly to the stream without internal buffering.
1261    ///
1262    /// This writes data directly to the TCP stream, letting the kernel handle
1263    /// buffering. More efficient for streaming large amounts of data.
1264    /// Call `flush_stream()` periodically to ensure data is sent.
1265    ///
1266    /// # Errors
1267    ///
1268    /// Returns [`Error`] (I/O) if writing to the underlying stream fails.
1269    pub fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1270        self.connection.send_copy_data_direct(data)
1271    }
1272
1273    /// Flushes the TCP stream.
1274    ///
1275    /// Use with `send_direct()` to periodically ensure data reaches the server.
1276    ///
1277    /// # Errors
1278    ///
1279    /// Returns [`Error`] (I/O) if flushing the underlying transport
1280    /// fails.
1281    pub fn flush_stream(&mut self) -> Result<()> {
1282        self.connection.flush_stream()
1283    }
1284
1285    /// Reserves capacity in the write buffer to avoid reallocations.
1286    ///
1287    /// Call this before bulk operations to pre-allocate buffer space.
1288    pub fn reserve_buffer(&mut self, capacity: usize) {
1289        self.connection.reserve_write_buffer(capacity);
1290    }
1291
1292    /// Finishes the COPY operation successfully.
1293    ///
1294    /// Returns the number of rows inserted.
1295    ///
1296    /// # Errors
1297    ///
1298    /// - Returns [`Error`] (I/O) if writing `CopyDone` or flushing the
1299    ///   transport fails.
1300    /// - Returns [`Error`] (server) if the server reports a COPY-side
1301    ///   failure (constraint violation, type coercion error, etc.)
1302    ///   via an `ErrorResponse` after `CopyDone`.
1303    pub fn finish(mut self) -> Result<u64> {
1304        self.connection.finish_copy()
1305    }
1306
1307    /// Cancels the COPY operation.
1308    ///
1309    /// All data sent so far will be discarded.
1310    ///
1311    /// # Errors
1312    ///
1313    /// Returns [`Error`] (I/O) if writing the `CopyFail` frame or
1314    /// flushing the transport fails, or [`Error`] (server) if the server
1315    /// reports an unexpected status after the cancel.
1316    pub fn cancel(mut self, reason: &str) -> Result<()> {
1317        self.connection.cancel_copy(reason)
1318    }
1319}
1320
1321/// Parses affected row count from a command tag.
1322fn parse_affected_rows(tag: &str) -> Option<u64> {
1323    let parts: Vec<&str> = tag.split_whitespace().collect();
1324
1325    match parts.first()? {
1326        &"INSERT" => {
1327            // INSERT oid count
1328            parts.get(2)?.parse().ok()
1329        }
1330        &"UPDATE" | &"DELETE" | &"SELECT" | &"COPY" => {
1331            // UPDATE/DELETE/SELECT/COPY count
1332            parts.get(1)?.parse().ok()
1333        }
1334        _ => None,
1335    }
1336}
1337
1338/// Streaming iterator for query results without materializing all rows.
1339///
1340/// Holding a `QueryStream` keeps the underlying [`RawConnection`] locked
1341/// via a `MutexGuard`. Dropping the stream before fully iterating triggers
1342/// a server-side cancel (see [`Drop`] below) so the connection is returned
1343/// to the pool cleanly.
1344pub struct QueryStream<'a> {
1345    conn: Option<MutexGuard<'a, RawConnection<SyncStream>>>,
1346    /// Best-effort cancel handle, used in [`Drop`] when the stream is
1347    /// abandoned before completion. For the current sync client this is
1348    /// the owning [`Client`] itself (which implements [`Cancellable`] via
1349    /// a PG wire `CancelRequest` on a fresh connection). When a gRPC
1350    /// equivalent lands it will plug in the same trait with a
1351    /// `cancel_query(query_id)` RPC implementation.
1352    canceller: &'a dyn Cancellable,
1353    finished: bool,
1354    chunk_size: usize,
1355    schema: Option<Vec<super::statement::Column>>,
1356    schema_read: bool,
1357}
1358
1359impl std::fmt::Debug for QueryStream<'_> {
1360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1361        f.debug_struct("QueryStream")
1362            .field("finished", &self.finished)
1363            .field("chunk_size", &self.chunk_size)
1364            .field("schema_read", &self.schema_read)
1365            .finish_non_exhaustive()
1366    }
1367}
1368
1369impl Drop for QueryStream<'_> {
1370    fn drop(&mut self) {
1371        // If the caller exhausted the stream, the connection is already at
1372        // `ReadyForQuery` — nothing to do.
1373        if self.finished {
1374            return;
1375        }
1376
1377        // Otherwise: the server is still happily streaming rows (potentially
1378        // millions of them) for a query we no longer care about. Passively
1379        // draining would waste bandwidth and could block the destructor for
1380        // a very long time. Instead we send a transport-appropriate cancel
1381        // signal — on PG wire this is a `CancelRequest` packet on a fresh
1382        // connection; on gRPC (future) it will be a `cancel_query` RPC on
1383        // the shared channel. `Cancellable::cancel` is fire-and-forget and
1384        // cannot fail.
1385        self.canceller.cancel();
1386
1387        // After cancel, the server stops producing new rows and emits
1388        // `ErrorResponse(QueryCanceled) + ReadyForQuery` promptly. We still
1389        // need to drain those trailing messages off the wire so the
1390        // connection returns to the pool cleanly. The budget here is small
1391        // because we only expect: (a) whatever rows the server had already
1392        // flushed before seeing the cancel, (b) `ErrorResponse`,
1393        // (c) `ReadyForQuery`. A well-behaved server reaches `ReadyForQuery`
1394        // within a handful of messages. If that budget is somehow exceeded
1395        // the bounded drain logs a warning and marks the connection
1396        // desynchronized — downstream users will surface the desync as a
1397        // transport-level failure and reconnect.
1398        const POST_CANCEL_DRAIN_CAP: usize = 1024;
1399        if let Some(ref mut conn) = self.conn {
1400            let _ok = conn.drain_until_ready_bounded(POST_CANCEL_DRAIN_CAP);
1401        }
1402    }
1403}
1404
1405impl QueryStream<'_> {
1406    /// Returns the schema (column metadata) for the result set.
1407    #[must_use]
1408    pub fn schema(&self) -> Option<&[super::statement::Column]> {
1409        self.schema.as_deref()
1410    }
1411
1412    /// Retrieves the next chunk of rows (up to `chunk_size`).
1413    ///
1414    /// # Errors
1415    ///
1416    /// - Returns [`Error`] (I/O) if reading from the underlying transport
1417    ///   fails while awaiting the next protocol message.
1418    /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
1419    ///   during streaming (for example, a server-side execution failure
1420    ///   encountered partway through the result set).
1421    pub fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
1422        if self.finished {
1423            return Ok(None);
1424        }
1425
1426        let Some(conn) = self.conn.as_mut() else {
1427            return Ok(None);
1428        };
1429
1430        let mut rows = Vec::with_capacity(self.chunk_size);
1431        while rows.len() < self.chunk_size {
1432            let msg = conn.read_message()?;
1433            match msg {
1434                Message::RowDescription(desc) if !self.schema_read => {
1435                    let mut cols = Vec::new();
1436                    for f in desc.fields().filter_map(std::result::Result::ok) {
1437                        cols.push(super::statement::Column::new(
1438                            f.name().to_string(),
1439                            f.type_oid(),
1440                            f.type_modifier(),
1441                            super::statement::ColumnFormat::from_code(f.format()),
1442                        ));
1443                    }
1444                    self.schema = Some(cols);
1445                    self.schema_read = true;
1446                }
1447                Message::DataRow(data) => {
1448                    rows.push(StreamRow::new(data));
1449                    if rows.len() >= self.chunk_size {
1450                        return Ok(Some(rows));
1451                    }
1452                }
1453                Message::ReadyForQuery(_) => {
1454                    self.finished = true;
1455                    self.conn = None;
1456                    return if rows.is_empty() {
1457                        Ok(None)
1458                    } else {
1459                        Ok(Some(rows))
1460                    };
1461                }
1462                Message::ErrorResponse(body) => {
1463                    // Mark the stream finished *before* touching the
1464                    // connection so the `Drop` impl's Cancellable-based
1465                    // cleanup path is trivially a no-op regardless of what
1466                    // happens next (normal return, `?`, panic in drain,
1467                    // future refactors that insert early returns, etc).
1468                    //
1469                    // `&mut self` exclusivity means `Drop` can't fire
1470                    // concurrently with this method, so this is purely a
1471                    // defensive-ordering improvement — but it also matches
1472                    // the `ReadyForQuery` arm above, which sets
1473                    // `finished = true` first for the same reason. Keeping
1474                    // both terminal arms in the same order makes the
1475                    // "terminal state is committed before cleanup" rule
1476                    // visible at a glance.
1477                    self.finished = true;
1478                    // Drain through `ReadyForQuery` before releasing the
1479                    // pooled connection so the next user sees a clean wire
1480                    // state. `consume_error` swallows any drain I/O errors
1481                    // via tracing::warn — the caller's original error is
1482                    // more informative than a transport hiccup during
1483                    // cleanup.
1484                    let err = match self.conn {
1485                        Some(ref mut c) => c.consume_error(&body),
1486                        None => parse_error_response(&body),
1487                    };
1488                    self.conn = None;
1489                    return Err(err);
1490                }
1491                _ => {}
1492            }
1493        }
1494        Ok(Some(rows))
1495    }
1496}
1497
1498#[cfg(test)]
1499mod tests {
1500    use super::*;
1501
1502    #[test]
1503    fn test_parse_affected_rows() {
1504        assert_eq!(parse_affected_rows("INSERT 0 5"), Some(5));
1505        assert_eq!(parse_affected_rows("UPDATE 10"), Some(10));
1506        assert_eq!(parse_affected_rows("DELETE 3"), Some(3));
1507        assert_eq!(parse_affected_rows("SELECT 100"), Some(100));
1508        assert_eq!(parse_affected_rows("CREATE TABLE"), None);
1509    }
1510
1511    #[test]
1512    fn test_copy_in_raw_rejects_non_copy_query() {
1513        // We can't test with a real connection, but we can verify the prefix guard
1514        // works by checking the error message. Since Client::connect fails without
1515        // a server, we test the validation logic directly.
1516        let query = "SELECT * FROM users";
1517        assert!(
1518            !query.trim_start().to_ascii_uppercase().starts_with("COPY"),
1519            "Non-COPY query should not pass the COPY prefix check"
1520        );
1521
1522        let copy_query = "COPY \"users\" FROM STDIN WITH (FORMAT csv)";
1523        assert!(
1524            copy_query
1525                .trim_start()
1526                .to_ascii_uppercase()
1527                .starts_with("COPY"),
1528            "COPY query should pass the prefix check"
1529        );
1530
1531        // Leading whitespace should be accepted
1532        let padded = "  COPY \"users\" FROM STDIN";
1533        assert!(padded.trim_start().to_ascii_uppercase().starts_with("COPY"));
1534
1535        // Case-insensitive
1536        let lowercase = "copy \"users\" FROM STDIN";
1537        assert!(lowercase
1538            .trim_start()
1539            .to_ascii_uppercase()
1540            .starts_with("COPY"));
1541    }
1542}