1use chrono::{DateTime, Utc};
16use futures_core::{future::BoxFuture, Future};
17use std::{fmt::Write, time::Instant};
18
19use crate::error::{DatabaseError as _, TernResult};
20
21pub trait MigrationContext
23where
24 Self: MigrationSource<Ctx = Self> + Send + Sync + 'static,
25{
26 const HISTORY_TABLE: &str;
32
33 type Exec: Executor;
35
36 fn executor(&mut self) -> &mut Self::Exec;
38
39 fn apply<'migration, 'conn: 'migration, M>(
43 &'conn mut self,
44 migration: &'migration M,
45 ) -> BoxFuture<'migration, TernResult<AppliedMigration>>
46 where
47 M: Migration<Ctx = Self> + Send + Sync + ?Sized,
48 {
49 Box::pin(async move {
50 let start = Instant::now();
51 let query = M::build(migration, self).await?;
52 let executor = self.executor();
53
54 if migration.no_tx() {
55 executor
56 .apply_no_tx(&query)
57 .await
58 .void_tern_migration_result(migration)?;
59 } else {
60 executor
61 .apply_tx(&query)
62 .await
63 .void_tern_migration_result(migration)?;
64 }
65
66 let applied_at = Utc::now();
67 let duration_ms = start.elapsed().as_millis() as i64;
68 let applied = migration.to_applied(duration_ms, applied_at, query.sql());
69 executor
70 .insert_applied_migration(Self::HISTORY_TABLE, &applied)
71 .await?;
72
73 Ok(applied)
74 })
75 }
76
77 fn latest_version(&mut self) -> BoxFuture<'_, TernResult<Option<i64>>> {
79 Box::pin(async move {
80 let executor = self.executor();
81 let latest = executor
82 .get_all_applied(Self::HISTORY_TABLE)
83 .await?
84 .into_iter()
85 .fold(None, |acc, m| match acc {
86 None => Some(m.version),
87 Some(v) if m.version > v => Some(m.version),
88 _ => acc,
89 });
90
91 Ok(latest)
92 })
93 }
94
95 fn previously_applied(&mut self) -> BoxFuture<'_, TernResult<Vec<AppliedMigration>>> {
97 Box::pin(async move {
98 let executor = self.executor();
99 let applied = executor.get_all_applied(Self::HISTORY_TABLE).await?;
100
101 Ok(applied)
102 })
103 }
104
105 fn check_history_table(&mut self) -> BoxFuture<'_, TernResult<()>> {
107 Box::pin(async move {
108 let executor = self.executor();
109 executor
110 .create_history_if_not_exists(Self::HISTORY_TABLE)
111 .await?;
112
113 Ok(())
114 })
115 }
116
117 fn drop_history_table(&mut self) -> BoxFuture<'_, TernResult<()>> {
119 Box::pin(async move {
120 let executor = self.executor();
121 executor.drop_history(Self::HISTORY_TABLE).await?;
122
123 Ok(())
124 })
125 }
126
127 fn upsert_applied<'migration, 'conn: 'migration>(
129 &'conn mut self,
130 applied: &'migration AppliedMigration,
131 ) -> BoxFuture<'migration, TernResult<()>> {
132 Box::pin(async move {
133 self.executor()
134 .upsert_applied_migration(Self::HISTORY_TABLE, applied)
135 .await?;
136
137 Ok(())
138 })
139 }
140}
141
142pub trait Executor
145where
146 Self: Send + Sync + 'static,
147{
148 type Queries: QueryRepository;
151
152 fn apply_tx(&mut self, query: &Query) -> impl Future<Output = TernResult<()>> + Send;
154
155 fn apply_no_tx(&mut self, query: &Query) -> impl Future<Output = TernResult<()>> + Send;
157
158 fn create_history_if_not_exists(
160 &mut self,
161 history_table: &str,
162 ) -> impl Future<Output = TernResult<()>> + Send;
163
164 fn drop_history(&mut self, history_table: &str) -> impl Future<Output = TernResult<()>> + Send;
166
167 fn get_all_applied(
169 &mut self,
170 history_table: &str,
171 ) -> impl Future<Output = TernResult<Vec<AppliedMigration>>> + Send;
172
173 fn insert_applied_migration(
175 &mut self,
176 history_table: &str,
177 applied: &AppliedMigration,
178 ) -> impl Future<Output = TernResult<()>> + Send;
179
180 fn upsert_applied_migration(
182 &mut self,
183 history_table: &str,
184 applied: &AppliedMigration,
185 ) -> impl Future<Output = TernResult<()>> + Send;
186}
187
188pub trait QueryRepository {
191 fn create_history_if_not_exists_query(history_table: &str) -> Query;
194
195 fn drop_history_query(history_table: &str) -> Query;
197
198 fn insert_into_history_query(history_table: &str, applied: &AppliedMigration) -> Query;
200
201 fn select_star_from_history_query(history_table: &str) -> Query;
203
204 fn upsert_history_query(history_table: &str, applied: &AppliedMigration) -> Query;
206}
207
208pub trait Migration
210where
211 Self: Send + Sync,
212{
213 type Ctx: MigrationContext;
215
216 fn migration_id(&self) -> MigrationId;
218
219 fn content(&self) -> String;
222
223 fn no_tx(&self) -> bool;
225
226 fn build<'a>(&'a self, ctx: &'a mut Self::Ctx) -> BoxFuture<'a, TernResult<Query>>;
228
229 fn version(&self) -> i64 {
231 self.migration_id().version()
232 }
233
234 fn to_applied(
237 &self,
238 duration_ms: i64,
239 applied_at: DateTime<Utc>,
240 content: &str,
241 ) -> AppliedMigration {
242 AppliedMigration::new(self.migration_id(), content, duration_ms, applied_at)
243 }
244}
245
246pub trait MigrationSource {
249 type Ctx: MigrationContext;
252
253 fn migration_set(&self, latest_version: Option<i64>) -> MigrationSet<Self::Ctx>;
255}
256
257pub struct MigrationSet<Ctx: ?Sized> {
260 pub migrations: Vec<Box<dyn Migration<Ctx = Ctx>>>,
261}
262
263impl<Ctx> MigrationSet<Ctx>
264where
265 Ctx: MigrationContext,
266{
267 pub fn new<T>(vs: T) -> MigrationSet<Ctx>
268 where
269 T: Into<Vec<Box<dyn Migration<Ctx = Ctx>>>>,
270 {
271 let mut migrations = vs.into();
272 migrations.sort_by_key(|m| m.version());
273 MigrationSet { migrations }
274 }
275
276 pub fn len(&self) -> usize {
278 self.migrations.len()
279 }
280
281 pub fn versions(&self) -> Vec<i64> {
283 self.migrations
284 .iter()
285 .map(|m| m.version())
286 .collect::<Vec<_>>()
287 }
288
289 pub fn migration_ids(&self) -> Vec<MigrationId> {
291 self.migrations
292 .iter()
293 .map(|m| m.migration_id())
294 .collect::<Vec<_>>()
295 }
296
297 pub fn max(&self) -> Option<i64> {
299 self.versions().iter().max().copied()
300 }
301
302 pub fn is_empty(&self) -> bool {
304 self.len() == 0
305 }
306}
307
308pub trait QueryBuilder {
314 type Ctx: MigrationContext;
316
317 fn build(&self, ctx: &mut Self::Ctx) -> impl Future<Output = TernResult<Query>> + Send;
319}
320
321pub struct Query(String);
323
324impl Query {
325 pub fn new(sql: String) -> Self {
326 Self(sql)
327 }
328
329 pub fn sql(&self) -> &str {
330 &self.0
331 }
332
333 pub fn split_statements(&self) -> TernResult<Vec<String>> {
345 let mut statements = Vec::new();
346 self.sql()
347 .lines()
348 .try_fold(String::new(), |mut buf, line| {
349 let line = line.trim();
350 writeln!(buf, "{line}")?;
351 if line.ends_with(";") && !line.starts_with("--") {
355 statements.push(buf);
356 Ok::<String, std::fmt::Error>(String::new())
357 } else {
358 Ok(buf)
359 }
360 })?;
361
362 Ok(statements)
363 }
364}
365
/// Identifies a migration by version and description.
///
/// The derived ordering compares `version` first and `description` second.
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct MigrationId {
    /// Version number of the migration.
    version: i64,
    /// Description of the migration.
    description: String,
}

impl MigrationId {
    /// Creates an identifier from its two parts.
    pub fn new(version: i64, description: String) -> Self {
        MigrationId {
            version,
            description,
        }
    }

    /// The version number.
    pub fn version(&self) -> i64 {
        self.version
    }

    /// An owned copy of the description.
    pub fn description(&self) -> String {
        self.description.to_owned()
    }
}

/// Formats as `V{version}__{description}`.
impl std::fmt::Display for MigrationId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            version,
            description,
        } = self;
        write!(f, "V{version}__{description}")
    }
}
397
398impl From<AppliedMigration> for MigrationId {
399 fn from(value: AppliedMigration) -> Self {
400 Self {
401 version: value.version,
402 description: value.description,
403 }
404 }
405}
406
407#[derive(Debug, Clone)]
410#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
411pub struct AppliedMigration {
412 pub version: i64,
414 pub description: String,
416 pub content: String,
418 pub duration_ms: i64,
420 pub applied_at: DateTime<Utc>,
422}
423
424impl AppliedMigration {
425 pub fn new(
426 id: MigrationId,
427 content: &str,
428 duration_ms: i64,
429 applied_at: DateTime<Utc>,
430 ) -> Self {
431 Self {
432 version: id.version,
433 description: id.description,
434 content: content.into(),
435 duration_ms,
436 applied_at,
437 }
438 }
439}