use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt;
use std::fmt::Formatter;
use std::hash::{Hash, Hasher};

use log::error;
use siphasher::sip::SipHasher13;
use time::OffsetDateTime;

use crate::traits::{DEFAULT_MIGRATION_TABLE_NAME, sync::migrate as sync_migrate};
use crate::util::parse_migration_name;
use crate::{AsyncMigrate, Error, Migrate};
14
/// Kind of a migration, as carried by the prefix of a migration's name.
///
/// `Display` renders the one-letter prefix (`V` / `U`); `Debug` renders the
/// variant name (`Versioned` / `Unversioned`).
#[derive(Clone, Debug, PartialEq)]
pub enum Type {
    /// Rendered as `V` by `Display`.
    Versioned,
    /// Rendered as `U` by `Display`.
    Unversioned,
}

impl fmt::Display for Type {
    /// Writes the one-letter prefix of this migration type.
    ///
    /// Width and padding flags of the formatter are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Type::Versioned => "V",
            Type::Unversioned => "U",
        })
    }
}
41
/// Target of a migration run, stored on a `Runner` and forwarded unchanged to
/// the `Migrate` / `AsyncMigrate` implementation.
///
/// NOTE(review): how each variant is interpreted is decided by the code that
/// receives the target, which is not part of this file; the per-variant notes
/// below follow the variant names and should be confirmed there.
#[derive(Clone, Copy, Debug)]
pub enum Target {
    /// Default target of `Runner::new`; presumably "apply everything pending".
    Latest,
    /// Presumably "apply migrations up to and including this version".
    Version(u32),
    /// Presumably "record migrations as applied without running them".
    Fake,
    /// Presumably the `Fake` behaviour limited to the given version.
    FakeVersion(u32),
}
51
/// Application state tracked on a `Migration`.
#[derive(Clone, Debug)]
enum State {
    /// Set by `Migration::applied` and `Migration::set_applied`.
    Applied,
    /// Set by `Migration::unapplied`.
    Unapplied,
}
59
60#[derive(Clone, Debug)]
66pub struct Migration {
67 state: State,
68 name: String,
69 checksum: u64,
70 version: i32,
71 prefix: Type,
72 sql: Option<String>,
73 applied_on: Option<OffsetDateTime>,
74}
75
76impl Migration {
77 pub fn unapplied(input_name: &str, sql: &str) -> Result<Migration, Error> {
81 let (prefix, version, name) = parse_migration_name(input_name)?;
82
83 let mut hasher = SipHasher13::new();
92 name.hash(&mut hasher);
93 version.hash(&mut hasher);
94 sql.hash(&mut hasher);
95 let checksum = hasher.finish();
96
97 Ok(Migration {
98 state: State::Unapplied,
99 name,
100 version,
101 prefix,
102 sql: Some(sql.into()),
103 applied_on: None,
104 checksum,
105 })
106 }
107
108 pub fn applied(
110 version: i32,
111 name: String,
112 applied_on: OffsetDateTime,
113 checksum: u64,
114 ) -> Migration {
115 Migration {
116 state: State::Applied,
117 name,
118 checksum,
119 version,
120 prefix: Type::Versioned,
122 sql: None,
123 applied_on: Some(applied_on),
124 }
125 }
126
127 pub fn set_applied(&mut self) {
129 self.applied_on = Some(OffsetDateTime::now_utc());
130 self.state = State::Applied;
131 }
132
133 pub fn sql(&self) -> Option<&str> {
135 self.sql.as_deref()
136 }
137
138 pub fn version(&self) -> u32 {
140 self.version as u32
141 }
142
143 pub fn prefix(&self) -> &Type {
145 &self.prefix
146 }
147
148 pub fn name(&self) -> &str {
150 &self.name
151 }
152
153 pub fn applied_on(&self) -> Option<&OffsetDateTime> {
156 self.applied_on.as_ref()
157 }
158
159 pub fn checksum(&self) -> u64 {
161 self.checksum
162 }
163}
164
165impl fmt::Display for Migration {
166 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
167 write!(fmt, "{}{}__{}", self.prefix, self.version, self.name)
168 }
169}
170
171impl Eq for Migration {}
172
173impl PartialEq for Migration {
174 fn eq(&self, other: &Migration) -> bool {
175 self.version == other.version && self.name == other.name && self.checksum() == other.checksum()
176 }
177}
178
179impl Ord for Migration {
180 fn cmp(&self, other: &Migration) -> Ordering {
181 self.version.cmp(&other.version)
182 }
183}
184
185impl PartialOrd for Migration {
186 fn partial_cmp(&self, other: &Migration) -> Option<Ordering> {
187 Some(self.cmp(other))
188 }
189}
190
191#[derive(Clone, Debug)]
202pub struct Report {
203 applied_migrations: Vec<Migration>,
204}
205
206impl Report {
207 pub(crate) fn new(applied_migrations: Vec<Migration>) -> Report {
209 Report { applied_migrations }
210 }
211
212 pub fn applied_migrations(&self) -> &Vec<Migration> {
214 &self.applied_migrations
215 }
216}
217
218pub struct Runner {
224 grouped: bool,
225 abort_divergent: bool,
226 abort_missing: bool,
227 migrations: Vec<Migration>,
228 target: Target,
229 migration_table_name: String,
230}
231
232impl Runner {
233 pub fn new(migrations: &[Migration]) -> Runner {
235 Runner {
236 grouped: false,
237 target: Target::Latest,
238 abort_divergent: true,
239 abort_missing: true,
240 migrations: migrations.to_vec(),
241 migration_table_name: DEFAULT_MIGRATION_TABLE_NAME.into(),
242 }
243 }
244
245 pub fn get_migrations(&self) -> &Vec<Migration> {
247 &self.migrations
248 }
249
250 pub fn set_target(self, target: Target) -> Runner {
255 Runner { target, ..self }
256 }
257
258 pub fn set_grouped(self, grouped: bool) -> Runner {
268 Runner { grouped, ..self }
269 }
270
271 pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
275 Runner {
276 abort_divergent,
277 ..self
278 }
279 }
280
281 pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
286 Runner {
287 abort_missing,
288 ..self
289 }
290 }
291
292 pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
295 where
296 C: Migrate,
297 {
298 Migrate::get_last_applied_migration(conn, &self.migration_table_name)
299 }
300
301 pub async fn get_last_applied_migration_async<C>(
304 &self,
305 conn: &mut C,
306 ) -> Result<Option<Migration>, Error>
307 where
308 C: AsyncMigrate + Send,
309 {
310 AsyncMigrate::get_last_applied_migration(conn, &self.migration_table_name).await
311 }
312
313 pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
315 where
316 C: Migrate,
317 {
318 Migrate::get_applied_migrations(conn, &self.migration_table_name)
319 }
320
321 pub async fn get_applied_migrations_async<C>(&self, conn: &mut C) -> Result<Vec<Migration>, Error>
323 where
324 C: AsyncMigrate + Send,
325 {
326 AsyncMigrate::get_applied_migrations(conn, &self.migration_table_name).await
327 }
328
329 pub fn set_migration_table_name<S: AsRef<str>>(&mut self, migration_table_name: S) -> &mut Self {
340 if migration_table_name.as_ref().is_empty() {
341 panic!("Migration table name must not be empty");
342 }
343
344 self.migration_table_name = migration_table_name.as_ref().to_string();
345 self
346 }
347
348 pub fn run_iter<C>(
352 self,
353 connection: &mut C,
354 ) -> impl Iterator<Item = Result<Migration, Error>> + '_
355 where
356 C: Migrate,
357 {
358 RunIterator::new(self, connection)
359 }
360
361 pub fn run<C>(&self, connection: &mut C) -> Result<Report, Error>
363 where
364 C: Migrate,
365 {
366 Migrate::migrate(
367 connection,
368 &self.migrations,
369 self.abort_divergent,
370 self.abort_missing,
371 self.grouped,
372 self.target,
373 &self.migration_table_name,
374 )
375 }
376
377 pub async fn run_async<C>(&self, connection: &mut C) -> Result<Report, Error>
379 where
380 C: AsyncMigrate + Send,
381 {
382 AsyncMigrate::migrate(
383 connection,
384 &self.migrations,
385 self.abort_divergent,
386 self.abort_missing,
387 self.grouped,
388 self.target,
389 &self.migration_table_name,
390 )
391 .await
392 }
393}
394
395pub struct RunIterator<'a, C> {
396 connection: &'a mut C,
397 target: Target,
398 migration_table_name: String,
399 items: VecDeque<Migration>,
400 failed: bool,
401}
402impl<'a, C> RunIterator<'a, C>
403where
404 C: Migrate,
405{
406 pub(crate) fn new(runner: Runner, connection: &'a mut C) -> RunIterator<'a, C> {
407 RunIterator {
408 items: VecDeque::from(
409 Migrate::get_unapplied_migrations(
410 connection,
411 &runner.migrations,
412 runner.abort_divergent,
413 runner.abort_missing,
414 &runner.migration_table_name,
415 )
416 .unwrap(),
417 ),
418 connection,
419 target: runner.target,
420 migration_table_name: runner.migration_table_name.clone(),
421 failed: false,
422 }
423 }
424}
425impl<C> Iterator for RunIterator<'_, C>
426where
427 C: Migrate,
428{
429 type Item = Result<Migration, Error>;
430
431 fn next(&mut self) -> Option<Self::Item> {
432 match self.failed {
433 true => None,
434 false => self.items.pop_front().and_then(|migration| {
435 sync_migrate(
436 self.connection,
437 vec![migration],
438 self.target,
439 &self.migration_table_name,
440 false,
441 )
442 .map(|r| r.applied_migrations.first().cloned())
443 .map_err(|e| {
444 error!("migration failed: {e:?}");
445 self.failed = true;
446 e
447 })
448 .transpose()
449 }),
450 }
451 }
452}