use std::error;
use std::fmt;
use std::sync::mpsc::Receiver;
use crate::consensus::service::Service;
/// Notification delivered to a consensus [`Engine`] over the `Receiver<Update>`
/// that is passed to [`Engine::start`].
///
/// The per-variant descriptions below are inferred from the variant names and
/// payload types; confirm against the component that produces these updates.
#[derive(Debug)]
// NOTE(review): the size lint is silenced instead of boxing the larger payloads
// (`Block`, `PeerMessage`); the reason is not recorded here — confirm it is intentional.
#[allow(clippy::large_enum_variant)]
pub enum Update {
/// A peer connected; carries that peer's [`PeerInfo`].
PeerConnected(PeerInfo),
/// A peer disconnected; carries the peer's id.
PeerDisconnected(PeerId),
/// A message arrived from a peer; carries the message and a peer id
/// (presumably the sender — confirm).
PeerMessage(PeerMessage, PeerId),
/// A new block is available; carries the whole [`Block`].
BlockNew(Block),
/// The block with the given id was reported valid.
BlockValid(BlockId),
/// The block with the given id was reported invalid.
BlockInvalid(BlockId),
/// The block with the given id was committed.
BlockCommit(BlockId),
/// Request to stop; the mock engine in this file leaves its `start` loop on this.
Shutdown,
}
pub type BlockId = Vec<u8>;
#[derive(Clone, Default, PartialEq, Hash)]
pub struct Block {
pub block_id: BlockId,
pub previous_id: BlockId,
pub signer_id: PeerId,
pub block_num: u64,
pub payload: Vec<u8>,
pub summary: Vec<u8>,
}
impl Eq for Block {}
impl fmt::Debug for Block {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Block(block_num: {:?}, block_id: {:?}, previous_id: {:?}, signer_id: {:?}, payload: {}, summary: {})",
self.block_num,
self.block_id,
self.previous_id,
self.signer_id,
hex::encode(&self.payload),
hex::encode(&self.summary),
)
}
}
/// Identifier of a peer, held as raw bytes.
pub type PeerId = Vec<u8>;

/// Information known about a peer; currently only its id.
///
/// `Eq` is derived alongside `PartialEq` instead of being written as an empty
/// manual impl, so the compiler verifies that every field remains `Eq`.
#[derive(Default, Debug, PartialEq, Eq, Hash)]
pub struct PeerInfo {
    /// Id of the peer.
    pub peer_id: PeerId,
}
/// A message received from (or addressed to) a peer, with its header kept both
/// in structured and in raw form.
#[derive(Default, Debug, Clone)]
pub struct PeerMessage {
/// Structured header of the message.
pub header: PeerMessageHeader,
/// Raw header bytes (presumably the serialized form of `header` — confirm).
pub header_bytes: Vec<u8>,
/// Signature bytes (presumably computed over `header_bytes` — confirm).
pub header_signature: Vec<u8>,
/// Raw message content; opaque to this module.
pub content: Vec<u8>,
}
/// Structured header attached to a [`PeerMessage`].
#[derive(Default, Debug, Clone)]
pub struct PeerMessageHeader {
/// Id of the message signer, as raw bytes.
pub signer_id: Vec<u8>,
/// Digest bytes (by name, presumably SHA-512 of the message content — confirm).
pub content_sha512: Vec<u8>,
/// Free-form message type string.
pub message_type: String,
/// Name string (presumably the consensus engine/protocol name — confirm).
pub name: String,
/// Version string (presumably the consensus engine/protocol version — confirm).
pub version: String,
}
/// Interface implemented by a consensus engine.
pub trait Engine {
/// Starts the engine.
///
/// * `updates` - receiving end of the channel on which [`Update`]s arrive
/// * `service` - boxed [`Service`] implementation handed to the engine
/// * `startup_state` - chain head and peer information supplied at start
///
/// Returns `Ok(())` or an [`Error`].
///
/// NOTE(review): the mock implementation in this file blocks until it sees
/// `Update::Shutdown` or the channel disconnects; presumably real engines
/// follow the same contract — confirm.
fn start(
&mut self,
updates: Receiver<Update>,
service: Box<dyn Service>,
startup_state: StartupState,
) -> Result<(), Error>;
/// Returns the engine's version string.
fn version(&self) -> String;
/// Returns the engine's name.
fn name(&self) -> String;
/// Returns extra string pairs describing additional protocols
/// (presumably `(name, version)` — the mock returns `("1", "Mock")`, so confirm the order).
fn additional_protocols(&self) -> Vec<(String, String)>;
}
/// State handed to [`Engine::start`] when an engine is started.
#[derive(Debug, Default)]
pub struct StartupState {
/// Block supplied as the chain head at startup.
pub chain_head: Block,
/// Peers supplied at startup (presumably those already connected — confirm).
pub peers: Vec<PeerInfo>,
/// Peer information for the local node.
pub local_peer_info: PeerInfo,
}
/// Errors reported by the consensus engine interface.
///
/// Variants with a `String` carry a free-form detail message; the `Display`
/// output is the variant name, followed by `": "` and the message when present.
#[derive(Debug)]
pub enum Error {
    /// Encoding-related failure, with detail text.
    EncodingError(String),
    /// Failure while sending, with detail text.
    SendError(String),
    /// Failure while receiving, with detail text.
    ReceiveError(String),
    /// Invalid-state condition, with detail text.
    InvalidState(String),
    /// Reference to an unknown block, with detail text.
    UnknownBlock(String),
    /// Reference to an unknown peer, with detail text.
    UnknownPeer(String),
    /// No chain head is available.
    NoChainHead,
    /// The block is not ready.
    BlockNotReady,
}

impl error::Error for Error {}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            // Variants that carry a message: "<Variant>: <message>".
            Error::EncodingError(detail) => write!(f, "EncodingError: {}", detail),
            Error::SendError(detail) => write!(f, "SendError: {}", detail),
            Error::ReceiveError(detail) => write!(f, "ReceiveError: {}", detail),
            Error::InvalidState(detail) => write!(f, "InvalidState: {}", detail),
            Error::UnknownBlock(detail) => write!(f, "UnknownBlock: {}", detail),
            Error::UnknownPeer(detail) => write!(f, "UnknownPeer: {}", detail),
            // Unit variants: just the variant name.
            Error::NoChainHead => f.write_str("NoChainHead"),
            Error::BlockNotReady => f.write_str("BlockNotReady"),
        }
    }
}
#[cfg(test)]
pub mod tests {
use super::*;
use std::default::Default;
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use crate::consensus::service::tests::MockService;
/// Test double that records, as strings, the calls and updates it observes.
///
/// The log lives behind an `Arc<Mutex<..>>` so a test can keep a handle to it
/// while the engine itself is moved into another thread.
#[derive(Default)]
pub struct MockEngine {
    calls: Arc<Mutex<Vec<String>>>,
}

impl MockEngine {
    /// Creates a mock with its own, initially empty, call log.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a mock that appends to the caller-supplied shared call log.
    pub fn with(amv: Arc<Mutex<Vec<String>>>) -> Self {
        MockEngine { calls: amv }
    }

    /// Returns a snapshot (copy) of the calls recorded so far.
    ///
    /// Panics if the log's mutex has been poisoned by a panicking thread.
    pub fn calls(&self) -> Vec<String> {
        self.calls.lock().unwrap().clone()
    }
}
/// `Engine` implementation that only logs what it is asked to do.
impl Engine for MockEngine {
    /// Records `"start"`, then records the variant name of every update received
    /// until `Update::Shutdown` arrives or the sending side disconnects.
    ///
    /// The receive uses a 100 ms timeout; a timeout is only printed and the loop
    /// keeps waiting. Always returns `Ok(())`.
    fn start(
        &mut self,
        updates: Receiver<Update>,
        _service: Box<dyn Service>,
        _startup_state: StartupState,
    ) -> Result<(), Error> {
        self.calls.lock().unwrap().push("start".into());

        let poll_interval = Duration::from_millis(100);
        loop {
            // Translate the received update into the label to record; the
            // terminating and timeout cases leave the iteration early instead.
            let label = match updates.recv_timeout(poll_interval) {
                Ok(Update::PeerConnected(_)) => "PeerConnected",
                Ok(Update::PeerDisconnected(_)) => "PeerDisconnected",
                Ok(Update::PeerMessage(_, _)) => "PeerMessage",
                Ok(Update::BlockNew(_)) => "BlockNew",
                Ok(Update::BlockValid(_)) => "BlockValid",
                Ok(Update::BlockInvalid(_)) => "BlockInvalid",
                Ok(Update::BlockCommit(_)) => "BlockCommit",
                Ok(Update::Shutdown) => {
                    println!("shutdown");
                    break;
                }
                Err(RecvTimeoutError::Disconnected) => {
                    println!("disconnected");
                    break;
                }
                Err(RecvTimeoutError::Timeout) => {
                    println!("timeout");
                    continue;
                }
            };
            self.calls.lock().unwrap().push(label.into());
        }

        Ok(())
    }

    /// Fixed version string of the mock.
    fn version(&self) -> String {
        String::from("0")
    }

    /// Fixed name of the mock.
    fn name(&self) -> String {
        String::from("mock")
    }

    /// Single fixed protocol entry reported by the mock.
    fn additional_protocols(&self) -> Vec<(String, String)> {
        vec![(String::from("1"), String::from("Mock"))]
    }
}
/// Feeds one update of every kind to a `MockEngine` running on its own thread
/// and checks that each one (plus the initial `start`) was recorded.
#[test]
fn test_engine() {
    let calls = Arc::new(Mutex::new(Vec::new()));
    let mut mock_engine = MockEngine::with(Arc::clone(&calls));
    let (sender, receiver) = channel();

    // Queue every non-terminating update before the engine starts draining.
    let queued = vec![
        Update::PeerConnected(Default::default()),
        Update::PeerDisconnected(Default::default()),
        Update::PeerMessage(Default::default(), Default::default()),
        Update::BlockNew(Default::default()),
        Update::BlockValid(Default::default()),
        Update::BlockInvalid(Default::default()),
        Update::BlockCommit(Default::default()),
    ];
    for update in queued {
        sender.send(update).unwrap();
    }

    let handle = thread::spawn(move || {
        let svc = Box::new(MockService {});
        let outcome = mock_engine.start(receiver, svc, Default::default());
        outcome.unwrap();
    });

    // The channel is FIFO, so the shutdown is seen only after everything above.
    sender.send(Update::Shutdown).unwrap();
    handle.join().unwrap();

    assert!(contains(&calls, "start"));
    assert!(contains(&calls, "PeerConnected"));
    assert!(contains(&calls, "PeerDisconnected"));
    assert!(contains(&calls, "PeerMessage"));
    assert!(contains(&calls, "BlockNew"));
    assert!(contains(&calls, "BlockValid"));
    assert!(contains(&calls, "BlockInvalid"));
    assert!(contains(&calls, "BlockCommit"));
}
/// Returns `true` when the shared call log holds an entry equal to `expected`.
///
/// Panics if the log's mutex is poisoned.
fn contains(calls: &Arc<Mutex<Vec<String>>>, expected: &str) -> bool {
    let recorded = calls.lock().unwrap();
    recorded.iter().any(|entry| entry == expected)
}
}