#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(
missing_docs,
clippy::cast_possible_wrap,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
use diesel::backend::Backend;
use diesel::connection::{CacheSize, Instrumentation};
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
use diesel::row::Row;
use diesel::{ConnectionResult, QueryResult};
use futures_core::future::BoxFuture;
use futures_core::Stream;
use futures_util::FutureExt;
use std::fmt::Debug;
use std::future::Future;
pub use scoped_futures;
use scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
// Submodules. A `#[cfg(feature = "...")]` line compiles the module that follows it only
// when that Cargo feature is enabled; modules declared without `pub` stay private to the
// crate and are only reachable through the re-exports further down.
#[cfg(feature = "async-connection-wrapper")]
pub mod async_connection_wrapper;
mod deref_connection;
#[cfg(feature = "migrations")]
mod migrations;
#[cfg(feature = "mysql")]
mod mysql;
#[cfg(feature = "postgres")]
pub mod pg;
#[cfg(feature = "pool")]
pub mod pooled_connection;
mod run_query_dsl;
// Built when at least one of the `postgres` / `mysql` features is enabled.
#[cfg(any(feature = "postgres", feature = "mysql"))]
mod stmt_cache;
#[cfg(feature = "sync-connection-wrapper")]
pub mod sync_connection_wrapper;
mod transaction_manager;
// Re-exports that lift selected items to the crate root. `#[doc(inline)]` asks rustdoc to
// render the item's documentation at the re-export site instead of as a bare `pub use` line.
#[cfg(feature = "mysql")]
#[doc(inline)]
pub use self::mysql::AsyncMysqlConnection;
#[cfg(feature = "mysql")]
#[doc(inline)]
pub use self::mysql::MysqlCancelToken;
#[cfg(feature = "postgres")]
#[doc(inline)]
pub use self::pg::AsyncPgConnection;
#[doc(inline)]
pub use self::run_query_dsl::*;
#[doc(inline)]
#[cfg(feature = "migrations")]
pub use self::migrations::AsyncMigrationHarness;
#[doc(inline)]
pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
/// Minimal async connection capability: executing SQL supplied as plain text.
///
/// [`AsyncConnectionCore`] requires this trait as a supertrait.
pub trait SimpleAsyncConnection {
/// Executes the SQL text in `query`.
///
/// The returned future is `Send` and resolves to `Ok(())` on success or to a
/// diesel query error otherwise; no rows or counts are reported back.
///
/// NOTE(review): the name suggests `query` may hold several statements; whether
/// that is supported is up to the implementor and not visible here — confirm.
fn batch_execute(&mut self, query: &str) -> impl Future<Output = QueryResult<()>> + Send;
}
/// Core query-execution surface of an async connection.
///
/// Bundles the backend type, the future / stream / row types produced when a query
/// runs, and the two low-level entry points [`load`](Self::load) and
/// [`execute_returning_count`](Self::execute_returning_count). Implementors must also
/// implement [`SimpleAsyncConnection`] and be `Send`.
pub trait AsyncConnectionCore: SimpleAsyncConnection + Send {
/// Future returned by [`execute_returning_count`](Self::execute_returning_count);
/// resolves to `QueryResult<usize>`.
type ExecuteFuture<'conn, 'query>: Future<Output = QueryResult<usize>> + Send;
/// Future returned by [`load`](Self::load); on success it resolves to a
/// [`Self::Stream`].
type LoadFuture<'conn, 'query>: Future<Output = QueryResult<Self::Stream<'conn, 'query>>> + Send;
/// Stream whose items are `QueryResult<Self::Row>`, so every item can carry its own
/// error.
type Stream<'conn, 'query>: Stream<Item = QueryResult<Self::Row<'conn, 'query>>> + Send;
/// Row type yielded by [`Self::Stream`]; implements diesel's `Row` for
/// [`Self::Backend`].
type Row<'conn, 'query>: Row<'conn, Self::Backend>;
/// The diesel backend this connection is associated with.
type Backend: Backend;
/// Turns `source` into a query for [`Self::Backend`] and returns a future that
/// resolves to a stream of result rows.
///
/// `'conn` is the borrow of the connection, `'query` bounds the lifetime of `source`.
/// Hidden from the rendered docs via `#[doc(hidden)]`.
#[doc(hidden)]
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
where
T: AsQuery + 'query,
T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
/// Runs `source` and returns a future that resolves to a `usize`.
///
/// NOTE(review): per the method name the `usize` is presumably the number of affected
/// rows — confirm against the implementations.
#[doc(hidden)]
fn execute_returning_count<'conn, 'query, T>(
&'conn mut self,
source: T,
) -> Self::ExecuteFuture<'conn, 'query>
where
T: QueryFragment<Self::Backend> + QueryId + 'query;
// The two provided methods below have empty bodies; their only visible effect is to
// mention the associated future types in a method signature. Per their names they exist
// to silence a lint. NOTE(review): which lint is targeted is not visible here — confirm
// before removing them.
#[doc(hidden)]
fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
#[doc(hidden)]
fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
}
pub trait AsyncConnection: AsyncConnectionCore + Sized {
#[doc(hidden)]
type TransactionManager: TransactionManager<Self>;
fn establish(database_url: &str) -> impl Future<Output = ConnectionResult<Self>> + Send;
fn transaction<'a, 'conn, R, E, F>(
&'conn mut self,
callback: F,
) -> BoxFuture<'conn, Result<R, E>>
where
F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
E: From<diesel::result::Error> + Send + 'a,
R: Send + 'a,
'a: 'conn,
{
Self::TransactionManager::transaction(self, callback).boxed()
}
fn begin_test_transaction(&mut self) -> impl Future<Output = QueryResult<()>> + Send {
use diesel::connection::TransactionManagerStatus;
async {
match Self::TransactionManager::transaction_manager_status_mut(self) {
TransactionManagerStatus::Valid(valid_status) => {
assert_eq!(None, valid_status.transaction_depth())
}
TransactionManagerStatus::InError => panic!("Transaction manager in error"),
};
Self::TransactionManager::begin_transaction(self).await?;
Self::TransactionManager::transaction_manager_status_mut(self)
.set_test_transaction_flag();
Ok(())
}
}
fn test_transaction<'conn, 'a, R, E, F>(
&'conn mut self,
f: F,
) -> impl Future<Output = R> + Send + 'conn
where
F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
E: Debug + Send + 'a,
R: Send + 'a,
'a: 'conn,
{
use futures_util::TryFutureExt;
let (user_result_tx, user_result_rx) = std::sync::mpsc::channel();
self.transaction::<R, _, _>(move |conn| {
f(conn)
.map_err(|_| diesel::result::Error::RollbackTransaction)
.and_then(move |r| {
let _ = user_result_tx.send(r);
std::future::ready(Err(diesel::result::Error::RollbackTransaction))
})
.scope_boxed()
})
.then(move |_r| {
let r = user_result_rx
.try_recv()
.expect("Transaction did not succeed");
std::future::ready(r)
})
}
#[doc(hidden)]
fn transaction_state(
&mut self,
) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;
#[doc(hidden)]
fn instrumentation(&mut self) -> &mut dyn Instrumentation;
fn set_instrumentation(&mut self, instrumentation: impl Instrumentation);
fn set_prepared_statement_cache_size(&mut self, size: CacheSize);
}