object_store 0.13.2

A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage, Azure Blob Storage and local files.
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#[cfg(feature = "tokio")]
use std::task::{Context, Poll};

#[cfg(feature = "tokio")]
use crate::PutPayloadMut;
use crate::{PutPayload, PutResult, Result};
use async_trait::async_trait;
use futures_util::future::BoxFuture;
#[cfg(feature = "tokio")]
use tokio::task::JoinSet;

/// An upload part request
pub type UploadPart = BoxFuture<'static, Result<()>>;

/// A trait allowing writing an object in fixed size chunks
///
/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
/// may be polled in parallel, allowing for concurrent uploads.
///
/// Once all part uploads have been polled to completion, the upload can be completed by
/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
/// as an atomic operation.It is implementation behind behaviour if [`MultipartUpload::complete`]
/// is called before all [`UploadPart`] have been polled to completion.
#[async_trait]
pub trait MultipartUpload: Send + std::fmt::Debug {
    /// Upload the next part
    ///
    /// Most stores require that all parts excluding the last are at least 5 MiB, and some
    /// further require that all parts excluding the last be the same size, e.g. [R2].
    /// Clients wanting to maximise compatibility should therefore perform writes in
    /// fixed size blocks larger than 5 MiB.
    ///
    /// Implementations may invoke this method multiple times and then await on the
    /// returned futures in parallel
    ///
    /// ```no_run
    /// # use futures_util::StreamExt;
    /// # use object_store::MultipartUpload;
    /// #
    /// # async fn test() {
    /// #
    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
    /// futures_util::future::try_join(p1, p2).await.unwrap();
    /// upload.complete().await.unwrap();
    /// # }
    /// ```
    ///
    /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
    fn put_part(&mut self, data: PutPayload) -> UploadPart;

    /// Complete the multipart upload
    ///
    /// It is implementation defined behaviour if this method is called before polling
    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
    /// it is implementation defined behaviour to call [`MultipartUpload::complete`]
    /// on an already completed or aborted [`MultipartUpload`].
    async fn complete(&mut self) -> Result<PutResult>;

    /// Abort the multipart upload
    ///
    /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
    /// some object stores will automatically clean up any previously uploaded parts.
    /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
    /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
    ///
    /// It will not be possible to call `abort` in all failure scenarios, for example
    /// non-graceful shutdown of the calling application. It is therefore recommended
    /// object stores are configured with lifecycle rules to automatically cleanup
    /// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
    /// for more information.
    ///
    /// It is implementation defined behaviour to call [`MultipartUpload::abort`]
    /// on an already completed or aborted [`MultipartUpload`]
    async fn abort(&mut self) -> Result<()>;
}

#[async_trait]
impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        (**self).put_part(data)
    }

    async fn complete(&mut self) -> Result<PutResult> {
        (**self).complete().await
    }

    async fn abort(&mut self) -> Result<()> {
        (**self).abort().await
    }
}

/// A synchronous write API for uploading data in parallel in fixed size chunks
///
/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel
///
/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`]
/// allowing back pressure on producers, prior to buffering the next part. However, unlike
/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers
///
/// [`Sink`]: futures_util::sink::Sink
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct WriteMultipart {
    upload: Box<dyn MultipartUpload>,

    buffer: PutPayloadMut,

    chunk_size: usize,

    tasks: JoinSet<Result<()>>,
}

#[cfg(feature = "tokio")]
impl WriteMultipart {
    /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
    }

    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
        Self {
            upload,
            chunk_size,
            buffer: PutPayloadMut::new(),
            tasks: Default::default(),
        }
    }

    /// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::wait_for_capacity`] for an async version of this function
    pub fn poll_for_capacity(
        &mut self,
        cx: &mut Context<'_>,
        max_concurrency: usize,
    ) -> Poll<Result<()>> {
        while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
            futures_core::ready!(self.tasks.poll_join_next(cx)).unwrap()??
        }
        Poll::Ready(Ok(()))
    }

    /// Wait until there are less than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
    }

    /// Write data to this [`WriteMultipart`]
    ///
    /// Data is buffered using [`PutPayloadMut::extend_from_slice`]. Implementations looking to
    /// write data from owned buffers may prefer [`Self::put`] as this avoids copying.
    ///
    /// Note this method is synchronous (not `async`) and will immediately
    /// start new uploads as soon as the internal `chunk_size` is hit,
    /// regardless of how many outstanding uploads are already in progress.
    ///
    /// Back pressure can optionally be applied to producers by calling
    /// [`Self::wait_for_capacity`] prior to calling this method
    pub fn write(&mut self, mut buf: &[u8]) {
        while !buf.is_empty() {
            let remaining = self.chunk_size - self.buffer.content_length();
            let to_read = buf.len().min(remaining);
            self.buffer.extend_from_slice(&buf[..to_read]);
            if to_read == remaining {
                let buffer = std::mem::take(&mut self.buffer);
                self.put_part(buffer.into())
            }
            buf = &buf[to_read..]
        }
    }

    /// Put a chunk of data into this [`WriteMultipart`] without copying
    ///
    /// Data is buffered using [`PutPayloadMut::push`]. Implementations looking to
    /// perform writes from non-owned buffers should prefer [`Self::write`] as this
    /// will allow multiple calls to share the same underlying allocation.
    ///
    /// See [`Self::write`] for information on backpressure
    pub fn put(&mut self, mut bytes: bytes::Bytes) {
        while !bytes.is_empty() {
            let remaining = self.chunk_size - self.buffer.content_length();
            if bytes.len() < remaining {
                self.buffer.push(bytes);
                return;
            }
            self.buffer.push(bytes.split_to(remaining));
            let buffer = std::mem::take(&mut self.buffer);
            self.put_part(buffer.into())
        }
    }

    pub(crate) fn put_part(&mut self, part: PutPayload) {
        self.tasks.spawn(self.upload.put_part(part));
    }

    /// Abort this upload, attempting to clean up any successfully uploaded parts
    pub async fn abort(mut self) -> Result<()> {
        self.tasks.shutdown().await;
        self.upload.abort().await
    }

    /// Flush final chunk, and await completion of all in-flight requests
    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures_util::FutureExt;
    use parking_lot::Mutex;
    use rand::prelude::StdRng;
    use rand::{RngExt, SeedableRng};

    use crate::ObjectStoreExt;
    use crate::memory::InMemory;
    use crate::path::Path;
    use crate::throttle::{ThrottleConfig, ThrottledStore};

    use super::*;

    #[tokio::test]
    async fn test_concurrency() {
        let config = ThrottleConfig {
            wait_put_per_call: Duration::from_millis(1),
            ..Default::default()
        };

        let path = Path::from("foo");
        let store = ThrottledStore::new(InMemory::new(), config);
        let upload = store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new_with_chunk_size(upload, 10);

        for _ in 0..20 {
            write.write(&[0; 5]);
        }
        assert!(write.wait_for_capacity(10).now_or_never().is_none());
        write.wait_for_capacity(10).await.unwrap()
    }

    #[derive(Debug, Default)]
    struct InstrumentedUpload {
        chunks: Arc<Mutex<Vec<PutPayload>>>,
    }

    #[async_trait]
    impl MultipartUpload for InstrumentedUpload {
        fn put_part(&mut self, data: PutPayload) -> UploadPart {
            self.chunks.lock().push(data);
            futures_util::future::ready(Ok(())).boxed()
        }

        async fn complete(&mut self) -> Result<PutResult> {
            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        }

        async fn abort(&mut self) -> Result<()> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_write_multipart() {
        let mut rng = StdRng::seed_from_u64(42);

        for method in [0.0, 0.5, 1.0] {
            for _ in 0..10 {
                for chunk_size in [1, 17, 23] {
                    let upload = Box::<InstrumentedUpload>::default();
                    let chunks = Arc::clone(&upload.chunks);
                    let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);

                    let mut expected = Vec::with_capacity(1024);

                    for _ in 0..50 {
                        let chunk_size = rng.random_range(0..30);
                        let data: Vec<_> = (0..chunk_size).map(|_| rng.random()).collect();
                        expected.extend_from_slice(&data);

                        match rng.random_bool(method) {
                            true => write.put(data.into()),
                            false => write.write(&data),
                        }
                    }
                    write.finish().await.unwrap();

                    let chunks = chunks.lock();

                    let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
                    assert_eq!(expected, actual);

                    for chunk in chunks.iter().take(chunks.len() - 1) {
                        assert_eq!(chunk.content_length(), chunk_size)
                    }

                    let last_chunk = chunks.last().unwrap().content_length();
                    assert!(last_chunk <= chunk_size, "{chunk_size}");
                }
            }
        }
    }
}