use rustack_s3_model::{
error::S3Error,
input::{ListObjectVersionsInput, ListObjectsInput, ListObjectsV2Input},
output::{ListObjectVersionsOutput, ListObjectsOutput, ListObjectsV2Output},
types::{
CommonPrefix, DeleteMarkerEntry, Object, ObjectStorageClass, ObjectVersion,
ObjectVersionStorageClass, Owner,
},
};
use tracing::debug;
use crate::{
error::S3ServiceError,
provider::RustackS3,
state::{keystore::VersionListEntry, object::Owner as InternalOwner},
utils::{decode_continuation_token, encode_continuation_token},
};
/// Key limit applied to a list request when the caller does not supply
/// `max-keys`; also quoted as the upper bound in the validation error message.
const DEFAULT_MAX_KEYS: i32 = 1000;
/// Resolves the effective `max-keys` value for a list request.
///
/// * `None` falls back to [`DEFAULT_MAX_KEYS`].
/// * Values above [`DEFAULT_MAX_KEYS`] are clamped down to it, because a
///   single S3 list response never carries more than that many entries.
///
/// # Errors
///
/// Returns an `InvalidArgument` error when the requested value is negative.
// `S3Error` is defined by the model crate; its size is outside this file's control.
#[allow(clippy::result_large_err)]
fn validate_max_keys(max_keys: Option<i32>) -> Result<i32, S3Error> {
    let requested = max_keys.unwrap_or(DEFAULT_MAX_KEYS);
    if requested < 0 {
        return Err(S3ServiceError::InvalidArgument {
            message: format!(
                "Argument max-keys must be an integer between 0 and {DEFAULT_MAX_KEYS}"
            ),
        }
        .into_s3_error());
    }
    // Oversized requests are not an error; they are simply capped.
    Ok(requested.min(DEFAULT_MAX_KEYS))
}
/// Builds the wire-model [`Object`] for a stored object.
///
/// Key, ETag, last-modified time, size, storage class and owner are copied
/// from `obj`; checksum and restore information are left empty.
#[allow(clippy::cast_possible_wrap)]
fn to_model_object(obj: &crate::state::object::S3Object) -> Object {
    let size = obj.size as i64;
    let storage_class = ObjectStorageClass::from(obj.storage_class.as_str());

    Object {
        checksum_algorithm: Vec::new(),
        checksum_type: None,
        e_tag: Some(obj.etag.clone()),
        key: Some(obj.key.clone()),
        last_modified: Some(obj.last_modified),
        owner: Some(Owner {
            display_name: Some(obj.owner.display_name.clone()),
            id: Some(obj.owner.id.clone()),
        }),
        restore_status: None,
        size: Some(size),
        storage_class: Some(storage_class),
    }
}
fn to_model_owner(owner: &InternalOwner) -> Owner {
Owner {
display_name: Some(owner.display_name.clone()),
id: Some(owner.id.clone()),
}
}
/// Wraps each prefix string in a [`CommonPrefix`], preserving input order.
fn to_common_prefixes(prefixes: &[String]) -> Vec<CommonPrefix> {
    let mut wrapped = Vec::with_capacity(prefixes.len());
    for prefix in prefixes {
        wrapped.push(CommonPrefix {
            prefix: Some(prefix.clone()),
        });
    }
    wrapped
}
#[allow(
clippy::cast_possible_wrap,
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::unused_async
)]
impl RustackS3 {
pub async fn handle_list_objects(
&self,
input: ListObjectsInput,
) -> Result<ListObjectsOutput, S3Error> {
let bucket_name = input.bucket;
let bucket = self
.state
.get_bucket(&bucket_name)
.map_err(S3ServiceError::into_s3_error)?;
let prefix = input.prefix.as_deref().unwrap_or("");
let delimiter = input.delimiter.as_deref().unwrap_or("");
let marker = input.marker.as_deref().unwrap_or("");
let max_keys = validate_max_keys(input.max_keys)?;
let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);
let store = bucket.objects.read();
let result = store.list_objects(prefix, delimiter, marker, max_keys_usize);
drop(store);
drop(bucket);
let contents: Vec<Object> = result.objects.iter().map(to_model_object).collect();
let common_prefixes = to_common_prefixes(&result.common_prefixes);
let next_marker = if result.is_truncated {
result.next_marker.clone()
} else {
None
};
debug!(
bucket = %bucket_name,
prefix = %prefix,
count = contents.len(),
is_truncated = result.is_truncated,
"list_objects completed"
);
Ok(ListObjectsOutput {
common_prefixes,
contents,
delimiter: input.delimiter,
encoding_type: input.encoding_type,
is_truncated: Some(result.is_truncated),
marker: input.marker,
max_keys: Some(max_keys),
name: Some(bucket_name),
next_marker,
prefix: input.prefix,
request_charged: None,
})
}
pub async fn handle_list_objects_v2(
&self,
input: ListObjectsV2Input,
) -> Result<ListObjectsV2Output, S3Error> {
let bucket_name = input.bucket;
let bucket = self
.state
.get_bucket(&bucket_name)
.map_err(S3ServiceError::into_s3_error)?;
let prefix = input.prefix.as_deref().unwrap_or("");
let delimiter = input.delimiter.as_deref().unwrap_or("");
let max_keys = validate_max_keys(input.max_keys)?;
let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);
let fetch_owner = input.fetch_owner.unwrap_or(false);
let decoded_token = if let Some(token) = &input.continuation_token {
Some(decode_continuation_token(token).map_err(S3ServiceError::into_s3_error)?)
} else {
None
};
let start_after = decoded_token
.as_deref()
.or(input.start_after.as_deref())
.unwrap_or("");
let store = bucket.objects.read();
let result = store.list_objects(prefix, delimiter, start_after, max_keys_usize);
drop(store);
drop(bucket);
let contents: Vec<Object> = result
.objects
.iter()
.map(|obj| {
let mut s3_obj = to_model_object(obj);
if !fetch_owner {
s3_obj.owner = None;
}
s3_obj
})
.collect();
let common_prefixes = to_common_prefixes(&result.common_prefixes);
let next_continuation_token = if result.is_truncated {
result
.next_marker
.as_ref()
.map(|m| encode_continuation_token(m))
} else {
None
};
let key_count = contents.len() as i32;
debug!(
bucket = %bucket_name,
prefix = %prefix,
count = key_count,
is_truncated = result.is_truncated,
"list_objects_v2 completed"
);
Ok(ListObjectsV2Output {
common_prefixes,
contents,
continuation_token: input.continuation_token,
delimiter: input.delimiter,
encoding_type: input.encoding_type,
is_truncated: Some(result.is_truncated),
key_count: Some(key_count),
max_keys: Some(max_keys),
name: Some(bucket_name),
next_continuation_token,
prefix: input.prefix,
request_charged: None,
start_after: input.start_after,
})
}
pub async fn handle_list_object_versions(
&self,
input: ListObjectVersionsInput,
) -> Result<ListObjectVersionsOutput, S3Error> {
if input.version_id_marker.is_some() && input.key_marker.is_none() {
return Err(S3Error::invalid_argument(
"A version-id marker cannot be specified without a key marker",
));
}
let bucket_name = input.bucket;
let bucket = self
.state
.get_bucket(&bucket_name)
.map_err(S3ServiceError::into_s3_error)?;
let prefix = input.prefix.as_deref().unwrap_or("");
let delimiter = input.delimiter.as_deref().unwrap_or("");
let key_marker = input.key_marker.as_deref().unwrap_or("");
let version_id_marker = input.version_id_marker.as_deref().unwrap_or("");
let max_keys = validate_max_keys(input.max_keys)?;
let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);
let store = bucket.objects.read();
let result = store.list_object_versions(
prefix,
delimiter,
key_marker,
version_id_marker,
max_keys_usize,
);
drop(store);
drop(bucket);
let (versions, delete_markers) = partition_version_list_entries(&result.versions);
let common_prefixes = to_common_prefixes(&result.common_prefixes);
debug!(
bucket = %bucket_name,
prefix = %prefix,
versions = versions.len(),
delete_markers = delete_markers.len(),
is_truncated = result.is_truncated,
"list_object_versions completed"
);
Ok(ListObjectVersionsOutput {
common_prefixes,
delete_markers,
delimiter: input.delimiter,
encoding_type: input.encoding_type,
is_truncated: Some(result.is_truncated),
key_marker: input.key_marker,
max_keys: Some(max_keys),
name: Some(bucket_name),
next_key_marker: result.next_key_marker,
next_version_id_marker: result.next_version_id_marker,
prefix: input.prefix,
request_charged: None,
version_id_marker: input.version_id_marker,
versions,
})
}
}
#[allow(clippy::cast_possible_wrap)]
fn partition_version_list_entries(
entries: &[VersionListEntry],
) -> (Vec<ObjectVersion>, Vec<DeleteMarkerEntry>) {
let mut versions = Vec::new();
let mut delete_markers = Vec::new();
for entry in entries {
match &entry.version {
crate::state::object::ObjectVersion::Object(obj) => {
let owner = to_model_owner(&obj.owner);
versions.push(ObjectVersion {
checksum_algorithm: Vec::new(),
checksum_type: None,
e_tag: Some(obj.etag.clone()),
is_latest: Some(entry.is_latest),
key: Some(obj.key.clone()),
last_modified: Some(obj.last_modified),
owner: Some(owner),
restore_status: None,
size: Some(obj.size as i64),
storage_class: Some(ObjectVersionStorageClass::from(
obj.storage_class.as_str(),
)),
version_id: Some(obj.version_id.clone()),
});
}
crate::state::object::ObjectVersion::DeleteMarker(dm) => {
let owner = to_model_owner(&dm.owner);
delete_markers.push(DeleteMarkerEntry {
is_latest: Some(entry.is_latest),
key: Some(dm.key.clone()),
last_modified: Some(dm.last_modified),
owner: Some(owner),
version_id: Some(dm.version_id.clone()),
});
}
}
}
(versions, delete_markers)
}