1use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40 write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry, CompressTelemetry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50 cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51 supports_streaming_compress, supports_streaming_decompress,
52};
53
/// Bytes peeked from the head of an incoming body (see `peek_sample` /
/// `collect_with_sample` in `crate::blob`) — presumably used to let the
/// dispatcher pick a codec; TODO confirm against the PUT path.
const SAMPLE_BYTES: usize = 4096;
56
/// Forward GPU compression telemetry into process metrics.
///
/// Timing/byte counters are recorded only when `gpu_seconds` is present
/// (i.e. the GPU path actually ran); a GPU out-of-memory event is recorded
/// independently when the telemetry flags one.
#[inline]
fn stamp_gpu_compress_telemetry(tel: &CompressTelemetry) {
    if let Some(secs) = tel.gpu_seconds {
        crate::metrics::record_gpu_compress(tel.codec, secs, tel.bytes_in, tel.bytes_out);
    }
    if tel.oom {
        crate::metrics::record_gpu_oom(tel.codec);
    }
}
72
/// Percent-encode set applied to bucket and key segments when S4 builds an
/// internal request URI (see [`safe_object_uri`]).
///
/// Starts from `CONTROLS` and additionally escapes characters that are
/// invalid or ambiguous in an HTTP path, including `%` itself so that
/// pre-encoded input is not double-interpreted.
const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'#')
    .add(b'<')
    .add(b'>')
    .add(b'?')
    .add(b'`')
    .add(b'{')
    .add(b'}')
    .add(b'|')
    .add(b'\\')
    .add(b'^')
    .add(b'[')
    .add(b']')
    .add(b'%');
96
97pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
109 use percent_encoding::utf8_percent_encode;
110 let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
111 let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
112 let raw = format!("/{bucket_enc}/{key_enc}");
113 raw.parse::<http::Uri>().map_err(|e| {
114 let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
123 .unwrap_or(S3ErrorCode::InvalidArgument);
124 S3Error::with_message(
125 code,
126 format!("object key cannot be encoded as a request URI: {e}"),
127 )
128 })
129}
130
/// Request details captured *before* dispatch so an access-log entry can be
/// emitted after the response status/size are known.
struct AccessLogPreamble {
    /// First hop of `x-forwarded-for`, if present and valid UTF-8.
    remote_ip: Option<String>,
    /// Access key of the authenticated principal, if any.
    requester: Option<String>,
    /// `"<METHOD> <path>"` of the incoming request.
    request_uri: String,
    /// `user-agent` header, if present and valid UTF-8.
    user_agent: Option<String>,
}
140
/// S3 gateway that wraps an inner [`S3`] backend, transparently
/// compressing/decompressing object bodies and layering optional features
/// (policy, SSE, versioning, replication, …) enabled via `with_*` builders.
pub struct S4Service<B: S3> {
    /// Inner object store every request is ultimately forwarded to.
    backend: Arc<B>,
    /// Codec implementations used to (de)compress chunk payloads.
    registry: Arc<CodecRegistry>,
    /// Chooses a codec per object (dispatch policy lives in `s4_codec`).
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Hard cap on bytes buffered from any single request/response body.
    max_body_bytes: usize,
    /// Optional bucket/object access policy engine.
    policy: Option<crate::policy::SharedPolicy>,
    /// Whether requests arrive over TLS (feeds policy conditions).
    secure_transport: bool,
    /// Optional per-principal/bucket rate limiting.
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    /// Optional S3-style access log sink.
    access_log: Option<crate::access_log::SharedAccessLog>,
    /// Optional server-side-encryption keyring.
    sse_keyring: Option<crate::sse::SharedSseKeyring>,
    /// Optional object versioning support.
    versioning: Option<Arc<crate::versioning::VersioningManager>>,
    /// Optional KMS backend for SSE-KMS.
    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
    /// Default KMS key used when a request asks for aws:kms without a key id.
    kms_default_key_id: Option<String>,
    /// Optional object-lock (retention/legal-hold) support.
    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
    /// Optional CORS rule evaluation (see `handle_preflight`).
    cors: Option<Arc<crate::cors::CorsManager>>,
    /// Optional bucket inventory support.
    inventory: Option<Arc<crate::inventory::InventoryManager>>,
    /// Optional event notification dispatch.
    notifications: Option<Arc<crate::notifications::NotificationManager>>,
    /// Optional lifecycle rule evaluation.
    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
    /// Optional object/bucket tagging store.
    tagging: Option<Arc<crate::tagging::TagManager>>,
    /// Optional cross-bucket replication.
    replication: Option<Arc<crate::replication::ReplicationManager>>,
    /// Optional MFA-delete enforcement.
    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
    /// Stricter compliance-mode behavior when set.
    compliance_strict: bool,
    /// Optional SigV4a request gate.
    sigv4a_gate: Option<Arc<SigV4aGate>>,
    /// In-flight multipart upload bookkeeping.
    multipart_state: Arc<crate::multipart_state::MultipartStateStore>,
    /// SSE chunking size in bytes; 0 by default (semantics defined where used).
    sse_chunk_size: usize,
}
313
314impl<B: S3> S4Service<B> {
    /// Default `max_body_bytes`: 5 GiB — the S3 single-PUT object size limit.
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

    /// Build a service around `backend` with every optional feature disabled;
    /// enable features via the `with_*` builder methods.
    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend: Arc::new(backend),
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
            secure_transport: false,
            rate_limits: None,
            access_log: None,
            sse_keyring: None,
            versioning: None,
            kms: None,
            kms_default_key_id: None,
            object_lock: None,
            cors: None,
            inventory: None,
            notifications: None,
            lifecycle: None,
            tagging: None,
            replication: None,
            mfa_delete: None,
            compliance_strict: false,
            sigv4a_gate: None,
            multipart_state: Arc::new(crate::multipart_state::MultipartStateStore::new()),
            sse_chunk_size: 0,
        }
    }
355
356 #[must_use]
    /// Install the SigV4a request gate.
    #[must_use]
    pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
        self.sigv4a_gate = Some(gate);
        self
    }

    /// Currently installed SigV4a gate, if any.
    #[must_use]
    pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
        self.sigv4a_gate.as_ref()
    }

    /// Enable object/bucket tagging support.
    #[must_use]
    pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
        self.tagging = Some(mgr);
        self
    }

    /// Currently installed tag manager, if any.
    #[must_use]
    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
        self.tagging.as_ref()
    }

    /// Enable bucket inventory support.
    #[must_use]
    pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
        self.inventory = Some(mgr);
        self
    }

    /// Currently installed inventory manager, if any.
    #[must_use]
    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
        self.inventory.as_ref()
    }

    /// Enable lifecycle rule evaluation.
    #[must_use]
    pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
        self.lifecycle = Some(mgr);
        self
    }

    /// Currently installed lifecycle manager, if any.
    #[must_use]
    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
        self.lifecycle.as_ref()
    }
448
449 #[must_use]
458 pub fn run_lifecycle_once_for_test(
459 &self,
460 bucket: &str,
461 objects: &[crate::lifecycle::EvaluateBatchEntry],
462 ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
463 let Some(mgr) = self.lifecycle.as_ref() else {
464 return Vec::new();
465 };
466 crate::lifecycle::evaluate_batch(mgr, bucket, objects)
467 }
468
469 #[must_use]
    /// Enable event notification dispatch.
    #[must_use]
    pub fn with_notifications(
        mut self,
        mgr: Arc<crate::notifications::NotificationManager>,
    ) -> Self {
        self.notifications = Some(mgr);
        self
    }

    /// Currently installed notification manager, if any.
    #[must_use]
    pub fn notifications_manager(
        &self,
    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
        self.notifications.as_ref()
    }
496
    /// Fire-and-forget a delete-style notification for `bucket`/`key`.
    ///
    /// No-op when notifications are disabled or no destination matches the
    /// event. Dispatch runs on a detached tokio task so the delete path is
    /// never blocked on notification delivery.
    fn fire_delete_notification(
        &self,
        bucket: &str,
        key: &str,
        event: crate::notifications::EventType,
        version_id: Option<String>,
    ) {
        let Some(mgr) = self.notifications.as_ref() else {
            return;
        };
        let dests = mgr.match_destinations(bucket, &event, key);
        if dests.is_empty() {
            return;
        }
        tokio::spawn(crate::notifications::dispatch_event(
            Arc::clone(mgr),
            bucket.to_owned(),
            key.to_owned(),
            event,
            None,
            None,
            version_id,
            // Unique request id for this notification batch.
            format!("S4-{}", uuid::Uuid::new_v4()),
        ));
    }
526
527 #[must_use]
    /// Enable cross-bucket replication.
    #[must_use]
    pub fn with_replication(
        mut self,
        mgr: Arc<crate::replication::ReplicationManager>,
    ) -> Self {
        self.replication = Some(mgr);
        self
    }

    /// Currently installed replication manager, if any.
    #[must_use]
    pub fn replication_manager(
        &self,
    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
        self.replication.as_ref()
    }
556
    /// After a successful PUT, asynchronously replicate the object if a
    /// replication rule matches its bucket/key/tags.
    ///
    /// No-op unless the backend PUT succeeded (`backend_ok`) and a rule
    /// matches. The status is marked `Pending` *before* the task is spawned
    /// so observers never see a matched object without a status. Replication
    /// itself runs on a detached tokio task using cloned request data.
    fn spawn_replication_if_matched(
        &self,
        source_bucket: &str,
        source_key: &str,
        request_tags: &Option<crate::tagging::TagSet>,
        body: &bytes::Bytes,
        metadata: &Option<std::collections::HashMap<String, String>>,
        backend_ok: bool,
    ) where
        B: Send + Sync + 'static,
    {
        if !backend_ok {
            return;
        }
        let Some(mgr) = self.replication.as_ref() else {
            return;
        };
        // Rule matching works on flat (key, value) pairs.
        let object_tags: Vec<(String, String)> = request_tags
            .as_ref()
            .map(|ts| ts.iter().cloned().collect())
            .unwrap_or_default();
        let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
            return;
        };
        mgr.record_status(
            source_bucket,
            source_key,
            crate::replication::ReplicationStatus::Pending,
        );
        // Clone everything the detached task needs ('static bound).
        let mgr_cl = Arc::clone(mgr);
        let backend = Arc::clone(&self.backend);
        let body_cl = body.clone();
        let metadata_cl = metadata.clone();
        let source_bucket_cl = source_bucket.to_owned();
        let source_key_cl = source_key.to_owned();
        tokio::spawn(async move {
            // Closure handed to replicate_object: PUTs one copy into the
            // destination via the same backend, with a synthetic request.
            let do_put = move |dest_bucket: String,
                               dest_key: String,
                               dest_body: bytes::Bytes,
                               dest_meta: Option<std::collections::HashMap<String, String>>| {
                let backend = Arc::clone(&backend);
                async move {
                    let req = S3Request {
                        input: PutObjectInput {
                            bucket: dest_bucket,
                            key: dest_key,
                            body: Some(bytes_to_blob(dest_body)),
                            metadata: dest_meta,
                            ..Default::default()
                        },
                        method: http::Method::PUT,
                        uri: "/".parse().unwrap(),
                        headers: http::HeaderMap::new(),
                        extensions: http::Extensions::new(),
                        credentials: None,
                        region: None,
                        service: None,
                        trailing_headers: None,
                    };
                    backend
                        .put_object(req)
                        .await
                        .map(|_| ())
                        .map_err(|e| format!("destination put_object: {e}"))
                }
            };
            crate::replication::replicate_object(
                rule,
                source_bucket_cl,
                source_key_cl,
                body_cl,
                metadata_cl,
                do_put,
                mgr_cl,
            )
            .await;
        });
    }
653
654 #[must_use]
    /// Enable MFA-delete enforcement.
    #[must_use]
    pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
        self.mfa_delete = Some(mgr);
        self
    }

    /// Currently installed MFA-delete manager, if any.
    #[must_use]
    pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
        self.mfa_delete.as_ref()
    }

    /// Enable CORS rule evaluation.
    #[must_use]
    pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
        self.cors = Some(mgr);
        self
    }

    /// Currently installed CORS manager, if any.
    #[must_use]
    pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
        self.cors.as_ref()
    }
690
691 #[must_use]
    /// Evaluate a CORS preflight (`OPTIONS`) request against the bucket's
    /// configured rules.
    ///
    /// Returns `None` when CORS is disabled or no rule matches (caller
    /// should reject the preflight); otherwise the response headers to emit.
    #[must_use]
    pub fn handle_preflight(
        &self,
        bucket: &str,
        origin: &str,
        method: &str,
        request_headers: &[String],
    ) -> Option<std::collections::HashMap<String, String>> {
        let mgr = self.cors.as_ref()?;
        let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
        let mut h = std::collections::HashMap::new();
        // Wildcard rules answer "*"; otherwise echo the caller's origin.
        let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
            "*".to_string()
        } else {
            origin.to_string()
        };
        h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
        h.insert(
            "Access-Control-Allow-Methods".to_string(),
            rule.allowed_methods.join(", "),
        );
        if !rule.allowed_headers.is_empty() {
            h.insert(
                "Access-Control-Allow-Headers".to_string(),
                rule.allowed_headers.join(", "),
            );
        }
        if let Some(secs) = rule.max_age_seconds {
            h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
        }
        if !rule.expose_headers.is_empty() {
            h.insert(
                "Access-Control-Expose-Headers".to_string(),
                rule.expose_headers.join(", "),
            );
        }
        Some(h)
    }
752
753 #[must_use]
    /// Toggle strict compliance-mode behavior.
    #[must_use]
    pub fn with_compliance_strict(mut self, on: bool) -> Self {
        self.compliance_strict = on;
        self
    }

    /// Enable object-lock (retention / legal hold) support.
    #[must_use]
    pub fn with_object_lock(
        mut self,
        mgr: Arc<crate::object_lock::ObjectLockManager>,
    ) -> Self {
        self.object_lock = Some(mgr);
        self
    }

    /// Currently installed object-lock manager, if any.
    #[must_use]
    pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
        self.object_lock.as_ref()
    }

    /// Install a KMS backend for SSE-KMS, with an optional default key used
    /// when a request asks for aws:kms without naming a key.
    #[must_use]
    pub fn with_kms_backend(
        mut self,
        kms: Arc<dyn crate::kms::KmsBackend>,
        default_key_id: Option<String>,
    ) -> Self {
        self.kms = Some(kms);
        self.kms_default_key_id = default_key_id;
        self
    }

    /// Enable object versioning.
    #[must_use]
    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
        self.versioning = Some(mgr);
        self
    }

    /// Convenience: install a single SSE key as a one-entry keyring (id 1).
    #[must_use]
    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
        let keyring = crate::sse::SseKeyring::new(1, key);
        self.sse_keyring = Some(std::sync::Arc::new(keyring));
        self
    }

    /// Install a full SSE keyring.
    #[must_use]
    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
        self.sse_keyring = Some(keyring);
        self
    }

    /// Set the SSE chunk size in bytes (0 leaves the default behavior).
    #[must_use]
    pub fn with_sse_chunk_size(mut self, bytes: usize) -> Self {
        self.sse_chunk_size = bytes;
        self
    }

    /// Install an access-log sink.
    #[must_use]
    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
        self.access_log = Some(log);
        self
    }
877
878 fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
883 self.access_log.as_ref()?;
884 Some(AccessLogPreamble {
885 remote_ip: req
886 .headers
887 .get("x-forwarded-for")
888 .and_then(|v| v.to_str().ok())
889 .and_then(|raw| raw.split(',').next())
890 .map(|s| s.trim().to_owned()),
891 requester: Self::principal_of(req).map(str::to_owned),
892 request_uri: format!("{} {}", req.method, req.uri.path()),
893 user_agent: req
894 .headers
895 .get("user-agent")
896 .and_then(|v| v.to_str().ok())
897 .map(str::to_owned),
898 })
899 }
900
901 #[allow(clippy::too_many_arguments)]
    /// Emit one access-log entry combining the captured preamble with the
    /// response-side facts (status, sizes, latency, error code).
    ///
    /// No-op unless both a log sink is configured and a preamble was
    /// captured for this request.
    #[allow(clippy::too_many_arguments)]
    async fn record_access(
        &self,
        preamble: Option<AccessLogPreamble>,
        operation: &'static str,
        bucket: &str,
        key: Option<&str>,
        http_status: u16,
        bytes_sent: u64,
        object_size: u64,
        total_time_ms: u64,
        error_code: Option<&str>,
    ) {
        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
            return;
        };
        log.record(crate::access_log::AccessLogEntry {
            time: std::time::SystemTime::now(),
            bucket: bucket.to_owned(),
            remote_ip: p.remote_ip,
            requester: p.requester,
            operation,
            key: key.map(str::to_owned),
            request_uri: p.request_uri,
            http_status,
            error_code: error_code.map(str::to_owned),
            bytes_sent,
            object_size,
            total_time_ms,
            user_agent: p.user_agent,
        })
        .await;
    }
937
938 #[must_use]
    /// Enable per-principal/bucket rate limiting.
    #[must_use]
    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
        self.rate_limits = Some(rl);
        self
    }

    /// Check the request against the configured rate limits.
    ///
    /// # Errors
    /// Returns `SlowDown` (the S3 throttle code) when the limiter rejects
    /// this principal/bucket pair; throttles are also counted in metrics.
    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
        let Some(rl) = self.rate_limits.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        if !rl.check(principal_id, bucket) {
            // "-" stands in for an anonymous principal in metrics labels.
            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
            return Err(S3Error::with_message(
                S3ErrorCode::SlowDown,
                format!("rate-limited: bucket={bucket}"),
            ));
        }
        Ok(())
    }
966
967 #[must_use]
    /// Declare whether requests arrive over TLS (feeds policy conditions).
    #[must_use]
    pub fn with_secure_transport(mut self, on: bool) -> Self {
        self.secure_transport = on;
        self
    }

    /// Override the per-body buffering cap (default 5 GiB).
    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }

    /// Install the access-policy engine.
    #[must_use]
    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
        self.policy = Some(policy);
        self
    }

    /// Principal identifier of a request: the credential's access key.
    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
        req.credentials.as_ref().map(|c| c.access_key.as_str())
    }
997
    /// Build the policy-evaluation context for a request: user agent,
    /// source IP (first `x-forwarded-for` hop, if parseable), current time,
    /// and the service's transport-security flag. Tag fields start empty
    /// and are filled in by `enforce_policy_with_extra` when needed.
    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
        let user_agent = req
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        let source_ip = req
            .headers
            .get("x-forwarded-for")
            .and_then(|v| v.to_str().ok())
            .and_then(|raw| raw.split(',').next())
            .and_then(|s| s.trim().parse().ok());
        crate::policy::RequestContext {
            source_ip,
            user_agent,
            request_time: Some(std::time::SystemTime::now()),
            secure_transport: self.secure_transport,
            existing_object_tags: None,
            request_object_tags: None,
            extra: Default::default(),
        }
    }
1028
    /// Enforce the configured policy for `action` without any tag context.
    /// Convenience wrapper over [`Self::enforce_policy_with_extra`].
    fn enforce_policy<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
    ) -> S3Result<()> {
        self.enforce_policy_with_extra(req, action, bucket, key, None, None)
    }

    /// Enforce the configured policy for `action`, optionally supplying the
    /// request's tag set and the object's existing tags as conditions.
    ///
    /// Always `Ok` when no policy is installed. Denials are counted in
    /// metrics and logged with full context before `AccessDenied` is
    /// returned.
    fn enforce_policy_with_extra<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
        request_tags: Option<&crate::tagging::TagSet>,
        existing_tags: Option<&crate::tagging::TagSet>,
    ) -> S3Result<()> {
        let Some(policy) = self.policy.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        let mut ctx = self.request_context(req);
        if let Some(t) = request_tags {
            ctx.request_object_tags = Some(t.clone());
        }
        if let Some(t) = existing_tags {
            ctx.existing_object_tags = Some(t.clone());
        }
        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
        if decision.allow {
            Ok(())
        } else {
            crate::metrics::record_policy_denial(action, bucket);
            tracing::info!(
                action,
                bucket,
                key = ?key,
                principal = ?principal_id,
                source_ip = ?ctx.source_ip,
                user_agent = ?ctx.user_agent,
                secure_transport = ctx.secure_transport,
                matched_sid = ?decision.matched_sid,
                effect = ?decision.matched_effect,
                "S4 policy denied request"
            );
            Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                format!("denied by S4 policy: {action} on bucket={bucket}"),
            ))
        }
    }
1092
    /// Unwrap the service and return the inner backend.
    ///
    /// # Panics
    /// Panics if the backend `Arc` is still shared — e.g. a replication
    /// task spawned by `spawn_replication_if_matched` has not finished.
    pub fn into_backend(self) -> B {
        Arc::try_unwrap(self.backend)
            .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
    }
1102
    /// Serve a client Range GET using a sidecar frame index: fetch only the
    /// compressed frames covering the range, decompress them, and slice out
    /// the exact client window.
    ///
    /// `plan` (from the sidecar index) gives the compressed byte span to
    /// fetch and where the client window falls inside the decompressed
    /// concatenation. Checksums/ETag on the response are cleared because
    /// they describe the stored (compressed, full) object, not this slice.
    ///
    /// # Errors
    /// Propagates backend errors, and returns `InternalError` on an empty
    /// backend body, frame parse failure, or decompression failure.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Fetch only the compressed frames that cover the client range.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Decompress every fetched frame into one contiguous buffer.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        // Trim to the exact client window within the decompressed frames.
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        // Stored checksums/ETag describe the full compressed object, not
        // this decompressed slice — drop them rather than mislead clients.
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
1211
    /// Best-effort: persist the frame index as a sidecar object next to
    /// `key` so later Range GETs can fetch only the needed frames.
    ///
    /// Failures are logged and swallowed — a missing sidecar only degrades
    /// Range GETs to a full-object read, it never fails the PUT.
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let sidecar = sidecar_key(key);
        let uri = match safe_object_uri(bucket, &sidecar) {
            Ok(u) => u,
            Err(e) => {
                tracing::warn!(
                    bucket,
                    key,
                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
                );
                return;
            }
        };
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }
1262
    /// Fetch and decode the frame-index sidecar for `key`, or `None` on any
    /// failure (missing sidecar, fetch error, oversized or corrupt index) —
    /// callers fall back to a full-object read.
    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let sidecar = sidecar_key(key);
        let uri = safe_object_uri(bucket, &sidecar).ok()?;
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        // 64 MiB cap: a sane upper bound for an index; larger is corrupt.
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }
1289
    /// Decode a framed multipart object: iterate its frames, decompress
    /// each payload per its header manifest, and concatenate the results
    /// into the original plaintext.
    ///
    /// # Errors
    /// `InternalError` on a malformed frame or a codec decompress failure.
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
1319}
1320
1321fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1326 let rest = s
1327 .strip_prefix("bytes=")
1328 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1329 let (a, b) = rest
1330 .split_once('-')
1331 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1332 let first: u64 = a
1333 .parse()
1334 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1335 let last: u64 = b
1336 .parse()
1337 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1338 if last < first {
1339 return Err(format!("CopySourceRange last < first: {s:?}"));
1340 }
1341 Ok(s3s::dto::Range::Int {
1342 first,
1343 last: Some(last),
1344 })
1345}
1346
/// Build the backend key under which a specific object version is stored.
/// Layout: `<key>.__s4ver__/<version_id>` — recognized again by
/// `is_versioning_shadow_key` so shadows can be hidden from listings.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    // ".__s4ver__/" is 11 bytes; preallocate the exact output size.
    let mut shadow = String::with_capacity(key.len() + 11 + version_id.len());
    shadow.push_str(key);
    shadow.push_str(".__s4ver__/");
    shadow.push_str(version_id);
    shadow
}
1363
/// True when `key` is an internal version-shadow key produced by
/// [`versioned_shadow_key`].
fn is_versioning_shadow_key(key: &str) -> bool {
    const SHADOW_MARKER: &str = ".__s4ver__/";
    key.contains(SHADOW_MARKER)
}
1369
/// Seconds since the Unix epoch; 0 if the system clock is somehow set
/// before 1970 (never panics).
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1381
/// Map an MFA validation failure onto the S3 error model: missing or wrong
/// credentials are `AccessDenied`; a malformed header is `InvalidRequest`.
fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
    match e {
        crate::mfa::MfaError::Missing => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "MFA token required for this operation",
        ),
        crate::mfa::MfaError::Malformed => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "malformed x-amz-mfa header",
        ),
        crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "MFA serial does not match configured device",
        ),
        crate::mfa::MfaError::InvalidCode => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "invalid MFA code",
        ),
    }
}
1409
/// True when the stored metadata marks the object as S4 multipart-framed
/// (i.e. its body is a sequence of frames rather than one chunk).
fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_MULTIPART))
        .map(|v| v == "true")
        .unwrap_or(false)
}

// S4 user-metadata keys describing how an object body was stored.
/// Codec used to compress the body (parseable as `CodecKind`).
const META_CODEC: &str = "s4-codec";
/// Uncompressed body size in bytes.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Compressed body size in bytes.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// CRC32C of the original (uncompressed) body.
const META_CRC32C: &str = "s4-crc32c";
/// "true" when the body is an S4 multipart frame sequence.
const META_MULTIPART: &str = "s4-multipart";
/// "true" when the body uses the v2 framed layout.
const META_FRAMED: &str = "s4-framed";
1429
/// True when the stored metadata marks the object as using the v2 framed
/// body layout (`s4-framed: true`).
fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_FRAMED))
        .map(|v| v == "true")
        .unwrap_or(false)
}

/// True when the stored metadata marks the object as SSE-encrypted with
/// AES-256-GCM (`s4-encrypted: aes-256-gcm`).
fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get("s4-encrypted"))
        .map(|v| v == "aes-256-gcm")
        .unwrap_or(false)
}
1446
/// Extract SSE-C key material from the three customer-key headers.
///
/// All three headers must be present together or all absent: absence means
/// "no SSE-C" (`Ok(None)`); a partial set is an `InvalidRequest` error, and
/// invalid header contents map through [`sse_c_error_to_s3`].
fn extract_sse_c_material(
    algorithm: &Option<String>,
    key: &Option<String>,
    md5: &Option<String>,
) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
    match (algorithm, key, md5) {
        (None, None, None) => Ok(None),
        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
            .map(Some)
            .map_err(sse_c_error_to_s3),
        _ => Err(S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
        )),
    }
}
1469
1470fn extract_kms_key_id(
1473 sse: &Option<ServerSideEncryption>,
1474 sse_kms_key_id: &Option<String>,
1475 gateway_default: Option<&str>,
1476) -> Option<String> {
1477 let asks_for_kms = sse
1478 .as_ref()
1479 .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1480 .unwrap_or(false);
1481 if !asks_for_kms {
1482 return None;
1483 }
1484 sse_kms_key_id
1485 .clone()
1486 .or_else(|| gateway_default.map(str::to_owned))
1487}
1488
/// Map a KMS failure onto the S3 error model: unknown key ids are client
/// errors, backend outages are `ServiceUnavailable`, everything else is an
/// internal error.
fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
    use crate::kms::KmsError as K;
    match e {
        K::KeyNotFound { key_id } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("KMS key not found: {key_id}"),
        ),
        K::BackendUnavailable { message } => S3Error::with_message(
            S3ErrorCode::ServiceUnavailable,
            format!("KMS backend unavailable: {message}"),
        ),
        other => S3Error::with_message(
            S3ErrorCode::InternalError,
            format!("KMS error: {other}"),
        ),
    }
}
1509
/// Map an SSE-C failure onto the S3 error model: a mismatched key is
/// `AccessDenied`, malformed/unsupported key material is a client error,
/// and anything unexpected is an internal error.
fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
    use crate::sse::SseError as E;
    match e {
        E::WrongCustomerKey => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "SSE-C key does not match the key used at PUT time",
        ),
        E::InvalidCustomerKey { reason } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("SSE-C: {reason}"),
        ),
        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
        ),
        E::CustomerKeyRequired => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
        ),
        E::CustomerKeyUnexpected => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
        ),
        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
    }
}
1539
/// Reconstruct a [`ChunkManifest`] from the `s4-*` user-metadata keys.
/// Returns `None` when any key is missing or unparseable (i.e. the object
/// was not written by S4, or its metadata was tampered with).
fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
    let m = metadata.as_ref()?;
    let codec = m
        .get(META_CODEC)
        .and_then(|s| s.parse::<CodecKind>().ok())?;
    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
    Some(ChunkManifest {
        codec,
        original_size,
        compressed_size,
        crc32c,
    })
}

/// Inverse of [`extract_manifest`]: record the manifest in the object's
/// user metadata, creating the metadata map if absent.
fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
    let meta = metadata.get_or_insert_with(Default::default);
    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
    meta.insert(
        META_ORIGINAL_SIZE.into(),
        manifest.original_size.to_string(),
    );
    meta.insert(
        META_COMPRESSED_SIZE.into(),
        manifest.compressed_size.to_string(),
    );
    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
}
1569
/// Build a `map_err` adapter that wraps any displayable error in an
/// `InternalError` with a fixed context prefix, e.g.
/// `.map_err(internal("collect partial body"))`.
fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}
1573
/// Map an S3 Select failure onto the S3 error model. All variants are
/// client-side `InvalidRequest`s; `fmt` names the input format (e.g. CSV,
/// JSON) for the format-error message.
fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
    use crate::select::SelectError;
    match e {
        SelectError::Parse(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("SQL parse error: {msg}"),
        ),
        SelectError::UnsupportedFeature(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("unsupported SQL feature: {msg}"),
        ),
        SelectError::RowEval(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("SQL row evaluation error: {msg}"),
        ),
        SelectError::InputFormat(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("{fmt} input format error: {msg}"),
        ),
    }
}
1600
1601fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1605 headers
1606 .get("x-amz-bypass-governance-retention")
1607 .and_then(|v| v.to_str().ok())
1608 .map(|s| s.eq_ignore_ascii_case("true"))
1609 .unwrap_or(false)
1610}
1611
/// Convert an s3s [`Timestamp`] to a chrono UTC datetime by round-tripping
/// through its RFC 3339 text form; `None` if formatting or parsing fails.
fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
    let mut buf = Vec::new();
    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
    let s = std::str::from_utf8(&buf).ok()?;
    chrono::DateTime::parse_from_rfc3339(s)
        .ok()
        .map(|dt| dt.with_timezone(&chrono::Utc))
}

/// Inverse of [`timestamp_to_chrono_utc`]: render the datetime as RFC 3339
/// with millisecond precision and reparse it as an s3s [`Timestamp`];
/// falls back to `Timestamp::default()` if the reparse fails.
fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
}
1636
/// Convert an internal tag set into the AWS wire representation
/// (`Vec<Tag>` with optional key/value fields).
fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
    set.iter()
        .map(|(k, v)| Tag {
            key: Some(k.clone()),
            value: Some(v.clone()),
        })
        .collect()
}

/// Convert AWS wire tags back into an internal tag set, treating missing
/// key/value fields as empty strings; validation (count/length limits)
/// happens in `TagSet::from_pairs`.
fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
    let pairs = tags
        .iter()
        .map(|t| {
            (
                t.key.clone().unwrap_or_default(),
                t.value.clone().unwrap_or_default(),
            )
        })
        .collect();
    crate::tagging::TagSet::from_pairs(pairs)
}
1666
1667pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1671 if total == 0 {
1672 return Err("cannot range-get zero-length object".into());
1673 }
1674 match range {
1675 s3s::dto::Range::Int { first, last } => {
1676 let start = *first;
1677 let end_inclusive = match last {
1678 Some(l) => (*l).min(total - 1),
1679 None => total - 1,
1680 };
1681 if start > end_inclusive || start >= total {
1682 return Err(format!(
1683 "range bytes={start}-{:?} out of object size {total}",
1684 last
1685 ));
1686 }
1687 Ok((start, end_inclusive + 1))
1688 }
1689 s3s::dto::Range::Suffix { length } => {
1690 let len = (*length).min(total);
1691 Ok((total - len, total))
1692 }
1693 }
1694}
1695
1696#[async_trait::async_trait]
1697impl<B: S3> S3 for S4Service<B> {
1698 #[tracing::instrument(
1700 name = "s4.put_object",
1701 skip(self, req),
1702 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1703 )]
1704 async fn put_object(
1705 &self,
1706 mut req: S3Request<PutObjectInput>,
1707 ) -> S3Result<S3Response<PutObjectOutput>> {
1708 let put_start = Instant::now();
1709 let put_bucket = req.input.bucket.clone();
1710 let put_key = req.input.key.clone();
1711 let access_preamble = self.access_log_preamble(&req);
1712 self.enforce_rate_limit(&req, &put_bucket)?;
1713 let request_tags: Option<crate::tagging::TagSet> = req
1719 .input
1720 .tagging
1721 .as_deref()
1722 .map(crate::tagging::parse_tagging_header)
1723 .transpose()
1724 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1725 let existing_tags: Option<crate::tagging::TagSet> = self
1726 .tagging
1727 .as_ref()
1728 .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1729 self.enforce_policy_with_extra(
1730 &req,
1731 "s3:PutObject",
1732 &put_bucket,
1733 Some(&put_key),
1734 request_tags.as_ref(),
1735 existing_tags.as_ref(),
1736 )?;
1737 if let Some(mgr) = self.object_lock.as_ref()
1745 && let Some(state) = mgr.get(&put_bucket, &put_key)
1746 {
1747 let bucket_versioned_enabled = self
1748 .versioning
1749 .as_ref()
1750 .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1751 .unwrap_or(false);
1752 if !bucket_versioned_enabled {
1753 let bypass = parse_bypass_governance_header(&req.headers);
1754 let now = chrono::Utc::now();
1755 if !state.can_delete(now, bypass) {
1756 crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1757 return Err(S3Error::with_message(
1758 S3ErrorCode::AccessDenied,
1759 "Access Denied because object protected by object lock",
1760 ));
1761 }
1762 }
1763 }
1764 let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1770 .input
1771 .object_lock_mode
1772 .as_ref()
1773 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1774 let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1775 .input
1776 .object_lock_retain_until_date
1777 .as_ref()
1778 .and_then(timestamp_to_chrono_utc);
1779 let explicit_legal_hold_on: Option<bool> = req
1780 .input
1781 .object_lock_legal_hold_status
1782 .as_ref()
1783 .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1784 if let Some(blob) = req.input.body.take() {
1785 let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1788 .await
1789 .map_err(internal("peek put sample"))?;
1790 let sample_len = sample.len().min(SAMPLE_BYTES);
1791 let total_size_hint = req.input.content_length.and_then(|n| u64::try_from(n).ok());
1795 let kind = self
1796 .dispatcher
1797 .pick_with_size_hint(&sample[..sample_len], total_size_hint)
1798 .await;
1799
1800 let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1806 let (compressed, manifest, is_framed) = if use_framed {
1807 let chained = chain_sample_with_rest(sample, rest_stream);
1809 debug!(
1810 bucket = ?req.input.bucket,
1811 key = ?req.input.key,
1812 codec = kind.as_str(),
1813 path = "streaming-framed",
1814 "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1815 );
1816 let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1820 let (body, manifest) = streaming_compress_to_frames(
1821 chained,
1822 Arc::clone(&self.registry),
1823 kind,
1824 chunk_size,
1825 )
1826 .await
1827 .map_err(internal("streaming framed compress"))?;
1828 (body, manifest, true)
1829 } else {
1830 let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1833 .await
1834 .map_err(internal("collect put body (buffered path)"))?;
1835 debug!(
1836 bucket = ?req.input.bucket,
1837 key = ?req.input.key,
1838 bytes = bytes.len(),
1839 codec = kind.as_str(),
1840 path = "buffered",
1841 "S4 put_object: compressing (buffered, raw blob)"
1842 );
1843 let (compress_res, tel) =
1849 self.registry.compress_with_telemetry(bytes, kind).await;
1850 stamp_gpu_compress_telemetry(&tel);
1851 let (body, m) = compress_res.map_err(internal("registry compress"))?;
1852 (body, m, false)
1853 };
1854
1855 write_manifest(&mut req.input.metadata, &manifest);
1856 if is_framed {
1857 req.input
1859 .metadata
1860 .get_or_insert_with(Default::default)
1861 .insert(META_FRAMED.into(), "true".into());
1862 }
1863 req.input.content_length = Some(compressed.len() as i64);
1867 req.input.checksum_algorithm = None;
1872 req.input.checksum_crc32 = None;
1873 req.input.checksum_crc32c = None;
1874 req.input.checksum_crc64nvme = None;
1875 req.input.checksum_sha1 = None;
1876 req.input.checksum_sha256 = None;
1877 req.input.content_md5 = None;
1878 let original_size = manifest.original_size;
1879 let compressed_size = manifest.compressed_size;
1880 let codec_label = manifest.codec.as_str();
1881 let sidecar_index = if is_framed {
1884 s4_codec::index::build_index_from_body(&compressed).ok()
1885 } else {
1886 None
1887 };
1888 let sse_c_alg = req.input.sse_customer_algorithm.take();
1905 let sse_c_key = req.input.sse_customer_key.take();
1906 let sse_c_md5 = req.input.sse_customer_key_md5.take();
1907 let sse_header = req.input.server_side_encryption.take();
1908 let sse_kms_key = req.input.ssekms_key_id.take();
1909 let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
1910 let kms_key_id = extract_kms_key_id(
1915 &sse_header,
1916 &sse_kms_key,
1917 self.kms_default_key_id.as_deref(),
1918 );
1919 if self.compliance_strict
1926 && sse_c_material.is_none()
1927 && kms_key_id.is_none()
1928 && self.sse_keyring.is_none()
1929 && sse_header.as_ref().map(|s| s.as_str())
1930 != Some(ServerSideEncryption::AES256)
1931 {
1932 return Err(S3Error::with_message(
1933 S3ErrorCode::InvalidRequest,
1934 "compliance-mode strict: PUT must include x-amz-server-side-encryption \
1935 (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
1936 ));
1937 }
1938 if sse_c_material.is_some() && kms_key_id.is_some() {
1941 return Err(S3Error::with_message(
1942 S3ErrorCode::InvalidArgument,
1943 "SSE-C and SSE-KMS cannot be used together on the same PUT",
1944 ));
1945 }
1946 let kms_wrap = if let Some(ref key_id) = kms_key_id {
1949 let kms = self.kms.as_ref().ok_or_else(|| {
1950 S3Error::with_message(
1951 S3ErrorCode::InvalidRequest,
1952 "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1953 )
1954 })?;
1955 let (dek, wrapped) = kms
1956 .generate_dek(key_id)
1957 .await
1958 .map_err(kms_error_to_s3)?;
1959 if dek.len() != 32 {
1960 return Err(S3Error::with_message(
1961 S3ErrorCode::InternalError,
1962 format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
1963 ));
1964 }
1965 let mut dek_arr = [0u8; 32];
1966 dek_arr.copy_from_slice(&dek);
1967 Some((dek_arr, wrapped))
1968 } else {
1969 None
1970 };
1971 let body_to_send = if let Some(ref m) = sse_c_material {
1978 let meta = req.input.metadata.get_or_insert_with(Default::default);
1979 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
1980 meta.insert("s4-sse-type".into(), "AES256".into());
1981 meta.insert("s4-sse-c-key-md5".into(),
1982 base64::engine::general_purpose::STANDARD.encode(m.key_md5));
1983 crate::sse::encrypt_with_source(
1984 &compressed,
1985 crate::sse::SseSource::CustomerKey {
1986 key: &m.key,
1987 key_md5: &m.key_md5,
1988 },
1989 )
1990 } else if let Some((ref dek, ref wrapped)) = kms_wrap {
1991 let meta = req.input.metadata.get_or_insert_with(Default::default);
1992 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
1993 meta.insert("s4-sse-type".into(), "aws:kms".into());
1994 meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
1995 crate::sse::encrypt_with_source(
1996 &compressed,
1997 crate::sse::SseSource::Kms { dek, wrapped },
1998 )
1999 } else if let Some(keyring) = self.sse_keyring.as_ref() {
2000 let meta = req.input.metadata.get_or_insert_with(Default::default);
2008 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2009 if self.sse_chunk_size > 0 {
2016 crate::sse::encrypt_v2_chunked(
2017 &compressed,
2018 keyring,
2019 self.sse_chunk_size,
2020 )
2021 .map_err(|e| {
2022 S3Error::with_message(
2023 S3ErrorCode::InternalError,
2024 format!("SSE-S4 chunked encrypt failed: {e}"),
2025 )
2026 })?
2027 } else {
2028 crate::sse::encrypt_v2(&compressed, keyring)
2029 }
2030 } else {
2031 compressed.clone()
2032 };
2033 let replication_body = body_to_send.clone();
2038 let replication_metadata = req.input.metadata.clone();
2039 req.input.content_length = Some(body_to_send.len() as i64);
2048 req.input.body = Some(bytes_to_blob(body_to_send));
2049 let pending_version: Option<crate::versioning::PutOutcome> = self
2058 .versioning
2059 .as_ref()
2060 .map(|mgr| mgr.state(&put_bucket))
2061 .map(|state| match state {
2062 crate::versioning::VersioningState::Enabled => {
2063 crate::versioning::PutOutcome {
2064 version_id: crate::versioning::VersioningManager::new_version_id(),
2065 versioned_response: true,
2066 }
2067 }
2068 crate::versioning::VersioningState::Suspended
2069 | crate::versioning::VersioningState::Unversioned => {
2070 crate::versioning::PutOutcome {
2071 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2072 versioned_response: false,
2073 }
2074 }
2075 });
2076 if let Some(ref pv) = pending_version
2077 && pv.versioned_response
2078 {
2079 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2080 }
2081 let mut backend_resp = self.backend.put_object(req).await;
2082 if let Some(idx) = sidecar_index
2083 && backend_resp.is_ok()
2084 && idx.entries.len() > 1
2085 {
2086 self.write_sidecar(&put_bucket, &put_key, &idx).await;
2092 }
2093 if let (Some(mgr), Some(pv), Ok(resp)) = (
2097 self.versioning.as_ref(),
2098 pending_version.as_ref(),
2099 backend_resp.as_mut(),
2100 ) {
2101 let etag = resp
2102 .output
2103 .e_tag
2104 .clone()
2105 .map(ETag::into_value)
2106 .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
2107 let now = chrono::Utc::now();
2108 mgr.commit_put_with_version(
2109 &put_bucket,
2110 &put_key,
2111 crate::versioning::VersionEntry {
2112 version_id: pv.version_id.clone(),
2113 etag,
2114 size: original_size,
2115 is_delete_marker: false,
2116 created_at: now,
2117 },
2118 );
2119 if pv.versioned_response {
2120 resp.output.version_id = Some(pv.version_id.clone());
2121 }
2122 }
2123 if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
2127 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2128 resp.output.sse_customer_key_md5 = Some(
2129 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2130 );
2131 }
2132 if let (Some((_, wrapped)), Ok(resp)) =
2136 (kms_wrap.as_ref(), backend_resp.as_mut())
2137 {
2138 resp.output.server_side_encryption =
2139 Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
2140 resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
2141 }
2142 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2148 if explicit_lock_mode.is_some()
2149 || explicit_retain_until.is_some()
2150 || explicit_legal_hold_on.is_some()
2151 {
2152 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2153 if let Some(m) = explicit_lock_mode {
2154 state.mode = Some(m);
2155 }
2156 if let Some(u) = explicit_retain_until {
2157 state.retain_until = Some(u);
2158 }
2159 if let Some(lh) = explicit_legal_hold_on {
2160 state.legal_hold_on = lh;
2161 }
2162 mgr.set(&put_bucket, &put_key, state);
2163 }
2164 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2165 }
2166 let _ = (original_size, compressed_size); let elapsed = put_start.elapsed();
2168 crate::metrics::record_put(
2169 codec_label,
2170 original_size,
2171 compressed_size,
2172 elapsed.as_secs_f64(),
2173 backend_resp.is_ok(),
2174 );
2175 self.record_access(
2177 access_preamble,
2178 "REST.PUT.OBJECT",
2179 &put_bucket,
2180 Some(&put_key),
2181 if backend_resp.is_ok() { 200 } else { 500 },
2182 compressed_size,
2183 original_size,
2184 elapsed.as_millis() as u64,
2185 backend_resp.as_ref().err().map(|e| e.code().as_str()),
2186 )
2187 .await;
2188 info!(
2189 op = "put_object",
2190 bucket = %put_bucket,
2191 key = %put_key,
2192 codec = codec_label,
2193 bytes_in = original_size,
2194 bytes_out = compressed_size,
2195 ratio = format!(
2196 "{:.3}",
2197 if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
2198 ),
2199 latency_ms = elapsed.as_millis() as u64,
2200 ok = backend_resp.is_ok(),
2201 "S4 put completed"
2202 );
2203 if backend_resp.is_ok()
2208 && let Some(mgr) = self.notifications.as_ref()
2209 {
2210 let dests = mgr.match_destinations(
2211 &put_bucket,
2212 &crate::notifications::EventType::ObjectCreatedPut,
2213 &put_key,
2214 );
2215 if !dests.is_empty() {
2216 let etag = backend_resp
2217 .as_ref()
2218 .ok()
2219 .and_then(|r| r.output.e_tag.clone())
2220 .map(ETag::into_value);
2221 let version_id = pending_version
2222 .as_ref()
2223 .filter(|pv| pv.versioned_response)
2224 .map(|pv| pv.version_id.clone());
2225 tokio::spawn(crate::notifications::dispatch_event(
2226 Arc::clone(mgr),
2227 put_bucket.clone(),
2228 put_key.clone(),
2229 crate::notifications::EventType::ObjectCreatedPut,
2230 Some(original_size),
2231 etag,
2232 version_id,
2233 format!("S4-{}", uuid::Uuid::new_v4()),
2234 ));
2235 }
2236 }
2237 if backend_resp.is_ok()
2242 && let (Some(mgr), Some(tags)) =
2243 (self.tagging.as_ref(), request_tags.clone())
2244 {
2245 mgr.put_object_tags(&put_bucket, &put_key, tags);
2246 }
2247 self.spawn_replication_if_matched(
2256 &put_bucket,
2257 &put_key,
2258 &request_tags,
2259 &replication_body,
2260 &replication_metadata,
2261 backend_resp.is_ok(),
2262 );
2263 return backend_resp;
2264 }
2265 let pending_version: Option<crate::versioning::PutOutcome> = self
2269 .versioning
2270 .as_ref()
2271 .map(|mgr| mgr.state(&put_bucket))
2272 .map(|state| match state {
2273 crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2274 version_id: crate::versioning::VersioningManager::new_version_id(),
2275 versioned_response: true,
2276 },
2277 _ => crate::versioning::PutOutcome {
2278 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2279 versioned_response: false,
2280 },
2281 });
2282 if let Some(ref pv) = pending_version
2283 && pv.versioned_response
2284 {
2285 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2286 }
2287 let mut backend_resp = self.backend.put_object(req).await;
2288 if let (Some(mgr), Some(pv), Ok(resp)) = (
2289 self.versioning.as_ref(),
2290 pending_version.as_ref(),
2291 backend_resp.as_mut(),
2292 ) {
2293 let etag = resp
2294 .output
2295 .e_tag
2296 .clone()
2297 .map(ETag::into_value)
2298 .unwrap_or_default();
2299 let now = chrono::Utc::now();
2300 mgr.commit_put_with_version(
2301 &put_bucket,
2302 &put_key,
2303 crate::versioning::VersionEntry {
2304 version_id: pv.version_id.clone(),
2305 etag,
2306 size: 0,
2307 is_delete_marker: false,
2308 created_at: now,
2309 },
2310 );
2311 if pv.versioned_response {
2312 resp.output.version_id = Some(pv.version_id.clone());
2313 }
2314 }
2315 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2319 if explicit_lock_mode.is_some()
2320 || explicit_retain_until.is_some()
2321 || explicit_legal_hold_on.is_some()
2322 {
2323 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2324 if let Some(m) = explicit_lock_mode {
2325 state.mode = Some(m);
2326 }
2327 if let Some(u) = explicit_retain_until {
2328 state.retain_until = Some(u);
2329 }
2330 if let Some(lh) = explicit_legal_hold_on {
2331 state.legal_hold_on = lh;
2332 }
2333 mgr.set(&put_bucket, &put_key, state);
2334 }
2335 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2336 }
2337 if backend_resp.is_ok()
2341 && let Some(mgr) = self.notifications.as_ref()
2342 {
2343 let dests = mgr.match_destinations(
2344 &put_bucket,
2345 &crate::notifications::EventType::ObjectCreatedPut,
2346 &put_key,
2347 );
2348 if !dests.is_empty() {
2349 let etag = backend_resp
2350 .as_ref()
2351 .ok()
2352 .and_then(|r| r.output.e_tag.clone())
2353 .map(ETag::into_value);
2354 let version_id = pending_version
2355 .as_ref()
2356 .filter(|pv| pv.versioned_response)
2357 .map(|pv| pv.version_id.clone());
2358 tokio::spawn(crate::notifications::dispatch_event(
2359 Arc::clone(mgr),
2360 put_bucket.clone(),
2361 put_key.clone(),
2362 crate::notifications::EventType::ObjectCreatedPut,
2363 Some(0),
2364 etag,
2365 version_id,
2366 format!("S4-{}", uuid::Uuid::new_v4()),
2367 ));
2368 }
2369 }
2370 if backend_resp.is_ok()
2374 && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2375 {
2376 mgr.put_object_tags(&put_bucket, &put_key, tags);
2377 }
2378 self.spawn_replication_if_matched(
2381 &put_bucket,
2382 &put_key,
2383 &request_tags,
2384 &bytes::Bytes::new(),
2385 &None,
2386 backend_resp.is_ok(),
2387 );
2388 backend_resp
2389 }
2390
    /// GET an object: resolve the requested version, serve partial range GETs
    /// from the sidecar frame index when possible, otherwise fetch from the
    /// backend and reverse the gateway transforms — decrypt (SSE-C / SSE-KMS /
    /// SSE-S4), then decompress (streaming zstd, passthrough, or buffered).
    /// Objects with no S4 metadata are returned untouched.
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // Take the Range header off the request: the backend stores transformed
        // bytes, so ranges are resolved here against the logical object.
        let range_request = req.input.range.take();
        // SSE-C inputs are likewise consumed here; the backend never sees them.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let get_sse_c_material =
            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;

        // Versioned buckets: map the requested (or latest) version onto the
        // backend shadow key; delete markers surface as NoSuchKey.
        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
            Some(mgr)
                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
            {
                let req_vid = req.input.version_id.take();
                let entry = match req_vid.as_deref() {
                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
                        || S3Error::with_message(
                            S3ErrorCode::NoSuchVersion,
                            format!("no such version: {vid}"),
                        ),
                    )?,
                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::NoSuchKey,
                            format!("no such key: {get_key}"),
                        )
                    })?,
                };
                if entry.is_delete_marker {
                    return Err(S3Error::with_message(
                        S3ErrorCode::NoSuchKey,
                        format!("delete marker is the current version of {get_key}"),
                    ));
                }
                if entry.version_id != crate::versioning::NULL_VERSION_ID {
                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
                }
                Some(entry.version_id)
            }
            _ => None,
        };

        // Fast path: with a sidecar frame index, a range GET can decompress
        // only the frames covering the requested byte span.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        if let Some(ref vid) = resolved_version_id {
            resp.output.version_id = Some(vid.clone());
        }
        // Multipart / framed-v2 bodies need frame parsing before decompression.
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // --- decryption: undo whichever SSE mechanism wrote the object ---
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                // S4E5 (chunked SSE-S4) can decrypt as a stream; this branch
                // returns early with a streaming response.
                if matches!(crate::sse::peek_magic(&body), Some("S4E5"))
                    && get_sse_c_material.is_none()
                {
                    let keyring_arc = self.sse_keyring.clone().ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "object is SSE-S4 encrypted (S4E5) but no --sse-s4-key is configured on this gateway",
                        )
                    })?;
                    let body_len = body.len() as u64;
                    let stream =
                        crate::sse::decrypt_chunked_stream(body, keyring_arc.as_ref());
                    use futures::StreamExt;
                    let mapped = stream.map(|r| {
                        r.map_err(|e| std::io::Error::other(format!(
                            "SSE-S4 chunked decrypt: {e}"
                        )))
                    });
                    use s3s::dto::StreamingBlob;
                    resp.output.body = Some(StreamingBlob::wrap(mapped));
                    // Stored-representation integrity values no longer apply.
                    resp.output.content_length = None;
                    resp.output.checksum_crc32 = None;
                    resp.output.checksum_crc32c = None;
                    resp.output.checksum_crc64nvme = None;
                    resp.output.checksum_sha1 = None;
                    resp.output.checksum_sha256 = None;
                    resp.output.e_tag = None;
                    let elapsed = get_start.elapsed();
                    crate::metrics::record_get(
                        "sse-s4-chunked",
                        body_len,
                        body_len,
                        elapsed.as_secs_f64(),
                        true,
                    );
                    return Ok(resp);
                }
                // Buffered decryption: S4E4 => SSE-KMS; otherwise SSE-C if the
                // caller supplied a key, else the gateway keyring.
                let plain = match crate::sse::peek_magic(&body) {
                    Some("S4E4") => {
                        let kms = self.kms.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InvalidRequest,
                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                            )
                        })?;
                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
                        crate::sse::decrypt_with_kms(&body, kms_ref)
                            .await
                            .map_err(|e| match e {
                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
                                other => S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-KMS decrypt failed: {other}"),
                                ),
                            })?
                    }
                    _ => {
                        if let Some(ref m) = get_sse_c_material {
                            crate::sse::decrypt(
                                &body,
                                crate::sse::SseSource::CustomerKey {
                                    key: &m.key,
                                    key_md5: &m.key_md5,
                                },
                            )
                            .map_err(sse_c_error_to_s3)?
                        } else {
                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                                S3Error::with_message(
                                    S3ErrorCode::InvalidRequest,
                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                                )
                            })?;
                            crate::sse::decrypt(&body, keyring).map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-S4 decrypt failed: {e}"),
                                )
                            })?
                        }
                    }
                };
                // Reflect the KMS key used (from the S4E4 header) on the response.
                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
                {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
                }
                bytes_to_blob(plain)
            } else if let Some(ref m) = get_sse_c_material {
                // Caller supplied an SSE-C key but the object is not encrypted.
                let _ = m;
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
            } else {
                blob
            };
            if let Some(ref m) = get_sse_c_material {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            // --- streaming zstd decompress: full (non-range) GETs of
            //     single-blob CpuZstd objects stream without buffering ---
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Passthrough objects stream the stored bytes straight through.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // --- buffered path: collect, decompress, then slice any range ---
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            let total_size = decompressed.len() as u64;
            // Range requests on the buffered path slice the decompressed bytes
            // and respond 206 with a Content-Range header.
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            resp.output.content_length = Some(final_bytes.len() as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        // Surface the replication status, if tracked, on every response shape.
        if let Some(mgr) = self.replication.as_ref()
            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
        {
            resp.output.replication_status =
                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
        }
        Ok(resp)
    }
2805
    /// Pass-through: bucket existence checks need no S4 processing.
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    /// Pass-through: bucket listings need no S4 processing.
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    /// Pass-through: bucket creation needs no S4 processing.
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    /// Pass-through: bucket deletion needs no S4 processing.
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
2831 async fn head_object(
2832 &self,
2833 req: S3Request<HeadObjectInput>,
2834 ) -> S3Result<S3Response<HeadObjectOutput>> {
2835 let head_bucket = req.input.bucket.clone();
2838 let head_key = req.input.key.clone();
2839 let mut resp = self.backend.head_object(req).await?;
2840 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2841 resp.output.content_length = Some(manifest.original_size as i64);
2845 resp.output.checksum_crc32 = None;
2846 resp.output.checksum_crc32c = None;
2847 resp.output.checksum_crc64nvme = None;
2848 resp.output.checksum_sha1 = None;
2849 resp.output.checksum_sha256 = None;
2850 resp.output.e_tag = None;
2851 }
2852 if let Some(mgr) = self.replication.as_ref()
2855 && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2856 {
2857 resp.output.replication_status =
2858 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2859 }
2860 if let Some(meta) = resp.output.metadata.as_ref()
2865 && let Some(sse_type) = meta.get("s4-sse-type")
2866 {
2867 {
2868 match sse_type.as_str() {
2869 "aws:kms" => {
2870 resp.output.server_side_encryption = Some(
2871 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2872 );
2873 if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
2874 resp.output.ssekms_key_id = Some(key_id.clone());
2875 }
2876 }
2877 _ => {
2878 resp.output.server_side_encryption = Some(
2879 ServerSideEncryption::from_static(ServerSideEncryption::AES256),
2880 );
2881 if let Some(md5) = meta.get("s4-sse-c-key-md5") {
2882 resp.output.sse_customer_algorithm =
2883 Some(crate::sse::SSE_C_ALGORITHM.into());
2884 resp.output.sse_customer_key_md5 = Some(md5.clone());
2885 }
2886 }
2887 }
2888 }
2889 }
2890 Ok(resp)
2891 }
    /// DeleteObject with gateway-side semantics layered on top of the backend:
    /// rate limiting, bucket policy, MFA-delete, object lock, versioning
    /// (delete markers / version-specific deletes), and cleanup of the
    /// `.s4index` sidecar plus lock/tag state for the unversioned path.
    async fn delete_object(
        &self,
        mut req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        // MFA-delete: if enabled for the bucket, the request must carry a
        // valid x-amz-mfa header or the delete is rejected.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && mgr.is_enabled(&bucket)
        {
            let header = req.input.mfa.as_deref();
            if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
                crate::metrics::record_mfa_delete_denial(&bucket);
                return Err(mfa_error_to_s3(e));
            }
        }
        // Object lock: retention/legal hold can veto the delete unless
        // governance bypass is requested (and permitted by `can_delete`).
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&bucket, &key)
        {
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let now = chrono::Utc::now();
            if !state.can_delete(now, bypass) {
                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Access Denied because object protected by object lock",
                ));
            }
        }
        // Versioned buckets: deletes are recorded in the versioning manager
        // and may translate to shadow-key deletes or delete markers. Both
        // versioned branches return early; only the unversioned path falls
        // through to the plain backend delete below.
        if let Some(mgr) = self.versioning.as_ref() {
            let state = mgr.state(&bucket);
            if state != crate::versioning::VersioningState::Unversioned {
                let req_vid = req.input.version_id.take();
                if let Some(vid) = req_vid {
                    // Version-specific delete: remove exactly that version.
                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
                    // The "null" version lives under the plain key; real
                    // versions live under a shadow key derived from the vid.
                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
                        key.clone()
                    } else {
                        versioned_shadow_key(&key, &vid)
                    };
                    let was_real_version = outcome
                        .as_ref()
                        .map(|o| !o.is_delete_marker)
                        .unwrap_or(false);
                    if was_real_version {
                        // Best-effort backend delete of the stored object;
                        // failure is ignored (metadata already updated).
                        let backend_input = DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: backend_target,
                            ..Default::default()
                        };
                        let backend_req = S3Request {
                            input: backend_input,
                            method: http::Method::DELETE,
                            uri: req.uri.clone(),
                            headers: req.headers.clone(),
                            extensions: http::Extensions::new(),
                            credentials: req.credentials.clone(),
                            region: req.region.clone(),
                            service: req.service.clone(),
                            trailing_headers: None,
                        };
                        let _ = self.backend.delete_object(backend_req).await;
                    }
                    let mut output = DeleteObjectOutput {
                        version_id: Some(vid.clone()),
                        ..Default::default()
                    };
                    // Deleting a delete marker reports x-amz-delete-marker.
                    if let Some(o) = outcome.as_ref()
                        && o.is_delete_marker
                    {
                        output.delete_marker = Some(true);
                    }
                    self.fire_delete_notification(
                        &bucket,
                        &key,
                        crate::notifications::EventType::ObjectRemovedDelete,
                        Some(vid.clone()),
                    );
                    return Ok(S3Response::new(output));
                }
                // No version id: record a delete marker. In Suspended state
                // the current (null-version) object is also physically
                // removed from the backend, matching S3 semantics.
                let outcome = mgr.record_delete(&bucket, &key);
                if state == crate::versioning::VersioningState::Suspended {
                    let backend_input = DeleteObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    };
                    let backend_req = S3Request {
                        input: backend_input,
                        method: http::Method::DELETE,
                        uri: req.uri.clone(),
                        headers: req.headers.clone(),
                        extensions: http::Extensions::new(),
                        credentials: req.credentials.clone(),
                        region: req.region.clone(),
                        service: req.service.clone(),
                        trailing_headers: None,
                    };
                    let _ = self.backend.delete_object(backend_req).await;
                }
                let output = DeleteObjectOutput {
                    delete_marker: Some(true),
                    version_id: outcome.version_id.clone(),
                    ..Default::default()
                };
                self.fire_delete_notification(
                    &bucket,
                    &key,
                    crate::notifications::EventType::ObjectRemovedDeleteMarker,
                    outcome.version_id,
                );
                return Ok(S3Response::new(output));
            }
        }
        // Unversioned path: real delete, then clear gateway-side state.
        let resp = self.backend.delete_object(req).await?;
        if let Some(mgr) = self.object_lock.as_ref() {
            mgr.clear(&bucket, &key);
        }
        if let Some(mgr) = self.tagging.as_ref() {
            mgr.delete_object_tags(&bucket, &key);
        }
        // Best-effort removal of the `.s4index` sidecar for this object.
        let sidecar = sidecar_key(&key);
        if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
            let sidecar_input = DeleteObjectInput {
                bucket: bucket.clone(),
                key: sidecar,
                ..Default::default()
            };
            let sidecar_req = S3Request {
                input: sidecar_input,
                method: http::Method::DELETE,
                uri,
                headers: http::HeaderMap::new(),
                extensions: http::Extensions::new(),
                credentials: None,
                region: None,
                service: None,
                trailing_headers: None,
            };
            let _ = self.backend.delete_object(sidecar_req).await;
        }
        self.fire_delete_notification(
            &bucket,
            &key,
            crate::notifications::EventType::ObjectRemovedDelete,
            None,
        );
        Ok(resp)
    }
3103 async fn delete_objects(
3104 &self,
3105 req: S3Request<DeleteObjectsInput>,
3106 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
3107 if let Some(mgr) = self.mfa_delete.as_ref()
3111 && mgr.is_enabled(&req.input.bucket)
3112 {
3113 let header = req.input.mfa.as_deref();
3114 if let Err(e) =
3115 crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
3116 {
3117 crate::metrics::record_mfa_delete_denial(&req.input.bucket);
3118 return Err(mfa_error_to_s3(e));
3119 }
3120 }
3121 self.backend.delete_objects(req).await
3122 }
    /// CopyObject: checks write policy on the destination and read policy on
    /// the source, and — when the client sends MetadataDirective=REPLACE —
    /// re-injects the gateway's `s4-*` bookkeeping metadata from the source
    /// object so compression/framing state survives the copy.
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        // REPLACE discards source metadata on the backend, which would drop
        // the s4-* keys the gateway needs to decode the object later.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            // HEAD the source to recover its s4-* metadata.
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // Client-supplied replacement metadata wins; only
                        // fill keys the client did not set.
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
3195 async fn list_objects(
3196 &self,
3197 req: S3Request<ListObjectsInput>,
3198 ) -> S3Result<S3Response<ListObjectsOutput>> {
3199 self.enforce_rate_limit(&req, &req.input.bucket)?;
3200 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3201 let mut resp = self.backend.list_objects(req).await?;
3202 if let Some(contents) = resp.output.contents.as_mut() {
3205 contents.retain(|o| {
3206 o.key
3207 .as_ref()
3208 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3209 .unwrap_or(true)
3210 });
3211 }
3212 Ok(resp)
3213 }
3214 async fn list_objects_v2(
3215 &self,
3216 req: S3Request<ListObjectsV2Input>,
3217 ) -> S3Result<S3Response<ListObjectsV2Output>> {
3218 self.enforce_rate_limit(&req, &req.input.bucket)?;
3219 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3220 let mut resp = self.backend.list_objects_v2(req).await?;
3221 if let Some(contents) = resp.output.contents.as_mut() {
3222 let before = contents.len();
3223 contents.retain(|o| {
3224 o.key
3225 .as_ref()
3226 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3227 .unwrap_or(true)
3228 });
3229 if let Some(kc) = resp.output.key_count.as_mut() {
3231 *kc -= (before - contents.len()) as i32;
3232 }
3233 }
3234 Ok(resp)
3235 }
    /// ListObjectVersions: when the bucket is version-enabled (or suspended),
    /// the listing is synthesized entirely from the gateway's versioning
    /// manager; otherwise the backend listing is used with gateway-internal
    /// keys filtered out.
    async fn list_object_versions(
        &self,
        req: S3Request<ListObjectVersionsInput>,
    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
        self.enforce_rate_limit(&req, &req.input.bucket)?;
        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
        if let Some(mgr) = self.versioning.as_ref()
            && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
        {
            // S3 default page size.
            let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
            let page = mgr.list_versions(
                &req.input.bucket,
                req.input.prefix.as_deref(),
                req.input.key_marker.as_deref(),
                req.input.version_id_marker.as_deref(),
                max_keys,
            );
            // Map manager entries onto the AWS wire types.
            let versions: Vec<ObjectVersion> = page
                .versions
                .into_iter()
                .map(|e| ObjectVersion {
                    key: Some(e.key),
                    version_id: Some(e.version_id),
                    is_latest: Some(e.is_latest),
                    e_tag: Some(ETag::Strong(e.etag)),
                    size: Some(e.size as i64),
                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
                    ..Default::default()
                })
                .collect();
            let delete_markers: Vec<DeleteMarkerEntry> = page
                .delete_markers
                .into_iter()
                .map(|e| DeleteMarkerEntry {
                    key: Some(e.key),
                    version_id: Some(e.version_id),
                    is_latest: Some(e.is_latest),
                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
                    ..Default::default()
                })
                .collect();
            let output = ListObjectVersionsOutput {
                name: Some(req.input.bucket.clone()),
                prefix: req.input.prefix.clone(),
                key_marker: req.input.key_marker.clone(),
                version_id_marker: req.input.version_id_marker.clone(),
                max_keys: req.input.max_keys,
                // Empty lists are elided rather than serialized empty.
                versions: if versions.is_empty() {
                    None
                } else {
                    Some(versions)
                },
                delete_markers: if delete_markers.is_empty() {
                    None
                } else {
                    Some(delete_markers)
                },
                is_truncated: Some(page.is_truncated),
                next_key_marker: page.next_key_marker,
                next_version_id_marker: page.next_version_id_marker,
                ..Default::default()
            };
            return Ok(S3Response::new(output));
        }
        // Unversioned: backend listing with internal keys stripped.
        let mut resp = self.backend.list_object_versions(req).await?;
        if let Some(versions) = resp.output.versions.as_mut() {
            versions.retain(|v| {
                v.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
                    .unwrap_or(true)
            });
        }
        if let Some(markers) = resp.output.delete_markers.as_mut() {
            markers.retain(|m| {
                m.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
                    .unwrap_or(true)
            });
        }
        Ok(resp)
    }
3328
    /// CreateMultipartUpload: tags the object's metadata for per-part
    /// compression, strips all SSE inputs from the backend request (the
    /// gateway applies encryption itself at Complete), resolves the SSE mode
    /// (SSE-C / SSE-KMS / gateway keyring / none), and stores a per-upload
    /// context keyed by upload id for the later UploadPart/Complete calls.
    async fn create_multipart_upload(
        &self,
        mut req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        let codec_kind = self.registry.default_kind();
        // Mark the object so GET/HEAD paths know parts are individually framed.
        let meta = req.input.metadata.get_or_insert_with(Default::default);
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
        // Capture and remove every SSE-related input before the request is
        // forwarded — the backend must never see encryption parameters.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let sse_header = req.input.server_side_encryption.take();
        let sse_kms_key = req.input.ssekms_key_id.take();
        let _ = req.input.ssekms_encryption_context.take();
        let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
        let kms_key_id = extract_kms_key_id(
            &sse_header,
            &sse_kms_key,
            self.kms_default_key_id.as_deref(),
        );
        // SSE-C and SSE-KMS are mutually exclusive.
        if sse_c_material.is_some() && kms_key_id.is_some() {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidArgument,
                "SSE-C and SSE-KMS cannot be used together on the same multipart upload",
            ));
        }
        // Precedence: SSE-C > SSE-KMS > gateway keyring (SseS4) > none.
        let sse_mode = if let Some(ref m) = sse_c_material {
            crate::multipart_state::MultipartSseMode::SseC {
                key: m.key,
                key_md5: m.key_md5,
            }
        } else if let Some(ref kid) = kms_key_id {
            // Fail fast at Create rather than at Complete.
            if self.kms.is_none() {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                ));
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id: kid.clone() }
        } else if self.sse_keyring.is_some() {
            crate::multipart_state::MultipartSseMode::SseS4
        } else {
            crate::multipart_state::MultipartSseMode::None
        };
        // Parse the x-amz-tagging header now; tags are applied at Complete.
        let request_tags: Option<crate::tagging::TagSet> = req
            .input
            .tagging
            .as_deref()
            .map(crate::tagging::parse_tagging_header)
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
        let _ = req.input.tagging.take();
        // Explicit object-lock inputs are also deferred to Complete.
        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
            .input
            .object_lock_mode
            .as_ref()
            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
            .input
            .object_lock_retain_until_date
            .as_ref()
            .and_then(timestamp_to_chrono_utc);
        let explicit_legal_hold_on: bool = req
            .input
            .object_lock_legal_hold_status
            .as_ref()
            .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
            .unwrap_or(false);
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        debug!(
            bucket = %bucket,
            key = %key,
            codec = codec_kind.as_str(),
            sse = ?sse_mode,
            "S4 create_multipart_upload: marking object for per-part compression"
        );
        let mut resp = self.backend.create_multipart_upload(req).await?;
        // Persist the per-upload context so UploadPart/Complete can find it.
        if let Some(upload_id) = resp.output.upload_id.as_ref() {
            self.multipart_state.put(
                upload_id,
                crate::multipart_state::MultipartUploadContext {
                    bucket,
                    key,
                    sse: sse_mode.clone(),
                    tags: request_tags,
                    object_lock_mode: explicit_lock_mode,
                    object_lock_retain_until: explicit_retain_until,
                    object_lock_legal_hold: explicit_legal_hold_on,
                },
            );
        }
        // Echo the negotiated SSE mode back in the response headers.
        match &sse_mode {
            crate::multipart_state::MultipartSseMode::SseC { key_md5, .. } => {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(key_md5),
                );
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
                resp.output.server_side_encryption = Some(
                    ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                );
                resp.output.ssekms_key_id = Some(key_id.clone());
            }
            _ => {}
        }
        Ok(resp)
    }
3467
    /// UploadPart: buffers the part body, picks a codec from a sample,
    /// compresses the part, and wraps it in a self-describing frame
    /// (header + payload) before forwarding to the backend. Parts that are
    /// expected to be non-final are padded up to S3's 5 MiB minimum so the
    /// backend accepts them.
    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        // Context lookup is currently unused here but kept for parity with
        // Complete; SSE is applied at Complete, not per-part.
        let _sse_ctx = self
            .multipart_state
            .get(req.input.upload_id.as_str());
        // Strip SSE-C inputs so the backend never sees customer keys.
        let _ = req.input.sse_customer_algorithm.take();
        let _ = req.input.sse_customer_key.take();
        let _ = req.input.sse_customer_key_md5.take();
        if let Some(blob) = req.input.body.take() {
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            // Codec choice is driven by a small prefix sample plus the total
            // size as a hint.
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            let codec_kind = self
                .dispatcher
                .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
                .await;
            let original_size = bytes.len() as u64;
            let (compress_res, tel) = self
                .registry
                .compress_with_telemetry(bytes, codec_kind)
                .await;
            stamp_gpu_compress_telemetry(&tel);
            let (compressed, manifest) = compress_res.map_err(internal("registry compress part"))?;
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            // Heuristic: a part smaller than the S3 minimum is probably the
            // final part, which S3 allows to be undersized — don't pad it.
            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
            if !likely_final {
                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            }
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            req.input.content_length = Some(new_len);
            // Client checksums/MD5 were computed over the original bytes and
            // no longer match the framed payload — drop them all.
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
    /// CompleteMultipartUpload: after the backend assembles the parts, the
    /// gateway (best-effort) re-reads the assembled object to build the frame
    /// index sidecar, and — when the upload context requires it — re-PUTs the
    /// object with SSE applied and/or under a versioned shadow key, then
    /// commits versioning, object-lock, tagging, response SSE headers, and
    /// replication before dropping the per-upload context.
    async fn complete_multipart_upload(
        &self,
        mut req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        let upload_id = req.input.upload_id.clone();
        // Context captured at CreateMultipartUpload (SSE mode, tags, lock).
        let ctx = self.multipart_state.get(upload_id.as_str());
        // The backend must not see SSE-C inputs.
        let _ = req.input.sse_customer_algorithm.take();
        let _ = req.input.sse_customer_key.take();
        let _ = req.input.sse_customer_key_md5.take();
        let mut resp = self.backend.complete_multipart_upload(req).await?;
        // Best-effort read-back of the assembled object; any failure along
        // the way simply yields None and skips the dependent steps.
        let assembled_body: Option<bytes::Bytes> =
            if let Ok(uri) = safe_object_uri(&bucket, &key) {
                let get_input = GetObjectInput {
                    bucket: bucket.clone(),
                    key: key.clone(),
                    ..Default::default()
                };
                let get_req = S3Request {
                    input: get_input,
                    method: http::Method::GET,
                    uri,
                    headers: http::HeaderMap::new(),
                    extensions: http::Extensions::new(),
                    credentials: None,
                    region: None,
                    service: None,
                    trailing_headers: None,
                };
                match self.backend.get_object(get_req).await {
                    Ok(get_resp) => match get_resp.output.body {
                        Some(blob) => collect_blob(blob, self.max_body_bytes).await.ok(),
                        None => None,
                    },
                    Err(_) => None,
                }
            } else {
                None
            };
        // Write the frame-index sidecar when the body parses as framed data.
        if let Some(ref body) = assembled_body
            && let Ok(index) = build_index_from_body(body)
        {
            self.write_sidecar(&bucket, &key, &index).await;
        }
        if let Some(ctx) = ctx {
            // Decide up front what version id this Complete will commit.
            let pending_version: Option<crate::versioning::PutOutcome> = self
                .versioning
                .as_ref()
                .map(|mgr| mgr.state(&bucket))
                .map(|state| match state {
                    crate::versioning::VersioningState::Enabled => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::VersioningManager::new_version_id(),
                            versioned_response: true,
                        }
                    }
                    crate::versioning::VersioningState::Suspended
                    | crate::versioning::VersioningState::Unversioned => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
                            versioned_response: false,
                        }
                    }
                });
            // A re-PUT is needed when encryption must be applied or when the
            // object has to land under a versioned shadow key.
            let needs_re_put = matches!(
                ctx.sse,
                crate::multipart_state::MultipartSseMode::SseS4
                    | crate::multipart_state::MultipartSseMode::SseC { .. }
                    | crate::multipart_state::MultipartSseMode::SseKms { .. }
            ) || pending_version
                .as_ref()
                .map(|pv| pv.versioned_response)
                .unwrap_or(false);
            // Keep an unencrypted copy for replication before `assembled_body`
            // is consumed below.
            let replication_body = assembled_body.clone();
            let mut applied_metadata: Option<std::collections::HashMap<String, String>> = None;
            if needs_re_put && let Some(body) = assembled_body {
                // SSE-KMS: generate a data-encryption key before touching
                // metadata, so failures leave the object untouched.
                let kms_wrap = if let crate::multipart_state::MultipartSseMode::SseKms {
                    ref key_id,
                } = ctx.sse
                {
                    let kms = self.kms.as_ref().ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                        )
                    })?;
                    let (dek, wrapped) = kms
                        .generate_dek(key_id)
                        .await
                        .map_err(kms_error_to_s3)?;
                    // AES-256 requires exactly 32 key bytes.
                    if dek.len() != 32 {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InternalError,
                            format!(
                                "KMS backend returned a DEK of {} bytes (expected 32)",
                                dek.len()
                            ),
                        ));
                    }
                    let mut dek_arr = [0u8; 32];
                    dek_arr.copy_from_slice(&dek);
                    Some((dek_arr, wrapped))
                } else {
                    None
                };
                // Start from the object's current metadata so s4-* keys set
                // at Create survive the re-PUT; missing metadata is tolerated.
                let head_req = S3Request {
                    input: HeadObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    },
                    method: http::Method::HEAD,
                    uri: safe_object_uri(&bucket, &key)?,
                    headers: http::HeaderMap::new(),
                    extensions: http::Extensions::new(),
                    credentials: None,
                    region: None,
                    service: None,
                    trailing_headers: None,
                };
                let mut new_metadata: std::collections::HashMap<String, String> =
                    match self.backend.head_object(head_req).await {
                        Ok(h) => h.output.metadata.unwrap_or_default(),
                        Err(_) => std::collections::HashMap::new(),
                    };
                // Encrypt per the captured SSE mode, stamping the matching
                // s4-sse-* metadata so GET/HEAD can decrypt and report it.
                let new_body = match &ctx.sse {
                    crate::multipart_state::MultipartSseMode::SseC { key, key_md5 } => {
                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
                        new_metadata.insert("s4-sse-type".into(), "AES256".into());
                        new_metadata.insert(
                            "s4-sse-c-key-md5".into(),
                            base64::engine::general_purpose::STANDARD.encode(key_md5),
                        );
                        crate::sse::encrypt_with_source(
                            &body,
                            crate::sse::SseSource::CustomerKey { key, key_md5 },
                        )
                    }
                    crate::multipart_state::MultipartSseMode::SseKms { .. } => {
                        let (dek, wrapped) = kms_wrap
                            .as_ref()
                            .expect("SseKms branch implies kms_wrap is Some");
                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
                        new_metadata.insert("s4-sse-type".into(), "aws:kms".into());
                        new_metadata
                            .insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
                        crate::sse::encrypt_with_source(
                            &body,
                            crate::sse::SseSource::Kms { dek, wrapped },
                        )
                    }
                    crate::multipart_state::MultipartSseMode::SseS4 => {
                        let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InternalError,
                                "SSE-S4 captured at Create but keyring missing at Complete",
                            )
                        })?;
                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
                        // Chunked layout only when a chunk size is configured.
                        if self.sse_chunk_size > 0 {
                            crate::sse::encrypt_v2_chunked(
                                &body,
                                keyring,
                                self.sse_chunk_size,
                            )
                            .map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!(
                                        "SSE-S4 chunked encrypt failed at Complete: {e}"
                                    ),
                                )
                            })?
                        } else {
                            crate::sse::encrypt_v2(&body, keyring)
                        }
                    }
                    crate::multipart_state::MultipartSseMode::None => body.clone(),
                };
                // Version-enabled buckets store the payload under a shadow
                // key; otherwise it overwrites the plain key.
                let put_target_key = if let Some(pv) = pending_version.as_ref() {
                    if pv.versioned_response {
                        versioned_shadow_key(&key, &pv.version_id)
                    } else {
                        key.clone()
                    }
                } else {
                    key.clone()
                };
                let new_body_len = new_body.len() as i64;
                let put_req = S3Request {
                    input: PutObjectInput {
                        bucket: bucket.clone(),
                        key: put_target_key.clone(),
                        body: Some(bytes_to_blob(new_body.clone())),
                        metadata: Some(new_metadata.clone()),
                        content_length: Some(new_body_len),
                        ..Default::default()
                    },
                    method: http::Method::PUT,
                    uri: safe_object_uri(&bucket, &put_target_key)?,
                    headers: http::HeaderMap::new(),
                    extensions: http::Extensions::new(),
                    credentials: None,
                    region: None,
                    service: None,
                    trailing_headers: None,
                };
                self.backend.put_object(put_req).await?;
                // When the payload moved to a shadow key, drop the original
                // assembled object (best effort).
                if put_target_key != key {
                    let del_req = S3Request {
                        input: DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: key.clone(),
                            ..Default::default()
                        },
                        method: http::Method::DELETE,
                        uri: safe_object_uri(&bucket, &key)?,
                        headers: http::HeaderMap::new(),
                        extensions: http::Extensions::new(),
                        credentials: None,
                        region: None,
                        service: None,
                        trailing_headers: None,
                    };
                    let _ = self.backend.delete_object(del_req).await;
                }
                applied_metadata = Some(new_metadata);
            }
            // Record the new version and surface its id when appropriate.
            if let (Some(mgr), Some(pv)) = (self.versioning.as_ref(), pending_version.as_ref()) {
                let etag = resp
                    .output
                    .e_tag
                    .clone()
                    .map(ETag::into_value)
                    .unwrap_or_default();
                let now = chrono::Utc::now();
                mgr.commit_put_with_version(
                    &bucket,
                    &key,
                    crate::versioning::VersionEntry {
                        version_id: pv.version_id.clone(),
                        etag,
                        size: replication_body
                            .as_ref()
                            .map(|b| b.len() as u64)
                            .unwrap_or(0),
                        is_delete_marker: false,
                        created_at: now,
                    },
                );
                if pv.versioned_response {
                    resp.output.version_id = Some(pv.version_id.clone());
                }
            }
            // Apply explicit object-lock settings from Create, then any
            // bucket-default retention.
            if let Some(mgr) = self.object_lock.as_ref() {
                if ctx.object_lock_mode.is_some()
                    || ctx.object_lock_retain_until.is_some()
                    || ctx.object_lock_legal_hold
                {
                    let mut state = mgr.get(&bucket, &key).unwrap_or_default();
                    if let Some(m) = ctx.object_lock_mode {
                        state.mode = Some(m);
                    }
                    if let Some(u) = ctx.object_lock_retain_until {
                        state.retain_until = Some(u);
                    }
                    if ctx.object_lock_legal_hold {
                        state.legal_hold_on = true;
                    }
                    mgr.set(&bucket, &key, state);
                }
                mgr.apply_default_on_put(&bucket, &key, chrono::Utc::now());
            }
            // Tags captured from the x-amz-tagging header at Create.
            if let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), ctx.tags.as_ref()) {
                mgr.put_object_tags(&bucket, &key, tags.clone());
            }
            // Echo the SSE mode in the Complete response.
            match &ctx.sse {
                crate::multipart_state::MultipartSseMode::SseC { .. } => {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AES256),
                    );
                }
                crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(key_id.clone());
                }
                _ => {}
            }
            // Replication uses the pre-encryption assembled body (empty if
            // the read-back failed).
            let replication_body_bytes = replication_body.unwrap_or_default();
            self.spawn_replication_if_matched(
                &bucket,
                &key,
                &ctx.tags,
                &replication_body_bytes,
                &applied_metadata,
                true,
            );
            // The upload is finished; drop its context.
            self.multipart_state.remove(upload_id.as_str());
        }
        Ok(resp)
    }
3966 async fn abort_multipart_upload(
3967 &self,
3968 req: S3Request<AbortMultipartUploadInput>,
3969 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
3970 self.multipart_state.remove(req.input.upload_id.as_str());
3974 self.backend.abort_multipart_upload(req).await
3975 }
3976 async fn list_multipart_uploads(
3977 &self,
3978 req: S3Request<ListMultipartUploadsInput>,
3979 ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
3980 self.backend.list_multipart_uploads(req).await
3981 }
3982 async fn list_parts(
3983 &self,
3984 req: S3Request<ListPartsInput>,
3985 ) -> S3Result<S3Response<ListPartsOutput>> {
3986 self.backend.list_parts(req).await
3987 }
3988
3989 async fn get_object_acl(
4005 &self,
4006 req: S3Request<GetObjectAclInput>,
4007 ) -> S3Result<S3Response<GetObjectAclOutput>> {
4008 self.backend.get_object_acl(req).await
4009 }
4010 async fn put_object_acl(
4011 &self,
4012 req: S3Request<PutObjectAclInput>,
4013 ) -> S3Result<S3Response<PutObjectAclOutput>> {
4014 self.backend.put_object_acl(req).await
4015 }
4016 async fn get_object_tagging(
4022 &self,
4023 req: S3Request<GetObjectTaggingInput>,
4024 ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
4025 let Some(mgr) = self.tagging.as_ref() else {
4026 return self.backend.get_object_tagging(req).await;
4027 };
4028 let tags = mgr
4029 .get_object_tags(&req.input.bucket, &req.input.key)
4030 .unwrap_or_default();
4031 Ok(S3Response::new(GetObjectTaggingOutput {
4032 tag_set: tagset_to_aws(&tags),
4033 ..Default::default()
4034 }))
4035 }
4036 async fn put_object_tagging(
4037 &self,
4038 req: S3Request<PutObjectTaggingInput>,
4039 ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
4040 let Some(mgr) = self.tagging.as_ref() else {
4041 return self.backend.put_object_tagging(req).await;
4042 };
4043 let bucket = req.input.bucket.clone();
4044 let key = req.input.key.clone();
4045 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4046 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4047 })?;
4048 let existing = mgr.get_object_tags(&bucket, &key);
4052 self.enforce_policy_with_extra(
4053 &req,
4054 "s3:PutObjectTagging",
4055 &bucket,
4056 Some(&key),
4057 Some(&parsed),
4058 existing.as_ref(),
4059 )?;
4060 mgr.put_object_tags(&bucket, &key, parsed);
4061 Ok(S3Response::new(PutObjectTaggingOutput::default()))
4062 }
4063 async fn delete_object_tagging(
4064 &self,
4065 req: S3Request<DeleteObjectTaggingInput>,
4066 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
4067 let Some(mgr) = self.tagging.as_ref() else {
4068 return self.backend.delete_object_tagging(req).await;
4069 };
4070 let bucket = req.input.bucket.clone();
4071 let key = req.input.key.clone();
4072 let existing = mgr.get_object_tags(&bucket, &key);
4073 self.enforce_policy_with_extra(
4074 &req,
4075 "s3:DeleteObjectTagging",
4076 &bucket,
4077 Some(&key),
4078 None,
4079 existing.as_ref(),
4080 )?;
4081 mgr.delete_object_tags(&bucket, &key);
4082 Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
4083 }
4084 async fn get_object_attributes(
4085 &self,
4086 req: S3Request<GetObjectAttributesInput>,
4087 ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
4088 self.backend.get_object_attributes(req).await
4089 }
4090 async fn restore_object(
4091 &self,
4092 req: S3Request<RestoreObjectInput>,
4093 ) -> S3Result<S3Response<RestoreObjectOutput>> {
4094 self.backend.restore_object(req).await
4095 }
4096 async fn upload_part_copy(
4097 &self,
4098 req: S3Request<UploadPartCopyInput>,
4099 ) -> S3Result<S3Response<UploadPartCopyOutput>> {
4100 let CopySource::Bucket {
4111 bucket: src_bucket,
4112 key: src_key,
4113 ..
4114 } = &req.input.copy_source
4115 else {
4116 return self.backend.upload_part_copy(req).await;
4117 };
4118 let src_bucket = src_bucket.to_string();
4119 let src_key = src_key.to_string();
4120
4121 let head_input = HeadObjectInput {
4123 bucket: src_bucket.clone(),
4124 key: src_key.clone(),
4125 ..Default::default()
4126 };
4127 let head_req = S3Request {
4128 input: head_input,
4129 method: http::Method::HEAD,
4130 uri: req.uri.clone(),
4131 headers: req.headers.clone(),
4132 extensions: http::Extensions::new(),
4133 credentials: req.credentials.clone(),
4134 region: req.region.clone(),
4135 service: req.service.clone(),
4136 trailing_headers: None,
4137 };
4138 let needs_s4_copy = match self.backend.head_object(head_req).await {
4139 Ok(h) => {
4140 is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
4141 }
4142 Err(_) => false,
4143 };
4144 if !needs_s4_copy {
4145 return self.backend.upload_part_copy(req).await;
4146 }
4147
4148 let source_range = req
4150 .input
4151 .copy_source_range
4152 .as_ref()
4153 .map(|r| parse_copy_source_range(r))
4154 .transpose()
4155 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
4156
4157 let mut get_input = GetObjectInput {
4161 bucket: src_bucket.clone(),
4162 key: src_key.clone(),
4163 ..Default::default()
4164 };
4165 get_input.range = source_range;
4166 let get_req = S3Request {
4167 input: get_input,
4168 method: http::Method::GET,
4169 uri: req.uri.clone(),
4170 headers: req.headers.clone(),
4171 extensions: http::Extensions::new(),
4172 credentials: req.credentials.clone(),
4173 region: req.region.clone(),
4174 service: req.service.clone(),
4175 trailing_headers: None,
4176 };
4177 let get_resp = self.get_object(get_req).await?;
4178 let blob = get_resp.output.body.ok_or_else(|| {
4179 S3Error::with_message(
4180 S3ErrorCode::InternalError,
4181 "upload_part_copy: empty body from source GET",
4182 )
4183 })?;
4184 let bytes = collect_blob(blob, self.max_body_bytes)
4185 .await
4186 .map_err(internal("collect upload_part_copy source body"))?;
4187
4188 let sample_len = bytes.len().min(SAMPLE_BYTES);
4190 let codec_kind = self
4192 .dispatcher
4193 .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
4194 .await;
4195 let original_size = bytes.len() as u64;
4196 let (compress_res, tel) = self
4198 .registry
4199 .compress_with_telemetry(bytes, codec_kind)
4200 .await;
4201 stamp_gpu_compress_telemetry(&tel);
4202 let (compressed, manifest) =
4203 compress_res.map_err(internal("registry compress upload_part_copy"))?;
4204 let header = FrameHeader {
4205 codec: codec_kind,
4206 original_size,
4207 compressed_size: compressed.len() as u64,
4208 crc32c: manifest.crc32c,
4209 };
4210 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
4211 write_frame(&mut framed, header, &compressed);
4212 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
4213 if !likely_final {
4214 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
4215 }
4216 let framed_bytes = framed.freeze();
4217 let framed_len = framed_bytes.len() as i64;
4218
4219 let part_input = UploadPartInput {
4221 bucket: req.input.bucket.clone(),
4222 key: req.input.key.clone(),
4223 part_number: req.input.part_number,
4224 upload_id: req.input.upload_id.clone(),
4225 body: Some(bytes_to_blob(framed_bytes)),
4226 content_length: Some(framed_len),
4227 ..Default::default()
4228 };
4229 let part_req = S3Request {
4230 input: part_input,
4231 method: http::Method::PUT,
4232 uri: req.uri.clone(),
4233 headers: req.headers.clone(),
4234 extensions: http::Extensions::new(),
4235 credentials: req.credentials.clone(),
4236 region: req.region.clone(),
4237 service: req.service.clone(),
4238 trailing_headers: None,
4239 };
4240 let upload_resp = self.backend.upload_part(part_req).await?;
4241
4242 let copy_output = UploadPartCopyOutput {
4243 copy_part_result: Some(CopyPartResult {
4244 e_tag: upload_resp.output.e_tag.clone(),
4245 ..Default::default()
4246 }),
4247 ..Default::default()
4248 };
4249 Ok(S3Response::new(copy_output))
4250 }
4251
    // GET ?object-lock — report the bucket's default Object Lock configuration.
    //
    // When the in-process object-lock manager is enabled, the response is
    // synthesized from its stored per-bucket default (mode + retention days);
    // buckets without a default yield an empty configuration. Otherwise the
    // call is delegated verbatim to the backend.
    async fn get_object_lock_configuration(
        &self,
        req: S3Request<GetObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
                ObjectLockConfiguration {
                    // Presence of a stored default implies Object Lock is enabled.
                    object_lock_enabled: Some(ObjectLockEnabled::from_static(
                        ObjectLockEnabled::ENABLED,
                    )),
                    rule: Some(ObjectLockRule {
                        default_retention: Some(DefaultRetention {
                            days: Some(d.retention_days as i32),
                            mode: Some(ObjectLockRetentionMode::from_static(
                                match d.mode {
                                    crate::object_lock::LockMode::Governance => {
                                        ObjectLockRetentionMode::GOVERNANCE
                                    }
                                    crate::object_lock::LockMode::Compliance => {
                                        ObjectLockRetentionMode::COMPLIANCE
                                    }
                                },
                            )),
                            // Retention is stored internally in days only.
                            years: None,
                        }),
                    }),
                }
            });
            let output = GetObjectLockConfigurationOutput {
                object_lock_configuration: cfg,
            };
            return Ok(S3Response::new(output));
        }
        self.backend.get_object_lock_configuration(req).await
    }
    // PUT ?object-lock — store a bucket-level default retention rule.
    //
    // Only the default-retention rule is persisted; a configuration without
    // one is accepted as a no-op. Years are converted to days (365 days per
    // year) because the manager stores retention in days only.
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
                && let Some(rule) = cfg.rule.as_ref()
                && let Some(d) = rule.default_retention.as_ref()
            {
                // Mode is mandatory for a default retention rule.
                let mode = d
                    .mode
                    .as_ref()
                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
                    .ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
                        )
                    })?;
                // AWS requires exactly one of Days or Years, strictly positive.
                let days: u32 = match (d.days, d.years) {
                    (Some(d), None) if d > 0 => d as u32,
                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
                    _ => {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
                        ));
                    }
                };
                mgr.set_bucket_default(
                    &bucket,
                    crate::object_lock::BucketObjectLockDefault {
                        mode,
                        retention_days: days,
                    },
                );
            }
            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
        }
        self.backend.put_object_lock_configuration(req).await
    }
4338 async fn get_object_legal_hold(
4339 &self,
4340 req: S3Request<GetObjectLegalHoldInput>,
4341 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
4342 if let Some(mgr) = self.object_lock.as_ref() {
4343 let on = mgr
4344 .get(&req.input.bucket, &req.input.key)
4345 .map(|s| s.legal_hold_on)
4346 .unwrap_or(false);
4347 let status = ObjectLockLegalHoldStatus::from_static(if on {
4348 ObjectLockLegalHoldStatus::ON
4349 } else {
4350 ObjectLockLegalHoldStatus::OFF
4351 });
4352 let output = GetObjectLegalHoldOutput {
4353 legal_hold: Some(ObjectLockLegalHold {
4354 status: Some(status),
4355 }),
4356 };
4357 return Ok(S3Response::new(output));
4358 }
4359 self.backend.get_object_legal_hold(req).await
4360 }
4361 async fn put_object_legal_hold(
4362 &self,
4363 req: S3Request<PutObjectLegalHoldInput>,
4364 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
4365 if let Some(mgr) = self.object_lock.as_ref() {
4366 let on = req
4367 .input
4368 .legal_hold
4369 .as_ref()
4370 .and_then(|h| h.status.as_ref())
4371 .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
4372 .unwrap_or(false);
4373 mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
4374 return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
4375 }
4376 self.backend.put_object_legal_hold(req).await
4377 }
    // GET ?retention — report the object's retention mode and retain-until
    // date from the in-process lock manager.
    async fn get_object_retention(
        &self,
        req: S3Request<GetObjectRetentionInput>,
    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            // Only objects that actually carry retention state produce a
            // Retention element; others yield an empty response.
            let retention = mgr
                .get(&req.input.bucket, &req.input.key)
                .filter(|s| s.mode.is_some() || s.retain_until.is_some())
                .map(|s| {
                    let mode = s.mode.map(|m| {
                        ObjectLockRetentionMode::from_static(match m {
                            crate::object_lock::LockMode::Governance => {
                                ObjectLockRetentionMode::GOVERNANCE
                            }
                            crate::object_lock::LockMode::Compliance => {
                                ObjectLockRetentionMode::COMPLIANCE
                            }
                        })
                    });
                    let until = s.retain_until.map(chrono_utc_to_timestamp);
                    ObjectLockRetention {
                        mode,
                        retain_until_date: until,
                    }
                });
            let output = GetObjectRetentionOutput { retention };
            return Ok(S3Response::new(output));
        }
        self.backend.get_object_retention(req).await
    }
    // PUT ?retention — set or tighten the object's retention.
    //
    // Enforces Object Lock rules: an active COMPLIANCE lock can never be
    // downgraded to Governance or shortened; an active GOVERNANCE lock can
    // only be shortened with x-amz-bypass-governance-retention. Mode and
    // retain-until are merged onto the existing state — a field missing from
    // the request leaves the previous value in place.
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            let key = req.input.key.clone();
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let retention = req.input.retention.as_ref().ok_or_else(|| {
                S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "PutObjectRetention requires a Retention element",
                )
            })?;
            let new_mode = retention
                .mode
                .as_ref()
                .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
            let new_until = retention
                .retain_until_date
                .as_ref()
                .map(timestamp_to_chrono_utc)
                .unwrap_or(None);
            let now = chrono::Utc::now();
            let existing = mgr.get(&bucket, &key).unwrap_or_default();
            // Active COMPLIANCE locks are immutable: no mode downgrade and
            // no shortening of the retain-until date.
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Compliance
                && existing.is_locked(now)
            {
                if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot downgrade Compliance retention to Governance while lock is active",
                    ));
                }
                if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                    && next < prev
                {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot shorten Compliance retention while lock is active",
                    ));
                }
            }
            // Active GOVERNANCE locks may only be shortened when the caller
            // explicitly requests the governance bypass.
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Governance
                && existing.is_locked(now)
                && !bypass
                && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                && next < prev
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
                ));
            }
            // Merge: only overwrite fields the request actually supplied.
            let mut state = existing;
            if new_mode.is_some() {
                state.mode = new_mode;
            }
            if new_until.is_some() {
                state.retain_until = new_until;
            }
            mgr.set(&bucket, &key, state);
            return Ok(S3Response::new(PutObjectRetentionOutput::default()));
        }
        self.backend.put_object_retention(req).await
    }
4481
4482 async fn get_bucket_versioning(
4488 &self,
4489 req: S3Request<GetBucketVersioningInput>,
4490 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
4491 if let Some(mgr) = self.versioning.as_ref() {
4496 let output = match mgr.state(&req.input.bucket).as_aws_status() {
4497 Some(s) => GetBucketVersioningOutput {
4498 status: Some(BucketVersioningStatus::from(s.to_owned())),
4499 ..Default::default()
4500 },
4501 None => GetBucketVersioningOutput::default(),
4502 };
4503 return Ok(S3Response::new(output));
4504 }
4505 self.backend.get_bucket_versioning(req).await
4506 }
    // PUT ?versioning — update bucket versioning state, enforcing MFA Delete
    // when the request tries to change the MfaDelete setting.
    //
    // MFA verification runs before the versioning state change so that a
    // bad or missing MFA header rejects the whole request.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        // MFA Delete: only engaged when an MFA manager is configured AND the
        // request carries an MfaDelete element.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && let Some(target_enabled) = req
                .input
                .versioning_configuration
                .mfa_delete
                .as_ref()
                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
        {
            let bucket = req.input.bucket.clone();
            let header = req.input.mfa.as_deref();
            let secret = mgr.lookup_secret(&bucket);
            // Valid only when the serial matches the registered device and
            // the TOTP code verifies against the current time.
            let verified = match (header, secret.as_ref()) {
                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
                    Ok((serial, code)) => {
                        serial == s.serial
                            && crate::mfa::verify_totp(
                                &s.secret_base32,
                                &code,
                                current_unix_secs(),
                            )
                    }
                    Err(_) => false,
                },
                _ => false,
            };
            if !verified {
                crate::metrics::record_mfa_delete_denial(&bucket);
                // Distinguish "no header at all" from "header present but bad".
                let err = if header.is_none() {
                    crate::mfa::MfaError::Missing
                } else {
                    crate::mfa::MfaError::InvalidCode
                };
                return Err(mfa_error_to_s3(err));
            }
            mgr.set_bucket_state(&bucket, target_enabled);
        }
        if let Some(mgr) = self.versioning.as_ref() {
            // Unknown or absent status strings fall back to Unversioned.
            let new_state = match req
                .input
                .versioning_configuration
                .status
                .as_ref()
                .map(|s| s.as_str())
            {
                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
                    crate::versioning::VersioningState::Enabled
                }
                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
                    crate::versioning::VersioningState::Suspended
                }
                _ => crate::versioning::VersioningState::Unversioned,
            };
            mgr.set_state(&req.input.bucket, new_state);
            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
        }
        self.backend.put_bucket_versioning(req).await
    }
4583
    // Bucket location and bucket-policy CRUD are not intercepted by this
    // layer; they pass straight through to the backing store.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }

    // Bucket policy documents are stored and evaluated by the backend.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }
4617
    // Bucket ACLs are handled entirely by the backend; no local state.
    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }
4631
4632 async fn get_bucket_cors(
4634 &self,
4635 req: S3Request<GetBucketCorsInput>,
4636 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
4637 if let Some(mgr) = self.cors.as_ref() {
4638 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4639 S3Error::with_message(
4640 S3ErrorCode::NoSuchCORSConfiguration,
4641 "The CORS configuration does not exist".to_string(),
4642 )
4643 })?;
4644 let rules: Vec<CORSRule> = cfg
4645 .rules
4646 .into_iter()
4647 .map(|r| CORSRule {
4648 allowed_headers: if r.allowed_headers.is_empty() {
4649 None
4650 } else {
4651 Some(r.allowed_headers)
4652 },
4653 allowed_methods: r.allowed_methods,
4654 allowed_origins: r.allowed_origins,
4655 expose_headers: if r.expose_headers.is_empty() {
4656 None
4657 } else {
4658 Some(r.expose_headers)
4659 },
4660 id: r.id,
4661 max_age_seconds: r.max_age_seconds.map(|s| s as i32),
4662 })
4663 .collect();
4664 return Ok(S3Response::new(GetBucketCorsOutput {
4665 cors_rules: Some(rules),
4666 }));
4667 }
4668 self.backend.get_bucket_cors(req).await
4669 }
4670 async fn put_bucket_cors(
4671 &self,
4672 req: S3Request<PutBucketCorsInput>,
4673 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4674 if let Some(mgr) = self.cors.as_ref() {
4675 let cfg = crate::cors::CorsConfig {
4676 rules: req
4677 .input
4678 .cors_configuration
4679 .cors_rules
4680 .into_iter()
4681 .map(|r| crate::cors::CorsRule {
4682 allowed_origins: r.allowed_origins,
4683 allowed_methods: r.allowed_methods,
4684 allowed_headers: r.allowed_headers.unwrap_or_default(),
4685 expose_headers: r.expose_headers.unwrap_or_default(),
4686 max_age_seconds: r.max_age_seconds.and_then(|s| {
4687 if s < 0 { None } else { Some(s as u32) }
4688 }),
4689 id: r.id,
4690 })
4691 .collect(),
4692 };
4693 mgr.put(&req.input.bucket, cfg);
4694 return Ok(S3Response::new(PutBucketCorsOutput::default()));
4695 }
4696 self.backend.put_bucket_cors(req).await
4697 }
4698 async fn delete_bucket_cors(
4699 &self,
4700 req: S3Request<DeleteBucketCorsInput>,
4701 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4702 if let Some(mgr) = self.cors.as_ref() {
4703 mgr.delete(&req.input.bucket);
4704 return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4705 }
4706 self.backend.delete_bucket_cors(req).await
4707 }
4708
4709 async fn get_bucket_lifecycle_configuration(
4711 &self,
4712 req: S3Request<GetBucketLifecycleConfigurationInput>,
4713 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4714 if let Some(mgr) = self.lifecycle.as_ref() {
4715 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4716 S3Error::with_message(
4717 S3ErrorCode::NoSuchLifecycleConfiguration,
4718 "The lifecycle configuration does not exist".to_string(),
4719 )
4720 })?;
4721 let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
4722 return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
4723 rules: Some(rules),
4724 transition_default_minimum_object_size: None,
4725 }));
4726 }
4727 self.backend.get_bucket_lifecycle_configuration(req).await
4728 }
4729 async fn put_bucket_lifecycle_configuration(
4730 &self,
4731 req: S3Request<PutBucketLifecycleConfigurationInput>,
4732 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
4733 if let Some(mgr) = self.lifecycle.as_ref() {
4734 let bucket = req.input.bucket.clone();
4735 let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
4736 let cfg = dto_lifecycle_to_internal(&dto_cfg);
4737 mgr.put(&bucket, cfg);
4738 return Ok(S3Response::new(
4739 PutBucketLifecycleConfigurationOutput::default(),
4740 ));
4741 }
4742 self.backend.put_bucket_lifecycle_configuration(req).await
4743 }
4744 async fn delete_bucket_lifecycle(
4745 &self,
4746 req: S3Request<DeleteBucketLifecycleInput>,
4747 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
4748 if let Some(mgr) = self.lifecycle.as_ref() {
4749 mgr.delete(&req.input.bucket);
4750 return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
4751 }
4752 self.backend.delete_bucket_lifecycle(req).await
4753 }
4754
    // GET ?tagging — return the bucket's tag set from the in-process manager.
    //
    // NOTE(review): buckets without tags currently return an empty TagSet;
    // real S3 answers with a NoSuchTagSet error instead — confirm whether
    // clients depend on the lenient behavior before changing it.
    async fn get_bucket_tagging(
        &self,
        req: S3Request<GetBucketTaggingInput>,
    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
        let Some(mgr) = self.tagging.as_ref() else {
            return self.backend.get_bucket_tagging(req).await;
        };
        let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
        Ok(S3Response::new(GetBucketTaggingOutput {
            tag_set: tagset_to_aws(&tags),
        }))
    }
4768 async fn put_bucket_tagging(
4769 &self,
4770 req: S3Request<PutBucketTaggingInput>,
4771 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
4772 let Some(mgr) = self.tagging.as_ref() else {
4773 return self.backend.put_bucket_tagging(req).await;
4774 };
4775 let bucket = req.input.bucket.clone();
4776 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4777 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4778 })?;
4779 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4780 mgr.put_bucket_tags(&bucket, parsed);
4781 Ok(S3Response::new(PutBucketTaggingOutput::default()))
4782 }
    // DELETE ?tagging — remove all tags from the bucket.
    //
    // The "s3:PutBucketTagging" action string is intentional: AWS IAM gates
    // DeleteBucketTagging behind the PutBucketTagging permission.
    async fn delete_bucket_tagging(
        &self,
        req: S3Request<DeleteBucketTaggingInput>,
    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
        let Some(mgr) = self.tagging.as_ref() else {
            return self.backend.delete_bucket_tagging(req).await;
        };
        let bucket = req.input.bucket.clone();
        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
        mgr.delete_bucket_tags(&bucket);
        Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
    }
4795
    // Bucket encryption configuration is owned by the backend (server-side
    // encryption of the stored bytes is handled there, not in this layer).
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }

    // Access logging configuration is also a pure pass-through.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
4829
4830 async fn get_bucket_notification_configuration(
4840 &self,
4841 req: S3Request<GetBucketNotificationConfigurationInput>,
4842 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
4843 if let Some(mgr) = self.notifications.as_ref() {
4844 let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
4845 let dto = notif_to_dto(&cfg);
4846 return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
4847 event_bridge_configuration: dto.event_bridge_configuration,
4848 lambda_function_configurations: dto.lambda_function_configurations,
4849 queue_configurations: dto.queue_configurations,
4850 topic_configurations: dto.topic_configurations,
4851 }));
4852 }
4853 self.backend
4854 .get_bucket_notification_configuration(req)
4855 .await
4856 }
4857 async fn put_bucket_notification_configuration(
4858 &self,
4859 req: S3Request<PutBucketNotificationConfigurationInput>,
4860 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
4861 if let Some(mgr) = self.notifications.as_ref() {
4862 let cfg = notif_from_dto(&req.input.notification_configuration);
4863 mgr.put(&req.input.bucket, cfg);
4864 return Ok(S3Response::new(
4865 PutBucketNotificationConfigurationOutput::default(),
4866 ));
4867 }
4868 self.backend
4869 .put_bucket_notification_configuration(req)
4870 .await
4871 }
4872
    // Requester-pays configuration is not intercepted; backend handles it.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }

    // Static-website hosting configuration is also a pure pass-through.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
4906
4907 async fn get_bucket_replication(
4909 &self,
4910 req: S3Request<GetBucketReplicationInput>,
4911 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
4912 if let Some(mgr) = self.replication.as_ref() {
4913 return match mgr.get(&req.input.bucket) {
4914 Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
4915 replication_configuration: Some(replication_to_dto(&cfg)),
4916 })),
4917 None => Err(S3Error::with_message(
4918 S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
4919 format!("no replication configuration on bucket {}", req.input.bucket),
4920 )),
4921 };
4922 }
4923 self.backend.get_bucket_replication(req).await
4924 }
4925 async fn put_bucket_replication(
4926 &self,
4927 req: S3Request<PutBucketReplicationInput>,
4928 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
4929 if let Some(mgr) = self.replication.as_ref() {
4930 let cfg = replication_from_dto(&req.input.replication_configuration);
4931 mgr.put(&req.input.bucket, cfg);
4932 return Ok(S3Response::new(PutBucketReplicationOutput::default()));
4933 }
4934 self.backend.put_bucket_replication(req).await
4935 }
4936 async fn delete_bucket_replication(
4937 &self,
4938 req: S3Request<DeleteBucketReplicationInput>,
4939 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
4940 if let Some(mgr) = self.replication.as_ref() {
4941 mgr.delete(&req.input.bucket);
4942 return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
4943 }
4944 self.backend.delete_bucket_replication(req).await
4945 }
4946
    // Transfer acceleration, ownership controls, and public-access-block
    // settings are all delegated untouched to the backing store.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }

    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }

    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
5000
5001 async fn select_object_content(
5020 &self,
5021 req: S3Request<SelectObjectContentInput>,
5022 ) -> S3Result<S3Response<SelectObjectContentOutput>> {
5023 use crate::select::{
5024 EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
5025 run_select_jsonlines,
5026 };
5027
5028 let select_bucket = req.input.bucket.clone();
5029 let select_key = req.input.key.clone();
5030 self.enforce_rate_limit(&req, &select_bucket)?;
5031 self.enforce_policy(
5032 &req,
5033 "s3:GetObject",
5034 &select_bucket,
5035 Some(&select_key),
5036 )?;
5037
5038 let request = req.input.request;
5039 let sql = request.expression.clone();
5040 if request.expression_type.as_str() != "SQL" {
5041 return Err(S3Error::with_message(
5042 S3ErrorCode::InvalidExpressionType,
5043 format!(
5044 "ExpressionType must be SQL, got: {}",
5045 request.expression_type.as_str()
5046 ),
5047 ));
5048 }
5049
5050 let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
5051 SelectInputFormat::JsonLines
5052 } else if let Some(csv) = request.input_serialization.csv.as_ref() {
5053 let has_header = csv
5054 .file_header_info
5055 .as_ref()
5056 .map(|h| {
5057 let s = h.as_str();
5058 s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
5059 })
5060 .unwrap_or(false);
5061 let delim = csv
5062 .field_delimiter
5063 .as_deref()
5064 .and_then(|s| s.chars().next())
5065 .unwrap_or(',');
5066 SelectInputFormat::Csv {
5067 has_header,
5068 delimiter: delim,
5069 }
5070 } else if request.input_serialization.parquet.is_some() {
5071 return Err(S3Error::with_message(
5072 S3ErrorCode::NotImplemented,
5073 "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
5074 ));
5075 } else {
5076 return Err(S3Error::with_message(
5077 S3ErrorCode::InvalidRequest,
5078 "InputSerialization requires exactly one of CSV / JSON / Parquet",
5079 ));
5080 };
5081 if let Some(ct) = request.input_serialization.compression_type.as_ref()
5082 && !ct.as_str().eq_ignore_ascii_case("NONE")
5083 {
5084 return Err(S3Error::with_message(
5085 S3ErrorCode::NotImplemented,
5086 format!(
5087 "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
5088 ct.as_str()
5089 ),
5090 ));
5091 }
5092
5093 let output_format = if request.output_serialization.json.is_some() {
5094 SelectOutputFormat::Json
5095 } else if request.output_serialization.csv.is_some() {
5096 SelectOutputFormat::Csv
5097 } else {
5098 return Err(S3Error::with_message(
5099 S3ErrorCode::InvalidRequest,
5100 "OutputSerialization requires exactly one of CSV / JSON",
5101 ));
5102 };
5103
5104 let get_input = GetObjectInput {
5105 bucket: select_bucket.clone(),
5106 key: select_key.clone(),
5107 sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
5108 sse_customer_key: req.input.sse_customer_key.clone(),
5109 sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
5110 ..Default::default()
5111 };
5112 let get_req = S3Request {
5113 input: get_input,
5114 method: http::Method::GET,
5115 uri: format!("/{}/{}", select_bucket, select_key)
5116 .parse()
5117 .map_err(|e| {
5118 S3Error::with_message(
5119 S3ErrorCode::InternalError,
5120 format!("constructing inner GET URI: {e}"),
5121 )
5122 })?,
5123 headers: http::HeaderMap::new(),
5124 extensions: http::Extensions::new(),
5125 credentials: req.credentials.clone(),
5126 region: req.region.clone(),
5127 service: req.service.clone(),
5128 trailing_headers: None,
5129 };
5130 let mut get_resp = self.get_object(get_req).await?;
5131 let blob = get_resp.output.body.take().ok_or_else(|| {
5132 S3Error::with_message(
5133 S3ErrorCode::InternalError,
5134 "Select: object body was empty after GET",
5135 )
5136 })?;
5137 let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
5138 .await
5139 .map_err(internal("collect Select body"))?;
5140 let scanned = body_bytes.len() as u64;
5141
5142 let matched_payload = match input_format {
5143 SelectInputFormat::JsonLines => {
5144 run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
5145 |e| select_error_to_s3(e, "JSON Lines"),
5146 )?
5147 }
5148 SelectInputFormat::Csv { .. } => {
5149 run_select_csv(&sql, &body_bytes, input_format, output_format)
5150 .map_err(|e| select_error_to_s3(e, "CSV"))?
5151 }
5152 };
5153
5154 let returned = matched_payload.len() as u64;
5155 let processed = scanned;
5156 let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
5157 if !matched_payload.is_empty() {
5158 events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
5159 payload: Some(bytes::Bytes::from(matched_payload)),
5160 })));
5161 }
5162 events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
5163 details: Some(Stats {
5164 bytes_scanned: Some(scanned as i64),
5165 bytes_processed: Some(processed as i64),
5166 bytes_returned: Some(returned as i64),
5167 }),
5168 })));
5169 events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
5170 let _writer = EventStreamWriter::new();
5173
5174 let stream =
5175 SelectObjectContentEventStream::new(futures::stream::iter(events));
5176 let output = SelectObjectContentOutput {
5177 payload: Some(stream),
5178 };
5179 Ok(S3Response::new(output))
5180 }
5181
5182 async fn put_bucket_inventory_configuration(
5196 &self,
5197 req: S3Request<PutBucketInventoryConfigurationInput>,
5198 ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
5199 if let Some(mgr) = self.inventory.as_ref() {
5200 let cfg = inv_from_dto(
5201 &req.input.bucket,
5202 &req.input.id,
5203 &req.input.inventory_configuration,
5204 );
5205 mgr.put(cfg);
5206 return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
5207 }
5208 self.backend.put_bucket_inventory_configuration(req).await
5209 }
5210
5211 async fn get_bucket_inventory_configuration(
5212 &self,
5213 req: S3Request<GetBucketInventoryConfigurationInput>,
5214 ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
5215 if let Some(mgr) = self.inventory.as_ref() {
5216 let cfg = mgr.get(&req.input.bucket, &req.input.id);
5217 if let Some(cfg) = cfg {
5218 let out = GetBucketInventoryConfigurationOutput {
5219 inventory_configuration: Some(inv_to_dto(&cfg)),
5220 };
5221 return Ok(S3Response::new(out));
5222 }
5223 let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
5230 .unwrap_or(S3ErrorCode::NoSuchKey);
5231 return Err(S3Error::with_message(
5232 code,
5233 format!(
5234 "no inventory configuration with id={} on bucket={}",
5235 req.input.id, req.input.bucket
5236 ),
5237 ));
5238 }
5239 self.backend.get_bucket_inventory_configuration(req).await
5240 }
5241
5242 async fn list_bucket_inventory_configurations(
5243 &self,
5244 req: S3Request<ListBucketInventoryConfigurationsInput>,
5245 ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
5246 if let Some(mgr) = self.inventory.as_ref() {
5247 let list = mgr.list_for_bucket(&req.input.bucket);
5248 let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
5249 let out = ListBucketInventoryConfigurationsOutput {
5250 continuation_token: req.input.continuation_token.clone(),
5251 inventory_configuration_list: if dto_list.is_empty() {
5252 None
5253 } else {
5254 Some(dto_list)
5255 },
5256 is_truncated: Some(false),
5257 next_continuation_token: None,
5258 };
5259 return Ok(S3Response::new(out));
5260 }
5261 self.backend.list_bucket_inventory_configurations(req).await
5262 }
5263
5264 async fn delete_bucket_inventory_configuration(
5265 &self,
5266 req: S3Request<DeleteBucketInventoryConfigurationInput>,
5267 ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
5268 if let Some(mgr) = self.inventory.as_ref() {
5269 mgr.delete(&req.input.bucket, &req.input.id);
5270 return Ok(S3Response::new(
5271 DeleteBucketInventoryConfigurationOutput::default(),
5272 ));
5273 }
5274 self.backend.delete_bucket_inventory_configuration(req).await
5275 }
5276}
5277
5278fn inv_from_dto(
5288 bucket: &str,
5289 id: &str,
5290 dto: &InventoryConfiguration,
5291) -> crate::inventory::InventoryConfig {
5292 let frequency_hours = match dto.schedule.frequency.as_str() {
5293 "Weekly" => 24 * 7,
5294 _ => 24,
5298 };
5299 let format = crate::inventory::InventoryFormat::Csv;
5303 crate::inventory::InventoryConfig {
5304 id: id.to_owned(),
5305 bucket: bucket.to_owned(),
5306 destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
5307 destination_prefix: dto
5308 .destination
5309 .s3_bucket_destination
5310 .prefix
5311 .clone()
5312 .unwrap_or_default(),
5313 frequency_hours,
5314 format,
5315 included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
5316 dto.included_object_versions.as_str(),
5317 ),
5318 }
5319}
5320
5321fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
5322 InventoryConfiguration {
5323 id: cfg.id.clone(),
5324 is_enabled: true,
5325 included_object_versions: InventoryIncludedObjectVersions::from(
5326 cfg.included_object_versions.as_aws_str().to_owned(),
5327 ),
5328 destination: InventoryDestination {
5329 s3_bucket_destination: InventoryS3BucketDestination {
5330 account_id: None,
5331 bucket: cfg.destination_bucket.clone(),
5332 encryption: None,
5333 format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
5334 prefix: if cfg.destination_prefix.is_empty() {
5335 None
5336 } else {
5337 Some(cfg.destination_prefix.clone())
5338 },
5339 },
5340 },
5341 schedule: InventorySchedule {
5342 frequency: InventoryFrequency::from(
5346 if cfg.frequency_hours == 24 * 7 {
5347 "Weekly"
5348 } else {
5349 "Daily"
5350 }
5351 .to_owned(),
5352 ),
5353 },
5354 filter: None,
5355 optional_fields: None,
5356 }
5357}
5358
5359fn notif_from_dto(
5376 dto: &NotificationConfiguration,
5377) -> crate::notifications::NotificationConfig {
5378 let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
5379 if let Some(topics) = dto.topic_configurations.as_ref() {
5380 for (idx, t) in topics.iter().enumerate() {
5381 let events = events_from_dto(&t.events);
5382 let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
5383 rules.push(crate::notifications::NotificationRule {
5384 id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
5385 events,
5386 destination: crate::notifications::Destination::Sns {
5387 topic_arn: t.topic_arn.clone(),
5388 },
5389 filter_prefix: prefix,
5390 filter_suffix: suffix,
5391 });
5392 }
5393 }
5394 if let Some(queues) = dto.queue_configurations.as_ref() {
5395 for (idx, q) in queues.iter().enumerate() {
5396 let events = events_from_dto(&q.events);
5397 let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
5398 rules.push(crate::notifications::NotificationRule {
5399 id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
5400 events,
5401 destination: crate::notifications::Destination::Sqs {
5402 queue_arn: q.queue_arn.clone(),
5403 },
5404 filter_prefix: prefix,
5405 filter_suffix: suffix,
5406 });
5407 }
5408 }
5409 crate::notifications::NotificationConfig { rules }
5410}
5411
5412fn notif_to_dto(
5413 cfg: &crate::notifications::NotificationConfig,
5414) -> NotificationConfiguration {
5415 let mut topics: Vec<TopicConfiguration> = Vec::new();
5416 let mut queues: Vec<QueueConfiguration> = Vec::new();
5417 for rule in &cfg.rules {
5418 let events: Vec<Event> = rule
5419 .events
5420 .iter()
5421 .map(|e| Event::from(e.as_aws_str().to_owned()))
5422 .collect();
5423 let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
5424 match &rule.destination {
5425 crate::notifications::Destination::Sns { topic_arn } => {
5426 topics.push(TopicConfiguration {
5427 events,
5428 filter,
5429 id: Some(rule.id.clone()),
5430 topic_arn: topic_arn.clone(),
5431 });
5432 }
5433 crate::notifications::Destination::Sqs { queue_arn } => {
5434 queues.push(QueueConfiguration {
5435 events,
5436 filter,
5437 id: Some(rule.id.clone()),
5438 queue_arn: queue_arn.clone(),
5439 });
5440 }
5441 crate::notifications::Destination::Webhook { .. } => {}
5446 }
5447 }
5448 NotificationConfiguration {
5449 event_bridge_configuration: None,
5450 lambda_function_configurations: None,
5451 queue_configurations: if queues.is_empty() { None } else { Some(queues) },
5452 topic_configurations: if topics.is_empty() { None } else { Some(topics) },
5453 }
5454}
5455
5456fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
5457 events
5458 .iter()
5459 .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
5460 .collect()
5461}
5462
5463fn filter_from_dto(
5464 f: Option<&NotificationConfigurationFilter>,
5465) -> (Option<String>, Option<String>) {
5466 let Some(f) = f else {
5467 return (None, None);
5468 };
5469 let Some(key) = f.key.as_ref() else {
5470 return (None, None);
5471 };
5472 let Some(rules) = key.filter_rules.as_ref() else {
5473 return (None, None);
5474 };
5475 let mut prefix = None;
5476 let mut suffix = None;
5477 for r in rules {
5478 let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
5479 let value = r.value.clone();
5480 match name.as_deref() {
5481 Some("prefix") => prefix = value,
5482 Some("suffix") => suffix = value,
5483 _ => {}
5484 }
5485 }
5486 (prefix, suffix)
5487}
5488
5489fn filter_to_dto(
5490 prefix: Option<&str>,
5491 suffix: Option<&str>,
5492) -> Option<NotificationConfigurationFilter> {
5493 if prefix.is_none() && suffix.is_none() {
5494 return None;
5495 }
5496 let mut rules: Vec<FilterRule> = Vec::new();
5497 if let Some(p) = prefix {
5498 rules.push(FilterRule {
5499 name: Some(FilterRuleName::from("prefix".to_owned())),
5500 value: Some(p.to_owned()),
5501 });
5502 }
5503 if let Some(s) = suffix {
5504 rules.push(FilterRule {
5505 name: Some(FilterRuleName::from("suffix".to_owned())),
5506 value: Some(s.to_owned()),
5507 });
5508 }
5509 Some(NotificationConfigurationFilter {
5510 key: Some(S3KeyFilter {
5511 filter_rules: Some(rules),
5512 }),
5513 })
5514}
5515
5516fn replication_from_dto(
5529 dto: &ReplicationConfiguration,
5530) -> crate::replication::ReplicationConfig {
5531 let rules = dto
5532 .rules
5533 .iter()
5534 .enumerate()
5535 .map(|(idx, r)| {
5536 let id = r
5537 .id
5538 .as_ref()
5539 .map(|s| s.as_str().to_owned())
5540 .unwrap_or_else(|| format!("rule-{idx}"));
5541 let priority = r.priority.unwrap_or(0).max(0) as u32;
5542 let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
5543 let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
5544 let destination_bucket = r.destination.bucket.clone();
5545 let destination_storage_class = r
5546 .destination
5547 .storage_class
5548 .as_ref()
5549 .map(|s| s.as_str().to_owned());
5550 crate::replication::ReplicationRule {
5551 id,
5552 priority,
5553 status_enabled,
5554 filter,
5555 destination_bucket,
5556 destination_storage_class,
5557 }
5558 })
5559 .collect();
5560 crate::replication::ReplicationConfig {
5561 role: dto.role.clone(),
5562 rules,
5563 }
5564}
5565
5566fn replication_to_dto(
5567 cfg: &crate::replication::ReplicationConfig,
5568) -> ReplicationConfiguration {
5569 let rules = cfg
5570 .rules
5571 .iter()
5572 .map(|r| {
5573 let status = if r.status_enabled {
5574 ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
5575 } else {
5576 ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
5577 };
5578 let destination = Destination {
5579 access_control_translation: None,
5580 account: None,
5581 bucket: r.destination_bucket.clone(),
5582 encryption_configuration: None,
5583 metrics: None,
5584 replication_time: None,
5585 storage_class: r
5586 .destination_storage_class
5587 .as_ref()
5588 .map(|s| StorageClass::from(s.clone())),
5589 };
5590 let filter = Some(replication_filter_to_dto(&r.filter));
5591 ReplicationRule {
5592 delete_marker_replication: None,
5593 destination,
5594 existing_object_replication: None,
5595 filter,
5596 id: Some(r.id.clone()),
5597 prefix: None,
5598 priority: Some(r.priority as i32),
5599 source_selection_criteria: None,
5600 status,
5601 }
5602 })
5603 .collect();
5604 ReplicationConfiguration {
5605 role: cfg.role.clone(),
5606 rules,
5607 }
5608}
5609
/// Merges the DTO-level replication filter with the deprecated rule-level
/// `prefix` into the internal `ReplicationFilter`.
///
/// Prefix precedence: the rule-level prefix wins, then `filter.prefix`,
/// then `filter.and.prefix`. Tags from both the single `tag` slot and the
/// `and.tags` list are accumulated in that order; a tag contributes only
/// when both key and value are present.
fn replication_filter_from_dto(
    f: Option<&ReplicationRuleFilter>,
    rule_level_prefix: Option<&str>,
) -> crate::replication::ReplicationFilter {
    let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
    let mut tags: Vec<(String, String)> = Vec::new();
    if let Some(f) = f {
        // Adopt the filter's prefix only when none has been set yet.
        if let Some(p) = f.prefix.as_ref()
            && prefix.is_none()
        {
            prefix = Some(p.clone());
        }
        if let Some(t) = f.tag.as_ref()
            && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
        {
            tags.push((k.clone(), v.clone()));
        }
        // The `And` operator can carry its own prefix and further tags.
        if let Some(and) = f.and.as_ref() {
            if let Some(p) = and.prefix.as_ref()
                && prefix.is_none()
            {
                prefix = Some(p.clone());
            }
            if let Some(ts) = and.tags.as_ref() {
                for t in ts {
                    if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
                        tags.push((k.clone(), v.clone()));
                    }
                }
            }
        }
    }
    crate::replication::ReplicationFilter { prefix, tags }
}
5644
5645fn replication_filter_to_dto(
5646 f: &crate::replication::ReplicationFilter,
5647) -> ReplicationRuleFilter {
5648 if f.tags.is_empty() {
5649 ReplicationRuleFilter {
5650 and: None,
5651 prefix: f.prefix.clone(),
5652 tag: None,
5653 }
5654 } else if f.tags.len() == 1 && f.prefix.is_none() {
5655 let (k, v) = &f.tags[0];
5656 ReplicationRuleFilter {
5657 and: None,
5658 prefix: None,
5659 tag: Some(Tag {
5660 key: Some(k.clone()),
5661 value: Some(v.clone()),
5662 }),
5663 }
5664 } else {
5665 let tags: Vec<Tag> = f
5666 .tags
5667 .iter()
5668 .map(|(k, v)| Tag {
5669 key: Some(k.clone()),
5670 value: Some(v.clone()),
5671 })
5672 .collect();
5673 ReplicationRuleFilter {
5674 and: Some(ReplicationRuleAndOperator {
5675 prefix: f.prefix.clone(),
5676 tags: Some(tags),
5677 }),
5678 prefix: None,
5679 tag: None,
5680 }
5681 }
5682}
5683
5684fn dto_lifecycle_to_internal(
5698 dto: &BucketLifecycleConfiguration,
5699) -> crate::lifecycle::LifecycleConfig {
5700 crate::lifecycle::LifecycleConfig {
5701 rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
5702 }
5703}
5704
/// Converts a single `LifecycleRule` DTO into the internal lifecycle rule.
///
/// Day counts are narrowed with `u32::try_from`; values that do not fit
/// (e.g. negative) become `None`. A transition is kept only when it has
/// both a day count and a storage class, so date-based transitions are
/// silently dropped.
fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
    // A missing filter falls back to the default internal filter.
    let filter = rule
        .filter
        .as_ref()
        .map(dto_filter_to_internal)
        .unwrap_or_default();
    let expiration_days = rule
        .expiration
        .as_ref()
        .and_then(|e| e.days)
        .and_then(|d| u32::try_from(d).ok());
    let expiration_date = rule
        .expiration
        .as_ref()
        .and_then(|e| e.date.as_ref())
        .and_then(timestamp_to_chrono_utc);
    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
        .transitions
        .as_ref()
        .map(|ts| {
            ts.iter()
                .filter_map(|t| {
                    // Skip transitions lacking valid days or a storage class.
                    let days = u32::try_from(t.days?).ok()?;
                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
                    Some(crate::lifecycle::TransitionRule {
                        days,
                        storage_class,
                    })
                })
                .collect()
        })
        .unwrap_or_default();
    let noncurrent_version_expiration_days = rule
        .noncurrent_version_expiration
        .as_ref()
        .and_then(|n| n.noncurrent_days)
        .and_then(|d| u32::try_from(d).ok());
    let abort_incomplete_multipart_upload_days = rule
        .abort_incomplete_multipart_upload
        .as_ref()
        .and_then(|a| a.days_after_initiation)
        .and_then(|d| u32::try_from(d).ok());
    crate::lifecycle::LifecycleRule {
        // Absent rule ids are stored as empty strings.
        id: rule.id.clone().unwrap_or_default(),
        status,
        filter,
        expiration_days,
        expiration_date,
        transitions,
        noncurrent_version_expiration_days,
        abort_incomplete_multipart_upload_days,
    }
}
5759
/// Flattens a `LifecycleRuleFilter` DTO (including its `And` operator) into
/// the internal `LifecycleFilter`.
///
/// Top-level values take precedence over those inside `and`; tags from
/// both levels are accumulated. Size bounds that do not fit in `u64`
/// (e.g. negative) are dropped.
fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
    let mut prefix = filter.prefix.clone();
    let mut tags: Vec<(String, String)> = Vec::new();
    let mut size_gt: Option<u64> = filter
        .object_size_greater_than
        .and_then(|n| u64::try_from(n).ok());
    let mut size_lt: Option<u64> = filter
        .object_size_less_than
        .and_then(|n| u64::try_from(n).ok());
    // A tag contributes only when both key and value are present.
    if let Some(t) = &filter.tag
        && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
    {
        tags.push((k.clone(), v.clone()));
    }
    // The `And` operator fills in whatever the top level left unset.
    if let Some(and) = &filter.and {
        if prefix.is_none() {
            prefix = and.prefix.clone();
        }
        if size_gt.is_none() {
            size_gt = and
                .object_size_greater_than
                .and_then(|n| u64::try_from(n).ok());
        }
        if size_lt.is_none() {
            size_lt = and
                .object_size_less_than
                .and_then(|n| u64::try_from(n).ok());
        }
        if let Some(ts) = &and.tags {
            for t in ts {
                if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
                    tags.push((k.clone(), v.clone()));
                }
            }
        }
    }
    crate::lifecycle::LifecycleFilter {
        prefix,
        tags,
        object_size_greater_than: size_gt,
        object_size_less_than: size_lt,
    }
}
5803
/// Converts an internal lifecycle rule back into the S3 `LifecycleRule` DTO.
///
/// The filter is re-encoded in the most compact DTO shape: a plain `prefix`
/// when only a prefix is set, a single `tag` when that is the only
/// component, and an `And` operator for any other combination. An empty
/// rule id serializes as `None`.
fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
    // Emit an expiration element only when days or a date is configured.
    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
        Some(LifecycleExpiration {
            date: rule.expiration_date.map(chrono_utc_to_timestamp),
            days: rule.expiration_days.map(|d| d as i32),
            expired_object_delete_marker: None,
        })
    } else {
        None
    };
    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
        None
    } else {
        Some(
            rule.transitions
                .iter()
                .map(|t| Transition {
                    date: None,
                    days: Some(t.days as i32),
                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
                })
                .collect(),
        )
    };
    let noncurrent_version_expiration =
        rule.noncurrent_version_expiration_days
            .map(|d| NoncurrentVersionExpiration {
                newer_noncurrent_versions: None,
                noncurrent_days: Some(d as i32),
            });
    let abort_incomplete_multipart_upload =
        rule.abort_incomplete_multipart_upload_days
            .map(|d| AbortIncompleteMultipartUpload {
                days_after_initiation: Some(d as i32),
            });
    // Pick the most compact DTO filter encoding that preserves the rule.
    let filter = if rule.filter.tags.is_empty()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        // Prefix-only (or empty) filter: use the plain `prefix` slot.
        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: Some(p.clone()),
            tag: None,
        })
    } else if rule.filter.tags.len() == 1
        && rule.filter.prefix.is_none()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        // Exactly one tag and nothing else: use the single `tag` slot.
        let (k, v) = rule.filter.tags[0].clone();
        Some(LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: Some(Tag {
                key: Some(k),
                value: Some(v),
            }),
        })
    } else {
        // Any other combination must be wrapped in an `And` operator.
        let tags = if rule.filter.tags.is_empty() {
            None
        } else {
            Some(
                rule.filter
                    .tags
                    .iter()
                    .map(|(k, v)| Tag {
                        key: Some(k.clone()),
                        value: Some(v.clone()),
                    })
                    .collect(),
            )
        };
        Some(LifecycleRuleFilter {
            and: Some(LifecycleRuleAndOperator {
                object_size_greater_than: rule
                    .filter
                    .object_size_greater_than
                    .and_then(|n| i64::try_from(n).ok()),
                object_size_less_than: rule
                    .filter
                    .object_size_less_than
                    .and_then(|n| i64::try_from(n).ok()),
                prefix: rule.filter.prefix.clone(),
                tags,
            }),
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: None,
        })
    };
    LifecycleRule {
        abort_incomplete_multipart_upload,
        expiration,
        filter,
        id: if rule.id.is_empty() {
            None
        } else {
            Some(rule.id.clone())
        },
        noncurrent_version_expiration,
        noncurrent_version_transitions: None,
        prefix: None,
        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
        transitions,
    }
}
5916
/// Pre-routing gate that verifies SigV4a-signed requests against a shared
/// credential store before they reach the S3 handlers.
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    // Shared lookup table mapping access-key ids to verification keys.
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}
5951
5952impl SigV4aGate {
5953 #[must_use]
5954 pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
5955 Self { store }
5956 }
5957
5958 pub fn pre_route<B>(
5974 &self,
5975 req: &http::Request<B>,
5976 requested_region: &str,
5977 canonical_request_bytes: &[u8],
5978 ) -> Result<(), SigV4aGateError> {
5979 if !crate::sigv4a::detect(req) {
5980 return Ok(());
5981 }
5982 let auth_hdr = req
5983 .headers()
5984 .get(http::header::AUTHORIZATION)
5985 .and_then(|v| v.to_str().ok())
5986 .ok_or(SigV4aGateError::MissingAuthorization)?;
5987 let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
5988 .ok_or(SigV4aGateError::MalformedAuthorization)?;
5989 let region_set = req
5990 .headers()
5991 .get(crate::sigv4a::REGION_SET_HEADER)
5992 .and_then(|v| v.to_str().ok())
5993 .unwrap_or("*");
5994 let key = self
5995 .store
5996 .get(&parsed.access_key_id)
5997 .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
5998 crate::sigv4a::verify(
5999 &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
6000 &parsed.signature_der,
6001 key,
6002 region_set,
6003 requested_region,
6004 )
6005 .map_err(SigV4aGateError::Verify)?;
6006 Ok(())
6007 }
6008}
6009
/// Failure modes surfaced by [`SigV4aGate::pre_route`].
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    /// The request carried no `Authorization` header.
    #[error("missing Authorization header")]
    MissingAuthorization,
    /// The `Authorization` header could not be parsed as SigV4a.
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    /// No credential with the given access-key id exists in the store.
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    /// The signature failed cryptographic verification.
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}
6024
6025impl SigV4aGateError {
6026 #[must_use]
6028 pub fn s3_error_code(&self) -> &'static str {
6029 match self {
6030 Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
6031 _ => "SignatureDoesNotMatch",
6032 }
6033 }
6034}
6035
#[cfg(test)]
mod tests {
    use super::*;

    // Writing a manifest into metadata and extracting it must preserve
    // every field.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    // Absent metadata cannot yield a manifest.
    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    // Metadata containing only the codec key (no sizes/crc) is rejected.
    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }

    // A well-formed closed range parses into an Int range.
    #[test]
    fn parse_copy_source_range_basic() {
        let r = parse_copy_source_range("bytes=10-20").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    // last < first must be rejected with a descriptive message.
    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    // The "bytes=" prefix is mandatory.
    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    // Open-ended and suffix ranges are not supported for copy-source.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        assert!(parse_copy_source_range("bytes=10-").is_err());
        assert!(parse_copy_source_range("bytes=-10").is_err());
    }

    // Plain ASCII keys pass through unencoded.
    #[test]
    fn safe_object_uri_basic_ascii() {
        let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
        assert_eq!(uri.path(), "/bucket/key");
    }

    // Spaces must be percent-encoded so the URI parses.
    #[test]
    fn safe_object_uri_encodes_spaces() {
        let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
        assert!(
            uri.path().contains("%20"),
            "expected percent-encoded space, got {}",
            uri.path()
        );
        assert!(uri.path().starts_with("/bucket/"));
    }

    // Path separators inside keys must survive encoding unchanged.
    #[test]
    fn safe_object_uri_preserves_slashes() {
        let uri =
            safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
        assert_eq!(uri.path(), "/bucket/key/with/slashes");
    }

    // Control characters may be rejected but must never panic.
    #[test]
    fn safe_object_uri_handles_newline_without_panic() {
        let _ = safe_object_uri("bucket", "key\n");
    }

    #[test]
    fn safe_object_uri_handles_null_byte_without_panic() {
        let _ = safe_object_uri("bucket", "key\0bad");
    }

    // Non-ASCII and direction-control characters must not panic either.
    #[test]
    fn safe_object_uri_handles_unicode_without_panic() {
        let _ = safe_object_uri("bucket", "rtl\u{202E}override");
        let _ = safe_object_uri("bucket", "\u{FEFF}bom-key");
        let _ = safe_object_uri("bucket", "日本語キー");
    }

    // Exhaustive single-byte sweep: no input byte may cause a panic.
    #[test]
    fn safe_object_uri_no_panic_for_every_byte() {
        for b in 0u8..=255 {
            let s = String::from_utf8_lossy(&[b]).into_owned();
            let _ = safe_object_uri("bucket", &s);
        }
    }
}