tern_core/executor/sqlx_backend/
pool.rs1use chrono::{DateTime, Utc};
7use sqlx::{Acquire, Database, Encode, Executor, FromRow, IntoArguments, Pool, Type};
8use std::marker::PhantomData;
9
10use crate::error::{DatabaseError as _, TernResult};
11use crate::migration::{AppliedMigration, Executor as MigrationExecutor, Query, QueryRepository};
12
13pub struct SqlxExecutor<Db, Q>
15where
16 Db: Database,
17 Q: QueryRepository,
18{
19 pool: Pool<Db>,
20 _q: PhantomData<Q>,
21}
22
23impl<Db, Q> SqlxExecutor<Db, Q>
24where
25 Db: Database,
26 Q: QueryRepository,
27{
28 pub async fn new(db_url: &str) -> TernResult<Self> {
29 let pool = Pool::connect(db_url).await.tern_result()?;
30
31 Ok(Self {
32 pool,
33 _q: PhantomData,
34 })
35 }
36
37 pub fn pool(&self) -> Pool<Db> {
40 self.pool.clone()
41 }
42}
43
44impl<Db, Q> MigrationExecutor for SqlxExecutor<Db, Q>
49where
50 Self: Send + Sync + 'static,
51 Q: QueryRepository,
52 Db: Database,
53 for<'c> &'c mut <Db as Database>::Connection: Executor<'c, Database = Db>,
54 for<'q> <Db as Database>::Arguments<'q>: IntoArguments<'q, Db>,
55 for<'r> AppliedMigration: FromRow<'r, <Db as Database>::Row>,
56 String: Type<Db> + for<'a> Encode<'a, Db>,
57 i64: Type<Db> + for<'a> Encode<'a, Db>,
58 DateTime<Utc>: Type<Db> + for<'a> Encode<'a, Db>,
59{
60 type Queries = Q;
61
62 async fn apply_tx(&mut self, query: &Query) -> TernResult<()> {
63 let mut tx = self.pool.begin().await.tern_result()?;
64 let conn = tx.acquire().await.tern_result()?;
65 conn.execute(sqlx::raw_sql(query.sql()))
66 .await
67 .void_tern_result()?;
68 tx.commit().await.void_tern_result()?;
69
70 Ok(())
71 }
72
73 async fn apply_no_tx(&mut self, query: &Query) -> TernResult<()> {
74 let statements = query.split_statements()?;
75 for statement in statements.iter() {
76 self.pool
77 .execute(sqlx::raw_sql(statement.as_ref()))
78 .await
79 .void_tern_result()?;
80 }
81
82 Ok(())
83 }
84
85 async fn create_history_if_not_exists(&mut self, history_table: &str) -> TernResult<()> {
86 let query = Q::create_history_if_not_exists_query(history_table);
87 self.pool
88 .execute(sqlx::raw_sql(query.sql()))
89 .await
90 .void_tern_result()
91 }
92
93 async fn drop_history(&mut self, history_table: &str) -> TernResult<()> {
94 let query = Q::drop_history_query(history_table);
95 self.pool
96 .execute(sqlx::raw_sql(query.sql()))
97 .await
98 .void_tern_result()
99 }
100
101 async fn get_all_applied(&mut self, history_table: &str) -> TernResult<Vec<AppliedMigration>> {
102 let query = Q::select_star_from_history_query(history_table);
103 let applied = sqlx::query_as::<Db, AppliedMigration>(query.sql())
104 .fetch_all(&self.pool)
105 .await
106 .tern_result()?;
107
108 Ok(applied)
109 }
110
111 async fn insert_applied_migration(
118 &mut self,
119 history_table: &str,
120 applied: &AppliedMigration,
121 ) -> TernResult<()> {
122 let query = Q::insert_into_history_query(history_table, applied);
123 sqlx::query::<Db>(query.sql())
124 .bind(applied.version)
125 .bind(applied.description.clone())
126 .bind(applied.content.clone())
127 .bind(applied.duration_ms)
128 .bind(applied.applied_at)
129 .execute(&self.pool)
130 .await
131 .void_tern_result()?;
132
133 Ok(())
134 }
135
136 async fn upsert_applied_migration(
142 &mut self,
143 history_table: &str,
144 applied: &AppliedMigration,
145 ) -> TernResult<()> {
146 let query = Q::upsert_history_query(history_table, applied);
147 sqlx::query::<Db>(query.sql())
148 .bind(applied.version)
149 .bind(applied.description.clone())
150 .bind(applied.content.clone())
151 .bind(applied.duration_ms)
152 .bind(applied.applied_at)
153 .execute(&self.pool)
154 .await
155 .void_tern_result()?;
156
157 Ok(())
158 }
159}