Skip to main content

s3s_fs/
s3.rs

1use crate::fs::FileSystem;
2use crate::fs::InternalInfo;
3use crate::utils::*;
4
5use s3s::S3;
6use s3s::S3Result;
7use s3s::crypto::Checksum;
8use s3s::crypto::Md5;
9use s3s::dto::*;
10use s3s::s3_error;
11use s3s::{S3Request, S3Response};
12
13use std::collections::VecDeque;
14use std::fs::FileTimes;
15use std::io;
16use std::ops::Neg;
17use std::ops::Not;
18use std::path::Component;
19use std::path::{Path, PathBuf};
20use std::time::SystemTime;
21
22use tokio::fs;
23use tokio::io::AsyncReadExt;
24use tokio::io::AsyncSeekExt;
25use tokio::io::AsyncWriteExt;
26use tokio_util::io::ReaderStream;
27
28use futures::TryStreamExt;
29use numeric_cast::NumericCast;
30use stdx::default::default;
31use tracing::debug;
32use uuid::Uuid;
33
/// Joins the components of `path` into one string, inserting `delimiter`
/// between consecutive components.
///
/// Returns `None` as soon as a component is anything other than a plain name
/// (root, prefix, `.` or `..`) or is not valid UTF-8. A path without any
/// components yields an empty string.
fn normalize_path(path: &Path, delimiter: &str) -> Option<String> {
    let parts = path
        .components()
        .map(|component| match component {
            Component::Normal(name) => name.to_str(),
            Component::RootDir | Component::CurDir | Component::ParentDir | Component::Prefix(_) => None,
        })
        .collect::<Option<Vec<&str>>>()?;
    Some(parts.join(delimiter))
}
54
/// Renders a `Content-Range` header value of the form
/// `bytes <start>-<end_inclusive>/<size>`.
///
/// See <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>.
fn fmt_content_range(start: u64, end_inclusive: u64, size: u64) -> String {
    const UNIT: &str = "bytes";
    format!("{UNIT} {start}-{end_inclusive}/{size}")
}
59
60#[async_trait::async_trait]
61impl S3 for FileSystem {
62    #[tracing::instrument]
63    async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
64        let input = req.input;
65        let path = self.get_bucket_path(&input.bucket)?;
66
67        if path.exists() {
68            return Err(s3_error!(BucketAlreadyExists));
69        }
70
71        try_!(fs::create_dir(&path).await);
72
73        let output = CreateBucketOutput::default(); // TODO: handle other fields
74        Ok(S3Response::new(output))
75    }
76
77    #[tracing::instrument]
78    async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
79        let input = req.input;
80        let (bucket, key) = match input.copy_source {
81            CopySource::AccessPoint { .. } | CopySource::Outpost { .. } => return Err(s3_error!(NotImplemented)),
82            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
83        };
84
85        let src_path = self.get_object_path(bucket, key)?;
86        let dst_path = self.get_object_path(&input.bucket, &input.key)?;
87
88        if src_path.exists().not() {
89            return Err(s3_error!(NoSuchKey));
90        }
91
92        if self.get_bucket_path(&input.bucket)?.exists().not() {
93            return Err(s3_error!(NoSuchBucket));
94        }
95
96        let file_metadata = try_!(fs::metadata(&src_path).await);
97        let src_last_modified = Timestamp::from(try_!(file_metadata.modified()));
98
99        // Always load internal info – needed for ETag derivation and checksum propagation.
100        let src_info = self.load_internal_info(bucket, key).await?;
101
102        // Derive source ETag from stored internal info when available.
103        // For ETag-based conditions, fall back to MD5 only when no stored ETag exists.
104        let mut src_etag: Option<ETag> = src_info.as_ref().and_then(crate::checksum::load_e_tag).map(ETag::Strong);
105
106        // S3 precedence: If-Match overrides If-Unmodified-Since.
107        if let Some(ref condition) = input.copy_source_if_match {
108            if src_etag.is_none() {
109                src_etag = Some(ETag::Strong(self.get_md5_sum(bucket, key).await?));
110            }
111            let src = src_etag.as_ref().ok_or_else(|| s3_error!(InternalError))?;
112            let matches = match condition {
113                ETagCondition::Any => true,
114                ETagCondition::ETag(etag) => src.strong_cmp(etag),
115            };
116            if !matches {
117                return Err(s3_error!(PreconditionFailed));
118            }
119        } else if let Some(ref if_unmodified_since) = input.copy_source_if_unmodified_since
120            && src_last_modified > *if_unmodified_since
121        {
122            return Err(s3_error!(PreconditionFailed));
123        }
124
125        // S3 precedence: If-None-Match overrides If-Modified-Since.
126        if let Some(ref condition) = input.copy_source_if_none_match {
127            if src_etag.is_none() {
128                src_etag = Some(ETag::Strong(self.get_md5_sum(bucket, key).await?));
129            }
130            let src = src_etag.as_ref().ok_or_else(|| s3_error!(InternalError))?;
131            let matches = match condition {
132                ETagCondition::Any => true,
133                ETagCondition::ETag(etag) => src.weak_cmp(etag),
134            };
135            if matches {
136                return Err(s3_error!(PreconditionFailed));
137            }
138        } else if let Some(ref if_modified_since) = input.copy_source_if_modified_since
139            && src_last_modified <= *if_modified_since
140        {
141            return Err(s3_error!(PreconditionFailed));
142        }
143
144        if let Some(dir_path) = dst_path.parent() {
145            try_!(fs::create_dir_all(&dir_path).await);
146        }
147
148        // `fs::copy(p, p)` truncates the file before reading it, so self-replace
149        // must preserve bytes in place while still updating LastModified.
150        let dst_last_modified = if src_path == dst_path {
151            let now = SystemTime::now();
152            let file = try_!(std::fs::OpenOptions::new().write(true).open(&dst_path));
153            try_!(file.set_times(FileTimes::new().set_modified(now)));
154            debug!(path = %dst_path.display(), "replace file in place");
155            Timestamp::from(now)
156        } else {
157            let _ = try_!(fs::copy(&src_path, &dst_path).await);
158            debug!(from = %src_path.display(), to = %dst_path.display(), "copy file");
159            let dst_metadata = try_!(fs::metadata(&dst_path).await);
160            Timestamp::from(try_!(dst_metadata.modified()))
161        };
162
163        // Derive the destination ETag from the source ETag when available.
164        // This preserves non-MD5 ETag formats (e.g., multipart `{hash}-{part_count}`)
165        // and avoids re-hashing the destination file.
166        let dst_etag_str = match src_etag {
167            Some(etag) => etag.into_value(),
168            None => self.get_md5_sum(&input.bucket, &input.key).await?,
169        };
170
171        // `MetadataDirective` defaults to `COPY` per AWS API: when the
172        // header is absent the destination should inherit the source's
173        // metadata sidecar verbatim. When set to `REPLACE`, the
174        // destination's metadata is built fresh from the request and
175        // anything from the source is dropped (matching the behaviour
176        // documented at
177        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html).
178        let replace_metadata = input
179            .metadata_directive
180            .as_ref()
181            .is_some_and(|d| d.as_str() == MetadataDirective::REPLACE);
182
183        if replace_metadata {
184            let mut dst_attrs = crate::fs::ObjectAttributes {
185                user_metadata: input.metadata,
186                content_encoding: input.content_encoding,
187                content_type: input.content_type,
188                content_disposition: input.content_disposition,
189                content_language: input.content_language,
190                cache_control: input.cache_control,
191                expires: None,
192                website_redirect_location: input.website_redirect_location,
193            };
194            dst_attrs.set_expires_timestamp(input.expires);
195            self.save_object_attributes(&input.bucket, &input.key, &dst_attrs, None)
196                .await?;
197        } else {
198            let src_metadata_path = self.get_metadata_path(bucket, key, None)?;
199            if src_metadata_path.exists() {
200                let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key, None)?;
201                // Same self-replace guard as for the payload above — `fs::copy`
202                // would zero the metadata sidecar when src == dst.
203                if src_metadata_path != dst_metadata_path {
204                    let _ = try_!(fs::copy(src_metadata_path, dst_metadata_path).await);
205                }
206            }
207        }
208
209        {
210            let mut info = src_info.unwrap_or_default();
211            crate::checksum::save_e_tag(&mut info, &dst_etag_str);
212            self.save_internal_info(&input.bucket, &input.key, &info).await?;
213        }
214
215        let copy_object_result = CopyObjectResult {
216            e_tag: Some(ETag::Strong(dst_etag_str)),
217            last_modified: Some(dst_last_modified),
218            ..Default::default()
219        };
220
221        let output = CopyObjectOutput {
222            copy_object_result: Some(copy_object_result),
223            ..Default::default()
224        };
225        Ok(S3Response::new(output))
226    }
227
228    #[tracing::instrument]
229    async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
230        let input = req.input;
231        let path = self.get_bucket_path(&input.bucket)?;
232        if path.exists() {
233            try_!(fs::remove_dir_all(path).await);
234        } else {
235            return Err(s3_error!(NoSuchBucket));
236        }
237        Ok(S3Response::new(DeleteBucketOutput {}))
238    }
239
240    #[tracing::instrument]
241    async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
242        let input = req.input;
243        let path = self.get_object_path(&input.bucket, &input.key)?;
244        if path.exists().not() {
245            if self.get_bucket_path(&input.bucket)?.exists().not() {
246                return Err(s3_error!(NoSuchBucket));
247            }
248            let output = DeleteObjectOutput::default();
249            return Ok(S3Response::new(output));
250        }
251        if input.key.ends_with('/') {
252            let mut dir = try_!(fs::read_dir(&path).await);
253            let is_empty = try_!(dir.next_entry().await).is_none();
254            if is_empty {
255                try_!(fs::remove_dir(&path).await);
256            }
257        } else {
258            try_!(fs::remove_file(&path).await);
259        }
260        let output = DeleteObjectOutput::default(); // TODO: handle other fields
261        Ok(S3Response::new(output))
262    }
263
264    #[tracing::instrument]
265    async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
266        let input = req.input;
267
268        let mut deleted_objects: Vec<DeletedObject> = Vec::new();
269        for object in input.delete.objects {
270            let path = self.get_object_path(&input.bucket, &object.key)?;
271            if object.key.ends_with('/') {
272                match fs::read_dir(&path).await {
273                    Ok(mut dir) => {
274                        let is_empty = try_!(dir.next_entry().await).is_none();
275                        if is_empty {
276                            try_!(fs::remove_dir(&path).await);
277                        }
278                    }
279                    Err(e) if e.kind() == io::ErrorKind::NotFound => {}
280                    Err(e) => {
281                        let _: () = try_!(Err(e));
282                    }
283                }
284            } else {
285                match fs::remove_file(&path).await {
286                    Ok(()) => {}
287                    Err(e) if e.kind() == io::ErrorKind::NotFound => {}
288                    Err(e) => {
289                        let _: () = try_!(Err(e));
290                    }
291                }
292            }
293
294            let deleted_object = DeletedObject {
295                key: Some(object.key),
296                version_id: object.version_id,
297                ..Default::default()
298            };
299
300            deleted_objects.push(deleted_object);
301        }
302
303        let output = DeleteObjectsOutput {
304            deleted: Some(deleted_objects),
305            ..Default::default()
306        };
307        Ok(S3Response::new(output))
308    }
309
310    #[tracing::instrument]
311    async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
312        let input = req.input;
313        let path = self.get_bucket_path(&input.bucket)?;
314
315        if !path.exists() {
316            return Err(s3_error!(NoSuchBucket));
317        }
318
319        let output = GetBucketLocationOutput::default();
320        Ok(S3Response::new(output))
321    }
322
323    #[tracing::instrument]
324    async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
325        let input = req.input;
326        let object_path = self.get_object_path(&input.bucket, &input.key)?;
327
328        let mut file = fs::File::open(&object_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
329
330        let file_metadata = try_!(file.metadata().await);
331        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
332        let file_len = file_metadata.len();
333
334        let (content_length, content_range) = match input.range {
335            None => (file_len, None),
336            Some(range) => {
337                let file_range = range.check(file_len)?;
338                let content_length = file_range.end - file_range.start;
339                let content_range = fmt_content_range(file_range.start, file_range.end - 1, file_len);
340                (content_length, Some(content_range))
341            }
342        };
343        let content_length_usize = try_!(usize::try_from(content_length));
344        let content_length_i64 = try_!(i64::try_from(content_length));
345
346        match input.range {
347            Some(Range::Int { first, .. }) => {
348                try_!(file.seek(io::SeekFrom::Start(first)).await);
349            }
350            Some(Range::Suffix { length }) => {
351                let neg_offset = length.numeric_cast::<i64>().neg();
352                try_!(file.seek(io::SeekFrom::End(neg_offset)).await);
353            }
354            None => {}
355        }
356
357        let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);
358
359        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;
360
361        let info = self.load_internal_info(&input.bucket, &input.key).await?;
362
363        let md5_sum = match info.as_ref().and_then(crate::checksum::load_e_tag) {
364            Some(e_tag) => e_tag,
365            None => self.get_md5_sum(&input.bucket, &input.key).await?,
366        };
367
368        let checksum = match &info {
369            // S3 skips returning the checksum if a range is specified that is
370            // less than the whole file
371            Some(info) if content_length == file_len => crate::checksum::from_internal_info(info),
372            _ => default(),
373        };
374
375        #[allow(clippy::redundant_closure_for_method_calls)]
376        let output = GetObjectOutput {
377            body: Some(StreamingBlob::wrap(body)),
378            content_length: Some(content_length_i64),
379            content_range,
380            last_modified: Some(last_modified),
381            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
382            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
383            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
384            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
385            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
386            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
387            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
388            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
389            e_tag: Some(ETag::Strong(md5_sum)),
390            checksum_crc32: checksum.checksum_crc32,
391            checksum_crc32c: checksum.checksum_crc32c,
392            checksum_sha1: checksum.checksum_sha1,
393            checksum_sha256: checksum.checksum_sha256,
394            checksum_crc64nvme: checksum.checksum_crc64nvme,
395            ..Default::default()
396        };
397        Ok(S3Response::new(output))
398    }
399
400    #[tracing::instrument]
401    async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
402        let input = req.input;
403        let path = self.get_bucket_path(&input.bucket)?;
404
405        if !path.exists() {
406            return Err(s3_error!(NoSuchBucket));
407        }
408
409        Ok(S3Response::new(HeadBucketOutput::default()))
410    }
411
412    #[tracing::instrument]
413    async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
414        let input = req.input;
415        let path = self.get_object_path(&input.bucket, &input.key)?;
416
417        if !path.exists() {
418            if self.get_bucket_path(&input.bucket)?.exists().not() {
419                return Err(s3_error!(NoSuchBucket));
420            }
421            return Err(s3_error!(NoSuchKey));
422        }
423
424        let file_metadata = try_!(fs::metadata(path).await);
425        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
426        let file_len = file_metadata.len();
427
428        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;
429
430        let info = self.load_internal_info(&input.bucket, &input.key).await?;
431
432        let md5_sum = match info.as_ref().and_then(crate::checksum::load_e_tag) {
433            Some(e_tag) => e_tag,
434            None => self.get_md5_sum(&input.bucket, &input.key).await?,
435        };
436
437        let checksum = match &info {
438            Some(info) => crate::checksum::from_internal_info(info),
439            _ => default(),
440        };
441
442        #[allow(clippy::redundant_closure_for_method_calls)]
443        let output = HeadObjectOutput {
444            content_length: Some(try_!(i64::try_from(file_len))),
445            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
446            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
447            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
448            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
449            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
450            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
451            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
452            last_modified: Some(last_modified),
453            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
454            e_tag: Some(ETag::Strong(md5_sum)),
455            checksum_crc32: checksum.checksum_crc32,
456            checksum_crc32c: checksum.checksum_crc32c,
457            checksum_sha1: checksum.checksum_sha1,
458            checksum_sha256: checksum.checksum_sha256,
459            checksum_crc64nvme: checksum.checksum_crc64nvme,
460            ..Default::default()
461        };
462        Ok(S3Response::new(output))
463    }
464
465    #[tracing::instrument]
466    async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
467        let mut buckets: Vec<Bucket> = Vec::new();
468        let mut iter = try_!(fs::read_dir(&self.root).await);
469        while let Some(entry) = try_!(iter.next_entry().await) {
470            let file_type = try_!(entry.file_type().await);
471            if file_type.is_dir().not() {
472                continue;
473            }
474
475            let file_name = entry.file_name();
476            let Some(name) = file_name.to_str() else { continue };
477            if s3s::path::check_bucket_name(name).not() {
478                continue;
479            }
480
481            let file_meta = try_!(entry.metadata().await);
482            // Not all filesystems/mounts provide all file attributes like created timestamp,
483            // therefore we try to fallback to modified if possible.
484            // See https://github.com/Nugine/s3s/pull/22 for more details.
485            let created_or_modified_date = Timestamp::from(try_!(file_meta.created().or(file_meta.modified())));
486
487            let bucket = Bucket {
488                creation_date: Some(created_or_modified_date),
489                name: Some(name.to_owned()),
490                bucket_region: None,
491            };
492            buckets.push(bucket);
493        }
494
495        let output = ListBucketsOutput {
496            buckets: Some(buckets),
497            owner: None,
498            ..Default::default()
499        };
500        Ok(S3Response::new(output))
501    }
502
503    #[tracing::instrument]
504    async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
505        let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;
506
507        Ok(v2_resp.map_output(|v2| ListObjectsOutput {
508            contents: v2.contents,
509            common_prefixes: v2.common_prefixes,
510            delimiter: v2.delimiter,
511            encoding_type: v2.encoding_type,
512            name: v2.name,
513            prefix: v2.prefix,
514            max_keys: v2.max_keys,
515            is_truncated: v2.is_truncated,
516            next_marker: v2.next_continuation_token,
517            ..Default::default()
518        }))
519    }
520
521    #[tracing::instrument]
522    async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
523        let input = req.input;
524        let path = self.get_bucket_path(&input.bucket)?;
525
526        if path.exists().not() {
527            return Err(s3_error!(NoSuchBucket));
528        }
529
530        let delimiter = input.delimiter.as_deref();
531        let prefix = input.prefix.as_deref().unwrap_or("").trim_start_matches('/');
532        let max_keys = input.max_keys.unwrap_or(1000);
533
534        // Collect all matching objects and common prefixes
535        let mut objects: Vec<Object> = default();
536        let mut common_prefixes = std::collections::BTreeSet::new();
537
538        if let Some(delimiter) = delimiter {
539            self.list_objects_with_delimiter(&path, prefix, delimiter, &mut objects, &mut common_prefixes)
540                .await?;
541        } else {
542            self.list_objects_recursive(&path, prefix, &mut objects).await?;
543        }
544
545        // Sort before filtering and limiting
546        objects.sort_by(|lhs, rhs| {
547            let lhs_key = lhs.key.as_deref().unwrap_or("");
548            let rhs_key = rhs.key.as_deref().unwrap_or("");
549            lhs_key.cmp(rhs_key)
550        });
551
552        let start_after = match (input.continuation_token.as_deref(), input.start_after.as_deref()) {
553            (Some(ct), Some(sa)) => Some(if ct >= sa { ct } else { sa }),
554            (Some(ct), None) => Some(ct),
555            (None, Some(sa)) => Some(sa),
556            (None, None) => None,
557        };
558
559        // Filter out objects and common prefixes at or before the resume point
560        if let Some(marker) = start_after {
561            objects.retain(|obj| obj.key.as_deref().unwrap_or("") > marker);
562            common_prefixes.retain(|cp| cp.as_str() > marker);
563        }
564
565        // Convert common_prefixes to sorted list
566        let common_prefixes_list: Vec<CommonPrefix> = common_prefixes
567            .into_iter()
568            .map(|prefix| CommonPrefix { prefix: Some(prefix) })
569            .collect();
570
571        // Limit results to max_keys by interleaving objects and common_prefixes
572        let mut result_objects = Vec::new();
573        let mut result_prefixes = Vec::new();
574        let mut total_count = 0;
575        let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);
576        let mut last_key: Option<String> = None;
577
578        let mut obj_idx = 0;
579        let mut prefix_idx = 0;
580
581        while total_count < max_keys_usize {
582            let obj_key = objects.get(obj_idx).and_then(|o| o.key.as_deref());
583            let prefix_key = common_prefixes_list.get(prefix_idx).and_then(|p| p.prefix.as_deref());
584
585            match (obj_key, prefix_key) {
586                (Some(ok), Some(pk)) => {
587                    if ok < pk {
588                        last_key = Some(ok.to_owned());
589                        result_objects.push(objects[obj_idx].clone());
590                        obj_idx += 1;
591                    } else {
592                        last_key = Some(pk.to_owned());
593                        result_prefixes.push(common_prefixes_list[prefix_idx].clone());
594                        prefix_idx += 1;
595                    }
596                    total_count += 1;
597                }
598                (Some(ok), None) => {
599                    last_key = Some(ok.to_owned());
600                    result_objects.push(objects[obj_idx].clone());
601                    obj_idx += 1;
602                    total_count += 1;
603                }
604                (None, Some(pk)) => {
605                    last_key = Some(pk.to_owned());
606                    result_prefixes.push(common_prefixes_list[prefix_idx].clone());
607                    prefix_idx += 1;
608                    total_count += 1;
609                }
610                (None, None) => break,
611            }
612        }
613
614        let is_truncated = max_keys_usize > 0 && (obj_idx < objects.len() || prefix_idx < common_prefixes_list.len());
615        let key_count = try_!(i32::try_from(total_count));
616        let next_continuation_token = if is_truncated {
617            last_key.or_else(|| {
618                let obj_key = objects.get(obj_idx).and_then(|o| o.key.clone());
619                let prefix_key = common_prefixes_list.get(prefix_idx).and_then(|p| p.prefix.clone());
620                match (obj_key, prefix_key) {
621                    (Some(ok), Some(pk)) => Some(if ok < pk { ok } else { pk }),
622                    (Some(ok), None) => Some(ok),
623                    (None, Some(pk)) => Some(pk),
624                    (None, None) => None,
625                }
626            })
627        } else {
628            None
629        };
630
631        let contents = result_objects.is_empty().not().then_some(result_objects);
632        let common_prefixes = result_prefixes.is_empty().not().then_some(result_prefixes);
633
634        let output = ListObjectsV2Output {
635            key_count: Some(key_count),
636            max_keys: Some(max_keys),
637            is_truncated: Some(is_truncated),
638            contents,
639            common_prefixes,
640            continuation_token: input.continuation_token,
641            next_continuation_token,
642            delimiter: input.delimiter,
643            encoding_type: input.encoding_type,
644            name: Some(input.bucket),
645            prefix: input.prefix,
646            start_after: input.start_after,
647            ..Default::default()
648        };
649        Ok(S3Response::new(output))
650    }
651
652    #[tracing::instrument]
653    async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
654        use crate::fs::ObjectAttributes;
655
656        let mut input = req.input;
657        if let Some(ref storage_class) = input.storage_class {
658            let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
659            if !is_valid {
660                return Err(s3_error!(InvalidStorageClass));
661            }
662        }
663
664        let PutObjectInput {
665            body,
666            bucket,
667            key,
668            metadata,
669            content_length,
670            content_md5,
671            content_encoding,
672            content_type,
673            content_disposition,
674            content_language,
675            cache_control,
676            expires,
677            website_redirect_location,
678            if_match,
679            if_none_match,
680            ..
681        } = input;
682
683        let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };
684
685        // Check conditional headers before modifying any state.
686        // If-None-Match: * means "only create if the object doesn't exist".
687        // If-Match: <etag> means "only overwrite if ETag matches" (CAS).
688        let object_path = self.get_object_path(&bucket, &key)?;
689        if let Some(ref condition) = if_none_match
690            && condition.is_any()
691            && object_path.exists()
692        {
693            return Err(s3_error!(PreconditionFailed, "Object already exists"));
694        }
695        if let Some(ref condition) = if_match {
696            if !object_path.exists() {
697                return Err(s3_error!(PreconditionFailed, "Object does not exist"));
698            }
699            if let ETagCondition::ETag(expected) = condition {
700                let info = self.load_internal_info(&bucket, &key).await?;
701                let etag_value = match info.as_ref().and_then(crate::checksum::load_e_tag) {
702                    Some(v) => v,
703                    None => self.get_md5_sum(&bucket, &key).await?,
704                };
705                if !ETag::Strong(etag_value).strong_cmp(expected) {
706                    return Err(s3_error!(PreconditionFailed, "ETag does not match"));
707                }
708            }
709        }
710
711        let mut checksum: s3s::checksum::ChecksumHasher = default();
712        if input.checksum_crc32.is_some() {
713            checksum.crc32 = Some(default());
714        }
715        if input.checksum_crc32c.is_some() {
716            checksum.crc32c = Some(default());
717        }
718        if input.checksum_sha1.is_some() {
719            checksum.sha1 = Some(default());
720        }
721        if input.checksum_sha256.is_some() {
722            checksum.sha256 = Some(default());
723        }
724        if input.checksum_crc64nvme.is_some() {
725            checksum.crc64nvme = Some(default());
726        }
727        if let Some(alg) = input.checksum_algorithm {
728            match alg.as_str() {
729                ChecksumAlgorithm::CRC32 => checksum.crc32 = Some(default()),
730                ChecksumAlgorithm::CRC32C => checksum.crc32c = Some(default()),
731                ChecksumAlgorithm::SHA1 => checksum.sha1 = Some(default()),
732                ChecksumAlgorithm::SHA256 => checksum.sha256 = Some(default()),
733                ChecksumAlgorithm::CRC64NVME => checksum.crc64nvme = Some(default()),
734                _ => return Err(s3_error!(NotImplemented, "Unsupported checksum algorithm")),
735            }
736        }
737
738        if key.ends_with('/') {
739            if let Some(len) = content_length
740                && len > 0
741            {
742                return Err(s3_error!(UnexpectedContent, "Unexpected request body when creating a directory object."));
743            }
744            try_!(fs::create_dir_all(&object_path).await);
745            let output = PutObjectOutput::default();
746            return Ok(S3Response::new(output));
747        }
748
749        let mut file_writer = self.prepare_file_write(&object_path).await?;
750
751        let mut md5_hash = Md5::new();
752        let stream = body.inspect_ok(|bytes| {
753            md5_hash.update(bytes.as_ref());
754            checksum.update(bytes.as_ref());
755        });
756
757        let size = copy_bytes(stream, file_writer.writer()).await?;
758        file_writer.done().await?;
759
760        let md5_sum = hex(md5_hash.finalize());
761
762        if let Some(content_md5) = content_md5 {
763            let content_md5 = base64_simd::STANDARD
764                .decode_to_vec(content_md5)
765                .map_err(|_| s3_error!(InvalidArgument))?;
766            let content_md5 = hex(content_md5);
767            if content_md5 != md5_sum {
768                return Err(s3_error!(BadDigest, "content_md5 mismatch"));
769            }
770        }
771
772        let checksum = checksum.finalize();
773
774        if let Some(trailers) = req.trailing_headers
775            && let Some(trailers) = trailers.take()
776        {
777            if let Some(crc32) = trailers.get("x-amz-checksum-crc32") {
778                input.checksum_crc32 = Some(crc32.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
779            }
780            if let Some(crc32c) = trailers.get("x-amz-checksum-crc32c") {
781                input.checksum_crc32c = Some(crc32c.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
782            }
783            if let Some(sha1) = trailers.get("x-amz-checksum-sha1") {
784                input.checksum_sha1 = Some(sha1.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
785            }
786            if let Some(sha256) = trailers.get("x-amz-checksum-sha256") {
787                input.checksum_sha256 = Some(sha256.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
788            }
789            if let Some(crc64nvme) = trailers.get("x-amz-checksum-crc64nvme") {
790                input.checksum_crc64nvme = Some(crc64nvme.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
791            }
792        }
793
794        if checksum.checksum_crc32 != input.checksum_crc32 {
795            return Err(s3_error!(
796                BadDigest,
797                "checksum_crc32 mismatch: expected `{}`, got `{}`",
798                input.checksum_crc32.unwrap_or_default(),
799                checksum.checksum_crc32.unwrap_or_default()
800            ));
801        }
802        if checksum.checksum_crc32c != input.checksum_crc32c {
803            return Err(s3_error!(BadDigest, "checksum_crc32c mismatch"));
804        }
805        if checksum.checksum_sha1 != input.checksum_sha1 {
806            return Err(s3_error!(BadDigest, "checksum_sha1 mismatch"));
807        }
808        if checksum.checksum_sha256 != input.checksum_sha256 {
809            return Err(s3_error!(BadDigest, "checksum_sha256 mismatch"));
810        }
811        if checksum.checksum_crc64nvme != input.checksum_crc64nvme {
812            return Err(s3_error!(BadDigest, "checksum_crc64nvme mismatch"));
813        }
814
815        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");
816
817        // Save object attributes (including user metadata and standard attributes)
818        let mut obj_attrs = ObjectAttributes {
819            user_metadata: metadata,
820            content_encoding,
821            content_type,
822            content_disposition,
823            content_language,
824            cache_control,
825            expires: None,
826            website_redirect_location,
827        };
828        obj_attrs.set_expires_timestamp(expires);
829        self.save_object_attributes(&bucket, &key, &obj_attrs, None).await?;
830
831        let mut info: InternalInfo = default();
832        crate::checksum::save_e_tag(&mut info, &md5_sum);
833        crate::checksum::modify_internal_info(&mut info, &checksum);
834        self.save_internal_info(&bucket, &key, &info).await?;
835
836        let output = PutObjectOutput {
837            e_tag: Some(ETag::Strong(md5_sum)),
838            checksum_crc32: checksum.checksum_crc32,
839            checksum_crc32c: checksum.checksum_crc32c,
840            checksum_sha1: checksum.checksum_sha1,
841            checksum_sha256: checksum.checksum_sha256,
842            checksum_crc64nvme: checksum.checksum_crc64nvme,
843            ..Default::default()
844        };
845        Ok(S3Response::new(output))
846    }
847
848    #[tracing::instrument]
849    async fn create_multipart_upload(
850        &self,
851        req: S3Request<CreateMultipartUploadInput>,
852    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
853        use crate::fs::ObjectAttributes;
854
855        let input = req.input;
856        let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;
857
858        // Save object attributes (including user metadata and standard attributes)
859        let mut obj_attrs = ObjectAttributes {
860            user_metadata: input.metadata,
861            content_encoding: input.content_encoding,
862            content_type: input.content_type,
863            content_disposition: input.content_disposition,
864            content_language: input.content_language,
865            cache_control: input.cache_control,
866            expires: None,
867            website_redirect_location: input.website_redirect_location,
868        };
869        obj_attrs.set_expires_timestamp(input.expires);
870        self.save_object_attributes(&input.bucket, &input.key, &obj_attrs, Some(upload_id))
871            .await?;
872
873        let output = CreateMultipartUploadOutput {
874            bucket: Some(input.bucket),
875            key: Some(input.key),
876            upload_id: Some(upload_id.to_string()),
877            ..Default::default()
878        };
879
880        Ok(S3Response::new(output))
881    }
882
883    #[tracing::instrument]
884    async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
885        let UploadPartInput {
886            body,
887            upload_id,
888            part_number,
889            ..
890        } = req.input;
891
892        if part_number > 10_000 {
893            return Err(s3_error!(
894                InvalidArgument,
895                "Part number must be an integer between 1 and 10000, inclusive"
896            ));
897        }
898
899        let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;
900
901        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
902        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
903            return Err(s3_error!(AccessDenied));
904        }
905
906        let file_path = self.resolve_upload_part_path(upload_id, part_number)?;
907
908        let mut md5_hash = Md5::new();
909        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));
910
911        let mut file_writer = self.prepare_file_write(&file_path).await?;
912        let size = copy_bytes(stream, file_writer.writer()).await?;
913        file_writer.done().await?;
914
915        let md5_sum = hex(md5_hash.finalize());
916
917        debug!(path = %file_path.display(), ?size, %md5_sum, "write file");
918
919        let output = UploadPartOutput {
920            e_tag: Some(ETag::Strong(md5_sum)),
921            ..Default::default()
922        };
923        Ok(S3Response::new(output))
924    }
925
926    #[tracing::instrument]
927    async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
928        let input = req.input;
929
930        let upload_id = Uuid::parse_str(&input.upload_id).map_err(|_| s3_error!(InvalidRequest))?;
931        let part_number = input.part_number;
932        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
933            return Err(s3_error!(AccessDenied));
934        }
935
936        let (src_bucket, src_key) = match input.copy_source {
937            CopySource::AccessPoint { .. } | CopySource::Outpost { .. } => return Err(s3_error!(NotImplemented)),
938            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
939        };
940        let src_path = self.get_object_path(src_bucket, src_key)?;
941        let dst_path = self.resolve_upload_part_path(upload_id, part_number)?;
942
943        let mut src_file = fs::File::open(&src_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
944        let file_len = try_!(src_file.metadata().await).len();
945
946        let (start, content_length) = if let Some(copy_range) = &input.copy_source_range {
947            if !copy_range.starts_with("bytes=") {
948                return Err(s3_error!(InvalidArgument));
949            }
950            let range = &copy_range["bytes=".len()..];
951            let parts: Vec<&str> = range.split('-').collect();
952            if parts.len() != 2 {
953                return Err(s3_error!(InvalidArgument));
954            }
955
956            let start: u64 = parts[0].parse().map_err(|_| s3_error!(InvalidArgument))?;
957            let end_inclusive = if parts[1].is_empty() {
958                file_len.saturating_sub(1)
959            } else {
960                parts[1].parse().map_err(|_| s3_error!(InvalidArgument))?
961            };
962            if start > end_inclusive || start >= file_len || end_inclusive >= file_len {
963                return Err(s3_error!(InvalidRange));
964            }
965            let content_length = end_inclusive - start + 1;
966            (start, content_length)
967        } else {
968            (0, file_len)
969        };
970        let content_length_usize = try_!(usize::try_from(content_length));
971
972        let _ = try_!(src_file.seek(io::SeekFrom::Start(start)).await);
973        let body = StreamingBlob::wrap(bytes_stream(ReaderStream::with_capacity(src_file, 4096), content_length_usize));
974
975        let mut md5_hash = Md5::new();
976        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));
977
978        let mut file_writer = self.prepare_file_write(&dst_path).await?;
979        let size = copy_bytes(stream, file_writer.writer()).await?;
980        file_writer.done().await?;
981
982        let md5_sum = hex(md5_hash.finalize());
983
984        debug!(path = %dst_path.display(), ?size, %md5_sum, "write file");
985
986        let output = UploadPartCopyOutput {
987            copy_part_result: Some(CopyPartResult {
988                e_tag: Some(ETag::Strong(md5_sum)),
989                ..Default::default()
990            }),
991            ..Default::default()
992        };
993
994        Ok(S3Response::new(output))
995    }
996
997    #[tracing::instrument]
998    async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
999        let ListPartsInput {
1000            bucket, key, upload_id, ..
1001        } = req.input;
1002
1003        let mut parts: Vec<Part> = Vec::new();
1004        let mut iter = try_!(fs::read_dir(&self.root).await);
1005
1006        let prefix = format!(".upload_id-{upload_id}");
1007
1008        while let Some(entry) = try_!(iter.next_entry().await) {
1009            let file_type = try_!(entry.file_type().await);
1010            if file_type.is_file().not() {
1011                continue;
1012            }
1013
1014            let file_name = entry.file_name();
1015            let Some(name) = file_name.to_str() else { continue };
1016
1017            let Some(part_segment) = name.strip_prefix(&prefix) else { continue };
1018            let Some(part_number) = part_segment.strip_prefix(".part-") else { continue };
1019            let part_number = part_number.parse::<i32>().unwrap();
1020
1021            let file_meta = try_!(entry.metadata().await);
1022            let last_modified = Timestamp::from(try_!(file_meta.modified()));
1023            let size = try_!(i64::try_from(file_meta.len()));
1024
1025            let part = Part {
1026                last_modified: Some(last_modified),
1027                part_number: Some(part_number),
1028                size: Some(size),
1029                ..Default::default()
1030            };
1031            parts.push(part);
1032        }
1033
1034        let output = ListPartsOutput {
1035            bucket: Some(bucket),
1036            key: Some(key),
1037            upload_id: Some(upload_id),
1038            parts: Some(parts),
1039            ..Default::default()
1040        };
1041        Ok(S3Response::new(output))
1042    }
1043
1044    #[tracing::instrument]
1045    async fn complete_multipart_upload(
1046        &self,
1047        req: S3Request<CompleteMultipartUploadInput>,
1048    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
1049        let CompleteMultipartUploadInput {
1050            multipart_upload,
1051            bucket,
1052            key,
1053            upload_id,
1054            if_match,
1055            if_none_match,
1056            ..
1057        } = req.input;
1058
1059        let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };
1060
1061        let parts_count = multipart_upload.parts.as_ref().map_or(0, Vec::len);
1062        if parts_count == 0 {
1063            return Err(s3_error!(InvalidPart, "You must specify at least one part"));
1064        }
1065
1066        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
1067        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
1068            return Err(s3_error!(AccessDenied));
1069        }
1070
1071        // Check conditional headers before modifying any state
1072        let object_path = self.get_object_path(&bucket, &key)?;
1073        if let Some(ref condition) = if_none_match
1074            && condition.is_any()
1075            && object_path.exists()
1076        {
1077            return Err(s3_error!(PreconditionFailed, "Object already exists"));
1078        }
1079        if let Some(ref condition) = if_match {
1080            if condition.is_any() {
1081                // If-Match: * — require the object to exist
1082                if !object_path.exists() {
1083                    return Err(s3_error!(PreconditionFailed, "Object does not exist"));
1084                }
1085            } else if let Some(expected_etag) = condition.as_etag() {
1086                if object_path.exists() {
1087                    let info = self.load_internal_info(&bucket, &key).await?;
1088                    let etag_value = match info.as_ref().and_then(crate::checksum::load_e_tag) {
1089                        Some(e_tag) => e_tag,
1090                        None => self.get_md5_sum(&bucket, &key).await?,
1091                    };
1092                    let existing_etag = ETag::Strong(etag_value);
1093                    if !expected_etag.strong_cmp(&existing_etag) {
1094                        return Err(s3_error!(PreconditionFailed, "ETag does not match"));
1095                    }
1096                } else {
1097                    return Err(s3_error!(PreconditionFailed, "Object does not exist"));
1098                }
1099            }
1100        }
1101
1102        self.delete_upload_id(&upload_id).await?;
1103
1104        if let Ok(Some(attrs)) = self.load_object_attributes(&bucket, &key, Some(upload_id)).await {
1105            self.save_object_attributes(&bucket, &key, &attrs, None).await?;
1106            let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
1107        }
1108
1109        let mut file_writer = self.prepare_file_write(&object_path).await?;
1110
1111        let mut cnt: i32 = 0;
1112        let total_parts_cnt = i32::try_from(parts_count).expect("total number of parts must be <= 10000.");
1113
1114        let mut part_md5_hashes: Vec<[u8; 16]> = Vec::new();
1115        let mut buf = vec![0u8; 65536];
1116
1117        for part in multipart_upload.parts.into_iter().flatten() {
1118            let part_number = part
1119                .part_number
1120                .ok_or_else(|| s3_error!(InvalidRequest, "missing part number"))?;
1121            cnt += 1;
1122            if part_number != cnt {
1123                return Err(s3_error!(InvalidRequest, "invalid part order"));
1124            }
1125
1126            let part_path = self.resolve_upload_part_path(upload_id, part_number)?;
1127
1128            let mut reader = try_!(fs::File::open(&part_path).await);
1129            let mut part_md5 = Md5::new();
1130            let mut size: u64 = 0;
1131            loop {
1132                let nread = try_!(reader.read(&mut buf).await);
1133                if nread == 0 {
1134                    break;
1135                }
1136                part_md5.update(&buf[..nread]);
1137                try_!(file_writer.writer().write_all(&buf[..nread]).await);
1138                size += nread as u64;
1139            }
1140            try_!(file_writer.writer().flush().await);
1141            part_md5_hashes.push(part_md5.finalize());
1142
1143            if part_number != total_parts_cnt && size < 5 * 1024 * 1024 {
1144                return Err(s3_error!(EntityTooSmall));
1145            }
1146
1147            debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.dest_path().display(), ?size, "write file");
1148            try_!(fs::remove_file(&part_path).await);
1149        }
1150        file_writer.done().await?;
1151
1152        // Compute multipart ETag: MD5 of concatenated part MD5 hashes, suffixed with part count
1153        let mut etag_hash = Md5::new();
1154        for hash in &part_md5_hashes {
1155            etag_hash.update(hash);
1156        }
1157        let e_tag = format!("{}-{}", hex(etag_hash.finalize()), part_md5_hashes.len());
1158
1159        debug!(?e_tag, path = %object_path.display(), "multipart etag");
1160
1161        {
1162            let mut info = self.load_internal_info(&bucket, &key).await?.unwrap_or_default();
1163            crate::checksum::save_e_tag(&mut info, &e_tag);
1164            self.save_internal_info(&bucket, &key, &info).await?;
1165        }
1166
1167        let output = CompleteMultipartUploadOutput {
1168            // TODO: better example of AWS-like keep-alive behavior
1169            future: Some(Box::pin(async move {
1170                Ok(CompleteMultipartUploadOutput {
1171                    bucket: Some(bucket),
1172                    key: Some(key),
1173                    e_tag: Some(ETag::Strong(e_tag)),
1174                    ..Default::default()
1175                })
1176            })),
1177            ..Default::default()
1178        };
1179
1180        debug!(?output);
1181
1182        Ok(S3Response::new(output))
1183    }
1184
1185    #[tracing::instrument]
1186    async fn abort_multipart_upload(
1187        &self,
1188        req: S3Request<AbortMultipartUploadInput>,
1189    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
1190        let AbortMultipartUploadInput {
1191            bucket, key, upload_id, ..
1192        } = req.input;
1193
1194        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
1195        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
1196            return Err(s3_error!(AccessDenied));
1197        }
1198
1199        let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
1200
1201        let prefix = format!(".upload_id-{upload_id}");
1202        let mut iter = try_!(fs::read_dir(&self.root).await);
1203        while let Some(entry) = try_!(iter.next_entry().await) {
1204            let file_type = try_!(entry.file_type().await);
1205            if file_type.is_file().not() {
1206                continue;
1207            }
1208
1209            let file_name = entry.file_name();
1210            let Some(name) = file_name.to_str() else { continue };
1211
1212            if name.starts_with(&prefix) {
1213                try_!(fs::remove_file(entry.path()).await);
1214            }
1215        }
1216
1217        self.delete_upload_id(&upload_id).await?;
1218
1219        debug!(bucket = %bucket, key = %key, upload_id = %upload_id, "multipart upload aborted");
1220
1221        Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
1222    }
1223}
1224
1225impl FileSystem {
1226    async fn list_objects_recursive(&self, bucket_root: &Path, prefix: &str, objects: &mut Vec<Object>) -> S3Result<()> {
1227        let mut dir_queue: VecDeque<PathBuf> = default();
1228        dir_queue.push_back(bucket_root.to_owned());
1229        let prefix_is_empty = prefix.is_empty();
1230
1231        while let Some(dir) = dir_queue.pop_front() {
1232            let mut iter = try_!(fs::read_dir(dir).await);
1233            while let Some(entry) = try_!(iter.next_entry().await) {
1234                let file_type = try_!(entry.file_type().await);
1235                if file_type.is_dir() {
1236                    dir_queue.push_back(entry.path());
1237                } else {
1238                    let file_path = entry.path();
1239                    let key = try_!(file_path.strip_prefix(bucket_root));
1240                    let Some(key_str) = normalize_path(key, "/") else {
1241                        continue;
1242                    };
1243
1244                    if !prefix_is_empty && !key_str.starts_with(prefix) {
1245                        continue;
1246                    }
1247
1248                    let metadata = try_!(entry.metadata().await);
1249                    let last_modified = Timestamp::from(try_!(metadata.modified()));
1250                    let size = metadata.len();
1251
1252                    let object = Object {
1253                        key: Some(key_str),
1254                        last_modified: Some(last_modified),
1255                        size: Some(try_!(i64::try_from(size))),
1256                        ..Default::default()
1257                    };
1258                    objects.push(object);
1259                }
1260            }
1261        }
1262
1263        Ok(())
1264    }
1265
1266    async fn list_objects_with_delimiter(
1267        &self,
1268        bucket_root: &Path,
1269        prefix: &str,
1270        delimiter: &str,
1271        objects: &mut Vec<Object>,
1272        common_prefixes: &mut std::collections::BTreeSet<String>,
1273    ) -> S3Result<()> {
1274        // For delimiter-based listing, we need to recursively scan all files
1275        // but group them according to the delimiter rules
1276        let mut dir_queue: VecDeque<PathBuf> = default();
1277        dir_queue.push_back(bucket_root.to_owned());
1278        let prefix_is_empty = prefix.is_empty();
1279
1280        while let Some(dir) = dir_queue.pop_front() {
1281            let mut iter = try_!(fs::read_dir(dir).await);
1282
1283            while let Some(entry) = try_!(iter.next_entry().await) {
1284                let file_type = try_!(entry.file_type().await);
1285                let entry_path = entry.path();
1286
1287                // Calculate the key relative to the bucket root
1288                let key = try_!(entry_path.strip_prefix(bucket_root));
1289                let Some(key_str) = normalize_path(key, "/") else {
1290                    continue;
1291                };
1292
1293                // Skip if doesn't match prefix
1294                if !prefix_is_empty && !key_str.starts_with(prefix) {
1295                    // For directories, also skip if they don't have potential to contain matching files
1296                    if file_type.is_dir() && !prefix.starts_with(&key_str) && !key_str.starts_with(prefix) {
1297                        continue;
1298                    }
1299                    if file_type.is_file() {
1300                        continue;
1301                    }
1302                }
1303
1304                if file_type.is_dir() {
1305                    // Continue scanning this directory
1306                    dir_queue.push_back(entry_path);
1307                } else {
1308                    // For files, determine if they should be listed directly or as common prefixes
1309                    let remaining = &key_str[prefix.len()..];
1310
1311                    if remaining.contains(delimiter) {
1312                        // File is in a subdirectory, add the subdirectory as common prefix
1313                        if let Some(delimiter_pos) = remaining.find(delimiter) {
1314                            let mut next_prefix = String::with_capacity(prefix.len() + delimiter_pos + 1);
1315                            next_prefix.push_str(prefix);
1316                            next_prefix.push_str(&remaining[..=delimiter_pos]);
1317                            common_prefixes.insert(next_prefix);
1318                        }
1319                    } else {
1320                        // File is at the current level, include it in objects
1321                        let metadata = try_!(entry.metadata().await);
1322                        let last_modified = Timestamp::from(try_!(metadata.modified()));
1323                        let size = metadata.len();
1324
1325                        let object = Object {
1326                            key: Some(key_str),
1327                            last_modified: Some(last_modified),
1328                            size: Some(try_!(i64::try_from(size))),
1329                            ..Default::default()
1330                        };
1331                        objects.push(object);
1332                    }
1333                }
1334            }
1335        }
1336
1337        Ok(())
1338    }
1339}