use std::fmt::{self, Debug};
use std::ops::Range;
use std::time::SystemTime;
use async_trait::async_trait;
use auto_impl::auto_impl;
use bytes::Bytes;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use std::collections::HashMap;
use thiserror::Error;
use time::OffsetDateTime;
use crate::checksums::{
self, crc32_to_base64, crc32c_to_base64, crc64nvme_to_base64, sha1_to_base64, sha256_to_base64,
};
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
mod etag;
pub use etag::ETag;
#[cfg_attr(not(docsrs), async_trait)]
#[auto_impl(Arc)]
pub trait ObjectClient {
type GetObjectResponse: GetObjectResponse<ClientError = Self::ClientError>;
type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;
fn read_part_size(&self) -> usize;
fn write_part_size(&self) -> usize;
fn initial_read_window_size(&self) -> Option<usize>;
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats>;
async fn delete_object(
&self,
bucket: &str,
key: &str,
) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError>;
async fn copy_object(
&self,
source_bucket: &str,
source_key: &str,
destination_bucket: &str,
destination_key: &str,
params: &CopyObjectParams,
) -> ObjectClientResult<CopyObjectResult, CopyObjectError, Self::ClientError>;
async fn get_object(
&self,
bucket: &str,
key: &str,
params: &GetObjectParams,
) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError>;
async fn list_objects(
&self,
bucket: &str,
continuation_token: Option<&str>,
delimiter: &str,
max_keys: usize,
prefix: &str,
) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError>;
async fn head_object(
&self,
bucket: &str,
key: &str,
params: &HeadObjectParams,
) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError>;
async fn put_object(
&self,
bucket: &str,
key: &str,
params: &PutObjectParams,
) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError>;
async fn put_object_single<'a>(
&self,
bucket: &str,
key: &str,
params: &PutObjectSingleParams,
contents: impl AsRef<[u8]> + Send + 'a,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
async fn get_object_attributes(
&self,
bucket: &str,
key: &str,
max_parts: Option<usize>,
part_number_marker: Option<usize>,
object_attributes: &[ObjectAttribute],
) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError>;
async fn rename_object(
&self,
bucket: &str,
src_key: &str,
dest_key: &str,
params: &RenameObjectParams,
) -> ObjectClientResult<RenameObjectResult, RenameObjectError, Self::ClientError>;
}
#[derive(Debug, Error, PartialEq)]
pub enum ObjectClientError<S, C> {
#[error("Service error")]
ServiceError(#[source] S),
#[error("Client error")]
ClientError(#[from] C),
}
impl<S, C> ProvideErrorMetadata for ObjectClientError<S, C>
where
S: ProvideErrorMetadata,
C: ProvideErrorMetadata,
{
fn meta(&self) -> ClientErrorMetadata {
match self {
Self::ClientError(err) => err.meta(),
Self::ServiceError(err) => err.meta(),
}
}
}
impl ProvideErrorMetadata for GetObjectError {
fn meta(&self) -> ClientErrorMetadata {
match self {
GetObjectError::NoSuchBucket(client_error_metadata) => client_error_metadata.clone(),
GetObjectError::NoSuchKey(client_error_metadata) => client_error_metadata.clone(),
GetObjectError::PreconditionFailed(client_error_metadata) => client_error_metadata.clone(),
}
}
}
impl ProvideErrorMetadata for ListObjectsError {
fn meta(&self) -> ClientErrorMetadata {
Default::default()
}
}
impl ProvideErrorMetadata for HeadObjectError {
fn meta(&self) -> ClientErrorMetadata {
Default::default()
}
}
impl ProvideErrorMetadata for DeleteObjectError {
fn meta(&self) -> ClientErrorMetadata {
Default::default()
}
}
impl ProvideErrorMetadata for RenameObjectError {
fn meta(&self) -> ClientErrorMetadata {
Default::default()
}
}
impl ProvideErrorMetadata for PutObjectError {
fn meta(&self) -> ClientErrorMetadata {
Default::default()
}
}
pub type ObjectClientResult<T, S, C> = Result<T, ObjectClientError<S, C>>;
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum GetObjectError {
#[error("The bucket does not exist")]
NoSuchBucket(ClientErrorMetadata),
#[error("The key does not exist")]
NoSuchKey(ClientErrorMetadata),
#[error("At least one of the preconditions specified did not hold")]
PreconditionFailed(ClientErrorMetadata),
}
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct GetObjectParams {
pub range: Option<Range<u64>>,
pub if_match: Option<ETag>,
pub checksum_mode: Option<ChecksumMode>,
}
impl GetObjectParams {
pub fn new() -> Self {
Self::default()
}
pub fn range(mut self, value: Option<Range<u64>>) -> Self {
self.range = value;
self
}
pub fn if_match(mut self, value: Option<ETag>) -> Self {
self.if_match = value;
self
}
pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
self.checksum_mode = value;
self
}
}
#[derive(Debug)]
#[non_exhaustive]
pub struct ListObjectsResult {
pub objects: Vec<ObjectInfo>,
pub common_prefixes: Vec<String>,
pub next_continuation_token: Option<String>,
}
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum ListObjectsError {
#[error("The bucket does not exist")]
NoSuchBucket,
}
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct HeadObjectParams {
pub checksum_mode: Option<ChecksumMode>,
}
impl HeadObjectParams {
pub fn new() -> Self {
Self::default()
}
pub fn checksum_mode(mut self, value: Option<ChecksumMode>) -> Self {
self.checksum_mode = value;
self
}
}
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub enum ChecksumMode {
Enabled,
}
#[derive(Debug)]
#[non_exhaustive]
pub struct HeadObjectResult {
pub size: u64,
pub last_modified: OffsetDateTime,
pub etag: ETag,
pub storage_class: Option<String>,
pub restore_status: Option<RestoreStatus>,
pub checksum: Checksum,
pub sse_type: Option<String>,
pub sse_kms_key_id: Option<String>,
}
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum HeadObjectError {
#[error("The object was not found")]
NotFound,
}
#[derive(Debug)]
#[non_exhaustive]
pub struct DeleteObjectResult {}
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum DeleteObjectError {
#[error("The bucket does not exist")]
NoSuchBucket,
}
#[derive(Debug)]
#[non_exhaustive]
pub struct CopyObjectResult {
}
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum CopyObjectError {
#[error("The object was not found")]
NotFound,
#[error("The source object of the COPY action is not in the active tier and is only stored in Amazon S3 Glacier.")]
ObjectNotInActiveTierError,
}
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct CopyObjectParams {
}
impl CopyObjectParams {
pub fn new() -> Self {
Self::default()
}
}
#[derive(Debug, Default)]
pub struct GetObjectAttributesResult {
pub etag: Option<String>,
pub checksum: Option<Checksum>,
pub object_parts: Option<GetObjectAttributesParts>,
pub storage_class: Option<String>,
pub object_size: Option<u64>,
}
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum GetObjectAttributesError {
#[error("The bucket does not exist")]
NoSuchBucket,
#[error("The key does not exist")]
NoSuchKey,
}
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct RenameObjectParams {
pub if_none_match: Option<String>,
pub if_match: Option<ETag>,
pub if_source_match: Option<ETag>,
pub client_token: Option<String>,
pub custom_headers: Vec<(String, String)>,
}
impl RenameObjectParams {
pub fn new() -> Self {
Self::default()
}
pub fn if_none_match(mut self, value: Option<String>) -> Self {
self.if_none_match = value;
self
}
pub fn if_match(mut self, value: Option<ETag>) -> Self {
self.if_match = value;
self
}
pub fn if_source_match(mut self, value: Option<ETag>) -> Self {
self.if_source_match = value;
self
}
pub fn client_token(mut self, value: Option<String>) -> Self {
self.client_token = value;
self
}
pub fn custom_headers(mut self, value: Vec<(String, String)>) -> Self {
self.custom_headers = value;
self
}
}
#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub struct RenameObjectResult {}
#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum RenamePreconditionTypes {
IfMatch,
IfNoneMatch,
Other,
}
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum RenameObjectError {
#[error("The bucket does not exist")]
NoSuchBucket,
#[error("The destination key provided is too long")]
KeyTooLong,
#[error("The key was not found")]
KeyNotFound,
#[error("A Precondition")]
PreConditionFailed(RenamePreconditionTypes),
#[error("The service does not implement rename")]
NotImplementedError,
#[error("The service returned an Internal Error")]
InternalError,
#[error("You do not have access to this resource")]
AccessDenied,
#[error("Bad Request")]
BadRequest,
}
pub type ObjectMetadata = HashMap<String, String>;
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct PutObjectParams {
pub trailing_checksums: PutObjectTrailingChecksums,
pub storage_class: Option<String>,
pub server_side_encryption: Option<String>,
pub ssekms_key_id: Option<String>,
pub custom_headers: Vec<(String, String)>,
pub object_metadata: ObjectMetadata,
}
impl PutObjectParams {
pub fn new() -> Self {
Self::default()
}
pub fn trailing_checksums(mut self, value: PutObjectTrailingChecksums) -> Self {
self.trailing_checksums = value;
self
}
pub fn storage_class(mut self, value: String) -> Self {
self.storage_class = Some(value);
self
}
pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
self.server_side_encryption = value;
self
}
pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
self.ssekms_key_id = value;
self
}
pub fn add_custom_header(mut self, name: String, value: String) -> Self {
self.custom_headers.push((name, value));
self
}
pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
self.object_metadata = value;
self
}
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PutObjectTrailingChecksums {
Enabled,
ReviewOnly,
#[default]
Disabled,
}
pub type UploadReview = mountpoint_s3_crt::s3::client::UploadReview;
pub type UploadReviewPart = mountpoint_s3_crt::s3::client::UploadReviewPart;
pub type ChecksumAlgorithm = mountpoint_s3_crt::s3::client::ChecksumAlgorithm;
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct PutObjectSingleParams {
pub checksum: Option<UploadChecksum>,
pub storage_class: Option<String>,
pub server_side_encryption: Option<String>,
pub ssekms_key_id: Option<String>,
pub if_match: Option<ETag>,
pub write_offset_bytes: Option<u64>,
pub custom_headers: Vec<(String, String)>,
pub object_metadata: ObjectMetadata,
}
impl PutObjectSingleParams {
pub fn new() -> Self {
Self::default()
}
pub fn new_for_append(offset: u64) -> Self {
Self::default().write_offset_bytes(offset)
}
pub fn checksum(mut self, value: Option<UploadChecksum>) -> Self {
self.checksum = value;
self
}
pub fn storage_class(mut self, value: String) -> Self {
self.storage_class = Some(value);
self
}
pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
self.server_side_encryption = value;
self
}
pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
self.ssekms_key_id = value;
self
}
pub fn if_match(mut self, value: Option<ETag>) -> Self {
self.if_match = value;
self
}
pub fn write_offset_bytes(mut self, value: u64) -> Self {
self.write_offset_bytes = Some(value);
self
}
pub fn add_custom_header(mut self, name: String, value: String) -> Self {
self.custom_headers.push((name, value));
self
}
pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
self.object_metadata = value;
self
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum UploadChecksum {
Crc64nvme(checksums::Crc64nvme),
Crc32c(checksums::Crc32c),
Crc32(checksums::Crc32),
Sha1(checksums::Sha1),
Sha256(checksums::Sha256),
}
impl UploadChecksum {
pub fn checksum_algorithm(&self) -> ChecksumAlgorithm {
match self {
UploadChecksum::Crc64nvme(_) => ChecksumAlgorithm::Crc64nvme,
UploadChecksum::Crc32c(_) => ChecksumAlgorithm::Crc32c,
UploadChecksum::Crc32(_) => ChecksumAlgorithm::Crc32,
UploadChecksum::Sha1(_) => ChecksumAlgorithm::Sha1,
UploadChecksum::Sha256(_) => ChecksumAlgorithm::Sha256,
}
}
}
pub trait ClientBackpressureHandle {
fn increment_read_window(&mut self, len: usize);
fn ensure_read_window(&mut self, desired_end_offset: u64);
fn read_window_end_offset(&self) -> u64;
}
#[cfg_attr(not(docsrs), async_trait)]
pub trait GetObjectResponse:
Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
{
type BackpressureHandle: ClientBackpressureHandle + Clone + Send + Sync;
type ClientError: std::error::Error + Send + Sync + 'static;
fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle>;
fn get_object_metadata(&self) -> ObjectMetadata;
fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError>;
}
#[derive(Debug, Error)]
pub enum ObjectChecksumError {
#[error("requested object checksums, but did not specify it in the request")]
DidNotRequestChecksums,
#[error("object checksum could not be retrieved from headers")]
HeadersError(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}
#[derive(Debug)]
pub struct GetBodyPart {
pub offset: u64,
pub data: Bytes,
}
#[cfg_attr(not(docsrs), async_trait)]
pub trait PutObjectRequest: Send {
type ClientError: std::error::Error + Send + Sync + 'static;
async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError>;
async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
async fn review_and_complete(
self,
review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;
}
#[derive(Debug)]
#[non_exhaustive]
pub struct PutObjectResult {
pub etag: ETag,
pub sse_type: Option<String>,
pub sse_kms_key_id: Option<String>,
}
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum PutObjectError {
#[error("The bucket does not exist")]
NoSuchBucket,
#[error("The key does not exist")]
NoSuchKey,
#[error("Request body cannot be empty when write offset is specified")]
EmptyBody,
#[error("The offset does not match the current object size")]
InvalidWriteOffset,
#[error("The provided checksum does not match the data")]
BadChecksum,
#[error("The provided checksum is not valid or does not match the existing checksum algorithm")]
InvalidChecksumType,
#[error("At least one of the pre-conditions you specified did not hold")]
PreconditionFailed,
#[error("The server does not support the functionality required to fulfill the request")]
NotImplemented,
}
#[derive(Debug, Clone, Copy)]
pub enum RestoreStatus {
InProgress,
Restored { expiry: SystemTime },
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ObjectInfo {
pub key: String,
pub size: u64,
pub last_modified: OffsetDateTime,
pub storage_class: Option<String>,
pub restore_status: Option<RestoreStatus>,
pub etag: String,
pub checksum_algorithms: Vec<ChecksumAlgorithm>,
}
#[derive(Debug)]
pub enum ObjectAttribute {
ETag,
Checksum,
ObjectParts,
StorageClass,
ObjectSize,
}
impl fmt::Display for ObjectAttribute {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let attr_name = match self {
ObjectAttribute::ETag => "ETag",
ObjectAttribute::Checksum => "Checksum",
ObjectAttribute::ObjectParts => "ObjectParts",
ObjectAttribute::StorageClass => "StorageClass",
ObjectAttribute::ObjectSize => "ObjectSize",
};
write!(f, "{attr_name}")
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Checksum {
pub checksum_crc64nvme: Option<String>,
pub checksum_crc32: Option<String>,
pub checksum_crc32c: Option<String>,
pub checksum_sha1: Option<String>,
pub checksum_sha256: Option<String>,
}
impl Checksum {
pub fn empty() -> Self {
Self {
checksum_crc64nvme: None,
checksum_crc32: None,
checksum_crc32c: None,
checksum_sha1: None,
checksum_sha256: None,
}
}
pub fn algorithms(&self) -> Vec<ChecksumAlgorithm> {
let mut algorithms = Vec::with_capacity(1);
let Self {
checksum_crc64nvme,
checksum_crc32,
checksum_crc32c,
checksum_sha1,
checksum_sha256,
} = &self;
if checksum_crc64nvme.is_some() {
algorithms.push(ChecksumAlgorithm::Crc64nvme);
}
if checksum_crc32.is_some() {
algorithms.push(ChecksumAlgorithm::Crc32);
}
if checksum_crc32c.is_some() {
algorithms.push(ChecksumAlgorithm::Crc32c);
}
if checksum_sha1.is_some() {
algorithms.push(ChecksumAlgorithm::Sha1);
}
if checksum_sha256.is_some() {
algorithms.push(ChecksumAlgorithm::Sha256);
}
algorithms
}
}
impl From<Option<UploadChecksum>> for Checksum {
fn from(value: Option<UploadChecksum>) -> Self {
let mut checksum = Checksum::empty();
match value.as_ref() {
Some(UploadChecksum::Crc64nvme(crc64)) => checksum.checksum_crc64nvme = Some(crc64nvme_to_base64(crc64)),
Some(UploadChecksum::Crc32c(crc32c)) => checksum.checksum_crc32c = Some(crc32c_to_base64(crc32c)),
Some(UploadChecksum::Crc32(crc32)) => checksum.checksum_crc32 = Some(crc32_to_base64(crc32)),
Some(UploadChecksum::Sha1(sha1)) => checksum.checksum_sha1 = Some(sha1_to_base64(sha1)),
Some(UploadChecksum::Sha256(sha256)) => checksum.checksum_sha256 = Some(sha256_to_base64(sha256)),
None => {}
};
checksum
}
}
#[derive(Debug)]
pub struct GetObjectAttributesParts {
pub is_truncated: Option<bool>,
pub max_parts: Option<usize>,
pub next_part_number_marker: Option<usize>,
pub part_number_marker: Option<usize>,
pub parts: Option<Vec<ObjectPart>>,
pub total_parts_count: Option<usize>,
}
#[derive(Debug)]
pub struct ObjectPart {
pub checksum: Option<Checksum>,
pub part_number: usize,
pub size: usize,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_checksum_algorithm_one_set() {
let checksum = Checksum {
checksum_crc64nvme: None,
checksum_crc32: None,
checksum_crc32c: None,
checksum_sha1: Some("checksum_sha1".to_string()),
checksum_sha256: None,
};
assert_eq!(checksum.algorithms(), vec![ChecksumAlgorithm::Sha1]);
}
#[test]
fn test_checksum_algorithm_none_set() {
let checksum = Checksum {
checksum_crc64nvme: None,
checksum_crc32: None,
checksum_crc32c: None,
checksum_sha1: None,
checksum_sha256: None,
};
assert_eq!(checksum.algorithms(), vec![]);
}
#[test]
fn test_checksum_algorithm_many_set() {
let checksum = Checksum {
checksum_crc64nvme: None,
checksum_crc32: None,
checksum_crc32c: Some("checksum_crc32c".to_string()),
checksum_sha1: Some("checksum_sha1".to_string()),
checksum_sha256: None,
};
assert_eq!(
checksum.algorithms(),
vec![ChecksumAlgorithm::Crc32c, ChecksumAlgorithm::Sha1],
);
}
}