pub struct RawConnection<S> { /* private fields */ }Expand description
A raw connection to a Hyper server.
This handles the low-level protocol communication, including:
- Message framing (reading/writing
PostgreSQLwire protocol messages) - Authentication handshake
- Query execution (simple and extended query protocols)
- COPY protocol support
The connection is generic over the stream type S, allowing it to work
with different transport mechanisms (TCP, TLS, etc.) as long as they
implement Read + Write.
§Buffering
The connection maintains separate read and write buffers for efficient I/O. Messages are buffered before being sent, and incoming data is buffered until complete messages can be parsed.
Implementations§
Source§impl<S> RawConnection<S>
impl<S> RawConnection<S>
Sourcepub fn new(stream: S) -> Self
pub fn new(stream: S) -> Self
Creates a new raw connection from a stream.
Initializes read and write buffers with default capacity (64 KB each).
The connection is not yet authenticated - call startup() to begin
the connection handshake.
§Arguments
stream- The I/O stream (must implementRead + Write)
Sourcepub fn is_healthy(&self) -> bool
pub fn is_healthy(&self) -> bool
Returns true if this connection is still in a known-good state
and safe to use for new requests.
Once the wire protocol falls out of sync with the server (see the
desynchronized field on RawConnection), this returns false
permanently — the only recovery is to drop this connection and
open a new one. Pool implementations should consult this before
running a recycle health probe to avoid spending a roundtrip on a
connection that is already known to be bad.
Sourcepub fn reserve_write_buffer(&mut self, additional: usize)
pub fn reserve_write_buffer(&mut self, additional: usize)
Reserves capacity in the write buffer to avoid reallocations.
Call this before bulk operations to pre-allocate buffer space. This is useful for high-throughput scenarios where buffer growth would cause performance overhead.
Sourcepub fn process_id(&self) -> i32
pub fn process_id(&self) -> i32
Returns the process ID assigned by the server.
Sourcepub fn secret_key(&self) -> i32
pub fn secret_key(&self) -> i32
Returns the secret key for cancel requests.
Sourcepub fn stream_mut(&mut self) -> &mut S
pub fn stream_mut(&mut self) -> &mut S
Returns a mutable reference to the underlying stream.
Sourcepub fn parameter_status(&self, name: &str) -> Option<&str>
pub fn parameter_status(&self, name: &str) -> Option<&str>
Returns a server parameter value by name.
Server parameters are sent by the server during connection startup. Common parameters include:
server_version- The server version stringserver_encoding- The server’s character encodingclient_encoding- The client’s character encoding
Sourcepub fn startup(
&mut self,
params: &[(&str, &str)],
password: Option<&str>,
) -> Result<()>
pub fn startup( &mut self, params: &[(&str, &str)], password: Option<&str>, ) -> Result<()>
Sends a startup message and performs initial handshake.
§Errors
- Returns
Error(auth) when the server requests an auth method (cleartext, MD5, SASL) and no password is supplied, when the offered SASL mechanisms exclude SCRAM-SHA-256, or when SCRAM state is missing at the SASL-continue / SASL-final step. - Returns
Error(server) when the server sends anErrorResponseduring startup (for example, unknown user or database). - Returns
Error(protocol) if a message arrives out of the expected startup sequence. - Returns
Error(I/O) on wire-protocol read/write failure.
Sourcepub fn simple_query(&mut self, query: &str) -> Result<Vec<Message>>
pub fn simple_query(&mut self, query: &str) -> Result<Vec<Message>>
Sends a simple query and returns all messages until ReadyForQuery.
§Errors
- Returns
Error(server) when the server sends anErrorResponse(SQL error, constraint violation, etc.). - Returns
Error(I/O) on transport read/write failure. - Returns
Error(closed) if the server closes the connection mid-query. - Returns
Error(connection) if the connection has already been marked unhealthy by a prior failure.
Sourcepub fn query_binary(&mut self, query: &str) -> Result<Vec<Message>>
pub fn query_binary(&mut self, query: &str) -> Result<Vec<Message>>
Sends a query using extended protocol with binary format results.
This uses the PostgreSQL extended query protocol (Parse/Bind/Execute/Sync)
with HyperBinary format (format code 2) for maximum performance.
Returns all messages until ReadyForQuery.
§Errors
Same failure modes as Self::simple_query — server-side SQL
errors surface as Error (server), transport failures as
Error (I/O) / Error (closed), and an unhealthy prior state
as Error (connection).
Sourcepub fn start_query_binary(&mut self, query: &str) -> Result<()>
pub fn start_query_binary(&mut self, query: &str) -> Result<()>
Starts a binary query but leaves result consumption to the caller.
This is useful for streaming scenarios where you want to pull messages incrementally instead of materializing the full result set up front.
§Errors
Sourcepub fn start_simple_query(&mut self, query: &str) -> Result<()>
pub fn start_simple_query(&mut self, query: &str) -> Result<()>
Starts a simple query but leaves result consumption to the caller.
This is useful for streaming scenarios where you want to pull messages incrementally instead of materializing the full result set up front.
§Errors
Same failure modes as Self::start_query_binary —
Error (connection) for unhealthy state, Error (I/O) for
transport failure.
Sourcepub fn start_execute_prepared(
&mut self,
statement_name: &str,
params: &[Option<&[u8]>],
column_count: usize,
) -> Result<()>
pub fn start_execute_prepared( &mut self, statement_name: &str, params: &[Option<&[u8]>], column_count: usize, ) -> Result<()>
Starts an execute of a prepared statement but leaves result consumption to the caller.
Sends Bind + Execute(unnamed_portal, 0) + Sync, then
returns. The caller drives a message loop that reads
BindComplete, any DataRows, then CommandComplete +
ReadyForQuery — the same shape used by
Self::start_query_binary.
Format codes (PostgreSQL wire protocol):
- Parameters: format
1(standard PG binary, big-endian). Hyper’s server-side Bind decodes bound parameters as standard PG binary regardless of the format code we advertise. The caller is responsible for supplying parameter bytes in BE. - Results: format
2(HyperBinary, little-endian). Hyper supports this as a separate protocol extension; the row decoders insuper::row::StreamRowand the hyperdb-apiRowtype all expect LE, so requesting LE at Bind time avoids an extra conversion pass.
max_rows = 0 means “send all rows” — we pace on the client side
by reading DataRows in chunks from the read buffer.
§Errors
Sourcepub fn read_message(&mut self) -> Result<Message>
pub fn read_message(&mut self) -> Result<Message>
Reads a single message from the server.
§Errors
- Returns
Error(I/O) if reading from the transport fails or ifMessage::parsereports a malformed frame. - Returns
Error(closed) when the transport reaches EOF (server closed the connection).
Sourcepub fn drain_until_ready(&mut self)
pub fn drain_until_ready(&mut self)
Drains messages from the server until a Message::ReadyForQuery is
seen, discarding them. Call this after receiving an
Message::ErrorResponse to stay in sync with the wire protocol.
Per the PostgreSQL wire protocol, every query (simple or extended) ends
with ReadyForQuery, even if the statement failed. Without this drain,
the ReadyForQuery (and any other trailing messages) remain in the
read buffer and get consumed by the next operation’s response parser,
which misinterprets them — classic wire desync.
This is the unbounded variant, safe to use in the standard
error path where the server has already sent ErrorResponse and
ReadyForQuery is guaranteed to arrive within a few messages. In
exceptional cases where the drain might take arbitrarily long —
most notably the Drop path for a streaming result that the caller
abandoned mid-way — prefer
drain_until_ready_bounded to
avoid blocking indefinitely on an unresponsive server.
Drain errors (connection already closed, I/O failure mid-drain) are
logged via tracing::warn! and then swallowed. The caller’s original
error is more informative to surface, and a dead connection will be
reported on the next real operation anyway.
Sourcepub fn drain_until_ready_bounded(&mut self, max_messages: usize) -> bool
pub fn drain_until_ready_bounded(&mut self, max_messages: usize) -> bool
Bounded version of drain_until_ready that
stops after reading at most max_messages messages. Returns true
when ReadyForQuery was observed within that budget; false if the
budget was exhausted first or an I/O error occurred before reaching it.
§Why we do not send Sync before draining
A natural question is whether to send a Sync message first to prompt
the server to emit ReadyForQuery sooner. The answer for Hyper is
no — it would actively corrupt the next query.
Per the Hyper server state machine (see LibpqConnection::handleSync
and handleQueryDone), every query — simple or extended — already
ends with exactly one ReadyForQuery emission. After an error or
normal completion the server returns to its main loop. If we then
send a Sync, handleSync would emit an additional
ReadyForQuery that no current operation is reading, and the next
query’s response parser would consume that stale ReadyForQuery
as its own terminator — the symptom is that query returning an
empty result with “Query returned no rows”.
For the abandoned-stream case (a long-running query that the client
stopped reading), Sync also does not help: Hyper processes the
incoming byte stream in order, so Sync is only handled after
the in-flight Execute finishes emitting all its DataRows plus
its own CommandComplete and ReadyForQuery. By that point the
drain has already reached ReadyForQuery, and the Sync produces
the same extra ReadyForQuery contamination described above.
The canonical way to abort a running query is to open a separate
connection and send CancelRequest with the original connection’s
process id and secret. That is exactly what
QueryStream’s Drop impl does
(via the Cancellable trait)
before calling this bounded drain. Cancel-then-drain converges on
ReadyForQuery within a handful of messages because the server
stops producing new DataRows once it observes the cancel.
§Poisoned connections
When this returns false the connection is in an indeterminate
state. Callers should treat it as poisoned and not return it to a
connection pool — the next operation will see residual bytes from
whatever was still streaming. The bounded variant exists precisely
to prevent indefinite blocking in contexts like Drop impls where
we don’t own the thread’s time and can’t afford to wait for a
multi-million-row query result to finish before returning from a
destructor.
All drain errors are logged via tracing::warn! so state-related
issues are observable in logs even though they don’t interrupt the
caller’s control flow.
Sourcepub fn consume_error(&mut self, body: &ErrorResponseBody) -> Error
pub fn consume_error(&mut self, body: &ErrorResponseBody) -> Error
Convenience: parse a server Message::ErrorResponse body into an
Error and drain the rest of the response through the trailing
Message::ReadyForQuery so the connection is safe to reuse.
Callers should almost always prefer this over calling
drain_until_ready or
drain_until_ready_bounded by
hand, because forgetting the drain is exactly the bug it exists to
prevent.
§Drain budget
Uses a bounded drain with a POST_ERROR_DRAIN_CAP-message budget
rather than the unbounded drain_until_ready.
A well-behaved server emits only a handful of messages after
ErrorResponse before ReadyForQuery — typically just the error
itself plus the Z, occasionally with a few NoticeResponse
messages interleaved — so the cap is orders of magnitude above
anything a legitimate error path produces. The cap exists purely
as a defensive safety valve against pathological server behavior
(a broken backend that never emits ReadyForQuery) and
misbehaved network paths (stalled reads that would otherwise hang
the caller indefinitely, particularly visible in async contexts).
If the cap is exceeded, drain_until_ready_bounded logs a
tracing::warn! and marks the connection desynchronized; the
next operation on it will surface a transport-level failure and
trigger reconnect higher up. That is strictly better than
blocking forever with no observable symptom.
§Example
match msg {
Message::ErrorResponse(body) => {
return Err(self.consume_error(&body));
}
// ...
}Sourcepub fn start_copy_in(
&mut self,
table_name: &str,
columns: &[&str],
) -> Result<()>
pub fn start_copy_in( &mut self, table_name: &str, columns: &[&str], ) -> Result<()>
Initiates a COPY IN operation with HyperBinary format.
This sends a COPY … FROM STDIN query and waits for CopyInResponse.
After this returns successfully, the caller should send data using
send_copy_data and then call finish_copy or cancel_copy.
§Errors
Same failure modes as Self::start_copy_in_with_format.
Sourcepub fn start_copy_in_with_format(
&mut self,
table_name: &str,
columns: &[&str],
format: &str,
) -> Result<()>
pub fn start_copy_in_with_format( &mut self, table_name: &str, columns: &[&str], format: &str, ) -> Result<()>
Initiates a COPY IN operation with a specified format.
This sends a COPY … FROM STDIN query and waits for CopyInResponse.
After this returns successfully, the caller should send data using
send_copy_data and then call finish_copy or cancel_copy.
§Arguments
table_name- The target table name (should be properly quoted if needed)columns- Column names to insert intoformat- The data format string: “HYPERBINARY” or “ARROWSTREAM”
§Example
// For HyperBinary format (default)
conn.start_copy_in_with_format("my_table", &["col1", "col2"], "HYPERBINARY")?;
// For Arrow IPC stream format
conn.start_copy_in_with_format("my_table", &["col1", "col2"], "ARROWSTREAM")?;§Errors
Sourcepub fn start_copy_in_raw(&mut self, query: &str) -> Result<()>
pub fn start_copy_in_raw(&mut self, query: &str) -> Result<()>
Initiates a COPY IN operation from a raw SQL query string.
The query must be a complete COPY ... FROM STDIN ... statement.
§Errors
Same failure modes as Self::start_copy_in_with_format: unhealthy
connection, server-side SQL rejection, or transport I/O failure.
Sourcepub fn send_copy_data(&mut self, data: &[u8]) -> Result<()>
pub fn send_copy_data(&mut self, data: &[u8]) -> Result<()>
Sends COPY data to the server.
The data should be in HyperBinary format.
§Errors
Currently infallible — frame construction is pure. The Result
return type is preserved for forward compatibility.
Sourcepub fn send_copy_data_direct(&mut self, data: &[u8]) -> Result<()>
pub fn send_copy_data_direct(&mut self, data: &[u8]) -> Result<()>
Sends COPY data directly to the stream without internal buffering.
This writes the CopyData message directly to the TCP stream, letting
the kernel’s TCP stack handle buffering. Use flush_stream() periodically
to ensure data is sent.
This is more efficient for streaming large amounts of data as it avoids copying data into an intermediate buffer.
§Errors
Sourcepub fn flush_stream(&mut self) -> Result<()>
pub fn flush_stream(&mut self) -> Result<()>
Sourcepub fn finish_copy(&mut self) -> Result<u64>
pub fn finish_copy(&mut self) -> Result<u64>
Finishes a COPY IN operation successfully.
This sends CopyDone and waits for CommandComplete.
Returns the number of rows inserted.
§Errors
Sourcepub fn cancel_copy(&mut self, reason: &str) -> Result<()>
pub fn cancel_copy(&mut self, reason: &str) -> Result<()>
Sourcepub fn copy_out(&mut self, query: &str) -> Result<Vec<u8>>
pub fn copy_out(&mut self, query: &str) -> Result<Vec<u8>>
Executes a COPY … TO STDOUT query and returns all output data.
This is used for queries like:
COPY (SELECT ...) TO STDOUT WITH (format arrowstream)
The method:
- Sends the query
- Waits for
CopyOutResponse - Collects all
CopyDatamessages - Waits for
CopyDone,CommandComplete, andReadyForQuery
§Arguments
query- The COPY TO STDOUT query to execute
§Returns
The raw bytes from all CopyData messages concatenated together.
§Example
let arrow_data = conn.copy_out(
"COPY (SELECT * FROM my_table) TO STDOUT WITH (format arrowstream)"
)?;§Errors
Sourcepub fn copy_out_to_writer(
&mut self,
query: &str,
writer: &mut dyn Write,
) -> Result<u64>
pub fn copy_out_to_writer( &mut self, query: &str, writer: &mut dyn Write, ) -> Result<u64>
Streams COPY OUT data directly to a writer without buffering all data in memory.
Returns the total number of bytes written.
§Errors
Same failure modes as Self::copy_out, plus Error (I/O)
wrapping any error from writer.write_all when the target
writer cannot accept a COPY chunk.
Trait Implementations§
Auto Trait Implementations§
impl<S> Freeze for RawConnection<S>where
S: Freeze,
impl<S> RefUnwindSafe for RawConnection<S>where
S: RefUnwindSafe,
impl<S> Send for RawConnection<S>where
S: Send,
impl<S> Sync for RawConnection<S>where
S: Sync,
impl<S> Unpin for RawConnection<S>where
S: Unpin,
impl<S> UnsafeUnpin for RawConnection<S>where
S: UnsafeUnpin,
impl<S> UnwindSafe for RawConnection<S>where
S: UnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request