use std::cmp;
use std::collections::{BinaryHeap, HashMap};
use commands::{Command, CommandExecutor};
use commands::pool::PoolCommand;
use errors::common::CommonError;
use errors::pool::PoolError;
use super::{
MerkleTree,
RemoteNode,
};
use super::rust_base58::{FromBase58, ToBase58};
use super::types::*;
use utils::json::JsonEncodable;
/// Outcome of applying one catch-up reply to the pending catch-up process.
enum CatchupStepResult {
/// The pending tree reached the target size and its root hash matches the target.
Finished,
/// The pending tree has not reached the target size yet; more replies are needed.
Continue,
/// A reply could not be applied (transaction encoding or consistency-proof
/// check failed); carries the index of the node that sent that reply.
FailedAtNode(usize),
}
/// High-level state of the catch-up procedure after a message was processed.
enum CatchupProgress {
/// The required number of nodes agree on a ledger larger than the local tree.
ShouldBeStarted,
/// The required number of nodes agree on the same size and root as the local tree.
NotNeeded,
/// Catch-up completed; carries the caught-up merkle tree.
Finished(MerkleTree),
/// Nothing to report yet: still waiting for votes or catch-up replies.
InProgress,
}
/// State used to bring the local merkle tree up to date with the remote nodes.
pub struct CatchupHandler {
/// Subtracted from the node count to get the number of identical votes
/// required before acting on them.
/// NOTE(review): presumably the number of tolerated faulty nodes - confirm.
pub f: usize,
/// Not referenced by the methods in this part of the file.
pub ledger_status_same: usize,
/// The local merkle tree that is compared against the nodes' votes.
pub merkle_tree: MerkleTree,
/// Ledger size agreed on by the nodes; the size catch-up must reach.
pub target_mt_size: usize,
/// Decoded root hash the tree must have once `target_mt_size` is reached.
pub target_mt_root: Vec<u8>,
/// Not referenced by the methods in this part of the file.
pub new_mt_vote: usize,
/// Remote nodes that messages are sent to; a node may be blacklisted.
pub nodes: Vec<RemoteNode>,
/// Command id echoed back in the ack command sent by `flush_requests`.
pub initiate_cmd_id: i32,
/// Selects the ack kind in `flush_requests`: `RefreshAck` when true, `OpenAck` otherwise.
pub is_refresh: bool,
/// State of the running catch-up, `None` when no catch-up is in progress.
pub pending_catchup: Option<CatchUpProcess>,
/// Pool id reported in `OpenAck` when the status is `Ok`.
pub pool_id: i32,
/// Latest vote per node, indexed by node index: (base58 merkle root, ledger size).
pub nodes_votes: Vec<Option<(String, usize)>>,
}
impl Default for CatchupHandler {
    /// Builds an idle handler: zeroed counters and ids, no nodes, no votes,
    /// no pending catch-up and a merkle tree built from an empty vector.
    fn default() -> Self {
        // Fields are listed in declaration order.
        CatchupHandler {
            f: 0,
            ledger_status_same: 0,
            merkle_tree: MerkleTree::from_vec(Vec::new()).unwrap(),
            target_mt_size: 0,
            target_mt_root: Vec::new(),
            new_mt_vote: 0,
            nodes: Vec::new(),
            initiate_cmd_id: 0,
            is_refresh: false,
            pending_catchup: None,
            pool_id: 0,
            nodes_votes: Vec::new(),
        }
    }
}
impl CatchupHandler {
/// Handles one message received from the node with index `src_ind`.
///
/// * `Pong` - answers the node with a `LedgerStatus` describing the local tree.
/// * `LedgerStatus` / `ConsistencyProof` - records the node's vote and
///   re-evaluates whether the nodes agree on a ledger state.
/// * `CatchupRep` - feeds the reply into the pending catch-up.
/// * anything else is logged and ignored.
///
/// `raw_msg` is not used by this method.
///
/// Returns `Ok(Some(tree))` when catch-up finished or turned out to be
/// unnecessary, `Ok(None)` while more messages are needed.
///
/// Indexes `nodes` and `nodes_votes` with `src_ind`, so it panics if the
/// index is out of range for either vector.
pub fn process_msg(&mut self, msg: Message, raw_msg: &String, src_ind: usize) -> Result<Option<MerkleTree>, PoolError> {
    let progress: CatchupProgress = match msg {
        Message::Pong => {
            let status_msg = Message::LedgerStatus(LedgerStatus {
                txnSeqNo: self.nodes.len(),
                merkleRoot: self.merkle_tree.root_hash().as_slice().to_base58(),
                ledgerId: 0,
                ppSeqNo: None,
                viewNo: None,
            });
            self.nodes[src_ind].send_msg(&status_msg)?;
            CatchupProgress::InProgress
        }
        Message::LedgerStatus(ledger_status) => {
            let vote = (ledger_status.merkleRoot, ledger_status.txnSeqNo);
            self.nodes_votes[src_ind] = Some(vote);
            self.check_nodes_responses_on_status()?
        }
        Message::ConsistencyProof(cons_proof) => {
            let vote = (cons_proof.newMerkleRoot, cons_proof.seqNoEnd);
            self.nodes_votes[src_ind] = Some(vote);
            self.check_nodes_responses_on_status()?
        }
        Message::CatchupRep(catchup) => self.process_catchup_rep(catchup, src_ind)?,
        _ => {
            warn!("unhandled msg {:?}", msg);
            CatchupProgress::InProgress
        }
    };

    match progress {
        CatchupProgress::Finished(mt) => Ok(Some(mt)),
        CatchupProgress::NotNeeded => Ok(Some(self.merkle_tree.clone())),
        CatchupProgress::ShouldBeStarted => {
            self.start_catchup()?;
            Ok(None)
        }
        CatchupProgress::InProgress => Ok(None),
    }
}
fn check_nodes_responses_on_status(&mut self) -> Result<CatchupProgress, PoolError> {
if self.pending_catchup.is_some() {
return Ok(CatchupProgress::InProgress)
}
let mut votes: HashMap<(String, usize), usize> = HashMap::new();
for node_vote in &self.nodes_votes {
if let &Some(ref node_vote) = node_vote {
let cnt = *votes.get(&node_vote).unwrap_or(&0) + 1;
votes.insert((node_vote.0.clone(), node_vote.1), cnt);
}
}
if let Some((most_popular_vote, votes_cnt)) = votes.iter().max_by_key(|entry| entry.1) {
if *votes_cnt == self.nodes.len() - self.f {
let &(ref target_mt_root, target_mt_size) = most_popular_vote;
let cur_mt_size = self.merkle_tree.count();
let cur_mt_hash = self.merkle_tree.root_hash().to_base58();
if target_mt_size == cur_mt_size {
if cur_mt_hash.eq(target_mt_root) {
return Ok(CatchupProgress::NotNeeded);
} else {
return Err(PoolError::CommonError(CommonError::InvalidState(
"Ledger merkle tree doesn't acceptable for current tree.".to_string())));
}
} else if target_mt_size > cur_mt_size {
self.target_mt_size = target_mt_size;
self.target_mt_root = target_mt_root.from_base58().map_err(|_|
CommonError::InvalidStructure(
"Can't parse target MerkleTree hash from nodes responses".to_string()))?;
return Ok(CatchupProgress::ShouldBeStarted);
} else {
return Err(PoolError::CommonError(CommonError::InvalidState(
"Local merkle tree greater than mt from ledger".to_string())));
}
}
}
Ok(CatchupProgress::InProgress)
}
/// Forgets every recorded vote, leaving one empty slot per known node.
pub fn reset_nodes_votes(&mut self) {
    let slots = self.nodes.len();
    self.nodes_votes = vec![None; slots];
}
/// Starts a catch-up towards `target_mt_size` by splitting the missing
/// transaction range between the nodes that are not blacklisted.
///
/// Each active node gets a `CatchupReq` for a consecutive portion of the
/// range `(current tree size + 1) ..= target_mt_size`. Nodes left over once
/// the whole range has been assigned receive no request.
///
/// # Errors
///
/// Returns `InvalidState` when
/// * a catch-up is already pending,
/// * the local tree size differs from the number of nodes,
/// * every node is blacklisted,
/// * the target size is not greater than the local tree size.
///
/// Errors from sending a request are propagated; in that case the pending
/// catch-up state has already been created.
pub fn start_catchup(&mut self) -> Result<(), PoolError> {
    trace!("start_catchup");
    if self.pending_catchup.is_some() {
        return Err(PoolError::CommonError(
            CommonError::InvalidState(
                "CatchUp already started for the pool".to_string())));
    }

    let cur_mt_size = self.merkle_tree.count();
    if cur_mt_size != self.nodes.len() {
        return Err(PoolError::CommonError(
            CommonError::InvalidState(
                "Merkle tree doesn't equal nodes count".to_string())));
    }

    // Only nodes that are not blacklisted take part; without any of them
    // the portion computation below would divide by zero.
    let active_node_cnt = self.nodes.iter().filter(|node| !node.is_blacklisted).count();
    if active_node_cnt == 0 {
        return Err(PoolError::CommonError(CommonError::InvalidState(
            "No active nodes left to CatchUp from".to_string())));
    }

    // checked_sub: a target smaller than the local tree must not underflow.
    let cnt_to_catchup = match self.target_mt_size.checked_sub(cur_mt_size) {
        Some(cnt) if cnt > 0 => cnt,
        _ => {
            return Err(PoolError::CommonError(CommonError::InvalidState(
                "Nothing to CatchUp, but started".to_string())));
        }
    };

    self.pending_catchup = Some(CatchUpProcess {
        merkle_tree: self.merkle_tree.clone(),
        pending_reps: BinaryHeap::new(),
    });

    // Ceiling division: every active node gets at most `portion` transactions.
    let portion = (cnt_to_catchup + active_node_cnt - 1) / active_node_cnt;
    // The range starts right after the last locally known transaction; it
    // must not depend on how many nodes are currently blacklisted.
    let mut catchup_req = CatchupReq {
        ledgerId: 0,
        seqNoStart: cur_mt_size + 1,
        seqNoEnd: cur_mt_size + portion,
        catchupTill: self.target_mt_size,
    };
    for node in self.nodes.iter().filter(|node| !node.is_blacklisted) {
        if catchup_req.seqNoStart > catchup_req.catchupTill {
            // The whole range is already assigned.
            break;
        }
        node.send_msg(&Message::CatchupReq(catchup_req.clone()))?;
        catchup_req.seqNoStart += portion;
        catchup_req.seqNoEnd = cmp::min(catchup_req.seqNoStart + portion - 1,
                                        catchup_req.catchupTill);
    }
    Ok(())
}
/// Applies a catch-up reply from node `node_idx` and reports the progress.
///
/// When the step finishes the catch-up, the caught-up tree is taken out of
/// the pending state and returned inside `Finished`. When a reply from some
/// node cannot be applied, that node is blacklisted, the pending state is
/// dropped and the catch-up is started again.
fn process_catchup_rep(&mut self, catchup: CatchupRep, node_idx: usize) -> Result<CatchupProgress, PoolError> {
    trace!("append {:?}", catchup);
    match self.catchup_step(catchup, node_idx)? {
        CatchupStepResult::Finished => {
            let caught_up_tree = self.finish_catchup()?;
            Ok(CatchupProgress::Finished(caught_up_tree))
        }
        CatchupStepResult::Continue => Ok(CatchupProgress::InProgress),
        CatchupStepResult::FailedAtNode(failed_node_idx) => {
            warn!("Fail to continue catch-up by response from node with idx {}. Node will be blacklisted and catchup will be restarted", failed_node_idx);
            self.nodes[failed_node_idx].is_blacklisted = true;
            self.pending_catchup = None;
            self.start_catchup()?;
            Ok(CatchupProgress::InProgress)
        }
    }
}
/// Queues a catch-up reply and applies every queued reply that directly
/// continues the pending tree.
///
/// A reply is applied when its smallest transaction number is exactly one
/// past the pending tree's size. Its transactions are appended to a copy of
/// the tree in ascending order; the copy replaces the pending tree only after
/// the reply's consistency proof checks out against the target root and size.
///
/// Returns
/// * `FailedAtNode(idx)` - a transaction of the reply from node `idx` could
///   not be encoded, or its consistency proof was rejected;
/// * `Finished` - the pending tree reached the target size with the target root;
/// * `Continue` - more replies are needed.
///
/// # Errors
///
/// `InvalidState` when no catch-up is pending or when the tree reached the
/// target size with a different root hash; errors from appending to the tree
/// are propagated.
fn catchup_step(&mut self, catchup: CatchupRep, node_idx: usize) -> Result<CatchupStepResult, PoolError> {
    let process = self.pending_catchup.as_mut()
        .ok_or(CommonError::InvalidState("Process non-existing CatchUp".to_string()))?;
    process.pending_reps.push((catchup, node_idx));

    loop {
        // Stop as soon as the queue is empty or its head leaves a gap.
        let head_is_next = match process.pending_reps.peek() {
            Some(head) => head.0.min_tx() - 1 == process.merkle_tree.count(),
            None => false,
        };
        if !head_is_next {
            break;
        }

        let (mut reply, reply_node_idx) = process.pending_reps.pop().unwrap();
        // Work on a copy so a bad reply leaves the pending tree untouched.
        let mut candidate_mt = process.merkle_tree.clone();
        while !reply.txns.is_empty() {
            let key = reply.min_tx().to_string();
            match reply.txns.remove(&key).unwrap().to_json() {
                Ok(txn_json) => {
                    trace!("append to tree {}", txn_json);
                    candidate_mt.append(txn_json)?;
                }
                Err(_) => return Ok(CatchupStepResult::FailedAtNode(reply_node_idx)),
            }
        }

        let proof_check = CatchupHandler::check_cons_proofs(&candidate_mt, &reply.consProof, &self.target_mt_root, self.target_mt_size)
            .map_err(map_err_err!());
        if proof_check.is_err() {
            return Ok(CatchupStepResult::FailedAtNode(reply_node_idx));
        }
        process.merkle_tree = candidate_mt;
    }

    trace!("updated mt hash {}, tree {:?}", process.merkle_tree.root_hash().as_slice().to_base58(), process.merkle_tree);

    if process.merkle_tree.count() != self.target_mt_size {
        return Ok(CatchupStepResult::Continue);
    }
    if process.merkle_tree.root_hash().ne(&self.target_mt_root) {
        return Err(PoolError::CommonError(CommonError::InvalidState(
            "CatchUp failed: all transactions added, proofs checked, but root hash differ with target".to_string())));
    }
    Ok(CatchupStepResult::Finished)
}
/// Verifies a base58-encoded consistency proof between `mt` and the target tree.
///
/// # Errors
///
/// `InvalidStructure` when a proof element is not valid base58 or when the
/// proof is rejected; errors from the verification itself are propagated.
fn check_cons_proofs(mt: &MerkleTree, cons_proofs: &Vec<String>, target_mt_root: &Vec<u8>, target_mt_size: usize) -> Result<(), CommonError> {
    // Decoding stops at the first element that fails.
    let bytes_proofs = cons_proofs.iter()
        .map(|proof: &String| proof.from_base58().map_err(|err|
            CommonError::InvalidStructure(
                format!("Can't decode node consistency proof: {}", err))))
        .collect::<Result<Vec<Vec<u8>>, CommonError>>()?;

    if mt.consistency_proof(target_mt_root, target_mt_size, &bytes_proofs)? {
        Ok(())
    } else {
        Err(CommonError::InvalidStructure("Consistency proof verification failed".to_string()))
    }
}
/// Ends the pending catch-up and hands out its merkle tree.
///
/// The pending state is cleared. Fails with `InvalidState` when no catch-up
/// is pending.
fn finish_catchup(&mut self) -> Result<MerkleTree, PoolError> {
    let finished_process = self.pending_catchup.take()
        .ok_or(CommonError::InvalidState("Try to finish non-existing CatchUp".to_string()))?;
    Ok(finished_process.merkle_tree)
}
/// Reports `status` to the command executor as the answer to the command
/// that initiated this handler.
///
/// Sends `RefreshAck` when `is_refresh` is set, otherwise `OpenAck`, which
/// carries the pool id in place of the unit value on success.
///
/// # Errors
///
/// `InvalidState` when the command cannot be sent; the underlying send
/// error is discarded.
pub fn flush_requests(&mut self, status: Result<(), PoolError>) -> Result<(), PoolError> {
    let cmd_id = self.initiate_cmd_id;
    let ack = match self.is_refresh {
        true => PoolCommand::RefreshAck(cmd_id, status),
        false => {
            let pool_id = self.pool_id;
            PoolCommand::OpenAck(cmd_id, status.map(|()| pool_id))
        }
    };

    let sent = CommandExecutor::instance().send(Command::Pool(ack));
    sent.map_err(|_|
        PoolError::CommonError(
            CommonError::InvalidState("Can't send ACK cmd".to_string())))
}
}