hyperdb_api_core/client/client.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level synchronous client for Hyper database.
5//!
6//! This module provides [`Client`], the primary synchronous interface for
7//! communicating with a Hyper server. It supports three query execution
8//! modes, each using a different level of the `PostgreSQL` wire protocol:
9//!
10//! - **Simple Query** ([`Client::query`]) — Sends a single `Query` message
11//! and collects all `DataRow` messages into memory. Results are returned
12//! in text format. Best for small result sets or DDL/DML commands.
13//!
14//! - **Extended Query / `HyperBinary`** ([`Client::query_fast`]) — Uses the
15//! Extended Query protocol (`Parse` / `Bind` / `Execute`) with
16//! [`ColumnFormat::HyperBinary`](super::statement::ColumnFormat::HyperBinary)
17//! (format code 2) for `LittleEndian` binary results. Returns
18//! [`StreamRow`]s that compute field offsets
19//! on-demand, avoiding per-row allocation.
20//!
21//! - **Streaming** ([`Client::query_streaming`]) — Like `query_fast` but
22//! returns a [`QueryStream`] that yields rows in chunks, keeping memory
23//! usage constant regardless of result set size. The stream holds the
24//! connection lock for its lifetime; dropping it triggers a cancel request
25//! to stop the server from streaming the rest.
26//!
27//! # Bulk Insertion (COPY Protocol)
28//!
29//! [`Client::copy_in`] starts a `COPY ... FROM STDIN WITH (FORMAT HYPERBINARY)`
30//! session and returns a [`CopyInWriter`] for streaming binary data. The
31//! caller is responsible for encoding rows in the correct format (typically
32//! done by the higher-level [`hyperdb_api::Inserter`](https://docs.rs/hyperdb-api)).
33//! See also [`copy_in_with_format`](Client::copy_in_with_format) for
34//! alternative formats (CSV, Arrow IPC) and [`copy_in_raw`](Client::copy_in_raw)
35//! for fully custom COPY statements.
36
37use std::net::TcpStream;
38use std::sync::{Arc, Mutex, MutexGuard};
39
40use tracing::{debug, info, trace, warn};
41
42#[cfg(unix)]
43use std::os::unix::net::UnixStream;
44
45use super::cancel::Cancellable;
46use super::config::Config;
47use super::connection::{parse_error_response, RawConnection};
48use super::endpoint::ConnectionEndpoint;
49use super::error::{Error, ErrorKind, Result};
50use super::prepare;
51use super::row::{Row, StreamRow};
52use super::sync_stream::SyncStream;
53
54use crate::protocol::message::Message;
55use crate::types::Oid;
56
57use super::notice::{Notice, NoticeReceiver};
58
59/// A synchronous client for Hyper database.
60///
61/// The client handles connection management and query execution.
62/// It is thread-safe and can be shared between threads using `Arc`.
63///
64/// # Thread Safety
65///
66/// The `Client` is thread-safe and can be shared between threads using `Arc<Client>`.
67/// All methods use internal mutexes to synchronize access to the underlying connection.
68///
69/// # Example
70///
71/// ```no_run
72/// use hyperdb_api_core::client::{Client, Config};
73///
74/// # fn example() -> hyperdb_api_core::client::Result<()> {
75/// let config = Config::new()
76/// .with_host("localhost")
77/// .with_port(7483)
78/// .with_database("test.hyper");
79///
80/// let client = Client::connect(&config)?;
81/// let rows = client.query("SELECT 1")?;
82/// client.close()?;
83/// # Ok(())
84/// # }
85/// ```
86pub struct Client {
87 /// The underlying connection, protected by a mutex for thread safety.
88 connection: Arc<Mutex<RawConnection<SyncStream>>>,
89 /// Backend process ID (for cancel requests).
90 process_id: i32,
91 /// Secret key for authenticating cancel requests.
92 secret_key: i32,
93 /// Connection endpoint for cancel requests and reconnection.
94 endpoint: ConnectionEndpoint,
95 /// Optional notice receiver callback for server notices/warnings.
96 notice_receiver: Option<Arc<NoticeReceiver>>,
97}
98
99// Manual Debug implementation because NoticeReceiver doesn't implement Debug
100impl std::fmt::Debug for Client {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("Client")
103 .field("process_id", &self.process_id)
104 .field("secret_key", &self.secret_key)
105 .field("endpoint", &self.endpoint)
106 .field(
107 "notice_receiver",
108 &self.notice_receiver.as_ref().map(|_| "<callback>"),
109 )
110 .finish_non_exhaustive()
111 }
112}
113
114impl Client {
115 /// Connects to a Hyper server using the given configuration.
116 ///
117 /// Establishes a TCP connection, performs authentication, and initializes
118 /// the client. Returns an error if the connection fails or authentication
119 /// is rejected.
120 ///
121 /// # Arguments
122 ///
123 /// * `config` - Connection configuration (host, port, credentials, etc.)
124 ///
125 /// # Errors
126 ///
127 /// Returns `Error` if:
128 /// - Connection to the server fails
129 /// - Authentication fails
130 /// - Protocol handshake fails
131 ///
132 /// # Example
133 ///
134 /// ```no_run
135 /// # use hyperdb_api_core::client::{Client, Config};
136 /// # fn example() -> hyperdb_api_core::client::Result<()> {
137 /// let config = Config::new()
138 /// .with_host("localhost")
139 /// .with_port(7483)
140 /// .with_user("myuser")
141 /// .with_password("mypass")
142 /// .with_database("test.hyper");
143 ///
144 /// let client = Client::connect(&config)?;
145 /// # Ok(())
146 /// # }
147 /// ```
148 pub fn connect(config: &Config) -> Result<Self> {
149 // Log connection parameters (password is intentionally omitted for security)
150 info!(
151 target: "hyperdb_api",
152 host = %config.host(),
153 port = config.port(),
154 user = config.user().unwrap_or("(default)"),
155 database = config.database().unwrap_or("(none)"),
156 "connection-parameters"
157 );
158
159 let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
160 let addr = format!("{}:{}", config.host(), config.port());
161 let tcp_stream = TcpStream::connect(&addr).map_err(|e| {
162 warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
163 Error::connection(format!("failed to connect to {addr}: {e}"))
164 })?;
165
166 // Set TCP options for better performance.
167 //
168 // `TCP_NODELAY` disables Nagle so request bytes flush immediately —
169 // needed for low-latency request/response shapes.
170 //
171 // `SO_RCVBUF` / `SO_SNDBUF` are bumped to 4 MiB. The Windows default
172 // TCP buffers are ~64 KiB, which throttles loopback throughput
173 // because hyperd blocks on `send()` once the kernel buffer fills up.
174 // Linux auto-tunes much higher, so this primarily helps Windows
175 // (and is a marginal win on macOS).
176 //
177 // Empirical knee on Windows i9-10980XE / TCP loopback (100M-row
178 // sync full-scan, single connection):
179 //
180 // | size | rows/sec |
181 // |------:|---------:|
182 // | 64 K | 2.89 | (default)
183 // | 1 M | 5.95 |
184 // | 4 M | 6.90 | <-- knee
185 // | 8 M | 6.68 | (insert workloads regress 18%)
186 //
187 // 4 MiB hits the throughput plateau without the memory-pressure
188 // regression seen at 8 MiB. We use `.ok()` because the kernel may
189 // clamp to a lower value or refuse the request entirely; either
190 // way the connection still works at the default size.
191 tcp_stream.set_nodelay(true).ok();
192 let sock = socket2::SockRef::from(&tcp_stream);
193 sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
194 sock.set_send_buffer_size(4 * 1024 * 1024).ok();
195
196 let stream = SyncStream::tcp(tcp_stream);
197 let mut connection = RawConnection::new(stream);
198
199 // Perform startup with authentication
200 let params = config.startup_params();
201 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
202 connection.startup(¶ms_ref, config.password())?;
203
204 let process_id = connection.process_id();
205 let secret_key = connection.secret_key();
206
207 debug!(
208 target: "hyperdb_api",
209 process_id,
210 "connection-established"
211 );
212
213 Ok(Client {
214 connection: Arc::new(Mutex::new(connection)),
215 process_id,
216 secret_key,
217 endpoint,
218 notice_receiver: None,
219 })
220 }
221
222 /// Connects to a Hyper server via Unix Domain Socket (Unix only).
223 ///
224 /// # Example
225 ///
226 /// ```no_run
227 /// # use hyperdb_api_core::client::{Client, Config};
228 /// # use std::path::Path;
229 /// # fn example() -> hyperdb_api_core::client::Result<()> {
230 /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
231 /// let config = Config::new().with_database("test.hyper");
232 /// let client = Client::connect_unix(socket_path, &config)?;
233 /// # Ok(())
234 /// # }
235 /// ```
236 ///
237 /// # Errors
238 ///
239 /// - Returns [`Error`] (connection) if the Unix domain socket cannot
240 /// be connected.
241 /// - Propagates any [`Error`] from the startup handshake
242 /// (authentication, protocol error, I/O error).
243 #[cfg(unix)]
244 pub fn connect_unix(socket_path: impl AsRef<std::path::Path>, config: &Config) -> Result<Self> {
245 use std::path::Path;
246
247 let path = socket_path.as_ref();
248 info!(
249 target: "hyperdb_api",
250 socket_path = %path.display(),
251 user = config.user().unwrap_or("(default)"),
252 database = config.database().unwrap_or("(none)"),
253 "connection-parameters-unix"
254 );
255
256 let unix_stream = UnixStream::connect(path).map_err(|e| {
257 warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
258 Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
259 })?;
260
261 // Parse endpoint from socket path
262 let directory = path.parent().unwrap_or(Path::new("/"));
263 let name = path
264 .file_name()
265 .and_then(|n| n.to_str())
266 .unwrap_or("socket");
267 let endpoint = ConnectionEndpoint::domain_socket(directory, name);
268
269 let stream = SyncStream::unix(unix_stream);
270 let mut connection = RawConnection::new(stream);
271
272 // Perform startup with authentication
273 let params = config.startup_params();
274 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
275 connection.startup(¶ms_ref, config.password())?;
276
277 let process_id = connection.process_id();
278 let secret_key = connection.secret_key();
279
280 debug!(
281 target: "hyperdb_api",
282 process_id,
283 "connection-established-unix"
284 );
285
286 Ok(Client {
287 connection: Arc::new(Mutex::new(connection)),
288 process_id,
289 secret_key,
290 endpoint,
291 notice_receiver: None,
292 })
293 }
294
295 /// Connects to a Hyper server via Windows Named Pipe (Windows only).
296 ///
297 /// # Arguments
298 ///
299 /// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
300 /// * `config` - Connection configuration
301 ///
302 /// # Errors
303 ///
304 /// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
305 /// exist, all instances are busy after the retry window, or permission is
306 /// denied) or if the authentication handshake fails.
307 #[cfg(windows)]
308 pub fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
309 use std::fs::OpenOptions;
310 use std::time::{Duration, Instant};
311
312 info!(
313 target: "hyperdb_api",
314 pipe_path = %pipe_path,
315 user = config.user().unwrap_or("(default)"),
316 database = config.database().unwrap_or("(none)"),
317 "connection-parameters-named-pipe"
318 );
319
320 // Windows named pipes have a finite number of server-side instances
321 // (`MaxInstances` on `CreateNamedPipe`). When all are busy, `CreateFile`
322 // returns `ERROR_PIPE_BUSY` (231). The expected client behavior is to
323 // wait briefly and retry — equivalent to `WaitNamedPipe` from Win32.
324 // We poll with a short sleep up to a reasonable deadline so concurrent
325 // clients don't spuriously fail when the pool is momentarily exhausted.
326 const RETRY_INTERVAL: Duration = Duration::from_millis(20);
327 const MAX_WAIT: Duration = Duration::from_secs(10);
328 const ERROR_PIPE_BUSY: i32 = 231;
329
330 let deadline = Instant::now() + MAX_WAIT;
331 let file = loop {
332 match OpenOptions::new().read(true).write(true).open(pipe_path) {
333 Ok(f) => break f,
334 Err(e)
335 if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
336 {
337 std::thread::sleep(RETRY_INTERVAL);
338 }
339 Err(e) => {
340 warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
341 return Err(Error::connection(format!(
342 "failed to connect to named pipe {pipe_path}: {e}"
343 )));
344 }
345 }
346 };
347
348 // Parse endpoint from pipe path
349 let endpoint = ConnectionEndpoint::parse(&format!(
350 "tab.pipe://{}",
351 pipe_path.trim_start_matches(r"\\").replace('\\', "/")
352 ))
353 .unwrap_or_else(|_| {
354 // Fallback: construct from pipe path directly
355 // Expected format: \\<host>\pipe\<name>
356 let parts: Vec<&str> = pipe_path
357 .trim_start_matches(r"\\")
358 .splitn(3, '\\')
359 .collect();
360 if parts.len() >= 3 {
361 ConnectionEndpoint::named_pipe(parts[0], parts[2])
362 } else {
363 ConnectionEndpoint::named_pipe(".", pipe_path)
364 }
365 });
366
367 let stream = SyncStream::named_pipe(file);
368 let mut connection = RawConnection::new(stream);
369
370 // Perform startup with authentication
371 let params = config.startup_params();
372 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
373 connection.startup(¶ms_ref, config.password())?;
374
375 let process_id = connection.process_id();
376 let secret_key = connection.secret_key();
377
378 debug!(
379 target: "hyperdb_api",
380 process_id,
381 "connection-established-named-pipe"
382 );
383
384 Ok(Client {
385 connection: Arc::new(Mutex::new(connection)),
386 process_id,
387 secret_key,
388 endpoint,
389 notice_receiver: None,
390 })
391 }
392
393 /// Connects to a Hyper server using a `ConnectionEndpoint`.
394 ///
395 /// This is a lower-level method that accepts a pre-parsed endpoint.
396 ///
397 /// # Errors
398 ///
399 /// Delegates to [`Client::connect`], [`Client::connect_unix`], or
400 /// `Client::connect_named_pipe` depending on the endpoint variant,
401 /// and propagates their errors unchanged.
402 pub fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
403 match endpoint {
404 ConnectionEndpoint::Tcp { host, port } => {
405 let mut cfg = config.clone();
406 cfg = cfg.with_host(host.clone()).with_port(*port);
407 Self::connect(&cfg)
408 }
409 #[cfg(unix)]
410 ConnectionEndpoint::DomainSocket { directory, name } => {
411 let socket_path = directory.join(name);
412 Self::connect_unix(&socket_path, config)
413 }
414 #[cfg(windows)]
415 ConnectionEndpoint::NamedPipe { host, name } => {
416 let pipe_path = format!(r"\\{host}\pipe\{name}");
417 Self::connect_named_pipe(&pipe_path, config)
418 }
419 }
420 }
421
422 /// Returns the connection endpoint.
423 #[must_use]
424 pub fn endpoint(&self) -> &ConnectionEndpoint {
425 &self.endpoint
426 }
427
428 /// Returns the server process ID for this connection.
429 #[must_use]
430 pub fn process_id(&self) -> i32 {
431 self.process_id
432 }
433
434 /// Returns the secret key for cancel requests.
435 #[must_use]
436 pub fn secret_key(&self) -> i32 {
437 self.secret_key
438 }
439
440 /// Cancels the currently executing query on this connection.
441 ///
/// This method is **thread-safe** and can be called from any thread while
/// a query is running on another thread. It works by opening a separate
/// connection to the server over the same transport as [`Self::endpoint`]
/// (TCP, Unix domain socket, or named pipe) and sending a cancel request.
445 ///
446 /// # How It Works
447 ///
/// 1. Opens a new connection to the same server endpoint (TCP, Unix domain socket, or named pipe)
449 /// 2. Sends a cancel request containing the process ID and secret key
450 /// 3. The server receives this and cancels the running query
451 /// 4. The original query will fail with error code 57014 (`query_canceled`)
452 ///
453 /// # Thread Safety
454 ///
455 /// This method does NOT acquire the connection mutex, so it can be called
456 /// while another thread is blocked waiting for query results.
457 ///
458 /// # Relation to the [`Cancellable`] trait
459 ///
460 /// This is the **fallible user-facing cancel API**: it returns a
461 /// `Result<()>` so explicit callers can observe transport-level
462 /// failures (network errors, socket issues) and react accordingly —
463 /// e.g. record a metric, show "cancel failed" UX, or retry.
464 ///
465 /// For [`Drop`]-path and other internal cleanup contexts where error
466 /// propagation is impossible, the separate
467 /// [`impl Cancellable for Client`](super::cancel::Cancellable) wraps
468 /// this method and swallows errors (logged via `tracing::warn!`).
469 /// The two coexist by design — each serves a different consumer.
470 ///
471 /// # Example
472 ///
473 /// ```no_run
474 /// use std::thread;
475 /// use std::sync::Arc;
476 /// use std::time::Duration;
477 /// use hyperdb_api_core::client::{Client, Config};
478 ///
479 /// # fn example() -> hyperdb_api_core::client::Result<()> {
480 /// # let config = Config::new().with_host("localhost").with_port(7483);
481 /// let client = Arc::new(Client::connect(&config)?);
482 /// let client_clone = Arc::clone(&client);
483 ///
484 /// // Start a long query in another thread
485 /// let handle = thread::spawn(move || {
486 /// client_clone.query("SELECT pg_sleep(60)")
487 /// });
488 ///
489 /// // Cancel from the main thread
490 /// thread::sleep(Duration::from_millis(100));
491 /// client.cancel()?;
492 ///
493 /// // The query thread will get a cancellation error
494 /// let result = handle.join().unwrap();
495 /// assert!(result.is_err());
496 /// # Ok(())
497 /// # }
498 /// ```
499 ///
500 /// # Errors
501 ///
502 /// - Returns [`Error`] (connection) if the fresh cancel-side socket
503 /// cannot be opened (TCP / UDS / named-pipe, depending on
504 /// [`Self::endpoint`]).
505 /// - Returns [`Error`] (I/O) if writing or flushing the cancel
506 /// request fails.
507 pub fn cancel(&self) -> Result<()> {
508 use crate::protocol::message::frontend;
509 use bytes::BytesMut;
510 use std::io::Write;
511
512 info!(
513 target: "hyperdb_api",
514 process_id = self.process_id,
515 "query-cancel-request"
516 );
517
518 let endpoint_str = self.endpoint.to_string();
519
520 // Open a new connection specifically for the cancel request
521 match &self.endpoint {
522 ConnectionEndpoint::Tcp { host, port } => {
523 let addr = format!("{host}:{port}");
524 let mut stream = TcpStream::connect(&addr).map_err(|e| {
525 warn!(
526 target: "hyperdb_api",
527 addr = %endpoint_str,
528 error = %e,
529 "query-cancel-connect-failed"
530 );
531 Error::connection(format!(
532 "failed to connect for cancel request to {endpoint_str}: {e}"
533 ))
534 })?;
535 // Cancel is a 16-byte fire-and-forget — disable Nagle so the
536 // request hits the wire without waiting on a coalesce timer.
537 stream.set_nodelay(true).ok();
538
539 // Build and send the cancel request
540 let mut buf = BytesMut::with_capacity(16);
541 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
542
543 stream.write_all(&buf).map_err(|e| {
544 warn!(
545 target: "hyperdb_api",
546 error = %e,
547 "query-cancel-send-failed"
548 );
549 Error::io(e)
550 })?;
551
552 stream.flush().map_err(Error::io)?;
553 }
554 #[cfg(unix)]
555 ConnectionEndpoint::DomainSocket { directory, name } => {
556 let socket_path = directory.join(name);
557 let mut stream = UnixStream::connect(&socket_path).map_err(|e| {
558 warn!(
559 target: "hyperdb_api",
560 addr = %endpoint_str,
561 error = %e,
562 "query-cancel-connect-failed"
563 );
564 Error::connection(format!(
565 "failed to connect for cancel request to {endpoint_str}: {e}"
566 ))
567 })?;
568
569 // Build and send the cancel request
570 let mut buf = BytesMut::with_capacity(16);
571 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
572
573 stream.write_all(&buf).map_err(|e| {
574 warn!(
575 target: "hyperdb_api",
576 error = %e,
577 "query-cancel-send-failed"
578 );
579 Error::io(e)
580 })?;
581
582 stream.flush().map_err(Error::io)?;
583 }
584 #[cfg(windows)]
585 ConnectionEndpoint::NamedPipe { host, name } => {
586 let pipe_path = format!(r"\\{host}\pipe\{name}");
587 let mut file = std::fs::OpenOptions::new()
588 .read(true)
589 .write(true)
590 .open(&pipe_path)
591 .map_err(|e| {
592 warn!(
593 target: "hyperdb_api",
594 addr = %endpoint_str,
595 error = %e,
596 "query-cancel-connect-failed"
597 );
598 Error::connection(format!(
599 "failed to connect for cancel request to {endpoint_str}: {e}"
600 ))
601 })?;
602
603 // Build and send the cancel request
604 let mut buf = BytesMut::with_capacity(16);
605 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
606
607 file.write_all(&buf).map_err(|e| {
608 warn!(
609 target: "hyperdb_api",
610 error = %e,
611 "query-cancel-send-failed"
612 );
613 Error::io(e)
614 })?;
615
616 file.flush().map_err(Error::io)?;
617 }
618 }
619
620 debug!(
621 target: "hyperdb_api",
622 process_id = self.process_id,
623 "query-cancel-sent"
624 );
625
626 Ok(())
627 }
628
629 /// Returns a server parameter value by name.
630 ///
631 /// Server parameters are sent by the server during connection startup.
632 /// Common parameters include:
633 /// - `server_version` - The server version string
634 /// - `server_encoding` - The server's character encoding
635 /// - `client_encoding` - The client's character encoding
636 ///
637 /// # Example
638 ///
639 /// ```no_run
640 /// # use hyperdb_api_core::client::{Client, Config};
641 /// # fn example(client: &Client) {
642 /// if let Some(version) = client.parameter_status("server_version") {
643 /// println!("Connected to Hyper version: {}", version);
644 /// }
645 /// # }
646 /// ```
647 #[must_use]
648 pub fn parameter_status(&self, name: &str) -> Option<String> {
649 let conn = self.connection.lock().ok()?;
650 conn.parameter_status(name)
651 .map(std::string::ToString::to_string)
652 }
653
654 /// Sets the notice receiver for this connection.
655 ///
656 /// Notice and warning messages generated by the server are not returned by
657 /// query execution functions since they don't indicate failure. Instead,
658 /// they are passed to a notice handling function.
659 ///
660 /// The default behavior is to log notices at the `warn` level.
661 ///
662 /// # Arguments
663 ///
664 /// * `receiver` - The callback function that will be called with each notice.
665 /// Pass `None` to restore default logging behavior.
666 ///
667 /// # Example
668 ///
669 /// ```no_run
670 /// # use hyperdb_api_core::client::Client;
671 /// # use std::sync::{Arc, Mutex};
672 /// # fn example(client: &mut Client) {
673 /// client.set_notice_receiver(Some(Box::new(|notice| {
674 /// println!("Server notice: {}", notice);
675 /// })));
676 ///
677 /// // Or capture notices in a Vec
678 /// let notices = Arc::new(Mutex::new(Vec::new()));
679 /// let notices_clone = notices.clone();
680 /// client.set_notice_receiver(Some(Box::new(move |notice| {
681 /// notices_clone.lock().unwrap().push(notice);
682 /// })));
683 /// # }
684 /// ```
685 pub fn set_notice_receiver(&mut self, receiver: Option<NoticeReceiver>) {
686 self.notice_receiver = receiver.map(Arc::new);
687 }
688
689 /// Processes any notices in a list of messages, calling the notice receiver.
690 ///
691 /// This is called internally after receiving messages from the server.
692 pub(crate) fn process_notices(&self, messages: &[Message]) {
693 for msg in messages {
694 if let Message::NoticeResponse(body) = msg {
695 let notice = Notice::from_response_body(body);
696
697 if let Some(ref receiver) = self.notice_receiver {
698 receiver(notice);
699 } else {
700 // Default behavior: log at warn level
701 warn!(
702 target: "hyperdb_api",
703 severity = notice.severity().unwrap_or("NOTICE"),
704 code = notice.code().unwrap_or(""),
705 message = %notice.message(),
706 "server-notice"
707 );
708 }
709 }
710 }
711 }
712
713 /// Acquires a lock on the connection.
714 fn lock_connection(&self) -> Result<MutexGuard<'_, RawConnection<SyncStream>>> {
715 self.connection
716 .lock()
717 .map_err(|_| Error::connection("connection mutex poisoned"))
718 }
719
720 /// Executes a simple query and returns the rows.
721 ///
722 /// # Errors
723 ///
724 /// - Returns [`Error`] (connection) if the connection mutex is
725 /// poisoned.
726 /// - Returns [`Error`] (server) for any SQL error the server reports
727 /// (syntax error, constraint violation, type mismatch).
728 /// - Returns [`Error`] (I/O) on wire-protocol I/O failure.
729 /// - Propagates any [`Error`] from row construction (invalid row
730 /// description or data row bytes).
731 pub fn query(&self, query: &str) -> Result<Vec<Row>> {
732 let mut conn = self.lock_connection()?;
733 let messages = conn.simple_query(query)?;
734 drop(conn); // Release lock before processing notices
735
736 self.process_notices(&messages);
737
738 let mut rows = Vec::new();
739 let mut columns = None;
740
741 for msg in messages {
742 match msg {
743 crate::protocol::message::Message::RowDescription(desc) => {
744 // Extract column info including format from protocol
745 let mut cols = Vec::new();
746 for f in desc.fields().filter_map(|r| {
747 r.map_err(|e| trace!(target: "hyperdb_api_core::client", error = %e, "dropped error parsing row description field")).ok()
748 }) {
749 cols.push(super::statement::Column::new(
750 f.name().to_string(),
751 f.type_oid(),
752 f.type_modifier(),
753 super::statement::ColumnFormat::from_code(f.format()),
754 ));
755 }
756 columns = Some(Arc::new(cols));
757 }
758 crate::protocol::message::Message::DataRow(data) => {
759 if let Some(ref cols) = columns {
760 rows.push(Row::new(Arc::clone(cols), data)?);
761 }
762 }
763 _ => {}
764 }
765 }
766
767 Ok(rows)
768 }
769
770 /// Executes a query using `HyperBinary` format for maximum performance.
771 ///
772 /// Returns `StreamRow`s which compute offsets on-demand without pre-allocation,
773 /// making them faster for large result sets where each row is processed once.
774 /// Uses the extended query protocol with `HyperBinary` format (format code 2)
775 /// for direct binary access without text parsing overhead.
776 ///
777 /// # Errors
778 ///
779 /// Same as [`Self::query`]: connection-mutex poisoning, SQL errors
780 /// from the server, and wire-protocol I/O failures all surface as
781 /// [`Error`].
782 pub fn query_fast(&self, query: &str) -> Result<Vec<StreamRow>> {
783 let mut conn = self.lock_connection()?;
784 let messages = conn.query_binary(query)?;
785 drop(conn);
786
787 self.process_notices(&messages);
788
789 let mut rows = Vec::new();
790 for msg in messages {
791 if let crate::protocol::message::Message::DataRow(data) = msg {
792 rows.push(StreamRow::new(data));
793 }
794 }
795 Ok(rows)
796 }
797
798 /// Executes a query with streaming results for minimum memory usage.
799 ///
800 /// Combines `HyperBinary` format with incremental row fetching.
801 ///
802 /// # Errors
803 ///
804 /// - Returns [`Error`] (connection) if the connection mutex is
805 /// poisoned.
806 /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
807 /// `Parse`/`Bind`/`Execute` sequence for the streaming query
808 /// fails on the server or on the wire.
809 pub fn query_streaming<'a>(
810 &'a self,
811 query: &str,
812 chunk_size: usize,
813 ) -> Result<QueryStream<'a>> {
814 let mut conn = self.lock_connection()?;
815 conn.start_query_binary(query)?;
816 Ok(QueryStream {
817 conn: Some(conn),
818 // The owning client is the canceller: if the stream is dropped
819 // before being fully drained, its `Drop` impl will call
820 // `self.cancel()` (PG wire `CancelRequest` on a fresh
821 // connection) to stop the server from streaming the rest.
822 canceller: self,
823 finished: false,
824 chunk_size: chunk_size.max(1),
825 schema: None,
826 schema_read: false,
827 })
828 }
829
830 /// Executes a SQL command that doesn't return rows (e.g., INSERT, UPDATE).
831 ///
832 /// # Errors
833 ///
834 /// Same error modes as [`Self::query`] — connection-mutex poisoning,
835 /// server-side SQL errors, and wire-protocol I/O failures all
836 /// surface as [`Error`].
837 pub fn exec(&self, query: &str) -> Result<u64> {
838 let mut conn = self.lock_connection()?;
839 let messages = conn.simple_query(query)?;
840 drop(conn); // Release lock before processing notices
841
842 self.process_notices(&messages);
843
844 let mut affected = 0u64;
845 for msg in messages {
846 if let crate::protocol::message::Message::CommandComplete(body) = msg {
847 if let Ok(tag) = body.tag() {
848 // Parse affected row count from tag like "INSERT 0 1"
849 if let Some(count) = parse_affected_rows(tag) {
850 affected = count;
851 }
852 }
853 }
854 }
855
856 Ok(affected)
857 }
858
859 /// Executes a batch of statements separated by semicolons.
860 ///
861 /// # Errors
862 ///
863 /// Same error modes as [`Self::query`] — connection-mutex poisoning,
864 /// server-side SQL errors, and wire-protocol I/O failures.
865 pub fn batch_execute(&self, query: &str) -> Result<()> {
866 let mut conn = self.lock_connection()?;
867 let messages = conn.simple_query(query)?;
868 drop(conn); // Release lock before processing notices
869
870 self.process_notices(&messages);
871 Ok(())
872 }
873
874 /// Prepares a statement for execution with the \[`params!`\] macro.
875 ///
876 /// Returns an [`prepare::OwnedPreparedStatement`] that automatically closes when dropped.
877 ///
878 /// # Example
879 ///
880 /// ```no_run
881 /// # use hyperdb_api_core::{params, client::{Client, Config}};
882 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
883 /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1")?;
884 /// let rows = client.execute(&stmt, params![42_i32])?;
885 /// # Ok(())
886 /// # }
887 /// ```
888 ///
889 /// # Errors
890 ///
891 /// Propagates any [`Error`] from [`prepare::prepare_owned`] —
892 /// connection-mutex poisoning, server-side `Parse` failures (SQL
893 /// syntax, type resolution), and wire-protocol I/O failures.
894 pub fn prepare(&self, query: &str) -> Result<prepare::OwnedPreparedStatement> {
895 prepare::prepare_owned(&self.connection, query, &[])
896 }
897
898 /// Prepares a statement with explicit parameter types.
899 ///
900 /// # Errors
901 ///
902 /// Same failure modes as [`Self::prepare`].
903 pub fn prepare_typed(
904 &self,
905 query: &str,
906 param_types: &[Oid],
907 ) -> Result<prepare::OwnedPreparedStatement> {
908 prepare::prepare_owned(&self.connection, query, param_types)
909 }
910
911 /// Executes a prepared statement with parameters.
912 ///
913 /// Use the \[`params!`\] macro for ergonomic parameter encoding:
914 ///
915 /// ```no_run
916 /// # use hyperdb_api_core::{params, client::{Client, Config}};
917 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
918 /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1 AND name = $2")?;
919 /// let rows = client.execute(&stmt, params![42_i32, "Alice"])?;
920 /// # Ok(())
921 /// # }
922 /// ```
923 ///
924 /// # Errors
925 ///
926 /// Propagates any [`Error`] from [`prepare::execute_prepared`] —
927 /// parameter-count or type mismatch, server-side SQL errors, and
928 /// wire-protocol I/O failures.
929 pub fn execute<P: AsRef<[Option<Vec<u8>>]>>(
930 &self,
931 statement: &prepare::OwnedPreparedStatement,
932 params: P,
933 ) -> Result<Vec<Row>> {
934 let params_ref: Vec<Option<&[u8]>> = params
935 .as_ref()
936 .iter()
937 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
938 .collect();
939 prepare::execute_prepared(&self.connection, statement.statement(), ¶ms_ref)
940 }
941
942 /// Executes a prepared statement that doesn't return rows.
943 ///
944 /// # Errors
945 ///
946 /// Same failure modes as [`Self::execute`].
947 pub fn execute_no_result<P: AsRef<[Option<Vec<u8>>]>>(
948 &self,
949 statement: &prepare::OwnedPreparedStatement,
950 params: P,
951 ) -> Result<u64> {
952 let params_ref: Vec<Option<&[u8]>> = params
953 .as_ref()
954 .iter()
955 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
956 .collect();
957 prepare::execute_prepared_no_result(&self.connection, statement.statement(), ¶ms_ref)
958 }
959
960 /// Executes a prepared statement with streaming results.
961 ///
962 /// Returns a [`PreparedQueryStream`](super::prepared_stream::PreparedQueryStream)
963 /// that yields rows in chunks, keeping memory bounded regardless of
964 /// result size. This is the prepared-statement analog of
965 /// [`query_streaming`](Self::query_streaming).
966 ///
967 /// The connection mutex is held for the duration of iteration;
968 /// dropping the stream before completion issues a best-effort cancel.
969 ///
970 /// # Errors
971 ///
972 /// - Returns [`Error`] (connection) if the connection mutex is
973 /// poisoned.
974 /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
975 /// `Bind`/`Execute` sequence for the prepared statement fails on
976 /// the server or on the wire.
977 pub fn execute_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
978 &'a self,
979 statement: &prepare::OwnedPreparedStatement,
980 params: P,
981 chunk_size: usize,
982 ) -> Result<super::prepared_stream::PreparedQueryStream<'a>> {
983 let params_ref: Vec<Option<&[u8]>> = params
984 .as_ref()
985 .iter()
986 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
987 .collect();
988
989 let mut conn = self.lock_connection()?;
990 conn.start_execute_prepared(statement.name(), ¶ms_ref, statement.columns().len())?;
991
992 let columns = std::sync::Arc::new(statement.columns().to_vec());
993 Ok(super::prepared_stream::PreparedQueryStream::new(
994 conn, self, chunk_size, columns,
995 ))
996 }
997
998 /// Closes the connection.
999 ///
1000 /// # Errors
1001 ///
1002 /// - Returns [`Error`] (connection) if the connection mutex is
1003 /// poisoned.
1004 /// - Returns [`Error`] (I/O) if writing the `Terminate` message or
1005 /// flushing the socket fails.
1006 pub fn close(self) -> Result<()> {
1007 let mut conn = self.lock_connection()?;
1008 conn.terminate()
1009 }
1010
1011 /// Starts a COPY IN operation for bulk data insertion.
1012 ///
1013 /// Returns a `CopyInWriter` that can be used to send data in `HyperBinary` format.
1014 ///
1015 /// # Example
1016 ///
1017 /// ```no_run
1018 /// # use hyperdb_api_core::client::Client;
1019 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1020 /// # let binary_data = &[];
1021 /// let mut writer = client.copy_in("\"my_table\"", &["col1", "col2"])?;
1022 /// writer.send(binary_data)?;
1023 /// let rows = writer.finish()?;
1024 /// # Ok(())
1025 /// # }
1026 /// ```
1027 ///
1028 /// # Errors
1029 ///
1030 /// Delegates to [`Self::copy_in_with_format`]; see that method
1031 /// for the concrete failure modes.
1032 pub fn copy_in(&self, table_name: &str, columns: &[&str]) -> Result<CopyInWriter<'_>> {
1033 self.copy_in_with_format(table_name, columns, "HYPERBINARY")
1034 }
1035
1036 /// Starts a COPY IN operation with a specified data format.
1037 ///
1038 /// Returns a `CopyInWriter` that can be used to send data in the specified format.
1039 ///
1040 /// # Arguments
1041 ///
1042 /// * `table_name` - The target table name (should be properly quoted if needed)
1043 /// * `columns` - Column names to insert into
1044 /// * `format` - The data format string: "HYPERBINARY" or "ARROWSTREAM"
1045 ///
1046 /// # Example
1047 ///
1048 /// ```no_run
1049 /// # use hyperdb_api_core::client::Client;
1050 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1051 /// # let arrow_ipc_data = &[];
1052 /// // For Arrow IPC stream format
1053 /// let mut writer = client.copy_in_with_format("\"my_table\"", &["col1", "col2"], "ARROWSTREAM")?;
1054 /// writer.send(arrow_ipc_data)?;
1055 /// let rows = writer.finish()?;
1056 /// # Ok(())
1057 /// # }
1058 /// ```
1059 ///
1060 /// # Errors
1061 ///
1062 /// - Returns [`Error`] (connection) if the connection mutex is
1063 /// poisoned.
1064 /// - Returns [`Error`] (server) if the server rejects the generated
1065 /// `COPY ... FROM STDIN` statement (for example, missing table
1066 /// or mismatched columns).
1067 /// - Returns [`Error`] (I/O) on wire-protocol I/O failure while
1068 /// initiating the COPY.
1069 pub fn copy_in_with_format(
1070 &self,
1071 table_name: &str,
1072 columns: &[&str],
1073 format: &str,
1074 ) -> Result<CopyInWriter<'_>> {
1075 let mut conn = self.lock_connection()?;
1076 conn.start_copy_in_with_format(table_name, columns, format)?;
1077 Ok(CopyInWriter { connection: conn })
1078 }
1079
1080 /// Starts a COPY IN operation from a raw SQL query string.
1081 ///
1082 /// The query must be a complete `COPY ... FROM STDIN ...` statement.
1083 /// This is useful for text-format imports (CSV, TSV) where you need
1084 /// full control over the COPY options.
1085 ///
1086 /// # Security
1087 ///
1088 /// The query is validated to start with `COPY` (case-insensitive) as a
1089 /// defense-in-depth measure. Callers are still responsible for proper
1090 /// escaping of table names and other identifiers within the query.
1091 /// Prefer [`copy_in()`](Self::copy_in) or
1092 /// [`copy_in_with_format()`](Self::copy_in_with_format) when possible,
1093 /// as they handle escaping automatically.
1094 ///
1095 /// # Example
1096 ///
1097 /// ```no_run
1098 /// # use hyperdb_api_core::client::Client;
1099 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1100 /// let mut writer = client.copy_in_raw(
1101 /// "COPY \"my_table\" FROM STDIN WITH (FORMAT csv, HEADER true)"
1102 /// )?;
1103 /// writer.send(b"1,Alice\n2,Bob\n")?;
1104 /// let rows = writer.finish()?;
1105 /// # Ok(())
1106 /// # }
1107 /// ```
1108 ///
1109 /// # Errors
1110 ///
1111 /// - Returns [`ErrorKind::Query`] if `query` (trimmed) does not
1112 /// start with `COPY` (defense-in-depth check against non-COPY
1113 /// statements).
1114 /// - Returns [`Error`] (connection) if the connection mutex is
1115 /// poisoned.
1116 /// - Returns [`Error`] (server) or [`Error`] (I/O) if the server rejects
1117 /// the COPY statement or the wire write fails.
1118 pub fn copy_in_raw(&self, query: &str) -> Result<CopyInWriter<'_>> {
1119 // Defense-in-depth: reject queries that don't look like COPY statements
1120 if !query.trim_start().to_ascii_uppercase().starts_with("COPY") {
1121 return Err(Error::new(
1122 ErrorKind::Query,
1123 "copy_in_raw() requires a COPY statement. \
1124 The query must start with 'COPY'.",
1125 ));
1126 }
1127 let mut conn = self.lock_connection()?;
1128 conn.start_copy_in_raw(query)?;
1129 Ok(CopyInWriter { connection: conn })
1130 }
1131
1132 /// Returns true if the connection is alive (no error has occurred).
1133 #[must_use]
1134 pub fn is_alive(&self) -> bool {
1135 self.lock_connection().is_ok()
1136 }
1137
1138 /// Executes a COPY ... TO STDOUT query and returns all output data.
1139 ///
1140 /// This is used for queries like:
1141 /// `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
1142 ///
1143 /// # Arguments
1144 ///
1145 /// * `query` - The COPY TO STDOUT query to execute
1146 ///
1147 /// # Returns
1148 ///
1149 /// The raw bytes from all `CopyData` messages concatenated together.
1150 ///
1151 /// # Example
1152 ///
1153 /// ```no_run
1154 /// # use hyperdb_api_core::client::Client;
1155 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1156 /// let arrow_data = client.copy_out(
1157 /// "COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
1158 /// )?;
1159 /// # Ok(())
1160 /// # }
1161 /// ```
1162 ///
1163 /// # Errors
1164 ///
1165 /// - Returns [`Error`] (connection) if the connection mutex is
1166 /// poisoned.
1167 /// - Returns [`Error`] (server) if the server rejects the `COPY ... TO
1168 /// STDOUT` statement.
1169 /// - Returns [`Error`] (I/O) if the wire read fails while collecting
1170 /// COPY output.
1171 pub fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
1172 let mut conn = self.lock_connection()?;
1173 conn.copy_out(query)
1174 }
1175
1176 /// Executes a COPY ... TO STDOUT query and streams output to a writer.
1177 ///
1178 /// Unlike [`copy_out`](Self::copy_out) which collects all data into memory,
1179 /// this method streams each `CopyData` chunk directly to the provided writer,
1180 /// keeping memory usage constant regardless of result size.
1181 ///
1182 /// Returns the total number of bytes written.
1183 ///
1184 /// # Example
1185 ///
1186 /// ```no_run
1187 /// # use hyperdb_api_core::client::Client;
1188 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1189 /// let mut file = std::fs::File::create("output.csv")?;
1190 /// let bytes_written = client.copy_out_to_writer(
1191 /// "COPY (SELECT * FROM my_table) TO STDOUT WITH (FORMAT csv, HEADER true)",
1192 /// &mut file,
1193 /// )?;
1194 /// println!("Wrote {} bytes", bytes_written);
1195 /// # Ok(())
1196 /// # }
1197 /// ```
1198 ///
1199 /// # Errors
1200 ///
1201 /// Same failure modes as [`Self::copy_out`], plus [`Error`] (I/O)
1202 /// when the supplied `writer` returns an error while receiving
1203 /// COPY chunks.
1204 pub fn copy_out_to_writer(&self, query: &str, writer: &mut dyn std::io::Write) -> Result<u64> {
1205 let mut conn = self.lock_connection()?;
1206 conn.copy_out_to_writer(query, writer)
1207 }
1208}
1209
1210impl Cancellable for Client {
1211 /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
1212 /// fresh connection. Swallows errors (logged via `tracing::warn!`)
1213 /// because cancellation is a best-effort signal and callers cannot
1214 /// meaningfully recover from a failed cancel.
1215 fn cancel(&self) {
1216 if let Err(e) = Client::cancel(self) {
1217 warn!(
1218 target: "hyperdb_api_core::client",
1219 error = %e,
1220 process_id = self.process_id,
1221 "cancel request failed (best-effort, swallowed)",
1222 );
1223 }
1224 }
1225}
1226
/// A writer for COPY IN operations.
///
/// This struct holds the connection lock while sending data to ensure
/// exclusive access during the COPY operation.
///
/// Instances are created by `Client::copy_in`,
/// `Client::copy_in_with_format` and `Client::copy_in_raw`, each of which
/// starts the COPY on the connection before handing the lock guard to the
/// writer. The operation is ended by consuming the writer with `finish`
/// or `cancel`.
///
/// NOTE(review): nothing visible in this part of the file ends the COPY
/// when a writer is dropped without `finish`/`cancel` — confirm whether
/// that case is handled elsewhere.
#[derive(Debug)]
pub struct CopyInWriter<'a> {
    /// Lock guard over the client's connection; the lock is released when
    /// the writer (and with it this guard) is dropped.
    connection: MutexGuard<'a, RawConnection<SyncStream>>,
}
1235
1236impl CopyInWriter<'_> {
1237 /// Sends a chunk of COPY data.
1238 ///
1239 /// The data should be in `HyperBinary` format. For best performance,
1240 /// batch multiple rows into larger chunks (e.g., 1-16 MB).
1241 ///
1242 /// # Errors
1243 ///
1244 /// Returns [`Error`] (I/O) if the wire write of the `CopyData` frame
1245 /// fails.
1246 pub fn send(&mut self, data: &[u8]) -> Result<()> {
1247 self.connection.send_copy_data(data)
1248 }
1249
1250 /// Flushes any buffered data to the server.
1251 ///
1252 /// # Errors
1253 ///
1254 /// Returns [`Error`] (I/O) if flushing the internal write buffer to
1255 /// the transport fails.
1256 pub fn flush(&mut self) -> Result<()> {
1257 self.connection.flush()
1258 }
1259
1260 /// Sends COPY data directly to the stream without internal buffering.
1261 ///
1262 /// This writes data directly to the TCP stream, letting the kernel handle
1263 /// buffering. More efficient for streaming large amounts of data.
1264 /// Call `flush_stream()` periodically to ensure data is sent.
1265 ///
1266 /// # Errors
1267 ///
1268 /// Returns [`Error`] (I/O) if writing to the underlying stream fails.
1269 pub fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1270 self.connection.send_copy_data_direct(data)
1271 }
1272
1273 /// Flushes the TCP stream.
1274 ///
1275 /// Use with `send_direct()` to periodically ensure data reaches the server.
1276 ///
1277 /// # Errors
1278 ///
1279 /// Returns [`Error`] (I/O) if flushing the underlying transport
1280 /// fails.
1281 pub fn flush_stream(&mut self) -> Result<()> {
1282 self.connection.flush_stream()
1283 }
1284
1285 /// Reserves capacity in the write buffer to avoid reallocations.
1286 ///
1287 /// Call this before bulk operations to pre-allocate buffer space.
1288 pub fn reserve_buffer(&mut self, capacity: usize) {
1289 self.connection.reserve_write_buffer(capacity);
1290 }
1291
1292 /// Finishes the COPY operation successfully.
1293 ///
1294 /// Returns the number of rows inserted.
1295 ///
1296 /// # Errors
1297 ///
1298 /// - Returns [`Error`] (I/O) if writing `CopyDone` or flushing the
1299 /// transport fails.
1300 /// - Returns [`Error`] (server) if the server reports a COPY-side
1301 /// failure (constraint violation, type coercion error, etc.)
1302 /// via an `ErrorResponse` after `CopyDone`.
1303 pub fn finish(mut self) -> Result<u64> {
1304 self.connection.finish_copy()
1305 }
1306
1307 /// Cancels the COPY operation.
1308 ///
1309 /// All data sent so far will be discarded.
1310 ///
1311 /// # Errors
1312 ///
1313 /// Returns [`Error`] (I/O) if writing the `CopyFail` frame or
1314 /// flushing the transport fails, or [`Error`] (server) if the server
1315 /// reports an unexpected status after the cancel.
1316 pub fn cancel(mut self, reason: &str) -> Result<()> {
1317 self.connection.cancel_copy(reason)
1318 }
1319}
1320
/// Extracts the affected-row count from a command tag.
///
/// The tag is split on whitespace and the first word selects where the
/// count is expected:
///
/// - `INSERT oid count` — the count is the third word.
/// - `UPDATE count`, `DELETE count`, `SELECT count`, `COPY count` — the
///   count is the second word.
///
/// Returns `None` for an empty tag, any other leading word, a missing
/// count word, or a count that does not parse as `u64`.
fn parse_affected_rows(tag: &str) -> Option<u64> {
    let mut words = tag.split_whitespace();

    let count_word = match words.next()? {
        // Skip the oid that sits between the keyword and the count.
        "INSERT" => words.nth(1)?,
        "UPDATE" | "DELETE" | "SELECT" | "COPY" => words.next()?,
        _ => return None,
    };

    count_word.parse().ok()
}
1337
/// Streaming iterator for query results without materializing all rows.
///
/// Holding a `QueryStream` keeps the underlying [`RawConnection`] locked
/// via a `MutexGuard`. Dropping the stream before fully iterating triggers
/// a server-side cancel (see [`Drop`] below) so the connection is returned
/// to the pool cleanly.
///
/// Rows are pulled with `next_chunk`, which returns at most `chunk_size`
/// rows per call.
pub struct QueryStream<'a> {
    /// Lock guard over the connection being read. `next_chunk` sets this
    /// to `None` once it has seen `ReadyForQuery` or handled an
    /// `ErrorResponse`, which releases the lock.
    conn: Option<MutexGuard<'a, RawConnection<SyncStream>>>,
    /// Best-effort cancel handle, used in [`Drop`] when the stream is
    /// abandoned before completion. For the current sync client this is
    /// the owning [`Client`] itself (which implements [`Cancellable`] via
    /// a PG wire `CancelRequest` on a fresh connection). When a gRPC
    /// equivalent lands it will plug in the same trait with a
    /// `cancel_query(query_id)` RPC implementation.
    canceller: &'a dyn Cancellable,
    /// Set by `next_chunk` on `ReadyForQuery` or `ErrorResponse`. When
    /// `true`, `next_chunk` returns `Ok(None)` and `Drop` does nothing.
    finished: bool,
    /// Maximum number of rows returned by one `next_chunk` call.
    chunk_size: usize,
    /// Columns built from the first `RowDescription` message seen by
    /// `next_chunk`; `None` until then.
    schema: Option<Vec<super::statement::Column>>,
    /// `true` once a `RowDescription` has been processed; any later
    /// `RowDescription` is ignored.
    schema_read: bool,
}
1358
1359impl std::fmt::Debug for QueryStream<'_> {
1360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1361 f.debug_struct("QueryStream")
1362 .field("finished", &self.finished)
1363 .field("chunk_size", &self.chunk_size)
1364 .field("schema_read", &self.schema_read)
1365 .finish_non_exhaustive()
1366 }
1367}
1368
1369impl Drop for QueryStream<'_> {
1370 fn drop(&mut self) {
1371 // If the caller exhausted the stream, the connection is already at
1372 // `ReadyForQuery` — nothing to do.
1373 if self.finished {
1374 return;
1375 }
1376
1377 // Otherwise: the server is still happily streaming rows (potentially
1378 // millions of them) for a query we no longer care about. Passively
1379 // draining would waste bandwidth and could block the destructor for
1380 // a very long time. Instead we send a transport-appropriate cancel
1381 // signal — on PG wire this is a `CancelRequest` packet on a fresh
1382 // connection; on gRPC (future) it will be a `cancel_query` RPC on
1383 // the shared channel. `Cancellable::cancel` is fire-and-forget and
1384 // cannot fail.
1385 self.canceller.cancel();
1386
1387 // After cancel, the server stops producing new rows and emits
1388 // `ErrorResponse(QueryCanceled) + ReadyForQuery` promptly. We still
1389 // need to drain those trailing messages off the wire so the
1390 // connection returns to the pool cleanly. The budget here is small
1391 // because we only expect: (a) whatever rows the server had already
1392 // flushed before seeing the cancel, (b) `ErrorResponse`,
1393 // (c) `ReadyForQuery`. A well-behaved server reaches `ReadyForQuery`
1394 // within a handful of messages. If that budget is somehow exceeded
1395 // the bounded drain logs a warning and marks the connection
1396 // desynchronized — downstream users will surface the desync as a
1397 // transport-level failure and reconnect.
1398 const POST_CANCEL_DRAIN_CAP: usize = 1024;
1399 if let Some(ref mut conn) = self.conn {
1400 let _ok = conn.drain_until_ready_bounded(POST_CANCEL_DRAIN_CAP);
1401 }
1402 }
1403}
1404
1405impl QueryStream<'_> {
1406 /// Returns the schema (column metadata) for the result set.
1407 #[must_use]
1408 pub fn schema(&self) -> Option<&[super::statement::Column]> {
1409 self.schema.as_deref()
1410 }
1411
1412 /// Retrieves the next chunk of rows (up to `chunk_size`).
1413 ///
1414 /// # Errors
1415 ///
1416 /// - Returns [`Error`] (I/O) if reading from the underlying transport
1417 /// fails while awaiting the next protocol message.
1418 /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
1419 /// during streaming (for example, a server-side execution failure
1420 /// encountered partway through the result set).
1421 pub fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
1422 if self.finished {
1423 return Ok(None);
1424 }
1425
1426 let Some(conn) = self.conn.as_mut() else {
1427 return Ok(None);
1428 };
1429
1430 let mut rows = Vec::with_capacity(self.chunk_size);
1431 while rows.len() < self.chunk_size {
1432 let msg = conn.read_message()?;
1433 match msg {
1434 Message::RowDescription(desc) if !self.schema_read => {
1435 let mut cols = Vec::new();
1436 for f in desc.fields().filter_map(std::result::Result::ok) {
1437 cols.push(super::statement::Column::new(
1438 f.name().to_string(),
1439 f.type_oid(),
1440 f.type_modifier(),
1441 super::statement::ColumnFormat::from_code(f.format()),
1442 ));
1443 }
1444 self.schema = Some(cols);
1445 self.schema_read = true;
1446 }
1447 Message::DataRow(data) => {
1448 rows.push(StreamRow::new(data));
1449 if rows.len() >= self.chunk_size {
1450 return Ok(Some(rows));
1451 }
1452 }
1453 Message::ReadyForQuery(_) => {
1454 self.finished = true;
1455 self.conn = None;
1456 return if rows.is_empty() {
1457 Ok(None)
1458 } else {
1459 Ok(Some(rows))
1460 };
1461 }
1462 Message::ErrorResponse(body) => {
1463 // Mark the stream finished *before* touching the
1464 // connection so the `Drop` impl's Cancellable-based
1465 // cleanup path is trivially a no-op regardless of what
1466 // happens next (normal return, `?`, panic in drain,
1467 // future refactors that insert early returns, etc).
1468 //
1469 // `&mut self` exclusivity means `Drop` can't fire
1470 // concurrently with this method, so this is purely a
1471 // defensive-ordering improvement — but it also matches
1472 // the `ReadyForQuery` arm above, which sets
1473 // `finished = true` first for the same reason. Keeping
1474 // both terminal arms in the same order makes the
1475 // "terminal state is committed before cleanup" rule
1476 // visible at a glance.
1477 self.finished = true;
1478 // Drain through `ReadyForQuery` before releasing the
1479 // pooled connection so the next user sees a clean wire
1480 // state. `consume_error` swallows any drain I/O errors
1481 // via tracing::warn — the caller's original error is
1482 // more informative than a transport hiccup during
1483 // cleanup.
1484 let err = match self.conn {
1485 Some(ref mut c) => c.consume_error(&body),
1486 None => parse_error_response(&body),
1487 };
1488 self.conn = None;
1489 return Err(err);
1490 }
1491 _ => {}
1492 }
1493 }
1494 Ok(Some(rows))
1495 }
1496}
1497
#[cfg(test)]
mod tests {
    use super::*;

    /// Mirrors the prefix guard applied by `Client::copy_in_raw`. The
    /// method itself cannot be called here because it needs a connected
    /// client, so the validation logic is checked directly.
    fn passes_copy_prefix_guard(query: &str) -> bool {
        query.trim_start().to_ascii_uppercase().starts_with("COPY")
    }

    #[test]
    fn test_parse_affected_rows() {
        assert_eq!(parse_affected_rows("INSERT 0 5"), Some(5));
        assert_eq!(parse_affected_rows("UPDATE 10"), Some(10));
        assert_eq!(parse_affected_rows("DELETE 3"), Some(3));
        assert_eq!(parse_affected_rows("SELECT 100"), Some(100));
        assert_eq!(parse_affected_rows("COPY 7"), Some(7));
        assert_eq!(parse_affected_rows("CREATE TABLE"), None);
    }

    #[test]
    fn test_parse_affected_rows_malformed_tags() {
        // Empty tag: there is no leading keyword at all.
        assert_eq!(parse_affected_rows(""), None);
        // Keyword present but the count word is missing.
        assert_eq!(parse_affected_rows("INSERT 0"), None);
        assert_eq!(parse_affected_rows("UPDATE"), None);
        // Count word present but not a number.
        assert_eq!(parse_affected_rows("DELETE many"), None);
    }

    #[test]
    fn test_copy_in_raw_rejects_non_copy_query() {
        assert!(
            !passes_copy_prefix_guard("SELECT * FROM users"),
            "Non-COPY query should not pass the COPY prefix check"
        );

        assert!(
            passes_copy_prefix_guard("COPY \"users\" FROM STDIN WITH (FORMAT csv)"),
            "COPY query should pass the prefix check"
        );

        // Leading whitespace should be accepted
        assert!(passes_copy_prefix_guard("  COPY \"users\" FROM STDIN"));

        // Case-insensitive
        assert!(passes_copy_prefix_guard("copy \"users\" FROM STDIN"));

        // Inputs shorter than the keyword must be rejected
        assert!(!passes_copy_prefix_guard(""));
        assert!(!passes_copy_prefix_guard("   "));
        assert!(!passes_copy_prefix_guard("COP"));
    }
}