poll_buf_utils/lib.rs
/*!
This is a utility library for resumable byte transfers between buffers
supported by the [`bytes`] crate and byte streams which implement the
[`futures_io`] [`AsyncRead`](futures_io::AsyncRead) and/or
[`AsyncWrite`](futures_io::AsyncWrite) traits.

This crate assumes the following behaviors of `AsyncRead`/`AsyncWrite`
implementations: if a `poll_*` method call results in
 * `Poll::Ready(Ok(n))` with `n != 0`, then bytes were successfully transferred;
 * anything else, then we assume that the call failed and no bytes were
   transferred at all.
*/
12
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
/// Swaps the byte count of a successful poll result for `reached_limit`.
///
/// `Poll::Pending` and `Poll::Ready(Err(_))` are passed through untouched;
/// only the payload of `Poll::Ready(Ok(_))` is replaced.
fn ret_reduce(
    ret: Poll<std::io::Result<usize>>,
    reached_limit: bool,
) -> Poll<std::io::Result<bool>> {
    match ret {
        Poll::Ready(Ok(_)) => Poll::Ready(Ok(reached_limit)),
        Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
        Poll::Pending => Poll::Pending,
    }
}
22
/// Outcome of a single [`poll_read`] or [`poll_write`] call.
#[derive(Debug)]
pub struct PollResult {
    /// Number of bytes that were successfully transferred before yielding.
    pub delta: usize,

    /// The result the call yielded with.
    ///
    /// The inner `bool` tells whether some pre-specified limit was reached.
    pub ret: Poll<std::io::Result<bool>>,
}
32
33/// This function tries to read at most `delta_limit` bytes from `input` to `output`.
34pub fn poll_read<I, O>(
35 mut input: Pin<&mut I>,
36 output: &mut O,
37 cx: &mut Context<'_>,
38 delta_limit: usize,
39) -> PollResult
40where
41 I: futures_io::AsyncRead,
42 O: bytes::BufMut,
43{
44 let mut rdbuf = [0u8; 8192];
45 let start = output.remaining_mut();
46 loop {
47 let buflim = *[
48 rdbuf.len(),
49 output.remaining_mut(),
50 delta_limit - (start - output.remaining_mut()),
51 ]
52 .iter()
53 .min()
54 .unwrap();
55 match input.as_mut().poll_read(cx, &mut rdbuf[..buflim]) {
56 // if we managed to read something....
57 Poll::Ready(Ok(n)) if n != 0 => output.put_slice(&rdbuf[..n]),
58
59 // assumption: if we get here, the call to poll_read failed and
60 // didn't read anything
61 ret => {
62 return PollResult {
63 delta: start - output.remaining_mut(),
64 ret: ret_reduce(ret, buflim == 0),
65 }
66 }
67 }
68 }
69}
70
71pub fn poll_write<I, O>(input: &mut I, mut output: Pin<&mut O>, cx: &mut Context<'_>) -> PollResult
72where
73 I: bytes::Buf,
74 O: futures_io::AsyncWrite,
75{
76 let start = input.remaining();
77 loop {
78 match output.as_mut().poll_write(cx, input.bytes()) {
79 // if we managed to write something...
80 Poll::Ready(Ok(n)) if n != 0 => input.advance(n),
81
82 // assumption: if we get here, the call to poll_write failed and
83 // didn't write anything
84 ret => {
85 return PollResult {
86 delta: start - input.remaining(),
87 ret: ret_reduce(ret, !input.has_remaining()),
88 }
89 }
90 }
91 }
92}