use actix_web::body::{BoxBody, MessageBody};
use actix_web::dev::{ServiceRequest, ServiceResponse};
use actix_web::middleware::Next;
use actix_web::{Error, HttpRequest, HttpResponse, web, web::Json};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::RequestId;
use aws_sdk_s3::operation::copy_object::builders::CopyObjectFluentBuilder;
use aws_sdk_s3::operation::create_multipart_upload::builders::CreateMultipartUploadFluentBuilder;
use aws_sdk_s3::operation::list_objects_v2::builders::ListObjectsV2FluentBuilder;
use aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder;
use aws_sdk_s3::presigning::PresigningConfig;
use aws_sdk_s3::types::{
AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, ExpirationStatus,
LifecycleExpiration, LifecycleRule, LifecycleRuleFilter, MetadataDirective,
NoncurrentVersionExpiration, ObjectCannedAcl, PublicAccessBlockConfiguration,
ServerSideEncryption,
};
use aws_smithy_types::date_time::Format as AwsDateTimeFormat;
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
use chrono::Utc;
use hmac::{Hmac, Mac};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sha2::Sha256;
use sqlx::postgres::PgPool;
use std::collections::HashMap;
use std::net::IpAddr;
use std::path::PathBuf;
use std::process::Output;
use std::time::Duration;
use tokio::fs;
use tokio::process::Command;
pub mod auth;
pub mod digitalocean;
pub(crate) mod errors;
pub mod files;
pub mod hetzner;
pub mod minio;
pub mod s3;
pub(crate) mod service;
pub(crate) mod validation;
use tracing::warn;
use which::which;
use crate::AppState;
use crate::api::client_context::required_client_pool;
use crate::api::gateway::auth::{require_admin_or_gateway, storage_proxy_right};
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::rate_limit::check_inbound_optional;
use crate::api::response::{
api_success, bad_request, conflict, internal_error, not_found, service_unavailable,
};
use crate::api::s3 as storage_catalog;
use errors::{
STORAGE_ERROR_CODE_STORAGE_LIST_BUCKETS_FAILED, STORAGE_ERROR_CODE_STORAGE_LIST_OBJECTS_FAILED,
describe_s3_error, map_s3_operation_error,
};
use service::{PRESIGN_URL_EXPIRY_SECS, TempPathCleanup, build_s3_client, ensure_bucket_present};
/// Connection settings for an S3-compatible endpoint, deserialized from request input.
///
/// `validate_s3_connection` checks these fields; `build_connection_client` passes them to
/// `build_s3_client`.
///
/// NOTE(review): the derived `Debug` impl prints `secret_key` verbatim — avoid logging this
/// struct with `{:?}`.
#[derive(Debug, Deserialize)]
struct S3ConnectionConfig {
/// Endpoint host or URL. `normalize_endpoint_url` prepends `https://` when no
/// `http://`/`https://` scheme is present.
endpoint: String,
/// Region name handed to the S3 client.
region: String,
/// Access key identifier.
access_key_id: String,
/// Secret access key paired with `access_key_id`.
secret_key: String,
}
/// Connection settings plus the bucket they are used against.
///
/// The connection fields are flattened, so they are read from the same JSON object level as
/// `bucket`.
#[derive(Debug, Deserialize)]
struct S3Creds {
/// Endpoint, region and key pair (flattened into the parent object).
#[serde(flatten)]
connection: S3ConnectionConfig,
/// Target bucket name; checked by `validate_s3_creds`.
bucket: String,
}
/// JSON body accepted by `list_buckets`: only the (flattened) connection settings, no bucket.
#[derive(Debug, Deserialize)]
struct ListBucketsRequest {
/// Endpoint, region and key pair (flattened into the parent object).
#[serde(flatten)]
connection: S3ConnectionConfig,
}
/// Deserializable input for an object listing: bucket credentials plus optional paging and
/// filtering fields.
#[derive(Debug, Deserialize)]
struct ListObjectsRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Optional key prefix; `None` when omitted.
#[serde(default)]
prefix: Option<String>,
/// Optional delimiter; `None` when omitted.
#[serde(default)]
delimiter: Option<String>,
/// Optional continuation token; `None` when omitted.
#[serde(default)]
continuation_token: Option<String>,
/// Page size; falls back to `default_max_keys()` (200) when omitted.
#[serde(default = "default_max_keys")]
max_keys: i32,
}
/// Serde default for `ListObjectsRequest::max_keys` when the field is omitted.
fn default_max_keys() -> i32 {
    const DEFAULT_MAX_KEYS: i32 = 200;
    DEFAULT_MAX_KEYS
}
/// Deserializable input addressing a single object, with optional integrity hints.
#[derive(Debug, Deserialize)]
struct GetObjectRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Object key.
key: String,
/// Optional SHA-256 checksum supplied by the caller; how it is used is outside this view.
#[serde(default)]
checksum_sha256: Option<String>,
/// Optional ETag supplied by the caller; how it is used is outside this view.
#[serde(default)]
etag: Option<String>,
}
/// Deserializable input for copying an object.
#[derive(Debug, Deserialize)]
struct CopyObjectRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Key of the object to copy from.
source_key: String,
/// Key to copy to.
destination_key: String,
/// Optional destination bucket; `None` when omitted.
// NOTE(review): presumably falls back to `creds.bucket` — confirm in the handler.
#[serde(default)]
destination_bucket: Option<String>,
/// Optional server-side-encryption settings, read from the same JSON object level.
#[serde(default, flatten)]
encryption: StorageServerSideEncryptionRequest,
}
/// Deserializable input for deriving an object's public URL.
///
/// The field set mirrors the parameters of `build_public_object_url`.
#[derive(Debug, Deserialize)]
struct ObjectPublicUrlRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Object key.
key: String,
/// Optional base URL to use instead of the storage endpoint.
#[serde(default)]
public_base_url: Option<String>,
/// Optional switch requesting path-style (`host/bucket/key`) addressing.
#[serde(default)]
force_path_style: Option<bool>,
}
/// Deserializable input identifying one object to delete.
#[derive(Debug, Deserialize)]
struct DeleteObjectRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Object key.
key: String,
}
/// Deserializable input naming a folder (key prefix) to create.
#[derive(Debug, Deserialize)]
struct CreateFolderRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Folder prefix.
prefix: String,
}
/// Deserializable input naming a folder (key prefix) to delete.
#[derive(Debug, Deserialize)]
struct DeleteFolderRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Folder prefix.
prefix: String,
}
/// Deserializable input for renaming a folder (key prefix).
#[derive(Debug, Deserialize)]
struct RenameFolderRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Existing prefix.
from_prefix: String,
/// New prefix.
to_prefix: String,
}
/// Deserializable input for presigning an upload of a single object.
#[derive(Debug, Deserialize)]
struct PresignUploadRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Object key to upload to.
key: String,
/// Optional content type; serde treats a missing `Option` field as `None`.
content_type: Option<String>,
/// Optional server-side-encryption settings, read from the same JSON object level.
#[serde(default, flatten)]
encryption: StorageServerSideEncryptionRequest,
}
/// Deserializable input for updating an object's ACL, standard headers and user metadata.
///
/// Everything except `creds` and `key` is optional and defaults to `None`.
/// `build_storage_update_request_payload` echoes all of these fields (but not the credentials)
/// into the tracking payload.
#[derive(Debug, Deserialize)]
struct UpdateObjectRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Object key.
key: String,
/// Optional canned ACL name; see `parse_object_acl` for the accepted spelling.
#[serde(default)]
acl: Option<String>,
/// Optional `Content-Type` value.
#[serde(default)]
content_type: Option<String>,
/// Optional `Cache-Control` value.
#[serde(default)]
cache_control: Option<String>,
/// Optional `Content-Disposition` value.
#[serde(default)]
content_disposition: Option<String>,
/// Optional `Content-Encoding` value.
#[serde(default)]
content_encoding: Option<String>,
/// Optional `Content-Language` value.
#[serde(default)]
content_language: Option<String>,
/// Optional user metadata map; limits are enforced by `normalize_metadata_input`.
#[serde(default)]
metadata: Option<HashMap<String, String>>,
}
/// Deserializable input carrying only bucket credentials, for a bucket CORS operation.
#[derive(Debug, Deserialize)]
struct BucketCorsRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
}
/// One CORS rule as supplied by the caller; rendered to XML by `build_cors_xml`.
#[derive(Debug, Deserialize)]
struct BucketCorsRuleInput {
/// Allowed origins; `build_cors_xml` requires at least one non-blank entry.
allowed_origins: Vec<String>,
/// Allowed HTTP methods; upper-cased and checked against `method_is_supported`.
allowed_methods: Vec<String>,
/// Allowed request headers; `build_cors_xml` substitutes `*` when none are given.
#[serde(default)]
allowed_headers: Vec<String>,
/// Response headers to expose; may be empty.
#[serde(default)]
expose_headers: Vec<String>,
/// Optional `MaxAgeSeconds` value; omitted from the XML when `None`.
#[serde(default)]
max_age_seconds: Option<u32>,
}
/// Deserializable input for replacing a bucket's CORS rules.
#[derive(Debug, Deserialize)]
struct SetBucketCorsRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// CORS rules to apply.
rules: Vec<BucketCorsRuleInput>,
}
/// Optional server-side-encryption settings shared by several request types.
///
/// All fields default to `None`; the `apply_sse_to_*` helpers only touch the S3 request
/// builder for fields that are set and non-blank.
#[derive(Debug, Default, Deserialize)]
struct StorageServerSideEncryptionRequest {
/// Encryption algorithm name (also accepted as `sse`); canonicalised by
/// `normalize_sse_value`.
#[serde(default, alias = "sse")]
server_side_encryption: Option<String>,
/// KMS key identifier (also accepted as `kms_key_id`).
#[serde(default, alias = "kms_key_id")]
ssekms_key_id: Option<String>,
/// Optional bucket-key flag forwarded to the request builder.
#[serde(default)]
bucket_key_enabled: Option<bool>,
}
/// Deserializable input for listing object versions; every filter and paging field is optional.
#[derive(Debug, Deserialize)]
struct ObjectVersionsRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Optional key to restrict the listing to.
// NOTE(review): whether this is an exact key or a prefix is decided by the handler — confirm.
#[serde(default)]
key: Option<String>,
/// Optional page size.
#[serde(default)]
max_keys: Option<i32>,
/// Optional key marker for pagination.
#[serde(default)]
key_marker: Option<String>,
/// Optional version-id marker for pagination.
#[serde(default)]
version_id_marker: Option<String>,
/// Optional delimiter.
#[serde(default)]
delimiter: Option<String>,
}
/// Deserializable input identifying one specific version of an object to act on.
#[derive(Debug, Deserialize)]
struct ObjectVersionMutationRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Object key.
key: String,
/// Version identifier of the object.
version_id: String,
}
/// Deserializable input carrying only bucket credentials, for a bucket lifecycle operation.
#[derive(Debug, Deserialize)]
struct BucketLifecycleRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
}
/// One lifecycle rule as supplied by the caller; converted by `build_lifecycle_rule`.
///
/// Every field is optional. The `*_days` fields must be positive when present
/// (`normalize_positive_days`).
#[derive(Debug, Deserialize)]
struct BucketLifecycleRuleInput {
/// Optional rule id; blank values are ignored.
#[serde(default)]
id: Option<String>,
/// Optional key prefix used as the rule filter; blank values produce an empty filter.
#[serde(default)]
prefix: Option<String>,
/// `Enabled`/`enabled` or `Disabled`/`disabled`; treated as `Enabled` when absent or blank.
#[serde(default)]
status: Option<String>,
/// Days until current versions expire.
#[serde(default)]
expiration_days: Option<i32>,
/// Value for the expiration's `expired_object_delete_marker` flag.
#[serde(default)]
expired_object_delete_marker: Option<bool>,
/// Days until noncurrent versions expire.
#[serde(default)]
noncurrent_version_expiration_days: Option<i32>,
/// Days after initiation at which incomplete multipart uploads are aborted.
#[serde(default)]
abort_incomplete_multipart_upload_days: Option<i32>,
}
/// Deserializable input for replacing a bucket's lifecycle rules.
#[derive(Debug, Deserialize)]
struct SetBucketLifecycleRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Lifecycle rules to apply.
rules: Vec<BucketLifecycleRuleInput>,
}
/// Deserializable input carrying only bucket credentials, for a bucket policy operation.
#[derive(Debug, Deserialize)]
struct BucketPolicyRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
}
/// Deserializable input for setting a bucket policy.
#[derive(Debug, Deserialize)]
struct SetBucketPolicyRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Policy document as arbitrary JSON; no structure is enforced at deserialization time.
policy: Value,
}
/// Deserializable input carrying only bucket credentials, for a public-access-block operation.
#[derive(Debug, Deserialize)]
struct PublicAccessBlockRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
}
/// Deserializable input for setting a bucket's public-access-block flags.
///
/// Each flag is optional and `None` when omitted; the value used in that case is decided by
/// the handler, which is outside this view.
#[derive(Debug, Deserialize)]
struct SetPublicAccessBlockRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Optional `BlockPublicAcls` flag.
#[serde(default)]
block_public_acls: Option<bool>,
/// Optional `IgnorePublicAcls` flag.
#[serde(default)]
ignore_public_acls: Option<bool>,
/// Optional `BlockPublicPolicy` flag.
#[serde(default)]
block_public_policy: Option<bool>,
/// Optional `RestrictPublicBuckets` flag.
#[serde(default)]
restrict_public_buckets: Option<bool>,
}
/// Deserializable input for generating a signed POST policy for one object key.
///
/// Only `creds` and `key` are required; every other field defaults to `None`.
#[derive(Debug, Deserialize)]
struct SignedPostPolicyRequest {
/// Connection settings and bucket (flattened into the parent object).
#[serde(flatten)]
creds: S3Creds,
/// Object key the upload is for.
key: String,
/// Optional content type.
#[serde(default)]
content_type: Option<String>,
/// Optional lower size bound. NOTE(review): presumably bytes — confirm in the handler.
#[serde(default)]
min_size: Option<i64>,
/// Optional upper size bound. NOTE(review): presumably bytes — confirm in the handler.
#[serde(default)]
max_size: Option<i64>,
/// Optional expiry. NOTE(review): presumably seconds — confirm in the handler.
#[serde(default)]
expires_in: Option<u64>,
/// Optional base URL to use instead of the storage endpoint.
#[serde(default)]
public_base_url: Option<String>,
/// Optional switch requesting path-style addressing.
#[serde(default)]
force_path_style: Option<bool>,
/// Optional `success_action_status` value.
#[serde(default)]
success_action_status: Option<String>,
/// Optional server-side-encryption settings, read from the same JSON object level.
#[serde(default, flatten)]
encryption: StorageServerSideEncryptionRequest,
}
/// Serialized summary of one bucket, as produced by `list_buckets`.
#[derive(Debug, Serialize)]
struct StorageBucketSummary {
/// Bucket name.
name: String,
/// Creation timestamp as a string; `list_buckets` uses an empty string when it is unknown.
creation_date: String,
}
/// Serialized bucket listing.
#[derive(Debug, Serialize)]
struct StorageBucketListResponse {
/// Bucket summaries.
buckets: Vec<StorageBucketSummary>,
/// Bucket count. NOTE(review): presumably `buckets.len()` — confirm where it is built.
count: usize,
}
/// Serialized entry of an object listing.
#[derive(Debug, Serialize)]
struct S3Object {
/// Object key.
key: String,
/// Object size. NOTE(review): presumably bytes — confirm where it is populated.
size: i64,
/// Last-modified timestamp rendered as a string.
last_modified: String,
/// Whether the entry represents a folder; the rule deciding this is outside this view.
is_folder: bool,
/// ETag, when available.
etag: Option<String>,
/// Storage class, when available.
storage_class: Option<String>,
}
/// Serialized result of an object listing, including paging state.
#[derive(Debug, Serialize)]
struct ListObjectsResponse {
/// Listed objects.
objects: Vec<S3Object>,
/// Common prefixes found in the listing.
common_prefixes: Vec<String>,
/// Whether more results are available.
is_truncated: bool,
/// Token for fetching the next page, when there is one.
next_continuation_token: Option<String>,
/// Prefix associated with the listing, if any.
prefix: Option<String>,
}
/// Serialized object metadata.
// NOTE(review): presumably filled from an S3 HeadObject result — confirm in the handler.
#[derive(Debug, Serialize)]
struct HeadObjectResponse {
/// Bucket the object lives in.
bucket: String,
/// Object key.
key: String,
/// Object length. NOTE(review): presumably bytes — confirm where it is populated.
content_length: i64,
/// `Content-Type`, when available.
content_type: Option<String>,
/// `Cache-Control`, when available.
cache_control: Option<String>,
/// `Content-Disposition`, when available.
content_disposition: Option<String>,
/// `Content-Encoding`, when available.
content_encoding: Option<String>,
/// `Content-Language`, when available.
content_language: Option<String>,
/// Last-modified timestamp rendered as a string.
last_modified: String,
/// ETag, when available.
etag: Option<String>,
/// Version id, when available.
version_id: Option<String>,
/// Storage class, when available.
storage_class: Option<String>,
/// User metadata as a JSON object.
metadata: serde_json::Map<String, Value>,
}
/// Serialized outcome of an object update.
#[derive(Debug, Serialize)]
struct UpdateObjectResponse {
/// Bucket of the updated object.
bucket: String,
/// Key of the updated object.
key: String,
/// Number of metadata keys written.
updated_metadata_keys: usize,
/// ACL that was applied, if any.
applied_acl: Option<String>,
/// Optional warning text for the caller.
warning: Option<String>,
}
/// Everything needed to record an object update in the Athena storage file catalog.
///
/// Built by `prepare_storage_update_tracking` and consumed by
/// `persist_storage_update_tracking`.
#[derive(Clone)]
struct StorageUpdateTrackingContext {
/// Database pool the tracking record is written to.
pool: PgPool,
/// Non-empty organization id taken from the request headers.
organization_id: String,
/// User id taken from the request headers, when present and non-blank.
user_id: Option<String>,
/// Route label stored with the record (`/storage/objects/update`).
route: String,
/// Bucket of the object being updated.
bucket: String,
/// Key of the object being updated.
key: String,
/// Snapshot of the request fields, built without connection credentials.
request_payload: Value,
}
// Limits enforced by `normalize_metadata_input`.
/// Maximum number of key/value pairs accepted in one metadata map.
const MAX_OBJECT_METADATA_ENTRIES: usize = 128;
/// Maximum length of a trimmed metadata key, compared against `str::len()` (UTF-8 bytes).
const MAX_OBJECT_METADATA_KEY_LEN: usize = 128;
/// Maximum length of a metadata value, compared against `str::len()` (UTF-8 bytes).
const MAX_OBJECT_METADATA_VALUE_LEN: usize = 2048;
/// Trims `value` and returns it as an owned string, or `None` when the input is absent or
/// contains only whitespace.
fn normalize_optional_string(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Returns `true` when `value` contains a control character other than line feed, carriage
/// return or tab.
fn contains_control_chars(value: &str) -> bool {
    for ch in value.chars() {
        let is_tolerated_whitespace = matches!(ch, '\n' | '\r' | '\t');
        if ch.is_control() && !is_tolerated_whitespace {
            return true;
        }
    }
    false
}
/// Validates and normalizes caller-supplied object metadata.
///
/// Returns `Ok(None)` when no metadata was supplied. Otherwise every key is trimmed and each
/// entry is checked, in this order: key not empty, key length, key control characters, value
/// length, value control characters. Values are stored untrimmed.
///
/// # Errors
///
/// Returns a `bad_request` response when the map holds more than
/// `MAX_OBJECT_METADATA_ENTRIES` pairs or when any entry fails a check. The map is iterated
/// in `HashMap` order, so with several invalid entries the one reported is not deterministic.
fn normalize_metadata_input(
    metadata: Option<&HashMap<String, String>>,
) -> Result<Option<HashMap<String, String>>, HttpResponse> {
    let raw = match metadata {
        Some(raw) => raw,
        None => return Ok(None),
    };

    if raw.len() > MAX_OBJECT_METADATA_ENTRIES {
        return Err(bad_request(
            "Too many metadata entries",
            format!("metadata supports at most {MAX_OBJECT_METADATA_ENTRIES} key/value pairs"),
        ));
    }

    raw.iter()
        .map(|(key, value)| {
            let name = key.trim();
            if name.is_empty() {
                return Err(bad_request(
                    "Invalid metadata key",
                    "metadata keys must not be empty",
                ));
            }
            if name.len() > MAX_OBJECT_METADATA_KEY_LEN {
                return Err(bad_request(
                    "Invalid metadata key",
                    format!(
                        "metadata key '{name}' exceeds {MAX_OBJECT_METADATA_KEY_LEN} characters"
                    ),
                ));
            }
            if contains_control_chars(name) {
                return Err(bad_request(
                    "Invalid metadata key",
                    format!("metadata key '{name}' contains control characters"),
                ));
            }
            if value.len() > MAX_OBJECT_METADATA_VALUE_LEN {
                return Err(bad_request(
                    "Invalid metadata value",
                    format!(
                        "metadata value for key '{name}' exceeds {MAX_OBJECT_METADATA_VALUE_LEN} characters"
                    ),
                ));
            }
            if contains_control_chars(value) {
                return Err(bad_request(
                    "Invalid metadata value",
                    format!("metadata value for key '{name}' contains control characters"),
                ));
            }
            // Keys that only differ by surrounding whitespace collapse into one entry.
            Ok((name.to_string(), value.clone()))
        })
        .collect::<Result<HashMap<String, String>, HttpResponse>>()
        .map(Some)
}
/// Trims an optional header value, returning `None` when it is missing or blank.
fn non_empty_header_value(value: Option<String>) -> Option<String> {
    let raw = value?;
    match raw.trim() {
        "" => None,
        trimmed => Some(trimmed.to_owned()),
    }
}
fn build_storage_update_request_payload(
bucket: &str,
key: &str,
body: &UpdateObjectRequest,
) -> Value {
json!({
"bucket": bucket,
"key": key,
"acl": body.acl,
"content_type": body.content_type,
"cache_control": body.cache_control,
"content_disposition": body.content_disposition,
"content_encoding": body.content_encoding,
"content_language": body.content_language,
"metadata": body.metadata,
})
}
/// Prepares the context needed to track an object update in the Athena file catalog.
///
/// Returns `Ok(None)` when the request carries no usable organization id header, in which
/// case tracking is skipped. Otherwise resolves the client pool, makes sure the catalog
/// tables exist and returns the populated context.
///
/// # Errors
///
/// Propagates the response from `required_client_pool`, or returns `service_unavailable`
/// when the catalog tables cannot be ensured.
async fn prepare_storage_update_tracking(
    req: &HttpRequest,
    app_state: &AppState,
    body: &UpdateObjectRequest,
    key: &str,
) -> Result<Option<StorageUpdateTrackingContext>, HttpResponse> {
    let organization_header = non_empty_header_value(get_x_organization_id(req));
    let user_id = non_empty_header_value(get_x_user_id(req));
    let organization_id = match organization_header {
        Some(id) => id,
        None => return Ok(None),
    };

    let pool = required_client_pool(req, app_state).await?;
    athena_s3::ensure_storage_file_catalog_tables(&pool)
        .await
        .map_err(|err| service_unavailable("Athena file catalog unavailable", err.to_string()))?;

    let bucket = body.creds.bucket.clone();
    let request_payload = build_storage_update_request_payload(&bucket, key, body);
    let context = StorageUpdateTrackingContext {
        pool,
        organization_id,
        user_id,
        route: "/storage/objects/update".to_string(),
        bucket,
        key: key.to_string(),
        request_payload,
    };
    Ok(Some(context))
}
async fn persist_storage_update_tracking(
context: &StorageUpdateTrackingContext,
status: athena_s3::StorageFileMutationStatus,
message: impl Into<String>,
mime_type: Option<String>,
size_bytes: Option<i64>,
metadata: Value,
result_payload: Value,
) -> Result<(), String> {
let record = athena_s3::StorageFileMutationRecord {
organization_id: context.organization_id.clone(),
user_id: context.user_id.clone(),
route: context.route.clone(),
bucket: context.bucket.clone(),
key: context.key.clone(),
mime_type,
size_bytes,
metadata,
request_payload: context.request_payload.clone(),
result_payload,
message: message.into(),
status,
};
athena_s3::record_storage_file_update(&context.pool, &record)
.await
.map(|_| ())
.map_err(|err| err.to_string())
}
/// Records an object update in the file catalog without letting a tracking failure affect
/// the caller.
///
/// Does nothing when `context` is `None` (tracking was not prepared for this request). When
/// persisting fails, the error is logged with `warn!` and otherwise ignored.
async fn best_effort_storage_update_tracking(
    context: Option<&StorageUpdateTrackingContext>,
    status: athena_s3::StorageFileMutationStatus,
    message: impl Into<String>,
    mime_type: Option<String>,
    size_bytes: Option<i64>,
    metadata: Value,
    result_payload: Value,
) {
    let Some(context) = context else {
        return;
    };
    // `message` is forwarded as-is: the callee performs the `Into<String>` conversion, so no
    // intermediate `String` (or clone of it) is needed here.
    if let Err(err) = persist_storage_update_tracking(
        context,
        status,
        message,
        mime_type,
        size_bytes,
        metadata,
        result_payload,
    )
    .await
    {
        warn!(
            bucket = %context.bucket,
            key = %context.key,
            organization_id = %context.organization_id,
            error = %err,
            "failed to persist Athena storage file tracking"
        );
    }
}
/// Parses a caller-supplied ACL name into an `ObjectCannedAcl`.
///
/// The input is trimmed and lower-cased first, and the shorthand `public` is accepted as an
/// alias for `public-read`.
///
/// # Errors
///
/// Returns a `bad_request` response when the value is blank or is not a known canned ACL.
fn parse_object_acl(raw_acl: &str) -> Result<ObjectCannedAcl, HttpResponse> {
    let lowered = raw_acl.trim().to_ascii_lowercase();
    if lowered.is_empty() {
        return Err(bad_request(
            "Invalid ACL",
            "acl field must not be empty when provided",
        ));
    }

    let canonical = if lowered == "public" {
        "public-read"
    } else {
        lowered.as_str()
    };

    match ObjectCannedAcl::try_parse(canonical) {
        Ok(acl) => Ok(acl),
        Err(_) => Err(bad_request(
            "Invalid ACL",
            format!(
                "Unsupported acl value '{canonical}'. Allowed values: {}",
                ObjectCannedAcl::values().join(", ")
            ),
        )),
    }
}
/// Validates the endpoint, region and key pair of a connection.
///
/// Runs the `validation` checks in a fixed order (endpoint, region, credentials,
/// provider-specific credentials) and finally rejects endpoint/region values flagged by
/// `scanner_junk_in_storage_field`.
///
/// # Errors
///
/// Returns the first failing validator's response, or a generic `bad_request` for junk input.
fn validate_s3_connection(connection: &S3ConnectionConfig) -> Result<(), HttpResponse> {
    let validated_endpoint = validation::validate_storage_endpoint(&connection.endpoint)?;
    validation::validate_region(&connection.region)?;
    validation::validate_access_credentials(&connection.access_key_id, &connection.secret_key)?;
    validation::validate_provider_specific_credentials(
        &validated_endpoint,
        &connection.access_key_id,
    )?;

    let has_scanner_junk = [&connection.endpoint, &connection.region]
        .into_iter()
        .any(|field| validation::scanner_junk_in_storage_field(field));
    if has_scanner_junk {
        return Err(bad_request(
            "Invalid request",
            "One or more fields contain unsupported characters",
        ));
    }
    Ok(())
}
/// Validates a full set of bucket credentials: the connection first, then the bucket name.
///
/// # Errors
///
/// Returns the failing validator's response, or a generic `bad_request` when the bucket name
/// is flagged by `scanner_junk_in_storage_field`.
fn validate_s3_creds(creds: &S3Creds) -> Result<(), HttpResponse> {
    validate_s3_connection(&creds.connection)?;
    validation::validate_bucket_name(&creds.bucket)?;

    let bucket_is_clean = !validation::scanner_junk_in_storage_field(&creds.bucket);
    if bucket_is_clean {
        Ok(())
    } else {
        Err(bad_request(
            "Invalid request",
            "One or more fields contain unsupported characters",
        ))
    }
}
/// Applies the storage inbound rate limiter configured on `app_state` to `req`.
///
/// # Errors
///
/// Returns whatever response `check_inbound_optional` produces when the request is rejected.
fn check_storage_rate_limit(req: &HttpRequest, app_state: &AppState) -> Result<(), HttpResponse> {
    let storage_limiter = &app_state.inbound_rate_limit_storage;
    check_inbound_optional(
        storage_limiter,
        app_state.inbound_rate_limit_trust_x_forwarded_for,
        req,
    )
}
/// Middleware that gates the storage object and bucket routes behind
/// `require_admin_or_gateway`.
///
/// Requests whose path starts with `/storage/objects` or `/storage/buckets` must pass the
/// check with the storage proxy right; a rejection is returned as the response without
/// calling the inner service. All other paths are forwarded untouched.
///
/// # Errors
///
/// Fails with an internal server error when `AppState` is not registered as app data, and
/// propagates any error from the inner service.
async fn storage_auth_middleware<B>(
    req: ServiceRequest,
    next: Next<B>,
) -> Result<ServiceResponse<BoxBody>, Error>
where
    B: MessageBody + 'static,
{
    const GUARDED_PREFIXES: [&str; 2] = ["/storage/objects", "/storage/buckets"];

    let requires_auth = GUARDED_PREFIXES
        .iter()
        .any(|prefix| req.path().starts_with(*prefix));

    if requires_auth {
        let state: &AppState = match req.app_data::<web::Data<AppState>>() {
            Some(data) => data.get_ref(),
            None => {
                return Err(actix_web::error::ErrorInternalServerError(
                    "AppState not configured",
                ));
            }
        };
        if let Err(denied) =
            require_admin_or_gateway(req.request(), state, None, vec![storage_proxy_right()]).await
        {
            return Ok(req.into_response(denied.map_into_boxed_body()));
        }
    }

    let response = next.call(req).await?;
    Ok(response.map_into_boxed_body())
}
async fn build_client(creds: &S3Creds) -> S3Client {
build_s3_client(
&creds.connection.endpoint,
&creds.connection.region,
&creds.bucket,
&creds.connection.access_key_id,
&creds.connection.secret_key,
)
.await
}
async fn build_connection_client(connection: &S3ConnectionConfig) -> S3Client {
build_s3_client(
&connection.endpoint,
&connection.region,
"",
&connection.access_key_id,
&connection.secret_key,
)
.await
}
fn resolve_s3cmd_path() -> Option<PathBuf> {
which("s3cmd").ok().or_else(|| which("s3cmd.exe").ok())
}
/// Parses a storage endpoint into a URL, assuming `https://` when no scheme is given.
///
/// The input is trimmed first. Only the exact lowercase prefixes `http://` and `https://`
/// count as an existing scheme.
///
/// # Errors
///
/// Returns a message when the endpoint is blank or cannot be parsed as a URL.
fn normalize_endpoint_url(endpoint: &str) -> Result<reqwest::Url, String> {
    let candidate = endpoint.trim();
    if candidate.is_empty() {
        return Err("endpoint field is empty".to_string());
    }

    let has_scheme = candidate.starts_with("http://") || candidate.starts_with("https://");
    let parsed = if has_scheme {
        reqwest::Url::parse(candidate)
    } else {
        reqwest::Url::parse(&format!("https://{candidate}"))
    };

    // The message always quotes the trimmed input, not the scheme-prefixed form.
    parsed.map_err(|err| format!("invalid endpoint URL '{}': {err}", candidate))
}
/// Checks an object key and returns the error response to send when it is unusable.
///
/// Yields `None` for an acceptable key. A blank key, a key flagged by
/// `scanner_junk_in_storage_field`, or a key containing `..` produces a `bad_request`.
fn key_missing_or_invalid(key: &str) -> Option<HttpResponse> {
    let candidate = key.trim();
    if candidate.is_empty() {
        return Some(bad_request("Key is required", "key field is empty"));
    }

    let looks_like_junk = validation::scanner_junk_in_storage_field(candidate);
    if looks_like_junk || candidate.contains("..") {
        Some(bad_request(
            "Invalid key",
            "key contains unsupported characters",
        ))
    } else {
        None
    }
}
/// Heuristically decides whether an error description means "object not found".
///
/// This is a plain substring match on `NotFound`, `NoSuchKey`, `NoSuchVersion` or `404`, so
/// any text that happens to contain one of them (for example an id containing `404`) also
/// matches.
fn is_object_not_found(detail: &str) -> bool {
    const NOT_FOUND_MARKERS: [&str; 4] = ["NotFound", "NoSuchKey", "NoSuchVersion", "404"];
    NOT_FOUND_MARKERS
        .iter()
        .any(|marker| detail.contains(*marker))
}
/// Builds the public URL of an object.
///
/// The base is `public_base_url` when it is non-blank, otherwise the normalized storage
/// endpoint. Path-style addressing (`host/bucket/key`) is used when `force_path_style` is
/// set, or when the host is `localhost` or parses as an IP address; otherwise the bucket is
/// prepended to the host. Empty key segments are dropped, and any query string or fragment
/// on the base URL is removed.
///
/// # Errors
///
/// Returns a `bad_request` response when the base URL is invalid, has no host, cannot take
/// the bucket as a host label, or cannot carry path segments.
fn build_public_object_url(
    endpoint: &str,
    bucket: &str,
    key: &str,
    public_base_url: Option<&str>,
    force_path_style: bool,
) -> Result<String, HttpResponse> {
    let explicit_base = public_base_url
        .map(str::trim)
        .filter(|value| !value.is_empty());
    let mut url = match explicit_base {
        Some(base) => reqwest::Url::parse(base)
            .map_err(|err| bad_request("Invalid public_base_url", err.to_string()))?,
        None => {
            normalize_endpoint_url(endpoint).map_err(|err| bad_request("Invalid endpoint", err))?
        }
    };

    let Some(host) = url.host_str().map(str::to_string) else {
        return Err(bad_request(
            "Invalid endpoint",
            "endpoint URL does not contain a host",
        ));
    };

    let path_style = force_path_style
        || host.eq_ignore_ascii_case("localhost")
        || host.parse::<IpAddr>().is_ok();

    if !path_style {
        // Virtual-hosted style: the bucket becomes the left-most host label.
        let bucket_host = format!("{bucket}.{host}");
        if url.set_host(Some(&bucket_host)).is_err() {
            return Err(bad_request(
                "Invalid bucket host",
                "bucket could not be applied to the endpoint host",
            ));
        }
    }

    {
        // Scoped so the mutable borrow of `url` ends before the query/fragment are cleared.
        let Ok(mut segments) = url.path_segments_mut() else {
            return Err(bad_request(
                "Invalid endpoint",
                "endpoint URL cannot be used to build a public object URL",
            ));
        };
        segments.pop_if_empty();
        if path_style {
            segments.push(bucket);
        }
        segments.extend(key.split('/').filter(|part| !part.is_empty()));
    }

    url.set_query(None);
    url.set_fragment(None);
    Ok(url.to_string())
}
/// Builds the URL that addresses a bucket itself (no object key).
///
/// The base is `public_base_url` when it is non-blank, otherwise the normalized storage
/// endpoint. Path-style addressing (`host/bucket`) is used when `force_path_style` is set, or
/// when the host is `localhost` or parses as an IP address; otherwise the bucket is prepended
/// to the host. Any query string or fragment on the base URL is removed.
///
/// # Errors
///
/// Returns a `bad_request` response when the base URL is invalid, has no host, cannot take
/// the bucket as a host label, or cannot carry path segments.
fn build_bucket_action_url(
    endpoint: &str,
    bucket: &str,
    public_base_url: Option<&str>,
    force_path_style: bool,
) -> Result<String, HttpResponse> {
    let explicit_base = public_base_url
        .map(str::trim)
        .filter(|value| !value.is_empty());
    let mut url = match explicit_base {
        Some(base) => reqwest::Url::parse(base)
            .map_err(|err| bad_request("Invalid public_base_url", err.to_string()))?,
        None => {
            normalize_endpoint_url(endpoint).map_err(|err| bad_request("Invalid endpoint", err))?
        }
    };

    let Some(host) = url.host_str().map(str::to_string) else {
        return Err(bad_request(
            "Invalid endpoint",
            "endpoint URL does not contain a host",
        ));
    };

    let path_style = force_path_style
        || host.eq_ignore_ascii_case("localhost")
        || host.parse::<IpAddr>().is_ok();

    if !path_style {
        // Virtual-hosted style: the bucket becomes the left-most host label.
        let bucket_host = format!("{bucket}.{host}");
        if url.set_host(Some(&bucket_host)).is_err() {
            return Err(bad_request(
                "Invalid bucket host",
                "bucket could not be applied to the endpoint host",
            ));
        }
    }

    {
        // Scoped so the mutable borrow of `url` ends before the query/fragment are cleared.
        let Ok(mut segments) = url.path_segments_mut() else {
            return Err(bad_request(
                "Invalid endpoint",
                "endpoint URL cannot be used to build a bucket action URL",
            ));
        };
        segments.pop_if_empty();
        if path_style {
            segments.push(bucket);
        }
    }

    url.set_query(None);
    url.set_fragment(None);
    Ok(url.to_string())
}
/// Converts the requested server-side-encryption name into the SDK enum.
///
/// Returns `Ok(None)` when no value (or only whitespace) was supplied. Matching is
/// case-insensitive and accepts the short aliases `kms`, `kms:dsse` and `dsse`.
///
/// # Errors
///
/// Returns a `bad_request` response for any other value; the message quotes the lower-cased
/// input.
fn normalize_sse_value(
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<Option<ServerSideEncryption>, HttpResponse> {
    let requested = match encryption
        .server_side_encryption
        .as_deref()
        .map(str::trim)
    {
        Some(value) if !value.is_empty() => value,
        _ => return Ok(None),
    };

    let lowered = requested.to_ascii_lowercase();
    let canonical = match lowered.as_str() {
        "aes256" => "AES256",
        "aws:kms" | "kms" => "aws:kms",
        "aws:kms:dsse" | "kms:dsse" | "dsse" => "aws:kms:dsse",
        other => {
            return Err(bad_request(
                "Invalid server-side encryption",
                format!("unsupported server_side_encryption value '{other}'"),
            ));
        }
    };

    Ok(Some(ServerSideEncryption::from(canonical)))
}
/// Applies the optional encryption settings to a `PutObject` request builder.
///
/// Only fields that are present (and, for strings, non-blank after trimming) are set on the
/// builder; everything else is left untouched.
///
/// # Errors
///
/// Returns the `bad_request` response from `normalize_sse_value` for an unsupported algorithm.
fn apply_sse_to_put(
    mut request: PutObjectFluentBuilder,
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<PutObjectFluentBuilder, HttpResponse> {
    let algorithm = normalize_sse_value(encryption)?;
    let kms_key_id = encryption
        .ssekms_key_id
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());

    if let Some(algorithm) = algorithm {
        request = request.server_side_encryption(algorithm);
    }
    if let Some(kms_key_id) = kms_key_id {
        request = request.ssekms_key_id(kms_key_id);
    }
    if let Some(bucket_key) = encryption.bucket_key_enabled {
        request = request.bucket_key_enabled(bucket_key);
    }
    Ok(request)
}
/// Applies the optional encryption settings to a `CopyObject` request builder.
///
/// Only fields that are present (and, for strings, non-blank after trimming) are set on the
/// builder; everything else is left untouched.
///
/// # Errors
///
/// Returns the `bad_request` response from `normalize_sse_value` for an unsupported algorithm.
fn apply_sse_to_copy(
    mut request: CopyObjectFluentBuilder,
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<CopyObjectFluentBuilder, HttpResponse> {
    let algorithm = normalize_sse_value(encryption)?;
    let kms_key_id = encryption
        .ssekms_key_id
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());

    if let Some(algorithm) = algorithm {
        request = request.server_side_encryption(algorithm);
    }
    if let Some(kms_key_id) = kms_key_id {
        request = request.ssekms_key_id(kms_key_id);
    }
    if let Some(bucket_key) = encryption.bucket_key_enabled {
        request = request.bucket_key_enabled(bucket_key);
    }
    Ok(request)
}
/// Applies the optional encryption settings to a `CreateMultipartUpload` request builder.
///
/// Only fields that are present (and, for strings, non-blank after trimming) are set on the
/// builder; everything else is left untouched.
///
/// # Errors
///
/// Returns the `bad_request` response from `normalize_sse_value` for an unsupported algorithm.
fn apply_sse_to_multipart(
    mut request: CreateMultipartUploadFluentBuilder,
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<CreateMultipartUploadFluentBuilder, HttpResponse> {
    let algorithm = normalize_sse_value(encryption)?;
    let kms_key_id = encryption
        .ssekms_key_id
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());

    if let Some(algorithm) = algorithm {
        request = request.server_side_encryption(algorithm);
    }
    if let Some(kms_key_id) = kms_key_id {
        request = request.ssekms_key_id(kms_key_id);
    }
    if let Some(bucket_key) = encryption.bucket_key_enabled {
        request = request.bucket_key_enabled(bucket_key);
    }
    Ok(request)
}
/// Collects `(name, value)` header pairs into a JSON object of strings.
///
/// When a name occurs more than once, the last value wins.
fn presigned_headers_to_json<'a>(headers: impl Iterator<Item = (&'a str, &'a str)>) -> Value {
    let object: serde_json::Map<String, Value> = headers
        .map(|(name, value)| (name.to_string(), Value::String(value.to_string())))
        .collect();
    Value::Object(object)
}
/// Renders an AWS timestamp with `Format::DateTime`, falling back to the value's `Display`
/// output when formatting fails.
fn aws_datetime_to_string(value: &aws_smithy_types::DateTime) -> String {
    match value.fmt(AwsDateTimeFormat::DateTime) {
        Ok(formatted) => formatted,
        Err(_) => value.to_string(),
    }
}
fn object_version_to_json(value: &aws_sdk_s3::types::ObjectVersion) -> Value {
json!({
"key": value.key(),
"version_id": value.version_id(),
"is_latest": value.is_latest(),
"last_modified": value.last_modified().map(aws_datetime_to_string),
"etag": value.e_tag(),
"size": value.size(),
"storage_class": value.storage_class().map(|v| v.as_str()),
"checksum_algorithm": value.checksum_algorithm().iter().map(|v| v.as_str()).collect::<Vec<_>>(),
"checksum_type": value.checksum_type().map(|v| v.as_str()),
})
}
fn delete_marker_to_json(value: &aws_sdk_s3::types::DeleteMarkerEntry) -> Value {
json!({
"key": value.key(),
"version_id": value.version_id(),
"is_latest": value.is_latest(),
"last_modified": value.last_modified().map(aws_datetime_to_string),
})
}
/// Percent-encodes a string for use inside an S3 copy-source value.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~`, `/` are kept as-is; every other byte of the
/// UTF-8 encoding becomes `%XX` with upper-case hex digits.
fn encode_s3_copy_source_component(value: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(value.len());
    for &byte in value.as_bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}
/// Builds the copy-source string `bucket/key`, with `?versionId=…` appended when a version
/// is given.
///
/// The key and the version id are percent-encoded; the bucket name is inserted unchanged.
fn s3_copy_source(bucket: &str, key: &str, version_id: Option<&str>) -> String {
    let mut source = format!("{bucket}/{}", encode_s3_copy_source_component(key));
    if let Some(version) = version_id {
        source.push_str("?versionId=");
        source.push_str(&encode_s3_copy_source_component(version));
    }
    source
}
/// Passes an optional day count through unchanged, rejecting zero and negative values.
///
/// # Errors
///
/// Returns a `bad_request` response naming `field` when the value is present and `<= 0`.
fn normalize_positive_days(value: Option<i32>, field: &str) -> Result<Option<i32>, HttpResponse> {
    if matches!(value, Some(days) if days <= 0) {
        return Err(bad_request(
            "Invalid lifecycle rule",
            format!("{field} must be a positive integer"),
        ));
    }
    Ok(value)
}
/// Converts a caller-supplied lifecycle rule into the SDK's `LifecycleRule`.
///
/// The status is `Enabled` unless given. The filter carries the prefix when one is supplied
/// and is empty otherwise. Expiration, noncurrent-version expiration and
/// abort-incomplete-multipart settings are only attached when the corresponding inputs are
/// present.
///
/// # Errors
///
/// Returns a `bad_request` response for an unknown status, a non-positive day count, or when
/// the SDK builder rejects the rule.
fn build_lifecycle_rule(input: &BucketLifecycleRuleInput) -> Result<LifecycleRule, HttpResponse> {
    /// Trimmed content of an optional string, or `None` when absent or blank.
    fn non_blank(field: &Option<String>) -> Option<&str> {
        field
            .as_deref()
            .map(str::trim)
            .filter(|value| !value.is_empty())
    }

    // Validation happens up front, in the same order the errors are reported.
    let status = match non_blank(&input.status).unwrap_or("Enabled") {
        "Enabled" | "enabled" => ExpirationStatus::Enabled,
        "Disabled" | "disabled" => ExpirationStatus::Disabled,
        other => {
            return Err(bad_request(
                "Invalid lifecycle status",
                format!("unsupported lifecycle status '{other}'"),
            ));
        }
    };
    let expiration_days = normalize_positive_days(input.expiration_days, "expiration_days")?;
    let noncurrent_days = normalize_positive_days(
        input.noncurrent_version_expiration_days,
        "noncurrent_version_expiration_days",
    )?;
    let abort_days = normalize_positive_days(
        input.abort_incomplete_multipart_upload_days,
        "abort_incomplete_multipart_upload_days",
    )?;

    let mut rule = LifecycleRule::builder().status(status);

    if let Some(id) = non_blank(&input.id) {
        rule = rule.id(id);
    }

    let mut filter = LifecycleRuleFilter::builder();
    if let Some(prefix) = non_blank(&input.prefix) {
        filter = filter.prefix(prefix);
    }
    rule = rule.filter(filter.build());

    match (expiration_days, input.expired_object_delete_marker) {
        (None, None) => {}
        (days, delete_marker) => {
            let mut expiration = LifecycleExpiration::builder();
            if let Some(days) = days {
                expiration = expiration.days(days);
            }
            if let Some(delete_marker) = delete_marker {
                expiration = expiration.expired_object_delete_marker(delete_marker);
            }
            rule = rule.expiration(expiration.build());
        }
    }

    if let Some(days) = noncurrent_days {
        let noncurrent = NoncurrentVersionExpiration::builder()
            .noncurrent_days(days)
            .build();
        rule = rule.noncurrent_version_expiration(noncurrent);
    }

    if let Some(days) = abort_days {
        let abort = AbortIncompleteMultipartUpload::builder()
            .days_after_initiation(days)
            .build();
        rule = rule.abort_incomplete_multipart_upload(abort);
    }

    rule.build()
        .map_err(|err| bad_request("Invalid lifecycle rule", err.to_string()))
}
/// Serializes an SDK `LifecycleRule` into the JSON shape returned to clients.
///
/// Nested sections (`filter`, `expiration`, …) are `null` when the rule does not define them.
fn lifecycle_rule_to_json(rule: &LifecycleRule) -> Value {
    let filter = rule.filter.as_ref().map(|filter| {
        json!({
            "prefix": filter.prefix.as_deref(),
            "object_size_greater_than": filter.object_size_greater_than,
            "object_size_less_than": filter.object_size_less_than,
        })
    });
    let expiration = rule.expiration.as_ref().map(|expiration| {
        json!({
            "date": expiration.date.as_ref().map(aws_datetime_to_string),
            "days": expiration.days,
            "expired_object_delete_marker": expiration.expired_object_delete_marker,
        })
    });
    let noncurrent_version_expiration =
        rule.noncurrent_version_expiration.as_ref().map(|expiration| {
            json!({
                "noncurrent_days": expiration.noncurrent_days,
                "newer_noncurrent_versions": expiration.newer_noncurrent_versions,
            })
        });
    let abort_incomplete_multipart_upload =
        rule.abort_incomplete_multipart_upload.as_ref().map(|abort| {
            json!({
                "days_after_initiation": abort.days_after_initiation,
            })
        });

    json!({
        "id": rule.id.as_deref(),
        "status": rule.status.as_str(),
        "prefix": rule.prefix.as_deref(),
        "filter": filter,
        "expiration": expiration,
        "noncurrent_version_expiration": noncurrent_version_expiration,
        "abort_incomplete_multipart_upload": abort_incomplete_multipart_upload,
    })
}
fn public_access_block_to_json(config: &PublicAccessBlockConfiguration) -> Value {
json!({
"block_public_acls": config.block_public_acls,
"ignore_public_acls": config.ignore_public_acls,
"block_public_policy": config.block_public_policy,
"restrict_public_buckets": config.restrict_public_buckets,
})
}
/// HMAC keyed with SHA-256; used by `hmac_sha256`.
type HmacSha256 = Hmac<Sha256>;
/// Computes HMAC-SHA256 of `data` under `key` and returns the raw tag bytes.
fn hmac_sha256(key: &[u8], data: &str) -> Vec<u8> {
    let mut signer = HmacSha256::new_from_slice(key).expect("HMAC accepts any key length");
    signer.update(data.as_bytes());
    let tag = signer.finalize();
    tag.into_bytes().to_vec()
}
/// Signs a base64-encoded POST policy and returns the signature as lowercase hex.
///
/// The signing key is derived by chaining HMAC-SHA256 over `date`, `region`, `s3` and
/// `aws4_request`, starting from `"AWS4" + secret_key`.
fn signed_post_signature(
    secret_key: &str,
    date: &str,
    region: &str,
    encoded_policy: &str,
) -> String {
    let date_key = hmac_sha256(format!("AWS4{secret_key}").as_bytes(), date);
    let signing_key = [region, "s3", "aws4_request"]
        .iter()
        .fold(date_key, |key, scope_part| hmac_sha256(&key, scope_part));
    hex::encode(hmac_sha256(&signing_key, encoded_policy))
}
/// Builds the `501 Not Implemented` response for a storage feature this runtime lacks.
///
/// The JSON body carries a fixed message and the `STORAGE_FEATURE_UNAVAILABLE` code, along
/// with the supplied detail, feature name, route and context.
fn storage_feature_unavailable(
    route: &str,
    feature: &str,
    detail: &str,
    context: Value,
) -> HttpResponse {
    let body = json!({
        "status": "error",
        "message": "Storage feature is not available on this Athena runtime",
        "error": detail,
        "code": "STORAGE_FEATURE_UNAVAILABLE",
        "feature": feature,
        "route": route,
        "context": context,
    });
    HttpResponse::NotImplemented().json(body)
}
/// Builds the connection arguments that every `s3cmd` invocation needs.
///
/// Produces `--access_key`, `--secret_key`, `--host` and `--host-bucket`, plus `--no-ssl`
/// when the endpoint uses plain `http`. A port given explicitly in the endpoint (one that is
/// not the scheme's default) is kept in both `--host` and `--host-bucket`, so endpoints such
/// as `http://localhost:9000` are reached on the right port.
///
/// NOTE(review): the secret key is passed on the command line and is therefore visible in
/// the process list while `s3cmd` runs.
///
/// # Errors
///
/// Returns a message when either credential is blank, or when the endpoint cannot be parsed
/// or has no host.
fn s3cmd_base_args(creds: &S3Creds) -> Result<Vec<String>, String> {
    if creds.connection.access_key_id.trim().is_empty() {
        return Err("access_key_id is required for s3cmd".to_string());
    }
    if creds.connection.secret_key.trim().is_empty() {
        return Err("secret_key is required for s3cmd".to_string());
    }
    let endpoint: reqwest::Url = normalize_endpoint_url(&creds.connection.endpoint)?;
    let host: &str = endpoint
        .host_str()
        .ok_or_else(|| "endpoint URL does not contain a host".to_string())?;
    // `Url::port()` is `None` for the scheme's default port, so plain hosts are unchanged.
    let authority: String = match endpoint.port() {
        Some(port) => format!("{host}:{port}"),
        None => host.to_string(),
    };
    let mut args: Vec<String> = vec![
        format!("--access_key={}", creds.connection.access_key_id),
        format!("--secret_key={}", creds.connection.secret_key),
        format!("--host={authority}"),
        format!("--host-bucket=%(bucket)s.{authority}"),
    ];
    if endpoint.scheme().eq_ignore_ascii_case("http") {
        args.push("--no-ssl".to_string());
    }
    Ok(args)
}
/// Runs `s3cmd` with the connection arguments for `creds` followed by `command_args`.
///
/// Returns the command's standard output (lossily decoded as UTF-8) when it exits
/// successfully.
///
/// # Errors
///
/// Returns a message when `s3cmd` cannot be found, the base arguments cannot be built, or the
/// process cannot be started. When it exits unsuccessfully, the message is the trimmed
/// stderr, or the trimmed stdout if stderr is empty, or the exit code if both are empty.
async fn run_s3cmd(creds: &S3Creds, command_args: &[String]) -> Result<String, String> {
    let Some(s3cmd_path) = resolve_s3cmd_path() else {
        return Err(
            "s3cmd is not installed or not available in PATH. Install s3cmd to manage bucket CORS."
                .to_string(),
        );
    };

    let mut cmd = Command::new(s3cmd_path);
    cmd.args(s3cmd_base_args(creds)?);
    cmd.args(command_args);

    let output = cmd
        .output()
        .await
        .map_err(|err| format!("failed to execute s3cmd: {err}"))?;

    let stdout = String::from_utf8_lossy(&output.stdout);
    if output.status.success() {
        return Ok(stdout.into_owned());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    // Prefer stderr; fall back to stdout, then to the bare exit code.
    let detail = [stderr.trim(), stdout.trim()]
        .into_iter()
        .find(|text| !text.is_empty());
    match detail {
        Some(text) => Err(text.to_string()),
        None => Err(format!(
            "s3cmd failed with exit code {:?}",
            output.status.code()
        )),
    }
}
/// Escapes the five XML special characters so `value` can be embedded in element content or
/// attribute values.
///
/// `&`, `<`, `>`, `"` and `'` are replaced with their predefined entities; every other
/// character is copied unchanged. The input is processed in a single pass, so ampersands
/// introduced by the escaping are never escaped a second time.
fn escape_xml(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Returns `true` when `method` is one of the HTTP methods accepted in a CORS rule.
///
/// The comparison is case-sensitive; `build_cors_xml` upper-cases methods before calling this.
fn method_is_supported(method: &str) -> bool {
    const SUPPORTED_METHODS: [&str; 7] =
        ["GET", "HEAD", "PUT", "POST", "DELETE", "OPTIONS", "PATCH"];
    SUPPORTED_METHODS.contains(&method)
}
/// Renders CORS rules as a `<CORSConfiguration>` XML document.
///
/// For each rule, entries are trimmed and blank ones dropped; methods are upper-cased.
/// When a rule lists no allowed headers, a single `*` is emitted. `MaxAgeSeconds` is only
/// written when the rule sets it. All text values are passed through `escape_xml`.
///
/// # Errors
///
/// Returns a message when `rules` is empty, when a rule has no usable origin or method, or
/// when a method is not accepted by `method_is_supported`.
fn build_cors_xml(rules: &[BucketCorsRuleInput]) -> Result<String, String> {
    /// Trims every entry and drops the blank ones.
    fn trimmed_non_empty(values: &[String]) -> Vec<String> {
        values
            .iter()
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
            .map(str::to_string)
            .collect()
    }

    /// Appends one `<tag>text</tag>` line per value, escaping the text.
    fn push_elements(xml: &mut String, tag: &str, values: &[String]) {
        for value in values {
            xml.push_str(&format!(" <{tag}>{}</{tag}>\n", escape_xml(value)));
        }
    }

    if rules.is_empty() {
        return Err("At least one CORS rule is required".to_string());
    }

    let mut xml = String::from("<CORSConfiguration>\n");
    for rule in rules {
        let origins = trimmed_non_empty(&rule.allowed_origins);
        if origins.is_empty() {
            return Err("Each CORS rule must include at least one allowed origin".to_string());
        }

        let methods: Vec<String> = rule
            .allowed_methods
            .iter()
            .map(|method| method.trim().to_ascii_uppercase())
            .filter(|method| !method.is_empty())
            .collect();
        if methods.is_empty() {
            return Err("Each CORS rule must include at least one allowed method".to_string());
        }
        if let Some(method) = methods.iter().find(|method| !method_is_supported(method)) {
            return Err(format!("Unsupported CORS method '{method}'"));
        }

        let mut headers = trimmed_non_empty(&rule.allowed_headers);
        if headers.is_empty() {
            headers.push("*".to_string());
        }
        let expose_headers = trimmed_non_empty(&rule.expose_headers);

        xml.push_str(" <CORSRule>\n");
        push_elements(&mut xml, "AllowedOrigin", &origins);
        push_elements(&mut xml, "AllowedMethod", &methods);
        push_elements(&mut xml, "AllowedHeader", &headers);
        push_elements(&mut xml, "ExposeHeader", &expose_headers);
        if let Some(max_age_seconds) = rule.max_age_seconds {
            xml.push_str(&format!(
                " <MaxAgeSeconds>{max_age_seconds}</MaxAgeSeconds>\n"
            ));
        }
        xml.push_str(" </CORSRule>\n");
    }
    xml.push_str("</CORSConfiguration>\n");
    Ok(xml)
}
/// Decides whether `CreateBucket` should carry an explicit location constraint.
///
/// This is only the case for endpoints containing `amazonaws.com`
/// (case-insensitive) whose region is neither `us-east-1` nor `auto`.
fn bucket_create_uses_location_constraint(connection: &S3ConnectionConfig) -> bool {
    let targets_aws = connection
        .endpoint
        .to_ascii_lowercase()
        .contains("amazonaws.com");
    if !targets_aws {
        return false;
    }
    let region_is_implicit = ["us-east-1", "auto"]
        .iter()
        .any(|implicit| connection.region.eq_ignore_ascii_case(implicit));
    !region_is_implicit
}
/// Lists the buckets visible to the supplied S3 connection.
///
/// The storage rate limit and the connection validation run before the
/// `ListBuckets` call. Buckets without a name are skipped, and a missing
/// creation date is reported as an empty string.
async fn list_buckets(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<ListBucketsRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_connection(&body.connection) {
        return rejection;
    }
    let client: S3Client = build_connection_client(&body.connection).await;
    let output = match client.list_buckets().send().await {
        Ok(output) => output,
        Err(err) => {
            let detail = describe_s3_error("list_buckets", &err);
            let response = errors::map_s3_operation_error(
                "list_buckets",
                "Failed to list buckets",
                STORAGE_ERROR_CODE_STORAGE_LIST_BUCKETS_FAILED,
                &err,
            );
            warn!(
                endpoint = %body.connection.endpoint,
                error = %detail,
                "S3 list_buckets failed"
            );
            return response;
        }
    };

    let mut buckets: Vec<StorageBucketSummary> = Vec::new();
    for bucket in output.buckets().iter() {
        // Entries without a name cannot be addressed, so they are left out.
        let Some(name) = bucket.name() else {
            continue;
        };
        let creation_date = match bucket.creation_date() {
            Some(value) => value.to_string(),
            None => String::new(),
        };
        buckets.push(StorageBucketSummary {
            name: name.to_string(),
            creation_date,
        });
    }
    let count = buckets.len();
    tracing::info!(
        bucket_count = count,
        endpoint = %body.connection.endpoint,
        "S3 list_buckets succeeded"
    );
    api_success(
        "Buckets listed",
        json!(StorageBucketListResponse { buckets, count }),
    )
}
async fn create_bucket(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<S3Creds>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.bucket) {
return resp;
}
let client: S3Client = build_client(&body).await;
let mut request = client.create_bucket().bucket(&body.bucket);
if bucket_create_uses_location_constraint(&body.connection) {
request = request.create_bucket_configuration(
aws_sdk_s3::types::CreateBucketConfiguration::builder()
.location_constraint(aws_sdk_s3::types::BucketLocationConstraint::from(
body.connection.region.as_str(),
))
.build(),
);
}
match request.send().await {
Ok(_) => {
tracing::info!(
bucket = %body.bucket,
endpoint = %body.connection.endpoint,
region = %body.connection.region,
"S3 create_bucket succeeded"
);
api_success(
"Bucket created",
json!({
"bucket": body.bucket,
"region": body.connection.region,
}),
)
}
Err(err) => {
let detail = err.to_string();
warn!(bucket = %body.bucket, error = %detail, "S3 create_bucket failed");
if detail.contains("BucketAlreadyOwnedByYou") || detail.contains("BucketAlreadyExists")
{
return conflict("Bucket already exists", detail);
}
internal_error("Failed to create bucket", detail)
}
}
}
async fn delete_bucket(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<S3Creds>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.bucket) {
return resp;
}
let client: S3Client = build_client(&body).await;
match client.delete_bucket().bucket(&body.bucket).send().await {
Ok(_) => {
tracing::info!(bucket = %body.bucket, "S3 delete_bucket succeeded");
api_success("Bucket deleted", json!({ "bucket": body.bucket }))
}
Err(err) => {
let detail = err.to_string();
warn!(bucket = %body.bucket, error = %detail, "S3 delete_bucket failed");
if detail.contains("BucketNotEmpty") {
return conflict(
"Bucket is not empty",
"Delete or move the remaining objects before deleting the bucket",
);
}
if detail.contains("NoSuchBucket") {
return not_found("Bucket not found", detail);
}
internal_error("Failed to delete bucket", detail)
}
}
}
/// Lists one page of objects in a bucket via `ListObjectsV2`.
///
/// The delimiter defaults to `/`. The optional prefix and continuation token
/// are forwarded only when present. A key ending in `/` is flagged as a folder.
async fn list_objects(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<ListObjectsRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }

    let client: S3Client = build_client(&body.creds).await;
    let mut listing: ListObjectsV2FluentBuilder =
        client.list_objects_v2().bucket(&body.creds.bucket);
    if let Some(prefix) = &body.prefix {
        listing = listing.prefix(prefix);
    }
    listing = listing
        .delimiter(body.delimiter.as_deref().unwrap_or("/"))
        .max_keys(body.max_keys);
    if let Some(token) = &body.continuation_token {
        listing = listing.continuation_token(token);
    }

    let output = match listing.send().await {
        Ok(output) => output,
        Err(err) => {
            let detail = describe_s3_error("list_objects", &err);
            warn!(bucket = %body.creds.bucket, error = %detail, "S3 list_objects_v2 failed");
            return map_s3_operation_error(
                "list_objects",
                "Failed to list objects",
                STORAGE_ERROR_CODE_STORAGE_LIST_OBJECTS_FAILED,
                &err,
            );
        }
    };

    let mut objects: Vec<S3Object> = Vec::new();
    for entry in output.contents().iter() {
        let key: String = entry.key().unwrap_or_default().to_string();
        let is_folder: bool = key.ends_with('/');
        let last_modified = match entry.last_modified() {
            Some(timestamp) => timestamp.to_string(),
            None => String::new(),
        };
        objects.push(S3Object {
            key,
            size: entry.size().unwrap_or(0),
            last_modified,
            is_folder,
            etag: entry.e_tag().map(|tag| tag.to_string()),
            storage_class: entry.storage_class().map(|class| class.as_str().to_string()),
        });
    }

    let mut common_prefixes: Vec<String> = Vec::new();
    for entry in output.common_prefixes().iter() {
        if let Some(prefix) = entry.prefix() {
            common_prefixes.push(prefix.to_string());
        }
    }

    let page: ListObjectsResponse = ListObjectsResponse {
        objects,
        common_prefixes,
        is_truncated: output.is_truncated().unwrap_or(false),
        next_continuation_token: output.next_continuation_token().map(|s| s.to_string()),
        prefix: body.prefix.clone(),
    };
    api_success("Objects listed", json!(page))
}
async fn head_object(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<GetObjectRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client: S3Client = build_client(&body.creds).await;
match client
.head_object()
.bucket(&body.creds.bucket)
.key(&body.key)
.send()
.await
{
Ok(output) => {
tracing::info!(
bucket = %body.creds.bucket,
key = %body.key,
"S3 head_object succeeded"
);
let metadata = output
.metadata()
.map(|map| {
let mut metadata = serde_json::Map::new();
for (key, value) in map {
metadata.insert(key.clone(), Value::String(value.clone()));
}
metadata
})
.unwrap_or_default();
api_success(
"Object metadata loaded",
json!(HeadObjectResponse {
bucket: body.creds.bucket.clone(),
key: body.key.clone(),
content_length: output.content_length().unwrap_or(0),
content_type: output.content_type().map(str::to_string),
cache_control: output.cache_control().map(str::to_string),
content_disposition: output.content_disposition().map(str::to_string),
content_encoding: output.content_encoding().map(str::to_string),
content_language: output.content_language().map(str::to_string),
last_modified: output
.last_modified()
.map(|value| value.to_string())
.unwrap_or_default(),
etag: output.e_tag().map(str::to_string),
version_id: output.version_id().map(str::to_string),
storage_class: output
.storage_class()
.map(|value| value.as_str().to_string()),
metadata,
}),
)
}
Err(err) => {
let detail = err.to_string();
warn!(
bucket = %body.creds.bucket,
key = %body.key,
error = %detail,
"S3 head_object failed"
);
if detail.contains("NotFound") || detail.contains("NoSuchKey") {
return not_found("Object not found", detail);
}
internal_error("Failed to load object metadata", detail)
}
}
}
/// Updates an object's ACL and/or its metadata headers.
///
/// Two paths exist:
/// * **Metadata update** (any of metadata, content type, cache control,
///   content disposition, content encoding, content language supplied): the
///   object is copied onto itself with `MetadataDirective::Replace`. Values not
///   supplied in the request are taken from the current `HeadObject` result.
///   When no ACL was requested, the ACL read before the copy is written back
///   afterwards; a failure there is reported as a `warning`, not an error.
/// * **ACL only**: a single `PutObjectAcl` call.
///
/// Each outcome is recorded through the storage-update tracking helpers when
/// a tracking context is available.
async fn update_object(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<UpdateObjectRequest>,
) -> HttpResponse {
    if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return resp;
    }
    if let Err(resp) = validate_s3_creds(&body.creds) {
        return resp;
    }
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let key = body.key.trim();
    if key.is_empty() {
        return bad_request("Key is required", "key field is empty");
    }
    if validation::scanner_junk_in_storage_field(key) {
        return bad_request("Invalid key", "key contains unsupported characters");
    }
    let requested_metadata = match normalize_metadata_input(body.metadata.as_ref()) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let requested_content_type = normalize_optional_string(body.content_type.as_deref());
    let requested_cache_control = normalize_optional_string(body.cache_control.as_deref());
    let requested_content_disposition =
        normalize_optional_string(body.content_disposition.as_deref());
    let requested_content_encoding = normalize_optional_string(body.content_encoding.as_deref());
    let requested_content_language = normalize_optional_string(body.content_language.as_deref());
    let requested_acl = match normalize_optional_string(body.acl.as_deref()) {
        Some(raw_acl) => match parse_object_acl(&raw_acl) {
            Ok(parsed) => Some(parsed),
            Err(resp) => return resp,
        },
        None => None,
    };
    let metadata_update_requested = requested_metadata.is_some()
        || requested_content_type.is_some()
        || requested_cache_control.is_some()
        || requested_content_disposition.is_some()
        || requested_content_encoding.is_some()
        || requested_content_language.is_some();
    if !metadata_update_requested && requested_acl.is_none() {
        return bad_request(
            "No updates provided",
            "Provide at least one of acl, metadata, content_type, cache_control, content_disposition, content_encoding, or content_language",
        );
    }
    let tracking_context =
        match prepare_storage_update_tracking(&req, app_state.get_ref(), &body, key).await {
            Ok(context) => context,
            Err(resp) => return resp,
        };
    let client: S3Client = build_client(&body.creds).await;
    if metadata_update_requested {
        // The ACL is captured up front so it can be written back after the
        // copy when the caller did not request a new one.
        let existing_acl_policy = if requested_acl.is_none() {
            match client
                .get_object_acl()
                .bucket(&body.creds.bucket)
                .key(key)
                .send()
                .await
            {
                Ok(output) => Some(
                    aws_sdk_s3::types::AccessControlPolicy::builder()
                        .set_owner(output.owner().cloned())
                        .set_grants(Some(output.grants().to_vec()))
                        .build(),
                ),
                Err(err) => {
                    warn!(
                        bucket = %body.creds.bucket,
                        key = %key,
                        error = %err,
                        "S3 get_object_acl failed before metadata update; proceeding without ACL restore"
                    );
                    None
                }
            }
        } else {
            None
        };
        let head = match client
            .head_object()
            .bucket(&body.creds.bucket)
            .key(key)
            .send()
            .await
        {
            Ok(output) => output,
            Err(err) => {
                // `SdkError`'s Display output is a generic phrase such as
                // "service error"; the S3 error code lives in the error
                // metadata, so it is appended for the not-found check below.
                let code = match &err {
                    SdkError::ServiceError(se) => se.err().meta().code(),
                    _ => None,
                };
                let detail = match code {
                    Some(code) => format!("{err} ({code})"),
                    None => err.to_string(),
                };
                let object_missing = detail.contains("NotFound") || detail.contains("NoSuchKey");
                // The tracking record is identical for both outcomes; only
                // the HTTP response differs.
                best_effort_storage_update_tracking(
                    tracking_context.as_ref(),
                    athena_s3::StorageFileMutationStatus::Failed,
                    format!("Object metadata update failed: {detail}"),
                    None,
                    None,
                    json!({}),
                    json!({
                        "stage": "head_object",
                        "error": detail.clone(),
                    }),
                )
                .await;
                if object_missing {
                    return not_found("Object not found", detail);
                }
                return internal_error("Failed to load object metadata", detail);
            }
        };
        // Anything the caller did not supply is carried over from the current
        // object, because the Replace directive discards existing headers.
        let metadata_for_copy: HashMap<String, String> = match requested_metadata {
            Some(metadata) => metadata,
            None => head.metadata().cloned().unwrap_or_default(),
        };
        let content_type_for_copy =
            requested_content_type.or_else(|| head.content_type().map(str::to_string));
        let cache_control_for_copy =
            requested_cache_control.or_else(|| head.cache_control().map(str::to_string));
        let content_disposition_for_copy = requested_content_disposition
            .or_else(|| head.content_disposition().map(str::to_string));
        let content_encoding_for_copy =
            requested_content_encoding.or_else(|| head.content_encoding().map(str::to_string));
        let content_language_for_copy =
            requested_content_language.or_else(|| head.content_language().map(str::to_string));
        let updated_metadata_keys = metadata_for_copy.len();
        let metadata_json = json!(metadata_for_copy.clone());
        let size_bytes = Some(head.content_length().unwrap_or(0));
        let mut copy_req: CopyObjectFluentBuilder = client
            .copy_object()
            .bucket(&body.creds.bucket)
            // Built with the same helper the other copy handlers in this file
            // use, rather than by formatting the raw bucket and key.
            .copy_source(s3_copy_source(&body.creds.bucket, key, None))
            .key(key)
            .metadata_directive(MetadataDirective::Replace)
            .set_metadata(Some(metadata_for_copy.clone()));
        if let Some(content_type) = content_type_for_copy.clone() {
            copy_req = copy_req.content_type(content_type);
        }
        if let Some(cache_control) = cache_control_for_copy {
            copy_req = copy_req.cache_control(cache_control);
        }
        if let Some(content_disposition) = content_disposition_for_copy {
            copy_req = copy_req.content_disposition(content_disposition);
        }
        if let Some(content_encoding) = content_encoding_for_copy {
            copy_req = copy_req.content_encoding(content_encoding);
        }
        if let Some(content_language) = content_language_for_copy {
            copy_req = copy_req.content_language(content_language);
        }
        if let Some(acl) = requested_acl.clone() {
            copy_req = copy_req.acl(acl);
        }
        if let Err(err) = copy_req.send().await {
            let detail = err.to_string();
            warn!(
                bucket = %body.creds.bucket,
                key = %key,
                error = %detail,
                "S3 copy_object failed while updating metadata"
            );
            best_effort_storage_update_tracking(
                tracking_context.as_ref(),
                athena_s3::StorageFileMutationStatus::Failed,
                format!("Object metadata update failed: {detail}"),
                content_type_for_copy.clone(),
                size_bytes,
                metadata_json.clone(),
                json!({
                    "stage": "copy_object",
                    "error": detail.clone(),
                }),
            )
            .await;
            return internal_error("Failed to update object metadata", detail);
        }
        let mut warning: Option<String> = None;
        if requested_acl.is_none() {
            if let Some(policy) = existing_acl_policy {
                if let Err(err) = client
                    .put_object_acl()
                    .bucket(&body.creds.bucket)
                    .key(key)
                    .access_control_policy(policy)
                    .send()
                    .await
                {
                    // As above: the error code has to come from the metadata.
                    let code = match &err {
                        SdkError::ServiceError(se) => se.err().meta().code(),
                        _ => None,
                    };
                    let detail = match code {
                        Some(code) => format!("{err} ({code})"),
                        None => err.to_string(),
                    };
                    if detail.contains("AccessControlListNotSupported") {
                        warning = Some(
                            "ACL restore skipped because the bucket has ACLs disabled".to_string(),
                        );
                    } else {
                        warning = Some(format!("Failed to restore previous ACL: {detail}"));
                    }
                }
            }
        }
        let response_body = json!(UpdateObjectResponse {
            bucket: body.creds.bucket.clone(),
            key: key.to_string(),
            updated_metadata_keys,
            applied_acl: requested_acl.map(|value| value.as_str().to_string()),
            warning: warning.clone(),
        });
        if let Some(context) = tracking_context.as_ref() {
            let message = match warning.as_ref() {
                Some(warning) => format!("Object metadata updated ({warning})"),
                None => "Object metadata updated".to_string(),
            };
            if let Err(err) = persist_storage_update_tracking(
                context,
                athena_s3::StorageFileMutationStatus::Success,
                message,
                content_type_for_copy,
                size_bytes,
                metadata_json,
                response_body.clone(),
            )
            .await
            {
                return internal_error("Object updated but Athena file tracking failed", err);
            }
        }
        return api_success("Object metadata updated", response_body);
    }
    // ACL-only path: no metadata header was supplied.
    let Some(acl) = requested_acl else {
        return bad_request(
            "ACL is required",
            "Provide a valid acl when no metadata headers are being updated",
        );
    };
    match client
        .put_object_acl()
        .bucket(&body.creds.bucket)
        .key(key)
        .acl(acl.clone())
        .send()
        .await
    {
        Ok(_) => {
            let response_body = json!(UpdateObjectResponse {
                bucket: body.creds.bucket.clone(),
                key: key.to_string(),
                updated_metadata_keys: 0,
                applied_acl: Some(acl.as_str().to_string()),
                warning: None,
            });
            if let Some(context) = tracking_context.as_ref() {
                if let Err(err) = persist_storage_update_tracking(
                    context,
                    athena_s3::StorageFileMutationStatus::Success,
                    "Object ACL updated",
                    None,
                    None,
                    json!({}),
                    response_body.clone(),
                )
                .await
                {
                    return internal_error("Object updated but Athena file tracking failed", err);
                }
            }
            api_success("Object ACL updated", response_body)
        }
        Err(err) => {
            let detail = err.to_string();
            warn!(
                bucket = %body.creds.bucket,
                key = %key,
                error = %detail,
                "S3 put_object_acl failed"
            );
            best_effort_storage_update_tracking(
                tracking_context.as_ref(),
                athena_s3::StorageFileMutationStatus::Failed,
                format!("Object ACL update failed: {detail}"),
                None,
                None,
                json!({}),
                json!({
                    "stage": "put_object_acl",
                    "error": detail.clone(),
                    "applied_acl": acl.as_str(),
                }),
            )
            .await;
            internal_error("Failed to update object ACL", detail)
        }
    }
}
/// Fetches a bucket's CORS rules and returns them rendered as XML.
///
/// A `NoSuchCORSConfiguration` failure is reported as success with an empty
/// `cors_xml`. Any other failure becomes an internal error.
async fn get_bucket_cors(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<BucketCorsRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }

    let client: S3Client = build_client(&body.creds).await;
    let fetched = client
        .get_bucket_cors()
        .bucket(&body.creds.bucket)
        .send()
        .await;
    let output = match fetched {
        Ok(output) => output,
        Err(err) => {
            let detail: String = if let SdkError::ServiceError(se) = &err {
                let inner = se.err();
                let mut text: String = inner.to_string();
                // Short or generic messages carry no error code, so the Debug
                // form of the error is appended to them.
                let uninformative = text.len() < 32
                    || text.eq_ignore_ascii_case("service error")
                    || text == "unknown error";
                if uninformative {
                    text = format!("{text} ({inner:?})");
                }
                if let Some(rid) = inner.meta().request_id() {
                    text.push_str(&format!(" [request_id={rid}]"));
                }
                text
            } else {
                err.to_string()
            };
            if detail.contains("NoSuchCORSConfiguration") {
                return api_success(
                    "Bucket CORS loaded",
                    json!({
                        "bucket": body.creds.bucket,
                        "cors_xml": "",
                    }),
                );
            }
            warn!(bucket = %body.creds.bucket, error = %detail, "S3 get_bucket_cors failed");
            return internal_error("Failed to fetch bucket CORS", detail);
        }
    };

    let mut mapped_rules: Vec<BucketCorsRuleInput> = Vec::new();
    for rule in output.cors_rules().iter() {
        let rule: &aws_sdk_s3::types::CorsRule = rule;
        let allowed_methods: Vec<String> = rule
            .allowed_methods()
            .iter()
            .map(|method| method.as_str().to_string())
            .collect();
        let max_age_seconds = match rule.max_age_seconds() {
            Some(value) => u32::try_from(value).ok(),
            None => None,
        };
        mapped_rules.push(BucketCorsRuleInput {
            allowed_origins: rule.allowed_origins().to_vec(),
            allowed_methods,
            allowed_headers: rule.allowed_headers().to_vec(),
            expose_headers: rule.expose_headers().to_vec(),
            max_age_seconds,
        });
    }

    match build_cors_xml(&mapped_rules) {
        Ok(cors_xml) => api_success(
            "Bucket CORS loaded",
            json!({
                "bucket": body.creds.bucket,
                "cors_xml": cors_xml,
            }),
        ),
        Err(err) => internal_error("Failed to parse bucket CORS", err),
    }
}
/// Replaces a bucket's CORS configuration using `s3cmd setcors`.
///
/// The rules are rendered to XML, written to a uniquely named file in the
/// system temp directory and handed to `s3cmd`. A `TempPathCleanup` value is
/// held for the rest of the handler.
async fn set_bucket_cors(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<SetBucketCorsRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }
    let cors_xml: String = match build_cors_xml(&body.rules) {
        Err(err) => return bad_request("Invalid CORS rules", err),
        Ok(xml) => xml,
    };

    let bucket = body.creds.bucket.trim();
    let tmp_path: PathBuf = std::env::temp_dir().join(format!(
        "athena-cors-{}-{}.xml",
        bucket,
        uuid::Uuid::new_v4()
    ));
    // Created before the write and kept alive until the handler returns.
    let _tmp_cleanup: TempPathCleanup = TempPathCleanup::new(tmp_path.clone());
    if let Err(err) = fs::write(&tmp_path, cors_xml).await {
        return internal_error("Failed to prepare CORS config", err.to_string());
    }

    let args: Vec<String> = vec![
        String::from("setcors"),
        tmp_path.to_string_lossy().into_owned(),
        format!("s3://{}", bucket),
    ];
    if let Err(err) = run_s3cmd(&body.creds, &args).await {
        warn!(bucket = %body.creds.bucket, error = %err, "s3cmd setcors failed");
        return internal_error("Failed to update bucket CORS", err);
    }
    api_success(
        "Bucket CORS updated",
        json!({
            "bucket": body.creds.bucket,
        }),
    )
}
/// Removes a bucket's CORS configuration using `s3cmd delcors`.
async fn delete_bucket_cors(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<BucketCorsRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }

    let target = format!("s3://{}", body.creds.bucket.trim());
    let args: Vec<String> = vec![String::from("delcors"), target];
    if let Err(err) = run_s3cmd(&body.creds, &args).await {
        warn!(bucket = %body.creds.bucket, error = %err, "s3cmd delcors failed");
        return internal_error("Failed to remove bucket CORS", err);
    }
    api_success(
        "Bucket CORS removed",
        json!({
            "bucket": body.creds.bucket,
        }),
    )
}
/// Generates a presigned `GetObject` URL valid for `PRESIGN_URL_EXPIRY_SECS`
/// seconds.
async fn get_object_url(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<GetObjectRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }
    if body.key.trim().is_empty() {
        return bad_request("Key is required", "key field is empty");
    }

    let client: S3Client = build_client(&body.creds).await;
    let lifetime = Duration::from_secs(PRESIGN_URL_EXPIRY_SECS);
    let presign_config: PresigningConfig = match PresigningConfig::expires_in(lifetime) {
        Err(err) => return internal_error("Presign config error", err.to_string()),
        Ok(cfg) => cfg,
    };
    let signing = client
        .get_object()
        .bucket(&body.creds.bucket)
        .key(&body.key)
        .presigned(presign_config)
        .await;
    let presigned = match signing {
        Ok(presigned) => presigned,
        Err(err) => {
            warn!(key = %body.key, error = %err, "S3 presign get_object failed");
            return internal_error("Failed to generate presigned URL", err.to_string());
        }
    };
    api_success(
        "Presigned URL generated",
        json!({ "url": presigned.uri(), "expires_in": PRESIGN_URL_EXPIRY_SECS }),
    )
}
/// Reports whether an object exists by issuing `HeadObject` for the trimmed key.
///
/// A failure that `is_object_not_found` recognises is returned as success with
/// `exists: false`. Any other failure becomes an internal error.
async fn object_exists(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<GetObjectRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }
    if let Some(rejection) = key_missing_or_invalid(&body.key) {
        return rejection;
    }

    let key = body.key.trim();
    let client: S3Client = build_client(&body.creds).await;
    let lookup = client
        .head_object()
        .bucket(&body.creds.bucket)
        .key(key)
        .send()
        .await;
    let err = match lookup {
        Ok(output) => {
            return api_success(
                "Object existence checked",
                json!({
                    "bucket": body.creds.bucket,
                    "key": key,
                    "exists": true,
                    "content_length": output.content_length().unwrap_or(0),
                    "etag": output.e_tag(),
                    "version_id": output.version_id(),
                }),
            );
        }
        Err(err) => err,
    };

    let detail = err.to_string();
    if !is_object_not_found(&detail) {
        warn!(
            bucket = %body.creds.bucket,
            key = %body.key,
            error = %detail,
            "S3 head_object failed while checking object existence"
        );
        return internal_error("Failed to check object existence", detail);
    }
    api_success(
        "Object existence checked",
        json!({
            "bucket": body.creds.bucket,
            "key": key,
            "exists": false,
        }),
    )
}
/// Validates that an object is reachable and, optionally, that it matches an
/// expected SHA-256 checksum and/or ETag.
///
/// Expected values are trimmed and ignored when blank. ETags are compared
/// with surrounding double quotes removed. A check that was not requested is
/// reported as `null` and does not affect `valid`. A failure that
/// `is_object_not_found` recognises is reported as success with
/// `status: "not_found"`.
async fn validate_object(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<GetObjectRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }
    if let Some(rejection) = key_missing_or_invalid(&body.key) {
        return rejection;
    }

    let key = body.key.trim();
    let client: S3Client = build_client(&body.creds).await;
    let lookup = client
        .head_object()
        .bucket(&body.creds.bucket)
        .key(key)
        .send()
        .await;
    let output = match lookup {
        Ok(output) => output,
        Err(err) => {
            let detail = err.to_string();
            if !is_object_not_found(&detail) {
                warn!(
                    bucket = %body.creds.bucket,
                    key = %body.key,
                    error = %detail,
                    "S3 head_object failed while validating object"
                );
                return internal_error("Failed to validate object", detail);
            }
            return api_success(
                "Object validated",
                json!({
                    "valid": false,
                    "status": "not_found",
                    "bucket": body.creds.bucket,
                    "key": key,
                    "checks": {
                        "endpoint": true,
                        "region": true,
                        "bucket": true,
                        "key": true,
                        "signing": true,
                        "head_object": false,
                    },
                    "error": detail,
                }),
            );
        }
    };

    // Blank expectations count as "not requested".
    let expected_sha256: Option<&str> = body
        .checksum_sha256
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());
    let expected_etag: Option<&str> = body
        .etag
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(|value| value.trim_matches('"'));
    let actual_etag: Option<&str> = output.e_tag().map(|value| value.trim_matches('"'));

    // `None` means the check was not requested; `Some(false)` covers both a
    // mismatch and a value missing from the HeadObject result.
    let sha256_matches: Option<bool> =
        expected_sha256.map(|expected| output.checksum_sha256() == Some(expected));
    let etag_matches: Option<bool> = expected_etag.map(|expected| actual_etag == Some(expected));
    let checksum_ok = sha256_matches != Some(false) && etag_matches != Some(false);
    let status = if checksum_ok { "ok" } else { "checksum_mismatch" };

    api_success(
        "Object validated",
        json!({
            "valid": checksum_ok,
            "status": status,
            "bucket": body.creds.bucket,
            "key": key,
            "checks": {
                "endpoint": true,
                "region": true,
                "bucket": true,
                "key": true,
                "signing": true,
                "head_object": true,
                "checksum_sha256": sha256_matches,
                "etag": etag_matches,
            },
            "object": {
                "content_length": output.content_length().unwrap_or(0),
                "etag": output.e_tag(),
                "checksum_sha256": output.checksum_sha256(),
                "last_modified": output.last_modified().map(|value| value.to_string()),
                "version_id": output.version_id(),
            }
        }),
    )
}
/// Copies an object to another key and, optionally, another bucket.
///
/// The destination bucket is the trimmed `destination_bucket`; when it is
/// absent or blank the source bucket is used. A request whose resolved
/// destination equals its source (same bucket and same trimmed key) is
/// rejected as a bad request. Encryption settings from the request are applied
/// through `apply_sse_to_copy`.
async fn copy_object(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<CopyObjectRequest>,
) -> HttpResponse {
    if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return resp;
    }
    if let Err(resp) = validate_s3_creds(&body.creds) {
        return resp;
    }
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    if let Some(resp) = key_missing_or_invalid(&body.source_key) {
        return resp;
    }
    if let Some(resp) = key_missing_or_invalid(&body.destination_key) {
        return resp;
    }
    // Resolve the destination before the self-copy guard so the guard compares
    // the bucket that is actually used. Comparing the raw field let a blank or
    // whitespace-padded `destination_bucket` slip past the guard.
    let destination_bucket = body
        .destination_bucket
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .unwrap_or(&body.creds.bucket)
        .to_string();
    if body.source_key.trim() == body.destination_key.trim()
        && destination_bucket.trim() == body.creds.bucket.trim()
    {
        return bad_request(
            "Invalid destination",
            "destination bucket/key must differ from source bucket/key",
        );
    }
    if let Err(resp) = ensure_bucket_present(&destination_bucket) {
        return resp;
    }
    let client: S3Client = build_client(&body.creds).await;
    let request = match apply_sse_to_copy(
        client
            .copy_object()
            .bucket(&destination_bucket)
            .copy_source(s3_copy_source(
                &body.creds.bucket,
                body.source_key.trim(),
                None,
            ))
            .key(body.destination_key.trim()),
        &body.encryption,
    ) {
        Ok(request) => request,
        Err(resp) => return resp,
    };
    match request.send().await {
        Ok(output) => api_success(
            "Object copied",
            json!({
                "source_bucket": body.creds.bucket,
                "source_key": body.source_key.trim(),
                "destination_bucket": destination_bucket,
                "destination_key": body.destination_key.trim(),
                "etag": output.copy_object_result().and_then(|value| value.e_tag()),
                "last_modified": output.copy_object_result().and_then(|value| value.last_modified()).map(|value| value.to_string()),
            }),
        ),
        Err(err) => {
            let detail = err.to_string();
            warn!(
                source_bucket = %body.creds.bucket,
                source_key = %body.source_key,
                destination_bucket = %destination_bucket,
                destination_key = %body.destination_key,
                error = %detail,
                "S3 copy_object failed"
            );
            internal_error("Failed to copy object", detail)
        }
    }
}
/// Computes the public (unsigned) URL of an object without contacting S3.
///
/// `force_path_style` defaults to `false` when absent. The URL itself is
/// produced by `build_public_object_url`.
async fn get_object_public_url(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<ObjectPublicUrlRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }
    if let Some(rejection) = key_missing_or_invalid(&body.key) {
        return rejection;
    }

    let key = body.key.trim();
    let force_path_style = body.force_path_style.unwrap_or(false);
    let computed = build_public_object_url(
        &body.creds.connection.endpoint,
        &body.creds.bucket,
        key,
        body.public_base_url.as_deref(),
        force_path_style,
    );
    let url = match computed {
        Err(rejection) => return rejection,
        Ok(url) => url,
    };
    api_success(
        "Public object URL computed",
        json!({
            "bucket": body.creds.bucket,
            "key": key,
            "url": url,
            "force_path_style": force_path_style,
            "public_base_url": body.public_base_url,
        }),
    )
}
/// Lists object versions and delete markers in a bucket.
///
/// `max_keys` falls back to `default_max_keys()` and is clamped to `1..=1000`.
/// The optional key (used as the prefix), key marker, version-id marker and
/// delimiter are trimmed and forwarded only when non-blank.
async fn list_object_versions(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<ObjectVersionsRequest>,
) -> HttpResponse {
    /// Trims an optional value and treats a blank result as absent.
    fn non_blank(value: Option<&str>) -> Option<&str> {
        match value.map(str::trim) {
            Some(trimmed) if !trimmed.is_empty() => Some(trimmed),
            _ => None,
        }
    }

    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }
    if let Some(key) = body.key.as_deref() {
        if let Some(rejection) = key_missing_or_invalid(key) {
            return rejection;
        }
    }

    let client: S3Client = build_client(&body.creds).await;
    let page_size = body.max_keys.unwrap_or(default_max_keys()).clamp(1, 1000);
    let mut request = client
        .list_object_versions()
        .bucket(&body.creds.bucket)
        .max_keys(page_size);
    if let Some(prefix) = non_blank(body.key.as_deref()) {
        request = request.prefix(prefix);
    }
    if let Some(marker) = non_blank(body.key_marker.as_deref()) {
        request = request.key_marker(marker);
    }
    if let Some(marker) = non_blank(body.version_id_marker.as_deref()) {
        request = request.version_id_marker(marker);
    }
    if let Some(delimiter) = non_blank(body.delimiter.as_deref()) {
        request = request.delimiter(delimiter);
    }

    let output = match request.send().await {
        Ok(output) => output,
        Err(err) => {
            warn!(bucket = %body.creds.bucket, error = %err, "S3 list_object_versions failed");
            return internal_error("Failed to list object versions", err.to_string());
        }
    };
    api_success(
        "Object versions loaded",
        json!({
            "bucket": body.creds.bucket,
            "prefix": output.prefix(),
            "is_truncated": output.is_truncated().unwrap_or(false),
            "next_key_marker": output.next_key_marker(),
            "next_version_id_marker": output.next_version_id_marker(),
            "versions": output.versions().iter().map(object_version_to_json).collect::<Vec<_>>(),
            "delete_markers": output.delete_markers().iter().map(delete_marker_to_json).collect::<Vec<_>>(),
        }),
    )
}
/// Restores a previous object version by copying it onto the same key with
/// `MetadataDirective::Copy`.
async fn restore_object_version(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<ObjectVersionMutationRequest>,
) -> HttpResponse {
    if let Err(rejection) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return rejection;
    }
    if let Err(rejection) = validate_s3_creds(&body.creds) {
        return rejection;
    }
    if let Err(rejection) = ensure_bucket_present(&body.creds.bucket) {
        return rejection;
    }
    if let Some(rejection) = key_missing_or_invalid(&body.key) {
        return rejection;
    }
    let version_id = body.version_id.trim();
    if version_id.is_empty() {
        return bad_request("Version id is required", "version_id field is empty");
    }

    let key = body.key.trim();
    let client: S3Client = build_client(&body.creds).await;
    let copy_source = s3_copy_source(&body.creds.bucket, key, Some(version_id));
    let copied = client
        .copy_object()
        .bucket(&body.creds.bucket)
        .key(key)
        .copy_source(copy_source)
        .metadata_directive(MetadataDirective::Copy)
        .send()
        .await;
    let output = match copied {
        Ok(output) => output,
        Err(err) => {
            warn!(
                bucket = %body.creds.bucket,
                key = %body.key,
                version_id = %body.version_id,
                error = %err,
                "S3 copy_object failed while restoring object version"
            );
            return internal_error("Failed to restore object version", err.to_string());
        }
    };
    api_success(
        "Object version restored",
        json!({
            "bucket": body.creds.bucket,
            "key": key,
            "version_id": version_id,
            "restored_version_id": output.version_id(),
            "etag": output.copy_object_result().and_then(|value| value.e_tag()),
            "last_modified": output.copy_object_result().and_then(|value| value.last_modified()).map(|value| value.to_string()),
        }),
    )
}
/// Deletes one specific version of an object via `DeleteObject` with an explicit version id.
///
/// Validation order: rate limit, credentials, bucket, key, version id. The response
/// echoes the trimmed key/version id together with the delete-marker flag and version
/// id reported by S3. Failures are logged and returned as an internal error.
async fn delete_object_version(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<ObjectVersionMutationRequest>,
) -> HttpResponse {
    // The three shared pre-checks short-circuit on the first error response.
    let preflight = check_storage_rate_limit(&req, app_state.get_ref())
        .and_then(|_| validate_s3_creds(&body.creds))
        .and_then(|_| ensure_bucket_present(&body.creds.bucket));
    if let Err(resp) = preflight {
        return resp;
    }
    if let Some(resp) = key_missing_or_invalid(&body.key) {
        return resp;
    }
    let version_id = body.version_id.trim();
    if version_id.is_empty() {
        return bad_request("Version id is required", "version_id field is empty");
    }
    let key = body.key.trim();

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .delete_object()
        .bucket(&body.creds.bucket)
        .key(key)
        .version_id(version_id)
        .send()
        .await;

    match sent {
        Err(err) => {
            warn!(
                bucket = %body.creds.bucket,
                key = %body.key,
                version_id = %body.version_id,
                error = %err,
                "S3 delete_object failed while deleting object version"
            );
            internal_error("Failed to delete object version", err.to_string())
        }
        Ok(output) => api_success(
            "Object version deleted",
            json!({
                "bucket": body.creds.bucket,
                "key": key,
                "version_id": version_id,
                "delete_marker": output.delete_marker(),
                "delete_marker_version_id": output.version_id(),
            }),
        ),
    }
}
/// Returns the lifecycle rules configured on a bucket.
///
/// Runs the rate-limit, credential and bucket checks, then calls
/// `GetBucketLifecycleConfiguration`. A bucket that has no lifecycle configuration
/// (S3 error code `NoSuchLifecycleConfiguration`) is reported as success with an empty
/// `rules` array; every other failure is logged and returned as an internal error.
async fn get_bucket_lifecycle(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<BucketLifecycleRequest>,
) -> HttpResponse {
    if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return resp;
    }
    if let Err(resp) = validate_s3_creds(&body.creds) {
        return resp;
    }
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let client: S3Client = build_client(&body.creds).await;
    match client
        .get_bucket_lifecycle_configuration()
        .bucket(&body.creds.bucket)
        .send()
        .await
    {
        Ok(output) => api_success(
            "Bucket lifecycle loaded",
            json!({
                "bucket": body.creds.bucket,
                "rules": output.rules().iter().map(lifecycle_rule_to_json).collect::<Vec<_>>(),
            }),
        ),
        Err(err) => {
            let detail = err.to_string();
            // `SdkError`'s `Display` output is a generic label that does not carry the
            // S3 error code, so the code is read from the service error metadata. The
            // substring check is kept as a fallback for error texts that do embed it.
            let error_code = err
                .as_service_error()
                .and_then(|service_err| service_err.meta().code());
            if error_code == Some("NoSuchLifecycleConfiguration")
                || detail.contains("NoSuchLifecycleConfiguration")
            {
                return api_success(
                    "Bucket lifecycle loaded",
                    json!({ "bucket": body.creds.bucket, "rules": [] }),
                );
            }
            warn!(bucket = %body.creds.bucket, error = %detail, "S3 get_bucket_lifecycle_configuration failed");
            internal_error("Failed to fetch bucket lifecycle", detail)
        }
    }
}
/// Replaces a bucket's lifecycle configuration with the rules from the request.
///
/// After the shared pre-checks, an empty `rules` array is rejected. Each rule is
/// converted with `build_lifecycle_rule`; the first conversion failure's response is
/// returned as-is. The assembled configuration is then sent with
/// `PutBucketLifecycleConfiguration`.
async fn set_bucket_lifecycle(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<SetBucketLifecycleRequest>,
) -> HttpResponse {
    if let Some(resp) = check_storage_rate_limit(&req, app_state.get_ref()).err() {
        return resp;
    }
    if let Some(resp) = validate_s3_creds(&body.creds).err() {
        return resp;
    }
    if let Some(resp) = ensure_bucket_present(&body.creds.bucket).err() {
        return resp;
    }
    if body.rules.is_empty() {
        return bad_request("Invalid lifecycle rules", "rules array must not be empty");
    }

    // Collecting into `Result` stops at the first rule that fails to convert.
    let converted = body
        .rules
        .iter()
        .map(|rule| build_lifecycle_rule(rule))
        .collect::<Result<Vec<_>, _>>();
    let rules = match converted {
        Ok(rules) => rules,
        Err(resp) => return resp,
    };

    let built = BucketLifecycleConfiguration::builder()
        .set_rules(Some(rules))
        .build();
    let lifecycle_configuration = match built {
        Ok(config) => config,
        Err(err) => return bad_request("Invalid lifecycle configuration", err.to_string()),
    };

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .put_bucket_lifecycle_configuration()
        .bucket(&body.creds.bucket)
        .lifecycle_configuration(lifecycle_configuration)
        .send()
        .await;
    if let Err(err) = sent {
        warn!(bucket = %body.creds.bucket, error = %err, "S3 put_bucket_lifecycle_configuration failed");
        return internal_error("Failed to update bucket lifecycle", err.to_string());
    }
    api_success(
        "Bucket lifecycle updated",
        json!({ "bucket": body.creds.bucket, "rule_count": body.rules.len() }),
    )
}
/// Removes the lifecycle configuration from a bucket.
///
/// Performs the shared rate-limit, credential and bucket checks, then calls
/// `DeleteBucketLifecycle`. Failures are logged and surfaced as an internal error.
async fn delete_bucket_lifecycle(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<BucketLifecycleRequest>,
) -> HttpResponse {
    let preflight = check_storage_rate_limit(&req, app_state.get_ref())
        .and_then(|_| validate_s3_creds(&body.creds))
        .and_then(|_| ensure_bucket_present(&body.creds.bucket));
    if let Err(resp) = preflight {
        return resp;
    }

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .delete_bucket_lifecycle()
        .bucket(&body.creds.bucket)
        .send()
        .await;
    if let Err(err) = sent {
        warn!(bucket = %body.creds.bucket, error = %err, "S3 delete_bucket_lifecycle failed");
        return internal_error("Failed to delete bucket lifecycle", err.to_string());
    }
    api_success(
        "Bucket lifecycle removed",
        json!({ "bucket": body.creds.bucket }),
    )
}
/// Returns the bucket policy both as the raw string and, when it parses, as JSON.
///
/// Runs the rate-limit, credential and bucket checks, then calls `GetBucketPolicy`.
/// A bucket without a policy (S3 error code `NoSuchBucketPolicy`) is reported as
/// success with `policy` and `policy_json` set to `null`; every other failure is
/// logged and returned as an internal error.
async fn get_bucket_policy(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<BucketPolicyRequest>,
) -> HttpResponse {
    if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return resp;
    }
    if let Err(resp) = validate_s3_creds(&body.creds) {
        return resp;
    }
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let client: S3Client = build_client(&body.creds).await;
    match client
        .get_bucket_policy()
        .bucket(&body.creds.bucket)
        .send()
        .await
    {
        Ok(output) => api_success(
            "Bucket policy loaded",
            json!({
                "bucket": body.creds.bucket,
                "policy": output.policy(),
                // A policy string that is not valid JSON yields `null` here.
                "policy_json": output.policy().and_then(|value| serde_json::from_str::<Value>(value).ok()),
            }),
        ),
        Err(err) => {
            let detail = err.to_string();
            // `SdkError`'s `Display` output is a generic label that does not carry the
            // S3 error code, so the code is read from the service error metadata. The
            // substring check is kept as a fallback for error texts that do embed it.
            let error_code = err
                .as_service_error()
                .and_then(|service_err| service_err.meta().code());
            if error_code == Some("NoSuchBucketPolicy") || detail.contains("NoSuchBucketPolicy") {
                return api_success(
                    "Bucket policy loaded",
                    json!({ "bucket": body.creds.bucket, "policy": null, "policy_json": null }),
                );
            }
            warn!(bucket = %body.creds.bucket, error = %detail, "S3 get_bucket_policy failed");
            internal_error("Failed to fetch bucket policy", detail)
        }
    }
}
/// Sets the bucket policy from either a JSON object or a JSON string.
///
/// A string policy is trimmed and must be non-empty; an object policy is serialized
/// to its JSON text. Any other JSON value is rejected with a bad-request response.
/// The resulting text is sent with `PutBucketPolicy`.
async fn set_bucket_policy(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<SetBucketPolicyRequest>,
) -> HttpResponse {
    if let Some(resp) = check_storage_rate_limit(&req, app_state.get_ref()).err() {
        return resp;
    }
    if let Some(resp) = validate_s3_creds(&body.creds).err() {
        return resp;
    }
    if let Some(resp) = ensure_bucket_present(&body.creds.bucket).err() {
        return resp;
    }

    let policy_text: Option<String> = match &body.policy {
        Value::Object(_) => Some(body.policy.to_string()),
        Value::String(raw) => Some(raw.trim())
            .filter(|trimmed| !trimmed.is_empty())
            .map(|trimmed| trimmed.to_string()),
        _ => None,
    };
    let Some(policy) = policy_text else {
        return bad_request(
            "Invalid bucket policy",
            "policy must be a non-empty JSON object or JSON string",
        );
    };

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .put_bucket_policy()
        .bucket(&body.creds.bucket)
        .policy(policy)
        .send()
        .await;
    if let Err(err) = sent {
        warn!(bucket = %body.creds.bucket, error = %err, "S3 put_bucket_policy failed");
        return internal_error("Failed to update bucket policy", err.to_string());
    }
    api_success(
        "Bucket policy updated",
        json!({ "bucket": body.creds.bucket }),
    )
}
/// Removes the policy attached to a bucket.
///
/// Performs the shared rate-limit, credential and bucket checks, then calls
/// `DeleteBucketPolicy`. Failures are logged and surfaced as an internal error.
async fn delete_bucket_policy(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<BucketPolicyRequest>,
) -> HttpResponse {
    let preflight = check_storage_rate_limit(&req, app_state.get_ref())
        .and_then(|_| validate_s3_creds(&body.creds))
        .and_then(|_| ensure_bucket_present(&body.creds.bucket));
    if let Err(resp) = preflight {
        return resp;
    }

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .delete_bucket_policy()
        .bucket(&body.creds.bucket)
        .send()
        .await;
    match sent {
        Err(err) => {
            warn!(bucket = %body.creds.bucket, error = %err, "S3 delete_bucket_policy failed");
            internal_error("Failed to delete bucket policy", err.to_string())
        }
        Ok(_) => api_success(
            "Bucket policy removed",
            json!({ "bucket": body.creds.bucket }),
        ),
    }
}
/// Returns the public-access-block configuration of a bucket.
///
/// Runs the rate-limit, credential and bucket checks, then calls
/// `GetPublicAccessBlock`. A bucket with no such configuration (S3 error code
/// `NoSuchPublicAccessBlockConfiguration`) is reported as success with
/// `public_access_block` set to `null`; every other failure is logged and returned
/// as an internal error.
async fn get_public_access_block(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<PublicAccessBlockRequest>,
) -> HttpResponse {
    if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
        return resp;
    }
    if let Err(resp) = validate_s3_creds(&body.creds) {
        return resp;
    }
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let client: S3Client = build_client(&body.creds).await;
    match client
        .get_public_access_block()
        .bucket(&body.creds.bucket)
        .send()
        .await
    {
        Ok(output) => api_success(
            "Bucket public access block loaded",
            json!({
                "bucket": body.creds.bucket,
                "public_access_block": output.public_access_block_configuration().map(public_access_block_to_json),
            }),
        ),
        Err(err) => {
            let detail = err.to_string();
            // `SdkError`'s `Display` output is a generic label that does not carry the
            // S3 error code, so the code is read from the service error metadata. The
            // substring check is kept as a fallback for error texts that do embed it.
            let error_code = err
                .as_service_error()
                .and_then(|service_err| service_err.meta().code());
            if error_code == Some("NoSuchPublicAccessBlockConfiguration")
                || detail.contains("NoSuchPublicAccessBlockConfiguration")
            {
                return api_success(
                    "Bucket public access block loaded",
                    json!({ "bucket": body.creds.bucket, "public_access_block": null }),
                );
            }
            warn!(bucket = %body.creds.bucket, error = %detail, "S3 get_public_access_block failed");
            internal_error("Failed to fetch bucket public access block", detail)
        }
    }
}
/// Applies a public-access-block configuration to a bucket.
///
/// The four optional flags from the request are forwarded unchanged (including
/// unset values) into the configuration sent with `PutPublicAccessBlock`.
async fn set_public_access_block(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<SetPublicAccessBlockRequest>,
) -> HttpResponse {
    if let Some(resp) = check_storage_rate_limit(&req, app_state.get_ref()).err() {
        return resp;
    }
    if let Some(resp) = validate_s3_creds(&body.creds).err() {
        return resp;
    }
    if let Some(resp) = ensure_bucket_present(&body.creds.bucket).err() {
        return resp;
    }

    let block_settings = PublicAccessBlockConfiguration::builder()
        .set_block_public_acls(body.block_public_acls)
        .set_ignore_public_acls(body.ignore_public_acls)
        .set_block_public_policy(body.block_public_policy)
        .set_restrict_public_buckets(body.restrict_public_buckets)
        .build();

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .put_public_access_block()
        .bucket(&body.creds.bucket)
        .public_access_block_configuration(block_settings)
        .send()
        .await;
    if let Err(err) = sent {
        warn!(bucket = %body.creds.bucket, error = %err, "S3 put_public_access_block failed");
        return internal_error(
            "Failed to update bucket public access block",
            err.to_string(),
        );
    }
    api_success(
        "Bucket public access block updated",
        json!({ "bucket": body.creds.bucket }),
    )
}
/// Removes the public-access-block configuration from a bucket.
///
/// Performs the shared rate-limit, credential and bucket checks, then calls
/// `DeletePublicAccessBlock`. Failures are logged and surfaced as an internal error.
async fn delete_public_access_block(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<PublicAccessBlockRequest>,
) -> HttpResponse {
    let preflight = check_storage_rate_limit(&req, app_state.get_ref())
        .and_then(|_| validate_s3_creds(&body.creds))
        .and_then(|_| ensure_bucket_present(&body.creds.bucket));
    if let Err(resp) = preflight {
        return resp;
    }

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .delete_public_access_block()
        .bucket(&body.creds.bucket)
        .send()
        .await;
    match sent {
        Err(err) => {
            warn!(bucket = %body.creds.bucket, error = %err, "S3 delete_public_access_block failed");
            internal_error(
                "Failed to delete bucket public access block",
                err.to_string(),
            )
        }
        Ok(_) => api_success(
            "Bucket public access block removed",
            json!({ "bucket": body.creds.bucket }),
        ),
    }
}
/// Generates a signed POST upload policy for a single object key.
///
/// The handler assembles the form fields a client must submit, builds a policy
/// document whose conditions mirror those fields, base64-encodes the policy JSON and
/// signs it with `signed_post_signature`. The response carries the action URL, the
/// form fields (including `Policy` and `X-Amz-Signature`), the encoded policy, the
/// expiry and the raw condition list.
///
/// Returns the first failing check's response for rate limiting, credentials, bucket
/// or key validation, a bad request when `max_size < min_size`, and whatever error
/// response `build_bucket_action_url` / `normalize_sse_value` produce.
async fn create_signed_post_policy(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<SignedPostPolicyRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
if let Some(resp) = key_missing_or_invalid(&body.key) {
return resp;
}
// Lifetime of the policy in seconds: defaults to PRESIGN_URL_EXPIRY_SECS and is
// clamped to the range 1..=86_400.
let expires_in = body
.expires_in
.unwrap_or(PRESIGN_URL_EXPIRY_SECS)
.clamp(1, 86_400);
// Negative minimum sizes are raised to 0; the maximum defaults to 5 * 1024^3 bytes.
let min_size = body.min_size.unwrap_or(0).max(0);
let max_size = body.max_size.unwrap_or(5 * 1024 * 1024 * 1024);
if max_size < min_size {
return bad_request(
"Invalid content length range",
"max_size must be greater than or equal to min_size",
);
}
// URL the form is posted to; path-style addressing is off unless requested.
let action = match build_bucket_action_url(
&body.creds.connection.endpoint,
&body.creds.bucket,
body.public_base_url.as_deref(),
body.force_path_style.unwrap_or(false),
) {
Ok(value) => value,
Err(resp) => return resp,
};
let sse = match normalize_sse_value(&body.encryption) {
Ok(value) => value,
Err(resp) => return resp,
};
// A single timestamp feeds the credential date, the X-Amz-Date field and the expiry
// so that all three agree.
let now = Utc::now();
let date = now.format("%Y%m%d").to_string();
let amz_date = now.format("%Y%m%dT%H%M%SZ").to_string();
let expiration = (now + chrono::Duration::seconds(expires_in as i64))
.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
// Credential string: <access key>/<yyyymmdd>/<region>/s3/aws4_request.
let credential = format!(
"{}/{}/{}/s3/aws4_request",
body.creds.connection.access_key_id, date, body.creds.connection.region
);
// Form fields returned to the caller. The mandatory ones come first; optional ones
// are only present when the request supplied a non-blank value.
let mut fields = serde_json::Map::new();
fields.insert(
"key".to_string(),
Value::String(body.key.trim().to_string()),
);
fields.insert(
"bucket".to_string(),
Value::String(body.creds.bucket.trim().to_string()),
);
fields.insert(
"X-Amz-Algorithm".to_string(),
Value::String("AWS4-HMAC-SHA256".to_string()),
);
fields.insert(
"X-Amz-Credential".to_string(),
Value::String(credential.clone()),
);
fields.insert("X-Amz-Date".to_string(), Value::String(amz_date.clone()));
if let Some(status) = body
.success_action_status
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
fields.insert(
"success_action_status".to_string(),
Value::String(status.to_string()),
);
}
if let Some(content_type) = body
.content_type
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
fields.insert(
"Content-Type".to_string(),
Value::String(content_type.to_string()),
);
}
if let Some(sse) = sse.as_ref() {
fields.insert(
"x-amz-server-side-encryption".to_string(),
Value::String(sse.as_str().to_string()),
);
}
// NOTE(review): the KMS key id and bucket-key flag are added whenever they are set,
// independently of whether `sse` resolved to a value — confirm this is intended.
if let Some(key_id) = body
.encryption
.ssekms_key_id
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
fields.insert(
"x-amz-server-side-encryption-aws-kms-key-id".to_string(),
Value::String(key_id.to_string()),
);
}
if let Some(enabled) = body.encryption.bucket_key_enabled {
fields.insert(
"x-amz-server-side-encryption-bucket-key-enabled".to_string(),
Value::String(enabled.to_string()),
);
}
// Policy conditions: the fixed set first, then one exact-match condition for every
// optional field that was added above, so fields and policy stay in step.
let mut conditions = vec![
json!({ "bucket": body.creds.bucket.trim() }),
json!(["eq", "$key", body.key.trim()]),
json!({ "X-Amz-Algorithm": "AWS4-HMAC-SHA256" }),
json!({ "X-Amz-Credential": credential }),
json!({ "X-Amz-Date": amz_date }),
json!(["content-length-range", min_size, max_size]),
];
if let Some(status) = fields.get("success_action_status") {
conditions.push(json!({ "success_action_status": status }));
}
if let Some(content_type) = fields.get("Content-Type") {
conditions.push(json!({ "Content-Type": content_type }));
}
if let Some(value) = fields.get("x-amz-server-side-encryption") {
conditions.push(json!({ "x-amz-server-side-encryption": value }));
}
if let Some(value) = fields.get("x-amz-server-side-encryption-aws-kms-key-id") {
conditions.push(json!({ "x-amz-server-side-encryption-aws-kms-key-id": value }));
}
if let Some(value) = fields.get("x-amz-server-side-encryption-bucket-key-enabled") {
conditions.push(json!({ "x-amz-server-side-encryption-bucket-key-enabled": value }));
}
let policy = json!({
"expiration": expiration,
"conditions": conditions,
});
// The signature is computed over the base64 text of the policy, so the exact
// encoded string must be the one handed back to the client.
let encoded_policy = BASE64_STANDARD.encode(policy.to_string());
let signature = signed_post_signature(
&body.creds.connection.secret_key,
&date,
&body.creds.connection.region,
&encoded_policy,
);
fields.insert("Policy".to_string(), Value::String(encoded_policy.clone()));
fields.insert("X-Amz-Signature".to_string(), Value::String(signature));
api_success(
"Signed POST policy generated",
json!({
"url": action,
"method": "POST",
"bucket": body.creds.bucket,
"key": body.key.trim(),
"fields": Value::Object(fields),
"policy": encoded_policy,
"expires_in": expires_in,
"expires_at": expiration,
"conditions": conditions,
}),
)
}
/// Deletes a single object by key.
///
/// After the shared pre-checks, a key that is blank after trimming is rejected.
/// The key is then passed to `DeleteObject` exactly as received. Failures are
/// logged and surfaced as an internal error.
async fn delete_object(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<DeleteObjectRequest>,
) -> HttpResponse {
    let preflight = check_storage_rate_limit(&req, app_state.get_ref())
        .and_then(|_| validate_s3_creds(&body.creds))
        .and_then(|_| ensure_bucket_present(&body.creds.bucket));
    if let Err(resp) = preflight {
        return resp;
    }
    if body.key.trim().is_empty() {
        return bad_request("Key is required", "key field is empty");
    }

    let client: S3Client = build_client(&body.creds).await;
    let sent = client
        .delete_object()
        .bucket(&body.creds.bucket)
        .key(&body.key)
        .send()
        .await;
    if let Err(err) = sent {
        warn!(key = %body.key, error = %err, "S3 delete_object failed");
        return internal_error("Failed to delete object", err.to_string());
    }
    api_success(
        "Object deleted",
        json!({ "key": body.key, "bucket": body.creds.bucket }),
    )
}
async fn create_folder(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<CreateFolderRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
let prefix: &str = body.prefix.trim().trim_end_matches('/');
if prefix.is_empty() {
return bad_request("Prefix is required", "prefix field is empty");
}
let folder_key: String = format!("{}/", prefix);
let client: S3Client = build_client(&body.creds).await;
match client
.put_object()
.bucket(&body.creds.bucket)
.key(&folder_key)
.body(aws_sdk_s3::primitives::ByteStream::from_static(b""))
.send()
.await
{
Ok(_) => api_success(
"Folder created",
json!({ "key": folder_key, "bucket": body.creds.bucket }),
),
Err(err) => {
warn!(key = %folder_key, error = %err, "S3 put_object (create folder) failed");
internal_error("Failed to create folder", err.to_string())
}
}
}
/// Canonicalises a folder prefix to the form `path/`.
///
/// Surrounding whitespace is removed first, then any leading and trailing `/`
/// characters. Returns `None` when nothing is left, otherwise the remaining text
/// with exactly one trailing `/` appended.
fn normalize_folder_prefix(prefix: &str) -> Option<String> {
    let core = prefix.trim().trim_matches('/');
    (!core.is_empty()).then(|| format!("{core}/"))
}
/// Collects every object key under `prefix`, following `ListObjectsV2` pagination.
///
/// Pages of up to 1000 keys are requested until S3 reports the listing is no longer
/// truncated, or a truncated page arrives without a continuation token. Any request
/// failure is returned as the error's string form.
async fn list_all_keys_for_prefix(
    client: &S3Client,
    bucket: &str,
    prefix: &str,
) -> Result<Vec<String>, String> {
    let mut collected: Vec<String> = Vec::new();
    let mut next_token: Option<String> = None;
    loop {
        let base = client
            .list_objects_v2()
            .bucket(bucket)
            .prefix(prefix)
            .max_keys(1000);
        let page_request = match next_token.as_deref() {
            Some(token) => base.continuation_token(token),
            None => base,
        };
        let page = page_request.send().await.map_err(|err| err.to_string())?;

        // Entries without a key are skipped.
        collected.extend(
            page.contents()
                .iter()
                .filter_map(|object| object.key())
                .map(|key| key.to_string()),
        );

        if !page.is_truncated().unwrap_or(false) {
            break;
        }
        match page.next_continuation_token() {
            Some(token) => next_token = Some(token.to_string()),
            // Truncated but no token to continue with: stop with what we have.
            None => break,
        }
    }
    Ok(collected)
}
/// Deletes `keys` from `bucket` using `DeleteObjects`, at most 1000 keys per request.
///
/// # Errors
///
/// Returns the string form of the first failure: an identifier or request that could
/// not be built, a failed `DeleteObjects` call, or a call that succeeded as a whole
/// but reported per-key errors in its response. Processing stops at the first failing
/// batch; earlier batches remain deleted.
async fn delete_keys_in_batches(
    client: &S3Client,
    bucket: &str,
    keys: &[String],
) -> Result<(), String> {
    for chunk in keys.chunks(1000) {
        let objects: Vec<aws_sdk_s3::types::ObjectIdentifier> = chunk
            .iter()
            .map(|key| {
                aws_sdk_s3::types::ObjectIdentifier::builder()
                    .key(key)
                    .build()
            })
            .collect::<Result<Vec<_>, _>>()
            .map_err(|err| err.to_string())?;
        let delete = aws_sdk_s3::types::Delete::builder()
            .set_objects(Some(objects))
            // Quiet mode: the response lists only the keys that failed.
            .quiet(true)
            .build()
            .map_err(|err| err.to_string())?;
        let output = client
            .delete_objects()
            .bucket(bucket)
            .delete(delete)
            .send()
            .await
            .map_err(|err| err.to_string())?;
        // A successful response can still carry per-key failures; treating those as
        // success would report objects as deleted that are still present.
        let failures = output.errors();
        if let Some(first) = failures.first() {
            return Err(format!(
                "{} object(s) could not be deleted; first failure: key={} code={} message={}",
                failures.len(),
                first.key().unwrap_or("<unknown>"),
                first.code().unwrap_or("<unknown>"),
                first.message().unwrap_or("<none>"),
            ));
        }
    }
    Ok(())
}
/// Deletes every object stored under a folder prefix.
///
/// The prefix is normalised with `normalize_folder_prefix`; an empty result is a bad
/// request. All keys under the prefix are listed first: a listing failure is an
/// internal error and an empty listing is reported as not found. The keys are then
/// removed in batches and the number of listed keys is returned as `deleted_count`.
async fn delete_folder(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<DeleteFolderRequest>,
) -> HttpResponse {
    if let Some(resp) = check_storage_rate_limit(&req, app_state.get_ref()).err() {
        return resp;
    }
    if let Some(resp) = validate_s3_creds(&body.creds).err() {
        return resp;
    }
    if let Some(resp) = ensure_bucket_present(&body.creds.bucket).err() {
        return resp;
    }
    let prefix = match normalize_folder_prefix(&body.prefix) {
        Some(normalized) => normalized,
        None => return bad_request("Prefix is required", "prefix field is empty"),
    };

    let client: S3Client = build_client(&body.creds).await;
    let listing = list_all_keys_for_prefix(&client, &body.creds.bucket, &prefix).await;
    let keys: Vec<String> = match listing {
        Ok(found) => found,
        Err(err) => return internal_error("Failed to list folder objects", err),
    };
    if keys.is_empty() {
        return not_found(
            "Folder not found",
            format!("No objects found for prefix {prefix}"),
        );
    }

    if let Err(err) = delete_keys_in_batches(&client, &body.creds.bucket, &keys).await {
        return internal_error("Failed to delete folder", err);
    }
    api_success(
        "Folder deleted",
        json!({ "prefix": prefix, "bucket": body.creds.bucket, "deleted_count": keys.len() }),
    )
}
async fn rename_folder(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<RenameFolderRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
let Some(from_prefix) = normalize_folder_prefix(&body.from_prefix) else {
return bad_request("from_prefix is required", "from_prefix field is empty");
};
let Some(to_prefix) = normalize_folder_prefix(&body.to_prefix) else {
return bad_request("to_prefix is required", "to_prefix field is empty");
};
if from_prefix == to_prefix {
return bad_request(
"Rename requires a different destination",
"from_prefix and to_prefix must differ",
);
}
if to_prefix.starts_with(&from_prefix) {
return bad_request(
"Invalid destination prefix",
"to_prefix cannot be nested under from_prefix",
);
}
let client: S3Client = build_client(&body.creds).await;
let keys: Vec<String> =
match list_all_keys_for_prefix(&client, &body.creds.bucket, &from_prefix).await {
Ok(v) => v,
Err(err) => return internal_error("Failed to list source folder objects", err),
};
if keys.is_empty() {
return not_found(
"Folder not found",
format!("No objects found for prefix {from_prefix}"),
);
}
for key in &keys {
let suffix = match key.strip_prefix(&from_prefix) {
Some(v) => v,
None => continue,
};
let new_key = format!("{to_prefix}{suffix}");
let copy_source = format!("{}/{}", body.creds.bucket, key);
if let Err(err) = client
.copy_object()
.bucket(&body.creds.bucket)
.copy_source(copy_source)
.key(&new_key)
.send()
.await
{
warn!(
from_key = %key,
to_key = %new_key,
error = %err,
"S3 copy_object (rename folder) failed"
);
return internal_error("Failed to rename folder", err.to_string());
}
}
match delete_keys_in_batches(&client, &body.creds.bucket, &keys).await {
Ok(_) => api_success(
"Folder renamed",
json!({
"bucket": body.creds.bucket,
"from_prefix": from_prefix,
"to_prefix": to_prefix,
"moved_count": keys.len()
}),
),
Err(err) => internal_error("Failed to finalize folder rename", err),
}
}
/// Produces a presigned `PUT` URL for uploading one object.
///
/// After the shared pre-checks and a blank-key check, a `PutObject` request is
/// prepared with the optional content type and the encryption settings applied by
/// `apply_sse_to_put`, then presigned for `PRESIGN_URL_EXPIRY_SECS` seconds. The
/// response contains the URL, the headers to send with it and the expiry.
async fn presign_upload(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    body: Json<PresignUploadRequest>,
) -> HttpResponse {
    let preflight = check_storage_rate_limit(&req, app_state.get_ref())
        .and_then(|_| validate_s3_creds(&body.creds))
        .and_then(|_| ensure_bucket_present(&body.creds.bucket));
    if let Err(resp) = preflight {
        return resp;
    }
    if body.key.trim().is_empty() {
        return bad_request("Key is required", "key field is empty");
    }

    let client: S3Client = build_client(&body.creds).await;
    let ttl = Duration::from_secs(PRESIGN_URL_EXPIRY_SECS);
    let presign_config: PresigningConfig = match PresigningConfig::expires_in(ttl) {
        Ok(cfg) => cfg,
        Err(err) => return internal_error("Presign config error", err.to_string()),
    };

    let base: PutObjectFluentBuilder = client
        .put_object()
        .bucket(&body.creds.bucket)
        .key(&body.key);
    let with_content_type = match &body.content_type {
        Some(ct) => base.content_type(ct),
        None => base,
    };
    let upload = match apply_sse_to_put(with_content_type, &body.encryption) {
        Ok(builder) => builder,
        Err(resp) => return resp,
    };

    let presigned = match upload.presigned(presign_config).await {
        Ok(presigned) => presigned,
        Err(err) => {
            warn!(key = %body.key, error = %err, "S3 presign put_object failed");
            return internal_error("Failed to generate upload URL", err.to_string());
        }
    };
    api_success(
        "Upload URL generated",
        json!({
            "url": presigned.uri(),
            "method": "PUT",
            "key": body.key,
            "expires_in": PRESIGN_URL_EXPIRY_SECS,
            "headers": presigned_headers_to_json(presigned.headers()),
        }),
    )
}
/// Registers the storage API under the `/storage` scope.
///
/// Every route in the scope is wrapped by `storage_auth_middleware`. All handlers
/// registered directly here accept `POST`. Additional routes are contributed by
/// `storage_catalog::configure_storage_routes` (registered first) and
/// `files::configure_storage_routes` (registered last).
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/storage")
.wrap(actix_web::middleware::from_fn(storage_auth_middleware))
.configure(storage_catalog::configure_storage_routes)
// Bucket management.
.route("/buckets/list", web::post().to(list_buckets))
.route("/buckets/create", web::post().to(create_bucket))
.route("/buckets/delete", web::post().to(delete_bucket))
// Bucket lifecycle configuration.
.route("/buckets/lifecycle", web::post().to(get_bucket_lifecycle))
.route(
"/buckets/lifecycle/set",
web::post().to(set_bucket_lifecycle),
)
.route(
"/buckets/lifecycle/delete",
web::post().to(delete_bucket_lifecycle),
)
// Bucket policy.
.route("/buckets/policy", web::post().to(get_bucket_policy))
.route("/buckets/policy/set", web::post().to(set_bucket_policy))
.route(
"/buckets/policy/delete",
web::post().to(delete_bucket_policy),
)
// Bucket public-access block.
.route(
"/buckets/public-access",
web::post().to(get_public_access_block),
)
.route(
"/buckets/public-access/set",
web::post().to(set_public_access_block),
)
.route(
"/buckets/public-access/delete",
web::post().to(delete_public_access_block),
)
// Object operations.
.route("/objects", web::post().to(list_objects))
.route("/objects/head", web::post().to(head_object))
.route("/objects/exists", web::post().to(object_exists))
.route("/objects/validate", web::post().to(validate_object))
.route("/objects/update", web::post().to(update_object))
.route("/objects/url", web::post().to(get_object_url))
.route("/objects/public-url", web::post().to(get_object_public_url))
.route("/objects/copy", web::post().to(copy_object))
.route("/objects/delete", web::post().to(delete_object))
// Object versions.
.route("/objects/versions", web::post().to(list_object_versions))
.route(
"/objects/versions/restore",
web::post().to(restore_object_version),
)
.route(
"/objects/versions/delete",
web::post().to(delete_object_version),
)
// Folder (key-prefix) operations.
.route("/objects/folder", web::post().to(create_folder))
.route("/objects/folder/delete", web::post().to(delete_folder))
.route("/objects/folder/rename", web::post().to(rename_folder))
// Upload helpers: presigned PUT URL and signed POST policy.
.route("/objects/upload-url", web::post().to(presign_upload))
.route(
"/objects/post-policy",
web::post().to(create_signed_post_policy),
)
// Bucket CORS configuration.
.route("/buckets/cors", web::post().to(get_bucket_cors))
.route("/buckets/cors/set", web::post().to(set_bucket_cors))
.route("/buckets/cors/delete", web::post().to(delete_bucket_cors))
.configure(files::configure_storage_routes),
);
}