Skip to main content

aws_multipart_upload/client/
part.rs

1use std::borrow::Cow;
2use std::collections::BTreeMap;
3use std::fmt::{self, Display, Formatter};
4use std::io::{Result as IoResult, Write};
5use std::ops::{Deref, DerefMut};
6
7use aws_sdk_s3::primitives::ByteStream;
8use bytes::{BufMut as _, BytesMut};
9
10use crate::complete_upload::CompleteMultipartUploadOutput as CompleteResponse;
11use crate::error::{ErrorRepr, Result};
12use crate::part_upload::UploadPartOutput as UploadResponse;
13
14/// Body of the multipart upload request.
15///
16/// This type dereferences to [`BytesMut`], so in particular supports the
17/// methods of [`BufMut`], which is the preferred way of writing data to a
18/// `PartBody`.
19///
20/// `PartBody` also implements [`Write`], so it can also be used in combination
21/// with the class of external writer types that are parametrized by `Write`.
22///
23/// [`BufMut`]: bytes::BufMut
24#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
25pub struct PartBody(BytesMut);
26
27impl PartBody {
28    /// Construct a body from [`BytesMut`].
29    pub fn new(bytes: BytesMut) -> Self {
30        Self(bytes)
31    }
32
33    /// Returns an empty `PartBody` to write to that has pre-allocated capacity.
34    pub fn with_capacity(capacity: usize) -> Self {
35        let bytes = BytesMut::with_capacity(capacity);
36        Self(bytes)
37    }
38
39    /// Current size in bytes of the `PartBody`.
40    pub fn size(&self) -> usize {
41        self.0.len()
42    }
43
44    /// Removes the bytes from the current `PartBody` returning them in a new
45    /// `PartBody`.
46    ///
47    /// After, `self` will be empty but has the same capacity as before the
48    /// `remove_bytes` call.  This operates in constant time as it only involves
49    /// operations that increment a reference count and set some indices on the
50    /// inner view.
51    pub fn remove(&mut self) -> Self {
52        self.split().into()
53    }
54
55    /// Convert this type into a [`ByteStream`], which is the type required by
56    /// the SDK in the request to AWS to add a part to a multipart upload.
57    ///
58    /// This conversion is zero-cost as it only involves operations that
59    /// increment a reference count and set some indices on the inner view.
60    pub fn as_sdk_body(&mut self) -> ByteStream {
61        let buf = self.split();
62        let bytes = buf.freeze();
63        bytes.into()
64    }
65}
66
67impl Write for PartBody {
68    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
69        let bytes = buf.len();
70        self.reserve(bytes);
71        self.put(buf);
72        Ok(bytes)
73    }
74
75    fn flush(&mut self) -> IoResult<()> {
76        Ok(())
77    }
78}
79
80impl From<BytesMut> for PartBody {
81    fn from(value: BytesMut) -> Self {
82        Self(value)
83    }
84}
85
86impl Deref for PartBody {
87    type Target = BytesMut;
88
89    fn deref(&self) -> &Self::Target {
90        &self.0
91    }
92}
93
94impl DerefMut for PartBody {
95    fn deref_mut(&mut self) -> &mut Self::Target {
96        &mut self.0
97    }
98}
99
100impl AsRef<[u8]> for PartBody {
101    fn as_ref(&self) -> &[u8] {
102        self.deref().as_ref()
103    }
104}
105
106/// Number we assign to a part when uploading.
107///
108/// This, along with the entity tag found in the response, is required in the
109/// request to complete a multipart upload because it identifies the where the
110/// part goes when assembling the full object.
111///
112/// The `Default` behavior is to start with the part number `1`.  This is
113/// because it is a requirement of the API that a part number be an integer
114/// between 1 and 10,000.
115#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
116pub struct PartNumber(i32);
117
118impl PartNumber {
119    /// Initializes the part number at 1, which is a requirement of the API.
120    pub fn new() -> Self {
121        Self(1)
122    }
123
124    /// Create a new `PartNumber` starting with `n`.
125    ///
126    /// Used when resuming an upload that was suspended with completed parts.
127    ///
128    /// For a new upload, use [`Self::new`].
129    pub fn start_with(n: i32) -> Self {
130        Self(n)
131    }
132
133    /// Returns whether this is the first part of an upload.
134    pub fn is_first(&self) -> bool {
135        **self == 1
136    }
137
138    /// Returns whether this is a valid part according to API specification.
139    pub fn is_valid(&self) -> bool {
140        **self > 0
141    }
142
143    /// Increment the `PartNumber` by 1, returning the previous part number.
144    pub fn fetch_incr(&mut self) -> PartNumber {
145        let curr = PartNumber(self.0);
146        self.0 += 1;
147        curr
148    }
149}
150
151impl Default for PartNumber {
152    fn default() -> Self {
153        Self(1)
154    }
155}
156
157impl Deref for PartNumber {
158    type Target = i32;
159
160    fn deref(&self) -> &Self::Target {
161        &self.0
162    }
163}
164
165impl DerefMut for PartNumber {
166    fn deref_mut(&mut self) -> &mut Self::Target {
167        &mut self.0
168    }
169}
170
171impl Display for PartNumber {
172    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
173        write!(f, "part_{}", self.0)
174    }
175}
176
177/// AWS entity tag.
178///
179/// This value is a hash of an object on S3; it is the canonical identifier in
180/// AWS of the object and its contents.
181///
182/// A part in an upload has an e-tag, returned when the part was added
183/// successfully.  In this case, it is critical that the e-tag be retained
184/// alongside the [`PartNumber`] that it corresponded to.  These are needed in
185/// completing the upload.
186///
187/// It is also assigned to a completed upload, but more generally any S3 object.
188/// upload response.
189#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
190pub struct EntityTag(Cow<'static, str>);
191
192impl EntityTag {
193    fn new<T: Into<Cow<'static, str>>>(etag: T) -> Self {
194        Self(etag.into())
195    }
196
197    pub(crate) fn try_from_upload_resp(
198        value: &UploadResponse,
199    ) -> Result<Self, ErrorRepr> {
200        value
201            .e_tag
202            .as_deref()
203            .map(Self::from)
204            .ok_or_else(|| ErrorRepr::Missing("UploadResponse", "e_tag"))
205    }
206
207    pub(crate) fn try_from_complete_resp(
208        value: &CompleteResponse,
209    ) -> Result<Self, ErrorRepr> {
210        value
211            .e_tag
212            .as_deref()
213            .map(Self::from)
214            .ok_or_else(|| ErrorRepr::Missing("CompleteResponse", "e_tag"))
215    }
216}
217
218impl Deref for EntityTag {
219    type Target = str;
220
221    fn deref(&self) -> &str {
222        &self.0
223    }
224}
225
226impl AsRef<str> for EntityTag {
227    fn as_ref(&self) -> &str {
228        self
229    }
230}
231
232impl Display for EntityTag {
233    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
234        self.0.fmt(f)
235    }
236}
237
238impl From<&str> for EntityTag {
239    fn from(value: &str) -> Self {
240        Self::new(value.to_string())
241    }
242}
243
244impl From<String> for EntityTag {
245    fn from(value: String) -> Self {
246        Self(Cow::Owned(value))
247    }
248}
249
250/// The value for a successful part upload request.
251///
252/// All `CompletedPart`s need to be retained in order to construct a valid
253/// complete upload request.
254#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
255pub struct CompletedPart {
256    /// The etag of the object part in S3.
257    pub etag: EntityTag,
258    /// The incrementing integer starting with 1 that identifies this part in
259    /// the part upload.
260    pub part_number: PartNumber,
261    /// The size of this part in bytes.
262    pub part_size: usize,
263}
264
265impl CompletedPart {
266    /// Create a new value.
267    pub fn new(
268        etag: EntityTag,
269        part_number: PartNumber,
270        part_size: usize,
271    ) -> Self {
272        Self { etag, part_number, part_size }
273    }
274}
275
276/// Collection of completed part uploads.
277#[derive(Debug, Clone, Default)]
278pub struct CompletedParts {
279    parts: BTreeMap<PartNumber, CompletedPart>,
280    total_bytes: usize,
281}
282
283impl CompletedParts {
284    /// Add a new `CompletedPart` to the collection, skipping the operation if
285    /// the part number already exists.
286    pub fn insert(&mut self, part: CompletedPart) {
287        let k = part.part_number;
288        if !self.parts.contains_key(&k) {
289            self.total_bytes += part.part_size;
290            self.parts.insert(k, part);
291        }
292    }
293
294    /// Moves all completed parts from `other` into `self`, skipping the
295    /// operation if the part number already exists.
296    pub fn append(&mut self, other: CompletedParts) {
297        other.parts.into_values().for_each(|v| {
298            self.insert(v);
299        })
300    }
301
302    /// Returns the number of parts that have been successfully uploaded.
303    pub fn count(&self) -> usize {
304        self.parts.len()
305    }
306
307    /// Returns the current size in bytes of this upload.
308    pub fn size(&self) -> usize {
309        self.total_bytes
310    }
311}
312
313impl From<&CompletedParts> for aws_sdk_s3::types::CompletedMultipartUpload {
314    fn from(vs: &CompletedParts) -> Self {
315        let parts = vs.parts.values().fold(Vec::new(), |mut acc, v| {
316            acc.push(
317                aws_sdk_s3::types::CompletedPart::builder()
318                    .e_tag(&*v.etag)
319                    .part_number(*v.part_number)
320                    .build(),
321            );
322            acc
323        });
324        aws_sdk_s3::types::CompletedMultipartUpload::builder()
325            .set_parts(Some(parts))
326            .build()
327    }
328}