use actix_web::{HttpResponse, web, web::Json};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::RequestId;
use aws_sdk_s3::operation::list_objects_v2::builders::ListObjectsV2FluentBuilder;
use aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder;
use aws_sdk_s3::presigning::PresigningConfig;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::PathBuf;
use std::process::Output;
use std::time::Duration;
use tokio::fs;
use tokio::process::Command;
pub mod digitalocean;
pub mod hetzner;
pub mod minio;
pub mod s3;
mod service;
use tracing::warn;
use which::which;
use crate::api::response::{api_success, bad_request, internal_error};
use service::{PRESIGN_URL_EXPIRY_SECS, TempPathCleanup, build_s3_client, ensure_bucket_present};
/// Connection details for an S3-compatible bucket, deserialized from the request body.
///
/// Embedded into every request type in this module via `#[serde(flatten)]`, so these
/// fields appear at the top level of the incoming JSON.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}` would print
/// `secret_key` in clear text.
#[derive(Debug, Deserialize)]
struct S3Creds {
/// Service endpoint; `normalize_endpoint_url` accepts it with or without an
/// `http://` / `https://` scheme.
endpoint: String,
/// Region handed to `build_s3_client`.
region: String,
/// Bucket the request operates on.
bucket: String,
/// Access key id; must be non-blank for s3cmd-based operations.
access_key_id: String,
/// Secret key; must be non-blank for s3cmd-based operations.
secret_key: String,
}
/// Request body for listing objects in a bucket (handled by `list_objects`).
#[derive(Debug, Deserialize)]
struct ListObjectsRequest {
/// Bucket credentials, read from the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Optional key prefix to restrict the listing to.
#[serde(default)]
prefix: Option<String>,
/// Optional grouping delimiter; `list_objects` falls back to `"/"` when absent.
#[serde(default)]
delimiter: Option<String>,
/// Token from a previous truncated response, used to fetch the next page.
#[serde(default)]
continuation_token: Option<String>,
/// Maximum number of keys per page; defaults to `default_max_keys()` when omitted.
#[serde(default = "default_max_keys")]
max_keys: i32,
}
/// Serde default for `ListObjectsRequest::max_keys`: the page size used when the
/// request body does not specify one.
fn default_max_keys() -> i32 {
    const DEFAULT_MAX_KEYS: i32 = 200;
    DEFAULT_MAX_KEYS
}
/// Request body for generating a presigned download URL (handled by `get_object_url`).
#[derive(Debug, Deserialize)]
struct GetObjectRequest {
/// Bucket credentials, read from the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Object key to presign; the handler rejects a blank value.
key: String,
}
/// Request body for deleting a single object (handled by `delete_object`).
#[derive(Debug, Deserialize)]
struct DeleteObjectRequest {
/// Bucket credentials, read from the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Object key to delete; the handler rejects a blank value.
key: String,
}
/// Request body for creating a "folder" placeholder object (handled by `create_folder`).
#[derive(Debug, Deserialize)]
struct CreateFolderRequest {
/// Bucket credentials, read from the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Folder path; the handler trims it, strips trailing `/` and appends exactly one `/`.
prefix: String,
}
/// Request body for generating a presigned upload URL (handled by `presign_upload`).
#[derive(Debug, Deserialize)]
struct PresignUploadRequest {
/// Bucket credentials, read from the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Destination object key; the handler rejects a blank value.
key: String,
/// Optional content type applied to the presigned `put_object` request.
content_type: Option<String>,
}
/// Request body for reading or removing a bucket's CORS configuration
/// (used by `get_bucket_cors` and `delete_bucket_cors`).
#[derive(Debug, Deserialize)]
struct BucketCorsRequest {
/// Bucket credentials, read from the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
}
/// One CORS rule as supplied by the caller; rendered to XML by `build_cors_xml`.
#[derive(Debug, Deserialize)]
struct BucketCorsRuleInput {
/// Allowed origins; at least one non-blank entry is required by `build_cors_xml`.
allowed_origins: Vec<String>,
/// Allowed HTTP methods; upper-cased and validated by `build_cors_xml`.
allowed_methods: Vec<String>,
/// Allowed request headers; when none remain after trimming, `build_cors_xml` emits `*`.
#[serde(default)]
allowed_headers: Vec<String>,
/// Response headers to expose; may be empty.
#[serde(default)]
expose_headers: Vec<String>,
/// Optional value for the rule's `MaxAgeSeconds` element; omitted from the XML when `None`.
#[serde(default)]
max_age_seconds: Option<u32>,
}
/// Request body for replacing a bucket's CORS configuration (handled by `set_bucket_cors`).
#[derive(Debug, Deserialize)]
struct SetBucketCorsRequest {
/// Bucket credentials, read from the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Rules to apply; `build_cors_xml` rejects an empty list.
rules: Vec<BucketCorsRuleInput>,
}
/// A single object entry in a listing response, as built by `list_objects`.
#[derive(Debug, Serialize)]
struct S3Object {
/// Object key; empty string when the service returned none.
key: String,
/// Object size as reported by the service; `0` when not reported.
size: i64,
/// Last-modified timestamp rendered with `to_string()`; empty when not reported.
last_modified: String,
/// `true` when `key` ends with `/`.
is_folder: bool,
/// Entity tag, if the service returned one.
etag: Option<String>,
/// Storage class name, if the service returned one.
storage_class: Option<String>,
}
/// Payload returned by `list_objects` for one page of results.
#[derive(Debug, Serialize)]
struct ListObjectsResponse {
/// Objects contained in this page.
objects: Vec<S3Object>,
/// Common prefixes reported by the service for the requested delimiter.
common_prefixes: Vec<String>,
/// Whether more results are available; `false` when the service did not say.
is_truncated: bool,
/// Token to pass as `continuation_token` to fetch the next page, if any.
next_continuation_token: Option<String>,
/// Echo of the prefix supplied in the request.
prefix: Option<String>,
}
async fn build_client(creds: &S3Creds) -> S3Client {
build_s3_client(
&creds.endpoint,
&creds.region,
&creds.bucket,
&creds.access_key_id,
&creds.secret_key,
)
.await
}
/// Locates the `s3cmd` executable on `PATH`, trying `s3cmd` first and then `s3cmd.exe`.
///
/// Returns `None` when neither name can be resolved.
fn resolve_s3cmd_path() -> Option<PathBuf> {
    match which("s3cmd") {
        Ok(found) => Some(found),
        // Only probe the `.exe` name when the plain name was not found.
        Err(_) => which("s3cmd.exe").ok(),
    }
}
/// Parses a user-supplied endpoint into a URL, assuming `https://` when no
/// `http://` or `https://` scheme prefix is present.
///
/// # Errors
/// Returns a message when the endpoint is blank or cannot be parsed as a URL.
fn normalize_endpoint_url(endpoint: &str) -> Result<reqwest::Url, String> {
    let trimmed: &str = endpoint.trim();
    if trimmed.is_empty() {
        return Err("endpoint field is empty".to_string());
    }
    let has_scheme: bool = trimmed.starts_with("http://") || trimmed.starts_with("https://");
    let candidate: String = if has_scheme {
        trimmed.to_string()
    } else {
        format!("https://{trimmed}")
    };
    // The error always quotes what the caller sent, not the scheme-prefixed form.
    match reqwest::Url::parse(&candidate) {
        Ok(url) => Ok(url),
        Err(err) => Err(format!("invalid endpoint URL '{}': {err}", trimmed)),
    }
}
/// Builds the connection-related command-line options shared by every `s3cmd` call.
///
/// The options carry the access key, secret key, target host and the host-bucket
/// template; `--no-ssl` is added when the endpoint uses the `http` scheme.
///
/// An explicit, non-default port in the endpoint (for example `http://localhost:9000`)
/// is kept in `--host` and `--host-bucket`; without it s3cmd would connect to the
/// scheme's default port instead of the configured one.
///
/// # Errors
/// Returns a message when the access key or secret key is blank, or when the
/// endpoint is empty, unparsable, or has no host component.
fn s3cmd_base_args(creds: &S3Creds) -> Result<Vec<String>, String> {
    if creds.access_key_id.trim().is_empty() {
        return Err("access_key_id is required for s3cmd".to_string());
    }
    if creds.secret_key.trim().is_empty() {
        return Err("secret_key is required for s3cmd".to_string());
    }
    let endpoint: reqwest::Url = normalize_endpoint_url(&creds.endpoint)?;
    let host: &str = endpoint
        .host_str()
        .ok_or_else(|| "endpoint URL does not contain a host".to_string())?;
    // `Url::port` yields `Some` only for an explicit port that differs from the
    // scheme default, so standard endpoints keep the plain host name.
    let host_with_port: String = match endpoint.port() {
        Some(port) => format!("{host}:{port}"),
        None => host.to_string(),
    };
    let mut args: Vec<String> = vec![
        format!("--access_key={}", creds.access_key_id),
        format!("--secret_key={}", creds.secret_key),
        format!("--host={host_with_port}"),
        format!("--host-bucket=%(bucket)s.{host_with_port}"),
    ];
    if endpoint.scheme().eq_ignore_ascii_case("http") {
        args.push("--no-ssl".to_string());
    }
    Ok(args)
}
/// Runs `s3cmd` with the shared connection options followed by `command_args`.
///
/// Returns the command's stdout (lossily decoded as UTF-8) on a successful exit.
///
/// # Errors
/// Returns a message when `s3cmd` cannot be found, the connection options cannot be
/// built, the process cannot be started, or it exits unsuccessfully. For an
/// unsuccessful exit the message is the trimmed stderr, else the trimmed stdout,
/// else a note carrying the exit code.
async fn run_s3cmd(creds: &S3Creds, command_args: &[String]) -> Result<String, String> {
    let binary: PathBuf = match resolve_s3cmd_path() {
        Some(path) => path,
        None => {
            return Err(
                "s3cmd is not installed or not available in PATH. Install s3cmd to manage bucket CORS."
                    .to_string(),
            );
        }
    };
    let mut process: Command = Command::new(binary);
    process.args(s3cmd_base_args(creds)?);
    process.args(command_args);
    let finished: Output = match process.output().await {
        Ok(out) => out,
        Err(err) => return Err(format!("failed to execute s3cmd: {err}")),
    };
    let stdout_text = String::from_utf8_lossy(&finished.stdout);
    if finished.status.success() {
        return Ok(stdout_text.to_string());
    }
    let stderr_text = String::from_utf8_lossy(&finished.stderr);
    let (err_part, out_part): (&str, &str) = (stderr_text.trim(), stdout_text.trim());
    // Prefer stderr; fall back to stdout when stderr carried nothing.
    let detail: &str = if err_part.is_empty() { out_part } else { err_part };
    if detail.is_empty() {
        return Err(format!(
            "s3cmd failed with exit code {:?}",
            finished.status.code()
        ));
    }
    Err(detail.to_string())
}
/// Escapes the five XML-reserved characters so `value` can be embedded as element text.
///
/// `&`, `<`, `>`, `"` and `'` are replaced by their predefined entities; every other
/// character is copied unchanged. The input is processed in a single pass, so an
/// ampersand produced by one replacement is never escaped a second time.
fn escape_xml(value: &str) -> String {
    let mut escaped: String = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Reports whether `method` is one of the HTTP methods accepted in a CORS rule.
///
/// The comparison is exact and case-sensitive; callers are expected to upper-case first.
fn method_is_supported(method: &str) -> bool {
    const SUPPORTED_METHODS: [&str; 7] =
        ["GET", "HEAD", "PUT", "POST", "DELETE", "OPTIONS", "PATCH"];
    SUPPORTED_METHODS.contains(&method)
}
/// Renders CORS rules as a `<CORSConfiguration>` XML document.
///
/// Every list entry is trimmed and blank entries are dropped. Methods are upper-cased
/// and checked with `method_is_supported`. A rule left without allowed headers gets a
/// single `*` header. All values are passed through `escape_xml`.
///
/// # Errors
/// Returns a message when `rules` is empty, when a rule has no origin or no method
/// after trimming, or when a method is not supported.
fn build_cors_xml(rules: &[BucketCorsRuleInput]) -> Result<String, String> {
    /// Trims each entry and keeps only the non-empty ones.
    fn non_blank(values: &[String]) -> Vec<String> {
        let mut kept: Vec<String> = Vec::new();
        for raw in values {
            let trimmed: &str = raw.trim();
            if !trimmed.is_empty() {
                kept.push(trimmed.to_string());
            }
        }
        kept
    }

    if rules.is_empty() {
        return Err("At least one CORS rule is required".to_string());
    }

    let mut xml: String = String::from("<CORSConfiguration>\n");
    for rule in rules {
        let origins: Vec<String> = non_blank(&rule.allowed_origins);
        if origins.is_empty() {
            return Err("Each CORS rule must include at least one allowed origin".to_string());
        }

        let methods: Vec<String> = rule
            .allowed_methods
            .iter()
            .map(|m| m.trim().to_ascii_uppercase())
            .filter(|m| !m.is_empty())
            .collect();
        if methods.is_empty() {
            return Err("Each CORS rule must include at least one allowed method".to_string());
        }
        // Report the first unsupported method, in the order the caller listed them.
        if let Some(method) = methods.iter().find(|m| !method_is_supported(m.as_str())) {
            return Err(format!("Unsupported CORS method '{method}'"));
        }

        let mut headers: Vec<String> = non_blank(&rule.allowed_headers);
        if headers.is_empty() {
            headers.push("*".to_string());
        }
        let expose_headers: Vec<String> = non_blank(&rule.expose_headers);

        xml.push_str(" <CORSRule>\n");
        xml.extend(origins.iter().map(|origin| {
            format!(
                " <AllowedOrigin>{}</AllowedOrigin>\n",
                escape_xml(origin)
            )
        }));
        xml.extend(methods.iter().map(|method| {
            format!(
                " <AllowedMethod>{}</AllowedMethod>\n",
                escape_xml(method)
            )
        }));
        xml.extend(headers.iter().map(|header| {
            format!(
                " <AllowedHeader>{}</AllowedHeader>\n",
                escape_xml(header)
            )
        }));
        xml.extend(expose_headers.iter().map(|header| {
            format!(
                " <ExposeHeader>{}</ExposeHeader>\n",
                escape_xml(header)
            )
        }));
        if let Some(max_age_seconds) = rule.max_age_seconds {
            xml.push_str(&format!(
                " <MaxAgeSeconds>{max_age_seconds}</MaxAgeSeconds>\n"
            ));
        }
        xml.push_str(" </CORSRule>\n");
    }
    xml.push_str("</CORSConfiguration>\n");
    Ok(xml)
}
/// Handler that lists one page of objects in the requested bucket.
///
/// Applies the optional prefix and continuation token, uses `/` as the delimiter when
/// none is given, and responds with a `ListObjectsResponse`. A failed listing is logged
/// and reported as an internal error.
async fn list_objects(body: Json<ListObjectsRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let client: S3Client = build_client(&body.creds).await;

    let mut req: ListObjectsV2FluentBuilder = client
        .list_objects_v2()
        .bucket(&body.creds.bucket)
        .delimiter(body.delimiter.as_deref().unwrap_or("/"))
        .max_keys(body.max_keys);
    if let Some(prefix) = &body.prefix {
        req = req.prefix(prefix);
    }
    if let Some(token) = &body.continuation_token {
        req = req.continuation_token(token);
    }

    let output = match req.send().await {
        Ok(output) => output,
        Err(err) => {
            warn!(bucket = %body.creds.bucket, error = %err, "S3 list_objects_v2 failed");
            return internal_error("Failed to list objects", err.to_string());
        }
    };

    let mut objects: Vec<S3Object> = Vec::with_capacity(output.contents().len());
    for entry in output.contents() {
        let key: String = entry.key().unwrap_or_default().to_string();
        objects.push(S3Object {
            // Keys ending in '/' are the placeholder objects used to represent folders.
            is_folder: key.ends_with('/'),
            key,
            size: entry.size().unwrap_or(0),
            last_modified: entry
                .last_modified()
                .map(|t| t.to_string())
                .unwrap_or_default(),
            etag: entry.e_tag().map(|s| s.to_string()),
            storage_class: entry.storage_class().map(|s| s.as_str().to_string()),
        });
    }

    let mut common_prefixes: Vec<String> = Vec::new();
    for cp in output.common_prefixes() {
        if let Some(value) = cp.prefix() {
            common_prefixes.push(value.to_string());
        }
    }

    let resp: ListObjectsResponse = ListObjectsResponse {
        objects,
        common_prefixes,
        is_truncated: output.is_truncated().unwrap_or(false),
        next_continuation_token: output.next_continuation_token().map(|s| s.to_string()),
        prefix: body.prefix.clone(),
    };
    api_success("Objects listed", json!(resp))
}
/// Handler that loads a bucket's CORS configuration and returns it as XML text.
///
/// The rules returned by the service are converted to `BucketCorsRuleInput` and rendered
/// with `build_cors_xml`. An error whose description contains `NoSuchCORSConfiguration`
/// is reported as success with an empty `cors_xml`; any other failure is logged and
/// returned as an internal error.
async fn get_bucket_cors(body: Json<BucketCorsRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let client: S3Client = build_client(&body.creds).await;
    let lookup = client
        .get_bucket_cors()
        .bucket(&body.creds.bucket)
        .send()
        .await;

    let output = match lookup {
        Ok(output) => output,
        Err(err) => {
            let detail: String = if let SdkError::ServiceError(se) = &err {
                let inner = se.err();
                let mut text: String = inner.to_string();
                // Short or generic messages say little, so append the Debug form as well.
                let uninformative: bool = text.len() < 32
                    || text.eq_ignore_ascii_case("service error")
                    || text == "unknown error";
                if uninformative {
                    text = format!("{text} ({inner:?})");
                }
                if let Some(rid) = inner.meta().request_id() {
                    text.push_str(&format!(" [request_id={rid}]"));
                }
                text
            } else {
                err.to_string()
            };
            if detail.contains("NoSuchCORSConfiguration") {
                return api_success(
                    "Bucket CORS loaded",
                    json!({
                        "bucket": body.creds.bucket,
                        "cors_xml": "",
                    }),
                );
            }
            warn!(bucket = %body.creds.bucket, error = %detail, "S3 get_bucket_cors failed");
            return internal_error("Failed to fetch bucket CORS", detail);
        }
    };

    let mut mapped_rules: Vec<BucketCorsRuleInput> = Vec::new();
    for rule in output.cors_rules() {
        let allowed_methods: Vec<String> = rule
            .allowed_methods()
            .iter()
            .map(|method| method.as_str().to_string())
            .collect();
        mapped_rules.push(BucketCorsRuleInput {
            allowed_origins: rule.allowed_origins().to_vec(),
            allowed_methods,
            allowed_headers: rule.allowed_headers().to_vec(),
            expose_headers: rule.expose_headers().to_vec(),
            // Values that do not fit in `u32` (e.g. negatives) are dropped.
            max_age_seconds: rule
                .max_age_seconds()
                .and_then(|value| u32::try_from(value).ok()),
        });
    }

    match build_cors_xml(&mapped_rules) {
        Ok(cors_xml) => api_success(
            "Bucket CORS loaded",
            json!({
                "bucket": body.creds.bucket,
                "cors_xml": cors_xml,
            }),
        ),
        Err(err) => internal_error("Failed to parse bucket CORS", err),
    }
}
/// Turns a caller-supplied bucket name into a label that is safe inside a file name.
///
/// ASCII letters, digits, `-`, `_` and `.` are kept; every other character (including
/// path separators) is replaced by `_`, so the result is always a single path component.
fn sanitize_bucket_for_file_name(bucket: &str) -> String {
    bucket
        .trim()
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.') {
                c
            } else {
                '_'
            }
        })
        .collect()
}

/// Handler that replaces a bucket's CORS configuration.
///
/// The rules are rendered with `build_cors_xml` (invalid rules yield a bad-request
/// response), written to a uniquely named file in the system temp directory, and applied
/// with `s3cmd setcors`. The temp file is handed to `TempPathCleanup` before it is written.
///
/// The bucket name comes from the request body, so it is sanitized before being used in
/// the temp file name; otherwise separators or `..` in it could move the write outside
/// the temp directory. The bucket passed to s3cmd and echoed in the response is unchanged.
async fn set_bucket_cors(body: Json<SetBucketCorsRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let cors_xml: String = match build_cors_xml(&body.rules) {
        Ok(xml) => xml,
        Err(err) => return bad_request("Invalid CORS rules", err),
    };
    let tmp_path: PathBuf = std::env::temp_dir().join(format!(
        "athena-cors-{}-{}.xml",
        sanitize_bucket_for_file_name(&body.creds.bucket),
        uuid::Uuid::new_v4()
    ));
    let _tmp_cleanup: TempPathCleanup = TempPathCleanup::new(tmp_path.clone());
    if let Err(err) = fs::write(&tmp_path, cors_xml).await {
        return internal_error("Failed to prepare CORS config", err.to_string());
    }
    let args: Vec<String> = vec![
        "setcors".to_string(),
        tmp_path.to_string_lossy().to_string(),
        format!("s3://{}", body.creds.bucket.trim()),
    ];
    match run_s3cmd(&body.creds, &args).await {
        Ok(_) => api_success(
            "Bucket CORS updated",
            json!({
                "bucket": body.creds.bucket,
            }),
        ),
        Err(err) => {
            warn!(bucket = %body.creds.bucket, error = %err, "s3cmd setcors failed");
            internal_error("Failed to update bucket CORS", err)
        }
    }
}
/// Handler that removes a bucket's CORS configuration by running `s3cmd delcors`.
///
/// A failing command is logged and reported as an internal error carrying s3cmd's message.
async fn delete_bucket_cors(body: Json<BucketCorsRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let target: String = format!("s3://{}", body.creds.bucket.trim());
    let args: Vec<String> = vec!["delcors".to_string(), target];
    let outcome: Result<String, String> = run_s3cmd(&body.creds, &args).await;
    if let Err(err) = outcome {
        warn!(bucket = %body.creds.bucket, error = %err, "s3cmd delcors failed");
        return internal_error("Failed to remove bucket CORS", err);
    }
    api_success(
        "Bucket CORS removed",
        json!({
            "bucket": body.creds.bucket,
        }),
    )
}
/// Handler that returns a presigned download URL for one object.
///
/// Rejects a blank key. The URL is valid for `PRESIGN_URL_EXPIRY_SECS` seconds, and that
/// value is echoed back as `expires_in`.
async fn get_object_url(body: Json<GetObjectRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    if body.key.trim().is_empty() {
        return bad_request("Key is required", "key field is empty");
    }
    let client: S3Client = build_client(&body.creds).await;
    let validity: Duration = Duration::from_secs(PRESIGN_URL_EXPIRY_SECS);
    let presign_config: PresigningConfig = match PresigningConfig::expires_in(validity) {
        Ok(cfg) => cfg,
        Err(err) => return internal_error("Presign config error", err.to_string()),
    };
    let signing = client
        .get_object()
        .bucket(&body.creds.bucket)
        .key(&body.key)
        .presigned(presign_config)
        .await;
    let presigned = match signing {
        Ok(presigned) => presigned,
        Err(err) => {
            warn!(key = %body.key, error = %err, "S3 presign get_object failed");
            return internal_error("Failed to generate presigned URL", err.to_string());
        }
    };
    api_success(
        "Presigned URL generated",
        json!({ "url": presigned.uri(), "expires_in": PRESIGN_URL_EXPIRY_SECS }),
    )
}
/// Handler that deletes a single object from the requested bucket.
///
/// Rejects a blank key. On success the key and bucket are echoed back; a failure is
/// logged and reported as an internal error.
async fn delete_object(body: Json<DeleteObjectRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    if body.key.trim().is_empty() {
        return bad_request("Key is required", "key field is empty");
    }
    let client: S3Client = build_client(&body.creds).await;
    let outcome = client
        .delete_object()
        .bucket(&body.creds.bucket)
        .key(&body.key)
        .send()
        .await;
    if let Err(err) = outcome {
        warn!(key = %body.key, error = %err, "S3 delete_object failed");
        return internal_error("Failed to delete object", err.to_string());
    }
    api_success(
        "Object deleted",
        json!({ "key": body.key, "bucket": body.creds.bucket }),
    )
}
/// Handler that creates a "folder" by uploading an empty object whose key ends in `/`.
///
/// The requested prefix is trimmed and stripped of trailing slashes before exactly one
/// `/` is appended; a prefix that is empty after that clean-up is rejected.
async fn create_folder(body: Json<CreateFolderRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    let cleaned: &str = body.prefix.trim().trim_end_matches('/');
    if cleaned.is_empty() {
        return bad_request("Prefix is required", "prefix field is empty");
    }
    let mut folder_key: String = String::from(cleaned);
    folder_key.push('/');

    let client: S3Client = build_client(&body.creds).await;
    let upload = client
        .put_object()
        .bucket(&body.creds.bucket)
        .key(&folder_key)
        .body(aws_sdk_s3::primitives::ByteStream::from_static(b""))
        .send()
        .await;
    if let Err(err) = upload {
        warn!(key = %folder_key, error = %err, "S3 put_object (create folder) failed");
        return internal_error("Failed to create folder", err.to_string());
    }
    api_success(
        "Folder created",
        json!({ "key": folder_key, "bucket": body.creds.bucket }),
    )
}
/// Handler that returns a presigned `PUT` URL for uploading one object.
///
/// Rejects a blank key. When the request carries a content type it is set on the
/// `put_object` request before presigning. The URL is valid for
/// `PRESIGN_URL_EXPIRY_SECS` seconds.
async fn presign_upload(body: Json<PresignUploadRequest>) -> HttpResponse {
    if let Err(resp) = ensure_bucket_present(&body.creds.bucket) {
        return resp;
    }
    if body.key.trim().is_empty() {
        return bad_request("Key is required", "key field is empty");
    }
    let client: S3Client = build_client(&body.creds).await;
    let validity: Duration = Duration::from_secs(PRESIGN_URL_EXPIRY_SECS);
    let presign_config: PresigningConfig = match PresigningConfig::expires_in(validity) {
        Ok(cfg) => cfg,
        Err(err) => return internal_error("Presign config error", err.to_string()),
    };

    let base: PutObjectFluentBuilder = client
        .put_object()
        .bucket(&body.creds.bucket)
        .key(&body.key);
    let req: PutObjectFluentBuilder = match &body.content_type {
        Some(ct) => base.content_type(ct),
        None => base,
    };

    let presigned = match req.presigned(presign_config).await {
        Ok(presigned) => presigned,
        Err(err) => {
            warn!(key = %body.key, error = %err, "S3 presign put_object failed");
            return internal_error("Failed to generate upload URL", err.to_string());
        }
    };
    api_success(
        "Upload URL generated",
        json!({
            "url": presigned.uri(),
            "method": "PUT",
            "key": body.key,
            "expires_in": PRESIGN_URL_EXPIRY_SECS
        }),
    )
}
/// Registers this module's storage routes on the given service configuration.
///
/// Every endpoint is a `POST` route because each request carries bucket credentials
/// in its JSON body.
pub fn services(cfg: &mut web::ServiceConfig) {
    // Object operations.
    cfg.route("/storage/objects", web::post().to(list_objects));
    cfg.route("/storage/objects/url", web::post().to(get_object_url));
    cfg.route("/storage/objects/delete", web::post().to(delete_object));
    cfg.route("/storage/objects/folder", web::post().to(create_folder));
    cfg.route(
        "/storage/objects/upload-url",
        web::post().to(presign_upload),
    );
    // Bucket CORS operations.
    cfg.route("/storage/buckets/cors", web::post().to(get_bucket_cors));
    cfg.route("/storage/buckets/cors/set", web::post().to(set_bucket_cors));
    cfg.route(
        "/storage/buckets/cors/delete",
        web::post().to(delete_bucket_cors),
    );
}