ve-tos-rust-sdk 2.0.0

Volcengine official TOS Rust SDK
Documentation
/*
 * Copyright (2024) Volcengine
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_core::Stream;
use futures_util::StreamExt;

use crate::error::TosError;
use crate::reader::{BuildBufferReader, InternalReader};

pub(crate) async fn read_at_most(reader: &mut ( dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin), buf: &mut Vec<u8>, most: usize) -> Result<usize, crate::error::CommonError> {
    if most == 0 {
        return Ok(0);
    }
    let mut read_total = 0usize;
    loop {
        match reader.next().await {
            None => return Ok(read_total),
            Some(result) => {
                let x = result?;
                let mut read_once = x.len();
                if read_total + read_once > most {
                    read_once = most - read_total;
                }
                buf.extend_from_slice(x.slice(0..read_once).as_ref());
                read_total += read_once;
                if read_total >= most {
                    return Ok(read_total);
                }
            }
        }
    }
}

/// Stream adapter over the inner stream `b` that tracks how many bytes have
/// been yielded and reports a short read as an error.
impl<B> Stream for InternalReader<B>
where
    B: Stream<Item = reqwest::Result<Bytes>> + Unpin,
{
    type Item = Result<Bytes, crate::error::CommonError>;

    /// Polls the inner stream once.
    ///
    /// * A successful chunk is passed through after adding its length to
    ///   `read_size`.
    /// * An inner error is re-wrapped as an `ErrorKind::Other` error carrying
    ///   the inner error's string form.
    /// * When the inner stream ends and `total_size` is set but `read_size`
    ///   is still smaller, a "premature end" error is yielded instead of
    ///   `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let item = match self.b.poll_next_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(item) => item,
        };

        match item {
            Some(Ok(chunk)) => {
                self.read_size += chunk.len();
                Poll::Ready(Some(Ok(chunk)))
            }
            Some(Err(source)) => {
                Poll::Ready(Some(Err(Error::new(ErrorKind::Other, source.to_string()))))
            }
            None => match self.total_size {
                // The inner stream finished before delivering the announced size.
                Some(expected) if self.read_size < expected => {
                    Poll::Ready(Some(Err(Error::new(
                        ErrorKind::Other,
                        format!("premature end, expected {}, actual {}", expected, self.read_size),
                    ))))
                }
                _ => Poll::Ready(None),
            },
        }
    }

    /// Forwards the inner stream's size hint unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.b.size_hint()
    }
}

/// In-memory byte buffer exposed as a stream.
///
/// The wrapped `Option` is `Some(buffer)` until the stream is first polled;
/// polling takes the buffer out and yields it as a single `Bytes` item, after
/// which the field is `None` and the stream is finished.
#[derive(Debug, Clone, PartialEq, Default)]
pub(crate) struct StreamVec(Option<Vec<u8>>);

/// Yields the held buffer as one `Bytes` item, then ends.
impl Stream for StreamVec {
    type Item = reqwest::Result<Bytes>;

    /// Takes the buffer out of `self` and yields it as a single chunk.
    ///
    /// Always ready: the first poll returns `Some(Ok(bytes))` (even when the
    /// buffer is empty), every later poll returns `None`.
    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(self.0.take().map(|data| Ok(Bytes::from(data))))
    }

    /// Reports the number of *items* still to be yielded: exactly one while
    /// the buffer is present, zero once it has been taken.
    ///
    /// The hint counts stream items, not bytes. Reporting the buffer's byte
    /// length as the upper bound would give `Some(0)` for an empty buffer even
    /// though one (empty) chunk is still yielded, breaking the `size_hint`
    /// contract.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = usize::from(self.0.is_some());
        (remaining, Some(remaining))
    }
}

/// Builds an `InternalReader` over an in-memory buffer.
impl BuildBufferReader for InternalReader<StreamVec> {
    /// Wraps `input` in a `StreamVec`-backed reader.
    ///
    /// Returns the reader together with the buffer length. The same length is
    /// stored as the reader's `total_size`, and `read_size` starts at zero.
    /// This implementation never returns an error.
    fn new(input: Vec<u8>) -> Result<(Self, usize), TosError> {
        // Capture the length before the buffer is moved into the stream.
        let total = input.len();
        let reader = Self {
            b: StreamVec(Some(input)),
            total_size: Some(total),
            read_size: 0,
        };
        Ok((reader, total))
    }
}