#![warn(missing_debug_implementations, missing_docs)]
#![deny(unsafe_code, bare_trait_objects)]
pub use exonum::runtime::ExecutionContext;
pub use self::{
error::Error,
runtime_api::{ArtifactProtobufSpec, ProtoSourceFile, ProtoSourcesQuery},
service::{
AfterCommitContext, Broadcaster, DefaultInstance, Service, ServiceDispatcher,
ServiceFactory,
},
stubs::{FallthroughAuth, GenericCall, GenericCallMut, Interface, MethodDescriptor, TxStub},
};
pub mod api;
use exonum::{
blockchain::{Blockchain, Schema as CoreSchema},
helpers::Height,
merkledb::Snapshot,
runtime::{
catch_panic,
migrations::{InitMigrationError, MigrateData, MigrationScript},
versioning::Version,
ArtifactId, ExecutionError, ExecutionFail, InstanceDescriptor, InstanceId, InstanceSpec,
InstanceStatus, Mailbox, MethodId, Runtime, RuntimeIdentifier, WellKnownRuntime,
},
};
use exonum_api::{ApiBuilder, UpdateEndpoints};
use futures::{future, sync::mpsc, Future, IntoFuture, Sink};
use log::trace;
use std::collections::{BTreeMap, HashMap, HashSet};
use self::api::ServiceApiBuilder;
mod error;
mod runtime_api;
mod service;
mod stubs;
// Re-exports selected `exonum::runtime` items through this crate; hidden from the generated docs.
// NOTE(review): presumably these exist so that generated (macro) code can name the items via this
// crate instead of depending on `exonum` directly — confirm against the derive crate.
#[doc(hidden)]
pub mod _reexports {
pub use exonum::runtime::{
ArtifactId, CommonError, ExecutionContext, ExecutionError, MethodId, RuntimeIdentifier,
};
}
/// Combination of `ServiceFactory` and `MigrateData` under a single trait name, so that
/// a factory can be stored as one trait object (`Box<dyn FactoryWithMigrations>`).
trait FactoryWithMigrations: ServiceFactory + MigrateData {}

/// Every type that is both a service factory and a migration provider qualifies automatically.
impl<T> FactoryWithMigrations for T where T: ServiceFactory + MigrateData {}
/// Wrapper around a service factory `T` whose `MigrateData` implementation always reports
/// that data migrations are not supported.
#[derive(Debug)]
struct WithoutMigrations<T>(T);
impl<T: ServiceFactory> ServiceFactory for WithoutMigrations<T> {
fn artifact_id(&self) -> ArtifactId {
self.0.artifact_id()
}
fn artifact_protobuf_spec(&self) -> ArtifactProtobufSpec {
self.0.artifact_protobuf_spec()
}
fn create_instance(&self) -> Box<dyn Service> {
self.0.create_instance()
}
}
/// Migration stub: a wrapped factory never provides migration scripts.
impl<T> MigrateData for WithoutMigrations<T> {
    /// Always fails with `InitMigrationError::NotSupported`, whatever the start version is.
    fn migration_scripts(
        &self,
        _start_version: &Version,
    ) -> Result<Vec<MigrationScript>, InitMigrationError> {
        let unsupported = InitMigrationError::NotSupported;
        Err(unsupported)
    }
}
/// Runtime that hosts services produced by Rust `ServiceFactory` implementations.
///
/// The runtime is constructed through `RustRuntimeBuilder`. It keeps track of the artifacts
/// that are available and deployed, of the service instances that are currently started, and
/// sends updated API endpoints over a channel when the set of services has changed.
#[derive(Debug)]
pub struct RustRuntime {
// Blockchain handle; `None` until `Runtime::initialize` stores a clone of it.
blockchain: Option<Blockchain>,
// Channel over which `UpdateEndpoints` messages with the current endpoints are sent.
api_notifier: mpsc::Sender<UpdateEndpoints>,
// Service factories registered via the builder, keyed by their artifact ID.
available_artifacts: HashMap<ArtifactId, Box<dyn FactoryWithMigrations>>,
// Artifacts that have been successfully deployed.
deployed_artifacts: HashSet<ArtifactId>,
// Started service instances, keyed by instance ID.
started_services: BTreeMap<InstanceId, Instance>,
// Index from instance name to instance ID, maintained together with `started_services`.
started_services_by_name: HashMap<String, InstanceId>,
// Set when a service status update is processed; reset by `push_api_changes`.
changed_services_since_last_block: bool,
}
/// Builder for `RustRuntime` that collects the service factories the runtime will be able
/// to deploy.
#[derive(Debug, Default)]
pub struct RustRuntimeBuilder {
// Registered factories, keyed by the artifact ID each factory reports.
available_artifacts: HashMap<ArtifactId, Box<dyn FactoryWithMigrations>>,
}
/// A service object together with the numeric ID and name it was instantiated under.
#[derive(Debug)]
struct Instance {
// Numeric identifier of the service instance.
id: InstanceId,
// Name of the service instance.
name: String,
// The service object created by the artifact's factory.
service: Box<dyn Service>,
}
impl Instance {
fn new(id: InstanceId, name: String, service: Box<dyn Service>) -> Self {
Self { id, name, service }
}
fn descriptor(&self) -> InstanceDescriptor {
InstanceDescriptor::new(self.id, &self.name)
}
}
/// Gives borrowed access to the service object held by an `Instance`.
impl AsRef<dyn Service> for Instance {
    fn as_ref(&self) -> &dyn Service {
        // Reborrow through the box instead of going via `Box::as_ref`.
        &*self.service
    }
}
impl RustRuntimeBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_factory<S: ServiceFactory>(mut self, service_factory: S) -> Self {
let artifact = service_factory.artifact_id();
trace!(
"Added available artifact {} without migration support",
artifact
);
let service_factory = WithoutMigrations(service_factory);
self.available_artifacts
.insert(artifact, Box::new(service_factory));
self
}
pub fn with_migrating_factory<S>(mut self, service_factory: S) -> Self
where
S: ServiceFactory + MigrateData,
{
let artifact = service_factory.artifact_id();
trace!(
"Added available artifact {} with migration support",
artifact
);
self.available_artifacts
.insert(artifact, Box::new(service_factory));
self
}
pub fn build(self, api_notifier: mpsc::Sender<UpdateEndpoints>) -> RustRuntime {
RustRuntime {
blockchain: None,
api_notifier,
available_artifacts: self.available_artifacts,
deployed_artifacts: Default::default(),
started_services: Default::default(),
started_services_by_name: Default::default(),
changed_services_since_last_block: false,
}
}
pub fn build_for_tests(self) -> RustRuntime {
self.build(mpsc::channel(1).0)
}
}
impl RustRuntime {
    /// Name of the Rust runtime.
    pub const NAME: &'static str = "rust";

    /// Returns a builder with no registered service factories.
    pub fn builder() -> RustRuntimeBuilder {
        RustRuntimeBuilder::new()
    }

    /// Returns the blockchain handle stored by `Runtime::initialize`.
    ///
    /// # Panics
    ///
    /// Panics if the runtime has not been initialized yet.
    fn blockchain(&self) -> &Blockchain {
        self.blockchain
            .as_ref()
            .expect("Method called before Rust runtime is initialized")
    }

    /// Records `instance` as started, indexing it both by name and by ID.
    fn add_started_service(&mut self, instance: Instance) {
        self.started_services_by_name
            .insert(instance.name.clone(), instance.id);
        self.started_services.insert(instance.id, instance);
    }

    /// Removes the service described by `instance` from both indexes.
    ///
    /// Does nothing for a service that is not currently started.
    fn remove_started_service(&mut self, instance: &InstanceSpec) {
        self.started_services_by_name.remove(&instance.name);
        self.started_services.remove(&instance.id);
    }

    /// Marks `artifact` as deployed.
    ///
    /// # Errors
    ///
    /// Returns `Error::UnableToDeploy` (with a description listing the available artifacts)
    /// if no factory has been registered for `artifact`.
    ///
    /// # Panics
    ///
    /// Panics if `artifact` is already deployed.
    fn deploy(&mut self, artifact: &ArtifactId) -> Result<(), ExecutionError> {
        // `artifact` is already a reference; borrowing it again is unnecessary.
        if self.deployed_artifacts.contains(artifact) {
            panic!(
                "BUG: Core requested deploy of already deployed artifact {:?}",
                artifact
            );
        }
        if !self.available_artifacts.contains_key(artifact) {
            let description = format!(
                "Runtime failed to deploy artifact with id {}, \
                 it is not listed among available artifacts. Available artifacts: {}",
                artifact,
                self.artifacts_to_pretty_string()
            );
            return Err(Error::UnableToDeploy.with_description(description));
        }

        trace!("Deployed artifact: {}", artifact);
        self.deployed_artifacts.insert(artifact.clone());
        Ok(())
    }

    /// Creates (but does not register) a service instance for `artifact` using the ID and
    /// name from `instance`.
    ///
    /// # Panics
    ///
    /// Panics if `artifact` is not deployed, or if a started service already uses the same
    /// ID or the same name.
    fn new_service(
        &self,
        artifact: &ArtifactId,
        instance: &InstanceDescriptor,
    ) -> Result<Instance, ExecutionError> {
        if !self.deployed_artifacts.contains(artifact) {
            panic!(
                "BUG: Core requested service instance start ({}) of not deployed artifact {}",
                instance.name, artifact
            );
        }
        if self.started_services.contains_key(&instance.id) {
            panic!(
                "BUG: Core requested service instance start ({}) with already taken ID",
                instance
            );
        }
        if self.started_services_by_name.contains_key(&instance.name) {
            panic!(
                "BUG: Core requested service instance start ({}) with already taken name",
                instance
            );
        }

        // Indexing panics for a missing key, i.e. if a deployed artifact has no factory.
        let service = self.available_artifacts[artifact].create_instance();
        Ok(Instance::new(
            instance.id,
            instance.name.to_owned(),
            service,
        ))
    }

    /// Collects `(root path, API builder)` pairs for every started service, followed by the
    /// endpoints produced by `runtime_api::endpoints`.
    ///
    /// A service that does not set its own root path is served under `services/<name>`.
    fn api_endpoints(&self) -> Vec<(String, ApiBuilder)> {
        self.started_services
            .values()
            .map(|instance| {
                // The blockchain handle is fetched per service, so it is only required
                // when at least one service is started.
                let mut builder =
                    ServiceApiBuilder::new(self.blockchain().clone(), instance.descriptor());
                instance.as_ref().wire_api(&mut builder);
                let root_path = builder
                    .take_root_path()
                    .unwrap_or_else(|| ["services/", &instance.name].concat());
                (root_path, ApiBuilder::from(builder))
            })
            .chain(self::runtime_api::endpoints(self))
            .collect()
    }

    /// Sends the current endpoints to the API notifier if any service status update was
    /// processed since the previous call, then resets the change flag.
    fn push_api_changes(&mut self) {
        if self.changed_services_since_last_block {
            let user_endpoints = self.api_endpoints();
            if !self.api_notifier.is_closed() {
                // Delivery is best-effort: a failed send is deliberately ignored.
                self.api_notifier
                    .clone()
                    .send(UpdateEndpoints {
                        endpoints: user_endpoints,
                    })
                    .wait()
                    .ok();
            }
        }
        self.changed_services_since_last_block = false;
    }

    /// Renders the IDs of all available artifacts as a comma-separated list,
    /// or `"None"` if there are no available artifacts.
    fn artifacts_to_pretty_string(&self) -> String {
        if self.available_artifacts.is_empty() {
            return "None".to_string();
        }

        self.available_artifacts
            .keys()
            .map(ToString::to_string)
            .collect::<Vec<_>>()
            .join(", ")
    }
}
// Declares the well-known runtime ID of the Rust runtime as the numeric value of
// `RuntimeIdentifier::Rust`.
impl WellKnownRuntime for RustRuntime {
const ID: u32 = RuntimeIdentifier::Rust as u32;
}
impl Runtime for RustRuntime {
    // Stores a clone of the blockchain handle; `blockchain()` panics until this has run.
    fn initialize(&mut self, blockchain: &Blockchain) {
        self.blockchain = Some(blockchain.clone());
    }

    // Publishes the endpoints of the services whose statuses were reported so far.
    fn on_resume(&mut self) {
        self.push_api_changes();
    }

    // Deploys `artifact`. A non-empty `spec` is rejected with `Error::IncorrectArtifactId`;
    // otherwise the outcome of `deploy` is returned as an already-resolved future.
    fn deploy_artifact(
        &mut self,
        artifact: ArtifactId,
        spec: Vec<u8>,
    ) -> Box<dyn Future<Item = (), Error = ExecutionError>> {
        if spec.is_empty() {
            Box::new(self.deploy(&artifact).into_future())
        } else {
            Box::new(future::err(Error::IncorrectArtifactId.into()))
        }
    }

    fn is_artifact_deployed(&self, id: &ArtifactId) -> bool {
        self.deployed_artifacts.contains(id)
    }

    // Creates a temporary service instance and runs its `initialize` hook. The instance is
    // not registered here; registration happens in `update_service_status`.
    fn initiate_adding_service(
        &self,
        context: ExecutionContext<'_>,
        artifact: &ArtifactId,
        parameters: Vec<u8>,
    ) -> Result<(), ExecutionError> {
        let instance = self.new_service(artifact, context.instance())?;
        let service = instance.as_ref();
        // The service hook is executed inside `catch_panic`.
        catch_panic(|| service.initialize(context, parameters))
    }

    // Creates a temporary service instance and runs its `resume` hook. As with
    // `initiate_adding_service`, the instance is not registered here.
    fn initiate_resuming_service(
        &self,
        context: ExecutionContext<'_>,
        artifact: &ArtifactId,
        parameters: Vec<u8>,
    ) -> Result<(), ExecutionError> {
        let instance = self.new_service(artifact, context.instance())?;
        let service = instance.as_ref();
        catch_panic(|| service.resume(context, parameters))
    }

    // Reacts to a service status change:
    // - `Active`: creates the instance and registers it as started;
    // - `Stopped`: removes the instance from the started services;
    // - `Migrating`: nothing to do;
    // - anything else: panics, since this runtime does not know the status.
    // Unless it panics, the call marks the set of services as changed so that the next
    // `push_api_changes` publishes the endpoints.
    fn update_service_status(
        &mut self,
        _snapshot: &dyn Snapshot,
        spec: &InstanceSpec,
        status: &InstanceStatus,
    ) {
        match status {
            InstanceStatus::Active => {
                let instance = self
                    .new_service(&spec.artifact, &spec.as_descriptor())
                    .expect(
                        "BUG: Attempt to create a new service instance failed; \
                         within `initiate_adding_service` we were able to create a new instance, \
                         but now we are not.",
                    );
                self.add_started_service(instance);
            }
            InstanceStatus::Stopped => {
                self.remove_started_service(spec);
            }
            InstanceStatus::Migrating(_) => {
                // No bookkeeping is performed for this status.
            }
            other => {
                panic!(
                    "Received non-expected service status: {}; \
                     Rust runtime isn't prepared to process this action, \
                     probably Rust runtime is outdated relative to the core library",
                    other
                );
            }
        }
        self.changed_services_since_last_block = true;
    }

    // Returns the first migration script that the factory of `new_artifact` offers for
    // `data_version`, or `None` if the factory offers no scripts. Panics if no factory is
    // registered for `new_artifact`.
    fn migrate(
        &self,
        new_artifact: &ArtifactId,
        data_version: &Version,
    ) -> Result<Option<MigrationScript>, InitMigrationError> {
        let artifact = self
            .available_artifacts
            .get(new_artifact)
            .unwrap_or_else(|| {
                panic!(
                    "BUG: `migrate` call to a non-existing artifact {:?}",
                    new_artifact
                )
            });
        let scripts = artifact.migration_scripts(data_version)?;
        // Only the first script is used; the rest of the list is dropped.
        Ok(scripts.into_iter().next())
    }

    // Dispatches a call to the started service addressed by `context`. Panics if no such
    // service is started; the call itself runs inside `catch_panic`.
    fn execute(
        &self,
        context: ExecutionContext<'_>,
        method_id: MethodId,
        payload: &[u8],
    ) -> Result<(), ExecutionError> {
        let instance = self
            .started_services
            .get(&context.instance().id)
            .expect("BUG: an attempt to execute transaction of unknown service.");
        catch_panic(|| instance.as_ref().call(context, method_id, payload))
    }

    // Runs the `before_transactions` hook of the service addressed by `context`.
    fn before_transactions(&self, context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
        let instance = self
            .started_services
            .get(&context.instance().id)
            .expect("`before_transactions` called with non-existing `instance_id`");
        catch_panic(|| instance.as_ref().before_transactions(context))
    }

    // Runs the `after_transactions` hook of the service addressed by `context`.
    fn after_transactions(&self, context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
        let instance = self
            .started_services
            .get(&context.instance().id)
            .expect("`after_transactions` called with non-existing `instance_id`");
        catch_panic(|| instance.as_ref().after_transactions(context))
    }

    // Publishes pending API changes and then invokes the `after_commit` hook of every
    // started service. The hooks are skipped while the blockchain height is 0.
    fn after_commit(&mut self, snapshot: &dyn Snapshot, mailbox: &mut Mailbox) {
        self.push_api_changes();

        let core_schema = CoreSchema::new(snapshot);
        if core_schema.height() == Height(0) {
            return;
        }

        let blockchain = self.blockchain();
        let validator_id = core_schema.validator_id(blockchain.service_keypair().public_key());
        for service in self.started_services.values() {
            service.as_ref().after_commit(AfterCommitContext::new(
                mailbox,
                service.descriptor(),
                snapshot,
                blockchain.service_keypair(),
                blockchain.sender(),
                validator_id,
            ));
        }
    }
}