mountpoint_s3_client/
object_client.rs

1use std::fmt::{self, Debug};
2use std::ops::Range;
3use std::time::SystemTime;
4
5use async_trait::async_trait;
6use auto_impl::auto_impl;
7use bytes::Bytes;
8use futures::Stream;
9use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
10use std::collections::HashMap;
11use thiserror::Error;
12use time::OffsetDateTime;
13
14use crate::checksums::{
15    self, crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64,
16};
17use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
18
19mod etag;
20pub use etag::ETag;
21
22/// A generic interface to S3-like object storage services.
23///
24/// This trait defines the common methods that all object services implement.
25///
26/// This is an async trait defined with the [async-trait](https://crates.io/crates/async-trait)
27/// crate, and so implementations of this trait must use the `#[async_trait::async_trait]`
28/// attribute.
29#[cfg_attr(not(docsrs), async_trait)]
30#[auto_impl(Arc)]
31pub trait ObjectClient {
32    type GetObjectResponse: GetObjectResponse<ClientError = Self::ClientError>;
33    type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
34    type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;
35
36    /// Query the part size this client uses for GET operations to the object store.
37    fn read_part_size(&self) -> usize;
38
39    /// Query the part size this client uses for PUT operations to the object store.
40    fn write_part_size(&self) -> usize;
41
42    /// Query the initial read window size this client uses for backpressure GetObject requests.
43    /// This can be `None` if backpressure is disabled.
44    fn initial_read_window_size(&self) -> Option<usize>;
45
46    /// Query current memory usage stats for the client. This can be `None` if the client
47    /// does not record the stats.
48    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats>;
49
50    /// Delete a single object from the object store.
51    ///
52    /// DeleteObject will succeed even if the object within the bucket does not exist.
53    async fn delete_object(
54        &self,
55        bucket: &str,
56        key: &str,
57    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError>;
58
59    /// Create a copy of an existing object. Currently, this functionality has the following limitations:
60    /// - Supported only for copying between matching bucket types:
61    ///     - Standard S3 to Standard S3 buckets.
62    ///     - S3 Express to S3 Express buckets.
63    /// - Host header must use virtual host addressing style (path style is not supported) and both source and dest buckets must have dns compliant name.
64    /// - Only {bucket}/{key} format is supported for source and passing arn as source will not work.
65    /// - Source bucket is assumed to be in the same region as destination bucket.
66    async fn copy_object(
67        &self,
68        source_bucket: &str,
69        source_key: &str,
70        destination_bucket: &str,
71        destination_key: &str,
72        params: &CopyObjectParams,
73    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError>;
74
75    /// Get an object from the object store. Returns a stream of body parts of the object. Parts are
76    /// guaranteed to be returned by the stream in order and contiguously.
77    async fn get_object(
78        &self,
79        bucket: &str,
80        key: &str,
81        params: &GetObjectParams,
82    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError>;
83
84    /// List the objects in a bucket under a given prefix
85    async fn list_objects(
86        &self,
87        bucket: &str,
88        continuation_token: Option<&str>,
89        delimiter: &str,
90        max_keys: usize,
91        prefix: &str,
92    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError>;
93
94    /// Retrieve object metadata without retrieving the object contents
95    async fn head_object(
96        &self,
97        bucket: &str,
98        key: &str,
99        params: &HeadObjectParams,
100    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError>;
101
102    /// Put an object into the object store. Returns a [PutObjectRequest] for callers
103    /// to provide the content of the object.
104    async fn put_object(
105        &self,
106        bucket: &str,
107        key: &str,
108        params: &PutObjectParams,
109    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError>;
110
111    /// Put an object into the object store.
112    async fn put_object_single<'a>(
113        &self,
114        bucket: &str,
115        key: &str,
116        params: &PutObjectSingleParams,
117        contents: impl AsRef<[u8]> + Send + 'a,
118    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
119
120    /// Retrieves all the metadata from an object without returning the object contents.
121    async fn get_object_attributes(
122        &self,
123        bucket: &str,
124        key: &str,
125        max_parts: Option<usize>,
126        part_number_marker: Option<usize>,
127        object_attributes: &[ObjectAttribute],
128    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError>;
129
130    /// Rename an object from some source key to a destination key within the same bucket.
131    async fn rename_object(
132        &self,
133        bucket: &str,
134        src_key: &str,
135        dest_key: &str,
136        params: &RenameObjectParams,
137    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError>;
138}
139
140/// The top-level error type returned by calls to an [`ObjectClient`].
141///
142/// Errors that are explicitly modeled on a per-request-type basis are [`ServiceError`]s. Other
143/// generic or unhandled errors are [`ClientError`]s.
144///
145/// The distinction between these two types of error can sometimes be blurry. As a rough heuristic,
146/// [`ServiceError`]s are those that *any reasonable implementation* of an object client would be
147/// capable of experiencing, and [`ClientError`]s are anything else. For example, any object client
148/// could experience a "no such key" error, but only object clients that implement a permissions
149/// system could experience "permission denied" errors. When in doubt, we err towards *not* adding
150/// new [`ServiceError`]s, as they are public API for *every* object client.
151///
152/// [`ServiceError`]: ObjectClientError::ServiceError
153/// [`ClientError`]: ObjectClientError::ClientError
154#[derive(Debug, Error, PartialEq)]
155pub enum ObjectClientError<S, C> {
156    /// An error returned by the service itself
157    #[error("Service error")]
158    ServiceError(#[source] S),
159
160    /// An error within the object client (for example, an unexpected response, or a failure to
161    /// construct the request).
162    #[error("Client error")]
163    ClientError(#[from] C),
164}
165
166impl<S, C> ProvideErrorMetadata for ObjectClientError<S, C>
167where
168    S: ProvideErrorMetadata,
169    C: ProvideErrorMetadata,
170{
171    fn meta(&self) -> ClientErrorMetadata {
172        match self {
173            Self::ClientError(err) => err.meta(),
174            Self::ServiceError(err) => err.meta(),
175        }
176    }
177}
178
179impl ProvideErrorMetadata for GetObjectError {
180    fn meta(&self) -> ClientErrorMetadata {
181        match self {
182            GetObjectError::NoSuchBucket(client_error_metadata) => client_error_metadata.clone(),
183            GetObjectError::NoSuchKey(client_error_metadata) => client_error_metadata.clone(),
184            GetObjectError::PreconditionFailed(client_error_metadata) => client_error_metadata.clone(),
185        }
186    }
187}
188
189impl ProvideErrorMetadata for ListObjectsError {
190    fn meta(&self) -> ClientErrorMetadata {
191        Default::default()
192    }
193}
194
195impl ProvideErrorMetadata for HeadObjectError {
196    fn meta(&self) -> ClientErrorMetadata {
197        Default::default()
198    }
199}
200
201impl ProvideErrorMetadata for DeleteObjectError {
202    fn meta(&self) -> ClientErrorMetadata {
203        Default::default()
204    }
205}
206
207impl ProvideErrorMetadata for RenameObjectError {
208    fn meta(&self) -> ClientErrorMetadata {
209        Default::default()
210    }
211}
212
213/// Shorthand type for the result of an object client request
214pub type ObjectClientResult<T, S, C> = Result<T, ObjectClientError<S, C>>;
215
216/// Errors returned by a [`get_object`](ObjectClient::get_object) request
217#[derive(Debug, Error, PartialEq, Eq)]
218#[non_exhaustive]
219pub enum GetObjectError {
220    #[error("The bucket does not exist")]
221    NoSuchBucket(ClientErrorMetadata),
222
223    #[error("The key does not exist")]
224    NoSuchKey(ClientErrorMetadata),
225
226    #[error("At least one of the preconditions specified did not hold")]
227    PreconditionFailed(ClientErrorMetadata),
228}
229
230/// Parameters to a [`get_object`](ObjectClient::get_object) request
231#[derive(Debug, Default, Clone)]
232#[non_exhaustive]
233pub struct GetObjectParams {
234    pub range: Option<Range<u64>>,
235    pub if_match: Option<ETag>,
236    pub checksum_mode: Option<ChecksumMode>,
237}
238
239impl GetObjectParams {
240    /// Create a default [GetObjectParams].
241    pub fn new() -> Self {
242        Self::default()
243    }
244
245    /// Set the range retrieved by the GetObject request
246    pub fn range(mut self, value: Option<Range<u64>>) -> Self {
247        self.range = value;
248        self
249    }
250
251    /// Set the required etag on the object
252    pub fn if_match(mut self, value: Option<ETag>) -> Self {
253        self.if_match = value;
254        self
255    }
256
257    /// Set option to retrieve checksum as part of the GetObject request
258    pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
259        self.checksum_mode = value;
260        self
261    }
262}
263
264/// Result of a [`list_objects`](ObjectClient::list_objects) request
265#[derive(Debug)]
266#[non_exhaustive]
267pub struct ListObjectsResult {
268    /// The list of objects.
269    pub objects: Vec<ObjectInfo>,
270
271    /// The list of common prefixes. This rolls up all of the objects with a common prefix up to
272    /// the next instance of the delimiter.
273    pub common_prefixes: Vec<String>,
274
275    /// If present, the continuation token to use to query more results.
276    pub next_continuation_token: Option<String>,
277}
278
279/// Errors returned by a [`list_objects`](ObjectClient::list_objects) request
280#[derive(Debug, Error, PartialEq, Eq)]
281#[non_exhaustive]
282pub enum ListObjectsError {
283    #[error("The bucket does not exist")]
284    NoSuchBucket,
285}
286
287/// Parameters to a [`head_object`](ObjectClient::head_object) request
288#[derive(Debug, Default, Clone)]
289#[non_exhaustive]
290pub struct HeadObjectParams {
291    /// Enable to retrieve checksum as part of the HeadObject request
292    pub checksum_mode: Option<ChecksumMode>,
293}
294
295impl HeadObjectParams {
296    /// Create a default [HeadObjectParams].
297    pub fn new() -> Self {
298        Self::default()
299    }
300
301    /// Set option to retrieve checksum as part of the HeadObject request
302    pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
303        self.checksum_mode = value;
304        self
305    }
306}
307
/// Whether object checksums should be retrieved with a request.
///
/// Pass `Some(ChecksumMode::Enabled)` in the request parameters to retrieve checksums.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality is total.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ChecksumMode {
    /// Retrieve checksums
    Enabled,
}
315
316/// Result of a [`head_object`](ObjectClient::head_object) request
317#[derive(Debug)]
318#[non_exhaustive]
319pub struct HeadObjectResult {
320    /// Size of the object in bytes.
321    ///
322    /// Refers to the `Content-Length` HTTP header for HeadObject.
323    pub size: u64,
324
325    /// The time this object was last modified.
326    pub last_modified: OffsetDateTime,
327
328    /// Entity tag of this object.
329    pub etag: ETag,
330
331    /// Storage class for this object.
332    ///
333    /// The value is optional because HeadObject does not return the storage class in its response
334    /// for objects in the S3 Standard storage class.
335    /// See examples in the
336    /// [Amazon S3 API Reference](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Examples).
337    pub storage_class: Option<String>,
338
339    /// Objects in flexible retrieval storage classes (such as GLACIER and DEEP_ARCHIVE) are only
340    /// accessible after restoration
341    pub restore_status: Option<RestoreStatus>,
342    /// Checksum of the object.
343    ///
344    /// HeadObject must explicitly request for this field to be included,
345    /// otherwise the values will be empty.
346    pub checksum: Checksum,
347
348    /// Server-side encryption type that was used to store the object.
349    pub sse_type: Option<String>,
350
351    /// Server-side encryption KMS key ID that was used to store the object.
352    pub sse_kms_key_id: Option<String>,
353}
354
355/// Errors returned by a [`head_object`](ObjectClient::head_object) request
356#[derive(Debug, Error, PartialEq, Eq)]
357#[non_exhaustive]
358pub enum HeadObjectError {
359    /// Note that HeadObject cannot distinguish between NoSuchBucket and NoSuchKey errors
360    #[error("The object was not found")]
361    NotFound,
362}
363
/// Outcome of a [`delete_object`](ObjectClient::delete_object) request.
///
/// Note: a DeleteObject request for an object that does not exist within a bucket still counts
/// as a success.
// TODO: Populate this struct with return fields from the S3 API, e.g., version id, delete marker.
#[non_exhaustive]
#[derive(Debug)]
pub struct DeleteObjectResult {}
371
372/// Errors returned by a [`delete_object`](ObjectClient::delete_object) request
373#[derive(Debug, Error, PartialEq, Eq)]
374#[non_exhaustive]
375pub enum DeleteObjectError {
376    #[error("The bucket does not exist")]
377    NoSuchBucket,
378}
379
/// Outcome of a [`copy_object`](ObjectClient::copy_object) request.
///
/// Currently carries no fields.
#[non_exhaustive]
#[derive(Debug)]
pub struct CopyObjectResult {
    // TODO: Populate this struct with return fields from the S3 API, e.g., etag.
}
386
387/// Errors returned by a [`copy_object`](ObjectClient::copy_object) request
388#[derive(Debug, Error, PartialEq, Eq)]
389#[non_exhaustive]
390pub enum CopyObjectError {
391    /// Note that CopyObject cannot distinguish between NoSuchBucket and NoSuchKey errors
392    #[error("The object was not found")]
393    NotFound,
394
395    #[error("The source object of the COPY action is not in the active tier and is only stored in Amazon S3 Glacier.")]
396    ObjectNotInActiveTierError,
397}
398
/// Optional settings for a [`copy_object`](ObjectClient::copy_object) request.
///
/// Currently carries no fields.
#[non_exhaustive]
#[derive(Debug, Default, Clone)]
pub struct CopyObjectParams {
    // TODO: Populate this struct with fields as and when required to satisfy various use cases.
}
405
406impl CopyObjectParams {
407    /// Create a default [CopyObjectParams].
408    pub fn new() -> Self {
409        Self::default()
410    }
411}
412
413/// Result of a [`get_object_attributes`](ObjectClient::get_object_attributes) request
414#[derive(Debug, Default)]
415pub struct GetObjectAttributesResult {
416    /// ETag of the object
417    pub etag: Option<String>,
418
419    /// Checksum of the object
420    pub checksum: Option<Checksum>,
421
422    /// Object parts metadata for multi part object
423    pub object_parts: Option<GetObjectAttributesParts>,
424
425    /// Storage class of the object
426    pub storage_class: Option<String>,
427
428    /// Object size
429    pub object_size: Option<u64>,
430}
431
432/// Errors returned by a [`get_object_attributes`](ObjectClient::get_object_attributes) request
433#[derive(Debug, Error, PartialEq, Eq)]
434#[non_exhaustive]
435pub enum GetObjectAttributesError {
436    #[error("The bucket does not exist")]
437    NoSuchBucket,
438
439    #[error("The key does not exist")]
440    NoSuchKey,
441}
442
443/// Parameters to a [`rename_object`](ObjectClient::rename_object) request
444#[derive(Debug, Default, Clone)]
445#[non_exhaustive]
446pub struct RenameObjectParams {
447    /// Can be set to * to disable overwrite
448    pub if_none_match: Option<String>,
449    /// Optional ETag that the destination must match
450    pub if_match: Option<ETag>,
451    /// Optional ETag that the source must match
452    pub if_source_match: Option<ETag>,
453    /// Idempotency token
454    pub client_token: Option<String>,
455    /// Can be used for custom headers
456    pub custom_headers: Vec<(String, String)>,
457}
458
459impl RenameObjectParams {
460    /// Create a default [RenameObjectParams].
461    pub fn new() -> Self {
462        Self::default()
463    }
464
465    /// Set if-none-match header
466    pub fn if_none_match(mut self, value: Option<String>) -> Self {
467        self.if_none_match = value;
468        self
469    }
470
471    /// Set if-match header
472    pub fn if_match(mut self, value: Option<ETag>) -> Self {
473        self.if_match = value;
474        self
475    }
476
477    /// Set if-source-match header
478    pub fn if_source_match(mut self, value: Option<ETag>) -> Self {
479        self.if_source_match = value;
480        self
481    }
482
483    /// Set idempotency token
484    pub fn client_token(mut self, value: Option<String>) -> Self {
485        self.client_token = value;
486        self
487    }
488
489    /// Set custom headers
490    pub fn custom_headers(mut self, value: Vec<(String, String)>) -> Self {
491        self.custom_headers = value;
492        self
493    }
494}
495
/// Outcome of a [`rename_object`](ObjectClient::rename_object) request.
///
/// Currently carries no fields.
#[non_exhaustive]
#[derive(Debug, PartialEq, Eq)]
pub struct RenameObjectResult {}
500
/// Kind of precondition reported by [`RenameObjectError::PreConditionFailed`].
// NOTE(review): `IfMatch` / `IfNoneMatch` presumably correspond to the `if_match` and
// `if_none_match` fields of `RenameObjectParams`, and `Other` to anything else — confirm
// against the code that constructs these values.
#[non_exhaustive]
#[derive(Debug, PartialEq, Eq)]
pub enum RenamePreconditionTypes {
    IfMatch,
    IfNoneMatch,
    Other,
}
508
509/// Errors returned by a [`rename_object`](ObjectClient::rename_object) request
510#[derive(Debug, Error, PartialEq, Eq)]
511#[non_exhaustive]
512pub enum RenameObjectError {
513    #[error("The bucket does not exist")]
514    NoSuchBucket,
515    #[error("The destination key provided is too long")]
516    KeyTooLong,
517    #[error("The key was not found")]
518    KeyNotFound,
519    #[error("A Precondition")]
520    PreConditionFailed(RenamePreconditionTypes),
521    #[error("The service does not implement rename")]
522    NotImplementedError,
523    #[error("The service returned an Internal Error")]
524    InternalError,
525    #[error("You do not have access to this resource")]
526    AccessDenied,
527    #[error("Bad Request")]
528    BadRequest,
529}
530
/// String key/value map used for user-defined object metadata.
pub type ObjectMetadata = HashMap<String, String>;
532
533/// Parameters to a [`put_object`](ObjectClient::put_object) request
534#[derive(Debug, Default, Clone)]
535#[non_exhaustive]
536pub struct PutObjectParams {
537    /// Enable Crc32c trailing checksums.
538    pub trailing_checksums: PutObjectTrailingChecksums,
539    /// Storage class to be used when creating new S3 object
540    pub storage_class: Option<String>,
541    /// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse)
542    pub server_side_encryption: Option<String>,
543    /// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be used to specify AWS KMS key ID to be used
544    /// when creating new S3 object
545    pub ssekms_key_id: Option<String>,
546    /// Custom headers to add to the request
547    pub custom_headers: Vec<(String, String)>,
548    /// User-defined object metadata
549    pub object_metadata: ObjectMetadata,
550}
551
552impl PutObjectParams {
553    /// Create a default [PutObjectParams].
554    pub fn new() -> Self {
555        Self::default()
556    }
557
558    /// Set Crc32c trailing checksums.
559    pub fn trailing_checksums(mut self, value: PutObjectTrailingChecksums) -> Self {
560        self.trailing_checksums = value;
561        self
562    }
563
564    /// Set the storage class.
565    pub fn storage_class(mut self, value: String) -> Self {
566        self.storage_class = Some(value);
567        self
568    }
569
570    /// Set server-side encryption type.
571    pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
572        self.server_side_encryption = value;
573        self
574    }
575
576    /// Set KMS key ID to be used for server-side encryption.
577    pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
578        self.ssekms_key_id = value;
579        self
580    }
581
582    /// Add a custom header to the request.
583    pub fn add_custom_header(mut self, name: String, value: String) -> Self {
584        self.custom_headers.push((name, value));
585        self
586    }
587
588    /// Set user defined object metadata.
589    pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
590        self.object_metadata = value;
591        self
592    }
593}
594
/// Selects how CRC32c checksums are handled for the parts of a multi-part PutObject request.
///
/// The default is [`Disabled`](Self::Disabled).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PutObjectTrailingChecksums {
    /// Compute checksums, pass them to upload review, and send them to S3 as well
    Enabled,
    /// Compute checksums and pass them to upload review, but do not send them to S3
    ReviewOnly,
    /// Do not compute checksums on the client side
    #[default]
    Disabled,
}
606
607/// Info for the caller to review before an upload completes.
608pub type UploadReview = mountpoint_s3_crt::s3::client::UploadReview;
609
610/// Info about a single part, for the caller to review before the upload completes.
611pub type UploadReviewPart = mountpoint_s3_crt::s3::client::UploadReviewPart;
612
613/// A checksum algorithm used by the object client for integrity checks on uploads and downloads.
614pub type ChecksumAlgorithm = mountpoint_s3_crt::s3::client::ChecksumAlgorithm;
615
616/// Parameters to a [`put_object_single`](ObjectClient::put_object_single) request
617#[derive(Debug, Default, Clone)]
618#[non_exhaustive]
619pub struct PutObjectSingleParams {
620    /// User-provided checksum of the data to upload.
621    pub checksum: Option<UploadChecksum>,
622    /// Storage class to be used when creating new S3 object
623    pub storage_class: Option<String>,
624    /// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse)
625    pub server_side_encryption: Option<String>,
626    /// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be used to specify AWS KMS key ID to be used
627    /// when creating new S3 object
628    pub ssekms_key_id: Option<String>,
629    /// Requires pre-existing object to match the given etag in order to perform the request
630    pub if_match: Option<ETag>,
631    /// Offset on the pre-existing object where to append the data in the request
632    pub write_offset_bytes: Option<u64>,
633    /// Custom headers to add to the request
634    pub custom_headers: Vec<(String, String)>,
635    /// User-defined object metadata
636    pub object_metadata: ObjectMetadata,
637}
638
639impl PutObjectSingleParams {
640    /// Create a default [PutObjectSingleParams].
641    pub fn new() -> Self {
642        Self::default()
643    }
644
645    /// Create a [PutObjectSingleParams] for an append request at the given offset.
646    pub fn new_for_append(offset: u64) -> Self {
647        Self::default().write_offset_bytes(offset)
648    }
649
650    /// Set checksum.
651    pub fn checksum(mut self, value: Option<UploadChecksum>) -> Self {
652        self.checksum = value;
653        self
654    }
655
656    /// Set the storage class.
657    pub fn storage_class(mut self, value: String) -> Self {
658        self.storage_class = Some(value);
659        self
660    }
661
662    /// Set server-side encryption type.
663    pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
664        self.server_side_encryption = value;
665        self
666    }
667
668    /// Set KMS key ID to be used for server-side encryption.
669    pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
670        self.ssekms_key_id = value;
671        self
672    }
673
674    /// Set the required etag on the pre-existing object.
675    pub fn if_match(mut self, value: Option<ETag>) -> Self {
676        self.if_match = value;
677        self
678    }
679
680    /// Set the offset on the pre-existing object where to append the data in the request.
681    pub fn write_offset_bytes(mut self, value: u64) -> Self {
682        self.write_offset_bytes = Some(value);
683        self
684    }
685
686    /// Add a custom header to the request.
687    pub fn add_custom_header(mut self, name: String, value: String) -> Self {
688        self.custom_headers.push((name, value));
689        self
690    }
691
692    /// Set user defined object metadata.
693    pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
694        self.object_metadata = value;
695        self
696    }
697}
698
699/// A checksum used by the object client for integrity checks on uploads.
700#[derive(Debug, Clone)]
701#[non_exhaustive]
702pub enum UploadChecksum {
703    Crc64nvme(checksums::Crc64nvme),
704    Crc32c(checksums::Crc32c),
705    Crc32(checksums::Crc32),
706    Sha1(checksums::Sha1),
707    Sha256(checksums::Sha256),
708}
709
710impl UploadChecksum {
711    /// The checksum algorithm used to compute this checksum.
712    pub fn checksum_algorithm(&self) -> ChecksumAlgorithm {
713        match self {
714            UploadChecksum::Crc64nvme(_) => ChecksumAlgorithm::Crc64nvme,
715            UploadChecksum::Crc32c(_) => ChecksumAlgorithm::Crc32c,
716            UploadChecksum::Crc32(_) => ChecksumAlgorithm::Crc32,
717            UploadChecksum::Sha1(_) => ChecksumAlgorithm::Sha1,
718            UploadChecksum::Sha256(_) => ChecksumAlgorithm::Sha256,
719        }
720    }
721}
722
/// Handle used to control requests that have backpressure enabled.
///
/// When the client is created with `enable_read_backpressure` set to true, every meta request
/// gets a flow-control window. The window shrinks as response body data is downloaded (headers
/// do not count against it), and its starting size is the client's `initial_read_window`.
/// Once a meta request's window reaches 0, no further data is downloaded; if the
/// `initial_read_window` is 0, the request does not start until the window is incremented.
///
/// Keep the window larger to sustain high download throughput: parts cannot download in
/// parallel unless the window is large enough to hold multiple parts.
/// Keep the window smaller to limit how much data is buffered in memory.
pub trait ClientBackpressureHandle {
    /// Grow the flow-control read window by `len` so that response data keeps downloading.
    ///
    /// The client should read data up to and including the end of the read window, even when
    /// that requires fetching and returning data beyond the end of the window in order to
    /// fulfil a part-sized GetObject request.
    /// For example, a call with `len = 1` could trigger an 8MiB GetObject request if the given
    /// client fetches data in requests of 8MiB ranges.
    fn increment_read_window(&mut self, len: usize);

    /// Move the upper bound of the read window to `desired_end_offset`, unless it is already there.
    fn ensure_read_window(&mut self, desired_end_offset: u64);

    /// Upper bound of the read window. When backpressure is enabled, a [`GetObjectResponse`] can
    /// return data up to this offset *exclusively*.
    fn read_window_end_offset(&self) -> u64;
}
751
752/// A streaming response to a GetObject request.
753///
754/// This struct implements [`futures::Stream`], which you can use to read the body of the object.
755/// Each item of the stream is a part of the object body together with the part's offset within the
756/// object.
757#[cfg_attr(not(docsrs), async_trait)]
758pub trait GetObjectResponse:
759    Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
760{
761    type BackpressureHandle: ClientBackpressureHandle + Clone + Send + Sync;
762    type ClientError: std::error::Error + Send + Sync + 'static;
763
764    /// Take the backpressure handle from the response.
765    ///
766    /// If `enable_read_backpressure` is false this call will return `None`,
767    /// no backpressure is being applied and data is being downloaded as fast as possible.
768    fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle>;
769
770    /// Get the object's user defined metadata.
771    fn get_object_metadata(&self) -> ObjectMetadata;
772
773    /// Get the object's checksum, if uploaded with one
774    fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError>;
775}
776
777/// Failures to return object checksum
778#[derive(Debug, Error)]
779pub enum ObjectChecksumError {
780    #[error("requested object checksums, but did not specify it in the request")]
781    DidNotRequestChecksums,
782    #[error("object checksum could not be retrieved from headers")]
783    HeadersError(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
784}
785
786/// A single element of a [`get_object`](ObjectClient::get_object) response stream is a pair of
787/// offset within the object and the bytes starting at that offset.
788#[derive(Debug)]
789pub struct GetBodyPart {
790    pub offset: u64,
791    pub data: Bytes,
792}
793
794/// A streaming put request which allows callers to asynchronously write the body of the request.
795///
796/// You can call the [`write`](Self::write) method to write data to the object, and then call
797/// [`complete`](Self::complete) to complete the upload. Alternatively, you can call
798/// [`review_and_complete`](Self::review_and_complete) to review the upload before completing it,
799/// giving the chance to cancel the request if the upload is not as expected.
800///
801/// This is an async trait defined with the [async-trait](https://crates.io/crates/async-trait)
802/// crate, and so implementations of this trait must use the `#[async_trait::async_trait]`
803/// attribute.
804#[cfg_attr(not(docsrs), async_trait)]
805pub trait PutObjectRequest: Send {
806    type ClientError: std::error::Error + Send + Sync + 'static;
807
808    /// Write the given slice to the put request body.
809    async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError>;
810
811    /// Complete the put request and return a [`PutObjectResult`].
812    async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
813
814    /// Review and complete the put request and return a [`PutObjectResult`].
815    async fn review_and_complete(
816        self,
817        review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
818    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
819}
820
/// Result of a [`put_object`](ObjectClient::put_object) request.
///
/// This is the success value returned by [`PutObjectRequest::complete`] and
/// [`PutObjectRequest::review_and_complete`].
#[derive(Debug)]
#[non_exhaustive]
pub struct PutObjectResult {
    /// ETag of the uploaded object
    pub etag: ETag,
    /// Server-side encryption type that was used to store new object (reported by S3)
    pub sse_type: Option<String>,
    /// Server-side encryption KMS key ID that was used to store new object (reported by S3)
    pub sse_kms_key_id: Option<String>,
}
832
/// Errors returned by a [`put_object`](ObjectClient::put_object) request
///
/// The enum is `#[non_exhaustive]`, so code matching on it from outside this crate needs a
/// wildcard arm. It derives `PartialEq`/`Eq`, so values can be compared directly.
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum PutObjectError {
    /// The bucket does not exist.
    #[error("The bucket does not exist")]
    NoSuchBucket,

    /// The key does not exist.
    #[error("The key does not exist")]
    NoSuchKey,

    /// A write offset was specified but the request body was empty.
    #[error("Request body cannot be empty when write offset is specified")]
    EmptyBody,

    /// The write offset does not match the current size of the object.
    #[error("The offset does not match the current object size")]
    InvalidWriteOffset,

    /// The checksum provided with the request does not match the data.
    #[error("The provided checksum does not match the data")]
    BadChecksum,

    /// The provided checksum is not valid, or does not match the existing checksum algorithm.
    #[error("The provided checksum is not valid or does not match the existing checksum algorithm")]
    InvalidChecksumType,

    /// At least one of the pre-conditions specified with the request did not hold.
    #[error("At least one of the pre-conditions you specified did not hold")]
    PreconditionFailed,

    /// The server does not support the functionality required to fulfill the request.
    #[error("The server does not support the functionality required to fulfill the request")]
    NotImplemented,
}
861
/// Restoration status for S3 objects in flexible retrieval storage classes.
///
/// See [Checking restore status and expiration
/// date](https://docs.aws.amazon.com/AmazonS3/latest/userguide/restoring-objects.html#restore-archived-objects-status)
/// in the *Amazon S3 User Guide* for more details.
#[derive(Debug, Clone, Copy)]
pub enum RestoreStatus {
    /// S3 returns this status after it has accepted a restoration request but has not completed
    /// it yet. Objects with this status are not readable.
    InProgress,

    /// This status means that restoration is fully completed. Note that restored objects are stored only
    /// for the number of days that was specified in the request.
    ///
    /// NOTE(review): `expiry` is presumably the time at which the restored copy stops being
    /// available — confirm against the code that constructs this variant.
    Restored { expiry: SystemTime },
}
877
/// Metadata about a single S3 object.
///
/// See [Object](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html) in the *Amazon S3
/// API Reference* for more details.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ObjectInfo {
    /// Key for this object.
    pub key: String,

    /// Size of this object in bytes.
    pub size: u64,

    /// The time this object was last modified.
    pub last_modified: OffsetDateTime,

    /// Storage class for this object. Optional because head_object does not return
    /// the storage class in its response for Standard objects. See examples in the [*Amazon S3 API
    /// Reference*](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Examples).
    pub storage_class: Option<String>,

    /// Restoration status of this object, see [RestoreStatus].
    ///
    /// Objects in flexible retrieval storage classes (such as GLACIER and DEEP_ARCHIVE) are only
    /// accessible after restoration.
    pub restore_status: Option<RestoreStatus>,

    /// Entity tag of this object.
    pub etag: String,

    /// The algorithms that were used to create a checksum of the object.
    ///
    /// The [Amazon S3 API Reference] specifies this field as a list of strings,
    /// so we return here a [Vec] of [ChecksumAlgorithm].
    ///
    /// [Amazon S3 API Reference]:
    ///     https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ChecksumAlgorithm
    pub checksum_algorithms: Vec<ChecksumAlgorithm>,
}
915
/// The object attributes that can be requested from [ObjectClient::get_object_attributes].
/// Attributes that are not requested are not returned.
#[derive(Debug)]
pub enum ObjectAttribute {
    /// The object's ETag
    ETag,

    /// The object's checksum
    Checksum,

    /// Metadata about the parts of a multi-part object
    ObjectParts,

    /// The object's storage class
    StorageClass,

    /// The object's size
    ObjectSize,
}

/// Formats the attribute as its variant name (`"ETag"`, `"Checksum"`, `"ObjectParts"`,
/// `"StorageClass"` or `"ObjectSize"`).
impl fmt::Display for ObjectAttribute {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The name is written verbatim; width/fill flags of the outer formatter are not applied.
        f.write_str(match self {
            Self::ETag => "ETag",
            Self::Checksum => "Checksum",
            Self::ObjectParts => "ObjectParts",
            Self::StorageClass => "StorageClass",
            Self::ObjectSize => "ObjectSize",
        })
    }
}
948
/// Metadata about object checksum.
///
/// Every field is optional: [`Checksum::empty`] builds a value with none of them set, and
/// [`Checksum::algorithms`] reports which ones are set.
///
/// See [Checksum](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Checksum.html) in the *Amazon
/// S3 API Reference* for more details.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Checksum {
    /// Base64-encoded, 64-bit CRC64NVME checksum of the object
    pub checksum_crc64nvme: Option<String>,

    /// Base64-encoded, 32-bit CRC32 checksum of the object
    pub checksum_crc32: Option<String>,

    /// Base64-encoded, 32-bit CRC32C checksum of the object
    pub checksum_crc32c: Option<String>,

    /// Base64-encoded, 160-bit SHA-1 digest of the object
    pub checksum_sha1: Option<String>,

    /// Base64-encoded, 256-bit SHA-256 digest of the object
    pub checksum_sha256: Option<String>,
}
970
971impl Checksum {
972    /// Construct an empty [Checksum]
973    pub fn empty() -> Self {
974        Self {
975            checksum_crc64nvme: None,
976            checksum_crc32: None,
977            checksum_crc32c: None,
978            checksum_sha1: None,
979            checksum_sha256: None,
980        }
981    }
982
983    /// Provide [ChecksumAlgorithm]s for the [Checksum], if set and recognized.
984    pub fn algorithms(&self) -> Vec<ChecksumAlgorithm> {
985        // We assume that at most one checksum will be set.
986        let mut algorithms = Vec::with_capacity(1);
987
988        // Pattern match forces us to accomodate any new fields when added.
989        let Self {
990            checksum_crc64nvme,
991            checksum_crc32,
992            checksum_crc32c,
993            checksum_sha1,
994            checksum_sha256,
995        } = &self;
996
997        if checksum_crc64nvme.is_some() {
998            algorithms.push(ChecksumAlgorithm::Crc64nvme);
999        }
1000        if checksum_crc32.is_some() {
1001            algorithms.push(ChecksumAlgorithm::Crc32);
1002        }
1003        if checksum_crc32c.is_some() {
1004            algorithms.push(ChecksumAlgorithm::Crc32c);
1005        }
1006        if checksum_sha1.is_some() {
1007            algorithms.push(ChecksumAlgorithm::Sha1);
1008        }
1009        if checksum_sha256.is_some() {
1010            algorithms.push(ChecksumAlgorithm::Sha256);
1011        }
1012
1013        algorithms
1014    }
1015}
1016
1017impl From<Option<UploadChecksum>> for Checksum {
1018    fn from(value: Option<UploadChecksum>) -> Self {
1019        let mut checksum = Checksum::empty();
1020        match value.as_ref() {
1021            Some(UploadChecksum::Crc64nvme(crc64)) => checksum.checksum_crc64nvme = Some(crc64nvme_to_base64(crc64)),
1022            Some(UploadChecksum::Crc32c(crc32c)) => checksum.checksum_crc32c = Some(crc32c_to_base64(crc32c)),
1023            Some(UploadChecksum::Crc32(crc32)) => checksum.checksum_crc32 = Some(crc32_to_base64(crc32)),
1024            Some(UploadChecksum::Sha1(sha1)) => checksum.checksum_sha1 = Some(sha1_to_base64(sha1)),
1025            Some(UploadChecksum::Sha256(sha256)) => checksum.checksum_sha256 = Some(sha256_to_base64(sha256)),
1026            None => {}
1027        };
1028        checksum
1029    }
1030}
1031
/// Metadata about object parts from GetObjectAttributes API.
///
/// Every field is optional.
///
/// See [GetObjectAttributesParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAttributesParts.html)
/// in the *Amazon S3 API Reference* for more details.
#[derive(Debug)]
pub struct GetObjectAttributesParts {
    /// Indicates whether the returned list of parts is truncated
    pub is_truncated: Option<bool>,

    /// Maximum number of parts allowed in the response
    pub max_parts: Option<usize>,

    /// When a list is truncated, this element specifies the next marker
    pub next_part_number_marker: Option<usize>,

    /// The marker for the current part
    pub part_number_marker: Option<usize>,

    /// Array of metadata for particular parts, see [ObjectPart]
    pub parts: Option<Vec<ObjectPart>>,

    /// Total number of parts
    pub total_parts_count: Option<usize>,
}
1056
/// Metadata for an individual object part.
///
/// See [ObjectPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ObjectPart.html) in the
/// *Amazon S3 API Reference* for more details.
#[derive(Debug)]
pub struct ObjectPart {
    /// Checksum of the object
    ///
    /// NOTE(review): the S3 `ObjectPart` reference describes these checksums as being of the
    /// part rather than of the whole object — confirm and reword if so.
    pub checksum: Option<Checksum>,

    /// Number of the part, this value is a positive integer between 1 and 10,000
    pub part_number: usize,

    /// Size of the part in bytes
    pub size: usize,
}
1072
#[cfg(test)]
mod tests {
    use super::*;

    /// A single populated field yields exactly the matching algorithm.
    #[test]
    fn test_checksum_algorithm_one_set() {
        let checksum = Checksum {
            checksum_sha1: Some(String::from("checksum_sha1")),
            ..Checksum::empty()
        };
        assert_eq!(checksum.algorithms(), vec![ChecksumAlgorithm::Sha1]);
    }

    /// No populated field yields no algorithm.
    #[test]
    fn test_checksum_algorithm_none_set() {
        let checksum = Checksum::empty();
        assert_eq!(checksum.algorithms(), Vec::<ChecksumAlgorithm>::new());
    }

    /// Amazon S3 doesn't support more than one algorithm today, but just in case... this shows
    /// that several populated fields do not panic and are all reported.
    #[test]
    fn test_checksum_algorithm_many_set() {
        let checksum = Checksum {
            checksum_crc32c: Some(String::from("checksum_crc32c")),
            checksum_sha1: Some(String::from("checksum_sha1")),
            ..Checksum::empty()
        };
        let expected = vec![ChecksumAlgorithm::Crc32c, ChecksumAlgorithm::Sha1];
        assert_eq!(checksum.algorithms(), expected);
    }
}