// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#[cfg(feature = "tokio")]
use std::task::{Context, Poll};

#[cfg(feature = "tokio")]
use crate::PutPayloadMut;
use crate::{PutPayload, PutResult, Result};
use async_trait::async_trait;
use futures_util::future::BoxFuture;
#[cfg(feature = "tokio")]
use tokio::task::JoinSet;

29/// An upload part request
30pub type UploadPart = BoxFuture<'static, Result<()>>;
31
32/// A trait allowing writing an object in fixed size chunks
33///
34/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
35/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
36/// may be polled in parallel, allowing for concurrent uploads.
37///
38/// Once all part uploads have been polled to completion, the upload can be completed by
39/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
40/// as an atomic operation.It is implementation behind behaviour if [`MultipartUpload::complete`]
41/// is called before all [`UploadPart`] have been polled to completion.
42#[async_trait]
43pub trait MultipartUpload: Send + std::fmt::Debug {
44    /// Upload the next part
45    ///
46    /// Most stores require that all parts excluding the last are at least 5 MiB, and some
47    /// further require that all parts excluding the last be the same size, e.g. [R2].
48    /// Clients wanting to maximise compatibility should therefore perform writes in
49    /// fixed size blocks larger than 5 MiB.
50    ///
51    /// Implementations may invoke this method multiple times and then await on the
52    /// returned futures in parallel
53    ///
54    /// ```no_run
55    /// # use futures_util::StreamExt;
56    /// # use object_store::MultipartUpload;
57    /// #
58    /// # async fn test() {
59    /// #
60    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
61    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
62    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
63    /// futures_util::future::try_join(p1, p2).await.unwrap();
64    /// upload.complete().await.unwrap();
65    /// # }
66    /// ```
67    ///
68    /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
69    fn put_part(&mut self, data: PutPayload) -> UploadPart;
70
71    /// Complete the multipart upload
72    ///
73    /// It is implementation defined behaviour if this method is called before polling
74    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
75    /// it is implementation defined behaviour to call [`MultipartUpload::complete`]
76    /// on an already completed or aborted [`MultipartUpload`].
77    async fn complete(&mut self) -> Result<PutResult>;
78
79    /// Abort the multipart upload
80    ///
81    /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
82    /// some object stores will automatically clean up any previously uploaded parts.
83    /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
84    /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
85    ///
86    /// It will not be possible to call `abort` in all failure scenarios, for example
87    /// non-graceful shutdown of the calling application. It is therefore recommended
88    /// object stores are configured with lifecycle rules to automatically cleanup
89    /// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
90    /// for more information.
91    ///
92    /// It is implementation defined behaviour to call [`MultipartUpload::abort`]
93    /// on an already completed or aborted [`MultipartUpload`]
94    async fn abort(&mut self) -> Result<()>;
95}
96
97#[async_trait]
98impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
99    fn put_part(&mut self, data: PutPayload) -> UploadPart {
100        (**self).put_part(data)
101    }
102
103    async fn complete(&mut self) -> Result<PutResult> {
104        (**self).complete().await
105    }
106
107    async fn abort(&mut self) -> Result<()> {
108        (**self).abort().await
109    }
110}
111
/// A synchronous write API for uploading data in parallel in fixed size chunks
///
/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel
///
/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`]
/// allowing back pressure on producers, prior to buffering the next part. However, unlike
/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers
///
/// [`Sink`]: futures_util::sink::Sink
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct WriteMultipart {
    /// The underlying upload that completed chunks are written to
    upload: Box<dyn MultipartUpload>,

    /// Data accumulated until it reaches `chunk_size`, then flushed as a part
    buffer: PutPayloadMut,

    /// The fixed size at which buffered data is flushed as an [`UploadPart`]
    chunk_size: usize,

    /// The in-flight [`UploadPart`] tasks spawned by `put_part`
    tasks: JoinSet<Result<()>>,
}

#[cfg(feature = "tokio")]
impl WriteMultipart {
    /// Create a new [`WriteMultipart`] that will upload using 5 MiB chunks
    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
    }

    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
        Self {
            upload,
            buffer: PutPayloadMut::new(),
            chunk_size,
            tasks: JoinSet::new(),
        }
    }

    /// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::wait_for_capacity`] for an async version of this function
    pub fn poll_for_capacity(
        &mut self,
        cx: &mut Context<'_>,
        max_concurrency: usize,
    ) -> Poll<Result<()>> {
        // Drain completed tasks until fewer than `max_concurrency` remain in
        // flight, surfacing any upload error (or task panic) as it is joined
        while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
            let joined = futures_core::ready!(self.tasks.poll_join_next(cx));
            joined.unwrap()??
        }
        Poll::Ready(Ok(()))
    }

    /// Wait until there are less than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
    }

    /// Write data to this [`WriteMultipart`]
    ///
    /// Data is buffered using [`PutPayloadMut::extend_from_slice`]. Implementations looking to
    /// write data from owned buffers may prefer [`Self::put`] as this avoids copying.
    ///
    /// Note this method is synchronous (not `async`) and will immediately
    /// start new uploads as soon as the internal `chunk_size` is hit,
    /// regardless of how many outstanding uploads are already in progress.
    ///
    /// Back pressure can optionally be applied to producers by calling
    /// [`Self::wait_for_capacity`] prior to calling this method
    pub fn write(&mut self, mut buf: &[u8]) {
        while !buf.is_empty() {
            // Space left before the buffer reaches `chunk_size`
            let capacity = self.chunk_size - self.buffer.content_length();
            let take = capacity.min(buf.len());
            let (head, tail) = buf.split_at(take);
            self.buffer.extend_from_slice(head);
            if take == capacity {
                // Buffer now holds exactly `chunk_size` bytes - flush it as a part
                let full = std::mem::take(&mut self.buffer);
                self.put_part(full.into())
            }
            buf = tail;
        }
    }

    /// Put a chunk of data into this [`WriteMultipart`] without copying
    ///
    /// Data is buffered using [`PutPayloadMut::push`]. Implementations looking to
    /// perform writes from non-owned buffers should prefer [`Self::write`] as this
    /// will allow multiple calls to share the same underlying allocation.
    ///
    /// See [`Self::write`] for information on backpressure
    pub fn put(&mut self, mut bytes: bytes::Bytes) {
        loop {
            if bytes.is_empty() {
                return;
            }
            let capacity = self.chunk_size - self.buffer.content_length();
            if bytes.len() < capacity {
                // Remainder fits without filling the chunk - stash it and wait for more
                self.buffer.push(bytes);
                return;
            }
            // Fill the buffer to exactly `chunk_size` (zero-copy split) and flush it
            self.buffer.push(bytes.split_to(capacity));
            let full = std::mem::take(&mut self.buffer);
            self.put_part(full.into())
        }
    }

    /// Spawn an upload task for `part` onto the [`JoinSet`]
    pub(crate) fn put_part(&mut self, part: PutPayload) {
        self.tasks.spawn(self.upload.put_part(part));
    }

    /// Abort this upload, attempting to clean up any successfully uploaded parts
    pub async fn abort(mut self) -> Result<()> {
        // Stop any in-flight part uploads before asking the store to clean up
        self.tasks.shutdown().await;
        self.upload.abort().await
    }

    /// Flush final chunk, and await completion of all in-flight requests
    pub async fn finish(mut self) -> Result<PutResult> {
        // Flush any partially filled final chunk
        let remainder = std::mem::take(&mut self.buffer);
        if !remainder.is_empty() {
            self.put_part(remainder.into())
        }

        // Drain all in-flight part uploads before completing
        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Ok(result) => Ok(result),
            Err(e) => {
                // Completion failed - stop remaining work and attempt cleanup,
                // propagating the abort error if cleanup itself fails
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures_util::FutureExt;
    use parking_lot::Mutex;
    use rand::prelude::StdRng;
    // NOTE(review): `RngExt` - confirm this trait name against the pinned `rand`
    // version; rand 0.9 exposes `random_range`/`random_bool` via `Rng`
    use rand::{RngExt, SeedableRng};

    use crate::ObjectStoreExt;
    use crate::memory::InMemory;
    use crate::path::Path;
    use crate::throttle::{ThrottleConfig, ThrottledStore};

    use super::*;

    /// Verify [`WriteMultipart::wait_for_capacity`] applies back pressure while
    /// part uploads are still in flight
    #[tokio::test]
    async fn test_concurrency() {
        // Throttle each put so the spawned part uploads stay in flight long
        // enough for the capacity check below to observe them
        let config = ThrottleConfig {
            wait_put_per_call: Duration::from_millis(1),
            ..Default::default()
        };

        let path = Path::from("foo");
        let store = ThrottledStore::new(InMemory::new(), config);
        let upload = store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new_with_chunk_size(upload, 10);

        // 20 writes of 5 bytes with a 10-byte chunk size -> 10 spawned parts
        for _ in 0..20 {
            write.write(&[0; 5]);
        }
        // With 10 parts in flight, waiting for <10 cannot complete immediately
        assert!(write.wait_for_capacity(10).now_or_never().is_none());
        write.wait_for_capacity(10).await.unwrap()
    }

    /// A [`MultipartUpload`] that records every part it is given
    #[derive(Debug, Default)]
    struct InstrumentedUpload {
        // Shared handle so the test can inspect the parts after handing the
        // upload off to `WriteMultipart`
        chunks: Arc<Mutex<Vec<PutPayload>>>,
    }

    #[async_trait]
    impl MultipartUpload for InstrumentedUpload {
        fn put_part(&mut self, data: PutPayload) -> UploadPart {
            // Record the part and resolve immediately
            self.chunks.lock().push(data);
            futures_util::future::ready(Ok(())).boxed()
        }

        async fn complete(&mut self) -> Result<PutResult> {
            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        }

        async fn abort(&mut self) -> Result<()> {
            // The happy-path tests below never abort
            unimplemented!()
        }
    }

    /// Exercise [`WriteMultipart::write`] and [`WriteMultipart::put`] with random
    /// write sizes, checking data integrity and the fixed chunking invariant
    #[tokio::test]
    async fn test_write_multipart() {
        let mut rng = StdRng::seed_from_u64(42);

        // `method` is the probability of using `put` (owned) over `write` (slice)
        for method in [0.0, 0.5, 1.0] {
            for _ in 0..10 {
                for chunk_size in [1, 17, 23] {
                    let upload = Box::<InstrumentedUpload>::default();
                    let chunks = Arc::clone(&upload.chunks);
                    let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);

                    let mut expected = Vec::with_capacity(1024);

                    // 50 writes of 0..30 random bytes each, via a random method
                    for _ in 0..50 {
                        let chunk_size = rng.random_range(0..30);
                        let data: Vec<_> = (0..chunk_size).map(|_| rng.random()).collect();
                        expected.extend_from_slice(&data);

                        match rng.random_bool(method) {
                            true => write.put(data.into()),
                            false => write.write(&data),
                        }
                    }
                    write.finish().await.unwrap();

                    let chunks = chunks.lock();

                    // Reassembling the recorded parts must reproduce the input bytes
                    let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
                    assert_eq!(expected, actual);

                    // Every part except the last must be exactly `chunk_size` long
                    for chunk in chunks.iter().take(chunks.len() - 1) {
                        assert_eq!(chunk.content_length(), chunk_size)
                    }

                    // The final part may be smaller, but never larger
                    let last_chunk = chunks.last().unwrap().content_length();
                    assert!(last_chunk <= chunk_size, "{chunk_size}");
                }
            }
        }
    }
}