use actix_web::body::{BoxBody, MessageBody};
use actix_web::dev::{ServiceRequest, ServiceResponse};
use actix_web::middleware::Next;
use actix_web::{Error, HttpRequest, HttpResponse, web, web::Json};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::RequestId;
use aws_sdk_s3::operation::list_objects_v2::builders::ListObjectsV2FluentBuilder;
use aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder;
use aws_sdk_s3::presigning::PresigningConfig;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::path::PathBuf;
use std::process::Output;
use std::time::Duration;
use tokio::fs;
use tokio::process::Command;
pub mod digitalocean;
pub mod hetzner;
pub mod minio;
pub mod s3;
mod service;
mod validation;
use tracing::warn;
use which::which;
use crate::AppState;
use crate::api::gateway::auth::{require_admin_or_gateway, storage_proxy_right};
use crate::api::rate_limit::check_inbound_optional;
use crate::api::response::{api_success, bad_request, conflict, internal_error, not_found};
use service::{PRESIGN_URL_EXPIRY_SECS, TempPathCleanup, build_s3_client, ensure_bucket_present};
#[derive(Debug, Deserialize)]
struct S3ConnectionConfig {
endpoint: String,
region: String,
access_key_id: String,
secret_key: String,
}
#[derive(Debug, Deserialize)]
struct S3Creds {
#[serde(flatten)]
connection: S3ConnectionConfig,
bucket: String,
}
#[derive(Debug, Deserialize)]
struct ListBucketsRequest {
#[serde(flatten)]
connection: S3ConnectionConfig,
}
#[derive(Debug, Deserialize)]
struct ListObjectsRequest {
#[serde(flatten)]
creds: S3Creds,
#[serde(default)]
prefix: Option<String>,
#[serde(default)]
delimiter: Option<String>,
#[serde(default)]
continuation_token: Option<String>,
#[serde(default = "default_max_keys")]
max_keys: i32,
}
fn default_max_keys() -> i32 {
200
}
#[derive(Debug, Deserialize)]
struct GetObjectRequest {
#[serde(flatten)]
creds: S3Creds,
key: String,
}
#[derive(Debug, Deserialize)]
struct DeleteObjectRequest {
#[serde(flatten)]
creds: S3Creds,
key: String,
}
#[derive(Debug, Deserialize)]
struct CreateFolderRequest {
#[serde(flatten)]
creds: S3Creds,
prefix: String,
}
#[derive(Debug, Deserialize)]
struct PresignUploadRequest {
#[serde(flatten)]
creds: S3Creds,
key: String,
content_type: Option<String>,
}
#[derive(Debug, Deserialize)]
struct BucketCorsRequest {
#[serde(flatten)]
creds: S3Creds,
}
#[derive(Debug, Deserialize)]
struct BucketCorsRuleInput {
allowed_origins: Vec<String>,
allowed_methods: Vec<String>,
#[serde(default)]
allowed_headers: Vec<String>,
#[serde(default)]
expose_headers: Vec<String>,
#[serde(default)]
max_age_seconds: Option<u32>,
}
#[derive(Debug, Deserialize)]
struct SetBucketCorsRequest {
#[serde(flatten)]
creds: S3Creds,
rules: Vec<BucketCorsRuleInput>,
}
#[derive(Debug, Serialize)]
struct StorageBucketSummary {
name: String,
creation_date: String,
}
#[derive(Debug, Serialize)]
struct StorageBucketListResponse {
buckets: Vec<StorageBucketSummary>,
count: usize,
}
#[derive(Debug, Serialize)]
struct S3Object {
key: String,
size: i64,
last_modified: String,
is_folder: bool,
etag: Option<String>,
storage_class: Option<String>,
}
#[derive(Debug, Serialize)]
struct ListObjectsResponse {
objects: Vec<S3Object>,
common_prefixes: Vec<String>,
is_truncated: bool,
next_continuation_token: Option<String>,
prefix: Option<String>,
}
#[derive(Debug, Serialize)]
struct HeadObjectResponse {
bucket: String,
key: String,
content_length: i64,
content_type: Option<String>,
last_modified: String,
etag: Option<String>,
version_id: Option<String>,
storage_class: Option<String>,
metadata: serde_json::Map<String, Value>,
}
fn validate_s3_connection(connection: &S3ConnectionConfig) -> Result<(), HttpResponse> {
validation::validate_storage_endpoint(&connection.endpoint)?;
validation::validate_region(&connection.region)?;
validation::validate_access_credentials(&connection.access_key_id, &connection.secret_key)?;
if validation::scanner_junk_in_storage_field(&connection.endpoint)
|| validation::scanner_junk_in_storage_field(&connection.region)
{
return Err(bad_request(
"Invalid request",
"One or more fields contain unsupported characters",
));
}
Ok(())
}
fn validate_s3_creds(creds: &S3Creds) -> Result<(), HttpResponse> {
validate_s3_connection(&creds.connection)?;
validation::validate_bucket_name(&creds.bucket)?;
if validation::scanner_junk_in_storage_field(&creds.bucket) {
return Err(bad_request(
"Invalid request",
"One or more fields contain unsupported characters",
));
}
Ok(())
}
fn check_storage_rate_limit(req: &HttpRequest, app_state: &AppState) -> Result<(), HttpResponse> {
check_inbound_optional(
&app_state.inbound_rate_limit_storage,
app_state.inbound_rate_limit_trust_x_forwarded_for,
req,
)
}
async fn storage_auth_middleware<B>(
req: ServiceRequest,
next: Next<B>,
) -> Result<ServiceResponse<BoxBody>, Error>
where
B: MessageBody + 'static,
{
if req.path().starts_with("/storage/") {
let Some(data) = req.app_data::<web::Data<AppState>>() else {
return Err(actix_web::error::ErrorInternalServerError(
"AppState not configured",
));
};
let state: &AppState = data.get_ref();
if let Err(http_resp) =
require_admin_or_gateway(req.request(), state, None, vec![storage_proxy_right()]).await
{
return Ok(req.into_response(http_resp.map_into_boxed_body()));
}
}
Ok(next.call(req).await?.map_into_boxed_body())
}
async fn build_client(creds: &S3Creds) -> S3Client {
build_s3_client(
&creds.connection.endpoint,
&creds.connection.region,
&creds.bucket,
&creds.connection.access_key_id,
&creds.connection.secret_key,
)
.await
}
async fn build_connection_client(connection: &S3ConnectionConfig) -> S3Client {
build_s3_client(
&connection.endpoint,
&connection.region,
"",
&connection.access_key_id,
&connection.secret_key,
)
.await
}
fn resolve_s3cmd_path() -> Option<PathBuf> {
which("s3cmd").ok().or_else(|| which("s3cmd.exe").ok())
}
fn normalize_endpoint_url(endpoint: &str) -> Result<reqwest::Url, String> {
let trimmed: &str = endpoint.trim();
if trimmed.is_empty() {
return Err("endpoint field is empty".to_string());
}
if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
return reqwest::Url::parse(trimmed)
.map_err(|err| format!("invalid endpoint URL '{}': {err}", trimmed));
}
let with_scheme: String = format!("https://{trimmed}");
reqwest::Url::parse(&with_scheme)
.map_err(|err| format!("invalid endpoint URL '{}': {err}", trimmed))
}
fn s3cmd_base_args(creds: &S3Creds) -> Result<Vec<String>, String> {
if creds.connection.access_key_id.trim().is_empty() {
return Err("access_key_id is required for s3cmd".to_string());
}
if creds.connection.secret_key.trim().is_empty() {
return Err("secret_key is required for s3cmd".to_string());
}
let endpoint: reqwest::Url = normalize_endpoint_url(&creds.connection.endpoint)?;
let host: &str = endpoint
.host_str()
.ok_or_else(|| "endpoint URL does not contain a host".to_string())?;
let mut args: Vec<String> = vec![
format!("--access_key={}", creds.connection.access_key_id),
format!("--secret_key={}", creds.connection.secret_key),
format!("--host={host}"),
format!("--host-bucket=%(bucket)s.{host}"),
];
if endpoint.scheme().eq_ignore_ascii_case("http") {
args.push("--no-ssl".to_string());
}
Ok(args)
}
async fn run_s3cmd(creds: &S3Creds, command_args: &[String]) -> Result<String, String> {
let s3cmd_path: PathBuf = resolve_s3cmd_path().ok_or_else(|| {
"s3cmd is not installed or not available in PATH. Install s3cmd to manage bucket CORS."
.to_string()
})?;
let mut cmd: Command = Command::new(s3cmd_path);
for arg in s3cmd_base_args(creds)? {
cmd.arg(arg);
}
for arg in command_args {
cmd.arg(arg);
}
let output: Output = cmd
.output()
.await
.map_err(|err| format!("failed to execute s3cmd: {err}"))?;
if output.status.success() {
return Ok(String::from_utf8_lossy(&output.stdout).to_string());
}
let stderr: String = String::from_utf8_lossy(&output.stderr).trim().to_string();
let stdout: String = String::from_utf8_lossy(&output.stdout).trim().to_string();
let detail: String = if !stderr.is_empty() { stderr } else { stdout };
if detail.is_empty() {
Err(format!(
"s3cmd failed with exit code {:?}",
output.status.code()
))
} else {
Err(detail)
}
}
fn escape_xml(value: &str) -> String {
value
.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
.replace('"', """)
.replace('\'', "'")
}
fn method_is_supported(method: &str) -> bool {
matches!(
method,
"GET" | "HEAD" | "PUT" | "POST" | "DELETE" | "OPTIONS" | "PATCH"
)
}
fn build_cors_xml(rules: &[BucketCorsRuleInput]) -> Result<String, String> {
if rules.is_empty() {
return Err("At least one CORS rule is required".to_string());
}
let mut xml: String = String::from("<CORSConfiguration>\n");
for rule in rules {
let origins: Vec<String> = rule
.allowed_origins
.iter()
.map(|v| v.trim())
.filter(|v| !v.is_empty())
.map(str::to_string)
.collect();
if origins.is_empty() {
return Err("Each CORS rule must include at least one allowed origin".to_string());
}
let methods: Vec<String> = rule
.allowed_methods
.iter()
.map(|m| m.trim().to_ascii_uppercase())
.filter(|m| !m.is_empty())
.collect();
if methods.is_empty() {
return Err("Each CORS rule must include at least one allowed method".to_string());
}
for method in &methods {
if !method_is_supported(method) {
return Err(format!("Unsupported CORS method '{method}'"));
}
}
let mut headers: Vec<String> = rule
.allowed_headers
.iter()
.map(|v| v.trim())
.filter(|v| !v.is_empty())
.map(str::to_string)
.collect();
if headers.is_empty() {
headers.push("*".to_string());
}
let expose_headers: Vec<String> = rule
.expose_headers
.iter()
.map(|v| v.trim())
.filter(|v| !v.is_empty())
.map(str::to_string)
.collect();
xml.push_str(" <CORSRule>\n");
for origin in origins {
xml.push_str(&format!(
" <AllowedOrigin>{}</AllowedOrigin>\n",
escape_xml(&origin)
));
}
for method in methods {
xml.push_str(&format!(
" <AllowedMethod>{}</AllowedMethod>\n",
escape_xml(&method)
));
}
for header in headers {
xml.push_str(&format!(
" <AllowedHeader>{}</AllowedHeader>\n",
escape_xml(&header)
));
}
for header in expose_headers {
xml.push_str(&format!(
" <ExposeHeader>{}</ExposeHeader>\n",
escape_xml(&header)
));
}
if let Some(max_age_seconds) = rule.max_age_seconds {
xml.push_str(&format!(
" <MaxAgeSeconds>{max_age_seconds}</MaxAgeSeconds>\n"
));
}
xml.push_str(" </CORSRule>\n");
}
xml.push_str("</CORSConfiguration>\n");
Ok(xml)
}
fn bucket_create_uses_location_constraint(connection: &S3ConnectionConfig) -> bool {
let endpoint = connection.endpoint.to_ascii_lowercase();
endpoint.contains("amazonaws.com")
&& !connection.region.eq_ignore_ascii_case("us-east-1")
&& !connection.region.eq_ignore_ascii_case("auto")
}
async fn list_buckets(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<ListBucketsRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_connection(&body.connection) {
return resp;
}
let client: S3Client = build_connection_client(&body.connection).await;
match client.list_buckets().send().await {
Ok(output) => {
let buckets: Vec<StorageBucketSummary> = output
.buckets()
.iter()
.filter_map(|bucket| {
let name = bucket.name()?.to_string();
Some(StorageBucketSummary {
name,
creation_date: bucket
.creation_date()
.map(|value| value.to_string())
.unwrap_or_default(),
})
})
.collect();
let count = buckets.len();
tracing::info!(
bucket_count = count,
endpoint = %body.connection.endpoint,
"S3 list_buckets succeeded"
);
api_success(
"Buckets listed",
json!(StorageBucketListResponse { buckets, count }),
)
}
Err(err) => {
warn!(
endpoint = %body.connection.endpoint,
error = %err,
"S3 list_buckets failed"
);
internal_error("Failed to list buckets", err.to_string())
}
}
}
async fn create_bucket(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<S3Creds>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.bucket) {
return resp;
}
let client: S3Client = build_client(&body).await;
let mut request = client.create_bucket().bucket(&body.bucket);
if bucket_create_uses_location_constraint(&body.connection) {
request = request.create_bucket_configuration(
aws_sdk_s3::types::CreateBucketConfiguration::builder()
.location_constraint(aws_sdk_s3::types::BucketLocationConstraint::from(
body.connection.region.as_str(),
))
.build(),
);
}
match request.send().await {
Ok(_) => {
tracing::info!(
bucket = %body.bucket,
endpoint = %body.connection.endpoint,
region = %body.connection.region,
"S3 create_bucket succeeded"
);
api_success(
"Bucket created",
json!({
"bucket": body.bucket,
"region": body.connection.region,
}),
)
}
Err(err) => {
let detail = err.to_string();
warn!(bucket = %body.bucket, error = %detail, "S3 create_bucket failed");
if detail.contains("BucketAlreadyOwnedByYou") || detail.contains("BucketAlreadyExists")
{
return conflict("Bucket already exists", detail);
}
internal_error("Failed to create bucket", detail)
}
}
}
async fn delete_bucket(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<S3Creds>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.bucket) {
return resp;
}
let client: S3Client = build_client(&body).await;
match client.delete_bucket().bucket(&body.bucket).send().await {
Ok(_) => {
tracing::info!(bucket = %body.bucket, "S3 delete_bucket succeeded");
api_success("Bucket deleted", json!({ "bucket": body.bucket }))
}
Err(err) => {
let detail = err.to_string();
warn!(bucket = %body.bucket, error = %detail, "S3 delete_bucket failed");
if detail.contains("BucketNotEmpty") {
return conflict(
"Bucket is not empty",
"Delete or move the remaining objects before deleting the bucket",
);
}
if detail.contains("NoSuchBucket") {
return not_found("Bucket not found", detail);
}
internal_error("Failed to delete bucket", detail)
}
}
}
async fn list_objects(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<ListObjectsRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
let client: S3Client = build_client(&body.creds).await;
let mut req: ListObjectsV2FluentBuilder = client.list_objects_v2().bucket(&body.creds.bucket);
if let Some(prefix) = &body.prefix {
req = req.prefix(prefix);
}
let delimiter: &str = body.delimiter.as_deref().unwrap_or("/");
req = req.delimiter(delimiter);
req = req.max_keys(body.max_keys);
if let Some(token) = &body.continuation_token {
req = req.continuation_token(token);
}
match req.send().await {
Ok(output) => {
let objects: Vec<S3Object> = output
.contents()
.iter()
.map(|obj| {
let key: String = obj.key().unwrap_or_default().to_string();
let is_folder: bool = key.ends_with('/');
S3Object {
key,
size: obj.size().unwrap_or(0),
last_modified: obj
.last_modified()
.map(|t| t.to_string())
.unwrap_or_default(),
is_folder,
etag: obj.e_tag().map(|s| s.to_string()),
storage_class: obj.storage_class().map(|s| s.as_str().to_string()),
}
})
.collect();
let common_prefixes: Vec<String> = output
.common_prefixes()
.iter()
.filter_map(|cp| cp.prefix().map(|s| s.to_string()))
.collect();
let resp: ListObjectsResponse = ListObjectsResponse {
objects,
common_prefixes,
is_truncated: output.is_truncated().unwrap_or(false),
next_continuation_token: output.next_continuation_token().map(|s| s.to_string()),
prefix: body.prefix.clone(),
};
api_success("Objects listed", json!(resp))
}
Err(err) => {
warn!(bucket = %body.creds.bucket, error = %err, "S3 list_objects_v2 failed");
internal_error("Failed to list objects", err.to_string())
}
}
}
async fn head_object(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<GetObjectRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client: S3Client = build_client(&body.creds).await;
match client
.head_object()
.bucket(&body.creds.bucket)
.key(&body.key)
.send()
.await
{
Ok(output) => {
tracing::info!(
bucket = %body.creds.bucket,
key = %body.key,
"S3 head_object succeeded"
);
let metadata = output
.metadata()
.map(|map| {
let mut metadata = serde_json::Map::new();
for (key, value) in map {
metadata.insert(key.clone(), Value::String(value.clone()));
}
metadata
})
.unwrap_or_default();
api_success(
"Object metadata loaded",
json!(HeadObjectResponse {
bucket: body.creds.bucket.clone(),
key: body.key.clone(),
content_length: output.content_length().unwrap_or(0),
content_type: output.content_type().map(str::to_string),
last_modified: output
.last_modified()
.map(|value| value.to_string())
.unwrap_or_default(),
etag: output.e_tag().map(str::to_string),
version_id: output.version_id().map(str::to_string),
storage_class: output
.storage_class()
.map(|value| value.as_str().to_string()),
metadata,
}),
)
}
Err(err) => {
let detail = err.to_string();
warn!(
bucket = %body.creds.bucket,
key = %body.key,
error = %detail,
"S3 head_object failed"
);
if detail.contains("NotFound") || detail.contains("NoSuchKey") {
return not_found("Object not found", detail);
}
internal_error("Failed to load object metadata", detail)
}
}
}
async fn get_bucket_cors(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<BucketCorsRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
let client: S3Client = build_client(&body.creds).await;
match client
.get_bucket_cors()
.bucket(&body.creds.bucket)
.send()
.await
{
Ok(output) => {
let mapped_rules: Vec<BucketCorsRuleInput> = output
.cors_rules()
.iter()
.map(|rule: &aws_sdk_s3::types::CorsRule| BucketCorsRuleInput {
allowed_origins: rule.allowed_origins().to_vec(),
allowed_methods: rule
.allowed_methods()
.iter()
.map(|method| method.as_str().to_string())
.collect::<Vec<String>>(),
allowed_headers: rule.allowed_headers().to_vec(),
expose_headers: rule.expose_headers().to_vec(),
max_age_seconds: rule
.max_age_seconds()
.and_then(|value| u32::try_from(value).ok()),
})
.collect();
let cors_xml: String = match build_cors_xml(&mapped_rules) {
Ok(value) => value,
Err(err) => {
return internal_error("Failed to parse bucket CORS", err);
}
};
api_success(
"Bucket CORS loaded",
json!({
"bucket": body.creds.bucket,
"cors_xml": cors_xml,
}),
)
}
Err(err) => {
let detail: String = match &err {
SdkError::ServiceError(se) => {
let inner = se.err();
let mut s: String = inner.to_string();
if s.len() < 32
|| s.eq_ignore_ascii_case("service error")
|| s == "unknown error"
{
s = format!("{s} ({inner:?})");
}
if let Some(rid) = inner.meta().request_id() {
s.push_str(&format!(" [request_id={rid}]"));
}
s
}
_ => err.to_string(),
};
if detail.contains("NoSuchCORSConfiguration") {
return api_success(
"Bucket CORS loaded",
json!({
"bucket": body.creds.bucket,
"cors_xml": "",
}),
);
}
warn!(bucket = %body.creds.bucket, error = %detail, "S3 get_bucket_cors failed");
internal_error("Failed to fetch bucket CORS", detail)
}
}
}
async fn set_bucket_cors(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<SetBucketCorsRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
let cors_xml: String = match build_cors_xml(&body.rules) {
Ok(xml) => xml,
Err(err) => return bad_request("Invalid CORS rules", err),
};
let tmp_path: PathBuf = std::env::temp_dir().join(format!(
"athena-cors-{}-{}.xml",
body.creds.bucket.trim(),
uuid::Uuid::new_v4()
));
let _tmp_cleanup: TempPathCleanup = TempPathCleanup::new(tmp_path.clone());
if let Err(err) = fs::write(&tmp_path, cors_xml).await {
return internal_error("Failed to prepare CORS config", err.to_string());
}
let args: Vec<String> = vec![
"setcors".to_string(),
tmp_path.to_string_lossy().to_string(),
format!("s3://{}", body.creds.bucket.trim()),
];
let result: Result<String, String> = run_s3cmd(&body.creds, &args).await;
match result {
Ok(_) => api_success(
"Bucket CORS updated",
json!({
"bucket": body.creds.bucket,
}),
),
Err(err) => {
warn!(bucket = %body.creds.bucket, error = %err, "s3cmd setcors failed");
internal_error("Failed to update bucket CORS", err)
}
}
}
async fn delete_bucket_cors(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<BucketCorsRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
let args: Vec<String> = vec![
"delcors".to_string(),
format!("s3://{}", body.creds.bucket.trim()),
];
match run_s3cmd(&body.creds, &args).await {
Ok(_) => api_success(
"Bucket CORS removed",
json!({
"bucket": body.creds.bucket,
}),
),
Err(err) => {
warn!(bucket = %body.creds.bucket, error = %err, "s3cmd delcors failed");
internal_error("Failed to remove bucket CORS", err)
}
}
}
async fn get_object_url(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<GetObjectRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client: S3Client = build_client(&body.creds).await;
let presign_config: PresigningConfig =
match PresigningConfig::expires_in(Duration::from_secs(PRESIGN_URL_EXPIRY_SECS)) {
Ok(cfg) => cfg,
Err(err) => return internal_error("Presign config error", err.to_string()),
};
match client
.get_object()
.bucket(&body.creds.bucket)
.key(&body.key)
.presigned(presign_config)
.await
{
Ok(presigned) => api_success(
"Presigned URL generated",
json!({ "url": presigned.uri(), "expires_in": PRESIGN_URL_EXPIRY_SECS }),
),
Err(err) => {
warn!(key = %body.key, error = %err, "S3 presign get_object failed");
internal_error("Failed to generate presigned URL", err.to_string())
}
}
}
async fn delete_object(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<DeleteObjectRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client: S3Client = build_client(&body.creds).await;
match client
.delete_object()
.bucket(&body.creds.bucket)
.key(&body.key)
.send()
.await
{
Ok(_) => api_success(
"Object deleted",
json!({ "key": body.key, "bucket": body.creds.bucket }),
),
Err(err) => {
warn!(key = %body.key, error = %err, "S3 delete_object failed");
internal_error("Failed to delete object", err.to_string())
}
}
}
async fn create_folder(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<CreateFolderRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
let prefix: &str = body.prefix.trim().trim_end_matches('/');
if prefix.is_empty() {
return bad_request("Prefix is required", "prefix field is empty");
}
let folder_key: String = format!("{}/", prefix);
let client: S3Client = build_client(&body.creds).await;
match client
.put_object()
.bucket(&body.creds.bucket)
.key(&folder_key)
.body(aws_sdk_s3::primitives::ByteStream::from_static(b""))
.send()
.await
{
Ok(_) => api_success(
"Folder created",
json!({ "key": folder_key, "bucket": body.creds.bucket }),
),
Err(err) => {
warn!(key = %folder_key, error = %err, "S3 put_object (create folder) failed");
internal_error("Failed to create folder", err.to_string())
}
}
}
async fn presign_upload(
req: HttpRequest,
app_state: web::Data<AppState>,
body: Json<PresignUploadRequest>,
) -> HttpResponse {
if let Err(resp) = check_storage_rate_limit(&req, app_state.get_ref()) {
return resp;
}
if let Err(resp) = validate_s3_creds(&body.creds) {
return resp;
}
if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
return resp;
}
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client: S3Client = build_client(&body.creds).await;
let presign_config: PresigningConfig =
match PresigningConfig::expires_in(Duration::from_secs(PRESIGN_URL_EXPIRY_SECS)) {
Ok(cfg) => cfg,
Err(err) => return internal_error("Presign config error", err.to_string()),
};
let mut req: PutObjectFluentBuilder = client
.put_object()
.bucket(&body.creds.bucket)
.key(&body.key);
if let Some(ct) = &body.content_type {
req = req.content_type(ct);
}
match req.presigned(presign_config).await {
Ok(presigned) => api_success(
"Upload URL generated",
json!({
"url": presigned.uri(),
"method": "PUT",
"key": body.key,
"expires_in": PRESIGN_URL_EXPIRY_SECS
}),
),
Err(err) => {
warn!(key = %body.key, error = %err, "S3 presign put_object failed");
internal_error("Failed to generate upload URL", err.to_string())
}
}
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/storage")
.wrap(actix_web::middleware::from_fn(storage_auth_middleware))
.route("/buckets/list", web::post().to(list_buckets))
.route("/buckets/create", web::post().to(create_bucket))
.route("/buckets/delete", web::post().to(delete_bucket))
.route("/objects", web::post().to(list_objects))
.route("/objects/head", web::post().to(head_object))
.route("/objects/url", web::post().to(get_object_url))
.route("/objects/delete", web::post().to(delete_object))
.route("/objects/folder", web::post().to(create_folder))
.route("/objects/upload-url", web::post().to(presign_upload))
.route("/buckets/cors", web::post().to(get_bucket_cors))
.route("/buckets/cors/set", web::post().to(set_bucket_cors))
.route("/buckets/cors/delete", web::post().to(delete_bucket_cors)),
);
}