Skip to main content

AsyncPoolable

Struct AsyncPoolable 

Source
pub struct AsyncPoolable(/* private fields */);
Expand description

Poolable wrapper around pg_wired::AsyncConn.

The AsyncConn spawns reader/writer tasks on creation and keeps them running until the connection dies. Pooling AsyncConn directly means connections are reused without re-establishing TCP or re-authenticating.

The wrapped connection is exposed read-only via AsyncPoolable::conn. Constructing an AsyncPoolable directly is intentionally not part of the public API: connections enter the wrapper through Poolable::connect and the pool’s lifecycle hooks, which keeps pool accounting consistent.

Implementations§

Source§

impl AsyncPoolable

Source

pub fn conn(&self) -> &AsyncConn

Borrow the wrapped connection. Use this for read-only access from pool consumers (e.g., to check liveness or fetch a cancel token).

Methods from Deref<Target = AsyncConn>§

Source

pub fn is_alive(&self) -> bool

Check if the connection is still alive (writer/reader tasks running).

Source

pub fn backend_pid(&self) -> i32

Backend process ID assigned by the server.

Source

pub fn addr(&self) -> &str

Server address this connection is talking to.

Source

pub fn cancel_token(&self) -> CancelToken

Produce a cancel token for the running session on this connection.

Source

pub fn mark_state_mutated(&self)

Mark the connection as having mutated session state since the last reset. Pools call take_state_mutated() on return to decide whether to issue DISCARD ALL. Callers issuing BEGIN, SET (without LOCAL), advisory locks, temp tables, etc., should call this before submitting.

Source

pub fn take_state_mutated(&self) -> bool

Atomically read and clear the state-mutated flag. Returns the previous value: true means the caller should issue a reset.

Source

pub fn is_state_mutated(&self) -> bool

Read the state-mutated flag without clearing it.

Source

pub fn mark_broken(&self)

Mark the connection as broken. The reader/writer tasks may still be running, but the session is in an indeterminate state (for example, a transaction was dropped without commit or rollback) and the connection must not be reused. Pool integrations check AsyncConn::is_broken on return and destroy the connection instead of returning it to the idle set.

Source

pub fn is_broken(&self) -> bool

True if the connection has been declared broken by a caller via AsyncConn::mark_broken. Independent of AsyncConn::is_alive, which only reflects whether the reader/writer tasks are still running.

Source

pub fn enqueue_rollback(&self) -> bool

Fire-and-forget enqueue of a ROLLBACK simple-query, intended to be callable from a synchronous Drop. Returns true if the request was queued on the writer task, false if the connection is not alive or the channel was full/closed (in which case the caller should fall back to AsyncConn::mark_broken so the connection is discarded by the pool).

PostgreSQL accepts ROLLBACK from any in-transaction state — including the aborted state (25P02) that a failed query leaves behind — so this reliably restores the session to idle. The response is drained and discarded; ordering on the writer queue is preserved, so any subsequent request (e.g., the pool’s DISCARD ALL reset) sees a clean connection.

Source

pub fn dropped_notifications(&self) -> u64

Cumulative number of NotificationResponse messages this connection has discarded since it was created.

Notifications are dropped when (a) the application has not called AsyncConn::take_notification_receiver yet, or (b) the receiver is not draining fast enough and the bounded channel fills up. Compare successive readings to detect missed LISTEN events.

Source

pub fn take_notification_receiver(&self) -> Option<Receiver<BackendMsg>>

Take the notification receiver. Call once to get a channel that receives NotificationResponse messages that arrive during queries.

Source

pub fn lookup_or_alloc(&self, sql: &str) -> (Vec<u8>, bool)

Look up or allocate a statement name. Uses an LRU-style eviction: when the cache is full, the oldest entry (by insertion order / counter) is removed and a Close message is queued to free the server-side prepared statement.

Source

pub async fn copy_in( &self, copy_sql: &str, data: &[u8], ) -> Result<u64, PgWireError>

Execute COPY FROM STDIN: sends the COPY command, then data in chunks, then CopyDone. Returns the number of rows copied (from CommandComplete tag).

Data is sent in chunks of up to 1MB to avoid buffering the entire payload in a single BytesMut. For small payloads (< 1MB), this is a single write.

Source

pub async fn copy_in_stream<R>( &self, copy_sql: &str, reader: R, ) -> Result<u64, PgWireError>
where R: AsyncRead + Unpin,

Execute COPY FROM STDIN with streaming: sends the COPY command, then reads data from an async reader in chunks, avoiding buffering the entire payload in memory.

use tokio::fs::File;
let file = File::open("data.csv").await?;
let _count = conn.copy_in_stream("COPY users FROM STDIN WITH (FORMAT csv)", file).await?;
Source

pub async fn copy_out(&self, copy_sql: &str) -> Result<Vec<u8>, PgWireError>

Execute COPY TO STDOUT: sends the COPY command, collects all CopyData.

Source

pub fn invalidate_statement(&self, sql: &str)

Evict a SQL statement from the cache, forcing re-parse on next use. Used for prepared statement invalidation after schema changes.

Source

pub fn clear_statement_cache(&self)

Clear the entire statement cache. Must be called after DISCARD ALL which destroys server-side prepared statements.

Source

pub async fn exec_transaction( &self, setup_sql: &str, query_sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], ) -> Result<Vec<RawRow>, PgWireError>

Execute a pipelined transaction with automatic statement caching.

Source

pub async fn exec_query( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], ) -> Result<Vec<RawRow>, PgWireError>

Execute a parameterized query with automatic statement caching. If a cached statement is invalidated by a schema change (PG error 26000 or 0A000), automatically evicts the cache entry, re-parses, and retries once.

Source

pub async fn submit( &self, messages: BytesMut, collector: ResponseCollector, ) -> Result<PipelineResponse, PgWireError>

Submit a request to the connection. Returns a future that resolves when the response is available. Times out after 5 minutes to prevent hanging forever if the reader/writer task dies.

Source

pub async fn submit_batch( &self, items: Vec<(BytesMut, ResponseCollector)>, ) -> Result<Vec<Result<PipelineResponse, PgWireError>>, PgWireError>

Submit a batch of requests in FIFO order. All requests are queued before any response is awaited, so the writer task sees them together and coalesces them into a single write() syscall. The server then pipelines the N responses back-to-back, giving one network round-trip for all N queries.

Returns one Result<PipelineResponse, PgWireError> per input item, in the same order. The outer Result fails only if queueing fails (channel closed). Each inner Result reflects the per-query outcome.

Source

pub async fn close(&self) -> Result<(), PgWireError>

Send a Terminate message to the server and wait for the writer/reader tasks to exit. After this returns, the connection is unusable; further calls fail with ConnectionClosed. Idempotent: calling close on an already-closed connection is a no-op and returns Ok.

Source

pub async fn submit_stream( &self, messages: BytesMut, row_buffer: usize, ) -> Result<(StreamHeader, Receiver<Result<RawRow, PgWireError>>), PgWireError>

Submit a streaming request. Returns the column header and an mpsc receiver that yields rows one at a time.

Source

pub async fn pipeline_transaction( &self, setup_sql: &str, query_sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], stmt_name: &[u8], needs_parse: bool, ) -> Result<Vec<RawRow>, PgWireError>

Execute a pipelined transaction: setup (simple query) + data query (extended protocol) + COMMIT (simple query) All coalesced into one TCP write. Binary-safe parameterized data query.

Source

pub async fn query( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], stmt_name: &[u8], needs_parse: bool, ) -> Result<Vec<RawRow>, PgWireError>

Execute a simple parameterized query (no transaction).

Source

pub async fn query_with_formats( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], param_formats: &[FormatCode], result_formats: &[FormatCode], stmt_name: &[u8], needs_parse: bool, ) -> Result<Vec<RawRow>, PgWireError>

Execute a parameterized query with explicit per-param and per-result format codes (text = 0, binary = 1).

param_formats is interpreted per PostgreSQL wire protocol rules:

  • empty: all params are text
  • length 1: the single code applies to every param
  • length N (== params.len()): one code per param

Same rules apply to result_formats for output columns (empty → all text; single code → applies to all columns; per-column list otherwise).

Source

pub async fn exec_query_with_formats( &self, sql: &str, params: &[Option<&[u8]>], param_oids: &[u32], param_formats: &[FormatCode], result_formats: &[FormatCode], ) -> Result<Vec<RawRow>, PgWireError>

Variant of exec_query with per-param and per-result format codes. See query_with_formats for format code semantics.

Trait Implementations§

Source§

impl Debug for AsyncPoolable

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Deref for AsyncPoolable

Source§

type Target = AsyncConn

The resulting type after dereferencing.
Source§

fn deref(&self) -> &Self::Target

Dereferences the value.
Source§

impl Poolable for AsyncPoolable

Source§

async fn reset(&self) -> bool

Reset connection state on return to pool.

Fast path: if no operation since the last reset has mutated session state, skip the round-trip. The reader task automatically flags the connection state-mutated whenever ReadyForQuery reports a non-idle transaction status; callers that issue SET / advisory-lock / temp table / LISTEN / etc. via simple-query call pg_wired::AsyncConn::mark_state_mutated explicitly. The bulk of pooled usage is self-contained Bind/Execute/Sync queries which never set the flag, so the fast path is the common case.

Slow path: send DISCARD ALL to clear transactions, SET variables, temp tables, advisory locks, and prepared statements. If the reset fails, the connection is destroyed.

Source§

type Error = PgWireError

The error type for connection operations.
Source§

async fn connect( addr: &str, user: &str, password: &str, database: &str, ) -> Result<Self, Self::Error>

Create a new connection to the database.
Source§

fn has_pending_data(&self) -> bool

Check if the connection has unconsumed data (is in a corrupted state).

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<P, T> Receiver for P
where P: Deref<Target = T> + ?Sized, T: ?Sized,

Source§

type Target = T

🔬This is a nightly-only experimental API. (arbitrary_self_types)
The target type on which the method may be called.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more