pea2pea 0.29.0

A small library allowing simple and quick creation of custom P2P nodes and networks.
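
At its core, an application wraps a Node in a type of its own and implements the Pea2Pea trait for it; protocols like Reading and Writing are then enabled on top of that type. Here is a minimal sketch of the pattern, assuming tokio and only the pea2pea calls that also appear in the benchmark further down (MyNode, alice and bob are illustrative names):

use pea2pea::{Node, Pea2Pea};

#[derive(Clone)]
struct MyNode(Node);

impl Pea2Pea for MyNode {
    fn node(&self) -> &Node {
        &self.0
    }
}

#[tokio::main]
async fn main() {
    // Start two nodes with default configs (as in the benchmark below, the default Config provides a listener).
    let alice = MyNode(Node::new(None).await.unwrap());
    let bob = MyNode(Node::new(None).await.unwrap());

    // Connect alice to bob's listening address.
    alice
        .node()
        .connect(bob.node().listening_addr().unwrap())
        .await
        .unwrap();

    assert_eq!(alice.node().num_connected(), 1);
}

The benchmark below builds on this pattern: a configurable number of spammer nodes write fixed-size messages to a single sink node that only counts what it receives.
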
use bytes::Bytes;
use once_cell::sync::Lazy;
use rand::{distributions::Standard, rngs::SmallRng, Rng, SeedableRng};

mod common;
use pea2pea::{
    protocols::{Reading, Writing},
    Config, Node, Pea2Pea,
};

use std::{convert::TryInto, io, net::SocketAddr, time::Instant};

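// Each spammer sends NUM_MESSAGES messages; every message occupies MSG_SIZE bytes on the
// wire (a 4-byte length prefix followed by the payload).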
const NUM_MESSAGES: usize = 10_000;
const MSG_SIZE: usize = 64 * 1024;

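// A single random payload, generated once and reused for every message.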
static RANDOM_BYTES: Lazy<Bytes> = Lazy::new(|| {
    Bytes::from(
        (&mut SmallRng::from_entropy())
            .sample_iter(Standard)
            .take(MSG_SIZE - 4)
            .collect::<Vec<_>>(),
    )
});

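// A node whose only job is to receive messages and count them; their contents are discarded.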
#[derive(Clone)]
struct Sink(Node);

impl Pea2Pea for Sink {
    fn node(&self) -> &Node {
        &self.0
    }
}

#[async_trait::async_trait]
impl Reading for Sink {
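    // The contents of incoming messages don't matter, so decoding yields a unit value.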
    type Message = ();

    fn read_message<R: io::Read>(
        &self,
        _source: SocketAddr,
        reader: &mut R,
    ) -> io::Result<Option<Self::Message>> {
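        // A buffer large enough to hold either the 4-byte length prefix or a full payload.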
        let mut buf = [0u8; MSG_SIZE];

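        // Read the little-endian length prefix; Ok(None) signals that a complete message
        // hasn't arrived yet.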
        let payload_len = {
            if reader.read_exact(&mut buf[..4]).is_err() {
                return Ok(None);
            }
            u32::from_le_bytes(buf[..4].try_into().unwrap()) as usize
        };

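        // Read the payload itself and discard it.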
        if reader.read_exact(&mut buf[..payload_len]).is_err() {
            Ok(None)
        } else {
            Ok(Some(()))
        }
    }

    async fn process_message(&self, _src: SocketAddr, _msg: Self::Message) -> io::Result<()> {
        Ok(())
    }
}

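/// Connects `sender_count` spammer nodes to a single sink node, has each spammer send
/// `NUM_MESSAGES` messages, and returns the sink's receive throughput in bytes per second.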
async fn run_bench_scenario(sender_count: usize) -> f64 {
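    // Each spammer queues up to NUM_MESSAGES outbound messages, so make its queue that deep.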
    let config = Config {
        outbound_queue_depth: NUM_MESSAGES,
        ..Default::default()
    };
    let spammers = common::start_nodes(sender_count, Some(config)).await;
    let spammers = spammers
        .into_iter()
        .map(common::MessagingNode)
        .collect::<Vec<_>>();

    for spammer in &spammers {
        spammer.enable_writing();
    }

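    // The sink's read buffer should fit a few full messages, and the sink must accept a
    // connection from every spammer.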
    let config = Config {
        read_buffer_size: MSG_SIZE * 3,
        max_connections: sender_count as u16,
        ..Default::default()
    };
    let sink = Sink(Node::new(Some(config)).await.unwrap());

    sink.enable_reading();

    for spammer in &spammers {
        spammer
            .node()
            .connect(sink.node().listening_addr().unwrap())
            .await
            .unwrap();
    }

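    // Wait until the sink has registered a connection from every spammer.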
    wait_until!(10, sink.node().num_connected() == sender_count);

    let sink_addr = sink.node().listening_addr().unwrap();

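    // Start the clock and let every spammer send all of its messages concurrently.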
    let start = Instant::now();
    for spammer in spammers {
        tokio::spawn(async move {
            for _ in 0..NUM_MESSAGES {
                spammer
                    .send_direct_message(sink_addr, RANDOM_BYTES.clone())
                    .unwrap();
            }
        });
    }

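    // Wait until the sink has counted every expected message.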
    wait_until!(
        10,
        sink.node().stats().received().0 as usize == sender_count * NUM_MESSAGES
    );

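    // Throughput in bytes per second: bytes received divided by seconds elapsed.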
    let time_elapsed = start.elapsed().as_millis();
    let bytes_received = sink.node().stats().received().1;

    (bytes_received as f64) / (time_elapsed as f64 / 1000.0)
}

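// This test is #[ignore]d so it doesn't run with the regular suite; run it explicitly
// (with the --ignored and --nocapture test flags) to see the printed throughput.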
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn bench_spam_to_one() {
    let mut results = Vec::with_capacity(5);
    for sender_count in &[1, 10, 20, 50, 100] {
        let throughput = run_bench_scenario(*sender_count).await;
        println!(
            "throughput with {:>3} sender(s), 1 receiver: {}/s",
            sender_count,
            common::display_bytes(throughput)
        );
        results.push(throughput);
    }

    let avg_throughput = results.iter().sum::<f64>() / results.len() as f64;
    println!("\naverage: {}/s", common::display_bytes(avg_throughput));
}