use std::sync::Arc;
use std::time::Instant;

use bytes::BytesMut;
use s3s::dto::*;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
use s4_codec::multipart::{
    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
    write_frame,
};
use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
use tracing::{debug, info};

use crate::blob::{
    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
};
use crate::streaming::{
    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
    supports_streaming_compress, supports_streaming_decompress,
};

const SAMPLE_BYTES: usize = 4096;

struct AccessLogPreamble {
    remote_ip: Option<String>,
    requester: Option<String>,
    request_uri: String,
    user_agent: Option<String>,
}

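/// S3-compatible gateway layer that compresses object bodies on PUT and
/// decompresses them on GET. Most other operations delegate to `backend`,
/// with small fix-ups (sidecar cleanup, size rewrites, `.s4index` filtering).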
pub struct S4Service<B: S3> {
    backend: B,
    registry: Arc<CodecRegistry>,
    dispatcher: Arc<dyn CodecDispatcher>,
    max_body_bytes: usize,
    policy: Option<crate::policy::SharedPolicy>,
    secure_transport: bool,
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    access_log: Option<crate::access_log::SharedAccessLog>,
    sse_key: Option<crate::sse::SharedSseKey>,
}

impl<B: S3> S4Service<B> {
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend,
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
            secure_transport: false,
            rate_limits: None,
            access_log: None,
            sse_key: None,
        }
    }

    #[must_use]
    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
        self.sse_key = Some(key);
        self
    }

    #[must_use]
    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
        self.access_log = Some(log);
        self
    }

    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
        self.access_log.as_ref()?;
        Some(AccessLogPreamble {
            remote_ip: req
                .headers
                .get("x-forwarded-for")
                .and_then(|v| v.to_str().ok())
                .and_then(|raw| raw.split(',').next())
                .map(|s| s.trim().to_owned()),
            requester: Self::principal_of(req).map(str::to_owned),
            request_uri: format!("{} {}", req.method, req.uri.path()),
            user_agent: req
                .headers
                .get("user-agent")
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned),
        })
    }

    #[allow(clippy::too_many_arguments)]
    async fn record_access(
        &self,
        preamble: Option<AccessLogPreamble>,
        operation: &'static str,
        bucket: &str,
        key: Option<&str>,
        http_status: u16,
        bytes_sent: u64,
        object_size: u64,
        total_time_ms: u64,
        error_code: Option<&str>,
    ) {
        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
            return;
        };
        log.record(crate::access_log::AccessLogEntry {
            time: std::time::SystemTime::now(),
            bucket: bucket.to_owned(),
            remote_ip: p.remote_ip,
            requester: p.requester,
            operation,
            key: key.map(str::to_owned),
            request_uri: p.request_uri,
            http_status,
            error_code: error_code.map(str::to_owned),
            bytes_sent,
            object_size,
            total_time_ms,
            user_agent: p.user_agent,
        })
        .await;
    }

    #[must_use]
    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
        self.rate_limits = Some(rl);
        self
    }

    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
        let Some(rl) = self.rate_limits.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        if !rl.check(principal_id, bucket) {
            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
            return Err(S3Error::with_message(
                S3ErrorCode::SlowDown,
                format!("rate-limited: bucket={bucket}"),
            ));
        }
        Ok(())
    }

    #[must_use]
    pub fn with_secure_transport(mut self, on: bool) -> Self {
        self.secure_transport = on;
        self
    }

    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }

    #[must_use]
    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
        self.policy = Some(policy);
        self
    }

    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
        req.credentials.as_ref().map(|c| c.access_key.as_str())
    }

    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
        let user_agent = req
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        let source_ip = req
            .headers
            .get("x-forwarded-for")
            .and_then(|v| v.to_str().ok())
            .and_then(|raw| raw.split(',').next())
            .and_then(|s| s.trim().parse().ok());
        crate::policy::RequestContext {
            source_ip,
            user_agent,
            request_time: Some(std::time::SystemTime::now()),
            secure_transport: self.secure_transport,
            extra: Default::default(),
        }
    }

    fn enforce_policy<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
    ) -> S3Result<()> {
        let Some(policy) = self.policy.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        let ctx = self.request_context(req);
        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
        if decision.allow {
            Ok(())
        } else {
            crate::metrics::record_policy_denial(action, bucket);
            tracing::info!(
                action,
                bucket,
                key = ?key,
                principal = ?principal_id,
                source_ip = ?ctx.source_ip,
                user_agent = ?ctx.user_agent,
                secure_transport = ctx.secure_transport,
                matched_sid = ?decision.matched_sid,
                effect = ?decision.matched_effect,
                "S4 policy denied request"
            );
            Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                format!("denied by S4 policy: {action} on bucket={bucket}"),
            ))
        }
    }

    pub fn into_backend(self) -> B {
        self.backend
    }

    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }

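    /// Best-effort write of the frame-index sidecar for `key`; failures are
    /// only logged, and Range GETs then fall back to a full-object read.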
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }

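    /// Fetches and decodes the sidecar frame index for `key`, if present.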
    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }

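    /// Reassembles a body stored as concatenated S4 frames (multipart uploads
    /// and S4F2 framed objects), decompressing each frame in order.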
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
}

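/// Parses an `x-amz-copy-source-range` header of the form `bytes=N-M`.
/// Both endpoints are required and inclusive; open-ended forms are rejected.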
fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
    let rest = s
        .strip_prefix("bytes=")
        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
    let (a, b) = rest
        .split_once('-')
        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
    let first: u64 = a
        .parse()
        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
    let last: u64 = b
        .parse()
        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
    if last < first {
        return Err(format!("CopySourceRange last < first: {s:?}"));
    }
    Ok(s3s::dto::Range::Int {
        first,
        last: Some(last),
    })
}

fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_MULTIPART))
        .map(|v| v == "true")
        .unwrap_or(false)
}

const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
const META_MULTIPART: &str = "s4-multipart";
const META_FRAMED: &str = "s4-framed";

fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_FRAMED))
        .map(|v| v == "true")
        .unwrap_or(false)
}

fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get("s4-encrypted"))
        .map(|v| v == "aes-256-gcm")
        .unwrap_or(false)
}

fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
    let m = metadata.as_ref()?;
    let codec = m
        .get(META_CODEC)
        .and_then(|s| s.parse::<CodecKind>().ok())?;
    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
    Some(ChunkManifest {
        codec,
        original_size,
        compressed_size,
        crc32c,
    })
}

fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
    let meta = metadata.get_or_insert_with(Default::default);
    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
    meta.insert(
        META_ORIGINAL_SIZE.into(),
        manifest.original_size.to_string(),
    );
    meta.insert(
        META_COMPRESSED_SIZE.into(),
        manifest.compressed_size.to_string(),
    );
    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
}

fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}

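/// Resolves an HTTP `Range` against the (decompressed) object size.
///
/// Returns a half-open `(start, end_exclusive)` pair. An `Int` range with no
/// last byte reads to the end of the object; a `Suffix` longer than the
/// object is clamped to the whole object.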
pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
    if total == 0 {
        return Err("cannot range-get zero-length object".into());
    }
    match range {
        s3s::dto::Range::Int { first, last } => {
            let start = *first;
            let end_inclusive = match last {
                Some(l) => (*l).min(total - 1),
                None => total - 1,
            };
            if start > end_inclusive || start >= total {
                return Err(format!(
                    "range bytes={start}-{last:?} out of object size {total}"
                ));
            }
            Ok((start, end_inclusive + 1))
        }
        s3s::dto::Range::Suffix { length } => {
            let len = (*length).min(total);
            Ok((total - len, total))
        }
    }
}

#[async_trait::async_trait]
impl<B: S3> S3 for S4Service<B> {
    #[tracing::instrument(
        name = "s4.put_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
    )]
    async fn put_object(
        &self,
        mut req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        let put_start = Instant::now();
        let put_bucket = req.input.bucket.clone();
        let put_key = req.input.key.clone();
        let access_preamble = self.access_log_preamble(&req);
        self.enforce_rate_limit(&req, &put_bucket)?;
        self.enforce_policy(&req, "s3:PutObject", &put_bucket, Some(&put_key))?;
        if let Some(blob) = req.input.body.take() {
            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
                .await
                .map_err(internal("peek put sample"))?;
            let sample_len = sample.len().min(SAMPLE_BYTES);
            let kind = self.dispatcher.pick(&sample[..sample_len]).await;

            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
            let (compressed, manifest, is_framed) = if use_framed {
                let chained = chain_sample_with_rest(sample, rest_stream);
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    codec = kind.as_str(),
                    path = "streaming-framed",
                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
                );
                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
                let (body, manifest) = streaming_compress_to_frames(
                    chained,
                    Arc::clone(&self.registry),
                    kind,
                    chunk_size,
                )
                .await
                .map_err(internal("streaming framed compress"))?;
                (body, manifest, true)
            } else {
                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
                    .await
                    .map_err(internal("collect put body (buffered path)"))?;
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    bytes = bytes.len(),
                    codec = kind.as_str(),
                    path = "buffered",
                    "S4 put_object: compressing (buffered, raw blob)"
                );
                let (body, m) = self
                    .registry
                    .compress(bytes, kind)
                    .await
                    .map_err(internal("registry compress"))?;
                (body, m, false)
            };

            write_manifest(&mut req.input.metadata, &manifest);
            if is_framed {
                req.input
                    .metadata
                    .get_or_insert_with(Default::default)
                    .insert(META_FRAMED.into(), "true".into());
            }
            req.input.content_length = Some(compressed.len() as i64);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            let original_size = manifest.original_size;
            let compressed_size = manifest.compressed_size;
            let codec_label = manifest.codec.as_str();
            let sidecar_index = if is_framed {
                build_index_from_body(&compressed).ok()
            } else {
                None
            };
            let body_to_send = if let Some(key) = self.sse_key.as_ref() {
                req.input
                    .metadata
                    .get_or_insert_with(Default::default)
                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
                crate::sse::encrypt(key, &compressed)
            } else {
                compressed.clone()
            };
            req.input.body = Some(bytes_to_blob(body_to_send));
            let backend_resp = self.backend.put_object(req).await;
            if let Some(idx) = sidecar_index
                && backend_resp.is_ok()
                && idx.entries.len() > 1
            {
                self.write_sidecar(&put_bucket, &put_key, &idx).await;
            }
            let elapsed = put_start.elapsed();
            crate::metrics::record_put(
                codec_label,
                original_size,
                compressed_size,
                elapsed.as_secs_f64(),
                backend_resp.is_ok(),
            );
            self.record_access(
                access_preamble,
                "REST.PUT.OBJECT",
                &put_bucket,
                Some(&put_key),
                if backend_resp.is_ok() { 200 } else { 500 },
                compressed_size,
                original_size,
                elapsed.as_millis() as u64,
                backend_resp.as_ref().err().map(|e| e.code().as_str()),
            )
            .await;
            info!(
                op = "put_object",
                bucket = %put_bucket,
                key = %put_key,
                codec = codec_label,
                bytes_in = original_size,
                bytes_out = compressed_size,
                ratio = format!(
                    "{:.3}",
                    if original_size == 0 {
                        1.0
                    } else {
                        compressed_size as f64 / original_size as f64
                    }
                ),
                latency_ms = elapsed.as_millis() as u64,
                ok = backend_resp.is_ok(),
                "S4 put completed"
            );
            return backend_resp;
        }
        self.backend.put_object(req).await
    }

    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        let range_request = req.input.range.take();

        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let key = self.sse_key.as_ref().ok_or_else(|| {
                    S3Error::with_message(
                        S3ErrorCode::InvalidRequest,
                        "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                    )
                })?;
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                let plain = crate::sse::decrypt(key, &body).map_err(|e| {
                    S3Error::with_message(
                        S3ErrorCode::InternalError,
                        format!("SSE-S4 decrypt failed: {e}"),
                    )
                })?;
                bytes_to_blob(plain)
            } else {
                blob
            };
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            resp.output.content_length = Some(final_bytes.len() as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        Ok(resp)
    }

    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
    async fn head_object(
        &self,
        req: S3Request<HeadObjectInput>,
    ) -> S3Result<S3Response<HeadObjectOutput>> {
        let mut resp = self.backend.head_object(req).await?;
        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
            resp.output.content_length = Some(manifest.original_size as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
        }
        Ok(resp)
    }
    async fn delete_object(
        &self,
        req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        let resp = self.backend.delete_object(req).await?;
        let sidecar_input = DeleteObjectInput {
            bucket: bucket.clone(),
            key: sidecar_key(&key),
            ..Default::default()
        };
        let sidecar_req = S3Request {
            input: sidecar_input,
            method: http::Method::DELETE,
            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let _ = self.backend.delete_object(sidecar_req).await;
        Ok(resp)
    }
    async fn delete_objects(
        &self,
        req: S3Request<DeleteObjectsInput>,
    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
        self.backend.delete_objects(req).await
    }
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
    async fn list_objects(
        &self,
        req: S3Request<ListObjectsInput>,
    ) -> S3Result<S3Response<ListObjectsOutput>> {
        self.enforce_rate_limit(&req, &req.input.bucket)?;
        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
        let mut resp = self.backend.list_objects(req).await?;
        if let Some(contents) = resp.output.contents.as_mut() {
            contents.retain(|o| {
                o.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
        }
        Ok(resp)
    }
    async fn list_objects_v2(
        &self,
        req: S3Request<ListObjectsV2Input>,
    ) -> S3Result<S3Response<ListObjectsV2Output>> {
        self.enforce_rate_limit(&req, &req.input.bucket)?;
        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
        let mut resp = self.backend.list_objects_v2(req).await?;
        if let Some(contents) = resp.output.contents.as_mut() {
            let before = contents.len();
            contents.retain(|o| {
                o.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
            if let Some(kc) = resp.output.key_count.as_mut() {
                *kc -= (before - contents.len()) as i32;
            }
        }
        Ok(resp)
    }
    async fn list_object_versions(
        &self,
        req: S3Request<ListObjectVersionsInput>,
    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
        self.enforce_rate_limit(&req, &req.input.bucket)?;
        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
        let mut resp = self.backend.list_object_versions(req).await?;
        if let Some(versions) = resp.output.versions.as_mut() {
            versions.retain(|v| {
                v.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
        }
        if let Some(markers) = resp.output.delete_markers.as_mut() {
            markers.retain(|m| {
                m.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
        }
        Ok(resp)
    }

    async fn create_multipart_upload(
        &self,
        mut req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        let codec_kind = self.registry.default_kind();
        let meta = req.input.metadata.get_or_insert_with(Default::default);
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
        debug!(
            bucket = ?req.input.bucket,
            key = ?req.input.key,
            codec = codec_kind.as_str(),
            "S4 create_multipart_upload: marking object for per-part compression"
        );
        self.backend.create_multipart_upload(req).await
    }

    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        if let Some(blob) = req.input.body.take() {
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
            let original_size = bytes.len() as u64;
            let (compressed, manifest) = self
                .registry
                .compress(bytes, codec_kind)
                .await
                .map_err(internal("registry compress part"))?;
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
            if !likely_final {
                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            }
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            req.input.content_length = Some(new_len);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
    async fn complete_multipart_upload(
        &self,
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        let resp = self.backend.complete_multipart_upload(req).await?;
        let get_input = GetObjectInput {
            bucket: bucket.clone(),
            key: key.clone(),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{bucket}/{key}").parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Ok(get_resp) = self.backend.get_object(get_req).await
            && let Some(blob) = get_resp.output.body
            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
            && let Ok(index) = build_index_from_body(&body)
        {
            self.write_sidecar(&bucket, &key, &index).await;
        }
        Ok(resp)
    }
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }

    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
    async fn get_object_tagging(
        &self,
        req: S3Request<GetObjectTaggingInput>,
    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
        self.backend.get_object_tagging(req).await
    }
    async fn put_object_tagging(
        &self,
        req: S3Request<PutObjectTaggingInput>,
    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
        self.backend.put_object_tagging(req).await
    }
    async fn delete_object_tagging(
        &self,
        req: S3Request<DeleteObjectTaggingInput>,
    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
        self.backend.delete_object_tagging(req).await
    }
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        let CopySource::Bucket {
            bucket: src_bucket,
            key: src_key,
            ..
        } = &req.input.copy_source
        else {
            return self.backend.upload_part_copy(req).await;
        };
        let src_bucket = src_bucket.to_string();
        let src_key = src_key.to_string();

        let head_input = HeadObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        let head_req = S3Request {
            input: head_input,
            method: http::Method::HEAD,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let needs_s4_copy = match self.backend.head_object(head_req).await {
            Ok(h) => {
                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
            }
            Err(_) => false,
        };
        if !needs_s4_copy {
            return self.backend.upload_part_copy(req).await;
        }

        let source_range = req
            .input
            .copy_source_range
            .as_ref()
            .map(|r| parse_copy_source_range(r))
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

        let get_input = GetObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            range: source_range,
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "upload_part_copy: empty body from source GET",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect upload_part_copy source body"))?;

        let sample_len = bytes.len().min(SAMPLE_BYTES);
        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
        let original_size = bytes.len() as u64;
        let (compressed, manifest) = self
            .registry
            .compress(bytes, codec_kind)
            .await
            .map_err(internal("registry compress upload_part_copy"))?;
        let header = FrameHeader {
            codec: codec_kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: manifest.crc32c,
        };
        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
        write_frame(&mut framed, header, &compressed);
        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
        if !likely_final {
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
        }
        let framed_bytes = framed.freeze();
        let framed_len = framed_bytes.len() as i64;

        let part_input = UploadPartInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            part_number: req.input.part_number,
            upload_id: req.input.upload_id.clone(),
            body: Some(bytes_to_blob(framed_bytes)),
            content_length: Some(framed_len),
            ..Default::default()
        };
        let part_req = S3Request {
            input: part_input,
            method: http::Method::PUT,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let upload_resp = self.backend.upload_part(part_req).await?;

        let copy_output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: upload_resp.output.e_tag.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        Ok(S3Response::new(copy_output))
    }

    async fn get_object_lock_configuration(
        &self,
        req: S3Request<GetObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
        self.backend.get_object_lock_configuration(req).await
    }
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        self.backend.put_object_lock_configuration(req).await
    }
    async fn get_object_legal_hold(
        &self,
        req: S3Request<GetObjectLegalHoldInput>,
    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
        self.backend.get_object_legal_hold(req).await
    }
    async fn put_object_legal_hold(
        &self,
        req: S3Request<PutObjectLegalHoldInput>,
    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
        self.backend.put_object_legal_hold(req).await
    }
    async fn get_object_retention(
        &self,
        req: S3Request<GetObjectRetentionInput>,
    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
        self.backend.get_object_retention(req).await
    }
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        self.backend.put_object_retention(req).await
    }

    async fn get_bucket_versioning(
        &self,
        req: S3Request<GetBucketVersioningInput>,
    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
        self.backend.get_bucket_versioning(req).await
    }
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        self.backend.put_bucket_versioning(req).await
    }

    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }

    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }

    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }

    async fn get_bucket_cors(
        &self,
        req: S3Request<GetBucketCorsInput>,
    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
        self.backend.get_bucket_cors(req).await
    }
    async fn put_bucket_cors(
        &self,
        req: S3Request<PutBucketCorsInput>,
    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
        self.backend.put_bucket_cors(req).await
    }
    async fn delete_bucket_cors(
        &self,
        req: S3Request<DeleteBucketCorsInput>,
    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
        self.backend.delete_bucket_cors(req).await
    }

    async fn get_bucket_lifecycle_configuration(
        &self,
        req: S3Request<GetBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
        self.backend.get_bucket_lifecycle_configuration(req).await
    }
    async fn put_bucket_lifecycle_configuration(
        &self,
        req: S3Request<PutBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
        self.backend.put_bucket_lifecycle_configuration(req).await
    }
    async fn delete_bucket_lifecycle(
        &self,
        req: S3Request<DeleteBucketLifecycleInput>,
    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
        self.backend.delete_bucket_lifecycle(req).await
    }

    async fn get_bucket_tagging(
        &self,
        req: S3Request<GetBucketTaggingInput>,
    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
        self.backend.get_bucket_tagging(req).await
    }
    async fn put_bucket_tagging(
        &self,
        req: S3Request<PutBucketTaggingInput>,
    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
        self.backend.put_bucket_tagging(req).await
    }
    async fn delete_bucket_tagging(
        &self,
        req: S3Request<DeleteBucketTaggingInput>,
    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
        self.backend.delete_bucket_tagging(req).await
    }

    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }

    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }

    async fn get_bucket_notification_configuration(
        &self,
        req: S3Request<GetBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
        self.backend
            .get_bucket_notification_configuration(req)
            .await
    }
    async fn put_bucket_notification_configuration(
        &self,
        req: S3Request<PutBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
        self.backend
            .put_bucket_notification_configuration(req)
            .await
    }

    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }

    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }

    async fn get_bucket_replication(
        &self,
        req: S3Request<GetBucketReplicationInput>,
    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
        self.backend.get_bucket_replication(req).await
    }
    async fn put_bucket_replication(
        &self,
        req: S3Request<PutBucketReplicationInput>,
    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
        self.backend.put_bucket_replication(req).await
    }
    async fn delete_bucket_replication(
        &self,
        req: S3Request<DeleteBucketReplicationInput>,
    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
        self.backend.delete_bucket_replication(req).await
    }

    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }

    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }

    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }
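
    // Sketch test (added) for the metadata flag helpers; it assumes only the
    // `Metadata` map API already exercised by the tests above.
    #[test]
    fn multipart_and_framed_flags_require_exact_true() {
        let mut meta = Metadata::new();
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_FRAMED.into(), "false".into());
        let opt = Some(meta);
        assert!(is_multipart_object(&opt));
        // Any value other than the literal "true" does not count as framed.
        assert!(!is_framed_v2_object(&opt));
        assert!(!is_multipart_object(&None));
    }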

    #[test]
    fn parse_copy_source_range_basic() {
        let r = parse_copy_source_range("bytes=10-20").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        assert!(parse_copy_source_range("bytes=10-").is_err());
        assert!(parse_copy_source_range("bytes=-10").is_err());
    }
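
    // Sketch tests (added) for `resolve_range`; the expected pairs follow the
    // half-open (start, end_exclusive) convention of the implementation above.
    #[test]
    fn resolve_range_int_open_ended_and_clamped() {
        let r = s3s::dto::Range::Int { first: 2, last: Some(5) };
        assert_eq!(resolve_range(&r, 10).unwrap(), (2, 6));
        // An open-ended range reads to the end of the object.
        let r = s3s::dto::Range::Int { first: 4, last: None };
        assert_eq!(resolve_range(&r, 10).unwrap(), (4, 10));
        // A last byte past the end is clamped to the object size.
        let r = s3s::dto::Range::Int { first: 0, last: Some(99) };
        assert_eq!(resolve_range(&r, 10).unwrap(), (0, 10));
    }

    #[test]
    fn resolve_range_suffix_and_errors() {
        // bytes=-4 returns the final four bytes.
        let r = s3s::dto::Range::Suffix { length: 4 };
        assert_eq!(resolve_range(&r, 10).unwrap(), (6, 10));
        // A suffix longer than the object covers the whole object.
        let r = s3s::dto::Range::Suffix { length: 99 };
        assert_eq!(resolve_range(&r, 10).unwrap(), (0, 10));
        // Starting at or past the end fails, as does any range on an empty object.
        let r = s3s::dto::Range::Int { first: 10, last: None };
        assert!(resolve_range(&r, 10).is_err());
        assert!(resolve_range(&s3s::dto::Range::Suffix { length: 1 }, 0).is_err());
    }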
}