tempest-rt 0.0.1

TempestDB Deterministic Async Runtime
Documentation
//! Async positioned write operation.

use std::{
    io,
    marker::PhantomData,
    mem::replace,
    pin::Pin,
    task::{Context, Poll},
};

use tempest_io::{Io, IoBuf, OpHandle, WriteHandle};

use crate::context::{current_io, get_op_handle};

/// Internal state machine backing the [`WriteAt`] future.
///
/// `poll` moves the state out by value on every call (leaving `Done` behind),
/// so any path that returns `Poll::Pending` has to store a live variant back.
enum WriteAtState<B: IoBuf> {
    /// The write has not been accepted by the I/O backend yet.
    NeedsSubmit {
        /// Buffer to write; still owned by the future at this point.
        buf: B,
        /// Handle obtained on an earlier submission attempt, if any. `None`
        /// until the first poll; kept so a retry reuses the same handle.
        op_handle: Option<OpHandle>,
    },
    /// The write was submitted and its completion has not been observed yet.
    InFlight {
        /// Handle returned by the backend's `write_at`; `complete()` on it
        /// yields the buffer back.
        write_handle: WriteHandle<B>,
        /// Handle used to look up the completion via `get_cqe`.
        op_handle: OpHandle,
    },
    /// The future has returned `Poll::Ready`; polling again panics.
    Done,
}

/// Future that writes to a file at a given offset, resolving to
/// `(io::Result<usize>, buf)`: the number of bytes written (or the error)
/// together with the buffer that was passed in.
///
/// May write fewer bytes than provided; use [`write_exact`] for a full write.
///
/// Created by [`write_at`].
///
/// [`write_exact`]: crate::fs::write_exact
#[must_use = "futures do nothing unless awaited"]
pub struct WriteAt<B: IoBuf, I: Io> {
    /// Current stage of the operation (not submitted / in flight / done).
    state: WriteAtState<B>,
    /// File descriptor the write targets.
    fd: I::Fd,
    /// Offset passed to the backend's `write_at`.
    offset: u64,
    /// Ties the future to the `Io` implementation `I` without storing a value
    /// of that type.
    _marker: PhantomData<fn() -> I>,
}

/// Creates a [`WriteAt`] future that writes `buf` to `fd` at `offset`.
///
/// This function only builds the future; the write is handed to the I/O
/// backend when the future is polled. The future resolves to
/// `(io::Result<usize>, buf)` — the byte count (or error) plus the buffer.
pub fn write_at<B: IoBuf, I: Io>(fd: I::Fd, buf: B, offset: u64) -> WriteAt<B, I> {
    // Start un-submitted, with no operation handle allocated yet.
    let state = WriteAtState::NeedsSubmit {
        buf,
        op_handle: None,
    };

    WriteAt {
        state,
        fd,
        offset,
        _marker: PhantomData,
    }
}

// `WriteAt` is declared `Unpin` for every `B` and `I`; `poll` depends on this
// to obtain `&mut Self` through `Pin::get_mut`.
impl<B: IoBuf, I: Io> Unpin for WriteAt<B, I> {}

impl<B: IoBuf, I: Io> Future for WriteAt<B, I> {
    /// The write result paired with the buffer. The buffer is handed back on
    /// every completion path, success or error.
    type Output = (io::Result<usize>, B);

    /// Drives the write: submits it to the I/O backend, then checks for its
    /// completion on subsequent polls.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Move the state out by value so `buf` / `write_handle` can be consumed.
        // `Done` is left behind as a placeholder, so every arm that returns
        // `Poll::Pending` MUST store a live state back before returning.
        match replace(&mut this.state, WriteAtState::Done) {
            WriteAtState::NeedsSubmit { buf, op_handle } => {
                // Reuse the handle from a previous (rejected) submission attempt,
                // otherwise obtain one from the task context.
                // NOTE(review): `get_op_handle` is unsafe; its contract lives in
                // `crate::context` — confirm `cx` always satisfies it here.
                let op_handle = op_handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });

                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.write_at(this.fd, buf, this.offset, op_handle) {
                    Ok(write_handle) => {
                        this.state = WriteAtState::InFlight {
                            write_handle,
                            op_handle,
                        };
                        Poll::Pending
                    }
                    Err((e, buf)) if e.kind() == io::ErrorKind::WouldBlock => {
                        // The backend could not accept the operation right now:
                        // keep the buffer and handle, and ask to be polled again
                        // so the submission is retried.
                        this.state = WriteAtState::NeedsSubmit {
                            buf,
                            op_handle: Some(op_handle),
                        };
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    // Any other submission error is final; return the buffer.
                    Err((e, buf)) => Poll::Ready((Err(e), buf)),
                }
            }
            WriteAtState::InFlight {
                write_handle,
                op_handle,
            } => {
                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.get_cqe(op_handle) {
                    Some(Ok(bytes_written)) => {
                        let buf = write_handle.complete();
                        Poll::Ready((Ok(bytes_written as usize), buf))
                    }
                    Some(Err(e)) => {
                        let buf = write_handle.complete();
                        Poll::Ready((Err(e), buf))
                    }
                    None => {
                        // Not completed yet. Put the in-flight state back:
                        // without this the state would stay `Done`, the
                        // `WriteHandle` would be dropped while the operation is
                        // still outstanding, and the next poll would panic.
                        // NOTE(review): no waker is registered here; presumably
                        // the runtime re-polls the task associated with
                        // `op_handle` on completion — confirm.
                        this.state = WriteAtState::InFlight {
                            write_handle,
                            op_handle,
                        };
                        Poll::Pending
                    }
                }
            }
            WriteAtState::Done => panic!("polled after completion"),
        }
    }
}