tern_core/executor/sqlx_backend/
pool.rs

1//! [`Executor`] for the generic [`sqlx::Pool`][sqlx-pool], a pool of `sqlx`
2//! database connections.
3//!
4//! [`Executor`]: crate::migration::Executor
5//! [sqlx-pool]: https://docs.rs/sqlx/0.8.3/sqlx/struct.Pool.html
6use crate::error::{DatabaseError as _, TernResult};
7use crate::migration::{AppliedMigration, Executor as MigrationExecutor, Query, QueryRepository};
8
9use chrono::{DateTime, Utc};
10use sqlx::pool::PoolOptions;
11use sqlx::{Acquire, Connection, Database, Encode, Executor, FromRow, IntoArguments, Pool, Type};
12use std::marker::PhantomData;
13
14/// The generic `sqlx::Pool` as a migration executor backend.
15pub struct SqlxExecutor<Db, Q>
16where
17    Db: Database,
18    Q: QueryRepository,
19{
20    pool: Pool<Db>,
21    _q: PhantomData<Q>,
22}
23
24impl<Db, Q> SqlxExecutor<Db, Q>
25where
26    Db: Database,
27    Q: QueryRepository,
28{
29    /// Create a pool with default options from a connection string.
30    pub async fn new(db_url: &str) -> TernResult<Self> {
31        let pool = Pool::connect(db_url).await.tern_result()?;
32
33        Ok(Self {
34            pool,
35            _q: PhantomData,
36        })
37    }
38
39    /// Create the pool from the given options.
40    pub async fn new_with(
41        pool_opts: PoolOptions<Db>,
42        conn_opts: <Db::Connection as Connection>::Options,
43    ) -> TernResult<Self> {
44        let pool = pool_opts.connect_with(conn_opts).await.tern_result()?;
45
46        Ok(Self {
47            pool,
48            _q: PhantomData,
49        })
50    }
51
52    /// Exposing the underlying connection object for usage involving queries
53    /// beyond what `Executor` details.
54    pub fn pool(&self) -> Pool<Db> {
55        self.pool.clone()
56    }
57}
58
59/// `SqlxExecutor` can be an [`Executor`] fairly straightforwardly when enough
60/// bounds involving `Db: sqlx::Database` are added to make it compile.
61///
62/// [`Executor`]: crate::migration::Executor
63impl<Db, Q> MigrationExecutor for SqlxExecutor<Db, Q>
64where
65    Self: Send + Sync + 'static,
66    Q: QueryRepository,
67    Db: Database,
68    for<'c> &'c mut <Db as Database>::Connection: Executor<'c, Database = Db>,
69    for<'q> <Db as Database>::Arguments<'q>: IntoArguments<'q, Db>,
70    for<'r> AppliedMigration: FromRow<'r, <Db as Database>::Row>,
71    String: Type<Db> + for<'a> Encode<'a, Db>,
72    i64: Type<Db> + for<'a> Encode<'a, Db>,
73    DateTime<Utc>: Type<Db> + for<'a> Encode<'a, Db>,
74{
75    type Queries = Q;
76
77    async fn apply_tx(&mut self, query: &Query) -> TernResult<()> {
78        let mut tx = self.pool.begin().await.tern_result()?;
79        let conn = tx.acquire().await.tern_result()?;
80        conn.execute(sqlx::raw_sql(query.sql()))
81            .await
82            .void_tern_result()?;
83        tx.commit().await.void_tern_result()?;
84
85        Ok(())
86    }
87
88    async fn apply_no_tx(&mut self, query: &Query) -> TernResult<()> {
89        let statements = query.split_statements()?;
90        for statement in statements.iter() {
91            self.pool
92                .execute(sqlx::raw_sql(statement.as_ref()))
93                .await
94                .void_tern_result()?;
95        }
96
97        Ok(())
98    }
99
100    async fn create_history_if_not_exists(&mut self, history_table: &str) -> TernResult<()> {
101        let query = Q::create_history_if_not_exists_query(history_table);
102        self.pool
103            .execute(sqlx::raw_sql(query.sql()))
104            .await
105            .void_tern_result()
106    }
107
108    async fn drop_history(&mut self, history_table: &str) -> TernResult<()> {
109        let query = Q::drop_history_query(history_table);
110        self.pool
111            .execute(sqlx::raw_sql(query.sql()))
112            .await
113            .void_tern_result()
114    }
115
116    async fn get_all_applied(&mut self, history_table: &str) -> TernResult<Vec<AppliedMigration>> {
117        let query = Q::select_star_from_history_query(history_table);
118        let applied = sqlx::query_as::<Db, AppliedMigration>(query.sql())
119            .fetch_all(&self.pool)
120            .await
121            .tern_result()?;
122
123        Ok(applied)
124    }
125
126    /// This expects [`insert_into_history_query`] to have placeholders for
127    /// `bind`ing the fields of the `AppliedMigration`, and that they appear in
128    /// the same order as they do in the [`AppliedMigration`] struct.
129    ///
130    /// [`insert_into_history_query`]: crate::migration::QueryRepository::insert_into_history_query
131    /// [`AppliedMigration`]: crate::migration::AppliedMigration
132    async fn insert_applied_migration(
133        &mut self,
134        history_table: &str,
135        applied: &AppliedMigration,
136    ) -> TernResult<()> {
137        let query = Q::insert_into_history_query(history_table, applied);
138        sqlx::query::<Db>(query.sql())
139            .bind(applied.version)
140            .bind(applied.description.clone())
141            .bind(applied.content.clone())
142            .bind(applied.duration_ms)
143            .bind(applied.applied_at)
144            .execute(&self.pool)
145            .await
146            .void_tern_result()?;
147
148        Ok(())
149    }
150
151    /// Like [`insert_applied_migration`] this expects a query with placeholders
152    /// lining up with the order of [`AppliedMigration`] fields.
153    ///
154    /// [`insert_applied_migration`]: Self::insert_applied_migration
155    /// [`AppliedMigration`]: crate::migration::AppliedMigration
156    async fn upsert_applied_migration(
157        &mut self,
158        history_table: &str,
159        applied: &AppliedMigration,
160    ) -> TernResult<()> {
161        let query = Q::upsert_history_query(history_table, applied);
162        sqlx::query::<Db>(query.sql())
163            .bind(applied.version)
164            .bind(applied.description.clone())
165            .bind(applied.content.clone())
166            .bind(applied.duration_ms)
167            .bind(applied.applied_at)
168            .execute(&self.pool)
169            .await
170            .void_tern_result()?;
171
172        Ok(())
173    }
174}