// tempest-rt 0.0.1
//
// TempestDB Deterministic Async Runtime
// Documentation
//! Async file open operation.

use std::{
    io,
    marker::PhantomData,
    mem::replace,
    path::PathBuf,
    pin::Pin,
    task::{Context, Poll},
};

use tempest_io::{Io, OpHandle, OpenOptions};

use crate::context::{current_io, get_op_handle};

/// Progress of an [`OpenFile`] future, advanced by its `poll` implementation.
enum OpenFileState {
    /// The open request has not been accepted by the IO backend yet.
    NeedsSubmit {
        /// Path passed to `Io::open`.
        path: PathBuf,
        /// Options passed to `Io::open` (cloned per attempt so a retry can reuse them).
        opts: OpenOptions,
        /// `None` until the first poll obtains a handle via `get_op_handle`;
        /// `Some` after a submission hit `WouldBlock`, so the retry reuses the
        /// same handle instead of requesting a new one.
        handle: Option<OpHandle>,
    },
    /// The request was submitted; `poll` checks `get_cqe(handle)` for its result.
    InFlight {
        /// Handle the request was submitted with.
        handle: OpHandle,
    },
    /// Placeholder left behind while `poll` works on the taken state, and the
    /// final state once the future has resolved. Polling in this state panics.
    Done,
}

/// Future that opens a file and resolves to its file descriptor.
///
/// Created by [`open_file`]. The output is `io::Result<I::Fd>`: the descriptor
/// on success, or the error reported while submitting or completing the open.
///
/// # Panics
///
/// Polling the future again after it has resolved panics.
#[must_use = "futures do nothing unless awaited"]
pub struct OpenFile<I: Io> {
    // Current step of the open operation; see `OpenFileState`.
    state: OpenFileState,
    // NB: we use I as the return here, so that OpenFile stays Unpin
    // (`fn() -> I` is `Unpin` for every `I`, and `poll` relies on that to call
    // `Pin::get_mut`). No `I` value is stored; the parameter only selects which
    // `Io` implementation `poll` talks to.
    _marker: PhantomData<fn() -> I>,
}

/// Creates a future that opens `path` using the given [`OpenOptions`].
///
/// This function only builds the future; no request is submitted until the
/// returned [`OpenFile`] is polled. The future resolves to the opened file
/// descriptor on success, or to the `io::Error` that the open produced.
///
/// The type parameter `I` selects the [`Io`] implementation the future uses.
pub fn open_file<I: Io>(path: PathBuf, opts: OpenOptions) -> OpenFile<I> {
    // Start unsubmitted and without an op handle; the first poll obtains one.
    let initial = OpenFileState::NeedsSubmit {
        path,
        opts,
        handle: None,
    };

    OpenFile {
        state: initial,
        _marker: PhantomData,
    }
}

impl<I: Io> Future for OpenFile<I> {
    type Output = io::Result<I::Fd>;

    /// Advances the open operation by one step.
    ///
    /// * Not yet submitted: obtains an op handle if none is stored and calls
    ///   `Io::open`. On success the future moves to the in-flight state and
    ///   returns `Pending`. On `WouldBlock` it keeps the handle, wakes itself
    ///   and returns `Pending` so the submission is retried. Any other error
    ///   resolves the future with that error.
    /// * In flight: asks `Io::get_cqe` for the result. A completion resolves
    ///   the future with the descriptor (or the completion's error); no
    ///   completion yet returns `Pending`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let fut = self.get_mut();

        // Take the state by value, leaving `Done` behind. Every path that
        // returns `Pending` writes a live state back first; every path that
        // returns `Ready` intentionally leaves `Done` in place.
        let previous = replace(&mut fut.state, OpenFileState::Done);

        match previous {
            OpenFileState::Done => panic!("polled after completion"),

            OpenFileState::InFlight { handle } => {
                // SAFETY: we do not hold on to io outside of this function
                let backend = unsafe { current_io::<I>() };

                // `?` resolves the future with the completion's error, if any.
                let Some(raw_fd) = backend.get_cqe(handle).transpose()? else {
                    // No completion yet: stay in flight.
                    fut.state = OpenFileState::InFlight { handle };
                    return Poll::Pending;
                };

                // SAFETY: got this from an open CQE
                let fd = unsafe { I::into_fd(raw_fd) };
                Poll::Ready(Ok(fd))
            }

            OpenFileState::NeedsSubmit { path, opts, handle } => {
                // Reuse the handle kept from an earlier `WouldBlock` attempt,
                // otherwise obtain one for this poll's context.
                let handle = match handle {
                    Some(existing) => existing,
                    // NOTE(review): the safety contract of `get_op_handle` is not
                    // visible from this file — confirm it holds for this call.
                    None => unsafe { get_op_handle(cx) },
                };

                // SAFETY: we do not hold on to io outside of this function
                let backend = unsafe { current_io::<I>() };

                // `opts` is cloned so the original survives for a retry.
                let err = match backend.open(&path, opts.clone(), handle) {
                    Ok(()) => {
                        fut.state = OpenFileState::InFlight { handle };
                        return Poll::Pending;
                    }
                    Err(err) => err,
                };

                if err.kind() != io::ErrorKind::WouldBlock {
                    return Poll::Ready(Err(err));
                }

                // io SQ is full, retry next tick
                fut.state = OpenFileState::NeedsSubmit {
                    path,
                    opts,
                    handle: Some(handle),
                };
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}