Skip to main content

mountpoint_s3_client/
object_client.rs

1use std::fmt::{self, Debug};
2use std::ops::Range;
3use std::time::SystemTime;
4
5use async_trait::async_trait;
6use auto_impl::auto_impl;
7use bytes::Bytes;
8use futures::Stream;
9use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
10use std::collections::HashMap;
11use thiserror::Error;
12use time::OffsetDateTime;
13
14use crate::checksums::{
15    self, crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64,
16};
17use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
18
19mod etag;
20pub use etag::ETag;
21
/// A generic interface to S3-like object storage services.
///
/// This trait defines the common methods that all object services implement. Every request
/// method returns an [`ObjectClientResult`], whose error side separates the errors modeled for
/// that request type from generic [`ClientError`](Self::ClientError)s.
///
/// This is an async trait defined with the [async-trait](https://crates.io/crates/async-trait)
/// crate, and so implementations of this trait must use the `#[async_trait::async_trait]`
/// attribute.
#[cfg_attr(not(docsrs), async_trait)]
#[auto_impl(Arc)]
pub trait ObjectClient {
    /// The streaming response type returned by [`get_object`](Self::get_object).
    type GetObjectResponse: GetObjectResponse<ClientError = Self::ClientError>;
    /// The streaming request type returned by [`put_object`](Self::put_object).
    type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
    /// The client-specific error type for errors that are not modeled per request type.
    type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;

    /// Query the part size this client uses for GET operations to the object store.
    fn read_part_size(&self) -> usize;

    /// Query the part size this client uses for PUT operations to the object store.
    fn write_part_size(&self) -> usize;

    /// Query the initial read window size this client uses for backpressure GetObject requests.
    /// This can be `None` if backpressure is disabled.
    fn initial_read_window_size(&self) -> Option<usize>;

    /// Query current memory usage stats for the client. This can be `None` if the client
    /// does not record the stats.
    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats>;

    /// Delete a single object from the object store.
    ///
    /// DeleteObject will succeed even if the object within the bucket does not exist.
    async fn delete_object(
        &self,
        bucket: &str,
        key: &str,
    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError>;

    /// Create a copy of an existing object.
    ///
    /// Currently, this functionality has the following limitations:
    /// - Supported only for copying between matching bucket types:
    ///     - Standard S3 to Standard S3 buckets.
    ///     - S3 Express to S3 Express buckets.
    /// - Host header must use virtual host addressing style (path style is not supported) and
    ///   both source and destination buckets must have DNS-compliant names.
    /// - Only the `{bucket}/{key}` format is supported for the source; passing an ARN as the
    ///   source will not work.
    /// - Source bucket is assumed to be in the same region as destination bucket.
    async fn copy_object(
        &self,
        source_bucket: &str,
        source_key: &str,
        destination_bucket: &str,
        destination_key: &str,
        params: &CopyObjectParams,
    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError>;

    /// Get an object from the object store. Returns a stream of body parts of the object. Parts are
    /// guaranteed to be returned by the stream in order and contiguously.
    async fn get_object(
        &self,
        bucket: &str,
        key: &str,
        params: &GetObjectParams,
    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError>;

    /// List the objects in a bucket under a given prefix.
    ///
    /// To query more results, pass the `next_continuation_token` of a previous
    /// [`ListObjectsResult`] as `continuation_token`.
    async fn list_objects(
        &self,
        bucket: &str,
        continuation_token: Option<&str>,
        delimiter: &str,
        max_keys: usize,
        prefix: &str,
    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError>;

    /// Retrieve object metadata without retrieving the object contents.
    async fn head_object(
        &self,
        bucket: &str,
        key: &str,
        params: &HeadObjectParams,
    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError>;

    /// Put an object into the object store. Returns a [PutObjectRequest] for callers
    /// to provide the content of the object.
    async fn put_object(
        &self,
        bucket: &str,
        key: &str,
        params: &PutObjectParams,
    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError>;

    /// Put an object into the object store.
    ///
    /// Unlike [`put_object`](Self::put_object), the full `contents` are supplied up front and
    /// the call returns the final [`PutObjectResult`] rather than a request to write into.
    async fn put_object_single<'a>(
        &self,
        bucket: &str,
        key: &str,
        params: &PutObjectSingleParams,
        contents: impl AsRef<[u8]> + Send + 'a,
    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;

    /// Retrieves all the metadata from an object without returning the object contents.
    ///
    /// `object_attributes` lists the attributes to request.
    async fn get_object_attributes(
        &self,
        bucket: &str,
        key: &str,
        max_parts: Option<usize>,
        part_number_marker: Option<usize>,
        object_attributes: &[ObjectAttribute],
    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError>;

    /// Rename an object from some source key to a destination key within the same bucket.
    async fn rename_object(
        &self,
        bucket: &str,
        src_key: &str,
        dest_key: &str,
        params: &RenameObjectParams,
    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError>;
}
139
/// The top-level error type returned by calls to an [`ObjectClient`].
///
/// Errors that are explicitly modeled on a per-request-type basis are [`ServiceError`]s. Other
/// generic or unhandled errors are [`ClientError`]s.
///
/// The distinction between these two types of error can sometimes be blurry. As a rough heuristic,
/// [`ServiceError`]s are those that *any reasonable implementation* of an object client would be
/// capable of experiencing, and [`ClientError`]s are anything else. For example, any object client
/// could experience a "no such key" error, but only object clients that implement a permissions
/// system could experience "permission denied" errors. When in doubt, we err towards *not* adding
/// new [`ServiceError`]s, as they are public API for *every* object client.
///
/// The type parameter `S` is the request-specific service error and `C` is the client error.
///
/// [`ServiceError`]: ObjectClientError::ServiceError
/// [`ClientError`]: ObjectClientError::ClientError
#[derive(Debug, Error, PartialEq)]
pub enum ObjectClientError<S, C> {
    /// An error returned by the service itself
    #[error("Service error")]
    ServiceError(#[source] S),

    /// An error within the object client (for example, an unexpected response, or a failure to
    /// construct the request).
    ///
    /// The `#[from]` attribute generates `From<C>`, so a `C` converts into this variant with `?`.
    #[error("Client error")]
    ClientError(#[from] C),
}
165
166impl<S, C> ProvideErrorMetadata for ObjectClientError<S, C>
167where
168    S: ProvideErrorMetadata,
169    C: ProvideErrorMetadata,
170{
171    fn meta(&self) -> ClientErrorMetadata {
172        match self {
173            Self::ClientError(err) => err.meta(),
174            Self::ServiceError(err) => err.meta(),
175        }
176    }
177}
178
179impl ProvideErrorMetadata for GetObjectError {
180    fn meta(&self) -> ClientErrorMetadata {
181        match self {
182            GetObjectError::NoSuchBucket(client_error_metadata) => client_error_metadata.clone(),
183            GetObjectError::NoSuchKey(client_error_metadata) => client_error_metadata.clone(),
184            GetObjectError::PreconditionFailed(client_error_metadata) => client_error_metadata.clone(),
185        }
186    }
187}
188
189impl ProvideErrorMetadata for ListObjectsError {
190    fn meta(&self) -> ClientErrorMetadata {
191        Default::default()
192    }
193}
194
195impl ProvideErrorMetadata for HeadObjectError {
196    fn meta(&self) -> ClientErrorMetadata {
197        Default::default()
198    }
199}
200
201impl ProvideErrorMetadata for DeleteObjectError {
202    fn meta(&self) -> ClientErrorMetadata {
203        Default::default()
204    }
205}
206
207impl ProvideErrorMetadata for RenameObjectError {
208    fn meta(&self) -> ClientErrorMetadata {
209        Default::default()
210    }
211}
212
213impl ProvideErrorMetadata for PutObjectError {
214    fn meta(&self) -> ClientErrorMetadata {
215        Default::default()
216    }
217}
218
/// Shorthand type for the result of an object client request.
///
/// `T` is the success value, `S` the request-specific service error and `C` the client error;
/// see [`ObjectClientError`] for how the two error kinds differ.
pub type ObjectClientResult<T, S, C> = Result<T, ObjectClientError<S, C>>;
221
/// Errors returned by a [`get_object`](ObjectClient::get_object) request
///
/// Each variant carries the [`ClientErrorMetadata`] that is returned from
/// [`ProvideErrorMetadata::meta`] for this error.
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum GetObjectError {
    /// The bucket does not exist.
    #[error("The bucket does not exist")]
    NoSuchBucket(ClientErrorMetadata),

    /// The key does not exist.
    #[error("The key does not exist")]
    NoSuchKey(ClientErrorMetadata),

    /// At least one of the preconditions specified in the request did not hold.
    #[error("At least one of the preconditions specified did not hold")]
    PreconditionFailed(ClientErrorMetadata),
}
235
/// Parameters to a [`get_object`](ObjectClient::get_object) request
///
/// All fields default to `None`; use the builder-style methods on this type to set them.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct GetObjectParams {
    /// The range retrieved by the GetObject request.
    pub range: Option<Range<u64>>,
    /// The required etag on the object.
    pub if_match: Option<ETag>,
    /// Option to retrieve the checksum as part of the GetObject request.
    pub checksum_mode: Option<ChecksumMode>,
    /// An optional caller-supplied identifier passed through to the memory pool on buffer
    /// allocations for this request. Not related to the S3 request ID returned by the service.
    pub custom_id: Option<u64>,
}
247
248impl GetObjectParams {
249    /// Create a default [GetObjectParams].
250    pub fn new() -> Self {
251        Self::default()
252    }
253
254    /// Set the range retrieved by the GetObject request
255    pub fn range(mut self, value: Option<Range<u64>>) -> Self {
256        self.range = value;
257        self
258    }
259
260    /// Set the required etag on the object
261    pub fn if_match(mut self, value: Option<ETag>) -> Self {
262        self.if_match = value;
263        self
264    }
265
266    /// Set option to retrieve checksum as part of the GetObject request
267    pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
268        self.checksum_mode = value;
269        self
270    }
271
272    /// Set an optional caller-supplied identifier passed through to the memory pool on buffer
273    /// allocations for this request. Not related to the S3 request ID returned by the service.
274    pub fn custom_id(mut self, value: Option<u64>) -> Self {
275        self.custom_id = value;
276        self
277    }
278}
279
/// Result of a [`list_objects`](ObjectClient::list_objects) request
#[derive(Debug)]
#[non_exhaustive]
pub struct ListObjectsResult {
    /// The list of objects.
    pub objects: Vec<ObjectInfo>,

    /// The list of common prefixes. This rolls up all of the objects with a common prefix up to
    /// the next instance of the delimiter.
    pub common_prefixes: Vec<String>,

    /// If present, the continuation token to use to query more results. Pass it as the
    /// `continuation_token` argument of a further [`list_objects`](ObjectClient::list_objects)
    /// call.
    pub next_continuation_token: Option<String>,
}
294
/// Errors returned by a [`list_objects`](ObjectClient::list_objects) request
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum ListObjectsError {
    /// The bucket does not exist.
    #[error("The bucket does not exist")]
    NoSuchBucket,
}
302
/// Parameters to a [`head_object`](ObjectClient::head_object) request
///
/// Use the builder-style methods on this type to set the fields.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct HeadObjectParams {
    /// Enable to retrieve checksum as part of the HeadObject request
    pub checksum_mode: Option<ChecksumMode>,
}
310
311impl HeadObjectParams {
312    /// Create a default [HeadObjectParams].
313    pub fn new() -> Self {
314        Self::default()
315    }
316
317    /// Set option to retrieve checksum as part of the HeadObject request
318    pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
319        self.checksum_mode = value;
320        self
321    }
322}
323
/// Enable [ChecksumMode] to retrieve object checksums
///
/// Used as an `Option<ChecksumMode>` in [`GetObjectParams`] and [`HeadObjectParams`].
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub enum ChecksumMode {
    /// Retrieve checksums
    Enabled,
}
331
/// Result of a [`head_object`](ObjectClient::head_object) request
#[derive(Debug)]
#[non_exhaustive]
pub struct HeadObjectResult {
    /// Size of the object in bytes.
    ///
    /// Refers to the `Content-Length` HTTP header for HeadObject.
    pub size: u64,

    /// The time this object was last modified.
    pub last_modified: OffsetDateTime,

    /// Entity tag of this object.
    pub etag: ETag,

    /// Storage class for this object.
    ///
    /// The value is optional because HeadObject does not return the storage class in its response
    /// for objects in the S3 Standard storage class.
    /// See examples in the
    /// [Amazon S3 API Reference](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Examples).
    pub storage_class: Option<String>,

    /// Objects in flexible retrieval storage classes (such as GLACIER and DEEP_ARCHIVE) are only
    /// accessible after restoration.
    pub restore_status: Option<RestoreStatus>,

    /// Checksum of the object.
    ///
    /// HeadObject must explicitly request for this field to be included (see
    /// [`HeadObjectParams::checksum_mode`]), otherwise the values will be empty.
    pub checksum: Checksum,

    /// Server-side encryption type that was used to store the object.
    pub sse_type: Option<String>,

    /// Server-side encryption KMS key ID that was used to store the object.
    pub sse_kms_key_id: Option<String>,
}
370
/// Errors returned by a [`head_object`](ObjectClient::head_object) request
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum HeadObjectError {
    /// The object was not found.
    ///
    /// Note that HeadObject cannot distinguish between NoSuchBucket and NoSuchKey errors, so a
    /// single variant covers both.
    #[error("The object was not found")]
    NotFound,
}
379
/// Result of a [`delete_object`](ObjectClient::delete_object) request
///
/// Note: DeleteObject requests on a non-existent object within a bucket are considered a success.
///
/// This struct currently has no fields.
// TODO: Populate this struct with return fields from the S3 API, e.g., version id, delete marker.
#[derive(Debug)]
#[non_exhaustive]
pub struct DeleteObjectResult {}
387
/// Errors returned by a [`delete_object`](ObjectClient::delete_object) request
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum DeleteObjectError {
    /// The bucket does not exist.
    #[error("The bucket does not exist")]
    NoSuchBucket,
}
395
/// Result of a [`copy_object`](ObjectClient::copy_object) request
///
/// This struct currently has no fields.
#[derive(Debug)]
#[non_exhaustive]
pub struct CopyObjectResult {
    // TODO: Populate this struct with return fields from the S3 API, e.g., etag.
}
402
/// Errors returned by a [`copy_object`](ObjectClient::copy_object) request
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum CopyObjectError {
    /// The object was not found.
    ///
    /// Note that CopyObject cannot distinguish between NoSuchBucket and NoSuchKey errors, so a
    /// single variant covers both.
    #[error("The object was not found")]
    NotFound,

    /// The source object is not in the active tier and is only stored in Amazon S3 Glacier.
    #[error("The source object of the COPY action is not in the active tier and is only stored in Amazon S3 Glacier.")]
    ObjectNotInActiveTierError,
}
414
/// Parameters to a [`copy_object`](ObjectClient::copy_object) request
///
/// This struct currently has no fields.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct CopyObjectParams {
    // TODO: Populate this struct with fields as and when required to satisfy various use cases.
}
421
422impl CopyObjectParams {
423    /// Create a default [CopyObjectParams].
424    pub fn new() -> Self {
425        Self::default()
426    }
427}
428
/// Result of a [`get_object_attributes`](ObjectClient::get_object_attributes) request
///
/// Every field is optional.
#[derive(Debug, Default)]
pub struct GetObjectAttributesResult {
    /// ETag of the object
    pub etag: Option<String>,

    /// Checksum of the object
    pub checksum: Option<Checksum>,

    /// Object parts metadata for a multi-part object
    pub object_parts: Option<GetObjectAttributesParts>,

    /// Storage class of the object
    pub storage_class: Option<String>,

    /// Object size
    pub object_size: Option<u64>,
}
447
/// Errors returned by a [`get_object_attributes`](ObjectClient::get_object_attributes) request
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum GetObjectAttributesError {
    /// The bucket does not exist.
    #[error("The bucket does not exist")]
    NoSuchBucket,

    /// The key does not exist.
    #[error("The key does not exist")]
    NoSuchKey,
}
458
/// Parameters to a [`rename_object`](ObjectClient::rename_object) request
///
/// Use the builder-style methods on this type to set the fields.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct RenameObjectParams {
    /// Value of the if-none-match header. Can be set to `*` to disable overwrite
    pub if_none_match: Option<String>,
    /// Optional ETag that the destination must match
    pub if_match: Option<ETag>,
    /// Optional ETag that the source must match
    pub if_source_match: Option<ETag>,
    /// Idempotency token
    pub client_token: Option<String>,
    /// Can be used for custom headers, as `(name, value)` pairs
    pub custom_headers: Vec<(String, String)>,
}
474
475impl RenameObjectParams {
476    /// Create a default [RenameObjectParams].
477    pub fn new() -> Self {
478        Self::default()
479    }
480
481    /// Set if-none-match header
482    pub fn if_none_match(mut self, value: Option<String>) -> Self {
483        self.if_none_match = value;
484        self
485    }
486
487    /// Set if-match header
488    pub fn if_match(mut self, value: Option<ETag>) -> Self {
489        self.if_match = value;
490        self
491    }
492
493    /// Set if-source-match header
494    pub fn if_source_match(mut self, value: Option<ETag>) -> Self {
495        self.if_source_match = value;
496        self
497    }
498
499    /// Set idempotency token
500    pub fn client_token(mut self, value: Option<String>) -> Self {
501        self.client_token = value;
502        self
503    }
504
505    /// Set custom headers
506    pub fn custom_headers(mut self, value: Vec<(String, String)>) -> Self {
507        self.custom_headers = value;
508        self
509    }
510}
511
/// Result of a [`rename_object`](ObjectClient::rename_object) request
///
/// This struct currently has no fields.
#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub struct RenameObjectResult {}
516
/// The kind of precondition carried by [`RenameObjectError::PreConditionFailed`].
// NOTE(review): the variant names suggest they correspond to the `if_match` and `if_none_match`
// fields of `RenameObjectParams`, with `Other` covering the rest — confirm against the code that
// constructs these values.
#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum RenamePreconditionTypes {
    IfMatch,
    IfNoneMatch,
    Other,
}
524
525/// Errors returned by a [`rename_object`](ObjectClient::rename_object) request
526#[derive(Debug, Error, PartialEq, Eq)]
527#[non_exhaustive]
528pub enum RenameObjectError {
529    #[error("The bucket does not exist")]
530    NoSuchBucket,
531    #[error("The destination key provided is too long")]
532    KeyTooLong,
533    #[error("The key was not found")]
534    KeyNotFound,
535    #[error("A Precondition")]
536    PreConditionFailed(RenamePreconditionTypes),
537    #[error("The service does not implement rename")]
538    NotImplementedError,
539    #[error("The service returned an Internal Error")]
540    InternalError,
541    #[error("You do not have access to this resource")]
542    AccessDenied,
543    #[error("Bad Request")]
544    BadRequest,
545}
546
/// User-defined object metadata, as a map from `String` keys to `String` values.
pub type ObjectMetadata = HashMap<String, String>;
548
/// Parameters to a [`put_object`](ObjectClient::put_object) request
///
/// Use the builder-style methods on this type to set the fields.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct PutObjectParams {
    /// Enable Crc32c trailing checksums. Defaults to [`PutObjectTrailingChecksums::Disabled`].
    pub trailing_checksums: PutObjectTrailingChecksums,
    /// Storage class to be used when creating new S3 object
    pub storage_class: Option<String>,
    /// The server-side encryption algorithm to be used for this object in Amazon S3 (for example,
    /// AES256, aws:kms, aws:kms:dsse)
    pub server_side_encryption: Option<String>,
    /// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be
    /// used to specify AWS KMS key ID to be used when creating new S3 object
    pub ssekms_key_id: Option<String>,
    /// Custom headers to add to the request, as `(name, value)` pairs
    pub custom_headers: Vec<(String, String)>,
    /// User-defined object metadata
    pub object_metadata: ObjectMetadata,
    /// An optional caller-supplied identifier passed through to the memory pool on buffer
    /// allocations for this request. Not related to the S3 request ID returned by the service.
    pub custom_id: Option<u64>,
}
570
571impl PutObjectParams {
572    /// Create a default [PutObjectParams].
573    pub fn new() -> Self {
574        Self::default()
575    }
576
577    /// Set Crc32c trailing checksums.
578    pub fn trailing_checksums(mut self, value: PutObjectTrailingChecksums) -> Self {
579        self.trailing_checksums = value;
580        self
581    }
582
583    /// Set the storage class.
584    pub fn storage_class(mut self, value: String) -> Self {
585        self.storage_class = Some(value);
586        self
587    }
588
589    /// Set server-side encryption type.
590    pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
591        self.server_side_encryption = value;
592        self
593    }
594
595    /// Set KMS key ID to be used for server-side encryption.
596    pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
597        self.ssekms_key_id = value;
598        self
599    }
600
601    /// Add a custom header to the request.
602    pub fn add_custom_header(mut self, name: String, value: String) -> Self {
603        self.custom_headers.push((name, value));
604        self
605    }
606
607    /// Set user defined object metadata.
608    pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
609        self.object_metadata = value;
610        self
611    }
612
613    /// Set an optional caller-supplied identifier passed through to the memory pool on buffer
614    /// allocations for this request. Not related to the S3 request ID returned by the service.
615    pub fn custom_id(mut self, value: Option<u64>) -> Self {
616        self.custom_id = value;
617        self
618    }
619}
620
/// How CRC32c checksums are used for parts of a multi-part PutObject request
///
/// The default is [`Disabled`](Self::Disabled).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PutObjectTrailingChecksums {
    /// Checksums are computed, passed to upload review, and also sent to S3
    Enabled,
    /// Checksums are computed, passed to upload review, but not sent to S3
    ReviewOnly,
    /// Checksums are not computed on the client side
    #[default]
    Disabled,
}
632
/// Info for the caller to review before an upload completes.
///
/// Passed to the callback given to [`PutObjectRequest::review_and_complete`].
pub type UploadReview = mountpoint_s3_crt::s3::client::UploadReview;

/// Info about a single part, for the caller to review before the upload completes.
pub type UploadReviewPart = mountpoint_s3_crt::s3::client::UploadReviewPart;

/// A checksum algorithm used by the object client for integrity checks on uploads and downloads.
///
/// See [`UploadChecksum::checksum_algorithm`] for the algorithm of an upload checksum.
pub type ChecksumAlgorithm = mountpoint_s3_crt::s3::client::ChecksumAlgorithm;
641
/// Parameters to a [`put_object_single`](ObjectClient::put_object_single) request
///
/// Use the builder-style methods on this type to set the fields.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct PutObjectSingleParams {
    /// User-provided checksum of the data to upload.
    pub checksum: Option<UploadChecksum>,
    /// Storage class to be used when creating new S3 object
    pub storage_class: Option<String>,
    /// The server-side encryption algorithm to be used for this object in Amazon S3 (for example,
    /// AES256, aws:kms, aws:kms:dsse)
    pub server_side_encryption: Option<String>,
    /// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be
    /// used to specify AWS KMS key ID to be used when creating new S3 object
    pub ssekms_key_id: Option<String>,
    /// Requires pre-existing object to match the given etag in order to perform the request
    pub if_match: Option<ETag>,
    /// Offset on the pre-existing object where to append the data in the request
    pub write_offset_bytes: Option<u64>,
    /// Custom headers to add to the request, as `(name, value)` pairs
    pub custom_headers: Vec<(String, String)>,
    /// User-defined object metadata
    pub object_metadata: ObjectMetadata,
}
664
665impl PutObjectSingleParams {
666    /// Create a default [PutObjectSingleParams].
667    pub fn new() -> Self {
668        Self::default()
669    }
670
671    /// Create a [PutObjectSingleParams] for an append request at the given offset.
672    pub fn new_for_append(offset: u64) -> Self {
673        Self::default().write_offset_bytes(offset)
674    }
675
676    /// Set checksum.
677    pub fn checksum(mut self, value: Option<UploadChecksum>) -> Self {
678        self.checksum = value;
679        self
680    }
681
682    /// Set the storage class.
683    pub fn storage_class(mut self, value: String) -> Self {
684        self.storage_class = Some(value);
685        self
686    }
687
688    /// Set server-side encryption type.
689    pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
690        self.server_side_encryption = value;
691        self
692    }
693
694    /// Set KMS key ID to be used for server-side encryption.
695    pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
696        self.ssekms_key_id = value;
697        self
698    }
699
700    /// Set the required etag on the pre-existing object.
701    pub fn if_match(mut self, value: Option<ETag>) -> Self {
702        self.if_match = value;
703        self
704    }
705
706    /// Set the offset on the pre-existing object where to append the data in the request.
707    pub fn write_offset_bytes(mut self, value: u64) -> Self {
708        self.write_offset_bytes = Some(value);
709        self
710    }
711
712    /// Add a custom header to the request.
713    pub fn add_custom_header(mut self, name: String, value: String) -> Self {
714        self.custom_headers.push((name, value));
715        self
716    }
717
718    /// Set user defined object metadata.
719    pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
720        self.object_metadata = value;
721        self
722    }
723}
724
/// A checksum used by the object client for integrity checks on uploads.
///
/// Each variant wraps the checksum value for one algorithm; see
/// [`checksum_algorithm`](Self::checksum_algorithm) for the matching [`ChecksumAlgorithm`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum UploadChecksum {
    Crc64nvme(checksums::Crc64nvme),
    Crc32c(checksums::Crc32c),
    Crc32(checksums::Crc32),
    Sha1(checksums::Sha1),
    Sha256(checksums::Sha256),
}
735
736impl UploadChecksum {
737    /// The checksum algorithm used to compute this checksum.
738    pub fn checksum_algorithm(&self) -> ChecksumAlgorithm {
739        match self {
740            UploadChecksum::Crc64nvme(_) => ChecksumAlgorithm::Crc64nvme,
741            UploadChecksum::Crc32c(_) => ChecksumAlgorithm::Crc32c,
742            UploadChecksum::Crc32(_) => ChecksumAlgorithm::Crc32,
743            UploadChecksum::Sha1(_) => ChecksumAlgorithm::Sha1,
744            UploadChecksum::Sha256(_) => ChecksumAlgorithm::Sha256,
745        }
746    }
747}
748
/// A handle for controlling backpressure enabled requests.
///
/// If the client was created with `enable_read_backpressure` set true,
/// each meta request has a flow-control window that shrinks as response
/// body data is downloaded (headers do not affect the size of the window).
/// The client's `initial_read_window` determines the starting size of each meta request's window.
/// If a meta request's flow-control window reaches 0, no further data will be downloaded.
/// If the `initial_read_window` is 0, the request will not start until the window is incremented.
/// Maintain a larger window to keep up a high download throughput:
/// parts cannot download in parallel unless the window is large enough to hold multiple parts.
/// Maintain a smaller window to limit the amount of data buffered in memory.
pub trait ClientBackpressureHandle {
    /// Increment the flow-control read window, so that response data continues downloading.
    ///
    /// The client should read data up to and including the end of the read window,
    /// even if that means fetching and returning data beyond the end of the window
    /// to fulfil a part-sized GetObject request.
    /// As an example, a call with `len = 1` could trigger an 8MiB GetObject request
    /// if the given client fetches data in requests of 8MiB ranges.
    fn increment_read_window(&mut self, len: usize);

    /// Move the upper bound of the read window to the given offset if it's not already there.
    fn ensure_read_window(&mut self, desired_end_offset: u64);

    /// Get the upper bound of the read window. When backpressure is enabled, a
    /// [`GetObjectResponse`] can return data up to this offset *exclusively*.
    fn read_window_end_offset(&self) -> u64;
}
777
/// A streaming response to a GetObject request.
///
/// Implementations of this trait implement [`futures::Stream`], which you can use to read the
/// body of the object. Each item of the stream is a part of the object body together with the
/// part's offset within the object.
#[cfg_attr(not(docsrs), async_trait)]
pub trait GetObjectResponse:
    Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
{
    /// The handle type used to control the read window of this response.
    type BackpressureHandle: ClientBackpressureHandle + Clone + Send + Sync;
    /// The client error type that stream items can fail with.
    type ClientError: std::error::Error + Send + Sync + 'static;

    /// Borrow the backpressure handle of the response.
    ///
    /// If `enable_read_backpressure` is false this call will return `None`:
    /// no backpressure is being applied and data is being downloaded as fast as possible.
    fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle>;

    /// Get the object's user defined metadata.
    fn get_object_metadata(&self) -> ObjectMetadata;

    /// Get the object's checksum, if uploaded with one.
    ///
    /// Failures are reported as an [`ObjectChecksumError`].
    fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError>;
}
802
/// Failures to return object checksum
#[derive(Debug, Error)]
pub enum ObjectChecksumError {
    /// The checksum was asked for, but the request did not specify that checksums be retrieved.
    #[error("requested object checksums, but did not specify it in the request")]
    DidNotRequestChecksums,
    /// The checksum could not be retrieved from the headers; the payload is the underlying error.
    #[error("object checksum could not be retrieved from headers")]
    HeadersError(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}
811
/// A single element of a [`get_object`](ObjectClient::get_object) response stream is a pair of
/// offset within the object and the bytes starting at that offset.
#[derive(Debug)]
pub struct GetBodyPart {
    /// Offset within the object at which `data` starts.
    pub offset: u64,
    /// The bytes of the object body starting at `offset`.
    pub data: Bytes,
}
819
/// A streaming put request which allows callers to asynchronously write the body of the request.
///
/// You can call the [`write`](Self::write) method to write data to the object, and then call
/// [`complete`](Self::complete) to complete the upload. Alternatively, you can call
/// [`review_and_complete`](Self::review_and_complete) to review the upload before completing it,
/// giving the chance to cancel the request if the upload is not as expected.
///
/// This is an async trait defined with the [async-trait](https://crates.io/crates/async-trait)
/// crate, and so implementations of this trait must use the `#[async_trait::async_trait]`
/// attribute.
#[cfg_attr(not(docsrs), async_trait)]
pub trait PutObjectRequest: Send {
    /// The client error type that operations on this request can fail with.
    type ClientError: std::error::Error + Send + Sync + 'static;

    /// Write the given slice to the put request body.
    async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError>;

    /// Complete the put request and return a [`PutObjectResult`].
    ///
    /// Consumes the request.
    async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;

    /// Review and complete the put request and return a [`PutObjectResult`].
    ///
    /// Consumes the request. `review_callback` receives the [`UploadReview`] and returns a
    /// `bool`.
    // NOTE(review): presumably returning `false` from the callback cancels the upload — confirm
    // against the implementations of this trait.
    async fn review_and_complete(
        self,
        review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
}
846
847/// Result of a [ObjectClient::put_object] request
848#[derive(Debug)]
849#[non_exhaustive]
850pub struct PutObjectResult {
851    /// ETag of the uploaded object
852    pub etag: ETag,
853    /// Server-side encryption type that was used to store new object (reported by S3)
854    pub sse_type: Option<String>,
855    /// Server-side encryption KMS key ID that was used to store new object (reported by S3)
856    pub sse_kms_key_id: Option<String>,
857}
858
859/// Errors returned by a [`put_object`](ObjectClient::put_object) request
860#[derive(Debug, Error, PartialEq, Eq)]
861#[non_exhaustive]
862pub enum PutObjectError {
863    #[error("The bucket does not exist")]
864    NoSuchBucket,
865
866    #[error("The key does not exist")]
867    NoSuchKey,
868
869    #[error("Request body cannot be empty when write offset is specified")]
870    EmptyBody,
871
872    #[error("The offset does not match the current object size")]
873    InvalidWriteOffset,
874
875    #[error("The provided checksum does not match the data")]
876    BadChecksum,
877
878    #[error("The provided checksum is not valid or does not match the existing checksum algorithm")]
879    InvalidChecksumType,
880
881    #[error("At least one of the pre-conditions you specified did not hold")]
882    PreconditionFailed,
883
884    #[error("The server does not support the functionality required to fulfill the request")]
885    NotImplemented,
886}
887
/// Restoration state of an S3 object stored in a flexible retrieval storage class.
///
/// See [Checking restore status and expiration
/// date](https://docs.aws.amazon.com/AmazonS3/latest/userguide/restoring-objects.html#restore-archived-objects-status)
/// in the *Amazon S3 User Guide* for more details.
#[derive(Debug, Clone, Copy)]
pub enum RestoreStatus {
    /// S3 has accepted a restoration request but has not completed it yet.
    /// Objects with this status are not readable.
    InProgress,

    /// Restoration is fully completed. Note that restored objects are stored only for the
    /// number of days that was specified in the request.
    Restored { expiry: SystemTime },
}
903
904/// Metadata about a single S3 object.
905///
906/// See [Object](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html) in the *Amazon S3
907/// API Reference* for more details.
908#[derive(Debug, Clone)]
909#[non_exhaustive]
910pub struct ObjectInfo {
911    /// Key for this object.
912    pub key: String,
913
914    /// Size of this object in bytes.
915    pub size: u64,
916
917    /// The time this object was last modified.
918    pub last_modified: OffsetDateTime,
919
920    /// Storage class for this object. Optional because head_object does not return
921    /// the storage class in its response for Standard objects. See examples in the [*Amazon S3 API
922    /// Reference*](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Examples)
923    pub storage_class: Option<String>,
924
925    /// Objects in flexible retrieval storage classes (such as GLACIER and DEEP_ARCHIVE) are only
926    /// accessible after restoration
927    pub restore_status: Option<RestoreStatus>,
928
929    /// Entity tag of this object.
930    pub etag: String,
931
932    /// The algorithm that was used to create a checksum of the object.
933    ///
934    /// The [Amazon S3 API Reference] specifies this field as a list of strings,
935    /// so we return here a [Vec] of [ChecksumAlgorithm].
936    ///
937    /// [Amazon S3 API Reference]:
938    ///     https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ChecksumAlgorithm
939    pub checksum_algorithms: Vec<ChecksumAlgorithm>,
940}
941
/// The object attributes that can be retrieved from [ObjectClient::get_object_attributes].
/// Attributes that are not specified are not returned.
#[derive(Debug)]
pub enum ObjectAttribute {
    /// The object's ETag
    ETag,

    /// The object's checksum
    Checksum,

    /// Metadata about the parts of a multi part object
    ObjectParts,

    /// The object's storage class
    StorageClass,

    /// The object's size
    ObjectSize,
}
961
962impl fmt::Display for ObjectAttribute {
963    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
964        let attr_name = match self {
965            ObjectAttribute::ETag => "ETag",
966            ObjectAttribute::Checksum => "Checksum",
967            ObjectAttribute::ObjectParts => "ObjectParts",
968            ObjectAttribute::StorageClass => "StorageClass",
969            ObjectAttribute::ObjectSize => "ObjectSize",
970        };
971        write!(f, "{attr_name}")
972    }
973}
974
/// Checksum metadata for an object.
///
/// See [Checksum](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Checksum.html) in the *Amazon
/// S3 API Reference* for more details.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Checksum {
    /// The object's 64-bit CRC64NVME checksum, Base64-encoded
    pub checksum_crc64nvme: Option<String>,

    /// The object's 32-bit CRC32 checksum, Base64-encoded
    pub checksum_crc32: Option<String>,

    /// The object's 32-bit CRC32C checksum, Base64-encoded
    pub checksum_crc32c: Option<String>,

    /// The object's 160-bit SHA-1 digest, Base64-encoded
    pub checksum_sha1: Option<String>,

    /// The object's 256-bit SHA-256 digest, Base64-encoded
    pub checksum_sha256: Option<String>,
}
996
997impl Checksum {
998    /// Construct an empty [Checksum]
999    pub fn empty() -> Self {
1000        Self {
1001            checksum_crc64nvme: None,
1002            checksum_crc32: None,
1003            checksum_crc32c: None,
1004            checksum_sha1: None,
1005            checksum_sha256: None,
1006        }
1007    }
1008
1009    /// Provide [ChecksumAlgorithm]s for the [Checksum], if set and recognized.
1010    pub fn algorithms(&self) -> Vec<ChecksumAlgorithm> {
1011        // We assume that at most one checksum will be set.
1012        let mut algorithms = Vec::with_capacity(1);
1013
1014        // Pattern match forces us to accomodate any new fields when added.
1015        let Self {
1016            checksum_crc64nvme,
1017            checksum_crc32,
1018            checksum_crc32c,
1019            checksum_sha1,
1020            checksum_sha256,
1021        } = &self;
1022
1023        if checksum_crc64nvme.is_some() {
1024            algorithms.push(ChecksumAlgorithm::Crc64nvme);
1025        }
1026        if checksum_crc32.is_some() {
1027            algorithms.push(ChecksumAlgorithm::Crc32);
1028        }
1029        if checksum_crc32c.is_some() {
1030            algorithms.push(ChecksumAlgorithm::Crc32c);
1031        }
1032        if checksum_sha1.is_some() {
1033            algorithms.push(ChecksumAlgorithm::Sha1);
1034        }
1035        if checksum_sha256.is_some() {
1036            algorithms.push(ChecksumAlgorithm::Sha256);
1037        }
1038
1039        algorithms
1040    }
1041}
1042
1043impl From<Option<UploadChecksum>> for Checksum {
1044    fn from(value: Option<UploadChecksum>) -> Self {
1045        let mut checksum = Checksum::empty();
1046        match value.as_ref() {
1047            Some(UploadChecksum::Crc64nvme(crc64)) => checksum.checksum_crc64nvme = Some(crc64nvme_to_base64(crc64)),
1048            Some(UploadChecksum::Crc32c(crc32c)) => checksum.checksum_crc32c = Some(crc32c_to_base64(crc32c)),
1049            Some(UploadChecksum::Crc32(crc32)) => checksum.checksum_crc32 = Some(crc32_to_base64(crc32)),
1050            Some(UploadChecksum::Sha1(sha1)) => checksum.checksum_sha1 = Some(sha1_to_base64(sha1)),
1051            Some(UploadChecksum::Sha256(sha256)) => checksum.checksum_sha256 = Some(sha256_to_base64(sha256)),
1052            None => {}
1053        };
1054        checksum
1055    }
1056}
1057
1058/// Metadata about object parts from GetObjectAttributes API.
1059///
1060/// See [GetObjectAttributesParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAttributesParts.html)
1061/// in the *Amazon S3 API Reference* for more details.
1062#[derive(Debug)]
1063pub struct GetObjectAttributesParts {
1064    /// Indicates whether the returned list of parts is truncated
1065    pub is_truncated: Option<bool>,
1066
1067    /// Maximum number of parts allowed in the response
1068    pub max_parts: Option<usize>,
1069
1070    /// When a list is truncated, this element specifies the next marker
1071    pub next_part_number_marker: Option<usize>,
1072
1073    /// The marker for the current part
1074    pub part_number_marker: Option<usize>,
1075
1076    /// Array of metadata for particular parts
1077    pub parts: Option<Vec<ObjectPart>>,
1078
1079    /// Total number of parts
1080    pub total_parts_count: Option<usize>,
1081}
1082
1083/// Metadata for an individual object part.
1084///
1085/// See [ObjectPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ObjectPart.html) in the
1086/// *Amazon S3 API Reference* for more details.
1087#[derive(Debug)]
1088pub struct ObjectPart {
1089    /// Checksum of the object
1090    pub checksum: Option<Checksum>,
1091
1092    /// Number of the part, this value is a positive integer between 1 and 10,000
1093    pub part_number: usize,
1094
1095    /// Size of the part in bytes
1096    pub size: usize,
1097}
1098
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a [Checksum] in which only the CRC32C and SHA-1 values may be populated.
    ///
    /// Every field is listed explicitly so this fixture must be revisited when a field is added.
    fn checksum_with(crc32c: Option<&str>, sha1: Option<&str>) -> Checksum {
        Checksum {
            checksum_crc64nvme: None,
            checksum_crc32: None,
            checksum_crc32c: crc32c.map(String::from),
            checksum_sha1: sha1.map(String::from),
            checksum_sha256: None,
        }
    }

    #[test]
    fn test_checksum_algorithm_one_set() {
        let checksum = checksum_with(None, Some("checksum_sha1"));
        assert_eq!(checksum.algorithms(), vec![ChecksumAlgorithm::Sha1]);
    }

    #[test]
    fn test_checksum_algorithm_none_set() {
        let checksum = checksum_with(None, None);
        assert_eq!(checksum.algorithms(), vec![]);
    }

    #[test]
    fn test_checksum_algorithm_many_set() {
        // Amazon S3 does not support more than one algorithm today; this only demonstrates that
        // several populated values are handled without panicking.
        let checksum = checksum_with(Some("checksum_crc32c"), Some("checksum_sha1"));
        let expected = vec![ChecksumAlgorithm::Crc32c, ChecksumAlgorithm::Sha1];
        assert_eq!(checksum.algorithms(), expected);
    }
}