1use crate::fs::FileSystem;
2use crate::fs::InternalInfo;
3use crate::utils::*;
4
5use s3s::S3;
6use s3s::S3Result;
7use s3s::dto::*;
8use s3s::s3_error;
9use s3s::{S3Request, S3Response};
10
11use std::collections::VecDeque;
12use std::io;
13use std::ops::Neg;
14use std::ops::Not;
15use std::path::Component;
16use std::path::{Path, PathBuf};
17
18use tokio::fs;
19use tokio::io::AsyncSeekExt;
20use tokio_util::io::ReaderStream;
21
22use futures::TryStreamExt;
23use md5::{Digest, Md5};
24use numeric_cast::NumericCast;
25use stdx::default::default;
26use tracing::debug;
27use uuid::Uuid;
28
/// Joins the components of a relative `path` into a single string using `delimiter`.
///
/// Returns `None` when the path contains anything other than plain name components
/// (a root, prefix, `.` or `..` component) or when a component is not valid UTF-8.
/// An empty path yields an empty string.
fn normalize_path(path: &Path, delimiter: &str) -> Option<String> {
    let names = path
        .components()
        .map(|component| match component {
            Component::Normal(name) => name.to_str(),
            Component::Prefix(_) | Component::RootDir | Component::CurDir | Component::ParentDir => None,
        })
        // Collecting into `Option` stops at the first rejected component.
        .collect::<Option<Vec<&str>>>()?;

    Some(names.join(delimiter))
}
49
/// Renders a `Content-Range` header value of the form `bytes <start>-<end>/<size>`,
/// where `end_inclusive` is the offset of the last byte served.
fn fmt_content_range(start: u64, end_inclusive: u64, size: u64) -> String {
    let span = format!("{start}-{end_inclusive}");
    format!("bytes {span}/{size}")
}
54
55#[async_trait::async_trait]
56impl S3 for FileSystem {
57 #[tracing::instrument]
58 async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
59 let input = req.input;
60 let path = self.get_bucket_path(&input.bucket)?;
61
62 if path.exists() {
63 return Err(s3_error!(BucketAlreadyExists));
64 }
65
66 try_!(fs::create_dir(&path).await);
67
68 let output = CreateBucketOutput::default(); Ok(S3Response::new(output))
70 }
71
72 #[tracing::instrument]
73 async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
74 let input = req.input;
75 let (bucket, key) = match input.copy_source {
76 CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
77 CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
78 };
79
80 let src_path = self.get_object_path(bucket, key)?;
81 let dst_path = self.get_object_path(&input.bucket, &input.key)?;
82
83 if src_path.exists().not() {
84 return Err(s3_error!(NoSuchKey));
85 }
86
87 if self.get_bucket_path(&input.bucket)?.exists().not() {
88 return Err(s3_error!(NoSuchBucket));
89 }
90
91 if let Some(dir_path) = dst_path.parent() {
92 try_!(fs::create_dir_all(&dir_path).await);
93 }
94
95 let file_metadata = try_!(fs::metadata(&src_path).await);
96 let last_modified = Timestamp::from(try_!(file_metadata.modified()));
97
98 let _ = try_!(fs::copy(&src_path, &dst_path).await);
99
100 debug!(from = %src_path.display(), to = %dst_path.display(), "copy file");
101
102 let src_metadata_path = self.get_metadata_path(bucket, key, None)?;
103 if src_metadata_path.exists() {
104 let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key, None)?;
105 let _ = try_!(fs::copy(src_metadata_path, dst_metadata_path).await);
106 }
107
108 let md5_sum = self.get_md5_sum(bucket, key).await?;
109
110 let copy_object_result = CopyObjectResult {
111 e_tag: Some(format!("\"{md5_sum}\"")),
112 last_modified: Some(last_modified),
113 ..Default::default()
114 };
115
116 let output = CopyObjectOutput {
117 copy_object_result: Some(copy_object_result),
118 ..Default::default()
119 };
120 Ok(S3Response::new(output))
121 }
122
123 #[tracing::instrument]
124 async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
125 let input = req.input;
126 let path = self.get_bucket_path(&input.bucket)?;
127 if path.exists() {
128 try_!(fs::remove_dir_all(path).await);
129 } else {
130 return Err(s3_error!(NoSuchBucket));
131 }
132 Ok(S3Response::new(DeleteBucketOutput {}))
133 }
134
135 #[tracing::instrument]
136 async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
137 let input = req.input;
138 let path = self.get_object_path(&input.bucket, &input.key)?;
139 if path.exists().not() {
140 return Err(s3_error!(NoSuchKey));
141 }
142 if input.key.ends_with('/') {
143 let mut dir = try_!(fs::read_dir(&path).await);
144 let is_empty = try_!(dir.next_entry().await).is_none();
145 if is_empty {
146 try_!(fs::remove_dir(&path).await);
147 }
148 } else {
149 try_!(fs::remove_file(&path).await);
150 }
151 let output = DeleteObjectOutput::default(); Ok(S3Response::new(output))
153 }
154
155 #[tracing::instrument]
156 async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
157 let input = req.input;
158 let mut objects: Vec<(PathBuf, String)> = Vec::new();
159 for object in input.delete.objects {
160 let path = self.get_object_path(&input.bucket, &object.key)?;
161 if path.exists() {
162 objects.push((path, object.key));
163 }
164 }
165
166 let mut deleted_objects: Vec<DeletedObject> = Vec::new();
167 for (path, key) in objects {
168 try_!(fs::remove_file(path).await);
169
170 let deleted_object = DeletedObject {
171 key: Some(key),
172 ..Default::default()
173 };
174
175 deleted_objects.push(deleted_object);
176 }
177
178 let output = DeleteObjectsOutput {
179 deleted: Some(deleted_objects),
180 ..Default::default()
181 };
182 Ok(S3Response::new(output))
183 }
184
185 #[tracing::instrument]
186 async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
187 let input = req.input;
188 let path = self.get_bucket_path(&input.bucket)?;
189
190 if !path.exists() {
191 return Err(s3_error!(NoSuchBucket));
192 }
193
194 let output = GetBucketLocationOutput::default();
195 Ok(S3Response::new(output))
196 }
197
198 #[tracing::instrument]
199 async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
200 let input = req.input;
201 let object_path = self.get_object_path(&input.bucket, &input.key)?;
202
203 let mut file = fs::File::open(&object_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
204
205 let file_metadata = try_!(file.metadata().await);
206 let last_modified = Timestamp::from(try_!(file_metadata.modified()));
207 let file_len = file_metadata.len();
208
209 let (content_length, content_range) = match input.range {
210 None => (file_len, None),
211 Some(range) => {
212 let file_range = range.check(file_len)?;
213 let content_length = file_range.end - file_range.start;
214 let content_range = fmt_content_range(file_range.start, file_range.end - 1, file_len);
215 (content_length, Some(content_range))
216 }
217 };
218 let content_length_usize = try_!(usize::try_from(content_length));
219 let content_length_i64 = try_!(i64::try_from(content_length));
220
221 match input.range {
222 Some(Range::Int { first, .. }) => {
223 try_!(file.seek(io::SeekFrom::Start(first)).await);
224 }
225 Some(Range::Suffix { length }) => {
226 let neg_offset = length.numeric_cast::<i64>().neg();
227 try_!(file.seek(io::SeekFrom::End(neg_offset)).await);
228 }
229 None => {}
230 }
231
232 let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);
233
234 let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;
235
236 let md5_sum = self.get_md5_sum(&input.bucket, &input.key).await?;
237 let e_tag = format!("\"{md5_sum}\"");
238
239 let info = self.load_internal_info(&input.bucket, &input.key).await?;
240 let checksum = match &info {
241 Some(info) => crate::checksum::from_internal_info(info),
242 None => default(),
243 };
244
245 let output = GetObjectOutput {
246 body: Some(StreamingBlob::wrap(body)),
247 content_length: Some(content_length_i64),
248 content_range,
249 last_modified: Some(last_modified),
250 metadata: object_metadata,
251 e_tag: Some(e_tag),
252 checksum_crc32: checksum.checksum_crc32,
253 checksum_crc32c: checksum.checksum_crc32c,
254 checksum_sha1: checksum.checksum_sha1,
255 checksum_sha256: checksum.checksum_sha256,
256 ..Default::default()
257 };
258 Ok(S3Response::new(output))
259 }
260
261 #[tracing::instrument]
262 async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
263 let input = req.input;
264 let path = self.get_bucket_path(&input.bucket)?;
265
266 if !path.exists() {
267 return Err(s3_error!(NoSuchBucket));
268 }
269
270 Ok(S3Response::new(HeadBucketOutput::default()))
271 }
272
273 #[tracing::instrument]
274 async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
275 let input = req.input;
276 let path = self.get_object_path(&input.bucket, &input.key)?;
277
278 if !path.exists() {
279 return Err(s3_error!(NoSuchBucket));
280 }
281
282 let file_metadata = try_!(fs::metadata(path).await);
283 let last_modified = Timestamp::from(try_!(file_metadata.modified()));
284 let file_len = file_metadata.len();
285
286 let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;
287
288 let content_type = mime::APPLICATION_OCTET_STREAM;
290
291 let output = HeadObjectOutput {
292 content_length: Some(try_!(i64::try_from(file_len))),
293 content_type: Some(content_type),
294 last_modified: Some(last_modified),
295 metadata: object_metadata,
296 ..Default::default()
297 };
298 Ok(S3Response::new(output))
299 }
300
301 #[tracing::instrument]
302 async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
303 let mut buckets: Vec<Bucket> = Vec::new();
304 let mut iter = try_!(fs::read_dir(&self.root).await);
305 while let Some(entry) = try_!(iter.next_entry().await) {
306 let file_type = try_!(entry.file_type().await);
307 if file_type.is_dir().not() {
308 continue;
309 }
310
311 let file_name = entry.file_name();
312 let Some(name) = file_name.to_str() else { continue };
313 if s3s::path::check_bucket_name(name).not() {
314 continue;
315 }
316
317 let file_meta = try_!(entry.metadata().await);
318 let created_or_modified_date = Timestamp::from(try_!(file_meta.created().or(file_meta.modified())));
322
323 let bucket = Bucket {
324 creation_date: Some(created_or_modified_date),
325 name: Some(name.to_owned()),
326 bucket_region: None,
327 };
328 buckets.push(bucket);
329 }
330
331 let output = ListBucketsOutput {
332 buckets: Some(buckets),
333 owner: None,
334 ..Default::default()
335 };
336 Ok(S3Response::new(output))
337 }
338
339 #[tracing::instrument]
340 async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
341 let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;
342
343 Ok(v2_resp.map_output(|v2| ListObjectsOutput {
344 contents: v2.contents,
345 delimiter: v2.delimiter,
346 encoding_type: v2.encoding_type,
347 name: v2.name,
348 prefix: v2.prefix,
349 max_keys: v2.max_keys,
350 ..Default::default()
351 }))
352 }
353
354 #[tracing::instrument]
355 async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
356 let input = req.input;
357 let path = self.get_bucket_path(&input.bucket)?;
358
359 if path.exists().not() {
360 return Err(s3_error!(NoSuchBucket));
361 }
362
363 let mut objects: Vec<Object> = default();
364 let mut dir_queue: VecDeque<PathBuf> = default();
365 dir_queue.push_back(path.clone());
366
367 while let Some(dir) = dir_queue.pop_front() {
368 let mut iter = try_!(fs::read_dir(dir).await);
369 while let Some(entry) = try_!(iter.next_entry().await) {
370 let file_type = try_!(entry.file_type().await);
371 if file_type.is_dir() {
372 dir_queue.push_back(entry.path());
373 } else {
374 let file_path = entry.path();
375 let key = try_!(file_path.strip_prefix(&path));
376 let delimiter = input.delimiter.as_ref().map_or("/", |d| d.as_str());
377 let Some(key_str) = normalize_path(key, delimiter) else {
378 continue;
379 };
380
381 if let Some(ref prefix) = input.prefix {
382 let prefix_path: PathBuf = prefix.split(delimiter).collect();
383
384 let key_s = format!("{}", key.display());
385 let prefix_path_s = format!("{}", prefix_path.display());
386
387 if !key_s.starts_with(&prefix_path_s) {
388 continue;
389 }
390 }
391
392 let metadata = try_!(entry.metadata().await);
393 let last_modified = Timestamp::from(try_!(metadata.modified()));
394 let size = metadata.len();
395
396 let object = Object {
397 key: Some(key_str),
398 last_modified: Some(last_modified),
399 size: Some(try_!(i64::try_from(size))),
400 ..Default::default()
401 };
402 objects.push(object);
403 }
404 }
405 }
406
407 objects.sort_by(|lhs, rhs| {
408 let lhs_key = lhs.key.as_deref().unwrap_or("");
409 let rhs_key = rhs.key.as_deref().unwrap_or("");
410 lhs_key.cmp(rhs_key)
411 });
412
413 let objects = if let Some(marker) = &input.start_after {
414 objects
415 .into_iter()
416 .skip_while(|n| n.key.as_deref().unwrap_or("") <= marker.as_str())
417 .collect()
418 } else {
419 objects
420 };
421
422 let key_count = try_!(i32::try_from(objects.len()));
423
424 let output = ListObjectsV2Output {
425 key_count: Some(key_count),
426 max_keys: Some(key_count),
427 contents: Some(objects),
428 delimiter: input.delimiter,
429 encoding_type: input.encoding_type,
430 name: Some(input.bucket),
431 prefix: input.prefix,
432 ..Default::default()
433 };
434 Ok(S3Response::new(output))
435 }
436
437 #[tracing::instrument]
438 async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
439 let input = req.input;
440 if let Some(ref storage_class) = input.storage_class {
441 let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
442 if !is_valid {
443 return Err(s3_error!(InvalidStorageClass));
444 }
445 }
446
447 let PutObjectInput {
448 body,
449 bucket,
450 key,
451 metadata,
452 content_length,
453 ..
454 } = input;
455
456 let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };
457
458 let mut checksum: s3s::checksum::ChecksumHasher = default();
459 if input.checksum_crc32.is_some() {
460 checksum.crc32 = Some(default());
461 }
462 if input.checksum_crc32c.is_some() {
463 checksum.crc32c = Some(default());
464 }
465 if input.checksum_sha1.is_some() {
466 checksum.sha1 = Some(default());
467 }
468 if input.checksum_sha256.is_some() {
469 checksum.sha256 = Some(default());
470 }
471
472 if key.ends_with('/') {
473 if let Some(len) = content_length {
474 if len > 0 {
475 return Err(s3_error!(UnexpectedContent, "Unexpected request body when creating a directory object."));
476 }
477 }
478 let object_path = self.get_object_path(&bucket, &key)?;
479 try_!(fs::create_dir_all(&object_path).await);
480 let output = PutObjectOutput::default();
481 return Ok(S3Response::new(output));
482 }
483
484 let object_path = self.get_object_path(&bucket, &key)?;
485 let mut file_writer = self.prepare_file_write(&object_path).await?;
486
487 let mut md5_hash = <Md5 as Digest>::new();
488 let stream = body.inspect_ok(|bytes| {
489 md5_hash.update(bytes.as_ref());
490 checksum.update(bytes.as_ref());
491 });
492
493 let size = copy_bytes(stream, file_writer.writer()).await?;
494 file_writer.done().await?;
495
496 let md5_sum = hex(md5_hash.finalize());
497
498 let checksum = checksum.finalize();
499 if checksum.checksum_crc32 != input.checksum_crc32 {
500 return Err(s3_error!(BadDigest, "checksum_crc32 mismatch"));
501 }
502 if checksum.checksum_crc32c != input.checksum_crc32c {
503 return Err(s3_error!(BadDigest, "checksum_crc32c mismatch"));
504 }
505 if checksum.checksum_sha1 != input.checksum_sha1 {
506 return Err(s3_error!(BadDigest, "checksum_sha1 mismatch"));
507 }
508 if checksum.checksum_sha256 != input.checksum_sha256 {
509 return Err(s3_error!(BadDigest, "checksum_sha256 mismatch"));
510 }
511
512 debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");
513
514 if let Some(ref metadata) = metadata {
515 self.save_metadata(&bucket, &key, metadata, None).await?;
516 }
517
518 let mut info: InternalInfo = default();
519 crate::checksum::modify_internal_info(&mut info, &checksum);
520 self.save_internal_info(&bucket, &key, &info).await?;
521
522 let e_tag = format!("\"{md5_sum}\"");
523
524 let output = PutObjectOutput {
525 e_tag: Some(e_tag),
526 checksum_crc32: checksum.checksum_crc32,
527 checksum_crc32c: checksum.checksum_crc32c,
528 checksum_sha1: checksum.checksum_sha1,
529 checksum_sha256: checksum.checksum_sha256,
530 ..Default::default()
531 };
532 Ok(S3Response::new(output))
533 }
534
535 #[tracing::instrument]
536 async fn create_multipart_upload(
537 &self,
538 req: S3Request<CreateMultipartUploadInput>,
539 ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
540 let input = req.input;
541 let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;
542
543 if let Some(ref metadata) = input.metadata {
544 self.save_metadata(&input.bucket, &input.key, metadata, Some(upload_id))
545 .await?;
546 }
547
548 let output = CreateMultipartUploadOutput {
549 bucket: Some(input.bucket),
550 key: Some(input.key),
551 upload_id: Some(upload_id.to_string()),
552 ..Default::default()
553 };
554
555 Ok(S3Response::new(output))
556 }
557
558 #[tracing::instrument]
559 async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
560 let UploadPartInput {
561 body,
562 upload_id,
563 part_number,
564 ..
565 } = req.input;
566
567 let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;
568
569 let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
570 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
571 return Err(s3_error!(AccessDenied));
572 }
573
574 let file_path = self.resolve_upload_part_path(upload_id, part_number)?;
575
576 let mut md5_hash = <Md5 as Digest>::new();
577 let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));
578
579 let mut file_writer = self.prepare_file_write(&file_path).await?;
580 let size = copy_bytes(stream, file_writer.writer()).await?;
581 file_writer.done().await?;
582
583 let md5_sum = hex(md5_hash.finalize());
584
585 debug!(path = %file_path.display(), ?size, %md5_sum, "write file");
586
587 let output = UploadPartOutput {
588 e_tag: Some(format!("\"{md5_sum}\"")),
589 ..Default::default()
590 };
591 Ok(S3Response::new(output))
592 }
593
594 #[tracing::instrument]
595 async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
596 let input = req.input;
597
598 let upload_id = Uuid::parse_str(&input.upload_id).map_err(|_| s3_error!(InvalidRequest))?;
599 let part_number = input.part_number;
600 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
601 return Err(s3_error!(AccessDenied));
602 }
603
604 let (src_bucket, src_key) = match input.copy_source {
605 CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
606 CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
607 };
608 let src_path = self.get_object_path(src_bucket, src_key)?;
609 let dst_path = self.resolve_upload_part_path(upload_id, part_number)?;
610
611 let mut src_file = fs::File::open(&src_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
612 let file_len = try_!(src_file.metadata().await).len();
613
614 let (start, end) = if let Some(copy_range) = &input.copy_source_range {
615 if !copy_range.starts_with("bytes=") {
616 return Err(s3_error!(InvalidArgument));
617 }
618 let range = ©_range["bytes=".len()..];
619 let parts: Vec<&str> = range.split('-').collect();
620 if parts.len() != 2 {
621 return Err(s3_error!(InvalidArgument));
622 }
623
624 let start: u64 = parts[0].parse().map_err(|_| s3_error!(InvalidArgument))?;
625 let mut end = file_len - 1;
626 if parts[1].is_empty().not() {
627 end = parts[1].parse().map_err(|_| s3_error!(InvalidArgument))?;
628 }
629 (start, end)
630 } else {
631 (0, file_len - 1)
632 };
633
634 let content_length = end - start + 1;
635 let content_length_usize = try_!(usize::try_from(content_length));
636
637 let _ = try_!(src_file.seek(io::SeekFrom::Start(start)).await);
638 let body = StreamingBlob::wrap(bytes_stream(ReaderStream::with_capacity(src_file, 4096), content_length_usize));
639
640 let mut md5_hash = <Md5 as Digest>::new();
641 let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));
642
643 let mut file_writer = self.prepare_file_write(&dst_path).await?;
644 let size = copy_bytes(stream, file_writer.writer()).await?;
645 file_writer.done().await?;
646
647 let md5_sum = hex(md5_hash.finalize());
648
649 debug!(path = %dst_path.display(), ?size, %md5_sum, "write file");
650
651 let output = UploadPartCopyOutput {
652 copy_part_result: Some(CopyPartResult {
653 e_tag: Some(format!("\"{md5_sum}\"")),
654 ..Default::default()
655 }),
656 ..Default::default()
657 };
658
659 Ok(S3Response::new(output))
660 }
661
662 #[tracing::instrument]
663 async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
664 let ListPartsInput {
665 bucket, key, upload_id, ..
666 } = req.input;
667
668 let mut parts: Vec<Part> = Vec::new();
669 let mut iter = try_!(fs::read_dir(&self.root).await);
670
671 let prefix = format!(".upload_id-{upload_id}");
672
673 while let Some(entry) = try_!(iter.next_entry().await) {
674 let file_type = try_!(entry.file_type().await);
675 if file_type.is_file().not() {
676 continue;
677 }
678
679 let file_name = entry.file_name();
680 let Some(name) = file_name.to_str() else { continue };
681
682 let Some(part_segment) = name.strip_prefix(&prefix) else { continue };
683 let Some(part_number) = part_segment.strip_prefix(".part-") else { continue };
684 let part_number = part_number.parse::<i32>().unwrap();
685
686 let file_meta = try_!(entry.metadata().await);
687 let last_modified = Timestamp::from(try_!(file_meta.modified()));
688 let size = try_!(i64::try_from(file_meta.len()));
689
690 let part = Part {
691 last_modified: Some(last_modified),
692 part_number: Some(part_number),
693 size: Some(size),
694 ..Default::default()
695 };
696 parts.push(part);
697 }
698
699 let output = ListPartsOutput {
700 bucket: Some(bucket),
701 key: Some(key),
702 upload_id: Some(upload_id),
703 parts: Some(parts),
704 ..Default::default()
705 };
706 Ok(S3Response::new(output))
707 }
708
709 #[tracing::instrument]
710 async fn complete_multipart_upload(
711 &self,
712 req: S3Request<CompleteMultipartUploadInput>,
713 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
714 let CompleteMultipartUploadInput {
715 multipart_upload,
716 bucket,
717 key,
718 upload_id,
719 ..
720 } = req.input;
721
722 let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };
723
724 let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
725 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
726 return Err(s3_error!(AccessDenied));
727 }
728
729 self.delete_upload_id(&upload_id).await?;
730
731 if let Ok(Some(metadata)) = self.load_metadata(&bucket, &key, Some(upload_id)).await {
732 self.save_metadata(&bucket, &key, &metadata, None).await?;
733 let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
734 }
735
736 let object_path = self.get_object_path(&bucket, &key)?;
737 let mut file_writer = self.prepare_file_write(&object_path).await?;
738
739 let mut cnt: i32 = 0;
740 for part in multipart_upload.parts.into_iter().flatten() {
741 let part_number = part
742 .part_number
743 .ok_or_else(|| s3_error!(InvalidRequest, "missing part number"))?;
744 cnt += 1;
745 if part_number != cnt {
746 return Err(s3_error!(InvalidRequest, "invalid part order"));
747 }
748
749 let part_path = self.resolve_upload_part_path(upload_id, part_number)?;
750
751 let mut reader = try_!(fs::File::open(&part_path).await);
752 let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);
753
754 debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.dest_path().display(), ?size, "write file");
755 try_!(fs::remove_file(&part_path).await);
756 }
757 file_writer.done().await?;
758
759 let file_size = try_!(fs::metadata(&object_path).await).len();
760 let md5_sum = self.get_md5_sum(&bucket, &key).await?;
761
762 debug!(?md5_sum, path = %object_path.display(), size = ?file_size, "file md5 sum");
763
764 let output = CompleteMultipartUploadOutput {
765 bucket: Some(bucket),
766 key: Some(key),
767 e_tag: Some(format!("\"{md5_sum}\"")),
768 ..Default::default()
769 };
770 Ok(S3Response::new(output))
771 }
772
773 #[tracing::instrument]
774 async fn abort_multipart_upload(
775 &self,
776 req: S3Request<AbortMultipartUploadInput>,
777 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
778 let AbortMultipartUploadInput {
779 bucket, key, upload_id, ..
780 } = req.input;
781
782 let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
783 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
784 return Err(s3_error!(AccessDenied));
785 }
786
787 let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
788
789 let prefix = format!(".upload_id-{upload_id}");
790 let mut iter = try_!(fs::read_dir(&self.root).await);
791 while let Some(entry) = try_!(iter.next_entry().await) {
792 let file_type = try_!(entry.file_type().await);
793 if file_type.is_file().not() {
794 continue;
795 }
796
797 let file_name = entry.file_name();
798 let Some(name) = file_name.to_str() else { continue };
799
800 if name.starts_with(&prefix) {
801 try_!(fs::remove_file(entry.path()).await);
802 }
803 }
804
805 self.delete_upload_id(&upload_id).await?;
806
807 debug!(bucket = %bucket, key = %key, upload_id = %upload_id, "multipart upload aborted");
808
809 Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
810 }
811}