Skip to main content

tempest_rt/fs/
open_file.rs

1//! Async file open operation.
2
3use std::{
4    io,
5    marker::PhantomData,
6    mem::replace,
7    path::PathBuf,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12use tempest_io::{Io, OpHandle, OpenOptions};
13
14use crate::context::{current_io, get_op_handle};
15
16enum OpenFileState {
17    NeedsSubmit {
18        path: PathBuf,
19        opts: OpenOptions,
20        handle: Option<OpHandle>,
21    },
22    InFlight {
23        handle: OpHandle,
24    },
25    Done,
26}
27
28/// Future that opens a file and resolves to its file descriptor.
29#[must_use = "futures do nothing unless awaited"]
30pub struct OpenFile<I: Io> {
31    state: OpenFileState,
32    // NB: we use I as the return here, so that OpenFile stays Unpin
33    _marker: PhantomData<fn() -> I>,
34}
35
36/// Opens `path` with the given [`OpenOptions`], returning the file descriptor on success.
37pub fn open_file<I: Io>(path: PathBuf, opts: OpenOptions) -> OpenFile<I> {
38    OpenFile {
39        state: OpenFileState::NeedsSubmit {
40            path,
41            opts,
42            handle: None,
43        },
44        _marker: PhantomData,
45    }
46}
47
48impl<I: Io> Future for OpenFile<I> {
49    type Output = io::Result<I::Fd>;
50
51    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52        let this = self.get_mut();
53
54        match replace(&mut this.state, OpenFileState::Done) {
55            OpenFileState::NeedsSubmit { path, opts, handle } => {
56                let handle = handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });
57
58                // SAFETY: we do not hold on to io outside of this function
59                let io = unsafe { current_io::<I>() };
60                match io.open(&path, opts.clone(), handle) {
61                    Ok(()) => {
62                        this.state = OpenFileState::InFlight { handle };
63                        Poll::Pending
64                    }
65                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
66                        // io SQ is full, retry next tick
67                        this.state = OpenFileState::NeedsSubmit {
68                            path,
69                            opts,
70                            handle: Some(handle),
71                        };
72                        cx.waker().wake_by_ref();
73                        Poll::Pending
74                    }
75                    Err(e) => Poll::Ready(Err(e)),
76                }
77            }
78            OpenFileState::InFlight { handle } => {
79                // SAFETY: we do not hold on to io outside of this function
80                let io = unsafe { current_io::<I>() };
81                match io.get_cqe(handle).transpose()? {
82                    Some(raw_fd) => {
83                        // SAFETY: got this from an open CQE
84                        let fd = unsafe { I::into_fd(raw_fd) };
85                        Poll::Ready(Ok(fd))
86                    }
87                    None => {
88                        this.state = OpenFileState::InFlight { handle };
89                        Poll::Pending
90                    }
91                }
92            }
93            OpenFileState::Done => panic!("polled after completion"),
94        }
95    }
96}