use crate::common::DataTransferType;
use crate::error::TosError;
use crate::reader::{BuildBufferReader, InternalReader, MultifunctionalReader};
use bytes::Bytes;
use futures_core::Future;
use futures_core::{ready, Stream};
use futures_util::StreamExt;
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};
pub(crate) async fn read_at_most<R: Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin + ?Sized>(reader: &mut R, buf: &mut Vec<u8>, most: usize) -> Result<usize, crate::error::CommonError> {
if most == 0 {
return Ok(0);
}
let mut read_total = 0usize;
loop {
match reader.next().await {
None => return Ok(read_total),
Some(result) => {
let x = result?;
let mut read_once = x.len();
if read_total + read_once > most {
read_once = most - read_total;
}
buf.extend_from_slice(x.slice(0..read_once).as_ref());
read_total += read_once;
if read_total >= most {
return Ok(read_total);
}
}
}
}
}
impl<B> Stream for InternalReader<B>
where
B: Stream<Item=reqwest::Result<Bytes>> + Unpin,
{
type Item = Result<Bytes, crate::error::CommonError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.first_read {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferStarted, -1)).poll(cx));
self.first_read = false;
}
match self.b.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(opt) => {
match opt {
None => {
if let Some(total_size) = self.total_size {
if self.read_size < total_size {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferFailed, -1)).poll(cx));
return Poll::Ready(Some(Err(Error::new(ErrorKind::Other, format!("premature end, expected {}, actual {}", total_size, self.read_size)))));
}
}
if !self.succeed_send {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferSucceed, -1)).poll(cx));
self.succeed_send = true;
}
Poll::Ready(None)
}
Some(result) => {
match result {
Err(e) => {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferFailed, -1)).poll(cx));
Poll::Ready(Some(Err(Error::new(ErrorKind::Other, e.to_string()))))
}
Ok(x) => {
self.read_size += x.len();
if x.len() > 0 {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferRW, x.len() as i64)).poll(cx));
if let Some(total_size) = self.total_size {
if self.read_size == total_size {
if !self.succeed_send {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferSucceed, -1)).poll(cx));
self.succeed_send = true;
}
}
}
}
Poll::Ready(Some(Ok(x)))
}
}
}
}
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.b.size_hint()
}
}
#[derive(Debug, Clone, PartialEq, Default)]
pub(crate) struct StreamVec(Option<Vec<u8>>);
impl Stream for StreamVec {
type Item = reqwest::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.0.is_none() {
return Poll::Ready(None);
}
Poll::Ready(Some(Ok(Bytes::from(self.0.take().unwrap()))))
}
fn size_hint(&self) -> (usize, Option<usize>) {
match &self.0 {
None => (0, None),
Some(v) => (0, Some(v.len()))
}
}
}
impl BuildBufferReader for InternalReader<StreamVec> {
fn new(input: Vec<u8>) -> Result<(Self, usize), TosError> {
let len = input.len();
Ok(
(Self::sized(StreamVec(Some(input)), len), len)
)
}
}
impl<B> Stream for MultifunctionalReader<B>
where
B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin,
{
type Item = Result<Bytes, crate::error::CommonError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.first_read {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferStarted, -1)).poll(cx));
self.first_read = false;
}
match self.b.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(opt) => {
match opt {
None => {
if self.digest.is_some() {
self.set_crc64();
}
if !self.succeed_send {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferSucceed, -1)).poll(cx));
self.succeed_send = true;
}
Poll::Ready(None)
}
Some(result) => {
if result.is_err() {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferFailed, -1)).poll(cx));
return Poll::Ready(Some(result));
}
let b = result.as_ref().unwrap().as_ref();
self.read_size += b.len();
if b.len() > 0 {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferRW, b.len() as i64)).poll(cx));
if self.digest.is_some() {
self.digest.as_mut().unwrap().write(b);
}
if let Some(total_size) = self.total_size {
if self.read_size == total_size {
if self.digest.is_some() {
self.set_crc64();
}
if !self.succeed_send {
let _ = ready!(std::pin::pin!(self.async_send_data_transfer_status(DataTransferType::DataTransferSucceed, -1)).poll(cx));
self.succeed_send = true;
}
}
}
}
Poll::Ready(Some(result))
}
}
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.b.size_hint()
}
}