1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Timelike, Utc};
6use http::{HeaderMap, Method, StatusCode};
7use md5::{Digest, Md5};
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_kms::SharedKmsState;
13use fakecloud_persistence::{MemoryS3Store, S3Store, StoreError};
14
15use base64::engine::general_purpose::STANDARD as BASE64;
16use base64::Engine as _;
17
18use crate::logging;
19use crate::state::{AclGrant, S3Bucket, S3Object, SharedS3State};
20
21mod access_points;
22mod acl;
23mod buckets;
24pub(crate) mod config;
25mod lock;
26mod multipart;
27mod notifications;
28mod objects;
29mod tags;
30
31#[cfg(test)]
33use notifications::replicate_object;
34pub(super) use notifications::{
35 deliver_notifications, normalize_notification_ids, normalize_replication_xml,
36 replicate_through_store,
37};
38
39use notifications::extract_all_xml_values;
41
42#[cfg(test)]
44use notifications::{
45 event_matches, key_matches_filters, parse_notification_config, parse_replication_rules,
46 NotificationTargetType,
47};
48
49pub struct S3Service {
50 state: SharedS3State,
51 delivery: Arc<DeliveryBus>,
52 kms_state: Option<SharedKmsState>,
53 pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
54 store: Arc<dyn S3Store>,
55}
56
57pub(crate) fn persistence_error(err: StoreError) -> AwsServiceError {
62 AwsServiceError::aws_error(
63 StatusCode::INTERNAL_SERVER_ERROR,
64 "InternalError",
65 format!("persistence store error: {err}"),
66 )
67}
68
69pub(crate) fn io_to_aws(err: std::io::Error) -> AwsServiceError {
72 AwsServiceError::aws_error(
73 StatusCode::INTERNAL_SERVER_ERROR,
74 "InternalError",
75 format!("failed to read object body from disk: {err}"),
76 )
77}
78
79impl S3Service {
80 pub fn new(state: SharedS3State, delivery: Arc<DeliveryBus>) -> Self {
81 Self::with_store(state, delivery, Arc::new(MemoryS3Store::new()))
82 }
83
84 pub fn with_store(
85 state: SharedS3State,
86 delivery: Arc<DeliveryBus>,
87 store: Arc<dyn S3Store>,
88 ) -> Self {
89 Self {
90 state,
91 delivery,
92 kms_state: None,
93 kms_hook: None,
94 store,
95 }
96 }
97
98 pub fn with_kms(mut self, kms_state: SharedKmsState) -> Self {
99 self.kms_state = Some(kms_state);
100 self
101 }
102
103 pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
104 self.kms_hook = Some(hook);
105 self
106 }
107
108 pub(crate) fn encrypt_object_body(
119 &self,
120 account_id: &str,
121 region: &str,
122 bucket: &str,
123 plaintext: &[u8],
124 kms_key_id: Option<&str>,
125 ) -> Result<bytes::Bytes, AwsServiceError> {
126 let Some(hook) = &self.kms_hook else {
127 return Ok(bytes::Bytes::copy_from_slice(plaintext));
128 };
129 let key = kms_key_id.filter(|k| !k.is_empty()).unwrap_or("aws/s3");
130 let bucket_arn = Arn::s3(bucket).to_string();
131 let mut ctx = std::collections::HashMap::new();
132 ctx.insert("aws:s3:arn".to_string(), bucket_arn);
133 match hook.encrypt(account_id, region, key, plaintext, "s3.amazonaws.com", ctx) {
134 Ok(envelope) => Ok(bytes::Bytes::from(envelope.into_bytes())),
135 Err(err) => {
136 tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS encrypt failed");
137 Err(AwsServiceError::aws_error(
138 StatusCode::INTERNAL_SERVER_ERROR,
139 "KMS.InternalFailureException",
140 format!("Failed to encrypt object via KMS: {err}"),
141 ))
142 }
143 }
144 }
145
146 pub(crate) fn decrypt_object_body(
156 &self,
157 account_id: &str,
158 bucket: &str,
159 ciphertext: &[u8],
160 ) -> Result<bytes::Bytes, AwsServiceError> {
161 let Some(hook) = &self.kms_hook else {
162 return Ok(bytes::Bytes::copy_from_slice(ciphertext));
163 };
164 let envelope = match std::str::from_utf8(ciphertext) {
167 Ok(s) => s,
168 Err(_) => return Ok(bytes::Bytes::copy_from_slice(ciphertext)),
169 };
170 let bucket_arn = Arn::s3(bucket).to_string();
171 let mut ctx = std::collections::HashMap::new();
172 ctx.insert("aws:s3:arn".to_string(), bucket_arn);
173 match hook.decrypt(account_id, envelope, "s3.amazonaws.com", ctx) {
174 Ok(bytes) => Ok(bytes::Bytes::from(bytes)),
175 Err(err) => {
176 tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS decrypt failed");
177 Err(AwsServiceError::aws_error(
178 StatusCode::INTERNAL_SERVER_ERROR,
179 "KMS.InternalFailureException",
180 format!("Failed to decrypt object via KMS: {err}"),
181 ))
182 }
183 }
184 }
185}
186
187#[async_trait]
188impl AwsService for S3Service {
189 fn service_name(&self) -> &str {
190 "s3"
191 }
192
193 async fn handle(&self, mut req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194 let is_put_to_key = req.method == Method::PUT
204 && req.path_segments.len() >= 2
205 && req
206 .path_segments
207 .first()
208 .map(|s| !s.is_empty())
209 .unwrap_or(false);
210 let q = &req.query_params;
211 let is_put_object = is_put_to_key
212 && !q.contains_key("tagging")
213 && !q.contains_key("acl")
214 && !q.contains_key("retention")
215 && !q.contains_key("legal-hold")
216 && !q.contains_key("renameObject")
217 && !q.contains_key("encryption")
218 && !req.headers.contains_key("x-amz-copy-source");
219 let is_upload_part =
224 is_put_to_key && q.contains_key("partNumber") && q.contains_key("uploadId");
225 if !is_put_object && !is_upload_part {
226 if let Some(stream) = req.take_body_stream() {
227 req.body = fakecloud_core::service::drain_request_stream(stream).await?;
228 }
229 }
230
231 access_points::resolve_access_point(self, &mut req)?;
233
234 let account_id = req.account_id.as_str();
235
236 let host = req
238 .headers
239 .get("host")
240 .and_then(|v| v.to_str().ok())
241 .unwrap_or("");
242 let is_control = host.to_ascii_lowercase().contains("s3-control");
243 if is_control {
244 let v1 = req.path_segments.first().map(|s| s.as_str());
245 let v2 = req.path_segments.get(1).map(|s| s.as_str());
246 let v3 = req.path_segments.get(2).map(|s| s.as_str());
247 if v1 == Some("v20180820") && v2 == Some("accesspoint") {
248 if let Some(name) = v3 {
249 match req.method {
250 Method::PUT => return self.create_access_point(account_id, &req, name),
251 Method::GET => return self.get_access_point(account_id, &req, name),
252 Method::DELETE => return self.delete_access_point(account_id, &req, name),
253 _ => {}
254 }
255 } else if req.method == Method::GET {
256 return self.list_access_points(account_id, &req);
257 }
258 }
259 }
260
261 if req.raw_path.starts_with("//") {
270 return Err(AwsServiceError::aws_error(
271 StatusCode::BAD_REQUEST,
272 "InvalidBucketName",
273 "The specified bucket is not valid: bucket name cannot be empty",
274 ));
275 }
276 let bucket = req.path_segments.first().map(|s| s.as_str());
277 let key = if let Some(b) = bucket {
280 let prefix = format!("/{b}/");
281 if req.raw_path.starts_with(&prefix) && req.raw_path.len() > prefix.len() {
282 let raw_key = &req.raw_path[prefix.len()..];
283 Some(
284 percent_encoding::percent_decode_str(raw_key)
285 .decode_utf8_lossy()
286 .into_owned(),
287 )
288 } else if req.path_segments.len() > 1 {
289 let raw = req.path_segments[1..].join("/");
290 Some(
291 percent_encoding::percent_decode_str(&raw)
292 .decode_utf8_lossy()
293 .into_owned(),
294 )
295 } else {
296 None
297 }
298 } else {
299 None
300 };
301
302 if let Some(b) = bucket {
304 if req.method == Method::POST
306 && key.is_some()
307 && req.query_params.contains_key("uploads")
308 {
309 return self.create_multipart_upload(account_id, &req, b, key.as_deref().unwrap());
310 }
311
312 if req.method == Method::POST
314 && key.is_some()
315 && req.query_params.contains_key("restore")
316 {
317 return self.restore_object(account_id, &req, b, key.as_deref().unwrap());
318 }
319
320 if req.method == Method::POST && key.is_some() {
322 if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
323 return self.complete_multipart_upload(
324 account_id,
325 &req,
326 b,
327 key.as_deref().unwrap(),
328 &upload_id,
329 );
330 }
331 }
332
333 if req.method == Method::PUT && key.is_some() {
335 if let (Some(part_num_str), Some(upload_id)) = (
336 req.query_params.get("partNumber").cloned(),
337 req.query_params.get("uploadId").cloned(),
338 ) {
339 if let Ok(part_number) = part_num_str.parse::<i64>() {
340 if req.headers.contains_key("x-amz-copy-source") {
341 return self.upload_part_copy(
342 account_id,
343 &req,
344 b,
345 key.as_deref().unwrap(),
346 &upload_id,
347 part_number,
348 );
349 }
350 return self
351 .upload_part(
352 account_id,
353 &req,
354 b,
355 key.as_deref().unwrap(),
356 &upload_id,
357 part_number,
358 )
359 .await;
360 }
361 }
362 }
363
364 if req.method == Method::DELETE && key.is_some() {
366 if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
367 return self.abort_multipart_upload(
368 account_id,
369 b,
370 key.as_deref().unwrap(),
371 &upload_id,
372 );
373 }
374 }
375
376 if req.method == Method::GET
378 && key.is_none()
379 && req.query_params.contains_key("uploads")
380 {
381 return self.list_multipart_uploads(account_id, b);
382 }
383
384 if req.method == Method::GET && key.is_some() {
386 if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
387 return self.list_parts(
388 account_id,
389 &req,
390 b,
391 key.as_deref().unwrap(),
392 &upload_id,
393 );
394 }
395 }
396 }
397
398 if req.method == Method::OPTIONS {
400 if let Some(b_name) = bucket {
401 let cors_config = {
402 let accounts = self.state.read();
403 let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
404 let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
405 state
406 .buckets
407 .get(b_name)
408 .and_then(|b| b.cors_config.clone())
409 };
410 if let Some(ref config) = cors_config {
411 let origin = req
412 .headers
413 .get("origin")
414 .and_then(|v| v.to_str().ok())
415 .unwrap_or("");
416 let request_method = req
417 .headers
418 .get("access-control-request-method")
419 .and_then(|v| v.to_str().ok())
420 .unwrap_or("");
421 let rules = parse_cors_config(config);
422 if let Some(rule) = find_cors_rule(&rules, origin, Some(request_method)) {
423 let mut headers = HeaderMap::new();
424 let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
425 "*"
426 } else {
427 origin
428 };
429 headers.insert(
430 "access-control-allow-origin",
431 matched_origin
432 .parse()
433 .unwrap_or_else(|_| http::HeaderValue::from_static("")),
434 );
435 headers.insert(
436 "access-control-allow-methods",
437 rule.allowed_methods
438 .join(", ")
439 .parse()
440 .unwrap_or_else(|_| http::HeaderValue::from_static("")),
441 );
442 if !rule.allowed_headers.is_empty() {
443 let ah = if rule.allowed_headers.contains(&"*".to_string()) {
444 req.headers
445 .get("access-control-request-headers")
446 .and_then(|v| v.to_str().ok())
447 .unwrap_or("*")
448 .to_string()
449 } else {
450 rule.allowed_headers.join(", ")
451 };
452 headers.insert(
453 "access-control-allow-headers",
454 ah.parse()
455 .unwrap_or_else(|_| http::HeaderValue::from_static("")),
456 );
457 }
458 if let Some(max_age) = rule.max_age_seconds {
459 headers.insert(
460 "access-control-max-age",
461 max_age
462 .to_string()
463 .parse()
464 .unwrap_or_else(|_| http::HeaderValue::from_static("")),
465 );
466 }
467 return Ok(AwsResponse {
468 status: StatusCode::OK,
469 content_type: String::new(),
470 body: Bytes::new().into(),
471 headers,
472 });
473 }
474 }
475 return Err(AwsServiceError::aws_error(
476 StatusCode::FORBIDDEN,
477 "CORSResponse",
478 "CORS is not enabled for this bucket",
479 ));
480 }
481 }
482
483 let origin_header = req
485 .headers
486 .get("origin")
487 .and_then(|v| v.to_str().ok())
488 .map(|s| s.to_string());
489
490 let bucket_subresources: &[&str] = &[
497 "tagging",
498 "acl",
499 "versioning",
500 "cors",
501 "notification",
502 "website",
503 "accelerate",
504 "publicAccessBlock",
505 "encryption",
506 "lifecycle",
507 "logging",
508 "policy",
509 "policyStatus",
510 "replication",
511 "ownershipControls",
512 "inventory",
513 "analytics",
514 "intelligent-tiering",
515 "metrics",
516 "requestPayment",
517 "abac",
518 "metadataConfiguration",
519 "metadataTable",
520 "metadataInventoryTable",
521 "metadataJournalTable",
522 "object-lock",
523 "versions",
524 "session",
525 "retention",
526 "legal-hold",
527 "attributes",
528 "torrent",
529 "renameObject",
530 "uploads",
531 "restore",
532 "select",
533 "location",
540 "delete",
541 "list-type",
542 ];
543 if bucket.is_none()
544 && bucket_subresources
545 .iter()
546 .any(|q| req.query_params.contains_key(*q))
547 {
548 return Err(AwsServiceError::aws_error(
549 StatusCode::BAD_REQUEST,
550 "InvalidBucketName",
551 "The specified bucket is not valid: bucket name is required for this operation",
552 ));
553 }
554
555 let mut result = match (&req.method, bucket, key.as_deref()) {
556 (&Method::GET, None, None) => {
558 if req.query_params.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets")
559 {
560 self.list_directory_buckets(account_id, &req)
561 } else {
562 self.list_buckets(account_id, &req)
563 }
564 }
565
566 (&Method::PUT, Some(b), None) => {
568 if req.query_params.contains_key("tagging") {
569 self.put_bucket_tagging(account_id, &req, b)
570 } else if req.query_params.contains_key("acl") {
571 self.put_bucket_acl(account_id, &req, b)
572 } else if req.query_params.contains_key("versioning") {
573 self.put_bucket_versioning(account_id, &req, b)
574 } else if req.query_params.contains_key("cors") {
575 self.put_bucket_cors(account_id, &req, b)
576 } else if req.query_params.contains_key("notification") {
577 self.put_bucket_notification(account_id, &req, b)
578 } else if req.query_params.contains_key("website") {
579 self.put_bucket_website(account_id, &req, b)
580 } else if req.query_params.contains_key("accelerate") {
581 self.put_bucket_accelerate(account_id, &req, b)
582 } else if req.query_params.contains_key("publicAccessBlock") {
583 self.put_public_access_block(account_id, &req, b)
584 } else if req.query_params.contains_key("encryption") {
585 self.put_bucket_encryption(account_id, &req, b)
586 } else if req.query_params.contains_key("lifecycle") {
587 self.put_bucket_lifecycle(account_id, &req, b)
588 } else if req.query_params.contains_key("logging") {
589 self.put_bucket_logging(account_id, &req, b)
590 } else if req.query_params.contains_key("policy") {
591 self.put_bucket_policy(account_id, &req, b)
592 } else if req.query_params.contains_key("object-lock") {
593 self.put_object_lock_config(account_id, &req, b)
594 } else if req.query_params.contains_key("replication") {
595 self.put_bucket_replication(account_id, &req, b)
596 } else if req.query_params.contains_key("ownershipControls") {
597 self.put_bucket_ownership_controls(account_id, &req, b)
598 } else if req.query_params.contains_key("inventory") {
599 self.put_bucket_inventory(account_id, &req, b)
600 } else if req.query_params.contains_key("analytics") {
601 self.put_bucket_analytics_config(account_id, &req, b)
602 } else if req.query_params.contains_key("intelligent-tiering") {
603 self.put_bucket_intelligent_tiering_config(account_id, &req, b)
604 } else if req.query_params.contains_key("metrics") {
605 self.put_bucket_metrics_config(account_id, &req, b)
606 } else if req.query_params.contains_key("requestPayment") {
607 self.put_bucket_request_payment(account_id, &req, b)
608 } else if req.query_params.contains_key("abac") {
609 self.put_bucket_abac(account_id, &req, b)
610 } else if req.query_params.contains_key("metadataInventoryTable") {
611 self.update_bucket_metadata_inventory_table(account_id, &req, b)
612 } else if req.query_params.contains_key("metadataJournalTable") {
613 self.update_bucket_metadata_journal_table(account_id, &req, b)
614 } else {
615 self.create_bucket(account_id, &req, b)
616 }
617 }
618 (&Method::DELETE, Some(b), None) => {
619 if req.query_params.contains_key("tagging") {
620 self.delete_bucket_tagging(account_id, &req, b)
621 } else if req.query_params.contains_key("cors") {
622 self.delete_bucket_cors(account_id, b)
623 } else if req.query_params.contains_key("website") {
624 self.delete_bucket_website(account_id, b)
625 } else if req.query_params.contains_key("publicAccessBlock") {
626 self.delete_public_access_block(account_id, b)
627 } else if req.query_params.contains_key("encryption") {
628 self.delete_bucket_encryption(account_id, b)
629 } else if req.query_params.contains_key("lifecycle") {
630 self.delete_bucket_lifecycle(account_id, b)
631 } else if req.query_params.contains_key("policy") {
632 self.delete_bucket_policy(account_id, b)
633 } else if req.query_params.contains_key("replication") {
634 self.delete_bucket_replication(account_id, b)
635 } else if req.query_params.contains_key("ownershipControls") {
636 self.delete_bucket_ownership_controls(account_id, b)
637 } else if req.query_params.contains_key("inventory") {
638 self.delete_bucket_inventory(account_id, &req, b)
639 } else if req.query_params.contains_key("analytics") {
640 self.delete_bucket_analytics_config(account_id, &req, b)
641 } else if req.query_params.contains_key("intelligent-tiering") {
642 self.delete_bucket_intelligent_tiering_config(account_id, &req, b)
643 } else if req.query_params.contains_key("metrics") {
644 self.delete_bucket_metrics_config(account_id, &req, b)
645 } else if req.query_params.contains_key("metadataConfiguration") {
646 self.delete_bucket_metadata_config(account_id, b)
647 } else if req.query_params.contains_key("metadataTable") {
648 self.delete_bucket_metadata_table_config(account_id, b)
649 } else {
650 self.delete_bucket(account_id, &req, b)
651 }
652 }
653 (&Method::HEAD, Some(b), None) => self.head_bucket(account_id, b),
654 (&Method::GET, Some(b), None) => {
655 if req.query_params.contains_key("tagging") {
656 self.get_bucket_tagging(account_id, &req, b)
657 } else if req.query_params.contains_key("location") {
658 self.get_bucket_location(account_id, b)
659 } else if req.query_params.contains_key("acl") {
660 self.get_bucket_acl(account_id, &req, b)
661 } else if req.query_params.contains_key("versioning") {
662 self.get_bucket_versioning(account_id, b)
663 } else if req.query_params.contains_key("versions") {
664 self.list_object_versions(account_id, &req, b)
665 } else if req.query_params.contains_key("object-lock") {
666 self.get_object_lock_configuration(account_id, b)
667 } else if req.query_params.contains_key("cors") {
668 self.get_bucket_cors(account_id, b)
669 } else if req.query_params.contains_key("notification") {
670 self.get_bucket_notification(account_id, b)
671 } else if req.query_params.contains_key("website") {
672 self.get_bucket_website(account_id, b)
673 } else if req.query_params.contains_key("accelerate") {
674 self.get_bucket_accelerate(account_id, b)
675 } else if req.query_params.contains_key("publicAccessBlock") {
676 self.get_public_access_block(account_id, b)
677 } else if req.query_params.contains_key("encryption") {
678 self.get_bucket_encryption(account_id, b)
679 } else if req.query_params.contains_key("lifecycle") {
680 self.get_bucket_lifecycle(account_id, b)
681 } else if req.query_params.contains_key("logging") {
682 self.get_bucket_logging(account_id, b)
683 } else if req.query_params.contains_key("policy") {
684 self.get_bucket_policy(account_id, b)
685 } else if req.query_params.contains_key("replication") {
686 self.get_bucket_replication(account_id, b)
687 } else if req.query_params.contains_key("ownershipControls") {
688 self.get_bucket_ownership_controls(account_id, b)
689 } else if req.query_params.contains_key("inventory") {
690 if req.query_params.contains_key("id") {
691 self.get_bucket_inventory(account_id, &req, b)
692 } else {
693 self.list_bucket_inventory_configurations(account_id, b)
694 }
695 } else if req.query_params.contains_key("analytics") {
696 if req.query_params.contains_key("id") {
697 self.get_bucket_analytics_config(account_id, &req, b)
698 } else {
699 self.list_bucket_analytics_configurations(account_id, b)
700 }
701 } else if req.query_params.contains_key("intelligent-tiering") {
702 if req.query_params.contains_key("id") {
703 self.get_bucket_intelligent_tiering_config(account_id, &req, b)
704 } else {
705 self.list_bucket_intelligent_tiering_configurations(account_id, b)
706 }
707 } else if req.query_params.contains_key("metrics") {
708 if req.query_params.contains_key("id") {
709 self.get_bucket_metrics_config(account_id, &req, b)
710 } else {
711 self.list_bucket_metrics_configurations(account_id, b)
712 }
713 } else if req.query_params.contains_key("requestPayment") {
714 self.get_bucket_request_payment(account_id, b)
715 } else if req.query_params.contains_key("abac") {
716 self.get_bucket_abac(account_id, b)
717 } else if req.query_params.contains_key("policyStatus") {
718 self.get_bucket_policy_status(account_id, b)
719 } else if req.query_params.contains_key("metadataConfiguration") {
720 self.get_bucket_metadata_config(account_id, b)
721 } else if req.query_params.contains_key("metadataTable") {
722 self.get_bucket_metadata_table_config(account_id, b)
723 } else if req.query_params.contains_key("session") {
724 self.create_session(account_id, &req, b)
725 } else if req.query_params.get("list-type").map(|s| s.as_str()) == Some("2") {
726 self.list_objects_v2(account_id, &req, b)
727 } else if req.query_params.is_empty() {
728 let website_config = {
730 let accounts = self.state.read();
731 let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
732 let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
733 state
734 .buckets
735 .get(b)
736 .and_then(|bkt| bkt.website_config.clone())
737 };
738 if let Some(ref config) = website_config {
739 if let Some(index_doc) = extract_xml_value(config, "Suffix").or_else(|| {
740 extract_xml_value(config, "IndexDocument").and_then(|inner| {
741 let open = "<Suffix>";
742 let close = "</Suffix>";
743 let s = inner.find(open)? + open.len();
744 let e = inner.find(close)?;
745 Some(inner[s..e].trim().to_string())
746 })
747 }) {
748 self.serve_website_object(account_id, &req, b, &index_doc, config)
749 } else {
750 self.list_objects_v1(account_id, &req, b)
751 }
752 } else {
753 self.list_objects_v1(account_id, &req, b)
754 }
755 } else {
756 self.list_objects_v1(account_id, &req, b)
757 }
758 }
759
760 (&Method::PUT, Some(b), Some(k)) => {
762 if req.query_params.contains_key("tagging") {
763 self.put_object_tagging(account_id, &req, b, k)
764 } else if req.query_params.contains_key("acl") {
765 self.put_object_acl(account_id, &req, b, k)
766 } else if req.query_params.contains_key("retention") {
767 self.put_object_retention(account_id, &req, b, k)
768 } else if req.query_params.contains_key("legal-hold") {
769 self.put_object_legal_hold(account_id, &req, b, k)
770 } else if req.query_params.contains_key("renameObject") {
771 self.rename_object(account_id, &req, b, k)
772 } else if req.query_params.contains_key("encryption") {
773 self.update_object_encryption(account_id, &req, b, k)
774 } else if req.headers.contains_key("x-amz-copy-source") {
775 self.copy_object(account_id, &req, b, k)
776 } else {
777 self.put_object(account_id, &req, b, k).await
778 }
779 }
780 (&Method::GET, Some(b), Some(k)) => {
781 if req.query_params.contains_key("tagging") {
782 self.get_object_tagging(account_id, &req, b, k)
783 } else if req.query_params.contains_key("acl") {
784 self.get_object_acl(account_id, &req, b, k)
785 } else if req.query_params.contains_key("retention") {
786 self.get_object_retention(account_id, &req, b, k)
787 } else if req.query_params.contains_key("legal-hold") {
788 self.get_object_legal_hold(account_id, &req, b, k)
789 } else if req.query_params.contains_key("attributes") {
790 self.get_object_attributes(account_id, &req, b, k)
791 } else if req.query_params.contains_key("torrent") {
792 self.get_object_torrent(account_id, &req, b, k)
793 } else {
794 let result = self.get_object(account_id, &req, b, k);
795 let is_not_found = matches!(
797 &result,
798 Err(e) if e.code() == "NoSuchKey"
799 );
800 if is_not_found {
801 let website_config = {
802 let accounts = self.state.read();
803 let _empty_s3 =
804 crate::state::S3State::new(&req.account_id, &req.region);
805 let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
806 state
807 .buckets
808 .get(b)
809 .and_then(|bkt| bkt.website_config.clone())
810 };
811 if let Some(ref config) = website_config {
812 if let Some(error_key) = extract_xml_value(config, "ErrorDocument")
813 .and_then(|inner| {
814 let open = "<Key>";
815 let close = "</Key>";
816 let s = inner.find(open)? + open.len();
817 let e = inner.find(close)?;
818 Some(inner[s..e].trim().to_string())
819 })
820 .or_else(|| extract_xml_value(config, "Key"))
821 {
822 return self.serve_website_error(account_id, &req, b, &error_key);
823 }
824 }
825 }
826 result
827 }
828 }
829 (&Method::DELETE, Some(b), Some(k)) => {
830 if req.query_params.contains_key("tagging") {
831 self.delete_object_tagging(account_id, b, k)
832 } else {
833 self.delete_object(account_id, &req, b, k)
834 }
835 }
836 (&Method::HEAD, Some(b), Some(k)) => self.head_object(account_id, &req, b, k),
837
838 (&Method::POST, Some(b), None) if req.query_params.contains_key("delete") => {
840 self.delete_objects(account_id, &req, b)
841 }
842 (&Method::POST, Some(b), None)
843 if req.query_params.contains_key("metadataConfiguration") =>
844 {
845 self.create_bucket_metadata_config(account_id, &req, b)
846 }
847 (&Method::POST, Some(b), None) if req.query_params.contains_key("metadataTable") => {
848 self.create_bucket_metadata_table_config(account_id, &req, b)
849 }
850 (&Method::POST, Some(b), Some(k))
851 if req.query_params.get("select-type").map(|s| s.as_str()) == Some("2") =>
852 {
853 self.select_object_content(account_id, &req, b, k)
854 }
855 (&Method::POST, Some("WriteGetObjectResponse"), None) => {
856 self.write_get_object_response(account_id, &req)
857 }
858
859 _ => Err(AwsServiceError::aws_error(
860 StatusCode::METHOD_NOT_ALLOWED,
861 "MethodNotAllowed",
862 "The specified method is not allowed against this resource",
863 )),
864 };
865
866 if let (Some(ref origin), Some(b_name)) = (&origin_header, bucket) {
868 let cors_config = {
869 let accounts = self.state.read();
870 let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
871 let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
872 state
873 .buckets
874 .get(b_name)
875 .and_then(|b| b.cors_config.clone())
876 };
877 if let Some(ref config) = cors_config {
878 let rules = parse_cors_config(config);
879 if let Some(rule) = find_cors_rule(&rules, origin, None) {
880 if let Ok(ref mut resp) = result {
881 let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
882 "*"
883 } else {
884 origin
885 };
886 resp.headers.insert(
887 "access-control-allow-origin",
888 matched_origin
889 .parse()
890 .unwrap_or_else(|_| http::HeaderValue::from_static("")),
891 );
892 if !rule.expose_headers.is_empty() {
893 resp.headers.insert(
894 "access-control-expose-headers",
895 rule.expose_headers
896 .join(", ")
897 .parse()
898 .unwrap_or_else(|_| http::HeaderValue::from_static("")),
899 );
900 }
901 }
902 }
903 }
904 }
905
906 if let Some(b_name) = bucket {
908 let status_code = match &result {
909 Ok(resp) => resp.status.as_u16(),
910 Err(e) => e.status().as_u16(),
911 };
912 let op = logging::operation_name(&req.method, key.as_deref());
913 logging::maybe_write_access_log(
914 &self.state,
915 &self.store,
916 b_name,
917 &logging::AccessLogRequest {
918 operation: op,
919 key: key.as_deref(),
920 status: status_code,
921 request_id: &req.request_id,
922 method: req.method.as_str(),
923 path: &req.raw_path,
924 },
925 );
926 }
927
928 result
929 }
930
931 fn supported_actions(&self) -> &[&str] {
932 &[
933 "ListBuckets",
935 "CreateBucket",
936 "DeleteBucket",
937 "HeadBucket",
938 "GetBucketLocation",
939 "PutObject",
941 "GetObject",
942 "DeleteObject",
943 "HeadObject",
944 "CopyObject",
945 "DeleteObjects",
946 "ListObjectsV2",
947 "ListObjects",
948 "ListObjectVersions",
949 "GetObjectAttributes",
950 "RestoreObject",
951 "PutObjectTagging",
953 "GetObjectTagging",
954 "DeleteObjectTagging",
955 "PutObjectAcl",
956 "GetObjectAcl",
957 "PutObjectRetention",
958 "GetObjectRetention",
959 "PutObjectLegalHold",
960 "GetObjectLegalHold",
961 "PutBucketTagging",
963 "GetBucketTagging",
964 "DeleteBucketTagging",
965 "PutBucketAcl",
966 "GetBucketAcl",
967 "PutBucketVersioning",
968 "GetBucketVersioning",
969 "PutBucketCors",
970 "GetBucketCors",
971 "DeleteBucketCors",
972 "PutBucketNotificationConfiguration",
973 "GetBucketNotificationConfiguration",
974 "PutBucketWebsite",
975 "GetBucketWebsite",
976 "DeleteBucketWebsite",
977 "PutBucketAccelerateConfiguration",
978 "GetBucketAccelerateConfiguration",
979 "PutPublicAccessBlock",
980 "GetPublicAccessBlock",
981 "DeletePublicAccessBlock",
982 "PutBucketEncryption",
983 "GetBucketEncryption",
984 "DeleteBucketEncryption",
985 "PutBucketLifecycleConfiguration",
986 "GetBucketLifecycleConfiguration",
987 "DeleteBucketLifecycle",
988 "PutBucketLogging",
989 "GetBucketLogging",
990 "PutBucketPolicy",
991 "GetBucketPolicy",
992 "DeleteBucketPolicy",
993 "PutObjectLockConfiguration",
994 "GetObjectLockConfiguration",
995 "PutBucketReplication",
996 "GetBucketReplication",
997 "DeleteBucketReplication",
998 "PutBucketOwnershipControls",
999 "GetBucketOwnershipControls",
1000 "DeleteBucketOwnershipControls",
1001 "PutBucketInventoryConfiguration",
1002 "GetBucketInventoryConfiguration",
1003 "DeleteBucketInventoryConfiguration",
1004 "ListBucketInventoryConfigurations",
1005 "PutBucketAnalyticsConfiguration",
1006 "GetBucketAnalyticsConfiguration",
1007 "DeleteBucketAnalyticsConfiguration",
1008 "ListBucketAnalyticsConfigurations",
1009 "PutBucketIntelligentTieringConfiguration",
1010 "GetBucketIntelligentTieringConfiguration",
1011 "DeleteBucketIntelligentTieringConfiguration",
1012 "ListBucketIntelligentTieringConfigurations",
1013 "PutBucketMetricsConfiguration",
1014 "GetBucketMetricsConfiguration",
1015 "DeleteBucketMetricsConfiguration",
1016 "ListBucketMetricsConfigurations",
1017 "PutBucketRequestPayment",
1018 "GetBucketRequestPayment",
1019 "PutBucketAbac",
1020 "GetBucketAbac",
1021 "GetBucketPolicyStatus",
1022 "CreateBucketMetadataConfiguration",
1023 "GetBucketMetadataConfiguration",
1024 "DeleteBucketMetadataConfiguration",
1025 "CreateBucketMetadataTableConfiguration",
1026 "GetBucketMetadataTableConfiguration",
1027 "DeleteBucketMetadataTableConfiguration",
1028 "UpdateBucketMetadataInventoryTableConfiguration",
1029 "UpdateBucketMetadataJournalTableConfiguration",
1030 "GetObjectTorrent",
1031 "RenameObject",
1032 "SelectObjectContent",
1033 "UpdateObjectEncryption",
1034 "WriteGetObjectResponse",
1035 "ListDirectoryBuckets",
1036 "CreateSession",
1037 "CreateMultipartUpload",
1039 "UploadPart",
1040 "UploadPartCopy",
1041 "CompleteMultipartUpload",
1042 "AbortMultipartUpload",
1043 "ListParts",
1044 "ListMultipartUploads",
1045 ]
1046 }
1047
1048 fn iam_enforceable(&self) -> bool {
1049 true
1050 }
1051
1052 fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1062 let bucket = request.path_segments.first().map(|s| s.as_str());
1067 let key = if request.path_segments.len() > 1 {
1068 Some(request.path_segments[1..].join("/"))
1069 } else {
1070 None
1071 };
1072 let action = s3_detect_action(
1073 request.method.as_str(),
1074 bucket,
1075 key.as_deref(),
1076 &request.query_params,
1077 )?;
1078 let service = match action {
1082 "CreateSession" => "s3express",
1083 "WriteGetObjectResponse" => "s3-object-lambda",
1084 _ => "s3",
1085 };
1086 let resource = s3_resource_for(action, bucket, key.as_deref());
1087 Some(fakecloud_core::auth::IamAction {
1088 service,
1089 action,
1090 resource,
1091 })
1092 }
1093
1094 fn iam_condition_keys_for(
1095 &self,
1096 request: &AwsRequest,
1097 action: &fakecloud_core::auth::IamAction,
1098 ) -> std::collections::BTreeMap<String, Vec<String>> {
1099 s3_condition_keys(action.action, &request.query_params)
1100 }
1101
1102 fn resource_tags_for(
1103 &self,
1104 resource_arn: &str,
1105 ) -> Option<std::collections::HashMap<String, String>> {
1106 s3_resource_tags(&self.state, resource_arn)
1107 .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1108 }
1109
1110 fn request_tags_from(
1111 &self,
1112 request: &AwsRequest,
1113 action: &str,
1114 ) -> Option<std::collections::HashMap<String, String>> {
1115 s3_request_tags(request, action)
1116 .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1117 }
1118}
1119
/// Builds the S3-specific IAM condition keys for a request.
///
/// Only `ListObjects` and `ListObjectsV2` contribute keys: each of the
/// `prefix`, `delimiter` and `max-keys` query parameters that is present is
/// exposed as `s3:prefix`, `s3:delimiter` and `s3:max-keys` respectively, with
/// a single-element value list. Every other action yields an empty map.
fn s3_condition_keys(
    action: &str,
    query: &std::collections::HashMap<String, String>,
) -> std::collections::BTreeMap<String, Vec<String>> {
    // (query parameter, IAM condition key)
    const LIST_CONDITION_PARAMS: [(&str, &str); 3] = [
        ("prefix", "s3:prefix"),
        ("delimiter", "s3:delimiter"),
        ("max-keys", "s3:max-keys"),
    ];

    if action != "ListObjects" && action != "ListObjectsV2" {
        return std::collections::BTreeMap::new();
    }

    LIST_CONDITION_PARAMS
        .iter()
        .filter_map(|(param, condition_key)| {
            query
                .get(*param)
                .map(|value| (condition_key.to_string(), vec![value.clone()]))
        })
        .collect()
}
1145
1146fn s3_resource_tags(
1152 state: &SharedS3State,
1153 resource_arn: &str,
1154) -> Option<std::collections::BTreeMap<String, String>> {
1155 if resource_arn == "*" {
1156 return Some(std::collections::BTreeMap::new());
1157 }
1158 let after_prefix = resource_arn.strip_prefix("arn:aws:s3:::")?;
1160 let mas = state.read();
1161 let bucket_name = after_prefix.split('/').next().unwrap_or(after_prefix);
1163 let state = mas
1164 .find_account(|s| s.buckets.contains_key(bucket_name))
1165 .and_then(|id| mas.get(id))
1166 .or_else(|| Some(mas.default_ref()))?;
1167 if let Some(slash_pos) = after_prefix.find('/') {
1168 let bucket_name = &after_prefix[..slash_pos];
1170 let key = &after_prefix[slash_pos + 1..];
1171 let bucket = state.buckets.get(bucket_name)?;
1172 if let Some(obj) = bucket.objects.get(key) {
1174 Some(obj.tags.clone())
1175 } else if let Some(versions) = bucket.object_versions.get(key) {
1176 versions.last().map(|v| v.tags.clone())
1177 } else {
1178 Some(std::collections::BTreeMap::new())
1180 }
1181 } else {
1182 let bucket = state.buckets.get(after_prefix)?;
1184 Some(bucket.tags.clone())
1185 }
1186}
1187
1188fn s3_request_tags(
1194 request: &AwsRequest,
1195 action: &str,
1196) -> Option<std::collections::BTreeMap<String, String>> {
1197 match action {
1198 "PutObject" | "CopyObject" | "CreateMultipartUpload" => {
1199 if let Some(tagging) = request.headers.get("x-amz-tagging") {
1201 let tags = parse_url_encoded_tags(tagging.to_str().unwrap_or(""));
1202 Some(tags.into_iter().collect())
1203 } else {
1204 Some(std::collections::BTreeMap::new())
1205 }
1206 }
1207 "PutBucketTagging" | "PutObjectTagging" => {
1208 let body = std::str::from_utf8(&request.body).unwrap_or("");
1210 let tags = parse_tagging_xml(body);
1211 Some(tags.into_iter().collect())
1212 }
1213 _ => Some(std::collections::BTreeMap::new()),
1214 }
1215}
1216
/// Classifies an S3 REST request into an action name.
///
/// Classification uses only the HTTP method, whether a bucket / key is present,
/// and which query parameters exist (their values are ignored, except that
/// only their presence matters). Rules are evaluated in a fixed order and the
/// first match wins:
///
/// 1. No bucket: `GET` is `ListBuckets`, everything else is `None`.
/// 2. `POST /WriteGetObjectResponse` without a key.
/// 3. Multipart-upload operations (`uploads`, `uploadId`, `partNumber`).
/// 4. With a key: object sub-resources, then plain object operations.
/// 5. Without a key: bucket sub-resources, then plain bucket operations.
///
/// For most sub-resources the query parameter alone claims the request, so an
/// unsupported method yields `None`; a few rules only apply for one method and
/// otherwise fall through to later rules.
fn s3_detect_action(
    method: &str,
    bucket: Option<&str>,
    key: Option<&str>,
    query: &std::collections::HashMap<String, String>,
) -> Option<&'static str> {
    // Action names for GET / PUT / DELETE; a `None` slot rejects that method.
    type Verbs = [Option<&'static str>; 3];

    // Object-level sub-resources, in evaluation order.
    const OBJECT_SUBRESOURCES: &[(&str, Verbs)] = &[
        (
            "tagging",
            [
                Some("GetObjectTagging"),
                Some("PutObjectTagging"),
                Some("DeleteObjectTagging"),
            ],
        ),
        ("acl", [Some("GetObjectAcl"), Some("PutObjectAcl"), None]),
        (
            "retention",
            [Some("GetObjectRetention"), Some("PutObjectRetention"), None],
        ),
        (
            "legal-hold",
            [Some("GetObjectLegalHold"), Some("PutObjectLegalHold"), None],
        ),
    ];

    // Bucket-level sub-resources, in evaluation order.
    const BUCKET_SUBRESOURCES: &[(&str, Verbs)] = &[
        (
            "tagging",
            [
                Some("GetBucketTagging"),
                Some("PutBucketTagging"),
                Some("DeleteBucketTagging"),
            ],
        ),
        ("acl", [Some("GetBucketAcl"), Some("PutBucketAcl"), None]),
        (
            "versioning",
            [Some("GetBucketVersioning"), Some("PutBucketVersioning"), None],
        ),
        (
            "cors",
            [
                Some("GetBucketCors"),
                Some("PutBucketCors"),
                Some("DeleteBucketCors"),
            ],
        ),
        (
            "policy",
            [
                Some("GetBucketPolicy"),
                Some("PutBucketPolicy"),
                Some("DeleteBucketPolicy"),
            ],
        ),
        (
            "website",
            [
                Some("GetBucketWebsite"),
                Some("PutBucketWebsite"),
                Some("DeleteBucketWebsite"),
            ],
        ),
        (
            "lifecycle",
            [
                Some("GetBucketLifecycleConfiguration"),
                Some("PutBucketLifecycleConfiguration"),
                Some("DeleteBucketLifecycle"),
            ],
        ),
        (
            "encryption",
            [
                Some("GetBucketEncryption"),
                Some("PutBucketEncryption"),
                Some("DeleteBucketEncryption"),
            ],
        ),
        (
            "logging",
            [Some("GetBucketLogging"), Some("PutBucketLogging"), None],
        ),
        (
            "notification",
            [
                Some("GetBucketNotificationConfiguration"),
                Some("PutBucketNotificationConfiguration"),
                None,
            ],
        ),
        (
            "replication",
            [
                Some("GetBucketReplication"),
                Some("PutBucketReplication"),
                Some("DeleteBucketReplication"),
            ],
        ),
        (
            "ownershipControls",
            [
                Some("GetBucketOwnershipControls"),
                Some("PutBucketOwnershipControls"),
                Some("DeleteBucketOwnershipControls"),
            ],
        ),
        (
            "publicAccessBlock",
            [
                Some("GetPublicAccessBlock"),
                Some("PutPublicAccessBlock"),
                Some("DeletePublicAccessBlock"),
            ],
        ),
        (
            "accelerate",
            [
                Some("GetBucketAccelerateConfiguration"),
                Some("PutBucketAccelerateConfiguration"),
                None,
            ],
        ),
        (
            "inventory",
            [
                Some("GetBucketInventoryConfiguration"),
                Some("PutBucketInventoryConfiguration"),
                Some("DeleteBucketInventoryConfiguration"),
            ],
        ),
    ];

    // (query parameter, action for GET without `id`, per-method actions).
    const LISTABLE_CONFIGS: &[(&str, &str, Verbs)] = &[
        (
            "analytics",
            "ListBucketAnalyticsConfigurations",
            [
                Some("GetBucketAnalyticsConfiguration"),
                Some("PutBucketAnalyticsConfiguration"),
                Some("DeleteBucketAnalyticsConfiguration"),
            ],
        ),
        (
            "intelligent-tiering",
            "ListBucketIntelligentTieringConfigurations",
            [
                Some("GetBucketIntelligentTieringConfiguration"),
                Some("PutBucketIntelligentTieringConfiguration"),
                Some("DeleteBucketIntelligentTieringConfiguration"),
            ],
        ),
        (
            "metrics",
            "ListBucketMetricsConfigurations",
            [
                Some("GetBucketMetricsConfiguration"),
                Some("PutBucketMetricsConfiguration"),
                Some("DeleteBucketMetricsConfiguration"),
            ],
        ),
    ];

    // (query parameter, GET action, POST action, DELETE action).
    const METADATA_CONFIGS: &[(&str, &str, &str, &str)] = &[
        (
            "metadataConfiguration",
            "GetBucketMetadataConfiguration",
            "CreateBucketMetadataConfiguration",
            "DeleteBucketMetadataConfiguration",
        ),
        (
            "metadataTable",
            "GetBucketMetadataTableConfiguration",
            "CreateBucketMetadataTableConfiguration",
            "DeleteBucketMetadataTableConfiguration",
        ),
    ];

    let has = |param: &str| query.contains_key(param);
    let pick = |verbs: Verbs| match method {
        "GET" => verbs[0],
        "PUT" => verbs[1],
        "DELETE" => verbs[2],
        _ => None,
    };

    // Service-level request: only listing buckets is recognised.
    let Some(bucket) = bucket else {
        return (method == "GET").then_some("ListBuckets");
    };
    let has_key = key.is_some();

    if method == "POST" && bucket == "WriteGetObjectResponse" && !has_key {
        return Some("WriteGetObjectResponse");
    }

    // Multipart-upload operations take precedence over every sub-resource.
    let multipart = match (method, has_key) {
        ("POST", true) if has("uploads") => Some("CreateMultipartUpload"),
        ("POST", true) if has("uploadId") => Some("CompleteMultipartUpload"),
        ("PUT", true) if has("partNumber") && has("uploadId") => Some("UploadPart"),
        ("DELETE", true) if has("uploadId") => Some("AbortMultipartUpload"),
        ("GET", true) if has("uploadId") => Some("ListParts"),
        ("GET", false) if has("uploads") => Some("ListMultipartUploads"),
        _ => None,
    };
    if multipart.is_some() {
        return multipart;
    }

    if has_key {
        for &(param, verbs) in OBJECT_SUBRESOURCES {
            if has(param) {
                return pick(verbs);
            }
        }
        // Method-specific sub-resources first, then the plain object operation.
        return match method {
            "GET" if has("attributes") => Some("GetObjectAttributes"),
            "GET" if has("torrent") => Some("GetObjectTorrent"),
            "GET" => Some("GetObject"),
            "POST" if has("restore") => Some("RestoreObject"),
            // NOTE(review): `?select-type` is reported as `GetObject`; presumably the
            // select operation is authorized as an object read — confirm.
            "POST" if has("select-type") => Some("GetObject"),
            "PUT" if has("renameObject") => Some("RenameObject"),
            // NOTE(review): `?encryption` on an object is reported as `PutObject`,
            // same as a plain PUT; kept as a separate arm to make that explicit.
            "PUT" if has("encryption") => Some("PutObject"),
            "PUT" => Some("PutObject"),
            "DELETE" => Some("DeleteObject"),
            "HEAD" => Some("HeadObject"),
            _ => None,
        };
    }

    for &(param, verbs) in BUCKET_SUBRESOURCES {
        if has(param) {
            return pick(verbs);
        }
    }
    for &(param, list_action, verbs) in LISTABLE_CONFIGS {
        if has(param) {
            // GET without an `id` lists all configurations; with `id` it reads one.
            return if method == "GET" && !has("id") {
                Some(list_action)
            } else {
                pick(verbs)
            };
        }
    }
    if has("requestPayment") {
        return pick([
            Some("GetBucketRequestPayment"),
            Some("PutBucketRequestPayment"),
            None,
        ]);
    }
    // Only claims GET; other methods fall through to the rules below.
    if method == "GET" && has("policyStatus") {
        return Some("GetBucketPolicyStatus");
    }
    for &(param, get_action, create_action, delete_action) in METADATA_CONFIGS {
        if has(param) {
            return match method {
                "GET" => Some(get_action),
                "POST" => Some(create_action),
                "DELETE" => Some(delete_action),
                _ => None,
            };
        }
    }
    if method == "PUT" && has("metadataInventoryTable") {
        return Some("UpdateBucketMetadataInventoryTableConfiguration");
    }
    if method == "PUT" && has("metadataJournalTable") {
        return Some("UpdateBucketMetadataJournalTableConfiguration");
    }
    if has("abac") {
        return pick([
            Some("GetBucketAbacConfiguration"),
            Some("PutBucketAbacConfiguration"),
            None,
        ]);
    }
    if method == "GET" && has("session") {
        return Some("CreateSession");
    }
    if has("object-lock") {
        return pick([
            Some("GetObjectLockConfiguration"),
            Some("PutObjectLockConfiguration"),
            None,
        ]);
    }
    // `?location` is classified regardless of the HTTP method.
    if has("location") {
        return Some("GetBucketLocation");
    }
    if method == "POST" && has("delete") {
        return Some("DeleteObjects");
    }
    if method == "GET" && has("versions") {
        return Some("ListObjectVersions");
    }

    match method {
        "GET" if has("list-type") => Some("ListObjectsV2"),
        "GET" => Some("ListObjects"),
        "PUT" => Some("CreateBucket"),
        "DELETE" => Some("DeleteBucket"),
        "HEAD" => Some("HeadBucket"),
        _ => None,
    }
}
1596
1597fn s3_resource_for(action: &'static str, bucket: Option<&str>, key: Option<&str>) -> String {
1601 const OBJECT_ACTIONS: &[&str] = &[
1603 "PutObject",
1604 "GetObject",
1605 "DeleteObject",
1606 "HeadObject",
1607 "CopyObject",
1608 "GetObjectAttributes",
1609 "RestoreObject",
1610 "PutObjectTagging",
1611 "GetObjectTagging",
1612 "DeleteObjectTagging",
1613 "PutObjectAcl",
1614 "GetObjectAcl",
1615 "PutObjectRetention",
1616 "GetObjectRetention",
1617 "PutObjectLegalHold",
1618 "GetObjectLegalHold",
1619 "CreateMultipartUpload",
1620 "UploadPart",
1621 "UploadPartCopy",
1622 "CompleteMultipartUpload",
1623 "AbortMultipartUpload",
1624 "ListParts",
1625 ];
1626 if action == "ListBuckets" {
1627 return "*".to_string();
1628 }
1629 let Some(bucket) = bucket else {
1630 return "*".to_string();
1631 };
1632 if OBJECT_ACTIONS.contains(&action) {
1633 match key {
1634 Some(k) if !k.is_empty() => Arn::s3(&format!("{bucket}/{k}")).to_string(),
1635 _ => Arn::s3(&format!("{bucket}/*")).to_string(),
1636 }
1637 } else {
1638 Arn::s3(bucket).to_string()
1640 }
1641}
1642
1643pub(crate) fn truncate_to_seconds(dt: DateTime<Utc>) -> DateTime<Utc> {
1647 dt.with_nanosecond(0).unwrap_or(dt)
1648}
1649
1650pub(crate) fn check_get_conditionals(
1651 req: &AwsRequest,
1652 obj: &S3Object,
1653) -> Result<(), AwsServiceError> {
1654 let obj_etag = format!("\"{}\"", obj.etag);
1655 let obj_time = truncate_to_seconds(obj.last_modified);
1656
1657 if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1659 if !etag_matches(if_match, &obj_etag) {
1660 return Err(precondition_failed("If-Match"));
1661 }
1662 }
1663
1664 if let Some(if_none_match) = req
1666 .headers
1667 .get("if-none-match")
1668 .and_then(|v| v.to_str().ok())
1669 {
1670 if etag_matches(if_none_match, &obj_etag) {
1671 return Err(not_modified_with_etag(&obj_etag));
1672 }
1673 }
1674
1675 if let Some(since) = req
1677 .headers
1678 .get("if-unmodified-since")
1679 .and_then(|v| v.to_str().ok())
1680 {
1681 if let Some(dt) = parse_http_date(since) {
1682 if obj_time > dt {
1683 return Err(precondition_failed("If-Unmodified-Since"));
1684 }
1685 }
1686 }
1687
1688 if let Some(since) = req
1690 .headers
1691 .get("if-modified-since")
1692 .and_then(|v| v.to_str().ok())
1693 {
1694 if let Some(dt) = parse_http_date(since) {
1695 if obj_time <= dt {
1696 return Err(not_modified());
1697 }
1698 }
1699 }
1700
1701 Ok(())
1702}
1703
1704pub(crate) fn check_head_conditionals(
1705 req: &AwsRequest,
1706 obj: &S3Object,
1707) -> Result<(), AwsServiceError> {
1708 let obj_etag = format!("\"{}\"", obj.etag);
1709 let obj_time = truncate_to_seconds(obj.last_modified);
1710
1711 if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1713 if !etag_matches(if_match, &obj_etag) {
1714 return Err(AwsServiceError::aws_error(
1715 StatusCode::PRECONDITION_FAILED,
1716 "412",
1717 "Precondition Failed",
1718 ));
1719 }
1720 }
1721
1722 if let Some(if_none_match) = req
1724 .headers
1725 .get("if-none-match")
1726 .and_then(|v| v.to_str().ok())
1727 {
1728 if etag_matches(if_none_match, &obj_etag) {
1729 return Err(not_modified_with_etag(&obj_etag));
1730 }
1731 }
1732
1733 if let Some(since) = req
1735 .headers
1736 .get("if-unmodified-since")
1737 .and_then(|v| v.to_str().ok())
1738 {
1739 if let Some(dt) = parse_http_date(since) {
1740 if obj_time > dt {
1741 return Err(AwsServiceError::aws_error(
1742 StatusCode::PRECONDITION_FAILED,
1743 "412",
1744 "Precondition Failed",
1745 ));
1746 }
1747 }
1748 }
1749
1750 if let Some(since) = req
1752 .headers
1753 .get("if-modified-since")
1754 .and_then(|v| v.to_str().ok())
1755 {
1756 if let Some(dt) = parse_http_date(since) {
1757 if obj_time <= dt {
1758 return Err(not_modified());
1759 }
1760 }
1761 }
1762
1763 Ok(())
1764}
1765
/// Tests whether `obj_etag` satisfies an `If-Match` / `If-None-Match` style
/// condition.
///
/// `condition` is either `*` (matches anything) or a comma-separated list of
/// entity tags. Double quotes are stripped from both sides before comparing,
/// so quoted and unquoted forms are interchangeable.
pub(crate) fn etag_matches(condition: &str, obj_etag: &str) -> bool {
    let unquote = |tag: &str| tag.replace('"', "");

    let wanted = condition.trim();
    if wanted == "*" {
        return true;
    }

    let actual = unquote(obj_etag);
    wanted
        .split(',')
        .any(|candidate| unquote(candidate.trim()) == actual)
}
1781
1782pub(crate) fn parse_http_date(s: &str) -> Option<DateTime<Utc>> {
1783 if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
1785 return Some(dt.with_timezone(&Utc));
1786 }
1787 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1789 return Some(dt.with_timezone(&Utc));
1790 }
1791 if let Ok(dt) =
1793 chrono::NaiveDateTime::parse_from_str(s.trim_end_matches(" GMT"), "%a, %d %b %Y %H:%M:%S")
1794 {
1795 return Some(dt.and_utc());
1796 }
1797 if let Ok(dt) = s.parse::<DateTime<Utc>>() {
1799 return Some(dt);
1800 }
1801 None
1802}
1803
1804pub(crate) fn not_modified() -> AwsServiceError {
1805 AwsServiceError::aws_error(StatusCode::NOT_MODIFIED, "304", "Not Modified")
1806}
1807
1808pub(crate) fn not_modified_with_etag(etag: &str) -> AwsServiceError {
1809 AwsServiceError::aws_error_with_headers(
1810 StatusCode::NOT_MODIFIED,
1811 "304",
1812 "Not Modified",
1813 vec![("etag".to_string(), etag.to_string())],
1814 )
1815}
1816
1817pub(crate) fn precondition_failed(condition: &str) -> AwsServiceError {
1818 AwsServiceError::aws_error_with_fields(
1819 StatusCode::PRECONDITION_FAILED,
1820 "PreconditionFailed",
1821 "At least one of the pre-conditions you specified did not hold",
1822 vec![("Condition".to_string(), condition.to_string())],
1823 )
1824}
1825
1826pub(crate) fn build_acl_xml(owner_id: &str, grants: &[AclGrant], _account_id: &str) -> String {
1829 let mut grants_xml = String::new();
1830 for g in grants {
1831 let grantee_xml = if g.grantee_type == "Group" {
1832 let uri = g.grantee_uri.as_deref().unwrap_or("");
1833 format!(
1834 "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
1835 <URI>{}</URI></Grantee>",
1836 xml_escape(uri),
1837 )
1838 } else {
1839 let id = g.grantee_id.as_deref().unwrap_or("");
1840 format!(
1841 "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
1842 <ID>{}</ID></Grantee>",
1843 xml_escape(id),
1844 )
1845 };
1846 grants_xml.push_str(&format!(
1847 "<Grant>{grantee_xml}<Permission>{}</Permission></Grant>",
1848 xml_escape(&g.permission),
1849 ));
1850 }
1851
1852 format!(
1853 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1854 <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1855 <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1856 <AccessControlList>{grants_xml}</AccessControlList>\
1857 </AccessControlPolicy>",
1858 owner_id = xml_escape(owner_id),
1859 )
1860}
1861
1862pub(crate) fn canned_acl_grants(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1863 let owner_grant = AclGrant {
1864 grantee_type: "CanonicalUser".to_string(),
1865 grantee_id: Some(owner_id.to_string()),
1866 grantee_display_name: Some(owner_id.to_string()),
1867 grantee_uri: None,
1868 permission: "FULL_CONTROL".to_string(),
1869 };
1870 match acl {
1871 "private" => vec![owner_grant],
1872 "public-read" => vec![
1873 owner_grant,
1874 AclGrant {
1875 grantee_type: "Group".to_string(),
1876 grantee_id: None,
1877 grantee_display_name: None,
1878 grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1879 permission: "READ".to_string(),
1880 },
1881 ],
1882 "public-read-write" => vec![
1883 owner_grant,
1884 AclGrant {
1885 grantee_type: "Group".to_string(),
1886 grantee_id: None,
1887 grantee_display_name: None,
1888 grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1889 permission: "READ".to_string(),
1890 },
1891 AclGrant {
1892 grantee_type: "Group".to_string(),
1893 grantee_id: None,
1894 grantee_display_name: None,
1895 grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1896 permission: "WRITE".to_string(),
1897 },
1898 ],
1899 "authenticated-read" => vec![
1900 owner_grant,
1901 AclGrant {
1902 grantee_type: "Group".to_string(),
1903 grantee_id: None,
1904 grantee_display_name: None,
1905 grantee_uri: Some(
1906 "http://acs.amazonaws.com/groups/global/AuthenticatedUsers".to_string(),
1907 ),
1908 permission: "READ".to_string(),
1909 },
1910 ],
1911 "bucket-owner-full-control" => vec![owner_grant],
1912 _ => vec![owner_grant],
1913 }
1914}
1915
/// Expands a canned ACL name into the grant list for an object.
///
/// Currently a thin wrapper: it forwards both arguments to
/// [`canned_acl_grants`] unchanged and returns its result.
pub(crate) fn canned_acl_grants_for_object(acl: &str, owner_id: &str) -> Vec<AclGrant> {
    canned_acl_grants(acl, owner_id)
}
1920
1921pub(crate) fn parse_grant_headers(headers: &HeaderMap) -> Vec<AclGrant> {
1922 let mut grants = Vec::new();
1923 let header_permission_map = [
1924 ("x-amz-grant-read", "READ"),
1925 ("x-amz-grant-write", "WRITE"),
1926 ("x-amz-grant-read-acp", "READ_ACP"),
1927 ("x-amz-grant-write-acp", "WRITE_ACP"),
1928 ("x-amz-grant-full-control", "FULL_CONTROL"),
1929 ];
1930
1931 for (header, permission) in &header_permission_map {
1932 if let Some(value) = headers.get(*header).and_then(|v| v.to_str().ok()) {
1933 for part in value.split(',') {
1935 let part = part.trim();
1936 if let Some((key, val)) = part.split_once('=') {
1937 let val = val.trim().trim_matches('"');
1938 let key = key.trim().to_lowercase();
1939 match key.as_str() {
1940 "id" => {
1941 grants.push(AclGrant {
1942 grantee_type: "CanonicalUser".to_string(),
1943 grantee_id: Some(val.to_string()),
1944 grantee_display_name: Some(val.to_string()),
1945 grantee_uri: None,
1946 permission: permission.to_string(),
1947 });
1948 }
1949 "uri" | "url" => {
1950 grants.push(AclGrant {
1951 grantee_type: "Group".to_string(),
1952 grantee_id: None,
1953 grantee_display_name: None,
1954 grantee_uri: Some(val.to_string()),
1955 permission: permission.to_string(),
1956 });
1957 }
1958 _ => {}
1959 }
1960 }
1961 }
1962 }
1963 }
1964 grants
1965}
1966
1967pub(crate) fn parse_acl_xml(xml: &str) -> Result<Vec<AclGrant>, AwsServiceError> {
1968 if xml.contains("<AccessControlPolicy") && !xml.contains("<Owner>") {
1970 return Err(AwsServiceError::aws_error(
1971 StatusCode::BAD_REQUEST,
1972 "MalformedACLError",
1973 "The XML you provided was not well-formed or did not validate against our published schema",
1974 ));
1975 }
1976
1977 let valid_permissions = ["READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL"];
1978
1979 let mut grants = Vec::new();
1980 let mut remaining = xml;
1981 while let Some(start) = remaining.find("<Grant>") {
1982 let after = &remaining[start + 7..];
1983 if let Some(end) = after.find("</Grant>") {
1984 let grant_body = &after[..end];
1985
1986 let permission = extract_xml_value(grant_body, "Permission").unwrap_or_default();
1988 if !valid_permissions.contains(&permission.as_str()) {
1989 return Err(AwsServiceError::aws_error(
1990 StatusCode::BAD_REQUEST,
1991 "MalformedACLError",
1992 "The XML you provided was not well-formed or did not validate against our published schema",
1993 ));
1994 }
1995
1996 if grant_body.contains("xsi:type=\"Group\"") || grant_body.contains("<URI>") {
1998 let uri = extract_xml_value(grant_body, "URI").unwrap_or_default();
1999 grants.push(AclGrant {
2000 grantee_type: "Group".to_string(),
2001 grantee_id: None,
2002 grantee_display_name: None,
2003 grantee_uri: Some(uri),
2004 permission,
2005 });
2006 } else {
2007 let id = extract_xml_value(grant_body, "ID").unwrap_or_default();
2008 let display =
2009 extract_xml_value(grant_body, "DisplayName").unwrap_or_else(|| id.clone());
2010 grants.push(AclGrant {
2011 grantee_type: "CanonicalUser".to_string(),
2012 grantee_id: Some(id),
2013 grantee_display_name: Some(display),
2014 grantee_uri: None,
2015 permission,
2016 });
2017 }
2018
2019 remaining = &after[end + 8..];
2020 } else {
2021 break;
2022 }
2023 }
2024 Ok(grants)
2025}
2026
/// Outcome of interpreting a `Range` header against an object of known size.
pub(crate) enum RangeResult {
    /// The range selects the inclusive byte span `start..=end`.
    Satisfiable { start: usize, end: usize },
    /// The range cannot be served for an object of this size.
    NotSatisfiable,
    /// The range is syntactically valid but its end precedes its start.
    /// NOTE(review): presumably callers then serve the whole object — confirm.
    Ignored,
}

/// Interprets a single-range `bytes=` header for an object of `total_size` bytes.
///
/// Supported forms are `bytes=a-b`, `bytes=a-` and the suffix form `bytes=-n`.
/// An end offset past the last byte is clamped to the last byte, and a suffix
/// longer than the object selects the whole object.
///
/// Returns `None` when the header does not start with `bytes=`, has no `-`, or
/// contains a bound that is not a valid `usize`. Note that a start at or
/// beyond `total_size` is reported as `NotSatisfiable` before the end bound is
/// parsed at all.
pub(crate) fn parse_range_header(range_str: &str, total_size: usize) -> Option<RangeResult> {
    let spec = range_str.strip_prefix("bytes=")?;
    let (first, last) = spec.split_once('-')?;

    // Suffix form: the final `n` bytes of the object.
    if first.is_empty() {
        let suffix_len = last.parse::<usize>().ok()?;
        return Some(match (suffix_len, total_size) {
            (0, _) | (_, 0) => RangeResult::NotSatisfiable,
            _ => RangeResult::Satisfiable {
                start: total_size.saturating_sub(suffix_len),
                end: total_size - 1,
            },
        });
    }

    let start = first.parse::<usize>().ok()?;
    if start >= total_size {
        return Some(RangeResult::NotSatisfiable);
    }

    // `start < total_size` guarantees the object is non-empty here.
    let last_byte = total_size - 1;
    let end = match last {
        "" => last_byte,
        explicit => {
            let requested = explicit.parse::<usize>().ok()?;
            if requested < start {
                return Some(RangeResult::Ignored);
            }
            requested.min(last_byte)
        }
    };
    Some(RangeResult::Satisfiable { start, end })
}
2065
2066pub(crate) fn s3_xml(status: StatusCode, body: impl Into<Bytes>) -> AwsResponse {
2070 AwsResponse {
2071 status,
2072 content_type: "application/xml".to_string(),
2073 body: body.into().into(),
2074 headers: HeaderMap::new(),
2075 }
2076}
2077
2078pub(crate) fn empty_response(status: StatusCode) -> AwsResponse {
2079 AwsResponse {
2080 status,
2081 content_type: "application/xml".to_string(),
2082 body: Bytes::new().into(),
2083 headers: HeaderMap::new(),
2084 }
2085}
2086
2087pub(crate) fn is_frozen(obj: &S3Object) -> bool {
2090 matches!(obj.storage_class.as_str(), "GLACIER" | "DEEP_ARCHIVE")
2091 && obj.restore_ongoing != Some(false)
2092}
2093
2094pub(crate) fn no_such_bucket(bucket: &str) -> AwsServiceError {
2095 AwsServiceError::aws_error_with_fields(
2096 StatusCode::NOT_FOUND,
2097 "NoSuchBucket",
2098 "The specified bucket does not exist",
2099 vec![("BucketName".to_string(), bucket.to_string())],
2100 )
2101}
2102
2103pub(crate) fn no_such_key(key: &str) -> AwsServiceError {
2104 AwsServiceError::aws_error_with_fields(
2105 StatusCode::NOT_FOUND,
2106 "NoSuchKey",
2107 "The specified key does not exist.",
2108 vec![("Key".to_string(), key.to_string())],
2109 )
2110}
2111
2112pub(crate) fn no_such_upload(upload_id: &str) -> AwsServiceError {
2113 AwsServiceError::aws_error_with_fields(
2114 StatusCode::NOT_FOUND,
2115 "NoSuchUpload",
2116 "The specified upload does not exist. The upload ID may be invalid, \
2117 or the upload may have been aborted or completed.",
2118 vec![("UploadId".to_string(), upload_id.to_string())],
2119 )
2120}
2121
2122pub(crate) fn no_such_key_with_detail(key: &str) -> AwsServiceError {
2123 AwsServiceError::aws_error_with_fields(
2124 StatusCode::NOT_FOUND,
2125 "NoSuchKey",
2126 "The specified key does not exist.",
2127 vec![("Key".to_string(), key.to_string())],
2128 )
2129}
2130
2131pub(crate) fn compute_md5(data: &[u8]) -> String {
2132 let digest = Md5::digest(data);
2133 format!("{:x}", digest)
2134}
2135
2136pub(crate) fn compute_checksum(algorithm: &str, data: &[u8]) -> String {
2137 let raw = compute_checksum_raw(algorithm, data);
2138 if raw.is_empty() {
2139 String::new()
2140 } else {
2141 BASE64.encode(raw)
2142 }
2143}
2144
2145pub(crate) fn compute_checksum_raw(algorithm: &str, data: &[u8]) -> Vec<u8> {
2152 match algorithm {
2153 "CRC32" => crc32fast::hash(data).to_be_bytes().to_vec(),
2154 "CRC32C" => crc32c::crc32c(data).to_be_bytes().to_vec(),
2159 "CRC64NVME" => {
2160 let mut hasher = crc64fast_nvme::Digest::new();
2161 hasher.write(data);
2162 hasher.sum64().to_be_bytes().to_vec()
2163 }
2164 "SHA1" => {
2165 use sha1::Digest as _;
2166 sha1::Sha1::digest(data).to_vec()
2167 }
2168 "SHA256" => {
2169 use sha2::Digest as _;
2170 sha2::Sha256::digest(data).to_vec()
2171 }
2172 _ => Vec::new(),
2173 }
2174}
2175
2176pub(crate) fn compute_composite_checksum(
2182 algorithm: &str,
2183 part_raw_digests: &[Vec<u8>],
2184) -> Option<String> {
2185 if part_raw_digests.is_empty() {
2186 return None;
2187 }
2188 let mut concat = Vec::new();
2189 for d in part_raw_digests {
2190 concat.extend_from_slice(d);
2191 }
2192 let digest = compute_checksum_raw(algorithm, &concat);
2193 if digest.is_empty() {
2194 return None;
2195 }
2196 Some(format!(
2197 "{}-{}",
2198 BASE64.encode(digest),
2199 part_raw_digests.len()
2200 ))
2201}
2202
2203pub(crate) async fn compute_checksum_streaming(
2209 algorithm: &str,
2210 path: &std::path::Path,
2211) -> Result<String, std::io::Error> {
2212 use tokio::io::AsyncReadExt;
2213 let mut file = tokio::fs::File::open(path).await?;
2214 let mut buf = vec![0u8; 1024 * 1024];
2215 match algorithm {
2216 "CRC32" => {
2217 let mut hasher = crc32fast::Hasher::new();
2218 loop {
2219 let n = file.read(&mut buf).await?;
2220 if n == 0 {
2221 break;
2222 }
2223 hasher.update(&buf[..n]);
2224 }
2225 Ok(BASE64.encode(hasher.finalize().to_be_bytes()))
2226 }
2227 "CRC32C" => {
2228 let mut crc: u32 = 0;
2229 loop {
2230 let n = file.read(&mut buf).await?;
2231 if n == 0 {
2232 break;
2233 }
2234 crc = crc32c::crc32c_append(crc, &buf[..n]);
2235 }
2236 Ok(BASE64.encode(crc.to_be_bytes()))
2237 }
2238 "CRC64NVME" => {
2239 let mut hasher = crc64fast_nvme::Digest::new();
2240 loop {
2241 let n = file.read(&mut buf).await?;
2242 if n == 0 {
2243 break;
2244 }
2245 hasher.write(&buf[..n]);
2246 }
2247 Ok(BASE64.encode(hasher.sum64().to_be_bytes()))
2248 }
2249 "SHA1" => {
2250 use sha1::Digest as _;
2251 let mut hasher = sha1::Sha1::new();
2252 loop {
2253 let n = file.read(&mut buf).await?;
2254 if n == 0 {
2255 break;
2256 }
2257 hasher.update(&buf[..n]);
2258 }
2259 Ok(BASE64.encode(hasher.finalize()))
2260 }
2261 "SHA256" => {
2262 use sha2::Digest as _;
2263 let mut hasher = sha2::Sha256::new();
2264 loop {
2265 let n = file.read(&mut buf).await?;
2266 if n == 0 {
2267 break;
2268 }
2269 hasher.update(&buf[..n]);
2270 }
2271 Ok(BASE64.encode(hasher.finalize()))
2272 }
2273 _ => Ok(String::new()),
2274 }
2275}
2276
/// Percent-encodes an object key.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~`, `/` are kept as-is; every
/// other byte of the UTF-8 encoding becomes `%XX` with uppercase hex digits.
pub(crate) fn url_encode_s3_key(s: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len() * 2);
    for &byte in s.as_bytes() {
        let keep = byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~' | b'/');
        if keep {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
2291
2292pub(crate) use fakecloud_aws::xml::xml_escape;
2293
2294pub(crate) fn extract_user_metadata(
2295 headers: &HeaderMap,
2296) -> std::collections::BTreeMap<String, String> {
2297 let mut meta = std::collections::BTreeMap::new();
2298 for (name, value) in headers {
2299 if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
2300 if let Ok(v) = value.to_str() {
2301 meta.insert(key.to_string(), v.to_string());
2302 }
2303 }
2304 }
2305 meta
2306}
2307
/// Reports whether `class` is one of the accepted storage class names.
///
/// The comparison is exact and case-sensitive.
pub(crate) fn is_valid_storage_class(class: &str) -> bool {
    const STORAGE_CLASSES: [&str; 11] = [
        "STANDARD",
        "REDUCED_REDUNDANCY",
        "STANDARD_IA",
        "ONEZONE_IA",
        "INTELLIGENT_TIERING",
        "GLACIER",
        "DEEP_ARCHIVE",
        "GLACIER_IR",
        "OUTPOSTS",
        "SNOW",
        "EXPRESS_ONEZONE",
    ];
    STORAGE_CLASSES.iter().any(|known| *known == class)
}
2324
/// Reports whether `name` is an acceptable bucket name.
///
/// A name is accepted when it is 3 to 63 bytes long, begins and ends with an
/// ASCII letter or digit, and consists solely of lowercase ASCII letters,
/// digits, `-`, `.` and `_`.
pub(crate) fn is_valid_bucket_name(name: &str) -> bool {
    if !(3..=63).contains(&name.len()) {
        return false;
    }

    let bytes = name.as_bytes();
    let edge_ok = |edge: Option<&u8>| edge.map_or(false, u8::is_ascii_alphanumeric);

    // Any non-ASCII character contributes bytes >= 0x80, which never match.
    edge_ok(bytes.first())
        && edge_ok(bytes.last())
        && bytes
            .iter()
            .all(|&b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_'))
}
2338
/// Reports whether `region` is one of the region identifiers this service
/// accepts.
///
/// The comparison is exact and case-sensitive against a fixed list.
pub(crate) fn is_valid_region(region: &str) -> bool {
    const VALID_REGIONS: &[&str] = &[
        "us-east-1",
        "us-east-2",
        "us-west-1",
        "us-west-2",
        "af-south-1",
        "ap-east-1",
        "ap-south-1",
        "ap-south-2",
        "ap-southeast-1",
        "ap-southeast-2",
        "ap-southeast-3",
        "ap-southeast-4",
        "ap-northeast-1",
        "ap-northeast-2",
        "ap-northeast-3",
        "ca-central-1",
        "ca-west-1",
        "eu-central-1",
        "eu-central-2",
        "eu-west-1",
        "eu-west-2",
        "eu-west-3",
        "eu-south-1",
        "eu-south-2",
        "eu-north-1",
        "il-central-1",
        "me-south-1",
        "me-central-1",
        "sa-east-1",
        "cn-north-1",
        "cn-northwest-1",
        "us-gov-east-1",
        "us-gov-east-2",
        "us-gov-west-1",
        "us-iso-east-1",
        "us-iso-west-1",
        "us-isob-east-1",
        "us-isof-south-1",
    ];
    // The argument must be a reference to the `&str`: `contains` takes `&T`
    // where `T` is the element type `&str`.
    VALID_REGIONS.contains(&region)
}
2383
2384pub(crate) fn resolve_object<'a>(
2385 b: &'a S3Bucket,
2386 key: &str,
2387 version_id: Option<&String>,
2388) -> Result<&'a S3Object, AwsServiceError> {
2389 if let Some(vid) = version_id {
2390 if vid == "null" {
2392 if let Some(versions) = b.object_versions.get(key) {
2394 if let Some(obj) = versions
2395 .iter()
2396 .find(|o| o.version_id.is_none() || o.version_id.as_deref() == Some("null"))
2397 {
2398 return Ok(obj);
2399 }
2400 }
2401 if let Some(obj) = b.objects.get(key) {
2403 if obj.version_id.is_none() || obj.version_id.as_deref() == Some("null") {
2404 return Ok(obj);
2405 }
2406 }
2407 } else {
2408 if let Some(versions) = b.object_versions.get(key) {
2410 if let Some(obj) = versions
2411 .iter()
2412 .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
2413 {
2414 return Ok(obj);
2415 }
2416 }
2417 if let Some(obj) = b.objects.get(key) {
2419 if obj.version_id.as_deref() == Some(vid.as_str()) {
2420 return Ok(obj);
2421 }
2422 }
2423 }
2424 if b.versioning.is_some() {
2426 Err(AwsServiceError::aws_error_with_fields(
2427 StatusCode::NOT_FOUND,
2428 "NoSuchVersion",
2429 "The specified version does not exist.",
2430 vec![
2431 ("Key".to_string(), key.to_string()),
2432 ("VersionId".to_string(), vid.to_string()),
2433 ],
2434 ))
2435 } else {
2436 Err(AwsServiceError::aws_error(
2437 StatusCode::BAD_REQUEST,
2438 "InvalidArgument",
2439 "Invalid version id specified",
2440 ))
2441 }
2442 } else {
2443 b.objects.get(key).ok_or_else(|| no_such_key(key))
2444 }
2445}
2446
2447pub(crate) fn make_delete_marker(key: &str, dm_id: &str) -> S3Object {
2448 S3Object {
2449 key: key.to_string(),
2450 last_modified: Utc::now(),
2451 storage_class: "STANDARD".to_string(),
2452 version_id: Some(dm_id.to_string()),
2453 is_delete_marker: true,
2454 ..Default::default()
2455 }
2456}
2457
/// One object reference taken from the XML body of a multi-object delete
/// request.
pub(crate) struct DeleteObjectEntry {
    /// Text of the entry's `<Key>` element.
    key: String,
    /// Text of the entry's `<VersionId>` element, when one was supplied.
    version_id: Option<String>,
}
2463
2464pub(crate) fn parse_delete_objects_xml(xml: &str) -> Vec<DeleteObjectEntry> {
2465 let mut entries = Vec::new();
2466 let mut remaining = xml;
2467 while let Some(obj_start) = remaining.find("<Object>") {
2468 let after = &remaining[obj_start + 8..];
2469 if let Some(obj_end) = after.find("</Object>") {
2470 let obj_body = &after[..obj_end];
2471 let key = extract_xml_value(obj_body, "Key");
2472 let version_id = extract_xml_value(obj_body, "VersionId");
2473 if let Some(k) = key {
2474 entries.push(DeleteObjectEntry { key: k, version_id });
2475 }
2476 remaining = &after[obj_end + 9..];
2477 } else {
2478 break;
2479 }
2480 }
2481 entries
2482}
2483
2484pub(crate) fn parse_delete_objects_quiet(xml: &str) -> bool {
2487 extract_xml_value(xml, "Quiet")
2488 .map(|v| v.eq_ignore_ascii_case("true"))
2489 .unwrap_or(false)
2490}
2491
2492pub(crate) fn parse_tagging_xml(xml: &str) -> Vec<(String, String)> {
2495 let mut tags = Vec::new();
2496 let mut remaining = xml;
2497 while let Some(tag_start) = remaining.find("<Tag>") {
2498 let after = &remaining[tag_start + 5..];
2499 if let Some(tag_end) = after.find("</Tag>") {
2500 let tag_body = &after[..tag_end];
2501 let key = extract_xml_value(tag_body, "Key");
2502 let value = extract_xml_value(tag_body, "Value");
2503 if let (Some(k), Some(v)) = (key, value) {
2504 tags.push((k, v));
2505 }
2506 remaining = &after[tag_end + 6..];
2507 } else {
2508 break;
2509 }
2510 }
2511 tags
2512}
2513
2514pub(crate) fn validate_tags(tags: &[(String, String)]) -> Result<(), AwsServiceError> {
2515 let mut seen = std::collections::HashSet::new();
2517 for (k, _) in tags {
2518 if !seen.insert(k.as_str()) {
2519 return Err(AwsServiceError::aws_error(
2520 StatusCode::BAD_REQUEST,
2521 "InvalidTag",
2522 "Cannot provide multiple Tags with the same key",
2523 ));
2524 }
2525 if k.starts_with("aws:") {
2527 return Err(AwsServiceError::aws_error(
2528 StatusCode::BAD_REQUEST,
2529 "InvalidTag",
2530 "System tags cannot be added/updated by requester",
2531 ));
2532 }
2533 }
2534 Ok(())
2535}
2536
/// Returns the text content of the first `tag` element found in `xml`.
///
/// This is a plain substring scan, not an XML parser: only the exact forms
/// `<tag>`, `<tag/>` and `<tag />` are recognised (an opening tag carrying
/// attributes is not), and the returned text is not entity-decoded.
///
/// * If a self-closing form occurs before any `<tag>` (or there is no `<tag>`
///   at all), the element is empty and `Some(String::new())` is returned.
/// * Otherwise the text between the first `<tag>` and the next `</tag>` after
///   it is returned.
/// * `None` is returned when no `<tag>` exists, or when no `</tag>` follows it.
pub(crate) fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{tag}>");
    let open_pos = xml.find(&open);

    // Use the earliest self-closing occurrence regardless of which spelling it
    // uses; preferring one spelling would let a later `<tag />` hide an
    // earlier `<tag/>`.
    let self_closing_pos = [format!("<{tag} />"), format!("<{tag}/>")]
        .iter()
        .filter_map(|form| xml.find(form.as_str()))
        .min();
    if let Some(sp) = self_closing_pos {
        let precedes_open = match open_pos {
            Some(op) => sp < op,
            None => true,
        };
        if precedes_open {
            return Some(String::new());
        }
    }

    let close = format!("</{tag}>");
    let start = open_pos? + open.len();
    // Search for the closing tag only after the opening one, so a stray
    // `</tag>` earlier in the input cannot produce an inverted range.
    let end = xml[start..].find(&close)? + start;
    Some(xml[start..end].to_string())
}
2564
2565pub(crate) fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
2567 let mut parts = Vec::new();
2568 let mut remaining = xml;
2569 while let Some(part_start) = remaining.find("<Part>") {
2570 let after = &remaining[part_start + 6..];
2571 if let Some(part_end) = after.find("</Part>") {
2572 let part_body = &after[..part_end];
2573 let part_num =
2574 extract_xml_value(part_body, "PartNumber").and_then(|s| s.parse::<u32>().ok());
2575 let etag = extract_xml_value(part_body, "ETag")
2576 .map(|s| s.replace(""", "").replace('"', ""));
2577 if let (Some(num), Some(e)) = (part_num, etag) {
2578 parts.push((num, e));
2579 }
2580 remaining = &after[part_end + 7..];
2581 } else {
2582 break;
2583 }
2584 }
2585 parts
2586}
2587
2588pub(crate) fn parse_url_encoded_tags(s: &str) -> Vec<(String, String)> {
2589 let mut tags = Vec::new();
2590 for pair in s.split('&') {
2591 if pair.is_empty() {
2592 continue;
2593 }
2594 let (key, value) = match pair.find('=') {
2595 Some(pos) => (&pair[..pos], &pair[pos + 1..]),
2596 None => (pair, ""),
2597 };
2598 tags.push((
2599 percent_encoding::percent_decode_str(key)
2600 .decode_utf8_lossy()
2601 .to_string(),
2602 percent_encoding::percent_decode_str(value)
2603 .decode_utf8_lossy()
2604 .to_string(),
2605 ));
2606 }
2607 tags
2608}
2609
2610pub(crate) fn validate_lifecycle_xml(xml: &str) -> Result<(), AwsServiceError> {
2612 let malformed = || {
2613 AwsServiceError::aws_error(
2614 StatusCode::BAD_REQUEST,
2615 "MalformedXML",
2616 "The XML you provided was not well-formed or did not validate against our published schema",
2617 )
2618 };
2619
2620 let mut remaining = xml;
2621 while let Some(rule_start) = remaining.find("<Rule>") {
2622 let after = &remaining[rule_start + 6..];
2623 if let Some(rule_end) = after.find("</Rule>") {
2624 let rule_body = &after[..rule_end];
2625
2626 let has_filter = rule_body.contains("<Filter>")
2628 || rule_body.contains("<Filter/>")
2629 || rule_body.contains("<Filter />");
2630
2631 let has_prefix_outside_filter = {
2633 if !rule_body.contains("<Prefix") {
2634 false
2635 } else if !has_filter {
2636 true } else {
2638 let mut stripped = rule_body.to_string();
2640 if let Some(fs) = stripped.find("<Filter") {
2642 if let Some(fe) = stripped.find("</Filter>") {
2643 stripped = format!("{}{}", &stripped[..fs], &stripped[fe + 9..]);
2644 }
2645 }
2646 stripped.contains("<Prefix")
2647 }
2648 };
2649
2650 if !has_filter && !has_prefix_outside_filter {
2651 return Err(malformed());
2652 }
2653 if has_filter && has_prefix_outside_filter {
2655 return Err(malformed());
2656 }
2657
2658 if let Some(exp_start) = rule_body.find("<Expiration>") {
2661 if let Some(exp_end) = rule_body[exp_start..].find("</Expiration>") {
2662 let exp_body = &rule_body[exp_start..exp_start + exp_end];
2663 if exp_body.contains("<ExpiredObjectDeleteMarker>")
2664 && (exp_body.contains("<Days>") || exp_body.contains("<Date>"))
2665 {
2666 return Err(malformed());
2667 }
2668 }
2669 }
2670
2671 if has_filter {
2673 if let Some(fs) = rule_body.find("<Filter>") {
2674 if let Some(fe) = rule_body.find("</Filter>") {
2675 if fe < fs + 8 {
2680 return Err(malformed());
2681 }
2682 let filter_body = &rule_body[fs + 8..fe];
2683 let has_prefix_in_filter = filter_body.contains("<Prefix");
2684 let has_tag_in_filter = filter_body.contains("<Tag>");
2685 let has_and_in_filter = filter_body.contains("<And>");
2686 if has_prefix_in_filter && has_tag_in_filter && !has_and_in_filter {
2688 return Err(malformed());
2689 }
2690 if has_tag_in_filter && has_and_in_filter {
2692 let and_start = filter_body.find("<And>").unwrap_or(0);
2694 let tag_pos = filter_body.find("<Tag>").unwrap_or(0);
2695 if tag_pos < and_start {
2696 return Err(malformed());
2697 }
2698 }
2699 }
2700 }
2701 }
2702
2703 if rule_body.contains("<NoncurrentVersionTransition>") {
2705 let mut nvt_remaining = rule_body;
2706 while let Some(nvt_start) = nvt_remaining.find("<NoncurrentVersionTransition>") {
2707 let nvt_after = &nvt_remaining[nvt_start + 29..];
2708 if let Some(nvt_end) = nvt_after.find("</NoncurrentVersionTransition>") {
2709 let nvt_body = &nvt_after[..nvt_end];
2710 if !nvt_body.contains("<NoncurrentDays>") {
2711 return Err(malformed());
2712 }
2713 if !nvt_body.contains("<StorageClass>") {
2714 return Err(malformed());
2715 }
2716 nvt_remaining = &nvt_after[nvt_end + 30..];
2717 } else {
2718 break;
2719 }
2720 }
2721 }
2722
2723 remaining = &after[rule_end + 7..];
2724 } else {
2725 break;
2726 }
2727 }
2728
2729 Ok(())
2730}
2731
/// One `<CORSRule>` block of a bucket CORS configuration.
pub(crate) struct CorsRule {
    /// Values of the rule's `<AllowedOrigin>` elements.
    allowed_origins: Vec<String>,
    /// Values of the rule's `<AllowedMethod>` elements.
    allowed_methods: Vec<String>,
    /// Values of the rule's `<AllowedHeader>` elements.
    allowed_headers: Vec<String>,
    /// Values of the rule's `<ExposeHeader>` elements.
    expose_headers: Vec<String>,
    /// Value of `<MaxAgeSeconds>`, when present and numeric.
    max_age_seconds: Option<u32>,
}
2740
2741pub(crate) fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
2743 let mut rules = Vec::new();
2744 let mut remaining = xml;
2745 while let Some(start) = remaining.find("<CORSRule>") {
2746 let after = &remaining[start + 10..];
2747 if let Some(end) = after.find("</CORSRule>") {
2748 let block = &after[..end];
2749 let allowed_origins = extract_all_xml_values(block, "AllowedOrigin");
2750 let allowed_methods = extract_all_xml_values(block, "AllowedMethod");
2751 let allowed_headers = extract_all_xml_values(block, "AllowedHeader");
2752 let expose_headers = extract_all_xml_values(block, "ExposeHeader");
2753 let max_age_seconds =
2754 extract_xml_value(block, "MaxAgeSeconds").and_then(|s| s.parse().ok());
2755 rules.push(CorsRule {
2756 allowed_origins,
2757 allowed_methods,
2758 allowed_headers,
2759 expose_headers,
2760 max_age_seconds,
2761 });
2762 remaining = &after[end + 11..];
2763 } else {
2764 break;
2765 }
2766 }
2767 rules
2768}
2769
/// Tests a request `origin` against one allowed-origin `pattern`.
///
/// The pattern may contain a `*` wildcard, which stands for any run of
/// characters (including none):
///
/// * `*` matches every origin;
/// * `*.example.com` matches any origin ending in `.example.com`;
/// * `https://*.example.com` matches any origin that starts with `https://`
///   and ends with `.example.com`.
///
/// Only the first `*` acts as a wildcard; a pattern without `*` must equal the
/// origin exactly. Comparison is case-sensitive.
pub(crate) fn origin_matches(origin: &str, pattern: &str) -> bool {
    match pattern.split_once('*') {
        None => origin == pattern,
        Some((prefix, suffix)) => {
            // The length guard keeps the prefix and suffix from overlapping
            // inside a short origin (e.g. "ab" against "ab*b").
            origin.len() >= prefix.len() + suffix.len()
                && origin.starts_with(prefix)
                && origin.ends_with(suffix)
        }
    }
}
2781
2782pub(crate) fn find_cors_rule<'a>(
2784 rules: &'a [CorsRule],
2785 origin: &str,
2786 method: Option<&str>,
2787) -> Option<&'a CorsRule> {
2788 rules.iter().find(|rule| {
2789 let origin_ok = rule
2790 .allowed_origins
2791 .iter()
2792 .any(|o| origin_matches(origin, o));
2793 let method_ok = match method {
2794 Some(m) => rule.allowed_methods.iter().any(|am| am == m),
2795 None => true,
2796 };
2797 origin_ok && method_ok
2798 })
2799}
2800
2801pub(crate) fn check_object_lock_for_overwrite(
2804 obj: &S3Object,
2805 req: &AwsRequest,
2806) -> Option<&'static str> {
2807 if obj.lock_legal_hold.as_deref() == Some("ON") {
2809 return Some("AccessDenied");
2810 }
2811 if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
2813 if *until > Utc::now() {
2814 if mode == "COMPLIANCE" {
2815 return Some("AccessDenied");
2816 }
2817 if mode == "GOVERNANCE" {
2818 let bypass = req
2819 .headers
2820 .get("x-amz-bypass-governance-retention")
2821 .and_then(|v| v.to_str().ok())
2822 .map(|s| s.eq_ignore_ascii_case("true"))
2823 .unwrap_or(false);
2824 if !bypass {
2825 return Some("AccessDenied");
2826 }
2827 }
2828 }
2829 }
2830 None
2831}
2832
2833#[cfg(test)]
2834mod tests;
2835
#[cfg(test)]
mod s3_detect_action_tests {
    use super::s3_detect_action;
    use std::collections::HashMap;

    /// Builds an owned query-parameter map from borrowed pairs.
    fn q(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut params = HashMap::with_capacity(pairs.len());
        for &(name, value) in pairs {
            params.insert(name.to_owned(), value.to_owned());
        }
        params
    }

    #[test]
    fn select_object_content_maps_to_get_object() {
        let query = q(&[("select", ""), ("select-type", "2")]);
        let detected = s3_detect_action("POST", Some("bucket"), Some("key"), &query);
        assert_eq!(detected, Some("GetObject"));
    }

    #[test]
    fn write_get_object_response_is_detected() {
        let query = q(&[]);
        let detected = s3_detect_action("POST", Some("WriteGetObjectResponse"), None, &query);
        assert_eq!(detected, Some("WriteGetObjectResponse"));
    }

    #[test]
    fn rename_object_is_detected_with_key() {
        let query = q(&[("renameObject", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), Some("newkey"), &query);
        assert_eq!(detected, Some("RenameObject"));
    }

    #[test]
    fn update_object_encryption_maps_to_put_object() {
        let query = q(&[("encryption", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), Some("key"), &query);
        assert_eq!(detected, Some("PutObject"));
    }

    #[test]
    fn create_session_not_misdetected_as_list_objects() {
        let query = q(&[("session", "")]);
        let detected = s3_detect_action("GET", Some("bucket"), None, &query);
        assert_eq!(detected, Some("CreateSession"));
    }

    #[test]
    fn get_bucket_abac_not_misdetected_as_list_objects() {
        let query = q(&[("abac", "")]);
        let detected = s3_detect_action("GET", Some("bucket"), None, &query);
        assert_eq!(detected, Some("GetBucketAbacConfiguration"));
    }

    #[test]
    fn put_bucket_abac_still_detected() {
        let query = q(&[("abac", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), None, &query);
        assert_eq!(detected, Some("PutBucketAbacConfiguration"));
    }
}
2917
#[cfg(test)]
mod s3_iam_service_prefix_tests {
    use super::*;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Creates an `S3Service` backed by a fresh multi-account state.
    fn make_service() -> S3Service {
        let state: SharedS3State = Arc::new(RwLock::new(
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
        ));
        let delivery = Arc::new(DeliveryBus::new());
        S3Service::new(state, delivery)
    }

    /// Builds a bare S3 request for `path` carrying the given query parameters.
    fn req(method: Method, path: &str, query: &[(&str, &str)]) -> AwsRequest {
        let path_segments = path
            .split('/')
            .filter(|segment| !segment.is_empty())
            .map(String::from)
            .collect::<Vec<String>>();
        AwsRequest {
            service: "s3".to_string(),
            action: String::new(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "rid".to_string(),
            headers: http::HeaderMap::new(),
            query_params: query
                .iter()
                .map(|&(name, value)| (name.to_string(), value.to_string()))
                .collect(),
            body: bytes::Bytes::new(),
            body_stream: parking_lot::Mutex::new(None),
            path_segments,
            raw_path: path.to_string(),
            raw_query: String::new(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    #[test]
    fn create_session_uses_s3express_prefix() {
        let svc = make_service();
        let request = req(Method::GET, "/mybucket", &[("session", "")]);
        let action = svc.iam_action_for(&request).expect("must be mapped");
        assert_eq!(action.service, "s3express");
        assert_eq!(action.action, "CreateSession");
        assert_eq!(action.action_string(), "s3express:CreateSession");
    }

    #[test]
    fn write_get_object_response_uses_object_lambda_prefix() {
        let svc = make_service();
        let request = req(Method::POST, "/WriteGetObjectResponse", &[]);
        let action = svc.iam_action_for(&request).expect("must be mapped");
        assert_eq!(action.service, "s3-object-lambda");
        assert_eq!(action.action, "WriteGetObjectResponse");
        assert_eq!(
            action.action_string(),
            "s3-object-lambda:WriteGetObjectResponse"
        );
    }

    #[test]
    fn get_object_torrent_maps_to_its_own_action() {
        let svc = make_service();

        let torrent_request = req(Method::GET, "/mybucket/key.txt", &[("torrent", "")]);
        let action = svc
            .iam_action_for(&torrent_request)
            .expect("must be mapped");
        assert_eq!(action.action, "GetObjectTorrent");

        // The same path without the `torrent` parameter is an ordinary read.
        let plain_request = req(Method::GET, "/mybucket/key.txt", &[]);
        let plain = svc.iam_action_for(&plain_request).expect("must be mapped");
        assert_eq!(plain.action, "GetObject");
    }
}
3003
#[cfg(test)]
mod extract_xml_value_tests {
    use super::extract_xml_value;

    #[test]
    fn returns_inner_value() {
        let found = extract_xml_value("<Root><Key>value</Key></Root>", "Key");
        assert_eq!(found.as_deref(), Some("value"));
    }

    #[test]
    fn missing_tag_is_none() {
        assert!(extract_xml_value("<Root></Root>", "Key").is_none());
    }

    /// A closing tag that appears before the opening tag must not be paired
    /// with it.
    #[test]
    fn close_before_open_does_not_panic() {
        assert!(extract_xml_value("</Key>oops<Key>value", "Key").is_none());
    }

    #[test]
    fn open_without_close_is_none() {
        assert!(extract_xml_value("<Key>value", "Key").is_none());
    }
}
3033
#[cfg(test)]
mod compute_checksum_tests {
    use super::{compute_checksum, compute_checksum_streaming};
    use base64::Engine as _;

    /// Decodes a standard-alphabet base64 checksum, panicking on bad input.
    fn decode_b64(encoded: impl AsRef<[u8]>) -> Vec<u8> {
        base64::engine::general_purpose::STANDARD
            .decode(encoded)
            .unwrap()
    }

    #[test]
    fn crc32c_is_non_empty_and_matches_known_vector() {
        let checksum = compute_checksum("CRC32C", b"123456789");
        assert!(!checksum.is_empty());
        // The test expects this input to produce 0xE3069283, encoded big-endian.
        assert_eq!(decode_b64(&checksum), 0xE306_9283u32.to_be_bytes());
    }

    #[test]
    fn crc64nvme_is_non_empty() {
        let checksum = compute_checksum("CRC64NVME", b"hello world");
        assert!(!checksum.is_empty());
        // A 64-bit checksum decodes to eight bytes.
        assert_eq!(decode_b64(&checksum).len(), 8);
    }

    #[tokio::test]
    async fn non_streaming_matches_streaming_for_all_algorithms() {
        let data = b"the quick brown fox jumps over the lazy dog".repeat(100);
        let tmp = tempfile::NamedTempFile::new().unwrap();
        tokio::fs::write(tmp.path(), &data).await.unwrap();

        let algorithms = ["CRC32", "CRC32C", "CRC64NVME", "SHA1", "SHA256"];
        for algo in algorithms {
            let direct = compute_checksum(algo, &data);
            let streamed = compute_checksum_streaming(algo, tmp.path()).await.unwrap();
            assert!(!direct.is_empty(), "{algo} direct empty");
            assert_eq!(direct, streamed, "{algo} direct != streaming");
        }
    }
}