1use crate::dialect::SqlDialect;
2use sqlx::Database;
3
4pub enum Executor<'a, DB: Database> {
10 Pool(&'a sqlx::Pool<DB>),
12 Conn(&'a mut DB::Connection),
14}
15
16impl<'a, DB: Database> std::fmt::Debug for Executor<'a, DB> {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 match self {
19 Self::Pool(_) => f.write_str("Executor::Pool"),
20 Self::Conn(_) => f.write_str("Executor::Conn"),
21 }
22 }
23}
24
25unsafe impl<'a, DB: Database> Send for Executor<'a, DB> where DB::Connection: Send {}
29unsafe impl<'a, DB: Database> Sync for Executor<'a, DB> where DB::Connection: Sync {}
31
32impl<'a, DB: Database> From<&'a sqlx::Pool<DB>> for Executor<'a, DB> {
33 fn from(pool: &'a sqlx::Pool<DB>) -> Self {
34 Self::Pool(pool)
35 }
36}
37
38impl<'a, DB: Database> From<&'a mut DB::Connection> for Executor<'a, DB> {
39 fn from(conn: &'a mut DB::Connection) -> Self {
40 Self::Conn(conn)
41 }
42}
43
44pub trait IntoExecutor<'a>: Send + 'a {
46 type DB: SqlDialect;
48 fn into_executor(self) -> Executor<'a, Self::DB>;
50}
51
52impl<'a, DB: SqlDialect> IntoExecutor<'a> for &'a sqlx::Pool<DB> {
53 type DB = DB;
54 fn into_executor(self) -> Executor<'a, DB> {
55 Executor::Pool(self)
56 }
57}
58
/// An exclusive SQLite connection reference becomes `Executor::Conn`.
///
/// Only compiled when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
impl<'a> IntoExecutor<'a> for &'a mut sqlx::SqliteConnection {
    type DB = sqlx::Sqlite;

    fn into_executor(self) -> Executor<'a, sqlx::Sqlite> {
        Executor::Conn(self)
    }
}
66
/// An exclusive Postgres connection reference becomes `Executor::Conn`.
///
/// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
impl<'a> IntoExecutor<'a> for &'a mut sqlx::postgres::PgConnection {
    type DB = sqlx::Postgres;

    fn into_executor(self) -> Executor<'a, sqlx::Postgres> {
        Executor::Conn(self)
    }
}
74
75impl<'a, DB: SqlDialect> IntoExecutor<'a> for Executor<'a, DB> {
76 type DB = DB;
77 fn into_executor(self) -> Executor<'a, DB> {
78 self
79 }
80}
81
82impl<'a, DB: Database> Executor<'a, DB> {
83 pub async fn execute<'q, A>(
85 &mut self,
86 query: sqlx::query::Query<'q, DB, A>,
87 ) -> Result<DB::QueryResult, sqlx::Error>
88 where
89 A: sqlx::IntoArguments<'q, DB> + 'q,
90 DB: SqlDialect,
91 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
92 {
93 match self {
94 Self::Pool(pool) => query.execute(*pool).await,
95 Self::Conn(conn) => query.execute(&mut **conn).await,
96 }
97 }
98
99 pub async fn fetch_all<'q, T, A>(
101 &mut self,
102 query: sqlx::query::QueryAs<'q, DB, T, A>,
103 ) -> Result<Vec<T>, sqlx::Error>
104 where
105 T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin,
106 A: sqlx::IntoArguments<'q, DB> + 'q,
107 DB: SqlDialect,
108 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
109 {
110 match self {
111 Self::Pool(pool) => query.fetch_all(*pool).await,
112 Self::Conn(conn) => query.fetch_all(&mut **conn).await,
113 }
114 }
115
116 pub async fn fetch_optional<'q, T, A>(
118 &mut self,
119 query: sqlx::query::QueryAs<'q, DB, T, A>,
120 ) -> Result<Option<T>, sqlx::Error>
121 where
122 T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin,
123 A: sqlx::IntoArguments<'q, DB> + 'q,
124 DB: SqlDialect,
125 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
126 {
127 match self {
128 Self::Pool(pool) => query.fetch_optional(*pool).await,
129 Self::Conn(conn) => query.fetch_optional(&mut **conn).await,
130 }
131 }
132
133 pub fn fetch_stream<'q, T, A>(
135 self,
136 query: sqlx::query::QueryAs<'q, DB, T, A>,
137 ) -> futures_util::stream::BoxStream<'a, Result<T, sqlx::Error>>
138 where
139 T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin + 'a,
140 A: sqlx::IntoArguments<'q, DB> + 'q,
141 'q: 'a,
142 DB: SqlDialect,
143 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
144 {
145 match self {
146 Self::Pool(pool) => query.fetch(pool),
147 Self::Conn(conn) => query.fetch(conn),
148 }
149 }
150}