#![warn(
missing_debug_implementations,
missing_docs,
unsafe_code,
bare_trait_objects
)]
#![warn(clippy::pedantic, clippy::nursery)]
#![allow(
// Next `cast_*` lints don't give alternatives.
clippy::cast_possible_wrap, clippy::cast_possible_truncation, clippy::cast_sign_loss,
// Next lints produce too much noise/false positives.
clippy::module_name_repetitions, clippy::similar_names, clippy::must_use_candidate,
clippy::pub_enum_variant_names,
// '... may panic' lints.
clippy::indexing_slicing,
// Too much work to fix.
clippy::missing_errors_doc, clippy::missing_const_for_fn
)]
pub use exonum::runtime::ExecutionContext;
pub use self::{
error::Error,
runtime_api::{ArtifactProtobufSpec, ProtoSourceFile, ProtoSourcesQuery},
service::{
AfterCommitContext, Broadcaster, DefaultInstance, Service, ServiceDispatcher,
ServiceFactory,
},
stubs::{FallthroughAuth, GenericCall, GenericCallMut, Interface, MethodDescriptor, TxStub},
};
pub mod api;
pub mod spec;
use exonum::{
blockchain::{Blockchain, Schema as CoreSchema},
helpers::Height,
merkledb::Snapshot,
runtime::{
catch_panic,
migrations::{InitMigrationError, MigrateData, MigrationScript},
oneshot::Receiver,
versioning::Version,
ArtifactId, ExecutionError, ExecutionFail, InstanceDescriptor, InstanceId, InstanceSpec,
InstanceState, InstanceStatus, Mailbox, MethodId, Runtime, RuntimeFeature,
RuntimeIdentifier, WellKnownRuntime,
},
};
use exonum_api::{ApiBuilder, UpdateEndpoints};
use futures::{channel::mpsc, executor, SinkExt};
use log::trace;
use std::collections::{BTreeMap, HashMap, HashSet};
use self::api::ServiceApiBuilder;
mod error;
mod runtime_api;
mod service;
mod stubs;
// Re-exports of selected `exonum::runtime` types under a stable path inside this crate.
// The module is public but hidden from the rendered documentation.
// NOTE(review): presumably this exists for code generated by the crate's macros — confirm.
#[doc(hidden)]
pub mod _reexports {
pub use exonum::runtime::{
ArtifactId, CommonError, ExecutionContext, ExecutionError, MethodId, RuntimeIdentifier,
};
}
/// Union of `ServiceFactory` and `MigrateData`.
///
/// Serves as the trait object type (`Box<dyn FactoryWithMigrations>`) under which the runtime
/// and its builder store service factories.
trait FactoryWithMigrations: ServiceFactory + MigrateData {}
// Blanket implementation: any type implementing both supertraits qualifies automatically.
impl<T: ServiceFactory + MigrateData> FactoryWithMigrations for T {}
/// Wrapper around a service factory that does not provide data migrations.
///
/// The wrapper forwards `ServiceFactory` calls to the inner value and implements `MigrateData`
/// by always reporting that migrations are not supported.
#[derive(Debug)]
struct WithoutMigrations<T>(T);
// Every `ServiceFactory` method is forwarded unchanged to the wrapped factory.
impl<T: ServiceFactory> ServiceFactory for WithoutMigrations<T> {
    /// Returns the artifact ID reported by the wrapped factory.
    fn artifact_id(&self) -> ArtifactId {
        let Self(inner) = self;
        inner.artifact_id()
    }

    /// Returns the Protobuf specification reported by the wrapped factory.
    fn artifact_protobuf_spec(&self) -> ArtifactProtobufSpec {
        let Self(inner) = self;
        inner.artifact_protobuf_spec()
    }

    /// Creates a service instance using the wrapped factory.
    fn create_instance(&self) -> Box<dyn Service> {
        let Self(inner) = self;
        inner.create_instance()
    }
}
// Factories registered without migration support never produce migration scripts.
impl<T> MigrateData for WithoutMigrations<T> {
    /// Always fails with `InitMigrationError::NotSupported`, regardless of the start version.
    fn migration_scripts(
        &self,
        _: &Version,
    ) -> Result<Vec<MigrationScript>, InitMigrationError> {
        let unsupported = InitMigrationError::NotSupported;
        Err(unsupported)
    }
}
/// Runtime hosting services written in Rust.
///
/// Instances are created through `RustRuntimeBuilder`, which supplies the set of
/// available artifacts.
#[derive(Debug)]
pub struct RustRuntime {
/// Blockchain handle; `None` until `Runtime::initialize` stores one.
blockchain: Option<Blockchain>,
/// Channel used to send the updated set of API endpoints.
api_notifier: mpsc::Sender<UpdateEndpoints>,
/// Factories for all artifacts this runtime is able to deploy, keyed by artifact ID.
available_artifacts: HashMap<ArtifactId, Box<dyn FactoryWithMigrations>>,
/// IDs of artifacts that have been deployed and not unloaded since.
deployed_artifacts: HashSet<ArtifactId>,
/// Currently started service instances, keyed by instance ID.
started_services: BTreeMap<InstanceId, Instance>,
/// Index from service name to instance ID for the started services.
started_services_by_name: HashMap<String, InstanceId>,
/// Whether the endpoint set must be re-sent on the next `push_api_changes` call.
changed_services_since_last_block: bool,
}
/// Builder for `RustRuntime`.
///
/// Collects service factories and then converts into a runtime via `build` or
/// `build_for_tests`.
#[derive(Debug, Default)]
pub struct RustRuntimeBuilder {
/// Factories registered so far, keyed by the artifact ID each factory reports.
available_artifacts: HashMap<ArtifactId, Box<dyn FactoryWithMigrations>>,
}
/// A service instance created by the runtime, together with its identifying data.
#[derive(Debug)]
struct Instance {
/// Numeric identifier of the instance.
id: InstanceId,
/// Name of the instance.
name: String,
/// The service object created by the artifact's factory.
service: Box<dyn Service>,
/// Artifact the service object was created from.
artifact_id: ArtifactId,
}
impl Instance {
    /// Builds a descriptor from the ID and name of this instance.
    fn descriptor(&self) -> InstanceDescriptor {
        let Self { id, name, .. } = self;
        InstanceDescriptor::new(*id, name)
    }
}
// Gives borrowed access to the service object held by the instance.
impl AsRef<dyn Service> for Instance {
    fn as_ref(&self) -> &dyn Service {
        &*self.service
    }
}
impl RustRuntimeBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_factory<S: ServiceFactory>(mut self, service_factory: S) -> Self {
let artifact = service_factory.artifact_id();
trace!(
"Added available artifact {} without migration support",
artifact
);
let service_factory = WithoutMigrations(service_factory);
self.available_artifacts
.insert(artifact, Box::new(service_factory));
self
}
pub fn with_migrating_factory<S>(mut self, service_factory: S) -> Self
where
S: ServiceFactory + MigrateData,
{
let artifact = service_factory.artifact_id();
trace!(
"Added available artifact {} with migration support",
artifact
);
self.available_artifacts
.insert(artifact, Box::new(service_factory));
self
}
pub fn build(self, api_notifier: mpsc::Sender<UpdateEndpoints>) -> RustRuntime {
RustRuntime {
blockchain: None,
api_notifier,
available_artifacts: self.available_artifacts,
deployed_artifacts: HashSet::new(),
started_services: BTreeMap::new(),
started_services_by_name: HashMap::new(),
changed_services_since_last_block: true,
}
}
pub fn build_for_tests(self) -> RustRuntime {
self.build(mpsc::channel(1).0)
}
}
impl RustRuntime {
pub const NAME: &'static str = "rust";
pub fn builder() -> RustRuntimeBuilder {
RustRuntimeBuilder::new()
}
fn assert_known_status(status: &InstanceStatus) {
match status {
InstanceStatus::Active
| InstanceStatus::Stopped
| InstanceStatus::Frozen
| InstanceStatus::Migrating(_) => (),
other => {
panic!(
"Received non-expected service status: {}; \
Rust runtime isn't prepared to process this action, \
probably Rust runtime is outdated relative to the core library",
other
);
}
}
}
fn blockchain(&self) -> &Blockchain {
self.blockchain
.as_ref()
.expect("Method called before Rust runtime is initialized")
}
fn add_started_service(&mut self, instance: Instance) {
self.started_services_by_name
.insert(instance.name.clone(), instance.id);
self.started_services.insert(instance.id, instance);
}
fn remove_started_service(&mut self, instance: &InstanceSpec) {
self.started_services_by_name.remove(&instance.name);
self.started_services.remove(&instance.id);
}
fn deploy(&mut self, artifact: &ArtifactId) -> Result<(), ExecutionError> {
if self.deployed_artifacts.contains(artifact) {
panic!(
"BUG: Core requested deploy of already deployed artifact {:?}",
artifact
);
}
if !self.available_artifacts.contains_key(artifact) {
let description = format!(
"Runtime failed to deploy artifact with id {}, \
it is not listed among available artifacts. Available artifacts: {}",
artifact,
self.artifacts_to_pretty_string()
);
return Err(Error::UnableToDeploy.with_description(description));
}
trace!("Deployed artifact: {}", artifact);
self.deployed_artifacts.insert(artifact.to_owned());
Ok(())
}
fn new_service(
&self,
artifact: &ArtifactId,
instance: &InstanceDescriptor,
) -> Result<Instance, ExecutionError> {
let factory = self.available_artifacts.get(artifact).unwrap_or_else(|| {
panic!(
"BUG: Core requested service instance start ({}) of not deployed artifact {}",
instance.name, artifact
);
});
let service = factory.create_instance();
Ok(Instance {
id: instance.id,
name: instance.name.to_owned(),
service,
artifact_id: artifact.to_owned(),
})
}
fn new_service_if_needed(
&self,
artifact: &ArtifactId,
descriptor: &InstanceDescriptor,
) -> Result<Option<Instance>, ExecutionError> {
if let Some(instance) = self.started_services.get(&descriptor.id) {
assert!(
instance.artifact_id == *artifact || artifact.is_upgrade_of(&instance.artifact_id),
"Mismatch between the requested artifact and the artifact associated \
with the running service {}. This is either a bug in the lifecycle \
workflow in the core, or this version of the Rust runtime is outdated \
compared to the core.",
descriptor
);
if instance.artifact_id == *artifact {
return Ok(None);
}
}
Some(self.new_service(artifact, descriptor)).transpose()
}
fn api_endpoints(&self) -> Vec<(String, ApiBuilder)> {
self.started_services
.values()
.map(|instance| {
let mut builder = ServiceApiBuilder::new(
self.blockchain().clone(),
instance.descriptor(),
instance.artifact_id.clone(),
);
instance.as_ref().wire_api(&mut builder);
let root_path = builder
.take_root_path()
.unwrap_or_else(|| ["services/", &instance.name].concat());
(root_path, ApiBuilder::from(builder))
})
.chain(self::runtime_api::endpoints(self))
.collect()
}
fn push_api_changes(&mut self) {
if self.changed_services_since_last_block {
let user_endpoints = self.api_endpoints();
if !self.api_notifier.is_closed() {
let send_task = self.api_notifier.send(UpdateEndpoints::new(user_endpoints));
executor::block_on(send_task).ok();
}
}
self.changed_services_since_last_block = false;
}
fn artifacts_to_pretty_string(&self) -> String {
if self.available_artifacts.is_empty() {
return "None".to_string();
}
self.available_artifacts
.keys()
.map(ToString::to_string)
.collect::<Vec<String>>()
.join(", ")
}
}
// Ties the runtime to the numeric identifier of `RuntimeIdentifier::Rust`.
impl WellKnownRuntime for RustRuntime {
const ID: u32 = RuntimeIdentifier::Rust as u32;
}
impl Runtime for RustRuntime {
    /// Stores a clone of the blockchain handle for later use by the runtime.
    fn initialize(&mut self, blockchain: &Blockchain) {
        self.blockchain = Some(blockchain.clone());
    }

    /// Reports which optional runtime features are supported; only freezing services is.
    fn is_supported(&self, feature: &RuntimeFeature) -> bool {
        match feature {
            RuntimeFeature::FreezingServices => true,
            _ => false,
        }
    }

    /// Pushes pending API changes once the runtime is resumed.
    fn on_resume(&mut self) {
        self.push_api_changes();
    }

    /// Deploys `artifact`, returning the outcome as an already resolved receiver.
    ///
    /// The deploy spec must be empty; a non-empty spec is rejected with
    /// `Error::IncorrectArtifactId`.
    fn deploy_artifact(&mut self, artifact: ArtifactId, spec: Vec<u8>) -> Receiver {
        let result = if spec.is_empty() {
            self.deploy(&artifact)
        } else {
            Err(Error::IncorrectArtifactId.into())
        };
        Receiver::with_result(result)
    }

    /// Returns `true` if the artifact is currently marked as deployed.
    fn is_artifact_deployed(&self, id: &ArtifactId) -> bool {
        self.deployed_artifacts.contains(id)
    }

    /// Removes `artifact` from the set of deployed artifacts.
    ///
    /// In builds with debug assertions enabled, panics if the artifact was not deployed.
    fn unload_artifact(&mut self, artifact: &ArtifactId) {
        let was_present = self.deployed_artifacts.remove(artifact);
        debug_assert!(
            was_present,
            "Requested to unload non-existing artifact `{}`",
            artifact
        );
    }

    /// Creates a temporary service instance and runs its `initialize` hook.
    ///
    /// The instance is not stored in the runtime here. Panics raised by the hook are
    /// routed through `catch_panic`.
    fn initiate_adding_service(
        &self,
        context: ExecutionContext<'_>,
        artifact: &ArtifactId,
        parameters: Vec<u8>,
    ) -> Result<(), ExecutionError> {
        let instance = self.new_service(artifact, context.instance())?;
        let service = instance.as_ref();
        catch_panic(|| service.initialize(context, parameters))
    }

    /// Runs the `resume` hook of the service.
    ///
    /// The hook is invoked on a newly created instance when one is needed (see
    /// `new_service_if_needed`), and on the already started instance otherwise.
    fn initiate_resuming_service(
        &self,
        context: ExecutionContext<'_>,
        artifact: &ArtifactId,
        parameters: Vec<u8>,
    ) -> Result<(), ExecutionError> {
        let maybe_instance = self.new_service_if_needed(artifact, context.instance())?;
        let service = if let Some(ref instance) = maybe_instance {
            instance.as_ref()
        } else {
            // `new_service_if_needed` returns `None` only when a started instance with this
            // ID exists, so the lookup below cannot fail.
            self.started_services[&context.instance().id].as_ref()
        };
        catch_panic(|| service.resume(context, parameters))
    }

    /// Synchronizes the set of started services with the new state of a service.
    ///
    /// If the status provides read access and the service has an associated artifact,
    /// the service is (re)instantiated when needed; otherwise it is removed from the
    /// started services. The "services changed" flag is raised whenever the set of
    /// started services was actually modified.
    ///
    /// # Panics
    ///
    /// Panics if the state carries no status, if the status is unknown to this runtime,
    /// or if a required service instance cannot be created.
    fn update_service_status(&mut self, _snapshot: &dyn Snapshot, state: &InstanceState) {
        const CANNOT_INSTANTIATE_SERVICE: &str =
            "BUG: Attempt to create a new service instance failed; \
             within `initiate_adding_service` we were able to create a new instance, \
             but now we are not.";

        let status = state
            .status
            .as_ref()
            .expect("Rust runtime does not support removing service status");
        Self::assert_known_status(status);

        // The service stays (or becomes) started only if its status provides read access
        // and it still has an associated artifact.
        let artifact_to_run = if status.provides_read_access() {
            state.associated_artifact()
        } else {
            None
        };

        let service_api_changed = if let Some(artifact) = artifact_to_run {
            let maybe_instance = self
                .new_service_if_needed(artifact, &state.spec.as_descriptor())
                .expect(CANNOT_INSTANTIATE_SERVICE);
            if let Some(instance) = maybe_instance {
                // A new instance was created (either a first start or a newer artifact),
                // so the API has to be rebuilt.
                self.add_started_service(instance);
                true
            } else {
                false
            }
        } else {
            // Switch the service off; the API changes only if it was actually started.
            let was_started = self.started_services.contains_key(&state.spec.id);
            self.remove_started_service(&state.spec);
            was_started
        };

        self.changed_services_since_last_block |= service_api_changed;
    }

    /// Returns the first migration script offered by `new_artifact` for `data_version`,
    /// or `None` if the artifact offers no scripts.
    ///
    /// # Panics
    ///
    /// Panics if `new_artifact` is not among the available artifacts.
    fn migrate(
        &self,
        new_artifact: &ArtifactId,
        data_version: &Version,
    ) -> Result<Option<MigrationScript>, InitMigrationError> {
        let artifact = self
            .available_artifacts
            .get(new_artifact)
            .unwrap_or_else(|| {
                panic!(
                    "BUG: `migrate` call to a non-existing artifact {:?}",
                    new_artifact
                );
            });
        let scripts = artifact.migration_scripts(data_version)?;
        // Only the first script is returned; the remaining ones are dropped.
        Ok(scripts.into_iter().next())
    }

    /// Dispatches a call to the started service addressed by the execution context.
    ///
    /// # Panics
    ///
    /// Panics if no started service has the instance ID from the context.
    fn execute(
        &self,
        context: ExecutionContext<'_>,
        method_id: MethodId,
        payload: &[u8],
    ) -> Result<(), ExecutionError> {
        let instance = self
            .started_services
            .get(&context.instance().id)
            .expect("BUG: an attempt to execute transaction of unknown service.");
        catch_panic(|| instance.as_ref().call(context, method_id, payload))
    }

    /// Runs the `before_transactions` hook of the service addressed by the context.
    ///
    /// # Panics
    ///
    /// Panics if no started service has the instance ID from the context.
    fn before_transactions(&self, context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
        let instance = self
            .started_services
            .get(&context.instance().id)
            .expect("`before_transactions` called with non-existing `instance_id`");
        catch_panic(|| instance.as_ref().before_transactions(context))
    }

    /// Runs the `after_transactions` hook of the service addressed by the context.
    ///
    /// # Panics
    ///
    /// Panics if no started service has the instance ID from the context.
    fn after_transactions(&self, context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
        let instance = self
            .started_services
            .get(&context.instance().id)
            .expect("`after_transactions` called with non-existing `instance_id`");
        catch_panic(|| instance.as_ref().after_transactions(context))
    }

    /// Pushes pending API changes and then runs `after_commit` for every started service.
    ///
    /// The service hooks are skipped when the core schema reports height 0.
    fn after_commit(&mut self, snapshot: &dyn Snapshot, mailbox: &mut Mailbox) {
        self.push_api_changes();

        let core_schema = CoreSchema::new(snapshot);
        if core_schema.height() == Height(0) {
            return;
        }

        let blockchain = self.blockchain();
        let validator_id = core_schema.validator_id(blockchain.service_keypair().public_key());
        for service in self.started_services.values() {
            service.as_ref().after_commit(AfterCommitContext::new(
                mailbox,
                service.descriptor(),
                snapshot,
                blockchain.service_keypair(),
                blockchain.sender(),
                validator_id,
            ));
        }
    }
}