1use siphasher::sip::SipHasher13;
2use time::OffsetDateTime;
3
4use log::error;
5use std::cmp::Ordering;
6use std::collections::VecDeque;
7use std::fmt;
8use std::hash::{Hash, Hasher};
9
10use crate::traits::{sync::migrate as sync_migrate, DEFAULT_MIGRATION_TABLE_NAME};
11use crate::util::{parse_migration_name, SchemaVersion};
12use crate::{AsyncMigrate, Error, Migrate};
13use std::fmt::Formatter;
14
15#[derive(Clone, PartialEq)]
17#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
18pub enum Type {
19 Versioned,
20 Unversioned,
21}
22
23impl fmt::Display for Type {
24 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
25 let version_type = match self {
26 Type::Versioned => "V",
27 Type::Unversioned => "U",
28 };
29 write!(f, "{version_type}")
30 }
31}
32
33impl fmt::Debug for Type {
34 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
35 let version_type = match self {
36 Type::Versioned => "Versioned",
37 Type::Unversioned => "Unversioned",
38 };
39 write!(f, "{version_type}")
40 }
41}
42
43#[derive(Clone, Copy, Debug)]
45pub enum Target {
46 Latest,
47 Version(SchemaVersion),
48 Fake,
49 FakeVersion(SchemaVersion),
50}
51
52#[derive(Clone, Debug)]
55#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
56enum State {
57 Applied,
58 Unapplied,
59}
60
61#[derive(Clone, Debug)]
67#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
68pub struct Migration {
69 state: State,
70 name: String,
71 checksum: u64,
72 version: SchemaVersion,
73 prefix: Type,
74 sql: Option<String>,
75 applied_on: Option<OffsetDateTime>,
76}
77
78impl Migration {
79 pub fn unapplied(input_name: &str, sql: &str) -> Result<Migration, Error> {
82 let (prefix, version, name) = parse_migration_name(input_name)?;
83
84 let mut hasher = SipHasher13::new();
93 name.hash(&mut hasher);
94 version.hash(&mut hasher);
95 sql.hash(&mut hasher);
96 let checksum = hasher.finish();
97
98 Ok(Migration {
99 state: State::Unapplied,
100 name,
101 version,
102 prefix,
103 sql: Some(sql.into()),
104 applied_on: None,
105 checksum,
106 })
107 }
108
109 pub fn applied(
111 version: SchemaVersion,
112 name: String,
113 applied_on: OffsetDateTime,
114 checksum: u64,
115 ) -> Migration {
116 Migration {
117 state: State::Applied,
118 name,
119 checksum,
120 version,
121 prefix: Type::Versioned,
123 sql: None,
124 applied_on: Some(applied_on),
125 }
126 }
127
128 pub fn set_applied(&mut self) {
130 self.applied_on = Some(OffsetDateTime::now_utc());
131 self.state = State::Applied;
132 }
133
134 pub fn sql(&self) -> Option<&str> {
136 self.sql.as_deref()
137 }
138
139 pub fn version(&self) -> SchemaVersion {
141 self.version
142 }
143
144 pub fn prefix(&self) -> &Type {
146 &self.prefix
147 }
148
149 pub fn name(&self) -> &str {
151 &self.name
152 }
153
154 pub fn applied_on(&self) -> Option<&OffsetDateTime> {
157 self.applied_on.as_ref()
158 }
159
160 pub fn checksum(&self) -> u64 {
162 self.checksum
163 }
164}
165
166impl fmt::Display for Migration {
167 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
168 write!(fmt, "{}{}__{}", self.prefix, self.version, self.name)
169 }
170}
171
172impl Eq for Migration {}
173
174impl PartialEq for Migration {
175 fn eq(&self, other: &Migration) -> bool {
176 self.version == other.version
177 && self.name == other.name
178 && self.checksum() == other.checksum()
179 }
180}
181
182impl Ord for Migration {
183 fn cmp(&self, other: &Migration) -> Ordering {
184 self.version.cmp(&other.version)
185 }
186}
187
188impl PartialOrd for Migration {
189 fn partial_cmp(&self, other: &Migration) -> Option<Ordering> {
190 Some(self.cmp(other))
191 }
192}
193
194#[derive(Clone, Debug)]
204pub struct Report {
205 applied_migrations: Vec<Migration>,
206}
207
208impl Report {
209 pub fn new(applied_migrations: Vec<Migration>) -> Report {
211 Report { applied_migrations }
212 }
213
214 pub fn applied_migrations(&self) -> &Vec<Migration> {
216 &self.applied_migrations
217 }
218}
219
220pub struct Runner {
226 grouped: bool,
227 abort_divergent: bool,
228 abort_missing: bool,
229 migrations: Vec<Migration>,
230 target: Target,
231 migration_table_name: String,
232}
233
234impl Runner {
235 pub fn new(migrations: &[Migration]) -> Runner {
237 Runner {
238 grouped: false,
239 target: Target::Latest,
240 abort_divergent: true,
241 abort_missing: true,
242 migrations: migrations.to_vec(),
243 migration_table_name: DEFAULT_MIGRATION_TABLE_NAME.into(),
244 }
245 }
246
247 pub fn get_migrations(&self) -> &Vec<Migration> {
249 &self.migrations
250 }
251
252 pub fn set_target(self, target: Target) -> Runner {
257 Runner { target, ..self }
258 }
259
260 pub fn set_grouped(self, grouped: bool) -> Runner {
268 Runner { grouped, ..self }
269 }
270
271 pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
275 Runner {
276 abort_divergent,
277 ..self
278 }
279 }
280
281 pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
286 Runner {
287 abort_missing,
288 ..self
289 }
290 }
291
292 pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
294 where
295 C: Migrate,
296 {
297 Migrate::get_last_applied_migration(conn, &self.migration_table_name)
298 }
299
300 pub async fn get_last_applied_migration_async<C>(
302 &self,
303 conn: &mut C,
304 ) -> Result<Option<Migration>, Error>
305 where
306 C: AsyncMigrate + Send,
307 {
308 AsyncMigrate::get_last_applied_migration(conn, &self.migration_table_name).await
309 }
310
311 pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
313 where
314 C: Migrate,
315 {
316 Migrate::get_applied_migrations(conn, &self.migration_table_name)
317 }
318
319 pub async fn get_applied_migrations_async<C>(
321 &self,
322 conn: &mut C,
323 ) -> Result<Vec<Migration>, Error>
324 where
325 C: AsyncMigrate + Send,
326 {
327 AsyncMigrate::get_applied_migrations(conn, &self.migration_table_name).await
328 }
329
330 pub fn set_migration_table_name<S: AsRef<str>>(
340 &mut self,
341 migration_table_name: S,
342 ) -> &mut Self {
343 if migration_table_name.as_ref().is_empty() {
344 panic!("Migration table name must not be empty");
345 }
346
347 self.migration_table_name = migration_table_name.as_ref().to_string();
348 self
349 }
350
351 pub fn run_iter<C>(
355 self,
356 connection: &mut C,
357 ) -> impl Iterator<Item = Result<Migration, Error>> + '_
358 where
359 C: Migrate,
360 {
361 RunIterator::new(self, connection)
362 }
363
364 pub fn run<C>(&self, connection: &mut C) -> Result<Report, Error>
366 where
367 C: Migrate,
368 {
369 Migrate::migrate(
370 connection,
371 &self.migrations,
372 self.abort_divergent,
373 self.abort_missing,
374 self.grouped,
375 self.target,
376 &self.migration_table_name,
377 )
378 }
379
380 pub async fn run_async<C>(&self, connection: &mut C) -> Result<Report, Error>
382 where
383 C: AsyncMigrate + Send,
384 {
385 AsyncMigrate::migrate(
386 connection,
387 &self.migrations,
388 self.abort_divergent,
389 self.abort_missing,
390 self.grouped,
391 self.target,
392 &self.migration_table_name,
393 )
394 .await
395 }
396}
397
398pub struct RunIterator<'a, C> {
399 connection: &'a mut C,
400 target: Target,
401 migration_table_name: String,
402 items: VecDeque<Migration>,
403 failed: bool,
404}
405impl<'a, C> RunIterator<'a, C>
406where
407 C: Migrate,
408{
409 pub(crate) fn new(runner: Runner, connection: &'a mut C) -> RunIterator<'a, C> {
410 RunIterator {
411 items: VecDeque::from(
412 Migrate::get_unapplied_migrations(
413 connection,
414 &runner.migrations,
415 runner.abort_divergent,
416 runner.abort_missing,
417 &runner.migration_table_name,
418 )
419 .unwrap(),
420 ),
421 connection,
422 target: runner.target,
423 migration_table_name: runner.migration_table_name.clone(),
424 failed: false,
425 }
426 }
427}
428impl<C> Iterator for RunIterator<'_, C>
429where
430 C: Migrate,
431{
432 type Item = Result<Migration, Error>;
433
434 fn next(&mut self) -> Option<Self::Item> {
435 match self.failed {
436 true => None,
437 false => self.items.pop_front().and_then(|migration| {
438 sync_migrate(
439 self.connection,
440 vec![migration],
441 self.target,
442 &self.migration_table_name,
443 false,
444 )
445 .map(|r| r.applied_migrations.first().cloned())
446 .map_err(|e| {
447 error!("migration failed: {e:?}");
448 self.failed = true;
449 e
450 })
451 .transpose()
452 }),
453 }
454 }
455}