hyperdb_api_core/client/connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Low-level synchronous connection handling over the `PostgreSQL` wire protocol.
5//!
6//! [`RawConnection`] provides the message-level interface to a Hyper server:
7//! startup/authentication handshake, simple and extended query execution, and
8//! the COPY data path. It is generic over the transport stream `S` (TCP,
9//! Unix domain socket, named pipe, or TLS-wrapped variants).
10//!
11//! # Wire Protocol Overview
12//!
13//! Communication follows the `PostgreSQL` v3 message protocol. Each message has a
14//! 1-byte type tag, a 4-byte length (including itself), and a variable-length
15//! body. The connection maintains separate read and write `BytesMut` buffers to
16//! amortize syscall overhead.
17//!
18//! ## Query Protocols
19//!
20//! - **Simple Query**: A single `Query('Q')` message; the server responds with
21//! `RowDescription`, zero or more `DataRow`, `CommandComplete`, and
22//! `ReadyForQuery`. Results are in text format.
23//!
24//! - **Extended Query (`HyperBinary`)**: `Parse` / `Bind` / `Describe` /
25//! `Execute` / `Sync` sequence requesting format code 2 (`HyperBinary`,
26//! little-endian binary). Results are zero-copy-friendly `DataRow` messages.
27//! Used by [`Client::query_fast`](crate::client::Client::query_fast) and
28//! [`Client::query_streaming`](crate::client::Client::query_streaming).
29//!
30//! ## Connection Health
31//!
//! A `desynchronized` flag records that the wire protocol has fallen out of
//! sync with the server (e.g., a bounded drain exhausted its budget or hit an
//! I/O error mid-drain). Once set it is never cleared, and every method that
//! initiates a new server request fast-fails. The only recovery is to drop
//! the connection and open a new one. Pool layers should check
//! [`RawConnection::is_healthy`] during recycle.
37
38use std::io::{Read, Write};
39
40use bytes::BytesMut;
41use tracing::{debug, info, trace, warn};
42
43use crate::protocol::message::{backend::Message, frontend};
44
45use super::auth::{self, AuthState};
46use super::error::{Error, Result};
47
48/// Maximum number of messages [`RawConnection::consume_error`] (and its
49/// async sibling) will read while draining the tail of a failed request.
50///
51/// A well-behaved server emits only a handful of messages after
52/// `ErrorResponse` before `ReadyForQuery` (typically just the error +
53/// `Z`, with the occasional `NoticeResponse` interleaved). This cap is
54/// therefore orders of magnitude above anything a legitimate error path
55/// produces; it exists purely as a defensive safety valve against broken
56/// server implementations and stalled network paths. When exceeded, the
57/// drain logs a `tracing::warn!` and returns — the next operation on the
58/// connection will see the resulting desync and trigger a reconnect.
59///
60/// See [`RawConnection::consume_error`] for the full rationale.
61pub const POST_ERROR_DRAIN_CAP: usize = 1024;
62
63/// A raw connection to a Hyper server.
64///
65/// This handles the low-level protocol communication, including:
66/// - Message framing (reading/writing `PostgreSQL` wire protocol messages)
67/// - Authentication handshake
68/// - Query execution (simple and extended query protocols)
69/// - COPY protocol support
70///
71/// The connection is generic over the stream type `S`, allowing it to work
72/// with different transport mechanisms (TCP, TLS, etc.) as long as they
73/// implement `Read + Write`.
74///
75/// # Buffering
76///
77/// The connection maintains separate read and write buffers for efficient
78/// I/O. Messages are buffered before being sent, and incoming data is
79/// buffered until complete messages can be parsed.
80#[derive(Debug)]
81pub struct RawConnection<S> {
82 /// The underlying I/O stream.
83 stream: S,
84 /// Buffer for reading incoming messages from the server.
85 read_buf: BytesMut,
86 /// Buffer for writing outgoing messages to the server.
87 write_buf: BytesMut,
88 /// Backend process ID (for cancel requests).
89 process_id: i32,
90 /// Secret key for authenticating cancel requests.
91 secret_key: i32,
92 /// Server parameters received during startup (e.g., `server_version`, `session_identifier`).
93 server_params: std::collections::HashMap<String, String>,
94 /// Sticky flag set when the wire protocol has fallen out of sync with
95 /// the server (e.g. a bounded drain exhausted its budget before seeing
96 /// `ReadyForQuery`). Once set it is never cleared — the only valid
97 /// recovery is to discard this connection and open a new one.
98 ///
99 /// Every public method that initiates a new server request checks
100 /// this flag via [`Self::ensure_healthy`] and fast-fails with a
101 /// clear error instead of sending bytes into a known-poisoned
102 /// channel. Pool layers should call [`Self::is_healthy`] during
103 /// recycle to skip the health-probe roundtrip for connections that
104 /// are already known-bad.
105 desynchronized: bool,
106}
107
108impl<S> RawConnection<S>
109where
110 S: Read + Write,
111{
112 /// Creates a new raw connection from a stream.
113 ///
114 /// Initializes read and write buffers with default capacity (64 KB each).
115 /// The connection is not yet authenticated - call `startup()` to begin
116 /// the connection handshake.
117 ///
118 /// # Arguments
119 ///
120 /// * `stream` - The I/O stream (must implement `Read + Write`)
121 pub fn new(stream: S) -> Self {
122 RawConnection {
123 stream,
124 read_buf: BytesMut::with_capacity(64 * 1024),
125 write_buf: BytesMut::with_capacity(64 * 1024),
126 process_id: 0,
127 secret_key: 0,
128 server_params: std::collections::HashMap::new(),
129 desynchronized: false,
130 }
131 }
132
133 /// Returns `true` if this connection is still in a known-good state
134 /// and safe to use for new requests.
135 ///
136 /// Once the wire protocol falls out of sync with the server (see the
137 /// `desynchronized` field on [`RawConnection`]), this returns `false`
138 /// permanently — the only recovery is to drop this connection and
139 /// open a new one. Pool implementations should consult this before
140 /// running a recycle health probe to avoid spending a roundtrip on a
141 /// connection that is already known to be bad.
142 pub fn is_healthy(&self) -> bool {
143 !self.desynchronized
144 }
145
146 /// Fast-fails with an explicit [`ErrorKind::Connection`] error if the
147 /// wire has fallen out of sync with the server, before any bytes are
148 /// written to the stream. Called from the entry point of every public
149 /// method that initiates a new server request — simple queries,
150 /// streaming queries, prepared statement execution, COPY in/out,
151 /// etc. — so that a desynchronized connection produces a clear
152 /// "connection unusable" diagnostic at the API boundary rather than a
153 /// cryptic protocol-parse error deep inside the message loop of the
154 /// *next* unrelated operation.
155 pub(crate) fn ensure_healthy(&self) -> Result<()> {
156 if self.desynchronized {
157 return Err(Error::new(
158 crate::client::error::ErrorKind::Connection,
159 "connection is desynchronized from the server and cannot be reused; \
160 discard it and open a new one",
161 ));
162 }
163 Ok(())
164 }
165
166 /// Reserves capacity in the write buffer to avoid reallocations.
167 ///
168 /// Call this before bulk operations to pre-allocate buffer space.
169 /// This is useful for high-throughput scenarios where buffer growth
170 /// would cause performance overhead.
171 pub fn reserve_write_buffer(&mut self, additional: usize) {
172 self.write_buf.reserve(additional);
173 }
174
175 /// Returns the process ID assigned by the server.
176 pub fn process_id(&self) -> i32 {
177 self.process_id
178 }
179
180 /// Returns the secret key for cancel requests.
181 pub fn secret_key(&self) -> i32 {
182 self.secret_key
183 }
184
185 /// Returns a reference to the underlying stream.
186 pub fn stream(&self) -> &S {
187 &self.stream
188 }
189
190 /// Returns a mutable reference to the underlying stream.
191 pub fn stream_mut(&mut self) -> &mut S {
192 &mut self.stream
193 }
194
195 /// Returns a server parameter value by name.
196 ///
197 /// Server parameters are sent by the server during connection startup.
198 /// Common parameters include:
199 /// - `server_version` - The server version string
200 /// - `server_encoding` - The server's character encoding
201 /// - `client_encoding` - The client's character encoding
202 pub fn parameter_status(&self, name: &str) -> Option<&str> {
203 self.server_params
204 .get(name)
205 .map(std::string::String::as_str)
206 }
207
208 /// Sends a startup message and performs initial handshake.
209 ///
210 /// # Errors
211 ///
212 /// - Returns [`Error`] (auth) when the server requests an
213 /// auth method (cleartext, MD5, SASL) and no password is supplied,
214 /// when the offered SASL mechanisms exclude SCRAM-SHA-256, or when
215 /// SCRAM state is missing at the SASL-continue / SASL-final step.
216 /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
217 /// during startup (for example, unknown user or database).
218 /// - Returns [`Error`] (protocol) if a message arrives out of the
219 /// expected startup sequence.
220 /// - Returns [`Error`] (I/O) on wire-protocol read/write failure.
221 pub fn startup(&mut self, params: &[(&str, &str)], password: Option<&str>) -> Result<()> {
222 // Send startup message
223 frontend::startup_message(params, &mut self.write_buf)?;
224 self.flush()?;
225
226 // Handle authentication
227 let mut auth_state: Option<AuthState> = None;
228
229 loop {
230 let msg = self.read_message()?;
231 match msg {
232 Message::AuthenticationOk => {
233 info!(target: "hyperdb_api", "connection-auth-success");
234 }
235 Message::AuthenticationCleartextPassword => {
236 debug!(target: "hyperdb_api", method = "cleartext", "connection-auth-method");
237 let password = password.ok_or_else(|| {
238 Error::authentication(
239 "server requested cleartext password but none provided",
240 )
241 })?;
242 frontend::password_message(password, &mut self.write_buf)?;
243 self.flush()?;
244 }
245 Message::AuthenticationMd5Password(body) => {
246 debug!(target: "hyperdb_api", method = "MD5", "connection-auth-method");
247 let password = password.ok_or_else(|| {
248 Error::authentication("server requested MD5 password but none provided")
249 })?;
250 let user = params
251 .iter()
252 .find(|(k, _)| *k == "user")
253 .map_or("", |(_, v)| *v);
254
255 let md5_response = auth::compute_md5_password(user, password, &body.salt());
256 frontend::password_message(&md5_response, &mut self.write_buf)?;
257 self.flush()?;
258 }
259 Message::AuthenticationSasl(body) => {
260 debug!(target: "hyperdb_api", method = "SCRAM-SHA-256", "connection-auth-method");
261 let password = password.ok_or_else(|| {
262 Error::authentication(
263 "server requested SASL authentication but no password provided",
264 )
265 })?;
266
267 // Check for SCRAM-SHA-256
268 let mechanisms: Vec<&str> = body.mechanisms().collect();
269 if !mechanisms.contains(&"SCRAM-SHA-256") {
270 return Err(Error::authentication(format!(
271 "server offered unsupported SASL mechanisms: {mechanisms:?}"
272 )));
273 }
274
275 // Start SCRAM-SHA-256 exchange
276 let (state, client_first) = auth::scram_client_first(password)?;
277 auth_state = Some(state);
278
279 frontend::sasl_initial_response(
280 "SCRAM-SHA-256",
281 &client_first,
282 &mut self.write_buf,
283 )?;
284 self.flush()?;
285 }
286 Message::AuthenticationSaslContinue(body) => {
287 let state = auth_state.take().ok_or_else(|| {
288 Error::authentication("received SASL continue without initial state")
289 })?;
290
291 let server_first = body.data();
292 let (new_state, client_final) = auth::scram_client_final(state, server_first)?;
293 auth_state = Some(new_state);
294
295 frontend::sasl_response(&client_final, &mut self.write_buf)?;
296 self.flush()?;
297 }
298 Message::AuthenticationSaslFinal(body) => {
299 let state = auth_state.take().ok_or_else(|| {
300 Error::authentication("received SASL final without state")
301 })?;
302
303 // Verify server signature
304 auth::scram_verify_server(state, body.data())?;
305 }
306 Message::BackendKeyData(data) => {
307 self.process_id = data.process_id();
308 self.secret_key = data.secret_key();
309 }
310 Message::ParameterStatus(body) => {
311 // Store server parameters
312 if let (Ok(name), Ok(value)) = (body.name(), body.value()) {
313 self.server_params
314 .insert(name.to_string(), value.to_string());
315 }
316 }
317 Message::ReadyForQuery(_) => {
318 // Connection is ready
319 return Ok(());
320 }
321 Message::ErrorResponse(body) => {
322 // Startup typically fails the connection outright, but we
323 // still drain in case the server sent any trailing
324 // messages so parity with the async startup path is
325 // preserved. The drain is bounded implicitly: a post-
326 // startup-error server either sends ReadyForQuery or
327 // closes immediately, so drain_until_ready returns fast.
328 return Err(self.consume_error(&body));
329 }
330 _ => {
331 return Err(Error::protocol("unexpected message during startup"));
332 }
333 }
334 }
335 }
336
337 /// Sends a simple query and returns all messages until `ReadyForQuery`.
338 ///
339 /// # Errors
340 ///
341 /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
342 /// (SQL error, constraint violation, etc.).
343 /// - Returns [`Error`] (I/O) on transport read/write failure.
344 /// - Returns [`Error`] (closed) if the server closes the connection
345 /// mid-query.
346 /// - Returns [`Error`] (connection) if the connection has already
347 /// been marked unhealthy by a prior failure.
348 pub fn simple_query(&mut self, query: &str) -> Result<Vec<Message>> {
349 self.ensure_healthy()?;
350 frontend::query(query, &mut self.write_buf)?;
351 self.flush()?;
352
353 let mut messages = Vec::new();
354 loop {
355 let msg = self.read_message()?;
356 match &msg {
357 Message::ReadyForQuery(_) => {
358 messages.push(msg);
359 return Ok(messages);
360 }
361 Message::ErrorResponse(body) => {
362 return Err(self.consume_error(body));
363 }
364 _ => {
365 messages.push(msg);
366 }
367 }
368 }
369 }
370
371 /// Sends a query using extended protocol with binary format results.
372 ///
373 /// This uses the `PostgreSQL` extended query protocol (Parse/Bind/Execute/Sync)
374 /// with `HyperBinary` format (format code 2) for maximum performance.
375 ///
376 /// Returns all messages until `ReadyForQuery`.
377 ///
378 /// # Errors
379 ///
380 /// Same failure modes as [`Self::simple_query`] — server-side SQL
381 /// errors surface as [`Error`] (server), transport failures as
382 /// [`Error`] (I/O) / [`Error`] (closed), and an unhealthy prior state
383 /// as [`Error`] (connection).
384 pub fn query_binary(&mut self, query: &str) -> Result<Vec<Message>> {
385 self.ensure_healthy()?;
386 // HyperBinary format code
387 const HYPER_BINARY_FORMAT: i16 = 2;
388
389 // Parse: prepare an unnamed statement
390 frontend::parse("", query, &[], &mut self.write_buf)?;
391
392 // Bind: bind unnamed portal with HyperBinary result format
393 // Empty arrays for param_formats and params (no parameters)
394 // Single result_format of 2 (HyperBinary) applies to all columns
395 frontend::bind(
396 "",
397 "",
398 &[],
399 &[],
400 &[HYPER_BINARY_FORMAT],
401 &mut self.write_buf,
402 )?;
403
404 // Describe: get column metadata (optional but useful)
405 frontend::describe(b'P', "", &mut self.write_buf)?;
406
407 // Execute: run the unnamed portal with no row limit
408 frontend::execute("", 0, &mut self.write_buf)?;
409
410 // Sync: end the extended query sequence
411 frontend::sync(&mut self.write_buf);
412
413 self.flush()?;
414
415 let mut messages = Vec::new();
416 loop {
417 let msg = self.read_message()?;
418 match &msg {
419 Message::ReadyForQuery(_) => {
420 messages.push(msg);
421 return Ok(messages);
422 }
423 Message::ErrorResponse(body) => {
424 return Err(self.consume_error(body));
425 }
426 _ => {
427 messages.push(msg);
428 }
429 }
430 }
431 }
432
433 /// Starts a binary query but leaves result consumption to the caller.
434 ///
435 /// This is useful for streaming scenarios where you want to pull messages
436 /// incrementally instead of materializing the full result set up front.
437 ///
438 /// # Errors
439 ///
440 /// - Returns [`Error`] (connection) if the connection has been
441 /// marked unhealthy.
442 /// - Returns [`Error`] (I/O) if writing the Parse/Bind/Execute/Sync
443 /// sequence to the transport fails.
444 pub fn start_query_binary(&mut self, query: &str) -> Result<()> {
445 self.ensure_healthy()?;
446 const HYPER_BINARY_FORMAT: i16 = 2;
447
448 frontend::parse("", query, &[], &mut self.write_buf)?;
449 frontend::bind(
450 "",
451 "",
452 &[],
453 &[],
454 &[HYPER_BINARY_FORMAT],
455 &mut self.write_buf,
456 )?;
457 frontend::describe(b'P', "", &mut self.write_buf)?;
458 frontend::execute("", 0, &mut self.write_buf)?;
459 frontend::sync(&mut self.write_buf);
460
461 self.flush()
462 }
463
464 /// Starts a simple query but leaves result consumption to the caller.
465 ///
466 /// This is useful for streaming scenarios where you want to pull messages
467 /// incrementally instead of materializing the full result set up front.
468 ///
469 /// # Errors
470 ///
471 /// Same failure modes as [`Self::start_query_binary`] —
472 /// [`Error`] (connection) for unhealthy state, [`Error`] (I/O) for
473 /// transport failure.
474 pub fn start_simple_query(&mut self, query: &str) -> Result<()> {
475 self.ensure_healthy()?;
476 frontend::query(query, &mut self.write_buf)?;
477 self.flush()
478 }
479
480 /// Starts an **execute** of a prepared statement but leaves result
481 /// consumption to the caller.
482 ///
483 /// Sends `Bind` + `Execute(unnamed_portal, 0)` + `Sync`, then
484 /// returns. The caller drives a message loop that reads
485 /// `BindComplete`, any `DataRow`s, then `CommandComplete` +
486 /// `ReadyForQuery` — the same shape used by
487 /// [`Self::start_query_binary`].
488 ///
489 /// Format codes (`PostgreSQL` wire protocol):
490 /// - **Parameters**: format `1` (standard PG binary, big-endian).
491 /// Hyper's server-side Bind decodes bound parameters as standard
492 /// PG binary regardless of the format code we advertise. The
493 /// caller is responsible for supplying parameter bytes in BE.
494 /// - **Results**: format `2` (`HyperBinary`, little-endian). Hyper
495 /// supports this as a separate protocol extension; the row
496 /// decoders in [`super::row::StreamRow`] and the hyperdb-api `Row`
497 /// type all expect LE, so requesting LE at Bind time avoids an
498 /// extra conversion pass.
499 ///
500 /// `max_rows = 0` means "send all rows" — we pace on the client side
501 /// by reading `DataRows` in chunks from the read buffer.
502 ///
503 /// # Errors
504 ///
505 /// - Returns [`Error`] (connection) if the connection has been
506 /// marked unhealthy.
507 /// - Returns [`Error`] (I/O) if writing the Bind/Execute/Sync
508 /// sequence to the transport fails.
509 pub fn start_execute_prepared(
510 &mut self,
511 statement_name: &str,
512 params: &[Option<&[u8]>],
513 column_count: usize,
514 ) -> Result<()> {
515 self.ensure_healthy()?;
516
517 const PG_BINARY_FORMAT: i16 = 1;
518 const HYPER_BINARY_FORMAT: i16 = 2;
519 let param_formats: Vec<i16> = vec![PG_BINARY_FORMAT; params.len()];
520 let result_formats: Vec<i16> = vec![HYPER_BINARY_FORMAT; column_count];
521
522 frontend::bind(
523 "", // unnamed portal
524 statement_name,
525 ¶m_formats,
526 params,
527 &result_formats,
528 &mut self.write_buf,
529 )?;
530 frontend::execute("", 0, &mut self.write_buf)?;
531 frontend::sync(&mut self.write_buf);
532
533 self.flush()
534 }
535
536 /// Reads a single message from the server.
537 ///
538 /// # Errors
539 ///
540 /// - Returns [`Error`] (I/O) if reading from the transport fails or
541 /// if [`Message::parse`] reports a malformed frame.
542 /// - Returns [`Error`] (closed) when the transport reaches EOF
543 /// (server closed the connection).
544 pub fn read_message(&mut self) -> Result<Message> {
545 loop {
546 if let Some(msg) = Message::parse(&mut self.read_buf).map_err(Error::io)? {
547 return Ok(msg);
548 }
549
550 // Need more data — read directly into the spare capacity of
551 // `read_buf`, no temporary buffer or `extend_from_slice` memcpy.
552 //
553 // The 64 KiB read-window matches the typical TCP loopback
554 // segment size and the default DataRow streaming chunk. On
555 // Windows TCP loopback the per-`WSARecv` overhead is several
556 // times higher than Linux/macOS `recv`, so a tight ceiling here
557 // dominates wall time on long scans. The previous 8 KiB ceiling
558 // forced an 8× syscall amplification on this path.
559 //
560 // Implementation: `resize` extends the buffer with zeroed bytes
561 // (single memset, ~50 GB/s on modern CPUs), `read` writes into
562 // the new tail, then `truncate` shrinks to the actual byte
563 // count. This is safe Rust and results in exactly one memset
564 // per syscall — no heap alloc, no extra memcpy.
565 let prev_len = self.read_buf.len();
566 self.read_buf.resize(prev_len + 64 * 1024, 0);
567 let n = self.stream.read(&mut self.read_buf[prev_len..])?;
568 if n == 0 {
569 self.read_buf.truncate(prev_len);
570 warn!(target: "hyperdb_api", "connection-closed");
571 return Err(Error::closed());
572 }
573 self.read_buf.truncate(prev_len + n);
574 }
575 }
576
577 /// Drains messages from the server until a [`Message::ReadyForQuery`] is
578 /// seen, discarding them. Call this after receiving an
579 /// [`Message::ErrorResponse`] to stay in sync with the wire protocol.
580 ///
581 /// Per the `PostgreSQL` wire protocol, every query (simple or extended) ends
582 /// with `ReadyForQuery`, even if the statement failed. Without this drain,
583 /// the `ReadyForQuery` (and any other trailing messages) remain in the
584 /// read buffer and get consumed by the next operation's response parser,
585 /// which misinterprets them — classic wire desync.
586 ///
587 /// This is the **unbounded** variant, safe to use in the standard
588 /// error path where the server has already sent `ErrorResponse` and
589 /// `ReadyForQuery` is guaranteed to arrive within a few messages. In
590 /// exceptional cases where the drain might take arbitrarily long —
591 /// most notably the `Drop` path for a streaming result that the caller
592 /// abandoned mid-way — prefer
593 /// [`drain_until_ready_bounded`](Self::drain_until_ready_bounded) to
594 /// avoid blocking indefinitely on an unresponsive server.
595 ///
596 /// Drain errors (connection already closed, I/O failure mid-drain) are
597 /// logged via `tracing::warn!` and then swallowed. The caller's original
598 /// error is more informative to surface, and a dead connection will be
599 /// reported on the next real operation anyway.
600 pub fn drain_until_ready(&mut self) {
601 let _ = self.drain_until_ready_bounded(usize::MAX);
602 }
603
604 /// Bounded version of [`drain_until_ready`](Self::drain_until_ready) that
605 /// stops after reading at most `max_messages` messages. Returns `true`
606 /// when `ReadyForQuery` was observed within that budget; `false` if the
607 /// budget was exhausted first or an I/O error occurred before reaching it.
608 ///
609 /// # Why we do not send `Sync` before draining
610 ///
611 /// A natural question is whether to send a `Sync` message first to prompt
612 /// the server to emit `ReadyForQuery` sooner. The answer for Hyper is
613 /// **no** — it would actively corrupt the next query.
614 ///
615 /// Per the Hyper server state machine (see `LibpqConnection::handleSync`
616 /// and `handleQueryDone`), every query — simple or extended — already
617 /// ends with exactly one `ReadyForQuery` emission. After an error or
618 /// normal completion the server returns to its main loop. If we then
619 /// send a `Sync`, `handleSync` would emit an **additional**
620 /// `ReadyForQuery` that no current operation is reading, and the next
621 /// query's response parser would consume that stale `ReadyForQuery`
622 /// as its own terminator — the symptom is that query returning an
623 /// empty result with "Query returned no rows".
624 ///
625 /// For the abandoned-stream case (a long-running query that the client
626 /// stopped reading), `Sync` also does not help: Hyper processes the
627 /// incoming byte stream in order, so `Sync` is only handled *after*
628 /// the in-flight `Execute` finishes emitting all its `DataRow`s plus
629 /// its own `CommandComplete` and `ReadyForQuery`. By that point the
630 /// drain has already reached `ReadyForQuery`, and the `Sync` produces
631 /// the same extra `ReadyForQuery` contamination described above.
632 ///
633 /// The canonical way to abort a running query is to open a *separate*
634 /// connection and send `CancelRequest` with the original connection's
635 /// process id and secret. That is exactly what
636 /// [`QueryStream`](super::client::QueryStream)'s `Drop` impl does
637 /// (via the [`Cancellable`](super::cancel::Cancellable) trait)
638 /// before calling this bounded drain. Cancel-then-drain converges on
639 /// `ReadyForQuery` within a handful of messages because the server
640 /// stops producing new `DataRow`s once it observes the cancel.
641 ///
642 /// # Poisoned connections
643 ///
644 /// When this returns `false` the connection is in an indeterminate
645 /// state. Callers should treat it as poisoned and not return it to a
646 /// connection pool — the next operation will see residual bytes from
647 /// whatever was still streaming. The bounded variant exists precisely
648 /// to prevent indefinite blocking in contexts like `Drop` impls where
649 /// we don't own the thread's time and can't afford to wait for a
650 /// multi-million-row query result to finish before returning from a
651 /// destructor.
652 ///
653 /// All drain errors are logged via `tracing::warn!` so state-related
654 /// issues are observable in logs even though they don't interrupt the
655 /// caller's control flow.
656 pub fn drain_until_ready_bounded(&mut self, max_messages: usize) -> bool {
657 for i in 0..max_messages {
658 match self.read_message() {
659 Ok(Message::ReadyForQuery(_)) => return true,
660 Ok(_) => {}
661 Err(e) => {
662 warn!(
663 target: "hyperdb_api_core::client",
664 error = %e,
665 messages_read = i,
666 "drain_until_ready: read error mid-drain (likely closed connection); \
667 connection marked desynchronized",
668 );
669 // Whether the underlying error is a closed socket, a
670 // partial read, or a corrupt frame, any subsequent
671 // `read_message` on this connection is operating on
672 // unknown state. Mark it so pool layers and upper APIs
673 // can short-circuit instead of piling another failed
674 // operation on top.
675 self.desynchronized = true;
676 return false;
677 }
678 }
679 }
680 warn!(
681 target: "hyperdb_api_core::client",
682 max_messages,
683 "drain_until_ready_bounded: exhausted budget without seeing ReadyForQuery; \
684 connection marked desynchronized and should not be reused",
685 );
686 // Budget exhausted — residual messages still on the wire. The
687 // next read on this connection will almost certainly misparse
688 // them as belonging to an unrelated operation. Mark it so the
689 // failure surfaces at a well-defined API boundary instead.
690 self.desynchronized = true;
691 false
692 }
693
694 /// Convenience: parse a server [`Message::ErrorResponse`] body into an
695 /// [`Error`] and drain the rest of the response through the trailing
696 /// [`Message::ReadyForQuery`] so the connection is safe to reuse.
697 ///
698 /// Callers should almost always prefer this over calling
699 /// [`drain_until_ready`](Self::drain_until_ready) or
700 /// [`drain_until_ready_bounded`](Self::drain_until_ready_bounded) by
701 /// hand, because forgetting the drain is exactly the bug it exists to
702 /// prevent.
703 ///
704 /// # Drain budget
705 ///
706 /// Uses a bounded drain with a [`POST_ERROR_DRAIN_CAP`]-message budget
707 /// rather than the unbounded [`drain_until_ready`](Self::drain_until_ready).
708 /// A well-behaved server emits only a handful of messages after
709 /// `ErrorResponse` before `ReadyForQuery` — typically just the error
710 /// itself plus the `Z`, occasionally with a few `NoticeResponse`
711 /// messages interleaved — so the cap is orders of magnitude above
712 /// anything a legitimate error path produces. The cap exists purely
713 /// as a defensive safety valve against pathological server behavior
714 /// (a broken backend that never emits `ReadyForQuery`) and
715 /// misbehaved network paths (stalled reads that would otherwise hang
716 /// the caller indefinitely, particularly visible in async contexts).
717 ///
718 /// If the cap is exceeded, `drain_until_ready_bounded` logs a
719 /// `tracing::warn!` and marks the connection desynchronized; the
720 /// next operation on it will surface a transport-level failure and
721 /// trigger reconnect higher up. That is strictly better than
722 /// blocking forever with no observable symptom.
723 ///
724 /// # Example
725 ///
726 /// ```ignore
727 /// match msg {
728 /// Message::ErrorResponse(body) => {
729 /// return Err(self.consume_error(&body));
730 /// }
731 /// // ...
732 /// }
733 /// ```
734 pub fn consume_error(
735 &mut self,
736 body: &crate::protocol::message::backend::ErrorResponseBody,
737 ) -> Error {
738 let err = parse_error_response(body);
739 let _ = self.drain_until_ready_bounded(POST_ERROR_DRAIN_CAP);
740 err
741 }
742
743 /// Flushes the write buffer to the server.
744 ///
745 /// # Errors
746 ///
747 /// Returns [`Error`] (I/O) if writing the buffered bytes or flushing
748 /// the underlying transport fails.
749 pub fn flush(&mut self) -> Result<()> {
750 if !self.write_buf.is_empty() {
751 self.stream.write_all(&self.write_buf)?;
752 self.stream.flush()?;
753 self.write_buf.clear();
754 }
755 Ok(())
756 }
757
758 /// Sends a terminate message and closes the connection.
759 ///
760 /// # Errors
761 ///
762 /// Returns [`Error`] (I/O) if writing the `Terminate` frame or
763 /// flushing the transport fails.
764 pub fn terminate(&mut self) -> Result<()> {
765 frontend::terminate(&mut self.write_buf);
766 self.flush()
767 }
768
769 /// Returns a mutable reference to the write buffer.
770 pub fn write_buf(&mut self) -> &mut BytesMut {
771 &mut self.write_buf
772 }
773
774 /// Initiates a COPY IN operation with `HyperBinary` format.
775 ///
776 /// This sends a COPY ... FROM STDIN query and waits for `CopyInResponse`.
777 /// After this returns successfully, the caller should send data using
778 /// `send_copy_data` and then call `finish_copy` or `cancel_copy`.
779 ///
780 /// # Errors
781 ///
782 /// Same failure modes as [`Self::start_copy_in_with_format`].
783 pub fn start_copy_in(&mut self, table_name: &str, columns: &[&str]) -> Result<()> {
784 self.start_copy_in_with_format(table_name, columns, "HYPERBINARY")
785 }
786
787 /// Initiates a COPY IN operation with a specified format.
788 ///
789 /// This sends a COPY ... FROM STDIN query and waits for `CopyInResponse`.
790 /// After this returns successfully, the caller should send data using
791 /// `send_copy_data` and then call `finish_copy` or `cancel_copy`.
792 ///
793 /// # Arguments
794 ///
795 /// * `table_name` - The target table name (should be properly quoted if needed)
796 /// * `columns` - Column names to insert into
797 /// * `format` - The data format string: "HYPERBINARY" or "ARROWSTREAM"
798 ///
799 /// # Example
800 ///
801 /// ```no_run
802 /// # use hyperdb_api_core::client::connection::RawConnection;
803 /// # use std::net::TcpStream;
804 /// # fn example(conn: &mut RawConnection<TcpStream>) -> hyperdb_api_core::client::Result<()> {
805 /// // For HyperBinary format (default)
806 /// conn.start_copy_in_with_format("my_table", &["col1", "col2"], "HYPERBINARY")?;
807 ///
808 /// // For Arrow IPC stream format
809 /// conn.start_copy_in_with_format("my_table", &["col1", "col2"], "ARROWSTREAM")?;
810 /// # Ok(())
811 /// # }
812 /// ```
813 ///
814 /// # Errors
815 ///
816 /// - Returns [`Error`] (connection) if the connection has been
817 /// marked unhealthy by a prior failure.
818 /// - Returns [`Error`] (server) if the server rejects the generated
819 /// `COPY ... FROM STDIN` statement (missing table, column
820 /// mismatch, etc.).
821 /// - Returns [`Error`] (I/O) on wire-protocol I/O failure.
822 pub fn start_copy_in_with_format(
823 &mut self,
824 table_name: &str,
825 columns: &[&str],
826 format: &str,
827 ) -> Result<()> {
828 self.ensure_healthy()?;
829 // Build COPY command with specified format
830 let column_list = if columns.is_empty() {
831 String::new()
832 } else {
833 format!(
834 " ({})",
835 columns
836 .iter()
837 .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
838 .collect::<Vec<_>>()
839 .join(", ")
840 )
841 };
842
843 let query = format!("COPY {table_name}{column_list} FROM STDIN WITH (FORMAT {format})");
844
845 frontend::query(&query, &mut self.write_buf)?;
846 self.flush()?;
847
848 // Wait for CopyInResponse
849 loop {
850 let msg = self.read_message()?;
851 match msg {
852 Message::CopyInResponse(_) => {
853 // Ready to receive data
854 return Ok(());
855 }
856 Message::ErrorResponse(body) => {
857 return Err(self.consume_error(&body));
858 }
859 _ => {
860 // Ignore other messages (like NoticeResponse)
861 }
862 }
863 }
864 }
865
866 /// Initiates a COPY IN operation from a raw SQL query string.
867 ///
868 /// The query must be a complete `COPY ... FROM STDIN ...` statement.
869 ///
870 /// # Errors
871 ///
872 /// Same failure modes as [`Self::start_copy_in_with_format`]: unhealthy
873 /// connection, server-side SQL rejection, or transport I/O failure.
874 pub fn start_copy_in_raw(&mut self, query: &str) -> Result<()> {
875 self.ensure_healthy()?;
876 frontend::query(query, &mut self.write_buf)?;
877 self.flush()?;
878
879 loop {
880 let msg = self.read_message()?;
881 match msg {
882 Message::CopyInResponse(_) => {
883 return Ok(());
884 }
885 Message::ErrorResponse(body) => {
886 return Err(self.consume_error(&body));
887 }
888 _ => {}
889 }
890 }
891 }
892
893 /// Sends COPY data to the server.
894 ///
895 /// The data should be in `HyperBinary` format.
896 ///
897 /// # Errors
898 ///
899 /// Currently infallible — frame construction is pure. The `Result`
900 /// return type is preserved for forward compatibility.
901 pub fn send_copy_data(&mut self, data: &[u8]) -> Result<()> {
902 frontend::copy_data(data, &mut self.write_buf);
903 // Don't flush immediately for better batching
904 // Caller can call flush() explicitly if needed
905 Ok(())
906 }
907
908 /// Sends COPY data directly to the stream without internal buffering.
909 ///
910 /// This writes the `CopyData` message directly to the TCP stream, letting
911 /// the kernel's TCP stack handle buffering. Use `flush_stream()` periodically
912 /// to ensure data is sent.
913 ///
914 /// This is more efficient for streaming large amounts of data as it avoids
915 /// copying data into an intermediate buffer.
916 ///
917 /// # Errors
918 ///
919 /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
920 /// `u32::MAX` (the PostgreSQL per-message length cap).
921 /// - Returns [`Error`] (I/O) if flushing buffered bytes or writing
922 /// the header/payload directly to the transport fails.
923 pub fn send_copy_data_direct(&mut self, data: &[u8]) -> Result<()> {
924 // First flush any pending buffered data
925 if !self.write_buf.is_empty() {
926 self.stream.write_all(&self.write_buf)?;
927 self.write_buf.clear();
928 }
929
930 // Write CopyData message header + data directly to stream
931 // Message format: 'd' (1 byte) + length (4 bytes BigEndian) + data
932 let msg_len = u32::try_from(4 + data.len())
933 .map_err(|_| Error::protocol("CopyData payload exceeds u32::MAX bytes"))?;
934 let len_be = msg_len.to_be_bytes();
935 let header = [b'd', len_be[0], len_be[1], len_be[2], len_be[3]];
936 self.stream.write_all(&header)?;
937 self.stream.write_all(data)?;
938 Ok(())
939 }
940
941 /// Flushes the TCP stream without clearing the write buffer.
942 ///
943 /// Use this with `send_copy_data_direct()` to periodically ensure
944 /// data is sent to the server.
945 ///
946 /// # Errors
947 ///
948 /// Returns [`Error`] (I/O) if flushing the underlying transport
949 /// fails.
950 pub fn flush_stream(&mut self) -> Result<()> {
951 self.stream.flush()?;
952 Ok(())
953 }
954
955 /// Finishes a COPY IN operation successfully.
956 ///
957 /// This sends `CopyDone` and waits for `CommandComplete`.
958 /// Returns the number of rows inserted.
959 ///
960 /// # Errors
961 ///
962 /// - Returns [`Error`] (server) if the server emits an `ErrorResponse`
963 /// during finalization (e.g. constraint violation from the
964 /// accumulated rows).
965 /// - Returns [`Error`] (I/O) on wire-protocol read/write failure.
966 pub fn finish_copy(&mut self) -> Result<u64> {
967 // Ensure all data is sent
968 self.flush()?;
969
970 // Send CopyDone
971 frontend::copy_done(&mut self.write_buf);
972 self.flush()?;
973
974 // Wait for CommandComplete and ReadyForQuery
975 let mut row_count = 0u64;
976 loop {
977 let msg = self.read_message()?;
978 match msg {
979 Message::CommandComplete(body) => {
980 if let Ok(tag) = body.tag() {
981 // Parse row count from tag like "COPY 1234"
982 if let Some(count_str) = tag.strip_prefix("COPY ") {
983 if let Ok(count) = count_str.trim().parse() {
984 row_count = count;
985 }
986 }
987 }
988 }
989 Message::ReadyForQuery(_) => {
990 return Ok(row_count);
991 }
992 Message::ErrorResponse(body) => {
993 return Err(self.consume_error(&body));
994 }
995 _ => {
996 // Ignore other messages
997 }
998 }
999 }
1000 }
1001
1002 /// Cancels a COPY IN operation.
1003 ///
1004 /// This sends `CopyFail` and waits for the error response.
1005 ///
1006 /// # Errors
1007 ///
1008 /// Returns [`Error`] (I/O) if flushing the buffer or writing the
1009 /// `CopyFail` frame fails, or [`Error`] (closed) if the server
1010 /// drops the connection before returning `ReadyForQuery`.
1011 pub fn cancel_copy(&mut self, reason: &str) -> Result<()> {
1012 // Ensure buffer is clear
1013 self.flush()?;
1014
1015 // Send CopyFail
1016 frontend::copy_fail(reason, &mut self.write_buf);
1017 self.flush()?;
1018
1019 // Wait for ErrorResponse and ReadyForQuery
1020 loop {
1021 let msg = self.read_message()?;
1022 match msg {
1023 Message::ReadyForQuery(_) => {
1024 return Ok(());
1025 }
1026 Message::ErrorResponse(_) => {
1027 // Expected - the server confirms the cancel
1028 }
1029 _ => {
1030 // Ignore other messages
1031 }
1032 }
1033 }
1034 }
1035
1036 /// Executes a COPY ... TO STDOUT query and returns all output data.
1037 ///
1038 /// This is used for queries like:
1039 /// `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
1040 ///
1041 /// The method:
1042 /// 1. Sends the query
1043 /// 2. Waits for `CopyOutResponse`
1044 /// 3. Collects all `CopyData` messages
1045 /// 4. Waits for `CopyDone`, `CommandComplete`, and `ReadyForQuery`
1046 ///
1047 /// # Arguments
1048 ///
1049 /// * `query` - The COPY TO STDOUT query to execute
1050 ///
1051 /// # Returns
1052 ///
1053 /// The raw bytes from all `CopyData` messages concatenated together.
1054 ///
1055 /// # Example
1056 ///
1057 /// ```no_run
1058 /// # use hyperdb_api_core::client::connection::RawConnection;
1059 /// # use std::net::TcpStream;
1060 /// # fn example(conn: &mut RawConnection<TcpStream>) -> hyperdb_api_core::client::Result<()> {
1061 /// let arrow_data = conn.copy_out(
1062 /// "COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
1063 /// )?;
1064 /// # Ok(())
1065 /// # }
1066 /// ```
1067 ///
1068 /// # Errors
1069 ///
1070 /// - Returns [`Error`] (connection) if the connection has been
1071 /// marked unhealthy.
1072 /// - Returns [`Error`] (server) when the server rejects the COPY TO
1073 /// STDOUT statement via `ErrorResponse`.
1074 /// - Returns [`Error`] (I/O) / [`Error`] (closed) on transport
1075 /// read/write failure.
1076 pub fn copy_out(&mut self, query: &str) -> Result<Vec<u8>> {
1077 self.ensure_healthy()?;
1078 // Send the query
1079 frontend::query(query, &mut self.write_buf)?;
1080 self.flush()?;
1081
1082 let mut data = Vec::new();
1083 let mut in_copy_out = false;
1084
1085 // Process messages
1086 loop {
1087 let msg = self.read_message()?;
1088 match msg {
1089 Message::CopyOutResponse(_) => {
1090 // Server is ready to send COPY data
1091 in_copy_out = true;
1092 }
1093 Message::CopyData(body) if in_copy_out => {
1094 // Accumulate the copy data
1095 data.extend_from_slice(body.data());
1096 }
1097 Message::CopyDone => {
1098 // COPY data transfer complete
1099 in_copy_out = false;
1100 }
1101 Message::CommandComplete(_) => {
1102 // Command finished
1103 }
1104 Message::ReadyForQuery(_) => {
1105 // Connection is ready for next command
1106 return Ok(data);
1107 }
1108 Message::ErrorResponse(body) => {
1109 return Err(self.consume_error(&body));
1110 }
1111 _ => {
1112 // Ignore other messages (like NoticeResponse)
1113 }
1114 }
1115 }
1116 }
1117
1118 /// Streams COPY OUT data directly to a writer without buffering all data in memory.
1119 ///
1120 /// Returns the total number of bytes written.
1121 ///
1122 /// # Errors
1123 ///
1124 /// Same failure modes as [`Self::copy_out`], plus [`Error`] (I/O)
1125 /// wrapping any error from `writer.write_all` when the target
1126 /// writer cannot accept a COPY chunk.
1127 pub fn copy_out_to_writer(
1128 &mut self,
1129 query: &str,
1130 writer: &mut dyn std::io::Write,
1131 ) -> Result<u64> {
1132 self.ensure_healthy()?;
1133 frontend::query(query, &mut self.write_buf)?;
1134 self.flush()?;
1135
1136 let mut total_bytes: u64 = 0;
1137 let mut in_copy_out = false;
1138
1139 loop {
1140 let msg = self.read_message()?;
1141 match msg {
1142 Message::CopyOutResponse(_) => {
1143 in_copy_out = true;
1144 }
1145 Message::CopyData(body) if in_copy_out => {
1146 let chunk = body.data();
1147 writer.write_all(chunk).map_err(|e| {
1148 Error::new(
1149 super::error::ErrorKind::Io,
1150 format!("Failed to write COPY data: {e}"),
1151 )
1152 })?;
1153 total_bytes += chunk.len() as u64;
1154 }
1155 Message::CopyDone => {
1156 in_copy_out = false;
1157 }
1158 Message::CommandComplete(_) => {}
1159 Message::ReadyForQuery(_) => {
1160 return Ok(total_bytes);
1161 }
1162 Message::ErrorResponse(body) => {
1163 return Err(self.consume_error(&body));
1164 }
1165 _ => {}
1166 }
1167 }
1168 }
1169}
1170
1171/// Parses an error response into an Error.
1172pub(crate) fn parse_error_response(
1173 body: &crate::protocol::message::backend::ErrorResponseBody,
1174) -> Error {
1175 let mut severity = String::from("ERROR");
1176 let mut code = String::from("00000");
1177 let mut message = String::from("unknown error");
1178
1179 for field in body.fields().filter_map(|r| {
1180 r.map_err(|e| trace!(target: "hyperdb_api_core::client", error = %e, "dropped error parsing error response field")).ok()
1181 }) {
1182 match field.type_() {
1183 b'S' | b'V' => {
1184 if let Ok(s) = field.value() {
1185 severity = s.to_string();
1186 }
1187 }
1188 b'C' => {
1189 if let Ok(s) = field.value() {
1190 code = s.to_string();
1191 }
1192 }
1193 b'M' => {
1194 if let Ok(s) = field.value() {
1195 message = s.to_string();
1196 }
1197 }
1198 _ => {}
1199 }
1200 }
1201
1202 Error::db(&severity, &code, &message)
1203}
1204
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Transport double that accepts and drops every write and reports EOF
    /// on every read. It lets a `RawConnection` be built without a socket.
    /// That is enough for checks on pure connection state (the
    /// `desynchronized` flag, `ensure_healthy`), but not for anything that
    /// needs a real server reply.
    struct SinkStream;

    impl std::io::Read for SinkStream {
        fn read(&mut self, _into: &mut [u8]) -> std::io::Result<usize> {
            Ok(0)
        }
    }

    impl std::io::Write for SinkStream {
        fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
            Ok(bytes.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// A newly constructed connection reports healthy, and the health
    /// guard lets calls through.
    #[test]
    fn fresh_connection_is_healthy() {
        let conn = RawConnection::new(SinkStream);
        let healthy = conn.is_healthy();
        assert!(healthy);
        assert!(conn.ensure_healthy().is_ok());
    }

    /// With `desynchronized` set, the health probe reports false. The
    /// guard returns a `Connection`-kind error whose text names the
    /// desync, so it is easy to grep for in logs.
    #[test]
    fn desynchronized_connection_fails_health_check() {
        let mut conn = RawConnection::new(SinkStream);
        conn.desynchronized = true;
        assert!(!conn.is_healthy());

        let err = conn.ensure_healthy().expect_err("must fail-fast");
        assert_eq!(err.kind(), crate::client::error::ErrorKind::Connection);
        let text = err.to_string().to_lowercase();
        assert!(
            text.contains("desynchron"),
            "error message should mention desynchronization; got: {err}",
        );
    }

    /// A drain with a zero budget gives up immediately, returns false, and
    /// poisons the connection. This is the cheapest way to reach the
    /// "budget exhausted" path. The stream is an empty in-memory cursor so
    /// its behaviour is fully defined.
    #[test]
    fn zero_budget_drain_marks_desynchronized() {
        let mut conn = RawConnection::new(Cursor::new(Vec::<u8>::new()));
        assert!(conn.is_healthy());

        let drained = conn.drain_until_ready_bounded(0);
        assert!(!drained, "zero-budget drain must return false");
        assert!(
            !conn.is_healthy(),
            "drain failure must mark connection desynchronized",
        );
    }

    /// A poisoned connection refuses new requests before writing anything.
    /// One query entry point is spot-checked here. Every public entry point
    /// starts with the same `self.ensure_healthy()?` line, so more
    /// coverage would not catch a new class of bug.
    #[test]
    fn desynchronized_connection_fast_fails_simple_query() {
        let mut conn = RawConnection::new(Cursor::new(Vec::<u8>::new()));
        conn.desynchronized = true;

        // `Message` has no `Debug` impl, so `expect_err` is unavailable;
        // destructure the result by hand instead.
        let err = match conn.simple_query("SELECT 1") {
            Ok(_) => panic!("desynced simple_query must fail-fast"),
            Err(e) => e,
        };
        assert_eq!(err.kind(), crate::client::error::ErrorKind::Connection);
        assert!(err.to_string().to_lowercase().contains("desynchron"));
    }
}