use actix_web::error::ErrorInternalServerError;
use actix_web::http::header;
use actix_web::{HttpRequest, HttpResponse, web, web::Data, web::Json, web::Path, web::Query};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::operation::copy_object::builders::CopyObjectFluentBuilder;
use aws_sdk_s3::operation::create_multipart_upload::builders::CreateMultipartUploadFluentBuilder;
use aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder;
use aws_sdk_s3::presigning::PresigningConfig;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{
CompletedMultipartUpload, CompletedPart, ObjectLockRetention, ObjectLockRetentionMode,
ServerSideEncryption,
};
use aws_smithy_types::date_time::Format as AwsDateTimeFormat;
use chrono::{DateTime, Utc};
use futures::stream;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sqlx::PgPool;
use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;
use tracing::warn;
use uuid::Uuid;
use super::auth::{require_storage_actor, resolve_optional_storage_actor};
use super::service::{build_s3_client_with_session_token, ensure_bucket_present};
use crate::AppState;
use crate::api::client_context::required_client_pool;
use crate::api::response::{
api_created, api_success, bad_request, forbidden, internal_error, not_found,
service_unavailable,
};
use crate::config_validation::runtime_env_settings;
use crate::utils::redis_client::{
GLOBAL_REDIS, note_redis_failure_and_start_cooldown, note_redis_success,
should_bypass_redis_temporarily,
};
/// Which storage layers may hold cached presigned file URLs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UrlCacheBackend {
    /// Neither Postgres nor Redis is used.
    Disabled,
    /// Postgres only.
    Postgres,
    /// Redis only.
    Redis,
    /// Both Postgres and Redis.
    PostgresRedis,
}

impl UrlCacheBackend {
    /// Resolves the backend from the `FILE_URL_CACHE_BACKEND` environment variable.
    ///
    /// The value is trimmed and compared case-insensitively. A missing,
    /// non-Unicode, or unrecognised value selects [`Self::PostgresRedis`].
    fn from_env() -> Self {
        let configured = std::env::var("FILE_URL_CACHE_BACKEND")
            .unwrap_or_else(|_| String::from("postgres_redis"));
        let normalized = configured.trim().to_ascii_lowercase();
        match normalized.as_str() {
            "disabled" => Self::Disabled,
            "postgres" => Self::Postgres,
            "redis" => Self::Redis,
            "postgres_redis" => Self::PostgresRedis,
            // Anything unrecognised falls back to the default backend.
            _ => Self::PostgresRedis,
        }
    }

    /// Returns `true` when Postgres is one of the selected layers.
    fn uses_postgres(self) -> bool {
        match self {
            Self::Postgres | Self::PostgresRedis => true,
            Self::Disabled | Self::Redis => false,
        }
    }

    /// Returns `true` when Redis is one of the selected layers.
    fn uses_redis(self) -> bool {
        match self {
            Self::Redis | Self::PostgresRedis => true,
            Self::Disabled | Self::Postgres => false,
        }
    }
}
/// Which cache layer is named first for reads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UrlCacheReadPrecedence {
    RedisFirst,
    PostgresFirst,
}

impl UrlCacheReadPrecedence {
    /// Resolves the precedence from `FILE_URL_CACHE_READ_PRECEDENCE`.
    ///
    /// Only the trimmed, case-insensitive value `postgres_first` selects
    /// [`Self::PostgresFirst`]; every other value (or an unset variable)
    /// yields [`Self::RedisFirst`].
    fn from_env() -> Self {
        let configured = std::env::var("FILE_URL_CACHE_READ_PRECEDENCE")
            .unwrap_or_else(|_| String::from("redis_first"));
        if configured.trim().to_ascii_lowercase() == "postgres_first" {
            Self::PostgresFirst
        } else {
            Self::RedisFirst
        }
    }
}
/// Which cache layers receive newly generated URL entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UrlCacheWriteStrategy {
    PostgresOnly,
    RedisOnly,
    PostgresAndRedis,
}

impl UrlCacheWriteStrategy {
    /// Resolves the strategy from `FILE_URL_CACHE_WRITE_STRATEGY`.
    ///
    /// The value is trimmed and lower-cased before matching. Unset or
    /// unrecognised values select [`Self::PostgresAndRedis`].
    fn from_env() -> Self {
        let configured = std::env::var("FILE_URL_CACHE_WRITE_STRATEGY")
            .unwrap_or_else(|_| String::from("postgres_and_redis"));
        let normalized = configured.trim().to_ascii_lowercase();
        if normalized == "postgres_only" {
            Self::PostgresOnly
        } else if normalized == "redis_only" {
            Self::RedisOnly
        } else {
            Self::PostgresAndRedis
        }
    }
}
/// Resolved configuration for caching presigned file URLs.
///
/// Instances are produced by `StorageUrlCacheSettings::load`, which also
/// clamps the numeric fields against each other.
#[derive(Debug, Clone)]
struct StorageUrlCacheSettings {
// Selected from `FILE_URL_CACHE_BACKEND`.
backend: UrlCacheBackend,
// Selected from `FILE_URL_CACHE_READ_PRECEDENCE`.
read_precedence: UrlCacheReadPrecedence,
// Selected from `FILE_URL_CACHE_WRITE_STRATEGY`.
write_strategy: UrlCacheWriteStrategy,
// At least 1 after `load`.
presign_expires_seconds: u64,
// Between 1 and `presign_expires_seconds` after `load`.
cache_ttl_seconds: u64,
// At most `cache_ttl_seconds - 1` after `load`.
min_remaining_seconds: u64,
// From `REDIS_FILE_URL_CACHE_PREFIX`; defaults to "athena:file-url:".
redis_prefix: String,
}
impl StorageUrlCacheSettings {
    /// Builds the settings from the runtime environment.
    ///
    /// Numeric values come from `runtime_env_settings()` and are normalised so
    /// that:
    /// - the presign lifetime is at least one second,
    /// - the cache TTL is at least one second and never exceeds the presign
    ///   lifetime,
    /// - the minimum remaining lifetime is strictly below the cache TTL
    ///   (capped at `cache_ttl_seconds - 1`).
    ///
    /// The Redis key prefix is read from `REDIS_FILE_URL_CACHE_PREFIX`
    /// (trimmed); a missing or blank value falls back to `athena:file-url:`.
    fn load() -> Self {
        let env_settings = runtime_env_settings();

        let presign_expires_seconds =
            std::cmp::max(env_settings.file_url_presign_expires_seconds, 1);
        let cache_ttl_seconds = std::cmp::min(
            std::cmp::max(env_settings.file_url_cache_ttl_seconds, 1),
            presign_expires_seconds,
        );
        let min_remaining_seconds = std::cmp::min(
            env_settings.file_url_cache_min_remaining_seconds,
            cache_ttl_seconds.saturating_sub(1),
        );

        let redis_prefix = match std::env::var("REDIS_FILE_URL_CACHE_PREFIX") {
            Ok(raw) if !raw.trim().is_empty() => raw.trim().to_string(),
            _ => "athena:file-url:".to_string(),
        };

        Self {
            backend: UrlCacheBackend::from_env(),
            read_precedence: UrlCacheReadPrecedence::from_env(),
            write_strategy: UrlCacheWriteStrategy::from_env(),
            presign_expires_seconds,
            cache_ttl_seconds,
            min_remaining_seconds,
            redis_prefix,
        }
    }
}
/// URL cache settings shared by this module, initialised lazily on first
/// access through `StorageUrlCacheSettings::load`.
static STORAGE_URL_CACHE_SETTINGS: Lazy<StorageUrlCacheSettings> =
Lazy::new(StorageUrlCacheSettings::load);
/// File description fields accepted when requesting an upload URL.
///
/// Flattened into `UploadUrlRequest`. Only `s3_id` and `storage_key` are
/// required; every other field may be omitted.
#[derive(Debug, Deserialize)]
struct UploadFileRequest {
s3_id: String,
#[serde(default)]
bucket: Option<String>,
storage_key: String,
#[serde(default)]
name: Option<String>,
#[serde(default)]
original_name: Option<String>,
#[serde(default)]
resource_id: Option<String>,
#[serde(default)]
mime_type: Option<String>,
#[serde(default)]
size_bytes: Option<i64>,
#[serde(default)]
file_id: Option<String>,
// NOTE(review): both `public` and `visibility` are accepted; how they are
// reconciled is decided outside this chunk.
#[serde(default)]
public: Option<bool>,
#[serde(default)]
visibility: Option<String>,
#[serde(default)]
metadata: Option<Value>,
}
/// JSON body of `create_upload_url`, and the element type of a batch request.
///
/// The file description and the server-side-encryption options are both
/// flattened, so their fields appear at the top level of the JSON object.
#[derive(Debug, Deserialize)]
struct UploadUrlRequest {
#[serde(flatten)]
file: UploadFileRequest,
#[serde(default)]
content_type: Option<String>,
#[serde(default, flatten)]
encryption: StorageServerSideEncryptionRequest,
}
/// JSON body of `create_batch_upload_urls`; the handler rejects an empty `files` list.
#[derive(Debug, Deserialize)]
struct BatchUploadUrlRequest {
files: Vec<UploadUrlRequest>,
}
/// Query string accepted by `get_file_url` and `proxy_file`.
#[derive(Debug, Deserialize)]
struct GetFileUrlQuery {
// Both handlers fall back to "read" when `normalize_purpose` yields nothing.
#[serde(default)]
purpose: Option<String>,
}
/// JSON body of `update_file`. Every field is optional.
///
/// `storage_key` (with the optional `bucket`) drives relocation of the stored
/// object; the remaining fields are presumably the patchable metadata consumed
/// by `patch_input_from_update` (defined outside this chunk).
#[derive(Debug, Deserialize)]
struct UpdateFileRequest {
#[serde(default)]
storage_key: Option<String>,
// Only consulted by `update_file` when `storage_key` is also present.
#[serde(default)]
bucket: Option<String>,
#[serde(default)]
name: Option<String>,
#[serde(default)]
file_name: Option<String>,
#[serde(default)]
original_name: Option<String>,
#[serde(default)]
resource_id: Option<String>,
#[serde(default)]
mime_type: Option<String>,
#[serde(default)]
content_type: Option<String>,
#[serde(default)]
size_bytes: Option<i64>,
#[serde(default)]
checksum_sha256: Option<String>,
#[serde(default)]
visibility: Option<String>,
#[serde(default)]
status: Option<String>,
#[serde(default)]
metadata: Option<Value>,
}
/// JSON body of `set_file_visibility`.
///
/// The effective visibility is derived by `resolve_requested_visibility`
/// (defined outside this chunk) from these two optional fields.
#[derive(Debug, Deserialize)]
struct SetVisibilityRequest {
#[serde(default)]
public: Option<bool>,
#[serde(default)]
visibility: Option<String>,
}
/// Request payload naming a storage (`s3_id`) and a key `prefix`.
///
/// Presumably the body of the `/folders/delete` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct FolderDeleteRequest {
s3_id: String,
prefix: String,
}
/// Request payload naming a storage (`s3_id`) plus a source and destination prefix.
///
/// Presumably the body of the `/folders/move` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct FolderMoveRequest {
s3_id: String,
from_prefix: String,
to_prefix: String,
}
/// JSON body of `list_files`.
#[derive(Debug, Deserialize)]
struct ListFilesRequest {
// Parsed as a UUID by the handler.
s3_id: String,
// Defaults to an empty string; a blank value means "no prefix filter".
#[serde(default)]
prefix: String,
// `limit` and `offset` are handed to `paginate_files`.
#[serde(default)]
limit: Option<usize>,
#[serde(default)]
offset: Option<usize>,
// The remaining fields are presumably the filters evaluated by
// `file_matches_list_filters` (defined outside this chunk).
#[serde(default)]
metadata: Option<Value>,
#[serde(default)]
name: Option<String>,
#[serde(default)]
resource_id: Option<String>,
#[serde(default)]
mime_type: Option<String>,
#[serde(default)]
content_type: Option<String>,
#[serde(default)]
status: Option<String>,
#[serde(default)]
visibility: Option<String>,
#[serde(default)]
bucket: Option<String>,
#[serde(default)]
key_prefix: Option<String>,
}
/// JSON body of `confirm_upload`. Every field is optional.
#[derive(Debug, Deserialize)]
struct ConfirmUploadRequest {
// Falls back to the stored `size_bytes` when omitted.
#[serde(default)]
size_bytes: Option<i64>,
// Falls back to the stored `mime_type` when omitted.
#[serde(default)]
content_type: Option<String>,
#[serde(default)]
checksum_sha256: Option<String>,
#[serde(default)]
metadata: Option<Value>,
}
/// Request payload listing file ids.
///
/// Presumably the body of the `/files/delete-many` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct FileDeleteManyRequest {
file_ids: Vec<String>,
}
/// Request payload pairing a list of file ids with the same optional fields
/// that `UpdateFileRequest` carries.
///
/// Presumably the body of the `/files/update-many` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct FileUpdateManyRequest {
file_ids: Vec<String>,
#[serde(default)]
storage_key: Option<String>,
#[serde(default)]
bucket: Option<String>,
#[serde(default)]
name: Option<String>,
#[serde(default)]
file_name: Option<String>,
#[serde(default)]
original_name: Option<String>,
#[serde(default)]
resource_id: Option<String>,
#[serde(default)]
mime_type: Option<String>,
#[serde(default)]
content_type: Option<String>,
#[serde(default)]
size_bytes: Option<i64>,
#[serde(default)]
checksum_sha256: Option<String>,
#[serde(default)]
visibility: Option<String>,
#[serde(default)]
status: Option<String>,
#[serde(default)]
metadata: Option<Value>,
}
/// Request payload pairing a list of file ids with the same visibility fields
/// that `SetVisibilityRequest` carries.
///
/// Presumably the body of the `/files/visibility-many` route; its handler is
/// not visible in this chunk.
#[derive(Debug, Deserialize)]
struct FileVisibilityManyRequest {
file_ids: Vec<String>,
#[serde(default)]
public: Option<bool>,
#[serde(default)]
visibility: Option<String>,
}
/// Request payload for file search. Every field is optional.
///
/// Carries a free-text `query`, pagination (`limit`, `offset`) and the same
/// filter fields as `ListFilesRequest`, except that `s3_id` and `prefix` are
/// optional here. Presumably the body of the `/files/search` route; its
/// handler is not visible in this chunk.
#[derive(Debug, Deserialize)]
struct FileSearchRequest {
#[serde(default)]
query: Option<String>,
#[serde(default)]
limit: Option<usize>,
#[serde(default)]
offset: Option<usize>,
#[serde(default)]
s3_id: Option<String>,
#[serde(default)]
prefix: Option<String>,
#[serde(default)]
metadata: Option<Value>,
#[serde(default)]
name: Option<String>,
#[serde(default)]
resource_id: Option<String>,
#[serde(default)]
mime_type: Option<String>,
#[serde(default)]
content_type: Option<String>,
#[serde(default)]
status: Option<String>,
#[serde(default)]
visibility: Option<String>,
#[serde(default)]
bucket: Option<String>,
#[serde(default)]
key_prefix: Option<String>,
}
/// Request payload identifying one file by id.
///
/// Presumably the body of the `/permissions/list` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct PermissionListRequest {
file_id: String,
}
/// Request payload describing a permission to grant on a file to a principal.
///
/// `expires_at` and `metadata` are optional. Presumably the body of the
/// `/permissions/grant` route; its handler is not visible in this chunk.
#[derive(Debug, Deserialize)]
struct PermissionGrantRequest {
file_id: String,
principal_type: String,
principal_id: String,
permission: String,
#[serde(default)]
expires_at: Option<DateTime<Utc>>,
#[serde(default)]
metadata: Option<Value>,
}
/// Request payload identifying a (file, principal, permission) triple.
///
/// Presumably the body of the `/permissions/revoke` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct PermissionRevokeRequest {
file_id: String,
principal_type: String,
principal_id: String,
permission: String,
}
/// Request payload naming a file and the permission to check on it.
///
/// Presumably the body of the `/permissions/check` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct PermissionCheckRequest {
file_id: String,
permission: String,
}
/// Request payload for copying a file to a new `storage_key`.
///
/// Only `storage_key` is required. The server-side-encryption options are
/// flattened into the top level of the JSON object. Presumably the body of
/// the `/files/{file_id}/copy` route; its handler is not visible in this chunk.
#[derive(Debug, Deserialize)]
struct CopyFileRequest {
storage_key: String,
#[serde(default)]
bucket: Option<String>,
#[serde(default)]
file_name: Option<String>,
#[serde(default)]
visibility: Option<String>,
#[serde(default)]
metadata: Option<Value>,
#[serde(default, flatten)]
encryption: StorageServerSideEncryptionRequest,
}
/// Request payload with an optional file id and optional pagination.
///
/// Presumably the body of the `/audit/list` route; its handler is not visible
/// in this chunk.
#[derive(Debug, Deserialize)]
struct AuditListRequest {
#[serde(default)]
file_id: Option<String>,
#[serde(default)]
limit: Option<usize>,
#[serde(default)]
offset: Option<usize>,
}
/// Path parameters matching the `{file_id}` and `{version_id}` segments of the
/// `/files/{file_id}/versions/{version_id}` routes.
#[derive(Debug, Deserialize)]
struct FileVersionPath {
file_id: String,
version_id: String,
}
/// Request payload for starting a multipart upload on an existing file.
///
/// The server-side-encryption options are flattened into the top level of the
/// JSON object. Presumably the body of the `/multipart/create` route; its
/// handler is not visible in this chunk.
#[derive(Debug, Deserialize)]
struct MultipartCreateRequest {
file_id: String,
#[serde(default)]
content_type: Option<String>,
#[serde(default, flatten)]
encryption: StorageServerSideEncryptionRequest,
}
/// Optional server-side-encryption options.
///
/// Flattened into `UploadUrlRequest`, `CopyFileRequest` and
/// `MultipartCreateRequest`. `Default` yields all fields unset.
#[derive(Debug, Default, Deserialize)]
struct StorageServerSideEncryptionRequest {
// Also accepted under the JSON key `sse`.
#[serde(default, alias = "sse")]
server_side_encryption: Option<String>,
// Also accepted under the JSON key `kms_key_id`.
#[serde(default, alias = "kms_key_id")]
ssekms_key_id: Option<String>,
#[serde(default)]
bucket_key_enabled: Option<bool>,
}
/// Request payload for object-retention settings. Every field is optional.
///
/// Presumably the body of `POST /files/{file_id}/retention`; its handler is
/// not visible in this chunk.
#[derive(Debug, Deserialize)]
struct RetentionRequest {
#[serde(default)]
mode: Option<String>,
// NOTE(review): `retain_until` and `retain_until_date` look like two
// spellings of the same value; which one wins is decided by the handler.
#[serde(default)]
retain_until: Option<DateTime<Utc>>,
#[serde(default)]
retain_until_date: Option<DateTime<Utc>>,
#[serde(default)]
version_id: Option<String>,
#[serde(default)]
bypass_governance: Option<bool>,
}
/// Request payload identifying one part of a multipart upload.
///
/// Presumably the body of the `/multipart/sign-part` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct MultipartSignPartRequest {
file_id: String,
upload_id: String,
part_number: i32,
}
/// One uploaded part (number plus ETag) as listed in `MultipartCompleteRequest::parts`.
#[derive(Debug, Deserialize)]
struct MultipartCompletedPartInput {
part_number: i32,
etag: String,
}
/// Request payload listing the parts of a multipart upload to finalise.
///
/// Presumably the body of the `/multipart/complete` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct MultipartCompleteRequest {
file_id: String,
upload_id: String,
parts: Vec<MultipartCompletedPartInput>,
}
/// Request payload identifying a multipart upload by file id and upload id.
///
/// Presumably the body of the `/multipart/abort` route; its handler is not
/// visible in this chunk.
#[derive(Debug, Deserialize)]
struct MultipartAbortRequest {
file_id: String,
upload_id: String,
}
/// Request payload identifying a multipart upload by file id and upload id.
///
/// Presumably the body of the `/multipart/list-parts` route; its handler is
/// not visible in this chunk.
#[derive(Debug, Deserialize)]
struct MultipartListPartsRequest {
file_id: String,
upload_id: String,
}
/// Response payload of `get_file`: the file record the caller was authorised to read.
#[derive(Debug, Serialize)]
struct AuthorizedFileResponse {
file: athena_s3::ManagedFileRecord,
}
/// Serialised description of a presigned URL for a managed file.
///
/// Embedded in `UploadUrlResponse`; presumably also what `presigned_response`
/// (defined outside this chunk) returns for `get_file_url`.
#[derive(Debug, Serialize)]
struct PresignedFileUrlResponse {
file_id: String,
bucket: String,
storage_key: String,
purpose: String,
url: String,
// NOTE(review): presumably headers the client must send with the request —
// confirm against the code that fills this map.
headers: BTreeMap<String, String>,
expires_at: String,
expires_at_epoch_seconds: i64,
expires_in: u64,
cache_hit: bool,
cache_layer: String,
}
/// A file record together with the presigned URL to upload its content.
///
/// This is the element type collected by `create_batch_upload_urls`.
#[derive(Debug, Serialize)]
struct UploadUrlResponse {
file: athena_s3::ManagedFileRecord,
upload: PresignedFileUrlResponse,
}
/// Response payload of `create_batch_upload_urls`: one entry per requested
/// file, in request order.
#[derive(Debug, Serialize)]
struct BatchUploadUrlResponse {
files: Vec<UploadUrlResponse>,
}
/// Serialisable list of file records with a count.
///
/// NOTE(review): `list_files` in this chunk builds its response with `json!`
/// instead; this type is presumably used by a handler outside this view.
#[derive(Debug, Serialize)]
struct ListFilesResponse {
files: Vec<athena_s3::ManagedFileRecord>,
count: usize,
}
/// Response payload returned by the single-file mutation handlers (update,
/// delete, visibility, confirm-upload, binary upload): the resulting record.
#[derive(Debug, Serialize)]
struct FileMutationResponse {
file: athena_s3::ManagedFileRecord,
}
/// Serialisable summary of a folder-level operation: the storage, the prefix,
/// and a processed-file count.
///
/// Presumably returned by the `/folders/delete` and `/folders/move` handlers,
/// which are not visible in this chunk.
#[derive(Debug, Serialize)]
struct FolderMutationResponse {
s3_id: String,
prefix: String,
processed_files: usize,
}
/// Serialisable list of file records with a count.
///
/// Presumably returned by the bulk (`*-many`) handlers, which are not visible
/// in this chunk.
#[derive(Debug, Serialize)]
struct FileMutationManyResponse {
files: Vec<athena_s3::ManagedFileRecord>,
count: usize,
}
/// Serialisable list of permission records with a count.
///
/// Presumably returned by the `/permissions/list` handler, which is not
/// visible in this chunk.
#[derive(Debug, Serialize)]
struct PermissionListResponse {
permissions: Vec<athena_s3::FilePermissionRecord>,
count: usize,
}
/// Serialisable result of a permission check: the permission name and whether
/// it is allowed.
///
/// Presumably returned by the `/permissions/check` handler, which is not
/// visible in this chunk.
#[derive(Debug, Serialize)]
struct PermissionCheckResponse {
allowed: bool,
permission: String,
}
/// Serialisable list of storage audit events with a count.
///
/// Presumably returned by the `/audit/list` handler, which is not visible in
/// this chunk.
#[derive(Debug, Serialize)]
struct AuditListResponse {
events: Vec<athena_s3::StorageAuditEventRecord>,
count: usize,
}
/// Bundle of a file record, an S3 client and a bucket name used by the read
/// handlers.
///
/// The handlers in this chunk read exactly these three fields from the value
/// returned by `resolve_authorized_file_read_context` (defined outside this
/// chunk), so this is presumably its success type.
struct AuthorizedFileReadContext {
file: athena_s3::ManagedFileRecord,
client: S3Client,
bucket: String,
}
/// Registers the storage HTTP routes on `cfg`.
///
/// Five scopes are added relative to wherever the caller mounts this
/// configuration: `/files`, `/folders`, `/permissions`, `/multipart` and
/// `/audit`.
///
/// NOTE(review): the audit-log paths written by the handlers start with
/// `/storage/...`, so this is presumably mounted under a `/storage` scope —
/// confirm at the call site.
pub fn configure_storage_routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/files")
// Fixed-name collection routes are registered ahead of the
// `/{file_id}` patterns below.
.route("/upload-url", web::post().to(create_upload_url))
.route("/upload-urls", web::post().to(create_batch_upload_urls))
.route("/list", web::post().to(list_files))
.route("/search", web::post().to(search_files))
.route("/delete-many", web::post().to(delete_many_files))
.route("/update-many", web::post().to(update_many_files))
.route("/visibility-many", web::post().to(set_many_file_visibility))
// Single-file routes keyed by `{file_id}`.
.route("/{file_id}", web::get().to(get_file))
.route("/{file_id}", web::patch().to(update_file))
.route("/{file_id}", web::delete().to(delete_file))
.route("/{file_id}/confirm-upload", web::post().to(confirm_upload))
.route("/{file_id}/upload", web::put().to(upload_file_binary))
.route("/{file_id}/copy", web::post().to(copy_file))
.route("/{file_id}/restore", web::post().to(restore_file))
.route("/{file_id}/purge", web::delete().to(purge_file))
.route("/{file_id}/versions", web::get().to(list_file_versions))
.route(
"/{file_id}/versions/{version_id}/restore",
web::post().to(restore_file_version),
)
.route(
"/{file_id}/versions/{version_id}",
web::delete().to(delete_file_version),
)
.route("/{file_id}/retention", web::get().to(get_file_retention))
.route("/{file_id}/retention", web::post().to(set_file_retention))
.route("/{file_id}/url", web::get().to(get_file_url))
.route("/{file_id}/public-url", web::get().to(get_file_public_url))
.route("/{file_id}/proxy-url", web::get().to(get_file_proxy_url))
.route("/{file_id}/proxy", web::get().to(proxy_file))
// Visibility accepts both POST and PATCH with the same handler.
.route("/{file_id}/visibility", web::post().to(set_file_visibility))
.route(
"/{file_id}/visibility",
web::patch().to(set_file_visibility),
),
)
.service(
web::scope("/folders")
.route("/list", web::post().to(list_folders))
.route("/tree", web::post().to(tree_folders))
.route("/delete", web::post().to(delete_folder))
.route("/move", web::post().to(move_folder)),
)
.service(
web::scope("/permissions")
.route("/list", web::post().to(list_permissions))
.route("/grant", web::post().to(grant_permission))
.route("/revoke", web::post().to(revoke_permission))
.route("/check", web::post().to(check_permission)),
)
.service(
web::scope("/multipart")
.route("/create", web::post().to(create_multipart_upload))
.route("/sign-part", web::post().to(sign_multipart_part))
.route("/complete", web::post().to(complete_multipart_upload))
.route("/abort", web::post().to(abort_multipart_upload))
.route("/list-parts", web::post().to(list_multipart_parts)),
)
.service(web::scope("/audit").route("/list", web::post().to(list_audit_events)));
}
/// Handles `POST /files/upload-url`.
///
/// Resolves the calling actor and the client database pool, then delegates to
/// `create_upload_url_inner`. On success the generated upload description is
/// returned through `api_created`; any failing step's error response is
/// returned unchanged.
async fn create_upload_url(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<UploadUrlRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    // Every fallible step already yields a ready-made `HttpResponse` on
    // failure, so `?` is used to short-circuit with it.
    let outcome = async {
        let actor = required_actor(&req, app_state).await?;
        let pool = required_client_pool(&req, app_state).await?;
        let upload = create_upload_url_inner(&pool, app_state, &actor, &body).await?;
        Ok::<HttpResponse, HttpResponse>(api_created("Upload URL generated", upload))
    };
    outcome
        .await
        .unwrap_or_else(|early_response| early_response)
}
/// Handles `POST /files/upload-urls`.
///
/// Generates one upload URL per entry of `body.files`, in order, via
/// `create_upload_url_inner`. An empty list is rejected with a bad-request
/// response (after the actor has been resolved). The first entry that fails
/// aborts the batch and its error response is returned as-is.
async fn create_batch_upload_urls(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<BatchUploadUrlRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app_state).await?;
        if body.files.is_empty() {
            return Err(bad_request(
                "Invalid files",
                "files array must not be empty",
            ));
        }
        let pool = required_client_pool(&req, app_state).await?;

        let mut files = Vec::with_capacity(body.files.len());
        for entry in body.files.iter() {
            let generated = create_upload_url_inner(&pool, app_state, &actor, entry).await?;
            files.push(generated);
        }

        Ok::<HttpResponse, HttpResponse>(api_created(
            "Upload URLs generated",
            BatchUploadUrlResponse { files },
        ))
    };
    outcome
        .await
        .unwrap_or_else(|early_response| early_response)
}
/// Handles `POST /files/list`.
///
/// Loads the files of the storage identified by `body.s3_id` (optionally
/// restricted to `body.prefix`), keeps those the actor may read and that pass
/// `file_matches_list_filters`, and returns one page of the result together
/// with paging information (`count`, `total`, `limit`, `offset`,
/// `next_offset`, `has_more`).
///
/// Files for which authorisation reports `AccessDenied` are silently skipped;
/// any other store error aborts the request.
async fn list_files(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ListFilesRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app_state).await?;
        let pool = required_client_pool(&req, app_state).await?;
        let s3_id = parse_uuid(&body.s3_id, "s3_id")?;

        // A blank prefix means "no prefix filter"; otherwise the prefix is
        // forwarded exactly as sent (untrimmed).
        let prefix = if body.prefix.trim().is_empty() {
            None
        } else {
            Some(body.prefix.as_str())
        };
        let organization_id = actor.organization_id.as_deref().unwrap_or("");

        let candidates =
            athena_s3::list_files_for_storage(&pool, organization_id, s3_id, prefix)
                .await
                .map_err(|err| map_store_error(err, "Failed to list files"))?;

        let access_context = actor.access_context();
        let mut visible = Vec::new();
        for candidate in candidates {
            let authorized = athena_s3::authorize_file_access(
                &pool,
                &candidate.id,
                &access_context,
                athena_s3::FilePermissionKind::Read,
            )
            .await;
            match authorized {
                Ok(file) => {
                    if file_matches_list_filters(&file, &body) {
                        visible.push(file);
                    }
                }
                // Unreadable files are simply left out of the listing.
                Err(athena_s3::S3CredentialStoreError::AccessDenied) => {}
                Err(err) => return Err(map_store_error(err, "Failed to list files")),
            }
        }

        let total = visible.len();
        let (files, limit, offset, next_offset, has_more) =
            paginate_files(visible, body.limit, body.offset);
        let count = files.len();
        let payload = json!({
            "files": files,
            "count": count,
            "total": total,
            "limit": limit,
            "offset": offset,
            "next_offset": next_offset,
            "has_more": has_more,
        });
        Ok::<HttpResponse, HttpResponse>(api_success("Files loaded", payload))
    };
    outcome
        .await
        .unwrap_or_else(|early_response| early_response)
}
/// Handles `GET /files/{file_id}`.
///
/// Authorises `Read` access for the (optional) caller context and returns the
/// file record. Store errors are translated by `map_store_error`.
async fn get_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
    let app_state = state.get_ref();
    let outcome = async {
        let pool = required_client_pool(&req, app_state).await?;
        let access_context = optional_access_context(&req, app_state).await;
        athena_s3::authorize_file_access(
            &pool,
            &path,
            &access_context,
            athena_s3::FilePermissionKind::Read,
        )
        .await
        .map(|file| api_success("File loaded", AuthorizedFileResponse { file }))
        .map_err(|err| map_store_error(err, "Failed to load file"))
    };
    outcome
        .await
        .unwrap_or_else(|error_response| error_response)
}
/// Handles `GET /files/{file_id}/url`.
///
/// After the read context has been authorised, a cached URL is returned when
/// `load_cached_url_after_auth` finds one. Otherwise a new presigned GET entry
/// is generated, written to the cache, and returned marked as not cached with
/// layer `"generated"`.
///
/// The `purpose` query parameter defaults to `"read"` when
/// `normalize_purpose` yields nothing.
async fn get_file_url(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<GetFileUrlQuery>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let outcome = async {
        let pool = required_client_pool(&req, app_state).await?;
        let purpose =
            normalize_purpose(query.purpose.as_deref()).unwrap_or_else(|| "read".to_string());
        let read_context =
            resolve_authorized_file_read_context(&pool, app_state, &req, path.as_str()).await?;
        let (file, client, bucket) = (
            read_context.file,
            read_context.client,
            read_context.bucket,
        );

        if let Some(cached) = load_cached_url_after_auth(app_state, &pool, &file, &purpose).await
        {
            return Ok(api_success("Presigned URL generated", cached));
        }

        let entry = generate_presigned_get_entry(&client, &file, &purpose, &bucket).await?;
        write_cached_entry(app_state, &pool, &entry).await;
        Ok::<HttpResponse, HttpResponse>(api_success(
            "Presigned URL generated",
            presigned_response(&entry, false, "generated"),
        ))
    };
    outcome
        .await
        .unwrap_or_else(|early_response| early_response)
}
/// Handles `GET /files/{file_id}/proxy`: streams the object's bytes from the
/// storage backend through this service.
///
/// The caller's read access is resolved first. A non-empty `Range` request
/// header is forwarded to the backend. The response carries `Content-Type`
/// (backend value, else the stored MIME type, else
/// `application/octet-stream`), `Cache-Control: private, no-store`,
/// `Accept-Ranges: bytes`, a `Content-Disposition` built by
/// `proxy_content_disposition`, and — when the backend supplies them —
/// `Content-Length`, `ETag` and `Content-Range`.
///
/// The status is `206 Partial Content` only when the backend actually returned
/// a `Content-Range`; otherwise it is `200 OK`. A backend `get_object` failure
/// is logged and answered with an internal-error response that includes the
/// error text.
async fn proxy_file(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<GetFileUrlQuery>,
) -> HttpResponse {
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let purpose = normalize_purpose(query.purpose.as_deref()).unwrap_or_else(|| "read".to_string());
    let context =
        match resolve_authorized_file_read_context(&pool, state.get_ref(), &req, path.as_str())
            .await
        {
            Ok(context) => context,
            Err(resp) => return resp,
        };
    let file = context.file;
    let client = context.client;
    let bucket = context.bucket;

    let requested_range = req
        .headers()
        .get(header::RANGE)
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty());

    let mut get_object = client.get_object().bucket(&bucket).key(&file.storage_key);
    if let Some(range) = requested_range {
        get_object = get_object.range(range);
    }
    let output = match get_object.send().await {
        Ok(output) => output,
        Err(err) => {
            warn!(file_id = %file.id, purpose = %purpose, error = %err, "managed file proxy get_object failed");
            return internal_error("Failed to proxy file", err.to_string());
        }
    };

    let content_type = output
        .content_type()
        .map(str::to_string)
        .or_else(|| file.mime_type.clone())
        .unwrap_or_else(|| "application/octet-stream".to_string());

    // A 206 response must carry Content-Range. If the backend ignored the
    // requested range and sent the whole object, answer 200 instead of
    // emitting a partial-content status without the range header.
    let is_partial = output.content_range().is_some();
    let mut response = if is_partial {
        HttpResponse::PartialContent()
    } else {
        HttpResponse::Ok()
    };
    response.insert_header((header::CONTENT_TYPE, content_type));
    response.insert_header((header::CACHE_CONTROL, "private, no-store"));
    response.insert_header((header::ACCEPT_RANGES, "bytes"));
    response.insert_header((
        header::CONTENT_DISPOSITION,
        proxy_content_disposition(&file, &purpose),
    ));
    if let Some(content_length) = output.content_length().filter(|value| *value >= 0) {
        response.insert_header((header::CONTENT_LENGTH, content_length.to_string()));
    }
    if let Some(etag) = output.e_tag() {
        response.insert_header((header::ETAG, etag.to_string()));
    }
    if let Some(content_range) = output.content_range() {
        response.insert_header((header::CONTENT_RANGE, content_range.to_string()));
    }

    // The identifiers used for logging travel with the stream state, so they
    // are cloned once here rather than once per chunk.
    let body_stream = stream::unfold(
        (output.body, file.id.clone(), purpose.clone()),
        |(mut body, file_id, stream_purpose)| async move {
            body.next().await.map(|result| {
                let chunk = result.map_err(|err| {
                    warn!(file_id = %file_id, purpose = %stream_purpose, error = %err, "managed file proxy stream failed");
                    ErrorInternalServerError(err.to_string())
                });
                (chunk, (body, file_id, stream_purpose))
            })
        },
    );
    response.streaming(body_stream)
}
/// Handles `PATCH /files/{file_id}`: optionally relocates the stored object
/// and/or patches mutable metadata of a managed file.
///
/// Requires an authenticated actor with `Write` permission on the file. The
/// request must contain `storage_key` or at least one patchable field.
///
/// When `storage_key` is supplied and the key or destination bucket differs
/// from the current location, the object is copied to the destination and the
/// source object is then deleted; afterwards the record's location is updated.
/// Patch fields are applied last via `patch_managed_file`. On success the
/// cached URL entry for the file is invalidated and an audit entry is written.
///
/// NOTE(review): the record's location is updated only after the source
/// object has been deleted. If `update_file_location` fails at that point the
/// record appears to still reference the removed key — confirm whether this
/// is recovered elsewhere.
/// NOTE(review): the success audit entry is always recorded as
/// `storage.file.move` / "File moved", including metadata-only patches.
/// NOTE(review): `body.bucket` is only consulted when `storage_key` is present.
async fn update_file(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Json<UpdateFileRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let access_context = actor.access_context();
let file = match athena_s3::authorize_file_access(
&pool,
&path,
&access_context,
athena_s3::FilePermissionKind::Write,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file"),
};
let has_patch = update_has_patch_fields(&body);
// Normalise the requested key up front so an invalid key is rejected before
// any storage call is made.
let requested_storage_key = match body.storage_key.as_deref() {
Some(value) => Some(match normalize_storage_key(value) {
Ok(value) => value,
Err(resp) => return resp,
}),
None => None,
};
if requested_storage_key.is_none() && !has_patch {
return bad_request(
"Invalid update",
"provide storage_key or at least one mutable metadata field",
);
}
let mut updated = file.clone();
if let Some(new_storage_key) = requested_storage_key {
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
// Without an explicit bucket the object stays in its current bucket.
let destination_bucket_override = normalize_bucket(body.bucket.as_deref());
let destination_bucket = destination_bucket_override
.clone()
.unwrap_or_else(|| target.bucket.clone());
let should_move =
new_storage_key != file.storage_key || destination_bucket != target.bucket;
if should_move {
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
// Step 1: copy to the destination. A failure here leaves the source intact.
if let Err(err) = client
.copy_object()
.bucket(&destination_bucket)
.copy_source(s3_copy_source(&target.bucket, &file.storage_key, None))
.key(&new_storage_key)
.send()
.await
{
best_effort_action_log(
&pool,
&actor,
&file.id,
"/storage/files/{file_id}",
"storage.file.move",
athena_s3::ManagedFileActionStatus::Failed,
format!("copy_object failed: {err}"),
json!({ "from": file.storage_key, "to": new_storage_key, "bucket": destination_bucket }),
json!({}),
)
.await;
return internal_error("Failed to move file", err.to_string());
}
// Step 2: remove the source. On failure the copy already exists at the
// destination while the record still points at the source.
if let Err(err) = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await
{
best_effort_action_log(
&pool,
&actor,
&file.id,
"/storage/files/{file_id}",
"storage.file.move",
athena_s3::ManagedFileActionStatus::Failed,
format!("delete_object failed after copy: {err}"),
json!({ "from": file.storage_key, "to": new_storage_key, "bucket": destination_bucket }),
json!({}),
)
.await;
return internal_error("Failed to finalize file move", err.to_string());
}
} else if !has_patch {
// Same key, same bucket, nothing to patch: the request is a no-op.
return bad_request(
"Invalid storage key",
"new storage_key must differ from the current value",
);
}
// Step 3: persist the new location (also reached when the location is
// unchanged but patch fields are present).
updated = match athena_s3::update_file_location(
&pool,
&file.id,
&new_storage_key,
destination_bucket_override.as_deref(),
)
.await
{
Ok(updated) => updated,
Err(err) => return map_store_error(err, "Failed to update file metadata"),
};
}
if has_patch {
let patch = patch_input_from_update(&body);
updated = match athena_s3::patch_managed_file(&pool, &updated.id, &patch).await {
Ok(updated) => updated,
Err(err) => return map_store_error(err, "Failed to update file metadata"),
};
}
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
best_effort_action_log(
&pool,
&actor,
&file.id,
"/storage/files/{file_id}",
"storage.file.move",
athena_s3::ManagedFileActionStatus::Success,
"File moved".to_string(),
json!({ "from": file.storage_key, "to": updated.storage_key, "patched": has_patch }),
json!({ "file": updated }),
)
.await;
api_success("File updated", FileMutationResponse { file: updated })
}
/// Handles `DELETE /files/{file_id}`.
///
/// Requires an authenticated actor with `Delete` permission. The object is
/// removed from the storage backend first; only then is the record marked as
/// deleted and the cached URL entry invalidated. Both the failure of the
/// backend delete and the successful outcome are written to the action log.
async fn delete_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
    let app_state = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app_state).await?;
        let pool = required_client_pool(&req, app_state).await?;

        let access_context = actor.access_context();
        let file = athena_s3::authorize_file_access(
            &pool,
            &path,
            &access_context,
            athena_s3::FilePermissionKind::Delete,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file"))?;

        let target = file_target(&pool, &file)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;
        let client = build_s3_client_with_session_token(
            &target.endpoint,
            &target.region,
            &target.bucket,
            &target.access_key_id,
            &target.secret_key,
            target.session_token.as_deref(),
        )
        .await;

        if let Err(err) = client
            .delete_object()
            .bucket(&target.bucket)
            .key(&file.storage_key)
            .send()
            .await
        {
            best_effort_action_log(
                &pool,
                &actor,
                &file.id,
                "/storage/files/{file_id}",
                "storage.file.delete",
                athena_s3::ManagedFileActionStatus::Failed,
                format!("delete_object failed: {err}"),
                json!({ "storage_key": file.storage_key }),
                json!({}),
            )
            .await;
            return Err(internal_error("Failed to delete file", err.to_string()));
        }

        let deleted = athena_s3::mark_file_deleted(&pool, &file.id)
            .await
            .map_err(|err| map_store_error(err, "Failed to mark file deleted"))?;

        invalidate_cached_entry(app_state, &pool, &file.id).await;
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}",
            "storage.file.delete",
            athena_s3::ManagedFileActionStatus::Success,
            "File deleted".to_string(),
            json!({ "storage_key": file.storage_key }),
            json!({ "file": deleted }),
        )
        .await;

        Ok::<HttpResponse, HttpResponse>(api_success(
            "File deleted",
            FileMutationResponse { file: deleted },
        ))
    };
    outcome
        .await
        .unwrap_or_else(|early_response| early_response)
}
/// Handles `POST` and `PATCH /files/{file_id}/visibility`.
///
/// Requires an authenticated actor with `Share` permission. The requested
/// visibility is derived from the body by `resolve_requested_visibility`,
/// stored, and recorded in the action log; the updated record is returned.
async fn set_file_visibility(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<SetVisibilityRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app_state).await?;
        let pool = required_client_pool(&req, app_state).await?;

        let access_context = actor.access_context();
        let file = athena_s3::authorize_file_access(
            &pool,
            &path,
            &access_context,
            athena_s3::FilePermissionKind::Share,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file"))?;

        // The body is validated only after access has been authorised.
        let requested_visibility = resolve_requested_visibility(&body)?;

        let updated = athena_s3::set_file_visibility(&pool, &file.id, &requested_visibility)
            .await
            .map_err(|err| map_store_error(err, "Failed to update file visibility"))?;

        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/visibility",
            "storage.file.visibility",
            athena_s3::ManagedFileActionStatus::Success,
            format!("File visibility set to {requested_visibility}"),
            json!({ "visibility": requested_visibility }),
            json!({ "file": updated }),
        )
        .await;

        Ok::<HttpResponse, HttpResponse>(api_success(
            "File visibility updated",
            FileMutationResponse { file: updated },
        ))
    };
    outcome
        .await
        .unwrap_or_else(|early_response| early_response)
}
/// Handles `POST /files/{file_id}/confirm-upload`.
///
/// Requires an authenticated actor with `Write` permission. Sets the file's
/// status to `"uploaded"`, taking size and content type from the body and
/// falling back to the values already stored on the record. The cached URL
/// entry is invalidated and the confirmation is written to the action log.
async fn confirm_upload(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<ConfirmUploadRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app_state).await?;
        let pool = required_client_pool(&req, app_state).await?;

        let file = athena_s3::authorize_file_access(
            &pool,
            &path,
            &actor.access_context(),
            athena_s3::FilePermissionKind::Write,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file"))?;

        // Values from the request win; stored values fill the gaps.
        let effective_size = body.size_bytes.or(file.size_bytes);
        let effective_content_type = body.content_type.as_deref().or(file.mime_type.as_deref());

        let updated = athena_s3::update_file_status(
            &pool,
            &file.id,
            "uploaded",
            effective_size,
            effective_content_type,
            body.checksum_sha256.as_deref(),
            body.metadata.clone(),
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to confirm file upload"))?;

        invalidate_cached_entry(app_state, &pool, &file.id).await;
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/confirm-upload",
            "storage.file.confirm_upload",
            athena_s3::ManagedFileActionStatus::Success,
            "Managed file upload confirmed".to_string(),
            json!({
                "size_bytes": body.size_bytes,
                "content_type": body.content_type,
                "checksum_sha256": body.checksum_sha256,
            }),
            json!({ "file": updated }),
        )
        .await;

        Ok::<HttpResponse, HttpResponse>(api_success(
            "File upload confirmed",
            FileMutationResponse { file: updated },
        ))
    };
    outcome
        .await
        .unwrap_or_else(|early_response| early_response)
}
/// Handles `PUT /files/{file_id}/upload`: stores the raw request body as the
/// file's object.
///
/// Requires an authenticated actor with `Write` permission. The content type
/// comes from the request's `Content-Type` header when it is non-blank,
/// otherwise from the stored MIME type. Server-side-encryption options are
/// read from the request headers by `sse_from_request_headers` and applied by
/// `apply_sse_to_put`.
///
/// After a successful `put_object` the record is set to `"uploaded"` with the
/// body length as its size, the cached URL entry is invalidated, and the
/// upload is written to the action log. A failed `put_object` is logged and
/// answered with an internal-error response that includes the error text.
async fn upload_file_binary(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: web::Bytes,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let file = match athena_s3::authorize_file_access(
        &pool,
        &path,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await
    {
        Ok(file) => file,
        Err(err) => return map_store_error(err, "Failed to authorize file"),
    };
    let target = match file_target(&pool, &file).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let content_type = req
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(str::to_string)
        .or_else(|| file.mime_type.clone());
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;

    let uploaded_len = body.len();
    // `Bytes` is reference-counted, so cloning it shares the buffer with the
    // SDK instead of copying the whole upload into a new `Vec`.
    let mut request = client
        .put_object()
        .bucket(&target.bucket)
        .key(&file.storage_key)
        .body(ByteStream::from(body.clone()));
    if let Some(content_type) = content_type.as_ref() {
        request = request.content_type(content_type);
    }
    let encryption = sse_from_request_headers(&req);
    request = match apply_sse_to_put(request, &encryption) {
        Ok(request) => request,
        Err(resp) => return resp,
    };
    if let Err(err) = request.send().await {
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/upload",
            "storage.file.upload_binary",
            athena_s3::ManagedFileActionStatus::Failed,
            format!("put_object failed: {err}"),
            json!({ "size_bytes": uploaded_len }),
            json!({}),
        )
        .await;
        return internal_error("Failed to upload binary file body", err.to_string());
    }
    let updated = match athena_s3::update_file_status(
        &pool,
        &file.id,
        "uploaded",
        // Checked conversion: a length that does not fit in `i64` is recorded
        // as unknown instead of wrapping to a negative size.
        i64::try_from(uploaded_len).ok(),
        content_type.as_deref(),
        None,
        None,
    )
    .await
    {
        Ok(updated) => updated,
        Err(err) => return map_store_error(err, "Failed to update file upload status"),
    };
    invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
    best_effort_action_log(
        &pool,
        &actor,
        &file.id,
        "/storage/files/{file_id}/upload",
        "storage.file.upload_binary",
        athena_s3::ManagedFileActionStatus::Success,
        "Managed file binary uploaded".to_string(),
        json!({ "size_bytes": uploaded_len }),
        json!({ "file": updated }),
    )
    .await;
    api_success(
        "File binary uploaded",
        FileMutationResponse { file: updated },
    )
}
/// Resolves the public object URL of a managed file.
///
/// The caller must pass the read-authorization check performed by
/// `resolve_authorized_file_read_context`, and the file's visibility must be
/// `"public"`; otherwise a forbidden response is returned.
async fn get_file_public_url(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn resolve(
        req: &HttpRequest,
        app: &AppState,
        file_id: &str,
    ) -> Result<HttpResponse, HttpResponse> {
        let pool = required_client_pool(req, app).await?;
        let ctx = resolve_authorized_file_read_context(&pool, app, req, file_id).await?;

        // Only files explicitly marked public may expose a public URL.
        if ctx.file.visibility != "public" {
            return Err(forbidden(
                "Forbidden",
                "file visibility must be public before requesting a public URL",
            ));
        }

        let backend = file_target(&pool, &ctx.file)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;
        let public_url = compute_public_object_url(&backend, &ctx.bucket, &ctx.file.storage_key)?;

        Ok(api_success(
            "Public URL resolved",
            json!({
                "file_id": ctx.file.id,
                "bucket": ctx.bucket,
                "storage_key": ctx.file.storage_key,
                "url": public_url,
            }),
        ))
    }

    match resolve(&req, state.get_ref(), path.as_str()).await {
        Ok(resp) | Err(resp) => resp,
    }
}
/// Builds the proxy URL for a managed file the caller is allowed to read.
///
/// The optional `purpose` query value is normalized and defaults to `"read"`.
/// The same URL is returned under both the `url` and `proxy_url` keys.
async fn get_file_proxy_url(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<GetFileUrlQuery>,
) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn resolve(
        req: &HttpRequest,
        app: &AppState,
        file_id: &str,
        requested_purpose: Option<&str>,
    ) -> Result<HttpResponse, HttpResponse> {
        let pool = required_client_pool(req, app).await?;
        let purpose = match normalize_purpose(requested_purpose) {
            Some(value) => value,
            None => "read".to_string(),
        };
        let ctx = resolve_authorized_file_read_context(&pool, app, req, file_id).await?;
        let link = build_proxy_url(req, &ctx.file.id, Some(&purpose));

        Ok(api_success(
            "Proxy URL resolved",
            json!({
                "file_id": ctx.file.id,
                "bucket": ctx.bucket,
                "storage_key": ctx.file.storage_key,
                "purpose": purpose,
                "url": link,
                "proxy_url": link,
            }),
        ))
    }

    let outcome = resolve(
        &req,
        state.get_ref(),
        path.as_str(),
        query.purpose.as_deref(),
    )
    .await;
    match outcome {
        Ok(resp) | Err(resp) => resp,
    }
}
/// Searches managed files and returns one page of the matches the caller may read.
///
/// Rows are fetched from the store for the actor's organizations, each row is
/// re-checked for `Read` access and against the request's search filters, and the
/// surviving files are paginated with the request's `limit`/`offset`.
/// `total` is the number of surviving files among the fetched rows.
async fn search_files(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<FileSearchRequest>,
) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn run(
        req: &HttpRequest,
        app: &AppState,
        body: &Json<FileSearchRequest>,
    ) -> Result<HttpResponse, HttpResponse> {
        let actor = required_actor(req, app).await?;
        let pool = required_client_pool(req, app).await?;

        // Fetch enough rows to cover the requested page: never fewer than 50,
        // never more than 1000.
        let page_limit = body.limit.unwrap_or(50).clamp(1, 500);
        let page_offset = body.offset.unwrap_or(0);
        let fetch_limit = page_offset.saturating_add(page_limit).clamp(50, 1000);

        let rows = athena_s3::search_files(
            &pool,
            &actor.organization_ids,
            body.query.as_deref(),
            fetch_limit,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to search files"))?;

        let access_context = actor.access_context();
        let mut visible = Vec::new();
        for candidate in rows {
            let decision = athena_s3::authorize_file_access(
                &pool,
                &candidate.id,
                &access_context,
                athena_s3::FilePermissionKind::Read,
            )
            .await;
            match decision {
                Ok(file) => {
                    if file_matches_search_filters(&file, body) {
                        visible.push(file);
                    }
                }
                // Files the caller cannot read are simply left out of the results.
                Err(athena_s3::S3CredentialStoreError::AccessDenied) => {}
                Err(err) => return Err(map_store_error(err, "Failed to search files")),
            }
        }

        let total = visible.len();
        let (files, limit, offset, next_offset, has_more) =
            paginate_files(visible, body.limit, body.offset);
        let count = files.len();
        Ok(api_success(
            "Files searched",
            json!({
                "files": files,
                "count": count,
                "total": total,
                "limit": limit,
                "offset": offset,
                "next_offset": next_offset,
                "has_more": has_more,
            }),
        ))
    }

    match run(&req, state.get_ref(), &body).await {
        Ok(resp) | Err(resp) => resp,
    }
}
async fn list_folders(
req: HttpRequest,
state: Data<AppState>,
body: Json<ListFilesRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
Ok(value) => value,
Err(resp) => return resp,
};
let files = match athena_s3::list_files_for_prefix(
&pool,
actor.organization_id.as_deref().unwrap_or(""),
s3_id,
&body.prefix,
)
.await
{
Ok(files) => files,
Err(err) => return map_store_error(err, "Failed to list folders"),
};
let access_context = actor.access_context();
let mut folders = BTreeSet::new();
for file in files {
if athena_s3::authorize_file_access(
&pool,
&file.id,
&access_context,
athena_s3::FilePermissionKind::Read,
)
.await
.is_ok()
{
if let Some(prefix_path) = file.prefix_path {
folders.insert(prefix_path);
}
}
}
let folders: Vec<String> = folders.into_iter().collect();
let count = folders.len();
api_success(
"Folders loaded",
json!({
"folders": folders,
"count": count,
}),
)
}
async fn tree_folders(
req: HttpRequest,
state: Data<AppState>,
body: Json<ListFilesRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
Ok(value) => value,
Err(resp) => return resp,
};
let files = match athena_s3::list_files_for_prefix(
&pool,
actor.organization_id.as_deref().unwrap_or(""),
s3_id,
&body.prefix,
)
.await
{
Ok(files) => files,
Err(err) => return map_store_error(err, "Failed to build folder tree"),
};
let mut tree = BTreeMap::<String, Value>::new();
let access_context = actor.access_context();
for file in files {
if athena_s3::authorize_file_access(
&pool,
&file.id,
&access_context,
athena_s3::FilePermissionKind::Read,
)
.await
.is_err()
{
continue;
}
if let Some(prefix_path) = file.prefix_path {
tree.insert(
prefix_path.clone(),
json!({
"path": prefix_path,
"file_count": tree
.get(&prefix_path)
.and_then(|value| value.get("file_count"))
.and_then(|value| value.as_u64())
.unwrap_or(0)
+ 1,
}),
);
}
}
api_success(
"Folder tree loaded",
json!({
"nodes": tree.into_values().collect::<Vec<Value>>(),
}),
)
}
/// Restores a managed file's metadata record via `athena_s3::restore_file`.
///
/// The caller needs `Delete` permission on the file. On success the cached entry
/// for the file is invalidated and the restored record is returned.
async fn restore_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn run(
        req: &HttpRequest,
        app: &AppState,
        file_id: &str,
    ) -> Result<HttpResponse, HttpResponse> {
        let actor = required_actor(req, app).await?;
        let pool = required_client_pool(req, app).await?;

        let file = athena_s3::authorize_file_access(
            &pool,
            file_id,
            &actor.access_context(),
            athena_s3::FilePermissionKind::Delete,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file restore"))?;

        let restored = athena_s3::restore_file(&pool, &file.id)
            .await
            .map_err(|err| map_store_error(err, "Failed to restore file"))?;

        invalidate_cached_entry(app, &pool, &file.id).await;
        Ok(api_success(
            "File restored",
            FileMutationResponse { file: restored },
        ))
    }

    match run(&req, state.get_ref(), path.as_str()).await {
        Ok(resp) | Err(resp) => resp,
    }
}
async fn purge_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let file = match athena_s3::authorize_file_access(
&pool,
&path,
&actor.access_context(),
athena_s3::FilePermissionKind::Owner,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file purge"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
let _ = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await;
let purged = match athena_s3::purge_file(&pool, &file.id).await {
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to purge file metadata"),
};
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
api_success("File purged", FileMutationResponse { file: purged })
}
/// Copies a managed file's object to a new key (optionally another bucket) and
/// registers the copy as a new managed file.
///
/// The caller needs `Write` permission on the source file. The destination bucket
/// defaults to the source's bucket; `storage_key` from the body is normalized and
/// required. Name, visibility and metadata from the body override the source's
/// values; everything else is inherited from the source record. The new record is
/// then marked `uploaded` with the source's size, MIME type and checksum.
///
/// All request and metadata validation — including the source's `s3_id` — happens
/// before the object is copied, so a rejected request never leaves a copied
/// object behind without a managed record.
async fn copy_file(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<CopyFileRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let source = match athena_s3::authorize_file_access(
        &pool,
        &path,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await
    {
        Ok(file) => file,
        Err(err) => return map_store_error(err, "Failed to authorize source file"),
    };
    let target = match file_target(&pool, &source).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let bucket = normalize_bucket(body.bucket.as_deref()).unwrap_or_else(|| target.bucket.clone());
    let storage_key = match normalize_storage_key(&body.storage_key) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // Validate the source metadata before touching the bucket: the new record
    // cannot be written without a valid s3_id, so copying first would orphan
    // the copied object.
    let source_s3_id = match source
        .s3_id
        .as_deref()
        .and_then(|value| Uuid::parse_str(value).ok())
    {
        Some(s3_id) => s3_id,
        None => {
            return bad_request(
                "Invalid file metadata",
                "source file is missing a valid s3_id",
            );
        }
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    let request = match apply_sse_to_copy(
        client
            .copy_object()
            .bucket(&bucket)
            .copy_source(s3_copy_source(&target.bucket, &source.storage_key, None))
            .key(&storage_key),
        &body.encryption,
    ) {
        Ok(request) => request,
        Err(resp) => return resp,
    };
    if let Err(err) = request.send().await {
        return internal_error("Failed to copy managed file object", err.to_string());
    }
    let copied = match athena_s3::upsert_managed_file_for_upload(
        &pool,
        &athena_s3::ManagedFileUpsertInput {
            file_id: None,
            s3_id: source_s3_id,
            bucket: bucket.clone(),
            storage_key,
            organization_id: actor.organization_id.clone(),
            uploaded_by_user_id: Some(actor.user_id.clone()),
            name: body.file_name.clone().or_else(|| source.file_name.clone()),
            original_name: source.original_name.clone(),
            mime_type: source.mime_type.clone(),
            size_bytes: source.size_bytes,
            resource_id: source.resource_id.clone(),
            is_public: source.is_public,
            visibility: body
                .visibility
                .clone()
                .or_else(|| Some(source.visibility.clone())),
            metadata: body
                .metadata
                .clone()
                .unwrap_or_else(|| source.metadata.clone()),
        },
    )
    .await
    {
        // The object already exists at the destination, so mark the new record
        // as uploaded straight away.
        Ok(file) => match athena_s3::update_file_status(
            &pool,
            &file.id,
            "uploaded",
            source.size_bytes,
            source.mime_type.as_deref(),
            source.checksum_sha256.as_deref(),
            None,
        )
        .await
        {
            Ok(updated) => updated,
            Err(err) => return map_store_error(err, "Failed to persist copied file metadata"),
        },
        Err(err) => return map_store_error(err, "Failed to persist copied file metadata"),
    };
    api_success("File copied", FileMutationResponse { file: copied })
}
/// Deletes several managed files: for each id, deletes the backing object and
/// marks the file record as deleted.
///
/// An empty `file_ids` array is rejected. Each file requires `Delete` permission.
/// Processing stops at the first failure and that error response is returned;
/// files handled before the failure stay deleted. On success the response lists
/// every deleted record.
async fn delete_many_files(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<FileDeleteManyRequest>,
) -> HttpResponse {
    if body.file_ids.is_empty() {
        return bad_request("Invalid file_ids", "file_ids array must not be empty");
    }
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    // The access context does not change between files, so build it once.
    let access_context = actor.access_context();
    let mut files = Vec::with_capacity(body.file_ids.len());
    for file_id in &body.file_ids {
        let file = match athena_s3::authorize_file_access(
            &pool,
            file_id,
            &access_context,
            athena_s3::FilePermissionKind::Delete,
        )
        .await
        {
            Ok(file) => file,
            Err(err) => return map_store_error(err, "Failed to authorize file delete"),
        };
        // The storage backend is resolved per file, so the client is built per file too.
        let target = match file_target(&pool, &file).await {
            Ok(target) => target,
            Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
        };
        let client = build_s3_client_with_session_token(
            &target.endpoint,
            &target.region,
            &target.bucket,
            &target.access_key_id,
            &target.secret_key,
            target.session_token.as_deref(),
        )
        .await;
        if let Err(err) = client
            .delete_object()
            .bucket(&target.bucket)
            .key(&file.storage_key)
            .send()
            .await
        {
            return internal_error("Failed to delete file", err.to_string());
        }
        let deleted = match athena_s3::mark_file_deleted(&pool, &file.id).await {
            Ok(file) => file,
            Err(err) => return map_store_error(err, "Failed to mark file deleted"),
        };
        invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
        files.push(deleted);
    }
    let count = files.len();
    api_success("Files deleted", FileMutationManyResponse { count, files })
}
async fn update_many_files(
req: HttpRequest,
state: Data<AppState>,
body: Json<FileUpdateManyRequest>,
) -> HttpResponse {
if body.file_ids.is_empty() {
return bad_request("Invalid file_ids", "file_ids array must not be empty");
}
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let has_patch = update_many_has_patch_fields(&body);
let requested_storage_key = match body.storage_key.as_deref() {
Some(value) => Some(value),
None => None,
};
if requested_storage_key.is_none() && !has_patch {
return bad_request(
"Invalid update",
"provide storage_key or at least one mutable metadata field",
);
}
let multi_prefix = if body.file_ids.len() > 1 {
match requested_storage_key {
Some(value) => Some(match normalize_prefix(value) {
Ok(value) => value,
Err(resp) => return resp,
}),
None => None,
}
} else {
None
};
let mut files = Vec::new();
for file_id in &body.file_ids {
let file = match athena_s3::authorize_file_access(
&pool,
file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Write,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file update"),
};
let mut updated = file.clone();
if let Some(raw_storage_key) = requested_storage_key {
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let new_storage_key = if let Some(prefix) = multi_prefix.as_ref() {
format!("{}{}", prefix, preferred_proxy_filename(&file))
} else {
match normalize_storage_key(raw_storage_key) {
Ok(value) => value,
Err(resp) => return resp,
}
};
let destination_bucket_override = normalize_bucket(body.bucket.as_deref());
let destination_bucket = destination_bucket_override
.clone()
.unwrap_or_else(|| target.bucket.clone());
let should_move =
new_storage_key != file.storage_key || destination_bucket != target.bucket;
if should_move {
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
if let Err(err) = client
.copy_object()
.bucket(&destination_bucket)
.copy_source(s3_copy_source(&target.bucket, &file.storage_key, None))
.key(&new_storage_key)
.send()
.await
{
return internal_error("Failed to move file", err.to_string());
}
if let Err(err) = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await
{
return internal_error("Failed to finalize file move", err.to_string());
}
}
updated = match athena_s3::update_file_location(
&pool,
&file.id,
&new_storage_key,
destination_bucket_override.as_deref(),
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to update file metadata"),
};
}
if has_patch {
let patch = patch_input_from_update_many(&body);
updated = match athena_s3::patch_managed_file(&pool, &updated.id, &patch).await {
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to update file metadata"),
};
}
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
files.push(updated);
}
let count = files.len();
api_success("Files updated", FileMutationManyResponse { count, files })
}
async fn set_many_file_visibility(
req: HttpRequest,
state: Data<AppState>,
body: Json<FileVisibilityManyRequest>,
) -> HttpResponse {
if body.file_ids.is_empty() {
return bad_request("Invalid file_ids", "file_ids array must not be empty");
}
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let visibility = match resolve_requested_visibility_from_many(&body) {
Ok(visibility) => visibility,
Err(resp) => return resp,
};
let mut files = Vec::new();
for file_id in &body.file_ids {
let file = match athena_s3::authorize_file_access(
&pool,
file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Share,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file visibility update"),
};
let updated = match athena_s3::set_file_visibility(&pool, &file.id, &visibility).await {
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to update file visibility"),
};
files.push(updated);
}
let count = files.len();
api_success(
"File visibility updated",
FileMutationManyResponse { count, files },
)
}
/// Lists the permission grants recorded for a managed file.
///
/// The caller needs `Share` permission on the file named in the body.
async fn list_permissions(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionListRequest>,
) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn run(
        req: &HttpRequest,
        app: &AppState,
        file_id: &str,
    ) -> Result<HttpResponse, HttpResponse> {
        let actor = required_actor(req, app).await?;
        let pool = required_client_pool(req, app).await?;

        // Only the authorization outcome matters here; the file record is not used.
        athena_s3::authorize_file_access(
            &pool,
            file_id,
            &actor.access_context(),
            athena_s3::FilePermissionKind::Share,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize permission listing"))?;

        let permissions = athena_s3::list_file_permissions(&pool, file_id)
            .await
            .map_err(|err| map_store_error(err, "Failed to list file permissions"))?;

        let count = permissions.len();
        Ok(api_success(
            "File permissions loaded",
            PermissionListResponse { count, permissions },
        ))
    }

    match run(&req, state.get_ref(), &body.file_id).await {
        Ok(resp) | Err(resp) => resp,
    }
}
/// Grants a permission on a managed file to a principal.
///
/// Granting `Owner` requires the caller to hold `Owner` on the file; granting any
/// other permission requires `Share`. The grant records the caller as
/// `granted_by_user_id`, and metadata defaults to an empty JSON object.
async fn grant_permission(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionGrantRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };

    let Some(permission) = athena_s3::FilePermissionKind::parse(&body.permission) else {
        return bad_request("Invalid permission", "unsupported permission value");
    };

    // Handing out ownership is reserved for owners; everything else needs Share.
    let granting_ownership = permission == athena_s3::FilePermissionKind::Owner;
    let needed_permission = if granting_ownership {
        athena_s3::FilePermissionKind::Owner
    } else {
        athena_s3::FilePermissionKind::Share
    };

    let authorization = athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &actor.access_context(),
        needed_permission,
    )
    .await;
    if let Err(err) = authorization {
        return map_store_error(err, "Failed to authorize permission grant");
    }

    let grant = athena_s3::FilePermissionGrantInput {
        principal_type: body.principal_type.clone(),
        principal_id: body.principal_id.clone(),
        permission,
        granted_by_user_id: Some(actor.user_id.clone()),
        expires_at: body.expires_at,
        metadata: body.metadata.clone().unwrap_or_else(|| json!({})),
    };
    match athena_s3::grant_file_permission(&pool, &body.file_id, &grant).await {
        Ok(_) => api_success("File permission granted", json!({ "granted": true })),
        Err(err) => map_store_error(err, "Failed to grant file permission"),
    }
}
/// Revokes a permission on a managed file from a principal.
///
/// Revoking `Owner` requires the caller to hold `Owner` on the file; revoking any
/// other permission requires `Share`. The response's `revoked` field carries the
/// value returned by `athena_s3::revoke_file_permission`.
async fn revoke_permission(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionRevokeRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };

    let Some(permission) = athena_s3::FilePermissionKind::parse(&body.permission) else {
        return bad_request("Invalid permission", "unsupported permission value");
    };

    // Taking ownership away is reserved for owners; everything else needs Share.
    let revoking_ownership = permission == athena_s3::FilePermissionKind::Owner;
    let needed_permission = if revoking_ownership {
        athena_s3::FilePermissionKind::Owner
    } else {
        athena_s3::FilePermissionKind::Share
    };

    let authorization = athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &actor.access_context(),
        needed_permission,
    )
    .await;
    if let Err(err) = authorization {
        return map_store_error(err, "Failed to authorize permission revoke");
    }

    let revocation = athena_s3::revoke_file_permission(
        &pool,
        &body.file_id,
        &body.principal_type,
        &body.principal_id,
        permission,
    )
    .await;
    match revocation {
        Ok(revoked) => api_success("File permission revoked", json!({ "revoked": revoked })),
        Err(err) => map_store_error(err, "Failed to revoke file permission"),
    }
}
/// Reports whether the calling actor holds a given permission on a managed file.
///
/// The permission name from the body must parse as a `FilePermissionKind`; the
/// response echoes the parsed permission's canonical string and the `allowed` flag.
async fn check_permission(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionCheckRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };

    let Some(permission) = athena_s3::FilePermissionKind::parse(&body.permission) else {
        return bad_request("Invalid permission", "unsupported permission value");
    };

    let verdict = athena_s3::check_file_permission(
        &pool,
        &body.file_id,
        &actor.access_context(),
        permission,
    )
    .await;
    match verdict {
        Ok(allowed) => api_success(
            "File permission checked",
            PermissionCheckResponse {
                allowed,
                permission: permission.as_str().to_string(),
            },
        ),
        Err(err) => map_store_error(err, "Failed to check file permission"),
    }
}
/// Lists storage audit events, optionally restricted to one file.
///
/// * With a `file_id`, the caller needs `Read` permission on that file.
/// * Without a `file_id`, the caller must be an admin (`actor.is_admin`).
/// * Non-admin callers only get events filtered to their own user id.
///
/// `limit` defaults to 50 and `offset` to 0.
async fn list_audit_events(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<AuditListRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(resp) => return resp,
        Ok(found) => found,
    };

    match body.file_id.as_deref() {
        Some(file_id) => {
            let authorization = athena_s3::authorize_file_access(
                &pool,
                file_id,
                &actor.access_context(),
                athena_s3::FilePermissionKind::Read,
            )
            .await;
            if let Err(err) = authorization {
                return map_store_error(err, "Failed to authorize audit lookup");
            }
        }
        None if !actor.is_admin => {
            return forbidden(
                "Forbidden",
                "audit listing without a file_id requires storage-admin privileges",
            );
        }
        None => {}
    }

    // Admins see every actor's events; everyone else is limited to their own.
    let actor_filter = if actor.is_admin {
        None
    } else {
        Some(actor.user_id.clone())
    };
    let filter = athena_s3::StorageAuditListInput {
        file_id: body.file_id.clone(),
        actor_user_id: actor_filter,
        limit: body.limit.unwrap_or(50),
        offset: body.offset.unwrap_or(0),
    };

    match athena_s3::list_storage_audit_events(&pool, &filter).await {
        Ok(events) => {
            let count = events.len();
            api_success(
                "Storage audit events loaded",
                AuditListResponse { count, events },
            )
        }
        Err(err) => map_store_error(err, "Failed to list storage audit events"),
    }
}
/// Builds the `501 Not Implemented` response used when a managed-storage feature
/// is not available on this runtime.
///
/// The JSON body names the `feature` and `route`, summarizes the affected `file`
/// (id, bucket, storage key, s3 id, status), and passes through the optional
/// `provider` and the caller-supplied `extra` context.
fn managed_storage_feature_unavailable(
    route: &str,
    feature: &str,
    file: &athena_s3::ManagedFileRecord,
    provider: Option<&str>,
    extra: Value,
) -> HttpResponse {
    let detail = format!("{feature} is not yet implemented for Athena-managed storage");
    let file_summary = json!({
        "id": file.id,
        "bucket": file.bucket,
        "storage_key": file.storage_key,
        "s3_id": file.s3_id,
        "status": file.status,
    });
    let payload = json!({
        "status": "error",
        "message": "Managed storage feature is not available on this Athena runtime",
        "error": detail,
        "code": "STORAGE_FEATURE_UNAVAILABLE",
        "feature": feature,
        "route": route,
        "file": file_summary,
        "provider": provider,
        "context": extra,
    });
    HttpResponse::NotImplemented().json(payload)
}
async fn list_file_versions(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let file = match athena_s3::authorize_file_access(
&pool,
&path,
&actor.access_context(),
athena_s3::FilePermissionKind::Read,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file version listing"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
match client
.list_object_versions()
.bucket(&target.bucket)
.prefix(&file.storage_key)
.send()
.await
{
Ok(output) => api_success(
"File versions loaded",
json!({
"file_id": file.id,
"bucket": target.bucket,
"storage_key": file.storage_key,
"versions": output.versions().iter().map(object_version_to_json).collect::<Vec<_>>(),
"delete_markers": output.delete_markers().iter().map(delete_marker_to_json).collect::<Vec<_>>(),
"is_truncated": output.is_truncated().unwrap_or(false),
"key_marker": output.key_marker(),
"version_id_marker": output.version_id_marker(),
"next_key_marker": output.next_key_marker(),
"next_version_id_marker": output.next_version_id_marker(),
}),
),
Err(err) => internal_error("Failed to list file versions", err.to_string()),
}
}
/// Restores a specific object version of a managed file by copying that version
/// onto the file's own key.
///
/// The caller needs `Owner` permission on the file. On success the cached entry
/// for the file is invalidated, a best-effort action log is written, and the
/// version ids reported by the copy are returned.
async fn restore_file_version(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<FileVersionPath>,
) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn run(
        req: &HttpRequest,
        app: &AppState,
        version_path: FileVersionPath,
    ) -> Result<HttpResponse, HttpResponse> {
        let actor = required_actor(req, app).await?;
        let pool = required_client_pool(req, app).await?;

        let file = athena_s3::authorize_file_access(
            &pool,
            &version_path.file_id,
            &actor.access_context(),
            athena_s3::FilePermissionKind::Owner,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file version restore"))?;

        let target = file_target(&pool, &file)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;

        let client = build_s3_client_with_session_token(
            &target.endpoint,
            &target.region,
            &target.bucket,
            &target.access_key_id,
            &target.secret_key,
            target.session_token.as_deref(),
        )
        .await;

        // Source and destination are the same key; only the source names a version.
        let versioned_source = s3_copy_source(
            &target.bucket,
            &file.storage_key,
            Some(&version_path.version_id),
        );
        let output = client
            .copy_object()
            .bucket(&target.bucket)
            .copy_source(versioned_source)
            .key(&file.storage_key)
            .send()
            .await
            .map_err(|err| internal_error("Failed to restore file version", err.to_string()))?;

        invalidate_cached_entry(app, &pool, &file.id).await;
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/versions/{version_id}/restore",
            "storage.file.version.restore",
            athena_s3::ManagedFileActionStatus::Success,
            "File version restored".to_string(),
            json!({ "version_id": version_path.version_id }),
            json!({
                "copy_source_version_id": output.copy_source_version_id(),
                "version_id": output.version_id(),
            }),
        )
        .await;

        Ok(api_success(
            "File version restored",
            json!({
                "file_id": file.id,
                "bucket": target.bucket,
                "storage_key": file.storage_key,
                "version_id": version_path.version_id,
                "restored_version_id": output.version_id(),
                "copy_source_version_id": output.copy_source_version_id(),
            }),
        ))
    }

    match run(&req, state.get_ref(), path.into_inner()).await {
        Ok(resp) | Err(resp) => resp,
    }
}
/// Deletes one specific object version of a managed file.
///
/// The caller needs `Owner` permission on the file. On success a best-effort
/// action log is written. The response reports the version id returned by the
/// delete call, falling back to the requested id, plus the delete-marker flag.
async fn delete_file_version(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<FileVersionPath>,
) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn run(
        req: &HttpRequest,
        app: &AppState,
        version_path: FileVersionPath,
    ) -> Result<HttpResponse, HttpResponse> {
        let actor = required_actor(req, app).await?;
        let pool = required_client_pool(req, app).await?;

        let file = athena_s3::authorize_file_access(
            &pool,
            &version_path.file_id,
            &actor.access_context(),
            athena_s3::FilePermissionKind::Owner,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file version deletion"))?;

        let target = file_target(&pool, &file)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;

        let client = build_s3_client_with_session_token(
            &target.endpoint,
            &target.region,
            &target.bucket,
            &target.access_key_id,
            &target.secret_key,
            target.session_token.as_deref(),
        )
        .await;

        let output = client
            .delete_object()
            .bucket(&target.bucket)
            .key(&file.storage_key)
            .version_id(&version_path.version_id)
            .send()
            .await
            .map_err(|err| internal_error("Failed to delete file version", err.to_string()))?;

        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/versions/{version_id}",
            "storage.file.version.delete",
            athena_s3::ManagedFileActionStatus::Success,
            "File version deleted".to_string(),
            json!({ "version_id": version_path.version_id }),
            json!({ "delete_marker": output.delete_marker(), "version_id": output.version_id() }),
        )
        .await;

        Ok(api_success(
            "File version deleted",
            json!({
                "file_id": file.id,
                "bucket": target.bucket,
                "storage_key": file.storage_key,
                "version_id": output.version_id().unwrap_or(&version_path.version_id),
                "delete_marker": output.delete_marker(),
            }),
        ))
    }

    match run(&req, state.get_ref(), path.into_inner()).await {
        Ok(resp) | Err(resp) => resp,
    }
}
/// Loads the object-lock retention settings of a managed file.
///
/// The caller needs `Owner` permission on the file. A non-blank `version_id`
/// query value (after trimming) narrows the lookup to that object version; the
/// response echoes the `version_id` exactly as it was supplied.
async fn get_file_retention(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<RetentionRequest>,
) -> HttpResponse {
    /// Fallible core of the handler; `Ok` and `Err` both carry the response to send.
    async fn run(
        req: &HttpRequest,
        app: &AppState,
        file_id: &str,
        query: &RetentionRequest,
    ) -> Result<HttpResponse, HttpResponse> {
        let actor = required_actor(req, app).await?;
        let pool = required_client_pool(req, app).await?;

        let file = athena_s3::authorize_file_access(
            &pool,
            file_id,
            &actor.access_context(),
            athena_s3::FilePermissionKind::Owner,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize managed retention lookup"))?;

        let target = file_target(&pool, &file)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;

        let client = build_s3_client_with_session_token(
            &target.endpoint,
            &target.region,
            &target.bucket,
            &target.access_key_id,
            &target.secret_key,
            target.session_token.as_deref(),
        )
        .await;

        let mut request = client
            .get_object_retention()
            .bucket(&target.bucket)
            .key(&file.storage_key);
        // Blank or whitespace-only version ids are treated as "not specified".
        let wanted_version = query
            .version_id
            .as_deref()
            .map(str::trim)
            .filter(|value| !value.is_empty());
        if let Some(version_id) = wanted_version {
            request = request.version_id(version_id);
        }

        let output = request
            .send()
            .await
            .map_err(|err| internal_error("Failed to load file retention", err.to_string()))?;

        Ok(api_success(
            "File retention loaded",
            json!({
                "file_id": file.id,
                "bucket": target.bucket,
                "storage_key": file.storage_key,
                "version_id": query.version_id.clone(),
                "retention": retention_to_json(output.retention()),
            }),
        ))
    }

    match run(&req, state.get_ref(), path.as_str(), &query).await {
        Ok(resp) | Err(resp) => resp,
    }
}
/// Handler that applies an Object Lock retention to a managed file.
///
/// The caller is authorized with `FilePermissionKind::Owner`. The body must carry a valid
/// retention mode and either `retain_until` or `retain_until_date`; otherwise a bad-request
/// response is returned before the storage backend is contacted. On success an action-log
/// entry is written (best effort) and the applied settings are echoed back.
async fn set_file_retention(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<RetentionRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let actor = match required_actor(&req, app_state).await {
        Ok(value) => value,
        Err(response) => return response,
    };
    let pool = match required_client_pool(&req, app_state).await {
        Ok(value) => value,
        Err(response) => return response,
    };
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &path,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Owner,
    )
    .await;
    let file = match authorization {
        Ok(record) => record,
        Err(err) => return map_store_error(err, "Failed to authorize managed retention update"),
    };

    // Snapshot the request values that are both sent to storage and echoed back.
    let requested_mode = body.mode.clone();
    let requested_version_id = body.version_id.clone();
    let requested_bypass_governance = body.bypass_governance;
    let mode = match parse_retention_mode(requested_mode.as_deref()) {
        Ok(parsed) => parsed,
        Err(response) => return response,
    };
    let Some(retain_until) = body.retain_until.or(body.retain_until_date) else {
        return bad_request(
            "Invalid retention",
            "retain_until or retain_until_date is required",
        );
    };

    let target = match file_target(&pool, &file).await {
        Ok(resolved) => resolved,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;

    let lock = ObjectLockRetention::builder()
        .mode(mode)
        .retain_until_date(aws_smithy_types::DateTime::from_secs(
            retain_until.timestamp(),
        ))
        .build();
    let mut update = client
        .put_object_retention()
        .bucket(&target.bucket)
        .key(&file.storage_key)
        .retention(lock);
    let version_scope = requested_version_id
        .as_deref()
        .map(str::trim)
        .filter(|candidate| !candidate.is_empty());
    if let Some(version_id) = version_scope {
        update = update.version_id(version_id);
    }
    if let Some(bypass) = requested_bypass_governance {
        update = update.bypass_governance_retention(bypass);
    }

    if let Err(err) = update.send().await {
        return internal_error("Failed to update file retention", err.to_string());
    }

    best_effort_action_log(
        &pool,
        &actor,
        &file.id,
        "/storage/files/{file_id}/retention",
        "storage.file.retention.set",
        athena_s3::ManagedFileActionStatus::Success,
        "File retention updated".to_string(),
        json!({
            "mode": requested_mode,
            "retain_until": retain_until,
            "version_id": requested_version_id,
            "bypass_governance": requested_bypass_governance,
        }),
        json!({}),
    )
    .await;
    api_success(
        "File retention updated",
        json!({
            "file_id": file.id,
            "bucket": target.bucket,
            "storage_key": file.storage_key,
            "version_id": requested_version_id,
            "retention": {
                "mode": requested_mode,
                "retain_until_date": retain_until.to_rfc3339(),
            },
        }),
    )
}
/// Handler that starts a multipart upload for an existing managed file.
///
/// The caller is authorized with `FilePermissionKind::Write` on `body.file_id`. A non-blank
/// content type and the requested server-side encryption are applied to the request; an
/// invalid encryption setting short-circuits with the response from `apply_sse_to_multipart`.
/// The response carries the upload id reported by the backend (empty when none is returned).
async fn create_multipart_upload(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<MultipartCreateRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let actor = match required_actor(&req, app_state).await {
        Ok(value) => value,
        Err(response) => return response,
    };
    let pool = match required_client_pool(&req, app_state).await {
        Ok(value) => value,
        Err(response) => return response,
    };
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await;
    let file = match authorization {
        Ok(record) => record,
        Err(err) => return map_store_error(err, "Failed to authorize multipart upload"),
    };
    let target = match file_target(&pool, &file).await {
        Ok(resolved) => resolved,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;

    let mut upload = client
        .create_multipart_upload()
        .bucket(&target.bucket)
        .key(&file.storage_key);
    if let Some(content_type) = body.content_type.as_deref().and_then(normalize_non_empty) {
        upload = upload.content_type(content_type);
    }
    let upload = match apply_sse_to_multipart(upload, &body.encryption) {
        Ok(configured) => configured,
        Err(response) => return response,
    };

    let output = match upload.send().await {
        Ok(output) => output,
        Err(err) => return internal_error("Failed to create multipart upload", err.to_string()),
    };
    api_success(
        "Multipart upload created",
        json!({
            "file_id": file.id,
            "bucket": target.bucket,
            "storage_key": file.storage_key,
            "upload_id": output.upload_id().unwrap_or_default(),
        }),
    )
}
async fn sign_multipart_part(
req: HttpRequest,
state: Data<AppState>,
body: Json<MultipartSignPartRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let file = match athena_s3::authorize_file_access(
&pool,
&body.file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Write,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize multipart part signing"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
let presign_config = match PresigningConfig::expires_in(Duration::from_secs(
STORAGE_URL_CACHE_SETTINGS.presign_expires_seconds,
)) {
Ok(config) => config,
Err(err) => return internal_error("Presign config error", err.to_string()),
};
match client
.upload_part()
.bucket(&target.bucket)
.key(&file.storage_key)
.upload_id(&body.upload_id)
.part_number(body.part_number)
.presigned(presign_config)
.await
{
Ok(presigned) => api_success(
"Multipart part signed",
json!({
"file_id": file.id,
"upload_id": body.upload_id,
"part_number": body.part_number,
"method": "PUT",
"url": presigned.uri().to_string(),
"headers": {},
}),
),
Err(err) => internal_error("Failed to sign multipart part", err.to_string()),
}
}
/// Handler that completes a multipart upload and marks the managed file as uploaded.
///
/// An empty `parts` array is rejected up front. The caller is authorized with
/// `FilePermissionKind::Write` on `body.file_id`. The submitted parts are sorted by part
/// number before being sent, because S3 requires the completion list in ascending
/// part-number order and clients may report parts in the order their uploads finished.
/// After the backend confirms completion the file status is updated to `"uploaded"`.
async fn complete_multipart_upload(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<MultipartCompleteRequest>,
) -> HttpResponse {
    if body.parts.is_empty() {
        return bad_request("Invalid parts", "parts array must not be empty");
    }
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let file = match athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await
    {
        Ok(file) => file,
        Err(err) => return map_store_error(err, "Failed to authorize multipart completion"),
    };
    let target = match file_target(&pool, &file).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    let mut completed_parts = body
        .parts
        .iter()
        .map(|part| {
            CompletedPart::builder()
                .part_number(part.part_number)
                .e_tag(part.etag.clone())
                .build()
        })
        .collect::<Vec<CompletedPart>>();
    // S3 rejects out-of-order part lists with InvalidPartOrder; normalise the order here.
    completed_parts.sort_by_key(|part| part.part_number());
    match client
        .complete_multipart_upload()
        .bucket(&target.bucket)
        .key(&file.storage_key)
        .upload_id(&body.upload_id)
        .multipart_upload(
            CompletedMultipartUpload::builder()
                .set_parts(Some(completed_parts))
                .build(),
        )
        .send()
        .await
    {
        Ok(_) => {
            let updated = match athena_s3::update_file_status(
                &pool,
                &file.id,
                "uploaded",
                file.size_bytes,
                file.mime_type.as_deref(),
                None,
                None,
            )
            .await
            {
                Ok(updated) => updated,
                Err(err) => return map_store_error(err, "Failed to finalize multipart upload"),
            };
            api_success(
                "Multipart upload completed",
                FileMutationResponse { file: updated },
            )
        }
        Err(err) => internal_error("Failed to complete multipart upload", err.to_string()),
    }
}
/// Handler that aborts an in-progress multipart upload for a managed file.
///
/// The caller is authorized with `FilePermissionKind::Write` on `body.file_id`; the abort
/// is then issued for `body.upload_id` against the file's storage key.
async fn abort_multipart_upload(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<MultipartAbortRequest>,
) -> HttpResponse {
    let app_state = state.get_ref();
    let actor = match required_actor(&req, app_state).await {
        Ok(value) => value,
        Err(response) => return response,
    };
    let pool = match required_client_pool(&req, app_state).await {
        Ok(value) => value,
        Err(response) => return response,
    };
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await;
    let file = match authorization {
        Ok(record) => record,
        Err(err) => return map_store_error(err, "Failed to authorize multipart abort"),
    };
    let target = match file_target(&pool, &file).await {
        Ok(resolved) => resolved,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;

    let outcome = client
        .abort_multipart_upload()
        .bucket(&target.bucket)
        .key(&file.storage_key)
        .upload_id(&body.upload_id)
        .send()
        .await;
    if let Err(err) = outcome {
        return internal_error("Failed to abort multipart upload", err.to_string());
    }
    api_success(
        "Multipart upload aborted",
        json!({ "file_id": file.id, "upload_id": body.upload_id, "aborted": true }),
    )
}
async fn list_multipart_parts(
req: HttpRequest,
state: Data<AppState>,
body: Json<MultipartListPartsRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let file = match athena_s3::authorize_file_access(
&pool,
&body.file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Write,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize multipart listing"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
match client
.list_parts()
.bucket(&target.bucket)
.key(&file.storage_key)
.upload_id(&body.upload_id)
.send()
.await
{
Ok(output) => api_success(
"Multipart parts loaded",
json!({
"file_id": file.id,
"upload_id": body.upload_id,
"parts": output.parts().iter().map(|part| json!({
"part_number": part.part_number(),
"etag": part.e_tag(),
"size": part.size(),
"last_modified": part.last_modified().map(|value| value.to_string()),
})).collect::<Vec<Value>>(),
}),
),
Err(err) => internal_error("Failed to list multipart parts", err.to_string()),
}
}
/// Handler that deletes every tracked file under a prefix of a managed storage backend.
///
/// All files returned by `list_files_for_prefix` must pass a `FilePermissionKind::Delete`
/// check before anything is removed, so the request fails without side effects when any
/// single file is not deletable by the caller.
///
/// Each file is then processed as a unit: the object is deleted from storage, the record
/// is marked deleted, cached URLs are invalidated and an action-log entry is written.
/// Keeping these steps together means a storage failure part-way through leaves only
/// not-yet-processed files untouched, instead of leaving already-removed objects tracked
/// as live with cached URLs.
///
/// NOTE(review): `body.prefix` is passed to the listing as supplied (it is not run through
/// `normalize_prefix` the way `move_folder` treats its prefixes) — confirm this is intended.
async fn delete_folder(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<FolderDeleteRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let files = match athena_s3::list_files_for_prefix(
        &pool,
        actor.organization_id.as_deref().unwrap_or(""),
        s3_id,
        &body.prefix,
    )
    .await
    {
        Ok(files) => files,
        Err(err) => return map_store_error(err, "Failed to list folder files"),
    };
    if files.is_empty() {
        return not_found(
            "Folder not found",
            "no tracked files were found for the supplied prefix",
        );
    }
    // Authorize the whole batch first so a denial never leaves a half-deleted folder.
    let access_context = actor.access_context();
    for file in &files {
        if let Err(err) = athena_s3::authorize_file_access(
            &pool,
            &file.id,
            &access_context,
            athena_s3::FilePermissionKind::Delete,
        )
        .await
        {
            return map_store_error(err, "Failed to authorize folder delete");
        }
    }
    let target = match file_target(&pool, &files[0]).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    for file in &files {
        if let Err(err) = client
            .delete_object()
            .bucket(&target.bucket)
            .key(&file.storage_key)
            .send()
            .await
        {
            return internal_error("Failed to delete folder objects", err.to_string());
        }
        // Record the deletion immediately so metadata and caches track storage state
        // even if a later file in the batch fails.
        if let Err(err) = athena_s3::mark_file_deleted(&pool, &file.id).await {
            return map_store_error(err, "Failed to update folder file state");
        }
        invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/folders/delete",
            "storage.folder.delete",
            athena_s3::ManagedFileActionStatus::Success,
            "Folder file deleted".to_string(),
            json!({ "prefix": body.prefix }),
            json!({ "storage_key": file.storage_key }),
        )
        .await;
    }
    api_success(
        "Folder deleted",
        FolderMutationResponse {
            s3_id: body.s3_id.clone(),
            prefix: body.prefix.clone(),
            processed_files: files.len(),
        },
    )
}
async fn move_folder(
req: HttpRequest,
state: Data<AppState>,
body: Json<FolderMoveRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
Ok(value) => value,
Err(resp) => return resp,
};
let from_prefix = match normalize_prefix(&body.from_prefix) {
Ok(value) => value,
Err(resp) => return resp,
};
let to_prefix = match normalize_prefix(&body.to_prefix) {
Ok(value) => value,
Err(resp) => return resp,
};
if from_prefix == to_prefix {
return bad_request(
"Invalid destination prefix",
"from_prefix and to_prefix must differ",
);
}
let files = match athena_s3::list_files_for_prefix(
&pool,
actor.organization_id.as_deref().unwrap_or(""),
s3_id,
&from_prefix,
)
.await
{
Ok(files) => files,
Err(err) => return map_store_error(err, "Failed to list folder files"),
};
if files.is_empty() {
return not_found(
"Folder not found",
"no tracked files were found for the supplied prefix",
);
}
let access_context = actor.access_context();
for file in &files {
if let Err(err) = athena_s3::authorize_file_access(
&pool,
&file.id,
&access_context,
athena_s3::FilePermissionKind::Write,
)
.await
{
return map_store_error(err, "Failed to authorize folder move");
}
}
let target = match file_target(&pool, &files[0]).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
for file in &files {
let suffix = match file.storage_key.strip_prefix(&from_prefix) {
Some(suffix) => suffix,
None => continue,
};
let new_storage_key = format!("{to_prefix}{suffix}");
let copy_source = format!("{}/{}", target.bucket, file.storage_key);
if let Err(err) = client
.copy_object()
.bucket(&target.bucket)
.copy_source(copy_source)
.key(&new_storage_key)
.send()
.await
{
return internal_error("Failed to move folder objects", err.to_string());
}
if let Err(err) = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await
{
return internal_error("Failed to finalize folder move", err.to_string());
}
if let Err(err) =
athena_s3::update_file_location(&pool, &file.id, &new_storage_key, None).await
{
return map_store_error(err, "Failed to update folder file metadata");
}
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
best_effort_action_log(
&pool,
&actor,
&file.id,
"/storage/folders/move",
"storage.folder.move",
athena_s3::ManagedFileActionStatus::Success,
"Folder file moved".to_string(),
json!({ "from_prefix": from_prefix, "to_prefix": to_prefix }),
json!({ "storage_key": new_storage_key }),
)
.await;
}
api_success(
"Folder moved",
FolderMutationResponse {
s3_id: body.s3_id.clone(),
prefix: to_prefix,
processed_files: files.len(),
},
)
}
/// Registers (or refreshes) a managed file record and returns a presigned upload URL for it.
///
/// Steps, in order: resolve the storage target from `body.file.s3_id`; pick the bucket
/// (request override, else the target's bucket); normalize the storage key; if a file is
/// already tracked at that path, require `FilePermissionKind::Write` on it; upsert the file
/// metadata; presign the `PUT`; invalidate cached URLs for the file; write a best-effort
/// action-log entry.
///
/// # Errors
/// Returns the ready-made `HttpResponse` produced by whichever step failed.
async fn create_upload_url_inner(
    pool: &PgPool,
    app_state: &AppState,
    actor: &ActorContext,
    body: &UploadUrlRequest,
) -> Result<UploadUrlResponse, HttpResponse> {
    let descriptor = &body.file;
    let s3_id = parse_uuid(&descriptor.s3_id, "s3_id")?;
    let target = match athena_s3::load_managed_storage_target(pool, s3_id).await {
        Ok(resolved) => resolved,
        Err(err) => return Err(map_store_error(err, "Failed to resolve storage backend")),
    };
    let bucket = match normalize_bucket(descriptor.bucket.as_deref()) {
        Some(requested) => requested,
        None => target.bucket.clone(),
    };
    ensure_bucket_present(&bucket)?;
    let storage_key = normalize_storage_key(&descriptor.storage_key)?;

    // Overwriting an already-tracked object requires write access to that record.
    let already_tracked =
        athena_s3::find_existing_file_by_path(pool, s3_id, &bucket, &storage_key)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve existing file"))?;
    if let Some(existing) = already_tracked {
        let access = actor.access_context();
        athena_s3::authorize_file_access(
            pool,
            &existing.id,
            &access,
            athena_s3::FilePermissionKind::Write,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize existing file"))?;
    }

    let upsert_input = athena_s3::ManagedFileUpsertInput {
        file_id: descriptor.file_id.clone(),
        s3_id,
        bucket: bucket.clone(),
        storage_key: storage_key.clone(),
        organization_id: actor.organization_id.clone(),
        uploaded_by_user_id: Some(actor.user_id.clone()),
        name: descriptor.name.clone(),
        original_name: descriptor.original_name.clone(),
        // The explicit upload content type wins over the descriptor's mime type.
        mime_type: body
            .content_type
            .clone()
            .or_else(|| descriptor.mime_type.clone()),
        size_bytes: descriptor.size_bytes,
        resource_id: descriptor.resource_id.clone(),
        is_public: descriptor.public.unwrap_or(false),
        visibility: descriptor.visibility.clone(),
        metadata: descriptor.metadata.clone().unwrap_or_else(|| json!({})),
    };
    let file = match athena_s3::upsert_managed_file_for_upload(pool, &upsert_input).await {
        Ok(record) => record,
        Err(err) => return Err(map_store_error(err, "Failed to upsert file metadata")),
    };

    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    let upload = generate_presigned_put_response(
        &client,
        &file,
        &bucket,
        body.content_type.as_deref(),
        &body.encryption,
    )
    .await?;

    invalidate_cached_entry(app_state, pool, &file.id).await;
    best_effort_action_log(
        pool,
        actor,
        &file.id,
        "/storage/files/upload-url",
        "storage.file.upload_url",
        athena_s3::ManagedFileActionStatus::Success,
        "Upload URL generated".to_string(),
        json!({
            "storage_key": file.storage_key,
            "bucket": file.bucket,
        }),
        json!({
            "upload": upload,
        }),
    )
    .await;
    Ok(UploadUrlResponse { file, upload })
}
async fn generate_presigned_get_entry(
client: &S3Client,
file: &athena_s3::ManagedFileRecord,
purpose: &str,
bucket: &str,
) -> Result<athena_s3::PresignedUrlCacheRecord, HttpResponse> {
let settings = &*STORAGE_URL_CACHE_SETTINGS;
let presign_config =
PresigningConfig::expires_in(Duration::from_secs(settings.presign_expires_seconds))
.map_err(|err| internal_error("Presign config error", err.to_string()))?;
let presigned = client
.get_object()
.bucket(bucket)
.key(&file.storage_key)
.presigned(presign_config)
.await
.map_err(|err| internal_error("Failed to generate presigned URL", err.to_string()))?;
let expires_at =
Utc::now() + chrono::Duration::seconds(settings.presign_expires_seconds as i64);
Ok(athena_s3::PresignedUrlCacheRecord {
file_id: file.id.clone(),
purpose: purpose.to_string(),
bucket: bucket.to_string(),
storage_key: file.storage_key.clone(),
url: presigned.uri().to_string(),
expires_at,
expires_at_epoch_seconds: expires_at.timestamp(),
created_at: Utc::now(),
metadata: json!({
"source": "storage.files.url",
}),
})
}
async fn generate_presigned_put_response(
client: &S3Client,
file: &athena_s3::ManagedFileRecord,
bucket: &str,
content_type: Option<&str>,
encryption: &StorageServerSideEncryptionRequest,
) -> Result<PresignedFileUrlResponse, HttpResponse> {
let settings = &*STORAGE_URL_CACHE_SETTINGS;
let presign_config =
PresigningConfig::expires_in(Duration::from_secs(settings.presign_expires_seconds))
.map_err(|err| internal_error("Presign config error", err.to_string()))?;
let mut request = client.put_object().bucket(bucket).key(&file.storage_key);
if let Some(content_type) = content_type.and_then(normalize_non_empty) {
request = request.content_type(content_type);
}
request = apply_sse_to_put(request, encryption)?;
let presigned = request
.presigned(presign_config)
.await
.map_err(|err| internal_error("Failed to generate upload URL", err.to_string()))?;
let expires_at =
Utc::now() + chrono::Duration::seconds(settings.presign_expires_seconds as i64);
Ok(PresignedFileUrlResponse {
file_id: file.id.clone(),
bucket: bucket.to_string(),
storage_key: file.storage_key.clone(),
purpose: "upload".to_string(),
url: presigned.uri().to_string(),
headers: presigned_headers_to_map(presigned.headers()),
expires_at: expires_at.to_rfc3339(),
expires_at_epoch_seconds: expires_at.timestamp(),
expires_in: settings.presign_expires_seconds,
cache_hit: false,
cache_layer: "generated".to_string(),
})
}
/// Looks up a still-usable presigned URL for `file`/`purpose` in the cache layers.
///
/// The in-process cache is consulted first (reported as layer `"moka"`), then each layer
/// yielded by `cache_read_attempts`. A hit from a remote layer is copied into the
/// in-process cache before being returned. Entries that fail `cache_entry_is_usable` are
/// ignored. Returns `None` when no layer has a usable entry.
async fn load_cached_url_after_auth(
    app_state: &AppState,
    pool: &PgPool,
    file: &athena_s3::ManagedFileRecord,
    purpose: &str,
) -> Option<PresignedFileUrlResponse> {
    let settings = &*STORAGE_URL_CACHE_SETTINGS;
    let cache_key = local_cache_key(&file.id, purpose);

    let local_hit = app_state
        .storage_file_url_cache
        .get(&cache_key)
        .await
        .filter(|candidate| cache_entry_is_usable(candidate, file, settings));
    if let Some(entry) = local_hit {
        return Some(presigned_response(&entry, true, "moka"));
    }

    for layer in cache_read_attempts(settings) {
        let candidate = match layer {
            CacheLayer::Redis => load_cached_entry_from_redis(file, purpose).await,
            CacheLayer::Postgres => load_cached_entry_from_postgres(pool, &file.id, purpose).await,
        };
        let Some(entry) = candidate else {
            continue;
        };
        if !cache_entry_is_usable(&entry, file, settings) {
            continue;
        }
        // Promote the remote hit so the next lookup is served from memory.
        app_state
            .storage_file_url_cache
            .insert(cache_key.clone(), entry.clone())
            .await;
        return Some(presigned_response(&entry, true, layer.as_str()));
    }
    None
}
async fn write_cached_entry(
app_state: &AppState,
pool: &PgPool,
entry: &athena_s3::PresignedUrlCacheRecord,
) {
let settings = &*STORAGE_URL_CACHE_SETTINGS;
let cache_key = local_cache_key(&entry.file_id, &entry.purpose);
app_state
.storage_file_url_cache
.insert(cache_key, entry.clone())
.await;
for layer in cache_write_targets(settings) {
match layer {
CacheLayer::Postgres => {
let _ = athena_s3::upsert_cached_presigned_url(pool, entry).await;
}
CacheLayer::Redis => {
write_cached_entry_to_redis(entry, settings).await;
}
}
}
}
async fn invalidate_cached_entry(app_state: &AppState, pool: &PgPool, file_id: &str) {
for purpose in ["read", "download", "stream"] {
app_state
.storage_file_url_cache
.invalidate(&local_cache_key(file_id, purpose))
.await;
let _ = athena_s3::delete_cached_presigned_url(pool, file_id, Some(purpose)).await;
delete_cached_entry_from_redis(file_id, purpose).await;
}
}
/// Reads a cached presigned URL record from Postgres.
///
/// A lookup error is logged as a warning and treated as a cache miss (`None`).
async fn load_cached_entry_from_postgres(
    pool: &PgPool,
    file_id: &str,
    purpose: &str,
) -> Option<athena_s3::PresignedUrlCacheRecord> {
    athena_s3::load_cached_presigned_url(pool, file_id, purpose)
        .await
        .unwrap_or_else(|err| {
            warn!(file_id = %file_id, purpose = %purpose, error = %err, "failed to load cached presigned url from postgres");
            None
        })
}
/// Reads a cached presigned URL record from Redis.
///
/// Returns `None` without touching Redis while the bypass cooldown is active or when no
/// global client is configured. A `null` value counts as a successful miss. Both transport
/// errors and payloads that fail to deserialize are logged and reported through
/// `note_redis_failure_and_start_cooldown`.
async fn load_cached_entry_from_redis(
    file: &athena_s3::ManagedFileRecord,
    purpose: &str,
) -> Option<athena_s3::PresignedUrlCacheRecord> {
    if should_bypass_redis_temporarily() {
        return None;
    }
    let redis = GLOBAL_REDIS.get()?;
    let key = redis_cache_key(&file.id, purpose);

    let raw = match redis.get(&key).await {
        Ok(raw) => raw,
        Err(err) => {
            note_redis_failure_and_start_cooldown();
            warn!(file_id = %file.id, purpose = %purpose, error = %err, "failed to load cached presigned url from redis");
            return None;
        }
    };
    if raw.is_null() {
        note_redis_success();
        return None;
    }
    match serde_json::from_value::<athena_s3::PresignedUrlCacheRecord>(raw) {
        Ok(entry) => {
            note_redis_success();
            Some(entry)
        }
        Err(err) => {
            note_redis_failure_and_start_cooldown();
            warn!(file_id = %file.id, purpose = %purpose, error = %err, "failed to deserialize cached presigned url from redis");
            None
        }
    }
}
/// Writes a cached presigned URL record to Redis with a bounded TTL.
///
/// The TTL is the smaller of `settings.cache_ttl_seconds` and the seconds left until the
/// URL expires (never less than one second). Nothing is written while the bypass cooldown
/// is active or when no global client is configured. Failures are logged, not returned.
async fn write_cached_entry_to_redis(
    entry: &athena_s3::PresignedUrlCacheRecord,
    settings: &StorageUrlCacheSettings,
) {
    if should_bypass_redis_temporarily() {
        return;
    }
    let Some(redis) = GLOBAL_REDIS.get() else {
        return;
    };

    let seconds_left = entry
        .expires_at_epoch_seconds
        .saturating_sub(Utc::now().timestamp())
        .max(1) as u64;
    let ttl = settings.cache_ttl_seconds.min(seconds_left);

    let payload = match serde_json::to_value(entry) {
        Ok(serialized) => serialized,
        Err(err) => {
            warn!(file_id = %entry.file_id, purpose = %entry.purpose, error = %err, "failed to serialize cached presigned url for redis");
            return;
        }
    };

    let key = redis_cache_key(&entry.file_id, &entry.purpose);
    if let Err(err) = redis.set_with_ttl(&key, &payload, ttl).await {
        note_redis_failure_and_start_cooldown();
        warn!(file_id = %entry.file_id, purpose = %entry.purpose, error = %err, "failed to write cached presigned url to redis");
    } else {
        note_redis_success();
    }
}
/// Deletes the cached presigned URL for `file_id`/`purpose` from Redis.
///
/// Does nothing while the bypass cooldown is active or when no global client is
/// configured. A failed delete is logged and reported through
/// `note_redis_failure_and_start_cooldown`.
async fn delete_cached_entry_from_redis(file_id: &str, purpose: &str) {
    if should_bypass_redis_temporarily() {
        return;
    }
    let Some(redis) = GLOBAL_REDIS.get() else {
        return;
    };
    let key = redis_cache_key(file_id, purpose);
    if let Err(err) = redis.delete(&key).await {
        note_redis_failure_and_start_cooldown();
        warn!(file_id = %file_id, purpose = %purpose, error = %err, "failed to delete cached presigned url from redis");
    } else {
        note_redis_success();
    }
}
/// Resolves the managed storage target that backs `file`.
///
/// # Errors
/// Returns `S3CredentialStoreError::InvalidInput` when the record has no `s3_id` or the
/// value is not a UUID; otherwise forwards the result of `load_managed_storage_target`.
async fn file_target(
    pool: &PgPool,
    file: &athena_s3::ManagedFileRecord,
) -> Result<athena_s3::ManagedStorageTarget, athena_s3::S3CredentialStoreError> {
    let Some(raw_id) = file.s3_id.as_deref() else {
        return Err(athena_s3::S3CredentialStoreError::InvalidInput(
            "file is missing s3_id".to_string(),
        ));
    };
    let s3_id = match Uuid::parse_str(raw_id) {
        Ok(parsed) => parsed,
        Err(_) => {
            return Err(athena_s3::S3CredentialStoreError::InvalidInput(
                "file has invalid s3_id".to_string(),
            ));
        }
    };
    athena_s3::load_managed_storage_target(pool, s3_id).await
}
/// Authorizes a read of `file_id` and prepares everything needed to fetch the object.
///
/// The access context is resolved with `optional_access_context`, the file is authorized
/// with `FilePermissionKind::Read`, and an S3 client is built for the file's storage target.
///
/// # Errors
/// Returns the mapped store-error response when authorization or target resolution fails.
async fn resolve_authorized_file_read_context(
    pool: &PgPool,
    app_state: &AppState,
    req: &HttpRequest,
    file_id: &str,
) -> Result<AuthorizedFileReadContext, HttpResponse> {
    let access_context = optional_access_context(req, app_state).await;
    let file = match athena_s3::authorize_file_access(
        pool,
        file_id,
        &access_context,
        athena_s3::FilePermissionKind::Read,
    )
    .await
    {
        Ok(record) => record,
        Err(err) => return Err(map_store_error(err, "Failed to authorize file")),
    };
    let target = match file_target(pool, &file).await {
        Ok(resolved) => resolved,
        Err(err) => return Err(map_store_error(err, "Failed to resolve storage backend")),
    };
    let bucket = target.bucket.clone();
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    Ok(AuthorizedFileReadContext {
        file,
        client,
        bucket,
    })
}
/// Decides whether a cached presigned URL may still be handed out for `file`.
///
/// The entry must point at the file's current bucket and storage key, and must have
/// strictly more than `settings.min_remaining_seconds` left before it expires.
fn cache_entry_is_usable(
    entry: &athena_s3::PresignedUrlCacheRecord,
    file: &athena_s3::ManagedFileRecord,
    settings: &StorageUrlCacheSettings,
) -> bool {
    let same_object = entry.bucket == file.bucket && entry.storage_key == file.storage_key;
    if !same_object {
        return false;
    }
    let seconds_left = entry
        .expires_at_epoch_seconds
        .saturating_sub(Utc::now().timestamp());
    seconds_left > settings.min_remaining_seconds as i64
}
/// Converts a cache record into the API response shape.
///
/// `expires_in` is the number of seconds left until the record's expiry, clamped at zero.
/// No headers are attached to responses built from cache records.
fn presigned_response(
    entry: &athena_s3::PresignedUrlCacheRecord,
    cache_hit: bool,
    cache_layer: &str,
) -> PresignedFileUrlResponse {
    let seconds_left = entry
        .expires_at_epoch_seconds
        .saturating_sub(Utc::now().timestamp());
    let expires_in = seconds_left.max(0) as u64;
    PresignedFileUrlResponse {
        file_id: entry.file_id.clone(),
        bucket: entry.bucket.clone(),
        storage_key: entry.storage_key.clone(),
        purpose: entry.purpose.clone(),
        url: entry.url.clone(),
        headers: BTreeMap::new(),
        expires_at: entry.expires_at.to_rfc3339(),
        expires_at_epoch_seconds: entry.expires_at_epoch_seconds,
        expires_in,
        cache_hit,
        cache_layer: cache_layer.to_string(),
    }
}
/// Builds the in-process cache key for a file/purpose pair, formatted as `purpose:file_id`.
fn local_cache_key(file_id: &str, purpose: &str) -> String {
    [purpose, file_id].join(":")
}
/// Builds the Redis key for a file/purpose pair: the configured `redis_prefix` followed by
/// `purpose:file_id`.
fn redis_cache_key(file_id: &str, purpose: &str) -> String {
    let prefix = &STORAGE_URL_CACHE_SETTINGS.redis_prefix;
    format!("{prefix}{purpose}:{file_id}")
}
/// Parses `raw` as a UUID.
///
/// # Errors
/// Returns a bad-request response naming `field` when `raw` is not a valid UUID.
fn parse_uuid(raw: &str, field: &str) -> Result<Uuid, HttpResponse> {
    match Uuid::parse_str(raw) {
        Ok(parsed) => Ok(parsed),
        Err(_) => Err(bad_request(
            format!("Invalid {field}"),
            format!("{field} must be a UUID"),
        )),
    }
}
/// Normalizes a client-supplied storage key.
///
/// Surrounding whitespace and leading/trailing `/` characters are removed.
///
/// # Errors
/// Returns a bad-request response when the result is empty or contains `..` anywhere.
fn normalize_storage_key(value: &str) -> Result<String, HttpResponse> {
    let key = value.trim().trim_matches('/');
    let rejection = if key.is_empty() {
        Some("storage_key must not be empty")
    } else if key.contains("..") {
        Some("storage_key must not contain path traversal segments")
    } else {
        None
    };
    match rejection {
        Some(detail) => Err(bad_request("Invalid storage key", detail)),
        None => Ok(key.to_string()),
    }
}
/// Normalizes a folder prefix to the form `segment/.../`.
///
/// Surrounding whitespace and leading/trailing `/` characters are removed and exactly one
/// trailing `/` is appended.
///
/// # Errors
/// Returns a bad-request response when nothing is left after trimming.
fn normalize_prefix(value: &str) -> Result<String, HttpResponse> {
    match value.trim().trim_matches('/') {
        "" => Err(bad_request("Invalid prefix", "prefix must not be empty")),
        trimmed => Ok(format!("{trimmed}/")),
    }
}
/// Returns the trimmed bucket name, or `None` when it is absent or blank.
fn normalize_bucket(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Resolves the visibility requested by a set-visibility body.
///
/// An explicit `visibility` value takes precedence: it is trimmed, lower-cased and must be
/// `private`, `organization` or `public`. Without it, the `public` flag maps `true` to
/// `public` and `false` to `private`.
///
/// # Errors
/// Returns a bad-request response for an unknown visibility or when neither field is set.
fn resolve_requested_visibility(body: &SetVisibilityRequest) -> Result<String, HttpResponse> {
    match (body.visibility.as_deref(), body.public) {
        (Some(raw), _) => {
            let normalized = raw.trim().to_ascii_lowercase();
            let recognised = matches!(normalized.as_str(), "private" | "organization" | "public");
            if recognised {
                Ok(normalized)
            } else {
                Err(bad_request(
                    "Invalid visibility",
                    "visibility must be one of private, organization, or public",
                ))
            }
        }
        (None, Some(true)) => Ok("public".to_string()),
        (None, Some(false)) => Ok("private".to_string()),
        (None, None) => Err(bad_request(
            "Invalid visibility",
            "provide either visibility or public",
        )),
    }
}
/// Resolves the visibility requested by a bulk-visibility body by delegating to
/// `resolve_requested_visibility` with the same `public`/`visibility` values.
fn resolve_requested_visibility_from_many(
    body: &FileVisibilityManyRequest,
) -> Result<String, HttpResponse> {
    let single = SetVisibilityRequest {
        public: body.public,
        visibility: body.visibility.clone(),
    };
    resolve_requested_visibility(&single)
}
/// Returns the trimmed value as an owned string, or `None` when it is blank.
fn normalize_non_empty(value: &str) -> Option<String> {
    Some(value.trim())
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_owned)
}
/// Normalizes a URL purpose to lower case, accepting only `read`, `download` or `stream`.
///
/// Returns `None` when the value is absent or not one of the accepted purposes.
fn normalize_purpose(value: Option<&str>) -> Option<String> {
    const ACCEPTED: [&str; 3] = ["read", "download", "stream"];
    let lowered = value?.trim().to_ascii_lowercase();
    ACCEPTED.contains(&lowered.as_str()).then_some(lowered)
}
/// Picks the filename to advertise when proxying a file.
///
/// The first non-blank candidate wins, in this order: the original name, the record name,
/// the last `/`-separated segment of the storage key. Falls back to `"file"`.
fn preferred_proxy_filename(file: &athena_s3::ManagedFileRecord) -> String {
    let candidates = [
        file.original_name.as_deref(),
        Some(&*file.name),
        file.storage_key.rsplit('/').next(),
    ];
    candidates
        .into_iter()
        .flatten()
        .find_map(normalize_non_empty)
        .unwrap_or_else(|| "file".to_string())
}
/// Makes a filename safe to embed in a quoted header parameter.
///
/// Double quotes, backslashes and every control character (including CR and LF) are
/// replaced with `_`; the result is then trimmed. Falls back to `"file"` when nothing
/// remains.
fn sanitize_header_filename(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let needs_replacement = matches!(ch, '"' | '\\') || ch.is_control();
        cleaned.push(if needs_replacement { '_' } else { ch });
    }
    match cleaned.trim() {
        "" => "file".to_string(),
        kept => kept.to_string(),
    }
}
/// Builds the `Content-Disposition` value for a proxied file.
///
/// The disposition is `attachment` for the `download` purpose and `inline` for anything
/// else; the filename is the sanitized preferred proxy filename.
fn proxy_content_disposition(file: &athena_s3::ManagedFileRecord, purpose: &str) -> String {
    let disposition = match purpose {
        "download" => "attachment",
        _ => "inline",
    };
    let preferred = preferred_proxy_filename(file);
    let filename = sanitize_header_filename(&preferred);
    format!("{disposition}; filename=\"{filename}\"")
}
/// Computes the public URL of an object on a managed storage target.
///
/// Resolution order:
/// 1. A non-blank `public_base_url` is used as-is (minus trailing `/`) with the key appended.
/// 2. With `force_path_style`, the URL is `endpoint/bucket/key`.
/// 3. For `https://` or `http://` endpoints, the virtual-hosted form
///    `scheme://bucket.host/key` is used.
/// 4. Any other endpoint falls back to `endpoint/bucket/key`.
///
/// Trailing `/` characters on the endpoint are stripped for every form, so an endpoint
/// such as `https://s3.example.com/` no longer yields a `//` before the key in the
/// virtual-hosted form. Leading `/` characters on the key are always removed.
///
/// # Errors
/// Returns a service-unavailable response when no base URL is configured and the endpoint
/// is blank.
fn compute_public_object_url(
    target: &athena_s3::ManagedStorageTarget,
    bucket: &str,
    storage_key: &str,
) -> Result<String, HttpResponse> {
    let key = storage_key.trim_start_matches('/');
    if let Some(base_url) = target
        .public_base_url
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        let base = base_url.trim_end_matches('/');
        return Ok(format!("{base}/{key}"));
    }
    let endpoint = target.endpoint.trim();
    if endpoint.is_empty() {
        return Err(service_unavailable(
            "Storage backend unavailable",
            "catalog endpoint is required to compute a public URL",
        ));
    }
    // Strip trailing slashes once so every URL form joins segments with a single `/`.
    let endpoint = endpoint.trim_end_matches('/');
    if !target.force_path_style {
        for scheme in ["https://", "http://"] {
            if let Some(host) = endpoint.strip_prefix(scheme) {
                return Ok(format!("{scheme}{bucket}.{host}/{key}"));
            }
        }
    }
    Ok(format!("{endpoint}/{bucket}/{key}"))
}
fn build_proxy_url(req: &HttpRequest, file_id: &str, purpose: Option<&str>) -> String {
let info = req.connection_info();
let scheme = info.scheme();
let host = info.host();
let mut url = format!("{scheme}://{host}/storage/files/{file_id}/proxy");
if let Some(purpose) = purpose.and_then(normalize_non_empty) {
url.push_str("?purpose=");
url.push_str(&purpose);
}
url
}
/// Percent-encodes `value` for use inside an S3 copy-source string.
///
/// ASCII alphanumerics and `-`, `_`, `.`, `~`, `/` are copied through
/// unchanged; every other byte (including each byte of a multi-byte UTF-8
/// sequence) becomes `%XX` with uppercase hex digits.
fn encode_s3_copy_source_component(value: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(value.len());
    for &byte in value.as_bytes() {
        let keep =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~' | b'/');
        if keep {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
/// Formats the copy-source string `{bucket}/{key}` for an S3 copy, optionally
/// followed by `?versionId={version_id}`.
///
/// The key and the version id are passed through
/// `encode_s3_copy_source_component`; the bucket is inserted as given.
fn s3_copy_source(bucket: &str, key: &str, version_id: Option<&str>) -> String {
    let mut source = format!("{bucket}/{}", encode_s3_copy_source_component(key));
    if let Some(version_id) = version_id {
        source.push_str("?versionId=");
        source.push_str(&encode_s3_copy_source_component(version_id));
    }
    source
}
/// Parses an object-lock retention mode supplied by the caller.
///
/// The value is trimmed and upper-cased before being handed to
/// `ObjectLockRetentionMode::try_parse`.
///
/// # Errors
///
/// Returns a "bad request" response when the value is missing or blank, or
/// when `try_parse` rejects it.
fn parse_retention_mode(value: Option<&str>) -> Result<ObjectLockRetentionMode, HttpResponse> {
    let requested = value.map(str::trim).unwrap_or_default();
    if requested.is_empty() {
        return Err(bad_request(
            "Invalid retention",
            "mode is required and must be GOVERNANCE or COMPLIANCE",
        ));
    }
    match ObjectLockRetentionMode::try_parse(&requested.to_ascii_uppercase()) {
        Ok(mode) => Ok(mode),
        Err(_) => Err(bad_request(
            "Invalid retention",
            "mode must be GOVERNANCE or COMPLIANCE",
        )),
    }
}
/// Maps the requested server-side-encryption string to an SDK value.
///
/// The value is trimmed and matched case-insensitively (ASCII). Accepted
/// spellings:
/// - `aes256` -> `AES256`
/// - `aws:kms`, `kms` -> `aws:kms`
/// - `aws:kms:dsse`, `kms:dsse`, `dsse` -> `aws:kms:dsse`
///
/// Returns `Ok(None)` when the field is absent or blank.
///
/// # Errors
///
/// Returns a "bad request" response naming the (lower-cased) value when it is
/// not one of the accepted spellings.
fn normalize_sse_value(
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<Option<ServerSideEncryption>, HttpResponse> {
    let requested = encryption
        .server_side_encryption
        .as_deref()
        .map(str::trim)
        .unwrap_or("");
    if requested.is_empty() {
        return Ok(None);
    }

    let lowered = requested.to_ascii_lowercase();
    let canonical = if lowered == "aes256" {
        "AES256"
    } else if matches!(lowered.as_str(), "aws:kms" | "kms") {
        "aws:kms"
    } else if matches!(lowered.as_str(), "aws:kms:dsse" | "kms:dsse" | "dsse") {
        "aws:kms:dsse"
    } else {
        return Err(bad_request(
            "Invalid server-side encryption",
            format!("unsupported server_side_encryption value '{lowered}'"),
        ));
    };
    Ok(Some(ServerSideEncryption::from(canonical)))
}
/// Applies the caller's server-side-encryption settings to a `PutObject`
/// request builder.
///
/// Each setting is applied only when present: the encryption mode (via
/// `normalize_sse_value`), a non-blank trimmed KMS key id, and the bucket-key
/// flag. Settings that are absent leave the builder untouched.
///
/// # Errors
///
/// Propagates the response produced by `normalize_sse_value` for an
/// unsupported encryption mode.
fn apply_sse_to_put(
    request: PutObjectFluentBuilder,
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<PutObjectFluentBuilder, HttpResponse> {
    let mode = normalize_sse_value(encryption)?;
    let kms_key_id = encryption
        .ssekms_key_id
        .as_deref()
        .map(str::trim)
        .filter(|id| !id.is_empty());

    let request = match mode {
        Some(mode) => request.server_side_encryption(mode),
        None => request,
    };
    let request = match kms_key_id {
        Some(id) => request.ssekms_key_id(id),
        None => request,
    };
    let request = match encryption.bucket_key_enabled {
        Some(flag) => request.bucket_key_enabled(flag),
        None => request,
    };
    Ok(request)
}
/// Applies the caller's server-side-encryption settings to a `CopyObject`
/// request builder.
///
/// Each setting is applied only when present: the encryption mode (via
/// `normalize_sse_value`), a non-blank trimmed KMS key id, and the bucket-key
/// flag. Settings that are absent leave the builder untouched.
///
/// # Errors
///
/// Propagates the response produced by `normalize_sse_value` for an
/// unsupported encryption mode.
fn apply_sse_to_copy(
    request: CopyObjectFluentBuilder,
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<CopyObjectFluentBuilder, HttpResponse> {
    let mode = normalize_sse_value(encryption)?;
    let kms_key_id = encryption
        .ssekms_key_id
        .as_deref()
        .map(str::trim)
        .filter(|id| !id.is_empty());

    let request = match mode {
        Some(mode) => request.server_side_encryption(mode),
        None => request,
    };
    let request = match kms_key_id {
        Some(id) => request.ssekms_key_id(id),
        None => request,
    };
    let request = match encryption.bucket_key_enabled {
        Some(flag) => request.bucket_key_enabled(flag),
        None => request,
    };
    Ok(request)
}
/// Applies the caller's server-side-encryption settings to a
/// `CreateMultipartUpload` request builder.
///
/// Each setting is applied only when present: the encryption mode (via
/// `normalize_sse_value`), a non-blank trimmed KMS key id, and the bucket-key
/// flag. Settings that are absent leave the builder untouched.
///
/// # Errors
///
/// Propagates the response produced by `normalize_sse_value` for an
/// unsupported encryption mode.
fn apply_sse_to_multipart(
    request: CreateMultipartUploadFluentBuilder,
    encryption: &StorageServerSideEncryptionRequest,
) -> Result<CreateMultipartUploadFluentBuilder, HttpResponse> {
    let mode = normalize_sse_value(encryption)?;
    let kms_key_id = encryption
        .ssekms_key_id
        .as_deref()
        .map(str::trim)
        .filter(|id| !id.is_empty());

    let request = match mode {
        Some(mode) => request.server_side_encryption(mode),
        None => request,
    };
    let request = match kms_key_id {
        Some(id) => request.ssekms_key_id(id),
        None => request,
    };
    let request = match encryption.bucket_key_enabled {
        Some(flag) => request.bucket_key_enabled(flag),
        None => request,
    };
    Ok(request)
}
fn sse_from_request_headers(req: &HttpRequest) -> StorageServerSideEncryptionRequest {
let header_value = |name: &str| {
req.headers()
.get(name)
.and_then(|value| value.to_str().ok())
.and_then(normalize_non_empty)
};
StorageServerSideEncryptionRequest {
server_side_encryption: header_value("x-amz-server-side-encryption"),
ssekms_key_id: header_value("x-amz-server-side-encryption-aws-kms-key-id"),
bucket_key_enabled: header_value("x-amz-server-side-encryption-bucket-key-enabled")
.and_then(|value| value.parse::<bool>().ok()),
}
}
/// Copies borrowed `(name, value)` header pairs into an owned, name-sorted map.
///
/// When a name occurs more than once, the last value wins.
fn presigned_headers_to_map<'a>(
    headers: impl Iterator<Item = (&'a str, &'a str)>,
) -> BTreeMap<String, String> {
    let mut owned = BTreeMap::new();
    for (name, value) in headers {
        owned.insert(name.to_owned(), value.to_owned());
    }
    owned
}
/// Renders an AWS SDK timestamp using `AwsDateTimeFormat::DateTime`, falling
/// back to the value's `to_string()` output if formatting fails.
fn aws_datetime_to_string(value: &aws_smithy_types::DateTime) -> String {
    match value.fmt(AwsDateTimeFormat::DateTime) {
        Ok(formatted) => formatted,
        Err(_) => value.to_string(),
    }
}
fn object_version_to_json(value: &aws_sdk_s3::types::ObjectVersion) -> Value {
json!({
"key": value.key(),
"version_id": value.version_id(),
"is_latest": value.is_latest(),
"last_modified": value.last_modified().map(aws_datetime_to_string),
"etag": value.e_tag(),
"size": value.size(),
"storage_class": value.storage_class().map(|v| v.as_str()),
"checksum_algorithm": value.checksum_algorithm().iter().map(|v| v.as_str()).collect::<Vec<_>>(),
"checksum_type": value.checksum_type().map(|v| v.as_str()),
})
}
fn delete_marker_to_json(value: &aws_sdk_s3::types::DeleteMarkerEntry) -> Value {
json!({
"key": value.key(),
"version_id": value.version_id(),
"is_latest": value.is_latest(),
"last_modified": value.last_modified().map(aws_datetime_to_string),
})
}
fn retention_to_json(value: Option<&ObjectLockRetention>) -> Value {
match value {
Some(retention) => json!({
"mode": retention.mode().map(|mode| mode.as_str()),
"retain_until_date": retention.retain_until_date().map(aws_datetime_to_string),
}),
None => Value::Null,
}
}
/// Reports whether a single-file update request sets at least one patchable
/// field.
fn update_has_patch_fields(body: &UpdateFileRequest) -> bool {
    let provided = [
        body.name.is_some(),
        body.file_name.is_some(),
        body.original_name.is_some(),
        body.resource_id.is_some(),
        body.mime_type.is_some(),
        body.content_type.is_some(),
        body.size_bytes.is_some(),
        body.checksum_sha256.is_some(),
        body.visibility.is_some(),
        body.status.is_some(),
        body.metadata.is_some(),
    ];
    provided.contains(&true)
}
/// Reports whether a bulk update request sets at least one patchable field.
fn update_many_has_patch_fields(body: &FileUpdateManyRequest) -> bool {
    let provided = [
        body.name.is_some(),
        body.file_name.is_some(),
        body.original_name.is_some(),
        body.resource_id.is_some(),
        body.mime_type.is_some(),
        body.content_type.is_some(),
        body.size_bytes.is_some(),
        body.checksum_sha256.is_some(),
        body.visibility.is_some(),
        body.status.is_some(),
        body.metadata.is_some(),
    ];
    provided.contains(&true)
}
/// Converts a single-file update request into the store's patch input.
///
/// `file_name` takes precedence over its alias `name`, and `content_type` over
/// its alias `mime_type`; all other fields are copied across unchanged.
fn patch_input_from_update(body: &UpdateFileRequest) -> athena_s3::ManagedFilePatchInput {
    let file_name = body.file_name.as_ref().or(body.name.as_ref()).cloned();
    let content_type = body
        .content_type
        .as_ref()
        .or(body.mime_type.as_ref())
        .cloned();

    athena_s3::ManagedFilePatchInput {
        file_name,
        original_name: body.original_name.clone(),
        content_type,
        size_bytes: body.size_bytes,
        checksum_sha256: body.checksum_sha256.clone(),
        resource_id: body.resource_id.clone(),
        visibility: body.visibility.clone(),
        status: body.status.clone(),
        metadata: body.metadata.clone(),
    }
}
/// Converts a bulk update request into the store's patch input.
///
/// `file_name` takes precedence over its alias `name`, and `content_type` over
/// its alias `mime_type`; all other fields are copied across unchanged.
fn patch_input_from_update_many(body: &FileUpdateManyRequest) -> athena_s3::ManagedFilePatchInput {
    let file_name = body.file_name.as_ref().or(body.name.as_ref()).cloned();
    let content_type = body
        .content_type
        .as_ref()
        .or(body.mime_type.as_ref())
        .cloned();

    athena_s3::ManagedFilePatchInput {
        file_name,
        original_name: body.original_name.clone(),
        content_type,
        size_bytes: body.size_bytes,
        checksum_sha256: body.checksum_sha256.clone(),
        resource_id: body.resource_id.clone(),
        visibility: body.visibility.clone(),
        status: body.status.clone(),
        metadata: body.metadata.clone(),
    }
}
/// Checks a file's metadata against an optional JSON-object filter.
///
/// Every key in the filter must exist in the file's metadata with an equal
/// value. A filter that is absent, or is not a JSON object, matches everything.
fn metadata_matches(file: &athena_s3::ManagedFileRecord, expected: Option<&Value>) -> bool {
    let filters = match expected {
        Some(Value::Object(filters)) => filters,
        _ => return true,
    };
    for (key, wanted) in filters {
        if file.metadata.get(key) != Some(wanted) {
            return false;
        }
    }
    true
}
/// Case-insensitive substring filter.
///
/// Returns `true` when `needle` is absent or blank (no filter requested).
/// Otherwise returns whether `haystack` contains the trimmed `needle`,
/// ignoring case; a missing `haystack` never matches a non-blank needle.
///
/// Case folding uses Unicode-aware `str::to_lowercase` rather than the
/// ASCII-only variant, so non-ASCII names such as `RÉSUMÉ` and `résumé` compare
/// equal. Results for pure-ASCII input are unchanged.
fn string_contains_case_insensitive(haystack: Option<&str>, needle: Option<&str>) -> bool {
    let Some(needle) = needle.map(str::trim).filter(|value| !value.is_empty()) else {
        return true;
    };
    let Some(haystack) = haystack else {
        return false;
    };
    haystack.to_lowercase().contains(&needle.to_lowercase())
}
/// Exact-match filter.
///
/// Returns `true` when `needle` is absent or blank (no filter requested).
/// Otherwise `haystack` must be present and equal to the trimmed `needle`;
/// the comparison is case-sensitive and `haystack` itself is not trimmed.
fn optional_exact(haystack: Option<&str>, needle: Option<&str>) -> bool {
    match needle.map(str::trim) {
        None | Some("") => true,
        Some(wanted) => haystack == Some(wanted),
    }
}
/// Applies every filter of a list request to one file; all must pass.
///
/// - `metadata`: see `metadata_matches`.
/// - `name`: case-insensitive substring of the file's `file_name`, else
///   `original_name`, else `name`.
/// - `resource_id`, `status`, `visibility`, `bucket`: exact match.
/// - `content_type` (alias `mime_type` on both sides): exact match.
/// - `key_prefix`: trimmed, non-blank prefix of `storage_key`.
///
/// Filters that are absent or blank are skipped.
fn file_matches_list_filters(file: &athena_s3::ManagedFileRecord, body: &ListFilesRequest) -> bool {
    if !metadata_matches(file, body.metadata.as_ref()) {
        return false;
    }

    let display_name = file
        .file_name
        .as_deref()
        .or(file.original_name.as_deref())
        .unwrap_or(file.name.as_str());
    if !string_contains_case_insensitive(Some(display_name), body.name.as_deref()) {
        return false;
    }

    let file_content_type = file.content_type.as_deref().or(file.mime_type.as_deref());
    let wanted_content_type = body.content_type.as_deref().or(body.mime_type.as_deref());
    let exact_filters_pass = optional_exact(file.resource_id.as_deref(), body.resource_id.as_deref())
        && optional_exact(file_content_type, wanted_content_type)
        && optional_exact(Some(file.status.as_str()), body.status.as_deref())
        && optional_exact(Some(file.visibility.as_str()), body.visibility.as_deref())
        && optional_exact(Some(file.bucket.as_str()), body.bucket.as_deref());
    if !exact_filters_pass {
        return false;
    }

    let wanted_prefix = body
        .key_prefix
        .as_deref()
        .map(str::trim)
        .filter(|prefix| !prefix.is_empty());
    match wanted_prefix {
        Some(prefix) => file.storage_key.starts_with(prefix),
        None => true,
    }
}
/// Applies every filter of a search request to one file; all must pass.
///
/// - `metadata`: see `metadata_matches`.
/// - `s3_id`: trimmed, non-blank exact match against the file's `s3_id`.
/// - `prefix` (falling back to `key_prefix` when `prefix` is absent): trimmed,
///   non-blank prefix of `storage_key`.
/// - `name`: case-insensitive substring of the file's `file_name`, else
///   `original_name`, else `name`.
/// - `resource_id`, `status`, `visibility`, `bucket`: exact match.
/// - `content_type` (alias `mime_type` on both sides): exact match.
///
/// Filters that are absent or blank are skipped.
fn file_matches_search_filters(
    file: &athena_s3::ManagedFileRecord,
    body: &FileSearchRequest,
) -> bool {
    if !metadata_matches(file, body.metadata.as_ref()) {
        return false;
    }

    let wanted_s3_id = body
        .s3_id
        .as_deref()
        .map(str::trim)
        .filter(|id| !id.is_empty());
    if matches!(wanted_s3_id, Some(s3_id) if file.s3_id.as_deref() != Some(s3_id)) {
        return false;
    }

    let wanted_prefix = body
        .prefix
        .as_deref()
        .or(body.key_prefix.as_deref())
        .map(str::trim)
        .filter(|prefix| !prefix.is_empty());
    if matches!(wanted_prefix, Some(prefix) if !file.storage_key.starts_with(prefix)) {
        return false;
    }

    let display_name = file
        .file_name
        .as_deref()
        .or(file.original_name.as_deref())
        .unwrap_or(file.name.as_str());
    if !string_contains_case_insensitive(Some(display_name), body.name.as_deref()) {
        return false;
    }

    let file_content_type = file.content_type.as_deref().or(file.mime_type.as_deref());
    let wanted_content_type = body.content_type.as_deref().or(body.mime_type.as_deref());
    optional_exact(file.resource_id.as_deref(), body.resource_id.as_deref())
        && optional_exact(file_content_type, wanted_content_type)
        && optional_exact(Some(file.status.as_str()), body.status.as_deref())
        && optional_exact(Some(file.visibility.as_str()), body.visibility.as_deref())
        && optional_exact(Some(file.bucket.as_str()), body.bucket.as_deref())
}
/// Slices an in-memory file list into one page.
///
/// `offset` defaults to 0 and is capped at the list length; `limit` defaults
/// to 50 and is clamped to `1..=500`.
///
/// Returns `(page, effective_limit, effective_offset, next_offset, has_more)`,
/// where `next_offset` is `Some` only when more files follow the page.
fn paginate_files(
    files: Vec<athena_s3::ManagedFileRecord>,
    limit: Option<usize>,
    offset: Option<usize>,
) -> (
    Vec<athena_s3::ManagedFileRecord>,
    usize,
    usize,
    Option<usize>,
    bool,
) {
    let total = files.len();
    let start = offset.map_or(0, |requested| requested.min(total));
    let page_size = limit.unwrap_or(50).clamp(1, 500);
    let end = start.saturating_add(page_size);
    let has_more = end < total;

    let page: Vec<_> = files.into_iter().skip(start).take(page_size).collect();
    let next_offset = if has_more { Some(end) } else { None };

    (page, page_size, start, next_offset, has_more)
}
/// Builds a file access context from the request's storage actor, if any.
///
/// When `resolve_optional_storage_actor` fails or finds no actor, the default
/// `FileAccessContext` is returned; the resolution error is discarded.
/// `team_ids` is always left empty.
async fn optional_access_context(
    req: &HttpRequest,
    app_state: &AppState,
) -> athena_s3::FileAccessContext {
    match resolve_optional_storage_actor(req, app_state).await {
        Ok(Some(actor)) => athena_s3::FileAccessContext {
            user_id: Some(actor.user_id),
            organization_id: actor.active_organization_id,
            organization_ids: actor.organization_ids,
            role_ids: actor.role_ids,
            team_ids: Vec::new(),
            is_admin: actor.is_admin,
        },
        Ok(None) | Err(_) => athena_s3::FileAccessContext::default(),
    }
}
/// Handler-local snapshot of the storage actor making a request.
///
/// Populated by `required_actor` from the value returned by
/// `require_storage_actor`.
#[derive(Debug, Clone)]
struct ActorContext {
// Copied from the storage actor's `user_id`.
user_id: String,
// Copied from the storage actor's `active_organization_id`.
organization_id: Option<String>,
// Copied from the storage actor's `organization_ids`.
organization_ids: Vec<String>,
// Copied from the storage actor's `role_ids`.
role_ids: Vec<String>,
// Copied from the storage actor's `is_admin`.
is_admin: bool,
// Copied from the storage actor's `session_id`; not part of the access context.
session_id: Option<String>,
}
impl ActorContext {
/// Returns the `FileAccessContext` for this actor by delegating to
/// `actor_access_context`.
fn access_context(&self) -> athena_s3::FileAccessContext {
actor_access_context(Some(self))
}
}
/// Builds the `FileAccessContext` for an optional actor.
///
/// With an actor, its identity, organizations, roles and admin flag are
/// cloned into the context. Without one, the context has no user, no
/// organizations, no roles and is not admin. `team_ids` is empty either way.
fn actor_access_context(actor: Option<&ActorContext>) -> athena_s3::FileAccessContext {
    match actor {
        Some(actor) => athena_s3::FileAccessContext {
            user_id: Some(actor.user_id.clone()),
            organization_id: actor.organization_id.clone(),
            organization_ids: actor.organization_ids.clone(),
            role_ids: actor.role_ids.clone(),
            team_ids: Vec::new(),
            is_admin: actor.is_admin,
        },
        None => athena_s3::FileAccessContext {
            user_id: None,
            organization_id: None,
            organization_ids: Vec::new(),
            role_ids: Vec::new(),
            team_ids: Vec::new(),
            is_admin: false,
        },
    }
}
/// Resolves the storage actor for a request that requires one and converts it
/// into an `ActorContext`.
///
/// # Errors
///
/// Propagates the error returned by `require_storage_actor`.
async fn required_actor(
    req: &HttpRequest,
    app_state: &AppState,
) -> Result<ActorContext, HttpResponse> {
    let resolved = require_storage_actor(req, app_state).await?;
    let context = ActorContext {
        session_id: resolved.session_id,
        is_admin: resolved.is_admin,
        role_ids: resolved.role_ids,
        organization_ids: resolved.organization_ids,
        organization_id: resolved.active_organization_id,
        user_id: resolved.user_id,
    };
    Ok(context)
}
fn map_store_error(err: athena_s3::S3CredentialStoreError, default_message: &str) -> HttpResponse {
match err {
athena_s3::S3CredentialStoreError::Database(sqlx::Error::Database(_)) => {
service_unavailable(default_message, err.to_string())
}
athena_s3::S3CredentialStoreError::NotFound => {
not_found("File or storage entry not found", err.to_string())
}
athena_s3::S3CredentialStoreError::AccessDenied => forbidden(
"Forbidden",
"you do not have permission to access this file",
),
athena_s3::S3CredentialStoreError::MissingActiveCredential => {
service_unavailable("Storage backend unavailable", err.to_string())
}
athena_s3::S3CredentialStoreError::InvalidInput(_) => {
bad_request("Invalid storage request", err.to_string())
}
athena_s3::S3CredentialStoreError::SchemaHeal(_) => {
internal_error(default_message, err.to_string())
}
athena_s3::S3CredentialStoreError::Database(_) => {
internal_error(default_message, err.to_string())
}
}
}
/// Records a managed-file action on a best-effort basis.
///
/// The record carries the actor's organization and user, the route, action,
/// status, message and payloads, plus `{"source": route}` as metadata. A
/// failure to persist is logged with `warn!` and otherwise ignored, so it never
/// affects the caller.
async fn best_effort_action_log(
    pool: &PgPool,
    actor: &ActorContext,
    file_id: &str,
    route: &str,
    action: &str,
    status: athena_s3::ManagedFileActionStatus,
    message: String,
    request_payload: Value,
    result_payload: Value,
) {
    let record = athena_s3::ManagedFileActionRecord {
        file_id: file_id.to_string(),
        organization_id: actor.organization_id.clone(),
        user_id: Some(actor.user_id.clone()),
        route: route.to_string(),
        action: action.to_string(),
        status,
        message,
        request_payload,
        result_payload,
        metadata: json!({
            "source": route,
        }),
    };

    let outcome = athena_s3::record_managed_file_action(pool, &record).await;
    if let Err(err) = outcome {
        warn!(file_id = %file_id, route = %route, error = %err, "failed to persist managed file audit/event log");
    }
}
/// One concrete URL-cache layer, as returned by `cache_read_attempts` and
/// `cache_write_targets`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheLayer {
Redis,
Postgres,
}
impl CacheLayer {
/// Returns the lowercase name of the layer: `"redis"` or `"postgres"`.
fn as_str(self) -> &'static str {
match self {
Self::Redis => "redis",
Self::Postgres => "postgres",
}
}
}
/// Lists the cache layers to consult on a read, in order.
///
/// The order follows `settings.read_precedence`; layers the configured backend
/// does not use are dropped, so the result may be empty.
fn cache_read_attempts(settings: &StorageUrlCacheSettings) -> Vec<CacheLayer> {
    let (first, second) = match settings.read_precedence {
        UrlCacheReadPrecedence::RedisFirst => (CacheLayer::Redis, CacheLayer::Postgres),
        UrlCacheReadPrecedence::PostgresFirst => (CacheLayer::Postgres, CacheLayer::Redis),
    };

    let mut attempts = Vec::with_capacity(2);
    for layer in [first, second] {
        let enabled = match layer {
            CacheLayer::Redis => settings.backend.uses_redis(),
            CacheLayer::Postgres => settings.backend.uses_postgres(),
        };
        if enabled {
            attempts.push(layer);
        }
    }
    attempts
}
/// Lists the cache layers to populate on a write.
///
/// A layer is included only when `settings.write_strategy` asks for it and the
/// configured backend uses it. Postgres is listed before Redis when both apply;
/// the result is empty when strategy and backend do not overlap.
fn cache_write_targets(settings: &StorageUrlCacheSettings) -> Vec<CacheLayer> {
    let wants_postgres = matches!(
        settings.write_strategy,
        UrlCacheWriteStrategy::PostgresOnly | UrlCacheWriteStrategy::PostgresAndRedis
    );
    let wants_redis = matches!(
        settings.write_strategy,
        UrlCacheWriteStrategy::RedisOnly | UrlCacheWriteStrategy::PostgresAndRedis
    );

    let candidates = [
        (CacheLayer::Postgres, wants_postgres && settings.backend.uses_postgres()),
        (CacheLayer::Redis, wants_redis && settings.backend.uses_redis()),
    ];
    candidates
        .into_iter()
        .filter(|(_, selected)| *selected)
        .map(|(layer, _)| layer)
        .collect()
}
// Unit tests for the pure helpers in this file: purpose normalization, proxy
// filename / Content-Disposition construction, and URL-cache layer selection.
#[cfg(test)]
mod tests {
use super::{
CacheLayer, StorageUrlCacheSettings, UrlCacheBackend, UrlCacheReadPrecedence,
UrlCacheWriteStrategy, cache_read_attempts, cache_write_targets, normalize_purpose,
preferred_proxy_filename, proxy_content_disposition, sanitize_header_filename,
};
use chrono::Utc;
use serde_json::json;
// Fixture: a private, ready PDF whose `original_name` differs from `name` and
// `file_name`, so name-precedence can be observed.
fn sample_file() -> athena_s3::ManagedFileRecord {
athena_s3::ManagedFileRecord {
id: "file-1".to_string(),
name: "report.pdf".to_string(),
original_name: Some("Quarterly Report.pdf".to_string()),
url: None,
bucket: "bucket".to_string(),
s3_id: Some(uuid::Uuid::nil().to_string()),
prefix_path: Some("reports".to_string()),
size_bytes: Some(128),
mime_type: Some("application/pdf".to_string()),
resource_id: None,
organization_id: Some("org-1".to_string()),
metadata: json!({}),
created_at: Utc::now(),
updated_at: Utc::now(),
storage_key: "reports/2026/report.pdf".to_string(),
uploaded_by_user_id: Some("user-1".to_string()),
extension: Some("pdf".to_string()),
is_public: false,
visibility: "private".to_string(),
status: "ready".to_string(),
deleted_at: None,
created_by_user_id: "user-1".to_string(),
file_name: Some("report.pdf".to_string()),
content_type: Some("application/pdf".to_string()),
checksum_sha256: None,
}
}
// `read`, `download` and `stream` are accepted (trimmed and lower-cased);
// anything else, such as `proxy`, is rejected.
#[test]
fn normalize_purpose_accepts_supported_values_only() {
assert_eq!(normalize_purpose(Some("read")).as_deref(), Some("read"));
assert_eq!(
normalize_purpose(Some(" DOWNLOAD ")).as_deref(),
Some("download")
);
assert_eq!(normalize_purpose(Some("stream")).as_deref(), Some("stream"));
assert_eq!(normalize_purpose(Some("proxy")), None);
}
// `original_name` wins over `name` when both are set.
#[test]
fn preferred_proxy_filename_uses_original_name_first() {
let file = sample_file();
assert_eq!(preferred_proxy_filename(&file), "Quarterly Report.pdf");
}
// Quotes and CR/LF in the filename are replaced by `_`, and the disposition
// type is `attachment` for `download` and `inline` otherwise.
#[test]
fn proxy_content_disposition_sanitizes_download_filename() {
let mut file = sample_file();
file.original_name = Some("bad\"name\r\n.pdf".to_string());
assert_eq!(
sanitize_header_filename("bad\"name\r\n.pdf"),
"bad_name__.pdf"
);
assert_eq!(
proxy_content_disposition(&file, "download"),
"attachment; filename=\"bad_name__.pdf\""
);
assert_eq!(
proxy_content_disposition(&file, "read"),
"inline; filename=\"bad_name__.pdf\""
);
}
// Fixture: cache settings with the three knobs under test supplied by the
// caller and fixed values for everything else.
fn sample_cache_settings(
backend: UrlCacheBackend,
read_precedence: UrlCacheReadPrecedence,
write_strategy: UrlCacheWriteStrategy,
) -> StorageUrlCacheSettings {
StorageUrlCacheSettings {
backend,
read_precedence,
write_strategy,
presign_expires_seconds: 60,
cache_ttl_seconds: 55,
min_remaining_seconds: 5,
redis_prefix: "athena:file-url:".to_string(),
}
}
// Read attempts follow the configured precedence, minus layers the backend
// does not use.
#[test]
fn cache_read_attempt_matrix_filters_disabled_layers() {
assert!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::Disabled,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
))
.is_empty()
);
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::Postgres,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresOnly,
)),
vec![CacheLayer::Postgres]
);
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::Redis,
UrlCacheReadPrecedence::PostgresFirst,
UrlCacheWriteStrategy::RedisOnly,
)),
vec![CacheLayer::Redis]
);
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::PostgresRedis,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
)),
vec![CacheLayer::Redis, CacheLayer::Postgres]
);
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::PostgresRedis,
UrlCacheReadPrecedence::PostgresFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
)),
vec![CacheLayer::Postgres, CacheLayer::Redis]
);
}
// Write targets are the intersection of the write strategy and the backend;
// the last case checks that a strategy the backend cannot serve yields nothing.
#[test]
fn cache_write_target_matrix_respects_backend_and_strategy() {
assert!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Disabled,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
))
.is_empty()
);
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Postgres,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresOnly,
)),
vec![CacheLayer::Postgres]
);
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Redis,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::RedisOnly,
)),
vec![CacheLayer::Redis]
);
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::PostgresRedis,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
)),
vec![CacheLayer::Postgres, CacheLayer::Redis]
);
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Postgres,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::RedisOnly,
)),
Vec::<CacheLayer>::new()
);
}
}