use std::convert::{TryFrom, TryInto};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use protobuf::Message;
use splinter::consensus::{
error::{ConsensusSendError, ProposalManagerError},
two_phase::v1::TwoPhaseEngine as TwoPhaseEngineV1,
two_phase::v2::TwoPhaseEngine as TwoPhaseEngineV2,
ConsensusEngine, ConsensusMessage, ConsensusNetworkSender, PeerId, Proposal, ProposalId,
ProposalManager, ProposalUpdate, StartupState,
};
use transact::protos::IntoBytes;
use crate::protos::scabbard::{ProposedBatch, ScabbardMessage, ScabbardMessage_Type};
use super::error::{ScabbardConsensusManagerError, ScabbardError};
use super::shared::ScabbardShared;
use super::state::ScabbardState;
use super::ScabbardVersion;
/// Handle to the two-phase consensus engine that runs on its own thread for a scabbard
/// service.
///
/// The manager keeps the sending halves of the two channels whose receiving halves are
/// handed to the engine, together with the join handle of the engine thread.
pub struct ScabbardConsensusManager {
    /// Carries consensus messages parsed by `handle_message` to the engine thread.
    consensus_msg_tx: Sender<ConsensusMessage>,
    /// Carries proposal updates, including the shutdown signal, to the engine thread.
    proposal_update_tx: Sender<ProposalUpdate>,
    /// Join handle of the thread spawned in `new`; joined by `shutdown`.
    thread_handle: JoinHandle<()>,
}
impl ScabbardConsensusManager {
pub fn new(
service_id: String,
version: ScabbardVersion,
shared: Arc<Mutex<ScabbardShared>>,
state: Arc<Mutex<ScabbardState>>,
coordinator_timeout: Duration,
) -> Result<Self, ScabbardConsensusManagerError> {
let peer_ids = shared
.lock()
.map_err(|_| ScabbardConsensusManagerError(Box::new(ScabbardError::LockPoisoned)))?
.peer_services()
.iter()
.map(|id| id.as_bytes().into())
.collect();
let (consensus_msg_tx, consensus_msg_rx) = channel();
let (proposal_update_tx, proposal_update_rx) = channel();
let proposal_manager = ScabbardProposalManager::new(
service_id.clone(),
version,
proposal_update_tx.clone(),
shared.clone(),
state,
);
let consensus_network_sender =
ScabbardConsensusNetworkSender::new(service_id.clone(), shared);
let startup_state = StartupState {
id: service_id.as_bytes().into(),
peer_ids,
last_proposal: None,
};
let thread_handle = Builder::new()
.name(format!("consensus-{}", service_id))
.spawn(move || match version {
ScabbardVersion::V1 => {
let mut two_phase_engine = TwoPhaseEngineV1::new(coordinator_timeout);
if let Err(err) = two_phase_engine.run(
consensus_msg_rx,
proposal_update_rx,
Box::new(consensus_network_sender),
Box::new(proposal_manager),
startup_state,
) {
error!("two phase consensus exited with an error: {}", err)
}
}
ScabbardVersion::V2 => {
let mut two_phase_engine = TwoPhaseEngineV2::new(coordinator_timeout);
if let Err(err) = two_phase_engine.run(
consensus_msg_rx,
proposal_update_rx,
Box::new(consensus_network_sender),
Box::new(proposal_manager),
startup_state,
) {
error!("two phase consensus exited with an error: {}", err)
}
}
})
.map_err(|err| ScabbardConsensusManagerError(Box::new(err)))?;
Ok(ScabbardConsensusManager {
consensus_msg_tx,
proposal_update_tx,
thread_handle,
})
}
pub fn shutdown(self) -> Result<(), ScabbardConsensusManagerError> {
self.send_update(ProposalUpdate::Shutdown)?;
self.thread_handle
.join()
.unwrap_or_else(|err| error!("consensus thread failed: {:?}", err));
Ok(())
}
pub fn handle_message(
&self,
message_bytes: &[u8],
) -> Result<(), ScabbardConsensusManagerError> {
let consensus_message = ConsensusMessage::try_from(message_bytes)
.map_err(|err| ScabbardConsensusManagerError(Box::new(err)))?;
self.consensus_msg_tx
.send(consensus_message)
.map_err(|err| ScabbardConsensusManagerError(Box::new(err)))?;
Ok(())
}
pub fn send_update(&self, update: ProposalUpdate) -> Result<(), ScabbardConsensusManagerError> {
self.proposal_update_tx
.send(update)
.map_err(|err| ScabbardConsensusManagerError(Box::new(err)))
}
}
/// `ProposalManager` implementation that turns queued scabbard batches into consensus
/// proposals and applies the outcome of consensus to scabbard state.
pub struct ScabbardProposalManager {
    /// ID of this service; set as the service ID on proposed-batch messages.
    service_id: String,
    /// Determines how proposal IDs are derived when a proposal is created.
    version: ScabbardVersion,
    /// Channel on which proposal updates are reported.
    proposal_update_sender: Sender<ProposalUpdate>,
    /// Shared service data: batch queue, open proposals, peer services and network sender.
    shared: Arc<Mutex<ScabbardShared>>,
    /// Scabbard state on which changes are prepared, committed and rolled back.
    state: Arc<Mutex<ScabbardState>>,
}
impl ScabbardProposalManager {
pub fn new(
service_id: String,
version: ScabbardVersion,
proposal_update_sender: Sender<ProposalUpdate>,
shared: Arc<Mutex<ScabbardShared>>,
state: Arc<Mutex<ScabbardState>>,
) -> Self {
ScabbardProposalManager {
service_id,
version,
proposal_update_sender,
shared,
state,
}
}
}
impl ProposalManager for ScabbardProposalManager {
    /// Creates a proposal from the next queued batch, if any, and announces it to all peers.
    ///
    /// When `pop_batch_from_queue` yields a batch, the change is prepared against state, the
    /// resulting proposal is registered as open, a `PROPOSED_BATCH` message is sent to every
    /// peer service, and `ProposalUpdate::ProposalCreated(Some(..))` is reported. When no
    /// batch is available, `ProposalUpdate::ProposalCreated(None)` is reported instead.
    ///
    /// Both arguments are ignored.
    ///
    /// # Errors
    ///
    /// * `ProposalManagerError::Internal` if a lock is poisoned, or if popping the batch,
    ///   preparing the change, converting/serializing the message, or sending to a peer fails
    /// * `ProposalManagerError::NotReady` if `shared` has no network sender
    /// * an error converted from the channel send error if the update cannot be reported
    fn create_proposal(
        &self,
        _previous_proposal_id: Option<ProposalId>,
        _consensus_data: Vec<u8>,
    ) -> Result<(), ProposalManagerError> {
        // `shared` stays locked until this function returns; the state lock below is taken
        // while it is held.
        let mut shared = self
            .shared
            .lock()
            .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?;

        if let Some(batch) = shared
            .pop_batch_from_queue()
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?
        {
            let expected_hash = self
                .state
                .lock()
                .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?
                .prepare_change(batch.clone())
                .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

            // V1 identifies a proposal by the expected hash; V2 identifies it by the batch's
            // header signature. In both cases the summary carries the expected hash.
            let id = match self.version {
                ScabbardVersion::V1 => expected_hash.as_bytes().into(),
                ScabbardVersion::V2 => batch.batch().header_signature().as_bytes().into(),
            };

            let proposal = Proposal {
                id,
                summary: expected_hash.as_bytes().into(),
                ..Default::default()
            };

            shared.add_open_proposal(proposal.clone(), batch.clone());

            let mut proposed_batch = ProposedBatch::new();
            proposed_batch.set_proposal(
                proposal
                    .clone()
                    .try_into()
                    .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?,
            );
            proposed_batch.set_batch(
                batch
                    .into_bytes()
                    .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?,
            );
            proposed_batch.set_service_id(self.service_id.clone());

            let mut msg = ScabbardMessage::new();
            msg.set_message_type(ScabbardMessage_Type::PROPOSED_BATCH);
            msg.set_proposed_batch(proposed_batch);
            let msg_bytes = msg
                .write_to_bytes()
                .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

            let sender = shared
                .network_sender()
                .ok_or(ProposalManagerError::NotReady)?;

            // The first failed send aborts the loop and the whole call.
            for service in shared.peer_services() {
                sender
                    .send(service, msg_bytes.as_slice())
                    .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;
            }

            self.proposal_update_sender
                .send(ProposalUpdate::ProposalCreated(Some(proposal)))?;
        } else {
            self.proposal_update_sender
                .send(ProposalUpdate::ProposalCreated(None))?;
        }

        Ok(())
    }

    /// Validates the open proposal `id` by preparing its batch against state and comparing
    /// the resulting hash with the proposal's summary.
    ///
    /// Reports `ProposalUpdate::ProposalValid` when they match and
    /// `ProposalUpdate::ProposalInvalid` (after logging a warning) when they do not.
    ///
    /// # Errors
    ///
    /// * `ProposalManagerError::UnknownProposal` if there is no open proposal with this ID
    /// * `ProposalManagerError::Internal` if a lock is poisoned or preparing the change fails
    /// * an error converted from the channel send error if the update cannot be reported
    fn check_proposal(&self, id: &ProposalId) -> Result<(), ProposalManagerError> {
        // The shared lock is released at the end of this statement; only a clone of the open
        // proposal is kept.
        let (proposal, batch) = self
            .shared
            .lock()
            .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?
            .get_open_proposal(id)
            .ok_or_else(|| ProposalManagerError::UnknownProposal(id.clone()))?
            .clone();

        let hash = self
            .state
            .lock()
            .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?
            .prepare_change(batch)
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

        if hash.as_bytes() != proposal.summary {
            // The expected hash is the proposal's summary. The proposal ID is only equal to
            // it for V1 proposals, so the two are logged separately.
            warn!(
                "Hash mismatch for proposal {}: expected {} but was {}",
                id,
                String::from_utf8_lossy(&proposal.summary),
                hash
            );

            self.proposal_update_sender
                .send(ProposalUpdate::ProposalInvalid(id.clone()))?;
        } else {
            self.proposal_update_sender
                .send(ProposalUpdate::ProposalValid(id.clone()))?;
        }

        Ok(())
    }

    /// Accepts proposal `id`: removes it from the open proposals, calls `commit` on state
    /// and reports `ProposalUpdate::ProposalAccepted`.
    ///
    /// `_consensus_data` is ignored.
    ///
    /// # Errors
    ///
    /// * `ProposalManagerError::Internal` if a lock is poisoned or the commit fails
    /// * an error converted from the channel send error if the update cannot be reported
    fn accept_proposal(
        &self,
        id: &ProposalId,
        _consensus_data: Option<Vec<u8>>,
    ) -> Result<(), ProposalManagerError> {
        // `shared` stays locked for the rest of the function, including the commit.
        let mut shared = self
            .shared
            .lock()
            .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?;

        shared.remove_open_proposal(id);

        self.state
            .lock()
            .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?
            .commit()
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

        self.proposal_update_sender
            .send(ProposalUpdate::ProposalAccepted(id.clone()))?;

        info!("Committed proposal {}", id);

        Ok(())
    }

    /// Rejects proposal `id`: removes it from the open proposals and calls `rollback` on
    /// state. No proposal update is reported.
    ///
    /// # Errors
    ///
    /// Returns `ProposalManagerError::Internal` if a lock is poisoned or the rollback fails.
    fn reject_proposal(&self, id: &ProposalId) -> Result<(), ProposalManagerError> {
        // `shared` stays locked for the rest of the function, including the rollback.
        let mut shared = self
            .shared
            .lock()
            .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?;

        shared.remove_open_proposal(id);

        self.state
            .lock()
            .map_err(|_| ProposalManagerError::Internal(Box::new(ScabbardError::LockPoisoned)))?
            .rollback()
            .map_err(|err| ProposalManagerError::Internal(Box::new(err)))?;

        info!("Rolled back proposal {}", id);

        Ok(())
    }
}
/// `ConsensusNetworkSender` implementation that delivers consensus messages to peer
/// services through the network sender held in the shared scabbard data.
pub struct ScabbardConsensusNetworkSender {
    /// ID of this service; used as the origin of outgoing consensus messages.
    service_id: String,
    /// Shared service data providing the peer services and the network sender.
    shared: Arc<Mutex<ScabbardShared>>,
}
impl ScabbardConsensusNetworkSender {
pub fn new(service_id: String, shared: Arc<Mutex<ScabbardShared>>) -> Self {
ScabbardConsensusNetworkSender { service_id, shared }
}
}
impl ConsensusNetworkSender for ScabbardConsensusNetworkSender {
fn send_to(&self, peer_id: &PeerId, message: Vec<u8>) -> Result<(), ConsensusSendError> {
let peer_id_string = String::from_utf8(peer_id.clone().into())
.map_err(|err| ConsensusSendError::Internal(Box::new(err)))?;
let consensus_message = ConsensusMessage::new(message, self.service_id.as_bytes().into());
let mut msg = ScabbardMessage::new();
msg.set_message_type(ScabbardMessage_Type::CONSENSUS_MESSAGE);
msg.set_consensus_message(consensus_message.try_into()?);
let shared = self
.shared
.lock()
.map_err(|_| ConsensusSendError::Internal(Box::new(ScabbardError::LockPoisoned)))?;
if !shared.peer_services().contains(&peer_id_string) {
return Err(ConsensusSendError::UnknownPeer(peer_id.clone()));
}
let network_sender = shared
.network_sender()
.ok_or(ConsensusSendError::NotReady)?;
network_sender
.send(&peer_id_string, msg.write_to_bytes()?.as_slice())
.map_err(|err| ConsensusSendError::Internal(Box::new(err)))?;
Ok(())
}
fn broadcast(&self, message: Vec<u8>) -> Result<(), ConsensusSendError> {
let consensus_message = ConsensusMessage::new(message, self.service_id.as_bytes().into());
let mut msg = ScabbardMessage::new();
msg.set_message_type(ScabbardMessage_Type::CONSENSUS_MESSAGE);
msg.set_consensus_message(consensus_message.try_into()?);
let shared = self
.shared
.lock()
.map_err(|_| ConsensusSendError::Internal(Box::new(ScabbardError::LockPoisoned)))?;
let network_sender = shared
.network_sender()
.ok_or(ConsensusSendError::NotReady)?;
for service in shared.peer_services() {
network_sender
.send(service, msg.write_to_bytes()?.as_slice())
.map_err(|err| ConsensusSendError::Internal(Box::new(err)))?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::{HashSet, VecDeque};

    use cylinder::{secp256k1::Secp256k1Context, VerifierFactory};
    use splinter::service::{ServiceMessageContext, ServiceNetworkSender, ServiceSendError};

    /// Checks that the consensus network sender wraps payloads in `CONSENSUS_MESSAGE`
    /// scabbard messages originating from the local service, both when sending to a single
    /// peer and when broadcasting to all peers.
    #[test]
    fn network_sender() {
        let service_sender = MockServiceNetworkSender::new();

        let mut peer_services: HashSet<String> = ["svc1", "svc2"]
            .iter()
            .map(|peer| (*peer).to_string())
            .collect();

        let shared = Arc::new(Mutex::new(ScabbardShared::new(
            VecDeque::new(),
            Some(Box::new(service_sender.clone())),
            peer_services.clone(),
            "svc0".to_string(),
            #[cfg(feature = "metrics")]
            "vzrQS-rvwf4".to_string(),
            Secp256k1Context::new().new_verifier(),
            ScabbardVersion::V2,
        )));
        let consensus_sender = ScabbardConsensusNetworkSender::new("svc0".into(), shared);

        // Fetches a copy of the message the mock recorded at `index`.
        let nth_sent = |index: usize, missing: &str| -> (String, Vec<u8>) {
            service_sender
                .sent
                .lock()
                .expect("sent lock poisoned")
                .get(index)
                .expect(missing)
                .clone()
        };

        // A directed send reaches exactly the named peer.
        consensus_sender
            .send_to(&"svc1".as_bytes().into(), vec![0])
            .expect("failed to send");

        let (recipient, raw) = nth_sent(0, "1st message not sent");
        assert_eq!(recipient, "svc1".to_string());
        let consensus_message = decode_consensus_message(
            &raw,
            "failed to parse 1st scabbard message",
            "failed to parse 1st consensus message",
        );
        assert_eq!(consensus_message.message, vec![0]);
        assert_eq!(consensus_message.origin_id, "svc0".as_bytes().into());

        // A broadcast reaches every peer exactly once, in unspecified order; each recipient
        // is removed from the set so a duplicate delivery would fail the assertion.
        consensus_sender.broadcast(vec![1]).expect("failed to send");

        let broadcast_expectations = [
            (
                1,
                "2nd message not sent",
                "failed to parse 2nd scabbard message",
                "failed to parse 2nd consensus message",
            ),
            (
                2,
                "3rd message not sent",
                "failed to parse 3rd scabbard message",
                "failed to parse 3rd consensus message",
            ),
        ];
        for &(index, missing, bad_scabbard, bad_consensus) in broadcast_expectations.iter() {
            let (recipient, raw) = nth_sent(index, missing);
            assert!(peer_services.remove(&recipient));
            let consensus_message = decode_consensus_message(&raw, bad_scabbard, bad_consensus);
            assert_eq!(consensus_message.message, vec![1]);
            assert_eq!(consensus_message.origin_id, "svc0".as_bytes().into());
        }
    }

    /// Parses `raw` as a scabbard message, asserts that it is a `CONSENSUS_MESSAGE`, and
    /// returns the consensus message it carries.
    ///
    /// `bad_scabbard` and `bad_consensus` are the panic messages used when the respective
    /// parse step fails.
    fn decode_consensus_message(
        raw: &[u8],
        bad_scabbard: &str,
        bad_consensus: &str,
    ) -> ConsensusMessage {
        let scabbard_message: ScabbardMessage =
            Message::parse_from_bytes(raw).expect(bad_scabbard);
        assert_eq!(
            scabbard_message.get_message_type(),
            ScabbardMessage_Type::CONSENSUS_MESSAGE
        );
        ConsensusMessage::try_from(scabbard_message.get_consensus_message())
            .expect(bad_consensus)
    }

    /// Service network sender that records every call instead of touching a network.
    ///
    /// Clones share the same recording buffers, so a test can keep one clone and inspect
    /// what was sent through another.
    #[derive(Clone, Debug)]
    pub struct MockServiceNetworkSender {
        /// `(recipient, message)` pairs passed to `send`.
        pub sent: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
        /// `(recipient, message)` pairs passed to `send_and_await`.
        pub sent_and_awaited: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
        /// `(message origin, message)` pairs passed to `reply`.
        pub replied: Arc<Mutex<Vec<(ServiceMessageContext, Vec<u8>)>>>,
    }

    impl MockServiceNetworkSender {
        /// Creates a mock with empty recording buffers.
        pub fn new() -> Self {
            Self {
                sent: Arc::default(),
                sent_and_awaited: Arc::default(),
                replied: Arc::default(),
            }
        }
    }

    impl ServiceNetworkSender for MockServiceNetworkSender {
        /// Records the call in `sent` and reports success.
        fn send(&self, recipient: &str, message: &[u8]) -> Result<(), ServiceSendError> {
            let mut log = self.sent.lock().expect("sent lock poisoned");
            log.push((recipient.to_owned(), message.to_owned()));
            Ok(())
        }

        /// Records the call in `sent_and_awaited` and returns an empty response.
        fn send_and_await(
            &self,
            recipient: &str,
            message: &[u8],
        ) -> Result<Vec<u8>, ServiceSendError> {
            let mut log = self
                .sent_and_awaited
                .lock()
                .expect("sent_and_awaited lock poisoned");
            log.push((recipient.to_owned(), message.to_owned()));
            Ok(Vec::new())
        }

        /// Records the call in `replied` and reports success.
        fn reply(
            &self,
            message_origin: &ServiceMessageContext,
            message: &[u8],
        ) -> Result<(), ServiceSendError> {
            let mut log = self.replied.lock().expect("replied lock poisoned");
            log.push((message_origin.clone(), message.to_owned()));
            Ok(())
        }

        /// Returns a boxed clone that shares this mock's recording buffers.
        fn clone_box(&self) -> Box<dyn ServiceNetworkSender> {
            Box::new(self.clone())
        }

        /// Does nothing and records nothing; always reports success.
        fn send_with_sender(
            &mut self,
            _recipient: &str,
            _message: &[u8],
            _sender: &str,
        ) -> Result<(), ServiceSendError> {
            Ok(())
        }
    }
}