use async_trait::async_trait;
use aws_smithy_runtime_api::client::http::{
HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
use aws_smithy_runtime_api::client::result::ConnectorError;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::http::{Response, StatusCode};
use aws_smithy_types::body::SdkBody;
use bytes::Bytes;
use jiff::Timestamp;
use rc_core::{
Alias, Capabilities, Error, ListOptions, ListResult, ObjectInfo, ObjectStore, ObjectVersion,
RemotePath, Result,
};
use tokio::io::AsyncReadExt;
const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = crate::multipart::DEFAULT_PART_SIZE;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BucketPolicyErrorKind {
MissingPolicy,
MissingBucket,
Other,
}
#[derive(Debug, Clone)]
struct ReqwestConnector {
client: reqwest::Client,
}
impl ReqwestConnector {
async fn new(insecure: bool, ca_bundle: Option<&str>) -> Result<Self> {
let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);
if let Some(bundle_path) = ca_bundle {
let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
})?;
let cert = reqwest::Certificate::from_pem(&pem)
.map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
builder = builder.add_root_certificate(cert);
}
let client = builder
.build()
.map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
Ok(Self { client })
}
}
impl HttpConnector for ReqwestConnector {
fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
let client = self.client.clone();
HttpConnectorFuture::new(async move {
let uri = request.uri().to_string();
let method_str = request.method().to_string();
let headers = request.headers().clone();
let body_bytes = match request.body().bytes() {
Some(b) => Bytes::copy_from_slice(b),
None => {
return Err(ConnectorError::user(
"Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
use in-memory data for uploads with this connector"
.into(),
));
}
};
let method = reqwest::Method::from_bytes(method_str.as_bytes())
.map_err(|e| ConnectorError::user(Box::new(e)))?;
let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;
let mut req = reqwest::Request::new(method, url);
for (name, value) in headers.iter() {
match (
reqwest::header::HeaderName::from_bytes(name.as_bytes()),
reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
) {
(Ok(header_name), Ok(header_value)) => {
req.headers_mut().append(header_name, header_value);
}
_ => {
tracing::warn!("Skipping non-convertible request header: {}", name);
}
}
}
*req.body_mut() = Some(reqwest::Body::from(body_bytes));
let resp = client
.execute(req)
.await
.map_err(|e| ConnectorError::io(Box::new(e)))?;
let status = StatusCode::try_from(resp.status().as_u16())
.map_err(|e| ConnectorError::other(Box::new(e), None))?;
let resp_headers = resp.headers().clone();
let body = resp
.bytes()
.await
.map_err(|e| ConnectorError::io(Box::new(e)))?;
let mut sdk_response = Response::new(status, SdkBody::from(body));
for (name, value) in &resp_headers {
match value.to_str() {
Ok(value_str) => {
sdk_response
.headers_mut()
.append(name.as_str().to_owned(), value_str.to_owned());
}
Err(_) => {
tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
}
}
}
Ok(sdk_response)
})
}
}
impl HttpClient for ReqwestConnector {
fn http_connector(
&self,
_settings: &HttpConnectorSettings,
_components: &RuntimeComponents,
) -> SharedHttpConnector {
SharedHttpConnector::new(self.clone())
}
}
pub struct S3Client {
inner: aws_sdk_s3::Client,
#[allow(dead_code)]
alias: Alias,
}
impl S3Client {
pub async fn new(alias: Alias) -> Result<Self> {
let endpoint = alias.endpoint.clone();
let region = alias.region.clone();
let access_key = alias.access_key.clone();
let secret_key = alias.secret_key.clone();
let credentials = aws_credential_types::Credentials::new(
access_key,
secret_key,
None, None, "rc-static-credentials",
);
let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
.credentials_provider(credentials)
.region(aws_config::Region::new(region))
.endpoint_url(&endpoint);
if alias.insecure || alias.ca_bundle.is_some() {
let connector =
ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref()).await?;
config_loader = config_loader.http_client(connector);
}
let config = config_loader.load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&config)
.force_path_style(alias.bucket_lookup == "path" || alias.bucket_lookup == "auto")
.request_checksum_calculation(
aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
)
.response_checksum_validation(
aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
)
.build();
let client = aws_sdk_s3::Client::from_conf(s3_config);
Ok(Self {
inner: client,
alias,
})
}
pub fn inner(&self) -> &aws_sdk_s3::Client {
&self.inner
}
fn format_sdk_error<E: std::fmt::Display>(error: &aws_sdk_s3::error::SdkError<E>) -> String {
match error {
aws_sdk_s3::error::SdkError::ServiceError(service_err) => {
let err = service_err.err();
let meta = service_err.raw();
let mut msg = format!("Service error: {}", err);
if let Some(code) = meta.headers().get("x-amz-error-code")
&& let Ok(code_str) = std::str::from_utf8(code.as_bytes())
{
msg.push_str(&format!(" (code: {})", code_str));
}
msg
}
aws_sdk_s3::error::SdkError::ConstructionFailure(err) => {
format!("Request construction failed: {:?}", err)
}
aws_sdk_s3::error::SdkError::TimeoutError(_) => "Request timeout".to_string(),
aws_sdk_s3::error::SdkError::DispatchFailure(err) => {
format!("Network dispatch error: {:?}", err)
}
aws_sdk_s3::error::SdkError::ResponseError(err) => {
format!("Response error: {:?}", err)
}
_ => error.to_string(),
}
}
fn should_use_multipart(file_size: u64) -> bool {
file_size > SINGLE_PUT_OBJECT_MAX_SIZE
}
fn bucket_policy_error_kind(
error_code: Option<&str>,
status_code: Option<u16>,
error_text: &str,
) -> BucketPolicyErrorKind {
let error_code = error_code.map(|code| code.to_ascii_lowercase());
if matches!(
error_code.as_deref(),
Some("nosuchbucketpolicy") | Some("nosuchpolicy")
) {
return BucketPolicyErrorKind::MissingPolicy;
}
if matches!(error_code.as_deref(), Some("nosuchbucket")) {
return BucketPolicyErrorKind::MissingBucket;
}
let error_text = error_text.to_ascii_lowercase();
if error_text.contains("nosuchbucketpolicy") || error_text.contains("nosuchpolicy") {
return BucketPolicyErrorKind::MissingPolicy;
}
if error_text.contains("nosuchbucket") {
return BucketPolicyErrorKind::MissingBucket;
}
if status_code == Some(404) {
return BucketPolicyErrorKind::MissingPolicy;
}
BucketPolicyErrorKind::Other
}
fn map_get_bucket_policy_error(
bucket: &str,
kind: BucketPolicyErrorKind,
error_text: &str,
) -> Result<Option<String>> {
match kind {
BucketPolicyErrorKind::MissingPolicy => Ok(None),
BucketPolicyErrorKind::MissingBucket => {
Err(Error::NotFound(format!("Bucket not found: {bucket}")))
}
BucketPolicyErrorKind::Other => {
Err(Error::Network(format!("get_bucket_policy: {error_text}")))
}
}
}
fn map_delete_bucket_policy_error(
bucket: &str,
kind: BucketPolicyErrorKind,
error_text: &str,
) -> Result<()> {
match kind {
BucketPolicyErrorKind::MissingPolicy => Ok(()),
BucketPolicyErrorKind::MissingBucket => {
Err(Error::NotFound(format!("Bucket not found: {bucket}")))
}
BucketPolicyErrorKind::Other => Err(Error::General(format!(
"delete_bucket_policy: {error_text}"
))),
}
}
async fn read_next_part(
file: &mut tokio::fs::File,
file_path: &std::path::Path,
buffer: &mut [u8],
) -> Result<usize> {
let mut total_read = 0usize;
while total_read < buffer.len() {
let bytes_read = file
.read(&mut buffer[total_read..])
.await
.map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
if bytes_read == 0 {
break;
}
total_read += bytes_read;
}
Ok(total_read)
}
async fn put_object_single_part_from_path(
&self,
path: &RemotePath,
file_path: &std::path::Path,
content_type: Option<&str>,
file_size: u64,
) -> Result<ObjectInfo> {
let data = tokio::fs::read(file_path)
.await
.map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
let body = aws_sdk_s3::primitives::ByteStream::from(data);
let mut request = self
.inner
.put_object()
.bucket(&path.bucket)
.key(&path.key)
.body(body);
if let Some(ct) = content_type {
request = request.content_type(ct);
}
let response = request
.send()
.await
.map_err(|e| Error::Network(e.to_string()))?;
let mut info = ObjectInfo::file(&path.key, file_size as i64);
if let Some(etag) = response.e_tag() {
info.etag = Some(etag.trim_matches('"').to_string());
}
info.last_modified = Some(jiff::Timestamp::now());
Ok(info)
}
async fn abort_multipart_upload_best_effort(&self, path: &RemotePath, upload_id: &str) {
let _ = self
.inner
.abort_multipart_upload()
.bucket(&path.bucket)
.key(&path.key)
.upload_id(upload_id)
.send()
.await;
}
async fn put_object_multipart_from_path(
&self,
path: &RemotePath,
file_path: &std::path::Path,
content_type: Option<&str>,
file_size: u64,
on_progress: impl Fn(u64) + Send,
) -> Result<ObjectInfo> {
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
let config = crate::multipart::MultipartConfig::default();
let part_size = config.calculate_part_size(file_size);
let part_buffer_size = usize::try_from(part_size)
.map_err(|_| Error::General(format!("invalid part size: {part_size}")))?;
tracing::debug!(file_size, part_size, "Starting multipart upload");
let mut create_request = self
.inner
.create_multipart_upload()
.bucket(&path.bucket)
.key(&path.key);
if let Some(ct) = content_type {
create_request = create_request.content_type(ct);
}
let create_response = create_request
.send()
.await
.map_err(|e| Error::Network(format!("create multipart upload: {e}")))?;
let upload_id = create_response
.upload_id()
.ok_or_else(|| Error::General("missing upload id from multipart upload".to_string()))?
.to_string();
tracing::debug!(upload_id = %upload_id, "Multipart upload initiated");
let mut file = tokio::fs::File::open(file_path)
.await
.map_err(|e| Error::General(format!("open file '{}': {e}", file_path.display())))?;
let mut completed_parts = Vec::new();
let mut part_number: i32 = 1;
let mut chunk = vec![0u8; part_buffer_size];
let mut bytes_uploaded: u64 = 0;
loop {
let bytes_read = Self::read_next_part(&mut file, file_path, &mut chunk).await?;
if bytes_read == 0 {
break;
}
tracing::debug!(part_number, bytes_read, "Uploading part");
let body = aws_sdk_s3::primitives::ByteStream::from(chunk[..bytes_read].to_vec());
let upload_part_result = self
.inner
.upload_part()
.bucket(&path.bucket)
.key(&path.key)
.upload_id(&upload_id)
.part_number(part_number)
.body(body)
.send()
.await;
let upload_part_response = match upload_part_result {
Ok(response) => response,
Err(e) => {
tracing::debug!(
upload_id = %upload_id,
part_number,
"Aborting multipart upload due to error"
);
self.abort_multipart_upload_best_effort(path, &upload_id)
.await;
return Err(Error::Network(format!(
"upload multipart part {part_number}: {e}"
)));
}
};
let etag = match upload_part_response.e_tag() {
Some(value) => value.trim_matches('"').to_string(),
None => {
self.abort_multipart_upload_best_effort(path, &upload_id)
.await;
return Err(Error::General(format!(
"missing ETag for multipart part {part_number}"
)));
}
};
completed_parts.push(
CompletedPart::builder()
.part_number(part_number)
.e_tag(etag)
.build(),
);
bytes_uploaded += bytes_read as u64;
on_progress(bytes_uploaded);
tracing::debug!(part_number, bytes_uploaded, "Part uploaded");
part_number += 1;
}
let completed_upload = CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
let complete_result = self
.inner
.complete_multipart_upload()
.bucket(&path.bucket)
.key(&path.key)
.upload_id(&upload_id)
.multipart_upload(completed_upload)
.send()
.await;
let complete_response = match complete_result {
Ok(response) => response,
Err(e) => {
tracing::debug!(upload_id = %upload_id, "Attempting to abort multipart upload after completion failure");
self.abort_multipart_upload_best_effort(path, &upload_id)
.await;
return Err(Error::Network(format!("complete multipart upload: {e}")));
}
};
tracing::debug!("Multipart upload completed");
let mut info = ObjectInfo::file(&path.key, file_size as i64);
if let Some(etag) = complete_response.e_tag() {
info.etag = Some(etag.trim_matches('"').to_string());
}
info.last_modified = Some(jiff::Timestamp::now());
Ok(info)
}
pub async fn put_object_from_path(
&self,
path: &RemotePath,
file_path: &std::path::Path,
content_type: Option<&str>,
on_progress: impl Fn(u64) + Send,
) -> Result<ObjectInfo> {
let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
Error::General(format!("read metadata for '{}': {e}", file_path.display()))
})?;
if !metadata.is_file() {
return Err(Error::General(format!(
"source is not a file: {}",
file_path.display()
)));
}
let file_size = metadata.len();
if Self::should_use_multipart(file_size) {
self.put_object_multipart_from_path(
path,
file_path,
content_type,
file_size,
on_progress,
)
.await
} else {
self.put_object_single_part_from_path(path, file_path, content_type, file_size)
.await
}
}
}
fn build_tagging(
tags: std::collections::HashMap<String, String>,
) -> Result<aws_sdk_s3::types::Tagging> {
use aws_sdk_s3::types::{Tag, Tagging};
let mut tag_set = Vec::with_capacity(tags.len());
for (key, value) in tags {
let tag = Tag::builder()
.key(key)
.value(value)
.build()
.map_err(|e| Error::General(format!("invalid tag: {e}")))?;
tag_set.push(tag);
}
Tagging::builder()
.set_tag_set(Some(tag_set))
.build()
.map_err(|e| Error::General(format!("invalid tagging payload: {e}")))
}
#[async_trait]
impl ObjectStore for S3Client {
async fn list_buckets(&self) -> Result<Vec<ObjectInfo>> {
let response = self
.inner
.list_buckets()
.send()
.await
.map_err(|e| Error::Network(e.to_string()))?;
let buckets = response
.buckets()
.iter()
.map(|b| {
let mut info = ObjectInfo::bucket(b.name().unwrap_or_default());
if let Some(creation_date) = b.creation_date() {
info.last_modified = jiff::Timestamp::from_second(creation_date.secs()).ok();
}
info
})
.collect();
Ok(buckets)
}
async fn list_objects(&self, path: &RemotePath, options: ListOptions) -> Result<ListResult> {
let mut request = self.inner.list_objects_v2().bucket(&path.bucket);
let prefix = if path.key.is_empty() {
options.prefix.clone()
} else if let Some(p) = &options.prefix {
Some(format!("{}{}", path.key, p))
} else {
Some(path.key.clone())
};
if let Some(p) = prefix {
request = request.prefix(p);
}
if !options.recursive {
request = request.delimiter(options.delimiter.as_deref().unwrap_or("/"));
}
if let Some(max) = options.max_keys {
request = request.max_keys(max);
}
if let Some(token) = &options.continuation_token {
request = request.continuation_token(token);
}
let response = request.send().await.map_err(|e| {
let err_str = Self::format_sdk_error(&e);
if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
Error::NotFound(format!("Bucket not found: {}", path.bucket))
} else {
Error::Network(err_str)
}
})?;
let mut items = Vec::new();
for prefix in response.common_prefixes() {
if let Some(p) = prefix.prefix() {
items.push(ObjectInfo::dir(p));
}
}
for object in response.contents() {
let key = object.key().unwrap_or_default().to_string();
let size = object.size().unwrap_or(0);
let mut info = ObjectInfo::file(&key, size);
if let Some(modified) = object.last_modified() {
info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
}
if let Some(etag) = object.e_tag() {
info.etag = Some(etag.trim_matches('"').to_string());
}
if let Some(sc) = object.storage_class() {
info.storage_class = Some(sc.as_str().to_string());
}
items.push(info);
}
Ok(ListResult {
items,
truncated: response.is_truncated().unwrap_or(false),
continuation_token: response.next_continuation_token().map(|s| s.to_string()),
})
}
async fn head_object(&self, path: &RemotePath) -> Result<ObjectInfo> {
let response = self
.inner
.head_object()
.bucket(&path.bucket)
.key(&path.key)
.send()
.await
.map_err(|e| {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
Error::NotFound(path.to_string())
} else {
Error::Network(err_str)
}
})?;
let size = response.content_length().unwrap_or(0);
let mut info = ObjectInfo::file(&path.key, size);
if let Some(modified) = response.last_modified() {
info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
}
if let Some(etag) = response.e_tag() {
info.etag = Some(etag.trim_matches('"').to_string());
}
if let Some(ct) = response.content_type() {
info.content_type = Some(ct.to_string());
}
if let Some(sc) = response.storage_class() {
info.storage_class = Some(sc.as_str().to_string());
}
if let Some(meta) = response.metadata()
&& !meta.is_empty()
{
info.metadata = Some(meta.clone());
}
Ok(info)
}
async fn bucket_exists(&self, bucket: &str) -> Result<bool> {
match self.inner.head_bucket().bucket(bucket).send().await {
Ok(_) => Ok(true),
Err(e) => {
if let aws_sdk_s3::error::SdkError::ServiceError(ref service_err) = e
&& service_err.raw().status().as_u16() == 404
{
return Ok(false);
}
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
return Ok(false);
}
Err(Error::Network(err_str))
}
}
}
async fn create_bucket(&self, bucket: &str) -> Result<()> {
self.inner
.create_bucket()
.bucket(bucket)
.send()
.await
.map_err(|e| Error::Network(e.to_string()))?;
Ok(())
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
self.inner
.delete_bucket()
.bucket(bucket)
.send()
.await
.map_err(|e| {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
Error::NotFound(format!("Bucket not found: {bucket}"))
} else {
Error::Network(err_str)
}
})?;
Ok(())
}
async fn capabilities(&self) -> Result<Capabilities> {
Ok(Capabilities {
versioning: true,
object_lock: false,
tagging: true,
anonymous: true,
select: false,
notifications: false,
})
}
async fn get_object(&self, path: &RemotePath) -> Result<Vec<u8>> {
let response = self
.inner
.get_object()
.bucket(&path.bucket)
.key(&path.key)
.send()
.await
.map_err(|e| {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
Error::NotFound(path.to_string())
} else {
Error::Network(err_str)
}
})?;
let data = response
.body
.collect()
.await
.map_err(|e| Error::Network(e.to_string()))?
.into_bytes()
.to_vec();
Ok(data)
}
async fn put_object(
&self,
path: &RemotePath,
data: Vec<u8>,
content_type: Option<&str>,
) -> Result<ObjectInfo> {
let size = data.len() as i64;
let body = aws_sdk_s3::primitives::ByteStream::from(data);
let mut request = self
.inner
.put_object()
.bucket(&path.bucket)
.key(&path.key)
.body(body);
if let Some(ct) = content_type {
request = request.content_type(ct);
}
let response = request
.send()
.await
.map_err(|e| Error::Network(e.to_string()))?;
let mut info = ObjectInfo::file(&path.key, size);
if let Some(etag) = response.e_tag() {
info.etag = Some(etag.trim_matches('"').to_string());
}
info.last_modified = Some(jiff::Timestamp::now());
Ok(info)
}
async fn delete_object(&self, path: &RemotePath) -> Result<()> {
self.inner
.delete_object()
.bucket(&path.bucket)
.key(&path.key)
.send()
.await
.map_err(|e| {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
Error::NotFound(path.to_string())
} else {
Error::Network(err_str)
}
})?;
Ok(())
}
async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<Vec<String>> {
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
if keys.is_empty() {
return Ok(vec![]);
}
let objects: Vec<ObjectIdentifier> = keys
.iter()
.map(|k| ObjectIdentifier::builder().key(k).build().unwrap())
.collect();
let delete = Delete::builder()
.set_objects(Some(objects))
.build()
.map_err(|e| Error::General(e.to_string()))?;
let response = self
.inner
.delete_objects()
.bucket(bucket)
.delete(delete)
.send()
.await
.map_err(|e| Error::Network(e.to_string()))?;
let deleted: Vec<String> = response
.deleted()
.iter()
.filter_map(|d| d.key().map(|k| k.to_string()))
.collect();
if !response.errors().is_empty() {
let error_keys: Vec<String> = response
.errors()
.iter()
.filter_map(|e| e.key().map(|k| k.to_string()))
.collect();
tracing::warn!("Failed to delete some objects: {:?}", error_keys);
}
Ok(deleted)
}
async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> Result<ObjectInfo> {
let copy_source = format!("{}/{}", src.bucket, src.key);
let response = self
.inner
.copy_object()
.copy_source(©_source)
.bucket(&dst.bucket)
.key(&dst.key)
.send()
.await
.map_err(|e| {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
Error::NotFound(src.to_string())
} else {
Error::Network(err_str)
}
})?;
let info = self.head_object(dst).await?;
let mut result = info;
if let Some(copy_result) = response.copy_object_result()
&& let Some(etag) = copy_result.e_tag()
{
result.etag = Some(etag.trim_matches('"').to_string());
}
Ok(result)
}
async fn presign_get(&self, path: &RemotePath, expires_secs: u64) -> Result<String> {
let config = aws_sdk_s3::presigning::PresigningConfig::builder()
.expires_in(std::time::Duration::from_secs(expires_secs))
.build()
.map_err(|e| Error::General(format!("presign_get config: {e}")))?;
let request = self
.inner
.get_object()
.bucket(&path.bucket)
.key(&path.key)
.presigned(config)
.await
.map_err(|e| Error::General(format!("presign_get: {e}")))?;
Ok(request.uri().to_string())
}
async fn presign_put(
&self,
path: &RemotePath,
expires_secs: u64,
content_type: Option<&str>,
) -> Result<String> {
let config = aws_sdk_s3::presigning::PresigningConfig::builder()
.expires_in(std::time::Duration::from_secs(expires_secs))
.build()
.map_err(|e| Error::General(format!("presign_put config: {e}")))?;
let mut builder = self.inner.put_object().bucket(&path.bucket).key(&path.key);
if let Some(ct) = content_type {
builder = builder.content_type(ct);
}
let request = builder
.presigned(config)
.await
.map_err(|e| Error::General(format!("presign_put: {e}")))?;
Ok(request.uri().to_string())
}
async fn get_versioning(&self, bucket: &str) -> Result<Option<bool>> {
let response = self
.inner
.get_bucket_versioning()
.bucket(bucket)
.send()
.await
.map_err(|e| Error::General(format!("get_versioning: {e}")))?;
Ok(response
.status()
.map(|s| *s == aws_sdk_s3::types::BucketVersioningStatus::Enabled))
}
async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()> {
use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
let status = if enabled {
BucketVersioningStatus::Enabled
} else {
BucketVersioningStatus::Suspended
};
let config = VersioningConfiguration::builder().status(status).build();
self.inner
.put_bucket_versioning()
.bucket(bucket)
.versioning_configuration(config)
.send()
.await
.map_err(|e| Error::General(format!("set_versioning: {e}")))?;
Ok(())
}
async fn list_object_versions(
&self,
path: &RemotePath,
max_keys: Option<i32>,
) -> Result<Vec<ObjectVersion>> {
let mut builder = self.inner.list_object_versions().bucket(&path.bucket);
if !path.key.is_empty() {
builder = builder.prefix(&path.key);
}
if let Some(max) = max_keys {
builder = builder.max_keys(max);
}
let response = builder
.send()
.await
.map_err(|e| Error::General(format!("list_object_versions: {e}")))?;
let mut versions = Vec::new();
for v in response.versions() {
versions.push(ObjectVersion {
key: v.key().unwrap_or_default().to_string(),
version_id: v.version_id().unwrap_or("null").to_string(),
is_latest: v.is_latest().unwrap_or(false),
is_delete_marker: false,
last_modified: v
.last_modified()
.and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
size_bytes: v.size(),
etag: v.e_tag().map(|s| s.trim_matches('"').to_string()),
});
}
for m in response.delete_markers() {
versions.push(ObjectVersion {
key: m.key().unwrap_or_default().to_string(),
version_id: m.version_id().unwrap_or("null").to_string(),
is_latest: m.is_latest().unwrap_or(false),
is_delete_marker: true,
last_modified: m
.last_modified()
.and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
size_bytes: None,
etag: None,
});
}
versions.sort_by(|a, b| {
a.key
.cmp(&b.key)
.then_with(|| b.last_modified.cmp(&a.last_modified))
});
Ok(versions)
}
async fn get_object_tags(
&self,
path: &RemotePath,
) -> Result<std::collections::HashMap<String, String>> {
let response = match self
.inner
.get_object_tagging()
.bucket(&path.bucket)
.key(&path.key)
.send()
.await
{
Ok(response) => response,
Err(e) => {
if e.to_string().contains("NoSuchTagSet") {
return Ok(std::collections::HashMap::new());
}
return Err(Error::General(format!("get_object_tags: {e}")));
}
};
let mut tags = std::collections::HashMap::new();
for tag in response.tag_set() {
let key = tag.key();
let value = tag.value();
tags.insert(key.to_string(), value.to_string());
}
Ok(tags)
}
async fn get_bucket_tags(
&self,
bucket: &str,
) -> Result<std::collections::HashMap<String, String>> {
let response = match self.inner.get_bucket_tagging().bucket(bucket).send().await {
Ok(response) => response,
Err(e) => {
if e.to_string().contains("NoSuchTagSet") {
return Ok(std::collections::HashMap::new());
}
return Err(Error::General(format!("get_bucket_tags: {e}")));
}
};
let mut tags = std::collections::HashMap::new();
for tag in response.tag_set() {
let key = tag.key();
let value = tag.value();
tags.insert(key.to_string(), value.to_string());
}
Ok(tags)
}
async fn set_object_tags(
&self,
path: &RemotePath,
tags: std::collections::HashMap<String, String>,
) -> Result<()> {
let tagging = build_tagging(tags)?;
self.inner
.put_object_tagging()
.bucket(&path.bucket)
.key(&path.key)
.tagging(tagging)
.send()
.await
.map_err(|e| Error::General(format!("set_object_tags: {e}")))?;
Ok(())
}
async fn set_bucket_tags(
&self,
bucket: &str,
tags: std::collections::HashMap<String, String>,
) -> Result<()> {
let tagging = build_tagging(tags)?;
self.inner
.put_bucket_tagging()
.bucket(bucket)
.tagging(tagging)
.send()
.await
.map_err(|e| Error::General(format!("set_bucket_tags: {e}")))?;
Ok(())
}
async fn delete_object_tags(&self, path: &RemotePath) -> Result<()> {
self.inner
.delete_object_tagging()
.bucket(&path.bucket)
.key(&path.key)
.send()
.await
.map_err(|e| Error::General(format!("delete_object_tags: {e}")))?;
Ok(())
}
async fn delete_bucket_tags(&self, bucket: &str) -> Result<()> {
self.inner
.delete_bucket_tagging()
.bucket(bucket)
.send()
.await
.map_err(|e| Error::General(format!("delete_bucket_tags: {e}")))?;
Ok(())
}
async fn get_bucket_policy(&self, bucket: &str) -> Result<Option<String>> {
let response = match self.inner.get_bucket_policy().bucket(bucket).send().await {
Ok(policy) => policy,
Err(error) => {
let error_text = error.to_string();
let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
let code = service_err
.raw()
.headers()
.get("x-amz-error-code")
.and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
let status = Some(service_err.raw().status().as_u16());
Self::bucket_policy_error_kind(code, status, &error_text)
} else {
Self::bucket_policy_error_kind(None, None, &error_text)
};
return Self::map_get_bucket_policy_error(bucket, kind, &error_text);
}
};
Ok(response.policy().map(|policy| policy.to_string()))
}
async fn set_bucket_policy(&self, bucket: &str, policy: &str) -> Result<()> {
self.inner
.put_bucket_policy()
.bucket(bucket)
.policy(policy)
.send()
.await
.map_err(|e| Error::General(format!("set_bucket_policy: {e}")))?;
Ok(())
}
async fn delete_bucket_policy(&self, bucket: &str) -> Result<()> {
match self
.inner
.delete_bucket_policy()
.bucket(bucket)
.send()
.await
{
Ok(_) => Ok(()),
Err(e) => {
let error_text = e.to_string();
let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e {
let code = service_err
.raw()
.headers()
.get("x-amz-error-code")
.and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
let status = Some(service_err.raw().status().as_u16());
Self::bucket_policy_error_kind(code, status, &error_text)
} else {
Self::bucket_policy_error_kind(None, None, &error_text)
};
Self::map_delete_bucket_policy_error(bucket, kind, &error_text)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_object_info_creation() {
let info = ObjectInfo::file("test.txt", 1024);
assert_eq!(info.key, "test.txt");
assert_eq!(info.size_bytes, Some(1024));
}
#[test]
fn bucket_policy_error_kind_uses_error_code() {
assert_eq!(
S3Client::bucket_policy_error_kind(Some("NoSuchBucketPolicy"), Some(404), ""),
BucketPolicyErrorKind::MissingPolicy
);
assert_eq!(
S3Client::bucket_policy_error_kind(Some("NoSuchBucket"), Some(404), ""),
BucketPolicyErrorKind::MissingBucket
);
}
#[test]
fn bucket_policy_error_kind_prefers_bucket_not_found_over_404_fallback() {
assert_eq!(
S3Client::bucket_policy_error_kind(None, Some(404), "NoSuchBucket"),
BucketPolicyErrorKind::MissingBucket
);
assert_eq!(
S3Client::bucket_policy_error_kind(None, Some(404), "no details"),
BucketPolicyErrorKind::MissingPolicy
);
}
#[test]
fn bucket_policy_error_mapping_returns_expected_result() {
let get_missing_policy = S3Client::map_get_bucket_policy_error(
"bucket",
BucketPolicyErrorKind::MissingPolicy,
"NoSuchPolicy",
)
.expect("missing policy should map to Ok(None)");
assert!(get_missing_policy.is_none());
match S3Client::map_get_bucket_policy_error(
"bucket",
BucketPolicyErrorKind::MissingBucket,
"NoSuchBucket",
) {
Err(Error::NotFound(message)) => assert!(message.contains("Bucket not found")),
other => panic!("Expected NotFound for missing bucket, got: {:?}", other),
}
let delete_missing_policy = S3Client::map_delete_bucket_policy_error(
"bucket",
BucketPolicyErrorKind::MissingPolicy,
"NoSuchPolicy",
);
assert!(
delete_missing_policy.is_ok(),
"Missing policy should be treated as successful delete"
);
}
#[tokio::test]
async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
let connector = ReqwestConnector::new(true, None).await;
assert!(
connector.is_ok(),
"Expected insecure connector creation to succeed"
);
}
#[tokio::test]
async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
let result = ReqwestConnector::new(false, Some("")).await;
match result {
Err(Error::Network(msg)) => {
assert!(
msg.contains("Failed to read CA bundle"),
"Unexpected error message: {msg}"
);
}
other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
}
}
#[test]
fn should_use_multipart_for_large_files() {
assert!(S3Client::should_use_multipart(
SINGLE_PUT_OBJECT_MAX_SIZE + 1
));
}
#[test]
fn should_use_single_part_for_small_files() {
assert!(!S3Client::should_use_multipart(0));
assert!(!S3Client::should_use_multipart(1024 * 1024));
assert!(!S3Client::should_use_multipart(
crate::multipart::DEFAULT_PART_SIZE
));
assert!(!S3Client::should_use_multipart(SINGLE_PUT_OBJECT_MAX_SIZE));
}
#[tokio::test]
async fn read_next_part_fills_buffer_until_eof() {
use tokio::io::AsyncWriteExt;
let temp_dir = tempfile::tempdir().expect("create temp dir");
let file_path = temp_dir.path().join("payload.bin");
let mut writer = tokio::fs::File::create(&file_path)
.await
.expect("create temp file");
writer
.write_all(b"abcdefghij")
.await
.expect("write temp file");
writer.flush().await.expect("flush temp file");
drop(writer);
let mut reader = tokio::fs::File::open(&file_path)
.await
.expect("open temp file");
let mut buffer = vec![0u8; 8];
let first = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
.await
.expect("first read");
assert_eq!(first, 8);
assert_eq!(&buffer[..first], b"abcdefgh");
let second = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
.await
.expect("second read");
assert_eq!(second, 2);
assert_eq!(&buffer[..second], b"ij");
let third = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
.await
.expect("third read");
assert_eq!(third, 0);
}
}