s4_server/service.rs
1//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
2//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
3//!
4//! ## カバー範囲 (Phase 1 月 2)
5//!
6//! - 圧縮 hook あり: `put_object`, `get_object`
7//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
8//! `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
9//! `list_objects_v2`, `create_multipart_upload`, `upload_part`,
10//! `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
11//! `list_parts`
12//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
13//!
14//! ## アーキテクチャ
15//!
16//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
17//! を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
18//! 複数 codec で書かれた object を透過的に GET できる
19//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
20//! manifest を S3 metadata に書いて backend に forward
21//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
22//! manifest 指定の codec で解凍 → 元の bytes を return
23//!
24//! ## 既知の制限事項
25//!
26//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
27//! Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
28//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
29//! Streaming-aware 圧縮は Phase 2。
30
31use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40 write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry, CompressTelemetry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50 cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51 supports_streaming_compress, supports_streaming_decompress,
52};
53
/// Maximum number of bytes from the head of a PUT body that is handed
/// to the dispatcher for codec-selection sampling.
const SAMPLE_BYTES: usize = 4096;
56
57/// v0.8 #55: stamp the GPU pipeline metrics (`s4_gpu_compress_seconds`,
58/// `s4_gpu_throughput_bytes_per_sec`, `s4_gpu_oom_total`) from a
59/// `CompressTelemetry` returned by `CodecRegistry::compress_with_telemetry`.
60/// CPU codecs (`gpu_seconds = None`) are no-ops here — they're already
61/// covered by the existing `s4_request_latency_seconds` / `s4_bytes_*`
62/// counters in the request-level `record_put` / `record_get` calls.
63#[inline]
64fn stamp_gpu_compress_telemetry(tel: &CompressTelemetry) {
65 if let Some(secs) = tel.gpu_seconds {
66 crate::metrics::record_gpu_compress(tel.codec, secs, tel.bytes_in, tel.bytes_out);
67 }
68 if tel.oom {
69 crate::metrics::record_gpu_oom(tel.codec);
70 }
71}
72
/// v0.7 #49: percent-encoding set covering everything that is **not** an
/// `unreserved` character per RFC 3986 §2.3, **plus** we additionally
/// encode the path-reserved sub-delims that `http::Uri` rejects in a
/// path segment (`?`, `#`, `%`, control bytes, space, etc.). We
/// deliberately keep `/` un-encoded because S3 keys legally use `/` as
/// a logical separator and the rest of the synthetic URI relies on the
/// path layout `/{bucket}/{key}` round-tripping byte-for-byte.
const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'#')
    .add(b'<')
    .add(b'>')
    .add(b'?')
    .add(b'`')
    .add(b'{')
    .add(b'}')
    .add(b'|')
    .add(b'\\')
    .add(b'^')
    .add(b'[')
    .add(b']')
    // `%` is escaped too so a literal `%` in a key survives the
    // encode/parse round-trip unambiguously.
    .add(b'%');
96
97/// v0.7 #49: build the synthetic `/{bucket}/{key}` request URI used by
98/// the sidecar / replication helpers when they re-enter the backend
99/// trait without going through the HTTP layer. S3 object keys can
100/// contain spaces, control bytes, and arbitrary Unicode that would
101/// make `format!(...).parse::<http::Uri>()` panic; we percent-encode
102/// the key bytes (RFC 3986 path segment) and the bucket name (defensive
103/// — bucket names are normally DNS-safe, but the helper is the single
104/// choke-point) before splicing them in. If the encoded form *still*
105/// fails to parse (extremely unlikely once everything outside the
106/// unreserved set is escaped) we surface a typed `400 InvalidObjectName`
107/// instead of crashing the worker.
108pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
109 use percent_encoding::utf8_percent_encode;
110 let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
111 let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
112 let raw = format!("/{bucket_enc}/{key_enc}");
113 raw.parse::<http::Uri>().map_err(|e| {
114 // S3 spec uses `InvalidObjectName` (HTTP 400) for keys that
115 // can't be represented in a request URI. The generated
116 // `S3ErrorCode` enum doesn't expose a typed variant for it,
117 // so we round-trip through `from_bytes` which preserves the
118 // canonical wire string while falling back to InvalidArgument
119 // if even that lookup fails (cannot happen at runtime — kept
120 // as a belt-and-suspenders branch so this helper never
121 // panics).
122 let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
123 .unwrap_or(S3ErrorCode::InvalidArgument);
124 S3Error::with_message(
125 code,
126 format!("object key cannot be encoded as a request URI: {e}"),
127 )
128 })
129}
130
/// v0.4 #20: captured at the start of a handler, before the request is
/// consumed by the backend call, so the matching `record_access` at
/// end-of-request can fill in the structured access log entry.
struct AccessLogPreamble {
    // Remote peer address, when the listener surfaced one.
    remote_ip: Option<String>,
    // Requesting principal, if any — presumably the authenticated
    // access-key identity; confirm against the access-log writer.
    requester: Option<String>,
    // Request URI as received (always present).
    request_uri: String,
    // `User-Agent` header value, if the client sent one.
    user_agent: Option<String>,
}
140
pub struct S4Service<B: S3> {
    /// Wrapped in `Arc` so the v0.6 #40 cross-bucket replication
    /// dispatcher can clone it into a detached `tokio::spawn` task
    /// (Arc::clone is cheap; backend trait methods take `&self` so no
    /// other handler is affected by the indirection).
    backend: Arc<B>,
    /// Shared codec registry. Holding several codecs behind one
    /// registry lets a single S4 instance transparently GET objects
    /// written with different codecs (see module docs).
    registry: Arc<CodecRegistry>,
    /// Chooses the codec for a PUT from the leading body sample (up to
    /// `SAMPLE_BYTES`); the actual compression runs through `registry`.
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Cap for collecting a PUT body into memory. Defaults to
    /// [`S4Service::DEFAULT_MAX_BODY_BYTES`] (5 GiB = the S3 single-PUT
    /// limit); streaming-aware compression is Phase 2 (see module docs).
    max_body_bytes: usize,
    /// Optional shared request-policy evaluator; `None` disables policy
    /// gating. NOTE(review): evaluation semantics live in
    /// `crate::policy` and are not visible from this file.
    policy: Option<crate::policy::SharedPolicy>,
    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
    /// gating "deny if not over TLS" can do their job. Defaults to `false`
    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
    secure_transport: bool,
    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    /// v0.4 #20: optional S3-style access log emitter.
    access_log: Option<crate::access_log::SharedAccessLog>,
    /// v0.4 #21 / v0.5 #29: optional server-side encryption keyring
    /// (AES-256-GCM). When set, every PUT body gets wrapped in S4E2
    /// (with the keyring's active key id) after the compress + framing
    /// steps; every GET that sniffs as S4E1/S4E2 is decrypted before
    /// frame parsing. A `with_sse_key(...)` call wraps the supplied
    /// key in a 1-slot keyring so single-key (v0.4) operators get the
    /// same behaviour they had before, just on the v2 frame.
    sse_keyring: Option<crate::sse::SharedSseKeyring>,
    /// v0.5 #34: optional first-class versioning state machine. When
    /// `Some(...)`, S4-server itself owns the per-bucket versioning
    /// state + per-(bucket, key) version chain; PUT / GET / DELETE /
    /// list_object_versions / get_bucket_versioning /
    /// put_bucket_versioning handlers consult the manager instead of
    /// passing through. When `None` (default), the legacy
    /// backend-passthrough behaviour applies so existing v0.4
    /// deployments are unaffected until they explicitly call
    /// `with_versioning(...)`.
    versioning: Option<Arc<crate::versioning::VersioningManager>>,
    /// v0.5 #28: optional SSE-KMS envelope-encryption backend. When
    /// `Some(...)`, PUTs carrying `x-amz-server-side-encryption: aws:kms`
    /// generate a fresh DEK via the backend, encrypt the body with it
    /// (S4E4 frame), and persist only the wrapped DEK. GETs sniffing as
    /// S4E4 unwrap the DEK through the same backend before decrypt.
    /// `kms_default_key_id` is used when the request omits an explicit
    /// `x-amz-server-side-encryption-aws-kms-key-id` (mirrors AWS S3
    /// bucket-default behaviour).
    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
    kms_default_key_id: Option<String>,
    /// v0.5 #30: optional Object Lock (WORM) enforcement layer. When
    /// `Some(...)`, `delete_object` and overwrite-style `put_object`
    /// consult the manager and refuse the operation with HTTP 403
    /// `AccessDenied` while the object is locked (Compliance until
    /// expiry, Governance unless the bypass header is set, or any time
    /// a legal hold is on). PUT also auto-applies the bucket-default
    /// retention to brand-new objects when configured. When `None`
    /// (default), the legacy backend-passthrough behaviour applies, so
    /// existing v0.4 deployments are unaffected until they explicitly
    /// call `with_object_lock(...)`.
    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
    /// v0.6 #38: optional first-class CORS bucket configuration manager.
    /// When `Some(...)`, S4-server itself owns per-bucket CORS rules and
    /// `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
    /// consult the manager instead of passing through to the backend.
    /// `handle_preflight` (public method on `S4Service`) routes OPTIONS-
    /// style preflight matching through the same store; the actual HTTP
    /// OPTIONS routing wire-up at the listener level is a follow-up
    /// (s3s framework does not surface OPTIONS as a typed handler).
    cors: Option<Arc<crate::cors::CorsManager>>,
    /// v0.6 #36: optional first-class S3 Inventory manager. When
    /// `Some(...)`, S4-server itself owns per-(bucket, id) inventory
    /// configurations and `put_bucket_inventory_configuration` /
    /// `get_bucket_inventory_configuration` /
    /// `list_bucket_inventory_configurations` /
    /// `delete_bucket_inventory_configuration` consult the manager
    /// instead of passing through to the backend. The actual periodic
    /// CSV emission is driven by a tokio task in `main.rs` that calls
    /// `InventoryManager::run_once_for_test` on a fixed cadence; the
    /// service handlers below only deal with config-level CRUD.
    inventory: Option<Arc<crate::inventory::InventoryManager>>,
    /// v0.6 #35: optional first-class S3 bucket-notification manager.
    /// When `Some(...)`, S4-server itself owns per-bucket notification
    /// configurations and `put_bucket_notification_configuration` /
    /// `get_bucket_notification_configuration` consult the manager
    /// instead of passing through to the backend. Successful PUT /
    /// DELETE handlers fire matching destinations on a detached tokio
    /// task (best-effort; see `crate::notifications::dispatch_event`).
    notifications: Option<Arc<crate::notifications::NotificationManager>>,
    /// v0.6 #37: optional first-class S3 Lifecycle configuration
    /// manager. When `Some(...)`, S4-server itself owns per-bucket
    /// lifecycle rules and `put_bucket_lifecycle_configuration` /
    /// `get_bucket_lifecycle_configuration` /
    /// `delete_bucket_lifecycle` consult the manager instead of
    /// passing through to the backend. The actual background scanner
    /// (list_objects_v2 -> evaluate -> delete / metadata-rewrite per
    /// rule) is a v0.7+ follow-up; the test path
    /// `S4Service::run_lifecycle_once_for_test` exercises the
    /// evaluator end-to-end so this v0.6 #37 wiring is enough to ship
    /// the configuration-management half without putting a
    /// half-wired bucket-walk in front of users.
    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
    /// v0.6 #39: optional first-class object + bucket Tagging manager.
    /// When `Some(...)`, S4-server itself owns per-(bucket, key) and
    /// per-bucket tag state — `PutObjectTagging` /
    /// `GetObjectTagging` / `DeleteObjectTagging` /
    /// `PutBucketTagging` / `GetBucketTagging` /
    /// `DeleteBucketTagging` route through the manager (replacing the
    /// previous backend-passthrough behaviour). `put_object` also
    /// pre-parses the `x-amz-tagging` header / `Tagging` input field
    /// so the IAM policy evaluator can gate on
    /// `s3:RequestObjectTag/<key>` and `s3:ExistingObjectTag/<key>`.
    /// On a successful PUT the parsed tags are persisted; on a
    /// successful DELETE the matching tag entry is dropped.
    tagging: Option<Arc<crate::tagging::TagManager>>,
    /// v0.6 #40: optional first-class cross-bucket replication manager.
    /// When `Some(...)`, S4-server itself owns per-bucket replication
    /// rules; `PutBucketReplication` / `GetBucketReplication` /
    /// `DeleteBucketReplication` route through the manager (replacing
    /// the previous backend-passthrough behaviour). On every successful
    /// `put_object` the manager's rule list is consulted; the
    /// highest-priority matching enabled rule wins, the per-key status
    /// is recorded as `Pending`, and the source body and metadata are
    /// handed to a detached tokio task that PUTs to the destination
    /// bucket through the same backend. The replica is stamped with
    /// `x-amz-replication-status: REPLICA` in its metadata; the
    /// source-side status is updated to `Completed` on success or
    /// `Failed` after the 3-attempt retry budget is exhausted (drop
    /// counter bumps in either-side case so dashboards see the loss).
    /// `head_object` / `get_object` echo the recorded status back as
    /// `x-amz-replication-status` so consumers can poll progress.
    /// Limited to single-instance (same `S4Service`) replication; true
    /// cross-region (multi-instance) is a v0.7+ follow-up.
    replication: Option<Arc<crate::replication::ReplicationManager>>,
    /// v0.6 #42: optional MFA-Delete enforcement layer. When `Some(...)`,
    /// every DELETE / DELETE-version / delete-marker / `PutBucketVersioning`
    /// request against a bucket whose MFA-Delete state is `Enabled`
    /// must carry `x-amz-mfa: <serial> <code>` (RFC 6238 6-digit TOTP);
    /// missing or invalid tokens return HTTP 403 `AccessDenied`. When
    /// `None` (default), the gate is a no-op so existing v0.4 / v0.5
    /// deployments are unaffected until they explicitly call
    /// `with_mfa_delete(...)`.
    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
    /// v0.5 #32: when `true`, every PUT must carry an SSE indicator
    /// (`x-amz-server-side-encryption`, the SSE-C customer-key headers,
    /// or be matched against a configured server-managed keyring/KMS).
    /// Set by `--compliance-mode strict` after the boot-time
    /// prerequisite check passes.
    compliance_strict: bool,
    /// v0.7 #47: optional SigV4a (asymmetric ECDSA-P256-SHA256) verify
    /// gate. When `Some(...)`, the listener-side middleware (see
    /// [`crate::routing::try_sigv4a_verify`]) inspects every incoming
    /// request and short-circuits SigV4a-signed ones — verifying the
    /// signature against the credential store and returning 403
    /// `SignatureDoesNotMatch` / `InvalidAccessKeyId` on failure. Plain
    /// SigV4 (HMAC-SHA256) requests pass through to s3s untouched. When
    /// `None`, the middleware is a no-op so the existing SigV4 path is
    /// unaffected (operators opt in via `--sigv4a-credentials <DIR>`).
    sigv4a_gate: Option<Arc<SigV4aGate>>,
    /// v0.8 #54 BUG-5..10: per-`upload_id` side-table that ferries the
    /// SSE / Tagging / Object-Lock context captured at
    /// `CreateMultipartUpload` time through to `UploadPart` /
    /// `CompleteMultipartUpload`. Always-on (no `with_*` flag) — the
    /// store is gateway-internal and idle when no multipart is in
    /// flight. See [`crate::multipart_state`] for rationale.
    multipart_state: Arc<crate::multipart_state::MultipartStateStore>,
    /// v0.8 #52: plaintext bytes per S4E5 chunk on the SSE-S4 PUT
    /// path. `0` (default) → use the legacy buffered S4E2 path
    /// (whole-body AES-GCM tag, GET buffers + verifies before
    /// emitting). Non-zero → use the chunked S4E5 frame so GET can
    /// stream-decrypt chunk-by-chunk. Wired by `--sse-chunk-size`
    /// in `main.rs`. SSE-C and SSE-KMS are intentionally unaffected
    /// (chunked variants tracked in a follow-up issue).
    sse_chunk_size: usize,
}
313
314impl<B: S3> S4Service<B> {
    /// AWS S3's API limit for a single (non-multipart) PUT: 5 GiB.
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
317
318 pub fn new(
319 backend: B,
320 registry: Arc<CodecRegistry>,
321 dispatcher: Arc<dyn CodecDispatcher>,
322 ) -> Self {
323 Self {
324 backend: Arc::new(backend),
325 registry,
326 dispatcher,
327 max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
328 policy: None,
329 secure_transport: false,
330 rate_limits: None,
331 access_log: None,
332 sse_keyring: None,
333 versioning: None,
334 kms: None,
335 kms_default_key_id: None,
336 object_lock: None,
337 cors: None,
338 inventory: None,
339 notifications: None,
340 lifecycle: None,
341 tagging: None,
342 replication: None,
343 mfa_delete: None,
344 compliance_strict: false,
345 sigv4a_gate: None,
346 multipart_state: Arc::new(crate::multipart_state::MultipartStateStore::new()),
347 // v0.8 #52: chunked SSE-S4 disabled by default — opt
348 // in via `S4Service::with_sse_chunk_size(...)` /
349 // `--sse-chunk-size <BYTES>`. Default keeps the legacy
350 // S4E2 buffered path so existing deployments are
351 // bit-for-bit unchanged.
352 sse_chunk_size: 0,
353 }
354 }
355
356 /// v0.7 #47: attach the SigV4a verify gate. Once set, the
357 /// listener-side middleware (`crate::routing::try_sigv4a_verify`)
358 /// short-circuits any incoming `AWS4-ECDSA-P256-SHA256` request,
359 /// verifying it against the supplied credential store and
360 /// returning 403 on failure. Plain SigV4 (HMAC-SHA256) requests
361 /// are unaffected. When the gate is unset (default), the
362 /// middleware skips entirely so existing SigV4 deployments keep
363 /// working.
364 #[must_use]
365 pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
366 self.sigv4a_gate = Some(gate);
367 self
368 }
369
    /// v0.7 #47: borrow the attached SigV4a gate. Used by `main.rs`
    /// to snapshot the gate `Arc` before the s3s `ServiceBuilder`
    /// consumes the `S4Service` (the listener-side middleware needs
    /// the same `Arc` because s3s' SigV4 verifier rejects SigV4a
    /// algorithm tokens with "unknown algorithm" — match has to
    /// happen at the hyper layer instead). Returns `None` until
    /// [`Self::with_sigv4a_gate`] attaches one.
    #[must_use]
    pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
        self.sigv4a_gate.as_ref()
    }
380
    /// v0.8.2 #62: borrow the multipart state store so `main.rs` can
    /// snapshot the `Arc` before the s3s `ServiceBuilder` consumes
    /// the `S4Service`. The background `sweep_stale` task in `main.rs`
    /// holds this `Arc` and ticks once an hour to drop abandoned
    /// upload contexts (and their `Zeroizing<[u8; 32]>` SSE-C keys).
    /// Unlike the `with_*` subsystems this store is always present —
    /// it is constructed unconditionally in [`Self::new`].
    #[must_use]
    pub fn multipart_state(&self) -> &Arc<crate::multipart_state::MultipartStateStore> {
        &self.multipart_state
    }
390
391 /// v0.6 #39: attach the in-memory object + bucket Tagging manager.
392 /// Once set, `Put/Get/Delete` `Object/Bucket Tagging` route
393 /// through the manager (instead of forwarding to the backend),
394 /// and `put_object`'s `x-amz-tagging` parse path becomes the
395 /// source of `s3:RequestObjectTag/<key>` for the IAM policy
396 /// evaluator. The manager itself is shared via `Arc`.
397 #[must_use]
398 pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
399 self.tagging = Some(mgr);
400 self
401 }
402
    /// v0.6 #39: borrow the attached tagging manager (test /
    /// introspection — the snapshotter in `main.rs`, when wired,
    /// will keep its own `Arc` clone). Returns `None` until
    /// [`Self::with_tagging`] attaches one.
    #[must_use]
    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
        self.tagging.as_ref()
    }
410
411 /// v0.6 #36: attach the in-memory S3 Inventory manager. Once set,
412 /// `put_bucket_inventory_configuration` /
413 /// `get_bucket_inventory_configuration` /
414 /// `list_bucket_inventory_configurations` /
415 /// `delete_bucket_inventory_configuration` route through the
416 /// manager. The actual periodic CSV / manifest emission is
417 /// orchestrated by a tokio task started in `main.rs`; the manager
418 /// itself is shared between the handler and the scheduler via
419 /// `Arc`.
420 #[must_use]
421 pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
422 self.inventory = Some(mgr);
423 self
424 }
425
    /// v0.6 #36: borrow the attached inventory manager (test /
    /// introspection — the background scheduler in `main.rs` keeps its
    /// own `Arc` clone, so this accessor is for the test path that
    /// invokes `run_once_for_test` directly). Returns `None` until
    /// [`Self::with_inventory`] attaches one.
    #[must_use]
    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
        self.inventory.as_ref()
    }
434
435 /// v0.6 #37: attach the in-memory S3 Lifecycle configuration
436 /// manager. Once set, `put_bucket_lifecycle_configuration` /
437 /// `get_bucket_lifecycle_configuration` / `delete_bucket_lifecycle`
438 /// route through the manager (replacing the previous backend-
439 /// passthrough behaviour). The actual periodic scanner that walks
440 /// the source bucket and invokes Expiration / Transition /
441 /// NoncurrentExpiration actions is a v0.7+ follow-up — see
442 /// [`Self::run_lifecycle_once_for_test`] for the in-memory test
443 /// path that exercises the evaluator end-to-end.
444 #[must_use]
445 pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
446 self.lifecycle = Some(mgr);
447 self
448 }
449
    /// v0.6 #37: borrow the attached lifecycle manager (test /
    /// introspection — the background scheduler in `main.rs` keeps its
    /// own `Arc` clone, so this accessor is for the test path that
    /// invokes the evaluator directly). Returns `None` until
    /// [`Self::with_lifecycle`] attaches one.
    #[must_use]
    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
        self.lifecycle.as_ref()
    }
458
459 /// v0.6 #37: synchronous test entry that runs the lifecycle evaluator
460 /// against a caller-provided list of `(key, age, size, tags)` tuples
461 /// and returns the `(key, action)` pairs that should fire. The actual
462 /// backend invocation (S3.delete_object / metadata rewrite) is left
463 /// to the caller — the unit + E2E tests use this to verify the
464 /// evaluator without spawning the (deferred) background scanner.
465 /// Returns an empty `Vec` when no lifecycle manager is attached or
466 /// no rule matches.
467 #[must_use]
468 pub fn run_lifecycle_once_for_test(
469 &self,
470 bucket: &str,
471 objects: &[crate::lifecycle::EvaluateBatchEntry],
472 ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
473 let Some(mgr) = self.lifecycle.as_ref() else {
474 return Vec::new();
475 };
476 crate::lifecycle::evaluate_batch(mgr, bucket, objects)
477 }
478
479 /// v0.6 #35: attach the in-memory bucket-notification manager. Once
480 /// set, `put_bucket_notification_configuration` /
481 /// `get_bucket_notification_configuration` route through the manager
482 /// (replacing the previous backend-passthrough behaviour); successful
483 /// `put_object` / `delete_object` calls fire matching destinations
484 /// on a detached tokio task via
485 /// `crate::notifications::dispatch_event` (best-effort, fire-and-
486 /// forget — failures bump the manager's `dropped_total` counter and
487 /// log at warn but do NOT fail the originating S3 request).
488 #[must_use]
489 pub fn with_notifications(
490 mut self,
491 mgr: Arc<crate::notifications::NotificationManager>,
492 ) -> Self {
493 self.notifications = Some(mgr);
494 self
495 }
496
    /// v0.6 #35: borrow the attached notifications manager (test /
    /// introspection — used by the metrics layer to read
    /// `dropped_total`). Returns `None` until
    /// [`Self::with_notifications`] attaches one.
    #[must_use]
    pub fn notifications_manager(
        &self,
    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
        self.notifications.as_ref()
    }
506
507 /// v0.6 #35: internal helper used by the DELETE handlers to fire a
508 /// matching notification on a detached tokio task. No-op when no
509 /// manager is attached or no rule on the bucket matches the given
510 /// (event, key) tuple.
511 fn fire_delete_notification(
512 &self,
513 bucket: &str,
514 key: &str,
515 event: crate::notifications::EventType,
516 version_id: Option<String>,
517 ) {
518 let Some(mgr) = self.notifications.as_ref() else {
519 return;
520 };
521 let dests = mgr.match_destinations(bucket, &event, key);
522 if dests.is_empty() {
523 return;
524 }
525 tokio::spawn(crate::notifications::dispatch_event(
526 Arc::clone(mgr),
527 bucket.to_owned(),
528 key.to_owned(),
529 event,
530 None,
531 None,
532 version_id,
533 format!("S4-{}", uuid::Uuid::new_v4()),
534 ));
535 }
536
537 /// v0.6 #40: attach the in-memory cross-bucket replication manager.
538 /// Once set, `put_bucket_replication` / `get_bucket_replication` /
539 /// `delete_bucket_replication` route through the manager (replacing
540 /// the previous backend-passthrough behaviour); a successful
541 /// `put_object` whose key matches an enabled rule fires a detached
542 /// tokio task that PUTs the same body + metadata to the rule's
543 /// destination bucket, stamping the replica with
544 /// `x-amz-replication-status: REPLICA`. Failures after the retry
545 /// budget bump the manager's `dropped_total` counter and are
546 /// surfaced in the `s4_replication_dropped_total` Prometheus
547 /// counter; successes bump `s4_replication_replicated_total`.
548 #[must_use]
549 pub fn with_replication(
550 mut self,
551 mgr: Arc<crate::replication::ReplicationManager>,
552 ) -> Self {
553 self.replication = Some(mgr);
554 self
555 }
556
    /// v0.6 #40: borrow the attached replication manager (test /
    /// introspection — used by the metrics layer to read
    /// `dropped_total`). Returns `None` until
    /// [`Self::with_replication`] attaches one.
    #[must_use]
    pub fn replication_manager(
        &self,
    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
        self.replication.as_ref()
    }
566
567 /// v0.6 #40: internal helper used by the PUT handlers to fire a
568 /// detached cross-bucket replication task. No-op when no manager
569 /// is attached, the source backend PUT failed, or no rule on the
570 /// source bucket matches the (key, tags) tuple. The `body` is the
571 /// post-compression / post-encryption `Bytes` that was sent to
572 /// the source backend (refcount-cloned), and `metadata` is the
573 /// metadata map that already includes the manifest /
574 /// `s4-encrypted` markers — the replica decodes through the same
575 /// path. The destination PUT runs through `Arc<B>::put_object`.
576 ///
577 /// ## v0.8.2 #61: generation token + shadow-key destination
578 ///
579 /// `pending_version` is the source-side `PutOutcome` minted by the
580 /// caller's versioning branch (or `None` for unversioned /
581 /// suspended buckets). When `pending_version.versioned_response`
582 /// is `true`, the dispatcher writes the destination under the same
583 /// shadow path the source uses (`<key>.__s4ver__/<vid>`) so the
584 /// destination's version chain receives the new version the same
585 /// way `?versionId=` GET resolves it. Closes audit C-1.
586 ///
587 /// The dispatcher also mints a fresh `generation` token before
588 /// spawning, threaded through to [`crate::replication::
589 /// replicate_object`]. Closes audit C-3 — a stale retry of an
590 /// older PUT can no longer overwrite the destination's newer bytes
591 /// because the CAS guard sees the higher stored generation and
592 /// drops its destination write.
593 ///
594 /// ## Asymmetric versioning policy (out of scope)
595 ///
596 /// We assume source + destination buckets share the same
597 /// versioning policy (both Enabled or both Suspended /
598 /// Unversioned). Cross-bucket policy queries would require a
599 /// backend round-trip per replication, which is not worth it for
600 /// the single-instance scope. Operators who configure asymmetric
601 /// versioning will see destination-side `?versionId=` lookups
602 /// miss — documented as out-of-scope until a future per-rule
603 /// `destination_versioning_policy` knob lands.
604 // 8 args is the post-#61 shape: replication needs the
605 // source bucket+key, the canonical tag set for rule-matching,
606 // the post-codec body+metadata for the destination PUT, the
607 // backend-success gate, and the pending version-id for the
608 // shadow-key destination override. A shape struct would just
609 // split the (single) call site so opt for the inline form.
610 #[allow(clippy::too_many_arguments)]
611 fn spawn_replication_if_matched(
612 &self,
613 source_bucket: &str,
614 source_key: &str,
615 request_tags: &Option<crate::tagging::TagSet>,
616 body: &bytes::Bytes,
617 metadata: &Option<std::collections::HashMap<String, String>>,
618 backend_ok: bool,
619 pending_version: Option<&crate::versioning::PutOutcome>,
620 ) where
621 B: Send + Sync + 'static,
622 {
623 if !backend_ok {
624 return;
625 }
626 let Some(mgr) = self.replication.as_ref() else {
627 return;
628 };
629 // Pull the request's tags into the (k, v) shape the matcher
630 // expects. The tagging manager would have the canonical
631 // post-PUT view but at this point in the pipeline it's
632 // already been written above; for the rule-match decision
633 // the request's tags are sufficient (= the tags this PUT
634 // applies, S3 PutObject is full-replace on tags).
635 let object_tags: Vec<(String, String)> = request_tags
636 .as_ref()
637 .map(|ts| ts.iter().cloned().collect())
638 .unwrap_or_default();
639 let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
640 return;
641 };
642 // v0.8.2 #61: mint the per-PUT generation BEFORE the eager
643 // Pending stamp so the stamp itself carries the right
644 // generation (the CAS in `record_status_if_newer` would
645 // otherwise see a `generation=0` Pending and accept any
646 // stale retry).
647 let generation = mgr.next_generation();
648 // Eagerly mark the source key as Pending so a HEAD between
649 // the source PUT returning and the spawned task completing
650 // surfaces the in-flight state. CAS-guarded so a slower
651 // older PUT can't downgrade a newer Completed back to Pending.
652 let _ = mgr.record_status_if_newer(
653 source_bucket,
654 source_key,
655 generation,
656 crate::replication::ReplicationStatus::Pending,
657 );
658 // v0.8.2 #61: derive the destination storage key. For a
659 // versioning-Enabled source the destination receives the
660 // same shadow-key path so a `?versionId=<vid>` GET on the
661 // destination resolves through the same lookup the source
662 // uses. Suspended / Unversioned sources keep the logical
663 // key (= `None` override = dispatcher uses `source_key`).
664 let destination_key_override = pending_version
665 .filter(|pv| pv.versioned_response)
666 .map(|pv| versioned_shadow_key(source_key, &pv.version_id));
667 let mgr_cl = Arc::clone(mgr);
668 let backend = Arc::clone(&self.backend);
669 let body_cl = body.clone();
670 let metadata_cl = metadata.clone();
671 let source_bucket_cl = source_bucket.to_owned();
672 let source_key_cl = source_key.to_owned();
673 tokio::spawn(async move {
674 let do_put = move |dest_bucket: String,
675 dest_key: String,
676 dest_body: bytes::Bytes,
677 dest_meta: Option<std::collections::HashMap<String, String>>| {
678 let backend = Arc::clone(&backend);
679 async move {
680 let req = S3Request {
681 input: PutObjectInput {
682 bucket: dest_bucket,
683 key: dest_key,
684 body: Some(bytes_to_blob(dest_body)),
685 metadata: dest_meta,
686 ..Default::default()
687 },
688 method: http::Method::PUT,
689 uri: "/".parse().unwrap(),
690 headers: http::HeaderMap::new(),
691 extensions: http::Extensions::new(),
692 credentials: None,
693 region: None,
694 service: None,
695 trailing_headers: None,
696 };
697 backend
698 .put_object(req)
699 .await
700 .map(|_| ())
701 .map_err(|e| format!("destination put_object: {e}"))
702 }
703 };
704 crate::replication::replicate_object(
705 rule,
706 source_bucket_cl,
707 source_key_cl,
708 body_cl,
709 metadata_cl,
710 do_put,
711 mgr_cl,
712 generation,
713 destination_key_override,
714 )
715 .await;
716 });
717 }
718
719 /// v0.6 #42: attach the in-memory MFA-Delete enforcement manager.
720 /// Once set, every DELETE / DELETE-version / delete-marker /
721 /// `PutBucketVersioning` request against a bucket whose MFA-Delete
722 /// state is `Enabled` requires a valid `x-amz-mfa: <serial> <code>`
723 /// header (RFC 6238 6-digit TOTP); the gate is a no-op for buckets
724 /// where MFA-Delete is `Disabled` (S3 default).
725 #[must_use]
726 pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
727 self.mfa_delete = Some(mgr);
728 self
729 }
730
731 /// v0.6 #42: borrow the attached MFA-Delete manager (test /
732 /// introspection — used by the snapshot path in `main.rs` to call
733 /// `to_json` for restart-recoverable state).
734 #[must_use]
735 pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
736 self.mfa_delete.as_ref()
737 }
738
739 /// v0.6 #38: attach the in-memory CORS configuration manager. Once
740 /// set, `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
741 /// route through the manager instead of forwarding to the backend,
742 /// and [`Self::handle_preflight`] becomes useful for the (future)
743 /// listener-side OPTIONS interceptor.
744 #[must_use]
745 pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
746 self.cors = Some(mgr);
747 self
748 }
749
750 /// v0.6 #38: Borrow the attached CORS manager (test / introspection).
751 #[must_use]
752 pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
753 self.cors.as_ref()
754 }
755
756 /// v0.6 #38: evaluate a CORS preflight request against the bucket's
757 /// configured rules and, if a rule matches, return the headers that
758 /// the (future) listener-side OPTIONS interceptor must put on the
759 /// 200 response: `Access-Control-Allow-Origin`, `Access-Control-
760 /// Allow-Methods`, `Access-Control-Allow-Headers`, optionally
761 /// `Access-Control-Max-Age` and `Access-Control-Expose-Headers`.
762 ///
763 /// Returns `None` when no manager is attached, no config is
764 /// registered for the bucket, or no rule matches the (origin,
765 /// method, headers) triple. The caller is responsible for turning
766 /// `None` into the appropriate 403 response.
767 ///
768 /// **Note:** the OPTIONS routing itself (i.e. wiring this method
769 /// into the hyper-util listener path) is a follow-up — s3s does not
770 /// surface OPTIONS as a typed S3 handler, so this method is
771 /// currently call-able only from inside other handlers and tests.
772 #[must_use]
773 pub fn handle_preflight(
774 &self,
775 bucket: &str,
776 origin: &str,
777 method: &str,
778 request_headers: &[String],
779 ) -> Option<std::collections::HashMap<String, String>> {
780 let mgr = self.cors.as_ref()?;
781 let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
782 let mut h = std::collections::HashMap::new();
783 // Echo the matched origin back. If the rule used "*" we still
784 // echo "*" (S3 spec — the spec does not require us to echo the
785 // *requesting* origin when the wildcard matched).
786 let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
787 "*".to_string()
788 } else {
789 origin.to_string()
790 };
791 h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
792 h.insert(
793 "Access-Control-Allow-Methods".to_string(),
794 rule.allowed_methods.join(", "),
795 );
796 if !rule.allowed_headers.is_empty() {
797 // For the Allow-Headers response, echo back the rule's
798 // pattern list verbatim (S3 echoes the configured list,
799 // including "*" if present). Browsers honour exact-match
800 // rules.
801 h.insert(
802 "Access-Control-Allow-Headers".to_string(),
803 rule.allowed_headers.join(", "),
804 );
805 }
806 if let Some(secs) = rule.max_age_seconds {
807 h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
808 }
809 if !rule.expose_headers.is_empty() {
810 h.insert(
811 "Access-Control-Expose-Headers".to_string(),
812 rule.expose_headers.join(", "),
813 );
814 }
815 Some(h)
816 }
817
818 /// v0.5 #32: enable strict compliance mode. Every PUT must carry an
819 /// SSE indicator (server-side encryption header or SSE-C customer
820 /// key); requests without one are rejected with 400 InvalidRequest.
821 /// Boot-time prerequisite checking lives in the binary
822 /// (`validate_compliance_mode`) so this flag is purely the runtime
823 /// switch.
824 #[must_use]
825 pub fn with_compliance_strict(mut self, on: bool) -> Self {
826 self.compliance_strict = on;
827 self
828 }
829
830 /// v0.5 #30: attach the in-memory Object Lock (WORM) enforcement
831 /// manager. Once set, `delete_object` and overwrite-path
832 /// `put_object` refuse operations on locked keys with HTTP 403
833 /// `AccessDenied`; new PUTs to a bucket with a default retention
834 /// policy auto-create per-object lock state.
835 #[must_use]
836 pub fn with_object_lock(
837 mut self,
838 mgr: Arc<crate::object_lock::ObjectLockManager>,
839 ) -> Self {
840 self.object_lock = Some(mgr);
841 self
842 }
843
844 /// v0.7 #45: borrow the attached Object Lock manager (read-only —
845 /// the lifecycle scanner uses this to skip currently-locked objects
846 /// before issuing `delete_object`, since an Object Lock always wins
847 /// over Lifecycle Expiration in AWS S3 semantics). Mirrors the
848 /// shape of [`Self::lifecycle_manager`] /
849 /// [`Self::tag_manager`] — purely additive accessor, no handler
850 /// behaviour change.
851 #[must_use]
852 pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
853 self.object_lock.as_ref()
854 }
855
856 /// v0.5 #28: attach an SSE-KMS backend. `default_key_id` is used
857 /// when a PUT requests SSE-KMS without naming a specific KMS key
858 /// (operators set this to mirror AWS S3's bucket-default key).
859 #[must_use]
860 pub fn with_kms_backend(
861 mut self,
862 kms: Arc<dyn crate::kms::KmsBackend>,
863 default_key_id: Option<String>,
864 ) -> Self {
865 self.kms = Some(kms);
866 self.kms_default_key_id = default_key_id;
867 self
868 }
869
870 /// v0.5 #34: attach the first-class versioning state machine. Once
871 /// set, this `S4Service` owns the per-bucket versioning state +
872 /// per-(bucket, key) version chain; `put_object` / `get_object` /
873 /// `delete_object` / `list_object_versions` /
874 /// `get_bucket_versioning` / `put_bucket_versioning` consult the
875 /// manager instead of passing through to the backend. The backend
876 /// is still used as the byte store: Suspended / Unversioned buckets
877 /// keep using `<key>` directly (legacy), Enabled buckets redirect
878 /// each version's bytes to a shadow key
879 /// (`<key>.__s4ver__/<version-id>`) so older versions survive newer
880 /// PUTs to the same logical key.
881 #[must_use]
882 pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
883 self.versioning = Some(mgr);
884 self
885 }
886
887 /// v0.4 #21 (kept for back-compat): attach a single SSE-S4 key.
888 /// Internally wraps it in a 1-slot keyring with id=1 active, so
889 /// new objects ride the v0.5 S4E2 frame while previously-written
890 /// S4E1 bytes (this same key) still decrypt via the keyring's S4E1
891 /// fallback path. Operators wanting true rotation should call
892 /// [`Self::with_sse_keyring`] instead.
893 #[must_use]
894 pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
895 let keyring = crate::sse::SseKeyring::new(1, key);
896 self.sse_keyring = Some(std::sync::Arc::new(keyring));
897 self
898 }
899
900 /// v0.5 #29: attach a multi-key SSE-S4 keyring. PUT encrypts under
901 /// the active key (S4E2 frame stamped with that key's id); GET
902 /// dispatches on the body's magic — S4E1 falls back to trying every
903 /// key in the ring (active first) so v0.4 objects survive a
904 /// migration; S4E2 looks up the explicit key_id from the header.
905 #[must_use]
906 pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
907 self.sse_keyring = Some(keyring);
908 self
909 }
910
911 /// v0.8 #52: opt the SSE-S4 PUT path into the chunked S4E5 frame
912 /// (so the matching GET can stream-decrypt chunk-by-chunk
913 /// instead of buffering the entire body before tag verify).
914 /// `bytes` is the plaintext slice size — typically 1 MiB; 0
915 /// disables the path and reverts to the legacy S4E2 buffered
916 /// frame.
917 ///
918 /// SSE-C (S4E3) and SSE-KMS (S4E4) are intentionally untouched:
919 /// the chunked envelopes for those flows are a follow-up issue
920 /// (the customer-key wire surface needs separate version
921 /// negotiation).
922 ///
923 /// Has no effect when `with_sse_keyring` / `with_sse_key` is
924 /// not also set — the chunked path runs only on the SSE-S4
925 /// branch of `put_object`.
926 #[must_use]
927 pub fn with_sse_chunk_size(mut self, bytes: usize) -> Self {
928 self.sse_chunk_size = bytes;
929 self
930 }
931
932 /// v0.4 #20: attach an S3-style access-log emitter. Each completed
933 /// PUT / GET / DELETE / List handler emits one entry into the
934 /// emitter's buffer; a background flusher (started separately, see
935 /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
936 /// rotated `.log` files into the configured directory.
937 #[must_use]
938 pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
939 self.access_log = Some(log);
940 self
941 }
942
943 /// Capture the per-request access-log preamble before the request is
944 /// consumed by the backend call. Returns `None` if no access logger
945 /// is configured (cheap early-out so the handler doesn't pay the
946 /// header-clone cost when access logging is off).
947 fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
948 self.access_log.as_ref()?;
949 Some(AccessLogPreamble {
950 remote_ip: req
951 .headers
952 .get("x-forwarded-for")
953 .and_then(|v| v.to_str().ok())
954 .and_then(|raw| raw.split(',').next())
955 .map(|s| s.trim().to_owned()),
956 requester: Self::principal_of(req).map(str::to_owned),
957 request_uri: format!("{} {}", req.method, req.uri.path()),
958 user_agent: req
959 .headers
960 .get("user-agent")
961 .and_then(|v| v.to_str().ok())
962 .map(str::to_owned),
963 })
964 }
965
966 /// Internal — called by handlers at end-of-request with a captured
967 /// preamble. Best-effort: swallows the await fast (clones Arc +
968 /// pushes), no error propagation back to the request path.
969 #[allow(clippy::too_many_arguments)]
970 async fn record_access(
971 &self,
972 preamble: Option<AccessLogPreamble>,
973 operation: &'static str,
974 bucket: &str,
975 key: Option<&str>,
976 http_status: u16,
977 bytes_sent: u64,
978 object_size: u64,
979 total_time_ms: u64,
980 error_code: Option<&str>,
981 ) {
982 let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
983 return;
984 };
985 log.record(crate::access_log::AccessLogEntry {
986 time: std::time::SystemTime::now(),
987 bucket: bucket.to_owned(),
988 remote_ip: p.remote_ip,
989 requester: p.requester,
990 operation,
991 key: key.map(str::to_owned),
992 request_uri: p.request_uri,
993 http_status,
994 error_code: error_code.map(str::to_owned),
995 bytes_sent,
996 object_size,
997 total_time_ms,
998 user_agent: p.user_agent,
999 })
1000 .await;
1001 }
1002
1003 /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
1004 /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
1005 /// throttle-checked before the policy gate; throttled requests return
1006 /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
1007 /// `s4_rate_limit_throttled_total{principal,bucket}`.
1008 #[must_use]
1009 pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
1010 self.rate_limits = Some(rl);
1011 self
1012 }
1013
1014 /// Helper used by request handlers to apply the rate limit. Returns
1015 /// `Ok(())` when allowed (or no rate limiter is configured), or a
1016 /// `SlowDown` S3Error otherwise.
1017 fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
1018 let Some(rl) = self.rate_limits.as_ref() else {
1019 return Ok(());
1020 };
1021 let principal_id = Self::principal_of(req);
1022 if !rl.check(principal_id, bucket) {
1023 crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
1024 return Err(S3Error::with_message(
1025 S3ErrorCode::SlowDown,
1026 format!("rate-limited: bucket={bucket}"),
1027 ));
1028 }
1029 Ok(())
1030 }
1031
1032 /// Tell the policy evaluator that the listener is reached over TLS
1033 /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
1034 /// resolves to `true`. Defaults to `false`.
1035 #[must_use]
1036 pub fn with_secure_transport(mut self, on: bool) -> Self {
1037 self.secure_transport = on;
1038 self
1039 }
1040
1041 #[must_use]
1042 pub fn with_max_body_bytes(mut self, n: usize) -> Self {
1043 self.max_body_bytes = n;
1044 self
1045 }
1046
1047 /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
1048 /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
1049 /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
1050 /// When `None` (the default), no policy enforcement happens.
1051 #[must_use]
1052 pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
1053 self.policy = Some(policy);
1054 self
1055 }
1056
1057 /// Pull the SigV4 access key id off the request's credentials, if any.
1058 /// Used as the `principal_id` for policy evaluation.
1059 fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
1060 req.credentials.as_ref().map(|c| c.access_key.as_str())
1061 }
1062
1063 /// v0.3 #13: build the per-request policy context from the incoming
1064 /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
1065 /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
1066 /// production deployments are behind an LB / reverse proxy that sets
1067 /// this), `aws:CurrentTime` from the system clock, and
1068 /// `aws:SecureTransport` from the per-listener TLS flag.
1069 fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
1070 let user_agent = req
1071 .headers
1072 .get("user-agent")
1073 .and_then(|v| v.to_str().ok())
1074 .map(str::to_owned);
1075 // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
1076 // is the original client. Trim and parse leniently.
1077 let source_ip = req
1078 .headers
1079 .get("x-forwarded-for")
1080 .and_then(|v| v.to_str().ok())
1081 .and_then(|raw| raw.split(',').next())
1082 .and_then(|s| s.trim().parse().ok());
1083 crate::policy::RequestContext {
1084 source_ip,
1085 user_agent,
1086 request_time: Some(std::time::SystemTime::now()),
1087 secure_transport: self.secure_transport,
1088 existing_object_tags: None,
1089 request_object_tags: None,
1090 extra: Default::default(),
1091 }
1092 }
1093
1094 /// Helper used by request handlers to enforce the optional policy.
1095 /// Returns `Ok(())` when allowed (or no policy is configured), or an
1096 /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
1097 /// counter on deny.
1098 fn enforce_policy<I>(
1099 &self,
1100 req: &S3Request<I>,
1101 action: &'static str,
1102 bucket: &str,
1103 key: Option<&str>,
1104 ) -> S3Result<()> {
1105 self.enforce_policy_with_extra(req, action, bucket, key, None, None)
1106 }
1107
1108 /// v0.6 #39: variant of [`Self::enforce_policy`] that lets the
1109 /// caller plumb tag context (existing-on-object + on-request) into
1110 /// the policy evaluator. Both arguments default to `None`, in
1111 /// which case the resulting `RequestContext` is identical to
1112 /// [`Self::enforce_policy`]'s — so for handlers that don't deal
1113 /// with tags this is a transparent no-op.
1114 fn enforce_policy_with_extra<I>(
1115 &self,
1116 req: &S3Request<I>,
1117 action: &'static str,
1118 bucket: &str,
1119 key: Option<&str>,
1120 request_tags: Option<&crate::tagging::TagSet>,
1121 existing_tags: Option<&crate::tagging::TagSet>,
1122 ) -> S3Result<()> {
1123 let Some(policy) = self.policy.as_ref() else {
1124 return Ok(());
1125 };
1126 let principal_id = Self::principal_of(req);
1127 let mut ctx = self.request_context(req);
1128 if let Some(t) = request_tags {
1129 ctx.request_object_tags = Some(t.clone());
1130 }
1131 if let Some(t) = existing_tags {
1132 ctx.existing_object_tags = Some(t.clone());
1133 }
1134 let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
1135 if decision.allow {
1136 Ok(())
1137 } else {
1138 crate::metrics::record_policy_denial(action, bucket);
1139 tracing::info!(
1140 action,
1141 bucket,
1142 key = ?key,
1143 principal = ?principal_id,
1144 source_ip = ?ctx.source_ip,
1145 user_agent = ?ctx.user_agent,
1146 secure_transport = ctx.secure_transport,
1147 matched_sid = ?decision.matched_sid,
1148 effect = ?decision.matched_effect,
1149 "S4 policy denied request"
1150 );
1151 Err(S3Error::with_message(
1152 S3ErrorCode::AccessDenied,
1153 format!("denied by S4 policy: {action} on bucket={bucket}"),
1154 ))
1155 }
1156 }
1157
1158 /// テスト用: backend を取り戻す (test helper、production では使わない).
1159 /// v0.6 #40 で `backend` が `Arc<B>` 化したので `Arc::try_unwrap` で
1160 /// 1-clone の場合のみ返す。共有されている (= replication dispatcher が
1161 /// 同じ Arc を持っていて未完了) 場合は `Err` を返さず panic させる
1162 /// (test 用途専用 helper の caller 契約を維持)。
1163 pub fn into_backend(self) -> B {
1164 Arc::try_unwrap(self.backend)
1165 .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
1166 }
1167
    /// Sidecar fast path for Range GETs: fetch from the backend only the
    /// byte span that covers the needed frames (a Range GET against the
    /// stored object), then frame-parse + decompress + slice down to the
    /// client's requested range. This is the **bandwidth-saving** variant
    /// of a Range request (vs. reading the full object and slicing).
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial GET against the backend for just the planned byte span.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Frame parse + decompress. Each frame header carries its own
        // codec id, so the registry dispatches per frame.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        // The plan tells us where the client's range lands inside the
        // decompressed concatenation of the fetched frames.
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Response assembly: 206 Partial Content with a Content-Range
        // expressed over the original (uncompressed) object size.
        // Checksums and ETag are cleared — after decompression + slicing
        // they would no longer match the body we return.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
1276
    /// Write the `<key>.s4index` sidecar object to the backend. A sidecar
    /// failure must not fail the main PUT, so errors are warn-logged only
    /// (the Range GET partial path becomes unavailable for this object,
    /// but the full-read fallback still returns semantically correct
    /// results).
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let sidecar = sidecar_key(key);
        // v0.7 #49: synthetic re-entry URI must be percent-encoded; if
        // the (already legally-arbitrary) S3 key produces something we
        // cannot encode at all, drop the sidecar PUT (the GET path
        // falls back to a full read on a missing sidecar) instead of
        // panicking on `parse().unwrap()`.
        let uri = match safe_object_uri(bucket, &sidecar) {
            Ok(u) => u,
            Err(e) => {
                tracing::warn!(
                    bucket,
                    key,
                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
                );
                return;
            }
        };
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        // Synthetic internal request — no client credentials / region
        // attached.
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }
1327
1328 /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
1329 async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
1330 let sidecar = sidecar_key(key);
1331 // v0.7 #49: same encode-or-bail treatment as write_sidecar.
1332 let uri = safe_object_uri(bucket, &sidecar).ok()?;
1333 let get_input = GetObjectInput {
1334 bucket: bucket.into(),
1335 key: sidecar,
1336 ..Default::default()
1337 };
1338 let get_req = S3Request {
1339 input: get_input,
1340 method: http::Method::GET,
1341 uri,
1342 headers: http::HeaderMap::new(),
1343 extensions: http::Extensions::new(),
1344 credentials: None,
1345 region: None,
1346 service: None,
1347 trailing_headers: None,
1348 };
1349 let resp = self.backend.get_object(get_req).await.ok()?;
1350 let blob = resp.output.body?;
1351 let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
1352 decode_index(bytes).ok()
1353 }
1354
1355 /// Multipart object (frame 列) を解凍 → 元 bytes を再構築。
1356 ///
1357 /// **per-frame codec dispatch**: 各 frame header に codec_id が入っているので、
1358 /// frame ごとに registry が違う codec を呼ぶことができる。同一 object 内で
1359 /// 異なる codec が混在していても透過的に解凍可能 (parquet 風 mixed columns 等)。
1360 async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
1361 let mut out = BytesMut::new();
1362 for frame in FrameIter::new(bytes) {
1363 let (header, payload) = frame.map_err(|e| {
1364 S3Error::with_message(
1365 S3ErrorCode::InternalError,
1366 format!("multipart frame parse: {e}"),
1367 )
1368 })?;
1369 let chunk_manifest = ChunkManifest {
1370 codec: header.codec,
1371 original_size: header.original_size,
1372 compressed_size: header.compressed_size,
1373 crc32c: header.crc32c,
1374 };
1375 let decompressed = self
1376 .registry
1377 .decompress(payload, &chunk_manifest)
1378 .await
1379 .map_err(internal("multipart frame decompress"))?;
1380 out.extend_from_slice(&decompressed);
1381 }
1382 Ok(out.freeze())
1383 }
1384}
1385
1386/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
1387/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
1388/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
1389/// reject the other variants for parity with AWS.
1390fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1391 let rest = s
1392 .strip_prefix("bytes=")
1393 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1394 let (a, b) = rest
1395 .split_once('-')
1396 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1397 let first: u64 = a
1398 .parse()
1399 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1400 let last: u64 = b
1401 .parse()
1402 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1403 if last < first {
1404 return Err(format!("CopySourceRange last < first: {s:?}"));
1405 }
1406 Ok(s3s::dto::Range::Int {
1407 first,
1408 last: Some(last),
1409 })
1410}
1411
/// v0.5 #34: synthesize the backend storage key for a given
/// (logical key, version-id) pair on an Enabled-versioning bucket.
///
/// Uses the `__s4ver__/` infix because:
/// - it's not a substring of `.s4index` / `.s4ver` natural keys (no
///   false-positive listing filter collisions)
/// - the directory-style separator keeps S3 console "browse by prefix" UX
///   intact (versions roll up under one virtual folder per object)
/// - it stays human-readable in debug logs / `aws s3 ls`
///
/// `list_objects` / `list_objects_v2` / `list_object_versions` MUST filter
/// keys containing `.__s4ver__/` from results so customers don't see the
/// internal shadow objects.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    const INFIX: &str = ".__s4ver__/";
    let mut shadow = String::with_capacity(key.len() + INFIX.len() + version_id.len());
    shadow.push_str(key);
    shadow.push_str(INFIX);
    shadow.push_str(version_id);
    shadow
}
1428
/// Test for the marker substring produced by [`versioned_shadow_key`].
/// Cheap substring scan; used both by the list_objects filter and the GET
/// passthrough check.
fn is_versioning_shadow_key(key: &str) -> bool {
    const MARKER: &str = ".__s4ver__/";
    key.contains(MARKER)
}
1434
/// v0.6 #42: wall-clock seconds since the UNIX epoch — fed to
/// `mfa::check_mfa` so the TOTP verifier can match the client
/// authenticator app's view of "now". Falls back to `0` on the
/// (impossible-in-practice) clock-before-1970 path so the verifier
/// rejects instead of panicking.
fn current_unix_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1446
1447/// v0.6 #42: translate an `MfaError` into the matching S3 wire error.
1448///
1449/// - `Missing` / `SerialMismatch` / `InvalidCode` → `403 AccessDenied`
1450/// (S3 spec for MFA Delete: every gating failure surfaces as
1451/// `AccessDenied`, not a separate `MFA*` code).
1452/// - `Malformed` → `400 InvalidRequest` (the request itself is
1453/// syntactically broken, not a permission issue).
1454fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
1455 match e {
1456 crate::mfa::MfaError::Missing => S3Error::with_message(
1457 S3ErrorCode::AccessDenied,
1458 "MFA token required for this operation",
1459 ),
1460 crate::mfa::MfaError::Malformed => S3Error::with_message(
1461 S3ErrorCode::InvalidRequest,
1462 "malformed x-amz-mfa header",
1463 ),
1464 crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
1465 S3ErrorCode::AccessDenied,
1466 "MFA serial does not match configured device",
1467 ),
1468 crate::mfa::MfaError::InvalidCode => S3Error::with_message(
1469 S3ErrorCode::AccessDenied,
1470 "invalid MFA code",
1471 ),
1472 }
1473}
1474
1475fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1476 metadata
1477 .as_ref()
1478 .and_then(|m| m.get(META_MULTIPART))
1479 .map(|v| v == "true")
1480 .unwrap_or(false)
1481}
1482
// S3 object-metadata keys under which S4 stores its compression manifest.
const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
/// Marks an object whose multipart upload used the per-part frame format.
/// GET checks this flag to route the body through the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks a single-PUT object written in the S4F2 framed format.
/// Legacy v0.1 single-PUTs are raw compressed bytes (flag absent). GET
/// checks this flag to route through the framed path (= the same FrameIter
/// parse as multipart).
const META_FRAMED: &str = "s4-framed";
1494
1495fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1496 metadata
1497 .as_ref()
1498 .and_then(|m| m.get(META_FRAMED))
1499 .map(|v| v == "true")
1500 .unwrap_or(false)
1501}
1502
1503/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
1504fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1505 metadata
1506 .as_ref()
1507 .and_then(|m| m.get("s4-encrypted"))
1508 .map(|v| v == "aes-256-gcm")
1509 .unwrap_or(false)
1510}
1511
1512/// v0.5 #27: pull the three SSE-C headers off an input struct. The S3
1513/// contract is "all three or none" — partial sets are a 400.
1514///
1515/// Returns `Ok(None)` when no SSE-C headers were sent (server-managed or
1516/// no encryption), `Ok(Some(material))` on validated client key, and
1517/// `Err` for malformed or partial inputs.
1518fn extract_sse_c_material(
1519 algorithm: &Option<String>,
1520 key: &Option<String>,
1521 md5: &Option<String>,
1522) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
1523 match (algorithm, key, md5) {
1524 (None, None, None) => Ok(None),
1525 (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
1526 .map(Some)
1527 .map_err(sse_c_error_to_s3),
1528 _ => Err(S3Error::with_message(
1529 S3ErrorCode::InvalidRequest,
1530 "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
1531 )),
1532 }
1533}
1534
1535/// v0.5 #28: detect SSE-KMS request — `x-amz-server-side-encryption: aws:kms`.
1536/// Returns the key-id to wrap under, falling back to the gateway default.
1537fn extract_kms_key_id(
1538 sse: &Option<ServerSideEncryption>,
1539 sse_kms_key_id: &Option<String>,
1540 gateway_default: Option<&str>,
1541) -> Option<String> {
1542 let asks_for_kms = sse
1543 .as_ref()
1544 .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1545 .unwrap_or(false);
1546 if !asks_for_kms {
1547 return None;
1548 }
1549 sse_kms_key_id
1550 .clone()
1551 .or_else(|| gateway_default.map(str::to_owned))
1552}
1553
1554/// v0.5 #28: map kms module errors to AWS-shaped S3 error codes.
1555/// `KeyNotFound` is operator misconfig (400); `BackendUnavailable` is a
1556/// transient KMS outage (503). Other variants are 500 InternalError.
1557fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
1558 use crate::kms::KmsError as K;
1559 match e {
1560 K::KeyNotFound { key_id } => S3Error::with_message(
1561 S3ErrorCode::InvalidArgument,
1562 format!("KMS key not found: {key_id}"),
1563 ),
1564 K::BackendUnavailable { message } => S3Error::with_message(
1565 S3ErrorCode::ServiceUnavailable,
1566 format!("KMS backend unavailable: {message}"),
1567 ),
1568 other => S3Error::with_message(
1569 S3ErrorCode::InternalError,
1570 format!("KMS error: {other}"),
1571 ),
1572 }
1573}
1574
1575/// v0.5 #27: map sse module errors to AWS-shaped S3 error codes.
1576/// `WrongCustomerKey` → 403 AccessDenied (matches AWS behaviour);
1577/// `InvalidCustomerKey` / algorithm / required / unexpected → 400.
1578fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
1579 use crate::sse::SseError as E;
1580 match e {
1581 E::WrongCustomerKey => S3Error::with_message(
1582 S3ErrorCode::AccessDenied,
1583 "SSE-C key does not match the key used at PUT time",
1584 ),
1585 E::InvalidCustomerKey { reason } => S3Error::with_message(
1586 S3ErrorCode::InvalidArgument,
1587 format!("SSE-C: {reason}"),
1588 ),
1589 E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
1590 S3ErrorCode::InvalidArgument,
1591 format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
1592 ),
1593 E::CustomerKeyRequired => S3Error::with_message(
1594 S3ErrorCode::InvalidRequest,
1595 "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
1596 ),
1597 E::CustomerKeyUnexpected => S3Error::with_message(
1598 S3ErrorCode::InvalidRequest,
1599 "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
1600 ),
1601 other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
1602 }
1603}
1604
1605fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
1606 let m = metadata.as_ref()?;
1607 let codec = m
1608 .get(META_CODEC)
1609 .and_then(|s| s.parse::<CodecKind>().ok())?;
1610 let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
1611 let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
1612 let crc32c = m.get(META_CRC32C)?.parse().ok()?;
1613 Some(ChunkManifest {
1614 codec,
1615 original_size,
1616 compressed_size,
1617 crc32c,
1618 })
1619}
1620
1621fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1622 let meta = metadata.get_or_insert_with(Default::default);
1623 meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1624 meta.insert(
1625 META_ORIGINAL_SIZE.into(),
1626 manifest.original_size.to_string(),
1627 );
1628 meta.insert(
1629 META_COMPRESSED_SIZE.into(),
1630 manifest.compressed_size.to_string(),
1631 );
1632 meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1633}
1634
1635fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1636 move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1637}
1638
1639/// v0.6 #41: map a `select::SelectError` to the S3 error surface. AWS
1640/// uses a domain-specific `InvalidSqlExpression` code for parse / unsupported
1641/// errors, but s3s 0.13 doesn't expose that as a typed variant — we
1642/// fall back to the well-known `InvalidRequest` 400 with a descriptive
1643/// message that includes the original error context.
1644fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
1645 use crate::select::SelectError;
1646 match e {
1647 SelectError::Parse(msg) => S3Error::with_message(
1648 S3ErrorCode::InvalidRequest,
1649 format!("SQL parse error: {msg}"),
1650 ),
1651 SelectError::UnsupportedFeature(msg) => S3Error::with_message(
1652 S3ErrorCode::InvalidRequest,
1653 format!("unsupported SQL feature: {msg}"),
1654 ),
1655 SelectError::RowEval(msg) => S3Error::with_message(
1656 S3ErrorCode::InvalidRequest,
1657 format!("SQL row evaluation error: {msg}"),
1658 ),
1659 SelectError::InputFormat(msg) => S3Error::with_message(
1660 S3ErrorCode::InvalidRequest,
1661 format!("{fmt} input format error: {msg}"),
1662 ),
1663 }
1664}
1665
1666/// v0.5 #30: parse the `x-amz-bypass-governance-retention` header into a
1667/// boolean flag. AWS S3 accepts `true` (case-insensitive); any other value
1668/// (including missing) is treated as `false`.
1669fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1670 headers
1671 .get("x-amz-bypass-governance-retention")
1672 .and_then(|v| v.to_str().ok())
1673 .map(|s| s.eq_ignore_ascii_case("true"))
1674 .unwrap_or(false)
1675}
1676
1677/// Convert s3s `Timestamp` into a `chrono::DateTime<Utc>` by formatting it
1678/// as an RFC3339 string and re-parsing through `chrono`. The string format
1679/// avoids pulling the `time` crate (transitive dep of s3s, not declared by
1680/// s4-server) into our direct deps. Returns `None` if the format/parse fails
1681/// or the value is outside `chrono`'s supported range.
1682fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
1683 let mut buf = Vec::new();
1684 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
1685 let s = std::str::from_utf8(&buf).ok()?;
1686 chrono::DateTime::parse_from_rfc3339(s)
1687 .ok()
1688 .map(|dt| dt.with_timezone(&chrono::Utc))
1689}
1690
1691/// Inverse of [`timestamp_to_chrono_utc`] — emit RFC3339 (the s3s
1692/// `DateTime` wire format) and re-parse via `Timestamp::parse`.
1693fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
1694 // chrono's RFC3339 output format matches s3s' parser ("...Z" with
1695 // optional sub-second precision). Fall back to UNIX_EPOCH if anything
1696 // unexpected happens — we never produce malformed strings, so this
1697 // branch is unreachable in practice.
1698 let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1699 Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
1700}
1701
1702/// v0.6 #39: convert our internal [`crate::tagging::TagSet`] into the
1703/// s3s `Vec<Tag>` wire shape used on `GetObject/BucketTaggingOutput`.
1704/// Both halves of every pair land in the `Some(_)` slot — AWS marks
1705/// the field optional but always populates it on response.
1706fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1707 set.iter()
1708 .map(|(k, v)| Tag {
1709 key: Some(k.clone()),
1710 value: Some(v.clone()),
1711 })
1712 .collect()
1713}
1714
1715/// v0.6 #39: inverse of [`tagset_to_aws`] for input handlers. Missing
1716/// keys / values become empty strings (mirrors AWS, which rejects
1717/// `<Key/>` with InvalidTag at the parser layer; downstream
1718/// `TagSet::validate` then enforces our size limits).
1719fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
1720 let pairs = tags
1721 .iter()
1722 .map(|t| {
1723 (
1724 t.key.clone().unwrap_or_default(),
1725 t.value.clone().unwrap_or_default(),
1726 )
1727 })
1728 .collect();
1729 crate::tagging::TagSet::from_pairs(pairs)
1730}
1731
1732/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
1733/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
1734/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
1735pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1736 if total == 0 {
1737 return Err("cannot range-get zero-length object".into());
1738 }
1739 match range {
1740 s3s::dto::Range::Int { first, last } => {
1741 let start = *first;
1742 let end_inclusive = match last {
1743 Some(l) => (*l).min(total - 1),
1744 None => total - 1,
1745 };
1746 if start > end_inclusive || start >= total {
1747 return Err(format!(
1748 "range bytes={start}-{:?} out of object size {total}",
1749 last
1750 ));
1751 }
1752 Ok((start, end_inclusive + 1))
1753 }
1754 s3s::dto::Range::Suffix { length } => {
1755 let len = (*length).min(total);
1756 Ok((total - len, total))
1757 }
1758 }
1759}
1760
1761#[async_trait::async_trait]
1762impl<B: S3> S3 for S4Service<B> {
1763 // === 圧縮を挟む path (PUT) ===
1764 #[tracing::instrument(
1765 name = "s4.put_object",
1766 skip(self, req),
1767 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1768 )]
1769 async fn put_object(
1770 &self,
1771 mut req: S3Request<PutObjectInput>,
1772 ) -> S3Result<S3Response<PutObjectOutput>> {
1773 let put_start = Instant::now();
1774 let put_bucket = req.input.bucket.clone();
1775 let put_key = req.input.key.clone();
1776 let access_preamble = self.access_log_preamble(&req);
1777 self.enforce_rate_limit(&req, &put_bucket)?;
1778 // v0.6 #39: parse `x-amz-tagging` (URL-encoded query string) so
1779 // the IAM policy gate sees the request's tags via
1780 // `s3:RequestObjectTag/<key>`. `existing_object_tags` is also
1781 // resolved from the Tagging manager (when wired) so
1782 // `s3:ExistingObjectTag/<key>` works on overwrite.
1783 let request_tags: Option<crate::tagging::TagSet> = req
1784 .input
1785 .tagging
1786 .as_deref()
1787 .map(crate::tagging::parse_tagging_header)
1788 .transpose()
1789 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1790 let existing_tags: Option<crate::tagging::TagSet> = self
1791 .tagging
1792 .as_ref()
1793 .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1794 self.enforce_policy_with_extra(
1795 &req,
1796 "s3:PutObject",
1797 &put_bucket,
1798 Some(&put_key),
1799 request_tags.as_ref(),
1800 existing_tags.as_ref(),
1801 )?;
1802 // v0.5 #30: an Object Lock-protected key cannot be overwritten by
1803 // a non-versioned PUT (Suspended / Unversioned bucket). Enabled
1804 // bucket PUTs are exempt because they materialise a fresh
1805 // version under a shadow key (`<key>.__s4ver__/<vid>`) — the
1806 // locked version's bytes are untouched. The check mirrors the
1807 // delete path (Compliance never bypassable, Governance via the
1808 // bypass header, legal hold never).
1809 if let Some(mgr) = self.object_lock.as_ref()
1810 && let Some(state) = mgr.get(&put_bucket, &put_key)
1811 {
1812 let bucket_versioned_enabled = self
1813 .versioning
1814 .as_ref()
1815 .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1816 .unwrap_or(false);
1817 if !bucket_versioned_enabled {
1818 let bypass = parse_bypass_governance_header(&req.headers);
1819 let now = chrono::Utc::now();
1820 if !state.can_delete(now, bypass) {
1821 crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1822 return Err(S3Error::with_message(
1823 S3ErrorCode::AccessDenied,
1824 "Access Denied because object protected by object lock",
1825 ));
1826 }
1827 }
1828 }
1829 // v0.5 #30: per-PUT explicit retention / legal hold (S3
1830 // `x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
1831 // `x-amz-object-lock-legal-hold`). Captured before the body
1832 // moves into the backend; persisted into the manager only on
1833 // backend success below.
1834 let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1835 .input
1836 .object_lock_mode
1837 .as_ref()
1838 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1839 let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1840 .input
1841 .object_lock_retain_until_date
1842 .as_ref()
1843 .and_then(timestamp_to_chrono_utc);
1844 let explicit_legal_hold_on: Option<bool> = req
1845 .input
1846 .object_lock_legal_hold_status
1847 .as_ref()
1848 .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1849 if let Some(blob) = req.input.body.take() {
1850 // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
1851 // compress fast path、そうでなければ従来の collect-then-compress。
1852 let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1853 .await
1854 .map_err(internal("peek put sample"))?;
1855 let sample_len = sample.len().min(SAMPLE_BYTES);
1856 // v0.8 #56: pass the request's Content-Length (when present) so
1857 // the sampling dispatcher can promote large objects to a GPU
1858 // codec. Chunked transfers (no Content-Length) keep CPU.
1859 let total_size_hint = req.input.content_length.and_then(|n| u64::try_from(n).ok());
1860 let kind = self
1861 .dispatcher
1862 .pick_with_size_hint(&sample[..sample_len], total_size_hint)
1863 .await;
1864
1865 // Passthrough buys nothing from S4F2 wrapping (no compression =
1866 // no per-chunk frame to skip past) and the +28-byte header
1867 // overhead breaks size-sensitive callers that expect a true
1868 // pass-through. So passthrough always uses the legacy raw-blob
1869 // path; only compressing codecs go through the framed path.
1870 let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1871 let (compressed, manifest, is_framed) = if use_framed {
1872 // streaming fast path: input は memory に collect しない
1873 let chained = chain_sample_with_rest(sample, rest_stream);
1874 debug!(
1875 bucket = ?req.input.bucket,
1876 key = ?req.input.key,
1877 codec = kind.as_str(),
1878 path = "streaming-framed",
1879 "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1880 );
1881 // v0.4 #16: pick the chunk size based on the request's
1882 // Content-Length when known, falling back to the 4 MiB
1883 // default for chunked transfers.
1884 let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1885 let (body, manifest) = streaming_compress_to_frames(
1886 chained,
1887 Arc::clone(&self.registry),
1888 kind,
1889 chunk_size,
1890 )
1891 .await
1892 .map_err(internal("streaming framed compress"))?;
1893 (body, manifest, true)
1894 } else {
1895 // GPU codec 等で streaming-aware でないものは bytes-buffered path
1896 // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
1897 let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1898 .await
1899 .map_err(internal("collect put body (buffered path)"))?;
1900 debug!(
1901 bucket = ?req.input.bucket,
1902 key = ?req.input.key,
1903 bytes = bytes.len(),
1904 codec = kind.as_str(),
1905 path = "buffered",
1906 "S4 put_object: compressing (buffered, raw blob)"
1907 );
1908 // v0.8 #55: telemetry-returning compress so we can stamp
1909 // GPU-pipeline Prometheus metrics (`s4_gpu_compress_seconds`,
1910 // throughput gauge, OOM counter) for nvcomp / dietgpu codecs.
1911 // CPU codecs come back with `gpu_seconds = None` and the
1912 // stamp helper short-circuits — no extra cost on CPU path.
1913 let (compress_res, tel) =
1914 self.registry.compress_with_telemetry(bytes, kind).await;
1915 stamp_gpu_compress_telemetry(&tel);
1916 let (body, m) = compress_res.map_err(internal("registry compress"))?;
1917 (body, m, false)
1918 };
1919
1920 write_manifest(&mut req.input.metadata, &manifest);
1921 if is_framed {
1922 // v0.2 #4: framed body であることを GET 側に伝える meta flag。
1923 req.input
1924 .metadata
1925 .get_or_insert_with(Default::default)
1926 .insert(META_FRAMED.into(), "true".into());
1927 }
1928 // 重要: content_length を圧縮後サイズで更新する。
1929 // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
1930 // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
1931 req.input.content_length = Some(compressed.len() as i64);
1932 // body を書き換えたので、客側が送ってきた original body 用の
1933 // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
1934 // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
1935 // ChunkManifest.crc32c で担保している。
1936 req.input.checksum_algorithm = None;
1937 req.input.checksum_crc32 = None;
1938 req.input.checksum_crc32c = None;
1939 req.input.checksum_crc64nvme = None;
1940 req.input.checksum_sha1 = None;
1941 req.input.checksum_sha256 = None;
1942 req.input.content_md5 = None;
1943 let original_size = manifest.original_size;
1944 let compressed_size = manifest.compressed_size;
1945 let codec_label = manifest.codec.as_str();
1946 // framed body は GET 側で sidecar partial-fetch を効かせるため
1947 // build_index_from_body で sidecar を組み立てて backend に PUT する。
1948 let sidecar_index = if is_framed {
1949 s4_codec::index::build_index_from_body(&compressed).ok()
1950 } else {
1951 None
1952 };
1953 // v0.4 #21 / v0.5 #29 / v0.5 #27: encrypt-after-compress.
1954 // Precedence:
1955 // - SSE-C headers present → per-request customer key (S4E3)
1956 // - server-managed keyring configured → active key (S4E2)
1957 // - neither → no encryption (raw compressed body)
1958 // The `s4-encrypted: aes-256-gcm` metadata flag is set in
1959 // both encrypted modes; the on-disk frame magic distinguishes
1960 // S4E1 / S4E2 / S4E3 so GET picks the right decrypt path.
1961 // v0.7 #48 BUG-2/3 fix: take() the SSE fields off req.input
1962 // so the encryption headers are NOT forwarded to the
1963 // backend. S4 owns the encrypt-then-store contract; if we
1964 // leave the headers in place, real S3-compat backends
1965 // (MinIO / AWS) try to apply their own SSE on top and
1966 // either reject (MinIO requires HTTPS for SSE-C) or fail
1967 // (MinIO has no KMS configured). MemoryBackend ignored
1968 // these so mock tests passed.
1969 let sse_c_alg = req.input.sse_customer_algorithm.take();
1970 let sse_c_key = req.input.sse_customer_key.take();
1971 let sse_c_md5 = req.input.sse_customer_key_md5.take();
1972 let sse_header = req.input.server_side_encryption.take();
1973 let sse_kms_key = req.input.ssekms_key_id.take();
1974 let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
1975 // v0.5 #28: SSE-KMS request? Resolves to None unless the
1976 // request asks for `aws:kms` AND a key id is available
1977 // (explicit header or gateway default). When set, we'll
1978 // generate a per-object DEK below.
1979 let kms_key_id = extract_kms_key_id(
1980 &sse_header,
1981 &sse_kms_key,
1982 self.kms_default_key_id.as_deref(),
1983 );
1984 // v0.5 #32: in compliance-strict mode, every PUT must
1985 // declare SSE — either client-supplied (SSE-C), KMS, or by
1986 // virtue of a server-side keyring being configured (which
1987 // applies SSE-S4 to every PUT automatically). Requests that
1988 // would otherwise land as plain compressed bytes are
1989 // rejected with 400 InvalidRequest.
1990 if self.compliance_strict
1991 && sse_c_material.is_none()
1992 && kms_key_id.is_none()
1993 && self.sse_keyring.is_none()
1994 && sse_header.as_ref().map(|s| s.as_str())
1995 != Some(ServerSideEncryption::AES256)
1996 {
1997 return Err(S3Error::with_message(
1998 S3ErrorCode::InvalidRequest,
1999 "compliance-mode strict: PUT must include x-amz-server-side-encryption \
2000 (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
2001 ));
2002 }
2003 // SSE-C and SSE-KMS are mutually exclusive on a single PUT
2004 // (AWS S3 returns 400 InvalidArgument). SSE-C wins by spec.
2005 if sse_c_material.is_some() && kms_key_id.is_some() {
2006 return Err(S3Error::with_message(
2007 S3ErrorCode::InvalidArgument,
2008 "SSE-C and SSE-KMS cannot be used together on the same PUT",
2009 ));
2010 }
2011 // KMS path needs to call generate_dek().await before the
2012 // body_to_send branch; capture the result here.
2013 //
2014 // v0.8.1 #58: the plaintext DEK lives in three places
2015 // during one PUT:
2016 //
2017 // 1. The `Zeroizing<Vec<u8>>` returned by `generate_dek`
2018 // — wiped when the binding `dek` falls out of scope at
2019 // the end of this `if`-arm.
2020 // 2. The stack `[u8; 32]` we copy into for `SseSource::Kms`
2021 // — wrapped in `Zeroizing<[u8; 32]>` so it's wiped when
2022 // the outer `kms_wrap` `Option` is dropped at the end
2023 // of `put_object`.
2024 // 3. AES-GCM internal key state inside the `aes-gcm`
2025 // crate during `encrypt_with_source` — out of scope
2026 // for this fix; tracked separately in v0.8.2.
2027 let kms_wrap: Option<(zeroize::Zeroizing<[u8; 32]>, crate::kms::WrappedDek)> =
2028 if let Some(ref key_id) = kms_key_id {
2029 let kms = self.kms.as_ref().ok_or_else(|| {
2030 S3Error::with_message(
2031 S3ErrorCode::InvalidRequest,
2032 "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
2033 )
2034 })?;
2035 // `dek` is `Zeroizing<Vec<u8>>`; deref + slice access
2036 // works unchanged via `Deref<Target=Vec<u8>>`.
2037 let (dek, wrapped) = kms
2038 .generate_dek(key_id)
2039 .await
2040 .map_err(kms_error_to_s3)?;
2041 if dek.len() != 32 {
2042 return Err(S3Error::with_message(
2043 S3ErrorCode::InternalError,
2044 format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
2045 ));
2046 }
2047 let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
2048 zeroize::Zeroizing::new([0u8; 32]);
2049 dek_arr.copy_from_slice(&dek);
2050 // `dek` (the `Zeroizing<Vec<u8>>`) is dropped at the
2051 // end of this scope, wiping the heap allocation.
2052 Some((dek_arr, wrapped))
2053 } else {
2054 None
2055 };
2056 // v0.7 #48 BUG-4 fix: stamp the SSE *type* into metadata
2057 // alongside `s4-encrypted` so HEAD (which doesn't fetch the
2058 // body) can echo the correct `x-amz-server-side-encryption`
2059 // value. Without this, HEAD on an SSE-KMS object would not
2060 // echo `aws:kms` because the frame magic is only available
2061 // on the body (which HEAD doesn't read).
2062 let body_to_send = if let Some(ref m) = sse_c_material {
2063 let meta = req.input.metadata.get_or_insert_with(Default::default);
2064 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2065 meta.insert("s4-sse-type".into(), "AES256".into());
2066 meta.insert("s4-sse-c-key-md5".into(),
2067 base64::engine::general_purpose::STANDARD.encode(m.key_md5));
2068 crate::sse::encrypt_with_source(
2069 &compressed,
2070 crate::sse::SseSource::CustomerKey {
2071 key: &m.key,
2072 key_md5: &m.key_md5,
2073 },
2074 )
2075 } else if let Some((ref dek, ref wrapped)) = kms_wrap {
2076 let meta = req.input.metadata.get_or_insert_with(Default::default);
2077 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2078 meta.insert("s4-sse-type".into(), "aws:kms".into());
2079 meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
2080 // v0.8.1 #58: `dek` is `&Zeroizing<[u8; 32]>`; `SseSource::Kms`
2081 // wants `&[u8; 32]`. Rust auto-derefs `&Zeroizing<T>` to
2082 // `&T` here via `Deref<Target=T>`, so the binding picks
2083 // up the inner array reference without copying. The array
2084 // stays in the `Zeroizing` wrapper that owns it and gets
2085 // wiped when `kms_wrap` drops at the end of `put_object`.
2086 let dek_ref: &[u8; 32] = dek;
2087 crate::sse::encrypt_with_source(
2088 &compressed,
2089 crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
2090 )
2091 } else if let Some(keyring) = self.sse_keyring.as_ref() {
2092 // SSE-S4 is server-driven transparent encryption; the
2093 // client didn't ask for SSE. We stamp `s4-encrypted`
2094 // (internal flag the GET path needs) but deliberately
2095 // do NOT stamp `s4-sse-type` — that lights up the HEAD
2096 // echo of `x-amz-server-side-encryption: AES256`,
2097 // which would falsely advertise AWS-style SSE-S3
2098 // semantics the operator didn't request.
2099 let meta = req.input.metadata.get_or_insert_with(Default::default);
2100 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2101 // v0.8 #52: when `--sse-chunk-size > 0` is configured,
2102 // emit the chunked S4E5 frame so the matching GET can
2103 // stream-decrypt instead of buffering 5 GiB before
2104 // emitting a byte. Falls back to the buffered S4E2
2105 // frame at chunk_size=0 (default) so existing
2106 // deployments are bit-for-bit unchanged.
2107 if self.sse_chunk_size > 0 {
2108 crate::sse::encrypt_v2_chunked(
2109 &compressed,
2110 keyring,
2111 self.sse_chunk_size,
2112 )
2113 .map_err(|e| {
2114 S3Error::with_message(
2115 S3ErrorCode::InternalError,
2116 format!("SSE-S4 chunked encrypt failed: {e}"),
2117 )
2118 })?
2119 } else {
2120 crate::sse::encrypt_v2(&compressed, keyring)
2121 }
2122 } else {
2123 compressed.clone()
2124 };
2125 // v0.6 #40: capture the about-to-be-sent body + metadata so
2126 // the replication dispatcher (run after the source PUT
2127 // succeeds) can hand the same backend bytes to the
2128 // destination bucket. `Bytes` clone is cheap (refcounted).
2129 let replication_body = body_to_send.clone();
2130 let replication_metadata = req.input.metadata.clone();
2131 // v0.7 #48 BUG-1 fix: SSE encryption (S4E1/E2/E3/E4 frames)
2132 // makes the body longer than the post-compression bytes
2133 // (header + nonce + tag overhead). The earlier
2134 // content_length stamp at compressed.len() is now stale, so
2135 // re-stamp from the actual bytes about to be sent or the
2136 // backend (real S3 / MinIO) rejects with
2137 // `StreamLengthMismatch`. MemoryBackend never validated
2138 // this, which is why mock-only tests passed.
2139 req.input.content_length = Some(body_to_send.len() as i64);
2140 req.input.body = Some(bytes_to_blob(body_to_send));
2141 // v0.5 #34: pre-allocate a version-id when the bucket is
2142 // Enabled, then redirect the backend storage key to the
2143 // shadow path so older versions survive newer PUTs.
2144 // Suspended / Unversioned buckets keep using the plain
2145 // `<key>` (S3 spec: Suspended overwrites the same backend
2146 // object). Pre-allocation (instead of recording after PUT)
2147 // ensures the shadow key + the response's
2148 // `x-amz-version-id` use the same vid.
2149 let pending_version: Option<crate::versioning::PutOutcome> = self
2150 .versioning
2151 .as_ref()
2152 .map(|mgr| mgr.state(&put_bucket))
2153 .map(|state| match state {
2154 crate::versioning::VersioningState::Enabled => {
2155 crate::versioning::PutOutcome {
2156 version_id: crate::versioning::VersioningManager::new_version_id(),
2157 versioned_response: true,
2158 }
2159 }
2160 crate::versioning::VersioningState::Suspended
2161 | crate::versioning::VersioningState::Unversioned => {
2162 crate::versioning::PutOutcome {
2163 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2164 versioned_response: false,
2165 }
2166 }
2167 });
2168 if let Some(ref pv) = pending_version
2169 && pv.versioned_response
2170 {
2171 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2172 }
2173 let mut backend_resp = self.backend.put_object(req).await;
2174 if let Some(idx) = sidecar_index
2175 && backend_resp.is_ok()
2176 && idx.entries.len() > 1
2177 {
2178 // 1 chunk しかない (small object) なら sidecar は意味がない (=
2179 // partial fetch しても full body と同じ範囲) ので省略。
2180 // Sidecar は user-visible key で書く (latest version の
2181 // partial fetch path 用)。Old versions の Range GET は今 task
2182 // の scope 外 (full read fallback でも意味的には正しい)。
2183 self.write_sidecar(&put_bucket, &put_key, &idx).await;
2184 }
2185 // v0.5 #34: commit the new version into the manager only on
2186 // backend success. Use the pre-allocated vid so the response
2187 // header and the chain entry agree.
2188 if let (Some(mgr), Some(pv), Ok(resp)) = (
2189 self.versioning.as_ref(),
2190 pending_version.as_ref(),
2191 backend_resp.as_mut(),
2192 ) {
2193 let etag = resp
2194 .output
2195 .e_tag
2196 .clone()
2197 .map(ETag::into_value)
2198 .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
2199 let now = chrono::Utc::now();
2200 mgr.commit_put_with_version(
2201 &put_bucket,
2202 &put_key,
2203 crate::versioning::VersionEntry {
2204 version_id: pv.version_id.clone(),
2205 etag,
2206 size: original_size,
2207 is_delete_marker: false,
2208 created_at: now,
2209 },
2210 );
2211 if pv.versioned_response {
2212 resp.output.version_id = Some(pv.version_id.clone());
2213 }
2214 }
2215 // v0.5 #27: AWS S3 echoes the SSE-C headers back on success
2216 // so the client knows the server actually applied the
2217 // requested algorithm and which key fingerprint matched.
2218 if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
2219 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2220 resp.output.sse_customer_key_md5 = Some(
2221 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2222 );
2223 }
2224 // v0.5 #28: SSE-KMS echo — `aws:kms` + the canonical key id
2225 // the backend returned (AWS KMS returns the ARN even when
2226 // the request used an alias).
2227 if let (Some((_, wrapped)), Ok(resp)) =
2228 (kms_wrap.as_ref(), backend_resp.as_mut())
2229 {
2230 resp.output.server_side_encryption =
2231 Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
2232 resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
2233 }
2234 // v0.5 #30: persist any per-PUT explicit retention / legal
2235 // hold the client supplied, then auto-apply the bucket
2236 // default (no-op when state is already populated). The
2237 // explicit fields take precedence — the bucket-default
2238 // helper bails out as soon as it sees any retention.
2239 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2240 if explicit_lock_mode.is_some()
2241 || explicit_retain_until.is_some()
2242 || explicit_legal_hold_on.is_some()
2243 {
2244 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2245 if let Some(m) = explicit_lock_mode {
2246 state.mode = Some(m);
2247 }
2248 if let Some(u) = explicit_retain_until {
2249 state.retain_until = Some(u);
2250 }
2251 if let Some(lh) = explicit_legal_hold_on {
2252 state.legal_hold_on = lh;
2253 }
2254 mgr.set(&put_bucket, &put_key, state);
2255 }
2256 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2257 }
2258 let _ = (original_size, compressed_size); // mute unused warnings
2259 let elapsed = put_start.elapsed();
2260 crate::metrics::record_put(
2261 codec_label,
2262 original_size,
2263 compressed_size,
2264 elapsed.as_secs_f64(),
2265 backend_resp.is_ok(),
2266 );
2267 // v0.4 #20: structured access-log entry (best-effort).
2268 self.record_access(
2269 access_preamble,
2270 "REST.PUT.OBJECT",
2271 &put_bucket,
2272 Some(&put_key),
2273 if backend_resp.is_ok() { 200 } else { 500 },
2274 compressed_size,
2275 original_size,
2276 elapsed.as_millis() as u64,
2277 backend_resp.as_ref().err().map(|e| e.code().as_str()),
2278 )
2279 .await;
2280 info!(
2281 op = "put_object",
2282 bucket = %put_bucket,
2283 key = %put_key,
2284 codec = codec_label,
2285 bytes_in = original_size,
2286 bytes_out = compressed_size,
2287 ratio = format!(
2288 "{:.3}",
2289 if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
2290 ),
2291 latency_ms = elapsed.as_millis() as u64,
2292 ok = backend_resp.is_ok(),
2293 "S4 put completed"
2294 );
2295 // v0.6 #35: fire bucket-notification destinations (best-effort,
2296 // detached). Skipped when no manager is attached or when the
2297 // bucket has no rule matching `s3:ObjectCreated:Put` for this
2298 // key.
2299 if backend_resp.is_ok()
2300 && let Some(mgr) = self.notifications.as_ref()
2301 {
2302 let dests = mgr.match_destinations(
2303 &put_bucket,
2304 &crate::notifications::EventType::ObjectCreatedPut,
2305 &put_key,
2306 );
2307 if !dests.is_empty() {
2308 let etag = backend_resp
2309 .as_ref()
2310 .ok()
2311 .and_then(|r| r.output.e_tag.clone())
2312 .map(ETag::into_value);
2313 let version_id = pending_version
2314 .as_ref()
2315 .filter(|pv| pv.versioned_response)
2316 .map(|pv| pv.version_id.clone());
2317 tokio::spawn(crate::notifications::dispatch_event(
2318 Arc::clone(mgr),
2319 put_bucket.clone(),
2320 put_key.clone(),
2321 crate::notifications::EventType::ObjectCreatedPut,
2322 Some(original_size),
2323 etag,
2324 version_id,
2325 format!("S4-{}", uuid::Uuid::new_v4()),
2326 ));
2327 }
2328 }
2329 // v0.6 #39: persist parsed `x-amz-tagging` tags into the
2330 // tagging manager on a successful PUT. AWS PutObject's
2331 // tagging is a full-replace operation (not a merge), so
2332 // any pre-existing entry for `(bucket, key)` is overwritten.
2333 if backend_resp.is_ok()
2334 && let (Some(mgr), Some(tags)) =
2335 (self.tagging.as_ref(), request_tags.clone())
2336 {
2337 mgr.put_object_tags(&put_bucket, &put_key, tags);
2338 }
2339 // v0.6 #40: cross-bucket replication fire-point. On
2340 // successful source PUT, consult the replication manager;
2341 // when an enabled rule matches, mark the source key
2342 // `Pending` and spawn a detached task that PUTs the same
2343 // backend bytes + metadata to the rule's destination
2344 // bucket. The dispatcher itself records `Completed` /
2345 // `Failed` and bumps the drop counter on retry-budget
2346 // exhaustion.
2347 self.spawn_replication_if_matched(
2348 &put_bucket,
2349 &put_key,
2350 &request_tags,
2351 &replication_body,
2352 &replication_metadata,
2353 backend_resp.is_ok(),
2354 pending_version.as_ref(),
2355 );
2356 return backend_resp;
2357 }
2358 // Body-less PUT (rare: zero-length object). Mirror the body-full
2359 // versioning hooks so list_object_versions / GET-by-version still see
2360 // empty-body objects in the chain.
2361 let pending_version: Option<crate::versioning::PutOutcome> = self
2362 .versioning
2363 .as_ref()
2364 .map(|mgr| mgr.state(&put_bucket))
2365 .map(|state| match state {
2366 crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2367 version_id: crate::versioning::VersioningManager::new_version_id(),
2368 versioned_response: true,
2369 },
2370 _ => crate::versioning::PutOutcome {
2371 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2372 versioned_response: false,
2373 },
2374 });
2375 if let Some(ref pv) = pending_version
2376 && pv.versioned_response
2377 {
2378 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2379 }
2380 let mut backend_resp = self.backend.put_object(req).await;
2381 if let (Some(mgr), Some(pv), Ok(resp)) = (
2382 self.versioning.as_ref(),
2383 pending_version.as_ref(),
2384 backend_resp.as_mut(),
2385 ) {
2386 let etag = resp
2387 .output
2388 .e_tag
2389 .clone()
2390 .map(ETag::into_value)
2391 .unwrap_or_default();
2392 let now = chrono::Utc::now();
2393 mgr.commit_put_with_version(
2394 &put_bucket,
2395 &put_key,
2396 crate::versioning::VersionEntry {
2397 version_id: pv.version_id.clone(),
2398 etag,
2399 size: 0,
2400 is_delete_marker: false,
2401 created_at: now,
2402 },
2403 );
2404 if pv.versioned_response {
2405 resp.output.version_id = Some(pv.version_id.clone());
2406 }
2407 }
2408 // v0.5 #30: same explicit-then-default lock-state commit as the
2409 // body-bearing branch above, so a zero-length PUT also picks up
2410 // bucket-default retention.
2411 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2412 if explicit_lock_mode.is_some()
2413 || explicit_retain_until.is_some()
2414 || explicit_legal_hold_on.is_some()
2415 {
2416 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2417 if let Some(m) = explicit_lock_mode {
2418 state.mode = Some(m);
2419 }
2420 if let Some(u) = explicit_retain_until {
2421 state.retain_until = Some(u);
2422 }
2423 if let Some(lh) = explicit_legal_hold_on {
2424 state.legal_hold_on = lh;
2425 }
2426 mgr.set(&put_bucket, &put_key, state);
2427 }
2428 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2429 }
2430 // v0.6 #35: same notification fire-point as the body-bearing PUT
2431 // branch above (zero-length objects still match `ObjectCreated:Put`
2432 // rules per the AWS event taxonomy).
2433 if backend_resp.is_ok()
2434 && let Some(mgr) = self.notifications.as_ref()
2435 {
2436 let dests = mgr.match_destinations(
2437 &put_bucket,
2438 &crate::notifications::EventType::ObjectCreatedPut,
2439 &put_key,
2440 );
2441 if !dests.is_empty() {
2442 let etag = backend_resp
2443 .as_ref()
2444 .ok()
2445 .and_then(|r| r.output.e_tag.clone())
2446 .map(ETag::into_value);
2447 let version_id = pending_version
2448 .as_ref()
2449 .filter(|pv| pv.versioned_response)
2450 .map(|pv| pv.version_id.clone());
2451 tokio::spawn(crate::notifications::dispatch_event(
2452 Arc::clone(mgr),
2453 put_bucket.clone(),
2454 put_key.clone(),
2455 crate::notifications::EventType::ObjectCreatedPut,
2456 Some(0),
2457 etag,
2458 version_id,
2459 format!("S4-{}", uuid::Uuid::new_v4()),
2460 ));
2461 }
2462 }
2463 // v0.6 #39: persist parsed `x-amz-tagging` for the body-less
2464 // (zero-length) PUT branch too — same shape as the body-bearing
2465 // branch above.
2466 if backend_resp.is_ok()
2467 && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2468 {
2469 mgr.put_object_tags(&put_bucket, &put_key, tags);
2470 }
2471 // v0.6 #40: cross-bucket replication for the zero-length PUT
2472 // branch — same shape as the body-bearing branch above.
2473 // v0.8.2 #61: pass `pending_version` so a versioned source's
2474 // destination receives the same shadow-key path.
2475 self.spawn_replication_if_matched(
2476 &put_bucket,
2477 &put_key,
2478 &request_tags,
2479 &bytes::Bytes::new(),
2480 &None,
2481 backend_resp.is_ok(),
2482 pending_version.as_ref(),
2483 );
2484 backend_resp
2485 }
2486
    // === Decompression path (GET) ===
    //
    // Fetches from the backend, then reverses whatever S4 applied on PUT:
    // SSE decrypt (when the object is encrypted) → frame parse / decompress
    // (streaming when the codec allows it, buffered otherwise) → optional
    // Range slice → plaintext returned to the client.
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // Detect a Range request up front (used by the slice-after-decompress
        // path; also routes the sidecar partial-fetch fast path below).
        let range_request = req.input.range.take();
        // v0.5 #27: pull SSE-C material from the input headers before
        // the request is moved into the backend. A header parse error
        // fails fast (no body fetch). The material is consumed below
        // when decrypting an S4E3-framed body; the SSE-C headers on
        // `req.input` are cleared so the backend doesn't see them.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let get_sse_c_material =
            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;

        // v0.5 #34: route the GET through the VersioningManager when
        // attached AND the bucket is in a versioning-aware state.
        // Resolves which version to fetch (explicit `?versionId=` query
        // param vs. chain latest), translates a delete-marker into 404
        // NoSuchKey, and rewrites the backend storage key to the shadow
        // path (`<key>.__s4ver__/<vid>`) for non-null Enabled-bucket
        // versions. `resolved_version_id` is stamped onto the response
        // so clients see a coherent `x-amz-version-id` header.
        //
        // When the bucket is Unversioned (or no manager attached), the
        // chain-resolution step is skipped and the request flows
        // through the existing single-key path unchanged.
        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
            Some(mgr)
                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
            {
                let req_vid = req.input.version_id.take();
                let entry = match req_vid.as_deref() {
                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
                        || S3Error::with_message(
                            S3ErrorCode::NoSuchVersion,
                            format!("no such version: {vid}"),
                        ),
                    )?,
                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::NoSuchKey,
                            format!("no such key: {get_key}"),
                        )
                    })?,
                };
                if entry.is_delete_marker {
                    // S3 spec: GET without versionId on a
                    // delete-marker latest → 404 NoSuchKey + the
                    // response carries `x-amz-delete-marker: true`.
                    // GET with explicit versionId pointing at a delete
                    // marker → 405 MethodNotAllowed; we surface
                    // NoSuchKey here for both since s3s collapses them
                    // into the same not-found error path.
                    return Err(S3Error::with_message(
                        S3ErrorCode::NoSuchKey,
                        format!("delete marker is the current version of {get_key}"),
                    ));
                }
                if entry.version_id != crate::versioning::NULL_VERSION_ID {
                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
                }
                Some(entry.version_id)
            }
            _ => None,
        };

        // ====== Partial-fetch fast path for Range GETs (sidecar index) ======
        // When the sidecar `<key>.s4index` exists and the object is
        // multipart-framed, Range-GET only the needed frames from the
        // backend to save bandwidth.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        // v0.5 #34: stamp the resolved version-id so the client sees a
        // coherent `x-amz-version-id` header (only for chains owned by
        // the manager — Unversioned buckets / no-manager paths never
        // set this).
        if let Some(ref vid) = resolved_version_id {
            resp.output.version_id = Some(vid.clone());
        }
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        // v0.2 #4: framed-v2 single-PUT objects need multi-frame parsing,
        // so they flow down the same path as multipart objects.
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            // Objects not written by S4 pass through untouched (e.g. a
            // pre-existing object in a raw bucket).
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // v0.4 #21 / v0.5 #27: if the object was stored under SSE
            // (metadata flag `s4-encrypted: aes-256-gcm`), decrypt
            // before any frame parse / streaming decompress. Encrypted
            // bodies are opaque to the codec; this also forces the
            // buffered path because AES-GCM needs the full body for tag
            // verify. SSE-C uses the per-request customer key, SSE-S4
            // falls back to the configured keyring.
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                // v0.5 #28: peek the frame magic to route the right
                // decrypt path. S4E4 means SSE-KMS — unwrap the DEK
                // through the KMS backend (async). S4E1/E2/E3 take
                // the sync path (keyring or customer key).
                //
                // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): the chunked
                // SSE-S4 frames take the *streaming* path — we hand
                // the response body a per-chunk verify-and-emit
                // Stream so the client sees chunk 0 plaintext after
                // one chunk-worth of AES-GCM verify (vs. waiting
                // for the whole body's tag), and the gateway no
                // longer needs to materialize the full plaintext
                // in memory before responding. SSE-C is out of
                // scope for the chunked path (chunked S4E3 is a
                // follow-up), so this branch requires the SSE-S4
                // keyring to be wired and `get_sse_c_material` to
                // be absent — otherwise we surface a clear
                // misconfiguration error instead of silently
                // falling through to the buffered chunked path.
                if matches!(
                    crate::sse::peek_magic(&body),
                    Some("S4E5") | Some("S4E6")
                ) && get_sse_c_material.is_none()
                {
                    let keyring_arc = self.sse_keyring.clone().ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "object is SSE-S4 encrypted (S4E5/S4E6) but no --sse-s4-key is configured on this gateway",
                        )
                    })?;
                    let body_len = body.len() as u64;
                    let stream =
                        crate::sse::decrypt_chunked_stream(body, keyring_arc.as_ref());
                    // Stream is `'static` (the keyring borrow is
                    // consumed up front; the cipher lives inside
                    // the stream state — see decrypt_chunked_stream
                    // doc), so we can move it straight into a
                    // StreamingBlob without lifetime gymnastics.
                    use futures::StreamExt;
                    let mapped = stream.map(|r| {
                        r.map_err(|e| std::io::Error::other(format!(
                            "SSE-S4 chunked decrypt: {e}"
                        )))
                    });
                    use s3s::dto::StreamingBlob;
                    resp.output.body = Some(StreamingBlob::wrap(mapped));
                    // Plaintext content_length is unknown until all
                    // chunks have been verified; null it out so the
                    // ByteStream wrapper reports `unknown` to the
                    // HTTP layer (which then emits chunked transfer-
                    // encoding) rather than lying about the size.
                    resp.output.content_length = None;
                    // The backend's checksums + ETag describe the
                    // encrypted body (S4E5/S4E6 wire format), not
                    // the plaintext we're about to stream — clear them
                    // so the AWS SDK doesn't fail the GET with a
                    // ChecksumMismatch on a successful round-trip.
                    // Mirrors the streaming-zstd path at L1180-1185.
                    resp.output.checksum_crc32 = None;
                    resp.output.checksum_crc32c = None;
                    resp.output.checksum_crc64nvme = None;
                    resp.output.checksum_sha1 = None;
                    resp.output.checksum_sha256 = None;
                    resp.output.e_tag = None;
                    let elapsed = get_start.elapsed();
                    crate::metrics::record_get(
                        "sse-s4-chunked",
                        body_len,
                        body_len,
                        elapsed.as_secs_f64(),
                        true,
                    );
                    return Ok(resp);
                }
                let plain = match crate::sse::peek_magic(&body) {
                    Some("S4E4") => {
                        let kms = self.kms.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InvalidRequest,
                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                            )
                        })?;
                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
                        crate::sse::decrypt_with_kms(&body, kms_ref)
                            .await
                            .map_err(|e| match e {
                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
                                other => S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-KMS decrypt failed: {other}"),
                                ),
                            })?
                    }
                    _ => {
                        // Sync decrypt path: prefer the per-request SSE-C
                        // customer key when supplied, else the gateway's
                        // SSE-S4 keyring.
                        if let Some(ref m) = get_sse_c_material {
                            crate::sse::decrypt(
                                &body,
                                crate::sse::SseSource::CustomerKey {
                                    key: &m.key,
                                    key_md5: &m.key_md5,
                                },
                            )
                            .map_err(sse_c_error_to_s3)?
                        } else {
                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                                S3Error::with_message(
                                    S3ErrorCode::InvalidRequest,
                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                                )
                            })?;
                            crate::sse::decrypt(&body, keyring).map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-S4 decrypt failed: {e}"),
                                )
                            })?
                        }
                    }
                };
                // v0.5 #28: parse out the on-disk wrapped DEK's key id
                // so the GET response can echo `x-amz-server-side-encryption-aws-kms-key-id`.
                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
                {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
                }
                bytes_to_blob(plain)
            } else if let Some(ref m) = get_sse_c_material {
                // Client sent SSE-C headers for an unencrypted object —
                // mirror AWS S3's 400 InvalidRequest.
                let _ = m;
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
            } else {
                blob
            };
            // v0.5 #27: SSE-C echo on success — algorithm + key MD5
            // tell the client that the supplied key was the one used.
            if let Some(ref m) = get_sse_c_material {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
            // Collecting a large object (e.g. 5 GB) into memory would OOM,
            // so when the codec is streaming-aware, decompress the body
            // chunk-by-chunk and flush it to the client immediately.
            //
            // Range requests cannot stream, however (slicing needs the
            // total byte count) → fall through to the buffered path.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Passthrough: hand the body through as-is (streaming only
            // when no Range was requested).
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                // Multipart objects and framed-v2 single-PUT objects carry
                // the same S4F2 frame sequence, so both are handled by
                // decompress_multipart.
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            // Slice when a Range request was present; otherwise return the
            // full body.
            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                // `end` is exclusive; Content-Range wants an inclusive end.
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            // Report the true decompressed size — S3 clients trust
            // content_length, so leaving the compressed size would make
            // downstream clients truncate the body mid-stream.
            resp.output.content_length = Some(final_bytes.len() as i64);
            // Returning the compressed bytes' checksums makes the AWS SDK
            // fail with StreamingError (ChecksumMismatch). The ETag is
            // likewise the backend's "MD5/checksum of the compressed
            // bytes" and is semantically off — clear them all and let S4's
            // own crc32c (in the manifest / in each frame) guarantee
            // integrity instead.
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        // v0.6 #40: echo the recorded `x-amz-replication-status` so
        // consumers can poll progress (PENDING / COMPLETED / FAILED).
        if let Some(mgr) = self.replication.as_ref()
            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
        {
            resp.output.replication_status =
                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
        }
        Ok(resp)
    }
2904
2905 // === passthrough delegations ===
2906 async fn head_bucket(
2907 &self,
2908 req: S3Request<HeadBucketInput>,
2909 ) -> S3Result<S3Response<HeadBucketOutput>> {
2910 self.backend.head_bucket(req).await
2911 }
2912 async fn list_buckets(
2913 &self,
2914 req: S3Request<ListBucketsInput>,
2915 ) -> S3Result<S3Response<ListBucketsOutput>> {
2916 self.backend.list_buckets(req).await
2917 }
2918 async fn create_bucket(
2919 &self,
2920 req: S3Request<CreateBucketInput>,
2921 ) -> S3Result<S3Response<CreateBucketOutput>> {
2922 self.backend.create_bucket(req).await
2923 }
2924 async fn delete_bucket(
2925 &self,
2926 req: S3Request<DeleteBucketInput>,
2927 ) -> S3Result<S3Response<DeleteBucketOutput>> {
2928 self.backend.delete_bucket(req).await
2929 }
2930 async fn head_object(
2931 &self,
2932 req: S3Request<HeadObjectInput>,
2933 ) -> S3Result<S3Response<HeadObjectOutput>> {
2934 // v0.6 #40: capture bucket/key before req is consumed so the
2935 // replication-status echo can look the entry up.
2936 let head_bucket = req.input.bucket.clone();
2937 let head_key = req.input.key.clone();
2938 let mut resp = self.backend.head_object(req).await?;
2939 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2940 // 客側には decompress 後の意味のある content_length / checksum を返す。
2941 // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
2942 // (S4 は manifest 内の crc32c で integrity を担保する)。
2943 resp.output.content_length = Some(manifest.original_size as i64);
2944 resp.output.checksum_crc32 = None;
2945 resp.output.checksum_crc32c = None;
2946 resp.output.checksum_crc64nvme = None;
2947 resp.output.checksum_sha1 = None;
2948 resp.output.checksum_sha256 = None;
2949 resp.output.e_tag = None;
2950 }
2951 // v0.6 #40: echo `x-amz-replication-status` (PENDING / COMPLETED
2952 // / FAILED) so consumers can poll progress without a GET.
2953 if let Some(mgr) = self.replication.as_ref()
2954 && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2955 {
2956 resp.output.replication_status =
2957 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2958 }
2959 // v0.7 #48 BUG-4 fix: HEAD must echo SSE indicators so SDKs
2960 // and pipelines see the same posture they got on PUT. The PUT
2961 // path stamps `s4-sse-type` metadata for exactly this — HEAD
2962 // doesn't fetch the body, so it can't peek frame magic.
2963 if let Some(meta) = resp.output.metadata.as_ref()
2964 && let Some(sse_type) = meta.get("s4-sse-type")
2965 {
2966 {
2967 match sse_type.as_str() {
2968 "aws:kms" => {
2969 resp.output.server_side_encryption = Some(
2970 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2971 );
2972 if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
2973 resp.output.ssekms_key_id = Some(key_id.clone());
2974 }
2975 }
2976 _ => {
2977 resp.output.server_side_encryption = Some(
2978 ServerSideEncryption::from_static(ServerSideEncryption::AES256),
2979 );
2980 if let Some(md5) = meta.get("s4-sse-c-key-md5") {
2981 resp.output.sse_customer_algorithm =
2982 Some(crate::sse::SSE_C_ALGORITHM.into());
2983 resp.output.sse_customer_key_md5 = Some(md5.clone());
2984 }
2985 }
2986 }
2987 }
2988 }
2989 Ok(resp)
2990 }
    // DELETE routing: MFA-Delete gate → WORM lock check → versioning-aware
    // delete (marker push / specific-version physical delete) → legacy
    // unversioned physical delete plus sidecar / lock / tag cleanup.
    async fn delete_object(
        &self,
        mut req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        // v0.6 #42: MFA Delete enforcement. When the bucket has
        // MFA-Delete = Enabled, every DELETE / DELETE-version /
        // delete-marker form needs `x-amz-mfa: <serial> <code>` (RFC 6238
        // 6-digit TOTP). Runs *before* the WORM / versioning routers so
        // a missing token is denied for free regardless of which delete
        // path the request would otherwise take.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && mgr.is_enabled(&bucket)
        {
            let header = req.input.mfa.as_deref();
            if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
                crate::metrics::record_mfa_delete_denial(&bucket);
                return Err(mfa_error_to_s3(e));
            }
        }
        // v0.5 #30: refuse the delete while a WORM lock is in effect.
        // Compliance can never be bypassed; Governance can be overridden
        // via `x-amz-bypass-governance-retention: true`; legal hold
        // never. The check happens before the versioning router so a
        // locked object can't be soft-deleted (delete-marker push) on an
        // Enabled bucket either — S3 spec says lock applies to all
        // delete forms.
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&bucket, &key)
        {
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let now = chrono::Utc::now();
            if !state.can_delete(now, bypass) {
                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Access Denied because object protected by object lock",
                ));
            }
        }
        // v0.5 #34: route DELETE through the VersioningManager when the
        // bucket is in a versioning-aware state.
        //
        // - Enabled bucket, no version_id → push a delete marker into
        //   the chain. NO backend object is touched (older versions
        //   stay reachable via specific-version GET).
        // - Enabled / Suspended bucket, with version_id → physical
        //   delete. Backend bytes at the shadow key (or `<key>` for
        //   `null`) are removed; chain entry is dropped. If the deleted
        //   entry was a delete marker, no backend bytes exist for it
        //   (record-only).
        // - Suspended bucket, no version_id → push a "null" delete
        //   marker (S3 spec); backend bytes at `<key>` are physically
        //   removed (same as legacy).
        // - Unversioned bucket → fall through to legacy passthrough.
        if let Some(mgr) = self.versioning.as_ref() {
            let state = mgr.state(&bucket);
            if state != crate::versioning::VersioningState::Unversioned {
                let req_vid = req.input.version_id.take();
                if let Some(vid) = req_vid {
                    // Specific-version DELETE: touch backend bytes only
                    // when the entry was a real version (not a delete
                    // marker, which has no backend bytes).
                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
                    // Null versions live at the plain key; every other
                    // version id lives under its shadow key.
                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
                        key.clone()
                    } else {
                        versioned_shadow_key(&key, &vid)
                    };
                    let was_real_version = outcome
                        .as_ref()
                        .map(|o| !o.is_delete_marker)
                        .unwrap_or(false);
                    if was_real_version {
                        // Best-effort backend cleanup; missing bytes
                        // are not an error (e.g. shadow key already
                        // GC'd).
                        let backend_input = DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: backend_target,
                            ..Default::default()
                        };
                        // Hand-built backend request: reuses the caller's
                        // auth context but gets fresh extensions.
                        let backend_req = S3Request {
                            input: backend_input,
                            method: http::Method::DELETE,
                            uri: req.uri.clone(),
                            headers: req.headers.clone(),
                            extensions: http::Extensions::new(),
                            credentials: req.credentials.clone(),
                            region: req.region.clone(),
                            service: req.service.clone(),
                            trailing_headers: None,
                        };
                        let _ = self.backend.delete_object(backend_req).await;
                    }
                    let mut output = DeleteObjectOutput {
                        version_id: Some(vid.clone()),
                        ..Default::default()
                    };
                    if let Some(o) = outcome.as_ref()
                        && o.is_delete_marker
                    {
                        output.delete_marker = Some(true);
                    }
                    // v0.6 #35: specific-version DELETE always counts as
                    // a hard `ObjectRemoved:Delete` event (the chain
                    // entry, marker or not, is gone after this call).
                    self.fire_delete_notification(
                        &bucket,
                        &key,
                        crate::notifications::EventType::ObjectRemovedDelete,
                        Some(vid.clone()),
                    );
                    return Ok(S3Response::new(output));
                }
                // No version_id: record a delete marker (state-aware).
                let outcome = mgr.record_delete(&bucket, &key);
                if state == crate::versioning::VersioningState::Suspended {
                    // Suspended buckets also evict the prior `<key>`
                    // bytes (the previous null version is gone too).
                    let backend_input = DeleteObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    };
                    let backend_req = S3Request {
                        input: backend_input,
                        method: http::Method::DELETE,
                        uri: req.uri.clone(),
                        headers: req.headers.clone(),
                        extensions: http::Extensions::new(),
                        credentials: req.credentials.clone(),
                        region: req.region.clone(),
                        service: req.service.clone(),
                        trailing_headers: None,
                    };
                    let _ = self.backend.delete_object(backend_req).await;
                }
                let output = DeleteObjectOutput {
                    delete_marker: Some(true),
                    version_id: outcome.version_id.clone(),
                    ..Default::default()
                };
                // v0.6 #35: versioned bucket DELETE without a version-id
                // creates a delete marker — the dedicated AWS event
                // taxonomy entry. Suspended-state buckets also push a
                // (null) marker, so the same event fires there.
                self.fire_delete_notification(
                    &bucket,
                    &key,
                    crate::notifications::EventType::ObjectRemovedDeleteMarker,
                    outcome.version_id,
                );
                return Ok(S3Response::new(output));
            }
        }
        // Legacy / Unversioned path: physical delete on the backend +
        // best-effort sidecar cleanup (mirrors v0.4 behaviour).
        let resp = self.backend.delete_object(req).await?;
        // v0.5 #30: drop any per-object lock state once the delete has
        // succeeded so the freed key can be re-armed by a future PUT
        // under the bucket default. Reaching here implies the lock had
        // already passed `can_delete` above, so this is purely cleanup.
        if let Some(mgr) = self.object_lock.as_ref() {
            mgr.clear(&bucket, &key);
        }
        // v0.6 #39: drop any object-level tag set on physical delete —
        // the freed key starts a fresh tag history if a future PUT
        // re-creates it. (Versioned-delete branches above return early
        // and do NOT touch tags, mirroring AWS where tag state is
        // attached to the logical key, not the version chain.)
        if let Some(mgr) = self.tagging.as_ref() {
            mgr.delete_object_tags(&bucket, &key);
        }
        let sidecar = sidecar_key(&key);
        // v0.7 #49: skip the sidecar DELETE if the key + sidecar suffix
        // can't be encoded into a request URI — the primary delete
        // already succeeded and a stale sidecar is harmless (Range GET
        // re-validates the underlying object on next read).
        if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
            let sidecar_input = DeleteObjectInput {
                bucket: bucket.clone(),
                key: sidecar,
                ..Default::default()
            };
            // Anonymous internal request — no caller credentials are
            // attached to the sidecar cleanup.
            let sidecar_req = S3Request {
                input: sidecar_input,
                method: http::Method::DELETE,
                uri,
                headers: http::HeaderMap::new(),
                extensions: http::Extensions::new(),
                credentials: None,
                region: None,
                service: None,
                trailing_headers: None,
            };
            let _ = self.backend.delete_object(sidecar_req).await;
        }
        // v0.6 #35: legacy unversioned-bucket hard delete fires the
        // canonical `ObjectRemoved:Delete` event.
        self.fire_delete_notification(
            &bucket,
            &key,
            crate::notifications::EventType::ObjectRemovedDelete,
            None,
        );
        Ok(resp)
    }
3202 async fn delete_objects(
3203 &self,
3204 req: S3Request<DeleteObjectsInput>,
3205 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
3206 // v0.6 #42: MFA Delete applies once to the whole batch (S3 spec:
3207 // when MFA-Delete is on the bucket, a missing / invalid token
3208 // fails the entire DeleteObjects request, not per-object).
3209 if let Some(mgr) = self.mfa_delete.as_ref()
3210 && mgr.is_enabled(&req.input.bucket)
3211 {
3212 let header = req.input.mfa.as_deref();
3213 if let Err(e) =
3214 crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
3215 {
3216 crate::metrics::record_mfa_delete_denial(&req.input.bucket);
3217 return Err(mfa_error_to_s3(e));
3218 }
3219 }
3220 self.backend.delete_objects(req).await
3221 }
3222 async fn copy_object(
3223 &self,
3224 mut req: S3Request<CopyObjectInput>,
3225 ) -> S3Result<S3Response<CopyObjectOutput>> {
3226 // copy is conceptually "GetObject src + PutObject dst" — enforce both.
3227 let dst_bucket = req.input.bucket.clone();
3228 let dst_key = req.input.key.clone();
3229 self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
3230 if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3231 self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
3232 }
3233 // S4-aware copy: source object に s4-* metadata がある場合、それを
3234 // destination に確実に preserve する。
3235 //
3236 // - MetadataDirective::COPY (default): backend が source metadata を
3237 // そのまま copy するので S4 metadata も自動で渡る。介入不要
3238 // - MetadataDirective::REPLACE: 客が指定した metadata で source を
3239 // 上書き → s4-* metadata が消えると destination は decompress 不能に
3240 // なる (silent corruption)。S4 が source metadata を HEAD で取得し、
3241 // s4-* fields を input.metadata に強制 merge する
3242 let needs_merge = req
3243 .input
3244 .metadata_directive
3245 .as_ref()
3246 .map(|d| d.as_str() == MetadataDirective::REPLACE)
3247 .unwrap_or(false);
3248 if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3249 let head_input = HeadObjectInput {
3250 bucket: bucket.to_string(),
3251 key: key.to_string(),
3252 ..Default::default()
3253 };
3254 let head_req = S3Request {
3255 input: head_input,
3256 method: req.method.clone(),
3257 uri: req.uri.clone(),
3258 headers: req.headers.clone(),
3259 extensions: http::Extensions::new(),
3260 credentials: req.credentials.clone(),
3261 region: req.region.clone(),
3262 service: req.service.clone(),
3263 trailing_headers: None,
3264 };
3265 if let Ok(head) = self.backend.head_object(head_req).await
3266 && let Some(src_meta) = head.output.metadata.as_ref()
3267 {
3268 let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
3269 for key in [
3270 META_CODEC,
3271 META_ORIGINAL_SIZE,
3272 META_COMPRESSED_SIZE,
3273 META_CRC32C,
3274 META_MULTIPART,
3275 META_FRAMED,
3276 ] {
3277 if let Some(v) = src_meta.get(key) {
3278 // 客が同じ key を指定していたら preserve しない (= 上書き許可)
3279 // していたら何もしない。指定していなければ insert
3280 dest_meta
3281 .entry(key.to_string())
3282 .or_insert_with(|| v.clone());
3283 }
3284 }
3285 debug!(
3286 src_bucket = %bucket,
3287 src_key = %key,
3288 "S4 copy_object: preserved s4-* metadata across REPLACE directive"
3289 );
3290 }
3291 }
3292 self.backend.copy_object(req).await
3293 }
3294 async fn list_objects(
3295 &self,
3296 req: S3Request<ListObjectsInput>,
3297 ) -> S3Result<S3Response<ListObjectsOutput>> {
3298 self.enforce_rate_limit(&req, &req.input.bucket)?;
3299 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3300 let mut resp = self.backend.list_objects(req).await?;
3301 // S4 内部 object (`*.s4index` sidecar、`.__s4ver__/` shadow versions
3302 // — v0.5 #34) を顧客から隠す。
3303 if let Some(contents) = resp.output.contents.as_mut() {
3304 contents.retain(|o| {
3305 o.key
3306 .as_ref()
3307 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3308 .unwrap_or(true)
3309 });
3310 }
3311 Ok(resp)
3312 }
3313 async fn list_objects_v2(
3314 &self,
3315 req: S3Request<ListObjectsV2Input>,
3316 ) -> S3Result<S3Response<ListObjectsV2Output>> {
3317 self.enforce_rate_limit(&req, &req.input.bucket)?;
3318 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3319 let mut resp = self.backend.list_objects_v2(req).await?;
3320 if let Some(contents) = resp.output.contents.as_mut() {
3321 let before = contents.len();
3322 contents.retain(|o| {
3323 o.key
3324 .as_ref()
3325 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3326 .unwrap_or(true)
3327 });
3328 // key_count も補正 (S3 spec compliance)
3329 if let Some(kc) = resp.output.key_count.as_mut() {
3330 *kc -= (before - contents.len()) as i32;
3331 }
3332 }
3333 Ok(resp)
3334 }
3335 /// v0.4 #17: filter S4-internal sidecars from versioned listings.
3336 /// v0.5 #34: when a [`crate::versioning::VersioningManager`] is
3337 /// attached AND the bucket is in a versioning-aware state, build
3338 /// the `Versions` / `DeleteMarkers` arrays directly from the
3339 /// in-memory chain (paginated + ordered the S3 way: key asc,
3340 /// version newest-first inside each key). Otherwise fall back to
3341 /// passthrough + sidecar-filter (legacy v0.4 behaviour).
3342 async fn list_object_versions(
3343 &self,
3344 req: S3Request<ListObjectVersionsInput>,
3345 ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
3346 self.enforce_rate_limit(&req, &req.input.bucket)?;
3347 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3348 // v0.5 #34: VersioningManager-owned path.
3349 if let Some(mgr) = self.versioning.as_ref()
3350 && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
3351 {
3352 let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
3353 let page = mgr.list_versions(
3354 &req.input.bucket,
3355 req.input.prefix.as_deref(),
3356 req.input.key_marker.as_deref(),
3357 req.input.version_id_marker.as_deref(),
3358 max_keys,
3359 );
3360 let versions: Vec<ObjectVersion> = page
3361 .versions
3362 .into_iter()
3363 .map(|e| ObjectVersion {
3364 key: Some(e.key),
3365 version_id: Some(e.version_id),
3366 is_latest: Some(e.is_latest),
3367 e_tag: Some(ETag::Strong(e.etag)),
3368 size: Some(e.size as i64),
3369 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3370 ..Default::default()
3371 })
3372 .collect();
3373 let delete_markers: Vec<DeleteMarkerEntry> = page
3374 .delete_markers
3375 .into_iter()
3376 .map(|e| DeleteMarkerEntry {
3377 key: Some(e.key),
3378 version_id: Some(e.version_id),
3379 is_latest: Some(e.is_latest),
3380 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3381 ..Default::default()
3382 })
3383 .collect();
3384 let output = ListObjectVersionsOutput {
3385 name: Some(req.input.bucket.clone()),
3386 prefix: req.input.prefix.clone(),
3387 key_marker: req.input.key_marker.clone(),
3388 version_id_marker: req.input.version_id_marker.clone(),
3389 max_keys: req.input.max_keys,
3390 versions: if versions.is_empty() {
3391 None
3392 } else {
3393 Some(versions)
3394 },
3395 delete_markers: if delete_markers.is_empty() {
3396 None
3397 } else {
3398 Some(delete_markers)
3399 },
3400 is_truncated: Some(page.is_truncated),
3401 next_key_marker: page.next_key_marker,
3402 next_version_id_marker: page.next_version_id_marker,
3403 ..Default::default()
3404 };
3405 return Ok(S3Response::new(output));
3406 }
3407 // Legacy passthrough path (v0.4 #17 sidecar filter retained).
3408 let mut resp = self.backend.list_object_versions(req).await?;
3409 if let Some(versions) = resp.output.versions.as_mut() {
3410 versions.retain(|v| {
3411 v.key
3412 .as_ref()
3413 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3414 .unwrap_or(true)
3415 });
3416 }
3417 if let Some(markers) = resp.output.delete_markers.as_mut() {
3418 markers.retain(|m| {
3419 m.key
3420 .as_ref()
3421 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3422 .unwrap_or(true)
3423 });
3424 }
3425 Ok(resp)
3426 }
3427
    async fn create_multipart_upload(
        &self,
        mut req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        // Multipart objects are written per-part-compressed in frame format.
        // Stamp flags into the object metadata so GET knows to run the frame
        // parser. The codec recorded here is the dispatcher's default kind
        // (per-part codec variation is a Phase 2 item; see `upload_part`).
        let codec_kind = self.registry.default_kind();
        let meta = req.input.metadata.get_or_insert_with(Default::default);
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
        // v0.8 #54 BUG-10 fix: take() the SSE request fields off
        // `req.input` so they are NOT forwarded to the backend on
        // CreateMultipartUpload. Same root cause as v0.7 #48 BUG-2/3 on
        // single-PUT — MinIO rejects SSE-C with "HTTPS required" and
        // SSE-KMS with "KMS not configured" when the headers reach it.
        // S4 owns the encrypt-then-store contract; we capture the
        // recipe in `multipart_state` here and apply it on Complete.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let sse_header = req.input.server_side_encryption.take();
        let sse_kms_key = req.input.ssekms_key_id.take();
        // Strip the encryption-context too — leaving it would make
        // MinIO try to validate it against a non-existent KMS key.
        let _ = req.input.ssekms_encryption_context.take();
        // Parse the captured headers into typed material; either helper can
        // reject malformed input with a 400 before we touch the backend.
        let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
        let kms_key_id = extract_kms_key_id(
            &sse_header,
            &sse_kms_key,
            self.kms_default_key_id.as_deref(),
        );
        // SSE-C / SSE-KMS exclusivity (mirrors put_object L1870).
        if sse_c_material.is_some() && kms_key_id.is_some() {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidArgument,
                "SSE-C and SSE-KMS cannot be used together on the same multipart upload",
            ));
        }
        // Resolve the SSE recipe for this upload, in priority order:
        // SSE-C (client key) > SSE-KMS (wrapped DEK) > SSE-S4 (gateway
        // keyring, if configured) > None.
        let sse_mode = if let Some(ref m) = sse_c_material {
            // v0.8.2 #62 (H-6 audit fix): wrap the customer-supplied
            // 32-byte key in `Zeroizing` so abandoned uploads (or
            // normal Complete/Abort) wipe the key bytes on drop. The
            // `key_md5` is the public fingerprint and stays as a
            // bare `[u8; 16]`.
            crate::multipart_state::MultipartSseMode::SseC {
                key: zeroize::Zeroizing::new(m.key),
                key_md5: m.key_md5,
            }
        } else if let Some(ref kid) = kms_key_id {
            // KMS pre-flight: fail at Create rather than at Complete if
            // the gateway has no KMS backend wired (mirrors the
            // put_object L1879 check).
            if self.kms.is_none() {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                ));
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id: kid.clone() }
        } else if self.sse_keyring.is_some() {
            // SSE-S4: server-driven transparent encryption. Activates
            // whenever the gateway has a keyring configured AND the
            // client didn't pick a different SSE mode.
            crate::multipart_state::MultipartSseMode::SseS4
        } else {
            crate::multipart_state::MultipartSseMode::None
        };
        // v0.8 #54 BUG-9 fix: parse the Tagging header on Create. The
        // single-PUT path does this on PutObject; the multipart path
        // captures it now and commits via TagManager on Complete.
        // A malformed header is a client error → 400 InvalidArgument.
        let request_tags: Option<crate::tagging::TagSet> = req
            .input
            .tagging
            .as_deref()
            .map(crate::tagging::parse_tagging_header)
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
        // Strip the `Tagging` field off the input so the backend
        // doesn't try to apply it (no-op on MinIO but keeps the wire
        // clean).
        let _ = req.input.tagging.take();
        // Object Lock recipe (BUG-7 — captured here, applied on Complete).
        // Unrecognised mode / legal-hold strings fall through to None/false.
        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
            .input
            .object_lock_mode
            .as_ref()
            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
            .input
            .object_lock_retain_until_date
            .as_ref()
            .and_then(timestamp_to_chrono_utc);
        let explicit_legal_hold_on: bool = req
            .input
            .object_lock_legal_hold_status
            .as_ref()
            .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
            .unwrap_or(false);
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        debug!(
            bucket = %bucket,
            key = %key,
            codec = codec_kind.as_str(),
            sse = ?sse_mode,
            "S4 create_multipart_upload: marking object for per-part compression"
        );
        let mut resp = self.backend.create_multipart_upload(req).await?;
        // Stash the per-upload context only after the backend handed
        // us an upload_id (failed Creates leave nothing in the store).
        if let Some(upload_id) = resp.output.upload_id.as_ref() {
            self.multipart_state.put(
                upload_id,
                crate::multipart_state::MultipartUploadContext {
                    bucket,
                    key,
                    sse: sse_mode.clone(),
                    tags: request_tags,
                    object_lock_mode: explicit_lock_mode,
                    object_lock_retain_until: explicit_retain_until,
                    object_lock_legal_hold: explicit_legal_hold_on,
                },
            );
        }
        // SSE-C / SSE-KMS response echo (mirrors put_object L2036-L2050):
        // clients expect the algorithm / key-MD5 (SSE-C) or the KMS key id
        // (SSE-KMS) reflected back even though the backend never saw them.
        match &sse_mode {
            crate::multipart_state::MultipartSseMode::SseC { key_md5, .. } => {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(key_md5),
                );
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
                resp.output.server_side_encryption = Some(
                    ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                );
                resp.output.ssekms_key_id = Some(key_id.clone());
            }
            _ => {}
        }
        Ok(resp)
    }
3571
    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        // Compress each part and forward it with a frame header prepended;
        // on GET, `decompress_multipart` walks the frames with a frame iter
        // and inflates them in order.
        //
        // **Per-part codec dispatch**: the dispatcher picks a codec from a
        // sample at the head of each part body, so parquet-style
        // mixed-content multiparts can use the best codec per part
        // (integer-column part → Bitcomp, text-column part → zstd, etc.).
        //
        // v0.8 #54 BUG-5/BUG-10 fix: lookup the per-upload SSE
        // context captured by `create_multipart_upload` and (a) strip
        // any SSE-C request headers off `req.input` so the backend
        // doesn't see them — same root cause as v0.7 #48 BUG-2/3 on
        // single-PUT; MinIO refuses SSE-C parts over HTTP — and (b)
        // observe that an upload context exists for `upload_id`. The
        // actual encrypt happens once at `complete_multipart_upload`
        // time on the assembled body (the per-part-encrypt approach
        // would require a matching multi-segment decrypt path on GET;
        // encrypting the whole assembled body keeps the GET path's
        // `is_sse_encrypted` branch in get_object L2429 working
        // unchanged).
        let sse_ctx = self
            .multipart_state
            .get(req.input.upload_id.as_str());
        // v0.8.2 #62 (H-1 audit fix): SSE-C key consistency check.
        // The AWS S3 spec requires the same SSE-C key headers on
        // every UploadPart and rejects mismatches with 400. Prior to
        // #62 we silently stripped the headers (BUG-10 fix) without
        // validating them, allowing a client to send part 1 under
        // key-A and part 2 under key-B; both got stored, then
        // re-encrypted with key-A on Complete — the client thinks
        // part 2 is under key-B but a GET with key-B would in fact
        // hit the part-1 ciphertext that was actually encrypted with
        // key-A. That would either decrypt successfully (silent
        // corruption: client lost track of which key encrypts what)
        // or fail in a confusing way. Validate the per-part headers
        // now and reject with 400 InvalidArgument on mismatch /
        // omission / partial supply, matching real-S3 behaviour.
        if let Some(ref ctx) = sse_ctx {
            if let crate::multipart_state::MultipartSseMode::SseC { key_md5: ctx_md5, .. } =
                &ctx.sse
            {
                // take() doubles as the header strip: whatever the outcome,
                // the SSE-C triple must never reach the backend.
                let alg = req.input.sse_customer_algorithm.take();
                let key_b64 = req.input.sse_customer_key.take();
                let md5_b64 = req.input.sse_customer_key_md5.take();
                match (alg, key_b64, md5_b64) {
                    (Some(a), Some(k), Some(m)) => {
                        // Parse + validate; if the per-part headers
                        // are themselves malformed (algorithm not
                        // AES256, MD5 mismatch, key not 32 bytes)
                        // surface the same 400 the single-PUT path
                        // would. Then compare the parsed MD5 to the
                        // upload-context's MD5; mismatch is a
                        // different-key UploadPart and must reject.
                        let part_material =
                            crate::sse::parse_customer_key_headers(&a, &k, &m)
                                .map_err(sse_c_error_to_s3)?;
                        if part_material.key_md5 != *ctx_md5 {
                            return Err(S3Error::with_message(
                                S3ErrorCode::InvalidArgument,
                                "SSE-C key on UploadPart does not match the key supplied on CreateMultipartUpload",
                            ));
                        }
                        // OK — same key as Create. Headers are
                        // already taken off `req.input` so the
                        // backend never sees them.
                    }
                    (None, None, None) => {
                        // AWS S3 spec: SSE-C headers MUST be replayed
                        // on every UploadPart of an SSE-C multipart.
                        // Real-S3 returns 400 InvalidRequest in this
                        // case; mirror that.
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "SSE-C requires customer-key headers on every UploadPart (CreateMultipartUpload was SSE-C)",
                        ));
                    }
                    _ => {
                        // Partial header set (e.g. algorithm + key
                        // but no MD5) — same handling as the
                        // single-PUT `extract_sse_c_material` helper.
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
                        ));
                    }
                }
            } else {
                // CreateMultipartUpload was non-SSE-C (None / SseS4 /
                // SseKms). A part that arrives carrying SSE-C headers
                // is either a confused client or an attempt to
                // smuggle SSE-C around the gateway-internal SSE
                // recipe. Reject with 400 InvalidRequest rather than
                // silently strip — the strip would let the client
                // believe the part was encrypted under their key
                // when in fact the upload's encryption recipe is
                // whatever the Create captured.
                if req.input.sse_customer_algorithm.is_some()
                    || req.input.sse_customer_key.is_some()
                    || req.input.sse_customer_key_md5.is_some()
                {
                    return Err(S3Error::with_message(
                        S3ErrorCode::InvalidRequest,
                        "UploadPart sent SSE-C headers but CreateMultipartUpload was not SSE-C",
                    ));
                }
            }
        } else {
            // No upload context registered (gateway crashed between
            // Create and Part, or pre-#62 abandoned-upload restore).
            // We can't check key consistency in this case — strip
            // the headers and let the request through unchanged so
            // the backend's `NoSuchUpload` reply (or whatever it
            // chooses to do) flows back to the client.
            let _ = req.input.sse_customer_algorithm.take();
            let _ = req.input.sse_customer_key.take();
            let _ = req.input.sse_customer_key_md5.take();
        }
        // NOTE(review): this rebinding looks dead — `sse_ctx` is fully
        // consumed by the branch above and `_sse_ctx` is never read;
        // candidate for removal in a follow-up (kept here to leave the
        // drop point byte-identical).
        let _sse_ctx = sse_ctx;
        if let Some(blob) = req.input.body.take() {
            // Collect the whole part into memory (bounded by
            // max_body_bytes) — streaming-aware compression is Phase 2.
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            // v0.8 #56: full part body is already in memory here; use its
            // length as the size hint so the dispatcher can promote to GPU
            // if it's big enough.
            let codec_kind = self
                .dispatcher
                .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
                .await;
            let original_size = bytes.len() as u64;
            // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
            let (compress_res, tel) = self
                .registry
                .compress_with_telemetry(bytes, codec_kind)
                .await;
            stamp_gpu_compress_telemetry(&tel);
            let (compressed, manifest) = compress_res.map_err(internal("registry compress part"))?;
            // Frame header carries everything GET needs to inflate this
            // part independently: codec, both sizes, and the crc32c of
            // the original bytes for integrity checking.
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            // v0.2 #5: heuristic-based padding skip for likely-final parts.
            //
            // AWS SDK / aws-cli / boto3 always send the final (and only the
            // final) part below the configured part_size. So if the raw user
            // part is already smaller than S3's 5 MiB multipart minimum, this
            // is overwhelmingly likely to be the final part — and the final
            // part is exempt from S3's size constraint. Skipping padding here
            // saves up to ~5 MiB per object on highly compressible workloads.
            //
            // If a misbehaving client sends a tiny **non-final** part, S3
            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
            // identical outcome to a vanilla S3 PUT, just earlier than
            // padding-then-complete would catch it.
            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
            if !likely_final {
                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            }
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            // The same wire-compatibility problem as single-PUT applies to
            // multipart: the client's content-length / checksums describe the
            // ORIGINAL bytes, not the framed compressed payload — rewrite the
            // length and clear every client checksum so the backend doesn't
            // reject the mutated body.
            req.input.content_length = Some(new_len);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
3759 async fn complete_multipart_upload(
3760 &self,
3761 mut req: S3Request<CompleteMultipartUploadInput>,
3762 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
3763 let bucket = req.input.bucket.clone();
3764 let key = req.input.key.clone();
3765 let upload_id = req.input.upload_id.clone();
3766 // v0.8.1 #59: serialise concurrent Complete invocations on the
3767 // same `(bucket, key)`. The race window the lock closes is the
3768 // GET-assembled-body → encrypt → PUT-encrypted-body triple
3769 // below (BUG-5 fix); without serialisation, two Completes for
3770 // different `upload_id` but the same logical key could each
3771 // read the other's plaintext assembled body and overwrite the
3772 // peer's encrypted result. The guard is held to function exit
3773 // (drop on `Ok` / `Err`), covering version-id mint, object-
3774 // lock apply, tagging persist, and replication enqueue too.
3775 let completion_lock = self.multipart_state.completion_lock(&bucket, &key);
3776 let _completion_guard = completion_lock.lock().await;
3777 // v0.8 #54 — fetch the per-upload context captured on Create.
3778 // `None` means an abandoned / unknown upload_id (gateway
3779 // crashed between Create and Complete, or pre-v0.8 state
3780 // restore); we still let the backend do its thing for
3781 // transparency, but we can't apply any SSE / version / lock /
3782 // tag / replication post-processing because we never captured
3783 // the recipe.
3784 let ctx = self.multipart_state.get(upload_id.as_str());
3785 // v0.8 #54 BUG-10 fix: same SSE-C header strip as upload_part
3786 // — some clients (boto3 / aws-sdk-cpp older versions) replay
3787 // the SSE-C triple on Complete too, and MinIO will choke if
3788 // they reach the backend.
3789 let _ = req.input.sse_customer_algorithm.take();
3790 let _ = req.input.sse_customer_key.take();
3791 let _ = req.input.sse_customer_key_md5.take();
3792 let mut resp = self.backend.complete_multipart_upload(req).await?;
3793 // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
3794 // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
3795 // partial fetch path が利用可能になる (Range request の帯域節約)。
3796 // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
3797 // できれば爆速になるので 1 回の cost は payback される
3798 //
3799 // v0.8 #54 BUG-5..9: this same fetch is the choke-point for
3800 // the SSE encrypt re-PUT + versioning shadow-key rewrite +
3801 // replication source-bytes capture, so we GET once and reuse
3802 // the bytes for every post-processing step.
3803 let assembled_body: Option<bytes::Bytes> =
3804 if let Ok(uri) = safe_object_uri(&bucket, &key) {
3805 let get_input = GetObjectInput {
3806 bucket: bucket.clone(),
3807 key: key.clone(),
3808 ..Default::default()
3809 };
3810 let get_req = S3Request {
3811 input: get_input,
3812 method: http::Method::GET,
3813 uri,
3814 headers: http::HeaderMap::new(),
3815 extensions: http::Extensions::new(),
3816 credentials: None,
3817 region: None,
3818 service: None,
3819 trailing_headers: None,
3820 };
3821 match self.backend.get_object(get_req).await {
3822 Ok(get_resp) => match get_resp.output.body {
3823 Some(blob) => collect_blob(blob, self.max_body_bytes).await.ok(),
3824 None => None,
3825 },
3826 Err(_) => None,
3827 }
3828 } else {
3829 None
3830 };
3831 // Sidecar build (existing behaviour, gated on assembled body).
3832 if let Some(ref body) = assembled_body
3833 && let Ok(index) = build_index_from_body(body)
3834 {
3835 self.write_sidecar(&bucket, &key, &index).await;
3836 }
3837 // From here on, post-processing depends on the context —
3838 // short-circuit when the upload had no captured recipe
3839 // (legacy / crashed-Create / pre-v0.8 state restore).
3840 if let Some(ctx) = ctx {
3841 // v0.8 #54 BUG-6 fix: mint a version-id when the bucket
3842 // is versioning-Enabled. The single-PUT path does this in
3843 // `put_object` ~L1968; multipart was the missing branch.
3844 // We mint here (post-Complete, before any re-PUT) so the
3845 // same vid threads into both the shadow-key rewrite and
3846 // the VersionEntry the manager records.
3847 let pending_version: Option<crate::versioning::PutOutcome> = self
3848 .versioning
3849 .as_ref()
3850 .map(|mgr| mgr.state(&bucket))
3851 .map(|state| match state {
3852 crate::versioning::VersioningState::Enabled => {
3853 crate::versioning::PutOutcome {
3854 version_id: crate::versioning::VersioningManager::new_version_id(),
3855 versioned_response: true,
3856 }
3857 }
3858 crate::versioning::VersioningState::Suspended
3859 | crate::versioning::VersioningState::Unversioned => {
3860 crate::versioning::PutOutcome {
3861 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
3862 versioned_response: false,
3863 }
3864 }
3865 });
3866 // v0.8 #54 BUG-5 fix: encrypt the assembled framed body
3867 // and re-PUT it to the backend so the on-disk bytes are
3868 // SSE-encrypted. The single-PUT path does this body-by-
3869 // body inside `put_object` (L1907-L1942); for multipart,
3870 // encrypt-per-part would require a multi-segment decrypt
3871 // path on GET — we instead do a single encrypt over the
3872 // assembled framed body so the existing GET decrypt
3873 // branch (`is_sse_encrypted` → `decrypt(body, source)` →
3874 // FrameIter) handles it unchanged.
3875 //
3876 // The cost is one extra round-trip per Complete for SSE-
3877 // enabled multipart (already-paid for the sidecar build).
3878 // For single-instance gateways pointing at a co-located
3879 // backend this is negligible; cross-region operators
3880 // would benefit from per-part encrypt + multi-segment
3881 // decrypt as a follow-up.
3882 let needs_re_put = matches!(
3883 ctx.sse,
3884 crate::multipart_state::MultipartSseMode::SseS4
3885 | crate::multipart_state::MultipartSseMode::SseC { .. }
3886 | crate::multipart_state::MultipartSseMode::SseKms { .. }
3887 ) || pending_version
3888 .as_ref()
3889 .map(|pv| pv.versioned_response)
3890 .unwrap_or(false);
3891 // Snapshot replication body in advance so we can pass it
3892 // to the spawn helper after the (possibly absent) re-PUT.
3893 let replication_body = assembled_body.clone();
3894 let mut applied_metadata: Option<std::collections::HashMap<String, String>> = None;
3895 if needs_re_put && let Some(body) = assembled_body {
3896 // v0.8.1 #58: same Zeroizing pattern as put_object's
3897 // single-PUT KMS branch — DEK plaintext lives in
3898 // `Zeroizing<[u8; 32]>` for the lifetime of this
3899 // Complete handler, then is wiped on drop.
3900 let kms_wrap: Option<(
3901 zeroize::Zeroizing<[u8; 32]>,
3902 crate::kms::WrappedDek,
3903 )> = if let crate::multipart_state::MultipartSseMode::SseKms {
3904 ref key_id,
3905 } = ctx.sse
3906 {
3907 let kms = self.kms.as_ref().ok_or_else(|| {
3908 S3Error::with_message(
3909 S3ErrorCode::InvalidRequest,
3910 "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
3911 )
3912 })?;
3913 let (dek, wrapped) = kms
3914 .generate_dek(key_id)
3915 .await
3916 .map_err(kms_error_to_s3)?;
3917 if dek.len() != 32 {
3918 return Err(S3Error::with_message(
3919 S3ErrorCode::InternalError,
3920 format!(
3921 "KMS backend returned a DEK of {} bytes (expected 32)",
3922 dek.len()
3923 ),
3924 ));
3925 }
3926 let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
3927 zeroize::Zeroizing::new([0u8; 32]);
3928 dek_arr.copy_from_slice(&dek);
3929 // `dek` (Zeroizing<Vec<u8>>) is dropped at scope end.
3930 Some((dek_arr, wrapped))
3931 } else {
3932 None
3933 };
3934 // Build the new metadata map: re-fetch via HEAD so
3935 // the multipart / codec markers the backend stamped
3936 // on Create flow through unchanged, then layer the
3937 // SSE markers on top.
3938 let head_req = S3Request {
3939 input: HeadObjectInput {
3940 bucket: bucket.clone(),
3941 key: key.clone(),
3942 ..Default::default()
3943 },
3944 method: http::Method::HEAD,
3945 uri: safe_object_uri(&bucket, &key)?,
3946 headers: http::HeaderMap::new(),
3947 extensions: http::Extensions::new(),
3948 credentials: None,
3949 region: None,
3950 service: None,
3951 trailing_headers: None,
3952 };
3953 let mut new_metadata: std::collections::HashMap<String, String> =
3954 match self.backend.head_object(head_req).await {
3955 Ok(h) => h.output.metadata.unwrap_or_default(),
3956 Err(_) => std::collections::HashMap::new(),
3957 };
3958 let new_body = match &ctx.sse {
3959 crate::multipart_state::MultipartSseMode::SseC { key, key_md5 } => {
3960 new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
3961 new_metadata.insert("s4-sse-type".into(), "AES256".into());
3962 new_metadata.insert(
3963 "s4-sse-c-key-md5".into(),
3964 base64::engine::general_purpose::STANDARD.encode(key_md5),
3965 );
3966 // v0.8.2 #62: `key` is `&Zeroizing<[u8; 32]>`;
3967 // auto-deref through one explicit binding so
3968 // `SseSource::CustomerKey` gets the `&[u8; 32]`
3969 // it expects (mirrors the SSE-KMS DEK shape
3970 // a few lines down).
3971 let key_ref: &[u8; 32] = key;
3972 crate::sse::encrypt_with_source(
3973 &body,
3974 crate::sse::SseSource::CustomerKey { key: key_ref, key_md5 },
3975 )
3976 }
3977 crate::multipart_state::MultipartSseMode::SseKms { .. } => {
3978 let (dek, wrapped) = kms_wrap
3979 .as_ref()
3980 .expect("SseKms branch implies kms_wrap is Some");
3981 new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
3982 new_metadata.insert("s4-sse-type".into(), "aws:kms".into());
3983 new_metadata
3984 .insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
3985 // v0.8.1 #58: auto-deref from `&Zeroizing<[u8; 32]>`
3986 // to `&[u8; 32]` (same shape as the put_object
3987 // single-PUT branch).
3988 let dek_ref: &[u8; 32] = dek;
3989 crate::sse::encrypt_with_source(
3990 &body,
3991 crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
3992 )
3993 }
3994 crate::multipart_state::MultipartSseMode::SseS4 => {
3995 let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
3996 S3Error::with_message(
3997 S3ErrorCode::InternalError,
3998 "SSE-S4 captured at Create but keyring missing at Complete",
3999 )
4000 })?;
4001 new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
4002 // SSE-S4 deliberately omits `s4-sse-type` so
4003 // HEAD doesn't falsely advertise AWS-style
4004 // SSE-S3 (matches the put_object L1929-L1939
4005 // comment).
4006 // v0.8 #52: same chunk_size dispatch as the
4007 // single-PUT branch — multipart Complete
4008 // re-encrypts the assembled body, so honoring
4009 // the chunked path here is required to keep
4010 // GET streaming on multipart-uploaded objects.
4011 if self.sse_chunk_size > 0 {
4012 crate::sse::encrypt_v2_chunked(
4013 &body,
4014 keyring,
4015 self.sse_chunk_size,
4016 )
4017 .map_err(|e| {
4018 S3Error::with_message(
4019 S3ErrorCode::InternalError,
4020 format!(
4021 "SSE-S4 chunked encrypt failed at Complete: {e}"
4022 ),
4023 )
4024 })?
4025 } else {
4026 crate::sse::encrypt_v2(&body, keyring)
4027 }
4028 }
4029 crate::multipart_state::MultipartSseMode::None => body.clone(),
4030 };
4031 // v0.8 #54 BUG-6 fix: write the re-PUT under the
4032 // shadow key so the version chain doesn't overwrite
4033 // the previous version on a versioned bucket. The
4034 // original (unshadowed) key was assembled by the
4035 // backend on Complete; we delete it after the shadow
4036 // PUT lands.
4037 let put_target_key = if let Some(pv) = pending_version.as_ref() {
4038 if pv.versioned_response {
4039 versioned_shadow_key(&key, &pv.version_id)
4040 } else {
4041 key.clone()
4042 }
4043 } else {
4044 key.clone()
4045 };
4046 let new_body_len = new_body.len() as i64;
4047 let put_req = S3Request {
4048 input: PutObjectInput {
4049 bucket: bucket.clone(),
4050 key: put_target_key.clone(),
4051 body: Some(bytes_to_blob(new_body.clone())),
4052 metadata: Some(new_metadata.clone()),
4053 content_length: Some(new_body_len),
4054 ..Default::default()
4055 },
4056 method: http::Method::PUT,
4057 uri: safe_object_uri(&bucket, &put_target_key)?,
4058 headers: http::HeaderMap::new(),
4059 extensions: http::Extensions::new(),
4060 credentials: None,
4061 region: None,
4062 service: None,
4063 trailing_headers: None,
4064 };
4065 self.backend.put_object(put_req).await?;
4066 // If we rewrote the storage key (versioning shadow),
4067 // we must drop the original (unshadowed) Complete-
4068 // assembled bytes so subsequent listings don't see a
4069 // duplicate.
4070 if put_target_key != key {
4071 let del_req = S3Request {
4072 input: DeleteObjectInput {
4073 bucket: bucket.clone(),
4074 key: key.clone(),
4075 ..Default::default()
4076 },
4077 method: http::Method::DELETE,
4078 uri: safe_object_uri(&bucket, &key)?,
4079 headers: http::HeaderMap::new(),
4080 extensions: http::Extensions::new(),
4081 credentials: None,
4082 region: None,
4083 service: None,
4084 trailing_headers: None,
4085 };
4086 let _ = self.backend.delete_object(del_req).await;
4087 }
4088 applied_metadata = Some(new_metadata);
4089 }
4090 // v0.8 #54 BUG-6 commit: register the new version with
4091 // the VersioningManager so list_object_versions /
4092 // GET ?versionId= see it.
4093 if let (Some(mgr), Some(pv)) = (self.versioning.as_ref(), pending_version.as_ref()) {
4094 let etag = resp
4095 .output
4096 .e_tag
4097 .clone()
4098 .map(ETag::into_value)
4099 .unwrap_or_default();
4100 let now = chrono::Utc::now();
4101 mgr.commit_put_with_version(
4102 &bucket,
4103 &key,
4104 crate::versioning::VersionEntry {
4105 version_id: pv.version_id.clone(),
4106 etag,
4107 size: replication_body
4108 .as_ref()
4109 .map(|b| b.len() as u64)
4110 .unwrap_or(0),
4111 is_delete_marker: false,
4112 created_at: now,
4113 },
4114 );
4115 if pv.versioned_response {
4116 resp.output.version_id = Some(pv.version_id.clone());
4117 }
4118 }
4119 // v0.8 #54 BUG-7 fix: persist any per-upload Object Lock
4120 // recipe + auto-apply the bucket default. Mirrors the
4121 // put_object L2057-L2074 block.
4122 if let Some(mgr) = self.object_lock.as_ref() {
4123 if ctx.object_lock_mode.is_some()
4124 || ctx.object_lock_retain_until.is_some()
4125 || ctx.object_lock_legal_hold
4126 {
4127 let mut state = mgr.get(&bucket, &key).unwrap_or_default();
4128 if let Some(m) = ctx.object_lock_mode {
4129 state.mode = Some(m);
4130 }
4131 if let Some(u) = ctx.object_lock_retain_until {
4132 state.retain_until = Some(u);
4133 }
4134 if ctx.object_lock_legal_hold {
4135 state.legal_hold_on = true;
4136 }
4137 mgr.set(&bucket, &key, state);
4138 }
4139 mgr.apply_default_on_put(&bucket, &key, chrono::Utc::now());
4140 }
4141 // v0.8 #54 BUG-9 fix: persist the captured tags via the
4142 // TagManager so GetObjectTagging returns them.
4143 if let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), ctx.tags.as_ref()) {
4144 mgr.put_object_tags(&bucket, &key, tags.clone());
4145 }
4146 // SSE-C / SSE-KMS response echo. The
4147 // CompleteMultipartUploadOutput only exposes
4148 // `server_side_encryption` + `ssekms_key_id` (no
4149 // sse_customer_* — those round-tripped on Create / parts).
4150 match &ctx.sse {
4151 crate::multipart_state::MultipartSseMode::SseC { .. } => {
4152 resp.output.server_side_encryption = Some(
4153 ServerSideEncryption::from_static(ServerSideEncryption::AES256),
4154 );
4155 }
4156 crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
4157 resp.output.server_side_encryption = Some(
4158 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
4159 );
4160 resp.output.ssekms_key_id = Some(key_id.clone());
4161 }
4162 _ => {}
4163 }
4164 // v0.8 #54 BUG-8 fix: fire cross-bucket replication just
4165 // like put_object L2165 does. We hand the dispatcher the
4166 // assembled body bytes (post-encrypt where applicable, so
4167 // the destination ends up byte-identical to the source's
4168 // on-disk shape) plus the metadata that was actually
4169 // committed.
4170 let replication_body_bytes = replication_body.unwrap_or_default();
4171 // v0.8.2 #61: thread the multipart-Complete `pending_version`
4172 // through so a versioning-Enabled source's destination
4173 // receives the same shadow-key path (mirror of the
4174 // single-PUT branch above).
4175 self.spawn_replication_if_matched(
4176 &bucket,
4177 &key,
4178 &ctx.tags,
4179 &replication_body_bytes,
4180 &applied_metadata,
4181 true,
4182 pending_version.as_ref(),
4183 );
4184 self.multipart_state.remove(upload_id.as_str());
4185 }
4186 // v0.8.1 #59 janitor: best-effort sweep of stale completion
4187 // locks while we are still on the critical path of a single
4188 // Complete (so steady-state workloads of unique keys don't
4189 // accumulate `DashMap` entries). The sweep only retires
4190 // entries whose `Arc::strong_count == 1`, so any other in-
4191 // flight Complete on a different key keeps its lock alive.
4192 // Our own `_completion_guard` keeps `bucket`/`key`'s entry
4193 // alive across this call; it's reaped on the next Complete or
4194 // the next caller-driven prune.
4195 self.multipart_state.prune_completion_locks();
4196 Ok(resp)
4197 }
4198 async fn abort_multipart_upload(
4199 &self,
4200 req: S3Request<AbortMultipartUploadInput>,
4201 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
4202 // v0.8 #54: drop the per-upload state (SSE-C key bytes / tag
4203 // set) promptly so an aborted upload doesn't leak the
4204 // customer's key into a long-running gateway's RSS.
4205 self.multipart_state.remove(req.input.upload_id.as_str());
4206 self.backend.abort_multipart_upload(req).await
4207 }
    // ListMultipartUploads carries no object bytes, so there is no S4
    // compression hook here — pure delegation; behavior is identical to
    // the backend's.
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    // ListParts is metadata-only — pure delegation to the backend.
    // NOTE(review): part sizes reported here are the stored (framed /
    // compressed) sizes, not the logical sizes — confirm callers expect
    // that.
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }
4220
4221 // =========================================================================
4222 // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
4223 // 持たないので、backend (= AWS S3) の動作と完全に同一。
4224 //
4225 // 既知の制限事項:
4226 // - copy_object / upload_part_copy: source object が S4-compressed の場合、
4227 // backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
    // copied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
4229 // 経由で正しく decompress できる。MetadataDirective REPLACE で上書き
4230 // されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
4231 // - list_object_versions: versioning enabled bucket では各 version も S4
4232 // metadata を維持する。古い version も S4 経由で正しく GET できる。
4233 // =========================================================================
4234
4235 // ---- Object ACL / tagging / attributes ----
    // ACL reads are metadata-only — pure delegation (Phase 2 scope per
    // the section header above).
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    // ACL writes touch no object bytes — pure delegation to the backend.
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
4248 // v0.6 #39: object tagging — when a `TagManager` is attached the
4249 // configuration / per-(bucket, key) state lives in the manager and
4250 // these handlers serve directly from it; when no manager is
4251 // attached they fall back to the backend (legacy passthrough so
4252 // v0.5 deployments are unaffected).
4253 async fn get_object_tagging(
4254 &self,
4255 req: S3Request<GetObjectTaggingInput>,
4256 ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
4257 let Some(mgr) = self.tagging.as_ref() else {
4258 return self.backend.get_object_tagging(req).await;
4259 };
4260 let tags = mgr
4261 .get_object_tags(&req.input.bucket, &req.input.key)
4262 .unwrap_or_default();
4263 Ok(S3Response::new(GetObjectTaggingOutput {
4264 tag_set: tagset_to_aws(&tags),
4265 ..Default::default()
4266 }))
4267 }
4268 async fn put_object_tagging(
4269 &self,
4270 req: S3Request<PutObjectTaggingInput>,
4271 ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
4272 let Some(mgr) = self.tagging.as_ref() else {
4273 return self.backend.put_object_tagging(req).await;
4274 };
4275 let bucket = req.input.bucket.clone();
4276 let key = req.input.key.clone();
4277 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4278 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4279 })?;
4280 // v0.6 #39: gate via IAM policy with both the request tags
4281 // (`s3:RequestObjectTag/<key>`) and any existing tags on the
4282 // target object (`s3:ExistingObjectTag/<key>`).
4283 let existing = mgr.get_object_tags(&bucket, &key);
4284 self.enforce_policy_with_extra(
4285 &req,
4286 "s3:PutObjectTagging",
4287 &bucket,
4288 Some(&key),
4289 Some(&parsed),
4290 existing.as_ref(),
4291 )?;
4292 mgr.put_object_tags(&bucket, &key, parsed);
4293 Ok(S3Response::new(PutObjectTaggingOutput::default()))
4294 }
4295 async fn delete_object_tagging(
4296 &self,
4297 req: S3Request<DeleteObjectTaggingInput>,
4298 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
4299 let Some(mgr) = self.tagging.as_ref() else {
4300 return self.backend.delete_object_tagging(req).await;
4301 };
4302 let bucket = req.input.bucket.clone();
4303 let key = req.input.key.clone();
4304 let existing = mgr.get_object_tags(&bucket, &key);
4305 self.enforce_policy_with_extra(
4306 &req,
4307 "s3:DeleteObjectTagging",
4308 &bucket,
4309 Some(&key),
4310 None,
4311 existing.as_ref(),
4312 )?;
4313 mgr.delete_object_tags(&bucket, &key);
4314 Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
4315 }
    // Pure delegation to the backend.
    // NOTE(review): for S4-compressed objects the backend-reported
    // attributes (size / checksum) presumably reflect the stored
    // (compressed) bytes rather than the logical object — confirm
    // whether callers expect the logical view here.
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    // RestoreObject is a storage-class operation with no object bytes on
    // the wire — pure delegation to the backend.
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
    /// UploadPartCopy with S4 frame awareness.
    ///
    /// Framed sources are fetched through S4's own GET (decompress +
    /// Range), re-compressed and re-framed, then forwarded as a fresh
    /// `upload_part`; non-framed sources take the cheap backend
    /// passthrough.
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        // v0.2 #6: byte-range aware copy when the source is S4-framed.
        //
        // For a framed source (multipart upload OR single-PUT framed-v2),
        // a naive byte-range passthrough would copy compressed bytes that
        // don't align with S4 frame boundaries — silently corrupting the
        // result. Instead we GET the source through S4 (which handles
        // decompression + Range), re-compress + re-frame as a new part,
        // and forward as upload_part. For non-framed sources (S4-untouched
        // raw objects), passthrough is correct and we keep the original
        // (cheaper) code path.
        let CopySource::Bucket {
            bucket: src_bucket,
            key: src_key,
            ..
        } = &req.input.copy_source
        else {
            // Non-bucket copy source (e.g. access-point form): we can't
            // probe it here, so pass through.
            return self.backend.upload_part_copy(req).await;
        };
        let src_bucket = src_bucket.to_string();
        let src_key = src_key.to_string();

        // Probe metadata to decide whether the source needs S4-aware copy.
        // NOTE(review): this HEAD (and the GET below) reuse `req.uri` /
        // `req.headers` from the UploadPartCopy request rather than a URI
        // built for the source object — confirm the backend ignores these
        // fields (other paths in this file use `safe_object_uri`).
        let head_input = HeadObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        let head_req = S3Request {
            input: head_input,
            method: http::Method::HEAD,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        // A failed HEAD is treated as "not framed" — the passthrough path
        // will then surface whatever error the backend produces.
        let needs_s4_copy = match self.backend.head_object(head_req).await {
            Ok(h) => {
                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
            }
            Err(_) => false,
        };
        if !needs_s4_copy {
            return self.backend.upload_part_copy(req).await;
        }

        // Resolve the optional source byte range to pass to GET.
        let source_range = req
            .input
            .copy_source_range
            .as_ref()
            .map(|r| parse_copy_source_range(r))
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

        // GET source via S4 (handles decompression + sidecar partial fetch
        // when range is present). The result is the requested user-visible
        // byte range, fully decompressed.
        let mut get_input = GetObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        get_input.range = source_range;
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "upload_part_copy: empty body from source GET",
            )
        })?;
        // Bounded in-memory collect (same max_body_bytes cap as PUT).
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect upload_part_copy source body"))?;

        // Compress + frame as a fresh part (mirrors upload_part path).
        let sample_len = bytes.len().min(SAMPLE_BYTES);
        // v0.8 #56: same size-hint promotion as the upload_part path.
        let codec_kind = self
            .dispatcher
            .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
            .await;
        let original_size = bytes.len() as u64;
        // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
        let (compress_res, tel) = self
            .registry
            .compress_with_telemetry(bytes, codec_kind)
            .await;
        stamp_gpu_compress_telemetry(&tel);
        let (compressed, manifest) =
            compress_res.map_err(internal("registry compress upload_part_copy"))?;
        let header = FrameHeader {
            codec: codec_kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: manifest.crc32c,
        };
        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
        write_frame(&mut framed, header, &compressed);
        // Heuristic: a source slice already below the S3 minimum part
        // size is likely the final part (which S3 allows to be short),
        // so it is NOT padded; anything larger is padded up to the
        // minimum so the compressed frame stays a valid non-final part.
        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
        if !likely_final {
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
        }
        let framed_bytes = framed.freeze();
        let framed_len = framed_bytes.len() as i64;

        // Forward as upload_part to the destination multipart upload.
        let part_input = UploadPartInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            part_number: req.input.part_number,
            upload_id: req.input.upload_id.clone(),
            body: Some(bytes_to_blob(framed_bytes)),
            content_length: Some(framed_len),
            ..Default::default()
        };
        let part_req = S3Request {
            input: part_input,
            method: http::Method::PUT,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let upload_resp = self.backend.upload_part(part_req).await?;

        // Shape the upload_part response back into UploadPartCopy form
        // (only the ETag is surfaced in CopyPartResult).
        let copy_output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: upload_resp.output.e_tag.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        Ok(S3Response::new(copy_output))
    }
4483
4484 // ---- Object lock / retention / legal hold (v0.5 #30) ----
4485 //
4486 // When an `ObjectLockManager` is attached the configuration / per-object
4487 // state lives in the manager and these handlers serve directly from it;
4488 // when no manager is attached they fall back to the backend (legacy
4489 // passthrough so v0.4 deployments are unaffected).
4490 async fn get_object_lock_configuration(
4491 &self,
4492 req: S3Request<GetObjectLockConfigurationInput>,
4493 ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
4494 if let Some(mgr) = self.object_lock.as_ref() {
4495 let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
4496 ObjectLockConfiguration {
4497 object_lock_enabled: Some(ObjectLockEnabled::from_static(
4498 ObjectLockEnabled::ENABLED,
4499 )),
4500 rule: Some(ObjectLockRule {
4501 default_retention: Some(DefaultRetention {
4502 days: Some(d.retention_days as i32),
4503 mode: Some(ObjectLockRetentionMode::from_static(
4504 match d.mode {
4505 crate::object_lock::LockMode::Governance => {
4506 ObjectLockRetentionMode::GOVERNANCE
4507 }
4508 crate::object_lock::LockMode::Compliance => {
4509 ObjectLockRetentionMode::COMPLIANCE
4510 }
4511 },
4512 )),
4513 years: None,
4514 }),
4515 }),
4516 }
4517 });
4518 let output = GetObjectLockConfigurationOutput {
4519 object_lock_configuration: cfg,
4520 };
4521 return Ok(S3Response::new(output));
4522 }
4523 self.backend.get_object_lock_configuration(req).await
4524 }
    /// PutObjectLockConfiguration: stores the bucket-level default
    /// retention in the attached `ObjectLockManager`; without a manager
    /// this is a legacy passthrough to the backend.
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            // Only a request that actually carries a Rule/DefaultRetention
            // updates the stored default; one without it is accepted as a
            // no-op (note: an existing default is NOT cleared here).
            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
                && let Some(rule) = cfg.rule.as_ref()
                && let Some(d) = rule.default_retention.as_ref()
            {
                // Mode is mandatory and must parse to a known lock mode.
                let mode = d
                    .mode
                    .as_ref()
                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
                    .ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
                        )
                    })?;
                // S3 spec: exactly one of Days / Years (we accept Days
                // outright and convert Years → Days for storage; Years
                // is just a UX shorthand on the wire).
                let days: u32 = match (d.days, d.years) {
                    (Some(d), None) if d > 0 => d as u32,
                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
                    _ => {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
                        ));
                    }
                };
                mgr.set_bucket_default(
                    &bucket,
                    crate::object_lock::BucketObjectLockDefault {
                        mode,
                        retention_days: days,
                    },
                );
            }
            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
        }
        self.backend.put_object_lock_configuration(req).await
    }
4570 async fn get_object_legal_hold(
4571 &self,
4572 req: S3Request<GetObjectLegalHoldInput>,
4573 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
4574 if let Some(mgr) = self.object_lock.as_ref() {
4575 let on = mgr
4576 .get(&req.input.bucket, &req.input.key)
4577 .map(|s| s.legal_hold_on)
4578 .unwrap_or(false);
4579 let status = ObjectLockLegalHoldStatus::from_static(if on {
4580 ObjectLockLegalHoldStatus::ON
4581 } else {
4582 ObjectLockLegalHoldStatus::OFF
4583 });
4584 let output = GetObjectLegalHoldOutput {
4585 legal_hold: Some(ObjectLockLegalHold {
4586 status: Some(status),
4587 }),
4588 };
4589 return Ok(S3Response::new(output));
4590 }
4591 self.backend.get_object_legal_hold(req).await
4592 }
4593 async fn put_object_legal_hold(
4594 &self,
4595 req: S3Request<PutObjectLegalHoldInput>,
4596 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
4597 if let Some(mgr) = self.object_lock.as_ref() {
4598 let on = req
4599 .input
4600 .legal_hold
4601 .as_ref()
4602 .and_then(|h| h.status.as_ref())
4603 .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
4604 .unwrap_or(false);
4605 mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
4606 return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
4607 }
4608 self.backend.put_object_legal_hold(req).await
4609 }
4610 async fn get_object_retention(
4611 &self,
4612 req: S3Request<GetObjectRetentionInput>,
4613 ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
4614 if let Some(mgr) = self.object_lock.as_ref() {
4615 let retention = mgr
4616 .get(&req.input.bucket, &req.input.key)
4617 .filter(|s| s.mode.is_some() || s.retain_until.is_some())
4618 .map(|s| {
4619 let mode = s.mode.map(|m| {
4620 ObjectLockRetentionMode::from_static(match m {
4621 crate::object_lock::LockMode::Governance => {
4622 ObjectLockRetentionMode::GOVERNANCE
4623 }
4624 crate::object_lock::LockMode::Compliance => {
4625 ObjectLockRetentionMode::COMPLIANCE
4626 }
4627 })
4628 });
4629 let until = s.retain_until.map(chrono_utc_to_timestamp);
4630 ObjectLockRetention {
4631 mode,
4632 retain_until_date: until,
4633 }
4634 });
4635 let output = GetObjectRetentionOutput { retention };
4636 return Ok(S3Response::new(output));
4637 }
4638 self.backend.get_object_retention(req).await
4639 }
    /// PutObjectRetention: enforces S3's retention-immutability rules
    /// against the state held in the attached `ObjectLockManager`;
    /// without a manager this is a legacy passthrough to the backend.
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            let key = req.input.key.clone();
            // x-amz-bypass-governance-retention header, default false.
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let retention = req.input.retention.as_ref().ok_or_else(|| {
                S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "PutObjectRetention requires a Retention element",
                )
            })?;
            // Unparseable modes/dates degrade to None rather than erroring;
            // None fields later leave the existing state untouched.
            let new_mode = retention
                .mode
                .as_ref()
                .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
            let new_until = retention
                .retain_until_date
                .as_ref()
                .map(timestamp_to_chrono_utc)
                .unwrap_or(None);
            let now = chrono::Utc::now();
            let existing = mgr.get(&bucket, &key).unwrap_or_default();
            // S3 immutability rules:
            // - Compliance is one-way: once set, mode cannot move to
            //   Governance, and retain-until cannot be shortened.
            // - Governance can be lengthened freely; shortened only
            //   with bypass=true.
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Compliance
                && existing.is_locked(now)
            {
                if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot downgrade Compliance retention to Governance while lock is active",
                    ));
                }
                if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                    && next < prev
                {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot shorten Compliance retention while lock is active",
                    ));
                }
            }
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Governance
                && existing.is_locked(now)
                && !bypass
                && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                && next < prev
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
                ));
            }
            // Merge: only fields the request actually supplied overwrite
            // the stored state.
            let mut state = existing;
            if new_mode.is_some() {
                state.mode = new_mode;
            }
            if new_until.is_some() {
                state.retain_until = new_until;
            }
            mgr.set(&bucket, &key, state);
            return Ok(S3Response::new(PutObjectRetentionOutput::default()));
        }
        self.backend.put_object_retention(req).await
    }
4713
4714 // ---- Versioning ----
4715 // list_object_versions is implemented above in the compression-hook
4716 // section so it filters S4-internal sidecars (v0.4 #17) AND, when a
4717 // VersioningManager is attached (v0.5 #34), serves chains directly
4718 // from the in-memory index.
4719 async fn get_bucket_versioning(
4720 &self,
4721 req: S3Request<GetBucketVersioningInput>,
4722 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
4723 // v0.5 #34: when a VersioningManager is attached, the bucket's
4724 // versioning state lives in the manager (= S4-server's
4725 // authoritative source). Pass-through hits the backend only
4726 // when no manager is configured (legacy v0.4 behaviour).
4727 if let Some(mgr) = self.versioning.as_ref() {
4728 let output = match mgr.state(&req.input.bucket).as_aws_status() {
4729 Some(s) => GetBucketVersioningOutput {
4730 status: Some(BucketVersioningStatus::from(s.to_owned())),
4731 ..Default::default()
4732 },
4733 None => GetBucketVersioningOutput::default(),
4734 };
4735 return Ok(S3Response::new(output));
4736 }
4737 self.backend.get_bucket_versioning(req).await
4738 }
    /// PutBucketVersioning: MFA-gates any request that carries an
    /// `MfaDelete` element (v0.6 #42), then records the new versioning
    /// state in the `VersioningManager` (v0.5 #34) or falls through to
    /// the backend when no manager is attached.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        // v0.6 #42: MFA gating on the `PutBucketVersioning` request
        // itself. S3 spec: when the request body carries an
        // `MfaDelete` element (either `Enabled` or `Disabled`), the
        // request must include a valid `x-amz-mfa` token — both for
        // the *first* enable (so the operator can't quietly side-step
        // the gate by never enabling it) and for any subsequent
        // change (so a leaked credential alone can't disable MFA
        // Delete to bypass it on subsequent DELETEs). Requests that
        // omit the `MfaDelete` element entirely (i.e. they flip only
        // `Status`) skip this gate, matching AWS.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && let Some(target_enabled) = req
                .input
                .versioning_configuration
                .mfa_delete
                .as_ref()
                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
        {
            let bucket = req.input.bucket.clone();
            let header = req.input.mfa.as_deref();
            let secret = mgr.lookup_secret(&bucket);
            // Verified only when a header is present, a secret is
            // registered for the bucket, the header parses, the serial
            // matches, AND the TOTP code is valid right now.
            let verified = match (header, secret.as_ref()) {
                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
                    Ok((serial, code)) => {
                        serial == s.serial
                            && crate::mfa::verify_totp(
                                &s.secret_base32,
                                &code,
                                current_unix_secs(),
                            )
                    }
                    Err(_) => false,
                },
                _ => false,
            };
            if !verified {
                crate::metrics::record_mfa_delete_denial(&bucket);
                // Distinguish "no token supplied" from "token rejected"
                // for the error mapping.
                let err = if header.is_none() {
                    crate::mfa::MfaError::Missing
                } else {
                    crate::mfa::MfaError::InvalidCode
                };
                return Err(mfa_error_to_s3(err));
            }
            mgr.set_bucket_state(&bucket, target_enabled);
        }
        // v0.5 #34: stash the new state in the manager, then forward to
        // the backend so any downstream that *also* tracks state
        // (e.g. a real S3 backend) stays in sync. Manager-attached but
        // backend rejection is treated as a soft-fail (state is still
        // owned by the manager).
        if let Some(mgr) = self.versioning.as_ref() {
            let new_state = match req
                .input
                .versioning_configuration
                .status
                .as_ref()
                .map(|s| s.as_str())
            {
                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
                    crate::versioning::VersioningState::Enabled
                }
                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
                    crate::versioning::VersioningState::Suspended
                }
                _ => crate::versioning::VersioningState::Unversioned,
            };
            mgr.set_state(&req.input.bucket, new_state);
            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
        }
        self.backend.put_bucket_versioning(req).await
    }
4815
4816 // ---- Bucket location ----
    // Bucket location is backend-owned — pure delegation.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }
4823
4824 // ---- Bucket policy ----
    // Bucket policy document storage is backend-owned — pure delegation.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    // Pure delegation — the backend stores the policy document.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
4837 async fn delete_bucket_policy(
4838 &self,
4839 req: S3Request<DeleteBucketPolicyInput>,
4840 ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
4841 self.backend.delete_bucket_policy(req).await
4842 }
4843 async fn get_bucket_policy_status(
4844 &self,
4845 req: S3Request<GetBucketPolicyStatusInput>,
4846 ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
4847 self.backend.get_bucket_policy_status(req).await
4848 }
4849
4850 // ---- Bucket ACL ----
4851 async fn get_bucket_acl(
4852 &self,
4853 req: S3Request<GetBucketAclInput>,
4854 ) -> S3Result<S3Response<GetBucketAclOutput>> {
4855 self.backend.get_bucket_acl(req).await
4856 }
4857 async fn put_bucket_acl(
4858 &self,
4859 req: S3Request<PutBucketAclInput>,
4860 ) -> S3Result<S3Response<PutBucketAclOutput>> {
4861 self.backend.put_bucket_acl(req).await
4862 }
4863
4864 // ---- Bucket CORS (v0.6 #38) ----
4865 async fn get_bucket_cors(
4866 &self,
4867 req: S3Request<GetBucketCorsInput>,
4868 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
4869 if let Some(mgr) = self.cors.as_ref() {
4870 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4871 S3Error::with_message(
4872 S3ErrorCode::NoSuchCORSConfiguration,
4873 "The CORS configuration does not exist".to_string(),
4874 )
4875 })?;
4876 let rules: Vec<CORSRule> = cfg
4877 .rules
4878 .into_iter()
4879 .map(|r| CORSRule {
4880 allowed_headers: if r.allowed_headers.is_empty() {
4881 None
4882 } else {
4883 Some(r.allowed_headers)
4884 },
4885 allowed_methods: r.allowed_methods,
4886 allowed_origins: r.allowed_origins,
4887 expose_headers: if r.expose_headers.is_empty() {
4888 None
4889 } else {
4890 Some(r.expose_headers)
4891 },
4892 id: r.id,
4893 max_age_seconds: r.max_age_seconds.map(|s| s as i32),
4894 })
4895 .collect();
4896 return Ok(S3Response::new(GetBucketCorsOutput {
4897 cors_rules: Some(rules),
4898 }));
4899 }
4900 self.backend.get_bucket_cors(req).await
4901 }
4902 async fn put_bucket_cors(
4903 &self,
4904 req: S3Request<PutBucketCorsInput>,
4905 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4906 if let Some(mgr) = self.cors.as_ref() {
4907 let cfg = crate::cors::CorsConfig {
4908 rules: req
4909 .input
4910 .cors_configuration
4911 .cors_rules
4912 .into_iter()
4913 .map(|r| crate::cors::CorsRule {
4914 allowed_origins: r.allowed_origins,
4915 allowed_methods: r.allowed_methods,
4916 allowed_headers: r.allowed_headers.unwrap_or_default(),
4917 expose_headers: r.expose_headers.unwrap_or_default(),
4918 max_age_seconds: r.max_age_seconds.and_then(|s| {
4919 if s < 0 { None } else { Some(s as u32) }
4920 }),
4921 id: r.id,
4922 })
4923 .collect(),
4924 };
4925 mgr.put(&req.input.bucket, cfg);
4926 return Ok(S3Response::new(PutBucketCorsOutput::default()));
4927 }
4928 self.backend.put_bucket_cors(req).await
4929 }
4930 async fn delete_bucket_cors(
4931 &self,
4932 req: S3Request<DeleteBucketCorsInput>,
4933 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4934 if let Some(mgr) = self.cors.as_ref() {
4935 mgr.delete(&req.input.bucket);
4936 return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4937 }
4938 self.backend.delete_bucket_cors(req).await
4939 }
4940
4941 // ---- Bucket lifecycle (v0.6 #37) ----
4942 async fn get_bucket_lifecycle_configuration(
4943 &self,
4944 req: S3Request<GetBucketLifecycleConfigurationInput>,
4945 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4946 if let Some(mgr) = self.lifecycle.as_ref() {
4947 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4948 S3Error::with_message(
4949 S3ErrorCode::NoSuchLifecycleConfiguration,
4950 "The lifecycle configuration does not exist".to_string(),
4951 )
4952 })?;
4953 let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
4954 return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
4955 rules: Some(rules),
4956 transition_default_minimum_object_size: None,
4957 }));
4958 }
4959 self.backend.get_bucket_lifecycle_configuration(req).await
4960 }
4961 async fn put_bucket_lifecycle_configuration(
4962 &self,
4963 req: S3Request<PutBucketLifecycleConfigurationInput>,
4964 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
4965 if let Some(mgr) = self.lifecycle.as_ref() {
4966 let bucket = req.input.bucket.clone();
4967 let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
4968 let cfg = dto_lifecycle_to_internal(&dto_cfg);
4969 mgr.put(&bucket, cfg);
4970 return Ok(S3Response::new(
4971 PutBucketLifecycleConfigurationOutput::default(),
4972 ));
4973 }
4974 self.backend.put_bucket_lifecycle_configuration(req).await
4975 }
4976 async fn delete_bucket_lifecycle(
4977 &self,
4978 req: S3Request<DeleteBucketLifecycleInput>,
4979 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
4980 if let Some(mgr) = self.lifecycle.as_ref() {
4981 mgr.delete(&req.input.bucket);
4982 return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
4983 }
4984 self.backend.delete_bucket_lifecycle(req).await
4985 }
4986
4987 // ---- Bucket tagging (v0.6 #39) ----
4988 async fn get_bucket_tagging(
4989 &self,
4990 req: S3Request<GetBucketTaggingInput>,
4991 ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
4992 let Some(mgr) = self.tagging.as_ref() else {
4993 return self.backend.get_bucket_tagging(req).await;
4994 };
4995 let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
4996 Ok(S3Response::new(GetBucketTaggingOutput {
4997 tag_set: tagset_to_aws(&tags),
4998 }))
4999 }
5000 async fn put_bucket_tagging(
5001 &self,
5002 req: S3Request<PutBucketTaggingInput>,
5003 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
5004 let Some(mgr) = self.tagging.as_ref() else {
5005 return self.backend.put_bucket_tagging(req).await;
5006 };
5007 let bucket = req.input.bucket.clone();
5008 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
5009 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
5010 })?;
5011 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
5012 mgr.put_bucket_tags(&bucket, parsed);
5013 Ok(S3Response::new(PutBucketTaggingOutput::default()))
5014 }
    /// DELETE the bucket's tag set from the local tagging manager (backend
    /// passthrough when no manager is attached).
    async fn delete_bucket_tagging(
        &self,
        req: S3Request<DeleteBucketTaggingInput>,
    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
        let Some(mgr) = self.tagging.as_ref() else {
            return self.backend.delete_bucket_tagging(req).await;
        };
        let bucket = req.input.bucket.clone();
        // NOTE(review): the action string here is "s3:PutBucketTagging"
        // rather than a Delete* action — this appears to mirror AWS IAM,
        // which gates tag deletion behind s3:PutBucketTagging; confirm this
        // was intentional.
        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
        mgr.delete_bucket_tags(&bucket);
        Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
    }
5027
5028 // ---- Bucket encryption ----
    // Pure delegation: encryption and logging configuration are owned by the
    // backend; S4 keeps no local state for either.
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }

    // ---- Bucket logging ----
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
5061
5062 // ---- Bucket notification (v0.6 #35) ----
5063 //
5064 // When a `NotificationManager` is attached, S4 itself owns per-bucket
5065 // notification configurations and the PUT / GET handlers route through
5066 // the manager. The wire DTO's queue / topic configurations map onto
5067 // S4's `Destination::Sqs` / `Destination::Sns`; LambdaFunction and
5068 // EventBridge configurations are accepted on PUT but silently dropped
5069 // (out of scope for v0.6 #35). When no manager is attached the legacy
5070 // backend-passthrough behaviour applies.
5071 async fn get_bucket_notification_configuration(
5072 &self,
5073 req: S3Request<GetBucketNotificationConfigurationInput>,
5074 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
5075 if let Some(mgr) = self.notifications.as_ref() {
5076 let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
5077 let dto = notif_to_dto(&cfg);
5078 return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
5079 event_bridge_configuration: dto.event_bridge_configuration,
5080 lambda_function_configurations: dto.lambda_function_configurations,
5081 queue_configurations: dto.queue_configurations,
5082 topic_configurations: dto.topic_configurations,
5083 }));
5084 }
5085 self.backend
5086 .get_bucket_notification_configuration(req)
5087 .await
5088 }
5089 async fn put_bucket_notification_configuration(
5090 &self,
5091 req: S3Request<PutBucketNotificationConfigurationInput>,
5092 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
5093 if let Some(mgr) = self.notifications.as_ref() {
5094 let cfg = notif_from_dto(&req.input.notification_configuration);
5095 mgr.put(&req.input.bucket, cfg);
5096 return Ok(S3Response::new(
5097 PutBucketNotificationConfigurationOutput::default(),
5098 ));
5099 }
5100 self.backend
5101 .put_bucket_notification_configuration(req)
5102 .await
5103 }
5104
5105 // ---- Bucket request payment ----
    // Pure delegation: request-payment and static-website configuration are
    // owned by the backend; S4 keeps no local state for either.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }

    // ---- Bucket website ----
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
5138
5139 // ---- Bucket replication (v0.6 #40) ----
5140 async fn get_bucket_replication(
5141 &self,
5142 req: S3Request<GetBucketReplicationInput>,
5143 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
5144 if let Some(mgr) = self.replication.as_ref() {
5145 return match mgr.get(&req.input.bucket) {
5146 Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
5147 replication_configuration: Some(replication_to_dto(&cfg)),
5148 })),
5149 None => Err(S3Error::with_message(
5150 S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
5151 format!("no replication configuration on bucket {}", req.input.bucket),
5152 )),
5153 };
5154 }
5155 self.backend.get_bucket_replication(req).await
5156 }
5157 async fn put_bucket_replication(
5158 &self,
5159 req: S3Request<PutBucketReplicationInput>,
5160 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
5161 if let Some(mgr) = self.replication.as_ref() {
5162 let cfg = replication_from_dto(&req.input.replication_configuration);
5163 mgr.put(&req.input.bucket, cfg);
5164 return Ok(S3Response::new(PutBucketReplicationOutput::default()));
5165 }
5166 self.backend.put_bucket_replication(req).await
5167 }
5168 async fn delete_bucket_replication(
5169 &self,
5170 req: S3Request<DeleteBucketReplicationInput>,
5171 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
5172 if let Some(mgr) = self.replication.as_ref() {
5173 mgr.delete(&req.input.bucket);
5174 return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
5175 }
5176 self.backend.delete_bucket_replication(req).await
5177 }
5178
5179 // ---- Bucket accelerate ----
    // Pure delegation: accelerate, ownership-controls, and public-access-
    // block configuration are owned by the backend; S4 keeps no local state
    // for any of them.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }

    // ---- Bucket ownership controls ----
    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }

    // ---- Public access block ----
    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
5232
5233 // ====================================================================
5234 // v0.6 #41: S3 Select — server-side SQL filter on object body.
5235 //
5236 // Fetch the object via the regular `get_object` path (so SSE-C /
5237 // SSE-S4 / SSE-KMS / S4 codec all decompress + decrypt transparently),
5238 // run a small SQL subset (CSV + JSON Lines, equality / inequality /
5239 // LIKE / AND / OR / NOT) over the in-memory body, and stream the
5240 // matched rows back as AWS event-stream `Records` + `Stats` + `End`
5241 // frames.
5242 //
5243 // Limitations (deliberate, documented):
5244 // - Parquet input is rejected with NotImplemented.
5245 // - Aggregates / GROUP BY / JOIN / ORDER BY / LIMIT are rejected at
5246 // parse time as InvalidRequest (s3s 0.13 doesn't expose AWS's
5247 // domain-specific `InvalidSqlExpression` code).
5248 // - The body is fully buffered before SQL evaluation (S3 Select
5249 // streaming-during-evaluation is v0.7 scope).
5250 // - GPU-accelerated WHERE evaluation is stubbed out (always None).
    /// S3 Select (v0.6 #41): run a restricted SQL expression server-side over
    /// one object's body and answer with `Records` / `Stats` / `End`
    /// event-stream frames. Scope limits are documented in the block comment
    /// above this method.
    async fn select_object_content(
        &self,
        req: S3Request<SelectObjectContentInput>,
    ) -> S3Result<S3Response<SelectObjectContentOutput>> {
        use crate::select::{
            EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
            run_select_jsonlines,
        };

        // Rate-limit and policy checks use the same "s3:GetObject" action as
        // a plain GET, since Select reads the object body.
        let select_bucket = req.input.bucket.clone();
        let select_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &select_bucket)?;
        self.enforce_policy(
            &req,
            "s3:GetObject",
            &select_bucket,
            Some(&select_key),
        )?;

        // Only ExpressionType=SQL is accepted.
        let request = req.input.request;
        let sql = request.expression.clone();
        if request.expression_type.as_str() != "SQL" {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidExpressionType,
                format!(
                    "ExpressionType must be SQL, got: {}",
                    request.expression_type.as_str()
                ),
            ));
        }

        // Input serialization: exactly one of JSON / CSV; Parquet is
        // explicitly rejected (v0.6 scope — see block comment above).
        let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
            SelectInputFormat::JsonLines
        } else if let Some(csv) = request.input_serialization.csv.as_ref() {
            // FileHeaderInfo USE and IGNORE both mean "a header row exists";
            // anything else (or absence) means no header.
            let has_header = csv
                .file_header_info
                .as_ref()
                .map(|h| {
                    let s = h.as_str();
                    s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
                })
                .unwrap_or(false);
            // Only the first character of FieldDelimiter is honoured; a
            // missing delimiter defaults to ','.
            let delim = csv
                .field_delimiter
                .as_deref()
                .and_then(|s| s.chars().next())
                .unwrap_or(',');
            SelectInputFormat::Csv {
                has_header,
                delimiter: delim,
            }
        } else if request.input_serialization.parquet.is_some() {
            return Err(S3Error::with_message(
                S3ErrorCode::NotImplemented,
                "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
            ));
        } else {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidRequest,
                "InputSerialization requires exactly one of CSV / JSON / Parquet",
            ));
        };
        // Compressed Select input is out of scope for v0.6: only
        // CompressionType=NONE (or absent) is accepted.
        if let Some(ct) = request.input_serialization.compression_type.as_ref()
            && !ct.as_str().eq_ignore_ascii_case("NONE")
        {
            return Err(S3Error::with_message(
                S3ErrorCode::NotImplemented,
                format!(
                    "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
                    ct.as_str()
                ),
            ));
        }

        // Output serialization: exactly one of JSON / CSV.
        let output_format = if request.output_serialization.json.is_some() {
            SelectOutputFormat::Json
        } else if request.output_serialization.csv.is_some() {
            SelectOutputFormat::Csv
        } else {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidRequest,
                "OutputSerialization requires exactly one of CSV / JSON",
            ));
        };

        // Fetch the object through our own `get_object` path so the
        // transparent layers (S4 codec, forwarded SSE-C fields below) apply
        // before SQL evaluation sees the bytes.
        let get_input = GetObjectInput {
            bucket: select_bucket.clone(),
            key: select_key.clone(),
            sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
            sse_customer_key: req.input.sse_customer_key.clone(),
            sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{}/{}", select_bucket, select_key)
                .parse()
                .map_err(|e| {
                    S3Error::with_message(
                        S3ErrorCode::InternalError,
                        format!("constructing inner GET URI: {e}"),
                    )
                })?,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "Select: object body was empty after GET",
            )
        })?;
        // Fully buffer the body (bounded by max_body_bytes) — streaming
        // evaluation is deferred to v0.7 per the block comment above.
        let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect Select body"))?;
        let scanned = body_bytes.len() as u64;

        // Evaluate the SQL subset over the buffered body.
        let matched_payload = match input_format {
            SelectInputFormat::JsonLines => {
                run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
                    |e| select_error_to_s3(e, "JSON Lines"),
                )?
            }
            SelectInputFormat::Csv { .. } => {
                run_select_csv(&sql, &body_bytes, input_format, output_format)
                    .map_err(|e| select_error_to_s3(e, "CSV"))?
            }
        };

        // Assemble the event stream: Records (only when non-empty), then
        // Stats, then End. With uncompressed input, processed == scanned.
        let returned = matched_payload.len() as u64;
        let processed = scanned;
        let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
        if !matched_payload.is_empty() {
            events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
                payload: Some(bytes::Bytes::from(matched_payload)),
            })));
        }
        events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
            details: Some(Stats {
                bytes_scanned: Some(scanned as i64),
                bytes_processed: Some(processed as i64),
                bytes_returned: Some(returned as i64),
            }),
        })));
        events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
        // Touch EventStreamWriter so the public API stays linked into the
        // build (the actual wire framing is delegated to s3s).
        let _writer = EventStreamWriter::new();

        let stream =
            SelectObjectContentEventStream::new(futures::stream::iter(events));
        let output = SelectObjectContentOutput {
            payload: Some(stream),
        };
        Ok(S3Response::new(output))
    }
5413
5414 // ---- Bucket Inventory configuration (v0.6 #36) ----
5415 //
5416 // When an `InventoryManager` is attached, S4-server owns the
5417 // configuration store and these handlers no longer pass through to
5418 // the backend. The mapping between the s3s-typed
5419 // `InventoryConfiguration` and the inventory module's internal
5420 // `InventoryConfig` is intentionally lossy: only the fields S4
5421 // actually uses for periodic CSV emission survive the round trip
5422 // (id, source bucket, destination bucket / prefix, format, included
5423 // versions, schedule frequency). Optional fields, encryption, and
5424 // filter prefixes are accepted on PUT and re-surfaced on GET via
5425 // a best-effort default-shape `InventoryConfiguration` so the
5426 // client sees a roundtrip-clean response.
5427 async fn put_bucket_inventory_configuration(
5428 &self,
5429 req: S3Request<PutBucketInventoryConfigurationInput>,
5430 ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
5431 if let Some(mgr) = self.inventory.as_ref() {
5432 let cfg = inv_from_dto(
5433 &req.input.bucket,
5434 &req.input.id,
5435 &req.input.inventory_configuration,
5436 );
5437 mgr.put(cfg);
5438 return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
5439 }
5440 self.backend.put_bucket_inventory_configuration(req).await
5441 }
5442
5443 async fn get_bucket_inventory_configuration(
5444 &self,
5445 req: S3Request<GetBucketInventoryConfigurationInput>,
5446 ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
5447 if let Some(mgr) = self.inventory.as_ref() {
5448 let cfg = mgr.get(&req.input.bucket, &req.input.id);
5449 if let Some(cfg) = cfg {
5450 let out = GetBucketInventoryConfigurationOutput {
5451 inventory_configuration: Some(inv_to_dto(&cfg)),
5452 };
5453 return Ok(S3Response::new(out));
5454 }
5455 // AWS returns `NoSuchConfiguration` (404) when the id has no
5456 // matching inventory configuration on the bucket. The
5457 // generated `S3ErrorCode` enum doesn't expose a typed variant
5458 // for this code, so we round-trip through `from_bytes` which
5459 // wraps unknown codes as `Custom(...)` (= the AWS-canonical
5460 // error-code string survives into the XML response envelope).
5461 let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
5462 .unwrap_or(S3ErrorCode::NoSuchKey);
5463 return Err(S3Error::with_message(
5464 code,
5465 format!(
5466 "no inventory configuration with id={} on bucket={}",
5467 req.input.id, req.input.bucket
5468 ),
5469 ));
5470 }
5471 self.backend.get_bucket_inventory_configuration(req).await
5472 }
5473
5474 async fn list_bucket_inventory_configurations(
5475 &self,
5476 req: S3Request<ListBucketInventoryConfigurationsInput>,
5477 ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
5478 if let Some(mgr) = self.inventory.as_ref() {
5479 let list = mgr.list_for_bucket(&req.input.bucket);
5480 let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
5481 let out = ListBucketInventoryConfigurationsOutput {
5482 continuation_token: req.input.continuation_token.clone(),
5483 inventory_configuration_list: if dto_list.is_empty() {
5484 None
5485 } else {
5486 Some(dto_list)
5487 },
5488 is_truncated: Some(false),
5489 next_continuation_token: None,
5490 };
5491 return Ok(S3Response::new(out));
5492 }
5493 self.backend.list_bucket_inventory_configurations(req).await
5494 }
5495
5496 async fn delete_bucket_inventory_configuration(
5497 &self,
5498 req: S3Request<DeleteBucketInventoryConfigurationInput>,
5499 ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
5500 if let Some(mgr) = self.inventory.as_ref() {
5501 mgr.delete(&req.input.bucket, &req.input.id);
5502 return Ok(S3Response::new(
5503 DeleteBucketInventoryConfigurationOutput::default(),
5504 ));
5505 }
5506 self.backend.delete_bucket_inventory_configuration(req).await
5507 }
5508}
5509
5510// ---------------------------------------------------------------------------
5511// v0.6 #36: Convert between the s3s-typed `InventoryConfiguration` (the wire
5512// surface) and our internal `crate::inventory::InventoryConfig`. Only the
5513// fields S4 actually uses for CSV emission survive the round trip; the
5514// missing fields (filter prefix, optional fields, encryption) are dropped on
5515// PUT and re-rendered as the AWS-default shape on GET so the client sees a
5516// well-formed `InventoryConfiguration`.
5517// ---------------------------------------------------------------------------
5518
5519fn inv_from_dto(
5520 bucket: &str,
5521 id: &str,
5522 dto: &InventoryConfiguration,
5523) -> crate::inventory::InventoryConfig {
5524 let frequency_hours = match dto.schedule.frequency.as_str() {
5525 "Weekly" => 24 * 7,
5526 // Daily is the default; anything S4 doesn't recognise (incl.
5527 // empty, which is the s3s-default) maps to Daily so the
5528 // operator's PUT doesn't silently turn into a no-op cadence.
5529 _ => 24,
5530 };
5531 // Parquet/ORC are not supported (issue #36 scope); we still accept
5532 // the PUT so callers don't fail-loud, but we record CSV and rely on
5533 // the operator catching the discrepancy on GET.
5534 let format = crate::inventory::InventoryFormat::Csv;
5535 crate::inventory::InventoryConfig {
5536 id: id.to_owned(),
5537 bucket: bucket.to_owned(),
5538 destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
5539 destination_prefix: dto
5540 .destination
5541 .s3_bucket_destination
5542 .prefix
5543 .clone()
5544 .unwrap_or_default(),
5545 frequency_hours,
5546 format,
5547 included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
5548 dto.included_object_versions.as_str(),
5549 ),
5550 }
5551}
5552
5553fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
5554 InventoryConfiguration {
5555 id: cfg.id.clone(),
5556 is_enabled: true,
5557 included_object_versions: InventoryIncludedObjectVersions::from(
5558 cfg.included_object_versions.as_aws_str().to_owned(),
5559 ),
5560 destination: InventoryDestination {
5561 s3_bucket_destination: InventoryS3BucketDestination {
5562 account_id: None,
5563 bucket: cfg.destination_bucket.clone(),
5564 encryption: None,
5565 format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
5566 prefix: if cfg.destination_prefix.is_empty() {
5567 None
5568 } else {
5569 Some(cfg.destination_prefix.clone())
5570 },
5571 },
5572 },
5573 schedule: InventorySchedule {
5574 // `frequency_hours == 168` -> Weekly; everything else maps to
5575 // Daily for the wire response (the manager keeps the precise
5576 // hour count internally for due-checking).
5577 frequency: InventoryFrequency::from(
5578 if cfg.frequency_hours == 24 * 7 {
5579 "Weekly"
5580 } else {
5581 "Daily"
5582 }
5583 .to_owned(),
5584 ),
5585 },
5586 filter: None,
5587 optional_fields: None,
5588 }
5589}
5590
5591// ---------------------------------------------------------------------------
5592// v0.6 #35: Convert between the s3s-typed `NotificationConfiguration` (the
5593// wire surface) and our internal `crate::notifications::NotificationConfig`.
5594//
5595// We support TopicConfiguration (-> Destination::Sns) and QueueConfiguration
5596// (-> Destination::Sqs). LambdaFunction and EventBridge configurations are
5597// silently dropped on PUT (out of scope for v0.6 #35); the GET response only
5598// surfaces topic / queue rules.
5599//
5600// The webhook destination has no AWS-native wire form: operators configure
5601// webhooks via the JSON snapshot file (`--notifications-state-file`) or by
5602// poking `NotificationManager::put` directly from a custom binary. This
5603// keeps the wire surface AWS-compatible while still letting the always-
5604// available `Webhook` destination be reachable.
5605// ---------------------------------------------------------------------------
5606
5607fn notif_from_dto(
5608 dto: &NotificationConfiguration,
5609) -> crate::notifications::NotificationConfig {
5610 let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
5611 if let Some(topics) = dto.topic_configurations.as_ref() {
5612 for (idx, t) in topics.iter().enumerate() {
5613 let events = events_from_dto(&t.events);
5614 let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
5615 rules.push(crate::notifications::NotificationRule {
5616 id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
5617 events,
5618 destination: crate::notifications::Destination::Sns {
5619 topic_arn: t.topic_arn.clone(),
5620 },
5621 filter_prefix: prefix,
5622 filter_suffix: suffix,
5623 });
5624 }
5625 }
5626 if let Some(queues) = dto.queue_configurations.as_ref() {
5627 for (idx, q) in queues.iter().enumerate() {
5628 let events = events_from_dto(&q.events);
5629 let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
5630 rules.push(crate::notifications::NotificationRule {
5631 id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
5632 events,
5633 destination: crate::notifications::Destination::Sqs {
5634 queue_arn: q.queue_arn.clone(),
5635 },
5636 filter_prefix: prefix,
5637 filter_suffix: suffix,
5638 });
5639 }
5640 }
5641 crate::notifications::NotificationConfig { rules }
5642}
5643
5644fn notif_to_dto(
5645 cfg: &crate::notifications::NotificationConfig,
5646) -> NotificationConfiguration {
5647 let mut topics: Vec<TopicConfiguration> = Vec::new();
5648 let mut queues: Vec<QueueConfiguration> = Vec::new();
5649 for rule in &cfg.rules {
5650 let events: Vec<Event> = rule
5651 .events
5652 .iter()
5653 .map(|e| Event::from(e.as_aws_str().to_owned()))
5654 .collect();
5655 let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
5656 match &rule.destination {
5657 crate::notifications::Destination::Sns { topic_arn } => {
5658 topics.push(TopicConfiguration {
5659 events,
5660 filter,
5661 id: Some(rule.id.clone()),
5662 topic_arn: topic_arn.clone(),
5663 });
5664 }
5665 crate::notifications::Destination::Sqs { queue_arn } => {
5666 queues.push(QueueConfiguration {
5667 events,
5668 filter,
5669 id: Some(rule.id.clone()),
5670 queue_arn: queue_arn.clone(),
5671 });
5672 }
5673 // Webhook destinations have no AWS wire equivalent — they
5674 // round-trip through the JSON snapshot only. Skip them on the
5675 // GET surface (an SDK consumer wouldn't know what to do with
5676 // them anyway).
5677 crate::notifications::Destination::Webhook { .. } => {}
5678 }
5679 }
5680 NotificationConfiguration {
5681 event_bridge_configuration: None,
5682 lambda_function_configurations: None,
5683 queue_configurations: if queues.is_empty() { None } else { Some(queues) },
5684 topic_configurations: if topics.is_empty() { None } else { Some(topics) },
5685 }
5686}
5687
5688fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
5689 events
5690 .iter()
5691 .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
5692 .collect()
5693}
5694
5695fn filter_from_dto(
5696 f: Option<&NotificationConfigurationFilter>,
5697) -> (Option<String>, Option<String>) {
5698 let Some(f) = f else {
5699 return (None, None);
5700 };
5701 let Some(key) = f.key.as_ref() else {
5702 return (None, None);
5703 };
5704 let Some(rules) = key.filter_rules.as_ref() else {
5705 return (None, None);
5706 };
5707 let mut prefix = None;
5708 let mut suffix = None;
5709 for r in rules {
5710 let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
5711 let value = r.value.clone();
5712 match name.as_deref() {
5713 Some("prefix") => prefix = value,
5714 Some("suffix") => suffix = value,
5715 _ => {}
5716 }
5717 }
5718 (prefix, suffix)
5719}
5720
5721fn filter_to_dto(
5722 prefix: Option<&str>,
5723 suffix: Option<&str>,
5724) -> Option<NotificationConfigurationFilter> {
5725 if prefix.is_none() && suffix.is_none() {
5726 return None;
5727 }
5728 let mut rules: Vec<FilterRule> = Vec::new();
5729 if let Some(p) = prefix {
5730 rules.push(FilterRule {
5731 name: Some(FilterRuleName::from("prefix".to_owned())),
5732 value: Some(p.to_owned()),
5733 });
5734 }
5735 if let Some(s) = suffix {
5736 rules.push(FilterRule {
5737 name: Some(FilterRuleName::from("suffix".to_owned())),
5738 value: Some(s.to_owned()),
5739 });
5740 }
5741 Some(NotificationConfigurationFilter {
5742 key: Some(S3KeyFilter {
5743 filter_rules: Some(rules),
5744 }),
5745 })
5746}
5747
5748// ---------------------------------------------------------------------------
5749// v0.6 #40: Convert between the s3s-typed `ReplicationConfiguration` (the
5750// wire surface) and our internal `crate::replication::ReplicationConfig`.
5751// AWS's `ReplicationRuleFilter` is a sum type — `Prefix | Tag | And { Prefix,
5752// Tags }`; we flatten it into the single `(prefix, tag-vec)` representation
5753// the matcher needs. Sub-blocks v0.6 #40 does not implement
5754// (DeleteMarkerReplication / SourceSelectionCriteria / ReplicationTime /
5755// Metrics / EncryptionConfiguration) round-trip as `None` on GET — operators
5756// who set them on PUT see them silently dropped, mirroring "feature not
5757// supported in this release" semantics.
5758// ---------------------------------------------------------------------------
5759
5760fn replication_from_dto(
5761 dto: &ReplicationConfiguration,
5762) -> crate::replication::ReplicationConfig {
5763 let rules = dto
5764 .rules
5765 .iter()
5766 .enumerate()
5767 .map(|(idx, r)| {
5768 let id = r
5769 .id
5770 .as_ref()
5771 .map(|s| s.as_str().to_owned())
5772 .unwrap_or_else(|| format!("rule-{idx}"));
5773 let priority = r.priority.unwrap_or(0).max(0) as u32;
5774 let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
5775 let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
5776 let destination_bucket = r.destination.bucket.clone();
5777 let destination_storage_class = r
5778 .destination
5779 .storage_class
5780 .as_ref()
5781 .map(|s| s.as_str().to_owned());
5782 crate::replication::ReplicationRule {
5783 id,
5784 priority,
5785 status_enabled,
5786 filter,
5787 destination_bucket,
5788 destination_storage_class,
5789 }
5790 })
5791 .collect();
5792 crate::replication::ReplicationConfig {
5793 role: dto.role.clone(),
5794 rules,
5795 }
5796}
5797
5798fn replication_to_dto(
5799 cfg: &crate::replication::ReplicationConfig,
5800) -> ReplicationConfiguration {
5801 let rules = cfg
5802 .rules
5803 .iter()
5804 .map(|r| {
5805 let status = if r.status_enabled {
5806 ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
5807 } else {
5808 ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
5809 };
5810 let destination = Destination {
5811 access_control_translation: None,
5812 account: None,
5813 bucket: r.destination_bucket.clone(),
5814 encryption_configuration: None,
5815 metrics: None,
5816 replication_time: None,
5817 storage_class: r
5818 .destination_storage_class
5819 .as_ref()
5820 .map(|s| StorageClass::from(s.clone())),
5821 };
5822 let filter = Some(replication_filter_to_dto(&r.filter));
5823 ReplicationRule {
5824 delete_marker_replication: None,
5825 destination,
5826 existing_object_replication: None,
5827 filter,
5828 id: Some(r.id.clone()),
5829 prefix: None,
5830 priority: Some(r.priority as i32),
5831 source_selection_criteria: None,
5832 status,
5833 }
5834 })
5835 .collect();
5836 ReplicationConfiguration {
5837 role: cfg.role.clone(),
5838 rules,
5839 }
5840}
5841
5842fn replication_filter_from_dto(
5843 f: Option<&ReplicationRuleFilter>,
5844 rule_level_prefix: Option<&str>,
5845) -> crate::replication::ReplicationFilter {
5846 let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
5847 let mut tags: Vec<(String, String)> = Vec::new();
5848 if let Some(f) = f {
5849 if let Some(p) = f.prefix.as_ref()
5850 && prefix.is_none()
5851 {
5852 prefix = Some(p.clone());
5853 }
5854 if let Some(t) = f.tag.as_ref()
5855 && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5856 {
5857 tags.push((k.clone(), v.clone()));
5858 }
5859 if let Some(and) = f.and.as_ref() {
5860 if let Some(p) = and.prefix.as_ref()
5861 && prefix.is_none()
5862 {
5863 prefix = Some(p.clone());
5864 }
5865 if let Some(ts) = and.tags.as_ref() {
5866 for t in ts {
5867 if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5868 tags.push((k.clone(), v.clone()));
5869 }
5870 }
5871 }
5872 }
5873 }
5874 crate::replication::ReplicationFilter { prefix, tags }
5875}
5876
5877fn replication_filter_to_dto(
5878 f: &crate::replication::ReplicationFilter,
5879) -> ReplicationRuleFilter {
5880 if f.tags.is_empty() {
5881 ReplicationRuleFilter {
5882 and: None,
5883 prefix: f.prefix.clone(),
5884 tag: None,
5885 }
5886 } else if f.tags.len() == 1 && f.prefix.is_none() {
5887 let (k, v) = &f.tags[0];
5888 ReplicationRuleFilter {
5889 and: None,
5890 prefix: None,
5891 tag: Some(Tag {
5892 key: Some(k.clone()),
5893 value: Some(v.clone()),
5894 }),
5895 }
5896 } else {
5897 let tags: Vec<Tag> = f
5898 .tags
5899 .iter()
5900 .map(|(k, v)| Tag {
5901 key: Some(k.clone()),
5902 value: Some(v.clone()),
5903 })
5904 .collect();
5905 ReplicationRuleFilter {
5906 and: Some(ReplicationRuleAndOperator {
5907 prefix: f.prefix.clone(),
5908 tags: Some(tags),
5909 }),
5910 prefix: None,
5911 tag: None,
5912 }
5913 }
5914}
5915
5916// ---------------------------------------------------------------------------
5917// v0.6 #37: Convert between the s3s-typed `BucketLifecycleConfiguration`
5918// (the wire surface) and our internal `crate::lifecycle::LifecycleConfig`.
5919// The internal representation flattens AWS's "Filter | And" disjunction
5920// into a single `LifecycleFilter` struct of optional fields plus a tag
5921// vector. Fields S4's evaluator does not consume
5922// (`expired_object_delete_marker`, `noncurrent_version_transitions`,
5923// `transition_default_minimum_object_size`, the storage class on the
5924// noncurrent expiration) are dropped on PUT and re-rendered as their
5925// AWS-default shape on GET so the client always sees a well-formed
5926// configuration.
5927// ---------------------------------------------------------------------------
5928
5929fn dto_lifecycle_to_internal(
5930 dto: &BucketLifecycleConfiguration,
5931) -> crate::lifecycle::LifecycleConfig {
5932 crate::lifecycle::LifecycleConfig {
5933 rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
5934 }
5935}
5936
5937fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
5938 let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
5939 let filter = rule
5940 .filter
5941 .as_ref()
5942 .map(dto_filter_to_internal)
5943 .unwrap_or_default();
5944 let expiration_days = rule
5945 .expiration
5946 .as_ref()
5947 .and_then(|e| e.days)
5948 .and_then(|d| u32::try_from(d).ok());
5949 let expiration_date = rule
5950 .expiration
5951 .as_ref()
5952 .and_then(|e| e.date.as_ref())
5953 .and_then(timestamp_to_chrono_utc);
5954 let transitions: Vec<crate::lifecycle::TransitionRule> = rule
5955 .transitions
5956 .as_ref()
5957 .map(|ts| {
5958 ts.iter()
5959 .filter_map(|t| {
5960 let days = u32::try_from(t.days?).ok()?;
5961 let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
5962 Some(crate::lifecycle::TransitionRule {
5963 days,
5964 storage_class,
5965 })
5966 })
5967 .collect()
5968 })
5969 .unwrap_or_default();
5970 let noncurrent_version_expiration_days = rule
5971 .noncurrent_version_expiration
5972 .as_ref()
5973 .and_then(|n| n.noncurrent_days)
5974 .and_then(|d| u32::try_from(d).ok());
5975 let abort_incomplete_multipart_upload_days = rule
5976 .abort_incomplete_multipart_upload
5977 .as_ref()
5978 .and_then(|a| a.days_after_initiation)
5979 .and_then(|d| u32::try_from(d).ok());
5980 crate::lifecycle::LifecycleRule {
5981 id: rule.id.clone().unwrap_or_default(),
5982 status,
5983 filter,
5984 expiration_days,
5985 expiration_date,
5986 transitions,
5987 noncurrent_version_expiration_days,
5988 abort_incomplete_multipart_upload_days,
5989 }
5990}
5991
5992fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
5993 let mut prefix = filter.prefix.clone();
5994 let mut tags: Vec<(String, String)> = Vec::new();
5995 let mut size_gt: Option<u64> = filter
5996 .object_size_greater_than
5997 .and_then(|n| u64::try_from(n).ok());
5998 let mut size_lt: Option<u64> = filter
5999 .object_size_less_than
6000 .and_then(|n| u64::try_from(n).ok());
6001 if let Some(t) = &filter.tag
6002 && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
6003 {
6004 tags.push((k.clone(), v.clone()));
6005 }
6006 if let Some(and) = &filter.and {
6007 if prefix.is_none() {
6008 prefix = and.prefix.clone();
6009 }
6010 if size_gt.is_none() {
6011 size_gt = and
6012 .object_size_greater_than
6013 .and_then(|n| u64::try_from(n).ok());
6014 }
6015 if size_lt.is_none() {
6016 size_lt = and
6017 .object_size_less_than
6018 .and_then(|n| u64::try_from(n).ok());
6019 }
6020 if let Some(ts) = &and.tags {
6021 for t in ts {
6022 if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
6023 tags.push((k.clone(), v.clone()));
6024 }
6025 }
6026 }
6027 }
6028 crate::lifecycle::LifecycleFilter {
6029 prefix,
6030 tags,
6031 object_size_greater_than: size_gt,
6032 object_size_less_than: size_lt,
6033 }
6034}
6035
6036fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
6037 let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
6038 Some(LifecycleExpiration {
6039 date: rule.expiration_date.map(chrono_utc_to_timestamp),
6040 days: rule.expiration_days.map(|d| d as i32),
6041 expired_object_delete_marker: None,
6042 })
6043 } else {
6044 None
6045 };
6046 let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
6047 None
6048 } else {
6049 Some(
6050 rule.transitions
6051 .iter()
6052 .map(|t| Transition {
6053 date: None,
6054 days: Some(t.days as i32),
6055 storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
6056 })
6057 .collect(),
6058 )
6059 };
6060 let noncurrent_version_expiration =
6061 rule.noncurrent_version_expiration_days
6062 .map(|d| NoncurrentVersionExpiration {
6063 newer_noncurrent_versions: None,
6064 noncurrent_days: Some(d as i32),
6065 });
6066 let abort_incomplete_multipart_upload =
6067 rule.abort_incomplete_multipart_upload_days
6068 .map(|d| AbortIncompleteMultipartUpload {
6069 days_after_initiation: Some(d as i32),
6070 });
6071 let filter = if rule.filter.tags.is_empty()
6072 && rule.filter.object_size_greater_than.is_none()
6073 && rule.filter.object_size_less_than.is_none()
6074 {
6075 rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
6076 and: None,
6077 object_size_greater_than: None,
6078 object_size_less_than: None,
6079 prefix: Some(p.clone()),
6080 tag: None,
6081 })
6082 } else if rule.filter.tags.len() == 1
6083 && rule.filter.prefix.is_none()
6084 && rule.filter.object_size_greater_than.is_none()
6085 && rule.filter.object_size_less_than.is_none()
6086 {
6087 let (k, v) = rule.filter.tags[0].clone();
6088 Some(LifecycleRuleFilter {
6089 and: None,
6090 object_size_greater_than: None,
6091 object_size_less_than: None,
6092 prefix: None,
6093 tag: Some(Tag {
6094 key: Some(k),
6095 value: Some(v),
6096 }),
6097 })
6098 } else {
6099 let tags = if rule.filter.tags.is_empty() {
6100 None
6101 } else {
6102 Some(
6103 rule.filter
6104 .tags
6105 .iter()
6106 .map(|(k, v)| Tag {
6107 key: Some(k.clone()),
6108 value: Some(v.clone()),
6109 })
6110 .collect(),
6111 )
6112 };
6113 Some(LifecycleRuleFilter {
6114 and: Some(LifecycleRuleAndOperator {
6115 object_size_greater_than: rule
6116 .filter
6117 .object_size_greater_than
6118 .and_then(|n| i64::try_from(n).ok()),
6119 object_size_less_than: rule
6120 .filter
6121 .object_size_less_than
6122 .and_then(|n| i64::try_from(n).ok()),
6123 prefix: rule.filter.prefix.clone(),
6124 tags,
6125 }),
6126 object_size_greater_than: None,
6127 object_size_less_than: None,
6128 prefix: None,
6129 tag: None,
6130 })
6131 };
6132 LifecycleRule {
6133 abort_incomplete_multipart_upload,
6134 expiration,
6135 filter,
6136 id: if rule.id.is_empty() {
6137 None
6138 } else {
6139 Some(rule.id.clone())
6140 },
6141 noncurrent_version_expiration,
6142 noncurrent_version_transitions: None,
6143 prefix: None,
6144 status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
6145 transitions,
6146 }
6147}
6148
6149// (timestamp <-> chrono helpers `timestamp_to_chrono_utc` /
6150// `chrono_utc_to_timestamp` are defined earlier in this file for the
6151// tagging/notifications work; the lifecycle DTO converters reuse them.)
6152
6153// ---------------------------------------------------------------------------
6154// v0.5 #33: SigV4a (asymmetric ECDSA-P256) integration hook.
6155//
6156// Kept as a self-contained block at the bottom of the file so it doesn't
6157// touch the existing `S4Service` struct, `new()`, or any of the per-op
6158// handlers above. The hook is wired in by the binary at server-build time
6159// as a hyper middleware layer (see `main.rs`), NOT inside `S4Service`.
6160//
6161// Lifecycle:
6162// 1. `SigV4aGate::new(store)` is constructed once at boot from the
6163// operator-supplied credential directory.
6164// 2. For each incoming request, `SigV4aGate::pre_route(&req,
6165// &requested_region, &canonical_request_bytes)` is invoked BEFORE
6166// the request hits the S3 framework. If the request claims SigV4a
6167// and verifies, control returns to the framework. Otherwise a 403
6168// `SignatureDoesNotMatch` is produced.
6169// 3. Plain SigV4 (HMAC-SHA256) requests pass through untouched.
6170// ---------------------------------------------------------------------------
6171
/// Gate that fronts the S3 service path with SigV4a verification (v0.5 #33).
///
/// Wraps a [`crate::sigv4a::SigV4aCredentialStore`] and exposes a single
/// `pre_route` entry point that returns `Ok(())` for both
/// "request is plain SigV4 — pass through" and "request is SigV4a and
/// verified", and an `Err(...)` containing a 403-equivalent diagnostic
/// otherwise. Cheap to clone (the inner store is `Arc`-backed).
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    // Shared credential store consulted on every SigV4a request; built
    // once at boot from the operator-supplied credential directory.
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}
6183
6184impl SigV4aGate {
6185 #[must_use]
6186 pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
6187 Self { store }
6188 }
6189
6190 /// Inspect an incoming HTTP request. Behaviour:
6191 ///
6192 /// - Not SigV4a (no `X-Amz-Region-Set` and no SigV4a `Authorization`
6193 /// prefix) → returns `Ok(())`; the framework's existing SigV4
6194 /// path handles the request.
6195 /// - SigV4a + valid signature + region match → `Ok(())`.
6196 /// - SigV4a + unknown access-key-id → `Err` with `InvalidAccessKeyId`.
6197 /// - SigV4a + bad signature / region mismatch → `Err` with
6198 /// `SignatureDoesNotMatch`.
6199 ///
6200 /// `canonical_request_bytes` is the SigV4a string-to-sign (or
6201 /// canonical-request bytes; the caller decides) that the framework
6202 /// has already produced for this request. Keeping it as a parameter
6203 /// instead of rebuilding it inside the hook avoids duplicating the
6204 /// canonicalisation logic.
6205 pub fn pre_route<B>(
6206 &self,
6207 req: &http::Request<B>,
6208 requested_region: &str,
6209 canonical_request_bytes: &[u8],
6210 ) -> Result<(), SigV4aGateError> {
6211 if !crate::sigv4a::detect(req) {
6212 return Ok(());
6213 }
6214 let auth_hdr = req
6215 .headers()
6216 .get(http::header::AUTHORIZATION)
6217 .and_then(|v| v.to_str().ok())
6218 .ok_or(SigV4aGateError::MissingAuthorization)?;
6219 let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
6220 .ok_or(SigV4aGateError::MalformedAuthorization)?;
6221 let region_set = req
6222 .headers()
6223 .get(crate::sigv4a::REGION_SET_HEADER)
6224 .and_then(|v| v.to_str().ok())
6225 .unwrap_or("*");
6226 let key = self
6227 .store
6228 .get(&parsed.access_key_id)
6229 .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
6230 crate::sigv4a::verify(
6231 &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
6232 &parsed.signature_der,
6233 key,
6234 region_set,
6235 requested_region,
6236 )
6237 .map_err(SigV4aGateError::Verify)?;
6238 Ok(())
6239 }
6240}
6241
/// Failure modes from [`SigV4aGate::pre_route`]. All variants map to
/// HTTP 403 with one of the two AWS-standard error codes
/// (`InvalidAccessKeyId` or `SignatureDoesNotMatch`).
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    /// Request claimed SigV4a but carried no `Authorization` header.
    #[error("missing Authorization header")]
    MissingAuthorization,
    /// An `Authorization` header was present but could not be parsed
    /// as a SigV4a authorization.
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    /// The access-key-id named in the header is not in the credential
    /// store; the only variant that maps to `InvalidAccessKeyId`.
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    /// Signature verification (or region matching) failed.
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}
6256
6257impl SigV4aGateError {
6258 /// AWS S3 error code that should accompany a 403 response.
6259 #[must_use]
6260 pub fn s3_error_code(&self) -> &'static str {
6261 match self {
6262 Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
6263 _ => "SignatureDoesNotMatch",
6264 }
6265 }
6266}
6267
#[cfg(test)]
mod tests {
    use super::*;

    // `write_manifest` must emit metadata that `extract_manifest` can
    // reconstruct field-for-field.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    // No metadata at all → object treated as uncompressed (no manifest).
    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    // A codec key alone (missing sizes / crc) must not yield a manifest:
    // partial metadata is treated as "not ours" rather than an error.
    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }

    // Well-formed closed range parses into the Int variant.
    #[test]
    fn parse_copy_source_range_basic() {
        let r = parse_copy_source_range("bytes=10-20").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    // last < first is rejected with a descriptive message.
    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    // The "bytes=" scheme prefix is mandatory.
    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        // S3 upload_part_copy spec requires N-M (closed); suffix and
        // open-ended forms are not allowed for this header.
        assert!(parse_copy_source_range("bytes=10-").is_err());
        assert!(parse_copy_source_range("bytes=-10").is_err());
    }

    // v0.7 #49: safe_object_uri must round-trip every legal S3 key
    // (which includes spaces, slashes, control chars, raw UTF-8) into
    // a parseable `http::Uri` instead of panicking like the previous
    // `format!(...).parse().unwrap()` call sites did.

    #[test]
    fn safe_object_uri_basic_ascii() {
        let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
        assert_eq!(uri.path(), "/bucket/key");
    }

    #[test]
    fn safe_object_uri_encodes_spaces() {
        let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
        // RFC 3986 path-segment encoding turns ' ' into %20.
        assert!(
            uri.path().contains("%20"),
            "expected percent-encoded space, got {}",
            uri.path()
        );
        assert!(uri.path().starts_with("/bucket/"));
    }

    #[test]
    fn safe_object_uri_preserves_slashes() {
        // S3 keys legally contain '/' as a logical path separator —
        // the helper must NOT escape it (otherwise the synthetic URI
        // changes the perceived hierarchy).
        let uri =
            safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
        assert_eq!(uri.path(), "/bucket/key/with/slashes");
    }

    #[test]
    fn safe_object_uri_handles_newline_without_panic() {
        // Newlines are control chars in URIs; whether the result is
        // Ok (encoded as %0A) or Err (parse rejects), the helper
        // MUST NOT panic. Either outcome is acceptable.
        let _ = safe_object_uri("bucket", "key\n");
    }

    // Same contract as the newline case: any outcome but a panic.
    #[test]
    fn safe_object_uri_handles_null_byte_without_panic() {
        let _ = safe_object_uri("bucket", "key\0bad");
    }

    #[test]
    fn safe_object_uri_handles_unicode_without_panic() {
        // RTL override, BOM, plain Japanese — none should panic.
        let _ = safe_object_uri("bucket", "rtl\u{202E}override");
        let _ = safe_object_uri("bucket", "\u{FEFF}bom-key");
        let _ = safe_object_uri("bucket", "日本語キー");
    }

    #[test]
    fn safe_object_uri_no_panic_for_every_byte() {
        // Exhaustive byte coverage: 0x00..=0xFF as a 1-byte key.
        // None of these may panic. (0x80..=0xFF are not valid UTF-8
        // by themselves; we go through `String::from_utf8_lossy` so
        // the helper sees a real `&str` regardless of the raw byte.)
        for b in 0u8..=255 {
            let s = String::from_utf8_lossy(&[b]).into_owned();
            let _ = safe_object_uri("bucket", &s);
        }
    }

    /// v0.8.1 #58: smoke test for the DEK-handling shape used by the
    /// SSE-KMS branches of `put_object` and `complete_multipart_upload`.
    /// Mirrors the call pattern (generate_dek → length check → copy
    /// into stack `[u8; 32]` → reborrow as `&[u8; 32]` for `SseSource`)
    /// without spinning up a full `S4Service`.
    ///
    /// The real assertion this guards against is a regression where
    /// the `Zeroizing` wrapper is accidentally dropped before the
    /// stack copy lands (e.g. someone refactors to use
    /// `let dek = kms.generate_dek(...).await?.0; drop(dek); ...`)
    /// or where `&**dek` is rewritten in a way that doesn't compile.
    ///
    /// NOTE(review): intentionally written to mirror the call-site
    /// statement shapes exactly — do not "clean up" this test without
    /// checking the SSE-KMS branches it mirrors.
    #[tokio::test]
    async fn kms_dek_lifetime_within_function_scope() {
        use crate::kms::{KmsBackend, LocalKms};
        use std::collections::HashMap;
        use std::path::PathBuf;
        use zeroize::Zeroizing;

        let mut keks = HashMap::new();
        keks.insert("scope".to_string(), [33u8; 32]);
        let kms = LocalKms::from_keks(PathBuf::from("/tmp/kms-scope-test"), keks);

        // Mirror the put_object KMS branch shape exactly.
        let (dek, wrapped) = kms.generate_dek("scope").await.unwrap();
        assert_eq!(dek.len(), 32);
        let mut dek_arr: Zeroizing<[u8; 32]> = Zeroizing::new([0u8; 32]);
        dek_arr.copy_from_slice(&dek);

        // The reborrow used at the SseSource construction site —
        // mirrors the call-site pattern where `let dek_ref: &[u8; 32]`
        // auto-derefs from a `Zeroizing<[u8; 32]>` reference.
        let dek_ref: &[u8; 32] = &dek_arr;
        // Sanity: the reborrow points at the same bytes.
        assert_eq!(dek_ref, &*dek_arr);
        // Wrapped key id flows through unchanged.
        assert_eq!(wrapped.key_id, "scope");

        // At end of scope, both `dek` (Zeroizing<Vec<u8>>) and
        // `dek_arr` (Zeroizing<[u8; 32]>) are dropped, wiping the
        // backing memory. Cannot directly assert the wipe (would be
        // UB to read freed memory), so this test instead enforces
        // that the call shape compiles and executes; the wipe itself
        // is exercised by the `zeroize` crate's own test suite.
    }
}
6445}