#![cfg(feature = "bench")]
use dashmap::DashMap;
use freenet::transport::mock_transport::{
Channels, MockSocket, PacketDropPolicy, create_mock_peer,
};
use freenet::transport::{OutboundConnectionHandler, PeerConnection};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::info;
async fn create_connected_pair(
channels: Channels,
) -> (
PeerConnection<MockSocket>,
PeerConnection<MockSocket>,
OutboundConnectionHandler<MockSocket>,
OutboundConnectionHandler<MockSocket>,
) {
let (peer_a_pub, mut peer_a, peer_a_addr) =
create_mock_peer(PacketDropPolicy::ReceiveAll, channels.clone())
.await
.expect("create peer A");
let (peer_b_pub, mut peer_b, peer_b_addr) =
create_mock_peer(PacketDropPolicy::ReceiveAll, channels)
.await
.expect("create peer B");
let (conn_a_inner, conn_b_inner) = futures::join!(
peer_a.connect(peer_b_pub, peer_b_addr),
peer_b.connect(peer_a_pub, peer_a_addr),
);
let (conn_a, conn_b) = futures::join!(conn_a_inner, conn_b_inner);
let (conn_a, conn_b) = (conn_a.expect("connect A"), conn_b.expect("connect B"));
(conn_a, conn_b, peer_a, peer_b)
}
async fn bench_single_stream(message_size: usize, iterations: usize) -> (Duration, f64) {
let channels: Channels = Arc::new(DashMap::new());
let (mut conn_a, mut conn_b, _peer_a, _peer_b) = create_connected_pair(channels).await;
let start = Instant::now();
for _ in 0..iterations {
let msg = vec![0xABu8; message_size];
conn_a.send(msg).await.unwrap();
let _: Vec<u8> = conn_b.recv().await.unwrap();
}
let elapsed = start.elapsed();
let total_bytes = message_size * iterations;
let throughput_mbps = (total_bytes as f64 * 8.0) / elapsed.as_secs_f64() / 1_000_000.0;
(elapsed, throughput_mbps)
}
async fn bench_concurrent_streams(
num_streams: usize,
message_size: usize,
iterations: usize,
) -> (Duration, f64) {
let channels: Channels = Arc::new(DashMap::new());
let mut tasks = Vec::new();
for stream_id in 0..num_streams {
let channels_clone = channels.clone();
let task = tokio::spawn(async move {
let (mut conn_a, mut conn_b, _peer_a, _peer_b) =
create_connected_pair(channels_clone).await;
let start = Instant::now();
for _ in 0..iterations {
let msg = vec![0xABu8; message_size];
conn_a.send(msg).await.unwrap();
let _: Vec<u8> = conn_b.recv().await.unwrap();
}
let elapsed = start.elapsed();
(stream_id, elapsed)
});
tasks.push(task);
}
let start = Instant::now();
let _results = futures::future::join_all(tasks).await;
let total_elapsed = start.elapsed();
let total_bytes = message_size * iterations * num_streams;
let aggregate_mbps = (total_bytes as f64 * 8.0) / total_elapsed.as_secs_f64() / 1_000_000.0;
(total_elapsed, aggregate_mbps)
}
#[test_log::test(tokio::test(flavor = "current_thread", start_paused = true))]
async fn test_concurrent_streams_bottleneck() {
info!("=== Concurrent Streams Bottleneck Test ===");
let message_size = 1024; let iterations = 25;
info!("Benchmarking single stream...");
let (single_time, single_mbps) = bench_single_stream(message_size, iterations).await;
info!(
"Single stream: {:.2}ms, {:.2} Mbps",
single_time.as_secs_f64() * 1000.0,
single_mbps
);
info!("Benchmarking 4 concurrent streams...");
let (concurrent_time, concurrent_mbps) =
bench_concurrent_streams(4, message_size, iterations).await;
info!(
"4 concurrent streams: {:.2}ms, {:.2} Mbps (aggregate)",
concurrent_time.as_secs_f64() * 1000.0,
concurrent_mbps
);
let degradation_ratio = single_mbps / concurrent_mbps;
info!(
"Degradation: {:.2}x (single stream is {:.2}x faster than aggregate of 4 streams)",
degradation_ratio, degradation_ratio
);
if degradation_ratio > 2.0 {
info!(
"WARNING: Significant bottleneck detected ({:.2}x degradation)",
degradation_ratio
);
info!(" Expected: ~1.0x (concurrent ≈ single stream throughput)");
info!(" This indicates serialization in the packet processing pipeline.");
} else {
info!("No significant bottleneck (degradation < 2x)");
}
info!("=== Test Complete ===");
}