Skip to main content

RawConnection

Struct RawConnection 

Source
pub struct RawConnection<S> { /* private fields */ }
Expand description

A raw connection to a Hyper server.

This handles the low-level protocol communication, including:

  • Message framing (reading/writing PostgreSQL wire protocol messages)
  • Authentication handshake
  • Query execution (simple and extended query protocols)
  • COPY protocol support

The connection is generic over the stream type S, allowing it to work with different transport mechanisms (TCP, TLS, etc.) as long as they implement Read + Write.

§Buffering

The connection maintains separate read and write buffers for efficient I/O. Messages are buffered before being sent, and incoming data is buffered until complete messages can be parsed.

Implementations§

Source§

impl<S> RawConnection<S>
where S: Read + Write,

Source

pub fn new(stream: S) -> Self

Creates a new raw connection from a stream.

Initializes read and write buffers with default capacity (64 KB each). The connection is not yet authenticated - call startup() to begin the connection handshake.

§Arguments
  • stream - The I/O stream (must implement Read + Write)
Source

pub fn is_healthy(&self) -> bool

Returns true if this connection is still in a known-good state and safe to use for new requests.

Once the wire protocol falls out of sync with the server (see the desynchronized field on RawConnection), this returns false permanently — the only recovery is to drop this connection and open a new one. Pool implementations should consult this before running a recycle health probe to avoid spending a roundtrip on a connection that is already known to be bad.

Source

pub fn reserve_write_buffer(&mut self, additional: usize)

Reserves capacity in the write buffer to avoid reallocations.

Call this before bulk operations to pre-allocate buffer space. This is useful for high-throughput scenarios where buffer growth would cause performance overhead.

Source

pub fn process_id(&self) -> i32

Returns the process ID assigned by the server.

Source

pub fn secret_key(&self) -> i32

Returns the secret key for cancel requests.

Source

pub fn stream(&self) -> &S

Returns a reference to the underlying stream.

Source

pub fn stream_mut(&mut self) -> &mut S

Returns a mutable reference to the underlying stream.

Source

pub fn parameter_status(&self, name: &str) -> Option<&str>

Returns a server parameter value by name.

Server parameters are sent by the server during connection startup. Common parameters include:

  • server_version - The server version string
  • server_encoding - The server’s character encoding
  • client_encoding - The client’s character encoding
Source

pub fn startup( &mut self, params: &[(&str, &str)], password: Option<&str>, ) -> Result<()>

Sends a startup message and performs initial handshake.

§Errors
  • Returns Error (auth) when the server requests an auth method (cleartext, MD5, SASL) and no password is supplied, when the offered SASL mechanisms exclude SCRAM-SHA-256, or when SCRAM state is missing at the SASL-continue / SASL-final step.
  • Returns Error (server) when the server sends an ErrorResponse during startup (for example, unknown user or database).
  • Returns Error (protocol) if a message arrives out of the expected startup sequence.
  • Returns Error (I/O) on wire-protocol read/write failure.
Source

pub fn simple_query(&mut self, query: &str) -> Result<Vec<Message>>

Sends a simple query and returns all messages until ReadyForQuery.

§Errors
  • Returns Error (server) when the server sends an ErrorResponse (SQL error, constraint violation, etc.).
  • Returns Error (I/O) on transport read/write failure.
  • Returns Error (closed) if the server closes the connection mid-query.
  • Returns Error (connection) if the connection has already been marked unhealthy by a prior failure.
Source

pub fn query_binary(&mut self, query: &str) -> Result<Vec<Message>>

Sends a query using extended protocol with binary format results.

This uses the PostgreSQL extended query protocol (Parse/Bind/Execute/Sync) with HyperBinary format (format code 2) for maximum performance.

Returns all messages until ReadyForQuery.

§Errors

Same failure modes as Self::simple_query — server-side SQL errors surface as Error (server), transport failures as Error (I/O) / Error (closed), and an unhealthy prior state as Error (connection).

Source

pub fn start_query_binary(&mut self, query: &str) -> Result<()>

Starts a binary query but leaves result consumption to the caller.

This is useful for streaming scenarios where you want to pull messages incrementally instead of materializing the full result set up front.

§Errors
  • Returns Error (connection) if the connection has been marked unhealthy.
  • Returns Error (I/O) if writing the Parse/Bind/Execute/Sync sequence to the transport fails.
Source

pub fn start_simple_query(&mut self, query: &str) -> Result<()>

Starts a simple query but leaves result consumption to the caller.

This is useful for streaming scenarios where you want to pull messages incrementally instead of materializing the full result set up front.

§Errors

Same failure modes as Self::start_query_binaryError (connection) for unhealthy state, Error (I/O) for transport failure.

Source

pub fn start_execute_prepared( &mut self, statement_name: &str, params: &[Option<&[u8]>], column_count: usize, ) -> Result<()>

Starts an execute of a prepared statement but leaves result consumption to the caller.

Sends Bind + Execute(unnamed_portal, 0) + Sync, then returns. The caller drives a message loop that reads BindComplete, any DataRows, then CommandComplete + ReadyForQuery — the same shape used by Self::start_query_binary.

Format codes (PostgreSQL wire protocol):

  • Parameters: format 1 (standard PG binary, big-endian). Hyper’s server-side Bind decodes bound parameters as standard PG binary regardless of the format code we advertise. The caller is responsible for supplying parameter bytes in BE.
  • Results: format 2 (HyperBinary, little-endian). Hyper supports this as a separate protocol extension; the row decoders in super::row::StreamRow and the hyperdb-api Row type all expect LE, so requesting LE at Bind time avoids an extra conversion pass.

max_rows = 0 means “send all rows” — we pace on the client side by reading DataRows in chunks from the read buffer.

§Errors
  • Returns Error (connection) if the connection has been marked unhealthy.
  • Returns Error (I/O) if writing the Bind/Execute/Sync sequence to the transport fails.
Source

pub fn read_message(&mut self) -> Result<Message>

Reads a single message from the server.

§Errors
  • Returns Error (I/O) if reading from the transport fails or if Message::parse reports a malformed frame.
  • Returns Error (closed) when the transport reaches EOF (server closed the connection).
Source

pub fn drain_until_ready(&mut self)

Drains messages from the server until a Message::ReadyForQuery is seen, discarding them. Call this after receiving an Message::ErrorResponse to stay in sync with the wire protocol.

Per the PostgreSQL wire protocol, every query (simple or extended) ends with ReadyForQuery, even if the statement failed. Without this drain, the ReadyForQuery (and any other trailing messages) remain in the read buffer and get consumed by the next operation’s response parser, which misinterprets them — classic wire desync.

This is the unbounded variant, safe to use in the standard error path where the server has already sent ErrorResponse and ReadyForQuery is guaranteed to arrive within a few messages. In exceptional cases where the drain might take arbitrarily long — most notably the Drop path for a streaming result that the caller abandoned mid-way — prefer drain_until_ready_bounded to avoid blocking indefinitely on an unresponsive server.

Drain errors (connection already closed, I/O failure mid-drain) are logged via tracing::warn! and then swallowed. The caller’s original error is more informative to surface, and a dead connection will be reported on the next real operation anyway.

Source

pub fn drain_until_ready_bounded(&mut self, max_messages: usize) -> bool

Bounded version of drain_until_ready that stops after reading at most max_messages messages. Returns true when ReadyForQuery was observed within that budget; false if the budget was exhausted first or an I/O error occurred before reaching it.

§Why we do not send Sync before draining

A natural question is whether to send a Sync message first to prompt the server to emit ReadyForQuery sooner. The answer for Hyper is no — it would actively corrupt the next query.

Per the Hyper server state machine (see LibpqConnection::handleSync and handleQueryDone), every query — simple or extended — already ends with exactly one ReadyForQuery emission. After an error or normal completion the server returns to its main loop. If we then send a Sync, handleSync would emit an additional ReadyForQuery that no current operation is reading, and the next query’s response parser would consume that stale ReadyForQuery as its own terminator — the symptom is that query returning an empty result with “Query returned no rows”.

For the abandoned-stream case (a long-running query that the client stopped reading), Sync also does not help: Hyper processes the incoming byte stream in order, so Sync is only handled after the in-flight Execute finishes emitting all its DataRows plus its own CommandComplete and ReadyForQuery. By that point the drain has already reached ReadyForQuery, and the Sync produces the same extra ReadyForQuery contamination described above.

The canonical way to abort a running query is to open a separate connection and send CancelRequest with the original connection’s process id and secret. That is exactly what QueryStream’s Drop impl does (via the Cancellable trait) before calling this bounded drain. Cancel-then-drain converges on ReadyForQuery within a handful of messages because the server stops producing new DataRows once it observes the cancel.

§Poisoned connections

When this returns false the connection is in an indeterminate state. Callers should treat it as poisoned and not return it to a connection pool — the next operation will see residual bytes from whatever was still streaming. The bounded variant exists precisely to prevent indefinite blocking in contexts like Drop impls where we don’t own the thread’s time and can’t afford to wait for a multi-million-row query result to finish before returning from a destructor.

All drain errors are logged via tracing::warn! so state-related issues are observable in logs even though they don’t interrupt the caller’s control flow.

Source

pub fn consume_error(&mut self, body: &ErrorResponseBody) -> Error

Convenience: parse a server Message::ErrorResponse body into an Error and drain the rest of the response through the trailing Message::ReadyForQuery so the connection is safe to reuse.

Callers should almost always prefer this over calling drain_until_ready or drain_until_ready_bounded by hand, because forgetting the drain is exactly the bug it exists to prevent.

§Drain budget

Uses a bounded drain with a POST_ERROR_DRAIN_CAP-message budget rather than the unbounded drain_until_ready. A well-behaved server emits only a handful of messages after ErrorResponse before ReadyForQuery — typically just the error itself plus the Z, occasionally with a few NoticeResponse messages interleaved — so the cap is orders of magnitude above anything a legitimate error path produces. The cap exists purely as a defensive safety valve against pathological server behavior (a broken backend that never emits ReadyForQuery) and misbehaved network paths (stalled reads that would otherwise hang the caller indefinitely, particularly visible in async contexts).

If the cap is exceeded, drain_until_ready_bounded logs a tracing::warn! and marks the connection desynchronized; the next operation on it will surface a transport-level failure and trigger reconnect higher up. That is strictly better than blocking forever with no observable symptom.

§Example
match msg {
    Message::ErrorResponse(body) => {
        return Err(self.consume_error(&body));
    }
    // ...
}
Source

pub fn flush(&mut self) -> Result<()>

Flushes the write buffer to the server.

§Errors

Returns Error (I/O) if writing the buffered bytes or flushing the underlying transport fails.

Source

pub fn terminate(&mut self) -> Result<()>

Sends a terminate message and closes the connection.

§Errors

Returns Error (I/O) if writing the Terminate frame or flushing the transport fails.

Source

pub fn write_buf(&mut self) -> &mut BytesMut

Returns a mutable reference to the write buffer.

Source

pub fn start_copy_in( &mut self, table_name: &str, columns: &[&str], ) -> Result<()>

Initiates a COPY IN operation with HyperBinary format.

This sends a COPY … FROM STDIN query and waits for CopyInResponse. After this returns successfully, the caller should send data using send_copy_data and then call finish_copy or cancel_copy.

§Errors

Same failure modes as Self::start_copy_in_with_format.

Source

pub fn start_copy_in_with_format( &mut self, table_name: &str, columns: &[&str], format: &str, ) -> Result<()>

Initiates a COPY IN operation with a specified format.

This sends a COPY … FROM STDIN query and waits for CopyInResponse. After this returns successfully, the caller should send data using send_copy_data and then call finish_copy or cancel_copy.

§Arguments
  • table_name - The target table name (should be properly quoted if needed)
  • columns - Column names to insert into
  • format - The data format string: “HYPERBINARY” or “ARROWSTREAM”
§Example
// For HyperBinary format (default)
conn.start_copy_in_with_format("my_table", &["col1", "col2"], "HYPERBINARY")?;

// For Arrow IPC stream format
conn.start_copy_in_with_format("my_table", &["col1", "col2"], "ARROWSTREAM")?;
§Errors
  • Returns Error (connection) if the connection has been marked unhealthy by a prior failure.
  • Returns Error (server) if the server rejects the generated COPY ... FROM STDIN statement (missing table, column mismatch, etc.).
  • Returns Error (I/O) on wire-protocol I/O failure.
Source

pub fn start_copy_in_raw(&mut self, query: &str) -> Result<()>

Initiates a COPY IN operation from a raw SQL query string.

The query must be a complete COPY ... FROM STDIN ... statement.

§Errors

Same failure modes as Self::start_copy_in_with_format: unhealthy connection, server-side SQL rejection, or transport I/O failure.

Source

pub fn send_copy_data(&mut self, data: &[u8]) -> Result<()>

Sends COPY data to the server.

The data should be in HyperBinary format.

§Errors

Currently infallible — frame construction is pure. The Result return type is preserved for forward compatibility.

Source

pub fn send_copy_data_direct(&mut self, data: &[u8]) -> Result<()>

Sends COPY data directly to the stream without internal buffering.

This writes the CopyData message directly to the TCP stream, letting the kernel’s TCP stack handle buffering. Use flush_stream() periodically to ensure data is sent.

This is more efficient for streaming large amounts of data as it avoids copying data into an intermediate buffer.

§Errors
  • Returns Error (protocol) if data.len() + 4 exceeds u32::MAX (the PostgreSQL per-message length cap).
  • Returns Error (I/O) if flushing buffered bytes or writing the header/payload directly to the transport fails.
Source

pub fn flush_stream(&mut self) -> Result<()>

Flushes the TCP stream without clearing the write buffer.

Use this with send_copy_data_direct() to periodically ensure data is sent to the server.

§Errors

Returns Error (I/O) if flushing the underlying transport fails.

Source

pub fn finish_copy(&mut self) -> Result<u64>

Finishes a COPY IN operation successfully.

This sends CopyDone and waits for CommandComplete. Returns the number of rows inserted.

§Errors
  • Returns Error (server) if the server emits an ErrorResponse during finalization (e.g. constraint violation from the accumulated rows).
  • Returns Error (I/O) on wire-protocol read/write failure.
Source

pub fn cancel_copy(&mut self, reason: &str) -> Result<()>

Cancels a COPY IN operation.

This sends CopyFail and waits for the error response.

§Errors

Returns Error (I/O) if flushing the buffer or writing the CopyFail frame fails, or Error (closed) if the server drops the connection before returning ReadyForQuery.

Source

pub fn copy_out(&mut self, query: &str) -> Result<Vec<u8>>

Executes a COPY … TO STDOUT query and returns all output data.

This is used for queries like: COPY (SELECT ...) TO STDOUT WITH (format arrowstream)

The method:

  1. Sends the query
  2. Waits for CopyOutResponse
  3. Collects all CopyData messages
  4. Waits for CopyDone, CommandComplete, and ReadyForQuery
§Arguments
  • query - The COPY TO STDOUT query to execute
§Returns

The raw bytes from all CopyData messages concatenated together.

§Example
let arrow_data = conn.copy_out(
    "COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
)?;
§Errors
  • Returns Error (connection) if the connection has been marked unhealthy.
  • Returns Error (server) when the server rejects the COPY TO STDOUT statement via ErrorResponse.
  • Returns Error (I/O) / Error (closed) on transport read/write failure.
Source

pub fn copy_out_to_writer( &mut self, query: &str, writer: &mut dyn Write, ) -> Result<u64>

Streams COPY OUT data directly to a writer without buffering all data in memory.

Returns the total number of bytes written.

§Errors

Same failure modes as Self::copy_out, plus Error (I/O) wrapping any error from writer.write_all when the target writer cannot accept a COPY chunk.

Trait Implementations§

Source§

impl<S: Debug> Debug for RawConnection<S>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<S> Freeze for RawConnection<S>
where S: Freeze,

§

impl<S> RefUnwindSafe for RawConnection<S>
where S: RefUnwindSafe,

§

impl<S> Send for RawConnection<S>
where S: Send,

§

impl<S> Sync for RawConnection<S>
where S: Sync,

§

impl<S> Unpin for RawConnection<S>
where S: Unpin,

§

impl<S> UnsafeUnpin for RawConnection<S>
where S: UnsafeUnpin,

§

impl<S> UnwindSafe for RawConnection<S>
where S: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more