use exonum::{
blockchain::{config::InstanceInitParams, ApiSender, SendError},
crypto::{Hash, KeyPair, PublicKey},
helpers::{Height, ValidatorId},
merkledb::{access::Prefixed, BinaryValue, ObjectHash, Snapshot},
runtime::{
ArtifactId, BlockchainData, DispatcherAction, ExecutionContext, ExecutionError,
InstanceDescriptor, InstanceId, InstanceStatus, Mailbox, MethodId, SnapshotExt,
},
};
use futures::{
executor::block_on,
future::{BoxFuture, FutureExt},
};
use std::fmt::{self, Debug};
use super::{api::ServiceApiBuilder, ArtifactProtobufSpec, GenericCall, MethodDescriptor};
/// Routes a call to a concrete method of a service.
///
/// The `Send` supertrait requires implementors to be transferable between threads.
pub trait ServiceDispatcher: Send {
/// Invokes the service method identified by `method`.
///
/// `context` is the execution context handed to the method and `payload` is the raw
/// byte slice accompanying the call.
/// NOTE(review): presumably `payload` holds the serialized method arguments and the
/// implementor is responsible for decoding them — confirm against the implementations.
///
/// # Errors
///
/// Returns an `ExecutionError` if the call fails.
fn call(
&self,
context: ExecutionContext<'_>,
method: MethodId,
payload: &[u8],
) -> Result<(), ExecutionError>;
}
/// Lifecycle hooks of a service.
///
/// Every hook has a default implementation that does nothing (returning `Ok(())` where
/// a result is expected), so an implementor overrides only the hooks it needs.
pub trait Service: ServiceDispatcher + Debug + 'static {
/// Hook receiving an execution context and raw parameter bytes; succeeds without doing
/// anything by default.
/// NOTE(review): presumably called once when a service instance is first added to the
/// blockchain, with `_params` being serialized constructor arguments — confirm.
fn initialize(
&self,
_context: ExecutionContext<'_>,
_params: Vec<u8>,
) -> Result<(), ExecutionError> {
Ok(())
}
/// Hook receiving an execution context and raw parameter bytes; succeeds without doing
/// anything by default.
/// NOTE(review): presumably called when a previously stopped or frozen instance is
/// resumed — confirm against the dispatcher.
fn resume(
&self,
_context: ExecutionContext<'_>,
_params: Vec<u8>,
) -> Result<(), ExecutionError> {
Ok(())
}
/// Hook receiving an execution context; succeeds without doing anything by default.
/// NOTE(review): the name suggests it runs before the transactions of a block are
/// executed — confirm.
fn before_transactions(&self, _context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
Ok(())
}
/// Hook receiving an execution context; succeeds without doing anything by default.
/// NOTE(review): the name suggests it runs after the transactions of a block are
/// executed — confirm.
fn after_transactions(&self, _context: ExecutionContext<'_>) -> Result<(), ExecutionError> {
Ok(())
}
/// Hook receiving an `AfterCommitContext`; does nothing by default. Unlike the other
/// hooks it returns no result, so it cannot report an execution error.
/// NOTE(review): presumably invoked after a block has been committed — confirm.
fn after_commit(&self, _context: AfterCommitContext<'_>) {}
/// Hook receiving a mutable `ServiceApiBuilder`; does nothing by default.
/// NOTE(review): presumably used to register the API endpoints of the service — confirm.
fn wire_api(&self, _builder: &mut ServiceApiBuilder) {}
}
/// Factory producing service instances for a particular artifact.
pub trait ServiceFactory: Send + Debug + 'static {
/// Returns the identifier of the artifact this factory corresponds to.
fn artifact_id(&self) -> ArtifactId;
/// Returns the Protobuf specification associated with the artifact.
/// NOTE(review): presumably the Protobuf sources used by the service — confirm.
fn artifact_protobuf_spec(&self) -> ArtifactProtobufSpec;
/// Creates a new service instance as a boxed trait object.
fn create_instance(&self) -> Box<dyn Service>;
}
// NOTE(review): the lint is silenced for this impl, presumably because of a false positive
// on `Box<dyn ServiceFactory>` versus `Self` — confirm before removing the attribute.
#[allow(clippy::use_self)]
impl<T> From<T> for Box<dyn ServiceFactory>
where
    T: ServiceFactory,
{
    /// Boxes a concrete factory, erasing its type behind `dyn ServiceFactory`.
    fn from(factory: T) -> Self {
        // The return type drives the unsized coercion to the trait object.
        Box::new(factory)
    }
}
/// Extension of `ServiceFactory` for services that have a canonical ("default") instance
/// with a fixed identifier and name.
pub trait DefaultInstance: ServiceFactory {
    /// Identifier used for the default instance.
    const INSTANCE_ID: InstanceId;
    /// Name used for the default instance.
    const INSTANCE_NAME: &'static str;

    /// Builds the initialization parameters of the default instance from the factory's
    /// artifact ID combined with `INSTANCE_ID` and `INSTANCE_NAME`.
    fn default_instance(&self) -> InstanceInitParams {
        let artifact = self.artifact_id();
        artifact.into_default_instance(Self::INSTANCE_ID, Self::INSTANCE_NAME)
    }
}
/// Context passed to the `after_commit` hook of a service.
///
/// Bundles a storage snapshot, a transaction broadcaster bound to the service, the
/// validator ID of the node (if any) and the service status read when the context is built.
pub struct AfterCommitContext<'a> {
// Queue of dispatcher actions; exposed only through `supervisor_extensions`.
mailbox: &'a mut Mailbox,
// Read-only view of the storage backing `data()` and the accessors built on it.
snapshot: &'a dyn Snapshot,
// Broadcaster holding the instance descriptor, service keypair and transaction sender.
broadcaster: Broadcaster,
// NOTE(review): presumably `None` when the node is not a validator — confirm with caller.
validator_id: Option<ValidatorId>,
// Status of the service instance, looked up from the dispatcher data in `new`.
status: InstanceStatus,
}
impl<'a> AfterCommitContext<'a> {
    /// Creates a context for the service described by `instance`.
    ///
    /// The service status is read from the dispatcher data in `snapshot`; the keypair and
    /// the transaction sender are cloned into an owned `Broadcaster`.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher data holds no state for the instance, or if that state
    /// has no status. Both cases are treated as bugs in the calling code.
    pub(crate) fn new(
        mailbox: &'a mut Mailbox,
        instance: InstanceDescriptor,
        snapshot: &'a dyn Snapshot,
        service_keypair: &'a KeyPair,
        tx_sender: &'a ApiSender,
        validator_id: Option<ValidatorId>,
    ) -> Self {
        let instance_state = match snapshot.for_dispatcher().get_instance(instance.id) {
            Some(state) => state,
            None => panic!("BUG: Cannot find instance state for service `{}`", instance),
        };
        let status = instance_state
            .status
            .expect("BUG: status for a service receiving `after_commit` hook cannot be `None`");
        let broadcaster = Broadcaster::new(instance, service_keypair.clone(), tx_sender.clone());

        Self {
            mailbox,
            snapshot,
            broadcaster,
            validator_id,
            status,
        }
    }

    /// Returns blockchain data for the snapshot, scoped by the name of the service instance.
    pub fn data(&self) -> BlockchainData<&'a dyn Snapshot> {
        let instance = self.broadcaster.instance();
        BlockchainData::new(self.snapshot, &instance.name)
    }

    /// Returns the data of the executing service, as exposed by
    /// `BlockchainData::for_executing_service`.
    pub fn service_data(&self) -> Prefixed<&'a dyn Snapshot> {
        let data = self.data();
        data.for_executing_service()
    }

    /// Returns the blockchain height as reported by the core schema of the snapshot.
    pub fn height(&self) -> Height {
        let data = self.data();
        let core_schema = data.for_core();
        core_schema.height()
    }

    /// Returns the public key of the service keypair.
    pub fn service_key(&self) -> PublicKey {
        self.broadcaster.keypair().public_key()
    }

    /// Returns the validator ID supplied at construction, if any.
    pub fn validator_id(&self) -> Option<ValidatorId> {
        self.validator_id
    }

    /// Returns the status of the service instance captured at construction.
    pub fn status(&self) -> &InstanceStatus {
        &self.status
    }

    /// Returns a clone of the transaction broadcaster, but only if a validator ID is set
    /// and the service status is active; otherwise returns `None`.
    pub fn broadcaster(&self) -> Option<Broadcaster> {
        match self.validator_id {
            Some(_) if self.status.is_active() => Some(self.broadcaster.clone()),
            _ => None,
        }
    }

    /// Returns a clone of the transaction broadcaster unconditionally, i.e. without the
    /// validator and status checks performed by `broadcaster`.
    pub fn generic_broadcaster(&self) -> Broadcaster {
        self.broadcaster.clone()
    }

    /// Returns supervisor-only extensions, or `None` if the instance ID of the service is
    /// not the supervisor instance ID.
    #[doc(hidden)]
    pub fn supervisor_extensions(&mut self) -> Option<SupervisorExtensions<'_>> {
        if is_supervisor(self.broadcaster.instance().id) {
            Some(SupervisorExtensions {
                // Reborrow so that the mailbox is usable again once the extensions are dropped.
                mailbox: &mut *self.mailbox,
            })
        } else {
            None
        }
    }
}
/// Transaction broadcaster tied to a particular service instance.
///
/// Owns everything needed to create and send a transaction: the instance descriptor,
/// the service keypair and the transaction sender.
#[derive(Debug, Clone)]
pub struct Broadcaster {
// Descriptor of the service instance; its `id` is used when building transactions.
instance: InstanceDescriptor,
// Keypair used to create the transaction message in `generic_call`.
service_keypair: KeyPair,
// Sender through which created transactions are broadcast.
tx_sender: ApiSender,
}
impl Broadcaster {
    /// Assembles a broadcaster from its parts.
    pub(super) fn new(
        instance: InstanceDescriptor,
        service_keypair: KeyPair,
        tx_sender: ApiSender,
    ) -> Self {
        Self {
            instance,
            service_keypair,
            tx_sender,
        }
    }

    /// Returns the descriptor of the service instance this broadcaster is bound to.
    pub(super) fn instance(&self) -> &InstanceDescriptor {
        &self.instance
    }

    /// Returns the service keypair held by this broadcaster.
    pub(super) fn keypair(&self) -> &KeyPair {
        &self.service_keypair
    }

    /// Consumes the broadcaster and wraps it into a `BlockingBroadcaster`, whose calls wait
    /// for the broadcast to complete instead of returning a future.
    pub fn blocking(self) -> BlockingBroadcaster {
        BlockingBroadcaster(self)
    }
}
impl GenericCall<()> for Broadcaster {
    /// Future resolving to the hash of the broadcast transaction, or to a `SendError`
    /// if the broadcast fails.
    type Output = BoxFuture<'static, Result<Hash, SendError>>;

    /// Creates a transaction message for `method` with `args`, addressed to the instance
    /// this broadcaster is bound to, and returns a future that broadcasts it.
    ///
    /// The message and its hash are computed eagerly, before the future is polled; only
    /// the broadcast itself is deferred.
    fn generic_call(&self, _ctx: (), method: MethodDescriptor<'_>, args: Vec<u8>) -> Self::Output {
        // `generic_call` takes the keypair by reference, so it is borrowed in place rather
        // than cloned; this avoids copying secret key material on every call.
        let msg = self
            .service_keypair
            .generic_call(self.instance().id, method, args);
        let tx_hash = msg.object_hash();
        // The future must be `'static`, hence it owns a clone of the sender.
        let tx_sender = self.tx_sender.clone();
        async move {
            tx_sender.broadcast_transaction(msg).await?;
            Ok(tx_hash)
        }
        .boxed()
    }
}
/// Wrapper around `Broadcaster` whose `generic_call` blocks the current thread until the
/// broadcast future completes, returning the result directly.
#[derive(Debug, Clone)]
pub struct BlockingBroadcaster(Broadcaster);
impl GenericCall<()> for BlockingBroadcaster {
type Output = Result<Hash, SendError>;
fn generic_call(&self, _ctx: (), method: MethodDescriptor<'_>, args: Vec<u8>) -> Self::Output {
block_on(self.0.generic_call((), method, args))
}
}
/// Extended functionality handed out by `AfterCommitContext::supervisor_extensions` only to
/// the service whose instance ID equals the supervisor instance ID.
#[derive(Debug)]
pub struct SupervisorExtensions<'a> {
// Queue onto which dispatcher actions requested by the supervisor are pushed.
mailbox: &'a mut Mailbox,
}
impl SupervisorExtensions<'_> {
    /// Queues a request to deploy `artifact` by pushing a `DispatcherAction::StartDeploy`
    /// onto the mailbox.
    ///
    /// * `artifact` - identifier of the artifact to deploy.
    /// * `spec` - deployment specification; it is serialized to bytes via `into_bytes`.
    /// * `then` - callback stored in the action; it receives a deployment result and returns
    ///   a result of its own.
    ///   NOTE(review): presumably the dispatcher invokes it once deployment finishes —
    ///   confirm against the dispatcher code.
    ///
    /// This method only enqueues the action; nothing is deployed by the call itself.
    pub fn start_deploy(
        &mut self,
        artifact: ArtifactId,
        spec: impl BinaryValue,
        then: impl FnOnce(Result<(), ExecutionError>) -> Result<(), ExecutionError> + Send + 'static,
    ) {
        let action = DispatcherAction::StartDeploy {
            artifact,
            spec: spec.into_bytes(),
            // `then` already has the required callable signature, so it is boxed directly
            // instead of being wrapped into a forwarding closure.
            then: Box::new(then),
        };
        self.mailbox.push(action);
    }
}
impl Debug for AfterCommitContext<'_> {
    /// Formats the context as a struct exposing only the instance descriptor; the other
    /// fields (mailbox, snapshot, keypair, sender, status) are left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("AfterCommitContext");
        repr.field("instance", self.broadcaster.instance());
        repr.finish()
    }
}
/// Returns `true` if `instance_id` equals the supervisor instance ID defined by the
/// `exonum::runtime` module.
fn is_supervisor(instance_id: InstanceId) -> bool {
    let supervisor_id: InstanceId = exonum::runtime::SUPERVISOR_INSTANCE_ID;
    supervisor_id == instance_id
}