opendal 0.25.2

OpenDAL: Access data freely, painlessly, and efficiently.
Documentation
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use futures::future::BoxFuture;
use futures::ready;
use futures::FutureExt;
use futures::Stream;

use crate::raw::*;
use crate::*;

/// ObjectLister is returned by `Object::list` to list objects.
///
/// User can use object lister as `Stream<Item = Result<Object>>` or
/// call `next_page` directly.
/// Future that drives a single `ObjectPager::next_page` call.
///
/// The pager is moved into the future and handed back together with the page result
/// once the future resolves, so the lister can reuse it for the following page.
type NextPageFuture = BoxFuture<'static, (ObjectPager, Result<Option<Vec<ObjectEntry>>>)>;

/// ObjectLister is returned by `Object::list` to list objects.
///
/// User can use object lister as `Stream<Item = Result<Object>>` or
/// call `next_page` directly.
pub struct ObjectLister {
    acc: Arc<dyn Accessor>,
    /// The underlying pager. It is `None` while `fut` owns it.
    pager: Option<ObjectPager>,

    /// Entries already fetched from the pager but not yet handed out.
    buf: VecDeque<ObjectEntry>,
    /// In-flight page request created by `Stream::poll_next`.
    ///
    /// `pager` is moved into this future and returned once the future is ready.
    /// Thus, other methods that need `pager` must not be called while a future exists.
    fut: Option<NextPageFuture>,
}

impl ObjectLister {
    /// Create a new object lister.
    pub fn new(op: Operator, pager: ObjectPager) -> Self {
        Self {
            acc: op.inner(),
            pager: Some(pager),
            buf: VecDeque::default(),
            fut: None,
        }
    }

    /// Fetch the operator that used by this object.
    pub fn operator(&self) -> Operator {
        self.acc.clone().into()
    }

    /// next_page can be used to fetch a new object page.
    ///
    /// # Notes
    ///
    /// Don't mix the usage of `next_page` and `Stream<Item = Result<Object>>`.
    /// Always using the same calling style.
    pub async fn next_page(&mut self) -> Result<Option<Vec<Object>>> {
        debug_assert!(
            self.fut.is_none(),
            "there are ongoing futures for next page"
        );

        let entries = if !self.buf.is_empty() {
            mem::take(&mut self.buf)
        } else {
            match self
                .pager
                .as_mut()
                .expect("pager must be valid")
                .next_page()
                .await?
            {
                // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
                //
                // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
                Some(entries) => entries.into(),
                None => return Ok(None),
            }
        };

        Ok(Some(
            entries
                .into_iter()
                .map(|v| v.into_object(self.operator()))
                .collect(),
        ))
    }
}

impl Stream for ObjectLister {
    type Item = Result<Object>;

    /// Yield the next object, fetching pages from the pager as needed.
    ///
    /// Buffered entries are returned first. When the buffer is empty, a boxed future that
    /// owns the pager is created (or resumed) to fetch the next page. `Ok(None)` from the
    /// pager ends the stream. A pager error is yielded as `Some(Err(_))`, after which the
    /// pager is back in place and a later poll will ask it for another page.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Loop instead of recursing so a run of empty pages does not grow the stack.
        loop {
            if let Some(oe) = self.buf.pop_front() {
                return Poll::Ready(Some(Ok(oe.into_object(self.operator()))));
            }

            if let Some(fut) = self.fut.as_mut() {
                let (pager, res) = ready!(fut.poll_unpin(cx));
                // The future has completed. Drop it and take the pager back *before*
                // looking at the result, so that an error return does not leave a
                // finished future behind. Polling a completed `async` block panics.
                self.fut = None;
                self.pager = Some(pager);

                match res {
                    Ok(Some(oes)) => {
                        self.buf = oes.into();
                        continue;
                    }
                    Ok(None) => return Poll::Ready(None),
                    Err(err) => return Poll::Ready(Some(Err(err))),
                }
            }

            // No buffered entries and no request in flight: move the pager into a new
            // future; it is handed back alongside the result when the future resolves.
            let mut pager = self.pager.take().expect("pager must be valid");
            let fut = async move {
                let res = pager.next_page().await;

                (pager, res)
            };
            self.fut = Some(Box::pin(fut));
        }
    }
}

/// BlockingObjectLister is the blocking counterpart of `ObjectLister`.
///
/// User can use it as `Iterator<Item = Result<Object>>` or call `next_page` directly.
pub struct BlockingObjectLister {
    acc: Arc<dyn Accessor>,
    pager: BlockingObjectPager,
    /// Entries already fetched from `pager` but not yet handed out.
    buf: VecDeque<ObjectEntry>,
}

impl BlockingObjectLister {
    /// Create a new object lister.
    pub fn new(acc: Arc<dyn Accessor>, pager: BlockingObjectPager) -> Self {
        Self {
            acc,
            pager,
            buf: VecDeque::default(),
        }
    }

    /// Fetch the operator that used by this object.
    pub fn operator(&self) -> Operator {
        self.acc.clone().into()
    }

    /// next_page can be used to fetch a new object page.
    pub fn next_page(&mut self) -> Result<Option<Vec<Object>>> {
        let entries = if !self.buf.is_empty() {
            mem::take(&mut self.buf)
        } else {
            match self.pager.next_page()? {
                // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
                //
                // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
                Some(entries) => entries.into(),
                None => return Ok(None),
            }
        };

        Ok(Some(
            entries
                .into_iter()
                .map(|v| v.into_object(self.operator()))
                .collect(),
        ))
    }
}

// TODO: we can implement next_chunk.
impl Iterator for BlockingObjectLister {
    type Item = Result<Object>;

    /// Yield the next object, fetching new pages from the pager as needed.
    ///
    /// Buffered entries are returned first. Once the buffer is empty the pager is asked
    /// for the next page. `Ok(None)` from the pager ends iteration, and a pager error is
    /// yielded as `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop rather than recurse: a run of empty pages would otherwise add one stack
        // frame per page, and Rust does not guarantee tail-call elimination.
        loop {
            if let Some(oe) = self.buf.pop_front() {
                return Some(Ok(oe.into_object(self.operator())));
            }

            self.buf = match self.pager.next_page() {
                // Ideally this conversion does not reallocate; see the std docs for
                // `impl<T, A> From<Vec<T, A>> for VecDeque<T, A>` for the exact guarantee.
                Ok(Some(entries)) => entries.into(),
                Ok(None) => return None,
                Err(err) => return Some(Err(err)),
            };
        }
    }
}