1use std::collections::HashMap;
2
3use axum::{
4 body::Body,
5 extract::{Path, Query, State},
6 http::{header::HeaderName, HeaderMap, HeaderValue, StatusCode},
7 response::{IntoResponse, Response},
8};
9use bytes::Bytes;
10use post3::{Post3Error, StorageBackend};
11
12use crate::s3::extractors::{BucketGetQuery, ObjectKeyQuery};
13use crate::s3::handlers::multipart;
14use crate::s3::responses;
15use crate::state::State as AppState;
16
17pub async fn put_dispatch<B: StorageBackend>(
21 state: State<AppState<B>>,
22 path: Path<(String, String)>,
23 query: Query<ObjectKeyQuery>,
24 headers: HeaderMap,
25 body: Bytes,
26) -> Response {
27 if query.upload_id.is_some() && query.part_number.is_some() {
28 multipart::upload_part(state, path, query, body).await
29 } else {
30 put_object(state, path, headers, body).await
31 }
32}
33
34pub async fn get_dispatch<B: StorageBackend>(
36 state: State<AppState<B>>,
37 path: Path<(String, String)>,
38 query: Query<ObjectKeyQuery>,
39) -> Response {
40 if query.upload_id.is_some() {
41 multipart::list_parts(state, path, query).await
42 } else {
43 get_object(state, path).await
44 }
45}
46
47pub async fn delete_dispatch<B: StorageBackend>(
49 state: State<AppState<B>>,
50 path: Path<(String, String)>,
51 query: Query<ObjectKeyQuery>,
52) -> Response {
53 if query.upload_id.is_some() {
54 multipart::abort_multipart_upload(state, path, query).await
55 } else {
56 delete_object(state, path).await
57 }
58}
59
60pub async fn post_dispatch<B: StorageBackend>(
62 state: State<AppState<B>>,
63 path: Path<(String, String)>,
64 query: Query<ObjectKeyQuery>,
65 headers: HeaderMap,
66 body: Bytes,
67) -> Response {
68 if query.uploads.is_some() {
69 multipart::create_multipart_upload(state, path, headers).await
70 } else if query.upload_id.is_some() {
71 multipart::complete_multipart_upload(state, path, query, body).await
72 } else {
73 (
74 StatusCode::BAD_REQUEST,
75 [("Content-Type", "application/xml")],
76 responses::error_xml(
77 "InvalidRequest",
78 "POST requires ?uploads or ?uploadId parameter",
79 &format!("/{}/{}", path.0 .0, path.0 .1),
80 ),
81 )
82 .into_response()
83 }
84}
85
86pub async fn put_object<B: StorageBackend>(
89 State(state): State<AppState<B>>,
90 Path((bucket, key)): Path<(String, String)>,
91 headers: HeaderMap,
92 body: Bytes,
93) -> Response {
94 let content_type = headers
95 .get("content-type")
96 .and_then(|v| v.to_str().ok())
97 .map(|s| s.to_string());
98
99 let mut metadata = HashMap::new();
101 for (name, value) in headers.iter() {
102 let name_str = name.as_str();
103 if let Some(meta_key) = name_str.strip_prefix("x-amz-meta-") {
104 if let Ok(v) = value.to_str() {
105 metadata.insert(meta_key.to_string(), v.to_string());
106 }
107 }
108 }
109
110 match state
111 .store
112 .put_object(&bucket, &key, content_type.as_deref(), metadata, body)
113 .await
114 {
115 Ok(result) => {
116 let mut response_headers = HeaderMap::new();
117 response_headers.insert("ETag", result.etag.parse().unwrap());
118 response_headers.insert(
119 "x-amz-request-id",
120 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
121 );
122 (StatusCode::OK, response_headers).into_response()
123 }
124 Err(Post3Error::BucketNotFound(b)) => (
125 StatusCode::NOT_FOUND,
126 [("Content-Type", "application/xml")],
127 responses::error_xml(
128 "NoSuchBucket",
129 "The specified bucket does not exist",
130 &b,
131 ),
132 )
133 .into_response(),
134 Err(e) => {
135 tracing::error!("put_object error: {e}");
136 (
137 StatusCode::INTERNAL_SERVER_ERROR,
138 [("Content-Type", "application/xml")],
139 responses::error_xml(
140 "InternalError",
141 &e.to_string(),
142 &format!("/{bucket}/{key}"),
143 ),
144 )
145 .into_response()
146 }
147 }
148}
149
150pub async fn get_object<B: StorageBackend>(
151 State(state): State<AppState<B>>,
152 Path((bucket, key)): Path<(String, String)>,
153) -> Response {
154 match state.store.get_object(&bucket, &key).await {
155 Ok(result) => {
156 let mut headers = HeaderMap::new();
157 headers.insert(
158 "Content-Type",
159 HeaderValue::from_str(&result.metadata.content_type).unwrap(),
160 );
161 headers.insert(
162 "Content-Length",
163 HeaderValue::from_str(&result.metadata.size.to_string()).unwrap(),
164 );
165 headers.insert("ETag", HeaderValue::from_str(&result.metadata.etag).unwrap());
166 headers.insert(
167 "Last-Modified",
168 HeaderValue::from_str(
169 &result
170 .metadata
171 .last_modified
172 .format("%a, %d %b %Y %H:%M:%S GMT")
173 .to_string(),
174 )
175 .unwrap(),
176 );
177 headers.insert(
178 "x-amz-request-id",
179 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
180 );
181
182 for (k, v) in &result.user_metadata {
184 let header_name = format!("x-amz-meta-{k}");
185 if let (Ok(name), Ok(val)) = (
186 header_name.parse::<HeaderName>(),
187 HeaderValue::from_str(v),
188 ) {
189 headers.insert(name, val);
190 }
191 }
192
193 (StatusCode::OK, headers, Body::from(result.body)).into_response()
194 }
195 Err(Post3Error::BucketNotFound(b)) => (
196 StatusCode::NOT_FOUND,
197 [("Content-Type", "application/xml")],
198 responses::error_xml(
199 "NoSuchBucket",
200 "The specified bucket does not exist",
201 &b,
202 ),
203 )
204 .into_response(),
205 Err(Post3Error::ObjectNotFound { bucket: b, key: k }) => (
206 StatusCode::NOT_FOUND,
207 [("Content-Type", "application/xml")],
208 responses::error_xml(
209 "NoSuchKey",
210 "The specified key does not exist.",
211 &format!("/{b}/{k}"),
212 ),
213 )
214 .into_response(),
215 Err(e) => {
216 tracing::error!("get_object error: {e}");
217 (
218 StatusCode::INTERNAL_SERVER_ERROR,
219 [("Content-Type", "application/xml")],
220 responses::error_xml(
221 "InternalError",
222 &e.to_string(),
223 &format!("/{bucket}/{key}"),
224 ),
225 )
226 .into_response()
227 }
228 }
229}
230
231pub async fn head_object<B: StorageBackend>(
232 State(state): State<AppState<B>>,
233 Path((bucket, key)): Path<(String, String)>,
234) -> Response {
235 match state.store.head_object(&bucket, &key).await {
236 Ok(Some(result)) => {
237 let mut headers = HeaderMap::new();
238 headers.insert(
239 "Content-Type",
240 HeaderValue::from_str(&result.object.content_type).unwrap(),
241 );
242 headers.insert(
243 "Content-Length",
244 HeaderValue::from_str(&result.object.size.to_string()).unwrap(),
245 );
246 headers.insert("ETag", HeaderValue::from_str(&result.object.etag).unwrap());
247 headers.insert(
248 "Last-Modified",
249 HeaderValue::from_str(
250 &result
251 .object
252 .last_modified
253 .format("%a, %d %b %Y %H:%M:%S GMT")
254 .to_string(),
255 )
256 .unwrap(),
257 );
258 headers.insert(
259 "x-amz-request-id",
260 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
261 );
262
263 for (k, v) in &result.user_metadata {
264 let header_name = format!("x-amz-meta-{k}");
265 if let (Ok(name), Ok(val)) = (
266 header_name.parse::<HeaderName>(),
267 HeaderValue::from_str(v),
268 ) {
269 headers.insert(name, val);
270 }
271 }
272
273 (StatusCode::OK, headers).into_response()
274 }
275 Ok(None) => {
276 let mut headers = HeaderMap::new();
277 headers.insert(
278 "x-amz-request-id",
279 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
280 );
281 (StatusCode::NOT_FOUND, headers).into_response()
282 }
283 Err(e) => {
284 tracing::error!("head_object error: {e}");
285 StatusCode::INTERNAL_SERVER_ERROR.into_response()
286 }
287 }
288}
289
290pub async fn delete_object<B: StorageBackend>(
291 State(state): State<AppState<B>>,
292 Path((bucket, key)): Path<(String, String)>,
293) -> Response {
294 match state.store.delete_object(&bucket, &key).await {
295 Ok(()) => {
296 let mut headers = HeaderMap::new();
297 headers.insert(
298 "x-amz-request-id",
299 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
300 );
301 (StatusCode::NO_CONTENT, headers).into_response()
302 }
303 Err(Post3Error::BucketNotFound(b)) => (
304 StatusCode::NOT_FOUND,
305 [("Content-Type", "application/xml")],
306 responses::error_xml(
307 "NoSuchBucket",
308 "The specified bucket does not exist",
309 &b,
310 ),
311 )
312 .into_response(),
313 Err(e) => {
314 tracing::error!("delete_object error: {e}");
315 StatusCode::INTERNAL_SERVER_ERROR.into_response()
316 }
317 }
318}
319
320pub async fn list_or_get<B: StorageBackend>(
323 State(state): State<AppState<B>>,
324 Path(bucket): Path<String>,
325 Query(query): Query<BucketGetQuery>,
326) -> Response {
327 if query.uploads.is_some() {
329 return multipart::list_multipart_uploads(
330 State(state),
331 Path(bucket),
332 Query(query),
333 )
334 .await;
335 }
336
337 if query.location.is_some() {
339 return get_bucket_location(State(state), Path(bucket)).await;
340 }
341
342 if query.versions.is_some() {
344 return list_object_versions(State(state), Path(bucket), Query(query)).await;
345 }
346
347 let is_v2 = query.list_type == Some(2);
349 let continuation_token = if is_v2 {
350 query
352 .continuation_token
353 .as_deref()
354 .or(query.start_after.as_deref())
355 } else {
356 query.marker.as_deref()
357 };
358
359 let delimiter = query
361 .delimiter
362 .as_deref()
363 .filter(|d| !d.is_empty());
364
365 match state
366 .store
367 .list_objects_v2(
368 &bucket,
369 query.prefix.as_deref(),
370 continuation_token,
371 query.max_keys,
372 delimiter,
373 )
374 .await
375 {
376 Ok(result) => {
377 let max_keys = query.max_keys.unwrap_or(1000);
378 let mut headers = HeaderMap::new();
379 headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
380 headers.insert(
381 "x-amz-request-id",
382 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
383 );
384
385 let xml = if is_v2 {
386 responses::list_objects_v2_xml(
387 &bucket,
388 &result,
389 max_keys,
390 query.continuation_token.as_deref(),
391 query.start_after.as_deref(),
392 )
393 } else {
394 responses::list_objects_v1_xml(
395 &bucket,
396 &result,
397 max_keys,
398 query.marker.as_deref(),
399 )
400 };
401
402 (StatusCode::OK, headers, xml).into_response()
403 }
404 Err(Post3Error::BucketNotFound(b)) => (
405 StatusCode::NOT_FOUND,
406 [("Content-Type", "application/xml")],
407 responses::error_xml(
408 "NoSuchBucket",
409 "The specified bucket does not exist",
410 &b,
411 ),
412 )
413 .into_response(),
414 Err(e) => {
415 tracing::error!("list_objects error: {e}");
416 (
417 StatusCode::INTERNAL_SERVER_ERROR,
418 [("Content-Type", "application/xml")],
419 responses::error_xml("InternalError", &e.to_string(), &bucket),
420 )
421 .into_response()
422 }
423 }
424}
425
426async fn list_object_versions<B: StorageBackend>(
428 State(state): State<AppState<B>>,
429 Path(bucket): Path<String>,
430 Query(query): Query<BucketGetQuery>,
431) -> Response {
432 let delimiter = query.delimiter.as_deref().filter(|d| !d.is_empty());
433 match state
434 .store
435 .list_objects_v2(
436 &bucket,
437 query.prefix.as_deref(),
438 query.key_marker.as_deref(),
439 query.max_keys,
440 delimiter,
441 )
442 .await
443 {
444 Ok(result) => {
445 let max_keys = query.max_keys.unwrap_or(1000);
446 let mut headers = HeaderMap::new();
447 headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
448 headers.insert(
449 "x-amz-request-id",
450 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
451 );
452 (
453 StatusCode::OK,
454 headers,
455 responses::list_object_versions_xml(
456 &bucket,
457 &result,
458 max_keys,
459 query.key_marker.as_deref(),
460 ),
461 )
462 .into_response()
463 }
464 Err(Post3Error::BucketNotFound(b)) => (
465 StatusCode::NOT_FOUND,
466 [("Content-Type", "application/xml")],
467 responses::error_xml("NoSuchBucket", "The specified bucket does not exist", &b),
468 )
469 .into_response(),
470 Err(e) => {
471 tracing::error!("list_object_versions error: {e}");
472 (
473 StatusCode::INTERNAL_SERVER_ERROR,
474 [("Content-Type", "application/xml")],
475 responses::error_xml("InternalError", &e.to_string(), &bucket),
476 )
477 .into_response()
478 }
479 }
480}
481
482async fn get_bucket_location<B: StorageBackend>(
484 State(state): State<AppState<B>>,
485 Path(bucket): Path<String>,
486) -> Response {
487 match state.store.head_bucket(&bucket).await {
488 Ok(Some(_)) => {
489 let mut headers = HeaderMap::new();
490 headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
491 headers.insert(
492 "x-amz-request-id",
493 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
494 );
495 (StatusCode::OK, headers, responses::get_bucket_location_xml()).into_response()
496 }
497 Ok(None) => (
498 StatusCode::NOT_FOUND,
499 [("Content-Type", "application/xml")],
500 responses::error_xml(
501 "NoSuchBucket",
502 "The specified bucket does not exist",
503 &bucket,
504 ),
505 )
506 .into_response(),
507 Err(e) => {
508 tracing::error!("get_bucket_location error: {e}");
509 StatusCode::INTERNAL_SERVER_ERROR.into_response()
510 }
511 }
512}
513
514pub async fn bucket_post_dispatch<B: StorageBackend>(
516 state: State<AppState<B>>,
517 path: Path<String>,
518 query: Query<crate::s3::extractors::BucketPostQuery>,
519 body: Bytes,
520) -> Response {
521 if query.delete.is_some() {
522 delete_objects(state, path, body).await
523 } else {
524 (
525 StatusCode::BAD_REQUEST,
526 [("Content-Type", "application/xml")],
527 responses::error_xml(
528 "InvalidRequest",
529 "POST on bucket requires ?delete parameter",
530 &format!("/{}", path.0),
531 ),
532 )
533 .into_response()
534 }
535}
536
537async fn delete_objects<B: StorageBackend>(
539 State(state): State<AppState<B>>,
540 Path(bucket): Path<String>,
541 body: Bytes,
542) -> Response {
543 let (keys, quiet) = match responses::parse_delete_objects_xml(&body) {
544 Ok(result) => result,
545 Err(msg) => {
546 return (
547 StatusCode::BAD_REQUEST,
548 [("Content-Type", "application/xml")],
549 responses::error_xml("MalformedXML", &msg, &format!("/{bucket}")),
550 )
551 .into_response();
552 }
553 };
554
555 if keys.len() > 1000 {
557 return (
558 StatusCode::BAD_REQUEST,
559 [("Content-Type", "application/xml")],
560 responses::error_xml(
561 "MalformedXML",
562 "The number of keys in a DeleteObjects request cannot exceed 1000",
563 &format!("/{bucket}"),
564 ),
565 )
566 .into_response();
567 }
568
569 let mut deleted = Vec::new();
570 let mut errors: Vec<(String, String, String)> = Vec::new();
571
572 for key in keys {
573 match state.store.delete_object(&bucket, &key).await {
574 Ok(()) => {
575 if !quiet {
576 deleted.push(key);
577 }
578 }
579 Err(e) => {
580 errors.push((key, "InternalError".to_string(), e.to_string()));
581 }
582 }
583 }
584
585 let mut headers = HeaderMap::new();
586 headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
587 headers.insert(
588 "x-amz-request-id",
589 HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
590 );
591
592 (
593 StatusCode::OK,
594 headers,
595 responses::delete_objects_result_xml(&deleted, &errors),
596 )
597 .into_response()
598}