use std::convert::TryFrom;
use std::convert::TryInto;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use protobuf::{Message, RepeatedField};
use crate::admin::token::PeerAuthorizationTokenReader;
use crate::consensus::two_phase::v1::TwoPhaseEngine;
use crate::consensus::{
error::{ConsensusSendError, ProposalManagerError},
ConsensusMessage, ConsensusNetworkSender, PeerId, Proposal, ProposalId, ProposalManager,
ProposalUpdate,
};
use crate::consensus::{ConsensusEngine, StartupState};
use crate::error::InvalidStateError;
use crate::hex::to_hex;
use crate::peer::PeerTokenPair;
use crate::protos::admin::{AdminMessage, AdminMessage_Type, ProposedCircuit};
use crate::protos::two_phase::RequiredVerifiers;
use crate::service::ServiceError;
use super::error::AdminConsensusManagerError;
use super::shared::AdminServiceShared;
use super::{admin_service_id, sha256};
pub struct AdminConsensusManager {
consensus_msg_tx: Sender<ConsensusMessage>,
proposal_update_tx: Sender<ProposalUpdate>,
thread_handle: JoinHandle<()>,
}
impl AdminConsensusManager {
pub fn new(
service_id: String,
shared: Arc<Mutex<AdminServiceShared>>,
coordinator_timeout: Duration,
) -> Result<Self, AdminConsensusManagerError> {
let (consensus_msg_tx, consensus_msg_rx) = channel();
let (proposal_update_tx, proposal_update_rx) = channel();
let proposal_manager =
AdminProposalManager::new(proposal_update_tx.clone(), shared.clone());
let consensus_network_sender = AdminConsensusNetworkSender::new(service_id.clone(), shared);
let startup_state = StartupState {
id: service_id.as_bytes().into(),
peer_ids: vec![],
last_proposal: None,
};
let thread_handle = Builder::new()
.name(format!("consensus-{}", service_id))
.spawn(move || {
let mut two_phase_engine = TwoPhaseEngine::new(coordinator_timeout);
if let Err(err) = two_phase_engine.run(
consensus_msg_rx,
proposal_update_rx,
Box::new(consensus_network_sender),
Box::new(proposal_manager),
startup_state,
) {
error!("two phase consensus exited with an error: {}", err)
};
})
.map_err(|err| AdminConsensusManagerError(Box::new(err)))?;
Ok(AdminConsensusManager {
consensus_msg_tx,
proposal_update_tx,
thread_handle,
})
}
pub fn shutdown(self) -> Result<(), AdminConsensusManagerError> {
self.send_update(ProposalUpdate::Shutdown)?;
self.thread_handle
.join()
.unwrap_or_else(|err| error!("consensus thread failed: {:?}", err));
Ok(())
}
pub fn handle_message(&self, message_bytes: &[u8]) -> Result<(), AdminConsensusManagerError> {
let consensus_message = ConsensusMessage::try_from(message_bytes)
.map_err(|err| AdminConsensusManagerError(Box::new(err)))?;
self.consensus_msg_tx
.send(consensus_message)
.map_err(|err| AdminConsensusManagerError(Box::new(err)))?;
Ok(())
}
pub fn send_update(&self, update: ProposalUpdate) -> Result<(), AdminConsensusManagerError> {
self.proposal_update_tx
.send(update)
.map_err(|err| AdminConsensusManagerError(Box::new(err)))
}
pub fn proposal_update_sender(&self) -> Sender<ProposalUpdate> {
self.proposal_update_tx.clone()
}
}
/// Proposal manager backed by the admin service's shared state.
pub struct AdminProposalManager {
    /// Channel on which proposal updates are reported.
    proposal_update_sender: Sender<ProposalUpdate>,
    /// Admin service state, shared behind a mutex.
    shared: Arc<Mutex<AdminServiceShared>>,
}

impl AdminProposalManager {
    /// Builds a proposal manager from the update channel and the shared admin state.
    pub fn new(
        proposal_update_sender: Sender<ProposalUpdate>,
        shared: Arc<Mutex<AdminServiceShared>>,
    ) -> Self {
        Self {
            shared,
            proposal_update_sender,
        }
    }
}
/// Drives the consensus proposal lifecycle (create, check, accept, reject) against the admin
/// service's shared state, reporting results on the proposal update channel.
impl ProposalManager for AdminProposalManager {
    /// Turns the next pending circuit payload, if any, into a consensus proposal.
    ///
    /// When no payload is pending, `ProposalUpdate::ProposalCreated(None)` is reported and
    /// `cleanup_held_peer_refs` is called on the shared state.
    ///
    /// Otherwise the payload is passed to `propose_change`, and a `Proposal` is built whose id
    /// is the bytes of the payload's `sha256` and whose summary is the bytes of the expected
    /// hash returned by `propose_change`. Its consensus data is a serialized
    /// `RequiredVerifiers` listing the admin service id of every member of the proposed
    /// circuit. The proposal is registered as pending, a `PROPOSED_CIRCUIT` admin message is
    /// sent to every listed node other than the local one, and
    /// `ProposalUpdate::ProposalCreated(Some(..))` is reported.
    ///
    /// Both arguments are ignored.
    ///
    /// # Errors
    ///
    /// Fails if the admin state lock is poisoned, if no network sender is available
    /// (`ServiceError::NotStarted`), if the update channel is closed, or with
    /// `ProposalManagerError::Internal` if proposing the change, hashing, serializing a
    /// message, resolving the circuit's nodes, or sending to a peer fails.
    fn create_proposal(
        &self,
        _previous_proposal_id: Option<ProposalId>,
        _consensus_data: Vec<u8>,
    ) -> Result<(), ProposalManagerError> {
        // The guard created here is a temporary that is released at the end of this
        // statement, so taking the lock again just below does not deadlock.
        let network_sender = self
            .shared
            .lock()
            .map_err(|_| ServiceError::PoisonedLock("the admin state lock was poisoned".into()))?
            .network_sender()
            .as_ref()
            .cloned()
            .ok_or(ServiceError::NotStarted)?;

        let mut shared = self
            .shared
            .lock()
            .map_err(|_| ServiceError::PoisonedLock("the admin state lock was poisoned".into()))?;

        let circuit_payload = match shared.pop_pending_circuit_payload() {
            Some(payload) => payload,
            None => {
                // Nothing to propose: report that, then clean up held peer refs.
                self.proposal_update_sender
                    .send(ProposalUpdate::ProposalCreated(None))?;
                shared.cleanup_held_peer_refs();
                return Ok(());
            }
        };

        let (expected_hash, circuit_proposal) = shared
            .propose_change(circuit_payload.clone())
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

        let mut proposal = Proposal {
            id: sha256(&circuit_payload)
                .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?
                .as_bytes()
                .into(),
            summary: expected_hash.as_bytes().into(),
            ..Default::default()
        };

        // Every member of the proposed circuit is listed as a required verifier, identified
        // by its admin service id.
        let verifiers: Vec<Vec<u8>> = circuit_proposal
            .get_circuit_proposal()
            .get_members()
            .iter()
            .map(|member| admin_service_id(member.get_node_id()).as_bytes().to_vec())
            .collect();
        let mut required_verifiers = RequiredVerifiers::new();
        required_verifiers.set_verifiers(RepeatedField::from_vec(verifiers));
        let required_verifiers_bytes = required_verifiers
            .write_to_bytes()
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;
        proposal.consensus_data = required_verifiers_bytes.clone();

        // NOTE(review): an error returned past this point leaves the proposal registered as
        // pending - confirm whether the caller is expected to reject it afterwards.
        shared.add_pending_consensus_proposal(
            proposal.id.clone(),
            (proposal.clone(), circuit_payload.clone()),
        );

        let mut proposed_circuit = ProposedCircuit::new();
        proposed_circuit.set_circuit_payload(circuit_payload);
        proposed_circuit.set_expected_hash(expected_hash.as_bytes().into());
        proposed_circuit.set_required_verifiers(required_verifiers_bytes);

        let mut msg = AdminMessage::new();
        msg.set_message_type(AdminMessage_Type::PROPOSED_CIRCUIT);
        msg.set_proposed_circuit(proposed_circuit);
        // Propagate serialization failures instead of panicking: a panic here would unwind
        // while the admin state lock is held and poison it for every other user.
        let envelope_bytes = msg
            .write_to_bytes()
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

        let peer_node = circuit_proposal
            .get_circuit_proposal()
            .list_nodes()
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;
        let local_node = circuit_proposal
            .get_circuit_proposal()
            .get_node_token(shared.node_id())
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?
            .ok_or_else(|| {
                ProposalManagerError::Internal(Box::new(InvalidStateError::with_message(
                    format!(
                        "Proposal is missing required local authorization for node {}",
                        shared.node_id()
                    ),
                )))
            })?;

        for node in peer_node {
            if node.node_id != shared.node_id() {
                let recipient = admin_service_id(
                    &PeerTokenPair::new(node.token.clone(), local_node.clone()).id_as_string(),
                );
                // A failed send is returned as an error rather than panicking, for the same
                // lock-poisoning reason as the serialization above.
                network_sender
                    .send(&recipient, &envelope_bytes)
                    .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;
            }
        }

        self.proposal_update_sender
            .send(ProposalUpdate::ProposalCreated(Some(proposal)))?;

        Ok(())
    }

    /// Re-validates a pending proposal by proposing its circuit payload again and comparing
    /// the resulting hash with the proposal's summary.
    ///
    /// Reports `ProposalUpdate::ProposalValid` when the hashes match. Otherwise logs a warning
    /// and reports `ProposalUpdate::ProposalInvalid`.
    ///
    /// # Errors
    ///
    /// Fails if the admin state lock is poisoned, with
    /// `ProposalManagerError::UnknownProposal` if `id` is not pending, with
    /// `ProposalManagerError::Internal` if proposing the change fails, or if the update
    /// channel is closed.
    fn check_proposal(&self, id: &ProposalId) -> Result<(), ProposalManagerError> {
        let mut shared = self
            .shared
            .lock()
            .map_err(|_| ServiceError::PoisonedLock("the admin state lock was poisoned".into()))?;

        let (proposal, circuit_payload) = shared
            .pending_consensus_proposals(id)
            .ok_or_else(|| ProposalManagerError::UnknownProposal(id.clone()))?
            .clone();

        let (hash, _) = shared
            .propose_change(circuit_payload)
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

        // Compare as byte slices; no need to copy the hash into a new Vec first.
        if hash.as_bytes() != proposal.summary.as_slice() {
            warn!(
                "Hash mismatch: expected {} but was {}",
                to_hex(&proposal.summary),
                to_hex(hash.as_bytes())
            );

            self.proposal_update_sender
                .send(ProposalUpdate::ProposalInvalid(id.clone()))?;
        } else {
            self.proposal_update_sender
                .send(ProposalUpdate::ProposalValid(id.clone()))?;
        }

        Ok(())
    }

    /// Commits the pending proposal identified by `id`.
    ///
    /// On a successful commit the proposal is removed from the pending set. If the commit
    /// fails, or if no pending proposal with that id exists,
    /// `ProposalUpdate::ProposalAcceptFailed` is reported instead. `_consensus_data` is
    /// ignored.
    ///
    /// # Errors
    ///
    /// Fails if the admin state lock is poisoned or if the update channel is closed.
    fn accept_proposal(
        &self,
        id: &ProposalId,
        _consensus_data: Option<Vec<u8>>,
    ) -> Result<(), ProposalManagerError> {
        let mut shared = self
            .shared
            .lock()
            .map_err(|_| ServiceError::PoisonedLock("the admin state lock was poisoned".into()))?;

        match shared.pending_consensus_proposals(id) {
            Some((proposal, _)) if &proposal.id == id => match shared.commit() {
                Ok(_) => {
                    shared.remove_pending_consensus_proposals(id);
                    info!("Committed proposal {}", id);
                }
                Err(err) => {
                    self.proposal_update_sender
                        .send(ProposalUpdate::ProposalAcceptFailed(
                            id.clone(),
                            format!("failed to commit proposal: {}", err),
                        ))?
                }
            },
            _ => self
                .proposal_update_sender
                .send(ProposalUpdate::ProposalAcceptFailed(
                    id.clone(),
                    "not pending proposal".into(),
                ))?,
        }

        Ok(())
    }

    /// Removes the pending proposal identified by `id` and rolls back the shared state.
    ///
    /// # Errors
    ///
    /// Fails if the admin state lock is poisoned, with
    /// `ProposalManagerError::UnknownProposal` if `id` is not pending, or with
    /// `ProposalManagerError::Internal` if the rollback fails.
    fn reject_proposal(&self, id: &ProposalId) -> Result<(), ProposalManagerError> {
        let mut shared = self
            .shared
            .lock()
            .map_err(|_| ServiceError::PoisonedLock("the admin state lock was poisoned".into()))?;

        shared
            .remove_pending_consensus_proposals(id)
            .ok_or_else(|| ProposalManagerError::UnknownProposal(id.clone()))?;

        shared
            .rollback()
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

        info!("Rolled back proposal {}", id);

        Ok(())
    }
}
/// Network sender handed to the consensus engine; sends on behalf of one admin service.
pub struct AdminConsensusNetworkSender {
    /// Id of the local admin service, recorded as the origin of outgoing consensus messages.
    service_id: String,
    /// Admin service state, shared behind a mutex.
    state: Arc<Mutex<AdminServiceShared>>,
}

impl AdminConsensusNetworkSender {
    /// Builds a sender for `service_id` on top of the shared admin state.
    pub fn new(service_id: String, state: Arc<Mutex<AdminServiceShared>>) -> Self {
        Self { state, service_id }
    }
}
/// Delivers consensus engine messages to other admin services, wrapped in `AdminMessage`
/// envelopes of type `CONSENSUS_MESSAGE`.
impl ConsensusNetworkSender for AdminConsensusNetworkSender {
    /// Sends `message` to a single peer.
    ///
    /// `peer_id` is interpreted as a UTF-8 admin service id. If a known peer token maps to a
    /// node with that admin service id, the message is addressed to the admin service id
    /// derived from that token; otherwise it is addressed to `peer_id` as given.
    ///
    /// # Errors
    ///
    /// Returns `ConsensusSendError::Internal` if `peer_id` is not valid UTF-8, if the admin
    /// state lock is poisoned, or if the network send fails. Returns
    /// `ConsensusSendError::NotReady` if no network sender is available. Message conversion
    /// and serialization errors are propagated as well.
    fn send_to(&self, peer_id: &PeerId, message: Vec<u8>) -> Result<(), ConsensusSendError> {
        let peer_id_string = String::from_utf8(peer_id.clone().into())
            .map_err(|err| ConsensusSendError::Internal(Box::new(err)))?;

        let consensus_message = ConsensusMessage::new(message, self.service_id.as_bytes().into());
        let mut msg = AdminMessage::new();
        msg.set_message_type(AdminMessage_Type::CONSENSUS_MESSAGE);
        msg.set_consensus_message(consensus_message.try_into()?);

        let shared = self.state.lock().map_err(|_| {
            ConsensusSendError::Internal(Box::new(ServiceError::PoisonedLock(
                "the admin state lock was poisoned".into(),
            )))
        })?;

        let network_sender = shared
            .network_sender()
            .clone()
            .ok_or(ConsensusSendError::NotReady)?;

        // Prefer the token-derived service id when the peer is known; otherwise fall back to
        // the id the engine supplied.
        let service_id = shared
            .token_to_peer()
            .iter()
            .find(|(_, node)| node.peer_node.admin_service == peer_id_string)
            .map(|(token, _)| admin_service_id(&token.id_as_string()))
            .unwrap_or(peer_id_string);

        let envelope_bytes = msg.write_to_bytes()?;
        network_sender
            .send(&service_id, envelope_bytes.as_slice())
            .map_err(|err| ConsensusSendError::Internal(Box::new(err)))?;

        Ok(())
    }

    /// Sends `message` to every current consensus verifier that is not the local node.
    ///
    /// The envelope is serialized once and the same bytes are sent to each recipient.
    ///
    /// # Errors
    ///
    /// Returns `ConsensusSendError::Internal` if the admin state lock is poisoned or if any
    /// network send fails (remaining verifiers are then skipped). Returns
    /// `ConsensusSendError::NotReady` if no network sender is available. Message conversion
    /// and serialization errors are propagated as well.
    fn broadcast(&self, message: Vec<u8>) -> Result<(), ConsensusSendError> {
        let consensus_message = ConsensusMessage::new(message, self.service_id.as_bytes().into());
        let mut msg = AdminMessage::new();
        msg.set_message_type(AdminMessage_Type::CONSENSUS_MESSAGE);
        msg.set_consensus_message(consensus_message.try_into()?);

        let shared = self.state.lock().map_err(|_| {
            ConsensusSendError::Internal(Box::new(ServiceError::PoisonedLock(
                "the admin state lock was poisoned".into(),
            )))
        })?;

        let network_sender = shared
            .network_sender()
            .clone()
            .ok_or(ConsensusSendError::NotReady)?;

        // The envelope is the same for every recipient, so serialize it once up front
        // instead of once per verifier.
        let envelope_bytes = msg.write_to_bytes()?;

        for verifier in shared.current_consensus_verifiers() {
            if !shared.is_local_node(verifier.peer_id()) {
                network_sender
                    .send(
                        &admin_service_id(&verifier.id_as_string()),
                        envelope_bytes.as_slice(),
                    )
                    .map_err(|err| ConsensusSendError::Internal(Box::new(err)))?;
            }
        }

        Ok(())
    }
}