1use std::sync::Arc;
32
33use bytes::BytesMut;
34use s3s::dto::*;
35use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
36use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
37use s4_codec::multipart::{
38 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
39 write_frame,
40};
41use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
42use std::time::Instant;
43use tracing::{debug, info};
44
45use crate::blob::{
46 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
47};
48use crate::streaming::{
49 DEFAULT_S4F2_CHUNK_SIZE, cpu_zstd_decompress_stream, streaming_compress_to_frames,
50 supports_streaming_compress, supports_streaming_decompress,
51};
52
/// Maximum number of leading body bytes handed to the codec dispatcher when
/// choosing a codec: `put_object` peeks this many bytes from the stream and
/// `upload_part` slices the buffered part to at most this length.
const SAMPLE_BYTES: usize = 4096;
55
/// S3 service wrapper that sits in front of a backend `B` and compresses /
/// decompresses object bodies on the way through.
pub struct S4Service<B: S3> {
    /// Wrapped S3 implementation every request is ultimately forwarded to.
    backend: B,
    /// Codec registry used for the compress / decompress calls.
    registry: Arc<CodecRegistry>,
    /// Picks a codec from a sample of the body.
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Limit passed to `collect_blob` / `collect_with_sample` when a body is
    /// buffered in memory.
    max_body_bytes: usize,
    /// Optional policy; when `None`, `enforce_policy` allows every request.
    policy: Option<crate::policy::SharedPolicy>,
}
63
64impl<B: S3> S4Service<B> {
    /// Body-size limit (5 GiB) that `new` installs as `max_body_bytes`.
    // NOTE(review): the product does not fit in a 32-bit `usize`; confirm that
    // only 64-bit targets are built.
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
67
68 pub fn new(
69 backend: B,
70 registry: Arc<CodecRegistry>,
71 dispatcher: Arc<dyn CodecDispatcher>,
72 ) -> Self {
73 Self {
74 backend,
75 registry,
76 dispatcher,
77 max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
78 policy: None,
79 }
80 }
81
82 #[must_use]
83 pub fn with_max_body_bytes(mut self, n: usize) -> Self {
84 self.max_body_bytes = n;
85 self
86 }
87
88 #[must_use]
93 pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
94 self.policy = Some(policy);
95 self
96 }
97
98 fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
101 req.credentials.as_ref().map(|c| c.access_key.as_str())
102 }
103
104 fn enforce_policy(
109 &self,
110 action: &'static str,
111 bucket: &str,
112 key: Option<&str>,
113 principal_id: Option<&str>,
114 ) -> S3Result<()> {
115 let Some(policy) = self.policy.as_ref() else {
116 return Ok(());
117 };
118 let decision = policy.evaluate(action, bucket, key, principal_id);
119 if decision.allow {
120 Ok(())
121 } else {
122 crate::metrics::record_policy_denial(action, bucket);
123 tracing::info!(
124 action,
125 bucket,
126 key = ?key,
127 principal = ?principal_id,
128 matched_sid = ?decision.matched_sid,
129 effect = ?decision.matched_effect,
130 "S4 policy denied request"
131 );
132 Err(S3Error::with_message(
133 S3ErrorCode::AccessDenied,
134 format!("denied by S4 policy: {action} on bucket={bucket}"),
135 ))
136 }
137 }
138
139 pub fn into_backend(self) -> B {
141 self.backend
142 }
143
144 async fn partial_range_get(
147 &self,
148 req: &S3Request<GetObjectInput>,
149 plan: s4_codec::index::RangePlan,
150 client_start: u64,
151 client_end_exclusive: u64,
152 total_original: u64,
153 get_start: Instant,
154 ) -> S3Result<S3Response<GetObjectOutput>> {
155 let backend_range = s3s::dto::Range::Int {
157 first: plan.byte_start,
158 last: Some(plan.byte_end_exclusive - 1),
159 };
160 let backend_input = GetObjectInput {
161 bucket: req.input.bucket.clone(),
162 key: req.input.key.clone(),
163 range: Some(backend_range),
164 ..Default::default()
165 };
166 let backend_req = S3Request {
167 input: backend_input,
168 method: req.method.clone(),
169 uri: req.uri.clone(),
170 headers: req.headers.clone(),
171 extensions: http::Extensions::new(),
172 credentials: req.credentials.clone(),
173 region: req.region.clone(),
174 service: req.service.clone(),
175 trailing_headers: None,
176 };
177 let mut backend_resp = self.backend.get_object(backend_req).await?;
178 let blob = backend_resp.output.body.take().ok_or_else(|| {
179 S3Error::with_message(
180 S3ErrorCode::InternalError,
181 "backend partial GET returned empty body",
182 )
183 })?;
184 let bytes = collect_blob(blob, self.max_body_bytes)
185 .await
186 .map_err(internal("collect partial body"))?;
187
188 let mut combined = BytesMut::new();
190 for frame in FrameIter::new(bytes) {
191 let (header, payload) = frame.map_err(|e| {
192 S3Error::with_message(
193 S3ErrorCode::InternalError,
194 format!("partial-range frame parse: {e}"),
195 )
196 })?;
197 let chunk_manifest = ChunkManifest {
198 codec: header.codec,
199 original_size: header.original_size,
200 compressed_size: header.compressed_size,
201 crc32c: header.crc32c,
202 };
203 let decompressed = self
204 .registry
205 .decompress(payload, &chunk_manifest)
206 .await
207 .map_err(internal("partial-range decompress"))?;
208 combined.extend_from_slice(&decompressed);
209 }
210 let combined = combined.freeze();
211 let sliced = combined
212 .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);
213
214 let returned_size = sliced.len() as u64;
216 backend_resp.output.content_length = Some(returned_size as i64);
217 backend_resp.output.content_range = Some(format!(
218 "bytes {client_start}-{}/{total_original}",
219 client_end_exclusive - 1
220 ));
221 backend_resp.output.checksum_crc32 = None;
222 backend_resp.output.checksum_crc32c = None;
223 backend_resp.output.checksum_crc64nvme = None;
224 backend_resp.output.checksum_sha1 = None;
225 backend_resp.output.checksum_sha256 = None;
226 backend_resp.output.e_tag = None;
227 backend_resp.output.body = Some(bytes_to_blob(sliced));
228 backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);
229
230 let elapsed = get_start.elapsed();
231 crate::metrics::record_get(
232 "partial",
233 plan.byte_end_exclusive - plan.byte_start,
234 returned_size,
235 elapsed.as_secs_f64(),
236 true,
237 );
238 info!(
239 op = "get_object",
240 bucket = %req.input.bucket,
241 key = %req.input.key,
242 bytes_in = plan.byte_end_exclusive - plan.byte_start,
243 bytes_out = returned_size,
244 total_object_size = total_original,
245 range = true,
246 path = "sidecar-partial",
247 latency_ms = elapsed.as_millis() as u64,
248 "S4 partial Range GET via sidecar index"
249 );
250 Ok(backend_resp)
251 }
252
253 async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
257 let bytes = encode_index(index);
258 let len = bytes.len() as i64;
259 let put_input = PutObjectInput {
260 bucket: bucket.into(),
261 key: sidecar_key(key),
262 body: Some(bytes_to_blob(bytes)),
263 content_length: Some(len),
264 content_type: Some("application/x-s4-index".into()),
265 ..Default::default()
266 };
267 let put_req = S3Request {
268 input: put_input,
269 method: http::Method::PUT,
270 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
271 headers: http::HeaderMap::new(),
272 extensions: http::Extensions::new(),
273 credentials: None,
274 region: None,
275 service: None,
276 trailing_headers: None,
277 };
278 if let Err(e) = self.backend.put_object(put_req).await {
279 tracing::warn!(
280 bucket,
281 key,
282 "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
283 );
284 }
285 }
286
287 async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
289 let get_input = GetObjectInput {
290 bucket: bucket.into(),
291 key: sidecar_key(key),
292 ..Default::default()
293 };
294 let get_req = S3Request {
295 input: get_input,
296 method: http::Method::GET,
297 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
298 headers: http::HeaderMap::new(),
299 extensions: http::Extensions::new(),
300 credentials: None,
301 region: None,
302 service: None,
303 trailing_headers: None,
304 };
305 let resp = self.backend.get_object(get_req).await.ok()?;
306 let blob = resp.output.body?;
307 let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
308 decode_index(bytes).ok()
309 }
310
311 async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
317 let mut out = BytesMut::new();
318 for frame in FrameIter::new(bytes) {
319 let (header, payload) = frame.map_err(|e| {
320 S3Error::with_message(
321 S3ErrorCode::InternalError,
322 format!("multipart frame parse: {e}"),
323 )
324 })?;
325 let chunk_manifest = ChunkManifest {
326 codec: header.codec,
327 original_size: header.original_size,
328 compressed_size: header.compressed_size,
329 crc32c: header.crc32c,
330 };
331 let decompressed = self
332 .registry
333 .decompress(payload, &chunk_manifest)
334 .await
335 .map_err(internal("multipart frame decompress"))?;
336 out.extend_from_slice(&decompressed);
337 }
338 Ok(out.freeze())
339 }
340}
341
342fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
347 let rest = s
348 .strip_prefix("bytes=")
349 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
350 let (a, b) = rest
351 .split_once('-')
352 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
353 let first: u64 = a
354 .parse()
355 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
356 let last: u64 = b
357 .parse()
358 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
359 if last < first {
360 return Err(format!("CopySourceRange last < first: {s:?}"));
361 }
362 Ok(s3s::dto::Range::Int {
363 first,
364 last: Some(last),
365 })
366}
367
368fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
369 metadata
370 .as_ref()
371 .and_then(|m| m.get(META_MULTIPART))
372 .map(|v| v == "true")
373 .unwrap_or(false)
374}
375
// Object-metadata keys this service reads and writes.

/// Codec name; written by `write_manifest`, parsed by `extract_manifest`.
const META_CODEC: &str = "s4-codec";
/// Uncompressed size in bytes, stored as a decimal string.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Compressed size in bytes, stored as a decimal string.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// CRC32C value from the manifest, stored as a decimal string.
const META_CRC32C: &str = "s4-crc32c";
/// Set to `"true"` by `create_multipart_upload`; checked by `is_multipart_object`.
const META_MULTIPART: &str = "s4-multipart";
/// Set to `"true"` by the streaming-framed PUT path; checked by `is_framed_v2_object`.
const META_FRAMED: &str = "s4-framed";
387
388fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
389 metadata
390 .as_ref()
391 .and_then(|m| m.get(META_FRAMED))
392 .map(|v| v == "true")
393 .unwrap_or(false)
394}
395
396fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
397 let m = metadata.as_ref()?;
398 let codec = m
399 .get(META_CODEC)
400 .and_then(|s| s.parse::<CodecKind>().ok())?;
401 let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
402 let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
403 let crc32c = m.get(META_CRC32C)?.parse().ok()?;
404 Some(ChunkManifest {
405 codec,
406 original_size,
407 compressed_size,
408 crc32c,
409 })
410}
411
412fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
413 let meta = metadata.get_or_insert_with(Default::default);
414 meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
415 meta.insert(
416 META_ORIGINAL_SIZE.into(),
417 manifest.original_size.to_string(),
418 );
419 meta.insert(
420 META_COMPRESSED_SIZE.into(),
421 manifest.compressed_size.to_string(),
422 );
423 meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
424}
425
426fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
427 move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
428}
429
430pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
434 if total == 0 {
435 return Err("cannot range-get zero-length object".into());
436 }
437 match range {
438 s3s::dto::Range::Int { first, last } => {
439 let start = *first;
440 let end_inclusive = match last {
441 Some(l) => (*l).min(total - 1),
442 None => total - 1,
443 };
444 if start > end_inclusive || start >= total {
445 return Err(format!(
446 "range bytes={start}-{:?} out of object size {total}",
447 last
448 ));
449 }
450 Ok((start, end_inclusive + 1))
451 }
452 s3s::dto::Range::Suffix { length } => {
453 let len = (*length).min(total);
454 Ok((total - len, total))
455 }
456 }
457}
458
459#[async_trait::async_trait]
460impl<B: S3> S3 for S4Service<B> {
461 #[tracing::instrument(
463 name = "s4.put_object",
464 skip(self, req),
465 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
466 )]
467 async fn put_object(
468 &self,
469 mut req: S3Request<PutObjectInput>,
470 ) -> S3Result<S3Response<PutObjectOutput>> {
471 let put_start = Instant::now();
472 let put_bucket = req.input.bucket.clone();
473 let put_key = req.input.key.clone();
474 self.enforce_policy(
475 "s3:PutObject",
476 &put_bucket,
477 Some(&put_key),
478 Self::principal_of(&req),
479 )?;
480 if let Some(blob) = req.input.body.take() {
481 let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
484 .await
485 .map_err(internal("peek put sample"))?;
486 let sample_len = sample.len().min(SAMPLE_BYTES);
487 let kind = self.dispatcher.pick(&sample[..sample_len]).await;
488
489 let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
495 let (compressed, manifest, is_framed) = if use_framed {
496 let chained = chain_sample_with_rest(sample, rest_stream);
498 debug!(
499 bucket = ?req.input.bucket,
500 key = ?req.input.key,
501 codec = kind.as_str(),
502 path = "streaming-framed",
503 "S4 put_object: compressing (streaming, S4F2 multi-frame)"
504 );
505 let (body, manifest) = streaming_compress_to_frames(
506 chained,
507 Arc::clone(&self.registry),
508 kind,
509 DEFAULT_S4F2_CHUNK_SIZE,
510 )
511 .await
512 .map_err(internal("streaming framed compress"))?;
513 (body, manifest, true)
514 } else {
515 let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
518 .await
519 .map_err(internal("collect put body (buffered path)"))?;
520 debug!(
521 bucket = ?req.input.bucket,
522 key = ?req.input.key,
523 bytes = bytes.len(),
524 codec = kind.as_str(),
525 path = "buffered",
526 "S4 put_object: compressing (buffered, raw blob)"
527 );
528 let (body, m) = self
529 .registry
530 .compress(bytes, kind)
531 .await
532 .map_err(internal("registry compress"))?;
533 (body, m, false)
534 };
535
536 write_manifest(&mut req.input.metadata, &manifest);
537 if is_framed {
538 req.input
540 .metadata
541 .get_or_insert_with(Default::default)
542 .insert(META_FRAMED.into(), "true".into());
543 }
544 req.input.content_length = Some(compressed.len() as i64);
548 req.input.checksum_algorithm = None;
553 req.input.checksum_crc32 = None;
554 req.input.checksum_crc32c = None;
555 req.input.checksum_crc64nvme = None;
556 req.input.checksum_sha1 = None;
557 req.input.checksum_sha256 = None;
558 req.input.content_md5 = None;
559 let original_size = manifest.original_size;
560 let compressed_size = manifest.compressed_size;
561 let codec_label = manifest.codec.as_str();
562 let sidecar_index = if is_framed {
565 s4_codec::index::build_index_from_body(&compressed).ok()
566 } else {
567 None
568 };
569 req.input.body = Some(bytes_to_blob(compressed));
570 let backend_resp = self.backend.put_object(req).await;
571 if let Some(idx) = sidecar_index
572 && backend_resp.is_ok()
573 && idx.entries.len() > 1
574 {
575 self.write_sidecar(&put_bucket, &put_key, &idx).await;
578 }
579 let _ = (original_size, compressed_size); let elapsed = put_start.elapsed();
581 crate::metrics::record_put(
582 codec_label,
583 original_size,
584 compressed_size,
585 elapsed.as_secs_f64(),
586 backend_resp.is_ok(),
587 );
588 info!(
589 op = "put_object",
590 bucket = %put_bucket,
591 key = %put_key,
592 codec = codec_label,
593 bytes_in = original_size,
594 bytes_out = compressed_size,
595 ratio = format!(
596 "{:.3}",
597 if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
598 ),
599 latency_ms = elapsed.as_millis() as u64,
600 ok = backend_resp.is_ok(),
601 "S4 put completed"
602 );
603 return backend_resp;
604 }
605 self.backend.put_object(req).await
606 }
607
    /// Fetches an object from the backend and returns its decompressed body.
    ///
    /// Paths, in the order they are tried:
    /// 1. **Sidecar partial** - for a Range request with a readable sidecar
    ///    index whose lookup yields a plan, only the needed compressed span is
    ///    fetched (`partial_range_get`).
    /// 2. **As-is** - objects with neither a frame marker nor a parseable
    ///    manifest are returned untouched.
    /// 3. **Streaming** - non-range, non-framed `CpuZstd` objects are
    ///    decompressed as a stream; non-range passthrough objects are handed
    ///    back without buffering.
    /// 4. **Buffered** - everything else is collected (up to
    ///    `max_body_bytes`), decompressed, and sliced if a range was requested.
    ///
    /// # Errors
    ///
    /// `AccessDenied` from the policy check, `InvalidRange` when the range
    /// cannot be resolved against the object size, `InternalError` for
    /// collect / parse / decompress failures, and any backend error.
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_policy(
            "s3:GetObject",
            &get_bucket,
            Some(&get_key),
            Self::principal_of(&req),
        )?;
        // The client's range is removed from the request, so the full-read
        // paths below forward a request without a range to the backend.
        let range_request = req.input.range.take();

        // Path 1: Range request + sidecar index -> fetch only the frames needed.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            // When the index yields no plan, fall through to the full read.
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        // Multipart and framed objects are both decoded frame by frame.
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        // Path 2: no frame marker and no parseable manifest -> return untouched.
        if !needs_frame_parse && manifest_opt.is_none() {
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // Path 3a: streaming zstd decompression (no range, single blob).
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                // Backend checksums / ETag describe the stored bytes, not the
                // decompressed body returned here.
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                // Recorded before the body is consumed, hence "setup" latency.
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Path 3b: passthrough objects are handed back without buffering.
            // NOTE(review): unlike the other paths, this one records no GET
            // metric — confirm that is intentional.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // Path 4: buffer the whole stored body, then decompress.
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                self.decompress_multipart(bytes).await?
            } else {
                // Invariant: the as-is early return above already handled
                // "no frame marker and no manifest".
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            let total_size = decompressed.len() as u64;
            // Apply the client's range to the decompressed bytes, if any.
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            resp.output.content_length = Some(final_bytes.len() as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            // Objects without a full manifest are labelled "multipart".
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            // The second argument is passed as 0 on this path.
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        // Reached after the buffered path, or when the backend sent no body.
        Ok(resp)
    }
795
    /// Forwards the request to the backend unchanged.
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
821 async fn head_object(
822 &self,
823 req: S3Request<HeadObjectInput>,
824 ) -> S3Result<S3Response<HeadObjectOutput>> {
825 let mut resp = self.backend.head_object(req).await?;
826 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
827 resp.output.content_length = Some(manifest.original_size as i64);
831 resp.output.checksum_crc32 = None;
832 resp.output.checksum_crc32c = None;
833 resp.output.checksum_crc64nvme = None;
834 resp.output.checksum_sha1 = None;
835 resp.output.checksum_sha256 = None;
836 resp.output.e_tag = None;
837 }
838 Ok(resp)
839 }
840 async fn delete_object(
841 &self,
842 req: S3Request<DeleteObjectInput>,
843 ) -> S3Result<S3Response<DeleteObjectOutput>> {
844 let bucket = req.input.bucket.clone();
845 let key = req.input.key.clone();
846 self.enforce_policy(
847 "s3:DeleteObject",
848 &bucket,
849 Some(&key),
850 Self::principal_of(&req),
851 )?;
852 let resp = self.backend.delete_object(req).await?;
854 let sidecar_input = DeleteObjectInput {
855 bucket: bucket.clone(),
856 key: sidecar_key(&key),
857 ..Default::default()
858 };
859 let sidecar_req = S3Request {
860 input: sidecar_input,
861 method: http::Method::DELETE,
862 uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
863 headers: http::HeaderMap::new(),
864 extensions: http::Extensions::new(),
865 credentials: None,
866 region: None,
867 service: None,
868 trailing_headers: None,
869 };
870 let _ = self.backend.delete_object(sidecar_req).await;
871 Ok(resp)
872 }
    /// Forwards the batch delete to the backend unchanged.
    // NOTE(review): unlike `delete_object`, this neither runs `enforce_policy`
    // nor deletes sidecar indexes for the listed keys — confirm intended.
    async fn delete_objects(
        &self,
        req: S3Request<DeleteObjectsInput>,
    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
        self.backend.delete_objects(req).await
    }
879 async fn copy_object(
880 &self,
881 mut req: S3Request<CopyObjectInput>,
882 ) -> S3Result<S3Response<CopyObjectOutput>> {
883 let dst_bucket = req.input.bucket.clone();
885 let dst_key = req.input.key.clone();
886 self.enforce_policy(
887 "s3:PutObject",
888 &dst_bucket,
889 Some(&dst_key),
890 Self::principal_of(&req),
891 )?;
892 if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
893 self.enforce_policy("s3:GetObject", bucket, Some(key), Self::principal_of(&req))?;
894 }
895 let needs_merge = req
905 .input
906 .metadata_directive
907 .as_ref()
908 .map(|d| d.as_str() == MetadataDirective::REPLACE)
909 .unwrap_or(false);
910 if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
911 let head_input = HeadObjectInput {
912 bucket: bucket.to_string(),
913 key: key.to_string(),
914 ..Default::default()
915 };
916 let head_req = S3Request {
917 input: head_input,
918 method: req.method.clone(),
919 uri: req.uri.clone(),
920 headers: req.headers.clone(),
921 extensions: http::Extensions::new(),
922 credentials: req.credentials.clone(),
923 region: req.region.clone(),
924 service: req.service.clone(),
925 trailing_headers: None,
926 };
927 if let Ok(head) = self.backend.head_object(head_req).await
928 && let Some(src_meta) = head.output.metadata.as_ref()
929 {
930 let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
931 for key in [
932 META_CODEC,
933 META_ORIGINAL_SIZE,
934 META_COMPRESSED_SIZE,
935 META_CRC32C,
936 META_MULTIPART,
937 META_FRAMED,
938 ] {
939 if let Some(v) = src_meta.get(key) {
940 dest_meta
943 .entry(key.to_string())
944 .or_insert_with(|| v.clone());
945 }
946 }
947 debug!(
948 src_bucket = %bucket,
949 src_key = %key,
950 "S4 copy_object: preserved s4-* metadata across REPLACE directive"
951 );
952 }
953 }
954 self.backend.copy_object(req).await
955 }
956 async fn list_objects(
957 &self,
958 req: S3Request<ListObjectsInput>,
959 ) -> S3Result<S3Response<ListObjectsOutput>> {
960 self.enforce_policy(
961 "s3:ListBucket",
962 &req.input.bucket,
963 None,
964 Self::principal_of(&req),
965 )?;
966 let mut resp = self.backend.list_objects(req).await?;
967 if let Some(contents) = resp.output.contents.as_mut() {
969 contents.retain(|o| {
970 o.key
971 .as_ref()
972 .map(|k| !k.ends_with(".s4index"))
973 .unwrap_or(true)
974 });
975 }
976 Ok(resp)
977 }
978 async fn list_objects_v2(
979 &self,
980 req: S3Request<ListObjectsV2Input>,
981 ) -> S3Result<S3Response<ListObjectsV2Output>> {
982 self.enforce_policy(
983 "s3:ListBucket",
984 &req.input.bucket,
985 None,
986 Self::principal_of(&req),
987 )?;
988 let mut resp = self.backend.list_objects_v2(req).await?;
989 if let Some(contents) = resp.output.contents.as_mut() {
990 let before = contents.len();
991 contents.retain(|o| {
992 o.key
993 .as_ref()
994 .map(|k| !k.ends_with(".s4index"))
995 .unwrap_or(true)
996 });
997 if let Some(kc) = resp.output.key_count.as_mut() {
999 *kc -= (before - contents.len()) as i32;
1000 }
1001 }
1002 Ok(resp)
1003 }
1004 async fn create_multipart_upload(
1005 &self,
1006 mut req: S3Request<CreateMultipartUploadInput>,
1007 ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
1008 let codec_kind = self.registry.default_kind();
1012 let meta = req.input.metadata.get_or_insert_with(Default::default);
1013 meta.insert(META_MULTIPART.into(), "true".into());
1014 meta.insert(META_CODEC.into(), codec_kind.as_str().into());
1015 debug!(
1016 bucket = ?req.input.bucket,
1017 key = ?req.input.key,
1018 codec = codec_kind.as_str(),
1019 "S4 create_multipart_upload: marking object for per-part compression"
1020 );
1021 self.backend.create_multipart_upload(req).await
1022 }
1023
1024 async fn upload_part(
1025 &self,
1026 mut req: S3Request<UploadPartInput>,
1027 ) -> S3Result<S3Response<UploadPartOutput>> {
1028 if let Some(blob) = req.input.body.take() {
1034 let bytes = collect_blob(blob, self.max_body_bytes)
1035 .await
1036 .map_err(internal("collect upload_part body"))?;
1037 let sample_len = bytes.len().min(SAMPLE_BYTES);
1038 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
1039 let original_size = bytes.len() as u64;
1040 let (compressed, manifest) = self
1041 .registry
1042 .compress(bytes, codec_kind)
1043 .await
1044 .map_err(internal("registry compress part"))?;
1045 let header = FrameHeader {
1046 codec: codec_kind,
1047 original_size,
1048 compressed_size: compressed.len() as u64,
1049 crc32c: manifest.crc32c,
1050 };
1051 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
1052 write_frame(&mut framed, header, &compressed);
1053 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
1067 if !likely_final {
1068 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
1069 }
1070 let framed_bytes = framed.freeze();
1071 let new_len = framed_bytes.len() as i64;
1072 req.input.content_length = Some(new_len);
1074 req.input.checksum_algorithm = None;
1075 req.input.checksum_crc32 = None;
1076 req.input.checksum_crc32c = None;
1077 req.input.checksum_crc64nvme = None;
1078 req.input.checksum_sha1 = None;
1079 req.input.checksum_sha256 = None;
1080 req.input.content_md5 = None;
1081 req.input.body = Some(bytes_to_blob(framed_bytes));
1082 debug!(
1083 part_number = ?req.input.part_number,
1084 upload_id = ?req.input.upload_id,
1085 original_size,
1086 framed_size = new_len,
1087 "S4 upload_part: framed compressed payload"
1088 );
1089 }
1090 self.backend.upload_part(req).await
1091 }
1092 async fn complete_multipart_upload(
1093 &self,
1094 req: S3Request<CompleteMultipartUploadInput>,
1095 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
1096 let bucket = req.input.bucket.clone();
1097 let key = req.input.key.clone();
1098 let resp = self.backend.complete_multipart_upload(req).await?;
1099 let bucket_clone = bucket.clone();
1105 let key_clone = key.clone();
1106 let get_input = GetObjectInput {
1107 bucket: bucket_clone.clone(),
1108 key: key_clone.clone(),
1109 ..Default::default()
1110 };
1111 let get_req = S3Request {
1112 input: get_input,
1113 method: http::Method::GET,
1114 uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
1115 headers: http::HeaderMap::new(),
1116 extensions: http::Extensions::new(),
1117 credentials: None,
1118 region: None,
1119 service: None,
1120 trailing_headers: None,
1121 };
1122 if let Ok(get_resp) = self.backend.get_object(get_req).await
1123 && let Some(blob) = get_resp.output.body
1124 && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
1125 && let Ok(index) = build_index_from_body(&body)
1126 {
1127 self.write_sidecar(&bucket, &key, &index).await;
1128 }
1129 Ok(resp)
1130 }
    /// Forwards the request to the backend unchanged.
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    /// Forwards the request to the backend unchanged.
    // NOTE(review): part sizes reported by the backend are presumably those of
    // the framed/compressed parts that `upload_part` stored — confirm.
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }
1149
    /// Forwards the request to the backend unchanged.
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn get_object_tagging(
        &self,
        req: S3Request<GetObjectTaggingInput>,
    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
        self.backend.get_object_tagging(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn put_object_tagging(
        &self,
        req: S3Request<PutObjectTaggingInput>,
    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
        self.backend.put_object_tagging(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn delete_object_tagging(
        &self,
        req: S3Request<DeleteObjectTaggingInput>,
    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
        self.backend.delete_object_tagging(req).await
    }
    /// Forwards the request to the backend unchanged.
    // NOTE(review): unlike `head_object`, the response is not rewritten, so any
    // size it reports is presumably that of the stored object — confirm.
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    /// Forwards the request to the backend unchanged.
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
1207 async fn upload_part_copy(
1208 &self,
1209 req: S3Request<UploadPartCopyInput>,
1210 ) -> S3Result<S3Response<UploadPartCopyOutput>> {
1211 let CopySource::Bucket {
1222 bucket: src_bucket,
1223 key: src_key,
1224 ..
1225 } = &req.input.copy_source
1226 else {
1227 return self.backend.upload_part_copy(req).await;
1228 };
1229 let src_bucket = src_bucket.to_string();
1230 let src_key = src_key.to_string();
1231
1232 let head_input = HeadObjectInput {
1234 bucket: src_bucket.clone(),
1235 key: src_key.clone(),
1236 ..Default::default()
1237 };
1238 let head_req = S3Request {
1239 input: head_input,
1240 method: http::Method::HEAD,
1241 uri: req.uri.clone(),
1242 headers: req.headers.clone(),
1243 extensions: http::Extensions::new(),
1244 credentials: req.credentials.clone(),
1245 region: req.region.clone(),
1246 service: req.service.clone(),
1247 trailing_headers: None,
1248 };
1249 let needs_s4_copy = match self.backend.head_object(head_req).await {
1250 Ok(h) => {
1251 is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
1252 }
1253 Err(_) => false,
1254 };
1255 if !needs_s4_copy {
1256 return self.backend.upload_part_copy(req).await;
1257 }
1258
1259 let source_range = req
1261 .input
1262 .copy_source_range
1263 .as_ref()
1264 .map(|r| parse_copy_source_range(r))
1265 .transpose()
1266 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
1267
1268 let mut get_input = GetObjectInput {
1272 bucket: src_bucket.clone(),
1273 key: src_key.clone(),
1274 ..Default::default()
1275 };
1276 get_input.range = source_range;
1277 let get_req = S3Request {
1278 input: get_input,
1279 method: http::Method::GET,
1280 uri: req.uri.clone(),
1281 headers: req.headers.clone(),
1282 extensions: http::Extensions::new(),
1283 credentials: req.credentials.clone(),
1284 region: req.region.clone(),
1285 service: req.service.clone(),
1286 trailing_headers: None,
1287 };
1288 let get_resp = self.get_object(get_req).await?;
1289 let blob = get_resp.output.body.ok_or_else(|| {
1290 S3Error::with_message(
1291 S3ErrorCode::InternalError,
1292 "upload_part_copy: empty body from source GET",
1293 )
1294 })?;
1295 let bytes = collect_blob(blob, self.max_body_bytes)
1296 .await
1297 .map_err(internal("collect upload_part_copy source body"))?;
1298
1299 let sample_len = bytes.len().min(SAMPLE_BYTES);
1301 let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
1302 let original_size = bytes.len() as u64;
1303 let (compressed, manifest) = self
1304 .registry
1305 .compress(bytes, codec_kind)
1306 .await
1307 .map_err(internal("registry compress upload_part_copy"))?;
1308 let header = FrameHeader {
1309 codec: codec_kind,
1310 original_size,
1311 compressed_size: compressed.len() as u64,
1312 crc32c: manifest.crc32c,
1313 };
1314 let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
1315 write_frame(&mut framed, header, &compressed);
1316 let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
1317 if !likely_final {
1318 pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
1319 }
1320 let framed_bytes = framed.freeze();
1321 let framed_len = framed_bytes.len() as i64;
1322
1323 let part_input = UploadPartInput {
1325 bucket: req.input.bucket.clone(),
1326 key: req.input.key.clone(),
1327 part_number: req.input.part_number,
1328 upload_id: req.input.upload_id.clone(),
1329 body: Some(bytes_to_blob(framed_bytes)),
1330 content_length: Some(framed_len),
1331 ..Default::default()
1332 };
1333 let part_req = S3Request {
1334 input: part_input,
1335 method: http::Method::PUT,
1336 uri: req.uri.clone(),
1337 headers: req.headers.clone(),
1338 extensions: http::Extensions::new(),
1339 credentials: req.credentials.clone(),
1340 region: req.region.clone(),
1341 service: req.service.clone(),
1342 trailing_headers: None,
1343 };
1344 let upload_resp = self.backend.upload_part(part_req).await?;
1345
1346 let copy_output = UploadPartCopyOutput {
1347 copy_part_result: Some(CopyPartResult {
1348 e_tag: upload_resp.output.e_tag.clone(),
1349 ..Default::default()
1350 }),
1351 ..Default::default()
1352 };
1353 Ok(S3Response::new(copy_output))
1354 }
1355
1356 async fn get_object_lock_configuration(
1358 &self,
1359 req: S3Request<GetObjectLockConfigurationInput>,
1360 ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
1361 self.backend.get_object_lock_configuration(req).await
1362 }
1363 async fn put_object_lock_configuration(
1364 &self,
1365 req: S3Request<PutObjectLockConfigurationInput>,
1366 ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
1367 self.backend.put_object_lock_configuration(req).await
1368 }
1369 async fn get_object_legal_hold(
1370 &self,
1371 req: S3Request<GetObjectLegalHoldInput>,
1372 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
1373 self.backend.get_object_legal_hold(req).await
1374 }
1375 async fn put_object_legal_hold(
1376 &self,
1377 req: S3Request<PutObjectLegalHoldInput>,
1378 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
1379 self.backend.put_object_legal_hold(req).await
1380 }
1381 async fn get_object_retention(
1382 &self,
1383 req: S3Request<GetObjectRetentionInput>,
1384 ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
1385 self.backend.get_object_retention(req).await
1386 }
1387 async fn put_object_retention(
1388 &self,
1389 req: S3Request<PutObjectRetentionInput>,
1390 ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
1391 self.backend.put_object_retention(req).await
1392 }
1393
1394 async fn list_object_versions(
1396 &self,
1397 req: S3Request<ListObjectVersionsInput>,
1398 ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
1399 self.backend.list_object_versions(req).await
1400 }
1401 async fn get_bucket_versioning(
1402 &self,
1403 req: S3Request<GetBucketVersioningInput>,
1404 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
1405 self.backend.get_bucket_versioning(req).await
1406 }
1407 async fn put_bucket_versioning(
1408 &self,
1409 req: S3Request<PutBucketVersioningInput>,
1410 ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
1411 self.backend.put_bucket_versioning(req).await
1412 }
1413
1414 async fn get_bucket_location(
1416 &self,
1417 req: S3Request<GetBucketLocationInput>,
1418 ) -> S3Result<S3Response<GetBucketLocationOutput>> {
1419 self.backend.get_bucket_location(req).await
1420 }
1421
1422 async fn get_bucket_policy(
1424 &self,
1425 req: S3Request<GetBucketPolicyInput>,
1426 ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
1427 self.backend.get_bucket_policy(req).await
1428 }
1429 async fn put_bucket_policy(
1430 &self,
1431 req: S3Request<PutBucketPolicyInput>,
1432 ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
1433 self.backend.put_bucket_policy(req).await
1434 }
1435 async fn delete_bucket_policy(
1436 &self,
1437 req: S3Request<DeleteBucketPolicyInput>,
1438 ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
1439 self.backend.delete_bucket_policy(req).await
1440 }
1441 async fn get_bucket_policy_status(
1442 &self,
1443 req: S3Request<GetBucketPolicyStatusInput>,
1444 ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
1445 self.backend.get_bucket_policy_status(req).await
1446 }
1447
1448 async fn get_bucket_acl(
1450 &self,
1451 req: S3Request<GetBucketAclInput>,
1452 ) -> S3Result<S3Response<GetBucketAclOutput>> {
1453 self.backend.get_bucket_acl(req).await
1454 }
1455 async fn put_bucket_acl(
1456 &self,
1457 req: S3Request<PutBucketAclInput>,
1458 ) -> S3Result<S3Response<PutBucketAclOutput>> {
1459 self.backend.put_bucket_acl(req).await
1460 }
1461
1462 async fn get_bucket_cors(
1464 &self,
1465 req: S3Request<GetBucketCorsInput>,
1466 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
1467 self.backend.get_bucket_cors(req).await
1468 }
1469 async fn put_bucket_cors(
1470 &self,
1471 req: S3Request<PutBucketCorsInput>,
1472 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
1473 self.backend.put_bucket_cors(req).await
1474 }
1475 async fn delete_bucket_cors(
1476 &self,
1477 req: S3Request<DeleteBucketCorsInput>,
1478 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
1479 self.backend.delete_bucket_cors(req).await
1480 }
1481
1482 async fn get_bucket_lifecycle_configuration(
1484 &self,
1485 req: S3Request<GetBucketLifecycleConfigurationInput>,
1486 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
1487 self.backend.get_bucket_lifecycle_configuration(req).await
1488 }
1489 async fn put_bucket_lifecycle_configuration(
1490 &self,
1491 req: S3Request<PutBucketLifecycleConfigurationInput>,
1492 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
1493 self.backend.put_bucket_lifecycle_configuration(req).await
1494 }
1495 async fn delete_bucket_lifecycle(
1496 &self,
1497 req: S3Request<DeleteBucketLifecycleInput>,
1498 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
1499 self.backend.delete_bucket_lifecycle(req).await
1500 }
1501
1502 async fn get_bucket_tagging(
1504 &self,
1505 req: S3Request<GetBucketTaggingInput>,
1506 ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
1507 self.backend.get_bucket_tagging(req).await
1508 }
1509 async fn put_bucket_tagging(
1510 &self,
1511 req: S3Request<PutBucketTaggingInput>,
1512 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
1513 self.backend.put_bucket_tagging(req).await
1514 }
1515 async fn delete_bucket_tagging(
1516 &self,
1517 req: S3Request<DeleteBucketTaggingInput>,
1518 ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
1519 self.backend.delete_bucket_tagging(req).await
1520 }
1521
1522 async fn get_bucket_encryption(
1524 &self,
1525 req: S3Request<GetBucketEncryptionInput>,
1526 ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
1527 self.backend.get_bucket_encryption(req).await
1528 }
1529 async fn put_bucket_encryption(
1530 &self,
1531 req: S3Request<PutBucketEncryptionInput>,
1532 ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
1533 self.backend.put_bucket_encryption(req).await
1534 }
1535 async fn delete_bucket_encryption(
1536 &self,
1537 req: S3Request<DeleteBucketEncryptionInput>,
1538 ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
1539 self.backend.delete_bucket_encryption(req).await
1540 }
1541
1542 async fn get_bucket_logging(
1544 &self,
1545 req: S3Request<GetBucketLoggingInput>,
1546 ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
1547 self.backend.get_bucket_logging(req).await
1548 }
1549 async fn put_bucket_logging(
1550 &self,
1551 req: S3Request<PutBucketLoggingInput>,
1552 ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
1553 self.backend.put_bucket_logging(req).await
1554 }
1555
1556 async fn get_bucket_notification_configuration(
1558 &self,
1559 req: S3Request<GetBucketNotificationConfigurationInput>,
1560 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
1561 self.backend
1562 .get_bucket_notification_configuration(req)
1563 .await
1564 }
1565 async fn put_bucket_notification_configuration(
1566 &self,
1567 req: S3Request<PutBucketNotificationConfigurationInput>,
1568 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
1569 self.backend
1570 .put_bucket_notification_configuration(req)
1571 .await
1572 }
1573
1574 async fn get_bucket_request_payment(
1576 &self,
1577 req: S3Request<GetBucketRequestPaymentInput>,
1578 ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
1579 self.backend.get_bucket_request_payment(req).await
1580 }
1581 async fn put_bucket_request_payment(
1582 &self,
1583 req: S3Request<PutBucketRequestPaymentInput>,
1584 ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
1585 self.backend.put_bucket_request_payment(req).await
1586 }
1587
1588 async fn get_bucket_website(
1590 &self,
1591 req: S3Request<GetBucketWebsiteInput>,
1592 ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
1593 self.backend.get_bucket_website(req).await
1594 }
1595 async fn put_bucket_website(
1596 &self,
1597 req: S3Request<PutBucketWebsiteInput>,
1598 ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
1599 self.backend.put_bucket_website(req).await
1600 }
1601 async fn delete_bucket_website(
1602 &self,
1603 req: S3Request<DeleteBucketWebsiteInput>,
1604 ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
1605 self.backend.delete_bucket_website(req).await
1606 }
1607
1608 async fn get_bucket_replication(
1610 &self,
1611 req: S3Request<GetBucketReplicationInput>,
1612 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
1613 self.backend.get_bucket_replication(req).await
1614 }
1615 async fn put_bucket_replication(
1616 &self,
1617 req: S3Request<PutBucketReplicationInput>,
1618 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
1619 self.backend.put_bucket_replication(req).await
1620 }
1621 async fn delete_bucket_replication(
1622 &self,
1623 req: S3Request<DeleteBucketReplicationInput>,
1624 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
1625 self.backend.delete_bucket_replication(req).await
1626 }
1627
1628 async fn get_bucket_accelerate_configuration(
1630 &self,
1631 req: S3Request<GetBucketAccelerateConfigurationInput>,
1632 ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
1633 self.backend.get_bucket_accelerate_configuration(req).await
1634 }
1635 async fn put_bucket_accelerate_configuration(
1636 &self,
1637 req: S3Request<PutBucketAccelerateConfigurationInput>,
1638 ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
1639 self.backend.put_bucket_accelerate_configuration(req).await
1640 }
1641
1642 async fn get_bucket_ownership_controls(
1644 &self,
1645 req: S3Request<GetBucketOwnershipControlsInput>,
1646 ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
1647 self.backend.get_bucket_ownership_controls(req).await
1648 }
1649 async fn put_bucket_ownership_controls(
1650 &self,
1651 req: S3Request<PutBucketOwnershipControlsInput>,
1652 ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
1653 self.backend.put_bucket_ownership_controls(req).await
1654 }
1655 async fn delete_bucket_ownership_controls(
1656 &self,
1657 req: S3Request<DeleteBucketOwnershipControlsInput>,
1658 ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
1659 self.backend.delete_bucket_ownership_controls(req).await
1660 }
1661
1662 async fn get_public_access_block(
1664 &self,
1665 req: S3Request<GetPublicAccessBlockInput>,
1666 ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
1667 self.backend.get_public_access_block(req).await
1668 }
1669 async fn put_public_access_block(
1670 &self,
1671 req: S3Request<PutPublicAccessBlockInput>,
1672 ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
1673 self.backend.put_public_access_block(req).await
1674 }
1675 async fn delete_public_access_block(
1676 &self,
1677 req: S3Request<DeletePublicAccessBlockInput>,
1678 ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
1679 self.backend.delete_public_access_block(req).await
1680 }
1681}
1682
#[cfg(test)]
mod tests {
    use super::*;

    /// A manifest written into empty metadata is read back with every field intact.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let written = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut metadata: Option<Metadata> = None;
        write_manifest(&mut metadata, &written);

        let recovered = extract_manifest(&metadata).expect("manifest must round-trip");
        assert_eq!(recovered.codec, written.codec);
        assert_eq!(recovered.original_size, written.original_size);
        assert_eq!(recovered.compressed_size, written.compressed_size);
        assert_eq!(recovered.crc32c, written.crc32c);
    }

    /// With no metadata at all there is no manifest to extract.
    #[test]
    fn missing_metadata_yields_none() {
        let absent: Option<Metadata> = None;
        assert!(extract_manifest(&absent).is_none());
    }

    /// Metadata carrying only the codec entry does not produce a manifest.
    #[test]
    fn partial_metadata_yields_none() {
        let mut codec_only = Metadata::new();
        codec_only.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(codec_only)).is_none());
    }

    /// A closed `bytes=first-last` range parses into an `Int` range with both bounds.
    #[test]
    fn parse_copy_source_range_basic() {
        let parsed = parse_copy_source_range("bytes=10-20").unwrap();
        let s3s::dto::Range::Int { first, last } = parsed else {
            panic!("expected Int range");
        };
        assert_eq!(first, 10);
        assert_eq!(last, Some(20));
    }

    /// A range whose end precedes its start is rejected with a descriptive message.
    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let message = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(message.contains("last < first"));
    }

    /// A range without the `bytes=` prefix is rejected with a descriptive message.
    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let message = parse_copy_source_range("10-20").unwrap_err();
        assert!(message.contains("must start with 'bytes='"));
    }

    /// Ranges missing either bound are rejected.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        for header in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(header).is_err());
        }
    }
}