use crate::fs::FileSystem;
use crate::fs::InternalInfo;
use crate::utils::*;

use s3s::S3;
use s3s::S3Result;
use s3s::crypto::Checksum;
use s3s::crypto::Md5;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};

use std::collections::VecDeque;
use std::io;
use std::ops::Neg;
use std::ops::Not;
use std::path::Component;
use std::path::{Path, PathBuf};

use tokio::fs;
use tokio::io::AsyncSeekExt;
use tokio_util::io::ReaderStream;

use futures::TryStreamExt;
use numeric_cast::NumericCast;
use stdx::default::default;
use tracing::debug;
use uuid::Uuid;

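/// Converts a relative file path into an S3 key joined by `delimiter`.
///
/// Returns `None` for paths containing root, `.`, `..`, or prefix components,
/// or for component names that are not valid UTF-8.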
fn normalize_path(path: &Path, delimiter: &str) -> Option<String> {
    let mut normalized = String::new();
    let mut first = true;
    for component in path.components() {
        match component {
            Component::RootDir | Component::CurDir | Component::ParentDir | Component::Prefix(_) => {
                return None;
            }
            Component::Normal(name) => {
                let name = name.to_str()?;
                if !first {
                    normalized.push_str(delimiter);
                }
                normalized.push_str(name);
                first = false;
            }
        }
    }
    Some(normalized)
}

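/// Formats an HTTP `Content-Range` header value, e.g. `bytes 0-99/1234`.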
fn fmt_content_range(start: u64, end_inclusive: u64, size: u64) -> String {
    format!("bytes {start}-{end_inclusive}/{size}")
}

#[async_trait::async_trait]
impl S3 for FileSystem {
    #[tracing::instrument]
    async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if path.exists() {
            return Err(s3_error!(BucketAlreadyExists));
        }

        try_!(fs::create_dir(&path).await);

        let output = CreateBucketOutput::default();
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
        let input = req.input;
        let (bucket, key) = match input.copy_source {
            CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
        };

        let src_path = self.get_object_path(bucket, key)?;
        let dst_path = self.get_object_path(&input.bucket, &input.key)?;

        if src_path.exists().not() {
            return Err(s3_error!(NoSuchKey));
        }

        if self.get_bucket_path(&input.bucket)?.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        if let Some(dir_path) = dst_path.parent() {
            try_!(fs::create_dir_all(&dir_path).await);
        }

        let file_metadata = try_!(fs::metadata(&src_path).await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));

        let _ = try_!(fs::copy(&src_path, &dst_path).await);

        debug!(from = %src_path.display(), to = %dst_path.display(), "copy file");

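        // Copy the sidecar metadata file alongside the object, if one exists.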
        let src_metadata_path = self.get_metadata_path(bucket, key, None)?;
        if src_metadata_path.exists() {
            let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key, None)?;
            let _ = try_!(fs::copy(src_metadata_path, dst_metadata_path).await);
        }

        let md5_sum = self.get_md5_sum(bucket, key).await?;

        let copy_object_result = CopyObjectResult {
            e_tag: Some(ETag::Strong(md5_sum)),
            last_modified: Some(last_modified),
            ..Default::default()
        };

        let output = CopyObjectOutput {
            copy_object_result: Some(copy_object_result),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;
        if path.exists() {
            try_!(fs::remove_dir_all(path).await);
        } else {
            return Err(s3_error!(NoSuchBucket));
        }
        Ok(S3Response::new(DeleteBucketOutput {}))
    }

    #[tracing::instrument]
    async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
        let input = req.input;
        let path = self.get_object_path(&input.bucket, &input.key)?;
        if path.exists().not() {
            return Err(s3_error!(NoSuchKey));
        }
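        // A trailing slash denotes a directory object: remove it only when empty.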
        if input.key.ends_with('/') {
            let mut dir = try_!(fs::read_dir(&path).await);
            let is_empty = try_!(dir.next_entry().await).is_none();
            if is_empty {
                try_!(fs::remove_dir(&path).await);
            }
        } else {
            try_!(fs::remove_file(&path).await);
        }
        let output = DeleteObjectOutput::default();
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
        let input = req.input;
        let mut objects: Vec<(PathBuf, String)> = Vec::new();
        for object in input.delete.objects {
            let path = self.get_object_path(&input.bucket, &object.key)?;
            if path.exists() {
                objects.push((path, object.key));
            }
        }

        let mut deleted_objects: Vec<DeletedObject> = Vec::new();
        for (path, key) in objects {
            try_!(fs::remove_file(path).await);

            let deleted_object = DeletedObject {
                key: Some(key),
                ..Default::default()
            };

            deleted_objects.push(deleted_object);
        }

        let output = DeleteObjectsOutput {
            deleted: Some(deleted_objects),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchBucket));
        }

        let output = GetBucketLocationOutput::default();
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
        let input = req.input;
        let object_path = self.get_object_path(&input.bucket, &input.key)?;

        let mut file = fs::File::open(&object_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;

        let file_metadata = try_!(file.metadata().await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let (content_length, content_range) = match input.range {
            None => (file_len, None),
            Some(range) => {
                let file_range = range.check(file_len)?;
                let content_length = file_range.end - file_range.start;
                let content_range = fmt_content_range(file_range.start, file_range.end - 1, file_len);
                (content_length, Some(content_range))
            }
        };
        let content_length_usize = try_!(usize::try_from(content_length));
        let content_length_i64 = try_!(i64::try_from(content_length));

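        // Position the file cursor: `Int` ranges seek forward from the start,
        // `Suffix` ranges seek backward from the end of the file.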
        match input.range {
            Some(Range::Int { first, .. }) => {
                try_!(file.seek(io::SeekFrom::Start(first)).await);
            }
            Some(Range::Suffix { length }) => {
                let neg_offset = length.numeric_cast::<i64>().neg();
                try_!(file.seek(io::SeekFrom::End(neg_offset)).await);
            }
            None => {}
        }

        let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);

        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;

        let md5_sum = self.get_md5_sum(&input.bucket, &input.key).await?;

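        // Stored checksums only describe the whole object, so they are
        // returned for full reads and omitted for ranged responses.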
        let info = self.load_internal_info(&input.bucket, &input.key).await?;
        let checksum = match &info {
            Some(info) if content_length == file_len => crate::checksum::from_internal_info(info),
            _ => default(),
        };

        #[allow(clippy::redundant_closure_for_method_calls)]
        let output = GetObjectOutput {
            body: Some(StreamingBlob::wrap(body)),
            content_length: Some(content_length_i64),
            content_range,
            last_modified: Some(last_modified),
            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
            e_tag: Some(ETag::Strong(md5_sum)),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            checksum_crc64nvme: checksum.checksum_crc64nvme,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchBucket));
        }

        Ok(S3Response::new(HeadBucketOutput::default()))
    }

    #[tracing::instrument]
    async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
        let input = req.input;
        let path = self.get_object_path(&input.bucket, &input.key)?;

        if !path.exists() {
            return Err(s3_error!(NoSuchKey));
        }

        let file_metadata = try_!(fs::metadata(path).await);
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;

        #[allow(clippy::redundant_closure_for_method_calls)]
        let output = HeadObjectOutput {
            content_length: Some(try_!(i64::try_from(file_len))),
            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
            last_modified: Some(last_modified),
            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
        let mut buckets: Vec<Bucket> = Vec::new();
        let mut iter = try_!(fs::read_dir(&self.root).await);
        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_dir().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };
            if s3s::path::check_bucket_name(name).not() {
                continue;
            }

            let file_meta = try_!(entry.metadata().await);
            let created_or_modified_date = Timestamp::from(try_!(file_meta.created().or(file_meta.modified())));

            let bucket = Bucket {
                creation_date: Some(created_or_modified_date),
                name: Some(name.to_owned()),
                bucket_region: None,
            };
            buckets.push(bucket);
        }

        let output = ListBucketsOutput {
            buckets: Some(buckets),
            owner: None,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
        let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;

        Ok(v2_resp.map_output(|v2| ListObjectsOutput {
            contents: v2.contents,
            common_prefixes: v2.common_prefixes,
            delimiter: v2.delimiter,
            encoding_type: v2.encoding_type,
            name: v2.name,
            prefix: v2.prefix,
            max_keys: v2.max_keys,
            is_truncated: v2.is_truncated,
            ..Default::default()
        }))
    }

    #[tracing::instrument]
    async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
        let input = req.input;
        let path = self.get_bucket_path(&input.bucket)?;

        if path.exists().not() {
            return Err(s3_error!(NoSuchBucket));
        }

        let delimiter = input.delimiter.as_deref();
        let prefix = input.prefix.as_deref().unwrap_or("").trim_start_matches('/');
        let max_keys = input.max_keys.unwrap_or(1000);

        let mut objects: Vec<Object> = default();
        let mut common_prefixes = std::collections::BTreeSet::new();

        if let Some(delimiter) = delimiter {
            self.list_objects_with_delimiter(&path, prefix, delimiter, &mut objects, &mut common_prefixes)
                .await?;
        } else {
            self.list_objects_recursive(&path, prefix, &mut objects).await?;
        }

        objects.sort_by(|lhs, rhs| {
            let lhs_key = lhs.key.as_deref().unwrap_or("");
            let rhs_key = rhs.key.as_deref().unwrap_or("");
            lhs_key.cmp(rhs_key)
        });

        if let Some(marker) = &input.start_after {
            objects.retain(|obj| obj.key.as_deref().unwrap_or("") > marker.as_str());
        }

        let common_prefixes_list: Vec<CommonPrefix> = common_prefixes
            .into_iter()
            .map(|prefix| CommonPrefix { prefix: Some(prefix) })
            .collect();

        let mut result_objects = Vec::new();
        let mut result_prefixes = Vec::new();
        let mut total_count = 0;
        let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);

        let mut obj_idx = 0;
        let mut prefix_idx = 0;

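        // Merge objects and common prefixes in lexicographic key order,
        // stopping once max_keys entries have been emitted.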
        while total_count < max_keys_usize {
            let obj_key = objects.get(obj_idx).and_then(|o| o.key.as_deref());
            let prefix_key = common_prefixes_list.get(prefix_idx).and_then(|p| p.prefix.as_deref());

            match (obj_key, prefix_key) {
                (Some(ok), Some(pk)) => {
                    if ok < pk {
                        result_objects.push(objects[obj_idx].clone());
                        obj_idx += 1;
                    } else {
                        result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                        prefix_idx += 1;
                    }
                    total_count += 1;
                }
                (Some(_), None) => {
                    result_objects.push(objects[obj_idx].clone());
                    obj_idx += 1;
                    total_count += 1;
                }
                (None, Some(_)) => {
                    result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                    prefix_idx += 1;
                    total_count += 1;
                }
                (None, None) => break,
            }
        }

        let is_truncated = obj_idx < objects.len() || prefix_idx < common_prefixes_list.len();
        let key_count = try_!(i32::try_from(total_count));

        let contents = result_objects.is_empty().not().then_some(result_objects);
        let common_prefixes = result_prefixes.is_empty().not().then_some(result_prefixes);

        let output = ListObjectsV2Output {
            key_count: Some(key_count),
            max_keys: Some(max_keys),
            is_truncated: Some(is_truncated),
            contents,
            common_prefixes,
            delimiter: input.delimiter,
            encoding_type: input.encoding_type,
            name: Some(input.bucket),
            prefix: input.prefix,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
        use crate::fs::ObjectAttributes;

        let mut input = req.input;
        if let Some(ref storage_class) = input.storage_class {
            let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
            if !is_valid {
                return Err(s3_error!(InvalidStorageClass));
            }
        }

        let PutObjectInput {
            body,
            bucket,
            key,
            metadata,
            content_length,
            content_md5,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires,
            website_redirect_location,
            if_none_match,
            ..
        } = input;

        let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };

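        // `If-None-Match: *` asks us to fail the write when the key already exists.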
        if let Some(ref condition) = if_none_match {
            if condition.is_any() {
                let object_path = self.get_object_path(&bucket, &key)?;
                if object_path.exists() {
                    return Err(s3_error!(PreconditionFailed, "Object already exists"));
                }
            }
        }

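        // Enable a hasher for every checksum the client declared, whether via
        // an explicit header value or the checksum-algorithm field.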
        let mut checksum: s3s::checksum::ChecksumHasher = default();
        if input.checksum_crc32.is_some() {
            checksum.crc32 = Some(default());
        }
        if input.checksum_crc32c.is_some() {
            checksum.crc32c = Some(default());
        }
        if input.checksum_sha1.is_some() {
            checksum.sha1 = Some(default());
        }
        if input.checksum_sha256.is_some() {
            checksum.sha256 = Some(default());
        }
        if input.checksum_crc64nvme.is_some() {
            checksum.crc64nvme = Some(default());
        }
        if let Some(alg) = input.checksum_algorithm {
            match alg.as_str() {
                ChecksumAlgorithm::CRC32 => checksum.crc32 = Some(default()),
                ChecksumAlgorithm::CRC32C => checksum.crc32c = Some(default()),
                ChecksumAlgorithm::SHA1 => checksum.sha1 = Some(default()),
                ChecksumAlgorithm::SHA256 => checksum.sha256 = Some(default()),
                ChecksumAlgorithm::CRC64NVME => checksum.crc64nvme = Some(default()),
                _ => return Err(s3_error!(NotImplemented, "Unsupported checksum algorithm")),
            }
        }

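        // A key ending in '/' creates a directory object, which must have no body.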
        if key.ends_with('/') {
            if let Some(len) = content_length {
                if len > 0 {
                    return Err(s3_error!(UnexpectedContent, "Unexpected request body when creating a directory object."));
                }
            }
            let object_path = self.get_object_path(&bucket, &key)?;
            try_!(fs::create_dir_all(&object_path).await);
            let output = PutObjectOutput::default();
            return Ok(S3Response::new(output));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        let mut file_writer = self.prepare_file_write(&object_path).await?;

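        // Stream the body to disk, feeding the MD5 and checksum hashers as bytes pass through.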
        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| {
            md5_hash.update(bytes.as_ref());
            checksum.update(bytes.as_ref());
        });

        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        if let Some(content_md5) = content_md5 {
            let content_md5 = base64_simd::STANDARD
                .decode_to_vec(content_md5)
                .map_err(|_| s3_error!(InvalidArgument))?;
            let content_md5 = hex(content_md5);
            if content_md5 != md5_sum {
                return Err(s3_error!(BadDigest, "content_md5 mismatch"));
            }
        }

        let checksum = checksum.finalize();

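        // Checksums supplied as trailing headers take precedence over the
        // values carried in the initial request headers.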
        if let Some(trailers) = req.trailing_headers {
            if let Some(trailers) = trailers.take() {
                if let Some(crc32) = trailers.get("x-amz-checksum-crc32") {
                    input.checksum_crc32 = Some(crc32.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(crc32c) = trailers.get("x-amz-checksum-crc32c") {
                    input.checksum_crc32c = Some(crc32c.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(sha1) = trailers.get("x-amz-checksum-sha1") {
                    input.checksum_sha1 = Some(sha1.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(sha256) = trailers.get("x-amz-checksum-sha256") {
                    input.checksum_sha256 = Some(sha256.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
                if let Some(crc64nvme) = trailers.get("x-amz-checksum-crc64nvme") {
                    input.checksum_crc64nvme = Some(crc64nvme.to_str().map_err(|_| s3_error!(InvalidArgument))?.to_owned());
                }
            }
        }

        if checksum.checksum_crc32 != input.checksum_crc32 {
            return Err(s3_error!(
                BadDigest,
                "checksum_crc32 mismatch: expected `{}`, got `{}`",
                input.checksum_crc32.unwrap_or_default(),
                checksum.checksum_crc32.unwrap_or_default()
            ));
        }
        if checksum.checksum_crc32c != input.checksum_crc32c {
            return Err(s3_error!(BadDigest, "checksum_crc32c mismatch"));
        }
        if checksum.checksum_sha1 != input.checksum_sha1 {
            return Err(s3_error!(BadDigest, "checksum_sha1 mismatch"));
        }
        if checksum.checksum_sha256 != input.checksum_sha256 {
            return Err(s3_error!(BadDigest, "checksum_sha256 mismatch"));
        }
        if checksum.checksum_crc64nvme != input.checksum_crc64nvme {
            return Err(s3_error!(BadDigest, "checksum_crc64nvme mismatch"));
        }

        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");

        let mut obj_attrs = ObjectAttributes {
            user_metadata: metadata,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires: None,
            website_redirect_location,
        };
        obj_attrs.set_expires_timestamp(expires);
        self.save_object_attributes(&bucket, &key, &obj_attrs, None).await?;

        let mut info: InternalInfo = default();
        crate::checksum::modify_internal_info(&mut info, &checksum);
        self.save_internal_info(&bucket, &key, &info).await?;

        let output = PutObjectOutput {
            e_tag: Some(ETag::Strong(md5_sum)),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            checksum_crc64nvme: checksum.checksum_crc64nvme,
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn create_multipart_upload(
        &self,
        req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        use crate::fs::ObjectAttributes;

        let input = req.input;
        let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;

        let mut obj_attrs = ObjectAttributes {
            user_metadata: input.metadata,
            content_encoding: input.content_encoding,
            content_type: input.content_type,
            content_disposition: input.content_disposition,
            content_language: input.content_language,
            cache_control: input.cache_control,
            expires: None,
            website_redirect_location: input.website_redirect_location,
        };
        obj_attrs.set_expires_timestamp(input.expires);
        self.save_object_attributes(&input.bucket, &input.key, &obj_attrs, Some(upload_id))
            .await?;

        let output = CreateMultipartUploadOutput {
            bucket: Some(input.bucket),
            key: Some(input.key),
            upload_id: Some(upload_id.to_string()),
            ..Default::default()
        };

        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
        let UploadPartInput {
            body,
            upload_id,
            part_number,
            ..
        } = req.input;

        if part_number > 10_000 {
            return Err(s3_error!(
                InvalidArgument,
                "Part number must be an integer between 1 and 10000, inclusive"
            ));
        }

        let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let file_path = self.resolve_upload_part_path(upload_id, part_number)?;

        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

        let mut file_writer = self.prepare_file_write(&file_path).await?;
        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        debug!(path = %file_path.display(), ?size, %md5_sum, "write file");

        let output = UploadPartOutput {
            e_tag: Some(ETag::Strong(md5_sum)),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
        let input = req.input;

        let upload_id = Uuid::parse_str(&input.upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        let part_number = input.part_number;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let (src_bucket, src_key) = match input.copy_source {
            CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
        };
        let src_path = self.get_object_path(src_bucket, src_key)?;
        let dst_path = self.resolve_upload_part_path(upload_id, part_number)?;

        let mut src_file = fs::File::open(&src_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
        let file_len = try_!(src_file.metadata().await).len();

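        // Parse the optional copy source range ("bytes=start-end", where `end`
        // is inclusive and may be omitted to mean end-of-file).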
        let (start, end) = if let Some(copy_range) = &input.copy_source_range {
            if !copy_range.starts_with("bytes=") {
                return Err(s3_error!(InvalidArgument));
            }
            let range = &copy_range["bytes=".len()..];
            let parts: Vec<&str> = range.split('-').collect();
            if parts.len() != 2 {
                return Err(s3_error!(InvalidArgument));
            }

            let start: u64 = parts[0].parse().map_err(|_| s3_error!(InvalidArgument))?;
            let mut end = file_len - 1;
            if parts[1].is_empty().not() {
                end = parts[1].parse().map_err(|_| s3_error!(InvalidArgument))?;
            }
            (start, end)
        } else {
            (0, file_len - 1)
        };

        let content_length = end - start + 1;
        let content_length_usize = try_!(usize::try_from(content_length));

        let _ = try_!(src_file.seek(io::SeekFrom::Start(start)).await);
        let body = StreamingBlob::wrap(bytes_stream(ReaderStream::with_capacity(src_file, 4096), content_length_usize));

        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

        let mut file_writer = self.prepare_file_write(&dst_path).await?;
        let size = copy_bytes(stream, file_writer.writer()).await?;
        file_writer.done().await?;

        let md5_sum = hex(md5_hash.finalize());

        debug!(path = %dst_path.display(), ?size, %md5_sum, "write file");

        let output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: Some(ETag::Strong(md5_sum)),
                ..Default::default()
            }),
            ..Default::default()
        };

        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
        let ListPartsInput {
            bucket, key, upload_id, ..
        } = req.input;

        let mut parts: Vec<Part> = Vec::new();
        let mut iter = try_!(fs::read_dir(&self.root).await);

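        // Staged parts are stored flat in the root directory and named
        // `.upload_id-{upload_id}.part-{part_number}`.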
        let prefix = format!(".upload_id-{upload_id}");

        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_file().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };

            let Some(part_segment) = name.strip_prefix(&prefix) else { continue };
            let Some(part_number) = part_segment.strip_prefix(".part-") else { continue };
            let Ok(part_number) = part_number.parse::<i32>() else { continue };

            let file_meta = try_!(entry.metadata().await);
            let last_modified = Timestamp::from(try_!(file_meta.modified()));
            let size = try_!(i64::try_from(file_meta.len()));

            let part = Part {
                last_modified: Some(last_modified),
                part_number: Some(part_number),
                size: Some(size),
                ..Default::default()
            };
            parts.push(part);
        }

        let output = ListPartsOutput {
            bucket: Some(bucket),
            key: Some(key),
            upload_id: Some(upload_id),
            parts: Some(parts),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn complete_multipart_upload(
        &self,
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let CompleteMultipartUploadInput {
            multipart_upload,
            bucket,
            key,
            upload_id,
            ..
        } = req.input;

        let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        self.delete_upload_id(&upload_id).await?;

        if let Ok(Some(attrs)) = self.load_object_attributes(&bucket, &key, Some(upload_id)).await {
            self.save_object_attributes(&bucket, &key, &attrs, None).await?;
            let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        let mut file_writer = self.prepare_file_write(&object_path).await?;

        let mut cnt: i32 = 0;
        let total_parts_cnt = multipart_upload
            .parts
            .as_ref()
            .map(|parts| i32::try_from(parts.len()).expect("total number of parts must be <= 10000."))
            .unwrap_or_default();

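        // Parts must be listed in consecutive order starting at 1, and every
        // part except the last must be at least 5 MiB.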
        for part in multipart_upload.parts.into_iter().flatten() {
            let part_number = part
                .part_number
                .ok_or_else(|| s3_error!(InvalidRequest, "missing part number"))?;
            cnt += 1;
            if part_number != cnt {
                return Err(s3_error!(InvalidRequest, "invalid part order"));
            }

            let part_path = self.resolve_upload_part_path(upload_id, part_number)?;

            let mut reader = try_!(fs::File::open(&part_path).await);
            let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);

            if part_number != total_parts_cnt && size < 5 * 1024 * 1024 {
                return Err(s3_error!(EntityTooSmall));
            }

            debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.dest_path().display(), ?size, "write file");
            try_!(fs::remove_file(&part_path).await);
        }
        file_writer.done().await?;

        let file_size = try_!(fs::metadata(&object_path).await).len();
        let md5_sum = self.get_md5_sum(&bucket, &key).await?;

        debug!(?md5_sum, path = %object_path.display(), size = ?file_size, "file md5 sum");

        let output = CompleteMultipartUploadOutput {
            future: Some(Box::pin(async move {
                Ok(CompleteMultipartUploadOutput {
                    bucket: Some(bucket),
                    key: Some(key),
                    e_tag: Some(ETag::Strong(md5_sum)),
                    ..Default::default()
                })
            })),
            ..Default::default()
        };

        debug!(?output);

        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        let AbortMultipartUploadInput {
            bucket, key, upload_id, ..
        } = req.input;

        let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?;
        if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() {
            return Err(s3_error!(AccessDenied));
        }

        let _ = self.delete_metadata(&bucket, &key, Some(upload_id));

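        // Remove every staged part file belonging to this upload.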
        let prefix = format!(".upload_id-{upload_id}");
        let mut iter = try_!(fs::read_dir(&self.root).await);
        while let Some(entry) = try_!(iter.next_entry().await) {
            let file_type = try_!(entry.file_type().await);
            if file_type.is_file().not() {
                continue;
            }

            let file_name = entry.file_name();
            let Some(name) = file_name.to_str() else { continue };

            if name.starts_with(&prefix) {
                try_!(fs::remove_file(entry.path()).await);
            }
        }

        self.delete_upload_id(&upload_id).await?;

        debug!(bucket = %bucket, key = %key, upload_id = %upload_id, "multipart upload aborted");

        Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
    }
}

impl FileSystem {
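    /// Walks the bucket directory breadth-first and collects every file whose
    /// normalized key starts with `prefix`.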
    async fn list_objects_recursive(&self, bucket_root: &Path, prefix: &str, objects: &mut Vec<Object>) -> S3Result<()> {
        let mut dir_queue: VecDeque<PathBuf> = default();
        dir_queue.push_back(bucket_root.to_owned());
        let prefix_is_empty = prefix.is_empty();

        while let Some(dir) = dir_queue.pop_front() {
            let mut iter = try_!(fs::read_dir(dir).await);
            while let Some(entry) = try_!(iter.next_entry().await) {
                let file_type = try_!(entry.file_type().await);
                if file_type.is_dir() {
                    dir_queue.push_back(entry.path());
                } else {
                    let file_path = entry.path();
                    let key = try_!(file_path.strip_prefix(bucket_root));
                    let Some(key_str) = normalize_path(key, "/") else {
                        continue;
                    };

                    if !prefix_is_empty && !key_str.starts_with(prefix) {
                        continue;
                    }

                    let metadata = try_!(entry.metadata().await);
                    let last_modified = Timestamp::from(try_!(metadata.modified()));
                    let size = metadata.len();

                    let object = Object {
                        key: Some(key_str),
                        last_modified: Some(last_modified),
                        size: Some(try_!(i64::try_from(size))),
                        ..Default::default()
                    };
                    objects.push(object);
                }
            }
        }

        Ok(())
    }

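    /// Like `list_objects_recursive`, but keys containing `delimiter` after
    /// `prefix` are grouped into `common_prefixes` instead of being listed
    /// individually.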
    async fn list_objects_with_delimiter(
        &self,
        bucket_root: &Path,
        prefix: &str,
        delimiter: &str,
        objects: &mut Vec<Object>,
        common_prefixes: &mut std::collections::BTreeSet<String>,
    ) -> S3Result<()> {
        let mut dir_queue: VecDeque<PathBuf> = default();
        dir_queue.push_back(bucket_root.to_owned());
        let prefix_is_empty = prefix.is_empty();

        while let Some(dir) = dir_queue.pop_front() {
            let mut iter = try_!(fs::read_dir(dir).await);

            while let Some(entry) = try_!(iter.next_entry().await) {
                let file_type = try_!(entry.file_type().await);
                let entry_path = entry.path();

                let key = try_!(entry_path.strip_prefix(bucket_root));
                let Some(key_str) = normalize_path(key, "/") else {
                    continue;
                };

                if !prefix_is_empty && !key_str.starts_with(prefix) {
                    // Files outside the prefix never match; directories are
                    // kept only while they are still ancestors of the prefix.
                    if file_type.is_file() {
                        continue;
                    }
                    if file_type.is_dir() && !prefix.starts_with(&key_str) {
                        continue;
                    }
                }

                if file_type.is_dir() {
                    dir_queue.push_back(entry_path);
                } else {
                    let remaining = &key_str[prefix.len()..];

                    // `find` both tests for and locates the first delimiter in one pass.
                    if let Some(delimiter_pos) = remaining.find(delimiter) {
                        let mut next_prefix = String::with_capacity(prefix.len() + delimiter_pos + 1);
                        next_prefix.push_str(prefix);
                        next_prefix.push_str(&remaining[..=delimiter_pos]);
                        common_prefixes.insert(next_prefix);
                    } else {
                        let metadata = try_!(entry.metadata().await);
                        let last_modified = Timestamp::from(try_!(metadata.modified()));
                        let size = metadata.len();

                        let object = Object {
                            key: Some(key_str),
                            last_modified: Some(last_modified),
                            size: Some(try_!(i64::try_from(size))),
                            ..Default::default()
                        };
                        objects.push(object);
                    }
                }
            }
        }

        Ok(())
    }
}