abs_buff 0.1.0

ABStraction of BUFFered IO
Documentation
use core::{
    borrow::BorrowMut,
    future::{Future, IntoFuture},
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
};

use pin_project::pin_project;
use pin_utils::pin_mut;

use abs_sync::{cancellation::*, x_deps::pin_utils};

use crate::{
    utils::{IoAbortReport, TrChunkWriter},
    TrBuffWriter,
};

pub struct ChunkWriter<B, W, T>
where
    B: BorrowMut<W>,
    W: TrBuffWriter<T>,
    T: Clone,
{
    _use_w_: PhantomData<W>,
    _use_t_: PhantomData<[T]>,
    writer_: B,
}

impl<B, W, T> ChunkWriter<B, W, T>
where
    B: BorrowMut<W>,
    W: TrBuffWriter<T>,
    T: Clone,
{
    pub const fn new(writer: B) -> Self {
        ChunkWriter {
            _use_w_: PhantomData,
            _use_t_: PhantomData,
            writer_: writer,
        }
    }

    pub fn can_write(&mut self) -> bool {
        self.writer_.borrow_mut().can_write()
    }

    pub fn write_async<'a>(
        &'a mut self,
        source: &'a [T],
    ) -> WriteChunkAsync<'a, B, W, T> {
        WriteChunkAsync::new(self, source)
    }
}

impl<B, W, T> TrChunkWriter<T> for ChunkWriter<B, W, T>
where
    B: BorrowMut<W>,
    W: TrBuffWriter<T>,
    T: Clone,
{
    type WriterError = IoAbortReport<<W as TrBuffWriter<T>>::Error>;
    type WriteAsync<'a> = WriteChunkAsync<'a, B, W, T> where Self: 'a;

    #[inline(always)]
    fn can_write(&mut self) -> bool {
        ChunkWriter::can_write(self)
    }

    #[inline(always)]
    fn write_async<'a>(&'a mut self, source: &'a [T]) -> Self::WriteAsync<'a> {
        ChunkWriter::write_async(self, source)
    }
}

pub struct WriteChunkAsync<'a, B, W, T>
where
    B: BorrowMut<W>,
    W: TrBuffWriter<T>,
    T: Clone,
{
    writer_: &'a mut ChunkWriter<B, W, T>,
    source_: &'a [T],
}

impl<'a, B, W, T> WriteChunkAsync<'a, B, W, T>
where
    B: BorrowMut<W>,
    W: TrBuffWriter<T>,
    T: Clone,
{
    pub fn new(
        writer: &'a mut ChunkWriter<B, W, T>,
        source: &'a [T],
    ) -> Self {
        WriteChunkAsync {
            writer_: writer,
            source_: source,
        }
    }

    pub fn may_cancel_with<C>(
        self,
        cancel: Pin<&'a mut C>,
    ) -> WriterChunkFuture<'a, C, B, W, T>
    where
        C: TrCancellationToken,
    {
        WriterChunkFuture::new(self.writer_, self.source_, cancel)
    }
}

impl<'a, B, W, T> IntoFuture for WriteChunkAsync<'a, B, W, T>
where
    B: BorrowMut<W>,
    W: TrBuffWriter<T>,
    T: Clone,
{
    type IntoFuture = WriterChunkFuture<'a, NonCancellableToken, B, W, T>;
    type Output = <Self::IntoFuture as Future>::Output;

    fn into_future(self) -> Self::IntoFuture {
        let cancel = NonCancellableToken::pinned();
        WriterChunkFuture::new(self.writer_, self.source_, cancel)
    }
}

impl<'a, B, W, T> TrIntoFutureMayCancel<'a> for WriteChunkAsync<'a, B, W, T>
where
    B: BorrowMut<W>,
    W: TrBuffWriter<T>,
    T: Clone,
{
    type MayCancelOutput = <<Self as IntoFuture>::IntoFuture as Future>::Output;

    #[inline(always)]
    fn may_cancel_with<C>(
        self,
        cancel: Pin<&'a mut C>,
    ) -> impl Future<Output = Self::MayCancelOutput>
    where
        C: TrCancellationToken,
    {
        WriteChunkAsync::may_cancel_with(self, cancel)
    }
}

#[pin_project]
pub struct WriterChunkFuture<'a, C, Bw, Tw, T>
where
    C: TrCancellationToken,
    Bw: BorrowMut<Tw>,
    Tw: TrBuffWriter<T>,
    T: Clone,
{
    #[pin]writer_: &'a mut ChunkWriter<Bw, Tw, T>,
    source_: &'a [T],
    cancel_: Pin<&'a mut C>,
}

impl<'a, C, Bw, Tw, T> WriterChunkFuture<'a, C, Bw, Tw, T>
where
    C: TrCancellationToken,
    Bw: BorrowMut<Tw>,
    Tw: TrBuffWriter<T>,
    T: Clone,
{
    pub fn new(
        writer: &'a mut ChunkWriter<Bw, Tw, T>,
        source: &'a [T],
        cancel: Pin<&'a mut C>,
    ) -> Self {
        WriterChunkFuture {
            writer_: writer,
            source_: source,
            cancel_: cancel,
        }
    }

    async fn write_chunk_async_(
        self: Pin<&mut Self>,
    ) -> Result<usize, IoAbortReport<<Tw as TrBuffWriter<T>>::Error>> {
        let mut this = self.project();
        let mut writer = this.writer_.as_mut();
        let writer = writer.writer_.borrow_mut();
        let source = *this.source_;
        let source_len = source.len();
        let mut perform_len = 0usize;
        loop {
            if perform_len >= source_len {
                break Result::Ok(perform_len);
            }
            let req_len = source_len - perform_len;

            #[cfg(test)]
            log::trace!(
                "[WriterCopyFuture::copy_async_] \
                source_len({source_len}), perform_len({perform_len})"
            );

            let w = writer
                .write_async(req_len)
                .may_cancel_with(this.cancel_.as_mut())
                .await;
            let Result::Ok(mut dst) = w else {
                let Result::Err(last_error) = w else {
                    unreachable!("[WriterCopyFuture::copy_async_]")
                };
                let report = IoAbortReport::new(perform_len, last_error);
                break Result::Err(report);
            };
            let dst_len = dst.len();
            debug_assert!(dst_len + perform_len <= source_len);
            let src = &source[perform_len..perform_len + dst_len];
            dst.clone_from_slice(src);
            perform_len += dst_len;
            drop(dst);
        }
    }
}

impl<'a, C, Bw, Tw, T> Future for WriterChunkFuture<'a, C, Bw, Tw, T>
where
    C: TrCancellationToken,
    Bw: BorrowMut<Tw>,
    Tw: TrBuffWriter<T>,
    T: Clone,
{
    type Output = Result<usize, IoAbortReport<<Tw as TrBuffWriter<T>>::Error>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let f = self.write_chunk_async_();
        pin_mut!(f);
        f.poll(cx)
    }
}