#![warn(clippy::all)]
#![forbid(unsafe_code)]
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{Debug, Display};
use daggy::petgraph::EdgeDirection;
use daggy::Dag;
use log::{debug, info};
use thiserror::Error;
use uuid::Uuid;
#[macro_use]
pub mod testing;
pub trait Migration {
fn id(&self) -> Uuid;
fn dependencies(&self) -> HashSet<Uuid>;
fn description(&self) -> &'static str;
}
#[macro_export]
macro_rules! migration {
($name:ident, $id:expr, [ $( $dependency_id:expr ),* $(,)* ], $description:expr) => {
impl $crate::Migration for $name {
fn id(&self) -> ::uuid::Uuid {
::uuid::Uuid::parse_str($id).unwrap()
}
fn dependencies(&self) -> ::std::collections::HashSet<::uuid::Uuid> {
vec![
$(
::uuid::Uuid::parse_str($dependency_id).unwrap(),
)*
].into_iter().collect()
}
fn description(&self) -> &'static str {
$description
}
}
}
}
#[derive(Debug)]
pub enum MigrationDirection {
Up,
Down,
}
impl Display for MigrationDirection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let printable = match *self {
MigrationDirection::Up => "up",
MigrationDirection::Down => "Down",
};
write!(f, "{}", printable)
}
}
pub trait Adapter {
type MigrationType: Migration + ?Sized;
type Error: std::error::Error + 'static;
fn applied_migrations(&mut self) -> Result<HashSet<Uuid>, Self::Error>;
fn apply_migration(&mut self, _: &Self::MigrationType) -> Result<(), Self::Error>;
fn revert_migration(&mut self, _: &Self::MigrationType) -> Result<(), Self::Error>;
}
#[derive(Debug, Error)]
pub enum DependencyError {
#[error("Duplicate migration ID {0}")]
DuplicateId(Uuid),
#[error("Unknown migration ID {0}")]
UnknownId(Uuid),
#[error("Cyclic dependency caused by edge from migration IDs {from} to {to}")]
Cycle { from: Uuid, to: Uuid },
}
#[derive(Debug, Error)]
pub enum MigratorError<T: std::error::Error + 'static> {
#[error("An error occurred due to migration dependencies")]
Dependency(#[source] DependencyError),
#[error("An error occurred while interacting with the adapter.")]
Adapter(#[from] T),
#[error(
"An error occurred while applying migration {id} ({description}) {direction}: {error}."
)]
Migration {
id: Uuid,
description: &'static str,
direction: MigrationDirection,
#[source]
error: T,
},
}
pub struct Migrator<T: Adapter> {
adapter: T,
dependencies: Dag<Box<T::MigrationType>, ()>,
id_map: HashMap<Uuid, daggy::NodeIndex>,
}
impl<T: Adapter> Migrator<T> {
pub fn new(adapter: T) -> Migrator<T> {
Migrator {
adapter,
dependencies: Dag::new(),
id_map: HashMap::new(),
}
}
pub fn register(
&mut self,
migration: Box<T::MigrationType>,
) -> Result<(), MigratorError<T::Error>> {
let id = migration.id();
debug!("Registering migration {}", id);
if self.id_map.contains_key(&id) {
return Err(MigratorError::Dependency(DependencyError::DuplicateId(id)));
}
let depends = migration.dependencies();
let migration_idx = self.dependencies.add_node(migration);
for d in depends {
let parent_idx = self
.id_map
.get(&d)
.ok_or(MigratorError::Dependency(DependencyError::UnknownId(d)))?;
self.dependencies
.add_edge(*parent_idx, migration_idx, ())
.map_err(|_| {
MigratorError::Dependency(DependencyError::Cycle { from: d, to: id })
})?;
}
self.id_map.insert(id, migration_idx);
Ok(())
}
pub fn register_multiple(
&mut self,
migrations: Vec<Box<T::MigrationType>>,
) -> Result<(), MigratorError<T::Error>> {
for migration in migrations {
let id = migration.id();
debug!("Registering migration (with multiple) {}", id);
if self.id_map.contains_key(&id) {
return Err(MigratorError::Dependency(DependencyError::DuplicateId(id)));
}
let migration_idx = self.dependencies.add_node(migration);
self.id_map.insert(id, migration_idx);
}
for (id, migration_idx) in &self.id_map {
let depends = self.dependencies[*migration_idx].dependencies();
for d in depends {
let parent_idx = self
.id_map
.get(&d)
.ok_or(MigratorError::Dependency(DependencyError::UnknownId(d)))?;
self.dependencies
.add_edge(*parent_idx, *migration_idx, ())
.map_err(|_| {
MigratorError::Dependency(DependencyError::Cycle { from: d, to: *id })
})?;
}
}
Ok(())
}
fn induced_stream(
&self,
id: Option<Uuid>,
dir: EdgeDirection,
) -> Result<HashSet<Uuid>, DependencyError> {
let mut target_ids = HashSet::new();
match id {
Some(id) => {
if !self.id_map.contains_key(&id) {
return Err(DependencyError::UnknownId(id));
}
target_ids.insert(id);
}
None => target_ids.extend(
self.dependencies
.graph()
.externals(dir.opposite())
.map(|idx| self.dependencies[idx].id()),
),
}
let mut to_visit: VecDeque<_> = target_ids
.iter()
.map(|id| *self.id_map.get(id).expect("ID map is malformed"))
.collect();
while !to_visit.is_empty() {
let idx = to_visit.pop_front().expect("Impossible: not empty");
let id = self.dependencies[idx].id();
target_ids.insert(id);
to_visit.extend(self.dependencies.graph().neighbors_directed(idx, dir));
}
Ok(target_ids)
}
pub fn up(&mut self, to: Option<Uuid>) -> Result<(), MigratorError<T::Error>> {
info!("Migrating up to target: {:?}", to);
let target_ids = self
.induced_stream(to, EdgeDirection::Incoming)
.map_err(MigratorError::Dependency)?;
let applied_migrations = self.adapter.applied_migrations()?;
for idx in &daggy::petgraph::algo::toposort(self.dependencies.graph(), None)
.expect("Impossible: dependencies are a DAG")
{
let migration = &self.dependencies[*idx];
let id = migration.id();
if applied_migrations.contains(&id) || !target_ids.contains(&id) {
continue;
}
info!("Applying migration {}", id);
self.adapter
.apply_migration(migration)
.map_err(|e| MigratorError::Migration {
id,
description: migration.description(),
direction: MigrationDirection::Up,
error: e,
})?;
}
Ok(())
}
pub fn down(&mut self, to: Option<Uuid>) -> Result<(), MigratorError<T::Error>> {
info!("Migrating down to target: {:?}", to);
let mut target_ids = self
.induced_stream(to, EdgeDirection::Outgoing)
.map_err(MigratorError::Dependency)?;
if let Some(sink_id) = to {
target_ids.remove(&sink_id);
}
let applied_migrations = self.adapter.applied_migrations()?;
for idx in daggy::petgraph::algo::toposort(self.dependencies.graph(), None)
.expect("Impossible: dependencies are a DAG")
.iter()
.rev()
{
let migration = &self.dependencies[*idx];
let id = migration.id();
if !applied_migrations.contains(&id) || !target_ids.contains(&id) {
continue;
}
info!("Reverting migration {}", id);
self.adapter
.revert_migration(migration)
.map_err(|e| MigratorError::Migration {
id,
description: migration.description(),
direction: MigrationDirection::Down,
error: e,
})?;
}
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use super::testing::*;
use super::*;
struct DefaultTestAdapter {
applied_migrations: HashSet<Uuid>,
}
impl DefaultTestAdapter {
fn new() -> DefaultTestAdapter {
DefaultTestAdapter {
applied_migrations: HashSet::new(),
}
}
}
#[derive(Debug, Error)]
#[error("An error occurred.")]
struct DefaultTestAdapterError;
impl Adapter for DefaultTestAdapter {
type MigrationType = dyn Migration;
type Error = DefaultTestAdapterError;
fn applied_migrations(&mut self) -> Result<HashSet<Uuid>, Self::Error> {
Ok(self.applied_migrations.clone())
}
fn apply_migration(&mut self, migration: &Self::MigrationType) -> Result<(), Self::Error> {
self.applied_migrations.insert(migration.id());
Ok(())
}
fn revert_migration(&mut self, migration: &Self::MigrationType) -> Result<(), Self::Error> {
self.applied_migrations.remove(&migration.id());
Ok(())
}
}
impl TestAdapter for DefaultTestAdapter {
fn mock(id: Uuid, dependencies: HashSet<Uuid>) -> Box<Self::MigrationType> {
Box::new(TestMigration::new(id, dependencies))
}
}
test_schemer_adapter!(DefaultTestAdapter::new());
}