// post3_server/s3/handlers/objects.rs

1use std::collections::HashMap;
2
3use axum::{
4    body::Body,
5    extract::{Path, Query, State},
6    http::{header::HeaderName, HeaderMap, HeaderValue, StatusCode},
7    response::{IntoResponse, Response},
8};
9use bytes::Bytes;
10use post3::{Post3Error, StorageBackend};
11
12use crate::s3::extractors::{BucketGetQuery, ObjectKeyQuery};
13use crate::s3::handlers::multipart;
14use crate::s3::responses;
15use crate::state::State as AppState;
16
17// --- Dispatch functions ---
18
19/// PUT /{bucket}/{*key} — dispatches to upload_part or put_object based on query params.
20pub async fn put_dispatch<B: StorageBackend>(
21    state: State<AppState<B>>,
22    path: Path<(String, String)>,
23    query: Query<ObjectKeyQuery>,
24    headers: HeaderMap,
25    body: Bytes,
26) -> Response {
27    if query.upload_id.is_some() && query.part_number.is_some() {
28        multipart::upload_part(state, path, query, body).await
29    } else {
30        put_object(state, path, headers, body).await
31    }
32}
33
34/// GET /{bucket}/{*key} — dispatches to list_parts or get_object based on query params.
35pub async fn get_dispatch<B: StorageBackend>(
36    state: State<AppState<B>>,
37    path: Path<(String, String)>,
38    query: Query<ObjectKeyQuery>,
39) -> Response {
40    if query.upload_id.is_some() {
41        multipart::list_parts(state, path, query).await
42    } else {
43        get_object(state, path).await
44    }
45}
46
47/// DELETE /{bucket}/{*key} — dispatches to abort_multipart_upload or delete_object.
48pub async fn delete_dispatch<B: StorageBackend>(
49    state: State<AppState<B>>,
50    path: Path<(String, String)>,
51    query: Query<ObjectKeyQuery>,
52) -> Response {
53    if query.upload_id.is_some() {
54        multipart::abort_multipart_upload(state, path, query).await
55    } else {
56        delete_object(state, path).await
57    }
58}
59
60/// POST /{bucket}/{*key} — dispatches to create_multipart_upload or complete_multipart_upload.
61pub async fn post_dispatch<B: StorageBackend>(
62    state: State<AppState<B>>,
63    path: Path<(String, String)>,
64    query: Query<ObjectKeyQuery>,
65    headers: HeaderMap,
66    body: Bytes,
67) -> Response {
68    if query.uploads.is_some() {
69        multipart::create_multipart_upload(state, path, headers).await
70    } else if query.upload_id.is_some() {
71        multipart::complete_multipart_upload(state, path, query, body).await
72    } else {
73        (
74            StatusCode::BAD_REQUEST,
75            [("Content-Type", "application/xml")],
76            responses::error_xml(
77                "InvalidRequest",
78                "POST requires ?uploads or ?uploadId parameter",
79                &format!("/{}/{}", path.0 .0, path.0 .1),
80            ),
81        )
82            .into_response()
83    }
84}
85
86// --- Object handlers ---
87
88pub async fn put_object<B: StorageBackend>(
89    State(state): State<AppState<B>>,
90    Path((bucket, key)): Path<(String, String)>,
91    headers: HeaderMap,
92    body: Bytes,
93) -> Response {
94    let content_type = headers
95        .get("content-type")
96        .and_then(|v| v.to_str().ok())
97        .map(|s| s.to_string());
98
99    // Extract x-amz-meta-* user metadata
100    let mut metadata = HashMap::new();
101    for (name, value) in headers.iter() {
102        let name_str = name.as_str();
103        if let Some(meta_key) = name_str.strip_prefix("x-amz-meta-") {
104            if let Ok(v) = value.to_str() {
105                metadata.insert(meta_key.to_string(), v.to_string());
106            }
107        }
108    }
109
110    match state
111        .store
112        .put_object(&bucket, &key, content_type.as_deref(), metadata, body)
113        .await
114    {
115        Ok(result) => {
116            let mut response_headers = HeaderMap::new();
117            response_headers.insert("ETag", result.etag.parse().unwrap());
118            response_headers.insert(
119                "x-amz-request-id",
120                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
121            );
122            (StatusCode::OK, response_headers).into_response()
123        }
124        Err(Post3Error::BucketNotFound(b)) => (
125            StatusCode::NOT_FOUND,
126            [("Content-Type", "application/xml")],
127            responses::error_xml(
128                "NoSuchBucket",
129                "The specified bucket does not exist",
130                &b,
131            ),
132        )
133            .into_response(),
134        Err(e) => {
135            tracing::error!("put_object error: {e}");
136            (
137                StatusCode::INTERNAL_SERVER_ERROR,
138                [("Content-Type", "application/xml")],
139                responses::error_xml(
140                    "InternalError",
141                    &e.to_string(),
142                    &format!("/{bucket}/{key}"),
143                ),
144            )
145                .into_response()
146        }
147    }
148}
149
150pub async fn get_object<B: StorageBackend>(
151    State(state): State<AppState<B>>,
152    Path((bucket, key)): Path<(String, String)>,
153) -> Response {
154    match state.store.get_object(&bucket, &key).await {
155        Ok(result) => {
156            let mut headers = HeaderMap::new();
157            headers.insert(
158                "Content-Type",
159                HeaderValue::from_str(&result.metadata.content_type).unwrap(),
160            );
161            headers.insert(
162                "Content-Length",
163                HeaderValue::from_str(&result.metadata.size.to_string()).unwrap(),
164            );
165            headers.insert("ETag", HeaderValue::from_str(&result.metadata.etag).unwrap());
166            headers.insert(
167                "Last-Modified",
168                HeaderValue::from_str(
169                    &result
170                        .metadata
171                        .last_modified
172                        .format("%a, %d %b %Y %H:%M:%S GMT")
173                        .to_string(),
174                )
175                .unwrap(),
176            );
177            headers.insert(
178                "x-amz-request-id",
179                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
180            );
181
182            // Return user metadata as x-amz-meta-* headers
183            for (k, v) in &result.user_metadata {
184                let header_name = format!("x-amz-meta-{k}");
185                if let (Ok(name), Ok(val)) = (
186                    header_name.parse::<HeaderName>(),
187                    HeaderValue::from_str(v),
188                ) {
189                    headers.insert(name, val);
190                }
191            }
192
193            (StatusCode::OK, headers, Body::from(result.body)).into_response()
194        }
195        Err(Post3Error::BucketNotFound(b)) => (
196            StatusCode::NOT_FOUND,
197            [("Content-Type", "application/xml")],
198            responses::error_xml(
199                "NoSuchBucket",
200                "The specified bucket does not exist",
201                &b,
202            ),
203        )
204            .into_response(),
205        Err(Post3Error::ObjectNotFound { bucket: b, key: k }) => (
206            StatusCode::NOT_FOUND,
207            [("Content-Type", "application/xml")],
208            responses::error_xml(
209                "NoSuchKey",
210                "The specified key does not exist.",
211                &format!("/{b}/{k}"),
212            ),
213        )
214            .into_response(),
215        Err(e) => {
216            tracing::error!("get_object error: {e}");
217            (
218                StatusCode::INTERNAL_SERVER_ERROR,
219                [("Content-Type", "application/xml")],
220                responses::error_xml(
221                    "InternalError",
222                    &e.to_string(),
223                    &format!("/{bucket}/{key}"),
224                ),
225            )
226                .into_response()
227        }
228    }
229}
230
231pub async fn head_object<B: StorageBackend>(
232    State(state): State<AppState<B>>,
233    Path((bucket, key)): Path<(String, String)>,
234) -> Response {
235    match state.store.head_object(&bucket, &key).await {
236        Ok(Some(result)) => {
237            let mut headers = HeaderMap::new();
238            headers.insert(
239                "Content-Type",
240                HeaderValue::from_str(&result.object.content_type).unwrap(),
241            );
242            headers.insert(
243                "Content-Length",
244                HeaderValue::from_str(&result.object.size.to_string()).unwrap(),
245            );
246            headers.insert("ETag", HeaderValue::from_str(&result.object.etag).unwrap());
247            headers.insert(
248                "Last-Modified",
249                HeaderValue::from_str(
250                    &result
251                        .object
252                        .last_modified
253                        .format("%a, %d %b %Y %H:%M:%S GMT")
254                        .to_string(),
255                )
256                .unwrap(),
257            );
258            headers.insert(
259                "x-amz-request-id",
260                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
261            );
262
263            for (k, v) in &result.user_metadata {
264                let header_name = format!("x-amz-meta-{k}");
265                if let (Ok(name), Ok(val)) = (
266                    header_name.parse::<HeaderName>(),
267                    HeaderValue::from_str(v),
268                ) {
269                    headers.insert(name, val);
270                }
271            }
272
273            (StatusCode::OK, headers).into_response()
274        }
275        Ok(None) => {
276            let mut headers = HeaderMap::new();
277            headers.insert(
278                "x-amz-request-id",
279                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
280            );
281            (StatusCode::NOT_FOUND, headers).into_response()
282        }
283        Err(e) => {
284            tracing::error!("head_object error: {e}");
285            StatusCode::INTERNAL_SERVER_ERROR.into_response()
286        }
287    }
288}
289
290pub async fn delete_object<B: StorageBackend>(
291    State(state): State<AppState<B>>,
292    Path((bucket, key)): Path<(String, String)>,
293) -> Response {
294    match state.store.delete_object(&bucket, &key).await {
295        Ok(()) => {
296            let mut headers = HeaderMap::new();
297            headers.insert(
298                "x-amz-request-id",
299                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
300            );
301            (StatusCode::NO_CONTENT, headers).into_response()
302        }
303        Err(Post3Error::BucketNotFound(b)) => (
304            StatusCode::NOT_FOUND,
305            [("Content-Type", "application/xml")],
306            responses::error_xml(
307                "NoSuchBucket",
308                "The specified bucket does not exist",
309                &b,
310            ),
311        )
312            .into_response(),
313        Err(e) => {
314            tracing::error!("delete_object error: {e}");
315            StatusCode::INTERNAL_SERVER_ERROR.into_response()
316        }
317    }
318}
319
320/// Handles GET /{bucket} — dispatches to ListMultipartUploads, ListObjectVersions,
321/// GetBucketLocation, or ListObjects (v1/v2).
322pub async fn list_or_get<B: StorageBackend>(
323    State(state): State<AppState<B>>,
324    Path(bucket): Path<String>,
325    Query(query): Query<BucketGetQuery>,
326) -> Response {
327    // ?uploads → ListMultipartUploads
328    if query.uploads.is_some() {
329        return multipart::list_multipart_uploads(
330            State(state),
331            Path(bucket),
332            Query(query),
333        )
334        .await;
335    }
336
337    // ?location → GetBucketLocation
338    if query.location.is_some() {
339        return get_bucket_location(State(state), Path(bucket)).await;
340    }
341
342    // ?versions → ListObjectVersions
343    if query.versions.is_some() {
344        return list_object_versions(State(state), Path(bucket), Query(query)).await;
345    }
346
347    // Default: ListObjects (v1 or v2)
348    let is_v2 = query.list_type == Some(2);
349    let continuation_token = if is_v2 {
350        // v2: use continuation-token if present, else start-after
351        query
352            .continuation_token
353            .as_deref()
354            .or(query.start_after.as_deref())
355    } else {
356        query.marker.as_deref()
357    };
358
359    // Treat empty delimiter as absent (S3 spec: empty delimiter = no delimiter)
360    let delimiter = query
361        .delimiter
362        .as_deref()
363        .filter(|d| !d.is_empty());
364
365    match state
366        .store
367        .list_objects_v2(
368            &bucket,
369            query.prefix.as_deref(),
370            continuation_token,
371            query.max_keys,
372            delimiter,
373        )
374        .await
375    {
376        Ok(result) => {
377            let max_keys = query.max_keys.unwrap_or(1000);
378            let mut headers = HeaderMap::new();
379            headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
380            headers.insert(
381                "x-amz-request-id",
382                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
383            );
384
385            let xml = if is_v2 {
386                responses::list_objects_v2_xml(
387                    &bucket,
388                    &result,
389                    max_keys,
390                    query.continuation_token.as_deref(),
391                    query.start_after.as_deref(),
392                )
393            } else {
394                responses::list_objects_v1_xml(
395                    &bucket,
396                    &result,
397                    max_keys,
398                    query.marker.as_deref(),
399                )
400            };
401
402            (StatusCode::OK, headers, xml).into_response()
403        }
404        Err(Post3Error::BucketNotFound(b)) => (
405            StatusCode::NOT_FOUND,
406            [("Content-Type", "application/xml")],
407            responses::error_xml(
408                "NoSuchBucket",
409                "The specified bucket does not exist",
410                &b,
411            ),
412        )
413            .into_response(),
414        Err(e) => {
415            tracing::error!("list_objects error: {e}");
416            (
417                StatusCode::INTERNAL_SERVER_ERROR,
418                [("Content-Type", "application/xml")],
419                responses::error_xml("InternalError", &e.to_string(), &bucket),
420            )
421                .into_response()
422        }
423    }
424}
425
426/// GET /{bucket}?versions — ListObjectVersions (stub: returns all as version "null").
427async fn list_object_versions<B: StorageBackend>(
428    State(state): State<AppState<B>>,
429    Path(bucket): Path<String>,
430    Query(query): Query<BucketGetQuery>,
431) -> Response {
432    let delimiter = query.delimiter.as_deref().filter(|d| !d.is_empty());
433    match state
434        .store
435        .list_objects_v2(
436            &bucket,
437            query.prefix.as_deref(),
438            query.key_marker.as_deref(),
439            query.max_keys,
440            delimiter,
441        )
442        .await
443    {
444        Ok(result) => {
445            let max_keys = query.max_keys.unwrap_or(1000);
446            let mut headers = HeaderMap::new();
447            headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
448            headers.insert(
449                "x-amz-request-id",
450                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
451            );
452            (
453                StatusCode::OK,
454                headers,
455                responses::list_object_versions_xml(
456                    &bucket,
457                    &result,
458                    max_keys,
459                    query.key_marker.as_deref(),
460                ),
461            )
462                .into_response()
463        }
464        Err(Post3Error::BucketNotFound(b)) => (
465            StatusCode::NOT_FOUND,
466            [("Content-Type", "application/xml")],
467            responses::error_xml("NoSuchBucket", "The specified bucket does not exist", &b),
468        )
469            .into_response(),
470        Err(e) => {
471            tracing::error!("list_object_versions error: {e}");
472            (
473                StatusCode::INTERNAL_SERVER_ERROR,
474                [("Content-Type", "application/xml")],
475                responses::error_xml("InternalError", &e.to_string(), &bucket),
476            )
477                .into_response()
478        }
479    }
480}
481
482/// GET /{bucket}?location — GetBucketLocation.
483async fn get_bucket_location<B: StorageBackend>(
484    State(state): State<AppState<B>>,
485    Path(bucket): Path<String>,
486) -> Response {
487    match state.store.head_bucket(&bucket).await {
488        Ok(Some(_)) => {
489            let mut headers = HeaderMap::new();
490            headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
491            headers.insert(
492                "x-amz-request-id",
493                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
494            );
495            (StatusCode::OK, headers, responses::get_bucket_location_xml()).into_response()
496        }
497        Ok(None) => (
498            StatusCode::NOT_FOUND,
499            [("Content-Type", "application/xml")],
500            responses::error_xml(
501                "NoSuchBucket",
502                "The specified bucket does not exist",
503                &bucket,
504            ),
505        )
506            .into_response(),
507        Err(e) => {
508            tracing::error!("get_bucket_location error: {e}");
509            StatusCode::INTERNAL_SERVER_ERROR.into_response()
510        }
511    }
512}
513
514/// POST /{bucket} — dispatches to DeleteObjects based on ?delete query param.
515pub async fn bucket_post_dispatch<B: StorageBackend>(
516    state: State<AppState<B>>,
517    path: Path<String>,
518    query: Query<crate::s3::extractors::BucketPostQuery>,
519    body: Bytes,
520) -> Response {
521    if query.delete.is_some() {
522        delete_objects(state, path, body).await
523    } else {
524        (
525            StatusCode::BAD_REQUEST,
526            [("Content-Type", "application/xml")],
527            responses::error_xml(
528                "InvalidRequest",
529                "POST on bucket requires ?delete parameter",
530                &format!("/{}", path.0),
531            ),
532        )
533            .into_response()
534    }
535}
536
537/// POST /{bucket}?delete — DeleteObjects (batch delete).
538async fn delete_objects<B: StorageBackend>(
539    State(state): State<AppState<B>>,
540    Path(bucket): Path<String>,
541    body: Bytes,
542) -> Response {
543    let (keys, quiet) = match responses::parse_delete_objects_xml(&body) {
544        Ok(result) => result,
545        Err(msg) => {
546            return (
547                StatusCode::BAD_REQUEST,
548                [("Content-Type", "application/xml")],
549                responses::error_xml("MalformedXML", &msg, &format!("/{bucket}")),
550            )
551                .into_response();
552        }
553    };
554
555    // S3 limits DeleteObjects to 1000 keys
556    if keys.len() > 1000 {
557        return (
558            StatusCode::BAD_REQUEST,
559            [("Content-Type", "application/xml")],
560            responses::error_xml(
561                "MalformedXML",
562                "The number of keys in a DeleteObjects request cannot exceed 1000",
563                &format!("/{bucket}"),
564            ),
565        )
566            .into_response();
567    }
568
569    let mut deleted = Vec::new();
570    let mut errors: Vec<(String, String, String)> = Vec::new();
571
572    for key in keys {
573        match state.store.delete_object(&bucket, &key).await {
574            Ok(()) => {
575                if !quiet {
576                    deleted.push(key);
577                }
578            }
579            Err(e) => {
580                errors.push((key, "InternalError".to_string(), e.to_string()));
581            }
582        }
583    }
584
585    let mut headers = HeaderMap::new();
586    headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
587    headers.insert(
588        "x-amz-request-id",
589        HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
590    );
591
592    (
593        StatusCode::OK,
594        headers,
595        responses::delete_objects_result_xml(&deleted, &errors),
596    )
597        .into_response()
598}