abs_buff 0.1.0

ABStraction of BUFFered IO
Documentation
use core::{
    borrow::BorrowMut,
    future::{Future, IntoFuture},
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
};

use pin_project::pin_project;
use pin_utils::pin_mut;

use abs_sync::{cancellation::*, x_deps::pin_utils};

use crate::{
    utils::{IoAbortReport, TrChunkFiller},
    TrBuffReader,
};

pub struct ReaderAsChunkFiller<B, R, T>
where
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    _use_r_: PhantomData<R>,
    _use_t_: PhantomData<[T]>,
    reader_: B,
}

impl<B, R, T> ReaderAsChunkFiller<B, R, T>
where
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    pub const fn new(reader: B) -> Self {
        ReaderAsChunkFiller {
            _use_r_: PhantomData,
            _use_t_: PhantomData,
            reader_: reader,
        }
    }

    pub fn can_fill(&mut self) -> bool {
        self.reader_.borrow_mut().can_read()
    }

    pub fn fill_async<'a>(
        &'a mut self,
        target: &'a mut [T],
    ) -> ReaderFillAsync<'a, B, R, T> {
        ReaderFillAsync::new(self, target)
    }
}

impl<B, R, T> TrChunkFiller<T> for ReaderAsChunkFiller<B, R, T>
where
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    type FillError = IoAbortReport<<R as TrBuffReader<T>>::Error>;
    type FillAsync<'a> = ReaderFillAsync<'a, B, R, T> where Self: 'a;

    #[inline(always)]
    fn can_fill(&mut self) -> bool {
        ReaderAsChunkFiller::can_fill(self)
    }

    #[inline(always)]
    fn fill_async<'a>(
        &'a mut self,
        target: &'a mut [T],
    ) -> Self::FillAsync<'a> {
        ReaderAsChunkFiller::fill_async(self, target)
    }
}

pub struct ReaderFillAsync<'a, B, R, T>
where
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    filler_: &'a mut ReaderAsChunkFiller<B, R, T>,
    target_: &'a mut [T],
}

impl<'a, B, R, T> ReaderFillAsync<'a, B, R, T>
where
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    pub fn new(
        filler: &'a mut ReaderAsChunkFiller<B, R, T>,
        target: &'a mut [T],
    ) -> Self {
        ReaderFillAsync {
            filler_: filler,
            target_: target,
        }
    }

    pub fn may_cancel_with<C>(
        self,
        cancel: Pin<&'a mut C>,
    ) -> ReaderFillFuture<'a, C, B, R, T>
    where
        C: TrCancellationToken,
    {
        ReaderFillFuture::new(self.filler_, self.target_, cancel)
    }
}

impl<'a, B, R, T> IntoFuture for ReaderFillAsync<'a, B, R, T>
where
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    type IntoFuture = ReaderFillFuture<'a, NonCancellableToken, B, R, T>;
    type Output = <Self::IntoFuture as Future>::Output;

    fn into_future(self) -> Self::IntoFuture {
        let cancel = NonCancellableToken::pinned();
        ReaderFillFuture::new(self.filler_, self.target_, cancel)
    }
}

impl<'a, B, R, T> TrIntoFutureMayCancel<'a> for ReaderFillAsync<'a, B, R, T>
where
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    type MayCancelOutput = <Self as IntoFuture>::Output;

    #[inline(always)]
    fn may_cancel_with<C>(
        self,
        cancel: Pin<&'a mut C>,
    ) -> impl Future<Output = Self::MayCancelOutput>
    where
        C: TrCancellationToken,
    {
        ReaderFillAsync::may_cancel_with(self, cancel)
    }
}

#[pin_project]
pub struct ReaderFillFuture<'a, C, B, R, T>
where
    C: TrCancellationToken,
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    #[pin]filler_: &'a mut ReaderAsChunkFiller<B, R, T>,
    #[pin]target_: &'a mut [T],
    cancel_: Pin<&'a mut C>,
}

impl<'a, C, B, R, T> ReaderFillFuture<'a, C, B, R, T>
where
    C: TrCancellationToken,
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    pub fn new(
        filler: &'a mut ReaderAsChunkFiller<B, R, T>,
        target: &'a mut [T],
        cancel: Pin<&'a mut C>,
    ) -> Self {
        ReaderFillFuture {
            filler_: filler,
            target_: target,
            cancel_: cancel,
        }
    }

    async fn fill_async_(
        self: Pin<&mut Self>,
    ) -> Result<usize, IoAbortReport<<R as TrBuffReader<T>>::Error>> {
        let mut this = self.project();
        let mut filler = this.filler_.as_mut();
        let reader = filler.reader_.borrow_mut();
        let mut target = this.target_.as_mut();
        let target_len = target.len();
        let mut perform_len = 0usize;
        loop {
            if perform_len >= target_len {
                break Result::Ok(perform_len);
            }
            let req_len = target_len - perform_len;

            #[cfg(test)]
            log::trace!(
                "[ReaderFillFuture::fill_async_] \
                target_len({target_len}), perform_len({perform_len})"
            );
            let r = reader
                .read_async(req_len)
                .may_cancel_with(this.cancel_.as_mut())
                .await;
            let Result::Ok(src) = r else {
                let Result::Err(last_error) = r else {
                    unreachable!("[ReaderFillFuture::fill_async_]")
                };
                let report = IoAbortReport::new(perform_len, last_error);
                break Result::Err(report);
            };
            let src_len = src.len();

            #[cfg(test)]
            log::trace!(
                "[ReaderFillFuture::fill_async_] read_async src_len({src_len})"
            );
            debug_assert!(src_len <= req_len);
            debug_assert!(src_len + perform_len <= target_len);
            let dst = &mut target[perform_len..perform_len + src_len];
            dst.clone_from_slice(&src);
            perform_len += src_len;
            drop(src);
        }
    }
}

impl<'a, C, B, R, T> Future for ReaderFillFuture<'a, C, B, R, T>
where
    C: TrCancellationToken,
    B: BorrowMut<R>,
    R: TrBuffReader<T>,
    T: Clone,
{
    type Output = Result<
        usize,
        IoAbortReport<<R as TrBuffReader<T>>::Error>,
    >;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let f = self.fill_async_();
        pin_mut!(f);
        f.poll(cx)
    }
}