Skip to main content

fraiseql_storage/routes/
mod.rs

1//! HTTP route handlers for storage operations.
2//!
3//! Provides a complete `axum::Router` for object storage:
4//! - `PUT /storage/v1/object/{bucket}/{*key}` — upload
5//! - `GET /storage/v1/object/{bucket}/{*key}` — download
6//! - `DELETE /storage/v1/object/{bucket}/{*key}` — delete
7//! - `GET /storage/v1/list/{bucket}` — list
8//! - `POST /storage/v1/presign/{bucket}/{*key}` — presigned URL
9//! - `GET /storage/v1/render/{bucket}/{*key}` — image transform
10
11#[cfg(test)]
12mod tests;
13
14use std::{collections::HashMap, sync::Arc};
15
16use axum::{
17    Extension, Router,
18    body::Body,
19    extract::{Path, Query, State},
20    http::{HeaderMap, StatusCode, header},
21    response::{IntoResponse, Response},
22    routing::{get, post, put},
23};
24use bytes::Bytes;
25use fraiseql_error::{FileError, FraiseQLError};
26use serde::{Deserialize, Serialize};
27
28#[cfg(feature = "aws-s3")]
29use crate::PresignedUrl;
30use crate::{
31    backend::StorageBackend,
32    config::BucketConfig,
33    metadata::{NewStorageObject, StorageMetadataRepo, StorageMetadataRow},
34    rls::StorageRlsEvaluator,
35};
36
37// ---------------------------------------------------------------------------
38// State
39// ---------------------------------------------------------------------------
40
41/// Shared state for all storage route handlers.
42#[derive(Clone)]
43pub struct StorageState {
44    /// Storage backend (shared across all buckets).
45    pub backend:  Arc<StorageBackend>,
46    /// Metadata repository for object tracking.
47    pub metadata: Arc<StorageMetadataRepo>,
48    /// RLS evaluator for access control.
49    pub rls:      StorageRlsEvaluator,
50    /// Bucket configurations keyed by bucket name.
51    pub buckets:  Arc<HashMap<String, BucketConfig>>,
52}
53
54// ---------------------------------------------------------------------------
55// Request / Response types
56// ---------------------------------------------------------------------------
57
58/// Request body for presigned URL generation.
59#[derive(Debug, Deserialize)]
60pub struct PresignRequest {
61    /// Operation: "upload" (PUT) or "download" (GET).
62    pub operation:       String,
63    /// MIME type (required for uploads, optional for downloads).
64    #[serde(default)]
65    pub content_type:    Option<String>,
66    /// URL validity duration in seconds (default: 3600, max: 86400).
67    #[serde(default = "default_expiry_secs")]
68    pub expires_in_secs: u64,
69}
70
/// Serde default for `PresignRequest::expires_in_secs`: one hour, in seconds.
const fn default_expiry_secs() -> u64 {
    60 * 60
}
74
75/// Response body for presigned URL generation.
76#[derive(Debug, Serialize)]
77pub struct PresignResponse {
78    /// The presigned URL.
79    pub url:        String,
80    /// When the URL expires (RFC3339 format).
81    pub expires_at: String,
82    /// HTTP method this URL is valid for.
83    pub method:     String,
84}
85
#[cfg(feature = "aws-s3")]
impl From<PresignedUrl> for PresignResponse {
    /// Convert a backend `PresignedUrl` into its JSON response form,
    /// rendering the expiry timestamp as an RFC3339 string.
    fn from(url: PresignedUrl) -> Self {
        let PresignedUrl {
            url: href,
            expires_at,
            method,
            ..
        } = url;

        Self {
            url: href,
            expires_at: expires_at.to_rfc3339(),
            method,
        }
    }
}
96
97/// Query parameters for list endpoint.
98#[derive(Debug, Deserialize)]
99pub struct ListQuery {
100    /// Filter by key prefix.
101    pub prefix: Option<String>,
102    /// Maximum results (default: 100, max: 1000).
103    pub limit:  Option<u32>,
104    /// Offset for pagination.
105    pub offset: Option<u32>,
106}
107
/// Identity of the caller, as placed in the request extensions by the
/// authentication middleware.
///
/// The `Default` value (no user id, no roles) is what handlers use when the
/// extension is absent.
#[derive(Debug, Clone, Default)]
pub struct StorageUser {
    /// User identifier (the JWT `sub` claim).
    pub user_id: Option<String>,
    /// Roles held by the user.
    pub roles: Vec<String>,
}
116
117// ---------------------------------------------------------------------------
118// Router
119// ---------------------------------------------------------------------------
120
121/// Build the storage HTTP router.
122///
123/// Returns an `axum::Router` that handles all storage endpoints.
124/// The caller is responsible for applying authentication middleware
125/// that populates `StorageUser` in request extensions.
126pub fn storage_router(state: StorageState) -> Router {
127    Router::new()
128        .route(
129            "/storage/v1/object/{bucket}/{*key}",
130            put(put_handler).get(get_handler).delete(delete_handler),
131        )
132        .route("/storage/v1/list/{bucket}", get(list_handler))
133        .route("/storage/v1/presign/{bucket}/{*key}", post(presign_handler))
134        .with_state(state)
135}
136
137// ---------------------------------------------------------------------------
138// Handlers
139// ---------------------------------------------------------------------------
140
141/// Upload an object.
142#[tracing::instrument(skip(state, user, headers, body), fields(bucket = %bucket_name, key = %key))]
143async fn put_handler(
144    State(state): State<StorageState>,
145    user: Option<Extension<StorageUser>>,
146    Path((bucket_name, key)): Path<(String, String)>,
147    headers: HeaderMap,
148    body: Bytes,
149) -> Response {
150    let Some(bucket) = state.buckets.get(&bucket_name) else {
151        return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
152    };
153
154    let user = user.map(|Extension(u)| u).unwrap_or_default();
155
156    // RLS: check write permission
157    if !state.rls.can_write(user.user_id.as_deref(), &user.roles, bucket) {
158        tracing::warn!(
159            bucket = %bucket_name,
160            user_id = ?user.user_id,
161            "Storage upload denied: authentication required"
162        );
163        return error_response(StatusCode::UNAUTHORIZED, "unauthorized", "Authentication required");
164    }
165
166    // Validate size
167    if let Some(max_bytes) = bucket.max_object_bytes {
168        if body.len() as u64 > max_bytes {
169            tracing::warn!(
170                bucket = %bucket_name,
171                key = %key,
172                size = body.len(),
173                max_bytes = max_bytes,
174                "Storage upload rejected: payload too large"
175            );
176            return error_response(
177                StatusCode::PAYLOAD_TOO_LARGE,
178                "payload_too_large",
179                "Object exceeds maximum size",
180            );
181        }
182    }
183
184    // Determine content type
185    let content_type = headers
186        .get(header::CONTENT_TYPE)
187        .and_then(|v| v.to_str().ok())
188        .unwrap_or("application/octet-stream");
189
190    // Validate MIME type
191    if let Some(ref allowed) = bucket.allowed_mime_types {
192        if !allowed.is_empty() && !allowed.iter().any(|m| mime_matches(m, content_type)) {
193            tracing::warn!(
194                bucket = %bucket_name,
195                key = %key,
196                content_type = %content_type,
197                "Storage upload rejected: MIME type not allowed"
198            );
199            return error_response(
200                StatusCode::UNSUPPORTED_MEDIA_TYPE,
201                "mime_type_rejected",
202                "Content type not allowed for this bucket",
203            );
204        }
205    }
206
207    // Upload to backend
208    let etag = match state.backend.upload(&key, &body, content_type).await {
209        Ok(etag) => etag,
210        Err(e) => return storage_error_response(&e),
211    };
212
213    // Record metadata
214    let new_obj = NewStorageObject {
215        bucket: bucket_name,
216        key,
217        content_type: content_type.to_string(),
218        // Reason: body length is bounded by max_object_bytes config (set elsewhere); i64
219        // capacity is 9.2 EB so wrap is unreachable.
220        #[allow(clippy::cast_possible_wrap)]
221        size_bytes: body.len() as i64,
222        etag: Some(etag.clone()),
223        owner_id: user.user_id,
224    };
225    if let Err(e) = state.metadata.upsert(&new_obj).await {
226        return storage_error_response(&e);
227    }
228
229    let mut headers = HeaderMap::new();
230    if let Ok(val) = etag.parse() {
231        headers.insert(header::ETAG, val);
232    }
233    (StatusCode::OK, headers).into_response()
234}
235
236/// Download an object.
237#[tracing::instrument(skip(state, user), fields(bucket = %bucket_name, key = %key))]
238async fn get_handler(
239    State(state): State<StorageState>,
240    user: Option<Extension<StorageUser>>,
241    Path((bucket_name, key)): Path<(String, String)>,
242) -> Response {
243    let Some(bucket) = state.buckets.get(&bucket_name) else {
244        return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
245    };
246
247    // Look up metadata for RLS check
248    let row = match state.metadata.get(&bucket_name, &key).await {
249        Ok(Some(row)) => row,
250        Ok(None) => return error_response(StatusCode::NOT_FOUND, "not_found", "Object not found"),
251        Err(e) => return storage_error_response(&e),
252    };
253
254    let user = user.map(|Extension(u)| u).unwrap_or_default();
255    if !state.rls.can_read(user.user_id.as_deref(), &user.roles, bucket, &row) {
256        tracing::warn!(
257            bucket = %bucket_name,
258            key = %key,
259            user_id = ?user.user_id,
260            "Storage download denied: access forbidden"
261        );
262        return error_response(StatusCode::FORBIDDEN, "forbidden", "Access denied");
263    }
264
265    // Download from backend
266    match state.backend.download(&key).await {
267        Ok(data) => {
268            let mut headers = HeaderMap::new();
269            if let Ok(ct) = row.content_type.parse() {
270                headers.insert(header::CONTENT_TYPE, ct);
271            }
272            if let Some(ref etag) = row.etag {
273                if let Ok(val) = etag.parse() {
274                    headers.insert(header::ETAG, val);
275                }
276            }
277            headers.insert(
278                header::CACHE_CONTROL,
279                "public, max-age=3600"
280                    .parse()
281                    .expect("static ASCII header value parses as HeaderValue"),
282            );
283            (StatusCode::OK, headers, Body::from(data)).into_response()
284        },
285        Err(e) => storage_error_response(&e),
286    }
287}
288
289/// Delete an object.
290#[tracing::instrument(skip(state, user), fields(bucket = %bucket_name, key = %key))]
291async fn delete_handler(
292    State(state): State<StorageState>,
293    user: Option<Extension<StorageUser>>,
294    Path((bucket_name, key)): Path<(String, String)>,
295) -> Response {
296    let Some(bucket) = state.buckets.get(&bucket_name) else {
297        return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
298    };
299
300    // Look up metadata for RLS check
301    let row = match state.metadata.get(&bucket_name, &key).await {
302        Ok(Some(row)) => row,
303        Ok(None) => return error_response(StatusCode::NOT_FOUND, "not_found", "Object not found"),
304        Err(e) => return storage_error_response(&e),
305    };
306
307    let user = user.map(|Extension(u)| u).unwrap_or_default();
308    if !state.rls.can_delete(user.user_id.as_deref(), &user.roles, bucket, &row) {
309        tracing::warn!(
310            bucket = %bucket_name,
311            key = %key,
312            user_id = ?user.user_id,
313            "Storage delete denied: access forbidden"
314        );
315        return error_response(StatusCode::FORBIDDEN, "forbidden", "Access denied");
316    }
317
318    // Delete from backend
319    if let Err(e) = state.backend.delete(&key).await {
320        return storage_error_response(&e);
321    }
322
323    // Remove metadata
324    if let Err(e) = state.metadata.delete(&bucket_name, &key).await {
325        return storage_error_response(&e);
326    }
327
328    StatusCode::NO_CONTENT.into_response()
329}
330
331/// List objects in a bucket.
332#[tracing::instrument(skip(state, user, query), fields(bucket = %bucket_name))]
333async fn list_handler(
334    State(state): State<StorageState>,
335    user: Option<Extension<StorageUser>>,
336    Path(bucket_name): Path<String>,
337    Query(query): Query<ListQuery>,
338) -> Response {
339    let Some(bucket) = state.buckets.get(&bucket_name) else {
340        return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
341    };
342
343    let user = user.map(|Extension(u)| u).unwrap_or_default();
344    if !state.rls.can_write(user.user_id.as_deref(), &user.roles, bucket) {
345        // For listing, we require at least authenticated access
346        // Public bucket reads are handled via filter_visible
347        if matches!(bucket.access, crate::config::BucketAccess::Private) {
348            return error_response(
349                StatusCode::UNAUTHORIZED,
350                "unauthorized",
351                "Authentication required",
352            );
353        }
354    }
355
356    let limit = query.limit.unwrap_or(100).min(1000);
357    let offset = query.offset.unwrap_or(0);
358
359    let rows = match state.metadata.list(&bucket_name, query.prefix.as_deref(), limit, offset).await
360    {
361        Ok(rows) => rows,
362        Err(e) => return storage_error_response(&e),
363    };
364
365    // Apply RLS filtering
366    let visible = state.rls.filter_visible(user.user_id.as_deref(), &user.roles, bucket, rows);
367
368    let items: Vec<ListItem> = visible.iter().map(ListItem::from).collect();
369    axum::Json(items).into_response()
370}
371
372/// Generate a presigned URL.
373#[tracing::instrument(skip(state, request), fields(bucket = %bucket_name, key = %key))]
374async fn presign_handler(
375    State(state): State<StorageState>,
376    Path((bucket_name, key)): Path<(String, String)>,
377    axum::Json(request): axum::Json<PresignRequest>,
378) -> Response {
379    let Some(_bucket) = state.buckets.get(&bucket_name) else {
380        return error_response(StatusCode::NOT_FOUND, "bucket_not_found", "Bucket not found");
381    };
382
383    // Validate operation
384    let operation = request.operation.to_lowercase();
385    if operation != "upload" && operation != "download" {
386        return error_response(
387            StatusCode::BAD_REQUEST,
388            "invalid_operation",
389            "operation must be 'upload' or 'download'",
390        );
391    }
392
393    if request.expires_in_secs == 0 || request.expires_in_secs > 86400 {
394        return error_response(
395            StatusCode::BAD_REQUEST,
396            "invalid_expiry",
397            "expires_in_secs must be between 1 and 86400",
398        );
399    }
400
401    #[cfg(feature = "aws-s3")]
402    {
403        use std::time::Duration;
404        let expires_in = Duration::from_secs(request.expires_in_secs);
405
406        let result = if operation == "upload" {
407            let Some(content_type) = request.content_type else {
408                return error_response(
409                    StatusCode::BAD_REQUEST,
410                    "missing_content_type",
411                    "content_type required for upload",
412                );
413            };
414            state.backend.presign_put(&key, &content_type, expires_in).await
415        } else {
416            state.backend.presign_get(&key, expires_in).await
417        };
418
419        match result {
420            Ok(url) => axum::Json(PresignResponse::from(url)).into_response(),
421            Err(e) => storage_error_response(&e),
422        }
423    }
424
425    #[cfg(not(feature = "aws-s3"))]
426    {
427        let _ = (key, operation, request);
428        error_response(
429            StatusCode::NOT_IMPLEMENTED,
430            "not_supported",
431            "Presigned URLs require S3 backend",
432        )
433    }
434}
435
436// ---------------------------------------------------------------------------
437// Helpers
438// ---------------------------------------------------------------------------
439
440/// List item returned in JSON array from list endpoint.
441#[derive(Debug, Serialize)]
442struct ListItem {
443    key:          String,
444    size:         i64,
445    content_type: String,
446    etag:         Option<String>,
447    created_at:   String,
448    updated_at:   String,
449}
450
451impl From<&StorageMetadataRow> for ListItem {
452    fn from(row: &StorageMetadataRow) -> Self {
453        Self {
454            key:          row.key.clone(),
455            size:         row.size_bytes,
456            content_type: row.content_type.clone(),
457            etag:         row.etag.clone(),
458            created_at:   row.created_at.to_rfc3339(),
459            updated_at:   row.updated_at.to_rfc3339(),
460        }
461    }
462}
463
464/// Build a JSON error response.
465fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
466    let body = serde_json::json!({
467        "error": {
468            "code": code,
469            "message": message,
470        }
471    });
472    (status, axum::Json(body)).into_response()
473}
474
475/// Convert a `FraiseQLError` to an appropriate HTTP response.
476///
477/// After F050 (typed `FileError` migration), backend storage failures arrive
478/// as `FraiseQLError::File(FileError::*)` rather than `FraiseQLError::Storage`.
479/// The routing here matches the previous behaviour of
480/// `Storage { code: Some("...") }`:
481///
482/// - `FileError::NotFound` → 404
483/// - `FileError::PermissionDenied` → 403
484/// - other backend variants (`IoError`, `Backend`, `NotImplemented`, `Unsupported`,
485///   `SizeLimitExceeded`, `MimeTypeNotAllowed`) → 500
486/// - `FileError::InvalidKey` → 400
487fn storage_error_response(err: &FraiseQLError) -> Response {
488    if let FraiseQLError::File(file_err) = err {
489        let (status, code) = match file_err {
490            FileError::NotFound { .. } => (StatusCode::NOT_FOUND, "not_found"),
491            FileError::PermissionDenied { .. } => (StatusCode::FORBIDDEN, "permission_denied"),
492            FileError::InvalidKey { .. } => (StatusCode::BAD_REQUEST, "invalid_key"),
493            FileError::IoError { .. } => {
494                tracing::error!(error = %err, "Storage I/O error");
495                (StatusCode::INTERNAL_SERVER_ERROR, "io_error")
496            },
497            FileError::NotImplemented { .. } => {
498                (StatusCode::INTERNAL_SERVER_ERROR, "not_implemented")
499            },
500            FileError::Unsupported { .. } => (StatusCode::INTERNAL_SERVER_ERROR, "not_supported"),
501            FileError::SizeLimitExceeded { .. } => {
502                (StatusCode::INTERNAL_SERVER_ERROR, "size_limit_exceeded")
503            },
504            FileError::MimeTypeNotAllowed { .. } => {
505                (StatusCode::INTERNAL_SERVER_ERROR, "mime_type_not_allowed")
506            },
507            FileError::Backend { .. } => {
508                tracing::error!(error = %err, "Storage backend error");
509                (StatusCode::INTERNAL_SERVER_ERROR, "storage_error")
510            },
511            // Pre-F050 FileError variants — unlikely to reach the storage
512            // routes but handled for completeness.
513            FileError::TooLarge { .. } => (StatusCode::PAYLOAD_TOO_LARGE, "payload_too_large"),
514            FileError::QuotaExceeded => (StatusCode::PAYLOAD_TOO_LARGE, "quota_exceeded"),
515            FileError::InvalidType { .. } | FileError::MimeMismatch { .. } => {
516                (StatusCode::UNSUPPORTED_MEDIA_TYPE, "invalid_type")
517            },
518            FileError::VirusDetected { .. } => (StatusCode::UNPROCESSABLE_ENTITY, "virus_detected"),
519            FileError::Storage { .. } | FileError::Processing { .. } => {
520                tracing::error!(error = %err, "Storage backend error");
521                (StatusCode::INTERNAL_SERVER_ERROR, "storage_error")
522            },
523            // SECURITY: `FileError` is `#[non_exhaustive]`. Any future variant
524            // added without updating this match falls through to a generic
525            // 500 response rather than silently leaking the wrong status.
526            _ => {
527                tracing::error!(error = %err, "Unhandled FileError variant");
528                (StatusCode::INTERNAL_SERVER_ERROR, "internal_error")
529            },
530        };
531        error_response(status, code, &file_err.to_string())
532    } else {
533        tracing::error!(error = %err, "Unexpected storage error");
534        error_response(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", &err.to_string())
535    }
536}
537
/// Check if a MIME type pattern matches a content type.
///
/// `pattern` is either `*/*` (matches anything), a type wildcard such as
/// `image/*`, or an exact media type such as `image/png`.
///
/// `content_type` is tested both as given and with any parameters removed
/// (everything from the first `;`, then surrounding whitespace), so
/// `image/png; charset=binary` matches the exact pattern `image/png` just as
/// it already matched `image/*`. Comparison ignores ASCII case because media
/// type names are case-insensitive.
///
/// Returns `true` when the content type is accepted by the pattern.
fn mime_matches(pattern: &str, content_type: &str) -> bool {
    if pattern == "*/*" {
        return true;
    }

    // `Some("image")` for the pattern "image/*", `None` for exact patterns.
    let type_prefix = pattern.strip_suffix("/*");

    let accepts = |candidate: &str| -> bool {
        match type_prefix {
            Some(prefix) => {
                let bytes = candidate.as_bytes();
                // The `/` must sit directly after the prefix so that
                // "image/*" does not match "imagex/png". The `get` check
                // also guarantees that the slice below is in bounds.
                bytes.get(prefix.len()) == Some(&b'/')
                    && bytes[..prefix.len()].eq_ignore_ascii_case(prefix.as_bytes())
            },
            None => pattern.eq_ignore_ascii_case(candidate),
        }
    };

    // Media type without parameters, e.g. "image/png" from "image/png; q=1".
    let essence = content_type.split(';').next().unwrap_or(content_type).trim();

    accepts(content_type) || accepts(essence)
}