use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_core::Stream;
use futures_util::StreamExt;
use crate::error::TosError;
use crate::reader::{BuildBufferReader, InternalReader};
pub(crate) async fn read_at_most(reader: &mut ( dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin), buf: &mut Vec<u8>, most: usize) -> Result<usize, crate::error::CommonError> {
if most == 0 {
return Ok(0);
}
let mut read_total = 0usize;
loop {
match reader.next().await {
None => return Ok(read_total),
Some(result) => {
let x = result?;
let mut read_once = x.len();
if read_total + read_once > most {
read_once = most - read_total;
}
buf.extend_from_slice(x.slice(0..read_once).as_ref());
read_total += read_once;
if read_total >= most {
return Ok(read_total);
}
}
}
}
}
impl<B> Stream for InternalReader<B> where B: Stream<Item=reqwest::Result<Bytes>> + Unpin {
type Item = Result<Bytes, crate::error::CommonError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.b.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(opt) => {
match opt {
None => {
if let Some(total_size) = self.total_size {
if self.read_size < total_size {
return Poll::Ready(Some(Err(Error::new(ErrorKind::Other, format!("premature end, expected {}, actual {}", total_size, self.read_size)))));
}
}
Poll::Ready(None)
}
Some(result) => {
match result {
Err(e) => Poll::Ready(Some(Err(Error::new(ErrorKind::Other, e.to_string())))),
Ok(x) => {
self.read_size += x.len();
Poll::Ready(Some(Ok(x)))
}
}
}
}
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.b.size_hint()
}
}
#[derive(Debug, Clone, PartialEq, Default)]
pub(crate) struct StreamVec(Option<Vec<u8>>);
impl Stream for StreamVec {
type Item = reqwest::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.0.is_none() {
return Poll::Ready(None);
}
Poll::Ready(Some(Ok(Bytes::from(self.0.take().unwrap()))))
}
fn size_hint(&self) -> (usize, Option<usize>) {
match &self.0 {
None => (0, None),
Some(v) => (0, Some(v.len()))
}
}
}
impl BuildBufferReader for InternalReader<StreamVec> {
fn new(input: Vec<u8>) -> Result<(Self, usize), TosError> {
let len = input.len();
Ok(
(Self {
b: StreamVec(Some(input)),
total_size: Some(len),
read_size: 0,
}, len)
)
}
}