Skip to main content

hyperdb_api_core/client/
connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Low-level synchronous connection handling over the `PostgreSQL` wire protocol.
5//!
6//! [`RawConnection`] provides the message-level interface to a Hyper server:
7//! startup/authentication handshake, simple and extended query execution, and
8//! the COPY data path. It is generic over the transport stream `S` (TCP,
9//! Unix domain socket, named pipe, or TLS-wrapped variants).
10//!
11//! # Wire Protocol Overview
12//!
13//! Communication follows the `PostgreSQL` v3 message protocol. Each message has a
14//! 1-byte type tag, a 4-byte length (including itself), and a variable-length
15//! body. The connection maintains separate read and write `BytesMut` buffers to
16//! amortize syscall overhead.
17//!
18//! ## Query Protocols
19//!
20//! - **Simple Query**: A single `Query('Q')` message; the server responds with
21//!   `RowDescription`, zero or more `DataRow`, `CommandComplete`, and
22//!   `ReadyForQuery`. Results are in text format.
23//!
24//! - **Extended Query (`HyperBinary`)**: `Parse` / `Bind` / `Describe` /
25//!   `Execute` / `Sync` sequence requesting format code 2 (`HyperBinary`,
26//!   little-endian binary). Results are zero-copy-friendly `DataRow` messages.
27//!   Used by [`Client::query_fast`](crate::client::Client::query_fast) and
28//!   [`Client::query_streaming`](crate::client::Client::query_streaming).
29//!
30//! ## Connection Health
31//!
32//! A `desynchronized` flag tracks whether the wire protocol has fallen out of
33//! sync (e.g., a bounded drain exceeded its budget). Once set, all subsequent
34//! operations fast-fail. The only recovery is to drop the connection and
35//! open a new one. Pool layers should check [`RawConnection::is_healthy`]
36//! during recycle.
37
38use std::io::{Read, Write};
39
40use bytes::BytesMut;
41use tracing::{debug, info, trace, warn};
42
43use crate::protocol::message::{backend::Message, frontend};
44
45use super::auth::{self, AuthState};
46use super::error::{Error, Result};
47
48/// Maximum number of messages [`RawConnection::consume_error`] (and its
49/// async sibling) will read while draining the tail of a failed request.
50///
51/// A well-behaved server emits only a handful of messages after
52/// `ErrorResponse` before `ReadyForQuery` (typically just the error +
53/// `Z`, with the occasional `NoticeResponse` interleaved). This cap is
54/// therefore orders of magnitude above anything a legitimate error path
55/// produces; it exists purely as a defensive safety valve against broken
56/// server implementations and stalled network paths. When exceeded, the
57/// drain logs a `tracing::warn!` and returns — the next operation on the
58/// connection will see the resulting desync and trigger a reconnect.
59///
60/// See [`RawConnection::consume_error`] for the full rationale.
61pub const POST_ERROR_DRAIN_CAP: usize = 1024;
62
63/// A raw connection to a Hyper server.
64///
65/// This handles the low-level protocol communication, including:
66/// - Message framing (reading/writing `PostgreSQL` wire protocol messages)
67/// - Authentication handshake
68/// - Query execution (simple and extended query protocols)
69/// - COPY protocol support
70///
71/// The connection is generic over the stream type `S`, allowing it to work
72/// with different transport mechanisms (TCP, TLS, etc.) as long as they
73/// implement `Read + Write`.
74///
75/// # Buffering
76///
77/// The connection maintains separate read and write buffers for efficient
78/// I/O. Messages are buffered before being sent, and incoming data is
79/// buffered until complete messages can be parsed.
80#[derive(Debug)]
81pub struct RawConnection<S> {
82    /// The underlying I/O stream.
83    stream: S,
84    /// Buffer for reading incoming messages from the server.
85    read_buf: BytesMut,
86    /// Buffer for writing outgoing messages to the server.
87    write_buf: BytesMut,
88    /// Backend process ID (for cancel requests).
89    process_id: i32,
90    /// Secret key for authenticating cancel requests.
91    secret_key: i32,
92    /// Server parameters received during startup (e.g., `server_version`, `session_identifier`).
93    server_params: std::collections::HashMap<String, String>,
94    /// Sticky flag set when the wire protocol has fallen out of sync with
95    /// the server (e.g. a bounded drain exhausted its budget before seeing
96    /// `ReadyForQuery`). Once set it is never cleared — the only valid
97    /// recovery is to discard this connection and open a new one.
98    ///
99    /// Every public method that initiates a new server request checks
100    /// this flag via [`Self::ensure_healthy`] and fast-fails with a
101    /// clear error instead of sending bytes into a known-poisoned
102    /// channel. Pool layers should call [`Self::is_healthy`] during
103    /// recycle to skip the health-probe roundtrip for connections that
104    /// are already known-bad.
105    desynchronized: bool,
106}
107
108impl<S> RawConnection<S>
109where
110    S: Read + Write,
111{
112    /// Creates a new raw connection from a stream.
113    ///
114    /// Initializes read and write buffers with default capacity (64 KB each).
115    /// The connection is not yet authenticated - call `startup()` to begin
116    /// the connection handshake.
117    ///
118    /// # Arguments
119    ///
120    /// * `stream` - The I/O stream (must implement `Read + Write`)
121    pub fn new(stream: S) -> Self {
122        RawConnection {
123            stream,
124            read_buf: BytesMut::with_capacity(64 * 1024),
125            write_buf: BytesMut::with_capacity(64 * 1024),
126            process_id: 0,
127            secret_key: 0,
128            server_params: std::collections::HashMap::new(),
129            desynchronized: false,
130        }
131    }
132
133    /// Returns `true` if this connection is still in a known-good state
134    /// and safe to use for new requests.
135    ///
136    /// Once the wire protocol falls out of sync with the server (see the
137    /// `desynchronized` field on [`RawConnection`]), this returns `false`
138    /// permanently — the only recovery is to drop this connection and
139    /// open a new one. Pool implementations should consult this before
140    /// running a recycle health probe to avoid spending a roundtrip on a
141    /// connection that is already known to be bad.
142    pub fn is_healthy(&self) -> bool {
143        !self.desynchronized
144    }
145
146    /// Fast-fails with an explicit [`ErrorKind::Connection`] error if the
147    /// wire has fallen out of sync with the server, before any bytes are
148    /// written to the stream. Called from the entry point of every public
149    /// method that initiates a new server request — simple queries,
150    /// streaming queries, prepared statement execution, COPY in/out,
151    /// etc. — so that a desynchronized connection produces a clear
152    /// "connection unusable" diagnostic at the API boundary rather than a
153    /// cryptic protocol-parse error deep inside the message loop of the
154    /// *next* unrelated operation.
155    pub(crate) fn ensure_healthy(&self) -> Result<()> {
156        if self.desynchronized {
157            return Err(Error::new(
158                crate::client::error::ErrorKind::Connection,
159                "connection is desynchronized from the server and cannot be reused; \
160                 discard it and open a new one",
161            ));
162        }
163        Ok(())
164    }
165
166    /// Reserves capacity in the write buffer to avoid reallocations.
167    ///
168    /// Call this before bulk operations to pre-allocate buffer space.
169    /// This is useful for high-throughput scenarios where buffer growth
170    /// would cause performance overhead.
171    pub fn reserve_write_buffer(&mut self, additional: usize) {
172        self.write_buf.reserve(additional);
173    }
174
175    /// Returns the process ID assigned by the server.
176    pub fn process_id(&self) -> i32 {
177        self.process_id
178    }
179
180    /// Returns the secret key for cancel requests.
181    pub fn secret_key(&self) -> i32 {
182        self.secret_key
183    }
184
185    /// Returns a reference to the underlying stream.
186    pub fn stream(&self) -> &S {
187        &self.stream
188    }
189
190    /// Returns a mutable reference to the underlying stream.
191    pub fn stream_mut(&mut self) -> &mut S {
192        &mut self.stream
193    }
194
195    /// Returns a server parameter value by name.
196    ///
197    /// Server parameters are sent by the server during connection startup.
198    /// Common parameters include:
199    /// - `server_version` - The server version string
200    /// - `server_encoding` - The server's character encoding
201    /// - `client_encoding` - The client's character encoding
202    pub fn parameter_status(&self, name: &str) -> Option<&str> {
203        self.server_params
204            .get(name)
205            .map(std::string::String::as_str)
206    }
207
208    /// Sends a startup message and performs initial handshake.
209    ///
210    /// # Errors
211    ///
212    /// - Returns [`Error`] (auth) when the server requests an
213    ///   auth method (cleartext, MD5, SASL) and no password is supplied,
214    ///   when the offered SASL mechanisms exclude SCRAM-SHA-256, or when
215    ///   SCRAM state is missing at the SASL-continue / SASL-final step.
216    /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
217    ///   during startup (for example, unknown user or database).
218    /// - Returns [`Error`] (protocol) if a message arrives out of the
219    ///   expected startup sequence.
220    /// - Returns [`Error`] (I/O) on wire-protocol read/write failure.
221    pub fn startup(&mut self, params: &[(&str, &str)], password: Option<&str>) -> Result<()> {
222        // Send startup message
223        frontend::startup_message(params, &mut self.write_buf)?;
224        self.flush()?;
225
226        // Handle authentication
227        let mut auth_state: Option<AuthState> = None;
228
229        loop {
230            let msg = self.read_message()?;
231            match msg {
232                Message::AuthenticationOk => {
233                    info!(target: "hyperdb_api", "connection-auth-success");
234                }
235                Message::AuthenticationCleartextPassword => {
236                    debug!(target: "hyperdb_api", method = "cleartext", "connection-auth-method");
237                    let password = password.ok_or_else(|| {
238                        Error::authentication(
239                            "server requested cleartext password but none provided",
240                        )
241                    })?;
242                    frontend::password_message(password, &mut self.write_buf)?;
243                    self.flush()?;
244                }
245                Message::AuthenticationMd5Password(body) => {
246                    debug!(target: "hyperdb_api", method = "MD5", "connection-auth-method");
247                    let password = password.ok_or_else(|| {
248                        Error::authentication("server requested MD5 password but none provided")
249                    })?;
250                    let user = params
251                        .iter()
252                        .find(|(k, _)| *k == "user")
253                        .map_or("", |(_, v)| *v);
254
255                    let md5_response = auth::compute_md5_password(user, password, &body.salt());
256                    frontend::password_message(&md5_response, &mut self.write_buf)?;
257                    self.flush()?;
258                }
259                Message::AuthenticationSasl(body) => {
260                    debug!(target: "hyperdb_api", method = "SCRAM-SHA-256", "connection-auth-method");
261                    let password = password.ok_or_else(|| {
262                        Error::authentication(
263                            "server requested SASL authentication but no password provided",
264                        )
265                    })?;
266
267                    // Check for SCRAM-SHA-256
268                    let mechanisms: Vec<&str> = body.mechanisms().collect();
269                    if !mechanisms.contains(&"SCRAM-SHA-256") {
270                        return Err(Error::authentication(format!(
271                            "server offered unsupported SASL mechanisms: {mechanisms:?}"
272                        )));
273                    }
274
275                    // Start SCRAM-SHA-256 exchange
276                    let (state, client_first) = auth::scram_client_first(password)?;
277                    auth_state = Some(state);
278
279                    frontend::sasl_initial_response(
280                        "SCRAM-SHA-256",
281                        &client_first,
282                        &mut self.write_buf,
283                    )?;
284                    self.flush()?;
285                }
286                Message::AuthenticationSaslContinue(body) => {
287                    let state = auth_state.take().ok_or_else(|| {
288                        Error::authentication("received SASL continue without initial state")
289                    })?;
290
291                    let server_first = body.data();
292                    let (new_state, client_final) = auth::scram_client_final(state, server_first)?;
293                    auth_state = Some(new_state);
294
295                    frontend::sasl_response(&client_final, &mut self.write_buf)?;
296                    self.flush()?;
297                }
298                Message::AuthenticationSaslFinal(body) => {
299                    let state = auth_state.take().ok_or_else(|| {
300                        Error::authentication("received SASL final without state")
301                    })?;
302
303                    // Verify server signature
304                    auth::scram_verify_server(state, body.data())?;
305                }
306                Message::BackendKeyData(data) => {
307                    self.process_id = data.process_id();
308                    self.secret_key = data.secret_key();
309                }
310                Message::ParameterStatus(body) => {
311                    // Store server parameters
312                    if let (Ok(name), Ok(value)) = (body.name(), body.value()) {
313                        self.server_params
314                            .insert(name.to_string(), value.to_string());
315                    }
316                }
317                Message::ReadyForQuery(_) => {
318                    // Connection is ready
319                    return Ok(());
320                }
321                Message::ErrorResponse(body) => {
322                    // Startup typically fails the connection outright, but we
323                    // still drain in case the server sent any trailing
324                    // messages so parity with the async startup path is
325                    // preserved. The drain is bounded implicitly: a post-
326                    // startup-error server either sends ReadyForQuery or
327                    // closes immediately, so drain_until_ready returns fast.
328                    return Err(self.consume_error(&body));
329                }
330                _ => {
331                    return Err(Error::protocol("unexpected message during startup"));
332                }
333            }
334        }
335    }
336
337    /// Sends a simple query and returns all messages until `ReadyForQuery`.
338    ///
339    /// # Errors
340    ///
341    /// - Returns [`Error`] (server) when the server sends an `ErrorResponse`
342    ///   (SQL error, constraint violation, etc.).
343    /// - Returns [`Error`] (I/O) on transport read/write failure.
344    /// - Returns [`Error`] (closed) if the server closes the connection
345    ///   mid-query.
346    /// - Returns [`Error`] (connection) if the connection has already
347    ///   been marked unhealthy by a prior failure.
348    pub fn simple_query(&mut self, query: &str) -> Result<Vec<Message>> {
349        self.ensure_healthy()?;
350        frontend::query(query, &mut self.write_buf)?;
351        self.flush()?;
352
353        let mut messages = Vec::new();
354        loop {
355            let msg = self.read_message()?;
356            match &msg {
357                Message::ReadyForQuery(_) => {
358                    messages.push(msg);
359                    return Ok(messages);
360                }
361                Message::ErrorResponse(body) => {
362                    return Err(self.consume_error(body));
363                }
364                _ => {
365                    messages.push(msg);
366                }
367            }
368        }
369    }
370
371    /// Sends a query using extended protocol with binary format results.
372    ///
373    /// This uses the `PostgreSQL` extended query protocol (Parse/Bind/Execute/Sync)
374    /// with `HyperBinary` format (format code 2) for maximum performance.
375    ///
376    /// Returns all messages until `ReadyForQuery`.
377    ///
378    /// # Errors
379    ///
380    /// Same failure modes as [`Self::simple_query`] — server-side SQL
381    /// errors surface as [`Error`] (server), transport failures as
382    /// [`Error`] (I/O) / [`Error`] (closed), and an unhealthy prior state
383    /// as [`Error`] (connection).
384    pub fn query_binary(&mut self, query: &str) -> Result<Vec<Message>> {
385        self.ensure_healthy()?;
386        // HyperBinary format code
387        const HYPER_BINARY_FORMAT: i16 = 2;
388
389        // Parse: prepare an unnamed statement
390        frontend::parse("", query, &[], &mut self.write_buf)?;
391
392        // Bind: bind unnamed portal with HyperBinary result format
393        // Empty arrays for param_formats and params (no parameters)
394        // Single result_format of 2 (HyperBinary) applies to all columns
395        frontend::bind(
396            "",
397            "",
398            &[],
399            &[],
400            &[HYPER_BINARY_FORMAT],
401            &mut self.write_buf,
402        )?;
403
404        // Describe: get column metadata (optional but useful)
405        frontend::describe(b'P', "", &mut self.write_buf)?;
406
407        // Execute: run the unnamed portal with no row limit
408        frontend::execute("", 0, &mut self.write_buf)?;
409
410        // Sync: end the extended query sequence
411        frontend::sync(&mut self.write_buf);
412
413        self.flush()?;
414
415        let mut messages = Vec::new();
416        loop {
417            let msg = self.read_message()?;
418            match &msg {
419                Message::ReadyForQuery(_) => {
420                    messages.push(msg);
421                    return Ok(messages);
422                }
423                Message::ErrorResponse(body) => {
424                    return Err(self.consume_error(body));
425                }
426                _ => {
427                    messages.push(msg);
428                }
429            }
430        }
431    }
432
433    /// Starts a binary query but leaves result consumption to the caller.
434    ///
435    /// This is useful for streaming scenarios where you want to pull messages
436    /// incrementally instead of materializing the full result set up front.
437    ///
438    /// # Errors
439    ///
440    /// - Returns [`Error`] (connection) if the connection has been
441    ///   marked unhealthy.
442    /// - Returns [`Error`] (I/O) if writing the Parse/Bind/Execute/Sync
443    ///   sequence to the transport fails.
444    pub fn start_query_binary(&mut self, query: &str) -> Result<()> {
445        self.ensure_healthy()?;
446        const HYPER_BINARY_FORMAT: i16 = 2;
447
448        frontend::parse("", query, &[], &mut self.write_buf)?;
449        frontend::bind(
450            "",
451            "",
452            &[],
453            &[],
454            &[HYPER_BINARY_FORMAT],
455            &mut self.write_buf,
456        )?;
457        frontend::describe(b'P', "", &mut self.write_buf)?;
458        frontend::execute("", 0, &mut self.write_buf)?;
459        frontend::sync(&mut self.write_buf);
460
461        self.flush()
462    }
463
464    /// Starts a simple query but leaves result consumption to the caller.
465    ///
466    /// This is useful for streaming scenarios where you want to pull messages
467    /// incrementally instead of materializing the full result set up front.
468    ///
469    /// # Errors
470    ///
471    /// Same failure modes as [`Self::start_query_binary`] —
472    /// [`Error`] (connection) for unhealthy state, [`Error`] (I/O) for
473    /// transport failure.
474    pub fn start_simple_query(&mut self, query: &str) -> Result<()> {
475        self.ensure_healthy()?;
476        frontend::query(query, &mut self.write_buf)?;
477        self.flush()
478    }
479
480    /// Starts an **execute** of a prepared statement but leaves result
481    /// consumption to the caller.
482    ///
483    /// Sends `Bind` + `Execute(unnamed_portal, 0)` + `Sync`, then
484    /// returns. The caller drives a message loop that reads
485    /// `BindComplete`, any `DataRow`s, then `CommandComplete` +
486    /// `ReadyForQuery` — the same shape used by
487    /// [`Self::start_query_binary`].
488    ///
489    /// Format codes (`PostgreSQL` wire protocol):
490    /// - **Parameters**: format `1` (standard PG binary, big-endian).
491    ///   Hyper's server-side Bind decodes bound parameters as standard
492    ///   PG binary regardless of the format code we advertise. The
493    ///   caller is responsible for supplying parameter bytes in BE.
494    /// - **Results**: format `2` (`HyperBinary`, little-endian). Hyper
495    ///   supports this as a separate protocol extension; the row
496    ///   decoders in [`super::row::StreamRow`] and the hyperdb-api `Row`
497    ///   type all expect LE, so requesting LE at Bind time avoids an
498    ///   extra conversion pass.
499    ///
500    /// `max_rows = 0` means "send all rows" — we pace on the client side
501    /// by reading `DataRows` in chunks from the read buffer.
502    ///
503    /// # Errors
504    ///
505    /// - Returns [`Error`] (connection) if the connection has been
506    ///   marked unhealthy.
507    /// - Returns [`Error`] (I/O) if writing the Bind/Execute/Sync
508    ///   sequence to the transport fails.
509    pub fn start_execute_prepared(
510        &mut self,
511        statement_name: &str,
512        params: &[Option<&[u8]>],
513        column_count: usize,
514    ) -> Result<()> {
515        self.ensure_healthy()?;
516
517        const PG_BINARY_FORMAT: i16 = 1;
518        const HYPER_BINARY_FORMAT: i16 = 2;
519        let param_formats: Vec<i16> = vec![PG_BINARY_FORMAT; params.len()];
520        let result_formats: Vec<i16> = vec![HYPER_BINARY_FORMAT; column_count];
521
522        frontend::bind(
523            "", // unnamed portal
524            statement_name,
525            &param_formats,
526            params,
527            &result_formats,
528            &mut self.write_buf,
529        )?;
530        frontend::execute("", 0, &mut self.write_buf)?;
531        frontend::sync(&mut self.write_buf);
532
533        self.flush()
534    }
535
536    /// Reads a single message from the server.
537    ///
538    /// # Errors
539    ///
540    /// - Returns [`Error`] (I/O) if reading from the transport fails or
541    ///   if [`Message::parse`] reports a malformed frame.
542    /// - Returns [`Error`] (closed) when the transport reaches EOF
543    ///   (server closed the connection).
544    pub fn read_message(&mut self) -> Result<Message> {
545        loop {
546            if let Some(msg) = Message::parse(&mut self.read_buf).map_err(Error::io)? {
547                return Ok(msg);
548            }
549
550            // Need more data — read directly into the spare capacity of
551            // `read_buf`, no temporary buffer or `extend_from_slice` memcpy.
552            //
553            // The 64 KiB read-window matches the typical TCP loopback
554            // segment size and the default DataRow streaming chunk. On
555            // Windows TCP loopback the per-`WSARecv` overhead is several
556            // times higher than Linux/macOS `recv`, so a tight ceiling here
557            // dominates wall time on long scans. The previous 8 KiB ceiling
558            // forced an 8× syscall amplification on this path.
559            //
560            // Implementation: `resize` extends the buffer with zeroed bytes
561            // (single memset, ~50 GB/s on modern CPUs), `read` writes into
562            // the new tail, then `truncate` shrinks to the actual byte
563            // count. This is safe Rust and results in exactly one memset
564            // per syscall — no heap alloc, no extra memcpy.
565            let prev_len = self.read_buf.len();
566            self.read_buf.resize(prev_len + 64 * 1024, 0);
567            let n = self.stream.read(&mut self.read_buf[prev_len..])?;
568            if n == 0 {
569                self.read_buf.truncate(prev_len);
570                warn!(target: "hyperdb_api", "connection-closed");
571                return Err(Error::closed());
572            }
573            self.read_buf.truncate(prev_len + n);
574        }
575    }
576
577    /// Drains messages from the server until a [`Message::ReadyForQuery`] is
578    /// seen, discarding them. Call this after receiving an
579    /// [`Message::ErrorResponse`] to stay in sync with the wire protocol.
580    ///
581    /// Per the `PostgreSQL` wire protocol, every query (simple or extended) ends
582    /// with `ReadyForQuery`, even if the statement failed. Without this drain,
583    /// the `ReadyForQuery` (and any other trailing messages) remain in the
584    /// read buffer and get consumed by the next operation's response parser,
585    /// which misinterprets them — classic wire desync.
586    ///
587    /// This is the **unbounded** variant, safe to use in the standard
588    /// error path where the server has already sent `ErrorResponse` and
589    /// `ReadyForQuery` is guaranteed to arrive within a few messages. In
590    /// exceptional cases where the drain might take arbitrarily long —
591    /// most notably the `Drop` path for a streaming result that the caller
592    /// abandoned mid-way — prefer
593    /// [`drain_until_ready_bounded`](Self::drain_until_ready_bounded) to
594    /// avoid blocking indefinitely on an unresponsive server.
595    ///
596    /// Drain errors (connection already closed, I/O failure mid-drain) are
597    /// logged via `tracing::warn!` and then swallowed. The caller's original
598    /// error is more informative to surface, and a dead connection will be
599    /// reported on the next real operation anyway.
600    pub fn drain_until_ready(&mut self) {
601        let _ = self.drain_until_ready_bounded(usize::MAX);
602    }
603
604    /// Bounded version of [`drain_until_ready`](Self::drain_until_ready) that
605    /// stops after reading at most `max_messages` messages. Returns `true`
606    /// when `ReadyForQuery` was observed within that budget; `false` if the
607    /// budget was exhausted first or an I/O error occurred before reaching it.
608    ///
609    /// # Why we do not send `Sync` before draining
610    ///
611    /// A natural question is whether to send a `Sync` message first to prompt
612    /// the server to emit `ReadyForQuery` sooner. The answer for Hyper is
613    /// **no** — it would actively corrupt the next query.
614    ///
615    /// Per the Hyper server state machine (see `LibpqConnection::handleSync`
616    /// and `handleQueryDone`), every query — simple or extended — already
617    /// ends with exactly one `ReadyForQuery` emission. After an error or
618    /// normal completion the server returns to its main loop. If we then
619    /// send a `Sync`, `handleSync` would emit an **additional**
620    /// `ReadyForQuery` that no current operation is reading, and the next
621    /// query's response parser would consume that stale `ReadyForQuery`
622    /// as its own terminator — the symptom is that query returning an
623    /// empty result with "Query returned no rows".
624    ///
625    /// For the abandoned-stream case (a long-running query that the client
626    /// stopped reading), `Sync` also does not help: Hyper processes the
627    /// incoming byte stream in order, so `Sync` is only handled *after*
628    /// the in-flight `Execute` finishes emitting all its `DataRow`s plus
629    /// its own `CommandComplete` and `ReadyForQuery`. By that point the
630    /// drain has already reached `ReadyForQuery`, and the `Sync` produces
631    /// the same extra `ReadyForQuery` contamination described above.
632    ///
633    /// The canonical way to abort a running query is to open a *separate*
634    /// connection and send `CancelRequest` with the original connection's
635    /// process id and secret. That is exactly what
636    /// [`QueryStream`](super::client::QueryStream)'s `Drop` impl does
637    /// (via the [`Cancellable`](super::cancel::Cancellable) trait)
638    /// before calling this bounded drain. Cancel-then-drain converges on
639    /// `ReadyForQuery` within a handful of messages because the server
640    /// stops producing new `DataRow`s once it observes the cancel.
641    ///
642    /// # Poisoned connections
643    ///
644    /// When this returns `false` the connection is in an indeterminate
645    /// state. Callers should treat it as poisoned and not return it to a
646    /// connection pool — the next operation will see residual bytes from
647    /// whatever was still streaming. The bounded variant exists precisely
648    /// to prevent indefinite blocking in contexts like `Drop` impls where
649    /// we don't own the thread's time and can't afford to wait for a
650    /// multi-million-row query result to finish before returning from a
651    /// destructor.
652    ///
653    /// All drain errors are logged via `tracing::warn!` so state-related
654    /// issues are observable in logs even though they don't interrupt the
655    /// caller's control flow.
656    pub fn drain_until_ready_bounded(&mut self, max_messages: usize) -> bool {
657        for i in 0..max_messages {
658            match self.read_message() {
659                Ok(Message::ReadyForQuery(_)) => return true,
660                Ok(_) => {}
661                Err(e) => {
662                    warn!(
663                        target: "hyperdb_api_core::client",
664                        error = %e,
665                        messages_read = i,
666                        "drain_until_ready: read error mid-drain (likely closed connection); \
667                         connection marked desynchronized",
668                    );
669                    // Whether the underlying error is a closed socket, a
670                    // partial read, or a corrupt frame, any subsequent
671                    // `read_message` on this connection is operating on
672                    // unknown state. Mark it so pool layers and upper APIs
673                    // can short-circuit instead of piling another failed
674                    // operation on top.
675                    self.desynchronized = true;
676                    return false;
677                }
678            }
679        }
680        warn!(
681            target: "hyperdb_api_core::client",
682            max_messages,
683            "drain_until_ready_bounded: exhausted budget without seeing ReadyForQuery; \
684             connection marked desynchronized and should not be reused",
685        );
686        // Budget exhausted — residual messages still on the wire. The
687        // next read on this connection will almost certainly misparse
688        // them as belonging to an unrelated operation. Mark it so the
689        // failure surfaces at a well-defined API boundary instead.
690        self.desynchronized = true;
691        false
692    }
693
694    /// Convenience: parse a server [`Message::ErrorResponse`] body into an
695    /// [`Error`] and drain the rest of the response through the trailing
696    /// [`Message::ReadyForQuery`] so the connection is safe to reuse.
697    ///
698    /// Callers should almost always prefer this over calling
699    /// [`drain_until_ready`](Self::drain_until_ready) or
700    /// [`drain_until_ready_bounded`](Self::drain_until_ready_bounded) by
701    /// hand, because forgetting the drain is exactly the bug it exists to
702    /// prevent.
703    ///
704    /// # Drain budget
705    ///
706    /// Uses a bounded drain with a [`POST_ERROR_DRAIN_CAP`]-message budget
707    /// rather than the unbounded [`drain_until_ready`](Self::drain_until_ready).
708    /// A well-behaved server emits only a handful of messages after
709    /// `ErrorResponse` before `ReadyForQuery` — typically just the error
710    /// itself plus the `Z`, occasionally with a few `NoticeResponse`
711    /// messages interleaved — so the cap is orders of magnitude above
712    /// anything a legitimate error path produces. The cap exists purely
713    /// as a defensive safety valve against pathological server behavior
714    /// (a broken backend that never emits `ReadyForQuery`) and
715    /// misbehaved network paths (stalled reads that would otherwise hang
716    /// the caller indefinitely, particularly visible in async contexts).
717    ///
718    /// If the cap is exceeded, `drain_until_ready_bounded` logs a
719    /// `tracing::warn!` and marks the connection desynchronized; the
720    /// next operation on it will surface a transport-level failure and
721    /// trigger reconnect higher up. That is strictly better than
722    /// blocking forever with no observable symptom.
723    ///
724    /// # Example
725    ///
726    /// ```ignore
727    /// match msg {
728    ///     Message::ErrorResponse(body) => {
729    ///         return Err(self.consume_error(&body));
730    ///     }
731    ///     // ...
732    /// }
733    /// ```
734    pub fn consume_error(
735        &mut self,
736        body: &crate::protocol::message::backend::ErrorResponseBody,
737    ) -> Error {
738        let err = parse_error_response(body);
739        let _ = self.drain_until_ready_bounded(POST_ERROR_DRAIN_CAP);
740        err
741    }
742
743    /// Flushes the write buffer to the server.
744    ///
745    /// # Errors
746    ///
747    /// Returns [`Error`] (I/O) if writing the buffered bytes or flushing
748    /// the underlying transport fails.
749    pub fn flush(&mut self) -> Result<()> {
750        if !self.write_buf.is_empty() {
751            self.stream.write_all(&self.write_buf)?;
752            self.stream.flush()?;
753            self.write_buf.clear();
754        }
755        Ok(())
756    }
757
758    /// Sends a terminate message and closes the connection.
759    ///
760    /// # Errors
761    ///
762    /// Returns [`Error`] (I/O) if writing the `Terminate` frame or
763    /// flushing the transport fails.
764    pub fn terminate(&mut self) -> Result<()> {
765        frontend::terminate(&mut self.write_buf);
766        self.flush()
767    }
768
769    /// Returns a mutable reference to the write buffer.
770    pub fn write_buf(&mut self) -> &mut BytesMut {
771        &mut self.write_buf
772    }
773
774    /// Initiates a COPY IN operation with `HyperBinary` format.
775    ///
776    /// This sends a COPY ... FROM STDIN query and waits for `CopyInResponse`.
777    /// After this returns successfully, the caller should send data using
778    /// `send_copy_data` and then call `finish_copy` or `cancel_copy`.
779    ///
780    /// # Errors
781    ///
782    /// Same failure modes as [`Self::start_copy_in_with_format`].
783    pub fn start_copy_in(&mut self, table_name: &str, columns: &[&str]) -> Result<()> {
784        self.start_copy_in_with_format(table_name, columns, "HYPERBINARY")
785    }
786
787    /// Initiates a COPY IN operation with a specified format.
788    ///
789    /// This sends a COPY ... FROM STDIN query and waits for `CopyInResponse`.
790    /// After this returns successfully, the caller should send data using
791    /// `send_copy_data` and then call `finish_copy` or `cancel_copy`.
792    ///
793    /// # Arguments
794    ///
795    /// * `table_name` - The target table name (should be properly quoted if needed)
796    /// * `columns` - Column names to insert into
797    /// * `format` - The data format string: "HYPERBINARY" or "ARROWSTREAM"
798    ///
799    /// # Example
800    ///
801    /// ```no_run
802    /// # use hyperdb_api_core::client::connection::RawConnection;
803    /// # use std::net::TcpStream;
804    /// # fn example(conn: &mut RawConnection<TcpStream>) -> hyperdb_api_core::client::Result<()> {
805    /// // For HyperBinary format (default)
806    /// conn.start_copy_in_with_format("my_table", &["col1", "col2"], "HYPERBINARY")?;
807    ///
808    /// // For Arrow IPC stream format
809    /// conn.start_copy_in_with_format("my_table", &["col1", "col2"], "ARROWSTREAM")?;
810    /// # Ok(())
811    /// # }
812    /// ```
813    ///
814    /// # Errors
815    ///
816    /// - Returns [`Error`] (connection) if the connection has been
817    ///   marked unhealthy by a prior failure.
818    /// - Returns [`Error`] (server) if the server rejects the generated
819    ///   `COPY ... FROM STDIN` statement (missing table, column
820    ///   mismatch, etc.).
821    /// - Returns [`Error`] (I/O) on wire-protocol I/O failure.
822    pub fn start_copy_in_with_format(
823        &mut self,
824        table_name: &str,
825        columns: &[&str],
826        format: &str,
827    ) -> Result<()> {
828        self.ensure_healthy()?;
829        // Build COPY command with specified format
830        let column_list = if columns.is_empty() {
831            String::new()
832        } else {
833            format!(
834                " ({})",
835                columns
836                    .iter()
837                    .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
838                    .collect::<Vec<_>>()
839                    .join(", ")
840            )
841        };
842
843        let query = format!("COPY {table_name}{column_list} FROM STDIN WITH (FORMAT {format})");
844
845        frontend::query(&query, &mut self.write_buf)?;
846        self.flush()?;
847
848        // Wait for CopyInResponse
849        loop {
850            let msg = self.read_message()?;
851            match msg {
852                Message::CopyInResponse(_) => {
853                    // Ready to receive data
854                    return Ok(());
855                }
856                Message::ErrorResponse(body) => {
857                    return Err(self.consume_error(&body));
858                }
859                _ => {
860                    // Ignore other messages (like NoticeResponse)
861                }
862            }
863        }
864    }
865
866    /// Initiates a COPY IN operation from a raw SQL query string.
867    ///
868    /// The query must be a complete `COPY ... FROM STDIN ...` statement.
869    ///
870    /// # Errors
871    ///
872    /// Same failure modes as [`Self::start_copy_in_with_format`]: unhealthy
873    /// connection, server-side SQL rejection, or transport I/O failure.
874    pub fn start_copy_in_raw(&mut self, query: &str) -> Result<()> {
875        self.ensure_healthy()?;
876        frontend::query(query, &mut self.write_buf)?;
877        self.flush()?;
878
879        loop {
880            let msg = self.read_message()?;
881            match msg {
882                Message::CopyInResponse(_) => {
883                    return Ok(());
884                }
885                Message::ErrorResponse(body) => {
886                    return Err(self.consume_error(&body));
887                }
888                _ => {}
889            }
890        }
891    }
892
893    /// Sends COPY data to the server.
894    ///
895    /// The data should be in `HyperBinary` format.
896    ///
897    /// # Errors
898    ///
899    /// Currently infallible — frame construction is pure. The `Result`
900    /// return type is preserved for forward compatibility.
901    pub fn send_copy_data(&mut self, data: &[u8]) -> Result<()> {
902        frontend::copy_data(data, &mut self.write_buf);
903        // Don't flush immediately for better batching
904        // Caller can call flush() explicitly if needed
905        Ok(())
906    }
907
908    /// Sends COPY data directly to the stream without internal buffering.
909    ///
910    /// This writes the `CopyData` message directly to the TCP stream, letting
911    /// the kernel's TCP stack handle buffering. Use `flush_stream()` periodically
912    /// to ensure data is sent.
913    ///
914    /// This is more efficient for streaming large amounts of data as it avoids
915    /// copying data into an intermediate buffer.
916    ///
917    /// # Errors
918    ///
919    /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
920    ///   `u32::MAX` (the PostgreSQL per-message length cap).
921    /// - Returns [`Error`] (I/O) if flushing buffered bytes or writing
922    ///   the header/payload directly to the transport fails.
923    pub fn send_copy_data_direct(&mut self, data: &[u8]) -> Result<()> {
924        // First flush any pending buffered data
925        if !self.write_buf.is_empty() {
926            self.stream.write_all(&self.write_buf)?;
927            self.write_buf.clear();
928        }
929
930        // Write CopyData message header + data directly to stream
931        // Message format: 'd' (1 byte) + length (4 bytes BigEndian) + data
932        let msg_len = u32::try_from(4 + data.len())
933            .map_err(|_| Error::protocol("CopyData payload exceeds u32::MAX bytes"))?;
934        let len_be = msg_len.to_be_bytes();
935        let header = [b'd', len_be[0], len_be[1], len_be[2], len_be[3]];
936        self.stream.write_all(&header)?;
937        self.stream.write_all(data)?;
938        Ok(())
939    }
940
941    /// Flushes the TCP stream without clearing the write buffer.
942    ///
943    /// Use this with `send_copy_data_direct()` to periodically ensure
944    /// data is sent to the server.
945    ///
946    /// # Errors
947    ///
948    /// Returns [`Error`] (I/O) if flushing the underlying transport
949    /// fails.
950    pub fn flush_stream(&mut self) -> Result<()> {
951        self.stream.flush()?;
952        Ok(())
953    }
954
955    /// Finishes a COPY IN operation successfully.
956    ///
957    /// This sends `CopyDone` and waits for `CommandComplete`.
958    /// Returns the number of rows inserted.
959    ///
960    /// # Errors
961    ///
962    /// - Returns [`Error`] (server) if the server emits an `ErrorResponse`
963    ///   during finalization (e.g. constraint violation from the
964    ///   accumulated rows).
965    /// - Returns [`Error`] (I/O) on wire-protocol read/write failure.
966    pub fn finish_copy(&mut self) -> Result<u64> {
967        // Ensure all data is sent
968        self.flush()?;
969
970        // Send CopyDone
971        frontend::copy_done(&mut self.write_buf);
972        self.flush()?;
973
974        // Wait for CommandComplete and ReadyForQuery
975        let mut row_count = 0u64;
976        loop {
977            let msg = self.read_message()?;
978            match msg {
979                Message::CommandComplete(body) => {
980                    if let Ok(tag) = body.tag() {
981                        // Parse row count from tag like "COPY 1234"
982                        if let Some(count_str) = tag.strip_prefix("COPY ") {
983                            if let Ok(count) = count_str.trim().parse() {
984                                row_count = count;
985                            }
986                        }
987                    }
988                }
989                Message::ReadyForQuery(_) => {
990                    return Ok(row_count);
991                }
992                Message::ErrorResponse(body) => {
993                    return Err(self.consume_error(&body));
994                }
995                _ => {
996                    // Ignore other messages
997                }
998            }
999        }
1000    }
1001
1002    /// Cancels a COPY IN operation.
1003    ///
1004    /// This sends `CopyFail` and waits for the error response.
1005    ///
1006    /// # Errors
1007    ///
1008    /// Returns [`Error`] (I/O) if flushing the buffer or writing the
1009    /// `CopyFail` frame fails, or [`Error`] (closed) if the server
1010    /// drops the connection before returning `ReadyForQuery`.
1011    pub fn cancel_copy(&mut self, reason: &str) -> Result<()> {
1012        // Ensure buffer is clear
1013        self.flush()?;
1014
1015        // Send CopyFail
1016        frontend::copy_fail(reason, &mut self.write_buf);
1017        self.flush()?;
1018
1019        // Wait for ErrorResponse and ReadyForQuery
1020        loop {
1021            let msg = self.read_message()?;
1022            match msg {
1023                Message::ReadyForQuery(_) => {
1024                    return Ok(());
1025                }
1026                Message::ErrorResponse(_) => {
1027                    // Expected - the server confirms the cancel
1028                }
1029                _ => {
1030                    // Ignore other messages
1031                }
1032            }
1033        }
1034    }
1035
1036    /// Executes a COPY ... TO STDOUT query and returns all output data.
1037    ///
1038    /// This is used for queries like:
1039    /// `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
1040    ///
1041    /// The method:
1042    /// 1. Sends the query
1043    /// 2. Waits for `CopyOutResponse`
1044    /// 3. Collects all `CopyData` messages
1045    /// 4. Waits for `CopyDone`, `CommandComplete`, and `ReadyForQuery`
1046    ///
1047    /// # Arguments
1048    ///
1049    /// * `query` - The COPY TO STDOUT query to execute
1050    ///
1051    /// # Returns
1052    ///
1053    /// The raw bytes from all `CopyData` messages concatenated together.
1054    ///
1055    /// # Example
1056    ///
1057    /// ```no_run
1058    /// # use hyperdb_api_core::client::connection::RawConnection;
1059    /// # use std::net::TcpStream;
1060    /// # fn example(conn: &mut RawConnection<TcpStream>) -> hyperdb_api_core::client::Result<()> {
1061    /// let arrow_data = conn.copy_out(
1062    ///     "COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
1063    /// )?;
1064    /// # Ok(())
1065    /// # }
1066    /// ```
1067    ///
1068    /// # Errors
1069    ///
1070    /// - Returns [`Error`] (connection) if the connection has been
1071    ///   marked unhealthy.
1072    /// - Returns [`Error`] (server) when the server rejects the COPY TO
1073    ///   STDOUT statement via `ErrorResponse`.
1074    /// - Returns [`Error`] (I/O) / [`Error`] (closed) on transport
1075    ///   read/write failure.
1076    pub fn copy_out(&mut self, query: &str) -> Result<Vec<u8>> {
1077        self.ensure_healthy()?;
1078        // Send the query
1079        frontend::query(query, &mut self.write_buf)?;
1080        self.flush()?;
1081
1082        let mut data = Vec::new();
1083        let mut in_copy_out = false;
1084
1085        // Process messages
1086        loop {
1087            let msg = self.read_message()?;
1088            match msg {
1089                Message::CopyOutResponse(_) => {
1090                    // Server is ready to send COPY data
1091                    in_copy_out = true;
1092                }
1093                Message::CopyData(body) if in_copy_out => {
1094                    // Accumulate the copy data
1095                    data.extend_from_slice(body.data());
1096                }
1097                Message::CopyDone => {
1098                    // COPY data transfer complete
1099                    in_copy_out = false;
1100                }
1101                Message::CommandComplete(_) => {
1102                    // Command finished
1103                }
1104                Message::ReadyForQuery(_) => {
1105                    // Connection is ready for next command
1106                    return Ok(data);
1107                }
1108                Message::ErrorResponse(body) => {
1109                    return Err(self.consume_error(&body));
1110                }
1111                _ => {
1112                    // Ignore other messages (like NoticeResponse)
1113                }
1114            }
1115        }
1116    }
1117
1118    /// Streams COPY OUT data directly to a writer without buffering all data in memory.
1119    ///
1120    /// Returns the total number of bytes written.
1121    ///
1122    /// # Errors
1123    ///
1124    /// Same failure modes as [`Self::copy_out`], plus [`Error`] (I/O)
1125    /// wrapping any error from `writer.write_all` when the target
1126    /// writer cannot accept a COPY chunk.
1127    pub fn copy_out_to_writer(
1128        &mut self,
1129        query: &str,
1130        writer: &mut dyn std::io::Write,
1131    ) -> Result<u64> {
1132        self.ensure_healthy()?;
1133        frontend::query(query, &mut self.write_buf)?;
1134        self.flush()?;
1135
1136        let mut total_bytes: u64 = 0;
1137        let mut in_copy_out = false;
1138
1139        loop {
1140            let msg = self.read_message()?;
1141            match msg {
1142                Message::CopyOutResponse(_) => {
1143                    in_copy_out = true;
1144                }
1145                Message::CopyData(body) if in_copy_out => {
1146                    let chunk = body.data();
1147                    writer.write_all(chunk).map_err(|e| {
1148                        Error::new(
1149                            super::error::ErrorKind::Io,
1150                            format!("Failed to write COPY data: {e}"),
1151                        )
1152                    })?;
1153                    total_bytes += chunk.len() as u64;
1154                }
1155                Message::CopyDone => {
1156                    in_copy_out = false;
1157                }
1158                Message::CommandComplete(_) => {}
1159                Message::ReadyForQuery(_) => {
1160                    return Ok(total_bytes);
1161                }
1162                Message::ErrorResponse(body) => {
1163                    return Err(self.consume_error(&body));
1164                }
1165                _ => {}
1166            }
1167        }
1168    }
1169}
1170
1171/// Parses an error response into an Error.
1172pub(crate) fn parse_error_response(
1173    body: &crate::protocol::message::backend::ErrorResponseBody,
1174) -> Error {
1175    let mut severity = String::from("ERROR");
1176    let mut code = String::from("00000");
1177    let mut message = String::from("unknown error");
1178
1179    for field in body.fields().filter_map(|r| {
1180        r.map_err(|e| trace!(target: "hyperdb_api_core::client", error = %e, "dropped error parsing error response field")).ok()
1181    }) {
1182        match field.type_() {
1183            b'S' | b'V' => {
1184                if let Ok(s) = field.value() {
1185                    severity = s.to_string();
1186                }
1187            }
1188            b'C' => {
1189                if let Ok(s) = field.value() {
1190                    code = s.to_string();
1191                }
1192            }
1193            b'M' => {
1194                if let Ok(s) = field.value() {
1195                    message = s.to_string();
1196                }
1197            }
1198            _ => {}
1199        }
1200    }
1201
1202    Error::db(&severity, &code, &message)
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use super::*;
1208    use std::io::Cursor;
1209
1210    /// Minimal `Read + Write` harness. Discards all writes and hands back
1211    /// empty reads so we can construct a `RawConnection` without touching
1212    /// the network. Adequate for exercising pure-state logic like the
1213    /// `desynchronized` flag and `ensure_healthy` — not for anything that
1214    /// actually needs a live server response.
1215    struct NullStream;
1216    impl std::io::Read for NullStream {
1217        fn read(&mut self, _: &mut [u8]) -> std::io::Result<usize> {
1218            Ok(0)
1219        }
1220    }
1221    impl std::io::Write for NullStream {
1222        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1223            Ok(buf.len())
1224        }
1225        fn flush(&mut self) -> std::io::Result<()> {
1226            Ok(())
1227        }
1228    }
1229
1230    /// Fresh connections are healthy and `ensure_healthy` is a no-op.
1231    #[test]
1232    fn fresh_connection_is_healthy() {
1233        let conn = RawConnection::new(NullStream);
1234        assert!(conn.is_healthy());
1235        assert!(conn.ensure_healthy().is_ok());
1236    }
1237
1238    /// Once `desynchronized` is set, `is_healthy` reports false and
1239    /// `ensure_healthy` returns a `Connection`-kind error whose message
1240    /// explicitly names the desync so log consumers can grep for it.
1241    #[test]
1242    fn desynchronized_connection_fails_health_check() {
1243        let mut conn = RawConnection::new(NullStream);
1244        conn.desynchronized = true;
1245        assert!(!conn.is_healthy());
1246        let err = conn.ensure_healthy().expect_err("must fail-fast");
1247        assert_eq!(err.kind(), crate::client::error::ErrorKind::Connection);
1248        assert!(
1249            err.to_string().to_lowercase().contains("desynchron"),
1250            "error message should mention desynchronization; got: {err}",
1251        );
1252    }
1253
1254    /// `drain_until_ready_bounded` with budget `0` returns false without
1255    /// reading anything, and marks the connection desynchronized. This is
1256    /// the cheapest way to exercise the "budget exhausted" code path
1257    /// without a live protocol stream. Uses a `Cursor` over an empty
1258    /// buffer so the underlying stream is well-defined.
1259    #[test]
1260    fn zero_budget_drain_marks_desynchronized() {
1261        let mut conn = RawConnection::new(Cursor::new(Vec::<u8>::new()));
1262        assert!(conn.is_healthy());
1263        let ok = conn.drain_until_ready_bounded(0);
1264        assert!(!ok, "zero-budget drain must return false");
1265        assert!(
1266            !conn.is_healthy(),
1267            "drain failure must mark connection desynchronized",
1268        );
1269    }
1270
1271    /// Once desynchronized, the main public request APIs all fail-fast
1272    /// with the `ensure_healthy` error instead of sending bytes into a
1273    /// known-poisoned wire. Spot-check one sync query method here; the
1274    /// check itself (`self.ensure_healthy()?`) is a trivial first line
1275    /// at every entry point so extending coverage to every API wouldn't
1276    /// catch additional bug classes.
1277    #[test]
1278    fn desynchronized_connection_fast_fails_simple_query() {
1279        let mut conn = RawConnection::new(Cursor::new(Vec::<u8>::new()));
1280        conn.desynchronized = true;
1281        // `Message` doesn't implement `Debug`, so we can't use
1282        // `expect_err`; match the result directly instead.
1283        let Err(err) = conn.simple_query("SELECT 1") else {
1284            panic!("desynced simple_query must fail-fast")
1285        };
1286        assert_eq!(err.kind(), crate::client::error::ErrorKind::Connection);
1287        assert!(err.to_string().to_lowercase().contains("desynchron"));
1288    }
1289}