
AsyncConn

Struct AsyncConn 

Source
pub struct AsyncConn { /* private fields */ }

A shared async connection that multiplexes requests from many tasks.

Implementations§

Source§

impl AsyncConn

Source

pub fn is_alive(&self) -> bool

Check if the connection is still alive (writer/reader tasks running).

Source

pub fn backend_pid(&self) -> i32

Backend process ID assigned by the server.

Source

pub fn addr(&self) -> &str

Server address this connection is talking to.

Source

pub fn cancel_token(&self) -> CancelToken

Produce a cancel token for the running session on this connection.

Source

pub fn mark_state_mutated(&self)

Mark the connection as having mutated session state since the last reset. Pools call take_state_mutated() on return to decide whether to issue DISCARD ALL. Callers that issue BEGIN or SET (without LOCAL), take advisory locks, create temp tables, or otherwise change session state should call this before submitting.

Source

pub fn take_state_mutated(&self) -> bool

Atomically read and clear the state-mutated flag. Returns the previous value: true means the caller should issue a reset.

Source

pub fn is_state_mutated(&self) -> bool

Read the state-mutated flag without clearing it.
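
The read-and-clear behavior described for take_state_mutated can be sketched with a single atomic swap. This is a minimal model, not the crate's implementation: the StateFlag type, its method names, and the needs_reset_on_return helper are hypothetical and exist only for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the flag inside `AsyncConn`; not the crate's real type.
#[derive(Default)]
pub struct StateFlag(AtomicBool);

impl StateFlag {
    /// Record that session state was changed (BEGIN, SET, advisory lock, ...).
    pub fn mark(&self) {
        self.0.store(true, Ordering::Release);
    }

    /// Read and clear in one atomic step, so a concurrent `mark` cannot be
    /// lost between the read and the clear.
    pub fn take(&self) -> bool {
        self.0.swap(false, Ordering::AcqRel)
    }

    /// Peek at the flag without clearing it.
    pub fn peek(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

/// What a pool might do on check-in: reset only when the flag was set.
pub fn needs_reset_on_return(flag: &StateFlag) -> bool {
    flag.take()
}
```

A separate load followed by a store would leave a window in which another task's mark is overwritten; the swap closes that window.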

Source

pub fn mark_broken(&self)

Mark the connection as broken. The reader/writer tasks may still be running, but the session is in an indeterminate state (for example, a transaction was dropped without commit or rollback) and the connection must not be reused. Pool integrations check AsyncConn::is_broken on return and destroy the connection instead of returning it to the idle set.

Source

pub fn is_broken(&self) -> bool

True if the connection has been declared broken by a caller via AsyncConn::mark_broken. Independent of AsyncConn::is_alive, which only reflects whether the reader/writer tasks are still running.
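
Because is_alive and is_broken are independent, a pool has to consult both when a connection comes back. The sketch below shows one plausible return policy; the ReturnAction enum and the on_return function are assumptions for illustration and are not part of this crate.

```rust
/// Hypothetical outcome of a pool's check-in decision.
#[derive(Debug, PartialEq, Eq)]
pub enum ReturnAction {
    /// Drop the connection; do not put it back in the idle set.
    Destroy,
    /// Issue a session reset (e.g. DISCARD ALL), then return to the idle set.
    ResetThenIdle,
    /// Return to the idle set as-is.
    Idle,
}

/// Decide what to do with a returned connection from the three flags
/// described above (values as read from `is_alive`, `is_broken`, and
/// `take_state_mutated`).
pub fn on_return(is_alive: bool, is_broken: bool, state_mutated: bool) -> ReturnAction {
    if !is_alive || is_broken {
        ReturnAction::Destroy
    } else if state_mutated {
        ReturnAction::ResetThenIdle
    } else {
        ReturnAction::Idle
    }
}
```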

Source

pub fn enqueue_rollback(&self) -> bool

Fire-and-forget enqueue of a ROLLBACK simple-query, intended to be callable from a synchronous Drop. Returns true if the request was queued on the writer task, false if the connection is not alive or the channel was full/closed (in which case the caller should fall back to AsyncConn::mark_broken so the connection is discarded by the pool).

PostgreSQL accepts ROLLBACK from any in-transaction state — including the aborted state (25P02) that a failed query leaves behind — so this reliably restores the session to idle. The response is drained and discarded; ordering on the writer queue is preserved, so any subsequent request (e.g., the pool’s DISCARD ALL reset) sees a clean connection.
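
The intended use from a synchronous Drop can be sketched as a transaction guard that tries enqueue_rollback and falls back to mark_broken. The RollbackTarget trait, TxGuard, and FakeConn below are hypothetical names introduced for this sketch; only the two method names come from the documentation above.

```rust
use std::cell::Cell;

/// Hypothetical minimal interface mirroring the two methods described above.
pub trait RollbackTarget {
    fn enqueue_rollback(&self) -> bool;
    fn mark_broken(&self);
}

/// Guard for an open transaction. If dropped without `finish`, it tries a
/// fire-and-forget ROLLBACK and marks the connection broken if that fails.
pub struct TxGuard<'a, C: RollbackTarget> {
    conn: &'a C,
    finished: bool,
}

impl<'a, C: RollbackTarget> TxGuard<'a, C> {
    pub fn new(conn: &'a C) -> Self {
        Self { conn, finished: false }
    }

    /// Call after COMMIT (or an explicit ROLLBACK) has succeeded.
    pub fn finish(mut self) {
        self.finished = true;
    }
}

impl<'a, C: RollbackTarget> Drop for TxGuard<'a, C> {
    fn drop(&mut self) {
        if !self.finished && !self.conn.enqueue_rollback() {
            self.conn.mark_broken();
        }
    }
}

/// Test double: `accept` controls whether the rollback can be queued.
pub struct FakeConn {
    pub accept: bool,
    pub rollbacks: Cell<u32>,
    pub broken: Cell<bool>,
}

impl RollbackTarget for FakeConn {
    fn enqueue_rollback(&self) -> bool {
        if self.accept {
            self.rollbacks.set(self.rollbacks.get() + 1);
        }
        self.accept
    }

    fn mark_broken(&self) {
        self.broken.set(true);
    }
}
```

Drop cannot await, which is why the fallback has to be a flag the pool inspects later rather than a second attempt at cleanup.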

Source§

impl AsyncConn

Source

pub fn new(conn: WireConn) -> Self

Create a new async connection from a raw WireConn. Spawns writer and reader tasks.

Source

pub fn dropped_notifications(&self) -> u64

Cumulative number of NotificationResponse messages this connection has discarded since it was created.

Notifications are dropped when (a) the application has not called AsyncConn::take_notification_receiver yet, or (b) the receiver is not draining fast enough and the bounded channel fills up. Compare successive readings to detect missed LISTEN events.
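
Case (b) and the advice to compare successive readings can be modeled with a bounded standard-library channel. This is a sketch under assumptions: the Notifier type and missed_between helper are hypothetical, and the real connection uses its own channel type and message type rather than std::sync::mpsc and String.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical model of the notification path, not the crate's internals.
pub struct Notifier {
    tx: SyncSender<String>,
    dropped: AtomicU64,
}

impl Notifier {
    pub fn new(capacity: usize) -> (Self, Receiver<String>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: AtomicU64::new(0) }, rx)
    }

    /// Deliver without blocking; count the message as dropped if the channel
    /// is full or the receiver is gone.
    pub fn deliver(&self, payload: &str) {
        match self.tx.try_send(payload.to_owned()) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Cumulative count of discarded messages.
    pub fn dropped_notifications(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}

/// Number of notifications lost between two readings of the counter.
pub fn missed_between(previous: u64, current: u64) -> u64 {
    current.saturating_sub(previous)
}
```

An application that sees a nonzero difference would typically re-read the state that the LISTEN channel was meant to keep fresh, since the lost payloads cannot be recovered.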

Source

pub fn take_notification_receiver(&self) -> Option<Receiver<BackendMsg>>

Take the notification receiver. Call once to get a channel that receives NotificationResponse messages that arrive during queries.

Source

pub fn lookup_or_alloc(&self, sql: &str, _param_oids: &[u32]) -> (Vec<u8>, bool)

Look up or allocate a statement name.

Cache hit: returns the cached name with needs_parse=false. The caller submits only Bind/Execute/Sync.

Cache miss: allocates a fresh, unique name from the connection’s statement counter and returns (name, needs_parse=true). The name is NOT yet published in the cache: the caller MUST include a Parse for the new name in the same atomic submit as Bind/Execute/Sync (so the Parse runs inside whatever role-switched transaction the caller has framed, e.g. BEGIN; SET LOCAL ROLE …; …), and then call Self::cache_statement to publish the name only after the Parse has succeeded on the wire.

Why publish-after-success: an earlier version pre-queued the Parse as its own writer request and published the cache entry up front. That avoided a race in which a concurrent caller saw the cached name and submitted a Bind-only request that ran ahead of the Parse. However, it ran the Parse outside any transaction, under the connection’s persistent role (e.g. PostgREST’s authenticator). SQL that references objects reachable only after SET LOCAL ROLE to a user role failed with 42501 permission denied during Parse, and every subsequent Bind for the same name then failed with 26000: prepared statement "sN" does not exist.

Publishing only after a successful Parse keeps caching role-correct: each first-time concurrent caller pays for its own Parse (rather than sharing a pre-queued one), and cache_statement uses first-publisher-wins semantics, so the losing names become session-bounded orphans (bounded by the 256-entry LRU on this connection).

Source

pub fn cache_statement(&self, sql: &str, name: &[u8])

Publish a freshly Parsed statement in the cache so subsequent lookups for the same SQL skip the Parse step.

Called by the high-level exec_* helpers (and any external caller of Self::lookup_or_alloc) after the writer submit that included Parse for name returned successfully. Skipping this step doesn’t cause correctness problems; the next lookup just misses and re-Parses.

First-publisher-wins: if another concurrent miss already published a different name for the same SQL, that name stays in the cache and the caller’s name becomes a server-side orphan (cleaned up at session end; bounded by LRU eviction during the session).

LRU eviction: when the cache reaches its 256-entry cap, the oldest entry by counter is removed, and a Close + Sync is queued fire-and-forget to free the server-side prepared statement.
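
The allocate-then-publish protocol shared by lookup_or_alloc and cache_statement can be modeled in a few lines. This is a simplified, single-threaded sketch: the StmtCache type is hypothetical, LRU eviction is omitted, the "sN" naming is assumed from the error message quoted above, and cache_statement returns a bool here purely so the first-publisher-wins outcome is observable (the real method returns nothing).

```rust
use std::collections::HashMap;

/// Hypothetical model of the statement cache; the real one is shared across
/// tasks and also applies LRU eviction, which is left out here.
#[derive(Default)]
pub struct StmtCache {
    published: HashMap<String, Vec<u8>>,
    counter: u64,
}

impl StmtCache {
    /// Hit: (cached name, false). Miss: (fresh name, true), NOT yet published.
    pub fn lookup_or_alloc(&mut self, sql: &str) -> (Vec<u8>, bool) {
        if let Some(name) = self.published.get(sql) {
            return (name.clone(), false);
        }
        self.counter += 1;
        (format!("s{}", self.counter).into_bytes(), true)
    }

    /// Publish after a successful Parse. Returns false if another name was
    /// published first, in which case `name` is left as a server-side orphan.
    pub fn cache_statement(&mut self, sql: &str, name: &[u8]) -> bool {
        if self.published.contains_key(sql) {
            return false;
        }
        self.published.insert(sql.to_owned(), name.to_vec());
        true
    }
}
```

Two callers that miss concurrently each get a distinct name and each send their own Parse; whichever publishes first determines the name later lookups see.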

Source

pub async fn copy_in( &self, copy_sql: &str, data: &[u8], ) -> Result<u64, PgWireError>

Execute COPY FROM STDIN: sends the COPY command, then data in chunks, then CopyDone. Returns the number of rows copied (from CommandComplete tag).

Data is sent in chunks of up to 1 MB to avoid buffering the entire payload in a single BytesMut. For small payloads (< 1 MB), this is a single write.
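
The chunking step can be illustrated with slice chunking from the standard library. This sketch assumes that "1 MB" means 1 MiB (1024 * 1024 bytes), which the text above does not specify; the CHUNK constant and copy_chunks function are hypothetical names.

```rust
/// Chunk size assumed for illustration: 1 MiB.
pub const CHUNK: usize = 1024 * 1024;

/// Split a payload into pieces of at most CHUNK bytes without copying it.
/// Each piece would become the body of one CopyData message.
pub fn copy_chunks(data: &[u8]) -> std::slice::Chunks<'_, u8> {
    data.chunks(CHUNK)
}
```

A payload smaller than the chunk size yields exactly one piece, which matches the single-write case described above.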

Source

pub async fn copy_in_stream<R: AsyncRead + Unpin>( &self, copy_sql: &str, reader: R, ) -> Result<u64, PgWireError>

Execute COPY FROM STDIN with streaming: sends the COPY command, then reads data from an async reader in chunks, avoiding buffering the entire payload in memory.

use tokio::fs::File;
let file = File::open("data.csv").await?;
let _count = conn.copy_in_stream("COPY users FROM STDIN WITH (FORMAT csv)", file).await?;
Source

pub async fn copy_out(&self, copy_sql: &str) -> Result<Vec<u8>, PgWireError>

Execute COPY TO STDOUT: sends the COPY command, then collects all CopyData payloads and returns them as a single buffer.

Source

pub fn invalidate_statement(&self, sql: &str)

Evict a SQL statement from the cache, forcing re-parse on next use. Used for prepared statement invalidation after schema changes.

Source

pub fn clear_statement_cache(&self)

Clear the entire statement cache. Must be called after DISCARD ALL, which destroys server-side prepared statements.

Source

pub async fn exec_transaction( &self, setup_sql: &str, query_sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], ) -> Result<Vec<RawRow>, PgWireError>

Execute a pipelined transaction with automatic statement caching.

On a successful Parse the new statement name is published in the cache via Self::cache_statement. If a cached statement turns out to be invalid (PG error 26000 or 0A000), the cache entry is evicted and the transaction is retried once with a fresh Parse. This handles schema changes invalidating cached plans after their initial Parse.

Source

pub async fn exec_query( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], ) -> Result<Vec<RawRow>, PgWireError>

Execute a parameterized query with automatic statement caching. If a cached statement is invalidated by a schema change (PG error 26000 or 0A000), automatically evicts the cache entry, re-parses, and retries once.
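
The evict-and-retry-once behavior shared by exec_transaction and exec_query can be sketched as a generic helper. The DbError type, is_stale_statement, and run_with_one_retry are hypothetical names; the real methods work with PgWireError and perform the eviction and re-Parse internally.

```rust
/// Hypothetical error type carrying a SQLSTATE; stands in for `PgWireError`.
#[derive(Debug, PartialEq)]
pub struct DbError {
    pub sqlstate: String,
}

/// SQLSTATEs that the text above treats as "cached statement is stale".
pub fn is_stale_statement(err: &DbError) -> bool {
    matches!(err.sqlstate.as_str(), "26000" | "0A000")
}

/// Run `attempt` once; on a stale-statement error, call `evict` and retry
/// exactly once. Any other error, or a second failure, is returned as-is.
pub fn run_with_one_retry<T>(
    mut attempt: impl FnMut() -> Result<T, DbError>,
    mut evict: impl FnMut(),
) -> Result<T, DbError> {
    match attempt() {
        Err(e) if is_stale_statement(&e) => {
            evict();
            attempt()
        }
        other => other,
    }
}
```

Limiting the retry to one attempt keeps a persistently failing statement from looping, while still covering the common case of a schema change between Parse and a later Bind.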

Source

pub async fn submit( &self, messages: BytesMut, collector: ResponseCollector, ) -> Result<PipelineResponse, PgWireError>

Submit a request to the connection. Returns a future that resolves when the response is available. Times out after 5 minutes to prevent hanging forever if the reader/writer task dies.

Source

pub async fn submit_batch( &self, items: Vec<(BytesMut, ResponseCollector)>, ) -> Result<Vec<Result<PipelineResponse, PgWireError>>, PgWireError>

Submit a batch of requests in FIFO order. All requests are queued before any response is awaited, so the writer task sees them together and coalesces them into a single write() syscall. The server then pipelines the N responses back-to-back, giving one network round-trip for all N queries.

Returns one Result<PipelineResponse, PgWireError> per input item, in the same order. The outer Result fails only if queueing fails (channel closed). Each inner Result reflects the per-query outcome.
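
Once the outer Result has been unwrapped, the per-item results still need to be examined individually. The helper below is a generic sketch of that step; split_batch is a hypothetical function, not part of this crate, and works on any Vec of results.

```rust
/// Split per-item outcomes into successes and failures, keeping each item's
/// position in the original batch so callers can map results back to inputs.
pub fn split_batch<T, E>(items: Vec<Result<T, E>>) -> (Vec<(usize, T)>, Vec<(usize, E)>) {
    let mut ok = Vec::new();
    let mut failed = Vec::new();
    for (index, item) in items.into_iter().enumerate() {
        match item {
            Ok(value) => ok.push((index, value)),
            Err(err) => failed.push((index, err)),
        }
    }
    (ok, failed)
}
```

Collecting into a single Result with `collect::<Result<Vec<_>, _>>()` would also work, but it stops at the first error and discards the successful responses, which is rarely what a batch caller wants.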

Source

pub async fn close(&self) -> Result<(), PgWireError>

Send a Terminate message to the server and wait for the writer/reader tasks to exit. After this returns, the connection is unusable; further calls fail with ConnectionClosed. Idempotent: calling close on an already-closed connection is a no-op and returns Ok.

Source

pub async fn submit_stream( &self, messages: BytesMut, row_buffer: usize, ) -> Result<(StreamHeader, Receiver<Result<StreamedRow, PgWireError>>), PgWireError>

Submit a streaming request. Returns the column header and an mpsc receiver that yields rows one at a time.

Source

pub async fn pipeline_transaction( &self, setup_sql: &str, query_sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], stmt_name: &[u8], needs_parse: bool, ) -> Result<Vec<RawRow>, PgWireError>

Execute a pipelined transaction: setup (simple query) + data query (extended protocol) + COMMIT (simple query), all coalesced into one TCP write. The data query is parameterized and binary-safe.

Source

pub async fn query( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], stmt_name: &[u8], needs_parse: bool, ) -> Result<Vec<RawRow>, PgWireError>

Execute a single parameterized query without wrapping it in a transaction.

Source

pub async fn query_with_formats( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], param_formats: &[FormatCode], result_formats: &[FormatCode], stmt_name: &[u8], needs_parse: bool, ) -> Result<Vec<RawRow>, PgWireError>

Execute a parameterized query with explicit per-param and per-result format codes (text = 0, binary = 1).

param_formats is interpreted per PostgreSQL wire protocol rules:

  • empty: all params are text
  • length 1: the single code applies to every param
  • length N (== params.len()): one code per param

Same rules apply to result_formats for output columns (empty → all text; single code → applies to all columns; per-column list otherwise).
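
The three length rules can be expressed as a small lookup function. This sketch uses a hypothetical Format enum in place of the crate's FormatCode, and the resolve_format function is an illustration rather than part of the API; returning None for other lengths is a choice made here, since the text above does not say how mismatched lengths are handled.

```rust
/// Hypothetical mirror of `FormatCode`; the values follow the wire protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Text = 0,
    Binary = 1,
}

/// Resolve the format for item `index` out of `count` params or columns.
/// Returns None when `index` is out of range or `codes` has a length the
/// rules above do not cover.
pub fn resolve_format(codes: &[Format], index: usize, count: usize) -> Option<Format> {
    if index >= count {
        return None;
    }
    match codes.len() {
        0 => Some(Format::Text),               // empty: everything is text
        1 => Some(codes[0]),                   // one code applies to all
        n if n == count => Some(codes[index]), // one code per item
        _ => None,
    }
}
```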

Source

pub async fn exec_query_with_formats( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], param_formats: &[FormatCode], result_formats: &[FormatCode], ) -> Result<Vec<RawRow>, PgWireError>

Variant of exec_query with per-param and per-result format codes. See query_with_formats for format code semantics.

Trait Implementations§

Source§

impl Debug for AsyncConn

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.