ve-tos-rust-sdk 2.2.0

volcengine offical tos rust sdk
Documentation
/*
 * Copyright (2024) Volcengine
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
use crate::asynchronous::common::DataTransferListener;
use crate::asynchronous::http::HttpResponse;
use crate::asynchronous::internal::{AsyncInputTranslator, OutputParser};
use crate::asynchronous::tos::{AsyncRuntime, TosClientImpl};
use crate::common::{DataTransferStatus, Meta, RequestInfo};
use crate::constant::{HEADER_CONTENT_LENGTH, HEADER_CONTENT_RANGE, UUID_NODE};
use crate::credential::{Credentials, CredentialsProvider};
use crate::error::{GenericError, TosError};
use crate::http::HttpRequest;
use crate::internal::{get_header_value, InputDescriptor, InputTranslator, MockAsyncInputTranslator};
use crate::multipart::UploadPartFromFileInput;
use crate::object::{GetObjectToFileOutput, HeadObjectOutput, PutObjectFromFileInput};
use crate::reader::InternalReader;
use async_trait::async_trait;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::StreamExt;
use std::io::{Error, ErrorKind, SeekFrom};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs;
use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::ReaderStream;
use uuid::Uuid;

pub(crate) struct FileReader {
    pub(crate) b: ReaderStream<File>,
    pub(crate) total_size: Option<usize>,
    pub(crate) read_size: usize,
}

impl FileReader {
    async fn open(input: &str) -> Result<(File, Option<usize>), TosError> {
        match File::open(input).await {
            Ok(fd) => {
                if let Ok(x) = fd.metadata().await {
                    let len = x.len() as usize;
                    return Ok((fd, Some(len)));
                }
                Ok((fd, None))
            }
            Err(e) => Err(TosError::client_error_with_cause("open file error", GenericError::IoError(e.to_string()))),
        }
    }
}

impl Stream for FileReader
{
    type Item = Result<Bytes, crate::error::CommonError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.b.poll_next_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(opt) => {
                match opt {
                    None => {
                        if let Some(total_size) = self.total_size {
                            if self.read_size < total_size {
                                return Poll::Ready(Some(Err(Error::new(ErrorKind::Other, format!("premature end, expected {}, actual {}", total_size, self.read_size)))));
                            }
                        }
                        Poll::Ready(None)
                    }
                    Some(result) => {
                        match result {
                            Err(e) => Poll::Ready(Some(Err(Error::new(ErrorKind::Other, e.to_string())))),
                            Ok(x) => {
                                self.read_size += x.len();
                                Poll::Ready(Some(Ok(x)))
                            }
                        }
                    }
                }
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.b.size_hint()
    }
}

#[async_trait]
pub(crate) trait BuildFileStream: Sized {
    async fn new(input: &str) -> Result<(Self, Option<usize>), TosError>;
    async fn new_with_offset(input: &str, offset: i64) -> Result<(Self, Option<usize>), TosError>;
}
#[async_trait]
impl BuildFileStream for FileReader {
    async fn new(input: &str) -> Result<(Self, Option<usize>), TosError> {
        let (fd, len) = Self::open(input).await?;
        Ok((Self { b: ReaderStream::new(fd), total_size: len, read_size: 0 }, len))
    }

    async fn new_with_offset(input: &str, offset: i64) -> Result<(Self, Option<usize>), TosError> {
        let (mut fd, len) = Self::open(input).await?;
        if offset > 0 {
            if let Err(e) = fd.seek(SeekFrom::Start(offset as u64)).await {
                return Err(TosError::client_error_with_cause("seek file error", GenericError::IoError(e.to_string())));
            }
        }
        Ok((Self { b: ReaderStream::new(fd), total_size: len, read_size: 0 }, len))
    }
}

impl<P, C, S> TosClientImpl<P, C, S>
where
    P: CredentialsProvider<C>,
    C: Credentials,
    S: AsyncRuntime,
{
    pub(crate) async fn do_request_af<F, K, B>(&self, input: &F) -> Result<K, TosError>
    where
        F: AsyncInputTranslator<B>,
        K: OutputParser + Send,
        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin + 'static,
    {
        self.do_request_common::<MockAsyncInputTranslator, F, K, B>(None, Some(input)).await
    }
}


#[async_trait]
impl<B> AsyncInputTranslator<B> for PutObjectFromFileInput
where
    B: BuildFileStream + Send,
{
    async fn trans(&self) -> Result<HttpRequest<B>, TosError> {
        let mut request = self.inner.trans()?;
        request.operation = self.operation();
        if self.file_path != "" {
            let (body, len) = B::new(&self.file_path).await?;
            request.body = Some(body);
            if let Some(l) = len {
                if self.inner.content_length < 0 {
                    request.header.insert(HEADER_CONTENT_LENGTH, l.to_string());
                }
            }
        }
        Ok(request)
    }
}

#[async_trait]
impl<B> AsyncInputTranslator<B> for UploadPartFromFileInput
where
    B: BuildFileStream + Send,
{
    async fn trans(&self) -> Result<HttpRequest<B>, TosError> {
        let mut request = self.inner.trans()?;
        request.operation = self.operation();
        if self.offset < 0 {
            return Err(TosError::client_error("invalid offset for upload part"));
        }
        if self.part_size >= 0 {
            request.header.insert(HEADER_CONTENT_LENGTH, self.part_size.to_string());
        }
        if self.file_path != "" {
            let (body, len) = B::new_with_offset(&self.file_path, self.offset).await?;
            request.body = Some(body);
            if let Some(l) = len {
                if self.part_size < 0 {
                    request.header.insert(HEADER_CONTENT_LENGTH, (l - self.offset as usize).to_string());
                }
            }
        }
        Ok(request)
    }
}

#[async_trait]
impl OutputParser for GetObjectToFileOutput {
    async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, meta: Meta) -> Result<Self, TosError>
    where
        B: Send,
    {
        let head_object_output = HeadObjectOutput::parse_by_header(response.headers(), request_info, meta)?;
        let content_range = get_header_value(response.headers(), HEADER_CONTENT_RANGE);
        let content = Box::new(response.bytes_stream()) as Box<dyn Stream<Item=reqwest::Result<Bytes>> + Send + Unpin>;
        let mut reader;
        if head_object_output.content_length >= 0 {
            reader = InternalReader::sized(content, head_object_output.content_length as usize);
        } else {
            reader = InternalReader::new(content);
        }

        if let Some(ref rc) = request.request_context {
            if let Some(ref rl) = rc.rate_limiter {
                reader.set_rate_limiter(rl.clone());
            }
            if let Some(ref adts) = rc.async_data_transfer_listener {
                reader.set_async_data_transfer_listener(adts.clone());
                reader.operation = request.operation.to_string();
                reader.bucket = request.bucket.to_string();
                reader.key = request.key.to_string();
                reader.retry_count = request.retry_count;
            }
        }

        let file_path = &request.request_context.as_ref().unwrap().file_path;
        let path = Path::new(file_path);
        match path.parent() {
            None => return Err(TosError::client_error(format!("cannot get parent for path {}", file_path))),
            Some(p) => {
                if !p.exists() {
                    if let Err(e) = fs::create_dir_all(p).await {
                        return Err(TosError::client_error_with_cause(format!("create dir for parent {} error", p.display()),
                                                                     GenericError::IoError(e.to_string())));
                    }
                }
            }
        }
        let final_file_path;
        if path.exists() && path.is_dir() {
            final_file_path = path.join(request.key);
        } else {
            final_file_path = path.to_path_buf();
        }

        let temp_file_path = final_file_path.parent().unwrap().join(Uuid::now_v1(&UUID_NODE).to_string());
        match File::options().write(true).truncate(true).create(true).open(temp_file_path.clone()).await {
            Err(e) => {
                return Err(TosError::client_error_with_cause("open file to write error", GenericError::IoError(e.to_string())))
            }
            Ok(mut fd) => {
                loop {
                    match reader.next().await {
                        None => break,
                        Some(result) => {
                            match result {
                                Err(re) => {
                                    let _ = fs::remove_file(temp_file_path).await;
                                    return Err(TosError::client_error_with_cause("read content to write error", GenericError::IoError(re.to_string())));
                                }
                                Ok(data) => {
                                    if let Err(we) = fd.write_all(data.as_ref()).await {
                                        let _ = fs::remove_file(temp_file_path).await;
                                        return Err(TosError::client_error_with_cause("write data to file error", GenericError::IoError(we.to_string())));
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        if let Err(re) = fs::rename(temp_file_path.clone(), final_file_path).await {
            let _ = fs::remove_file(temp_file_path).await;
            return Err(TosError::client_error_with_cause("rename file error", GenericError::IoError(re.to_string())));
        }

        Ok(Self {
            content_range,
            head_object_output,
        })
    }
}

impl DataTransferListener for crate::object::PutObjectFromFileInput {
    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
        &self.inner.async_data_transfer_listener
    }

    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
        self.inner.async_data_transfer_listener = Some(listener.into());
    }
}

impl DataTransferListener for crate::object::GetObjectToFileInput {
    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
        &self.inner.async_data_transfer_listener
    }

    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
        self.inner.async_data_transfer_listener = Some(listener.into());
    }
}

impl DataTransferListener for UploadPartFromFileInput {
    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
        &self.inner.async_data_transfer_listener
    }

    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
        self.inner.async_data_transfer_listener = Some(listener.into());
    }
}