use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{
body::Body,
extract::{FromRequest, Path, Query, State},
http::{HeaderMap, StatusCode},
response::{Html, IntoResponse, Response},
routing::{delete, get, head, options, post, put},
Router,
};
use http_body_util::BodyExt;
use serde::Deserialize;
use std::sync::Arc;
use utoipa::OpenApi;
use crate::AppState;
use super::utils::{error_response, malformed_xml_response};
use super::{
bucket_stubs, cors, graphql, handlers, multipart, observability_handlers, openapi,
preprocessing_handlers, query_intelligence_handlers, replication_handlers,
select_cache_handlers, tiering_handlers, training_handlers, websocket,
};
/// Query-string parameters read by the object-level dispatchers
/// (`put_object_dispatcher`, `post_object_dispatcher`,
/// `get_object_dispatcher`, `delete_object_dispatcher`).
///
/// Most fields are sub-resource flags: the dispatchers only test whether the
/// parameter is present (`is_some()`) and never look at its value.
#[derive(Debug, Deserialize, Default)]
pub struct ObjectQueryParams {
/// `uploadId`: multipart upload id. Together with `partNumber` on PUT it
/// selects upload-part (or upload-part-copy); on its own it selects
/// complete (POST), list-parts (GET) or abort (DELETE).
#[serde(rename = "uploadId")]
pub upload_id: Option<String>,
/// `partNumber`: part index, only consulted on PUT together with `uploadId`.
#[serde(rename = "partNumber")]
pub part_number: Option<u32>,
/// Flag: on POST, selects `multipart::create_multipart_upload`.
pub uploads: Option<String>,
/// Flag: selects the object tagging handlers (PUT / GET / DELETE).
pub tagging: Option<String>,
/// Flag: selects the object ACL handlers (PUT / GET).
pub acl: Option<String>,
/// Flag: on GET, selects `handlers::get_object_attributes`.
pub attributes: Option<String>,
/// Flag: on POST, selects `handlers::restore_object`.
pub restore: Option<String>,
/// Flag (`legal-hold`): selects the legal-hold handlers (PUT / GET).
#[serde(rename = "legal-hold")]
pub legal_hold: Option<String>,
/// Flag: selects the retention handlers (PUT / GET).
pub retention: Option<String>,
/// Flag: on POST, selects `handlers::select_object_content`.
pub select: Option<String>,
/// Flag (`select-type`): treated the same as `select` on POST.
#[serde(rename = "select-type")]
pub select_type: Option<String>,
/// Flag: on GET, selects `bucket_stubs::get_object_torrent`.
pub torrent: Option<String>,
/// `max-parts`: forwarded to `multipart::list_parts`.
#[serde(rename = "max-parts")]
pub max_parts: Option<u32>,
/// `part-number-marker`: forwarded to `multipart::list_parts`.
#[serde(rename = "part-number-marker")]
pub part_number_marker: Option<u32>,
/// `versionId`: forwarded to the legal-hold / retention handlers and used
/// for the object-lock check on DELETE (where a missing value becomes `"null"`).
#[serde(rename = "versionId")]
pub version_id: Option<String>,
}
/// Query-string parameters read by `post_bucket_dispatcher`.
#[derive(Debug, Deserialize, Default)]
pub struct BucketPostQueryParams {
/// Flag: when present (value ignored) the POST is routed to
/// `handlers::delete_objects`.
pub delete: Option<String>,
}
/// Query-string parameters read by `put_bucket_dispatcher`.
///
/// Every field except `id` is a sub-resource flag: the dispatcher only tests
/// for presence and routes to the matching `put_*` handler. When no flag is
/// present the PUT is handled as bucket creation.
#[derive(Debug, Deserialize, Default)]
pub struct BucketPutQueryParams {
pub versioning: Option<String>,
pub acl: Option<String>,
pub tagging: Option<String>,
pub policy: Option<String>,
pub encryption: Option<String>,
pub lifecycle: Option<String>,
pub cors: Option<String>,
pub notification: Option<String>,
pub logging: Option<String>,
#[serde(rename = "requestPayment")]
pub request_payment: Option<String>,
pub website: Option<String>,
pub replication: Option<String>,
pub accelerate: Option<String>,
#[serde(rename = "ownershipControls")]
pub ownership_controls: Option<String>,
#[serde(rename = "publicAccessBlock")]
pub public_access_block: Option<String>,
#[serde(rename = "intelligent-tiering")]
pub intelligent_tiering: Option<String>,
#[serde(rename = "object-lock")]
pub object_lock: Option<String>,
pub metrics: Option<String>,
pub analytics: Option<String>,
pub inventory: Option<String>,
/// Configuration id forwarded to the metrics / analytics / inventory PUT
/// handlers; an absent id is passed on as an empty string.
pub id: Option<String>,
}
/// Query-string parameters read by `delete_bucket_dispatcher`.
///
/// Every field except `id` is a sub-resource flag tested only for presence.
/// When no flag is present the DELETE removes the bucket itself.
#[derive(Debug, Deserialize, Default)]
pub struct BucketDeleteQueryParams {
pub tagging: Option<String>,
pub policy: Option<String>,
pub encryption: Option<String>,
pub lifecycle: Option<String>,
pub cors: Option<String>,
pub website: Option<String>,
pub replication: Option<String>,
#[serde(rename = "ownershipControls")]
pub ownership_controls: Option<String>,
#[serde(rename = "publicAccessBlock")]
pub public_access_block: Option<String>,
#[serde(rename = "intelligent-tiering")]
pub intelligent_tiering: Option<String>,
/// Configuration id forwarded to the intelligent-tiering / metrics /
/// analytics / inventory DELETE handlers; an absent id is passed on as an
/// empty string.
pub id: Option<String>,
pub metrics: Option<String>,
pub analytics: Option<String>,
pub inventory: Option<String>,
}
/// Query-string parameters read by `get_bucket_dispatcher`.
///
/// The struct mixes two kinds of parameters: sub-resource flags, which the
/// dispatcher tests only for presence, and listing parameters, which are
/// forwarded to whichever listing handler ends up being selected.
#[derive(Debug, Deserialize, Default)]
pub struct BucketGetQueryParams {
// --- Sub-resource flags (presence only) ---
/// Flag: selects `handlers::list_multipart_uploads`.
pub uploads: Option<String>,
pub location: Option<String>,
pub versioning: Option<String>,
pub acl: Option<String>,
pub tagging: Option<String>,
pub policy: Option<String>,
/// Flag: selects `handlers::list_object_versions`.
pub versions: Option<String>,
pub encryption: Option<String>,
pub lifecycle: Option<String>,
pub cors: Option<String>,
pub notification: Option<String>,
pub logging: Option<String>,
#[serde(rename = "requestPayment")]
pub request_payment: Option<String>,
pub website: Option<String>,
pub replication: Option<String>,
pub accelerate: Option<String>,
#[serde(rename = "ownershipControls")]
pub ownership_controls: Option<String>,
#[serde(rename = "publicAccessBlock")]
pub public_access_block: Option<String>,
#[serde(rename = "intelligent-tiering")]
pub intelligent_tiering: Option<String>,
#[serde(rename = "object-lock")]
pub object_lock: Option<String>,
pub metrics: Option<String>,
pub analytics: Option<String>,
pub inventory: Option<String>,
/// Configuration id. For metrics / analytics / inventory its presence picks
/// the single-configuration handler over the list handler; for
/// intelligent-tiering an absent id is forwarded as an empty string.
pub id: Option<String>,
// --- Listing parameters (forwarded to the selected listing handler) ---
/// Forwarded to the versions, multipart-uploads, V1 and V2 listings.
pub prefix: Option<String>,
/// Forwarded to the versions, multipart-uploads, V1 and V2 listings.
pub delimiter: Option<String>,
/// `max-keys`: forwarded to the versions, V1 and V2 listings.
#[serde(rename = "max-keys")]
pub max_keys: Option<usize>,
/// `encoding-type`: forwarded to the V1 and V2 listings.
#[serde(rename = "encoding-type")]
pub encoding_type: Option<String>,
/// Forwarded to the V1 listing only.
pub marker: Option<String>,
/// `continuation-token`: forwarded to the V2 listing only.
#[serde(rename = "continuation-token")]
pub continuation_token: Option<String>,
/// `start-after`: forwarded to the V2 listing only.
#[serde(rename = "start-after")]
pub start_after: Option<String>,
/// `list-type`: the exact value `"2"` selects the V2 listing; any other
/// value (or none) falls through to the V1 listing.
#[serde(rename = "list-type")]
pub list_type: Option<String>,
/// `max-uploads`: forwarded to the multipart-uploads listing.
#[serde(rename = "max-uploads")]
pub max_uploads: Option<u32>,
/// `key-marker`: forwarded to the versions and multipart-uploads listings.
#[serde(rename = "key-marker")]
pub key_marker: Option<String>,
/// `version-id-marker`: forwarded to the versions listing.
#[serde(rename = "version-id-marker")]
pub version_id_marker: Option<String>,
/// `upload-id-marker`: forwarded to the multipart-uploads listing.
#[serde(rename = "upload-id-marker")]
pub upload_id_marker: Option<String>,
}
/// Dispatches a bucket-level GET to the handler selected by the query string.
///
/// Sub-resource flags are checked in a fixed order and the first one present
/// decides the handler; only presence matters, the flag's value is ignored.
/// When no sub-resource flag is present the request is an object listing:
/// `list-type=2` selects the V2 listing, anything else the V1 listing.
async fn get_bucket_dispatcher(
    State(state): State<AppState>,
    Path(bucket): Path<String>,
    Query(query): Query<BucketGetQueryParams>,
) -> Response {
    // Re-wrap once so each branch can hand the extractors straight through.
    let (state, bucket) = (State(state), Path(bucket));

    if query.location.is_some() {
        handlers::get_bucket_location(state, bucket).await.into_response()
    } else if query.versioning.is_some() {
        handlers::get_bucket_versioning(state, bucket).await.into_response()
    } else if query.acl.is_some() {
        handlers::get_bucket_acl(state, bucket).await.into_response()
    } else if query.tagging.is_some() {
        handlers::get_bucket_tagging(state, bucket).await.into_response()
    } else if query.policy.is_some() {
        handlers::get_bucket_policy(state, bucket).await.into_response()
    } else if query.versions.is_some() {
        let listing = handlers::ListObjectVersionsQuery {
            prefix: query.prefix,
            delimiter: query.delimiter,
            max_keys: query.max_keys,
            key_marker: query.key_marker,
            version_id_marker: query.version_id_marker,
        };
        handlers::list_object_versions(state, bucket, Query(listing))
            .await
            .into_response()
    } else if query.encryption.is_some() {
        bucket_stubs::get_bucket_encryption(state, bucket).await.into_response()
    } else if query.lifecycle.is_some() {
        bucket_stubs::get_bucket_lifecycle(state, bucket).await.into_response()
    } else if query.cors.is_some() {
        bucket_stubs::get_bucket_cors(state, bucket).await.into_response()
    } else if query.notification.is_some() {
        bucket_stubs::get_bucket_notification(state, bucket).await.into_response()
    } else if query.logging.is_some() {
        bucket_stubs::get_bucket_logging(state, bucket).await.into_response()
    } else if query.request_payment.is_some() {
        bucket_stubs::get_bucket_request_payment(state, bucket).await.into_response()
    } else if query.website.is_some() {
        bucket_stubs::get_bucket_website(state, bucket).await.into_response()
    } else if query.replication.is_some() {
        bucket_stubs::get_bucket_replication(state, bucket).await.into_response()
    } else if query.accelerate.is_some() {
        bucket_stubs::get_bucket_accelerate(state, bucket).await.into_response()
    } else if query.ownership_controls.is_some() {
        bucket_stubs::get_bucket_ownership_controls(state, bucket).await.into_response()
    } else if query.public_access_block.is_some() {
        bucket_stubs::get_public_access_block(state, bucket).await.into_response()
    } else if query.intelligent_tiering.is_some() {
        // A missing `id` is forwarded as an empty string.
        let id = query.id.unwrap_or_default();
        bucket_stubs::get_bucket_intelligent_tiering(state, bucket, id)
            .await
            .into_response()
    } else if query.object_lock.is_some() {
        bucket_stubs::get_object_lock_configuration(state, bucket).await.into_response()
    } else if query.metrics.is_some() {
        // With an `id` fetch that one configuration, otherwise list them all.
        match query.id {
            Some(id) => bucket_stubs::get_bucket_metrics_configuration(state, bucket, id)
                .await
                .into_response(),
            None => bucket_stubs::list_bucket_metrics_configurations(state, bucket)
                .await
                .into_response(),
        }
    } else if query.analytics.is_some() {
        match query.id {
            Some(id) => bucket_stubs::get_bucket_analytics_configuration(state, bucket, id)
                .await
                .into_response(),
            None => bucket_stubs::list_bucket_analytics_configurations(state, bucket)
                .await
                .into_response(),
        }
    } else if query.inventory.is_some() {
        match query.id {
            Some(id) => bucket_stubs::get_bucket_inventory_configuration(state, bucket, id)
                .await
                .into_response(),
            None => bucket_stubs::list_bucket_inventory_configurations(state, bucket)
                .await
                .into_response(),
        }
    } else if query.uploads.is_some() {
        let listing = handlers::ListMultipartUploadsQuery {
            prefix: query.prefix,
            delimiter: query.delimiter,
            max_uploads: query.max_uploads,
            key_marker: query.key_marker,
            upload_id_marker: query.upload_id_marker,
        };
        handlers::list_multipart_uploads(state, bucket, Query(listing))
            .await
            .into_response()
    } else if query.list_type.as_deref() == Some("2") {
        let listing = handlers::ListObjectsV2Query {
            prefix: query.prefix,
            delimiter: query.delimiter,
            max_keys: query.max_keys,
            continuation_token: query.continuation_token,
            start_after: query.start_after,
            encoding_type: query.encoding_type,
            list_type: query.list_type,
        };
        handlers::list_objects_v2(state, bucket, Query(listing))
            .await
            .into_response()
    } else {
        let listing = handlers::ListObjectsV1Query {
            prefix: query.prefix,
            delimiter: query.delimiter,
            max_keys: query.max_keys,
            marker: query.marker,
            encoding_type: query.encoding_type,
        };
        handlers::list_objects_v1(state, bucket, Query(listing))
            .await
            .into_response()
    }
}
/// Dispatches an object-level PUT to the handler selected by the query string
/// and headers.
///
/// Checked in order: `tagging`, `acl`, `legal-hold`, `retention`, then
/// `uploadId` + `partNumber` (upload-part, or upload-part-copy when
/// `x-amz-copy-source` is set), then a plain copy (`x-amz-copy-source`), and
/// finally a regular object upload. Branches that need the payload buffer the
/// whole body first; each keeps its own error shape for a failed read.
async fn put_object_dispatcher(
    State(state): State<AppState>,
    Path((bucket, key)): Path<(String, String)>,
    Query(query): Query<ObjectQueryParams>,
    headers: HeaderMap,
    body: Body,
) -> Response {
    // Only a case-insensitive "true" in the header enables the governance bypass.
    let governance_bypass = |request_headers: &HeaderMap| {
        request_headers
            .get("x-amz-bypass-governance-retention")
            .and_then(|value| value.to_str().ok())
            .is_some_and(|value| value.eq_ignore_ascii_case("true"))
    };

    if query.tagging.is_some() {
        return match body.collect().await {
            Ok(collected) => {
                handlers::put_object_tagging(State(state), Path((bucket, key)), collected.to_bytes())
                    .await
                    .into_response()
            }
            Err(err) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to read body: {}", err),
            )
                .into_response(),
        };
    }

    if query.acl.is_some() {
        return match body.collect().await {
            Ok(collected) => handlers::put_object_acl(
                State(state),
                Path((bucket, key)),
                headers,
                collected.to_bytes(),
            )
            .await
            .into_response(),
            Err(err) => error_response(
                StatusCode::BAD_REQUEST,
                "IncompleteBody",
                &format!("Failed to read request body: {}", err),
                &format!("/{}/{}", bucket, key),
            )
            .into_response(),
        };
    }

    if query.legal_hold.is_some() {
        let Ok(collected) = body.collect().await else {
            return malformed_xml_response("Failed to read request body").into_response();
        };
        let bypass = governance_bypass(&headers);
        return bucket_stubs::put_object_legal_hold(
            State(state),
            Path((bucket, key)),
            query.version_id,
            bypass,
            collected.to_bytes(),
        )
        .await
        .into_response();
    }

    if query.retention.is_some() {
        let Ok(collected) = body.collect().await else {
            return malformed_xml_response("Failed to read request body").into_response();
        };
        let bypass = governance_bypass(&headers);
        return bucket_stubs::put_object_retention(
            State(state),
            Path((bucket, key)),
            query.version_id,
            bypass,
            collected.to_bytes(),
        )
        .await
        .into_response();
    }

    // `query` is not consulted past this point, so its fields can be moved out.
    if let (Some(upload_id), Some(part_number)) = (query.upload_id, query.part_number) {
        let part_query = Query(multipart::MultipartQuery {
            upload_id: Some(upload_id),
            part_number: Some(part_number),
            uploads: None,
            max_parts: None,
            part_number_marker: None,
        });

        // A copy source means the part data comes from another object, not the body.
        if headers.contains_key("x-amz-copy-source") {
            return multipart::upload_part_copy(
                State(state),
                Path((bucket, key)),
                part_query,
                headers,
            )
            .await
            .into_response();
        }

        return match body.collect().await {
            Ok(collected) => multipart::upload_part(
                State(state),
                Path((bucket, key)),
                part_query,
                collected.to_bytes(),
            )
            .await
            .into_response(),
            Err(err) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to read body: {}", err),
            )
                .into_response(),
        };
    }

    if headers.contains_key("x-amz-copy-source") {
        handlers::copy_object(State(state), Path((bucket, key)), headers)
            .await
            .into_response()
    } else {
        // The plain upload receives the unbuffered body.
        handlers::put_object(State(state), Path((bucket, key)), headers, body)
            .await
            .into_response()
    }
}
async fn post_object_dispatcher(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQueryParams>,
headers: HeaderMap,
body: Body,
) -> Response {
if query.restore.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
tracing::error!("Failed to collect RestoreObject body: {}", e);
return (StatusCode::BAD_REQUEST, "Failed to read request body").into_response();
}
};
return handlers::restore_object(State(state), Path((bucket, key)), body_bytes)
.await
.into_response();
}
if query.select.is_some() || query.select_type.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
tracing::error!("Failed to collect request body: {}", e);
return (StatusCode::BAD_REQUEST, "Failed to read request body").into_response();
}
};
return handlers::select_object_content(State(state), Path((bucket, key)), body_bytes)
.await
.into_response();
}
if query.uploads.is_some() {
return multipart::create_multipart_upload(State(state), Path((bucket, key)), headers)
.await
.into_response();
}
if let Some(upload_id) = query.upload_id {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return multipart::complete_multipart_upload(
State(state),
Path((bucket, key)),
Query(multipart::MultipartQuery {
upload_id: Some(upload_id),
part_number: None,
uploads: None,
max_parts: None,
part_number_marker: None,
}),
body_bytes,
)
.await
.into_response();
}
(StatusCode::BAD_REQUEST, "Invalid POST operation").into_response()
}
async fn get_object_dispatcher(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQueryParams>,
headers: HeaderMap,
) -> Response {
if query.tagging.is_some() {
return handlers::get_object_tagging(State(state), Path((bucket, key)))
.await
.into_response();
}
if query.acl.is_some() {
return handlers::get_object_acl(State(state), Path((bucket, key)))
.await
.into_response();
}
if query.attributes.is_some() {
return handlers::get_object_attributes(State(state), Path((bucket, key)), headers)
.await
.into_response();
}
if query.legal_hold.is_some() {
return bucket_stubs::get_object_legal_hold(
State(state),
Path((bucket, key)),
query.version_id,
)
.await
.into_response();
}
if query.retention.is_some() {
return bucket_stubs::get_object_retention(
State(state),
Path((bucket, key)),
query.version_id,
)
.await
.into_response();
}
if query.torrent.is_some() {
return bucket_stubs::get_object_torrent(State(state), Path((bucket, key)))
.await
.into_response();
}
if let Some(upload_id) = query.upload_id {
return multipart::list_parts(
State(state),
Path((bucket, key)),
Query(multipart::MultipartQuery {
upload_id: Some(upload_id),
part_number: None,
uploads: None,
max_parts: query.max_parts,
part_number_marker: query.part_number_marker,
}),
)
.await
.into_response();
}
handlers::get_object(State(state), Path((bucket, key)), headers)
.await
.into_response()
}
async fn delete_object_dispatcher(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQueryParams>,
headers: HeaderMap,
) -> Response {
if query.tagging.is_some() {
return handlers::delete_object_tagging(State(state), Path((bucket, key)))
.await
.into_response();
}
if let Some(upload_id) = query.upload_id {
return multipart::abort_multipart_upload(
State(state),
Path((bucket, key)),
Query(multipart::MultipartQuery {
upload_id: Some(upload_id),
part_number: None,
uploads: None,
max_parts: None,
part_number_marker: None,
}),
)
.await
.into_response();
}
let version_str = query.version_id.as_deref().unwrap_or("null");
let bypass = headers
.get("x-amz-bypass-governance-retention")
.and_then(|v| v.to_str().ok())
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
match state
.storage
.object_lock_manager
.is_protected(&bucket, &key, version_str)
.await
{
Ok(true) => {
let active_mode = state
.storage
.object_lock_manager
.retention_mode(&bucket, &key, version_str)
.await
.ok()
.flatten();
let resource = format!("/{}/{}", bucket, key);
if let Some(mode) = active_mode {
if mode == "COMPLIANCE" {
return super::utils::error_response(
StatusCode::FORBIDDEN,
"AccessDenied",
"Object is protected by a COMPLIANCE retention policy and cannot be deleted.",
&resource,
);
}
if bypass {
return handlers::delete_object(State(state), Path((bucket, key)))
.await
.into_response();
}
return super::utils::error_response(
StatusCode::FORBIDDEN,
"AccessDenied",
"Object is under GOVERNANCE retention; supply x-amz-bypass-governance-retention: true to delete.",
&resource,
);
}
return super::utils::error_response(
StatusCode::FORBIDDEN,
"AccessDenied",
"Object is protected by a Legal Hold and cannot be deleted.",
&resource,
);
}
Ok(false) => {}
Err(_) => {} }
handlers::delete_object(State(state), Path((bucket, key)))
.await
.into_response()
}
/// Dispatches a bucket-level POST.
///
/// * `delete` present: the buffered body is passed to `handlers::delete_objects`.
/// * `Content-Type` starting with `multipart/form-data`: the request is
///   re-assembled and parsed as a multipart form for `handlers::post_object`.
/// * anything else: `400 Invalid POST operation on bucket`.
async fn post_bucket_dispatcher(
    State(state): State<AppState>,
    Path(bucket): Path<String>,
    Query(query): Query<BucketPostQueryParams>,
    headers: HeaderMap,
    body: Body,
) -> Response {
    if query.delete.is_some() {
        return match body.collect().await {
            Ok(collected) => {
                handlers::delete_objects(State(state), Path(bucket), headers, collected.to_bytes())
                    .await
                    .into_response()
            }
            Err(err) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to read body: {}", err),
            )
                .into_response(),
        };
    }

    // A missing, non-text or non-form content type all end in the same rejection.
    let form_content_type = headers
        .get("content-type")
        .and_then(|value| value.to_str().ok())
        .filter(|ct| ct.starts_with("multipart/form-data"));
    let Some(ct) = form_content_type else {
        return (StatusCode::BAD_REQUEST, "Invalid POST operation on bucket").into_response();
    };

    // The multipart extractor needs a full request, so rebuild one around the
    // body carrying only the content-type header.
    let rebuilt = axum::http::Request::builder()
        .header("content-type", ct)
        .body(body);
    let req = match rebuilt {
        Ok(request) => request,
        Err(err) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to build request: {}", err),
            )
                .into_response();
        }
    };

    match axum::extract::Multipart::from_request(req, &state).await {
        Ok(form) => handlers::post_object(State(state), Path(bucket), form)
            .await
            .into_response(),
        Err(err) => (
            StatusCode::BAD_REQUEST,
            format!("Failed to parse multipart: {}", err),
        )
            .into_response(),
    }
}
async fn put_bucket_dispatcher(
State(state): State<AppState>,
Path(bucket): Path<String>,
Query(query): Query<BucketPutQueryParams>,
headers: HeaderMap,
body: Body,
) -> Response {
if query.versioning.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return handlers::put_bucket_versioning(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.acl.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return error_response(
StatusCode::BAD_REQUEST,
"IncompleteBody",
&format!("Failed to read request body: {}", e),
&format!("/{}", bucket),
)
.into_response();
}
};
return handlers::put_bucket_acl(State(state), Path(bucket), headers, body_bytes)
.await
.into_response();
}
if query.tagging.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return handlers::put_bucket_tagging(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.policy.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return handlers::put_bucket_policy(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.encryption.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_bucket_encryption(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.lifecycle.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_bucket_lifecycle(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.cors.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_bucket_cors(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.notification.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
return bucket_stubs::put_bucket_notification(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.logging.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_bucket_logging(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.request_payment.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_bucket_request_payment(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.website.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_bucket_website(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.replication.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
return bucket_stubs::put_bucket_replication(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.accelerate.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
return bucket_stubs::put_bucket_accelerate(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.ownership_controls.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_bucket_ownership_controls(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.public_access_block.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
return bucket_stubs::put_public_access_block(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.intelligent_tiering.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
return bucket_stubs::put_bucket_intelligent_tiering(
State(state),
Path(bucket),
body_bytes,
)
.await
.into_response();
}
if query.object_lock.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
return bucket_stubs::put_object_lock_configuration(State(state), Path(bucket), body_bytes)
.await
.into_response();
}
if query.metrics.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
let id = query.id.unwrap_or_default();
return bucket_stubs::put_bucket_metrics_configuration(
State(state),
Path(bucket),
id,
body_bytes,
)
.await
.into_response();
}
if query.analytics.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
let id = query.id.unwrap_or_default();
return bucket_stubs::put_bucket_analytics_configuration(
State(state),
Path(bucket),
id,
body_bytes,
)
.await
.into_response();
}
if query.inventory.is_some() {
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(_) => return malformed_xml_response("Failed to read request body").into_response(),
};
let id = query.id.unwrap_or_default();
return bucket_stubs::put_bucket_inventory_configuration(
State(state),
Path(bucket),
id,
body_bytes,
)
.await
.into_response();
}
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to read body: {}", e),
)
.into_response();
}
};
if !body_bytes.is_empty() {
let body_str = String::from_utf8_lossy(&body_bytes);
let trimmed = body_str.trim();
if !trimmed.starts_with('<') {
return (
StatusCode::BAD_REQUEST,
[("content-type", "application/xml")],
r#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>MalformedXML</Code><Message>The XML you provided was not well-formed</Message></Error>"#,
)
.into_response();
}
let mut depth: i32 = 0;
let chars: Vec<char> = trimmed.chars().collect();
let mut i = 0;
let mut malformed = false;
while i < chars.len() {
if chars[i] == '<' {
if i + 1 >= chars.len() {
malformed = true;
break;
}
if let Some(gt) = chars[i..].iter().position(|&c| c == '>') {
let tag: String = chars[i..i + gt + 1].iter().collect();
if tag.starts_with("</") {
depth -= 1;
} else if !tag.ends_with("/>")
&& !tag.starts_with("<?")
&& !tag.starts_with("<!")
{
depth += 1;
}
i += gt + 1;
} else {
malformed = true;
break;
}
} else {
i += 1;
}
}
if malformed || depth != 0 {
return (
StatusCode::BAD_REQUEST,
[("content-type", "application/xml")],
r#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>MalformedXML</Code><Message>The XML you provided was not well-formed</Message></Error>"#,
)
.into_response();
}
}
let object_lock_enabled = headers
.get("x-amz-bucket-object-lock-enabled")
.and_then(|v| v.to_str().ok())
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
handlers::create_bucket(State(state), Path(bucket), object_lock_enabled)
.await
.into_response()
}
async fn delete_bucket_dispatcher(
State(state): State<AppState>,
Path(bucket): Path<String>,
Query(query): Query<BucketDeleteQueryParams>,
) -> Response {
if query.tagging.is_some() {
return handlers::delete_bucket_tagging(State(state), Path(bucket))
.await
.into_response();
}
if query.policy.is_some() {
return handlers::delete_bucket_policy(State(state), Path(bucket))
.await
.into_response();
}
if query.encryption.is_some() {
return bucket_stubs::delete_bucket_encryption(State(state), Path(bucket))
.await
.into_response();
}
if query.lifecycle.is_some() {
return bucket_stubs::delete_bucket_lifecycle(State(state), Path(bucket))
.await
.into_response();
}
if query.cors.is_some() {
return bucket_stubs::delete_bucket_cors(State(state), Path(bucket))
.await
.into_response();
}
if query.website.is_some() {
return bucket_stubs::delete_bucket_website(State(state), Path(bucket))
.await
.into_response();
}
if query.replication.is_some() {
return bucket_stubs::delete_bucket_replication(State(state), Path(bucket))
.await
.into_response();
}
if query.ownership_controls.is_some() {
return bucket_stubs::delete_bucket_ownership_controls(State(state), Path(bucket))
.await
.into_response();
}
if query.public_access_block.is_some() {
return bucket_stubs::delete_public_access_block(State(state), Path(bucket))
.await
.into_response();
}
if query.intelligent_tiering.is_some() {
return bucket_stubs::delete_bucket_intelligent_tiering(
State(state),
Path(bucket),
query.id.unwrap_or_default(),
)
.await
.into_response();
}
if query.metrics.is_some() {
let id = query.id.clone().unwrap_or_default();
return bucket_stubs::delete_bucket_metrics_configuration(State(state), Path(bucket), id)
.await
.into_response();
}
if query.analytics.is_some() {
let id = query.id.clone().unwrap_or_default();
return bucket_stubs::delete_bucket_analytics_configuration(State(state), Path(bucket), id)
.await
.into_response();
}
if query.inventory.is_some() {
let id = query.id.clone().unwrap_or_default();
return bucket_stubs::delete_bucket_inventory_configuration(State(state), Path(bucket), id)
.await
.into_response();
}
handlers::delete_bucket(State(state), Path(bucket))
.await
.into_response()
}
/// Adapter that forwards the application state and request headers to
/// `bucket_stubs::write_get_object_response` and converts its result into a
/// plain [`Response`].
async fn write_get_object_response_handler(
    State(state): State<AppState>,
    headers: HeaderMap,
) -> Response {
    let outcome = bucket_stubs::write_get_object_response(State(state), headers).await;
    outcome.into_response()
}
/// Executes a GraphQL request.
///
/// A schema is built for each call from clones of the storage handle and the
/// event broadcaster held in the application state, then used to run the
/// request.
async fn graphql_handler(State(state): State<AppState>, req: GraphQLRequest) -> GraphQLResponse {
    let storage = state.storage.clone();
    let broadcaster = Arc::new(state.event_broadcaster.clone());
    let schema = graphql::build_schema(storage, broadcaster);

    let result = schema.execute(req.into_inner()).await;
    result.into()
}
/// Axum handler that serves the GraphQL Playground page.
///
/// Returns a fixed HTML document wrapped in [`Html`]. The page pulls the
/// Playground stylesheet, favicon and `middleware.js` bundle from
/// `unpkg.com`, so the browser needs access to that host for the UI to load.
/// Once loaded, the page initialises the Playground against the `/graphql`
/// endpoint with a dark editor theme and a single tab pre-filled with example
/// queries.
///
/// The markup is a raw string literal and is sent to the client verbatim;
/// anything placed inside it (including text that looks like a comment)
/// becomes part of the response body.
async fn graphql_playground_handler() -> impl IntoResponse {
Html(
r#"
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>rs3gw GraphQL Playground</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://unpkg.com/graphql-playground-react/build/static/css/index.css">
<link rel="shortcut icon" href="https://unpkg.com/graphql-playground-react/build/favicon.png">
<script src="https://unpkg.com/graphql-playground-react/build/static/js/middleware.js"></script>
</head>
<body>
<div id="root"></div>
<script>
window.addEventListener('load', function (event) {
GraphQLPlayground.init(document.getElementById('root'), {
endpoint: '/graphql',
settings: {
'editor.theme': 'dark',
'editor.cursorShape': 'line'
},
tabs: [
{
endpoint: '/graphql',
query: `# Welcome to rs3gw GraphQL API!
#
# Example queries:
# List all buckets
query ListBuckets {
buckets {
name
createdAt
region
}
}
# Get bucket details
query GetBucket {
bucket(name: "my-bucket") {
name
objectCount
totalSize
tags {
key
value
}
}
}
# List objects in a bucket
query ListObjects {
objects(bucket: "my-bucket", limit: 10) {
key
size
lastModified
contentType
}
}
# Search objects by pattern
query SearchObjects {
searchObjects(pattern: "data", limit: 10) {
key
bucket
size
}
}
# Get storage statistics
query GetStats {
stats {
bucketCount
totalObjects
totalSizeBytes
}
}
`
}
]
})
})
</script>
</body>
</html>
"#,
)
}
/// Assembles the full route table for the gateway.
///
/// The table is built in stages: service/admin endpoints, the Swagger UI and
/// GraphQL/WebSocket endpoints, the `/api/...` management APIs (replication,
/// observability, preprocessing, tiering, select cache, query intelligence,
/// training), and finally the S3-style `/{bucket}` and `/{bucket}/{*key}`
/// routes together with their CORS preflight (`OPTIONS`) handlers.
///
/// Bucket-level routes are registered both with and without a trailing slash.
///
/// NOTE(review): fixed paths such as `/health` or `/graphql` share the path
/// space with `/{bucket}`; this presumably relies on the router preferring
/// literal segments over parameters — confirm against the axum version in use.
pub fn routes() -> Router<AppState> {
    // Liveness/readiness probes, metrics scrape and admin maintenance.
    let router = Router::new()
        .route("/health", get(handlers::health_check))
        .route("/ready", get(handlers::ready_check))
        .route("/metrics", get(handlers::metrics))
        .route("/api/admin/gc/multipart", post(handlers::admin_gc_multipart));

    // Swagger UI backed by the generated OpenAPI document.
    let swagger_ui = utoipa_swagger_ui::SwaggerUi::new("/swagger-ui")
        .url("/openapi.json", openapi::ApiDoc::openapi());
    let router = router.merge(swagger_ui);

    // GraphQL (playground on GET, execution on POST) and the event stream.
    let router = router
        .route("/graphql", get(graphql_playground_handler).post(graphql_handler))
        .route("/events/stream", get(websocket::ws_handler));

    // Replication management.
    let router = router
        .route(
            "/api/replication/{bucket}/config",
            get(replication_handlers::get_replication_config)
                .put(replication_handlers::set_replication_config)
                .delete(replication_handlers::delete_replication_config),
        )
        .route("/api/replication/metrics", get(replication_handlers::get_replication_metrics))
        .route(
            "/api/replication/metrics/{destination}",
            get(replication_handlers::get_destination_metrics),
        )
        .route("/api/replication/flush", post(replication_handlers::flush_replication_batches));

    // Observability and predictions.
    let router = router
        .route("/api/observability/profiling", get(observability_handlers::get_profiling_data))
        .route(
            "/api/observability/business-metrics",
            get(observability_handlers::get_business_metrics),
        )
        .route("/api/observability/anomalies", get(observability_handlers::get_anomalies))
        .route("/api/observability/resources", get(observability_handlers::get_resource_stats))
        .route("/api/observability/health", get(observability_handlers::get_comprehensive_health))
        .route(
            "/api/observability/predictions/storage-growth",
            get(observability_handlers::get_storage_growth_prediction),
        )
        .route(
            "/api/observability/predictions/access-patterns",
            get(observability_handlers::get_access_pattern_prediction),
        )
        .route(
            "/api/observability/predictions/costs",
            get(observability_handlers::get_cost_forecast),
        )
        .route(
            "/api/observability/predictions/capacity",
            get(observability_handlers::get_capacity_recommendations),
        );

    // Preprocessing pipelines and their cache.
    let router = router
        .route("/api/preprocessing/pipelines", post(preprocessing_handlers::create_pipeline))
        .route("/api/preprocessing/pipelines", get(preprocessing_handlers::list_pipelines))
        .route("/api/preprocessing/pipelines/{id}", get(preprocessing_handlers::get_pipeline))
        .route(
            "/api/preprocessing/pipelines/{id}",
            delete(preprocessing_handlers::delete_pipeline),
        )
        .route("/api/preprocessing/apply", post(preprocessing_handlers::apply_pipeline))
        .route("/api/preprocessing/validate", post(preprocessing_handlers::validate_pipeline))
        .route("/api/preprocessing/cache/stats", get(preprocessing_handlers::get_cache_stats))
        .route("/api/preprocessing/cache/clear", post(preprocessing_handlers::clear_cache));

    // Storage tiering policies, analysis and history.
    let router = router
        .route("/api/tiering/policies/{bucket}", get(tiering_handlers::get_tiering_policy))
        .route("/api/tiering/policies/{bucket}", put(tiering_handlers::set_tiering_policy))
        .route("/api/tiering/policies/{bucket}", delete(tiering_handlers::delete_tiering_policy))
        .route("/api/tiering/analyze/{bucket}", post(tiering_handlers::analyze_tiering))
        .route(
            "/api/tiering/analyze/{bucket}/predictive",
            post(tiering_handlers::analyze_tiering_predictive),
        )
        .route(
            "/api/tiering/recommendations/{bucket}/capacity",
            get(tiering_handlers::get_capacity_recommendations),
        )
        .route(
            "/api/tiering/apply/{bucket}",
            post(tiering_handlers::apply_tiering_recommendations),
        )
        .route("/api/tiering/history", get(tiering_handlers::get_transition_history))
        .route(
            "/api/tiering/history/{bucket}",
            get(tiering_handlers::get_bucket_transition_history),
        );

    // Select result cache and query-pattern tracking.
    let router = router
        .route("/api/select/cache/stats", get(select_cache_handlers::get_cache_stats))
        .route("/api/select/cache/clear", post(select_cache_handlers::clear_cache))
        .route(
            "/api/select/cache/invalidate/{etag}",
            delete(select_cache_handlers::invalidate_object_cache),
        )
        .route("/api/select/cache/patterns", get(select_cache_handlers::get_pattern_stats))
        .route("/api/select/cache/patterns/top", get(select_cache_handlers::get_top_queries))
        .route(
            "/api/select/cache/patterns/recent",
            get(select_cache_handlers::get_recent_queries),
        )
        .route("/api/select/cache/patterns/clear", post(select_cache_handlers::clear_patterns))
        .route("/api/select/cache/save", post(select_cache_handlers::save_cache))
        .route("/api/select/cache/load", post(select_cache_handlers::load_cache));

    // Query intelligence.
    let router = router
        .route(
            "/api/query/intelligence/statistics",
            get(query_intelligence_handlers::get_statistics),
        )
        .route("/api/query/intelligence/summary", get(query_intelligence_handlers::get_summary))
        .route(
            "/api/query/intelligence/predict-cost",
            post(query_intelligence_handlers::predict_cost),
        )
        .route(
            "/api/query/intelligence/recommend-strategy",
            post(query_intelligence_handlers::recommend_strategy),
        )
        .route(
            "/api/query/intelligence/find-similar",
            post(query_intelligence_handlers::find_similar),
        )
        .route(
            "/api/query/intelligence/index-recommendations",
            get(query_intelligence_handlers::get_index_recommendations),
        )
        .route(
            "/api/query/intelligence/complexity-distribution",
            get(query_intelligence_handlers::get_complexity_distribution),
        );

    // Training experiments, checkpoints, metrics and searches.
    let router = router
        .route("/api/training/experiments", post(training_handlers::create_experiment))
        .route(
            "/api/training/experiments/{experiment_id}",
            get(training_handlers::get_experiment),
        )
        .route(
            "/api/training/experiments/{experiment_id}/status",
            put(training_handlers::update_experiment_status),
        )
        .route(
            "/api/training/experiments/{experiment_id}/checkpoints",
            post(training_handlers::save_checkpoint),
        )
        .route(
            "/api/training/experiments/{experiment_id}/checkpoints",
            get(training_handlers::list_checkpoints),
        )
        .route(
            "/api/training/checkpoints/{checkpoint_id}",
            get(training_handlers::load_checkpoint),
        )
        .route(
            "/api/training/experiments/{experiment_id}/metrics",
            post(training_handlers::log_metrics),
        )
        .route(
            "/api/training/experiments/{experiment_id}/metrics",
            get(training_handlers::get_metrics),
        )
        .route("/api/training/searches", post(training_handlers::create_search))
        .route("/api/training/searches/{search_id}", get(training_handlers::get_search))
        .route("/api/training/searches/{search_id}/trials", post(training_handlers::add_trial));

    // Presigned URL generation and the service-level bucket listing.
    let router = router
        .route("/presign/{bucket}/{*key}", get(handlers::generate_presigned_url))
        .route("/", get(handlers::list_buckets));

    // Bucket-level operations, each registered with and without a trailing slash.
    let router = router
        .route("/{bucket}", head(handlers::head_bucket))
        .route("/{bucket}/", head(handlers::head_bucket))
        .route("/{bucket}", get(get_bucket_dispatcher))
        .route("/{bucket}/", get(get_bucket_dispatcher))
        .route("/{bucket}", put(put_bucket_dispatcher))
        .route("/{bucket}/", put(put_bucket_dispatcher))
        .route("/{bucket}", delete(delete_bucket_dispatcher))
        .route("/{bucket}/", delete(delete_bucket_dispatcher))
        .route("/{bucket}", post(post_bucket_dispatcher))
        .route("/{bucket}/", post(post_bucket_dispatcher));

    // Object-level operations; `{*key}` captures the rest of the path.
    let router = router
        .route("/{bucket}/{*key}", head(handlers::head_object))
        .route("/{bucket}/{*key}", get(get_object_dispatcher))
        .route("/{bucket}/{*key}", put(put_object_dispatcher))
        .route("/{bucket}/{*key}", post(post_object_dispatcher))
        .route("/{bucket}/{*key}", delete(delete_object_dispatcher));

    // WriteGetObjectResponse endpoint, then CORS preflight handlers.
    router
        .route("/WriteGetObjectResponse", post(write_get_object_response_handler))
        .route("/{bucket}", options(cors::options_bucket_handler))
        .route("/{bucket}/", options(cors::options_bucket_handler))
        .route("/{bucket}/{*key}", options(cors::options_object_handler))
}