Skip to main content

premix_core/
executor.rs

1use crate::dialect::SqlDialect;
2use sqlx::Database;
3
4// Chapter 7: Stronger Executor Abstraction
5/// A unified database executor that can wrap either a connection pool or a single connection.
6///
7/// This allows Premix to remain agnostic about whether it's executing within a transaction,
8/// a shared pool, or a dedicated connection.
9pub enum Executor<'a, DB: Database> {
10    /// A shared connection pool.
11    Pool(&'a sqlx::Pool<DB>),
12    /// A single, mutable database connection.
13    Conn(&'a mut DB::Connection),
14}
15
16impl<'a, DB: Database> std::fmt::Debug for Executor<'a, DB> {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        match self {
19            Self::Pool(_) => f.write_str("Executor::Pool"),
20            Self::Conn(_) => f.write_str("Executor::Conn"),
21        }
22    }
23}
24
25// SAFETY: Executor only contains a reference to a Pool or a mutable reference to a Connection.
26// Pools are thread-safe by design in sqlx. Connections are usually Send, and our bounds
27// ensure that the manual implementation matches the underlying database driver's capabilities.
28unsafe impl<'a, DB: Database> Send for Executor<'a, DB> where DB::Connection: Send {}
29// SAFETY: Similarly, Executor is Sync if the underlying Connection/Pool is Sync.
30unsafe impl<'a, DB: Database> Sync for Executor<'a, DB> where DB::Connection: Sync {}
31
32impl<'a, DB: Database> From<&'a sqlx::Pool<DB>> for Executor<'a, DB> {
33    fn from(pool: &'a sqlx::Pool<DB>) -> Self {
34        Self::Pool(pool)
35    }
36}
37
38impl<'a, DB: Database> From<&'a mut DB::Connection> for Executor<'a, DB> {
39    fn from(conn: &'a mut DB::Connection) -> Self {
40        Self::Conn(conn)
41    }
42}
43
44/// A trait for types that can be converted into an [`Executor`].
45pub trait IntoExecutor<'a>: Send + 'a {
46    /// The database dialect associated with this executor.
47    type DB: SqlDialect;
48    /// Converts the type into an [`Executor`].
49    fn into_executor(self) -> Executor<'a, Self::DB>;
50}
51
52impl<'a, DB: SqlDialect> IntoExecutor<'a> for &'a sqlx::Pool<DB> {
53    type DB = DB;
54    fn into_executor(self) -> Executor<'a, DB> {
55        Executor::Pool(self)
56    }
57}
58
/// With the `sqlite` feature, an exclusively borrowed SQLite connection converts into the
/// `Conn` variant.
#[cfg(feature = "sqlite")]
impl<'a> IntoExecutor<'a> for &'a mut sqlx::SqliteConnection {
    type DB = sqlx::Sqlite;

    fn into_executor(self) -> Executor<'a, sqlx::Sqlite> {
        Executor::Conn(self)
    }
}
66
/// With the `postgres` feature, an exclusively borrowed Postgres connection converts into
/// the `Conn` variant.
#[cfg(feature = "postgres")]
impl<'a> IntoExecutor<'a> for &'a mut sqlx::postgres::PgConnection {
    type DB = sqlx::Postgres;

    fn into_executor(self) -> Executor<'a, sqlx::Postgres> {
        Executor::Conn(self)
    }
}
74
75impl<'a, DB: SqlDialect> IntoExecutor<'a> for Executor<'a, DB> {
76    type DB = DB;
77    fn into_executor(self) -> Executor<'a, DB> {
78        self
79    }
80}
81
82impl<'a, DB: Database> Executor<'a, DB> {
83    /// Executes a SQL query and returns the database result (e.g., number of rows affected).
84    pub async fn execute<'q, A>(
85        &mut self,
86        query: sqlx::query::Query<'q, DB, A>,
87    ) -> Result<DB::QueryResult, sqlx::Error>
88    where
89        A: sqlx::IntoArguments<'q, DB> + 'q,
90        DB: SqlDialect,
91        for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
92    {
93        match self {
94            Self::Pool(pool) => query.execute(*pool).await,
95            Self::Conn(conn) => query.execute(&mut **conn).await,
96        }
97    }
98
99    /// Executes a SQL query and fetches all resulting rows.
100    pub async fn fetch_all<'q, T, A>(
101        &mut self,
102        query: sqlx::query::QueryAs<'q, DB, T, A>,
103    ) -> Result<Vec<T>, sqlx::Error>
104    where
105        T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin,
106        A: sqlx::IntoArguments<'q, DB> + 'q,
107        DB: SqlDialect,
108        for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
109    {
110        match self {
111            Self::Pool(pool) => query.fetch_all(*pool).await,
112            Self::Conn(conn) => query.fetch_all(&mut **conn).await,
113        }
114    }
115
116    /// Executes a SQL query and fetches an optional single row.
117    pub async fn fetch_optional<'q, T, A>(
118        &mut self,
119        query: sqlx::query::QueryAs<'q, DB, T, A>,
120    ) -> Result<Option<T>, sqlx::Error>
121    where
122        T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin,
123        A: sqlx::IntoArguments<'q, DB> + 'q,
124        DB: SqlDialect,
125        for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
126    {
127        match self {
128            Self::Pool(pool) => query.fetch_optional(*pool).await,
129            Self::Conn(conn) => query.fetch_optional(&mut **conn).await,
130        }
131    }
132
133    /// Executes a SQL query and returns a stream of resulting rows.
134    pub fn fetch_stream<'q, T, A>(
135        self,
136        query: sqlx::query::QueryAs<'q, DB, T, A>,
137    ) -> futures_util::stream::BoxStream<'a, Result<T, sqlx::Error>>
138    where
139        T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin + 'a,
140        A: sqlx::IntoArguments<'q, DB> + 'q,
141        'q: 'a,
142        DB: SqlDialect,
143        for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
144    {
145        match self {
146            Self::Pool(pool) => query.fetch(pool),
147            Self::Conn(conn) => query.fetch(conn),
148        }
149    }
150}