1use crate::fs::FileSystem;
2use crate::fs::InternalInfo;
3use crate::utils::*;
4
5use s3s::S3;
6use s3s::S3Result;
7use s3s::crypto::Checksum;
8use s3s::crypto::Md5;
9use s3s::dto::*;
10use s3s::s3_error;
11use s3s::{S3Request, S3Response};
12
13use std::collections::VecDeque;
14use std::fs::FileTimes;
15use std::io;
16use std::ops::Neg;
17use std::ops::Not;
18use std::path::Component;
19use std::path::{Path, PathBuf};
20use std::time::SystemTime;
21
22use tokio::fs;
23use tokio::io::AsyncReadExt;
24use tokio::io::AsyncSeekExt;
25use tokio::io::AsyncWriteExt;
26use tokio_util::io::ReaderStream;
27
28use futures::TryStreamExt;
29use numeric_cast::NumericCast;
30use stdx::default::default;
31use tracing::debug;
32use uuid::Uuid;
33
/// Joins the components of a relative `path` into a single string, inserting `delimiter`
/// between consecutive components.
///
/// Returns `None` when the path contains anything other than plain name components
/// (a prefix, a root, `.` or `..`) or when a component is not valid UTF-8.
/// An empty path yields `Some("")`.
fn normalize_path(path: &Path, delimiter: &str) -> Option<String> {
    let names = path
        .components()
        .map(|part| match part {
            Component::Normal(os_name) => os_name.to_str(),
            Component::Prefix(_) | Component::RootDir | Component::CurDir | Component::ParentDir => None,
        })
        // Collecting into `Option<Vec<_>>` stops at the first rejected component.
        .collect::<Option<Vec<&str>>>()?;
    Some(names.join(delimiter))
}
54
/// Renders a `bytes <start>-<end>/<size>` string for the inclusive byte span
/// `start..=end_inclusive` of a resource that is `size` bytes long.
fn fmt_content_range(start: u64, end_inclusive: u64, size: u64) -> String {
    let span = format!("{start}-{end_inclusive}");
    format!("bytes {span}/{size}")
}
59
60#[async_trait::async_trait]
61impl S3 for FileSystem {
62 #[tracing::instrument]
63 async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
64 let input = req.input;
65 let path = self.get_bucket_path(&input.bucket)?;
66
67 if path.exists() {
68 return Err(s3_error!(BucketAlreadyExists));
69 }
70
71 try_!(fs::create_dir(&path).await);
72
73 let output = CreateBucketOutput::default(); Ok(S3Response::new(output))
75 }
76
    /// Copies an object from one bucket/key to another inside this file-backed store.
    ///
    /// Only `CopySource::Bucket` sources are handled; access-point and Outpost sources
    /// yield `NotImplemented`. The `copy_source_if_*` preconditions are evaluated against
    /// the source file before any data is written.
    ///
    /// # Errors
    /// * `NoSuchKey` – the source path does not exist.
    /// * `NoSuchBucket` – the destination bucket path does not exist.
    /// * `PreconditionFailed` – one of the `copy_source_if_*` conditions is not satisfied.
    #[tracing::instrument]
    async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
        let input = req.input;
        // Borrow the source bucket/key out of `input` so the rest of `input` stays usable.
        let (bucket, key) = match input.copy_source {
            CopySource::AccessPoint { .. } | CopySource::Outpost { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
        };

        let src_path = self.get_object_path(bucket, key)?;
        let dst_path = self.get_object_path(&input.bucket, &input.key)?;

        if src_path.exists().not() {
            return Err(s3_error!(NoSuchKey));
        }

        if self.get_bucket_path(&input.bucket)?.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        let file_metadata = try_!(fs::metadata(&src_path).await);
        let src_last_modified = Timestamp::from(try_!(file_metadata.modified()));

        let src_info = self.load_internal_info(bucket, key).await?;

        // Start from the stored ETag, if any; it is computed from the file contents below
        // only when a precondition actually needs it.
        let mut src_etag: Option<ETag> = src_info.as_ref().and_then(crate::checksum::load_e_tag).map(ETag::Strong);

        // `if-match` takes precedence: `if-unmodified-since` is only consulted when no
        // `if-match` condition was supplied.
        if let Some(ref condition) = input.copy_source_if_match {
            if src_etag.is_none() {
                src_etag = Some(ETag::Strong(self.get_md5_sum(bucket, key).await?));
            }
            let src = src_etag.as_ref().ok_or_else(|| s3_error!(InternalError))?;
            let matches = match condition {
                ETagCondition::Any => true,
                ETagCondition::ETag(etag) => src.strong_cmp(etag),
            };
            if !matches {
                return Err(s3_error!(PreconditionFailed));
            }
        } else if let Some(ref if_unmodified_since) = input.copy_source_if_unmodified_since
            && src_last_modified > *if_unmodified_since
        {
            return Err(s3_error!(PreconditionFailed));
        }

        // Likewise `if-none-match` takes precedence over `if-modified-since`.
        // Note that this branch compares with `weak_cmp`, unlike the `if-match` branch.
        if let Some(ref condition) = input.copy_source_if_none_match {
            if src_etag.is_none() {
                src_etag = Some(ETag::Strong(self.get_md5_sum(bucket, key).await?));
            }
            let src = src_etag.as_ref().ok_or_else(|| s3_error!(InternalError))?;
            let matches = match condition {
                ETagCondition::Any => true,
                ETagCondition::ETag(etag) => src.weak_cmp(etag),
            };
            if matches {
                return Err(s3_error!(PreconditionFailed));
            }
        } else if let Some(ref if_modified_since) = input.copy_source_if_modified_since
            && src_last_modified <= *if_modified_since
        {
            return Err(s3_error!(PreconditionFailed));
        }

        // Make sure the directory that will contain the destination file exists.
        if let Some(dir_path) = dst_path.parent() {
            try_!(fs::create_dir_all(&dir_path).await);
        }

        let dst_last_modified = if src_path == dst_path {
            // Copy onto itself: no bytes are copied, only the modification time is bumped.
            // NOTE(review): this branch uses blocking `std::fs` calls inside an async fn.
            let now = SystemTime::now();
            let file = try_!(std::fs::OpenOptions::new().write(true).open(&dst_path));
            try_!(file.set_times(FileTimes::new().set_modified(now)));
            debug!(path = %dst_path.display(), "replace file in place");
            Timestamp::from(now)
        } else {
            let _ = try_!(fs::copy(&src_path, &dst_path).await);
            debug!(from = %src_path.display(), to = %dst_path.display(), "copy file");
            let dst_metadata = try_!(fs::metadata(&dst_path).await);
            Timestamp::from(try_!(dst_metadata.modified()))
        };

        // Reuse the source ETag when one is already known; otherwise compute it from the
        // destination object.
        let dst_etag_str = match src_etag {
            Some(etag) => etag.into_value(),
            None => self.get_md5_sum(&input.bucket, &input.key).await?,
        };

        let replace_metadata = input
            .metadata_directive
            .as_ref()
            .is_some_and(|d| d.as_str() == MetadataDirective::REPLACE);

        if replace_metadata {
            // REPLACE: the destination attributes come from this request.
            let mut dst_attrs = crate::fs::ObjectAttributes {
                user_metadata: input.metadata,
                content_encoding: input.content_encoding,
                content_type: input.content_type,
                content_disposition: input.content_disposition,
                content_language: input.content_language,
                cache_control: input.cache_control,
                expires: None,
                website_redirect_location: input.website_redirect_location,
            };
            dst_attrs.set_expires_timestamp(input.expires);
            self.save_object_attributes(&input.bucket, &input.key, &dst_attrs, None)
                .await?;
        } else {
            // Otherwise carry the source's metadata file over, unless source and destination
            // share the same metadata path.
            // NOTE(review): when the source has no metadata file, nothing is written here, so
            // a metadata file already present for the destination appears to be left
            // untouched — confirm this is intended.
            let src_metadata_path = self.get_metadata_path(bucket, key, None)?;
            if src_metadata_path.exists() {
                let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key, None)?;
                if src_metadata_path != dst_metadata_path {
                    let _ = try_!(fs::copy(src_metadata_path, dst_metadata_path).await);
                }
            }
        }

        {
            // Persist the destination's internal info, starting from the source's info
            // (or a default value) with the ETag recorded.
            let mut info = src_info.unwrap_or_default();
            crate::checksum::save_e_tag(&mut info, &dst_etag_str);
            self.save_internal_info(&input.bucket, &input.key, &info).await?;
        }

        let copy_object_result = CopyObjectResult {
            e_tag: Some(ETag::Strong(dst_etag_str)),
            last_modified: Some(dst_last_modified),
            ..Default::default()
        };

        let output = CopyObjectOutput {
            copy_object_result: Some(copy_object_result),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }
227
228 #[tracing::instrument]
229 async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
230 let input = req.input;
231 let path = self.get_bucket_path(&input.bucket)?;
232 if path.exists() {
233 try_!(fs::remove_dir_all(path).await);
234 } else {
235 return Err(s3_error!(NoSuchBucket));
236 }
237 Ok(S3Response::new(DeleteBucketOutput {}))
238 }
239
240 #[tracing::instrument]
241 async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
242 let input = req.input;
243 let path = self.get_object_path(&input.bucket, &input.key)?;
244 if path.exists().not() {
245 if self.get_bucket_path(&input.bucket)?.exists().not() {
246 return Err(s3_error!(NoSuchBucket));
247 }
248 let output = DeleteObjectOutput::default();
249 return Ok(S3Response::new(output));
250 }
251 if input.key.ends_with('/') {
252 let mut dir = try_!(fs::read_dir(&path).await);
253 let is_empty = try_!(dir.next_entry().await).is_none();
254 if is_empty {
255 try_!(fs::remove_dir(&path).await);
256 }
257 } else {
258 try_!(fs::remove_file(&path).await);
259 }
260 let output = DeleteObjectOutput::default(); Ok(S3Response::new(output))
262 }
263
264 #[tracing::instrument]
265 async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
266 let input = req.input;
267
268 let mut deleted_objects: Vec<DeletedObject> = Vec::new();
269 for object in input.delete.objects {
270 let path = self.get_object_path(&input.bucket, &object.key)?;
271 if object.key.ends_with('/') {
272 match fs::read_dir(&path).await {
273 Ok(mut dir) => {
274 let is_empty = try_!(dir.next_entry().await).is_none();
275 if is_empty {
276 try_!(fs::remove_dir(&path).await);
277 }
278 }
279 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
280 Err(e) => {
281 let _: () = try_!(Err(e));
282 }
283 }
284 } else {
285 match fs::remove_file(&path).await {
286 Ok(()) => {}
287 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
288 Err(e) => {
289 let _: () = try_!(Err(e));
290 }
291 }
292 }
293
294 let deleted_object = DeletedObject {
295 key: Some(object.key),
296 version_id: object.version_id,
297 ..Default::default()
298 };
299
300 deleted_objects.push(deleted_object);
301 }
302
303 let output = DeleteObjectsOutput {
304 deleted: Some(deleted_objects),
305 ..Default::default()
306 };
307 Ok(S3Response::new(output))
308 }
309
310 #[tracing::instrument]
311 async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
312 let input = req.input;
313 let path = self.get_bucket_path(&input.bucket)?;
314
315 if !path.exists() {
316 return Err(s3_error!(NoSuchBucket));
317 }
318
319 let output = GetBucketLocationOutput::default();
320 Ok(S3Response::new(output))
321 }
322
323 #[tracing::instrument]
324 async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
325 let input = req.input;
326 let object_path = self.get_object_path(&input.bucket, &input.key)?;
327
328 let mut file = fs::File::open(&object_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
329
330 let file_metadata = try_!(file.metadata().await);
331 let last_modified = Timestamp::from(try_!(file_metadata.modified()));
332 let file_len = file_metadata.len();
333
334 let (content_length, content_range) = match input.range {
335 None => (file_len, None),
336 Some(range) => {
337 let file_range = range.check(file_len)?;
338 let content_length = file_range.end - file_range.start;
339 let content_range = fmt_content_range(file_range.start, file_range.end - 1, file_len);
340 (content_length, Some(content_range))
341 }
342 };
343 let content_length_usize = try_!(usize::try_from(content_length));
344 let content_length_i64 = try_!(i64::try_from(content_length));
345
346 match input.range {
347 Some(Range::Int { first, .. }) => {
348 try_!(file.seek(io::SeekFrom::Start(first)).await);
349 }
350 Some(Range::Suffix { length }) => {
351 let neg_offset = length.numeric_cast::<i64>().neg();
352 try_!(file.seek(io::SeekFrom::End(neg_offset)).await);
353 }
354 None => {}
355 }
356
357 let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);
358
359 let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;
360
361 let info = self.load_internal_info(&input.bucket, &input.key).await?;
362
363 let md5_sum = match info.as_ref().and_then(crate::checksum::load_e_tag) {
364 Some(e_tag) => e_tag,
365 None => self.get_md5_sum(&input.bucket, &input.key).await?,
366 };
367
368 let checksum = match &info {
369 Some(info) if content_length == file_len => crate::checksum::from_internal_info(info),
372 _ => default(),
373 };
374
375 #[allow(clippy::redundant_closure_for_method_calls)]
376 let output = GetObjectOutput {
377 body: Some(StreamingBlob::wrap(body)),
378 content_length: Some(content_length_i64),
379 content_range,
380 last_modified: Some(last_modified),
381 metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
382 content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
383 content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
384 content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
385 content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
386 cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
387 expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
388 website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
389 e_tag: Some(ETag::Strong(md5_sum)),
390 checksum_crc32: checksum.checksum_crc32,
391 checksum_crc32c: checksum.checksum_crc32c,
392 checksum_sha1: checksum.checksum_sha1,
393 checksum_sha256: checksum.checksum_sha256,
394 checksum_crc64nvme: checksum.checksum_crc64nvme,
395 ..Default::default()
396 };
397 Ok(S3Response::new(output))
398 }
399
400 #[tracing::instrument]
401 async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
402 let input = req.input;
403 let path = self.get_bucket_path(&input.bucket)?;
404
405 if !path.exists() {
406 return Err(s3_error!(NoSuchBucket));
407 }
408
409 Ok(S3Response::new(HeadBucketOutput::default()))
410 }
411
412 #[tracing::instrument]
413 async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
414 let input = req.input;
415 let path = self.get_object_path(&input.bucket, &input.key)?;
416
417 if !path.exists() {
418 if self.get_bucket_path(&input.bucket)?.exists().not() {
419 return Err(s3_error!(NoSuchBucket));
420 }
421 return Err(s3_error!(NoSuchKey));
422 }
423
424 let file_metadata = try_!(fs::metadata(path).await);
425 let last_modified = Timestamp::from(try_!(file_metadata.modified()));
426 let file_len = file_metadata.len();
427
428 let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;
429
430 let info = self.load_internal_info(&input.bucket, &input.key).await?;
431
432 let md5_sum = match info.as_ref().and_then(crate::checksum::load_e_tag) {
433 Some(e_tag) => e_tag,
434 None => self.get_md5_sum(&input.bucket, &input.key).await?,
435 };
436
437 let checksum = match &info {
438 Some(info) => crate::checksum::from_internal_info(info),
439 _ => default(),
440 };
441
442 #[allow(clippy::redundant_closure_for_method_calls)]
443 let output = HeadObjectOutput {
444 content_length: Some(try_!(i64::try_from(file_len))),
445 content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
446 content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
447 content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
448 content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
449 cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
450 expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
451 website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
452 last_modified: Some(last_modified),
453 metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
454 e_tag: Some(ETag::Strong(md5_sum)),
455 checksum_crc32: checksum.checksum_crc32,
456 checksum_crc32c: checksum.checksum_crc32c,
457 checksum_sha1: checksum.checksum_sha1,
458 checksum_sha256: checksum.checksum_sha256,
459 checksum_crc64nvme: checksum.checksum_crc64nvme,
460 ..Default::default()
461 };
462 Ok(S3Response::new(output))
463 }
464
465 #[tracing::instrument]
466 async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
467 let mut buckets: Vec<Bucket> = Vec::new();
468 let mut iter = try_!(fs::read_dir(&self.root).await);
469 while let Some(entry) = try_!(iter.next_entry().await) {
470 let file_type = try_!(entry.file_type().await);
471 if file_type.is_dir().not() {
472 continue;
473 }
474
475 let file_name = entry.file_name();
476 let Some(name) = file_name.to_str() else { continue };
477 if s3s::path::check_bucket_name(name).not() {
478 continue;
479 }
480
481 let file_meta = try_!(entry.metadata().await);
482 let created_or_modified_date = Timestamp::from(try_!(file_meta.created().or(file_meta.modified())));
486
487 let bucket = Bucket {
488 creation_date: Some(created_or_modified_date),
489 name: Some(name.to_owned()),
490 bucket_region: None,
491 };
492 buckets.push(bucket);
493 }
494
495 let output = ListBucketsOutput {
496 buckets: Some(buckets),
497 owner: None,
498 ..Default::default()
499 };
500 Ok(S3Response::new(output))
501 }
502
503 #[tracing::instrument]
504 async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
505 let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;
506
507 Ok(v2_resp.map_output(|v2| ListObjectsOutput {
508 contents: v2.contents,
509 common_prefixes: v2.common_prefixes,
510 delimiter: v2.delimiter,
511 encoding_type: v2.encoding_type,
512 name: v2.name,
513 prefix: v2.prefix,
514 max_keys: v2.max_keys,
515 is_truncated: v2.is_truncated,
516 next_marker: v2.next_continuation_token,
517 ..Default::default()
518 }))
519 }
520
    /// Lists the objects (and, when a delimiter is given, the common prefixes) of a bucket.
    ///
    /// All matching entries are collected first, then filtered by the effective start marker
    /// and merged in ascending key order until `max_keys` entries have been emitted.
    /// Returns `NoSuchBucket` when the bucket path does not exist.
    #[tracing::instrument]
    async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if path.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        let delimiter = input.delimiter.as_deref();
        // Leading slashes in the prefix are ignored.
        let prefix = input.prefix.as_deref().unwrap_or("").trim_start_matches('/');
        let max_keys = input.max_keys.unwrap_or(1000);

        let mut objects: Vec<Object> = default();
        // A `BTreeSet` keeps the common prefixes unique and sorted.
        let mut common_prefixes = std::collections::BTreeSet::new();

        if let Some(delimiter) = delimiter {
            self.list_objects_with_delimiter(&path, prefix, delimiter, &mut objects, &mut common_prefixes)
                .await?;
        } else {
            self.list_objects_recursive(&path, prefix, &mut objects).await?;
        }

        // Order objects by key; a missing key sorts as the empty string.
        objects.sort_by(|lhs, rhs| {
            let lhs_key = lhs.key.as_deref().unwrap_or("");
            let rhs_key = rhs.key.as_deref().unwrap_or("");
            lhs_key.cmp(rhs_key)
        });

        // The effective marker is the greater of the continuation token and `start_after`.
        let start_after = match (input.continuation_token.as_deref(), input.start_after.as_deref()) {
            (Some(ct), Some(sa)) => Some(if ct >= sa { ct } else { sa }),
            (Some(ct), None) => Some(ct),
            (None, Some(sa)) => Some(sa),
            (None, None) => None,
        };

        // Keep only entries strictly after the marker.
        if let Some(marker) = start_after {
            objects.retain(|obj| obj.key.as_deref().unwrap_or("") > marker);
            common_prefixes.retain(|cp| cp.as_str() > marker);
        }

        let common_prefixes_list: Vec<CommonPrefix> = common_prefixes
            .into_iter()
            .map(|prefix| CommonPrefix { prefix: Some(prefix) })
            .collect();

        let mut result_objects = Vec::new();
        let mut result_prefixes = Vec::new();
        let mut total_count = 0;
        // A `max_keys` value that does not fit in `usize` (e.g. a negative one) falls back to 1000.
        let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);
        let mut last_key: Option<String> = None;

        let mut obj_idx = 0;
        let mut prefix_idx = 0;

        // Two-way merge of the sorted objects and sorted prefixes, capped at `max_keys_usize`
        // entries. `last_key` tracks the most recently emitted key or prefix.
        while total_count < max_keys_usize {
            let obj_key = objects.get(obj_idx).and_then(|o| o.key.as_deref());
            let prefix_key = common_prefixes_list.get(prefix_idx).and_then(|p| p.prefix.as_deref());

            match (obj_key, prefix_key) {
                (Some(ok), Some(pk)) => {
                    // On a tie the prefix is emitted first.
                    if ok < pk {
                        last_key = Some(ok.to_owned());
                        result_objects.push(objects[obj_idx].clone());
                        obj_idx += 1;
                    } else {
                        last_key = Some(pk.to_owned());
                        result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                        prefix_idx += 1;
                    }
                    total_count += 1;
                }
                (Some(ok), None) => {
                    last_key = Some(ok.to_owned());
                    result_objects.push(objects[obj_idx].clone());
                    obj_idx += 1;
                    total_count += 1;
                }
                (None, Some(pk)) => {
                    last_key = Some(pk.to_owned());
                    result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                    prefix_idx += 1;
                    total_count += 1;
                }
                (None, None) => break,
            }
        }

        // Truncated when entries remain unconsumed; never reported as truncated when the
        // limit is zero.
        let is_truncated = max_keys_usize > 0 && (obj_idx < objects.len() || prefix_idx < common_prefixes_list.len());
        let key_count = try_!(i32::try_from(total_count));
        // The continuation token is the last emitted key, so the next page resumes strictly
        // after it; if nothing was emitted, the smallest remaining key is used instead.
        let next_continuation_token = if is_truncated {
            last_key.or_else(|| {
                let obj_key = objects.get(obj_idx).and_then(|o| o.key.clone());
                let prefix_key = common_prefixes_list.get(prefix_idx).and_then(|p| p.prefix.clone());
                match (obj_key, prefix_key) {
                    (Some(ok), Some(pk)) => Some(if ok < pk { ok } else { pk }),
                    (Some(ok), None) => Some(ok),
                    (None, Some(pk)) => Some(pk),
                    (None, None) => None,
                }
            })
        } else {
            None
        };

        // Empty lists are reported as `None` rather than as empty vectors.
        let contents = result_objects.is_empty().not().then_some(result_objects);
        let common_prefixes = result_prefixes.is_empty().not().then_some(result_prefixes);

        let output = ListObjectsV2Output {
            key_count: Some(key_count),
            max_keys: Some(max_keys),
            is_truncated: Some(is_truncated),
            contents,
            common_prefixes,
            continuation_token: input.continuation_token,
            next_continuation_token,
            delimiter: input.delimiter,
            encoding_type: input.encoding_type,
            name: Some(input.bucket),
            prefix: input.prefix,
            start_after: input.start_after,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }
651
    /// Stores an object (or creates a directory for keys ending in `/`), verifying the
    /// client-supplied MD5 and checksums against what was actually received.
    ///
    /// # Errors
    /// * `InvalidStorageClass` – a storage class other than `STANDARD` or
    ///   `REDUCED_REDUNDANCY` was requested.
    /// * `IncompleteBody` – the request has no body.
    /// * `PreconditionFailed` – an `if_match` / `if_none_match` condition is not satisfied.
    /// * `NotImplemented` – the requested checksum algorithm is not one of the handled ones.
    /// * `UnexpectedContent` – a directory key was sent with a positive content length.
    /// * `InvalidArgument` – `content_md5` is not valid base64, or a trailing checksum header
    ///   is not valid text.
    /// * `BadDigest` – the MD5 or one of the checksums does not match.
    #[tracing::instrument]
    async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
        use crate::fs::ObjectAttributes;

        let mut input = req.input;
        if let Some(ref storage_class) = input.storage_class {
            let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
            if !is_valid {
                return Err(s3_error!(InvalidStorageClass));
            }
        }

        // Move the needed fields out of `input`; the checksum fields left behind are still
        // read (and overwritten from the trailers) further down.
        let PutObjectInput {
            body,
            bucket,
            key,
            metadata,
            content_length,
            content_md5,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires,
            website_redirect_location,
            if_match,
            if_none_match,
            ..
        } = input;

        let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };

        let object_path = self.get_object_path(&bucket, &key)?;
        // `if_none_match` with the "any" condition: refuse to overwrite an existing object.
        if let Some(ref condition) = if_none_match
            && condition.is_any()
            && object_path.exists()
        {
            return Err(s3_error!(PreconditionFailed, "Object already exists"));
        }
        // `if_match`: the object must exist and, for a concrete ETag, match it strongly.
        if let Some(ref condition) = if_match {
            if !object_path.exists() {
                return Err(s3_error!(PreconditionFailed, "Object does not exist"));
            }
            if let ETagCondition::ETag(expected) = condition {
                let info = self.load_internal_info(&bucket, &key).await?;
                let etag_value = match info.as_ref().and_then(crate::checksum::load_e_tag) {
                    Some(v) => v,
                    None => self.get_md5_sum(&bucket, &key).await?,
                };
                if !ETag::Strong(etag_value).strong_cmp(expected) {
                    return Err(s3_error!(PreconditionFailed, "ETag does not match"));
                }
            }
        }

        // Enable a hasher for every checksum the client supplied a value for, plus the one
        // named by `checksum_algorithm`.
        let mut checksum: s3s::checksum::ChecksumHasher = default();
        if input.checksum_crc32.is_some() {
            checksum.crc32 = Some(default());
        }
        if input.checksum_crc32c.is_some() {
            checksum.crc32c = Some(default());
        }
        if input.checksum_sha1.is_some() {
            checksum.sha1 = Some(default());
        }
        if input.checksum_sha256.is_some() {
            checksum.sha256 = Some(default());
        }
        if input.checksum_crc64nvme.is_some() {
            checksum.crc64nvme = Some(default());
        }
        if let Some(alg) = input.checksum_algorithm {
            match alg.as_str() {
                ChecksumAlgorithm::CRC32 => checksum.crc32 = Some(default()),
                ChecksumAlgorithm::CRC32C => checksum.crc32c = Some(default()),
                ChecksumAlgorithm::SHA1 => checksum.sha1 = Some(default()),
                ChecksumAlgorithm::SHA256 => checksum.sha256 = Some(default()),
                ChecksumAlgorithm::CRC64NVME => checksum.crc64nvme = Some(default()),
                _ => return Err(s3_error!(NotImplemented, "Unsupported checksum algorithm")),
            }
        }

        // Keys ending in `/` become directories and must not carry a body.
        if key.ends_with('/') {
            if let Some(len) = content_length
                && len > 0
            {
                return Err(s3_error!(UnexpectedContent, "Unexpected request body when creating a directory object."));
            }
            try_!(fs::create_dir_all(&object_path).await);
            let output = PutObjectOutput::default();
            return Ok(S3Response::new(output));
        }

        let mut file_writer = self.prepare_file_write(&object_path).await?;

        // Hash the body while it is streamed to the writer.
        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| {
            md5_hash.update(bytes.as_ref());
            checksum.update(bytes.as_ref());
        });

        let size = copy_bytes(stream, file_writer.writer()).await?;
        // NOTE(review): every digest check below runs after `done()`. If `done()` is what
        // commits the file, a rejected upload may leave its data in place — confirm.
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        // `content_md5` arrives base64-encoded; compare it in hex form.
        if let Some(content_md5) = content_md5 {
            let content_md5 = base64_simd::STANDARD
                .decode_to_vec(content_md5)
                .map_err(|_| s3_error!(InvalidArgument))?;
            let content_md5 = hex(content_md5);
            if content_md5 != md5_sum {
                return Err(s3_error!(BadDigest, "content_md5 mismatch"));
            }
        }

        let checksum = checksum.finalize();

        // Checksums delivered as trailing headers replace the values from the request input.
        if let Some(trailers) = req.trailing_headers
            && let Some(trailers) = trailers.take()
        {
            if let Some(crc32) = trailers.get("x-amz-checksum-crc32") {
                input.checksum_crc32 = Some(crc32.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
            }
            if let Some(crc32c) = trailers.get("x-amz-checksum-crc32c") {
                input.checksum_crc32c = Some(crc32c.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
            }
            if let Some(sha1) = trailers.get("x-amz-checksum-sha1") {
                input.checksum_sha1 = Some(sha1.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
            }
            if let Some(sha256) = trailers.get("x-amz-checksum-sha256") {
                input.checksum_sha256 = Some(sha256.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
            }
            if let Some(crc64nvme) = trailers.get("x-amz-checksum-crc64nvme") {
                input.checksum_crc64nvme = Some(crc64nvme.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
            }
        }

        // Each computed checksum must equal the expected one, `None` included.
        if checksum.checksum_crc32 != input.checksum_crc32 {
            return Err(s3_error!(
                BadDigest,
                "checksum_crc32 mismatch: expected `{}`, got `{}`",
                input.checksum_crc32.unwrap_or_default(),
                checksum.checksum_crc32.unwrap_or_default()
            ));
        }
        if checksum.checksum_crc32c != input.checksum_crc32c {
            return Err(s3_error!(BadDigest, "checksum_crc32c mismatch"));
        }
        if checksum.checksum_sha1 != input.checksum_sha1 {
            return Err(s3_error!(BadDigest, "checksum_sha1 mismatch"));
        }
        if checksum.checksum_sha256 != input.checksum_sha256 {
            return Err(s3_error!(BadDigest, "checksum_sha256 mismatch"));
        }
        if checksum.checksum_crc64nvme != input.checksum_crc64nvme {
            return Err(s3_error!(BadDigest, "checksum_crc64nvme mismatch"));
        }

        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");

        // Persist the object's attributes as supplied in the request.
        let mut obj_attrs = ObjectAttributes {
            user_metadata: metadata,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires: None,
            website_redirect_location,
        };
        obj_attrs.set_expires_timestamp(expires);
        self.save_object_attributes(&bucket, &key, &obj_attrs, None).await?;

        // Persist the ETag and checksums in the object's internal info.
        let mut info: InternalInfo = default();
        crate::checksum::save_e_tag(&mut info, &md5_sum);
        crate::checksum::modify_internal_info(&mut info, &checksum);
        self.save_internal_info(&bucket, &key, &info).await?;

        let output = PutObjectOutput {
            e_tag: Some(ETag::Strong(md5_sum)),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            checksum_crc64nvme: checksum.checksum_crc64nvme,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }
847
848 #[tracing::instrument]
849 async fn create_multipart_upload(
850 &self,
851 req: S3Request<CreateMultipartUploadInput>,
852 ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
853 use crate::fs::ObjectAttributes;
854
855 let input = req.input;
856 let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;
857
858 let mut obj_attrs = ObjectAttributes {
860 user_metadata: input.metadata,
861 content_encoding: input.content_encoding,
862 content_type: input.content_type,
863 content_disposition: input.content_disposition,
864 content_language: input.content_language,
865 cache_control: input.cache_control,
866 expires: None,
867 website_redirect_location: input.website_redirect_location,
868 };
869 obj_attrs.set_expires_timestamp(input.expires);
870 self.save_object_attributes(&input.bucket, &input.key, &obj_attrs, Some(upload_id))
871 .await?;
872
873 let output = CreateMultipartUploadOutput {
874 bucket: Some(input.bucket),
875 key: Some(input.key),
876 upload_id: Some(upload_id.to_string()),
877 ..Default::default()
878 };
879
880 Ok(S3Response::new(output))
881 }
882
883 #[tracing::instrument]
884 async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
885 let UploadPartInput {
886 body,
887 upload_id,
888 part_number,
889 ..
890 } = req.input;
891
892 if part_number > 10_000 {
893 return Err(s3_error!(
894 InvalidArgument,
895 "Part number must be an integer between 1 and 10000, inclusive"
896 ));
897 }
898
899 let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;
900
901 let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
902 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
903 return Err(s3_error!(AccessDenied));
904 }
905
906 let file_path = self.resolve_upload_part_path(upload_id, part_number)?;
907
908 let mut md5_hash = Md5::new();
909 let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));
910
911 let mut file_writer = self.prepare_file_write(&file_path).await?;
912 let size = copy_bytes(stream, file_writer.writer()).await?;
913 file_writer.done().await?;
914
915 let md5_sum = hex(md5_hash.finalize());
916
917 debug!(path = %file_path.display(), ?size, %md5_sum, "write file");
918
919 let output = UploadPartOutput {
920 e_tag: Some(ETag::Strong(md5_sum)),
921 ..Default::default()
922 };
923 Ok(S3Response::new(output))
924 }
925
926 #[tracing::instrument]
927 async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
928 let input = req.input;
929
930 let upload_id = Uuid::parse_str(&input.upload_id).map_err(|_| s3_error!(InvalidRequest))?;
931 let part_number = input.part_number;
932 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
933 return Err(s3_error!(AccessDenied));
934 }
935
936 let (src_bucket, src_key) = match input.copy_source {
937 CopySource::AccessPoint { .. } | CopySource::Outpost { .. } => return Err(s3_error!(NotImplemented)),
938 CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
939 };
940 let src_path = self.get_object_path(src_bucket, src_key)?;
941 let dst_path = self.resolve_upload_part_path(upload_id, part_number)?;
942
943 let mut src_file = fs::File::open(&src_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
944 let file_len = try_!(src_file.metadata().await).len();
945
946 let (start, content_length) = if let Some(copy_range) = &input.copy_source_range {
947 if !copy_range.starts_with("bytes=") {
948 return Err(s3_error!(InvalidArgument));
949 }
950 let range = ©_range["bytes=".len()..];
951 let parts: Vec<&str> = range.split('-').collect();
952 if parts.len() != 2 {
953 return Err(s3_error!(InvalidArgument));
954 }
955
956 let start: u64 = parts[0].parse().map_err(|_| s3_error!(InvalidArgument))?;
957 let end_inclusive = if parts[1].is_empty() {
958 file_len.saturating_sub(1)
959 } else {
960 parts[1].parse().map_err(|_| s3_error!(InvalidArgument))?
961 };
962 if start > end_inclusive || start >= file_len || end_inclusive >= file_len {
963 return Err(s3_error!(InvalidRange));
964 }
965 let content_length = end_inclusive - start + 1;
966 (start, content_length)
967 } else {
968 (0, file_len)
969 };
970 let content_length_usize = try_!(usize::try_from(content_length));
971
972 let _ = try_!(src_file.seek(io::SeekFrom::Start(start)).await);
973 let body = StreamingBlob::wrap(bytes_stream(ReaderStream::with_capacity(src_file, 4096), content_length_usize));
974
975 let mut md5_hash = Md5::new();
976 let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));
977
978 let mut file_writer = self.prepare_file_write(&dst_path).await?;
979 let size = copy_bytes(stream, file_writer.writer()).await?;
980 file_writer.done().await?;
981
982 let md5_sum = hex(md5_hash.finalize());
983
984 debug!(path = %dst_path.display(), ?size, %md5_sum, "write file");
985
986 let output = UploadPartCopyOutput {
987 copy_part_result: Some(CopyPartResult {
988 e_tag: Some(ETag::Strong(md5_sum)),
989 ..Default::default()
990 }),
991 ..Default::default()
992 };
993
994 Ok(S3Response::new(output))
995 }
996
997 #[tracing::instrument]
998 async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
999 let ListPartsInput {
1000 bucket, key, upload_id, ..
1001 } = req.input;
1002
1003 let mut parts: Vec<Part> = Vec::new();
1004 let mut iter = try_!(fs::read_dir(&self.root).await);
1005
1006 let prefix = format!(".upload_id-{upload_id}");
1007
1008 while let Some(entry) = try_!(iter.next_entry().await) {
1009 let file_type = try_!(entry.file_type().await);
1010 if file_type.is_file().not() {
1011 continue;
1012 }
1013
1014 let file_name = entry.file_name();
1015 let Some(name) = file_name.to_str() else { continue };
1016
1017 let Some(part_segment) = name.strip_prefix(&prefix) else { continue };
1018 let Some(part_number) = part_segment.strip_prefix(".part-") else { continue };
1019 let part_number = part_number.parse::<i32>().unwrap();
1020
1021 let file_meta = try_!(entry.metadata().await);
1022 let last_modified = Timestamp::from(try_!(file_meta.modified()));
1023 let size = try_!(i64::try_from(file_meta.len()));
1024
1025 let part = Part {
1026 last_modified: Some(last_modified),
1027 part_number: Some(part_number),
1028 size: Some(size),
1029 ..Default::default()
1030 };
1031 parts.push(part);
1032 }
1033
1034 let output = ListPartsOutput {
1035 bucket: Some(bucket),
1036 key: Some(key),
1037 upload_id: Some(upload_id),
1038 parts: Some(parts),
1039 ..Default::default()
1040 };
1041 Ok(S3Response::new(output))
1042 }
1043
1044 #[tracing::instrument]
1045 async fn complete_multipart_upload(
1046 &self,
1047 req: S3Request<CompleteMultipartUploadInput>,
1048 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
1049 let CompleteMultipartUploadInput {
1050 multipart_upload,
1051 bucket,
1052 key,
1053 upload_id,
1054 if_match,
1055 if_none_match,
1056 ..
1057 } = req.input;
1058
1059 let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };
1060
1061 let parts_count = multipart_upload.parts.as_ref().map_or(0, Vec::len);
1062 if parts_count == 0 {
1063 return Err(s3_error!(InvalidPart, "You must specify at least one part"));
1064 }
1065
1066 let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
1067 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
1068 return Err(s3_error!(AccessDenied));
1069 }
1070
1071 let object_path = self.get_object_path(&bucket, &key)?;
1073 if let Some(ref condition) = if_none_match
1074 && condition.is_any()
1075 && object_path.exists()
1076 {
1077 return Err(s3_error!(PreconditionFailed, "Object already exists"));
1078 }
1079 if let Some(ref condition) = if_match {
1080 if condition.is_any() {
1081 if !object_path.exists() {
1083 return Err(s3_error!(PreconditionFailed, "Object does not exist"));
1084 }
1085 } else if let Some(expected_etag) = condition.as_etag() {
1086 if object_path.exists() {
1087 let info = self.load_internal_info(&bucket, &key).await?;
1088 let etag_value = match info.as_ref().and_then(crate::checksum::load_e_tag) {
1089 Some(e_tag) => e_tag,
1090 None => self.get_md5_sum(&bucket, &key).await?,
1091 };
1092 let existing_etag = ETag::Strong(etag_value);
1093 if !expected_etag.strong_cmp(&existing_etag) {
1094 return Err(s3_error!(PreconditionFailed, "ETag does not match"));
1095 }
1096 } else {
1097 return Err(s3_error!(PreconditionFailed, "Object does not exist"));
1098 }
1099 }
1100 }
1101
1102 self.delete_upload_id(&upload_id).await?;
1103
1104 if let Ok(Some(attrs)) = self.load_object_attributes(&bucket, &key, Some(upload_id)).await {
1105 self.save_object_attributes(&bucket, &key, &attrs, None).await?;
1106 let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
1107 }
1108
1109 let mut file_writer = self.prepare_file_write(&object_path).await?;
1110
1111 let mut cnt: i32 = 0;
1112 let total_parts_cnt = i32::try_from(parts_count).expect("total number of parts must be <= 10000.");
1113
1114 let mut part_md5_hashes: Vec<[u8; 16]> = Vec::new();
1115 let mut buf = vec![0u8; 65536];
1116
1117 for part in multipart_upload.parts.into_iter().flatten() {
1118 let part_number = part
1119 .part_number
1120 .ok_or_else(|| s3_error!(InvalidRequest, "missing part number"))?;
1121 cnt += 1;
1122 if part_number != cnt {
1123 return Err(s3_error!(InvalidRequest, "invalid part order"));
1124 }
1125
1126 let part_path = self.resolve_upload_part_path(upload_id, part_number)?;
1127
1128 let mut reader = try_!(fs::File::open(&part_path).await);
1129 let mut part_md5 = Md5::new();
1130 let mut size: u64 = 0;
1131 loop {
1132 let nread = try_!(reader.read(&mut buf).await);
1133 if nread == 0 {
1134 break;
1135 }
1136 part_md5.update(&buf[..nread]);
1137 try_!(file_writer.writer().write_all(&buf[..nread]).await);
1138 size += nread as u64;
1139 }
1140 try_!(file_writer.writer().flush().await);
1141 part_md5_hashes.push(part_md5.finalize());
1142
1143 if part_number != total_parts_cnt && size < 5 * 1024 * 1024 {
1144 return Err(s3_error!(EntityTooSmall));
1145 }
1146
1147 debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.dest_path().display(), ?size, "write file");
1148 try_!(fs::remove_file(&part_path).await);
1149 }
1150 file_writer.done().await?;
1151
1152 let mut etag_hash = Md5::new();
1154 for hash in &part_md5_hashes {
1155 etag_hash.update(hash);
1156 }
1157 let e_tag = format!("{}-{}", hex(etag_hash.finalize()), part_md5_hashes.len());
1158
1159 debug!(?e_tag, path = %object_path.display(), "multipart etag");
1160
1161 {
1162 let mut info = self.load_internal_info(&bucket, &key).await?.unwrap_or_default();
1163 crate::checksum::save_e_tag(&mut info, &e_tag);
1164 self.save_internal_info(&bucket, &key, &info).await?;
1165 }
1166
1167 let output = CompleteMultipartUploadOutput {
1168 future: Some(Box::pin(async move {
1170 Ok(CompleteMultipartUploadOutput {
1171 bucket: Some(bucket),
1172 key: Some(key),
1173 e_tag: Some(ETag::Strong(e_tag)),
1174 ..Default::default()
1175 })
1176 })),
1177 ..Default::default()
1178 };
1179
1180 debug!(?output);
1181
1182 Ok(S3Response::new(output))
1183 }
1184
1185 #[tracing::instrument]
1186 async fn abort_multipart_upload(
1187 &self,
1188 req: S3Request<AbortMultipartUploadInput>,
1189 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
1190 let AbortMultipartUploadInput {
1191 bucket, key, upload_id, ..
1192 } = req.input;
1193
1194 let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
1195 if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
1196 return Err(s3_error!(AccessDenied));
1197 }
1198
1199 let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
1200
1201 let prefix = format!(".upload_id-{upload_id}");
1202 let mut iter = try_!(fs::read_dir(&self.root).await);
1203 while let Some(entry) = try_!(iter.next_entry().await) {
1204 let file_type = try_!(entry.file_type().await);
1205 if file_type.is_file().not() {
1206 continue;
1207 }
1208
1209 let file_name = entry.file_name();
1210 let Some(name) = file_name.to_str() else { continue };
1211
1212 if name.starts_with(&prefix) {
1213 try_!(fs::remove_file(entry.path()).await);
1214 }
1215 }
1216
1217 self.delete_upload_id(&upload_id).await?;
1218
1219 debug!(bucket = %bucket, key = %key, upload_id = %upload_id, "multipart upload aborted");
1220
1221 Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
1222 }
1223}
1224
1225impl FileSystem {
1226 async fn list_objects_recursive(&self, bucket_root: &Path, prefix: &str, objects: &mut Vec<Object>) -> S3Result<()> {
1227 let mut dir_queue: VecDeque<PathBuf> = default();
1228 dir_queue.push_back(bucket_root.to_owned());
1229 let prefix_is_empty = prefix.is_empty();
1230
1231 while let Some(dir) = dir_queue.pop_front() {
1232 let mut iter = try_!(fs::read_dir(dir).await);
1233 while let Some(entry) = try_!(iter.next_entry().await) {
1234 let file_type = try_!(entry.file_type().await);
1235 if file_type.is_dir() {
1236 dir_queue.push_back(entry.path());
1237 } else {
1238 let file_path = entry.path();
1239 let key = try_!(file_path.strip_prefix(bucket_root));
1240 let Some(key_str) = normalize_path(key, "/") else {
1241 continue;
1242 };
1243
1244 if !prefix_is_empty && !key_str.starts_with(prefix) {
1245 continue;
1246 }
1247
1248 let metadata = try_!(entry.metadata().await);
1249 let last_modified = Timestamp::from(try_!(metadata.modified()));
1250 let size = metadata.len();
1251
1252 let object = Object {
1253 key: Some(key_str),
1254 last_modified: Some(last_modified),
1255 size: Some(try_!(i64::try_from(size))),
1256 ..Default::default()
1257 };
1258 objects.push(object);
1259 }
1260 }
1261 }
1262
1263 Ok(())
1264 }
1265
1266 async fn list_objects_with_delimiter(
1267 &self,
1268 bucket_root: &Path,
1269 prefix: &str,
1270 delimiter: &str,
1271 objects: &mut Vec<Object>,
1272 common_prefixes: &mut std::collections::BTreeSet<String>,
1273 ) -> S3Result<()> {
1274 let mut dir_queue: VecDeque<PathBuf> = default();
1277 dir_queue.push_back(bucket_root.to_owned());
1278 let prefix_is_empty = prefix.is_empty();
1279
1280 while let Some(dir) = dir_queue.pop_front() {
1281 let mut iter = try_!(fs::read_dir(dir).await);
1282
1283 while let Some(entry) = try_!(iter.next_entry().await) {
1284 let file_type = try_!(entry.file_type().await);
1285 let entry_path = entry.path();
1286
1287 let key = try_!(entry_path.strip_prefix(bucket_root));
1289 let Some(key_str) = normalize_path(key, "/") else {
1290 continue;
1291 };
1292
1293 if !prefix_is_empty && !key_str.starts_with(prefix) {
1295 if file_type.is_dir() && !prefix.starts_with(&key_str) && !key_str.starts_with(prefix) {
1297 continue;
1298 }
1299 if file_type.is_file() {
1300 continue;
1301 }
1302 }
1303
1304 if file_type.is_dir() {
1305 dir_queue.push_back(entry_path);
1307 } else {
1308 let remaining = &key_str[prefix.len()..];
1310
1311 if remaining.contains(delimiter) {
1312 if let Some(delimiter_pos) = remaining.find(delimiter) {
1314 let mut next_prefix = String::with_capacity(prefix.len() + delimiter_pos + 1);
1315 next_prefix.push_str(prefix);
1316 next_prefix.push_str(&remaining[..=delimiter_pos]);
1317 common_prefixes.insert(next_prefix);
1318 }
1319 } else {
1320 let metadata = try_!(entry.metadata().await);
1322 let last_modified = Timestamp::from(try_!(metadata.modified()));
1323 let size = metadata.len();
1324
1325 let object = Object {
1326 key: Some(key_str),
1327 last_modified: Some(last_modified),
1328 size: Some(try_!(i64::try_from(size))),
1329 ..Default::default()
1330 };
1331 objects.push(object);
1332 }
1333 }
1334 }
1335 }
1336
1337 Ok(())
1338 }
1339}