1use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40 write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50 cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51 supports_streaming_compress, supports_streaming_decompress,
52};
53
54const SAMPLE_BYTES: usize = 4096;
56
57struct AccessLogPreamble {
61 remote_ip: Option<String>,
62 requester: Option<String>,
63 request_uri: String,
64 user_agent: Option<String>,
65}
66
67pub struct S4Service<B: S3> {
68 backend: Arc<B>,
73 registry: Arc<CodecRegistry>,
74 dispatcher: Arc<dyn CodecDispatcher>,
75 max_body_bytes: usize,
76 policy: Option<crate::policy::SharedPolicy>,
77 secure_transport: bool,
82 rate_limits: Option<crate::rate_limit::SharedRateLimits>,
84 access_log: Option<crate::access_log::SharedAccessLog>,
86 sse_keyring: Option<crate::sse::SharedSseKeyring>,
94 versioning: Option<Arc<crate::versioning::VersioningManager>>,
104 kms: Option<Arc<dyn crate::kms::KmsBackend>>,
113 kms_default_key_id: Option<String>,
114 object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
125 cors: Option<Arc<crate::cors::CorsManager>>,
134 inventory: Option<Arc<crate::inventory::InventoryManager>>,
145 notifications: Option<Arc<crate::notifications::NotificationManager>>,
153 lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
166 tagging: Option<Arc<crate::tagging::TagManager>>,
179 replication: Option<Arc<crate::replication::ReplicationManager>>,
198 mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
207 compliance_strict: bool,
213}
214
215impl<B: S3> S4Service<B> {
216 pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
218
219 pub fn new(
220 backend: B,
221 registry: Arc<CodecRegistry>,
222 dispatcher: Arc<dyn CodecDispatcher>,
223 ) -> Self {
224 Self {
225 backend: Arc::new(backend),
226 registry,
227 dispatcher,
228 max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
229 policy: None,
230 secure_transport: false,
231 rate_limits: None,
232 access_log: None,
233 sse_keyring: None,
234 versioning: None,
235 kms: None,
236 kms_default_key_id: None,
237 object_lock: None,
238 cors: None,
239 inventory: None,
240 notifications: None,
241 lifecycle: None,
242 tagging: None,
243 replication: None,
244 mfa_delete: None,
245 compliance_strict: false,
246 }
247 }
248
249 #[must_use]
256 pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
257 self.tagging = Some(mgr);
258 self
259 }
260
261 #[must_use]
265 pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
266 self.tagging.as_ref()
267 }
268
269 #[must_use]
279 pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
280 self.inventory = Some(mgr);
281 self
282 }
283
284 #[must_use]
289 pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
290 self.inventory.as_ref()
291 }
292
293 #[must_use]
303 pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
304 self.lifecycle = Some(mgr);
305 self
306 }
307
308 #[must_use]
313 pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
314 self.lifecycle.as_ref()
315 }
316
317 #[must_use]
326 pub fn run_lifecycle_once_for_test(
327 &self,
328 bucket: &str,
329 objects: &[crate::lifecycle::EvaluateBatchEntry],
330 ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
331 let Some(mgr) = self.lifecycle.as_ref() else {
332 return Vec::new();
333 };
334 crate::lifecycle::evaluate_batch(mgr, bucket, objects)
335 }
336
337 #[must_use]
347 pub fn with_notifications(
348 mut self,
349 mgr: Arc<crate::notifications::NotificationManager>,
350 ) -> Self {
351 self.notifications = Some(mgr);
352 self
353 }
354
355 #[must_use]
359 pub fn notifications_manager(
360 &self,
361 ) -> Option<&Arc<crate::notifications::NotificationManager>> {
362 self.notifications.as_ref()
363 }
364
365 fn fire_delete_notification(
370 &self,
371 bucket: &str,
372 key: &str,
373 event: crate::notifications::EventType,
374 version_id: Option<String>,
375 ) {
376 let Some(mgr) = self.notifications.as_ref() else {
377 return;
378 };
379 let dests = mgr.match_destinations(bucket, &event, key);
380 if dests.is_empty() {
381 return;
382 }
383 tokio::spawn(crate::notifications::dispatch_event(
384 Arc::clone(mgr),
385 bucket.to_owned(),
386 key.to_owned(),
387 event,
388 None,
389 None,
390 version_id,
391 format!("S4-{}", uuid::Uuid::new_v4()),
392 ));
393 }
394
395 #[must_use]
407 pub fn with_replication(
408 mut self,
409 mgr: Arc<crate::replication::ReplicationManager>,
410 ) -> Self {
411 self.replication = Some(mgr);
412 self
413 }
414
415 #[must_use]
419 pub fn replication_manager(
420 &self,
421 ) -> Option<&Arc<crate::replication::ReplicationManager>> {
422 self.replication.as_ref()
423 }
424
425 fn spawn_replication_if_matched(
435 &self,
436 source_bucket: &str,
437 source_key: &str,
438 request_tags: &Option<crate::tagging::TagSet>,
439 body: &bytes::Bytes,
440 metadata: &Option<std::collections::HashMap<String, String>>,
441 backend_ok: bool,
442 ) where
443 B: Send + Sync + 'static,
444 {
445 if !backend_ok {
446 return;
447 }
448 let Some(mgr) = self.replication.as_ref() else {
449 return;
450 };
451 let object_tags: Vec<(String, String)> = request_tags
458 .as_ref()
459 .map(|ts| ts.iter().cloned().collect())
460 .unwrap_or_default();
461 let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
462 return;
463 };
464 mgr.record_status(
468 source_bucket,
469 source_key,
470 crate::replication::ReplicationStatus::Pending,
471 );
472 let mgr_cl = Arc::clone(mgr);
473 let backend = Arc::clone(&self.backend);
474 let body_cl = body.clone();
475 let metadata_cl = metadata.clone();
476 let source_bucket_cl = source_bucket.to_owned();
477 let source_key_cl = source_key.to_owned();
478 tokio::spawn(async move {
479 let do_put = move |dest_bucket: String,
480 dest_key: String,
481 dest_body: bytes::Bytes,
482 dest_meta: Option<std::collections::HashMap<String, String>>| {
483 let backend = Arc::clone(&backend);
484 async move {
485 let req = S3Request {
486 input: PutObjectInput {
487 bucket: dest_bucket,
488 key: dest_key,
489 body: Some(bytes_to_blob(dest_body)),
490 metadata: dest_meta,
491 ..Default::default()
492 },
493 method: http::Method::PUT,
494 uri: "/".parse().unwrap(),
495 headers: http::HeaderMap::new(),
496 extensions: http::Extensions::new(),
497 credentials: None,
498 region: None,
499 service: None,
500 trailing_headers: None,
501 };
502 backend
503 .put_object(req)
504 .await
505 .map(|_| ())
506 .map_err(|e| format!("destination put_object: {e}"))
507 }
508 };
509 crate::replication::replicate_object(
510 rule,
511 source_bucket_cl,
512 source_key_cl,
513 body_cl,
514 metadata_cl,
515 do_put,
516 mgr_cl,
517 )
518 .await;
519 });
520 }
521
522 #[must_use]
529 pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
530 self.mfa_delete = Some(mgr);
531 self
532 }
533
534 #[must_use]
538 pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
539 self.mfa_delete.as_ref()
540 }
541
542 #[must_use]
548 pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
549 self.cors = Some(mgr);
550 self
551 }
552
553 #[must_use]
555 pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
556 self.cors.as_ref()
557 }
558
559 #[must_use]
576 pub fn handle_preflight(
577 &self,
578 bucket: &str,
579 origin: &str,
580 method: &str,
581 request_headers: &[String],
582 ) -> Option<std::collections::HashMap<String, String>> {
583 let mgr = self.cors.as_ref()?;
584 let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
585 let mut h = std::collections::HashMap::new();
586 let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
590 "*".to_string()
591 } else {
592 origin.to_string()
593 };
594 h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
595 h.insert(
596 "Access-Control-Allow-Methods".to_string(),
597 rule.allowed_methods.join(", "),
598 );
599 if !rule.allowed_headers.is_empty() {
600 h.insert(
605 "Access-Control-Allow-Headers".to_string(),
606 rule.allowed_headers.join(", "),
607 );
608 }
609 if let Some(secs) = rule.max_age_seconds {
610 h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
611 }
612 if !rule.expose_headers.is_empty() {
613 h.insert(
614 "Access-Control-Expose-Headers".to_string(),
615 rule.expose_headers.join(", "),
616 );
617 }
618 Some(h)
619 }
620
621 #[must_use]
628 pub fn with_compliance_strict(mut self, on: bool) -> Self {
629 self.compliance_strict = on;
630 self
631 }
632
633 #[must_use]
639 pub fn with_object_lock(
640 mut self,
641 mgr: Arc<crate::object_lock::ObjectLockManager>,
642 ) -> Self {
643 self.object_lock = Some(mgr);
644 self
645 }
646
647 #[must_use]
651 pub fn with_kms_backend(
652 mut self,
653 kms: Arc<dyn crate::kms::KmsBackend>,
654 default_key_id: Option<String>,
655 ) -> Self {
656 self.kms = Some(kms);
657 self.kms_default_key_id = default_key_id;
658 self
659 }
660
661 #[must_use]
673 pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
674 self.versioning = Some(mgr);
675 self
676 }
677
678 #[must_use]
685 pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
686 let keyring = crate::sse::SseKeyring::new(1, key);
687 self.sse_keyring = Some(std::sync::Arc::new(keyring));
688 self
689 }
690
691 #[must_use]
697 pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
698 self.sse_keyring = Some(keyring);
699 self
700 }
701
702 #[must_use]
708 pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
709 self.access_log = Some(log);
710 self
711 }
712
713 fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
718 self.access_log.as_ref()?;
719 Some(AccessLogPreamble {
720 remote_ip: req
721 .headers
722 .get("x-forwarded-for")
723 .and_then(|v| v.to_str().ok())
724 .and_then(|raw| raw.split(',').next())
725 .map(|s| s.trim().to_owned()),
726 requester: Self::principal_of(req).map(str::to_owned),
727 request_uri: format!("{} {}", req.method, req.uri.path()),
728 user_agent: req
729 .headers
730 .get("user-agent")
731 .and_then(|v| v.to_str().ok())
732 .map(str::to_owned),
733 })
734 }
735
736 #[allow(clippy::too_many_arguments)]
740 async fn record_access(
741 &self,
742 preamble: Option<AccessLogPreamble>,
743 operation: &'static str,
744 bucket: &str,
745 key: Option<&str>,
746 http_status: u16,
747 bytes_sent: u64,
748 object_size: u64,
749 total_time_ms: u64,
750 error_code: Option<&str>,
751 ) {
752 let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
753 return;
754 };
755 log.record(crate::access_log::AccessLogEntry {
756 time: std::time::SystemTime::now(),
757 bucket: bucket.to_owned(),
758 remote_ip: p.remote_ip,
759 requester: p.requester,
760 operation,
761 key: key.map(str::to_owned),
762 request_uri: p.request_uri,
763 http_status,
764 error_code: error_code.map(str::to_owned),
765 bytes_sent,
766 object_size,
767 total_time_ms,
768 user_agent: p.user_agent,
769 })
770 .await;
771 }
772
773 #[must_use]
779 pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
780 self.rate_limits = Some(rl);
781 self
782 }
783
784 fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
788 let Some(rl) = self.rate_limits.as_ref() else {
789 return Ok(());
790 };
791 let principal_id = Self::principal_of(req);
792 if !rl.check(principal_id, bucket) {
793 crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
794 return Err(S3Error::with_message(
795 S3ErrorCode::SlowDown,
796 format!("rate-limited: bucket={bucket}"),
797 ));
798 }
799 Ok(())
800 }
801
802 #[must_use]
806 pub fn with_secure_transport(mut self, on: bool) -> Self {
807 self.secure_transport = on;
808 self
809 }
810
811 #[must_use]
812 pub fn with_max_body_bytes(mut self, n: usize) -> Self {
813 self.max_body_bytes = n;
814 self
815 }
816
817 #[must_use]
822 pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
823 self.policy = Some(policy);
824 self
825 }
826
827 fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
830 req.credentials.as_ref().map(|c| c.access_key.as_str())
831 }
832
833 fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
840 let user_agent = req
841 .headers
842 .get("user-agent")
843 .and_then(|v| v.to_str().ok())
844 .map(str::to_owned);
845 let source_ip = req
848 .headers
849 .get("x-forwarded-for")
850 .and_then(|v| v.to_str().ok())
851 .and_then(|raw| raw.split(',').next())
852 .and_then(|s| s.trim().parse().ok());
853 crate::policy::RequestContext {
854 source_ip,
855 user_agent,
856 request_time: Some(std::time::SystemTime::now()),
857 secure_transport: self.secure_transport,
858 existing_object_tags: None,
859 request_object_tags: None,
860 extra: Default::default(),
861 }
862 }
863
864 fn enforce_policy<I>(
869 &self,
870 req: &S3Request<I>,
871 action: &'static str,
872 bucket: &str,
873 key: Option<&str>,
874 ) -> S3Result<()> {
875 self.enforce_policy_with_extra(req, action, bucket, key, None, None)
876 }
877
878 fn enforce_policy_with_extra<I>(
885 &self,
886 req: &S3Request<I>,
887 action: &'static str,
888 bucket: &str,
889 key: Option<&str>,
890 request_tags: Option<&crate::tagging::TagSet>,
891 existing_tags: Option<&crate::tagging::TagSet>,
892 ) -> S3Result<()> {
893 let Some(policy) = self.policy.as_ref() else {
894 return Ok(());
895 };
896 let principal_id = Self::principal_of(req);
897 let mut ctx = self.request_context(req);
898 if let Some(t) = request_tags {
899 ctx.request_object_tags = Some(t.clone());
900 }
901 if let Some(t) = existing_tags {
902 ctx.existing_object_tags = Some(t.clone());
903 }
904 let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
905 if decision.allow {
906 Ok(())
907 } else {
908 crate::metrics::record_policy_denial(action, bucket);
909 tracing::info!(
910 action,
911 bucket,
912 key = ?key,
913 principal = ?principal_id,
914 source_ip = ?ctx.source_ip,
915 user_agent = ?ctx.user_agent,
916 secure_transport = ctx.secure_transport,
917 matched_sid = ?decision.matched_sid,
918 effect = ?decision.matched_effect,
919 "S4 policy denied request"
920 );
921 Err(S3Error::with_message(
922 S3ErrorCode::AccessDenied,
923 format!("denied by S4 policy: {action} on bucket={bucket}"),
924 ))
925 }
926 }
927
928 pub fn into_backend(self) -> B {
934 Arc::try_unwrap(self.backend)
935 .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
936 }
937
938 async fn partial_range_get(
941 &self,
942 req: &S3Request<GetObjectInput>,
943 plan: s4_codec::index::RangePlan,
944 client_start: u64,
945 client_end_exclusive: u64,
946 total_original: u64,
947 get_start: Instant,
948 ) -> S3Result<S3Response<GetObjectOutput>> {
949 let backend_range = s3s::dto::Range::Int {
951 first: plan.byte_start,
952 last: Some(plan.byte_end_exclusive - 1),
953 };
954 let backend_input = GetObjectInput {
955 bucket: req.input.bucket.clone(),
956 key: req.input.key.clone(),
957 range: Some(backend_range),
958 ..Default::default()
959 };
960 let backend_req = S3Request {
961 input: backend_input,
962 method: req.method.clone(),
963 uri: req.uri.clone(),
964 headers: req.headers.clone(),
965 extensions: http::Extensions::new(),
966 credentials: req.credentials.clone(),
967 region: req.region.clone(),
968 service: req.service.clone(),
969 trailing_headers: None,
970 };
971 let mut backend_resp = self.backend.get_object(backend_req).await?;
972 let blob = backend_resp.output.body.take().ok_or_else(|| {
973 S3Error::with_message(
974 S3ErrorCode::InternalError,
975 "backend partial GET returned empty body",
976 )
977 })?;
978 let bytes = collect_blob(blob, self.max_body_bytes)
979 .await
980 .map_err(internal("collect partial body"))?;
981
982 let mut combined = BytesMut::new();
984 for frame in FrameIter::new(bytes) {
985 let (header, payload) = frame.map_err(|e| {
986 S3Error::with_message(
987 S3ErrorCode::InternalError,
988 format!("partial-range frame parse: {e}"),
989 )
990 })?;
991 let chunk_manifest = ChunkManifest {
992 codec: header.codec,
993 original_size: header.original_size,
994 compressed_size: header.compressed_size,
995 crc32c: header.crc32c,
996 };
997 let decompressed = self
998 .registry
999 .decompress(payload, &chunk_manifest)
1000 .await
1001 .map_err(internal("partial-range decompress"))?;
1002 combined.extend_from_slice(&decompressed);
1003 }
1004 let combined = combined.freeze();
1005 let sliced = combined
1006 .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);
1007
1008 let returned_size = sliced.len() as u64;
1010 backend_resp.output.content_length = Some(returned_size as i64);
1011 backend_resp.output.content_range = Some(format!(
1012 "bytes {client_start}-{}/{total_original}",
1013 client_end_exclusive - 1
1014 ));
1015 backend_resp.output.checksum_crc32 = None;
1016 backend_resp.output.checksum_crc32c = None;
1017 backend_resp.output.checksum_crc64nvme = None;
1018 backend_resp.output.checksum_sha1 = None;
1019 backend_resp.output.checksum_sha256 = None;
1020 backend_resp.output.e_tag = None;
1021 backend_resp.output.body = Some(bytes_to_blob(sliced));
1022 backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);
1023
1024 let elapsed = get_start.elapsed();
1025 crate::metrics::record_get(
1026 "partial",
1027 plan.byte_end_exclusive - plan.byte_start,
1028 returned_size,
1029 elapsed.as_secs_f64(),
1030 true,
1031 );
1032 info!(
1033 op = "get_object",
1034 bucket = %req.input.bucket,
1035 key = %req.input.key,
1036 bytes_in = plan.byte_end_exclusive - plan.byte_start,
1037 bytes_out = returned_size,
1038 total_object_size = total_original,
1039 range = true,
1040 path = "sidecar-partial",
1041 latency_ms = elapsed.as_millis() as u64,
1042 "S4 partial Range GET via sidecar index"
1043 );
1044 Ok(backend_resp)
1045 }
1046
1047 async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
1051 let bytes = encode_index(index);
1052 let len = bytes.len() as i64;
1053 let put_input = PutObjectInput {
1054 bucket: bucket.into(),
1055 key: sidecar_key(key),
1056 body: Some(bytes_to_blob(bytes)),
1057 content_length: Some(len),
1058 content_type: Some("application/x-s4-index".into()),
1059 ..Default::default()
1060 };
1061 let put_req = S3Request {
1062 input: put_input,
1063 method: http::Method::PUT,
1064 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
1065 headers: http::HeaderMap::new(),
1066 extensions: http::Extensions::new(),
1067 credentials: None,
1068 region: None,
1069 service: None,
1070 trailing_headers: None,
1071 };
1072 if let Err(e) = self.backend.put_object(put_req).await {
1073 tracing::warn!(
1074 bucket,
1075 key,
1076 "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
1077 );
1078 }
1079 }
1080
1081 async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
1083 let get_input = GetObjectInput {
1084 bucket: bucket.into(),
1085 key: sidecar_key(key),
1086 ..Default::default()
1087 };
1088 let get_req = S3Request {
1089 input: get_input,
1090 method: http::Method::GET,
1091 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
1092 headers: http::HeaderMap::new(),
1093 extensions: http::Extensions::new(),
1094 credentials: None,
1095 region: None,
1096 service: None,
1097 trailing_headers: None,
1098 };
1099 let resp = self.backend.get_object(get_req).await.ok()?;
1100 let blob = resp.output.body?;
1101 let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
1102 decode_index(bytes).ok()
1103 }
1104
1105 async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
1111 let mut out = BytesMut::new();
1112 for frame in FrameIter::new(bytes) {
1113 let (header, payload) = frame.map_err(|e| {
1114 S3Error::with_message(
1115 S3ErrorCode::InternalError,
1116 format!("multipart frame parse: {e}"),
1117 )
1118 })?;
1119 let chunk_manifest = ChunkManifest {
1120 codec: header.codec,
1121 original_size: header.original_size,
1122 compressed_size: header.compressed_size,
1123 crc32c: header.crc32c,
1124 };
1125 let decompressed = self
1126 .registry
1127 .decompress(payload, &chunk_manifest)
1128 .await
1129 .map_err(internal("multipart frame decompress"))?;
1130 out.extend_from_slice(&decompressed);
1131 }
1132 Ok(out.freeze())
1133 }
1134}
1135
1136fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1141 let rest = s
1142 .strip_prefix("bytes=")
1143 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1144 let (a, b) = rest
1145 .split_once('-')
1146 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1147 let first: u64 = a
1148 .parse()
1149 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1150 let last: u64 = b
1151 .parse()
1152 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1153 if last < first {
1154 return Err(format!("CopySourceRange last < first: {s:?}"));
1155 }
1156 Ok(s3s::dto::Range::Int {
1157 first,
1158 last: Some(last),
1159 })
1160}
1161
1162pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
1176 format!("{key}.__s4ver__/{version_id}")
1177}
1178
1179fn is_versioning_shadow_key(key: &str) -> bool {
1182 key.contains(".__s4ver__/")
1183}
1184
1185fn current_unix_secs() -> u64 {
1191 std::time::SystemTime::now()
1192 .duration_since(std::time::UNIX_EPOCH)
1193 .map(|d| d.as_secs())
1194 .unwrap_or(0)
1195}
1196
1197fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
1205 match e {
1206 crate::mfa::MfaError::Missing => S3Error::with_message(
1207 S3ErrorCode::AccessDenied,
1208 "MFA token required for this operation",
1209 ),
1210 crate::mfa::MfaError::Malformed => S3Error::with_message(
1211 S3ErrorCode::InvalidRequest,
1212 "malformed x-amz-mfa header",
1213 ),
1214 crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
1215 S3ErrorCode::AccessDenied,
1216 "MFA serial does not match configured device",
1217 ),
1218 crate::mfa::MfaError::InvalidCode => S3Error::with_message(
1219 S3ErrorCode::AccessDenied,
1220 "invalid MFA code",
1221 ),
1222 }
1223}
1224
1225fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1226 metadata
1227 .as_ref()
1228 .and_then(|m| m.get(META_MULTIPART))
1229 .map(|v| v == "true")
1230 .unwrap_or(false)
1231}
1232
1233const META_CODEC: &str = "s4-codec";
1234const META_ORIGINAL_SIZE: &str = "s4-original-size";
1235const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
1236const META_CRC32C: &str = "s4-crc32c";
1237const META_MULTIPART: &str = "s4-multipart";
1240const META_FRAMED: &str = "s4-framed";
1244
1245fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1246 metadata
1247 .as_ref()
1248 .and_then(|m| m.get(META_FRAMED))
1249 .map(|v| v == "true")
1250 .unwrap_or(false)
1251}
1252
1253fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1255 metadata
1256 .as_ref()
1257 .and_then(|m| m.get("s4-encrypted"))
1258 .map(|v| v == "aes-256-gcm")
1259 .unwrap_or(false)
1260}
1261
1262fn extract_sse_c_material(
1269 algorithm: &Option<String>,
1270 key: &Option<String>,
1271 md5: &Option<String>,
1272) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
1273 match (algorithm, key, md5) {
1274 (None, None, None) => Ok(None),
1275 (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
1276 .map(Some)
1277 .map_err(sse_c_error_to_s3),
1278 _ => Err(S3Error::with_message(
1279 S3ErrorCode::InvalidRequest,
1280 "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
1281 )),
1282 }
1283}
1284
1285fn extract_kms_key_id(
1288 sse: &Option<ServerSideEncryption>,
1289 sse_kms_key_id: &Option<String>,
1290 gateway_default: Option<&str>,
1291) -> Option<String> {
1292 let asks_for_kms = sse
1293 .as_ref()
1294 .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1295 .unwrap_or(false);
1296 if !asks_for_kms {
1297 return None;
1298 }
1299 sse_kms_key_id
1300 .clone()
1301 .or_else(|| gateway_default.map(str::to_owned))
1302}
1303
1304fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
1308 use crate::kms::KmsError as K;
1309 match e {
1310 K::KeyNotFound { key_id } => S3Error::with_message(
1311 S3ErrorCode::InvalidArgument,
1312 format!("KMS key not found: {key_id}"),
1313 ),
1314 K::BackendUnavailable { message } => S3Error::with_message(
1315 S3ErrorCode::ServiceUnavailable,
1316 format!("KMS backend unavailable: {message}"),
1317 ),
1318 other => S3Error::with_message(
1319 S3ErrorCode::InternalError,
1320 format!("KMS error: {other}"),
1321 ),
1322 }
1323}
1324
1325fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
1329 use crate::sse::SseError as E;
1330 match e {
1331 E::WrongCustomerKey => S3Error::with_message(
1332 S3ErrorCode::AccessDenied,
1333 "SSE-C key does not match the key used at PUT time",
1334 ),
1335 E::InvalidCustomerKey { reason } => S3Error::with_message(
1336 S3ErrorCode::InvalidArgument,
1337 format!("SSE-C: {reason}"),
1338 ),
1339 E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
1340 S3ErrorCode::InvalidArgument,
1341 format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
1342 ),
1343 E::CustomerKeyRequired => S3Error::with_message(
1344 S3ErrorCode::InvalidRequest,
1345 "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
1346 ),
1347 E::CustomerKeyUnexpected => S3Error::with_message(
1348 S3ErrorCode::InvalidRequest,
1349 "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
1350 ),
1351 other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
1352 }
1353}
1354
1355fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
1356 let m = metadata.as_ref()?;
1357 let codec = m
1358 .get(META_CODEC)
1359 .and_then(|s| s.parse::<CodecKind>().ok())?;
1360 let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
1361 let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
1362 let crc32c = m.get(META_CRC32C)?.parse().ok()?;
1363 Some(ChunkManifest {
1364 codec,
1365 original_size,
1366 compressed_size,
1367 crc32c,
1368 })
1369}
1370
1371fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1372 let meta = metadata.get_or_insert_with(Default::default);
1373 meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1374 meta.insert(
1375 META_ORIGINAL_SIZE.into(),
1376 manifest.original_size.to_string(),
1377 );
1378 meta.insert(
1379 META_COMPRESSED_SIZE.into(),
1380 manifest.compressed_size.to_string(),
1381 );
1382 meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1383}
1384
1385fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1386 move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1387}
1388
1389fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
1395 use crate::select::SelectError;
1396 match e {
1397 SelectError::Parse(msg) => S3Error::with_message(
1398 S3ErrorCode::InvalidRequest,
1399 format!("SQL parse error: {msg}"),
1400 ),
1401 SelectError::UnsupportedFeature(msg) => S3Error::with_message(
1402 S3ErrorCode::InvalidRequest,
1403 format!("unsupported SQL feature: {msg}"),
1404 ),
1405 SelectError::RowEval(msg) => S3Error::with_message(
1406 S3ErrorCode::InvalidRequest,
1407 format!("SQL row evaluation error: {msg}"),
1408 ),
1409 SelectError::InputFormat(msg) => S3Error::with_message(
1410 S3ErrorCode::InvalidRequest,
1411 format!("{fmt} input format error: {msg}"),
1412 ),
1413 }
1414}
1415
1416fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1420 headers
1421 .get("x-amz-bypass-governance-retention")
1422 .and_then(|v| v.to_str().ok())
1423 .map(|s| s.eq_ignore_ascii_case("true"))
1424 .unwrap_or(false)
1425}
1426
1427fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
1433 let mut buf = Vec::new();
1434 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
1435 let s = std::str::from_utf8(&buf).ok()?;
1436 chrono::DateTime::parse_from_rfc3339(s)
1437 .ok()
1438 .map(|dt| dt.with_timezone(&chrono::Utc))
1439}
1440
1441fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
1444 let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1449 Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
1450}
1451
1452fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1457 set.iter()
1458 .map(|(k, v)| Tag {
1459 key: Some(k.clone()),
1460 value: Some(v.clone()),
1461 })
1462 .collect()
1463}
1464
1465fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
1470 let pairs = tags
1471 .iter()
1472 .map(|t| {
1473 (
1474 t.key.clone().unwrap_or_default(),
1475 t.value.clone().unwrap_or_default(),
1476 )
1477 })
1478 .collect();
1479 crate::tagging::TagSet::from_pairs(pairs)
1480}
1481
1482pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1486 if total == 0 {
1487 return Err("cannot range-get zero-length object".into());
1488 }
1489 match range {
1490 s3s::dto::Range::Int { first, last } => {
1491 let start = *first;
1492 let end_inclusive = match last {
1493 Some(l) => (*l).min(total - 1),
1494 None => total - 1,
1495 };
1496 if start > end_inclusive || start >= total {
1497 return Err(format!(
1498 "range bytes={start}-{:?} out of object size {total}",
1499 last
1500 ));
1501 }
1502 Ok((start, end_inclusive + 1))
1503 }
1504 s3s::dto::Range::Suffix { length } => {
1505 let len = (*length).min(total);
1506 Ok((total - len, total))
1507 }
1508 }
1509}
1510
1511#[async_trait::async_trait]
1512impl<B: S3> S3 for S4Service<B> {
1513 #[tracing::instrument(
1515 name = "s4.put_object",
1516 skip(self, req),
1517 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1518 )]
1519 async fn put_object(
1520 &self,
1521 mut req: S3Request<PutObjectInput>,
1522 ) -> S3Result<S3Response<PutObjectOutput>> {
1523 let put_start = Instant::now();
1524 let put_bucket = req.input.bucket.clone();
1525 let put_key = req.input.key.clone();
1526 let access_preamble = self.access_log_preamble(&req);
1527 self.enforce_rate_limit(&req, &put_bucket)?;
1528 let request_tags: Option<crate::tagging::TagSet> = req
1534 .input
1535 .tagging
1536 .as_deref()
1537 .map(crate::tagging::parse_tagging_header)
1538 .transpose()
1539 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1540 let existing_tags: Option<crate::tagging::TagSet> = self
1541 .tagging
1542 .as_ref()
1543 .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1544 self.enforce_policy_with_extra(
1545 &req,
1546 "s3:PutObject",
1547 &put_bucket,
1548 Some(&put_key),
1549 request_tags.as_ref(),
1550 existing_tags.as_ref(),
1551 )?;
1552 if let Some(mgr) = self.object_lock.as_ref()
1560 && let Some(state) = mgr.get(&put_bucket, &put_key)
1561 {
1562 let bucket_versioned_enabled = self
1563 .versioning
1564 .as_ref()
1565 .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1566 .unwrap_or(false);
1567 if !bucket_versioned_enabled {
1568 let bypass = parse_bypass_governance_header(&req.headers);
1569 let now = chrono::Utc::now();
1570 if !state.can_delete(now, bypass) {
1571 crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1572 return Err(S3Error::with_message(
1573 S3ErrorCode::AccessDenied,
1574 "Access Denied because object protected by object lock",
1575 ));
1576 }
1577 }
1578 }
1579 let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1585 .input
1586 .object_lock_mode
1587 .as_ref()
1588 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1589 let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1590 .input
1591 .object_lock_retain_until_date
1592 .as_ref()
1593 .and_then(timestamp_to_chrono_utc);
1594 let explicit_legal_hold_on: Option<bool> = req
1595 .input
1596 .object_lock_legal_hold_status
1597 .as_ref()
1598 .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1599 if let Some(blob) = req.input.body.take() {
1600 let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1603 .await
1604 .map_err(internal("peek put sample"))?;
1605 let sample_len = sample.len().min(SAMPLE_BYTES);
1606 let kind = self.dispatcher.pick(&sample[..sample_len]).await;
1607
1608 let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1614 let (compressed, manifest, is_framed) = if use_framed {
1615 let chained = chain_sample_with_rest(sample, rest_stream);
1617 debug!(
1618 bucket = ?req.input.bucket,
1619 key = ?req.input.key,
1620 codec = kind.as_str(),
1621 path = "streaming-framed",
1622 "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1623 );
1624 let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1628 let (body, manifest) = streaming_compress_to_frames(
1629 chained,
1630 Arc::clone(&self.registry),
1631 kind,
1632 chunk_size,
1633 )
1634 .await
1635 .map_err(internal("streaming framed compress"))?;
1636 (body, manifest, true)
1637 } else {
1638 let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1641 .await
1642 .map_err(internal("collect put body (buffered path)"))?;
1643 debug!(
1644 bucket = ?req.input.bucket,
1645 key = ?req.input.key,
1646 bytes = bytes.len(),
1647 codec = kind.as_str(),
1648 path = "buffered",
1649 "S4 put_object: compressing (buffered, raw blob)"
1650 );
1651 let (body, m) = self
1652 .registry
1653 .compress(bytes, kind)
1654 .await
1655 .map_err(internal("registry compress"))?;
1656 (body, m, false)
1657 };
1658
1659 write_manifest(&mut req.input.metadata, &manifest);
1660 if is_framed {
1661 req.input
1663 .metadata
1664 .get_or_insert_with(Default::default)
1665 .insert(META_FRAMED.into(), "true".into());
1666 }
1667 req.input.content_length = Some(compressed.len() as i64);
1671 req.input.checksum_algorithm = None;
1676 req.input.checksum_crc32 = None;
1677 req.input.checksum_crc32c = None;
1678 req.input.checksum_crc64nvme = None;
1679 req.input.checksum_sha1 = None;
1680 req.input.checksum_sha256 = None;
1681 req.input.content_md5 = None;
1682 let original_size = manifest.original_size;
1683 let compressed_size = manifest.compressed_size;
1684 let codec_label = manifest.codec.as_str();
1685 let sidecar_index = if is_framed {
1688 s4_codec::index::build_index_from_body(&compressed).ok()
1689 } else {
1690 None
1691 };
1692 let sse_c_material = extract_sse_c_material(
1701 &req.input.sse_customer_algorithm,
1702 &req.input.sse_customer_key,
1703 &req.input.sse_customer_key_md5,
1704 )?;
1705 let kms_key_id = extract_kms_key_id(
1710 &req.input.server_side_encryption,
1711 &req.input.ssekms_key_id,
1712 self.kms_default_key_id.as_deref(),
1713 );
1714 if self.compliance_strict
1721 && sse_c_material.is_none()
1722 && kms_key_id.is_none()
1723 && self.sse_keyring.is_none()
1724 && req
1725 .input
1726 .server_side_encryption
1727 .as_ref()
1728 .map(|s| s.as_str())
1729 != Some(ServerSideEncryption::AES256)
1730 {
1731 return Err(S3Error::with_message(
1732 S3ErrorCode::InvalidRequest,
1733 "compliance-mode strict: PUT must include x-amz-server-side-encryption \
1734 (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
1735 ));
1736 }
1737 if sse_c_material.is_some() && kms_key_id.is_some() {
1740 return Err(S3Error::with_message(
1741 S3ErrorCode::InvalidArgument,
1742 "SSE-C and SSE-KMS cannot be used together on the same PUT",
1743 ));
1744 }
1745 let kms_wrap = if let Some(ref key_id) = kms_key_id {
1748 let kms = self.kms.as_ref().ok_or_else(|| {
1749 S3Error::with_message(
1750 S3ErrorCode::InvalidRequest,
1751 "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1752 )
1753 })?;
1754 let (dek, wrapped) = kms
1755 .generate_dek(key_id)
1756 .await
1757 .map_err(kms_error_to_s3)?;
1758 if dek.len() != 32 {
1759 return Err(S3Error::with_message(
1760 S3ErrorCode::InternalError,
1761 format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
1762 ));
1763 }
1764 let mut dek_arr = [0u8; 32];
1765 dek_arr.copy_from_slice(&dek);
1766 Some((dek_arr, wrapped))
1767 } else {
1768 None
1769 };
1770 let body_to_send = if let Some(ref m) = sse_c_material {
1771 req.input
1772 .metadata
1773 .get_or_insert_with(Default::default)
1774 .insert("s4-encrypted".into(), "aes-256-gcm".into());
1775 crate::sse::encrypt_with_source(
1776 &compressed,
1777 crate::sse::SseSource::CustomerKey {
1778 key: &m.key,
1779 key_md5: &m.key_md5,
1780 },
1781 )
1782 } else if let Some((ref dek, ref wrapped)) = kms_wrap {
1783 req.input
1784 .metadata
1785 .get_or_insert_with(Default::default)
1786 .insert("s4-encrypted".into(), "aes-256-gcm".into());
1787 crate::sse::encrypt_with_source(
1788 &compressed,
1789 crate::sse::SseSource::Kms { dek, wrapped },
1790 )
1791 } else if let Some(keyring) = self.sse_keyring.as_ref() {
1792 req.input
1793 .metadata
1794 .get_or_insert_with(Default::default)
1795 .insert("s4-encrypted".into(), "aes-256-gcm".into());
1796 crate::sse::encrypt_v2(&compressed, keyring)
1797 } else {
1798 compressed.clone()
1799 };
1800 let replication_body = body_to_send.clone();
1805 let replication_metadata = req.input.metadata.clone();
1806 req.input.body = Some(bytes_to_blob(body_to_send));
1807 let pending_version: Option<crate::versioning::PutOutcome> = self
1816 .versioning
1817 .as_ref()
1818 .map(|mgr| mgr.state(&put_bucket))
1819 .map(|state| match state {
1820 crate::versioning::VersioningState::Enabled => {
1821 crate::versioning::PutOutcome {
1822 version_id: crate::versioning::VersioningManager::new_version_id(),
1823 versioned_response: true,
1824 }
1825 }
1826 crate::versioning::VersioningState::Suspended
1827 | crate::versioning::VersioningState::Unversioned => {
1828 crate::versioning::PutOutcome {
1829 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
1830 versioned_response: false,
1831 }
1832 }
1833 });
1834 if let Some(ref pv) = pending_version
1835 && pv.versioned_response
1836 {
1837 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
1838 }
1839 let mut backend_resp = self.backend.put_object(req).await;
1840 if let Some(idx) = sidecar_index
1841 && backend_resp.is_ok()
1842 && idx.entries.len() > 1
1843 {
1844 self.write_sidecar(&put_bucket, &put_key, &idx).await;
1850 }
1851 if let (Some(mgr), Some(pv), Ok(resp)) = (
1855 self.versioning.as_ref(),
1856 pending_version.as_ref(),
1857 backend_resp.as_mut(),
1858 ) {
1859 let etag = resp
1860 .output
1861 .e_tag
1862 .clone()
1863 .map(ETag::into_value)
1864 .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
1865 let now = chrono::Utc::now();
1866 mgr.commit_put_with_version(
1867 &put_bucket,
1868 &put_key,
1869 crate::versioning::VersionEntry {
1870 version_id: pv.version_id.clone(),
1871 etag,
1872 size: original_size,
1873 is_delete_marker: false,
1874 created_at: now,
1875 },
1876 );
1877 if pv.versioned_response {
1878 resp.output.version_id = Some(pv.version_id.clone());
1879 }
1880 }
1881 if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
1885 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
1886 resp.output.sse_customer_key_md5 = Some(
1887 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
1888 );
1889 }
1890 if let (Some((_, wrapped)), Ok(resp)) =
1894 (kms_wrap.as_ref(), backend_resp.as_mut())
1895 {
1896 resp.output.server_side_encryption =
1897 Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
1898 resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
1899 }
1900 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
1906 if explicit_lock_mode.is_some()
1907 || explicit_retain_until.is_some()
1908 || explicit_legal_hold_on.is_some()
1909 {
1910 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
1911 if let Some(m) = explicit_lock_mode {
1912 state.mode = Some(m);
1913 }
1914 if let Some(u) = explicit_retain_until {
1915 state.retain_until = Some(u);
1916 }
1917 if let Some(lh) = explicit_legal_hold_on {
1918 state.legal_hold_on = lh;
1919 }
1920 mgr.set(&put_bucket, &put_key, state);
1921 }
1922 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
1923 }
1924 let _ = (original_size, compressed_size); let elapsed = put_start.elapsed();
1926 crate::metrics::record_put(
1927 codec_label,
1928 original_size,
1929 compressed_size,
1930 elapsed.as_secs_f64(),
1931 backend_resp.is_ok(),
1932 );
1933 self.record_access(
1935 access_preamble,
1936 "REST.PUT.OBJECT",
1937 &put_bucket,
1938 Some(&put_key),
1939 if backend_resp.is_ok() { 200 } else { 500 },
1940 compressed_size,
1941 original_size,
1942 elapsed.as_millis() as u64,
1943 backend_resp.as_ref().err().map(|e| e.code().as_str()),
1944 )
1945 .await;
1946 info!(
1947 op = "put_object",
1948 bucket = %put_bucket,
1949 key = %put_key,
1950 codec = codec_label,
1951 bytes_in = original_size,
1952 bytes_out = compressed_size,
1953 ratio = format!(
1954 "{:.3}",
1955 if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
1956 ),
1957 latency_ms = elapsed.as_millis() as u64,
1958 ok = backend_resp.is_ok(),
1959 "S4 put completed"
1960 );
1961 if backend_resp.is_ok()
1966 && let Some(mgr) = self.notifications.as_ref()
1967 {
1968 let dests = mgr.match_destinations(
1969 &put_bucket,
1970 &crate::notifications::EventType::ObjectCreatedPut,
1971 &put_key,
1972 );
1973 if !dests.is_empty() {
1974 let etag = backend_resp
1975 .as_ref()
1976 .ok()
1977 .and_then(|r| r.output.e_tag.clone())
1978 .map(ETag::into_value);
1979 let version_id = pending_version
1980 .as_ref()
1981 .filter(|pv| pv.versioned_response)
1982 .map(|pv| pv.version_id.clone());
1983 tokio::spawn(crate::notifications::dispatch_event(
1984 Arc::clone(mgr),
1985 put_bucket.clone(),
1986 put_key.clone(),
1987 crate::notifications::EventType::ObjectCreatedPut,
1988 Some(original_size),
1989 etag,
1990 version_id,
1991 format!("S4-{}", uuid::Uuid::new_v4()),
1992 ));
1993 }
1994 }
1995 if backend_resp.is_ok()
2000 && let (Some(mgr), Some(tags)) =
2001 (self.tagging.as_ref(), request_tags.clone())
2002 {
2003 mgr.put_object_tags(&put_bucket, &put_key, tags);
2004 }
2005 self.spawn_replication_if_matched(
2014 &put_bucket,
2015 &put_key,
2016 &request_tags,
2017 &replication_body,
2018 &replication_metadata,
2019 backend_resp.is_ok(),
2020 );
2021 return backend_resp;
2022 }
2023 let pending_version: Option<crate::versioning::PutOutcome> = self
2027 .versioning
2028 .as_ref()
2029 .map(|mgr| mgr.state(&put_bucket))
2030 .map(|state| match state {
2031 crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2032 version_id: crate::versioning::VersioningManager::new_version_id(),
2033 versioned_response: true,
2034 },
2035 _ => crate::versioning::PutOutcome {
2036 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2037 versioned_response: false,
2038 },
2039 });
2040 if let Some(ref pv) = pending_version
2041 && pv.versioned_response
2042 {
2043 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2044 }
2045 let mut backend_resp = self.backend.put_object(req).await;
2046 if let (Some(mgr), Some(pv), Ok(resp)) = (
2047 self.versioning.as_ref(),
2048 pending_version.as_ref(),
2049 backend_resp.as_mut(),
2050 ) {
2051 let etag = resp
2052 .output
2053 .e_tag
2054 .clone()
2055 .map(ETag::into_value)
2056 .unwrap_or_default();
2057 let now = chrono::Utc::now();
2058 mgr.commit_put_with_version(
2059 &put_bucket,
2060 &put_key,
2061 crate::versioning::VersionEntry {
2062 version_id: pv.version_id.clone(),
2063 etag,
2064 size: 0,
2065 is_delete_marker: false,
2066 created_at: now,
2067 },
2068 );
2069 if pv.versioned_response {
2070 resp.output.version_id = Some(pv.version_id.clone());
2071 }
2072 }
2073 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2077 if explicit_lock_mode.is_some()
2078 || explicit_retain_until.is_some()
2079 || explicit_legal_hold_on.is_some()
2080 {
2081 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2082 if let Some(m) = explicit_lock_mode {
2083 state.mode = Some(m);
2084 }
2085 if let Some(u) = explicit_retain_until {
2086 state.retain_until = Some(u);
2087 }
2088 if let Some(lh) = explicit_legal_hold_on {
2089 state.legal_hold_on = lh;
2090 }
2091 mgr.set(&put_bucket, &put_key, state);
2092 }
2093 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2094 }
2095 if backend_resp.is_ok()
2099 && let Some(mgr) = self.notifications.as_ref()
2100 {
2101 let dests = mgr.match_destinations(
2102 &put_bucket,
2103 &crate::notifications::EventType::ObjectCreatedPut,
2104 &put_key,
2105 );
2106 if !dests.is_empty() {
2107 let etag = backend_resp
2108 .as_ref()
2109 .ok()
2110 .and_then(|r| r.output.e_tag.clone())
2111 .map(ETag::into_value);
2112 let version_id = pending_version
2113 .as_ref()
2114 .filter(|pv| pv.versioned_response)
2115 .map(|pv| pv.version_id.clone());
2116 tokio::spawn(crate::notifications::dispatch_event(
2117 Arc::clone(mgr),
2118 put_bucket.clone(),
2119 put_key.clone(),
2120 crate::notifications::EventType::ObjectCreatedPut,
2121 Some(0),
2122 etag,
2123 version_id,
2124 format!("S4-{}", uuid::Uuid::new_v4()),
2125 ));
2126 }
2127 }
2128 if backend_resp.is_ok()
2132 && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2133 {
2134 mgr.put_object_tags(&put_bucket, &put_key, tags);
2135 }
2136 self.spawn_replication_if_matched(
2139 &put_bucket,
2140 &put_key,
2141 &request_tags,
2142 &bytes::Bytes::new(),
2143 &None,
2144 backend_resp.is_ok(),
2145 );
2146 backend_resp
2147 }
2148
2149 #[tracing::instrument(
2151 name = "s4.get_object",
2152 skip(self, req),
2153 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
2154 )]
2155 async fn get_object(
2156 &self,
2157 mut req: S3Request<GetObjectInput>,
2158 ) -> S3Result<S3Response<GetObjectOutput>> {
2159 let get_start = Instant::now();
2160 let get_bucket = req.input.bucket.clone();
2161 let get_key = req.input.key.clone();
2162 self.enforce_rate_limit(&req, &get_bucket)?;
2163 self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
2164 let range_request = req.input.range.take();
2166 let sse_c_alg = req.input.sse_customer_algorithm.take();
2172 let sse_c_key = req.input.sse_customer_key.take();
2173 let sse_c_md5 = req.input.sse_customer_key_md5.take();
2174 let get_sse_c_material =
2175 extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
2176
2177 let resolved_version_id: Option<String> = match self.versioning.as_ref() {
2190 Some(mgr)
2191 if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
2192 {
2193 let req_vid = req.input.version_id.take();
2194 let entry = match req_vid.as_deref() {
2195 Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
2196 || S3Error::with_message(
2197 S3ErrorCode::NoSuchVersion,
2198 format!("no such version: {vid}"),
2199 ),
2200 )?,
2201 None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
2202 S3Error::with_message(
2203 S3ErrorCode::NoSuchKey,
2204 format!("no such key: {get_key}"),
2205 )
2206 })?,
2207 };
2208 if entry.is_delete_marker {
2209 return Err(S3Error::with_message(
2217 S3ErrorCode::NoSuchKey,
2218 format!("delete marker is the current version of {get_key}"),
2219 ));
2220 }
2221 if entry.version_id != crate::versioning::NULL_VERSION_ID {
2222 req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
2223 }
2224 Some(entry.version_id)
2225 }
2226 _ => None,
2227 };
2228
2229 if let Some(ref r) = range_request
2233 && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
2234 {
2235 let total = index.total_original_size();
2236 let (start, end_exclusive) = match resolve_range(r, total) {
2237 Ok(v) => v,
2238 Err(e) => {
2239 return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
2240 }
2241 };
2242 if let Some(plan) = index.lookup_range(start, end_exclusive) {
2243 return self
2244 .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
2245 .await;
2246 }
2247 }
2248 let mut resp = self.backend.get_object(req).await?;
2249 if let Some(ref vid) = resolved_version_id {
2254 resp.output.version_id = Some(vid.clone());
2255 }
2256 let is_multipart = is_multipart_object(&resp.output.metadata);
2257 let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
2258 let needs_frame_parse = is_multipart || is_framed_v2;
2261 let manifest_opt = extract_manifest(&resp.output.metadata);
2262
2263 if !needs_frame_parse && manifest_opt.is_none() {
2264 debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
2266 return Ok(resp);
2267 }
2268
2269 if let Some(blob) = resp.output.body.take() {
2270 let blob = if is_sse_encrypted(&resp.output.metadata) {
2278 let body = collect_blob(blob, self.max_body_bytes)
2279 .await
2280 .map_err(internal("collect SSE-encrypted body"))?;
2281 let plain = match crate::sse::peek_magic(&body) {
2286 Some("S4E4") => {
2287 let kms = self.kms.as_ref().ok_or_else(|| {
2288 S3Error::with_message(
2289 S3ErrorCode::InvalidRequest,
2290 "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
2291 )
2292 })?;
2293 let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
2294 crate::sse::decrypt_with_kms(&body, kms_ref)
2295 .await
2296 .map_err(|e| match e {
2297 crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
2298 other => S3Error::with_message(
2299 S3ErrorCode::InternalError,
2300 format!("SSE-KMS decrypt failed: {other}"),
2301 ),
2302 })?
2303 }
2304 _ => {
2305 if let Some(ref m) = get_sse_c_material {
2306 crate::sse::decrypt(
2307 &body,
2308 crate::sse::SseSource::CustomerKey {
2309 key: &m.key,
2310 key_md5: &m.key_md5,
2311 },
2312 )
2313 .map_err(sse_c_error_to_s3)?
2314 } else {
2315 let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
2316 S3Error::with_message(
2317 S3ErrorCode::InvalidRequest,
2318 "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
2319 )
2320 })?;
2321 crate::sse::decrypt(&body, keyring).map_err(|e| {
2322 S3Error::with_message(
2323 S3ErrorCode::InternalError,
2324 format!("SSE-S4 decrypt failed: {e}"),
2325 )
2326 })?
2327 }
2328 }
2329 };
2330 if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
2333 && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
2334 {
2335 resp.output.server_side_encryption = Some(
2336 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2337 );
2338 resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
2339 }
2340 bytes_to_blob(plain)
2341 } else if let Some(ref m) = get_sse_c_material {
2342 let _ = m;
2345 return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
2346 } else {
2347 blob
2348 };
2349 if let Some(ref m) = get_sse_c_material {
2352 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2353 resp.output.sse_customer_key_md5 = Some(
2354 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2355 );
2356 }
2357 if range_request.is_none()
2365 && !needs_frame_parse
2366 && let Some(ref m) = manifest_opt
2367 && supports_streaming_decompress(m.codec)
2368 && m.codec == CodecKind::CpuZstd
2369 {
2370 let decompressed_blob = cpu_zstd_decompress_stream(blob);
2371 resp.output.content_length = Some(m.original_size as i64);
2372 resp.output.checksum_crc32 = None;
2373 resp.output.checksum_crc32c = None;
2374 resp.output.checksum_crc64nvme = None;
2375 resp.output.checksum_sha1 = None;
2376 resp.output.checksum_sha256 = None;
2377 resp.output.e_tag = None;
2378 resp.output.body = Some(decompressed_blob);
2379 let elapsed = get_start.elapsed();
2380 crate::metrics::record_get(
2381 m.codec.as_str(),
2382 m.compressed_size,
2383 m.original_size,
2384 elapsed.as_secs_f64(),
2385 true,
2386 );
2387 info!(
2388 op = "get_object",
2389 bucket = %get_bucket,
2390 key = %get_key,
2391 codec = m.codec.as_str(),
2392 bytes_in = m.compressed_size,
2393 bytes_out = m.original_size,
2394 path = "streaming",
2395 setup_latency_ms = elapsed.as_millis() as u64,
2396 "S4 get started (streaming)"
2397 );
2398 return Ok(resp);
2399 }
2400 if range_request.is_none()
2402 && !needs_frame_parse
2403 && let Some(ref m) = manifest_opt
2404 && m.codec == CodecKind::Passthrough
2405 {
2406 resp.output.content_length = Some(m.original_size as i64);
2407 resp.output.checksum_crc32 = None;
2408 resp.output.checksum_crc32c = None;
2409 resp.output.checksum_crc64nvme = None;
2410 resp.output.checksum_sha1 = None;
2411 resp.output.checksum_sha256 = None;
2412 resp.output.e_tag = None;
2413 resp.output.body = Some(blob);
2414 debug!("S4 get_object: passthrough streaming");
2415 return Ok(resp);
2416 }
2417
2418 let bytes = collect_blob(blob, self.max_body_bytes)
2420 .await
2421 .map_err(internal("collect get body"))?;
2422
2423 let decompressed = if needs_frame_parse {
2424 self.decompress_multipart(bytes).await?
2427 } else {
2428 let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
2429 self.registry
2430 .decompress(bytes, manifest)
2431 .await
2432 .map_err(internal("registry decompress"))?
2433 };
2434
2435 let total_size = decompressed.len() as u64;
2437 let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
2438 let (start, end) = resolve_range(r, total_size)
2439 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
2440 let sliced = decompressed.slice(start as usize..end as usize);
2441 resp.output.content_range = Some(format!(
2442 "bytes {start}-{}/{total_size}",
2443 end.saturating_sub(1)
2444 ));
2445 (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
2446 } else {
2447 (decompressed, None)
2448 };
2449 resp.output.content_length = Some(final_bytes.len() as i64);
2452 resp.output.checksum_crc32 = None;
2457 resp.output.checksum_crc32c = None;
2458 resp.output.checksum_crc64nvme = None;
2459 resp.output.checksum_sha1 = None;
2460 resp.output.checksum_sha256 = None;
2461 resp.output.e_tag = None;
2462 let returned_size = final_bytes.len() as u64;
2463 let codec_label = manifest_opt
2464 .as_ref()
2465 .map(|m| m.codec.as_str())
2466 .unwrap_or("multipart");
2467 resp.output.body = Some(bytes_to_blob(final_bytes));
2468 if let Some(status) = status_override {
2469 resp.status = Some(status);
2470 }
2471 let elapsed = get_start.elapsed();
2472 crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
2473 info!(
2474 op = "get_object",
2475 bucket = %get_bucket,
2476 key = %get_key,
2477 codec = codec_label,
2478 bytes_out = returned_size,
2479 total_object_size = total_size,
2480 range = range_request.is_some(),
2481 path = "buffered",
2482 latency_ms = elapsed.as_millis() as u64,
2483 "S4 get completed (buffered)"
2484 );
2485 }
2486 if let Some(mgr) = self.replication.as_ref()
2489 && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
2490 {
2491 resp.output.replication_status =
2492 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2493 }
2494 Ok(resp)
2495 }
2496
2497 async fn head_bucket(
2499 &self,
2500 req: S3Request<HeadBucketInput>,
2501 ) -> S3Result<S3Response<HeadBucketOutput>> {
2502 self.backend.head_bucket(req).await
2503 }
2504 async fn list_buckets(
2505 &self,
2506 req: S3Request<ListBucketsInput>,
2507 ) -> S3Result<S3Response<ListBucketsOutput>> {
2508 self.backend.list_buckets(req).await
2509 }
2510 async fn create_bucket(
2511 &self,
2512 req: S3Request<CreateBucketInput>,
2513 ) -> S3Result<S3Response<CreateBucketOutput>> {
2514 self.backend.create_bucket(req).await
2515 }
2516 async fn delete_bucket(
2517 &self,
2518 req: S3Request<DeleteBucketInput>,
2519 ) -> S3Result<S3Response<DeleteBucketOutput>> {
2520 self.backend.delete_bucket(req).await
2521 }
2522 async fn head_object(
2523 &self,
2524 req: S3Request<HeadObjectInput>,
2525 ) -> S3Result<S3Response<HeadObjectOutput>> {
2526 let head_bucket = req.input.bucket.clone();
2529 let head_key = req.input.key.clone();
2530 let mut resp = self.backend.head_object(req).await?;
2531 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2532 resp.output.content_length = Some(manifest.original_size as i64);
2536 resp.output.checksum_crc32 = None;
2537 resp.output.checksum_crc32c = None;
2538 resp.output.checksum_crc64nvme = None;
2539 resp.output.checksum_sha1 = None;
2540 resp.output.checksum_sha256 = None;
2541 resp.output.e_tag = None;
2542 }
2543 if let Some(mgr) = self.replication.as_ref()
2546 && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2547 {
2548 resp.output.replication_status =
2549 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2550 }
2551 Ok(resp)
2552 }
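    /// DeleteObject: enforces rate limits, bucket policy, MFA delete, and Object
    /// Lock before deleting. Versioned buckets are routed through the versioning
    /// manager (specific version IDs map to shadow keys, plain deletes create
    /// delete markers); unversioned deletes also clear lock and tag state and
    /// remove the `.s4index` sidecar, then fire an ObjectRemoved notification.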
2553 async fn delete_object(
2554 &self,
2555 mut req: S3Request<DeleteObjectInput>,
2556 ) -> S3Result<S3Response<DeleteObjectOutput>> {
2557 let bucket = req.input.bucket.clone();
2558 let key = req.input.key.clone();
2559 self.enforce_rate_limit(&req, &bucket)?;
2560 self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
2561 if let Some(mgr) = self.mfa_delete.as_ref()
2568 && mgr.is_enabled(&bucket)
2569 {
2570 let header = req.input.mfa.as_deref();
2571 if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
2572 crate::metrics::record_mfa_delete_denial(&bucket);
2573 return Err(mfa_error_to_s3(e));
2574 }
2575 }
2576 if let Some(mgr) = self.object_lock.as_ref()
2584 && let Some(state) = mgr.get(&bucket, &key)
2585 {
2586 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
2587 let now = chrono::Utc::now();
2588 if !state.can_delete(now, bypass) {
2589 crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
2590 return Err(S3Error::with_message(
2591 S3ErrorCode::AccessDenied,
2592 "Access Denied because object protected by object lock",
2593 ));
2594 }
2595 }
2596 if let Some(mgr) = self.versioning.as_ref() {
2612 let state = mgr.state(&bucket);
2613 if state != crate::versioning::VersioningState::Unversioned {
2614 let req_vid = req.input.version_id.take();
2615 if let Some(vid) = req_vid {
2616 let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
2620 let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
2621 key.clone()
2622 } else {
2623 versioned_shadow_key(&key, &vid)
2624 };
2625 let was_real_version = outcome
2626 .as_ref()
2627 .map(|o| !o.is_delete_marker)
2628 .unwrap_or(false);
2629 if was_real_version {
2630 let backend_input = DeleteObjectInput {
2634 bucket: bucket.clone(),
2635 key: backend_target,
2636 ..Default::default()
2637 };
2638 let backend_req = S3Request {
2639 input: backend_input,
2640 method: http::Method::DELETE,
2641 uri: req.uri.clone(),
2642 headers: req.headers.clone(),
2643 extensions: http::Extensions::new(),
2644 credentials: req.credentials.clone(),
2645 region: req.region.clone(),
2646 service: req.service.clone(),
2647 trailing_headers: None,
2648 };
2649 let _ = self.backend.delete_object(backend_req).await;
2650 }
2651 let mut output = DeleteObjectOutput {
2652 version_id: Some(vid.clone()),
2653 ..Default::default()
2654 };
2655 if let Some(o) = outcome.as_ref()
2656 && o.is_delete_marker
2657 {
2658 output.delete_marker = Some(true);
2659 }
2660 self.fire_delete_notification(
2664 &bucket,
2665 &key,
2666 crate::notifications::EventType::ObjectRemovedDelete,
2667 Some(vid.clone()),
2668 );
2669 return Ok(S3Response::new(output));
2670 }
2671 let outcome = mgr.record_delete(&bucket, &key);
2673 if state == crate::versioning::VersioningState::Suspended {
2674 let backend_input = DeleteObjectInput {
2677 bucket: bucket.clone(),
2678 key: key.clone(),
2679 ..Default::default()
2680 };
2681 let backend_req = S3Request {
2682 input: backend_input,
2683 method: http::Method::DELETE,
2684 uri: req.uri.clone(),
2685 headers: req.headers.clone(),
2686 extensions: http::Extensions::new(),
2687 credentials: req.credentials.clone(),
2688 region: req.region.clone(),
2689 service: req.service.clone(),
2690 trailing_headers: None,
2691 };
2692 let _ = self.backend.delete_object(backend_req).await;
2693 }
2694 let output = DeleteObjectOutput {
2695 delete_marker: Some(true),
2696 version_id: outcome.version_id.clone(),
2697 ..Default::default()
2698 };
2699 self.fire_delete_notification(
2704 &bucket,
2705 &key,
2706 crate::notifications::EventType::ObjectRemovedDeleteMarker,
2707 outcome.version_id,
2708 );
2709 return Ok(S3Response::new(output));
2710 }
2711 }
2712 let resp = self.backend.delete_object(req).await?;
2715 if let Some(mgr) = self.object_lock.as_ref() {
2720 mgr.clear(&bucket, &key);
2721 }
2722 if let Some(mgr) = self.tagging.as_ref() {
2728 mgr.delete_object_tags(&bucket, &key);
2729 }
2730 let sidecar_input = DeleteObjectInput {
2731 bucket: bucket.clone(),
2732 key: sidecar_key(&key),
2733 ..Default::default()
2734 };
2735 let sidecar_req = S3Request {
2736 input: sidecar_input,
2737 method: http::Method::DELETE,
2738 uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
2739 headers: http::HeaderMap::new(),
2740 extensions: http::Extensions::new(),
2741 credentials: None,
2742 region: None,
2743 service: None,
2744 trailing_headers: None,
2745 };
2746 let _ = self.backend.delete_object(sidecar_req).await;
2747 self.fire_delete_notification(
2750 &bucket,
2751 &key,
2752 crate::notifications::EventType::ObjectRemovedDelete,
2753 None,
2754 );
2755 Ok(resp)
2756 }
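    /// DeleteObjects: verifies MFA delete for the bucket when it is enabled,
    /// then forwards the batch delete to the backend unchanged.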
2757 async fn delete_objects(
2758 &self,
2759 req: S3Request<DeleteObjectsInput>,
2760 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
2761 if let Some(mgr) = self.mfa_delete.as_ref()
2765 && mgr.is_enabled(&req.input.bucket)
2766 {
2767 let header = req.input.mfa.as_deref();
2768 if let Err(e) =
2769 crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
2770 {
2771 crate::metrics::record_mfa_delete_denial(&req.input.bucket);
2772 return Err(mfa_error_to_s3(e));
2773 }
2774 }
2775 self.backend.delete_objects(req).await
2776 }
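    /// CopyObject: when the request uses MetadataDirective=REPLACE, the s4-*
    /// metadata keys (codec, sizes, CRC, multipart/framed markers) are read from
    /// the source via HEAD and carried over to the destination metadata, without
    /// overwriting values the caller already supplied, so the copy stays decodable.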
2777 async fn copy_object(
2778 &self,
2779 mut req: S3Request<CopyObjectInput>,
2780 ) -> S3Result<S3Response<CopyObjectOutput>> {
2781 let dst_bucket = req.input.bucket.clone();
2783 let dst_key = req.input.key.clone();
2784 self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
2785 if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
2786 self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
2787 }
2788 let needs_merge = req
2798 .input
2799 .metadata_directive
2800 .as_ref()
2801 .map(|d| d.as_str() == MetadataDirective::REPLACE)
2802 .unwrap_or(false);
2803 if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
2804 let head_input = HeadObjectInput {
2805 bucket: bucket.to_string(),
2806 key: key.to_string(),
2807 ..Default::default()
2808 };
2809 let head_req = S3Request {
2810 input: head_input,
2811 method: req.method.clone(),
2812 uri: req.uri.clone(),
2813 headers: req.headers.clone(),
2814 extensions: http::Extensions::new(),
2815 credentials: req.credentials.clone(),
2816 region: req.region.clone(),
2817 service: req.service.clone(),
2818 trailing_headers: None,
2819 };
2820 if let Ok(head) = self.backend.head_object(head_req).await
2821 && let Some(src_meta) = head.output.metadata.as_ref()
2822 {
2823 let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
2824 for key in [
2825 META_CODEC,
2826 META_ORIGINAL_SIZE,
2827 META_COMPRESSED_SIZE,
2828 META_CRC32C,
2829 META_MULTIPART,
2830 META_FRAMED,
2831 ] {
2832 if let Some(v) = src_meta.get(key) {
2833 dest_meta
2836 .entry(key.to_string())
2837 .or_insert_with(|| v.clone());
2838 }
2839 }
2840 debug!(
2841 src_bucket = %bucket,
2842 src_key = %key,
2843 "S4 copy_object: preserved s4-* metadata across REPLACE directive"
2844 );
2845 }
2846 }
2847 self.backend.copy_object(req).await
2848 }
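    /// ListObjects: hides S4-internal keys (`.s4index` sidecars and versioning
    /// shadow keys) from the listing.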
2849 async fn list_objects(
2850 &self,
2851 req: S3Request<ListObjectsInput>,
2852 ) -> S3Result<S3Response<ListObjectsOutput>> {
2853 self.enforce_rate_limit(&req, &req.input.bucket)?;
2854 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2855 let mut resp = self.backend.list_objects(req).await?;
2856 if let Some(contents) = resp.output.contents.as_mut() {
2859 contents.retain(|o| {
2860 o.key
2861 .as_ref()
2862 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2863 .unwrap_or(true)
2864 });
2865 }
2866 Ok(resp)
2867 }
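    /// ListObjectsV2: applies the same sidecar/shadow-key filtering as
    /// ListObjects and adjusts KeyCount to match the filtered listing.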
2868 async fn list_objects_v2(
2869 &self,
2870 req: S3Request<ListObjectsV2Input>,
2871 ) -> S3Result<S3Response<ListObjectsV2Output>> {
2872 self.enforce_rate_limit(&req, &req.input.bucket)?;
2873 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2874 let mut resp = self.backend.list_objects_v2(req).await?;
2875 if let Some(contents) = resp.output.contents.as_mut() {
2876 let before = contents.len();
2877 contents.retain(|o| {
2878 o.key
2879 .as_ref()
2880 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2881 .unwrap_or(true)
2882 });
2883 if let Some(kc) = resp.output.key_count.as_mut() {
2885 *kc -= (before - contents.len()) as i32;
2886 }
2887 }
2888 Ok(resp)
2889 }
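    /// ListObjectVersions: when a versioning manager is active for the bucket,
    /// the listing (versions, delete markers, pagination markers) is built from
    /// its index; otherwise the backend listing is returned with sidecar and
    /// shadow keys filtered out.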
2890 async fn list_object_versions(
2898 &self,
2899 req: S3Request<ListObjectVersionsInput>,
2900 ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
2901 self.enforce_rate_limit(&req, &req.input.bucket)?;
2902 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2903 if let Some(mgr) = self.versioning.as_ref()
2905 && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
2906 {
2907 let max_keys = req.input.max_keys.unwrap_or(1000).max(0) as usize; // clamp negative MaxKeys before the usize cast
2908 let page = mgr.list_versions(
2909 &req.input.bucket,
2910 req.input.prefix.as_deref(),
2911 req.input.key_marker.as_deref(),
2912 req.input.version_id_marker.as_deref(),
2913 max_keys,
2914 );
2915 let versions: Vec<ObjectVersion> = page
2916 .versions
2917 .into_iter()
2918 .map(|e| ObjectVersion {
2919 key: Some(e.key),
2920 version_id: Some(e.version_id),
2921 is_latest: Some(e.is_latest),
2922 e_tag: Some(ETag::Strong(e.etag)),
2923 size: Some(e.size as i64),
2924 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2925 ..Default::default()
2926 })
2927 .collect();
2928 let delete_markers: Vec<DeleteMarkerEntry> = page
2929 .delete_markers
2930 .into_iter()
2931 .map(|e| DeleteMarkerEntry {
2932 key: Some(e.key),
2933 version_id: Some(e.version_id),
2934 is_latest: Some(e.is_latest),
2935 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2936 ..Default::default()
2937 })
2938 .collect();
2939 let output = ListObjectVersionsOutput {
2940 name: Some(req.input.bucket.clone()),
2941 prefix: req.input.prefix.clone(),
2942 key_marker: req.input.key_marker.clone(),
2943 version_id_marker: req.input.version_id_marker.clone(),
2944 max_keys: req.input.max_keys,
2945 versions: if versions.is_empty() {
2946 None
2947 } else {
2948 Some(versions)
2949 },
2950 delete_markers: if delete_markers.is_empty() {
2951 None
2952 } else {
2953 Some(delete_markers)
2954 },
2955 is_truncated: Some(page.is_truncated),
2956 next_key_marker: page.next_key_marker,
2957 next_version_id_marker: page.next_version_id_marker,
2958 ..Default::default()
2959 };
2960 return Ok(S3Response::new(output));
2961 }
2962 let mut resp = self.backend.list_object_versions(req).await?;
2964 if let Some(versions) = resp.output.versions.as_mut() {
2965 versions.retain(|v| {
2966 v.key
2967 .as_ref()
2968 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2969 .unwrap_or(true)
2970 });
2971 }
2972 if let Some(markers) = resp.output.delete_markers.as_mut() {
2973 markers.retain(|m| {
2974 m.key
2975 .as_ref()
2976 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2977 .unwrap_or(true)
2978 });
2979 }
2980 Ok(resp)
2981 }
2982
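    /// CreateMultipartUpload: stamps the upload metadata with the multipart
    /// marker and the registry's default codec so later reads know every part
    /// carries its own compressed frame.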
2983 async fn create_multipart_upload(
2984 &self,
2985 mut req: S3Request<CreateMultipartUploadInput>,
2986 ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
2987 let codec_kind = self.registry.default_kind();
2991 let meta = req.input.metadata.get_or_insert_with(Default::default);
2992 meta.insert(META_MULTIPART.into(), "true".into());
2993 meta.insert(META_CODEC.into(), codec_kind.as_str().into());
2994 debug!(
2995 bucket = ?req.input.bucket,
2996 key = ?req.input.key,
2997 codec = codec_kind.as_str(),
2998 "S4 create_multipart_upload: marking object for per-part compression"
2999 );
3000 self.backend.create_multipart_upload(req).await
3001 }
3002
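    /// UploadPart: buffers the part, picks a codec from a leading sample,
    /// compresses it, and wraps the result in a self-describing frame header.
    /// Parts whose original size is at or above the S3 minimum part size are
    /// padded back up to that minimum so the backend accepts the smaller
    /// compressed payload; client checksums/MD5 are dropped because the stored
    /// bytes differ from what the client sent.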
3003 async fn upload_part(
3004 &self,
3005 mut req: S3Request<UploadPartInput>,
3006 ) -> S3Result<S3Response<UploadPartOutput>> {
3007 if let Some(blob) = req.input.body.take() {
3013 let bytes = collect_blob(blob, self.max_body_bytes)
3014 .await
3015 .map_err(internal("collect upload_part body"))?;
3016 let sample_len = bytes.len().min(SAMPLE_BYTES);
3017 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
3018 let original_size = bytes.len() as u64;
3019 let (compressed, manifest) = self
3020 .registry
3021 .compress(bytes, codec_kind)
3022 .await
3023 .map_err(internal("registry compress part"))?;
3024 let header = FrameHeader {
3025 codec: codec_kind,
3026 original_size,
3027 compressed_size: compressed.len() as u64,
3028 crc32c: manifest.crc32c,
3029 };
3030 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3031 write_frame(&mut framed, header, &compressed);
3032 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3046 if !likely_final {
3047 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3048 }
3049 let framed_bytes = framed.freeze();
3050 let new_len = framed_bytes.len() as i64;
3051 req.input.content_length = Some(new_len);
3053 req.input.checksum_algorithm = None;
3054 req.input.checksum_crc32 = None;
3055 req.input.checksum_crc32c = None;
3056 req.input.checksum_crc64nvme = None;
3057 req.input.checksum_sha1 = None;
3058 req.input.checksum_sha256 = None;
3059 req.input.content_md5 = None;
3060 req.input.body = Some(bytes_to_blob(framed_bytes));
3061 debug!(
3062 part_number = ?req.input.part_number,
3063 upload_id = ?req.input.upload_id,
3064 original_size,
3065 framed_size = new_len,
3066 "S4 upload_part: framed compressed payload"
3067 );
3068 }
3069 self.backend.upload_part(req).await
3070 }
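    /// CompleteMultipartUpload: lets the backend assemble the object, then
    /// re-reads the assembled body, builds a frame index from it, and writes it
    /// to the `.s4index` sidecar (best effort; failures are ignored).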
3071 async fn complete_multipart_upload(
3072 &self,
3073 req: S3Request<CompleteMultipartUploadInput>,
3074 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
3075 let bucket = req.input.bucket.clone();
3076 let key = req.input.key.clone();
3077 let resp = self.backend.complete_multipart_upload(req).await?;
3078 let bucket_clone = bucket.clone();
3084 let key_clone = key.clone();
3085 let get_input = GetObjectInput {
3086 bucket: bucket_clone.clone(),
3087 key: key_clone.clone(),
3088 ..Default::default()
3089 };
3090 let get_req = S3Request {
3091 input: get_input,
3092 method: http::Method::GET,
3093 uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
3094 headers: http::HeaderMap::new(),
3095 extensions: http::Extensions::new(),
3096 credentials: None,
3097 region: None,
3098 service: None,
3099 trailing_headers: None,
3100 };
3101 if let Ok(get_resp) = self.backend.get_object(get_req).await
3102 && let Some(blob) = get_resp.output.body
3103 && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
3104 && let Ok(index) = build_index_from_body(&body)
3105 {
3106 self.write_sidecar(&bucket, &key, &index).await;
3107 }
3108 Ok(resp)
3109 }
3110 async fn abort_multipart_upload(
3111 &self,
3112 req: S3Request<AbortMultipartUploadInput>,
3113 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
3114 self.backend.abort_multipart_upload(req).await
3115 }
3116 async fn list_multipart_uploads(
3117 &self,
3118 req: S3Request<ListMultipartUploadsInput>,
3119 ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
3120 self.backend.list_multipart_uploads(req).await
3121 }
3122 async fn list_parts(
3123 &self,
3124 req: S3Request<ListPartsInput>,
3125 ) -> S3Result<S3Response<ListPartsOutput>> {
3126 self.backend.list_parts(req).await
3127 }
3128
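    // ACL, attributes, and restore are passed straight to the backend; object
    // tagging below is served from the TagManager when one is configured, with
    // policy enforcement that can see both the requested and the existing tags.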
3129 async fn get_object_acl(
3145 &self,
3146 req: S3Request<GetObjectAclInput>,
3147 ) -> S3Result<S3Response<GetObjectAclOutput>> {
3148 self.backend.get_object_acl(req).await
3149 }
3150 async fn put_object_acl(
3151 &self,
3152 req: S3Request<PutObjectAclInput>,
3153 ) -> S3Result<S3Response<PutObjectAclOutput>> {
3154 self.backend.put_object_acl(req).await
3155 }
3156 async fn get_object_tagging(
3162 &self,
3163 req: S3Request<GetObjectTaggingInput>,
3164 ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
3165 let Some(mgr) = self.tagging.as_ref() else {
3166 return self.backend.get_object_tagging(req).await;
3167 };
3168 let tags = mgr
3169 .get_object_tags(&req.input.bucket, &req.input.key)
3170 .unwrap_or_default();
3171 Ok(S3Response::new(GetObjectTaggingOutput {
3172 tag_set: tagset_to_aws(&tags),
3173 ..Default::default()
3174 }))
3175 }
3176 async fn put_object_tagging(
3177 &self,
3178 req: S3Request<PutObjectTaggingInput>,
3179 ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
3180 let Some(mgr) = self.tagging.as_ref() else {
3181 return self.backend.put_object_tagging(req).await;
3182 };
3183 let bucket = req.input.bucket.clone();
3184 let key = req.input.key.clone();
3185 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
3186 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
3187 })?;
3188 let existing = mgr.get_object_tags(&bucket, &key);
3192 self.enforce_policy_with_extra(
3193 &req,
3194 "s3:PutObjectTagging",
3195 &bucket,
3196 Some(&key),
3197 Some(&parsed),
3198 existing.as_ref(),
3199 )?;
3200 mgr.put_object_tags(&bucket, &key, parsed);
3201 Ok(S3Response::new(PutObjectTaggingOutput::default()))
3202 }
3203 async fn delete_object_tagging(
3204 &self,
3205 req: S3Request<DeleteObjectTaggingInput>,
3206 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
3207 let Some(mgr) = self.tagging.as_ref() else {
3208 return self.backend.delete_object_tagging(req).await;
3209 };
3210 let bucket = req.input.bucket.clone();
3211 let key = req.input.key.clone();
3212 let existing = mgr.get_object_tags(&bucket, &key);
3213 self.enforce_policy_with_extra(
3214 &req,
3215 "s3:DeleteObjectTagging",
3216 &bucket,
3217 Some(&key),
3218 None,
3219 existing.as_ref(),
3220 )?;
3221 mgr.delete_object_tags(&bucket, &key);
3222 Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
3223 }
3224 async fn get_object_attributes(
3225 &self,
3226 req: S3Request<GetObjectAttributesInput>,
3227 ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
3228 self.backend.get_object_attributes(req).await
3229 }
3230 async fn restore_object(
3231 &self,
3232 req: S3Request<RestoreObjectInput>,
3233 ) -> S3Result<S3Response<RestoreObjectOutput>> {
3234 self.backend.restore_object(req).await
3235 }
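    /// UploadPartCopy: if the source object is S4 multipart- or v2-framed, the
    /// requested byte range is fetched through our own GetObject (so it is
    /// decoded first), then recompressed and framed as a fresh part and uploaded
    /// via the backend; otherwise the copy is forwarded untouched.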
3236 async fn upload_part_copy(
3237 &self,
3238 req: S3Request<UploadPartCopyInput>,
3239 ) -> S3Result<S3Response<UploadPartCopyOutput>> {
3240 let CopySource::Bucket {
3251 bucket: src_bucket,
3252 key: src_key,
3253 ..
3254 } = &req.input.copy_source
3255 else {
3256 return self.backend.upload_part_copy(req).await;
3257 };
3258 let src_bucket = src_bucket.to_string();
3259 let src_key = src_key.to_string();
3260
3261 let head_input = HeadObjectInput {
3263 bucket: src_bucket.clone(),
3264 key: src_key.clone(),
3265 ..Default::default()
3266 };
3267 let head_req = S3Request {
3268 input: head_input,
3269 method: http::Method::HEAD,
3270 uri: req.uri.clone(),
3271 headers: req.headers.clone(),
3272 extensions: http::Extensions::new(),
3273 credentials: req.credentials.clone(),
3274 region: req.region.clone(),
3275 service: req.service.clone(),
3276 trailing_headers: None,
3277 };
3278 let needs_s4_copy = match self.backend.head_object(head_req).await {
3279 Ok(h) => {
3280 is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
3281 }
3282 Err(_) => false,
3283 };
3284 if !needs_s4_copy {
3285 return self.backend.upload_part_copy(req).await;
3286 }
3287
3288 let source_range = req
3290 .input
3291 .copy_source_range
3292 .as_ref()
3293 .map(|r| parse_copy_source_range(r))
3294 .transpose()
3295 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
3296
3297 let mut get_input = GetObjectInput {
3301 bucket: src_bucket.clone(),
3302 key: src_key.clone(),
3303 ..Default::default()
3304 };
3305 get_input.range = source_range;
3306 let get_req = S3Request {
3307 input: get_input,
3308 method: http::Method::GET,
3309 uri: req.uri.clone(),
3310 headers: req.headers.clone(),
3311 extensions: http::Extensions::new(),
3312 credentials: req.credentials.clone(),
3313 region: req.region.clone(),
3314 service: req.service.clone(),
3315 trailing_headers: None,
3316 };
3317 let get_resp = self.get_object(get_req).await?;
3318 let blob = get_resp.output.body.ok_or_else(|| {
3319 S3Error::with_message(
3320 S3ErrorCode::InternalError,
3321 "upload_part_copy: empty body from source GET",
3322 )
3323 })?;
3324 let bytes = collect_blob(blob, self.max_body_bytes)
3325 .await
3326 .map_err(internal("collect upload_part_copy source body"))?;
3327
3328 let sample_len = bytes.len().min(SAMPLE_BYTES);
3330 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
3331 let original_size = bytes.len() as u64;
3332 let (compressed, manifest) = self
3333 .registry
3334 .compress(bytes, codec_kind)
3335 .await
3336 .map_err(internal("registry compress upload_part_copy"))?;
3337 let header = FrameHeader {
3338 codec: codec_kind,
3339 original_size,
3340 compressed_size: compressed.len() as u64,
3341 crc32c: manifest.crc32c,
3342 };
3343 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3344 write_frame(&mut framed, header, &compressed);
3345 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3346 if !likely_final {
3347 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3348 }
3349 let framed_bytes = framed.freeze();
3350 let framed_len = framed_bytes.len() as i64;
3351
3352 let part_input = UploadPartInput {
3354 bucket: req.input.bucket.clone(),
3355 key: req.input.key.clone(),
3356 part_number: req.input.part_number,
3357 upload_id: req.input.upload_id.clone(),
3358 body: Some(bytes_to_blob(framed_bytes)),
3359 content_length: Some(framed_len),
3360 ..Default::default()
3361 };
3362 let part_req = S3Request {
3363 input: part_input,
3364 method: http::Method::PUT,
3365 uri: req.uri.clone(),
3366 headers: req.headers.clone(),
3367 extensions: http::Extensions::new(),
3368 credentials: req.credentials.clone(),
3369 region: req.region.clone(),
3370 service: req.service.clone(),
3371 trailing_headers: None,
3372 };
3373 let upload_resp = self.backend.upload_part(part_req).await?;
3374
3375 let copy_output = UploadPartCopyOutput {
3376 copy_part_result: Some(CopyPartResult {
3377 e_tag: upload_resp.output.e_tag.clone(),
3378 ..Default::default()
3379 }),
3380 ..Default::default()
3381 };
3382 Ok(S3Response::new(copy_output))
3383 }
3384
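    /// Object Lock configuration endpoints, served from the ObjectLockManager
    /// when one is configured; otherwise deferred to the backend.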
3385 async fn get_object_lock_configuration(
3392 &self,
3393 req: S3Request<GetObjectLockConfigurationInput>,
3394 ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
3395 if let Some(mgr) = self.object_lock.as_ref() {
3396 let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
3397 ObjectLockConfiguration {
3398 object_lock_enabled: Some(ObjectLockEnabled::from_static(
3399 ObjectLockEnabled::ENABLED,
3400 )),
3401 rule: Some(ObjectLockRule {
3402 default_retention: Some(DefaultRetention {
3403 days: Some(d.retention_days as i32),
3404 mode: Some(ObjectLockRetentionMode::from_static(
3405 match d.mode {
3406 crate::object_lock::LockMode::Governance => {
3407 ObjectLockRetentionMode::GOVERNANCE
3408 }
3409 crate::object_lock::LockMode::Compliance => {
3410 ObjectLockRetentionMode::COMPLIANCE
3411 }
3412 },
3413 )),
3414 years: None,
3415 }),
3416 }),
3417 }
3418 });
3419 let output = GetObjectLockConfigurationOutput {
3420 object_lock_configuration: cfg,
3421 };
3422 return Ok(S3Response::new(output));
3423 }
3424 self.backend.get_object_lock_configuration(req).await
3425 }
3426 async fn put_object_lock_configuration(
3427 &self,
3428 req: S3Request<PutObjectLockConfigurationInput>,
3429 ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
3430 if let Some(mgr) = self.object_lock.as_ref() {
3431 let bucket = req.input.bucket.clone();
3432 if let Some(cfg) = req.input.object_lock_configuration.as_ref()
3433 && let Some(rule) = cfg.rule.as_ref()
3434 && let Some(d) = rule.default_retention.as_ref()
3435 {
3436 let mode = d
3437 .mode
3438 .as_ref()
3439 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
3440 .ok_or_else(|| {
3441 S3Error::with_message(
3442 S3ErrorCode::InvalidRequest,
3443 "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
3444 )
3445 })?;
3446 let days: u32 = match (d.days, d.years) {
3450 (Some(d), None) if d > 0 => d as u32,
3451 (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
3452 _ => {
3453 return Err(S3Error::with_message(
3454 S3ErrorCode::InvalidRequest,
3455 "Object Lock default retention requires exactly one of Days or Years (positive integer)",
3456 ));
3457 }
3458 };
3459 mgr.set_bucket_default(
3460 &bucket,
3461 crate::object_lock::BucketObjectLockDefault {
3462 mode,
3463 retention_days: days,
3464 },
3465 );
3466 }
3467 return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
3468 }
3469 self.backend.put_object_lock_configuration(req).await
3470 }
3471 async fn get_object_legal_hold(
3472 &self,
3473 req: S3Request<GetObjectLegalHoldInput>,
3474 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
3475 if let Some(mgr) = self.object_lock.as_ref() {
3476 let on = mgr
3477 .get(&req.input.bucket, &req.input.key)
3478 .map(|s| s.legal_hold_on)
3479 .unwrap_or(false);
3480 let status = ObjectLockLegalHoldStatus::from_static(if on {
3481 ObjectLockLegalHoldStatus::ON
3482 } else {
3483 ObjectLockLegalHoldStatus::OFF
3484 });
3485 let output = GetObjectLegalHoldOutput {
3486 legal_hold: Some(ObjectLockLegalHold {
3487 status: Some(status),
3488 }),
3489 };
3490 return Ok(S3Response::new(output));
3491 }
3492 self.backend.get_object_legal_hold(req).await
3493 }
3494 async fn put_object_legal_hold(
3495 &self,
3496 req: S3Request<PutObjectLegalHoldInput>,
3497 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
3498 if let Some(mgr) = self.object_lock.as_ref() {
3499 let on = req
3500 .input
3501 .legal_hold
3502 .as_ref()
3503 .and_then(|h| h.status.as_ref())
3504 .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
3505 .unwrap_or(false);
3506 mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
3507 return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
3508 }
3509 self.backend.put_object_legal_hold(req).await
3510 }
3511 async fn get_object_retention(
3512 &self,
3513 req: S3Request<GetObjectRetentionInput>,
3514 ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
3515 if let Some(mgr) = self.object_lock.as_ref() {
3516 let retention = mgr
3517 .get(&req.input.bucket, &req.input.key)
3518 .filter(|s| s.mode.is_some() || s.retain_until.is_some())
3519 .map(|s| {
3520 let mode = s.mode.map(|m| {
3521 ObjectLockRetentionMode::from_static(match m {
3522 crate::object_lock::LockMode::Governance => {
3523 ObjectLockRetentionMode::GOVERNANCE
3524 }
3525 crate::object_lock::LockMode::Compliance => {
3526 ObjectLockRetentionMode::COMPLIANCE
3527 }
3528 })
3529 });
3530 let until = s.retain_until.map(chrono_utc_to_timestamp);
3531 ObjectLockRetention {
3532 mode,
3533 retain_until_date: until,
3534 }
3535 });
3536 let output = GetObjectRetentionOutput { retention };
3537 return Ok(S3Response::new(output));
3538 }
3539 self.backend.get_object_retention(req).await
3540 }
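    /// PutObjectRetention: an active Compliance lock can neither be downgraded
    /// to Governance nor have its retain-until date shortened; shortening an
    /// active Governance lock requires x-amz-bypass-governance-retention.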
3541 async fn put_object_retention(
3542 &self,
3543 req: S3Request<PutObjectRetentionInput>,
3544 ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
3545 if let Some(mgr) = self.object_lock.as_ref() {
3546 let bucket = req.input.bucket.clone();
3547 let key = req.input.key.clone();
3548 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
3549 let retention = req.input.retention.as_ref().ok_or_else(|| {
3550 S3Error::with_message(
3551 S3ErrorCode::InvalidRequest,
3552 "PutObjectRetention requires a Retention element",
3553 )
3554 })?;
3555 let new_mode = retention
3556 .mode
3557 .as_ref()
3558 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
3559 let new_until = retention
3560 .retain_until_date
3561 .as_ref()
3562 .map(timestamp_to_chrono_utc)
3563 .unwrap_or(None);
3564 let now = chrono::Utc::now();
3565 let existing = mgr.get(&bucket, &key).unwrap_or_default();
3566 if let Some(existing_mode) = existing.mode
3572 && existing_mode == crate::object_lock::LockMode::Compliance
3573 && existing.is_locked(now)
3574 {
3575 if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
3576 return Err(S3Error::with_message(
3577 S3ErrorCode::AccessDenied,
3578 "Cannot downgrade Compliance retention to Governance while lock is active",
3579 ));
3580 }
3581 if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
3582 && next < prev
3583 {
3584 return Err(S3Error::with_message(
3585 S3ErrorCode::AccessDenied,
3586 "Cannot shorten Compliance retention while lock is active",
3587 ));
3588 }
3589 }
3590 if let Some(existing_mode) = existing.mode
3591 && existing_mode == crate::object_lock::LockMode::Governance
3592 && existing.is_locked(now)
3593 && !bypass
3594 && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
3595 && next < prev
3596 {
3597 return Err(S3Error::with_message(
3598 S3ErrorCode::AccessDenied,
3599 "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
3600 ));
3601 }
3602 let mut state = existing;
3603 if new_mode.is_some() {
3604 state.mode = new_mode;
3605 }
3606 if new_until.is_some() {
3607 state.retain_until = new_until;
3608 }
3609 mgr.set(&bucket, &key, state);
3610 return Ok(S3Response::new(PutObjectRetentionOutput::default()));
3611 }
3612 self.backend.put_object_retention(req).await
3613 }
3614
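    /// Bucket versioning status, answered from the VersioningManager when one
    /// is configured; an Unversioned bucket reports no status.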
3615 async fn get_bucket_versioning(
3621 &self,
3622 req: S3Request<GetBucketVersioningInput>,
3623 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
3624 if let Some(mgr) = self.versioning.as_ref() {
3629 let output = match mgr.state(&req.input.bucket).as_aws_status() {
3630 Some(s) => GetBucketVersioningOutput {
3631 status: Some(BucketVersioningStatus::from(s.to_owned())),
3632 ..Default::default()
3633 },
3634 None => GetBucketVersioningOutput::default(),
3635 };
3636 return Ok(S3Response::new(output));
3637 }
3638 self.backend.get_bucket_versioning(req).await
3639 }
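    /// PutBucketVersioning: a request that changes MfaDelete must carry a valid
    /// x-amz-mfa header (device serial plus current TOTP code); the versioning
    /// state itself is then recorded in the VersioningManager when present.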
3640 async fn put_bucket_versioning(
3641 &self,
3642 req: S3Request<PutBucketVersioningInput>,
3643 ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
3644 if let Some(mgr) = self.mfa_delete.as_ref()
3655 && let Some(target_enabled) = req
3656 .input
3657 .versioning_configuration
3658 .mfa_delete
3659 .as_ref()
3660 .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
3661 {
3662 let bucket = req.input.bucket.clone();
3663 let header = req.input.mfa.as_deref();
3664 let secret = mgr.lookup_secret(&bucket);
3665 let verified = match (header, secret.as_ref()) {
3666 (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
3667 Ok((serial, code)) => {
3668 serial == s.serial
3669 && crate::mfa::verify_totp(
3670 &s.secret_base32,
3671 &code,
3672 current_unix_secs(),
3673 )
3674 }
3675 Err(_) => false,
3676 },
3677 _ => false,
3678 };
3679 if !verified {
3680 crate::metrics::record_mfa_delete_denial(&bucket);
3681 let err = if header.is_none() {
3682 crate::mfa::MfaError::Missing
3683 } else {
3684 crate::mfa::MfaError::InvalidCode
3685 };
3686 return Err(mfa_error_to_s3(err));
3687 }
3688 mgr.set_bucket_state(&bucket, target_enabled);
3689 }
3690 if let Some(mgr) = self.versioning.as_ref() {
3696 let new_state = match req
3697 .input
3698 .versioning_configuration
3699 .status
3700 .as_ref()
3701 .map(|s| s.as_str())
3702 {
3703 Some(s) if s.eq_ignore_ascii_case("Enabled") => {
3704 crate::versioning::VersioningState::Enabled
3705 }
3706 Some(s) if s.eq_ignore_ascii_case("Suspended") => {
3707 crate::versioning::VersioningState::Suspended
3708 }
3709 _ => crate::versioning::VersioningState::Unversioned,
3710 };
3711 mgr.set_state(&req.input.bucket, new_state);
3712 return Ok(S3Response::new(PutBucketVersioningOutput::default()));
3713 }
3714 self.backend.put_bucket_versioning(req).await
3715 }
3716
3717 async fn get_bucket_location(
3719 &self,
3720 req: S3Request<GetBucketLocationInput>,
3721 ) -> S3Result<S3Response<GetBucketLocationOutput>> {
3722 self.backend.get_bucket_location(req).await
3723 }
3724
3725 async fn get_bucket_policy(
3727 &self,
3728 req: S3Request<GetBucketPolicyInput>,
3729 ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
3730 self.backend.get_bucket_policy(req).await
3731 }
3732 async fn put_bucket_policy(
3733 &self,
3734 req: S3Request<PutBucketPolicyInput>,
3735 ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
3736 self.backend.put_bucket_policy(req).await
3737 }
3738 async fn delete_bucket_policy(
3739 &self,
3740 req: S3Request<DeleteBucketPolicyInput>,
3741 ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
3742 self.backend.delete_bucket_policy(req).await
3743 }
3744 async fn get_bucket_policy_status(
3745 &self,
3746 req: S3Request<GetBucketPolicyStatusInput>,
3747 ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
3748 self.backend.get_bucket_policy_status(req).await
3749 }
3750
3751 async fn get_bucket_acl(
3753 &self,
3754 req: S3Request<GetBucketAclInput>,
3755 ) -> S3Result<S3Response<GetBucketAclOutput>> {
3756 self.backend.get_bucket_acl(req).await
3757 }
3758 async fn put_bucket_acl(
3759 &self,
3760 req: S3Request<PutBucketAclInput>,
3761 ) -> S3Result<S3Response<PutBucketAclOutput>> {
3762 self.backend.put_bucket_acl(req).await
3763 }
3764
3765 async fn get_bucket_cors(
3767 &self,
3768 req: S3Request<GetBucketCorsInput>,
3769 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
3770 if let Some(mgr) = self.cors.as_ref() {
3771 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
3772 S3Error::with_message(
3773 S3ErrorCode::NoSuchCORSConfiguration,
3774 "The CORS configuration does not exist".to_string(),
3775 )
3776 })?;
3777 let rules: Vec<CORSRule> = cfg
3778 .rules
3779 .into_iter()
3780 .map(|r| CORSRule {
3781 allowed_headers: if r.allowed_headers.is_empty() {
3782 None
3783 } else {
3784 Some(r.allowed_headers)
3785 },
3786 allowed_methods: r.allowed_methods,
3787 allowed_origins: r.allowed_origins,
3788 expose_headers: if r.expose_headers.is_empty() {
3789 None
3790 } else {
3791 Some(r.expose_headers)
3792 },
3793 id: r.id,
3794 max_age_seconds: r.max_age_seconds.map(|s| s as i32),
3795 })
3796 .collect();
3797 return Ok(S3Response::new(GetBucketCorsOutput {
3798 cors_rules: Some(rules),
3799 }));
3800 }
3801 self.backend.get_bucket_cors(req).await
3802 }
3803 async fn put_bucket_cors(
3804 &self,
3805 req: S3Request<PutBucketCorsInput>,
3806 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
3807 if let Some(mgr) = self.cors.as_ref() {
3808 let cfg = crate::cors::CorsConfig {
3809 rules: req
3810 .input
3811 .cors_configuration
3812 .cors_rules
3813 .into_iter()
3814 .map(|r| crate::cors::CorsRule {
3815 allowed_origins: r.allowed_origins,
3816 allowed_methods: r.allowed_methods,
3817 allowed_headers: r.allowed_headers.unwrap_or_default(),
3818 expose_headers: r.expose_headers.unwrap_or_default(),
3819 max_age_seconds: r.max_age_seconds.and_then(|s| {
3820 if s < 0 { None } else { Some(s as u32) }
3821 }),
3822 id: r.id,
3823 })
3824 .collect(),
3825 };
3826 mgr.put(&req.input.bucket, cfg);
3827 return Ok(S3Response::new(PutBucketCorsOutput::default()));
3828 }
3829 self.backend.put_bucket_cors(req).await
3830 }
3831 async fn delete_bucket_cors(
3832 &self,
3833 req: S3Request<DeleteBucketCorsInput>,
3834 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
3835 if let Some(mgr) = self.cors.as_ref() {
3836 mgr.delete(&req.input.bucket);
3837 return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
3838 }
3839 self.backend.delete_bucket_cors(req).await
3840 }
3841
3842 async fn get_bucket_lifecycle_configuration(
3844 &self,
3845 req: S3Request<GetBucketLifecycleConfigurationInput>,
3846 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
3847 if let Some(mgr) = self.lifecycle.as_ref() {
3848 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
3849 S3Error::with_message(
3850 S3ErrorCode::NoSuchLifecycleConfiguration,
3851 "The lifecycle configuration does not exist".to_string(),
3852 )
3853 })?;
3854 let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
3855 return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
3856 rules: Some(rules),
3857 transition_default_minimum_object_size: None,
3858 }));
3859 }
3860 self.backend.get_bucket_lifecycle_configuration(req).await
3861 }
3862 async fn put_bucket_lifecycle_configuration(
3863 &self,
3864 req: S3Request<PutBucketLifecycleConfigurationInput>,
3865 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
3866 if let Some(mgr) = self.lifecycle.as_ref() {
3867 let bucket = req.input.bucket.clone();
3868 let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
3869 let cfg = dto_lifecycle_to_internal(&dto_cfg);
3870 mgr.put(&bucket, cfg);
3871 return Ok(S3Response::new(
3872 PutBucketLifecycleConfigurationOutput::default(),
3873 ));
3874 }
3875 self.backend.put_bucket_lifecycle_configuration(req).await
3876 }
3877 async fn delete_bucket_lifecycle(
3878 &self,
3879 req: S3Request<DeleteBucketLifecycleInput>,
3880 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
3881 if let Some(mgr) = self.lifecycle.as_ref() {
3882 mgr.delete(&req.input.bucket);
3883 return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
3884 }
3885 self.backend.delete_bucket_lifecycle(req).await
3886 }
3887
3888 async fn get_bucket_tagging(
3890 &self,
3891 req: S3Request<GetBucketTaggingInput>,
3892 ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
3893 let Some(mgr) = self.tagging.as_ref() else {
3894 return self.backend.get_bucket_tagging(req).await;
3895 };
3896 let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
3897 Ok(S3Response::new(GetBucketTaggingOutput {
3898 tag_set: tagset_to_aws(&tags),
3899 }))
3900 }
3901 async fn put_bucket_tagging(
3902 &self,
3903 req: S3Request<PutBucketTaggingInput>,
3904 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
3905 let Some(mgr) = self.tagging.as_ref() else {
3906 return self.backend.put_bucket_tagging(req).await;
3907 };
3908 let bucket = req.input.bucket.clone();
3909 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
3910 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
3911 })?;
3912 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
3913 mgr.put_bucket_tags(&bucket, parsed);
3914 Ok(S3Response::new(PutBucketTaggingOutput::default()))
3915 }
3916 async fn delete_bucket_tagging(
3917 &self,
3918 req: S3Request<DeleteBucketTaggingInput>,
3919 ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
3920 let Some(mgr) = self.tagging.as_ref() else {
3921 return self.backend.delete_bucket_tagging(req).await;
3922 };
3923 let bucket = req.input.bucket.clone();
3924 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
3925 mgr.delete_bucket_tags(&bucket);
3926 Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
3927 }
3928
3929 async fn get_bucket_encryption(
3931 &self,
3932 req: S3Request<GetBucketEncryptionInput>,
3933 ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
3934 self.backend.get_bucket_encryption(req).await
3935 }
3936 async fn put_bucket_encryption(
3937 &self,
3938 req: S3Request<PutBucketEncryptionInput>,
3939 ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
3940 self.backend.put_bucket_encryption(req).await
3941 }
3942 async fn delete_bucket_encryption(
3943 &self,
3944 req: S3Request<DeleteBucketEncryptionInput>,
3945 ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
3946 self.backend.delete_bucket_encryption(req).await
3947 }
3948
3949 async fn get_bucket_logging(
3951 &self,
3952 req: S3Request<GetBucketLoggingInput>,
3953 ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
3954 self.backend.get_bucket_logging(req).await
3955 }
3956 async fn put_bucket_logging(
3957 &self,
3958 req: S3Request<PutBucketLoggingInput>,
3959 ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
3960 self.backend.put_bucket_logging(req).await
3961 }
3962
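    /// Bucket notification configuration, stored in the NotificationManager
    /// when configured and converted to and from the S3 DTO shape by
    /// notif_to_dto / notif_from_dto below.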
3963 async fn get_bucket_notification_configuration(
3973 &self,
3974 req: S3Request<GetBucketNotificationConfigurationInput>,
3975 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
3976 if let Some(mgr) = self.notifications.as_ref() {
3977 let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
3978 let dto = notif_to_dto(&cfg);
3979 return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
3980 event_bridge_configuration: dto.event_bridge_configuration,
3981 lambda_function_configurations: dto.lambda_function_configurations,
3982 queue_configurations: dto.queue_configurations,
3983 topic_configurations: dto.topic_configurations,
3984 }));
3985 }
3986 self.backend
3987 .get_bucket_notification_configuration(req)
3988 .await
3989 }
3990 async fn put_bucket_notification_configuration(
3991 &self,
3992 req: S3Request<PutBucketNotificationConfigurationInput>,
3993 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
3994 if let Some(mgr) = self.notifications.as_ref() {
3995 let cfg = notif_from_dto(&req.input.notification_configuration);
3996 mgr.put(&req.input.bucket, cfg);
3997 return Ok(S3Response::new(
3998 PutBucketNotificationConfigurationOutput::default(),
3999 ));
4000 }
4001 self.backend
4002 .put_bucket_notification_configuration(req)
4003 .await
4004 }
4005
4006 async fn get_bucket_request_payment(
4008 &self,
4009 req: S3Request<GetBucketRequestPaymentInput>,
4010 ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
4011 self.backend.get_bucket_request_payment(req).await
4012 }
4013 async fn put_bucket_request_payment(
4014 &self,
4015 req: S3Request<PutBucketRequestPaymentInput>,
4016 ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
4017 self.backend.put_bucket_request_payment(req).await
4018 }
4019
4020 async fn get_bucket_website(
4022 &self,
4023 req: S3Request<GetBucketWebsiteInput>,
4024 ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
4025 self.backend.get_bucket_website(req).await
4026 }
4027 async fn put_bucket_website(
4028 &self,
4029 req: S3Request<PutBucketWebsiteInput>,
4030 ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
4031 self.backend.put_bucket_website(req).await
4032 }
4033 async fn delete_bucket_website(
4034 &self,
4035 req: S3Request<DeleteBucketWebsiteInput>,
4036 ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
4037 self.backend.delete_bucket_website(req).await
4038 }
4039
4040 async fn get_bucket_replication(
4042 &self,
4043 req: S3Request<GetBucketReplicationInput>,
4044 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
4045 if let Some(mgr) = self.replication.as_ref() {
4046 return match mgr.get(&req.input.bucket) {
4047 Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
4048 replication_configuration: Some(replication_to_dto(&cfg)),
4049 })),
4050 None => Err(S3Error::with_message(
4051 S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
4052 format!("no replication configuration on bucket {}", req.input.bucket),
4053 )),
4054 };
4055 }
4056 self.backend.get_bucket_replication(req).await
4057 }
4058 async fn put_bucket_replication(
4059 &self,
4060 req: S3Request<PutBucketReplicationInput>,
4061 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
4062 if let Some(mgr) = self.replication.as_ref() {
4063 let cfg = replication_from_dto(&req.input.replication_configuration);
4064 mgr.put(&req.input.bucket, cfg);
4065 return Ok(S3Response::new(PutBucketReplicationOutput::default()));
4066 }
4067 self.backend.put_bucket_replication(req).await
4068 }
4069 async fn delete_bucket_replication(
4070 &self,
4071 req: S3Request<DeleteBucketReplicationInput>,
4072 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
4073 if let Some(mgr) = self.replication.as_ref() {
4074 mgr.delete(&req.input.bucket);
4075 return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
4076 }
4077 self.backend.delete_bucket_replication(req).await
4078 }
4079
4080 async fn get_bucket_accelerate_configuration(
4082 &self,
4083 req: S3Request<GetBucketAccelerateConfigurationInput>,
4084 ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
4085 self.backend.get_bucket_accelerate_configuration(req).await
4086 }
4087 async fn put_bucket_accelerate_configuration(
4088 &self,
4089 req: S3Request<PutBucketAccelerateConfigurationInput>,
4090 ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
4091 self.backend.put_bucket_accelerate_configuration(req).await
4092 }
4093
4094 async fn get_bucket_ownership_controls(
4096 &self,
4097 req: S3Request<GetBucketOwnershipControlsInput>,
4098 ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
4099 self.backend.get_bucket_ownership_controls(req).await
4100 }
4101 async fn put_bucket_ownership_controls(
4102 &self,
4103 req: S3Request<PutBucketOwnershipControlsInput>,
4104 ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
4105 self.backend.put_bucket_ownership_controls(req).await
4106 }
4107 async fn delete_bucket_ownership_controls(
4108 &self,
4109 req: S3Request<DeleteBucketOwnershipControlsInput>,
4110 ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
4111 self.backend.delete_bucket_ownership_controls(req).await
4112 }
4113
4114 async fn get_public_access_block(
4116 &self,
4117 req: S3Request<GetPublicAccessBlockInput>,
4118 ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
4119 self.backend.get_public_access_block(req).await
4120 }
4121 async fn put_public_access_block(
4122 &self,
4123 req: S3Request<PutPublicAccessBlockInput>,
4124 ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
4125 self.backend.put_public_access_block(req).await
4126 }
4127 async fn delete_public_access_block(
4128 &self,
4129 req: S3Request<DeletePublicAccessBlockInput>,
4130 ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
4131 self.backend.delete_public_access_block(req).await
4132 }
4133
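    /// SelectObjectContent (subset): SQL over CSV or JSON Lines input with CSV
    /// or JSON output. The object is fetched through our own GetObject so it is
    /// decompressed first, the whole body is buffered, and the response is a
    /// Records / Stats / End event stream. Parquet input and compressed input
    /// are rejected as NotImplemented.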
4134 async fn select_object_content(
4153 &self,
4154 req: S3Request<SelectObjectContentInput>,
4155 ) -> S3Result<S3Response<SelectObjectContentOutput>> {
4156 use crate::select::{
4157 EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
4158 run_select_jsonlines,
4159 };
4160
4161 let select_bucket = req.input.bucket.clone();
4162 let select_key = req.input.key.clone();
4163 self.enforce_rate_limit(&req, &select_bucket)?;
4164 self.enforce_policy(
4165 &req,
4166 "s3:GetObject",
4167 &select_bucket,
4168 Some(&select_key),
4169 )?;
4170
4171 let request = req.input.request;
4172 let sql = request.expression.clone();
4173 if request.expression_type.as_str() != "SQL" {
4174 return Err(S3Error::with_message(
4175 S3ErrorCode::InvalidExpressionType,
4176 format!(
4177 "ExpressionType must be SQL, got: {}",
4178 request.expression_type.as_str()
4179 ),
4180 ));
4181 }
4182
4183 let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
4184 SelectInputFormat::JsonLines
4185 } else if let Some(csv) = request.input_serialization.csv.as_ref() {
4186 let has_header = csv
4187 .file_header_info
4188 .as_ref()
4189 .map(|h| {
4190 let s = h.as_str();
4191 s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
4192 })
4193 .unwrap_or(false);
4194 let delim = csv
4195 .field_delimiter
4196 .as_deref()
4197 .and_then(|s| s.chars().next())
4198 .unwrap_or(',');
4199 SelectInputFormat::Csv {
4200 has_header,
4201 delimiter: delim,
4202 }
4203 } else if request.input_serialization.parquet.is_some() {
4204 return Err(S3Error::with_message(
4205 S3ErrorCode::NotImplemented,
4206 "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
4207 ));
4208 } else {
4209 return Err(S3Error::with_message(
4210 S3ErrorCode::InvalidRequest,
4211 "InputSerialization requires exactly one of CSV / JSON / Parquet",
4212 ));
4213 };
4214 if let Some(ct) = request.input_serialization.compression_type.as_ref()
4215 && !ct.as_str().eq_ignore_ascii_case("NONE")
4216 {
4217 return Err(S3Error::with_message(
4218 S3ErrorCode::NotImplemented,
4219 format!(
4220 "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
4221 ct.as_str()
4222 ),
4223 ));
4224 }
4225
4226 let output_format = if request.output_serialization.json.is_some() {
4227 SelectOutputFormat::Json
4228 } else if request.output_serialization.csv.is_some() {
4229 SelectOutputFormat::Csv
4230 } else {
4231 return Err(S3Error::with_message(
4232 S3ErrorCode::InvalidRequest,
4233 "OutputSerialization requires exactly one of CSV / JSON",
4234 ));
4235 };
4236
4237 let get_input = GetObjectInput {
4238 bucket: select_bucket.clone(),
4239 key: select_key.clone(),
4240 sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
4241 sse_customer_key: req.input.sse_customer_key.clone(),
4242 sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
4243 ..Default::default()
4244 };
4245 let get_req = S3Request {
4246 input: get_input,
4247 method: http::Method::GET,
4248 uri: format!("/{select_bucket}/{select_key}")
4249 .parse()
4250 .map_err(|e| {
4251 S3Error::with_message(
4252 S3ErrorCode::InternalError,
4253 format!("constructing inner GET URI: {e}"),
4254 )
4255 })?,
4256 headers: http::HeaderMap::new(),
4257 extensions: http::Extensions::new(),
4258 credentials: req.credentials.clone(),
4259 region: req.region.clone(),
4260 service: req.service.clone(),
4261 trailing_headers: None,
4262 };
4263 let mut get_resp = self.get_object(get_req).await?;
4264 let blob = get_resp.output.body.take().ok_or_else(|| {
4265 S3Error::with_message(
4266 S3ErrorCode::InternalError,
4267 "Select: object body was empty after GET",
4268 )
4269 })?;
4270 let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
4271 .await
4272 .map_err(internal("collect Select body"))?;
4273 let scanned = body_bytes.len() as u64;
4274
4275 let matched_payload = match input_format {
4276 SelectInputFormat::JsonLines => {
4277 run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
4278 |e| select_error_to_s3(e, "JSON Lines"),
4279 )?
4280 }
4281 SelectInputFormat::Csv { .. } => {
4282 run_select_csv(&sql, &body_bytes, input_format, output_format)
4283 .map_err(|e| select_error_to_s3(e, "CSV"))?
4284 }
4285 };
4286
4287 let returned = matched_payload.len() as u64;
4288 let processed = scanned;
4289 let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
4290 if !matched_payload.is_empty() {
4291 events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
4292 payload: Some(bytes::Bytes::from(matched_payload)),
4293 })));
4294 }
4295 events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
4296 details: Some(Stats {
4297 bytes_scanned: Some(scanned as i64),
4298 bytes_processed: Some(processed as i64),
4299 bytes_returned: Some(returned as i64),
4300 }),
4301 })));
4302 events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
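        // EventStreamWriter is constructed but not used on this path; the event
        // stream below is built directly from the collected events.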
4303 let _writer = EventStreamWriter::new();
4306
4307 let stream =
4308 SelectObjectContentEventStream::new(futures::stream::iter(events));
4309 let output = SelectObjectContentOutput {
4310 payload: Some(stream),
4311 };
4312 Ok(S3Response::new(output))
4313 }
4314
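    /// Bucket inventory configurations, stored in the InventoryManager when one
    /// is configured; listings are returned in full (no pagination).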
4315 async fn put_bucket_inventory_configuration(
4329 &self,
4330 req: S3Request<PutBucketInventoryConfigurationInput>,
4331 ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
4332 if let Some(mgr) = self.inventory.as_ref() {
4333 let cfg = inv_from_dto(
4334 &req.input.bucket,
4335 &req.input.id,
4336 &req.input.inventory_configuration,
4337 );
4338 mgr.put(cfg);
4339 return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
4340 }
4341 self.backend.put_bucket_inventory_configuration(req).await
4342 }
4343
4344 async fn get_bucket_inventory_configuration(
4345 &self,
4346 req: S3Request<GetBucketInventoryConfigurationInput>,
4347 ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
4348 if let Some(mgr) = self.inventory.as_ref() {
4349 let cfg = mgr.get(&req.input.bucket, &req.input.id);
4350 if let Some(cfg) = cfg {
4351 let out = GetBucketInventoryConfigurationOutput {
4352 inventory_configuration: Some(inv_to_dto(&cfg)),
4353 };
4354 return Ok(S3Response::new(out));
4355 }
4356 let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
4363 .unwrap_or(S3ErrorCode::NoSuchKey);
4364 return Err(S3Error::with_message(
4365 code,
4366 format!(
4367 "no inventory configuration with id={} on bucket={}",
4368 req.input.id, req.input.bucket
4369 ),
4370 ));
4371 }
4372 self.backend.get_bucket_inventory_configuration(req).await
4373 }
4374
4375 async fn list_bucket_inventory_configurations(
4376 &self,
4377 req: S3Request<ListBucketInventoryConfigurationsInput>,
4378 ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
4379 if let Some(mgr) = self.inventory.as_ref() {
4380 let list = mgr.list_for_bucket(&req.input.bucket);
4381 let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
4382 let out = ListBucketInventoryConfigurationsOutput {
4383 continuation_token: req.input.continuation_token.clone(),
4384 inventory_configuration_list: if dto_list.is_empty() {
4385 None
4386 } else {
4387 Some(dto_list)
4388 },
4389 is_truncated: Some(false),
4390 next_continuation_token: None,
4391 };
4392 return Ok(S3Response::new(out));
4393 }
4394 self.backend.list_bucket_inventory_configurations(req).await
4395 }
4396
4397 async fn delete_bucket_inventory_configuration(
4398 &self,
4399 req: S3Request<DeleteBucketInventoryConfigurationInput>,
4400 ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
4401 if let Some(mgr) = self.inventory.as_ref() {
4402 mgr.delete(&req.input.bucket, &req.input.id);
4403 return Ok(S3Response::new(
4404 DeleteBucketInventoryConfigurationOutput::default(),
4405 ));
4406 }
4407 self.backend.delete_bucket_inventory_configuration(req).await
4408 }
4409}
4410
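// Conversion between the S3 inventory DTO and the internal inventory config.
// Only Daily/Weekly schedules and CSV output are represented internally;
// anything else falls back to Daily and CSV.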
4411fn inv_from_dto(
4421 bucket: &str,
4422 id: &str,
4423 dto: &InventoryConfiguration,
4424) -> crate::inventory::InventoryConfig {
4425 let frequency_hours = match dto.schedule.frequency.as_str() {
4426 "Weekly" => 24 * 7,
4427 _ => 24,
4431 };
4432 let format = crate::inventory::InventoryFormat::Csv;
4436 crate::inventory::InventoryConfig {
4437 id: id.to_owned(),
4438 bucket: bucket.to_owned(),
4439 destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
4440 destination_prefix: dto
4441 .destination
4442 .s3_bucket_destination
4443 .prefix
4444 .clone()
4445 .unwrap_or_default(),
4446 frequency_hours,
4447 format,
4448 included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
4449 dto.included_object_versions.as_str(),
4450 ),
4451 }
4452}
4453
4454fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
4455 InventoryConfiguration {
4456 id: cfg.id.clone(),
4457 is_enabled: true,
4458 included_object_versions: InventoryIncludedObjectVersions::from(
4459 cfg.included_object_versions.as_aws_str().to_owned(),
4460 ),
4461 destination: InventoryDestination {
4462 s3_bucket_destination: InventoryS3BucketDestination {
4463 account_id: None,
4464 bucket: cfg.destination_bucket.clone(),
4465 encryption: None,
4466 format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
4467 prefix: if cfg.destination_prefix.is_empty() {
4468 None
4469 } else {
4470 Some(cfg.destination_prefix.clone())
4471 },
4472 },
4473 },
4474 schedule: InventorySchedule {
4475 frequency: InventoryFrequency::from(
4479 if cfg.frequency_hours == 24 * 7 {
4480 "Weekly"
4481 } else {
4482 "Daily"
4483 }
4484 .to_owned(),
4485 ),
4486 },
4487 filter: None,
4488 optional_fields: None,
4489 }
4490}
4491
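// Conversion between the S3 notification DTOs and the internal notification
// config. Rules without an explicit id get a synthetic "topic-{n}" / "queue-{n}"
// id, and Webhook destinations, which have no S3 DTO equivalent, are dropped
// when converting back to the DTO shape.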
fn notif_from_dto(
    dto: &NotificationConfiguration,
) -> crate::notifications::NotificationConfig {
    let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
    if let Some(topics) = dto.topic_configurations.as_ref() {
        for (idx, t) in topics.iter().enumerate() {
            let events = events_from_dto(&t.events);
            let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
            rules.push(crate::notifications::NotificationRule {
                id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
                events,
                destination: crate::notifications::Destination::Sns {
                    topic_arn: t.topic_arn.clone(),
                },
                filter_prefix: prefix,
                filter_suffix: suffix,
            });
        }
    }
    if let Some(queues) = dto.queue_configurations.as_ref() {
        for (idx, q) in queues.iter().enumerate() {
            let events = events_from_dto(&q.events);
            let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
            rules.push(crate::notifications::NotificationRule {
                id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
                events,
                destination: crate::notifications::Destination::Sqs {
                    queue_arn: q.queue_arn.clone(),
                },
                filter_prefix: prefix,
                filter_suffix: suffix,
            });
        }
    }
    crate::notifications::NotificationConfig { rules }
}

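/// Converts internal notification rules back into the s3s DTO. Webhook
/// destinations have no S3 representation and are omitted.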
fn notif_to_dto(
    cfg: &crate::notifications::NotificationConfig,
) -> NotificationConfiguration {
    let mut topics: Vec<TopicConfiguration> = Vec::new();
    let mut queues: Vec<QueueConfiguration> = Vec::new();
    for rule in &cfg.rules {
        let events: Vec<Event> = rule
            .events
            .iter()
            .map(|e| Event::from(e.as_aws_str().to_owned()))
            .collect();
        let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
        match &rule.destination {
            crate::notifications::Destination::Sns { topic_arn } => {
                topics.push(TopicConfiguration {
                    events,
                    filter,
                    id: Some(rule.id.clone()),
                    topic_arn: topic_arn.clone(),
                });
            }
            crate::notifications::Destination::Sqs { queue_arn } => {
                queues.push(QueueConfiguration {
                    events,
                    filter,
                    id: Some(rule.id.clone()),
                    queue_arn: queue_arn.clone(),
                });
            }
            crate::notifications::Destination::Webhook { .. } => {}
        }
    }
    NotificationConfiguration {
        event_bridge_configuration: None,
        lambda_function_configurations: None,
        queue_configurations: if queues.is_empty() { None } else { Some(queues) },
        topic_configurations: if topics.is_empty() { None } else { Some(topics) },
    }
}

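/// Maps S3 event names onto internal event types, skipping names the internal
/// model does not recognize.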
fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
    events
        .iter()
        .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
        .collect()
}

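/// Extracts the `prefix` and `suffix` key-filter rules (matched
/// case-insensitively) from a notification filter, if present.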
fn filter_from_dto(
    f: Option<&NotificationConfigurationFilter>,
) -> (Option<String>, Option<String>) {
    let Some(f) = f else {
        return (None, None);
    };
    let Some(key) = f.key.as_ref() else {
        return (None, None);
    };
    let Some(rules) = key.filter_rules.as_ref() else {
        return (None, None);
    };
    let mut prefix = None;
    let mut suffix = None;
    for r in rules {
        let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
        let value = r.value.clone();
        match name.as_deref() {
            Some("prefix") => prefix = value,
            Some("suffix") => suffix = value,
            _ => {}
        }
    }
    (prefix, suffix)
}

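/// Builds a notification key filter from optional prefix/suffix values, or
/// returns `None` when neither is set.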
fn filter_to_dto(
    prefix: Option<&str>,
    suffix: Option<&str>,
) -> Option<NotificationConfigurationFilter> {
    if prefix.is_none() && suffix.is_none() {
        return None;
    }
    let mut rules: Vec<FilterRule> = Vec::new();
    if let Some(p) = prefix {
        rules.push(FilterRule {
            name: Some(FilterRuleName::from("prefix".to_owned())),
            value: Some(p.to_owned()),
        });
    }
    if let Some(s) = suffix {
        rules.push(FilterRule {
            name: Some(FilterRuleName::from("suffix".to_owned())),
            value: Some(s.to_owned()),
        });
    }
    Some(NotificationConfigurationFilter {
        key: Some(S3KeyFilter {
            filter_rules: Some(rules),
        }),
    })
}

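/// Converts an s3s [`ReplicationConfiguration`] into the internal model.
/// Rules without an id get a synthetic `rule-{idx}` id, and missing or
/// negative priorities are clamped to zero.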
fn replication_from_dto(
    dto: &ReplicationConfiguration,
) -> crate::replication::ReplicationConfig {
    let rules = dto
        .rules
        .iter()
        .enumerate()
        .map(|(idx, r)| {
            let id = r
                .id
                .as_ref()
                .map(|s| s.as_str().to_owned())
                .unwrap_or_else(|| format!("rule-{idx}"));
            let priority = r.priority.unwrap_or(0).max(0) as u32;
            let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
            let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
            let destination_bucket = r.destination.bucket.clone();
            let destination_storage_class = r
                .destination
                .storage_class
                .as_ref()
                .map(|s| s.as_str().to_owned());
            crate::replication::ReplicationRule {
                id,
                priority,
                status_enabled,
                filter,
                destination_bucket,
                destination_storage_class,
            }
        })
        .collect();
    crate::replication::ReplicationConfig {
        role: dto.role.clone(),
        rules,
    }
}

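/// Converts the internal replication config back into the s3s DTO.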
fn replication_to_dto(
    cfg: &crate::replication::ReplicationConfig,
) -> ReplicationConfiguration {
    let rules = cfg
        .rules
        .iter()
        .map(|r| {
            let status = if r.status_enabled {
                ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
            } else {
                ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
            };
            let destination = Destination {
                access_control_translation: None,
                account: None,
                bucket: r.destination_bucket.clone(),
                encryption_configuration: None,
                metrics: None,
                replication_time: None,
                storage_class: r
                    .destination_storage_class
                    .as_ref()
                    .map(|s| StorageClass::from(s.clone())),
            };
            let filter = Some(replication_filter_to_dto(&r.filter));
            ReplicationRule {
                delete_marker_replication: None,
                destination,
                existing_object_replication: None,
                filter,
                id: Some(r.id.clone()),
                prefix: None,
                priority: Some(r.priority as i32),
                source_selection_criteria: None,
                status,
            }
        })
        .collect();
    ReplicationConfiguration {
        role: cfg.role.clone(),
        rules,
    }
}

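/// Merges the legacy rule-level prefix with the rule filter (including its
/// `And` operator) into a single internal prefix/tags filter. The first
/// prefix found wins; tags are accumulated.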
fn replication_filter_from_dto(
    f: Option<&ReplicationRuleFilter>,
    rule_level_prefix: Option<&str>,
) -> crate::replication::ReplicationFilter {
    let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
    let mut tags: Vec<(String, String)> = Vec::new();
    if let Some(f) = f {
        if let Some(p) = f.prefix.as_ref()
            && prefix.is_none()
        {
            prefix = Some(p.clone());
        }
        if let Some(t) = f.tag.as_ref()
            && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
        {
            tags.push((k.clone(), v.clone()));
        }
        if let Some(and) = f.and.as_ref() {
            if let Some(p) = and.prefix.as_ref()
                && prefix.is_none()
            {
                prefix = Some(p.clone());
            }
            if let Some(ts) = and.tags.as_ref() {
                for t in ts {
                    if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
                        tags.push((k.clone(), v.clone()));
                    }
                }
            }
        }
    }
    crate::replication::ReplicationFilter { prefix, tags }
}

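/// Encodes the internal replication filter using the simplest DTO shape:
/// a bare prefix, a single tag, or an `And` operator when a prefix and tags
/// (or multiple tags) are present together.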
fn replication_filter_to_dto(
    f: &crate::replication::ReplicationFilter,
) -> ReplicationRuleFilter {
    if f.tags.is_empty() {
        ReplicationRuleFilter {
            and: None,
            prefix: f.prefix.clone(),
            tag: None,
        }
    } else if f.tags.len() == 1 && f.prefix.is_none() {
        let (k, v) = &f.tags[0];
        ReplicationRuleFilter {
            and: None,
            prefix: None,
            tag: Some(Tag {
                key: Some(k.clone()),
                value: Some(v.clone()),
            }),
        }
    } else {
        let tags: Vec<Tag> = f
            .tags
            .iter()
            .map(|(k, v)| Tag {
                key: Some(k.clone()),
                value: Some(v.clone()),
            })
            .collect();
        ReplicationRuleFilter {
            and: Some(ReplicationRuleAndOperator {
                prefix: f.prefix.clone(),
                tags: Some(tags),
            }),
            prefix: None,
            tag: None,
        }
    }
}

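/// Converts an s3s [`BucketLifecycleConfiguration`] into the internal
/// lifecycle model, rule by rule.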
fn dto_lifecycle_to_internal(
    dto: &BucketLifecycleConfiguration,
) -> crate::lifecycle::LifecycleConfig {
    crate::lifecycle::LifecycleConfig {
        rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
    }
}

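/// Converts a single lifecycle rule DTO into the internal representation.
/// Numeric fields that do not fit the internal unsigned types are dropped.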
fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
    let filter = rule
        .filter
        .as_ref()
        .map(dto_filter_to_internal)
        .unwrap_or_default();
    let expiration_days = rule
        .expiration
        .as_ref()
        .and_then(|e| e.days)
        .and_then(|d| u32::try_from(d).ok());
    let expiration_date = rule
        .expiration
        .as_ref()
        .and_then(|e| e.date.as_ref())
        .and_then(timestamp_to_chrono_utc);
    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
        .transitions
        .as_ref()
        .map(|ts| {
            ts.iter()
                .filter_map(|t| {
                    let days = u32::try_from(t.days?).ok()?;
                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
                    Some(crate::lifecycle::TransitionRule {
                        days,
                        storage_class,
                    })
                })
                .collect()
        })
        .unwrap_or_default();
    let noncurrent_version_expiration_days = rule
        .noncurrent_version_expiration
        .as_ref()
        .and_then(|n| n.noncurrent_days)
        .and_then(|d| u32::try_from(d).ok());
    let abort_incomplete_multipart_upload_days = rule
        .abort_incomplete_multipart_upload
        .as_ref()
        .and_then(|a| a.days_after_initiation)
        .and_then(|d| u32::try_from(d).ok());
    crate::lifecycle::LifecycleRule {
        id: rule.id.clone().unwrap_or_default(),
        status,
        filter,
        expiration_days,
        expiration_date,
        transitions,
        noncurrent_version_expiration_days,
        abort_incomplete_multipart_upload_days,
    }
}

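/// Flattens a lifecycle rule filter (including its `And` operator) into the
/// internal prefix/tags/object-size representation.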
fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
    let mut prefix = filter.prefix.clone();
    let mut tags: Vec<(String, String)> = Vec::new();
    let mut size_gt: Option<u64> = filter
        .object_size_greater_than
        .and_then(|n| u64::try_from(n).ok());
    let mut size_lt: Option<u64> = filter
        .object_size_less_than
        .and_then(|n| u64::try_from(n).ok());
    if let Some(t) = &filter.tag
        && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
    {
        tags.push((k.clone(), v.clone()));
    }
    if let Some(and) = &filter.and {
        if prefix.is_none() {
            prefix = and.prefix.clone();
        }
        if size_gt.is_none() {
            size_gt = and
                .object_size_greater_than
                .and_then(|n| u64::try_from(n).ok());
        }
        if size_lt.is_none() {
            size_lt = and
                .object_size_less_than
                .and_then(|n| u64::try_from(n).ok());
        }
        if let Some(ts) = &and.tags {
            for t in ts {
                if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
                    tags.push((k.clone(), v.clone()));
                }
            }
        }
    }
    crate::lifecycle::LifecycleFilter {
        prefix,
        tags,
        object_size_greater_than: size_gt,
        object_size_less_than: size_lt,
    }
}

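/// Converts an internal lifecycle rule back into the s3s DTO, choosing the
/// simplest filter encoding that still captures the prefix, tags, and object
/// size bounds.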
fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
        Some(LifecycleExpiration {
            date: rule.expiration_date.map(chrono_utc_to_timestamp),
            days: rule.expiration_days.map(|d| d as i32),
            expired_object_delete_marker: None,
        })
    } else {
        None
    };
    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
        None
    } else {
        Some(
            rule.transitions
                .iter()
                .map(|t| Transition {
                    date: None,
                    days: Some(t.days as i32),
                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
                })
                .collect(),
        )
    };
    let noncurrent_version_expiration =
        rule.noncurrent_version_expiration_days
            .map(|d| NoncurrentVersionExpiration {
                newer_noncurrent_versions: None,
                noncurrent_days: Some(d as i32),
            });
    let abort_incomplete_multipart_upload =
        rule.abort_incomplete_multipart_upload_days
            .map(|d| AbortIncompleteMultipartUpload {
                days_after_initiation: Some(d as i32),
            });
    let filter = if rule.filter.tags.is_empty()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: Some(p.clone()),
            tag: None,
        })
    } else if rule.filter.tags.len() == 1
        && rule.filter.prefix.is_none()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        let (k, v) = rule.filter.tags[0].clone();
        Some(LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: Some(Tag {
                key: Some(k),
                value: Some(v),
            }),
        })
    } else {
        let tags = if rule.filter.tags.is_empty() {
            None
        } else {
            Some(
                rule.filter
                    .tags
                    .iter()
                    .map(|(k, v)| Tag {
                        key: Some(k.clone()),
                        value: Some(v.clone()),
                    })
                    .collect(),
            )
        };
        Some(LifecycleRuleFilter {
            and: Some(LifecycleRuleAndOperator {
                object_size_greater_than: rule
                    .filter
                    .object_size_greater_than
                    .and_then(|n| i64::try_from(n).ok()),
                object_size_less_than: rule
                    .filter
                    .object_size_less_than
                    .and_then(|n| i64::try_from(n).ok()),
                prefix: rule.filter.prefix.clone(),
                tags,
            }),
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: None,
        })
    };
    LifecycleRule {
        abort_incomplete_multipart_upload,
        expiration,
        filter,
        id: if rule.id.is_empty() {
            None
        } else {
            Some(rule.id.clone())
        },
        noncurrent_version_expiration,
        noncurrent_version_transitions: None,
        prefix: None,
        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
        transitions,
    }
}

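/// Pre-routing gate that verifies SigV4A-signed requests against a shared
/// credential store before they reach the S3 handler.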
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}

impl SigV4aGate {
    #[must_use]
    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
        Self { store }
    }

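    /// Verifies a SigV4A-signed request prior to routing.
    ///
    /// Requests that are not SigV4A-signed pass through unchanged. Otherwise
    /// the `Authorization` header is parsed, the signing key is looked up by
    /// access key id, and the signature is verified over
    /// `canonical_request_bytes` against the region-set header (defaulting to
    /// `*`) and the requested region.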
    pub fn pre_route<B>(
        &self,
        req: &http::Request<B>,
        requested_region: &str,
        canonical_request_bytes: &[u8],
    ) -> Result<(), SigV4aGateError> {
        if !crate::sigv4a::detect(req) {
            return Ok(());
        }
        let auth_hdr = req
            .headers()
            .get(http::header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .ok_or(SigV4aGateError::MissingAuthorization)?;
        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
            .ok_or(SigV4aGateError::MalformedAuthorization)?;
        let region_set = req
            .headers()
            .get(crate::sigv4a::REGION_SET_HEADER)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("*");
        let key = self
            .store
            .get(&parsed.access_key_id)
            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
        crate::sigv4a::verify(
            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
            &parsed.signature_der,
            key,
            region_set,
            requested_region,
        )
        .map_err(SigV4aGateError::Verify)?;
        Ok(())
    }
}

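/// Errors returned by [`SigV4aGate::pre_route`].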
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    #[error("missing Authorization header")]
    MissingAuthorization,
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}

impl SigV4aGateError {
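    /// Maps the gate error onto the S3 error code reported to the client.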
    #[must_use]
    pub fn s3_error_code(&self) -> &'static str {
        match self {
            Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
            _ => "SignatureDoesNotMatch",
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }

    #[test]
    fn parse_copy_source_range_basic() {
        let r = parse_copy_source_range("bytes=10-20").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        assert!(parse_copy_source_range("bytes=10-").is_err());
        assert!(parse_copy_source_range("bytes=-10").is_err());
    }
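
    // The tests below are additions sketched against the conversion helpers
    // defined in this module; bucket names, ARNs, and tag values are
    // illustrative only.

    #[test]
    fn notification_filter_round_trip() {
        // Assumes filter_to_dto/filter_from_dto stay lossless for prefix and
        // suffix rules.
        let dto = filter_to_dto(Some("logs/"), Some(".gz"));
        let (prefix, suffix) = filter_from_dto(dto.as_ref());
        assert_eq!(prefix.as_deref(), Some("logs/"));
        assert_eq!(suffix.as_deref(), Some(".gz"));
        assert!(filter_to_dto(None, None).is_none());
    }

    #[test]
    fn replication_filter_round_trip_with_tags() {
        // A prefix plus multiple tags must go through the `And` branch of
        // replication_filter_to_dto and come back unchanged.
        let original = crate::replication::ReplicationFilter {
            prefix: Some("data/".to_owned()),
            tags: vec![
                ("team".to_owned(), "storage".to_owned()),
                ("env".to_owned(), "prod".to_owned()),
            ],
        };
        let dto = replication_filter_to_dto(&original);
        let restored = replication_filter_from_dto(Some(&dto), None);
        assert_eq!(restored.prefix, original.prefix);
        assert_eq!(restored.tags, original.tags);
    }

    #[test]
    fn inventory_weekly_schedule_maps_to_168_hours() {
        // Builds a minimal DTO by hand; the string values follow the AWS
        // conventions ("All", "CSV", "Weekly") used elsewhere in this module.
        let dto = InventoryConfiguration {
            id: "weekly".to_owned(),
            is_enabled: true,
            included_object_versions: InventoryIncludedObjectVersions::from("All".to_owned()),
            destination: InventoryDestination {
                s3_bucket_destination: InventoryS3BucketDestination {
                    account_id: None,
                    bucket: "arn:aws:s3:::inventory-dst".to_owned(),
                    encryption: None,
                    format: InventoryFormat::from("CSV".to_owned()),
                    prefix: None,
                },
            },
            schedule: InventorySchedule {
                frequency: InventoryFrequency::from("Weekly".to_owned()),
            },
            filter: None,
            optional_fields: None,
        };
        let cfg = inv_from_dto("source-bucket", "weekly", &dto);
        assert_eq!(cfg.frequency_hours, 24 * 7);
        assert_eq!(cfg.destination_bucket, "arn:aws:s3:::inventory-dst");
        assert_eq!(cfg.destination_prefix, "");
    }

    #[test]
    fn sigv4a_gate_error_maps_to_s3_code() {
        assert_eq!(
            SigV4aGateError::UnknownAccessKey("AKIDEXAMPLE".to_owned()).s3_error_code(),
            "InvalidAccessKeyId"
        );
        assert_eq!(
            SigV4aGateError::MissingAuthorization.s3_error_code(),
            "SignatureDoesNotMatch"
        );
    }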
}