diesel-async 0.8.0

An async extension for Diesel the safe, extensible ORM and Query Builder
Documentation
use crate::UpdateAndFetchResults;
use crate::{AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection, TransactionManager};
use diesel::associations::HasTable;
use diesel::connection::CacheSize;
use diesel::connection::Instrumentation;
use diesel::QueryResult;
use futures_util::future::BoxFuture;
use std::ops::DerefMut;

impl<C> SimpleAsyncConnection for C
where
    C: DerefMut + Send,
    C::Target: SimpleAsyncConnection + Send,
{
    async fn batch_execute(&mut self, query: &str) -> diesel::QueryResult<()> {
        let conn = self.deref_mut();
        conn.batch_execute(query).await
    }
}

impl<C> AsyncConnectionCore for C
where
    C: DerefMut + Send,
    C::Target: AsyncConnectionCore,
{
    type ExecuteFuture<'conn, 'query> =
        <C::Target as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>;
    type LoadFuture<'conn, 'query> = <C::Target as AsyncConnectionCore>::LoadFuture<'conn, 'query>;
    type Stream<'conn, 'query> = <C::Target as AsyncConnectionCore>::Stream<'conn, 'query>;
    type Row<'conn, 'query> = <C::Target as AsyncConnectionCore>::Row<'conn, 'query>;

    type Backend = <C::Target as AsyncConnectionCore>::Backend;

    fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
    where
        T: diesel::query_builder::AsQuery + 'query,
        T::Query: diesel::query_builder::QueryFragment<Self::Backend>
            + diesel::query_builder::QueryId
            + 'query,
    {
        let conn = self.deref_mut();
        conn.load(source)
    }

    fn execute_returning_count<'conn, 'query, T>(
        &'conn mut self,
        source: T,
    ) -> Self::ExecuteFuture<'conn, 'query>
    where
        T: diesel::query_builder::QueryFragment<Self::Backend>
            + diesel::query_builder::QueryId
            + 'query,
    {
        let conn = self.deref_mut();
        conn.execute_returning_count(source)
    }
}

#[diagnostic::do_not_recommend]
impl<C> AsyncConnection for C
where
    C: DerefMut + Send,
    C::Target: AsyncConnection,
{
    type TransactionManager =
        PoolTransactionManager<<C::Target as AsyncConnection>::TransactionManager>;

    async fn establish(_database_url: &str) -> diesel::ConnectionResult<Self> {
        Err(diesel::result::ConnectionError::BadConnection(
            String::from("Cannot directly establish a pooled connection"),
        ))
    }

    fn transaction_state(
        &mut self,
    ) -> &mut <Self::TransactionManager as crate::transaction_manager::TransactionManager<Self>>::TransactionStateData{
        let conn = self.deref_mut();
        conn.transaction_state()
    }

    async fn begin_test_transaction(&mut self) -> diesel::QueryResult<()> {
        self.deref_mut().begin_test_transaction().await
    }

    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
        self.deref_mut().instrumentation()
    }

    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
        self.deref_mut().set_instrumentation(instrumentation);
    }

    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
        self.deref_mut().set_prepared_statement_cache_size(size);
    }
}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PoolTransactionManager<TM>(std::marker::PhantomData<TM>);

impl<C, TM> TransactionManager<C> for PoolTransactionManager<TM>
where
    C: DerefMut + Send,
    C::Target: AsyncConnection<TransactionManager = TM>,
    TM: TransactionManager<C::Target>,
{
    type TransactionStateData = TM::TransactionStateData;

    async fn begin_transaction(conn: &mut C) -> diesel::QueryResult<()> {
        TM::begin_transaction(&mut **conn).await
    }

    async fn rollback_transaction(conn: &mut C) -> diesel::QueryResult<()> {
        TM::rollback_transaction(&mut **conn).await
    }

    async fn commit_transaction(conn: &mut C) -> diesel::QueryResult<()> {
        TM::commit_transaction(&mut **conn).await
    }

    fn transaction_manager_status_mut(
        conn: &mut C,
    ) -> &mut diesel::connection::TransactionManagerStatus {
        TM::transaction_manager_status_mut(&mut **conn)
    }

    fn is_broken_transaction_manager(conn: &mut C) -> bool {
        TM::is_broken_transaction_manager(&mut **conn)
    }
}

impl<Changes, Output, Conn> UpdateAndFetchResults<Changes, Output> for Conn
where
    Conn: DerefMut + Send,
    Changes: diesel::prelude::Identifiable + HasTable + Send,
    Conn::Target: UpdateAndFetchResults<Changes, Output>,
{
    fn update_and_fetch<'conn, 'changes>(
        &'conn mut self,
        changeset: Changes,
    ) -> BoxFuture<'changes, QueryResult<Output>>
    where
        Changes: 'changes,
        'conn: 'changes,
        Self: 'changes,
    {
        self.deref_mut().update_and_fetch(changeset)
    }
}