bitar 0.14.0

bita archive utilities
Documentation
use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::{ready, stream::Stream, StreamExt};
use reqwest::RequestBuilder;
use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;

use crate::archive_reader::HttpReaderError;

pub(crate) struct HttpRangeRequest {
    request: RequestBuilder,
    state: RequestState,
    size: u64,
    offset: u64,
    retry_delay: Duration,
    retry_count: u32,
}

impl HttpRangeRequest {
    pub fn new(request: RequestBuilder, offset: u64, size: u64) -> Self {
        Self {
            request,
            offset,
            size,
            retry_delay: Duration::from_secs(0),
            retry_count: 0,
            state: RequestState::Init,
        }
    }

    pub fn retry(mut self, retry_count: u32, retry_delay: Duration) -> Self {
        self.retry_delay = retry_delay;
        self.retry_count = retry_count;
        self
    }

    async fn single_fail(
        request: RequestBuilder,
        offset: u64,
        size: u64,
    ) -> Result<Bytes, HttpReaderError> {
        let end_offset = offset + size - 1;
        let request = request.header(
            reqwest::header::RANGE,
            format!("bytes={}-{}", offset, end_offset),
        );
        let response = request.send().await?;
        Ok(response.bytes().await?)
    }

    pub async fn single(mut self) -> Result<Bytes, HttpReaderError> {
        loop {
            match Self::single_fail(
                self.request
                    .try_clone()
                    .ok_or(HttpReaderError::RequestNotClonable)?,
                self.offset,
                self.size,
            )
            .await
            {
                Ok(item) => return Ok(item),
                Err(err) => {
                    if self.retry_count == 0 {
                        return Err(err);
                    } else {
                        log::warn!("request failed (retrying soon): {}", err);
                        self.retry_count -= 1;
                    }
                }
            }
            sleep(self.retry_delay).await;
        }
    }

    fn poll_read_fail(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, HttpReaderError>>> {
        loop {
            match &mut self.state {
                RequestState::Init => {
                    let end_offset = self.offset + self.size - 1;
                    let request = match self.request.try_clone() {
                        Some(request) => request,
                        None => return Poll::Ready(Some(Err(HttpReaderError::RequestNotClonable))),
                    };
                    let request = request
                        .header(
                            reqwest::header::RANGE,
                            format!("bytes={}-{}", self.offset, end_offset),
                        )
                        .send();
                    self.state = RequestState::Request(Box::new(request));
                }
                RequestState::Request(request) => match ready!(Pin::new(&mut *request).poll(cx)) {
                    Ok(response) => {
                        self.state = RequestState::Stream(Box::new(response.bytes_stream()))
                    }
                    Err(err) => return Poll::Ready(Some(Err(HttpReaderError::from(err)))),
                },
                RequestState::Stream(stream) => match ready!(stream.poll_next_unpin(cx)) {
                    Some(Ok(item)) => {
                        self.offset += item.len() as u64;
                        self.size -= item.len() as u64;
                        return Poll::Ready(Some(Ok(item)));
                    }
                    Some(Err(err)) => return Poll::Ready(Some(Err(HttpReaderError::from(err)))),
                    None => return Poll::Ready(None),
                },
                RequestState::Delay(sleep) => {
                    ready!(Pin::new(sleep).poll(cx));
                    self.state = RequestState::Init;
                }
            }
        }
    }

    fn poll_read(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, HttpReaderError>>> {
        loop {
            match self.poll_read_fail(cx) {
                Poll::Ready(Some(Err(err))) => {
                    if self.retry_count == 0 {
                        return Poll::Ready(Some(Err(err)));
                    } else {
                        log::warn!("request failed (retrying soon): {}", err);
                        self.retry_count -= 1;
                        self.state = RequestState::Delay(Box::pin(sleep(self.retry_delay)));
                    }
                }
                result => return result,
            }
        }
    }
}

enum RequestState {
    Init,
    Request(Box<dyn Future<Output = Result<reqwest::Response, reqwest::Error>> + Send + Unpin>),
    Stream(Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send + Unpin>),
    Delay(Pin<Box<tokio::time::Sleep>>),
}

impl Stream for HttpRangeRequest {
    type Item = Result<Bytes, HttpReaderError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.poll_read(cx)
    }
}