s4_server/service.rs
1//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
2//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
3//!
4//! ## カバー範囲 (Phase 1 月 2)
5//!
6//! - 圧縮 hook あり: `put_object`, `get_object`
7//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
8//! `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
9//! `list_objects_v2`, `create_multipart_upload`, `upload_part`,
10//! `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
11//! `list_parts`
12//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
13//!
14//! ## アーキテクチャ
15//!
16//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
17//! を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
18//! 複数 codec で書かれた object を透過的に GET できる
19//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
20//! manifest を S3 metadata に書いて backend に forward
21//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
22//! manifest 指定の codec で解凍 → 元の bytes を return
23//!
24//! ## 既知の制限事項
25//!
26//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
27//! Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
28//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
29//! Streaming-aware 圧縮は Phase 2。
30
31use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39 FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40 write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry, CompressTelemetry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47 bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50 cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51 supports_streaming_compress, supports_streaming_decompress,
52};
53
54/// PUT body の先頭 sampling で渡す最大 byte 数。
55const SAMPLE_BYTES: usize = 4096;
56
57/// v0.8 #55: stamp the GPU pipeline metrics (`s4_gpu_compress_seconds`,
58/// `s4_gpu_throughput_bytes_per_sec`, `s4_gpu_oom_total`) from a
59/// `CompressTelemetry` returned by `CodecRegistry::compress_with_telemetry`.
60/// CPU codecs (`gpu_seconds = None`) are no-ops here — they're already
61/// covered by the existing `s4_request_latency_seconds` / `s4_bytes_*`
62/// counters in the request-level `record_put` / `record_get` calls.
63#[inline]
64fn stamp_gpu_compress_telemetry(tel: &CompressTelemetry) {
65 if let Some(secs) = tel.gpu_seconds {
66 crate::metrics::record_gpu_compress(tel.codec, secs, tel.bytes_in, tel.bytes_out);
67 }
68 if tel.oom {
69 crate::metrics::record_gpu_oom(tel.codec);
70 }
71}
72
73/// v0.7 #49: percent-encoding set covering everything that is **not** an
74/// `unreserved` character per RFC 3986 §2.3, **plus** we additionally
75/// encode the path-reserved sub-delims that `http::Uri` rejects in a
76/// path segment (`?`, `#`, `%`, control bytes, space, etc.). We
77/// deliberately keep `/` un-encoded because S3 keys legally use `/` as
78/// a logical separator and the rest of the synthetic URI relies on the
79/// path layout `/{bucket}/{key}` round-tripping byte-for-byte.
80const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
81 .add(b' ')
82 .add(b'"')
83 .add(b'#')
84 .add(b'<')
85 .add(b'>')
86 .add(b'?')
87 .add(b'`')
88 .add(b'{')
89 .add(b'}')
90 .add(b'|')
91 .add(b'\\')
92 .add(b'^')
93 .add(b'[')
94 .add(b']')
95 .add(b'%');
96
97/// v0.7 #49: build the synthetic `/{bucket}/{key}` request URI used by
98/// the sidecar / replication helpers when they re-enter the backend
99/// trait without going through the HTTP layer. S3 object keys can
100/// contain spaces, control bytes, and arbitrary Unicode that would
101/// make `format!(...).parse::<http::Uri>()` panic; we percent-encode
102/// the key bytes (RFC 3986 path segment) and the bucket name (defensive
103/// — bucket names are normally DNS-safe, but the helper is the single
104/// choke-point) before splicing them in. If the encoded form *still*
105/// fails to parse (extremely unlikely once everything outside the
106/// unreserved set is escaped) we surface a typed `400 InvalidObjectName`
107/// instead of crashing the worker.
108pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
109 use percent_encoding::utf8_percent_encode;
110 let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
111 let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
112 let raw = format!("/{bucket_enc}/{key_enc}");
113 raw.parse::<http::Uri>().map_err(|e| {
114 // S3 spec uses `InvalidObjectName` (HTTP 400) for keys that
115 // can't be represented in a request URI. The generated
116 // `S3ErrorCode` enum doesn't expose a typed variant for it,
117 // so we round-trip through `from_bytes` which preserves the
118 // canonical wire string while falling back to InvalidArgument
119 // if even that lookup fails (cannot happen at runtime — kept
120 // as a belt-and-suspenders branch so this helper never
121 // panics).
122 let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
123 .unwrap_or(S3ErrorCode::InvalidArgument);
124 S3Error::with_message(
125 code,
126 format!("object key cannot be encoded as a request URI: {e}"),
127 )
128 })
129}
130
/// v0.4 #20: captured at the start of a handler, before the request is
/// consumed by the backend call, so the matching `record_access` at
/// end-of-request can fill in the structured access log entry.
struct AccessLogPreamble {
    // Remote peer address, if known at capture time.
    remote_ip: Option<String>,
    // Requester identity (e.g. access key / principal), if the
    // request was authenticated.
    requester: Option<String>,
    // The request URI, captured verbatim for the log line.
    request_uri: String,
    // `User-Agent` header value, if the client sent one.
    user_agent: Option<String>,
}
140
pub struct S4Service<B: S3> {
    /// Wrapped in `Arc` so the v0.6 #40 cross-bucket replication
    /// dispatcher can clone it into a detached `tokio::spawn` task
    /// (Arc::clone is cheap; backend trait methods take `&self` so no
    /// other handler is affected by the indirection).
    backend: Arc<B>,
    /// Shared codec registry used on the PUT (compress) and GET
    /// (decompress) hooks; holding multiple codecs lets one instance
    /// transparently read objects written under different codecs
    /// (see module docs).
    registry: Arc<CodecRegistry>,
    /// Selects the codec for a PUT from the leading body sample
    /// (see the PUT path description in the module docs).
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Upper bound on how many bytes a PUT body may be collected
    /// into memory; defaults to [`Self::DEFAULT_MAX_BODY_BYTES`]
    /// (5 GiB = the S3 single-request PUT cap; see module docs).
    max_body_bytes: usize,
    /// Optional shared request-policy engine (see `crate::policy`);
    /// `None` means no policy evaluation is performed.
    policy: Option<crate::policy::SharedPolicy>,
    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
    /// gating "deny if not over TLS" can do their job. Defaults to `false`
    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
    secure_transport: bool,
    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    /// v0.4 #20: optional S3-style access log emitter.
    access_log: Option<crate::access_log::SharedAccessLog>,
    /// v0.4 #21 / v0.5 #29: optional server-side encryption keyring
    /// (AES-256-GCM). When set, every PUT body gets wrapped in S4E2
    /// (with the keyring's active key id) after the compress + framing
    /// steps; every GET that sniffs as S4E1/S4E2 is decrypted before
    /// frame parsing. A `with_sse_key(...)` call wraps the supplied
    /// key in a 1-slot keyring so single-key (v0.4) operators get the
    /// same behaviour they had before, just on the v2 frame.
    sse_keyring: Option<crate::sse::SharedSseKeyring>,
    /// v0.5 #34: optional first-class versioning state machine. When
    /// `Some(...)`, S4-server itself owns the per-bucket versioning
    /// state + per-(bucket, key) version chain; PUT / GET / DELETE /
    /// list_object_versions / get_bucket_versioning /
    /// put_bucket_versioning handlers consult the manager instead of
    /// passing through. When `None` (default), the legacy
    /// backend-passthrough behaviour applies so existing v0.4
    /// deployments are unaffected until they explicitly call
    /// `with_versioning(...)`.
    versioning: Option<Arc<crate::versioning::VersioningManager>>,
    /// v0.5 #28: optional SSE-KMS envelope-encryption backend. When
    /// `Some(...)`, PUTs carrying `x-amz-server-side-encryption: aws:kms`
    /// generate a fresh DEK via the backend, encrypt the body with it
    /// (S4E4 frame), and persist only the wrapped DEK. GETs sniffing as
    /// S4E4 unwrap the DEK through the same backend before decrypt.
    /// `kms_default_key_id` is used when the request omits an explicit
    /// `x-amz-server-side-encryption-aws-kms-key-id` (mirrors AWS S3
    /// bucket-default behaviour).
    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
    /// Fallback KMS key id for requests that omit an explicit
    /// `x-amz-server-side-encryption-aws-kms-key-id` (see the `kms`
    /// field doc above).
    kms_default_key_id: Option<String>,
    /// v0.5 #30: optional Object Lock (WORM) enforcement layer. When
    /// `Some(...)`, `delete_object` and overwrite-style `put_object`
    /// consult the manager and refuse the operation with HTTP 403
    /// `AccessDenied` while the object is locked (Compliance until
    /// expiry, Governance unless the bypass header is set, or any time
    /// a legal hold is on). PUT also auto-applies the bucket-default
    /// retention to brand-new objects when configured. When `None`
    /// (default), the legacy backend-passthrough behaviour applies, so
    /// existing v0.4 deployments are unaffected until they explicitly
    /// call `with_object_lock(...)`.
    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
    /// v0.6 #38: optional first-class CORS bucket configuration manager.
    /// When `Some(...)`, S4-server itself owns per-bucket CORS rules and
    /// `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
    /// consult the manager instead of passing through to the backend.
    /// `handle_preflight` (public method on `S4Service`) routes OPTIONS-
    /// style preflight matching through the same store; the actual HTTP
    /// OPTIONS routing wire-up at the listener level is a follow-up
    /// (s3s framework does not surface OPTIONS as a typed handler).
    cors: Option<Arc<crate::cors::CorsManager>>,
    /// v0.6 #36: optional first-class S3 Inventory manager. When
    /// `Some(...)`, S4-server itself owns per-(bucket, id) inventory
    /// configurations and `put_bucket_inventory_configuration` /
    /// `get_bucket_inventory_configuration` /
    /// `list_bucket_inventory_configurations` /
    /// `delete_bucket_inventory_configuration` consult the manager
    /// instead of passing through to the backend. The actual periodic
    /// CSV emission is driven by a tokio task in `main.rs` that calls
    /// `InventoryManager::run_once_for_test` on a fixed cadence; the
    /// service handlers below only deal with config-level CRUD.
    inventory: Option<Arc<crate::inventory::InventoryManager>>,
    /// v0.6 #35: optional first-class S3 bucket-notification manager.
    /// When `Some(...)`, S4-server itself owns per-bucket notification
    /// configurations and `put_bucket_notification_configuration` /
    /// `get_bucket_notification_configuration` consult the manager
    /// instead of passing through to the backend. Successful PUT /
    /// DELETE handlers fire matching destinations on a detached tokio
    /// task (best-effort; see `crate::notifications::dispatch_event`).
    notifications: Option<Arc<crate::notifications::NotificationManager>>,
    /// v0.6 #37: optional first-class S3 Lifecycle configuration
    /// manager. When `Some(...)`, S4-server itself owns per-bucket
    /// lifecycle rules and `put_bucket_lifecycle_configuration` /
    /// `get_bucket_lifecycle_configuration` /
    /// `delete_bucket_lifecycle` consult the manager instead of
    /// passing through to the backend. The actual background scanner
    /// (list_objects_v2 -> evaluate -> delete / metadata-rewrite per
    /// rule) is a v0.7+ follow-up; the test path
    /// `S4Service::run_lifecycle_once_for_test` exercises the
    /// evaluator end-to-end so this v0.6 #37 wiring is enough to ship
    /// the configuration-management half without putting a
    /// half-wired bucket-walk in front of users.
    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
    /// v0.6 #39: optional first-class object + bucket Tagging manager.
    /// When `Some(...)`, S4-server itself owns per-(bucket, key) and
    /// per-bucket tag state — `PutObjectTagging` /
    /// `GetObjectTagging` / `DeleteObjectTagging` /
    /// `PutBucketTagging` / `GetBucketTagging` /
    /// `DeleteBucketTagging` route through the manager (replacing the
    /// previous backend-passthrough behaviour). `put_object` also
    /// pre-parses the `x-amz-tagging` header / `Tagging` input field
    /// so the IAM policy evaluator can gate on
    /// `s3:RequestObjectTag/<key>` and `s3:ExistingObjectTag/<key>`.
    /// On a successful PUT the parsed tags are persisted; on a
    /// successful DELETE the matching tag entry is dropped.
    tagging: Option<Arc<crate::tagging::TagManager>>,
    /// v0.6 #40: optional first-class cross-bucket replication manager.
    /// When `Some(...)`, S4-server itself owns per-bucket replication
    /// rules; `PutBucketReplication` / `GetBucketReplication` /
    /// `DeleteBucketReplication` route through the manager (replacing
    /// the previous backend-passthrough behaviour). On every successful
    /// `put_object` the manager's rule list is consulted; the
    /// highest-priority matching enabled rule wins, the per-key status
    /// is recorded as `Pending`, and the source body and metadata are
    /// handed to a detached tokio task that PUTs to the destination
    /// bucket through the same backend. The replica is stamped with
    /// `x-amz-replication-status: REPLICA` in its metadata; the
    /// source-side status is updated to `Completed` on success or
    /// `Failed` after the 3-attempt retry budget is exhausted (drop
    /// counter bumps in either-side case so dashboards see the loss).
    /// `head_object` / `get_object` echo the recorded status back as
    /// `x-amz-replication-status` so consumers can poll progress.
    /// Limited to single-instance (same `S4Service`) replication; true
    /// cross-region (multi-instance) is a v0.7+ follow-up.
    replication: Option<Arc<crate::replication::ReplicationManager>>,
    /// v0.6 #42: optional MFA-Delete enforcement layer. When `Some(...)`,
    /// every DELETE / DELETE-version / delete-marker / `PutBucketVersioning`
    /// request against a bucket whose MFA-Delete state is `Enabled`
    /// must carry `x-amz-mfa: <serial> <code>` (RFC 6238 6-digit TOTP);
    /// missing or invalid tokens return HTTP 403 `AccessDenied`. When
    /// `None` (default), the gate is a no-op so existing v0.4 / v0.5
    /// deployments are unaffected until they explicitly call
    /// `with_mfa_delete(...)`.
    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
    /// v0.5 #32: when `true`, every PUT must carry an SSE indicator
    /// (`x-amz-server-side-encryption`, the SSE-C customer-key headers,
    /// or be matched against a configured server-managed keyring/KMS).
    /// Set by `--compliance-mode strict` after the boot-time
    /// prerequisite check passes.
    compliance_strict: bool,
    /// v0.7 #47: optional SigV4a (asymmetric ECDSA-P256-SHA256) verify
    /// gate. When `Some(...)`, the listener-side middleware (see
    /// [`crate::routing::try_sigv4a_verify`]) inspects every incoming
    /// request and short-circuits SigV4a-signed ones — verifying the
    /// signature against the credential store and returning 403
    /// `SignatureDoesNotMatch` / `InvalidAccessKeyId` on failure. Plain
    /// SigV4 (HMAC-SHA256) requests pass through to s3s untouched. When
    /// `None`, the middleware is a no-op so the existing SigV4 path is
    /// unaffected (operators opt in via `--sigv4a-credentials <DIR>`).
    sigv4a_gate: Option<Arc<SigV4aGate>>,
    /// v0.8 #54 BUG-5..10: per-`upload_id` side-table that ferries the
    /// SSE / Tagging / Object-Lock context captured at
    /// `CreateMultipartUpload` time through to `UploadPart` /
    /// `CompleteMultipartUpload`. Always-on (no `with_*` flag) — the
    /// store is gateway-internal and idle when no multipart is in
    /// flight. See [`crate::multipart_state`] for rationale.
    multipart_state: Arc<crate::multipart_state::MultipartStateStore>,
    /// v0.8 #52: plaintext bytes per S4E5 chunk on the SSE-S4 PUT
    /// path. `0` (default) → use the legacy buffered S4E2 path
    /// (whole-body AES-GCM tag, GET buffers + verifies before
    /// emitting). Non-zero → use the chunked S4E5 frame so GET can
    /// stream-decrypt chunk-by-chunk. Wired by `--sse-chunk-size`
    /// in `main.rs`. SSE-C and SSE-KMS are intentionally unaffected
    /// (chunked variants tracked in a follow-up issue).
    sse_chunk_size: usize,
}
313
314impl<B: S3> S4Service<B> {
    /// Hard cap matching the AWS S3 single-request PUT API limit (5 GiB).
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
317
318 pub fn new(
319 backend: B,
320 registry: Arc<CodecRegistry>,
321 dispatcher: Arc<dyn CodecDispatcher>,
322 ) -> Self {
323 Self {
324 backend: Arc::new(backend),
325 registry,
326 dispatcher,
327 max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
328 policy: None,
329 secure_transport: false,
330 rate_limits: None,
331 access_log: None,
332 sse_keyring: None,
333 versioning: None,
334 kms: None,
335 kms_default_key_id: None,
336 object_lock: None,
337 cors: None,
338 inventory: None,
339 notifications: None,
340 lifecycle: None,
341 tagging: None,
342 replication: None,
343 mfa_delete: None,
344 compliance_strict: false,
345 sigv4a_gate: None,
346 multipart_state: Arc::new(crate::multipart_state::MultipartStateStore::new()),
347 // v0.8 #52: chunked SSE-S4 disabled by default — opt
348 // in via `S4Service::with_sse_chunk_size(...)` /
349 // `--sse-chunk-size <BYTES>`. Default keeps the legacy
350 // S4E2 buffered path so existing deployments are
351 // bit-for-bit unchanged.
352 sse_chunk_size: 0,
353 }
354 }
355
356 /// v0.7 #47: attach the SigV4a verify gate. Once set, the
357 /// listener-side middleware (`crate::routing::try_sigv4a_verify`)
358 /// short-circuits any incoming `AWS4-ECDSA-P256-SHA256` request,
359 /// verifying it against the supplied credential store and
360 /// returning 403 on failure. Plain SigV4 (HMAC-SHA256) requests
361 /// are unaffected. When the gate is unset (default), the
362 /// middleware skips entirely so existing SigV4 deployments keep
363 /// working.
364 #[must_use]
365 pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
366 self.sigv4a_gate = Some(gate);
367 self
368 }
369
370 /// v0.7 #47: borrow the attached SigV4a gate. Used by `main.rs`
371 /// to snapshot the gate `Arc` before the s3s `ServiceBuilder`
372 /// consumes the `S4Service` (the listener-side middleware needs
373 /// the same `Arc` because s3s' SigV4 verifier rejects SigV4a
374 /// algorithm tokens with "unknown algorithm" — match has to
375 /// happen at the hyper layer instead).
376 #[must_use]
377 pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
378 self.sigv4a_gate.as_ref()
379 }
380
381 /// v0.8.2 #62: borrow the multipart state store so `main.rs` can
382 /// snapshot the `Arc` before the s3s `ServiceBuilder` consumes
383 /// the `S4Service`. The background `sweep_stale` task in `main.rs`
384 /// holds this `Arc` and ticks once an hour to drop abandoned
385 /// upload contexts (and their `Zeroizing<[u8; 32]>` SSE-C keys).
386 #[must_use]
387 pub fn multipart_state(&self) -> &Arc<crate::multipart_state::MultipartStateStore> {
388 &self.multipart_state
389 }
390
391 /// v0.6 #39: attach the in-memory object + bucket Tagging manager.
392 /// Once set, `Put/Get/Delete` `Object/Bucket Tagging` route
393 /// through the manager (instead of forwarding to the backend),
394 /// and `put_object`'s `x-amz-tagging` parse path becomes the
395 /// source of `s3:RequestObjectTag/<key>` for the IAM policy
396 /// evaluator. The manager itself is shared via `Arc`.
397 #[must_use]
398 pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
399 self.tagging = Some(mgr);
400 self
401 }
402
403 /// v0.6 #39: borrow the attached tagging manager (test /
404 /// introspection — the snapshotter in `main.rs`, when wired,
405 /// will keep its own `Arc` clone).
406 #[must_use]
407 pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
408 self.tagging.as_ref()
409 }
410
411 /// v0.6 #36: attach the in-memory S3 Inventory manager. Once set,
412 /// `put_bucket_inventory_configuration` /
413 /// `get_bucket_inventory_configuration` /
414 /// `list_bucket_inventory_configurations` /
415 /// `delete_bucket_inventory_configuration` route through the
416 /// manager. The actual periodic CSV / manifest emission is
417 /// orchestrated by a tokio task started in `main.rs`; the manager
418 /// itself is shared between the handler and the scheduler via
419 /// `Arc`.
420 #[must_use]
421 pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
422 self.inventory = Some(mgr);
423 self
424 }
425
426 /// v0.6 #36: borrow the attached inventory manager (test /
427 /// introspection — the background scheduler in `main.rs` keeps its
428 /// own `Arc` clone, so this accessor is for the test path that
429 /// invokes `run_once_for_test` directly).
430 #[must_use]
431 pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
432 self.inventory.as_ref()
433 }
434
435 /// v0.6 #37: attach the in-memory S3 Lifecycle configuration
436 /// manager. Once set, `put_bucket_lifecycle_configuration` /
437 /// `get_bucket_lifecycle_configuration` / `delete_bucket_lifecycle`
438 /// route through the manager (replacing the previous backend-
439 /// passthrough behaviour). The actual periodic scanner that walks
440 /// the source bucket and invokes Expiration / Transition /
441 /// NoncurrentExpiration actions is a v0.7+ follow-up — see
442 /// [`Self::run_lifecycle_once_for_test`] for the in-memory test
443 /// path that exercises the evaluator end-to-end.
444 #[must_use]
445 pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
446 self.lifecycle = Some(mgr);
447 self
448 }
449
450 /// v0.6 #37: borrow the attached lifecycle manager (test /
451 /// introspection — the background scheduler in `main.rs` keeps its
452 /// own `Arc` clone, so this accessor is for the test path that
453 /// invokes the evaluator directly).
454 #[must_use]
455 pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
456 self.lifecycle.as_ref()
457 }
458
459 /// v0.6 #37: synchronous test entry that runs the lifecycle evaluator
460 /// against a caller-provided list of `(key, age, size, tags)` tuples
461 /// and returns the `(key, action)` pairs that should fire. The actual
462 /// backend invocation (S3.delete_object / metadata rewrite) is left
463 /// to the caller — the unit + E2E tests use this to verify the
464 /// evaluator without spawning the (deferred) background scanner.
465 /// Returns an empty `Vec` when no lifecycle manager is attached or
466 /// no rule matches.
467 #[must_use]
468 pub fn run_lifecycle_once_for_test(
469 &self,
470 bucket: &str,
471 objects: &[crate::lifecycle::EvaluateBatchEntry],
472 ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
473 let Some(mgr) = self.lifecycle.as_ref() else {
474 return Vec::new();
475 };
476 crate::lifecycle::evaluate_batch(mgr, bucket, objects)
477 }
478
479 /// v0.6 #35: attach the in-memory bucket-notification manager. Once
480 /// set, `put_bucket_notification_configuration` /
481 /// `get_bucket_notification_configuration` route through the manager
482 /// (replacing the previous backend-passthrough behaviour); successful
483 /// `put_object` / `delete_object` calls fire matching destinations
484 /// on a detached tokio task via
485 /// `crate::notifications::dispatch_event` (best-effort, fire-and-
486 /// forget — failures bump the manager's `dropped_total` counter and
487 /// log at warn but do NOT fail the originating S3 request).
488 #[must_use]
489 pub fn with_notifications(
490 mut self,
491 mgr: Arc<crate::notifications::NotificationManager>,
492 ) -> Self {
493 self.notifications = Some(mgr);
494 self
495 }
496
497 /// v0.6 #35: borrow the attached notifications manager (test /
498 /// introspection — used by the metrics layer to read
499 /// `dropped_total`).
500 #[must_use]
501 pub fn notifications_manager(
502 &self,
503 ) -> Option<&Arc<crate::notifications::NotificationManager>> {
504 self.notifications.as_ref()
505 }
506
507 /// v0.6 #35: internal helper used by the DELETE handlers to fire a
508 /// matching notification on a detached tokio task. No-op when no
509 /// manager is attached or no rule on the bucket matches the given
510 /// (event, key) tuple.
511 fn fire_delete_notification(
512 &self,
513 bucket: &str,
514 key: &str,
515 event: crate::notifications::EventType,
516 version_id: Option<String>,
517 ) {
518 let Some(mgr) = self.notifications.as_ref() else {
519 return;
520 };
521 let dests = mgr.match_destinations(bucket, &event, key);
522 if dests.is_empty() {
523 return;
524 }
525 tokio::spawn(crate::notifications::dispatch_event(
526 Arc::clone(mgr),
527 bucket.to_owned(),
528 key.to_owned(),
529 event,
530 None,
531 None,
532 version_id,
533 format!("S4-{}", uuid::Uuid::new_v4()),
534 ));
535 }
536
537 /// v0.6 #40: attach the in-memory cross-bucket replication manager.
538 /// Once set, `put_bucket_replication` / `get_bucket_replication` /
539 /// `delete_bucket_replication` route through the manager (replacing
540 /// the previous backend-passthrough behaviour); a successful
541 /// `put_object` whose key matches an enabled rule fires a detached
542 /// tokio task that PUTs the same body + metadata to the rule's
543 /// destination bucket, stamping the replica with
544 /// `x-amz-replication-status: REPLICA`. Failures after the retry
545 /// budget bump the manager's `dropped_total` counter and are
546 /// surfaced in the `s4_replication_dropped_total` Prometheus
547 /// counter; successes bump `s4_replication_replicated_total`.
548 #[must_use]
549 pub fn with_replication(
550 mut self,
551 mgr: Arc<crate::replication::ReplicationManager>,
552 ) -> Self {
553 self.replication = Some(mgr);
554 self
555 }
556
557 /// v0.6 #40: borrow the attached replication manager (test /
558 /// introspection — used by the metrics layer to read
559 /// `dropped_total`).
560 #[must_use]
561 pub fn replication_manager(
562 &self,
563 ) -> Option<&Arc<crate::replication::ReplicationManager>> {
564 self.replication.as_ref()
565 }
566
567 /// v0.6 #40: internal helper used by the PUT handlers to fire a
568 /// detached cross-bucket replication task. No-op when no manager
569 /// is attached, the source backend PUT failed, or no rule on the
570 /// source bucket matches the (key, tags) tuple. The `body` is the
571 /// post-compression / post-encryption `Bytes` that was sent to
572 /// the source backend (refcount-cloned), and `metadata` is the
573 /// metadata map that already includes the manifest /
574 /// `s4-encrypted` markers — the replica decodes through the same
575 /// path. The destination PUT runs through `Arc<B>::put_object`.
576 ///
577 /// ## v0.8.2 #61: generation token + shadow-key destination
578 ///
579 /// `pending_version` is the source-side `PutOutcome` minted by the
580 /// caller's versioning branch (or `None` for unversioned /
581 /// suspended buckets). When `pending_version.versioned_response`
582 /// is `true`, the dispatcher writes the destination under the same
583 /// shadow path the source uses (`<key>.__s4ver__/<vid>`) so the
584 /// destination's version chain receives the new version the same
585 /// way `?versionId=` GET resolves it. Closes audit C-1.
586 ///
587 /// The dispatcher also mints a fresh `generation` token before
588 /// spawning, threaded through to [`crate::replication::
589 /// replicate_object`]. Closes audit C-3 — a stale retry of an
590 /// older PUT can no longer overwrite the destination's newer bytes
591 /// because the CAS guard sees the higher stored generation and
592 /// drops its destination write.
593 ///
594 /// ## Asymmetric versioning policy (out of scope)
595 ///
596 /// We assume source + destination buckets share the same
597 /// versioning policy (both Enabled or both Suspended /
598 /// Unversioned). Cross-bucket policy queries would require a
599 /// backend round-trip per replication, which is not worth it for
600 /// the single-instance scope. Operators who configure asymmetric
601 /// versioning will see destination-side `?versionId=` lookups
602 /// miss — documented as out-of-scope until a future per-rule
603 /// `destination_versioning_policy` knob lands.
604 // 8 args is the post-#61 shape: replication needs the
605 // source bucket+key, the canonical tag set for rule-matching,
606 // the post-codec body+metadata for the destination PUT, the
607 // backend-success gate, and the pending version-id for the
608 // shadow-key destination override. A shape struct would just
609 // split the (single) call site so opt for the inline form.
610 #[allow(clippy::too_many_arguments)]
611 fn spawn_replication_if_matched(
612 &self,
613 source_bucket: &str,
614 source_key: &str,
615 request_tags: &Option<crate::tagging::TagSet>,
616 body: &bytes::Bytes,
617 metadata: &Option<std::collections::HashMap<String, String>>,
618 backend_ok: bool,
619 pending_version: Option<&crate::versioning::PutOutcome>,
620 ) where
621 B: Send + Sync + 'static,
622 {
623 if !backend_ok {
624 return;
625 }
626 let Some(mgr) = self.replication.as_ref() else {
627 return;
628 };
629 // Pull the request's tags into the (k, v) shape the matcher
630 // expects. The tagging manager would have the canonical
631 // post-PUT view but at this point in the pipeline it's
632 // already been written above; for the rule-match decision
633 // the request's tags are sufficient (= the tags this PUT
634 // applies, S3 PutObject is full-replace on tags).
635 let object_tags: Vec<(String, String)> = request_tags
636 .as_ref()
637 .map(|ts| ts.iter().cloned().collect())
638 .unwrap_or_default();
639 let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
640 return;
641 };
642 // v0.8.2 #61: mint the per-PUT generation BEFORE the eager
643 // Pending stamp so the stamp itself carries the right
644 // generation (the CAS in `record_status_if_newer` would
645 // otherwise see a `generation=0` Pending and accept any
646 // stale retry).
647 let generation = mgr.next_generation();
648 // Eagerly mark the source key as Pending so a HEAD between
649 // the source PUT returning and the spawned task completing
650 // surfaces the in-flight state. CAS-guarded so a slower
651 // older PUT can't downgrade a newer Completed back to Pending.
652 let _ = mgr.record_status_if_newer(
653 source_bucket,
654 source_key,
655 generation,
656 crate::replication::ReplicationStatus::Pending,
657 );
658 // v0.8.2 #61: derive the destination storage key. For a
659 // versioning-Enabled source the destination receives the
660 // same shadow-key path so a `?versionId=<vid>` GET on the
661 // destination resolves through the same lookup the source
662 // uses. Suspended / Unversioned sources keep the logical
663 // key (= `None` override = dispatcher uses `source_key`).
664 let destination_key_override = pending_version
665 .filter(|pv| pv.versioned_response)
666 .map(|pv| versioned_shadow_key(source_key, &pv.version_id));
667 // v0.8.3 #68 (audit M-1): capture the source object's Object
668 // Lock state so the dispatcher can decorate the destination
669 // PUT with the matching AWS-wire lock headers. Without this,
670 // a Compliance / Governance / legal-hold protected source
671 // would replicate to a destination where DELETE succeeds
672 // (the WORM posture would only hold on the source).
673 let source_lock_state = self
674 .object_lock
675 .as_ref()
676 .and_then(|mgr| mgr.get(source_bucket, source_key));
677 // v0.8.3 #68: hand the destination-side ObjectLockManager to
678 // the dispatcher closure so we can persist the propagated
679 // lock state on successful destination PUT (the destination
680 // PUT below bypasses S4Service::put_object — we drive the
681 // backend directly — so the explicit_lock_mode commit block
682 // in put_object never fires for replicas. We replay it here
683 // against the destination key.)
684 let dest_lock_mgr = self.object_lock.as_ref().map(Arc::clone);
685 let mgr_cl = Arc::clone(mgr);
686 let backend = Arc::clone(&self.backend);
687 let body_cl = body.clone();
688 let metadata_cl = metadata.clone();
689 let source_bucket_cl = source_bucket.to_owned();
690 let source_key_cl = source_key.to_owned();
691 let source_lock_state_for_closure = source_lock_state.clone();
692 let source_bucket_for_warn = source_bucket.to_owned();
693 tokio::spawn(async move {
694 let do_put = move |dest_bucket: String,
695 dest_key: String,
696 dest_body: bytes::Bytes,
697 dest_meta: Option<std::collections::HashMap<String, String>>| {
698 let backend = Arc::clone(&backend);
699 let dest_lock_mgr = dest_lock_mgr.clone();
700 let lock_state = source_lock_state_for_closure.clone();
701 let warn_src = source_bucket_for_warn.clone();
702 async move {
703 let req = S3Request {
704 input: PutObjectInput {
705 bucket: dest_bucket.clone(),
706 key: dest_key.clone(),
707 body: Some(bytes_to_blob(dest_body)),
708 metadata: dest_meta,
709 ..Default::default()
710 },
711 method: http::Method::PUT,
712 uri: "/".parse().unwrap(),
713 headers: http::HeaderMap::new(),
714 extensions: http::Extensions::new(),
715 credentials: None,
716 region: None,
717 service: None,
718 trailing_headers: None,
719 };
720 let put_result = backend
721 .put_object(req)
722 .await
723 .map(|_| ())
724 .map_err(|e| format!("destination put_object: {e}"));
725 // v0.8.3 #68: on successful destination PUT,
726 // persist the propagated lock state into the
727 // destination's ObjectLockManager so a subsequent
728 // DELETE on the destination is refused. Three cases:
729 // - PUT failed → skip (no replica to protect)
730 // - lock_state None → nothing to propagate
731 // - dest manager None (operator misconfig)
732 // → log warn-once + bump skip metric
733 if put_result.is_ok()
734 && let Some(state) = lock_state
735 {
736 match dest_lock_mgr {
737 Some(ref mgr) => {
738 mgr.set(&dest_bucket, &dest_key, state);
739 }
740 None => {
741 crate::replication::warn_lock_propagation_skipped(
742 &warn_src,
743 &dest_bucket,
744 );
745 }
746 }
747 }
748 put_result
749 }
750 };
751 crate::replication::replicate_object(
752 rule,
753 source_bucket_cl,
754 source_key_cl,
755 body_cl,
756 metadata_cl,
757 do_put,
758 mgr_cl,
759 generation,
760 destination_key_override,
761 source_lock_state,
762 )
763 .await;
764 });
765 }
766
767 /// v0.6 #42: attach the in-memory MFA-Delete enforcement manager.
768 /// Once set, every DELETE / DELETE-version / delete-marker /
769 /// `PutBucketVersioning` request against a bucket whose MFA-Delete
770 /// state is `Enabled` requires a valid `x-amz-mfa: <serial> <code>`
771 /// header (RFC 6238 6-digit TOTP); the gate is a no-op for buckets
772 /// where MFA-Delete is `Disabled` (S3 default).
773 #[must_use]
774 pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
775 self.mfa_delete = Some(mgr);
776 self
777 }
778
779 /// v0.6 #42: borrow the attached MFA-Delete manager (test /
780 /// introspection — used by the snapshot path in `main.rs` to call
781 /// `to_json` for restart-recoverable state).
782 #[must_use]
783 pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
784 self.mfa_delete.as_ref()
785 }
786
787 /// v0.6 #38: attach the in-memory CORS configuration manager. Once
788 /// set, `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
789 /// route through the manager instead of forwarding to the backend,
790 /// and [`Self::handle_preflight`] becomes useful for the (future)
791 /// listener-side OPTIONS interceptor.
792 #[must_use]
793 pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
794 self.cors = Some(mgr);
795 self
796 }
797
798 /// v0.6 #38: Borrow the attached CORS manager (test / introspection).
799 #[must_use]
800 pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
801 self.cors.as_ref()
802 }
803
804 /// v0.6 #38: evaluate a CORS preflight request against the bucket's
805 /// configured rules and, if a rule matches, return the headers that
806 /// the (future) listener-side OPTIONS interceptor must put on the
807 /// 200 response: `Access-Control-Allow-Origin`, `Access-Control-
808 /// Allow-Methods`, `Access-Control-Allow-Headers`, optionally
809 /// `Access-Control-Max-Age` and `Access-Control-Expose-Headers`.
810 ///
811 /// Returns `None` when no manager is attached, no config is
812 /// registered for the bucket, or no rule matches the (origin,
813 /// method, headers) triple. The caller is responsible for turning
814 /// `None` into the appropriate 403 response.
815 ///
816 /// **Note:** the OPTIONS routing itself (i.e. wiring this method
817 /// into the hyper-util listener path) is a follow-up — s3s does not
818 /// surface OPTIONS as a typed S3 handler, so this method is
819 /// currently call-able only from inside other handlers and tests.
820 #[must_use]
821 pub fn handle_preflight(
822 &self,
823 bucket: &str,
824 origin: &str,
825 method: &str,
826 request_headers: &[String],
827 ) -> Option<std::collections::HashMap<String, String>> {
828 let mgr = self.cors.as_ref()?;
829 let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
830 let mut h = std::collections::HashMap::new();
831 // Echo the matched origin back. If the rule used "*" we still
832 // echo "*" (S3 spec — the spec does not require us to echo the
833 // *requesting* origin when the wildcard matched).
834 let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
835 "*".to_string()
836 } else {
837 origin.to_string()
838 };
839 h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
840 h.insert(
841 "Access-Control-Allow-Methods".to_string(),
842 rule.allowed_methods.join(", "),
843 );
844 if !rule.allowed_headers.is_empty() {
845 // For the Allow-Headers response, echo back the rule's
846 // pattern list verbatim (S3 echoes the configured list,
847 // including "*" if present). Browsers honour exact-match
848 // rules.
849 h.insert(
850 "Access-Control-Allow-Headers".to_string(),
851 rule.allowed_headers.join(", "),
852 );
853 }
854 if let Some(secs) = rule.max_age_seconds {
855 h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
856 }
857 if !rule.expose_headers.is_empty() {
858 h.insert(
859 "Access-Control-Expose-Headers".to_string(),
860 rule.expose_headers.join(", "),
861 );
862 }
863 Some(h)
864 }
865
866 /// v0.5 #32: enable strict compliance mode. Every PUT must carry an
867 /// SSE indicator (server-side encryption header or SSE-C customer
868 /// key); requests without one are rejected with 400 InvalidRequest.
869 /// Boot-time prerequisite checking lives in the binary
870 /// (`validate_compliance_mode`) so this flag is purely the runtime
871 /// switch.
872 #[must_use]
873 pub fn with_compliance_strict(mut self, on: bool) -> Self {
874 self.compliance_strict = on;
875 self
876 }
877
878 /// v0.5 #30: attach the in-memory Object Lock (WORM) enforcement
879 /// manager. Once set, `delete_object` and overwrite-path
880 /// `put_object` refuse operations on locked keys with HTTP 403
881 /// `AccessDenied`; new PUTs to a bucket with a default retention
882 /// policy auto-create per-object lock state.
883 #[must_use]
884 pub fn with_object_lock(
885 mut self,
886 mgr: Arc<crate::object_lock::ObjectLockManager>,
887 ) -> Self {
888 self.object_lock = Some(mgr);
889 self
890 }
891
892 /// v0.7 #45: borrow the attached Object Lock manager (read-only —
893 /// the lifecycle scanner uses this to skip currently-locked objects
894 /// before issuing `delete_object`, since an Object Lock always wins
895 /// over Lifecycle Expiration in AWS S3 semantics). Mirrors the
896 /// shape of [`Self::lifecycle_manager`] /
897 /// [`Self::tag_manager`] — purely additive accessor, no handler
898 /// behaviour change.
899 #[must_use]
900 pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
901 self.object_lock.as_ref()
902 }
903
904 /// v0.5 #28: attach an SSE-KMS backend. `default_key_id` is used
905 /// when a PUT requests SSE-KMS without naming a specific KMS key
906 /// (operators set this to mirror AWS S3's bucket-default key).
907 #[must_use]
908 pub fn with_kms_backend(
909 mut self,
910 kms: Arc<dyn crate::kms::KmsBackend>,
911 default_key_id: Option<String>,
912 ) -> Self {
913 self.kms = Some(kms);
914 self.kms_default_key_id = default_key_id;
915 self
916 }
917
918 /// v0.5 #34: attach the first-class versioning state machine. Once
919 /// set, this `S4Service` owns the per-bucket versioning state +
920 /// per-(bucket, key) version chain; `put_object` / `get_object` /
921 /// `delete_object` / `list_object_versions` /
922 /// `get_bucket_versioning` / `put_bucket_versioning` consult the
923 /// manager instead of passing through to the backend. The backend
924 /// is still used as the byte store: Suspended / Unversioned buckets
925 /// keep using `<key>` directly (legacy), Enabled buckets redirect
926 /// each version's bytes to a shadow key
927 /// (`<key>.__s4ver__/<version-id>`) so older versions survive newer
928 /// PUTs to the same logical key.
929 #[must_use]
930 pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
931 self.versioning = Some(mgr);
932 self
933 }
934
935 /// v0.4 #21 (kept for back-compat): attach a single SSE-S4 key.
936 /// Internally wraps it in a 1-slot keyring with id=1 active, so
937 /// new objects ride the v0.5 S4E2 frame while previously-written
938 /// S4E1 bytes (this same key) still decrypt via the keyring's S4E1
939 /// fallback path. Operators wanting true rotation should call
940 /// [`Self::with_sse_keyring`] instead.
941 #[must_use]
942 pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
943 let keyring = crate::sse::SseKeyring::new(1, key);
944 self.sse_keyring = Some(std::sync::Arc::new(keyring));
945 self
946 }
947
948 /// v0.5 #29: attach a multi-key SSE-S4 keyring. PUT encrypts under
949 /// the active key (S4E2 frame stamped with that key's id); GET
950 /// dispatches on the body's magic — S4E1 falls back to trying every
951 /// key in the ring (active first) so v0.4 objects survive a
952 /// migration; S4E2 looks up the explicit key_id from the header.
953 #[must_use]
954 pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
955 self.sse_keyring = Some(keyring);
956 self
957 }
958
959 /// v0.8 #52: opt the SSE-S4 PUT path into the chunked S4E5 frame
960 /// (so the matching GET can stream-decrypt chunk-by-chunk
961 /// instead of buffering the entire body before tag verify).
962 /// `bytes` is the plaintext slice size — typically 1 MiB; 0
963 /// disables the path and reverts to the legacy S4E2 buffered
964 /// frame.
965 ///
966 /// SSE-C (S4E3) and SSE-KMS (S4E4) are intentionally untouched:
967 /// the chunked envelopes for those flows are a follow-up issue
968 /// (the customer-key wire surface needs separate version
969 /// negotiation).
970 ///
971 /// Has no effect when `with_sse_keyring` / `with_sse_key` is
972 /// not also set — the chunked path runs only on the SSE-S4
973 /// branch of `put_object`.
974 #[must_use]
975 pub fn with_sse_chunk_size(mut self, bytes: usize) -> Self {
976 self.sse_chunk_size = bytes;
977 self
978 }
979
980 /// v0.4 #20: attach an S3-style access-log emitter. Each completed
981 /// PUT / GET / DELETE / List handler emits one entry into the
982 /// emitter's buffer; a background flusher (started separately, see
983 /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
984 /// rotated `.log` files into the configured directory.
985 #[must_use]
986 pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
987 self.access_log = Some(log);
988 self
989 }
990
991 /// Capture the per-request access-log preamble before the request is
992 /// consumed by the backend call. Returns `None` if no access logger
993 /// is configured (cheap early-out so the handler doesn't pay the
994 /// header-clone cost when access logging is off).
995 fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
996 self.access_log.as_ref()?;
997 Some(AccessLogPreamble {
998 remote_ip: req
999 .headers
1000 .get("x-forwarded-for")
1001 .and_then(|v| v.to_str().ok())
1002 .and_then(|raw| raw.split(',').next())
1003 .map(|s| s.trim().to_owned()),
1004 requester: Self::principal_of(req).map(str::to_owned),
1005 request_uri: format!("{} {}", req.method, req.uri.path()),
1006 user_agent: req
1007 .headers
1008 .get("user-agent")
1009 .and_then(|v| v.to_str().ok())
1010 .map(str::to_owned),
1011 })
1012 }
1013
1014 /// Internal — called by handlers at end-of-request with a captured
1015 /// preamble. Best-effort: swallows the await fast (clones Arc +
1016 /// pushes), no error propagation back to the request path.
1017 #[allow(clippy::too_many_arguments)]
1018 async fn record_access(
1019 &self,
1020 preamble: Option<AccessLogPreamble>,
1021 operation: &'static str,
1022 bucket: &str,
1023 key: Option<&str>,
1024 http_status: u16,
1025 bytes_sent: u64,
1026 object_size: u64,
1027 total_time_ms: u64,
1028 error_code: Option<&str>,
1029 ) {
1030 let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
1031 return;
1032 };
1033 log.record(crate::access_log::AccessLogEntry {
1034 time: std::time::SystemTime::now(),
1035 bucket: bucket.to_owned(),
1036 remote_ip: p.remote_ip,
1037 requester: p.requester,
1038 operation,
1039 key: key.map(str::to_owned),
1040 request_uri: p.request_uri,
1041 http_status,
1042 error_code: error_code.map(str::to_owned),
1043 bytes_sent,
1044 object_size,
1045 total_time_ms,
1046 user_agent: p.user_agent,
1047 })
1048 .await;
1049 }
1050
1051 /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
1052 /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
1053 /// throttle-checked before the policy gate; throttled requests return
1054 /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
1055 /// `s4_rate_limit_throttled_total{principal,bucket}`.
1056 #[must_use]
1057 pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
1058 self.rate_limits = Some(rl);
1059 self
1060 }
1061
1062 /// Helper used by request handlers to apply the rate limit. Returns
1063 /// `Ok(())` when allowed (or no rate limiter is configured), or a
1064 /// `SlowDown` S3Error otherwise.
1065 fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
1066 let Some(rl) = self.rate_limits.as_ref() else {
1067 return Ok(());
1068 };
1069 let principal_id = Self::principal_of(req);
1070 if !rl.check(principal_id, bucket) {
1071 crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
1072 return Err(S3Error::with_message(
1073 S3ErrorCode::SlowDown,
1074 format!("rate-limited: bucket={bucket}"),
1075 ));
1076 }
1077 Ok(())
1078 }
1079
1080 /// Tell the policy evaluator that the listener is reached over TLS
1081 /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
1082 /// resolves to `true`. Defaults to `false`.
1083 #[must_use]
1084 pub fn with_secure_transport(mut self, on: bool) -> Self {
1085 self.secure_transport = on;
1086 self
1087 }
1088
1089 #[must_use]
1090 pub fn with_max_body_bytes(mut self, n: usize) -> Self {
1091 self.max_body_bytes = n;
1092 self
1093 }
1094
1095 /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
1096 /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
1097 /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
1098 /// When `None` (the default), no policy enforcement happens.
1099 #[must_use]
1100 pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
1101 self.policy = Some(policy);
1102 self
1103 }
1104
1105 /// Pull the SigV4 access key id off the request's credentials, if any.
1106 /// Used as the `principal_id` for policy evaluation.
1107 fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
1108 req.credentials.as_ref().map(|c| c.access_key.as_str())
1109 }
1110
1111 /// v0.3 #13: build the per-request policy context from the incoming
1112 /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
1113 /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
1114 /// production deployments are behind an LB / reverse proxy that sets
1115 /// this), `aws:CurrentTime` from the system clock, and
1116 /// `aws:SecureTransport` from the per-listener TLS flag.
1117 fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
1118 let user_agent = req
1119 .headers
1120 .get("user-agent")
1121 .and_then(|v| v.to_str().ok())
1122 .map(str::to_owned);
1123 // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
1124 // is the original client. Trim and parse leniently.
1125 let source_ip = req
1126 .headers
1127 .get("x-forwarded-for")
1128 .and_then(|v| v.to_str().ok())
1129 .and_then(|raw| raw.split(',').next())
1130 .and_then(|s| s.trim().parse().ok());
1131 crate::policy::RequestContext {
1132 source_ip,
1133 user_agent,
1134 request_time: Some(std::time::SystemTime::now()),
1135 secure_transport: self.secure_transport,
1136 existing_object_tags: None,
1137 request_object_tags: None,
1138 extra: Default::default(),
1139 }
1140 }
1141
1142 /// Helper used by request handlers to enforce the optional policy.
1143 /// Returns `Ok(())` when allowed (or no policy is configured), or an
1144 /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
1145 /// counter on deny.
1146 fn enforce_policy<I>(
1147 &self,
1148 req: &S3Request<I>,
1149 action: &'static str,
1150 bucket: &str,
1151 key: Option<&str>,
1152 ) -> S3Result<()> {
1153 self.enforce_policy_with_extra(req, action, bucket, key, None, None)
1154 }
1155
1156 /// v0.6 #39: variant of [`Self::enforce_policy`] that lets the
1157 /// caller plumb tag context (existing-on-object + on-request) into
1158 /// the policy evaluator. Both arguments default to `None`, in
1159 /// which case the resulting `RequestContext` is identical to
1160 /// [`Self::enforce_policy`]'s — so for handlers that don't deal
1161 /// with tags this is a transparent no-op.
1162 fn enforce_policy_with_extra<I>(
1163 &self,
1164 req: &S3Request<I>,
1165 action: &'static str,
1166 bucket: &str,
1167 key: Option<&str>,
1168 request_tags: Option<&crate::tagging::TagSet>,
1169 existing_tags: Option<&crate::tagging::TagSet>,
1170 ) -> S3Result<()> {
1171 let Some(policy) = self.policy.as_ref() else {
1172 return Ok(());
1173 };
1174 let principal_id = Self::principal_of(req);
1175 let mut ctx = self.request_context(req);
1176 if let Some(t) = request_tags {
1177 ctx.request_object_tags = Some(t.clone());
1178 }
1179 if let Some(t) = existing_tags {
1180 ctx.existing_object_tags = Some(t.clone());
1181 }
1182 let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
1183 if decision.allow {
1184 Ok(())
1185 } else {
1186 crate::metrics::record_policy_denial(action, bucket);
1187 tracing::info!(
1188 action,
1189 bucket,
1190 key = ?key,
1191 principal = ?principal_id,
1192 source_ip = ?ctx.source_ip,
1193 user_agent = ?ctx.user_agent,
1194 secure_transport = ctx.secure_transport,
1195 matched_sid = ?decision.matched_sid,
1196 effect = ?decision.matched_effect,
1197 "S4 policy denied request"
1198 );
1199 Err(S3Error::with_message(
1200 S3ErrorCode::AccessDenied,
1201 format!("denied by S4 policy: {action} on bucket={bucket}"),
1202 ))
1203 }
1204 }
1205
1206 /// テスト用: backend を取り戻す (test helper、production では使わない).
1207 /// v0.6 #40 で `backend` が `Arc<B>` 化したので `Arc::try_unwrap` で
1208 /// 1-clone の場合のみ返す。共有されている (= replication dispatcher が
1209 /// 同じ Arc を持っていて未完了) 場合は `Err` を返さず panic させる
1210 /// (test 用途専用 helper の caller 契約を維持)。
1211 pub fn into_backend(self) -> B {
1212 Arc::try_unwrap(self.backend)
1213 .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
1214 }
1215
    /// Sidecar fast path for Range GETs: issue a backend Range GET for
    /// only the frames that cover the requested span, then frame-parse,
    /// decompress, and slice the result. This is the **bandwidth-saving**
    /// variant of a Range request — without the sidecar index the GET
    /// path must read and decompress the entire object.
    ///
    /// `plan` carries the backend byte window
    /// (`byte_start..byte_end_exclusive`) and where the client's slice
    /// lands inside the decompressed concatenation
    /// (`slice_start_in_combined..slice_end_in_combined`).
    /// `client_start` / `client_end_exclusive` / `total_original` feed the
    /// `Content-Range` response header; `get_start` is the handler's start
    /// instant used for latency metrics.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial GET of only the required byte window from the backend.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Frame parse + decompress: each frame header carries its own
        // codec, so the registry is dispatched per frame.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Assemble the response. The body is a transformed slice, so the
        // stored checksums / ETag no longer apply — clear them rather than
        // return values a client-side validator would reject.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
1324
    /// Write the `<key>.s4index` sidecar object to the backend.
    ///
    /// Best-effort: the main PUT must still count as successful even if
    /// the sidecar write fails, so errors are only logged at warn level.
    /// Losing the sidecar disables the Range GET partial path for this
    /// object, but the full-read fallback still returns semantically
    /// correct results.
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let sidecar = sidecar_key(key);
        // v0.7 #49: synthetic re-entry URI must be percent-encoded; if
        // the (already legally-arbitrary) S3 key produces something we
        // cannot encode at all, drop the sidecar PUT (the GET path
        // falls back to a full read on a missing sidecar) instead of
        // panicking on `parse().unwrap()`.
        let uri = match safe_object_uri(bucket, &sidecar) {
            Ok(u) => u,
            Err(e) => {
                tracing::warn!(
                    bucket,
                    key,
                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
                );
                return;
            }
        };
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        // Synthetic internal request: no credentials / region — this PUT
        // goes straight at the backend, not back through the S3 front.
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }
1375
    /// Read the `<key>.s4index` sidecar back from the backend. Returns
    /// `None` when the sidecar is missing, unreadable, or undecodable —
    /// callers treat `None` as "no index, fall back to a full read".
    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let sidecar = sidecar_key(key);
        // v0.7 #49: same encode-or-bail treatment as write_sidecar.
        let uri = safe_object_uri(bucket, &sidecar).ok()?;
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        // Hard 64 MiB cap on the sidecar read — a safety bound independent
        // of `max_body_bytes`, since this path bypasses the normal PUT
        // limits.
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }
1402
    /// Decompress a multipart object (a sequence of frames) and rebuild
    /// the original bytes.
    ///
    /// **Per-frame codec dispatch**: every frame header carries its own
    /// codec id, so the registry can invoke a different codec for each
    /// frame. Objects mixing multiple codecs decompress transparently
    /// (e.g. parquet-style mixed columns).
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            // Rebuild the per-chunk manifest from the frame header so the
            // registry knows which codec to use and can verify sizes/CRC.
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
1432}
1433
1434/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
1435/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
1436/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
1437/// reject the other variants for parity with AWS.
1438fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1439 let rest = s
1440 .strip_prefix("bytes=")
1441 .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1442 let (a, b) = rest
1443 .split_once('-')
1444 .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1445 let first: u64 = a
1446 .parse()
1447 .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1448 let last: u64 = b
1449 .parse()
1450 .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1451 if last < first {
1452 return Err(format!("CopySourceRange last < first: {s:?}"));
1453 }
1454 Ok(s3s::dto::Range::Int {
1455 first,
1456 last: Some(last),
1457 })
1458}
1459
/// v0.5 #34: synthesize the backend storage key for a given
/// (logical key, version-id) pair on an Enabled-versioning bucket.
///
/// The `__s4ver__/` infix was chosen because:
/// - it's not a substring of `.s4index` / `.s4ver` natural keys (no
///   false-positive listing filter collisions)
/// - the directory-style separator keeps S3 console "browse by prefix" UX
///   intact (versions roll up under one virtual folder per object)
/// - it stays human-readable in debug logs / `aws s3 ls`
///
/// `list_objects` / `list_objects_v2` / `list_object_versions` MUST filter
/// keys containing `.__s4ver__/` from results so customers never see the
/// internal shadow objects.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    const INFIX: &str = ".__s4ver__/";
    let mut shadow = String::with_capacity(key.len() + INFIX.len() + version_id.len());
    shadow.push_str(key);
    shadow.push_str(INFIX);
    shadow.push_str(version_id);
    shadow
}
1476
/// Test for the marker substring produced by [`versioned_shadow_key`].
/// Cheap substring scan; both the list_objects filter and the GET
/// passthrough check rely on it.
fn is_versioning_shadow_key(key: &str) -> bool {
    const INFIX: &str = ".__s4ver__/";
    key.contains(INFIX)
}
1482
/// v0.6 #42: wall-clock seconds since the UNIX epoch — fed to
/// `mfa::check_mfa` so the TOTP verifier can match the client's
/// authenticator app's view of "now".
///
/// Falls back to `0` on the (impossible-in-practice) clock-before-1970
/// path so the verifier rejects instead of panicking.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1494
1495/// v0.6 #42: translate an `MfaError` into the matching S3 wire error.
1496///
1497/// - `Missing` / `SerialMismatch` / `InvalidCode` → `403 AccessDenied`
1498/// (S3 spec for MFA Delete: every gating failure surfaces as
1499/// `AccessDenied`, not a separate `MFA*` code).
1500/// - `Malformed` → `400 InvalidRequest` (the request itself is
1501/// syntactically broken, not a permission issue).
1502fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
1503 match e {
1504 crate::mfa::MfaError::Missing => S3Error::with_message(
1505 S3ErrorCode::AccessDenied,
1506 "MFA token required for this operation",
1507 ),
1508 crate::mfa::MfaError::Malformed => S3Error::with_message(
1509 S3ErrorCode::InvalidRequest,
1510 "malformed x-amz-mfa header",
1511 ),
1512 crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
1513 S3ErrorCode::AccessDenied,
1514 "MFA serial does not match configured device",
1515 ),
1516 crate::mfa::MfaError::InvalidCode => S3Error::with_message(
1517 S3ErrorCode::AccessDenied,
1518 "invalid MFA code",
1519 ),
1520 }
1521}
1522
1523fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1524 metadata
1525 .as_ref()
1526 .and_then(|m| m.get(META_MULTIPART))
1527 .map(|v| v == "true")
1528 .unwrap_or(false)
1529}
1530
/// Object-metadata key naming the codec that compressed the stored body;
/// parsed back into a `CodecKind` by `extract_manifest` on GET.
const META_CODEC: &str = "s4-codec";
/// Object-metadata key holding the pre-compression byte count.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Object-metadata key holding the post-compression byte count.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// Object-metadata key holding the manifest's CRC32C value — S4's own
/// integrity check, independent of the client's checksum/MD5 headers.
const META_CRC32C: &str = "s4-crc32c";
/// Marks an object written via multipart upload using the per-part frame
/// format. GET inspects this flag to start the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks a single-PUT object written in the S4F2 framed format.
/// Legacy v0.1 single-PUTs are raw compressed bytes (no flag). GET checks
/// this flag to route the body through the framed path (the same
/// FrameIter parse as multipart).
const META_FRAMED: &str = "s4-framed";
1542
1543fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1544 metadata
1545 .as_ref()
1546 .and_then(|m| m.get(META_FRAMED))
1547 .map(|v| v == "true")
1548 .unwrap_or(false)
1549}
1550
1551/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
1552fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1553 metadata
1554 .as_ref()
1555 .and_then(|m| m.get("s4-encrypted"))
1556 .map(|v| v == "aes-256-gcm")
1557 .unwrap_or(false)
1558}
1559
1560/// v0.5 #27: pull the three SSE-C headers off an input struct. The S3
1561/// contract is "all three or none" — partial sets are a 400.
1562///
1563/// Returns `Ok(None)` when no SSE-C headers were sent (server-managed or
1564/// no encryption), `Ok(Some(material))` on validated client key, and
1565/// `Err` for malformed or partial inputs.
1566fn extract_sse_c_material(
1567 algorithm: &Option<String>,
1568 key: &Option<String>,
1569 md5: &Option<String>,
1570) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
1571 match (algorithm, key, md5) {
1572 (None, None, None) => Ok(None),
1573 (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
1574 .map(Some)
1575 .map_err(sse_c_error_to_s3),
1576 _ => Err(S3Error::with_message(
1577 S3ErrorCode::InvalidRequest,
1578 "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
1579 )),
1580 }
1581}
1582
1583/// v0.5 #28: detect SSE-KMS request — `x-amz-server-side-encryption: aws:kms`.
1584/// Returns the key-id to wrap under, falling back to the gateway default.
1585fn extract_kms_key_id(
1586 sse: &Option<ServerSideEncryption>,
1587 sse_kms_key_id: &Option<String>,
1588 gateway_default: Option<&str>,
1589) -> Option<String> {
1590 let asks_for_kms = sse
1591 .as_ref()
1592 .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1593 .unwrap_or(false);
1594 if !asks_for_kms {
1595 return None;
1596 }
1597 sse_kms_key_id
1598 .clone()
1599 .or_else(|| gateway_default.map(str::to_owned))
1600}
1601
1602/// v0.5 #28: map kms module errors to AWS-shaped S3 error codes.
1603/// `KeyNotFound` is operator misconfig (400); `BackendUnavailable` is a
1604/// transient KMS outage (503). Other variants are 500 InternalError.
1605fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
1606 use crate::kms::KmsError as K;
1607 match e {
1608 K::KeyNotFound { key_id } => S3Error::with_message(
1609 S3ErrorCode::InvalidArgument,
1610 format!("KMS key not found: {key_id}"),
1611 ),
1612 K::BackendUnavailable { message } => S3Error::with_message(
1613 S3ErrorCode::ServiceUnavailable,
1614 format!("KMS backend unavailable: {message}"),
1615 ),
1616 other => S3Error::with_message(
1617 S3ErrorCode::InternalError,
1618 format!("KMS error: {other}"),
1619 ),
1620 }
1621}
1622
1623/// v0.5 #27: map sse module errors to AWS-shaped S3 error codes.
1624/// `WrongCustomerKey` → 403 AccessDenied (matches AWS behaviour);
1625/// `InvalidCustomerKey` / algorithm / required / unexpected → 400.
1626fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
1627 use crate::sse::SseError as E;
1628 match e {
1629 E::WrongCustomerKey => S3Error::with_message(
1630 S3ErrorCode::AccessDenied,
1631 "SSE-C key does not match the key used at PUT time",
1632 ),
1633 E::InvalidCustomerKey { reason } => S3Error::with_message(
1634 S3ErrorCode::InvalidArgument,
1635 format!("SSE-C: {reason}"),
1636 ),
1637 E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
1638 S3ErrorCode::InvalidArgument,
1639 format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
1640 ),
1641 E::CustomerKeyRequired => S3Error::with_message(
1642 S3ErrorCode::InvalidRequest,
1643 "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
1644 ),
1645 E::CustomerKeyUnexpected => S3Error::with_message(
1646 S3ErrorCode::InvalidRequest,
1647 "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
1648 ),
1649 other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
1650 }
1651}
1652
1653fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
1654 let m = metadata.as_ref()?;
1655 let codec = m
1656 .get(META_CODEC)
1657 .and_then(|s| s.parse::<CodecKind>().ok())?;
1658 let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
1659 let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
1660 let crc32c = m.get(META_CRC32C)?.parse().ok()?;
1661 Some(ChunkManifest {
1662 codec,
1663 original_size,
1664 compressed_size,
1665 crc32c,
1666 })
1667}
1668
1669fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1670 let meta = metadata.get_or_insert_with(Default::default);
1671 meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1672 meta.insert(
1673 META_ORIGINAL_SIZE.into(),
1674 manifest.original_size.to_string(),
1675 );
1676 meta.insert(
1677 META_COMPRESSED_SIZE.into(),
1678 manifest.compressed_size.to_string(),
1679 );
1680 meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1681}
1682
1683fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1684 move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1685}
1686
1687/// v0.6 #41: map a `select::SelectError` to the S3 error surface. AWS
1688/// uses a domain-specific `InvalidSqlExpression` code for parse / unsupported
1689/// errors, but s3s 0.13 doesn't expose that as a typed variant — we
1690/// fall back to the well-known `InvalidRequest` 400 with a descriptive
1691/// message that includes the original error context.
1692fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
1693 use crate::select::SelectError;
1694 match e {
1695 SelectError::Parse(msg) => S3Error::with_message(
1696 S3ErrorCode::InvalidRequest,
1697 format!("SQL parse error: {msg}"),
1698 ),
1699 SelectError::UnsupportedFeature(msg) => S3Error::with_message(
1700 S3ErrorCode::InvalidRequest,
1701 format!("unsupported SQL feature: {msg}"),
1702 ),
1703 SelectError::RowEval(msg) => S3Error::with_message(
1704 S3ErrorCode::InvalidRequest,
1705 format!("SQL row evaluation error: {msg}"),
1706 ),
1707 SelectError::InputFormat(msg) => S3Error::with_message(
1708 S3ErrorCode::InvalidRequest,
1709 format!("{fmt} input format error: {msg}"),
1710 ),
1711 }
1712}
1713
1714/// v0.5 #30: parse the `x-amz-bypass-governance-retention` header into a
1715/// boolean flag. AWS S3 accepts `true` (case-insensitive); any other value
1716/// (including missing) is treated as `false`.
1717fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1718 headers
1719 .get("x-amz-bypass-governance-retention")
1720 .and_then(|v| v.to_str().ok())
1721 .map(|s| s.eq_ignore_ascii_case("true"))
1722 .unwrap_or(false)
1723}
1724
1725/// Convert s3s `Timestamp` into a `chrono::DateTime<Utc>` by formatting it
1726/// as an RFC3339 string and re-parsing through `chrono`. The string format
1727/// avoids pulling the `time` crate (transitive dep of s3s, not declared by
1728/// s4-server) into our direct deps. Returns `None` if the format/parse fails
1729/// or the value is outside `chrono`'s supported range.
1730fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
1731 let mut buf = Vec::new();
1732 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
1733 let s = std::str::from_utf8(&buf).ok()?;
1734 chrono::DateTime::parse_from_rfc3339(s)
1735 .ok()
1736 .map(|dt| dt.with_timezone(&chrono::Utc))
1737}
1738
1739/// Inverse of [`timestamp_to_chrono_utc`] — emit RFC3339 (the s3s
1740/// `DateTime` wire format) and re-parse via `Timestamp::parse`.
1741fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
1742 // chrono's RFC3339 output format matches s3s' parser ("...Z" with
1743 // optional sub-second precision). Fall back to UNIX_EPOCH if anything
1744 // unexpected happens — we never produce malformed strings, so this
1745 // branch is unreachable in practice.
1746 let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1747 Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
1748}
1749
1750/// v0.6 #39: convert our internal [`crate::tagging::TagSet`] into the
1751/// s3s `Vec<Tag>` wire shape used on `GetObject/BucketTaggingOutput`.
1752/// Both halves of every pair land in the `Some(_)` slot — AWS marks
1753/// the field optional but always populates it on response.
1754fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1755 set.iter()
1756 .map(|(k, v)| Tag {
1757 key: Some(k.clone()),
1758 value: Some(v.clone()),
1759 })
1760 .collect()
1761}
1762
1763/// v0.6 #39: inverse of [`tagset_to_aws`] for input handlers. Missing
1764/// keys / values become empty strings (mirrors AWS, which rejects
1765/// `<Key/>` with InvalidTag at the parser layer; downstream
1766/// `TagSet::validate` then enforces our size limits).
1767fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
1768 let pairs = tags
1769 .iter()
1770 .map(|t| {
1771 (
1772 t.key.clone().unwrap_or_default(),
1773 t.value.clone().unwrap_or_default(),
1774 )
1775 })
1776 .collect();
1777 crate::tagging::TagSet::from_pairs(pairs)
1778}
1779
1780/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
1781/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
1782/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
1783pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1784 if total == 0 {
1785 return Err("cannot range-get zero-length object".into());
1786 }
1787 match range {
1788 s3s::dto::Range::Int { first, last } => {
1789 let start = *first;
1790 let end_inclusive = match last {
1791 Some(l) => (*l).min(total - 1),
1792 None => total - 1,
1793 };
1794 if start > end_inclusive || start >= total {
1795 return Err(format!(
1796 "range bytes={start}-{:?} out of object size {total}",
1797 last
1798 ));
1799 }
1800 Ok((start, end_inclusive + 1))
1801 }
1802 s3s::dto::Range::Suffix { length } => {
1803 let len = (*length).min(total);
1804 Ok((total - len, total))
1805 }
1806 }
1807}
1808
1809#[async_trait::async_trait]
1810impl<B: S3> S3 for S4Service<B> {
1811 // === 圧縮を挟む path (PUT) ===
1812 #[tracing::instrument(
1813 name = "s4.put_object",
1814 skip(self, req),
1815 fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1816 )]
1817 async fn put_object(
1818 &self,
1819 mut req: S3Request<PutObjectInput>,
1820 ) -> S3Result<S3Response<PutObjectOutput>> {
1821 let put_start = Instant::now();
1822 let put_bucket = req.input.bucket.clone();
1823 let put_key = req.input.key.clone();
1824 let access_preamble = self.access_log_preamble(&req);
1825 self.enforce_rate_limit(&req, &put_bucket)?;
1826 // v0.6 #39: parse `x-amz-tagging` (URL-encoded query string) so
1827 // the IAM policy gate sees the request's tags via
1828 // `s3:RequestObjectTag/<key>`. `existing_object_tags` is also
1829 // resolved from the Tagging manager (when wired) so
1830 // `s3:ExistingObjectTag/<key>` works on overwrite.
1831 let request_tags: Option<crate::tagging::TagSet> = req
1832 .input
1833 .tagging
1834 .as_deref()
1835 .map(crate::tagging::parse_tagging_header)
1836 .transpose()
1837 .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1838 let existing_tags: Option<crate::tagging::TagSet> = self
1839 .tagging
1840 .as_ref()
1841 .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1842 self.enforce_policy_with_extra(
1843 &req,
1844 "s3:PutObject",
1845 &put_bucket,
1846 Some(&put_key),
1847 request_tags.as_ref(),
1848 existing_tags.as_ref(),
1849 )?;
1850 // v0.5 #30: an Object Lock-protected key cannot be overwritten by
1851 // a non-versioned PUT (Suspended / Unversioned bucket). Enabled
1852 // bucket PUTs are exempt because they materialise a fresh
1853 // version under a shadow key (`<key>.__s4ver__/<vid>`) — the
1854 // locked version's bytes are untouched. The check mirrors the
1855 // delete path (Compliance never bypassable, Governance via the
1856 // bypass header, legal hold never).
1857 if let Some(mgr) = self.object_lock.as_ref()
1858 && let Some(state) = mgr.get(&put_bucket, &put_key)
1859 {
1860 let bucket_versioned_enabled = self
1861 .versioning
1862 .as_ref()
1863 .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1864 .unwrap_or(false);
1865 if !bucket_versioned_enabled {
1866 let bypass = parse_bypass_governance_header(&req.headers);
1867 let now = chrono::Utc::now();
1868 if !state.can_delete(now, bypass) {
1869 crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1870 return Err(S3Error::with_message(
1871 S3ErrorCode::AccessDenied,
1872 "Access Denied because object protected by object lock",
1873 ));
1874 }
1875 }
1876 }
1877 // v0.5 #30: per-PUT explicit retention / legal hold (S3
1878 // `x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
1879 // `x-amz-object-lock-legal-hold`). Captured before the body
1880 // moves into the backend; persisted into the manager only on
1881 // backend success below.
1882 let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1883 .input
1884 .object_lock_mode
1885 .as_ref()
1886 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1887 let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1888 .input
1889 .object_lock_retain_until_date
1890 .as_ref()
1891 .and_then(timestamp_to_chrono_utc);
1892 let explicit_legal_hold_on: Option<bool> = req
1893 .input
1894 .object_lock_legal_hold_status
1895 .as_ref()
1896 .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1897 if let Some(blob) = req.input.body.take() {
1898 // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
1899 // compress fast path、そうでなければ従来の collect-then-compress。
1900 let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1901 .await
1902 .map_err(internal("peek put sample"))?;
1903 let sample_len = sample.len().min(SAMPLE_BYTES);
1904 // v0.8 #56: pass the request's Content-Length (when present) so
1905 // the sampling dispatcher can promote large objects to a GPU
1906 // codec. Chunked transfers (no Content-Length) keep CPU.
1907 let total_size_hint = req.input.content_length.and_then(|n| u64::try_from(n).ok());
1908 let kind = self
1909 .dispatcher
1910 .pick_with_size_hint(&sample[..sample_len], total_size_hint)
1911 .await;
1912
1913 // Passthrough buys nothing from S4F2 wrapping (no compression =
1914 // no per-chunk frame to skip past) and the +28-byte header
1915 // overhead breaks size-sensitive callers that expect a true
1916 // pass-through. So passthrough always uses the legacy raw-blob
1917 // path; only compressing codecs go through the framed path.
1918 let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1919 let (compressed, manifest, is_framed) = if use_framed {
1920 // streaming fast path: input は memory に collect しない
1921 let chained = chain_sample_with_rest(sample, rest_stream);
1922 debug!(
1923 bucket = ?req.input.bucket,
1924 key = ?req.input.key,
1925 codec = kind.as_str(),
1926 path = "streaming-framed",
1927 "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1928 );
1929 // v0.4 #16: pick the chunk size based on the request's
1930 // Content-Length when known, falling back to the 4 MiB
1931 // default for chunked transfers.
1932 let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1933 let (body, manifest) = streaming_compress_to_frames(
1934 chained,
1935 Arc::clone(&self.registry),
1936 kind,
1937 chunk_size,
1938 )
1939 .await
1940 .map_err(internal("streaming framed compress"))?;
1941 (body, manifest, true)
1942 } else {
1943 // GPU codec 等で streaming-aware でないものは bytes-buffered path
1944 // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
1945 let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1946 .await
1947 .map_err(internal("collect put body (buffered path)"))?;
1948 debug!(
1949 bucket = ?req.input.bucket,
1950 key = ?req.input.key,
1951 bytes = bytes.len(),
1952 codec = kind.as_str(),
1953 path = "buffered",
1954 "S4 put_object: compressing (buffered, raw blob)"
1955 );
1956 // v0.8 #55: telemetry-returning compress so we can stamp
1957 // GPU-pipeline Prometheus metrics (`s4_gpu_compress_seconds`,
1958 // throughput gauge, OOM counter) for nvcomp / dietgpu codecs.
1959 // CPU codecs come back with `gpu_seconds = None` and the
1960 // stamp helper short-circuits — no extra cost on CPU path.
1961 let (compress_res, tel) =
1962 self.registry.compress_with_telemetry(bytes, kind).await;
1963 stamp_gpu_compress_telemetry(&tel);
1964 let (body, m) = compress_res.map_err(internal("registry compress"))?;
1965 (body, m, false)
1966 };
1967
1968 write_manifest(&mut req.input.metadata, &manifest);
1969 if is_framed {
1970 // v0.2 #4: framed body であることを GET 側に伝える meta flag。
1971 req.input
1972 .metadata
1973 .get_or_insert_with(Default::default)
1974 .insert(META_FRAMED.into(), "true".into());
1975 }
1976 // 重要: content_length を圧縮後サイズで更新する。
1977 // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
1978 // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
1979 req.input.content_length = Some(compressed.len() as i64);
1980 // body を書き換えたので、客側が送ってきた original body 用の
1981 // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
1982 // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
1983 // ChunkManifest.crc32c で担保している。
1984 req.input.checksum_algorithm = None;
1985 req.input.checksum_crc32 = None;
1986 req.input.checksum_crc32c = None;
1987 req.input.checksum_crc64nvme = None;
1988 req.input.checksum_sha1 = None;
1989 req.input.checksum_sha256 = None;
1990 req.input.content_md5 = None;
1991 let original_size = manifest.original_size;
1992 let compressed_size = manifest.compressed_size;
1993 let codec_label = manifest.codec.as_str();
1994 // framed body は GET 側で sidecar partial-fetch を効かせるため
1995 // build_index_from_body で sidecar を組み立てて backend に PUT する。
1996 let sidecar_index = if is_framed {
1997 s4_codec::index::build_index_from_body(&compressed).ok()
1998 } else {
1999 None
2000 };
2001 // v0.4 #21 / v0.5 #29 / v0.5 #27: encrypt-after-compress.
2002 // Precedence:
2003 // - SSE-C headers present → per-request customer key (S4E3)
2004 // - server-managed keyring configured → active key (S4E2)
2005 // - neither → no encryption (raw compressed body)
2006 // The `s4-encrypted: aes-256-gcm` metadata flag is set in
2007 // both encrypted modes; the on-disk frame magic distinguishes
2008 // S4E1 / S4E2 / S4E3 so GET picks the right decrypt path.
2009 // v0.7 #48 BUG-2/3 fix: take() the SSE fields off req.input
2010 // so the encryption headers are NOT forwarded to the
2011 // backend. S4 owns the encrypt-then-store contract; if we
2012 // leave the headers in place, real S3-compat backends
2013 // (MinIO / AWS) try to apply their own SSE on top and
2014 // either reject (MinIO requires HTTPS for SSE-C) or fail
2015 // (MinIO has no KMS configured). MemoryBackend ignored
2016 // these so mock tests passed.
2017 let sse_c_alg = req.input.sse_customer_algorithm.take();
2018 let sse_c_key = req.input.sse_customer_key.take();
2019 let sse_c_md5 = req.input.sse_customer_key_md5.take();
2020 let sse_header = req.input.server_side_encryption.take();
2021 let sse_kms_key = req.input.ssekms_key_id.take();
2022 let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
2023 // v0.5 #28: SSE-KMS request? Resolves to None unless the
2024 // request asks for `aws:kms` AND a key id is available
2025 // (explicit header or gateway default). When set, we'll
2026 // generate a per-object DEK below.
2027 let kms_key_id = extract_kms_key_id(
2028 &sse_header,
2029 &sse_kms_key,
2030 self.kms_default_key_id.as_deref(),
2031 );
2032 // v0.5 #32: in compliance-strict mode, every PUT must
2033 // declare SSE — either client-supplied (SSE-C), KMS, or by
2034 // virtue of a server-side keyring being configured (which
2035 // applies SSE-S4 to every PUT automatically). Requests that
2036 // would otherwise land as plain compressed bytes are
2037 // rejected with 400 InvalidRequest.
2038 if self.compliance_strict
2039 && sse_c_material.is_none()
2040 && kms_key_id.is_none()
2041 && self.sse_keyring.is_none()
2042 && sse_header.as_ref().map(|s| s.as_str())
2043 != Some(ServerSideEncryption::AES256)
2044 {
2045 return Err(S3Error::with_message(
2046 S3ErrorCode::InvalidRequest,
2047 "compliance-mode strict: PUT must include x-amz-server-side-encryption \
2048 (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
2049 ));
2050 }
2051 // SSE-C and SSE-KMS are mutually exclusive on a single PUT
2052 // (AWS S3 returns 400 InvalidArgument). SSE-C wins by spec.
2053 if sse_c_material.is_some() && kms_key_id.is_some() {
2054 return Err(S3Error::with_message(
2055 S3ErrorCode::InvalidArgument,
2056 "SSE-C and SSE-KMS cannot be used together on the same PUT",
2057 ));
2058 }
2059 // KMS path needs to call generate_dek().await before the
2060 // body_to_send branch; capture the result here.
2061 //
2062 // v0.8.1 #58: the plaintext DEK lives in three places
2063 // during one PUT:
2064 //
2065 // 1. The `Zeroizing<Vec<u8>>` returned by `generate_dek`
2066 // — wiped when the binding `dek` falls out of scope at
2067 // the end of this `if`-arm.
2068 // 2. The stack `[u8; 32]` we copy into for `SseSource::Kms`
2069 // — wrapped in `Zeroizing<[u8; 32]>` so it's wiped when
2070 // the outer `kms_wrap` `Option` is dropped at the end
2071 // of `put_object`.
2072 // 3. AES-GCM internal key state inside the `aes-gcm`
2073 // crate during `encrypt_with_source` — out of scope
2074 // for this fix; tracked separately in v0.8.2.
2075 let kms_wrap: Option<(zeroize::Zeroizing<[u8; 32]>, crate::kms::WrappedDek)> =
2076 if let Some(ref key_id) = kms_key_id {
2077 let kms = self.kms.as_ref().ok_or_else(|| {
2078 S3Error::with_message(
2079 S3ErrorCode::InvalidRequest,
2080 "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
2081 )
2082 })?;
2083 // `dek` is `Zeroizing<Vec<u8>>`; deref + slice access
2084 // works unchanged via `Deref<Target=Vec<u8>>`.
2085 let (dek, wrapped) = kms
2086 .generate_dek(key_id)
2087 .await
2088 .map_err(kms_error_to_s3)?;
2089 if dek.len() != 32 {
2090 return Err(S3Error::with_message(
2091 S3ErrorCode::InternalError,
2092 format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
2093 ));
2094 }
2095 let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
2096 zeroize::Zeroizing::new([0u8; 32]);
2097 dek_arr.copy_from_slice(&dek);
2098 // `dek` (the `Zeroizing<Vec<u8>>`) is dropped at the
2099 // end of this scope, wiping the heap allocation.
2100 Some((dek_arr, wrapped))
2101 } else {
2102 None
2103 };
2104 // v0.7 #48 BUG-4 fix: stamp the SSE *type* into metadata
2105 // alongside `s4-encrypted` so HEAD (which doesn't fetch the
2106 // body) can echo the correct `x-amz-server-side-encryption`
2107 // value. Without this, HEAD on an SSE-KMS object would not
2108 // echo `aws:kms` because the frame magic is only available
2109 // on the body (which HEAD doesn't read).
2110 let body_to_send = if let Some(ref m) = sse_c_material {
2111 let meta = req.input.metadata.get_or_insert_with(Default::default);
2112 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2113 meta.insert("s4-sse-type".into(), "AES256".into());
2114 meta.insert("s4-sse-c-key-md5".into(),
2115 base64::engine::general_purpose::STANDARD.encode(m.key_md5));
2116 crate::sse::encrypt_with_source(
2117 &compressed,
2118 crate::sse::SseSource::CustomerKey {
2119 key: &m.key,
2120 key_md5: &m.key_md5,
2121 },
2122 )
2123 } else if let Some((ref dek, ref wrapped)) = kms_wrap {
2124 let meta = req.input.metadata.get_or_insert_with(Default::default);
2125 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2126 meta.insert("s4-sse-type".into(), "aws:kms".into());
2127 meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
2128 // v0.8.1 #58: `dek` is `&Zeroizing<[u8; 32]>`; `SseSource::Kms`
2129 // wants `&[u8; 32]`. Rust auto-derefs `&Zeroizing<T>` to
2130 // `&T` here via `Deref<Target=T>`, so the binding picks
2131 // up the inner array reference without copying. The array
2132 // stays in the `Zeroizing` wrapper that owns it and gets
2133 // wiped when `kms_wrap` drops at the end of `put_object`.
2134 let dek_ref: &[u8; 32] = dek;
2135 crate::sse::encrypt_with_source(
2136 &compressed,
2137 crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
2138 )
2139 } else if let Some(keyring) = self.sse_keyring.as_ref() {
2140 // SSE-S4 is server-driven transparent encryption; the
2141 // client didn't ask for SSE. We stamp `s4-encrypted`
2142 // (internal flag the GET path needs) but deliberately
2143 // do NOT stamp `s4-sse-type` — that lights up the HEAD
2144 // echo of `x-amz-server-side-encryption: AES256`,
2145 // which would falsely advertise AWS-style SSE-S3
2146 // semantics the operator didn't request.
2147 let meta = req.input.metadata.get_or_insert_with(Default::default);
2148 meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2149 // v0.8 #52: when `--sse-chunk-size > 0` is configured,
2150 // emit the chunked S4E5 frame so the matching GET can
2151 // stream-decrypt instead of buffering 5 GiB before
2152 // emitting a byte. Falls back to the buffered S4E2
2153 // frame at chunk_size=0 (default) so existing
2154 // deployments are bit-for-bit unchanged.
2155 if self.sse_chunk_size > 0 {
2156 crate::sse::encrypt_v2_chunked(
2157 &compressed,
2158 keyring,
2159 self.sse_chunk_size,
2160 )
2161 .map_err(|e| {
2162 S3Error::with_message(
2163 S3ErrorCode::InternalError,
2164 format!("SSE-S4 chunked encrypt failed: {e}"),
2165 )
2166 })?
2167 } else {
2168 crate::sse::encrypt_v2(&compressed, keyring)
2169 }
2170 } else {
2171 compressed.clone()
2172 };
2173 // v0.6 #40: capture the about-to-be-sent body + metadata so
2174 // the replication dispatcher (run after the source PUT
2175 // succeeds) can hand the same backend bytes to the
2176 // destination bucket. `Bytes` clone is cheap (refcounted).
2177 let replication_body = body_to_send.clone();
2178 let replication_metadata = req.input.metadata.clone();
2179 // v0.7 #48 BUG-1 fix: SSE encryption (S4E1/E2/E3/E4 frames)
2180 // makes the body longer than the post-compression bytes
2181 // (header + nonce + tag overhead). The earlier
2182 // content_length stamp at compressed.len() is now stale, so
2183 // re-stamp from the actual bytes about to be sent or the
2184 // backend (real S3 / MinIO) rejects with
2185 // `StreamLengthMismatch`. MemoryBackend never validated
2186 // this, which is why mock-only tests passed.
2187 req.input.content_length = Some(body_to_send.len() as i64);
2188 req.input.body = Some(bytes_to_blob(body_to_send));
2189 // v0.5 #34: pre-allocate a version-id when the bucket is
2190 // Enabled, then redirect the backend storage key to the
2191 // shadow path so older versions survive newer PUTs.
2192 // Suspended / Unversioned buckets keep using the plain
2193 // `<key>` (S3 spec: Suspended overwrites the same backend
2194 // object). Pre-allocation (instead of recording after PUT)
2195 // ensures the shadow key + the response's
2196 // `x-amz-version-id` use the same vid.
2197 let pending_version: Option<crate::versioning::PutOutcome> = self
2198 .versioning
2199 .as_ref()
2200 .map(|mgr| mgr.state(&put_bucket))
2201 .map(|state| match state {
2202 crate::versioning::VersioningState::Enabled => {
2203 crate::versioning::PutOutcome {
2204 version_id: crate::versioning::VersioningManager::new_version_id(),
2205 versioned_response: true,
2206 }
2207 }
2208 crate::versioning::VersioningState::Suspended
2209 | crate::versioning::VersioningState::Unversioned => {
2210 crate::versioning::PutOutcome {
2211 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2212 versioned_response: false,
2213 }
2214 }
2215 });
2216 if let Some(ref pv) = pending_version
2217 && pv.versioned_response
2218 {
2219 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2220 }
2221 let mut backend_resp = self.backend.put_object(req).await;
2222 if let Some(idx) = sidecar_index
2223 && backend_resp.is_ok()
2224 && idx.entries.len() > 1
2225 {
2226 // 1 chunk しかない (small object) なら sidecar は意味がない (=
2227 // partial fetch しても full body と同じ範囲) ので省略。
2228 // Sidecar は user-visible key で書く (latest version の
2229 // partial fetch path 用)。Old versions の Range GET は今 task
2230 // の scope 外 (full read fallback でも意味的には正しい)。
2231 self.write_sidecar(&put_bucket, &put_key, &idx).await;
2232 }
2233 // v0.5 #34: commit the new version into the manager only on
2234 // backend success. Use the pre-allocated vid so the response
2235 // header and the chain entry agree.
2236 if let (Some(mgr), Some(pv), Ok(resp)) = (
2237 self.versioning.as_ref(),
2238 pending_version.as_ref(),
2239 backend_resp.as_mut(),
2240 ) {
2241 let etag = resp
2242 .output
2243 .e_tag
2244 .clone()
2245 .map(ETag::into_value)
2246 .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
2247 let now = chrono::Utc::now();
2248 mgr.commit_put_with_version(
2249 &put_bucket,
2250 &put_key,
2251 crate::versioning::VersionEntry {
2252 version_id: pv.version_id.clone(),
2253 etag,
2254 size: original_size,
2255 is_delete_marker: false,
2256 created_at: now,
2257 },
2258 );
2259 if pv.versioned_response {
2260 resp.output.version_id = Some(pv.version_id.clone());
2261 }
2262 }
2263 // v0.5 #27: AWS S3 echoes the SSE-C headers back on success
2264 // so the client knows the server actually applied the
2265 // requested algorithm and which key fingerprint matched.
2266 if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
2267 resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2268 resp.output.sse_customer_key_md5 = Some(
2269 base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2270 );
2271 }
2272 // v0.5 #28: SSE-KMS echo — `aws:kms` + the canonical key id
2273 // the backend returned (AWS KMS returns the ARN even when
2274 // the request used an alias).
2275 if let (Some((_, wrapped)), Ok(resp)) =
2276 (kms_wrap.as_ref(), backend_resp.as_mut())
2277 {
2278 resp.output.server_side_encryption =
2279 Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
2280 resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
2281 }
2282 // v0.5 #30: persist any per-PUT explicit retention / legal
2283 // hold the client supplied, then auto-apply the bucket
2284 // default (no-op when state is already populated). The
2285 // explicit fields take precedence — the bucket-default
2286 // helper bails out as soon as it sees any retention.
2287 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2288 if explicit_lock_mode.is_some()
2289 || explicit_retain_until.is_some()
2290 || explicit_legal_hold_on.is_some()
2291 {
2292 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2293 if let Some(m) = explicit_lock_mode {
2294 state.mode = Some(m);
2295 }
2296 if let Some(u) = explicit_retain_until {
2297 state.retain_until = Some(u);
2298 }
2299 if let Some(lh) = explicit_legal_hold_on {
2300 state.legal_hold_on = lh;
2301 }
2302 mgr.set(&put_bucket, &put_key, state);
2303 }
2304 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2305 }
2306 let _ = (original_size, compressed_size); // mute unused warnings
2307 let elapsed = put_start.elapsed();
2308 crate::metrics::record_put(
2309 codec_label,
2310 original_size,
2311 compressed_size,
2312 elapsed.as_secs_f64(),
2313 backend_resp.is_ok(),
2314 );
2315 // v0.4 #20: structured access-log entry (best-effort).
2316 self.record_access(
2317 access_preamble,
2318 "REST.PUT.OBJECT",
2319 &put_bucket,
2320 Some(&put_key),
2321 if backend_resp.is_ok() { 200 } else { 500 },
2322 compressed_size,
2323 original_size,
2324 elapsed.as_millis() as u64,
2325 backend_resp.as_ref().err().map(|e| e.code().as_str()),
2326 )
2327 .await;
2328 info!(
2329 op = "put_object",
2330 bucket = %put_bucket,
2331 key = %put_key,
2332 codec = codec_label,
2333 bytes_in = original_size,
2334 bytes_out = compressed_size,
2335 ratio = format!(
2336 "{:.3}",
2337 if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
2338 ),
2339 latency_ms = elapsed.as_millis() as u64,
2340 ok = backend_resp.is_ok(),
2341 "S4 put completed"
2342 );
2343 // v0.6 #35: fire bucket-notification destinations (best-effort,
2344 // detached). Skipped when no manager is attached or when the
2345 // bucket has no rule matching `s3:ObjectCreated:Put` for this
2346 // key.
2347 if backend_resp.is_ok()
2348 && let Some(mgr) = self.notifications.as_ref()
2349 {
2350 let dests = mgr.match_destinations(
2351 &put_bucket,
2352 &crate::notifications::EventType::ObjectCreatedPut,
2353 &put_key,
2354 );
2355 if !dests.is_empty() {
2356 let etag = backend_resp
2357 .as_ref()
2358 .ok()
2359 .and_then(|r| r.output.e_tag.clone())
2360 .map(ETag::into_value);
2361 let version_id = pending_version
2362 .as_ref()
2363 .filter(|pv| pv.versioned_response)
2364 .map(|pv| pv.version_id.clone());
2365 tokio::spawn(crate::notifications::dispatch_event(
2366 Arc::clone(mgr),
2367 put_bucket.clone(),
2368 put_key.clone(),
2369 crate::notifications::EventType::ObjectCreatedPut,
2370 Some(original_size),
2371 etag,
2372 version_id,
2373 format!("S4-{}", uuid::Uuid::new_v4()),
2374 ));
2375 }
2376 }
2377 // v0.6 #39: persist parsed `x-amz-tagging` tags into the
2378 // tagging manager on a successful PUT. AWS PutObject's
2379 // tagging is a full-replace operation (not a merge), so
2380 // any pre-existing entry for `(bucket, key)` is overwritten.
2381 if backend_resp.is_ok()
2382 && let (Some(mgr), Some(tags)) =
2383 (self.tagging.as_ref(), request_tags.clone())
2384 {
2385 mgr.put_object_tags(&put_bucket, &put_key, tags);
2386 }
2387 // v0.6 #40: cross-bucket replication fire-point. On
2388 // successful source PUT, consult the replication manager;
2389 // when an enabled rule matches, mark the source key
2390 // `Pending` and spawn a detached task that PUTs the same
2391 // backend bytes + metadata to the rule's destination
2392 // bucket. The dispatcher itself records `Completed` /
2393 // `Failed` and bumps the drop counter on retry-budget
2394 // exhaustion.
2395 self.spawn_replication_if_matched(
2396 &put_bucket,
2397 &put_key,
2398 &request_tags,
2399 &replication_body,
2400 &replication_metadata,
2401 backend_resp.is_ok(),
2402 pending_version.as_ref(),
2403 );
2404 return backend_resp;
2405 }
2406 // Body-less PUT (rare: zero-length object). Mirror the body-full
2407 // versioning hooks so list_object_versions / GET-by-version still see
2408 // empty-body objects in the chain.
2409 let pending_version: Option<crate::versioning::PutOutcome> = self
2410 .versioning
2411 .as_ref()
2412 .map(|mgr| mgr.state(&put_bucket))
2413 .map(|state| match state {
2414 crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2415 version_id: crate::versioning::VersioningManager::new_version_id(),
2416 versioned_response: true,
2417 },
2418 _ => crate::versioning::PutOutcome {
2419 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2420 versioned_response: false,
2421 },
2422 });
2423 if let Some(ref pv) = pending_version
2424 && pv.versioned_response
2425 {
2426 req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2427 }
2428 let mut backend_resp = self.backend.put_object(req).await;
2429 if let (Some(mgr), Some(pv), Ok(resp)) = (
2430 self.versioning.as_ref(),
2431 pending_version.as_ref(),
2432 backend_resp.as_mut(),
2433 ) {
2434 let etag = resp
2435 .output
2436 .e_tag
2437 .clone()
2438 .map(ETag::into_value)
2439 .unwrap_or_default();
2440 let now = chrono::Utc::now();
2441 mgr.commit_put_with_version(
2442 &put_bucket,
2443 &put_key,
2444 crate::versioning::VersionEntry {
2445 version_id: pv.version_id.clone(),
2446 etag,
2447 size: 0,
2448 is_delete_marker: false,
2449 created_at: now,
2450 },
2451 );
2452 if pv.versioned_response {
2453 resp.output.version_id = Some(pv.version_id.clone());
2454 }
2455 }
2456 // v0.5 #30: same explicit-then-default lock-state commit as the
2457 // body-bearing branch above, so a zero-length PUT also picks up
2458 // bucket-default retention.
2459 if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2460 if explicit_lock_mode.is_some()
2461 || explicit_retain_until.is_some()
2462 || explicit_legal_hold_on.is_some()
2463 {
2464 let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2465 if let Some(m) = explicit_lock_mode {
2466 state.mode = Some(m);
2467 }
2468 if let Some(u) = explicit_retain_until {
2469 state.retain_until = Some(u);
2470 }
2471 if let Some(lh) = explicit_legal_hold_on {
2472 state.legal_hold_on = lh;
2473 }
2474 mgr.set(&put_bucket, &put_key, state);
2475 }
2476 mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2477 }
2478 // v0.6 #35: same notification fire-point as the body-bearing PUT
2479 // branch above (zero-length objects still match `ObjectCreated:Put`
2480 // rules per the AWS event taxonomy).
2481 if backend_resp.is_ok()
2482 && let Some(mgr) = self.notifications.as_ref()
2483 {
2484 let dests = mgr.match_destinations(
2485 &put_bucket,
2486 &crate::notifications::EventType::ObjectCreatedPut,
2487 &put_key,
2488 );
2489 if !dests.is_empty() {
2490 let etag = backend_resp
2491 .as_ref()
2492 .ok()
2493 .and_then(|r| r.output.e_tag.clone())
2494 .map(ETag::into_value);
2495 let version_id = pending_version
2496 .as_ref()
2497 .filter(|pv| pv.versioned_response)
2498 .map(|pv| pv.version_id.clone());
2499 tokio::spawn(crate::notifications::dispatch_event(
2500 Arc::clone(mgr),
2501 put_bucket.clone(),
2502 put_key.clone(),
2503 crate::notifications::EventType::ObjectCreatedPut,
2504 Some(0),
2505 etag,
2506 version_id,
2507 format!("S4-{}", uuid::Uuid::new_v4()),
2508 ));
2509 }
2510 }
2511 // v0.6 #39: persist parsed `x-amz-tagging` for the body-less
2512 // (zero-length) PUT branch too — same shape as the body-bearing
2513 // branch above.
2514 if backend_resp.is_ok()
2515 && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2516 {
2517 mgr.put_object_tags(&put_bucket, &put_key, tags);
2518 }
2519 // v0.6 #40: cross-bucket replication for the zero-length PUT
2520 // branch — same shape as the body-bearing branch above.
2521 // v0.8.2 #61: pass `pending_version` so a versioned source's
2522 // destination receives the same shadow-key path.
2523 self.spawn_replication_if_matched(
2524 &put_bucket,
2525 &put_key,
2526 &request_tags,
2527 &bytes::Bytes::new(),
2528 &None,
2529 backend_resp.is_ok(),
2530 pending_version.as_ref(),
2531 );
2532 backend_resp
2533 }
2534
    // === Decompression path (GET) ===
    //
    // Order of operations matters here: rate-limit/policy gates first, then
    // versioning resolution (may rewrite the backend key), then the sidecar
    // Range fast path, then the backend fetch, then SSE decrypt, and only
    // then codec decompression (streaming or buffered).
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // Detect a Range request up front (used by the slice-after-decompress
        // path below). Taken out of the input so the backend never sees it —
        // backend bytes are compressed, so a backend-side Range would be wrong.
        let range_request = req.input.range.take();
        // v0.5 #27: pull SSE-C material from the input headers before
        // the request is moved into the backend. A header parse error
        // fails fast (no body fetch). The material is consumed below
        // when decrypting an S4E3-framed body; the SSE-C headers on
        // `req.input` are cleared so the backend doesn't see them.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let get_sse_c_material =
            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;

        // v0.5 #34: route the GET through the VersioningManager when
        // attached AND the bucket is in a versioning-aware state.
        // Resolves which version to fetch (explicit `?versionId=` query
        // param vs. chain latest), translates a delete-marker into 404
        // NoSuchKey, and rewrites the backend storage key to the shadow
        // path (`<key>.__s4ver__/<vid>`) for non-null Enabled-bucket
        // versions. `resolved_version_id` is stamped onto the response
        // so clients see a coherent `x-amz-version-id` header.
        //
        // When the bucket is Unversioned (or no manager attached), the
        // chain-resolution step is skipped and the request flows
        // through the existing single-key path unchanged.
        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
            Some(mgr)
                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
            {
                let req_vid = req.input.version_id.take();
                let entry = match req_vid.as_deref() {
                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
                        || S3Error::with_message(
                            S3ErrorCode::NoSuchVersion,
                            format!("no such version: {vid}"),
                        ),
                    )?,
                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::NoSuchKey,
                            format!("no such key: {get_key}"),
                        )
                    })?,
                };
                if entry.is_delete_marker {
                    // S3 spec: GET without versionId on a
                    // delete-marker latest → 404 NoSuchKey + the
                    // response carries `x-amz-delete-marker: true`.
                    // GET with explicit versionId pointing at a delete
                    // marker → 405 MethodNotAllowed; we surface
                    // NoSuchKey here for both since s3s collapses them
                    // into the same not-found error path.
                    return Err(S3Error::with_message(
                        S3ErrorCode::NoSuchKey,
                        format!("delete marker is the current version of {get_key}"),
                    ));
                }
                if entry.version_id != crate::versioning::NULL_VERSION_ID {
                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
                }
                Some(entry.version_id)
            }
            _ => None,
        };

        // ====== Partial-fetch fast path for Range GETs (via sidecar index) ======
        // When a sidecar `<key>.s4index` exists and the object is
        // multipart-framed, Range-GET only the frames the request needs from
        // the backend, saving bandwidth.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
            // No plan (e.g. range not coverable by the index): fall through to
            // the full-fetch path below.
        }
        let mut resp = self.backend.get_object(req).await?;
        // v0.5 #34: stamp the resolved version-id so the client sees a
        // coherent `x-amz-version-id` header (only for chains owned by
        // the manager — Unversioned buckets / no-manager paths never
        // set this).
        if let Some(ref vid) = resolved_version_id {
            resp.output.version_id = Some(vid.clone());
        }
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        // v0.2 #4: framed-v2 single-PUT needs multi-frame parsing, so it is
        // routed through the same path as multipart objects.
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            // Objects S4 did not write pass through untouched (e.g.
            // pre-existing objects in a raw bucket).
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // v0.4 #21 / v0.5 #27: if the object was stored under SSE
            // (metadata flag `s4-encrypted: aes-256-gcm`), decrypt
            // before any frame parse / streaming decompress. Encrypted
            // bodies are opaque to the codec; this also forces the
            // buffered path because AES-GCM needs the full body for tag
            // verify. SSE-C uses the per-request customer key, SSE-S4
            // falls back to the configured keyring.
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                // v0.5 #28: peek the frame magic to route the right
                // decrypt path. S4E4 means SSE-KMS — unwrap the DEK
                // through the KMS backend (async). S4E1/E2/E3 take
                // the sync path (keyring or customer key).
                //
                // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): the chunked
                // SSE-S4 frames take the *streaming* path — we hand
                // the response body a per-chunk verify-and-emit
                // Stream so the client sees chunk 0 plaintext after
                // one chunk-worth of AES-GCM verify (vs. waiting
                // for the whole body's tag), and the gateway no
                // longer needs to materialize the full plaintext
                // in memory before responding. SSE-C is out of
                // scope for the chunked path (chunked S4E3 is a
                // follow-up), so this branch requires the SSE-S4
                // keyring to be wired and `get_sse_c_material` to
                // be absent — otherwise we surface a clear
                // misconfiguration error instead of silently
                // falling through to the buffered chunked path.
                if matches!(
                    crate::sse::peek_magic(&body),
                    Some("S4E5") | Some("S4E6")
                ) && get_sse_c_material.is_none()
                {
                    let keyring_arc = self.sse_keyring.clone().ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "object is SSE-S4 encrypted (S4E5/S4E6) but no --sse-s4-key is configured on this gateway",
                        )
                    })?;
                    let body_len = body.len() as u64;
                    let stream =
                        crate::sse::decrypt_chunked_stream(body, keyring_arc.as_ref());
                    // Stream is `'static` (the keyring borrow is
                    // consumed up front; the cipher lives inside
                    // the stream state — see decrypt_chunked_stream
                    // doc), so we can move it straight into a
                    // StreamingBlob without lifetime gymnastics.
                    use futures::StreamExt;
                    let mapped = stream.map(|r| {
                        r.map_err(|e| std::io::Error::other(format!(
                            "SSE-S4 chunked decrypt: {e}"
                        )))
                    });
                    use s3s::dto::StreamingBlob;
                    resp.output.body = Some(StreamingBlob::wrap(mapped));
                    // Plaintext content_length is unknown until all
                    // chunks have been verified; null it out so the
                    // ByteStream wrapper reports `unknown` to the
                    // HTTP layer (which then emits chunked transfer-
                    // encoding) rather than lying about the size.
                    resp.output.content_length = None;
                    // The backend's checksums + ETag describe the
                    // encrypted body (S4E5/S4E6 wire format), not
                    // the plaintext we're about to stream — clear them
                    // so the AWS SDK doesn't fail the GET with a
                    // ChecksumMismatch on a successful round-trip.
                    // Mirrors the streaming-zstd path at L1180-1185.
                    resp.output.checksum_crc32 = None;
                    resp.output.checksum_crc32c = None;
                    resp.output.checksum_crc64nvme = None;
                    resp.output.checksum_sha1 = None;
                    resp.output.checksum_sha256 = None;
                    resp.output.e_tag = None;
                    let elapsed = get_start.elapsed();
                    // NOTE: bytes_in == bytes_out here because the plaintext
                    // size is not known at setup time for the chunked stream.
                    crate::metrics::record_get(
                        "sse-s4-chunked",
                        body_len,
                        body_len,
                        elapsed.as_secs_f64(),
                        true,
                    );
                    return Ok(resp);
                }
                let plain = match crate::sse::peek_magic(&body) {
                    Some("S4E4") => {
                        let kms = self.kms.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InvalidRequest,
                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                            )
                        })?;
                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
                        crate::sse::decrypt_with_kms(&body, kms_ref)
                            .await
                            .map_err(|e| match e {
                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
                                other => S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-KMS decrypt failed: {other}"),
                                ),
                            })?
                    }
                    _ => {
                        if let Some(ref m) = get_sse_c_material {
                            crate::sse::decrypt(
                                &body,
                                crate::sse::SseSource::CustomerKey {
                                    key: &m.key,
                                    key_md5: &m.key_md5,
                                },
                            )
                            .map_err(sse_c_error_to_s3)?
                        } else {
                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                                S3Error::with_message(
                                    S3ErrorCode::InvalidRequest,
                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                                )
                            })?;
                            crate::sse::decrypt(&body, keyring).map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-S4 decrypt failed: {e}"),
                                )
                            })?
                        }
                    }
                };
                // v0.5 #28: parse out the on-disk wrapped DEK's key id
                // so the GET response can echo `x-amz-server-side-encryption-aws-kms-key-id`.
                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
                {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
                }
                bytes_to_blob(plain)
            } else if let Some(ref m) = get_sse_c_material {
                // Client sent SSE-C headers for an unencrypted object —
                // mirror AWS S3's 400 InvalidRequest.
                let _ = m;
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
            } else {
                blob
            };
            // v0.5 #27: SSE-C echo on success — algorithm + key MD5
            // tell the client that the supplied key was the one used.
            if let Some(ref m) = get_sse_c_material {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
            // Collecting a large object (e.g. 5 GB) into memory would OOM, so
            // when the codec is streaming-aware, decompress the body
            // chunk-by-chunk and flow it to the client immediately.
            //
            // Range requests cannot stream (slicing needs the total byte
            // count) → fall through to the buffered path.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                // Backend checksums / ETag describe compressed bytes — clear
                // (same rationale as the buffered path below).
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Passthrough: forward as-is (streaming only when no Range).
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                // Multipart objects and framed-v2 single-PUT objects share the
                // same S4F2 frame sequence, so decompress_multipart handles
                // both uniformly.
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            // If a Range was requested, slice it out; otherwise return the
            // full body.
            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                // `end` is exclusive; Content-Range wants an inclusive end.
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            // Return the true post-decompression size (S3 clients trust
            // content_length; leaving the compressed size would make
            // downstream cut the body short).
            resp.output.content_length = Some(final_bytes.len() as i64);
            // Returning checksums of the compressed bytes makes the AWS SDK
            // fail with StreamingError (ChecksumMismatch). The ETag is also
            // the backend's "MD5/checksum of the compressed bytes" and thus
            // semantically off — clear them; S4's own crc32c (in the
            // manifest / per frame) guarantees integrity instead.
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        // v0.6 #40: echo the recorded `x-amz-replication-status` so
        // consumers can poll progress (PENDING / COMPLETED / FAILED).
        if let Some(mgr) = self.replication.as_ref()
            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
        {
            resp.output.replication_status =
                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
        }
        Ok(resp)
    }
2952
2953 // === passthrough delegations ===
2954 async fn head_bucket(
2955 &self,
2956 req: S3Request<HeadBucketInput>,
2957 ) -> S3Result<S3Response<HeadBucketOutput>> {
2958 self.backend.head_bucket(req).await
2959 }
2960 async fn list_buckets(
2961 &self,
2962 req: S3Request<ListBucketsInput>,
2963 ) -> S3Result<S3Response<ListBucketsOutput>> {
2964 self.backend.list_buckets(req).await
2965 }
2966 async fn create_bucket(
2967 &self,
2968 req: S3Request<CreateBucketInput>,
2969 ) -> S3Result<S3Response<CreateBucketOutput>> {
2970 self.backend.create_bucket(req).await
2971 }
2972 async fn delete_bucket(
2973 &self,
2974 req: S3Request<DeleteBucketInput>,
2975 ) -> S3Result<S3Response<DeleteBucketOutput>> {
2976 self.backend.delete_bucket(req).await
2977 }
2978 async fn head_object(
2979 &self,
2980 req: S3Request<HeadObjectInput>,
2981 ) -> S3Result<S3Response<HeadObjectOutput>> {
2982 // v0.6 #40: capture bucket/key before req is consumed so the
2983 // replication-status echo can look the entry up.
2984 let head_bucket = req.input.bucket.clone();
2985 let head_key = req.input.key.clone();
2986 let mut resp = self.backend.head_object(req).await?;
2987 if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2988 // 客側には decompress 後の意味のある content_length / checksum を返す。
2989 // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
2990 // (S4 は manifest 内の crc32c で integrity を担保する)。
2991 resp.output.content_length = Some(manifest.original_size as i64);
2992 resp.output.checksum_crc32 = None;
2993 resp.output.checksum_crc32c = None;
2994 resp.output.checksum_crc64nvme = None;
2995 resp.output.checksum_sha1 = None;
2996 resp.output.checksum_sha256 = None;
2997 resp.output.e_tag = None;
2998 }
2999 // v0.6 #40: echo `x-amz-replication-status` (PENDING / COMPLETED
3000 // / FAILED) so consumers can poll progress without a GET.
3001 if let Some(mgr) = self.replication.as_ref()
3002 && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
3003 {
3004 resp.output.replication_status =
3005 Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
3006 }
3007 // v0.7 #48 BUG-4 fix: HEAD must echo SSE indicators so SDKs
3008 // and pipelines see the same posture they got on PUT. The PUT
3009 // path stamps `s4-sse-type` metadata for exactly this — HEAD
3010 // doesn't fetch the body, so it can't peek frame magic.
3011 if let Some(meta) = resp.output.metadata.as_ref()
3012 && let Some(sse_type) = meta.get("s4-sse-type")
3013 {
3014 {
3015 match sse_type.as_str() {
3016 "aws:kms" => {
3017 resp.output.server_side_encryption = Some(
3018 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
3019 );
3020 if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
3021 resp.output.ssekms_key_id = Some(key_id.clone());
3022 }
3023 }
3024 _ => {
3025 resp.output.server_side_encryption = Some(
3026 ServerSideEncryption::from_static(ServerSideEncryption::AES256),
3027 );
3028 if let Some(md5) = meta.get("s4-sse-c-key-md5") {
3029 resp.output.sse_customer_algorithm =
3030 Some(crate::sse::SSE_C_ALGORITHM.into());
3031 resp.output.sse_customer_key_md5 = Some(md5.clone());
3032 }
3033 }
3034 }
3035 }
3036 }
3037 Ok(resp)
3038 }
3039 async fn delete_object(
3040 &self,
3041 mut req: S3Request<DeleteObjectInput>,
3042 ) -> S3Result<S3Response<DeleteObjectOutput>> {
3043 let bucket = req.input.bucket.clone();
3044 let key = req.input.key.clone();
3045 self.enforce_rate_limit(&req, &bucket)?;
3046 self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
3047 // v0.6 #42: MFA Delete enforcement. When the bucket has
3048 // MFA-Delete = Enabled, every DELETE / DELETE-version /
3049 // delete-marker form needs `x-amz-mfa: <serial> <code>` (RFC 6238
3050 // 6-digit TOTP). Runs *before* the WORM / versioning routers so
3051 // a missing token is denied for free regardless of which delete
3052 // path the request would otherwise take.
3053 if let Some(mgr) = self.mfa_delete.as_ref()
3054 && mgr.is_enabled(&bucket)
3055 {
3056 let header = req.input.mfa.as_deref();
3057 if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
3058 crate::metrics::record_mfa_delete_denial(&bucket);
3059 return Err(mfa_error_to_s3(e));
3060 }
3061 }
3062 // v0.5 #30: refuse the delete while a WORM lock is in effect.
3063 // Compliance can never be bypassed; Governance can be overridden
3064 // via `x-amz-bypass-governance-retention: true`; legal hold
3065 // never. The check happens before the versioning router so a
3066 // locked object can't be soft-deleted (delete-marker push) on an
3067 // Enabled bucket either — S3 spec says lock applies to all
3068 // delete forms.
3069 if let Some(mgr) = self.object_lock.as_ref()
3070 && let Some(state) = mgr.get(&bucket, &key)
3071 {
3072 let bypass = req.input.bypass_governance_retention.unwrap_or(false);
3073 let now = chrono::Utc::now();
3074 if !state.can_delete(now, bypass) {
3075 crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
3076 return Err(S3Error::with_message(
3077 S3ErrorCode::AccessDenied,
3078 "Access Denied because object protected by object lock",
3079 ));
3080 }
3081 }
3082 // v0.5 #34: route DELETE through the VersioningManager when the
3083 // bucket is in a versioning-aware state.
3084 //
3085 // - Enabled bucket, no version_id → push a delete marker into
3086 // the chain. NO backend object is touched (older versions
3087 // stay reachable via specific-version GET).
3088 // - Enabled / Suspended bucket, with version_id → physical
3089 // delete. Backend bytes at the shadow key (or `<key>` for
3090 // `null`) are removed; chain entry is dropped. If the deleted
3091 // entry was a delete marker, no backend bytes exist for it
3092 // (record-only).
3093 // - Suspended bucket, no version_id → push a "null" delete
3094 // marker (S3 spec); backend bytes at `<key>` are physically
3095 // removed (same as legacy).
3096 // - Unversioned bucket → fall through to legacy passthrough.
3097 if let Some(mgr) = self.versioning.as_ref() {
3098 let state = mgr.state(&bucket);
3099 if state != crate::versioning::VersioningState::Unversioned {
3100 let req_vid = req.input.version_id.take();
3101 if let Some(vid) = req_vid {
3102 // Specific-version DELETE: touch backend bytes only
3103 // when the entry was a real version (not a delete
3104 // marker, which has no backend bytes).
3105 let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
3106 let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
3107 key.clone()
3108 } else {
3109 versioned_shadow_key(&key, &vid)
3110 };
3111 let was_real_version = outcome
3112 .as_ref()
3113 .map(|o| !o.is_delete_marker)
3114 .unwrap_or(false);
3115 if was_real_version {
3116 // Best-effort backend cleanup; missing bytes
3117 // are not an error (e.g. shadow key already
3118 // GC'd).
3119 let backend_input = DeleteObjectInput {
3120 bucket: bucket.clone(),
3121 key: backend_target,
3122 ..Default::default()
3123 };
3124 let backend_req = S3Request {
3125 input: backend_input,
3126 method: http::Method::DELETE,
3127 uri: req.uri.clone(),
3128 headers: req.headers.clone(),
3129 extensions: http::Extensions::new(),
3130 credentials: req.credentials.clone(),
3131 region: req.region.clone(),
3132 service: req.service.clone(),
3133 trailing_headers: None,
3134 };
3135 let _ = self.backend.delete_object(backend_req).await;
3136 }
3137 let mut output = DeleteObjectOutput {
3138 version_id: Some(vid.clone()),
3139 ..Default::default()
3140 };
3141 if let Some(o) = outcome.as_ref()
3142 && o.is_delete_marker
3143 {
3144 output.delete_marker = Some(true);
3145 }
3146 // v0.6 #35: specific-version DELETE always counts as
3147 // a hard `ObjectRemoved:Delete` event (the chain
3148 // entry, marker or not, is gone after this call).
3149 self.fire_delete_notification(
3150 &bucket,
3151 &key,
3152 crate::notifications::EventType::ObjectRemovedDelete,
3153 Some(vid.clone()),
3154 );
3155 return Ok(S3Response::new(output));
3156 }
3157 // No version_id: record a delete marker (state-aware).
3158 let outcome = mgr.record_delete(&bucket, &key);
3159 if state == crate::versioning::VersioningState::Suspended {
3160 // Suspended buckets also evict the prior `<key>`
3161 // bytes (the previous null version is gone too).
3162 let backend_input = DeleteObjectInput {
3163 bucket: bucket.clone(),
3164 key: key.clone(),
3165 ..Default::default()
3166 };
3167 let backend_req = S3Request {
3168 input: backend_input,
3169 method: http::Method::DELETE,
3170 uri: req.uri.clone(),
3171 headers: req.headers.clone(),
3172 extensions: http::Extensions::new(),
3173 credentials: req.credentials.clone(),
3174 region: req.region.clone(),
3175 service: req.service.clone(),
3176 trailing_headers: None,
3177 };
3178 let _ = self.backend.delete_object(backend_req).await;
3179 }
3180 let output = DeleteObjectOutput {
3181 delete_marker: Some(true),
3182 version_id: outcome.version_id.clone(),
3183 ..Default::default()
3184 };
3185 // v0.6 #35: versioned bucket DELETE without a version-id
3186 // creates a delete marker — the dedicated AWS event
3187 // taxonomy entry. Suspended-state buckets also push a
3188 // (null) marker, so the same event fires there.
3189 self.fire_delete_notification(
3190 &bucket,
3191 &key,
3192 crate::notifications::EventType::ObjectRemovedDeleteMarker,
3193 outcome.version_id,
3194 );
3195 return Ok(S3Response::new(output));
3196 }
3197 }
3198 // Legacy / Unversioned path: physical delete on the backend +
3199 // best-effort sidecar cleanup (mirrors v0.4 behaviour).
3200 let resp = self.backend.delete_object(req).await?;
3201 // v0.5 #30: drop any per-object lock state once the delete has
3202 // succeeded so the freed key can be re-armed by a future PUT
3203 // under the bucket default. Reaching here implies the lock had
3204 // already passed `can_delete` above, so this is purely cleanup.
3205 if let Some(mgr) = self.object_lock.as_ref() {
3206 mgr.clear(&bucket, &key);
3207 }
3208 // v0.6 #39: drop any object-level tag set on physical delete —
3209 // the freed key starts a fresh tag history if a future PUT
3210 // re-creates it. (Versioned-delete branches above return early
3211 // and do NOT touch tags, mirroring AWS where tag state is
3212 // attached to the logical key, not the version chain.)
3213 if let Some(mgr) = self.tagging.as_ref() {
3214 mgr.delete_object_tags(&bucket, &key);
3215 }
3216 let sidecar = sidecar_key(&key);
3217 // v0.7 #49: skip the sidecar DELETE if the key + sidecar suffix
3218 // can't be encoded into a request URI — the primary delete
3219 // already succeeded and a stale sidecar is harmless (Range GET
3220 // re-validates the underlying object on next read).
3221 if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
3222 let sidecar_input = DeleteObjectInput {
3223 bucket: bucket.clone(),
3224 key: sidecar,
3225 ..Default::default()
3226 };
3227 let sidecar_req = S3Request {
3228 input: sidecar_input,
3229 method: http::Method::DELETE,
3230 uri,
3231 headers: http::HeaderMap::new(),
3232 extensions: http::Extensions::new(),
3233 credentials: None,
3234 region: None,
3235 service: None,
3236 trailing_headers: None,
3237 };
3238 let _ = self.backend.delete_object(sidecar_req).await;
3239 }
3240 // v0.6 #35: legacy unversioned-bucket hard delete fires the
3241 // canonical `ObjectRemoved:Delete` event.
3242 self.fire_delete_notification(
3243 &bucket,
3244 &key,
3245 crate::notifications::EventType::ObjectRemovedDelete,
3246 None,
3247 );
3248 Ok(resp)
3249 }
3250 async fn delete_objects(
3251 &self,
3252 req: S3Request<DeleteObjectsInput>,
3253 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
3254 // v0.6 #42: MFA Delete applies once to the whole batch (S3 spec:
3255 // when MFA-Delete is on the bucket, a missing / invalid token
3256 // fails the entire DeleteObjects request, not per-object).
3257 if let Some(mgr) = self.mfa_delete.as_ref()
3258 && mgr.is_enabled(&req.input.bucket)
3259 {
3260 let header = req.input.mfa.as_deref();
3261 if let Err(e) =
3262 crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
3263 {
3264 crate::metrics::record_mfa_delete_denial(&req.input.bucket);
3265 return Err(mfa_error_to_s3(e));
3266 }
3267 }
3268 self.backend.delete_objects(req).await
3269 }
3270 async fn copy_object(
3271 &self,
3272 mut req: S3Request<CopyObjectInput>,
3273 ) -> S3Result<S3Response<CopyObjectOutput>> {
3274 // copy is conceptually "GetObject src + PutObject dst" — enforce both.
3275 let dst_bucket = req.input.bucket.clone();
3276 let dst_key = req.input.key.clone();
3277 self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
3278 if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3279 self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
3280 }
3281 // S4-aware copy: source object に s4-* metadata がある場合、それを
3282 // destination に確実に preserve する。
3283 //
3284 // - MetadataDirective::COPY (default): backend が source metadata を
3285 // そのまま copy するので S4 metadata も自動で渡る。介入不要
3286 // - MetadataDirective::REPLACE: 客が指定した metadata で source を
3287 // 上書き → s4-* metadata が消えると destination は decompress 不能に
3288 // なる (silent corruption)。S4 が source metadata を HEAD で取得し、
3289 // s4-* fields を input.metadata に強制 merge する
3290 let needs_merge = req
3291 .input
3292 .metadata_directive
3293 .as_ref()
3294 .map(|d| d.as_str() == MetadataDirective::REPLACE)
3295 .unwrap_or(false);
3296 if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3297 let head_input = HeadObjectInput {
3298 bucket: bucket.to_string(),
3299 key: key.to_string(),
3300 ..Default::default()
3301 };
3302 let head_req = S3Request {
3303 input: head_input,
3304 method: req.method.clone(),
3305 uri: req.uri.clone(),
3306 headers: req.headers.clone(),
3307 extensions: http::Extensions::new(),
3308 credentials: req.credentials.clone(),
3309 region: req.region.clone(),
3310 service: req.service.clone(),
3311 trailing_headers: None,
3312 };
3313 if let Ok(head) = self.backend.head_object(head_req).await
3314 && let Some(src_meta) = head.output.metadata.as_ref()
3315 {
3316 let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
3317 for key in [
3318 META_CODEC,
3319 META_ORIGINAL_SIZE,
3320 META_COMPRESSED_SIZE,
3321 META_CRC32C,
3322 META_MULTIPART,
3323 META_FRAMED,
3324 ] {
3325 if let Some(v) = src_meta.get(key) {
3326 // 客が同じ key を指定していたら preserve しない (= 上書き許可)
3327 // していたら何もしない。指定していなければ insert
3328 dest_meta
3329 .entry(key.to_string())
3330 .or_insert_with(|| v.clone());
3331 }
3332 }
3333 debug!(
3334 src_bucket = %bucket,
3335 src_key = %key,
3336 "S4 copy_object: preserved s4-* metadata across REPLACE directive"
3337 );
3338 }
3339 }
3340 self.backend.copy_object(req).await
3341 }
3342 async fn list_objects(
3343 &self,
3344 req: S3Request<ListObjectsInput>,
3345 ) -> S3Result<S3Response<ListObjectsOutput>> {
3346 self.enforce_rate_limit(&req, &req.input.bucket)?;
3347 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3348 let mut resp = self.backend.list_objects(req).await?;
3349 // S4 内部 object (`*.s4index` sidecar、`.__s4ver__/` shadow versions
3350 // — v0.5 #34) を顧客から隠す。
3351 if let Some(contents) = resp.output.contents.as_mut() {
3352 contents.retain(|o| {
3353 o.key
3354 .as_ref()
3355 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3356 .unwrap_or(true)
3357 });
3358 }
3359 Ok(resp)
3360 }
3361 async fn list_objects_v2(
3362 &self,
3363 req: S3Request<ListObjectsV2Input>,
3364 ) -> S3Result<S3Response<ListObjectsV2Output>> {
3365 self.enforce_rate_limit(&req, &req.input.bucket)?;
3366 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3367 let mut resp = self.backend.list_objects_v2(req).await?;
3368 if let Some(contents) = resp.output.contents.as_mut() {
3369 let before = contents.len();
3370 contents.retain(|o| {
3371 o.key
3372 .as_ref()
3373 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3374 .unwrap_or(true)
3375 });
3376 // key_count も補正 (S3 spec compliance)
3377 if let Some(kc) = resp.output.key_count.as_mut() {
3378 *kc -= (before - contents.len()) as i32;
3379 }
3380 }
3381 Ok(resp)
3382 }
3383 /// v0.4 #17: filter S4-internal sidecars from versioned listings.
3384 /// v0.5 #34: when a [`crate::versioning::VersioningManager`] is
3385 /// attached AND the bucket is in a versioning-aware state, build
3386 /// the `Versions` / `DeleteMarkers` arrays directly from the
3387 /// in-memory chain (paginated + ordered the S3 way: key asc,
3388 /// version newest-first inside each key). Otherwise fall back to
3389 /// passthrough + sidecar-filter (legacy v0.4 behaviour).
3390 async fn list_object_versions(
3391 &self,
3392 req: S3Request<ListObjectVersionsInput>,
3393 ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
3394 self.enforce_rate_limit(&req, &req.input.bucket)?;
3395 self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3396 // v0.5 #34: VersioningManager-owned path.
3397 if let Some(mgr) = self.versioning.as_ref()
3398 && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
3399 {
3400 let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
3401 let page = mgr.list_versions(
3402 &req.input.bucket,
3403 req.input.prefix.as_deref(),
3404 req.input.key_marker.as_deref(),
3405 req.input.version_id_marker.as_deref(),
3406 max_keys,
3407 );
3408 let versions: Vec<ObjectVersion> = page
3409 .versions
3410 .into_iter()
3411 .map(|e| ObjectVersion {
3412 key: Some(e.key),
3413 version_id: Some(e.version_id),
3414 is_latest: Some(e.is_latest),
3415 e_tag: Some(ETag::Strong(e.etag)),
3416 size: Some(e.size as i64),
3417 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3418 ..Default::default()
3419 })
3420 .collect();
3421 let delete_markers: Vec<DeleteMarkerEntry> = page
3422 .delete_markers
3423 .into_iter()
3424 .map(|e| DeleteMarkerEntry {
3425 key: Some(e.key),
3426 version_id: Some(e.version_id),
3427 is_latest: Some(e.is_latest),
3428 last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3429 ..Default::default()
3430 })
3431 .collect();
3432 let output = ListObjectVersionsOutput {
3433 name: Some(req.input.bucket.clone()),
3434 prefix: req.input.prefix.clone(),
3435 key_marker: req.input.key_marker.clone(),
3436 version_id_marker: req.input.version_id_marker.clone(),
3437 max_keys: req.input.max_keys,
3438 versions: if versions.is_empty() {
3439 None
3440 } else {
3441 Some(versions)
3442 },
3443 delete_markers: if delete_markers.is_empty() {
3444 None
3445 } else {
3446 Some(delete_markers)
3447 },
3448 is_truncated: Some(page.is_truncated),
3449 next_key_marker: page.next_key_marker,
3450 next_version_id_marker: page.next_version_id_marker,
3451 ..Default::default()
3452 };
3453 return Ok(S3Response::new(output));
3454 }
3455 // Legacy passthrough path (v0.4 #17 sidecar filter retained).
3456 let mut resp = self.backend.list_object_versions(req).await?;
3457 if let Some(versions) = resp.output.versions.as_mut() {
3458 versions.retain(|v| {
3459 v.key
3460 .as_ref()
3461 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3462 .unwrap_or(true)
3463 });
3464 }
3465 if let Some(markers) = resp.output.delete_markers.as_mut() {
3466 markers.retain(|m| {
3467 m.key
3468 .as_ref()
3469 .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3470 .unwrap_or(true)
3471 });
3472 }
3473 Ok(resp)
3474 }
3475
    /// `CreateMultipartUpload` — stamps the S4 multipart/codec markers on
    /// the object metadata, captures the per-upload SSE / tagging /
    /// object-lock recipe in `multipart_state`, and forwards a sanitised
    /// (SSE- and Tagging-stripped) request to the backend.
    async fn create_multipart_upload(
        &self,
        mut req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        // Multipart objects are written per-part compressed, in frame
        // format. Set a flag on the object metadata so GET knows to run
        // the frame parser. The codec recorded here is the dispatcher's
        // default kind (per-part codec selection is Phase 2).
        let codec_kind = self.registry.default_kind();
        let meta = req.input.metadata.get_or_insert_with(Default::default);
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
        // v0.8 #54 BUG-10 fix: take() the SSE request fields off
        // `req.input` so they are NOT forwarded to the backend on
        // CreateMultipartUpload. Same root cause as v0.7 #48 BUG-2/3 on
        // single-PUT — MinIO rejects SSE-C with "HTTPS required" and
        // SSE-KMS with "KMS not configured" when the headers reach it.
        // S4 owns the encrypt-then-store contract; we capture the
        // recipe in `multipart_state` here and apply it on Complete.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let sse_header = req.input.server_side_encryption.take();
        let sse_kms_key = req.input.ssekms_key_id.take();
        // Strip the encryption-context too — leaving it would make
        // MinIO try to validate it against a non-existent KMS key.
        let _ = req.input.ssekms_encryption_context.take();
        let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
        let kms_key_id = extract_kms_key_id(
            &sse_header,
            &sse_kms_key,
            self.kms_default_key_id.as_deref(),
        );
        // SSE-C / SSE-KMS exclusivity (mirrors put_object L1870).
        if sse_c_material.is_some() && kms_key_id.is_some() {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidArgument,
                "SSE-C and SSE-KMS cannot be used together on the same multipart upload",
            ));
        }
        let sse_mode = if let Some(ref m) = sse_c_material {
            // v0.8.2 #62 (H-6 audit fix): wrap the customer-supplied
            // 32-byte key in `Zeroizing` so abandoned uploads (or
            // normal Complete/Abort) wipe the key bytes on drop. The
            // `key_md5` is the public fingerprint and stays as a
            // bare `[u8; 16]`.
            crate::multipart_state::MultipartSseMode::SseC {
                key: zeroize::Zeroizing::new(m.key),
                key_md5: m.key_md5,
            }
        } else if let Some(ref kid) = kms_key_id {
            // KMS pre-flight: fail at Create rather than at Complete if
            // the gateway has no KMS backend wired (mirrors the
            // put_object L1879 check).
            if self.kms.is_none() {
                return Err(S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                ));
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id: kid.clone() }
        } else if self.sse_keyring.is_some() {
            // SSE-S4: server-driven transparent encryption. Activates
            // whenever the gateway has a keyring configured AND the
            // client didn't pick a different SSE mode.
            crate::multipart_state::MultipartSseMode::SseS4
        } else {
            crate::multipart_state::MultipartSseMode::None
        };
        // v0.8 #54 BUG-9 fix: parse the Tagging header on Create. The
        // single-PUT path does this on PutObject; the multipart path
        // captures it now and commits via TagManager on Complete.
        let request_tags: Option<crate::tagging::TagSet> = req
            .input
            .tagging
            .as_deref()
            .map(crate::tagging::parse_tagging_header)
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
        // Strip the `Tagging` field off the input so the backend
        // doesn't try to apply it (no-op on MinIO but keeps the wire
        // clean).
        let _ = req.input.tagging.take();
        // Object Lock recipe (BUG-7 — captured here, applied on Complete).
        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
            .input
            .object_lock_mode
            .as_ref()
            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
            .input
            .object_lock_retain_until_date
            .as_ref()
            .and_then(timestamp_to_chrono_utc);
        let explicit_legal_hold_on: bool = req
            .input
            .object_lock_legal_hold_status
            .as_ref()
            .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
            .unwrap_or(false);
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        debug!(
            bucket = %bucket,
            key = %key,
            codec = codec_kind.as_str(),
            sse = ?sse_mode,
            "S4 create_multipart_upload: marking object for per-part compression"
        );
        let mut resp = self.backend.create_multipart_upload(req).await?;
        // Stash the per-upload context only after the backend handed
        // us an upload_id (failed Creates leave nothing in the store).
        if let Some(upload_id) = resp.output.upload_id.as_ref() {
            self.multipart_state.put(
                upload_id,
                crate::multipart_state::MultipartUploadContext {
                    bucket,
                    key,
                    sse: sse_mode.clone(),
                    tags: request_tags,
                    object_lock_mode: explicit_lock_mode,
                    object_lock_retain_until: explicit_retain_until,
                    object_lock_legal_hold: explicit_legal_hold_on,
                },
            );
        }
        // SSE-C / SSE-KMS response echo (mirrors put_object L2036-L2050).
        match &sse_mode {
            crate::multipart_state::MultipartSseMode::SseC { key_md5, .. } => {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(key_md5),
                );
            }
            crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
                resp.output.server_side_encryption = Some(
                    ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                );
                resp.output.ssekms_key_id = Some(key_id.clone());
            }
            _ => {}
        }
        Ok(resp)
    }
3619
    /// `UploadPart` — validates SSE-C header consistency against the
    /// upload context, then compresses the part body and forwards it
    /// framed (header + compressed payload, padded to S3's multipart
    /// minimum when the part is unlikely to be final).
    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        // Compress each part and forward it with a frame header. On GET,
        // `decompress_multipart` walks the frames with a frame iter and
        // decompresses them in order.
        // **per-part codec dispatch**: the dispatcher picks the codec
        // from a sample at the head of the body, so parquet-style
        // mixed-content multiparts get the optimal codec per part
        // (integer-column part → Bitcomp, text-column part → zstd, etc.).
        //
        // v0.8 #54 BUG-5/BUG-10 fix: lookup the per-upload SSE
        // context captured by `create_multipart_upload` and (a) strip
        // any SSE-C request headers off `req.input` so the backend
        // doesn't see them — same root cause as v0.7 #48 BUG-2/3 on
        // single-PUT; MinIO refuses SSE-C parts over HTTP — and (b)
        // observe that an upload context exists for `upload_id`. The
        // actual encrypt happens once at `complete_multipart_upload`
        // time on the assembled body (the per-part-encrypt approach
        // would require a matching multi-segment decrypt path on GET;
        // encrypting the whole assembled body keeps the GET path's
        // `is_sse_encrypted` branch in get_object L2429 working
        // unchanged).
        let sse_ctx = self
            .multipart_state
            .get(req.input.upload_id.as_str());
        // v0.8.2 #62 (H-1 audit fix): SSE-C key consistency check.
        // The AWS S3 spec requires the same SSE-C key headers on
        // every UploadPart and rejects mismatches with 400. Prior to
        // #62 we silently stripped the headers (BUG-10 fix) without
        // validating them, allowing a client to send part 1 under
        // key-A and part 2 under key-B; both got stored, then
        // re-encrypted with key-A on Complete — the client thinks
        // part 2 is under key-B but a GET with key-B would in fact
        // hit the part-1 ciphertext that was actually encrypted with
        // key-A. That would either decrypt successfully (silent
        // corruption: client lost track of which key encrypts what)
        // or fail in a confusing way. Validate the per-part headers
        // now and reject with 400 InvalidArgument on mismatch /
        // omission / partial supply, matching real-S3 behaviour.
        if let Some(ref ctx) = sse_ctx {
            if let crate::multipart_state::MultipartSseMode::SseC { key_md5: ctx_md5, .. } =
                &ctx.sse
            {
                let alg = req.input.sse_customer_algorithm.take();
                let key_b64 = req.input.sse_customer_key.take();
                let md5_b64 = req.input.sse_customer_key_md5.take();
                match (alg, key_b64, md5_b64) {
                    (Some(a), Some(k), Some(m)) => {
                        // Parse + validate; if the per-part headers
                        // are themselves malformed (algorithm not
                        // AES256, MD5 mismatch, key not 32 bytes)
                        // surface the same 400 the single-PUT path
                        // would. Then compare the parsed MD5 to the
                        // upload-context's MD5; mismatch is a
                        // different-key UploadPart and must reject.
                        let part_material =
                            crate::sse::parse_customer_key_headers(&a, &k, &m)
                                .map_err(sse_c_error_to_s3)?;
                        if part_material.key_md5 != *ctx_md5 {
                            return Err(S3Error::with_message(
                                S3ErrorCode::InvalidArgument,
                                "SSE-C key on UploadPart does not match the key supplied on CreateMultipartUpload",
                            ));
                        }
                        // OK — same key as Create. Headers are
                        // already taken off `req.input` so the
                        // backend never sees them.
                    }
                    (None, None, None) => {
                        // AWS S3 spec: SSE-C headers MUST be replayed
                        // on every UploadPart of an SSE-C multipart.
                        // Real-S3 returns 400 InvalidRequest in this
                        // case; mirror that.
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "SSE-C requires customer-key headers on every UploadPart (CreateMultipartUpload was SSE-C)",
                        ));
                    }
                    _ => {
                        // Partial header set (e.g. algorithm + key
                        // but no MD5) — same handling as the
                        // single-PUT `extract_sse_c_material` helper.
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
                        ));
                    }
                }
            } else {
                // CreateMultipartUpload was non-SSE-C (None / SseS4 /
                // SseKms). A part that arrives carrying SSE-C headers
                // is either a confused client or an attempt to
                // smuggle SSE-C around the gateway-internal SSE
                // recipe. Reject with 400 InvalidRequest rather than
                // silently strip — the strip would let the client
                // believe the part was encrypted under their key
                // when in fact the upload's encryption recipe is
                // whatever the Create captured.
                if req.input.sse_customer_algorithm.is_some()
                    || req.input.sse_customer_key.is_some()
                    || req.input.sse_customer_key_md5.is_some()
                {
                    return Err(S3Error::with_message(
                        S3ErrorCode::InvalidRequest,
                        "UploadPart sent SSE-C headers but CreateMultipartUpload was not SSE-C",
                    ));
                }
            }
        } else {
            // No upload context registered (gateway crashed between
            // Create and Part, or pre-#62 abandoned-upload restore).
            // We can't check key consistency in this case — strip
            // the headers and let the request through unchanged so
            // the backend's `NoSuchUpload` reply (or whatever it
            // chooses to do) flows back to the client.
            let _ = req.input.sse_customer_algorithm.take();
            let _ = req.input.sse_customer_key.take();
            let _ = req.input.sse_customer_key_md5.take();
        }
        let _sse_ctx = sse_ctx;
        if let Some(blob) = req.input.body.take() {
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            // v0.8 #56: full part body is already in memory here; use its
            // length as the size hint so the dispatcher can promote to GPU
            // if it's big enough.
            let codec_kind = self
                .dispatcher
                .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
                .await;
            let original_size = bytes.len() as u64;
            // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
            let (compress_res, tel) = self
                .registry
                .compress_with_telemetry(bytes, codec_kind)
                .await;
            stamp_gpu_compress_telemetry(&tel);
            let (compressed, manifest) = compress_res.map_err(internal("registry compress part"))?;
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            // v0.2 #5: heuristic-based padding skip for likely-final parts.
            //
            // AWS SDK / aws-cli / boto3 always send the final (and only the
            // final) part below the configured part_size. So if the raw user
            // part is already smaller than S3's 5 MiB multipart minimum, this
            // is overwhelmingly likely to be the final part — and the final
            // part is exempt from S3's size constraint. Skipping padding here
            // saves up to ~5 MiB per object on highly compressible workloads.
            //
            // If a misbehaving client sends a tiny **non-final** part, S3
            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
            // identical outcome to a vanilla S3 PUT, just earlier than
            // padding-then-complete would catch it.
            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
            if !likely_final {
                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            }
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            // The same wire-compatibility problem as single-PUT applies
            // to multipart: the framed body no longer matches the
            // client-computed content-length / checksums, so restate the
            // length and drop the checksum fields.
            req.input.content_length = Some(new_len);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
3807 async fn complete_multipart_upload(
3808 &self,
3809 mut req: S3Request<CompleteMultipartUploadInput>,
3810 ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
3811 let bucket = req.input.bucket.clone();
3812 let key = req.input.key.clone();
3813 let upload_id = req.input.upload_id.clone();
3814 // v0.8.1 #59: serialise concurrent Complete invocations on the
3815 // same `(bucket, key)`. The race window the lock closes is the
3816 // GET-assembled-body → encrypt → PUT-encrypted-body triple
3817 // below (BUG-5 fix); without serialisation, two Completes for
3818 // different `upload_id` but the same logical key could each
3819 // read the other's plaintext assembled body and overwrite the
3820 // peer's encrypted result. The guard is held to function exit
3821 // (drop on `Ok` / `Err`), covering version-id mint, object-
3822 // lock apply, tagging persist, and replication enqueue too.
3823 let completion_lock = self.multipart_state.completion_lock(&bucket, &key);
3824 let _completion_guard = completion_lock.lock().await;
3825 // v0.8 #54 — fetch the per-upload context captured on Create.
3826 // `None` means an abandoned / unknown upload_id (gateway
3827 // crashed between Create and Complete, or pre-v0.8 state
3828 // restore); we still let the backend do its thing for
3829 // transparency, but we can't apply any SSE / version / lock /
3830 // tag / replication post-processing because we never captured
3831 // the recipe.
3832 let ctx = self.multipart_state.get(upload_id.as_str());
3833 // v0.8 #54 BUG-10 fix: same SSE-C header strip as upload_part
3834 // — some clients (boto3 / aws-sdk-cpp older versions) replay
3835 // the SSE-C triple on Complete too, and MinIO will choke if
3836 // they reach the backend.
3837 let _ = req.input.sse_customer_algorithm.take();
3838 let _ = req.input.sse_customer_key.take();
3839 let _ = req.input.sse_customer_key_md5.take();
3840 let mut resp = self.backend.complete_multipart_upload(req).await?;
3841 // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
3842 // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
3843 // partial fetch path が利用可能になる (Range request の帯域節約)。
3844 // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
3845 // できれば爆速になるので 1 回の cost は payback される
3846 //
3847 // v0.8 #54 BUG-5..9: this same fetch is the choke-point for
3848 // the SSE encrypt re-PUT + versioning shadow-key rewrite +
3849 // replication source-bytes capture, so we GET once and reuse
3850 // the bytes for every post-processing step.
3851 let assembled_body: Option<bytes::Bytes> =
3852 if let Ok(uri) = safe_object_uri(&bucket, &key) {
3853 let get_input = GetObjectInput {
3854 bucket: bucket.clone(),
3855 key: key.clone(),
3856 ..Default::default()
3857 };
3858 let get_req = S3Request {
3859 input: get_input,
3860 method: http::Method::GET,
3861 uri,
3862 headers: http::HeaderMap::new(),
3863 extensions: http::Extensions::new(),
3864 credentials: None,
3865 region: None,
3866 service: None,
3867 trailing_headers: None,
3868 };
3869 match self.backend.get_object(get_req).await {
3870 Ok(get_resp) => match get_resp.output.body {
3871 Some(blob) => collect_blob(blob, self.max_body_bytes).await.ok(),
3872 None => None,
3873 },
3874 Err(_) => None,
3875 }
3876 } else {
3877 None
3878 };
3879 // Sidecar build (existing behaviour, gated on assembled body).
3880 if let Some(ref body) = assembled_body
3881 && let Ok(index) = build_index_from_body(body)
3882 {
3883 self.write_sidecar(&bucket, &key, &index).await;
3884 }
3885 // From here on, post-processing depends on the context —
3886 // short-circuit when the upload had no captured recipe
3887 // (legacy / crashed-Create / pre-v0.8 state restore).
3888 if let Some(ctx) = ctx {
3889 // v0.8 #54 BUG-6 fix: mint a version-id when the bucket
3890 // is versioning-Enabled. The single-PUT path does this in
3891 // `put_object` ~L1968; multipart was the missing branch.
3892 // We mint here (post-Complete, before any re-PUT) so the
3893 // same vid threads into both the shadow-key rewrite and
3894 // the VersionEntry the manager records.
3895 let pending_version: Option<crate::versioning::PutOutcome> = self
3896 .versioning
3897 .as_ref()
3898 .map(|mgr| mgr.state(&bucket))
3899 .map(|state| match state {
3900 crate::versioning::VersioningState::Enabled => {
3901 crate::versioning::PutOutcome {
3902 version_id: crate::versioning::VersioningManager::new_version_id(),
3903 versioned_response: true,
3904 }
3905 }
3906 crate::versioning::VersioningState::Suspended
3907 | crate::versioning::VersioningState::Unversioned => {
3908 crate::versioning::PutOutcome {
3909 version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
3910 versioned_response: false,
3911 }
3912 }
3913 });
3914 // v0.8 #54 BUG-5 fix: encrypt the assembled framed body
3915 // and re-PUT it to the backend so the on-disk bytes are
3916 // SSE-encrypted. The single-PUT path does this body-by-
3917 // body inside `put_object` (L1907-L1942); for multipart,
3918 // encrypt-per-part would require a multi-segment decrypt
3919 // path on GET — we instead do a single encrypt over the
3920 // assembled framed body so the existing GET decrypt
3921 // branch (`is_sse_encrypted` → `decrypt(body, source)` →
3922 // FrameIter) handles it unchanged.
3923 //
3924 // The cost is one extra round-trip per Complete for SSE-
3925 // enabled multipart (already-paid for the sidecar build).
3926 // For single-instance gateways pointing at a co-located
3927 // backend this is negligible; cross-region operators
3928 // would benefit from per-part encrypt + multi-segment
3929 // decrypt as a follow-up.
3930 let needs_re_put = matches!(
3931 ctx.sse,
3932 crate::multipart_state::MultipartSseMode::SseS4
3933 | crate::multipart_state::MultipartSseMode::SseC { .. }
3934 | crate::multipart_state::MultipartSseMode::SseKms { .. }
3935 ) || pending_version
3936 .as_ref()
3937 .map(|pv| pv.versioned_response)
3938 .unwrap_or(false);
3939 // Snapshot replication body in advance so we can pass it
3940 // to the spawn helper after the (possibly absent) re-PUT.
3941 let replication_body = assembled_body.clone();
3942 let mut applied_metadata: Option<std::collections::HashMap<String, String>> = None;
3943 if needs_re_put && let Some(body) = assembled_body {
3944 // v0.8.1 #58: same Zeroizing pattern as put_object's
3945 // single-PUT KMS branch — DEK plaintext lives in
3946 // `Zeroizing<[u8; 32]>` for the lifetime of this
3947 // Complete handler, then is wiped on drop.
3948 let kms_wrap: Option<(
3949 zeroize::Zeroizing<[u8; 32]>,
3950 crate::kms::WrappedDek,
3951 )> = if let crate::multipart_state::MultipartSseMode::SseKms {
3952 ref key_id,
3953 } = ctx.sse
3954 {
3955 let kms = self.kms.as_ref().ok_or_else(|| {
3956 S3Error::with_message(
3957 S3ErrorCode::InvalidRequest,
3958 "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
3959 )
3960 })?;
3961 let (dek, wrapped) = kms
3962 .generate_dek(key_id)
3963 .await
3964 .map_err(kms_error_to_s3)?;
3965 if dek.len() != 32 {
3966 return Err(S3Error::with_message(
3967 S3ErrorCode::InternalError,
3968 format!(
3969 "KMS backend returned a DEK of {} bytes (expected 32)",
3970 dek.len()
3971 ),
3972 ));
3973 }
3974 let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
3975 zeroize::Zeroizing::new([0u8; 32]);
3976 dek_arr.copy_from_slice(&dek);
3977 // `dek` (Zeroizing<Vec<u8>>) is dropped at scope end.
3978 Some((dek_arr, wrapped))
3979 } else {
3980 None
3981 };
3982 // Build the new metadata map: re-fetch via HEAD so
3983 // the multipart / codec markers the backend stamped
3984 // on Create flow through unchanged, then layer the
3985 // SSE markers on top.
3986 let head_req = S3Request {
3987 input: HeadObjectInput {
3988 bucket: bucket.clone(),
3989 key: key.clone(),
3990 ..Default::default()
3991 },
3992 method: http::Method::HEAD,
3993 uri: safe_object_uri(&bucket, &key)?,
3994 headers: http::HeaderMap::new(),
3995 extensions: http::Extensions::new(),
3996 credentials: None,
3997 region: None,
3998 service: None,
3999 trailing_headers: None,
4000 };
4001 let mut new_metadata: std::collections::HashMap<String, String> =
4002 match self.backend.head_object(head_req).await {
4003 Ok(h) => h.output.metadata.unwrap_or_default(),
4004 Err(_) => std::collections::HashMap::new(),
4005 };
4006 let new_body = match &ctx.sse {
4007 crate::multipart_state::MultipartSseMode::SseC { key, key_md5 } => {
4008 new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
4009 new_metadata.insert("s4-sse-type".into(), "AES256".into());
4010 new_metadata.insert(
4011 "s4-sse-c-key-md5".into(),
4012 base64::engine::general_purpose::STANDARD.encode(key_md5),
4013 );
4014 // v0.8.2 #62: `key` is `&Zeroizing<[u8; 32]>`;
4015 // auto-deref through one explicit binding so
4016 // `SseSource::CustomerKey` gets the `&[u8; 32]`
4017 // it expects (mirrors the SSE-KMS DEK shape
4018 // a few lines down).
4019 let key_ref: &[u8; 32] = key;
4020 crate::sse::encrypt_with_source(
4021 &body,
4022 crate::sse::SseSource::CustomerKey { key: key_ref, key_md5 },
4023 )
4024 }
4025 crate::multipart_state::MultipartSseMode::SseKms { .. } => {
4026 let (dek, wrapped) = kms_wrap
4027 .as_ref()
4028 .expect("SseKms branch implies kms_wrap is Some");
4029 new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
4030 new_metadata.insert("s4-sse-type".into(), "aws:kms".into());
4031 new_metadata
4032 .insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
4033 // v0.8.1 #58: auto-deref from `&Zeroizing<[u8; 32]>`
4034 // to `&[u8; 32]` (same shape as the put_object
4035 // single-PUT branch).
4036 let dek_ref: &[u8; 32] = dek;
4037 crate::sse::encrypt_with_source(
4038 &body,
4039 crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
4040 )
4041 }
4042 crate::multipart_state::MultipartSseMode::SseS4 => {
4043 let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
4044 S3Error::with_message(
4045 S3ErrorCode::InternalError,
4046 "SSE-S4 captured at Create but keyring missing at Complete",
4047 )
4048 })?;
4049 new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
4050 // SSE-S4 deliberately omits `s4-sse-type` so
4051 // HEAD doesn't falsely advertise AWS-style
4052 // SSE-S3 (matches the put_object L1929-L1939
4053 // comment).
4054 // v0.8 #52: same chunk_size dispatch as the
4055 // single-PUT branch — multipart Complete
4056 // re-encrypts the assembled body, so honoring
4057 // the chunked path here is required to keep
4058 // GET streaming on multipart-uploaded objects.
4059 if self.sse_chunk_size > 0 {
4060 crate::sse::encrypt_v2_chunked(
4061 &body,
4062 keyring,
4063 self.sse_chunk_size,
4064 )
4065 .map_err(|e| {
4066 S3Error::with_message(
4067 S3ErrorCode::InternalError,
4068 format!(
4069 "SSE-S4 chunked encrypt failed at Complete: {e}"
4070 ),
4071 )
4072 })?
4073 } else {
4074 crate::sse::encrypt_v2(&body, keyring)
4075 }
4076 }
4077 crate::multipart_state::MultipartSseMode::None => body.clone(),
4078 };
4079 // v0.8 #54 BUG-6 fix: write the re-PUT under the
4080 // shadow key so the version chain doesn't overwrite
4081 // the previous version on a versioned bucket. The
4082 // original (unshadowed) key was assembled by the
4083 // backend on Complete; we delete it after the shadow
4084 // PUT lands.
4085 let put_target_key = if let Some(pv) = pending_version.as_ref() {
4086 if pv.versioned_response {
4087 versioned_shadow_key(&key, &pv.version_id)
4088 } else {
4089 key.clone()
4090 }
4091 } else {
4092 key.clone()
4093 };
4094 let new_body_len = new_body.len() as i64;
4095 let put_req = S3Request {
4096 input: PutObjectInput {
4097 bucket: bucket.clone(),
4098 key: put_target_key.clone(),
4099 body: Some(bytes_to_blob(new_body.clone())),
4100 metadata: Some(new_metadata.clone()),
4101 content_length: Some(new_body_len),
4102 ..Default::default()
4103 },
4104 method: http::Method::PUT,
4105 uri: safe_object_uri(&bucket, &put_target_key)?,
4106 headers: http::HeaderMap::new(),
4107 extensions: http::Extensions::new(),
4108 credentials: None,
4109 region: None,
4110 service: None,
4111 trailing_headers: None,
4112 };
4113 self.backend.put_object(put_req).await?;
4114 // If we rewrote the storage key (versioning shadow),
4115 // we must drop the original (unshadowed) Complete-
4116 // assembled bytes so subsequent listings don't see a
4117 // duplicate.
4118 if put_target_key != key {
4119 let del_req = S3Request {
4120 input: DeleteObjectInput {
4121 bucket: bucket.clone(),
4122 key: key.clone(),
4123 ..Default::default()
4124 },
4125 method: http::Method::DELETE,
4126 uri: safe_object_uri(&bucket, &key)?,
4127 headers: http::HeaderMap::new(),
4128 extensions: http::Extensions::new(),
4129 credentials: None,
4130 region: None,
4131 service: None,
4132 trailing_headers: None,
4133 };
4134 let _ = self.backend.delete_object(del_req).await;
4135 }
4136 applied_metadata = Some(new_metadata);
4137 }
4138 // v0.8 #54 BUG-6 commit: register the new version with
4139 // the VersioningManager so list_object_versions /
4140 // GET ?versionId= see it.
4141 if let (Some(mgr), Some(pv)) = (self.versioning.as_ref(), pending_version.as_ref()) {
4142 let etag = resp
4143 .output
4144 .e_tag
4145 .clone()
4146 .map(ETag::into_value)
4147 .unwrap_or_default();
4148 let now = chrono::Utc::now();
4149 mgr.commit_put_with_version(
4150 &bucket,
4151 &key,
4152 crate::versioning::VersionEntry {
4153 version_id: pv.version_id.clone(),
4154 etag,
4155 size: replication_body
4156 .as_ref()
4157 .map(|b| b.len() as u64)
4158 .unwrap_or(0),
4159 is_delete_marker: false,
4160 created_at: now,
4161 },
4162 );
4163 if pv.versioned_response {
4164 resp.output.version_id = Some(pv.version_id.clone());
4165 }
4166 }
4167 // v0.8 #54 BUG-7 fix: persist any per-upload Object Lock
4168 // recipe + auto-apply the bucket default. Mirrors the
4169 // put_object L2057-L2074 block.
4170 if let Some(mgr) = self.object_lock.as_ref() {
4171 if ctx.object_lock_mode.is_some()
4172 || ctx.object_lock_retain_until.is_some()
4173 || ctx.object_lock_legal_hold
4174 {
4175 let mut state = mgr.get(&bucket, &key).unwrap_or_default();
4176 if let Some(m) = ctx.object_lock_mode {
4177 state.mode = Some(m);
4178 }
4179 if let Some(u) = ctx.object_lock_retain_until {
4180 state.retain_until = Some(u);
4181 }
4182 if ctx.object_lock_legal_hold {
4183 state.legal_hold_on = true;
4184 }
4185 mgr.set(&bucket, &key, state);
4186 }
4187 mgr.apply_default_on_put(&bucket, &key, chrono::Utc::now());
4188 }
4189 // v0.8 #54 BUG-9 fix: persist the captured tags via the
4190 // TagManager so GetObjectTagging returns them.
4191 if let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), ctx.tags.as_ref()) {
4192 mgr.put_object_tags(&bucket, &key, tags.clone());
4193 }
4194 // SSE-C / SSE-KMS response echo. The
4195 // CompleteMultipartUploadOutput only exposes
4196 // `server_side_encryption` + `ssekms_key_id` (no
4197 // sse_customer_* — those round-tripped on Create / parts).
4198 match &ctx.sse {
4199 crate::multipart_state::MultipartSseMode::SseC { .. } => {
4200 resp.output.server_side_encryption = Some(
4201 ServerSideEncryption::from_static(ServerSideEncryption::AES256),
4202 );
4203 }
4204 crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
4205 resp.output.server_side_encryption = Some(
4206 ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
4207 );
4208 resp.output.ssekms_key_id = Some(key_id.clone());
4209 }
4210 _ => {}
4211 }
4212 // v0.8 #54 BUG-8 fix: fire cross-bucket replication just
4213 // like put_object L2165 does. We hand the dispatcher the
4214 // assembled body bytes (post-encrypt where applicable, so
4215 // the destination ends up byte-identical to the source's
4216 // on-disk shape) plus the metadata that was actually
4217 // committed.
4218 let replication_body_bytes = replication_body.unwrap_or_default();
4219 // v0.8.2 #61: thread the multipart-Complete `pending_version`
4220 // through so a versioning-Enabled source's destination
4221 // receives the same shadow-key path (mirror of the
4222 // single-PUT branch above).
4223 self.spawn_replication_if_matched(
4224 &bucket,
4225 &key,
4226 &ctx.tags,
4227 &replication_body_bytes,
4228 &applied_metadata,
4229 true,
4230 pending_version.as_ref(),
4231 );
4232 self.multipart_state.remove(upload_id.as_str());
4233 }
4234 // v0.8.1 #59 janitor: best-effort sweep of stale completion
4235 // locks while we are still on the critical path of a single
4236 // Complete (so steady-state workloads of unique keys don't
4237 // accumulate `DashMap` entries). The sweep only retires
4238 // entries whose `Arc::strong_count == 1`, so any other in-
4239 // flight Complete on a different key keeps its lock alive.
4240 // Our own `_completion_guard` keeps `bucket`/`key`'s entry
4241 // alive across this call; it's reaped on the next Complete or
4242 // the next caller-driven prune.
4243 self.multipart_state.prune_completion_locks();
4244 Ok(resp)
4245 }
4246 async fn abort_multipart_upload(
4247 &self,
4248 req: S3Request<AbortMultipartUploadInput>,
4249 ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
4250 // v0.8 #54: drop the per-upload state (SSE-C key bytes / tag
4251 // set) promptly so an aborted upload doesn't leak the
4252 // customer's key into a long-running gateway's RSS.
4253 self.multipart_state.remove(req.input.upload_id.as_str());
4254 self.backend.abort_multipart_upload(req).await
4255 }
// Pure delegation: S4 adds no compression hook for upload listings, so
// the backend's view of in-flight multipart uploads is authoritative.
async fn list_multipart_uploads(
    &self,
    req: S3Request<ListMultipartUploadsInput>,
) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
    self.backend.list_multipart_uploads(req).await
}
// Pure delegation. NOTE(review): part sizes reported here are the
// framed/compressed on-backend sizes, not the caller's original part
// sizes — confirm whether callers rely on exact sizes from ListParts.
async fn list_parts(
    &self,
    req: S3Request<ListPartsInput>,
) -> S3Result<S3Response<ListPartsOutput>> {
    self.backend.list_parts(req).await
}
4268
4269 // =========================================================================
4270 // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
4271 // 持たないので、backend (= AWS S3) の動作と完全に同一。
4272 //
4273 // 既知の制限事項:
4274 // - copy_object / upload_part_copy: source object が S4-compressed の場合、
4275 // backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
// copied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
4277 // 経由で正しく decompress できる。MetadataDirective REPLACE で上書き
4278 // されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
4279 // - list_object_versions: versioning enabled bucket では各 version も S4
4280 // metadata を維持する。古い version も S4 経由で正しく GET できる。
4281 // =========================================================================
4282
4283 // ---- Object ACL / tagging / attributes ----
// Pure delegation: S4 keeps no ACL state of its own, so the backend's
// answer is authoritative.
async fn get_object_acl(
    &self,
    req: S3Request<GetObjectAclInput>,
) -> S3Result<S3Response<GetObjectAclOutput>> {
    self.backend.get_object_acl(req).await
}
// Pure delegation: ACL writes are stored by the backend untouched.
async fn put_object_acl(
    &self,
    req: S3Request<PutObjectAclInput>,
) -> S3Result<S3Response<PutObjectAclOutput>> {
    self.backend.put_object_acl(req).await
}
4296 // v0.6 #39: object tagging — when a `TagManager` is attached the
4297 // configuration / per-(bucket, key) state lives in the manager and
4298 // these handlers serve directly from it; when no manager is
4299 // attached they fall back to the backend (legacy passthrough so
4300 // v0.5 deployments are unaffected).
4301 async fn get_object_tagging(
4302 &self,
4303 req: S3Request<GetObjectTaggingInput>,
4304 ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
4305 let Some(mgr) = self.tagging.as_ref() else {
4306 return self.backend.get_object_tagging(req).await;
4307 };
4308 let tags = mgr
4309 .get_object_tags(&req.input.bucket, &req.input.key)
4310 .unwrap_or_default();
4311 Ok(S3Response::new(GetObjectTaggingOutput {
4312 tag_set: tagset_to_aws(&tags),
4313 ..Default::default()
4314 }))
4315 }
4316 async fn put_object_tagging(
4317 &self,
4318 req: S3Request<PutObjectTaggingInput>,
4319 ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
4320 let Some(mgr) = self.tagging.as_ref() else {
4321 return self.backend.put_object_tagging(req).await;
4322 };
4323 let bucket = req.input.bucket.clone();
4324 let key = req.input.key.clone();
4325 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4326 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4327 })?;
4328 // v0.6 #39: gate via IAM policy with both the request tags
4329 // (`s3:RequestObjectTag/<key>`) and any existing tags on the
4330 // target object (`s3:ExistingObjectTag/<key>`).
4331 let existing = mgr.get_object_tags(&bucket, &key);
4332 self.enforce_policy_with_extra(
4333 &req,
4334 "s3:PutObjectTagging",
4335 &bucket,
4336 Some(&key),
4337 Some(&parsed),
4338 existing.as_ref(),
4339 )?;
4340 mgr.put_object_tags(&bucket, &key, parsed);
4341 Ok(S3Response::new(PutObjectTaggingOutput::default()))
4342 }
4343 async fn delete_object_tagging(
4344 &self,
4345 req: S3Request<DeleteObjectTaggingInput>,
4346 ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
4347 let Some(mgr) = self.tagging.as_ref() else {
4348 return self.backend.delete_object_tagging(req).await;
4349 };
4350 let bucket = req.input.bucket.clone();
4351 let key = req.input.key.clone();
4352 let existing = mgr.get_object_tags(&bucket, &key);
4353 self.enforce_policy_with_extra(
4354 &req,
4355 "s3:DeleteObjectTagging",
4356 &bucket,
4357 Some(&key),
4358 None,
4359 existing.as_ref(),
4360 )?;
4361 mgr.delete_object_tags(&bucket, &key);
4362 Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
4363 }
// Pure delegation. NOTE(review): attributes such as ObjectSize come
// from the backend's stored (compressed/framed) bytes — confirm whether
// callers expect the original logical size here.
async fn get_object_attributes(
    &self,
    req: S3Request<GetObjectAttributesInput>,
) -> S3Result<S3Response<GetObjectAttributesOutput>> {
    self.backend.get_object_attributes(req).await
}
// Pure delegation: archive-restore is entirely a backend concern.
async fn restore_object(
    &self,
    req: S3Request<RestoreObjectInput>,
) -> S3Result<S3Response<RestoreObjectOutput>> {
    self.backend.restore_object(req).await
}
// S4-aware UploadPartCopy: decompress-then-recompress when the source is
// S4-framed, plain passthrough otherwise. See the inline rationale below.
async fn upload_part_copy(
    &self,
    req: S3Request<UploadPartCopyInput>,
) -> S3Result<S3Response<UploadPartCopyOutput>> {
    // v0.2 #6: byte-range aware copy when the source is S4-framed.
    //
    // For a framed source (multipart upload OR single-PUT framed-v2),
    // a naive byte-range passthrough would copy compressed bytes that
    // don't align with S4 frame boundaries — silently corrupting the
    // result. Instead we GET the source through S4 (which handles
    // decompression + Range), re-compress + re-frame as a new part,
    // and forward as upload_part. For non-framed sources (S4-untouched
    // raw objects), passthrough is correct and we keep the original
    // (cheaper) code path.
    let CopySource::Bucket {
        bucket: src_bucket,
        key: src_key,
        ..
    } = &req.input.copy_source
    else {
        // Non-bucket copy source: nothing S4-specific to do.
        return self.backend.upload_part_copy(req).await;
    };
    let src_bucket = src_bucket.to_string();
    let src_key = src_key.to_string();

    // Probe metadata to decide whether the source needs S4-aware copy.
    let head_input = HeadObjectInput {
        bucket: src_bucket.clone(),
        key: src_key.clone(),
        ..Default::default()
    };
    let head_req = S3Request {
        input: head_input,
        method: http::Method::HEAD,
        uri: req.uri.clone(),
        headers: req.headers.clone(),
        extensions: http::Extensions::new(),
        credentials: req.credentials.clone(),
        region: req.region.clone(),
        service: req.service.clone(),
        trailing_headers: None,
    };
    // A failed HEAD is treated as "not framed" and falls through to
    // passthrough — best-effort probe; the real error (if any) will
    // surface from the backend's own upload_part_copy.
    let needs_s4_copy = match self.backend.head_object(head_req).await {
        Ok(h) => {
            is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
        }
        Err(_) => false,
    };
    if !needs_s4_copy {
        return self.backend.upload_part_copy(req).await;
    }

    // Resolve the optional source byte range to pass to GET.
    let source_range = req
        .input
        .copy_source_range
        .as_ref()
        .map(|r| parse_copy_source_range(r))
        .transpose()
        .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

    // GET source via S4 (handles decompression + sidecar partial fetch
    // when range is present). The result is the requested user-visible
    // byte range, fully decompressed.
    let mut get_input = GetObjectInput {
        bucket: src_bucket.clone(),
        key: src_key.clone(),
        ..Default::default()
    };
    get_input.range = source_range;
    let get_req = S3Request {
        input: get_input,
        method: http::Method::GET,
        uri: req.uri.clone(),
        headers: req.headers.clone(),
        extensions: http::Extensions::new(),
        credentials: req.credentials.clone(),
        region: req.region.clone(),
        service: req.service.clone(),
        trailing_headers: None,
    };
    let get_resp = self.get_object(get_req).await?;
    let blob = get_resp.output.body.ok_or_else(|| {
        S3Error::with_message(
            S3ErrorCode::InternalError,
            "upload_part_copy: empty body from source GET",
        )
    })?;
    // Whole decompressed range is collected in memory, bounded by
    // max_body_bytes (same limit as the single-PUT path).
    let bytes = collect_blob(blob, self.max_body_bytes)
        .await
        .map_err(internal("collect upload_part_copy source body"))?;

    // Compress + frame as a fresh part (mirrors upload_part path).
    let sample_len = bytes.len().min(SAMPLE_BYTES);
    // v0.8 #56: same size-hint promotion as the upload_part path.
    let codec_kind = self
        .dispatcher
        .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
        .await;
    let original_size = bytes.len() as u64;
    // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
    let (compress_res, tel) = self
        .registry
        .compress_with_telemetry(bytes, codec_kind)
        .await;
    stamp_gpu_compress_telemetry(&tel);
    let (compressed, manifest) =
        compress_res.map_err(internal("registry compress upload_part_copy"))?;
    let header = FrameHeader {
        codec: codec_kind,
        original_size,
        compressed_size: compressed.len() as u64,
        crc32c: manifest.crc32c,
    };
    let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
    write_frame(&mut framed, header, &compressed);
    // Pad only when the original size suggests this is a non-final part:
    // compression may have pushed the framed bytes under S3's minimum
    // part size, which would make the backend reject Complete. A source
    // already below the minimum is presumably the final part and may
    // legally stay short.
    let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
    if !likely_final {
        pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
    }
    let framed_bytes = framed.freeze();
    let framed_len = framed_bytes.len() as i64;

    // Forward as upload_part to the destination multipart upload.
    let part_input = UploadPartInput {
        bucket: req.input.bucket.clone(),
        key: req.input.key.clone(),
        part_number: req.input.part_number,
        upload_id: req.input.upload_id.clone(),
        body: Some(bytes_to_blob(framed_bytes)),
        content_length: Some(framed_len),
        ..Default::default()
    };
    let part_req = S3Request {
        input: part_input,
        method: http::Method::PUT,
        uri: req.uri.clone(),
        headers: req.headers.clone(),
        extensions: http::Extensions::new(),
        credentials: req.credentials.clone(),
        region: req.region.clone(),
        service: req.service.clone(),
        trailing_headers: None,
    };
    let upload_resp = self.backend.upload_part(part_req).await?;

    // Only the new part's ETag is surfaced; other CopyPartResult fields
    // (e.g. checksums) are left at their defaults.
    let copy_output = UploadPartCopyOutput {
        copy_part_result: Some(CopyPartResult {
            e_tag: upload_resp.output.e_tag.clone(),
            ..Default::default()
        }),
        ..Default::default()
    };
    Ok(S3Response::new(copy_output))
}
4531
4532 // ---- Object lock / retention / legal hold (v0.5 #30) ----
4533 //
4534 // When an `ObjectLockManager` is attached the configuration / per-object
4535 // state lives in the manager and these handlers serve directly from it;
4536 // when no manager is attached they fall back to the backend (legacy
4537 // passthrough so v0.4 deployments are unaffected).
4538 async fn get_object_lock_configuration(
4539 &self,
4540 req: S3Request<GetObjectLockConfigurationInput>,
4541 ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
4542 if let Some(mgr) = self.object_lock.as_ref() {
4543 let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
4544 ObjectLockConfiguration {
4545 object_lock_enabled: Some(ObjectLockEnabled::from_static(
4546 ObjectLockEnabled::ENABLED,
4547 )),
4548 rule: Some(ObjectLockRule {
4549 default_retention: Some(DefaultRetention {
4550 days: Some(d.retention_days as i32),
4551 mode: Some(ObjectLockRetentionMode::from_static(
4552 match d.mode {
4553 crate::object_lock::LockMode::Governance => {
4554 ObjectLockRetentionMode::GOVERNANCE
4555 }
4556 crate::object_lock::LockMode::Compliance => {
4557 ObjectLockRetentionMode::COMPLIANCE
4558 }
4559 },
4560 )),
4561 years: None,
4562 }),
4563 }),
4564 }
4565 });
4566 let output = GetObjectLockConfigurationOutput {
4567 object_lock_configuration: cfg,
4568 };
4569 return Ok(S3Response::new(output));
4570 }
4571 self.backend.get_object_lock_configuration(req).await
4572 }
4573 async fn put_object_lock_configuration(
4574 &self,
4575 req: S3Request<PutObjectLockConfigurationInput>,
4576 ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
4577 if let Some(mgr) = self.object_lock.as_ref() {
4578 let bucket = req.input.bucket.clone();
4579 if let Some(cfg) = req.input.object_lock_configuration.as_ref()
4580 && let Some(rule) = cfg.rule.as_ref()
4581 && let Some(d) = rule.default_retention.as_ref()
4582 {
4583 let mode = d
4584 .mode
4585 .as_ref()
4586 .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
4587 .ok_or_else(|| {
4588 S3Error::with_message(
4589 S3ErrorCode::InvalidRequest,
4590 "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
4591 )
4592 })?;
4593 // S3 spec: exactly one of Days / Years (we accept Days
4594 // outright and convert Years → Days for storage; Years
4595 // is just a UX shorthand on the wire).
4596 let days: u32 = match (d.days, d.years) {
4597 (Some(d), None) if d > 0 => d as u32,
4598 (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
4599 _ => {
4600 return Err(S3Error::with_message(
4601 S3ErrorCode::InvalidRequest,
4602 "Object Lock default retention requires exactly one of Days or Years (positive integer)",
4603 ));
4604 }
4605 };
4606 mgr.set_bucket_default(
4607 &bucket,
4608 crate::object_lock::BucketObjectLockDefault {
4609 mode,
4610 retention_days: days,
4611 },
4612 );
4613 }
4614 return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
4615 }
4616 self.backend.put_object_lock_configuration(req).await
4617 }
4618 async fn get_object_legal_hold(
4619 &self,
4620 req: S3Request<GetObjectLegalHoldInput>,
4621 ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
4622 if let Some(mgr) = self.object_lock.as_ref() {
4623 let on = mgr
4624 .get(&req.input.bucket, &req.input.key)
4625 .map(|s| s.legal_hold_on)
4626 .unwrap_or(false);
4627 let status = ObjectLockLegalHoldStatus::from_static(if on {
4628 ObjectLockLegalHoldStatus::ON
4629 } else {
4630 ObjectLockLegalHoldStatus::OFF
4631 });
4632 let output = GetObjectLegalHoldOutput {
4633 legal_hold: Some(ObjectLockLegalHold {
4634 status: Some(status),
4635 }),
4636 };
4637 return Ok(S3Response::new(output));
4638 }
4639 self.backend.get_object_legal_hold(req).await
4640 }
4641 async fn put_object_legal_hold(
4642 &self,
4643 req: S3Request<PutObjectLegalHoldInput>,
4644 ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
4645 if let Some(mgr) = self.object_lock.as_ref() {
4646 let on = req
4647 .input
4648 .legal_hold
4649 .as_ref()
4650 .and_then(|h| h.status.as_ref())
4651 .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
4652 .unwrap_or(false);
4653 mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
4654 return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
4655 }
4656 self.backend.put_object_legal_hold(req).await
4657 }
4658 async fn get_object_retention(
4659 &self,
4660 req: S3Request<GetObjectRetentionInput>,
4661 ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
4662 if let Some(mgr) = self.object_lock.as_ref() {
4663 let retention = mgr
4664 .get(&req.input.bucket, &req.input.key)
4665 .filter(|s| s.mode.is_some() || s.retain_until.is_some())
4666 .map(|s| {
4667 let mode = s.mode.map(|m| {
4668 ObjectLockRetentionMode::from_static(match m {
4669 crate::object_lock::LockMode::Governance => {
4670 ObjectLockRetentionMode::GOVERNANCE
4671 }
4672 crate::object_lock::LockMode::Compliance => {
4673 ObjectLockRetentionMode::COMPLIANCE
4674 }
4675 })
4676 });
4677 let until = s.retain_until.map(chrono_utc_to_timestamp);
4678 ObjectLockRetention {
4679 mode,
4680 retain_until_date: until,
4681 }
4682 });
4683 let output = GetObjectRetentionOutput { retention };
4684 return Ok(S3Response::new(output));
4685 }
4686 self.backend.get_object_retention(req).await
4687 }
// Validates and applies per-object retention, enforcing S3's one-way
// Compliance / bypass-gated Governance rules before mutating state.
async fn put_object_retention(
    &self,
    req: S3Request<PutObjectRetentionInput>,
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
    if let Some(mgr) = self.object_lock.as_ref() {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        // x-amz-bypass-governance-retention header, defaulting to false.
        let bypass = req.input.bypass_governance_retention.unwrap_or(false);
        // A Retention element is mandatory on this operation.
        let retention = req.input.retention.as_ref().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InvalidRequest,
                "PutObjectRetention requires a Retention element",
            )
        })?;
        // Unrecognized mode strings collapse to None (treated as
        // "mode unchanged" below rather than an error).
        let new_mode = retention
            .mode
            .as_ref()
            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
        let new_until = retention
            .retain_until_date
            .as_ref()
            .map(timestamp_to_chrono_utc)
            .unwrap_or(None);
        let now = chrono::Utc::now();
        let existing = mgr.get(&bucket, &key).unwrap_or_default();
        // S3 immutability rules:
        // - Compliance is one-way: once set, mode cannot move to
        // Governance, and retain-until cannot be shortened.
        // - Governance can be lengthened freely; shortened only
        // with bypass=true.
        if let Some(existing_mode) = existing.mode
            && existing_mode == crate::object_lock::LockMode::Compliance
            && existing.is_locked(now)
        {
            // Compliance active: downgrade attempt is checked first,
            // then shortening — error precedence matters here.
            if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Cannot downgrade Compliance retention to Governance while lock is active",
                ));
            }
            if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                && next < prev
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Cannot shorten Compliance retention while lock is active",
                ));
            }
        }
        // Governance active without bypass: shortening is rejected;
        // lengthening (next >= prev) falls through and is applied.
        if let Some(existing_mode) = existing.mode
            && existing_mode == crate::object_lock::LockMode::Governance
            && existing.is_locked(now)
            && !bypass
            && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
            && next < prev
        {
            return Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
            ));
        }
        // Merge: only fields actually present in the request overwrite
        // the existing state; absent fields are left untouched.
        let mut state = existing;
        if new_mode.is_some() {
            state.mode = new_mode;
        }
        if new_until.is_some() {
            state.retain_until = new_until;
        }
        mgr.set(&bucket, &key, state);
        return Ok(S3Response::new(PutObjectRetentionOutput::default()));
    }
    // Legacy passthrough when no ObjectLockManager is attached.
    self.backend.put_object_retention(req).await
}
4761
4762 // ---- Versioning ----
4763 // list_object_versions is implemented above in the compression-hook
4764 // section so it filters S4-internal sidecars (v0.4 #17) AND, when a
4765 // VersioningManager is attached (v0.5 #34), serves chains directly
4766 // from the in-memory index.
4767 async fn get_bucket_versioning(
4768 &self,
4769 req: S3Request<GetBucketVersioningInput>,
4770 ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
4771 // v0.5 #34: when a VersioningManager is attached, the bucket's
4772 // versioning state lives in the manager (= S4-server's
4773 // authoritative source). Pass-through hits the backend only
4774 // when no manager is configured (legacy v0.4 behaviour).
4775 if let Some(mgr) = self.versioning.as_ref() {
4776 let output = match mgr.state(&req.input.bucket).as_aws_status() {
4777 Some(s) => GetBucketVersioningOutput {
4778 status: Some(BucketVersioningStatus::from(s.to_owned())),
4779 ..Default::default()
4780 },
4781 None => GetBucketVersioningOutput::default(),
4782 };
4783 return Ok(S3Response::new(output));
4784 }
4785 self.backend.get_bucket_versioning(req).await
4786 }
    // Handle `PutBucketVersioning`: optionally MFA-gated (v0.6 #42) and,
    // when a VersioningManager is attached, stored locally (v0.5 #34).
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        // v0.6 #42: MFA gating on the `PutBucketVersioning` request
        // itself. S3 spec: when the request body carries an
        // `MfaDelete` element (either `Enabled` or `Disabled`), the
        // request must include a valid `x-amz-mfa` token — both for
        // the *first* enable (so the operator can't quietly side-step
        // the gate by never enabling it) and for any subsequent
        // change (so a leaked credential alone can't disable MFA
        // Delete to bypass it on subsequent DELETEs). Requests that
        // omit the `MfaDelete` element entirely (i.e. they flip only
        // `Status`) skip this gate, matching AWS.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && let Some(target_enabled) = req
                .input
                .versioning_configuration
                .mfa_delete
                .as_ref()
                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
        {
            let bucket = req.input.bucket.clone();
            let header = req.input.mfa.as_deref();
            let secret = mgr.lookup_secret(&bucket);
            // Verified only when all of these hold: an `x-amz-mfa` header
            // is present, a device secret is registered for the bucket,
            // the header parses, the device serial matches, and the TOTP
            // code validates against the current clock.
            let verified = match (header, secret.as_ref()) {
                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
                    Ok((serial, code)) => {
                        serial == s.serial
                            && crate::mfa::verify_totp(
                                &s.secret_base32,
                                &code,
                                current_unix_secs(),
                            )
                    }
                    Err(_) => false,
                },
                _ => false,
            };
            if !verified {
                crate::metrics::record_mfa_delete_denial(&bucket);
                // Distinguish "no token at all" from "token present but
                // wrong" so the client gets an actionable error.
                let err = if header.is_none() {
                    crate::mfa::MfaError::Missing
                } else {
                    crate::mfa::MfaError::InvalidCode
                };
                return Err(mfa_error_to_s3(err));
            }
            mgr.set_bucket_state(&bucket, target_enabled);
        }
        // v0.5 #34: when a VersioningManager is attached it is the
        // authoritative owner of the versioning state — the new state is
        // stashed in the manager and the handler returns immediately; the
        // backend is NOT consulted on this path.
        // NOTE(review): an earlier comment here claimed the request was
        // also forwarded to the backend (soft-fail) to keep a downstream
        // S3 in sync, but the code does not forward — confirm which
        // behaviour is intended.
        if let Some(mgr) = self.versioning.as_ref() {
            let new_state = match req
                .input
                .versioning_configuration
                .status
                .as_ref()
                .map(|s| s.as_str())
            {
                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
                    crate::versioning::VersioningState::Enabled
                }
                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
                    crate::versioning::VersioningState::Suspended
                }
                // Absent or unrecognised `Status` maps to Unversioned.
                _ => crate::versioning::VersioningState::Unversioned,
            };
            mgr.set_state(&req.input.bucket, new_state);
            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
        }
        // No manager attached: legacy passthrough to the backend.
        self.backend.put_bucket_versioning(req).await
    }
4863
4864 // ---- Bucket location ----
4865 async fn get_bucket_location(
4866 &self,
4867 req: S3Request<GetBucketLocationInput>,
4868 ) -> S3Result<S3Response<GetBucketLocationOutput>> {
4869 self.backend.get_bucket_location(req).await
4870 }
4871
4872 // ---- Bucket policy ----
4873 async fn get_bucket_policy(
4874 &self,
4875 req: S3Request<GetBucketPolicyInput>,
4876 ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
4877 self.backend.get_bucket_policy(req).await
4878 }
4879 async fn put_bucket_policy(
4880 &self,
4881 req: S3Request<PutBucketPolicyInput>,
4882 ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
4883 self.backend.put_bucket_policy(req).await
4884 }
4885 async fn delete_bucket_policy(
4886 &self,
4887 req: S3Request<DeleteBucketPolicyInput>,
4888 ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
4889 self.backend.delete_bucket_policy(req).await
4890 }
4891 async fn get_bucket_policy_status(
4892 &self,
4893 req: S3Request<GetBucketPolicyStatusInput>,
4894 ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
4895 self.backend.get_bucket_policy_status(req).await
4896 }
4897
4898 // ---- Bucket ACL ----
4899 async fn get_bucket_acl(
4900 &self,
4901 req: S3Request<GetBucketAclInput>,
4902 ) -> S3Result<S3Response<GetBucketAclOutput>> {
4903 self.backend.get_bucket_acl(req).await
4904 }
4905 async fn put_bucket_acl(
4906 &self,
4907 req: S3Request<PutBucketAclInput>,
4908 ) -> S3Result<S3Response<PutBucketAclOutput>> {
4909 self.backend.put_bucket_acl(req).await
4910 }
4911
4912 // ---- Bucket CORS (v0.6 #38) ----
4913 async fn get_bucket_cors(
4914 &self,
4915 req: S3Request<GetBucketCorsInput>,
4916 ) -> S3Result<S3Response<GetBucketCorsOutput>> {
4917 if let Some(mgr) = self.cors.as_ref() {
4918 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4919 S3Error::with_message(
4920 S3ErrorCode::NoSuchCORSConfiguration,
4921 "The CORS configuration does not exist".to_string(),
4922 )
4923 })?;
4924 let rules: Vec<CORSRule> = cfg
4925 .rules
4926 .into_iter()
4927 .map(|r| CORSRule {
4928 allowed_headers: if r.allowed_headers.is_empty() {
4929 None
4930 } else {
4931 Some(r.allowed_headers)
4932 },
4933 allowed_methods: r.allowed_methods,
4934 allowed_origins: r.allowed_origins,
4935 expose_headers: if r.expose_headers.is_empty() {
4936 None
4937 } else {
4938 Some(r.expose_headers)
4939 },
4940 id: r.id,
4941 max_age_seconds: r.max_age_seconds.map(|s| s as i32),
4942 })
4943 .collect();
4944 return Ok(S3Response::new(GetBucketCorsOutput {
4945 cors_rules: Some(rules),
4946 }));
4947 }
4948 self.backend.get_bucket_cors(req).await
4949 }
4950 async fn put_bucket_cors(
4951 &self,
4952 req: S3Request<PutBucketCorsInput>,
4953 ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4954 if let Some(mgr) = self.cors.as_ref() {
4955 let cfg = crate::cors::CorsConfig {
4956 rules: req
4957 .input
4958 .cors_configuration
4959 .cors_rules
4960 .into_iter()
4961 .map(|r| crate::cors::CorsRule {
4962 allowed_origins: r.allowed_origins,
4963 allowed_methods: r.allowed_methods,
4964 allowed_headers: r.allowed_headers.unwrap_or_default(),
4965 expose_headers: r.expose_headers.unwrap_or_default(),
4966 max_age_seconds: r.max_age_seconds.and_then(|s| {
4967 if s < 0 { None } else { Some(s as u32) }
4968 }),
4969 id: r.id,
4970 })
4971 .collect(),
4972 };
4973 mgr.put(&req.input.bucket, cfg);
4974 return Ok(S3Response::new(PutBucketCorsOutput::default()));
4975 }
4976 self.backend.put_bucket_cors(req).await
4977 }
4978 async fn delete_bucket_cors(
4979 &self,
4980 req: S3Request<DeleteBucketCorsInput>,
4981 ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4982 if let Some(mgr) = self.cors.as_ref() {
4983 mgr.delete(&req.input.bucket);
4984 return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4985 }
4986 self.backend.delete_bucket_cors(req).await
4987 }
4988
4989 // ---- Bucket lifecycle (v0.6 #37) ----
4990 async fn get_bucket_lifecycle_configuration(
4991 &self,
4992 req: S3Request<GetBucketLifecycleConfigurationInput>,
4993 ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4994 if let Some(mgr) = self.lifecycle.as_ref() {
4995 let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4996 S3Error::with_message(
4997 S3ErrorCode::NoSuchLifecycleConfiguration,
4998 "The lifecycle configuration does not exist".to_string(),
4999 )
5000 })?;
5001 let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
5002 return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
5003 rules: Some(rules),
5004 transition_default_minimum_object_size: None,
5005 }));
5006 }
5007 self.backend.get_bucket_lifecycle_configuration(req).await
5008 }
5009 async fn put_bucket_lifecycle_configuration(
5010 &self,
5011 req: S3Request<PutBucketLifecycleConfigurationInput>,
5012 ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
5013 if let Some(mgr) = self.lifecycle.as_ref() {
5014 let bucket = req.input.bucket.clone();
5015 let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
5016 let cfg = dto_lifecycle_to_internal(&dto_cfg);
5017 mgr.put(&bucket, cfg);
5018 return Ok(S3Response::new(
5019 PutBucketLifecycleConfigurationOutput::default(),
5020 ));
5021 }
5022 self.backend.put_bucket_lifecycle_configuration(req).await
5023 }
5024 async fn delete_bucket_lifecycle(
5025 &self,
5026 req: S3Request<DeleteBucketLifecycleInput>,
5027 ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
5028 if let Some(mgr) = self.lifecycle.as_ref() {
5029 mgr.delete(&req.input.bucket);
5030 return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
5031 }
5032 self.backend.delete_bucket_lifecycle(req).await
5033 }
5034
5035 // ---- Bucket tagging (v0.6 #39) ----
5036 async fn get_bucket_tagging(
5037 &self,
5038 req: S3Request<GetBucketTaggingInput>,
5039 ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
5040 let Some(mgr) = self.tagging.as_ref() else {
5041 return self.backend.get_bucket_tagging(req).await;
5042 };
5043 let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
5044 Ok(S3Response::new(GetBucketTaggingOutput {
5045 tag_set: tagset_to_aws(&tags),
5046 }))
5047 }
5048 async fn put_bucket_tagging(
5049 &self,
5050 req: S3Request<PutBucketTaggingInput>,
5051 ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
5052 let Some(mgr) = self.tagging.as_ref() else {
5053 return self.backend.put_bucket_tagging(req).await;
5054 };
5055 let bucket = req.input.bucket.clone();
5056 let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
5057 S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
5058 })?;
5059 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
5060 mgr.put_bucket_tags(&bucket, parsed);
5061 Ok(S3Response::new(PutBucketTaggingOutput::default()))
5062 }
5063 async fn delete_bucket_tagging(
5064 &self,
5065 req: S3Request<DeleteBucketTaggingInput>,
5066 ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
5067 let Some(mgr) = self.tagging.as_ref() else {
5068 return self.backend.delete_bucket_tagging(req).await;
5069 };
5070 let bucket = req.input.bucket.clone();
5071 self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
5072 mgr.delete_bucket_tags(&bucket);
5073 Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
5074 }
5075
5076 // ---- Bucket encryption ----
5077 async fn get_bucket_encryption(
5078 &self,
5079 req: S3Request<GetBucketEncryptionInput>,
5080 ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
5081 self.backend.get_bucket_encryption(req).await
5082 }
5083 async fn put_bucket_encryption(
5084 &self,
5085 req: S3Request<PutBucketEncryptionInput>,
5086 ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
5087 self.backend.put_bucket_encryption(req).await
5088 }
5089 async fn delete_bucket_encryption(
5090 &self,
5091 req: S3Request<DeleteBucketEncryptionInput>,
5092 ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
5093 self.backend.delete_bucket_encryption(req).await
5094 }
5095
5096 // ---- Bucket logging ----
5097 async fn get_bucket_logging(
5098 &self,
5099 req: S3Request<GetBucketLoggingInput>,
5100 ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
5101 self.backend.get_bucket_logging(req).await
5102 }
5103 async fn put_bucket_logging(
5104 &self,
5105 req: S3Request<PutBucketLoggingInput>,
5106 ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
5107 self.backend.put_bucket_logging(req).await
5108 }
5109
5110 // ---- Bucket notification (v0.6 #35) ----
5111 //
5112 // When a `NotificationManager` is attached, S4 itself owns per-bucket
5113 // notification configurations and the PUT / GET handlers route through
5114 // the manager. The wire DTO's queue / topic configurations map onto
5115 // S4's `Destination::Sqs` / `Destination::Sns`; LambdaFunction and
5116 // EventBridge configurations are accepted on PUT but silently dropped
5117 // (out of scope for v0.6 #35). When no manager is attached the legacy
5118 // backend-passthrough behaviour applies.
5119 async fn get_bucket_notification_configuration(
5120 &self,
5121 req: S3Request<GetBucketNotificationConfigurationInput>,
5122 ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
5123 if let Some(mgr) = self.notifications.as_ref() {
5124 let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
5125 let dto = notif_to_dto(&cfg);
5126 return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
5127 event_bridge_configuration: dto.event_bridge_configuration,
5128 lambda_function_configurations: dto.lambda_function_configurations,
5129 queue_configurations: dto.queue_configurations,
5130 topic_configurations: dto.topic_configurations,
5131 }));
5132 }
5133 self.backend
5134 .get_bucket_notification_configuration(req)
5135 .await
5136 }
5137 async fn put_bucket_notification_configuration(
5138 &self,
5139 req: S3Request<PutBucketNotificationConfigurationInput>,
5140 ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
5141 if let Some(mgr) = self.notifications.as_ref() {
5142 let cfg = notif_from_dto(&req.input.notification_configuration);
5143 mgr.put(&req.input.bucket, cfg);
5144 return Ok(S3Response::new(
5145 PutBucketNotificationConfigurationOutput::default(),
5146 ));
5147 }
5148 self.backend
5149 .put_bucket_notification_configuration(req)
5150 .await
5151 }
5152
5153 // ---- Bucket request payment ----
5154 async fn get_bucket_request_payment(
5155 &self,
5156 req: S3Request<GetBucketRequestPaymentInput>,
5157 ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
5158 self.backend.get_bucket_request_payment(req).await
5159 }
5160 async fn put_bucket_request_payment(
5161 &self,
5162 req: S3Request<PutBucketRequestPaymentInput>,
5163 ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
5164 self.backend.put_bucket_request_payment(req).await
5165 }
5166
5167 // ---- Bucket website ----
5168 async fn get_bucket_website(
5169 &self,
5170 req: S3Request<GetBucketWebsiteInput>,
5171 ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
5172 self.backend.get_bucket_website(req).await
5173 }
5174 async fn put_bucket_website(
5175 &self,
5176 req: S3Request<PutBucketWebsiteInput>,
5177 ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
5178 self.backend.put_bucket_website(req).await
5179 }
5180 async fn delete_bucket_website(
5181 &self,
5182 req: S3Request<DeleteBucketWebsiteInput>,
5183 ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
5184 self.backend.delete_bucket_website(req).await
5185 }
5186
5187 // ---- Bucket replication (v0.6 #40) ----
5188 async fn get_bucket_replication(
5189 &self,
5190 req: S3Request<GetBucketReplicationInput>,
5191 ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
5192 if let Some(mgr) = self.replication.as_ref() {
5193 return match mgr.get(&req.input.bucket) {
5194 Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
5195 replication_configuration: Some(replication_to_dto(&cfg)),
5196 })),
5197 None => Err(S3Error::with_message(
5198 S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
5199 format!("no replication configuration on bucket {}", req.input.bucket),
5200 )),
5201 };
5202 }
5203 self.backend.get_bucket_replication(req).await
5204 }
5205 async fn put_bucket_replication(
5206 &self,
5207 req: S3Request<PutBucketReplicationInput>,
5208 ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
5209 if let Some(mgr) = self.replication.as_ref() {
5210 let cfg = replication_from_dto(&req.input.replication_configuration);
5211 mgr.put(&req.input.bucket, cfg);
5212 return Ok(S3Response::new(PutBucketReplicationOutput::default()));
5213 }
5214 self.backend.put_bucket_replication(req).await
5215 }
5216 async fn delete_bucket_replication(
5217 &self,
5218 req: S3Request<DeleteBucketReplicationInput>,
5219 ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
5220 if let Some(mgr) = self.replication.as_ref() {
5221 mgr.delete(&req.input.bucket);
5222 return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
5223 }
5224 self.backend.delete_bucket_replication(req).await
5225 }
5226
5227 // ---- Bucket accelerate ----
5228 async fn get_bucket_accelerate_configuration(
5229 &self,
5230 req: S3Request<GetBucketAccelerateConfigurationInput>,
5231 ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
5232 self.backend.get_bucket_accelerate_configuration(req).await
5233 }
5234 async fn put_bucket_accelerate_configuration(
5235 &self,
5236 req: S3Request<PutBucketAccelerateConfigurationInput>,
5237 ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
5238 self.backend.put_bucket_accelerate_configuration(req).await
5239 }
5240
5241 // ---- Bucket ownership controls ----
5242 async fn get_bucket_ownership_controls(
5243 &self,
5244 req: S3Request<GetBucketOwnershipControlsInput>,
5245 ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
5246 self.backend.get_bucket_ownership_controls(req).await
5247 }
5248 async fn put_bucket_ownership_controls(
5249 &self,
5250 req: S3Request<PutBucketOwnershipControlsInput>,
5251 ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
5252 self.backend.put_bucket_ownership_controls(req).await
5253 }
5254 async fn delete_bucket_ownership_controls(
5255 &self,
5256 req: S3Request<DeleteBucketOwnershipControlsInput>,
5257 ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
5258 self.backend.delete_bucket_ownership_controls(req).await
5259 }
5260
5261 // ---- Public access block ----
5262 async fn get_public_access_block(
5263 &self,
5264 req: S3Request<GetPublicAccessBlockInput>,
5265 ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
5266 self.backend.get_public_access_block(req).await
5267 }
5268 async fn put_public_access_block(
5269 &self,
5270 req: S3Request<PutPublicAccessBlockInput>,
5271 ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
5272 self.backend.put_public_access_block(req).await
5273 }
5274 async fn delete_public_access_block(
5275 &self,
5276 req: S3Request<DeletePublicAccessBlockInput>,
5277 ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
5278 self.backend.delete_public_access_block(req).await
5279 }
5280
5281 // ====================================================================
5282 // v0.6 #41: S3 Select — server-side SQL filter on object body.
5283 //
5284 // Fetch the object via the regular `get_object` path (so SSE-C /
5285 // SSE-S4 / SSE-KMS / S4 codec all decompress + decrypt transparently),
5286 // run a small SQL subset (CSV + JSON Lines, equality / inequality /
5287 // LIKE / AND / OR / NOT) over the in-memory body, and stream the
5288 // matched rows back as AWS event-stream `Records` + `Stats` + `End`
5289 // frames.
5290 //
5291 // Limitations (deliberate, documented):
5292 // - Parquet input is rejected with NotImplemented.
5293 // - Aggregates / GROUP BY / JOIN / ORDER BY / LIMIT are rejected at
5294 // parse time as InvalidRequest (s3s 0.13 doesn't expose AWS's
5295 // domain-specific `InvalidSqlExpression` code).
5296 // - The body is fully buffered before SQL evaluation (S3 Select
5297 // streaming-during-evaluation is v0.7 scope).
5298 // - GPU-accelerated WHERE evaluation is stubbed out (always None).
    async fn select_object_content(
        &self,
        req: S3Request<SelectObjectContentInput>,
    ) -> S3Result<S3Response<SelectObjectContentOutput>> {
        use crate::select::{
            EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
            run_select_jsonlines,
        };

        // Phase 1: rate limit + policy. Select is gated on the same
        // s3:GetObject action AWS uses for it.
        let select_bucket = req.input.bucket.clone();
        let select_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &select_bucket)?;
        self.enforce_policy(
            &req,
            "s3:GetObject",
            &select_bucket,
            Some(&select_key),
        )?;

        // Phase 2: validate the Select request envelope. Only
        // ExpressionType=SQL is defined by the S3 API.
        let request = req.input.request;
        let sql = request.expression.clone();
        if request.expression_type.as_str() != "SQL" {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidExpressionType,
                format!(
                    "ExpressionType must be SQL, got: {}",
                    request.expression_type.as_str()
                ),
            ));
        }

        // Input serialization. NOTE(review): JSON is tested before CSV, so
        // a malformed request carrying both silently selects JSON instead
        // of erroring — confirm this precedence is intended.
        let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
            SelectInputFormat::JsonLines
        } else if let Some(csv) = request.input_serialization.csv.as_ref() {
            // FileHeaderInfo USE / IGNORE both mean "row 1 is a header";
            // NONE (or absent) means data starts at row 1.
            let has_header = csv
                .file_header_info
                .as_ref()
                .map(|h| {
                    let s = h.as_str();
                    s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
                })
                .unwrap_or(false);
            // Only the first character of FieldDelimiter is honoured.
            let delim = csv
                .field_delimiter
                .as_deref()
                .and_then(|s| s.chars().next())
                .unwrap_or(',');
            SelectInputFormat::Csv {
                has_header,
                delimiter: delim,
            }
        } else if request.input_serialization.parquet.is_some() {
            return Err(S3Error::with_message(
                S3ErrorCode::NotImplemented,
                "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
            ));
        } else {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidRequest,
                "InputSerialization requires exactly one of CSV / JSON / Parquet",
            ));
        };
        // Client-declared CompressionType (GZIP/BZIP2) is out of scope;
        // only NONE (or an absent element) passes.
        if let Some(ct) = request.input_serialization.compression_type.as_ref()
            && !ct.as_str().eq_ignore_ascii_case("NONE")
        {
            return Err(S3Error::with_message(
                S3ErrorCode::NotImplemented,
                format!(
                    "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
                    ct.as_str()
                ),
            ));
        }

        let output_format = if request.output_serialization.json.is_some() {
            SelectOutputFormat::Json
        } else if request.output_serialization.csv.is_some() {
            SelectOutputFormat::Csv
        } else {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidRequest,
                "OutputSerialization requires exactly one of CSV / JSON",
            ));
        };

        // Phase 3: fetch the object through our own `get_object` so every
        // transparent layer (S4 codec decompression, the SSE-C keys
        // forwarded below) applies before SQL evaluation.
        let get_input = GetObjectInput {
            bucket: select_bucket.clone(),
            key: select_key.clone(),
            sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
            sse_customer_key: req.input.sse_customer_key.clone(),
            sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{}/{}", select_bucket, select_key)
                .parse()
                .map_err(|e| {
                    S3Error::with_message(
                        S3ErrorCode::InternalError,
                        format!("constructing inner GET URI: {e}"),
                    )
                })?,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "Select: object body was empty after GET",
            )
        })?;
        // Fully buffer the (already decompressed) body, bounded by
        // max_body_bytes — streaming-during-evaluation is v0.7 scope.
        let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect Select body"))?;
        let scanned = body_bytes.len() as u64;

        // Phase 4: run the SQL subset over the buffered body.
        let matched_payload = match input_format {
            SelectInputFormat::JsonLines => {
                run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
                    |e| select_error_to_s3(e, "JSON Lines"),
                )?
            }
            SelectInputFormat::Csv { .. } => {
                run_select_csv(&sql, &body_bytes, input_format, output_format)
                    .map_err(|e| select_error_to_s3(e, "CSV"))?
            }
        };

        // Phase 5: emit the AWS event-stream frames — Records (only when
        // any row matched), then Stats, then End.
        let returned = matched_payload.len() as u64;
        let processed = scanned;
        let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
        if !matched_payload.is_empty() {
            events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
                payload: Some(bytes::Bytes::from(matched_payload)),
            })));
        }
        events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
            details: Some(Stats {
                bytes_scanned: Some(scanned as i64),
                bytes_processed: Some(processed as i64),
                bytes_returned: Some(returned as i64),
            }),
        })));
        events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
        // Touch EventStreamWriter so the public API stays linked into the
        // build (the actual wire framing is delegated to s3s).
        let _writer = EventStreamWriter::new();

        let stream =
            SelectObjectContentEventStream::new(futures::stream::iter(events));
        let output = SelectObjectContentOutput {
            payload: Some(stream),
        };
        Ok(S3Response::new(output))
    }
5461
5462 // ---- Bucket Inventory configuration (v0.6 #36) ----
5463 //
5464 // When an `InventoryManager` is attached, S4-server owns the
5465 // configuration store and these handlers no longer pass through to
5466 // the backend. The mapping between the s3s-typed
5467 // `InventoryConfiguration` and the inventory module's internal
5468 // `InventoryConfig` is intentionally lossy: only the fields S4
5469 // actually uses for periodic CSV emission survive the round trip
5470 // (id, source bucket, destination bucket / prefix, format, included
5471 // versions, schedule frequency). Optional fields, encryption, and
5472 // filter prefixes are accepted on PUT and re-surfaced on GET via
5473 // a best-effort default-shape `InventoryConfiguration` so the
5474 // client sees a roundtrip-clean response.
5475 async fn put_bucket_inventory_configuration(
5476 &self,
5477 req: S3Request<PutBucketInventoryConfigurationInput>,
5478 ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
5479 if let Some(mgr) = self.inventory.as_ref() {
5480 let cfg = inv_from_dto(
5481 &req.input.bucket,
5482 &req.input.id,
5483 &req.input.inventory_configuration,
5484 );
5485 mgr.put(cfg);
5486 return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
5487 }
5488 self.backend.put_bucket_inventory_configuration(req).await
5489 }
5490
5491 async fn get_bucket_inventory_configuration(
5492 &self,
5493 req: S3Request<GetBucketInventoryConfigurationInput>,
5494 ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
5495 if let Some(mgr) = self.inventory.as_ref() {
5496 let cfg = mgr.get(&req.input.bucket, &req.input.id);
5497 if let Some(cfg) = cfg {
5498 let out = GetBucketInventoryConfigurationOutput {
5499 inventory_configuration: Some(inv_to_dto(&cfg)),
5500 };
5501 return Ok(S3Response::new(out));
5502 }
5503 // AWS returns `NoSuchConfiguration` (404) when the id has no
5504 // matching inventory configuration on the bucket. The
5505 // generated `S3ErrorCode` enum doesn't expose a typed variant
5506 // for this code, so we round-trip through `from_bytes` which
5507 // wraps unknown codes as `Custom(...)` (= the AWS-canonical
5508 // error-code string survives into the XML response envelope).
5509 let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
5510 .unwrap_or(S3ErrorCode::NoSuchKey);
5511 return Err(S3Error::with_message(
5512 code,
5513 format!(
5514 "no inventory configuration with id={} on bucket={}",
5515 req.input.id, req.input.bucket
5516 ),
5517 ));
5518 }
5519 self.backend.get_bucket_inventory_configuration(req).await
5520 }
5521
5522 async fn list_bucket_inventory_configurations(
5523 &self,
5524 req: S3Request<ListBucketInventoryConfigurationsInput>,
5525 ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
5526 if let Some(mgr) = self.inventory.as_ref() {
5527 let list = mgr.list_for_bucket(&req.input.bucket);
5528 let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
5529 let out = ListBucketInventoryConfigurationsOutput {
5530 continuation_token: req.input.continuation_token.clone(),
5531 inventory_configuration_list: if dto_list.is_empty() {
5532 None
5533 } else {
5534 Some(dto_list)
5535 },
5536 is_truncated: Some(false),
5537 next_continuation_token: None,
5538 };
5539 return Ok(S3Response::new(out));
5540 }
5541 self.backend.list_bucket_inventory_configurations(req).await
5542 }
5543
5544 async fn delete_bucket_inventory_configuration(
5545 &self,
5546 req: S3Request<DeleteBucketInventoryConfigurationInput>,
5547 ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
5548 if let Some(mgr) = self.inventory.as_ref() {
5549 mgr.delete(&req.input.bucket, &req.input.id);
5550 return Ok(S3Response::new(
5551 DeleteBucketInventoryConfigurationOutput::default(),
5552 ));
5553 }
5554 self.backend.delete_bucket_inventory_configuration(req).await
5555 }
5556}
5557
5558// ---------------------------------------------------------------------------
5559// v0.6 #36: Convert between the s3s-typed `InventoryConfiguration` (the wire
5560// surface) and our internal `crate::inventory::InventoryConfig`. Only the
5561// fields S4 actually uses for CSV emission survive the round trip; the
5562// missing fields (filter prefix, optional fields, encryption) are dropped on
5563// PUT and re-rendered as the AWS-default shape on GET so the client sees a
5564// well-formed `InventoryConfiguration`.
5565// ---------------------------------------------------------------------------
5566
5567fn inv_from_dto(
5568 bucket: &str,
5569 id: &str,
5570 dto: &InventoryConfiguration,
5571) -> crate::inventory::InventoryConfig {
5572 let frequency_hours = match dto.schedule.frequency.as_str() {
5573 "Weekly" => 24 * 7,
5574 // Daily is the default; anything S4 doesn't recognise (incl.
5575 // empty, which is the s3s-default) maps to Daily so the
5576 // operator's PUT doesn't silently turn into a no-op cadence.
5577 _ => 24,
5578 };
5579 // Parquet/ORC are not supported (issue #36 scope); we still accept
5580 // the PUT so callers don't fail-loud, but we record CSV and rely on
5581 // the operator catching the discrepancy on GET.
5582 let format = crate::inventory::InventoryFormat::Csv;
5583 crate::inventory::InventoryConfig {
5584 id: id.to_owned(),
5585 bucket: bucket.to_owned(),
5586 destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
5587 destination_prefix: dto
5588 .destination
5589 .s3_bucket_destination
5590 .prefix
5591 .clone()
5592 .unwrap_or_default(),
5593 frequency_hours,
5594 format,
5595 included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
5596 dto.included_object_versions.as_str(),
5597 ),
5598 }
5599}
5600
5601fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
5602 InventoryConfiguration {
5603 id: cfg.id.clone(),
5604 is_enabled: true,
5605 included_object_versions: InventoryIncludedObjectVersions::from(
5606 cfg.included_object_versions.as_aws_str().to_owned(),
5607 ),
5608 destination: InventoryDestination {
5609 s3_bucket_destination: InventoryS3BucketDestination {
5610 account_id: None,
5611 bucket: cfg.destination_bucket.clone(),
5612 encryption: None,
5613 format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
5614 prefix: if cfg.destination_prefix.is_empty() {
5615 None
5616 } else {
5617 Some(cfg.destination_prefix.clone())
5618 },
5619 },
5620 },
5621 schedule: InventorySchedule {
5622 // `frequency_hours == 168` -> Weekly; everything else maps to
5623 // Daily for the wire response (the manager keeps the precise
5624 // hour count internally for due-checking).
5625 frequency: InventoryFrequency::from(
5626 if cfg.frequency_hours == 24 * 7 {
5627 "Weekly"
5628 } else {
5629 "Daily"
5630 }
5631 .to_owned(),
5632 ),
5633 },
5634 filter: None,
5635 optional_fields: None,
5636 }
5637}
5638
5639// ---------------------------------------------------------------------------
5640// v0.6 #35: Convert between the s3s-typed `NotificationConfiguration` (the
5641// wire surface) and our internal `crate::notifications::NotificationConfig`.
5642//
5643// We support TopicConfiguration (-> Destination::Sns) and QueueConfiguration
5644// (-> Destination::Sqs). LambdaFunction and EventBridge configurations are
5645// silently dropped on PUT (out of scope for v0.6 #35); the GET response only
5646// surfaces topic / queue rules.
5647//
5648// The webhook destination has no AWS-native wire form: operators configure
5649// webhooks via the JSON snapshot file (`--notifications-state-file`) or by
5650// poking `NotificationManager::put` directly from a custom binary. This
5651// keeps the wire surface AWS-compatible while still letting the always-
5652// available `Webhook` destination be reachable.
5653// ---------------------------------------------------------------------------
5654
5655fn notif_from_dto(
5656 dto: &NotificationConfiguration,
5657) -> crate::notifications::NotificationConfig {
5658 let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
5659 if let Some(topics) = dto.topic_configurations.as_ref() {
5660 for (idx, t) in topics.iter().enumerate() {
5661 let events = events_from_dto(&t.events);
5662 let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
5663 rules.push(crate::notifications::NotificationRule {
5664 id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
5665 events,
5666 destination: crate::notifications::Destination::Sns {
5667 topic_arn: t.topic_arn.clone(),
5668 },
5669 filter_prefix: prefix,
5670 filter_suffix: suffix,
5671 });
5672 }
5673 }
5674 if let Some(queues) = dto.queue_configurations.as_ref() {
5675 for (idx, q) in queues.iter().enumerate() {
5676 let events = events_from_dto(&q.events);
5677 let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
5678 rules.push(crate::notifications::NotificationRule {
5679 id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
5680 events,
5681 destination: crate::notifications::Destination::Sqs {
5682 queue_arn: q.queue_arn.clone(),
5683 },
5684 filter_prefix: prefix,
5685 filter_suffix: suffix,
5686 });
5687 }
5688 }
5689 crate::notifications::NotificationConfig { rules }
5690}
5691
5692fn notif_to_dto(
5693 cfg: &crate::notifications::NotificationConfig,
5694) -> NotificationConfiguration {
5695 let mut topics: Vec<TopicConfiguration> = Vec::new();
5696 let mut queues: Vec<QueueConfiguration> = Vec::new();
5697 for rule in &cfg.rules {
5698 let events: Vec<Event> = rule
5699 .events
5700 .iter()
5701 .map(|e| Event::from(e.as_aws_str().to_owned()))
5702 .collect();
5703 let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
5704 match &rule.destination {
5705 crate::notifications::Destination::Sns { topic_arn } => {
5706 topics.push(TopicConfiguration {
5707 events,
5708 filter,
5709 id: Some(rule.id.clone()),
5710 topic_arn: topic_arn.clone(),
5711 });
5712 }
5713 crate::notifications::Destination::Sqs { queue_arn } => {
5714 queues.push(QueueConfiguration {
5715 events,
5716 filter,
5717 id: Some(rule.id.clone()),
5718 queue_arn: queue_arn.clone(),
5719 });
5720 }
5721 // Webhook destinations have no AWS wire equivalent — they
5722 // round-trip through the JSON snapshot only. Skip them on the
5723 // GET surface (an SDK consumer wouldn't know what to do with
5724 // them anyway).
5725 crate::notifications::Destination::Webhook { .. } => {}
5726 }
5727 }
5728 NotificationConfiguration {
5729 event_bridge_configuration: None,
5730 lambda_function_configurations: None,
5731 queue_configurations: if queues.is_empty() { None } else { Some(queues) },
5732 topic_configurations: if topics.is_empty() { None } else { Some(topics) },
5733 }
5734}
5735
5736fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
5737 events
5738 .iter()
5739 .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
5740 .collect()
5741}
5742
5743fn filter_from_dto(
5744 f: Option<&NotificationConfigurationFilter>,
5745) -> (Option<String>, Option<String>) {
5746 let Some(f) = f else {
5747 return (None, None);
5748 };
5749 let Some(key) = f.key.as_ref() else {
5750 return (None, None);
5751 };
5752 let Some(rules) = key.filter_rules.as_ref() else {
5753 return (None, None);
5754 };
5755 let mut prefix = None;
5756 let mut suffix = None;
5757 for r in rules {
5758 let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
5759 let value = r.value.clone();
5760 match name.as_deref() {
5761 Some("prefix") => prefix = value,
5762 Some("suffix") => suffix = value,
5763 _ => {}
5764 }
5765 }
5766 (prefix, suffix)
5767}
5768
5769fn filter_to_dto(
5770 prefix: Option<&str>,
5771 suffix: Option<&str>,
5772) -> Option<NotificationConfigurationFilter> {
5773 if prefix.is_none() && suffix.is_none() {
5774 return None;
5775 }
5776 let mut rules: Vec<FilterRule> = Vec::new();
5777 if let Some(p) = prefix {
5778 rules.push(FilterRule {
5779 name: Some(FilterRuleName::from("prefix".to_owned())),
5780 value: Some(p.to_owned()),
5781 });
5782 }
5783 if let Some(s) = suffix {
5784 rules.push(FilterRule {
5785 name: Some(FilterRuleName::from("suffix".to_owned())),
5786 value: Some(s.to_owned()),
5787 });
5788 }
5789 Some(NotificationConfigurationFilter {
5790 key: Some(S3KeyFilter {
5791 filter_rules: Some(rules),
5792 }),
5793 })
5794}
5795
5796// ---------------------------------------------------------------------------
5797// v0.6 #40: Convert between the s3s-typed `ReplicationConfiguration` (the
5798// wire surface) and our internal `crate::replication::ReplicationConfig`.
5799// AWS's `ReplicationRuleFilter` is a sum type — `Prefix | Tag | And { Prefix,
5800// Tags }`; we flatten it into the single `(prefix, tag-vec)` representation
5801// the matcher needs. Sub-blocks v0.6 #40 does not implement
5802// (DeleteMarkerReplication / SourceSelectionCriteria / ReplicationTime /
5803// Metrics / EncryptionConfiguration) round-trip as `None` on GET — operators
5804// who set them on PUT see them silently dropped, mirroring "feature not
5805// supported in this release" semantics.
5806// ---------------------------------------------------------------------------
5807
5808fn replication_from_dto(
5809 dto: &ReplicationConfiguration,
5810) -> crate::replication::ReplicationConfig {
5811 let rules = dto
5812 .rules
5813 .iter()
5814 .enumerate()
5815 .map(|(idx, r)| {
5816 let id = r
5817 .id
5818 .as_ref()
5819 .map(|s| s.as_str().to_owned())
5820 .unwrap_or_else(|| format!("rule-{idx}"));
5821 let priority = r.priority.unwrap_or(0).max(0) as u32;
5822 let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
5823 let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
5824 let destination_bucket = r.destination.bucket.clone();
5825 let destination_storage_class = r
5826 .destination
5827 .storage_class
5828 .as_ref()
5829 .map(|s| s.as_str().to_owned());
5830 crate::replication::ReplicationRule {
5831 id,
5832 priority,
5833 status_enabled,
5834 filter,
5835 destination_bucket,
5836 destination_storage_class,
5837 }
5838 })
5839 .collect();
5840 crate::replication::ReplicationConfig {
5841 role: dto.role.clone(),
5842 rules,
5843 }
5844}
5845
5846fn replication_to_dto(
5847 cfg: &crate::replication::ReplicationConfig,
5848) -> ReplicationConfiguration {
5849 let rules = cfg
5850 .rules
5851 .iter()
5852 .map(|r| {
5853 let status = if r.status_enabled {
5854 ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
5855 } else {
5856 ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
5857 };
5858 let destination = Destination {
5859 access_control_translation: None,
5860 account: None,
5861 bucket: r.destination_bucket.clone(),
5862 encryption_configuration: None,
5863 metrics: None,
5864 replication_time: None,
5865 storage_class: r
5866 .destination_storage_class
5867 .as_ref()
5868 .map(|s| StorageClass::from(s.clone())),
5869 };
5870 let filter = Some(replication_filter_to_dto(&r.filter));
5871 ReplicationRule {
5872 delete_marker_replication: None,
5873 destination,
5874 existing_object_replication: None,
5875 filter,
5876 id: Some(r.id.clone()),
5877 prefix: None,
5878 priority: Some(r.priority as i32),
5879 source_selection_criteria: None,
5880 status,
5881 }
5882 })
5883 .collect();
5884 ReplicationConfiguration {
5885 role: cfg.role.clone(),
5886 rules,
5887 }
5888}
5889
5890fn replication_filter_from_dto(
5891 f: Option<&ReplicationRuleFilter>,
5892 rule_level_prefix: Option<&str>,
5893) -> crate::replication::ReplicationFilter {
5894 let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
5895 let mut tags: Vec<(String, String)> = Vec::new();
5896 if let Some(f) = f {
5897 if let Some(p) = f.prefix.as_ref()
5898 && prefix.is_none()
5899 {
5900 prefix = Some(p.clone());
5901 }
5902 if let Some(t) = f.tag.as_ref()
5903 && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5904 {
5905 tags.push((k.clone(), v.clone()));
5906 }
5907 if let Some(and) = f.and.as_ref() {
5908 if let Some(p) = and.prefix.as_ref()
5909 && prefix.is_none()
5910 {
5911 prefix = Some(p.clone());
5912 }
5913 if let Some(ts) = and.tags.as_ref() {
5914 for t in ts {
5915 if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5916 tags.push((k.clone(), v.clone()));
5917 }
5918 }
5919 }
5920 }
5921 }
5922 crate::replication::ReplicationFilter { prefix, tags }
5923}
5924
5925fn replication_filter_to_dto(
5926 f: &crate::replication::ReplicationFilter,
5927) -> ReplicationRuleFilter {
5928 if f.tags.is_empty() {
5929 ReplicationRuleFilter {
5930 and: None,
5931 prefix: f.prefix.clone(),
5932 tag: None,
5933 }
5934 } else if f.tags.len() == 1 && f.prefix.is_none() {
5935 let (k, v) = &f.tags[0];
5936 ReplicationRuleFilter {
5937 and: None,
5938 prefix: None,
5939 tag: Some(Tag {
5940 key: Some(k.clone()),
5941 value: Some(v.clone()),
5942 }),
5943 }
5944 } else {
5945 let tags: Vec<Tag> = f
5946 .tags
5947 .iter()
5948 .map(|(k, v)| Tag {
5949 key: Some(k.clone()),
5950 value: Some(v.clone()),
5951 })
5952 .collect();
5953 ReplicationRuleFilter {
5954 and: Some(ReplicationRuleAndOperator {
5955 prefix: f.prefix.clone(),
5956 tags: Some(tags),
5957 }),
5958 prefix: None,
5959 tag: None,
5960 }
5961 }
5962}
5963
5964// ---------------------------------------------------------------------------
5965// v0.6 #37: Convert between the s3s-typed `BucketLifecycleConfiguration`
5966// (the wire surface) and our internal `crate::lifecycle::LifecycleConfig`.
5967// The internal representation flattens AWS's "Filter | And" disjunction
5968// into a single `LifecycleFilter` struct of optional fields plus a tag
5969// vector. Fields S4's evaluator does not consume
5970// (`expired_object_delete_marker`, `noncurrent_version_transitions`,
5971// `transition_default_minimum_object_size`, the storage class on the
5972// noncurrent expiration) are dropped on PUT and re-rendered as their
5973// AWS-default shape on GET so the client always sees a well-formed
5974// configuration.
5975// ---------------------------------------------------------------------------
5976
5977fn dto_lifecycle_to_internal(
5978 dto: &BucketLifecycleConfiguration,
5979) -> crate::lifecycle::LifecycleConfig {
5980 crate::lifecycle::LifecycleConfig {
5981 rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
5982 }
5983}
5984
5985fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
5986 let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
5987 let filter = rule
5988 .filter
5989 .as_ref()
5990 .map(dto_filter_to_internal)
5991 .unwrap_or_default();
5992 let expiration_days = rule
5993 .expiration
5994 .as_ref()
5995 .and_then(|e| e.days)
5996 .and_then(|d| u32::try_from(d).ok());
5997 let expiration_date = rule
5998 .expiration
5999 .as_ref()
6000 .and_then(|e| e.date.as_ref())
6001 .and_then(timestamp_to_chrono_utc);
6002 let transitions: Vec<crate::lifecycle::TransitionRule> = rule
6003 .transitions
6004 .as_ref()
6005 .map(|ts| {
6006 ts.iter()
6007 .filter_map(|t| {
6008 let days = u32::try_from(t.days?).ok()?;
6009 let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
6010 Some(crate::lifecycle::TransitionRule {
6011 days,
6012 storage_class,
6013 })
6014 })
6015 .collect()
6016 })
6017 .unwrap_or_default();
6018 let noncurrent_version_expiration_days = rule
6019 .noncurrent_version_expiration
6020 .as_ref()
6021 .and_then(|n| n.noncurrent_days)
6022 .and_then(|d| u32::try_from(d).ok());
6023 let abort_incomplete_multipart_upload_days = rule
6024 .abort_incomplete_multipart_upload
6025 .as_ref()
6026 .and_then(|a| a.days_after_initiation)
6027 .and_then(|d| u32::try_from(d).ok());
6028 crate::lifecycle::LifecycleRule {
6029 id: rule.id.clone().unwrap_or_default(),
6030 status,
6031 filter,
6032 expiration_days,
6033 expiration_date,
6034 transitions,
6035 noncurrent_version_expiration_days,
6036 abort_incomplete_multipart_upload_days,
6037 }
6038}
6039
6040fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
6041 let mut prefix = filter.prefix.clone();
6042 let mut tags: Vec<(String, String)> = Vec::new();
6043 let mut size_gt: Option<u64> = filter
6044 .object_size_greater_than
6045 .and_then(|n| u64::try_from(n).ok());
6046 let mut size_lt: Option<u64> = filter
6047 .object_size_less_than
6048 .and_then(|n| u64::try_from(n).ok());
6049 if let Some(t) = &filter.tag
6050 && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
6051 {
6052 tags.push((k.clone(), v.clone()));
6053 }
6054 if let Some(and) = &filter.and {
6055 if prefix.is_none() {
6056 prefix = and.prefix.clone();
6057 }
6058 if size_gt.is_none() {
6059 size_gt = and
6060 .object_size_greater_than
6061 .and_then(|n| u64::try_from(n).ok());
6062 }
6063 if size_lt.is_none() {
6064 size_lt = and
6065 .object_size_less_than
6066 .and_then(|n| u64::try_from(n).ok());
6067 }
6068 if let Some(ts) = &and.tags {
6069 for t in ts {
6070 if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
6071 tags.push((k.clone(), v.clone()));
6072 }
6073 }
6074 }
6075 }
6076 crate::lifecycle::LifecycleFilter {
6077 prefix,
6078 tags,
6079 object_size_greater_than: size_gt,
6080 object_size_less_than: size_lt,
6081 }
6082}
6083
6084fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
6085 let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
6086 Some(LifecycleExpiration {
6087 date: rule.expiration_date.map(chrono_utc_to_timestamp),
6088 days: rule.expiration_days.map(|d| d as i32),
6089 expired_object_delete_marker: None,
6090 })
6091 } else {
6092 None
6093 };
6094 let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
6095 None
6096 } else {
6097 Some(
6098 rule.transitions
6099 .iter()
6100 .map(|t| Transition {
6101 date: None,
6102 days: Some(t.days as i32),
6103 storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
6104 })
6105 .collect(),
6106 )
6107 };
6108 let noncurrent_version_expiration =
6109 rule.noncurrent_version_expiration_days
6110 .map(|d| NoncurrentVersionExpiration {
6111 newer_noncurrent_versions: None,
6112 noncurrent_days: Some(d as i32),
6113 });
6114 let abort_incomplete_multipart_upload =
6115 rule.abort_incomplete_multipart_upload_days
6116 .map(|d| AbortIncompleteMultipartUpload {
6117 days_after_initiation: Some(d as i32),
6118 });
6119 let filter = if rule.filter.tags.is_empty()
6120 && rule.filter.object_size_greater_than.is_none()
6121 && rule.filter.object_size_less_than.is_none()
6122 {
6123 rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
6124 and: None,
6125 object_size_greater_than: None,
6126 object_size_less_than: None,
6127 prefix: Some(p.clone()),
6128 tag: None,
6129 })
6130 } else if rule.filter.tags.len() == 1
6131 && rule.filter.prefix.is_none()
6132 && rule.filter.object_size_greater_than.is_none()
6133 && rule.filter.object_size_less_than.is_none()
6134 {
6135 let (k, v) = rule.filter.tags[0].clone();
6136 Some(LifecycleRuleFilter {
6137 and: None,
6138 object_size_greater_than: None,
6139 object_size_less_than: None,
6140 prefix: None,
6141 tag: Some(Tag {
6142 key: Some(k),
6143 value: Some(v),
6144 }),
6145 })
6146 } else {
6147 let tags = if rule.filter.tags.is_empty() {
6148 None
6149 } else {
6150 Some(
6151 rule.filter
6152 .tags
6153 .iter()
6154 .map(|(k, v)| Tag {
6155 key: Some(k.clone()),
6156 value: Some(v.clone()),
6157 })
6158 .collect(),
6159 )
6160 };
6161 Some(LifecycleRuleFilter {
6162 and: Some(LifecycleRuleAndOperator {
6163 object_size_greater_than: rule
6164 .filter
6165 .object_size_greater_than
6166 .and_then(|n| i64::try_from(n).ok()),
6167 object_size_less_than: rule
6168 .filter
6169 .object_size_less_than
6170 .and_then(|n| i64::try_from(n).ok()),
6171 prefix: rule.filter.prefix.clone(),
6172 tags,
6173 }),
6174 object_size_greater_than: None,
6175 object_size_less_than: None,
6176 prefix: None,
6177 tag: None,
6178 })
6179 };
6180 LifecycleRule {
6181 abort_incomplete_multipart_upload,
6182 expiration,
6183 filter,
6184 id: if rule.id.is_empty() {
6185 None
6186 } else {
6187 Some(rule.id.clone())
6188 },
6189 noncurrent_version_expiration,
6190 noncurrent_version_transitions: None,
6191 prefix: None,
6192 status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
6193 transitions,
6194 }
6195}
6196
6197// (timestamp <-> chrono helpers `timestamp_to_chrono_utc` /
6198// `chrono_utc_to_timestamp` are defined earlier in this file for the
6199// tagging/notifications work; the lifecycle DTO converters reuse them.)
6200
6201// ---------------------------------------------------------------------------
6202// v0.5 #33: SigV4a (asymmetric ECDSA-P256) integration hook.
6203//
6204// Kept as a self-contained block at the bottom of the file so it doesn't
6205// touch the existing `S4Service` struct, `new()`, or any of the per-op
6206// handlers above. The hook is wired in by the binary at server-build time
6207// as a hyper middleware layer (see `main.rs`), NOT inside `S4Service`.
6208//
6209// Lifecycle:
6210// 1. `SigV4aGate::new(store)` is constructed once at boot from the
6211// operator-supplied credential directory.
6212// 2. For each incoming request, `SigV4aGate::pre_route(&req,
6213// &requested_region, &canonical_request_bytes)` is invoked BEFORE
6214// the request hits the S3 framework. If the request claims SigV4a
6215// and verifies, control returns to the framework. Otherwise a 403
6216// `SignatureDoesNotMatch` is produced.
6217// 3. Plain SigV4 (HMAC-SHA256) requests pass through untouched.
6218// ---------------------------------------------------------------------------
6219
/// Gate that fronts the S3 service path with SigV4a verification (v0.5 #33).
///
/// Wraps a [`crate::sigv4a::SigV4aCredentialStore`] and exposes a single
/// `pre_route` entry point that returns `Ok(())` for both
/// "request is plain SigV4 — pass through" and "request is SigV4a and
/// verified", and an `Err(...)` containing a 403-equivalent diagnostic
/// otherwise. Cheap to clone (the inner store is `Arc`-backed).
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    // Shared credential store; `pre_route` looks keys up by access-key-id.
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}
6231
6232impl SigV4aGate {
6233 #[must_use]
6234 pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
6235 Self { store }
6236 }
6237
6238 /// Inspect an incoming HTTP request. Behaviour:
6239 ///
6240 /// - Not SigV4a (no `X-Amz-Region-Set` and no SigV4a `Authorization`
6241 /// prefix) → returns `Ok(())`; the framework's existing SigV4
6242 /// path handles the request.
6243 /// - SigV4a + valid signature + region match → `Ok(())`.
6244 /// - SigV4a + unknown access-key-id → `Err` with `InvalidAccessKeyId`.
6245 /// - SigV4a + bad signature / region mismatch → `Err` with
6246 /// `SignatureDoesNotMatch`.
6247 ///
6248 /// `canonical_request_bytes` is the SigV4a string-to-sign (or
6249 /// canonical-request bytes; the caller decides) that the framework
6250 /// has already produced for this request. Keeping it as a parameter
6251 /// instead of rebuilding it inside the hook avoids duplicating the
6252 /// canonicalisation logic.
6253 pub fn pre_route<B>(
6254 &self,
6255 req: &http::Request<B>,
6256 requested_region: &str,
6257 canonical_request_bytes: &[u8],
6258 ) -> Result<(), SigV4aGateError> {
6259 if !crate::sigv4a::detect(req) {
6260 return Ok(());
6261 }
6262 let auth_hdr = req
6263 .headers()
6264 .get(http::header::AUTHORIZATION)
6265 .and_then(|v| v.to_str().ok())
6266 .ok_or(SigV4aGateError::MissingAuthorization)?;
6267 let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
6268 .ok_or(SigV4aGateError::MalformedAuthorization)?;
6269 let region_set = req
6270 .headers()
6271 .get(crate::sigv4a::REGION_SET_HEADER)
6272 .and_then(|v| v.to_str().ok())
6273 .unwrap_or("*");
6274 let key = self
6275 .store
6276 .get(&parsed.access_key_id)
6277 .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
6278 crate::sigv4a::verify(
6279 &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
6280 &parsed.signature_der,
6281 key,
6282 region_set,
6283 requested_region,
6284 )
6285 .map_err(SigV4aGateError::Verify)?;
6286 Ok(())
6287 }
6288}
6289
/// Failure modes from [`SigV4aGate::pre_route`]. All variants map to
/// HTTP 403 with one of the two AWS-standard error codes
/// (`InvalidAccessKeyId` or `SignatureDoesNotMatch`).
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    /// Request claimed SigV4a but carried no readable `Authorization` header.
    #[error("missing Authorization header")]
    MissingAuthorization,
    /// `Authorization` header present but not parseable as SigV4a.
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    /// The access-key-id from the header is not in the credential store.
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    /// Signature / region verification failed inside `crate::sigv4a::verify`.
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}
6304
6305impl SigV4aGateError {
6306 /// AWS S3 error code that should accompany a 403 response.
6307 #[must_use]
6308 pub fn s3_error_code(&self) -> &'static str {
6309 match self {
6310 Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
6311 _ => "SignatureDoesNotMatch",
6312 }
6313 }
6314}
6315
#[cfg(test)]
mod tests {
    //! Unit tests for this file's free helpers: manifest <-> S3-metadata
    //! round-tripping, `x-amz-copy-source-range` parsing, the panic-free
    //! `safe_object_uri` builder, and the SSE-KMS DEK call shape.

    use super::*;

    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(extracted.codec, original.codec);
        assert_eq!(extracted.original_size, original.original_size);
        assert_eq!(extracted.compressed_size, original.compressed_size);
        assert_eq!(extracted.crc32c, original.crc32c);
    }

    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    #[test]
    fn partial_metadata_yields_none() {
        // Only the codec key is present; extraction must refuse to
        // fabricate a manifest from incomplete metadata.
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        let opt = Some(meta);
        assert!(extract_manifest(&opt).is_none());
    }

    #[test]
    fn parse_copy_source_range_basic() {
        let r = parse_copy_source_range("bytes=10-20").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        // S3 upload_part_copy spec requires N-M (closed); suffix and
        // open-ended forms are not allowed for this header.
        assert!(parse_copy_source_range("bytes=10-").is_err());
        assert!(parse_copy_source_range("bytes=-10").is_err());
    }

    // v0.7 #49: safe_object_uri must round-trip every legal S3 key
    // (which includes spaces, slashes, control chars, raw UTF-8) into
    // a parseable `http::Uri` instead of panicking like the previous
    // `format!(...).parse().unwrap()` call sites did.

    #[test]
    fn safe_object_uri_basic_ascii() {
        let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
        assert_eq!(uri.path(), "/bucket/key");
    }

    #[test]
    fn safe_object_uri_encodes_spaces() {
        let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
        // RFC 3986 path-segment encoding turns ' ' into %20.
        assert!(
            uri.path().contains("%20"),
            "expected percent-encoded space, got {}",
            uri.path()
        );
        assert!(uri.path().starts_with("/bucket/"));
    }

    #[test]
    fn safe_object_uri_preserves_slashes() {
        // S3 keys legally contain '/' as a logical path separator —
        // the helper must NOT escape it (otherwise the synthetic URI
        // changes the perceived hierarchy).
        let uri =
            safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
        assert_eq!(uri.path(), "/bucket/key/with/slashes");
    }

    #[test]
    fn safe_object_uri_handles_newline_without_panic() {
        // Newlines are control chars in URIs; whether the result is
        // Ok (encoded as %0A) or Err (parse rejects), the helper
        // MUST NOT panic. Either outcome is acceptable.
        let _ = safe_object_uri("bucket", "key\n");
    }

    #[test]
    fn safe_object_uri_handles_null_byte_without_panic() {
        let _ = safe_object_uri("bucket", "key\0bad");
    }

    #[test]
    fn safe_object_uri_handles_unicode_without_panic() {
        // RTL override, BOM, plain Japanese — none should panic.
        let _ = safe_object_uri("bucket", "rtl\u{202E}override");
        let _ = safe_object_uri("bucket", "\u{FEFF}bom-key");
        let _ = safe_object_uri("bucket", "日本語キー");
    }

    #[test]
    fn safe_object_uri_no_panic_for_every_byte() {
        // Exhaustive byte coverage: 0x00..=0xFF as a 1-byte key.
        // None of these may panic. (0x80..=0xFF are not valid UTF-8
        // by themselves; we go through `String::from_utf8_lossy` so
        // the helper sees a real `&str` regardless of the raw byte.)
        for b in 0u8..=255 {
            let s = String::from_utf8_lossy(&[b]).into_owned();
            let _ = safe_object_uri("bucket", &s);
        }
    }

    /// v0.8.1 #58: smoke test for the DEK-handling shape used by the
    /// SSE-KMS branches of `put_object` and `complete_multipart_upload`.
    /// Mirrors the call pattern (generate_dek → length check → copy
    /// into stack `[u8; 32]` → reborrow as `&[u8; 32]` for `SseSource`)
    /// without spinning up a full `S4Service`.
    ///
    /// The real assertion this guards against is a regression where
    /// the `Zeroizing` wrapper is accidentally dropped before the
    /// stack copy lands (e.g. someone refactors to use
    /// `let dek = kms.generate_dek(...).await?.0; drop(dek); ...`)
    /// or where `&**dek` is rewritten in a way that doesn't compile.
    #[tokio::test]
    async fn kms_dek_lifetime_within_function_scope() {
        use crate::kms::{KmsBackend, LocalKms};
        use std::collections::HashMap;
        use std::path::PathBuf;
        use zeroize::Zeroizing;

        let mut keks = HashMap::new();
        keks.insert("scope".to_string(), [33u8; 32]);
        let kms = LocalKms::from_keks(PathBuf::from("/tmp/kms-scope-test"), keks);

        // Mirror the put_object KMS branch shape exactly.
        let (dek, wrapped) = kms.generate_dek("scope").await.unwrap();
        assert_eq!(dek.len(), 32);
        let mut dek_arr: Zeroizing<[u8; 32]> = Zeroizing::new([0u8; 32]);
        dek_arr.copy_from_slice(&dek);

        // The reborrow used at the SseSource construction site —
        // mirrors the call-site pattern where `let dek_ref: &[u8; 32]`
        // auto-derefs from a `Zeroizing<[u8; 32]>` reference.
        let dek_ref: &[u8; 32] = &dek_arr;
        // Sanity: the reborrow points at the same bytes.
        assert_eq!(dek_ref, &*dek_arr);
        // Wrapped key id flows through unchanged.
        assert_eq!(wrapped.key_id, "scope");

        // At end of scope, both `dek` (Zeroizing<Vec<u8>>) and
        // `dek_arr` (Zeroizing<[u8; 32]>) are dropped, wiping the
        // backing memory. Cannot directly assert the wipe (would be
        // UB to read freed memory), so this test instead enforces
        // that the call shape compiles and executes; the wipe itself
        // is exercised by the `zeroize` crate's own test suite.
    }
}