diesel_async/
deref_connection.rs

1use crate::UpdateAndFetchResults;
2use crate::{AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection, TransactionManager};
3use diesel::associations::HasTable;
4use diesel::connection::CacheSize;
5use diesel::connection::Instrumentation;
6use diesel::QueryResult;
7use futures_util::future::BoxFuture;
8use std::ops::DerefMut;
9
10impl<C> SimpleAsyncConnection for C
11where
12    C: DerefMut + Send,
13    C::Target: SimpleAsyncConnection + Send,
14{
15    async fn batch_execute(&mut self, query: &str) -> diesel::QueryResult<()> {
16        let conn = self.deref_mut();
17        conn.batch_execute(query).await
18    }
19}
20
21impl<C> AsyncConnectionCore for C
22where
23    C: DerefMut + Send,
24    C::Target: AsyncConnectionCore,
25{
26    type ExecuteFuture<'conn, 'query> =
27        <C::Target as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>;
28    type LoadFuture<'conn, 'query> = <C::Target as AsyncConnectionCore>::LoadFuture<'conn, 'query>;
29    type Stream<'conn, 'query> = <C::Target as AsyncConnectionCore>::Stream<'conn, 'query>;
30    type Row<'conn, 'query> = <C::Target as AsyncConnectionCore>::Row<'conn, 'query>;
31
32    type Backend = <C::Target as AsyncConnectionCore>::Backend;
33
34    fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
35    where
36        T: diesel::query_builder::AsQuery + 'query,
37        T::Query: diesel::query_builder::QueryFragment<Self::Backend>
38            + diesel::query_builder::QueryId
39            + 'query,
40    {
41        let conn = self.deref_mut();
42        conn.load(source)
43    }
44
45    fn execute_returning_count<'conn, 'query, T>(
46        &'conn mut self,
47        source: T,
48    ) -> Self::ExecuteFuture<'conn, 'query>
49    where
50        T: diesel::query_builder::QueryFragment<Self::Backend>
51            + diesel::query_builder::QueryId
52            + 'query,
53    {
54        let conn = self.deref_mut();
55        conn.execute_returning_count(source)
56    }
57}
58
59#[diagnostic::do_not_recommend]
60impl<C> AsyncConnection for C
61where
62    C: DerefMut + Send,
63    C::Target: AsyncConnection,
64{
65    type TransactionManager =
66        PoolTransactionManager<<C::Target as AsyncConnection>::TransactionManager>;
67
68    async fn establish(_database_url: &str) -> diesel::ConnectionResult<Self> {
69        Err(diesel::result::ConnectionError::BadConnection(
70            String::from("Cannot directly establish a pooled connection"),
71        ))
72    }
73
74    fn transaction_state(
75        &mut self,
76    ) -> &mut <Self::TransactionManager as crate::transaction_manager::TransactionManager<Self>>::TransactionStateData{
77        let conn = self.deref_mut();
78        conn.transaction_state()
79    }
80
81    async fn begin_test_transaction(&mut self) -> diesel::QueryResult<()> {
82        self.deref_mut().begin_test_transaction().await
83    }
84
85    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
86        self.deref_mut().instrumentation()
87    }
88
89    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
90        self.deref_mut().set_instrumentation(instrumentation);
91    }
92
93    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
94        self.deref_mut().set_prepared_statement_cache_size(size);
95    }
96}
97
98#[doc(hidden)]
99#[allow(missing_debug_implementations)]
100pub struct PoolTransactionManager<TM>(std::marker::PhantomData<TM>);
101
102impl<C, TM> TransactionManager<C> for PoolTransactionManager<TM>
103where
104    C: DerefMut + Send,
105    C::Target: AsyncConnection<TransactionManager = TM>,
106    TM: TransactionManager<C::Target>,
107{
108    type TransactionStateData = TM::TransactionStateData;
109
110    async fn begin_transaction(conn: &mut C) -> diesel::QueryResult<()> {
111        TM::begin_transaction(&mut **conn).await
112    }
113
114    async fn rollback_transaction(conn: &mut C) -> diesel::QueryResult<()> {
115        TM::rollback_transaction(&mut **conn).await
116    }
117
118    async fn commit_transaction(conn: &mut C) -> diesel::QueryResult<()> {
119        TM::commit_transaction(&mut **conn).await
120    }
121
122    fn transaction_manager_status_mut(
123        conn: &mut C,
124    ) -> &mut diesel::connection::TransactionManagerStatus {
125        TM::transaction_manager_status_mut(&mut **conn)
126    }
127
128    fn is_broken_transaction_manager(conn: &mut C) -> bool {
129        TM::is_broken_transaction_manager(&mut **conn)
130    }
131}
132
133impl<Changes, Output, Conn> UpdateAndFetchResults<Changes, Output> for Conn
134where
135    Conn: DerefMut + Send,
136    Changes: diesel::prelude::Identifiable + HasTable + Send,
137    Conn::Target: UpdateAndFetchResults<Changes, Output>,
138{
139    fn update_and_fetch<'conn, 'changes>(
140        &'conn mut self,
141        changeset: Changes,
142    ) -> BoxFuture<'changes, QueryResult<Output>>
143    where
144        Changes: 'changes,
145        'conn: 'changes,
146        Self: 'changes,
147    {
148        self.deref_mut().update_and_fetch(changeset)
149    }
150}