use exonum_crypto::Hash;
use exonum_derive::*;
use exonum_merkledb::{
access::{Access, AccessExt, AsReadonly},
Fork, KeySetIndex, MapIndex, ProofMapIndex,
};
use exonum_proto::ProtobufConvert;
use crate::{
proto::schema::{
self, details::ModifiedInstanceInfo_MigrationTransition as PbMigrationTransition,
},
runtime::{
migrations::{InstanceMigration, MigrationStatus},
ArtifactId, ArtifactState, ArtifactStatus, CoreError, ExecutionError, ExecutionFail,
InstanceId, InstanceQuery, InstanceSpec, InstanceState, InstanceStatus,
},
};
const ARTIFACTS: &str = "dispatcher_artifacts";
const PENDING_ARTIFACTS: &str = "dispatcher_pending_artifacts";
const INSTANCES: &str = "dispatcher_instances";
const PENDING_INSTANCES: &str = "dispatcher_pending_instances";
const LOCAL_MIGRATION_RESULTS: &str = "dispatcher_local_migration_results";
const INSTANCE_IDS: &str = "dispatcher_instance_ids";
#[derive(Debug)]
pub(super) enum ArtifactAction {
Deploy(Vec<u8>),
Unload,
}
#[derive(Debug, ProtobufConvert, BinaryValue)]
#[protobuf_convert(source = "schema::details::ModifiedInstanceInfo")]
pub(super) struct ModifiedInstanceInfo {
#[protobuf_convert(with = "MigrationTransition")]
pub migration_transition: Option<MigrationTransition>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub(super) enum MigrationTransition {
Start,
Commit,
Rollback,
}
impl MigrationTransition {
#[allow(clippy::wrong_self_convention, clippy::trivially_copy_pass_by_ref)]
fn to_pb(value: &Option<Self>) -> PbMigrationTransition {
use PbMigrationTransition::*;
match value {
None => NONE,
Some(Self::Start) => START,
Some(Self::Commit) => COMMIT,
Some(Self::Rollback) => ROLLBACK,
}
}
fn from_pb(pb: PbMigrationTransition) -> anyhow::Result<Option<Self>> {
use PbMigrationTransition::*;
Ok(match pb {
NONE => None,
START => Some(Self::Start),
COMMIT => Some(Self::Commit),
ROLLBACK => Some(Self::Rollback),
})
}
}
#[derive(Debug, Clone, Copy)]
enum MigrationOutcome {
Rollback,
Commit(Hash),
}
impl MigrationOutcome {
fn as_verb(self) -> &'static str {
match self {
Self::Rollback => "rollback",
Self::Commit(_) => "commit",
}
}
}
impl From<MigrationOutcome> for MigrationTransition {
fn from(value: MigrationOutcome) -> Self {
match value {
MigrationOutcome::Rollback => Self::Rollback,
MigrationOutcome::Commit(_) => Self::Commit,
}
}
}
#[derive(Debug)]
pub struct Schema<T: Access> {
access: T,
}
impl<T: Access> Schema<T> {
pub(crate) fn new(access: T) -> Self {
Self { access }
}
pub(crate) fn artifacts(&self) -> ProofMapIndex<T::Base, ArtifactId, ArtifactState> {
self.access.get_proof_map(ARTIFACTS)
}
pub(crate) fn instances(&self) -> ProofMapIndex<T::Base, str, InstanceState> {
self.access.get_proof_map(INSTANCES)
}
fn instance_ids(&self) -> MapIndex<T::Base, InstanceId, String> {
self.access.get_map(INSTANCE_IDS)
}
fn pending_artifacts(&self) -> KeySetIndex<T::Base, ArtifactId> {
self.access.get_key_set(PENDING_ARTIFACTS)
}
fn modified_instances(&self) -> MapIndex<T::Base, str, ModifiedInstanceInfo> {
self.access.get_map(PENDING_INSTANCES)
}
pub(crate) fn local_migration_results(&self) -> MapIndex<T::Base, str, MigrationStatus> {
self.access.get_map(LOCAL_MIGRATION_RESULTS)
}
pub fn get_instance<'q>(&self, query: impl Into<InstanceQuery<'q>>) -> Option<InstanceState> {
let instances = self.instances();
match query.into() {
InstanceQuery::Id(id) => self
.instance_ids()
.get(&id)
.and_then(|instance_name| instances.get(&instance_name)),
InstanceQuery::Name(instance_name) => instances.get(instance_name),
}
}
pub fn get_artifact(&self, name: &ArtifactId) -> Option<ArtifactState> {
self.artifacts().get(name)
}
pub fn local_migration_result(&self, instance_name: &str) -> Option<MigrationStatus> {
self.local_migration_results().get(instance_name)
}
pub fn check_unloading_artifact(&self, artifact: &ArtifactId) -> Result<(), ExecutionError> {
self.do_check_unloading_artifact(artifact).map(drop)
}
fn do_check_unloading_artifact(
&self,
artifact: &ArtifactId,
) -> Result<ArtifactState, ExecutionError> {
let state = self.artifacts().get(artifact).ok_or_else(|| {
let msg = format!(
"Requested to unload artifact `{}`, which is not deployed",
artifact
);
CoreError::ArtifactNotDeployed.with_description(msg)
})?;
if state.status != ArtifactStatus::Active {
let msg = format!(
"Requested to unload artifact `{}`, which has non-active status: {}",
artifact, state.status
);
return Err(CoreError::ArtifactNotDeployed.with_description(msg));
}
for instance in self.instances().values() {
if instance.associated_artifact() == Some(artifact) {
let msg = format!(
"Cannot unload artifact `{}`: service `{}` references it \
as the current artifact",
artifact,
instance.spec.as_descriptor()
);
return Err(CoreError::CannotUnloadArtifact.with_description(msg));
}
let status = instance
.pending_status
.as_ref()
.or_else(|| instance.status.as_ref());
if let Some(InstanceStatus::Migrating(migration)) = status {
if migration.target == *artifact {
let msg = format!(
"Cannot unload artifact `{}`: service `{}` references it \
as the data migration target",
artifact,
instance.spec.as_descriptor()
);
return Err(CoreError::CannotUnloadArtifact.with_description(msg));
}
}
}
Ok(state)
}
}
impl<T: AsReadonly> Schema<T> {
pub fn service_artifacts(&self) -> ProofMapIndex<T::Readonly, ArtifactId, ArtifactState> {
self.access.as_readonly().get_proof_map(ARTIFACTS)
}
pub fn service_instances(&self) -> ProofMapIndex<T::Readonly, str, InstanceState> {
self.access.as_readonly().get_proof_map(INSTANCES)
}
}
impl Schema<&Fork> {
pub(super) fn add_pending_artifact(
&mut self,
artifact: &ArtifactId,
deploy_spec: Vec<u8>,
) -> Result<(), ExecutionError> {
if self.artifacts().contains(artifact) {
let msg = format!("Cannot deploy artifact `{}` twice", artifact);
return Err(CoreError::ArtifactAlreadyDeployed.with_description(msg));
}
self.artifacts().put(
artifact,
ArtifactState::new(deploy_spec, ArtifactStatus::Deploying),
);
self.pending_artifacts().insert(artifact);
Ok(())
}
pub(super) fn add_active_artifact(
&mut self,
artifact: &ArtifactId,
deploy_spec: Vec<u8>,
) -> Result<(), ExecutionError> {
if self.artifacts().contains(artifact) {
let msg = format!("Cannot deploy artifact `{}` twice", artifact);
return Err(CoreError::ArtifactAlreadyDeployed.with_description(msg));
}
self.artifacts().put(
artifact,
ArtifactState::new(deploy_spec, ArtifactStatus::Active),
);
Ok(())
}
pub(super) fn unload_artifact(&mut self, artifact: &ArtifactId) -> Result<(), ExecutionError> {
let mut state = self.do_check_unloading_artifact(artifact)?;
state.status = ArtifactStatus::Unloading;
self.artifacts().put(artifact, state);
self.pending_artifacts().insert(artifact);
Ok(())
}
pub(super) fn check_migration_initiation(
&self,
new_artifact: &ArtifactId,
old_service: &str,
) -> Result<InstanceState, ExecutionError> {
let instance_state = self.instances().get(old_service).ok_or_else(|| {
let msg = format!(
"Cannot initiate migration for non-existing service `{}`",
old_service
);
CoreError::IncorrectInstanceId.with_description(msg)
})?;
if instance_state.status != Some(InstanceStatus::Stopped)
&& instance_state.status != Some(InstanceStatus::Frozen)
{
let msg = format!(
"Data migration cannot be initiated for service `{}` because is not stopped \
or frozen",
instance_state.spec.as_descriptor()
);
return Err(CoreError::InvalidServiceTransition.with_description(msg));
}
if let Some(pending_status) = instance_state.pending_status {
let msg = format!(
"Cannot initiate migration for service `{}` because it has \
another state transition in progress ({})",
old_service, pending_status
);
return Err(CoreError::ServicePending.with_description(msg));
}
let artifact_state = self.artifacts().get(new_artifact).ok_or_else(|| {
let msg = format!(
"The target artifact `{}` for data migration of service `{}` is not deployed",
new_artifact,
instance_state.spec.as_descriptor()
);
CoreError::UnknownArtifactId.with_description(msg)
})?;
if artifact_state.status != ArtifactStatus::Active {
let msg = format!(
"The target artifact `{}` for data migration of service `{}` is not active",
new_artifact,
instance_state.spec.as_descriptor()
);
return Err(CoreError::ArtifactNotDeployed.with_description(msg));
}
if !new_artifact.is_upgrade_of(&instance_state.spec.artifact) {
let msg = format!(
"The target artifact `{}` for data migration of service `{}` is not an upgrade \
of its current artifact `{}`",
new_artifact,
instance_state.spec.as_descriptor(),
instance_state.spec.artifact
);
return Err(CoreError::CannotUpgradeService.with_description(msg));
}
Ok(instance_state)
}
pub(super) fn add_pending_migration(
&mut self,
instance_state: InstanceState,
migration: InstanceMigration,
) {
let pending_status = InstanceStatus::migrating(migration);
self.add_pending_status(
instance_state,
pending_status,
Some(MigrationTransition::Start),
)
.expect("BUG: Cannot add pending service status during migration initialization");
}
pub(super) fn fast_forward_migration(
&mut self,
mut instance_state: InstanceState,
new_artifact: ArtifactId,
) {
debug_assert!(*instance_state.data_version() <= new_artifact.version);
instance_state.status = Some(InstanceStatus::Stopped);
instance_state.data_version = None;
instance_state.spec.artifact = new_artifact;
let instance_name = instance_state.spec.name.clone();
self.instances().put(&instance_name, instance_state);
let modified_info = ModifiedInstanceInfo {
migration_transition: None,
};
self.modified_instances().put(&instance_name, modified_info);
}
fn add_pending_status(
&mut self,
mut instance_state: InstanceState,
pending_status: InstanceStatus,
migration_transition: Option<MigrationTransition>,
) -> Result<(), CoreError> {
if instance_state.pending_status.is_some() {
return Err(CoreError::ServicePending);
}
instance_state.pending_status = Some(pending_status);
let instance_name = instance_state.spec.name.clone();
let modified_info = ModifiedInstanceInfo {
migration_transition,
};
self.instances().put(&instance_name, instance_state);
self.modified_instances().put(&instance_name, modified_info);
Ok(())
}
fn resolve_ongoing_migration(
&mut self,
instance_name: &str,
outcome: MigrationOutcome,
) -> Result<(), ExecutionError> {
let instance_state = self.instances().get(instance_name).ok_or_else(|| {
let msg = format!(
"Cannot {} migration for unknown service `{}`",
outcome.as_verb(),
instance_name
);
CoreError::IncorrectInstanceId.with_description(msg)
})?;
let migration = match instance_state.status {
Some(InstanceStatus::Migrating(ref migration)) if !migration.is_completed() => {
migration
}
_ => {
let msg = format!(
"Cannot {} migration for service `{}` because it has \
no ongoing migration",
outcome.as_verb(),
instance_state.spec.as_descriptor()
);
return Err(CoreError::NoMigration.with_description(msg));
}
};
let new_status = match outcome {
MigrationOutcome::Rollback => InstanceStatus::Stopped,
MigrationOutcome::Commit(hash) => {
let mut migration = migration.to_owned();
migration.completed_hash = Some(hash);
InstanceStatus::Migrating(migration)
}
};
self.add_pending_status(instance_state, new_status, Some(outcome.into()))?;
Ok(())
}
pub(super) fn add_migration_rollback(
&mut self,
instance_name: &str,
) -> Result<(), ExecutionError> {
self.resolve_ongoing_migration(instance_name, MigrationOutcome::Rollback)?;
self.local_migration_results().remove(instance_name);
Ok(())
}
pub(super) fn add_migration_commit(
&mut self,
instance_name: &str,
hash: Hash,
) -> Result<(), ExecutionError> {
self.resolve_ongoing_migration(instance_name, MigrationOutcome::Commit(hash))
}
pub(super) fn add_local_migration_result(
&mut self,
instance_name: &str,
result: MigrationStatus,
) {
self.local_migration_results().put(instance_name, result);
}
pub(crate) fn initiate_adding_service(
&mut self,
spec: InstanceSpec,
) -> Result<(), ExecutionError> {
let artifact_state = self.artifacts().get(&spec.artifact).ok_or_else(|| {
let msg = format!(
"Cannot instantiate service `{}` from unknown artifact `{}`",
spec.as_descriptor(),
spec.artifact
);
CoreError::ArtifactNotDeployed.with_description(msg)
})?;
if artifact_state.status != ArtifactStatus::Active {
let msg = format!(
"Cannot instantiate service `{}` from non-active artifact `{}` \
(artifact status: {})",
spec.as_descriptor(),
spec.artifact,
artifact_state.status
);
return Err(CoreError::ArtifactNotDeployed.with_description(msg));
}
if self.instances().contains(&spec.name) {
let msg = format!("Service with name `{}` already exists", spec.name);
return Err(CoreError::ServiceNameExists.with_description(msg));
}
let mut instance_ids = self.instance_ids();
if instance_ids.contains(&spec.id) {
let msg = format!("Service with numeric ID {} already exists", spec.id);
return Err(CoreError::ServiceIdExists.with_description(msg));
}
instance_ids.put(&spec.id, spec.name.clone());
let new_instance = InstanceState::from_raw_parts(spec, None, None, None);
self.add_pending_status(new_instance, InstanceStatus::Active, None)
.map_err(From::from)
}
pub(crate) fn initiate_simple_service_transition(
&mut self,
instance_id: InstanceId,
new_status: InstanceStatus,
) -> Result<(), ExecutionError> {
let verb = match new_status {
InstanceStatus::Stopped => "stop",
InstanceStatus::Frozen => "freeze",
_ => unreachable!(),
};
let instance_name = self.instance_ids().get(&instance_id).ok_or_else(|| {
let msg = format!("Cannot {} unknown service with ID {}", verb, instance_id);
CoreError::IncorrectInstanceId.with_description(msg)
})?;
let state = self
.instances()
.get(&instance_name)
.expect("BUG: Instance identifier exists but the corresponding instance is missing.");
let check = match new_status {
InstanceStatus::Stopped => InstanceStatus::can_be_stopped,
InstanceStatus::Frozen => InstanceStatus::can_be_frozen,
_ => unreachable!(),
};
let current_status = state.status.as_ref();
if current_status.map_or(false, check) {
self.add_pending_status(state, new_status, None)
.map_err(From::from)
} else {
let current_status =
current_status.map_or_else(|| "none".to_owned(), ToString::to_string);
let msg = format!(
"Cannot {} service `{}` because the transition is precluded by the current \
service status ({})",
verb,
state.spec.as_descriptor(),
current_status
);
Err(CoreError::InvalidServiceTransition.with_description(msg))
}
}
pub(crate) fn initiate_resuming_service(
&mut self,
instance_id: InstanceId,
) -> Result<(), ExecutionError> {
let instance_name = self.instance_ids().get(&instance_id).ok_or_else(|| {
let msg = format!("Cannot resume service with unknown ID {}", instance_id);
CoreError::IncorrectInstanceId.with_description(msg)
})?;
let mut state = self
.instances()
.get(&instance_name)
.expect("BUG: Instance identifier exists but the corresponding instance is missing.");
if *state.data_version() != state.spec.artifact.version {
let msg = format!(
"Service `{}` has data version ({}) differing from its artifact version (`{}`) \
and thus cannot be resumed",
state.spec.name,
state.data_version(),
state.spec.artifact
);
return Err(CoreError::CannotResumeService.with_description(msg));
}
let current_status = state.status.as_ref();
if current_status.map_or(false, InstanceStatus::can_be_resumed) {
state.data_version = None;
self.add_pending_status(state, InstanceStatus::Active, None)
.map_err(From::from)
} else {
let current_status =
current_status.map_or_else(|| "none".to_owned(), ToString::to_string);
let msg = format!(
"Cannot resume service `{}` because the transition is precluded by the current \
service status ({})",
state.spec.as_descriptor(),
current_status
);
Err(CoreError::InvalidServiceTransition.with_description(msg))
}
}
pub(super) fn activate_pending(&mut self) {
let mut artifacts = self.artifacts();
for artifact in &self.pending_artifacts() {
let mut state = artifacts
.get(&artifact)
.expect("Artifact marked as pending is not saved in `artifacts`");
match state.status {
ArtifactStatus::Deploying => {
state.status = ArtifactStatus::Active;
artifacts.put(&artifact, state);
}
ArtifactStatus::Unloading => {
artifacts.remove(&artifact);
}
_ => { }
}
}
let mut instances = self.instances();
for instance in self.modified_instances().keys() {
let mut state = instances
.get(&instance)
.expect("BUG: Instance marked as modified is not saved in `instances`");
if state.pending_status.is_some() {
state.commit_pending_status();
instances.put(&instance, state);
}
}
}
pub(super) fn take_pending_artifacts(&mut self) -> Vec<(ArtifactId, ArtifactAction)> {
let mut index = self.pending_artifacts();
let artifacts = self.artifacts();
let pending_artifacts = index
.iter()
.map(|artifact| {
let action = if let Some(state) = artifacts.get(&artifact) {
debug_assert_eq!(state.status, ArtifactStatus::Active);
ArtifactAction::Deploy(state.deploy_spec)
} else {
ArtifactAction::Unload
};
(artifact, action)
})
.collect();
index.clear();
pending_artifacts
}
pub(super) fn take_modified_instances(&mut self) -> Vec<(InstanceState, ModifiedInstanceInfo)> {
let mut modified_instances = self.modified_instances();
let instances = self.instances();
let output = modified_instances
.iter()
.map(|(instance_name, info)| {
let state = instances
.get(&instance_name)
.expect("BUG: Instance marked as modified is not saved in `instances`");
(state, info)
})
.collect();
modified_instances.clear();
output
}
pub(super) fn complete_migration(&mut self, instance_name: &str) -> Result<(), ExecutionError> {
let mut instance_state = self.instances().get(instance_name).ok_or_else(|| {
let msg = format!(
"Cannot complete migration for unknown service `{}`",
instance_name
);
CoreError::IncorrectInstanceId.with_description(msg)
})?;
let end_version = match instance_state.status {
Some(InstanceStatus::Migrating(ref migration)) if migration.is_completed() => {
migration.end_version.clone()
}
_ => {
let msg = format!(
"Cannot complete migration for service `{}` because it has no migration \
with committed outcome",
instance_name
);
return Err(CoreError::NoMigration.with_description(msg));
}
};
self.local_migration_results().remove(instance_name);
debug_assert!(*instance_state.data_version() < end_version);
instance_state.data_version = Some(end_version);
self.add_pending_status(instance_state, InstanceStatus::Stopped, None)
.map_err(From::from)
}
}
#[doc(hidden)]
pub fn remove_local_migration_result(fork: &Fork, service_name: &str) {
Schema::new(fork)
.local_migration_results()
.remove(service_name);
}