use siphasher::sip::SipHasher13;
use time::OffsetDateTime;
use log::error;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt;
use std::hash::{Hash, Hasher};
use crate::traits::{sync::migrate as sync_migrate, DEFAULT_MIGRATION_TABLE_NAME};
use crate::util::{parse_migration_name, SchemaVersion};
use crate::{AsyncMigrate, Error, Migrate};
use std::fmt::Formatter;
#[derive(Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Type {
Versioned,
Unversioned,
}
impl fmt::Display for Type {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let version_type = match self {
Type::Versioned => "V",
Type::Unversioned => "U",
};
write!(f, "{version_type}")
}
}
impl fmt::Debug for Type {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let version_type = match self {
Type::Versioned => "Versioned",
Type::Unversioned => "Unversioned",
};
write!(f, "{version_type}")
}
}
#[derive(Clone, Copy, Debug)]
pub enum Target {
Latest,
Version(SchemaVersion),
Fake,
FakeVersion(SchemaVersion),
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
enum State {
Applied,
Unapplied,
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Migration {
state: State,
name: String,
checksum: u64,
version: SchemaVersion,
prefix: Type,
sql: Option<String>,
applied_on: Option<OffsetDateTime>,
}
impl Migration {
pub fn unapplied(input_name: &str, sql: &str) -> Result<Migration, Error> {
let (prefix, version, name) = parse_migration_name(input_name)?;
let mut hasher = SipHasher13::new();
name.hash(&mut hasher);
version.hash(&mut hasher);
sql.hash(&mut hasher);
let checksum = hasher.finish();
Ok(Migration {
state: State::Unapplied,
name,
version,
prefix,
sql: Some(sql.into()),
applied_on: None,
checksum,
})
}
pub fn applied(
version: SchemaVersion,
name: String,
applied_on: OffsetDateTime,
checksum: u64,
) -> Migration {
Migration {
state: State::Applied,
name,
checksum,
version,
prefix: Type::Versioned,
sql: None,
applied_on: Some(applied_on),
}
}
pub fn set_applied(&mut self) {
self.applied_on = Some(OffsetDateTime::now_utc());
self.state = State::Applied;
}
pub fn sql(&self) -> Option<&str> {
self.sql.as_deref()
}
pub fn version(&self) -> SchemaVersion {
self.version
}
pub fn prefix(&self) -> &Type {
&self.prefix
}
pub fn name(&self) -> &str {
&self.name
}
pub fn applied_on(&self) -> Option<&OffsetDateTime> {
self.applied_on.as_ref()
}
pub fn checksum(&self) -> u64 {
self.checksum
}
}
impl fmt::Display for Migration {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}{}__{}", self.prefix, self.version, self.name)
}
}
impl Eq for Migration {}
impl PartialEq for Migration {
fn eq(&self, other: &Migration) -> bool {
self.version == other.version
&& self.name == other.name
&& self.checksum() == other.checksum()
}
}
impl Ord for Migration {
fn cmp(&self, other: &Migration) -> Ordering {
self.version.cmp(&other.version)
}
}
impl PartialOrd for Migration {
fn partial_cmp(&self, other: &Migration) -> Option<Ordering> {
Some(self.cmp(other))
}
}
#[derive(Clone, Debug)]
pub struct Report {
applied_migrations: Vec<Migration>,
}
impl Report {
pub fn new(applied_migrations: Vec<Migration>) -> Report {
Report { applied_migrations }
}
pub fn applied_migrations(&self) -> &Vec<Migration> {
&self.applied_migrations
}
}
pub struct Runner {
grouped: bool,
abort_divergent: bool,
abort_missing: bool,
migrations: Vec<Migration>,
target: Target,
migration_table_name: String,
}
impl Runner {
pub fn new(migrations: &[Migration]) -> Runner {
Runner {
grouped: false,
target: Target::Latest,
abort_divergent: true,
abort_missing: true,
migrations: migrations.to_vec(),
migration_table_name: DEFAULT_MIGRATION_TABLE_NAME.into(),
}
}
pub fn get_migrations(&self) -> &Vec<Migration> {
&self.migrations
}
pub fn set_target(self, target: Target) -> Runner {
Runner { target, ..self }
}
pub fn set_grouped(self, grouped: bool) -> Runner {
Runner { grouped, ..self }
}
pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
Runner {
abort_divergent,
..self
}
}
pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
Runner {
abort_missing,
..self
}
}
pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
where
C: Migrate,
{
Migrate::get_last_applied_migration(conn, &self.migration_table_name)
}
pub async fn get_last_applied_migration_async<C>(
&self,
conn: &mut C,
) -> Result<Option<Migration>, Error>
where
C: AsyncMigrate + Send,
{
AsyncMigrate::get_last_applied_migration(conn, &self.migration_table_name).await
}
pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
where
C: Migrate,
{
Migrate::get_applied_migrations(conn, &self.migration_table_name)
}
pub async fn get_applied_migrations_async<C>(
&self,
conn: &mut C,
) -> Result<Vec<Migration>, Error>
where
C: AsyncMigrate + Send,
{
AsyncMigrate::get_applied_migrations(conn, &self.migration_table_name).await
}
pub fn set_migration_table_name<S: AsRef<str>>(
&mut self,
migration_table_name: S,
) -> &mut Self {
if migration_table_name.as_ref().is_empty() {
panic!("Migration table name must not be empty");
}
self.migration_table_name = migration_table_name.as_ref().to_string();
self
}
pub fn run_iter<C>(
self,
connection: &mut C,
) -> impl Iterator<Item = Result<Migration, Error>> + '_
where
C: Migrate,
{
RunIterator::new(self, connection)
}
pub fn run<C>(&self, connection: &mut C) -> Result<Report, Error>
where
C: Migrate,
{
Migrate::migrate(
connection,
&self.migrations,
self.abort_divergent,
self.abort_missing,
self.grouped,
self.target,
&self.migration_table_name,
)
}
pub async fn run_async<C>(&self, connection: &mut C) -> Result<Report, Error>
where
C: AsyncMigrate + Send,
{
AsyncMigrate::migrate(
connection,
&self.migrations,
self.abort_divergent,
self.abort_missing,
self.grouped,
self.target,
&self.migration_table_name,
)
.await
}
}
pub struct RunIterator<'a, C> {
connection: &'a mut C,
target: Target,
migration_table_name: String,
items: VecDeque<Migration>,
failed: bool,
}
impl<'a, C> RunIterator<'a, C>
where
C: Migrate,
{
pub(crate) fn new(runner: Runner, connection: &'a mut C) -> RunIterator<'a, C> {
RunIterator {
items: VecDeque::from(
Migrate::get_unapplied_migrations(
connection,
&runner.migrations,
runner.abort_divergent,
runner.abort_missing,
&runner.migration_table_name,
)
.unwrap(),
),
connection,
target: runner.target,
migration_table_name: runner.migration_table_name.clone(),
failed: false,
}
}
}
impl<C> Iterator for RunIterator<'_, C>
where
C: Migrate,
{
type Item = Result<Migration, Error>;
fn next(&mut self) -> Option<Self::Item> {
match self.failed {
true => None,
false => self.items.pop_front().and_then(|migration| {
sync_migrate(
self.connection,
vec![migration],
self.target,
&self.migration_table_name,
false,
)
.map(|r| r.applied_migrations.first().cloned())
.map_err(|e| {
error!("migration failed: {e:?}");
self.failed = true;
e
})
.transpose()
}),
}
}
}