tempest-rt 0.0.1

TempestDB Deterministic Async Runtime
Documentation
//! Async exact-length write operation.

use std::{
    io,
    mem::replace,
    pin::Pin,
    task::{Context, Poll},
};

use tempest_io::{Io, IoBuf, Slice};

use super::{WriteAt, write_at};

enum WriteExactState<B: IoBuf, I: Io> {
    Idle {
        buf: B,
        done: usize,
    },
    Writing {
        fut: WriteAt<Slice<B>, I>,
        done: usize,
    },
    Done,
}

/// Future that writes exactly `buf.bytes_init()` bytes to a file, retrying on partial writes.
#[must_use = "futures do nothing unless awaited"]
pub struct WriteExact<B: IoBuf, I: Io> {
    state: WriteExactState<B, I>,
    fd: I::Fd,
    offset: u64,
    total: usize,
}

/// Writes exactly `buf.bytes_init()` bytes from `buf` to `fd` at `offset`, retrying on partials.
/// Returns `(Ok(()), buf)` on success, or `(Err(e), buf)` on I/O error.
// TODO: we could optimize this by inlining the WriteAt and reusing the OpHandle...
// but that buys us only a really little advantage at most
pub fn write_exact<B: IoBuf, I: Io>(fd: I::Fd, buf: B, offset: u64) -> WriteExact<B, I> {
    let total = buf.bytes_init();
    WriteExact {
        state: WriteExactState::Idle { buf, done: 0 },
        fd,
        offset,
        total,
    }
}

impl<B: IoBuf, I: Io> Unpin for WriteExact<B, I> {}

impl<B: IoBuf, I: Io> Future for WriteExact<B, I> {
    type Output = (io::Result<()>, B);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            match replace(&mut this.state, WriteExactState::Done) {
                WriteExactState::Idle { buf, done } => {
                    let fut = write_at::<_, I>(
                        this.fd,
                        buf.slice(done..this.total),
                        this.offset + done as u64,
                    );
                    this.state = WriteExactState::Writing { fut, done };
                    // fall through to poll the WriteAt immediately
                }
                WriteExactState::Writing { mut fut, done } => {
                    match Pin::new(&mut fut).poll(cx) {
                        Poll::Pending => {
                            this.state = WriteExactState::Writing { fut, done };
                            return Poll::Pending;
                        }
                        Poll::Ready((Ok(n), slice)) => {
                            let buf = slice.into_inner();
                            let new_done = done + n;
                            if new_done >= this.total {
                                return Poll::Ready((Ok(()), buf));
                            }
                            this.state = WriteExactState::Idle {
                                buf,
                                done: new_done,
                            };
                            // loop to submit the next partial write
                        }
                        Poll::Ready((Err(e), slice)) => {
                            return Poll::Ready((Err(e), slice.into_inner()));
                        }
                    }
                }
                WriteExactState::Done => panic!("WriteExact polled after completion"),
            }
        }
    }
}