use assert_matches::assert_matches;
use exonum_crypto::KeyPair;
use exonum_merkledb::{
access::{AccessExt, CopyAccessExt},
migration::Migration,
HashTag, ObjectHash, SystemSchema, TemporaryDB,
};
use std::time::Duration;
use super::*;
use crate::{
blockchain::{ApiSender, Block, BlockParams, BlockchainMut},
helpers::{Height, ValidatorId},
runtime::{
migrations::{InitMigrationError, MigrationError},
oneshot::Receiver,
BlockchainData, CoreError, DispatcherSchema, ErrorMatch, MethodId, RuntimeIdentifier,
SnapshotExt, WellKnownRuntime,
},
};
// Base time unit for these tests: the test migration scripts sleep for `DELAY` (or a fraction
// of it), and the tests sleep for multiples of it while waiting for scripts to progress.
const DELAY: Duration = Duration::from_millis(40);
/// Test runtime whose `migrate` implementation picks a migration script by artifact name.
#[derive(Clone, Debug, Default)]
struct MigrationRuntime {
    /// Decides which script is returned for the `good-or-not-good` artifact:
    /// the successful one if `true`, the failing one otherwise.
    run_good_script: bool,
}

impl MigrationRuntime {
    /// Builds a runtime with `run_good_script` set to `flag`.
    fn with_script_flag(flag: bool) -> Self {
        let run_good_script = flag;
        Self { run_good_script }
    }
}
// Gives the test runtime a fixed numeric identifier.
impl WellKnownRuntime for MigrationRuntime {
// `Rig::deploy_artifact` uses this ID when constructing artifact identifiers.
const ID: u32 = 2;
}
/// Minimal `Runtime` implementation for migration tests: every hook except `migrate`
/// succeeds without doing anything.
impl Runtime for MigrationRuntime {
    /// Reports the deployment as immediately successful.
    fn deploy_artifact(&mut self, _artifact: ArtifactId, _deploy_spec: Vec<u8>) -> Receiver {
        Receiver::with_result(Ok(()))
    }

    /// Declares support for service freezing.
    fn is_supported(&self, feature: &RuntimeFeature) -> bool {
        // Kept as an exhaustive `match` so that a new feature variant forces a decision here.
        match *feature {
            RuntimeFeature::FreezingServices => true,
        }
    }

    /// Treats every artifact as deployed.
    fn is_artifact_deployed(&self, _id: &ArtifactId) -> bool {
        true
    }

    /// Accepts any service instantiation request.
    fn initiate_adding_service(
        &self,
        _context: ExecutionContext<'_>,
        _artifact: &ArtifactId,
        _parameters: Vec<u8>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    /// Accepts any service resume request.
    fn initiate_resuming_service(
        &self,
        _context: ExecutionContext<'_>,
        _artifact: &ArtifactId,
        _parameters: Vec<u8>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    /// Ignores service status updates.
    fn update_service_status(&mut self, _snapshot: &dyn Snapshot, _state: &InstanceState) {}

    /// Chooses a migration script based on the name of `new_artifact`.
    ///
    /// Returns `Ok(None)` when no script needs to run, and
    /// `Err(InitMigrationError::NotSupported)` for artifact names this runtime does not know.
    fn migrate(
        &self,
        new_artifact: &ArtifactId,
        data_version: &Version,
    ) -> Result<Option<MigrationScript>, InitMigrationError> {
        // Unless a branch below overrides it, the script targets the artifact version
        // with the patch component reset to zero.
        let mut end_version = new_artifact.version.clone();
        end_version.patch = 0;

        let script_fn = match new_artifact.name.as_str() {
            "good" => simple_delayed_migration,
            "complex" => {
                // Two-step migration: up to 0.2.0 first, then up to 0.3.0.
                let first_step = Version::new(0, 2, 0);
                let second_step = Version::new(0, 3, 0);
                if *data_version < first_step {
                    end_version = first_step;
                    complex_migration_part1
                } else if *data_version < second_step && new_artifact.version >= second_step {
                    end_version = second_step;
                    complex_migration_part2
                } else {
                    return Ok(None);
                }
            }
            "not-good" => erroneous_migration,
            "bad" => panicking_migration,
            "with-state" => migration_modifying_state_hash,
            "none" => return Ok(None),
            "good-or-not-good" => {
                if self.run_good_script {
                    simple_delayed_migration
                } else {
                    erroneous_migration
                }
            }
            _ => return Err(InitMigrationError::NotSupported),
        };

        Ok(Some(MigrationScript::new(script_fn, end_version)))
    }

    /// Accepts any call without executing anything.
    fn execute(
        &self,
        _context: ExecutionContext<'_>,
        _method_id: MethodId,
        _arguments: &[u8],
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    /// No-op hook.
    fn before_transactions(&self, _context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
        Ok(())
    }

    /// No-op hook.
    fn after_transactions(&self, _context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
        Ok(())
    }

    /// No-op hook.
    fn after_commit(&mut self, _snapshot: &dyn Snapshot, _mailbox: &mut Mailbox) {}
}
/// Migration script that sleeps for `DELAY` and then succeeds without touching any data.
fn simple_delayed_migration(_ctx: &mut MigrationContext) -> Result<(), MigrationError> {
    let pause = DELAY;
    thread::sleep(pause);
    Ok(())
}
/// Migration script that sleeps for `DELAY` and then returns an error.
fn erroneous_migration(_ctx: &mut MigrationContext) -> Result<(), MigrationError> {
    thread::sleep(DELAY);
    let failure = MigrationError::new("This migration is unsuccessful!");
    Err(failure)
}
/// Migration script that sleeps for `DELAY` and then panics instead of returning.
fn panicking_migration(_ctx: &mut MigrationContext) -> Result<(), MigrationError> {
    let pause = DELAY;
    thread::sleep(pause);
    panic!("This migration is unsuccessful!");
}
/// Migration script that writes to the `entry` proof entry in two steps.
///
/// Each step stores the next value (1, then 2) in the new data, sleeps for half of `DELAY`
/// and merges the changes; a merge error is propagated to the caller.
fn migration_modifying_state_hash(ctx: &mut MigrationContext) -> Result<(), MigrationError> {
    let pause = DELAY / 2;
    for value in 1_u32..=2 {
        ctx.helper.new_data().get_proof_entry("entry").set(value);
        thread::sleep(pause);
        ctx.helper.merge()?;
    }
    Ok(())
}
// First step of the `complex` migration: checks that the data version is below 0.2.0
// and stores `1` in the `entry` proof entry of the new data.
fn complex_migration_part1(ctx: &mut MigrationContext) -> Result<(), MigrationError> {
// `MigrationRuntime::migrate` selects this script only for data versions below 0.2.0.
assert!(ctx.data_version < Version::new(0, 2, 0));
ctx.helper.new_data().get_proof_entry("entry").set(1_u32);
Ok(())
}
// Second step of the `complex` migration: checks that the data version lies in
// `[0.2.0, 0.3.0)` and stores `2` in the `entry` proof entry of the new data.
fn complex_migration_part2(ctx: &mut MigrationContext) -> Result<(), MigrationError> {
// `MigrationRuntime::migrate` selects this script only after the first step has been applied.
assert!(ctx.data_version >= Version::new(0, 2, 0));
assert!(ctx.data_version < Version::new(0, 3, 0));
ctx.helper.new_data().get_proof_entry("entry").set(2_u32);
Ok(())
}
/// How far `Rig::wait_migration_threads` lets a migration script progress before returning.
#[derive(Clone, Copy, Debug, PartialEq)]
enum LocalResult {
    /// Return immediately without waiting for the script.
    None,
    /// Sleep so the script can finish, but do not create a block afterwards.
    InMemory,
    /// Sleep, then create a block and check that no migration threads remain.
    Saved,
    /// Same as `Saved`, followed by a node restart.
    SavedWithNodeRestart,
}
// Test harness bundling a mutable blockchain with a counter for service instance IDs.
#[derive(Debug)]
struct Rig {
// Blockchain under test; its dispatcher hosts a `MigrationRuntime`.
blockchain: BlockchainMut,
// ID given to the next service created by `initialize_service`; starts at 100.
next_service_id: InstanceId,
}
impl Rig {
fn new() -> Self {
Self::with_db_and_flag(Arc::new(TemporaryDB::new()), false)
}
fn with_db_and_flag(db: Arc<TemporaryDB>, flag: bool) -> Self {
let blockchain = Blockchain::new(
db as Arc<dyn Database>,
KeyPair::random(),
ApiSender::closed(),
);
let blockchain = blockchain
.into_mut_with_dummy_config()
.with_runtime(MigrationRuntime::with_script_flag(flag))
.build();
Self {
blockchain,
next_service_id: 100,
}
}
fn migration_hash(&self, indexes: &[(&str, Hash)]) -> Hash {
let fork = self.blockchain.fork();
let mut aggregator = fork.get_proof_map::<_, str, Hash>("_aggregator");
for &(index_name, hash) in indexes {
aggregator.put(index_name, hash);
}
aggregator.object_hash()
}
fn stop(self) -> Blockchain {
self.blockchain.immutable_view()
}
fn restart(&mut self) {
let blockchain = self.blockchain.as_ref().to_owned();
let blockchain = blockchain
.into_mut_with_dummy_config()
.with_runtime(MigrationRuntime::default())
.build();
self.blockchain = blockchain;
}
fn dispatcher(&mut self) -> &mut Dispatcher {
self.blockchain.dispatcher()
}
fn migration_threads(&mut self) -> &HashMap<String, MigrationThread> {
&self.dispatcher().migrations.threads
}
fn assert_no_migration_threads(&mut self) {
assert!(self.migration_threads().is_empty());
}
fn wait_migration_threads(&mut self, local_result: LocalResult) {
if local_result == LocalResult::None {
} else {
thread::sleep(DELAY * 3);
if local_result == LocalResult::InMemory {
} else {
self.create_block(self.blockchain.fork());
assert!(self.dispatcher().migrations.threads.is_empty());
if local_result == LocalResult::SavedWithNodeRestart {
self.restart();
}
}
}
}
fn create_block(&mut self, fork: Fork) -> Block {
let block_params = BlockParams::new(ValidatorId(0), Height(100), &[]);
let patch = self
.blockchain
.create_patch_inner(fork, &block_params, &[], &());
self.blockchain.commit(patch, vec![]).unwrap();
self.blockchain.as_ref().last_block()
}
fn deploy_artifact(&mut self, name: &str, version: Version) -> ArtifactId {
let artifact = ArtifactId::from_raw_parts(MigrationRuntime::ID, name.into(), version);
let fork = self.blockchain.fork();
Dispatcher::commit_artifact(&fork, &artifact, vec![]);
self.create_block(fork);
artifact
}
fn initialize_service(&mut self, artifact: ArtifactId, name: &str) -> InstanceSpec {
let service = InstanceSpec::from_raw_parts(self.next_service_id, name.to_owned(), artifact);
self.next_service_id += 1;
let mut fork = self.blockchain.fork();
let mut should_rollback = false;
let mut context = ExecutionContext::for_block_call(
self.dispatcher(),
&mut fork,
&mut should_rollback,
service.as_descriptor(),
);
context
.initiate_adding_service(service.clone(), vec![])
.expect("`initiate_adding_service` failed");
assert!(!should_rollback);
self.create_block(fork);
service
}
fn stop_service(&mut self, spec: &InstanceSpec) {
let fork = self.blockchain.fork();
Dispatcher::initiate_stopping_service(&fork, spec.id).unwrap();
self.create_block(fork);
}
fn freeze_service(&mut self, spec: &InstanceSpec) {
let fork = self.blockchain.fork();
self.dispatcher()
.initiate_freezing_service(&fork, spec.id)
.unwrap();
self.create_block(fork);
}
}
/// Walks through an asynchronous migration of the `good` service from start to the point
/// where the local migration result is recorded.
///
/// The service is frozen if `freeze_service` is set and stopped otherwise.
fn test_migration_workflow(freeze_service: bool) {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact.clone(), "good");

    // Before the service is stopped or frozen, the migration request is rejected.
    let fork = rig.blockchain.fork();
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, new_artifact.clone(), &service.name)
        .unwrap_err();
    let expected = ErrorMatch::from_fail(&CoreError::InvalidServiceTransition)
        .with_description_containing("Data migration cannot be initiated");
    assert_eq!(err, expected);

    if freeze_service {
        rig.freeze_service(&service);
    } else {
        rig.stop_service(&service);
    }

    // Now the request is accepted and an asynchronous migration is scheduled.
    let fork = rig.blockchain.fork();
    let ty = rig
        .dispatcher()
        .initiate_migration(&fork, new_artifact.clone(), &service.name)
        .unwrap();
    assert_matches!(ty, MigrationType::Async);
    // No migration thread exists until the block is created.
    assert!(!rig.migration_threads().contains_key(&service.name));

    // The migration target cannot be unloaded within the same fork.
    let err = Dispatcher::unload_artifact(&fork, &new_artifact).unwrap_err();
    let expected = ErrorMatch::from_fail(&CoreError::CannotUnloadArtifact)
        .with_description_containing("`100:good` references it as the data migration target");
    assert_eq!(err, expected);

    rig.create_block(fork);
    assert!(rig.migration_threads().contains_key(&service.name));
    // Service data remains accessible while the migration is in progress.
    let snapshot = rig.blockchain.snapshot();
    assert!(snapshot.for_service(service.id).is_some());

    // Neither of the two artifacts can be unloaded during the migration.
    let fork = rig.blockchain.fork();
    let err = Dispatcher::unload_artifact(&fork, &old_artifact).unwrap_err();
    let expected = ErrorMatch::from_fail(&CoreError::CannotUnloadArtifact)
        .with_description_containing("`100:good` references it as the current artifact");
    assert_eq!(err, expected);
    let err = Dispatcher::unload_artifact(&fork, &new_artifact).unwrap_err();
    let expected = ErrorMatch::from_fail(&CoreError::CannotUnloadArtifact)
        .with_description_containing("`100:good` references it as the data migration target");
    assert_eq!(err, expected);

    // Create several blocks right away, then give the script time to finish
    // and create one more block.
    for _ in 0..3 {
        rig.create_block(rig.blockchain.fork());
    }
    thread::sleep(DELAY * 3);
    rig.create_block(rig.blockchain.fork());

    let snapshot = rig.blockchain.snapshot();
    let schema = DispatcherSchema::new(&snapshot);
    let state = schema.get_instance(service.id).unwrap();
    let end_version = match state.status.unwrap() {
        InstanceStatus::Migrating(migration) => migration.end_version,
        status => panic!("Unexpected service status: {:?}", status),
    };
    // `MigrationRuntime::migrate` zeroes the patch component of the target version.
    assert_eq!(end_version, Version::new(0, 5, 0));

    // The script did not write any data, hence the hash of an empty map.
    let local_result = schema.local_migration_result(&service.name).unwrap();
    assert_eq!(local_result.0, Ok(HashTag::empty_map_hash()));
    assert!(!rig.migration_threads().contains_key(&service.name));

    // Further blocks must not bring the migration thread back.
    for _ in 0..3 {
        rig.create_block(rig.blockchain.fork());
    }
    assert!(!rig.migration_threads().contains_key(&service.name));
}
/// Migration workflow for a service that is stopped before the migration.
#[test]
fn migration_workflow() {
    let freeze_service = false;
    test_migration_workflow(freeze_service);
}
/// Migration workflow for a service that is frozen before the migration.
#[test]
fn migration_workflow_with_frozen_service() {
    let freeze_service = true;
    test_migration_workflow(freeze_service);
}
/// A migration cannot target an artifact that was unloaded earlier in the same fork.
#[test]
fn migration_after_artifact_unloading() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    Dispatcher::unload_artifact(&fork, &new_artifact).unwrap();
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap_err();

    let expected_msg =
        "artifact `2:good:0.5.2` for data migration of service `100:good` is not active";
    let expected =
        ErrorMatch::from_fail(&CoreError::ArtifactNotDeployed).with_description_containing(expected_msg);
    assert_eq!(err, expected);
}
fn test_fast_forward_migration(freeze_service: bool) {
let mut rig = Rig::new();
let old_artifact = rig.deploy_artifact("none", "0.3.0".parse().unwrap());
let new_artifact = rig.deploy_artifact("none", "0.5.2".parse().unwrap());
let service = rig.initialize_service(old_artifact.clone(), "service");
if freeze_service {
rig.freeze_service(&service);
} else {
rig.stop_service(&service);
}
let fork = rig.blockchain.fork();
let ty = rig
.dispatcher()
.initiate_migration(&fork, new_artifact.clone(), &service.name)
.unwrap();
assert_matches!(ty, MigrationType::FastForward);
rig.create_block(fork);
let snapshot = rig.blockchain.snapshot();
let schema = DispatcherSchema::new(&snapshot);
let state = schema.get_instance(service.id).unwrap();
assert_eq!(state.status, Some(InstanceStatus::Stopped));
assert_eq!(state.pending_status, None);
assert_eq!(state.spec.artifact, new_artifact);
assert_eq!(state.data_version, None);
let fork = rig.blockchain.fork();
Dispatcher::unload_artifact(&fork, &old_artifact).unwrap();
rig.create_block(fork);
let snapshot = rig.blockchain.snapshot();
assert!(DispatcherSchema::new(&snapshot)
.get_artifact(&old_artifact)
.is_none());
}
/// Fast-forward migration of a stopped service.
#[test]
fn fast_forward_migration() {
    let freeze_service = false;
    test_fast_forward_migration(freeze_service);
}
/// Fast-forward migration of a frozen service.
#[test]
fn fast_forward_migration_with_service_freezing() {
    let freeze_service = true;
    test_fast_forward_migration(freeze_service);
}
/// Checks the errors that `initiate_migration` returns right away for invalid requests.
#[test]
fn migration_immediate_errors() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let unrelated_artifact = rig.deploy_artifact("unrelated", Version::new(1, 0, 1));
    let old_service = rig.initialize_service(old_artifact.clone(), "old");
    rig.stop_service(&old_service);
    let new_service = rig.initialize_service(new_artifact.clone(), "new");
    rig.stop_service(&new_service);

    let fork = rig.blockchain.fork();
    let cannot_upgrade =
        ErrorMatch::from_fail(&CoreError::CannotUpgradeService).with_any_description();

    // Target artifact with a different name.
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, unrelated_artifact, &old_service.name)
        .unwrap_err();
    assert_eq!(err, cannot_upgrade);

    // Target artifact older than the current one.
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, old_artifact, &new_service.name)
        .unwrap_err();
    assert_eq!(err, cannot_upgrade);

    // Target artifact equal to the current one.
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, new_artifact.clone(), &new_service.name)
        .unwrap_err();
    assert_eq!(err, cannot_upgrade);

    // Service that does not exist.
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, new_artifact, "bogus-service")
        .unwrap_err();
    let expected = ErrorMatch::from_fail(&CoreError::IncorrectInstanceId)
        .with_description_containing("for non-existing service `bogus-service`");
    assert_eq!(err, expected);

    // Artifact that was never committed.
    let unknown_artifact = ArtifactId::from_raw_parts(
        RuntimeIdentifier::Rust as _,
        "good".into(),
        Version::new(0, 6, 0),
    );
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, unknown_artifact.clone(), &old_service.name)
        .unwrap_err();
    let expected = ErrorMatch::from_fail(&CoreError::UnknownArtifactId).with_any_description();
    assert_eq!(err, expected);

    // Artifact that is committed in this fork only; the request is still rejected,
    // now with a different error.
    Dispatcher::commit_artifact(&fork, &unknown_artifact, vec![]);
    let err = rig
        .dispatcher()
        .initiate_migration(&fork, unknown_artifact, &old_service.name)
        .unwrap_err();
    let expected = ErrorMatch::from_fail(&CoreError::ArtifactNotDeployed).with_any_description();
    assert_eq!(err, expected);
}
/// A migration that was started before a node restart is tracked again after it
/// and still produces a local result.
#[test]
fn migration_is_resumed_after_node_restart() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Restart right after the migration has been initiated.
    rig.restart();
    assert!(rig.migration_threads().contains_key(&service.name));

    // Give the script time to finish, then create a block.
    thread::sleep(DELAY * 3);
    rig.create_block(rig.blockchain.fork());

    let snapshot = rig.blockchain.snapshot();
    let schema = DispatcherSchema::new(&snapshot);
    let local_result = schema.local_migration_result(&service.name).unwrap();
    assert_eq!(local_result.0, Ok(HashTag::empty_map_hash()));
}
// Checks that a migration script stops writing data once the node (the `Rig`) is stopped.
#[test]
fn migration_threads_are_timely_aborted() {
let mut rig = Rig::new();
let old_artifact = rig.deploy_artifact("with-state", "0.3.0".parse().unwrap());
let new_artifact = rig.deploy_artifact("with-state", "0.5.2".parse().unwrap());
let service = rig.initialize_service(old_artifact, "good");
rig.stop_service(&service);
let fork = rig.blockchain.fork();
rig.dispatcher()
.initiate_migration(&fork, new_artifact, &service.name)
.unwrap();
rig.create_block(fork);
// The `with-state` script merges after `DELAY / 2` and again after `DELAY`;
// stop the node between these two points.
thread::sleep(DELAY * 2 / 3);
// Consumes the rig, leaving only an immutable view of the blockchain.
// NOTE(review): dropping the rig presumably is what aborts the script — confirm.
let blockchain = rig.stop();
// Wait much longer than the script would need to complete.
thread::sleep(DELAY * 10);
let snapshot = blockchain.snapshot();
let migration = Migration::new(&service.name, &snapshot);
let val = migration
.get_proof_entry::<_, u32>("entry")
.get()
.unwrap_or(0);
// The second merge (which would store `2`) must not have happened.
assert!(val < 2);
// The stored value must not change as more time passes.
thread::sleep(DELAY * 2);
let snapshot = blockchain.snapshot();
let migration = Migration::new(&service.name, &snapshot);
let new_val = migration
.get_proof_entry::<_, u32>("entry")
.get()
.unwrap_or(0);
assert_eq!(val, new_val);
}
/// Once the local migration result is recorded, a node restart does not start the script again.
#[test]
fn completed_migration_is_not_resumed_after_node_restart() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Let the script finish, then create a block.
    thread::sleep(DELAY * 3);
    rig.create_block(rig.blockchain.fork());
    rig.assert_no_migration_threads();

    let snapshot = rig.blockchain.snapshot();
    let schema = DispatcherSchema::new(&snapshot);
    assert!(schema.local_migration_result(&service.name).is_some());

    // The restart must not spawn a new migration thread.
    rig.restart();
    rig.assert_no_migration_threads();
}
/// Runs a migration whose script fails and checks that the failure is recorded
/// as the local migration result.
///
/// `artifact_name` selects the script in `MigrationRuntime::migrate`; callers pass
/// `not-good` (script returns an error) or `bad` (script panics).
///
/// # Panics
///
/// Panics if no local migration result appears within `MAX_ATTEMPTS` polling rounds,
/// or if the recorded result is not the expected error.
fn test_erroneous_migration(artifact_name: &str) {
    /// Upper bound on polling rounds. Each round sleeps for `DELAY * 3`, while the scripts
    /// themselves sleep for only `DELAY`, so a healthy run finishes in the first rounds.
    /// The bound keeps a broken dispatcher from hanging the whole test run.
    const MAX_ATTEMPTS: u32 = 20;

    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact(artifact_name, "0.3.0".parse().unwrap());
    let new_artifact = rig.deploy_artifact(artifact_name, "0.5.2".parse().unwrap());
    let service = rig.initialize_service(old_artifact, "service");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Poll for the local result, creating a block in each round.
    let mut outcome = None;
    for _ in 0..MAX_ATTEMPTS {
        thread::sleep(DELAY * 3);
        rig.create_block(rig.blockchain.fork());
        let snapshot = rig.blockchain.snapshot();
        let schema = DispatcherSchema::new(&snapshot);
        outcome = schema.local_migration_result(&service.name);
        if outcome.is_some() {
            break;
        }
    }
    let res = outcome.expect("Local migration result was not recorded in time");

    assert!(res
        .0
        .unwrap_err()
        .contains("This migration is unsuccessful!"));
}
/// Failure handling for a script that returns an error.
#[test]
fn migration_with_error() {
    let artifact_name = "not-good";
    test_erroneous_migration(artifact_name);
}
/// Failure handling for a script that panics.
#[test]
fn migration_with_panic() {
    let artifact_name = "bad";
    test_erroneous_migration(artifact_name);
}
// Checks that several services can be migrated to the same artifact concurrently,
// including a migration that starts while the others are already running.
#[test]
fn concurrent_migrations_to_same_artifact() {
let mut rig = Rig::new();
let old_artifact = rig.deploy_artifact("good", "0.3.0".parse().unwrap());
let new_artifact = rig.deploy_artifact("good", "0.5.2".parse().unwrap());
let service = rig.initialize_service(old_artifact.clone(), "service");
rig.stop_service(&service);
let other_service = rig.initialize_service(old_artifact.clone(), "other-service");
rig.stop_service(&other_service);
let another_service = rig.initialize_service(old_artifact, "another-service");
rig.stop_service(&another_service);
// Start migrations of the first two services within a single block.
let fork = rig.blockchain.fork();
rig.dispatcher()
.initiate_migration(&fork, new_artifact.clone(), &service.name)
.unwrap();
rig.dispatcher()
.initiate_migration(&fork, new_artifact.clone(), &other_service.name)
.unwrap();
rig.create_block(fork);
let threads = rig.migration_threads();
assert!(threads.contains_key(&service.name));
assert!(threads.contains_key(&other_service.name));
assert!(!threads.contains_key(&another_service.name));
// Start the third migration while the first two scripts (which sleep for `DELAY`)
// are still expected to be running.
thread::sleep(DELAY * 2 / 3);
let fork = rig.blockchain.fork();
rig.dispatcher()
.initiate_migration(&fork, new_artifact, &another_service.name)
.unwrap();
rig.create_block(fork);
assert!(rig.migration_threads().contains_key(&another_service.name));
// In total about `DELAY * 7 / 6` has passed since the first two scripts started,
// so their results are expected to be recorded with the next block.
thread::sleep(DELAY / 2);
rig.create_block(rig.blockchain.fork());
let snapshot = rig.blockchain.snapshot();
let schema = DispatcherSchema::new(&snapshot);
let res = schema.local_migration_result(&service.name).unwrap();
assert_eq!(res.0, Ok(HashTag::empty_map_hash()));
let res = schema.local_migration_result(&other_service.name).unwrap();
assert_eq!(res.0, Ok(HashTag::empty_map_hash()));
// Wait for the third script as well.
thread::sleep(DELAY);
rig.create_block(rig.blockchain.fork());
let snapshot = rig.blockchain.snapshot();
let schema = DispatcherSchema::new(&snapshot);
let res = schema
.local_migration_result(&another_service.name)
.unwrap();
assert_eq!(res.0, Ok(HashTag::empty_map_hash()));
rig.assert_no_migration_threads();
}
// Checks that data written by a migration script changes neither the service data
// nor the block state hash, and that the recorded migration hash matches the migrated data.
#[test]
fn migration_influencing_state_hash() {
let mut rig = Rig::new();
let old_artifact = rig.deploy_artifact("with-state", "0.3.0".parse().unwrap());
let new_artifact = rig.deploy_artifact("with-state", "0.5.2".parse().unwrap());
let service = rig.initialize_service(old_artifact, "service");
rig.stop_service(&service);
let fork = rig.blockchain.fork();
rig.dispatcher()
.initiate_migration(&fork, new_artifact, &service.name)
.unwrap();
// State hash of the block that starts the migration; later blocks are compared against it.
let state_hash = rig.create_block(fork).state_hash;
// Two rounds of `DELAY * 2 / 3` take longer than the script, which needs about `DELAY`.
for _ in 0..2 {
thread::sleep(DELAY * 2 / 3);
let fork = rig.blockchain.fork();
let blockchain_data = BlockchainData::new(&fork, "test");
// The entry written by the script must not be visible in the service data.
assert!(!blockchain_data
.for_service(service.id)
.unwrap()
.get_proof_entry::<_, u32>("entry")
.exists());
let new_state_hash = rig.create_block(fork).state_hash;
assert_eq!(state_hash, new_state_hash);
}
let snapshot = rig.blockchain.snapshot();
let schema = DispatcherSchema::new(&snapshot);
let res = schema.local_migration_result(&service.name).unwrap();
let migration_hash = res.0.unwrap();
// The recorded hash must equal the state hash of the migration data.
let migration = Migration::new(&service.name, &snapshot);
assert_eq!(migration_hash, migration.state_hash());
// The migration aggregator holds a single index whose hash is that of the final value `2`.
let aggregator = migration.state_aggregator();
assert_eq!(
aggregator.keys().collect::<Vec<_>>(),
vec!["service.entry".to_owned()]
);
assert_eq!(aggregator.get("service.entry"), Some(2_u32.object_hash()));
}
/// Rolling back a completed migration removes the local result and
/// returns the service to the stopped state.
#[test]
fn migration_rollback_workflow() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Let the script finish and check that its result is recorded.
    thread::sleep(DELAY * 3);
    rig.create_block(rig.blockchain.fork());
    let snapshot = rig.blockchain.snapshot();
    let schema = DispatcherSchema::new(&snapshot);
    schema.local_migration_result(&service.name).unwrap();
    rig.assert_no_migration_threads();

    // Roll the migration back.
    let fork = rig.blockchain.fork();
    Dispatcher::rollback_migration(&fork, &service.name).unwrap();
    rig.create_block(fork);

    let snapshot = rig.blockchain.snapshot();
    let schema = DispatcherSchema::new(&snapshot);
    assert!(schema.local_migration_result(&service.name).is_none());
    let state = schema.get_instance(service.id).unwrap();
    assert_eq!(state.status, Some(InstanceStatus::Stopped));
    assert_eq!(state.data_version, None);
}
/// Checks the errors returned by `Dispatcher::rollback_migration` for invalid requests.
#[test]
fn migration_rollback_invariants() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");

    // Unknown service.
    let fork = rig.blockchain.fork();
    let err = Dispatcher::rollback_migration(&fork, "bogus").unwrap_err();
    let unknown_service_match = ErrorMatch::from_fail(&CoreError::IncorrectInstanceId)
        .with_description_containing("Cannot rollback migration for unknown service `bogus`");
    assert_eq!(err, unknown_service_match);

    // Service that has not been stopped and has no migration.
    let err = Dispatcher::rollback_migration(&fork, &service.name).unwrap_err();
    let no_migration_match = ErrorMatch::from_fail(&CoreError::NoMigration)
        .with_description_containing("it has no ongoing migration");
    assert_eq!(err, no_migration_match);

    // Stopped service without a migration.
    rig.stop_service(&service);
    let fork = rig.blockchain.fork();
    let err = Dispatcher::rollback_migration(&fork, &service.name).unwrap_err();
    assert_eq!(err, no_migration_match);

    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Rollback requested in the same fork as a commit of the migration.
    let fork = rig.blockchain.fork();
    Dispatcher::commit_migration(&fork, &service.name, HashTag::empty_map_hash()).unwrap();
    let err = Dispatcher::rollback_migration(&fork, &service.name).unwrap_err();
    assert_eq!(err, ErrorMatch::from_fail(&CoreError::ServicePending));
    rig.create_block(fork);

    // Rollback requested after the block with the commit.
    let fork = rig.blockchain.fork();
    let err = Dispatcher::rollback_migration(&fork, &service.name).unwrap_err();
    assert_eq!(err, no_migration_match);
}
/// Rolling back a migration whose script is still running removes the migration thread,
/// and no migration data shows up afterwards.
#[test]
fn migration_rollback_aborts_migration_script() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("with-state", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("with-state", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Roll back immediately, without waiting for the script.
    let fork = rig.blockchain.fork();
    Dispatcher::rollback_migration(&fork, &service.name).unwrap();
    rig.create_block(fork);

    let snapshot = rig.blockchain.snapshot();
    let schema = DispatcherSchema::new(&snapshot);
    assert!(schema.local_migration_result(&service.name).is_none());
    rig.assert_no_migration_threads();
    let migration = Migration::new(&service.name, &snapshot);
    assert!(!migration.get_proof_entry::<_, u32>("entry").exists());

    // The entry must not appear later either.
    thread::sleep(DELAY);
    let snapshot = rig.blockchain.snapshot();
    let migration = Migration::new(&service.name, &snapshot);
    assert!(!migration.get_proof_entry::<_, u32>("entry").exists());
}
/// Rolling back a completed migration removes the data that its script has written.
#[test]
fn migration_rollback_erases_migration_data() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("with-state", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("with-state", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Wait generously so that the script completes both of its merges.
    thread::sleep(DELAY * 10);
    rig.create_block(rig.blockchain.fork());
    let snapshot = rig.blockchain.snapshot();
    let migration = Migration::new(&service.name, &snapshot);
    assert_eq!(migration.get_proof_entry::<_, u32>("entry").get(), Some(2));

    let fork = rig.blockchain.fork();
    Dispatcher::rollback_migration(&fork, &service.name).unwrap();
    rig.create_block(fork);

    // The migrated entry is gone after the rollback.
    let snapshot = rig.blockchain.snapshot();
    let migration = Migration::new(&service.name, &snapshot);
    assert!(!migration.get_proof_entry::<_, u32>("entry").exists());
}
/// Committing a migration with the matching hash keeps the local result and stores
/// the committed hash in the service status.
#[test]
fn migration_commit_workflow() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");
    rig.stop_service(&service);

    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact.clone(), &service.name)
        .unwrap();
    rig.create_block(fork);

    // Let the script finish, then create a block.
    thread::sleep(DELAY * 3);
    rig.create_block(rig.blockchain.fork());

    // Commit with the hash that the `good` script is expected to produce.
    let fork = rig.blockchain.fork();
    Dispatcher::commit_migration(&fork, &service.name, HashTag::empty_map_hash()).unwrap();
    rig.create_block(fork);

    let snapshot = rig.blockchain.snapshot();
    let schema = DispatcherSchema::new(&snapshot);
    let local_result = schema.local_migration_result(&service.name).unwrap();
    assert_eq!(local_result.0.unwrap(), HashTag::empty_map_hash());

    let state = schema.get_instance(service.id).unwrap();
    let migration = InstanceMigration::from_raw_parts(
        new_artifact,
        Version::new(0, 5, 0),
        Some(HashTag::empty_map_hash()),
    );
    let expected_status = InstanceStatus::migrating(migration);
    assert_eq!(state.status, Some(expected_status));
}
/// Checks the errors returned by `Dispatcher::commit_migration` for invalid requests.
#[test]
fn migration_commit_invariants() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("good", Version::new(0, 3, 0));
    let new_artifact = rig.deploy_artifact("good", Version::new(0, 5, 2));
    let service = rig.initialize_service(old_artifact, "good");

    // Unknown service.
    let fork = rig.blockchain.fork();
    let err = Dispatcher::commit_migration(&fork, "bogus", Hash::zero()).unwrap_err();
    let unknown_service_match = ErrorMatch::from_fail(&CoreError::IncorrectInstanceId)
        .with_description_containing("Cannot commit migration for unknown service `bogus`");
    assert_eq!(err, unknown_service_match);

    // Service that has not been stopped and has no migration.
    let err = Dispatcher::commit_migration(&fork, &service.name, Hash::zero()).unwrap_err();
    let no_migration_match = ErrorMatch::from_fail(&CoreError::NoMigration)
        .with_description_containing("Cannot commit migration for service `100:good`");
    assert_eq!(err, no_migration_match);

    // Stopped service without a migration.
    rig.stop_service(&service);
    let fork = rig.blockchain.fork();
    let err = Dispatcher::commit_migration(&fork, &service.name, Hash::zero()).unwrap_err();
    assert_eq!(err, no_migration_match);

    rig.dispatcher()
        .initiate_migration(&fork, new_artifact, &service.name)
        .unwrap();
    rig.create_block(fork);

    // Second commit within the same fork.
    let fork = rig.blockchain.fork();
    let migration_hash = HashTag::empty_map_hash();
    Dispatcher::commit_migration(&fork, &service.name, migration_hash).unwrap();
    let err = Dispatcher::commit_migration(&fork, &service.name, migration_hash).unwrap_err();
    assert_eq!(err, ErrorMatch::from_fail(&CoreError::ServicePending));
    rig.create_block(fork);

    // Commit repeated after the block with the first commit.
    let fork = rig.blockchain.fork();
    let err = Dispatcher::commit_migration(&fork, &service.name, migration_hash).unwrap_err();
    assert_eq!(err, no_migration_match);
}
fn test_migration_commit_with_local_error(
rig: &mut Rig,
local_result: LocalResult,
artifact_name: &str,
) {
let old_artifact = rig.deploy_artifact(artifact_name, "0.3.0".parse().unwrap());
let new_artifact = rig.deploy_artifact(artifact_name, "0.5.2".parse().unwrap());
let service = rig.initialize_service(old_artifact, "service");
rig.stop_service(&service);
let fork = rig.blockchain.fork();
rig.dispatcher()
.initiate_migration(&fork, new_artifact, &service.name)
.unwrap();
rig.create_block(fork);
rig.wait_migration_threads(local_result);
let fork = rig.blockchain.fork();
Dispatcher::commit_migration(&fork, &service.name, Hash::zero()).unwrap();
rig.create_block(fork); }
/// Commit of a locally failed migration when the node has not waited for the script.
#[test]
#[should_panic(expected = "locally it has finished with an error: This migration is unsuccessful")]
fn migration_commit_with_local_error_blocking() {
    let mut rig = Rig::new();
    test_migration_commit_with_local_error(&mut rig, LocalResult::None, "not-good");
}
/// Commit of a locally failed migration when the script has finished
/// but no block has been created since.
#[test]
#[should_panic(expected = "locally it has finished with an error: This migration is unsuccessful")]
fn migration_commit_with_local_error_in_memory() {
    let mut rig = Rig::new();
    test_migration_commit_with_local_error(&mut rig, LocalResult::InMemory, "not-good");
}
/// Commit of a locally failed migration after a block has been created
/// following the end of the script.
#[test]
#[should_panic(expected = "locally it has finished with an error: This migration is unsuccessful")]
fn migration_commit_with_local_error_saved() {
    let mut rig = Rig::new();
    test_migration_commit_with_local_error(&mut rig, LocalResult::Saved, "not-good");
}
/// Commit of a locally failed migration after a block has been created
/// and the node has been restarted.
#[test]
#[should_panic(expected = "locally it has finished with an error: This migration is unsuccessful")]
fn migration_commit_with_local_error_saved_and_node_restart() {
    let mut rig = Rig::new();
    let local_result = LocalResult::SavedWithNodeRestart;
    test_migration_commit_with_local_error(&mut rig, local_result, "not-good");
}
// Checks recovery from a locally failed migration: the failed result is removed from
// the database, the node is started again with a runtime that returns the successful script,
// and the migration is then committed.
#[test]
fn test_migration_restart() {
let artifact_name = "good-or-not-good";
let service_name = "service";
// The database is shared between the two rigs created below.
let db = Arc::new(TemporaryDB::new());
// First run: with the flag unset, the runtime returns the failing script,
// so the helper is expected to panic.
std::panic::catch_unwind(|| {
let mut rig = Rig::with_db_and_flag(Arc::clone(&db), false);
test_migration_commit_with_local_error(&mut rig, LocalResult::Saved, artifact_name)
})
.expect_err("Node should panic on unsuccessful migration commit");
// The failure must have been recorded in the database.
let snapshot = db.snapshot();
let schema = DispatcherSchema::new(&snapshot);
let res = schema
.local_migration_result(service_name)
.expect("Schema does not have local result");
assert_eq!(res.0.unwrap_err(), "This migration is unsuccessful!");
// NOTE(review): `rollback_migration` and `remove_local_migration_result` are defined
// outside this file view; presumably they reset the migration data and erase the stored
// local result — confirm against their definitions.
let mut fork = db.fork();
rollback_migration(&mut fork, service_name);
remove_local_migration_result(&fork, service_name);
db.merge_sync(fork.into_patch())
.expect("Failed to merge patch after local migration result remove");
let snapshot = db.snapshot();
let schema = DispatcherSchema::new(&snapshot);
assert!(schema.local_migration_result(service_name).is_none());
// Second run on the same database: with the flag set, the runtime returns the successful script.
let mut rig = Rig::with_db_and_flag(Arc::clone(&db), true);
let fork = rig.blockchain.fork();
Dispatcher::commit_migration(&fork, service_name, HashTag::empty_map_hash())
.expect("Failed to commit migration");
rig.create_block(fork);
rig.assert_no_migration_threads();
let snapshot = rig.blockchain.snapshot();
let schema = DispatcherSchema::new(&snapshot);
let res = schema.local_migration_result(service_name).unwrap();
assert_eq!(res.0.unwrap(), HashTag::empty_map_hash());
// 100 is the ID that `Rig` assigns to the first service it creates.
let state = schema.get_instance(100).unwrap();
let artifact = ArtifactId::from_raw_parts(
MigrationRuntime::ID,
artifact_name.to_string(),
"0.5.2".parse().unwrap(),
);
let expected_status = InstanceStatus::migrating(InstanceMigration::from_raw_parts(
artifact,
Version::new(0, 5, 0),
Some(HashTag::empty_map_hash()),
));
assert_eq!(state.status, Some(expected_status));
}
/// Shared scenario: a migration is committed with a hash (`Hash::zero()`) that is
/// expected to differ from the locally obtained one.
///
/// Deploys versions `0.3.0` and `0.5.2` of the `"good"` artifact, initiates a migration of
/// a stopped service, waits for migration threads in the mode selected by `local_result`,
/// and finally commits the migration and creates a block. The callers in this file expect
/// the last block creation to panic.
fn test_migration_commit_with_differing_hash(local_result: LocalResult) {
    let mut rig = Rig::new();
    let source_artifact = rig.deploy_artifact("good", "0.3.0".parse().unwrap());
    let target_artifact = rig.deploy_artifact("good", "0.5.2".parse().unwrap());
    let instance = rig.initialize_service(source_artifact, "service");
    rig.stop_service(&instance);

    // Start the migration and persist the initiation in a block.
    let init_fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&init_fork, target_artifact, &instance.name)
        .unwrap();
    rig.create_block(init_fork);

    rig.wait_migration_threads(local_result);

    // Commit with a hash that the local script is not expected to produce.
    let commit_fork = rig.blockchain.fork();
    let bogus_hash = Hash::zero();
    Dispatcher::commit_migration(&commit_fork, &instance.name, bogus_hash).unwrap();
    // The panic is expected here, when the block with the commitment is created.
    rig.create_block(commit_fork);
}
/// Differing-hash commit scenario run with `LocalResult::None`.
///
/// Passes only if the scenario panics with the message stated in `should_panic`.
#[test]
#[should_panic(expected = "locally it has finished with another hash")]
fn migration_commit_with_differing_hash_blocking() {
    let local_result = LocalResult::None;
    test_migration_commit_with_differing_hash(local_result);
}
/// Differing-hash commit scenario run with `LocalResult::InMemory`.
///
/// Passes only if the scenario panics with the message stated in `should_panic`.
#[test]
#[should_panic(expected = "locally it has finished with another hash")]
fn migration_commit_with_differing_hash_in_memory() {
    let local_result = LocalResult::InMemory;
    test_migration_commit_with_differing_hash(local_result);
}
/// Differing-hash commit scenario run with `LocalResult::Saved`.
///
/// Passes only if the scenario panics with the message stated in `should_panic`.
#[test]
#[should_panic(expected = "locally it has finished with another hash")]
fn migration_commit_with_differing_hash_saved() {
    let local_result = LocalResult::Saved;
    test_migration_commit_with_differing_hash(local_result);
}
/// Differing-hash commit scenario run with `LocalResult::SavedWithNodeRestart`.
///
/// Passes only if the scenario panics with the message stated in `should_panic`.
#[test]
#[should_panic(expected = "locally it has finished with another hash")]
fn migration_commit_with_differing_hash_saved_and_node_restarted() {
    let local_result = LocalResult::SavedWithNodeRestart;
    test_migration_commit_with_differing_hash(local_result);
}
/// Commits a migration right after the block that initiates it, with no explicit wait for
/// the migration script, and then flushes it.
///
/// Verified along the way:
/// - no migration threads remain after the commit block;
/// - the instance is in the `migrating` status carrying the committed hash;
/// - after the flush the data version is `0.5.0`, the status is `Stopped`, the local
///   migration result is gone, `test.entry` holds `2`, and the state aggregator matches
///   the state hash of the flush block.
#[test]
fn migration_commit_without_completing_script_locally() {
    let mut rig = Rig::new();
    let source_artifact = rig.deploy_artifact("with-state", "0.3.0".parse().unwrap());
    let target_artifact = rig.deploy_artifact("with-state", "0.5.2".parse().unwrap());
    let instance = rig.initialize_service(source_artifact, "test");
    rig.stop_service(&instance);

    let init_fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&init_fork, target_artifact.clone(), &instance.name)
        .unwrap();
    rig.create_block(init_fork);

    // The expected migration hash is derived from the known final value of `test.entry`.
    let entry_hash = 2_u32.object_hash();
    let expected_hash = rig.migration_hash(&[("test.entry", entry_hash)]);

    let commit_fork = rig.blockchain.fork();
    Dispatcher::commit_migration(&commit_fork, &instance.name, expected_hash).unwrap();
    rig.create_block(commit_fork);
    rig.assert_no_migration_threads();

    // After the commit the instance must be marked as migrating with the committed hash.
    let committed_snapshot = rig.blockchain.snapshot();
    let committed_schema = DispatcherSchema::new(&committed_snapshot);
    let committed_state = committed_schema.get_instance(instance.id).unwrap();
    let expected_migration = InstanceMigration::from_raw_parts(
        target_artifact,
        Version::new(0, 5, 0),
        Some(expected_hash),
    );
    assert_eq!(
        committed_state.status,
        Some(InstanceStatus::migrating(expected_migration))
    );

    // Flush the migration and remember the state hash of the resulting block.
    let mut flush_fork = rig.blockchain.fork();
    Dispatcher::flush_migration(&mut flush_fork, &instance.name).unwrap();
    let flushed_block = rig.create_block(flush_fork);
    let state_hash = flushed_block.state_hash;

    // Service metadata after the flush.
    let flushed_snapshot = rig.blockchain.snapshot();
    let flushed_schema = DispatcherSchema::new(&flushed_snapshot);
    let flushed_state = flushed_schema.get_instance(instance.id).unwrap();
    assert_eq!(flushed_state.data_version, Some(Version::new(0, 5, 0)));
    assert_eq!(flushed_state.status, Some(InstanceStatus::Stopped));
    assert!(flushed_schema
        .local_migration_result(&instance.name)
        .is_none());

    // Service data and its reflection in the state aggregator.
    let migrated_entry = flushed_snapshot.get_proof_entry::<_, u32>("test.entry");
    assert_eq!(migrated_entry.get(), Some(2));
    let aggregator = SystemSchema::new(&flushed_snapshot).state_aggregator();
    assert_eq!(aggregator.get("test.entry"), Some(entry_hash));
    assert_eq!(aggregator.object_hash(), state_hash);
}
/// Migrates a service of the `"complex"` artifact from `0.1.1` to `0.3.7` in two stages.
///
/// The first `initiate_migration` / `commit_migration` / `flush_migration` cycle is
/// expected to leave `test.entry == 1` and data version `0.2.0`. The second cycle is
/// expected to leave `test.entry == 2` and data version `0.3.0`.
///
/// Between the stages the old artifact is unloaded. The unload is recorded in the same
/// fork that initiates the second stage, so it is committed with that block and the
/// second stage really runs with the old artifact marked for unloading.
#[test]
fn two_part_migration() {
    let mut rig = Rig::new();
    let old_artifact = rig.deploy_artifact("complex", "0.1.1".parse().unwrap());
    let new_artifact = rig.deploy_artifact("complex", "0.3.7".parse().unwrap());
    let service = rig.initialize_service(old_artifact.clone(), "test");
    rig.stop_service(&service);

    // First stage of the migration.
    let fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact.clone(), &service.name)
        .unwrap();
    rig.create_block(fork);

    let migration_hash = rig.migration_hash(&[("test.entry", 1_u32.object_hash())]);
    let fork = rig.blockchain.fork();
    Dispatcher::commit_migration(&fork, &service.name, migration_hash).unwrap();
    rig.create_block(fork);

    let mut fork = rig.blockchain.fork();
    Dispatcher::flush_migration(&mut fork, &service.name).unwrap();
    rig.create_block(fork);

    // Check service data and metadata after the first stage.
    let snapshot = rig.blockchain.snapshot();
    assert_eq!(
        snapshot.get_proof_entry::<_, u32>("test.entry").get(),
        Some(1)
    );
    let schema = DispatcherSchema::new(&snapshot);
    let instance_state = schema.get_instance(service.id).unwrap();
    assert_eq!(instance_state.data_version, Some(Version::new(0, 2, 0)));

    // Unload the old artifact and initiate the second stage in the SAME fork.
    // Creating a fresh fork in between would silently discard the unload request,
    // because changes in a fork only take effect once the fork is turned into a block.
    let fork = rig.blockchain.fork();
    Dispatcher::unload_artifact(&fork, &old_artifact).unwrap();
    rig.dispatcher()
        .initiate_migration(&fork, new_artifact.clone(), &service.name)
        .unwrap();
    rig.create_block(fork);

    let migration_hash = rig.migration_hash(&[("test.entry", 2_u32.object_hash())]);
    let fork = rig.blockchain.fork();
    Dispatcher::commit_migration(&fork, &service.name, migration_hash).unwrap();
    rig.create_block(fork);

    let mut fork = rig.blockchain.fork();
    Dispatcher::flush_migration(&mut fork, &service.name).unwrap();
    rig.create_block(fork);

    // Check service data and metadata after the second stage.
    let snapshot = rig.blockchain.snapshot();
    assert_eq!(
        snapshot.get_proof_entry::<_, u32>("test.entry").get(),
        Some(2)
    );
    let schema = DispatcherSchema::new(&snapshot);
    let instance_state = schema.get_instance(service.id).unwrap();
    assert_eq!(instance_state.data_version, Some(Version::new(0, 3, 0)));

    // Finally, check that unloading the new artifact is accepted as well.
    let fork = rig.blockchain.fork();
    Dispatcher::unload_artifact(&fork, &new_artifact).unwrap();
    rig.create_block(fork);
}
/// Two-stage migration of a `"complex"` service (`0.1.1` -> `0.3.7`) in which the service
/// is re-associated with an intermediate artifact (`0.2.2`) between the stages.
///
/// After the first stage is flushed, a migration to the intermediate artifact is
/// initiated. The asserts that follow require the service to be `Stopped`, bound to the
/// intermediate artifact, with no explicit data version and `test.entry == 1`. The second
/// stage then runs to completion and must leave `test.entry == 2` and data version `0.3.0`.
#[test]
fn two_part_migration_with_intermediate_artifact() {
    let mut rig = Rig::new();
    let source_artifact = rig.deploy_artifact("complex", "0.1.1".parse().unwrap());
    let middle_artifact = rig.deploy_artifact("complex", "0.2.2".parse().unwrap());
    let target_artifact = rig.deploy_artifact("complex", "0.3.7".parse().unwrap());
    let instance = rig.initialize_service(source_artifact, "test");
    rig.stop_service(&instance);

    // Stage 1: initiate, commit and flush the first part of the migration.
    let init_fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&init_fork, target_artifact.clone(), &instance.name)
        .unwrap();
    rig.create_block(init_fork);

    let first_hash = rig.migration_hash(&[("test.entry", 1_u32.object_hash())]);
    let commit_fork = rig.blockchain.fork();
    Dispatcher::commit_migration(&commit_fork, &instance.name, first_hash).unwrap();
    rig.create_block(commit_fork);

    let mut flush_fork = rig.blockchain.fork();
    Dispatcher::flush_migration(&mut flush_fork, &instance.name).unwrap();
    rig.create_block(flush_fork);

    // Re-associate the service with the intermediate artifact.
    let rebind_fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&rebind_fork, middle_artifact.clone(), &instance.name)
        .unwrap();
    rig.create_block(rebind_fork);

    // Service data is untouched, while the metadata points at the intermediate artifact.
    let middle_snapshot = rig.blockchain.snapshot();
    let middle_entry = middle_snapshot
        .get_proof_entry::<_, u32>("test.entry")
        .get();
    assert_eq!(middle_entry, Some(1));
    let middle_schema = DispatcherSchema::new(&middle_snapshot);
    let middle_state = middle_schema.get_instance(instance.id).unwrap();
    assert_eq!(middle_state.status, Some(InstanceStatus::Stopped));
    assert_eq!(middle_state.spec.artifact, middle_artifact);
    assert_eq!(middle_state.data_version, None);

    // Stage 2: initiate the remaining part of the migration.
    let second_init_fork = rig.blockchain.fork();
    rig.dispatcher()
        .initiate_migration(&second_init_fork, target_artifact, &instance.name)
        .unwrap();
    rig.create_block(second_init_fork);

    // Pause before inspecting the storage; presumably this gives the background
    // migration script time to run.
    thread::sleep(DELAY * 5);

    let second_hash = rig.migration_hash(&[("test.entry", 2_u32.object_hash())]);
    let second_commit_fork = rig.blockchain.fork();

    // Before the flush, the service data read through `BlockchainData` is still the
    // value produced by the first stage.
    let foreign_view = BlockchainData::new(&second_commit_fork, "other");
    let visible_entry = foreign_view
        .for_service(instance.id)
        .unwrap()
        .get_proof_entry::<_, u32>("entry")
        .get();
    assert_eq!(visible_entry, Some(1));

    Dispatcher::commit_migration(&second_commit_fork, &instance.name, second_hash).unwrap();
    rig.create_block(second_commit_fork);

    let mut second_flush_fork = rig.blockchain.fork();
    Dispatcher::flush_migration(&mut second_flush_fork, &instance.name).unwrap();
    rig.create_block(second_flush_fork);

    // Final state: migrated data and the updated data version.
    let final_snapshot = rig.blockchain.snapshot();
    let final_entry = final_snapshot
        .get_proof_entry::<_, u32>("test.entry")
        .get();
    assert_eq!(final_entry, Some(2));
    let final_schema = DispatcherSchema::new(&final_snapshot);
    let final_state = final_schema.get_instance(instance.id).unwrap();
    assert_eq!(final_state.data_version, Some(Version::new(0, 3, 0)));
}