Skip to main content

tempest_rt/fs/
read_exact.rs

1//! Async exact-length read operation.
2
3use std::{
4    io,
5    mem::replace,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use tempest_io::{Io, IoBufMut, Slice};
11
12use super::{ReadAt, read_at};
13
14enum ReadExactState<B: IoBufMut, I: Io> {
15    Idle {
16        buf: B,
17        done: usize,
18    },
19    Reading {
20        fut: ReadAt<Slice<B>, I>,
21        done: usize,
22    },
23    Done,
24}
25
26/// Future that reads exactly `buf.bytes_total()` bytes from a file, retrying on partial reads.
27#[must_use = "futures do nothing unless awaited"]
28pub struct ReadExact<B: IoBufMut, I: Io> {
29    state: ReadExactState<B, I>,
30    fd: I::Fd,
31    offset: u64,
32    total: usize,
33}
34
35/// Reads exactly `buf.bytes_total()` bytes from `fd` at `offset`, retrying on partial reads.
36/// Returns `(Ok(()), buf)` on success, or `(Err(e), buf)` on I/O error.
37// TODO: we could optimize this by inlining the ReadAt and reusing the OpHandle...
38// but that buys us only a really little advantage at most
39pub fn read_exact<B: IoBufMut, I: Io>(fd: I::Fd, buf: B, offset: u64) -> ReadExact<B, I> {
40    let total = buf.bytes_total();
41    ReadExact {
42        state: ReadExactState::Idle { buf, done: 0 },
43        fd,
44        offset,
45        total,
46    }
47}
48
49impl<B: IoBufMut, I: Io> Unpin for ReadExact<B, I> {}
50
51impl<B: IoBufMut, I: Io> Future for ReadExact<B, I> {
52    type Output = (io::Result<()>, B);
53
54    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55        let this = self.get_mut();
56
57        loop {
58            match replace(&mut this.state, ReadExactState::Done) {
59                ReadExactState::Idle { buf, done } => {
60                    let fut = read_at::<_, I>(
61                        this.fd,
62                        buf.slice(done..this.total),
63                        this.offset + done as u64,
64                    );
65                    this.state = ReadExactState::Reading { fut, done };
66                    // fall through to poll the ReadAt immediately
67                }
68                ReadExactState::Reading { mut fut, done } => {
69                    match Pin::new(&mut fut).poll(cx) {
70                        Poll::Pending => {
71                            this.state = ReadExactState::Reading { fut, done };
72                            return Poll::Pending;
73                        }
74                        Poll::Ready((Ok(n), slice)) => {
75                            let buf = slice.into_inner();
76                            let new_done = done + n;
77                            if new_done >= this.total {
78                                return Poll::Ready((Ok(()), buf));
79                            }
80                            this.state = ReadExactState::Idle {
81                                buf,
82                                done: new_done,
83                            };
84                            // loop to submit the next partial read
85                        }
86                        Poll::Ready((Err(e), slice)) => {
87                            return Poll::Ready((Err(e), slice.into_inner()));
88                        }
89                    }
90                }
91                ReadExactState::Done => panic!("ReadExact polled after completion"),
92            }
93        }
94    }
95}