use std::sync::Arc;

use base64::Engine as _;
use bytes::BytesMut;
use s3s::dto::*;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
use s4_codec::multipart::{
    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
    write_frame,
};
use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
use std::time::Instant;
use tracing::{debug, info};

use crate::blob::{
    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
};
use crate::streaming::{
    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
    supports_streaming_compress, supports_streaming_decompress,
};
const SAMPLE_BYTES: usize = 4096;

const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'#')
    .add(b'<')
    .add(b'>')
    .add(b'?')
    .add(b'`')
    .add(b'{')
    .add(b'}')
    .add(b'|')
    .add(b'\\')
    .add(b'^')
    .add(b'[')
    .add(b']')
    .add(b'%');

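/// Builds a synthetic request URI of the form `/{bucket}/{key}` for internal
/// backend calls (sidecar reads and writes), percent-encoding any characters
/// from `URI_KEY_ENCODE_SET` that `http::Uri` would otherwise reject. Returns
/// `InvalidObjectName` (falling back to `InvalidArgument`) when the encoded
/// path still fails to parse.
///
/// Illustrative sketch of the encoding (derived from the set above, not a doctest):
///
/// ```text
/// safe_object_uri("photos", "summer album/img 1.png")
///   => Ok(/photos/summer%20album/img%201.png)
/// ```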
pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
    use percent_encoding::utf8_percent_encode;
    let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
    let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
    let raw = format!("/{bucket_enc}/{key_enc}");
    raw.parse::<http::Uri>().map_err(|e| {
        let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
            .unwrap_or(S3ErrorCode::InvalidArgument);
        S3Error::with_message(
            code,
            format!("object key cannot be encoded as a request URI: {e}"),
        )
    })
}

struct AccessLogPreamble {
    remote_ip: Option<String>,
    requester: Option<String>,
    request_uri: String,
    user_agent: Option<String>,
}

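/// S3-compatible gateway service that wraps an inner [`S3`] backend and adds
/// transparent compression (via the codec registry/dispatcher) plus optional
/// policy enforcement, rate limiting, access logging, SSE, versioning, object
/// lock, CORS, inventory, notifications, lifecycle, tagging, replication and
/// MFA-delete features. Every optional subsystem defaults to "off" and is
/// enabled through the `with_*` builder methods below.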
pub struct S4Service<B: S3> {
    backend: Arc<B>,
    registry: Arc<CodecRegistry>,
    dispatcher: Arc<dyn CodecDispatcher>,
    max_body_bytes: usize,
    policy: Option<crate::policy::SharedPolicy>,
    secure_transport: bool,
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    access_log: Option<crate::access_log::SharedAccessLog>,
    sse_keyring: Option<crate::sse::SharedSseKeyring>,
    versioning: Option<Arc<crate::versioning::VersioningManager>>,
    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
    kms_default_key_id: Option<String>,
    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
    cors: Option<Arc<crate::cors::CorsManager>>,
    inventory: Option<Arc<crate::inventory::InventoryManager>>,
    notifications: Option<Arc<crate::notifications::NotificationManager>>,
    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
    tagging: Option<Arc<crate::tagging::TagManager>>,
    replication: Option<Arc<crate::replication::ReplicationManager>>,
    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
    compliance_strict: bool,
    sigv4a_gate: Option<Arc<SigV4aGate>>,
}

impl<B: S3> S4Service<B> {
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

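    /// Creates a service with all optional subsystems disabled and the body
    /// buffer limit set to [`Self::DEFAULT_MAX_BODY_BYTES`].
    ///
    /// A minimal construction sketch (the `backend`, `registry` and
    /// `dispatcher` values are placeholders, not real constructors from this
    /// crate):
    ///
    /// ```text
    /// let svc = S4Service::new(backend, registry, dispatcher)
    ///     .with_max_body_bytes(1 << 30)
    ///     .with_secure_transport(true);
    /// ```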
    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend: Arc::new(backend),
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
            secure_transport: false,
            rate_limits: None,
            access_log: None,
            sse_keyring: None,
            versioning: None,
            kms: None,
            kms_default_key_id: None,
            object_lock: None,
            cors: None,
            inventory: None,
            notifications: None,
            lifecycle: None,
            tagging: None,
            replication: None,
            mfa_delete: None,
            compliance_strict: false,
            sigv4a_gate: None,
        }
    }

    #[must_use]
    pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
        self.sigv4a_gate = Some(gate);
        self
    }

    #[must_use]
    pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
        self.sigv4a_gate.as_ref()
    }

    #[must_use]
    pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
        self.tagging = Some(mgr);
        self
    }

    #[must_use]
    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
        self.tagging.as_ref()
    }

    #[must_use]
    pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
        self.inventory = Some(mgr);
        self
    }

    #[must_use]
    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
        self.inventory.as_ref()
    }

    #[must_use]
    pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
        self.lifecycle = Some(mgr);
        self
    }

    #[must_use]
    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
        self.lifecycle.as_ref()
    }

    #[must_use]
    pub fn run_lifecycle_once_for_test(
        &self,
        bucket: &str,
        objects: &[crate::lifecycle::EvaluateBatchEntry],
    ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
        let Some(mgr) = self.lifecycle.as_ref() else {
            return Vec::new();
        };
        crate::lifecycle::evaluate_batch(mgr, bucket, objects)
    }

    #[must_use]
    pub fn with_notifications(
        mut self,
        mgr: Arc<crate::notifications::NotificationManager>,
    ) -> Self {
        self.notifications = Some(mgr);
        self
    }

    #[must_use]
    pub fn notifications_manager(
        &self,
    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
        self.notifications.as_ref()
    }

    fn fire_delete_notification(
        &self,
        bucket: &str,
        key: &str,
        event: crate::notifications::EventType,
        version_id: Option<String>,
    ) {
        let Some(mgr) = self.notifications.as_ref() else {
            return;
        };
        let dests = mgr.match_destinations(bucket, &event, key);
        if dests.is_empty() {
            return;
        }
        tokio::spawn(crate::notifications::dispatch_event(
            Arc::clone(mgr),
            bucket.to_owned(),
            key.to_owned(),
            event,
            None,
            None,
            version_id,
            format!("S4-{}", uuid::Uuid::new_v4()),
        ));
    }

    #[must_use]
    pub fn with_replication(
        mut self,
        mgr: Arc<crate::replication::ReplicationManager>,
    ) -> Self {
        self.replication = Some(mgr);
        self
    }

    #[must_use]
    pub fn replication_manager(
        &self,
    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
        self.replication.as_ref()
    }

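    /// Best-effort asynchronous replication hook used by the PUT path: when a
    /// replication rule matches the source bucket/key/tags, the object's
    /// status is recorded as `Pending` and a background task re-PUTs the
    /// already-compressed (and possibly encrypted) body to the destination
    /// through the same backend. Errors from the destination PUT are surfaced
    /// to `crate::replication::replicate_object`; they never fail the
    /// original request.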
    fn spawn_replication_if_matched(
        &self,
        source_bucket: &str,
        source_key: &str,
        request_tags: &Option<crate::tagging::TagSet>,
        body: &bytes::Bytes,
        metadata: &Option<std::collections::HashMap<String, String>>,
        backend_ok: bool,
    ) where
        B: Send + Sync + 'static,
    {
        if !backend_ok {
            return;
        }
        let Some(mgr) = self.replication.as_ref() else {
            return;
        };
        let object_tags: Vec<(String, String)> = request_tags
            .as_ref()
            .map(|ts| ts.iter().cloned().collect())
            .unwrap_or_default();
        let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
            return;
        };
        mgr.record_status(
            source_bucket,
            source_key,
            crate::replication::ReplicationStatus::Pending,
        );
        let mgr_cl = Arc::clone(mgr);
        let backend = Arc::clone(&self.backend);
        let body_cl = body.clone();
        let metadata_cl = metadata.clone();
        let source_bucket_cl = source_bucket.to_owned();
        let source_key_cl = source_key.to_owned();
        tokio::spawn(async move {
            let do_put = move |dest_bucket: String,
                               dest_key: String,
                               dest_body: bytes::Bytes,
                               dest_meta: Option<std::collections::HashMap<String, String>>| {
                let backend = Arc::clone(&backend);
                async move {
                    let req = S3Request {
                        input: PutObjectInput {
                            bucket: dest_bucket,
                            key: dest_key,
                            body: Some(bytes_to_blob(dest_body)),
                            metadata: dest_meta,
                            ..Default::default()
                        },
                        method: http::Method::PUT,
                        uri: "/".parse().unwrap(),
                        headers: http::HeaderMap::new(),
                        extensions: http::Extensions::new(),
                        credentials: None,
                        region: None,
                        service: None,
                        trailing_headers: None,
                    };
                    backend
                        .put_object(req)
                        .await
                        .map(|_| ())
                        .map_err(|e| format!("destination put_object: {e}"))
                }
            };
            crate::replication::replicate_object(
                rule,
                source_bucket_cl,
                source_key_cl,
                body_cl,
                metadata_cl,
                do_put,
                mgr_cl,
            )
            .await;
        });
    }

    #[must_use]
    pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
        self.mfa_delete = Some(mgr);
        self
    }

    #[must_use]
    pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
        self.mfa_delete.as_ref()
    }

    #[must_use]
    pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
        self.cors = Some(mgr);
        self
    }

    #[must_use]
    pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
        self.cors.as_ref()
    }

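    /// Evaluates a CORS preflight (`OPTIONS`) request against the bucket's
    /// configured rules. Returns `None` when no rule matches; otherwise a map
    /// of response headers. An illustrative result for a wildcard-origin rule
    /// allowing GET and PUT (the actual values depend entirely on the stored
    /// rule):
    ///
    /// ```text
    /// Access-Control-Allow-Origin:  *
    /// Access-Control-Allow-Methods: GET, PUT
    /// Access-Control-Max-Age:       3600
    /// ```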
    #[must_use]
    pub fn handle_preflight(
        &self,
        bucket: &str,
        origin: &str,
        method: &str,
        request_headers: &[String],
    ) -> Option<std::collections::HashMap<String, String>> {
        let mgr = self.cors.as_ref()?;
        let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
        let mut h = std::collections::HashMap::new();
        let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
            "*".to_string()
        } else {
            origin.to_string()
        };
        h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
        h.insert(
            "Access-Control-Allow-Methods".to_string(),
            rule.allowed_methods.join(", "),
        );
        if !rule.allowed_headers.is_empty() {
            h.insert(
                "Access-Control-Allow-Headers".to_string(),
                rule.allowed_headers.join(", "),
            );
        }
        if let Some(secs) = rule.max_age_seconds {
            h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
        }
        if !rule.expose_headers.is_empty() {
            h.insert(
                "Access-Control-Expose-Headers".to_string(),
                rule.expose_headers.join(", "),
            );
        }
        Some(h)
    }

    #[must_use]
    pub fn with_compliance_strict(mut self, on: bool) -> Self {
        self.compliance_strict = on;
        self
    }

    #[must_use]
    pub fn with_object_lock(
        mut self,
        mgr: Arc<crate::object_lock::ObjectLockManager>,
    ) -> Self {
        self.object_lock = Some(mgr);
        self
    }

    #[must_use]
    pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
        self.object_lock.as_ref()
    }

    #[must_use]
    pub fn with_kms_backend(
        mut self,
        kms: Arc<dyn crate::kms::KmsBackend>,
        default_key_id: Option<String>,
    ) -> Self {
        self.kms = Some(kms);
        self.kms_default_key_id = default_key_id;
        self
    }

    #[must_use]
    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
        self.versioning = Some(mgr);
        self
    }

    #[must_use]
    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
        let keyring = crate::sse::SseKeyring::new(1, key);
        self.sse_keyring = Some(std::sync::Arc::new(keyring));
        self
    }

    #[must_use]
    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
        self.sse_keyring = Some(keyring);
        self
    }

    #[must_use]
    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
        self.access_log = Some(log);
        self
    }

    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
        self.access_log.as_ref()?;
        Some(AccessLogPreamble {
            remote_ip: req
                .headers
                .get("x-forwarded-for")
                .and_then(|v| v.to_str().ok())
                .and_then(|raw| raw.split(',').next())
                .map(|s| s.trim().to_owned()),
            requester: Self::principal_of(req).map(str::to_owned),
            request_uri: format!("{} {}", req.method, req.uri.path()),
            user_agent: req
                .headers
                .get("user-agent")
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned),
        })
    }

    #[allow(clippy::too_many_arguments)]
    async fn record_access(
        &self,
        preamble: Option<AccessLogPreamble>,
        operation: &'static str,
        bucket: &str,
        key: Option<&str>,
        http_status: u16,
        bytes_sent: u64,
        object_size: u64,
        total_time_ms: u64,
        error_code: Option<&str>,
    ) {
        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
            return;
        };
        log.record(crate::access_log::AccessLogEntry {
            time: std::time::SystemTime::now(),
            bucket: bucket.to_owned(),
            remote_ip: p.remote_ip,
            requester: p.requester,
            operation,
            key: key.map(str::to_owned),
            request_uri: p.request_uri,
            http_status,
            error_code: error_code.map(str::to_owned),
            bytes_sent,
            object_size,
            total_time_ms,
            user_agent: p.user_agent,
        })
        .await;
    }

    #[must_use]
    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
        self.rate_limits = Some(rl);
        self
    }

    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
        let Some(rl) = self.rate_limits.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        if !rl.check(principal_id, bucket) {
            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
            return Err(S3Error::with_message(
                S3ErrorCode::SlowDown,
                format!("rate-limited: bucket={bucket}"),
            ));
        }
        Ok(())
    }

    #[must_use]
    pub fn with_secure_transport(mut self, on: bool) -> Self {
        self.secure_transport = on;
        self
    }

    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }

    #[must_use]
    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
        self.policy = Some(policy);
        self
    }

    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
        req.credentials.as_ref().map(|c| c.access_key.as_str())
    }

    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
        let user_agent = req
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        let source_ip = req
            .headers
            .get("x-forwarded-for")
            .and_then(|v| v.to_str().ok())
            .and_then(|raw| raw.split(',').next())
            .and_then(|s| s.trim().parse().ok());
        crate::policy::RequestContext {
            source_ip,
            user_agent,
            request_time: Some(std::time::SystemTime::now()),
            secure_transport: self.secure_transport,
            existing_object_tags: None,
            request_object_tags: None,
            extra: Default::default(),
        }
    }

    fn enforce_policy<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
    ) -> S3Result<()> {
        self.enforce_policy_with_extra(req, action, bucket, key, None, None)
    }

    fn enforce_policy_with_extra<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
        request_tags: Option<&crate::tagging::TagSet>,
        existing_tags: Option<&crate::tagging::TagSet>,
    ) -> S3Result<()> {
        let Some(policy) = self.policy.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        let mut ctx = self.request_context(req);
        if let Some(t) = request_tags {
            ctx.request_object_tags = Some(t.clone());
        }
        if let Some(t) = existing_tags {
            ctx.existing_object_tags = Some(t.clone());
        }
        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
        if decision.allow {
            Ok(())
        } else {
            crate::metrics::record_policy_denial(action, bucket);
            tracing::info!(
                action,
                bucket,
                key = ?key,
                principal = ?principal_id,
                source_ip = ?ctx.source_ip,
                user_agent = ?ctx.user_agent,
                secure_transport = ctx.secure_transport,
                matched_sid = ?decision.matched_sid,
                effect = ?decision.matched_effect,
                "S4 policy denied request"
            );
            Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                format!("denied by S4 policy: {action} on bucket={bucket}"),
            ))
        }
    }

    pub fn into_backend(self) -> B {
        Arc::try_unwrap(self.backend)
            .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
    }

    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }

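    /// Persists the frame index for a framed (S4F2) object as a separate
    /// sidecar object next to the data (key derived by `sidecar_key`). The
    /// write is best-effort: if the key cannot be URI-encoded or the backend
    /// PUT fails, a warning is logged and Range GETs simply fall back to
    /// reading and decompressing the full object.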
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let sidecar = sidecar_key(key);
        let uri = match safe_object_uri(bucket, &sidecar) {
            Ok(u) => u,
            Err(e) => {
                tracing::warn!(
                    bucket,
                    key,
                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
                );
                return;
            }
        };
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }

    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let sidecar = sidecar_key(key);
        let uri = safe_object_uri(bucket, &sidecar).ok()?;
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }

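    /// Decodes a body made of concatenated S4 frames (multipart uploads and
    /// S4F2 framed objects): each frame header carries the codec and sizes,
    /// which are turned into a per-frame [`ChunkManifest`] and decompressed
    /// through the registry, then the plaintext chunks are concatenated in
    /// order.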
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
}

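/// Parses an `x-amz-copy-source-range` header of the form `bytes=N-M` (both
/// bounds required and inclusive, unlike a regular `Range` header). For
/// example, `"bytes=0-99"` parses to `Range::Int { first: 0, last: Some(99) }`;
/// anything else yields a descriptive error string.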
fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
    let rest = s
        .strip_prefix("bytes=")
        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
    let (a, b) = rest
        .split_once('-')
        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
    let first: u64 = a
        .parse()
        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
    let last: u64 = b
        .parse()
        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
    if last < first {
        return Err(format!("CopySourceRange last < first: {s:?}"));
    }
    Ok(s3s::dto::Range::Int {
        first,
        last: Some(last),
    })
}

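/// Derives the backend key under which a specific object version is stored.
/// The shadow namespace keeps versioned copies out of the way of the live key:
/// `versioned_shadow_key("reports/q1.csv", "v42")` returns
/// `"reports/q1.csv.__s4ver__/v42"`.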
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    format!("{key}.__s4ver__/{version_id}")
}

fn is_versioning_shadow_key(key: &str) -> bool {
    key.contains(".__s4ver__/")
}

fn current_unix_secs() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
    match e {
        crate::mfa::MfaError::Missing => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "MFA token required for this operation",
        ),
        crate::mfa::MfaError::Malformed => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "malformed x-amz-mfa header",
        ),
        crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "MFA serial does not match configured device",
        ),
        crate::mfa::MfaError::InvalidCode => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "invalid MFA code",
        ),
    }
}

fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_MULTIPART))
        .map(|v| v == "true")
        .unwrap_or(false)
}

const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
const META_MULTIPART: &str = "s4-multipart";
const META_FRAMED: &str = "s4-framed";

fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_FRAMED))
        .map(|v| v == "true")
        .unwrap_or(false)
}

fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get("s4-encrypted"))
        .map(|v| v == "aes-256-gcm")
        .unwrap_or(false)
}

fn extract_sse_c_material(
    algorithm: &Option<String>,
    key: &Option<String>,
    md5: &Option<String>,
) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
    match (algorithm, key, md5) {
        (None, None, None) => Ok(None),
        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
            .map(Some)
            .map_err(sse_c_error_to_s3),
        _ => Err(S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
        )),
    }
}

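/// Resolves the KMS key to use for SSE-KMS: returns `None` unless the request
/// explicitly asked for `aws:kms`, and in that case prefers the key id from
/// the request headers, falling back to the gateway-wide default key.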
fn extract_kms_key_id(
    sse: &Option<ServerSideEncryption>,
    sse_kms_key_id: &Option<String>,
    gateway_default: Option<&str>,
) -> Option<String> {
    let asks_for_kms = sse
        .as_ref()
        .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
        .unwrap_or(false);
    if !asks_for_kms {
        return None;
    }
    sse_kms_key_id
        .clone()
        .or_else(|| gateway_default.map(str::to_owned))
}

fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
    use crate::kms::KmsError as K;
    match e {
        K::KeyNotFound { key_id } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("KMS key not found: {key_id}"),
        ),
        K::BackendUnavailable { message } => S3Error::with_message(
            S3ErrorCode::ServiceUnavailable,
            format!("KMS backend unavailable: {message}"),
        ),
        other => S3Error::with_message(
            S3ErrorCode::InternalError,
            format!("KMS error: {other}"),
        ),
    }
}

fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
    use crate::sse::SseError as E;
    match e {
        E::WrongCustomerKey => S3Error::with_message(
            S3ErrorCode::AccessDenied,
            "SSE-C key does not match the key used at PUT time",
        ),
        E::InvalidCustomerKey { reason } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("SSE-C: {reason}"),
        ),
        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
            S3ErrorCode::InvalidArgument,
            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
        ),
        E::CustomerKeyRequired => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
        ),
        E::CustomerKeyUnexpected => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
        ),
        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
    }
}

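/// Reads the S4 compression manifest back out of user metadata. The manifest
/// round-trips through four keys written by `write_manifest` below:
/// `s4-codec`, `s4-original-size`, `s4-compressed-size` and `s4-crc32c`.
/// Returns `None` when any key is missing or unparsable, which the GET path
/// treats as "not an S4-compressed object".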
fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
    let m = metadata.as_ref()?;
    let codec = m
        .get(META_CODEC)
        .and_then(|s| s.parse::<CodecKind>().ok())?;
    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
    Some(ChunkManifest {
        codec,
        original_size,
        compressed_size,
        crc32c,
    })
}

fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
    let meta = metadata.get_or_insert_with(Default::default);
    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
    meta.insert(
        META_ORIGINAL_SIZE.into(),
        manifest.original_size.to_string(),
    );
    meta.insert(
        META_COMPRESSED_SIZE.into(),
        manifest.compressed_size.to_string(),
    );
    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
}

fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}

fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
    use crate::select::SelectError;
    match e {
        SelectError::Parse(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("SQL parse error: {msg}"),
        ),
        SelectError::UnsupportedFeature(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("unsupported SQL feature: {msg}"),
        ),
        SelectError::RowEval(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("SQL row evaluation error: {msg}"),
        ),
        SelectError::InputFormat(msg) => S3Error::with_message(
            S3ErrorCode::InvalidRequest,
            format!("{fmt} input format error: {msg}"),
        ),
    }
}

fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
    headers
        .get("x-amz-bypass-governance-retention")
        .and_then(|v| v.to_str().ok())
        .map(|s| s.eq_ignore_ascii_case("true"))
        .unwrap_or(false)
}

fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
    let mut buf = Vec::new();
    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
    let s = std::str::from_utf8(&buf).ok()?;
    chrono::DateTime::parse_from_rfc3339(s)
        .ok()
        .map(|dt| dt.with_timezone(&chrono::Utc))
}

fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
}

fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
    set.iter()
        .map(|(k, v)| Tag {
            key: Some(k.clone()),
            value: Some(v.clone()),
        })
        .collect()
}

fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
    let pairs = tags
        .iter()
        .map(|t| {
            (
                t.key.clone().unwrap_or_default(),
                t.value.clone().unwrap_or_default(),
            )
        })
        .collect();
    crate::tagging::TagSet::from_pairs(pairs)
}

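/// Translates an HTTP `Range` into a half-open `(start, end_exclusive)` byte
/// window over an object of `total` bytes, clamping the upper bound and
/// rejecting empty objects and out-of-bounds starts. For example, with
/// `total = 100`: `Int { first: 10, last: Some(999) }` resolves to `(10, 100)`
/// and `Suffix { length: 25 }` resolves to `(75, 100)`.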
pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
    if total == 0 {
        return Err("cannot range-get zero-length object".into());
    }
    match range {
        s3s::dto::Range::Int { first, last } => {
            let start = *first;
            let end_inclusive = match last {
                Some(l) => (*l).min(total - 1),
                None => total - 1,
            };
            if start > end_inclusive || start >= total {
                return Err(format!(
                    "range bytes={start}-{last:?} out of object size {total}"
                ));
            }
            Ok((start, end_inclusive + 1))
        }
        s3s::dto::Range::Suffix { length } => {
            let len = (*length).min(total);
            Ok((total - len, total))
        }
    }
}

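// The S3 trait implementation below is where the gateway behaviour lives:
// put_object samples the body, picks a codec, compresses (streaming S4F2
// frames when the codec supports it, buffered otherwise), optionally encrypts
// (SSE-C, SSE-KMS, or the gateway keyring), records manifest metadata and a
// sidecar frame index, then forwards to the backend; get_object reverses the
// process and additionally serves Range requests from the sidecar index when
// one exists. Operations the gateway does not modify are delegated straight
// to the backend.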
#[async_trait::async_trait]
impl<B: S3> S3 for S4Service<B> {
    #[tracing::instrument(
        name = "s4.put_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
    )]
    async fn put_object(
        &self,
        mut req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        let put_start = Instant::now();
        let put_bucket = req.input.bucket.clone();
        let put_key = req.input.key.clone();
        let access_preamble = self.access_log_preamble(&req);
        self.enforce_rate_limit(&req, &put_bucket)?;
        let request_tags: Option<crate::tagging::TagSet> = req
            .input
            .tagging
            .as_deref()
            .map(crate::tagging::parse_tagging_header)
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
        let existing_tags: Option<crate::tagging::TagSet> = self
            .tagging
            .as_ref()
            .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
        self.enforce_policy_with_extra(
            &req,
            "s3:PutObject",
            &put_bucket,
            Some(&put_key),
            request_tags.as_ref(),
            existing_tags.as_ref(),
        )?;
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&put_bucket, &put_key)
        {
            let bucket_versioned_enabled = self
                .versioning
                .as_ref()
                .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
                .unwrap_or(false);
            if !bucket_versioned_enabled {
                let bypass = parse_bypass_governance_header(&req.headers);
                let now = chrono::Utc::now();
                if !state.can_delete(now, bypass) {
                    crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
1700 "Access Denied because object protected by object lock",
                    ));
                }
            }
        }
        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
            .input
            .object_lock_mode
            .as_ref()
            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
            .input
            .object_lock_retain_until_date
            .as_ref()
            .and_then(timestamp_to_chrono_utc);
        let explicit_legal_hold_on: Option<bool> = req
            .input
            .object_lock_legal_hold_status
            .as_ref()
            .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
        if let Some(blob) = req.input.body.take() {
            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
                .await
                .map_err(internal("peek put sample"))?;
            let sample_len = sample.len().min(SAMPLE_BYTES);
            let kind = self.dispatcher.pick(&sample[..sample_len]).await;

            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
            let (compressed, manifest, is_framed) = if use_framed {
                let chained = chain_sample_with_rest(sample, rest_stream);
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    codec = kind.as_str(),
                    path = "streaming-framed",
                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
                );
                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
                let (body, manifest) = streaming_compress_to_frames(
                    chained,
                    Arc::clone(&self.registry),
                    kind,
                    chunk_size,
                )
                .await
                .map_err(internal("streaming framed compress"))?;
                (body, manifest, true)
            } else {
                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
                    .await
                    .map_err(internal("collect put body (buffered path)"))?;
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    bytes = bytes.len(),
                    codec = kind.as_str(),
                    path = "buffered",
                    "S4 put_object: compressing (buffered, raw blob)"
                );
                let (body, m) = self
                    .registry
                    .compress(bytes, kind)
                    .await
                    .map_err(internal("registry compress"))?;
                (body, m, false)
            };

            write_manifest(&mut req.input.metadata, &manifest);
            if is_framed {
                req.input
                    .metadata
                    .get_or_insert_with(Default::default)
                    .insert(META_FRAMED.into(), "true".into());
            }
            req.input.content_length = Some(compressed.len() as i64);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            let original_size = manifest.original_size;
            let compressed_size = manifest.compressed_size;
            let codec_label = manifest.codec.as_str();
            let sidecar_index = if is_framed {
                s4_codec::index::build_index_from_body(&compressed).ok()
            } else {
                None
            };
            let sse_c_alg = req.input.sse_customer_algorithm.take();
            let sse_c_key = req.input.sse_customer_key.take();
            let sse_c_md5 = req.input.sse_customer_key_md5.take();
            let sse_header = req.input.server_side_encryption.take();
            let sse_kms_key = req.input.ssekms_key_id.take();
            let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
            let kms_key_id = extract_kms_key_id(
                &sse_header,
                &sse_kms_key,
                self.kms_default_key_id.as_deref(),
            );
            if self.compliance_strict
                && sse_c_material.is_none()
                && kms_key_id.is_none()
                && self.sse_keyring.is_none()
                && sse_header.as_ref().map(|s| s.as_str())
                    != Some(ServerSideEncryption::AES256)
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "compliance-mode strict: PUT must include x-amz-server-side-encryption \
                     (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
                ));
            }
            if sse_c_material.is_some() && kms_key_id.is_some() {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidArgument,
                    "SSE-C and SSE-KMS cannot be used together on the same PUT",
                ));
            }
            let kms_wrap = if let Some(ref key_id) = kms_key_id {
                let kms = self.kms.as_ref().ok_or_else(|| {
                    S3Error::with_message(
                        S3ErrorCode::InvalidRequest,
                        "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                    )
                })?;
                let (dek, wrapped) = kms
                    .generate_dek(key_id)
                    .await
                    .map_err(kms_error_to_s3)?;
                if dek.len() != 32 {
                    return Err(S3Error::with_message(
                        S3ErrorCode::InternalError,
                        format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
                    ));
                }
                let mut dek_arr = [0u8; 32];
                dek_arr.copy_from_slice(&dek);
                Some((dek_arr, wrapped))
            } else {
                None
            };
            let body_to_send = if let Some(ref m) = sse_c_material {
                let meta = req.input.metadata.get_or_insert_with(Default::default);
                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
                meta.insert("s4-sse-type".into(), "AES256".into());
                meta.insert(
                    "s4-sse-c-key-md5".into(),
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
                crate::sse::encrypt_with_source(
                    &compressed,
                    crate::sse::SseSource::CustomerKey {
                        key: &m.key,
                        key_md5: &m.key_md5,
                    },
                )
            } else if let Some((ref dek, ref wrapped)) = kms_wrap {
                let meta = req.input.metadata.get_or_insert_with(Default::default);
                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
                meta.insert("s4-sse-type".into(), "aws:kms".into());
                meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
                crate::sse::encrypt_with_source(
                    &compressed,
                    crate::sse::SseSource::Kms { dek, wrapped },
                )
            } else if let Some(keyring) = self.sse_keyring.as_ref() {
                let meta = req.input.metadata.get_or_insert_with(Default::default);
                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
                crate::sse::encrypt_v2(&compressed, keyring)
            } else {
                compressed.clone()
            };
            let replication_body = body_to_send.clone();
            let replication_metadata = req.input.metadata.clone();
            req.input.content_length = Some(body_to_send.len() as i64);
            req.input.body = Some(bytes_to_blob(body_to_send));
            let pending_version: Option<crate::versioning::PutOutcome> = self
                .versioning
                .as_ref()
                .map(|mgr| mgr.state(&put_bucket))
                .map(|state| match state {
                    crate::versioning::VersioningState::Enabled => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::VersioningManager::new_version_id(),
                            versioned_response: true,
                        }
                    }
                    crate::versioning::VersioningState::Suspended
                    | crate::versioning::VersioningState::Unversioned => {
                        crate::versioning::PutOutcome {
                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
                            versioned_response: false,
                        }
                    }
                });
            if let Some(ref pv) = pending_version
                && pv.versioned_response
            {
                req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
            }
            let mut backend_resp = self.backend.put_object(req).await;
            if let Some(idx) = sidecar_index
                && backend_resp.is_ok()
                && idx.entries.len() > 1
            {
                self.write_sidecar(&put_bucket, &put_key, &idx).await;
            }
            if let (Some(mgr), Some(pv), Ok(resp)) = (
                self.versioning.as_ref(),
                pending_version.as_ref(),
                backend_resp.as_mut(),
            ) {
                let etag = resp
                    .output
                    .e_tag
                    .clone()
                    .map(ETag::into_value)
                    .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
                let now = chrono::Utc::now();
                mgr.commit_put_with_version(
                    &put_bucket,
                    &put_key,
                    crate::versioning::VersionEntry {
                        version_id: pv.version_id.clone(),
                        etag,
                        size: original_size,
                        is_delete_marker: false,
                        created_at: now,
                    },
                );
                if pv.versioned_response {
                    resp.output.version_id = Some(pv.version_id.clone());
                }
            }
            if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            if let (Some((_, wrapped)), Ok(resp)) =
                (kms_wrap.as_ref(), backend_resp.as_mut())
            {
                resp.output.server_side_encryption =
                    Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
                resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
            }
            if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
                if explicit_lock_mode.is_some()
                    || explicit_retain_until.is_some()
                    || explicit_legal_hold_on.is_some()
                {
                    let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
                    if let Some(m) = explicit_lock_mode {
                        state.mode = Some(m);
                    }
                    if let Some(u) = explicit_retain_until {
                        state.retain_until = Some(u);
                    }
                    if let Some(lh) = explicit_legal_hold_on {
                        state.legal_hold_on = lh;
                    }
                    mgr.set(&put_bucket, &put_key, state);
                }
                mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
            }
            let elapsed = put_start.elapsed();
            crate::metrics::record_put(
                codec_label,
                original_size,
                compressed_size,
                elapsed.as_secs_f64(),
                backend_resp.is_ok(),
            );
            self.record_access(
                access_preamble,
                "REST.PUT.OBJECT",
                &put_bucket,
                Some(&put_key),
                if backend_resp.is_ok() { 200 } else { 500 },
                compressed_size,
                original_size,
                elapsed.as_millis() as u64,
                backend_resp.as_ref().err().map(|e| e.code().as_str()),
            )
            .await;
            info!(
                op = "put_object",
                bucket = %put_bucket,
                key = %put_key,
                codec = codec_label,
                bytes_in = original_size,
                bytes_out = compressed_size,
                ratio = format!(
                    "{:.3}",
                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
                ),
                latency_ms = elapsed.as_millis() as u64,
                ok = backend_resp.is_ok(),
                "S4 put completed"
            );
            if backend_resp.is_ok()
                && let Some(mgr) = self.notifications.as_ref()
            {
                let dests = mgr.match_destinations(
                    &put_bucket,
                    &crate::notifications::EventType::ObjectCreatedPut,
                    &put_key,
                );
                if !dests.is_empty() {
                    let etag = backend_resp
                        .as_ref()
                        .ok()
                        .and_then(|r| r.output.e_tag.clone())
                        .map(ETag::into_value);
                    let version_id = pending_version
                        .as_ref()
                        .filter(|pv| pv.versioned_response)
                        .map(|pv| pv.version_id.clone());
                    tokio::spawn(crate::notifications::dispatch_event(
                        Arc::clone(mgr),
                        put_bucket.clone(),
                        put_key.clone(),
                        crate::notifications::EventType::ObjectCreatedPut,
                        Some(original_size),
                        etag,
                        version_id,
                        format!("S4-{}", uuid::Uuid::new_v4()),
                    ));
                }
            }
            if backend_resp.is_ok()
                && let (Some(mgr), Some(tags)) =
                    (self.tagging.as_ref(), request_tags.clone())
            {
                mgr.put_object_tags(&put_bucket, &put_key, tags);
            }
            self.spawn_replication_if_matched(
                &put_bucket,
                &put_key,
                &request_tags,
                &replication_body,
                &replication_metadata,
                backend_resp.is_ok(),
            );
            return backend_resp;
        }
        let pending_version: Option<crate::versioning::PutOutcome> = self
            .versioning
            .as_ref()
            .map(|mgr| mgr.state(&put_bucket))
            .map(|state| match state {
                crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
                    version_id: crate::versioning::VersioningManager::new_version_id(),
                    versioned_response: true,
                },
                _ => crate::versioning::PutOutcome {
                    version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
                    versioned_response: false,
                },
            });
        if let Some(ref pv) = pending_version
            && pv.versioned_response
        {
            req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
        }
        let mut backend_resp = self.backend.put_object(req).await;
        if let (Some(mgr), Some(pv), Ok(resp)) = (
            self.versioning.as_ref(),
            pending_version.as_ref(),
            backend_resp.as_mut(),
        ) {
            let etag = resp
                .output
                .e_tag
                .clone()
                .map(ETag::into_value)
                .unwrap_or_default();
            let now = chrono::Utc::now();
            mgr.commit_put_with_version(
                &put_bucket,
                &put_key,
                crate::versioning::VersionEntry {
                    version_id: pv.version_id.clone(),
                    etag,
                    size: 0,
                    is_delete_marker: false,
                    created_at: now,
                },
            );
            if pv.versioned_response {
                resp.output.version_id = Some(pv.version_id.clone());
            }
        }
        if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
            if explicit_lock_mode.is_some()
                || explicit_retain_until.is_some()
                || explicit_legal_hold_on.is_some()
            {
                let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
                if let Some(m) = explicit_lock_mode {
                    state.mode = Some(m);
                }
                if let Some(u) = explicit_retain_until {
                    state.retain_until = Some(u);
                }
                if let Some(lh) = explicit_legal_hold_on {
                    state.legal_hold_on = lh;
                }
                mgr.set(&put_bucket, &put_key, state);
            }
            mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
        }
        if backend_resp.is_ok()
            && let Some(mgr) = self.notifications.as_ref()
        {
            let dests = mgr.match_destinations(
                &put_bucket,
                &crate::notifications::EventType::ObjectCreatedPut,
                &put_key,
            );
            if !dests.is_empty() {
                let etag = backend_resp
                    .as_ref()
                    .ok()
                    .and_then(|r| r.output.e_tag.clone())
                    .map(ETag::into_value);
                let version_id = pending_version
                    .as_ref()
                    .filter(|pv| pv.versioned_response)
                    .map(|pv| pv.version_id.clone());
                tokio::spawn(crate::notifications::dispatch_event(
                    Arc::clone(mgr),
                    put_bucket.clone(),
                    put_key.clone(),
                    crate::notifications::EventType::ObjectCreatedPut,
                    Some(0),
                    etag,
                    version_id,
                    format!("S4-{}", uuid::Uuid::new_v4()),
                ));
            }
        }
        if backend_resp.is_ok()
            && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
        {
            mgr.put_object_tags(&put_bucket, &put_key, tags);
        }
        self.spawn_replication_if_matched(
            &put_bucket,
            &put_key,
            &request_tags,
            &bytes::Bytes::new(),
            &None,
            backend_resp.is_ok(),
        );
        backend_resp
    }

    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        let range_request = req.input.range.take();
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let get_sse_c_material =
            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;

        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
            Some(mgr)
                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
            {
                let req_vid = req.input.version_id.take();
                let entry = match req_vid.as_deref() {
                    Some(vid) => mgr
                        .lookup_version(&get_bucket, &get_key, vid)
                        .ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::NoSuchVersion,
                                format!("no such version: {vid}"),
                            )
                        })?,
                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::NoSuchKey,
                            format!("no such key: {get_key}"),
                        )
                    })?,
                };
                if entry.is_delete_marker {
                    return Err(S3Error::with_message(
                        S3ErrorCode::NoSuchKey,
                        format!("delete marker is the current version of {get_key}"),
                    ));
                }
                if entry.version_id != crate::versioning::NULL_VERSION_ID {
                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
                }
                Some(entry.version_id)
            }
            _ => None,
        };

        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        if let Some(ref vid) = resolved_version_id {
            resp.output.version_id = Some(vid.clone());
        }
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                let plain = match crate::sse::peek_magic(&body) {
                    Some("S4E4") => {
                        let kms = self.kms.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InvalidRequest,
                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                            )
                        })?;
                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
                        crate::sse::decrypt_with_kms(&body, kms_ref)
                            .await
                            .map_err(|e| match e {
                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
                                other => S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-KMS decrypt failed: {other}"),
                                ),
                            })?
                    }
                    _ => {
                        if let Some(ref m) = get_sse_c_material {
                            crate::sse::decrypt(
                                &body,
                                crate::sse::SseSource::CustomerKey {
                                    key: &m.key,
                                    key_md5: &m.key_md5,
                                },
                            )
                            .map_err(sse_c_error_to_s3)?
                        } else {
                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                                S3Error::with_message(
                                    S3ErrorCode::InvalidRequest,
                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                                )
                            })?;
                            crate::sse::decrypt(&body, keyring).map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-S4 decrypt failed: {e}"),
                                )
                            })?
                        }
                    }
                };
                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
                {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
                }
                bytes_to_blob(plain)
            } else if get_sse_c_material.is_some() {
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
            } else {
                blob
            };
2501 if let Some(ref m) = get_sse_c_material {
2504 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2505 resp.output.sse_customer_key_md5 = Some(
2506 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2507 );
2508 }
2509 if range_request.is_none()
2517 && !needs_frame_parse
2518 && let Some(ref m) = manifest_opt
2519 && supports_streaming_decompress(m.codec)
2520 && m.codec == CodecKind::CpuZstd
2521 {
2522 let decompressed_blob = cpu_zstd_decompress_stream(blob);
2523 resp.output.content_length = Some(m.original_size as i64);
2524 resp.output.checksum_crc32 = None;
2525 resp.output.checksum_crc32c = None;
2526 resp.output.checksum_crc64nvme = None;
2527 resp.output.checksum_sha1 = None;
2528 resp.output.checksum_sha256 = None;
2529 resp.output.e_tag = None;
2530 resp.output.body = Some(decompressed_blob);
2531 let elapsed = get_start.elapsed();
2532 crate::metrics::record_get(
2533 m.codec.as_str(),
2534 m.compressed_size,
2535 m.original_size,
2536 elapsed.as_secs_f64(),
2537 true,
2538 );
2539 info!(
2540 op = "get_object",
2541 bucket = %get_bucket,
2542 key = %get_key,
2543 codec = m.codec.as_str(),
2544 bytes_in = m.compressed_size,
2545 bytes_out = m.original_size,
2546 path = "streaming",
2547 setup_latency_ms = elapsed.as_millis() as u64,
2548 "S4 get started (streaming)"
2549 );
2550 return Ok(resp);
2551 }
2552 if range_request.is_none()
2554 && !needs_frame_parse
2555 && let Some(ref m) = manifest_opt
2556 && m.codec == CodecKind::Passthrough
2557 {
2558 resp.output.content_length = Some(m.original_size as i64);
2559 resp.output.checksum_crc32 = None;
2560 resp.output.checksum_crc32c = None;
2561 resp.output.checksum_crc64nvme = None;
2562 resp.output.checksum_sha1 = None;
2563 resp.output.checksum_sha256 = None;
2564 resp.output.e_tag = None;
2565 resp.output.body = Some(blob);
2566 debug!("S4 get_object: passthrough streaming");
2567 return Ok(resp);
2568 }
2569
2570 let bytes = collect_blob(blob, self.max_body_bytes)
2572 .await
2573 .map_err(internal("collect get body"))?;
2574
2575 let decompressed = if needs_frame_parse {
2576 self.decompress_multipart(bytes).await?
2579 } else {
2580 let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
2581 self.registry
2582 .decompress(bytes, manifest)
2583 .await
2584 .map_err(internal("registry decompress"))?
2585 };
2586
2587 let total_size = decompressed.len() as u64;
2589 let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
2590 let (start, end) = resolve_range(r, total_size)
2591 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
2592 let sliced = decompressed.slice(start as usize..end as usize);
2593 resp.output.content_range = Some(format!(
2594 "bytes {start}-{}/{total_size}",
2595 end.saturating_sub(1)
2596 ));
2597 (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
2598 } else {
2599 (decompressed, None)
2600 };
2601 resp.output.content_length = Some(final_bytes.len() as i64);
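// The backend's checksums and ETag refer to the compressed object, so they are
// dropped for the decompressed (and possibly range-sliced) payload.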
2604 resp.output.checksum_crc32 = None;
2609 resp.output.checksum_crc32c = None;
2610 resp.output.checksum_crc64nvme = None;
2611 resp.output.checksum_sha1 = None;
2612 resp.output.checksum_sha256 = None;
2613 resp.output.e_tag = None;
2614 let returned_size = final_bytes.len() as u64;
2615 let codec_label = manifest_opt
2616 .as_ref()
2617 .map(|m| m.codec.as_str())
2618 .unwrap_or("multipart");
2619 resp.output.body = Some(bytes_to_blob(final_bytes));
2620 if let Some(status) = status_override {
2621 resp.status = Some(status);
2622 }
2623 let elapsed = get_start.elapsed();
2624 crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
2625 info!(
2626 op = "get_object",
2627 bucket = %get_bucket,
2628 key = %get_key,
2629 codec = codec_label,
2630 bytes_out = returned_size,
2631 total_object_size = total_size,
2632 range = range_request.is_some(),
2633 path = "buffered",
2634 latency_ms = elapsed.as_millis() as u64,
2635 "S4 get completed (buffered)"
2636 );
2637 }
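// Surface the replication status tracked by the gateway, if any.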
2638 if let Some(mgr) = self.replication.as_ref()
2641 && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
2642 {
2643 resp.output.replication_status =
2644 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2645 }
2646 Ok(resp)
2647 }
2648
2649 async fn head_bucket(
2651 &self,
2652 req: S3Request<HeadBucketInput>,
2653 ) -> S3Result<S3Response<HeadBucketOutput>> {
2654 self.backend.head_bucket(req).await
2655 }
2656 async fn list_buckets(
2657 &self,
2658 req: S3Request<ListBucketsInput>,
2659 ) -> S3Result<S3Response<ListBucketsOutput>> {
2660 self.backend.list_buckets(req).await
2661 }
2662 async fn create_bucket(
2663 &self,
2664 req: S3Request<CreateBucketInput>,
2665 ) -> S3Result<S3Response<CreateBucketOutput>> {
2666 self.backend.create_bucket(req).await
2667 }
2668 async fn delete_bucket(
2669 &self,
2670 req: S3Request<DeleteBucketInput>,
2671 ) -> S3Result<S3Response<DeleteBucketOutput>> {
2672 self.backend.delete_bucket(req).await
2673 }
2674 async fn head_object(
2675 &self,
2676 req: S3Request<HeadObjectInput>,
2677 ) -> S3Result<S3Response<HeadObjectOutput>> {
2678 let head_bucket = req.input.bucket.clone();
2681 let head_key = req.input.key.clone();
2682 let mut resp = self.backend.head_object(req).await?;
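// For S4-compressed objects, report the original (uncompressed) size and drop
// checksums/ETag that describe the stored compressed bytes.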
2683 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2684 resp.output.content_length = Some(manifest.original_size as i64);
2688 resp.output.checksum_crc32 = None;
2689 resp.output.checksum_crc32c = None;
2690 resp.output.checksum_crc64nvme = None;
2691 resp.output.checksum_sha1 = None;
2692 resp.output.checksum_sha256 = None;
2693 resp.output.e_tag = None;
2694 }
2695 if let Some(mgr) = self.replication.as_ref()
2698 && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2699 {
2700 resp.output.replication_status =
2701 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2702 }
2703 if let Some(meta) = resp.output.metadata.as_ref()
2708 && let Some(sse_type) = meta.get("s4-sse-type")
2709 {
2710 {
2711 match sse_type.as_str() {
2712 "aws:kms" => {
2713 resp.output.server_side_encryption = Some(
2714 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2715 );
2716 if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
2717 resp.output.ssekms_key_id = Some(key_id.clone());
2718 }
2719 }
2720 _ => {
2721 resp.output.server_side_encryption = Some(
2722 ServerSideEncryption::from_static(ServerSideEncryption::AES256),
2723 );
2724 if let Some(md5) = meta.get("s4-sse-c-key-md5") {
2725 resp.output.sse_customer_algorithm =
2726 Some(crate::sse::SSE_C_ALGORITHM.into());
2727 resp.output.sse_customer_key_md5 = Some(md5.clone());
2728 }
2729 }
2730 }
2731 }
2732 }
2733 Ok(resp)
2734 }
2735 async fn delete_object(
2736 &self,
2737 mut req: S3Request<DeleteObjectInput>,
2738 ) -> S3Result<S3Response<DeleteObjectOutput>> {
2739 let bucket = req.input.bucket.clone();
2740 let key = req.input.key.clone();
2741 self.enforce_rate_limit(&req, &bucket)?;
2742 self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
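// With MFA Delete enabled on the bucket, a valid x-amz-mfa header is required
// before the delete may proceed.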
2743 if let Some(mgr) = self.mfa_delete.as_ref()
2750 && mgr.is_enabled(&bucket)
2751 {
2752 let header = req.input.mfa.as_deref();
2753 if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
2754 crate::metrics::record_mfa_delete_denial(&bucket);
2755 return Err(mfa_error_to_s3(e));
2756 }
2757 }
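// Object Lock: refuse the delete while retention or a legal hold is active,
// unless governance retention is explicitly bypassed.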
2758 if let Some(mgr) = self.object_lock.as_ref()
2766 && let Some(state) = mgr.get(&bucket, &key)
2767 {
2768 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
2769 let now = chrono::Utc::now();
2770 if !state.can_delete(now, bypass) {
2771 crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
2772 return Err(S3Error::with_message(
2773 S3ErrorCode::AccessDenied,
2775 "Access Denied: the object is protected by an Object Lock retention period or a legal hold",
2775 ));
2776 }
2777 }
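// Versioned buckets: deleting a specific version removes that version (and its
// backend shadow object); deleting without a version id records a delete marker
// instead, and under suspended versioning the base object is removed as well.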
2778 if let Some(mgr) = self.versioning.as_ref() {
2794 let state = mgr.state(&bucket);
2795 if state != crate::versioning::VersioningState::Unversioned {
2796 let req_vid = req.input.version_id.take();
2797 if let Some(vid) = req_vid {
2798 let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
2802 let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
2803 key.clone()
2804 } else {
2805 versioned_shadow_key(&key, &vid)
2806 };
2807 let was_real_version = outcome
2808 .as_ref()
2809 .map(|o| !o.is_delete_marker)
2810 .unwrap_or(false);
2811 if was_real_version {
2812 let backend_input = DeleteObjectInput {
2816 bucket: bucket.clone(),
2817 key: backend_target,
2818 ..Default::default()
2819 };
2820 let backend_req = S3Request {
2821 input: backend_input,
2822 method: http::Method::DELETE,
2823 uri: req.uri.clone(),
2824 headers: req.headers.clone(),
2825 extensions: http::Extensions::new(),
2826 credentials: req.credentials.clone(),
2827 region: req.region.clone(),
2828 service: req.service.clone(),
2829 trailing_headers: None,
2830 };
2831 let _ = self.backend.delete_object(backend_req).await;
2832 }
2833 let mut output = DeleteObjectOutput {
2834 version_id: Some(vid.clone()),
2835 ..Default::default()
2836 };
2837 if let Some(o) = outcome.as_ref()
2838 && o.is_delete_marker
2839 {
2840 output.delete_marker = Some(true);
2841 }
2842 self.fire_delete_notification(
2846 &bucket,
2847 &key,
2848 crate::notifications::EventType::ObjectRemovedDelete,
2849 Some(vid.clone()),
2850 );
2851 return Ok(S3Response::new(output));
2852 }
2853 let outcome = mgr.record_delete(&bucket, &key);
2855 if state == crate::versioning::VersioningState::Suspended {
2856 let backend_input = DeleteObjectInput {
2859 bucket: bucket.clone(),
2860 key: key.clone(),
2861 ..Default::default()
2862 };
2863 let backend_req = S3Request {
2864 input: backend_input,
2865 method: http::Method::DELETE,
2866 uri: req.uri.clone(),
2867 headers: req.headers.clone(),
2868 extensions: http::Extensions::new(),
2869 credentials: req.credentials.clone(),
2870 region: req.region.clone(),
2871 service: req.service.clone(),
2872 trailing_headers: None,
2873 };
2874 let _ = self.backend.delete_object(backend_req).await;
2875 }
2876 let output = DeleteObjectOutput {
2877 delete_marker: Some(true),
2878 version_id: outcome.version_id.clone(),
2879 ..Default::default()
2880 };
2881 self.fire_delete_notification(
2886 &bucket,
2887 &key,
2888 crate::notifications::EventType::ObjectRemovedDeleteMarker,
2889 outcome.version_id,
2890 );
2891 return Ok(S3Response::new(output));
2892 }
2893 }
2894 let resp = self.backend.delete_object(req).await?;
2897 if let Some(mgr) = self.object_lock.as_ref() {
2902 mgr.clear(&bucket, &key);
2903 }
2904 if let Some(mgr) = self.tagging.as_ref() {
2910 mgr.delete_object_tags(&bucket, &key);
2911 }
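// Best-effort cleanup of the .s4index sidecar; failures are ignored.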
2912 let sidecar = sidecar_key(&key);
2913 if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
2918 let sidecar_input = DeleteObjectInput {
2919 bucket: bucket.clone(),
2920 key: sidecar,
2921 ..Default::default()
2922 };
2923 let sidecar_req = S3Request {
2924 input: sidecar_input,
2925 method: http::Method::DELETE,
2926 uri,
2927 headers: http::HeaderMap::new(),
2928 extensions: http::Extensions::new(),
2929 credentials: None,
2930 region: None,
2931 service: None,
2932 trailing_headers: None,
2933 };
2934 let _ = self.backend.delete_object(sidecar_req).await;
2935 }
2936 self.fire_delete_notification(
2939 &bucket,
2940 &key,
2941 crate::notifications::EventType::ObjectRemovedDelete,
2942 None,
2943 );
2944 Ok(resp)
2945 }
2946 async fn delete_objects(
2947 &self,
2948 req: S3Request<DeleteObjectsInput>,
2949 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
2950 if let Some(mgr) = self.mfa_delete.as_ref()
2954 && mgr.is_enabled(&req.input.bucket)
2955 {
2956 let header = req.input.mfa.as_deref();
2957 if let Err(e) =
2958 crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
2959 {
2960 crate::metrics::record_mfa_delete_denial(&req.input.bucket);
2961 return Err(mfa_error_to_s3(e));
2962 }
2963 }
2964 self.backend.delete_objects(req).await
2965 }
2966 async fn copy_object(
2967 &self,
2968 mut req: S3Request<CopyObjectInput>,
2969 ) -> S3Result<S3Response<CopyObjectOutput>> {
2970 let dst_bucket = req.input.bucket.clone();
2972 let dst_key = req.input.key.clone();
2973 self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
2974 if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
2975 self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
2976 }
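// MetadataDirective=REPLACE would drop the s4-* metadata that describes how the
// source object was compressed, so carry those keys over from the source unless
// the caller supplied its own values.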
2977 let needs_merge = req
2987 .input
2988 .metadata_directive
2989 .as_ref()
2990 .map(|d| d.as_str() == MetadataDirective::REPLACE)
2991 .unwrap_or(false);
2992 if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
2993 let head_input = HeadObjectInput {
2994 bucket: bucket.to_string(),
2995 key: key.to_string(),
2996 ..Default::default()
2997 };
2998 let head_req = S3Request {
2999 input: head_input,
3000 method: req.method.clone(),
3001 uri: req.uri.clone(),
3002 headers: req.headers.clone(),
3003 extensions: http::Extensions::new(),
3004 credentials: req.credentials.clone(),
3005 region: req.region.clone(),
3006 service: req.service.clone(),
3007 trailing_headers: None,
3008 };
3009 if let Ok(head) = self.backend.head_object(head_req).await
3010 && let Some(src_meta) = head.output.metadata.as_ref()
3011 {
3012 let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
3013 for key in [
3014 META_CODEC,
3015 META_ORIGINAL_SIZE,
3016 META_COMPRESSED_SIZE,
3017 META_CRC32C,
3018 META_MULTIPART,
3019 META_FRAMED,
3020 ] {
3021 if let Some(v) = src_meta.get(key) {
3022 dest_meta
3025 .entry(key.to_string())
3026 .or_insert_with(|| v.clone());
3027 }
3028 }
3029 debug!(
3030 src_bucket = %bucket,
3031 src_key = %key,
3032 "S4 copy_object: preserved s4-* metadata across REPLACE directive"
3033 );
3034 }
3035 }
3036 self.backend.copy_object(req).await
3037 }
3038 async fn list_objects(
3039 &self,
3040 req: S3Request<ListObjectsInput>,
3041 ) -> S3Result<S3Response<ListObjectsOutput>> {
3042 self.enforce_rate_limit(&req, &req.input.bucket)?;
3043 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3044 let mut resp = self.backend.list_objects(req).await?;
3045 if let Some(contents) = resp.output.contents.as_mut() {
3048 contents.retain(|o| {
3049 o.key
3050 .as_ref()
3051 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3052 .unwrap_or(true)
3053 });
3054 }
3055 Ok(resp)
3056 }
3057 async fn list_objects_v2(
3058 &self,
3059 req: S3Request<ListObjectsV2Input>,
3060 ) -> S3Result<S3Response<ListObjectsV2Output>> {
3061 self.enforce_rate_limit(&req, &req.input.bucket)?;
3062 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3063 let mut resp = self.backend.list_objects_v2(req).await?;
3064 if let Some(contents) = resp.output.contents.as_mut() {
3065 let before = contents.len();
3066 contents.retain(|o| {
3067 o.key
3068 .as_ref()
3069 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3070 .unwrap_or(true)
3071 });
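// KeyCount was computed by the backend before filtering, so subtract the
// entries hidden here (sidecar indexes and versioning shadow keys).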
3072 if let Some(kc) = resp.output.key_count.as_mut() {
3074 *kc -= (before - contents.len()) as i32;
3075 }
3076 }
3077 Ok(resp)
3078 }
3079 async fn list_object_versions(
3087 &self,
3088 req: S3Request<ListObjectVersionsInput>,
3089 ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
3090 self.enforce_rate_limit(&req, &req.input.bucket)?;
3091 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
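// When the gateway manages versioning itself, answer entirely from its own
// version index rather than the backend listing.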
3092 if let Some(mgr) = self.versioning.as_ref()
3094 && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
3095 {
3096 let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
3097 let page = mgr.list_versions(
3098 &req.input.bucket,
3099 req.input.prefix.as_deref(),
3100 req.input.key_marker.as_deref(),
3101 req.input.version_id_marker.as_deref(),
3102 max_keys,
3103 );
3104 let versions: Vec<ObjectVersion> = page
3105 .versions
3106 .into_iter()
3107 .map(|e| ObjectVersion {
3108 key: Some(e.key),
3109 version_id: Some(e.version_id),
3110 is_latest: Some(e.is_latest),
3111 e_tag: Some(ETag::Strong(e.etag)),
3112 size: Some(e.size as i64),
3113 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3114 ..Default::default()
3115 })
3116 .collect();
3117 let delete_markers: Vec<DeleteMarkerEntry> = page
3118 .delete_markers
3119 .into_iter()
3120 .map(|e| DeleteMarkerEntry {
3121 key: Some(e.key),
3122 version_id: Some(e.version_id),
3123 is_latest: Some(e.is_latest),
3124 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3125 ..Default::default()
3126 })
3127 .collect();
3128 let output = ListObjectVersionsOutput {
3129 name: Some(req.input.bucket.clone()),
3130 prefix: req.input.prefix.clone(),
3131 key_marker: req.input.key_marker.clone(),
3132 version_id_marker: req.input.version_id_marker.clone(),
3133 max_keys: req.input.max_keys,
3134 versions: if versions.is_empty() {
3135 None
3136 } else {
3137 Some(versions)
3138 },
3139 delete_markers: if delete_markers.is_empty() {
3140 None
3141 } else {
3142 Some(delete_markers)
3143 },
3144 is_truncated: Some(page.is_truncated),
3145 next_key_marker: page.next_key_marker,
3146 next_version_id_marker: page.next_version_id_marker,
3147 ..Default::default()
3148 };
3149 return Ok(S3Response::new(output));
3150 }
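// Otherwise fall back to the backend listing, hiding S4 sidecars and
// versioning shadow keys.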
3151 let mut resp = self.backend.list_object_versions(req).await?;
3153 if let Some(versions) = resp.output.versions.as_mut() {
3154 versions.retain(|v| {
3155 v.key
3156 .as_ref()
3157 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3158 .unwrap_or(true)
3159 });
3160 }
3161 if let Some(markers) = resp.output.delete_markers.as_mut() {
3162 markers.retain(|m| {
3163 m.key
3164 .as_ref()
3165 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3166 .unwrap_or(true)
3167 });
3168 }
3169 Ok(resp)
3170 }
3171
3172 async fn create_multipart_upload(
3173 &self,
3174 mut req: S3Request<CreateMultipartUploadInput>,
3175 ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
3176 let codec_kind = self.registry.default_kind();
3180 let meta = req.input.metadata.get_or_insert_with(Default::default);
3181 meta.insert(META_MULTIPART.into(), "true".into());
3182 meta.insert(META_CODEC.into(), codec_kind.as_str().into());
3183 debug!(
3184 bucket = ?req.input.bucket,
3185 key = ?req.input.key,
3186 codec = codec_kind.as_str(),
3187 "S4 create_multipart_upload: marking object for per-part compression"
3188 );
3189 self.backend.create_multipart_upload(req).await
3190 }
3191
3192 async fn upload_part(
3193 &self,
3194 mut req: S3Request<UploadPartInput>,
3195 ) -> S3Result<S3Response<UploadPartOutput>> {
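// Compress each part independently and wrap it in a self-describing frame.
// A part that met the S3 minimum size before compression is padded back up to
// that minimum; checksums and Content-MD5 are cleared because the bytes on the
// wire have changed.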
3196 if let Some(blob) = req.input.body.take() {
3202 let bytes = collect_blob(blob, self.max_body_bytes)
3203 .await
3204 .map_err(internal("collect upload_part body"))?;
3205 let sample_len = bytes.len().min(SAMPLE_BYTES);
3206 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
3207 let original_size = bytes.len() as u64;
3208 let (compressed, manifest) = self
3209 .registry
3210 .compress(bytes, codec_kind)
3211 .await
3212 .map_err(internal("registry compress part"))?;
3213 let header = FrameHeader {
3214 codec: codec_kind,
3215 original_size,
3216 compressed_size: compressed.len() as u64,
3217 crc32c: manifest.crc32c,
3218 };
3219 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3220 write_frame(&mut framed, header, &compressed);
3221 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3235 if !likely_final {
3236 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3237 }
3238 let framed_bytes = framed.freeze();
3239 let new_len = framed_bytes.len() as i64;
3240 req.input.content_length = Some(new_len);
3242 req.input.checksum_algorithm = None;
3243 req.input.checksum_crc32 = None;
3244 req.input.checksum_crc32c = None;
3245 req.input.checksum_crc64nvme = None;
3246 req.input.checksum_sha1 = None;
3247 req.input.checksum_sha256 = None;
3248 req.input.content_md5 = None;
3249 req.input.body = Some(bytes_to_blob(framed_bytes));
3250 debug!(
3251 part_number = ?req.input.part_number,
3252 upload_id = ?req.input.upload_id,
3253 original_size,
3254 framed_size = new_len,
3255 "S4 upload_part: framed compressed payload"
3256 );
3257 }
3258 self.backend.upload_part(req).await
3259 }
3260 async fn complete_multipart_upload(
3261 &self,
3262 req: S3Request<CompleteMultipartUploadInput>,
3263 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
3264 let bucket = req.input.bucket.clone();
3265 let key = req.input.key.clone();
3266 let resp = self.backend.complete_multipart_upload(req).await?;
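// After completion, read the assembled object back once to build a frame index
// and store it as a .s4index sidecar; any failure here is ignored.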
3267 let bucket_clone = bucket.clone();
3273 let key_clone = key.clone();
3274 if let Ok(uri) = safe_object_uri(&bucket_clone, &key_clone) {
3281 let get_input = GetObjectInput {
3282 bucket: bucket_clone.clone(),
3283 key: key_clone.clone(),
3284 ..Default::default()
3285 };
3286 let get_req = S3Request {
3287 input: get_input,
3288 method: http::Method::GET,
3289 uri,
3290 headers: http::HeaderMap::new(),
3291 extensions: http::Extensions::new(),
3292 credentials: None,
3293 region: None,
3294 service: None,
3295 trailing_headers: None,
3296 };
3297 if let Ok(get_resp) = self.backend.get_object(get_req).await
3298 && let Some(blob) = get_resp.output.body
3299 && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
3300 && let Ok(index) = build_index_from_body(&body)
3301 {
3302 self.write_sidecar(&bucket, &key, &index).await;
3303 }
3304 }
3305 Ok(resp)
3306 }
3307 async fn abort_multipart_upload(
3308 &self,
3309 req: S3Request<AbortMultipartUploadInput>,
3310 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
3311 self.backend.abort_multipart_upload(req).await
3312 }
3313 async fn list_multipart_uploads(
3314 &self,
3315 req: S3Request<ListMultipartUploadsInput>,
3316 ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
3317 self.backend.list_multipart_uploads(req).await
3318 }
3319 async fn list_parts(
3320 &self,
3321 req: S3Request<ListPartsInput>,
3322 ) -> S3Result<S3Response<ListPartsOutput>> {
3323 self.backend.list_parts(req).await
3324 }
3325
3326 async fn get_object_acl(
3342 &self,
3343 req: S3Request<GetObjectAclInput>,
3344 ) -> S3Result<S3Response<GetObjectAclOutput>> {
3345 self.backend.get_object_acl(req).await
3346 }
3347 async fn put_object_acl(
3348 &self,
3349 req: S3Request<PutObjectAclInput>,
3350 ) -> S3Result<S3Response<PutObjectAclOutput>> {
3351 self.backend.put_object_acl(req).await
3352 }
3353 async fn get_object_tagging(
3359 &self,
3360 req: S3Request<GetObjectTaggingInput>,
3361 ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
3362 let Some(mgr) = self.tagging.as_ref() else {
3363 return self.backend.get_object_tagging(req).await;
3364 };
3365 let tags = mgr
3366 .get_object_tags(&req.input.bucket, &req.input.key)
3367 .unwrap_or_default();
3368 Ok(S3Response::new(GetObjectTaggingOutput {
3369 tag_set: tagset_to_aws(&tags),
3370 ..Default::default()
3371 }))
3372 }
3373 async fn put_object_tagging(
3374 &self,
3375 req: S3Request<PutObjectTaggingInput>,
3376 ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
3377 let Some(mgr) = self.tagging.as_ref() else {
3378 return self.backend.put_object_tagging(req).await;
3379 };
3380 let bucket = req.input.bucket.clone();
3381 let key = req.input.key.clone();
3382 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
3383 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
3384 })?;
3385 let existing = mgr.get_object_tags(&bucket, &key);
3389 self.enforce_policy_with_extra(
3390 &req,
3391 "s3:PutObjectTagging",
3392 &bucket,
3393 Some(&key),
3394 Some(&parsed),
3395 existing.as_ref(),
3396 )?;
3397 mgr.put_object_tags(&bucket, &key, parsed);
3398 Ok(S3Response::new(PutObjectTaggingOutput::default()))
3399 }
3400 async fn delete_object_tagging(
3401 &self,
3402 req: S3Request<DeleteObjectTaggingInput>,
3403 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
3404 let Some(mgr) = self.tagging.as_ref() else {
3405 return self.backend.delete_object_tagging(req).await;
3406 };
3407 let bucket = req.input.bucket.clone();
3408 let key = req.input.key.clone();
3409 let existing = mgr.get_object_tags(&bucket, &key);
3410 self.enforce_policy_with_extra(
3411 &req,
3412 "s3:DeleteObjectTagging",
3413 &bucket,
3414 Some(&key),
3415 None,
3416 existing.as_ref(),
3417 )?;
3418 mgr.delete_object_tags(&bucket, &key);
3419 Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
3420 }
3421 async fn get_object_attributes(
3422 &self,
3423 req: S3Request<GetObjectAttributesInput>,
3424 ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
3425 self.backend.get_object_attributes(req).await
3426 }
3427 async fn restore_object(
3428 &self,
3429 req: S3Request<RestoreObjectInput>,
3430 ) -> S3Result<S3Response<RestoreObjectOutput>> {
3431 self.backend.restore_object(req).await
3432 }
3433 async fn upload_part_copy(
3434 &self,
3435 req: S3Request<UploadPartCopyInput>,
3436 ) -> S3Result<S3Response<UploadPartCopyOutput>> {
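// A byte range on an S4-compressed source refers to logical (uncompressed)
// offsets, so the range cannot simply be forwarded. Fetch the source through
// the gateway's own get_object (which decompresses and applies the range),
// then recompress and frame the result as a regular part upload.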
3437 let CopySource::Bucket {
3448 bucket: src_bucket,
3449 key: src_key,
3450 ..
3451 } = &req.input.copy_source
3452 else {
3453 return self.backend.upload_part_copy(req).await;
3454 };
3455 let src_bucket = src_bucket.to_string();
3456 let src_key = src_key.to_string();
3457
3458 let head_input = HeadObjectInput {
3460 bucket: src_bucket.clone(),
3461 key: src_key.clone(),
3462 ..Default::default()
3463 };
3464 let head_req = S3Request {
3465 input: head_input,
3466 method: http::Method::HEAD,
3467 uri: req.uri.clone(),
3468 headers: req.headers.clone(),
3469 extensions: http::Extensions::new(),
3470 credentials: req.credentials.clone(),
3471 region: req.region.clone(),
3472 service: req.service.clone(),
3473 trailing_headers: None,
3474 };
3475 let needs_s4_copy = match self.backend.head_object(head_req).await {
3476 Ok(h) => {
3477 is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
3478 }
3479 Err(_) => false,
3480 };
3481 if !needs_s4_copy {
3482 return self.backend.upload_part_copy(req).await;
3483 }
3484
3485 let source_range = req
3487 .input
3488 .copy_source_range
3489 .as_ref()
3490 .map(|r| parse_copy_source_range(r))
3491 .transpose()
3492 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
3493
3494 let mut get_input = GetObjectInput {
3498 bucket: src_bucket.clone(),
3499 key: src_key.clone(),
3500 ..Default::default()
3501 };
3502 get_input.range = source_range;
3503 let get_req = S3Request {
3504 input: get_input,
3505 method: http::Method::GET,
3506 uri: req.uri.clone(),
3507 headers: req.headers.clone(),
3508 extensions: http::Extensions::new(),
3509 credentials: req.credentials.clone(),
3510 region: req.region.clone(),
3511 service: req.service.clone(),
3512 trailing_headers: None,
3513 };
3514 let get_resp = self.get_object(get_req).await?;
3515 let blob = get_resp.output.body.ok_or_else(|| {
3516 S3Error::with_message(
3517 S3ErrorCode::InternalError,
3518 "upload_part_copy: empty body from source GET",
3519 )
3520 })?;
3521 let bytes = collect_blob(blob, self.max_body_bytes)
3522 .await
3523 .map_err(internal("collect upload_part_copy source body"))?;
3524
3525 let sample_len = bytes.len().min(SAMPLE_BYTES);
3527 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
3528 let original_size = bytes.len() as u64;
3529 let (compressed, manifest) = self
3530 .registry
3531 .compress(bytes, codec_kind)
3532 .await
3533 .map_err(internal("registry compress upload_part_copy"))?;
3534 let header = FrameHeader {
3535 codec: codec_kind,
3536 original_size,
3537 compressed_size: compressed.len() as u64,
3538 crc32c: manifest.crc32c,
3539 };
3540 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3541 write_frame(&mut framed, header, &compressed);
3542 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3543 if !likely_final {
3544 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3545 }
3546 let framed_bytes = framed.freeze();
3547 let framed_len = framed_bytes.len() as i64;
3548
3549 let part_input = UploadPartInput {
3551 bucket: req.input.bucket.clone(),
3552 key: req.input.key.clone(),
3553 part_number: req.input.part_number,
3554 upload_id: req.input.upload_id.clone(),
3555 body: Some(bytes_to_blob(framed_bytes)),
3556 content_length: Some(framed_len),
3557 ..Default::default()
3558 };
3559 let part_req = S3Request {
3560 input: part_input,
3561 method: http::Method::PUT,
3562 uri: req.uri.clone(),
3563 headers: req.headers.clone(),
3564 extensions: http::Extensions::new(),
3565 credentials: req.credentials.clone(),
3566 region: req.region.clone(),
3567 service: req.service.clone(),
3568 trailing_headers: None,
3569 };
3570 let upload_resp = self.backend.upload_part(part_req).await?;
3571
3572 let copy_output = UploadPartCopyOutput {
3573 copy_part_result: Some(CopyPartResult {
3574 e_tag: upload_resp.output.e_tag.clone(),
3575 ..Default::default()
3576 }),
3577 ..Default::default()
3578 };
3579 Ok(S3Response::new(copy_output))
3580 }
3581
3582 async fn get_object_lock_configuration(
3589 &self,
3590 req: S3Request<GetObjectLockConfigurationInput>,
3591 ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
3592 if let Some(mgr) = self.object_lock.as_ref() {
3593 let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
3594 ObjectLockConfiguration {
3595 object_lock_enabled: Some(ObjectLockEnabled::from_static(
3596 ObjectLockEnabled::ENABLED,
3597 )),
3598 rule: Some(ObjectLockRule {
3599 default_retention: Some(DefaultRetention {
3600 days: Some(d.retention_days as i32),
3601 mode: Some(ObjectLockRetentionMode::from_static(
3602 match d.mode {
3603 crate::object_lock::LockMode::Governance => {
3604 ObjectLockRetentionMode::GOVERNANCE
3605 }
3606 crate::object_lock::LockMode::Compliance => {
3607 ObjectLockRetentionMode::COMPLIANCE
3608 }
3609 },
3610 )),
3611 years: None,
3612 }),
3613 }),
3614 }
3615 });
3616 let output = GetObjectLockConfigurationOutput {
3617 object_lock_configuration: cfg,
3618 };
3619 return Ok(S3Response::new(output));
3620 }
3621 self.backend.get_object_lock_configuration(req).await
3622 }
3623 async fn put_object_lock_configuration(
3624 &self,
3625 req: S3Request<PutObjectLockConfigurationInput>,
3626 ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
3627 if let Some(mgr) = self.object_lock.as_ref() {
3628 let bucket = req.input.bucket.clone();
3629 if let Some(cfg) = req.input.object_lock_configuration.as_ref()
3630 && let Some(rule) = cfg.rule.as_ref()
3631 && let Some(d) = rule.default_retention.as_ref()
3632 {
3633 let mode = d
3634 .mode
3635 .as_ref()
3636 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
3637 .ok_or_else(|| {
3638 S3Error::with_message(
3639 S3ErrorCode::InvalidRequest,
3640 "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
3641 )
3642 })?;
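// AWS accepts either Days or Years (but not both); Years is normalised to days
// using a 365-day year.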
3643 let days: u32 = match (d.days, d.years) {
3647 (Some(d), None) if d > 0 => d as u32,
3648 (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
3649 _ => {
3650 return Err(S3Error::with_message(
3651 S3ErrorCode::InvalidRequest,
3652 "Object Lock default retention requires exactly one of Days or Years (positive integer)",
3653 ));
3654 }
3655 };
3656 mgr.set_bucket_default(
3657 &bucket,
3658 crate::object_lock::BucketObjectLockDefault {
3659 mode,
3660 retention_days: days,
3661 },
3662 );
3663 }
3664 return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
3665 }
3666 self.backend.put_object_lock_configuration(req).await
3667 }
3668 async fn get_object_legal_hold(
3669 &self,
3670 req: S3Request<GetObjectLegalHoldInput>,
3671 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
3672 if let Some(mgr) = self.object_lock.as_ref() {
3673 let on = mgr
3674 .get(&req.input.bucket, &req.input.key)
3675 .map(|s| s.legal_hold_on)
3676 .unwrap_or(false);
3677 let status = ObjectLockLegalHoldStatus::from_static(if on {
3678 ObjectLockLegalHoldStatus::ON
3679 } else {
3680 ObjectLockLegalHoldStatus::OFF
3681 });
3682 let output = GetObjectLegalHoldOutput {
3683 legal_hold: Some(ObjectLockLegalHold {
3684 status: Some(status),
3685 }),
3686 };
3687 return Ok(S3Response::new(output));
3688 }
3689 self.backend.get_object_legal_hold(req).await
3690 }
3691 async fn put_object_legal_hold(
3692 &self,
3693 req: S3Request<PutObjectLegalHoldInput>,
3694 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
3695 if let Some(mgr) = self.object_lock.as_ref() {
3696 let on = req
3697 .input
3698 .legal_hold
3699 .as_ref()
3700 .and_then(|h| h.status.as_ref())
3701 .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
3702 .unwrap_or(false);
3703 mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
3704 return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
3705 }
3706 self.backend.put_object_legal_hold(req).await
3707 }
3708 async fn get_object_retention(
3709 &self,
3710 req: S3Request<GetObjectRetentionInput>,
3711 ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
3712 if let Some(mgr) = self.object_lock.as_ref() {
3713 let retention = mgr
3714 .get(&req.input.bucket, &req.input.key)
3715 .filter(|s| s.mode.is_some() || s.retain_until.is_some())
3716 .map(|s| {
3717 let mode = s.mode.map(|m| {
3718 ObjectLockRetentionMode::from_static(match m {
3719 crate::object_lock::LockMode::Governance => {
3720 ObjectLockRetentionMode::GOVERNANCE
3721 }
3722 crate::object_lock::LockMode::Compliance => {
3723 ObjectLockRetentionMode::COMPLIANCE
3724 }
3725 })
3726 });
3727 let until = s.retain_until.map(chrono_utc_to_timestamp);
3728 ObjectLockRetention {
3729 mode,
3730 retain_until_date: until,
3731 }
3732 });
3733 let output = GetObjectRetentionOutput { retention };
3734 return Ok(S3Response::new(output));
3735 }
3736 self.backend.get_object_retention(req).await
3737 }
3738 async fn put_object_retention(
3739 &self,
3740 req: S3Request<PutObjectRetentionInput>,
3741 ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
3742 if let Some(mgr) = self.object_lock.as_ref() {
3743 let bucket = req.input.bucket.clone();
3744 let key = req.input.key.clone();
3745 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
3746 let retention = req.input.retention.as_ref().ok_or_else(|| {
3747 S3Error::with_message(
3748 S3ErrorCode::InvalidRequest,
3749 "PutObjectRetention requires a Retention element",
3750 )
3751 })?;
3752 let new_mode = retention
3753 .mode
3754 .as_ref()
3755 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
3756 let new_until = retention
3757 .retain_until_date
3758 .as_ref()
3759 .map(timestamp_to_chrono_utc)
3760 .unwrap_or(None);
3761 let now = chrono::Utc::now();
3762 let existing = mgr.get(&bucket, &key).unwrap_or_default();
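// Compliance retention can never be weakened while active: no downgrade to
// Governance and no earlier retain-until date. Governance retention may only be
// shortened when the bypass header is set.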
3763 if let Some(existing_mode) = existing.mode
3769 && existing_mode == crate::object_lock::LockMode::Compliance
3770 && existing.is_locked(now)
3771 {
3772 if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
3773 return Err(S3Error::with_message(
3774 S3ErrorCode::AccessDenied,
3775 "Cannot downgrade Compliance retention to Governance while lock is active",
3776 ));
3777 }
3778 if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
3779 && next < prev
3780 {
3781 return Err(S3Error::with_message(
3782 S3ErrorCode::AccessDenied,
3783 "Cannot shorten Compliance retention while lock is active",
3784 ));
3785 }
3786 }
3787 if let Some(existing_mode) = existing.mode
3788 && existing_mode == crate::object_lock::LockMode::Governance
3789 && existing.is_locked(now)
3790 && !bypass
3791 && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
3792 && next < prev
3793 {
3794 return Err(S3Error::with_message(
3795 S3ErrorCode::AccessDenied,
3796 "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
3797 ));
3798 }
3799 let mut state = existing;
3800 if new_mode.is_some() {
3801 state.mode = new_mode;
3802 }
3803 if new_until.is_some() {
3804 state.retain_until = new_until;
3805 }
3806 mgr.set(&bucket, &key, state);
3807 return Ok(S3Response::new(PutObjectRetentionOutput::default()));
3808 }
3809 self.backend.put_object_retention(req).await
3810 }
3811
3812 async fn get_bucket_versioning(
3818 &self,
3819 req: S3Request<GetBucketVersioningInput>,
3820 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
3821 if let Some(mgr) = self.versioning.as_ref() {
3826 let output = match mgr.state(&req.input.bucket).as_aws_status() {
3827 Some(s) => GetBucketVersioningOutput {
3828 status: Some(BucketVersioningStatus::from(s.to_owned())),
3829 ..Default::default()
3830 },
3831 None => GetBucketVersioningOutput::default(),
3832 };
3833 return Ok(S3Response::new(output));
3834 }
3835 self.backend.get_bucket_versioning(req).await
3836 }
3837 async fn put_bucket_versioning(
3838 &self,
3839 req: S3Request<PutBucketVersioningInput>,
3840 ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
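// Toggling MFA Delete requires a valid x-amz-mfa header that matches the
// bucket's registered device serial and current TOTP code.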
3841 if let Some(mgr) = self.mfa_delete.as_ref()
3852 && let Some(target_enabled) = req
3853 .input
3854 .versioning_configuration
3855 .mfa_delete
3856 .as_ref()
3857 .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
3858 {
3859 let bucket = req.input.bucket.clone();
3860 let header = req.input.mfa.as_deref();
3861 let secret = mgr.lookup_secret(&bucket);
3862 let verified = match (header, secret.as_ref()) {
3863 (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
3864 Ok((serial, code)) => {
3865 serial == s.serial
3866 && crate::mfa::verify_totp(
3867 &s.secret_base32,
3868 &code,
3869 current_unix_secs(),
3870 )
3871 }
3872 Err(_) => false,
3873 },
3874 _ => false,
3875 };
3876 if !verified {
3877 crate::metrics::record_mfa_delete_denial(&bucket);
3878 let err = if header.is_none() {
3879 crate::mfa::MfaError::Missing
3880 } else {
3881 crate::mfa::MfaError::InvalidCode
3882 };
3883 return Err(mfa_error_to_s3(err));
3884 }
3885 mgr.set_bucket_state(&bucket, target_enabled);
3886 }
3887 if let Some(mgr) = self.versioning.as_ref() {
3893 let new_state = match req
3894 .input
3895 .versioning_configuration
3896 .status
3897 .as_ref()
3898 .map(|s| s.as_str())
3899 {
3900 Some(s) if s.eq_ignore_ascii_case("Enabled") => {
3901 crate::versioning::VersioningState::Enabled
3902 }
3903 Some(s) if s.eq_ignore_ascii_case("Suspended") => {
3904 crate::versioning::VersioningState::Suspended
3905 }
3906 _ => crate::versioning::VersioningState::Unversioned,
3907 };
3908 mgr.set_state(&req.input.bucket, new_state);
3909 return Ok(S3Response::new(PutBucketVersioningOutput::default()));
3910 }
3911 self.backend.put_bucket_versioning(req).await
3912 }
3913
3914 async fn get_bucket_location(
3916 &self,
3917 req: S3Request<GetBucketLocationInput>,
3918 ) -> S3Result<S3Response<GetBucketLocationOutput>> {
3919 self.backend.get_bucket_location(req).await
3920 }
3921
3922 async fn get_bucket_policy(
3924 &self,
3925 req: S3Request<GetBucketPolicyInput>,
3926 ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
3927 self.backend.get_bucket_policy(req).await
3928 }
3929 async fn put_bucket_policy(
3930 &self,
3931 req: S3Request<PutBucketPolicyInput>,
3932 ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
3933 self.backend.put_bucket_policy(req).await
3934 }
3935 async fn delete_bucket_policy(
3936 &self,
3937 req: S3Request<DeleteBucketPolicyInput>,
3938 ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
3939 self.backend.delete_bucket_policy(req).await
3940 }
3941 async fn get_bucket_policy_status(
3942 &self,
3943 req: S3Request<GetBucketPolicyStatusInput>,
3944 ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
3945 self.backend.get_bucket_policy_status(req).await
3946 }
3947
3948 async fn get_bucket_acl(
3950 &self,
3951 req: S3Request<GetBucketAclInput>,
3952 ) -> S3Result<S3Response<GetBucketAclOutput>> {
3953 self.backend.get_bucket_acl(req).await
3954 }
3955 async fn put_bucket_acl(
3956 &self,
3957 req: S3Request<PutBucketAclInput>,
3958 ) -> S3Result<S3Response<PutBucketAclOutput>> {
3959 self.backend.put_bucket_acl(req).await
3960 }
3961
3962 async fn get_bucket_cors(
3964 &self,
3965 req: S3Request<GetBucketCorsInput>,
3966 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
3967 if let Some(mgr) = self.cors.as_ref() {
3968 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
3969 S3Error::with_message(
3970 S3ErrorCode::NoSuchCORSConfiguration,
3971 "The CORS configuration does not exist".to_string(),
3972 )
3973 })?;
3974 let rules: Vec<CORSRule> = cfg
3975 .rules
3976 .into_iter()
3977 .map(|r| CORSRule {
3978 allowed_headers: if r.allowed_headers.is_empty() {
3979 None
3980 } else {
3981 Some(r.allowed_headers)
3982 },
3983 allowed_methods: r.allowed_methods,
3984 allowed_origins: r.allowed_origins,
3985 expose_headers: if r.expose_headers.is_empty() {
3986 None
3987 } else {
3988 Some(r.expose_headers)
3989 },
3990 id: r.id,
3991 max_age_seconds: r.max_age_seconds.map(|s| s as i32),
3992 })
3993 .collect();
3994 return Ok(S3Response::new(GetBucketCorsOutput {
3995 cors_rules: Some(rules),
3996 }));
3997 }
3998 self.backend.get_bucket_cors(req).await
3999 }
4000 async fn put_bucket_cors(
4001 &self,
4002 req: S3Request<PutBucketCorsInput>,
4003 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4004 if let Some(mgr) = self.cors.as_ref() {
4005 let cfg = crate::cors::CorsConfig {
4006 rules: req
4007 .input
4008 .cors_configuration
4009 .cors_rules
4010 .into_iter()
4011 .map(|r| crate::cors::CorsRule {
4012 allowed_origins: r.allowed_origins,
4013 allowed_methods: r.allowed_methods,
4014 allowed_headers: r.allowed_headers.unwrap_or_default(),
4015 expose_headers: r.expose_headers.unwrap_or_default(),
4016 max_age_seconds: r.max_age_seconds.and_then(|s| {
4017 if s < 0 { None } else { Some(s as u32) }
4018 }),
4019 id: r.id,
4020 })
4021 .collect(),
4022 };
4023 mgr.put(&req.input.bucket, cfg);
4024 return Ok(S3Response::new(PutBucketCorsOutput::default()));
4025 }
4026 self.backend.put_bucket_cors(req).await
4027 }
4028 async fn delete_bucket_cors(
4029 &self,
4030 req: S3Request<DeleteBucketCorsInput>,
4031 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4032 if let Some(mgr) = self.cors.as_ref() {
4033 mgr.delete(&req.input.bucket);
4034 return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4035 }
4036 self.backend.delete_bucket_cors(req).await
4037 }
4038
4039 async fn get_bucket_lifecycle_configuration(
4041 &self,
4042 req: S3Request<GetBucketLifecycleConfigurationInput>,
4043 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4044 if let Some(mgr) = self.lifecycle.as_ref() {
4045 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4046 S3Error::with_message(
4047 S3ErrorCode::NoSuchLifecycleConfiguration,
4048 "The lifecycle configuration does not exist".to_string(),
4049 )
4050 })?;
4051 let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
4052 return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
4053 rules: Some(rules),
4054 transition_default_minimum_object_size: None,
4055 }));
4056 }
4057 self.backend.get_bucket_lifecycle_configuration(req).await
4058 }
4059 async fn put_bucket_lifecycle_configuration(
4060 &self,
4061 req: S3Request<PutBucketLifecycleConfigurationInput>,
4062 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
4063 if let Some(mgr) = self.lifecycle.as_ref() {
4064 let bucket = req.input.bucket.clone();
4065 let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
4066 let cfg = dto_lifecycle_to_internal(&dto_cfg);
4067 mgr.put(&bucket, cfg);
4068 return Ok(S3Response::new(
4069 PutBucketLifecycleConfigurationOutput::default(),
4070 ));
4071 }
4072 self.backend.put_bucket_lifecycle_configuration(req).await
4073 }
4074 async fn delete_bucket_lifecycle(
4075 &self,
4076 req: S3Request<DeleteBucketLifecycleInput>,
4077 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
4078 if let Some(mgr) = self.lifecycle.as_ref() {
4079 mgr.delete(&req.input.bucket);
4080 return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
4081 }
4082 self.backend.delete_bucket_lifecycle(req).await
4083 }
4084
4085 async fn get_bucket_tagging(
4087 &self,
4088 req: S3Request<GetBucketTaggingInput>,
4089 ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
4090 let Some(mgr) = self.tagging.as_ref() else {
4091 return self.backend.get_bucket_tagging(req).await;
4092 };
4093 let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
4094 Ok(S3Response::new(GetBucketTaggingOutput {
4095 tag_set: tagset_to_aws(&tags),
4096 }))
4097 }
4098 async fn put_bucket_tagging(
4099 &self,
4100 req: S3Request<PutBucketTaggingInput>,
4101 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
4102 let Some(mgr) = self.tagging.as_ref() else {
4103 return self.backend.put_bucket_tagging(req).await;
4104 };
4105 let bucket = req.input.bucket.clone();
4106 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4107 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4108 })?;
4109 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4110 mgr.put_bucket_tags(&bucket, parsed);
4111 Ok(S3Response::new(PutBucketTaggingOutput::default()))
4112 }
4113 async fn delete_bucket_tagging(
4114 &self,
4115 req: S3Request<DeleteBucketTaggingInput>,
4116 ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
4117 let Some(mgr) = self.tagging.as_ref() else {
4118 return self.backend.delete_bucket_tagging(req).await;
4119 };
4120 let bucket = req.input.bucket.clone();
4121 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4122 mgr.delete_bucket_tags(&bucket);
4123 Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
4124 }
4125
4126 async fn get_bucket_encryption(
4128 &self,
4129 req: S3Request<GetBucketEncryptionInput>,
4130 ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
4131 self.backend.get_bucket_encryption(req).await
4132 }
4133 async fn put_bucket_encryption(
4134 &self,
4135 req: S3Request<PutBucketEncryptionInput>,
4136 ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
4137 self.backend.put_bucket_encryption(req).await
4138 }
4139 async fn delete_bucket_encryption(
4140 &self,
4141 req: S3Request<DeleteBucketEncryptionInput>,
4142 ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
4143 self.backend.delete_bucket_encryption(req).await
4144 }
4145
4146 async fn get_bucket_logging(
4148 &self,
4149 req: S3Request<GetBucketLoggingInput>,
4150 ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
4151 self.backend.get_bucket_logging(req).await
4152 }
4153 async fn put_bucket_logging(
4154 &self,
4155 req: S3Request<PutBucketLoggingInput>,
4156 ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
4157 self.backend.put_bucket_logging(req).await
4158 }
4159
4160 async fn get_bucket_notification_configuration(
4170 &self,
4171 req: S3Request<GetBucketNotificationConfigurationInput>,
4172 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
4173 if let Some(mgr) = self.notifications.as_ref() {
4174 let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
4175 let dto = notif_to_dto(&cfg);
4176 return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
4177 event_bridge_configuration: dto.event_bridge_configuration,
4178 lambda_function_configurations: dto.lambda_function_configurations,
4179 queue_configurations: dto.queue_configurations,
4180 topic_configurations: dto.topic_configurations,
4181 }));
4182 }
4183 self.backend
4184 .get_bucket_notification_configuration(req)
4185 .await
4186 }
4187 async fn put_bucket_notification_configuration(
4188 &self,
4189 req: S3Request<PutBucketNotificationConfigurationInput>,
4190 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
4191 if let Some(mgr) = self.notifications.as_ref() {
4192 let cfg = notif_from_dto(&req.input.notification_configuration);
4193 mgr.put(&req.input.bucket, cfg);
4194 return Ok(S3Response::new(
4195 PutBucketNotificationConfigurationOutput::default(),
4196 ));
4197 }
4198 self.backend
4199 .put_bucket_notification_configuration(req)
4200 .await
4201 }
4202
4203 async fn get_bucket_request_payment(
4205 &self,
4206 req: S3Request<GetBucketRequestPaymentInput>,
4207 ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
4208 self.backend.get_bucket_request_payment(req).await
4209 }
4210 async fn put_bucket_request_payment(
4211 &self,
4212 req: S3Request<PutBucketRequestPaymentInput>,
4213 ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
4214 self.backend.put_bucket_request_payment(req).await
4215 }
4216
4217 async fn get_bucket_website(
4219 &self,
4220 req: S3Request<GetBucketWebsiteInput>,
4221 ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
4222 self.backend.get_bucket_website(req).await
4223 }
4224 async fn put_bucket_website(
4225 &self,
4226 req: S3Request<PutBucketWebsiteInput>,
4227 ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
4228 self.backend.put_bucket_website(req).await
4229 }
4230 async fn delete_bucket_website(
4231 &self,
4232 req: S3Request<DeleteBucketWebsiteInput>,
4233 ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
4234 self.backend.delete_bucket_website(req).await
4235 }
4236
4237 async fn get_bucket_replication(
4239 &self,
4240 req: S3Request<GetBucketReplicationInput>,
4241 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
4242 if let Some(mgr) = self.replication.as_ref() {
4243 return match mgr.get(&req.input.bucket) {
4244 Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
4245 replication_configuration: Some(replication_to_dto(&cfg)),
4246 })),
4247 None => Err(S3Error::with_message(
4248 S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
4249 format!("no replication configuration on bucket {}", req.input.bucket),
4250 )),
4251 };
4252 }
4253 self.backend.get_bucket_replication(req).await
4254 }
4255 async fn put_bucket_replication(
4256 &self,
4257 req: S3Request<PutBucketReplicationInput>,
4258 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
4259 if let Some(mgr) = self.replication.as_ref() {
4260 let cfg = replication_from_dto(&req.input.replication_configuration);
4261 mgr.put(&req.input.bucket, cfg);
4262 return Ok(S3Response::new(PutBucketReplicationOutput::default()));
4263 }
4264 self.backend.put_bucket_replication(req).await
4265 }
4266 async fn delete_bucket_replication(
4267 &self,
4268 req: S3Request<DeleteBucketReplicationInput>,
4269 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
4270 if let Some(mgr) = self.replication.as_ref() {
4271 mgr.delete(&req.input.bucket);
4272 return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
4273 }
4274 self.backend.delete_bucket_replication(req).await
4275 }
4276
4277 async fn get_bucket_accelerate_configuration(
4279 &self,
4280 req: S3Request<GetBucketAccelerateConfigurationInput>,
4281 ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
4282 self.backend.get_bucket_accelerate_configuration(req).await
4283 }
4284 async fn put_bucket_accelerate_configuration(
4285 &self,
4286 req: S3Request<PutBucketAccelerateConfigurationInput>,
4287 ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
4288 self.backend.put_bucket_accelerate_configuration(req).await
4289 }
4290
4291 async fn get_bucket_ownership_controls(
4293 &self,
4294 req: S3Request<GetBucketOwnershipControlsInput>,
4295 ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
4296 self.backend.get_bucket_ownership_controls(req).await
4297 }
4298 async fn put_bucket_ownership_controls(
4299 &self,
4300 req: S3Request<PutBucketOwnershipControlsInput>,
4301 ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
4302 self.backend.put_bucket_ownership_controls(req).await
4303 }
4304 async fn delete_bucket_ownership_controls(
4305 &self,
4306 req: S3Request<DeleteBucketOwnershipControlsInput>,
4307 ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
4308 self.backend.delete_bucket_ownership_controls(req).await
4309 }
4310
4311 async fn get_public_access_block(
4313 &self,
4314 req: S3Request<GetPublicAccessBlockInput>,
4315 ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
4316 self.backend.get_public_access_block(req).await
4317 }
4318 async fn put_public_access_block(
4319 &self,
4320 req: S3Request<PutPublicAccessBlockInput>,
4321 ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
4322 self.backend.put_public_access_block(req).await
4323 }
4324 async fn delete_public_access_block(
4325 &self,
4326 req: S3Request<DeletePublicAccessBlockInput>,
4327 ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
4328 self.backend.delete_public_access_block(req).await
4329 }
4330
4331 async fn select_object_content(
4350 &self,
4351 req: S3Request<SelectObjectContentInput>,
4352 ) -> S3Result<S3Response<SelectObjectContentOutput>> {
4353 use crate::select::{
4354 EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
4355 run_select_jsonlines,
4356 };
4357
4358 let select_bucket = req.input.bucket.clone();
4359 let select_key = req.input.key.clone();
4360 self.enforce_rate_limit(&req, &select_bucket)?;
4361 self.enforce_policy(
4362 &req,
4363 "s3:GetObject",
4364 &select_bucket,
4365 Some(&select_key),
4366 )?;
4367
4368 let request = req.input.request;
4369 let sql = request.expression.clone();
4370 if request.expression_type.as_str() != "SQL" {
4371 return Err(S3Error::with_message(
4372 S3ErrorCode::InvalidExpressionType,
4373 format!(
4374 "ExpressionType must be SQL, got: {}",
4375 request.expression_type.as_str()
4376 ),
4377 ));
4378 }
4379
4380 let input_format = if request.input_serialization.json.is_some() {
4381 SelectInputFormat::JsonLines
4382 } else if let Some(csv) = request.input_serialization.csv.as_ref() {
4383 let has_header = csv
4384 .file_header_info
4385 .as_ref()
4386 .map(|h| {
4387 let s = h.as_str();
4388 s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
4389 })
4390 .unwrap_or(false);
4391 let delim = csv
4392 .field_delimiter
4393 .as_deref()
4394 .and_then(|s| s.chars().next())
4395 .unwrap_or(',');
4396 SelectInputFormat::Csv {
4397 has_header,
4398 delimiter: delim,
4399 }
4400 } else if request.input_serialization.parquet.is_some() {
4401 return Err(S3Error::with_message(
4402 S3ErrorCode::NotImplemented,
4403 "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
4404 ));
4405 } else {
4406 return Err(S3Error::with_message(
4407 S3ErrorCode::InvalidRequest,
4408 "InputSerialization requires exactly one of CSV / JSON / Parquet",
4409 ));
4410 };
4411 if let Some(ct) = request.input_serialization.compression_type.as_ref()
4412 && !ct.as_str().eq_ignore_ascii_case("NONE")
4413 {
4414 return Err(S3Error::with_message(
4415 S3ErrorCode::NotImplemented,
4416 format!(
4417 "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
4418 ct.as_str()
4419 ),
4420 ));
4421 }
4422
4423 let output_format = if request.output_serialization.json.is_some() {
4424 SelectOutputFormat::Json
4425 } else if request.output_serialization.csv.is_some() {
4426 SelectOutputFormat::Csv
4427 } else {
4428 return Err(S3Error::with_message(
4429 S3ErrorCode::InvalidRequest,
4430 "OutputSerialization requires exactly one of CSV / JSON",
4431 ));
4432 };
4433
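// Fetch the object through the gateway's own get_object so decompression and
// SSE decryption have already been applied before the query runs.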
4434 let get_input = GetObjectInput {
4435 bucket: select_bucket.clone(),
4436 key: select_key.clone(),
4437 sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
4438 sse_customer_key: req.input.sse_customer_key.clone(),
4439 sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
4440 ..Default::default()
4441 };
4442 let get_req = S3Request {
4443 input: get_input,
4444 method: http::Method::GET,
4445 uri: safe_object_uri(&select_bucket, &select_key)?,
4453 headers: http::HeaderMap::new(),
4454 extensions: http::Extensions::new(),
4455 credentials: req.credentials.clone(),
4456 region: req.region.clone(),
4457 service: req.service.clone(),
4458 trailing_headers: None,
4459 };
4460 let mut get_resp = self.get_object(get_req).await?;
4461 let blob = get_resp.output.body.take().ok_or_else(|| {
4462 S3Error::with_message(
4463 S3ErrorCode::InternalError,
4464 "Select: object body was empty after GET",
4465 )
4466 })?;
4467 let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
4468 .await
4469 .map_err(internal("collect Select body"))?;
4470 let scanned = body_bytes.len() as u64;
4471
4472 let matched_payload = match input_format {
4473 SelectInputFormat::JsonLines => {
4474 run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
4475 |e| select_error_to_s3(e, "JSON Lines"),
4476 )?
4477 }
4478 SelectInputFormat::Csv { .. } => {
4479 run_select_csv(&sql, &body_bytes, input_format, output_format)
4480 .map_err(|e| select_error_to_s3(e, "CSV"))?
4481 }
4482 };
4483
4484 let returned = matched_payload.len() as u64;
4485 let processed = scanned;
4486 let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
4487 if !matched_payload.is_empty() {
4488 events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
4489 payload: Some(bytes::Bytes::from(matched_payload)),
4490 })));
4491 }
4492 events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
4493 details: Some(Stats {
4494 bytes_scanned: Some(scanned as i64),
4495 bytes_processed: Some(processed as i64),
4496 bytes_returned: Some(returned as i64),
4497 }),
4498 })));
4499 events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
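// Note: EventStreamWriter is constructed but unused on this path; the events
// above are handed straight to SelectObjectContentEventStream.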
4500 let _writer = EventStreamWriter::new();
4503
4504 let stream =
4505 SelectObjectContentEventStream::new(futures::stream::iter(events));
4506 let output = SelectObjectContentOutput {
4507 payload: Some(stream),
4508 };
4509 Ok(S3Response::new(output))
4510 }
4511
4512 async fn put_bucket_inventory_configuration(
4526 &self,
4527 req: S3Request<PutBucketInventoryConfigurationInput>,
4528 ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
4529 if let Some(mgr) = self.inventory.as_ref() {
4530 let cfg = inv_from_dto(
4531 &req.input.bucket,
4532 &req.input.id,
4533 &req.input.inventory_configuration,
4534 );
4535 mgr.put(cfg);
4536 return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
4537 }
4538 self.backend.put_bucket_inventory_configuration(req).await
4539 }
4540
4541 async fn get_bucket_inventory_configuration(
4542 &self,
4543 req: S3Request<GetBucketInventoryConfigurationInput>,
4544 ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
4545 if let Some(mgr) = self.inventory.as_ref() {
4546 let cfg = mgr.get(&req.input.bucket, &req.input.id);
4547 if let Some(cfg) = cfg {
4548 let out = GetBucketInventoryConfigurationOutput {
4549 inventory_configuration: Some(inv_to_dto(&cfg)),
4550 };
4551 return Ok(S3Response::new(out));
4552 }
4553 let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
4560 .unwrap_or(S3ErrorCode::NoSuchKey);
4561 return Err(S3Error::with_message(
4562 code,
4563 format!(
4564 "no inventory configuration with id={} on bucket={}",
4565 req.input.id, req.input.bucket
4566 ),
4567 ));
4568 }
4569 self.backend.get_bucket_inventory_configuration(req).await
4570 }
4571
    async fn list_bucket_inventory_configurations(
        &self,
        req: S3Request<ListBucketInventoryConfigurationsInput>,
    ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
        if let Some(mgr) = self.inventory.as_ref() {
            let list = mgr.list_for_bucket(&req.input.bucket);
            let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
            let out = ListBucketInventoryConfigurationsOutput {
                continuation_token: req.input.continuation_token.clone(),
                inventory_configuration_list: if dto_list.is_empty() {
                    None
                } else {
                    Some(dto_list)
                },
                is_truncated: Some(false),
                next_continuation_token: None,
            };
            return Ok(S3Response::new(out));
        }
        self.backend.list_bucket_inventory_configurations(req).await
    }

    async fn delete_bucket_inventory_configuration(
        &self,
        req: S3Request<DeleteBucketInventoryConfigurationInput>,
    ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
        if let Some(mgr) = self.inventory.as_ref() {
            mgr.delete(&req.input.bucket, &req.input.id);
            return Ok(S3Response::new(
                DeleteBucketInventoryConfigurationOutput::default(),
            ));
        }
        self.backend.delete_bucket_inventory_configuration(req).await
    }
}

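/// Converts an S3 `InventoryConfiguration` DTO into the internal inventory
/// config. Only daily and weekly schedules are distinguished (anything else
/// falls back to daily), and the output format is always recorded as CSV.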
fn inv_from_dto(
    bucket: &str,
    id: &str,
    dto: &InventoryConfiguration,
) -> crate::inventory::InventoryConfig {
    let frequency_hours = match dto.schedule.frequency.as_str() {
        "Weekly" => 24 * 7,
        _ => 24,
    };
    let format = crate::inventory::InventoryFormat::Csv;
    crate::inventory::InventoryConfig {
        id: id.to_owned(),
        bucket: bucket.to_owned(),
        destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
        destination_prefix: dto
            .destination
            .s3_bucket_destination
            .prefix
            .clone()
            .unwrap_or_default(),
        frequency_hours,
        format,
        included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
            dto.included_object_versions.as_str(),
        ),
    }
}

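/// Converts the internal inventory config back into the S3 DTO form, marking
/// the configuration as enabled and mapping the stored frequency onto the
/// Daily/Weekly schedule values.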
fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
    InventoryConfiguration {
        id: cfg.id.clone(),
        is_enabled: true,
        included_object_versions: InventoryIncludedObjectVersions::from(
            cfg.included_object_versions.as_aws_str().to_owned(),
        ),
        destination: InventoryDestination {
            s3_bucket_destination: InventoryS3BucketDestination {
                account_id: None,
                bucket: cfg.destination_bucket.clone(),
                encryption: None,
                format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
                prefix: if cfg.destination_prefix.is_empty() {
                    None
                } else {
                    Some(cfg.destination_prefix.clone())
                },
            },
        },
        schedule: InventorySchedule {
            frequency: InventoryFrequency::from(
                if cfg.frequency_hours == 24 * 7 {
                    "Weekly"
                } else {
                    "Daily"
                }
                .to_owned(),
            ),
        },
        filter: None,
        optional_fields: None,
    }
}

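/// Flattens an S3 `NotificationConfiguration` DTO into the internal rule list,
/// covering topic (SNS) and queue (SQS) configurations; rules without an id
/// get synthetic `topic-N` / `queue-N` names.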
fn notif_from_dto(
    dto: &NotificationConfiguration,
) -> crate::notifications::NotificationConfig {
    let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
    if let Some(topics) = dto.topic_configurations.as_ref() {
        for (idx, t) in topics.iter().enumerate() {
            let events = events_from_dto(&t.events);
            let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
            rules.push(crate::notifications::NotificationRule {
                id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
                events,
                destination: crate::notifications::Destination::Sns {
                    topic_arn: t.topic_arn.clone(),
                },
                filter_prefix: prefix,
                filter_suffix: suffix,
            });
        }
    }
    if let Some(queues) = dto.queue_configurations.as_ref() {
        for (idx, q) in queues.iter().enumerate() {
            let events = events_from_dto(&q.events);
            let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
            rules.push(crate::notifications::NotificationRule {
                id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
                events,
                destination: crate::notifications::Destination::Sqs {
                    queue_arn: q.queue_arn.clone(),
                },
                filter_prefix: prefix,
                filter_suffix: suffix,
            });
        }
    }
    crate::notifications::NotificationConfig { rules }
}

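/// Rebuilds the S3 `NotificationConfiguration` DTO from the internal rules.
/// Webhook destinations have no DTO representation and are skipped.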
fn notif_to_dto(
    cfg: &crate::notifications::NotificationConfig,
) -> NotificationConfiguration {
    let mut topics: Vec<TopicConfiguration> = Vec::new();
    let mut queues: Vec<QueueConfiguration> = Vec::new();
    for rule in &cfg.rules {
        let events: Vec<Event> = rule
            .events
            .iter()
            .map(|e| Event::from(e.as_aws_str().to_owned()))
            .collect();
        let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
        match &rule.destination {
            crate::notifications::Destination::Sns { topic_arn } => {
                topics.push(TopicConfiguration {
                    events,
                    filter,
                    id: Some(rule.id.clone()),
                    topic_arn: topic_arn.clone(),
                });
            }
            crate::notifications::Destination::Sqs { queue_arn } => {
                queues.push(QueueConfiguration {
                    events,
                    filter,
                    id: Some(rule.id.clone()),
                    queue_arn: queue_arn.clone(),
                });
            }
            crate::notifications::Destination::Webhook { .. } => {}
        }
    }
    NotificationConfiguration {
        event_bridge_configuration: None,
        lambda_function_configurations: None,
        queue_configurations: if queues.is_empty() { None } else { Some(queues) },
        topic_configurations: if topics.is_empty() { None } else { Some(topics) },
    }
}

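/// Maps DTO event names onto internal event types, dropping any names the
/// internal model does not recognize.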
fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
    events
        .iter()
        .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
        .collect()
}

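/// Extracts the optional prefix/suffix key filters from a notification filter DTO.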
fn filter_from_dto(
    f: Option<&NotificationConfigurationFilter>,
) -> (Option<String>, Option<String>) {
    let Some(f) = f else {
        return (None, None);
    };
    let Some(key) = f.key.as_ref() else {
        return (None, None);
    };
    let Some(rules) = key.filter_rules.as_ref() else {
        return (None, None);
    };
    let mut prefix = None;
    let mut suffix = None;
    for r in rules {
        let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
        let value = r.value.clone();
        match name.as_deref() {
            Some("prefix") => prefix = value,
            Some("suffix") => suffix = value,
            _ => {}
        }
    }
    (prefix, suffix)
}

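/// Builds a notification filter DTO from optional prefix/suffix values, or
/// returns `None` when neither is set.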
fn filter_to_dto(
    prefix: Option<&str>,
    suffix: Option<&str>,
) -> Option<NotificationConfigurationFilter> {
    if prefix.is_none() && suffix.is_none() {
        return None;
    }
    let mut rules: Vec<FilterRule> = Vec::new();
    if let Some(p) = prefix {
        rules.push(FilterRule {
            name: Some(FilterRuleName::from("prefix".to_owned())),
            value: Some(p.to_owned()),
        });
    }
    if let Some(s) = suffix {
        rules.push(FilterRule {
            name: Some(FilterRuleName::from("suffix".to_owned())),
            value: Some(s.to_owned()),
        });
    }
    Some(NotificationConfigurationFilter {
        key: Some(S3KeyFilter {
            filter_rules: Some(rules),
        }),
    })
}

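/// Converts an S3 `ReplicationConfiguration` DTO into the internal replication
/// config; rules without an id get a synthetic `rule-N` name and negative
/// priorities are clamped to zero.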
fn replication_from_dto(
    dto: &ReplicationConfiguration,
) -> crate::replication::ReplicationConfig {
    let rules = dto
        .rules
        .iter()
        .enumerate()
        .map(|(idx, r)| {
            let id = r
                .id
                .as_ref()
                .map(|s| s.as_str().to_owned())
                .unwrap_or_else(|| format!("rule-{idx}"));
            let priority = r.priority.unwrap_or(0).max(0) as u32;
            let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
            let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
            let destination_bucket = r.destination.bucket.clone();
            let destination_storage_class = r
                .destination
                .storage_class
                .as_ref()
                .map(|s| s.as_str().to_owned());
            crate::replication::ReplicationRule {
                id,
                priority,
                status_enabled,
                filter,
                destination_bucket,
                destination_storage_class,
            }
        })
        .collect();
    crate::replication::ReplicationConfig {
        role: dto.role.clone(),
        rules,
    }
}

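/// Rebuilds the S3 `ReplicationConfiguration` DTO from the internal config.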
fn replication_to_dto(
    cfg: &crate::replication::ReplicationConfig,
) -> ReplicationConfiguration {
    let rules = cfg
        .rules
        .iter()
        .map(|r| {
            let status = if r.status_enabled {
                ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
            } else {
                ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
            };
            let destination = Destination {
                access_control_translation: None,
                account: None,
                bucket: r.destination_bucket.clone(),
                encryption_configuration: None,
                metrics: None,
                replication_time: None,
                storage_class: r
                    .destination_storage_class
                    .as_ref()
                    .map(|s| StorageClass::from(s.clone())),
            };
            let filter = Some(replication_filter_to_dto(&r.filter));
            ReplicationRule {
                delete_marker_replication: None,
                destination,
                existing_object_replication: None,
                filter,
                id: Some(r.id.clone()),
                prefix: None,
                priority: Some(r.priority as i32),
                source_selection_criteria: None,
                status,
            }
        })
        .collect();
    ReplicationConfiguration {
        role: cfg.role.clone(),
        rules,
    }
}

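/// Merges the rule-level prefix and the filter DTO (including its `And`
/// operator) into the internal replication filter; the first prefix seen wins.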
fn replication_filter_from_dto(
    f: Option<&ReplicationRuleFilter>,
    rule_level_prefix: Option<&str>,
) -> crate::replication::ReplicationFilter {
    let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
    let mut tags: Vec<(String, String)> = Vec::new();
    if let Some(f) = f {
        if let Some(p) = f.prefix.as_ref()
            && prefix.is_none()
        {
            prefix = Some(p.clone());
        }
        if let Some(t) = f.tag.as_ref()
            && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
        {
            tags.push((k.clone(), v.clone()));
        }
        if let Some(and) = f.and.as_ref() {
            if let Some(p) = and.prefix.as_ref()
                && prefix.is_none()
            {
                prefix = Some(p.clone());
            }
            if let Some(ts) = and.tags.as_ref() {
                for t in ts {
                    if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
                        tags.push((k.clone(), v.clone()));
                    }
                }
            }
        }
    }
    crate::replication::ReplicationFilter { prefix, tags }
}

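/// Converts the internal replication filter back into the DTO form, falling
/// back to the `And` operator when a prefix is combined with tags or more than
/// one tag is present.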
fn replication_filter_to_dto(
    f: &crate::replication::ReplicationFilter,
) -> ReplicationRuleFilter {
    if f.tags.is_empty() {
        ReplicationRuleFilter {
            and: None,
            prefix: f.prefix.clone(),
            tag: None,
        }
    } else if f.tags.len() == 1 && f.prefix.is_none() {
        let (k, v) = &f.tags[0];
        ReplicationRuleFilter {
            and: None,
            prefix: None,
            tag: Some(Tag {
                key: Some(k.clone()),
                value: Some(v.clone()),
            }),
        }
    } else {
        let tags: Vec<Tag> = f
            .tags
            .iter()
            .map(|(k, v)| Tag {
                key: Some(k.clone()),
                value: Some(v.clone()),
            })
            .collect();
        ReplicationRuleFilter {
            and: Some(ReplicationRuleAndOperator {
                prefix: f.prefix.clone(),
                tags: Some(tags),
            }),
            prefix: None,
            tag: None,
        }
    }
}

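/// Converts an S3 `BucketLifecycleConfiguration` DTO into the internal lifecycle config.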
fn dto_lifecycle_to_internal(
    dto: &BucketLifecycleConfiguration,
) -> crate::lifecycle::LifecycleConfig {
    crate::lifecycle::LifecycleConfig {
        rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
    }
}

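/// Converts a single lifecycle rule DTO into the internal representation,
/// dropping transitions whose days or storage class cannot be interpreted.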
fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
    let filter = rule
        .filter
        .as_ref()
        .map(dto_filter_to_internal)
        .unwrap_or_default();
    let expiration_days = rule
        .expiration
        .as_ref()
        .and_then(|e| e.days)
        .and_then(|d| u32::try_from(d).ok());
    let expiration_date = rule
        .expiration
        .as_ref()
        .and_then(|e| e.date.as_ref())
        .and_then(timestamp_to_chrono_utc);
    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
        .transitions
        .as_ref()
        .map(|ts| {
            ts.iter()
                .filter_map(|t| {
                    let days = u32::try_from(t.days?).ok()?;
                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
                    Some(crate::lifecycle::TransitionRule {
                        days,
                        storage_class,
                    })
                })
                .collect()
        })
        .unwrap_or_default();
    let noncurrent_version_expiration_days = rule
        .noncurrent_version_expiration
        .as_ref()
        .and_then(|n| n.noncurrent_days)
        .and_then(|d| u32::try_from(d).ok());
    let abort_incomplete_multipart_upload_days = rule
        .abort_incomplete_multipart_upload
        .as_ref()
        .and_then(|a| a.days_after_initiation)
        .and_then(|d| u32::try_from(d).ok());
    crate::lifecycle::LifecycleRule {
        id: rule.id.clone().unwrap_or_default(),
        status,
        filter,
        expiration_days,
        expiration_date,
        transitions,
        noncurrent_version_expiration_days,
        abort_incomplete_multipart_upload_days,
    }
}

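/// Flattens a lifecycle rule filter DTO (including its `And` operator) into
/// the internal filter; top-level values take precedence over `And` values.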
fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
    let mut prefix = filter.prefix.clone();
    let mut tags: Vec<(String, String)> = Vec::new();
    let mut size_gt: Option<u64> = filter
        .object_size_greater_than
        .and_then(|n| u64::try_from(n).ok());
    let mut size_lt: Option<u64> = filter
        .object_size_less_than
        .and_then(|n| u64::try_from(n).ok());
    if let Some(t) = &filter.tag
        && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
    {
        tags.push((k.clone(), v.clone()));
    }
    if let Some(and) = &filter.and {
        if prefix.is_none() {
            prefix = and.prefix.clone();
        }
        if size_gt.is_none() {
            size_gt = and
                .object_size_greater_than
                .and_then(|n| u64::try_from(n).ok());
        }
        if size_lt.is_none() {
            size_lt = and
                .object_size_less_than
                .and_then(|n| u64::try_from(n).ok());
        }
        if let Some(ts) = &and.tags {
            for t in ts {
                if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
                    tags.push((k.clone(), v.clone()));
                }
            }
        }
    }
    crate::lifecycle::LifecycleFilter {
        prefix,
        tags,
        object_size_greater_than: size_gt,
        object_size_less_than: size_lt,
    }
}

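/// Rebuilds a lifecycle rule DTO from the internal rule, choosing the simplest
/// filter shape (prefix-only, single-tag, or `And`) that can represent it.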
fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
        Some(LifecycleExpiration {
            date: rule.expiration_date.map(chrono_utc_to_timestamp),
            days: rule.expiration_days.map(|d| d as i32),
            expired_object_delete_marker: None,
        })
    } else {
        None
    };
    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
        None
    } else {
        Some(
            rule.transitions
                .iter()
                .map(|t| Transition {
                    date: None,
                    days: Some(t.days as i32),
                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
                })
                .collect(),
        )
    };
    let noncurrent_version_expiration =
        rule.noncurrent_version_expiration_days
            .map(|d| NoncurrentVersionExpiration {
                newer_noncurrent_versions: None,
                noncurrent_days: Some(d as i32),
            });
    let abort_incomplete_multipart_upload =
        rule.abort_incomplete_multipart_upload_days
            .map(|d| AbortIncompleteMultipartUpload {
                days_after_initiation: Some(d as i32),
            });
    let filter = if rule.filter.tags.is_empty()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: Some(p.clone()),
            tag: None,
        })
    } else if rule.filter.tags.len() == 1
        && rule.filter.prefix.is_none()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        let (k, v) = rule.filter.tags[0].clone();
        Some(LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: Some(Tag {
                key: Some(k),
                value: Some(v),
            }),
        })
    } else {
        let tags = if rule.filter.tags.is_empty() {
            None
        } else {
            Some(
                rule.filter
                    .tags
                    .iter()
                    .map(|(k, v)| Tag {
                        key: Some(k.clone()),
                        value: Some(v.clone()),
                    })
                    .collect(),
            )
        };
        Some(LifecycleRuleFilter {
            and: Some(LifecycleRuleAndOperator {
                object_size_greater_than: rule
                    .filter
                    .object_size_greater_than
                    .and_then(|n| i64::try_from(n).ok()),
                object_size_less_than: rule
                    .filter
                    .object_size_less_than
                    .and_then(|n| i64::try_from(n).ok()),
                prefix: rule.filter.prefix.clone(),
                tags,
            }),
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: None,
        })
    };
    LifecycleRule {
        abort_incomplete_multipart_upload,
        expiration,
        filter,
        id: if rule.id.is_empty() {
            None
        } else {
            Some(rule.id.clone())
        },
        noncurrent_version_expiration,
        noncurrent_version_transitions: None,
        prefix: None,
        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
        transitions,
    }
}

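/// Pre-routing verifier for SigV4a (asymmetric, multi-region) signed requests,
/// backed by a shared credential store.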
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}

impl SigV4aGate {
    #[must_use]
    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
        Self { store }
    }

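    /// Verifies a SigV4a-signed request before it is routed. Requests that are
    /// not SigV4a-signed pass through untouched; signed requests must carry a
    /// parseable Authorization header, reference a known access key, and verify
    /// against the canonical request bytes for the requested region.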
    pub fn pre_route<B>(
        &self,
        req: &http::Request<B>,
        requested_region: &str,
        canonical_request_bytes: &[u8],
    ) -> Result<(), SigV4aGateError> {
        if !crate::sigv4a::detect(req) {
            return Ok(());
        }
        let auth_hdr = req
            .headers()
            .get(http::header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .ok_or(SigV4aGateError::MissingAuthorization)?;
        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
            .ok_or(SigV4aGateError::MalformedAuthorization)?;
        let region_set = req
            .headers()
            .get(crate::sigv4a::REGION_SET_HEADER)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("*");
        let key = self
            .store
            .get(&parsed.access_key_id)
            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
        crate::sigv4a::verify(
            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
            &parsed.signature_der,
            key,
            region_set,
            requested_region,
        )
        .map_err(SigV4aGateError::Verify)?;
        Ok(())
    }
}

#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    #[error("missing Authorization header")]
    MissingAuthorization,
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}

impl SigV4aGateError {
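    /// Maps the gate error onto the S3 error code reported to clients.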
    #[must_use]
    pub fn s3_error_code(&self) -> &'static str {
        match self {
            Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
            _ => "SignatureDoesNotMatch",
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }

    #[test]
    fn parse_copy_source_range_basic() {
        let r = parse_copy_source_range("bytes=10-20").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        assert!(parse_copy_source_range("bytes=10-").is_err());
        assert!(parse_copy_source_range("bytes=-10").is_err());
    }

    #[test]
    fn safe_object_uri_basic_ascii() {
        let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
        assert_eq!(uri.path(), "/bucket/key");
    }

    #[test]
    fn safe_object_uri_encodes_spaces() {
        let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
        assert!(
            uri.path().contains("%20"),
            "expected percent-encoded space, got {}",
            uri.path()
        );
        assert!(uri.path().starts_with("/bucket/"));
    }

    #[test]
    fn safe_object_uri_preserves_slashes() {
        let uri =
            safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
        assert_eq!(uri.path(), "/bucket/key/with/slashes");
    }

    #[test]
    fn safe_object_uri_handles_newline_without_panic() {
        let _ = safe_object_uri("bucket", "key\n");
    }

    #[test]
    fn safe_object_uri_handles_null_byte_without_panic() {
        let _ = safe_object_uri("bucket", "key\0bad");
    }

    #[test]
    fn safe_object_uri_handles_unicode_without_panic() {
        let _ = safe_object_uri("bucket", "rtl\u{202E}override");
        let _ = safe_object_uri("bucket", "\u{FEFF}bom-key");
        let _ = safe_object_uri("bucket", "日本語キー");
    }

    #[test]
    fn safe_object_uri_no_panic_for_every_byte() {
        for b in 0u8..=255 {
            let s = String::from_utf8_lossy(&[b]).into_owned();
            let _ = safe_object_uri("bucket", &s);
        }
    }
}