// object_store/upload.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "tokio")]
19use std::task::{Context, Poll};
20
21#[cfg(feature = "tokio")]
22use crate::PutPayloadMut;
23use crate::{PutPayload, PutResult, Result};
24use async_trait::async_trait;
25use futures_util::future::BoxFuture;
26#[cfg(feature = "tokio")]
27use tokio::task::JoinSet;
28
/// An upload part request
///
/// A boxed future that drives the upload of a single part to completion,
/// resolving with the outcome of that part upload
pub type UploadPart = BoxFuture<'static, Result<()>>;
31
/// A trait allowing writing an object in fixed size chunks
///
/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
/// may be polled in parallel, allowing for concurrent uploads.
///
/// Once all part uploads have been polled to completion, the upload can be completed by
/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
/// as an atomic operation. It is implementation defined behaviour if [`MultipartUpload::complete`]
/// is called before all [`UploadPart`] have been polled to completion.
#[async_trait]
pub trait MultipartUpload: Send + std::fmt::Debug {
    /// Upload the next part
    ///
    /// Most stores require that all parts excluding the last are at least 5 MiB, and some
    /// further require that all parts excluding the last be the same size, e.g. [R2].
    /// Clients wanting to maximise compatibility should therefore perform writes in
    /// fixed size blocks larger than 5 MiB.
    ///
    /// Implementations may invoke this method multiple times and then await on the
    /// returned futures in parallel
    ///
    /// ```no_run
    /// # use futures_util::StreamExt;
    /// # use object_store::MultipartUpload;
    /// #
    /// # async fn test() {
    /// #
    /// let mut upload: Box<dyn MultipartUpload> = todo!();
    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
    /// futures_util::future::try_join(p1, p2).await.unwrap();
    /// upload.complete().await.unwrap();
    /// # }
    /// ```
    ///
    /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
    fn put_part(&mut self, data: PutPayload) -> UploadPart;

    /// Complete the multipart upload
    ///
    /// It is implementation defined behaviour if this method is called before polling
    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
    /// it is implementation defined behaviour to call [`MultipartUpload::complete`]
    /// on an already completed or aborted [`MultipartUpload`].
    async fn complete(&mut self) -> Result<PutResult>;

    /// Abort the multipart upload
    ///
    /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
    /// some object stores will automatically clean up any previously uploaded parts.
    /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
    /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
    ///
    /// It will not be possible to call `abort` in all failure scenarios, for example
    /// non-graceful shutdown of the calling application. It is therefore recommended
    /// object stores are configured with lifecycle rules to automatically cleanup
    /// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
    /// for more information.
    ///
    /// It is implementation defined behaviour to call [`MultipartUpload::abort`]
    /// on an already completed or aborted [`MultipartUpload`]
    async fn abort(&mut self) -> Result<()>;
}
96
97#[async_trait]
98impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
99 fn put_part(&mut self, data: PutPayload) -> UploadPart {
100 (**self).put_part(data)
101 }
102
103 async fn complete(&mut self) -> Result<PutResult> {
104 (**self).complete().await
105 }
106
107 async fn abort(&mut self) -> Result<()> {
108 (**self).abort().await
109 }
110}
111
/// A synchronous write API for uploading data in parallel in fixed size chunks
///
/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in parallel
///
/// The design also takes inspiration from [`Sink`] with [`WriteMultipart::wait_for_capacity`]
/// allowing back pressure on producers, prior to buffering the next part. However, unlike
/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers
///
/// [`Sink`]: futures_util::sink::Sink
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct WriteMultipart {
    // The underlying multipart upload that parts are written to
    upload: Box<dyn MultipartUpload>,

    // Pending data that has not yet reached `chunk_size` and been flushed as a part
    buffer: PutPayloadMut,

    // Size in bytes at which `buffer` is flushed as a new part upload
    chunk_size: usize,

    // In-flight part upload tasks spawned onto the tokio runtime
    tasks: JoinSet<Result<()>>,
}
132
#[cfg(feature = "tokio")]
impl WriteMultipart {
    /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
    }

    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
        Self {
            upload,
            buffer: PutPayloadMut::new(),
            chunk_size,
            tasks: JoinSet::new(),
        }
    }

    /// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::wait_for_capacity`] for an async version of this function
    pub fn poll_for_capacity(
        &mut self,
        cx: &mut Context<'_>,
        max_concurrency: usize,
    ) -> Poll<Result<()>> {
        // Join finished tasks until fewer than `max_concurrency` remain in flight,
        // surfacing the first join or upload error encountered
        loop {
            if self.tasks.is_empty() || self.tasks.len() < max_concurrency {
                return Poll::Ready(Ok(()));
            }
            futures_core::ready!(self.tasks.poll_join_next(cx)).unwrap()??
        }
    }

    /// Wait until there are less than `max_concurrency` [`UploadPart`] in progress
    ///
    /// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
    }

    /// Write data to this [`WriteMultipart`]
    ///
    /// Data is buffered using [`PutPayloadMut::extend_from_slice`]. Implementations looking to
    /// write data from owned buffers may prefer [`Self::put`] as this avoids copying.
    ///
    /// Note this method is synchronous (not `async`) and will immediately
    /// start new uploads as soon as the internal `chunk_size` is hit,
    /// regardless of how many outstanding uploads are already in progress.
    ///
    /// Back pressure can optionally be applied to producers by calling
    /// [`Self::wait_for_capacity`] prior to calling this method
    pub fn write(&mut self, mut buf: &[u8]) {
        while !buf.is_empty() {
            // Copy at most enough bytes to bring the buffer up to `chunk_size`
            let capacity = self.chunk_size - self.buffer.content_length();
            let (head, tail) = buf.split_at(capacity.min(buf.len()));
            self.buffer.extend_from_slice(head);
            if head.len() == capacity {
                // Buffer reached `chunk_size`: spawn an upload and start a fresh buffer
                let full = std::mem::take(&mut self.buffer);
                self.put_part(full.into());
            }
            buf = tail;
        }
    }

    /// Put a chunk of data into this [`WriteMultipart`] without copying
    ///
    /// Data is buffered using [`PutPayloadMut::push`]. Implementations looking to
    /// perform writes from non-owned buffers should prefer [`Self::write`] as this
    /// will allow multiple calls to share the same underlying allocation.
    ///
    /// See [`Self::write`] for information on backpressure
    pub fn put(&mut self, mut bytes: bytes::Bytes) {
        loop {
            let capacity = self.chunk_size - self.buffer.content_length();
            if bytes.len() < capacity {
                // Remainder fits without filling the buffer; stash it for later
                if !bytes.is_empty() {
                    self.buffer.push(bytes);
                }
                return;
            }
            // Fill the buffer to exactly `chunk_size` and spawn an upload for it
            self.buffer.push(bytes.split_to(capacity));
            let full = std::mem::take(&mut self.buffer);
            self.put_part(full.into());
        }
    }

    // Spawn a task onto the tokio runtime to upload `part`
    pub(crate) fn put_part(&mut self, part: PutPayload) {
        self.tasks.spawn(self.upload.put_part(part));
    }

    /// Abort this upload, attempting to clean up any successfully uploaded parts
    pub async fn abort(mut self) -> Result<()> {
        // Cancel any in-flight part uploads before asking the store to clean up
        self.tasks.shutdown().await;
        self.upload.abort().await
    }

    /// Flush final chunk, and await completion of all in-flight requests
    pub async fn finish(mut self) -> Result<PutResult> {
        // Upload any buffered data as a final (possibly undersized) part
        if !self.buffer.is_empty() {
            let trailing = std::mem::take(&mut self.buffer);
            self.put_part(trailing.into());
        }

        // Wait for every outstanding part upload to complete
        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Ok(result) => Ok(result),
            Err(e) => {
                // Completion failed: stop anything outstanding and attempt cleanup,
                // reporting the original error on success
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
        }
    }
}
244
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures_util::FutureExt;
    use parking_lot::Mutex;
    use rand::prelude::StdRng;
    // `random_range` / `random` / `random_bool` are provided by the `Rng` trait;
    // rand has no `RngExt` trait
    use rand::{Rng, SeedableRng};

    use crate::memory::InMemory;
    use crate::path::Path;
    use crate::throttle::{ThrottleConfig, ThrottledStore};
    // `put_multipart` is defined on the `ObjectStore` trait
    use crate::ObjectStore;

    use super::*;

    #[tokio::test]
    async fn test_concurrency() {
        // Throttle puts so uploads stay in flight long enough to observe back pressure
        let config = ThrottleConfig {
            wait_put_per_call: Duration::from_millis(1),
            ..Default::default()
        };

        let path = Path::from("foo");
        let store = ThrottledStore::new(InMemory::new(), config);
        let upload = store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new_with_chunk_size(upload, 10);

        // 100 bytes at 10-byte chunks => 10 in-flight part uploads
        for _ in 0..20 {
            write.write(&[0; 5]);
        }
        // With 10 uploads in flight, capacity for <10 is not immediately available
        assert!(write.wait_for_capacity(10).now_or_never().is_none());
        write.wait_for_capacity(10).await.unwrap()
    }

    /// A [`MultipartUpload`] that records the parts it receives for inspection
    #[derive(Debug, Default)]
    struct InstrumentedUpload {
        chunks: Arc<Mutex<Vec<PutPayload>>>,
    }

    #[async_trait]
    impl MultipartUpload for InstrumentedUpload {
        fn put_part(&mut self, data: PutPayload) -> UploadPart {
            self.chunks.lock().push(data);
            futures_util::future::ready(Ok(())).boxed()
        }

        async fn complete(&mut self) -> Result<PutResult> {
            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        }

        async fn abort(&mut self) -> Result<()> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_write_multipart() {
        let mut rng = StdRng::seed_from_u64(42);

        // `method` is the probability of using `put` (zero-copy) over `write` (copying)
        for method in [0.0, 0.5, 1.0] {
            for _ in 0..10 {
                for chunk_size in [1, 17, 23] {
                    let upload = Box::<InstrumentedUpload>::default();
                    let chunks = Arc::clone(&upload.chunks);
                    let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);

                    let mut expected = Vec::with_capacity(1024);

                    for _ in 0..50 {
                        let chunk_size = rng.random_range(0..30);
                        let data: Vec<_> = (0..chunk_size).map(|_| rng.random()).collect();
                        expected.extend_from_slice(&data);

                        match rng.random_bool(method) {
                            true => write.put(data.into()),
                            false => write.write(&data),
                        }
                    }
                    write.finish().await.unwrap();

                    let chunks = chunks.lock();

                    // Concatenated parts must reproduce the written bytes exactly
                    let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
                    assert_eq!(expected, actual);

                    // All parts except the last must be exactly `chunk_size` long
                    for chunk in chunks.iter().take(chunks.len() - 1) {
                        assert_eq!(chunk.content_length(), chunk_size)
                    }

                    // The final part may be short but never oversized
                    let last_chunk = chunks.last().unwrap().content_length();
                    assert!(last_chunk <= chunk_size, "{chunk_size}");
                }
            }
        }
    }
}
345}