use actix_web::{HttpResponse, web, web::Json};
use aws_config::BehaviorVersion;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::presigning::PresigningConfig;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::Duration;
use tracing::warn;
use crate::api::response::{api_success, bad_request, internal_error};
/// Connection settings and credentials for a bucket, supplied by the caller in
/// the JSON request body.
///
/// `Debug` is implemented by hand instead of derived so that the secret key is
/// never written out when this struct (or a request struct that embeds it) is
/// formatted with `{:?}`.
#[derive(Deserialize)]
struct S3Creds {
    /// Service endpoint URL; may be blank.
    endpoint: String,
    /// Region name passed to the SDK.
    region: String,
    /// Name of the bucket the operation targets.
    bucket: String,
    /// Access key identifier used to sign requests.
    access_key_id: String,
    /// Secret key used to sign requests. Redacted in `Debug` output.
    secret_key: String,
}

impl std::fmt::Debug for S3Creds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Creds")
            .field("endpoint", &self.endpoint)
            .field("region", &self.region)
            .field("bucket", &self.bucket)
            .field("access_key_id", &self.access_key_id)
            // Never print the secret itself; only show that a value is present.
            .field("secret_key", &"<redacted>")
            .finish()
    }
}
/// Request body accepted by the object-listing handler.
#[derive(Debug, Deserialize)]
struct ListObjectsRequest {
/// Bucket connection settings; flattened, so its fields sit at the top level
/// of the JSON body next to the fields below.
#[serde(flatten)]
creds: S3Creds,
/// Optional key prefix; only forwarded to S3 when present.
#[serde(default)]
prefix: Option<String>,
/// Optional grouping delimiter; the handler falls back to "/" when absent.
#[serde(default)]
delimiter: Option<String>,
/// Optional continuation token for fetching a further page of results.
#[serde(default)]
continuation_token: Option<String>,
/// Maximum number of keys to request; filled in by `default_max_keys` when
/// the field is missing from the body.
#[serde(default = "default_max_keys")]
max_keys: i32,
}
/// Supplies the `max_keys` value used when a listing request omits it.
fn default_max_keys() -> i32 {
    const DEFAULT_MAX_KEYS: i32 = 200;
    DEFAULT_MAX_KEYS
}
/// Request body accepted by the handler that presigns a `get_object` URL.
#[derive(Debug, Deserialize)]
struct GetObjectRequest {
/// Bucket connection settings; flattened into the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Key of the object to generate the URL for.
key: String,
}
/// Request body accepted by the object-deletion handler.
#[derive(Debug, Deserialize)]
struct DeleteObjectRequest {
/// Bucket connection settings; flattened into the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Key of the object to delete.
key: String,
}
/// Request body accepted by the folder-creation handler.
#[derive(Debug, Deserialize)]
struct CreateFolderRequest {
/// Bucket connection settings; flattened into the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Folder path; the handler trims whitespace and trailing '/' characters and
/// then appends a single '/' to form the object key.
prefix: String,
}
/// Request body accepted by the handler that presigns a `put_object` URL.
#[derive(Debug, Deserialize)]
struct PresignUploadRequest {
/// Bucket connection settings; flattened into the top level of the JSON body.
#[serde(flatten)]
creds: S3Creds,
/// Key the uploaded object will be stored under.
key: String,
/// Optional content type; set on the presigned request only when provided.
content_type: Option<String>,
}
/// One entry of a listing response, built from an object returned by S3.
#[derive(Debug, Serialize)]
struct S3Object {
/// Object key; an empty string when the listing entry carries no key.
key: String,
/// Object size as reported in the listing; 0 when the listing omits it.
size: i64,
/// Last-modified timestamp rendered with `to_string()`; empty when absent.
last_modified: String,
/// True when the key ends with '/'.
is_folder: bool,
/// Entity tag, when the listing provides one.
etag: Option<String>,
/// Storage class name, when the listing provides one.
storage_class: Option<String>,
}
/// Payload returned by the object-listing handler on success.
#[derive(Debug, Serialize)]
struct ListObjectsResponse {
/// Objects returned in the `contents` of the S3 response.
objects: Vec<S3Object>,
/// Common prefixes returned by S3 for the requested delimiter.
common_prefixes: Vec<String>,
/// Truncation flag from the S3 response; false when S3 omits it.
is_truncated: bool,
/// Token to send back as `continuation_token` to fetch the next page, if any.
next_continuation_token: Option<String>,
/// Echo of the prefix sent in the request.
prefix: Option<String>,
}
/// Builds an S3 client from the caller-supplied connection settings.
///
/// An endpoint that is blank (after trimming) or contains "amazonaws.com" is
/// not applied, leaving the SDK's own endpoint resolution in place. Any other
/// endpoint is installed as an override and path-style addressing is forced.
async fn build_client(creds: &S3Creds) -> S3Client {
    // Decide once whether the caller supplied an endpoint that must be applied.
    let custom_endpoint = Some(creds.endpoint.trim())
        .filter(|url| !url.is_empty() && !url.contains("amazonaws.com"));

    let static_credentials = Credentials::new(
        creds.access_key_id.as_str(),
        creds.secret_key.as_str(),
        None,
        None,
        "athena-storage",
    );

    let mut loader = aws_config::defaults(BehaviorVersion::latest())
        .region(Region::new(creds.region.clone()))
        .credentials_provider(static_credentials);
    if let Some(url) = custom_endpoint {
        loader = loader.endpoint_url(url);
    }
    let shared_config = loader.load().await;

    let mut s3_config = aws_sdk_s3::config::Builder::from(&shared_config);
    if custom_endpoint.is_some() {
        s3_config = s3_config.force_path_style(true);
    }

    S3Client::from_conf(s3_config.build())
}
async fn list_objects(body: Json<ListObjectsRequest>) -> HttpResponse {
if body.creds.bucket.trim().is_empty() {
return bad_request("Bucket is required", "bucket field is empty");
}
let client = build_client(&body.creds).await;
let mut req = client.list_objects_v2().bucket(&body.creds.bucket);
if let Some(prefix) = &body.prefix {
req = req.prefix(prefix);
}
let delimiter = body.delimiter.as_deref().unwrap_or("/");
req = req.delimiter(delimiter);
req = req.max_keys(body.max_keys);
if let Some(token) = &body.continuation_token {
req = req.continuation_token(token);
}
match req.send().await {
Ok(output) => {
let objects: Vec<S3Object> = output
.contents()
.iter()
.map(|obj| {
let key = obj.key().unwrap_or_default().to_string();
let is_folder = key.ends_with('/');
S3Object {
key,
size: obj.size().unwrap_or(0),
last_modified: obj
.last_modified()
.map(|t| t.to_string())
.unwrap_or_default(),
is_folder,
etag: obj.e_tag().map(|s| s.to_string()),
storage_class: obj.storage_class().map(|s| s.as_str().to_string()),
}
})
.collect();
let common_prefixes: Vec<String> = output
.common_prefixes()
.iter()
.filter_map(|cp| cp.prefix().map(|s| s.to_string()))
.collect();
let resp = ListObjectsResponse {
objects,
common_prefixes,
is_truncated: output.is_truncated().unwrap_or(false),
next_continuation_token: output.next_continuation_token().map(|s| s.to_string()),
prefix: body.prefix.clone(),
};
api_success("Objects listed", json!(resp))
}
Err(err) => {
warn!(bucket = %body.creds.bucket, error = %err, "S3 list_objects_v2 failed");
internal_error("Failed to list objects", err.to_string())
}
}
}
async fn get_object_url(body: Json<GetObjectRequest>) -> HttpResponse {
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client = build_client(&body.creds).await;
let presign_config = match PresigningConfig::expires_in(Duration::from_secs(3600)) {
Ok(cfg) => cfg,
Err(err) => return internal_error("Presign config error", err.to_string()),
};
match client
.get_object()
.bucket(&body.creds.bucket)
.key(&body.key)
.presigned(presign_config)
.await
{
Ok(presigned) => api_success(
"Presigned URL generated",
json!({ "url": presigned.uri(), "expires_in": 3600 }),
),
Err(err) => {
warn!(key = %body.key, error = %err, "S3 presign get_object failed");
internal_error("Failed to generate presigned URL", err.to_string())
}
}
}
async fn delete_object(body: Json<DeleteObjectRequest>) -> HttpResponse {
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client = build_client(&body.creds).await;
match client
.delete_object()
.bucket(&body.creds.bucket)
.key(&body.key)
.send()
.await
{
Ok(_) => api_success(
"Object deleted",
json!({ "key": body.key, "bucket": body.creds.bucket }),
),
Err(err) => {
warn!(key = %body.key, error = %err, "S3 delete_object failed");
internal_error("Failed to delete object", err.to_string())
}
}
}
async fn create_folder(body: Json<CreateFolderRequest>) -> HttpResponse {
let prefix = body.prefix.trim().trim_end_matches('/');
if prefix.is_empty() {
return bad_request("Prefix is required", "prefix field is empty");
}
let folder_key = format!("{}/", prefix);
let client = build_client(&body.creds).await;
match client
.put_object()
.bucket(&body.creds.bucket)
.key(&folder_key)
.body(aws_sdk_s3::primitives::ByteStream::from_static(b""))
.send()
.await
{
Ok(_) => api_success(
"Folder created",
json!({ "key": folder_key, "bucket": body.creds.bucket }),
),
Err(err) => {
warn!(key = %folder_key, error = %err, "S3 put_object (create folder) failed");
internal_error("Failed to create folder", err.to_string())
}
}
}
async fn presign_upload(body: Json<PresignUploadRequest>) -> HttpResponse {
if body.key.trim().is_empty() {
return bad_request("Key is required", "key field is empty");
}
let client = build_client(&body.creds).await;
let presign_config = match PresigningConfig::expires_in(Duration::from_secs(3600)) {
Ok(cfg) => cfg,
Err(err) => return internal_error("Presign config error", err.to_string()),
};
let mut req = client
.put_object()
.bucket(&body.creds.bucket)
.key(&body.key);
if let Some(ct) = &body.content_type {
req = req.content_type(ct);
}
match req.presigned(presign_config).await {
Ok(presigned) => api_success(
"Upload URL generated",
json!({
"url": presigned.uri(),
"method": "PUT",
"key": body.key,
"expires_in": 3600
}),
),
Err(err) => {
warn!(key = %body.key, error = %err, "S3 presign put_object failed");
internal_error("Failed to generate upload URL", err.to_string())
}
}
}
/// Registers the storage endpoints on the given service configuration.
///
/// Every route is a POST whose handler reads its input from a JSON body.
pub fn services(cfg: &mut web::ServiceConfig) {
    // Listing.
    cfg.route("/storage/objects", web::post().to(list_objects));
    // Presigned download URL.
    cfg.route("/storage/objects/url", web::post().to(get_object_url));
    // Deletion.
    cfg.route("/storage/objects/delete", web::post().to(delete_object));
    // Folder marker creation.
    cfg.route("/storage/objects/folder", web::post().to(create_folder));
    // Presigned upload URL.
    cfg.route("/storage/objects/upload-url", web::post().to(presign_upload));
}