btdt_server_lib/
asyncio.rs

1use bytes::BytesMut;
2use futures_core::Stream;
3use std::io;
4use std::io::Read;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::sync::mpsc;
8use tokio::sync::mpsc::Receiver;
9use tokio::task::spawn_blocking;
10
11pub struct StreamAdapter {
12    rx: Receiver<io::Result<bytes::Bytes>>,
13}
14
15impl StreamAdapter {
16    pub fn new<R: Read + Send + 'static>(mut reader: R, size_hint: Option<u64>) -> Self {
17        let (tx, rx) = mpsc::channel(10);
18        spawn_blocking(move || {
19            const MAX_BUF_SIZE: usize = 512 * 1024;
20            const REALLOCATION_THRESHOLD: usize = 1024;
21            let buf_size = usize::min(
22                size_hint
23                    .map(|hint| hint as usize + REALLOCATION_THRESHOLD)
24                    .unwrap_or(MAX_BUF_SIZE),
25                MAX_BUF_SIZE,
26            );
27            let mut buf = BytesMut::zeroed(buf_size);
28            loop {
29                match reader.read(&mut buf) {
30                    Ok(0) => break, // EOF
31                    Ok(n) => {
32                        if tx.blocking_send(Ok(buf.split_to(n).freeze())).is_err() {
33                            break; // Channel closed
34                        }
35                    }
36                    Err(e) => {
37                        if tx.blocking_send(Err(e)).is_err() {
38                            break; // Channel closed
39                        }
40                    }
41                }
42
43                if buf.capacity() < REALLOCATION_THRESHOLD {
44                    buf = BytesMut::zeroed(MAX_BUF_SIZE);
45                }
46            }
47        });
48        Self { rx }
49    }
50}
51
52impl Stream for StreamAdapter {
53    type Item = io::Result<bytes::Bytes>;
54
55    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56        self.rx.poll_recv(cx)
57    }
58}