1#[cfg(test)]
12mod tests;
13
14use std::{collections::HashMap, sync::Arc};
15
16use axum::{
17 Extension, Router,
18 body::Body,
19 extract::{Path, Query, State},
20 http::{HeaderMap, StatusCode, header},
21 response::{IntoResponse, Response},
22 routing::{get, post, put},
23};
24use bytes::Bytes;
25use fraiseql_error::{FileError, FraiseQLError};
26use serde::{Deserialize, Serialize};
27
28#[cfg(feature = "aws-s3")]
29use crate::PresignedUrl;
30use crate::{
31 backend::StorageBackend,
32 config::BucketConfig,
33 metadata::{NewStorageObject, StorageMetadataRepo, StorageMetadataRow},
34 rls::StorageRlsEvaluator,
35};
36
37#[derive(Clone)]
43pub struct StorageState {
44 pub backend: Arc<StorageBackend>,
46 pub metadata: Arc<StorageMetadataRepo>,
48 pub rls: StorageRlsEvaluator,
50 pub buckets: Arc<HashMap<String, BucketConfig>>,
52}
53
54#[derive(Debug, Deserialize)]
60pub struct PresignRequest {
61 pub operation: String,
63 #[serde(default)]
65 pub content_type: Option<String>,
66 #[serde(default = "default_expiry_secs")]
68 pub expires_in_secs: u64,
69}
70
/// Serde default for `PresignRequest::expires_in_secs`: one hour, in seconds.
const fn default_expiry_secs() -> u64 {
    const SECONDS_PER_HOUR: u64 = 60 * 60;
    SECONDS_PER_HOUR
}
74
75#[derive(Debug, Serialize)]
77pub struct PresignResponse {
78 pub url: String,
80 pub expires_at: String,
82 pub method: String,
84}
85
/// Converts a backend `PresignedUrl` into the wire-level response, rendering
/// the expiry timestamp as an RFC 3339 string.
#[cfg(feature = "aws-s3")]
impl From<PresignedUrl> for PresignResponse {
    fn from(presigned: PresignedUrl) -> Self {
        let expires_at = presigned.expires_at.to_rfc3339();
        Self {
            url: presigned.url,
            expires_at,
            method: presigned.method,
        }
    }
}
96
97#[derive(Debug, Deserialize)]
99pub struct ListQuery {
100 pub prefix: Option<String>,
102 pub limit: Option<u32>,
104 pub offset: Option<u32>,
106}
107
/// Caller identity that handlers read from an optional request extension.
///
/// When the extension is absent, handlers fall back to `StorageUser::default()`
/// (no user id, no roles).
#[derive(Debug, Clone, Default)]
pub struct StorageUser {
    /// Identifier of the caller, if any; also recorded as the owner of uploaded objects.
    pub user_id: Option<String>,

    /// Roles passed to the access-control evaluator alongside the user id.
    pub roles: Vec<String>,
}
116
117pub fn storage_router(state: StorageState) -> Router {
127 Router::new()
128 .route(
129 "/storage/v1/object/{bucket}/{*key}",
130 put(put_handler).get(get_handler).delete(delete_handler),
131 )
132 .route("/storage/v1/list/{bucket}", get(list_handler))
133 .route("/storage/v1/presign/{bucket}/{*key}", post(presign_handler))
134 .with_state(state)
135}
136
137#[tracing::instrument(skip(state, user, headers, body), fields(bucket = %bucket_name, key = %key))]
143async fn put_handler(
144 State(state): State<StorageState>,
145 user: Option<Extension<StorageUser>>,
146 Path((bucket_name, key)): Path<(String, String)>,
147 headers: HeaderMap,
148 body: Bytes,
149) -> Response {
150 let Some(bucket) = state.buckets.get(&bucket_name) else {
151 return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
152 };
153
154 let user = user.map(|Extension(u)| u).unwrap_or_default();
155
156 if !state.rls.can_write(user.user_id.as_deref(), &user.roles, bucket) {
158 tracing::warn!(
159 bucket = %bucket_name,
160 user_id = ?user.user_id,
161 "Storage upload denied: authentication required"
162 );
163 return error_response(StatusCode::UNAUTHORIZED, "unauthorized", "Authentication required");
164 }
165
166 if let Some(max_bytes) = bucket.max_object_bytes {
168 if body.len() as u64 > max_bytes {
169 tracing::warn!(
170 bucket = %bucket_name,
171 key = %key,
172 size = body.len(),
173 max_bytes = max_bytes,
174 "Storage upload rejected: payload too large"
175 );
176 return error_response(
177 StatusCode::PAYLOAD_TOO_LARGE,
178 "payload_too_large",
179 "Object exceeds maximum size",
180 );
181 }
182 }
183
184 let content_type = headers
186 .get(header::CONTENT_TYPE)
187 .and_then(|v| v.to_str().ok())
188 .unwrap_or("application/octet-stream");
189
190 if let Some(ref allowed) = bucket.allowed_mime_types {
192 if !allowed.is_empty() && !allowed.iter().any(|m| mime_matches(m, content_type)) {
193 tracing::warn!(
194 bucket = %bucket_name,
195 key = %key,
196 content_type = %content_type,
197 "Storage upload rejected: MIME type not allowed"
198 );
199 return error_response(
200 StatusCode::UNSUPPORTED_MEDIA_TYPE,
201 "mime_type_rejected",
202 "Content type not allowed for this bucket",
203 );
204 }
205 }
206
207 let etag = match state.backend.upload(&key, &body, content_type).await {
209 Ok(etag) => etag,
210 Err(e) => return storage_error_response(&e),
211 };
212
213 let new_obj = NewStorageObject {
215 bucket: bucket_name,
216 key,
217 content_type: content_type.to_string(),
218 #[allow(clippy::cast_possible_wrap)]
221 size_bytes: body.len() as i64,
222 etag: Some(etag.clone()),
223 owner_id: user.user_id,
224 };
225 if let Err(e) = state.metadata.upsert(&new_obj).await {
226 return storage_error_response(&e);
227 }
228
229 let mut headers = HeaderMap::new();
230 if let Ok(val) = etag.parse() {
231 headers.insert(header::ETAG, val);
232 }
233 (StatusCode::OK, headers).into_response()
234}
235
236#[tracing::instrument(skip(state, user), fields(bucket = %bucket_name, key = %key))]
238async fn get_handler(
239 State(state): State<StorageState>,
240 user: Option<Extension<StorageUser>>,
241 Path((bucket_name, key)): Path<(String, String)>,
242) -> Response {
243 let Some(bucket) = state.buckets.get(&bucket_name) else {
244 return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
245 };
246
247 let row = match state.metadata.get(&bucket_name, &key).await {
249 Ok(Some(row)) => row,
250 Ok(None) => return error_response(StatusCode::NOT_FOUND, "not_found", "Object not found"),
251 Err(e) => return storage_error_response(&e),
252 };
253
254 let user = user.map(|Extension(u)| u).unwrap_or_default();
255 if !state.rls.can_read(user.user_id.as_deref(), &user.roles, bucket, &row) {
256 tracing::warn!(
257 bucket = %bucket_name,
258 key = %key,
259 user_id = ?user.user_id,
260 "Storage download denied: access forbidden"
261 );
262 return error_response(StatusCode::FORBIDDEN, "forbidden", "Access denied");
263 }
264
265 match state.backend.download(&key).await {
267 Ok(data) => {
268 let mut headers = HeaderMap::new();
269 if let Ok(ct) = row.content_type.parse() {
270 headers.insert(header::CONTENT_TYPE, ct);
271 }
272 if let Some(ref etag) = row.etag {
273 if let Ok(val) = etag.parse() {
274 headers.insert(header::ETAG, val);
275 }
276 }
277 headers.insert(
278 header::CACHE_CONTROL,
279 "public, max-age=3600"
280 .parse()
281 .expect("static ASCII header value parses as HeaderValue"),
282 );
283 (StatusCode::OK, headers, Body::from(data)).into_response()
284 },
285 Err(e) => storage_error_response(&e),
286 }
287}
288
289#[tracing::instrument(skip(state, user), fields(bucket = %bucket_name, key = %key))]
291async fn delete_handler(
292 State(state): State<StorageState>,
293 user: Option<Extension<StorageUser>>,
294 Path((bucket_name, key)): Path<(String, String)>,
295) -> Response {
296 let Some(bucket) = state.buckets.get(&bucket_name) else {
297 return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
298 };
299
300 let row = match state.metadata.get(&bucket_name, &key).await {
302 Ok(Some(row)) => row,
303 Ok(None) => return error_response(StatusCode::NOT_FOUND, "not_found", "Object not found"),
304 Err(e) => return storage_error_response(&e),
305 };
306
307 let user = user.map(|Extension(u)| u).unwrap_or_default();
308 if !state.rls.can_delete(user.user_id.as_deref(), &user.roles, bucket, &row) {
309 tracing::warn!(
310 bucket = %bucket_name,
311 key = %key,
312 user_id = ?user.user_id,
313 "Storage delete denied: access forbidden"
314 );
315 return error_response(StatusCode::FORBIDDEN, "forbidden", "Access denied");
316 }
317
318 if let Err(e) = state.backend.delete(&key).await {
320 return storage_error_response(&e);
321 }
322
323 if let Err(e) = state.metadata.delete(&bucket_name, &key).await {
325 return storage_error_response(&e);
326 }
327
328 StatusCode::NO_CONTENT.into_response()
329}
330
331#[tracing::instrument(skip(state, user, query), fields(bucket = %bucket_name))]
333async fn list_handler(
334 State(state): State<StorageState>,
335 user: Option<Extension<StorageUser>>,
336 Path(bucket_name): Path<String>,
337 Query(query): Query<ListQuery>,
338) -> Response {
339 let Some(bucket) = state.buckets.get(&bucket_name) else {
340 return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
341 };
342
343 let user = user.map(|Extension(u)| u).unwrap_or_default();
344 if !state.rls.can_write(user.user_id.as_deref(), &user.roles, bucket) {
345 if matches!(bucket.access, crate::config::BucketAccess::Private) {
348 return error_response(
349 StatusCode::UNAUTHORIZED,
350 "unauthorized",
351 "Authentication required",
352 );
353 }
354 }
355
356 let limit = query.limit.unwrap_or(100).min(1000);
357 let offset = query.offset.unwrap_or(0);
358
359 let rows = match state.metadata.list(&bucket_name, query.prefix.as_deref(), limit, offset).await
360 {
361 Ok(rows) => rows,
362 Err(e) => return storage_error_response(&e),
363 };
364
365 let visible = state.rls.filter_visible(user.user_id.as_deref(), &user.roles, bucket, rows);
367
368 let items: Vec<ListItem> = visible.iter().map(ListItem::from).collect();
369 axum::Json(items).into_response()
370}
371
372#[tracing::instrument(skip(state, request), fields(bucket = %bucket_name, key = %key))]
374async fn presign_handler(
375 State(state): State<StorageState>,
376 Path((bucket_name, key)): Path<(String, String)>,
377 axum::Json(request): axum::Json<PresignRequest>,
378) -> Response {
379 let Some(_bucket) = state.buckets.get(&bucket_name) else {
380 return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
381 };
382
383 let operation = request.operation.to_lowercase();
385 if operation != "upload" && operation != "download" {
386 return error_response(
387 StatusCode::BAD_REQUEST,
388 "invalid_operation",
389 "operation must be 'upload' or 'download'",
390 );
391 }
392
393 if request.expires_in_secs == 0 || request.expires_in_secs > 86400 {
394 return error_response(
395 StatusCode::BAD_REQUEST,
396 "invalid_expiry",
397 "expires_in_secs must be between 1 and 86400",
398 );
399 }
400
401 #[cfg(feature = "aws-s3")]
402 {
403 use std::time::Duration;
404 let expires_in = Duration::from_secs(request.expires_in_secs);
405
406 let result = if operation == "upload" {
407 let Some(content_type) = request.content_type else {
408 return error_response(
409 StatusCode::BAD_REQUEST,
410 "missing_content_type",
411 "content_type required for upload",
412 );
413 };
414 state.backend.presign_put(&key, &content_type, expires_in).await
415 } else {
416 state.backend.presign_get(&key, expires_in).await
417 };
418
419 match result {
420 Ok(url) => axum::Json(PresignResponse::from(url)).into_response(),
421 Err(e) => storage_error_response(&e),
422 }
423 }
424
425 #[cfg(not(feature = "aws-s3"))]
426 {
427 let _ = (key, operation, request);
428 error_response(
429 StatusCode::NOT_IMPLEMENTED,
430 "not_supported",
431 "Presigned URLs require S3 backend",
432 )
433 }
434}
435
436#[derive(Debug, Serialize)]
442struct ListItem {
443 key: String,
444 size: i64,
445 content_type: String,
446 etag: Option<String>,
447 created_at: String,
448 updated_at: String,
449}
450
451impl From<&StorageMetadataRow> for ListItem {
452 fn from(row: &StorageMetadataRow) -> Self {
453 Self {
454 key: row.key.clone(),
455 size: row.size_bytes,
456 content_type: row.content_type.clone(),
457 etag: row.etag.clone(),
458 created_at: row.created_at.to_rfc3339(),
459 updated_at: row.updated_at.to_rfc3339(),
460 }
461 }
462}
463
464fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
466 let body = serde_json::json!({
467 "error": {
468 "code": code,
469 "message": message,
470 }
471 });
472 (status, axum::Json(body)).into_response()
473}
474
475fn storage_error_response(err: &FraiseQLError) -> Response {
488 if let FraiseQLError::File(file_err) = err {
489 let (status, code) = match file_err {
490 FileError::NotFound { .. } => (StatusCode::NOT_FOUND, "not_found"),
491 FileError::PermissionDenied { .. } => (StatusCode::FORBIDDEN, "permission_denied"),
492 FileError::InvalidKey { .. } => (StatusCode::BAD_REQUEST, "invalid_key"),
493 FileError::IoError { .. } => {
494 tracing::error!(error = %err, "Storage I/O error");
495 (StatusCode::INTERNAL_SERVER_ERROR, "io_error")
496 },
497 FileError::NotImplemented { .. } => {
498 (StatusCode::INTERNAL_SERVER_ERROR, "not_implemented")
499 },
500 FileError::Unsupported { .. } => (StatusCode::INTERNAL_SERVER_ERROR, "not_supported"),
501 FileError::SizeLimitExceeded { .. } => {
502 (StatusCode::INTERNAL_SERVER_ERROR, "size_limit_exceeded")
503 },
504 FileError::MimeTypeNotAllowed { .. } => {
505 (StatusCode::INTERNAL_SERVER_ERROR, "mime_type_not_allowed")
506 },
507 FileError::Backend { .. } => {
508 tracing::error!(error = %err, "Storage backend error");
509 (StatusCode::INTERNAL_SERVER_ERROR, "storage_error")
510 },
511 FileError::TooLarge { .. } => (StatusCode::PAYLOAD_TOO_LARGE, "payload_too_large"),
514 FileError::QuotaExceeded => (StatusCode::PAYLOAD_TOO_LARGE, "quota_exceeded"),
515 FileError::InvalidType { .. } | FileError::MimeMismatch { .. } => {
516 (StatusCode::UNSUPPORTED_MEDIA_TYPE, "invalid_type")
517 },
518 FileError::VirusDetected { .. } => (StatusCode::UNPROCESSABLE_ENTITY, "virus_detected"),
519 FileError::Storage { .. } | FileError::Processing { .. } => {
520 tracing::error!(error = %err, "Storage backend error");
521 (StatusCode::INTERNAL_SERVER_ERROR, "storage_error")
522 },
523 _ => {
527 tracing::error!(error = %err, "Unhandled FileError variant");
528 (StatusCode::INTERNAL_SERVER_ERROR, "internal_error")
529 },
530 };
531 error_response(status, code, &file_err.to_string())
532 } else {
533 tracing::error!(error = %err, "Unexpected storage error");
534 error_response(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", &err.to_string())
535 }
536}
537
/// Returns `true` when `content_type` satisfies the allow-list entry `pattern`.
///
/// Supported patterns:
/// - `*/*` matches anything;
/// - `type/*` matches every subtype of `type`;
/// - any other pattern must equal the media type.
///
/// `content_type` is usually a raw `Content-Type` header value, so besides the
/// literal comparison the match is also attempted against its essence: the
/// part before the first `;`, trimmed, compared ASCII case-insensitively.
/// This lets `text/plain; charset=utf-8` and `TEXT/PLAIN` satisfy `text/plain`.
fn mime_matches(pattern: &str, content_type: &str) -> bool {
    // Literal checks first, so everything that matched before still matches.
    if pattern == "*/*" || pattern == content_type {
        return true;
    }

    // Drop media-type parameters and surrounding whitespace.
    let essence = content_type.split(';').next().unwrap_or(content_type).trim();

    if let Some(wanted_type) = pattern.strip_suffix("/*") {
        // Compare bytes rather than `str` slices so an arbitrary prefix length
        // can never land inside a multi-byte character.
        let bytes = essence.as_bytes();
        return bytes.get(wanted_type.len()) == Some(&b'/')
            && bytes[..wanted_type.len()].eq_ignore_ascii_case(wanted_type.as_bytes());
    }

    pattern.eq_ignore_ascii_case(essence)
}