poll_buf_utils/
lib.rs

1/*!
2This is an utility library for resumable byte transfers between buffers
3supported by the [`bytes`] crate and byte-streams which support the
4[`futures_io`] [`AsyncRead`](futures_io::AsyncRead) and/or
5[`AsyncWrite`](futures_io::AsyncWrite) traits.
6
7This crate assumes the following behavoirs about `AsyncRead/AsyncWrite` implementations: If the `poll_*` method call results in:
8 * `Poll::Ready(Ok(n))` with `n != 0`, bytes were successfully transferred
9 * otherwise, we assume that the call failed and no bytes were transferred at all
10
11 **/
12
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16fn ret_reduce(
17    ret: Poll<std::io::Result<usize>>,
18    reached_limit: bool,
19) -> Poll<std::io::Result<bool>> {
20    ret.map(|x| x.map(|_| reached_limit))
21}
22
23#[derive(Debug)]
24pub struct PollResult {
25    /// how much bytes were successfully transferred until yielding
26    pub delta: usize,
27
28    /// yielded with the following result
29    /// the inner `bool` specifies if some pre-specified limit was reached
30    pub ret: Poll<std::io::Result<bool>>,
31}
32
33/// This function tries to read at most `delta_limit` bytes from `input` to `output`.
34pub fn poll_read<I, O>(
35    mut input: Pin<&mut I>,
36    output: &mut O,
37    cx: &mut Context<'_>,
38    delta_limit: usize,
39) -> PollResult
40where
41    I: futures_io::AsyncRead,
42    O: bytes::BufMut,
43{
44    let mut rdbuf = [0u8; 8192];
45    let start = output.remaining_mut();
46    loop {
47        let buflim = *[
48            rdbuf.len(),
49            output.remaining_mut(),
50            delta_limit - (start - output.remaining_mut()),
51        ]
52        .iter()
53        .min()
54        .unwrap();
55        match input.as_mut().poll_read(cx, &mut rdbuf[..buflim]) {
56            // if we managed to read something....
57            Poll::Ready(Ok(n)) if n != 0 => output.put_slice(&rdbuf[..n]),
58
59            // assumption: if we get here, the call to poll_read failed and
60            // didn't read anything
61            ret => {
62                return PollResult {
63                    delta: start - output.remaining_mut(),
64                    ret: ret_reduce(ret, buflim == 0),
65                }
66            }
67        }
68    }
69}
70
71pub fn poll_write<I, O>(input: &mut I, mut output: Pin<&mut O>, cx: &mut Context<'_>) -> PollResult
72where
73    I: bytes::Buf,
74    O: futures_io::AsyncWrite,
75{
76    let start = input.remaining();
77    loop {
78        match output.as_mut().poll_write(cx, input.bytes()) {
79            // if we managed to write something...
80            Poll::Ready(Ok(n)) if n != 0 => input.advance(n),
81
82            // assumption: if we get here, the call to poll_write failed and
83            // didn't write anything
84            ret => {
85                return PollResult {
86                    delta: start - input.remaining(),
87                    ret: ret_reduce(ret, !input.has_remaining()),
88                }
89            }
90        }
91    }
92}