1use siphasher::sip::SipHasher13;
2use time::OffsetDateTime;
3
4use log::error;
5use std::cmp::Ordering;
6use std::collections::VecDeque;
7use std::fmt;
8use std::hash::{Hash, Hasher};
9
10use crate::traits::{sync::migrate as sync_migrate, DEFAULT_MIGRATION_TABLE_NAME};
11use crate::util::parse_migration_name;
12use crate::{AsyncMigrate, Error, Migrate};
13use std::fmt::Formatter;
14
15#[derive(Clone, PartialEq)]
17pub enum Type {
18 Versioned,
19 Unversioned,
20}
21
22impl fmt::Display for Type {
23 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
24 let version_type = match self {
25 Type::Versioned => "V",
26 Type::Unversioned => "U",
27 };
28 write!(f, "{}", version_type)
29 }
30}
31
32impl fmt::Debug for Type {
33 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
34 let version_type = match self {
35 Type::Versioned => "Versioned",
36 Type::Unversioned => "Unversioned",
37 };
38 write!(f, "{}", version_type)
39 }
40}
41
42#[derive(Clone, Copy, Debug)]
44pub enum Target {
45 Latest,
46 Version(u32),
47 Fake,
48 FakeVersion(u32),
49}
50
51#[derive(Clone, Debug)]
54enum State {
55 Applied,
56 Unapplied,
57}
58
59#[derive(Clone, Debug)]
65pub struct Migration {
66 state: State,
67 name: String,
68 checksum: u64,
69 version: i32,
70 prefix: Type,
71 sql: Option<String>,
72 applied_on: Option<OffsetDateTime>,
73}
74
75impl Migration {
76 pub fn unapplied(input_name: &str, sql: &str) -> Result<Migration, Error> {
79 let (prefix, version, name) = parse_migration_name(input_name)?;
80
81 let mut hasher = SipHasher13::new();
90 name.hash(&mut hasher);
91 version.hash(&mut hasher);
92 sql.hash(&mut hasher);
93 let checksum = hasher.finish();
94
95 Ok(Migration {
96 state: State::Unapplied,
97 name,
98 version,
99 prefix,
100 sql: Some(sql.into()),
101 applied_on: None,
102 checksum,
103 })
104 }
105
106 pub fn applied(
108 version: i32,
109 name: String,
110 applied_on: OffsetDateTime,
111 checksum: u64,
112 ) -> Migration {
113 Migration {
114 state: State::Applied,
115 name,
116 checksum,
117 version,
118 prefix: Type::Versioned,
120 sql: None,
121 applied_on: Some(applied_on),
122 }
123 }
124
125 pub fn set_applied(&mut self) {
127 self.applied_on = Some(OffsetDateTime::now_utc());
128 self.state = State::Applied;
129 }
130
131 pub fn sql(&self) -> Option<&str> {
133 self.sql.as_deref()
134 }
135
136 pub fn version(&self) -> u32 {
138 self.version as u32
139 }
140
141 pub fn prefix(&self) -> &Type {
143 &self.prefix
144 }
145
146 pub fn name(&self) -> &str {
148 &self.name
149 }
150
151 pub fn applied_on(&self) -> Option<&OffsetDateTime> {
154 self.applied_on.as_ref()
155 }
156
157 pub fn checksum(&self) -> u64 {
159 self.checksum
160 }
161}
162
163impl fmt::Display for Migration {
164 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
165 write!(fmt, "{}{}__{}", self.prefix, self.version, self.name)
166 }
167}
168
169impl Eq for Migration {}
170
171impl PartialEq for Migration {
172 fn eq(&self, other: &Migration) -> bool {
173 self.version == other.version
174 && self.name == other.name
175 && self.checksum() == other.checksum()
176 }
177}
178
179impl Ord for Migration {
180 fn cmp(&self, other: &Migration) -> Ordering {
181 self.version.cmp(&other.version)
182 }
183}
184
185impl PartialOrd for Migration {
186 fn partial_cmp(&self, other: &Migration) -> Option<Ordering> {
187 Some(self.cmp(other))
188 }
189}
190
191#[derive(Clone, Debug)]
201pub struct Report {
202 applied_migrations: Vec<Migration>,
203}
204
205impl Report {
206 pub fn new(applied_migrations: Vec<Migration>) -> Report {
208 Report { applied_migrations }
209 }
210
211 pub fn applied_migrations(&self) -> &Vec<Migration> {
213 &self.applied_migrations
214 }
215}
216
217pub struct Runner {
223 grouped: bool,
224 abort_divergent: bool,
225 abort_missing: bool,
226 migrations: Vec<Migration>,
227 target: Target,
228 migration_table_name: String,
229}
230
231impl Runner {
232 pub fn new(migrations: &[Migration]) -> Runner {
234 Runner {
235 grouped: false,
236 target: Target::Latest,
237 abort_divergent: true,
238 abort_missing: true,
239 migrations: migrations.to_vec(),
240 migration_table_name: DEFAULT_MIGRATION_TABLE_NAME.into(),
241 }
242 }
243
244 pub fn get_migrations(&self) -> &Vec<Migration> {
246 &self.migrations
247 }
248
249 pub fn set_target(self, target: Target) -> Runner {
254 Runner { target, ..self }
255 }
256
257 pub fn set_grouped(self, grouped: bool) -> Runner {
265 Runner { grouped, ..self }
266 }
267
268 pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
272 Runner {
273 abort_divergent,
274 ..self
275 }
276 }
277
278 pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
283 Runner {
284 abort_missing,
285 ..self
286 }
287 }
288
289 pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
291 where
292 C: Migrate,
293 {
294 Migrate::get_last_applied_migration(conn, &self.migration_table_name)
295 }
296
297 pub async fn get_last_applied_migration_async<C>(
299 &self,
300 conn: &mut C,
301 ) -> Result<Option<Migration>, Error>
302 where
303 C: AsyncMigrate + Send,
304 {
305 AsyncMigrate::get_last_applied_migration(conn, &self.migration_table_name).await
306 }
307
308 pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
310 where
311 C: Migrate,
312 {
313 Migrate::get_applied_migrations(conn, &self.migration_table_name)
314 }
315
316 pub async fn get_applied_migrations_async<C>(
318 &self,
319 conn: &mut C,
320 ) -> Result<Vec<Migration>, Error>
321 where
322 C: AsyncMigrate + Send,
323 {
324 AsyncMigrate::get_applied_migrations(conn, &self.migration_table_name).await
325 }
326
327 pub fn set_migration_table_name<S: AsRef<str>>(
337 &mut self,
338 migration_table_name: S,
339 ) -> &mut Self {
340 if migration_table_name.as_ref().is_empty() {
341 panic!("Migration table name must not be empty");
342 }
343
344 self.migration_table_name = migration_table_name.as_ref().to_string();
345 self
346 }
347
348 pub fn run_iter<C>(
352 self,
353 connection: &mut C,
354 ) -> impl Iterator<Item = Result<Migration, Error>> + '_
355 where
356 C: Migrate,
357 {
358 RunIterator::new(self, connection)
359 }
360
361 pub fn run<C>(&self, connection: &mut C) -> Result<Report, Error>
363 where
364 C: Migrate,
365 {
366 Migrate::migrate(
367 connection,
368 &self.migrations,
369 self.abort_divergent,
370 self.abort_missing,
371 self.grouped,
372 self.target,
373 &self.migration_table_name,
374 )
375 }
376
377 pub async fn run_async<C>(&self, connection: &mut C) -> Result<Report, Error>
379 where
380 C: AsyncMigrate + Send,
381 {
382 AsyncMigrate::migrate(
383 connection,
384 &self.migrations,
385 self.abort_divergent,
386 self.abort_missing,
387 self.grouped,
388 self.target,
389 &self.migration_table_name,
390 )
391 .await
392 }
393}
394
395pub struct RunIterator<'a, C> {
396 connection: &'a mut C,
397 target: Target,
398 migration_table_name: String,
399 items: VecDeque<Migration>,
400 failed: bool,
401}
402impl<'a, C> RunIterator<'a, C>
403where
404 C: Migrate,
405{
406 pub(crate) fn new(runner: Runner, connection: &'a mut C) -> RunIterator<'a, C> {
407 RunIterator {
408 items: VecDeque::from(
409 Migrate::get_unapplied_migrations(
410 connection,
411 &runner.migrations,
412 runner.abort_divergent,
413 runner.abort_missing,
414 &runner.migration_table_name,
415 )
416 .unwrap(),
417 ),
418 connection,
419 target: runner.target,
420 migration_table_name: runner.migration_table_name.clone(),
421 failed: false,
422 }
423 }
424}
425impl<C> Iterator for RunIterator<'_, C>
426where
427 C: Migrate,
428{
429 type Item = Result<Migration, Error>;
430
431 fn next(&mut self) -> Option<Self::Item> {
432 match self.failed {
433 true => None,
434 false => self.items.pop_front().and_then(|migration| {
435 sync_migrate(
436 self.connection,
437 vec![migration],
438 self.target,
439 &self.migration_table_name,
440 false,
441 )
442 .map(|r| r.applied_migrations.first().cloned())
443 .map_err(|e| {
444 error!("migration failed: {e:?}");
445 self.failed = true;
446 e
447 })
448 .transpose()
449 }),
450 }
451 }
452}