tonic 0.11.0

A gRPC over HTTP/2 implementation focused on high performance, interoperability, and flexibility.
Documentation
use bencher::{benchmark_group, benchmark_main, Bencher};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body::Body;
use std::{
    fmt::{Error, Formatter},
    pin::Pin,
    task::{Context, Poll},
};
use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};

macro_rules! bench {
    ($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => {
        fn $name(b: &mut Bencher) {
            let rt = tokio::runtime::Builder::new_multi_thread()
                .build()
                .expect("runtime");

            let payload = make_payload($message_size, $message_count);
            let body = MockBody::new(payload, $chunk_size);
            b.bytes = body.len() as u64;

            b.iter(|| {
                rt.block_on(async {
                    let decoder = MockDecoder::new($message_size);
                    let mut stream = Streaming::new_request(decoder, body.clone(), None, None);

                    let mut count = 0;
                    while let Some(msg) = stream.message().await.unwrap() {
                        assert_eq!($message_size, msg.len());
                        count += 1;
                    }

                    assert_eq!(count, $message_count);
                    assert!(stream.trailers().await.unwrap().is_none());
                })
            })
        }
    };
}

#[derive(Clone)]
struct MockBody {
    data: Bytes,
    chunk_size: usize,
}

impl MockBody {
    pub fn new(data: Bytes, chunk_size: usize) -> Self {
        MockBody { data, chunk_size }
    }

    pub fn len(&self) -> usize {
        self.data.len()
    }
}

impl Body for MockBody {
    type Data = Bytes;
    type Error = Status;

    fn poll_data(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        if self.data.has_remaining() {
            let split = std::cmp::min(self.chunk_size, self.data.remaining());
            Poll::Ready(Some(Ok(self.data.split_to(split))))
        } else {
            Poll::Ready(None)
        }
    }

    fn poll_trailers(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
        Poll::Ready(Ok(None))
    }
}

impl std::fmt::Debug for MockBody {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
        let sample = self.data.iter().take(10).collect::<Vec<_>>();
        write!(f, "{:?}...({})", sample, self.data.len())
    }
}

#[derive(Debug, Clone)]
struct MockDecoder {
    message_size: usize,
}

impl MockDecoder {
    fn new(message_size: usize) -> Self {
        MockDecoder { message_size }
    }
}

impl Decoder for MockDecoder {
    type Item = Vec<u8>;
    type Error = Status;

    fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
        let out = Vec::from(buf.chunk());
        buf.advance(self.message_size);
        Ok(Some(out))
    }
}

fn make_payload(message_length: usize, message_count: usize) -> Bytes {
    let mut buf = BytesMut::new();

    for _ in 0..message_count {
        let msg = vec![97u8; message_length];
        buf.reserve(msg.len() + 5);
        buf.put_u8(0);
        buf.put_u32(msg.len() as u32);
        buf.put(&msg[..]);
    }

    buf.freeze()
}

// change body chunk size only
bench!(chunk_size_100, 1_000, 100, 1);
bench!(chunk_size_500, 1_000, 500, 1);
bench!(chunk_size_1005, 1_000, 1_005, 1);

// change message size only
bench!(message_size_1k, 1_000, 1_005, 2);
bench!(message_size_5k, 5_000, 1_005, 2);
bench!(message_size_10k, 10_000, 1_005, 2);

// change message count only
bench!(message_count_1, 500, 505, 1);
bench!(message_count_10, 500, 505, 10);
bench!(message_count_20, 500, 505, 20);

benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005);

benchmark_group!(
    message_size,
    message_size_1k,
    message_size_5k,
    message_size_10k
);

benchmark_group!(
    message_count,
    message_count_1,
    message_count_10,
    message_count_20
);

benchmark_main!(chunk_size, message_size, message_count);