hyperdb_api_core/client/client.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level synchronous client for Hyper database.
5//!
6//! This module provides [`Client`], the primary synchronous interface for
7//! communicating with a Hyper server. It supports three query execution
8//! modes, each using a different level of the `PostgreSQL` wire protocol:
9//!
10//! - **Simple Query** ([`Client::query`]) — Sends a single `Query` message
11//! and collects all `DataRow` messages into memory. Results are returned
12//! in text format. Best for small result sets or DDL/DML commands.
13//!
14//! - **Extended Query / `HyperBinary`** ([`Client::query_fast`]) — Uses the
15//! Extended Query protocol (`Parse` / `Bind` / `Execute`) with
16//! [`ColumnFormat::HyperBinary`](super::statement::ColumnFormat::HyperBinary)
17//! (format code 2) for `LittleEndian` binary results. Returns
18//! [`StreamRow`]s that compute field offsets
19//! on-demand, avoiding per-row allocation.
20//!
21//! - **Streaming** ([`Client::query_streaming`]) — Like `query_fast` but
22//! returns a [`QueryStream`] that yields rows in chunks, keeping memory
23//! usage constant regardless of result set size. The stream holds the
24//! connection lock for its lifetime; dropping it triggers a cancel request
25//! to stop the server from streaming the rest.
26//!
27//! # Bulk Insertion (COPY Protocol)
28//!
29//! [`Client::copy_in`] starts a `COPY ... FROM STDIN WITH (FORMAT HYPERBINARY)`
30//! session and returns a [`CopyInWriter`] for streaming binary data. The
31//! caller is responsible for encoding rows in the correct format (typically
32//! done by the higher-level [`hyperdb_api::Inserter`](https://docs.rs/hyperdb-api)).
33//! See also [`copy_in_with_format`](Client::copy_in_with_format) for
34//! alternative formats (CSV, Arrow IPC) and [`copy_in_raw`](Client::copy_in_raw)
35//! for fully custom COPY statements.
36
37use std::net::TcpStream;
38use std::sync::{Arc, Mutex, MutexGuard};
39use std::time::Duration;
40
41use tracing::{debug, info, trace, warn};
42
43/// Enable TCP keepalive on a connection socket so a half-open peer (laptop
44/// sleep, network blip, a hyperd that vanished without a FIN) is detected in
45/// ~90s instead of blocking a blocking `read()` until the OS default idle
46/// timeout (7200s / 2h on macOS and Linux).
47///
48/// This matters most for long-lived idle connections — e.g. an MCP client
49/// holding a connection to a resident daemon's hyperd across a laptop suspend.
50/// Without keepalive, the next query on a silently-dead socket hangs for hours.
51///
52/// Tuning: 60s idle before the first probe, 10s between probes, 3 probes →
53/// the peer is declared dead ~90s after it goes silent. Probe count is only
54/// honored on platforms whose `socket2` build exposes `with_retries`; macOS
55/// honors idle+interval. All calls are best-effort (`.ok()`): a kernel that
56/// rejects a knob leaves the connection working at OS defaults.
57fn apply_tcp_keepalive(sock: &socket2::SockRef<'_>) {
58 let keepalive = socket2::TcpKeepalive::new()
59 .with_time(Duration::from_secs(60))
60 .with_interval(Duration::from_secs(10));
61 #[cfg(not(any(target_os = "macos", target_os = "windows")))]
62 let keepalive = keepalive.with_retries(3);
63 sock.set_tcp_keepalive(&keepalive).ok();
64}
65
66#[cfg(unix)]
67use std::os::unix::net::UnixStream;
68
69use super::cancel::Cancellable;
70use super::config::Config;
71use super::connection::{parse_error_response, RawConnection};
72use super::endpoint::ConnectionEndpoint;
73use super::error::{Error, ErrorKind, Result};
74use super::prepare;
75use super::row::{Row, StreamRow};
76use super::sync_stream::SyncStream;
77
78use crate::protocol::message::Message;
79use crate::types::Oid;
80
81use super::notice::{Notice, NoticeReceiver};
82
83/// A synchronous client for Hyper database.
84///
85/// The client handles connection management and query execution.
86/// It is thread-safe and can be shared between threads using `Arc`.
87///
88/// # Thread Safety
89///
90/// The `Client` is thread-safe and can be shared between threads using `Arc<Client>`.
91/// All methods use internal mutexes to synchronize access to the underlying connection.
92///
93/// # Example
94///
95/// ```no_run
96/// use hyperdb_api_core::client::{Client, Config};
97///
98/// # fn example() -> hyperdb_api_core::client::Result<()> {
99/// let config = Config::new()
100/// .with_host("localhost")
101/// .with_port(7483)
102/// .with_database("test.hyper");
103///
104/// let client = Client::connect(&config)?;
105/// let rows = client.query("SELECT 1")?;
106/// client.close()?;
107/// # Ok(())
108/// # }
109/// ```
110pub struct Client {
111 /// The underlying connection, protected by a mutex for thread safety.
112 connection: Arc<Mutex<RawConnection<SyncStream>>>,
113 /// Backend process ID (for cancel requests).
114 process_id: i32,
115 /// Secret key for authenticating cancel requests.
116 secret_key: i32,
117 /// Connection endpoint for cancel requests and reconnection.
118 endpoint: ConnectionEndpoint,
119 /// Optional notice receiver callback for server notices/warnings.
120 notice_receiver: Option<Arc<NoticeReceiver>>,
121}
122
123// Manual Debug implementation because NoticeReceiver doesn't implement Debug
124impl std::fmt::Debug for Client {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("Client")
127 .field("process_id", &self.process_id)
128 .field("secret_key", &self.secret_key)
129 .field("endpoint", &self.endpoint)
130 .field(
131 "notice_receiver",
132 &self.notice_receiver.as_ref().map(|_| "<callback>"),
133 )
134 .finish_non_exhaustive()
135 }
136}
137
138impl Client {
139 /// Connects to a Hyper server using the given configuration.
140 ///
141 /// Establishes a TCP connection, performs authentication, and initializes
142 /// the client. Returns an error if the connection fails or authentication
143 /// is rejected.
144 ///
145 /// # Arguments
146 ///
147 /// * `config` - Connection configuration (host, port, credentials, etc.)
148 ///
149 /// # Errors
150 ///
151 /// Returns `Error` if:
152 /// - Connection to the server fails
153 /// - Authentication fails
154 /// - Protocol handshake fails
155 ///
156 /// # Example
157 ///
158 /// ```no_run
159 /// # use hyperdb_api_core::client::{Client, Config};
160 /// # fn example() -> hyperdb_api_core::client::Result<()> {
161 /// let config = Config::new()
162 /// .with_host("localhost")
163 /// .with_port(7483)
164 /// .with_user("myuser")
165 /// .with_password("mypass")
166 /// .with_database("test.hyper");
167 ///
168 /// let client = Client::connect(&config)?;
169 /// # Ok(())
170 /// # }
171 /// ```
172 pub fn connect(config: &Config) -> Result<Self> {
173 // Log connection parameters (password is intentionally omitted for security)
174 info!(
175 target: "hyperdb_api",
176 host = %config.host(),
177 port = config.port(),
178 user = config.user().unwrap_or("(default)"),
179 database = config.database().unwrap_or("(none)"),
180 "connection-parameters"
181 );
182
183 let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
184 let addr = format!("{}:{}", config.host(), config.port());
185 let tcp_stream = TcpStream::connect(&addr).map_err(|e| {
186 warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
187 Error::connection(format!("failed to connect to {addr}: {e}"))
188 })?;
189
190 // Set TCP options for better performance.
191 //
192 // `TCP_NODELAY` disables Nagle so request bytes flush immediately —
193 // needed for low-latency request/response shapes.
194 //
195 // `SO_RCVBUF` / `SO_SNDBUF` are bumped to 4 MiB. The Windows default
196 // TCP buffers are ~64 KiB, which throttles loopback throughput
197 // because hyperd blocks on `send()` once the kernel buffer fills up.
198 // Linux auto-tunes much higher, so this primarily helps Windows
199 // (and is a marginal win on macOS).
200 //
201 // Empirical knee on Windows i9-10980XE / TCP loopback (100M-row
202 // sync full-scan, single connection):
203 //
204 // | size | rows/sec |
205 // |------:|---------:|
206 // | 64 K | 2.89 | (default)
207 // | 1 M | 5.95 |
208 // | 4 M | 6.90 | <-- knee
209 // | 8 M | 6.68 | (insert workloads regress 18%)
210 //
211 // 4 MiB hits the throughput plateau without the memory-pressure
212 // regression seen at 8 MiB. We use `.ok()` because the kernel may
213 // clamp to a lower value or refuse the request entirely; either
214 // way the connection still works at the default size.
215 tcp_stream.set_nodelay(true).ok();
216 let sock = socket2::SockRef::from(&tcp_stream);
217 sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
218 sock.set_send_buffer_size(4 * 1024 * 1024).ok();
219 apply_tcp_keepalive(&sock);
220
221 let stream = SyncStream::tcp(tcp_stream);
222 let mut connection = RawConnection::new(stream);
223
224 // Perform startup with authentication
225 let params = config.startup_params();
226 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
227 connection.startup(¶ms_ref, config.password())?;
228
229 let process_id = connection.process_id();
230 let secret_key = connection.secret_key();
231
232 debug!(
233 target: "hyperdb_api",
234 process_id,
235 "connection-established"
236 );
237
238 Ok(Client {
239 connection: Arc::new(Mutex::new(connection)),
240 process_id,
241 secret_key,
242 endpoint,
243 notice_receiver: None,
244 })
245 }
246
247 /// Connects to a Hyper server via Unix Domain Socket (Unix only).
248 ///
249 /// # Example
250 ///
251 /// ```no_run
252 /// # use hyperdb_api_core::client::{Client, Config};
253 /// # use std::path::Path;
254 /// # fn example() -> hyperdb_api_core::client::Result<()> {
255 /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
256 /// let config = Config::new().with_database("test.hyper");
257 /// let client = Client::connect_unix(socket_path, &config)?;
258 /// # Ok(())
259 /// # }
260 /// ```
261 ///
262 /// # Errors
263 ///
264 /// - Returns [`Error`] (connection) if the Unix domain socket cannot
265 /// be connected.
266 /// - Propagates any [`Error`] from the startup handshake
267 /// (authentication, protocol error, I/O error).
268 #[cfg(unix)]
269 pub fn connect_unix(socket_path: impl AsRef<std::path::Path>, config: &Config) -> Result<Self> {
270 use std::path::Path;
271
272 let path = socket_path.as_ref();
273 info!(
274 target: "hyperdb_api",
275 socket_path = %path.display(),
276 user = config.user().unwrap_or("(default)"),
277 database = config.database().unwrap_or("(none)"),
278 "connection-parameters-unix"
279 );
280
281 let unix_stream = UnixStream::connect(path).map_err(|e| {
282 warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
283 Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
284 })?;
285
286 // Parse endpoint from socket path
287 let directory = path.parent().unwrap_or(Path::new("/"));
288 let name = path
289 .file_name()
290 .and_then(|n| n.to_str())
291 .unwrap_or("socket");
292 let endpoint = ConnectionEndpoint::domain_socket(directory, name);
293
294 let stream = SyncStream::unix(unix_stream);
295 let mut connection = RawConnection::new(stream);
296
297 // Perform startup with authentication
298 let params = config.startup_params();
299 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
300 connection.startup(¶ms_ref, config.password())?;
301
302 let process_id = connection.process_id();
303 let secret_key = connection.secret_key();
304
305 debug!(
306 target: "hyperdb_api",
307 process_id,
308 "connection-established-unix"
309 );
310
311 Ok(Client {
312 connection: Arc::new(Mutex::new(connection)),
313 process_id,
314 secret_key,
315 endpoint,
316 notice_receiver: None,
317 })
318 }
319
320 /// Connects to a Hyper server via Windows Named Pipe (Windows only).
321 ///
322 /// # Arguments
323 ///
324 /// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
325 /// * `config` - Connection configuration
326 ///
327 /// # Errors
328 ///
329 /// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
330 /// exist, all instances are busy after the retry window, or permission is
331 /// denied) or if the authentication handshake fails.
332 #[cfg(windows)]
333 pub fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
334 use std::fs::OpenOptions;
335 use std::time::{Duration, Instant};
336
337 info!(
338 target: "hyperdb_api",
339 pipe_path = %pipe_path,
340 user = config.user().unwrap_or("(default)"),
341 database = config.database().unwrap_or("(none)"),
342 "connection-parameters-named-pipe"
343 );
344
345 // Windows named pipes have a finite number of server-side instances
346 // (`MaxInstances` on `CreateNamedPipe`). When all are busy, `CreateFile`
347 // returns `ERROR_PIPE_BUSY` (231). The expected client behavior is to
348 // wait briefly and retry — equivalent to `WaitNamedPipe` from Win32.
349 // We poll with a short sleep up to a reasonable deadline so concurrent
350 // clients don't spuriously fail when the pool is momentarily exhausted.
351 const RETRY_INTERVAL: Duration = Duration::from_millis(20);
352 const MAX_WAIT: Duration = Duration::from_secs(10);
353 const ERROR_PIPE_BUSY: i32 = 231;
354
355 let deadline = Instant::now() + MAX_WAIT;
356 let file = loop {
357 match OpenOptions::new().read(true).write(true).open(pipe_path) {
358 Ok(f) => break f,
359 Err(e)
360 if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
361 {
362 std::thread::sleep(RETRY_INTERVAL);
363 }
364 Err(e) => {
365 warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
366 return Err(Error::connection(format!(
367 "failed to connect to named pipe {pipe_path}: {e}"
368 )));
369 }
370 }
371 };
372
373 // Parse endpoint from pipe path
374 let endpoint = ConnectionEndpoint::parse(&format!(
375 "tab.pipe://{}",
376 pipe_path.trim_start_matches(r"\\").replace('\\', "/")
377 ))
378 .unwrap_or_else(|_| {
379 // Fallback: construct from pipe path directly
380 // Expected format: \\<host>\pipe\<name>
381 let parts: Vec<&str> = pipe_path
382 .trim_start_matches(r"\\")
383 .splitn(3, '\\')
384 .collect();
385 if parts.len() >= 3 {
386 ConnectionEndpoint::named_pipe(parts[0], parts[2])
387 } else {
388 ConnectionEndpoint::named_pipe(".", pipe_path)
389 }
390 });
391
392 let stream = SyncStream::named_pipe(file);
393 let mut connection = RawConnection::new(stream);
394
395 // Perform startup with authentication
396 let params = config.startup_params();
397 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
398 connection.startup(¶ms_ref, config.password())?;
399
400 let process_id = connection.process_id();
401 let secret_key = connection.secret_key();
402
403 debug!(
404 target: "hyperdb_api",
405 process_id,
406 "connection-established-named-pipe"
407 );
408
409 Ok(Client {
410 connection: Arc::new(Mutex::new(connection)),
411 process_id,
412 secret_key,
413 endpoint,
414 notice_receiver: None,
415 })
416 }
417
418 /// Connects to a Hyper server using a `ConnectionEndpoint`.
419 ///
420 /// This is a lower-level method that accepts a pre-parsed endpoint.
421 ///
422 /// # Errors
423 ///
424 /// Delegates to [`Client::connect`], [`Client::connect_unix`], or
425 /// `Client::connect_named_pipe` depending on the endpoint variant,
426 /// and propagates their errors unchanged.
427 pub fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
428 match endpoint {
429 ConnectionEndpoint::Tcp { host, port } => {
430 let mut cfg = config.clone();
431 cfg = cfg.with_host(host.clone()).with_port(*port);
432 Self::connect(&cfg)
433 }
434 #[cfg(unix)]
435 ConnectionEndpoint::DomainSocket { directory, name } => {
436 let socket_path = directory.join(name);
437 Self::connect_unix(&socket_path, config)
438 }
439 #[cfg(windows)]
440 ConnectionEndpoint::NamedPipe { host, name } => {
441 let pipe_path = format!(r"\\{host}\pipe\{name}");
442 Self::connect_named_pipe(&pipe_path, config)
443 }
444 }
445 }
446
447 /// Returns the connection endpoint.
448 #[must_use]
449 pub fn endpoint(&self) -> &ConnectionEndpoint {
450 &self.endpoint
451 }
452
453 /// Returns the server process ID for this connection.
454 #[must_use]
455 pub fn process_id(&self) -> i32 {
456 self.process_id
457 }
458
459 /// Returns the secret key for cancel requests.
460 #[must_use]
461 pub fn secret_key(&self) -> i32 {
462 self.secret_key
463 }
464
465 /// Cancels the currently executing query on this connection.
466 ///
467 /// This method is **thread-safe** and can be called from any thread while
468 /// a query is running on another thread. It works by opening a separate
469 /// TCP connection to the server and sending a cancel request.
470 ///
471 /// # How It Works
472 ///
473 /// 1. Opens a new TCP connection to the same server
474 /// 2. Sends a cancel request containing the process ID and secret key
475 /// 3. The server receives this and cancels the running query
476 /// 4. The original query will fail with error code 57014 (`query_canceled`)
477 ///
478 /// # Thread Safety
479 ///
480 /// This method does NOT acquire the connection mutex, so it can be called
481 /// while another thread is blocked waiting for query results.
482 ///
483 /// # Relation to the [`Cancellable`] trait
484 ///
485 /// This is the **fallible user-facing cancel API**: it returns a
486 /// `Result<()>` so explicit callers can observe transport-level
487 /// failures (network errors, socket issues) and react accordingly —
488 /// e.g. record a metric, show "cancel failed" UX, or retry.
489 ///
490 /// For [`Drop`]-path and other internal cleanup contexts where error
491 /// propagation is impossible, the separate
492 /// [`impl Cancellable for Client`](super::cancel::Cancellable) wraps
493 /// this method and swallows errors (logged via `tracing::warn!`).
494 /// The two coexist by design — each serves a different consumer.
495 ///
496 /// # Example
497 ///
498 /// ```no_run
499 /// use std::thread;
500 /// use std::sync::Arc;
501 /// use std::time::Duration;
502 /// use hyperdb_api_core::client::{Client, Config};
503 ///
504 /// # fn example() -> hyperdb_api_core::client::Result<()> {
505 /// # let config = Config::new().with_host("localhost").with_port(7483);
506 /// let client = Arc::new(Client::connect(&config)?);
507 /// let client_clone = Arc::clone(&client);
508 ///
509 /// // Start a long query in another thread
510 /// let handle = thread::spawn(move || {
511 /// client_clone.query("SELECT pg_sleep(60)")
512 /// });
513 ///
514 /// // Cancel from the main thread
515 /// thread::sleep(Duration::from_millis(100));
516 /// client.cancel()?;
517 ///
518 /// // The query thread will get a cancellation error
519 /// let result = handle.join().unwrap();
520 /// assert!(result.is_err());
521 /// # Ok(())
522 /// # }
523 /// ```
524 ///
525 /// # Errors
526 ///
527 /// - Returns [`Error`] (connection) if the fresh cancel-side socket
528 /// cannot be opened (TCP / UDS / named-pipe, depending on
529 /// [`Self::endpoint`]).
530 /// - Returns [`Error`] (I/O) if writing or flushing the cancel
531 /// request fails.
532 pub fn cancel(&self) -> Result<()> {
533 use crate::protocol::message::frontend;
534 use bytes::BytesMut;
535 use std::io::Write;
536
537 info!(
538 target: "hyperdb_api",
539 process_id = self.process_id,
540 "query-cancel-request"
541 );
542
543 let endpoint_str = self.endpoint.to_string();
544
545 // Open a new connection specifically for the cancel request
546 match &self.endpoint {
547 ConnectionEndpoint::Tcp { host, port } => {
548 let addr = format!("{host}:{port}");
549 let mut stream = TcpStream::connect(&addr).map_err(|e| {
550 warn!(
551 target: "hyperdb_api",
552 addr = %endpoint_str,
553 error = %e,
554 "query-cancel-connect-failed"
555 );
556 Error::connection(format!(
557 "failed to connect for cancel request to {endpoint_str}: {e}"
558 ))
559 })?;
560 // Cancel is a 16-byte fire-and-forget — disable Nagle so the
561 // request hits the wire without waiting on a coalesce timer.
562 stream.set_nodelay(true).ok();
563
564 // Build and send the cancel request
565 let mut buf = BytesMut::with_capacity(16);
566 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
567
568 stream.write_all(&buf).map_err(|e| {
569 warn!(
570 target: "hyperdb_api",
571 error = %e,
572 "query-cancel-send-failed"
573 );
574 Error::io(e)
575 })?;
576
577 stream.flush().map_err(Error::io)?;
578 }
579 #[cfg(unix)]
580 ConnectionEndpoint::DomainSocket { directory, name } => {
581 let socket_path = directory.join(name);
582 let mut stream = UnixStream::connect(&socket_path).map_err(|e| {
583 warn!(
584 target: "hyperdb_api",
585 addr = %endpoint_str,
586 error = %e,
587 "query-cancel-connect-failed"
588 );
589 Error::connection(format!(
590 "failed to connect for cancel request to {endpoint_str}: {e}"
591 ))
592 })?;
593
594 // Build and send the cancel request
595 let mut buf = BytesMut::with_capacity(16);
596 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
597
598 stream.write_all(&buf).map_err(|e| {
599 warn!(
600 target: "hyperdb_api",
601 error = %e,
602 "query-cancel-send-failed"
603 );
604 Error::io(e)
605 })?;
606
607 stream.flush().map_err(Error::io)?;
608 }
609 #[cfg(windows)]
610 ConnectionEndpoint::NamedPipe { host, name } => {
611 let pipe_path = format!(r"\\{host}\pipe\{name}");
612 let mut file = std::fs::OpenOptions::new()
613 .read(true)
614 .write(true)
615 .open(&pipe_path)
616 .map_err(|e| {
617 warn!(
618 target: "hyperdb_api",
619 addr = %endpoint_str,
620 error = %e,
621 "query-cancel-connect-failed"
622 );
623 Error::connection(format!(
624 "failed to connect for cancel request to {endpoint_str}: {e}"
625 ))
626 })?;
627
628 // Build and send the cancel request
629 let mut buf = BytesMut::with_capacity(16);
630 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
631
632 file.write_all(&buf).map_err(|e| {
633 warn!(
634 target: "hyperdb_api",
635 error = %e,
636 "query-cancel-send-failed"
637 );
638 Error::io(e)
639 })?;
640
641 file.flush().map_err(Error::io)?;
642 }
643 }
644
645 debug!(
646 target: "hyperdb_api",
647 process_id = self.process_id,
648 "query-cancel-sent"
649 );
650
651 Ok(())
652 }
653
654 /// Returns a server parameter value by name.
655 ///
656 /// Server parameters are sent by the server during connection startup.
657 /// Common parameters include:
658 /// - `server_version` - The server version string
659 /// - `server_encoding` - The server's character encoding
660 /// - `client_encoding` - The client's character encoding
661 ///
662 /// # Example
663 ///
664 /// ```no_run
665 /// # use hyperdb_api_core::client::{Client, Config};
666 /// # fn example(client: &Client) {
667 /// if let Some(version) = client.parameter_status("server_version") {
668 /// println!("Connected to Hyper version: {}", version);
669 /// }
670 /// # }
671 /// ```
672 #[must_use]
673 pub fn parameter_status(&self, name: &str) -> Option<String> {
674 let conn = self.connection.lock().ok()?;
675 conn.parameter_status(name)
676 .map(std::string::ToString::to_string)
677 }
678
679 /// Sets the notice receiver for this connection.
680 ///
681 /// Notice and warning messages generated by the server are not returned by
682 /// query execution functions since they don't indicate failure. Instead,
683 /// they are passed to a notice handling function.
684 ///
685 /// The default behavior is to log notices at the `warn` level.
686 ///
687 /// # Arguments
688 ///
689 /// * `receiver` - The callback function that will be called with each notice.
690 /// Pass `None` to restore default logging behavior.
691 ///
692 /// # Example
693 ///
694 /// ```no_run
695 /// # use hyperdb_api_core::client::Client;
696 /// # use std::sync::{Arc, Mutex};
697 /// # fn example(client: &mut Client) {
698 /// client.set_notice_receiver(Some(Box::new(|notice| {
699 /// println!("Server notice: {}", notice);
700 /// })));
701 ///
702 /// // Or capture notices in a Vec
703 /// let notices = Arc::new(Mutex::new(Vec::new()));
704 /// let notices_clone = notices.clone();
705 /// client.set_notice_receiver(Some(Box::new(move |notice| {
706 /// notices_clone.lock().unwrap().push(notice);
707 /// })));
708 /// # }
709 /// ```
710 pub fn set_notice_receiver(&mut self, receiver: Option<NoticeReceiver>) {
711 self.notice_receiver = receiver.map(Arc::new);
712 }
713
714 /// Processes any notices in a list of messages, calling the notice receiver.
715 ///
716 /// This is called internally after receiving messages from the server.
717 pub(crate) fn process_notices(&self, messages: &[Message]) {
718 for msg in messages {
719 if let Message::NoticeResponse(body) = msg {
720 let notice = Notice::from_response_body(body);
721
722 if let Some(ref receiver) = self.notice_receiver {
723 receiver(notice);
724 } else {
725 // Default behavior: log at warn level
726 warn!(
727 target: "hyperdb_api",
728 severity = notice.severity().unwrap_or("NOTICE"),
729 code = notice.code().unwrap_or(""),
730 message = %notice.message(),
731 "server-notice"
732 );
733 }
734 }
735 }
736 }
737
738 /// Acquires a lock on the connection.
739 fn lock_connection(&self) -> Result<MutexGuard<'_, RawConnection<SyncStream>>> {
740 self.connection
741 .lock()
742 .map_err(|_| Error::connection("connection mutex poisoned"))
743 }
744
745 /// Executes a simple query and returns the rows.
746 ///
747 /// # Errors
748 ///
749 /// - Returns [`Error`] (connection) if the connection mutex is
750 /// poisoned.
751 /// - Returns [`Error`] (server) for any SQL error the server reports
752 /// (syntax error, constraint violation, type mismatch).
753 /// - Returns [`Error`] (I/O) on wire-protocol I/O failure.
754 /// - Propagates any [`Error`] from row construction (invalid row
755 /// description or data row bytes).
756 pub fn query(&self, query: &str) -> Result<Vec<Row>> {
757 let mut conn = self.lock_connection()?;
758 let messages = conn.simple_query(query)?;
759 drop(conn); // Release lock before processing notices
760
761 self.process_notices(&messages);
762
763 let mut rows = Vec::new();
764 let mut columns = None;
765
766 for msg in messages {
767 match msg {
768 crate::protocol::message::Message::RowDescription(desc) => {
769 // Extract column info including format from protocol
770 let mut cols = Vec::new();
771 for f in desc.fields().filter_map(|r| {
772 r.map_err(|e| trace!(target: "hyperdb_api_core::client", error = %e, "dropped error parsing row description field")).ok()
773 }) {
774 cols.push(super::statement::Column::new(
775 f.name().to_string(),
776 f.type_oid(),
777 f.type_modifier(),
778 super::statement::ColumnFormat::from_code(f.format()),
779 ));
780 }
781 columns = Some(Arc::new(cols));
782 }
783 crate::protocol::message::Message::DataRow(data) => {
784 if let Some(ref cols) = columns {
785 rows.push(Row::new(Arc::clone(cols), data)?);
786 }
787 }
788 _ => {}
789 }
790 }
791
792 Ok(rows)
793 }
794
795 /// Executes a query using `HyperBinary` format for maximum performance.
796 ///
797 /// Returns `StreamRow`s which compute offsets on-demand without pre-allocation,
798 /// making them faster for large result sets where each row is processed once.
799 /// Uses the extended query protocol with `HyperBinary` format (format code 2)
800 /// for direct binary access without text parsing overhead.
801 ///
802 /// # Errors
803 ///
804 /// Same as [`Self::query`]: connection-mutex poisoning, SQL errors
805 /// from the server, and wire-protocol I/O failures all surface as
806 /// [`Error`].
807 pub fn query_fast(&self, query: &str) -> Result<Vec<StreamRow>> {
808 let mut conn = self.lock_connection()?;
809 let messages = conn.query_binary(query)?;
810 drop(conn);
811
812 self.process_notices(&messages);
813
814 let mut rows = Vec::new();
815 for msg in messages {
816 if let crate::protocol::message::Message::DataRow(data) = msg {
817 rows.push(StreamRow::new(data));
818 }
819 }
820 Ok(rows)
821 }
822
823 /// Executes a query with streaming results for minimum memory usage.
824 ///
825 /// Combines `HyperBinary` format with incremental row fetching.
826 ///
827 /// # Errors
828 ///
829 /// - Returns [`Error`] (connection) if the connection mutex is
830 /// poisoned.
831 /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
832 /// `Parse`/`Bind`/`Execute` sequence for the streaming query
833 /// fails on the server or on the wire.
834 pub fn query_streaming<'a>(
835 &'a self,
836 query: &str,
837 chunk_size: usize,
838 ) -> Result<QueryStream<'a>> {
839 let mut conn = self.lock_connection()?;
840 conn.start_query_binary(query)?;
841 Ok(QueryStream {
842 conn: Some(conn),
843 // The owning client is the canceller: if the stream is dropped
844 // before being fully drained, its `Drop` impl will call
845 // `self.cancel()` (PG wire `CancelRequest` on a fresh
846 // connection) to stop the server from streaming the rest.
847 canceller: self,
848 finished: false,
849 chunk_size: chunk_size.max(1),
850 schema: None,
851 schema_read: false,
852 })
853 }
854
855 /// Executes a SQL command that doesn't return rows (e.g., INSERT, UPDATE).
856 ///
857 /// # Errors
858 ///
859 /// Same error modes as [`Self::query`] — connection-mutex poisoning,
860 /// server-side SQL errors, and wire-protocol I/O failures all
861 /// surface as [`Error`].
862 pub fn exec(&self, query: &str) -> Result<u64> {
863 let mut conn = self.lock_connection()?;
864 let messages = conn.simple_query(query)?;
865 drop(conn); // Release lock before processing notices
866
867 self.process_notices(&messages);
868
869 let mut affected = 0u64;
870 for msg in messages {
871 if let crate::protocol::message::Message::CommandComplete(body) = msg {
872 if let Ok(tag) = body.tag() {
873 // Parse affected row count from tag like "INSERT 0 1"
874 if let Some(count) = parse_affected_rows(tag) {
875 affected = count;
876 }
877 }
878 }
879 }
880
881 Ok(affected)
882 }
883
884 /// Executes a batch of statements separated by semicolons.
885 ///
886 /// # Errors
887 ///
888 /// Same error modes as [`Self::query`] — connection-mutex poisoning,
889 /// server-side SQL errors, and wire-protocol I/O failures.
890 pub fn batch_execute(&self, query: &str) -> Result<()> {
891 let mut conn = self.lock_connection()?;
892 let messages = conn.simple_query(query)?;
893 drop(conn); // Release lock before processing notices
894
895 self.process_notices(&messages);
896 Ok(())
897 }
898
899 /// Prepares a statement for execution with the \[`params!`\] macro.
900 ///
901 /// Returns an [`prepare::OwnedPreparedStatement`] that automatically closes when dropped.
902 ///
903 /// # Example
904 ///
905 /// ```no_run
906 /// # use hyperdb_api_core::{params, client::{Client, Config}};
907 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
908 /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1")?;
909 /// let rows = client.execute(&stmt, params![42_i32])?;
910 /// # Ok(())
911 /// # }
912 /// ```
913 ///
914 /// # Errors
915 ///
916 /// Propagates any [`Error`] from [`prepare::prepare_owned`] —
917 /// connection-mutex poisoning, server-side `Parse` failures (SQL
918 /// syntax, type resolution), and wire-protocol I/O failures.
919 pub fn prepare(&self, query: &str) -> Result<prepare::OwnedPreparedStatement> {
920 prepare::prepare_owned(&self.connection, query, &[])
921 }
922
923 /// Prepares a statement with explicit parameter types.
924 ///
925 /// # Errors
926 ///
927 /// Same failure modes as [`Self::prepare`].
928 pub fn prepare_typed(
929 &self,
930 query: &str,
931 param_types: &[Oid],
932 ) -> Result<prepare::OwnedPreparedStatement> {
933 prepare::prepare_owned(&self.connection, query, param_types)
934 }
935
936 /// Executes a prepared statement with parameters.
937 ///
938 /// Use the \[`params!`\] macro for ergonomic parameter encoding:
939 ///
940 /// ```no_run
941 /// # use hyperdb_api_core::{params, client::{Client, Config}};
942 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
943 /// let stmt = client.prepare("SELECT * FROM users WHERE id = $1 AND name = $2")?;
944 /// let rows = client.execute(&stmt, params![42_i32, "Alice"])?;
945 /// # Ok(())
946 /// # }
947 /// ```
948 ///
949 /// # Errors
950 ///
951 /// Propagates any [`Error`] from [`prepare::execute_prepared`] —
952 /// parameter-count or type mismatch, server-side SQL errors, and
953 /// wire-protocol I/O failures.
954 pub fn execute<P: AsRef<[Option<Vec<u8>>]>>(
955 &self,
956 statement: &prepare::OwnedPreparedStatement,
957 params: P,
958 ) -> Result<Vec<Row>> {
959 let params_ref: Vec<Option<&[u8]>> = params
960 .as_ref()
961 .iter()
962 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
963 .collect();
964 prepare::execute_prepared(&self.connection, statement.statement(), ¶ms_ref)
965 }
966
967 /// Executes a prepared statement that doesn't return rows.
968 ///
969 /// # Errors
970 ///
971 /// Same failure modes as [`Self::execute`].
972 pub fn execute_no_result<P: AsRef<[Option<Vec<u8>>]>>(
973 &self,
974 statement: &prepare::OwnedPreparedStatement,
975 params: P,
976 ) -> Result<u64> {
977 let params_ref: Vec<Option<&[u8]>> = params
978 .as_ref()
979 .iter()
980 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
981 .collect();
982 prepare::execute_prepared_no_result(&self.connection, statement.statement(), ¶ms_ref)
983 }
984
985 /// Executes a prepared statement with streaming results.
986 ///
987 /// Returns a [`PreparedQueryStream`](super::prepared_stream::PreparedQueryStream)
988 /// that yields rows in chunks, keeping memory bounded regardless of
989 /// result size. This is the prepared-statement analog of
990 /// [`query_streaming`](Self::query_streaming).
991 ///
992 /// The connection mutex is held for the duration of iteration;
993 /// dropping the stream before completion issues a best-effort cancel.
994 ///
995 /// # Errors
996 ///
997 /// - Returns [`Error`] (connection) if the connection mutex is
998 /// poisoned.
999 /// - Returns [`Error`] (server) or [`Error`] (I/O) if the initial
1000 /// `Bind`/`Execute` sequence for the prepared statement fails on
1001 /// the server or on the wire.
1002 pub fn execute_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
1003 &'a self,
1004 statement: &prepare::OwnedPreparedStatement,
1005 params: P,
1006 chunk_size: usize,
1007 ) -> Result<super::prepared_stream::PreparedQueryStream<'a>> {
1008 let params_ref: Vec<Option<&[u8]>> = params
1009 .as_ref()
1010 .iter()
1011 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
1012 .collect();
1013
1014 let mut conn = self.lock_connection()?;
1015 conn.start_execute_prepared(statement.name(), ¶ms_ref, statement.columns().len())?;
1016
1017 let columns = std::sync::Arc::new(statement.columns().to_vec());
1018 Ok(super::prepared_stream::PreparedQueryStream::new(
1019 conn, self, chunk_size, columns,
1020 ))
1021 }
1022
/// Closes the connection.
///
/// Consumes the client, acquires the connection lock, and calls
/// `terminate` on the underlying connection.
///
/// # Errors
///
/// - Returns [`Error`] (connection) if the connection mutex is
///   poisoned.
/// - Returns [`Error`] (I/O) if writing the `Terminate` message or
///   flushing the socket fails.
pub fn close(self) -> Result<()> {
    // Keep the guard in a named local: it borrows from `self`, and a local
    // is dropped before the by-value `self` parameter.
    let mut conn = self.lock_connection()?;
    conn.terminate()
}
1035
/// Starts a COPY IN operation for bulk data insertion.
///
/// Returns a `CopyInWriter` that can be used to send data in `HyperBinary` format.
/// This is a shorthand for [`Self::copy_in_with_format`] with the format
/// string `"HYPERBINARY"`.
///
/// # Example
///
/// ```no_run
/// # use hyperdb_api_core::client::Client;
/// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
/// # let binary_data = &[];
/// let mut writer = client.copy_in("\"my_table\"", &["col1", "col2"])?;
/// writer.send(binary_data)?;
/// let rows = writer.finish()?;
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// Delegates to [`Self::copy_in_with_format`]; see that method
/// for the concrete failure modes.
pub fn copy_in(&self, table_name: &str, columns: &[&str]) -> Result<CopyInWriter<'_>> {
    self.copy_in_with_format(table_name, columns, "HYPERBINARY")
}
1060
1061 /// Starts a COPY IN operation with a specified data format.
1062 ///
1063 /// Returns a `CopyInWriter` that can be used to send data in the specified format.
1064 ///
1065 /// # Arguments
1066 ///
1067 /// * `table_name` - The target table name (should be properly quoted if needed)
1068 /// * `columns` - Column names to insert into
1069 /// * `format` - The data format string: "HYPERBINARY" or "ARROWSTREAM"
1070 ///
1071 /// # Example
1072 ///
1073 /// ```no_run
1074 /// # use hyperdb_api_core::client::Client;
1075 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1076 /// # let arrow_ipc_data = &[];
1077 /// // For Arrow IPC stream format
1078 /// let mut writer = client.copy_in_with_format("\"my_table\"", &["col1", "col2"], "ARROWSTREAM")?;
1079 /// writer.send(arrow_ipc_data)?;
1080 /// let rows = writer.finish()?;
1081 /// # Ok(())
1082 /// # }
1083 /// ```
1084 ///
1085 /// # Errors
1086 ///
1087 /// - Returns [`Error`] (connection) if the connection mutex is
1088 /// poisoned.
1089 /// - Returns [`Error`] (server) if the server rejects the generated
1090 /// `COPY ... FROM STDIN` statement (for example, missing table
1091 /// or mismatched columns).
1092 /// - Returns [`Error`] (I/O) on wire-protocol I/O failure while
1093 /// initiating the COPY.
1094 pub fn copy_in_with_format(
1095 &self,
1096 table_name: &str,
1097 columns: &[&str],
1098 format: &str,
1099 ) -> Result<CopyInWriter<'_>> {
1100 let mut conn = self.lock_connection()?;
1101 conn.start_copy_in_with_format(table_name, columns, format)?;
1102 Ok(CopyInWriter { connection: conn })
1103 }
1104
1105 /// Starts a COPY IN operation from a raw SQL query string.
1106 ///
1107 /// The query must be a complete `COPY ... FROM STDIN ...` statement.
1108 /// This is useful for text-format imports (CSV, TSV) where you need
1109 /// full control over the COPY options.
1110 ///
1111 /// # Security
1112 ///
1113 /// The query is validated to start with `COPY` (case-insensitive) as a
1114 /// defense-in-depth measure. Callers are still responsible for proper
1115 /// escaping of table names and other identifiers within the query.
1116 /// Prefer [`copy_in()`](Self::copy_in) or
1117 /// [`copy_in_with_format()`](Self::copy_in_with_format) when possible,
1118 /// as they handle escaping automatically.
1119 ///
1120 /// # Example
1121 ///
1122 /// ```no_run
1123 /// # use hyperdb_api_core::client::Client;
1124 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1125 /// let mut writer = client.copy_in_raw(
1126 /// "COPY \"my_table\" FROM STDIN WITH (FORMAT csv, HEADER true)"
1127 /// )?;
1128 /// writer.send(b"1,Alice\n2,Bob\n")?;
1129 /// let rows = writer.finish()?;
1130 /// # Ok(())
1131 /// # }
1132 /// ```
1133 ///
1134 /// # Errors
1135 ///
1136 /// - Returns [`ErrorKind::Query`] if `query` (trimmed) does not
1137 /// start with `COPY` (defense-in-depth check against non-COPY
1138 /// statements).
1139 /// - Returns [`Error`] (connection) if the connection mutex is
1140 /// poisoned.
1141 /// - Returns [`Error`] (server) or [`Error`] (I/O) if the server rejects
1142 /// the COPY statement or the wire write fails.
1143 pub fn copy_in_raw(&self, query: &str) -> Result<CopyInWriter<'_>> {
1144 // Defense-in-depth: reject queries that don't look like COPY statements
1145 if !query.trim_start().to_ascii_uppercase().starts_with("COPY") {
1146 return Err(Error::new(
1147 ErrorKind::Query,
1148 "copy_in_raw() requires a COPY statement. \
1149 The query must start with 'COPY'.",
1150 ));
1151 }
1152 let mut conn = self.lock_connection()?;
1153 conn.start_copy_in_raw(query)?;
1154 Ok(CopyInWriter { connection: conn })
1155 }
1156
/// Returns true if the connection is alive (no error has occurred).
///
/// This method itself only attempts to acquire the connection lock and
/// reports whether that succeeded; the guard is released immediately.
// NOTE(review): sibling methods document `lock_connection` as failing on
// mutex poisoning, so this presumably detects poisoning rather than a dead
// socket, and may wait while a stream or COPY writer holds the lock —
// confirm against `lock_connection`.
#[must_use]
pub fn is_alive(&self) -> bool {
    self.lock_connection().is_ok()
}
1162
1163 /// Executes a COPY ... TO STDOUT query and returns all output data.
1164 ///
1165 /// This is used for queries like:
1166 /// `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
1167 ///
1168 /// # Arguments
1169 ///
1170 /// * `query` - The COPY TO STDOUT query to execute
1171 ///
1172 /// # Returns
1173 ///
1174 /// The raw bytes from all `CopyData` messages concatenated together.
1175 ///
1176 /// # Example
1177 ///
1178 /// ```no_run
1179 /// # use hyperdb_api_core::client::Client;
1180 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1181 /// let arrow_data = client.copy_out(
1182 /// "COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
1183 /// )?;
1184 /// # Ok(())
1185 /// # }
1186 /// ```
1187 ///
1188 /// # Errors
1189 ///
1190 /// - Returns [`Error`] (connection) if the connection mutex is
1191 /// poisoned.
1192 /// - Returns [`Error`] (server) if the server rejects the `COPY ... TO
1193 /// STDOUT` statement.
1194 /// - Returns [`Error`] (I/O) if the wire read fails while collecting
1195 /// COPY output.
1196 pub fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
1197 let mut conn = self.lock_connection()?;
1198 conn.copy_out(query)
1199 }
1200
1201 /// Executes a COPY ... TO STDOUT query and streams output to a writer.
1202 ///
1203 /// Unlike [`copy_out`](Self::copy_out) which collects all data into memory,
1204 /// this method streams each `CopyData` chunk directly to the provided writer,
1205 /// keeping memory usage constant regardless of result size.
1206 ///
1207 /// Returns the total number of bytes written.
1208 ///
1209 /// # Example
1210 ///
1211 /// ```no_run
1212 /// # use hyperdb_api_core::client::Client;
1213 /// # fn example(client: &Client) -> hyperdb_api_core::client::Result<()> {
1214 /// let mut file = std::fs::File::create("output.csv")?;
1215 /// let bytes_written = client.copy_out_to_writer(
1216 /// "COPY (SELECT * FROM my_table) TO STDOUT WITH (FORMAT csv, HEADER true)",
1217 /// &mut file,
1218 /// )?;
1219 /// println!("Wrote {} bytes", bytes_written);
1220 /// # Ok(())
1221 /// # }
1222 /// ```
1223 ///
1224 /// # Errors
1225 ///
1226 /// Same failure modes as [`Self::copy_out`], plus [`Error`] (I/O)
1227 /// when the supplied `writer` returns an error while receiving
1228 /// COPY chunks.
1229 pub fn copy_out_to_writer(&self, query: &str, writer: &mut dyn std::io::Write) -> Result<u64> {
1230 let mut conn = self.lock_connection()?;
1231 conn.copy_out_to_writer(query, writer)
1232 }
1233}
1234
1235impl Cancellable for Client {
1236 /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
1237 /// fresh connection. Swallows errors (logged via `tracing::warn!`)
1238 /// because cancellation is a best-effort signal and callers cannot
1239 /// meaningfully recover from a failed cancel.
1240 fn cancel(&self) {
1241 if let Err(e) = Client::cancel(self) {
1242 warn!(
1243 target: "hyperdb_api_core::client",
1244 error = %e,
1245 process_id = self.process_id,
1246 "cancel request failed (best-effort, swallowed)",
1247 );
1248 }
1249 }
1250}
1251
/// A writer for COPY IN operations.
///
/// This struct holds the connection lock while sending data to ensure
/// exclusive access during the COPY operation. It is created by
/// [`Client::copy_in`], [`Client::copy_in_with_format`], and
/// [`Client::copy_in_raw`] once the COPY has been started.
#[derive(Debug)]
pub struct CopyInWriter<'a> {
    /// Guard over the client's connection; the lock is released when the
    /// writer is consumed or dropped.
    connection: MutexGuard<'a, RawConnection<SyncStream>>,
}
1260
impl CopyInWriter<'_> {
    /// Sends a chunk of COPY data.
    ///
    /// The data must be encoded in the format the COPY session was started
    /// with (`HyperBinary` for [`Client::copy_in`]). For best performance,
    /// batch multiple rows into larger chunks (e.g., 1-16 MB).
    ///
    /// # Errors
    ///
    /// Returns [`Error`] (I/O) if the wire write of the `CopyData` frame
    /// fails.
    pub fn send(&mut self, data: &[u8]) -> Result<()> {
        self.connection.send_copy_data(data)
    }

    /// Flushes any buffered data to the server.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] (I/O) if flushing the internal write buffer to
    /// the transport fails.
    pub fn flush(&mut self) -> Result<()> {
        self.connection.flush()
    }

    /// Sends COPY data directly to the stream without internal buffering.
    ///
    /// This writes data directly to the TCP stream, letting the kernel handle
    /// buffering. More efficient for streaming large amounts of data.
    /// Call [`flush_stream()`](Self::flush_stream) periodically to ensure
    /// data is sent.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] (I/O) if writing to the underlying stream fails.
    pub fn send_direct(&mut self, data: &[u8]) -> Result<()> {
        self.connection.send_copy_data_direct(data)
    }

    /// Flushes the TCP stream.
    ///
    /// Use with [`send_direct()`](Self::send_direct) to periodically ensure
    /// data reaches the server.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] (I/O) if flushing the underlying transport
    /// fails.
    pub fn flush_stream(&mut self) -> Result<()> {
        self.connection.flush_stream()
    }

    /// Reserves capacity in the write buffer to avoid reallocations.
    ///
    /// Call this before bulk operations to pre-allocate buffer space.
    /// `capacity` is forwarded unchanged to the connection's
    /// `reserve_write_buffer`.
    pub fn reserve_buffer(&mut self, capacity: usize) {
        self.connection.reserve_write_buffer(capacity);
    }

    /// Finishes the COPY operation successfully.
    ///
    /// Consumes the writer, so the connection lock is released when this
    /// call returns. Returns the number of rows inserted.
    ///
    /// # Errors
    ///
    /// - Returns [`Error`] (I/O) if writing `CopyDone` or flushing the
    ///   transport fails.
    /// - Returns [`Error`] (server) if the server reports a COPY-side
    ///   failure (constraint violation, type coercion error, etc.)
    ///   via an `ErrorResponse` after `CopyDone`.
    pub fn finish(mut self) -> Result<u64> {
        self.connection.finish_copy()
    }

    /// Cancels the COPY operation.
    ///
    /// Consumes the writer, so the connection lock is released when this
    /// call returns. All data sent so far will be discarded.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] (I/O) if writing the `CopyFail` frame or
    /// flushing the transport fails, or [`Error`] (server) if the server
    /// reports an unexpected status after the cancel.
    pub fn cancel(mut self, reason: &str) -> Result<()> {
        self.connection.cancel_copy(reason)
    }
}
1345
/// Extracts the affected-row count from a command tag.
///
/// The tag is split on whitespace. For `INSERT` the count is the third
/// token (`INSERT oid count`); for `UPDATE`, `DELETE`, `SELECT` and `COPY`
/// it is the second token. Returns `None` for any other leading word, for
/// an empty tag, when the count token is missing, or when it does not parse
/// as a `u64`.
fn parse_affected_rows(tag: &str) -> Option<u64> {
    let mut tokens = tag.split_whitespace();

    let count_token = match tokens.next()? {
        // INSERT oid count: skip the oid token.
        "INSERT" => tokens.nth(1)?,
        // UPDATE/DELETE/SELECT/COPY count
        "UPDATE" | "DELETE" | "SELECT" | "COPY" => tokens.next()?,
        _ => return None,
    };

    count_token.parse().ok()
}
1362
/// Streaming iterator for query results without materializing all rows.
///
/// Holding a `QueryStream` keeps the underlying [`RawConnection`] locked
/// via a `MutexGuard`. Dropping the stream before fully iterating triggers
/// a server-side cancel (see [`Drop`] below) so the connection is returned
/// to the pool cleanly.
pub struct QueryStream<'a> {
    /// Guard over the locked connection. `next_chunk` sets this to `None`
    /// (releasing the lock) once `ReadyForQuery` or an `ErrorResponse` has
    /// been handled.
    conn: Option<MutexGuard<'a, RawConnection<SyncStream>>>,
    /// Best-effort cancel handle, used in [`Drop`] when the stream is
    /// abandoned before completion. For the current sync client this is
    /// the owning [`Client`] itself (which implements [`Cancellable`] via
    /// a PG wire `CancelRequest` on a fresh connection). When a gRPC
    /// equivalent lands it will plug in the same trait with a
    /// `cancel_query(query_id)` RPC implementation.
    canceller: &'a dyn Cancellable,
    /// Set once the stream has reached a terminal state (`ReadyForQuery`
    /// or `ErrorResponse`); `Drop` skips its cancel path when this is true.
    finished: bool,
    /// Maximum number of rows `next_chunk` returns per call.
    chunk_size: usize,
    /// Column metadata; `next_chunk` fills this in from a `RowDescription`
    /// message while `schema_read` is still false.
    schema: Option<Vec<super::statement::Column>>,
    /// Whether a `RowDescription` has already been captured into `schema`;
    /// later `RowDescription` messages are ignored.
    schema_read: bool,
}
1383
1384impl std::fmt::Debug for QueryStream<'_> {
1385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1386 f.debug_struct("QueryStream")
1387 .field("finished", &self.finished)
1388 .field("chunk_size", &self.chunk_size)
1389 .field("schema_read", &self.schema_read)
1390 .finish_non_exhaustive()
1391 }
1392}
1393
1394impl Drop for QueryStream<'_> {
1395 fn drop(&mut self) {
1396 // If the caller exhausted the stream, the connection is already at
1397 // `ReadyForQuery` — nothing to do.
1398 if self.finished {
1399 return;
1400 }
1401
1402 // Otherwise: the server is still happily streaming rows (potentially
1403 // millions of them) for a query we no longer care about. Passively
1404 // draining would waste bandwidth and could block the destructor for
1405 // a very long time. Instead we send a transport-appropriate cancel
1406 // signal — on PG wire this is a `CancelRequest` packet on a fresh
1407 // connection; on gRPC (future) it will be a `cancel_query` RPC on
1408 // the shared channel. `Cancellable::cancel` is fire-and-forget and
1409 // cannot fail.
1410 self.canceller.cancel();
1411
1412 // After cancel, the server stops producing new rows and emits
1413 // `ErrorResponse(QueryCanceled) + ReadyForQuery` promptly. We still
1414 // need to drain those trailing messages off the wire so the
1415 // connection returns to the pool cleanly. The budget here is small
1416 // because we only expect: (a) whatever rows the server had already
1417 // flushed before seeing the cancel, (b) `ErrorResponse`,
1418 // (c) `ReadyForQuery`. A well-behaved server reaches `ReadyForQuery`
1419 // within a handful of messages. If that budget is somehow exceeded
1420 // the bounded drain logs a warning and marks the connection
1421 // desynchronized — downstream users will surface the desync as a
1422 // transport-level failure and reconnect.
1423 const POST_CANCEL_DRAIN_CAP: usize = 1024;
1424 if let Some(ref mut conn) = self.conn {
1425 let _ok = conn.drain_until_ready_bounded(POST_CANCEL_DRAIN_CAP);
1426 }
1427 }
1428}
1429
1430impl QueryStream<'_> {
1431 /// Returns the schema (column metadata) for the result set.
1432 #[must_use]
1433 pub fn schema(&self) -> Option<&[super::statement::Column]> {
1434 self.schema.as_deref()
1435 }
1436
1437 /// Retrieves the next chunk of rows (up to `chunk_size`).
1438 ///
1439 /// # Errors
1440 ///
1441 /// - Returns [`Error`] (I/O) if reading from the underlying transport
1442 /// fails while awaiting the next protocol message.
1443 /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
1444 /// during streaming (for example, a server-side execution failure
1445 /// encountered partway through the result set).
1446 pub fn next_chunk(&mut self) -> Result<Option<Vec<StreamRow>>> {
1447 if self.finished {
1448 return Ok(None);
1449 }
1450
1451 let Some(conn) = self.conn.as_mut() else {
1452 return Ok(None);
1453 };
1454
1455 let mut rows = Vec::with_capacity(self.chunk_size);
1456 while rows.len() < self.chunk_size {
1457 let msg = conn.read_message()?;
1458 match msg {
1459 Message::RowDescription(desc) if !self.schema_read => {
1460 let mut cols = Vec::new();
1461 for f in desc.fields().filter_map(std::result::Result::ok) {
1462 cols.push(super::statement::Column::new(
1463 f.name().to_string(),
1464 f.type_oid(),
1465 f.type_modifier(),
1466 super::statement::ColumnFormat::from_code(f.format()),
1467 ));
1468 }
1469 self.schema = Some(cols);
1470 self.schema_read = true;
1471 }
1472 Message::DataRow(data) => {
1473 rows.push(StreamRow::new(data));
1474 if rows.len() >= self.chunk_size {
1475 return Ok(Some(rows));
1476 }
1477 }
1478 Message::ReadyForQuery(_) => {
1479 self.finished = true;
1480 self.conn = None;
1481 return if rows.is_empty() {
1482 Ok(None)
1483 } else {
1484 Ok(Some(rows))
1485 };
1486 }
1487 Message::ErrorResponse(body) => {
1488 // Mark the stream finished *before* touching the
1489 // connection so the `Drop` impl's Cancellable-based
1490 // cleanup path is trivially a no-op regardless of what
1491 // happens next (normal return, `?`, panic in drain,
1492 // future refactors that insert early returns, etc).
1493 //
1494 // `&mut self` exclusivity means `Drop` can't fire
1495 // concurrently with this method, so this is purely a
1496 // defensive-ordering improvement — but it also matches
1497 // the `ReadyForQuery` arm above, which sets
1498 // `finished = true` first for the same reason. Keeping
1499 // both terminal arms in the same order makes the
1500 // "terminal state is committed before cleanup" rule
1501 // visible at a glance.
1502 self.finished = true;
1503 // Drain through `ReadyForQuery` before releasing the
1504 // pooled connection so the next user sees a clean wire
1505 // state. `consume_error` swallows any drain I/O errors
1506 // via tracing::warn — the caller's original error is
1507 // more informative than a transport hiccup during
1508 // cleanup.
1509 let err = match self.conn {
1510 Some(ref mut c) => c.consume_error(&body),
1511 None => parse_error_response(&body),
1512 };
1513 self.conn = None;
1514 return Err(err);
1515 }
1516 _ => {}
1517 }
1518 }
1519 Ok(Some(rows))
1520 }
1521}
1522
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers every recognised command tag plus malformed and empty tags.
    #[test]
    fn test_parse_affected_rows() {
        assert_eq!(parse_affected_rows("INSERT 0 5"), Some(5));
        assert_eq!(parse_affected_rows("UPDATE 10"), Some(10));
        assert_eq!(parse_affected_rows("DELETE 3"), Some(3));
        assert_eq!(parse_affected_rows("SELECT 100"), Some(100));
        assert_eq!(parse_affected_rows("COPY 7"), Some(7));
        assert_eq!(parse_affected_rows("CREATE TABLE"), None);

        // Missing or non-numeric counts and empty tags yield `None`.
        assert_eq!(parse_affected_rows(""), None);
        assert_eq!(parse_affected_rows("INSERT 0"), None);
        assert_eq!(parse_affected_rows("UPDATE"), None);
        assert_eq!(parse_affected_rows("DELETE many"), None);
    }

    /// Exercises the prefix rule that `copy_in_raw` applies before it
    /// touches the connection.
    #[test]
    fn test_copy_in_raw_rejects_non_copy_query() {
        // We can't test with a real connection, and `Client::connect` fails
        // without a server, so this helper mirrors the validation rule
        // (leading whitespace ignored, ASCII case-insensitive `COPY` prefix).
        fn passes_copy_guard(query: &str) -> bool {
            query.trim_start().to_ascii_uppercase().starts_with("COPY")
        }

        assert!(
            !passes_copy_guard("SELECT * FROM users"),
            "Non-COPY query should not pass the COPY prefix check"
        );

        assert!(
            passes_copy_guard("COPY \"users\" FROM STDIN WITH (FORMAT csv)"),
            "COPY query should pass the prefix check"
        );

        // Leading whitespace should be accepted
        assert!(passes_copy_guard(" COPY \"users\" FROM STDIN"));

        // Case-insensitive
        assert!(passes_copy_guard("copy \"users\" FROM STDIN"));

        // Empty and too-short inputs are rejected
        assert!(!passes_copy_guard(""));
        assert!(!passes_copy_guard("COP"));
    }
}