use exonum::{merkledb::BinaryValue, messages::SignedMessage};
use futures::{channel::mpsc, prelude::*};
use tokio::{task, time::delay_for};
use std::time::{Duration, SystemTime};
use super::{InternalEvent, InternalRequest, TimeoutRequest};
use crate::messages::{ExonumMessage, Message};
/// Handler of internal requests: it reads `InternalRequest`s from one channel and
/// answers with `InternalEvent`s on another (see `InternalPart::run`).
#[derive(Debug)]
pub struct InternalPart {
    /// Sender used to deliver the events produced while handling requests.
    pub internal_tx: mpsc::Sender<InternalEvent>,
    /// Receiver of the requests that `run` processes one by one.
    pub internal_requests_rx: mpsc::Receiver<InternalRequest>,
}
impl InternalPart {
async fn send_event(mut sender: mpsc::Sender<InternalEvent>, event: InternalEvent) {
sender.send(event).await.ok();
}
async fn verify_message(raw: Vec<u8>, internal_tx: mpsc::Sender<InternalEvent>) {
let task = task::spawn_blocking(|| {
SignedMessage::from_bytes(raw.into())
.and_then(SignedMessage::into_verified::<ExonumMessage>)
.map(Message::from)
});
if let Ok(Ok(msg)) = task.await {
let event = InternalEvent::message_verified(msg);
Self::send_event(internal_tx, event).await;
}
}
pub async fn run(mut self) {
while let Some(request) = self.internal_requests_rx.next().await {
if self.internal_tx.is_closed() {
return;
}
let internal_tx = self.internal_tx.clone();
match request {
InternalRequest::VerifyMessage(raw) => {
tokio::spawn(Self::verify_message(raw, internal_tx));
}
InternalRequest::Timeout(TimeoutRequest(time, timeout)) => {
let duration = time
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::from_millis(0));
tokio::spawn(async move {
delay_for(duration).await;
Self::send_event(internal_tx, InternalEvent::timeout(timeout)).await;
});
}
InternalRequest::JumpToRound(height, round) => {
let event = InternalEvent::jump_to_round(height, round);
tokio::spawn(Self::send_event(internal_tx, event));
}
}
}
}
}
#[cfg(test)]
mod tests {
    use exonum::{
        crypto::{Hash, KeyPair, Signature},
        helpers::Height,
        messages::Verified,
    };
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::messages::Status;

    /// Runs an `InternalPart` on fresh channels, submits `msg` for verification and
    /// returns the first event it emits, or `None` if the event channel closes first.
    async fn verify_message(msg: Vec<u8>) -> Option<InternalEvent> {
        let (event_tx, mut event_rx) = mpsc::channel(16);
        let (mut request_tx, request_rx) = mpsc::channel(16);

        tokio::spawn(
            InternalPart {
                internal_tx: event_tx,
                internal_requests_rx: request_rx,
            }
            .run(),
        );

        request_tx
            .send(InternalRequest::VerifyMessage(msg))
            .await
            .unwrap();
        // Dropping the only request sender ends the request stream, letting `run` return.
        drop(request_tx);

        event_rx.next().await
    }

    /// Builds a `Status` message signed with a freshly generated key pair.
    fn get_signed_message() -> SignedMessage {
        let keys = KeyPair::random();
        let status = Status::new(Height(0), Height(0), Hash::zero(), 0);
        Verified::from_value(status, keys.public_key(), keys.secret_key()).into_raw()
    }

    /// A correctly signed message must come back as a `message_verified` event.
    #[tokio::test]
    async fn verify_msg() {
        let signed = get_signed_message();
        let message = Message::from_signed(signed.clone()).unwrap();
        let expected = Some(InternalEvent::message_verified(message));

        let actual = verify_message(signed.into_bytes()).await;
        assert_eq!(actual, expected);
    }

    /// A message whose signature was replaced must not produce any event.
    #[tokio::test]
    async fn verify_incorrect_msg() {
        let mut tampered = get_signed_message();
        tampered.signature = Signature::zero();

        let event = verify_message(tampered.into_bytes()).await;
        assert_eq!(event, None);
    }
}