#![cfg_attr(feature = "cargo-clippy", allow(block_in_if_condition_stmt))]
#[macro_use]
extern crate exonum;
#[macro_use]
extern crate log;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate iron;
extern crate router;
extern crate bodyparser;
extern crate params;
#[macro_use]
extern crate lazy_static;
use router::Router;
use iron::Handler;
use exonum::api::Api;
use exonum::blockchain::{StoredConfiguration, Service, Transaction, Schema, ApiContext, gen_prefix};
use exonum::node::State;
use exonum::helpers::fabric::{ServiceFactory, Context};
use exonum::crypto::{Signature, PublicKey, Hash};
use exonum::messages::{Message, FromRaw, RawTransaction};
use exonum::storage::{Fork, ProofListIndex, ProofMapIndex, Snapshot, StorageValue};
use exonum::encoding::{Field, Error as StreamStructError};
use exonum::encoding::serialize::json::reexport as serde_json;
pub mod config_api;
type ProposeData = StorageValueConfigProposeData;
pub const CONFIG_SERVICE: u16 = 1;
pub const CONFIG_PROPOSE_MESSAGE_ID: u16 = 0;
pub const CONFIG_VOTE_MESSAGE_ID: u16 = 1;
lazy_static! {
#[doc="
Specific [TxConfigVote](TxConfigVote.t.html) with all bytes in message set to 0.
It is used as placeholder in database for votes of validators, which didn't cast votes."]
pub static ref ZEROVOTE: TxConfigVote = TxConfigVote::new_with_signature(&PublicKey::zero(),
&Hash::zero(), &Signature::zero());
}
encoding_struct! {
struct StorageValueConfigProposeData {
const SIZE = 48;
field tx_propose: TxConfigPropose [00 => 8]
field votes_history_hash: &Hash [8 => 40]
field num_votes: u64 [40 => 48]
}
}
impl StorageValueConfigProposeData {
pub fn set_history_hash(&mut self, hash: &Hash) {
Field::write(&hash, &mut self.raw, 8, 40);
}
}
message! {
struct TxConfigPropose {
const TYPE = CONFIG_SERVICE;
const ID = CONFIG_PROPOSE_MESSAGE_ID;
const SIZE = 40;
field from: &PublicKey [00 => 32]
field cfg: &str [32 => 40]
}
}
message! {
struct TxConfigVote {
const TYPE = CONFIG_SERVICE;
const ID = CONFIG_VOTE_MESSAGE_ID;
const SIZE = 64;
field from: &PublicKey [00 => 32]
field cfg_hash: &Hash [32 => 64]
}
}
#[derive(Default)]
pub struct ConfigurationService {}
pub struct ConfigurationSchema<T> {
view: T,
}
impl<T> ConfigurationSchema<T>
where
T: AsRef<Snapshot>,
{
pub fn new(snapshot: T) -> ConfigurationSchema<T> {
ConfigurationSchema { view: snapshot }
}
pub fn propose_data_by_config_hash(&self) -> ProofMapIndex<&T, Hash, ProposeData> {
let prefix = gen_prefix(CONFIG_SERVICE, 0, &());
ProofMapIndex::new(prefix, &self.view)
}
pub fn config_hash_by_ordinal(&self) -> ProofListIndex<&T, Hash> {
let prefix = gen_prefix(CONFIG_SERVICE, 1, &());
ProofListIndex::new(prefix, &self.view)
}
pub fn votes_by_config_hash(&self, config_hash: &Hash) -> ProofListIndex<&T, TxConfigVote> {
let prefix = gen_prefix(CONFIG_SERVICE, 2, config_hash);
ProofListIndex::new(prefix, &self.view)
}
pub fn get_propose(&self, cfg_hash: &Hash) -> Option<TxConfigPropose> {
let option_propose_data_by_config_hash = self.propose_data_by_config_hash().get(cfg_hash);
option_propose_data_by_config_hash.map(|propose_data_by_config_hash| {
propose_data_by_config_hash.tx_propose()
})
}
#[cfg_attr(feature = "cargo-clippy", allow(let_and_return))]
pub fn get_votes(&self, cfg_hash: &Hash) -> Vec<Option<TxConfigVote>> {
let votes_table = self.votes_by_config_hash(cfg_hash);
let votes = votes_table
.into_iter()
.map(|vote| if vote == ZEROVOTE.clone() {
None
} else {
Some(vote)
})
.collect();
votes
}
pub fn state_hash(&self) -> Vec<Hash> {
vec![
self.propose_data_by_config_hash().root_hash(),
self.config_hash_by_ordinal().root_hash(),
]
}
}
impl<'a> ConfigurationSchema<&'a mut Fork> {
pub fn propose_data_by_config_hash_mut(
&mut self,
) -> ProofMapIndex<&mut Fork, Hash, ProposeData> {
let prefix = gen_prefix(CONFIG_SERVICE, 0, &());
ProofMapIndex::new(prefix, &mut self.view)
}
pub fn config_hash_by_ordinal_mut(&mut self) -> ProofListIndex<&mut Fork, Hash> {
let prefix = gen_prefix(CONFIG_SERVICE, 1, &());
ProofListIndex::new(prefix, &mut self.view)
}
pub fn votes_by_config_hash_mut(
&mut self,
config_hash: &Hash,
) -> ProofListIndex<&mut Fork, TxConfigVote> {
let prefix = gen_prefix(CONFIG_SERVICE, 2, config_hash);
ProofListIndex::new(prefix, &mut self.view)
}
pub fn put_propose(&mut self, tx_propose: TxConfigPropose) -> bool {
let cfg =
<StoredConfiguration as StorageValue>::from_bytes(tx_propose.cfg().as_bytes().into());
let cfg_hash = &StorageValue::hash(&cfg);
if let Some(old_tx_propose) = self.get_propose(cfg_hash) {
error!(
"Discarding TxConfigPropose:{} which contains an already posted config. \
Previous TxConfigPropose:{}",
serde_json::to_string(&tx_propose).unwrap(),
serde_json::to_string(&old_tx_propose).unwrap()
);
return false;
}
let prev_cfg = Schema::new(&self.view)
.configs()
.get(&cfg.previous_cfg_hash)
.expect(&format!(
"Previous cfg:{:?} unexpectedly not found for TxConfigPropose:{:?}",
&cfg.previous_cfg_hash,
serde_json::to_string(&tx_propose).unwrap()
));
let propose_data_by_config_hash = {
let mut votes_table = self.votes_by_config_hash_mut(cfg_hash);
debug_assert!(votes_table.is_empty());
let num_validators = prev_cfg.validator_keys.len();
for _ in 0..num_validators {
votes_table.push(ZEROVOTE.clone());
}
StorageValueConfigProposeData::new(
tx_propose,
&votes_table.root_hash(),
num_validators as u64,
)
};
{
let mut propose_data_by_config_hash_table = self.propose_data_by_config_hash_mut();
debug_assert!(propose_data_by_config_hash_table.get(cfg_hash).is_none());
propose_data_by_config_hash_table.put(cfg_hash, propose_data_by_config_hash);
}
self.config_hash_by_ordinal_mut().push(*cfg_hash);
true
}
pub fn put_vote(&mut self, tx_vote: TxConfigVote) -> bool {
let cfg_hash = tx_vote.cfg_hash();
let mut propose_data_by_config_hash = self.propose_data_by_config_hash()
.get(cfg_hash)
.expect(&format!(
"Corresponding propose unexpectedly not found for TxConfigVote:{:?}",
&tx_vote
));
let tx_propose = propose_data_by_config_hash.tx_propose();
let prev_cfg_hash = <StoredConfiguration as StorageValue>::from_bytes(
tx_propose.cfg().as_bytes().into(),
).previous_cfg_hash;
let prev_cfg = Schema::new(&self.view)
.configs()
.get(&prev_cfg_hash)
.expect(&format!(
"Previous cfg:{:?} unexpectedly not found for TxConfigVote:{:?}",
prev_cfg_hash,
&tx_vote
));
let from: &PublicKey = tx_vote.from();
let validator_id = prev_cfg
.validator_keys
.iter()
.position(|pk| pk.service_key == *from)
.expect(&format!(
"See !prev_cfg.validators.contains(self.from()) for \
TxConfigVote:{:?}",
&tx_vote
));
let res: bool;
{
let mut votes_for_cfg_table = self.votes_by_config_hash_mut(cfg_hash);
if votes_for_cfg_table.get(validator_id as u64).unwrap() == ZEROVOTE.clone() {
votes_for_cfg_table.set(validator_id as u64, tx_vote.clone());
propose_data_by_config_hash.set_history_hash(&votes_for_cfg_table.root_hash());
res = true;
} else {
res = false;
}
}
if res {
self.propose_data_by_config_hash_mut().put(
cfg_hash,
propose_data_by_config_hash,
);
}
res
}
}
impl<T> ConfigurationSchema<T> {
pub fn into_snapshot(self) -> T {
self.view
}
}
impl Transaction for TxConfigPropose {
fn verify(&self) -> bool {
self.verify_signature(self.from())
}
fn execute(&self, fork: &mut Fork) {
let following_config: Option<StoredConfiguration> = Schema::new(&fork)
.following_configuration();
if let Some(foll_cfg) = following_config {
error!(
"Discarding TxConfigPropose: {} as there is an already scheduled next config: \
{:?} ",
serde_json::to_string(self).unwrap(),
foll_cfg
);
return;
}
let actual_config: StoredConfiguration = Schema::new(&fork).actual_configuration();
if !actual_config.validator_keys.iter().any(|k| {
k.service_key == *self.from()
})
{
error!(
"Discarding TxConfigPropose:{} from unknown validator. ",
serde_json::to_string(self).unwrap()
);
return;
}
let config_candidate = StoredConfiguration::try_deserialize(self.cfg().as_bytes());
if config_candidate.is_err() {
error!(
"Discarding TxConfigPropose:{} which contains config, which cannot be parsed: \
{:?}",
serde_json::to_string(self).unwrap(),
config_candidate
);
return;
}
let actual_config_hash = actual_config.hash();
let config_candidate_body = config_candidate.unwrap();
if config_candidate_body.previous_cfg_hash != actual_config_hash {
error!(
"Discarding TxConfigPropose:{} which does not reference actual config: {:?}",
serde_json::to_string(self).unwrap(),
actual_config
);
return;
}
let current_height = Schema::new(&fork).current_height();
let actual_from = config_candidate_body.actual_from;
if actual_from <= current_height {
error!(
"Discarding TxConfigPropose:{} which has actual_from height less than or \
equal to current: {:?}",
serde_json::to_string(self).unwrap(),
current_height
);
return;
}
let result = ConfigurationSchema::new(fork).put_propose(self.clone());
if result {
trace!(
"Put TxConfigPropose:{} to config_proposes table",
serde_json::to_string(self).unwrap()
);
}
}
}
impl Transaction for TxConfigVote {
fn verify(&self) -> bool {
self.verify_signature(self.from())
}
fn execute(&self, fork: &mut Fork) {
let propose_option = ConfigurationSchema::new(&fork).get_propose(self.cfg_hash());
if propose_option.is_none() {
error!(
"Discarding TxConfigVote:{:?} which references unknown config hash",
self
);
return;
}
let following_config: Option<StoredConfiguration> = Schema::new(&fork)
.following_configuration();
if let Some(foll_cfg) = following_config {
error!(
"Discarding TxConfigVote: {:?} as there is an already scheduled next config: \
{:?} ",
self,
foll_cfg
);
return;
}
let actual_config: StoredConfiguration = Schema::new(&fork).actual_configuration();
if !actual_config.validator_keys.iter().any(|k| {
k.service_key == *self.from()
})
{
error!(
"Discarding TxConfigVote:{:?} from unknown validator. ",
self
);
return;
}
let referenced_tx_propose = propose_option.unwrap();
let parsed_config =
StoredConfiguration::try_deserialize(referenced_tx_propose.cfg().as_bytes()).unwrap();
let actual_config_hash = actual_config.hash();
if parsed_config.previous_cfg_hash != actual_config_hash {
error!(
"Discarding TxConfigVote:{:?}, whose corresponding TxConfigPropose:{} does \
not reference actual config: {:?}",
self,
serde_json::to_string(&referenced_tx_propose).unwrap(),
actual_config
);
return;
}
let current_height = Schema::new(&fork).current_height();
let actual_from = parsed_config.actual_from;
if actual_from <= current_height {
error!(
"Discarding TxConfigVote:{:?}, whose corresponding TxConfigPropose:{} has \
actual_from height less than or equal to current: {:?}",
self,
serde_json::to_string(&referenced_tx_propose).unwrap(),
current_height
);
return;
}
let mut configuration_schema = ConfigurationSchema::new(fork);
let result = configuration_schema.put_vote(self.clone());
if !result {
return;
}
trace!(
"Put TxConfigVote:{:?} to corresponding cfg votes_by_config_hash table",
self
);
let mut votes_count = 0;
{
for vote_option in configuration_schema.get_votes(self.cfg_hash()) {
if vote_option.is_some() {
votes_count += 1;
}
}
}
let fork = configuration_schema.into_snapshot();
if votes_count >= State::byzantine_majority_count(actual_config.validator_keys.len()) {
Schema::new(fork).commit_configuration(parsed_config);
}
}
}
impl ConfigurationService {
pub fn new() -> ConfigurationService {
ConfigurationService {}
}
}
impl Service for ConfigurationService {
fn service_name(&self) -> &'static str {
"configuration"
}
fn service_id(&self) -> u16 {
CONFIG_SERVICE
}
fn state_hash(&self, snapshot: &Snapshot) -> Vec<Hash> {
let schema = ConfigurationSchema::new(snapshot);
schema.state_hash()
}
fn tx_from_raw(&self, raw: RawTransaction) -> Result<Box<Transaction>, StreamStructError> {
match raw.message_type() {
CONFIG_PROPOSE_MESSAGE_ID => Ok(Box::new(TxConfigPropose::from_raw(raw)?)),
CONFIG_VOTE_MESSAGE_ID => Ok(Box::new(TxConfigVote::from_raw(raw)?)),
_ => Err(StreamStructError::IncorrectMessageType {
message_type: raw.message_type(),
}),
}
}
fn public_api_handler(&self, ctx: &ApiContext) -> Option<Box<Handler>> {
let mut router = Router::new();
let api = config_api::PublicConfigApi { blockchain: ctx.blockchain().clone() };
api.wire(&mut router);
Some(Box::new(router))
}
fn private_api_handler(&self, ctx: &ApiContext) -> Option<Box<Handler>> {
let mut router = Router::new();
let api = config_api::PrivateConfigApi {
channel: ctx.node_channel().clone(),
config: (*ctx.public_key(), ctx.secret_key().clone()),
};
api.wire(&mut router);
Some(Box::new(router))
}
}
impl ServiceFactory for ConfigurationService {
fn make_service(_: &Context) -> Box<Service> {
Box::new(ConfigurationService::new())
}
}