1use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40 write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry, CompressTelemetry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50 cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51 supports_streaming_compress, supports_streaming_decompress,
52};
53
/// Number of leading bytes sampled from a request body (see `peek_sample`
/// import above) — presumably used for codec selection; confirm at call sites.
const SAMPLE_BYTES: usize = 4096;
56
57#[inline]
64fn stamp_gpu_compress_telemetry(tel: &CompressTelemetry) {
65 if let Some(secs) = tel.gpu_seconds {
66 crate::metrics::record_gpu_compress(tel.codec, secs, tel.bytes_in, tel.bytes_out);
67 }
68 if tel.oom {
69 crate::metrics::record_gpu_oom(tel.codec);
70 }
71}
72
/// Percent-encoding set applied to bucket and key names when they are embedded
/// in a request URI. Starts from ASCII control characters and adds URI
/// delimiters and otherwise-unsafe bytes. Note that `/` is deliberately NOT in
/// this set, so multi-segment object keys keep their path separators; `%` is
/// included so already-encoded input cannot smuggle sequences through.
const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'#')
    .add(b'<')
    .add(b'>')
    .add(b'?')
    .add(b'`')
    .add(b'{')
    .add(b'}')
    .add(b'|')
    .add(b'\\')
    .add(b'^')
    .add(b'[')
    .add(b']')
    .add(b'%');
96
/// Build a `/{bucket}/{key}` request URI, percent-encoding both components
/// with [`URI_KEY_ENCODE_SET`] (slashes inside `key` are preserved).
///
/// # Errors
/// Returns `InvalidObjectName` — falling back to `InvalidArgument` if that
/// code is unknown to the s3s version in use — when the encoded string still
/// fails to parse as an `http::Uri`.
pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
    use percent_encoding::utf8_percent_encode;
    let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
    let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
    let raw = format!("/{bucket_enc}/{key_enc}");
    raw.parse::<http::Uri>().map_err(|e| {
        // "InvalidObjectName" is not a compile-time S3ErrorCode variant, so it
        // is constructed from bytes with a safe fallback.
        let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
            .unwrap_or(S3ErrorCode::InvalidArgument);
        S3Error::with_message(
            code,
            format!("object key cannot be encoded as a request URI: {e}"),
        )
    })
}
130
/// Request attributes captured up-front so they can be emitted in the access
/// log after the operation completes.
struct AccessLogPreamble {
    /// Client IP taken from the first entry of `x-forwarded-for`, if present.
    remote_ip: Option<String>,
    /// Access key of the authenticated principal, if any.
    requester: Option<String>,
    /// `"<METHOD> <path>"` of the incoming request.
    request_uri: String,
    /// `user-agent` header value, if present.
    user_agent: Option<String>,
}
140
/// S3-compatible gateway that layers compression (and optional policy, SSE,
/// versioning, lock, CORS, lifecycle, replication, … features) on top of an
/// inner S3 `backend`. Each optional subsystem is `None` until enabled via a
/// `with_*` builder method.
pub struct S4Service<B: S3> {
    /// Inner S3 implementation all object operations are delegated to.
    backend: Arc<B>,
    /// Registry of available (de)compression codecs.
    registry: Arc<CodecRegistry>,
    /// Strategy object that picks/drives codecs.
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Cap on bytes collected from a request/response body.
    max_body_bytes: usize,
    /// Optional bucket/object access policy engine.
    policy: Option<crate::policy::SharedPolicy>,
    /// Whether requests arrive over TLS (feeds policy conditions).
    secure_transport: bool,
    /// Optional per-principal/bucket rate limiting.
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    /// Optional S3-style access logging sink.
    access_log: Option<crate::access_log::SharedAccessLog>,
    /// Optional server-side-encryption keyring.
    sse_keyring: Option<crate::sse::SharedSseKeyring>,
    /// Optional object versioning manager.
    versioning: Option<Arc<crate::versioning::VersioningManager>>,
    /// Optional KMS backend for SSE-KMS.
    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
    /// Default KMS key id used when a request asks for KMS without one.
    kms_default_key_id: Option<String>,
    /// Optional object-lock (retention / legal hold) manager.
    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
    /// Optional CORS rule manager.
    cors: Option<Arc<crate::cors::CorsManager>>,
    /// Optional bucket inventory manager.
    inventory: Option<Arc<crate::inventory::InventoryManager>>,
    /// Optional event-notification manager.
    notifications: Option<Arc<crate::notifications::NotificationManager>>,
    /// Optional lifecycle-rule manager.
    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
    /// Optional object/bucket tagging manager.
    tagging: Option<Arc<crate::tagging::TagManager>>,
    /// Optional cross-bucket replication manager.
    replication: Option<Arc<crate::replication::ReplicationManager>>,
    /// Optional MFA-delete enforcement.
    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
    /// Strict-compliance toggle (semantics defined where it is read).
    compliance_strict: bool,
    /// Optional SigV4a gate.
    sigv4a_gate: Option<Arc<SigV4aGate>>,
    /// Tracking for in-flight multipart uploads.
    multipart_state: Arc<crate::multipart_state::MultipartStateStore>,
    /// SSE chunking size in bytes; 0 by default (see `with_sse_chunk_size`).
    sse_chunk_size: usize,
}
313
314impl<B: S3> S4Service<B> {
    /// Default cap for collected bodies: 5 GiB, matching the S3 single-PUT
    /// object size limit.
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
317
    /// Create a service wrapping `backend` with the given codec registry and
    /// dispatcher. All optional subsystems start disabled; enable them with
    /// the `with_*` builder methods.
    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend: Arc::new(backend),
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
            secure_transport: false,
            rate_limits: None,
            access_log: None,
            sse_keyring: None,
            versioning: None,
            kms: None,
            kms_default_key_id: None,
            object_lock: None,
            cors: None,
            inventory: None,
            notifications: None,
            lifecycle: None,
            tagging: None,
            replication: None,
            mfa_delete: None,
            compliance_strict: false,
            sigv4a_gate: None,
            multipart_state: Arc::new(crate::multipart_state::MultipartStateStore::new()),
            sse_chunk_size: 0,
        }
    }
355
    /// Builder: install the SigV4a gate.
    #[must_use]
    pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
        self.sigv4a_gate = Some(gate);
        self
    }
369
    /// Returns the configured SigV4a gate, if any.
    #[must_use]
    pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
        self.sigv4a_gate.as_ref()
    }
380
    /// Builder: install the tag manager.
    #[must_use]
    pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
        self.tagging = Some(mgr);
        self
    }
392
    /// Returns the configured tag manager, if any.
    #[must_use]
    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
        self.tagging.as_ref()
    }
400
    /// Builder: install the inventory manager.
    #[must_use]
    pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
        self.inventory = Some(mgr);
        self
    }
415
    /// Returns the configured inventory manager, if any.
    #[must_use]
    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
        self.inventory.as_ref()
    }
424
    /// Builder: install the lifecycle manager.
    #[must_use]
    pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
        self.lifecycle = Some(mgr);
        self
    }
439
    /// Returns the configured lifecycle manager, if any.
    #[must_use]
    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
        self.lifecycle.as_ref()
    }
448
449 #[must_use]
458 pub fn run_lifecycle_once_for_test(
459 &self,
460 bucket: &str,
461 objects: &[crate::lifecycle::EvaluateBatchEntry],
462 ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
463 let Some(mgr) = self.lifecycle.as_ref() else {
464 return Vec::new();
465 };
466 crate::lifecycle::evaluate_batch(mgr, bucket, objects)
467 }
468
    /// Builder: install the event-notification manager.
    #[must_use]
    pub fn with_notifications(
        mut self,
        mgr: Arc<crate::notifications::NotificationManager>,
    ) -> Self {
        self.notifications = Some(mgr);
        self
    }
486
    /// Returns the configured notification manager, if any.
    #[must_use]
    pub fn notifications_manager(
        &self,
    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
        self.notifications.as_ref()
    }
496
    /// Fire-and-forget a delete-class event: if any configured destination
    /// matches `(bucket, event, key)`, spawn an async dispatch task and return
    /// immediately. Size and etag are sent as `None`; the request id is a
    /// fresh `S4-<uuid>`.
    fn fire_delete_notification(
        &self,
        bucket: &str,
        key: &str,
        event: crate::notifications::EventType,
        version_id: Option<String>,
    ) {
        let Some(mgr) = self.notifications.as_ref() else {
            return;
        };
        // Cheap pre-check: don't spawn a task when nothing subscribes.
        let dests = mgr.match_destinations(bucket, &event, key);
        if dests.is_empty() {
            return;
        }
        tokio::spawn(crate::notifications::dispatch_event(
            Arc::clone(mgr),
            bucket.to_owned(),
            key.to_owned(),
            event,
            None,
            None,
            version_id,
            format!("S4-{}", uuid::Uuid::new_v4()),
        ));
    }
526
    /// Builder: install the replication manager.
    #[must_use]
    pub fn with_replication(
        mut self,
        mgr: Arc<crate::replication::ReplicationManager>,
    ) -> Self {
        self.replication = Some(mgr);
        self
    }
546
    /// Returns the configured replication manager, if any.
    #[must_use]
    pub fn replication_manager(
        &self,
    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
        self.replication.as_ref()
    }
556
    /// After a successful PUT, asynchronously replicate the object if a
    /// replication rule matches `(source_bucket, source_key, tags)`.
    ///
    /// The status is recorded as `Pending` *before* the task is spawned, then
    /// `replicate_object` drives the destination PUT (via the same backend)
    /// and updates the final status. No-op when the backend PUT failed, no
    /// manager is configured, or no rule matches.
    fn spawn_replication_if_matched(
        &self,
        source_bucket: &str,
        source_key: &str,
        request_tags: &Option<crate::tagging::TagSet>,
        body: &bytes::Bytes,
        metadata: &Option<std::collections::HashMap<String, String>>,
        backend_ok: bool,
    ) where
        B: Send + Sync + 'static,
    {
        if !backend_ok {
            return;
        }
        let Some(mgr) = self.replication.as_ref() else {
            return;
        };
        // Tags from this request only; rule matching sees an empty set otherwise.
        let object_tags: Vec<(String, String)> = request_tags
            .as_ref()
            .map(|ts| ts.iter().cloned().collect())
            .unwrap_or_default();
        let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
            return;
        };
        // Mark Pending synchronously so status is visible before the task runs.
        mgr.record_status(
            source_bucket,
            source_key,
            crate::replication::ReplicationStatus::Pending,
        );
        let mgr_cl = Arc::clone(mgr);
        let backend = Arc::clone(&self.backend);
        // `Bytes::clone` is a cheap refcount bump, not a data copy.
        let body_cl = body.clone();
        let metadata_cl = metadata.clone();
        let source_bucket_cl = source_bucket.to_owned();
        let source_key_cl = source_key.to_owned();
        tokio::spawn(async move {
            // Closure handed to replicate_object: performs the destination PUT
            // through the same backend, reporting errors as strings.
            let do_put = move |dest_bucket: String,
                               dest_key: String,
                               dest_body: bytes::Bytes,
                               dest_meta: Option<std::collections::HashMap<String, String>>| {
                let backend = Arc::clone(&backend);
                async move {
                    let req = S3Request {
                        input: PutObjectInput {
                            bucket: dest_bucket,
                            key: dest_key,
                            body: Some(bytes_to_blob(dest_body)),
                            metadata: dest_meta,
                            ..Default::default()
                        },
                        method: http::Method::PUT,
                        uri: "/".parse().unwrap(),
                        headers: http::HeaderMap::new(),
                        extensions: http::Extensions::new(),
                        credentials: None,
                        region: None,
                        service: None,
                        trailing_headers: None,
                    };
                    backend
                        .put_object(req)
                        .await
                        .map(|_| ())
                        .map_err(|e| format!("destination put_object: {e}"))
                }
            };
            crate::replication::replicate_object(
                rule,
                source_bucket_cl,
                source_key_cl,
                body_cl,
                metadata_cl,
                do_put,
                mgr_cl,
            )
            .await;
        });
    }
653
    /// Builder: install the MFA-delete manager.
    #[must_use]
    pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
        self.mfa_delete = Some(mgr);
        self
    }
665
    /// Returns the configured MFA-delete manager, if any.
    #[must_use]
    pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
        self.mfa_delete.as_ref()
    }
673
    /// Builder: install the CORS manager.
    #[must_use]
    pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
        self.cors = Some(mgr);
        self
    }
684
    /// Returns the configured CORS manager, if any.
    #[must_use]
    pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
        self.cors.as_ref()
    }
690
    /// Answer a CORS preflight (`OPTIONS`) request against the bucket's rules.
    ///
    /// Returns the response headers to emit, or `None` when no CORS manager is
    /// configured or no rule matches (caller decides how to reject).
    #[must_use]
    pub fn handle_preflight(
        &self,
        bucket: &str,
        origin: &str,
        method: &str,
        request_headers: &[String],
    ) -> Option<std::collections::HashMap<String, String>> {
        let mgr = self.cors.as_ref()?;
        let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
        let mut h = std::collections::HashMap::new();
        // Wildcard rules answer "*"; otherwise echo the request origin.
        let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
            "*".to_string()
        } else {
            origin.to_string()
        };
        h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
        h.insert(
            "Access-Control-Allow-Methods".to_string(),
            rule.allowed_methods.join(", "),
        );
        if !rule.allowed_headers.is_empty() {
            h.insert(
                "Access-Control-Allow-Headers".to_string(),
                rule.allowed_headers.join(", "),
            );
        }
        if let Some(secs) = rule.max_age_seconds {
            h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
        }
        if !rule.expose_headers.is_empty() {
            h.insert(
                "Access-Control-Expose-Headers".to_string(),
                rule.expose_headers.join(", "),
            );
        }
        Some(h)
    }
752
    /// Builder: toggle strict-compliance mode.
    #[must_use]
    pub fn with_compliance_strict(mut self, on: bool) -> Self {
        self.compliance_strict = on;
        self
    }
764
    /// Builder: install the object-lock manager.
    #[must_use]
    pub fn with_object_lock(
        mut self,
        mgr: Arc<crate::object_lock::ObjectLockManager>,
    ) -> Self {
        self.object_lock = Some(mgr);
        self
    }
778
    /// Returns the configured object-lock manager, if any.
    #[must_use]
    pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
        self.object_lock.as_ref()
    }
790
    /// Builder: install a KMS backend and an optional default key id used
    /// when a request asks for SSE-KMS without naming a key.
    #[must_use]
    pub fn with_kms_backend(
        mut self,
        kms: Arc<dyn crate::kms::KmsBackend>,
        default_key_id: Option<String>,
    ) -> Self {
        self.kms = Some(kms);
        self.kms_default_key_id = default_key_id;
        self
    }
804
    /// Builder: install the versioning manager.
    #[must_use]
    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
        self.versioning = Some(mgr);
        self
    }
821
    /// Builder: install a single-key SSE keyring (key id 1) from one key.
    /// Convenience wrapper over [`Self::with_sse_keyring`].
    #[must_use]
    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
        let keyring = crate::sse::SseKeyring::new(1, key);
        self.sse_keyring = Some(std::sync::Arc::new(keyring));
        self
    }
834
    /// Builder: install a full SSE keyring.
    #[must_use]
    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
        self.sse_keyring = Some(keyring);
        self
    }
845
    /// Builder: set the SSE chunk size in bytes (0 by default; exact
    /// semantics defined where the field is consumed).
    #[must_use]
    pub fn with_sse_chunk_size(mut self, bytes: usize) -> Self {
        self.sse_chunk_size = bytes;
        self
    }
866
    /// Builder: install the access-log sink.
    #[must_use]
    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
        self.access_log = Some(log);
        self
    }
877
    /// Capture access-log fields from the incoming request before dispatch.
    /// Returns `None` when access logging is disabled, so callers can skip
    /// the bookkeeping entirely.
    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
        // Early-out via `?` when no access log is configured.
        self.access_log.as_ref()?;
        Some(AccessLogPreamble {
            // First hop of x-forwarded-for, trimmed.
            remote_ip: req
                .headers
                .get("x-forwarded-for")
                .and_then(|v| v.to_str().ok())
                .and_then(|raw| raw.split(',').next())
                .map(|s| s.trim().to_owned()),
            requester: Self::principal_of(req).map(str::to_owned),
            request_uri: format!("{} {}", req.method, req.uri.path()),
            user_agent: req
                .headers
                .get("user-agent")
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned),
        })
    }
900
    /// Emit one access-log entry combining the captured `preamble` with the
    /// operation's outcome. No-op when logging is disabled or no preamble was
    /// captured.
    #[allow(clippy::too_many_arguments)]
    async fn record_access(
        &self,
        preamble: Option<AccessLogPreamble>,
        operation: &'static str,
        bucket: &str,
        key: Option<&str>,
        http_status: u16,
        bytes_sent: u64,
        object_size: u64,
        total_time_ms: u64,
        error_code: Option<&str>,
    ) {
        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
            return;
        };
        log.record(crate::access_log::AccessLogEntry {
            time: std::time::SystemTime::now(),
            bucket: bucket.to_owned(),
            remote_ip: p.remote_ip,
            requester: p.requester,
            operation,
            key: key.map(str::to_owned),
            request_uri: p.request_uri,
            http_status,
            error_code: error_code.map(str::to_owned),
            bytes_sent,
            object_size,
            total_time_ms,
            user_agent: p.user_agent,
        })
        .await;
    }
937
    /// Builder: install the rate limiter.
    #[must_use]
    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
        self.rate_limits = Some(rl);
        self
    }
948
    /// Check the (principal, bucket) pair against the rate limiter.
    ///
    /// # Errors
    /// Returns `SlowDown` when the limiter rejects the request; the throttle
    /// is also counted in metrics (principal logged as "-" when anonymous).
    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
        let Some(rl) = self.rate_limits.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        if !rl.check(principal_id, bucket) {
            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
            return Err(S3Error::with_message(
                S3ErrorCode::SlowDown,
                format!("rate-limited: bucket={bucket}"),
            ));
        }
        Ok(())
    }
966
    /// Builder: mark whether requests arrive over a secure transport (TLS);
    /// this feeds the policy engine's `secure_transport` condition.
    #[must_use]
    pub fn with_secure_transport(mut self, on: bool) -> Self {
        self.secure_transport = on;
        self
    }
975
    /// Builder: override the maximum collected body size (default
    /// [`Self::DEFAULT_MAX_BODY_BYTES`]).
    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }
981
    /// Builder: install the access-policy engine.
    #[must_use]
    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
        self.policy = Some(policy);
        self
    }
991
992 fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
995 req.credentials.as_ref().map(|c| c.access_key.as_str())
996 }
997
    /// Build the policy-evaluation context for a request: user agent, best-
    /// effort source IP (first x-forwarded-for hop), current time, and the
    /// service's transport-security flag. Tag slots start empty and are
    /// filled by `enforce_policy_with_extra` when relevant.
    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
        let user_agent = req
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        let source_ip = req
            .headers
            .get("x-forwarded-for")
            .and_then(|v| v.to_str().ok())
            .and_then(|raw| raw.split(',').next())
            .and_then(|s| s.trim().parse().ok());
        crate::policy::RequestContext {
            source_ip,
            user_agent,
            request_time: Some(std::time::SystemTime::now()),
            secure_transport: self.secure_transport,
            existing_object_tags: None,
            request_object_tags: None,
            extra: Default::default(),
        }
    }
1028
    /// Evaluate the access policy for `action` on `bucket`/`key` with no tag
    /// context. Thin wrapper over [`Self::enforce_policy_with_extra`].
    fn enforce_policy<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
    ) -> S3Result<()> {
        self.enforce_policy_with_extra(req, action, bucket, key, None, None)
    }
1042
    /// Evaluate the access policy for `action`, optionally supplying request
    /// and existing object tags as policy conditions. Allows everything when
    /// no policy is configured.
    ///
    /// # Errors
    /// Returns `AccessDenied` on a deny decision; the denial is also counted
    /// in metrics and logged with its matched statement id/effect.
    fn enforce_policy_with_extra<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
        request_tags: Option<&crate::tagging::TagSet>,
        existing_tags: Option<&crate::tagging::TagSet>,
    ) -> S3Result<()> {
        let Some(policy) = self.policy.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        let mut ctx = self.request_context(req);
        if let Some(t) = request_tags {
            ctx.request_object_tags = Some(t.clone());
        }
        if let Some(t) = existing_tags {
            ctx.existing_object_tags = Some(t.clone());
        }
        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
        if decision.allow {
            Ok(())
        } else {
            crate::metrics::record_policy_denial(action, bucket);
            tracing::info!(
                action,
                bucket,
                key = ?key,
                principal = ?principal_id,
                source_ip = ?ctx.source_ip,
                user_agent = ?ctx.user_agent,
                secure_transport = ctx.secure_transport,
                matched_sid = ?decision.matched_sid,
                effect = ?decision.matched_effect,
                "S4 policy denied request"
            );
            Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                format!("denied by S4 policy: {action} on bucket={bucket}"),
            ))
        }
    }
1092
    /// Consume the service and return the inner backend.
    ///
    /// # Panics
    /// Panics if the backend `Arc` is still shared — e.g. a spawned
    /// replication task still holds a clone.
    pub fn into_backend(self) -> B {
        Arc::try_unwrap(self.backend)
            .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
    }
1102
    /// Serve a client Range GET using a sidecar frame index: fetch only the
    /// compressed frames covering the requested span (`plan`), decompress
    /// them, and slice out the exact client byte range.
    ///
    /// Checksums and ETag from the backend response are cleared because they
    /// describe the stored (compressed) object, not the bytes returned here.
    ///
    /// # Errors
    /// Fails when the backend GET fails, returns no body, a frame fails to
    /// parse, or decompression fails.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Ask the backend only for the frame-aligned compressed byte span.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Decompress every frame in the fetched span into one contiguous buffer.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        // Trim frame-alignment slack down to the exact client range.
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        // Stored-object checksums/ETag don't describe the decompressed slice.
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
1211
    /// Persist the frame index as a sidecar object next to `key` (best
    /// effort): failures are logged but never surfaced, since a missing
    /// sidecar only means Range GETs fall back to a full read.
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let sidecar = sidecar_key(key);
        let uri = match safe_object_uri(bucket, &sidecar) {
            Ok(u) => u,
            Err(e) => {
                tracing::warn!(
                    bucket,
                    key,
                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
                );
                return;
            }
        };
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }
1262
    /// Fetch and decode the sidecar frame index for `key`, if present.
    /// Any failure (missing object, oversized body, decode error) yields
    /// `None` — callers then fall back to a full-object read. The sidecar
    /// body is capped at 64 MiB.
    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let sidecar = sidecar_key(key);
        let uri = safe_object_uri(bucket, &sidecar).ok()?;
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }
1289
    /// Decompress a framed multipart object: walk every frame in `bytes`,
    /// decompress each with the codec named in its header, and concatenate
    /// the results in order.
    ///
    /// # Errors
    /// Fails if any frame header fails to parse or any frame fails to
    /// decompress.
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
1319}
1320
1321fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1326 let rest = s
1327 .strip_prefix("bytes=")
1328 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1329 let (a, b) = rest
1330 .split_once('-')
1331 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1332 let first: u64 = a
1333 .parse()
1334 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1335 let last: u64 = b
1336 .parse()
1337 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1338 if last < first {
1339 return Err(format!("CopySourceRange last < first: {s:?}"));
1340 }
1341 Ok(s3s::dto::Range::Int {
1342 first,
1343 last: Some(last),
1344 })
1345}
1346
/// Shadow-object key that stores `version_id` of `key`:
/// `"<key>.__s4ver__/<version_id>"`.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    let mut shadow = String::with_capacity(key.len() + version_id.len() + 11);
    shadow.push_str(key);
    shadow.push_str(".__s4ver__/");
    shadow.push_str(version_id);
    shadow
}
1363
/// True when `key` is a versioning shadow key (contains the
/// `.__s4ver__/` marker produced by [`versioned_shadow_key`]).
fn is_versioning_shadow_key(key: &str) -> bool {
    key.find(".__s4ver__/").is_some()
}
1369
/// Seconds since the Unix epoch; returns 0 if the system clock reads before
/// the epoch (mirrors the defensive behavior expected by callers).
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1381
/// Map an MFA-delete validation error onto an S3 error: protocol problems
/// become `InvalidRequest`, authentication problems become `AccessDenied`.
fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
    match e {
        crate::mfa::MfaError::Missing => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "MFA token required for this operation",
        ),
        crate::mfa::MfaError::Malformed => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "malformed x-amz-mfa header",
        ),
        crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "MFA serial does not match configured device",
        ),
        crate::mfa::MfaError::InvalidCode => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "invalid MFA code",
        ),
    }
}
1409
1410fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1411 metadata
1412 .as_ref()
1413 .and_then(|m| m.get(META_MULTIPART))
1414 .map(|v| v == "true")
1415 .unwrap_or(false)
1416}
1417
/// Object-metadata key: name of the codec used to compress the object.
const META_CODEC: &str = "s4-codec";
/// Object-metadata key: original (uncompressed) size in bytes.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Object-metadata key: compressed size in bytes.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// Object-metadata key: CRC32C of the chunk (see `ChunkManifest`).
const META_CRC32C: &str = "s4-crc32c";
/// Object-metadata key: "true" when the object is a framed multipart upload.
const META_MULTIPART: &str = "s4-multipart";
/// Object-metadata key: "true" when the object uses the framed v2 layout.
const META_FRAMED: &str = "s4-framed";
1429
1430fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1431 metadata
1432 .as_ref()
1433 .and_then(|m| m.get(META_FRAMED))
1434 .map(|v| v == "true")
1435 .unwrap_or(false)
1436}
1437
1438fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1440 metadata
1441 .as_ref()
1442 .and_then(|m| m.get("s4-encrypted"))
1443 .map(|v| v == "aes-256-gcm")
1444 .unwrap_or(false)
1445}
1446
/// Parse the three SSE-C request headers into key material.
///
/// All three headers must be present together or all absent; a partial set is
/// an `InvalidRequest` error.
///
/// # Errors
/// `InvalidRequest` on a partial header set; parse failures are mapped via
/// [`sse_c_error_to_s3`].
fn extract_sse_c_material(
    algorithm: &Option<String>,
    key: &Option<String>,
    md5: &Option<String>,
) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
    match (algorithm, key, md5) {
        (None, None, None) => Ok(None),
        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
            .map(Some)
            .map_err(sse_c_error_to_s3),
        _ => Err(S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
        )),
    }
}
1469
1470fn extract_kms_key_id(
1473 sse: &Option<ServerSideEncryption>,
1474 sse_kms_key_id: &Option<String>,
1475 gateway_default: Option<&str>,
1476) -> Option<String> {
1477 let asks_for_kms = sse
1478 .as_ref()
1479 .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1480 .unwrap_or(false);
1481 if !asks_for_kms {
1482 return None;
1483 }
1484 sse_kms_key_id
1485 .clone()
1486 .or_else(|| gateway_default.map(str::to_owned))
1487}
1488
/// Map a KMS backend error onto an S3 error: unknown key → `InvalidArgument`,
/// backend down → `ServiceUnavailable`, anything else → `InternalError`.
fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
    use crate::kms::KmsError as K;
    match e {
        K::KeyNotFound { key_id } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("KMS key not found: {key_id}"),
        ),
        K::BackendUnavailable { message } => S3Error::with_message(
            S3ErrorCode::ServiceUnavailable,
            format!("KMS backend unavailable: {message}"),
        ),
        other => S3Error::with_message(
            S3ErrorCode::InternalError,
            format!("KMS error: {other}"),
        ),
    }
}
1509
/// Map an SSE-C error onto an S3 error: wrong key → `AccessDenied`, bad key
/// material → `InvalidArgument`, missing/unexpected headers →
/// `InvalidRequest`, anything else → `InternalError`.
fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
    use crate::sse::SseError as E;
    match e {
        E::WrongCustomerKey => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "SSE-C key does not match the key used at PUT time",
        ),
        E::InvalidCustomerKey { reason } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("SSE-C: {reason}"),
        ),
        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
        ),
        E::CustomerKeyRequired => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
        ),
        E::CustomerKeyUnexpected => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
        ),
        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
    }
}
1539
/// Reconstruct a `ChunkManifest` from the `s4-*` object metadata written by
/// [`write_manifest`]. Returns `None` when any field is missing or fails to
/// parse — i.e. the object was not written by this gateway.
fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
    let m = metadata.as_ref()?;
    let codec = m
        .get(META_CODEC)
        .and_then(|s| s.parse::<CodecKind>().ok())?;
    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
    Some(ChunkManifest {
        codec,
        original_size,
        compressed_size,
        crc32c,
    })
}
1555
1556fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1557 let meta = metadata.get_or_insert_with(Default::default);
1558 meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1559 meta.insert(
1560 META_ORIGINAL_SIZE.into(),
1561 manifest.original_size.to_string(),
1562 );
1563 meta.insert(
1564 META_COMPRESSED_SIZE.into(),
1565 manifest.compressed_size.to_string(),
1566 );
1567 meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1568}
1569
1570fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1571 move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1572}
1573
/// Map an S3 Select error onto an S3 `InvalidRequest`, tagging input-format
/// errors with the requested format name (`fmt`).
fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
    use crate::select::SelectError;
    match e {
        SelectError::Parse(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("SQL parse error: {msg}"),
        ),
        SelectError::UnsupportedFeature(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("unsupported SQL feature: {msg}"),
        ),
        SelectError::RowEval(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("SQL row evaluation error: {msg}"),
        ),
        SelectError::InputFormat(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("{fmt} input format error: {msg}"),
        ),
    }
}
1600
1601fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1605 headers
1606 .get("x-amz-bypass-governance-retention")
1607 .and_then(|v| v.to_str().ok())
1608 .map(|s| s.eq_ignore_ascii_case("true"))
1609 .unwrap_or(false)
1610}
1611
/// Convert an s3s `Timestamp` into a chrono UTC datetime by formatting it as
/// RFC 3339 text and re-parsing. Returns `None` on any format/parse failure.
fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
    let mut buf = Vec::new();
    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
    let s = std::str::from_utf8(&buf).ok()?;
    chrono::DateTime::parse_from_rfc3339(s)
        .ok()
        .map(|dt| dt.with_timezone(&chrono::Utc))
}
1625
/// Convert a chrono UTC datetime into an s3s `Timestamp` via RFC 3339 text
/// (millisecond precision, 'Z' suffix).
/// NOTE(review): a parse failure silently yields `Timestamp::default()` —
/// confirm callers tolerate that fallback.
fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
}
1636
1637fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1642 set.iter()
1643 .map(|(k, v)| Tag {
1644 key: Some(k.clone()),
1645 value: Some(v.clone()),
1646 })
1647 .collect()
1648}
1649
/// Convert AWS `Tag` DTOs into an internal validated tag set; `None` keys or
/// values are treated as empty strings and validation is delegated to
/// `TagSet::from_pairs`.
///
/// # Errors
/// Propagates whatever validation error `TagSet::from_pairs` reports.
fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
    let pairs = tags
        .iter()
        .map(|t| {
            (
                t.key.clone().unwrap_or_default(),
                t.value.clone().unwrap_or_default(),
            )
        })
        .collect();
    crate::tagging::TagSet::from_pairs(pairs)
}
1666
1667pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1671 if total == 0 {
1672 return Err("cannot range-get zero-length object".into());
1673 }
1674 match range {
1675 s3s::dto::Range::Int { first, last } => {
1676 let start = *first;
1677 let end_inclusive = match last {
1678 Some(l) => (*l).min(total - 1),
1679 None => total - 1,
1680 };
1681 if start > end_inclusive || start >= total {
1682 return Err(format!(
1683 "range bytes={start}-{:?} out of object size {total}",
1684 last
1685 ));
1686 }
1687 Ok((start, end_inclusive + 1))
1688 }
1689 s3s::dto::Range::Suffix { length } => {
1690 let len = (*length).min(total);
1691 Ok((total - len, total))
1692 }
1693 }
1694}
1695
1696#[async_trait::async_trait]
1697impl<B: S3> S3 for S4Service<B> {
    /// PUT path: compresses the incoming body (streaming-framed or buffered),
    /// optionally encrypts it (SSE-C, SSE-KMS, or the gateway SSE-S4 keyring),
    /// forwards the transformed object to the backend, then updates
    /// versioning, object-lock, tagging, notification, and replication state.
    /// Requests without a body skip the compress/encrypt pipeline entirely.
    #[tracing::instrument(
        name = "s4.put_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
    )]
    async fn put_object(
        &self,
        mut req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        let put_start = Instant::now();
        let put_bucket = req.input.bucket.clone();
        let put_key = req.input.key.clone();
        let access_preamble = self.access_log_preamble(&req);
        self.enforce_rate_limit(&req, &put_bucket)?;
        // Tags supplied via the x-amz-tagging header; a parse failure is the
        // client's fault, so it maps to InvalidArgument.
        let request_tags: Option<crate::tagging::TagSet> = req
            .input
            .tagging
            .as_deref()
            .map(crate::tagging::parse_tagging_header)
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
        let existing_tags: Option<crate::tagging::TagSet> = self
            .tagging
            .as_ref()
            .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
        self.enforce_policy_with_extra(
            &req,
            "s3:PutObject",
            &put_bucket,
            Some(&put_key),
            request_tags.as_ref(),
            existing_tags.as_ref(),
        )?;
        // Object-lock overwrite protection: only applies when the bucket is
        // NOT versioning-enabled (with versioning on, a PUT creates a new
        // version instead of overwriting the locked one).
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&put_bucket, &put_key)
        {
            let bucket_versioned_enabled = self
                .versioning
                .as_ref()
                .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
                .unwrap_or(false);
            if !bucket_versioned_enabled {
                let bypass = parse_bypass_governance_header(&req.headers);
                let now = chrono::Utc::now();
                if !state.can_delete(now, bypass) {
                    crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Access Denied because object protected by object lock",
                    ));
                }
            }
        }
        // Per-request object-lock settings from the PUT headers; applied
        // after a successful backend write, further below.
        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
            .input
            .object_lock_mode
            .as_ref()
            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
            .input
            .object_lock_retain_until_date
            .as_ref()
            .and_then(timestamp_to_chrono_utc);
        let explicit_legal_hold_on: Option<bool> = req
            .input
            .object_lock_legal_hold_status
            .as_ref()
            .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
        if let Some(blob) = req.input.body.take() {
            // Sniff a small prefix so the dispatcher can choose a codec
            // without buffering the whole body.
            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
                .await
                .map_err(internal("peek put sample"))?;
            let sample_len = sample.len().min(SAMPLE_BYTES);
            let total_size_hint = req.input.content_length.and_then(|n| u64::try_from(n).ok());
            let kind = self
                .dispatcher
                .pick_with_size_hint(&sample[..sample_len], total_size_hint)
                .await;

            // Framed streaming path: compress chunk-by-chunk into S4F2 frames;
            // otherwise buffer the whole body and compress in one shot.
            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
            let (compressed, manifest, is_framed) = if use_framed {
                let chained = chain_sample_with_rest(sample, rest_stream);
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    codec = kind.as_str(),
                    path = "streaming-framed",
                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
                );
                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
                let (body, manifest) = streaming_compress_to_frames(
                    chained,
                    Arc::clone(&self.registry),
                    kind,
                    chunk_size,
                )
                .await
                .map_err(internal("streaming framed compress"))?;
                (body, manifest, true)
            } else {
                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
                    .await
                    .map_err(internal("collect put body (buffered path)"))?;
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    bytes = bytes.len(),
                    codec = kind.as_str(),
                    path = "buffered",
                    "S4 put_object: compressing (buffered, raw blob)"
                );
                let (compress_res, tel) =
                    self.registry.compress_with_telemetry(bytes, kind).await;
                // Record GPU timing/OOM metrics regardless of compression outcome.
                stamp_gpu_compress_telemetry(&tel);
                let (body, m) = compress_res.map_err(internal("registry compress"))?;
                (body, m, false)
            };

            write_manifest(&mut req.input.metadata, &manifest);
            if is_framed {
                req.input
                    .metadata
                    .get_or_insert_with(Default::default)
                    .insert(META_FRAMED.into(), "true".into());
            }
            // The stored bytes no longer match what the client sent, so any
            // client-provided checksums/MD5 must be stripped before forwarding.
            req.input.content_length = Some(compressed.len() as i64);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            let original_size = manifest.original_size;
            let compressed_size = manifest.compressed_size;
            let codec_label = manifest.codec.as_str();
            // Frame index enables later partial-range GETs; only framed
            // bodies have one, and it is persisted only on backend success.
            let sidecar_index = if is_framed {
                s4_codec::index::build_index_from_body(&compressed).ok()
            } else {
                None
            };
            // Take all SSE-related inputs off the request so the backend
            // never sees them; the gateway performs the encryption itself.
            let sse_c_alg = req.input.sse_customer_algorithm.take();
            let sse_c_key = req.input.sse_customer_key.take();
            let sse_c_md5 = req.input.sse_customer_key_md5.take();
            let sse_header = req.input.server_side_encryption.take();
            let sse_kms_key = req.input.ssekms_key_id.take();
            let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
            let kms_key_id = extract_kms_key_id(
                &sse_header,
                &sse_kms_key,
                self.kms_default_key_id.as_deref(),
            );
            // Strict compliance mode: some form of server-side encryption is
            // mandatory on every PUT.
            if self.compliance_strict
                && sse_c_material.is_none()
                && kms_key_id.is_none()
                && self.sse_keyring.is_none()
                && sse_header.as_ref().map(|s| s.as_str())
                    != Some(ServerSideEncryption::AES256)
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "compliance-mode strict: PUT must include x-amz-server-side-encryption \
                    (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
                ));
            }
            if sse_c_material.is_some() && kms_key_id.is_some() {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidArgument,
                    "SSE-C and SSE-KMS cannot be used together on the same PUT",
                ));
            }
            // SSE-KMS: mint a fresh 32-byte data-encryption key; the plaintext
            // DEK lives in a Zeroizing wrapper so it is wiped on drop.
            let kms_wrap: Option<(zeroize::Zeroizing<[u8; 32]>, crate::kms::WrappedDek)> =
                if let Some(ref key_id) = kms_key_id {
                    let kms = self.kms.as_ref().ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                        )
                    })?;
                    let (dek, wrapped) = kms
                        .generate_dek(key_id)
                        .await
                        .map_err(kms_error_to_s3)?;
                    if dek.len() != 32 {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InternalError,
                            format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
                        ));
                    }
                    let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
                        zeroize::Zeroizing::new([0u8; 32]);
                    dek_arr.copy_from_slice(&dek);
                    Some((dek_arr, wrapped))
                } else {
                    None
                };
            // Encrypt the compressed body with whichever SSE flavour applies,
            // recording the flavour in object metadata for HEAD/GET to read.
            // Priority: SSE-C, then SSE-KMS, then the gateway keyring, else none.
            let body_to_send = if let Some(ref m) = sse_c_material {
                let meta = req.input.metadata.get_or_insert_with(Default::default);
                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
                meta.insert("s4-sse-type".into(), "AES256".into());
                meta.insert("s4-sse-c-key-md5".into(),
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5));
                crate::sse::encrypt_with_source(
                    &compressed,
                    crate::sse::SseSource::CustomerKey {
                        key: &m.key,
                        key_md5: &m.key_md5,
                    },
                )
            } else if let Some((ref dek, ref wrapped)) = kms_wrap {
                let meta = req.input.metadata.get_or_insert_with(Default::default);
                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
                meta.insert("s4-sse-type".into(), "aws:kms".into());
                meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
                let dek_ref: &[u8; 32] = dek;
                crate::sse::encrypt_with_source(
                    &compressed,
                    crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
                )
            } else if let Some(keyring) = self.sse_keyring.as_ref() {
                let meta = req.input.metadata.get_or_insert_with(Default::default);
                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
                if self.sse_chunk_size > 0 {
                    crate::sse::encrypt_v2_chunked(
                        &compressed,
                        keyring,
                        self.sse_chunk_size,
                    )
                    .map_err(|e| {
                        S3Error::with_message(
                            S3ErrorCode::InternalError,
                            format!("SSE-S4 chunked encrypt failed: {e}"),
                        )
                    })?
                } else {
                    crate::sse::encrypt_v2(&compressed, keyring)
                }
            } else {
                compressed.clone()
            };
            // Capture the final body + metadata for async replication before
            // the request consumes them.
            let replication_body = body_to_send.clone();
            let replication_metadata = req.input.metadata.clone();
            req.input.content_length = Some(body_to_send.len() as i64);
            req.input.body = Some(bytes_to_blob(body_to_send));
            // Decide the version id up-front; versioned writes go to a shadow
            // key so each version is a distinct backend object.
            let pending_version: Option<crate::versioning::PutOutcome> = self
                .versioning
                .as_ref()
                .map(|mgr| mgr.state(&put_bucket))
                .map(|state| match state {
                    crate::versioning::VersioningState::Enabled => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::VersioningManager::new_version_id(),
                            versioned_response: true,
                        }
                    }
                    crate::versioning::VersioningState::Suspended
                    | crate::versioning::VersioningState::Unversioned => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
                            versioned_response: false,
                        }
                    }
                });
            if let Some(ref pv) = pending_version
                && pv.versioned_response
            {
                req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
            }
            let mut backend_resp = self.backend.put_object(req).await;
            // Persist the frame index sidecar only when there is more than one
            // frame (a single-frame object gains nothing from range planning).
            if let Some(idx) = sidecar_index
                && backend_resp.is_ok()
                && idx.entries.len() > 1
            {
                self.write_sidecar(&put_bucket, &put_key, &idx).await;
            }
            // Commit the version entry once the backend accepted the write;
            // fall back to a crc32c-derived ETag if the backend returned none.
            if let (Some(mgr), Some(pv), Ok(resp)) = (
                self.versioning.as_ref(),
                pending_version.as_ref(),
                backend_resp.as_mut(),
            ) {
                let etag = resp
                    .output
                    .e_tag
                    .clone()
                    .map(ETag::into_value)
                    .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
                let now = chrono::Utc::now();
                mgr.commit_put_with_version(
                    &put_bucket,
                    &put_key,
                    crate::versioning::VersionEntry {
                        version_id: pv.version_id.clone(),
                        etag,
                        size: original_size,
                        is_delete_marker: false,
                        created_at: now,
                    },
                );
                if pv.versioned_response {
                    resp.output.version_id = Some(pv.version_id.clone());
                }
            }
            // Echo SSE response headers back to the client on success.
            if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            if let (Some((_, wrapped)), Ok(resp)) =
                (kms_wrap.as_ref(), backend_resp.as_mut())
            {
                resp.output.server_side_encryption =
                    Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
                resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
            }
            // Apply per-request lock settings, then any bucket default lock.
            if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
                if explicit_lock_mode.is_some()
                    || explicit_retain_until.is_some()
                    || explicit_legal_hold_on.is_some()
                {
                    let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
                    if let Some(m) = explicit_lock_mode {
                        state.mode = Some(m);
                    }
                    if let Some(u) = explicit_retain_until {
                        state.retain_until = Some(u);
                    }
                    if let Some(lh) = explicit_legal_hold_on {
                        state.legal_hold_on = lh;
                    }
                    mgr.set(&put_bucket, &put_key, state);
                }
                mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
            }
            // NOTE(review): the `let _ = (original_size, compressed_size)` looks
            // like a leftover unused-variable suppression — both values are used
            // by record_put/record_access below; consider removing it.
            let _ = (original_size, compressed_size); let elapsed = put_start.elapsed();
            crate::metrics::record_put(
                codec_label,
                original_size,
                compressed_size,
                elapsed.as_secs_f64(),
                backend_resp.is_ok(),
            );
            self.record_access(
                access_preamble,
                "REST.PUT.OBJECT",
                &put_bucket,
                Some(&put_key),
                if backend_resp.is_ok() { 200 } else { 500 },
                compressed_size,
                original_size,
                elapsed.as_millis() as u64,
                backend_resp.as_ref().err().map(|e| e.code().as_str()),
            )
            .await;
            info!(
                op = "put_object",
                bucket = %put_bucket,
                key = %put_key,
                codec = codec_label,
                bytes_in = original_size,
                bytes_out = compressed_size,
                ratio = format!(
                    "{:.3}",
                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
                ),
                latency_ms = elapsed.as_millis() as u64,
                ok = backend_resp.is_ok(),
                "S4 put completed"
            );
            // Fire ObjectCreated:Put notifications asynchronously (detached task).
            if backend_resp.is_ok()
                && let Some(mgr) = self.notifications.as_ref()
            {
                let dests = mgr.match_destinations(
                    &put_bucket,
                    &crate::notifications::EventType::ObjectCreatedPut,
                    &put_key,
                );
                if !dests.is_empty() {
                    let etag = backend_resp
                        .as_ref()
                        .ok()
                        .and_then(|r| r.output.e_tag.clone())
                        .map(ETag::into_value);
                    let version_id = pending_version
                        .as_ref()
                        .filter(|pv| pv.versioned_response)
                        .map(|pv| pv.version_id.clone());
                    tokio::spawn(crate::notifications::dispatch_event(
                        Arc::clone(mgr),
                        put_bucket.clone(),
                        put_key.clone(),
                        crate::notifications::EventType::ObjectCreatedPut,
                        Some(original_size),
                        etag,
                        version_id,
                        format!("S4-{}", uuid::Uuid::new_v4()),
                    ));
                }
            }
            if backend_resp.is_ok()
                && let (Some(mgr), Some(tags)) =
                    (self.tagging.as_ref(), request_tags.clone())
            {
                mgr.put_object_tags(&put_bucket, &put_key, tags);
            }
            self.spawn_replication_if_matched(
                &put_bucket,
                &put_key,
                &request_tags,
                &replication_body,
                &replication_metadata,
                backend_resp.is_ok(),
            );
            return backend_resp;
        }
        // No request body: forward the PUT untouched but still maintain
        // versioning, object-lock, notification, tagging, and replication
        // bookkeeping (sizes recorded as 0).
        let pending_version: Option<crate::versioning::PutOutcome> = self
            .versioning
            .as_ref()
            .map(|mgr| mgr.state(&put_bucket))
            .map(|state| match state {
                crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
                    version_id: crate::versioning::VersioningManager::new_version_id(),
                    versioned_response: true,
                },
                _ => crate::versioning::PutOutcome {
                    version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
                    versioned_response: false,
                },
            });
        if let Some(ref pv) = pending_version
            && pv.versioned_response
        {
            req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
        }
        let mut backend_resp = self.backend.put_object(req).await;
        if let (Some(mgr), Some(pv), Ok(resp)) = (
            self.versioning.as_ref(),
            pending_version.as_ref(),
            backend_resp.as_mut(),
        ) {
            let etag = resp
                .output
                .e_tag
                .clone()
                .map(ETag::into_value)
                .unwrap_or_default();
            let now = chrono::Utc::now();
            mgr.commit_put_with_version(
                &put_bucket,
                &put_key,
                crate::versioning::VersionEntry {
                    version_id: pv.version_id.clone(),
                    etag,
                    size: 0,
                    is_delete_marker: false,
                    created_at: now,
                },
            );
            if pv.versioned_response {
                resp.output.version_id = Some(pv.version_id.clone());
            }
        }
        if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
            if explicit_lock_mode.is_some()
                || explicit_retain_until.is_some()
                || explicit_legal_hold_on.is_some()
            {
                let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
                if let Some(m) = explicit_lock_mode {
                    state.mode = Some(m);
                }
                if let Some(u) = explicit_retain_until {
                    state.retain_until = Some(u);
                }
                if let Some(lh) = explicit_legal_hold_on {
                    state.legal_hold_on = lh;
                }
                mgr.set(&put_bucket, &put_key, state);
            }
            mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
        }
        if backend_resp.is_ok()
            && let Some(mgr) = self.notifications.as_ref()
        {
            let dests = mgr.match_destinations(
                &put_bucket,
                &crate::notifications::EventType::ObjectCreatedPut,
                &put_key,
            );
            if !dests.is_empty() {
                let etag = backend_resp
                    .as_ref()
                    .ok()
                    .and_then(|r| r.output.e_tag.clone())
                    .map(ETag::into_value);
                let version_id = pending_version
                    .as_ref()
                    .filter(|pv| pv.versioned_response)
                    .map(|pv| pv.version_id.clone());
                tokio::spawn(crate::notifications::dispatch_event(
                    Arc::clone(mgr),
                    put_bucket.clone(),
                    put_key.clone(),
                    crate::notifications::EventType::ObjectCreatedPut,
                    Some(0),
                    etag,
                    version_id,
                    format!("S4-{}", uuid::Uuid::new_v4()),
                ));
            }
        }
        if backend_resp.is_ok()
            && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
        {
            mgr.put_object_tags(&put_bucket, &put_key, tags);
        }
        self.spawn_replication_if_matched(
            &put_bucket,
            &put_key,
            &request_tags,
            &bytes::Bytes::new(),
            &None,
            backend_resp.is_ok(),
        );
        backend_resp
    }
2417
    /// GET path: resolves the requested version, tries an index-assisted
    /// partial range read, then fetches from the backend and reverses the PUT
    /// pipeline — SSE decryption first, then decompression (streaming when the
    /// codec allows, otherwise buffered) — before returning to the client.
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // The range applies to the DECOMPRESSED bytes, so it is taken off the
        // request and never forwarded to the backend.
        let range_request = req.input.range.take();
        // SSE-C headers are likewise handled by the gateway, not the backend.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let get_sse_c_material =
            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;

        // On a versioned bucket, translate (key, version-id) to the backend
        // shadow key; a delete-marker current version reads as NoSuchKey.
        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
            Some(mgr)
                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
            {
                let req_vid = req.input.version_id.take();
                let entry = match req_vid.as_deref() {
                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
                        || S3Error::with_message(
                            S3ErrorCode::NoSuchVersion,
                            format!("no such version: {vid}"),
                        ),
                    )?,
                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::NoSuchKey,
                            format!("no such key: {get_key}"),
                        )
                    })?,
                };
                if entry.is_delete_marker {
                    return Err(S3Error::with_message(
                        S3ErrorCode::NoSuchKey,
                        format!("delete marker is the current version of {get_key}"),
                    ));
                }
                if entry.version_id != crate::versioning::NULL_VERSION_ID {
                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
                }
                Some(entry.version_id)
            }
            _ => None,
        };

        // Fast path for ranged GETs on framed objects: if a sidecar frame
        // index exists and covers the requested span, serve only the frames
        // needed instead of fetching and decompressing the whole object.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        if let Some(ref vid) = resolved_version_id {
            resp.output.version_id = Some(vid.clone());
        }
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        // Objects that never went through the S4 pipeline pass through untouched.
        if !needs_frame_parse && manifest_opt.is_none() {
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // Step 1: undo server-side encryption, dispatching on the magic
            // bytes at the front of the stored body.
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                // Chunked keyring formats (S4E5/S4E6) decrypt as a stream and
                // return early — no decompression applies on this path.
                if matches!(
                    crate::sse::peek_magic(&body),
                    Some("S4E5") | Some("S4E6")
                ) && get_sse_c_material.is_none()
                {
                    let keyring_arc = self.sse_keyring.clone().ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "object is SSE-S4 encrypted (S4E5/S4E6) but no --sse-s4-key is configured on this gateway",
                        )
                    })?;
                    let body_len = body.len() as u64;
                    let stream =
                        crate::sse::decrypt_chunked_stream(body, keyring_arc.as_ref());
                    use futures::StreamExt;
                    let mapped = stream.map(|r| {
                        r.map_err(|e| std::io::Error::other(format!(
                            "SSE-S4 chunked decrypt: {e}"
                        )))
                    });
                    use s3s::dto::StreamingBlob;
                    resp.output.body = Some(StreamingBlob::wrap(mapped));
                    // Length/checksums/ETag describe the stored ciphertext,
                    // not the decrypted stream — clear them all.
                    resp.output.content_length = None;
                    resp.output.checksum_crc32 = None;
                    resp.output.checksum_crc32c = None;
                    resp.output.checksum_crc64nvme = None;
                    resp.output.checksum_sha1 = None;
                    resp.output.checksum_sha256 = None;
                    resp.output.e_tag = None;
                    let elapsed = get_start.elapsed();
                    crate::metrics::record_get(
                        "sse-s4-chunked",
                        body_len,
                        body_len,
                        elapsed.as_secs_f64(),
                        true,
                    );
                    return Ok(resp);
                }
                // One-shot decrypt: S4E4 = KMS-wrapped DEK; otherwise SSE-C
                // when the client sent a key, else the gateway keyring.
                let plain = match crate::sse::peek_magic(&body) {
                    Some("S4E4") => {
                        let kms = self.kms.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InvalidRequest,
                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                            )
                        })?;
                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
                        crate::sse::decrypt_with_kms(&body, kms_ref)
                            .await
                            .map_err(|e| match e {
                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
                                other => S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-KMS decrypt failed: {other}"),
                                ),
                            })?
                    }
                    _ => {
                        if let Some(ref m) = get_sse_c_material {
                            crate::sse::decrypt(
                                &body,
                                crate::sse::SseSource::CustomerKey {
                                    key: &m.key,
                                    key_md5: &m.key_md5,
                                },
                            )
                            .map_err(sse_c_error_to_s3)?
                        } else {
                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                                S3Error::with_message(
                                    S3ErrorCode::InvalidRequest,
                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                                )
                            })?;
                            crate::sse::decrypt(&body, keyring).map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-S4 decrypt failed: {e}"),
                                )
                            })?
                        }
                    }
                };
                // Surface the KMS key id used at PUT time in response headers.
                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
                {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
                }
                bytes_to_blob(plain)
            } else if let Some(ref m) = get_sse_c_material {
                // Client supplied an SSE-C key but the object is not encrypted.
                let _ = m;
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
            } else {
                blob
            };
            if let Some(ref m) = get_sse_c_material {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            // Step 2a: streaming zstd decompression for whole-object GETs of
            // non-framed cpu-zstd objects — no buffering of the full payload.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Step 2b: passthrough codec — stored bytes are already plain.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // Step 2c: buffered decompression — collect everything, then
            // either parse multi-frame bodies or run the manifest's codec.
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            // Apply any range against the decompressed bytes; a ranged GET
            // answers 206 with a Content-Range header.
            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            resp.output.content_length = Some(final_bytes.len() as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        if let Some(mgr) = self.replication.as_ref()
            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
        {
            resp.output.replication_status =
                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
        }
        Ok(resp)
    }
2835
    /// Bucket-level operations need no S4 transformation: pure delegation.
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    /// Pure delegation to the backend.
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    /// Pure delegation to the backend.
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    /// Pure delegation to the backend.
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
2861 async fn head_object(
2862 &self,
2863 req: S3Request<HeadObjectInput>,
2864 ) -> S3Result<S3Response<HeadObjectOutput>> {
2865 let head_bucket = req.input.bucket.clone();
2868 let head_key = req.input.key.clone();
2869 let mut resp = self.backend.head_object(req).await?;
2870 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2871 resp.output.content_length = Some(manifest.original_size as i64);
2875 resp.output.checksum_crc32 = None;
2876 resp.output.checksum_crc32c = None;
2877 resp.output.checksum_crc64nvme = None;
2878 resp.output.checksum_sha1 = None;
2879 resp.output.checksum_sha256 = None;
2880 resp.output.e_tag = None;
2881 }
2882 if let Some(mgr) = self.replication.as_ref()
2885 && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2886 {
2887 resp.output.replication_status =
2888 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2889 }
2890 if let Some(meta) = resp.output.metadata.as_ref()
2895 && let Some(sse_type) = meta.get("s4-sse-type")
2896 {
2897 {
2898 match sse_type.as_str() {
2899 "aws:kms" => {
2900 resp.output.server_side_encryption = Some(
2901 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2902 );
2903 if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
2904 resp.output.ssekms_key_id = Some(key_id.clone());
2905 }
2906 }
2907 _ => {
2908 resp.output.server_side_encryption = Some(
2909 ServerSideEncryption::from_static(ServerSideEncryption::AES256),
2910 );
2911 if let Some(md5) = meta.get("s4-sse-c-key-md5") {
2912 resp.output.sse_customer_algorithm =
2913 Some(crate::sse::SSE_C_ALGORITHM.into());
2914 resp.output.sse_customer_key_md5 = Some(md5.clone());
2915 }
2916 }
2917 }
2918 }
2919 }
2920 Ok(resp)
2921 }
    /// DeleteObject with gateway-level semantics layered on top of the backend:
    /// rate limiting, bucket policy, MFA-Delete, object lock, versioning
    /// (delete markers and version-specific deletes), and cleanup of the
    /// sidecar index plus tag/lock state.
    async fn delete_object(
        &self,
        mut req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        // MFA-Delete: when enabled on the bucket, the request's MFA header
        // must validate before any delete proceeds.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && mgr.is_enabled(&bucket)
        {
            let header = req.input.mfa.as_deref();
            if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
                crate::metrics::record_mfa_delete_denial(&bucket);
                return Err(mfa_error_to_s3(e));
            }
        }
        // Object lock: refuse the delete unless the lock state allows it
        // (governance retention may be bypassed via the request flag).
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&bucket, &key)
        {
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let now = chrono::Utc::now();
            if !state.can_delete(now, bypass) {
                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Access Denied because object protected by object lock",
                ));
            }
        }
        // Versioned buckets are handled by the in-process versioning manager;
        // both versioned branches return early and skip the plain path below.
        if let Some(mgr) = self.versioning.as_ref() {
            let state = mgr.state(&bucket);
            if state != crate::versioning::VersioningState::Unversioned {
                let req_vid = req.input.version_id.take();
                if let Some(vid) = req_vid {
                    // Delete of a specific version id.
                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
                    // "null" versions live under the plain key; other versions
                    // live under a shadow key derived from the version id.
                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
                        key.clone()
                    } else {
                        versioned_shadow_key(&key, &vid)
                    };
                    let was_real_version = outcome
                        .as_ref()
                        .map(|o| !o.is_delete_marker)
                        .unwrap_or(false);
                    if was_real_version {
                        // Only real versions have backend bytes to remove;
                        // delete markers are bookkeeping-only.
                        let backend_input = DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: backend_target,
                            ..Default::default()
                        };
                        let backend_req = S3Request {
                            input: backend_input,
                            method: http::Method::DELETE,
                            uri: req.uri.clone(),
                            headers: req.headers.clone(),
                            extensions: http::Extensions::new(),
                            credentials: req.credentials.clone(),
                            region: req.region.clone(),
                            service: req.service.clone(),
                            trailing_headers: None,
                        };
                        // Best effort: version bookkeeping is already updated.
                        let _ = self.backend.delete_object(backend_req).await;
                    }
                    let mut output = DeleteObjectOutput {
                        version_id: Some(vid.clone()),
                        ..Default::default()
                    };
                    if let Some(o) = outcome.as_ref()
                        && o.is_delete_marker
                    {
                        output.delete_marker = Some(true);
                    }
                    self.fire_delete_notification(
                        &bucket,
                        &key,
                        crate::notifications::EventType::ObjectRemovedDelete,
                        Some(vid.clone()),
                    );
                    return Ok(S3Response::new(output));
                }
                // No version id supplied: record a delete marker.
                let outcome = mgr.record_delete(&bucket, &key);
                // Suspended versioning also removes the plain (null-version)
                // object from the backend.
                if state == crate::versioning::VersioningState::Suspended {
                    let backend_input = DeleteObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    };
                    let backend_req = S3Request {
                        input: backend_input,
                        method: http::Method::DELETE,
                        uri: req.uri.clone(),
                        headers: req.headers.clone(),
                        extensions: http::Extensions::new(),
                        credentials: req.credentials.clone(),
                        region: req.region.clone(),
                        service: req.service.clone(),
                        trailing_headers: None,
                    };
                    let _ = self.backend.delete_object(backend_req).await;
                }
                let output = DeleteObjectOutput {
                    delete_marker: Some(true),
                    version_id: outcome.version_id.clone(),
                    ..Default::default()
                };
                self.fire_delete_notification(
                    &bucket,
                    &key,
                    crate::notifications::EventType::ObjectRemovedDeleteMarker,
                    outcome.version_id,
                );
                return Ok(S3Response::new(output));
            }
        }
        // Unversioned path: delete through the backend, then clean up
        // gateway-side state tied to the key.
        let resp = self.backend.delete_object(req).await?;
        if let Some(mgr) = self.object_lock.as_ref() {
            mgr.clear(&bucket, &key);
        }
        if let Some(mgr) = self.tagging.as_ref() {
            mgr.delete_object_tags(&bucket, &key);
        }
        // Best-effort delete of the sidecar index object for this key.
        let sidecar = sidecar_key(&key);
        if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
            let sidecar_input = DeleteObjectInput {
                bucket: bucket.clone(),
                key: sidecar,
                ..Default::default()
            };
            let sidecar_req = S3Request {
                input: sidecar_input,
                method: http::Method::DELETE,
                uri,
                headers: http::HeaderMap::new(),
                extensions: http::Extensions::new(),
                credentials: None,
                region: None,
                service: None,
                trailing_headers: None,
            };
            let _ = self.backend.delete_object(sidecar_req).await;
        }
        self.fire_delete_notification(
            &bucket,
            &key,
            crate::notifications::EventType::ObjectRemovedDelete,
            None,
        );
        Ok(resp)
    }
3133 async fn delete_objects(
3134 &self,
3135 req: S3Request<DeleteObjectsInput>,
3136 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
3137 if let Some(mgr) = self.mfa_delete.as_ref()
3141 && mgr.is_enabled(&req.input.bucket)
3142 {
3143 let header = req.input.mfa.as_deref();
3144 if let Err(e) =
3145 crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
3146 {
3147 crate::metrics::record_mfa_delete_denial(&req.input.bucket);
3148 return Err(mfa_error_to_s3(e));
3149 }
3150 }
3151 self.backend.delete_objects(req).await
3152 }
3153 async fn copy_object(
3154 &self,
3155 mut req: S3Request<CopyObjectInput>,
3156 ) -> S3Result<S3Response<CopyObjectOutput>> {
3157 let dst_bucket = req.input.bucket.clone();
3159 let dst_key = req.input.key.clone();
3160 self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
3161 if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3162 self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
3163 }
3164 let needs_merge = req
3174 .input
3175 .metadata_directive
3176 .as_ref()
3177 .map(|d| d.as_str() == MetadataDirective::REPLACE)
3178 .unwrap_or(false);
3179 if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3180 let head_input = HeadObjectInput {
3181 bucket: bucket.to_string(),
3182 key: key.to_string(),
3183 ..Default::default()
3184 };
3185 let head_req = S3Request {
3186 input: head_input,
3187 method: req.method.clone(),
3188 uri: req.uri.clone(),
3189 headers: req.headers.clone(),
3190 extensions: http::Extensions::new(),
3191 credentials: req.credentials.clone(),
3192 region: req.region.clone(),
3193 service: req.service.clone(),
3194 trailing_headers: None,
3195 };
3196 if let Ok(head) = self.backend.head_object(head_req).await
3197 && let Some(src_meta) = head.output.metadata.as_ref()
3198 {
3199 let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
3200 for key in [
3201 META_CODEC,
3202 META_ORIGINAL_SIZE,
3203 META_COMPRESSED_SIZE,
3204 META_CRC32C,
3205 META_MULTIPART,
3206 META_FRAMED,
3207 ] {
3208 if let Some(v) = src_meta.get(key) {
3209 dest_meta
3212 .entry(key.to_string())
3213 .or_insert_with(|| v.clone());
3214 }
3215 }
3216 debug!(
3217 src_bucket = %bucket,
3218 src_key = %key,
3219 "S4 copy_object: preserved s4-* metadata across REPLACE directive"
3220 );
3221 }
3222 }
3223 self.backend.copy_object(req).await
3224 }
3225 async fn list_objects(
3226 &self,
3227 req: S3Request<ListObjectsInput>,
3228 ) -> S3Result<S3Response<ListObjectsOutput>> {
3229 self.enforce_rate_limit(&req, &req.input.bucket)?;
3230 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3231 let mut resp = self.backend.list_objects(req).await?;
3232 if let Some(contents) = resp.output.contents.as_mut() {
3235 contents.retain(|o| {
3236 o.key
3237 .as_ref()
3238 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3239 .unwrap_or(true)
3240 });
3241 }
3242 Ok(resp)
3243 }
3244 async fn list_objects_v2(
3245 &self,
3246 req: S3Request<ListObjectsV2Input>,
3247 ) -> S3Result<S3Response<ListObjectsV2Output>> {
3248 self.enforce_rate_limit(&req, &req.input.bucket)?;
3249 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3250 let mut resp = self.backend.list_objects_v2(req).await?;
3251 if let Some(contents) = resp.output.contents.as_mut() {
3252 let before = contents.len();
3253 contents.retain(|o| {
3254 o.key
3255 .as_ref()
3256 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3257 .unwrap_or(true)
3258 });
3259 if let Some(kc) = resp.output.key_count.as_mut() {
3261 *kc -= (before - contents.len()) as i32;
3262 }
3263 }
3264 Ok(resp)
3265 }
3266 async fn list_object_versions(
3274 &self,
3275 req: S3Request<ListObjectVersionsInput>,
3276 ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
3277 self.enforce_rate_limit(&req, &req.input.bucket)?;
3278 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3279 if let Some(mgr) = self.versioning.as_ref()
3281 && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
3282 {
3283 let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
3284 let page = mgr.list_versions(
3285 &req.input.bucket,
3286 req.input.prefix.as_deref(),
3287 req.input.key_marker.as_deref(),
3288 req.input.version_id_marker.as_deref(),
3289 max_keys,
3290 );
3291 let versions: Vec<ObjectVersion> = page
3292 .versions
3293 .into_iter()
3294 .map(|e| ObjectVersion {
3295 key: Some(e.key),
3296 version_id: Some(e.version_id),
3297 is_latest: Some(e.is_latest),
3298 e_tag: Some(ETag::Strong(e.etag)),
3299 size: Some(e.size as i64),
3300 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3301 ..Default::default()
3302 })
3303 .collect();
3304 let delete_markers: Vec<DeleteMarkerEntry> = page
3305 .delete_markers
3306 .into_iter()
3307 .map(|e| DeleteMarkerEntry {
3308 key: Some(e.key),
3309 version_id: Some(e.version_id),
3310 is_latest: Some(e.is_latest),
3311 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3312 ..Default::default()
3313 })
3314 .collect();
3315 let output = ListObjectVersionsOutput {
3316 name: Some(req.input.bucket.clone()),
3317 prefix: req.input.prefix.clone(),
3318 key_marker: req.input.key_marker.clone(),
3319 version_id_marker: req.input.version_id_marker.clone(),
3320 max_keys: req.input.max_keys,
3321 versions: if versions.is_empty() {
3322 None
3323 } else {
3324 Some(versions)
3325 },
3326 delete_markers: if delete_markers.is_empty() {
3327 None
3328 } else {
3329 Some(delete_markers)
3330 },
3331 is_truncated: Some(page.is_truncated),
3332 next_key_marker: page.next_key_marker,
3333 next_version_id_marker: page.next_version_id_marker,
3334 ..Default::default()
3335 };
3336 return Ok(S3Response::new(output));
3337 }
3338 let mut resp = self.backend.list_object_versions(req).await?;
3340 if let Some(versions) = resp.output.versions.as_mut() {
3341 versions.retain(|v| {
3342 v.key
3343 .as_ref()
3344 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3345 .unwrap_or(true)
3346 });
3347 }
3348 if let Some(markers) = resp.output.delete_markers.as_mut() {
3349 markers.retain(|m| {
3350 m.key
3351 .as_ref()
3352 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3353 .unwrap_or(true)
3354 });
3355 }
3356 Ok(resp)
3357 }
3358
    /// CreateMultipartUpload. Marks the object as S4-multipart, captures SSE /
    /// tagging / object-lock intent into per-upload state keyed by the
    /// backend-issued upload id, and strips SSE inputs so the backend never
    /// sees customer key material.
    async fn create_multipart_upload(
        &self,
        mut req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        let codec_kind = self.registry.default_kind();
        let meta = req.input.metadata.get_or_insert_with(Default::default);
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
        // Take SSE inputs off the request: encryption is applied by the
        // gateway at Complete time, not by the backend store.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let sse_header = req.input.server_side_encryption.take();
        let sse_kms_key = req.input.ssekms_key_id.take();
        let _ = req.input.ssekms_encryption_context.take();
        let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
        let kms_key_id = extract_kms_key_id(
            &sse_header,
            &sse_kms_key,
            self.kms_default_key_id.as_deref(),
        );
        // The two explicit encryption schemes are mutually exclusive.
        if sse_c_material.is_some() && kms_key_id.is_some() {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidArgument,
                "SSE-C and SSE-KMS cannot be used together on the same multipart upload",
            ));
        }
        // Resolve the effective SSE mode: SSE-C, then SSE-KMS (requires a
        // configured KMS), then the gateway keyring default, then none.
        let sse_mode = if let Some(ref m) = sse_c_material {
            crate::multipart_state::MultipartSseMode::SseC {
                key: m.key,
                key_md5: m.key_md5,
            }
        } else if let Some(ref kid) = kms_key_id {
            if self.kms.is_none() {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                ));
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id: kid.clone() }
        } else if self.sse_keyring.is_some() {
            crate::multipart_state::MultipartSseMode::SseS4
        } else {
            crate::multipart_state::MultipartSseMode::None
        };
        // Parse the tagging header now; the tags are applied at Complete time.
        let request_tags: Option<crate::tagging::TagSet> = req
            .input
            .tagging
            .as_deref()
            .map(crate::tagging::parse_tagging_header)
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
        let _ = req.input.tagging.take();
        // Explicit object-lock settings on the Create request, also deferred
        // to Complete time.
        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
            .input
            .object_lock_mode
            .as_ref()
            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
            .input
            .object_lock_retain_until_date
            .as_ref()
            .and_then(timestamp_to_chrono_utc);
        let explicit_legal_hold_on: bool = req
            .input
            .object_lock_legal_hold_status
            .as_ref()
            .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
            .unwrap_or(false);
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        debug!(
            bucket = %bucket,
            key = %key,
            codec = codec_kind.as_str(),
            sse = ?sse_mode,
            "S4 create_multipart_upload: marking object for per-part compression"
        );
        let mut resp = self.backend.create_multipart_upload(req).await?;
        // Stash the captured context under the backend-issued upload id so
        // upload_part / complete_multipart_upload can find it.
        if let Some(upload_id) = resp.output.upload_id.as_ref() {
            self.multipart_state.put(
                upload_id,
                crate::multipart_state::MultipartUploadContext {
                    bucket,
                    key,
                    sse: sse_mode.clone(),
                    tags: request_tags,
                    object_lock_mode: explicit_lock_mode,
                    object_lock_retain_until: explicit_retain_until,
                    object_lock_legal_hold: explicit_legal_hold_on,
                },
            );
        }
        // Echo the negotiated SSE settings back to the client.
        match &sse_mode {
            crate::multipart_state::MultipartSseMode::SseC { key_md5, .. } => {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(key_md5),
                );
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
                resp.output.server_side_encryption = Some(
                    ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                );
                resp.output.ssekms_key_id = Some(key_id.clone());
            }
            _ => {}
        }
        Ok(resp)
    }
3497
3498 async fn upload_part(
3499 &self,
3500 mut req: S3Request<UploadPartInput>,
3501 ) -> S3Result<S3Response<UploadPartOutput>> {
3502 let _sse_ctx = self
3521 .multipart_state
3522 .get(req.input.upload_id.as_str());
3523 let _ = req.input.sse_customer_algorithm.take();
3533 let _ = req.input.sse_customer_key.take();
3534 let _ = req.input.sse_customer_key_md5.take();
3535 if let Some(blob) = req.input.body.take() {
3536 let bytes = collect_blob(blob, self.max_body_bytes)
3537 .await
3538 .map_err(internal("collect upload_part body"))?;
3539 let sample_len = bytes.len().min(SAMPLE_BYTES);
3540 let codec_kind = self
3544 .dispatcher
3545 .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
3546 .await;
3547 let original_size = bytes.len() as u64;
3548 let (compress_res, tel) = self
3550 .registry
3551 .compress_with_telemetry(bytes, codec_kind)
3552 .await;
3553 stamp_gpu_compress_telemetry(&tel);
3554 let (compressed, manifest) = compress_res.map_err(internal("registry compress part"))?;
3555 let header = FrameHeader {
3556 codec: codec_kind,
3557 original_size,
3558 compressed_size: compressed.len() as u64,
3559 crc32c: manifest.crc32c,
3560 };
3561 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3562 write_frame(&mut framed, header, &compressed);
3563 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3577 if !likely_final {
3578 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3579 }
3580 let framed_bytes = framed.freeze();
3581 let new_len = framed_bytes.len() as i64;
3582 req.input.content_length = Some(new_len);
3584 req.input.checksum_algorithm = None;
3585 req.input.checksum_crc32 = None;
3586 req.input.checksum_crc32c = None;
3587 req.input.checksum_crc64nvme = None;
3588 req.input.checksum_sha1 = None;
3589 req.input.checksum_sha256 = None;
3590 req.input.content_md5 = None;
3591 req.input.body = Some(bytes_to_blob(framed_bytes));
3592 debug!(
3593 part_number = ?req.input.part_number,
3594 upload_id = ?req.input.upload_id,
3595 original_size,
3596 framed_size = new_len,
3597 "S4 upload_part: framed compressed payload"
3598 );
3599 }
3600 self.backend.upload_part(req).await
3601 }
    /// CompleteMultipartUpload. After the backend assembles the parts, this
    /// performs the gateway-side finishing work captured at Create time:
    /// builds the frame-index sidecar, applies SSE via a re-PUT of the
    /// assembled body when needed, and commits versioning, object-lock and
    /// tagging state before kicking off replication.
    async fn complete_multipart_upload(
        &self,
        mut req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        let upload_id = req.input.upload_id.clone();
        // Serialize completions targeting the same (bucket, key) so two
        // concurrent completes cannot interleave their re-PUT steps.
        let completion_lock = self.multipart_state.completion_lock(&bucket, &key);
        let _completion_guard = completion_lock.lock().await;
        let ctx = self.multipart_state.get(upload_id.as_str());
        // SSE-C material was captured at Create; never forward it here.
        let _ = req.input.sse_customer_algorithm.take();
        let _ = req.input.sse_customer_key.take();
        let _ = req.input.sse_customer_key_md5.take();
        let mut resp = self.backend.complete_multipart_upload(req).await?;
        // Best-effort re-read of the assembled object so we can index it and,
        // if required, encrypt and re-PUT it.
        let assembled_body: Option<bytes::Bytes> =
            if let Ok(uri) = safe_object_uri(&bucket, &key) {
                let get_input = GetObjectInput {
                    bucket: bucket.clone(),
                    key: key.clone(),
                    ..Default::default()
                };
                let get_req = S3Request {
                    input: get_input,
                    method: http::Method::GET,
                    uri,
                    headers: http::HeaderMap::new(),
                    extensions: http::Extensions::new(),
                    credentials: None,
                    region: None,
                    service: None,
                    trailing_headers: None,
                };
                match self.backend.get_object(get_req).await {
                    Ok(get_resp) => match get_resp.output.body {
                        Some(blob) => collect_blob(blob, self.max_body_bytes).await.ok(),
                        None => None,
                    },
                    Err(_) => None,
                }
            } else {
                None
            };
        // Persist a frame-index sidecar built from the assembled body.
        if let Some(ref body) = assembled_body
            && let Ok(index) = build_index_from_body(body)
        {
            self.write_sidecar(&bucket, &key, &index).await;
        }
        if let Some(ctx) = ctx {
            // Decide up front which version id (if any) this completion will
            // publish; Enabled buckets mint a fresh id, otherwise "null".
            let pending_version: Option<crate::versioning::PutOutcome> = self
                .versioning
                .as_ref()
                .map(|mgr| mgr.state(&bucket))
                .map(|state| match state {
                    crate::versioning::VersioningState::Enabled => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::VersioningManager::new_version_id(),
                            versioned_response: true,
                        }
                    }
                    crate::versioning::VersioningState::Suspended
                    | crate::versioning::VersioningState::Unversioned => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
                            versioned_response: false,
                        }
                    }
                });
            // A re-PUT of the assembled body is needed when encryption must
            // be applied or the object must land under a versioned shadow key.
            let needs_re_put = matches!(
                ctx.sse,
                crate::multipart_state::MultipartSseMode::SseS4
                    | crate::multipart_state::MultipartSseMode::SseC { .. }
                    | crate::multipart_state::MultipartSseMode::SseKms { .. }
            ) || pending_version
                .as_ref()
                .map(|pv| pv.versioned_response)
                .unwrap_or(false);
            let replication_body = assembled_body.clone();
            let mut applied_metadata: Option<std::collections::HashMap<String, String>> = None;
            if needs_re_put && let Some(body) = assembled_body {
                // For SSE-KMS, generate a fresh 32-byte DEK wrapped by the KMS.
                let kms_wrap: Option<(
                    zeroize::Zeroizing<[u8; 32]>,
                    crate::kms::WrappedDek,
                )> = if let crate::multipart_state::MultipartSseMode::SseKms {
                    ref key_id,
                } = ctx.sse
                {
                    let kms = self.kms.as_ref().ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                        )
                    })?;
                    let (dek, wrapped) = kms
                        .generate_dek(key_id)
                        .await
                        .map_err(kms_error_to_s3)?;
                    if dek.len() != 32 {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InternalError,
                            format!(
                                "KMS backend returned a DEK of {} bytes (expected 32)",
                                dek.len()
                            ),
                        ));
                    }
                    let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
                        zeroize::Zeroizing::new([0u8; 32]);
                    dek_arr.copy_from_slice(&dek);
                    Some((dek_arr, wrapped))
                } else {
                    None
                };
                // Start from the object's current metadata so markers written
                // during the backend complete are preserved in the re-PUT.
                let head_req = S3Request {
                    input: HeadObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    },
                    method: http::Method::HEAD,
                    uri: safe_object_uri(&bucket, &key)?,
                    headers: http::HeaderMap::new(),
                    extensions: http::Extensions::new(),
                    credentials: None,
                    region: None,
                    service: None,
                    trailing_headers: None,
                };
                let mut new_metadata: std::collections::HashMap<String, String> =
                    match self.backend.head_object(head_req).await {
                        Ok(h) => h.output.metadata.unwrap_or_default(),
                        Err(_) => std::collections::HashMap::new(),
                    };
                // Encrypt the assembled body per the captured SSE mode,
                // stamping the metadata markers the read paths look for.
                let new_body = match &ctx.sse {
                    crate::multipart_state::MultipartSseMode::SseC { key, key_md5 } => {
                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
                        new_metadata.insert("s4-sse-type".into(), "AES256".into());
                        new_metadata.insert(
                            "s4-sse-c-key-md5".into(),
                            base64::engine::general_purpose::STANDARD.encode(key_md5),
                        );
                        crate::sse::encrypt_with_source(
                            &body,
                            crate::sse::SseSource::CustomerKey { key, key_md5 },
                        )
                    }
                    crate::multipart_state::MultipartSseMode::SseKms { .. } => {
                        let (dek, wrapped) = kms_wrap
                            .as_ref()
                            .expect("SseKms branch implies kms_wrap is Some");
                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
                        new_metadata.insert("s4-sse-type".into(), "aws:kms".into());
                        new_metadata
                            .insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
                        let dek_ref: &[u8; 32] = dek;
                        crate::sse::encrypt_with_source(
                            &body,
                            crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
                        )
                    }
                    crate::multipart_state::MultipartSseMode::SseS4 => {
                        let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InternalError,
                                "SSE-S4 captured at Create but keyring missing at Complete",
                            )
                        })?;
                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
                        // Chunked v2 envelope when a chunk size is configured;
                        // single-shot v2 otherwise.
                        if self.sse_chunk_size > 0 {
                            crate::sse::encrypt_v2_chunked(
                                &body,
                                keyring,
                                self.sse_chunk_size,
                            )
                            .map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!(
                                        "SSE-S4 chunked encrypt failed at Complete: {e}"
                                    ),
                                )
                            })?
                        } else {
                            crate::sse::encrypt_v2(&body, keyring)
                        }
                    }
                    crate::multipart_state::MultipartSseMode::None => body.clone(),
                };
                // Versioned completions re-PUT under the shadow key; the
                // plain-key copy is removed afterwards.
                let put_target_key = if let Some(pv) = pending_version.as_ref() {
                    if pv.versioned_response {
                        versioned_shadow_key(&key, &pv.version_id)
                    } else {
                        key.clone()
                    }
                } else {
                    key.clone()
                };
                let new_body_len = new_body.len() as i64;
                let put_req = S3Request {
                    input: PutObjectInput {
                        bucket: bucket.clone(),
                        key: put_target_key.clone(),
                        body: Some(bytes_to_blob(new_body.clone())),
                        metadata: Some(new_metadata.clone()),
                        content_length: Some(new_body_len),
                        ..Default::default()
                    },
                    method: http::Method::PUT,
                    uri: safe_object_uri(&bucket, &put_target_key)?,
                    headers: http::HeaderMap::new(),
                    extensions: http::Extensions::new(),
                    credentials: None,
                    region: None,
                    service: None,
                    trailing_headers: None,
                };
                self.backend.put_object(put_req).await?;
                if put_target_key != key {
                    // Best-effort removal of the plain-key object now that the
                    // shadow-key copy exists.
                    let del_req = S3Request {
                        input: DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: key.clone(),
                            ..Default::default()
                        },
                        method: http::Method::DELETE,
                        uri: safe_object_uri(&bucket, &key)?,
                        headers: http::HeaderMap::new(),
                        extensions: http::Extensions::new(),
                        credentials: None,
                        region: None,
                        service: None,
                        trailing_headers: None,
                    };
                    let _ = self.backend.delete_object(del_req).await;
                }
                applied_metadata = Some(new_metadata);
            }
            // Commit the version entry; surface the id when the bucket is
            // version-enabled.
            if let (Some(mgr), Some(pv)) = (self.versioning.as_ref(), pending_version.as_ref()) {
                let etag = resp
                    .output
                    .e_tag
                    .clone()
                    .map(ETag::into_value)
                    .unwrap_or_default();
                let now = chrono::Utc::now();
                mgr.commit_put_with_version(
                    &bucket,
                    &key,
                    crate::versioning::VersionEntry {
                        version_id: pv.version_id.clone(),
                        etag,
                        size: replication_body
                            .as_ref()
                            .map(|b| b.len() as u64)
                            .unwrap_or(0),
                        is_delete_marker: false,
                        created_at: now,
                    },
                );
                if pv.versioned_response {
                    resp.output.version_id = Some(pv.version_id.clone());
                }
            }
            // Apply object-lock settings captured at Create, then any bucket
            // default retention.
            if let Some(mgr) = self.object_lock.as_ref() {
                if ctx.object_lock_mode.is_some()
                    || ctx.object_lock_retain_until.is_some()
                    || ctx.object_lock_legal_hold
                {
                    let mut state = mgr.get(&bucket, &key).unwrap_or_default();
                    if let Some(m) = ctx.object_lock_mode {
                        state.mode = Some(m);
                    }
                    if let Some(u) = ctx.object_lock_retain_until {
                        state.retain_until = Some(u);
                    }
                    if ctx.object_lock_legal_hold {
                        state.legal_hold_on = true;
                    }
                    mgr.set(&bucket, &key, state);
                }
                mgr.apply_default_on_put(&bucket, &key, chrono::Utc::now());
            }
            // Apply tags parsed from the Create request, if any.
            if let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), ctx.tags.as_ref()) {
                mgr.put_object_tags(&bucket, &key, tags.clone());
            }
            // Echo the SSE settings in the Complete response.
            match &ctx.sse {
                crate::multipart_state::MultipartSseMode::SseC { .. } => {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AES256),
                    );
                }
                crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(key_id.clone());
                }
                _ => {}
            }
            // Replicate the assembled body (as read back from the backend,
            // i.e. before any re-PUT encryption) if a rule matches.
            let replication_body_bytes = replication_body.unwrap_or_default();
            self.spawn_replication_if_matched(
                &bucket,
                &key,
                &ctx.tags,
                &replication_body_bytes,
                &applied_metadata,
                true,
            );
            self.multipart_state.remove(upload_id.as_str());
        }
        self.multipart_state.prune_completion_locks();
        Ok(resp)
    }
4030 async fn abort_multipart_upload(
4031 &self,
4032 req: S3Request<AbortMultipartUploadInput>,
4033 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
4034 self.multipart_state.remove(req.input.upload_id.as_str());
4038 self.backend.abort_multipart_upload(req).await
4039 }
4040 async fn list_multipart_uploads(
4041 &self,
4042 req: S3Request<ListMultipartUploadsInput>,
4043 ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
4044 self.backend.list_multipart_uploads(req).await
4045 }
4046 async fn list_parts(
4047 &self,
4048 req: S3Request<ListPartsInput>,
4049 ) -> S3Result<S3Response<ListPartsOutput>> {
4050 self.backend.list_parts(req).await
4051 }
4052
4053 async fn get_object_acl(
4069 &self,
4070 req: S3Request<GetObjectAclInput>,
4071 ) -> S3Result<S3Response<GetObjectAclOutput>> {
4072 self.backend.get_object_acl(req).await
4073 }
4074 async fn put_object_acl(
4075 &self,
4076 req: S3Request<PutObjectAclInput>,
4077 ) -> S3Result<S3Response<PutObjectAclOutput>> {
4078 self.backend.put_object_acl(req).await
4079 }
4080 async fn get_object_tagging(
4086 &self,
4087 req: S3Request<GetObjectTaggingInput>,
4088 ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
4089 let Some(mgr) = self.tagging.as_ref() else {
4090 return self.backend.get_object_tagging(req).await;
4091 };
4092 let tags = mgr
4093 .get_object_tags(&req.input.bucket, &req.input.key)
4094 .unwrap_or_default();
4095 Ok(S3Response::new(GetObjectTaggingOutput {
4096 tag_set: tagset_to_aws(&tags),
4097 ..Default::default()
4098 }))
4099 }
4100 async fn put_object_tagging(
4101 &self,
4102 req: S3Request<PutObjectTaggingInput>,
4103 ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
4104 let Some(mgr) = self.tagging.as_ref() else {
4105 return self.backend.put_object_tagging(req).await;
4106 };
4107 let bucket = req.input.bucket.clone();
4108 let key = req.input.key.clone();
4109 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4110 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4111 })?;
4112 let existing = mgr.get_object_tags(&bucket, &key);
4116 self.enforce_policy_with_extra(
4117 &req,
4118 "s3:PutObjectTagging",
4119 &bucket,
4120 Some(&key),
4121 Some(&parsed),
4122 existing.as_ref(),
4123 )?;
4124 mgr.put_object_tags(&bucket, &key, parsed);
4125 Ok(S3Response::new(PutObjectTaggingOutput::default()))
4126 }
4127 async fn delete_object_tagging(
4128 &self,
4129 req: S3Request<DeleteObjectTaggingInput>,
4130 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
4131 let Some(mgr) = self.tagging.as_ref() else {
4132 return self.backend.delete_object_tagging(req).await;
4133 };
4134 let bucket = req.input.bucket.clone();
4135 let key = req.input.key.clone();
4136 let existing = mgr.get_object_tags(&bucket, &key);
4137 self.enforce_policy_with_extra(
4138 &req,
4139 "s3:DeleteObjectTagging",
4140 &bucket,
4141 Some(&key),
4142 None,
4143 existing.as_ref(),
4144 )?;
4145 mgr.delete_object_tags(&bucket, &key);
4146 Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
4147 }
4148 async fn get_object_attributes(
4149 &self,
4150 req: S3Request<GetObjectAttributesInput>,
4151 ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
4152 self.backend.get_object_attributes(req).await
4153 }
4154 async fn restore_object(
4155 &self,
4156 req: S3Request<RestoreObjectInput>,
4157 ) -> S3Result<S3Response<RestoreObjectOutput>> {
4158 self.backend.restore_object(req).await
4159 }
    /// UploadPartCopy with s4 re-framing.
    ///
    /// If the copy source is an s4-managed object (multipart-framed or
    /// framed-v2 metadata on HEAD), a plain backend copy would duplicate raw
    /// compressed frames with the wrong part boundaries. Instead: GET the
    /// (decoded) source range through our own `get_object`, recompress it,
    /// wrap it in a fresh frame, and upload it as an ordinary part. Any other
    /// source is delegated to the backend untouched.
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        // Only bucket/key copy sources can be inspected; anything else delegates.
        let CopySource::Bucket {
            bucket: src_bucket,
            key: src_key,
            ..
        } = &req.input.copy_source
        else {
            return self.backend.upload_part_copy(req).await;
        };
        let src_bucket = src_bucket.to_string();
        let src_key = src_key.to_string();

        // HEAD the source to check for s4 framing metadata.
        let head_input = HeadObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        let head_req = S3Request {
            input: head_input,
            method: http::Method::HEAD,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        // A failed HEAD deliberately falls through to the backend copy, which
        // will surface the real error (e.g. NoSuchKey) on its own attempt.
        let needs_s4_copy = match self.backend.head_object(head_req).await {
            Ok(h) => {
                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
            }
            Err(_) => false,
        };
        if !needs_s4_copy {
            return self.backend.upload_part_copy(req).await;
        }

        // x-amz-copy-source-range is optional; malformed ranges → InvalidRange.
        let source_range = req
            .input
            .copy_source_range
            .as_ref()
            .map(|r| parse_copy_source_range(r))
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

        // Fetch the requested slice via our own get_object so the source
        // frames are decoded back to plaintext bytes before recompression.
        let mut get_input = GetObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        get_input.range = source_range;
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "upload_part_copy: empty body from source GET",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect upload_part_copy source body"))?;

        // Pick a codec from a small prefix sample plus a total-size hint, then
        // compress the whole part and record any GPU telemetry.
        let sample_len = bytes.len().min(SAMPLE_BYTES);
        let codec_kind = self
            .dispatcher
            .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
            .await;
        let original_size = bytes.len() as u64;
        let (compress_res, tel) = self
            .registry
            .compress_with_telemetry(bytes, codec_kind)
            .await;
        stamp_gpu_compress_telemetry(&tel);
        let (compressed, manifest) =
            compress_res.map_err(internal("registry compress upload_part_copy"))?;
        let header = FrameHeader {
            codec: codec_kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: manifest.crc32c,
        };
        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
        write_frame(&mut framed, header, &compressed);
        // Non-final parts must satisfy S3's minimum part size; a source range
        // already below the minimum is presumed to be the final part and is
        // left unpadded.
        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
        if !likely_final {
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
        }
        let framed_bytes = framed.freeze();
        let framed_len = framed_bytes.len() as i64;

        // Upload the freshly framed bytes as a regular part of the target upload.
        let part_input = UploadPartInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            part_number: req.input.part_number,
            upload_id: req.input.upload_id.clone(),
            body: Some(bytes_to_blob(framed_bytes)),
            content_length: Some(framed_len),
            ..Default::default()
        };
        let part_req = S3Request {
            input: part_input,
            method: http::Method::PUT,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let upload_resp = self.backend.upload_part(part_req).await?;

        // Surface the new part's ETag as a CopyPartResult, per UploadPartCopy.
        let copy_output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: upload_resp.output.e_tag.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        Ok(S3Response::new(copy_output))
    }
4315
    /// GetObjectLockConfiguration: serve the bucket default retention from the
    /// in-process lock manager when one is configured, else delegate.
    async fn get_object_lock_configuration(
        &self,
        req: S3Request<GetObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            // No bucket default => `cfg` is None and the response simply omits
            // the configuration element (not an error).
            let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
                ObjectLockConfiguration {
                    // The presence of a stored default implies lock is enabled.
                    object_lock_enabled: Some(ObjectLockEnabled::from_static(
                        ObjectLockEnabled::ENABLED,
                    )),
                    rule: Some(ObjectLockRule {
                        default_retention: Some(DefaultRetention {
                            days: Some(d.retention_days as i32),
                            mode: Some(ObjectLockRetentionMode::from_static(
                                match d.mode {
                                    crate::object_lock::LockMode::Governance => {
                                        ObjectLockRetentionMode::GOVERNANCE
                                    }
                                    crate::object_lock::LockMode::Compliance => {
                                        ObjectLockRetentionMode::COMPLIANCE
                                    }
                                },
                            )),
                            // Years are normalized to days at PUT time (see
                            // put_object_lock_configuration), so never echoed.
                            years: None,
                        }),
                    }),
                }
            });
            let output = GetObjectLockConfigurationOutput {
                object_lock_configuration: cfg,
            };
            return Ok(S3Response::new(output));
        }
        self.backend.get_object_lock_configuration(req).await
    }
    /// PutObjectLockConfiguration: record a bucket-level default retention in
    /// the in-process lock manager (when enabled), normalizing Years to Days.
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            // Only act when the full Configuration/Rule/DefaultRetention chain
            // is present; a request without one succeeds as a no-op below.
            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
                && let Some(rule) = cfg.rule.as_ref()
                && let Some(d) = rule.default_retention.as_ref()
            {
                let mode = d
                    .mode
                    .as_ref()
                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
                    .ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
                        )
                    })?;
                // Exactly one of Days/Years must be a positive integer; Years
                // are stored as days using a 365-day year.
                let days: u32 = match (d.days, d.years) {
                    (Some(d), None) if d > 0 => d as u32,
                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
                    _ => {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
                        ));
                    }
                };
                mgr.set_bucket_default(
                    &bucket,
                    crate::object_lock::BucketObjectLockDefault {
                        mode,
                        retention_days: days,
                    },
                );
            }
            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
        }
        self.backend.put_object_lock_configuration(req).await
    }
4402 async fn get_object_legal_hold(
4403 &self,
4404 req: S3Request<GetObjectLegalHoldInput>,
4405 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
4406 if let Some(mgr) = self.object_lock.as_ref() {
4407 let on = mgr
4408 .get(&req.input.bucket, &req.input.key)
4409 .map(|s| s.legal_hold_on)
4410 .unwrap_or(false);
4411 let status = ObjectLockLegalHoldStatus::from_static(if on {
4412 ObjectLockLegalHoldStatus::ON
4413 } else {
4414 ObjectLockLegalHoldStatus::OFF
4415 });
4416 let output = GetObjectLegalHoldOutput {
4417 legal_hold: Some(ObjectLockLegalHold {
4418 status: Some(status),
4419 }),
4420 };
4421 return Ok(S3Response::new(output));
4422 }
4423 self.backend.get_object_legal_hold(req).await
4424 }
4425 async fn put_object_legal_hold(
4426 &self,
4427 req: S3Request<PutObjectLegalHoldInput>,
4428 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
4429 if let Some(mgr) = self.object_lock.as_ref() {
4430 let on = req
4431 .input
4432 .legal_hold
4433 .as_ref()
4434 .and_then(|h| h.status.as_ref())
4435 .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
4436 .unwrap_or(false);
4437 mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
4438 return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
4439 }
4440 self.backend.put_object_legal_hold(req).await
4441 }
    /// GetObjectRetention: report per-object retention from the in-process
    /// lock manager when enabled, else delegate to the backend.
    async fn get_object_retention(
        &self,
        req: S3Request<GetObjectRetentionInput>,
    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            // Only emit a Retention element when some retention state exists;
            // a record with neither mode nor date is treated as "none".
            let retention = mgr
                .get(&req.input.bucket, &req.input.key)
                .filter(|s| s.mode.is_some() || s.retain_until.is_some())
                .map(|s| {
                    let mode = s.mode.map(|m| {
                        ObjectLockRetentionMode::from_static(match m {
                            crate::object_lock::LockMode::Governance => {
                                ObjectLockRetentionMode::GOVERNANCE
                            }
                            crate::object_lock::LockMode::Compliance => {
                                ObjectLockRetentionMode::COMPLIANCE
                            }
                        })
                    });
                    let until = s.retain_until.map(chrono_utc_to_timestamp);
                    ObjectLockRetention {
                        mode,
                        retain_until_date: until,
                    }
                });
            let output = GetObjectRetentionOutput { retention };
            return Ok(S3Response::new(output));
        }
        self.backend.get_object_retention(req).await
    }
4472 async fn put_object_retention(
4473 &self,
4474 req: S3Request<PutObjectRetentionInput>,
4475 ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
4476 if let Some(mgr) = self.object_lock.as_ref() {
4477 let bucket = req.input.bucket.clone();
4478 let key = req.input.key.clone();
4479 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
4480 let retention = req.input.retention.as_ref().ok_or_else(|| {
4481 S3Error::with_message(
4482 S3ErrorCode::InvalidRequest,
4483 "PutObjectRetention requires a Retention element",
4484 )
4485 })?;
4486 let new_mode = retention
4487 .mode
4488 .as_ref()
4489 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
4490 let new_until = retention
4491 .retain_until_date
4492 .as_ref()
4493 .map(timestamp_to_chrono_utc)
4494 .unwrap_or(None);
4495 let now = chrono::Utc::now();
4496 let existing = mgr.get(&bucket, &key).unwrap_or_default();
4497 if let Some(existing_mode) = existing.mode
4503 && existing_mode == crate::object_lock::LockMode::Compliance
4504 && existing.is_locked(now)
4505 {
4506 if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
4507 return Err(S3Error::with_message(
4508 S3ErrorCode::AccessDenied,
4509 "Cannot downgrade Compliance retention to Governance while lock is active",
4510 ));
4511 }
4512 if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
4513 && next < prev
4514 {
4515 return Err(S3Error::with_message(
4516 S3ErrorCode::AccessDenied,
4517 "Cannot shorten Compliance retention while lock is active",
4518 ));
4519 }
4520 }
4521 if let Some(existing_mode) = existing.mode
4522 && existing_mode == crate::object_lock::LockMode::Governance
4523 && existing.is_locked(now)
4524 && !bypass
4525 && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
4526 && next < prev
4527 {
4528 return Err(S3Error::with_message(
4529 S3ErrorCode::AccessDenied,
4530 "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
4531 ));
4532 }
4533 let mut state = existing;
4534 if new_mode.is_some() {
4535 state.mode = new_mode;
4536 }
4537 if new_until.is_some() {
4538 state.retain_until = new_until;
4539 }
4540 mgr.set(&bucket, &key, state);
4541 return Ok(S3Response::new(PutObjectRetentionOutput::default()));
4542 }
4543 self.backend.put_object_retention(req).await
4544 }
4545
4546 async fn get_bucket_versioning(
4552 &self,
4553 req: S3Request<GetBucketVersioningInput>,
4554 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
4555 if let Some(mgr) = self.versioning.as_ref() {
4560 let output = match mgr.state(&req.input.bucket).as_aws_status() {
4561 Some(s) => GetBucketVersioningOutput {
4562 status: Some(BucketVersioningStatus::from(s.to_owned())),
4563 ..Default::default()
4564 },
4565 None => GetBucketVersioningOutput::default(),
4566 };
4567 return Ok(S3Response::new(output));
4568 }
4569 self.backend.get_bucket_versioning(req).await
4570 }
    /// PutBucketVersioning with optional MFA-Delete enforcement.
    ///
    /// When an MFA manager is configured and the request specifies an
    /// MfaDelete value, the `x-amz-mfa` header must carry a serial + TOTP
    /// code that verifies against the bucket's registered device before the
    /// MFA-Delete state is changed. The versioning state itself is then
    /// recorded in the in-process manager (when present) or delegated.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        if let Some(mgr) = self.mfa_delete.as_ref()
            && let Some(target_enabled) = req
                .input
                .versioning_configuration
                .mfa_delete
                .as_ref()
                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
        {
            let bucket = req.input.bucket.clone();
            let header = req.input.mfa.as_deref();
            let secret = mgr.lookup_secret(&bucket);
            // Verification requires a parseable header AND a registered device
            // whose serial matches; any other combination fails closed.
            let verified = match (header, secret.as_ref()) {
                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
                    Ok((serial, code)) => {
                        serial == s.serial
                            && crate::mfa::verify_totp(
                                &s.secret_base32,
                                &code,
                                current_unix_secs(),
                            )
                    }
                    Err(_) => false,
                },
                _ => false,
            };
            if !verified {
                crate::metrics::record_mfa_delete_denial(&bucket);
                // Distinguish "no header at all" from "header present but wrong".
                let err = if header.is_none() {
                    crate::mfa::MfaError::Missing
                } else {
                    crate::mfa::MfaError::InvalidCode
                };
                return Err(mfa_error_to_s3(err));
            }
            mgr.set_bucket_state(&bucket, target_enabled);
        }
        if let Some(mgr) = self.versioning.as_ref() {
            // Anything other than Enabled/Suspended (including absence) resets
            // the bucket to the unversioned state.
            let new_state = match req
                .input
                .versioning_configuration
                .status
                .as_ref()
                .map(|s| s.as_str())
            {
                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
                    crate::versioning::VersioningState::Enabled
                }
                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
                    crate::versioning::VersioningState::Suspended
                }
                _ => crate::versioning::VersioningState::Unversioned,
            };
            mgr.set_state(&req.input.bucket, new_state);
            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
        }
        self.backend.put_bucket_versioning(req).await
    }
4647
    // --- Pure pass-throughs: bucket location, policy, and ACL sub-resources.

    /// Delegated: bucket location lookup.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }

    /// Delegated: read the bucket policy document.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    /// Delegated: store a bucket policy document.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    /// Delegated: remove the bucket policy document.
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    /// Delegated: bucket policy status query.
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }

    /// Delegated: read the bucket ACL.
    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    /// Delegated: write the bucket ACL.
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }
4695
4696 async fn get_bucket_cors(
4698 &self,
4699 req: S3Request<GetBucketCorsInput>,
4700 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
4701 if let Some(mgr) = self.cors.as_ref() {
4702 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4703 S3Error::with_message(
4704 S3ErrorCode::NoSuchCORSConfiguration,
4705 "The CORS configuration does not exist".to_string(),
4706 )
4707 })?;
4708 let rules: Vec<CORSRule> = cfg
4709 .rules
4710 .into_iter()
4711 .map(|r| CORSRule {
4712 allowed_headers: if r.allowed_headers.is_empty() {
4713 None
4714 } else {
4715 Some(r.allowed_headers)
4716 },
4717 allowed_methods: r.allowed_methods,
4718 allowed_origins: r.allowed_origins,
4719 expose_headers: if r.expose_headers.is_empty() {
4720 None
4721 } else {
4722 Some(r.expose_headers)
4723 },
4724 id: r.id,
4725 max_age_seconds: r.max_age_seconds.map(|s| s as i32),
4726 })
4727 .collect();
4728 return Ok(S3Response::new(GetBucketCorsOutput {
4729 cors_rules: Some(rules),
4730 }));
4731 }
4732 self.backend.get_bucket_cors(req).await
4733 }
4734 async fn put_bucket_cors(
4735 &self,
4736 req: S3Request<PutBucketCorsInput>,
4737 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4738 if let Some(mgr) = self.cors.as_ref() {
4739 let cfg = crate::cors::CorsConfig {
4740 rules: req
4741 .input
4742 .cors_configuration
4743 .cors_rules
4744 .into_iter()
4745 .map(|r| crate::cors::CorsRule {
4746 allowed_origins: r.allowed_origins,
4747 allowed_methods: r.allowed_methods,
4748 allowed_headers: r.allowed_headers.unwrap_or_default(),
4749 expose_headers: r.expose_headers.unwrap_or_default(),
4750 max_age_seconds: r.max_age_seconds.and_then(|s| {
4751 if s < 0 { None } else { Some(s as u32) }
4752 }),
4753 id: r.id,
4754 })
4755 .collect(),
4756 };
4757 mgr.put(&req.input.bucket, cfg);
4758 return Ok(S3Response::new(PutBucketCorsOutput::default()));
4759 }
4760 self.backend.put_bucket_cors(req).await
4761 }
4762 async fn delete_bucket_cors(
4763 &self,
4764 req: S3Request<DeleteBucketCorsInput>,
4765 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4766 if let Some(mgr) = self.cors.as_ref() {
4767 mgr.delete(&req.input.bucket);
4768 return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4769 }
4770 self.backend.delete_bucket_cors(req).await
4771 }
4772
4773 async fn get_bucket_lifecycle_configuration(
4775 &self,
4776 req: S3Request<GetBucketLifecycleConfigurationInput>,
4777 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4778 if let Some(mgr) = self.lifecycle.as_ref() {
4779 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4780 S3Error::with_message(
4781 S3ErrorCode::NoSuchLifecycleConfiguration,
4782 "The lifecycle configuration does not exist".to_string(),
4783 )
4784 })?;
4785 let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
4786 return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
4787 rules: Some(rules),
4788 transition_default_minimum_object_size: None,
4789 }));
4790 }
4791 self.backend.get_bucket_lifecycle_configuration(req).await
4792 }
4793 async fn put_bucket_lifecycle_configuration(
4794 &self,
4795 req: S3Request<PutBucketLifecycleConfigurationInput>,
4796 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
4797 if let Some(mgr) = self.lifecycle.as_ref() {
4798 let bucket = req.input.bucket.clone();
4799 let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
4800 let cfg = dto_lifecycle_to_internal(&dto_cfg);
4801 mgr.put(&bucket, cfg);
4802 return Ok(S3Response::new(
4803 PutBucketLifecycleConfigurationOutput::default(),
4804 ));
4805 }
4806 self.backend.put_bucket_lifecycle_configuration(req).await
4807 }
4808 async fn delete_bucket_lifecycle(
4809 &self,
4810 req: S3Request<DeleteBucketLifecycleInput>,
4811 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
4812 if let Some(mgr) = self.lifecycle.as_ref() {
4813 mgr.delete(&req.input.bucket);
4814 return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
4815 }
4816 self.backend.delete_bucket_lifecycle(req).await
4817 }
4818
4819 async fn get_bucket_tagging(
4821 &self,
4822 req: S3Request<GetBucketTaggingInput>,
4823 ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
4824 let Some(mgr) = self.tagging.as_ref() else {
4825 return self.backend.get_bucket_tagging(req).await;
4826 };
4827 let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
4828 Ok(S3Response::new(GetBucketTaggingOutput {
4829 tag_set: tagset_to_aws(&tags),
4830 }))
4831 }
4832 async fn put_bucket_tagging(
4833 &self,
4834 req: S3Request<PutBucketTaggingInput>,
4835 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
4836 let Some(mgr) = self.tagging.as_ref() else {
4837 return self.backend.put_bucket_tagging(req).await;
4838 };
4839 let bucket = req.input.bucket.clone();
4840 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4841 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4842 })?;
4843 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4844 mgr.put_bucket_tags(&bucket, parsed);
4845 Ok(S3Response::new(PutBucketTaggingOutput::default()))
4846 }
4847 async fn delete_bucket_tagging(
4848 &self,
4849 req: S3Request<DeleteBucketTaggingInput>,
4850 ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
4851 let Some(mgr) = self.tagging.as_ref() else {
4852 return self.backend.delete_bucket_tagging(req).await;
4853 };
4854 let bucket = req.input.bucket.clone();
4855 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4856 mgr.delete_bucket_tags(&bucket);
4857 Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
4858 }
4859
    // --- Pure pass-throughs: bucket encryption and logging sub-resources.

    /// Delegated: read bucket default encryption settings.
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    /// Delegated: store bucket default encryption settings.
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    /// Delegated: remove bucket default encryption settings.
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }

    /// Delegated: read bucket access-logging settings.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    /// Delegated: store bucket access-logging settings.
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
4893
4894 async fn get_bucket_notification_configuration(
4904 &self,
4905 req: S3Request<GetBucketNotificationConfigurationInput>,
4906 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
4907 if let Some(mgr) = self.notifications.as_ref() {
4908 let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
4909 let dto = notif_to_dto(&cfg);
4910 return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
4911 event_bridge_configuration: dto.event_bridge_configuration,
4912 lambda_function_configurations: dto.lambda_function_configurations,
4913 queue_configurations: dto.queue_configurations,
4914 topic_configurations: dto.topic_configurations,
4915 }));
4916 }
4917 self.backend
4918 .get_bucket_notification_configuration(req)
4919 .await
4920 }
4921 async fn put_bucket_notification_configuration(
4922 &self,
4923 req: S3Request<PutBucketNotificationConfigurationInput>,
4924 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
4925 if let Some(mgr) = self.notifications.as_ref() {
4926 let cfg = notif_from_dto(&req.input.notification_configuration);
4927 mgr.put(&req.input.bucket, cfg);
4928 return Ok(S3Response::new(
4929 PutBucketNotificationConfigurationOutput::default(),
4930 ));
4931 }
4932 self.backend
4933 .put_bucket_notification_configuration(req)
4934 .await
4935 }
4936
    // --- Pure pass-throughs: request-payment and static-website sub-resources.

    /// Delegated: read requester-pays settings.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    /// Delegated: store requester-pays settings.
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }

    /// Delegated: read static-website hosting configuration.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    /// Delegated: store static-website hosting configuration.
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    /// Delegated: remove static-website hosting configuration.
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
4970
4971 async fn get_bucket_replication(
4973 &self,
4974 req: S3Request<GetBucketReplicationInput>,
4975 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
4976 if let Some(mgr) = self.replication.as_ref() {
4977 return match mgr.get(&req.input.bucket) {
4978 Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
4979 replication_configuration: Some(replication_to_dto(&cfg)),
4980 })),
4981 None => Err(S3Error::with_message(
4982 S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
4983 format!("no replication configuration on bucket {}", req.input.bucket),
4984 )),
4985 };
4986 }
4987 self.backend.get_bucket_replication(req).await
4988 }
4989 async fn put_bucket_replication(
4990 &self,
4991 req: S3Request<PutBucketReplicationInput>,
4992 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
4993 if let Some(mgr) = self.replication.as_ref() {
4994 let cfg = replication_from_dto(&req.input.replication_configuration);
4995 mgr.put(&req.input.bucket, cfg);
4996 return Ok(S3Response::new(PutBucketReplicationOutput::default()));
4997 }
4998 self.backend.put_bucket_replication(req).await
4999 }
5000 async fn delete_bucket_replication(
5001 &self,
5002 req: S3Request<DeleteBucketReplicationInput>,
5003 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
5004 if let Some(mgr) = self.replication.as_ref() {
5005 mgr.delete(&req.input.bucket);
5006 return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
5007 }
5008 self.backend.delete_bucket_replication(req).await
5009 }
5010
    // --- Pure pass-throughs: accelerate, ownership-controls, and
    // public-access-block sub-resources.

    /// Delegated: read transfer-acceleration settings.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    /// Delegated: store transfer-acceleration settings.
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }

    /// Delegated: read object-ownership controls.
    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    /// Delegated: store object-ownership controls.
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    /// Delegated: remove object-ownership controls.
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }

    /// Delegated: read public-access-block settings.
    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    /// Delegated: store public-access-block settings.
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    /// Delegated: remove public-access-block settings.
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
5064
5065 async fn select_object_content(
5084 &self,
5085 req: S3Request<SelectObjectContentInput>,
5086 ) -> S3Result<S3Response<SelectObjectContentOutput>> {
5087 use crate::select::{
5088 EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
5089 run_select_jsonlines,
5090 };
5091
5092 let select_bucket = req.input.bucket.clone();
5093 let select_key = req.input.key.clone();
5094 self.enforce_rate_limit(&req, &select_bucket)?;
5095 self.enforce_policy(
5096 &req,
5097 "s3:GetObject",
5098 &select_bucket,
5099 Some(&select_key),
5100 )?;
5101
5102 let request = req.input.request;
5103 let sql = request.expression.clone();
5104 if request.expression_type.as_str() != "SQL" {
5105 return Err(S3Error::with_message(
5106 S3ErrorCode::InvalidExpressionType,
5107 format!(
5108 "ExpressionType must be SQL, got: {}",
5109 request.expression_type.as_str()
5110 ),
5111 ));
5112 }
5113
5114 let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
5115 SelectInputFormat::JsonLines
5116 } else if let Some(csv) = request.input_serialization.csv.as_ref() {
5117 let has_header = csv
5118 .file_header_info
5119 .as_ref()
5120 .map(|h| {
5121 let s = h.as_str();
5122 s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
5123 })
5124 .unwrap_or(false);
5125 let delim = csv
5126 .field_delimiter
5127 .as_deref()
5128 .and_then(|s| s.chars().next())
5129 .unwrap_or(',');
5130 SelectInputFormat::Csv {
5131 has_header,
5132 delimiter: delim,
5133 }
5134 } else if request.input_serialization.parquet.is_some() {
5135 return Err(S3Error::with_message(
5136 S3ErrorCode::NotImplemented,
5137 "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
5138 ));
5139 } else {
5140 return Err(S3Error::with_message(
5141 S3ErrorCode::InvalidRequest,
5142 "InputSerialization requires exactly one of CSV / JSON / Parquet",
5143 ));
5144 };
5145 if let Some(ct) = request.input_serialization.compression_type.as_ref()
5146 && !ct.as_str().eq_ignore_ascii_case("NONE")
5147 {
5148 return Err(S3Error::with_message(
5149 S3ErrorCode::NotImplemented,
5150 format!(
5151 "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
5152 ct.as_str()
5153 ),
5154 ));
5155 }
5156
5157 let output_format = if request.output_serialization.json.is_some() {
5158 SelectOutputFormat::Json
5159 } else if request.output_serialization.csv.is_some() {
5160 SelectOutputFormat::Csv
5161 } else {
5162 return Err(S3Error::with_message(
5163 S3ErrorCode::InvalidRequest,
5164 "OutputSerialization requires exactly one of CSV / JSON",
5165 ));
5166 };
5167
5168 let get_input = GetObjectInput {
5169 bucket: select_bucket.clone(),
5170 key: select_key.clone(),
5171 sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
5172 sse_customer_key: req.input.sse_customer_key.clone(),
5173 sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
5174 ..Default::default()
5175 };
5176 let get_req = S3Request {
5177 input: get_input,
5178 method: http::Method::GET,
5179 uri: format!("/{}/{}", select_bucket, select_key)
5180 .parse()
5181 .map_err(|e| {
5182 S3Error::with_message(
5183 S3ErrorCode::InternalError,
5184 format!("constructing inner GET URI: {e}"),
5185 )
5186 })?,
5187 headers: http::HeaderMap::new(),
5188 extensions: http::Extensions::new(),
5189 credentials: req.credentials.clone(),
5190 region: req.region.clone(),
5191 service: req.service.clone(),
5192 trailing_headers: None,
5193 };
5194 let mut get_resp = self.get_object(get_req).await?;
5195 let blob = get_resp.output.body.take().ok_or_else(|| {
5196 S3Error::with_message(
5197 S3ErrorCode::InternalError,
5198 "Select: object body was empty after GET",
5199 )
5200 })?;
5201 let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
5202 .await
5203 .map_err(internal("collect Select body"))?;
5204 let scanned = body_bytes.len() as u64;
5205
5206 let matched_payload = match input_format {
5207 SelectInputFormat::JsonLines => {
5208 run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
5209 |e| select_error_to_s3(e, "JSON Lines"),
5210 )?
5211 }
5212 SelectInputFormat::Csv { .. } => {
5213 run_select_csv(&sql, &body_bytes, input_format, output_format)
5214 .map_err(|e| select_error_to_s3(e, "CSV"))?
5215 }
5216 };
5217
5218 let returned = matched_payload.len() as u64;
5219 let processed = scanned;
5220 let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
5221 if !matched_payload.is_empty() {
5222 events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
5223 payload: Some(bytes::Bytes::from(matched_payload)),
5224 })));
5225 }
5226 events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
5227 details: Some(Stats {
5228 bytes_scanned: Some(scanned as i64),
5229 bytes_processed: Some(processed as i64),
5230 bytes_returned: Some(returned as i64),
5231 }),
5232 })));
5233 events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
5234 let _writer = EventStreamWriter::new();
5237
5238 let stream =
5239 SelectObjectContentEventStream::new(futures::stream::iter(events));
5240 let output = SelectObjectContentOutput {
5241 payload: Some(stream),
5242 };
5243 Ok(S3Response::new(output))
5244 }
5245
5246 async fn put_bucket_inventory_configuration(
5260 &self,
5261 req: S3Request<PutBucketInventoryConfigurationInput>,
5262 ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
5263 if let Some(mgr) = self.inventory.as_ref() {
5264 let cfg = inv_from_dto(
5265 &req.input.bucket,
5266 &req.input.id,
5267 &req.input.inventory_configuration,
5268 );
5269 mgr.put(cfg);
5270 return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
5271 }
5272 self.backend.put_bucket_inventory_configuration(req).await
5273 }
5274
5275 async fn get_bucket_inventory_configuration(
5276 &self,
5277 req: S3Request<GetBucketInventoryConfigurationInput>,
5278 ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
5279 if let Some(mgr) = self.inventory.as_ref() {
5280 let cfg = mgr.get(&req.input.bucket, &req.input.id);
5281 if let Some(cfg) = cfg {
5282 let out = GetBucketInventoryConfigurationOutput {
5283 inventory_configuration: Some(inv_to_dto(&cfg)),
5284 };
5285 return Ok(S3Response::new(out));
5286 }
5287 let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
5294 .unwrap_or(S3ErrorCode::NoSuchKey);
5295 return Err(S3Error::with_message(
5296 code,
5297 format!(
5298 "no inventory configuration with id={} on bucket={}",
5299 req.input.id, req.input.bucket
5300 ),
5301 ));
5302 }
5303 self.backend.get_bucket_inventory_configuration(req).await
5304 }
5305
5306 async fn list_bucket_inventory_configurations(
5307 &self,
5308 req: S3Request<ListBucketInventoryConfigurationsInput>,
5309 ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
5310 if let Some(mgr) = self.inventory.as_ref() {
5311 let list = mgr.list_for_bucket(&req.input.bucket);
5312 let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
5313 let out = ListBucketInventoryConfigurationsOutput {
5314 continuation_token: req.input.continuation_token.clone(),
5315 inventory_configuration_list: if dto_list.is_empty() {
5316 None
5317 } else {
5318 Some(dto_list)
5319 },
5320 is_truncated: Some(false),
5321 next_continuation_token: None,
5322 };
5323 return Ok(S3Response::new(out));
5324 }
5325 self.backend.list_bucket_inventory_configurations(req).await
5326 }
5327
5328 async fn delete_bucket_inventory_configuration(
5329 &self,
5330 req: S3Request<DeleteBucketInventoryConfigurationInput>,
5331 ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
5332 if let Some(mgr) = self.inventory.as_ref() {
5333 mgr.delete(&req.input.bucket, &req.input.id);
5334 return Ok(S3Response::new(
5335 DeleteBucketInventoryConfigurationOutput::default(),
5336 ));
5337 }
5338 self.backend.delete_bucket_inventory_configuration(req).await
5339 }
5340}
5341
5342fn inv_from_dto(
5352 bucket: &str,
5353 id: &str,
5354 dto: &InventoryConfiguration,
5355) -> crate::inventory::InventoryConfig {
5356 let frequency_hours = match dto.schedule.frequency.as_str() {
5357 "Weekly" => 24 * 7,
5358 _ => 24,
5362 };
5363 let format = crate::inventory::InventoryFormat::Csv;
5367 crate::inventory::InventoryConfig {
5368 id: id.to_owned(),
5369 bucket: bucket.to_owned(),
5370 destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
5371 destination_prefix: dto
5372 .destination
5373 .s3_bucket_destination
5374 .prefix
5375 .clone()
5376 .unwrap_or_default(),
5377 frequency_hours,
5378 format,
5379 included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
5380 dto.included_object_versions.as_str(),
5381 ),
5382 }
5383}
5384
5385fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
5386 InventoryConfiguration {
5387 id: cfg.id.clone(),
5388 is_enabled: true,
5389 included_object_versions: InventoryIncludedObjectVersions::from(
5390 cfg.included_object_versions.as_aws_str().to_owned(),
5391 ),
5392 destination: InventoryDestination {
5393 s3_bucket_destination: InventoryS3BucketDestination {
5394 account_id: None,
5395 bucket: cfg.destination_bucket.clone(),
5396 encryption: None,
5397 format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
5398 prefix: if cfg.destination_prefix.is_empty() {
5399 None
5400 } else {
5401 Some(cfg.destination_prefix.clone())
5402 },
5403 },
5404 },
5405 schedule: InventorySchedule {
5406 frequency: InventoryFrequency::from(
5410 if cfg.frequency_hours == 24 * 7 {
5411 "Weekly"
5412 } else {
5413 "Daily"
5414 }
5415 .to_owned(),
5416 ),
5417 },
5418 filter: None,
5419 optional_fields: None,
5420 }
5421}
5422
5423fn notif_from_dto(
5440 dto: &NotificationConfiguration,
5441) -> crate::notifications::NotificationConfig {
5442 let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
5443 if let Some(topics) = dto.topic_configurations.as_ref() {
5444 for (idx, t) in topics.iter().enumerate() {
5445 let events = events_from_dto(&t.events);
5446 let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
5447 rules.push(crate::notifications::NotificationRule {
5448 id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
5449 events,
5450 destination: crate::notifications::Destination::Sns {
5451 topic_arn: t.topic_arn.clone(),
5452 },
5453 filter_prefix: prefix,
5454 filter_suffix: suffix,
5455 });
5456 }
5457 }
5458 if let Some(queues) = dto.queue_configurations.as_ref() {
5459 for (idx, q) in queues.iter().enumerate() {
5460 let events = events_from_dto(&q.events);
5461 let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
5462 rules.push(crate::notifications::NotificationRule {
5463 id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
5464 events,
5465 destination: crate::notifications::Destination::Sqs {
5466 queue_arn: q.queue_arn.clone(),
5467 },
5468 filter_prefix: prefix,
5469 filter_suffix: suffix,
5470 });
5471 }
5472 }
5473 crate::notifications::NotificationConfig { rules }
5474}
5475
5476fn notif_to_dto(
5477 cfg: &crate::notifications::NotificationConfig,
5478) -> NotificationConfiguration {
5479 let mut topics: Vec<TopicConfiguration> = Vec::new();
5480 let mut queues: Vec<QueueConfiguration> = Vec::new();
5481 for rule in &cfg.rules {
5482 let events: Vec<Event> = rule
5483 .events
5484 .iter()
5485 .map(|e| Event::from(e.as_aws_str().to_owned()))
5486 .collect();
5487 let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
5488 match &rule.destination {
5489 crate::notifications::Destination::Sns { topic_arn } => {
5490 topics.push(TopicConfiguration {
5491 events,
5492 filter,
5493 id: Some(rule.id.clone()),
5494 topic_arn: topic_arn.clone(),
5495 });
5496 }
5497 crate::notifications::Destination::Sqs { queue_arn } => {
5498 queues.push(QueueConfiguration {
5499 events,
5500 filter,
5501 id: Some(rule.id.clone()),
5502 queue_arn: queue_arn.clone(),
5503 });
5504 }
5505 crate::notifications::Destination::Webhook { .. } => {}
5510 }
5511 }
5512 NotificationConfiguration {
5513 event_bridge_configuration: None,
5514 lambda_function_configurations: None,
5515 queue_configurations: if queues.is_empty() { None } else { Some(queues) },
5516 topic_configurations: if topics.is_empty() { None } else { Some(topics) },
5517 }
5518}
5519
5520fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
5521 events
5522 .iter()
5523 .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
5524 .collect()
5525}
5526
5527fn filter_from_dto(
5528 f: Option<&NotificationConfigurationFilter>,
5529) -> (Option<String>, Option<String>) {
5530 let Some(f) = f else {
5531 return (None, None);
5532 };
5533 let Some(key) = f.key.as_ref() else {
5534 return (None, None);
5535 };
5536 let Some(rules) = key.filter_rules.as_ref() else {
5537 return (None, None);
5538 };
5539 let mut prefix = None;
5540 let mut suffix = None;
5541 for r in rules {
5542 let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
5543 let value = r.value.clone();
5544 match name.as_deref() {
5545 Some("prefix") => prefix = value,
5546 Some("suffix") => suffix = value,
5547 _ => {}
5548 }
5549 }
5550 (prefix, suffix)
5551}
5552
5553fn filter_to_dto(
5554 prefix: Option<&str>,
5555 suffix: Option<&str>,
5556) -> Option<NotificationConfigurationFilter> {
5557 if prefix.is_none() && suffix.is_none() {
5558 return None;
5559 }
5560 let mut rules: Vec<FilterRule> = Vec::new();
5561 if let Some(p) = prefix {
5562 rules.push(FilterRule {
5563 name: Some(FilterRuleName::from("prefix".to_owned())),
5564 value: Some(p.to_owned()),
5565 });
5566 }
5567 if let Some(s) = suffix {
5568 rules.push(FilterRule {
5569 name: Some(FilterRuleName::from("suffix".to_owned())),
5570 value: Some(s.to_owned()),
5571 });
5572 }
5573 Some(NotificationConfigurationFilter {
5574 key: Some(S3KeyFilter {
5575 filter_rules: Some(rules),
5576 }),
5577 })
5578}
5579
5580fn replication_from_dto(
5593 dto: &ReplicationConfiguration,
5594) -> crate::replication::ReplicationConfig {
5595 let rules = dto
5596 .rules
5597 .iter()
5598 .enumerate()
5599 .map(|(idx, r)| {
5600 let id = r
5601 .id
5602 .as_ref()
5603 .map(|s| s.as_str().to_owned())
5604 .unwrap_or_else(|| format!("rule-{idx}"));
5605 let priority = r.priority.unwrap_or(0).max(0) as u32;
5606 let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
5607 let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
5608 let destination_bucket = r.destination.bucket.clone();
5609 let destination_storage_class = r
5610 .destination
5611 .storage_class
5612 .as_ref()
5613 .map(|s| s.as_str().to_owned());
5614 crate::replication::ReplicationRule {
5615 id,
5616 priority,
5617 status_enabled,
5618 filter,
5619 destination_bucket,
5620 destination_storage_class,
5621 }
5622 })
5623 .collect();
5624 crate::replication::ReplicationConfig {
5625 role: dto.role.clone(),
5626 rules,
5627 }
5628}
5629
5630fn replication_to_dto(
5631 cfg: &crate::replication::ReplicationConfig,
5632) -> ReplicationConfiguration {
5633 let rules = cfg
5634 .rules
5635 .iter()
5636 .map(|r| {
5637 let status = if r.status_enabled {
5638 ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
5639 } else {
5640 ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
5641 };
5642 let destination = Destination {
5643 access_control_translation: None,
5644 account: None,
5645 bucket: r.destination_bucket.clone(),
5646 encryption_configuration: None,
5647 metrics: None,
5648 replication_time: None,
5649 storage_class: r
5650 .destination_storage_class
5651 .as_ref()
5652 .map(|s| StorageClass::from(s.clone())),
5653 };
5654 let filter = Some(replication_filter_to_dto(&r.filter));
5655 ReplicationRule {
5656 delete_marker_replication: None,
5657 destination,
5658 existing_object_replication: None,
5659 filter,
5660 id: Some(r.id.clone()),
5661 prefix: None,
5662 priority: Some(r.priority as i32),
5663 source_selection_criteria: None,
5664 status,
5665 }
5666 })
5667 .collect();
5668 ReplicationConfiguration {
5669 role: cfg.role.clone(),
5670 rules,
5671 }
5672}
5673
5674fn replication_filter_from_dto(
5675 f: Option<&ReplicationRuleFilter>,
5676 rule_level_prefix: Option<&str>,
5677) -> crate::replication::ReplicationFilter {
5678 let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
5679 let mut tags: Vec<(String, String)> = Vec::new();
5680 if let Some(f) = f {
5681 if let Some(p) = f.prefix.as_ref()
5682 && prefix.is_none()
5683 {
5684 prefix = Some(p.clone());
5685 }
5686 if let Some(t) = f.tag.as_ref()
5687 && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5688 {
5689 tags.push((k.clone(), v.clone()));
5690 }
5691 if let Some(and) = f.and.as_ref() {
5692 if let Some(p) = and.prefix.as_ref()
5693 && prefix.is_none()
5694 {
5695 prefix = Some(p.clone());
5696 }
5697 if let Some(ts) = and.tags.as_ref() {
5698 for t in ts {
5699 if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5700 tags.push((k.clone(), v.clone()));
5701 }
5702 }
5703 }
5704 }
5705 }
5706 crate::replication::ReplicationFilter { prefix, tags }
5707}
5708
5709fn replication_filter_to_dto(
5710 f: &crate::replication::ReplicationFilter,
5711) -> ReplicationRuleFilter {
5712 if f.tags.is_empty() {
5713 ReplicationRuleFilter {
5714 and: None,
5715 prefix: f.prefix.clone(),
5716 tag: None,
5717 }
5718 } else if f.tags.len() == 1 && f.prefix.is_none() {
5719 let (k, v) = &f.tags[0];
5720 ReplicationRuleFilter {
5721 and: None,
5722 prefix: None,
5723 tag: Some(Tag {
5724 key: Some(k.clone()),
5725 value: Some(v.clone()),
5726 }),
5727 }
5728 } else {
5729 let tags: Vec<Tag> = f
5730 .tags
5731 .iter()
5732 .map(|(k, v)| Tag {
5733 key: Some(k.clone()),
5734 value: Some(v.clone()),
5735 })
5736 .collect();
5737 ReplicationRuleFilter {
5738 and: Some(ReplicationRuleAndOperator {
5739 prefix: f.prefix.clone(),
5740 tags: Some(tags),
5741 }),
5742 prefix: None,
5743 tag: None,
5744 }
5745 }
5746}
5747
5748fn dto_lifecycle_to_internal(
5762 dto: &BucketLifecycleConfiguration,
5763) -> crate::lifecycle::LifecycleConfig {
5764 crate::lifecycle::LifecycleConfig {
5765 rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
5766 }
5767}
5768
/// Convert a single AWS lifecycle rule DTO into the internal rule model.
///
/// Day counts are kept only when they fit in a `u32`; negative or oversized
/// values become `None`. Transitions missing either a day count or a storage
/// class are dropped entirely.
fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
    // A missing filter falls back to the internal default filter.
    let filter = rule
        .filter
        .as_ref()
        .map(dto_filter_to_internal)
        .unwrap_or_default();
    let expiration_days = rule
        .expiration
        .as_ref()
        .and_then(|e| e.days)
        .and_then(|d| u32::try_from(d).ok());
    let expiration_date = rule
        .expiration
        .as_ref()
        .and_then(|e| e.date.as_ref())
        .and_then(timestamp_to_chrono_utc);
    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
        .transitions
        .as_ref()
        .map(|ts| {
            ts.iter()
                .filter_map(|t| {
                    // `?` skips transitions without days or a storage class.
                    let days = u32::try_from(t.days?).ok()?;
                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
                    Some(crate::lifecycle::TransitionRule {
                        days,
                        storage_class,
                    })
                })
                .collect()
        })
        .unwrap_or_default();
    let noncurrent_version_expiration_days = rule
        .noncurrent_version_expiration
        .as_ref()
        .and_then(|n| n.noncurrent_days)
        .and_then(|d| u32::try_from(d).ok());
    let abort_incomplete_multipart_upload_days = rule
        .abort_incomplete_multipart_upload
        .as_ref()
        .and_then(|a| a.days_after_initiation)
        .and_then(|d| u32::try_from(d).ok());
    crate::lifecycle::LifecycleRule {
        // An absent rule id is stored as the empty string internally.
        id: rule.id.clone().unwrap_or_default(),
        status,
        filter,
        expiration_days,
        expiration_date,
        transitions,
        noncurrent_version_expiration_days,
        abort_incomplete_multipart_upload_days,
    }
}
5823
5824fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
5825 let mut prefix = filter.prefix.clone();
5826 let mut tags: Vec<(String, String)> = Vec::new();
5827 let mut size_gt: Option<u64> = filter
5828 .object_size_greater_than
5829 .and_then(|n| u64::try_from(n).ok());
5830 let mut size_lt: Option<u64> = filter
5831 .object_size_less_than
5832 .and_then(|n| u64::try_from(n).ok());
5833 if let Some(t) = &filter.tag
5834 && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5835 {
5836 tags.push((k.clone(), v.clone()));
5837 }
5838 if let Some(and) = &filter.and {
5839 if prefix.is_none() {
5840 prefix = and.prefix.clone();
5841 }
5842 if size_gt.is_none() {
5843 size_gt = and
5844 .object_size_greater_than
5845 .and_then(|n| u64::try_from(n).ok());
5846 }
5847 if size_lt.is_none() {
5848 size_lt = and
5849 .object_size_less_than
5850 .and_then(|n| u64::try_from(n).ok());
5851 }
5852 if let Some(ts) = &and.tags {
5853 for t in ts {
5854 if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5855 tags.push((k.clone(), v.clone()));
5856 }
5857 }
5858 }
5859 }
5860 crate::lifecycle::LifecycleFilter {
5861 prefix,
5862 tags,
5863 object_size_greater_than: size_gt,
5864 object_size_less_than: size_lt,
5865 }
5866}
5867
/// Convert an internal lifecycle rule back into the AWS DTO shape.
///
/// The filter is emitted in the most compact AWS encoding that can express
/// it: a bare prefix, a bare tag, or an `And` operator when multiple
/// predicates or size bounds must be combined.
fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
    // Emit an Expiration element only if either days or a date is set.
    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
        Some(LifecycleExpiration {
            date: rule.expiration_date.map(chrono_utc_to_timestamp),
            days: rule.expiration_days.map(|d| d as i32),
            expired_object_delete_marker: None,
        })
    } else {
        None
    };
    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
        None
    } else {
        Some(
            rule.transitions
                .iter()
                .map(|t| Transition {
                    date: None,
                    days: Some(t.days as i32),
                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
                })
                .collect(),
        )
    };
    let noncurrent_version_expiration =
        rule.noncurrent_version_expiration_days
            .map(|d| NoncurrentVersionExpiration {
                newer_noncurrent_versions: None,
                noncurrent_days: Some(d as i32),
            });
    let abort_incomplete_multipart_upload =
        rule.abort_incomplete_multipart_upload_days
            .map(|d| AbortIncompleteMultipartUpload {
                days_after_initiation: Some(d as i32),
            });
    // Choose the narrowest filter encoding: prefix-only, single-tag-only,
    // otherwise the And operator carrying everything at once.
    let filter = if rule.filter.tags.is_empty()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        // No tags/sizes: a filter is emitted only when a prefix exists.
        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: Some(p.clone()),
            tag: None,
        })
    } else if rule.filter.tags.len() == 1
        && rule.filter.prefix.is_none()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        // Exactly one tag and nothing else: single-tag form.
        let (k, v) = rule.filter.tags[0].clone();
        Some(LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: Some(Tag {
                key: Some(k),
                value: Some(v),
            }),
        })
    } else {
        let tags = if rule.filter.tags.is_empty() {
            None
        } else {
            Some(
                rule.filter
                    .tags
                    .iter()
                    .map(|(k, v)| Tag {
                        key: Some(k.clone()),
                        value: Some(v.clone()),
                    })
                    .collect(),
            )
        };
        Some(LifecycleRuleFilter {
            and: Some(LifecycleRuleAndOperator {
                // Sizes that do not fit in i64 are silently dropped.
                object_size_greater_than: rule
                    .filter
                    .object_size_greater_than
                    .and_then(|n| i64::try_from(n).ok()),
                object_size_less_than: rule
                    .filter
                    .object_size_less_than
                    .and_then(|n| i64::try_from(n).ok()),
                prefix: rule.filter.prefix.clone(),
                tags,
            }),
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: None,
        })
    };
    LifecycleRule {
        abort_incomplete_multipart_upload,
        expiration,
        filter,
        // Empty internal ids are omitted rather than serialized as "".
        id: if rule.id.is_empty() {
            None
        } else {
            Some(rule.id.clone())
        },
        noncurrent_version_expiration,
        noncurrent_version_transitions: None,
        prefix: None,
        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
        transitions,
    }
}
5980
/// Pre-routing verification gate for SigV4a-signed requests.
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    // Shared credential store, looked up by access-key-id during pre_route.
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}
6015
impl SigV4aGate {
    /// Build a gate over a shared SigV4a credential store.
    #[must_use]
    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
        Self { store }
    }

    /// Verify a request's SigV4a signature before routing.
    ///
    /// Requests that are not SigV4a-signed (per `crate::sigv4a::detect`) pass
    /// through with `Ok(())`. For SigV4a requests: the `Authorization` header
    /// is parsed, the signing key is resolved from the store by access-key-id,
    /// and the DER-encoded signature is verified over
    /// `canonical_request_bytes`, with `requested_region` checked against the
    /// request's region-set header (absent header defaults to `"*"`).
    ///
    /// # Errors
    /// Returns a [`SigV4aGateError`] if the `Authorization` header is missing
    /// or malformed, the access key is unknown, or verification fails.
    pub fn pre_route<B>(
        &self,
        req: &http::Request<B>,
        requested_region: &str,
        canonical_request_bytes: &[u8],
    ) -> Result<(), SigV4aGateError> {
        // Non-SigV4a requests are not this gate's concern.
        if !crate::sigv4a::detect(req) {
            return Ok(());
        }
        let auth_hdr = req
            .headers()
            .get(http::header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .ok_or(SigV4aGateError::MissingAuthorization)?;
        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
            .ok_or(SigV4aGateError::MalformedAuthorization)?;
        // Missing region-set header is treated as the wildcard "*".
        let region_set = req
            .headers()
            .get(crate::sigv4a::REGION_SET_HEADER)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("*");
        let key = self
            .store
            .get(&parsed.access_key_id)
            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
        crate::sigv4a::verify(
            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
            &parsed.signature_der,
            key,
            region_set,
            requested_region,
        )
        .map_err(SigV4aGateError::Verify)?;
        Ok(())
    }
}
6073
/// Failure modes of [`SigV4aGate::pre_route`].
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    /// The request is SigV4a-signed but carries no `Authorization` header.
    #[error("missing Authorization header")]
    MissingAuthorization,
    /// The `Authorization` header could not be parsed as SigV4a.
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    /// No credential entry exists for the presented access-key-id.
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    /// The signature verification step itself failed.
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}
6088
6089impl SigV4aGateError {
6090 #[must_use]
6092 pub fn s3_error_code(&self) -> &'static str {
6093 match self {
6094 Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
6095 _ => "SignatureDoesNotMatch",
6096 }
6097 }
6098}
6099
#[cfg(test)]
mod tests {
    use super::*;

    // A manifest written into object metadata must read back unchanged.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    // Absent metadata must not produce a manifest.
    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    // Metadata carrying only the codec key (no sizes/crc) must not parse.
    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }

    // "bytes=10-20" parses into an inclusive Int range.
    #[test]
    fn parse_copy_source_range_basic() {
        let r = parse_copy_source_range("bytes=10-20").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    // An inverted range (last < first) is rejected with a descriptive error.
    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    // The "bytes=" prefix is mandatory for copy-source ranges.
    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    // Open-ended ranges ("bytes=10-" / "bytes=-10") are rejected for copies.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        assert!(parse_copy_source_range("bytes=10-").is_err());
        assert!(parse_copy_source_range("bytes=-10").is_err());
    }

    // Plain ASCII bucket/key pass through unencoded.
    #[test]
    fn safe_object_uri_basic_ascii() {
        let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
        assert_eq!(uri.path(), "/bucket/key");
    }

    // Spaces must be percent-encoded so the URI parses.
    #[test]
    fn safe_object_uri_encodes_spaces() {
        let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
        assert!(
            uri.path().contains("%20"),
            "expected percent-encoded space, got {}",
            uri.path()
        );
        assert!(uri.path().starts_with("/bucket/"));
    }

    // '/' is not in URI_KEY_ENCODE_SET, so key separators survive intact.
    #[test]
    fn safe_object_uri_preserves_slashes() {
        let uri =
            safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
        assert_eq!(uri.path(), "/bucket/key/with/slashes");
    }

    // The following tests only assert "no panic"; the Result is ignored.
    #[test]
    fn safe_object_uri_handles_newline_without_panic() {
        let _ = safe_object_uri("bucket", "key\n");
    }

    #[test]
    fn safe_object_uri_handles_null_byte_without_panic() {
        let _ = safe_object_uri("bucket", "key\0bad");
    }

    #[test]
    fn safe_object_uri_handles_unicode_without_panic() {
        let _ = safe_object_uri("bucket", "rtl\u{202E}override");
        let _ = safe_object_uri("bucket", "\u{FEFF}bom-key");
        let _ = safe_object_uri("bucket", "日本語キー");
    }

    // Exhaustive single-byte sweep: lossy UTF-8 conversion plus encoding
    // must never panic for any byte value.
    #[test]
    fn safe_object_uri_no_panic_for_every_byte() {
        for b in 0u8..=255 {
            let s = String::from_utf8_lossy(&[b]).into_owned();
            let _ = safe_object_uri("bucket", &s);
        }
    }

    // Generate a DEK from a local KMS and check its basic shape; the
    // Zeroizing wrapper holds a copy within this function's scope.
    #[tokio::test]
    async fn kms_dek_lifetime_within_function_scope() {
        use crate::kms::{KmsBackend, LocalKms};
        use std::collections::HashMap;
        use std::path::PathBuf;
        use zeroize::Zeroizing;

        let mut keks = HashMap::new();
        keks.insert("scope".to_string(), [33u8; 32]);
        let kms = LocalKms::from_keks(PathBuf::from("/tmp/kms-scope-test"), keks);

        // The generated DEK is expected to be exactly 32 bytes.
        let (dek, wrapped) = kms.generate_dek("scope").await.unwrap();
        assert_eq!(dek.len(), 32);
        let mut dek_arr: Zeroizing<[u8; 32]> = Zeroizing::new([0u8; 32]);
        dek_arr.copy_from_slice(&dek);

        // Borrowing the zeroizing buffer observes the same bytes.
        let dek_ref: &[u8; 32] = &dek_arr;
        assert_eq!(dek_ref, &*dek_arr);
        assert_eq!(wrapped.key_id, "scope");

    }
}