Skip to main content

tempest_rt/fs/
read_at.rs

1//! Async positioned read operation.
2
3use std::{
4    io,
5    marker::PhantomData,
6    mem::replace,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use tempest_io::{Io, IoBufMut, OpHandle, ReadHandle};
12
13use crate::context::{current_io, get_op_handle};
14
15enum ReadAtState<B: IoBufMut> {
16    NeedsSubmit {
17        buf: B,
18        op_handle: Option<OpHandle>,
19    },
20    InFlight {
21        read_handle: ReadHandle<B>,
22        op_handle: OpHandle,
23    },
24    Done,
25}
26
27/// Future that reads from a file at a given offset, resolving to `(bytes_read, buf)`.
28///
29/// May return fewer bytes than requested; use [`read_exact`] for a full read.
30///
31/// [`read_exact`]: crate::fs::read_exact
32#[must_use = "futures do nothing unless awaited"]
33pub struct ReadAt<B: IoBufMut, I: Io> {
34    state: ReadAtState<B>,
35    fd: I::Fd,
36    offset: u64,
37    _marker: PhantomData<fn() -> I>,
38}
39
40/// Reads from `fd` into `buf` at `offset`. Returns `(bytes_read, buf)`.
41pub fn read_at<B: IoBufMut, I: Io>(fd: I::Fd, buf: B, offset: u64) -> ReadAt<B, I> {
42    ReadAt {
43        state: ReadAtState::NeedsSubmit {
44            buf,
45            op_handle: None,
46        },
47        fd,
48        offset,
49        _marker: PhantomData,
50    }
51}
52
53impl<B: IoBufMut, I: Io> Unpin for ReadAt<B, I> {}
54
55impl<B: IoBufMut, I: Io> Future for ReadAt<B, I> {
56    type Output = (io::Result<usize>, B);
57
58    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59        let this = self.get_mut();
60
61        match replace(&mut this.state, ReadAtState::Done) {
62            ReadAtState::NeedsSubmit { buf, op_handle } => {
63                let op_handle = op_handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });
64
65                // SAFETY: we do not hold on to io outside of this function
66                let io = unsafe { current_io::<I>() };
67                match io.read_at(this.fd, buf, this.offset, op_handle) {
68                    Ok(read_handle) => {
69                        this.state = ReadAtState::InFlight {
70                            read_handle,
71                            op_handle,
72                        };
73                        Poll::Pending
74                    }
75                    Err((e, buf)) if e.kind() == io::ErrorKind::WouldBlock => {
76                        this.state = ReadAtState::NeedsSubmit {
77                            buf,
78                            op_handle: Some(op_handle),
79                        };
80                        cx.waker().wake_by_ref();
81                        Poll::Pending
82                    }
83                    Err((e, buf)) => Poll::Ready((Err(e), buf)),
84                }
85            }
86            ReadAtState::InFlight {
87                read_handle,
88                op_handle,
89            } => {
90                // SAFETY: we do not hold on to io outside of this function
91                let io = unsafe { current_io::<I>() };
92                match io.get_cqe(op_handle) {
93                    Some(Ok(bytes_written)) => {
94                        // SAFETY: We got `bytes_written` from a read SQE
95                        let buf = unsafe { read_handle.complete(bytes_written) };
96                        Poll::Ready((Ok(bytes_written as usize), buf))
97                    }
98                    Some(Err(e)) => {
99                        // SAFETY: Setting completed to 0 cannot expose uninitialized data
100                        let buf = unsafe { read_handle.complete(0) };
101                        Poll::Ready((Err(e), buf))
102                    }
103                    None => Poll::Pending,
104                }
105            }
106            ReadAtState::Done => panic!("polled after completion"),
107        }
108    }
109}