tempest-rt 0.0.1

TempestDB Deterministic Async Runtime
Documentation
//! Async positioned read operation.

use std::{
    io,
    marker::PhantomData,
    mem::replace,
    pin::Pin,
    task::{Context, Poll},
};

use tempest_io::{Io, IoBufMut, OpHandle, ReadHandle};

use crate::context::{current_io, get_op_handle};

/// Internal state machine behind [`ReadAt`].
///
/// `poll` takes the current state by value (swapping [`ReadAtState::Done`] in its place) so
/// that the buffer or read handle can be moved out of the future.
enum ReadAtState<B: IoBufMut> {
    /// The read has not been accepted by the I/O backend yet.
    NeedsSubmit {
        /// Destination buffer, still owned by the future.
        buf: B,
        /// `None` before the first submission attempt; `Some` after a submission attempt
        /// failed with `WouldBlock`, so the retry reuses the same handle.
        op_handle: Option<OpHandle>,
    },
    /// The read was submitted and its completion has not been observed yet.
    InFlight {
        /// Handle returned by `Io::read_at`; turned back into the buffer via `complete`.
        read_handle: ReadHandle<B>,
        /// Handle used to look up the completion entry with `get_cqe`.
        op_handle: OpHandle,
    },
    /// Placeholder that `poll` swaps in while it owns the previous state. Polling a future
    /// that is in this state panics.
    Done,
}

/// Future that reads from a file at a given offset.
///
/// Resolves to `(io::Result<usize>, B)`: the number of bytes read (or the I/O error) together
/// with the buffer, which is handed back on both the success and the error path.
///
/// May return fewer bytes than requested; use [`read_exact`] for a full read.
///
/// [`read_exact`]: crate::fs::read_exact
#[must_use = "futures do nothing unless awaited"]
pub struct ReadAt<B: IoBufMut, I: Io> {
    /// Current stage of the operation (not submitted / in flight / done).
    state: ReadAtState<B>,
    /// File descriptor passed to `Io::read_at` on submission.
    fd: I::Fd,
    /// Byte offset in the file at which the read starts.
    offset: u64,
    /// Ties the future to the backend type `I` without storing a value of that type.
    _marker: PhantomData<fn() -> I>,
}

/// Creates a [`ReadAt`] future that reads from `fd` into `buf`, starting at byte `offset`.
///
/// No I/O is issued by this call: the returned future starts in the not-yet-submitted state
/// without an operation handle, and the read is submitted when the future is first polled.
/// The future resolves to `(io::Result<usize>, B)`, i.e. the byte count (or error) plus the
/// buffer that was passed in.
pub fn read_at<B: IoBufMut, I: Io>(fd: I::Fd, buf: B, offset: u64) -> ReadAt<B, I> {
    // Fresh operation: the future owns the buffer and has no handle assigned yet.
    let initial = ReadAtState::NeedsSubmit {
        op_handle: None,
        buf,
    };

    ReadAt {
        _marker: PhantomData,
        offset,
        fd,
        state: initial,
    }
}

// `poll` calls `Pin::get_mut` and moves the state out with `mem::replace`, both of which need
// `Self: Unpin`. This impl provides that for every `B` and `I`, independent of whether the
// field types are themselves `Unpin`.
impl<B: IoBufMut, I: Io> Unpin for ReadAt<B, I> {}

impl<B: IoBufMut, I: Io> Future for ReadAt<B, I> {
    /// The I/O result (bytes read on success) paired with the buffer. The buffer is returned
    /// on every completion path, including errors.
    type Output = (io::Result<usize>, B);

    /// Drives the read through submission and completion.
    ///
    /// * Not yet submitted: obtains an operation handle (reusing the one kept from an earlier
    ///   `WouldBlock` attempt, if any) and submits the read. On `WouldBlock` the future keeps
    ///   the buffer and handle, wakes itself and returns `Pending` so submission is retried on
    ///   the next poll. Any other submission error completes the future with that error.
    /// * In flight: looks up the completion for the operation handle. If none is available
    ///   yet, the in-flight state is kept and `Pending` is returned, so the future may be
    ///   polled again any number of times before the completion arrives.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Take the state by value so the buffer / read handle can be moved out of `self`.
        // `Done` is only a placeholder: every arm that returns `Pending` must write the live
        // state back, otherwise the next poll would hit the `Done` arm and panic.
        match replace(&mut this.state, ReadAtState::Done) {
            ReadAtState::NeedsSubmit { buf, op_handle } => {
                // SAFETY: NOTE(review): the safety contract of `get_op_handle` is not visible
                // from this file — presumably `cx` must belong to a task driven by this
                // runtime; confirm against `crate::context`.
                let op_handle = op_handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });

                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.read_at(this.fd, buf, this.offset, op_handle) {
                    Ok(read_handle) => {
                        this.state = ReadAtState::InFlight {
                            read_handle,
                            op_handle,
                        };
                        Poll::Pending
                    }
                    Err((e, buf)) if e.kind() == io::ErrorKind::WouldBlock => {
                        // The backend could not accept the submission right now. Keep the
                        // buffer and the already-assigned handle, and self-wake so the
                        // submission is retried on the next poll.
                        this.state = ReadAtState::NeedsSubmit {
                            buf,
                            op_handle: Some(op_handle),
                        };
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    // Hard submission failure: hand the untouched buffer back with the error.
                    Err((e, buf)) => Poll::Ready((Err(e), buf)),
                }
            }
            ReadAtState::InFlight {
                read_handle,
                op_handle,
            } => {
                // SAFETY: we do not hold on to io outside of this function
                let io = unsafe { current_io::<I>() };
                match io.get_cqe(op_handle) {
                    Some(Ok(bytes_read)) => {
                        // SAFETY: We got `bytes_read` from a read SQE
                        let buf = unsafe { read_handle.complete(bytes_read) };
                        Poll::Ready((Ok(bytes_read as usize), buf))
                    }
                    Some(Err(e)) => {
                        // SAFETY: Setting completed to 0 cannot expose uninitialized data
                        let buf = unsafe { read_handle.complete(0) };
                        Poll::Ready((Err(e), buf))
                    }
                    None => {
                        // No completion yet. Restore the in-flight state so that a later
                        // poll can still pick up the result instead of panicking on `Done`,
                        // and so the read handle is not dropped while the operation is
                        // outstanding.
                        this.state = ReadAtState::InFlight {
                            read_handle,
                            op_handle,
                        };
                        Poll::Pending
                    }
                }
            }
            ReadAtState::Done => panic!("polled after completion"),
        }
    }
}