1use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40 write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50 cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51 supports_streaming_compress, supports_streaming_decompress,
52};
53
54const SAMPLE_BYTES: usize = 4096;
56
57struct AccessLogPreamble {
61 remote_ip: Option<String>,
62 requester: Option<String>,
63 request_uri: String,
64 user_agent: Option<String>,
65}
66
67pub struct S4Service<B: S3> {
68 backend: B,
69 registry: Arc<CodecRegistry>,
70 dispatcher: Arc<dyn CodecDispatcher>,
71 max_body_bytes: usize,
72 policy: Option<crate::policy::SharedPolicy>,
73 secure_transport: bool,
78 rate_limits: Option<crate::rate_limit::SharedRateLimits>,
80 access_log: Option<crate::access_log::SharedAccessLog>,
82 sse_keyring: Option<crate::sse::SharedSseKeyring>,
90 versioning: Option<Arc<crate::versioning::VersioningManager>>,
100 kms: Option<Arc<dyn crate::kms::KmsBackend>>,
109 kms_default_key_id: Option<String>,
110 object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
121 compliance_strict: bool,
127}
128
129impl<B: S3> S4Service<B> {
130 pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
132
133 pub fn new(
134 backend: B,
135 registry: Arc<CodecRegistry>,
136 dispatcher: Arc<dyn CodecDispatcher>,
137 ) -> Self {
138 Self {
139 backend,
140 registry,
141 dispatcher,
142 max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
143 policy: None,
144 secure_transport: false,
145 rate_limits: None,
146 access_log: None,
147 sse_keyring: None,
148 versioning: None,
149 kms: None,
150 kms_default_key_id: None,
151 object_lock: None,
152 compliance_strict: false,
153 }
154 }
155
156 #[must_use]
163 pub fn with_compliance_strict(mut self, on: bool) -> Self {
164 self.compliance_strict = on;
165 self
166 }
167
168 #[must_use]
174 pub fn with_object_lock(
175 mut self,
176 mgr: Arc<crate::object_lock::ObjectLockManager>,
177 ) -> Self {
178 self.object_lock = Some(mgr);
179 self
180 }
181
182 #[must_use]
186 pub fn with_kms_backend(
187 mut self,
188 kms: Arc<dyn crate::kms::KmsBackend>,
189 default_key_id: Option<String>,
190 ) -> Self {
191 self.kms = Some(kms);
192 self.kms_default_key_id = default_key_id;
193 self
194 }
195
196 #[must_use]
208 pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
209 self.versioning = Some(mgr);
210 self
211 }
212
213 #[must_use]
220 pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
221 let keyring = crate::sse::SseKeyring::new(1, key);
222 self.sse_keyring = Some(std::sync::Arc::new(keyring));
223 self
224 }
225
226 #[must_use]
232 pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
233 self.sse_keyring = Some(keyring);
234 self
235 }
236
237 #[must_use]
243 pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
244 self.access_log = Some(log);
245 self
246 }
247
248 fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
253 self.access_log.as_ref()?;
254 Some(AccessLogPreamble {
255 remote_ip: req
256 .headers
257 .get("x-forwarded-for")
258 .and_then(|v| v.to_str().ok())
259 .and_then(|raw| raw.split(',').next())
260 .map(|s| s.trim().to_owned()),
261 requester: Self::principal_of(req).map(str::to_owned),
262 request_uri: format!("{} {}", req.method, req.uri.path()),
263 user_agent: req
264 .headers
265 .get("user-agent")
266 .and_then(|v| v.to_str().ok())
267 .map(str::to_owned),
268 })
269 }
270
271 #[allow(clippy::too_many_arguments)]
275 async fn record_access(
276 &self,
277 preamble: Option<AccessLogPreamble>,
278 operation: &'static str,
279 bucket: &str,
280 key: Option<&str>,
281 http_status: u16,
282 bytes_sent: u64,
283 object_size: u64,
284 total_time_ms: u64,
285 error_code: Option<&str>,
286 ) {
287 let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
288 return;
289 };
290 log.record(crate::access_log::AccessLogEntry {
291 time: std::time::SystemTime::now(),
292 bucket: bucket.to_owned(),
293 remote_ip: p.remote_ip,
294 requester: p.requester,
295 operation,
296 key: key.map(str::to_owned),
297 request_uri: p.request_uri,
298 http_status,
299 error_code: error_code.map(str::to_owned),
300 bytes_sent,
301 object_size,
302 total_time_ms,
303 user_agent: p.user_agent,
304 })
305 .await;
306 }
307
308 #[must_use]
314 pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
315 self.rate_limits = Some(rl);
316 self
317 }
318
319 fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
323 let Some(rl) = self.rate_limits.as_ref() else {
324 return Ok(());
325 };
326 let principal_id = Self::principal_of(req);
327 if !rl.check(principal_id, bucket) {
328 crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
329 return Err(S3Error::with_message(
330 S3ErrorCode::SlowDown,
331 format!("rate-limited: bucket={bucket}"),
332 ));
333 }
334 Ok(())
335 }
336
337 #[must_use]
341 pub fn with_secure_transport(mut self, on: bool) -> Self {
342 self.secure_transport = on;
343 self
344 }
345
346 #[must_use]
347 pub fn with_max_body_bytes(mut self, n: usize) -> Self {
348 self.max_body_bytes = n;
349 self
350 }
351
352 #[must_use]
357 pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
358 self.policy = Some(policy);
359 self
360 }
361
362 fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
365 req.credentials.as_ref().map(|c| c.access_key.as_str())
366 }
367
368 fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
375 let user_agent = req
376 .headers
377 .get("user-agent")
378 .and_then(|v| v.to_str().ok())
379 .map(str::to_owned);
380 let source_ip = req
383 .headers
384 .get("x-forwarded-for")
385 .and_then(|v| v.to_str().ok())
386 .and_then(|raw| raw.split(',').next())
387 .and_then(|s| s.trim().parse().ok());
388 crate::policy::RequestContext {
389 source_ip,
390 user_agent,
391 request_time: Some(std::time::SystemTime::now()),
392 secure_transport: self.secure_transport,
393 extra: Default::default(),
394 }
395 }
396
397 fn enforce_policy<I>(
402 &self,
403 req: &S3Request<I>,
404 action: &'static str,
405 bucket: &str,
406 key: Option<&str>,
407 ) -> S3Result<()> {
408 let Some(policy) = self.policy.as_ref() else {
409 return Ok(());
410 };
411 let principal_id = Self::principal_of(req);
412 let ctx = self.request_context(req);
413 let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
414 if decision.allow {
415 Ok(())
416 } else {
417 crate::metrics::record_policy_denial(action, bucket);
418 tracing::info!(
419 action,
420 bucket,
421 key = ?key,
422 principal = ?principal_id,
423 source_ip = ?ctx.source_ip,
424 user_agent = ?ctx.user_agent,
425 secure_transport = ctx.secure_transport,
426 matched_sid = ?decision.matched_sid,
427 effect = ?decision.matched_effect,
428 "S4 policy denied request"
429 );
430 Err(S3Error::with_message(
431 S3ErrorCode::AccessDenied,
432 format!("denied by S4 policy: {action} on bucket={bucket}"),
433 ))
434 }
435 }
436
437 pub fn into_backend(self) -> B {
439 self.backend
440 }
441
442 async fn partial_range_get(
445 &self,
446 req: &S3Request<GetObjectInput>,
447 plan: s4_codec::index::RangePlan,
448 client_start: u64,
449 client_end_exclusive: u64,
450 total_original: u64,
451 get_start: Instant,
452 ) -> S3Result<S3Response<GetObjectOutput>> {
453 let backend_range = s3s::dto::Range::Int {
455 first: plan.byte_start,
456 last: Some(plan.byte_end_exclusive - 1),
457 };
458 let backend_input = GetObjectInput {
459 bucket: req.input.bucket.clone(),
460 key: req.input.key.clone(),
461 range: Some(backend_range),
462 ..Default::default()
463 };
464 let backend_req = S3Request {
465 input: backend_input,
466 method: req.method.clone(),
467 uri: req.uri.clone(),
468 headers: req.headers.clone(),
469 extensions: http::Extensions::new(),
470 credentials: req.credentials.clone(),
471 region: req.region.clone(),
472 service: req.service.clone(),
473 trailing_headers: None,
474 };
475 let mut backend_resp = self.backend.get_object(backend_req).await?;
476 let blob = backend_resp.output.body.take().ok_or_else(|| {
477 S3Error::with_message(
478 S3ErrorCode::InternalError,
479 "backend partial GET returned empty body",
480 )
481 })?;
482 let bytes = collect_blob(blob, self.max_body_bytes)
483 .await
484 .map_err(internal("collect partial body"))?;
485
486 let mut combined = BytesMut::new();
488 for frame in FrameIter::new(bytes) {
489 let (header, payload) = frame.map_err(|e| {
490 S3Error::with_message(
491 S3ErrorCode::InternalError,
492 format!("partial-range frame parse: {e}"),
493 )
494 })?;
495 let chunk_manifest = ChunkManifest {
496 codec: header.codec,
497 original_size: header.original_size,
498 compressed_size: header.compressed_size,
499 crc32c: header.crc32c,
500 };
501 let decompressed = self
502 .registry
503 .decompress(payload, &chunk_manifest)
504 .await
505 .map_err(internal("partial-range decompress"))?;
506 combined.extend_from_slice(&decompressed);
507 }
508 let combined = combined.freeze();
509 let sliced = combined
510 .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);
511
512 let returned_size = sliced.len() as u64;
514 backend_resp.output.content_length = Some(returned_size as i64);
515 backend_resp.output.content_range = Some(format!(
516 "bytes {client_start}-{}/{total_original}",
517 client_end_exclusive - 1
518 ));
519 backend_resp.output.checksum_crc32 = None;
520 backend_resp.output.checksum_crc32c = None;
521 backend_resp.output.checksum_crc64nvme = None;
522 backend_resp.output.checksum_sha1 = None;
523 backend_resp.output.checksum_sha256 = None;
524 backend_resp.output.e_tag = None;
525 backend_resp.output.body = Some(bytes_to_blob(sliced));
526 backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);
527
528 let elapsed = get_start.elapsed();
529 crate::metrics::record_get(
530 "partial",
531 plan.byte_end_exclusive - plan.byte_start,
532 returned_size,
533 elapsed.as_secs_f64(),
534 true,
535 );
536 info!(
537 op = "get_object",
538 bucket = %req.input.bucket,
539 key = %req.input.key,
540 bytes_in = plan.byte_end_exclusive - plan.byte_start,
541 bytes_out = returned_size,
542 total_object_size = total_original,
543 range = true,
544 path = "sidecar-partial",
545 latency_ms = elapsed.as_millis() as u64,
546 "S4 partial Range GET via sidecar index"
547 );
548 Ok(backend_resp)
549 }
550
551 async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
555 let bytes = encode_index(index);
556 let len = bytes.len() as i64;
557 let put_input = PutObjectInput {
558 bucket: bucket.into(),
559 key: sidecar_key(key),
560 body: Some(bytes_to_blob(bytes)),
561 content_length: Some(len),
562 content_type: Some("application/x-s4-index".into()),
563 ..Default::default()
564 };
565 let put_req = S3Request {
566 input: put_input,
567 method: http::Method::PUT,
568 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
569 headers: http::HeaderMap::new(),
570 extensions: http::Extensions::new(),
571 credentials: None,
572 region: None,
573 service: None,
574 trailing_headers: None,
575 };
576 if let Err(e) = self.backend.put_object(put_req).await {
577 tracing::warn!(
578 bucket,
579 key,
580 "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
581 );
582 }
583 }
584
585 async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
587 let get_input = GetObjectInput {
588 bucket: bucket.into(),
589 key: sidecar_key(key),
590 ..Default::default()
591 };
592 let get_req = S3Request {
593 input: get_input,
594 method: http::Method::GET,
595 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
596 headers: http::HeaderMap::new(),
597 extensions: http::Extensions::new(),
598 credentials: None,
599 region: None,
600 service: None,
601 trailing_headers: None,
602 };
603 let resp = self.backend.get_object(get_req).await.ok()?;
604 let blob = resp.output.body?;
605 let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
606 decode_index(bytes).ok()
607 }
608
609 async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
615 let mut out = BytesMut::new();
616 for frame in FrameIter::new(bytes) {
617 let (header, payload) = frame.map_err(|e| {
618 S3Error::with_message(
619 S3ErrorCode::InternalError,
620 format!("multipart frame parse: {e}"),
621 )
622 })?;
623 let chunk_manifest = ChunkManifest {
624 codec: header.codec,
625 original_size: header.original_size,
626 compressed_size: header.compressed_size,
627 crc32c: header.crc32c,
628 };
629 let decompressed = self
630 .registry
631 .decompress(payload, &chunk_manifest)
632 .await
633 .map_err(internal("multipart frame decompress"))?;
634 out.extend_from_slice(&decompressed);
635 }
636 Ok(out.freeze())
637 }
638}
639
640fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
645 let rest = s
646 .strip_prefix("bytes=")
647 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
648 let (a, b) = rest
649 .split_once('-')
650 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
651 let first: u64 = a
652 .parse()
653 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
654 let last: u64 = b
655 .parse()
656 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
657 if last < first {
658 return Err(format!("CopySourceRange last < first: {s:?}"));
659 }
660 Ok(s3s::dto::Range::Int {
661 first,
662 last: Some(last),
663 })
664}
665
666pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
680 format!("{key}.__s4ver__/{version_id}")
681}
682
683fn is_versioning_shadow_key(key: &str) -> bool {
686 key.contains(".__s4ver__/")
687}
688
689fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
690 metadata
691 .as_ref()
692 .and_then(|m| m.get(META_MULTIPART))
693 .map(|v| v == "true")
694 .unwrap_or(false)
695}
696
697const META_CODEC: &str = "s4-codec";
698const META_ORIGINAL_SIZE: &str = "s4-original-size";
699const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
700const META_CRC32C: &str = "s4-crc32c";
701const META_MULTIPART: &str = "s4-multipart";
704const META_FRAMED: &str = "s4-framed";
708
709fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
710 metadata
711 .as_ref()
712 .and_then(|m| m.get(META_FRAMED))
713 .map(|v| v == "true")
714 .unwrap_or(false)
715}
716
717fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
719 metadata
720 .as_ref()
721 .and_then(|m| m.get("s4-encrypted"))
722 .map(|v| v == "aes-256-gcm")
723 .unwrap_or(false)
724}
725
726fn extract_sse_c_material(
733 algorithm: &Option<String>,
734 key: &Option<String>,
735 md5: &Option<String>,
736) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
737 match (algorithm, key, md5) {
738 (None, None, None) => Ok(None),
739 (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
740 .map(Some)
741 .map_err(sse_c_error_to_s3),
742 _ => Err(S3Error::with_message(
743 S3ErrorCode::InvalidRequest,
744 "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
745 )),
746 }
747}
748
749fn extract_kms_key_id(
752 sse: &Option<ServerSideEncryption>,
753 sse_kms_key_id: &Option<String>,
754 gateway_default: Option<&str>,
755) -> Option<String> {
756 let asks_for_kms = sse
757 .as_ref()
758 .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
759 .unwrap_or(false);
760 if !asks_for_kms {
761 return None;
762 }
763 sse_kms_key_id
764 .clone()
765 .or_else(|| gateway_default.map(str::to_owned))
766}
767
768fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
772 use crate::kms::KmsError as K;
773 match e {
774 K::KeyNotFound { key_id } => S3Error::with_message(
775 S3ErrorCode::InvalidArgument,
776 format!("KMS key not found: {key_id}"),
777 ),
778 K::BackendUnavailable { message } => S3Error::with_message(
779 S3ErrorCode::ServiceUnavailable,
780 format!("KMS backend unavailable: {message}"),
781 ),
782 other => S3Error::with_message(
783 S3ErrorCode::InternalError,
784 format!("KMS error: {other}"),
785 ),
786 }
787}
788
789fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
793 use crate::sse::SseError as E;
794 match e {
795 E::WrongCustomerKey => S3Error::with_message(
796 S3ErrorCode::AccessDenied,
797 "SSE-C key does not match the key used at PUT time",
798 ),
799 E::InvalidCustomerKey { reason } => S3Error::with_message(
800 S3ErrorCode::InvalidArgument,
801 format!("SSE-C: {reason}"),
802 ),
803 E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
804 S3ErrorCode::InvalidArgument,
805 format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
806 ),
807 E::CustomerKeyRequired => S3Error::with_message(
808 S3ErrorCode::InvalidRequest,
809 "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
810 ),
811 E::CustomerKeyUnexpected => S3Error::with_message(
812 S3ErrorCode::InvalidRequest,
813 "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
814 ),
815 other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
816 }
817}
818
819fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
820 let m = metadata.as_ref()?;
821 let codec = m
822 .get(META_CODEC)
823 .and_then(|s| s.parse::<CodecKind>().ok())?;
824 let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
825 let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
826 let crc32c = m.get(META_CRC32C)?.parse().ok()?;
827 Some(ChunkManifest {
828 codec,
829 original_size,
830 compressed_size,
831 crc32c,
832 })
833}
834
835fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
836 let meta = metadata.get_or_insert_with(Default::default);
837 meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
838 meta.insert(
839 META_ORIGINAL_SIZE.into(),
840 manifest.original_size.to_string(),
841 );
842 meta.insert(
843 META_COMPRESSED_SIZE.into(),
844 manifest.compressed_size.to_string(),
845 );
846 meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
847}
848
849fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
850 move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
851}
852
853fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
857 headers
858 .get("x-amz-bypass-governance-retention")
859 .and_then(|v| v.to_str().ok())
860 .map(|s| s.eq_ignore_ascii_case("true"))
861 .unwrap_or(false)
862}
863
864fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
870 let mut buf = Vec::new();
871 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
872 let s = std::str::from_utf8(&buf).ok()?;
873 chrono::DateTime::parse_from_rfc3339(s)
874 .ok()
875 .map(|dt| dt.with_timezone(&chrono::Utc))
876}
877
878fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
881 let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
886 Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
887}
888
889pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
893 if total == 0 {
894 return Err("cannot range-get zero-length object".into());
895 }
896 match range {
897 s3s::dto::Range::Int { first, last } => {
898 let start = *first;
899 let end_inclusive = match last {
900 Some(l) => (*l).min(total - 1),
901 None => total - 1,
902 };
903 if start > end_inclusive || start >= total {
904 return Err(format!(
905 "range bytes={start}-{:?} out of object size {total}",
906 last
907 ));
908 }
909 Ok((start, end_inclusive + 1))
910 }
911 s3s::dto::Range::Suffix { length } => {
912 let len = (*length).min(total);
913 Ok((total - len, total))
914 }
915 }
916}
917
918#[async_trait::async_trait]
919impl<B: S3> S3 for S4Service<B> {
920 #[tracing::instrument(
922 name = "s4.put_object",
923 skip(self, req),
924 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
925 )]
926 async fn put_object(
927 &self,
928 mut req: S3Request<PutObjectInput>,
929 ) -> S3Result<S3Response<PutObjectOutput>> {
930 let put_start = Instant::now();
931 let put_bucket = req.input.bucket.clone();
932 let put_key = req.input.key.clone();
933 let access_preamble = self.access_log_preamble(&req);
934 self.enforce_rate_limit(&req, &put_bucket)?;
935 self.enforce_policy(&req, "s3:PutObject", &put_bucket, Some(&put_key))?;
936 if let Some(mgr) = self.object_lock.as_ref()
944 && let Some(state) = mgr.get(&put_bucket, &put_key)
945 {
946 let bucket_versioned_enabled = self
947 .versioning
948 .as_ref()
949 .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
950 .unwrap_or(false);
951 if !bucket_versioned_enabled {
952 let bypass = parse_bypass_governance_header(&req.headers);
953 let now = chrono::Utc::now();
954 if !state.can_delete(now, bypass) {
955 crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
956 return Err(S3Error::with_message(
957 S3ErrorCode::AccessDenied,
958 "Access Denied because object protected by object lock",
959 ));
960 }
961 }
962 }
963 let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
969 .input
970 .object_lock_mode
971 .as_ref()
972 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
973 let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
974 .input
975 .object_lock_retain_until_date
976 .as_ref()
977 .and_then(timestamp_to_chrono_utc);
978 let explicit_legal_hold_on: Option<bool> = req
979 .input
980 .object_lock_legal_hold_status
981 .as_ref()
982 .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
983 if let Some(blob) = req.input.body.take() {
984 let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
987 .await
988 .map_err(internal("peek put sample"))?;
989 let sample_len = sample.len().min(SAMPLE_BYTES);
990 let kind = self.dispatcher.pick(&sample[..sample_len]).await;
991
992 let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
998 let (compressed, manifest, is_framed) = if use_framed {
999 let chained = chain_sample_with_rest(sample, rest_stream);
1001 debug!(
1002 bucket = ?req.input.bucket,
1003 key = ?req.input.key,
1004 codec = kind.as_str(),
1005 path = "streaming-framed",
1006 "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1007 );
1008 let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1012 let (body, manifest) = streaming_compress_to_frames(
1013 chained,
1014 Arc::clone(&self.registry),
1015 kind,
1016 chunk_size,
1017 )
1018 .await
1019 .map_err(internal("streaming framed compress"))?;
1020 (body, manifest, true)
1021 } else {
1022 let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1025 .await
1026 .map_err(internal("collect put body (buffered path)"))?;
1027 debug!(
1028 bucket = ?req.input.bucket,
1029 key = ?req.input.key,
1030 bytes = bytes.len(),
1031 codec = kind.as_str(),
1032 path = "buffered",
1033 "S4 put_object: compressing (buffered, raw blob)"
1034 );
1035 let (body, m) = self
1036 .registry
1037 .compress(bytes, kind)
1038 .await
1039 .map_err(internal("registry compress"))?;
1040 (body, m, false)
1041 };
1042
1043 write_manifest(&mut req.input.metadata, &manifest);
1044 if is_framed {
1045 req.input
1047 .metadata
1048 .get_or_insert_with(Default::default)
1049 .insert(META_FRAMED.into(), "true".into());
1050 }
1051 req.input.content_length = Some(compressed.len() as i64);
1055 req.input.checksum_algorithm = None;
1060 req.input.checksum_crc32 = None;
1061 req.input.checksum_crc32c = None;
1062 req.input.checksum_crc64nvme = None;
1063 req.input.checksum_sha1 = None;
1064 req.input.checksum_sha256 = None;
1065 req.input.content_md5 = None;
1066 let original_size = manifest.original_size;
1067 let compressed_size = manifest.compressed_size;
1068 let codec_label = manifest.codec.as_str();
1069 let sidecar_index = if is_framed {
1072 s4_codec::index::build_index_from_body(&compressed).ok()
1073 } else {
1074 None
1075 };
1076 let sse_c_material = extract_sse_c_material(
1085 &req.input.sse_customer_algorithm,
1086 &req.input.sse_customer_key,
1087 &req.input.sse_customer_key_md5,
1088 )?;
1089 let kms_key_id = extract_kms_key_id(
1094 &req.input.server_side_encryption,
1095 &req.input.ssekms_key_id,
1096 self.kms_default_key_id.as_deref(),
1097 );
1098 if self.compliance_strict
1105 && sse_c_material.is_none()
1106 && kms_key_id.is_none()
1107 && self.sse_keyring.is_none()
1108 && req
1109 .input
1110 .server_side_encryption
1111 .as_ref()
1112 .map(|s| s.as_str())
1113 != Some(ServerSideEncryption::AES256)
1114 {
1115 return Err(S3Error::with_message(
1116 S3ErrorCode::InvalidRequest,
1117 "compliance-mode strict: PUT must include x-amz-server-side-encryption \
1118 (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
1119 ));
1120 }
1121 if sse_c_material.is_some() && kms_key_id.is_some() {
1124 return Err(S3Error::with_message(
1125 S3ErrorCode::InvalidArgument,
1126 "SSE-C and SSE-KMS cannot be used together on the same PUT",
1127 ));
1128 }
1129 let kms_wrap = if let Some(ref key_id) = kms_key_id {
1132 let kms = self.kms.as_ref().ok_or_else(|| {
1133 S3Error::with_message(
1134 S3ErrorCode::InvalidRequest,
1135 "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1136 )
1137 })?;
1138 let (dek, wrapped) = kms
1139 .generate_dek(key_id)
1140 .await
1141 .map_err(kms_error_to_s3)?;
1142 if dek.len() != 32 {
1143 return Err(S3Error::with_message(
1144 S3ErrorCode::InternalError,
1145 format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
1146 ));
1147 }
1148 let mut dek_arr = [0u8; 32];
1149 dek_arr.copy_from_slice(&dek);
1150 Some((dek_arr, wrapped))
1151 } else {
1152 None
1153 };
1154 let body_to_send = if let Some(ref m) = sse_c_material {
1155 req.input
1156 .metadata
1157 .get_or_insert_with(Default::default)
1158 .insert("s4-encrypted".into(), "aes-256-gcm".into());
1159 crate::sse::encrypt_with_source(
1160 &compressed,
1161 crate::sse::SseSource::CustomerKey {
1162 key: &m.key,
1163 key_md5: &m.key_md5,
1164 },
1165 )
1166 } else if let Some((ref dek, ref wrapped)) = kms_wrap {
1167 req.input
1168 .metadata
1169 .get_or_insert_with(Default::default)
1170 .insert("s4-encrypted".into(), "aes-256-gcm".into());
1171 crate::sse::encrypt_with_source(
1172 &compressed,
1173 crate::sse::SseSource::Kms { dek, wrapped },
1174 )
1175 } else if let Some(keyring) = self.sse_keyring.as_ref() {
1176 req.input
1177 .metadata
1178 .get_or_insert_with(Default::default)
1179 .insert("s4-encrypted".into(), "aes-256-gcm".into());
1180 crate::sse::encrypt_v2(&compressed, keyring)
1181 } else {
1182 compressed.clone()
1183 };
1184 req.input.body = Some(bytes_to_blob(body_to_send));
1185 let pending_version: Option<crate::versioning::PutOutcome> = self
1194 .versioning
1195 .as_ref()
1196 .map(|mgr| mgr.state(&put_bucket))
1197 .map(|state| match state {
1198 crate::versioning::VersioningState::Enabled => {
1199 crate::versioning::PutOutcome {
1200 version_id: crate::versioning::VersioningManager::new_version_id(),
1201 versioned_response: true,
1202 }
1203 }
1204 crate::versioning::VersioningState::Suspended
1205 | crate::versioning::VersioningState::Unversioned => {
1206 crate::versioning::PutOutcome {
1207 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
1208 versioned_response: false,
1209 }
1210 }
1211 });
1212 if let Some(ref pv) = pending_version
1213 && pv.versioned_response
1214 {
1215 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
1216 }
1217 let mut backend_resp = self.backend.put_object(req).await;
1218 if let Some(idx) = sidecar_index
1219 && backend_resp.is_ok()
1220 && idx.entries.len() > 1
1221 {
1222 self.write_sidecar(&put_bucket, &put_key, &idx).await;
1228 }
1229 if let (Some(mgr), Some(pv), Ok(resp)) = (
1233 self.versioning.as_ref(),
1234 pending_version.as_ref(),
1235 backend_resp.as_mut(),
1236 ) {
1237 let etag = resp
1238 .output
1239 .e_tag
1240 .clone()
1241 .map(ETag::into_value)
1242 .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
1243 let now = chrono::Utc::now();
1244 mgr.commit_put_with_version(
1245 &put_bucket,
1246 &put_key,
1247 crate::versioning::VersionEntry {
1248 version_id: pv.version_id.clone(),
1249 etag,
1250 size: original_size,
1251 is_delete_marker: false,
1252 created_at: now,
1253 },
1254 );
1255 if pv.versioned_response {
1256 resp.output.version_id = Some(pv.version_id.clone());
1257 }
1258 }
1259 if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
1263 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
1264 resp.output.sse_customer_key_md5 = Some(
1265 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
1266 );
1267 }
1268 if let (Some((_, wrapped)), Ok(resp)) =
1272 (kms_wrap.as_ref(), backend_resp.as_mut())
1273 {
1274 resp.output.server_side_encryption =
1275 Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
1276 resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
1277 }
1278 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
1284 if explicit_lock_mode.is_some()
1285 || explicit_retain_until.is_some()
1286 || explicit_legal_hold_on.is_some()
1287 {
1288 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
1289 if let Some(m) = explicit_lock_mode {
1290 state.mode = Some(m);
1291 }
1292 if let Some(u) = explicit_retain_until {
1293 state.retain_until = Some(u);
1294 }
1295 if let Some(lh) = explicit_legal_hold_on {
1296 state.legal_hold_on = lh;
1297 }
1298 mgr.set(&put_bucket, &put_key, state);
1299 }
1300 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
1301 }
1302 let _ = (original_size, compressed_size); let elapsed = put_start.elapsed();
1304 crate::metrics::record_put(
1305 codec_label,
1306 original_size,
1307 compressed_size,
1308 elapsed.as_secs_f64(),
1309 backend_resp.is_ok(),
1310 );
1311 self.record_access(
1313 access_preamble,
1314 "REST.PUT.OBJECT",
1315 &put_bucket,
1316 Some(&put_key),
1317 if backend_resp.is_ok() { 200 } else { 500 },
1318 compressed_size,
1319 original_size,
1320 elapsed.as_millis() as u64,
1321 backend_resp.as_ref().err().map(|e| e.code().as_str()),
1322 )
1323 .await;
1324 info!(
1325 op = "put_object",
1326 bucket = %put_bucket,
1327 key = %put_key,
1328 codec = codec_label,
1329 bytes_in = original_size,
1330 bytes_out = compressed_size,
1331 ratio = format!(
1332 "{:.3}",
1333 if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
1334 ),
1335 latency_ms = elapsed.as_millis() as u64,
1336 ok = backend_resp.is_ok(),
1337 "S4 put completed"
1338 );
1339 return backend_resp;
1340 }
1341 let pending_version: Option<crate::versioning::PutOutcome> = self
1345 .versioning
1346 .as_ref()
1347 .map(|mgr| mgr.state(&put_bucket))
1348 .map(|state| match state {
1349 crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
1350 version_id: crate::versioning::VersioningManager::new_version_id(),
1351 versioned_response: true,
1352 },
1353 _ => crate::versioning::PutOutcome {
1354 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
1355 versioned_response: false,
1356 },
1357 });
1358 if let Some(ref pv) = pending_version
1359 && pv.versioned_response
1360 {
1361 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
1362 }
1363 let mut backend_resp = self.backend.put_object(req).await;
1364 if let (Some(mgr), Some(pv), Ok(resp)) = (
1365 self.versioning.as_ref(),
1366 pending_version.as_ref(),
1367 backend_resp.as_mut(),
1368 ) {
1369 let etag = resp
1370 .output
1371 .e_tag
1372 .clone()
1373 .map(ETag::into_value)
1374 .unwrap_or_default();
1375 let now = chrono::Utc::now();
1376 mgr.commit_put_with_version(
1377 &put_bucket,
1378 &put_key,
1379 crate::versioning::VersionEntry {
1380 version_id: pv.version_id.clone(),
1381 etag,
1382 size: 0,
1383 is_delete_marker: false,
1384 created_at: now,
1385 },
1386 );
1387 if pv.versioned_response {
1388 resp.output.version_id = Some(pv.version_id.clone());
1389 }
1390 }
1391 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
1395 if explicit_lock_mode.is_some()
1396 || explicit_retain_until.is_some()
1397 || explicit_legal_hold_on.is_some()
1398 {
1399 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
1400 if let Some(m) = explicit_lock_mode {
1401 state.mode = Some(m);
1402 }
1403 if let Some(u) = explicit_retain_until {
1404 state.retain_until = Some(u);
1405 }
1406 if let Some(lh) = explicit_legal_hold_on {
1407 state.legal_hold_on = lh;
1408 }
1409 mgr.set(&put_bucket, &put_key, state);
1410 }
1411 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
1412 }
1413 backend_resp
1414 }
1415
1416 #[tracing::instrument(
1418 name = "s4.get_object",
1419 skip(self, req),
1420 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
1421 )]
1422 async fn get_object(
1423 &self,
1424 mut req: S3Request<GetObjectInput>,
1425 ) -> S3Result<S3Response<GetObjectOutput>> {
1426 let get_start = Instant::now();
1427 let get_bucket = req.input.bucket.clone();
1428 let get_key = req.input.key.clone();
1429 self.enforce_rate_limit(&req, &get_bucket)?;
1430 self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
1431 let range_request = req.input.range.take();
1433 let sse_c_alg = req.input.sse_customer_algorithm.take();
1439 let sse_c_key = req.input.sse_customer_key.take();
1440 let sse_c_md5 = req.input.sse_customer_key_md5.take();
1441 let get_sse_c_material =
1442 extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
1443
1444 let resolved_version_id: Option<String> = match self.versioning.as_ref() {
1457 Some(mgr)
1458 if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
1459 {
1460 let req_vid = req.input.version_id.take();
1461 let entry = match req_vid.as_deref() {
1462 Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
1463 || S3Error::with_message(
1464 S3ErrorCode::NoSuchVersion,
1465 format!("no such version: {vid}"),
1466 ),
1467 )?,
1468 None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
1469 S3Error::with_message(
1470 S3ErrorCode::NoSuchKey,
1471 format!("no such key: {get_key}"),
1472 )
1473 })?,
1474 };
1475 if entry.is_delete_marker {
1476 return Err(S3Error::with_message(
1484 S3ErrorCode::NoSuchKey,
1485 format!("delete marker is the current version of {get_key}"),
1486 ));
1487 }
1488 if entry.version_id != crate::versioning::NULL_VERSION_ID {
1489 req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
1490 }
1491 Some(entry.version_id)
1492 }
1493 _ => None,
1494 };
1495
1496 if let Some(ref r) = range_request
1500 && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
1501 {
1502 let total = index.total_original_size();
1503 let (start, end_exclusive) = match resolve_range(r, total) {
1504 Ok(v) => v,
1505 Err(e) => {
1506 return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
1507 }
1508 };
1509 if let Some(plan) = index.lookup_range(start, end_exclusive) {
1510 return self
1511 .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
1512 .await;
1513 }
1514 }
1515 let mut resp = self.backend.get_object(req).await?;
1516 if let Some(ref vid) = resolved_version_id {
1521 resp.output.version_id = Some(vid.clone());
1522 }
1523 let is_multipart = is_multipart_object(&resp.output.metadata);
1524 let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
1525 let needs_frame_parse = is_multipart || is_framed_v2;
1528 let manifest_opt = extract_manifest(&resp.output.metadata);
1529
1530 if !needs_frame_parse && manifest_opt.is_none() {
1531 debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
1533 return Ok(resp);
1534 }
1535
1536 if let Some(blob) = resp.output.body.take() {
1537 let blob = if is_sse_encrypted(&resp.output.metadata) {
1545 let body = collect_blob(blob, self.max_body_bytes)
1546 .await
1547 .map_err(internal("collect SSE-encrypted body"))?;
1548 let plain = match crate::sse::peek_magic(&body) {
1553 Some("S4E4") => {
1554 let kms = self.kms.as_ref().ok_or_else(|| {
1555 S3Error::with_message(
1556 S3ErrorCode::InvalidRequest,
1557 "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1558 )
1559 })?;
1560 let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
1561 crate::sse::decrypt_with_kms(&body, kms_ref)
1562 .await
1563 .map_err(|e| match e {
1564 crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
1565 other => S3Error::with_message(
1566 S3ErrorCode::InternalError,
1567 format!("SSE-KMS decrypt failed: {other}"),
1568 ),
1569 })?
1570 }
1571 _ => {
1572 if let Some(ref m) = get_sse_c_material {
1573 crate::sse::decrypt(
1574 &body,
1575 crate::sse::SseSource::CustomerKey {
1576 key: &m.key,
1577 key_md5: &m.key_md5,
1578 },
1579 )
1580 .map_err(sse_c_error_to_s3)?
1581 } else {
1582 let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
1583 S3Error::with_message(
1584 S3ErrorCode::InvalidRequest,
1585 "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
1586 )
1587 })?;
1588 crate::sse::decrypt(&body, keyring).map_err(|e| {
1589 S3Error::with_message(
1590 S3ErrorCode::InternalError,
1591 format!("SSE-S4 decrypt failed: {e}"),
1592 )
1593 })?
1594 }
1595 }
1596 };
1597 if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
1600 && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
1601 {
1602 resp.output.server_side_encryption = Some(
1603 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
1604 );
1605 resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
1606 }
1607 bytes_to_blob(plain)
1608 } else if let Some(ref m) = get_sse_c_material {
1609 let _ = m;
1612 return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
1613 } else {
1614 blob
1615 };
1616 if let Some(ref m) = get_sse_c_material {
1619 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
1620 resp.output.sse_customer_key_md5 = Some(
1621 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
1622 );
1623 }
1624 if range_request.is_none()
1632 && !needs_frame_parse
1633 && let Some(ref m) = manifest_opt
1634 && supports_streaming_decompress(m.codec)
1635 && m.codec == CodecKind::CpuZstd
1636 {
1637 let decompressed_blob = cpu_zstd_decompress_stream(blob);
1638 resp.output.content_length = Some(m.original_size as i64);
1639 resp.output.checksum_crc32 = None;
1640 resp.output.checksum_crc32c = None;
1641 resp.output.checksum_crc64nvme = None;
1642 resp.output.checksum_sha1 = None;
1643 resp.output.checksum_sha256 = None;
1644 resp.output.e_tag = None;
1645 resp.output.body = Some(decompressed_blob);
1646 let elapsed = get_start.elapsed();
1647 crate::metrics::record_get(
1648 m.codec.as_str(),
1649 m.compressed_size,
1650 m.original_size,
1651 elapsed.as_secs_f64(),
1652 true,
1653 );
1654 info!(
1655 op = "get_object",
1656 bucket = %get_bucket,
1657 key = %get_key,
1658 codec = m.codec.as_str(),
1659 bytes_in = m.compressed_size,
1660 bytes_out = m.original_size,
1661 path = "streaming",
1662 setup_latency_ms = elapsed.as_millis() as u64,
1663 "S4 get started (streaming)"
1664 );
1665 return Ok(resp);
1666 }
1667 if range_request.is_none()
1669 && !needs_frame_parse
1670 && let Some(ref m) = manifest_opt
1671 && m.codec == CodecKind::Passthrough
1672 {
1673 resp.output.content_length = Some(m.original_size as i64);
1674 resp.output.checksum_crc32 = None;
1675 resp.output.checksum_crc32c = None;
1676 resp.output.checksum_crc64nvme = None;
1677 resp.output.checksum_sha1 = None;
1678 resp.output.checksum_sha256 = None;
1679 resp.output.e_tag = None;
1680 resp.output.body = Some(blob);
1681 debug!("S4 get_object: passthrough streaming");
1682 return Ok(resp);
1683 }
1684
1685 let bytes = collect_blob(blob, self.max_body_bytes)
1687 .await
1688 .map_err(internal("collect get body"))?;
1689
1690 let decompressed = if needs_frame_parse {
1691 self.decompress_multipart(bytes).await?
1694 } else {
1695 let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
1696 self.registry
1697 .decompress(bytes, manifest)
1698 .await
1699 .map_err(internal("registry decompress"))?
1700 };
1701
1702 let total_size = decompressed.len() as u64;
1704 let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
1705 let (start, end) = resolve_range(r, total_size)
1706 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
1707 let sliced = decompressed.slice(start as usize..end as usize);
1708 resp.output.content_range = Some(format!(
1709 "bytes {start}-{}/{total_size}",
1710 end.saturating_sub(1)
1711 ));
1712 (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
1713 } else {
1714 (decompressed, None)
1715 };
1716 resp.output.content_length = Some(final_bytes.len() as i64);
1719 resp.output.checksum_crc32 = None;
1724 resp.output.checksum_crc32c = None;
1725 resp.output.checksum_crc64nvme = None;
1726 resp.output.checksum_sha1 = None;
1727 resp.output.checksum_sha256 = None;
1728 resp.output.e_tag = None;
1729 let returned_size = final_bytes.len() as u64;
1730 let codec_label = manifest_opt
1731 .as_ref()
1732 .map(|m| m.codec.as_str())
1733 .unwrap_or("multipart");
1734 resp.output.body = Some(bytes_to_blob(final_bytes));
1735 if let Some(status) = status_override {
1736 resp.status = Some(status);
1737 }
1738 let elapsed = get_start.elapsed();
1739 crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
1740 info!(
1741 op = "get_object",
1742 bucket = %get_bucket,
1743 key = %get_key,
1744 codec = codec_label,
1745 bytes_out = returned_size,
1746 total_object_size = total_size,
1747 range = range_request.is_some(),
1748 path = "buffered",
1749 latency_ms = elapsed.as_millis() as u64,
1750 "S4 get completed (buffered)"
1751 );
1752 }
1753 Ok(resp)
1754 }
1755
1756 async fn head_bucket(
1758 &self,
1759 req: S3Request<HeadBucketInput>,
1760 ) -> S3Result<S3Response<HeadBucketOutput>> {
1761 self.backend.head_bucket(req).await
1762 }
1763 async fn list_buckets(
1764 &self,
1765 req: S3Request<ListBucketsInput>,
1766 ) -> S3Result<S3Response<ListBucketsOutput>> {
1767 self.backend.list_buckets(req).await
1768 }
1769 async fn create_bucket(
1770 &self,
1771 req: S3Request<CreateBucketInput>,
1772 ) -> S3Result<S3Response<CreateBucketOutput>> {
1773 self.backend.create_bucket(req).await
1774 }
1775 async fn delete_bucket(
1776 &self,
1777 req: S3Request<DeleteBucketInput>,
1778 ) -> S3Result<S3Response<DeleteBucketOutput>> {
1779 self.backend.delete_bucket(req).await
1780 }
1781 async fn head_object(
1782 &self,
1783 req: S3Request<HeadObjectInput>,
1784 ) -> S3Result<S3Response<HeadObjectOutput>> {
1785 let mut resp = self.backend.head_object(req).await?;
1786 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
1787 resp.output.content_length = Some(manifest.original_size as i64);
1791 resp.output.checksum_crc32 = None;
1792 resp.output.checksum_crc32c = None;
1793 resp.output.checksum_crc64nvme = None;
1794 resp.output.checksum_sha1 = None;
1795 resp.output.checksum_sha256 = None;
1796 resp.output.e_tag = None;
1797 }
1798 Ok(resp)
1799 }
1800 async fn delete_object(
1801 &self,
1802 mut req: S3Request<DeleteObjectInput>,
1803 ) -> S3Result<S3Response<DeleteObjectOutput>> {
1804 let bucket = req.input.bucket.clone();
1805 let key = req.input.key.clone();
1806 self.enforce_rate_limit(&req, &bucket)?;
1807 self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
1808 if let Some(mgr) = self.object_lock.as_ref()
1816 && let Some(state) = mgr.get(&bucket, &key)
1817 {
1818 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
1819 let now = chrono::Utc::now();
1820 if !state.can_delete(now, bypass) {
1821 crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
1822 return Err(S3Error::with_message(
1823 S3ErrorCode::AccessDenied,
1824 "Access Denied because object protected by object lock",
1825 ));
1826 }
1827 }
1828 if let Some(mgr) = self.versioning.as_ref() {
1844 let state = mgr.state(&bucket);
1845 if state != crate::versioning::VersioningState::Unversioned {
1846 let req_vid = req.input.version_id.take();
1847 if let Some(vid) = req_vid {
1848 let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
1852 let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
1853 key.clone()
1854 } else {
1855 versioned_shadow_key(&key, &vid)
1856 };
1857 let was_real_version = outcome
1858 .as_ref()
1859 .map(|o| !o.is_delete_marker)
1860 .unwrap_or(false);
1861 if was_real_version {
1862 let backend_input = DeleteObjectInput {
1866 bucket: bucket.clone(),
1867 key: backend_target,
1868 ..Default::default()
1869 };
1870 let backend_req = S3Request {
1871 input: backend_input,
1872 method: http::Method::DELETE,
1873 uri: req.uri.clone(),
1874 headers: req.headers.clone(),
1875 extensions: http::Extensions::new(),
1876 credentials: req.credentials.clone(),
1877 region: req.region.clone(),
1878 service: req.service.clone(),
1879 trailing_headers: None,
1880 };
1881 let _ = self.backend.delete_object(backend_req).await;
1882 }
1883 let mut output = DeleteObjectOutput {
1884 version_id: Some(vid.clone()),
1885 ..Default::default()
1886 };
1887 if let Some(o) = outcome.as_ref()
1888 && o.is_delete_marker
1889 {
1890 output.delete_marker = Some(true);
1891 }
1892 return Ok(S3Response::new(output));
1893 }
1894 let outcome = mgr.record_delete(&bucket, &key);
1896 if state == crate::versioning::VersioningState::Suspended {
1897 let backend_input = DeleteObjectInput {
1900 bucket: bucket.clone(),
1901 key: key.clone(),
1902 ..Default::default()
1903 };
1904 let backend_req = S3Request {
1905 input: backend_input,
1906 method: http::Method::DELETE,
1907 uri: req.uri.clone(),
1908 headers: req.headers.clone(),
1909 extensions: http::Extensions::new(),
1910 credentials: req.credentials.clone(),
1911 region: req.region.clone(),
1912 service: req.service.clone(),
1913 trailing_headers: None,
1914 };
1915 let _ = self.backend.delete_object(backend_req).await;
1916 }
1917 let output = DeleteObjectOutput {
1918 delete_marker: Some(true),
1919 version_id: outcome.version_id,
1920 ..Default::default()
1921 };
1922 return Ok(S3Response::new(output));
1923 }
1924 }
1925 let resp = self.backend.delete_object(req).await?;
1928 if let Some(mgr) = self.object_lock.as_ref() {
1933 mgr.clear(&bucket, &key);
1934 }
1935 let sidecar_input = DeleteObjectInput {
1936 bucket: bucket.clone(),
1937 key: sidecar_key(&key),
1938 ..Default::default()
1939 };
1940 let sidecar_req = S3Request {
1941 input: sidecar_input,
1942 method: http::Method::DELETE,
1943 uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
1944 headers: http::HeaderMap::new(),
1945 extensions: http::Extensions::new(),
1946 credentials: None,
1947 region: None,
1948 service: None,
1949 trailing_headers: None,
1950 };
1951 let _ = self.backend.delete_object(sidecar_req).await;
1952 Ok(resp)
1953 }
1954 async fn delete_objects(
1955 &self,
1956 req: S3Request<DeleteObjectsInput>,
1957 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
1958 self.backend.delete_objects(req).await
1959 }
1960 async fn copy_object(
1961 &self,
1962 mut req: S3Request<CopyObjectInput>,
1963 ) -> S3Result<S3Response<CopyObjectOutput>> {
1964 let dst_bucket = req.input.bucket.clone();
1966 let dst_key = req.input.key.clone();
1967 self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
1968 if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
1969 self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
1970 }
1971 let needs_merge = req
1981 .input
1982 .metadata_directive
1983 .as_ref()
1984 .map(|d| d.as_str() == MetadataDirective::REPLACE)
1985 .unwrap_or(false);
1986 if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
1987 let head_input = HeadObjectInput {
1988 bucket: bucket.to_string(),
1989 key: key.to_string(),
1990 ..Default::default()
1991 };
1992 let head_req = S3Request {
1993 input: head_input,
1994 method: req.method.clone(),
1995 uri: req.uri.clone(),
1996 headers: req.headers.clone(),
1997 extensions: http::Extensions::new(),
1998 credentials: req.credentials.clone(),
1999 region: req.region.clone(),
2000 service: req.service.clone(),
2001 trailing_headers: None,
2002 };
2003 if let Ok(head) = self.backend.head_object(head_req).await
2004 && let Some(src_meta) = head.output.metadata.as_ref()
2005 {
2006 let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
2007 for key in [
2008 META_CODEC,
2009 META_ORIGINAL_SIZE,
2010 META_COMPRESSED_SIZE,
2011 META_CRC32C,
2012 META_MULTIPART,
2013 META_FRAMED,
2014 ] {
2015 if let Some(v) = src_meta.get(key) {
2016 dest_meta
2019 .entry(key.to_string())
2020 .or_insert_with(|| v.clone());
2021 }
2022 }
2023 debug!(
2024 src_bucket = %bucket,
2025 src_key = %key,
2026 "S4 copy_object: preserved s4-* metadata across REPLACE directive"
2027 );
2028 }
2029 }
2030 self.backend.copy_object(req).await
2031 }
2032 async fn list_objects(
2033 &self,
2034 req: S3Request<ListObjectsInput>,
2035 ) -> S3Result<S3Response<ListObjectsOutput>> {
2036 self.enforce_rate_limit(&req, &req.input.bucket)?;
2037 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2038 let mut resp = self.backend.list_objects(req).await?;
2039 if let Some(contents) = resp.output.contents.as_mut() {
2042 contents.retain(|o| {
2043 o.key
2044 .as_ref()
2045 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2046 .unwrap_or(true)
2047 });
2048 }
2049 Ok(resp)
2050 }
2051 async fn list_objects_v2(
2052 &self,
2053 req: S3Request<ListObjectsV2Input>,
2054 ) -> S3Result<S3Response<ListObjectsV2Output>> {
2055 self.enforce_rate_limit(&req, &req.input.bucket)?;
2056 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2057 let mut resp = self.backend.list_objects_v2(req).await?;
2058 if let Some(contents) = resp.output.contents.as_mut() {
2059 let before = contents.len();
2060 contents.retain(|o| {
2061 o.key
2062 .as_ref()
2063 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2064 .unwrap_or(true)
2065 });
2066 if let Some(kc) = resp.output.key_count.as_mut() {
2068 *kc -= (before - contents.len()) as i32;
2069 }
2070 }
2071 Ok(resp)
2072 }
2073 async fn list_object_versions(
2081 &self,
2082 req: S3Request<ListObjectVersionsInput>,
2083 ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
2084 self.enforce_rate_limit(&req, &req.input.bucket)?;
2085 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2086 if let Some(mgr) = self.versioning.as_ref()
2088 && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
2089 {
2090 let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
2091 let page = mgr.list_versions(
2092 &req.input.bucket,
2093 req.input.prefix.as_deref(),
2094 req.input.key_marker.as_deref(),
2095 req.input.version_id_marker.as_deref(),
2096 max_keys,
2097 );
2098 let versions: Vec<ObjectVersion> = page
2099 .versions
2100 .into_iter()
2101 .map(|e| ObjectVersion {
2102 key: Some(e.key),
2103 version_id: Some(e.version_id),
2104 is_latest: Some(e.is_latest),
2105 e_tag: Some(ETag::Strong(e.etag)),
2106 size: Some(e.size as i64),
2107 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2108 ..Default::default()
2109 })
2110 .collect();
2111 let delete_markers: Vec<DeleteMarkerEntry> = page
2112 .delete_markers
2113 .into_iter()
2114 .map(|e| DeleteMarkerEntry {
2115 key: Some(e.key),
2116 version_id: Some(e.version_id),
2117 is_latest: Some(e.is_latest),
2118 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2119 ..Default::default()
2120 })
2121 .collect();
2122 let output = ListObjectVersionsOutput {
2123 name: Some(req.input.bucket.clone()),
2124 prefix: req.input.prefix.clone(),
2125 key_marker: req.input.key_marker.clone(),
2126 version_id_marker: req.input.version_id_marker.clone(),
2127 max_keys: req.input.max_keys,
2128 versions: if versions.is_empty() {
2129 None
2130 } else {
2131 Some(versions)
2132 },
2133 delete_markers: if delete_markers.is_empty() {
2134 None
2135 } else {
2136 Some(delete_markers)
2137 },
2138 is_truncated: Some(page.is_truncated),
2139 next_key_marker: page.next_key_marker,
2140 next_version_id_marker: page.next_version_id_marker,
2141 ..Default::default()
2142 };
2143 return Ok(S3Response::new(output));
2144 }
2145 let mut resp = self.backend.list_object_versions(req).await?;
2147 if let Some(versions) = resp.output.versions.as_mut() {
2148 versions.retain(|v| {
2149 v.key
2150 .as_ref()
2151 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2152 .unwrap_or(true)
2153 });
2154 }
2155 if let Some(markers) = resp.output.delete_markers.as_mut() {
2156 markers.retain(|m| {
2157 m.key
2158 .as_ref()
2159 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2160 .unwrap_or(true)
2161 });
2162 }
2163 Ok(resp)
2164 }
2165
2166 async fn create_multipart_upload(
2167 &self,
2168 mut req: S3Request<CreateMultipartUploadInput>,
2169 ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
2170 let codec_kind = self.registry.default_kind();
2174 let meta = req.input.metadata.get_or_insert_with(Default::default);
2175 meta.insert(META_MULTIPART.into(), "true".into());
2176 meta.insert(META_CODEC.into(), codec_kind.as_str().into());
2177 debug!(
2178 bucket = ?req.input.bucket,
2179 key = ?req.input.key,
2180 codec = codec_kind.as_str(),
2181 "S4 create_multipart_upload: marking object for per-part compression"
2182 );
2183 self.backend.create_multipart_upload(req).await
2184 }
2185
2186 async fn upload_part(
2187 &self,
2188 mut req: S3Request<UploadPartInput>,
2189 ) -> S3Result<S3Response<UploadPartOutput>> {
2190 if let Some(blob) = req.input.body.take() {
2196 let bytes = collect_blob(blob, self.max_body_bytes)
2197 .await
2198 .map_err(internal("collect upload_part body"))?;
2199 let sample_len = bytes.len().min(SAMPLE_BYTES);
2200 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
2201 let original_size = bytes.len() as u64;
2202 let (compressed, manifest) = self
2203 .registry
2204 .compress(bytes, codec_kind)
2205 .await
2206 .map_err(internal("registry compress part"))?;
2207 let header = FrameHeader {
2208 codec: codec_kind,
2209 original_size,
2210 compressed_size: compressed.len() as u64,
2211 crc32c: manifest.crc32c,
2212 };
2213 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
2214 write_frame(&mut framed, header, &compressed);
2215 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
2229 if !likely_final {
2230 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
2231 }
2232 let framed_bytes = framed.freeze();
2233 let new_len = framed_bytes.len() as i64;
2234 req.input.content_length = Some(new_len);
2236 req.input.checksum_algorithm = None;
2237 req.input.checksum_crc32 = None;
2238 req.input.checksum_crc32c = None;
2239 req.input.checksum_crc64nvme = None;
2240 req.input.checksum_sha1 = None;
2241 req.input.checksum_sha256 = None;
2242 req.input.content_md5 = None;
2243 req.input.body = Some(bytes_to_blob(framed_bytes));
2244 debug!(
2245 part_number = ?req.input.part_number,
2246 upload_id = ?req.input.upload_id,
2247 original_size,
2248 framed_size = new_len,
2249 "S4 upload_part: framed compressed payload"
2250 );
2251 }
2252 self.backend.upload_part(req).await
2253 }
2254 async fn complete_multipart_upload(
2255 &self,
2256 req: S3Request<CompleteMultipartUploadInput>,
2257 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
2258 let bucket = req.input.bucket.clone();
2259 let key = req.input.key.clone();
2260 let resp = self.backend.complete_multipart_upload(req).await?;
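        // Best-effort sidecar indexing: read the completed object back (bounded by
        // max_body_bytes), rebuild a frame index from the part frames, and store it via
        // write_sidecar. Any failure along the way simply skips the sidecar; the
        // completion response below is returned regardless.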
2261 let bucket_clone = bucket.clone();
2267 let key_clone = key.clone();
2268 let get_input = GetObjectInput {
2269 bucket: bucket_clone.clone(),
2270 key: key_clone.clone(),
2271 ..Default::default()
2272 };
2273 let get_req = S3Request {
2274 input: get_input,
2275 method: http::Method::GET,
2276 uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap_or_default(), // keys are not guaranteed to form a valid URI path; fall back instead of panicking
2277 headers: http::HeaderMap::new(),
2278 extensions: http::Extensions::new(),
2279 credentials: None,
2280 region: None,
2281 service: None,
2282 trailing_headers: None,
2283 };
2284 if let Ok(get_resp) = self.backend.get_object(get_req).await
2285 && let Some(blob) = get_resp.output.body
2286 && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
2287 && let Ok(index) = build_index_from_body(&body)
2288 {
2289 self.write_sidecar(&bucket, &key, &index).await;
2290 }
2291 Ok(resp)
2292 }
2293 async fn abort_multipart_upload(
2294 &self,
2295 req: S3Request<AbortMultipartUploadInput>,
2296 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
2297 self.backend.abort_multipart_upload(req).await
2298 }
2299 async fn list_multipart_uploads(
2300 &self,
2301 req: S3Request<ListMultipartUploadsInput>,
2302 ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
2303 self.backend.list_multipart_uploads(req).await
2304 }
2305 async fn list_parts(
2306 &self,
2307 req: S3Request<ListPartsInput>,
2308 ) -> S3Result<S3Response<ListPartsOutput>> {
2309 self.backend.list_parts(req).await
2310 }
2311
2312 async fn get_object_acl(
2328 &self,
2329 req: S3Request<GetObjectAclInput>,
2330 ) -> S3Result<S3Response<GetObjectAclOutput>> {
2331 self.backend.get_object_acl(req).await
2332 }
2333 async fn put_object_acl(
2334 &self,
2335 req: S3Request<PutObjectAclInput>,
2336 ) -> S3Result<S3Response<PutObjectAclOutput>> {
2337 self.backend.put_object_acl(req).await
2338 }
2339 async fn get_object_tagging(
2340 &self,
2341 req: S3Request<GetObjectTaggingInput>,
2342 ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
2343 self.backend.get_object_tagging(req).await
2344 }
2345 async fn put_object_tagging(
2346 &self,
2347 req: S3Request<PutObjectTaggingInput>,
2348 ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
2349 self.backend.put_object_tagging(req).await
2350 }
2351 async fn delete_object_tagging(
2352 &self,
2353 req: S3Request<DeleteObjectTaggingInput>,
2354 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
2355 self.backend.delete_object_tagging(req).await
2356 }
2357 async fn get_object_attributes(
2358 &self,
2359 req: S3Request<GetObjectAttributesInput>,
2360 ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
2361 self.backend.get_object_attributes(req).await
2362 }
2363 async fn restore_object(
2364 &self,
2365 req: S3Request<RestoreObjectInput>,
2366 ) -> S3Result<S3Response<RestoreObjectOutput>> {
2367 self.backend.restore_object(req).await
2368 }
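    // upload_part_copy needs special handling when the copy source is an S4-compressed
    // object (multipart-framed or framed-v2 metadata): copying the stored bytes verbatim
    // would splice already-framed, possibly padded data into a new upload. Instead the
    // requested range is read back through this service's own get_object (so it is
    // decompressed), then re-compressed and re-framed exactly like upload_part.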
2369 async fn upload_part_copy(
2370 &self,
2371 req: S3Request<UploadPartCopyInput>,
2372 ) -> S3Result<S3Response<UploadPartCopyOutput>> {
2373 let CopySource::Bucket {
2384 bucket: src_bucket,
2385 key: src_key,
2386 ..
2387 } = &req.input.copy_source
2388 else {
2389 return self.backend.upload_part_copy(req).await;
2390 };
2391 let src_bucket = src_bucket.to_string();
2392 let src_key = src_key.to_string();
2393
2394 let head_input = HeadObjectInput {
2396 bucket: src_bucket.clone(),
2397 key: src_key.clone(),
2398 ..Default::default()
2399 };
2400 let head_req = S3Request {
2401 input: head_input,
2402 method: http::Method::HEAD,
2403 uri: req.uri.clone(),
2404 headers: req.headers.clone(),
2405 extensions: http::Extensions::new(),
2406 credentials: req.credentials.clone(),
2407 region: req.region.clone(),
2408 service: req.service.clone(),
2409 trailing_headers: None,
2410 };
2411 let needs_s4_copy = match self.backend.head_object(head_req).await {
2412 Ok(h) => {
2413 is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
2414 }
2415 Err(_) => false,
2416 };
2417 if !needs_s4_copy {
2418 return self.backend.upload_part_copy(req).await;
2419 }
2420
2421 let source_range = req
2423 .input
2424 .copy_source_range
2425 .as_ref()
2426 .map(|r| parse_copy_source_range(r))
2427 .transpose()
2428 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
2429
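        // Because the ranged read goes through self.get_object, x-amz-copy-source-range
        // is interpreted against the logical (uncompressed) object as served by
        // get_object, not against the framed bytes stored on the backend.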
2430 let mut get_input = GetObjectInput {
2434 bucket: src_bucket.clone(),
2435 key: src_key.clone(),
2436 ..Default::default()
2437 };
2438 get_input.range = source_range;
2439 let get_req = S3Request {
2440 input: get_input,
2441 method: http::Method::GET,
2442 uri: req.uri.clone(),
2443 headers: req.headers.clone(),
2444 extensions: http::Extensions::new(),
2445 credentials: req.credentials.clone(),
2446 region: req.region.clone(),
2447 service: req.service.clone(),
2448 trailing_headers: None,
2449 };
2450 let get_resp = self.get_object(get_req).await?;
2451 let blob = get_resp.output.body.ok_or_else(|| {
2452 S3Error::with_message(
2453 S3ErrorCode::InternalError,
2454 "upload_part_copy: empty body from source GET",
2455 )
2456 })?;
2457 let bytes = collect_blob(blob, self.max_body_bytes)
2458 .await
2459 .map_err(internal("collect upload_part_copy source body"))?;
2460
2461 let sample_len = bytes.len().min(SAMPLE_BYTES);
2463 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
2464 let original_size = bytes.len() as u64;
2465 let (compressed, manifest) = self
2466 .registry
2467 .compress(bytes, codec_kind)
2468 .await
2469 .map_err(internal("registry compress upload_part_copy"))?;
2470 let header = FrameHeader {
2471 codec: codec_kind,
2472 original_size,
2473 compressed_size: compressed.len() as u64,
2474 crc32c: manifest.crc32c,
2475 };
2476 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
2477 write_frame(&mut framed, header, &compressed);
2478 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
2479 if !likely_final {
2480 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
2481 }
2482 let framed_bytes = framed.freeze();
2483 let framed_len = framed_bytes.len() as i64;
2484
2485 let part_input = UploadPartInput {
2487 bucket: req.input.bucket.clone(),
2488 key: req.input.key.clone(),
2489 part_number: req.input.part_number,
2490 upload_id: req.input.upload_id.clone(),
2491 body: Some(bytes_to_blob(framed_bytes)),
2492 content_length: Some(framed_len),
2493 ..Default::default()
2494 };
2495 let part_req = S3Request {
2496 input: part_input,
2497 method: http::Method::PUT,
2498 uri: req.uri.clone(),
2499 headers: req.headers.clone(),
2500 extensions: http::Extensions::new(),
2501 credentials: req.credentials.clone(),
2502 region: req.region.clone(),
2503 service: req.service.clone(),
2504 trailing_headers: None,
2505 };
2506 let upload_resp = self.backend.upload_part(part_req).await?;
2507
2508 let copy_output = UploadPartCopyOutput {
2509 copy_part_result: Some(CopyPartResult {
2510 e_tag: upload_resp.output.e_tag.clone(),
2511 ..Default::default()
2512 }),
2513 ..Default::default()
2514 };
2515 Ok(S3Response::new(copy_output))
2516 }
2517
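    // Object Lock is emulated locally when an ObjectLockManager is configured: bucket
    // defaults, legal holds, and per-object retention live in the manager and the
    // backend is bypassed for these operations. On PUT of a bucket default, a Years
    // value is approximated as years * 365 days, since the manager tracks retention in
    // days.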
2518 async fn get_object_lock_configuration(
2525 &self,
2526 req: S3Request<GetObjectLockConfigurationInput>,
2527 ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
2528 if let Some(mgr) = self.object_lock.as_ref() {
2529 let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
2530 ObjectLockConfiguration {
2531 object_lock_enabled: Some(ObjectLockEnabled::from_static(
2532 ObjectLockEnabled::ENABLED,
2533 )),
2534 rule: Some(ObjectLockRule {
2535 default_retention: Some(DefaultRetention {
2536 days: Some(d.retention_days as i32),
2537 mode: Some(ObjectLockRetentionMode::from_static(
2538 match d.mode {
2539 crate::object_lock::LockMode::Governance => {
2540 ObjectLockRetentionMode::GOVERNANCE
2541 }
2542 crate::object_lock::LockMode::Compliance => {
2543 ObjectLockRetentionMode::COMPLIANCE
2544 }
2545 },
2546 )),
2547 years: None,
2548 }),
2549 }),
2550 }
2551 });
2552 let output = GetObjectLockConfigurationOutput {
2553 object_lock_configuration: cfg,
2554 };
2555 return Ok(S3Response::new(output));
2556 }
2557 self.backend.get_object_lock_configuration(req).await
2558 }
2559 async fn put_object_lock_configuration(
2560 &self,
2561 req: S3Request<PutObjectLockConfigurationInput>,
2562 ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
2563 if let Some(mgr) = self.object_lock.as_ref() {
2564 let bucket = req.input.bucket.clone();
2565 if let Some(cfg) = req.input.object_lock_configuration.as_ref()
2566 && let Some(rule) = cfg.rule.as_ref()
2567 && let Some(d) = rule.default_retention.as_ref()
2568 {
2569 let mode = d
2570 .mode
2571 .as_ref()
2572 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
2573 .ok_or_else(|| {
2574 S3Error::with_message(
2575 S3ErrorCode::InvalidRequest,
2576 "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
2577 )
2578 })?;
2579 let days: u32 = match (d.days, d.years) {
2583 (Some(days), None) if days > 0 => days as u32,
2584 (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
2585 _ => {
2586 return Err(S3Error::with_message(
2587 S3ErrorCode::InvalidRequest,
2588 "Object Lock default retention requires exactly one of Days or Years (positive integer)",
2589 ));
2590 }
2591 };
2592 mgr.set_bucket_default(
2593 &bucket,
2594 crate::object_lock::BucketObjectLockDefault {
2595 mode,
2596 retention_days: days,
2597 },
2598 );
2599 }
2600 return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
2601 }
2602 self.backend.put_object_lock_configuration(req).await
2603 }
2604 async fn get_object_legal_hold(
2605 &self,
2606 req: S3Request<GetObjectLegalHoldInput>,
2607 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
2608 if let Some(mgr) = self.object_lock.as_ref() {
2609 let on = mgr
2610 .get(&req.input.bucket, &req.input.key)
2611 .map(|s| s.legal_hold_on)
2612 .unwrap_or(false);
2613 let status = ObjectLockLegalHoldStatus::from_static(if on {
2614 ObjectLockLegalHoldStatus::ON
2615 } else {
2616 ObjectLockLegalHoldStatus::OFF
2617 });
2618 let output = GetObjectLegalHoldOutput {
2619 legal_hold: Some(ObjectLockLegalHold {
2620 status: Some(status),
2621 }),
2622 };
2623 return Ok(S3Response::new(output));
2624 }
2625 self.backend.get_object_legal_hold(req).await
2626 }
2627 async fn put_object_legal_hold(
2628 &self,
2629 req: S3Request<PutObjectLegalHoldInput>,
2630 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
2631 if let Some(mgr) = self.object_lock.as_ref() {
2632 let on = req
2633 .input
2634 .legal_hold
2635 .as_ref()
2636 .and_then(|h| h.status.as_ref())
2637 .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
2638 .unwrap_or(false);
2639 mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
2640 return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
2641 }
2642 self.backend.put_object_legal_hold(req).await
2643 }
2644 async fn get_object_retention(
2645 &self,
2646 req: S3Request<GetObjectRetentionInput>,
2647 ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
2648 if let Some(mgr) = self.object_lock.as_ref() {
2649 let retention = mgr
2650 .get(&req.input.bucket, &req.input.key)
2651 .filter(|s| s.mode.is_some() || s.retain_until.is_some())
2652 .map(|s| {
2653 let mode = s.mode.map(|m| {
2654 ObjectLockRetentionMode::from_static(match m {
2655 crate::object_lock::LockMode::Governance => {
2656 ObjectLockRetentionMode::GOVERNANCE
2657 }
2658 crate::object_lock::LockMode::Compliance => {
2659 ObjectLockRetentionMode::COMPLIANCE
2660 }
2661 })
2662 });
2663 let until = s.retain_until.map(chrono_utc_to_timestamp);
2664 ObjectLockRetention {
2665 mode,
2666 retain_until_date: until,
2667 }
2668 });
2669 let output = GetObjectRetentionOutput { retention };
2670 return Ok(S3Response::new(output));
2671 }
2672 self.backend.get_object_retention(req).await
2673 }
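    // put_object_retention enforces S3's tightening rules against the locally stored
    // state: an active COMPLIANCE lock can be extended but never switched to GOVERNANCE
    // or shortened, and shortening an active GOVERNANCE lock requires
    // x-amz-bypass-governance-retention. Mode and retain-until are merged field by
    // field, so an update may supply only one of them.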
2674 async fn put_object_retention(
2675 &self,
2676 req: S3Request<PutObjectRetentionInput>,
2677 ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
2678 if let Some(mgr) = self.object_lock.as_ref() {
2679 let bucket = req.input.bucket.clone();
2680 let key = req.input.key.clone();
2681 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
2682 let retention = req.input.retention.as_ref().ok_or_else(|| {
2683 S3Error::with_message(
2684 S3ErrorCode::InvalidRequest,
2685 "PutObjectRetention requires a Retention element",
2686 )
2687 })?;
2688 let new_mode = retention
2689 .mode
2690 .as_ref()
2691 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
2692 let new_until = retention
2693 .retain_until_date
2694 .as_ref()
2695 .and_then(timestamp_to_chrono_utc);
2697 let now = chrono::Utc::now();
2698 let existing = mgr.get(&bucket, &key).unwrap_or_default();
2699 if let Some(existing_mode) = existing.mode
2705 && existing_mode == crate::object_lock::LockMode::Compliance
2706 && existing.is_locked(now)
2707 {
2708 if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
2709 return Err(S3Error::with_message(
2710 S3ErrorCode::AccessDenied,
2711 "Cannot downgrade Compliance retention to Governance while lock is active",
2712 ));
2713 }
2714 if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
2715 && next < prev
2716 {
2717 return Err(S3Error::with_message(
2718 S3ErrorCode::AccessDenied,
2719 "Cannot shorten Compliance retention while lock is active",
2720 ));
2721 }
2722 }
2723 if let Some(existing_mode) = existing.mode
2724 && existing_mode == crate::object_lock::LockMode::Governance
2725 && existing.is_locked(now)
2726 && !bypass
2727 && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
2728 && next < prev
2729 {
2730 return Err(S3Error::with_message(
2731 S3ErrorCode::AccessDenied,
2732 "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
2733 ));
2734 }
2735 let mut state = existing;
2736 if new_mode.is_some() {
2737 state.mode = new_mode;
2738 }
2739 if new_until.is_some() {
2740 state.retain_until = new_until;
2741 }
2742 mgr.set(&bucket, &key, state);
2743 return Ok(S3Response::new(PutObjectRetentionOutput::default()));
2744 }
2745 self.backend.put_object_retention(req).await
2746 }
2747
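    // Bucket versioning state is likewise tracked locally when a VersioningManager is
    // configured; the manager's state maps onto the AWS status strings ("Enabled" /
    // "Suspended", or no status at all for a never-configured bucket).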
2748 async fn get_bucket_versioning(
2754 &self,
2755 req: S3Request<GetBucketVersioningInput>,
2756 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
2757 if let Some(mgr) = self.versioning.as_ref() {
2762 let output = match mgr.state(&req.input.bucket).as_aws_status() {
2763 Some(s) => GetBucketVersioningOutput {
2764 status: Some(BucketVersioningStatus::from(s.to_owned())),
2765 ..Default::default()
2766 },
2767 None => GetBucketVersioningOutput::default(),
2768 };
2769 return Ok(S3Response::new(output));
2770 }
2771 self.backend.get_bucket_versioning(req).await
2772 }
2773 async fn put_bucket_versioning(
2774 &self,
2775 req: S3Request<PutBucketVersioningInput>,
2776 ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
2777 if let Some(mgr) = self.versioning.as_ref() {
2783 let new_state = match req
2784 .input
2785 .versioning_configuration
2786 .status
2787 .as_ref()
2788 .map(|s| s.as_str())
2789 {
2790 Some(s) if s.eq_ignore_ascii_case("Enabled") => {
2791 crate::versioning::VersioningState::Enabled
2792 }
2793 Some(s) if s.eq_ignore_ascii_case("Suspended") => {
2794 crate::versioning::VersioningState::Suspended
2795 }
2796 _ => crate::versioning::VersioningState::Unversioned,
2797 };
2798 mgr.set_state(&req.input.bucket, new_state);
2799 return Ok(S3Response::new(PutBucketVersioningOutput::default()));
2800 }
2801 self.backend.put_bucket_versioning(req).await
2802 }
2803
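    // The remaining bucket-level operations below (location, policy, ACL, CORS,
    // lifecycle, tagging, encryption, logging, notifications, request payment, website,
    // replication, accelerate, ownership controls, public access block) carry no S4
    // semantics and are delegated to the backend unchanged.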
2804 async fn get_bucket_location(
2806 &self,
2807 req: S3Request<GetBucketLocationInput>,
2808 ) -> S3Result<S3Response<GetBucketLocationOutput>> {
2809 self.backend.get_bucket_location(req).await
2810 }
2811
2812 async fn get_bucket_policy(
2814 &self,
2815 req: S3Request<GetBucketPolicyInput>,
2816 ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
2817 self.backend.get_bucket_policy(req).await
2818 }
2819 async fn put_bucket_policy(
2820 &self,
2821 req: S3Request<PutBucketPolicyInput>,
2822 ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
2823 self.backend.put_bucket_policy(req).await
2824 }
2825 async fn delete_bucket_policy(
2826 &self,
2827 req: S3Request<DeleteBucketPolicyInput>,
2828 ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
2829 self.backend.delete_bucket_policy(req).await
2830 }
2831 async fn get_bucket_policy_status(
2832 &self,
2833 req: S3Request<GetBucketPolicyStatusInput>,
2834 ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
2835 self.backend.get_bucket_policy_status(req).await
2836 }
2837
2838 async fn get_bucket_acl(
2840 &self,
2841 req: S3Request<GetBucketAclInput>,
2842 ) -> S3Result<S3Response<GetBucketAclOutput>> {
2843 self.backend.get_bucket_acl(req).await
2844 }
2845 async fn put_bucket_acl(
2846 &self,
2847 req: S3Request<PutBucketAclInput>,
2848 ) -> S3Result<S3Response<PutBucketAclOutput>> {
2849 self.backend.put_bucket_acl(req).await
2850 }
2851
2852 async fn get_bucket_cors(
2854 &self,
2855 req: S3Request<GetBucketCorsInput>,
2856 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
2857 self.backend.get_bucket_cors(req).await
2858 }
2859 async fn put_bucket_cors(
2860 &self,
2861 req: S3Request<PutBucketCorsInput>,
2862 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
2863 self.backend.put_bucket_cors(req).await
2864 }
2865 async fn delete_bucket_cors(
2866 &self,
2867 req: S3Request<DeleteBucketCorsInput>,
2868 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
2869 self.backend.delete_bucket_cors(req).await
2870 }
2871
2872 async fn get_bucket_lifecycle_configuration(
2874 &self,
2875 req: S3Request<GetBucketLifecycleConfigurationInput>,
2876 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
2877 self.backend.get_bucket_lifecycle_configuration(req).await
2878 }
2879 async fn put_bucket_lifecycle_configuration(
2880 &self,
2881 req: S3Request<PutBucketLifecycleConfigurationInput>,
2882 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
2883 self.backend.put_bucket_lifecycle_configuration(req).await
2884 }
2885 async fn delete_bucket_lifecycle(
2886 &self,
2887 req: S3Request<DeleteBucketLifecycleInput>,
2888 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
2889 self.backend.delete_bucket_lifecycle(req).await
2890 }
2891
2892 async fn get_bucket_tagging(
2894 &self,
2895 req: S3Request<GetBucketTaggingInput>,
2896 ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
2897 self.backend.get_bucket_tagging(req).await
2898 }
2899 async fn put_bucket_tagging(
2900 &self,
2901 req: S3Request<PutBucketTaggingInput>,
2902 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
2903 self.backend.put_bucket_tagging(req).await
2904 }
2905 async fn delete_bucket_tagging(
2906 &self,
2907 req: S3Request<DeleteBucketTaggingInput>,
2908 ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
2909 self.backend.delete_bucket_tagging(req).await
2910 }
2911
2912 async fn get_bucket_encryption(
2914 &self,
2915 req: S3Request<GetBucketEncryptionInput>,
2916 ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
2917 self.backend.get_bucket_encryption(req).await
2918 }
2919 async fn put_bucket_encryption(
2920 &self,
2921 req: S3Request<PutBucketEncryptionInput>,
2922 ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
2923 self.backend.put_bucket_encryption(req).await
2924 }
2925 async fn delete_bucket_encryption(
2926 &self,
2927 req: S3Request<DeleteBucketEncryptionInput>,
2928 ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
2929 self.backend.delete_bucket_encryption(req).await
2930 }
2931
2932 async fn get_bucket_logging(
2934 &self,
2935 req: S3Request<GetBucketLoggingInput>,
2936 ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
2937 self.backend.get_bucket_logging(req).await
2938 }
2939 async fn put_bucket_logging(
2940 &self,
2941 req: S3Request<PutBucketLoggingInput>,
2942 ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
2943 self.backend.put_bucket_logging(req).await
2944 }
2945
2946 async fn get_bucket_notification_configuration(
2948 &self,
2949 req: S3Request<GetBucketNotificationConfigurationInput>,
2950 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
2951 self.backend
2952 .get_bucket_notification_configuration(req)
2953 .await
2954 }
2955 async fn put_bucket_notification_configuration(
2956 &self,
2957 req: S3Request<PutBucketNotificationConfigurationInput>,
2958 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
2959 self.backend
2960 .put_bucket_notification_configuration(req)
2961 .await
2962 }
2963
2964 async fn get_bucket_request_payment(
2966 &self,
2967 req: S3Request<GetBucketRequestPaymentInput>,
2968 ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
2969 self.backend.get_bucket_request_payment(req).await
2970 }
2971 async fn put_bucket_request_payment(
2972 &self,
2973 req: S3Request<PutBucketRequestPaymentInput>,
2974 ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
2975 self.backend.put_bucket_request_payment(req).await
2976 }
2977
2978 async fn get_bucket_website(
2980 &self,
2981 req: S3Request<GetBucketWebsiteInput>,
2982 ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
2983 self.backend.get_bucket_website(req).await
2984 }
2985 async fn put_bucket_website(
2986 &self,
2987 req: S3Request<PutBucketWebsiteInput>,
2988 ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
2989 self.backend.put_bucket_website(req).await
2990 }
2991 async fn delete_bucket_website(
2992 &self,
2993 req: S3Request<DeleteBucketWebsiteInput>,
2994 ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
2995 self.backend.delete_bucket_website(req).await
2996 }
2997
2998 async fn get_bucket_replication(
3000 &self,
3001 req: S3Request<GetBucketReplicationInput>,
3002 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
3003 self.backend.get_bucket_replication(req).await
3004 }
3005 async fn put_bucket_replication(
3006 &self,
3007 req: S3Request<PutBucketReplicationInput>,
3008 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
3009 self.backend.put_bucket_replication(req).await
3010 }
3011 async fn delete_bucket_replication(
3012 &self,
3013 req: S3Request<DeleteBucketReplicationInput>,
3014 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
3015 self.backend.delete_bucket_replication(req).await
3016 }
3017
3018 async fn get_bucket_accelerate_configuration(
3020 &self,
3021 req: S3Request<GetBucketAccelerateConfigurationInput>,
3022 ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
3023 self.backend.get_bucket_accelerate_configuration(req).await
3024 }
3025 async fn put_bucket_accelerate_configuration(
3026 &self,
3027 req: S3Request<PutBucketAccelerateConfigurationInput>,
3028 ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
3029 self.backend.put_bucket_accelerate_configuration(req).await
3030 }
3031
3032 async fn get_bucket_ownership_controls(
3034 &self,
3035 req: S3Request<GetBucketOwnershipControlsInput>,
3036 ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
3037 self.backend.get_bucket_ownership_controls(req).await
3038 }
3039 async fn put_bucket_ownership_controls(
3040 &self,
3041 req: S3Request<PutBucketOwnershipControlsInput>,
3042 ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
3043 self.backend.put_bucket_ownership_controls(req).await
3044 }
3045 async fn delete_bucket_ownership_controls(
3046 &self,
3047 req: S3Request<DeleteBucketOwnershipControlsInput>,
3048 ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
3049 self.backend.delete_bucket_ownership_controls(req).await
3050 }
3051
3052 async fn get_public_access_block(
3054 &self,
3055 req: S3Request<GetPublicAccessBlockInput>,
3056 ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
3057 self.backend.get_public_access_block(req).await
3058 }
3059 async fn put_public_access_block(
3060 &self,
3061 req: S3Request<PutPublicAccessBlockInput>,
3062 ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
3063 self.backend.put_public_access_block(req).await
3064 }
3065 async fn delete_public_access_block(
3066 &self,
3067 req: S3Request<DeletePublicAccessBlockInput>,
3068 ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
3069 self.backend.delete_public_access_block(req).await
3070 }
3071}
3072
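/// Pre-routing gate for SigV4A-signed requests.
///
/// `pre_route` is a no-op for requests that are not detected as SigV4A. Otherwise it
/// parses the `Authorization` header, looks up the access key in the shared credential
/// store, and verifies the signature against the caller-supplied canonical request
/// bytes and the region-set header (treated as `*` when absent).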
3073#[derive(Debug, Clone)]
3100pub struct SigV4aGate {
3101 store: crate::sigv4a::SharedSigV4aCredentialStore,
3102}
3103
3104impl SigV4aGate {
3105 #[must_use]
3106 pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
3107 Self { store }
3108 }
3109
3110 pub fn pre_route<B>(
3126 &self,
3127 req: &http::Request<B>,
3128 requested_region: &str,
3129 canonical_request_bytes: &[u8],
3130 ) -> Result<(), SigV4aGateError> {
3131 if !crate::sigv4a::detect(req) {
3132 return Ok(());
3133 }
3134 let auth_hdr = req
3135 .headers()
3136 .get(http::header::AUTHORIZATION)
3137 .and_then(|v| v.to_str().ok())
3138 .ok_or(SigV4aGateError::MissingAuthorization)?;
3139 let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
3140 .ok_or(SigV4aGateError::MalformedAuthorization)?;
3141 let region_set = req
3142 .headers()
3143 .get(crate::sigv4a::REGION_SET_HEADER)
3144 .and_then(|v| v.to_str().ok())
3145 .unwrap_or("*");
3146 let key = self
3147 .store
3148 .get(&parsed.access_key_id)
3149 .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
3150 crate::sigv4a::verify(
3151 &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
3152 &parsed.signature_der,
3153 key,
3154 region_set,
3155 requested_region,
3156 )
3157 .map_err(SigV4aGateError::Verify)?;
3158 Ok(())
3159 }
3160}
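
// How an embedding HTTP front end might use the gate before dispatching to the S3
// service (sketch only; reconstructing `canonical_request` is assumed to happen in the
// caller's signing layer and is not provided by this module):
//
//     let gate = SigV4aGate::new(store);
//     if let Err(err) = gate.pre_route(&http_req, "us-east-1", canonical_request) {
//         return s3_error_response(err.s3_error_code(), &err.to_string());
//     }
//
// `s3_error_response` above is a stand-in for whatever error encoder the front end uses.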
3161
3162#[derive(Debug, thiserror::Error)]
3166pub enum SigV4aGateError {
3167 #[error("missing Authorization header")]
3168 MissingAuthorization,
3169 #[error("malformed SigV4a Authorization header")]
3170 MalformedAuthorization,
3171 #[error("unknown SigV4a access-key-id: {0}")]
3172 UnknownAccessKey(String),
3173 #[error("SigV4a verification failed: {0}")]
3174 Verify(#[source] crate::sigv4a::SigV4aError),
3175}
3176
3177impl SigV4aGateError {
3178 #[must_use]
3180 pub fn s3_error_code(&self) -> &'static str {
3181 match self {
3182 Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
3183 _ => "SignatureDoesNotMatch",
3184 }
3185 }
3186}
3187
3188#[cfg(test)]
3189mod tests {
3190 use super::*;
3191
3192 #[test]
3193 fn manifest_roundtrip_via_metadata() {
3194 let original = ChunkManifest {
3195 codec: CodecKind::CpuZstd,
3196 original_size: 1234,
3197 compressed_size: 567,
3198 crc32c: 0xdead_beef,
3199 };
3200 let mut meta: Option<Metadata> = None;
3201 write_manifest(&mut meta, &original);
3202 let extracted = extract_manifest(&meta).expect("manifest must round-trip");
3203 assert_eq!(extracted.codec, original.codec);
3204 assert_eq!(extracted.original_size, original.original_size);
3205 assert_eq!(extracted.compressed_size, original.compressed_size);
3206 assert_eq!(extracted.crc32c, original.crc32c);
3207 }
3208
3209 #[test]
3210 fn missing_metadata_yields_none() {
3211 let meta: Option<Metadata> = None;
3212 assert!(extract_manifest(&meta).is_none());
3213 }
3214
3215 #[test]
3216 fn partial_metadata_yields_none() {
3217 let mut meta = Metadata::new();
3218 meta.insert(META_CODEC.into(), "cpu-zstd".into());
3219 let opt = Some(meta);
3220 assert!(extract_manifest(&opt).is_none());
3221 }
3222
3223 #[test]
3224 fn parse_copy_source_range_basic() {
3225 let r = parse_copy_source_range("bytes=10-20").unwrap();
3226 match r {
3227 s3s::dto::Range::Int { first, last } => {
3228 assert_eq!(first, 10);
3229 assert_eq!(last, Some(20));
3230 }
3231 _ => panic!("expected Int range"),
3232 }
3233 }
3234
3235 #[test]
3236 fn parse_copy_source_range_rejects_inverted() {
3237 let err = parse_copy_source_range("bytes=20-10").unwrap_err();
3238 assert!(err.contains("last < first"));
3239 }
3240
3241 #[test]
3242 fn parse_copy_source_range_rejects_missing_prefix() {
3243 let err = parse_copy_source_range("10-20").unwrap_err();
3244 assert!(err.contains("must start with 'bytes='"));
3245 }
3246
3247 #[test]
3248 fn parse_copy_source_range_rejects_open_ended() {
3249 assert!(parse_copy_source_range("bytes=10-").is_err());
3252 assert!(parse_copy_source_range("bytes=-10").is_err());
3253 }
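
    #[test]
    fn manifest_rewrite_overwrites_previous() {
        // Assumes write_manifest stores each field under a fixed metadata key, so a
        // second write replaces the first rather than accumulating entries.
        let first = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 10,
            compressed_size: 5,
            crc32c: 1,
        };
        let second = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 2048,
            compressed_size: 1024,
            crc32c: 2,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &first);
        write_manifest(&mut meta, &second);
        let extracted = extract_manifest(&meta).expect("manifest must still round-trip");
        assert_eq!(extracted.original_size, second.original_size);
        assert_eq!(extracted.compressed_size, second.compressed_size);
        assert_eq!(extracted.crc32c, second.crc32c);
    }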
3254}