pub struct AsyncConn { /* private fields */ }
A shared async connection that multiplexes requests from many tasks.
Implementations

impl AsyncConn
pub fn is_alive(&self) -> bool
Check if the connection is still alive (writer/reader tasks running).
pub fn backend_pid(&self) -> i32
Backend process ID assigned by the server.
pub fn cancel_token(&self) -> CancelToken
Produce a cancel token for the running session on this connection.
pub fn mark_state_mutated(&self)
Mark the connection as having mutated session state since the last
reset. Pools call take_state_mutated() on return to decide whether
to issue DISCARD ALL. Callers issuing BEGIN, SET (without
LOCAL), advisory locks, temp tables, etc., should call this before
submitting.
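A minimal sketch of the intended caller/pool handshake. The helper names (`take_advisory_lock`, `on_return`, `issue_discard_all`) are illustrative, not part of this API; only the AsyncConn methods shown are real.

```rust
// Caller side: flag the connection before submitting session-mutating SQL.
async fn take_advisory_lock(conn: &AsyncConn) -> Result<(), PgWireError> {
    conn.mark_state_mutated();
    let _ = conn.exec_query("SELECT pg_advisory_lock(42)", &[], &[]).await?;
    Ok(())
}

// Pool side, on return: reset only if the flag was set.
// `issue_discard_all` is a hypothetical helper that sends DISCARD ALL
// (typically as a simple query).
async fn on_return(conn: &AsyncConn) -> Result<(), PgWireError> {
    if conn.take_state_mutated() {
        issue_discard_all(conn).await?;
        // DISCARD ALL destroyed server-side prepared statements:
        conn.clear_statement_cache();
    }
    Ok(())
}
```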
pub fn take_state_mutated(&self) -> bool
Atomically read and clear the state-mutated flag. Returns the
previous value: true means the caller should issue a reset.
pub fn is_state_mutated(&self) -> bool
Read the state-mutated flag without clearing it.
pub fn mark_broken(&self)
Mark the connection as broken. The reader/writer tasks may still be
running, but the session is in an indeterminate state (for example,
a transaction was dropped without commit or rollback) and the
connection must not be reused. Pool integrations check
AsyncConn::is_broken on return and destroy the connection
instead of returning it to the idle set.
pub fn is_broken(&self) -> bool
True if the connection has been declared broken by a caller via
AsyncConn::mark_broken. Independent of AsyncConn::is_alive,
which only reflects whether the reader/writer tasks are still running.
pub fn enqueue_rollback(&self) -> bool
Fire-and-forget enqueue of a ROLLBACK simple-query, intended to be
callable from a synchronous Drop. Returns true if the request was
queued on the writer task, false if the connection is not alive or
the channel was full/closed (in which case the caller should fall
back to AsyncConn::mark_broken so the connection is discarded
by the pool).
PostgreSQL accepts ROLLBACK from any in-transaction state — including
the aborted state (25P02) that a failed query leaves behind — so this
reliably restores the session to idle. The response is drained and
discarded; ordering on the writer queue is preserved, so any
subsequent request (e.g., the pool’s DISCARD ALL reset) sees a clean
connection.
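A sketch of the Drop-based usage this method is designed for. The `TxGuard` type is hypothetical; the fallback to mark_broken follows the contract described above.

```rust
use std::sync::Arc;

// Hypothetical guard: rolls back an open transaction if the caller never
// committed, from a synchronous Drop.
struct TxGuard {
    conn: Arc<AsyncConn>,
    committed: bool,
}

impl Drop for TxGuard {
    fn drop(&mut self) {
        if !self.committed {
            // Drop cannot await, so use the fire-and-forget path.
            if !self.conn.enqueue_rollback() {
                // Queueing failed: session state is unknown, so make sure
                // the pool destroys this connection instead of reusing it.
                self.conn.mark_broken();
            }
        }
    }
}
```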
impl AsyncConn
pub fn new(conn: WireConn) -> Self
Create a new async connection from a raw WireConn. Spawns writer and reader tasks.
pub fn dropped_notifications(&self) -> u64
Cumulative number of NotificationResponse messages this connection
has discarded since it was created.
Notifications are dropped when (a) the application has not called
AsyncConn::take_notification_receiver yet, or (b) the receiver is
not draining fast enough and the bounded channel fills up. Compare
successive readings to detect missed LISTEN events.
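Detecting missed events amounts to comparing two readings of the counter, as in this sketch:

```rust
// Sketch: detect whether any NotificationResponse messages were dropped
// over a window of interest.
fn missed_any(conn: &AsyncConn, before: u64) -> bool {
    let after = conn.dropped_notifications();
    // The counter is cumulative, so any increase means at least one
    // LISTEN event was discarded and a full re-sync may be needed.
    after > before
}
```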
pub fn take_notification_receiver(&self) -> Option<Receiver<BackendMsg>>
Take the notification receiver. Call once to get a channel that
receives NotificationResponse messages that arrive during queries.
pub fn lookup_or_alloc(&self, sql: &str, _param_oids: &[u32]) -> (Vec<u8>, bool)
Look up or allocate a statement name.
Cache hit: returns the cached name with needs_parse=false. The
caller submits only Bind/Execute/Sync.
Cache miss: allocates a fresh, unique name from the connection’s
statement counter and returns (name, needs_parse=true). The name
is NOT yet published in the cache: the caller MUST include a
Parse for the new name in the same atomic submit as
Bind/Execute/Sync (so the Parse runs inside whatever
role-switched transaction the caller has framed, e.g. BEGIN; SET LOCAL ROLE …; …), and then call Self::cache_statement to
publish the name only after the Parse has succeeded on the wire.
Why publish-after-success: an earlier version pre-queued the
Parse as its own writer request and published the cache entry
up-front to avoid a race where a concurrent caller saw the
cached name and submitted a Bind-only request that races ahead
of the Parse. That eliminated the race, but ran the Parse
outside any transaction, under the connection’s persistent role
(e.g. PostgREST’s authenticator). SQL that references objects
only reachable after SET LOCAL ROLE to a user role failed
with 42501 permission denied during Parse, while every
subsequent Bind for the same name failed with 26000: prepared statement "sN" does not exist. Publishing only after a
successful Parse keeps caching role-correct: each first-time
concurrent caller pays for its own Parse (rather than sharing a
pre-queued one), and cache_statement uses first-publisher-wins
semantics so the losing names become session-bounded orphans
(bounded by the 256-entry LRU on this connection).
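The publish-after-success protocol above can be sketched as follows. The `build_*` and `new_collector` helpers are hypothetical message-encoding functions; the point is the ordering: Parse goes in the same atomic submit as Bind/Execute/Sync, and cache_statement runs only after that submit succeeds.

```rust
async fn exec_cached(
    conn: &AsyncConn,
    sql: &str,
    params: &[Option<&[u8]>],
    oids: &[u32],
) -> Result<PipelineResponse, PgWireError> {
    let (name, needs_parse) = conn.lookup_or_alloc(sql, oids);
    let msgs = if needs_parse {
        // Cache miss: Parse travels with Bind/Execute/Sync in one submit,
        // so it runs inside whatever transaction framing the caller added.
        build_parse_bind_execute_sync(sql, &name, params, oids) // hypothetical
    } else {
        build_bind_execute_sync(&name, params) // hypothetical
    };
    let resp = conn.submit(msgs, new_collector()).await?; // collector ctor hypothetical
    if needs_parse {
        // Parse succeeded on the wire: publish (first-publisher-wins).
        conn.cache_statement(sql, &name);
    }
    Ok(resp)
}
```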
pub fn cache_statement(&self, sql: &str, name: &[u8])
Publish a freshly Parsed statement in the cache so subsequent lookups for the same SQL skip the Parse step.
Called by the high-level exec_* helpers (and any external
caller of Self::lookup_or_alloc) after the writer submit that
included Parse for name returned successfully. Skipping this
step doesn’t cause correctness problems; the next lookup just
misses and re-Parses.
First-publisher-wins: if another concurrent miss already published a different name for the same SQL, that name stays in the cache and the caller’s name becomes a server-side orphan (cleaned up at session end; bounded by LRU eviction during the session).
LRU eviction: when the cache reaches its 256-entry cap, the
oldest entry by counter is removed and a Close + Sync is
fire-and-forget queued to free the server-side prepared
statement.
pub async fn copy_in(
    &self,
    copy_sql: &str,
    data: &[u8],
) -> Result<u64, PgWireError>
Execute COPY FROM STDIN: sends the COPY command, then data in chunks, then CopyDone. Returns the number of rows copied (from CommandComplete tag).
Data is sent in chunks of up to 1MB to avoid buffering the entire payload in a single BytesMut. For small payloads (< 1MB), this is a single write.
pub async fn copy_in_stream<R: AsyncRead + Unpin>(
    &self,
    copy_sql: &str,
    reader: R,
) -> Result<u64, PgWireError>
Execute COPY FROM STDIN with streaming: sends the COPY command, then reads data from an async reader in chunks, avoiding buffering the entire payload in memory.
use tokio::fs::File;
let file = File::open("data.csv").await?;
let _count = conn.copy_in_stream("COPY users FROM STDIN WITH (FORMAT csv)", file).await?;

pub async fn copy_out(&self, copy_sql: &str) -> Result<Vec<u8>, PgWireError>
Execute COPY TO STDOUT: sends the COPY command, collects all CopyData.
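A sketch pairing copy_out with copy_in, assuming `conn: &AsyncConn` and that both tables exist:

```rust
// Export one table and load the bytes into another.
let csv = conn.copy_out("COPY users TO STDOUT WITH (FORMAT csv)").await?;
let rows = conn
    .copy_in("COPY users_backup FROM STDIN WITH (FORMAT csv)", &csv)
    .await?;
```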
pub fn invalidate_statement(&self, sql: &str)
Evict a SQL statement from the cache, forcing re-parse on next use. Used for prepared statement invalidation after schema changes.
pub fn clear_statement_cache(&self)
Clear the entire statement cache. Must be called after DISCARD ALL
which destroys server-side prepared statements.
pub async fn exec_transaction(
    &self,
    setup_sql: &str,
    query_sql: &str,
    params: &[Option<&[u8]>],
    param_oids: &[u32],
) -> Result<Vec<RawRow>, PgWireError>
Execute a pipelined transaction with automatic statement caching.
On a successful Parse the new statement name is published in the
cache via Self::cache_statement. If a cached statement turns
out to be invalid (PG error 26000 or 0A000), the cache entry is
evicted and the transaction is retried once with a fresh Parse.
This handles schema changes invalidating cached plans after their
initial Parse.
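A sketch of a call, with assumptions: the setup framing (`BEGIN; SET LOCAL ROLE …`) is inferred from the lookup_or_alloc docs, the role and table names are illustrative, and OID 25 is PostgreSQL's text type.

```rust
let rows = conn
    .exec_transaction(
        "BEGIN; SET LOCAL ROLE app_user",          // setup, run as a simple query
        "SELECT id FROM items WHERE owner = $1",   // cached extended-protocol query
        &[Some(b"alice".as_slice())],              // text-format parameter
        &[25],                                     // OID 25 = text
    )
    .await?;
```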
pub async fn exec_query(
    &self,
    sql: &str,
    params: &[Option<&[u8]>],
    param_oids: &[u32],
) -> Result<Vec<RawRow>, PgWireError>
Execute a parameterized query with automatic statement caching. If a cached statement is invalidated by a schema change (PG error 26000 or 0A000), automatically evicts the cache entry, re-parses, and retries once.
pub async fn submit(
    &self,
    messages: BytesMut,
    collector: ResponseCollector,
) -> Result<PipelineResponse, PgWireError>
Submit a request to the connection. Returns a future that resolves when the response is available. Times out after 5 minutes to prevent hanging forever if the reader/writer task dies.
pub async fn submit_batch(
    &self,
    items: Vec<(BytesMut, ResponseCollector)>,
) -> Result<Vec<Result<PipelineResponse, PgWireError>>, PgWireError>
Submit a batch of requests in FIFO order. All requests are queued before any response is awaited, so the writer task sees them together and coalesces them into a single write() syscall. The server then pipelines the N responses back-to-back, giving one network round-trip for all N queries.
Returns one Result<PipelineResponse, PgWireError> per input item,
in the same order. The outer Result fails only if queueing fails
(channel closed). Each inner Result reflects the per-query outcome.
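A sketch of the two error layers, assuming hypothetical helpers `encode_simple_query` and `new_collector` for building the pre-encoded requests:

```rust
let items = vec![
    (encode_simple_query("SELECT 1"), new_collector()), // helpers hypothetical
    (encode_simple_query("SELECT 2"), new_collector()),
];
// Both requests share one write() and one network round-trip.
let results = conn.submit_batch(items).await?; // outer Err: queueing failed
for result in results {
    let resp = result?; // inner Err: that query's own failure, in input order
    // use resp...
}
```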
pub async fn close(&self) -> Result<(), PgWireError>
Send a Terminate message to the server and wait for the writer/reader
tasks to exit. After this returns, the connection is unusable; further
calls fail with ConnectionClosed. Idempotent: calling close on an
already-closed connection is a no-op and returns Ok.
pub async fn submit_stream(
    &self,
    messages: BytesMut,
    row_buffer: usize,
) -> Result<(StreamHeader, Receiver<Result<StreamedRow, PgWireError>>), PgWireError>
Submit a streaming request. Returns the column header and an mpsc receiver that yields rows one at a time.
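A consumption sketch, assuming `messages` already holds an encoded extended-protocol request (the encoding step is not shown):

```rust
// A buffer of 64 rows bounds memory while the server streams.
let (_header, mut rows) = conn.submit_stream(messages, 64).await?;
while let Some(item) = rows.recv().await {
    let row = item?; // each item is a Result<StreamedRow, PgWireError>
    // process `row` without buffering the whole result set
}
```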
pub async fn pipeline_transaction(
    &self,
    setup_sql: &str,
    query_sql: &str,
    params: &[Option<&[u8]>],
    param_oids: &[u32],
    stmt_name: &[u8],
    needs_parse: bool,
) -> Result<Vec<RawRow>, PgWireError>
Execute a pipelined transaction: setup (simple query), data query (extended protocol), and COMMIT (simple query), all coalesced into one TCP write. The data query is parameterized and binary-safe.
pub async fn query(
    &self,
    sql: &str,
    params: &[Option<&[u8]>],
    param_oids: &[u32],
    stmt_name: &[u8],
    needs_parse: bool,
) -> Result<Vec<RawRow>, PgWireError>
Execute a simple parameterized query (no transaction).
pub async fn query_with_formats(
    &self,
    sql: &str,
    params: &[Option<&[u8]>],
    param_oids: &[u32],
    param_formats: &[FormatCode],
    result_formats: &[FormatCode],
    stmt_name: &[u8],
    needs_parse: bool,
) -> Result<Vec<RawRow>, PgWireError>
Execute a parameterized query with explicit per-param and per-result format codes (text = 0, binary = 1).
param_formats is interpreted per PostgreSQL wire protocol rules:
- empty: all params are text
- length 1: the single code applies to every param
- length N (== params.len()): one code per param
Same rules apply to result_formats for output columns (empty → all
text; single code → applies to all columns; per-column list otherwise).
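The expansion rule can be stated as a small standalone function. FormatCode is modeled here as a plain u16 (0 = text, 1 = binary) purely for illustration:

```rust
// Wire-protocol format-code rule: empty list => all text; one code =>
// applies to every param/column; otherwise one code per param/column.
fn effective_format(formats: &[u16], index: usize) -> u16 {
    match formats.len() {
        0 => 0,              // empty: everything is text
        1 => formats[0],     // single code broadcast to all
        _ => formats[index], // per-item list
    }
}

fn main() {
    assert_eq!(effective_format(&[], 2), 0);        // all text
    assert_eq!(effective_format(&[1], 5), 1);       // broadcast binary
    assert_eq!(effective_format(&[0, 1, 0], 1), 1); // per-item
}
```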
pub async fn exec_query_with_formats(
    &self,
    sql: &str,
    params: &[Option<&[u8]>],
    param_oids: &[u32],
    param_formats: &[FormatCode],
    result_formats: &[FormatCode],
) -> Result<Vec<RawRow>, PgWireError>
Variant of exec_query with per-param and per-result format codes.
See query_with_formats for format code semantics.