use actix_web::error::ErrorInternalServerError;
use actix_web::http::header;
use actix_web::{HttpRequest, HttpResponse, web, web::Data, web::Json, web::Path, web::Query};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::presigning::PresigningConfig;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use chrono::{DateTime, Utc};
use futures::stream;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sqlx::PgPool;
use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;
use tracing::warn;
use uuid::Uuid;
use super::auth::{require_storage_actor, resolve_optional_storage_actor};
use super::service::{build_s3_client_with_session_token, ensure_bucket_present};
use crate::AppState;
use crate::api::client_context::required_client_pool;
use crate::api::response::{
api_created, api_success, bad_request, forbidden, internal_error, not_found,
service_unavailable,
};
use crate::config_validation::runtime_env_settings;
use crate::utils::redis_client::{
GLOBAL_REDIS, note_redis_failure_and_start_cooldown, note_redis_success,
should_bypass_redis_temporarily,
};
/// Selects which stores participate in the presigned-URL cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UrlCacheBackend {
    /// Neither store is used.
    Disabled,
    /// Postgres only.
    Postgres,
    /// Redis only.
    Redis,
    /// Both Postgres and Redis.
    PostgresRedis,
}

impl UrlCacheBackend {
    /// Reads `FILE_URL_CACHE_BACKEND`, ignoring surrounding whitespace and case.
    ///
    /// An unset, non-Unicode, or unrecognised value resolves to
    /// [`UrlCacheBackend::PostgresRedis`].
    fn from_env() -> Self {
        let raw = std::env::var("FILE_URL_CACHE_BACKEND")
            .unwrap_or_else(|_| String::from("postgres_redis"));
        let normalized = raw.trim().to_ascii_lowercase();
        if normalized == "disabled" {
            Self::Disabled
        } else if normalized == "postgres" {
            Self::Postgres
        } else if normalized == "redis" {
            Self::Redis
        } else {
            // Covers the explicit "postgres_redis" spelling and every unknown value.
            Self::PostgresRedis
        }
    }

    /// `true` when Postgres is one of the selected stores.
    fn uses_postgres(self) -> bool {
        match self {
            Self::Postgres | Self::PostgresRedis => true,
            Self::Disabled | Self::Redis => false,
        }
    }

    /// `true` when Redis is one of the selected stores.
    fn uses_redis(self) -> bool {
        match self {
            Self::Redis | Self::PostgresRedis => true,
            Self::Disabled | Self::Postgres => false,
        }
    }
}
/// Configured read order between the two cache stores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UrlCacheReadPrecedence {
    RedisFirst,
    PostgresFirst,
}

impl UrlCacheReadPrecedence {
    /// Reads `FILE_URL_CACHE_READ_PRECEDENCE`, ignoring surrounding whitespace
    /// and case.
    ///
    /// Only `postgres_first` selects [`UrlCacheReadPrecedence::PostgresFirst`];
    /// an unset, non-Unicode, or any other value yields
    /// [`UrlCacheReadPrecedence::RedisFirst`].
    fn from_env() -> Self {
        let raw = std::env::var("FILE_URL_CACHE_READ_PRECEDENCE")
            .unwrap_or_else(|_| String::from("redis_first"));
        if raw.trim().to_ascii_lowercase() == "postgres_first" {
            Self::PostgresFirst
        } else {
            Self::RedisFirst
        }
    }
}
/// Configured write target(s) for the URL cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UrlCacheWriteStrategy {
    PostgresOnly,
    RedisOnly,
    PostgresAndRedis,
}

impl UrlCacheWriteStrategy {
    /// Reads `FILE_URL_CACHE_WRITE_STRATEGY`, ignoring surrounding whitespace
    /// and case.
    ///
    /// `postgres_only` and `redis_only` select the matching variant; an unset,
    /// non-Unicode, or any other value yields
    /// [`UrlCacheWriteStrategy::PostgresAndRedis`].
    fn from_env() -> Self {
        let raw = std::env::var("FILE_URL_CACHE_WRITE_STRATEGY")
            .unwrap_or_else(|_| String::from("postgres_and_redis"));
        let normalized = raw.trim().to_ascii_lowercase();
        if normalized == "postgres_only" {
            Self::PostgresOnly
        } else if normalized == "redis_only" {
            Self::RedisOnly
        } else {
            Self::PostgresAndRedis
        }
    }
}
/// Resolved configuration for presigned-URL generation and caching.
///
/// Built by `StorageUrlCacheSettings::load`, which applies the bounds noted on
/// each field.
#[derive(Debug, Clone)]
struct StorageUrlCacheSettings {
// Store selection, from `FILE_URL_CACHE_BACKEND`.
backend: UrlCacheBackend,
// Read order, from `FILE_URL_CACHE_READ_PRECEDENCE`.
read_precedence: UrlCacheReadPrecedence,
// Write targets, from `FILE_URL_CACHE_WRITE_STRATEGY`.
write_strategy: UrlCacheWriteStrategy,
// Presign lifetime in seconds; `load` forces this to at least 1.
presign_expires_seconds: u64,
// Cache entry lifetime in seconds; `load` keeps it within 1..=presign_expires_seconds.
cache_ttl_seconds: u64,
// `load` caps this at cache_ttl_seconds - 1 (saturating).
// NOTE(review): presumably the minimum remaining lifetime a cached URL needs to be served — confirm at the use site.
min_remaining_seconds: u64,
// Redis key prefix, from `REDIS_FILE_URL_CACHE_PREFIX` (default "athena:file-url:").
redis_prefix: String,
}
impl StorageUrlCacheSettings {
    /// Builds the settings from `runtime_env_settings()` and the cache-related
    /// environment variables.
    ///
    /// Bounds applied here:
    /// - `presign_expires_seconds` is at least 1;
    /// - `cache_ttl_seconds` is at least 1 and never exceeds the presign lifetime;
    /// - `min_remaining_seconds` never exceeds `cache_ttl_seconds - 1`;
    /// - `redis_prefix` is the trimmed `REDIS_FILE_URL_CACHE_PREFIX`, or
    ///   `"athena:file-url:"` when that is unset or blank.
    fn load() -> Self {
        let runtime = runtime_env_settings();

        let presign_expires_seconds =
            std::cmp::max(runtime.file_url_presign_expires_seconds, 1);
        let cache_ttl_seconds = std::cmp::min(
            std::cmp::max(runtime.file_url_cache_ttl_seconds, 1),
            presign_expires_seconds,
        );
        let min_remaining_seconds = std::cmp::min(
            runtime.file_url_cache_min_remaining_seconds,
            cache_ttl_seconds.saturating_sub(1),
        );

        let backend = UrlCacheBackend::from_env();
        let read_precedence = UrlCacheReadPrecedence::from_env();
        let write_strategy = UrlCacheWriteStrategy::from_env();

        let redis_prefix = match std::env::var("REDIS_FILE_URL_CACHE_PREFIX") {
            Ok(raw) if !raw.trim().is_empty() => raw.trim().to_string(),
            _ => "athena:file-url:".to_string(),
        };

        Self {
            backend,
            read_precedence,
            write_strategy,
            presign_expires_seconds,
            cache_ttl_seconds,
            min_remaining_seconds,
            redis_prefix,
        }
    }
}
/// Process-wide URL cache settings, computed by `StorageUrlCacheSettings::load`
/// on first access and reused afterwards.
static STORAGE_URL_CACHE_SETTINGS: Lazy<StorageUrlCacheSettings> =
Lazy::new(StorageUrlCacheSettings::load);
/// File descriptor fields of an upload request; flattened into
/// `UploadUrlRequest`.
///
/// Only `s3_id` and `storage_key` are required; every other field
/// deserializes to `None` when omitted.
#[derive(Debug, Deserialize)]
struct UploadFileRequest {
// Storage backend identifier, sent as a string.
// NOTE(review): presumably a UUID like `ListFilesRequest::s3_id` — confirm in `create_upload_url_inner`.
s3_id: String,
#[serde(default)]
bucket: Option<String>,
// Object key for the file within the bucket.
storage_key: String,
#[serde(default)]
name: Option<String>,
#[serde(default)]
original_name: Option<String>,
#[serde(default)]
resource_id: Option<String>,
#[serde(default)]
mime_type: Option<String>,
#[serde(default)]
size_bytes: Option<i64>,
#[serde(default)]
file_id: Option<String>,
// `public` and `visibility` are both optional; how they are reconciled is
// decided by code outside this view.
#[serde(default)]
public: Option<bool>,
#[serde(default)]
visibility: Option<String>,
// Free-form JSON attached by the caller.
#[serde(default)]
metadata: Option<Value>,
}
/// JSON body of `create_upload_url`, and the element type of
/// `BatchUploadUrlRequest::files`.
#[derive(Debug, Deserialize)]
struct UploadUrlRequest {
// The file descriptor fields sit at the same JSON level as `content_type`.
#[serde(flatten)]
file: UploadFileRequest,
#[serde(default)]
content_type: Option<String>,
}
/// JSON body of `create_batch_upload_urls`.
#[derive(Debug, Deserialize)]
struct BatchUploadUrlRequest {
// The handler rejects an empty list with a bad-request response.
files: Vec<UploadUrlRequest>,
}
/// Query string accepted by `get_file_url` and `proxy_file`.
#[derive(Debug, Deserialize)]
struct GetFileUrlQuery {
// Both handlers fall back to "read" when `normalize_purpose` returns `None`.
#[serde(default)]
purpose: Option<String>,
}
/// JSON body of `update_file` (move a file to a new storage key).
#[derive(Debug, Deserialize)]
struct UpdateFileRequest {
// Target key; `update_file` normalizes it and rejects a key equal to the current one.
storage_key: String,
// Forwarded to `athena_s3::update_file_location` together with the new key.
#[serde(default)]
bucket: Option<String>,
}
/// JSON body of `set_file_visibility`.
///
/// Both fields are optional; `resolve_requested_visibility` (outside this
/// view) turns them into the visibility value that is stored.
#[derive(Debug, Deserialize)]
struct SetVisibilityRequest {
#[serde(default)]
public: Option<bool>,
#[serde(default)]
visibility: Option<String>,
}
/// Request payload naming a storage backend and a key prefix.
// NOTE(review): presumably the body of `delete_folder` (routed at POST /folders/delete); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct FolderDeleteRequest {
s3_id: String,
prefix: String,
}
/// Request payload naming a storage backend plus source and destination prefixes.
// NOTE(review): presumably the body of `move_folder` (routed at POST /folders/move); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct FolderMoveRequest {
s3_id: String,
from_prefix: String,
to_prefix: String,
}
/// JSON body shared by `list_files`, `list_folders` and `tree_folders`.
#[derive(Debug, Deserialize)]
struct ListFilesRequest {
// Parsed with `parse_uuid(.., "s3_id")` by each handler.
s3_id: String,
// Passed unchanged to `athena_s3::list_files_for_prefix`.
prefix: String,
}
/// JSON body of `confirm_upload`; every field is optional.
#[derive(Debug, Deserialize)]
struct ConfirmUploadRequest {
// When absent, the handler falls back to the size already on the file record.
#[serde(default)]
size_bytes: Option<i64>,
// When absent, the handler falls back to the record's `mime_type`.
#[serde(default)]
content_type: Option<String>,
#[serde(default)]
checksum_sha256: Option<String>,
#[serde(default)]
metadata: Option<Value>,
}
/// Request payload carrying a list of file ids.
// NOTE(review): presumably the body of `delete_many_files` (POST /files/delete-many); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct FileDeleteManyRequest {
file_ids: Vec<String>,
}
/// Request payload carrying a list of file ids plus a storage key and optional bucket.
// NOTE(review): presumably the body of `update_many_files` (POST /files/update-many); how one `storage_key` applies to several files is decided outside this view.
#[derive(Debug, Deserialize)]
struct FileUpdateManyRequest {
file_ids: Vec<String>,
storage_key: String,
#[serde(default)]
bucket: Option<String>,
}
/// Request payload carrying a list of file ids and the same optional
/// `public` / `visibility` pair as `SetVisibilityRequest`.
// NOTE(review): presumably the body of `set_many_file_visibility` (POST /files/visibility-many); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct FileVisibilityManyRequest {
file_ids: Vec<String>,
#[serde(default)]
public: Option<bool>,
#[serde(default)]
visibility: Option<String>,
}
/// JSON body of `search_files`.
#[derive(Debug, Deserialize)]
struct FileSearchRequest {
// Optional search text, passed through to `athena_s3::search_files`.
#[serde(default)]
query: Option<String>,
// The handler substitutes 50 when this is omitted.
#[serde(default)]
limit: Option<usize>,
}
/// Request payload identifying a single file.
// NOTE(review): presumably the body of `list_permissions` (POST /permissions/list); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct PermissionListRequest {
file_id: String,
}
/// Request payload describing a permission for a principal on a file.
// NOTE(review): presumably the body of `grant_permission` (POST /permissions/grant); accepted values for `principal_type` and `permission` are defined outside this view.
#[derive(Debug, Deserialize)]
struct PermissionGrantRequest {
file_id: String,
principal_type: String,
principal_id: String,
permission: String,
// Optional UTC timestamp; `None` when omitted.
#[serde(default)]
expires_at: Option<DateTime<Utc>>,
#[serde(default)]
metadata: Option<Value>,
}
/// Request payload identifying a (file, principal, permission) triple.
// NOTE(review): presumably the body of `revoke_permission` (POST /permissions/revoke); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct PermissionRevokeRequest {
file_id: String,
principal_type: String,
principal_id: String,
permission: String,
}
/// Request payload naming a file and a permission to test.
// NOTE(review): presumably the body of `check_permission` (POST /permissions/check); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct PermissionCheckRequest {
file_id: String,
permission: String,
}
/// Request payload with a required destination `storage_key` and optional overrides.
// NOTE(review): presumably the body of `copy_file` (POST /files/{file_id}/copy); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct CopyFileRequest {
storage_key: String,
#[serde(default)]
bucket: Option<String>,
#[serde(default)]
file_name: Option<String>,
#[serde(default)]
visibility: Option<String>,
#[serde(default)]
metadata: Option<Value>,
}
/// Request payload with an optional file filter and optional paging values.
// NOTE(review): presumably the body of `list_audit_events` (POST /audit/list); default `limit`/`offset` are chosen outside this view.
#[derive(Debug, Deserialize)]
struct AuditListRequest {
#[serde(default)]
file_id: Option<String>,
#[serde(default)]
limit: Option<usize>,
#[serde(default)]
offset: Option<usize>,
}
/// Path parameters named `file_id` and `version_id`, matching the
/// `/{file_id}/versions/{version_id}` route templates.
// NOTE(review): presumably extracted by `restore_file_version` / `delete_file_version`; those handlers are outside this view.
#[derive(Debug, Deserialize)]
struct FileVersionPath {
file_id: String,
version_id: String,
}
/// Request payload naming a file and an optional content type.
// NOTE(review): presumably the body of `create_multipart_upload` (POST /multipart/create); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct MultipartCreateRequest {
file_id: String,
#[serde(default)]
content_type: Option<String>,
}
/// Request payload identifying one part of a multipart upload.
// NOTE(review): presumably the body of `sign_multipart_part` (POST /multipart/sign-part); valid `part_number` range is enforced, if at all, outside this view.
#[derive(Debug, Deserialize)]
struct MultipartSignPartRequest {
file_id: String,
upload_id: String,
part_number: i32,
}
/// One uploaded part (number and ETag); element type of
/// `MultipartCompleteRequest::parts`.
#[derive(Debug, Deserialize)]
struct MultipartCompletedPartInput {
part_number: i32,
etag: String,
}
/// Request payload listing the parts of a multipart upload.
// NOTE(review): presumably the body of `complete_multipart_upload` (POST /multipart/complete); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct MultipartCompleteRequest {
file_id: String,
upload_id: String,
parts: Vec<MultipartCompletedPartInput>,
}
/// Request payload identifying a multipart upload by file and upload id.
// NOTE(review): presumably the body of `abort_multipart_upload` (POST /multipart/abort); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct MultipartAbortRequest {
file_id: String,
upload_id: String,
}
/// Request payload identifying a multipart upload by file and upload id.
// NOTE(review): presumably the body of `list_multipart_parts` (POST /multipart/list-parts); that handler is outside this view.
#[derive(Debug, Deserialize)]
struct MultipartListPartsRequest {
file_id: String,
upload_id: String,
}
/// Response payload of `get_file`: the file record returned by the access check.
#[derive(Debug, Serialize)]
struct AuthorizedFileResponse {
file: athena_s3::ManagedFileRecord,
}
/// Serialized description of a presigned URL; embedded in `UploadUrlResponse`.
// NOTE(review): presumably also what `presigned_response` and `load_cached_url_after_auth` produce for `get_file_url` — both are outside this view.
#[derive(Debug, Serialize)]
struct PresignedFileUrlResponse {
file_id: String,
bucket: String,
storage_key: String,
purpose: String,
url: String,
// Expiry as text and as epoch seconds; the text format is chosen by the producer.
expires_at: String,
expires_at_epoch_seconds: i64,
expires_in: u64,
// Whether the URL came from a cache, and a label for where it came from
// (`get_file_url` passes `false` / "generated" for freshly signed URLs).
cache_hit: bool,
cache_layer: String,
}
/// A file record paired with its presigned upload URL; element type of
/// `BatchUploadUrlResponse::files`.
#[derive(Debug, Serialize)]
struct UploadUrlResponse {
file: athena_s3::ManagedFileRecord,
upload: PresignedFileUrlResponse,
}
/// Response payload of `create_batch_upload_urls`: one entry per requested
/// file, in request order.
#[derive(Debug, Serialize)]
struct BatchUploadUrlResponse {
files: Vec<UploadUrlResponse>,
}
/// Response payload of `list_files` and `search_files`.
#[derive(Debug, Serialize)]
struct ListFilesResponse {
files: Vec<athena_s3::ManagedFileRecord>,
// Both handlers set this to `files.len()`.
count: usize,
}
/// Response payload of the single-file mutation handlers (update, delete,
/// visibility, confirm-upload, binary upload, restore): the record after the change.
#[derive(Debug, Serialize)]
struct FileMutationResponse {
file: athena_s3::ManagedFileRecord,
}
/// Serialized summary of a folder-level operation.
// NOTE(review): presumably returned by `delete_folder` / `move_folder`; which prefix is reported for a move is decided outside this view.
#[derive(Debug, Serialize)]
struct FolderMutationResponse {
s3_id: String,
prefix: String,
processed_files: usize,
}
/// Serialized list of file records with a count.
// NOTE(review): presumably returned by the `*-many` handlers, which are outside this view.
#[derive(Debug, Serialize)]
struct FileMutationManyResponse {
files: Vec<athena_s3::ManagedFileRecord>,
count: usize,
}
/// Serialized list of permission records with a count.
// NOTE(review): presumably returned by `list_permissions`, which is outside this view.
#[derive(Debug, Serialize)]
struct PermissionListResponse {
permissions: Vec<athena_s3::FilePermissionRecord>,
count: usize,
}
/// Serialized outcome of a permission test.
// NOTE(review): presumably returned by `check_permission`, which is outside this view.
#[derive(Debug, Serialize)]
struct PermissionCheckResponse {
allowed: bool,
permission: String,
}
/// Serialized list of audit events with a count.
// NOTE(review): presumably returned by `list_audit_events`, which is outside this view.
#[derive(Debug, Serialize)]
struct AuditListResponse {
events: Vec<athena_s3::StorageAuditEventRecord>,
count: usize,
}
/// A file record bundled with an S3 client and bucket name.
///
/// `get_file_url`, `proxy_file` and `get_file_public_url` read exactly these
/// three fields from the value returned by
/// `resolve_authorized_file_read_context`.
// NOTE(review): presumably that helper returns this type after a read-permission check — it is outside this view.
struct AuthorizedFileReadContext {
file: athena_s3::ManagedFileRecord,
client: S3Client,
bucket: String,
}
pub fn configure_storage_routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/files")
.route("/upload-url", web::post().to(create_upload_url))
.route("/upload-urls", web::post().to(create_batch_upload_urls))
.route("/list", web::post().to(list_files))
.route("/search", web::post().to(search_files))
.route("/delete-many", web::post().to(delete_many_files))
.route("/update-many", web::post().to(update_many_files))
.route("/visibility-many", web::post().to(set_many_file_visibility))
.route("/{file_id}", web::get().to(get_file))
.route("/{file_id}", web::patch().to(update_file))
.route("/{file_id}", web::delete().to(delete_file))
.route("/{file_id}/confirm-upload", web::post().to(confirm_upload))
.route("/{file_id}/upload", web::put().to(upload_file_binary))
.route("/{file_id}/copy", web::post().to(copy_file))
.route("/{file_id}/restore", web::post().to(restore_file))
.route("/{file_id}/purge", web::delete().to(purge_file))
.route("/{file_id}/versions", web::get().to(list_file_versions))
.route(
"/{file_id}/versions/{version_id}/restore",
web::post().to(restore_file_version),
)
.route(
"/{file_id}/versions/{version_id}",
web::delete().to(delete_file_version),
)
.route("/{file_id}/retention", web::post().to(set_file_retention))
.route("/{file_id}/url", web::get().to(get_file_url))
.route("/{file_id}/public-url", web::get().to(get_file_public_url))
.route("/{file_id}/proxy", web::get().to(proxy_file))
.route("/{file_id}/visibility", web::post().to(set_file_visibility))
.route(
"/{file_id}/visibility",
web::patch().to(set_file_visibility),
),
)
.service(
web::scope("/folders")
.route("/list", web::post().to(list_folders))
.route("/tree", web::post().to(tree_folders))
.route("/delete", web::post().to(delete_folder))
.route("/move", web::post().to(move_folder)),
)
.service(
web::scope("/permissions")
.route("/list", web::post().to(list_permissions))
.route("/grant", web::post().to(grant_permission))
.route("/revoke", web::post().to(revoke_permission))
.route("/check", web::post().to(check_permission)),
)
.service(
web::scope("/multipart")
.route("/create", web::post().to(create_multipart_upload))
.route("/sign-part", web::post().to(sign_multipart_part))
.route("/complete", web::post().to(complete_multipart_upload))
.route("/abort", web::post().to(abort_multipart_upload))
.route("/list-parts", web::post().to(list_multipart_parts)),
)
.service(web::scope("/audit").route("/list", web::post().to(list_audit_events)));
}
/// `POST /files/upload-url` — issues an upload URL for one file.
///
/// Resolves the storage actor and the client pool, then delegates to
/// `create_upload_url_inner`. Any step that fails short-circuits with the
/// error response it produced; success is reported via `api_created`.
async fn create_upload_url(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<UploadUrlRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app).await?;
        let pool = required_client_pool(&req, app).await?;
        let response = create_upload_url_inner(&pool, app, &actor, &body).await?;
        Ok::<HttpResponse, HttpResponse>(api_created("Upload URL generated", response))
    }
    .await;
    // Both arms already hold the response to send.
    outcome.unwrap_or_else(|resp| resp)
}
/// `POST /files/upload-urls` — issues upload URLs for several files.
///
/// Entries are processed one after another in request order. An empty `files`
/// array is rejected with a bad-request response. The first entry that fails
/// ends the request with that entry's error response; entries handled before
/// it are not rolled back here.
async fn create_batch_upload_urls(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<BatchUploadUrlRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app).await?;
        if body.files.is_empty() {
            return Err(bad_request(
                "Invalid files",
                "files array must not be empty",
            ));
        }
        let pool = required_client_pool(&req, app).await?;

        let mut files = Vec::with_capacity(body.files.len());
        for entry in body.files.iter() {
            let generated = create_upload_url_inner(&pool, app, &actor, entry).await?;
            files.push(generated);
        }

        Ok::<HttpResponse, HttpResponse>(api_created(
            "Upload URLs generated",
            BatchUploadUrlResponse { files },
        ))
    }
    .await;
    outcome.unwrap_or_else(|resp| resp)
}
/// `POST /files/list` — lists files under a prefix that the actor may read.
///
/// Candidates come from `athena_s3::list_files_for_prefix` (using the actor's
/// `organization_id`, or an empty string when it has none). Each candidate is
/// then passed through `authorize_file_access` with `Read`: access-denied
/// results are skipped, any other store error fails the whole request.
async fn list_files(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ListFilesRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app).await?;
        let pool = required_client_pool(&req, app).await?;
        let s3_id = parse_uuid(&body.s3_id, "s3_id")?;

        let candidates = athena_s3::list_files_for_prefix(
            &pool,
            actor.organization_id.as_deref().unwrap_or(""),
            s3_id,
            &body.prefix,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to list files"))?;

        let access_context = actor.access_context();
        let mut visible = Vec::new();
        for candidate in candidates {
            let verdict = athena_s3::authorize_file_access(
                &pool,
                &candidate.id,
                &access_context,
                athena_s3::FilePermissionKind::Read,
            )
            .await;
            match verdict {
                Ok(authorized) => visible.push(authorized),
                // Not readable by this actor: leave it out of the listing.
                Err(athena_s3::S3CredentialStoreError::AccessDenied) => continue,
                Err(err) => return Err(map_store_error(err, "Failed to list files")),
            }
        }

        let count = visible.len();
        Ok::<HttpResponse, HttpResponse>(api_success(
            "Files loaded",
            ListFilesResponse {
                count,
                files: visible,
            },
        ))
    }
    .await;
    outcome.unwrap_or_else(|resp| resp)
}
/// `GET /files/{file_id}` — returns the file record when the caller may read it.
///
/// The actor is optional here: the access context comes from
/// `optional_access_context`, so the outcome is decided entirely by
/// `authorize_file_access` with `Read`.
async fn get_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
    let app = state.get_ref();
    let pool = match required_client_pool(&req, app).await {
        Err(resp) => return resp,
        Ok(pool) => pool,
    };
    let access_context = optional_access_context(&req, app).await;

    let lookup = athena_s3::authorize_file_access(
        &pool,
        &path,
        &access_context,
        athena_s3::FilePermissionKind::Read,
    )
    .await;

    lookup.map_or_else(
        |err| map_store_error(err, "Failed to load file"),
        |file| api_success("File loaded", AuthorizedFileResponse { file }),
    )
}
/// `GET /files/{file_id}/url` — returns a presigned GET URL for the file.
///
/// The `purpose` query value is normalized and falls back to `"read"`. After
/// the read context is resolved, a cached URL is returned if
/// `load_cached_url_after_auth` yields one; otherwise a fresh URL is generated,
/// written to the cache, and returned with `cache_hit = false` and
/// `cache_layer = "generated"`.
async fn get_file_url(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<GetFileUrlQuery>,
) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let pool = required_client_pool(&req, app).await?;
        let purpose = match normalize_purpose(query.purpose.as_deref()) {
            Some(normalized) => normalized,
            None => "read".to_string(),
        };

        let context =
            resolve_authorized_file_read_context(&pool, app, &req, path.as_str()).await?;
        let file = context.file;
        let client = context.client;
        let bucket = context.bucket;

        // The cache is only consulted once the caller has been authorized.
        if let Some(cached) = load_cached_url_after_auth(app, &pool, &file, &purpose).await {
            return Ok(api_success("Presigned URL generated", cached));
        }

        let entry = generate_presigned_get_entry(&client, &file, &purpose, &bucket).await?;
        write_cached_entry(app, &pool, &entry).await;

        Ok::<HttpResponse, HttpResponse>(api_success(
            "Presigned URL generated",
            presigned_response(&entry, false, "generated"),
        ))
    }
    .await;
    outcome.unwrap_or_else(|resp| resp)
}
/// `GET /files/{file_id}/proxy` — streams the object body through this service.
///
/// After resolving the authorized read context, the object is fetched with
/// `get_object`, forwarding the client's `Range` header when one is present.
/// The response carries the object's content type (falling back to the file
/// record's `mime_type`, then `application/octet-stream`), a
/// `private, no-store` cache policy, and the length / ETag / content range
/// reported by the store.
///
/// The status is `206 Partial Content` only when the store actually returned
/// a `Content-Range`. A store may ignore a `Range` header it cannot honour and
/// return the whole object; answering 206 without `Content-Range` in that
/// case would be an invalid partial response, so 200 is used instead.
///
/// Failures of `get_object` are logged and reported as an internal error;
/// failures while streaming are logged and surface as a stream error.
async fn proxy_file(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<GetFileUrlQuery>,
) -> HttpResponse {
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let purpose = normalize_purpose(query.purpose.as_deref()).unwrap_or_else(|| "read".to_string());
    let context =
        match resolve_authorized_file_read_context(&pool, state.get_ref(), &req, path.as_str())
            .await
        {
            Ok(context) => context,
            Err(resp) => return resp,
        };
    let file = context.file;
    let client = context.client;
    let bucket = context.bucket;

    // Borrowed straight from the request headers; blank or non-UTF-8 values
    // are treated as "no range requested".
    let requested_range = req
        .headers()
        .get(header::RANGE)
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty());

    let mut get_object = client.get_object().bucket(&bucket).key(&file.storage_key);
    if let Some(range) = requested_range {
        get_object = get_object.range(range);
    }
    let output = match get_object.send().await {
        Ok(output) => output,
        Err(err) => {
            warn!(file_id = %file.id, purpose = %purpose, error = %err, "managed file proxy get_object failed");
            return internal_error("Failed to proxy file", err.to_string());
        }
    };

    let content_type = output
        .content_type()
        .map(str::to_string)
        .or_else(|| file.mime_type.clone())
        .unwrap_or_else(|| "application/octet-stream".to_string());

    // Trust what the store returned, not what the client asked for.
    let is_partial = output.content_range().is_some();
    let mut response = if is_partial {
        HttpResponse::PartialContent()
    } else {
        HttpResponse::Ok()
    };
    response.insert_header((header::CONTENT_TYPE, content_type));
    response.insert_header((header::CACHE_CONTROL, "private, no-store"));
    response.insert_header((header::ACCEPT_RANGES, "bytes"));
    response.insert_header((
        header::CONTENT_DISPOSITION,
        proxy_content_disposition(&file, &purpose),
    ));
    if let Some(content_length) = output.content_length().filter(|value| *value >= 0) {
        response.insert_header((header::CONTENT_LENGTH, content_length.to_string()));
    }
    if let Some(etag) = output.e_tag() {
        response.insert_header((header::ETAG, etag.to_string()));
    }
    if let Some(content_range) = output.content_range() {
        response.insert_header((header::CONTENT_RANGE, content_range.to_string()));
    }

    let file_id = file.id.clone();
    let stream_purpose = purpose.clone();
    // Pull chunks from the S3 body one at a time; the body is threaded through
    // as the unfold state so it is dropped when the stream ends.
    let body_stream = stream::unfold(output.body, move |mut body| {
        let file_id = file_id.clone();
        let stream_purpose = stream_purpose.clone();
        async move {
            body.next().await.map(|result| {
                let chunk = result.map_err(|err| {
                    warn!(file_id = %file_id, purpose = %stream_purpose, error = %err, "managed file proxy stream failed");
                    ErrorInternalServerError(err.to_string())
                });
                (chunk, body)
            })
        }
    });
    response.streaming(body_stream)
}
async fn update_file(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Json<UpdateFileRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let access_context = actor.access_context();
let file = match athena_s3::authorize_file_access(
&pool,
&path,
&access_context,
athena_s3::FilePermissionKind::Write,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let new_storage_key = match normalize_storage_key(&body.storage_key) {
Ok(value) => value,
Err(resp) => return resp,
};
if new_storage_key == file.storage_key {
return bad_request(
"Invalid storage key",
"new storage_key must differ from the current value",
);
}
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
let copy_source = format!("{}/{}", target.bucket, file.storage_key);
if let Err(err) = client
.copy_object()
.bucket(&target.bucket)
.copy_source(copy_source)
.key(&new_storage_key)
.send()
.await
{
best_effort_action_log(
&pool,
&actor,
&file.id,
"/storage/files/{file_id}",
"storage.file.move",
athena_s3::ManagedFileActionStatus::Failed,
format!("copy_object failed: {err}"),
json!({ "from": file.storage_key, "to": new_storage_key }),
json!({}),
)
.await;
return internal_error("Failed to move file", err.to_string());
}
if let Err(err) = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await
{
best_effort_action_log(
&pool,
&actor,
&file.id,
"/storage/files/{file_id}",
"storage.file.move",
athena_s3::ManagedFileActionStatus::Failed,
format!("delete_object failed after copy: {err}"),
json!({ "from": file.storage_key, "to": new_storage_key }),
json!({}),
)
.await;
return internal_error("Failed to finalize file move", err.to_string());
}
let updated = match athena_s3::update_file_location(
&pool,
&file.id,
&new_storage_key,
body.bucket.as_deref(),
)
.await
{
Ok(updated) => updated,
Err(err) => return map_store_error(err, "Failed to update file metadata"),
};
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
best_effort_action_log(
&pool,
&actor,
&file.id,
"/storage/files/{file_id}",
"storage.file.move",
athena_s3::ManagedFileActionStatus::Success,
"File moved".to_string(),
json!({ "from": file.storage_key, "to": updated.storage_key }),
json!({ "file": updated }),
)
.await;
api_success("File updated", FileMutationResponse { file: updated })
}
/// `DELETE /files/{file_id}` — removes the object and marks the record deleted.
///
/// Requires `Delete` permission. The object is deleted from the bucket first;
/// on failure a `Failed` action-log entry is written and an internal error is
/// returned. Afterwards the record is marked deleted, cached URLs are
/// invalidated, and a `Success` action-log entry is written.
// NOTE(review): the object is removed before the record is only *marked*
// deleted; confirm that `restore_file` does not expect the object to exist.
async fn delete_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app).await?;
        let pool = required_client_pool(&req, app).await?;

        let access_context = actor.access_context();
        let file = athena_s3::authorize_file_access(
            &pool,
            &path,
            &access_context,
            athena_s3::FilePermissionKind::Delete,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file"))?;

        let target = file_target(&pool, &file)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;

        let client = build_s3_client_with_session_token(
            &target.endpoint,
            &target.region,
            &target.bucket,
            &target.access_key_id,
            &target.secret_key,
            target.session_token.as_deref(),
        )
        .await;

        let removal = client
            .delete_object()
            .bucket(&target.bucket)
            .key(&file.storage_key)
            .send()
            .await;
        if let Err(err) = removal {
            best_effort_action_log(
                &pool,
                &actor,
                &file.id,
                "/storage/files/{file_id}",
                "storage.file.delete",
                athena_s3::ManagedFileActionStatus::Failed,
                format!("delete_object failed: {err}"),
                json!({ "storage_key": file.storage_key }),
                json!({}),
            )
            .await;
            return Err(internal_error("Failed to delete file", err.to_string()));
        }

        let deleted = athena_s3::mark_file_deleted(&pool, &file.id)
            .await
            .map_err(|err| map_store_error(err, "Failed to mark file deleted"))?;

        invalidate_cached_entry(app, &pool, &file.id).await;
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}",
            "storage.file.delete",
            athena_s3::ManagedFileActionStatus::Success,
            "File deleted".to_string(),
            json!({ "storage_key": file.storage_key }),
            json!({ "file": deleted }),
        )
        .await;

        Ok::<HttpResponse, HttpResponse>(api_success(
            "File deleted",
            FileMutationResponse { file: deleted },
        ))
    }
    .await;
    outcome.unwrap_or_else(|resp| resp)
}
/// `POST|PATCH /files/{file_id}/visibility` — changes a file's visibility.
///
/// Requires `Share` permission. The target visibility is derived from the body
/// by `resolve_requested_visibility`; the change is stored and then recorded
/// with a `Success` action-log entry.
async fn set_file_visibility(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<SetVisibilityRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app).await?;
        let pool = required_client_pool(&req, app).await?;

        let access_context = actor.access_context();
        let file = athena_s3::authorize_file_access(
            &pool,
            &path,
            &access_context,
            athena_s3::FilePermissionKind::Share,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file"))?;

        // Validated only after the permission check, as in the request order.
        let requested_visibility = resolve_requested_visibility(&body)?;

        let updated = athena_s3::set_file_visibility(&pool, &file.id, &requested_visibility)
            .await
            .map_err(|err| map_store_error(err, "Failed to update file visibility"))?;

        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/visibility",
            "storage.file.visibility",
            athena_s3::ManagedFileActionStatus::Success,
            format!("File visibility set to {requested_visibility}"),
            json!({ "visibility": requested_visibility }),
            json!({ "file": updated }),
        )
        .await;

        Ok::<HttpResponse, HttpResponse>(api_success(
            "File visibility updated",
            FileMutationResponse { file: updated },
        ))
    }
    .await;
    outcome.unwrap_or_else(|resp| resp)
}
/// `POST /files/{file_id}/confirm-upload` — marks a file as uploaded.
///
/// Requires `Write` permission. The status is set to `"uploaded"`; size and
/// content type come from the body when given and otherwise from the existing
/// record. Cached URLs are invalidated and a `Success` action-log entry is
/// written.
async fn confirm_upload(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<ConfirmUploadRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let actor = required_actor(&req, app).await?;
        let pool = required_client_pool(&req, app).await?;

        let file = athena_s3::authorize_file_access(
            &pool,
            &path,
            &actor.access_context(),
            athena_s3::FilePermissionKind::Write,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize file"))?;

        // Caller-supplied values win; the stored record fills the gaps.
        let size_bytes = body.size_bytes.or(file.size_bytes);
        let content_type = body.content_type.as_deref().or(file.mime_type.as_deref());

        let updated = athena_s3::update_file_status(
            &pool,
            &file.id,
            "uploaded",
            size_bytes,
            content_type,
            body.checksum_sha256.as_deref(),
            body.metadata.clone(),
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to confirm file upload"))?;

        invalidate_cached_entry(app, &pool, &file.id).await;
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/confirm-upload",
            "storage.file.confirm_upload",
            athena_s3::ManagedFileActionStatus::Success,
            "Managed file upload confirmed".to_string(),
            json!({
                "size_bytes": body.size_bytes,
                "content_type": body.content_type,
                "checksum_sha256": body.checksum_sha256,
            }),
            json!({ "file": updated }),
        )
        .await;

        Ok::<HttpResponse, HttpResponse>(api_success(
            "File upload confirmed",
            FileMutationResponse { file: updated },
        ))
    }
    .await;
    outcome.unwrap_or_else(|resp| resp)
}
/// `PUT /files/{file_id}/upload` — stores the request body as the file's object.
///
/// Requires `Write` permission. The content type is taken from the request's
/// `Content-Type` header (trimmed, ignored when blank or non-UTF-8) and falls
/// back to the file record's `mime_type`. After a successful `put_object` the
/// record is set to `"uploaded"` with the body length, cached URLs are
/// invalidated, and a `Success` action-log entry is written. A failed
/// `put_object` is action-logged as `Failed` and reported as an internal error.
///
/// The payload is handed to the S3 client as `Bytes`, so it is not copied into
/// a fresh `Vec` before upload.
async fn upload_file_binary(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: web::Bytes,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let file = match athena_s3::authorize_file_access(
        &pool,
        &path,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await
    {
        Ok(file) => file,
        Err(err) => return map_store_error(err, "Failed to authorize file"),
    };
    let target = match file_target(&pool, &file).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let content_type = req
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(str::to_string)
        .or_else(|| file.mime_type.clone());
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;

    // Capture the length now: the payload itself is moved into the request.
    let size_bytes = body.len();
    let mut request = client
        .put_object()
        .bucket(&target.bucket)
        .key(&file.storage_key)
        .body(ByteStream::from(body));
    if let Some(content_type) = content_type.as_ref() {
        request = request.content_type(content_type);
    }
    if let Err(err) = request.send().await {
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/files/{file_id}/upload",
            "storage.file.upload_binary",
            athena_s3::ManagedFileActionStatus::Failed,
            format!("put_object failed: {err}"),
            json!({ "size_bytes": size_bytes }),
            json!({}),
        )
        .await;
        return internal_error("Failed to upload binary file body", err.to_string());
    }
    let updated = match athena_s3::update_file_status(
        &pool,
        &file.id,
        "uploaded",
        // An in-memory payload cannot exceed i64::MAX bytes, so the cast is lossless.
        Some(size_bytes as i64),
        content_type.as_deref(),
        None,
        None,
    )
    .await
    {
        Ok(updated) => updated,
        Err(err) => return map_store_error(err, "Failed to update file upload status"),
    };
    invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
    best_effort_action_log(
        &pool,
        &actor,
        &file.id,
        "/storage/files/{file_id}/upload",
        "storage.file.upload_binary",
        athena_s3::ManagedFileActionStatus::Success,
        "Managed file binary uploaded".to_string(),
        json!({ "size_bytes": size_bytes }),
        json!({ "file": updated }),
    )
    .await;
    api_success(
        "File binary uploaded",
        FileMutationResponse { file: updated },
    )
}
/// `GET /files/{file_id}/public-url` — returns the public object URL of a file.
///
/// The caller must pass the read-context check, and the file's visibility must
/// be exactly `"public"`; otherwise a forbidden response is returned. The URL
/// itself is computed by `compute_public_object_url` from the storage target,
/// bucket and storage key.
async fn get_file_public_url(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
) -> HttpResponse {
    let app = state.get_ref();
    let outcome = async {
        let pool = required_client_pool(&req, app).await?;
        let context =
            resolve_authorized_file_read_context(&pool, app, &req, path.as_str()).await?;

        if context.file.visibility != "public" {
            return Err(forbidden(
                "Forbidden",
                "file visibility must be public before requesting a public URL",
            ));
        }

        let target = file_target(&pool, &context.file)
            .await
            .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;
        let url =
            compute_public_object_url(&target, &context.bucket, &context.file.storage_key)?;

        let payload = json!({
            "file_id": context.file.id,
            "bucket": context.bucket,
            "storage_key": context.file.storage_key,
            "url": url,
        });
        Ok::<HttpResponse, HttpResponse>(api_success("Public URL resolved", payload))
    }
    .await;
    outcome.unwrap_or_else(|resp| resp)
}
/// `POST /files/search` — searches files and returns those the actor may read.
///
/// Rows come from `athena_s3::search_files` for the actor's
/// `organization_ids`, with `limit` defaulting to 50. Each row is then passed
/// through `authorize_file_access` with `Read`: access-denied rows are
/// skipped, any other store error fails the whole request. Because filtering
/// happens after the limit is applied, fewer than `limit` files may be
/// returned.
async fn search_files(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<FileSearchRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let rows = match athena_s3::search_files(
        &pool,
        &actor.organization_ids,
        body.query.as_deref(),
        body.limit.unwrap_or(50),
    )
    .await
    {
        Ok(rows) => rows,
        Err(err) => return map_store_error(err, "Failed to search files"),
    };
    let access_context = actor.access_context();
    let mut visible = Vec::new();
    for file in rows {
        match athena_s3::authorize_file_access(
            &pool,
            &file.id,
            &access_context,
            athena_s3::FilePermissionKind::Read,
        )
        .await
        {
            Ok(file) => visible.push(file),
            Err(athena_s3::S3CredentialStoreError::AccessDenied) => {}
            Err(err) => return map_store_error(err, "Failed to search files"),
        }
    }
    // Read the length before moving the vector into the response, so the
    // records are not cloned just to be counted.
    let count = visible.len();
    api_success(
        "Files searched",
        ListFilesResponse {
            files: visible,
            count,
        },
    )
}
/// `POST /folders/list` — lists the distinct folder prefixes under a prefix.
///
/// Files come from `athena_s3::list_files_for_prefix`; the `prefix_path` of
/// every file the actor may read is collected into a sorted, de-duplicated
/// list. Files without a `prefix_path` contribute nothing.
///
/// Access-denied files are skipped. Any other store error fails the request,
/// matching `list_files` and `search_files`, instead of silently yielding an
/// incomplete folder list with a success status.
async fn list_folders(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ListFilesRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let files = match athena_s3::list_files_for_prefix(
        &pool,
        actor.organization_id.as_deref().unwrap_or(""),
        s3_id,
        &body.prefix,
    )
    .await
    {
        Ok(files) => files,
        Err(err) => return map_store_error(err, "Failed to list folders"),
    };
    let access_context = actor.access_context();
    // BTreeSet gives both de-duplication and a stable, sorted output order.
    let mut folders = BTreeSet::new();
    for file in files {
        match athena_s3::authorize_file_access(
            &pool,
            &file.id,
            &access_context,
            athena_s3::FilePermissionKind::Read,
        )
        .await
        {
            Ok(_) => {}
            // Not readable by this actor: its folder must not be revealed.
            Err(athena_s3::S3CredentialStoreError::AccessDenied) => continue,
            Err(err) => return map_store_error(err, "Failed to list folders"),
        }
        if let Some(prefix_path) = file.prefix_path {
            folders.insert(prefix_path);
        }
    }
    let folders: Vec<String> = folders.into_iter().collect();
    let count = folders.len();
    api_success(
        "Folders loaded",
        json!({
            "folders": folders,
            "count": count,
        }),
    )
}
/// `POST /folders/tree` — returns one node per folder prefix with a file count.
///
/// Files come from `athena_s3::list_files_for_prefix`. For every file the
/// actor may read, the count of its `prefix_path` is incremented; files
/// without a `prefix_path` are not counted. The response is
/// `{"nodes": [{"path", "file_count"}, ...]}` ordered by path. The result is a
/// flat list of prefixes, not a nested structure.
///
/// Access-denied files are skipped. Any other store error fails the request,
/// matching `list_files` and `search_files`, instead of silently yielding
/// under-counted nodes with a success status.
async fn tree_folders(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ListFilesRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let files = match athena_s3::list_files_for_prefix(
        &pool,
        actor.organization_id.as_deref().unwrap_or(""),
        s3_id,
        &body.prefix,
    )
    .await
    {
        Ok(files) => files,
        Err(err) => return map_store_error(err, "Failed to build folder tree"),
    };
    // Count per prefix with plain integers; JSON is built once at the end.
    let mut file_counts = BTreeMap::<String, u64>::new();
    let access_context = actor.access_context();
    for file in files {
        match athena_s3::authorize_file_access(
            &pool,
            &file.id,
            &access_context,
            athena_s3::FilePermissionKind::Read,
        )
        .await
        {
            Ok(_) => {}
            // Not readable by this actor: it must not affect the counts.
            Err(athena_s3::S3CredentialStoreError::AccessDenied) => continue,
            Err(err) => return map_store_error(err, "Failed to build folder tree"),
        }
        if let Some(prefix_path) = file.prefix_path {
            *file_counts.entry(prefix_path).or_insert(0) += 1;
        }
    }
    let nodes: Vec<Value> = file_counts
        .into_iter()
        .map(|(path, file_count)| {
            json!({
                "path": path,
                "file_count": file_count,
            })
        })
        .collect();
    api_success(
        "Folder tree loaded",
        json!({
            "nodes": nodes,
        }),
    )
}
/// Restores the managed file named by the path parameter.
///
/// The caller must be granted `Delete` access to the file. On success the file's
/// cached entry is invalidated and the restored record is returned.
async fn restore_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &path,
        &access,
        athena_s3::FilePermissionKind::Delete,
    )
    .await;
    let record = match authorization {
        Err(err) => return map_store_error(err, "Failed to authorize file restore"),
        Ok(record) => record,
    };
    match athena_s3::restore_file(&pool, &record.id).await {
        Err(err) => map_store_error(err, "Failed to restore file"),
        Ok(restored) => {
            // Cached data for this file may predate the restore.
            invalidate_cached_entry(app, &pool, &record.id).await;
            api_success("File restored", FileMutationResponse { file: restored })
        }
    }
}
async fn purge_file(req: HttpRequest, state: Data<AppState>, path: Path<String>) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let file = match athena_s3::authorize_file_access(
&pool,
&path,
&actor.access_context(),
athena_s3::FilePermissionKind::Owner,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file purge"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
let _ = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await;
let purged = match athena_s3::purge_file(&pool, &file.id).await {
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to purge file metadata"),
};
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
api_success("File purged", FileMutationResponse { file: purged })
}
/// Copies a managed file's object to a new storage key and records the copy as a new
/// managed file.
///
/// The caller must be granted `Write` access to the source file. The destination bucket
/// is `body.bucket` when `normalize_bucket` accepts it, otherwise the source's bucket;
/// the destination key is the normalized `body.storage_key`. Name, visibility and
/// metadata come from the request when supplied and fall back to the source's values.
/// After the object copy, a metadata record is upserted and marked `"uploaded"` with
/// the source's size, MIME type and SHA-256 checksum.
///
/// The source's `s3_id` is validated *before* the object is copied, so that invalid
/// source metadata is rejected without leaving an untracked copy in the backend.
async fn copy_file(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<CopyFileRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let source = match athena_s3::authorize_file_access(
        &pool,
        &path,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await
    {
        Ok(file) => file,
        Err(err) => return map_store_error(err, "Failed to authorize source file"),
    };
    let target = match file_target(&pool, &source).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let bucket = normalize_bucket(body.bucket.as_deref()).unwrap_or_else(|| target.bucket.clone());
    let storage_key = match normalize_storage_key(&body.storage_key) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // Checked ahead of the copy: failing here afterwards would leave a copied object
    // with no metadata record pointing at it.
    let source_s3_id = match source
        .s3_id
        .as_deref()
        .and_then(|value| Uuid::parse_str(value).ok())
    {
        Some(s3_id) => s3_id,
        None => {
            return bad_request(
                "Invalid file metadata",
                "source file is missing a valid s3_id",
            );
        }
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    if let Err(err) = client
        .copy_object()
        .bucket(&bucket)
        .copy_source(format!("{}/{}", target.bucket, source.storage_key))
        .key(&storage_key)
        .send()
        .await
    {
        return internal_error("Failed to copy managed file object", err.to_string());
    }
    let copied = match athena_s3::upsert_managed_file_for_upload(
        &pool,
        &athena_s3::ManagedFileUpsertInput {
            file_id: None,
            s3_id: source_s3_id,
            bucket: bucket.clone(),
            storage_key,
            organization_id: actor.organization_id.clone(),
            uploaded_by_user_id: Some(actor.user_id.clone()),
            name: body.file_name.clone().or_else(|| source.file_name.clone()),
            original_name: source.original_name.clone(),
            mime_type: source.mime_type.clone(),
            size_bytes: source.size_bytes,
            resource_id: source.resource_id.clone(),
            is_public: source.is_public,
            visibility: body
                .visibility
                .clone()
                .or_else(|| Some(source.visibility.clone())),
            metadata: body
                .metadata
                .clone()
                .unwrap_or_else(|| source.metadata.clone()),
        },
    )
    .await
    {
        // The object already exists in the backend, so mark the new record as uploaded.
        Ok(file) => match athena_s3::update_file_status(
            &pool,
            &file.id,
            "uploaded",
            source.size_bytes,
            source.mime_type.as_deref(),
            source.checksum_sha256.as_deref(),
            None,
        )
        .await
        {
            Ok(updated) => updated,
            Err(err) => return map_store_error(err, "Failed to persist copied file metadata"),
        },
        Err(err) => return map_store_error(err, "Failed to persist copied file metadata"),
    };
    api_success("File copied", FileMutationResponse { file: copied })
}
async fn delete_many_files(
req: HttpRequest,
state: Data<AppState>,
body: Json<FileDeleteManyRequest>,
) -> HttpResponse {
if body.file_ids.is_empty() {
return bad_request("Invalid file_ids", "file_ids array must not be empty");
}
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let mut files = Vec::new();
for file_id in &body.file_ids {
let file = match athena_s3::authorize_file_access(
&pool,
file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Delete,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file delete"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
if let Err(err) = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await
{
return internal_error("Failed to delete file", err.to_string());
}
let deleted = match athena_s3::mark_file_deleted(&pool, &file.id).await {
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to mark file deleted"),
};
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
files.push(deleted);
}
let count = files.len();
api_success("Files deleted", FileMutationManyResponse { count, files })
}
async fn update_many_files(
req: HttpRequest,
state: Data<AppState>,
body: Json<FileUpdateManyRequest>,
) -> HttpResponse {
if body.file_ids.is_empty() {
return bad_request("Invalid file_ids", "file_ids array must not be empty");
}
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let multi_prefix = if body.file_ids.len() > 1 {
Some(match normalize_prefix(&body.storage_key) {
Ok(value) => value,
Err(resp) => return resp,
})
} else {
None
};
let mut files = Vec::new();
for file_id in &body.file_ids {
let file = match athena_s3::authorize_file_access(
&pool,
file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Write,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file update"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let new_storage_key = if let Some(prefix) = multi_prefix.as_ref() {
format!("{}{}", prefix, preferred_proxy_filename(&file))
} else {
match normalize_storage_key(&body.storage_key) {
Ok(value) => value,
Err(resp) => return resp,
}
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
if let Err(err) = client
.copy_object()
.bucket(&target.bucket)
.copy_source(format!("{}/{}", target.bucket, file.storage_key))
.key(&new_storage_key)
.send()
.await
{
return internal_error("Failed to move file", err.to_string());
}
if let Err(err) = client
.delete_object()
.bucket(&target.bucket)
.key(&file.storage_key)
.send()
.await
{
return internal_error("Failed to finalize file move", err.to_string());
}
let updated = match athena_s3::update_file_location(
&pool,
&file.id,
&new_storage_key,
body.bucket.as_deref(),
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to update file metadata"),
};
invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
files.push(updated);
}
let count = files.len();
api_success("Files updated", FileMutationManyResponse { count, files })
}
async fn set_many_file_visibility(
req: HttpRequest,
state: Data<AppState>,
body: Json<FileVisibilityManyRequest>,
) -> HttpResponse {
if body.file_ids.is_empty() {
return bad_request("Invalid file_ids", "file_ids array must not be empty");
}
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let visibility = match resolve_requested_visibility_from_many(&body) {
Ok(visibility) => visibility,
Err(resp) => return resp,
};
let mut files = Vec::new();
for file_id in &body.file_ids {
let file = match athena_s3::authorize_file_access(
&pool,
file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Share,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize file visibility update"),
};
let updated = match athena_s3::set_file_visibility(&pool, &file.id, &visibility).await {
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to update file visibility"),
};
files.push(updated);
}
let count = files.len();
api_success(
"File visibility updated",
FileMutationManyResponse { count, files },
)
}
/// Returns the permission grants recorded for `body.file_id`.
///
/// The caller must be granted `Share` access to the file. The response carries the
/// permission rows and their count.
async fn list_permissions(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionListRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &access,
        athena_s3::FilePermissionKind::Share,
    )
    .await;
    if let Err(err) = authorization {
        return map_store_error(err, "Failed to authorize permission listing");
    }
    match athena_s3::list_file_permissions(&pool, &body.file_id).await {
        Err(err) => map_store_error(err, "Failed to list file permissions"),
        Ok(permissions) => {
            let count = permissions.len();
            api_success(
                "File permissions loaded",
                PermissionListResponse { count, permissions },
            )
        }
    }
}
/// Grants a permission on `body.file_id` to the principal named in the request.
///
/// The permission string must parse as a `FilePermissionKind`. Granting `Owner`
/// requires the caller to hold `Owner` access; granting anything else requires `Share`.
/// The grant records the caller as the granting user and uses an empty JSON object
/// when no metadata is supplied.
async fn grant_permission(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionGrantRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let Some(permission) = athena_s3::FilePermissionKind::parse(&body.permission) else {
        return bad_request("Invalid permission", "unsupported permission value");
    };
    // Handing out ownership demands ownership; every other grant only needs `Share`.
    let grants_ownership = permission == athena_s3::FilePermissionKind::Owner;
    let needed_permission = if grants_ownership {
        athena_s3::FilePermissionKind::Owner
    } else {
        athena_s3::FilePermissionKind::Share
    };
    let access = caller.access_context();
    let authorization =
        athena_s3::authorize_file_access(&pool, &body.file_id, &access, needed_permission).await;
    if let Err(err) = authorization {
        return map_store_error(err, "Failed to authorize permission grant");
    }
    let grant = athena_s3::FilePermissionGrantInput {
        principal_type: body.principal_type.clone(),
        principal_id: body.principal_id.clone(),
        permission,
        granted_by_user_id: Some(caller.user_id.clone()),
        expires_at: body.expires_at,
        metadata: body.metadata.clone().unwrap_or_else(|| json!({})),
    };
    match athena_s3::grant_file_permission(&pool, &body.file_id, &grant).await {
        Err(err) => map_store_error(err, "Failed to grant file permission"),
        Ok(_) => api_success("File permission granted", json!({ "granted": true })),
    }
}
/// Revokes a permission on `body.file_id` from the principal named in the request.
///
/// The permission string must parse as a `FilePermissionKind`. Revoking `Owner`
/// requires the caller to hold `Owner` access; revoking anything else requires `Share`.
/// The response echoes the value returned by the store under `"revoked"`.
async fn revoke_permission(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionRevokeRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let Some(permission) = athena_s3::FilePermissionKind::parse(&body.permission) else {
        return bad_request("Invalid permission", "unsupported permission value");
    };
    // Taking away ownership demands ownership; every other revoke only needs `Share`.
    let revokes_ownership = permission == athena_s3::FilePermissionKind::Owner;
    let needed_permission = if revokes_ownership {
        athena_s3::FilePermissionKind::Owner
    } else {
        athena_s3::FilePermissionKind::Share
    };
    let access = caller.access_context();
    let authorization =
        athena_s3::authorize_file_access(&pool, &body.file_id, &access, needed_permission).await;
    if let Err(err) = authorization {
        return map_store_error(err, "Failed to authorize permission revoke");
    }
    let outcome = athena_s3::revoke_file_permission(
        &pool,
        &body.file_id,
        &body.principal_type,
        &body.principal_id,
        permission,
    )
    .await;
    match outcome {
        Err(err) => map_store_error(err, "Failed to revoke file permission"),
        Ok(revoked) => api_success("File permission revoked", json!({ "revoked": revoked })),
    }
}
/// Reports whether the caller holds a given permission on `body.file_id`.
///
/// The permission string must parse as a `FilePermissionKind`. The response carries the
/// store's verdict in `allowed` and the parsed permission's string form in `permission`.
async fn check_permission(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<PermissionCheckRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let Some(permission) = athena_s3::FilePermissionKind::parse(&body.permission) else {
        return bad_request("Invalid permission", "unsupported permission value");
    };
    let access = caller.access_context();
    let verdict =
        athena_s3::check_file_permission(&pool, &body.file_id, &access, permission).await;
    match verdict {
        Err(err) => map_store_error(err, "Failed to check file permission"),
        Ok(allowed) => api_success(
            "File permission checked",
            PermissionCheckResponse {
                allowed,
                permission: permission.as_str().to_string(),
            },
        ),
    }
}
/// Lists storage audit events, optionally scoped to one file.
///
/// With a `file_id`, the caller must be granted `Read` access to that file. Without
/// one, the caller must be an admin. Non-admin callers only get events filtered to
/// their own user id. `limit` defaults to 50 and `offset` to 0.
async fn list_audit_events(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<AuditListRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    match body.file_id.as_deref() {
        Some(file_id) => {
            let authorization = athena_s3::authorize_file_access(
                &pool,
                file_id,
                &caller.access_context(),
                athena_s3::FilePermissionKind::Read,
            )
            .await;
            if let Err(err) = authorization {
                return map_store_error(err, "Failed to authorize audit lookup");
            }
        }
        None if !caller.is_admin => {
            return forbidden(
                "Forbidden",
                "audit listing without a file_id requires storage-admin privileges",
            );
        }
        None => {}
    }
    // Admins see every actor's events; everyone else is filtered to their own user id.
    let actor_filter = (!caller.is_admin).then(|| caller.user_id.clone());
    let query = athena_s3::StorageAuditListInput {
        file_id: body.file_id.clone(),
        actor_user_id: actor_filter,
        limit: body.limit.unwrap_or(50),
        offset: body.offset.unwrap_or(0),
    };
    match athena_s3::list_storage_audit_events(&pool, &query).await {
        Err(err) => map_store_error(err, "Failed to list storage audit events"),
        Ok(events) => {
            let count = events.len();
            api_success(
                "Storage audit events loaded",
                AuditListResponse { count, events },
            )
        }
    }
}
/// Builds the `501 Not Implemented` response used by managed-storage routes whose
/// feature is not available.
///
/// The JSON body names the `feature` and `route`, summarizes the file (id, bucket,
/// storage key, s3 id, status), and passes through the optional `provider` and the
/// caller-supplied `extra` value under `"context"`.
fn managed_storage_feature_unavailable(
    route: &str,
    feature: &str,
    file: &athena_s3::ManagedFileRecord,
    provider: Option<&str>,
    extra: Value,
) -> HttpResponse {
    let detail = format!("{feature} is not yet implemented for Athena-managed storage");
    let file_summary = json!({
        "id": file.id,
        "bucket": file.bucket,
        "storage_key": file.storage_key,
        "s3_id": file.s3_id,
        "status": file.status,
    });
    let payload = json!({
        "status": "error",
        "message": "Managed storage feature is not available on this Athena runtime",
        "error": detail,
        "code": "STORAGE_FEATURE_UNAVAILABLE",
        "feature": feature,
        "route": route,
        "file": file_summary,
        "provider": provider,
        "context": extra,
    });
    HttpResponse::NotImplemented().json(payload)
}
/// Handler for listing a managed file's versions; currently always answers with the
/// feature-unavailable response.
///
/// The caller must still be granted `Read` access to the file. The storage provider is
/// included in the response when the file's backend can be resolved.
async fn list_file_versions(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &path,
        &access,
        athena_s3::FilePermissionKind::Read,
    )
    .await;
    let record = match authorization {
        Err(err) => return map_store_error(err, "Failed to authorize file version listing"),
        Ok(record) => record,
    };
    // A backend lookup failure only means the provider is reported as absent.
    let provider = match file_target(&pool, &record).await {
        Ok(target) => Some(target.provider),
        Err(_) => None,
    };
    managed_storage_feature_unavailable(
        "/storage/files/{file_id}/versions",
        "managed_file_versions",
        &record,
        provider.as_deref(),
        json!({}),
    )
}
/// Handler for restoring a specific version of a managed file; currently always
/// answers with the feature-unavailable response.
///
/// The caller must still be granted `Owner` access to the file. The requested
/// `version_id` is echoed in the response context, and the storage provider is included
/// when the file's backend can be resolved.
async fn restore_file_version(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<FileVersionPath>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let ids = path.into_inner();
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &ids.file_id,
        &access,
        athena_s3::FilePermissionKind::Owner,
    )
    .await;
    let record = match authorization {
        Err(err) => return map_store_error(err, "Failed to authorize file version restore"),
        Ok(record) => record,
    };
    // A backend lookup failure only means the provider is reported as absent.
    let provider = match file_target(&pool, &record).await {
        Ok(target) => Some(target.provider),
        Err(_) => None,
    };
    let context = json!({
        "version_id": ids.version_id,
    });
    managed_storage_feature_unavailable(
        "/storage/files/{file_id}/versions/{version_id}/restore",
        "managed_file_version_restore",
        &record,
        provider.as_deref(),
        context,
    )
}
/// Handler for deleting a specific version of a managed file; currently always
/// answers with the feature-unavailable response.
///
/// The caller must still be granted `Owner` access to the file. The requested
/// `version_id` is echoed in the response context, and the storage provider is included
/// when the file's backend can be resolved.
async fn delete_file_version(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<FileVersionPath>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let ids = path.into_inner();
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &ids.file_id,
        &access,
        athena_s3::FilePermissionKind::Owner,
    )
    .await;
    let record = match authorization {
        Err(err) => return map_store_error(err, "Failed to authorize file version deletion"),
        Ok(record) => record,
    };
    // A backend lookup failure only means the provider is reported as absent.
    let provider = match file_target(&pool, &record).await {
        Ok(target) => Some(target.provider),
        Err(_) => None,
    };
    let context = json!({
        "version_id": ids.version_id,
    });
    managed_storage_feature_unavailable(
        "/storage/files/{file_id}/versions/{version_id}",
        "managed_file_version_delete",
        &record,
        provider.as_deref(),
        context,
    )
}
/// Handler for setting retention on a managed file; currently always answers with the
/// feature-unavailable response.
///
/// The caller must still be granted `Owner` access to the file. The raw request body
/// is echoed in the response context under `"request"`, and the storage provider is
/// included when the file's backend can be resolved.
async fn set_file_retention(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<Value>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &path,
        &access,
        athena_s3::FilePermissionKind::Owner,
    )
    .await;
    let record = match authorization {
        Err(err) => return map_store_error(err, "Failed to authorize managed retention update"),
        Ok(record) => record,
    };
    // A backend lookup failure only means the provider is reported as absent.
    let provider = match file_target(&pool, &record).await {
        Ok(target) => Some(target.provider),
        Err(_) => None,
    };
    let requested = body.into_inner();
    managed_storage_feature_unavailable(
        "/storage/files/{file_id}/retention",
        "managed_file_retention",
        &record,
        provider.as_deref(),
        json!({
            "request": requested,
        }),
    )
}
/// Starts a multipart upload for a managed file on its storage backend.
///
/// The caller must be granted `Write` access to `body.file_id`. A non-empty
/// `body.content_type` is forwarded to the backend. On success the response carries the
/// file id, bucket, storage key and the backend's upload id.
///
/// A backend reply without a usable upload id is reported as an internal error rather
/// than as a success with an empty `upload_id`, since the caller could not sign,
/// complete or abort such an upload.
async fn create_multipart_upload(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<MultipartCreateRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let file = match athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await
    {
        Ok(file) => file,
        Err(err) => return map_store_error(err, "Failed to authorize multipart upload"),
    };
    let target = match file_target(&pool, &file).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    let mut request = client
        .create_multipart_upload()
        .bucket(&target.bucket)
        .key(&file.storage_key);
    if let Some(content_type) = body.content_type.as_deref().and_then(normalize_non_empty) {
        request = request.content_type(content_type);
    }
    match request.send().await {
        Ok(output) => match output.upload_id() {
            Some(upload_id) if !upload_id.is_empty() => api_success(
                "Multipart upload created",
                json!({
                    "file_id": file.id,
                    "bucket": target.bucket,
                    "storage_key": file.storage_key,
                    "upload_id": upload_id,
                }),
            ),
            // Without an upload id none of the follow-up multipart calls can work.
            _ => internal_error(
                "Failed to create multipart upload",
                "storage backend did not return an upload id".to_string(),
            ),
        },
        Err(err) => internal_error("Failed to create multipart upload", err.to_string()),
    }
}
/// Produces a presigned `PUT` URL for uploading one part of a multipart upload.
///
/// The caller must be granted `Write` access to `body.file_id`. The URL's lifetime
/// comes from `STORAGE_URL_CACHE_SETTINGS.presign_expires_seconds`. The response echoes
/// the file id, upload id and part number alongside the URL and an empty header map.
async fn sign_multipart_part(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<MultipartSignPartRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &access,
        athena_s3::FilePermissionKind::Write,
    )
    .await;
    let record = match authorization {
        Err(err) => return map_store_error(err, "Failed to authorize multipart part signing"),
        Ok(record) => record,
    };
    let backend = match file_target(&pool, &record).await {
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
        Ok(backend) => backend,
    };
    let s3 = build_s3_client_with_session_token(
        &backend.endpoint,
        &backend.region,
        &backend.bucket,
        &backend.access_key_id,
        &backend.secret_key,
        backend.session_token.as_deref(),
    )
    .await;
    let lifetime = Duration::from_secs(STORAGE_URL_CACHE_SETTINGS.presign_expires_seconds);
    let presign_config = match PresigningConfig::expires_in(lifetime) {
        Err(err) => return internal_error("Presign config error", err.to_string()),
        Ok(config) => config,
    };
    let signing = s3
        .upload_part()
        .bucket(&backend.bucket)
        .key(&record.storage_key)
        .upload_id(&body.upload_id)
        .part_number(body.part_number)
        .presigned(presign_config)
        .await;
    let presigned = match signing {
        Err(err) => return internal_error("Failed to sign multipart part", err.to_string()),
        Ok(presigned) => presigned,
    };
    api_success(
        "Multipart part signed",
        json!({
            "file_id": record.id,
            "upload_id": body.upload_id,
            "part_number": body.part_number,
            "method": "PUT",
            "url": presigned.uri().to_string(),
            "headers": {},
        }),
    )
}
/// Completes a multipart upload and marks the managed file as uploaded.
///
/// Rejects an empty `parts` list. The caller must be granted `Write` access to
/// `body.file_id`. The supplied parts are sent to the backend in ascending part-number
/// order regardless of the order the client listed them in, because S3 rejects a
/// completion request whose part list is out of order. After the backend confirms, the
/// file's status is set to `"uploaded"` (keeping its recorded size and MIME type) and
/// the updated record is returned.
async fn complete_multipart_upload(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<MultipartCompleteRequest>,
) -> HttpResponse {
    if body.parts.is_empty() {
        return bad_request("Invalid parts", "parts array must not be empty");
    }
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let file = match athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &actor.access_context(),
        athena_s3::FilePermissionKind::Write,
    )
    .await
    {
        Ok(file) => file,
        Err(err) => return map_store_error(err, "Failed to authorize multipart completion"),
    };
    let target = match file_target(&pool, &file).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    // The backend requires ascending part numbers; do not rely on the client's ordering.
    let mut ordered_parts: Vec<_> = body.parts.iter().collect();
    ordered_parts.sort_by_key(|part| part.part_number);
    let completed_parts = ordered_parts
        .into_iter()
        .map(|part| {
            CompletedPart::builder()
                .part_number(part.part_number)
                .e_tag(part.etag.clone())
                .build()
        })
        .collect::<Vec<CompletedPart>>();
    match client
        .complete_multipart_upload()
        .bucket(&target.bucket)
        .key(&file.storage_key)
        .upload_id(&body.upload_id)
        .multipart_upload(
            CompletedMultipartUpload::builder()
                .set_parts(Some(completed_parts))
                .build(),
        )
        .send()
        .await
    {
        Ok(_) => {
            let updated = match athena_s3::update_file_status(
                &pool,
                &file.id,
                "uploaded",
                file.size_bytes,
                file.mime_type.as_deref(),
                None,
                None,
            )
            .await
            {
                Ok(updated) => updated,
                Err(err) => return map_store_error(err, "Failed to finalize multipart upload"),
            };
            api_success(
                "Multipart upload completed",
                FileMutationResponse { file: updated },
            )
        }
        Err(err) => internal_error("Failed to complete multipart upload", err.to_string()),
    }
}
/// Aborts an in-progress multipart upload for a managed file.
///
/// The caller must be granted `Write` access to `body.file_id`. On success the response
/// echoes the file id and upload id with `"aborted": true`.
async fn abort_multipart_upload(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<MultipartAbortRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let access = caller.access_context();
    let authorization = athena_s3::authorize_file_access(
        &pool,
        &body.file_id,
        &access,
        athena_s3::FilePermissionKind::Write,
    )
    .await;
    let record = match authorization {
        Err(err) => return map_store_error(err, "Failed to authorize multipart abort"),
        Ok(record) => record,
    };
    let backend = match file_target(&pool, &record).await {
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
        Ok(backend) => backend,
    };
    let s3 = build_s3_client_with_session_token(
        &backend.endpoint,
        &backend.region,
        &backend.bucket,
        &backend.access_key_id,
        &backend.secret_key,
        backend.session_token.as_deref(),
    )
    .await;
    let outcome = s3
        .abort_multipart_upload()
        .bucket(&backend.bucket)
        .key(&record.storage_key)
        .upload_id(&body.upload_id)
        .send()
        .await;
    if let Err(err) = outcome {
        return internal_error("Failed to abort multipart upload", err.to_string());
    }
    api_success(
        "Multipart upload aborted",
        json!({ "file_id": record.id, "upload_id": body.upload_id, "aborted": true }),
    )
}
async fn list_multipart_parts(
req: HttpRequest,
state: Data<AppState>,
body: Json<MultipartListPartsRequest>,
) -> HttpResponse {
let actor = match required_actor(&req, state.get_ref()).await {
Ok(actor) => actor,
Err(resp) => return resp,
};
let pool = match required_client_pool(&req, state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let file = match athena_s3::authorize_file_access(
&pool,
&body.file_id,
&actor.access_context(),
athena_s3::FilePermissionKind::Write,
)
.await
{
Ok(file) => file,
Err(err) => return map_store_error(err, "Failed to authorize multipart listing"),
};
let target = match file_target(&pool, &file).await {
Ok(target) => target,
Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
};
let client = build_s3_client_with_session_token(
&target.endpoint,
&target.region,
&target.bucket,
&target.access_key_id,
&target.secret_key,
target.session_token.as_deref(),
)
.await;
match client
.list_parts()
.bucket(&target.bucket)
.key(&file.storage_key)
.upload_id(&body.upload_id)
.send()
.await
{
Ok(output) => api_success(
"Multipart parts loaded",
json!({
"file_id": file.id,
"upload_id": body.upload_id,
"parts": output.parts().iter().map(|part| json!({
"part_number": part.part_number(),
"etag": part.e_tag(),
"size": part.size(),
"last_modified": part.last_modified().map(|value| value.to_string()),
})).collect::<Vec<Value>>(),
}),
),
Err(err) => internal_error("Failed to list multipart parts", err.to_string()),
}
}
/// Deletes every tracked file under a storage prefix.
///
/// Answers "not found" when no tracked files match the prefix. All matched files are
/// authorized for `Delete` access before anything is removed. The handler then runs two
/// passes: first it deletes each object from the backend, then it marks each record
/// deleted, invalidates its cached entry and writes a best-effort action-log entry.
/// The storage backend is resolved from the first matched file and used for all files.
async fn delete_folder(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<FolderDeleteRequest>,
) -> HttpResponse {
    let app = state.get_ref();
    let caller = match required_actor(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let pool = match required_client_pool(&req, app).await {
        Err(response) => return response,
        Ok(resolved) => resolved,
    };
    let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
        Err(response) => return response,
        Ok(parsed) => parsed,
    };
    // A caller without an organization is passed to the store as an empty string.
    let organization = caller.organization_id.as_deref().unwrap_or("");
    let listing =
        athena_s3::list_files_for_prefix(&pool, organization, s3_id, &body.prefix).await;
    let files = match listing {
        Err(err) => return map_store_error(err, "Failed to list folder files"),
        Ok(files) => files,
    };
    let Some(first_file) = files.first() else {
        return not_found(
            "Folder not found",
            "no tracked files were found for the supplied prefix",
        );
    };
    let access = caller.access_context();
    for entry in &files {
        let authorization = athena_s3::authorize_file_access(
            &pool,
            &entry.id,
            &access,
            athena_s3::FilePermissionKind::Delete,
        )
        .await;
        if let Err(err) = authorization {
            return map_store_error(err, "Failed to authorize folder delete");
        }
    }
    let backend = match file_target(&pool, first_file).await {
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
        Ok(backend) => backend,
    };
    let s3 = build_s3_client_with_session_token(
        &backend.endpoint,
        &backend.region,
        &backend.bucket,
        &backend.access_key_id,
        &backend.secret_key,
        backend.session_token.as_deref(),
    )
    .await;
    // Pass 1: remove the objects from the backend.
    for entry in &files {
        let removal = s3
            .delete_object()
            .bucket(&backend.bucket)
            .key(&entry.storage_key)
            .send()
            .await;
        if let Err(err) = removal {
            return internal_error("Failed to delete folder objects", err.to_string());
        }
    }
    // Pass 2: update metadata, drop cached entries, and log each deletion.
    for entry in &files {
        if let Err(err) = athena_s3::mark_file_deleted(&pool, &entry.id).await {
            return map_store_error(err, "Failed to update folder file state");
        }
        invalidate_cached_entry(app, &pool, &entry.id).await;
        best_effort_action_log(
            &pool,
            &caller,
            &entry.id,
            "/storage/folders/delete",
            "storage.folder.delete",
            athena_s3::ManagedFileActionStatus::Success,
            "Folder file deleted".to_string(),
            json!({ "prefix": body.prefix }),
            json!({ "storage_key": entry.storage_key }),
        )
        .await;
    }
    let processed_files = files.len();
    api_success(
        "Folder deleted",
        FolderMutationResponse {
            s3_id: body.s3_id.clone(),
            prefix: body.prefix.clone(),
            processed_files,
        },
    )
}
/// Moves every tracked file under `from_prefix` to `to_prefix` on one storage backend.
///
/// Flow: resolve the actor and client pool, validate the request (`s3_id` must be a
/// UUID, both prefixes must normalize and differ), list the tracked files for the
/// source prefix, require `Write` permission on every file, then relocate each file.
///
/// Per file the order is: copy the object to the new key, update the stored file
/// location, invalidate cached URLs, delete the old object, and record an audit entry.
/// The metadata row is updated *before* the old object is deleted so that a failure
/// in either step never leaves the record pointing at an object that no longer
/// exists; the worst case is an orphaned old object.
///
/// Returns a success response carrying the number of files that were actually moved,
/// or the first error response encountered (files processed before the failure stay
/// moved).
async fn move_folder(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<FolderMoveRequest>,
) -> HttpResponse {
    let actor = match required_actor(&req, state.get_ref()).await {
        Ok(actor) => actor,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let s3_id = match parse_uuid(&body.s3_id, "s3_id") {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let from_prefix = match normalize_prefix(&body.from_prefix) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let to_prefix = match normalize_prefix(&body.to_prefix) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    if from_prefix == to_prefix {
        return bad_request(
            "Invalid destination prefix",
            "from_prefix and to_prefix must differ",
        );
    }
    let files = match athena_s3::list_files_for_prefix(
        &pool,
        actor.organization_id.as_deref().unwrap_or(""),
        s3_id,
        &from_prefix,
    )
    .await
    {
        Ok(files) => files,
        Err(err) => return map_store_error(err, "Failed to list folder files"),
    };
    if files.is_empty() {
        return not_found(
            "Folder not found",
            "no tracked files were found for the supplied prefix",
        );
    }
    // Authorize everything up front so a permission failure cannot leave the folder
    // half-moved.
    let access_context = actor.access_context();
    for file in &files {
        if let Err(err) = athena_s3::authorize_file_access(
            &pool,
            &file.id,
            &access_context,
            athena_s3::FilePermissionKind::Write,
        )
        .await
        {
            return map_store_error(err, "Failed to authorize folder move");
        }
    }
    // The storage backend is resolved from the first file only; the listing was
    // requested for a single `s3_id`.
    let target = match file_target(&pool, &files[0]).await {
        Ok(target) => target,
        Err(err) => return map_store_error(err, "Failed to resolve storage backend"),
    };
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &target.bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    // Count only the files that were really relocated; entries whose key does not
    // start with the source prefix are skipped and must not be reported as processed.
    let mut processed_files = 0usize;
    for file in &files {
        let Some(suffix) = file.storage_key.strip_prefix(&from_prefix) else {
            continue;
        };
        let new_storage_key = format!("{to_prefix}{suffix}");
        // NOTE(review): the S3 CopyObject API documents the copy source as URL-encoded;
        // keys containing special characters may need encoding here — confirm against
        // the SDK behaviour.
        let copy_source = format!("{}/{}", target.bucket, file.storage_key);
        if let Err(err) = client
            .copy_object()
            .bucket(&target.bucket)
            .copy_source(copy_source)
            .key(&new_storage_key)
            .send()
            .await
        {
            return internal_error("Failed to move folder objects", err.to_string());
        }
        // Point the record at the new key while the old object still exists.
        if let Err(err) =
            athena_s3::update_file_location(&pool, &file.id, &new_storage_key, None).await
        {
            return map_store_error(err, "Failed to update folder file metadata");
        }
        invalidate_cached_entry(state.get_ref(), &pool, &file.id).await;
        if let Err(err) = client
            .delete_object()
            .bucket(&target.bucket)
            .key(&file.storage_key)
            .send()
            .await
        {
            return internal_error("Failed to finalize folder move", err.to_string());
        }
        best_effort_action_log(
            &pool,
            &actor,
            &file.id,
            "/storage/folders/move",
            "storage.folder.move",
            athena_s3::ManagedFileActionStatus::Success,
            "Folder file moved".to_string(),
            json!({ "from_prefix": from_prefix, "to_prefix": to_prefix }),
            json!({ "storage_key": new_storage_key }),
        )
        .await;
        processed_files += 1;
    }
    api_success(
        "Folder moved",
        FolderMutationResponse {
            s3_id: body.s3_id.clone(),
            prefix: to_prefix,
            processed_files,
        },
    )
}
/// Registers (or refreshes) a managed file record and returns a presigned upload URL.
///
/// Steps, in order:
/// 1. Parse `s3_id` and load the matching storage target.
/// 2. Pick the bucket: the request's bucket when it is non-blank, otherwise the
///    target's bucket; then normalize the storage key.
/// 3. If a file already exists at that path, require `Write` access to it.
/// 4. Upsert the file metadata, presign a PUT URL, invalidate cached URLs for the
///    file and write a best-effort audit entry.
///
/// Any failure is returned as a ready-made `HttpResponse`.
async fn create_upload_url_inner(
    pool: &PgPool,
    app_state: &AppState,
    actor: &ActorContext,
    body: &UploadUrlRequest,
) -> Result<UploadUrlResponse, HttpResponse> {
    let descriptor = &body.file;

    let s3_id = parse_uuid(&descriptor.s3_id, "s3_id")?;
    let target = athena_s3::load_managed_storage_target(pool, s3_id)
        .await
        .map_err(|err| map_store_error(err, "Failed to resolve storage backend"))?;

    let bucket = match normalize_bucket(descriptor.bucket.as_deref()) {
        Some(requested) => requested,
        None => target.bucket.clone(),
    };
    ensure_bucket_present(&bucket)?;
    let storage_key = normalize_storage_key(&descriptor.storage_key)?;

    // Overwriting an existing object requires write permission on it.
    let existing = athena_s3::find_existing_file_by_path(pool, s3_id, &bucket, &storage_key)
        .await
        .map_err(|err| map_store_error(err, "Failed to resolve existing file"))?;
    if let Some(existing) = existing {
        let access = actor.access_context();
        athena_s3::authorize_file_access(
            pool,
            &existing.id,
            &access,
            athena_s3::FilePermissionKind::Write,
        )
        .await
        .map_err(|err| map_store_error(err, "Failed to authorize existing file"))?;
    }

    // The explicit request content type wins over the one in the file descriptor.
    let mime_type = body
        .content_type
        .clone()
        .or_else(|| descriptor.mime_type.clone());
    let upsert_input = athena_s3::ManagedFileUpsertInput {
        file_id: descriptor.file_id.clone(),
        s3_id,
        bucket: bucket.clone(),
        storage_key: storage_key.clone(),
        organization_id: actor.organization_id.clone(),
        uploaded_by_user_id: Some(actor.user_id.clone()),
        name: descriptor.name.clone(),
        original_name: descriptor.original_name.clone(),
        mime_type,
        size_bytes: descriptor.size_bytes,
        resource_id: descriptor.resource_id.clone(),
        is_public: descriptor.public.unwrap_or(false),
        visibility: descriptor.visibility.clone(),
        metadata: descriptor.metadata.clone().unwrap_or_else(|| json!({})),
    };
    let file = athena_s3::upsert_managed_file_for_upload(pool, &upsert_input)
        .await
        .map_err(|err| map_store_error(err, "Failed to upsert file metadata"))?;

    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;
    let upload =
        generate_presigned_put_response(&client, &file, &bucket, body.content_type.as_deref())
            .await?;

    // Drop cached URLs for this file now that an upload has been issued for it.
    invalidate_cached_entry(app_state, pool, &file.id).await;

    let request_payload = json!({
        "storage_key": file.storage_key,
        "bucket": file.bucket,
    });
    let result_payload = json!({
        "upload": upload,
    });
    best_effort_action_log(
        pool,
        actor,
        &file.id,
        "/storage/files/upload-url",
        "storage.file.upload_url",
        athena_s3::ManagedFileActionStatus::Success,
        "Upload URL generated".to_string(),
        request_payload,
        result_payload,
    )
    .await;

    Ok(UploadUrlResponse { file, upload })
}
/// Presigns a GET URL for `file` in `bucket` and wraps it in a cache record.
///
/// The URL lifetime comes from `STORAGE_URL_CACHE_SETTINGS.presign_expires_seconds`.
/// The issue time is captured *before* signing so the recorded `expires_at` is never
/// later than the moment the URL really stops working.
///
/// # Errors
/// Returns an internal-error response when the presigning configuration is rejected
/// or the SDK fails to produce the URL.
async fn generate_presigned_get_entry(
    client: &S3Client,
    file: &athena_s3::ManagedFileRecord,
    purpose: &str,
    bucket: &str,
) -> Result<athena_s3::PresignedUrlCacheRecord, HttpResponse> {
    let settings = &*STORAGE_URL_CACHE_SETTINGS;
    let presign_config =
        PresigningConfig::expires_in(Duration::from_secs(settings.presign_expires_seconds))
            .map_err(|err| internal_error("Presign config error", err.to_string()))?;
    // Read the clock before signing: reading it afterwards would overstate the
    // remaining lifetime by however long signing took.
    let issued_at = Utc::now();
    let presigned = client
        .get_object()
        .bucket(bucket)
        .key(&file.storage_key)
        .presigned(presign_config)
        .await
        .map_err(|err| internal_error("Failed to generate presigned URL", err.to_string()))?;
    let expires_at =
        issued_at + chrono::Duration::seconds(settings.presign_expires_seconds as i64);
    Ok(athena_s3::PresignedUrlCacheRecord {
        file_id: file.id.clone(),
        purpose: purpose.to_string(),
        bucket: bucket.to_string(),
        storage_key: file.storage_key.clone(),
        url: presigned.uri().to_string(),
        expires_at,
        expires_at_epoch_seconds: expires_at.timestamp(),
        created_at: issued_at,
        metadata: json!({
            "source": "storage.files.url",
        }),
    })
}
/// Presigns a PUT (upload) URL for `file` in `bucket`.
///
/// A non-blank `content_type` is attached to the signed request. The response is
/// always marked as freshly generated (`cache_hit: false`, layer `"generated"`).
/// The issue time is captured *before* signing so the reported `expires_at` is never
/// later than the moment the URL really stops working.
///
/// # Errors
/// Returns an internal-error response when the presigning configuration is rejected
/// or the SDK fails to produce the URL.
async fn generate_presigned_put_response(
    client: &S3Client,
    file: &athena_s3::ManagedFileRecord,
    bucket: &str,
    content_type: Option<&str>,
) -> Result<PresignedFileUrlResponse, HttpResponse> {
    let settings = &*STORAGE_URL_CACHE_SETTINGS;
    let presign_config =
        PresigningConfig::expires_in(Duration::from_secs(settings.presign_expires_seconds))
            .map_err(|err| internal_error("Presign config error", err.to_string()))?;
    let mut request = client.put_object().bucket(bucket).key(&file.storage_key);
    if let Some(content_type) = content_type.and_then(normalize_non_empty) {
        request = request.content_type(content_type);
    }
    // Read the clock before signing: reading it afterwards would overstate the
    // remaining lifetime by however long signing took.
    let issued_at = Utc::now();
    let presigned = request
        .presigned(presign_config)
        .await
        .map_err(|err| internal_error("Failed to generate upload URL", err.to_string()))?;
    let expires_at =
        issued_at + chrono::Duration::seconds(settings.presign_expires_seconds as i64);
    Ok(PresignedFileUrlResponse {
        file_id: file.id.clone(),
        bucket: bucket.to_string(),
        storage_key: file.storage_key.clone(),
        purpose: "upload".to_string(),
        url: presigned.uri().to_string(),
        expires_at: expires_at.to_rfc3339(),
        expires_at_epoch_seconds: expires_at.timestamp(),
        expires_in: settings.presign_expires_seconds,
        cache_hit: false,
        cache_layer: "generated".to_string(),
    })
}
/// Looks up a still-usable cached presigned URL for `file` and `purpose`.
///
/// The in-process cache is consulted first (reported as layer `"moka"`); after that
/// the layers returned by `cache_read_attempts` are tried in order. A usable entry
/// found in one of those layers is copied into the in-process cache before it is
/// returned. Returns `None` when no layer holds a usable entry.
async fn load_cached_url_after_auth(
    app_state: &AppState,
    pool: &PgPool,
    file: &athena_s3::ManagedFileRecord,
    purpose: &str,
) -> Option<PresignedFileUrlResponse> {
    let settings = &*STORAGE_URL_CACHE_SETTINGS;
    let cache_key = local_cache_key(&file.id, purpose);

    let local_hit = app_state
        .storage_file_url_cache
        .get(&cache_key)
        .await
        .filter(|entry| cache_entry_is_usable(entry, file, settings));
    if let Some(entry) = local_hit {
        return Some(presigned_response(&entry, true, "moka"));
    }

    for layer in cache_read_attempts(settings) {
        let candidate = match layer {
            CacheLayer::Postgres => load_cached_entry_from_postgres(pool, &file.id, purpose).await,
            CacheLayer::Redis => load_cached_entry_from_redis(file, purpose).await,
        };
        let Some(entry) = candidate else {
            continue;
        };
        if !cache_entry_is_usable(&entry, file, settings) {
            continue;
        }
        // Promote the hit so the next lookup is served from process memory.
        app_state
            .storage_file_url_cache
            .insert(cache_key.clone(), entry.clone())
            .await;
        return Some(presigned_response(&entry, true, layer.as_str()));
    }

    None
}
async fn write_cached_entry(
app_state: &AppState,
pool: &PgPool,
entry: &athena_s3::PresignedUrlCacheRecord,
) {
let settings = &*STORAGE_URL_CACHE_SETTINGS;
let cache_key = local_cache_key(&entry.file_id, &entry.purpose);
app_state
.storage_file_url_cache
.insert(cache_key, entry.clone())
.await;
for layer in cache_write_targets(settings) {
match layer {
CacheLayer::Postgres => {
let _ = athena_s3::upsert_cached_presigned_url(pool, entry).await;
}
CacheLayer::Redis => {
write_cached_entry_to_redis(entry, settings).await;
}
}
}
}
async fn invalidate_cached_entry(app_state: &AppState, pool: &PgPool, file_id: &str) {
for purpose in ["read", "download", "stream"] {
app_state
.storage_file_url_cache
.invalidate(&local_cache_key(file_id, purpose))
.await;
let _ = athena_s3::delete_cached_presigned_url(pool, file_id, Some(purpose)).await;
delete_cached_entry_from_redis(file_id, purpose).await;
}
}
/// Reads a cached presigned URL record from Postgres.
///
/// A lookup error is logged as a warning and reported as a cache miss (`None`).
async fn load_cached_entry_from_postgres(
    pool: &PgPool,
    file_id: &str,
    purpose: &str,
) -> Option<athena_s3::PresignedUrlCacheRecord> {
    athena_s3::load_cached_presigned_url(pool, file_id, purpose)
        .await
        .unwrap_or_else(|err| {
            warn!(file_id = %file_id, purpose = %purpose, error = %err, "failed to load cached presigned url from postgres");
            None
        })
}
/// Reads a cached presigned URL record from Redis.
///
/// Returns `None` without touching Redis while the bypass cooldown is active or when
/// no global client is configured. A JSON `null` reply is a clean miss. Both a failed
/// read and a payload that cannot be deserialized are logged, start the Redis
/// cooldown and are reported as a miss.
async fn load_cached_entry_from_redis(
    file: &athena_s3::ManagedFileRecord,
    purpose: &str,
) -> Option<athena_s3::PresignedUrlCacheRecord> {
    if should_bypass_redis_temporarily() {
        return None;
    }
    let redis = GLOBAL_REDIS.get()?;

    let raw = match redis.get(&redis_cache_key(&file.id, purpose)).await {
        Ok(raw) => raw,
        Err(err) => {
            note_redis_failure_and_start_cooldown();
            warn!(file_id = %file.id, purpose = %purpose, error = %err, "failed to load cached presigned url from redis");
            return None;
        }
    };

    if raw.is_null() {
        note_redis_success();
        return None;
    }

    match serde_json::from_value::<athena_s3::PresignedUrlCacheRecord>(raw) {
        Ok(entry) => {
            note_redis_success();
            Some(entry)
        }
        Err(err) => {
            note_redis_failure_and_start_cooldown();
            warn!(file_id = %file.id, purpose = %purpose, error = %err, "failed to deserialize cached presigned url from redis");
            None
        }
    }
}
/// Writes a presigned URL record to Redis with a bounded TTL.
///
/// Does nothing while the bypass cooldown is active or when no global client is
/// configured. The TTL is the smaller of `settings.cache_ttl_seconds` and the seconds
/// left until the URL expires, where the latter is clamped to at least one second.
/// Serialization and write failures are logged; a write failure also starts the
/// Redis cooldown.
async fn write_cached_entry_to_redis(
    entry: &athena_s3::PresignedUrlCacheRecord,
    settings: &StorageUrlCacheSettings,
) {
    if should_bypass_redis_temporarily() {
        return;
    }
    let redis = match GLOBAL_REDIS.get() {
        Some(redis) => redis,
        None => return,
    };

    let seconds_left = entry
        .expires_at_epoch_seconds
        .saturating_sub(Utc::now().timestamp())
        .max(1) as u64;
    let ttl = settings.cache_ttl_seconds.min(seconds_left);

    let payload = match serde_json::to_value(entry) {
        Ok(payload) => payload,
        Err(err) => {
            warn!(file_id = %entry.file_id, purpose = %entry.purpose, error = %err, "failed to serialize cached presigned url for redis");
            return;
        }
    };

    let key = redis_cache_key(&entry.file_id, &entry.purpose);
    if let Err(err) = redis.set_with_ttl(&key, &payload, ttl).await {
        note_redis_failure_and_start_cooldown();
        warn!(file_id = %entry.file_id, purpose = %entry.purpose, error = %err, "failed to write cached presigned url to redis");
    } else {
        note_redis_success();
    }
}
/// Deletes the cached presigned URL for `file_id` / `purpose` from Redis.
///
/// Does nothing while the bypass cooldown is active or when no global client is
/// configured. A failed delete is logged and starts the Redis cooldown.
async fn delete_cached_entry_from_redis(file_id: &str, purpose: &str) {
    if should_bypass_redis_temporarily() {
        return;
    }
    let redis = match GLOBAL_REDIS.get() {
        Some(redis) => redis,
        None => return,
    };

    let key = redis_cache_key(file_id, purpose);
    if let Err(err) = redis.delete(&key).await {
        note_redis_failure_and_start_cooldown();
        warn!(file_id = %file_id, purpose = %purpose, error = %err, "failed to delete cached presigned url from redis");
    } else {
        note_redis_success();
    }
}
/// Loads the managed storage target referenced by `file.s3_id`.
///
/// # Errors
/// Returns `InvalidInput` when the file has no `s3_id` or when it is not a valid
/// UUID; otherwise forwards whatever `load_managed_storage_target` returns.
async fn file_target(
    pool: &PgPool,
    file: &athena_s3::ManagedFileRecord,
) -> Result<athena_s3::ManagedStorageTarget, athena_s3::S3CredentialStoreError> {
    let Some(raw_id) = file.s3_id.as_deref() else {
        return Err(athena_s3::S3CredentialStoreError::InvalidInput(
            "file is missing s3_id".to_string(),
        ));
    };
    let s3_id = match Uuid::parse_str(raw_id) {
        Ok(parsed) => parsed,
        Err(_) => {
            return Err(athena_s3::S3CredentialStoreError::InvalidInput(
                "file has invalid s3_id".to_string(),
            ));
        }
    };
    athena_s3::load_managed_storage_target(pool, s3_id).await
}
/// Authorizes a read of `file_id` and prepares what is needed to fetch the object.
///
/// The access context is resolved from the request on a best-effort basis (see
/// `optional_access_context`), `Read` permission is checked, and an S3 client is
/// built for the file's storage target. The returned bucket is the storage target's
/// bucket.
///
/// # Errors
/// Authorization and storage-target failures are mapped to responses through
/// `map_store_error`.
async fn resolve_authorized_file_read_context(
    pool: &PgPool,
    app_state: &AppState,
    req: &HttpRequest,
    file_id: &str,
) -> Result<AuthorizedFileReadContext, HttpResponse> {
    let access_context = optional_access_context(req, app_state).await;

    let authorization = athena_s3::authorize_file_access(
        pool,
        file_id,
        &access_context,
        athena_s3::FilePermissionKind::Read,
    )
    .await;
    let file = authorization.map_err(|err| map_store_error(err, "Failed to authorize file"))?;

    let target = match file_target(pool, &file).await {
        Ok(target) => target,
        Err(err) => return Err(map_store_error(err, "Failed to resolve storage backend")),
    };

    let bucket = target.bucket.clone();
    let client = build_s3_client_with_session_token(
        &target.endpoint,
        &target.region,
        &bucket,
        &target.access_key_id,
        &target.secret_key,
        target.session_token.as_deref(),
    )
    .await;

    let context = AuthorizedFileReadContext {
        file,
        client,
        bucket,
    };
    Ok(context)
}
/// Decides whether a cached URL may still be handed out for `file`.
///
/// The entry must reference the file's current bucket and storage key, and must have
/// strictly more than `settings.min_remaining_seconds` left before it expires.
fn cache_entry_is_usable(
    entry: &athena_s3::PresignedUrlCacheRecord,
    file: &athena_s3::ManagedFileRecord,
    settings: &StorageUrlCacheSettings,
) -> bool {
    let same_object = entry.bucket == file.bucket && entry.storage_key == file.storage_key;
    if !same_object {
        return false;
    }
    let seconds_left = entry
        .expires_at_epoch_seconds
        .saturating_sub(Utc::now().timestamp());
    seconds_left > settings.min_remaining_seconds as i64
}
/// Converts a cache record into the API response shape.
///
/// `expires_in` is the number of seconds left until `expires_at_epoch_seconds`,
/// floored at zero for entries that have already expired. `cache_hit` and
/// `cache_layer` are passed through as given.
fn presigned_response(
    entry: &athena_s3::PresignedUrlCacheRecord,
    cache_hit: bool,
    cache_layer: &str,
) -> PresignedFileUrlResponse {
    let seconds_left = entry
        .expires_at_epoch_seconds
        .saturating_sub(Utc::now().timestamp());
    let expires_in = if seconds_left > 0 {
        seconds_left as u64
    } else {
        0
    };

    PresignedFileUrlResponse {
        file_id: entry.file_id.clone(),
        bucket: entry.bucket.clone(),
        storage_key: entry.storage_key.clone(),
        purpose: entry.purpose.clone(),
        url: entry.url.clone(),
        expires_at: entry.expires_at.to_rfc3339(),
        expires_at_epoch_seconds: entry.expires_at_epoch_seconds,
        expires_in,
        cache_hit,
        cache_layer: cache_layer.to_owned(),
    }
}
/// Builds the in-process cache key, formatted as `<purpose>:<file_id>`.
fn local_cache_key(file_id: &str, purpose: &str) -> String {
    [purpose, file_id].join(":")
}
/// Builds the Redis key: the configured `redis_prefix` followed by
/// `<purpose>:<file_id>`.
fn redis_cache_key(file_id: &str, purpose: &str) -> String {
    let prefix = &STORAGE_URL_CACHE_SETTINGS.redis_prefix;
    format!("{prefix}{purpose}:{file_id}")
}
/// Parses `raw` as a UUID.
///
/// On failure returns a bad-request response whose messages name `field`.
fn parse_uuid(raw: &str, field: &str) -> Result<Uuid, HttpResponse> {
    match Uuid::parse_str(raw) {
        Ok(parsed) => Ok(parsed),
        Err(_) => Err(bad_request(
            format!("Invalid {field}"),
            format!("{field} must be a UUID"),
        )),
    }
}
/// Normalizes a storage key by trimming whitespace and leading/trailing slashes.
///
/// # Errors
/// Returns a bad-request response when the result is empty or contains `..`
/// anywhere in the key.
fn normalize_storage_key(value: &str) -> Result<String, HttpResponse> {
    let normalized = value.trim().trim_matches('/');

    let rejection = if normalized.is_empty() {
        Some("storage_key must not be empty")
    } else if normalized.contains("..") {
        Some("storage_key must not contain path traversal segments")
    } else {
        None
    };

    match rejection {
        Some(detail) => Err(bad_request("Invalid storage key", detail)),
        None => Ok(normalized.to_string()),
    }
}
/// Normalizes a folder prefix to the form `segment/.../` (no leading slash, exactly
/// one trailing slash).
///
/// # Errors
/// Returns a bad-request response when the prefix is empty after trimming, or when it
/// contains `..`. The traversal check mirrors `normalize_storage_key`: prefixes are
/// concatenated into storage keys by folder operations, so they must obey the same
/// rule.
fn normalize_prefix(value: &str) -> Result<String, HttpResponse> {
    let normalized = value.trim().trim_matches('/');
    if normalized.is_empty() {
        return Err(bad_request("Invalid prefix", "prefix must not be empty"));
    }
    if normalized.contains("..") {
        return Err(bad_request(
            "Invalid prefix",
            "prefix must not contain path traversal segments",
        ));
    }
    Ok(format!("{normalized}/"))
}
/// Returns the trimmed bucket name, or `None` when it is absent or blank.
fn normalize_bucket(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Resolves the visibility requested by a set-visibility body.
///
/// An explicit `visibility` takes precedence: it is trimmed, lower-cased and must be
/// `private`, `organization` or `public`. Without it, the `public` flag maps `true`
/// to `public` and `false` to `private`.
///
/// # Errors
/// Returns a bad-request response for an unsupported `visibility` value, or when
/// neither field is supplied.
fn resolve_requested_visibility(body: &SetVisibilityRequest) -> Result<String, HttpResponse> {
    match (body.visibility.as_deref(), body.public) {
        (Some(raw), _) => {
            let normalized = raw.trim().to_ascii_lowercase();
            match normalized.as_str() {
                "private" | "organization" | "public" => Ok(normalized),
                _ => Err(bad_request(
                    "Invalid visibility",
                    "visibility must be one of private, organization, or public",
                )),
            }
        }
        (None, Some(true)) => Ok("public".to_string()),
        (None, Some(false)) => Ok("private".to_string()),
        (None, None) => Err(bad_request(
            "Invalid visibility",
            "provide either visibility or public",
        )),
    }
}
/// Resolves the requested visibility for a bulk request by reusing the single-file
/// rules of `resolve_requested_visibility`.
fn resolve_requested_visibility_from_many(
    body: &FileVisibilityManyRequest,
) -> Result<String, HttpResponse> {
    let single = SetVisibilityRequest {
        public: body.public,
        visibility: body.visibility.clone(),
    };
    resolve_requested_visibility(&single)
}
/// Returns the trimmed text as an owned string, or `None` when nothing is left after
/// trimming.
fn normalize_non_empty(value: &str) -> Option<String> {
    Some(value.trim())
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_owned)
}
/// Normalizes a URL purpose to lower case.
///
/// Only `read`, `download` and `stream` are accepted (after trimming and
/// lower-casing); anything else, including a missing value, yields `None`.
fn normalize_purpose(value: Option<&str>) -> Option<String> {
    const SUPPORTED: [&str; 3] = ["read", "download", "stream"];

    let candidate = value.map(|raw| raw.trim().to_ascii_lowercase())?;
    SUPPORTED
        .contains(&candidate.as_str())
        .then_some(candidate)
}
/// Picks the filename to advertise when proxying a file.
///
/// Preference order: the non-blank `original_name`, then the non-blank `name`, then
/// the last `/`-separated segment of the storage key, and finally the literal
/// `file`.
fn preferred_proxy_filename(file: &athena_s3::ManagedFileRecord) -> String {
    if let Some(original) = file.original_name.as_deref().and_then(normalize_non_empty) {
        return original;
    }
    if let Some(display_name) = normalize_non_empty(&file.name) {
        return display_name;
    }
    let key_tail = file
        .storage_key
        .rsplit('/')
        .next()
        .and_then(normalize_non_empty);
    match key_tail {
        Some(tail) => tail,
        None => "file".to_string(),
    }
}
/// Makes a filename safe to embed in a quoted header parameter.
///
/// Double quotes, backslashes and every control character (including CR and LF) are
/// replaced with `_`; the result is then trimmed, and `file` is returned when
/// nothing remains.
fn sanitize_header_filename(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let needs_replacement = matches!(ch, '"' | '\\') || ch.is_control();
        cleaned.push(if needs_replacement { '_' } else { ch });
    }

    match cleaned.trim() {
        "" => "file".to_string(),
        kept => kept.to_string(),
    }
}
/// Builds the `Content-Disposition` value for a proxied file.
///
/// The `download` purpose yields `attachment`; every other purpose yields `inline`.
/// The filename is the sanitized preferred proxy filename, wrapped in quotes.
fn proxy_content_disposition(file: &athena_s3::ManagedFileRecord, purpose: &str) -> String {
    let disposition = match purpose {
        "download" => "attachment",
        _ => "inline",
    };
    let raw_name = preferred_proxy_filename(file);
    let filename = sanitize_header_filename(&raw_name);
    format!("{disposition}; filename=\"{filename}\"")
}
/// Computes the public URL of an object without signing it.
///
/// Resolution order:
/// 1. A non-blank `public_base_url` on the target: `<base>/<key>` (the bucket is not
///    part of the URL in this case).
/// 2. Path-style addressing when `force_path_style` is set:
///    `<endpoint>/<bucket>/<key>`.
/// 3. Virtual-hosted addressing for `https://` / `http://` endpoints:
///    `<scheme><bucket>.<host>/<key>`.
/// 4. Otherwise path-style as a fallback.
///
/// Trailing slashes on the base URL / endpoint and leading slashes on the key are
/// dropped so the joined URL never contains a doubled separator. The virtual-hosted
/// branch previously skipped that trimming and produced `host//key` for endpoints
/// configured with a trailing slash.
///
/// # Errors
/// Returns a service-unavailable response when no public base URL is configured and
/// the endpoint is blank.
fn compute_public_object_url(
    target: &athena_s3::ManagedStorageTarget,
    bucket: &str,
    storage_key: &str,
) -> Result<String, HttpResponse> {
    let key = storage_key.trim_start_matches('/');
    if let Some(base_url) = target
        .public_base_url
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        let base = base_url.trim_end_matches('/');
        return Ok(format!("{base}/{key}"));
    }
    let endpoint = target.endpoint.trim();
    if endpoint.is_empty() {
        return Err(service_unavailable(
            "Storage backend unavailable",
            "catalog endpoint is required to compute a public URL",
        ));
    }
    if !target.force_path_style {
        for scheme in ["https://", "http://"] {
            if let Some(host) = endpoint.strip_prefix(scheme) {
                // Trim only the host part so the scheme's own slashes stay intact.
                let host = host.trim_end_matches('/');
                return Ok(format!("{scheme}{bucket}.{host}/{key}"));
            }
        }
    }
    // Path-style: either requested explicitly or the endpoint has no known scheme.
    Ok(format!(
        "{}/{}/{}",
        endpoint.trim_end_matches('/'),
        bucket,
        key
    ))
}
/// Builds a file access context from the request when an actor can be resolved.
///
/// Both a resolution error and the absence of an actor fall back to the default
/// `FileAccessContext`. `team_ids` is always left empty.
async fn optional_access_context(
    req: &HttpRequest,
    app_state: &AppState,
) -> athena_s3::FileAccessContext {
    let resolved = resolve_optional_storage_actor(req, app_state).await;
    match resolved {
        Ok(Some(actor)) => athena_s3::FileAccessContext {
            user_id: Some(actor.user_id),
            organization_id: actor.active_organization_id,
            organization_ids: actor.organization_ids,
            role_ids: actor.role_ids,
            team_ids: Vec::new(),
            is_admin: actor.is_admin,
        },
        _ => athena_s3::FileAccessContext::default(),
    }
}
/// Authenticated caller of a storage endpoint, as captured by `required_actor`.
#[derive(Debug, Clone)]
struct ActorContext {
/// Identifier of the authenticated user.
user_id: String,
/// The actor's active organization, if any (filled from `active_organization_id`).
organization_id: Option<String>,
/// Organization identifiers carried over from the resolved storage actor.
organization_ids: Vec<String>,
/// Role identifiers carried over from the resolved storage actor.
role_ids: Vec<String>,
/// Admin flag carried over from the resolved storage actor.
is_admin: bool,
/// Session identifier, when the resolved storage actor provides one.
session_id: Option<String>,
}
impl ActorContext {
/// Returns the file access context for this actor by delegating to
/// `actor_access_context` with `Some(self)`.
fn access_context(&self) -> athena_s3::FileAccessContext {
actor_access_context(Some(self))
}
}
/// Converts an optional actor into a file access context.
///
/// With an actor, its user, organization, role and admin data are copied over.
/// Without one, the context carries no identity and `is_admin` is `false`.
/// `team_ids` is empty in both cases.
fn actor_access_context(actor: Option<&ActorContext>) -> athena_s3::FileAccessContext {
    match actor {
        Some(actor) => athena_s3::FileAccessContext {
            user_id: Some(actor.user_id.clone()),
            organization_id: actor.organization_id.clone(),
            organization_ids: actor.organization_ids.clone(),
            role_ids: actor.role_ids.clone(),
            team_ids: Vec::new(),
            is_admin: actor.is_admin,
        },
        None => athena_s3::FileAccessContext {
            user_id: None,
            organization_id: None,
            organization_ids: Vec::new(),
            role_ids: Vec::new(),
            team_ids: Vec::new(),
            is_admin: false,
        },
    }
}
/// Resolves the storage actor that must be present on the request.
///
/// # Errors
/// Propagates the response produced by `require_storage_actor` when no actor can be
/// resolved.
async fn required_actor(
    req: &HttpRequest,
    app_state: &AppState,
) -> Result<ActorContext, HttpResponse> {
    let resolved = require_storage_actor(req, app_state).await?;
    let context = ActorContext {
        user_id: resolved.user_id,
        organization_id: resolved.active_organization_id,
        organization_ids: resolved.organization_ids,
        role_ids: resolved.role_ids,
        is_admin: resolved.is_admin,
        session_id: resolved.session_id,
    };
    Ok(context)
}
fn map_store_error(err: athena_s3::S3CredentialStoreError, default_message: &str) -> HttpResponse {
match err {
athena_s3::S3CredentialStoreError::Database(sqlx::Error::Database(_)) => {
service_unavailable(default_message, err.to_string())
}
athena_s3::S3CredentialStoreError::NotFound => {
not_found("File or storage entry not found", err.to_string())
}
athena_s3::S3CredentialStoreError::AccessDenied => forbidden(
"Forbidden",
"you do not have permission to access this file",
),
athena_s3::S3CredentialStoreError::MissingActiveCredential => {
service_unavailable("Storage backend unavailable", err.to_string())
}
athena_s3::S3CredentialStoreError::InvalidInput(_) => {
bad_request("Invalid storage request", err.to_string())
}
athena_s3::S3CredentialStoreError::SchemaHeal(_) => {
internal_error(default_message, err.to_string())
}
athena_s3::S3CredentialStoreError::Database(_) => {
internal_error(default_message, err.to_string())
}
}
}
/// Records a managed-file action without ever failing the caller.
///
/// The record is attributed to the actor's user and organization, and its metadata
/// stores the route as `source`. A persistence failure is only logged as a warning.
async fn best_effort_action_log(
    pool: &PgPool,
    actor: &ActorContext,
    file_id: &str,
    route: &str,
    action: &str,
    status: athena_s3::ManagedFileActionStatus,
    message: String,
    request_payload: Value,
    result_payload: Value,
) {
    let record = athena_s3::ManagedFileActionRecord {
        file_id: file_id.to_string(),
        organization_id: actor.organization_id.clone(),
        user_id: Some(actor.user_id.clone()),
        route: route.to_string(),
        action: action.to_string(),
        status,
        message,
        request_payload,
        result_payload,
        metadata: json!({
            "source": route,
        }),
    };

    let outcome = athena_s3::record_managed_file_action(pool, &record).await;
    if let Err(err) = outcome {
        warn!(file_id = %file_id, route = %route, error = %err, "failed to persist managed file audit/event log");
    }
}
/// A shared cache layer that can hold presigned URL records, in addition to the
/// in-process cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheLayer {
/// The Redis-backed layer.
Redis,
/// The Postgres-backed layer.
Postgres,
}
impl CacheLayer {
fn as_str(self) -> &'static str {
match self {
Self::Redis => "redis",
Self::Postgres => "postgres",
}
}
}
/// Lists the shared cache layers to consult on a read, in lookup order.
///
/// The order follows `settings.read_precedence`; layers the configured backend does
/// not use are left out, so the result holds zero, one or two layers.
fn cache_read_attempts(settings: &StorageUrlCacheSettings) -> Vec<CacheLayer> {
    let (first, second) = match settings.read_precedence {
        UrlCacheReadPrecedence::PostgresFirst => (CacheLayer::Postgres, CacheLayer::Redis),
        UrlCacheReadPrecedence::RedisFirst => (CacheLayer::Redis, CacheLayer::Postgres),
    };

    let mut attempts = Vec::with_capacity(2);
    for layer in [first, second] {
        let enabled = match layer {
            CacheLayer::Postgres => settings.backend.uses_postgres(),
            CacheLayer::Redis => settings.backend.uses_redis(),
        };
        if enabled {
            attempts.push(layer);
        }
    }
    attempts
}
/// Lists the shared cache layers a new entry should be written to.
///
/// A layer is included only when the write strategy asks for it *and* the configured
/// backend uses it. Postgres is listed before Redis when both apply; a strategy that
/// conflicts with the backend yields an empty list.
fn cache_write_targets(settings: &StorageUrlCacheSettings) -> Vec<CacheLayer> {
    let wants_postgres = matches!(
        settings.write_strategy,
        UrlCacheWriteStrategy::PostgresOnly | UrlCacheWriteStrategy::PostgresAndRedis
    );
    let wants_redis = matches!(
        settings.write_strategy,
        UrlCacheWriteStrategy::RedisOnly | UrlCacheWriteStrategy::PostgresAndRedis
    );

    let mut targets = Vec::with_capacity(2);
    if wants_postgres && settings.backend.uses_postgres() {
        targets.push(CacheLayer::Postgres);
    }
    if wants_redis && settings.backend.uses_redis() {
        targets.push(CacheLayer::Redis);
    }
    targets
}
// Unit tests for the pure helpers of this module: purpose normalization, proxy
// filename / Content-Disposition handling, and the cache layer selection matrices.
#[cfg(test)]
mod tests {
use super::{
CacheLayer, StorageUrlCacheSettings, UrlCacheBackend, UrlCacheReadPrecedence,
UrlCacheWriteStrategy, cache_read_attempts, cache_write_targets, normalize_purpose,
preferred_proxy_filename, proxy_content_disposition, sanitize_header_filename,
};
use chrono::Utc;
use serde_json::json;
/// Builds a fully populated, private PDF file record used as the test fixture.
fn sample_file() -> athena_s3::ManagedFileRecord {
athena_s3::ManagedFileRecord {
id: "file-1".to_string(),
name: "report.pdf".to_string(),
original_name: Some("Quarterly Report.pdf".to_string()),
url: None,
bucket: "bucket".to_string(),
s3_id: Some(uuid::Uuid::nil().to_string()),
prefix_path: Some("reports".to_string()),
size_bytes: Some(128),
mime_type: Some("application/pdf".to_string()),
resource_id: None,
organization_id: Some("org-1".to_string()),
metadata: json!({}),
created_at: Utc::now(),
updated_at: Utc::now(),
storage_key: "reports/2026/report.pdf".to_string(),
uploaded_by_user_id: Some("user-1".to_string()),
extension: Some("pdf".to_string()),
is_public: false,
visibility: "private".to_string(),
status: "ready".to_string(),
deleted_at: None,
created_by_user_id: "user-1".to_string(),
file_name: Some("report.pdf".to_string()),
content_type: Some("application/pdf".to_string()),
checksum_sha256: None,
}
}
/// Supported purposes are accepted case- and whitespace-insensitively; anything
/// else is rejected.
#[test]
fn normalize_purpose_accepts_supported_values_only() {
assert_eq!(normalize_purpose(Some("read")).as_deref(), Some("read"));
assert_eq!(
normalize_purpose(Some(" DOWNLOAD ")).as_deref(),
Some("download")
);
assert_eq!(normalize_purpose(Some("stream")).as_deref(), Some("stream"));
assert_eq!(normalize_purpose(Some("proxy")), None);
}
/// The original upload name wins over `name` and the storage key.
#[test]
fn preferred_proxy_filename_uses_original_name_first() {
let file = sample_file();
assert_eq!(preferred_proxy_filename(&file), "Quarterly Report.pdf");
}
/// Quotes and CR/LF are replaced with underscores, and the disposition type
/// follows the purpose (`download` -> attachment, otherwise inline).
#[test]
fn proxy_content_disposition_sanitizes_download_filename() {
let mut file = sample_file();
file.original_name = Some("bad\"name\r\n.pdf".to_string());
assert_eq!(
sanitize_header_filename("bad\"name\r\n.pdf"),
"bad_name__.pdf"
);
assert_eq!(
proxy_content_disposition(&file, "download"),
"attachment; filename=\"bad_name__.pdf\""
);
assert_eq!(
proxy_content_disposition(&file, "read"),
"inline; filename=\"bad_name__.pdf\""
);
}
/// Builds cache settings with fixed timing values so each test varies only the
/// backend, read precedence and write strategy.
fn sample_cache_settings(
backend: UrlCacheBackend,
read_precedence: UrlCacheReadPrecedence,
write_strategy: UrlCacheWriteStrategy,
) -> StorageUrlCacheSettings {
StorageUrlCacheSettings {
backend,
read_precedence,
write_strategy,
presign_expires_seconds: 60,
cache_ttl_seconds: 55,
min_remaining_seconds: 5,
redis_prefix: "athena:file-url:".to_string(),
}
}
/// Read attempts honour the precedence order and drop layers the backend does
/// not use.
#[test]
fn cache_read_attempt_matrix_filters_disabled_layers() {
// Disabled backend: nothing is consulted.
assert!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::Disabled,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
))
.is_empty()
);
// Single-layer backends ignore the precedence for the missing layer.
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::Postgres,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresOnly,
)),
vec![CacheLayer::Postgres]
);
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::Redis,
UrlCacheReadPrecedence::PostgresFirst,
UrlCacheWriteStrategy::RedisOnly,
)),
vec![CacheLayer::Redis]
);
// Dual-layer backend: order follows the configured precedence.
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::PostgresRedis,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
)),
vec![CacheLayer::Redis, CacheLayer::Postgres]
);
assert_eq!(
cache_read_attempts(&sample_cache_settings(
UrlCacheBackend::PostgresRedis,
UrlCacheReadPrecedence::PostgresFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
)),
vec![CacheLayer::Postgres, CacheLayer::Redis]
);
}
/// Write targets require both the strategy and the backend to include a layer.
#[test]
fn cache_write_target_matrix_respects_backend_and_strategy() {
// Disabled backend: nothing is written regardless of strategy.
assert!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Disabled,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
))
.is_empty()
);
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Postgres,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresOnly,
)),
vec![CacheLayer::Postgres]
);
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Redis,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::RedisOnly,
)),
vec![CacheLayer::Redis]
);
// Both layers: Postgres is listed before Redis.
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::PostgresRedis,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::PostgresAndRedis,
)),
vec![CacheLayer::Postgres, CacheLayer::Redis]
);
// Strategy conflicts with the backend: no targets.
assert_eq!(
cache_write_targets(&sample_cache_settings(
UrlCacheBackend::Postgres,
UrlCacheReadPrecedence::RedisFirst,
UrlCacheWriteStrategy::RedisOnly,
)),
Vec::<CacheLayer>::new()
);
}
}