mountpoint_s3_client/
object_client.rs

1use std::fmt::{self, Debug};
2use std::ops::Range;
3use std::time::SystemTime;
4
5use async_trait::async_trait;
6use auto_impl::auto_impl;
7use bytes::Bytes;
8use futures::Stream;
9use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
10use std::collections::HashMap;
11use thiserror::Error;
12use time::OffsetDateTime;
13
14use crate::checksums::{
15    self, crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64,
16};
17use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
18
19mod etag;
20pub use etag::ETag;
21
22/// A generic interface to S3-like object storage services.
23///
24/// This trait defines the common methods that all object services implement.
25///
26/// This is an async trait defined with the [async-trait](https://crates.io/crates/async-trait)
27/// crate, and so implementations of this trait must use the `#[async_trait::async_trait]`
28/// attribute.
29#[cfg_attr(not(docsrs), async_trait)]
30#[auto_impl(Arc)]
31pub trait ObjectClient {
32    type GetObjectResponse: GetObjectResponse<ClientError = Self::ClientError>;
33    type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
34    type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;
35
36    /// Query the part size this client uses for GET operations to the object store.
37    fn read_part_size(&self) -> usize;
38
39    /// Query the part size this client uses for PUT operations to the object store.
40    fn write_part_size(&self) -> usize;
41
42    /// Query the initial read window size this client uses for backpressure GetObject requests.
43    /// This can be `None` if backpressure is disabled.
44    fn initial_read_window_size(&self) -> Option<usize>;
45
46    /// Query current memory usage stats for the client. This can be `None` if the client
47    /// does not record the stats.
48    fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats>;
49
50    /// Delete a single object from the object store.
51    ///
52    /// DeleteObject will succeed even if the object within the bucket does not exist.
53    async fn delete_object(
54        &self,
55        bucket: &str,
56        key: &str,
57    ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError>;
58
59    /// Create a copy of an existing object. Currently, this functionality has the following limitations:
60    /// - Supported only for copying between matching bucket types:
61    ///     - Standard S3 to Standard S3 buckets.
62    ///     - S3 Express to S3 Express buckets.
63    /// - Host header must use virtual host addressing style (path style is not supported) and both source and dest buckets must have dns compliant name.
64    /// - Only {bucket}/{key} format is supported for source and passing arn as source will not work.
65    /// - Source bucket is assumed to be in the same region as destination bucket.
66    async fn copy_object(
67        &self,
68        source_bucket: &str,
69        source_key: &str,
70        destination_bucket: &str,
71        destination_key: &str,
72        params: &CopyObjectParams,
73    ) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError>;
74
75    /// Get an object from the object store. Returns a stream of body parts of the object. Parts are
76    /// guaranteed to be returned by the stream in order and contiguously.
77    async fn get_object(
78        &self,
79        bucket: &str,
80        key: &str,
81        params: &GetObjectParams,
82    ) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError>;
83
84    /// List the objects in a bucket under a given prefix
85    async fn list_objects(
86        &self,
87        bucket: &str,
88        continuation_token: Option<&str>,
89        delimiter: &str,
90        max_keys: usize,
91        prefix: &str,
92    ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError>;
93
94    /// Retrieve object metadata without retrieving the object contents
95    async fn head_object(
96        &self,
97        bucket: &str,
98        key: &str,
99        params: &HeadObjectParams,
100    ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError>;
101
102    /// Put an object into the object store. Returns a [PutObjectRequest] for callers
103    /// to provide the content of the object.
104    async fn put_object(
105        &self,
106        bucket: &str,
107        key: &str,
108        params: &PutObjectParams,
109    ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError>;
110
111    /// Put an object into the object store.
112    async fn put_object_single<'a>(
113        &self,
114        bucket: &str,
115        key: &str,
116        params: &PutObjectSingleParams,
117        contents: impl AsRef<[u8]> + Send + 'a,
118    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
119
120    /// Retrieves all the metadata from an object without returning the object contents.
121    async fn get_object_attributes(
122        &self,
123        bucket: &str,
124        key: &str,
125        max_parts: Option<usize>,
126        part_number_marker: Option<usize>,
127        object_attributes: &[ObjectAttribute],
128    ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError>;
129
130    /// Rename an object from some source key to a destination key within the same bucket.
131    async fn rename_object(
132        &self,
133        bucket: &str,
134        src_key: &str,
135        dest_key: &str,
136        params: &RenameObjectParams,
137    ) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError>;
138}
139
140/// The top-level error type returned by calls to an [`ObjectClient`].
141///
142/// Errors that are explicitly modeled on a per-request-type basis are [`ServiceError`]s. Other
143/// generic or unhandled errors are [`ClientError`]s.
144///
145/// The distinction between these two types of error can sometimes be blurry. As a rough heuristic,
146/// [`ServiceError`]s are those that *any reasonable implementation* of an object client would be
147/// capable of experiencing, and [`ClientError`]s are anything else. For example, any object client
148/// could experience a "no such key" error, but only object clients that implement a permissions
149/// system could experience "permission denied" errors. When in doubt, we err towards *not* adding
150/// new [`ServiceError`]s, as they are public API for *every* object client.
151///
152/// [`ServiceError`]: ObjectClientError::ServiceError
153/// [`ClientError`]: ObjectClientError::ClientError
154#[derive(Debug, Error, PartialEq)]
155pub enum ObjectClientError<S, C> {
156    /// An error returned by the service itself
157    #[error("Service error")]
158    ServiceError(#[source] S),
159
160    /// An error within the object client (for example, an unexpected response, or a failure to
161    /// construct the request).
162    #[error("Client error")]
163    ClientError(#[from] C),
164}
165
166impl<S, C> ProvideErrorMetadata for ObjectClientError<S, C>
167where
168    S: ProvideErrorMetadata,
169    C: ProvideErrorMetadata,
170{
171    fn meta(&self) -> ClientErrorMetadata {
172        match self {
173            Self::ClientError(err) => err.meta(),
174            Self::ServiceError(err) => err.meta(),
175        }
176    }
177}
178
179impl ProvideErrorMetadata for GetObjectError {
180    fn meta(&self) -> ClientErrorMetadata {
181        match self {
182            GetObjectError::NoSuchBucket(client_error_metadata) => client_error_metadata.clone(),
183            GetObjectError::NoSuchKey(client_error_metadata) => client_error_metadata.clone(),
184            GetObjectError::PreconditionFailed(client_error_metadata) => client_error_metadata.clone(),
185        }
186    }
187}
188
189impl ProvideErrorMetadata for ListObjectsError {
190    fn meta(&self) -> ClientErrorMetadata {
191        Default::default()
192    }
193}
194
195impl ProvideErrorMetadata for HeadObjectError {
196    fn meta(&self) -> ClientErrorMetadata {
197        Default::default()
198    }
199}
200
201impl ProvideErrorMetadata for DeleteObjectError {
202    fn meta(&self) -> ClientErrorMetadata {
203        Default::default()
204    }
205}
206
207impl ProvideErrorMetadata for RenameObjectError {
208    fn meta(&self) -> ClientErrorMetadata {
209        Default::default()
210    }
211}
212
213impl ProvideErrorMetadata for PutObjectError {
214    fn meta(&self) -> ClientErrorMetadata {
215        Default::default()
216    }
217}
218
219/// Shorthand type for the result of an object client request
220pub type ObjectClientResult<T, S, C> = Result<T, ObjectClientError<S, C>>;
221
222/// Errors returned by a [`get_object`](ObjectClient::get_object) request
223#[derive(Debug, Error, PartialEq, Eq)]
224#[non_exhaustive]
225pub enum GetObjectError {
226    #[error("The bucket does not exist")]
227    NoSuchBucket(ClientErrorMetadata),
228
229    #[error("The key does not exist")]
230    NoSuchKey(ClientErrorMetadata),
231
232    #[error("At least one of the preconditions specified did not hold")]
233    PreconditionFailed(ClientErrorMetadata),
234}
235
236/// Parameters to a [`get_object`](ObjectClient::get_object) request
237#[derive(Debug, Default, Clone)]
238#[non_exhaustive]
239pub struct GetObjectParams {
240    pub range: Option<Range<u64>>,
241    pub if_match: Option<ETag>,
242    pub checksum_mode: Option<ChecksumMode>,
243}
244
245impl GetObjectParams {
246    /// Create a default [GetObjectParams].
247    pub fn new() -> Self {
248        Self::default()
249    }
250
251    /// Set the range retrieved by the GetObject request
252    pub fn range(mut self, value: Option<Range<u64>>) -> Self {
253        self.range = value;
254        self
255    }
256
257    /// Set the required etag on the object
258    pub fn if_match(mut self, value: Option<ETag>) -> Self {
259        self.if_match = value;
260        self
261    }
262
263    /// Set option to retrieve checksum as part of the GetObject request
264    pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
265        self.checksum_mode = value;
266        self
267    }
268}
269
270/// Result of a [`list_objects`](ObjectClient::list_objects) request
271#[derive(Debug)]
272#[non_exhaustive]
273pub struct ListObjectsResult {
274    /// The list of objects.
275    pub objects: Vec<ObjectInfo>,
276
277    /// The list of common prefixes. This rolls up all of the objects with a common prefix up to
278    /// the next instance of the delimiter.
279    pub common_prefixes: Vec<String>,
280
281    /// If present, the continuation token to use to query more results.
282    pub next_continuation_token: Option<String>,
283}
284
285/// Errors returned by a [`list_objects`](ObjectClient::list_objects) request
286#[derive(Debug, Error, PartialEq, Eq)]
287#[non_exhaustive]
288pub enum ListObjectsError {
289    #[error("The bucket does not exist")]
290    NoSuchBucket,
291}
292
293/// Parameters to a [`head_object`](ObjectClient::head_object) request
294#[derive(Debug, Default, Clone)]
295#[non_exhaustive]
296pub struct HeadObjectParams {
297    /// Enable to retrieve checksum as part of the HeadObject request
298    pub checksum_mode: Option<ChecksumMode>,
299}
300
301impl HeadObjectParams {
302    /// Create a default [HeadObjectParams].
303    pub fn new() -> Self {
304        Self::default()
305    }
306
307    /// Set option to retrieve checksum as part of the HeadObject request
308    pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
309        self.checksum_mode = value;
310        self
311    }
312}
313
/// Setting for [ChecksumMode]: use it to have object checksums returned with a response.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum ChecksumMode {
    /// Ask for checksums to be retrieved
    Enabled,
}
321
322/// Result of a [`head_object`](ObjectClient::head_object) request
323#[derive(Debug)]
324#[non_exhaustive]
325pub struct HeadObjectResult {
326    /// Size of the object in bytes.
327    ///
328    /// Refers to the `Content-Length` HTTP header for HeadObject.
329    pub size: u64,
330
331    /// The time this object was last modified.
332    pub last_modified: OffsetDateTime,
333
334    /// Entity tag of this object.
335    pub etag: ETag,
336
337    /// Storage class for this object.
338    ///
339    /// The value is optional because HeadObject does not return the storage class in its response
340    /// for objects in the S3 Standard storage class.
341    /// See examples in the
342    /// [Amazon S3 API Reference](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Examples).
343    pub storage_class: Option<String>,
344
345    /// Objects in flexible retrieval storage classes (such as GLACIER and DEEP_ARCHIVE) are only
346    /// accessible after restoration
347    pub restore_status: Option<RestoreStatus>,
348    /// Checksum of the object.
349    ///
350    /// HeadObject must explicitly request for this field to be included,
351    /// otherwise the values will be empty.
352    pub checksum: Checksum,
353
354    /// Server-side encryption type that was used to store the object.
355    pub sse_type: Option<String>,
356
357    /// Server-side encryption KMS key ID that was used to store the object.
358    pub sse_kms_key_id: Option<String>,
359}
360
361/// Errors returned by a [`head_object`](ObjectClient::head_object) request
362#[derive(Debug, Error, PartialEq, Eq)]
363#[non_exhaustive]
364pub enum HeadObjectError {
365    /// Note that HeadObject cannot distinguish between NoSuchBucket and NoSuchKey errors
366    #[error("The object was not found")]
367    NotFound,
368}
369
/// Result of a [`delete_object`](ObjectClient::delete_object) request
///
/// A DeleteObject request for an object that does not exist in the bucket still counts as a
/// success.
// TODO: Populate this struct with return fields from the S3 API, e.g., version id, delete marker.
#[derive(Debug)]
#[non_exhaustive]
pub struct DeleteObjectResult {}
377
378/// Errors returned by a [`delete_object`](ObjectClient::delete_object) request
379#[derive(Debug, Error, PartialEq, Eq)]
380#[non_exhaustive]
381pub enum DeleteObjectError {
382    #[error("The bucket does not exist")]
383    NoSuchBucket,
384}
385
/// Result of a [`copy_object`](ObjectClient::copy_object) request
#[derive(Debug)]
#[non_exhaustive]
pub struct CopyObjectResult {
    // TODO: Populate this struct with return fields from the S3 API, e.g., etag.
}
392
393/// Errors returned by a [`copy_object`](ObjectClient::copy_object) request
394#[derive(Debug, Error, PartialEq, Eq)]
395#[non_exhaustive]
396pub enum CopyObjectError {
397    /// Note that CopyObject cannot distinguish between NoSuchBucket and NoSuchKey errors
398    #[error("The object was not found")]
399    NotFound,
400
401    #[error("The source object of the COPY action is not in the active tier and is only stored in Amazon S3 Glacier.")]
402    ObjectNotInActiveTierError,
403}
404
/// Parameters to a [`copy_object`](ObjectClient::copy_object) request
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct CopyObjectParams {
    // TODO: Populate this struct with fields as and when required to satisfy various use cases.
}

impl CopyObjectParams {
    /// Build a [CopyObjectParams] with every option unset.
    pub fn new() -> Self {
        Default::default()
    }
}
418
419/// Result of a [`get_object_attributes`](ObjectClient::get_object_attributes) request
420#[derive(Debug, Default)]
421pub struct GetObjectAttributesResult {
422    /// ETag of the object
423    pub etag: Option<String>,
424
425    /// Checksum of the object
426    pub checksum: Option<Checksum>,
427
428    /// Object parts metadata for multi part object
429    pub object_parts: Option<GetObjectAttributesParts>,
430
431    /// Storage class of the object
432    pub storage_class: Option<String>,
433
434    /// Object size
435    pub object_size: Option<u64>,
436}
437
438/// Errors returned by a [`get_object_attributes`](ObjectClient::get_object_attributes) request
439#[derive(Debug, Error, PartialEq, Eq)]
440#[non_exhaustive]
441pub enum GetObjectAttributesError {
442    #[error("The bucket does not exist")]
443    NoSuchBucket,
444
445    #[error("The key does not exist")]
446    NoSuchKey,
447}
448
449/// Parameters to a [`rename_object`](ObjectClient::rename_object) request
450#[derive(Debug, Default, Clone)]
451#[non_exhaustive]
452pub struct RenameObjectParams {
453    /// Can be set to * to disable overwrite
454    pub if_none_match: Option<String>,
455    /// Optional ETag that the destination must match
456    pub if_match: Option<ETag>,
457    /// Optional ETag that the source must match
458    pub if_source_match: Option<ETag>,
459    /// Idempotency token
460    pub client_token: Option<String>,
461    /// Can be used for custom headers
462    pub custom_headers: Vec<(String, String)>,
463}
464
465impl RenameObjectParams {
466    /// Create a default [RenameObjectParams].
467    pub fn new() -> Self {
468        Self::default()
469    }
470
471    /// Set if-none-match header
472    pub fn if_none_match(mut self, value: Option<String>) -> Self {
473        self.if_none_match = value;
474        self
475    }
476
477    /// Set if-match header
478    pub fn if_match(mut self, value: Option<ETag>) -> Self {
479        self.if_match = value;
480        self
481    }
482
483    /// Set if-source-match header
484    pub fn if_source_match(mut self, value: Option<ETag>) -> Self {
485        self.if_source_match = value;
486        self
487    }
488
489    /// Set idempotency token
490    pub fn client_token(mut self, value: Option<String>) -> Self {
491        self.client_token = value;
492        self
493    }
494
495    /// Set custom headers
496    pub fn custom_headers(mut self, value: Vec<(String, String)>) -> Self {
497        self.custom_headers = value;
498        self
499    }
500}
501
/// Result of a [`rename_object`](ObjectClient::rename_object) request
#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub struct RenameObjectResult {}
506
/// Which kind of precondition caused a
/// [`RenameObjectError::PreConditionFailed`] error.
#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum RenamePreconditionTypes {
    /// Presumably the `if_match` condition of [`RenameObjectParams`] did not hold.
    IfMatch,
    /// Presumably the `if_none_match` condition of [`RenameObjectParams`] did not hold.
    IfNoneMatch,
    /// Any other precondition failure.
    Other,
}
514
515/// Errors returned by a [`rename_object`](ObjectClient::rename_object) request
516#[derive(Debug, Error, PartialEq, Eq)]
517#[non_exhaustive]
518pub enum RenameObjectError {
519    #[error("The bucket does not exist")]
520    NoSuchBucket,
521    #[error("The destination key provided is too long")]
522    KeyTooLong,
523    #[error("The key was not found")]
524    KeyNotFound,
525    #[error("A Precondition")]
526    PreConditionFailed(RenamePreconditionTypes),
527    #[error("The service does not implement rename")]
528    NotImplementedError,
529    #[error("The service returned an Internal Error")]
530    InternalError,
531    #[error("You do not have access to this resource")]
532    AccessDenied,
533    #[error("Bad Request")]
534    BadRequest,
535}
536
/// User-defined object metadata, as a map from metadata key to value.
pub type ObjectMetadata = std::collections::HashMap<String, String>;
538
539/// Parameters to a [`put_object`](ObjectClient::put_object) request
540#[derive(Debug, Default, Clone)]
541#[non_exhaustive]
542pub struct PutObjectParams {
543    /// Enable Crc32c trailing checksums.
544    pub trailing_checksums: PutObjectTrailingChecksums,
545    /// Storage class to be used when creating new S3 object
546    pub storage_class: Option<String>,
547    /// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse)
548    pub server_side_encryption: Option<String>,
549    /// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be used to specify AWS KMS key ID to be used
550    /// when creating new S3 object
551    pub ssekms_key_id: Option<String>,
552    /// Custom headers to add to the request
553    pub custom_headers: Vec<(String, String)>,
554    /// User-defined object metadata
555    pub object_metadata: ObjectMetadata,
556}
557
558impl PutObjectParams {
559    /// Create a default [PutObjectParams].
560    pub fn new() -> Self {
561        Self::default()
562    }
563
564    /// Set Crc32c trailing checksums.
565    pub fn trailing_checksums(mut self, value: PutObjectTrailingChecksums) -> Self {
566        self.trailing_checksums = value;
567        self
568    }
569
570    /// Set the storage class.
571    pub fn storage_class(mut self, value: String) -> Self {
572        self.storage_class = Some(value);
573        self
574    }
575
576    /// Set server-side encryption type.
577    pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
578        self.server_side_encryption = value;
579        self
580    }
581
582    /// Set KMS key ID to be used for server-side encryption.
583    pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
584        self.ssekms_key_id = value;
585        self
586    }
587
588    /// Add a custom header to the request.
589    pub fn add_custom_header(mut self, name: String, value: String) -> Self {
590        self.custom_headers.push((name, value));
591        self
592    }
593
594    /// Set user defined object metadata.
595    pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
596        self.object_metadata = value;
597        self
598    }
599}
600
/// Controls how CRC32C checksums are handled for the parts of a multi-part PutObject request.
///
/// The default is [`PutObjectTrailingChecksums::Disabled`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PutObjectTrailingChecksums {
    /// Compute checksums, pass them to upload review, and send them to S3
    Enabled,
    /// Compute checksums and pass them to upload review, but do not send them to S3
    ReviewOnly,
    /// Do not compute checksums on the client side
    #[default]
    Disabled,
}
612
613/// Info for the caller to review before an upload completes.
614pub type UploadReview = mountpoint_s3_crt::s3::client::UploadReview;
615
616/// Info about a single part, for the caller to review before the upload completes.
617pub type UploadReviewPart = mountpoint_s3_crt::s3::client::UploadReviewPart;
618
619/// A checksum algorithm used by the object client for integrity checks on uploads and downloads.
620pub type ChecksumAlgorithm = mountpoint_s3_crt::s3::client::ChecksumAlgorithm;
621
622/// Parameters to a [`put_object_single`](ObjectClient::put_object_single) request
623#[derive(Debug, Default, Clone)]
624#[non_exhaustive]
625pub struct PutObjectSingleParams {
626    /// User-provided checksum of the data to upload.
627    pub checksum: Option<UploadChecksum>,
628    /// Storage class to be used when creating new S3 object
629    pub storage_class: Option<String>,
630    /// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse)
631    pub server_side_encryption: Option<String>,
632    /// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be used to specify AWS KMS key ID to be used
633    /// when creating new S3 object
634    pub ssekms_key_id: Option<String>,
635    /// Requires pre-existing object to match the given etag in order to perform the request
636    pub if_match: Option<ETag>,
637    /// Offset on the pre-existing object where to append the data in the request
638    pub write_offset_bytes: Option<u64>,
639    /// Custom headers to add to the request
640    pub custom_headers: Vec<(String, String)>,
641    /// User-defined object metadata
642    pub object_metadata: ObjectMetadata,
643}
644
645impl PutObjectSingleParams {
646    /// Create a default [PutObjectSingleParams].
647    pub fn new() -> Self {
648        Self::default()
649    }
650
651    /// Create a [PutObjectSingleParams] for an append request at the given offset.
652    pub fn new_for_append(offset: u64) -> Self {
653        Self::default().write_offset_bytes(offset)
654    }
655
656    /// Set checksum.
657    pub fn checksum(mut self, value: Option<UploadChecksum>) -> Self {
658        self.checksum = value;
659        self
660    }
661
662    /// Set the storage class.
663    pub fn storage_class(mut self, value: String) -> Self {
664        self.storage_class = Some(value);
665        self
666    }
667
668    /// Set server-side encryption type.
669    pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
670        self.server_side_encryption = value;
671        self
672    }
673
674    /// Set KMS key ID to be used for server-side encryption.
675    pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
676        self.ssekms_key_id = value;
677        self
678    }
679
680    /// Set the required etag on the pre-existing object.
681    pub fn if_match(mut self, value: Option<ETag>) -> Self {
682        self.if_match = value;
683        self
684    }
685
686    /// Set the offset on the pre-existing object where to append the data in the request.
687    pub fn write_offset_bytes(mut self, value: u64) -> Self {
688        self.write_offset_bytes = Some(value);
689        self
690    }
691
692    /// Add a custom header to the request.
693    pub fn add_custom_header(mut self, name: String, value: String) -> Self {
694        self.custom_headers.push((name, value));
695        self
696    }
697
698    /// Set user defined object metadata.
699    pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
700        self.object_metadata = value;
701        self
702    }
703}
704
705/// A checksum used by the object client for integrity checks on uploads.
706#[derive(Debug, Clone)]
707#[non_exhaustive]
708pub enum UploadChecksum {
709    Crc64nvme(checksums::Crc64nvme),
710    Crc32c(checksums::Crc32c),
711    Crc32(checksums::Crc32),
712    Sha1(checksums::Sha1),
713    Sha256(checksums::Sha256),
714}
715
716impl UploadChecksum {
717    /// The checksum algorithm used to compute this checksum.
718    pub fn checksum_algorithm(&self) -> ChecksumAlgorithm {
719        match self {
720            UploadChecksum::Crc64nvme(_) => ChecksumAlgorithm::Crc64nvme,
721            UploadChecksum::Crc32c(_) => ChecksumAlgorithm::Crc32c,
722            UploadChecksum::Crc32(_) => ChecksumAlgorithm::Crc32,
723            UploadChecksum::Sha1(_) => ChecksumAlgorithm::Sha1,
724            UploadChecksum::Sha256(_) => ChecksumAlgorithm::Sha256,
725        }
726    }
727}
728
/// A handle for controlling backpressure-enabled requests.
///
/// If the client was created with `enable_read_backpressure` set to true,
/// each meta request has a flow-control window that shrinks as response
/// body data is downloaded (headers do not affect the size of the window).
/// The client's `initial_read_window` determines the starting size of each meta request's window.
/// If a meta request's flow-control window reaches 0, no further data will be downloaded.
/// If the `initial_read_window` is 0, the request will not start until the window is incremented.
///
/// Maintain a larger window to sustain high download throughput: parts cannot download in
/// parallel unless the window is large enough to hold multiple parts.
/// Maintain a smaller window to limit the amount of data buffered in memory.
pub trait ClientBackpressureHandle {
    /// Increment the flow-control read window, so that response data continues downloading.
    ///
    /// The client should read data up to and including the end of the read window,
    /// even if that means fetching and returning data beyond the end of the window
    /// to fulfil a part-sized GetObject request.
    /// As an example, a call with `len = 1` could trigger an 8MiB GetObject request
    /// if the given client fetches data in requests of 8MiB ranges.
    fn increment_read_window(&mut self, len: usize);

    /// Move the upper bound of the read window to the given offset if it's not already there.
    fn ensure_read_window(&mut self, desired_end_offset: u64);

    /// Get the upper bound of the read window. When backpressure is enabled, a
    /// [`GetObjectResponse`] can return data up to, but not including, this offset.
    fn read_window_end_offset(&self) -> u64;
}
757
758/// A streaming response to a GetObject request.
759///
760/// This struct implements [`futures::Stream`], which you can use to read the body of the object.
761/// Each item of the stream is a part of the object body together with the part's offset within the
762/// object.
763#[cfg_attr(not(docsrs), async_trait)]
764pub trait GetObjectResponse:
765    Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
766{
767    type BackpressureHandle: ClientBackpressureHandle + Clone + Send + Sync;
768    type ClientError: std::error::Error + Send + Sync + 'static;
769
770    /// Take the backpressure handle from the response.
771    ///
772    /// If `enable_read_backpressure` is false this call will return `None`,
773    /// no backpressure is being applied and data is being downloaded as fast as possible.
774    fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle>;
775
776    /// Get the object's user defined metadata.
777    fn get_object_metadata(&self) -> ObjectMetadata;
778
779    /// Get the object's checksum, if uploaded with one
780    fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError>;
781}
782
783/// Failures to return object checksum
784#[derive(Debug, Error)]
785pub enum ObjectChecksumError {
786    #[error("requested object checksums, but did not specify it in the request")]
787    DidNotRequestChecksums,
788    #[error("object checksum could not be retrieved from headers")]
789    HeadersError(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
790}
791
792/// A single element of a [`get_object`](ObjectClient::get_object) response stream is a pair of
793/// offset within the object and the bytes starting at that offset.
794#[derive(Debug)]
795pub struct GetBodyPart {
796    pub offset: u64,
797    pub data: Bytes,
798}
799
800/// A streaming put request which allows callers to asynchronously write the body of the request.
801///
802/// You can call the [`write`](Self::write) method to write data to the object, and then call
803/// [`complete`](Self::complete) to complete the upload. Alternatively, you can call
804/// [`review_and_complete`](Self::review_and_complete) to review the upload before completing it,
805/// giving the chance to cancel the request if the upload is not as expected.
806///
807/// This is an async trait defined with the [async-trait](https://crates.io/crates/async-trait)
808/// crate, and so implementations of this trait must use the `#[async_trait::async_trait]`
809/// attribute.
810#[cfg_attr(not(docsrs), async_trait)]
811pub trait PutObjectRequest: Send {
812    type ClientError: std::error::Error + Send + Sync + 'static;
813
814    /// Write the given slice to the put request body.
815    async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError>;
816
817    /// Complete the put request and return a [`PutObjectResult`].
818    async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
819
820    /// Review and complete the put request and return a [`PutObjectResult`].
821    async fn review_and_complete(
822        self,
823        review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
824    ) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
825}
826
/// Result of a successful [ObjectClient::put_object] request, as returned by
/// [`PutObjectRequest::complete`] and [`PutObjectRequest::review_and_complete`].
#[derive(Debug)]
#[non_exhaustive]
pub struct PutObjectResult {
    /// ETag of the uploaded object
    pub etag: ETag,
    /// Server-side encryption type that was used to store the new object (reported by S3), if any
    pub sse_type: Option<String>,
    /// Server-side encryption KMS key ID that was used to store the new object (reported by S3), if any
    pub sse_kms_key_id: Option<String>,
}
838
/// Errors returned by a [`put_object`](ObjectClient::put_object) request
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum PutObjectError {
    /// The target bucket does not exist.
    #[error("The bucket does not exist")]
    NoSuchBucket,

    /// The target key does not exist.
    #[error("The key does not exist")]
    NoSuchKey,

    /// The request body was empty even though a write offset was specified.
    #[error("Request body cannot be empty when write offset is specified")]
    EmptyBody,

    /// The specified write offset does not match the current size of the object.
    #[error("The offset does not match the current object size")]
    InvalidWriteOffset,

    /// The checksum provided with the request does not match the uploaded data.
    #[error("The provided checksum does not match the data")]
    BadChecksum,

    /// The checksum provided is invalid, or its algorithm does not match the existing one.
    #[error("The provided checksum is not valid or does not match the existing checksum algorithm")]
    InvalidChecksumType,

    /// At least one of the request's pre-conditions did not hold.
    #[error("At least one of the pre-conditions you specified did not hold")]
    PreconditionFailed,

    /// The server does not support functionality the request requires.
    #[error("The server does not support the functionality required to fulfill the request")]
    NotImplemented,
}
867
/// Restoration status for S3 objects in flexible retrieval storage classes.
///
/// See [Checking restore status and expiration
/// date](https://docs.aws.amazon.com/AmazonS3/latest/userguide/restoring-objects.html#restore-archived-objects-status)
/// in the *Amazon S3 User Guide* for more details.
#[derive(Debug, Clone, Copy)]
pub enum RestoreStatus {
    /// S3 returns this status after it has accepted a restoration request but has not yet
    /// completed it. Objects with this status are not readable.
    InProgress,

    /// This status means that restoration is fully completed. Note that restored objects are stored only
    /// for the number of days that was specified in the request.
    Restored {
        /// Time at which the restored copy expires.
        expiry: SystemTime,
    },
}
883
/// Metadata about a single S3 object.
///
/// See [Object](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html) in the *Amazon S3
/// API Reference* for more details.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ObjectInfo {
    /// Key for this object.
    pub key: String,

    /// Size of this object in bytes.
    pub size: u64,

    /// The time this object was last modified.
    pub last_modified: OffsetDateTime,

    /// Storage class for this object. Optional because head_object does not return
    /// the storage class in its response for Standard objects. See examples in the [*Amazon S3 API
    /// Reference*](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Examples).
    pub storage_class: Option<String>,

    /// Restoration status. Objects in flexible retrieval storage classes (such as GLACIER and
    /// DEEP_ARCHIVE) are only accessible after restoration.
    pub restore_status: Option<RestoreStatus>,

    /// Entity tag of this object.
    pub etag: String,

    /// The algorithms that were used to create a checksum of the object.
    ///
    /// The [Amazon S3 API Reference] specifies this field as a list of strings,
    /// so we return here a [Vec] of [ChecksumAlgorithm].
    ///
    /// [Amazon S3 API Reference]:
    ///     https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ChecksumAlgorithm
    pub checksum_algorithms: Vec<ChecksumAlgorithm>,
}
921
/// All possible object attributes that can be retrieved from [ObjectClient::get_object_attributes].
/// Fields that you do not specify are not returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ObjectAttribute {
    /// ETag of the object
    ETag,

    /// Checksum of the object
    Checksum,

    /// Object parts metadata for multi part object
    ObjectParts,

    /// Storage class of the object
    StorageClass,

    /// Object size
    ObjectSize,
}

impl fmt::Display for ObjectAttribute {
    /// Formats the attribute as its name in the GetObjectAttributes API (e.g. `ETag`,
    /// `ObjectParts`). Width/fill flags on the formatter are ignored.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let attr_name = match self {
            ObjectAttribute::ETag => "ETag",
            ObjectAttribute::Checksum => "Checksum",
            ObjectAttribute::ObjectParts => "ObjectParts",
            ObjectAttribute::StorageClass => "StorageClass",
            ObjectAttribute::ObjectSize => "ObjectSize",
        };
        // `write_str` emits the static name directly without going through format machinery.
        f.write_str(attr_name)
    }
}
954
/// Metadata about object checksum.
///
/// Each field holds a base64-encoded checksum value, or `None` when that checksum is not present.
/// Amazon S3 currently supports only one checksum algorithm per object.
///
/// See [Checksum](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Checksum.html) in the *Amazon
/// S3 API Reference* for more details.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Checksum {
    /// Base64-encoded, 64-bit CRC64NVME checksum of the object
    pub checksum_crc64nvme: Option<String>,

    /// Base64-encoded, 32-bit CRC32 checksum of the object
    pub checksum_crc32: Option<String>,

    /// Base64-encoded, 32-bit CRC32C checksum of the object
    pub checksum_crc32c: Option<String>,

    /// Base64-encoded, 160-bit SHA-1 digest of the object
    pub checksum_sha1: Option<String>,

    /// Base64-encoded, 256-bit SHA-256 digest of the object
    pub checksum_sha256: Option<String>,
}
976
977impl Checksum {
978    /// Construct an empty [Checksum]
979    pub fn empty() -> Self {
980        Self {
981            checksum_crc64nvme: None,
982            checksum_crc32: None,
983            checksum_crc32c: None,
984            checksum_sha1: None,
985            checksum_sha256: None,
986        }
987    }
988
989    /// Provide [ChecksumAlgorithm]s for the [Checksum], if set and recognized.
990    pub fn algorithms(&self) -> Vec<ChecksumAlgorithm> {
991        // We assume that at most one checksum will be set.
992        let mut algorithms = Vec::with_capacity(1);
993
994        // Pattern match forces us to accomodate any new fields when added.
995        let Self {
996            checksum_crc64nvme,
997            checksum_crc32,
998            checksum_crc32c,
999            checksum_sha1,
1000            checksum_sha256,
1001        } = &self;
1002
1003        if checksum_crc64nvme.is_some() {
1004            algorithms.push(ChecksumAlgorithm::Crc64nvme);
1005        }
1006        if checksum_crc32.is_some() {
1007            algorithms.push(ChecksumAlgorithm::Crc32);
1008        }
1009        if checksum_crc32c.is_some() {
1010            algorithms.push(ChecksumAlgorithm::Crc32c);
1011        }
1012        if checksum_sha1.is_some() {
1013            algorithms.push(ChecksumAlgorithm::Sha1);
1014        }
1015        if checksum_sha256.is_some() {
1016            algorithms.push(ChecksumAlgorithm::Sha256);
1017        }
1018
1019        algorithms
1020    }
1021}
1022
1023impl From<Option<UploadChecksum>> for Checksum {
1024    fn from(value: Option<UploadChecksum>) -> Self {
1025        let mut checksum = Checksum::empty();
1026        match value.as_ref() {
1027            Some(UploadChecksum::Crc64nvme(crc64)) => checksum.checksum_crc64nvme = Some(crc64nvme_to_base64(crc64)),
1028            Some(UploadChecksum::Crc32c(crc32c)) => checksum.checksum_crc32c = Some(crc32c_to_base64(crc32c)),
1029            Some(UploadChecksum::Crc32(crc32)) => checksum.checksum_crc32 = Some(crc32_to_base64(crc32)),
1030            Some(UploadChecksum::Sha1(sha1)) => checksum.checksum_sha1 = Some(sha1_to_base64(sha1)),
1031            Some(UploadChecksum::Sha256(sha256)) => checksum.checksum_sha256 = Some(sha256_to_base64(sha256)),
1032            None => {}
1033        };
1034        checksum
1035    }
1036}
1037
/// Metadata about object parts from GetObjectAttributes API.
///
/// Every field is optional because it may be absent from the response.
///
/// See [GetObjectAttributesParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAttributesParts.html)
/// in the *Amazon S3 API Reference* for more details.
#[derive(Debug)]
pub struct GetObjectAttributesParts {
    /// Indicates whether the returned list of parts is truncated
    pub is_truncated: Option<bool>,

    /// Maximum number of parts allowed in the response
    pub max_parts: Option<usize>,

    /// When the list is truncated, the marker to use to continue listing parts
    pub next_part_number_marker: Option<usize>,

    /// The marker for the current part
    pub part_number_marker: Option<usize>,

    /// Metadata for the individual parts returned in this response
    pub parts: Option<Vec<ObjectPart>>,

    /// Total number of parts
    pub total_parts_count: Option<usize>,
}
1062
/// Metadata for an individual object part.
///
/// See [ObjectPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ObjectPart.html) in the
/// *Amazon S3 API Reference* for more details.
#[derive(Debug)]
pub struct ObjectPart {
    /// Checksum of this part, if one was returned
    pub checksum: Option<Checksum>,

    /// Number of the part, this value is a positive integer between 1 and 10,000
    pub part_number: usize,

    /// Size of the part in bytes
    pub size: usize,
}
1078
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_checksum_algorithm_one_set() {
        let checksum = Checksum {
            checksum_crc64nvme: None,
            checksum_crc32: None,
            checksum_crc32c: None,
            checksum_sha1: Some("checksum_sha1".to_string()),
            checksum_sha256: None,
        };
        assert_eq!(checksum.algorithms(), vec![ChecksumAlgorithm::Sha1]);
    }

    #[test]
    fn test_checksum_algorithm_none_set() {
        let checksum = Checksum {
            checksum_crc64nvme: None,
            checksum_crc32: None,
            checksum_crc32c: None,
            checksum_sha1: None,
            checksum_sha256: None,
        };
        assert_eq!(checksum.algorithms(), vec![]);
    }

    #[test]
    fn test_checksum_algorithm_many_set() {
        // Amazon S3 doesn't support more than one algorithm today, but just in case... let's show we don't panic.
        let checksum = Checksum {
            checksum_crc64nvme: None,
            checksum_crc32: None,
            checksum_crc32c: Some("checksum_crc32c".to_string()),
            checksum_sha1: Some("checksum_sha1".to_string()),
            checksum_sha256: None,
        };
        assert_eq!(
            checksum.algorithms(),
            vec![ChecksumAlgorithm::Crc32c, ChecksumAlgorithm::Sha1],
        );
    }

    #[test]
    fn test_checksum_algorithm_all_set_in_fixed_order() {
        // Pins the order in which `algorithms` reports the set checksums.
        let checksum = Checksum {
            checksum_crc64nvme: Some("checksum_crc64nvme".to_string()),
            checksum_crc32: Some("checksum_crc32".to_string()),
            checksum_crc32c: Some("checksum_crc32c".to_string()),
            checksum_sha1: Some("checksum_sha1".to_string()),
            checksum_sha256: Some("checksum_sha256".to_string()),
        };
        assert_eq!(
            checksum.algorithms(),
            vec![
                ChecksumAlgorithm::Crc64nvme,
                ChecksumAlgorithm::Crc32,
                ChecksumAlgorithm::Crc32c,
                ChecksumAlgorithm::Sha1,
                ChecksumAlgorithm::Sha256,
            ],
        );
    }

    #[test]
    fn test_checksum_empty_and_from_none() {
        let empty = Checksum::empty();
        assert_eq!(
            empty,
            Checksum {
                checksum_crc64nvme: None,
                checksum_crc32: None,
                checksum_crc32c: None,
                checksum_sha1: None,
                checksum_sha256: None,
            }
        );
        assert!(empty.algorithms().is_empty());
        assert_eq!(Checksum::from(None::<UploadChecksum>), Checksum::empty());
    }

    #[test]
    fn test_object_attribute_display() {
        assert_eq!(ObjectAttribute::ETag.to_string(), "ETag");
        assert_eq!(ObjectAttribute::Checksum.to_string(), "Checksum");
        assert_eq!(ObjectAttribute::ObjectParts.to_string(), "ObjectParts");
        assert_eq!(ObjectAttribute::StorageClass.to_string(), "StorageClass");
        assert_eq!(ObjectAttribute::ObjectSize.to_string(), "ObjectSize");
    }
}