// s3s_fs/s3.rs

use crate::fs::FileSystem;
use crate::fs::InternalInfo;
use crate::utils::*;

use s3s::S3;
use s3s::S3Result;
use s3s::crypto::Checksum;
use s3s::crypto::Md5;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};

use std::collections::VecDeque;
use std::io;
use std::ops::Neg;
use std::ops::Not;
use std::path::Component;
use std::path::{Path, PathBuf};

use tokio::fs;
use tokio::io::AsyncSeekExt;
use tokio_util::io::ReaderStream;

use futures::TryStreamExt;
use numeric_cast::NumericCast;
use stdx::default::default;
use tracing::debug;
use uuid::Uuid;

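/// Converts a filesystem path into an S3 key by joining its `Normal`
/// components with `delimiter`. Returns `None` for paths that contain root,
/// prefix, `.` or `..` components, or non-UTF-8 names.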
fn normalize_path(path: &Path, delimiter: &str) -> Option<String> {
    let mut normalized = String::new();
    let mut first = true;
    for component in path.components() {
        match component {
            Component::RootDir | Component::CurDir | Component::ParentDir | Component::Prefix(_) => {
                return None;
            }
            Component::Normal(name) => {
                let name = name.to_str()?;
                if !first {
                    normalized.push_str(delimiter);
                }
                normalized.push_str(name);
                first = false;
            }
        }
    }
    Some(normalized)
}

/// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>
fn fmt_content_range(start: u64, end_inclusive: u64, size: u64) -> String {
    format!("bytes {start}-{end_inclusive}/{size}")
}

#[async_trait::async_trait]
impl S3 for FileSystem {
    #[tracing::instrument]
    async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if path.exists() {
            return Err(s3_error!(BucketAlreadyExists));
        }

        try_!(fs::create_dir(&path).await);

        let output = CreateBucketOutput::default(); // TODO: handle other fields
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
        let input = req.input;
        let (bucket, key) = match input.copy_source {
            CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
        };

        let src_path = self.get_object_path(bucket, key)?;
        let dst_path = self.get_object_path(&input.bucket, &input.key)?;

        if src_path.exists().not() {
            return Err(s3_error!(NoSuchKey));
        }

        if self.get_bucket_path(&input.bucket)?.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        if let Some(dir_path) = dst_path.parent() {
            try_!(fs::create_dir_all(&dir_path).await);
        }

        let file_metadata = try_!(fs::metadata(&src_path).await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));

        let _ = try_!(fs::copy(&src_path, &dst_path).await);

        debug!(from = %src_path.display(), to = %dst_path.display(), "copy file");

        // Copy the sidecar metadata file along with the object, if one exists.
        let src_metadata_path = self.get_metadata_path(bucket, key, None)?;
        if src_metadata_path.exists() {
            let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key, None)?;
            let _ = try_!(fs::copy(src_metadata_path, dst_metadata_path).await);
        }

        let md5_sum = self.get_md5_sum(bucket, key).await?;

        let copy_object_result = CopyObjectResult {
            e_tag: Some(ETag::Strong(md5_sum)),
            last_modified: Some(last_modified),
            ..Default::default()
        };

        let output = CopyObjectOutput {
            copy_object_result: Some(copy_object_result),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;
        if path.exists() {
            try_!(fs::remove_dir_all(path).await);
        } else {
            return Err(s3_error!(NoSuchBucket));
        }
        Ok(S3Response::new(DeleteBucketOutput {}))
    }

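    /// Deletes a single object. A key ending in `/` denotes a directory
    /// object, which is removed only if the directory is empty.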
    #[tracing::instrument]
    async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
        let input = req.input;
        let path = self.get_object_path(&input.bucket, &input.key)?;
        if path.exists().not() {
            return Err(s3_error!(NoSuchKey));
        }
        if input.key.ends_with('/') {
            let mut dir = try_!(fs::read_dir(&path).await);
            let is_empty = try_!(dir.next_entry().await).is_none();
            if is_empty {
                try_!(fs::remove_dir(&path).await);
            }
        } else {
            try_!(fs::remove_file(&path).await);
        }
        let output = DeleteObjectOutput::default(); // TODO: handle other fields
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
        let input = req.input;
        let mut objects: Vec<(PathBuf, String)> = Vec::new();
        for object in input.delete.objects {
            let path = self.get_object_path(&input.bucket, &object.key)?;
            if path.exists() {
                objects.push((path, object.key));
            }
        }

        let mut deleted_objects: Vec<DeletedObject> = Vec::new();
        for (path, key) in objects {
            try_!(fs::remove_file(path).await);

            let deleted_object = DeletedObject {
                key: Some(key),
                ..Default::default()
            };

            deleted_objects.push(deleted_object);
        }

        let output = DeleteObjectsOutput {
            deleted: Some(deleted_objects),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchBucket));
        }

        let output = GetBucketLocationOutput::default();
        Ok(S3Response::new(output))
    }

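    /// Streams an object back to the client, honoring an optional byte range.
    /// Stored checksums are returned only for whole-object reads, matching
    /// S3's behavior for ranged GETs.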
    #[tracing::instrument]
    async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
        let input = req.input;
        let object_path = self.get_object_path(&input.bucket, &input.key)?;

        let mut file = fs::File::open(&object_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;

        let file_metadata = try_!(file.metadata().await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let (content_length, content_range) = match input.range {
            None => (file_len, None),
            Some(range) => {
                let file_range = range.check(file_len)?;
                let content_length = file_range.end - file_range.start;
                let content_range = fmt_content_range(file_range.start, file_range.end - 1, file_len);
                (content_length, Some(content_range))
            }
        };
        let content_length_usize = try_!(usize::try_from(content_length));
        let content_length_i64 = try_!(i64::try_from(content_length));

        match input.range {
            Some(Range::Int { first, .. }) => {
                try_!(file.seek(io::SeekFrom::Start(first)).await);
            }
            Some(Range::Suffix { length }) => {
                let neg_offset = length.numeric_cast::<i64>().neg();
                try_!(file.seek(io::SeekFrom::End(neg_offset)).await);
            }
            None => {}
        }

        let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);

        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;

        let md5_sum = self.get_md5_sum(&input.bucket, &input.key).await?;

        let info = self.load_internal_info(&input.bucket, &input.key).await?;
        let checksum = match &info {
            // S3 omits the checksum when the requested range covers less than
            // the whole object
            Some(info) if content_length == file_len => crate::checksum::from_internal_info(info),
            _ => default(),
        };

        #[allow(clippy::redundant_closure_for_method_calls)]
        let output = GetObjectOutput {
            body: Some(StreamingBlob::wrap(body)),
            content_length: Some(content_length_i64),
            content_range,
            last_modified: Some(last_modified),
            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
            e_tag: Some(ETag::Strong(md5_sum)),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            checksum_crc64nvme: checksum.checksum_crc64nvme,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchBucket));
        }

        Ok(S3Response::new(HeadBucketOutput::default()))
    }

    #[tracing::instrument]
    async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
        let input = req.input;
        let path = self.get_object_path(&input.bucket, &input.key)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchKey));
        }

        let file_metadata = try_!(fs::metadata(path).await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;

        #[allow(clippy::redundant_closure_for_method_calls)]
        let output = HeadObjectOutput {
            content_length: Some(try_!(i64::try_from(file_len))),
            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
            last_modified: Some(last_modified),
            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
        let mut buckets: Vec<Bucket> = Vec::new();
        let mut iter = try_!(fs::read_dir(&self.root).await);
        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_dir().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };
            if s3s::path::check_bucket_name(name).not() {
                continue;
            }

            let file_meta = try_!(entry.metadata().await);
            // Not all filesystems/mounts provide all file attributes, such as the
            // created timestamp, so we fall back to the modified timestamp when possible.
            // See https://github.com/Nugine/s3s/pull/22 for more details.
            let created_or_modified_date = Timestamp::from(try_!(file_meta.created().or(file_meta.modified())));

            let bucket = Bucket {
                creation_date: Some(created_or_modified_date),
                name: Some(name.to_owned()),
                bucket_region: None,
            };
            buckets.push(bucket);
        }

        let output = ListBucketsOutput {
            buckets: Some(buckets),
            owner: None,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
        let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;

        Ok(v2_resp.map_output(|v2| ListObjectsOutput {
            contents: v2.contents,
            common_prefixes: v2.common_prefixes,
            delimiter: v2.delimiter,
            encoding_type: v2.encoding_type,
            name: v2.name,
            prefix: v2.prefix,
            max_keys: v2.max_keys,
            is_truncated: v2.is_truncated,
            ..Default::default()
        }))
    }

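    /// Lists objects, optionally grouping keys by a delimiter. Objects and
    /// common prefixes are collected, sorted, and then merged in
    /// lexicographic order until `max_keys` entries have been produced.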
    #[tracing::instrument]
    async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if path.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        let delimiter = input.delimiter.as_deref();
        let prefix = input.prefix.as_deref().unwrap_or("").trim_start_matches('/');
        let max_keys = input.max_keys.unwrap_or(1000);

        // Collect all matching objects and common prefixes
        let mut objects: Vec<Object> = default();
        let mut common_prefixes = std::collections::BTreeSet::new();

        if let Some(delimiter) = delimiter {
            self.list_objects_with_delimiter(&path, prefix, delimiter, &mut objects, &mut common_prefixes)
                .await?;
        } else {
            self.list_objects_recursive(&path, prefix, &mut objects).await?;
        }

        // Sort before filtering and limiting
        objects.sort_by(|lhs, rhs| {
            let lhs_key = lhs.key.as_deref().unwrap_or("");
            let rhs_key = rhs.key.as_deref().unwrap_or("");
            lhs_key.cmp(rhs_key)
        });

        // Apply start_after filter if provided
        if let Some(marker) = &input.start_after {
            objects.retain(|obj| obj.key.as_deref().unwrap_or("") > marker.as_str());
        }

        // Convert common_prefixes to sorted list
        let common_prefixes_list: Vec<CommonPrefix> = common_prefixes
            .into_iter()
            .map(|prefix| CommonPrefix { prefix: Some(prefix) })
            .collect();

        // Limit results to max_keys by interleaving objects and common_prefixes
        let mut result_objects = Vec::new();
        let mut result_prefixes = Vec::new();
        let mut total_count = 0;
        let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);

        let mut obj_idx = 0;
        let mut prefix_idx = 0;

        while total_count < max_keys_usize {
            let obj_key = objects.get(obj_idx).and_then(|o| o.key.as_deref());
            let prefix_key = common_prefixes_list.get(prefix_idx).and_then(|p| p.prefix.as_deref());

            match (obj_key, prefix_key) {
                (Some(ok), Some(pk)) => {
                    if ok < pk {
                        result_objects.push(objects[obj_idx].clone());
                        obj_idx += 1;
                    } else {
                        result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                        prefix_idx += 1;
                    }
                    total_count += 1;
                }
                (Some(_), None) => {
                    result_objects.push(objects[obj_idx].clone());
                    obj_idx += 1;
                    total_count += 1;
                }
                (None, Some(_)) => {
                    result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                    prefix_idx += 1;
                    total_count += 1;
                }
                (None, None) => break,
            }
        }

        let is_truncated = obj_idx < objects.len() || prefix_idx < common_prefixes_list.len();
        let key_count = try_!(i32::try_from(total_count));

        let contents = result_objects.is_empty().not().then_some(result_objects);
        let common_prefixes = result_prefixes.is_empty().not().then_some(result_prefixes);

        let output = ListObjectsV2Output {
            key_count: Some(key_count),
            max_keys: Some(max_keys),
            is_truncated: Some(is_truncated),
            contents,
            common_prefixes,
            delimiter: input.delimiter,
            encoding_type: input.encoding_type,
            name: Some(input.bucket),
            prefix: input.prefix,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

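    /// Writes an object, computing the MD5 ETag and any requested checksums
    /// while the body streams to disk. Object attributes and checksum state
    /// are persisted as sidecar metadata.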
    #[tracing::instrument]
    async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
        use crate::fs::ObjectAttributes;

        let input = req.input;
        if let Some(ref storage_class) = input.storage_class {
            let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
            if !is_valid {
                return Err(s3_error!(InvalidStorageClass));
            }
        }

        let PutObjectInput {
            body,
            bucket,
            key,
            metadata,
            content_length,
            content_md5,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires,
            website_redirect_location,
            if_none_match,
            checksum_algorithm,
            mut checksum_crc32,
            mut checksum_crc32c,
            mut checksum_sha1,
            mut checksum_sha256,
            mut checksum_crc64nvme,
            ..
        } = input;

        let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };

        // Check If-None-Match condition
        // If-None-Match: * means "only create if the object doesn't exist"
        if let Some(ref condition) = if_none_match {
            if condition.is_any() {
                let object_path = self.get_object_path(&bucket, &key)?;
                if object_path.exists() {
                    return Err(s3_error!(PreconditionFailed, "Object already exists"));
                }
            }
        }

        let mut checksum: s3s::checksum::ChecksumHasher = default();
        if checksum_crc32.is_some() {
            checksum.crc32 = Some(default());
        }
        if checksum_crc32c.is_some() {
            checksum.crc32c = Some(default());
        }
        if checksum_sha1.is_some() {
            checksum.sha1 = Some(default());
        }
        if checksum_sha256.is_some() {
            checksum.sha256 = Some(default());
        }
        if checksum_crc64nvme.is_some() {
            checksum.crc64nvme = Some(default());
        }
        if let Some(alg) = checksum_algorithm {
            match alg.as_str() {
                ChecksumAlgorithm::CRC32 => checksum.crc32 = Some(default()),
                ChecksumAlgorithm::CRC32C => checksum.crc32c = Some(default()),
                ChecksumAlgorithm::SHA1 => checksum.sha1 = Some(default()),
                ChecksumAlgorithm::SHA256 => checksum.sha256 = Some(default()),
                ChecksumAlgorithm::CRC64NVME => checksum.crc64nvme = Some(default()),
                _ => return Err(s3_error!(NotImplemented, "Unsupported checksum algorithm")),
            }
        }

        if key.ends_with('/') {
            if let Some(len) = content_length {
                if len > 0 {
                    return Err(s3_error!(UnexpectedContent, "Unexpected request body when creating a directory object."));
                }
            }
            let object_path = self.get_object_path(&bucket, &key)?;
            try_!(fs::create_dir_all(&object_path).await);
            let output = PutObjectOutput::default();
            return Ok(S3Response::new(output));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        let mut file_writer = self.prepare_file_write(&object_path).await?;

        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| {
            md5_hash.update(bytes.as_ref());
            checksum.update(bytes.as_ref());
        });

        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        if let Some(content_md5) = content_md5 {
            let content_md5 = base64_simd::STANDARD
                .decode_to_vec(content_md5)
                .map_err(|_| s3_error!(InvalidArgument))?;
            let content_md5 = hex(content_md5);
            if content_md5 != md5_sum {
                return Err(s3_error!(BadDigest, "content_md5 mismatch"));
            }
        }

        let checksum = checksum.finalize();

        // For streaming uploads, checksum values may arrive as HTTP trailers;
        // they take precedence over any values sent in the headers.
        if let Some(trailers) = req.trailing_headers {
            if let Some(trailers) = trailers.take() {
                if let Some(crc32) = trailers.get("x-amz-checksum-crc32") {
                    checksum_crc32 = Some(crc32.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(crc32c) = trailers.get("x-amz-checksum-crc32c") {
                    checksum_crc32c = Some(crc32c.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(sha1) = trailers.get("x-amz-checksum-sha1") {
                    checksum_sha1 = Some(sha1.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(sha256) = trailers.get("x-amz-checksum-sha256") {
                    checksum_sha256 = Some(sha256.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(crc64nvme) = trailers.get("x-amz-checksum-crc64nvme") {
                    checksum_crc64nvme = Some(crc64nvme.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
            }
        }

        if checksum.checksum_crc32 != checksum_crc32 {
            return Err(s3_error!(
                BadDigest,
                "checksum_crc32 mismatch: expected `{}`, got `{}`",
                checksum_crc32.unwrap_or_default(),
                checksum.checksum_crc32.unwrap_or_default()
            ));
        }
        if checksum.checksum_crc32c != checksum_crc32c {
            return Err(s3_error!(BadDigest, "checksum_crc32c mismatch"));
        }
        if checksum.checksum_sha1 != checksum_sha1 {
            return Err(s3_error!(BadDigest, "checksum_sha1 mismatch"));
        }
        if checksum.checksum_sha256 != checksum_sha256 {
            return Err(s3_error!(BadDigest, "checksum_sha256 mismatch"));
        }
        if checksum.checksum_crc64nvme != checksum_crc64nvme {
            return Err(s3_error!(BadDigest, "checksum_crc64nvme mismatch"));
        }

        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");

        // Save object attributes (including user metadata and standard attributes)
        let mut obj_attrs = ObjectAttributes {
            user_metadata: metadata,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires: None,
            website_redirect_location,
        };
        obj_attrs.set_expires_timestamp(expires);
        self.save_object_attributes(&bucket, &key, &obj_attrs, None).await?;

        let mut info: InternalInfo = default();
        crate::checksum::modify_internal_info(&mut info, &checksum);
        self.save_internal_info(&bucket, &key, &info).await?;

        let output = PutObjectOutput {
            e_tag: Some(ETag::Strong(md5_sum)),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            checksum_crc64nvme: checksum.checksum_crc64nvme,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn create_multipart_upload(
        &self,
        req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        use crate::fs::ObjectAttributes;

        let input = req.input;
        let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;

        // Save object attributes (including user metadata and standard attributes)
        let mut obj_attrs = ObjectAttributes {
            user_metadata: input.metadata,
            content_encoding: input.content_encoding,
            content_type: input.content_type,
            content_disposition: input.content_disposition,
            content_language: input.content_language,
            cache_control: input.cache_control,
            expires: None,
            website_redirect_location: input.website_redirect_location,
        };
        obj_attrs.set_expires_timestamp(input.expires);
        self.save_object_attributes(&input.bucket, &input.key, &obj_attrs, Some(upload_id))
            .await?;

        let output = CreateMultipartUploadOutput {
            bucket: Some(input.bucket),
            key: Some(input.key),
            upload_id: Some(upload_id.to_string()),
            ..Default::default()
        };

        Ok(S3Response::new(output))
    }

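    /// Stages a single part on disk until the multipart upload is completed
    /// or aborted. Part numbers must lie in `1..=10_000`.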
    #[tracing::instrument]
    async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
        let UploadPartInput {
            body,
            upload_id,
            part_number,
            ..
        } = req.input;

        if !(1..=10_000).contains(&part_number) {
            return Err(s3_error!(
                InvalidArgument,
                "Part number must be an integer between 1 and 10000, inclusive"
            ));
        }

        let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let file_path = self.resolve_upload_part_path(upload_id, part_number)?;

        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

        let mut file_writer = self.prepare_file_write(&file_path).await?;
        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        debug!(path = %file_path.display(), ?size, %md5_sum, "write file");

        let output = UploadPartOutput {
            e_tag: Some(ETag::Strong(md5_sum)),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

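    /// Copies a byte range of an existing object into a staged part. The
    /// optional `copy_source_range` must have the form `bytes=first-last`,
    /// where `last` may be omitted to copy through the end of the object.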
    #[tracing::instrument]
    async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
        let input = req.input;

        let upload_id = Uuid::parse_str(&input.upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        let part_number = input.part_number;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let (src_bucket, src_key) = match input.copy_source {
            CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
        };
        let src_path = self.get_object_path(src_bucket, src_key)?;
        let dst_path = self.resolve_upload_part_path(upload_id, part_number)?;

        let mut src_file = fs::File::open(&src_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
        let file_len = try_!(src_file.metadata().await).len();

        let (start, end) = if let Some(copy_range) = &input.copy_source_range {
            if !copy_range.starts_with("bytes=") {
                return Err(s3_error!(InvalidArgument));
            }
            let range = &copy_range["bytes=".len()..];
            let parts: Vec<&str> = range.split('-').collect();
            if parts.len() != 2 {
                return Err(s3_error!(InvalidArgument));
            }

            let start: u64 = parts[0].parse().map_err(|_| s3_error!(InvalidArgument))?;
            // Saturate to avoid underflow when the source object is empty.
            let mut end = file_len.saturating_sub(1);
            if parts[1].is_empty().not() {
                end = parts[1].parse().map_err(|_| s3_error!(InvalidArgument))?;
            }
            (start, end)
        } else {
            (0, file_len.saturating_sub(1))
        };

        let content_length = end - start + 1;
        let content_length_usize = try_!(usize::try_from(content_length));

        let _ = try_!(src_file.seek(io::SeekFrom::Start(start)).await);
        let body = StreamingBlob::wrap(bytes_stream(ReaderStream::with_capacity(src_file, 4096), content_length_usize));

        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

        let mut file_writer = self.prepare_file_write(&dst_path).await?;
        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        debug!(path = %dst_path.display(), ?size, %md5_sum, "write file");

        let output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: Some(ETag::Strong(md5_sum)),
                ..Default::default()
            }),
            ..Default::default()
        };

        Ok(S3Response::new(output))
    }

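    /// Lists the staged parts of a multipart upload by scanning the root
    /// directory for files named `.upload_id-{upload_id}.part-{n}`.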
    #[tracing::instrument]
    async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
        let ListPartsInput {
            bucket, key, upload_id, ..
        } = req.input;

        let mut parts: Vec<Part> = Vec::new();
        let mut iter = try_!(fs::read_dir(&self.root).await);

        let prefix = format!(".upload_id-{upload_id}");

        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_file().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };

            let Some(part_segment) = name.strip_prefix(&prefix) else { continue };
            let Some(part_number) = part_segment.strip_prefix(".part-") else { continue };
            // Skip files whose suffix is not a valid part number instead of panicking.
            let Ok(part_number) = part_number.parse::<i32>() else { continue };

            let file_meta = try_!(entry.metadata().await);
            let last_modified = Timestamp::from(try_!(file_meta.modified()));
            let size = try_!(i64::try_from(file_meta.len()));

            let part = Part {
                last_modified: Some(last_modified),
                part_number: Some(part_number),
                size: Some(size),
                ..Default::default()
            };
            parts.push(part);
        }

        let output = ListPartsOutput {
            bucket: Some(bucket),
            key: Some(key),
            upload_id: Some(upload_id),
            parts: Some(parts),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

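    /// Concatenates the staged parts into the final object. Parts must be
    /// supplied in order starting at 1, and every part except the last must
    /// be at least 5 MiB.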
    #[tracing::instrument]
    async fn complete_multipart_upload(
        &self,
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let CompleteMultipartUploadInput {
            multipart_upload,
            bucket,
            key,
            upload_id,
            ..
        } = req.input;

        let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        self.delete_upload_id(&upload_id).await?;

        if let Ok(Some(attrs)) = self.load_object_attributes(&bucket, &key, Some(upload_id)).await {
            self.save_object_attributes(&bucket, &key, &attrs, None).await?;
            let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        let mut file_writer = self.prepare_file_write(&object_path).await?;

        let mut cnt: i32 = 0;
        let total_parts_cnt = multipart_upload
            .parts
            .as_ref()
            .map(|parts| i32::try_from(parts.len()).expect("total number of parts must be <= 10000."))
            .unwrap_or_default();

        for part in multipart_upload.parts.into_iter().flatten() {
            let part_number = part
                .part_number
                .ok_or_else(|| s3_error!(InvalidRequest, "missing part number"))?;
            cnt += 1;
            if part_number != cnt {
                return Err(s3_error!(InvalidRequest, "invalid part order"));
            }

            let part_path = self.resolve_upload_part_path(upload_id, part_number)?;

            let mut reader = try_!(fs::File::open(&part_path).await);
            let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);

            if part_number != total_parts_cnt && size < 5 * 1024 * 1024 {
                return Err(s3_error!(EntityTooSmall));
            }

            debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.dest_path().display(), ?size, "write file");
            try_!(fs::remove_file(&part_path).await);
        }
        file_writer.done().await?;

        let file_size = try_!(fs::metadata(&object_path).await).len();
        let md5_sum = self.get_md5_sum(&bucket, &key).await?;

        debug!(?md5_sum, path = %object_path.display(), size = ?file_size, "file md5 sum");

        let output = CompleteMultipartUploadOutput {
            // TODO: better example of AWS-like keep-alive behavior
            future: Some(Box::pin(async move {
                Ok(CompleteMultipartUploadOutput {
                    bucket: Some(bucket),
                    key: Some(key),
                    e_tag: Some(ETag::Strong(md5_sum)),
                    ..Default::default()
                })
            })),
            ..Default::default()
        };

        debug!(?output);

        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        let AbortMultipartUploadInput {
            bucket, key, upload_id, ..
        } = req.input;

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let _ = self.delete_metadata(&bucket, &key, Some(upload_id));

        let prefix = format!(".upload_id-{upload_id}");
        let mut iter = try_!(fs::read_dir(&self.root).await);
        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_file().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };

            if name.starts_with(&prefix) {
                try_!(fs::remove_file(entry.path()).await);
            }
        }

        self.delete_upload_id(&upload_id).await?;

        debug!(bucket = %bucket, key = %key, upload_id = %upload_id, "multipart upload aborted");

        Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
    }
}

impl FileSystem {
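    /// Breadth-first walk over the bucket directory that collects every file
    /// whose normalized key starts with `prefix`.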
    async fn list_objects_recursive(&self, bucket_root: &Path, prefix: &str, objects: &mut Vec<Object>) -> S3Result<()> {
        let mut dir_queue: VecDeque<PathBuf> = default();
        dir_queue.push_back(bucket_root.to_owned());
        let prefix_is_empty = prefix.is_empty();

        while let Some(dir) = dir_queue.pop_front() {
            let mut iter = try_!(fs::read_dir(dir).await);
            while let Some(entry) = try_!(iter.next_entry().await) {
                let file_type = try_!(entry.file_type().await);
                if file_type.is_dir() {
                    dir_queue.push_back(entry.path());
                } else {
                    let file_path = entry.path();
                    let key = try_!(file_path.strip_prefix(bucket_root));
                    let Some(key_str) = normalize_path(key, "/") else {
                        continue;
                    };

                    if !prefix_is_empty && !key_str.starts_with(prefix) {
                        continue;
                    }

                    let metadata = try_!(entry.metadata().await);
                    let last_modified = Timestamp::from(try_!(metadata.modified()));
                    let size = metadata.len();

                    let object = Object {
                        key: Some(key_str),
                        last_modified: Some(last_modified),
                        size: Some(try_!(i64::try_from(size))),
                        ..Default::default()
                    };
                    objects.push(object);
                }
            }
        }

        Ok(())
    }

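    /// Like [`Self::list_objects_recursive`], but keys containing `delimiter`
    /// after the prefix are grouped into common prefixes instead of being
    /// listed individually.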
    async fn list_objects_with_delimiter(
        &self,
        bucket_root: &Path,
        prefix: &str,
        delimiter: &str,
        objects: &mut Vec<Object>,
        common_prefixes: &mut std::collections::BTreeSet<String>,
    ) -> S3Result<()> {
        // For delimiter-based listing, we need to recursively scan all files
        // but group them according to the delimiter rules
        let mut dir_queue: VecDeque<PathBuf> = default();
        dir_queue.push_back(bucket_root.to_owned());
        let prefix_is_empty = prefix.is_empty();

        while let Some(dir) = dir_queue.pop_front() {
            let mut iter = try_!(fs::read_dir(dir).await);

            while let Some(entry) = try_!(iter.next_entry().await) {
                let file_type = try_!(entry.file_type().await);
                let entry_path = entry.path();

                // Calculate the key relative to the bucket root
                let key = try_!(entry_path.strip_prefix(bucket_root));
                let Some(key_str) = normalize_path(key, "/") else {
                    continue;
                };

                // Skip if the key doesn't match the prefix
                if !prefix_is_empty && !key_str.starts_with(prefix) {
                    // For directories, also skip if they have no potential to contain matching files
                    if file_type.is_dir() && !prefix.starts_with(&key_str) && !key_str.starts_with(prefix) {
                        continue;
                    }
                    if file_type.is_file() {
                        continue;
                    }
                }

                if file_type.is_dir() {
                    // Continue scanning this directory
                    dir_queue.push_back(entry_path);
                } else {
                    // For files, determine if they should be listed directly or as common prefixes
                    let remaining = &key_str[prefix.len()..];

                    if remaining.contains(delimiter) {
                        // File is in a subdirectory, add the subdirectory as a common prefix
                        if let Some(delimiter_pos) = remaining.find(delimiter) {
                            let mut next_prefix = String::with_capacity(prefix.len() + delimiter_pos + 1);
                            next_prefix.push_str(prefix);
                            next_prefix.push_str(&remaining[..=delimiter_pos]);
                            common_prefixes.insert(next_prefix);
                        }
                    } else {
                        // File is at the current level, include it in objects
                        let metadata = try_!(entry.metadata().await);
                        let last_modified = Timestamp::from(try_!(metadata.modified()));
                        let size = metadata.len();

                        let object = Object {
                            key: Some(key_str),
                            last_modified: Some(last_modified),
                            size: Some(try_!(i64::try_from(size))),
                            ..Default::default()
                        };
                        objects.push(object);
                    }
                }
            }
        }

        Ok(())
    }
}