use std::collections::{HashMap, HashSet, VecDeque};
use cylinder::{PublicKey, Signature, Verifier as SignatureVerifier};
use openssl::hash::{hash, MessageDigest};
use protobuf::Message;
use transact::protocol::batch::BatchPair;
use transact::protocol::transaction::{HashMethod, TransactionHeader};
use transact::protos::FromBytes;
use splinter::{
consensus::{PeerId, Proposal, ProposalId},
service::ServiceNetworkSender,
};
use crate::protos::scabbard::{ScabbardMessage, ScabbardMessage_Type};
use super::error::ScabbardError;
use super::ScabbardVersion;
const DEFAULT_PENDING_BATCH_LIMIT: usize = 30;
pub struct ScabbardShared {
batch_queue: VecDeque<BatchPair>,
network_sender: Option<Box<dyn ServiceNetworkSender>>,
peer_services: HashSet<String>,
coordinator_service_id: String,
service_id: String,
#[cfg(feature = "metrics")]
circuit_id: String,
open_proposals: HashMap<ProposalId, (Proposal, BatchPair)>,
signature_verifier: Box<dyn SignatureVerifier>,
accepting_batches: bool,
scabbard_version: ScabbardVersion,
}
impl ScabbardShared {
pub fn new(
batch_queue: VecDeque<BatchPair>,
network_sender: Option<Box<dyn ServiceNetworkSender>>,
peer_services: HashSet<String>,
service_id: String,
#[cfg(feature = "metrics")] circuit_id: String,
signature_verifier: Box<dyn SignatureVerifier>,
scabbard_version: ScabbardVersion,
) -> Self {
let coordinator_service_id = String::from_utf8(
peer_services
.iter()
.chain(std::iter::once(&service_id))
.map(|service_id| PeerId::from(service_id.as_bytes()))
.min()
.expect("There will always be at least one service (self)")
.into(),
)
.expect("String -> PeerId -> String conversion should not fail");
let scabbard_shared = ScabbardShared {
batch_queue,
network_sender,
peer_services,
coordinator_service_id,
service_id,
#[cfg(feature = "metrics")]
circuit_id,
open_proposals: HashMap::new(),
signature_verifier,
accepting_batches: true,
scabbard_version,
};
scabbard_shared.update_pending_batches(0.0);
scabbard_shared
}
pub fn is_coordinator(&self) -> bool {
self.service_id == self.coordinator_service_id
}
pub fn coordinator_service_id(&self) -> &str {
&self.coordinator_service_id
}
pub fn set_accepting_batches(&mut self, accepting: bool) {
self.accepting_batches = accepting;
}
pub fn accepting_batches(&self) -> bool {
self.accepting_batches
}
fn update_pending_batches(&self, _batches: f64) {
gauge!(
"splinter.scabbard.pending_batches",
_batches,
"service" => format!("{}::{}", self.circuit_id, self.service_id)
);
}
pub fn add_batch_to_queue(&mut self, batch: BatchPair) -> Result<(), ScabbardError> {
self.batch_queue.push_back(batch);
self.update_pending_batches(self.batch_queue.len() as f64);
if !self.is_coordinator() || self.scabbard_version == ScabbardVersion::V1 {
return Ok(());
};
if self.accepting_batches && self.batch_queue.len() >= DEFAULT_PENDING_BATCH_LIMIT {
self.set_accepting_batches(false);
let mut msg = ScabbardMessage::new();
msg.set_message_type(ScabbardMessage_Type::TOO_MANY_REQUESTS);
let msg_bytes = msg
.write_to_bytes()
.map_err(|err| ScabbardError::Internal(Box::new(err)))?;
for service in self.peer_services() {
self.network_sender()
.ok_or(ScabbardError::NotConnected)?
.send(service, msg_bytes.as_slice())
.map_err(|err| ScabbardError::Internal(Box::new(err)))?;
}
}
Ok(())
}
pub fn pop_batch_from_queue(&mut self) -> Result<Option<BatchPair>, ScabbardError> {
let batch = self.batch_queue.pop_front();
if batch.is_some() {
self.update_pending_batches(self.batch_queue.len() as f64);
}
if !self.is_coordinator() || self.scabbard_version == ScabbardVersion::V1 {
return Ok(batch);
};
if !self.accepting_batches && self.batch_queue.len() < DEFAULT_PENDING_BATCH_LIMIT / 2 {
self.set_accepting_batches(true);
let mut msg = ScabbardMessage::new();
msg.set_message_type(ScabbardMessage_Type::ACCEPTING_REQUESTS);
let msg_bytes = msg
.write_to_bytes()
.map_err(|err| ScabbardError::Internal(Box::new(err)))?;
for service in self.peer_services() {
self.network_sender()
.ok_or(ScabbardError::NotConnected)?
.send(service, msg_bytes.as_slice())
.map_err(|err| ScabbardError::Internal(Box::new(err)))?;
}
}
Ok(batch)
}
pub fn network_sender(&self) -> Option<&dyn ServiceNetworkSender> {
self.network_sender.as_deref()
}
pub fn set_network_sender(&mut self, sender: Box<dyn ServiceNetworkSender>) {
self.network_sender = Some(sender)
}
pub fn take_network_sender(&mut self) -> Option<Box<dyn ServiceNetworkSender>> {
self.network_sender.take()
}
pub fn peer_services(&self) -> &HashSet<String> {
&self.peer_services
}
pub fn add_open_proposal(&mut self, proposal: Proposal, batch: BatchPair) {
self.open_proposals
.insert(proposal.id.clone(), (proposal, batch));
}
pub fn get_open_proposal(&self, proposal_id: &ProposalId) -> Option<&(Proposal, BatchPair)> {
self.open_proposals.get(proposal_id)
}
pub fn remove_open_proposal(&mut self, proposal_id: &ProposalId) {
self.open_proposals.remove(proposal_id);
}
pub fn verify_batches(&self, batches: &[BatchPair]) -> Result<bool, ScabbardError> {
for batch in batches {
let batch_pub_key = batch.header().signer_public_key();
if !self
.signature_verifier
.verify(
batch.batch().header(),
&Signature::from_hex(batch.batch().header_signature())
.map_err(|err| ScabbardError::BatchVerificationFailed(Box::new(err)))?,
&PublicKey::new(batch_pub_key.to_vec()),
)
.map_err(|err| ScabbardError::BatchVerificationFailed(Box::new(err)))?
{
warn!(
"Batch failed signature verification: {}",
batch.batch().header_signature()
);
return Ok(false);
}
if batch.header().transaction_ids().len() != batch.batch().transactions().len() {
warn!(
"Number of transactions in batch header does not match number of transactions
in batch: {}",
batch.batch().header_signature(),
);
return Ok(false);
}
for (i, txn) in batch.batch().transactions().iter().enumerate() {
let header = TransactionHeader::from_bytes(txn.header())
.map_err(|err| ScabbardError::BatchVerificationFailed(Box::new(err)))?;
if txn.header_signature() != batch.header().transaction_ids()[i] {
warn!(
"Transaction at index {} does not match corresponding transaction ID in
batch header: {}",
i,
batch.batch().header_signature(),
);
return Ok(false);
}
if header.batcher_public_key() != batch_pub_key {
warn!(
"Transaction batcher public key does not match batch signer public key -
txn: {}, batch: {}",
txn.header_signature(),
batch.batch().header_signature(),
);
return Ok(false);
}
if !self
.signature_verifier
.verify(
txn.header(),
&Signature::from_hex(txn.header_signature())
.map_err(|err| ScabbardError::BatchVerificationFailed(Box::new(err)))?,
&PublicKey::new(header.signer_public_key().to_vec()),
)
.map_err(|err| ScabbardError::BatchVerificationFailed(Box::new(err)))?
{
warn!(
"Transaction failed signature verification - txn: {}, batch: {}",
txn.header_signature(),
batch.batch().header_signature()
);
return Ok(false);
}
if !match header.payload_hash_method() {
HashMethod::Sha512 => {
let expected_hash = hash(MessageDigest::sha512(), txn.payload())
.map_err(|err| ScabbardError::BatchVerificationFailed(Box::new(err)))?;
header.payload_hash() == expected_hash.as_ref()
}
} {
warn!(
"Transaction payload hash doesn't match payload - txn: {}, batch: {}",
txn.header_signature(),
batch.batch().header_signature()
);
return Ok(false);
}
}
}
Ok(true)
}
}
#[cfg(test)]
mod tests {
use super::*;
use cylinder::{secp256k1::Secp256k1Context, VerifierFactory};
use splinter::service::{ServiceMessageContext, ServiceSendError};
#[test]
fn coordinator() {
let context = Secp256k1Context::new();
let mut peer_services = HashSet::new();
peer_services.insert("svc1".to_string());
peer_services.insert("svc2".to_string());
let coordinator_shared = ScabbardShared::new(
VecDeque::new(),
Some(Box::new(MockServiceNetworkSender)),
peer_services.clone(),
"svc0".to_string(),
#[cfg(feature = "metrics")]
"vzrQS-rvwf4".to_string(),
context.new_verifier(),
ScabbardVersion::V2,
);
assert!(coordinator_shared.is_coordinator());
assert_eq!(coordinator_shared.coordinator_service_id(), "svc0");
let non_coordinator_shared = ScabbardShared::new(
VecDeque::new(),
Some(Box::new(MockServiceNetworkSender)),
peer_services,
"svc3".to_string(),
#[cfg(feature = "metrics")]
"vzrQS-rvwf4".to_string(),
context.new_verifier(),
ScabbardVersion::V2,
);
assert!(!non_coordinator_shared.is_coordinator());
assert_eq!(non_coordinator_shared.coordinator_service_id(), "svc1");
}
#[derive(Clone, Debug)]
pub struct MockServiceNetworkSender;
impl ServiceNetworkSender for MockServiceNetworkSender {
fn send(&self, _recipient: &str, _message: &[u8]) -> Result<(), ServiceSendError> {
unimplemented!()
}
fn send_and_await(
&self,
_recipient: &str,
_message: &[u8],
) -> Result<Vec<u8>, ServiceSendError> {
unimplemented!()
}
fn reply(
&self,
_message_origin: &ServiceMessageContext,
_message: &[u8],
) -> Result<(), ServiceSendError> {
unimplemented!()
}
fn clone_box(&self) -> Box<dyn ServiceNetworkSender> {
Box::new(self.clone())
}
fn send_with_sender(
&mut self,
_recipient: &str,
_message: &[u8],
_sender: &str,
) -> Result<(), ServiceSendError> {
Ok(())
}
}
}