1use crate::error::{DatabaseError as _, Error, TernResult};
9use crate::migration::{AppliedMigration, Migration, MigrationContext, MigrationId};
10
11use chrono::{DateTime, Utc};
12use display_json::{DebugAsJson, DisplayAsJsonPretty};
13use serde::Serialize;
14use std::collections::HashSet;
15use std::fmt::Write;
16
17pub struct Runner<C: MigrationContext> {
19 context: C,
20}
21
22impl<C> Runner<C>
23where
24 C: MigrationContext,
25{
26 pub fn new(context: C) -> Self {
27 Self { context }
28 }
29
30 pub async fn init_history(&mut self) -> TernResult<()> {
32 self.context.check_history_table().await
33 }
34
35 pub async fn drop_history(&mut self) -> TernResult<()> {
37 self.context.drop_history_table().await
38 }
39
40 async fn validate_source(&mut self) -> TernResult<()> {
48 self.context.check_history_table().await?;
49
50 let applied: HashSet<MigrationId> = self
51 .context
52 .previously_applied()
53 .await?
54 .into_iter()
55 .map(MigrationId::from)
56 .collect();
57 let source: HashSet<MigrationId> = self
58 .context
59 .migration_set(None)
60 .migration_ids()
61 .into_iter()
62 .collect();
63
64 if !source.is_superset(&applied) {
65 let sym_diff: HashSet<_> = applied.symmetric_difference(&source).collect();
66 let msg = sym_diff
70 .into_iter()
71 .map(|id| id.to_string())
72 .collect::<Vec<_>>()
73 .join(", ");
74
75 return Err(Error::MissingSource {
76 local: source.len() as i64,
77 history: applied.len() as i64,
78 msg,
79 });
80 }
81
82 Ok(())
83 }
84
85 fn validate_target(
87 &self,
88 last_applied: Option<i64>,
89 target_version: Option<i64>,
90 ) -> TernResult<()> {
91 let Some(source) = self.context.migration_set(None).max() else {
92 return Ok(());
93 };
94 if let Some(target) = target_version {
95 match last_applied {
96 Some(applied) if target < applied => Err(Error::Invalid(format!(
97 "target version V{target} earlier than latest applied version V{applied}",
98 )))?,
99 _ if target > source => Err(Error::Invalid(format!(
100 "target version V{target} does not exist, latest version found was V{source}",
101 )))?,
102 _ => Ok(()),
103 }
104 } else {
105 Ok(())
106 }
107 }
108
109 pub async fn run_apply(
116 &mut self,
117 target_version: Option<i64>,
118 dryrun: bool,
119 ) -> TernResult<Report> {
120 self.validate_source().await?;
121 let last_applied = self.context.latest_version().await?;
122 self.validate_target(last_applied, target_version)?;
123
124 let unapplied = self.context.migration_set(last_applied);
125
126 let mut results = Vec::new();
127 for migration in &unapplied.migrations {
128 let id = migration.migration_id();
129 let ver = migration.version();
130
131 if matches!(target_version, Some(end) if ver > end) {
133 break;
134 }
135
136 let result = if dryrun {
137 let query = migration
139 .build(&mut self.context)
140 .await
141 .with_report(&results)?;
142
143 MigrationResult::from_unapplied(migration.as_ref(), query.sql())
144 } else {
145 log::trace!("applying migration {id}");
146
147 self.context
148 .apply(migration.as_ref())
149 .await
150 .tern_migration_result(migration.as_ref())
151 .with_report(&results)
152 .map(|v| MigrationResult::from_applied(&v, Some(migration.no_tx())))?
153 };
154
155 results.push(result);
156 }
157
158 Ok(Report::new(results))
159 }
160
161 #[deprecated(since = "3.1.0", note = "use `run_apply_all` with `dryrun = false`")]
163 pub async fn apply_all(&mut self) -> TernResult<Report> {
164 self.run_apply(None, false).await
165 }
166
167 pub async fn run_apply_all(&mut self, dryrun: bool) -> TernResult<Report> {
174 self.run_apply(None, dryrun).await
175 }
176
177 pub async fn list_applied(&mut self) -> TernResult<Report> {
179 self.validate_source().await?;
180
181 let applied = self
182 .context
183 .previously_applied()
184 .await?
185 .iter()
186 .map(|m| MigrationResult::from_applied(m, None))
187 .collect::<Vec<_>>();
188 let report = Report::new(applied);
189
190 Ok(report)
191 }
192
193 #[deprecated(since = "3.1.0", note = "no valid use case for `start_version`")]
194 pub async fn soft_apply(
195 &mut self,
196 start_version: Option<i64>,
197 target_version: Option<i64>,
198 ) -> TernResult<Report> {
199 if start_version.is_some() {
200 return Err(Error::Invalid(
201 "no valid `start_version` other than the first unapplied, use `run_soft_apply`"
202 .into(),
203 ));
204 }
205 self.run_soft_apply(target_version, false).await
206 }
207
208 pub async fn run_soft_apply(
221 &mut self,
222 target_version: Option<i64>,
223 dryrun: bool,
224 ) -> TernResult<Report> {
225 self.validate_source().await?;
226 let last_applied = self.context.latest_version().await?;
227 self.validate_target(last_applied, target_version)?;
228
229 let unapplied = self.context.migration_set(last_applied);
230
231 let mut results = Vec::new();
232 for migration in &unapplied.migrations {
233 let id = migration.migration_id();
234 let ver = migration.version();
235
236 if matches!(target_version, Some(end) if ver > end) {
238 break;
239 }
240
241 let query = migration
243 .build(&mut self.context)
244 .await
245 .with_report(&results)?;
246 let mut content = String::from("-- SOFT APPLIED:\n\n");
247 writeln!(content, "{query}")?;
248
249 let applied = migration.to_applied(0, Utc::now(), &content);
250 let result = MigrationResult::from_soft_applied(&applied, dryrun);
251
252 if !dryrun {
253 log::trace!("soft applying migration {id}");
254 self.context
255 .insert_applied(&applied)
256 .await
257 .with_report(&results)?;
258 }
259
260 results.push(result);
261 }
262 let report = Report::new(results);
263
264 Ok(report)
265 }
266}
267
268#[derive(Clone, Serialize, DebugAsJson, DisplayAsJsonPretty)]
270pub struct Report {
271 migrations: Vec<MigrationResult>,
272}
273
274impl Report {
275 pub fn new(migrations: Vec<MigrationResult>) -> Self {
276 Self { migrations }
277 }
278
279 pub fn count(&self) -> usize {
280 self.migrations.len()
281 }
282
283 pub fn results(&self) -> Vec<MigrationResult> {
285 self.migrations.clone()
286 }
287
288 pub fn iter_results(&self) -> impl Iterator<Item = MigrationResult> {
290 self.migrations.clone().into_iter()
291 }
292}
293
294#[derive(Clone, Serialize, DebugAsJson, DisplayAsJsonPretty)]
297#[allow(dead_code)]
298pub struct MigrationResult {
299 dryrun: bool,
300 version: i64,
301 state: MigrationState,
302 applied_at: Option<DateTime<Utc>>,
303 description: String,
304 content: String,
305 transactional: Transactional,
306 duration_ms: RunDuration,
307}
308
309impl MigrationResult {
310 pub(crate) fn from_applied(applied: &AppliedMigration, no_tx: Option<bool>) -> Self {
311 Self {
312 dryrun: false,
313 version: applied.version,
314 state: MigrationState::Applied,
315 applied_at: Some(applied.applied_at),
316 description: applied.description.clone(),
317 content: applied.content.clone(),
318 transactional: no_tx
319 .map(Transactional::from_boolean)
320 .unwrap_or(Transactional::Other("Previously applied".to_string())),
321 duration_ms: RunDuration::Duration(applied.duration_ms),
322 }
323 }
324
325 pub(crate) fn from_soft_applied(applied: &AppliedMigration, dryrun: bool) -> Self {
326 Self {
327 dryrun,
328 version: applied.version,
329 state: MigrationState::SoftApplied,
330 applied_at: Some(applied.applied_at),
331 description: applied.description.clone(),
332 content: applied.content.clone(),
333 transactional: Transactional::Other("Soft applied".to_string()),
334 duration_ms: RunDuration::Duration(applied.duration_ms),
335 }
336 }
337
338 pub(crate) fn from_unapplied<M>(migration: &M, content: &str) -> Self
339 where
340 M: Migration + ?Sized,
341 {
342 Self {
343 dryrun: true,
344 version: migration.version(),
345 state: MigrationState::Unapplied,
346 applied_at: None,
347 description: migration.migration_id().description(),
348 content: content.into(),
349 transactional: Transactional::from_boolean(migration.no_tx()),
350 duration_ms: RunDuration::Unapplied,
351 }
352 }
353}
354
355#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Serialize)]
356enum MigrationState {
357 Applied,
358 SoftApplied,
359 Unapplied,
360}
361
362#[derive(Debug, Clone, Serialize)]
363enum Transactional {
364 NoTransaction,
365 InTransaction,
366 Other(String),
367}
368
369impl Transactional {
370 fn from_boolean(v: bool) -> Self {
371 if v {
372 return Self::NoTransaction;
373 };
374 Self::InTransaction
375 }
376}
377
378#[derive(Debug, Clone, Copy, Serialize)]
379enum RunDuration {
380 Duration(i64),
381 Unapplied,
382}
383
384impl std::fmt::Display for Transactional {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 Self::NoTransaction => write!(f, "No Transaction"),
388 Self::InTransaction => write!(f, "In Transaction"),
389 Self::Other(s) => write!(f, "{s}"),
390 }
391 }
392}
393
394impl std::fmt::Display for MigrationState {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 match self {
397 Self::Applied => write!(f, "Applied"),
398 Self::SoftApplied => write!(f, "Soft Applied"),
399 Self::Unapplied => write!(f, "Not Applied"),
400 }
401 }
402}
403
404impl std::fmt::Display for RunDuration {
405 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406 match self {
407 Self::Duration(ms) => write!(f, "{}ms", ms),
408 Self::Unapplied => write!(f, "Not Applied"),
409 }
410 }
411}