bytes-handoff 1.0.0

Incremental async byte ingestion and bounded owned write handoff.
Documentation
use bytes::Bytes;
use bytes_handoff::{WriteHandoff, WriteHandoffConfig};
use criterion::{BenchmarkId, Criterion, Throughput, black_box, criterion_group, criterion_main};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::{Builder, Runtime};

fn runtime() -> Runtime {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build benchmark runtime")
}

async fn drain_expected<R>(mut reader: R, expected: usize) -> usize
where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut total = 0;
    let mut buf = vec![0_u8; 64 * 1024];
    while total < expected {
        let read = reader.read(&mut buf).await.expect("read benchmark sink");
        if read == 0 {
            break;
        }
        total += read;
    }
    total
}

fn write_direct_chunks(rt: &Runtime, chunk_size: usize, chunks: usize) -> usize {
    rt.block_on(async {
        let total_bytes = chunk_size * chunks;
        let (mut writer, reader) = tokio::io::duplex(4 * 1024 * 1024);
        let drain = tokio::spawn(drain_expected(reader, total_bytes));
        let chunk = vec![7_u8; chunk_size];

        for _ in 0..chunks {
            writer
                .write_all(&chunk)
                .await
                .expect("write benchmark chunk");
        }

        let drained = drain.await.expect("drain task joins");
        black_box(drained)
    })
}

#[derive(Clone, Copy)]
enum CompletionMode {
    Ticket,
    FireAndForget,
}

fn write_handoff_chunks(
    rt: &Runtime,
    chunk_size: usize,
    chunks: usize,
    tasks: usize,
    completion: CompletionMode,
) -> usize {
    rt.block_on(async {
        let total_bytes = chunk_size * chunks;
        let (writer, reader) = tokio::io::duplex(4 * 1024 * 1024);
        let handoff = WriteHandoff::spawn(
            writer,
            WriteHandoffConfig::new(chunks + tasks, total_bytes + chunk_size),
        );
        let drain = tokio::spawn(drain_expected(reader, total_bytes));
        let chunk = Bytes::from(vec![7_u8; chunk_size]);

        let mut handles = Vec::with_capacity(tasks);
        for task_id in 0..tasks {
            let handoff = handoff.clone();
            let chunk = chunk.clone();
            let per_task = chunks / tasks;
            let extra = usize::from(task_id < chunks % tasks);
            handles.push(tokio::spawn(async move {
                match completion {
                    CompletionMode::Ticket => {
                        let mut tickets = Vec::with_capacity(per_task + extra);
                        for _ in 0..(per_task + extra) {
                            tickets.push(handoff.write(chunk.clone()).await.expect("submit write"));
                        }
                        for ticket in tickets {
                            ticket.wait().await.expect("write completes");
                        }
                    }
                    CompletionMode::FireAndForget => {
                        for _ in 0..(per_task + extra) {
                            handoff
                                .try_write_fire_and_forget(chunk.clone())
                                .expect("submit write");
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.await.expect("producer task joins");
        }

        let drained = drain.await.expect("drain task joins");
        black_box(drained)
    })
}

fn write_handoff_benches(c: &mut Criterion) {
    let rt = runtime();

    let mut large = c.benchmark_group("write_large_chunks");
    for chunk_size in [64 * 1024, 1024 * 1024] {
        let chunks = 32;
        large.throughput(Throughput::Bytes((chunk_size * chunks) as u64));
        large.bench_with_input(
            BenchmarkId::new("direct_write_all", chunk_size),
            &chunk_size,
            |b, chunk_size| {
                b.iter(|| write_direct_chunks(&rt, *chunk_size, chunks));
            },
        );
        large.bench_with_input(
            BenchmarkId::new("handoff_ticket_single_task", chunk_size),
            &chunk_size,
            |b, chunk_size| {
                b.iter(|| {
                    write_handoff_chunks(&rt, *chunk_size, chunks, 1, CompletionMode::Ticket)
                });
            },
        );
        large.bench_with_input(
            BenchmarkId::new("handoff_fire_and_forget_single_task", chunk_size),
            &chunk_size,
            |b, chunk_size| {
                b.iter(|| {
                    write_handoff_chunks(&rt, *chunk_size, chunks, 1, CompletionMode::FireAndForget)
                });
            },
        );
    }
    large.finish();

    let mut producers = c.benchmark_group("write_many_tasks");
    let chunk_size = 32 * 1024;
    let chunks = 256;
    producers.throughput(Throughput::Bytes((chunk_size * chunks) as u64));
    for task_count in [1, 4, 16, 64] {
        producers.bench_with_input(
            BenchmarkId::new("ticket", task_count),
            &task_count,
            |b, task_count| {
                b.iter(|| {
                    write_handoff_chunks(
                        &rt,
                        chunk_size,
                        chunks,
                        *task_count,
                        CompletionMode::Ticket,
                    )
                });
            },
        );
        producers.bench_with_input(
            BenchmarkId::new("fire_and_forget", task_count),
            &task_count,
            |b, task_count| {
                b.iter(|| {
                    write_handoff_chunks(
                        &rt,
                        chunk_size,
                        chunks,
                        *task_count,
                        CompletionMode::FireAndForget,
                    )
                });
            },
        );
    }
    producers.finish();
}

fn backpressure_bench(c: &mut Criterion) {
    let rt = runtime();
    c.bench_function("write_byte_budget_backpressure", |b| {
        b.iter(|| {
            rt.block_on(async {
                let (writer, _reader) = tokio::io::duplex(64);
                let handoff = WriteHandoff::spawn(writer, WriteHandoffConfig::new(1, 63));
                let rejected = handoff
                    .try_write(Bytes::from(vec![1_u8; 64]))
                    .expect_err("chunk exceeds byte budget");
                black_box(rejected.into_bytes().len())
            })
        });
    });
}

criterion_group!(benches, write_handoff_benches, backpressure_bench);
criterion_main!(benches);