// s3s_fs/s3.rs

use crate::fs::FileSystem;
use crate::fs::InternalInfo;
use crate::utils::*;

use s3s::S3;
use s3s::S3Result;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};

use std::collections::VecDeque;
use std::io;
use std::ops::Neg;
use std::ops::Not;
use std::path::Component;
use std::path::{Path, PathBuf};

use tokio::fs;
use tokio::io::AsyncSeekExt;
use tokio_util::io::ReaderStream;

use futures::TryStreamExt;
use md5::{Digest, Md5};
use numeric_cast::NumericCast;
use stdx::default::default;
use tracing::debug;
use uuid::Uuid;

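/// Joins the `Normal` components of a relative path with `delimiter`, e.g.
/// `normalize_path(Path::new("a/b/c"), "/") == Some("a/b/c".to_owned())`.
/// Returns `None` for root/prefix/`.`/`..` components or non-UTF-8 names.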
fn normalize_path(path: &Path, delimiter: &str) -> Option<String> {
    let mut normalized = String::new();
    let mut first = true;
    for component in path.components() {
        match component {
            Component::RootDir | Component::CurDir | Component::ParentDir | Component::Prefix(_) => {
                return None;
            }
            Component::Normal(name) => {
                let name = name.to_str()?;
                if !first {
                    normalized.push_str(delimiter);
                }
                normalized.push_str(name);
                first = false;
            }
        }
    }
    Some(normalized)
}

/// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>
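/// e.g. `fmt_content_range(0, 499, 1000)` yields `"bytes 0-499/1000"`.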
fn fmt_content_range(start: u64, end_inclusive: u64, size: u64) -> String {
    format!("bytes {start}-{end_inclusive}/{size}")
}

#[async_trait::async_trait]
impl S3 for FileSystem {
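    /// Creates the bucket as a directory under the filesystem root;
    /// fails with `BucketAlreadyExists` if it is already present.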
    #[tracing::instrument]
    async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if path.exists() {
            return Err(s3_error!(BucketAlreadyExists));
        }

        try_!(fs::create_dir(&path).await);

        let output = CreateBucketOutput::default(); // TODO: handle other fields
        Ok(S3Response::new(output))
    }

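    /// Copies an object between bucket directories, along with its sidecar
    /// metadata file if one exists. Access-point copy sources are not supported.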
    #[tracing::instrument]
    async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
        let input = req.input;
        let (bucket, key) = match input.copy_source {
            CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
        };

        let src_path = self.get_object_path(bucket, key)?;
        let dst_path = self.get_object_path(&input.bucket, &input.key)?;

        if src_path.exists().not() {
            return Err(s3_error!(NoSuchKey));
        }

        if self.get_bucket_path(&input.bucket)?.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        if let Some(dir_path) = dst_path.parent() {
            try_!(fs::create_dir_all(&dir_path).await);
        }

        let file_metadata = try_!(fs::metadata(&src_path).await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));

        let _ = try_!(fs::copy(&src_path, &dst_path).await);

        debug!(from = %src_path.display(), to = %dst_path.display(), "copy file");

        let src_metadata_path = self.get_metadata_path(bucket, key, None)?;
        if src_metadata_path.exists() {
            let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key, None)?;
            let _ = try_!(fs::copy(src_metadata_path, dst_metadata_path).await);
        }

        let md5_sum = self.get_md5_sum(bucket, key).await?;

        let copy_object_result = CopyObjectResult {
            e_tag: Some(format!("\"{md5_sum}\"")),
            last_modified: Some(last_modified),
            ..Default::default()
        };

        let output = CopyObjectOutput {
            copy_object_result: Some(copy_object_result),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

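    /// Removes the bucket directory and all of its contents;
    /// fails with `NoSuchBucket` if the directory does not exist.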
    #[tracing::instrument]
    async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;
        if path.exists() {
            try_!(fs::remove_dir_all(path).await);
        } else {
            return Err(s3_error!(NoSuchBucket));
        }
        Ok(S3Response::new(DeleteBucketOutput {}))
    }

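    /// Deletes a file object; a directory object (key ending in `/`) is
    /// removed only when the directory is empty.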
    #[tracing::instrument]
    async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
        let input = req.input;
        let path = self.get_object_path(&input.bucket, &input.key)?;
        if path.exists().not() {
            return Err(s3_error!(NoSuchKey));
        }
        if input.key.ends_with('/') {
            let mut dir = try_!(fs::read_dir(&path).await);
            let is_empty = try_!(dir.next_entry().await).is_none();
            if is_empty {
                try_!(fs::remove_dir(&path).await);
            }
        } else {
            try_!(fs::remove_file(&path).await);
        }
        let output = DeleteObjectOutput::default(); // TODO: handle other fields
        Ok(S3Response::new(output))
    }

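    /// Batch delete: resolves every key first, then removes the paths that
    /// exist and reports each of them as deleted.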
    #[tracing::instrument]
    async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
        let input = req.input;
        let mut objects: Vec<(PathBuf, String)> = Vec::new();
        for object in input.delete.objects {
            let path = self.get_object_path(&input.bucket, &object.key)?;
            if path.exists() {
                objects.push((path, object.key));
            }
        }

        let mut deleted_objects: Vec<DeletedObject> = Vec::new();
        for (path, key) in objects {
            try_!(fs::remove_file(path).await);

            let deleted_object = DeletedObject {
                key: Some(key),
                ..Default::default()
            };

            deleted_objects.push(deleted_object);
        }

        let output = DeleteObjectsOutput {
            deleted: Some(deleted_objects),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

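    /// Only verifies that the bucket exists; the location constraint in the
    /// response is left at its default.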
    #[tracing::instrument]
    async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchBucket));
        }

        let output = GetBucketLocationOutput::default();
        Ok(S3Response::new(output))
    }

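    /// Streams an object from disk, honoring an optional byte range, and
    /// attaches user metadata, the MD5-based ETag, and any stored checksums.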
    #[tracing::instrument]
    async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
        let input = req.input;
        let object_path = self.get_object_path(&input.bucket, &input.key)?;

        let mut file = fs::File::open(&object_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;

        let file_metadata = try_!(file.metadata().await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let (content_length, content_range) = match input.range {
            None => (file_len, None),
            Some(range) => {
                let file_range = range.check(file_len)?;
                let content_length = file_range.end - file_range.start;
                let content_range = fmt_content_range(file_range.start, file_range.end - 1, file_len);
                (content_length, Some(content_range))
            }
        };
        let content_length_usize = try_!(usize::try_from(content_length));
        let content_length_i64 = try_!(i64::try_from(content_length));

        match input.range {
            Some(Range::Int { first, .. }) => {
                try_!(file.seek(io::SeekFrom::Start(first)).await);
            }
            Some(Range::Suffix { length }) => {
                let neg_offset = length.numeric_cast::<i64>().neg();
                try_!(file.seek(io::SeekFrom::End(neg_offset)).await);
            }
            None => {}
        }

        let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);

        let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;

        let md5_sum = self.get_md5_sum(&input.bucket, &input.key).await?;
        let e_tag = format!("\"{md5_sum}\"");

        let info = self.load_internal_info(&input.bucket, &input.key).await?;
        let checksum = match &info {
            Some(info) => crate::checksum::from_internal_info(info),
            None => default(),
        };

        let output = GetObjectOutput {
            body: Some(StreamingBlob::wrap(body)),
            content_length: Some(content_length_i64),
            content_range,
            last_modified: Some(last_modified),
            metadata: object_metadata,
            e_tag: Some(e_tag),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

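    /// Succeeds exactly when the bucket directory exists.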
    #[tracing::instrument]
    async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchBucket));
        }

        Ok(S3Response::new(HeadBucketOutput::default()))
    }

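    /// Returns object size, last-modified time, and user metadata without a
    /// body. The content type is currently a fixed `application/octet-stream`.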
    #[tracing::instrument]
    async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
        let input = req.input;
        let path = self.get_object_path(&input.bucket, &input.key)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchKey));
        }

        let file_metadata = try_!(fs::metadata(path).await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;

        // TODO: detect content type
        let content_type = mime::APPLICATION_OCTET_STREAM;

        let output = HeadObjectOutput {
            content_length: Some(try_!(i64::try_from(file_len))),
            content_type: Some(content_type),
            last_modified: Some(last_modified),
            metadata: object_metadata,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

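    /// Lists top-level directories whose names are valid bucket names. The
    /// creation date falls back to the modified time on filesystems that do
    /// not record a created timestamp.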
    #[tracing::instrument]
    async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
        let mut buckets: Vec<Bucket> = Vec::new();
        let mut iter = try_!(fs::read_dir(&self.root).await);
        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_dir().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };
            if s3s::path::check_bucket_name(name).not() {
                continue;
            }

            let file_meta = try_!(entry.metadata().await);
            // Not all filesystems/mounts provide all file attributes like the created timestamp,
            // so we fall back to the modified time when possible.
            // See https://github.com/Nugine/s3s/pull/22 for more details.
            let created_or_modified_date = Timestamp::from(try_!(file_meta.created().or(file_meta.modified())));

            let bucket = Bucket {
                creation_date: Some(created_or_modified_date),
                name: Some(name.to_owned()),
                bucket_region: None,
            };
            buckets.push(bucket);
        }

        let output = ListBucketsOutput {
            buckets: Some(buckets),
            owner: None,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

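    /// ListObjects (v1) delegates to `list_objects_v2` and reshapes the output.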
    #[tracing::instrument]
    async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
        let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;

        Ok(v2_resp.map_output(|v2| ListObjectsOutput {
            contents: v2.contents,
            delimiter: v2.delimiter,
            encoding_type: v2.encoding_type,
            name: v2.name,
            prefix: v2.prefix,
            max_keys: v2.max_keys,
            ..Default::default()
        }))
    }

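    /// Walks the bucket directory breadth-first, applies the prefix filter and
    /// the `start-after` marker, and returns keys in lexicographic order.
    /// Note that `delimiter` only joins path components here; no common-prefix
    /// rollup is performed.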
    #[tracing::instrument]
    async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if path.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        let mut objects: Vec<Object> = default();
        let mut dir_queue: VecDeque<PathBuf> = default();
        dir_queue.push_back(path.clone());

        while let Some(dir) = dir_queue.pop_front() {
            let mut iter = try_!(fs::read_dir(dir).await);
            while let Some(entry) = try_!(iter.next_entry().await) {
                let file_type = try_!(entry.file_type().await);
                if file_type.is_dir() {
                    dir_queue.push_back(entry.path());
                } else {
                    let file_path = entry.path();
                    let key = try_!(file_path.strip_prefix(&path));
                    let delimiter = input.delimiter.as_ref().map_or("/", |d| d.as_str());
                    let Some(key_str) = normalize_path(key, delimiter) else {
                        continue;
                    };

                    if let Some(ref prefix) = input.prefix {
                        let prefix_path: PathBuf = prefix.split(delimiter).collect();

                        let key_s = format!("{}", key.display());
                        let prefix_path_s = format!("{}", prefix_path.display());

                        if !key_s.starts_with(&prefix_path_s) {
                            continue;
                        }
                    }

                    let metadata = try_!(entry.metadata().await);
                    let last_modified = Timestamp::from(try_!(metadata.modified()));
                    let size = metadata.len();

                    let object = Object {
                        key: Some(key_str),
                        last_modified: Some(last_modified),
                        size: Some(try_!(i64::try_from(size))),
                        ..Default::default()
                    };
                    objects.push(object);
                }
            }
        }

        objects.sort_by(|lhs, rhs| {
            let lhs_key = lhs.key.as_deref().unwrap_or("");
            let rhs_key = rhs.key.as_deref().unwrap_or("");
            lhs_key.cmp(rhs_key)
        });

        let objects = if let Some(marker) = &input.start_after {
            objects
                .into_iter()
                .skip_while(|n| n.key.as_deref().unwrap_or("") <= marker.as_str())
                .collect()
        } else {
            objects
        };

        let key_count = try_!(i32::try_from(objects.len()));

        let output = ListObjectsV2Output {
            key_count: Some(key_count),
            max_keys: Some(key_count),
            contents: Some(objects),
            delimiter: input.delimiter,
            encoding_type: input.encoding_type,
            name: Some(input.bucket),
            prefix: input.prefix,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

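    /// Writes the request body to the object path, verifying any
    /// client-supplied checksums and recording them in the internal info
    /// sidecar. A key ending in `/` with an empty body creates a directory.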
    #[tracing::instrument]
    async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
        let input = req.input;
        if let Some(ref storage_class) = input.storage_class {
            let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
            if !is_valid {
                return Err(s3_error!(InvalidStorageClass));
            }
        }

        let PutObjectInput {
            body,
            bucket,
            key,
            metadata,
            content_length,
            ..
        } = input;

        let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };

        let mut checksum: s3s::checksum::ChecksumHasher = default();
        if input.checksum_crc32.is_some() {
            checksum.crc32 = Some(default());
        }
        if input.checksum_crc32c.is_some() {
            checksum.crc32c = Some(default());
        }
        if input.checksum_sha1.is_some() {
            checksum.sha1 = Some(default());
        }
        if input.checksum_sha256.is_some() {
            checksum.sha256 = Some(default());
        }

        if key.ends_with('/') {
            if let Some(len) = content_length {
                if len > 0 {
                    return Err(s3_error!(UnexpectedContent, "Unexpected request body when creating a directory object."));
                }
            }
            let object_path = self.get_object_path(&bucket, &key)?;
            try_!(fs::create_dir_all(&object_path).await);
            let output = PutObjectOutput::default();
            return Ok(S3Response::new(output));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        let mut file_writer = self.prepare_file_write(&object_path).await?;

        let mut md5_hash = <Md5 as Digest>::new();
        let stream = body.inspect_ok(|bytes| {
            md5_hash.update(bytes.as_ref());
            checksum.update(bytes.as_ref());
        });

        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        let checksum = checksum.finalize();
        if checksum.checksum_crc32 != input.checksum_crc32 {
            return Err(s3_error!(BadDigest, "checksum_crc32 mismatch"));
        }
        if checksum.checksum_crc32c != input.checksum_crc32c {
            return Err(s3_error!(BadDigest, "checksum_crc32c mismatch"));
        }
        if checksum.checksum_sha1 != input.checksum_sha1 {
            return Err(s3_error!(BadDigest, "checksum_sha1 mismatch"));
        }
        if checksum.checksum_sha256 != input.checksum_sha256 {
            return Err(s3_error!(BadDigest, "checksum_sha256 mismatch"));
        }

        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");

        if let Some(ref metadata) = metadata {
            self.save_metadata(&bucket, &key, metadata, None).await?;
        }

        let mut info: InternalInfo = default();
        crate::checksum::modify_internal_info(&mut info, &checksum);
        self.save_internal_info(&bucket, &key, &info).await?;

        let e_tag = format!("\"{md5_sum}\"");

        let output = PutObjectOutput {
            e_tag: Some(e_tag),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

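    /// Allocates a fresh upload id tied to the caller's credentials and
    /// stashes any user metadata under that id.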
    #[tracing::instrument]
    async fn create_multipart_upload(
        &self,
        req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        let input = req.input;
        let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;

        if let Some(ref metadata) = input.metadata {
            self.save_metadata(&input.bucket, &input.key, metadata, Some(upload_id))
                .await?;
        }

        let output = CreateMultipartUploadOutput {
            bucket: Some(input.bucket),
            key: Some(input.key),
            upload_id: Some(upload_id.to_string()),
            ..Default::default()
        };

        Ok(S3Response::new(output))
    }

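    /// Validates the upload id against the caller's credentials and writes
    /// one part to its own file derived from the upload id and part number.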
    #[tracing::instrument]
    async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
        let UploadPartInput {
            body,
            upload_id,
            part_number,
            ..
        } = req.input;

        let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let file_path = self.resolve_upload_part_path(upload_id, part_number)?;

        let mut md5_hash = <Md5 as Digest>::new();
        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

        let mut file_writer = self.prepare_file_write(&file_path).await?;
        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        debug!(path = %file_path.display(), ?size, %md5_sum, "write file");

        let output = UploadPartOutput {
            e_tag: Some(format!("\"{md5_sum}\"")),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

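    /// Copies a byte range (`bytes=start-end`) of an existing object into a
    /// part file of the given upload.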
    #[tracing::instrument]
    async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
        let input = req.input;

        let upload_id = Uuid::parse_str(&input.upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        let part_number = input.part_number;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let (src_bucket, src_key) = match input.copy_source {
            CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
        };
        let src_path = self.get_object_path(src_bucket, src_key)?;
        let dst_path = self.resolve_upload_part_path(upload_id, part_number)?;

        let mut src_file = fs::File::open(&src_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
        let file_len = try_!(src_file.metadata().await).len();

        let (start, end) = if let Some(copy_range) = &input.copy_source_range {
            if !copy_range.starts_with("bytes=") {
                return Err(s3_error!(InvalidArgument));
            }
            let range = &copy_range["bytes=".len()..];
            let parts: Vec<&str> = range.split('-').collect();
            if parts.len() != 2 {
                return Err(s3_error!(InvalidArgument));
            }

            let start: u64 = parts[0].parse().map_err(|_| s3_error!(InvalidArgument))?;
            let mut end = file_len - 1;
            if parts[1].is_empty().not() {
                end = parts[1].parse().map_err(|_| s3_error!(InvalidArgument))?;
            }
            (start, end)
        } else {
            (0, file_len - 1)
        };

        let content_length = end - start + 1;
        let content_length_usize = try_!(usize::try_from(content_length));

        let _ = try_!(src_file.seek(io::SeekFrom::Start(start)).await);
        let body = StreamingBlob::wrap(bytes_stream(ReaderStream::with_capacity(src_file, 4096), content_length_usize));

        let mut md5_hash = <Md5 as Digest>::new();
        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

        let mut file_writer = self.prepare_file_write(&dst_path).await?;
        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        debug!(path = %dst_path.display(), ?size, %md5_sum, "write file");

        let output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: Some(format!("\"{md5_sum}\"")),
                ..Default::default()
            }),
            ..Default::default()
        };

        Ok(S3Response::new(output))
    }

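    /// Scans the root directory for part files belonging to `upload_id`
    /// (named `.upload_id-{upload_id}.part-{n}`).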
    #[tracing::instrument]
    async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
        let ListPartsInput {
            bucket, key, upload_id, ..
        } = req.input;

        let mut parts: Vec<Part> = Vec::new();
        let mut iter = try_!(fs::read_dir(&self.root).await);

        let prefix = format!(".upload_id-{upload_id}");

        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_file().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };

            let Some(part_segment) = name.strip_prefix(&prefix) else { continue };
            let Some(part_number) = part_segment.strip_prefix(".part-") else { continue };
            // Skip files whose suffix is not a valid part number instead of panicking.
            let Ok(part_number) = part_number.parse::<i32>() else { continue };

            let file_meta = try_!(entry.metadata().await);
            let last_modified = Timestamp::from(try_!(file_meta.modified()));
            let size = try_!(i64::try_from(file_meta.len()));

            let part = Part {
                last_modified: Some(last_modified),
                part_number: Some(part_number),
                size: Some(size),
                ..Default::default()
            };
            parts.push(part);
        }

        let output = ListPartsOutput {
            bucket: Some(bucket),
            key: Some(key),
            upload_id: Some(upload_id),
            parts: Some(parts),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

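    /// Concatenates the uploaded parts, which must be numbered contiguously
    /// from 1, into the final object, deleting each part file as it is consumed.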
    #[tracing::instrument]
    async fn complete_multipart_upload(
        &self,
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let CompleteMultipartUploadInput {
            multipart_upload,
            bucket,
            key,
            upload_id,
            ..
        } = req.input;

        let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        self.delete_upload_id(&upload_id).await?;

        if let Ok(Some(metadata)) = self.load_metadata(&bucket, &key, Some(upload_id)).await {
            self.save_metadata(&bucket, &key, &metadata, None).await?;
            let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        let mut file_writer = self.prepare_file_write(&object_path).await?;

        let mut cnt: i32 = 0;
        for part in multipart_upload.parts.into_iter().flatten() {
            let part_number = part
                .part_number
                .ok_or_else(|| s3_error!(InvalidRequest, "missing part number"))?;
            cnt += 1;
            if part_number != cnt {
                return Err(s3_error!(InvalidRequest, "invalid part order"));
            }

            let part_path = self.resolve_upload_part_path(upload_id, part_number)?;

            let mut reader = try_!(fs::File::open(&part_path).await);
            let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);

            debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.dest_path().display(), ?size, "write file");
            try_!(fs::remove_file(&part_path).await);
        }
        file_writer.done().await?;

        let file_size = try_!(fs::metadata(&object_path).await).len();
        let md5_sum = self.get_md5_sum(&bucket, &key).await?;

        debug!(?md5_sum, path = %object_path.display(), size = ?file_size, "file md5 sum");

        let output = CompleteMultipartUploadOutput {
            bucket: Some(bucket),
            key: Some(key),
            e_tag: Some(format!("\"{md5_sum}\"")),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

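    /// Invalidates the upload id and removes its stored metadata and any
    /// remaining part files.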
    #[tracing::instrument]
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        let AbortMultipartUploadInput {
            bucket, key, upload_id, ..
        } = req.input;

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let _ = self.delete_metadata(&bucket, &key, Some(upload_id));

        let prefix = format!(".upload_id-{upload_id}");
        let mut iter = try_!(fs::read_dir(&self.root).await);
        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_file().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };

            if name.starts_with(&prefix) {
                try_!(fs::remove_file(entry.path()).await);
            }
        }

        self.delete_upload_id(&upload_id).await?;

        debug!(bucket = %bucket, key = %key, upload_id = %upload_id, "multipart upload aborted");

        Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
    }
}
811}