use std::sync::Arc;
use std::time::Instant;

use bytes::BytesMut;
use s3s::dto::*;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
use s4_codec::multipart::{
    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
    write_frame,
};
use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
use tracing::{debug, info};

use crate::blob::{
    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
};
use crate::streaming::{
    cpu_zstd_decompress_stream, streaming_compress_cpu_zstd, streaming_passthrough,
    supports_streaming_compress, supports_streaming_decompress,
};

/// Number of leading bytes sampled so the dispatcher can pick a codec.
const SAMPLE_BYTES: usize = 4096;

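/// S3-compatible service that transparently compresses object bodies before
/// they reach the wrapped backend and decompresses them on the way out.
/// Compression details travel in `s4-*` user metadata (see the `META_*` keys
/// below); multipart uploads are framed per part instead.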
pub struct S4Service<B: S3> {
    backend: B,
    registry: Arc<CodecRegistry>,
    dispatcher: Arc<dyn CodecDispatcher>,
    max_body_bytes: usize,
}

impl<B: S3> S4Service<B> {
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend,
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
        }
    }

    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }

    pub fn into_backend(self) -> B {
        self.backend
    }

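    /// Serves a Range GET through the sidecar frame index: fetch only the
    /// compressed byte span that covers the requested frames, decompress
    /// them, and slice the result to the client's exact byte range.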
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Ask the backend only for the compressed bytes covering the plan.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Decompress every frame in the fetched span, then slice to the range.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Rewrite response headers: the length and range now describe the
        // decompressed slice, and the checksums/ETag described the
        // compressed representation, so drop them.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }

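    /// Writes the frame index for `key` to a sidecar object at
    /// `sidecar_key(key)`. Failure is non-fatal: without the sidecar, Range
    /// GETs simply fall back to a full read.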
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }

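    /// Fetches and decodes the sidecar frame index, if present. Any failure
    /// (missing sidecar, oversized or undecodable body) is treated as "no
    /// index" so the caller can fall back to the full-object path.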
    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }

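    /// Decompresses a multipart object: each part was uploaded as one framed
    /// chunk, so iterate the frames in order and concatenate their output.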
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
}

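/// Returns true when the object metadata marks it as a multipart upload whose
/// parts were individually framed (`s4-multipart: true`).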
fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_MULTIPART))
        .map(|v| v == "true")
        .unwrap_or(false)
}

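// User-metadata keys under which compression details are persisted on the
// backend object.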
const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
const META_MULTIPART: &str = "s4-multipart";

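/// Reconstructs a [`ChunkManifest`] from object metadata. Returns `None` if
/// any of the `s4-*` keys is missing or fails to parse.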
fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
    let m = metadata.as_ref()?;
    let codec = m
        .get(META_CODEC)
        .and_then(|s| s.parse::<CodecKind>().ok())?;
    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
    Some(ChunkManifest {
        codec,
        original_size,
        compressed_size,
        crc32c,
    })
}

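/// Persists a [`ChunkManifest`] into object metadata, creating the map if
/// absent.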
fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
    let meta = metadata.get_or_insert_with(Default::default);
    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
    meta.insert(
        META_ORIGINAL_SIZE.into(),
        manifest.original_size.to_string(),
    );
    meta.insert(
        META_COMPRESSED_SIZE.into(),
        manifest.compressed_size.to_string(),
    );
    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
}

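/// Builds a `map_err` adapter that wraps any displayable error in an
/// `InternalError` S3 error with a fixed prefix.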
fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}

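/// Resolves an S3 `Range` against an object of `total` bytes into a half-open
/// `(start, end_exclusive)` pair. The end is clamped to the object size;
/// empty objects, starts past the last byte, and zero-length suffixes are
/// rejected as unsatisfiable.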
pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
    if total == 0 {
        return Err("cannot range-get zero-length object".into());
    }
    match range {
        s3s::dto::Range::Int { first, last } => {
            let start = *first;
            let end_inclusive = match last {
                Some(l) => (*l).min(total - 1),
                None => total - 1,
            };
            if start > end_inclusive || start >= total {
                return Err(format!(
                    "range bytes={start}-{} out of object size {total}",
                    last.map(|l| l.to_string()).unwrap_or_default()
                ));
            }
            Ok((start, end_inclusive + 1))
        }
        s3s::dto::Range::Suffix { length } => {
            let len = (*length).min(total);
            if len == 0 {
                return Err("zero-length suffix range is unsatisfiable".into());
            }
            Ok((total - len, total))
        }
    }
}

#[async_trait::async_trait]
impl<B: S3> S3 for S4Service<B> {
    #[tracing::instrument(
        name = "s4.put_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
    )]
    async fn put_object(
        &self,
        mut req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        let put_start = Instant::now();
        let put_bucket = req.input.bucket.clone();
        let put_key = req.input.key.clone();
        if let Some(blob) = req.input.body.take() {
            // Sniff the first SAMPLE_BYTES so the dispatcher can pick a codec
            // without buffering the whole body.
            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
                .await
                .map_err(internal("peek put sample"))?;
            let sample_len = sample.len().min(SAMPLE_BYTES);
            let kind = self.dispatcher.pick(&sample[..sample_len]).await;

            let (compressed, manifest) = if supports_streaming_compress(kind) {
                let chained = chain_sample_with_rest(sample, rest_stream);
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    codec = kind.as_str(),
                    path = "streaming",
                    "S4 put_object: compressing (streaming)"
                );
                match kind {
                    CodecKind::CpuZstd => streaming_compress_cpu_zstd(chained, 3).await,
                    CodecKind::Passthrough => streaming_passthrough(chained).await,
                    other => unreachable!("supports_streaming_compress lied: {other:?}"),
                }
                .map_err(internal("streaming compress"))?
            } else {
                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
                    .await
                    .map_err(internal("collect put body (gpu path)"))?;
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    bytes = bytes.len(),
                    codec = kind.as_str(),
                    path = "buffered",
                    "S4 put_object: compressing (buffered)"
                );
                self.registry
                    .compress(bytes, kind)
                    .await
                    .map_err(internal("registry compress"))?
            };

            write_manifest(&mut req.input.metadata, &manifest);
            req.input.content_length = Some(compressed.len() as i64);
            // The stored body differs from what the client sent, so any
            // client-supplied checksums or MD5 would no longer validate.
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            let original_size = manifest.original_size;
            let compressed_size = manifest.compressed_size;
            let codec_label = manifest.codec.as_str();
            req.input.body = Some(bytes_to_blob(compressed));
            let backend_resp = self.backend.put_object(req).await;
            let elapsed = put_start.elapsed();
            crate::metrics::record_put(
                codec_label,
                original_size,
                compressed_size,
                elapsed.as_secs_f64(),
                backend_resp.is_ok(),
            );
            info!(
                op = "put_object",
                bucket = %put_bucket,
                key = %put_key,
                codec = codec_label,
                bytes_in = original_size,
                bytes_out = compressed_size,
                ratio = format!(
                    "{:.3}",
                    if original_size == 0 {
                        1.0
                    } else {
                        compressed_size as f64 / original_size as f64
                    }
                ),
                latency_ms = elapsed.as_millis() as u64,
                ok = backend_resp.is_ok(),
                "S4 put completed"
            );
            return backend_resp;
        }
        self.backend.put_object(req).await
    }

    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        let range_request = req.input.range.take();

        // Fast path: a Range GET backed by a sidecar index reads only the
        // frames covering the requested bytes.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !is_multipart && manifest_opt.is_none() {
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // Streaming path: a full-object GET of a single-part cpu-zstd
            // object can be decompressed without buffering the whole body.
            if range_request.is_none()
                && !is_multipart
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Passthrough objects stream straight through untouched.
            if range_request.is_none()
                && !is_multipart
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // Buffered path: collect, decompress, then apply any range in memory.
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if is_multipart {
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            // Headers described the compressed object; rewrite them for the
            // decompressed (and possibly sliced) body.
            resp.output.content_length = Some(final_bytes.len() as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        Ok(resp)
    }

    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
    async fn head_object(
        &self,
        req: S3Request<HeadObjectInput>,
    ) -> S3Result<S3Response<HeadObjectOutput>> {
        let mut resp = self.backend.head_object(req).await?;
        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
            // Report the logical (decompressed) size; the checksums and ETag
            // describe the compressed representation, so drop them.
            resp.output.content_length = Some(manifest.original_size as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
        }
        Ok(resp)
    }
    async fn delete_object(
        &self,
        req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        let resp = self.backend.delete_object(req).await?;
        // Best-effort deletion of the sidecar index alongside the object.
        let sidecar_input = DeleteObjectInput {
            bucket: bucket.clone(),
            key: sidecar_key(&key),
            ..Default::default()
        };
        let sidecar_req = S3Request {
            input: sidecar_input,
            method: http::Method::DELETE,
            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let _ = self.backend.delete_object(sidecar_req).await;
        Ok(resp)
    }
    async fn delete_objects(
        &self,
        req: S3Request<DeleteObjectsInput>,
    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
        self.backend.delete_objects(req).await
    }
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // With MetadataDirective=REPLACE the copy would lose the s4-* keys
        // that GET needs to decompress the object, so merge them back in
        // from the source's metadata.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
    async fn list_objects(
        &self,
        req: S3Request<ListObjectsInput>,
    ) -> S3Result<S3Response<ListObjectsOutput>> {
        let mut resp = self.backend.list_objects(req).await?;
        // Hide sidecar index objects from listings.
        if let Some(contents) = resp.output.contents.as_mut() {
            contents.retain(|o| {
                o.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
        }
        Ok(resp)
    }
    async fn list_objects_v2(
        &self,
        req: S3Request<ListObjectsV2Input>,
    ) -> S3Result<S3Response<ListObjectsV2Output>> {
        let mut resp = self.backend.list_objects_v2(req).await?;
        if let Some(contents) = resp.output.contents.as_mut() {
            let before = contents.len();
            contents.retain(|o| {
                o.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
            // Keep KeyCount consistent with the filtered listing.
            if let Some(kc) = resp.output.key_count.as_mut() {
                *kc -= (before - contents.len()) as i32;
            }
        }
        Ok(resp)
    }
    async fn create_multipart_upload(
        &self,
        mut req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        // Mark the object up front; individual parts are framed in upload_part.
        let codec_kind = self.registry.default_kind();
        let meta = req.input.metadata.get_or_insert_with(Default::default);
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
        debug!(
            bucket = ?req.input.bucket,
            key = ?req.input.key,
            codec = codec_kind.as_str(),
            "S4 create_multipart_upload: marking object for per-part compression"
        );
        self.backend.create_multipart_upload(req).await
    }

    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        if let Some(blob) = req.input.body.take() {
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
            let original_size = bytes.len() as u64;
            let (compressed, manifest) = self
                .registry
                .compress(bytes, codec_kind)
                .await
                .map_err(internal("registry compress part"))?;
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            // Pad so compressed parts never fall below the S3 multipart
            // minimum part size.
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            req.input.content_length = Some(new_len);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
    async fn complete_multipart_upload(
        &self,
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        let resp = self.backend.complete_multipart_upload(req).await?;
        // Read the completed object back and build a frame index so later
        // Range GETs can fetch only the frames they need. Best-effort.
        let get_input = GetObjectInput {
            bucket: bucket.clone(),
            key: key.clone(),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{bucket}/{key}").parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Ok(get_resp) = self.backend.get_object(get_req).await
            && let Some(blob) = get_resp.output.body
            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
            && let Ok(index) = build_index_from_body(&body)
        {
            self.write_sidecar(&bucket, &key, &index).await;
        }
        Ok(resp)
    }
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }

    // The remaining handlers delegate to the backend unchanged.
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
    async fn get_object_tagging(
        &self,
        req: S3Request<GetObjectTaggingInput>,
    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
        self.backend.get_object_tagging(req).await
    }
    async fn put_object_tagging(
        &self,
        req: S3Request<PutObjectTaggingInput>,
    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
        self.backend.put_object_tagging(req).await
    }
    async fn delete_object_tagging(
        &self,
        req: S3Request<DeleteObjectTaggingInput>,
    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
        self.backend.delete_object_tagging(req).await
    }
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        // Copied parts are forwarded verbatim; they are expected to already
        // carry S4 framing (i.e. come from an object written through S4) for
        // the completed upload to decode correctly.
        self.backend.upload_part_copy(req).await
    }

    async fn get_object_lock_configuration(
        &self,
        req: S3Request<GetObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
        self.backend.get_object_lock_configuration(req).await
    }
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        self.backend.put_object_lock_configuration(req).await
    }
    async fn get_object_legal_hold(
        &self,
        req: S3Request<GetObjectLegalHoldInput>,
    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
        self.backend.get_object_legal_hold(req).await
    }
    async fn put_object_legal_hold(
        &self,
        req: S3Request<PutObjectLegalHoldInput>,
    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
        self.backend.put_object_legal_hold(req).await
    }
    async fn get_object_retention(
        &self,
        req: S3Request<GetObjectRetentionInput>,
    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
        self.backend.get_object_retention(req).await
    }
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        self.backend.put_object_retention(req).await
    }

    async fn list_object_versions(
        &self,
        req: S3Request<ListObjectVersionsInput>,
    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
        self.backend.list_object_versions(req).await
    }
    async fn get_bucket_versioning(
        &self,
        req: S3Request<GetBucketVersioningInput>,
    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
        self.backend.get_bucket_versioning(req).await
    }
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        self.backend.put_bucket_versioning(req).await
    }

    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }

    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }

    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }

    async fn get_bucket_cors(
        &self,
        req: S3Request<GetBucketCorsInput>,
    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
        self.backend.get_bucket_cors(req).await
    }
    async fn put_bucket_cors(
        &self,
        req: S3Request<PutBucketCorsInput>,
    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
        self.backend.put_bucket_cors(req).await
    }
    async fn delete_bucket_cors(
        &self,
        req: S3Request<DeleteBucketCorsInput>,
    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
        self.backend.delete_bucket_cors(req).await
    }

    async fn get_bucket_lifecycle_configuration(
        &self,
        req: S3Request<GetBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
        self.backend.get_bucket_lifecycle_configuration(req).await
    }
    async fn put_bucket_lifecycle_configuration(
        &self,
        req: S3Request<PutBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
        self.backend.put_bucket_lifecycle_configuration(req).await
    }
    async fn delete_bucket_lifecycle(
        &self,
        req: S3Request<DeleteBucketLifecycleInput>,
    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
        self.backend.delete_bucket_lifecycle(req).await
    }

    async fn get_bucket_tagging(
        &self,
        req: S3Request<GetBucketTaggingInput>,
    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
        self.backend.get_bucket_tagging(req).await
    }
    async fn put_bucket_tagging(
        &self,
        req: S3Request<PutBucketTaggingInput>,
    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
        self.backend.put_bucket_tagging(req).await
    }
    async fn delete_bucket_tagging(
        &self,
        req: S3Request<DeleteBucketTaggingInput>,
    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
        self.backend.delete_bucket_tagging(req).await
    }

    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }

    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }

    async fn get_bucket_notification_configuration(
        &self,
        req: S3Request<GetBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
        self.backend
            .get_bucket_notification_configuration(req)
            .await
    }
    async fn put_bucket_notification_configuration(
        &self,
        req: S3Request<PutBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
        self.backend
            .put_bucket_notification_configuration(req)
            .await
    }

    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }

    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }

    async fn get_bucket_replication(
        &self,
        req: S3Request<GetBucketReplicationInput>,
    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
        self.backend.get_bucket_replication(req).await
    }
    async fn put_bucket_replication(
        &self,
        req: S3Request<PutBucketReplicationInput>,
    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
        self.backend.put_bucket_replication(req).await
    }
    async fn delete_bucket_replication(
        &self,
        req: S3Request<DeleteBucketReplicationInput>,
    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
        self.backend.delete_bucket_replication(req).await
    }

    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }

    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }

    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }
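
    // Added sketch: direct unit coverage for resolve_range, assuming the
    // s3s::dto::Range variants constructed elsewhere in this module.
    // Semantics under test: half-open output, end clamped to the object
    // size, unsatisfiable ranges rejected.
    #[test]
    fn resolve_range_bounded_and_clamped() {
        // bytes=0-9 of a 100-byte object -> [0, 10)
        assert_eq!(
            resolve_range(&s3s::dto::Range::Int { first: 0, last: Some(9) }, 100),
            Ok((0, 10))
        );
        // Open-ended bytes=90- runs to the end of the object.
        assert_eq!(
            resolve_range(&s3s::dto::Range::Int { first: 90, last: None }, 100),
            Ok((90, 100))
        );
        // An end past the object is clamped, matching S3 behavior.
        assert_eq!(
            resolve_range(&s3s::dto::Range::Int { first: 0, last: Some(999) }, 100),
            Ok((0, 100))
        );
        // A start past the last byte is unsatisfiable.
        assert!(resolve_range(&s3s::dto::Range::Int { first: 100, last: None }, 100).is_err());
    }

    #[test]
    fn resolve_range_suffix() {
        // bytes=-30 -> the last 30 bytes.
        assert_eq!(
            resolve_range(&s3s::dto::Range::Suffix { length: 30 }, 100),
            Ok((70, 100))
        );
        // A suffix longer than the object is clamped to the whole object.
        assert_eq!(
            resolve_range(&s3s::dto::Range::Suffix { length: 999 }, 100),
            Ok((0, 100))
        );
        // Zero-length objects and zero-length suffixes are errors.
        assert!(resolve_range(&s3s::dto::Range::Suffix { length: 30 }, 0).is_err());
        assert!(resolve_range(&s3s::dto::Range::Suffix { length: 0 }, 100).is_err());
    }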
}