tempest-rt 0.0.1

TempestDB Deterministic Async Runtime
Documentation
//! Async file sync (fsync) operation.

use std::{
    io,
    marker::PhantomData,
    mem::replace,
    pin::Pin,
    task::{Context, Poll},
};

use tempest_io::{Io, OpHandle};

use crate::context::{current_io, get_op_handle};

enum SyncFileState<I: Io> {
    NeedsSubmit { fd: I::Fd, handle: Option<OpHandle> },
    InFlight { handle: OpHandle },
    Done,
}

/// Future that fsyncs a file descriptor, ensuring data is flushed to storage.
#[must_use = "futures do nothing unless awaited"]
pub struct SyncFile<I: Io> {
    state: SyncFileState<I>,
    // NB: we use I as the return here, so that SyncFile stays Unpin
    _marker: PhantomData<fn() -> I>,
}

/// Issues an fsync on `fd`, resolving when the kernel confirms the flush.
pub fn sync_file<I: Io>(fd: I::Fd) -> SyncFile<I> {
    SyncFile {
        state: SyncFileState::NeedsSubmit { fd, handle: None },
        _marker: PhantomData,
    }
}

// NB: required because of Fd, which is Copy
impl<I: Io> Unpin for SyncFile<I> {}

impl<I: Io> Future for SyncFile<I> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        match replace(&mut this.state, SyncFileState::Done) {
            SyncFileState::NeedsSubmit { fd, handle } => {
                let handle = handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });

                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.fsync(fd, handle) {
                    Ok(()) => {
                        this.state = SyncFileState::InFlight { handle };
                        Poll::Pending
                    }
                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                        // io SQ is full, retry next tick
                        this.state = SyncFileState::NeedsSubmit {
                            fd,
                            handle: Some(handle),
                        };
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    Err(e) => Poll::Ready(Err(e)),
                }
            }
            SyncFileState::InFlight { handle } => {
                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.get_cqe(handle).transpose()? {
                    Some(_) => Poll::Ready(Ok(())),
                    None => Poll::Pending,
                }
            }
            SyncFileState::Done => panic!("polled after completion"),
        }
    }
}