1use std::{
4 io,
5 marker::PhantomData,
6 mem::replace,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use tempest_io::{Io, IoBufMut, OpHandle, ReadHandle};
12
13use crate::context::{current_io, get_op_handle};
14
15enum ReadAtState<B: IoBufMut> {
16 NeedsSubmit {
17 buf: B,
18 op_handle: Option<OpHandle>,
19 },
20 InFlight {
21 read_handle: ReadHandle<B>,
22 op_handle: OpHandle,
23 },
24 Done,
25}
26
27#[must_use = "futures do nothing unless awaited"]
33pub struct ReadAt<B: IoBufMut, I: Io> {
34 state: ReadAtState<B>,
35 fd: I::Fd,
36 offset: u64,
37 _marker: PhantomData<fn() -> I>,
38}
39
40pub fn read_at<B: IoBufMut, I: Io>(fd: I::Fd, buf: B, offset: u64) -> ReadAt<B, I> {
42 ReadAt {
43 state: ReadAtState::NeedsSubmit {
44 buf,
45 op_handle: None,
46 },
47 fd,
48 offset,
49 _marker: PhantomData,
50 }
51}
52
53impl<B: IoBufMut, I: Io> Unpin for ReadAt<B, I> {}
54
55impl<B: IoBufMut, I: Io> Future for ReadAt<B, I> {
56 type Output = (io::Result<usize>, B);
57
58 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59 let this = self.get_mut();
60
61 match replace(&mut this.state, ReadAtState::Done) {
62 ReadAtState::NeedsSubmit { buf, op_handle } => {
63 let op_handle = op_handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });
64
65 let io = unsafe { current_io::<I>() };
67 match io.read_at(this.fd, buf, this.offset, op_handle) {
68 Ok(read_handle) => {
69 this.state = ReadAtState::InFlight {
70 read_handle,
71 op_handle,
72 };
73 Poll::Pending
74 }
75 Err((e, buf)) if e.kind() == io::ErrorKind::WouldBlock => {
76 this.state = ReadAtState::NeedsSubmit {
77 buf,
78 op_handle: Some(op_handle),
79 };
80 cx.waker().wake_by_ref();
81 Poll::Pending
82 }
83 Err((e, buf)) => Poll::Ready((Err(e), buf)),
84 }
85 }
86 ReadAtState::InFlight {
87 read_handle,
88 op_handle,
89 } => {
90 let io = unsafe { current_io::<I>() };
92 match io.get_cqe(op_handle) {
93 Some(Ok(bytes_written)) => {
94 let buf = unsafe { read_handle.complete(bytes_written) };
96 Poll::Ready((Ok(bytes_written as usize), buf))
97 }
98 Some(Err(e)) => {
99 let buf = unsafe { read_handle.complete(0) };
101 Poll::Ready((Err(e), buf))
102 }
103 None => Poll::Pending,
104 }
105 }
106 ReadAtState::Done => panic!("polled after completion"),
107 }
108 }
109}