// tern_core/executor/sqlx_backend/pool.rs
use crate::error::{DatabaseError as _, TernResult};
7use crate::migration::{AppliedMigration, Executor as MigrationExecutor, Query, QueryRepository};
8
9use chrono::{DateTime, Utc};
10use sqlx::pool::PoolOptions;
11use sqlx::{Acquire, Connection, Database, Encode, Executor, FromRow, IntoArguments, Pool, Type};
12use std::marker::PhantomData;
13
14pub struct SqlxExecutor<Db, Q>
16where
17 Db: Database,
18 Q: QueryRepository,
19{
20 pool: Pool<Db>,
21 _q: PhantomData<Q>,
22}
23
24impl<Db, Q> SqlxExecutor<Db, Q>
25where
26 Db: Database,
27 Q: QueryRepository,
28{
29 pub async fn new(db_url: &str) -> TernResult<Self> {
31 let pool = Pool::connect(db_url).await.tern_result()?;
32
33 Ok(Self {
34 pool,
35 _q: PhantomData,
36 })
37 }
38
39 pub async fn new_with(
41 pool_opts: PoolOptions<Db>,
42 conn_opts: <Db::Connection as Connection>::Options,
43 ) -> TernResult<Self> {
44 let pool = pool_opts.connect_with(conn_opts).await.tern_result()?;
45
46 Ok(Self {
47 pool,
48 _q: PhantomData,
49 })
50 }
51
52 pub fn pool(&self) -> Pool<Db> {
55 self.pool.clone()
56 }
57}
58
59impl<Db, Q> MigrationExecutor for SqlxExecutor<Db, Q>
64where
65 Self: Send + Sync + 'static,
66 Q: QueryRepository,
67 Db: Database,
68 for<'c> &'c mut <Db as Database>::Connection: Executor<'c, Database = Db>,
69 for<'q> <Db as Database>::Arguments<'q>: IntoArguments<'q, Db>,
70 for<'r> AppliedMigration: FromRow<'r, <Db as Database>::Row>,
71 String: Type<Db> + for<'a> Encode<'a, Db>,
72 i64: Type<Db> + for<'a> Encode<'a, Db>,
73 DateTime<Utc>: Type<Db> + for<'a> Encode<'a, Db>,
74{
75 type Queries = Q;
76
77 async fn apply_tx(&mut self, query: &Query) -> TernResult<()> {
78 let mut tx = self.pool.begin().await.tern_result()?;
79 let conn = tx.acquire().await.tern_result()?;
80 conn.execute(sqlx::raw_sql(query.sql()))
81 .await
82 .void_tern_result()?;
83 tx.commit().await.void_tern_result()?;
84
85 Ok(())
86 }
87
88 async fn apply_no_tx(&mut self, query: &Query) -> TernResult<()> {
89 let statements = query.split_statements()?;
90 for statement in statements.iter() {
91 self.pool
92 .execute(sqlx::raw_sql(statement.as_ref()))
93 .await
94 .void_tern_result()?;
95 }
96
97 Ok(())
98 }
99
100 async fn create_history_if_not_exists(&mut self, history_table: &str) -> TernResult<()> {
101 let query = Q::create_history_if_not_exists_query(history_table);
102 self.pool
103 .execute(sqlx::raw_sql(query.sql()))
104 .await
105 .void_tern_result()
106 }
107
108 async fn drop_history(&mut self, history_table: &str) -> TernResult<()> {
109 let query = Q::drop_history_query(history_table);
110 self.pool
111 .execute(sqlx::raw_sql(query.sql()))
112 .await
113 .void_tern_result()
114 }
115
116 async fn get_all_applied(&mut self, history_table: &str) -> TernResult<Vec<AppliedMigration>> {
117 let query = Q::select_star_from_history_query(history_table);
118 let applied = sqlx::query_as::<Db, AppliedMigration>(query.sql())
119 .fetch_all(&self.pool)
120 .await
121 .tern_result()?;
122
123 Ok(applied)
124 }
125
126 async fn insert_applied_migration(
133 &mut self,
134 history_table: &str,
135 applied: &AppliedMigration,
136 ) -> TernResult<()> {
137 let query = Q::insert_into_history_query(history_table, applied);
138 sqlx::query::<Db>(query.sql())
139 .bind(applied.version)
140 .bind(applied.description.clone())
141 .bind(applied.content.clone())
142 .bind(applied.duration_ms)
143 .bind(applied.applied_at)
144 .execute(&self.pool)
145 .await
146 .void_tern_result()?;
147
148 Ok(())
149 }
150
151 async fn upsert_applied_migration(
157 &mut self,
158 history_table: &str,
159 applied: &AppliedMigration,
160 ) -> TernResult<()> {
161 let query = Q::upsert_history_query(history_table, applied);
162 sqlx::query::<Db>(query.sql())
163 .bind(applied.version)
164 .bind(applied.description.clone())
165 .bind(applied.content.clone())
166 .bind(applied.duration_ms)
167 .bind(applied.applied_at)
168 .execute(&self.pool)
169 .await
170 .void_tern_result()?;
171
172 Ok(())
173 }
174}