ve-tos-rust-sdk 2.0.3

volcengine offical tos rust sdk
Documentation
/*
 * Copyright (2024) Volcengine
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
use crate::asynchronous::object::ObjectAPI;
use crate::asynchronous::tos::AsyncSpawner;
use crate::error::TosError;
use crate::internal::check_bucket_and_key;
use crate::object::{ListObjectsType2Input, ListObjectsType2Output};
use crate::paginator::{NextPageInput, Paginator};
use crate::tos::ConfigAware;
use arc_swap::ArcSwap;
use async_channel::Receiver;
use async_trait::async_trait;
use std::sync::Arc;

pub fn new_list_objects_type2_paginator<C, ASP>(input: &ListObjectsType2Input, recursive_for_delimiter: bool,
                                                client: Arc<C>, asp: ASP) -> Result<impl ListObjectsType2Paginator, TosError>
where
    C: ObjectAPI + ConfigAware + Sync + Send + 'static,
    ASP: AsyncSpawner,
{
    let _ = check_bucket_and_key(input, client.is_custom_domain())?;
    let mut _input = input.clone();
    let _client = client.clone();
    let (sender, receiver) = async_channel::bounded(3);
    let _ = asp.spawn(async move {
        let mut need_break = false;
        if _input.delimiter == "/" {
            let mut prefixes = Vec::with_capacity(16);
            let mut last_page_end = false;
            loop {
                if last_page_end {
                    if prefixes.is_empty() {
                        let _ = sender.send((true, Err(TosError::client_error("invalid status error")))).await;
                        break;
                    }
                    let prefix = prefixes.remove(0);
                    _input.prefix = prefix;
                    _input.start_after = "".to_string();
                    _input.continuation_token = "".to_string();
                    last_page_end = false;
                }
                let result = _client.list_objects_type2(&_input).await;
                if let Ok(ref o) = result {
                    if o.is_truncated() {
                        _input.continuation_token = o.next_continuation_token.clone();
                    } else {
                        last_page_end = true;
                    }

                    if recursive_for_delimiter {
                        for cp in o.common_prefixes() {
                            prefixes.push(cp.prefix().to_string());
                        }
                    }
                    need_break = last_page_end && prefixes.is_empty();
                } else {
                    need_break = true;
                }

                if let Err(_) = sender.send((need_break, result)).await {
                    need_break = true;
                }
                if need_break {
                    break;
                }
            }
        } else {
            loop {
                let result = _client.list_objects_type2(&_input).await;
                if let Ok(ref o) = result {
                    if o.is_truncated() {
                        _input.continuation_token = o.next_continuation_token.clone();
                    } else {
                        need_break = true;
                    }
                } else {
                    need_break = true;
                }

                if let Err(_) = sender.send((need_break, result)).await {
                    need_break = true
                }
                if need_break {
                    break;
                }
            }
        }
        sender.close();
    });

    Ok(PaginatorImpl {
        is_end: ArcSwap::new(Arc::new(false)),
        last_err: ArcSwap::new(Arc::new(None)),
        current_prefix: ArcSwap::new(Arc::new(input.prefix.clone())),
        receiver,
        client,
    })
}


#[async_trait]
pub trait ListObjectsType2Paginator: Paginator {
    async fn next_page(&self, input: &NextPageInput) -> Result<ListObjectsType2Output, TosError>;
}


pub(crate) struct PaginatorImpl<C>
{
    is_end: ArcSwap<bool>,
    last_err: ArcSwap<Option<TosError>>,
    current_prefix: ArcSwap<String>,
    receiver: Receiver<(bool, Result<ListObjectsType2Output, TosError>)>,
    client: Arc<C>,
}

unsafe impl<C> Sync for PaginatorImpl<C> {}

impl<C> Paginator for PaginatorImpl<C> {
    fn has_next(&self) -> bool {
        if self.last_err.load().is_some() {
            return false;
        }
        !*self.is_end.load().as_ref()
    }

    fn current_prefix(&self) -> Arc<String> {
        self.current_prefix.load().clone()
    }

    fn close(&self) {
        self.receiver.close();
    }
}

#[async_trait]
impl<C> ListObjectsType2Paginator for PaginatorImpl<C>
where
    C: ObjectAPI,
{
    async fn next_page(&self, _: &NextPageInput) -> Result<ListObjectsType2Output, TosError> {
        if let Some(e) = self.last_err.load().as_ref() {
            return Err(e.clone());
        }
        if *self.is_end.load().as_ref() {
            return Err(TosError::client_error("no next page error"));
        }

        match self.receiver.recv().await {
            Err(_) => {
                self.is_end.store(Arc::new(true));
                Err(TosError::client_error("no next page error"))
            }
            Ok((is_end, result)) => {
                match result {
                    Err(e) => {
                        self.last_err.store(Arc::new(Some(e.clone())));
                        Err(e)
                    }
                    Ok(output) => {
                        self.current_prefix.store(Arc::new(output.prefix().to_string()));
                        if is_end {
                            self.is_end.store(Arc::new(true));
                        }
                        Ok(output)
                    }
                }
            }
        }
    }
}