tern_core/executor/sqlx_backend/
pool.rs

1//! [`Executor`] for the generic [`sqlx::Pool`][sqlx-pool], a pool of `sqlx`
2//! database connections.
3//!
4//! [`Executor`]: crate::migration::Executor
5//! [sqlx-pool]: https://docs.rs/sqlx/0.8.3/sqlx/struct.Pool.html
6use chrono::{DateTime, Utc};
7use sqlx::{Acquire, Database, Encode, Executor, FromRow, IntoArguments, Pool, Type};
8use std::marker::PhantomData;
9
10use crate::error::{DatabaseError as _, TernResult};
11use crate::migration::{AppliedMigration, Executor as MigrationExecutor, Query, QueryRepository};
12
13/// The generic `sqlx::Pool` as a migration executor backend.
14pub struct SqlxExecutor<Db, Q>
15where
16    Db: Database,
17    Q: QueryRepository,
18{
19    pool: Pool<Db>,
20    _q: PhantomData<Q>,
21}
22
23impl<Db, Q> SqlxExecutor<Db, Q>
24where
25    Db: Database,
26    Q: QueryRepository,
27{
28    pub async fn new(db_url: &str) -> TernResult<Self> {
29        let pool = Pool::connect(db_url).await.tern_result()?;
30
31        Ok(Self {
32            pool,
33            _q: PhantomData,
34        })
35    }
36
37    /// Exposing the underlying connection object for usage involving queries
38    /// beyond what `Executor` details.
39    pub fn pool(&self) -> Pool<Db> {
40        self.pool.clone()
41    }
42}
43
44/// `SqlxExecutor` can be an [`Executor`] fairly straightforwardly when enough
45/// bounds involving `Db: sqlx::Database` are added to make it compile.
46///
47/// [`Executor`]: crate::migration::Executor
48impl<Db, Q> MigrationExecutor for SqlxExecutor<Db, Q>
49where
50    Self: Send + Sync + 'static,
51    Q: QueryRepository,
52    Db: Database,
53    for<'c> &'c mut <Db as Database>::Connection: Executor<'c, Database = Db>,
54    for<'q> <Db as Database>::Arguments<'q>: IntoArguments<'q, Db>,
55    for<'r> AppliedMigration: FromRow<'r, <Db as Database>::Row>,
56    String: Type<Db> + for<'a> Encode<'a, Db>,
57    i64: Type<Db> + for<'a> Encode<'a, Db>,
58    DateTime<Utc>: Type<Db> + for<'a> Encode<'a, Db>,
59{
60    type Queries = Q;
61
62    async fn apply_tx(&mut self, query: &Query) -> TernResult<()> {
63        let mut tx = self.pool.begin().await.tern_result()?;
64        let conn = tx.acquire().await.tern_result()?;
65        conn.execute(sqlx::raw_sql(query.sql()))
66            .await
67            .void_tern_result()?;
68        tx.commit().await.void_tern_result()?;
69
70        Ok(())
71    }
72
73    async fn apply_no_tx(&mut self, query: &Query) -> TernResult<()> {
74        let statements = query.split_statements()?;
75        for statement in statements.iter() {
76            self.pool
77                .execute(sqlx::raw_sql(statement.as_ref()))
78                .await
79                .void_tern_result()?;
80        }
81
82        Ok(())
83    }
84
85    async fn create_history_if_not_exists(&mut self, history_table: &str) -> TernResult<()> {
86        let query = Q::create_history_if_not_exists_query(history_table);
87        self.pool
88            .execute(sqlx::raw_sql(query.sql()))
89            .await
90            .void_tern_result()
91    }
92
93    async fn drop_history(&mut self, history_table: &str) -> TernResult<()> {
94        let query = Q::drop_history_query(history_table);
95        self.pool
96            .execute(sqlx::raw_sql(query.sql()))
97            .await
98            .void_tern_result()
99    }
100
101    async fn get_all_applied(&mut self, history_table: &str) -> TernResult<Vec<AppliedMigration>> {
102        let query = Q::select_star_from_history_query(history_table);
103        let applied = sqlx::query_as::<Db, AppliedMigration>(query.sql())
104            .fetch_all(&self.pool)
105            .await
106            .tern_result()?;
107
108        Ok(applied)
109    }
110
111    /// This expects [`insert_into_history_query`] to have placeholders for
112    /// `bind`ing the fields of the `AppliedMigration`, and that they appear in
113    /// the same order as they do in the [`AppliedMigration`] struct.
114    ///
115    /// [`insert_into_history_query`]: crate::migration::QueryRepository::insert_into_history_query
116    /// [`AppliedMigration`]: crate::migration::AppliedMigration
117    async fn insert_applied_migration(
118        &mut self,
119        history_table: &str,
120        applied: &AppliedMigration,
121    ) -> TernResult<()> {
122        let query = Q::insert_into_history_query(history_table, applied);
123        sqlx::query::<Db>(query.sql())
124            .bind(applied.version)
125            .bind(applied.description.clone())
126            .bind(applied.content.clone())
127            .bind(applied.duration_ms)
128            .bind(applied.applied_at)
129            .execute(&self.pool)
130            .await
131            .void_tern_result()?;
132
133        Ok(())
134    }
135
136    /// Like [`insert_applied_migration`] this expects a query with placeholders
137    /// lining up with the order of [`AppliedMigration`] fields.
138    ///
139    /// [`insert_applied_migration`]: Self::insert_applied_migration
140    /// [`AppliedMigration`]: crate::migration::AppliedMigration
141    async fn upsert_applied_migration(
142        &mut self,
143        history_table: &str,
144        applied: &AppliedMigration,
145    ) -> TernResult<()> {
146        let query = Q::upsert_history_query(history_table, applied);
147        sqlx::query::<Db>(query.sql())
148            .bind(applied.version)
149            .bind(applied.description.clone())
150            .bind(applied.content.clone())
151            .bind(applied.duration_ms)
152            .bind(applied.applied_at)
153            .execute(&self.pool)
154            .await
155            .void_tern_result()?;
156
157        Ok(())
158    }
159}