1use std::sync::Arc;
32
33use bytes::BytesMut;
34use s3s::dto::*;
35use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
36use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
37use s4_codec::multipart::{
38 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
39 write_frame,
40};
41use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
42use std::time::Instant;
43use tracing::{debug, info};
44
45use crate::blob::{
46 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
47};
48use crate::streaming::{
49 DEFAULT_S4F2_CHUNK_SIZE, cpu_zstd_decompress_stream, streaming_compress_to_frames,
50 supports_streaming_compress, supports_streaming_decompress,
51};
52
/// Number of leading bytes sampled from an incoming body so the dispatcher
/// can pick a codec without buffering the whole payload.
const SAMPLE_BYTES: usize = 4096;
55
/// S3-compatible proxy layer that transparently compresses objects on PUT and
/// decompresses them on GET, delegating actual storage to the wrapped backend.
pub struct S4Service<B: S3> {
    // Underlying S3 implementation that stores the (compressed) bytes.
    backend: B,
    // Codec implementations used for compression/decompression.
    registry: Arc<CodecRegistry>,
    // Picks a codec for a payload based on a leading sample.
    dispatcher: Arc<dyn CodecDispatcher>,
    // Upper bound for fully-buffered request/response bodies.
    max_body_bytes: usize,
    // Optional access policy; `None` means all requests are allowed.
    policy: Option<crate::policy::SharedPolicy>,
    // Whether requests arrive over TLS (fed into the policy evaluation context).
    secure_transport: bool,
}
68
69impl<B: S3> S4Service<B> {
    /// Default cap on fully-buffered bodies: 5 GiB.
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

    /// Create a service wrapping `backend` with no policy, the default body
    /// size limit, and `secure_transport` off. Use the `with_*` builders to
    /// customize.
    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend,
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
            secure_transport: false,
        }
    }
87
    /// Builder: record whether requests arrive over TLS; surfaced to policy
    /// evaluation via the request context.
    #[must_use]
    pub fn with_secure_transport(mut self, on: bool) -> Self {
        self.secure_transport = on;
        self
    }
96
    /// Builder: override the maximum number of bytes buffered in memory for a
    /// single request/response body.
    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }
102
    /// Builder: install an access policy. Without one, every request is
    /// allowed.
    #[must_use]
    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
        self.policy = Some(policy);
        self
    }
112
113 fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
116 req.credentials.as_ref().map(|c| c.access_key.as_str())
117 }
118
119 fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
126 let user_agent = req
127 .headers
128 .get("user-agent")
129 .and_then(|v| v.to_str().ok())
130 .map(str::to_owned);
131 let source_ip = req
134 .headers
135 .get("x-forwarded-for")
136 .and_then(|v| v.to_str().ok())
137 .and_then(|raw| raw.split(',').next())
138 .and_then(|s| s.trim().parse().ok());
139 crate::policy::RequestContext {
140 source_ip,
141 user_agent,
142 request_time: Some(std::time::SystemTime::now()),
143 secure_transport: self.secure_transport,
144 extra: Default::default(),
145 }
146 }
147
148 fn enforce_policy<I>(
153 &self,
154 req: &S3Request<I>,
155 action: &'static str,
156 bucket: &str,
157 key: Option<&str>,
158 ) -> S3Result<()> {
159 let Some(policy) = self.policy.as_ref() else {
160 return Ok(());
161 };
162 let principal_id = Self::principal_of(req);
163 let ctx = self.request_context(req);
164 let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
165 if decision.allow {
166 Ok(())
167 } else {
168 crate::metrics::record_policy_denial(action, bucket);
169 tracing::info!(
170 action,
171 bucket,
172 key = ?key,
173 principal = ?principal_id,
174 source_ip = ?ctx.source_ip,
175 user_agent = ?ctx.user_agent,
176 secure_transport = ctx.secure_transport,
177 matched_sid = ?decision.matched_sid,
178 effect = ?decision.matched_effect,
179 "S4 policy denied request"
180 );
181 Err(S3Error::with_message(
182 S3ErrorCode::AccessDenied,
183 format!("denied by S4 policy: {action} on bucket={bucket}"),
184 ))
185 }
186 }
187
    /// Consume the service and return the wrapped backend.
    pub fn into_backend(self) -> B {
        self.backend
    }
192
    /// Serve a client Range GET using a sidecar-derived `plan`: fetch only the
    /// compressed frames covering the requested span, decompress them, and
    /// slice out the exact client byte range.
    ///
    /// `client_start`/`client_end_exclusive` are original-payload offsets;
    /// `total_original` is the full decompressed object size (used for the
    /// `Content-Range` header).
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Ask the backend only for the frame-aligned compressed byte span.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Decompress every fetched frame and concatenate the plaintext.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        // Trim the frame-alignment slack down to the exact client range.
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        // Stored-representation checksums/ETag no longer describe the bytes
        // we return, so drop them.
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
301
302 async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
306 let bytes = encode_index(index);
307 let len = bytes.len() as i64;
308 let put_input = PutObjectInput {
309 bucket: bucket.into(),
310 key: sidecar_key(key),
311 body: Some(bytes_to_blob(bytes)),
312 content_length: Some(len),
313 content_type: Some("application/x-s4-index".into()),
314 ..Default::default()
315 };
316 let put_req = S3Request {
317 input: put_input,
318 method: http::Method::PUT,
319 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
320 headers: http::HeaderMap::new(),
321 extensions: http::Extensions::new(),
322 credentials: None,
323 region: None,
324 service: None,
325 trailing_headers: None,
326 };
327 if let Err(e) = self.backend.put_object(put_req).await {
328 tracing::warn!(
329 bucket,
330 key,
331 "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
332 );
333 }
334 }
335
336 async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
338 let get_input = GetObjectInput {
339 bucket: bucket.into(),
340 key: sidecar_key(key),
341 ..Default::default()
342 };
343 let get_req = S3Request {
344 input: get_input,
345 method: http::Method::GET,
346 uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
347 headers: http::HeaderMap::new(),
348 extensions: http::Extensions::new(),
349 credentials: None,
350 region: None,
351 service: None,
352 trailing_headers: None,
353 };
354 let resp = self.backend.get_object(get_req).await.ok()?;
355 let blob = resp.output.body?;
356 let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
357 decode_index(bytes).ok()
358 }
359
    /// Decode an S4-framed body (multipart per-part frames or S4F2
    /// multi-frame): walk every header+payload frame in order, decompress
    /// each one via the registry, and concatenate the plaintext.
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            // Rebuild the per-chunk manifest from the frame header so the
            // registry knows codec, sizes, and checksum.
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
389}
390
391fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
396 let rest = s
397 .strip_prefix("bytes=")
398 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
399 let (a, b) = rest
400 .split_once('-')
401 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
402 let first: u64 = a
403 .parse()
404 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
405 let last: u64 = b
406 .parse()
407 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
408 if last < first {
409 return Err(format!("CopySourceRange last < first: {s:?}"));
410 }
411 Ok(s3s::dto::Range::Int {
412 first,
413 last: Some(last),
414 })
415}
416
417fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
418 metadata
419 .as_ref()
420 .and_then(|m| m.get(META_MULTIPART))
421 .map(|v| v == "true")
422 .unwrap_or(false)
423}
424
// User-metadata keys the proxy writes onto stored objects.
const META_CODEC: &str = "s4-codec"; // codec used to compress the object
const META_ORIGINAL_SIZE: &str = "s4-original-size"; // decompressed size (bytes)
const META_COMPRESSED_SIZE: &str = "s4-compressed-size"; // stored size (bytes)
const META_CRC32C: &str = "s4-crc32c"; // checksum recorded in the chunk manifest
const META_MULTIPART: &str = "s4-multipart"; // "true" => per-part framed multipart object
const META_FRAMED: &str = "s4-framed"; // "true" => S4F2 multi-frame body
436
437fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
438 metadata
439 .as_ref()
440 .and_then(|m| m.get(META_FRAMED))
441 .map(|v| v == "true")
442 .unwrap_or(false)
443}
444
445fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
446 let m = metadata.as_ref()?;
447 let codec = m
448 .get(META_CODEC)
449 .and_then(|s| s.parse::<CodecKind>().ok())?;
450 let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
451 let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
452 let crc32c = m.get(META_CRC32C)?.parse().ok()?;
453 Some(ChunkManifest {
454 codec,
455 original_size,
456 compressed_size,
457 crc32c,
458 })
459}
460
461fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
462 let meta = metadata.get_or_insert_with(Default::default);
463 meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
464 meta.insert(
465 META_ORIGINAL_SIZE.into(),
466 manifest.original_size.to_string(),
467 );
468 meta.insert(
469 META_COMPRESSED_SIZE.into(),
470 manifest.compressed_size.to_string(),
471 );
472 meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
473}
474
/// Build an error-mapping closure that wraps any displayable error into an
/// `InternalError` S3 error with `prefix` prepended to the message.
fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}
478
479pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
483 if total == 0 {
484 return Err("cannot range-get zero-length object".into());
485 }
486 match range {
487 s3s::dto::Range::Int { first, last } => {
488 let start = *first;
489 let end_inclusive = match last {
490 Some(l) => (*l).min(total - 1),
491 None => total - 1,
492 };
493 if start > end_inclusive || start >= total {
494 return Err(format!(
495 "range bytes={start}-{:?} out of object size {total}",
496 last
497 ));
498 }
499 Ok((start, end_inclusive + 1))
500 }
501 s3s::dto::Range::Suffix { length } => {
502 let len = (*length).min(total);
503 Ok((total - len, total))
504 }
505 }
506}
507
508#[async_trait::async_trait]
509impl<B: S3> S3 for S4Service<B> {
510 #[tracing::instrument(
512 name = "s4.put_object",
513 skip(self, req),
514 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
515 )]
516 async fn put_object(
517 &self,
518 mut req: S3Request<PutObjectInput>,
519 ) -> S3Result<S3Response<PutObjectOutput>> {
520 let put_start = Instant::now();
521 let put_bucket = req.input.bucket.clone();
522 let put_key = req.input.key.clone();
523 self.enforce_policy(&req, "s3:PutObject", &put_bucket, Some(&put_key))?;
524 if let Some(blob) = req.input.body.take() {
525 let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
528 .await
529 .map_err(internal("peek put sample"))?;
530 let sample_len = sample.len().min(SAMPLE_BYTES);
531 let kind = self.dispatcher.pick(&sample[..sample_len]).await;
532
533 let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
539 let (compressed, manifest, is_framed) = if use_framed {
540 let chained = chain_sample_with_rest(sample, rest_stream);
542 debug!(
543 bucket = ?req.input.bucket,
544 key = ?req.input.key,
545 codec = kind.as_str(),
546 path = "streaming-framed",
547 "S4 put_object: compressing (streaming, S4F2 multi-frame)"
548 );
549 let (body, manifest) = streaming_compress_to_frames(
550 chained,
551 Arc::clone(&self.registry),
552 kind,
553 DEFAULT_S4F2_CHUNK_SIZE,
554 )
555 .await
556 .map_err(internal("streaming framed compress"))?;
557 (body, manifest, true)
558 } else {
559 let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
562 .await
563 .map_err(internal("collect put body (buffered path)"))?;
564 debug!(
565 bucket = ?req.input.bucket,
566 key = ?req.input.key,
567 bytes = bytes.len(),
568 codec = kind.as_str(),
569 path = "buffered",
570 "S4 put_object: compressing (buffered, raw blob)"
571 );
572 let (body, m) = self
573 .registry
574 .compress(bytes, kind)
575 .await
576 .map_err(internal("registry compress"))?;
577 (body, m, false)
578 };
579
580 write_manifest(&mut req.input.metadata, &manifest);
581 if is_framed {
582 req.input
584 .metadata
585 .get_or_insert_with(Default::default)
586 .insert(META_FRAMED.into(), "true".into());
587 }
588 req.input.content_length = Some(compressed.len() as i64);
592 req.input.checksum_algorithm = None;
597 req.input.checksum_crc32 = None;
598 req.input.checksum_crc32c = None;
599 req.input.checksum_crc64nvme = None;
600 req.input.checksum_sha1 = None;
601 req.input.checksum_sha256 = None;
602 req.input.content_md5 = None;
603 let original_size = manifest.original_size;
604 let compressed_size = manifest.compressed_size;
605 let codec_label = manifest.codec.as_str();
606 let sidecar_index = if is_framed {
609 s4_codec::index::build_index_from_body(&compressed).ok()
610 } else {
611 None
612 };
613 req.input.body = Some(bytes_to_blob(compressed));
614 let backend_resp = self.backend.put_object(req).await;
615 if let Some(idx) = sidecar_index
616 && backend_resp.is_ok()
617 && idx.entries.len() > 1
618 {
619 self.write_sidecar(&put_bucket, &put_key, &idx).await;
622 }
623 let _ = (original_size, compressed_size); let elapsed = put_start.elapsed();
625 crate::metrics::record_put(
626 codec_label,
627 original_size,
628 compressed_size,
629 elapsed.as_secs_f64(),
630 backend_resp.is_ok(),
631 );
632 info!(
633 op = "put_object",
634 bucket = %put_bucket,
635 key = %put_key,
636 codec = codec_label,
637 bytes_in = original_size,
638 bytes_out = compressed_size,
639 ratio = format!(
640 "{:.3}",
641 if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
642 ),
643 latency_ms = elapsed.as_millis() as u64,
644 ok = backend_resp.is_ok(),
645 "S4 put completed"
646 );
647 return backend_resp;
648 }
649 self.backend.put_object(req).await
650 }
651
    /// GET: decompress transparently. Paths, in priority order:
    /// 1. sidecar-partial — Range request with a sidecar index: fetch and
    ///    decode only the covering frames;
    /// 2. streaming — whole-object CpuZstd: decompress as a stream;
    /// 3. passthrough — uncompressed object: forward the blob as-is;
    /// 4. buffered — everything else: collect, decode (frames or single
    ///    blob), then slice for any Range request.
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // The client range applies to the *decompressed* payload, so it must
        // not reach the backend; take it out of the request here.
        let range_request = req.input.range.take();

        // Path 1: partial decode via the sidecar frame index, when available.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
            // No plan for this span: fall through to the full-read paths.
        }
        let mut resp = self.backend.get_object(req).await?;
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        // Objects not written through S4 are returned untouched.
        if !needs_frame_parse && manifest_opt.is_none() {
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // Path 2: streaming zstd decompression for whole-object GETs of
            // single-blob CpuZstd bodies.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                // Stored-representation checksums/ETag no longer apply.
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Path 3: passthrough objects stream straight through.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // Path 4: buffered decode.
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            let total_size = decompressed.len() as u64;
            // Apply the client's range against the decompressed payload.
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            resp.output.content_length = Some(final_bytes.len() as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        Ok(resp)
    }
834
    /// Pass-through: bucket HEAD needs no S4 translation.
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    /// Pass-through: bucket listing needs no S4 translation.
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    /// Pass-through: bucket creation needs no S4 translation.
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    /// Pass-through: bucket deletion needs no S4 translation.
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
860 async fn head_object(
861 &self,
862 req: S3Request<HeadObjectInput>,
863 ) -> S3Result<S3Response<HeadObjectOutput>> {
864 let mut resp = self.backend.head_object(req).await?;
865 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
866 resp.output.content_length = Some(manifest.original_size as i64);
870 resp.output.checksum_crc32 = None;
871 resp.output.checksum_crc32c = None;
872 resp.output.checksum_crc64nvme = None;
873 resp.output.checksum_sha1 = None;
874 resp.output.checksum_sha256 = None;
875 resp.output.e_tag = None;
876 }
877 Ok(resp)
878 }
879 async fn delete_object(
880 &self,
881 req: S3Request<DeleteObjectInput>,
882 ) -> S3Result<S3Response<DeleteObjectOutput>> {
883 let bucket = req.input.bucket.clone();
884 let key = req.input.key.clone();
885 self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
886 let resp = self.backend.delete_object(req).await?;
888 let sidecar_input = DeleteObjectInput {
889 bucket: bucket.clone(),
890 key: sidecar_key(&key),
891 ..Default::default()
892 };
893 let sidecar_req = S3Request {
894 input: sidecar_input,
895 method: http::Method::DELETE,
896 uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
897 headers: http::HeaderMap::new(),
898 extensions: http::Extensions::new(),
899 credentials: None,
900 region: None,
901 service: None,
902 trailing_headers: None,
903 };
904 let _ = self.backend.delete_object(sidecar_req).await;
905 Ok(resp)
906 }
907 async fn delete_objects(
908 &self,
909 req: S3Request<DeleteObjectsInput>,
910 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
911 self.backend.delete_objects(req).await
912 }
    /// COPY: enforce policy on both ends, and when the client requests
    /// `MetadataDirective=REPLACE`, merge the source's `s4-*` bookkeeping
    /// metadata back in so the copy stays decodable. Client-supplied values
    /// for those keys win over the source's.
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // Writing the destination requires PutObject; reading the source
        // requires GetObject.
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        // REPLACE would otherwise drop the s4-* metadata, leaving the copied
        // object's compressed body unreadable on GET.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // `or_insert_with`: never clobber a client-provided value.
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
985 async fn list_objects(
986 &self,
987 req: S3Request<ListObjectsInput>,
988 ) -> S3Result<S3Response<ListObjectsOutput>> {
989 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
990 let mut resp = self.backend.list_objects(req).await?;
991 if let Some(contents) = resp.output.contents.as_mut() {
993 contents.retain(|o| {
994 o.key
995 .as_ref()
996 .map(|k| !k.ends_with(".s4index"))
997 .unwrap_or(true)
998 });
999 }
1000 Ok(resp)
1001 }
1002 async fn list_objects_v2(
1003 &self,
1004 req: S3Request<ListObjectsV2Input>,
1005 ) -> S3Result<S3Response<ListObjectsV2Output>> {
1006 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
1007 let mut resp = self.backend.list_objects_v2(req).await?;
1008 if let Some(contents) = resp.output.contents.as_mut() {
1009 let before = contents.len();
1010 contents.retain(|o| {
1011 o.key
1012 .as_ref()
1013 .map(|k| !k.ends_with(".s4index"))
1014 .unwrap_or(true)
1015 });
1016 if let Some(kc) = resp.output.key_count.as_mut() {
1018 *kc -= (before - contents.len()) as i32;
1019 }
1020 }
1021 Ok(resp)
1022 }
1023 async fn create_multipart_upload(
1024 &self,
1025 mut req: S3Request<CreateMultipartUploadInput>,
1026 ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
1027 let codec_kind = self.registry.default_kind();
1031 let meta = req.input.metadata.get_or_insert_with(Default::default);
1032 meta.insert(META_MULTIPART.into(), "true".into());
1033 meta.insert(META_CODEC.into(), codec_kind.as_str().into());
1034 debug!(
1035 bucket = ?req.input.bucket,
1036 key = ?req.input.key,
1037 codec = codec_kind.as_str(),
1038 "S4 create_multipart_upload: marking object for per-part compression"
1039 );
1040 self.backend.create_multipart_upload(req).await
1041 }
1042
    /// Compress each uploaded part individually and wrap it in an S4 frame so
    /// the assembled object can later be decoded frame-by-frame.
    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        if let Some(blob) = req.input.body.take() {
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
            let original_size = bytes.len() as u64;
            let (compressed, manifest) = self
                .registry
                .compress(bytes, codec_kind)
                .await
                .map_err(internal("registry compress part"))?;
            // The frame header carries codec/sizes/checksum so GET can decode
            // without consulting object metadata per part.
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            // Compression can shrink a part below the S3 minimum part size;
            // pad back up to the minimum. A part that was already below the
            // minimum *before* compression is likely the final part (which S3
            // allows to be small) and is left unpadded.
            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
            if !likely_final {
                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            }
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            // Stored bytes differ from the client's; drop client checksums.
            req.input.content_length = Some(new_len);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
1111 async fn complete_multipart_upload(
1112 &self,
1113 req: S3Request<CompleteMultipartUploadInput>,
1114 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
1115 let bucket = req.input.bucket.clone();
1116 let key = req.input.key.clone();
1117 let resp = self.backend.complete_multipart_upload(req).await?;
1118 let bucket_clone = bucket.clone();
1124 let key_clone = key.clone();
1125 let get_input = GetObjectInput {
1126 bucket: bucket_clone.clone(),
1127 key: key_clone.clone(),
1128 ..Default::default()
1129 };
1130 let get_req = S3Request {
1131 input: get_input,
1132 method: http::Method::GET,
1133 uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
1134 headers: http::HeaderMap::new(),
1135 extensions: http::Extensions::new(),
1136 credentials: None,
1137 region: None,
1138 service: None,
1139 trailing_headers: None,
1140 };
1141 if let Ok(get_resp) = self.backend.get_object(get_req).await
1142 && let Some(blob) = get_resp.output.body
1143 && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
1144 && let Ok(index) = build_index_from_body(&body)
1145 {
1146 self.write_sidecar(&bucket, &key, &index).await;
1147 }
1148 Ok(resp)
1149 }
    /// Pass-through: aborting an upload needs no S4 translation.
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    /// Pass-through: listing in-progress uploads needs no S4 translation.
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    /// Pass-through. NOTE(review): reported part sizes reflect the framed
    /// (compressed, possibly padded) bytes, not client-uploaded sizes —
    /// confirm whether clients depend on these sizes.
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }
1168
    /// Pass-through: ACL reads need no S4 translation.
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    /// Pass-through: ACL writes need no S4 translation.
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
    /// Pass-through: tag reads need no S4 translation.
    async fn get_object_tagging(
        &self,
        req: S3Request<GetObjectTaggingInput>,
    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
        self.backend.get_object_tagging(req).await
    }
    /// Pass-through: tag writes need no S4 translation.
    async fn put_object_tagging(
        &self,
        req: S3Request<PutObjectTaggingInput>,
    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
        self.backend.put_object_tagging(req).await
    }
1208 async fn delete_object_tagging(
1209 &self,
1210 req: S3Request<DeleteObjectTaggingInput>,
1211 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
1212 self.backend.delete_object_tagging(req).await
1213 }
1214 async fn get_object_attributes(
1215 &self,
1216 req: S3Request<GetObjectAttributesInput>,
1217 ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
1218 self.backend.get_object_attributes(req).await
1219 }
1220 async fn restore_object(
1221 &self,
1222 req: S3Request<RestoreObjectInput>,
1223 ) -> S3Result<S3Response<RestoreObjectOutput>> {
1224 self.backend.restore_object(req).await
1225 }
    /// UploadPartCopy with S4 awareness.
    ///
    /// If the copy source looks like an S4-compressed object (per its
    /// metadata), the bytes stored on the backend are frames, not the logical
    /// object, so a plain backend-side part copy would splice compressed
    /// frames into the destination. Instead we GET the source through our own
    /// S4-aware `get_object`, re-compress the requested slice into a single
    /// frame, and upload that as the destination part. Non-S4 sources are
    /// delegated to the backend unchanged.
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        // Only bucket/key copy sources can be probed; anything else is
        // delegated to the backend untouched.
        let CopySource::Bucket {
            bucket: src_bucket,
            key: src_key,
            ..
        } = &req.input.copy_source
        else {
            return self.backend.upload_part_copy(req).await;
        };
        let src_bucket = src_bucket.to_string();
        let src_key = src_key.to_string();

        // HEAD the source to decide whether it is an S4 object. A HEAD
        // failure deliberately falls back to the plain backend copy instead
        // of erroring — the backend will surface its own error if any.
        let head_input = HeadObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        let head_req = S3Request {
            input: head_input,
            method: http::Method::HEAD,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let needs_s4_copy = match self.backend.head_object(head_req).await {
            Ok(h) => {
                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
            }
            Err(_) => false,
        };
        if !needs_s4_copy {
            return self.backend.upload_part_copy(req).await;
        }

        // Parse x-amz-copy-source-range up front so a malformed range fails
        // with InvalidRange before any GET work happens. The range applies to
        // the logical (decompressed) bytes — see get_input.range below.
        let source_range = req
            .input
            .copy_source_range
            .as_ref()
            .map(|r| parse_copy_source_range(r))
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

        // Fetch the source via our own get_object (the S4-aware path), which
        // presumably yields decompressed logical bytes — honoring the range.
        let mut get_input = GetObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        get_input.range = source_range;
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "upload_part_copy: empty body from source GET",
            )
        })?;
        // Buffered in memory, bounded by max_body_bytes.
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect upload_part_copy source body"))?;

        // Pick a codec from the first SAMPLE_BYTES of content, then compress
        // and wrap the result in a single S4 frame (header carries original
        // size, compressed size, and the manifest's crc32c).
        let sample_len = bytes.len().min(SAMPLE_BYTES);
        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
        let original_size = bytes.len() as u64;
        let (compressed, manifest) = self
            .registry
            .compress(bytes, codec_kind)
            .await
            .map_err(internal("registry compress upload_part_copy"))?;
        let header = FrameHeader {
            codec: codec_kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: manifest.crc32c,
        };
        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
        write_frame(&mut framed, header, &compressed);
        // S3 rejects non-final multipart parts below the 5 MiB minimum. If the
        // logical slice was already >= the minimum we pad the (possibly much
        // smaller) compressed frame up to it; a slice below the minimum is
        // assumed to be the final part and left unpadded.
        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
        if !likely_final {
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
        }
        let framed_bytes = framed.freeze();
        let framed_len = framed_bytes.len() as i64;

        // Upload the frame as a regular part directly against the backend
        // (bypassing our own upload_part, which would re-frame it).
        let part_input = UploadPartInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            part_number: req.input.part_number,
            upload_id: req.input.upload_id.clone(),
            body: Some(bytes_to_blob(framed_bytes)),
            content_length: Some(framed_len),
            ..Default::default()
        };
        let part_req = S3Request {
            input: part_input,
            method: http::Method::PUT,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let upload_resp = self.backend.upload_part(part_req).await?;

        // Report the uploaded part's ETag as the copy result.
        let copy_output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: upload_resp.output.e_tag.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        Ok(S3Response::new(copy_output))
    }
1374
    /// Pass-through: read the bucket's object-lock configuration.
    async fn get_object_lock_configuration(
        &self,
        req: S3Request<GetObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
        self.backend.get_object_lock_configuration(req).await
    }
    /// Pass-through: write the bucket's object-lock configuration.
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        self.backend.put_object_lock_configuration(req).await
    }
    /// Pass-through: read an object's legal-hold status.
    async fn get_object_legal_hold(
        &self,
        req: S3Request<GetObjectLegalHoldInput>,
    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
        self.backend.get_object_legal_hold(req).await
    }
    /// Pass-through: set an object's legal-hold status.
    async fn put_object_legal_hold(
        &self,
        req: S3Request<PutObjectLegalHoldInput>,
    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
        self.backend.put_object_legal_hold(req).await
    }
    /// Pass-through: read an object's retention settings.
    async fn get_object_retention(
        &self,
        req: S3Request<GetObjectRetentionInput>,
    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
        self.backend.get_object_retention(req).await
    }
    /// Pass-through: set an object's retention settings.
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        self.backend.put_object_retention(req).await
    }
1412
    /// Pass-through: list object versions on the backend.
    async fn list_object_versions(
        &self,
        req: S3Request<ListObjectVersionsInput>,
    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
        self.backend.list_object_versions(req).await
    }
    /// Pass-through: read the bucket's versioning state.
    async fn get_bucket_versioning(
        &self,
        req: S3Request<GetBucketVersioningInput>,
    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
        self.backend.get_bucket_versioning(req).await
    }
    /// Pass-through: set the bucket's versioning state.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        self.backend.put_bucket_versioning(req).await
    }

    /// Pass-through: report the bucket's region/location.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }
1440
    /// Pass-through: read the bucket policy document.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    /// Pass-through: write the bucket policy document.
    ///
    /// NOTE(review): this is the backend's bucket policy, distinct from the
    /// service-level `self.policy` (crate::policy::SharedPolicy) — confirm no
    /// synchronization between the two is expected.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    /// Pass-through: delete the bucket policy document.
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    /// Pass-through: report whether the bucket policy makes it public.
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }

    /// Pass-through: read the bucket ACL.
    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    /// Pass-through: write the bucket ACL.
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }
1480
    /// Pass-through: read the bucket CORS configuration.
    async fn get_bucket_cors(
        &self,
        req: S3Request<GetBucketCorsInput>,
    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
        self.backend.get_bucket_cors(req).await
    }
    /// Pass-through: write the bucket CORS configuration.
    async fn put_bucket_cors(
        &self,
        req: S3Request<PutBucketCorsInput>,
    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
        self.backend.put_bucket_cors(req).await
    }
    /// Pass-through: delete the bucket CORS configuration.
    async fn delete_bucket_cors(
        &self,
        req: S3Request<DeleteBucketCorsInput>,
    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
        self.backend.delete_bucket_cors(req).await
    }

    /// Pass-through: read the bucket lifecycle configuration.
    async fn get_bucket_lifecycle_configuration(
        &self,
        req: S3Request<GetBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
        self.backend.get_bucket_lifecycle_configuration(req).await
    }
    /// Pass-through: write the bucket lifecycle configuration.
    async fn put_bucket_lifecycle_configuration(
        &self,
        req: S3Request<PutBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
        self.backend.put_bucket_lifecycle_configuration(req).await
    }
    /// Pass-through: delete the bucket lifecycle configuration.
    async fn delete_bucket_lifecycle(
        &self,
        req: S3Request<DeleteBucketLifecycleInput>,
    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
        self.backend.delete_bucket_lifecycle(req).await
    }
1520
    /// Pass-through: read the bucket tag set.
    async fn get_bucket_tagging(
        &self,
        req: S3Request<GetBucketTaggingInput>,
    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
        self.backend.get_bucket_tagging(req).await
    }
    /// Pass-through: replace the bucket tag set.
    async fn put_bucket_tagging(
        &self,
        req: S3Request<PutBucketTaggingInput>,
    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
        self.backend.put_bucket_tagging(req).await
    }
    /// Pass-through: delete the bucket tag set.
    async fn delete_bucket_tagging(
        &self,
        req: S3Request<DeleteBucketTaggingInput>,
    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
        self.backend.delete_bucket_tagging(req).await
    }

    /// Pass-through: read the bucket's server-side encryption configuration.
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    /// Pass-through: write the bucket's server-side encryption configuration.
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    /// Pass-through: delete the bucket's server-side encryption configuration.
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }
1560
    /// Pass-through: read the bucket access-logging configuration.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    /// Pass-through: write the bucket access-logging configuration.
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }

    /// Pass-through: read the bucket event-notification configuration.
    async fn get_bucket_notification_configuration(
        &self,
        req: S3Request<GetBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
        self.backend
            .get_bucket_notification_configuration(req)
            .await
    }
    /// Pass-through: write the bucket event-notification configuration.
    async fn put_bucket_notification_configuration(
        &self,
        req: S3Request<PutBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
        self.backend
            .put_bucket_notification_configuration(req)
            .await
    }

    /// Pass-through: read the bucket requester-pays configuration.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    /// Pass-through: write the bucket requester-pays configuration.
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }
1606
    /// Pass-through: read the bucket static-website configuration.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    /// Pass-through: write the bucket static-website configuration.
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    /// Pass-through: delete the bucket static-website configuration.
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }

    /// Pass-through: read the bucket replication configuration.
    async fn get_bucket_replication(
        &self,
        req: S3Request<GetBucketReplicationInput>,
    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
        self.backend.get_bucket_replication(req).await
    }
    /// Pass-through: write the bucket replication configuration.
    async fn put_bucket_replication(
        &self,
        req: S3Request<PutBucketReplicationInput>,
    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
        self.backend.put_bucket_replication(req).await
    }
    /// Pass-through: delete the bucket replication configuration.
    async fn delete_bucket_replication(
        &self,
        req: S3Request<DeleteBucketReplicationInput>,
    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
        self.backend.delete_bucket_replication(req).await
    }

    /// Pass-through: read the transfer-acceleration configuration.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    /// Pass-through: write the transfer-acceleration configuration.
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }
1660
    /// Pass-through: read the bucket ownership controls.
    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    /// Pass-through: write the bucket ownership controls.
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    /// Pass-through: delete the bucket ownership controls.
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }

    /// Pass-through: read the public-access-block configuration.
    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    /// Pass-through: write the public-access-block configuration.
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    /// Pass-through: delete the public-access-block configuration.
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
1700}
1701
#[cfg(test)]
mod tests {
    use super::*;

    /// A manifest written into object metadata must come back identical.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let source = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &source);
        let restored = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(restored.codec, source.codec);
        assert_eq!(
            (restored.original_size, restored.compressed_size, restored.crc32c),
            (source.original_size, source.compressed_size, source.crc32c),
        );
    }

    /// No metadata at all means no manifest.
    #[test]
    fn missing_metadata_yields_none() {
        assert!(extract_manifest(&None).is_none());
    }

    /// Metadata with only the codec key (missing the rest) is not a manifest.
    #[test]
    fn partial_metadata_yields_none() {
        let mut partial = Metadata::new();
        partial.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(partial)).is_none());
    }

    /// A well-formed closed range parses into an Int range.
    #[test]
    fn parse_copy_source_range_basic() {
        let parsed = parse_copy_source_range("bytes=10-20").expect("valid range");
        let s3s::dto::Range::Int { first, last } = parsed else {
            panic!("expected Int range");
        };
        assert_eq!((first, last), (10, Some(20)));
    }

    /// An inverted range (last before first) is rejected with a clear message.
    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let msg = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(msg.contains("last < first"));
    }

    /// The "bytes=" prefix is mandatory.
    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let msg = parse_copy_source_range("10-20").unwrap_err();
        assert!(msg.contains("must start with 'bytes='"));
    }

    /// Open-ended and suffix ranges are both rejected.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        for bad in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(bad).is_err(), "{bad} must be rejected");
        }
    }
}