// bollard 0.21.0
//
// An asynchronous Docker daemon API.
// Documentation
#![cfg(feature = "buildkit_providerless")]

use std::{
    cmp,
    pin::Pin,
    task::{Context, Poll},
};

use futures_core::TryStream;
use pin_project_lite::pin_project;
use tokio::io::AsyncRead;

pin_project! {
    /// Adapter that presents a fallible stream of byte chunks as a tokio
    /// [`AsyncRead`].
    ///
    /// The wrapped stream must yield items that can be viewed as `&[u8]` and
    /// must fail with [`std::io::Error`], so stream errors can be returned
    /// from `poll_read` unchanged.
    #[derive(Debug)]
    pub struct IntoAsyncRead<St>
    where
        St: TryStream<Error = std::io::Error>,
        St::Ok: AsRef<[u8]>,
    {
        // Source of chunks; structurally pinned so it can be polled in place.
        #[pin]
        stream: St,
        // Current read progress: a partially consumed chunk, waiting for the
        // next chunk, or finished.
        state: ReadState<St::Ok>,
    }
}

/// Read progress tracked by [`IntoAsyncRead`] between `poll_read` calls.
#[derive(Debug)]
enum ReadState<T: AsRef<[u8]>> {
    /// A chunk is buffered; the bytes before `chunk_start` have already been
    /// copied out to a reader.
    Ready { chunk: T, chunk_start: usize },
    /// No chunk is buffered; the next read has to poll the stream.
    PendingChunk,
    /// The stream finished or returned an error; it is not polled again and
    /// every further read completes without writing any bytes.
    Eof,
}

impl<St> IntoAsyncRead<St>
where
    St: TryStream<Error = std::io::Error>,
    St::Ok: AsRef<[u8]>,
{
    /// Wraps `stream` in a reader.
    ///
    /// The reader starts with no buffered chunk, so the stream is polled for
    /// the first time on the first `poll_read` call.
    pub(crate) fn new(stream: St) -> Self {
        let state = ReadState::PendingChunk;
        Self { stream, state }
    }
}

impl<St> AsyncRead for IntoAsyncRead<St>
where
    St: TryStream<Error = std::io::Error>,
    St::Ok: AsRef<[u8]>,
{
    /// Copies bytes from the current chunk into `buf`, fetching a new chunk
    /// from the stream when none is buffered.
    ///
    /// At most one chunk contributes to a single call. Empty chunks are
    /// skipped. Once the stream ends or yields an error, the stream is never
    /// polled again and all later calls complete without filling `buf`, which
    /// readers interpret as end-of-file.
    ///
    /// # Errors
    ///
    /// Returns the stream's `std::io::Error` unchanged the first time the
    /// stream produces one.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut this = self.project();

        loop {
            match this.state {
                // Finished: report success with nothing written.
                ReadState::Eof => return Poll::Ready(Ok(())),
                ReadState::Ready {
                    chunk,
                    chunk_start: offset,
                } => {
                    // Bytes of the buffered chunk that have not been handed out yet.
                    let unread = &chunk.as_ref()[*offset..];
                    let count = cmp::min(buf.remaining(), unread.len());

                    buf.put_slice(&unread[..count]);
                    *offset += count;

                    // Drop the chunk as soon as its last byte has been copied.
                    let drained = count == unread.len();
                    if drained {
                        *this.state = ReadState::PendingChunk;
                    }

                    return Poll::Ready(Ok(()));
                }
                ReadState::PendingChunk => {
                    let next = match this.stream.as_mut().try_poll_next(cx) {
                        Poll::Ready(item) => item,
                        Poll::Pending => return Poll::Pending,
                    };

                    match next {
                        // Nothing to copy; go around the loop and poll again.
                        Some(Ok(chunk)) if chunk.as_ref().is_empty() => {}
                        Some(Ok(chunk)) => {
                            *this.state = ReadState::Ready {
                                chunk,
                                chunk_start: 0,
                            };
                        }
                        Some(Err(err)) => {
                            // A failed stream is treated as finished from now on.
                            *this.state = ReadState::Eof;
                            return Poll::Ready(Err(err));
                        }
                        None => {
                            *this.state = ReadState::Eof;
                            return Poll::Ready(Ok(()));
                        }
                    }
                }
            }
        }
    }
}