Skip to main content

btdt_server_lib/
asyncio.rs

1//! Helpers for dealing with asynchronous I/O that should be available for benchmarks.
2
3use bytes::BytesMut;
4use futures_core::Stream;
5use std::io;
6use std::io::Read;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use tokio::sync::mpsc;
10use tokio::sync::mpsc::Receiver;
11use tokio::task::spawn_blocking;
12
13/// An adapter to convert a blocking `Read` into an async `Stream` of `Bytes`.
14pub struct StreamAdapter {
15    rx: Receiver<io::Result<bytes::Bytes>>,
16}
17
18impl StreamAdapter {
19    /// Creates a new `StreamAdapter` from a blocking `Read` implementor.
20    pub fn new<R: Read + Send + 'static>(mut reader: R, size_hint: Option<u64>) -> Self {
21        let (tx, rx) = mpsc::channel(10);
22        spawn_blocking(move || {
23            const MAX_BUF_SIZE: usize = 512 * 1024;
24            const REALLOCATION_THRESHOLD: usize = 1024;
25            let buf_size = usize::min(
26                size_hint
27                    .map(|hint| hint as usize + REALLOCATION_THRESHOLD)
28                    .unwrap_or(MAX_BUF_SIZE),
29                MAX_BUF_SIZE,
30            );
31            let mut buf = BytesMut::zeroed(buf_size);
32            loop {
33                match reader.read(&mut buf) {
34                    Ok(0) => break, // EOF
35                    Ok(n) => {
36                        if tx.blocking_send(Ok(buf.split_to(n).freeze())).is_err() {
37                            break; // Channel closed
38                        }
39                    }
40                    Err(e) => {
41                        if tx.blocking_send(Err(e)).is_err() {
42                            break; // Channel closed
43                        }
44                    }
45                }
46
47                if buf.capacity() < REALLOCATION_THRESHOLD {
48                    buf = BytesMut::zeroed(MAX_BUF_SIZE);
49                }
50            }
51        });
52        Self { rx }
53    }
54}
55
56impl Stream for StreamAdapter {
57    type Item = io::Result<bytes::Bytes>;
58
59    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60        self.rx.poll_recv(cx)
61    }
62}