use std::time::Duration;
use bincode::Options;
use futures::future::join_all;
use tokio::{spawn, sync::oneshot, time::timeout};
use crate::{
app::mock::App,
common::{Opaque, SignedMessage, SigningKey},
facade::{Invoke, Receiver},
framework::tokio::AsyncEcosystem,
protocol::pbft::message::{self, ToReplica},
simulated::Transport,
stage::Handle,
tests::TRACING,
};
use super::{Client, Replica};
#[test]
fn send_pre_prepare_message() {
*TRACING;
let signing_key = SigningKey::from_bytes(&[1; 32]).unwrap();
let pre_prepare = message::PrePrepare {
view_number: 0,
op_number: 1,
digest: Default::default(),
};
let buffer = bincode::options()
.serialize(&ToReplica::PrePrepare(SignedMessage::sign(
pre_prepare,
&signing_key,
)))
.unwrap();
let message: ToReplica = bincode::options().deserialize_from(&buffer[..]).unwrap();
if let ToReplica::PrePrepare(pre_prepare) = message {
pre_prepare.verify(&signing_key.verifying_key()).unwrap();
} else {
panic!();
}
}
fn generate_route(
replica_list: &[Handle<Replica<Transport>>],
client_list: &[Client<Transport, AsyncEcosystem>],
) {
for replica in replica_list {
replica.with_stateful(|replica| {
replica.route_table.extend(
client_list
.iter()
.map(|client| (client.id, client.get_address().clone())),
);
});
}
}
#[tokio::test(start_paused = true)]
async fn one_request() {
*TRACING;
let config = Transport::config_builder(4, 1);
let mut transport = Transport::new(config());
let replica: Vec<_> = (0..4)
.map(|i| Replica::register_new(config(), &mut transport, i, App::default(), 1, false))
.collect();
let client: Client<_, AsyncEcosystem> = Client::register_new(config(), &mut transport);
let client = [client];
generate_route(&replica, &client);
let mut client = client.into_iter().next().unwrap();
let (stop_tx, stop) = oneshot::channel();
spawn(async move { transport.deliver_until(stop).await });
assert_eq!(
timeout(Duration::from_micros(1), client.invoke(b"hello".to_vec()))
.await
.unwrap(),
b"reply: hello".to_vec()
);
stop_tx.send(()).unwrap();
}
#[tokio::test(start_paused = true)]
async fn multiple_client() {
*TRACING;
let config = Transport::config_builder(4, 1);
let mut transport = Transport::new(config());
let replica: Vec<_> = (0..4)
.map(|i| Replica::register_new(config(), &mut transport, i, App::default(), 1, false))
.collect();
let client: Vec<Client<_, AsyncEcosystem>> = (0..3)
.map(|_| Client::register_new(config(), &mut transport))
.collect();
generate_route(&replica, &client);
let client: Vec<_> = client
.into_iter()
.enumerate()
.map(|(i, mut client)| {
spawn(async move {
assert_eq!(
client.invoke(format!("client-{}", i).into()).await,
Opaque::from(format!("reply: client-{}", i))
);
})
})
.collect();
let (stop_tx, stop) = oneshot::channel();
spawn(async move { transport.deliver_until(stop).await });
timeout(Duration::from_micros(1), join_all(client))
.await
.unwrap();
stop_tx.send(()).unwrap();
}