pub struct AsyncPgConnection { /* private fields */ }
postgres
only.Expand description
A connection to a PostgreSQL database.
Connection URLs should be in the form
postgres://[user[:password]@]host/database_name
Checkout the documentation of the tokio_postgres crate for details about the format
§Pipelining
This connection supports pipelined requests. Pipelining can improve performance in use cases in which multiple, independent queries need to be executed. In a traditional workflow, each query is sent to the server after the previous query completes. In contrast, pipelining allows the client to send all of the queries to the server up front, minimizing time spent by one side waiting for the other to finish sending data:
Sequential Pipelined
| Client | Server | | Client | Server |
|----------------|-----------------| |----------------|-----------------|
| send query 1 | | | send query 1 | |
| | process query 1 | | send query 2 | process query 1 |
| receive rows 1 | | | send query 3 | process query 2 |
| send query 2 | | | receive rows 1 | process query 3 |
| | process query 2 | | receive rows 2 | |
| receive rows 2 | | | receive rows 3 | |
| send query 3 | |
| | process query 3 |
| receive rows 3 | |
In both cases, the PostgreSQL server is executing the queries sequentially - pipelining just allows both sides of the connection to work concurrently when possible.
Pipelining happens automatically when futures are polled concurrently (for example, by using the futures join
combinator):
use diesel_async::RunQueryDsl;
let q1 = diesel::select(1_i32.into_sql::<Integer>());
let q2 = diesel::select(2_i32.into_sql::<Integer>());
// construct multiple futures for different queries
let f1 = q1.get_result::<i32>(conn);
let f2 = q2.get_result::<i32>(conn);
// wait on both results
let res = futures_util::try_join!(f1, f2)?;
assert_eq!(res.0, 1);
assert_eq!(res.1, 2);
For more complex cases, an immutable reference to the connection need to be used:
use diesel_async::RunQueryDsl;
async fn fn12(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
let f1 = diesel::select(1_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
let f2 = diesel::select(2_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
futures_util::try_join!(f1, f2)
}
async fn fn34(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
let f3 = diesel::select(3_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
let f4 = diesel::select(4_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
futures_util::try_join!(f3, f4)
}
let f12 = fn12(&conn);
let f34 = fn34(&conn);
let ((r1, r2), (r3, r4)) = futures_util::try_join!(f12, f34).unwrap();
assert_eq!(r1, 1);
assert_eq!(r2, 2);
assert_eq!(r3, 3);
assert_eq!(r4, 4);
§TLS
Connections created by AsyncPgConnection::establish
do not support TLS.
TLS support for tokio_postgres connections is implemented by external crates, e.g. tokio_postgres_rustls.
AsyncPgConnection::try_from_client_and_connection
can be used to construct a connection from an existing
tokio_postgres::Connection
with TLS enabled.
Implementations§
Source§impl AsyncPgConnection
impl AsyncPgConnection
Sourcepub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self>
pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self>
Build a transaction, specifying additional details such as isolation level
See TransactionBuilder
for more examples.
conn.build_transaction()
.read_only()
.serializable()
.deferrable()
.run(|conn| async move { Ok(()) }.scope_boxed())
.await
Sourcepub async fn try_from(conn: Client) -> ConnectionResult<Self>
pub async fn try_from(conn: Client) -> ConnectionResult<Self>
Construct a new AsyncPgConnection
instance from an existing tokio_postgres::Client
Sourcepub async fn try_from_client_and_connection<S>(
client: Client,
conn: Connection<Socket, S>,
) -> ConnectionResult<Self>
pub async fn try_from_client_and_connection<S>( client: Client, conn: Connection<Socket, S>, ) -> ConnectionResult<Self>
Constructs a new AsyncPgConnection
from an existing tokio_postgres::Client
and
tokio_postgres::Connection
Sourcepub fn cancel_token(&self) -> CancelToken
pub fn cancel_token(&self) -> CancelToken
Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client.
Sourcepub fn notifications_stream(
&mut self,
) -> impl Stream<Item = QueryResult<PgNotification>> + '_
pub fn notifications_stream( &mut self, ) -> impl Stream<Item = QueryResult<PgNotification>> + '_
See Postgres documentation for SQL commands NOTIFY and LISTEN
The returned stream yields all notifications received by the connection, not only notifications received after calling the function. The returned stream will never close, so no notifications will just result in a pending state.
If there’s no connection available to poll, the stream will yield no notifications and be pending forever.
This can happen if you created the AsyncPgConnection
by the try_from
constructor.
// register the notifications channel we want to receive notifications for
diesel::sql_query("LISTEN example_channel").execute(conn).await?;
// send some notification (usually done from a different connection/thread/application)
diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(conn).await?;
let mut notifications = std::pin::pin!(conn.notifications_stream());
let mut notification = notifications.next().await.unwrap().unwrap();
assert_eq!(notification.channel, "example_channel");
assert_eq!(notification.payload, "additional data");
println!("Notification received from process with id {}", notification.process_id);
Trait Implementations§
Source§impl AsyncConnection for AsyncPgConnection
impl AsyncConnection for AsyncPgConnection
Source§async fn establish(database_url: &str) -> ConnectionResult<Self>
async fn establish(database_url: &str) -> ConnectionResult<Self>
Source§fn set_instrumentation(&mut self, instrumentation: impl Instrumentation)
fn set_instrumentation(&mut self, instrumentation: impl Instrumentation)
Instrumentation
implementation for this connectionSource§fn set_prepared_statement_cache_size(&mut self, size: CacheSize)
fn set_prepared_statement_cache_size(&mut self, size: CacheSize)
CacheSize
for this connectionSource§fn transaction<'a, 'conn, R, E, F>(
&'conn mut self,
callback: F,
) -> BoxFuture<'conn, Result<R, E>>
fn transaction<'a, 'conn, R, E, F>( &'conn mut self, callback: F, ) -> BoxFuture<'conn, Result<R, E>>
Source§fn begin_test_transaction(
&mut self,
) -> impl Future<Output = QueryResult<()>> + Send
fn begin_test_transaction( &mut self, ) -> impl Future<Output = QueryResult<()>> + Send
Source§impl AsyncConnectionCore for &AsyncPgConnection
impl AsyncConnectionCore for &AsyncPgConnection
Source§type LoadFuture<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::LoadFuture<'conn, 'query>
type LoadFuture<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::LoadFuture<'conn, 'query>
AsyncConnection::load
Source§type ExecuteFuture<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>
type ExecuteFuture<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>
AsyncConnection::execute
Source§type Stream<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::Stream<'conn, 'query>
type Stream<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::Stream<'conn, 'query>
AsyncConnection::load
Source§type Row<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::Row<'conn, 'query>
type Row<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::Row<'conn, 'query>
AsyncConnection::load
Source§type Backend = <AsyncPgConnection as AsyncConnectionCore>::Backend
type Backend = <AsyncPgConnection as AsyncConnectionCore>::Backend
Source§impl AsyncConnectionCore for AsyncPgConnection
impl AsyncConnectionCore for AsyncPgConnection
Source§type LoadFuture<'conn, 'query> = Pin<Box<dyn Future<Output = Result<<AsyncPgConnection as AsyncConnectionCore>::Stream<'conn, 'query>, Error>> + Send + 'query>>
type LoadFuture<'conn, 'query> = Pin<Box<dyn Future<Output = Result<<AsyncPgConnection as AsyncConnectionCore>::Stream<'conn, 'query>, Error>> + Send + 'query>>
AsyncConnection::load
Source§type ExecuteFuture<'conn, 'query> = Pin<Box<dyn Future<Output = Result<usize, Error>> + Send + 'query>>
type ExecuteFuture<'conn, 'query> = Pin<Box<dyn Future<Output = Result<usize, Error>> + Send + 'query>>
AsyncConnection::execute
Source§type Stream<'conn, 'query> = Pin<Box<dyn Stream<Item = Result<PgRow, Error>> + Send>>
type Stream<'conn, 'query> = Pin<Box<dyn Stream<Item = Result<PgRow, Error>> + Send>>
AsyncConnection::load
Source§impl Drop for AsyncPgConnection
impl Drop for AsyncPgConnection
Source§impl SimpleAsyncConnection for &AsyncPgConnection
impl SimpleAsyncConnection for &AsyncPgConnection
Source§async fn batch_execute(&mut self, query: &str) -> QueryResult<()>
async fn batch_execute(&mut self, query: &str) -> QueryResult<()>
Source§impl SimpleAsyncConnection for AsyncPgConnection
impl SimpleAsyncConnection for AsyncPgConnection
Source§async fn batch_execute(&mut self, query: &str) -> QueryResult<()>
async fn batch_execute(&mut self, query: &str) -> QueryResult<()>
Source§impl<'b, Changes, Output, Tab, V> UpdateAndFetchResults<Changes, Output> for AsyncPgConnectionwhere
Output: Send + 'static,
Changes: Copy + AsChangeset<Target = Tab> + Send + Identifiable<Table = Tab>,
Tab: Table + FindDsl<Changes::Id> + 'b,
Find<Tab, Changes::Id>: IntoUpdateTarget<Table = Tab, WhereClause = V>,
UpdateStatement<Tab, V, Changes::Changeset>: AsQuery,
Update<Changes, Changes>: LoadQuery<'b, Self, Output>,
V: Send + 'b,
Changes::Changeset: Send + 'b,
Tab::FromClause: Send,
impl<'b, Changes, Output, Tab, V> UpdateAndFetchResults<Changes, Output> for AsyncPgConnectionwhere
Output: Send + 'static,
Changes: Copy + AsChangeset<Target = Tab> + Send + Identifiable<Table = Tab>,
Tab: Table + FindDsl<Changes::Id> + 'b,
Find<Tab, Changes::Id>: IntoUpdateTarget<Table = Tab, WhereClause = V>,
UpdateStatement<Tab, V, Changes::Changeset>: AsQuery,
Update<Changes, Changes>: LoadQuery<'b, Self, Output>,
V: Send + 'b,
Changes::Changeset: Send + 'b,
Tab::FromClause: Send,
Source§fn update_and_fetch<'conn, 'changes>(
&'conn mut self,
changeset: Changes,
) -> BoxFuture<'changes, QueryResult<Output>>where
Changes: 'changes,
Changes::Changeset: 'changes,
Self: 'changes,
'conn: 'changes,
fn update_and_fetch<'conn, 'changes>(
&'conn mut self,
changeset: Changes,
) -> BoxFuture<'changes, QueryResult<Output>>where
Changes: 'changes,
Changes::Changeset: 'changes,
Self: 'changes,
'conn: 'changes,
Auto Trait Implementations§
impl Freeze for AsyncPgConnection
impl !RefUnwindSafe for AsyncPgConnection
impl Send for AsyncPgConnection
impl Sync for AsyncPgConnection
impl Unpin for AsyncPgConnection
impl !UnwindSafe for AsyncPgConnection
Blanket Implementations§
Source§impl<T> AggregateExpressionMethods for T
impl<T> AggregateExpressionMethods for T
Source§fn aggregate_distinct(self) -> Self::Outputwhere
Self: DistinctDsl,
fn aggregate_distinct(self) -> Self::Outputwhere
Self: DistinctDsl,
DISTINCT
modifier for aggregate functions Read moreSource§fn aggregate_all(self) -> Self::Outputwhere
Self: AllDsl,
fn aggregate_all(self) -> Self::Outputwhere
Self: AllDsl,
ALL
modifier for aggregate functions Read moreSource§fn aggregate_filter<P>(self, f: P) -> Self::Output
fn aggregate_filter<P>(self, f: P) -> Self::Output
Source§fn aggregate_order<O>(self, o: O) -> Self::Outputwhere
Self: OrderAggregateDsl<O>,
fn aggregate_order<O>(self, o: O) -> Self::Outputwhere
Self: OrderAggregateDsl<O>,
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
Source§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait>
(where Trait: Downcast
) to Box<dyn Any>
, which can then be
downcast
into Box<dyn ConcreteType>
where ConcreteType
implements Trait
.Source§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait>
(where Trait: Downcast
) to Rc<Any>
, which can then be further
downcast
into Rc<ConcreteType>
where ConcreteType
implements Trait
.Source§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &Any
’s vtable from &Trait
’s.Source§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &mut Any
’s vtable from &mut Trait
’s.Source§impl<T> DowncastSend for T
impl<T> DowncastSend for T
Source§impl<T> DowncastSync for T
impl<T> DowncastSync for T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoSql for T
impl<T> IntoSql for T
Source§fn into_sql<T>(self) -> Self::Expression
fn into_sql<T>(self) -> Self::Expression
self
to an expression for Diesel’s query builder. Read moreSource§fn as_sql<'a, T>(&'a self) -> <&'a Self as AsExpression<T>>::Expression
fn as_sql<'a, T>(&'a self) -> <&'a Self as AsExpression<T>>::Expression
&self
to an expression for Diesel’s query builder. Read moreSource§impl<T, Conn> RunQueryDsl<Conn> for T
impl<T, Conn> RunQueryDsl<Conn> for T
Source§fn execute<'conn, 'query>(
self,
conn: &'conn mut Conn,
) -> Conn::ExecuteFuture<'conn, 'query>
fn execute<'conn, 'query>( self, conn: &'conn mut Conn, ) -> Conn::ExecuteFuture<'conn, 'query>
Source§fn load<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> LoadFuture<'conn, 'query, Self, Conn, U>
fn load<'query, 'conn, U>( self, conn: &'conn mut Conn, ) -> LoadFuture<'conn, 'query, Self, Conn, U>
Source§fn load_stream<'conn, 'query, U>(
self,
conn: &'conn mut Conn,
) -> Self::LoadFuture<'conn>where
Conn: AsyncConnectionCore,
U: 'conn,
Self: LoadQuery<'query, Conn, U> + 'query,
fn load_stream<'conn, 'query, U>(
self,
conn: &'conn mut Conn,
) -> Self::LoadFuture<'conn>where
Conn: AsyncConnectionCore,
U: 'conn,
Self: LoadQuery<'query, Conn, U> + 'query,
Stream
] with the returned rows. Read moreSource§fn get_result<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> GetResult<'conn, 'query, Self, Conn, U>
fn get_result<'query, 'conn, U>( self, conn: &'conn mut Conn, ) -> GetResult<'conn, 'query, Self, Conn, U>
Source§fn get_results<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> LoadFuture<'conn, 'query, Self, Conn, U>
fn get_results<'query, 'conn, U>( self, conn: &'conn mut Conn, ) -> LoadFuture<'conn, 'query, Self, Conn, U>
Vec
with the affected rows. Read more