pub use super::types::{InstanceMigration, MigrationStatus};
use exonum_merkledb::migration::{self as db_migration, MigrationHelper};
use semver::Version;
use thiserror::Error;
use std::{collections::BTreeMap, fmt};
use crate::runtime::{CoreError, ExecutionError, ExecutionFail, InstanceSpec};
/// Unsized closure type holding the logic of a single migration script: a one-shot (`FnOnce`),
/// `Send` function that receives a mutable `MigrationContext` and either succeeds or fails
/// with a `MigrationError`. `MigrationScript` stores it boxed.
type MigrationLogic = dyn FnOnce(&mut MigrationContext) -> Result<(), MigrationError> + Send;
/// Kinds of data migration.
///
/// NOTE(review): the variants are not used in this part of the file; the per-variant
/// descriptions below are inferred from the names and should be confirmed against the code
/// that consumes this enum.
#[derive(Debug)]
#[non_exhaustive]
pub enum MigrationType {
/// Presumably a migration that only advances the recorded data version, without running
/// any migration scripts — TODO confirm.
FastForward,
/// Presumably a migration that is performed asynchronously by migration scripts
/// — TODO confirm.
Async,
}
/// Errors that can be returned by a migration script.
///
/// Both variants are displayed by forwarding to the wrapped value.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MigrationError {
/// Error coming from the database-level migration code (`exonum_merkledb::migration`).
/// The wrapped error is also exposed as the error source.
#[error("{}", _0)]
Helper(#[source] db_migration::MigrationError),
/// Custom error with a free-form description; see `MigrationError::new`.
#[error("{}", _0)]
Custom(String),
}
impl MigrationError {
pub fn new(cause: impl fmt::Display) -> Self {
Self::Custom(cause.to_string())
}
}
impl From<db_migration::MigrationError> for MigrationError {
fn from(err: db_migration::MigrationError) -> Self {
Self::Helper(err)
}
}
/// Atomic migration script: a piece of migration logic together with the data version it
/// migrates to and a human-readable name.
pub struct MigrationScript {
// Data version reached once the script has been executed.
end_version: Version,
// Human-readable name; `MigrationScript::new` sets it to "Migration to {end_version}".
name: String,
// One-shot migration logic, consumed by `MigrationScript::execute`.
logic: Box<MigrationLogic>,
}
/// Manual `Debug` implementation: the boxed closure is not `Debug`, so only the end version
/// and the name are printed.
impl fmt::Debug for MigrationScript {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("MigrationScript");
        builder.field("end_version", &self.end_version);
        builder.field("name", &self.name);
        builder.finish()
    }
}
impl MigrationScript {
pub fn new<F>(logic: F, end_version: Version) -> Self
where
F: FnOnce(&mut MigrationContext) -> Result<(), MigrationError> + Send + 'static,
{
Self {
name: format!("Migration to {}", end_version),
end_version,
logic: Box::new(logic),
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn end_version(&self) -> &Version {
&self.end_version
}
pub fn execute(self, context: &mut MigrationContext) -> Result<(), MigrationError> {
(self.logic)(context)
}
}
/// Context passed to a migration script during its execution.
#[derive(Debug)]
#[non_exhaustive]
pub struct MigrationContext {
/// Database migration helper (`exonum_merkledb::migration::MigrationHelper`) through which
/// the script accesses the data.
pub helper: MigrationHelper,
/// Instance specification associated with the migration
/// (presumably the spec of the service instance being migrated — TODO confirm).
pub instance_spec: InstanceSpec,
/// Version of the data at the moment the script starts.
pub data_version: Version,
}
impl MigrationContext {
#[doc(hidden)]
pub fn new(
helper: MigrationHelper,
instance_spec: InstanceSpec,
data_version: Version,
) -> Self {
Self {
helper,
instance_spec,
data_version,
}
}
}
/// Encapsulates data migration logic: an implementor provides the migration scripts to run
/// for data of a given version.
pub trait MigrateData {
/// Returns the scripts to execute in order to migrate data that is currently at
/// `start_version`.
///
/// # Errors
///
/// Returns an `InitMigrationError` if no migration can be started from `start_version`.
fn migration_scripts(
&self,
start_version: &Version,
) -> Result<Vec<MigrationScript>, InitMigrationError>;
}
/// Errors that can occur when a migration is initiated, that is, when migration scripts are
/// requested for a given start version.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum InitMigrationError {
/// The start version is lower than the minimum version migrations are supported from.
#[error(
"The provided start version is too far in the past; the minimum supported version is {}",
min_supported_version
)]
OldStartVersion {
/// Minimum data version from which a migration can be started.
min_supported_version: Version,
},
/// The start version is greater than the maximum supported version.
#[error(
"The provided start version is greater than the maximum supported version ({})",
max_supported_version
)]
FutureStartVersion {
/// Maximum data version supported as a migration start.
max_supported_version: Version,
},
/// The start version is rejected for another reason, described by the wrapped string
/// (for example, `LinearMigrations` uses it for disallowed prerelease versions).
#[error("Start version is not supported: {}", _0)]
UnsupportedStart(String),
/// Data migrations are not supported by the artifact at all.
#[error("Data migrations are not supported by the artifact")]
NotSupported,
}
/// Converts a migration initialization error into an `ExecutionError` of the
/// `CoreError::NoMigration` kind, using the original error as the description.
impl From<InitMigrationError> for ExecutionError {
    fn from(err: InitMigrationError) -> Self {
        let error_kind = CoreError::NoMigration;
        error_kind.with_description(err)
    }
}
/// Linearly ordered migrations: a set of scripts keyed by the data version each of them
/// migrates to. Built with the consuming builder methods and resolved with `select`.
#[derive(Debug)]
pub struct LinearMigrations {
// Lowest start version accepted by `select`; `None` means there is no lower bound.
min_start_version: Option<Version>,
// Highest start version accepted by `select`; scripts may not target a later version.
latest_version: Version,
// Scripts keyed (and therefore ordered) by their end version.
scripts: BTreeMap<Version, MigrationScript>,
// Whether prerelease versions are accepted by the builder methods and by `select`.
support_prereleases: bool,
}
impl LinearMigrations {
    /// Creates an empty set of migrations with the specified latest data version.
    /// The returned object does not accept prerelease versions.
    ///
    /// # Panics
    ///
    /// Panics if `latest_version` is a prerelease; use `with_prereleases` for such versions.
    pub fn new(latest_version: Version) -> Self {
        assert!(
            !latest_version.is_prerelease(),
            "Prerelease versions require using `with_prereleases` constructor"
        );
        let mut migrations = Self::with_prereleases(latest_version);
        migrations.support_prereleases = false;
        migrations
    }

    /// Creates an empty set of migrations with the specified latest data version.
    /// Unlike `new`, the returned object accepts prerelease versions everywhere.
    pub fn with_prereleases(latest_version: Version) -> Self {
        Self {
            support_prereleases: true,
            scripts: BTreeMap::new(),
            min_start_version: None,
            latest_version,
        }
    }

    /// Panics if `version` is a prerelease while prereleases are not supported.
    fn check_prerelease(&self, version: &Version) {
        assert!(
            self.support_prereleases || !version.is_prerelease(),
            "Prerelease versions require using `with_prereleases` constructor"
        );
    }

    /// Sets the minimum start version from which migrations are supported.
    ///
    /// # Panics
    ///
    /// Panics if `version` is a prerelease and prereleases are not supported.
    pub fn set_min_version(self, version: Version) -> Self {
        self.check_prerelease(&version);
        Self {
            min_start_version: Some(version),
            ..self
        }
    }

    /// Adds a script that migrates data to the specified `version`.
    /// A script previously registered for the same version is replaced.
    ///
    /// # Panics
    ///
    /// Panics if `version` is a prerelease and prereleases are not supported, or if `version`
    /// is greater than the latest version passed to the constructor.
    pub fn add_script<F>(mut self, version: Version, script: F) -> Self
    where
        F: FnOnce(&mut MigrationContext) -> Result<(), MigrationError> + Send + 'static,
    {
        self.check_prerelease(&version);
        assert!(
            version <= self.latest_version,
            "Cannot add a script for a future version {} (the latest version is {})",
            version,
            self.latest_version
        );
        let key = version.clone();
        self.scripts
            .insert(key, MigrationScript::new(script, version));
        self
    }

    /// Verifies that a migration can be started from `start_version`.
    ///
    /// The checks are performed in this order: prerelease support, upper bound (latest
    /// version), lower bound (minimum start version, if set).
    fn check(&self, start_version: &Version) -> Result<(), InitMigrationError> {
        let prerelease_rejected = !self.support_prereleases && start_version.is_prerelease();
        if prerelease_rejected {
            return Err(InitMigrationError::UnsupportedStart(
                "the start version is a prerelease".to_owned(),
            ));
        }

        if self.latest_version < *start_version {
            let max_supported_version = self.latest_version.clone();
            return Err(InitMigrationError::FutureStartVersion {
                max_supported_version,
            });
        }

        match &self.min_start_version {
            Some(lower_bound) if start_version < lower_bound => {
                Err(InitMigrationError::OldStartVersion {
                    min_supported_version: lower_bound.clone(),
                })
            }
            _ => Ok(()),
        }
    }

    /// Selects the scripts to execute for data at `start_version`: all scripts whose end
    /// version is strictly greater than `start_version`, in ascending order of end version.
    ///
    /// # Errors
    ///
    /// Returns an error if `start_version` is an unsupported prerelease, exceeds the latest
    /// version, or is lower than the minimum start version.
    pub fn select(
        self,
        start_version: &Version,
    ) -> Result<Vec<MigrationScript>, InitMigrationError> {
        self.check(start_version)?;
        let pending: Vec<MigrationScript> = self
            .scripts
            .into_iter()
            .filter(|(end_version, _)| end_version > start_version)
            .map(|(_, script)| script)
            .collect();
        Ok(pending)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::{ArtifactId, RuntimeIdentifier};
    use assert_matches::assert_matches;
    use exonum_crypto::Hash;
    use exonum_merkledb::{
        access::{AccessExt, CopyAccessExt},
        migration::flush_migration,
        Database, Snapshot, TemporaryDB,
    };
    use std::{collections::HashSet, sync::Arc};

    /// Artifact name used by all instance specs created in these tests.
    const ARTIFACT_NAME: &str = "service.test.Migration";

    /// Checks that the script received the instance spec built by `execute_scripts`.
    fn assert_test_instance(context: &MigrationContext) {
        assert_eq!(context.instance_spec.name, "test");
        assert_eq!(context.instance_spec.artifact.name, ARTIFACT_NAME);
    }

    /// Script to version 0.2.0: expects no old entry and writes `1` to the new one.
    fn migration_02(context: &mut MigrationContext) -> Result<(), MigrationError> {
        assert_test_instance(context);
        assert!(context.data_version < Version::new(0, 2, 0));

        let old_entry = context.helper.old_data().get_proof_entry::<_, u32>("entry");
        assert!(!old_entry.exists());
        let mut new_entry = context.helper.new_data().get_proof_entry::<_, u32>("entry");
        new_entry.set(1);
        Ok(())
    }

    /// Script to version 0.5.0: expects the old entry to be `1` and writes `2`.
    fn migration_05(context: &mut MigrationContext) -> Result<(), MigrationError> {
        assert_test_instance(context);
        assert!(context.data_version >= Version::new(0, 2, 0));
        assert!(context.data_version < Version::new(0, 5, 0));

        let old_entry = context.helper.old_data().get_proof_entry::<_, u32>("entry");
        assert_eq!(old_entry.get(), Some(1));
        let mut new_entry = context.helper.new_data().get_proof_entry::<_, u32>("entry");
        new_entry.set(2);
        Ok(())
    }

    /// Script to version 0.6.0: expects the old entry to be `2` and writes `3`.
    fn migration_06(context: &mut MigrationContext) -> Result<(), MigrationError> {
        assert_test_instance(context);
        assert!(context.data_version >= Version::new(0, 5, 0));
        assert!(context.data_version < Version::new(0, 6, 0));

        let old_entry = context.helper.old_data().get_proof_entry::<_, u32>("entry");
        assert_eq!(old_entry.get(), Some(2));
        let mut new_entry = context.helper.new_data().get_proof_entry::<_, u32>("entry");
        new_entry.set(3);
        Ok(())
    }

    /// Builds migrations with scripts to 0.2.0, 0.5.0 and 0.6.0; the latest version is 0.6.3.
    fn create_linear_migrations() -> LinearMigrations {
        LinearMigrations::new(Version::new(0, 6, 3))
            .add_script(Version::new(0, 2, 0), migration_02)
            .add_script(Version::new(0, 5, 0), migration_05)
            .add_script(Version::new(0, 6, 0), migration_06)
    }

    /// Executes `scripts` one by one against `db`, starting from `start_version`, and returns
    /// a snapshot of the database after the last script.
    ///
    /// For each script the function checks that the data version strictly increases, that the
    /// migration hash is non-zero and unique, and then flushes the migration into the database.
    fn execute_scripts(
        db: TemporaryDB,
        start_version: Version,
        scripts: Vec<MigrationScript>,
    ) -> Box<dyn Snapshot> {
        let db = Arc::new(db);
        let artifact = ArtifactId::from_raw_parts(
            RuntimeIdentifier::Rust as _,
            ARTIFACT_NAME.to_owned(),
            start_version.clone(),
        );
        let instance_spec = InstanceSpec::from_raw_parts(100, "test".to_string(), artifact);

        let mut version = start_version;
        let mut migration_hashes = HashSet::new();
        for script in scripts {
            let mut context = MigrationContext::new(
                MigrationHelper::new(Arc::clone(&db) as Arc<dyn Database>, "test"),
                instance_spec.clone(),
                version.clone(),
            );

            let next_version = script.end_version().to_owned();
            assert!(
                version < next_version,
                "current version = {}, next version = {}",
                version,
                next_version
            );
            version = next_version;

            script.execute(&mut context).unwrap();
            let migration_hash = context.helper.finish().unwrap();
            assert_ne!(migration_hash, Hash::zero());
            // Each script changes the data, so no two migration hashes may coincide.
            assert!(migration_hashes.insert(migration_hash));

            let mut fork = db.fork();
            flush_migration(&mut fork, "test");
            db.merge(fork.into_patch()).unwrap();
        }
        db.snapshot()
    }

    #[test]
    fn linear_migration_all_scripts() {
        let migrations = create_linear_migrations();
        assert_eq!(migrations.latest_version, Version::new(0, 6, 3));
        assert_eq!(migrations.min_start_version, None);
        assert_eq!(migrations.scripts.len(), 3);

        let start_version = Version::new(0, 1, 0);
        let scripts = migrations.select(&start_version).unwrap();
        assert_eq!(scripts.len(), 3);

        let snapshot = execute_scripts(TemporaryDB::new(), start_version, scripts);
        let entry = snapshot.get_proof_entry::<_, u32>("test.entry");
        assert_eq!(entry.get(), Some(3));
    }

    #[test]
    fn linear_migration_part_of_scripts() {
        let migrations = create_linear_migrations();
        let start_version = Version::new(0, 2, 0);
        let scripts = migrations.select(&start_version).unwrap();
        assert_eq!(scripts.len(), 2);

        // Emulate the data left by the skipped script to version 0.2.0.
        let db = TemporaryDB::new();
        let fork = db.fork();
        fork.get_proof_entry::<_, u32>("test.entry").set(1);
        db.merge(fork.into_patch()).unwrap();

        let snapshot = execute_scripts(db, start_version, scripts);
        let entry = snapshot.get_proof_entry::<_, u32>("test.entry");
        assert_eq!(entry.get(), Some(3));
    }

    #[test]
    fn prereleases_are_not_supported_by_default() {
        let migrations = create_linear_migrations();
        let start_version: Version = "0.2.0-pre.2".parse().unwrap();
        let err = migrations.select(&start_version).unwrap_err();
        assert_matches!(err, InitMigrationError::UnsupportedStart(_));
    }

    #[test]
    #[should_panic(expected = "Prerelease versions require using `with_prereleases`")]
    fn prerelease_in_constructor_leads_to_panic() {
        LinearMigrations::new("0.3.0-pre.0".parse().unwrap());
    }

    #[test]
    #[should_panic(expected = "Prerelease versions require using `with_prereleases`")]
    fn prerelease_in_migration_spec_leads_to_panic() {
        LinearMigrations::new(Version::new(0, 3, 1))
            .add_script("0.2.0-pre.1".parse().unwrap(), migration_02);
    }

    #[test]
    fn linear_migration_out_of_bounds_version() {
        // Start version above the latest supported one.
        let migrations = create_linear_migrations();
        let start_version = Version::new(1, 0, 0);
        let err = migrations.select(&start_version).unwrap_err();
        assert_matches!(
            err,
            InitMigrationError::FutureStartVersion { ref max_supported_version }
                if *max_supported_version == Version::new(0, 6, 3)
        );

        // Start version below the explicitly set minimum.
        let start_version = Version::new(0, 1, 0);
        let migrations = LinearMigrations::new(Version::new(0, 5, 7))
            .set_min_version(Version::new(0, 4, 0))
            .add_script(Version::new(0, 5, 0), migration_05);
        let err = migrations.select(&start_version).unwrap_err();
        assert_matches!(
            err,
            InitMigrationError::OldStartVersion { ref min_supported_version }
                if *min_supported_version == Version::new(0, 4, 0)
        );
    }

    /// Builds migrations that support prereleases, with scripts to 0.2.0-alpha.0, 0.2.0 and
    /// 0.3.0; the latest version is 0.3.2. Each script checks the range of its start version.
    fn create_migrations_with_prerelease() -> LinearMigrations {
        let pre_version: Version = "0.2.0-alpha.0".parse().unwrap();
        let pre_version_ = pre_version.clone();

        LinearMigrations::with_prereleases(Version::new(0, 3, 2))
            .add_script(pre_version.clone(), move |ctx| {
                let start_version = &ctx.data_version;
                assert!(*start_version < pre_version_);
                ctx.helper.new_data().get_proof_entry("v02pre").set(1_u8);
                Ok(())
            })
            .add_script(Version::new(0, 2, 0), move |ctx| {
                let start_version = &ctx.data_version;
                assert!(*start_version >= pre_version);
                assert!(*start_version < Version::new(0, 2, 0));
                ctx.helper.new_data().get_proof_entry("v02").set(2_u8);
                Ok(())
            })
            .add_script(Version::new(0, 3, 0), |ctx| {
                let start_version = &ctx.data_version;
                assert!(*start_version >= Version::new(0, 2, 0));
                assert!(*start_version < Version::new(0, 3, 0));
                ctx.helper.new_data().get_proof_entry("v03").set(3_u8);
                Ok(())
            })
    }

    #[test]
    fn linear_migration_from_prerelease_with_explicit_allowance() {
        // (start version, number of scripts expected to be selected).
        //
        // `0.2.0-alpha.1` covers a start version strictly between the prerelease script
        // (0.2.0-alpha.0) and the release script (0.2.0); the previous revision of this test
        // checked `0.2.0-alpha.0` twice instead.
        let cases: [(&str, usize); 5] = [
            ("0.1.7", 3),
            ("0.2.0-alpha.0", 2),
            ("0.2.0-alpha.1", 2),
            ("0.3.0-alpha.0", 1),
            ("0.3.0", 0),
        ];

        for &(raw_version, expected_len) in &cases {
            let start_version: Version = raw_version.parse().unwrap();
            let scripts = create_migrations_with_prerelease()
                .select(&start_version)
                .unwrap();
            assert_eq!(
                scripts.len(),
                expected_len,
                "unexpected number of scripts for start version {}",
                start_version
            );
            execute_scripts(TemporaryDB::new(), start_version, scripts);
        }
    }
}