#![warn(
missing_debug_implementations,
missing_docs,
unsafe_code,
bare_trait_objects
)]
#![warn(clippy::pedantic, clippy::nursery)]
#![allow(
// Next `cast_*` lints don't give alternatives.
clippy::cast_possible_wrap, clippy::cast_possible_truncation, clippy::cast_sign_loss,
// Next lints produce too much noise/false positives.
clippy::module_name_repetitions, clippy::similar_names, clippy::must_use_candidate,
clippy::pub_enum_variant_names,
// '... may panic' lints.
clippy::indexing_slicing,
// Too much work to fix.
clippy::missing_errors_doc, clippy::missing_const_for_fn
)]
pub use self::{
configure::{Configure, CONFIGURE_INTERFACE_NAME},
errors::{ArtifactError, CommonError, ConfigurationError, MigrationError, ServiceError},
event_state::AsyncEventState,
migration_state::MigrationState,
proto_structures::{
ConfigChange, ConfigProposalWithHash, ConfigPropose, ConfigVote, DeployRequest,
DeployResult, FreezeService, MigrationRequest, MigrationResult, ResumeService,
ServiceConfig, StartService, StopService, SupervisorConfig, UnloadArtifact,
},
schema::Schema,
transactions::SupervisorInterface,
};
#[doc(hidden)] pub use self::schema::SchemaImpl;
use exonum::runtime::{ExecutionContext, ExecutionError, InstanceId, SUPERVISOR_INSTANCE_ID};
use exonum_derive::*;
use exonum_merkledb::BinaryValue;
use exonum_rust_runtime::{
api::ServiceApiBuilder,
spec::{Simple, Spec},
AfterCommitContext, Service,
};
use crate::{configure::ConfigureMut, mode::Mode};
pub mod api;
pub mod mode;
mod configure;
mod errors;
mod event_state;
mod migration_state;
mod multisig;
mod proto;
mod proto_structures;
mod schema;
mod transactions;
const NOT_SUPERVISOR_MSG: &str = "`Supervisor` is installed as a non-privileged service. \
For correct operation, `Supervisor` needs to have numeric ID 0.";
fn update_configs(
context: &mut ExecutionContext<'_>,
changes: Vec<ConfigChange>,
) -> Result<(), ExecutionError> {
const NO_SERVICE: &str =
"BUG: Instance with the specified ID is absent in the dispatcher schema";
for change in changes {
match change {
ConfigChange::Consensus(config) => {
log::trace!("Updating consensus configuration {:?}", config);
context
.supervisor_extensions()
.writeable_core_schema()
.consensus_config_entry()
.set(config);
}
ConfigChange::Service(config) => {
log::trace!(
"Updating service instance configuration, instance ID is {}",
config.instance_id
);
context
.apply_config(config.instance_id, config.params.clone())
.map_err(|err| {
log::error!(
"An error occurred while applying service configuration. {}",
err
);
err
})?;
}
ConfigChange::StartService(start_service) => {
log::trace!(
"Request add service with name {} from artifact {}",
start_service.name,
start_service.artifact
);
let id = assign_instance_id(context);
let (instance_spec, config) = start_service.into_parts(id);
context
.supervisor_extensions()
.initiate_adding_service(instance_spec, config)
.map_err(|err| {
log::error!("Service start request failed. {}", err);
err
})?;
}
ConfigChange::StopService(stop_service) => {
let instance = context
.data()
.for_dispatcher()
.get_instance(stop_service.instance_id)
.expect(NO_SERVICE);
log::trace!(
"Stopping service with name {} from artifact {}",
instance.spec.name,
instance.spec.artifact
);
context
.supervisor_extensions()
.initiate_stopping_service(stop_service.instance_id)?;
}
ConfigChange::FreezeService(freeze_service) => {
let instance = context
.data()
.for_dispatcher()
.get_instance(freeze_service.instance_id)
.expect(NO_SERVICE);
log::trace!(
"Freezing service with name {} from artifact {}",
instance.spec.name,
instance.spec.artifact
);
context
.supervisor_extensions()
.initiate_freezing_service(freeze_service.instance_id)?;
}
ConfigChange::ResumeService(resume_service) => {
let instance = context
.data()
.for_dispatcher()
.get_instance(resume_service.instance_id)
.expect(NO_SERVICE);
log::trace!(
"Resuming service with name {} with artifact {}",
instance.spec.name,
instance.spec.artifact,
);
context
.supervisor_extensions()
.initiate_resuming_service(resume_service.instance_id, resume_service.params)?;
}
ConfigChange::UnloadArtifact(unload_artifact) => {
log::trace!("Unloading artifact `{}`", unload_artifact.artifact_id);
context
.supervisor_extensions()
.unload_artifact(&unload_artifact.artifact_id)?;
}
}
}
Ok(())
}
fn assign_instance_id(context: &ExecutionContext<'_>) -> InstanceId {
let mut schema = SchemaImpl::new(context.service_data());
if let Some(id) = schema.assign_instance_id() {
id
} else {
let dispatcher_schema = context.data().for_dispatcher();
let builtin_instances = dispatcher_schema.service_instances();
let new_instance_id = builtin_instances
.values()
.map(|state| state.spec.id)
.max()
.unwrap_or(SUPERVISOR_INSTANCE_ID)
+ 1;
let vacant_instance_id = new_instance_id + 1;
schema.vacant_instance_id.set(vacant_instance_id);
new_instance_id
}
}
#[derive(Debug, Default, Clone, ServiceFactory, ServiceDispatcher)]
#[service_dispatcher(implements(
"SupervisorInterface",
raw = "Configure<Params = SupervisorConfig>"
))]
#[service_factory(proto_sources = "proto", artifact_name = "exonum-supervisor")]
pub struct Supervisor;
impl Supervisor {
pub const NAME: &'static str = "supervisor";
pub fn simple_config() -> SupervisorConfig {
SupervisorConfig { mode: Mode::Simple }
}
pub fn decentralized_config() -> SupervisorConfig {
SupervisorConfig {
mode: Mode::Decentralized,
}
}
pub fn simple() -> Spec<Self, Simple> {
Self::builtin_instance(Self::simple_config())
}
pub fn decentralized() -> Spec<Self, Simple> {
Self::builtin_instance(Self::decentralized_config())
}
pub fn builtin_instance(config: SupervisorConfig) -> Spec<Self, Simple> {
Spec::new(Self).with_instance(SUPERVISOR_INSTANCE_ID, Self::NAME, config)
}
}
impl Service for Supervisor {
fn initialize(
&self,
context: ExecutionContext<'_>,
params: Vec<u8>,
) -> Result<(), ExecutionError> {
use std::borrow::Cow;
let config = SupervisorConfig::from_bytes(Cow::from(¶ms))
.map_err(|_| ConfigurationError::InvalidConfig)?;
let mut schema = SchemaImpl::new(context.service_data());
schema.public.configuration.set(config);
Ok(())
}
fn before_transactions(&self, mut context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
Self::remove_outdated_deployments(&context);
Self::remove_outdated_config_proposal(&context);
Self::flush_completed_migrations(&mut context)?;
Self::remove_outdated_migrations(&mut context)?;
Ok(())
}
fn after_transactions(&self, mut context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
let mut schema = SchemaImpl::new(context.service_data());
let configuration = schema.supervisor_config();
let core_schema = context.data().for_core();
let next_height = core_schema.next_height();
let validator_count = core_schema.consensus_config().validator_keys.len();
let entry = schema.public.pending_proposal.get();
if let Some(entry) = entry {
if entry.config_propose.actual_from == next_height {
if configuration.mode.config_approved(
&entry.propose_hash,
&schema.config_confirms,
validator_count,
) {
log::info!(
"New configuration has been accepted: {:?}",
entry.config_propose
);
schema.public.pending_proposal.remove();
drop(schema);
update_configs(&mut context, entry.config_propose.changes)?;
}
}
}
Ok(())
}
fn after_commit(&self, mut context: AfterCommitContext<'_>) {
Self::process_unconfirmed_deployments(&mut context);
Self::process_incomplete_migrations(&mut context);
}
fn wire_api(&self, builder: &mut ServiceApiBuilder) {
api::wire(builder)
}
}
impl Supervisor {
fn remove_outdated_deployments(context: &ExecutionContext<'_>) {
let mut schema = SchemaImpl::new(context.service_data());
let core_schema = context.data().for_core();
let height = core_schema.height();
let requests_to_remove = schema
.pending_deployments
.values()
.filter(|request| request.deadline_height <= height)
.collect::<Vec<_>>();
for request in requests_to_remove {
schema.pending_deployments.remove(&request.artifact);
if let Some(AsyncEventState::Pending) = schema.deploy_states.get(&request) {
schema.deploy_states.put(&request, AsyncEventState::Timeout);
}
log::trace!("Removed outdated deployment request {:?}", request);
}
}
fn remove_outdated_config_proposal(context: &ExecutionContext<'_>) {
let mut schema = SchemaImpl::new(context.service_data());
let core_schema = context.data().for_core();
let height = core_schema.height();
let entry = schema.public.pending_proposal.get();
if let Some(entry) = entry {
if entry.config_propose.actual_from <= height {
log::trace!("Removed outdated config proposal");
schema.public.pending_proposal.remove();
}
}
}
fn process_unconfirmed_deployments(context: &mut AfterCommitContext<'_>) {
let service_key = context.service_key();
let deployments: Vec<_> = {
let schema = SchemaImpl::new(context.service_data());
schema
.pending_deployments
.values()
.filter(|request| {
if let Some(AsyncEventState::Pending) = schema.deploy_states.get(request) {
!schema
.deploy_confirmations
.confirmed_by(request, &service_key)
} else {
false
}
})
.collect()
};
for unconfirmed_request in deployments {
let artifact = unconfirmed_request.artifact.clone();
let spec = unconfirmed_request.spec.clone();
let tx_sender = context.broadcaster();
let mut extensions = context.supervisor_extensions().expect(NOT_SUPERVISOR_MSG);
extensions.start_deploy(artifact, spec, move |result| {
if let Some(tx_sender) = tx_sender {
log::trace!("Sending deployment result report {:?}", unconfirmed_request);
let confirmation = DeployResult::new(unconfirmed_request, result);
if let Err(e) = tx_sender.blocking().report_deploy_result((), confirmation) {
log::error!("Cannot send `DeployResult`: {}", e);
};
}
Ok(())
});
}
}
fn flush_completed_migrations(
context: &mut ExecutionContext<'_>,
) -> Result<(), ExecutionError> {
let mut schema = SchemaImpl::new(context.service_data());
let finished_migrations = schema
.migrations_to_flush
.iter()
.map(|(_, request)| request)
.collect::<Vec<_>>();
schema.migrations_to_flush.clear();
drop(schema);
for request in finished_migrations {
context
.supervisor_extensions()
.flush_migration(request.service.as_ref())?;
log::trace!("Flushed and finished migration with request {:?}", request);
let mut schema = SchemaImpl::new(context.service_data());
let mut state = schema.migration_state_unchecked(&request);
let instance = transactions::get_instance_by_name(context, request.service.as_ref())
.expect("BUG: Migration succeed, but there is no such instance in core");
state.update(AsyncEventState::Succeed, instance.data_version().clone());
schema.migration_states.put(&request, state);
}
Ok(())
}
fn remove_outdated_migrations(
context: &mut ExecutionContext<'_>,
) -> Result<(), ExecutionError> {
let height = context.data().for_core().height();
let requests_to_remove = SchemaImpl::new(context.service_data())
.pending_migrations
.iter()
.filter_map(|(_, request)| {
if request.deadline_height <= height {
Some(request)
} else {
None
}
})
.collect::<Vec<_>>();
for request in requests_to_remove {
let mut schema = SchemaImpl::new(context.service_data());
schema.pending_migrations.remove(&request);
let mut state = schema.migration_state_unchecked(&request);
if state.is_pending() {
state.fail(AsyncEventState::Timeout);
schema.migration_states.put(&request, state);
drop(schema);
context
.supervisor_extensions()
.rollback_migration(request.service.as_ref())?;
}
log::trace!("Removed outdated migration request {:?}", request);
}
Ok(())
}
fn process_incomplete_migrations(context: &mut AfterCommitContext<'_>) {
let service_key = context.service_key();
let pending_migrations: Vec<_> = {
let schema = SchemaImpl::new(context.service_data());
schema
.pending_migrations
.iter()
.filter_map(|(_, request)| {
let state = schema.migration_state_unchecked(&request);
let confirmed_by_us = schema
.migration_confirmations
.confirmed_by(&request, &service_key);
if state.is_pending() && !confirmed_by_us {
Some(request)
} else {
None
}
})
.collect()
};
for request in pending_migrations {
let local_migration_result = context
.data()
.for_dispatcher()
.local_migration_result(request.service.as_ref());
let tx_sender = context.broadcaster();
if let Some(status) = local_migration_result {
if let Some(tx_sender) = tx_sender {
let confirmation = MigrationResult { request, status };
if let Err(e) = tx_sender
.blocking()
.report_migration_result((), confirmation)
{
log::error!("Cannot send `MigrationResult`: {}", e);
}
}
}
}
}
}
impl Configure for Supervisor {
type Params = SupervisorConfig;
fn verify_config(
&self,
_context: ExecutionContext<'_>,
_params: Self::Params,
) -> Result<(), ExecutionError> {
Ok(())
}
fn apply_config(
&self,
context: ExecutionContext<'_>,
params: Self::Params,
) -> Result<(), ExecutionError> {
let mut schema = SchemaImpl::new(context.service_data());
schema.public.configuration.set(params);
Ok(())
}
}