tempest-rt 0.0.1

TempestDB Deterministic Async Runtime
Documentation
//! Async exact-length read operation.

use std::{
    io,
    mem::replace,
    pin::Pin,
    task::{Context, Poll},
};

use tempest_io::{Io, IoBufMut, Slice};

use super::{ReadAt, read_at};

enum ReadExactState<B: IoBufMut, I: Io> {
    Idle {
        buf: B,
        done: usize,
    },
    Reading {
        fut: ReadAt<Slice<B>, I>,
        done: usize,
    },
    Done,
}

/// Future that reads exactly `buf.bytes_total()` bytes from a file, retrying on partial reads.
#[must_use = "futures do nothing unless awaited"]
pub struct ReadExact<B: IoBufMut, I: Io> {
    state: ReadExactState<B, I>,
    fd: I::Fd,
    offset: u64,
    total: usize,
}

/// Reads exactly `buf.bytes_total()` bytes from `fd` at `offset`, retrying on partial reads.
/// Returns `(Ok(()), buf)` on success, or `(Err(e), buf)` on I/O error.
// TODO: we could optimize this by inlining the ReadAt and reusing the OpHandle...
// but that buys us only a really little advantage at most
pub fn read_exact<B: IoBufMut, I: Io>(fd: I::Fd, buf: B, offset: u64) -> ReadExact<B, I> {
    let total = buf.bytes_total();
    ReadExact {
        state: ReadExactState::Idle { buf, done: 0 },
        fd,
        offset,
        total,
    }
}

impl<B: IoBufMut, I: Io> Unpin for ReadExact<B, I> {}

impl<B: IoBufMut, I: Io> Future for ReadExact<B, I> {
    type Output = (io::Result<()>, B);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            match replace(&mut this.state, ReadExactState::Done) {
                ReadExactState::Idle { buf, done } => {
                    let fut = read_at::<_, I>(
                        this.fd,
                        buf.slice(done..this.total),
                        this.offset + done as u64,
                    );
                    this.state = ReadExactState::Reading { fut, done };
                    // fall through to poll the ReadAt immediately
                }
                ReadExactState::Reading { mut fut, done } => {
                    match Pin::new(&mut fut).poll(cx) {
                        Poll::Pending => {
                            this.state = ReadExactState::Reading { fut, done };
                            return Poll::Pending;
                        }
                        Poll::Ready((Ok(n), slice)) => {
                            let buf = slice.into_inner();
                            let new_done = done + n;
                            if new_done >= this.total {
                                return Poll::Ready((Ok(()), buf));
                            }
                            this.state = ReadExactState::Idle {
                                buf,
                                done: new_done,
                            };
                            // loop to submit the next partial read
                        }
                        Poll::Ready((Err(e), slice)) => {
                            return Poll::Ready((Err(e), slice.into_inner()));
                        }
                    }
                }
                ReadExactState::Done => panic!("ReadExact polled after completion"),
            }
        }
    }
}