use criterion::{
criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration,
Throughput,
};
use sc_network::{
config::{
FullNetworkConfiguration, IncomingRequest, NetworkConfiguration, NonReservedPeerMode,
NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role, SetConfig,
},
service::traits::NetworkService,
IfDisconnected, Litep2pNetworkBackend, NetworkBackend, NetworkRequest, NetworkWorker,
NotificationMetrics, NotificationService, PeerId, Roles,
};
use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT};
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Zero};
use std::{sync::Arc, time::Duration};
use substrate_test_runtime_client::runtime;
use tokio::{sync::Mutex, task::JoinHandle};
const MAX_SIZE: u64 = 2u64.pow(30);
const NUMBER_OF_REQUESTS: usize = 100;
const PAYLOAD: &[(u32, &'static str)] = &[
(6, "64B"),
(9, "512B"),
(12, "4KB"),
(15, "64KB"),
(18, "256KB"),
(21, "2MB"),
(24, "16MB"),
];
pub fn create_network_worker<B, H, N>() -> (
N,
Arc<dyn NetworkService>,
async_channel::Receiver<IncomingRequest>,
Arc<Mutex<Box<dyn NotificationService>>>,
)
where
B: BlockT<Hash = H256> + 'static,
H: ExHashT,
N: NetworkBackend<B, H>,
{
let (tx, rx) = async_channel::bounded(10);
let request_response_config = N::request_response_config(
"/request-response/1".into(),
vec![],
MAX_SIZE,
MAX_SIZE,
Duration::from_secs(2),
Some(tx),
);
let role = Role::Full;
let net_conf = NetworkConfiguration::new_local();
let mut network_config = FullNetworkConfiguration::new(&net_conf, None);
network_config.add_request_response_protocol(request_response_config);
let genesis_hash = runtime::Hash::zero();
let (block_announce_config, notification_service) = N::notification_config(
"/block-announces/1".into(),
vec![],
1024,
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<runtime::Block>::build(
Roles::from(&Role::Full),
Zero::zero(),
genesis_hash,
genesis_hash,
))),
SetConfig {
in_peers: 1,
out_peers: 1,
reserved_nodes: vec![],
non_reserved_mode: NonReservedPeerMode::Accept,
},
NotificationMetrics::new(None),
network_config.peer_store_handle(),
);
let worker = N::new(Params::<B, H, N> {
block_announce_config,
role,
executor: Box::new(|f| {
tokio::spawn(f);
}),
genesis_hash: runtime::Hash::zero(),
network_config,
protocol_id: ProtocolId::from("bench-request-response-protocol"),
fork_id: None,
metrics_registry: None,
ipfs_config: None,
notification_metrics: NotificationMetrics::new(None),
})
.unwrap();
let notification_service = Arc::new(Mutex::new(notification_service));
let network_service = worker.network_service();
(worker, network_service, rx, notification_service)
}
struct BenchSetup {
#[allow(dead_code)]
notification_service1: Arc<Mutex<Box<dyn NotificationService>>>,
#[allow(dead_code)]
notification_service2: Arc<Mutex<Box<dyn NotificationService>>>,
network_service1: Arc<dyn NetworkService>,
peer_id2: PeerId,
handle1: JoinHandle<()>,
handle2: JoinHandle<()>,
#[allow(dead_code)]
rx1: async_channel::Receiver<IncomingRequest>,
rx2: async_channel::Receiver<IncomingRequest>,
}
impl Drop for BenchSetup {
fn drop(&mut self) {
self.handle1.abort();
self.handle2.abort();
}
}
fn setup_workers<B, H, N>(rt: &tokio::runtime::Runtime) -> Arc<BenchSetup>
where
B: BlockT<Hash = H256> + 'static,
H: ExHashT,
N: NetworkBackend<B, H>,
{
let _guard = rt.enter();
let (worker1, network_service1, rx1, notification_service1) =
create_network_worker::<B, H, N>();
let (worker2, network_service2, rx2, notification_service2) =
create_network_worker::<B, H, N>();
let peer_id2 = worker2.network_service().local_peer_id();
let handle1 = tokio::spawn(worker1.run());
let handle2 = tokio::spawn(worker2.run());
let _ = tokio::spawn({
let rx2 = rx2.clone();
async move {
let req = rx2.recv().await.unwrap();
req.pending_response
.send(OutgoingResponse {
result: Ok(vec![0; 2usize.pow(25)]),
reputation_changes: vec![],
sent_feedback: None,
})
.unwrap();
}
});
let ready = tokio::spawn({
let network_service1 = Arc::clone(&network_service1);
async move {
let listen_address2 = {
while network_service2.listen_addresses().is_empty() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
network_service2.listen_addresses()[0].clone()
};
network_service1.add_known_address(peer_id2, listen_address2.into());
let _ = network_service1
.request(
peer_id2.into(),
"/request-response/1".into(),
vec![0; 2],
None,
IfDisconnected::TryConnect,
)
.await
.unwrap();
}
});
tokio::task::block_in_place(|| {
let _ = tokio::runtime::Handle::current().block_on(ready);
});
Arc::new(BenchSetup {
notification_service1,
notification_service2,
network_service1,
peer_id2,
handle1,
handle2,
rx1,
rx2,
})
}
async fn run_serially(setup: Arc<BenchSetup>, size: usize, limit: usize) {
let (break_tx, break_rx) = async_channel::bounded(1);
let network1 = tokio::spawn({
let network_service1 = Arc::clone(&setup.network_service1);
let peer_id2 = setup.peer_id2;
async move {
for _ in 0..limit {
let _ = network_service1
.request(
peer_id2.into(),
"/request-response/1".into(),
vec![0; 2],
None,
IfDisconnected::TryConnect,
)
.await
.unwrap();
}
let _ = break_tx.send(()).await;
}
});
let network2 = tokio::spawn({
let rx2 = setup.rx2.clone();
async move {
loop {
tokio::select! {
req = rx2.recv() => {
let IncomingRequest { pending_response, .. } = req.unwrap();
pending_response.send(OutgoingResponse {
result: Ok(vec![0; size]),
reputation_changes: vec![],
sent_feedback: None,
}).unwrap();
},
_ = break_rx.recv() => break,
}
}
}
});
let _ = tokio::join!(network1, network2);
}
#[allow(dead_code)]
async fn run_with_backpressure(setup: Arc<BenchSetup>, size: usize, limit: usize) {
let (break_tx, break_rx) = async_channel::bounded(1);
let requests = futures::future::join_all((0..limit).into_iter().map(|_| {
let (tx, rx) = futures::channel::oneshot::channel();
setup.network_service1.start_request(
setup.peer_id2.into(),
"/request-response/1".into(),
vec![0; 8],
None,
tx,
IfDisconnected::TryConnect,
);
rx
}));
let network1 = tokio::spawn(async move {
let responses = requests.await;
for res in responses {
res.unwrap().unwrap();
}
let _ = break_tx.send(()).await;
});
let network2 = tokio::spawn(async move {
for _ in 0..limit {
let IncomingRequest { pending_response, .. } = setup.rx2.recv().await.unwrap();
pending_response
.send(OutgoingResponse {
result: Ok(vec![0; size]),
reputation_changes: vec![],
sent_feedback: None,
})
.unwrap();
}
break_rx.recv().await
});
let _ = tokio::join!(network1, network2);
}
fn run_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);
let mut group = c.benchmark_group("request_response_protocol");
group.plot_config(plot_config);
group.sample_size(10);
let libp2p_setup = setup_workers::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(&rt);
for &(exponent, label) in PAYLOAD.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64));
group.bench_with_input(BenchmarkId::new("libp2p/serially", label), &size, |b, &size| {
b.to_async(&rt)
.iter(|| run_serially(Arc::clone(&libp2p_setup), size, NUMBER_OF_REQUESTS));
});
}
drop(libp2p_setup);
let litep2p_setup = setup_workers::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(&rt);
for &(exponent, label) in PAYLOAD.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64));
group.bench_with_input(BenchmarkId::new("litep2p/serially", label), &size, |b, &size| {
b.to_async(&rt)
.iter(|| run_serially(Arc::clone(&litep2p_setup), size, NUMBER_OF_REQUESTS));
});
}
drop(litep2p_setup);
}
criterion_group!(benches, run_benchmark);
criterion_main!(benches);