tempest-rt 0.0.1

TempestDB Deterministic Async Runtime
Documentation
//! Async file close operation.

use std::{
    io,
    marker::PhantomData,
    mem::replace,
    pin::Pin,
    task::{Context, Poll},
};

use tempest_io::{Io, OpHandle};

use crate::context::{current_io, get_op_handle};

enum CloseFileState<I: Io> {
    NeedsSubmit { fd: I::Fd, handle: Option<OpHandle> },
    InFlight { handle: OpHandle },
    Done,
}

/// Future that closes a file descriptor.
#[must_use = "futures do nothing unless awaited"]
pub struct CloseFile<I: Io> {
    state: CloseFileState<I>,
    // NB: we use I as the return here, so that CloseFile stays Unpin
    _marker: PhantomData<fn() -> I>,
}

/// Closes `fd`, releasing the underlying resource.
pub fn close_file<I: Io>(fd: I::Fd) -> CloseFile<I> {
    CloseFile {
        state: CloseFileState::NeedsSubmit { fd, handle: None },
        _marker: PhantomData,
    }
}

// NB: required because of Fd, which is Copy
impl<I: Io> Unpin for CloseFile<I> {}

impl<I: Io> Future for CloseFile<I> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        match replace(&mut this.state, CloseFileState::Done) {
            CloseFileState::NeedsSubmit { fd, handle } => {
                let handle = handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });

                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.close(fd, handle) {
                    Ok(()) => {
                        this.state = CloseFileState::InFlight { handle };
                        Poll::Pending
                    }
                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                        // io SQ is full, retry next tick
                        this.state = CloseFileState::NeedsSubmit {
                            fd,
                            handle: Some(handle),
                        };
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    Err(e) => Poll::Ready(Err(e)),
                }
            }
            CloseFileState::InFlight { handle } => {
                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.get_cqe(handle).transpose()? {
                    Some(_) => Poll::Ready(Ok(())),
                    None => {
                        this.state = CloseFileState::InFlight { handle };
                        Poll::Pending
                    }
                }
            }
            CloseFileState::Done => panic!("polled after completion"),
        }
    }
}