Skip to main content

s4_server/
sse.rs

1//! Server-side encryption (SSE-S4) — AES-256-GCM (v0.4 #21, v0.5 #29, v0.5 #27, v0.5 #28).
2//!
3//! Wraps the post-compression S3 object body with authenticated
4//! encryption. Compress-then-encrypt is the right order: encryption
5//! produces high-entropy bytes that don't compress, so encrypting last
6//! preserves the codec's ratio.
7//!
8//! ## Wire formats
9//!
10//! ### S4E1 (v0.4) — single key, no rotation
11//!
12//! ```text
13//! [magic: "S4E1" 4B]
14//! [algo:  u8]            # 1 = AES-256-GCM
15//! [reserved: 3B]         # 0x00 0x00 0x00
16//! [nonce: 12B]           # random per-object
17//! [tag:   16B]           # AES-GCM authentication tag
18//! [ciphertext: variable] # encrypted-then-authenticated body
19//! ```
20//!
21//! Total overhead: 36 bytes per object.
22//!
23//! ### S4E2 (v0.5 #29) — keyring-aware, supports rotation
24//!
25//! ```text
26//! [magic:  "S4E2" 4B]
27//! [algo:   u8]            # 1 = AES-256-GCM
28//! [key_id: u16 BE]        # which keyring slot encrypted this body
29//! [reserved: 1B]          # 0x00
30//! [nonce:  12B]           # random per-object
31//! [tag:    16B]           # AES-GCM authentication tag
32//! [ciphertext: variable]
33//! ```
34//!
35//! Same 36-byte overhead — we reused the 3-byte reserved area in S4E1
36//! to fit a 2-byte key-id + 1-byte reserved without bumping the header
37//! size. The key-id is included in the AAD so a flipped key-id byte
38//! fails the auth tag (i.e. an attacker can't trick the gateway into
39//! decrypting under a different keyring slot).
40//!
41//! ### S4E3 (v0.5 #27) — SSE-C, customer-provided key
42//!
43//! ```text
44//! [magic:   "S4E3" 4B]
45//! [algo:    u8]            # 1 = AES-256-GCM
46//! [key_md5: 16B]           # MD5 fingerprint of the customer key
47//! [nonce:   12B]           # random per-object
48//! [tag:     16B]           # AES-GCM authentication tag
49//! [ciphertext: variable]
50//! ```
51//!
52//! Overhead: 49 bytes (`4 + 1 + 16 + 12 + 16`). Unlike S4E1/S4E2 the
53//! gateway does **not** persist the key — the client supplies it on
54//! every PUT/GET via `x-amz-server-side-encryption-customer-{algorithm,
55//! key,key-MD5}` headers. We store only the 16-byte MD5 in the on-disk
56//! frame so a GET with the wrong key surfaces as
57//! [`SseError::WrongCustomerKey`] before AES-GCM is even tried (saves a
58//! useless decrypt + gives operators a distinct error from generic auth
59//! failure).
60//!
61//! The `key_md5` is included in the AAD so flipping a single byte of
62//! the stored fingerprint also breaks AES-GCM auth — i.e. an attacker
63//! who tampered with the metadata can't sneak a different key past the
64//! check.
65//!
66//! ### S4E4 (v0.5 #28) — SSE-KMS envelope, per-object DEK
67//!
68//! ```text
69//! [magic:           "S4E4" 4B]
70//! [algo:            u8]            # 1 = AES-256-GCM
71//! [key_id_len:      u8]            # 1..=255, length of UTF-8 key_id
72//! [key_id:          variable]      # UTF-8, AAD-authenticated
73//! [wrapped_dek_len: u32 BE]        # length of the wrapped DEK blob
74//! [wrapped_dek:     variable]      # opaque, AAD-authenticated
75//! [nonce:           12B]           # random per-object
76//! [tag:             16B]           # AES-GCM auth tag for body
77//! [ciphertext:      variable]      # body encrypted under the DEK
78//! ```
79//!
80//! Header overhead: `4 + 1 + 1 + key_id_len + 4 + wrapped_dek_len + 12
81//! + 16` = 38 + key_id_len + wrapped_dek_len. For a typical
82//! [`crate::kms::LocalKms`] wrap (60-byte ciphertext) and a 36-char
83//! UUID-style `key_id`, that's ~134 bytes per object.
84//!
85//! `key_id` and `wrapped_dek` are both placed in the AAD so an
86//! attacker cannot rewrite either field to point the gateway at a
87//! different KEK or wrapped DEK without invalidating the body's
88//! AES-GCM tag. The plaintext DEK is never persisted; only the
89//! wrapped form is on disk, and the gateway holds the plaintext only
90//! for the duration of one PUT or GET.
91//!
92//! S4E4 decrypt requires an `async` round-trip to the KMS backend
93//! (to unwrap the DEK), so the synchronous [`decrypt`] function
94//! refuses S4E4 with [`SseError::KmsAsyncRequired`] — callers that
95//! peek `S4E4` via [`peek_magic`] must dispatch to
96//! [`decrypt_with_kms`] instead.
97//!
98//! ## v0.5 rotation flow (SSE-S4 only)
99//!
100//! Operators wire one [`SseKeyring`] holding the **active** key plus
101//! any number of **retired** keys. PUT always encrypts under the
102//! active key (S4E2 with that key's id). GET sniffs the magic:
103//!
104//! - `S4E1`: legacy single-key path. The keyring's active key is tried
105//!   first, then every retired key — this lets a v0.4 deployment
106//!   migrate to a keyring with the original key as active and decrypt
107//!   pre-rotation objects unchanged.
108//! - `S4E2`: read the key_id, look it up in the keyring, decrypt with
109//!   that exact key. Missing key_id surfaces as `KeyNotInKeyring`.
110//! - `S4E3`: keyring is **not** consulted. Caller must supply
111//!   [`SseSource::CustomerKey`] with the matching key + md5.
112//!
113//! ## Open follow-ups
114//!
115//! - **Server-managed key only** (for SSE-S4): keys come from local
116//!   files via `--sse-s4-key` / `--sse-s4-key-rotated`. KMS / vault
117//!   integration for the SSE-S4 keyring (i.e. wrapping the keyring's
118//!   keys with KMS) is a separate issue. SSE-KMS for per-object DEKs
119//!   is implemented (see [`SseSource::Kms`] + S4E4 above).
120
121// v0.8.8: aes-gcm 0.10 + hmac 0.12 (pinned by RustCrypto) re-export the
122// `Nonce::from_slice` / `Key::<Aes256Gcm>::from_slice` helpers from
123// generic-array 0.14, whose helpers were deprecated in favour of the 1.x
124// API. Migrating requires bumping aes-gcm to a release that pins
125// generic-array 1.x (not yet stable as of this writing), so silence the
126// deprecation at module scope until the upstream RustCrypto stack lands.
127#![allow(deprecated)]
128
129use std::collections::HashMap;
130use std::path::Path;
131use std::sync::Arc;
132
133use aes_gcm::aead::{Aead, KeyInit, Payload};
134use aes_gcm::{Aes256Gcm, Key, Nonce};
135use bytes::Bytes;
136use md5::{Digest as Md5Digest, Md5};
137use rand::RngCore;
138use thiserror::Error;
139
140use crate::kms::{KmsBackend, KmsError, WrappedDek};
141
142pub const SSE_MAGIC_V1: &[u8; 4] = b"S4E1";
143pub const SSE_MAGIC_V2: &[u8; 4] = b"S4E2";
144pub const SSE_MAGIC_V3: &[u8; 4] = b"S4E3";
145pub const SSE_MAGIC_V4: &[u8; 4] = b"S4E4";
146/// v0.8 #52: chunked variant of S4E2 — same SSE-S4 keyring source,
147/// but the body is sliced into independently-sealed AES-GCM chunks
148/// so the GET path can stream-decrypt + emit chunk-by-chunk instead
149/// of buffering the entire object before tag verify. See
150/// [`encrypt_v2_chunked`] / [`decrypt_chunked_stream`] for the on-
151/// the-wire layout.
152///
153/// **Read-only as of v0.8.1 #57** — new PUTs emit [`SSE_MAGIC_V6`]
154/// (S4E6). S4E5 is kept around for back-compat decrypt of objects
155/// written by v0.8.0.
156pub const SSE_MAGIC_V5: &[u8; 4] = b"S4E5";
157/// v0.8.1 #57: identical layout to S4E5 except the per-PUT salt is
158/// widened from 4 → 8 bytes so the birthday-collision threshold on
159/// AES-GCM nonce reuse jumps from ~65k PUTs/key to ~4 billion. See
160/// [`encrypt_v2_chunked`] (now emits S4E6) / the S4E6 wire-format
161/// docs further down for the full layout.
162pub const SSE_MAGIC_V6: &[u8; 4] = b"S4E6";
163/// Back-compat alias — v0.4 callers that imported `SSE_MAGIC` mean S4E1.
164pub const SSE_MAGIC: &[u8; 4] = SSE_MAGIC_V1;
165
166/// Header layout matches between S4E1 and S4E2 (both 36 bytes total)
167/// because S4E2 reuses the 3-byte reserved slot to fit `key_id (2B) +
168/// reserved (1B)`. Keeping them the same length means the rest of the
169/// pipeline (sidecar offsets, multipart math) doesn't care which
170/// frame variant is in flight.
171pub const SSE_HEADER_BYTES: usize = 4 + 1 + 3 + 12 + 16; // = 36
172/// S4E3 (SSE-C) replaces the 3-byte reserved area with a 16-byte
173/// customer-key MD5 fingerprint, so the header is 49 bytes total.
174/// `magic 4 + algo 1 + key_md5 16 + nonce 12 + tag 16`.
175pub const SSE_HEADER_BYTES_V3: usize = 4 + 1 + KEY_MD5_LEN + 12 + 16; // = 49
176pub const ALGO_AES_256_GCM: u8 = 1;
177const NONCE_LEN: usize = 12;
178const TAG_LEN: usize = 16;
179const KEY_LEN: usize = 32;
180const KEY_MD5_LEN: usize = 16;
181/// AWS S3 SSE-C only allows AES256 in the
182/// `x-amz-server-side-encryption-customer-algorithm` header, so we
183/// match that exact spelling for parity with real S3 clients.
184pub const SSE_C_ALGORITHM: &str = "AES256";
185
186#[derive(Debug, Error)]
187pub enum SseError {
188    #[error("SSE key file {path:?}: {source}")]
189    KeyFileIo {
190        path: std::path::PathBuf,
191        source: std::io::Error,
192    },
193    #[error(
194        "SSE key file must be exactly 32 raw bytes (or 64-char hex / 44-char base64); got {got} bytes after parse"
195    )]
196    BadKeyLength { got: usize },
197    #[error("SSE-encrypted body too short ({got} bytes; need at least {SSE_HEADER_BYTES})")]
198    TooShort { got: usize },
199    #[error("SSE bad magic: expected S4E1/S4E2/S4E3/S4E4/S4E5/S4E6, got {got:?}")]
200    BadMagic { got: [u8; 4] },
201    #[error("SSE unsupported algo tag: {tag} (this build only knows AES-256-GCM = 1)")]
202    UnsupportedAlgo { tag: u8 },
203    #[error(
204        "SSE key_id {id} (S4E2 frame) not present in keyring; rotation history likely incomplete"
205    )]
206    KeyNotInKeyring { id: u16 },
207    #[error("SSE decryption / authentication failed (key mismatch or ciphertext tampered with)")]
208    DecryptFailed,
209    // --- v0.5 #27: SSE-C specific errors ---
210    /// The MD5 fingerprint stored in the S4E3 frame doesn't match the
211    /// MD5 of the customer key the client supplied. This is the
212    /// "wrong customer key on GET" signal — distinct from
213    /// `DecryptFailed` so service.rs can map it to AWS S3's
214    /// `403 AccessDenied` (S3 returns AccessDenied when the supplied
215    /// SSE-C key doesn't match the one used at PUT time).
216    #[error("SSE-C key MD5 fingerprint mismatch — client supplied a different key than PUT")]
217    WrongCustomerKey,
218    /// `parse_customer_key_headers` saw a malformed input. `reason` is
219    /// a short human string ("base64 decode of key", "key length",
220    /// "md5 length", "md5 mismatch") for operator log lines — never
221    /// echoed to the client (would leak crypto details).
222    #[error("SSE-C customer-key headers invalid: {reason}")]
223    InvalidCustomerKey { reason: &'static str },
224    /// Client asked for an SSE-C algorithm the gateway doesn't speak.
225    /// AWS S3 only ever defines `AES256` here; surfacing the offending
226    /// string lets us 400 with a useful message.
227    #[error("SSE-C algorithm {algo:?} unsupported (only {SSE_C_ALGORITHM:?} is allowed)")]
228    CustomerKeyAlgorithmUnsupported { algo: String },
229    /// S4E3 body lacks an SSE-C key — caller passed `SseSource::Keyring`
230    /// when decrypting an SSE-C-encrypted object. service.rs should
231    /// translate this into the same "missing customer key" 400 that
232    /// AWS S3 returns when SSE-C headers are absent on a GET.
233    #[error("S4E3 frame requires SseSource::CustomerKey; got Keyring")]
234    CustomerKeyRequired,
235    /// Inverse: client sent SSE-C headers on a GET for an object stored
236    /// without SSE-C. The supplied key has no role in decryption, but
237    /// AWS S3 actually 400s in this case ("expected an unencrypted
238    /// object" / "extraneous SSE-C headers"), so we mirror that.
239    #[error("S4E1/S4E2 frame stored without SSE-C; SseSource::CustomerKey is unexpected")]
240    CustomerKeyUnexpected,
241    // --- v0.5 #28: SSE-KMS specific errors ---
242    /// `decrypt` (sync) was handed an S4E4 body. SSE-KMS unwrap is
243    /// async (it round-trips to the KMS backend), so callers must
244    /// peek the magic with [`peek_magic`] and dispatch S4E4 frames to
245    /// [`decrypt_with_kms`] instead. service.rs's GET handler does
246    /// this; tests / direct callers may hit this if they forget.
247    #[error(
248        "S4E4 (SSE-KMS) body requires async decrypt — call decrypt_with_kms() instead of decrypt()"
249    )]
250    KmsAsyncRequired,
251    /// S4E4 frame is shorter than the minimum-possible header (38
252    /// bytes for an empty `key_id` + empty `wrapped_dek`, which is
253    /// itself impossible — we just sanity-check the floor).
254    #[error("S4E4 frame too short ({got} bytes; need at least {min})")]
255    KmsFrameTooShort { got: usize, min: usize },
256    /// S4E4 declared a `key_id_len` or `wrapped_dek_len` that runs
257    /// past the end of the body. Almost certainly truncation /
258    /// corruption rather than tampering (tampering would fail the
259    /// AES-GCM tag instead).
260    #[error("S4E4 frame field length out of bounds: {what}")]
261    KmsFrameFieldOob { what: &'static str },
262    /// `key_id` field of an S4E4 frame is not valid UTF-8. We require
263    /// UTF-8 because `LocalKms` uses the basename of a `.kek` file
264    /// (which is OS-string-but-typically-UTF-8) and AWS KMS uses ARNs
265    /// (which are ASCII).
266    #[error("S4E4 key_id is not valid UTF-8")]
267    KmsKeyIdNotUtf8,
268    /// service.rs handed `decrypt_with_kms` a `WrappedDek` whose
269    /// `key_id` doesn't match the one stored in the S4E4 frame. This
270    /// is an integration bug (caller is meant to pull the wrapped
271    /// DEK *from the frame*, not from somewhere else), surface as a
272    /// distinct error so it shows up in tests rather than silently
273    /// failing the AES-GCM tag.
274    #[error(
275        "S4E4 SseSource::Kms wrapped DEK key_id {supplied:?} doesn't match frame key_id {stored:?}"
276    )]
277    KmsWrappedDekMismatch { supplied: String, stored: String },
278    /// SSE-KMS path got a non-Kms `SseSource` for an S4E4 body. The
279    /// async dispatch in `decrypt_with_kms` re-derives the source
280    /// internally so this can only happen if a future caller passes
281    /// `SseSource::Keyring` / `CustomerKey` to a path that expected
282    /// `Kms` — kept around for symmetry with the other "wrong source"
283    /// errors.
284    #[error("S4E4 frame requires SseSource::Kms")]
285    KmsRequired,
286    /// Pass-through for [`crate::kms::KmsError`] surfaced from
287    /// `KmsBackend::decrypt_dek` — boxed so the variant stays small.
288    #[error("KMS unwrap: {0}")]
289    KmsBackend(#[from] KmsError),
290    // --- v0.8 #52: S4E5 (chunked SSE-S4) specific errors ---
291    /// AES-GCM auth tag verify failed on chunk `chunk_index` of an
292    /// S4E5 body. Distinct from the all-or-nothing
293    /// [`SseError::DecryptFailed`] because the streaming GET may
294    /// have already emitted earlier chunks to the client by the
295    /// time chunk N fails — operators need the chunk index in audit
296    /// logs to triangulate which byte range was tampered with (or
297    /// which disk sector flipped).
298    #[error(
299        "S4E5 chunk {chunk_index} auth tag verify failed (key mismatch or chunk tampered with)"
300    )]
301    ChunkAuthFailed { chunk_index: u32 },
302    /// Caller asked [`encrypt_v2_chunked`] to use a chunk size of 0
303    /// — nonsensical (would loop forever). Surfaced as an error
304    /// rather than panicking so service.rs can map a bad
305    /// `--sse-chunk-size 0` configuration to a clear startup error.
306    #[error("S4E5 chunk_size must be > 0 (got 0)")]
307    ChunkSizeInvalid,
308    /// S4E5 frame is shorter than the fixed header or declares a
309    /// (chunk_count × per-chunk-bytes) total that overruns the
310    /// body. Almost certainly truncation / corruption — tampering
311    /// with the per-chunk ciphertext or tag would surface as
312    /// [`SseError::ChunkAuthFailed`] instead.
313    #[error("S4E5 frame truncated: {what}")]
314    ChunkFrameTruncated { what: &'static str },
315    // --- v0.8.1 #57: S4E6 (8-byte salt, 24-bit chunk_index) ---
316    /// S4E6 chunk_index is encoded as a 24-bit big-endian field in
317    /// the per-chunk nonce, capping `chunk_count` at
318    /// `2^24 - 1 = 16_777_215`. At the default 1 MiB chunk size that
319    /// is ~16 PiB per object — well past S3's 5 GiB single-object
320    /// ceiling. Surface as a distinct error so a misconfiguration
321    /// (`--sse-chunk-size 1` on a multi-GiB object, say) shows up at
322    /// PUT time with a clear cause rather than a panic at the u32 →
323    /// u24 cast.
324    #[error("S4E6 chunk_count {got} exceeds 24-bit max ({max}) — pick a larger --sse-chunk-size")]
325    ChunkCountTooLarge { got: u32, max: u32 },
326    // --- v0.8.2 #64: pre-allocation guard for chunked SSE frames ---
327    /// `parse_chunked_header` rejected an S4E5 / S4E6 frame because
328    /// the declared `chunk_size × chunk_count` (or the on-disk total
329    /// after adding per-chunk tag overhead and the fixed header) is
330    /// either:
331    ///
332    /// 1. arithmetically nonsensical (the multiplication / addition
333    ///    overflows u64 on a 64-bit host), or
334    /// 2. larger than the gateway's configured `max_body_bytes`
335    ///    (default 5 GiB — AWS S3's single-object PUT ceiling).
336    ///
337    /// This is the **DoS guard** for the chunked path: without it, a
338    /// 24-byte malicious header that claims `chunk_size = u32::MAX`
339    /// and `chunk_count = u32::MAX` would have caused the buffered
340    /// decrypt path to attempt a multi-PB `Vec::with_capacity` (or
341    /// integer-overflow into a tiny alloc + later out-of-bounds
342    /// panic) before any cryptographic work happened. Surface as a
343    /// distinct variant — never echo the offending sizes back to the
344    /// client (kept as a `&'static str` `details` field for operator
345    /// audit logs only) so the response isn't a tampering oracle.
346    #[error("S4E5/S4E6 chunked frame declares an over-large size: {details}")]
347    ChunkFrameTooLarge { details: &'static str },
348}
349
350/// Default cap on a single chunked-SSE frame's declared plaintext
351/// size, used by [`decrypt_chunked_buffered_default`] and the
352/// streaming pre-validation path. Mirrors AWS S3's 5 GiB single-object
353/// PUT ceiling — anything larger is unreachable on the AWS-compatible
354/// API surface and is therefore safe to reject pre-alloc as a malformed
355/// / malicious header (v0.8.2 #64).
356pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
357
358/// 32-byte symmetric key. `bytes` is `pub` so call sites can construct
359/// keys directly from already-validated bytes (e.g. KMS-decrypted DEKs)
360/// without going through the on-disk parser. Hold inside an `Arc` when
361/// sharing across handler tasks — `SseKeyring` does this internally.
362pub struct SseKey {
363    pub bytes: [u8; 32],
364}
365
366impl SseKey {
367    /// Load a 32-byte key from disk. Accepts three on-disk encodings:
368    /// raw 32 bytes, 64-char ASCII hex, or 44-char ASCII base64 (with or
369    /// without padding). Whitespace is trimmed.
370    pub fn from_path(path: &Path) -> Result<Self, SseError> {
371        let raw = std::fs::read(path).map_err(|source| SseError::KeyFileIo {
372            path: path.to_path_buf(),
373            source,
374        })?;
375        Self::from_bytes(&raw)
376    }
377
378    pub fn from_bytes(bytes: &[u8]) -> Result<Self, SseError> {
379        // Try raw first.
380        if bytes.len() == KEY_LEN {
381            let mut k = [0u8; KEY_LEN];
382            k.copy_from_slice(bytes);
383            return Ok(Self { bytes: k });
384        }
385        // Trim whitespace and try hex / base64.
386        let s = std::str::from_utf8(bytes).unwrap_or("").trim();
387        if s.len() == KEY_LEN * 2 && s.chars().all(|c| c.is_ascii_hexdigit()) {
388            let mut k = [0u8; KEY_LEN];
389            for (i, k_byte) in k.iter_mut().enumerate() {
390                *k_byte = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16)
391                    .map_err(|_| SseError::BadKeyLength { got: bytes.len() })?;
392            }
393            return Ok(Self { bytes: k });
394        }
395        if let Ok(decoded) =
396            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s.as_bytes())
397            && decoded.len() == KEY_LEN
398        {
399            let mut k = [0u8; KEY_LEN];
400            k.copy_from_slice(&decoded);
401            return Ok(Self { bytes: k });
402        }
403        Err(SseError::BadKeyLength { got: bytes.len() })
404    }
405
406    fn as_aes_key(&self) -> &Key<Aes256Gcm> {
407        Key::<Aes256Gcm>::from_slice(&self.bytes)
408    }
409}
410
411impl std::fmt::Debug for SseKey {
412    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413        f.debug_struct("SseKey")
414            .field("len", &KEY_LEN)
415            .field("key", &"<redacted>")
416            .finish()
417    }
418}
419
420/// v0.5 #29: a set of `SseKey`s indexed by `u16` key-id, plus a
421/// designated **active** id used for new encryptions. Rotation is just
422/// "add the new key, flip `active` to its id, leave the old keys for
423/// decryption-only". Cheap to clone (`Arc<SseKey>` per slot).
424#[derive(Clone)]
425pub struct SseKeyring {
426    active: u16,
427    keys: HashMap<u16, Arc<SseKey>>,
428}
429
430impl SseKeyring {
431    /// Create a keyring seeded with one key, immediately marked
432    /// active. Add older keys later via [`SseKeyring::add`] so the
433    /// gateway can still decrypt pre-rotation objects.
434    pub fn new(active: u16, key: Arc<SseKey>) -> Self {
435        let mut keys = HashMap::new();
436        keys.insert(active, key);
437        Self { active, keys }
438    }
439
440    /// Insert another key under id `id`. Does NOT change `active`. If
441    /// `id == active`, the slot is overwritten (useful for tests; in
442    /// production prefer minting a fresh id).
443    pub fn add(&mut self, id: u16, key: Arc<SseKey>) {
444        self.keys.insert(id, key);
445    }
446
447    /// Active (id, key) — used by [`encrypt_v2`] to pick the slot for
448    /// new objects.
449    pub fn active(&self) -> (u16, &SseKey) {
450        let id = self.active;
451        let key = self
452            .keys
453            .get(&id)
454            .expect("active key id must be present in keyring (constructor invariant)");
455        (id, key.as_ref())
456    }
457
458    /// Look up a key by id. Returns `None` for unknown ids — caller
459    /// should surface this as [`SseError::KeyNotInKeyring`].
460    pub fn get(&self, id: u16) -> Option<&SseKey> {
461        self.keys.get(&id).map(Arc::as_ref)
462    }
463}
464
465impl std::fmt::Debug for SseKeyring {
466    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
467        f.debug_struct("SseKeyring")
468            .field("active", &self.active)
469            .field("key_count", &self.keys.len())
470            .field("key_ids", &self.keys.keys().collect::<Vec<_>>())
471            .finish()
472    }
473}
474
475pub type SharedSseKeyring = Arc<SseKeyring>;
476
477/// Encrypt `plaintext` with the given key, producing the on-the-wire
478/// S4E1-framed output: `[magic 4][algo 1][reserved 3][nonce 12][tag 16][ciphertext]`.
479///
480/// Kept for back-compat: v0.4 callers that hand-built an `SseKey` (no
481/// keyring) still get the v1 frame. New code should use
482/// [`encrypt_v2`] which writes S4E2 and supports rotation on read.
483pub fn encrypt(key: &SseKey, plaintext: &[u8]) -> Bytes {
484    let cipher = Aes256Gcm::new(key.as_aes_key());
485    let mut nonce_bytes = [0u8; NONCE_LEN];
486    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
487    let nonce = Nonce::from_slice(&nonce_bytes);
488    // AAD = magic + algo. Tampering with either bumps the tag check.
489    let mut aad = [0u8; 8];
490    aad[..4].copy_from_slice(SSE_MAGIC_V1);
491    aad[4] = ALGO_AES_256_GCM;
492    let ct_with_tag = cipher
493        .encrypt(
494            nonce,
495            Payload {
496                msg: plaintext,
497                aad: &aad,
498            },
499        )
500        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
501    debug_assert!(ct_with_tag.len() >= TAG_LEN);
502    let split = ct_with_tag.len() - TAG_LEN;
503    let (ct, tag) = ct_with_tag.split_at(split);
504
505    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
506    out.extend_from_slice(SSE_MAGIC_V1);
507    out.push(ALGO_AES_256_GCM);
508    out.extend_from_slice(&[0u8; 3]); // reserved
509    out.extend_from_slice(&nonce_bytes);
510    out.extend_from_slice(tag);
511    out.extend_from_slice(ct);
512    Bytes::from(out)
513}
514
515/// v0.5 #29: encrypt under the keyring's currently-active key, writing
516/// an S4E2-framed body (`[magic 4][algo 1][key_id 2 BE][reserved 1]
517/// [nonce 12][tag 16][ciphertext]`). The key-id is included in the
518/// AAD so flipping it fails the auth tag.
519pub fn encrypt_v2(plaintext: &[u8], keyring: &SseKeyring) -> Bytes {
520    let (key_id, key) = keyring.active();
521    let cipher = Aes256Gcm::new(key.as_aes_key());
522    let mut nonce_bytes = [0u8; NONCE_LEN];
523    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
524    let nonce = Nonce::from_slice(&nonce_bytes);
525    let aad = aad_v2(key_id);
526    let ct_with_tag = cipher
527        .encrypt(
528            nonce,
529            Payload {
530                msg: plaintext,
531                aad: &aad,
532            },
533        )
534        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
535    debug_assert!(ct_with_tag.len() >= TAG_LEN);
536    let split = ct_with_tag.len() - TAG_LEN;
537    let (ct, tag) = ct_with_tag.split_at(split);
538
539    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
540    out.extend_from_slice(SSE_MAGIC_V2);
541    out.push(ALGO_AES_256_GCM);
542    out.extend_from_slice(&key_id.to_be_bytes()); // 2B BE key_id
543    out.push(0u8); // 1B reserved
544    out.extend_from_slice(&nonce_bytes);
545    out.extend_from_slice(tag);
546    out.extend_from_slice(ct);
547    Bytes::from(out)
548}
549
550fn aad_v1() -> [u8; 8] {
551    let mut aad = [0u8; 8];
552    aad[..4].copy_from_slice(SSE_MAGIC_V1);
553    aad[4] = ALGO_AES_256_GCM;
554    aad
555}
556
557fn aad_v2(key_id: u16) -> [u8; 8] {
558    let mut aad = [0u8; 8];
559    aad[..4].copy_from_slice(SSE_MAGIC_V2);
560    aad[4] = ALGO_AES_256_GCM;
561    aad[5..7].copy_from_slice(&key_id.to_be_bytes());
562    aad[7] = 0u8;
563    aad
564}
565
566/// AAD for S4E3 = magic (4) + algo (1) + key_md5 (16). Putting the
567/// fingerprint in the AAD means tampering with the stored MD5 (e.g. an
568/// attacker rewriting the header to match a *different* key they
569/// happen to know) breaks the AES-GCM tag — the wrong-key check isn't
570/// just a plain `==` we could be tricked past.
571fn aad_v3(key_md5: &[u8; KEY_MD5_LEN]) -> [u8; 4 + 1 + KEY_MD5_LEN] {
572    let mut aad = [0u8; 4 + 1 + KEY_MD5_LEN];
573    aad[..4].copy_from_slice(SSE_MAGIC_V3);
574    aad[4] = ALGO_AES_256_GCM;
575    aad[5..5 + KEY_MD5_LEN].copy_from_slice(key_md5);
576    aad
577}
578
579/// Parsed + verified SSE-C key material from the three customer
580/// headers. `key_md5` is the MD5 of `key` (we recompute and compare in
581/// [`parse_customer_key_headers`] — clients send their own to catch
582/// transport corruption, but we *trust* our own computation as the
583/// canonical fingerprint in the S4E3 frame).
584#[derive(Clone)]
585pub struct CustomerKeyMaterial {
586    pub key: [u8; KEY_LEN],
587    pub key_md5: [u8; KEY_MD5_LEN],
588}
589
590impl std::fmt::Debug for CustomerKeyMaterial {
591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592        // Don't leak the key into logs. The MD5 is a public fingerprint
593        // (S3 puts it on the wire), so that's safe to show.
594        f.debug_struct("CustomerKeyMaterial")
595            .field("key", &"<redacted>")
596            .field("key_md5_hex", &hex_lower(&self.key_md5))
597            .finish()
598    }
599}
600
601fn hex_lower(bytes: &[u8]) -> String {
602    let mut s = String::with_capacity(bytes.len() * 2);
603    for b in bytes {
604        s.push_str(&format!("{b:02x}"));
605    }
606    s
607}
608
609/// Source of the encryption key for [`encrypt_with_source`] /
610/// [`decrypt`]. SSE-S4 (server-managed, rotation-aware) goes through
611/// `Keyring`; SSE-C (customer-supplied) goes through `CustomerKey`.
612///
613/// Borrowed (not owned) so the caller can hold a long-lived
614/// `CustomerKeyMaterial` next to the request and just lend it for the
615/// duration of one PUT/GET.
616#[derive(Debug, Clone, Copy)]
617pub enum SseSource<'a> {
618    /// Server-managed keyring path → produces / consumes S4E1 (legacy)
619    /// or S4E2 (rotation-aware) frames.
620    Keyring(&'a SseKeyring),
621    /// Client-supplied AES-256 key + its MD5 fingerprint → produces /
622    /// consumes S4E3 frames. The server never persists the key; it
623    /// stores `key_md5` only.
624    CustomerKey {
625        key: &'a [u8; KEY_LEN],
626        key_md5: &'a [u8; KEY_MD5_LEN],
627    },
628    /// SSE-KMS envelope → produces / consumes S4E4 frames. The server
629    /// holds a per-object plaintext DEK (from a fresh
630    /// [`KmsBackend::generate_dek`] call) and the wrapped form to
631    /// persist alongside the body. The DEK is dropped after one
632    /// PUT/GET; only the wrapped form survives at rest.
633    Kms {
634        /// 32-byte plaintext DEK, used as the AES-GCM key.
635        dek: &'a [u8; KEY_LEN],
636        /// Wrapped form to persist in the S4E4 frame (PUT) or the one
637        /// read out of the frame (GET, after a successful unwrap).
638        wrapped: &'a WrappedDek,
639    },
640}
641
642/// Back-compat coercion: existing call sites pass `&SseKeyring`
643/// directly to [`decrypt`]. With this `From` impl the generic bound
644/// `Into<SseSource>` accepts `&SseKeyring` without the caller writing
645/// `.into()`, keeping v0.4 / v0.5 #29 service.rs callers compiling
646/// untouched while v0.5 #27 SSE-C callers pass `SseSource::CustomerKey`
647/// explicitly.
648impl<'a> From<&'a SseKeyring> for SseSource<'a> {
649    fn from(kr: &'a SseKeyring) -> Self {
650        SseSource::Keyring(kr)
651    }
652}
653
654/// service.rs holds keyring as `Option<Arc<SseKeyring>>` and unwraps to
655/// `&Arc<SseKeyring>` — let that coerce too, otherwise every existing
656/// call site needs `.as_ref()` boilerplate.
657impl<'a> From<&'a Arc<SseKeyring>> for SseSource<'a> {
658    fn from(kr: &'a Arc<SseKeyring>) -> Self {
659        SseSource::Keyring(kr.as_ref())
660    }
661}
662
663impl<'a> From<&'a CustomerKeyMaterial> for SseSource<'a> {
664    fn from(m: &'a CustomerKeyMaterial) -> Self {
665        SseSource::CustomerKey {
666            key: &m.key,
667            key_md5: &m.key_md5,
668        }
669    }
670}
671
672/// Parse the three AWS SSE-C headers and return verified key material.
673///
674/// Validates, in order:
675/// 1. `algorithm == "AES256"` (the only value AWS S3 defines).
676/// 2. `key_base64` decodes to exactly 32 bytes (AES-256 key length).
677/// 3. `key_md5_base64` decodes to exactly 16 bytes (MD5 digest length).
678/// 4. The actual MD5 of the decoded key matches the supplied MD5.
679///
680/// Step 4 catches transport corruption *and* a class of programming
681/// bugs where the client signs with one key but uploads another. AWS
682/// S3 also performs this check.
683pub fn parse_customer_key_headers(
684    algorithm: &str,
685    key_base64: &str,
686    key_md5_base64: &str,
687) -> Result<CustomerKeyMaterial, SseError> {
688    use base64::Engine as _;
689    if algorithm != SSE_C_ALGORITHM {
690        return Err(SseError::CustomerKeyAlgorithmUnsupported {
691            algo: algorithm.to_string(),
692        });
693    }
694    let key_bytes = base64::engine::general_purpose::STANDARD
695        .decode(key_base64.trim().as_bytes())
696        .map_err(|_| SseError::InvalidCustomerKey {
697            reason: "base64 decode of key",
698        })?;
699    if key_bytes.len() != KEY_LEN {
700        return Err(SseError::InvalidCustomerKey {
701            reason: "key length (must be 32 bytes after base64 decode)",
702        });
703    }
704    let supplied_md5 = base64::engine::general_purpose::STANDARD
705        .decode(key_md5_base64.trim().as_bytes())
706        .map_err(|_| SseError::InvalidCustomerKey {
707            reason: "base64 decode of key MD5",
708        })?;
709    if supplied_md5.len() != KEY_MD5_LEN {
710        return Err(SseError::InvalidCustomerKey {
711            reason: "key MD5 length (must be 16 bytes after base64 decode)",
712        });
713    }
714    let actual_md5 = compute_key_md5(&key_bytes);
715    // Constant-time compare — paranoia, MD5 is non-secret but the key
716    // it identifies is, so we don't want a timing oracle.
717    if !constant_time_eq(&actual_md5, &supplied_md5) {
718        return Err(SseError::InvalidCustomerKey {
719            reason: "supplied MD5 does not match MD5 of supplied key",
720        });
721    }
722    let mut key = [0u8; KEY_LEN];
723    key.copy_from_slice(&key_bytes);
724    let mut key_md5 = [0u8; KEY_MD5_LEN];
725    key_md5.copy_from_slice(&actual_md5);
726    Ok(CustomerKeyMaterial { key, key_md5 })
727}
728
729/// Convenience wrapper — compute the MD5 fingerprint of a 32-byte
730/// customer key. Callers that already have the bytes (e.g. derived
731/// from a KMS unwrap) can use this to construct a
732/// [`CustomerKeyMaterial`] directly.
733pub fn compute_key_md5(key: &[u8]) -> [u8; KEY_MD5_LEN] {
734    let mut h = Md5::new();
735    h.update(key);
736    let out = h.finalize();
737    let mut md5 = [0u8; KEY_MD5_LEN];
738    md5.copy_from_slice(&out);
739    md5
740}
741
742/// `subtle`-free constant-time byte slice equality. We only need this
743/// at one site (MD5 verification) so pulling `subtle` in feels excessive.
744fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
745    if a.len() != b.len() {
746        return false;
747    }
748    let mut acc: u8 = 0;
749    for (x, y) in a.iter().zip(b.iter()) {
750        acc |= x ^ y;
751    }
752    acc == 0
753}
754
755/// v0.5 #27: encrypt under whichever source the caller picked.
756///
757/// - `SseSource::Keyring` → delegates to [`encrypt_v2`] (S4E2 frame).
758/// - `SseSource::CustomerKey` → writes an S4E3 frame (no key persisted,
759///   just the MD5 fingerprint for GET-side verification).
760///
761/// service.rs picks the source per-request: SSE-C headers present →
762/// `CustomerKey`, otherwise (and only when `--sse-s4-key` is wired) →
763/// `Keyring`. Plaintext objects skip this function entirely.
764pub fn encrypt_with_source(plaintext: &[u8], source: SseSource<'_>) -> Bytes {
765    match source {
766        SseSource::Keyring(kr) => encrypt_v2(plaintext, kr),
767        SseSource::CustomerKey { key, key_md5 } => encrypt_v3(plaintext, key, key_md5),
768        SseSource::Kms { dek, wrapped } => encrypt_v4(plaintext, dek, wrapped),
769    }
770}
771
772fn encrypt_v3(plaintext: &[u8], key: &[u8; KEY_LEN], key_md5: &[u8; KEY_MD5_LEN]) -> Bytes {
773    let aes_key = Key::<Aes256Gcm>::from_slice(key);
774    let cipher = Aes256Gcm::new(aes_key);
775    let mut nonce_bytes = [0u8; NONCE_LEN];
776    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
777    let nonce = Nonce::from_slice(&nonce_bytes);
778    let aad = aad_v3(key_md5);
779    let ct_with_tag = cipher
780        .encrypt(
781            nonce,
782            Payload {
783                msg: plaintext,
784                aad: &aad,
785            },
786        )
787        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
788    debug_assert!(ct_with_tag.len() >= TAG_LEN);
789    let split = ct_with_tag.len() - TAG_LEN;
790    let (ct, tag) = ct_with_tag.split_at(split);
791
792    let mut out = Vec::with_capacity(SSE_HEADER_BYTES_V3 + ct.len());
793    out.extend_from_slice(SSE_MAGIC_V3);
794    out.push(ALGO_AES_256_GCM);
795    out.extend_from_slice(key_md5);
796    out.extend_from_slice(&nonce_bytes);
797    out.extend_from_slice(tag);
798    out.extend_from_slice(ct);
799    Bytes::from(out)
800}
801
802/// v0.5 #29 + v0.5 #27: dispatch on the body's magic and decrypt under
803/// whichever source the caller supplied.
804///
805/// - `S4E1` / `S4E2` require `SseSource::Keyring` (return
806///   [`SseError::CustomerKeyRequired`] for `CustomerKey` — service.rs
807///   should map this to "extraneous SSE-C headers" 400).
808/// - `S4E3` requires `SseSource::CustomerKey` (return
809///   [`SseError::CustomerKeyUnexpected`] for `Keyring` — service.rs
810///   should map this to "missing SSE-C headers" 400).
811///
812/// Generic over `Into<SseSource>` so existing `decrypt(body, &keyring)`
813/// call sites compile unchanged via the `From<&SseKeyring>` impl above
814/// — only the new SSE-C path needs to type out
815/// `SseSource::CustomerKey { .. }`.
816///
817/// Distinct errors (`KeyNotInKeyring`, `DecryptFailed`,
818/// `WrongCustomerKey`) let operators tell rotation gaps, ciphertext
819/// tampering, and SSE-C key mismatch apart in audit logs.
820pub fn decrypt<'a, S: Into<SseSource<'a>>>(body: &[u8], source: S) -> Result<Bytes, SseError> {
821    let source = source.into();
822    // Outer short-check uses the smaller of the two header sizes
823    // (S4E1/S4E2 = 36 bytes). Anything below this can't be any valid
824    // SSE frame regardless of magic — keeps back-compat with v0.4 /
825    // v0.5 #29 callers that expected `TooShort` for absurdly short
826    // inputs even when the magic is garbage.
827    if body.len() < SSE_HEADER_BYTES {
828        return Err(SseError::TooShort { got: body.len() });
829    }
830    let mut magic = [0u8; 4];
831    magic.copy_from_slice(&body[..4]);
832    match &magic {
833        m if m == SSE_MAGIC_V1 || m == SSE_MAGIC_V2 => {
834            let keyring = match source {
835                SseSource::Keyring(kr) => kr,
836                SseSource::CustomerKey { .. } => return Err(SseError::CustomerKeyUnexpected),
837                // S4E1/E2 stored under the keyring → SseSource::Kms
838                // is just as nonsensical as CustomerKey here. Re-use
839                // the same "wrong source" error so service.rs can
840                // map both to AWS S3's "extraneous SSE-* headers"
841                // 400.
842                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
843            };
844            if m == SSE_MAGIC_V1 {
845                decrypt_v1_with_keyring(body, keyring)
846            } else {
847                decrypt_v2_with_keyring(body, keyring)
848            }
849        }
850        m if m == SSE_MAGIC_V3 => {
851            // S4E3 has a larger 49-byte header, so re-check.
852            if body.len() < SSE_HEADER_BYTES_V3 {
853                return Err(SseError::TooShort { got: body.len() });
854            }
855            let (key, key_md5) = match source {
856                SseSource::CustomerKey { key, key_md5 } => (key, key_md5),
857                SseSource::Keyring(_) => return Err(SseError::CustomerKeyRequired),
858                SseSource::Kms { .. } => return Err(SseError::CustomerKeyRequired),
859            };
860            decrypt_v3(body, key, key_md5)
861        }
862        m if m == SSE_MAGIC_V4 => {
863            // SSE-KMS unwrap is async (KMS round-trip required).
864            // Caller must dispatch to `decrypt_with_kms` after
865            // peeking the magic — surface this as a distinct error
866            // rather than silently failing.
867            Err(SseError::KmsAsyncRequired)
868        }
869        m if m == SSE_MAGIC_V5 || m == SSE_MAGIC_V6 => {
870            // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): chunked SSE-S4.
871            // Sync back-compat path — verifies + decrypts every
872            // chunk into a single Bytes. Callers that want true
873            // streaming (per-chunk emit) should use
874            // `decrypt_chunked_stream` instead. SSE-C and SSE-KMS
875            // sources are nonsensical here for the same reason as
876            // S4E2 (server-managed keyring only).
877            let keyring = match source {
878                SseSource::Keyring(kr) => kr,
879                SseSource::CustomerKey { .. } => {
880                    return Err(SseError::CustomerKeyUnexpected);
881                }
882                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
883            };
884            // v0.8.2 #64: route through the back-compat wrapper so
885            // the public `decrypt` signature stays stable while the
886            // pre-allocation guard runs against the gateway's
887            // 5 GiB single-object cap. Bespoke callers (e.g. tests
888            // or future API surfaces that want a tighter limit) can
889            // call `decrypt_chunked_buffered` directly.
890            decrypt_chunked_buffered_default(body, keyring)
891        }
892        _ => Err(SseError::BadMagic { got: magic }),
893    }
894}
895
896fn decrypt_v3(
897    body: &[u8],
898    key: &[u8; KEY_LEN],
899    supplied_md5: &[u8; KEY_MD5_LEN],
900) -> Result<Bytes, SseError> {
901    let algo = body[4];
902    if algo != ALGO_AES_256_GCM {
903        return Err(SseError::UnsupportedAlgo { tag: algo });
904    }
905    let mut stored_md5 = [0u8; KEY_MD5_LEN];
906    stored_md5.copy_from_slice(&body[5..5 + KEY_MD5_LEN]);
907    // Cheap fingerprint check first — if the supplied key has a
908    // different MD5 than what was used at PUT, fail fast with a
909    // dedicated error. AES-GCM auth would also catch this (different
910    // key → bad tag) but the bespoke error gives operators an audit
911    // signal distinct from "ciphertext was tampered with".
912    if !constant_time_eq(supplied_md5, &stored_md5) {
913        return Err(SseError::WrongCustomerKey);
914    }
915    let nonce_off = 5 + KEY_MD5_LEN;
916    let tag_off = nonce_off + NONCE_LEN;
917    let mut nonce_bytes = [0u8; NONCE_LEN];
918    nonce_bytes.copy_from_slice(&body[nonce_off..nonce_off + NONCE_LEN]);
919    let mut tag_bytes = [0u8; TAG_LEN];
920    tag_bytes.copy_from_slice(&body[tag_off..tag_off + TAG_LEN]);
921    let ct = &body[SSE_HEADER_BYTES_V3..];
922
923    let aad = aad_v3(&stored_md5);
924    let nonce = Nonce::from_slice(&nonce_bytes);
925    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
926    ct_with_tag.extend_from_slice(ct);
927    ct_with_tag.extend_from_slice(&tag_bytes);
928
929    let aes_key = Key::<Aes256Gcm>::from_slice(key);
930    let cipher = Aes256Gcm::new(aes_key);
931    let plain = cipher
932        .decrypt(
933            nonce,
934            Payload {
935                msg: &ct_with_tag,
936                aad: &aad,
937            },
938        )
939        .map_err(|_| SseError::DecryptFailed)?;
940    Ok(Bytes::from(plain))
941}
942
943/// AAD for S4E4 = magic (4) + algo (1) + key_id_len (1) + key_id +
944/// wrapped_dek_len (4 BE) + wrapped_dek. Putting the variable-length
945/// key_id and wrapped_dek into the AAD means an attacker cannot
946/// rewrite either field to redirect the gateway to a different KEK
947/// or wrapped DEK without invalidating the body's AES-GCM tag.
948///
949/// Length-prefixing key_id and wrapped_dek inside the AAD prevents a
950/// canonicalisation ambiguity: without the length prefix, an
951/// attacker could shift bytes between the two fields and produce the
952/// same AAD bytestream, defeating the per-field tampering check.
953fn aad_v4(key_id: &[u8], wrapped_dek: &[u8]) -> Vec<u8> {
954    let mut aad = Vec::with_capacity(4 + 1 + 1 + key_id.len() + 4 + wrapped_dek.len());
955    aad.extend_from_slice(SSE_MAGIC_V4);
956    aad.push(ALGO_AES_256_GCM);
957    aad.push(key_id.len() as u8);
958    aad.extend_from_slice(key_id);
959    aad.extend_from_slice(&(wrapped_dek.len() as u32).to_be_bytes());
960    aad.extend_from_slice(wrapped_dek);
961    aad
962}
963
964fn encrypt_v4(plaintext: &[u8], dek: &[u8; KEY_LEN], wrapped: &WrappedDek) -> Bytes {
965    // Pre-conditions: key_id must fit in a u8 length prefix and be
966    // non-empty (an empty id means we wouldn't be able to re-fetch
967    // the KEK on GET). wrapped_dek length fits in u32 by the same
968    // logic — at u32::MAX bytes you have bigger problems. We assert
969    // these in debug and silently truncate-or-panic in release; in
970    // practice key_id is a UUID or ARN (<256 chars) and wrapped_dek
971    // is 60 bytes (LocalKms) or ~200 bytes (AWS KMS).
972    assert!(
973        !wrapped.key_id.is_empty() && wrapped.key_id.len() <= u8::MAX as usize,
974        "S4E4 key_id must be 1..=255 bytes (got {})",
975        wrapped.key_id.len()
976    );
977    assert!(
978        wrapped.ciphertext.len() <= u32::MAX as usize,
979        "S4E4 wrapped_dek longer than u32::MAX",
980    );
981
982    let aes_key = Key::<Aes256Gcm>::from_slice(dek);
983    let cipher = Aes256Gcm::new(aes_key);
984    let mut nonce_bytes = [0u8; NONCE_LEN];
985    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
986    let nonce = Nonce::from_slice(&nonce_bytes);
987    let aad = aad_v4(wrapped.key_id.as_bytes(), &wrapped.ciphertext);
988    let ct_with_tag = cipher
989        .encrypt(
990            nonce,
991            Payload {
992                msg: plaintext,
993                aad: &aad,
994            },
995        )
996        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
997    debug_assert!(ct_with_tag.len() >= TAG_LEN);
998    let split = ct_with_tag.len() - TAG_LEN;
999    let (ct, tag) = ct_with_tag.split_at(split);
1000
1001    let key_id_bytes = wrapped.key_id.as_bytes();
1002    let mut out = Vec::with_capacity(
1003        4 + 1
1004            + 1
1005            + key_id_bytes.len()
1006            + 4
1007            + wrapped.ciphertext.len()
1008            + NONCE_LEN
1009            + TAG_LEN
1010            + ct.len(),
1011    );
1012    out.extend_from_slice(SSE_MAGIC_V4);
1013    out.push(ALGO_AES_256_GCM);
1014    out.push(key_id_bytes.len() as u8);
1015    out.extend_from_slice(key_id_bytes);
1016    out.extend_from_slice(&(wrapped.ciphertext.len() as u32).to_be_bytes());
1017    out.extend_from_slice(&wrapped.ciphertext);
1018    out.extend_from_slice(&nonce_bytes);
1019    out.extend_from_slice(tag);
1020    out.extend_from_slice(ct);
1021    Bytes::from(out)
1022}
1023
1024/// Parsed view of an S4E4 frame's variable-length header. Returned
1025/// by [`parse_s4e4_header`] so both the async [`decrypt_with_kms`]
1026/// path and any future inspection code (e.g. an admin tool that
1027/// needs to enumerate object → KMS-key bindings) can reuse the same
1028/// parser without re-implementing offset math.
1029#[derive(Debug)]
1030pub struct S4E4Header<'a> {
1031    pub key_id: &'a str,
1032    pub wrapped_dek: &'a [u8],
1033    pub nonce: &'a [u8],
1034    pub tag: &'a [u8],
1035    pub ciphertext: &'a [u8],
1036}
1037
1038/// Parse the (variable-length) S4E4 header. Pure byte-shuffling — no
1039/// crypto, no KMS round-trip. Returns errors on truncation /
1040/// out-of-bounds field lengths / non-UTF-8 key_id.
1041pub fn parse_s4e4_header(body: &[u8]) -> Result<S4E4Header<'_>, SseError> {
1042    // Minimum: magic(4) + algo(1) + key_id_len(1) + key_id(>=1) +
1043    // wrapped_dek_len(4) + wrapped_dek(>=1) + nonce(12) + tag(16)
1044    // = 40 bytes. We use a slightly looser floor here (bytes for
1045    // empty fields = 38) and let the per-field bounds checks below
1046    // catch the actual short reads.
1047    const S4E4_MIN: usize = 4 + 1 + 1 + 4 + NONCE_LEN + TAG_LEN; // 38
1048    if body.len() < S4E4_MIN {
1049        return Err(SseError::KmsFrameTooShort {
1050            got: body.len(),
1051            min: S4E4_MIN,
1052        });
1053    }
1054    let magic = &body[..4];
1055    if magic != SSE_MAGIC_V4 {
1056        let mut got = [0u8; 4];
1057        got.copy_from_slice(magic);
1058        return Err(SseError::BadMagic { got });
1059    }
1060    let algo = body[4];
1061    if algo != ALGO_AES_256_GCM {
1062        return Err(SseError::UnsupportedAlgo { tag: algo });
1063    }
1064    let key_id_len = body[5] as usize;
1065    let key_id_off: usize = 6;
1066    let key_id_end = key_id_off
1067        .checked_add(key_id_len)
1068        .ok_or(SseError::KmsFrameFieldOob { what: "key_id_len" })?;
1069    if key_id_end + 4 > body.len() {
1070        return Err(SseError::KmsFrameFieldOob { what: "key_id" });
1071    }
1072    let key_id = std::str::from_utf8(&body[key_id_off..key_id_end])
1073        .map_err(|_| SseError::KmsKeyIdNotUtf8)?;
1074    let wrapped_len_off = key_id_end;
1075    let wrapped_dek_len = u32::from_be_bytes([
1076        body[wrapped_len_off],
1077        body[wrapped_len_off + 1],
1078        body[wrapped_len_off + 2],
1079        body[wrapped_len_off + 3],
1080    ]) as usize;
1081    let wrapped_off = wrapped_len_off + 4;
1082    let wrapped_end =
1083        wrapped_off
1084            .checked_add(wrapped_dek_len)
1085            .ok_or(SseError::KmsFrameFieldOob {
1086                what: "wrapped_dek_len",
1087            })?;
1088    if wrapped_end + NONCE_LEN + TAG_LEN > body.len() {
1089        return Err(SseError::KmsFrameFieldOob {
1090            what: "wrapped_dek",
1091        });
1092    }
1093    let wrapped_dek = &body[wrapped_off..wrapped_end];
1094    let nonce_off = wrapped_end;
1095    let tag_off = nonce_off + NONCE_LEN;
1096    let ct_off = tag_off + TAG_LEN;
1097    let nonce = &body[nonce_off..nonce_off + NONCE_LEN];
1098    let tag = &body[tag_off..tag_off + TAG_LEN];
1099    let ciphertext = &body[ct_off..];
1100    Ok(S4E4Header {
1101        key_id,
1102        wrapped_dek,
1103        nonce,
1104        tag,
1105        ciphertext,
1106    })
1107}
1108
1109/// Async decrypt for S4E4 (SSE-KMS) bodies. Caller supplies the KMS
1110/// backend; this function parses the frame, calls
1111/// `kms.decrypt_dek(...)` to unwrap the DEK, then runs AES-256-GCM
1112/// to recover the plaintext.
1113///
1114/// service.rs's GET handler should peek the magic with [`peek_magic`]
1115/// and dispatch:
1116///
1117/// - `Some("S4E4")` → `decrypt_with_kms(blob, &*kms).await`
1118/// - everything else → existing sync `decrypt(blob, source)`
1119///
1120/// Note: we don't go through `SseSource::Kms` here because the
1121/// wrapped DEK + key_id come from the frame itself, not from the
1122/// request — the `SseSource` is built for sync paths where the
1123/// caller already knows the key.
1124pub async fn decrypt_with_kms(body: &[u8], kms: &dyn KmsBackend) -> Result<Bytes, SseError> {
1125    let hdr = parse_s4e4_header(body)?;
1126    let wrapped = WrappedDek {
1127        key_id: hdr.key_id.to_string(),
1128        ciphertext: hdr.wrapped_dek.to_vec(),
1129    };
1130    let dek_vec = kms.decrypt_dek(&wrapped).await?;
1131    if dek_vec.len() != KEY_LEN {
1132        // KMS returned a non-32-byte plaintext. AES-256 needs exactly
1133        // 32 bytes. This shouldn't happen with `KeySpec=AES_256` but
1134        // surface as a backend error so it's auditable rather than
1135        // panicking.
1136        return Err(SseError::KmsBackend(KmsError::BackendUnavailable {
1137            message: format!(
1138                "KMS returned {} byte DEK; expected {KEY_LEN}",
1139                dek_vec.len()
1140            ),
1141        }));
1142    }
1143    let mut dek = [0u8; KEY_LEN];
1144    dek.copy_from_slice(&dek_vec);
1145
1146    let aad = aad_v4(hdr.key_id.as_bytes(), hdr.wrapped_dek);
1147    let aes_key = Key::<Aes256Gcm>::from_slice(&dek);
1148    let cipher = Aes256Gcm::new(aes_key);
1149    let nonce = Nonce::from_slice(hdr.nonce);
1150    let mut ct_with_tag = Vec::with_capacity(hdr.ciphertext.len() + TAG_LEN);
1151    ct_with_tag.extend_from_slice(hdr.ciphertext);
1152    ct_with_tag.extend_from_slice(hdr.tag);
1153    let plain = cipher
1154        .decrypt(
1155            nonce,
1156            Payload {
1157                msg: &ct_with_tag,
1158                aad: &aad,
1159            },
1160        )
1161        .map_err(|_| SseError::DecryptFailed)?;
1162    Ok(Bytes::from(plain))
1163}
1164
1165fn decrypt_v1_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1166    let algo = body[4];
1167    if algo != ALGO_AES_256_GCM {
1168        return Err(SseError::UnsupportedAlgo { tag: algo });
1169    }
1170    // body[5..8] reserved (must be ignored — v0.4 wrote zeros, but we
1171    // didn't auth them so we can't insist on it).
1172    let mut nonce_bytes = [0u8; NONCE_LEN];
1173    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1174    let mut tag_bytes = [0u8; TAG_LEN];
1175    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1176    let ct = &body[SSE_HEADER_BYTES..];
1177
1178    let aad = aad_v1();
1179    let nonce = Nonce::from_slice(&nonce_bytes);
1180    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1181    ct_with_tag.extend_from_slice(ct);
1182    ct_with_tag.extend_from_slice(&tag_bytes);
1183
1184    // Active key first, then any others. v0.4 deployments that flip to
1185    // v0.5 with their original key as active hit this path on the
1186    // first try for every legacy object.
1187    let (active_id, _active_key) = keyring.active();
1188    let mut ids: Vec<u16> = keyring.keys.keys().copied().collect();
1189    ids.sort_by_key(|id| if *id == active_id { 0 } else { 1 });
1190    for id in ids {
1191        let key = keyring.get(id).expect("id came from keyring iteration");
1192        let cipher = Aes256Gcm::new(key.as_aes_key());
1193        if let Ok(plain) = cipher.decrypt(
1194            nonce,
1195            Payload {
1196                msg: &ct_with_tag,
1197                aad: &aad,
1198            },
1199        ) {
1200            return Ok(Bytes::from(plain));
1201        }
1202    }
1203    Err(SseError::DecryptFailed)
1204}
1205
1206fn decrypt_v2_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1207    let algo = body[4];
1208    if algo != ALGO_AES_256_GCM {
1209        return Err(SseError::UnsupportedAlgo { tag: algo });
1210    }
1211    let key_id = u16::from_be_bytes([body[5], body[6]]);
1212    // body[7] reserved (1B), authenticated as 0 via AAD.
1213    let key = keyring
1214        .get(key_id)
1215        .ok_or(SseError::KeyNotInKeyring { id: key_id })?;
1216    let mut nonce_bytes = [0u8; NONCE_LEN];
1217    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1218    let mut tag_bytes = [0u8; TAG_LEN];
1219    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1220    let ct = &body[SSE_HEADER_BYTES..];
1221
1222    let aad = aad_v2(key_id);
1223    let nonce = Nonce::from_slice(&nonce_bytes);
1224    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1225    ct_with_tag.extend_from_slice(ct);
1226    ct_with_tag.extend_from_slice(&tag_bytes);
1227    let cipher = Aes256Gcm::new(key.as_aes_key());
1228    let plain = cipher
1229        .decrypt(
1230            nonce,
1231            Payload {
1232                msg: &ct_with_tag,
1233                aad: &aad,
1234            },
1235        )
1236        .map_err(|_| SseError::DecryptFailed)?;
1237    Ok(Bytes::from(plain))
1238}
1239
1240/// Detect whether `body` is SSE-S4 encrypted (S4E1, S4E2, S4E3, or
1241/// S4E4) by sniffing the first 4 magic bytes. Used by the GET path
1242/// to decide whether to run decryption before frame parsing.
1243///
1244/// We require a length check that's safe for *any* of the four
1245/// frames — `SSE_HEADER_BYTES` (36) is the smallest valid header
1246/// (S4E1 / S4E2). S4E3 is 49 bytes; S4E4 is variable but always >=
1247/// 38 bytes. The per-frame decrypt path re-checks the appropriate
1248/// minimum, so this 36-byte gate is just a fast rejection of
1249/// obviously-too-short bodies.
1250pub fn looks_encrypted(body: &[u8]) -> bool {
1251    if body.len() < SSE_HEADER_BYTES {
1252        return false;
1253    }
1254    let m = &body[..4];
1255    m == SSE_MAGIC_V1
1256        || m == SSE_MAGIC_V2
1257        || m == SSE_MAGIC_V3
1258        || m == SSE_MAGIC_V4
1259        || m == SSE_MAGIC_V5
1260        || m == SSE_MAGIC_V6
1261}
1262
1263/// Peek the SSE-S4 magic at the front of `body`, returning a
1264/// stringified frame variant identifier or `None` if `body` is not
1265/// recognized as SSE-S4. Used by the GET path to dispatch between
1266/// the sync [`decrypt`] (S4E1/E2/E3) and the async
1267/// [`decrypt_with_kms`] (S4E4).
1268///
1269/// Returns the same length-gated result as [`looks_encrypted`]: any
1270/// body shorter than `SSE_HEADER_BYTES` (36 bytes) returns `None`,
1271/// so the caller can use this as both the "is encrypted" signal and
1272/// the "which frame" signal in one cheap byte-comparison.
1273pub fn peek_magic(body: &[u8]) -> Option<&'static str> {
1274    if body.len() < SSE_HEADER_BYTES {
1275        return None;
1276    }
1277    match &body[..4] {
1278        m if m == SSE_MAGIC_V1 => Some("S4E1"),
1279        m if m == SSE_MAGIC_V2 => Some("S4E2"),
1280        m if m == SSE_MAGIC_V3 => Some("S4E3"),
1281        m if m == SSE_MAGIC_V4 => Some("S4E4"),
1282        // v0.8 #52: chunked SSE-S4. service.rs's GET handler
1283        // dispatches "S4E5" / "S4E6" → `decrypt_chunked_stream`
1284        // for true streaming GET; the sync `decrypt(...)` also
1285        // accepts both (back-compat — buffered concat).
1286        m if m == SSE_MAGIC_V5 => Some("S4E5"),
1287        // v0.8.1 #57: same dispatch as S4E5 — wider salt only.
1288        m if m == SSE_MAGIC_V6 => Some("S4E6"),
1289        _ => None,
1290    }
1291}
1292
1293pub type SharedSseKey = Arc<SseKey>;
1294
1295// ===========================================================================
1296// v0.8 #52 (S4E5, read-only) + v0.8.1 #57 (S4E6, current emit) —
1297// chunked variant of S4E2 for streaming GET
1298// ===========================================================================
1299//
1300// ## S4E5 wire format (v0.8 #52, **read-only as of v0.8.1 #57**)
1301//
1302// ```text
1303// magic         4B    "S4E5"
1304// algo          1B    0x01 (AES-256-GCM)
1305// key_id        2B    BE — keyring slot the active key was at PUT time
1306// reserved      1B    0x00
1307// chunk_size    4B    BE — plaintext bytes per chunk (final chunk may be smaller)
1308// chunk_count   4B    BE — total chunks (always >= 1; empty plaintext = 1 zero-byte chunk)
1309// salt          4B    random per-PUT, mixed into every nonce
1310// [chunk_count] × {
1311//   tag         16B   AES-GCM auth tag for this chunk
1312//   ciphertext  N B   chunk_size bytes (final chunk: 0..=chunk_size bytes)
1313// }
1314// ```
1315//
1316// Fixed header = 20 bytes ([`S4E5_HEADER_BYTES`]).
1317//
1318// ## S4E6 wire format (v0.8.1 #57, current PUT emit)
1319//
1320// ```text
1321// magic         4B    "S4E6"
1322// algo          1B    0x01 (AES-256-GCM)
1323// key_id        2B    BE
1324// reserved      1B    0x00
1325// chunk_size    4B    BE
1326// chunk_count   4B    BE
1327// salt          8B    random per-PUT  ← 4B → 8B widened
1328// [chunk_count] × { tag 16B, ciphertext N B }
1329// ```
1330//
1331// Fixed header = 24 bytes ([`S4E6_HEADER_BYTES`]). Chunk array
1332// layout is byte-identical to S4E5; only the header (salt 4 → 8)
1333// and the nonce/AAD derivation differ.
1334//
1335// ## Per-chunk overhead (both S4E5 and S4E6)
1336//
1337// 16 bytes — just the AES-GCM auth tag. AES-GCM is CTR-mode, so
1338// `ciphertext.len() == plaintext.len()`. Total overhead for an
1339// N-byte plaintext at chunk size C: `header + ceil(N/C) * 16`.
1340//
1341// ## S4E5 nonce / AAD (read-only)
1342//
1343// ```text
1344// nonce_v5[0..4]  = b"E5\x00\x00"
1345// nonce_v5[4..8]  = salt (4 B)
1346// nonce_v5[8..12] = chunk_index BE (u32)
1347//
1348// aad_v5 = b"S4E5" || algo (1) || chunk_index BE (4) || total BE (4)
1349//        || key_id BE (2) || salt (4)
1350// ```
1351//
1352// Birthday-collision threshold on the 4-byte salt: ~50% at ~65,536
1353// distinct PUTs under the same key — the security regression that
1354// motivated #57.
1355//
1356// ## S4E6 nonce / AAD (current emit)
1357//
1358// ```text
1359// nonce_v6[0]     = b'E'                   (1 B fixed prefix)
1360// nonce_v6[1..9]  = salt (8 B)             (per-PUT random from OsRng)
1361// nonce_v6[9..12] = chunk_index BE (u24)   (3 B → max 16_777_215 chunks)
1362//
1363// aad_v6 = b"S4E6" || algo (1) || chunk_index BE (4) || total BE (4)
1364//        || key_id BE (2) || salt (8)
1365// ```
1366//
1367// Wider salt: birthday collision ~50% at ~2^32 = ~4.3 billion
1368// PUTs/key — four orders of magnitude over S4E5.
1369//
1370// chunk_index narrows from 32-bit to 24-bit, capping `chunk_count`
1371// at `2^24 - 1 = 16_777_215`. At the default `--sse-chunk-size
1372// 1048576` (1 MiB) that's ~16 PiB per object — three orders of
1373// magnitude over S3's 5 GiB single-object cap. Smaller chunk sizes
1374// need to be sized carefully: e.g. `--sse-chunk-size 64` on a
1375// > 1 GiB object would exceed the cap (1 GiB / 64 B = 16M+1
1376// chunks); such configurations surface
1377// [`SseError::ChunkCountTooLarge`] at PUT time rather than
1378// silently truncating.
1379//
1380// AAD on both variants includes the chunk index + total so chunk
1381// reordering or dropping fails the per-chunk tag, plus key_id +
1382// salt so header tampering also fails auth.
1383
1384/// Fixed header size of an S4E5 frame, before any chunks. `magic 4 +
1385/// algo 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 +
1386/// salt 4` = 20 bytes.
1387pub const S4E5_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 4; // = 20
1388
1389/// Per-chunk overhead inside an S4E5 / S4E6 frame: just the AES-GCM
1390/// auth tag. `ciphertext.len() == plaintext.len()` (CTR mode), so a
1391/// chunk of N plaintext bytes costs N + 16 on disk.
1392pub const S4E5_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16
1393
1394/// v0.8.1 #57: fixed header size of an S4E6 frame. Same layout as
1395/// S4E5 except the per-PUT salt widens 4 → 8 bytes: `magic 4 + algo
1396/// 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 + salt
1397/// 8` = 24 bytes.
1398pub const S4E6_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 8; // = 24
1399
1400/// v0.8.1 #57: per-chunk overhead for S4E6. Identical to S4E5
1401/// (same AES-GCM tag size). Re-exported as a distinct const so call
1402/// sites that compute on-disk size for S4E6 specifically can spell
1403/// the magic clearly in their arithmetic.
1404pub const S4E6_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16
1405
1406/// v0.8.1 #57: maximum `chunk_count` that fits in the S4E6 nonce's
1407/// 24-bit chunk_index field. At 1 MiB chunks this is ~16 PiB per
1408/// object — three orders of magnitude over S3's 5 GiB single-object
1409/// cap, so it's not a practical limit at the default chunk size.
1410pub const S4E6_MAX_CHUNK_COUNT: u32 = (1u32 << 24) - 1; // 16_777_215
1411
1412/// 4-byte fixed prefix of every S4E5 nonce. Distinct from the bytes
1413/// a random S4E1/E2 nonce could plausibly start with so debugging
1414/// dumps can immediately tell "this is a chunked nonce" from the
1415/// first 4 bytes.
1416const S4E5_NONCE_TAG: [u8; 4] = [b'E', b'5', 0, 0];
1417
1418/// 1-byte fixed prefix of every S4E6 nonce. Trades 3 of S4E5's 4
1419/// "tag" bytes for 4 extra salt bytes (4 → 8) and 0 of the chunk
1420/// index bytes (24-bit instead of 32-bit). The remaining `b'E'`
1421/// keeps debug dumps recognizable as "chunked SSE-S4 nonce".
1422const S4E6_NONCE_PREFIX: u8 = b'E';
1423
1424/// Variant tag for the chunked-frame helpers. Selects the nonce +
1425/// AAD derivation (and incidentally the salt width). The
1426/// chunk-array layout is byte-identical for both — only the header
1427/// size and the nonce/AAD derivation differ.
1428#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1429enum ChunkedVariant {
1430    V5,
1431    V6,
1432}
1433
1434impl ChunkedVariant {
1435    fn header_bytes(self) -> usize {
1436        match self {
1437            ChunkedVariant::V5 => S4E5_HEADER_BYTES,
1438            ChunkedVariant::V6 => S4E6_HEADER_BYTES,
1439        }
1440    }
1441}
1442
1443/// Build the per-chunk AAD for an S4E5 chunk. Includes magic + algo
1444/// plus the structural chunk_index/total_chunks (so chunk reordering
1445/// fails auth) plus key_id + salt (so header tampering — flipping
1446/// key_id or salt — also fails auth).
1447fn aad_v5(
1448    chunk_index: u32,
1449    total_chunks: u32,
1450    key_id: u16,
1451    salt: &[u8; 4],
1452) -> [u8; 4 + 1 + 4 + 4 + 2 + 4] {
1453    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 4]; // = 19
1454    aad[..4].copy_from_slice(SSE_MAGIC_V5);
1455    aad[4] = ALGO_AES_256_GCM;
1456    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1457    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1458    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1459    aad[15..19].copy_from_slice(salt);
1460    aad
1461}
1462
1463/// v0.8.1 #57: per-chunk AAD for S4E6. Same structural fields as
1464/// [`aad_v5`] (magic + algo + chunk_index + total + key_id + salt)
1465/// but with the wider 8-byte salt and the new `b"S4E6"` magic, so
1466/// an attacker can't strip the version tag and replay an S4E5
1467/// nonce/tag against an S4E6 frame.
1468fn aad_v6(
1469    chunk_index: u32,
1470    total_chunks: u32,
1471    key_id: u16,
1472    salt: &[u8; 8],
1473) -> [u8; 4 + 1 + 4 + 4 + 2 + 8] {
1474    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 8]; // = 23
1475    aad[..4].copy_from_slice(SSE_MAGIC_V6);
1476    aad[4] = ALGO_AES_256_GCM;
1477    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1478    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1479    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1480    aad[15..23].copy_from_slice(salt);
1481    aad
1482}
1483
1484/// Derive the 12-byte AES-GCM nonce for chunk `chunk_index` from the
1485/// per-PUT `salt`. Pure function; no RNG state — the same `(salt,
1486/// chunk_index)` always yields the same nonce, which is the whole
1487/// point: GET reads `salt` from the header and walks the chunks
1488/// without storing 12 bytes of nonce per chunk.
1489fn nonce_v5(salt: &[u8; 4], chunk_index: u32) -> [u8; NONCE_LEN] {
1490    let mut n = [0u8; NONCE_LEN];
1491    n[..4].copy_from_slice(&S4E5_NONCE_TAG);
1492    n[4..8].copy_from_slice(salt);
1493    n[8..12].copy_from_slice(&chunk_index.to_be_bytes());
1494    n
1495}
1496
1497/// v0.8.1 #57: derive the 12-byte AES-GCM nonce for an S4E6 chunk:
1498/// `b'E'(1) || salt(8) || chunk_index_BE_u24(3)`. The 24-bit
1499/// chunk_index caps `chunk_count` at 16,777,215 — see
1500/// [`S4E6_MAX_CHUNK_COUNT`]. The pre-encrypt path enforces this cap
1501/// and surfaces [`SseError::ChunkCountTooLarge`], so this function
1502/// only ever sees `chunk_index <= 0xFF_FFFF` (the leading byte of
1503/// the BE u32 is dropped).
1504fn nonce_v6(salt: &[u8; 8], chunk_index: u32) -> [u8; NONCE_LEN] {
1505    debug_assert!(
1506        chunk_index <= S4E6_MAX_CHUNK_COUNT,
1507        "S4E6 chunk_index {chunk_index} exceeds 24-bit cap (caller MUST validate)",
1508    );
1509    let mut n = [0u8; NONCE_LEN];
1510    n[0] = S4E6_NONCE_PREFIX;
1511    n[1..9].copy_from_slice(salt);
1512    let be = chunk_index.to_be_bytes(); // [b3, b2, b1, b0] of u32
1513    // Take the low 3 bytes (b2, b1, b0) — the high byte is 0 by the
1514    // S4E6_MAX_CHUNK_COUNT cap above.
1515    n[9..12].copy_from_slice(&be[1..4]);
1516    n
1517}
1518
1519/// v0.8 #52 / v0.8.1 #57: encrypt `plaintext` under `keyring`'s
1520/// active key, sliced into independently-sealed AES-GCM chunks of
1521/// `chunk_size` plaintext bytes each. Returns the on-the-wire
1522/// **S4E6** frame (v0.8.1 #57 widened the per-PUT salt 4 B → 8 B;
1523/// the S4E5 emit path was retired but the [`decrypt`] /
1524/// [`decrypt_chunked_stream`] paths still read S4E5 objects for
1525/// back-compat).
1526///
1527/// Errors:
1528/// - [`SseError::ChunkSizeInvalid`] if `chunk_size == 0`.
1529/// - [`SseError::ChunkCountTooLarge`] if
1530///   `ceil(plaintext.len() / chunk_size) > 16_777_215` (the S4E6
1531///   24-bit chunk_index cap; pick a larger `--sse-chunk-size`).
1532///
1533/// Empty plaintext is permitted and produces a frame with
1534/// `chunk_count = 1, ciphertext_len = 0` (one all-tag chunk). That
1535/// keeps the GET chunk-walk loop simpler — it never has to
1536/// special-case zero chunks.
1537///
1538/// `chunk_size` is the *plaintext* bytes per chunk; the on-disk
1539/// ciphertext per chunk is the same number (AES-GCM is CTR-mode),
1540/// plus the 16-byte tag prepended.
1541pub fn encrypt_v2_chunked(
1542    plaintext: &[u8],
1543    keyring: &SseKeyring,
1544    chunk_size: usize,
1545) -> Result<Bytes, SseError> {
1546    if chunk_size == 0 {
1547        return Err(SseError::ChunkSizeInvalid);
1548    }
1549    let (key_id, key) = keyring.active();
1550    let cipher = Aes256Gcm::new(key.as_aes_key());
1551    let mut salt = [0u8; 8];
1552    rand::rngs::OsRng.fill_bytes(&mut salt);
1553
1554    // Always emit at least one chunk (so an empty plaintext still
1555    // has a well-defined header → chunk_count >= 1 invariant).
1556    let chunk_count_usize = if plaintext.is_empty() {
1557        1
1558    } else {
1559        plaintext.len().div_ceil(chunk_size)
1560    };
1561    // Saturating-cast to u32 so we report ChunkCountTooLarge cleanly
1562    // for inputs that would overflow u32 too (would need a > 16 EiB
1563    // plaintext at chunk_size = 1 — astronomical, but defensive).
1564    let chunk_count: u32 = u32::try_from(chunk_count_usize).unwrap_or(u32::MAX);
1565    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1566        return Err(SseError::ChunkCountTooLarge {
1567            got: chunk_count,
1568            max: S4E6_MAX_CHUNK_COUNT,
1569        });
1570    }
1571
1572    let mut out = Vec::with_capacity(
1573        S4E6_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E6_PER_CHUNK_OVERHEAD),
1574    );
1575    out.extend_from_slice(SSE_MAGIC_V6);
1576    out.push(ALGO_AES_256_GCM);
1577    out.extend_from_slice(&key_id.to_be_bytes());
1578    out.push(0u8); // reserved
1579    out.extend_from_slice(&(chunk_size as u32).to_be_bytes());
1580    out.extend_from_slice(&chunk_count.to_be_bytes());
1581    out.extend_from_slice(&salt);
1582
1583    for i in 0..chunk_count {
1584        let off = (i as usize).saturating_mul(chunk_size);
1585        let end = off.saturating_add(chunk_size).min(plaintext.len());
1586        let chunk_pt: &[u8] = if off >= plaintext.len() {
1587            // Empty-plaintext / past-end (only the single-chunk
1588            // empty-plaintext case lands here).
1589            &[]
1590        } else {
1591            &plaintext[off..end]
1592        };
1593        let nonce_bytes = nonce_v6(&salt, i);
1594        let nonce = Nonce::from_slice(&nonce_bytes);
1595        let aad = aad_v6(i, chunk_count, key_id, &salt);
1596        let ct_with_tag = cipher
1597            .encrypt(
1598                nonce,
1599                Payload {
1600                    msg: chunk_pt,
1601                    aad: &aad,
1602                },
1603            )
1604            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
1605        debug_assert!(ct_with_tag.len() >= TAG_LEN);
1606        let split = ct_with_tag.len() - TAG_LEN;
1607        let (ct, tag) = ct_with_tag.split_at(split);
1608        out.extend_from_slice(tag);
1609        out.extend_from_slice(ct);
1610        crate::metrics::record_sse_streaming_chunk("encrypt");
1611    }
1612    Ok(Bytes::from(out))
1613}
1614
1615/// Salt material for a chunked frame — branches on variant so the
1616/// shared chunk-walking loop can carry both 4-byte (S4E5) and
1617/// 8-byte (S4E6) salts without an extra heap alloc.
1618#[derive(Debug, Clone, Copy)]
1619enum ChunkedSalt {
1620    V5([u8; 4]),
1621    V6([u8; 8]),
1622}
1623
1624/// Parsed S4E5 / S4E6 header — fixed-layout fields. Used by the
1625/// buffered ([`decrypt_chunked_buffered`]) and streaming
1626/// ([`decrypt_chunked_stream`]) paths to share frame validation
1627/// across both variants.
1628#[derive(Debug, Clone, Copy)]
1629struct ChunkedHeader {
1630    /// Used only by tests today (asserts on which frame variant
1631    /// parsed); in production the variant is implicit in
1632    /// `salt`'s ChunkedSalt arm. Kept as a field rather than
1633    /// re-deriving from `salt` so the parser writes one source of
1634    /// truth.
1635    #[allow(dead_code)]
1636    variant: ChunkedVariant,
1637    key_id: u16,
1638    chunk_size: u32,
1639    chunk_count: u32,
1640    salt: ChunkedSalt,
1641    /// Byte offset where the chunk array starts (always
1642    /// `variant.header_bytes()`; carried in the struct so call sites
1643    /// don't have to re-derive it from the variant tag).
1644    chunks_offset: usize,
1645}
1646
1647/// Parsed view of an S4E6 frame's fixed header. Public mirror of
1648/// the S4E4 parser — useful for admin tools or future inspectors
1649/// that want to enumerate object → key_id bindings without
1650/// re-implementing the offset math. The `salt` borrow keeps
1651/// allocations to zero (the slice points back into the input
1652/// buffer).
1653#[derive(Debug, Clone, Copy)]
1654pub struct S4E6Header<'a> {
1655    pub key_id: u16,
1656    pub chunk_size: u32,
1657    pub chunk_count: u32,
1658    pub salt: &'a [u8; 8],
1659}
1660
1661/// Pure byte-shuffle parser for an S4E6 fixed header (24 bytes). No
1662/// crypto, no keyring lookup. Errors on truncation, wrong magic,
1663/// unsupported algo, or zero `chunk_size` / `chunk_count`.
1664pub fn parse_s4e6_header(blob: &[u8]) -> Result<S4E6Header<'_>, SseError> {
1665    if blob.len() < S4E6_HEADER_BYTES {
1666        return Err(SseError::ChunkFrameTruncated { what: "header" });
1667    }
1668    if &blob[..4] != SSE_MAGIC_V6 {
1669        let mut got = [0u8; 4];
1670        got.copy_from_slice(&blob[..4]);
1671        return Err(SseError::BadMagic { got });
1672    }
1673    let algo = blob[4];
1674    if algo != ALGO_AES_256_GCM {
1675        return Err(SseError::UnsupportedAlgo { tag: algo });
1676    }
1677    let key_id = u16::from_be_bytes([blob[5], blob[6]]);
1678    // blob[7] = reserved (0; authenticated as 0 via AAD).
1679    let chunk_size = u32::from_be_bytes([blob[8], blob[9], blob[10], blob[11]]);
1680    let chunk_count = u32::from_be_bytes([blob[12], blob[13], blob[14], blob[15]]);
1681    if chunk_size == 0 {
1682        return Err(SseError::ChunkSizeInvalid);
1683    }
1684    if chunk_count == 0 {
1685        return Err(SseError::ChunkFrameTruncated {
1686            what: "chunk_count == 0",
1687        });
1688    }
1689    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1690        return Err(SseError::ChunkCountTooLarge {
1691            got: chunk_count,
1692            max: S4E6_MAX_CHUNK_COUNT,
1693        });
1694    }
1695    let salt: &[u8; 8] = (&blob[16..24]).try_into().expect("8B salt slice");
1696    Ok(S4E6Header {
1697        key_id,
1698        chunk_size,
1699        chunk_count,
1700        salt,
1701    })
1702}
1703
1704fn parse_chunked_header(body: &[u8], max_body_bytes: usize) -> Result<ChunkedHeader, SseError> {
1705    if body.len() < 4 {
1706        return Err(SseError::ChunkFrameTruncated { what: "magic" });
1707    }
1708    let magic = &body[..4];
1709    let variant = if magic == SSE_MAGIC_V5 {
1710        ChunkedVariant::V5
1711    } else if magic == SSE_MAGIC_V6 {
1712        ChunkedVariant::V6
1713    } else {
1714        let mut got = [0u8; 4];
1715        got.copy_from_slice(magic);
1716        return Err(SseError::BadMagic { got });
1717    };
1718    let header_bytes = variant.header_bytes();
1719    if body.len() < header_bytes {
1720        return Err(SseError::ChunkFrameTruncated { what: "header" });
1721    }
1722    let algo = body[4];
1723    if algo != ALGO_AES_256_GCM {
1724        return Err(SseError::UnsupportedAlgo { tag: algo });
1725    }
1726    let key_id = u16::from_be_bytes([body[5], body[6]]);
1727    // body[7] = reserved (must be 0; authenticated as 0 via AAD).
1728    let chunk_size = u32::from_be_bytes([body[8], body[9], body[10], body[11]]);
1729    let chunk_count = u32::from_be_bytes([body[12], body[13], body[14], body[15]]);
1730    if chunk_size == 0 {
1731        return Err(SseError::ChunkSizeInvalid);
1732    }
1733    if chunk_count == 0 {
1734        return Err(SseError::ChunkFrameTruncated {
1735            what: "chunk_count == 0",
1736        });
1737    }
1738    let salt = match variant {
1739        ChunkedVariant::V5 => {
1740            let mut s = [0u8; 4];
1741            s.copy_from_slice(&body[16..20]);
1742            ChunkedSalt::V5(s)
1743        }
1744        ChunkedVariant::V6 => {
1745            // v0.8.1 #57 sanity check: the encoder enforces this cap,
1746            // but a tampered / malicious frame could declare a huge
1747            // chunk_count that would loop the walker 16M+ times if
1748            // we trusted it. Reject early.
1749            if chunk_count > S4E6_MAX_CHUNK_COUNT {
1750                return Err(SseError::ChunkCountTooLarge {
1751                    got: chunk_count,
1752                    max: S4E6_MAX_CHUNK_COUNT,
1753                });
1754            }
1755            let mut s = [0u8; 8];
1756            s.copy_from_slice(&body[16..24]);
1757            ChunkedSalt::V6(s)
1758        }
1759    };
1760
1761    // v0.8.2 #64 — DoS guard. Pre-validate the declared
1762    // (chunk_size × chunk_count) plaintext size in u64 before *any*
1763    // downstream allocation or chunk-walk loop. Rejects:
1764    //   1. arithmetic that would overflow u64 (e.g. chunk_size =
1765    //      u32::MAX, chunk_count = u32::MAX → 2^64 ≫ u64::MAX),
1766    //   2. a body that is *larger* than the maximum a header with
1767    //      these declared sizes could plausibly carry (i.e. trailing
1768    //      bytes after the final chunk → corruption / append attack),
1769    //      and
1770    //   3. a plaintext size larger than the caller's configured
1771    //      `max_body_bytes` (default 5 GiB).
1772    //
1773    // We do NOT require the body to be `>= max_total` here — the
1774    // final chunk may legitimately hold fewer than `chunk_size`
1775    // plaintext bytes (encoder uses
1776    // `chunk_count = ceil(plaintext.len() / chunk_size)`, so the last
1777    // chunk holds the remainder). The walker handles that
1778    // variable-length tail; truncation of the final chunk's tag /
1779    // ciphertext surfaces as `ChunkFrameTruncated` once the walker
1780    // gets there. The pre-alloc guard's job is the *upper-bound*
1781    // checks: kill obviously-impossible shapes before we
1782    // `Vec::with_capacity(chunk_size × chunk_count)`.
1783    //
1784    // All paths return concrete error variants; nothing panics on
1785    // adversarial input. The error strings carry only constant
1786    // operator-readable labels (no echoed numbers) so the response is
1787    // not a tampering oracle for the attacker.
1788    let chunk_size_u64 = chunk_size as u64;
1789    let chunk_count_u64 = chunk_count as u64;
1790    let expected_plain_size =
1791        chunk_size_u64
1792            .checked_mul(chunk_count_u64)
1793            .ok_or(SseError::ChunkFrameTooLarge {
1794                details: "chunk_size * chunk_count overflows u64",
1795            })?;
1796    let per_chunk_overhead = S4E5_PER_CHUNK_OVERHEAD as u64; // = 16 (AES-GCM tag)
1797    let total_tag_overhead =
1798        per_chunk_overhead
1799            .checked_mul(chunk_count_u64)
1800            .ok_or(SseError::ChunkFrameTooLarge {
1801                details: "tag_len * chunk_count overflows u64",
1802            })?;
1803    let max_total = expected_plain_size
1804        .checked_add(total_tag_overhead)
1805        .and_then(|t| t.checked_add(header_bytes as u64))
1806        .ok_or(SseError::ChunkFrameTooLarge {
1807            details: "header + plaintext + tag overhead overflows u64",
1808        })?;
1809    // Body cannot be *larger* than what the declared geometry could
1810    // legitimately produce. Any extra bytes past the final chunk are
1811    // either trailing junk (corruption) or an append attack — kill
1812    // them here so the walker doesn't even start verifying chunks
1813    // for an obviously-malformed body. (The walker's own end-of-loop
1814    // `cursor != body.len()` check also catches this; the pre-alloc
1815    // guard saves the chunk-walk worth of AES-GCM verifies in the
1816    // hostile case.)
1817    if (body.len() as u64) > max_total {
1818        return Err(SseError::ChunkFrameTruncated {
1819            what: "trailing bytes past declared chunk geometry",
1820        });
1821    }
1822    // The actual DoS guard: cap on declared plaintext. If the header
1823    // claims more than the operator's configured single-object
1824    // ceiling, refuse before the buffered path's
1825    // `Vec::with_capacity(chunk_size * chunk_count)` runs.
1826    if expected_plain_size > max_body_bytes as u64 {
1827        return Err(SseError::ChunkFrameTooLarge {
1828            details: "declared plaintext exceeds gateway max_body_bytes",
1829        });
1830    }
1831
1832    Ok(ChunkedHeader {
1833        variant,
1834        key_id,
1835        chunk_size,
1836        chunk_count,
1837        salt,
1838        chunks_offset: header_bytes,
1839    })
1840}
1841
1842/// Decrypt one chunk under either the S4E5 or S4E6 derivation. Used
1843/// by both the buffered and streaming paths so AAD / nonce
1844/// derivation lives in exactly one place.
1845fn decrypt_chunked_chunk(
1846    cipher: &Aes256Gcm,
1847    chunk_index: u32,
1848    chunk_count: u32,
1849    key_id: u16,
1850    salt: &ChunkedSalt,
1851    tag: &[u8; TAG_LEN],
1852    ct: &[u8],
1853) -> Result<Bytes, SseError> {
1854    let nonce_bytes = match salt {
1855        ChunkedSalt::V5(s) => nonce_v5(s, chunk_index),
1856        ChunkedSalt::V6(s) => nonce_v6(s, chunk_index),
1857    };
1858    let nonce = Nonce::from_slice(&nonce_bytes);
1859    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1860    ct_with_tag.extend_from_slice(ct);
1861    ct_with_tag.extend_from_slice(tag);
1862    let result = match salt {
1863        ChunkedSalt::V5(s) => {
1864            let aad = aad_v5(chunk_index, chunk_count, key_id, s);
1865            cipher.decrypt(
1866                nonce,
1867                Payload {
1868                    msg: &ct_with_tag,
1869                    aad: &aad,
1870                },
1871            )
1872        }
1873        ChunkedSalt::V6(s) => {
1874            let aad = aad_v6(chunk_index, chunk_count, key_id, s);
1875            cipher.decrypt(
1876                nonce,
1877                Payload {
1878                    msg: &ct_with_tag,
1879                    aad: &aad,
1880                },
1881            )
1882        }
1883    };
1884    result
1885        .map(Bytes::from)
1886        .map_err(|_| SseError::ChunkAuthFailed { chunk_index })
1887}
1888
1889/// Walk an S4E5 / S4E6 body chunk-by-chunk, calling `emit` on each
1890/// successfully-verified plaintext chunk. Returns immediately on the
1891/// first chunk that fails auth or is truncated. Shared core between
1892/// the buffered ([`decrypt_chunked_buffered`]) and streaming
1893/// ([`decrypt_chunked_stream`]) paths.
1894fn walk_chunked<F: FnMut(Bytes) -> Result<(), SseError>>(
1895    body: &[u8],
1896    keyring: &SseKeyring,
1897    max_body_bytes: usize,
1898    mut emit: F,
1899) -> Result<(), SseError> {
1900    let hdr = parse_chunked_header(body, max_body_bytes)?;
1901    let key = keyring
1902        .get(hdr.key_id)
1903        .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
1904    let cipher = Aes256Gcm::new(key.as_aes_key());
1905
1906    let mut cursor = hdr.chunks_offset;
1907    let chunk_size = hdr.chunk_size as usize;
1908    for i in 0..hdr.chunk_count {
1909        if cursor + TAG_LEN > body.len() {
1910            return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
1911        }
1912        let tag_off = cursor;
1913        let ct_off = tag_off + TAG_LEN;
1914        let is_last = i + 1 == hdr.chunk_count;
1915        let ct_len = if is_last {
1916            if ct_off > body.len() {
1917                return Err(SseError::ChunkFrameTruncated {
1918                    what: "final chunk ciphertext",
1919                });
1920            }
1921            let remaining = body.len() - ct_off;
1922            if remaining > chunk_size {
1923                return Err(SseError::ChunkFrameTruncated {
1924                    what: "trailing bytes after final chunk",
1925                });
1926            }
1927            remaining
1928        } else {
1929            chunk_size
1930        };
1931        let ct_end = ct_off + ct_len;
1932        if ct_end > body.len() {
1933            return Err(SseError::ChunkFrameTruncated {
1934                what: "chunk ciphertext",
1935            });
1936        }
1937        let mut tag = [0u8; TAG_LEN];
1938        tag.copy_from_slice(&body[tag_off..ct_off]);
1939        let ct = &body[ct_off..ct_end];
1940        let plain =
1941            decrypt_chunked_chunk(&cipher, i, hdr.chunk_count, hdr.key_id, &hdr.salt, &tag, ct)?;
1942        crate::metrics::record_sse_streaming_chunk("decrypt");
1943        emit(plain)?;
1944        cursor = ct_end;
1945    }
1946    if cursor != body.len() {
1947        return Err(SseError::ChunkFrameTruncated {
1948            what: "trailing bytes after declared chunk_count",
1949        });
1950    }
1951    Ok(())
1952}
1953
1954/// Sync back-compat path: decrypt every chunk and concatenate into
1955/// a single `Bytes`. Memory peak = full plaintext (defeats the
1956/// point of S4E5/S4E6 streaming, but useful for callers that already
1957/// need the whole body — e.g. server-side restream-rewrite paths or
1958/// unit tests). Accepts both S4E5 (legacy) and S4E6 (current) bodies.
1959///
1960/// `max_body_bytes` (v0.8.2 #64) caps the declared plaintext size in
1961/// the header **before any allocation**. A malicious / corrupted
1962/// frame that claims a multi-PB plaintext is rejected as
1963/// [`SseError::ChunkFrameTooLarge`] with no `Vec::with_capacity` call
1964/// behind it. Callers without a bespoke cap should use
1965/// [`decrypt_chunked_buffered_default`] (5 GiB cap).
1966pub fn decrypt_chunked_buffered(
1967    body: &[u8],
1968    keyring: &SseKeyring,
1969    max_body_bytes: usize,
1970) -> Result<Bytes, SseError> {
1971    let hdr = parse_chunked_header(body, max_body_bytes)?;
1972    // Header is validated above: chunk_size * chunk_count fits u64
1973    // and is ≤ max_body_bytes (≤ 5 GiB on this gateway), so the
1974    // `as usize` casts cannot truncate on a 64-bit host. The
1975    // `Vec::with_capacity` is therefore bounded by the same cap that
1976    // protects the rest of the gateway from oversized PUTs / GETs.
1977    let mut out = Vec::with_capacity(hdr.chunk_size as usize * hdr.chunk_count as usize);
1978    walk_chunked(body, keyring, max_body_bytes, |chunk| {
1979        out.extend_from_slice(&chunk);
1980        Ok(())
1981    })?;
1982    Ok(Bytes::from(out))
1983}
1984
1985/// Back-compat wrapper around [`decrypt_chunked_buffered`] that uses
1986/// [`DEFAULT_MAX_BODY_BYTES`] (5 GiB — AWS S3's single-object PUT
1987/// ceiling). Provided so the existing `decrypt(body, keyring)`
1988/// dispatcher and call sites that don't have a bespoke cap can keep
1989/// their signature unchanged after the v0.8.2 #64 pre-allocation
1990/// guard landed.
1991pub fn decrypt_chunked_buffered_default(
1992    body: &[u8],
1993    keyring: &SseKeyring,
1994) -> Result<Bytes, SseError> {
1995    decrypt_chunked_buffered(body, keyring, DEFAULT_MAX_BODY_BYTES)
1996}
1997
1998/// v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): stream-decrypt API for
1999/// chunked SSE-S4 bodies. Returns a [`futures::Stream`] that yields
2000/// one `Bytes` per chunk in order. Each chunk is emitted only after
2001/// AES-GCM tag verify succeeds, so the client never sees plaintext
2002/// bytes that haven't been authenticated. A failing chunk yields
2003/// its [`SseError::ChunkAuthFailed`] (with the chunk index) and ends
2004/// the stream — earlier chunks may already have left the gateway,
2005/// which matches the standard streaming-AEAD trade-off (operators
2006/// MUST alert on the audit log + metric, not rely on connection
2007/// close to guarantee atomicity).
2008///
2009/// Accepts either S4E5 (v0.8 #52, legacy) or S4E6 (v0.8.1 #57,
2010/// current) magic. Non-chunked magic surfaces as
2011/// [`SseError::BadMagic`] / [`SseError::ChunkFrameTruncated`] on
2012/// the first poll — the stream is "fail-fast" rather than "fall
2013/// through to S4E2 buffered decrypt", because the caller has
2014/// already dispatched on [`peek_magic`] by the time it hands a body
2015/// to this function.
2016///
2017/// `body` is owned by the returned stream so the caller doesn't
2018/// need to keep the bytes alive separately. The returned stream is
2019/// `'static` — the `keyring` borrow is consumed up front to extract
2020/// the per-frame key and build the AES cipher (which owns its key
2021/// material), so the caller's keyring may be dropped immediately.
2022pub fn decrypt_chunked_stream(
2023    body: bytes::Bytes,
2024    keyring: &SseKeyring,
2025) -> impl futures::Stream<Item = Result<Bytes, SseError>> + 'static {
2026    use futures::stream::{self, StreamExt};
2027
2028    // Cheap pre-validation: parse the header + look up the key
2029    // once, up front, so a malformed frame surfaces on the first
2030    // poll instead of being deferred behind the first-chunk loop.
2031    // The `keyring` borrow ends here — we extract the AES key into
2032    // the owned `Aes256Gcm` cipher, then store that in the stream
2033    // state.
2034    let prelude = (|| {
2035        // v0.8.2 #64: streaming path is naturally memory-bounded
2036        // (one chunk-worth of plaintext at a time), so we don't cap
2037        // the *declared* total here — the per-chunk loop already
2038        // bounds memory by `chunk_size`. The truncation /
2039        // arithmetic-overflow checks inside `parse_chunked_header`
2040        // still run regardless of `max_body_bytes`, which is what
2041        // protects the streaming path from the same DoS class as the
2042        // buffered path (the overflow / declared-vs-actual length
2043        // mismatches are independent of the gateway cap).
2044        let hdr = parse_chunked_header(&body, usize::MAX)?;
2045        let key = keyring
2046            .get(hdr.key_id)
2047            .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
2048        let cipher = Aes256Gcm::new(key.as_aes_key());
2049        Ok::<_, SseError>((hdr, cipher))
2050    })();
2051
2052    match prelude {
2053        Err(e) => stream::iter(std::iter::once(Err(e))).left_stream(),
2054        Ok((hdr, cipher)) => {
2055            let chunks_offset = hdr.chunks_offset;
2056            let state = ChunkedDecryptState {
2057                body,
2058                cipher,
2059                hdr,
2060                cursor: chunks_offset,
2061                next_index: 0,
2062            };
2063            stream::try_unfold(state, decrypt_next_chunk).right_stream()
2064        }
2065    }
2066}
2067
2068/// Per-stream state for [`decrypt_chunked_stream`]. Holds the owned
2069/// `body` (so the stream stays self-contained), the prepared
2070/// cipher, and the cursor position into the chunk array.
2071struct ChunkedDecryptState {
2072    body: bytes::Bytes,
2073    cipher: Aes256Gcm,
2074    hdr: ChunkedHeader,
2075    cursor: usize,
2076    next_index: u32,
2077}
2078
2079async fn decrypt_next_chunk(
2080    mut state: ChunkedDecryptState,
2081) -> Result<Option<(Bytes, ChunkedDecryptState)>, SseError> {
2082    if state.next_index >= state.hdr.chunk_count {
2083        // Final boundary check — anything past the declared
2084        // chunk_count would be a truncation / append attack.
2085        if state.cursor != state.body.len() {
2086            return Err(SseError::ChunkFrameTruncated {
2087                what: "trailing bytes after declared chunk_count",
2088            });
2089        }
2090        return Ok(None);
2091    }
2092    let i = state.next_index;
2093    let chunk_size = state.hdr.chunk_size as usize;
2094    if state.cursor + TAG_LEN > state.body.len() {
2095        return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
2096    }
2097    let tag_off = state.cursor;
2098    let ct_off = tag_off + TAG_LEN;
2099    let is_last = i + 1 == state.hdr.chunk_count;
2100    let ct_len = if is_last {
2101        if ct_off > state.body.len() {
2102            return Err(SseError::ChunkFrameTruncated {
2103                what: "final chunk ciphertext",
2104            });
2105        }
2106        let remaining = state.body.len() - ct_off;
2107        if remaining > chunk_size {
2108            return Err(SseError::ChunkFrameTruncated {
2109                what: "trailing bytes after final chunk",
2110            });
2111        }
2112        remaining
2113    } else {
2114        chunk_size
2115    };
2116    let ct_end = ct_off + ct_len;
2117    if ct_end > state.body.len() {
2118        return Err(SseError::ChunkFrameTruncated {
2119            what: "chunk ciphertext",
2120        });
2121    }
2122    let mut tag = [0u8; TAG_LEN];
2123    tag.copy_from_slice(&state.body[tag_off..ct_off]);
2124    let ct = &state.body[ct_off..ct_end];
2125    let plain = decrypt_chunked_chunk(
2126        &state.cipher,
2127        i,
2128        state.hdr.chunk_count,
2129        state.hdr.key_id,
2130        &state.hdr.salt,
2131        &tag,
2132        ct,
2133    )?;
2134    crate::metrics::record_sse_streaming_chunk("decrypt");
2135    state.cursor = ct_end;
2136    state.next_index += 1;
2137    Ok(Some((plain, state)))
2138}
2139
2140/// v0.8.1 #57: build an S4E5 frame. Identical body structure to the
2141/// pre-#57 `encrypt_v2_chunked` (4-byte salt + S4E5 magic + V5
2142/// nonce/AAD); kept around purely so the back-compat-read tests can
2143/// synthesize a "v0.8.0 vintage" blob and prove the new gateway
2144/// still decrypts it.
2145#[cfg(test)]
2146fn encrypt_v2_chunked_s4e5_for_test(
2147    plaintext: &[u8],
2148    keyring: &SseKeyring,
2149    chunk_size: usize,
2150) -> Result<Bytes, SseError> {
2151    if chunk_size == 0 {
2152        return Err(SseError::ChunkSizeInvalid);
2153    }
2154    let (key_id, key) = keyring.active();
2155    let cipher = Aes256Gcm::new(key.as_aes_key());
2156    let mut salt = [0u8; 4];
2157    rand::rngs::OsRng.fill_bytes(&mut salt);
2158
2159    let chunk_count: u32 = if plaintext.is_empty() {
2160        1
2161    } else {
2162        plaintext
2163            .len()
2164            .div_ceil(chunk_size)
2165            .try_into()
2166            .expect("chunk_count overflows u32")
2167    };
2168
2169    let mut out = Vec::with_capacity(
2170        S4E5_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E5_PER_CHUNK_OVERHEAD),
2171    );
2172    out.extend_from_slice(SSE_MAGIC_V5);
2173    out.push(ALGO_AES_256_GCM);
2174    out.extend_from_slice(&key_id.to_be_bytes());
2175    out.push(0u8);
2176    out.extend_from_slice(&(chunk_size as u32).to_be_bytes());
2177    out.extend_from_slice(&chunk_count.to_be_bytes());
2178    out.extend_from_slice(&salt);
2179
2180    for i in 0..chunk_count {
2181        let off = (i as usize).saturating_mul(chunk_size);
2182        let end = off.saturating_add(chunk_size).min(plaintext.len());
2183        let chunk_pt: &[u8] = if off >= plaintext.len() {
2184            &[]
2185        } else {
2186            &plaintext[off..end]
2187        };
2188        let nonce_bytes = nonce_v5(&salt, i);
2189        let nonce = Nonce::from_slice(&nonce_bytes);
2190        let aad = aad_v5(i, chunk_count, key_id, &salt);
2191        let ct_with_tag = cipher
2192            .encrypt(
2193                nonce,
2194                Payload {
2195                    msg: chunk_pt,
2196                    aad: &aad,
2197                },
2198            )
2199            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
2200        let split = ct_with_tag.len() - TAG_LEN;
2201        let (ct, tag) = ct_with_tag.split_at(split);
2202        out.extend_from_slice(tag);
2203        out.extend_from_slice(ct);
2204    }
2205    Ok(Bytes::from(out))
2206}
2207
2208#[cfg(test)]
2209mod tests {
2210    use super::*;
2211
2212    fn key32(seed: u8) -> Arc<SseKey> {
2213        Arc::new(SseKey::from_bytes(&[seed; 32]).unwrap())
2214    }
2215
2216    fn keyring_single(seed: u8) -> SseKeyring {
2217        SseKeyring::new(1, key32(seed))
2218    }
2219
2220    #[test]
2221    fn roundtrip_basic_v1() {
2222        // back-compat single-key API — still works.
2223        let k = SseKey::from_bytes(&[7u8; 32]).unwrap();
2224        let pt = b"the quick brown fox jumps over the lazy dog";
2225        let ct = encrypt(&k, pt);
2226        assert!(looks_encrypted(&ct));
2227        assert_eq!(&ct[..4], SSE_MAGIC_V1);
2228        assert_eq!(ct[4], ALGO_AES_256_GCM);
2229        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2230        // decrypt via single-key keyring
2231        let kr = SseKeyring::new(1, Arc::new(k));
2232        let pt2 = decrypt(&ct, &kr).unwrap();
2233        assert_eq!(pt2.as_ref(), pt);
2234    }
2235
2236    #[test]
2237    fn s4e2_roundtrip_active_key() {
2238        let kr = keyring_single(7);
2239        let pt = b"S4E2 active-key roundtrip";
2240        let ct = encrypt_v2(pt, &kr);
2241        assert_eq!(&ct[..4], SSE_MAGIC_V2);
2242        assert_eq!(ct[4], ALGO_AES_256_GCM);
2243        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE");
2244        assert_eq!(ct[7], 0, "reserved byte");
2245        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2246        assert!(looks_encrypted(&ct));
2247        let pt2 = decrypt(&ct, &kr).unwrap();
2248        assert_eq!(pt2.as_ref(), pt);
2249    }
2250
2251    #[test]
2252    fn decrypt_s4e1_via_active_only_keyring() {
2253        // v0.4 wrote S4E1 with key K; v0.5 keyring has K as the only
2254        // (active) key. Decrypt must succeed.
2255        let k_arc = key32(11);
2256        let legacy_ct = encrypt(&k_arc, b"v0.4 vintage object");
2257        assert_eq!(&legacy_ct[..4], SSE_MAGIC_V1);
2258        let kr = SseKeyring::new(1, Arc::clone(&k_arc));
2259        let plain = decrypt(&legacy_ct, &kr).unwrap();
2260        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2261    }
2262
2263    #[test]
2264    fn decrypt_s4e2_under_old_key_after_rotation() {
2265        // Rotation flow: object was encrypted under key id=1 when 1
2266        // was active. Operator rotates to active=2 and keeps 1 in the
2267        // keyring. The S4E2 body must still decrypt.
2268        let k1 = key32(1);
2269        let k2 = key32(2);
2270        let mut kr_old = SseKeyring::new(1, Arc::clone(&k1));
2271        let ct = encrypt_v2(b"old-rotation object", &kr_old);
2272        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2273
2274        // After rotation: active=2, but key 1 still in ring.
2275        kr_old.add(2, Arc::clone(&k2));
2276        let mut kr_new = SseKeyring::new(2, Arc::clone(&k2));
2277        kr_new.add(1, Arc::clone(&k1));
2278
2279        let plain = decrypt(&ct, &kr_new).unwrap();
2280        assert_eq!(plain.as_ref(), b"old-rotation object");
2281
2282        // And new PUTs go to id 2 (active).
2283        let new_ct = encrypt_v2(b"new-rotation object", &kr_new);
2284        assert_eq!(u16::from_be_bytes([new_ct[5], new_ct[6]]), 2);
2285        let plain_new = decrypt(&new_ct, &kr_new).unwrap();
2286        assert_eq!(plain_new.as_ref(), b"new-rotation object");
2287    }
2288
2289    #[test]
2290    fn s4e2_unknown_key_id_errors() {
2291        let kr = keyring_single(3); // only id=1 present
2292        let kr_other = SseKeyring::new(99, key32(3));
2293        let ct = encrypt_v2(b"x", &kr_other); // body claims key_id=99
2294        let err = decrypt(&ct, &kr).unwrap_err();
2295        assert!(
2296            matches!(err, SseError::KeyNotInKeyring { id: 99 }),
2297            "got {err:?}"
2298        );
2299    }
2300
2301    #[test]
2302    fn s4e2_tampered_key_id_fails_auth() {
2303        let kr = SseKeyring::new(1, key32(4));
2304        let mut kr_with_2 = kr.clone();
2305        kr_with_2.add(2, key32(5)); // a real but wrong key under id=2
2306        let mut ct = encrypt_v2(b"do not flip my key id", &kr).to_vec();
2307        // Flip key_id from 1 → 2 in the header. The keyring HAS a key
2308        // for 2, so the lookup succeeds — but AAD authenticates the
2309        // original key_id, so AES-GCM tag verification must fail.
2310        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2311        ct[5] = 0;
2312        ct[6] = 2;
2313        let err = decrypt(&ct, &kr_with_2).unwrap_err();
2314        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2315    }
2316
2317    #[test]
2318    fn s4e2_tampered_ciphertext_fails() {
2319        let kr = SseKeyring::new(7, key32(9));
2320        let mut ct = encrypt_v2(b"secret message v2", &kr).to_vec();
2321        let last = ct.len() - 1;
2322        ct[last] ^= 0x01;
2323        let err = decrypt(&ct, &kr).unwrap_err();
2324        assert!(matches!(err, SseError::DecryptFailed));
2325    }
2326
2327    #[test]
2328    fn s4e2_tampered_algo_byte_fails() {
2329        let kr = SseKeyring::new(1, key32(2));
2330        let mut ct = encrypt_v2(b"hi", &kr).to_vec();
2331        ct[4] = 99;
2332        let err = decrypt(&ct, &kr).unwrap_err();
2333        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2334    }
2335
2336    #[test]
2337    fn wrong_key_fails_v1_via_keyring() {
2338        // S4E1 written under key K1; keyring has only K2 → DecryptFailed.
2339        let k1 = SseKey::from_bytes(&[1u8; 32]).unwrap();
2340        let ct = encrypt(&k1, b"secret");
2341        let kr_wrong = SseKeyring::new(1, Arc::new(SseKey::from_bytes(&[2u8; 32]).unwrap()));
2342        let err = decrypt(&ct, &kr_wrong).unwrap_err();
2343        assert!(matches!(err, SseError::DecryptFailed));
2344    }
2345
2346    #[test]
2347    fn rejects_short_body() {
2348        let kr = SseKeyring::new(1, key32(1));
2349        let err = decrypt(b"short", &kr).unwrap_err();
2350        assert!(matches!(err, SseError::TooShort { got: 5 }));
2351    }
2352
2353    #[test]
2354    fn looks_encrypted_passthrough_returns_false() {
2355        // S4F2 frame magic, NOT S4E1 / S4E2 — must not be confused.
2356        let f2 = b"S4F2\x01\x00\x00\x00........................................";
2357        assert!(!looks_encrypted(f2));
2358        assert!(!looks_encrypted(b""));
2359    }
2360
2361    #[test]
2362    fn looks_encrypted_detects_both_v1_and_v2() {
2363        let kr = SseKeyring::new(1, key32(8));
2364        let v1 = encrypt(&SseKey::from_bytes(&[8u8; 32]).unwrap(), b"x");
2365        let v2 = encrypt_v2(b"x", &kr);
2366        assert!(looks_encrypted(&v1));
2367        assert!(looks_encrypted(&v2));
2368    }
2369
2370    #[test]
2371    fn key_from_hex_string() {
2372        let bad =
2373            SseKey::from_bytes(b"0102030405060708090a0b0c0d0e0f10111213141516171819202122232425")
2374                .unwrap_err();
2375        assert!(matches!(bad, SseError::BadKeyLength { .. }));
2376        let good = b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2377        let _ = SseKey::from_bytes(good).expect("64-char hex should parse");
2378    }
2379
2380    #[test]
2381    fn encrypt_v2_uses_random_nonce() {
2382        let kr = SseKeyring::new(1, key32(3));
2383        let pt = b"deterministic input";
2384        let a = encrypt_v2(pt, &kr);
2385        let b = encrypt_v2(pt, &kr);
2386        assert_ne!(a, b, "nonce must be random per-call");
2387    }
2388
2389    #[test]
2390    fn keyring_active_and_get() {
2391        let k1 = key32(1);
2392        let k2 = key32(2);
2393        let mut kr = SseKeyring::new(1, Arc::clone(&k1));
2394        kr.add(2, Arc::clone(&k2));
2395        let (id, active) = kr.active();
2396        assert_eq!(id, 1);
2397        assert_eq!(active.bytes, [1u8; 32]);
2398        assert!(kr.get(2).is_some());
2399        assert!(kr.get(3).is_none());
2400    }
2401
2402    // -----------------------------------------------------------------
2403    // v0.5 #27 — SSE-C (customer-provided key, S4E3 frame) tests
2404    // -----------------------------------------------------------------
2405
2406    use base64::Engine as _;
2407
2408    fn cust_key(seed: u8) -> CustomerKeyMaterial {
2409        let key = [seed; KEY_LEN];
2410        let key_md5 = compute_key_md5(&key);
2411        CustomerKeyMaterial { key, key_md5 }
2412    }
2413
2414    #[test]
2415    fn s4e3_roundtrip_happy_path() {
2416        let m = cust_key(42);
2417        let pt = b"top-secret SSE-C payload";
2418        let ct = encrypt_with_source(
2419            pt,
2420            SseSource::CustomerKey {
2421                key: &m.key,
2422                key_md5: &m.key_md5,
2423            },
2424        );
2425        // Frame inspection.
2426        assert_eq!(&ct[..4], SSE_MAGIC_V3);
2427        assert_eq!(ct[4], ALGO_AES_256_GCM);
2428        assert_eq!(&ct[5..5 + KEY_MD5_LEN], &m.key_md5);
2429        assert_eq!(ct.len(), SSE_HEADER_BYTES_V3 + pt.len());
2430        assert!(looks_encrypted(&ct));
2431        // Decrypt round-trip.
2432        let plain = decrypt(
2433            &ct,
2434            SseSource::CustomerKey {
2435                key: &m.key,
2436                key_md5: &m.key_md5,
2437            },
2438        )
2439        .unwrap();
2440        assert_eq!(plain.as_ref(), pt);
2441        // And via the From impl on &CustomerKeyMaterial.
2442        let plain2 = decrypt(&ct, &m).unwrap();
2443        assert_eq!(plain2.as_ref(), pt);
2444    }
2445
2446    #[test]
2447    fn s4e3_wrong_key_yields_wrong_customer_key_error() {
2448        let m = cust_key(1);
2449        let other = cust_key(2);
2450        let ct = encrypt_with_source(b"payload", (&m).into());
2451        let err = decrypt(
2452            &ct,
2453            SseSource::CustomerKey {
2454                key: &other.key,
2455                key_md5: &other.key_md5,
2456            },
2457        )
2458        .unwrap_err();
2459        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2460    }
2461
2462    #[test]
2463    fn s4e3_tampered_stored_md5_is_caught() {
2464        // Attacker rewrites the stored MD5 to match a key they know.
2465        // Even though the supplied (attacker) key matches the rewritten
2466        // MD5, AES-GCM authenticates the ORIGINAL md5 via AAD, so the
2467        // tag check fails. Surface: WrongCustomerKey if the supplied
2468        // md5 != stored md5 (this test), or DecryptFailed if attacker
2469        // also rewrites their supplied md5 to match.
2470        let m = cust_key(7);
2471        let mut ct = encrypt_with_source(b"victim payload", (&m).into()).to_vec();
2472        // Flip a byte in the stored fingerprint.
2473        ct[5] ^= 0x55;
2474        // Client supplies the original (unmodified) key + md5.
2475        let err = decrypt(
2476            &ct,
2477            SseSource::CustomerKey {
2478                key: &m.key,
2479                key_md5: &m.key_md5,
2480            },
2481        )
2482        .unwrap_err();
2483        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2484    }
2485
2486    #[test]
2487    fn s4e3_tampered_md5_with_matching_supplied_md5_fails_aead() {
2488        // Both stored md5 AND supplied md5 are flipped to the same bogus
2489        // value. The fingerprint check passes (they match) but AAD
2490        // authenticates the *original* md5, so AES-GCM fails.
2491        let m = cust_key(3);
2492        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2493        ct[5] ^= 0xFF;
2494        let mut bogus_md5 = m.key_md5;
2495        bogus_md5[0] ^= 0xFF;
2496        let err = decrypt(
2497            &ct,
2498            SseSource::CustomerKey {
2499                key: &m.key,
2500                key_md5: &bogus_md5,
2501            },
2502        )
2503        .unwrap_err();
2504        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2505    }
2506
2507    #[test]
2508    fn s4e3_tampered_ciphertext_fails_aead() {
2509        let m = cust_key(8);
2510        let mut ct = encrypt_with_source(b"sealed message", (&m).into()).to_vec();
2511        let last = ct.len() - 1;
2512        ct[last] ^= 0x01;
2513        let err = decrypt(&ct, &m).unwrap_err();
2514        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2515    }
2516
2517    #[test]
2518    fn s4e3_tampered_algo_byte_rejected() {
2519        let m = cust_key(9);
2520        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2521        ct[4] = 99;
2522        let err = decrypt(&ct, &m).unwrap_err();
2523        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2524    }
2525
2526    #[test]
2527    fn s4e3_uses_random_nonce() {
2528        let m = cust_key(10);
2529        let a = encrypt_with_source(b"deterministic input", (&m).into());
2530        let b = encrypt_with_source(b"deterministic input", (&m).into());
2531        assert_ne!(a, b, "nonce must be random per-call");
2532    }
2533
2534    #[test]
2535    fn parse_customer_key_headers_happy_path() {
2536        let key = [11u8; KEY_LEN];
2537        let md5 = compute_key_md5(&key);
2538        let key_b64 = base64::engine::general_purpose::STANDARD.encode(key);
2539        let md5_b64 = base64::engine::general_purpose::STANDARD.encode(md5);
2540        let m = parse_customer_key_headers("AES256", &key_b64, &md5_b64).unwrap();
2541        assert_eq!(m.key, key);
2542        assert_eq!(m.key_md5, md5);
2543    }
2544
2545    #[test]
2546    fn parse_customer_key_headers_rejects_wrong_algorithm() {
2547        let key = [1u8; KEY_LEN];
2548        let md5 = compute_key_md5(&key);
2549        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2550        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2551        let err = parse_customer_key_headers("AES128", &kb, &mb).unwrap_err();
2552        assert!(
2553            matches!(err, SseError::CustomerKeyAlgorithmUnsupported { ref algo } if algo == "AES128"),
2554            "got {err:?}"
2555        );
2556        // Lowercase variant still rejected (AWS S3 accepts only "AES256").
2557        let err2 = parse_customer_key_headers("aes256", &kb, &mb).unwrap_err();
2558        assert!(
2559            matches!(err2, SseError::CustomerKeyAlgorithmUnsupported { .. }),
2560            "got {err2:?}"
2561        );
2562    }
2563
2564    #[test]
2565    fn parse_customer_key_headers_rejects_wrong_key_length() {
2566        let short_key = vec![5u8; 16]; // half-length AES key
2567        let md5 = compute_key_md5(&short_key);
2568        let kb = base64::engine::general_purpose::STANDARD.encode(&short_key);
2569        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2570        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2571        assert!(
2572            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("key length")),
2573            "got {err:?}"
2574        );
2575    }
2576
2577    #[test]
2578    fn parse_customer_key_headers_rejects_wrong_md5_length() {
2579        let key = [3u8; KEY_LEN];
2580        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2581        // Truncated MD5 (15 bytes instead of 16).
2582        let bad_md5 = vec![0u8; 15];
2583        let mb = base64::engine::general_purpose::STANDARD.encode(bad_md5);
2584        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2585        assert!(
2586            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 length")),
2587            "got {err:?}"
2588        );
2589    }
2590
2591    #[test]
2592    fn parse_customer_key_headers_rejects_md5_mismatch() {
2593        let key = [4u8; KEY_LEN];
2594        let other = [5u8; KEY_LEN];
2595        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2596        let wrong_md5 = compute_key_md5(&other);
2597        let mb = base64::engine::general_purpose::STANDARD.encode(wrong_md5);
2598        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2599        assert!(
2600            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 does not match")),
2601            "got {err:?}"
2602        );
2603    }
2604
2605    #[test]
2606    fn parse_customer_key_headers_rejects_bad_base64() {
2607        let valid_key = [0u8; KEY_LEN];
2608        let md5 = compute_key_md5(&valid_key);
2609        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2610        let err = parse_customer_key_headers("AES256", "!!!not-base64!!!", &mb).unwrap_err();
2611        assert!(
2612            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2613            "got {err:?}"
2614        );
2615        // Bad MD5 base64.
2616        let kb = base64::engine::general_purpose::STANDARD.encode(valid_key);
2617        let err2 = parse_customer_key_headers("AES256", &kb, "??not-base64??").unwrap_err();
2618        assert!(
2619            matches!(err2, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2620            "got {err2:?}"
2621        );
2622    }
2623
2624    #[test]
2625    fn parse_customer_key_headers_trims_whitespace() {
2626        // S3 SDKs sometimes pad headers with trailing newlines.
2627        let key = [12u8; KEY_LEN];
2628        let md5 = compute_key_md5(&key);
2629        let kb = format!(
2630            "  {}\n",
2631            base64::engine::general_purpose::STANDARD.encode(key)
2632        );
2633        let mb = format!(
2634            "\t{}  ",
2635            base64::engine::general_purpose::STANDARD.encode(md5)
2636        );
2637        let m = parse_customer_key_headers("AES256", &kb, &mb).unwrap();
2638        assert_eq!(m.key, key);
2639    }
2640
2641    // -----------------------------------------------------------------
2642    // Back-compat + cross-source mixing
2643    // -----------------------------------------------------------------
2644
2645    #[test]
2646    fn back_compat_decrypt_s4e1_with_keyring_source() {
2647        let k = key32(33);
2648        let legacy_ct = encrypt(&k, b"v0.4 vintage object");
2649        let kr = SseKeyring::new(1, Arc::clone(&k));
2650        // Both call styles must work — `&kr` (back-compat) and
2651        // `SseSource::Keyring(&kr)` (explicit).
2652        let plain = decrypt(&legacy_ct, &kr).unwrap();
2653        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2654        let plain2 = decrypt(&legacy_ct, SseSource::Keyring(&kr)).unwrap();
2655        assert_eq!(plain2.as_ref(), b"v0.4 vintage object");
2656    }
2657
2658    #[test]
2659    fn back_compat_decrypt_s4e2_with_keyring_source() {
2660        let kr = keyring_single(34);
2661        let ct = encrypt_v2(b"v0.5 #29 object", &kr);
2662        let plain = decrypt(&ct, &kr).unwrap();
2663        assert_eq!(plain.as_ref(), b"v0.5 #29 object");
2664        // encrypt_with_source(Keyring) should produce the same wire
2665        // format (S4E2).
2666        let ct2 = encrypt_with_source(b"v0.5 #29 object", SseSource::Keyring(&kr));
2667        assert_eq!(&ct2[..4], SSE_MAGIC_V2);
2668        let plain2 = decrypt(&ct2, &kr).unwrap();
2669        assert_eq!(plain2.as_ref(), b"v0.5 #29 object");
2670    }
2671
2672    #[test]
2673    fn s4e2_blob_with_customer_key_source_is_rejected() {
2674        // An object stored with SSE-S4 (S4E2) but a client sending
2675        // SSE-C headers on the GET — this is a misuse, surface as
2676        // CustomerKeyUnexpected so service.rs can return 400.
2677        let kr = keyring_single(50);
2678        let ct = encrypt_v2(b"server-managed object", &kr);
2679        let m = cust_key(99);
2680        let err = decrypt(
2681            &ct,
2682            SseSource::CustomerKey {
2683                key: &m.key,
2684                key_md5: &m.key_md5,
2685            },
2686        )
2687        .unwrap_err();
2688        assert!(
2689            matches!(err, SseError::CustomerKeyUnexpected),
2690            "got {err:?}"
2691        );
2692    }
2693
2694    #[test]
2695    fn s4e3_blob_with_keyring_source_is_rejected() {
2696        // Inverse: object is SSE-C (S4E3) but client forgot to send
2697        // SSE-C headers. Service.rs should map this to 400.
2698        let m = cust_key(60);
2699        let ct = encrypt_with_source(b"customer-key object", (&m).into());
2700        let kr = keyring_single(60);
2701        let err = decrypt(&ct, &kr).unwrap_err();
2702        assert!(matches!(err, SseError::CustomerKeyRequired), "got {err:?}");
2703    }
2704
2705    #[test]
2706    fn looks_encrypted_detects_s4e3() {
2707        let m = cust_key(13);
2708        let ct = encrypt_with_source(b"x", (&m).into());
2709        assert!(looks_encrypted(&ct));
2710    }
2711
2712    #[test]
2713    fn s4e3_rejects_short_body() {
2714        // 36 bytes passes the looks_encrypted gate but is shorter than
2715        // S4E3's 49-byte header.
2716        let mut short = Vec::new();
2717        short.extend_from_slice(SSE_MAGIC_V3);
2718        short.push(ALGO_AES_256_GCM);
2719        // Padding to 36 bytes (SSE_HEADER_BYTES) so the outer length
2720        // check passes but the S4E3 inner check fails.
2721        short.extend_from_slice(&[0u8; SSE_HEADER_BYTES - 5]);
2722        assert_eq!(short.len(), SSE_HEADER_BYTES);
2723        let m = cust_key(1);
2724        let err = decrypt(
2725            &short,
2726            SseSource::CustomerKey {
2727                key: &m.key,
2728                key_md5: &m.key_md5,
2729            },
2730        )
2731        .unwrap_err();
2732        assert!(matches!(err, SseError::TooShort { .. }), "got {err:?}");
2733    }
2734
2735    #[test]
2736    fn customer_key_material_debug_redacts_key() {
2737        let m = cust_key(99);
2738        let s = format!("{m:?}");
2739        assert!(s.contains("redacted"));
2740        assert!(!s.contains(&format!("{:?}", m.key.as_slice())));
2741    }
2742
2743    #[test]
2744    fn constant_time_eq_basic() {
2745        assert!(constant_time_eq(b"abc", b"abc"));
2746        assert!(!constant_time_eq(b"abc", b"abd"));
2747        assert!(!constant_time_eq(b"abc", b"abcd"));
2748        assert!(constant_time_eq(b"", b""));
2749    }
2750
2751    #[test]
2752    fn compute_key_md5_known_vector() {
2753        // Empty input MD5 is known: d41d8cd98f00b204e9800998ecf8427e.
2754        let got = compute_key_md5(b"");
2755        let expected_hex = "d41d8cd98f00b204e9800998ecf8427e";
2756        assert_eq!(hex_lower(&got), expected_hex);
2757    }
2758
2759    // -----------------------------------------------------------------
2760    // v0.5 #28 — SSE-KMS envelope (S4E4) tests
2761    // -----------------------------------------------------------------
2762
2763    use crate::kms::{KmsBackend, LocalKms};
2764    use std::collections::HashMap;
2765    use std::path::PathBuf;
2766
2767    fn local_kms_with(key_ids: &[(&str, [u8; 32])]) -> LocalKms {
2768        let mut keks: HashMap<String, [u8; 32]> = HashMap::new();
2769        for (id, k) in key_ids {
2770            keks.insert((*id).to_string(), *k);
2771        }
2772        LocalKms::from_keks(PathBuf::from("/tmp/none"), keks)
2773    }
2774
2775    #[tokio::test]
2776    async fn s4e4_roundtrip_via_local_kms() {
2777        let kms = local_kms_with(&[("alpha", [42u8; 32])]);
2778        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2779        let mut dek = [0u8; 32];
2780        dek.copy_from_slice(&dek_vec);
2781        let pt = b"SSE-KMS envelope payload across the S4E4 frame";
2782        let ct = encrypt_with_source(
2783            pt,
2784            SseSource::Kms {
2785                dek: &dek,
2786                wrapped: &wrapped,
2787            },
2788        );
2789        // Frame inspection.
2790        assert_eq!(&ct[..4], SSE_MAGIC_V4);
2791        assert_eq!(ct[4], ALGO_AES_256_GCM);
2792        let key_id_len = ct[5] as usize;
2793        assert_eq!(key_id_len, "alpha".len());
2794        assert_eq!(&ct[6..6 + key_id_len], b"alpha");
2795        // peek_magic + looks_encrypted both recognise S4E4.
2796        assert!(looks_encrypted(&ct));
2797        assert_eq!(peek_magic(&ct), Some("S4E4"));
2798        // Async decrypt round-trip.
2799        let plain = decrypt_with_kms(&ct, &kms).await.unwrap();
2800        assert_eq!(plain.as_ref(), pt);
2801    }
2802
2803    #[tokio::test]
2804    async fn s4e4_tampered_key_id_fails_aead() {
2805        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
2806        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2807        let mut dek = [0u8; 32];
2808        dek.copy_from_slice(&dek_vec);
2809        let mut ct = encrypt_with_source(
2810            b"do not redirect",
2811            SseSource::Kms {
2812                dek: &dek,
2813                wrapped: &wrapped,
2814            },
2815        )
2816        .to_vec();
2817        // Flip the key_id from "alpha" to "betaa" by changing the
2818        // first byte of the key_id field. The forged id "bltha" is
2819        // not in the KMS, so unwrap fails with KeyNotFound surfaced
2820        // through KmsBackend(KmsError::KeyNotFound).
2821        let key_id_off = 6;
2822        ct[key_id_off] = b'b';
2823        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2824        assert!(
2825            matches!(
2826                err,
2827                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2828                    | SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
2829            ),
2830            "got {err:?}"
2831        );
2832    }
2833
2834    #[tokio::test]
2835    async fn s4e4_tampered_key_id_to_real_other_id_still_fails() {
2836        // Wrap under "alpha" but rewrite the stored key_id to "beta"
2837        // (which IS in the KMS). KmsBackend will try to unwrap with
2838        // beta's KEK and AAD = "beta", but the wrapped bytes were
2839        // produced with alpha's KEK + AAD = "alpha", so the local
2840        // KMS unwrap fails with UnwrapFailed.
2841        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
2842        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2843        let mut dek = [0u8; 32];
2844        dek.copy_from_slice(&dek_vec);
2845        let mut ct = encrypt_with_source(
2846            b"redirect attempt",
2847            SseSource::Kms {
2848                dek: &dek,
2849                wrapped: &wrapped,
2850            },
2851        )
2852        .to_vec();
2853        // Both "alpha" and "beta" are 5 chars long so the rewrite
2854        // doesn't shift any other field offsets.
2855        let key_id_off = 6;
2856        ct[key_id_off..key_id_off + 5].copy_from_slice(b"beta_");
2857        // Trim back to 4-byte "beta" by also shrinking the length
2858        // prefix would change downstream offsets — instead pad the
2859        // forged id to keep length stable. This mirrors the realistic
2860        // tampering surface (attacker can flip bytes but not change
2861        // the on-disk layout). The KMS now sees key_id "beta_" which
2862        // is unknown → KeyNotFound.
2863        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2864        assert!(
2865            matches!(
2866                err,
2867                SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
2868            ),
2869            "got {err:?}"
2870        );
2871    }
2872
2873    #[tokio::test]
2874    async fn s4e4_tampered_wrapped_dek_fails_unwrap() {
2875        let kms = local_kms_with(&[("k", [3u8; 32])]);
2876        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2877        let mut dek = [0u8; 32];
2878        dek.copy_from_slice(&dek_vec);
2879        let mut ct = encrypt_with_source(
2880            b"target body",
2881            SseSource::Kms {
2882                dek: &dek,
2883                wrapped: &wrapped,
2884            },
2885        )
2886        .to_vec();
2887        // Locate the wrapped_dek_len + wrapped_dek field and flip a
2888        // byte in the middle of the wrapped DEK. AES-GCM auth on the
2889        // wrap fails → KmsBackend(UnwrapFailed).
2890        let key_id_len = ct[5] as usize;
2891        let wrapped_len_off = 6 + key_id_len;
2892        let wrapped_off = wrapped_len_off + 4;
2893        let mid = wrapped_off + (wrapped.ciphertext.len() / 2);
2894        ct[mid] ^= 0xFF;
2895        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2896        assert!(
2897            matches!(
2898                err,
2899                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2900            ),
2901            "got {err:?}"
2902        );
2903    }
2904
2905    #[tokio::test]
2906    async fn s4e4_tampered_ciphertext_fails_aead() {
2907        let kms = local_kms_with(&[("k", [4u8; 32])]);
2908        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2909        let mut dek = [0u8; 32];
2910        dek.copy_from_slice(&dek_vec);
2911        let mut ct = encrypt_with_source(
2912            b"sealed body",
2913            SseSource::Kms {
2914                dek: &dek,
2915                wrapped: &wrapped,
2916            },
2917        )
2918        .to_vec();
2919        let last = ct.len() - 1;
2920        ct[last] ^= 0x01;
2921        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2922        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2923    }
2924
2925    #[tokio::test]
2926    async fn s4e4_uses_random_nonce_and_dek_per_put() {
2927        let kms = local_kms_with(&[("k", [5u8; 32])]);
2928        // Two PUTs of the same plaintext under the same KEK must
2929        // produce different ciphertexts (fresh DEK + fresh nonce).
2930        let (dek1_vec, wrapped1) = kms.generate_dek("k").await.unwrap();
2931        let (dek2_vec, wrapped2) = kms.generate_dek("k").await.unwrap();
2932        let mut dek1 = [0u8; 32];
2933        dek1.copy_from_slice(&dek1_vec);
2934        let mut dek2 = [0u8; 32];
2935        dek2.copy_from_slice(&dek2_vec);
2936        let pt = b"deterministic input";
2937        let a = encrypt_with_source(
2938            pt,
2939            SseSource::Kms {
2940                dek: &dek1,
2941                wrapped: &wrapped1,
2942            },
2943        );
2944        let b = encrypt_with_source(
2945            pt,
2946            SseSource::Kms {
2947                dek: &dek2,
2948                wrapped: &wrapped2,
2949            },
2950        );
2951        assert_ne!(a, b);
2952        // Both still decrypt round-trip.
2953        let plain_a = decrypt_with_kms(&a, &kms).await.unwrap();
2954        let plain_b = decrypt_with_kms(&b, &kms).await.unwrap();
2955        assert_eq!(plain_a.as_ref(), pt);
2956        assert_eq!(plain_b.as_ref(), pt);
2957    }
2958
2959    #[tokio::test]
2960    async fn s4e4_sync_decrypt_returns_kms_async_required() {
2961        // The whole point of KmsAsyncRequired: passing an S4E4 body
2962        // to the sync `decrypt` function must surface a distinct
2963        // error so service.rs's GET path notices the bug rather than
2964        // returning a generic "wrong source" 400.
2965        let kms = local_kms_with(&[("k", [6u8; 32])]);
2966        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2967        let mut dek = [0u8; 32];
2968        dek.copy_from_slice(&dek_vec);
2969        let ct = encrypt_with_source(
2970            b"async only",
2971            SseSource::Kms {
2972                dek: &dek,
2973                wrapped: &wrapped,
2974            },
2975        );
2976        // Try via Keyring source (the default sync path).
2977        let kr = SseKeyring::new(1, key32(0));
2978        let err = decrypt(&ct, &kr).unwrap_err();
2979        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
2980    }
2981
2982    #[test]
2983    fn back_compat_s4e1_e2_e3_still_decrypt_via_sync() {
2984        // After adding S4E4, the sync `decrypt` path must still
2985        // handle every legacy frame variant unchanged.
2986        let k = key32(7);
2987        let v1 = encrypt(&k, b"v0.4 vintage");
2988        let kr = SseKeyring::new(1, Arc::clone(&k));
2989        assert_eq!(decrypt(&v1, &kr).unwrap().as_ref(), b"v0.4 vintage");
2990
2991        let v2 = encrypt_v2(b"v0.5 #29 vintage", &kr);
2992        assert_eq!(decrypt(&v2, &kr).unwrap().as_ref(), b"v0.5 #29 vintage");
2993
2994        let m = cust_key(7);
2995        let v3 = encrypt_with_source(b"v0.5 #27 vintage", (&m).into());
2996        assert_eq!(decrypt(&v3, &m).unwrap().as_ref(), b"v0.5 #27 vintage");
2997    }
2998
2999    #[test]
3000    fn peek_magic_distinguishes_all_variants() {
3001        // S4E1 / S4E2 / S4E3 — built from real encrypts so the
3002        // length gate also passes.
3003        let k = key32(9);
3004        let v1 = encrypt(&k, b"x");
3005        assert_eq!(peek_magic(&v1), Some("S4E1"));
3006        let kr = SseKeyring::new(1, Arc::clone(&k));
3007        let v2 = encrypt_v2(b"x", &kr);
3008        assert_eq!(peek_magic(&v2), Some("S4E2"));
3009        let m = cust_key(9);
3010        let v3 = encrypt_with_source(b"x", (&m).into());
3011        assert_eq!(peek_magic(&v3), Some("S4E3"));
3012        // Synthetic S4E4 magic with enough trailing bytes to clear
3013        // the 36-byte length gate. peek_magic does NOT validate the
3014        // S4E4 inner header, just the magic — that's the contract
3015        // (cheap dispatch signal).
3016        let mut v4 = Vec::new();
3017        v4.extend_from_slice(SSE_MAGIC_V4);
3018        v4.extend_from_slice(&[0u8; 40]);
3019        assert_eq!(peek_magic(&v4), Some("S4E4"));
3020        // Unknown magic / too-short input → None.
3021        assert!(peek_magic(b"NOPE").is_none());
3022        assert!(peek_magic(b"short").is_none());
3023        assert!(peek_magic(&[0u8; 100]).is_none());
3024    }
3025
3026    #[tokio::test]
3027    async fn s4e4_truncated_frame_errors_cleanly() {
3028        // Truncate to less than the minimum header. Must surface
3029        // KmsFrameTooShort, not panic, not return BadMagic.
3030        let truncated = b"S4E4\x01\x05hi";
3031        let kms = local_kms_with(&[("k", [1u8; 32])]);
3032        let err = decrypt_with_kms(truncated, &kms).await.unwrap_err();
3033        assert!(
3034            matches!(err, SseError::KmsFrameTooShort { .. }),
3035            "got {err:?}"
3036        );
3037    }
3038
3039    #[tokio::test]
3040    async fn s4e4_oob_key_id_len_errors() {
3041        // Build a body that claims key_id_len = 200 but only has 4
3042        // bytes after the length prefix. parse_s4e4_header must
3043        // refuse with KmsFrameFieldOob, not slice-panic.
3044        let mut body = Vec::new();
3045        body.extend_from_slice(SSE_MAGIC_V4);
3046        body.push(ALGO_AES_256_GCM);
3047        body.push(200u8); // key_id_len
3048        // Remaining bytes < 200; pad to clear the looks_encrypted
3049        // floor (36 bytes) but stay short of the claimed key_id +
3050        // wrapped_dek_len + nonce + tag layout.
3051        body.extend_from_slice(&[0u8; 50]);
3052        let kms = local_kms_with(&[("k", [1u8; 32])]);
3053        let err = decrypt_with_kms(&body, &kms).await.unwrap_err();
3054        assert!(
3055            matches!(err, SseError::KmsFrameFieldOob { .. }),
3056            "got {err:?}"
3057        );
3058    }
3059
3060    #[tokio::test]
3061    async fn s4e4_via_keyring_source_into_sync_decrypt_is_kms_async_required() {
3062        // S4E4 + Keyring source: sync decrypt sees the S4E4 magic
3063        // first and returns KmsAsyncRequired regardless of source —
3064        // the source mismatch never gets a chance to surface, which
3065        // is the right behaviour (caller's bug is "didn't peek
3066        // magic" not "wrong source").
3067        let kms = local_kms_with(&[("k", [9u8; 32])]);
3068        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
3069        let mut dek = [0u8; 32];
3070        dek.copy_from_slice(&dek_vec);
3071        let ct = encrypt_with_source(
3072            b"x",
3073            SseSource::Kms {
3074                dek: &dek,
3075                wrapped: &wrapped,
3076            },
3077        );
3078        let m = cust_key(1);
3079        let err = decrypt(&ct, &m).unwrap_err();
3080        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
3081    }
3082
3083    #[tokio::test]
3084    async fn s4e4_looks_encrypted_passthrough_returns_false_for_synthetic() {
3085        // S4F4 (note F not E) must NOT be confused with S4E4.
3086        let mut not_s4e4 = Vec::new();
3087        not_s4e4.extend_from_slice(b"S4F4");
3088        not_s4e4.extend_from_slice(&[0u8; 60]);
3089        assert!(!looks_encrypted(&not_s4e4));
3090        assert_eq!(peek_magic(&not_s4e4), None);
3091    }
3092
3093    #[tokio::test]
3094    async fn s4e4_aad_length_prefix_prevents_byte_shifting() {
3095        // Constructing an S4E4 body where the wrapped_dek_len is
3096        // shrunk by N bytes and the same N bytes are prepended to
3097        // the key_id-equivalent area would, without length-prefixed
3098        // AAD, produce the same AAD bytestream. Verify our AAD
3099        // includes the length prefixes by tampering with
3100        // wrapped_dek_len and confirming AES-GCM auth fails.
3101        let kms = local_kms_with(&[("kk", [11u8; 32])]);
3102        let (dek_vec, wrapped) = kms.generate_dek("kk").await.unwrap();
3103        let mut dek = [0u8; 32];
3104        dek.copy_from_slice(&dek_vec);
3105        let mut ct = encrypt_with_source(
3106            b"length-shift defense",
3107            SseSource::Kms {
3108                dek: &dek,
3109                wrapped: &wrapped,
3110            },
3111        )
3112        .to_vec();
3113        let key_id_len = ct[5] as usize;
3114        let wrapped_len_off = 6 + key_id_len;
3115        // Shrink wrapped_dek_len by 1. parse_s4e4_header now reads a
3116        // shorter wrapped_dek and a different nonce/tag/ciphertext
3117        // alignment — KMS unwrap fails OR AES-GCM fails OR frame
3118        // bounds reject. All three surface as auditable errors;
3119        // none should reach a successful decrypt.
3120        let original_len = u32::from_be_bytes([
3121            ct[wrapped_len_off],
3122            ct[wrapped_len_off + 1],
3123            ct[wrapped_len_off + 2],
3124            ct[wrapped_len_off + 3],
3125        ]);
3126        let new_len = (original_len - 1).to_be_bytes();
3127        ct[wrapped_len_off..wrapped_len_off + 4].copy_from_slice(&new_len);
3128        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
3129        // Acceptable failure modes: unwrap fail (truncated wrapped
3130        // DEK), AES-GCM fail (shifted nonce/tag/AAD), or frame bounds.
3131        assert!(
3132            matches!(
3133                err,
3134                SseError::KmsBackend(_)
3135                    | SseError::DecryptFailed
3136                    | SseError::KmsFrameFieldOob { .. }
3137                    | SseError::KmsFrameTooShort { .. }
3138            ),
3139            "got {err:?}"
3140        );
3141    }
3142
3143    // -----------------------------------------------------------------------
3144    // v0.8 #52: S4E5 chunked SSE-S4 — encrypt_v2_chunked / decrypt_chunked_stream
3145    // -----------------------------------------------------------------------
3146
3147    use futures::StreamExt;
3148
3149    /// Drain a chunked-decrypt stream into a `Vec<Bytes>` for assertion.
3150    /// Surfaces the first error verbatim (so tests can match on it).
3151    async fn collect_chunks(
3152        s: impl futures::Stream<Item = Result<Bytes, SseError>>,
3153    ) -> Result<Vec<Bytes>, SseError> {
3154        let mut out = Vec::new();
3155        let mut s = std::pin::pin!(s);
3156        while let Some(item) = s.next().await {
3157            out.push(item?);
3158        }
3159        Ok(out)
3160    }
3161
3162    #[test]
3163    fn s4e6_encrypt_layout_10mb_at_1mib() {
3164        // v0.8.1 #57: encrypt_v2_chunked now emits S4E6 (24-byte
3165        // header, 8-byte salt). 10 MB plaintext at 1 MiB chunk
3166        // size → magic "S4E6", chunk_count=10, header bytes line
3167        // up to the documented 24 + 10 * 16 + 10 MB layout.
3168        let kr = keyring_single(0x42);
3169        let chunk_size = 1024 * 1024;
3170        let pt_len = 10 * 1024 * 1024;
3171        let pt = vec![0xAB_u8; pt_len];
3172        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).expect("encrypt ok");
3173        assert_eq!(&ct[..4], SSE_MAGIC_V6, "new PUTs emit S4E6 (v0.8.1 #57)");
3174        assert_eq!(ct[4], ALGO_AES_256_GCM);
3175        assert_eq!(
3176            u16::from_be_bytes([ct[5], ct[6]]),
3177            1,
3178            "key_id BE = active id"
3179        );
3180        assert_eq!(ct[7], 0, "reserved must be 0");
3181        assert_eq!(
3182            u32::from_be_bytes([ct[8], ct[9], ct[10], ct[11]]),
3183            chunk_size as u32,
3184            "chunk_size BE",
3185        );
3186        assert_eq!(
3187            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3188            10,
3189            "chunk_count BE — 10 MiB / 1 MiB = 10 (no remainder)",
3190        );
3191        // Salt now 8 bytes — verify the slice exists and isn't all
3192        // zeros (defensive: catches a stuck PRNG that would leave
3193        // the salt array uninitialized).
3194        assert_eq!(&ct[16..24].len(), &8, "S4E6 salt slot is 8 bytes");
3195        assert_ne!(
3196            &ct[16..24],
3197            &[0u8; 8],
3198            "S4E6 salt must be random, not zeros"
3199        );
3200        assert_eq!(
3201            ct.len(),
3202            S4E6_HEADER_BYTES + 10 * S4E6_PER_CHUNK_OVERHEAD + pt_len,
3203            "total = header (24) + 10 tags + plaintext",
3204        );
3205        assert!(looks_encrypted(&ct), "looks_encrypted must accept S4E6");
3206        assert_eq!(peek_magic(&ct), Some("S4E6"));
3207    }
3208
3209    #[tokio::test]
3210    async fn s4e6_decrypt_chunked_stream_byte_equal() {
3211        // v0.8.1 #57: round-trip via S4E6 path. encrypt_v2_chunked
3212        // emits S4E6, decrypt_chunked_stream consumes S4E5/S4E6.
3213        let kr = keyring_single(0x55);
3214        let pt: Vec<u8> = (0..(10 * 1024 * 1024_u32))
3215            .map(|i| (i & 0xFF) as u8)
3216            .collect();
3217        let ct = encrypt_v2_chunked(&pt, &kr, 1024 * 1024).unwrap();
3218        // Sanity: the new PUT is S4E6.
3219        assert_eq!(&ct[..4], SSE_MAGIC_V6, "new emit is S4E6");
3220        let stream = decrypt_chunked_stream(ct, &kr);
3221        let chunks = collect_chunks(stream).await.expect("stream ok");
3222        assert_eq!(chunks.len(), 10, "10 chunks expected for 10 MiB / 1 MiB");
3223        let mut joined = Vec::with_capacity(pt.len());
3224        for c in chunks {
3225            joined.extend_from_slice(&c);
3226        }
3227        assert_eq!(joined.len(), pt.len(), "byte length matches");
3228        assert_eq!(joined, pt, "byte-equal round-trip");
3229    }
3230
3231    #[tokio::test]
3232    async fn s4e6_single_chunk_for_small_object() {
3233        // Plaintext smaller than chunk_size → chunk_count=1. The
3234        // chunk_count field offset is unchanged between S4E5 and
3235        // S4E6 (both at body[12..16]); only the salt width differs.
3236        let kr = keyring_single(0x77);
3237        let pt = b"tiny payload, smaller than chunk_size";
3238        let ct = encrypt_v2_chunked(pt, &kr, 1024 * 1024).unwrap();
3239        assert_eq!(
3240            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3241            1,
3242            "small plaintext = single chunk",
3243        );
3244        let stream = decrypt_chunked_stream(ct, &kr);
3245        let chunks = collect_chunks(stream).await.expect("stream ok");
3246        assert_eq!(chunks.len(), 1);
3247        assert_eq!(chunks[0].as_ref(), pt);
3248    }
3249
3250    #[tokio::test]
3251    async fn s4e6_tampered_chunk_n_reports_chunk_index() {
3252        // v0.8.1 #57: same tamper-detect contract as the S4E5
3253        // version, just with the wider 24-byte header. Tamper
3254        // byte inside chunk index 3 (= 4th chunk) — the stream
3255        // must yield 3 successful chunks, then ChunkAuthFailed { 3 }.
3256        let kr = keyring_single(0x91);
3257        let chunk_size = 1024;
3258        let pt = vec![0xCD_u8; chunk_size * 8]; // 8 chunks
3259        let mut ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap().to_vec();
3260        // Locate chunk 3's first ciphertext byte: header (24) + 3 *
3261        // (tag 16 + ct 1024) + tag 16 = 24 + 3*1040 + 16 = 3160.
3262        let target = S4E6_HEADER_BYTES + 3 * (TAG_LEN + chunk_size) + TAG_LEN;
3263        ct[target] ^= 0x42;
3264        let stream = decrypt_chunked_stream(bytes::Bytes::from(ct), &kr);
3265        let mut s = std::pin::pin!(stream);
3266        // Chunks 0, 1, 2 must succeed.
3267        for expected_i in 0..3_u32 {
3268            let item = s.next().await.expect("yield");
3269            item.unwrap_or_else(|e| panic!("chunk {expected_i}: {e:?}"));
3270        }
3271        // Chunk 3 fails with the right index.
3272        let err = s.next().await.expect("yield error").unwrap_err();
3273        assert!(
3274            matches!(err, SseError::ChunkAuthFailed { chunk_index: 3 }),
3275            "got {err:?}",
3276        );
3277    }
3278
3279    #[tokio::test]
3280    async fn s4e5_back_compat_s4e2_blob_rejected_with_clear_error() {
3281        // Feeding an S4E2 frame to decrypt_chunked_stream should
3282        // surface BadMagic on the first poll (NOT silently fall
3283        // back — the caller is expected to peek_magic and dispatch).
3284        let kr = keyring_single(0x12);
3285        let s4e2 = encrypt_v2(b"a v2 blob, not chunked", &kr);
3286        let stream = decrypt_chunked_stream(s4e2, &kr);
3287        let result = collect_chunks(stream).await;
3288        let err = result.unwrap_err();
3289        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
3290    }
3291
3292    #[test]
3293    fn s4e6_salt_randomness_smoke() {
3294        // 8-byte salt → birthday paradox 50% collision at ~2^32
3295        // (~4.3 billion) PUTs. 1024 PUTs → effectively zero
3296        // expected collisions; we don't enforce zero, just
3297        // sanity-check the salt actually differs more than half
3298        // the time (catches a stuck PRNG without a 4-billion-PUT
3299        // test).
3300        let kr = keyring_single(0x33);
3301        let mut salts = std::collections::HashSet::new();
3302        let n = 1024;
3303        for _ in 0..n {
3304            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
3305            let mut salt = [0u8; 8];
3306            salt.copy_from_slice(&ct[16..24]);
3307            salts.insert(salt);
3308        }
3309        assert!(
3310            salts.len() > n / 2,
3311            "expected most of the {n} salts to be unique (got {} unique)",
3312            salts.len(),
3313        );
3314    }
3315
3316    #[test]
3317    fn s4e6_chunk_size_zero_invalid() {
3318        let kr = keyring_single(0x66);
3319        let err = encrypt_v2_chunked(b"hi", &kr, 0).unwrap_err();
3320        assert!(matches!(err, SseError::ChunkSizeInvalid));
3321    }
3322
3323    #[tokio::test]
3324    async fn s4e6_truncated_body_reports_frame_truncated() {
3325        // Truncate inside chunk 2's tag → ChunkFrameTruncated, not
3326        // panic, not silent success. Header is now 24 bytes (S4E6).
3327        let kr = keyring_single(0xA1);
3328        let chunk_size = 256;
3329        let pt = vec![0u8; chunk_size * 4];
3330        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3331        // Truncate to inside chunk 2's tag: header + chunk0 + chunk1
3332        // + 8B partial of chunk2's tag.
3333        let trunc = S4E6_HEADER_BYTES + 2 * (TAG_LEN + chunk_size) + 8;
3334        let truncated = bytes::Bytes::copy_from_slice(&ct[..trunc]);
3335        let stream = decrypt_chunked_stream(truncated, &kr);
3336        let result = collect_chunks(stream).await;
3337        let err = result.unwrap_err();
3338        assert!(
3339            matches!(err, SseError::ChunkFrameTruncated { .. }),
3340            "got {err:?}",
3341        );
3342    }
3343
3344    #[test]
3345    fn s4e6_decrypt_buffered_round_trip_via_top_level_decrypt() {
3346        // Sync `decrypt(blob, &keyring)` must also accept the
3347        // chunked frames (back-compat path for callers that need
3348        // the whole plaintext).
3349        let kr = keyring_single(0xDE);
3350        let pt = b"buffered sync decrypt path".repeat(32);
3351        let ct = encrypt_v2_chunked(&pt, &kr, 13).unwrap();
3352        let plain = decrypt(&ct, &kr).expect("buffered S4E6 decrypt ok");
3353        assert_eq!(plain.as_ref(), pt.as_slice());
3354    }
3355
3356    #[tokio::test]
3357    async fn s4e6_unknown_key_id_in_frame_errors() {
3358        // Encrypt under id=7, decrypt under a keyring that lacks id=7.
3359        let kr_put = SseKeyring::new(7, key32(0xCC));
3360        let kr_get = keyring_single(0xCC); // only id=1
3361        let ct = encrypt_v2_chunked(b"orphan key", &kr_put, 64).unwrap();
3362        // Sync path
3363        let err = decrypt(&ct, &kr_get).unwrap_err();
3364        assert!(
3365            matches!(err, SseError::KeyNotInKeyring { id: 7 }),
3366            "got {err:?}"
3367        );
3368        // Stream path
3369        let stream = decrypt_chunked_stream(ct, &kr_get);
3370        let result = collect_chunks(stream).await;
3371        assert!(
3372            matches!(result, Err(SseError::KeyNotInKeyring { id: 7 })),
3373            "got {result:?}",
3374        );
3375    }
3376
3377    #[tokio::test]
3378    async fn s4e6_final_chunk_smaller_than_chunk_size() {
3379        // Plaintext = 2.5 chunks → final chunk holds half the bytes.
3380        // S4E6 header = 24 bytes → total on-disk = 24 + 48 + 250.
3381        let kr = keyring_single(0xEF);
3382        let chunk_size = 100;
3383        let pt: Vec<u8> = (0..250_u32).map(|i| i as u8).collect();
3384        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3385        assert_eq!(
3386            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3387            3,
3388            "ceil(250/100) = 3 chunks",
3389        );
3390        // Total on-disk: 24 header + 3 tags (48) + 250 plaintext = 322.
3391        assert_eq!(ct.len(), S4E6_HEADER_BYTES + 48 + 250);
3392        let stream = decrypt_chunked_stream(ct, &kr);
3393        let chunks = collect_chunks(stream).await.expect("stream ok");
3394        assert_eq!(chunks.len(), 3);
3395        assert_eq!(chunks[0].len(), 100);
3396        assert_eq!(chunks[1].len(), 100);
3397        assert_eq!(chunks[2].len(), 50, "final chunk is the remainder");
3398        let joined: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
3399        assert_eq!(joined, pt);
3400    }
3401
3402    // -----------------------------------------------------------------------
3403    // v0.8.1 #57: S4E6-specific tests added on top of the renamed
3404    // s4e6_* battery above. Keep these focused on what's *new*:
3405    //   - back-compat read of legacy S4E5 blobs
3406    //   - 24-bit chunk_count cap
3407    //   - the parse_s4e6_header public API
3408    // -----------------------------------------------------------------------
3409
3410    #[test]
3411    fn s4e6_back_compat_read_s4e5_blob() {
3412        // Synthesize a "v0.8.0 vintage" S4E5 blob via the test-only
3413        // helper, then prove the v0.8.1 gateway decrypts it under
3414        // the same keyring — both via sync `decrypt` (buffered)
3415        // and the streaming path. Without this, every S4E5 object
3416        // in production becomes unreadable after the upgrade.
3417        let kr = keyring_single(0x57);
3418        let pt = b"v0.8.0 vintage chunked SSE-S4 object".repeat(64);
3419        let s4e5 = encrypt_v2_chunked_s4e5_for_test(&pt, &kr, 91).unwrap();
3420        // Confirm the test fixture really is S4E5 magic + 20-byte header.
3421        assert_eq!(&s4e5[..4], SSE_MAGIC_V5, "fixture must be S4E5");
3422        assert_eq!(peek_magic(&s4e5), Some("S4E5"));
3423        // Sync decrypt path (top-level `decrypt`, dispatches V5 + V6).
3424        let plain_sync = decrypt(&s4e5, &kr).expect("sync S4E5 decrypt ok");
3425        assert_eq!(plain_sync.as_ref(), pt.as_slice());
3426        // Streaming decrypt path — must also accept S4E5.
3427        let collected = futures::executor::block_on(async {
3428            let stream = decrypt_chunked_stream(s4e5.clone(), &kr);
3429            collect_chunks(stream).await
3430        })
3431        .expect("stream S4E5 decrypt ok");
3432        let mut joined = Vec::with_capacity(pt.len());
3433        for c in collected {
3434            joined.extend_from_slice(&c);
3435        }
3436        assert_eq!(joined, pt, "S4E5 streaming round-trip byte-equal");
3437    }
3438
3439    #[test]
3440    fn s4e6_layout_24_bytes_header() {
3441        // Sanity check: the S4E6 fixed header is exactly 24 bytes
3442        // (vs 20 for S4E5). Catches an accidental const drift in a
3443        // future PR.
3444        assert_eq!(S4E6_HEADER_BYTES, 24);
3445        assert_eq!(S4E6_PER_CHUNK_OVERHEAD, TAG_LEN);
3446        assert_eq!(S4E6_HEADER_BYTES, S4E5_HEADER_BYTES + 4);
3447    }
3448
3449    #[test]
3450    fn s4e6_parse_header_round_trip() {
3451        // parse_s4e6_header is the public mirror of the internal
3452        // parse_chunked_header, useful for admin tools. Verify it
3453        // returns the same field values that encrypt_v2_chunked wrote.
3454        let kr = keyring_single(0xAB);
3455        let chunk_size = 256;
3456        let pt = vec![1u8; 7 * chunk_size];
3457        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3458        let hdr = parse_s4e6_header(&ct).expect("parse ok");
3459        assert_eq!(hdr.key_id, 1);
3460        assert_eq!(hdr.chunk_size, chunk_size as u32);
3461        assert_eq!(hdr.chunk_count, 7);
3462        assert_eq!(hdr.salt.len(), 8);
3463        // Bad magic on a non-S4E6 blob → BadMagic.
3464        let bogus = b"S4E2\x01\x00\x00\x00........................";
3465        let err = parse_s4e6_header(bogus).unwrap_err();
3466        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
3467        // Truncation → ChunkFrameTruncated.
3468        let err2 = parse_s4e6_header(&ct[..10]).unwrap_err();
3469        assert!(
3470            matches!(err2, SseError::ChunkFrameTruncated { .. }),
3471            "got {err2:?}"
3472        );
3473    }
3474
3475    #[test]
3476    fn s4e6_salt_uniqueness_smoke_16m() {
3477        // v0.8.1 #57 security regression detection: with the
3478        // 4-byte S4E5 salt, ~65,536 PUTs already had ~50%
3479        // birthday collision (the bug that motivated this patch).
3480        // With the 8-byte S4E6 salt the expected collisions over
3481        // 65,536 PUTs is ~2^16 * 2^16 / 2^65 ≈ 2^-33 — i.e.
3482        // effectively zero.
3483        //
3484        // We can't actually run 16M PUTs in unit-test wall-clock
3485        // (each PUT does an AES-GCM encrypt), so we run a fast
3486        // smoke (16k) and additionally validate the math: at the
3487        // S4E5 4-byte salt width, 16k PUTs would already give a
3488        // ~3.1% collision probability by birthday bound; at the
3489        // S4E6 8-byte salt that drops to ~3.6e-11. The smoke test
3490        // therefore *would* show collisions if we'd accidentally
3491        // shipped the 4-byte salt — confirming the regression
3492        // detector.
3493        let kr = keyring_single(0xA6);
3494        let mut salts = std::collections::HashSet::with_capacity(16384);
3495        let n = 16384_usize;
3496        let mut collisions_top4 = 0usize;
3497        let mut top4_seen = std::collections::HashSet::with_capacity(16384);
3498        for _ in 0..n {
3499            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
3500            let mut salt = [0u8; 8];
3501            salt.copy_from_slice(&ct[16..24]);
3502            salts.insert(salt);
3503            // Side-channel: count collisions on just the *first 4
3504            // bytes* of the 8-byte salt. If we'd kept the old
3505            // 4-byte salt, this collision count would be the only
3506            // collision count — and at n=16k it should be ~62
3507            // (birthday: n^2/(2 * 2^32) = 16384^2/2^33 ≈ 31, with
3508            // some noise). The full 8-byte salt test passes if the
3509            // FULL salts are all unique while the truncated-to-4
3510            // count is non-zero, proving the extra 4 bytes really
3511            // are doing the security work.
3512            let mut top4 = [0u8; 4];
3513            top4.copy_from_slice(&salt[..4]);
3514            if !top4_seen.insert(top4) {
3515                collisions_top4 += 1;
3516            }
3517        }
3518        assert_eq!(
3519            salts.len(),
3520            n,
3521            "all 8-byte salts must be unique across {n} PUTs (got {} unique)",
3522            salts.len(),
3523        );
3524        // Sanity check the regression detector: at 16k PUTs with a
3525        // 4-byte salt, birthday math predicts ~31 collisions on
3526        // average. Anything in the 0..200 range is statistically
3527        // believable for 16k uniform 32-bit draws; we only assert
3528        // ">= 1" (i.e. at least one collision happened — which
3529        // would have been a real bug under S4E5).
3530        eprintln!(
3531            "s4e6_salt_uniqueness_smoke_16m: 16k PUTs, full 8B salts \
3532             all unique ({}/{}), simulated 4B-truncated salt yielded \
3533             {} collisions (this is what S4E5 would have shipped)",
3534            salts.len(),
3535            n,
3536            collisions_top4,
3537        );
3538        // Don't make the test flaky on the simulated number (it's a
3539        // statistical signal); just leave the eprintln for the
3540        // operator audit log when the test runs verbose.
3541    }
3542
3543    #[test]
3544    fn s4e6_max_chunks_24bit() {
3545        // The S4E6 nonce embeds the chunk index as 24-bit BE, so
3546        // chunk_count > 2^24 - 1 must surface ChunkCountTooLarge
3547        // at PUT time. We can't actually run a 16M-chunk encrypt
3548        // in unit-test wall-clock (16M AES-GCM tag verifies even
3549        // on AES-NI is several minutes), but we can verify the
3550        // CAP constant matches expectations + exercise the cap by
3551        // picking a chunk_size that forces overflow on a tiny
3552        // plaintext.
3553        assert_eq!(S4E6_MAX_CHUNK_COUNT, (1u32 << 24) - 1);
3554        assert_eq!(S4E6_MAX_CHUNK_COUNT, 16_777_215);
3555
3556        // chunk_size=1 + plaintext.len()=16_777_216 → 16M+1 chunks
3557        // → over the cap → ChunkCountTooLarge. Allocating a 16
3558        // MiB plaintext is fine.
3559        let kr = keyring_single(0xC4);
3560        let pt = vec![0u8; (S4E6_MAX_CHUNK_COUNT as usize) + 1]; // 16,777,216 B
3561        let err = encrypt_v2_chunked(&pt, &kr, 1).unwrap_err();
3562        assert!(
3563            matches!(
3564                err,
3565                SseError::ChunkCountTooLarge {
3566                    got: 16_777_216,
3567                    max: 16_777_215
3568                }
3569            ),
3570            "got {err:?}",
3571        );
3572
3573        // And just under the cap (chunk_count = 16_777_215) should
3574        // succeed. We pick chunk_size that produces exactly the cap
3575        // so the inner loop only runs N times. 16M chunk-encrypts
3576        // would be slow, so test with a smaller cap-near config
3577        // that exercises the same boundary check: 1023 chunks of 1
3578        // byte each = 1023 chunks well under the cap → success.
3579        // The actual on-cap encrypt is exercised by the buffered
3580        // decrypt path through `parse_chunked_header`.
3581        let pt_ok = vec![0u8; 1023];
3582        let ct = encrypt_v2_chunked(&pt_ok, &kr, 1).expect("under-cap PUT must succeed");
3583        let hdr = parse_s4e6_header(&ct).unwrap();
3584        assert_eq!(hdr.chunk_count, 1023);
3585
3586        // Synthesize a frame that *claims* chunk_count > cap and
3587        // verify the parser rejects it (defensive: a tampered
3588        // header should not loop the walker 16M+ times).
3589        let mut tampered = ct.to_vec();
3590        // Rewrite chunk_count BE to S4E6_MAX_CHUNK_COUNT + 1 = 2^24.
3591        let bad = (S4E6_MAX_CHUNK_COUNT + 1).to_be_bytes();
3592        tampered[12..16].copy_from_slice(&bad);
3593        let err2 = parse_s4e6_header(&tampered).unwrap_err();
3594        assert!(
3595            matches!(
3596                err2,
3597                SseError::ChunkCountTooLarge {
3598                    got: 16_777_216,
3599                    max: 16_777_215
3600                }
3601            ),
3602            "got {err2:?}",
3603        );
3604    }
3605
3606    #[test]
3607    fn s4e6_nonce_v6_layout() {
3608        // Direct unit test on nonce_v6: prefix b'E', then 8B salt,
3609        // then 24-bit BE chunk_index. The high byte of u32
3610        // chunk_index must be dropped (caller-validated cap).
3611        let salt = [0xAA_u8; 8];
3612        let n0 = nonce_v6(&salt, 0);
3613        assert_eq!(n0[0], b'E');
3614        assert_eq!(&n0[1..9], &salt);
3615        assert_eq!(&n0[9..12], &[0, 0, 0]);
3616        let n1 = nonce_v6(&salt, 1);
3617        assert_eq!(&n1[9..12], &[0, 0, 1]);
3618        let n_mid = nonce_v6(&salt, 0x123456);
3619        assert_eq!(&n_mid[9..12], &[0x12, 0x34, 0x56]);
3620        let n_max = nonce_v6(&salt, S4E6_MAX_CHUNK_COUNT);
3621        assert_eq!(&n_max[9..12], &[0xFF, 0xFF, 0xFF]);
3622    }
3623
3624    #[tokio::test]
3625    async fn s4e6_tampered_salt_byte_fails_aead() {
3626        // Flipping a single byte of the 8-byte salt in the header
3627        // must invalidate every chunk's AES-GCM tag (salt is in
3628        // the AAD). Confirms the salt expansion didn't drop
3629        // header authentication.
3630        let kr = keyring_single(0xB6);
3631        let pt = b"salt-in-aad coverage".repeat(64);
3632        let mut ct = encrypt_v2_chunked(&pt, &kr, 128).unwrap().to_vec();
3633        // Salt bytes 16..24 — flip the middle byte.
3634        ct[20] ^= 0x01;
3635        let err = decrypt(&ct, &kr).unwrap_err();
3636        assert!(
3637            matches!(err, SseError::ChunkAuthFailed { chunk_index: 0 }),
3638            "got {err:?}",
3639        );
3640    }
3641
3642    // -----------------------------------------------------------------------
3643    // v0.8.2 #64: pre-allocation guard for chunked SSE frames
3644    //
3645    // The buffered decrypt path used to call `Vec::with_capacity(chunk_size
3646    // * chunk_count)` *before* validating either factor. A 24-byte malicious
3647    // S4E6 header could declare a multi-PB plaintext and trigger an instant
3648    // OOM / panic on a tiny ciphertext. Verify the guard rejects every
3649    // adversarial header pattern before any allocation happens.
3650    // -----------------------------------------------------------------------
3651
3652    /// Build a minimal S4E6 header with attacker-controlled chunk_size /
3653    /// chunk_count (no chunks attached — the body is exactly 24 bytes).
3654    /// Used by the DoS tests below to synthesize malicious frames without
3655    /// running an encrypt.
3656    fn synth_s4e6_header(chunk_size: u32, chunk_count: u32) -> Vec<u8> {
3657        let mut blob = Vec::with_capacity(S4E6_HEADER_BYTES);
3658        blob.extend_from_slice(SSE_MAGIC_V6);
3659        blob.push(ALGO_AES_256_GCM);
3660        blob.extend_from_slice(&1_u16.to_be_bytes()); // key_id = 1
3661        blob.push(0); // reserved
3662        blob.extend_from_slice(&chunk_size.to_be_bytes());
3663        blob.extend_from_slice(&chunk_count.to_be_bytes());
3664        blob.extend_from_slice(&[0u8; 8]); // 8-byte salt
3665        debug_assert_eq!(blob.len(), S4E6_HEADER_BYTES);
3666        blob
3667    }
3668
3669    #[test]
3670    fn s4e6_header_claims_huge_size_rejected_pre_alloc() {
3671        // 1 GiB chunk_size × 100 chunks = 100 GiB declared plaintext, but
3672        // the body the attacker actually shipped is only 100 bytes. The
3673        // pre-alloc guard's cap arm fires (100 GiB > 5 GiB default cap)
3674        // *before* the walker / `Vec::with_capacity(100 GiB)` — surfaces
3675        // as ChunkFrameTooLarge.
3676        let kr = keyring_single(0x01);
3677        let chunk_size: u32 = 1 << 30; // 1 GiB
3678        let chunk_count: u32 = 100;
3679        let mut blob = synth_s4e6_header(chunk_size, chunk_count);
3680        // Pad with 100 random bytes — the attack model: tiny payload,
3681        // monster header.
3682        blob.extend_from_slice(&[0u8; 100]);
3683        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3684        assert!(
3685            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3686            "expected ChunkFrameTooLarge (declared 100 GiB > 5 GiB cap), got {err:?}",
3687        );
3688        // Same input, tighter caller-supplied cap (1 MiB): still
3689        // ChunkFrameTooLarge. No panic, no OOM either way.
3690        let err2 = decrypt_chunked_buffered(&blob, &kr, 1024 * 1024).unwrap_err();
3691        assert!(
3692            matches!(err2, SseError::ChunkFrameTooLarge { .. }),
3693            "expected ChunkFrameTooLarge under tighter cap, got {err2:?}",
3694        );
3695    }
3696
3697    #[test]
3698    fn s4e6_header_chunk_size_x_chunk_count_overflows_u64() {
3699        // chunk_size = u32::MAX, chunk_count > 1 → product > u64 cap?
3700        // No — u32::MAX * u32::MAX fits in u64 (~2^64). To force u64
3701        // overflow we'd need both > 2^32. But chunk_count is capped to
3702        // 16M (S4E6_MAX_CHUNK_COUNT) by the earlier check, so the
3703        // realistic adversarial maximum is u32::MAX * 16M ≈ 2^56 — well
3704        // under u64::MAX. The overflow path is therefore *theoretically*
3705        // unreachable on S4E6 (24-bit chunk_count cap protects it), but
3706        // we still test it via S4E5 which has no 24-bit cap on
3707        // chunk_count: a 4-byte salt frame with chunk_size = u32::MAX
3708        // and chunk_count = u32::MAX gives 2^64 - 2^33 + 1 — fits u64
3709        // but adding the 16-byte tag overhead × u32::MAX chunks
3710        // overflows. Verify ChunkFrameTooLarge surfaces.
3711        let kr = keyring_single(0x02);
3712        // Synthesize an S4E5 header (20 bytes) with maximally
3713        // adversarial chunk_size + chunk_count.
3714        let mut blob = Vec::with_capacity(S4E5_HEADER_BYTES);
3715        blob.extend_from_slice(SSE_MAGIC_V5);
3716        blob.push(ALGO_AES_256_GCM);
3717        blob.extend_from_slice(&1_u16.to_be_bytes());
3718        blob.push(0);
3719        blob.extend_from_slice(&u32::MAX.to_be_bytes()); // chunk_size = 2^32 - 1
3720        blob.extend_from_slice(&u32::MAX.to_be_bytes()); // chunk_count = 2^32 - 1
3721        blob.extend_from_slice(&[0u8; 4]); // 4-byte S4E5 salt
3722        debug_assert_eq!(blob.len(), S4E5_HEADER_BYTES);
3723        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3724        assert!(
3725            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3726            "expected ChunkFrameTooLarge for u64 overflow, got {err:?}",
3727        );
3728
3729        // Also smoke the same input through `parse_chunked_header`
3730        // directly (the streaming path) — must error, never panic.
3731        let direct = parse_chunked_header(&blob, usize::MAX).unwrap_err();
3732        assert!(
3733            matches!(direct, SseError::ChunkFrameTooLarge { .. }),
3734            "streaming path: expected ChunkFrameTooLarge, got {direct:?}",
3735        );
3736    }
3737
3738    #[test]
3739    fn s4e6_header_within_max_body_bytes_passes() {
3740        // 1 MiB chunk_size × 100 chunks = 100 MiB declared plaintext —
3741        // well under the 5 GiB default cap. The header validation must
3742        // NOT reject this; instead it falls through to chunk-walk +
3743        // AES-GCM verify (which then fails on this synthetic frame
3744        // because we didn't write any ciphertext, so we expect
3745        // ChunkFrameTruncated *not* ChunkFrameTooLarge).
3746        let kr = keyring_single(0x03);
3747        let chunk_size: u32 = 1024 * 1024; // 1 MiB
3748        let chunk_count: u32 = 100;
3749        let mut blob = synth_s4e6_header(chunk_size, chunk_count);
3750        // Append the full declared chunk array so the truncation arm
3751        // doesn't fire — 100 chunks × (16 B tag + 1 MiB ct) = 100 MiB
3752        // + 1.6 KiB tags. We write zeros; AES-GCM verify will fail on
3753        // chunk 0, but that proves the pre-alloc guard *let it through*.
3754        let chunk_array_size =
3755            (chunk_count as usize) * (S4E6_PER_CHUNK_OVERHEAD + chunk_size as usize);
3756        blob.resize(blob.len() + chunk_array_size, 0);
3757        let err = decrypt_chunked_buffered(&blob, &kr, DEFAULT_MAX_BODY_BYTES).unwrap_err();
3758        // The guard let it through (we got past parse_chunked_header)
3759        // and AES-GCM tag verify failed on chunk 0 — that's the right
3760        // failure mode. ChunkFrameTooLarge would mean the cap fired
3761        // (a regression of this test). KeyNotInKeyring would mean the
3762        // header parsed but the key id (1) wasn't present (also a
3763        // regression — the keyring has id=1 active). Anything else is
3764        // a guard-too-strict bug.
3765        assert!(
3766            matches!(err, SseError::ChunkAuthFailed { chunk_index: 0 }),
3767            "expected ChunkAuthFailed (guard let it through), got {err:?}",
3768        );
3769    }
3770
3771    #[test]
3772    fn s4e6_header_exceeds_max_body_bytes_rejected() {
3773        // 1 MiB × 6000 chunks = 6 GiB declared plaintext > 5 GiB cap.
3774        // Must reject as ChunkFrameTooLarge before any alloc. We don't
3775        // attach the 6 GiB chunk array to the body — the cap arm only
3776        // looks at the declared `chunk_size × chunk_count`, not the
3777        // actual body length, so a tiny body suffices to exercise the
3778        // cap.
3779        let kr = keyring_single(0x04);
3780        let chunk_size: u32 = 1024 * 1024; // 1 MiB
3781        let chunk_count: u32 = 6000;
3782        let blob = synth_s4e6_header(chunk_size, chunk_count);
3783        // Body is exactly the 24-byte header, no chunks attached.
3784        // body.len() = 24 ≤ max_total (~6 GiB) so the trailing-bytes
3785        // check does NOT fire; the 6 GiB > 5 GiB cap check does.
3786        let err = decrypt_chunked_buffered(&blob, &kr, DEFAULT_MAX_BODY_BYTES).unwrap_err();
3787        assert!(
3788            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3789            "expected ChunkFrameTooLarge (6 GiB declared > 5 GiB cap), got {err:?}",
3790        );
3791
3792        // Directly exercise the cap arm with a tighter cap (1 MiB)
3793        // and a 100 MiB declared plaintext that fits fully in the
3794        // body, so the cap is unambiguously the deciding factor.
3795        let chunk_size_b: u32 = 1024 * 1024; // 1 MiB
3796        let chunk_count_b: u32 = 100;
3797        let mut blob_b = synth_s4e6_header(chunk_size_b, chunk_count_b);
3798        let pad_b = (chunk_count_b as usize) * (S4E6_PER_CHUNK_OVERHEAD + chunk_size_b as usize);
3799        blob_b.resize(blob_b.len() + pad_b, 0);
3800        // Cap = 1 MiB, declared plaintext = 100 MiB → ChunkFrameTooLarge.
3801        let err_b = decrypt_chunked_buffered(&blob_b, &kr, 1024 * 1024).unwrap_err();
3802        assert!(
3803            matches!(err_b, SseError::ChunkFrameTooLarge { .. }),
3804            "expected ChunkFrameTooLarge (cap < declared), got {err_b:?}",
3805        );
3806    }
3807
3808    #[test]
3809    fn s4e6_random_header_never_panics() {
3810        // DoS regression — feed 100k random byte sequences shaped like
3811        // chunked SSE headers to `parse_chunked_header`. Every result
3812        // must be Ok or Err; no panic, no OOM. This exercises the
3813        // arithmetic-overflow + truncation arms of the v0.8.2 #64
3814        // guard against adversarial inputs the previous unit tests
3815        // didn't enumerate.
3816        //
3817        // We use a fixed seed (deterministic) rather than proptest
3818        // here so the test is repeatable across CI runs without
3819        // pulling in shrink machinery for a pure no-panic property.
3820        use rand::{Rng, SeedableRng, rngs::StdRng};
3821        let mut rng = StdRng::seed_from_u64(0xC0FF_EE64_6464_64DE);
3822        let mut max_body_bytes_choices = [
3823            0_usize,
3824            1024,
3825            1024 * 1024,
3826            DEFAULT_MAX_BODY_BYTES,
3827            usize::MAX,
3828        ]
3829        .iter()
3830        .copied()
3831        .cycle();
3832        for _ in 0..100_000 {
3833            // Body length in [0, 256] — tiny on purpose: most random
3834            // bytes won't be a valid header, but we want the parser
3835            // to robustly reject every shape (truncated, wrong magic,
3836            // wrong algo, declared-vs-actual mismatch, overflow).
3837            let body_len = rng.gen_range(0..=256_usize);
3838            let mut body = vec![0u8; body_len];
3839            rng.fill(body.as_mut_slice());
3840            // Bias 1/4 of trials toward valid magic so the deeper
3841            // arithmetic paths get exercised, not just the BadMagic
3842            // early-out.
3843            if body_len >= 4 && rng.gen_bool(0.25) {
3844                if rng.gen_bool(0.5) {
3845                    body[..4].copy_from_slice(SSE_MAGIC_V5);
3846                } else {
3847                    body[..4].copy_from_slice(SSE_MAGIC_V6);
3848                }
3849            }
3850            let max_cap = max_body_bytes_choices.next().unwrap();
3851            // The contract: never panic, never alloc more than
3852            // `max_cap` worth of memory. We assert only no-panic
3853            // here; OOM behavior is implicit (the function returns
3854            // Err before any large alloc).
3855            let _ = parse_chunked_header(&body, max_cap);
3856        }
3857    }
3858
3859    // Bonus: an extreme-overflow case the guard is specifically
3860    // hardened against — chunk_count = u32::MAX with the per-chunk
3861    // overhead multiplication forced to overflow u64. Catches a
3862    // future refactor that would replace `checked_mul` with `*`.
3863    #[test]
3864    fn s4e5_extreme_overflow_chunk_count_u32_max() {
3865        let kr = keyring_single(0x05);
3866        // u32::MAX chunk_size × u32::MAX chunk_count overflows u64
3867        // when adding the 16 * u32::MAX tag overhead. (Strictly:
3868        // chunk_size × chunk_count = (2^32-1)^2 ≈ 2^64 - 2^33 + 1
3869        // — already u64-saturating; the +tag step then overflows.)
3870        // The guard's `checked_add` chain must catch this.
3871        let mut blob = Vec::with_capacity(S4E5_HEADER_BYTES);
3872        blob.extend_from_slice(SSE_MAGIC_V5);
3873        blob.push(ALGO_AES_256_GCM);
3874        blob.extend_from_slice(&1_u16.to_be_bytes());
3875        blob.push(0);
3876        blob.extend_from_slice(&u32::MAX.to_be_bytes());
3877        blob.extend_from_slice(&u32::MAX.to_be_bytes());
3878        blob.extend_from_slice(&[0u8; 4]);
3879        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3880        assert!(
3881            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3882            "expected ChunkFrameTooLarge for extreme overflow, got {err:?}",
3883        );
3884    }
3885}