Skip to main content

s4_server/
sse.rs

1//! Server-side encryption (SSE-S4) — AES-256-GCM (v0.4 #21, v0.5 #29, v0.5 #27, v0.5 #28).
2//!
3//! Wraps the post-compression S3 object body with authenticated
4//! encryption. Compress-then-encrypt is the right order: encryption
5//! produces high-entropy bytes that don't compress, so encrypting last
6//! preserves the codec's ratio.
7//!
8//! ## Wire formats
9//!
10//! ### S4E1 (v0.4) — single key, no rotation
11//!
12//! ```text
13//! [magic: "S4E1" 4B]
14//! [algo:  u8]            # 1 = AES-256-GCM
15//! [reserved: 3B]         # 0x00 0x00 0x00
16//! [nonce: 12B]           # random per-object
17//! [tag:   16B]           # AES-GCM authentication tag
18//! [ciphertext: variable] # encrypted-then-authenticated body
19//! ```
20//!
21//! Total overhead: 36 bytes per object.
22//!
23//! ### S4E2 (v0.5 #29) — keyring-aware, supports rotation
24//!
25//! ```text
26//! [magic:  "S4E2" 4B]
27//! [algo:   u8]            # 1 = AES-256-GCM
28//! [key_id: u16 BE]        # which keyring slot encrypted this body
29//! [reserved: 1B]          # 0x00
30//! [nonce:  12B]           # random per-object
31//! [tag:    16B]           # AES-GCM authentication tag
32//! [ciphertext: variable]
33//! ```
34//!
35//! Same 36-byte overhead — we reused the 3-byte reserved area in S4E1
36//! to fit a 2-byte key-id + 1-byte reserved without bumping the header
37//! size. The key-id is included in the AAD so a flipped key-id byte
38//! fails the auth tag (i.e. an attacker can't trick the gateway into
39//! decrypting under a different keyring slot).
40//!
41//! ### S4E3 (v0.5 #27) — SSE-C, customer-provided key
42//!
43//! ```text
44//! [magic:   "S4E3" 4B]
45//! [algo:    u8]            # 1 = AES-256-GCM
46//! [key_md5: 16B]           # MD5 fingerprint of the customer key
47//! [nonce:   12B]           # random per-object
48//! [tag:     16B]           # AES-GCM authentication tag
49//! [ciphertext: variable]
50//! ```
51//!
52//! Overhead: 49 bytes (`4 + 1 + 16 + 12 + 16`). Unlike S4E1/S4E2 the
53//! gateway does **not** persist the key — the client supplies it on
54//! every PUT/GET via `x-amz-server-side-encryption-customer-{algorithm,
55//! key,key-MD5}` headers. We store only the 16-byte MD5 in the on-disk
56//! frame so a GET with the wrong key surfaces as
57//! [`SseError::WrongCustomerKey`] before AES-GCM is even tried (saves a
58//! useless decrypt + gives operators a distinct error from generic auth
59//! failure).
60//!
61//! The `key_md5` is included in the AAD so flipping a single byte of
62//! the stored fingerprint also breaks AES-GCM auth — i.e. an attacker
63//! who tampered with the metadata can't sneak a different key past the
64//! check.
65//!
66//! ### S4E4 (v0.5 #28) — SSE-KMS envelope, per-object DEK
67//!
68//! ```text
69//! [magic:           "S4E4" 4B]
70//! [algo:            u8]            # 1 = AES-256-GCM
71//! [key_id_len:      u8]            # 1..=255, length of UTF-8 key_id
72//! [key_id:          variable]      # UTF-8, AAD-authenticated
73//! [wrapped_dek_len: u32 BE]        # length of the wrapped DEK blob
74//! [wrapped_dek:     variable]      # opaque, AAD-authenticated
75//! [nonce:           12B]           # random per-object
76//! [tag:             16B]           # AES-GCM auth tag for body
77//! [ciphertext:      variable]      # body encrypted under the DEK
78//! ```
79//!
80//! Header overhead: `4 + 1 + 1 + key_id_len + 4 + wrapped_dek_len + 12
81//! + 16` = 38 + key_id_len + wrapped_dek_len. For a typical
82//! [`crate::kms::LocalKms`] wrap (60-byte ciphertext) and a 36-char
83//! UUID-style `key_id`, that's ~134 bytes per object.
84//!
85//! `key_id` and `wrapped_dek` are both placed in the AAD so an
86//! attacker cannot rewrite either field to point the gateway at a
87//! different KEK or wrapped DEK without invalidating the body's
88//! AES-GCM tag. The plaintext DEK is never persisted; only the
89//! wrapped form is on disk, and the gateway holds the plaintext only
90//! for the duration of one PUT or GET.
91//!
92//! S4E4 decrypt requires an `async` round-trip to the KMS backend
93//! (to unwrap the DEK), so the synchronous [`decrypt`] function
94//! refuses S4E4 with [`SseError::KmsAsyncRequired`] — callers that
95//! peek `S4E4` via [`peek_magic`] must dispatch to
96//! [`decrypt_with_kms`] instead.
97//!
98//! ## v0.5 rotation flow (SSE-S4 only)
99//!
100//! Operators wire one [`SseKeyring`] holding the **active** key plus
101//! any number of **retired** keys. PUT always encrypts under the
102//! active key (S4E2 with that key's id). GET sniffs the magic:
103//!
104//! - `S4E1`: legacy single-key path. The keyring's active key is tried
105//!   first, then every retired key — this lets a v0.4 deployment
106//!   migrate to a keyring with the original key as active and decrypt
107//!   pre-rotation objects unchanged.
108//! - `S4E2`: read the key_id, look it up in the keyring, decrypt with
109//!   that exact key. Missing key_id surfaces as `KeyNotInKeyring`.
110//! - `S4E3`: keyring is **not** consulted. Caller must supply
111//!   [`SseSource::CustomerKey`] with the matching key + md5.
112//!
113//! ## Open follow-ups
114//!
115//! - **Server-managed key only** (for SSE-S4): keys come from local
116//!   files via `--sse-s4-key` / `--sse-s4-key-rotated`. KMS / vault
117//!   integration for the SSE-S4 keyring (i.e. wrapping the keyring's
118//!   keys with KMS) is a separate issue. SSE-KMS for per-object DEKs
119//!   is implemented (see [`SseSource::Kms`] + S4E4 above).
120
121use std::collections::HashMap;
122use std::path::Path;
123use std::sync::Arc;
124
125use aes_gcm::aead::{Aead, KeyInit, Payload};
126use aes_gcm::{Aes256Gcm, Key, Nonce};
127use bytes::Bytes;
128use md5::{Digest as Md5Digest, Md5};
129use rand::RngCore;
130use thiserror::Error;
131
132use crate::kms::{KmsBackend, KmsError, WrappedDek};
133
134pub const SSE_MAGIC_V1: &[u8; 4] = b"S4E1";
135pub const SSE_MAGIC_V2: &[u8; 4] = b"S4E2";
136pub const SSE_MAGIC_V3: &[u8; 4] = b"S4E3";
137pub const SSE_MAGIC_V4: &[u8; 4] = b"S4E4";
138/// v0.8 #52: chunked variant of S4E2 — same SSE-S4 keyring source,
139/// but the body is sliced into independently-sealed AES-GCM chunks
140/// so the GET path can stream-decrypt + emit chunk-by-chunk instead
141/// of buffering the entire object before tag verify. See
142/// [`encrypt_v2_chunked`] / [`decrypt_chunked_stream`] for the on-
143/// the-wire layout.
144///
145/// **Read-only as of v0.8.1 #57** — new PUTs emit [`SSE_MAGIC_V6`]
146/// (S4E6). S4E5 is kept around for back-compat decrypt of objects
147/// written by v0.8.0.
148pub const SSE_MAGIC_V5: &[u8; 4] = b"S4E5";
149/// v0.8.1 #57: identical layout to S4E5 except the per-PUT salt is
150/// widened from 4 → 8 bytes so the birthday-collision threshold on
151/// AES-GCM nonce reuse jumps from ~65k PUTs/key to ~4 billion. See
152/// [`encrypt_v2_chunked`] (now emits S4E6) / the S4E6 wire-format
153/// docs further down for the full layout.
154pub const SSE_MAGIC_V6: &[u8; 4] = b"S4E6";
155/// Back-compat alias — v0.4 callers that imported `SSE_MAGIC` mean S4E1.
156pub const SSE_MAGIC: &[u8; 4] = SSE_MAGIC_V1;
157
158/// Header layout matches between S4E1 and S4E2 (both 36 bytes total)
159/// because S4E2 reuses the 3-byte reserved slot to fit `key_id (2B) +
160/// reserved (1B)`. Keeping them the same length means the rest of the
161/// pipeline (sidecar offsets, multipart math) doesn't care which
162/// frame variant is in flight.
163pub const SSE_HEADER_BYTES: usize = 4 + 1 + 3 + 12 + 16; // = 36
164/// S4E3 (SSE-C) replaces the 3-byte reserved area with a 16-byte
165/// customer-key MD5 fingerprint, so the header is 49 bytes total.
166/// `magic 4 + algo 1 + key_md5 16 + nonce 12 + tag 16`.
167pub const SSE_HEADER_BYTES_V3: usize = 4 + 1 + KEY_MD5_LEN + 12 + 16; // = 49
168pub const ALGO_AES_256_GCM: u8 = 1;
169const NONCE_LEN: usize = 12;
170const TAG_LEN: usize = 16;
171const KEY_LEN: usize = 32;
172const KEY_MD5_LEN: usize = 16;
173/// AWS S3 SSE-C only allows AES256 in the
174/// `x-amz-server-side-encryption-customer-algorithm` header, so we
175/// match that exact spelling for parity with real S3 clients.
176pub const SSE_C_ALGORITHM: &str = "AES256";
177
178#[derive(Debug, Error)]
179pub enum SseError {
180    #[error("SSE key file {path:?}: {source}")]
181    KeyFileIo {
182        path: std::path::PathBuf,
183        source: std::io::Error,
184    },
185    #[error(
186        "SSE key file must be exactly 32 raw bytes (or 64-char hex / 44-char base64); got {got} bytes after parse"
187    )]
188    BadKeyLength { got: usize },
189    #[error("SSE-encrypted body too short ({got} bytes; need at least {SSE_HEADER_BYTES})")]
190    TooShort { got: usize },
191    #[error("SSE bad magic: expected S4E1/S4E2/S4E3/S4E4/S4E5/S4E6, got {got:?}")]
192    BadMagic { got: [u8; 4] },
193    #[error("SSE unsupported algo tag: {tag} (this build only knows AES-256-GCM = 1)")]
194    UnsupportedAlgo { tag: u8 },
195    #[error(
196        "SSE key_id {id} (S4E2 frame) not present in keyring; rotation history likely incomplete"
197    )]
198    KeyNotInKeyring { id: u16 },
199    #[error("SSE decryption / authentication failed (key mismatch or ciphertext tampered with)")]
200    DecryptFailed,
201    // --- v0.5 #27: SSE-C specific errors ---
202    /// The MD5 fingerprint stored in the S4E3 frame doesn't match the
203    /// MD5 of the customer key the client supplied. This is the
204    /// "wrong customer key on GET" signal — distinct from
205    /// `DecryptFailed` so service.rs can map it to AWS S3's
206    /// `403 AccessDenied` (S3 returns AccessDenied when the supplied
207    /// SSE-C key doesn't match the one used at PUT time).
208    #[error("SSE-C key MD5 fingerprint mismatch — client supplied a different key than PUT")]
209    WrongCustomerKey,
210    /// `parse_customer_key_headers` saw a malformed input. `reason` is
211    /// a short human string ("base64 decode of key", "key length",
212    /// "md5 length", "md5 mismatch") for operator log lines — never
213    /// echoed to the client (would leak crypto details).
214    #[error("SSE-C customer-key headers invalid: {reason}")]
215    InvalidCustomerKey { reason: &'static str },
216    /// Client asked for an SSE-C algorithm the gateway doesn't speak.
217    /// AWS S3 only ever defines `AES256` here; surfacing the offending
218    /// string lets us 400 with a useful message.
219    #[error("SSE-C algorithm {algo:?} unsupported (only {SSE_C_ALGORITHM:?} is allowed)")]
220    CustomerKeyAlgorithmUnsupported { algo: String },
221    /// S4E3 body lacks an SSE-C key — caller passed `SseSource::Keyring`
222    /// when decrypting an SSE-C-encrypted object. service.rs should
223    /// translate this into the same "missing customer key" 400 that
224    /// AWS S3 returns when SSE-C headers are absent on a GET.
225    #[error("S4E3 frame requires SseSource::CustomerKey; got Keyring")]
226    CustomerKeyRequired,
227    /// Inverse: client sent SSE-C headers on a GET for an object stored
228    /// without SSE-C. The supplied key has no role in decryption, but
229    /// AWS S3 actually 400s in this case ("expected an unencrypted
230    /// object" / "extraneous SSE-C headers"), so we mirror that.
231    #[error("S4E1/S4E2 frame stored without SSE-C; SseSource::CustomerKey is unexpected")]
232    CustomerKeyUnexpected,
233    // --- v0.5 #28: SSE-KMS specific errors ---
234    /// `decrypt` (sync) was handed an S4E4 body. SSE-KMS unwrap is
235    /// async (it round-trips to the KMS backend), so callers must
236    /// peek the magic with [`peek_magic`] and dispatch S4E4 frames to
237    /// [`decrypt_with_kms`] instead. service.rs's GET handler does
238    /// this; tests / direct callers may hit this if they forget.
239    #[error(
240        "S4E4 (SSE-KMS) body requires async decrypt — call decrypt_with_kms() instead of decrypt()"
241    )]
242    KmsAsyncRequired,
243    /// S4E4 frame is shorter than the minimum-possible header (38
244    /// bytes for an empty `key_id` + empty `wrapped_dek`, which is
245    /// itself impossible — we just sanity-check the floor).
246    #[error("S4E4 frame too short ({got} bytes; need at least {min})")]
247    KmsFrameTooShort { got: usize, min: usize },
248    /// S4E4 declared a `key_id_len` or `wrapped_dek_len` that runs
249    /// past the end of the body. Almost certainly truncation /
250    /// corruption rather than tampering (tampering would fail the
251    /// AES-GCM tag instead).
252    #[error("S4E4 frame field length out of bounds: {what}")]
253    KmsFrameFieldOob { what: &'static str },
254    /// `key_id` field of an S4E4 frame is not valid UTF-8. We require
255    /// UTF-8 because `LocalKms` uses the basename of a `.kek` file
256    /// (which is OS-string-but-typically-UTF-8) and AWS KMS uses ARNs
257    /// (which are ASCII).
258    #[error("S4E4 key_id is not valid UTF-8")]
259    KmsKeyIdNotUtf8,
260    /// service.rs handed `decrypt_with_kms` a `WrappedDek` whose
261    /// `key_id` doesn't match the one stored in the S4E4 frame. This
262    /// is an integration bug (caller is meant to pull the wrapped
263    /// DEK *from the frame*, not from somewhere else), surface as a
264    /// distinct error so it shows up in tests rather than silently
265    /// failing the AES-GCM tag.
266    #[error(
267        "S4E4 SseSource::Kms wrapped DEK key_id {supplied:?} doesn't match frame key_id {stored:?}"
268    )]
269    KmsWrappedDekMismatch { supplied: String, stored: String },
270    /// SSE-KMS path got a non-Kms `SseSource` for an S4E4 body. The
271    /// async dispatch in `decrypt_with_kms` re-derives the source
272    /// internally so this can only happen if a future caller passes
273    /// `SseSource::Keyring` / `CustomerKey` to a path that expected
274    /// `Kms` — kept around for symmetry with the other "wrong source"
275    /// errors.
276    #[error("S4E4 frame requires SseSource::Kms")]
277    KmsRequired,
278    /// Pass-through for [`crate::kms::KmsError`] surfaced from
279    /// `KmsBackend::decrypt_dek` — boxed so the variant stays small.
280    #[error("KMS unwrap: {0}")]
281    KmsBackend(#[from] KmsError),
282    // --- v0.8 #52: S4E5 (chunked SSE-S4) specific errors ---
283    /// AES-GCM auth tag verify failed on chunk `chunk_index` of an
284    /// S4E5 body. Distinct from the all-or-nothing
285    /// [`SseError::DecryptFailed`] because the streaming GET may
286    /// have already emitted earlier chunks to the client by the
287    /// time chunk N fails — operators need the chunk index in audit
288    /// logs to triangulate which byte range was tampered with (or
289    /// which disk sector flipped).
290    #[error(
291        "S4E5 chunk {chunk_index} auth tag verify failed (key mismatch or chunk tampered with)"
292    )]
293    ChunkAuthFailed { chunk_index: u32 },
294    /// Caller asked [`encrypt_v2_chunked`] to use a chunk size of 0
295    /// — nonsensical (would loop forever). Surfaced as an error
296    /// rather than panicking so service.rs can map a bad
297    /// `--sse-chunk-size 0` configuration to a clear startup error.
298    #[error("S4E5 chunk_size must be > 0 (got 0)")]
299    ChunkSizeInvalid,
300    /// S4E5 frame is shorter than the fixed header or declares a
301    /// (chunk_count × per-chunk-bytes) total that overruns the
302    /// body. Almost certainly truncation / corruption — tampering
303    /// with the per-chunk ciphertext or tag would surface as
304    /// [`SseError::ChunkAuthFailed`] instead.
305    #[error("S4E5 frame truncated: {what}")]
306    ChunkFrameTruncated { what: &'static str },
307    // --- v0.8.1 #57: S4E6 (8-byte salt, 24-bit chunk_index) ---
308    /// S4E6 chunk_index is encoded as a 24-bit big-endian field in
309    /// the per-chunk nonce, capping `chunk_count` at
310    /// `2^24 - 1 = 16_777_215`. At the default 1 MiB chunk size that
311    /// is ~16 PiB per object — well past S3's 5 GiB single-object
312    /// ceiling. Surface as a distinct error so a misconfiguration
313    /// (`--sse-chunk-size 1` on a multi-GiB object, say) shows up at
314    /// PUT time with a clear cause rather than a panic at the u32 →
315    /// u24 cast.
316    #[error("S4E6 chunk_count {got} exceeds 24-bit max ({max}) — pick a larger --sse-chunk-size")]
317    ChunkCountTooLarge { got: u32, max: u32 },
318    // --- v0.8.2 #64: pre-allocation guard for chunked SSE frames ---
319    /// `parse_chunked_header` rejected an S4E5 / S4E6 frame because
320    /// the declared `chunk_size × chunk_count` (or the on-disk total
321    /// after adding per-chunk tag overhead and the fixed header) is
322    /// either:
323    ///
324    /// 1. arithmetically nonsensical (the multiplication / addition
325    ///    overflows u64 on a 64-bit host), or
326    /// 2. larger than the gateway's configured `max_body_bytes`
327    ///    (default 5 GiB — AWS S3's single-object PUT ceiling).
328    ///
329    /// This is the **DoS guard** for the chunked path: without it, a
330    /// 24-byte malicious header that claims `chunk_size = u32::MAX`
331    /// and `chunk_count = u32::MAX` would have caused the buffered
332    /// decrypt path to attempt a multi-PB `Vec::with_capacity` (or
333    /// integer-overflow into a tiny alloc + later out-of-bounds
334    /// panic) before any cryptographic work happened. Surface as a
335    /// distinct variant — never echo the offending sizes back to the
336    /// client (kept as a `&'static str` `details` field for operator
337    /// audit logs only) so the response isn't a tampering oracle.
338    #[error("S4E5/S4E6 chunked frame declares an over-large size: {details}")]
339    ChunkFrameTooLarge { details: &'static str },
340}
341
342/// Default cap on a single chunked-SSE frame's declared plaintext
343/// size, used by [`decrypt_chunked_buffered_default`] and the
344/// streaming pre-validation path. Mirrors AWS S3's 5 GiB single-object
345/// PUT ceiling — anything larger is unreachable on the AWS-compatible
346/// API surface and is therefore safe to reject pre-alloc as a malformed
347/// / malicious header (v0.8.2 #64).
348pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
349
350/// 32-byte symmetric key. `bytes` is `pub` so call sites can construct
351/// keys directly from already-validated bytes (e.g. KMS-decrypted DEKs)
352/// without going through the on-disk parser. Hold inside an `Arc` when
353/// sharing across handler tasks — `SseKeyring` does this internally.
354pub struct SseKey {
355    pub bytes: [u8; 32],
356}
357
358impl SseKey {
359    /// Load a 32-byte key from disk. Accepts three on-disk encodings:
360    /// raw 32 bytes, 64-char ASCII hex, or 44-char ASCII base64 (with or
361    /// without padding). Whitespace is trimmed.
362    pub fn from_path(path: &Path) -> Result<Self, SseError> {
363        let raw = std::fs::read(path).map_err(|source| SseError::KeyFileIo {
364            path: path.to_path_buf(),
365            source,
366        })?;
367        Self::from_bytes(&raw)
368    }
369
370    pub fn from_bytes(bytes: &[u8]) -> Result<Self, SseError> {
371        // Try raw first.
372        if bytes.len() == KEY_LEN {
373            let mut k = [0u8; KEY_LEN];
374            k.copy_from_slice(bytes);
375            return Ok(Self { bytes: k });
376        }
377        // Trim whitespace and try hex / base64.
378        let s = std::str::from_utf8(bytes).unwrap_or("").trim();
379        if s.len() == KEY_LEN * 2 && s.chars().all(|c| c.is_ascii_hexdigit()) {
380            let mut k = [0u8; KEY_LEN];
381            for (i, k_byte) in k.iter_mut().enumerate() {
382                *k_byte = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16)
383                    .map_err(|_| SseError::BadKeyLength { got: bytes.len() })?;
384            }
385            return Ok(Self { bytes: k });
386        }
387        if let Ok(decoded) =
388            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s.as_bytes())
389            && decoded.len() == KEY_LEN
390        {
391            let mut k = [0u8; KEY_LEN];
392            k.copy_from_slice(&decoded);
393            return Ok(Self { bytes: k });
394        }
395        Err(SseError::BadKeyLength { got: bytes.len() })
396    }
397
398    fn as_aes_key(&self) -> &Key<Aes256Gcm> {
399        Key::<Aes256Gcm>::from_slice(&self.bytes)
400    }
401}
402
403impl std::fmt::Debug for SseKey {
404    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
405        f.debug_struct("SseKey")
406            .field("len", &KEY_LEN)
407            .field("key", &"<redacted>")
408            .finish()
409    }
410}
411
412/// v0.5 #29: a set of `SseKey`s indexed by `u16` key-id, plus a
413/// designated **active** id used for new encryptions. Rotation is just
414/// "add the new key, flip `active` to its id, leave the old keys for
415/// decryption-only". Cheap to clone (`Arc<SseKey>` per slot).
416#[derive(Clone)]
417pub struct SseKeyring {
418    active: u16,
419    keys: HashMap<u16, Arc<SseKey>>,
420}
421
422impl SseKeyring {
423    /// Create a keyring seeded with one key, immediately marked
424    /// active. Add older keys later via [`SseKeyring::add`] so the
425    /// gateway can still decrypt pre-rotation objects.
426    pub fn new(active: u16, key: Arc<SseKey>) -> Self {
427        let mut keys = HashMap::new();
428        keys.insert(active, key);
429        Self { active, keys }
430    }
431
432    /// Insert another key under id `id`. Does NOT change `active`. If
433    /// `id == active`, the slot is overwritten (useful for tests; in
434    /// production prefer minting a fresh id).
435    pub fn add(&mut self, id: u16, key: Arc<SseKey>) {
436        self.keys.insert(id, key);
437    }
438
439    /// Active (id, key) — used by [`encrypt_v2`] to pick the slot for
440    /// new objects.
441    pub fn active(&self) -> (u16, &SseKey) {
442        let id = self.active;
443        let key = self
444            .keys
445            .get(&id)
446            .expect("active key id must be present in keyring (constructor invariant)");
447        (id, key.as_ref())
448    }
449
450    /// Look up a key by id. Returns `None` for unknown ids — caller
451    /// should surface this as [`SseError::KeyNotInKeyring`].
452    pub fn get(&self, id: u16) -> Option<&SseKey> {
453        self.keys.get(&id).map(Arc::as_ref)
454    }
455}
456
457impl std::fmt::Debug for SseKeyring {
458    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459        f.debug_struct("SseKeyring")
460            .field("active", &self.active)
461            .field("key_count", &self.keys.len())
462            .field("key_ids", &self.keys.keys().collect::<Vec<_>>())
463            .finish()
464    }
465}
466
467pub type SharedSseKeyring = Arc<SseKeyring>;
468
469/// Encrypt `plaintext` with the given key, producing the on-the-wire
470/// S4E1-framed output: `[magic 4][algo 1][reserved 3][nonce 12][tag 16][ciphertext]`.
471///
472/// Kept for back-compat: v0.4 callers that hand-built an `SseKey` (no
473/// keyring) still get the v1 frame. New code should use
474/// [`encrypt_v2`] which writes S4E2 and supports rotation on read.
475pub fn encrypt(key: &SseKey, plaintext: &[u8]) -> Bytes {
476    let cipher = Aes256Gcm::new(key.as_aes_key());
477    let mut nonce_bytes = [0u8; NONCE_LEN];
478    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
479    let nonce = Nonce::from_slice(&nonce_bytes);
480    // AAD = magic + algo. Tampering with either bumps the tag check.
481    let mut aad = [0u8; 8];
482    aad[..4].copy_from_slice(SSE_MAGIC_V1);
483    aad[4] = ALGO_AES_256_GCM;
484    let ct_with_tag = cipher
485        .encrypt(
486            nonce,
487            Payload {
488                msg: plaintext,
489                aad: &aad,
490            },
491        )
492        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
493    debug_assert!(ct_with_tag.len() >= TAG_LEN);
494    let split = ct_with_tag.len() - TAG_LEN;
495    let (ct, tag) = ct_with_tag.split_at(split);
496
497    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
498    out.extend_from_slice(SSE_MAGIC_V1);
499    out.push(ALGO_AES_256_GCM);
500    out.extend_from_slice(&[0u8; 3]); // reserved
501    out.extend_from_slice(&nonce_bytes);
502    out.extend_from_slice(tag);
503    out.extend_from_slice(ct);
504    Bytes::from(out)
505}
506
507/// v0.5 #29: encrypt under the keyring's currently-active key, writing
508/// an S4E2-framed body (`[magic 4][algo 1][key_id 2 BE][reserved 1]
509/// [nonce 12][tag 16][ciphertext]`). The key-id is included in the
510/// AAD so flipping it fails the auth tag.
511pub fn encrypt_v2(plaintext: &[u8], keyring: &SseKeyring) -> Bytes {
512    let (key_id, key) = keyring.active();
513    let cipher = Aes256Gcm::new(key.as_aes_key());
514    let mut nonce_bytes = [0u8; NONCE_LEN];
515    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
516    let nonce = Nonce::from_slice(&nonce_bytes);
517    let aad = aad_v2(key_id);
518    let ct_with_tag = cipher
519        .encrypt(
520            nonce,
521            Payload {
522                msg: plaintext,
523                aad: &aad,
524            },
525        )
526        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
527    debug_assert!(ct_with_tag.len() >= TAG_LEN);
528    let split = ct_with_tag.len() - TAG_LEN;
529    let (ct, tag) = ct_with_tag.split_at(split);
530
531    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
532    out.extend_from_slice(SSE_MAGIC_V2);
533    out.push(ALGO_AES_256_GCM);
534    out.extend_from_slice(&key_id.to_be_bytes()); // 2B BE key_id
535    out.push(0u8); // 1B reserved
536    out.extend_from_slice(&nonce_bytes);
537    out.extend_from_slice(tag);
538    out.extend_from_slice(ct);
539    Bytes::from(out)
540}
541
542fn aad_v1() -> [u8; 8] {
543    let mut aad = [0u8; 8];
544    aad[..4].copy_from_slice(SSE_MAGIC_V1);
545    aad[4] = ALGO_AES_256_GCM;
546    aad
547}
548
549fn aad_v2(key_id: u16) -> [u8; 8] {
550    let mut aad = [0u8; 8];
551    aad[..4].copy_from_slice(SSE_MAGIC_V2);
552    aad[4] = ALGO_AES_256_GCM;
553    aad[5..7].copy_from_slice(&key_id.to_be_bytes());
554    aad[7] = 0u8;
555    aad
556}
557
558/// AAD for S4E3 = magic (4) + algo (1) + key_md5 (16). Putting the
559/// fingerprint in the AAD means tampering with the stored MD5 (e.g. an
560/// attacker rewriting the header to match a *different* key they
561/// happen to know) breaks the AES-GCM tag — the wrong-key check isn't
562/// just a plain `==` we could be tricked past.
563fn aad_v3(key_md5: &[u8; KEY_MD5_LEN]) -> [u8; 4 + 1 + KEY_MD5_LEN] {
564    let mut aad = [0u8; 4 + 1 + KEY_MD5_LEN];
565    aad[..4].copy_from_slice(SSE_MAGIC_V3);
566    aad[4] = ALGO_AES_256_GCM;
567    aad[5..5 + KEY_MD5_LEN].copy_from_slice(key_md5);
568    aad
569}
570
571/// Parsed + verified SSE-C key material from the three customer
572/// headers. `key_md5` is the MD5 of `key` (we recompute and compare in
573/// [`parse_customer_key_headers`] — clients send their own to catch
574/// transport corruption, but we *trust* our own computation as the
575/// canonical fingerprint in the S4E3 frame).
576#[derive(Clone)]
577pub struct CustomerKeyMaterial {
578    pub key: [u8; KEY_LEN],
579    pub key_md5: [u8; KEY_MD5_LEN],
580}
581
582impl std::fmt::Debug for CustomerKeyMaterial {
583    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
584        // Don't leak the key into logs. The MD5 is a public fingerprint
585        // (S3 puts it on the wire), so that's safe to show.
586        f.debug_struct("CustomerKeyMaterial")
587            .field("key", &"<redacted>")
588            .field("key_md5_hex", &hex_lower(&self.key_md5))
589            .finish()
590    }
591}
592
593fn hex_lower(bytes: &[u8]) -> String {
594    let mut s = String::with_capacity(bytes.len() * 2);
595    for b in bytes {
596        s.push_str(&format!("{b:02x}"));
597    }
598    s
599}
600
601/// Source of the encryption key for [`encrypt_with_source`] /
602/// [`decrypt`]. SSE-S4 (server-managed, rotation-aware) goes through
603/// `Keyring`; SSE-C (customer-supplied) goes through `CustomerKey`.
604///
605/// Borrowed (not owned) so the caller can hold a long-lived
606/// `CustomerKeyMaterial` next to the request and just lend it for the
607/// duration of one PUT/GET.
608#[derive(Debug, Clone, Copy)]
609pub enum SseSource<'a> {
610    /// Server-managed keyring path → produces / consumes S4E1 (legacy)
611    /// or S4E2 (rotation-aware) frames.
612    Keyring(&'a SseKeyring),
613    /// Client-supplied AES-256 key + its MD5 fingerprint → produces /
614    /// consumes S4E3 frames. The server never persists the key; it
615    /// stores `key_md5` only.
616    CustomerKey {
617        key: &'a [u8; KEY_LEN],
618        key_md5: &'a [u8; KEY_MD5_LEN],
619    },
620    /// SSE-KMS envelope → produces / consumes S4E4 frames. The server
621    /// holds a per-object plaintext DEK (from a fresh
622    /// [`KmsBackend::generate_dek`] call) and the wrapped form to
623    /// persist alongside the body. The DEK is dropped after one
624    /// PUT/GET; only the wrapped form survives at rest.
625    Kms {
626        /// 32-byte plaintext DEK, used as the AES-GCM key.
627        dek: &'a [u8; KEY_LEN],
628        /// Wrapped form to persist in the S4E4 frame (PUT) or the one
629        /// read out of the frame (GET, after a successful unwrap).
630        wrapped: &'a WrappedDek,
631    },
632}
633
634/// Back-compat coercion: existing call sites pass `&SseKeyring`
635/// directly to [`decrypt`]. With this `From` impl the generic bound
636/// `Into<SseSource>` accepts `&SseKeyring` without the caller writing
637/// `.into()`, keeping v0.4 / v0.5 #29 service.rs callers compiling
638/// untouched while v0.5 #27 SSE-C callers pass `SseSource::CustomerKey`
639/// explicitly.
640impl<'a> From<&'a SseKeyring> for SseSource<'a> {
641    fn from(kr: &'a SseKeyring) -> Self {
642        SseSource::Keyring(kr)
643    }
644}
645
646/// service.rs holds keyring as `Option<Arc<SseKeyring>>` and unwraps to
647/// `&Arc<SseKeyring>` — let that coerce too, otherwise every existing
648/// call site needs `.as_ref()` boilerplate.
649impl<'a> From<&'a Arc<SseKeyring>> for SseSource<'a> {
650    fn from(kr: &'a Arc<SseKeyring>) -> Self {
651        SseSource::Keyring(kr.as_ref())
652    }
653}
654
655impl<'a> From<&'a CustomerKeyMaterial> for SseSource<'a> {
656    fn from(m: &'a CustomerKeyMaterial) -> Self {
657        SseSource::CustomerKey {
658            key: &m.key,
659            key_md5: &m.key_md5,
660        }
661    }
662}
663
664/// Parse the three AWS SSE-C headers and return verified key material.
665///
666/// Validates, in order:
667/// 1. `algorithm == "AES256"` (the only value AWS S3 defines).
668/// 2. `key_base64` decodes to exactly 32 bytes (AES-256 key length).
669/// 3. `key_md5_base64` decodes to exactly 16 bytes (MD5 digest length).
670/// 4. The actual MD5 of the decoded key matches the supplied MD5.
671///
672/// Step 4 catches transport corruption *and* a class of programming
673/// bugs where the client signs with one key but uploads another. AWS
674/// S3 also performs this check.
675pub fn parse_customer_key_headers(
676    algorithm: &str,
677    key_base64: &str,
678    key_md5_base64: &str,
679) -> Result<CustomerKeyMaterial, SseError> {
680    use base64::Engine as _;
681    if algorithm != SSE_C_ALGORITHM {
682        return Err(SseError::CustomerKeyAlgorithmUnsupported {
683            algo: algorithm.to_string(),
684        });
685    }
686    let key_bytes = base64::engine::general_purpose::STANDARD
687        .decode(key_base64.trim().as_bytes())
688        .map_err(|_| SseError::InvalidCustomerKey {
689            reason: "base64 decode of key",
690        })?;
691    if key_bytes.len() != KEY_LEN {
692        return Err(SseError::InvalidCustomerKey {
693            reason: "key length (must be 32 bytes after base64 decode)",
694        });
695    }
696    let supplied_md5 = base64::engine::general_purpose::STANDARD
697        .decode(key_md5_base64.trim().as_bytes())
698        .map_err(|_| SseError::InvalidCustomerKey {
699            reason: "base64 decode of key MD5",
700        })?;
701    if supplied_md5.len() != KEY_MD5_LEN {
702        return Err(SseError::InvalidCustomerKey {
703            reason: "key MD5 length (must be 16 bytes after base64 decode)",
704        });
705    }
706    let actual_md5 = compute_key_md5(&key_bytes);
707    // Constant-time compare — paranoia, MD5 is non-secret but the key
708    // it identifies is, so we don't want a timing oracle.
709    if !constant_time_eq(&actual_md5, &supplied_md5) {
710        return Err(SseError::InvalidCustomerKey {
711            reason: "supplied MD5 does not match MD5 of supplied key",
712        });
713    }
714    let mut key = [0u8; KEY_LEN];
715    key.copy_from_slice(&key_bytes);
716    let mut key_md5 = [0u8; KEY_MD5_LEN];
717    key_md5.copy_from_slice(&actual_md5);
718    Ok(CustomerKeyMaterial { key, key_md5 })
719}
720
721/// Convenience wrapper — compute the MD5 fingerprint of a 32-byte
722/// customer key. Callers that already have the bytes (e.g. derived
723/// from a KMS unwrap) can use this to construct a
724/// [`CustomerKeyMaterial`] directly.
725pub fn compute_key_md5(key: &[u8]) -> [u8; KEY_MD5_LEN] {
726    let mut h = Md5::new();
727    h.update(key);
728    let out = h.finalize();
729    let mut md5 = [0u8; KEY_MD5_LEN];
730    md5.copy_from_slice(&out);
731    md5
732}
733
734/// `subtle`-free constant-time byte slice equality. We only need this
735/// at one site (MD5 verification) so pulling `subtle` in feels excessive.
736fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
737    if a.len() != b.len() {
738        return false;
739    }
740    let mut acc: u8 = 0;
741    for (x, y) in a.iter().zip(b.iter()) {
742        acc |= x ^ y;
743    }
744    acc == 0
745}
746
747/// v0.5 #27: encrypt under whichever source the caller picked.
748///
749/// - `SseSource::Keyring` → delegates to [`encrypt_v2`] (S4E2 frame).
750/// - `SseSource::CustomerKey` → writes an S4E3 frame (no key persisted,
751///   just the MD5 fingerprint for GET-side verification).
752///
753/// service.rs picks the source per-request: SSE-C headers present →
754/// `CustomerKey`, otherwise (and only when `--sse-s4-key` is wired) →
755/// `Keyring`. Plaintext objects skip this function entirely.
756pub fn encrypt_with_source(plaintext: &[u8], source: SseSource<'_>) -> Bytes {
757    match source {
758        SseSource::Keyring(kr) => encrypt_v2(plaintext, kr),
759        SseSource::CustomerKey { key, key_md5 } => encrypt_v3(plaintext, key, key_md5),
760        SseSource::Kms { dek, wrapped } => encrypt_v4(plaintext, dek, wrapped),
761    }
762}
763
764fn encrypt_v3(plaintext: &[u8], key: &[u8; KEY_LEN], key_md5: &[u8; KEY_MD5_LEN]) -> Bytes {
765    let aes_key = Key::<Aes256Gcm>::from_slice(key);
766    let cipher = Aes256Gcm::new(aes_key);
767    let mut nonce_bytes = [0u8; NONCE_LEN];
768    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
769    let nonce = Nonce::from_slice(&nonce_bytes);
770    let aad = aad_v3(key_md5);
771    let ct_with_tag = cipher
772        .encrypt(
773            nonce,
774            Payload {
775                msg: plaintext,
776                aad: &aad,
777            },
778        )
779        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
780    debug_assert!(ct_with_tag.len() >= TAG_LEN);
781    let split = ct_with_tag.len() - TAG_LEN;
782    let (ct, tag) = ct_with_tag.split_at(split);
783
784    let mut out = Vec::with_capacity(SSE_HEADER_BYTES_V3 + ct.len());
785    out.extend_from_slice(SSE_MAGIC_V3);
786    out.push(ALGO_AES_256_GCM);
787    out.extend_from_slice(key_md5);
788    out.extend_from_slice(&nonce_bytes);
789    out.extend_from_slice(tag);
790    out.extend_from_slice(ct);
791    Bytes::from(out)
792}
793
794/// v0.5 #29 + v0.5 #27: dispatch on the body's magic and decrypt under
795/// whichever source the caller supplied.
796///
797/// - `S4E1` / `S4E2` require `SseSource::Keyring` (return
798///   [`SseError::CustomerKeyRequired`] for `CustomerKey` — service.rs
799///   should map this to "extraneous SSE-C headers" 400).
800/// - `S4E3` requires `SseSource::CustomerKey` (return
801///   [`SseError::CustomerKeyUnexpected`] for `Keyring` — service.rs
802///   should map this to "missing SSE-C headers" 400).
803///
804/// Generic over `Into<SseSource>` so existing `decrypt(body, &keyring)`
805/// call sites compile unchanged via the `From<&SseKeyring>` impl above
806/// — only the new SSE-C path needs to type out
807/// `SseSource::CustomerKey { .. }`.
808///
809/// Distinct errors (`KeyNotInKeyring`, `DecryptFailed`,
810/// `WrongCustomerKey`) let operators tell rotation gaps, ciphertext
811/// tampering, and SSE-C key mismatch apart in audit logs.
812pub fn decrypt<'a, S: Into<SseSource<'a>>>(body: &[u8], source: S) -> Result<Bytes, SseError> {
813    let source = source.into();
814    // Outer short-check uses the smaller of the two header sizes
815    // (S4E1/S4E2 = 36 bytes). Anything below this can't be any valid
816    // SSE frame regardless of magic — keeps back-compat with v0.4 /
817    // v0.5 #29 callers that expected `TooShort` for absurdly short
818    // inputs even when the magic is garbage.
819    if body.len() < SSE_HEADER_BYTES {
820        return Err(SseError::TooShort { got: body.len() });
821    }
822    let mut magic = [0u8; 4];
823    magic.copy_from_slice(&body[..4]);
824    match &magic {
825        m if m == SSE_MAGIC_V1 || m == SSE_MAGIC_V2 => {
826            let keyring = match source {
827                SseSource::Keyring(kr) => kr,
828                SseSource::CustomerKey { .. } => return Err(SseError::CustomerKeyUnexpected),
829                // S4E1/E2 stored under the keyring → SseSource::Kms
830                // is just as nonsensical as CustomerKey here. Re-use
831                // the same "wrong source" error so service.rs can
832                // map both to AWS S3's "extraneous SSE-* headers"
833                // 400.
834                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
835            };
836            if m == SSE_MAGIC_V1 {
837                decrypt_v1_with_keyring(body, keyring)
838            } else {
839                decrypt_v2_with_keyring(body, keyring)
840            }
841        }
842        m if m == SSE_MAGIC_V3 => {
843            // S4E3 has a larger 49-byte header, so re-check.
844            if body.len() < SSE_HEADER_BYTES_V3 {
845                return Err(SseError::TooShort { got: body.len() });
846            }
847            let (key, key_md5) = match source {
848                SseSource::CustomerKey { key, key_md5 } => (key, key_md5),
849                SseSource::Keyring(_) => return Err(SseError::CustomerKeyRequired),
850                SseSource::Kms { .. } => return Err(SseError::CustomerKeyRequired),
851            };
852            decrypt_v3(body, key, key_md5)
853        }
854        m if m == SSE_MAGIC_V4 => {
855            // SSE-KMS unwrap is async (KMS round-trip required).
856            // Caller must dispatch to `decrypt_with_kms` after
857            // peeking the magic — surface this as a distinct error
858            // rather than silently failing.
859            Err(SseError::KmsAsyncRequired)
860        }
861        m if m == SSE_MAGIC_V5 || m == SSE_MAGIC_V6 => {
862            // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): chunked SSE-S4.
863            // Sync back-compat path — verifies + decrypts every
864            // chunk into a single Bytes. Callers that want true
865            // streaming (per-chunk emit) should use
866            // `decrypt_chunked_stream` instead. SSE-C and SSE-KMS
867            // sources are nonsensical here for the same reason as
868            // S4E2 (server-managed keyring only).
869            let keyring = match source {
870                SseSource::Keyring(kr) => kr,
871                SseSource::CustomerKey { .. } => {
872                    return Err(SseError::CustomerKeyUnexpected);
873                }
874                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
875            };
876            // v0.8.2 #64: route through the back-compat wrapper so
877            // the public `decrypt` signature stays stable while the
878            // pre-allocation guard runs against the gateway's
879            // 5 GiB single-object cap. Bespoke callers (e.g. tests
880            // or future API surfaces that want a tighter limit) can
881            // call `decrypt_chunked_buffered` directly.
882            decrypt_chunked_buffered_default(body, keyring)
883        }
884        _ => Err(SseError::BadMagic { got: magic }),
885    }
886}
887
888fn decrypt_v3(
889    body: &[u8],
890    key: &[u8; KEY_LEN],
891    supplied_md5: &[u8; KEY_MD5_LEN],
892) -> Result<Bytes, SseError> {
893    let algo = body[4];
894    if algo != ALGO_AES_256_GCM {
895        return Err(SseError::UnsupportedAlgo { tag: algo });
896    }
897    let mut stored_md5 = [0u8; KEY_MD5_LEN];
898    stored_md5.copy_from_slice(&body[5..5 + KEY_MD5_LEN]);
899    // Cheap fingerprint check first — if the supplied key has a
900    // different MD5 than what was used at PUT, fail fast with a
901    // dedicated error. AES-GCM auth would also catch this (different
902    // key → bad tag) but the bespoke error gives operators an audit
903    // signal distinct from "ciphertext was tampered with".
904    if !constant_time_eq(supplied_md5, &stored_md5) {
905        return Err(SseError::WrongCustomerKey);
906    }
907    let nonce_off = 5 + KEY_MD5_LEN;
908    let tag_off = nonce_off + NONCE_LEN;
909    let mut nonce_bytes = [0u8; NONCE_LEN];
910    nonce_bytes.copy_from_slice(&body[nonce_off..nonce_off + NONCE_LEN]);
911    let mut tag_bytes = [0u8; TAG_LEN];
912    tag_bytes.copy_from_slice(&body[tag_off..tag_off + TAG_LEN]);
913    let ct = &body[SSE_HEADER_BYTES_V3..];
914
915    let aad = aad_v3(&stored_md5);
916    let nonce = Nonce::from_slice(&nonce_bytes);
917    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
918    ct_with_tag.extend_from_slice(ct);
919    ct_with_tag.extend_from_slice(&tag_bytes);
920
921    let aes_key = Key::<Aes256Gcm>::from_slice(key);
922    let cipher = Aes256Gcm::new(aes_key);
923    let plain = cipher
924        .decrypt(
925            nonce,
926            Payload {
927                msg: &ct_with_tag,
928                aad: &aad,
929            },
930        )
931        .map_err(|_| SseError::DecryptFailed)?;
932    Ok(Bytes::from(plain))
933}
934
935/// AAD for S4E4 = magic (4) + algo (1) + key_id_len (1) + key_id +
936/// wrapped_dek_len (4 BE) + wrapped_dek. Putting the variable-length
937/// key_id and wrapped_dek into the AAD means an attacker cannot
938/// rewrite either field to redirect the gateway to a different KEK
939/// or wrapped DEK without invalidating the body's AES-GCM tag.
940///
941/// Length-prefixing key_id and wrapped_dek inside the AAD prevents a
942/// canonicalisation ambiguity: without the length prefix, an
943/// attacker could shift bytes between the two fields and produce the
944/// same AAD bytestream, defeating the per-field tampering check.
945fn aad_v4(key_id: &[u8], wrapped_dek: &[u8]) -> Vec<u8> {
946    let mut aad = Vec::with_capacity(4 + 1 + 1 + key_id.len() + 4 + wrapped_dek.len());
947    aad.extend_from_slice(SSE_MAGIC_V4);
948    aad.push(ALGO_AES_256_GCM);
949    aad.push(key_id.len() as u8);
950    aad.extend_from_slice(key_id);
951    aad.extend_from_slice(&(wrapped_dek.len() as u32).to_be_bytes());
952    aad.extend_from_slice(wrapped_dek);
953    aad
954}
955
956fn encrypt_v4(plaintext: &[u8], dek: &[u8; KEY_LEN], wrapped: &WrappedDek) -> Bytes {
957    // Pre-conditions: key_id must fit in a u8 length prefix and be
958    // non-empty (an empty id means we wouldn't be able to re-fetch
959    // the KEK on GET). wrapped_dek length fits in u32 by the same
960    // logic — at u32::MAX bytes you have bigger problems. We assert
961    // these in debug and silently truncate-or-panic in release; in
962    // practice key_id is a UUID or ARN (<256 chars) and wrapped_dek
963    // is 60 bytes (LocalKms) or ~200 bytes (AWS KMS).
964    assert!(
965        !wrapped.key_id.is_empty() && wrapped.key_id.len() <= u8::MAX as usize,
966        "S4E4 key_id must be 1..=255 bytes (got {})",
967        wrapped.key_id.len()
968    );
969    assert!(
970        wrapped.ciphertext.len() <= u32::MAX as usize,
971        "S4E4 wrapped_dek longer than u32::MAX",
972    );
973
974    let aes_key = Key::<Aes256Gcm>::from_slice(dek);
975    let cipher = Aes256Gcm::new(aes_key);
976    let mut nonce_bytes = [0u8; NONCE_LEN];
977    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
978    let nonce = Nonce::from_slice(&nonce_bytes);
979    let aad = aad_v4(wrapped.key_id.as_bytes(), &wrapped.ciphertext);
980    let ct_with_tag = cipher
981        .encrypt(
982            nonce,
983            Payload {
984                msg: plaintext,
985                aad: &aad,
986            },
987        )
988        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
989    debug_assert!(ct_with_tag.len() >= TAG_LEN);
990    let split = ct_with_tag.len() - TAG_LEN;
991    let (ct, tag) = ct_with_tag.split_at(split);
992
993    let key_id_bytes = wrapped.key_id.as_bytes();
994    let mut out = Vec::with_capacity(
995        4 + 1
996            + 1
997            + key_id_bytes.len()
998            + 4
999            + wrapped.ciphertext.len()
1000            + NONCE_LEN
1001            + TAG_LEN
1002            + ct.len(),
1003    );
1004    out.extend_from_slice(SSE_MAGIC_V4);
1005    out.push(ALGO_AES_256_GCM);
1006    out.push(key_id_bytes.len() as u8);
1007    out.extend_from_slice(key_id_bytes);
1008    out.extend_from_slice(&(wrapped.ciphertext.len() as u32).to_be_bytes());
1009    out.extend_from_slice(&wrapped.ciphertext);
1010    out.extend_from_slice(&nonce_bytes);
1011    out.extend_from_slice(tag);
1012    out.extend_from_slice(ct);
1013    Bytes::from(out)
1014}
1015
1016/// Parsed view of an S4E4 frame's variable-length header. Returned
1017/// by [`parse_s4e4_header`] so both the async [`decrypt_with_kms`]
1018/// path and any future inspection code (e.g. an admin tool that
1019/// needs to enumerate object → KMS-key bindings) can reuse the same
1020/// parser without re-implementing offset math.
1021#[derive(Debug)]
1022pub struct S4E4Header<'a> {
1023    pub key_id: &'a str,
1024    pub wrapped_dek: &'a [u8],
1025    pub nonce: &'a [u8],
1026    pub tag: &'a [u8],
1027    pub ciphertext: &'a [u8],
1028}
1029
1030/// Parse the (variable-length) S4E4 header. Pure byte-shuffling — no
1031/// crypto, no KMS round-trip. Returns errors on truncation /
1032/// out-of-bounds field lengths / non-UTF-8 key_id.
1033pub fn parse_s4e4_header(body: &[u8]) -> Result<S4E4Header<'_>, SseError> {
1034    // Minimum: magic(4) + algo(1) + key_id_len(1) + key_id(>=1) +
1035    // wrapped_dek_len(4) + wrapped_dek(>=1) + nonce(12) + tag(16)
1036    // = 40 bytes. We use a slightly looser floor here (bytes for
1037    // empty fields = 38) and let the per-field bounds checks below
1038    // catch the actual short reads.
1039    const S4E4_MIN: usize = 4 + 1 + 1 + 4 + NONCE_LEN + TAG_LEN; // 38
1040    if body.len() < S4E4_MIN {
1041        return Err(SseError::KmsFrameTooShort {
1042            got: body.len(),
1043            min: S4E4_MIN,
1044        });
1045    }
1046    let magic = &body[..4];
1047    if magic != SSE_MAGIC_V4 {
1048        let mut got = [0u8; 4];
1049        got.copy_from_slice(magic);
1050        return Err(SseError::BadMagic { got });
1051    }
1052    let algo = body[4];
1053    if algo != ALGO_AES_256_GCM {
1054        return Err(SseError::UnsupportedAlgo { tag: algo });
1055    }
1056    let key_id_len = body[5] as usize;
1057    let key_id_off: usize = 6;
1058    let key_id_end = key_id_off
1059        .checked_add(key_id_len)
1060        .ok_or(SseError::KmsFrameFieldOob { what: "key_id_len" })?;
1061    if key_id_end + 4 > body.len() {
1062        return Err(SseError::KmsFrameFieldOob { what: "key_id" });
1063    }
1064    let key_id = std::str::from_utf8(&body[key_id_off..key_id_end])
1065        .map_err(|_| SseError::KmsKeyIdNotUtf8)?;
1066    let wrapped_len_off = key_id_end;
1067    let wrapped_dek_len = u32::from_be_bytes([
1068        body[wrapped_len_off],
1069        body[wrapped_len_off + 1],
1070        body[wrapped_len_off + 2],
1071        body[wrapped_len_off + 3],
1072    ]) as usize;
1073    let wrapped_off = wrapped_len_off + 4;
1074    let wrapped_end =
1075        wrapped_off
1076            .checked_add(wrapped_dek_len)
1077            .ok_or(SseError::KmsFrameFieldOob {
1078                what: "wrapped_dek_len",
1079            })?;
1080    if wrapped_end + NONCE_LEN + TAG_LEN > body.len() {
1081        return Err(SseError::KmsFrameFieldOob {
1082            what: "wrapped_dek",
1083        });
1084    }
1085    let wrapped_dek = &body[wrapped_off..wrapped_end];
1086    let nonce_off = wrapped_end;
1087    let tag_off = nonce_off + NONCE_LEN;
1088    let ct_off = tag_off + TAG_LEN;
1089    let nonce = &body[nonce_off..nonce_off + NONCE_LEN];
1090    let tag = &body[tag_off..tag_off + TAG_LEN];
1091    let ciphertext = &body[ct_off..];
1092    Ok(S4E4Header {
1093        key_id,
1094        wrapped_dek,
1095        nonce,
1096        tag,
1097        ciphertext,
1098    })
1099}
1100
1101/// Async decrypt for S4E4 (SSE-KMS) bodies. Caller supplies the KMS
1102/// backend; this function parses the frame, calls
1103/// `kms.decrypt_dek(...)` to unwrap the DEK, then runs AES-256-GCM
1104/// to recover the plaintext.
1105///
1106/// service.rs's GET handler should peek the magic with [`peek_magic`]
1107/// and dispatch:
1108///
1109/// - `Some("S4E4")` → `decrypt_with_kms(blob, &*kms).await`
1110/// - everything else → existing sync `decrypt(blob, source)`
1111///
1112/// Note: we don't go through `SseSource::Kms` here because the
1113/// wrapped DEK + key_id come from the frame itself, not from the
1114/// request — the `SseSource` is built for sync paths where the
1115/// caller already knows the key.
1116pub async fn decrypt_with_kms(body: &[u8], kms: &dyn KmsBackend) -> Result<Bytes, SseError> {
1117    let hdr = parse_s4e4_header(body)?;
1118    let wrapped = WrappedDek {
1119        key_id: hdr.key_id.to_string(),
1120        ciphertext: hdr.wrapped_dek.to_vec(),
1121    };
1122    let dek_vec = kms.decrypt_dek(&wrapped).await?;
1123    if dek_vec.len() != KEY_LEN {
1124        // KMS returned a non-32-byte plaintext. AES-256 needs exactly
1125        // 32 bytes. This shouldn't happen with `KeySpec=AES_256` but
1126        // surface as a backend error so it's auditable rather than
1127        // panicking.
1128        return Err(SseError::KmsBackend(KmsError::BackendUnavailable {
1129            message: format!(
1130                "KMS returned {} byte DEK; expected {KEY_LEN}",
1131                dek_vec.len()
1132            ),
1133        }));
1134    }
1135    let mut dek = [0u8; KEY_LEN];
1136    dek.copy_from_slice(&dek_vec);
1137
1138    let aad = aad_v4(hdr.key_id.as_bytes(), hdr.wrapped_dek);
1139    let aes_key = Key::<Aes256Gcm>::from_slice(&dek);
1140    let cipher = Aes256Gcm::new(aes_key);
1141    let nonce = Nonce::from_slice(hdr.nonce);
1142    let mut ct_with_tag = Vec::with_capacity(hdr.ciphertext.len() + TAG_LEN);
1143    ct_with_tag.extend_from_slice(hdr.ciphertext);
1144    ct_with_tag.extend_from_slice(hdr.tag);
1145    let plain = cipher
1146        .decrypt(
1147            nonce,
1148            Payload {
1149                msg: &ct_with_tag,
1150                aad: &aad,
1151            },
1152        )
1153        .map_err(|_| SseError::DecryptFailed)?;
1154    Ok(Bytes::from(plain))
1155}
1156
1157fn decrypt_v1_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1158    let algo = body[4];
1159    if algo != ALGO_AES_256_GCM {
1160        return Err(SseError::UnsupportedAlgo { tag: algo });
1161    }
1162    // body[5..8] reserved (must be ignored — v0.4 wrote zeros, but we
1163    // didn't auth them so we can't insist on it).
1164    let mut nonce_bytes = [0u8; NONCE_LEN];
1165    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1166    let mut tag_bytes = [0u8; TAG_LEN];
1167    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1168    let ct = &body[SSE_HEADER_BYTES..];
1169
1170    let aad = aad_v1();
1171    let nonce = Nonce::from_slice(&nonce_bytes);
1172    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1173    ct_with_tag.extend_from_slice(ct);
1174    ct_with_tag.extend_from_slice(&tag_bytes);
1175
1176    // Active key first, then any others. v0.4 deployments that flip to
1177    // v0.5 with their original key as active hit this path on the
1178    // first try for every legacy object.
1179    let (active_id, _active_key) = keyring.active();
1180    let mut ids: Vec<u16> = keyring.keys.keys().copied().collect();
1181    ids.sort_by_key(|id| if *id == active_id { 0 } else { 1 });
1182    for id in ids {
1183        let key = keyring.get(id).expect("id came from keyring iteration");
1184        let cipher = Aes256Gcm::new(key.as_aes_key());
1185        if let Ok(plain) = cipher.decrypt(
1186            nonce,
1187            Payload {
1188                msg: &ct_with_tag,
1189                aad: &aad,
1190            },
1191        ) {
1192            return Ok(Bytes::from(plain));
1193        }
1194    }
1195    Err(SseError::DecryptFailed)
1196}
1197
1198fn decrypt_v2_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1199    let algo = body[4];
1200    if algo != ALGO_AES_256_GCM {
1201        return Err(SseError::UnsupportedAlgo { tag: algo });
1202    }
1203    let key_id = u16::from_be_bytes([body[5], body[6]]);
1204    // body[7] reserved (1B), authenticated as 0 via AAD.
1205    let key = keyring
1206        .get(key_id)
1207        .ok_or(SseError::KeyNotInKeyring { id: key_id })?;
1208    let mut nonce_bytes = [0u8; NONCE_LEN];
1209    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1210    let mut tag_bytes = [0u8; TAG_LEN];
1211    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1212    let ct = &body[SSE_HEADER_BYTES..];
1213
1214    let aad = aad_v2(key_id);
1215    let nonce = Nonce::from_slice(&nonce_bytes);
1216    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1217    ct_with_tag.extend_from_slice(ct);
1218    ct_with_tag.extend_from_slice(&tag_bytes);
1219    let cipher = Aes256Gcm::new(key.as_aes_key());
1220    let plain = cipher
1221        .decrypt(
1222            nonce,
1223            Payload {
1224                msg: &ct_with_tag,
1225                aad: &aad,
1226            },
1227        )
1228        .map_err(|_| SseError::DecryptFailed)?;
1229    Ok(Bytes::from(plain))
1230}
1231
1232/// Detect whether `body` is SSE-S4 encrypted (S4E1, S4E2, S4E3, or
1233/// S4E4) by sniffing the first 4 magic bytes. Used by the GET path
1234/// to decide whether to run decryption before frame parsing.
1235///
1236/// We require a length check that's safe for *any* of the four
1237/// frames — `SSE_HEADER_BYTES` (36) is the smallest valid header
1238/// (S4E1 / S4E2). S4E3 is 49 bytes; S4E4 is variable but always >=
1239/// 38 bytes. The per-frame decrypt path re-checks the appropriate
1240/// minimum, so this 36-byte gate is just a fast rejection of
1241/// obviously-too-short bodies.
1242pub fn looks_encrypted(body: &[u8]) -> bool {
1243    if body.len() < SSE_HEADER_BYTES {
1244        return false;
1245    }
1246    let m = &body[..4];
1247    m == SSE_MAGIC_V1
1248        || m == SSE_MAGIC_V2
1249        || m == SSE_MAGIC_V3
1250        || m == SSE_MAGIC_V4
1251        || m == SSE_MAGIC_V5
1252        || m == SSE_MAGIC_V6
1253}
1254
1255/// Peek the SSE-S4 magic at the front of `body`, returning a
1256/// stringified frame variant identifier or `None` if `body` is not
1257/// recognized as SSE-S4. Used by the GET path to dispatch between
1258/// the sync [`decrypt`] (S4E1/E2/E3) and the async
1259/// [`decrypt_with_kms`] (S4E4).
1260///
1261/// Returns the same length-gated result as [`looks_encrypted`]: any
1262/// body shorter than `SSE_HEADER_BYTES` (36 bytes) returns `None`,
1263/// so the caller can use this as both the "is encrypted" signal and
1264/// the "which frame" signal in one cheap byte-comparison.
1265pub fn peek_magic(body: &[u8]) -> Option<&'static str> {
1266    if body.len() < SSE_HEADER_BYTES {
1267        return None;
1268    }
1269    match &body[..4] {
1270        m if m == SSE_MAGIC_V1 => Some("S4E1"),
1271        m if m == SSE_MAGIC_V2 => Some("S4E2"),
1272        m if m == SSE_MAGIC_V3 => Some("S4E3"),
1273        m if m == SSE_MAGIC_V4 => Some("S4E4"),
1274        // v0.8 #52: chunked SSE-S4. service.rs's GET handler
1275        // dispatches "S4E5" / "S4E6" → `decrypt_chunked_stream`
1276        // for true streaming GET; the sync `decrypt(...)` also
1277        // accepts both (back-compat — buffered concat).
1278        m if m == SSE_MAGIC_V5 => Some("S4E5"),
1279        // v0.8.1 #57: same dispatch as S4E5 — wider salt only.
1280        m if m == SSE_MAGIC_V6 => Some("S4E6"),
1281        _ => None,
1282    }
1283}
1284
1285pub type SharedSseKey = Arc<SseKey>;
1286
1287// ===========================================================================
1288// v0.8 #52 (S4E5, read-only) + v0.8.1 #57 (S4E6, current emit) —
1289// chunked variant of S4E2 for streaming GET
1290// ===========================================================================
1291//
1292// ## S4E5 wire format (v0.8 #52, **read-only as of v0.8.1 #57**)
1293//
1294// ```text
1295// magic         4B    "S4E5"
1296// algo          1B    0x01 (AES-256-GCM)
1297// key_id        2B    BE — keyring slot the active key was at PUT time
1298// reserved      1B    0x00
1299// chunk_size    4B    BE — plaintext bytes per chunk (final chunk may be smaller)
1300// chunk_count   4B    BE — total chunks (always >= 1; empty plaintext = 1 zero-byte chunk)
1301// salt          4B    random per-PUT, mixed into every nonce
1302// [chunk_count] × {
1303//   tag         16B   AES-GCM auth tag for this chunk
1304//   ciphertext  N B   chunk_size bytes (final chunk: 0..=chunk_size bytes)
1305// }
1306// ```
1307//
1308// Fixed header = 20 bytes ([`S4E5_HEADER_BYTES`]).
1309//
1310// ## S4E6 wire format (v0.8.1 #57, current PUT emit)
1311//
1312// ```text
1313// magic         4B    "S4E6"
1314// algo          1B    0x01 (AES-256-GCM)
1315// key_id        2B    BE
1316// reserved      1B    0x00
1317// chunk_size    4B    BE
1318// chunk_count   4B    BE
1319// salt          8B    random per-PUT  ← 4B → 8B widened
1320// [chunk_count] × { tag 16B, ciphertext N B }
1321// ```
1322//
1323// Fixed header = 24 bytes ([`S4E6_HEADER_BYTES`]). Chunk array
1324// layout is byte-identical to S4E5; only the header (salt 4 → 8)
1325// and the nonce/AAD derivation differ.
1326//
1327// ## Per-chunk overhead (both S4E5 and S4E6)
1328//
1329// 16 bytes — just the AES-GCM auth tag. AES-GCM is CTR-mode, so
1330// `ciphertext.len() == plaintext.len()`. Total overhead for an
1331// N-byte plaintext at chunk size C: `header + ceil(N/C) * 16`.
1332//
1333// ## S4E5 nonce / AAD (read-only)
1334//
1335// ```text
1336// nonce_v5[0..4]  = b"E5\x00\x00"
1337// nonce_v5[4..8]  = salt (4 B)
1338// nonce_v5[8..12] = chunk_index BE (u32)
1339//
1340// aad_v5 = b"S4E5" || algo (1) || chunk_index BE (4) || total BE (4)
1341//        || key_id BE (2) || salt (4)
1342// ```
1343//
1344// Birthday-collision threshold on the 4-byte salt: ~50% at ~65,536
1345// distinct PUTs under the same key — the security regression that
1346// motivated #57.
1347//
1348// ## S4E6 nonce / AAD (current emit)
1349//
1350// ```text
1351// nonce_v6[0]     = b'E'                   (1 B fixed prefix)
1352// nonce_v6[1..9]  = salt (8 B)             (per-PUT random from OsRng)
1353// nonce_v6[9..12] = chunk_index BE (u24)   (3 B → max 16_777_215 chunks)
1354//
1355// aad_v6 = b"S4E6" || algo (1) || chunk_index BE (4) || total BE (4)
1356//        || key_id BE (2) || salt (8)
1357// ```
1358//
1359// Wider salt: birthday collision ~50% at ~2^32 = ~4.3 billion
1360// PUTs/key — four orders of magnitude over S4E5.
1361//
1362// chunk_index narrows from 32-bit to 24-bit, capping `chunk_count`
1363// at `2^24 - 1 = 16_777_215`. At the default `--sse-chunk-size
1364// 1048576` (1 MiB) that's ~16 PiB per object — three orders of
1365// magnitude over S3's 5 GiB single-object cap. Smaller chunk sizes
1366// need to be sized carefully: e.g. `--sse-chunk-size 64` on a
1367// > 1 GiB object would exceed the cap (1 GiB / 64 B = 16M+1
1368// chunks); such configurations surface
1369// [`SseError::ChunkCountTooLarge`] at PUT time rather than
1370// silently truncating.
1371//
1372// AAD on both variants includes the chunk index + total so chunk
1373// reordering or dropping fails the per-chunk tag, plus key_id +
1374// salt so header tampering also fails auth.
1375
1376/// Fixed header size of an S4E5 frame, before any chunks. `magic 4 +
1377/// algo 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 +
1378/// salt 4` = 20 bytes.
1379pub const S4E5_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 4; // = 20
1380
1381/// Per-chunk overhead inside an S4E5 / S4E6 frame: just the AES-GCM
1382/// auth tag. `ciphertext.len() == plaintext.len()` (CTR mode), so a
1383/// chunk of N plaintext bytes costs N + 16 on disk.
1384pub const S4E5_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16
1385
1386/// v0.8.1 #57: fixed header size of an S4E6 frame. Same layout as
1387/// S4E5 except the per-PUT salt widens 4 → 8 bytes: `magic 4 + algo
1388/// 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 + salt
1389/// 8` = 24 bytes.
1390pub const S4E6_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 8; // = 24
1391
1392/// v0.8.1 #57: per-chunk overhead for S4E6. Identical to S4E5
1393/// (same AES-GCM tag size). Re-exported as a distinct const so call
1394/// sites that compute on-disk size for S4E6 specifically can spell
1395/// the magic clearly in their arithmetic.
1396pub const S4E6_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16
1397
1398/// v0.8.1 #57: maximum `chunk_count` that fits in the S4E6 nonce's
1399/// 24-bit chunk_index field. At 1 MiB chunks this is ~16 PiB per
1400/// object — three orders of magnitude over S3's 5 GiB single-object
1401/// cap, so it's not a practical limit at the default chunk size.
1402pub const S4E6_MAX_CHUNK_COUNT: u32 = (1u32 << 24) - 1; // 16_777_215
1403
1404/// 4-byte fixed prefix of every S4E5 nonce. Distinct from the bytes
1405/// a random S4E1/E2 nonce could plausibly start with so debugging
1406/// dumps can immediately tell "this is a chunked nonce" from the
1407/// first 4 bytes.
1408const S4E5_NONCE_TAG: [u8; 4] = [b'E', b'5', 0, 0];
1409
1410/// 1-byte fixed prefix of every S4E6 nonce. Trades 3 of S4E5's 4
1411/// "tag" bytes for 4 extra salt bytes (4 → 8) and 0 of the chunk
1412/// index bytes (24-bit instead of 32-bit). The remaining `b'E'`
1413/// keeps debug dumps recognizable as "chunked SSE-S4 nonce".
1414const S4E6_NONCE_PREFIX: u8 = b'E';
1415
1416/// Variant tag for the chunked-frame helpers. Selects the nonce +
1417/// AAD derivation (and incidentally the salt width). The
1418/// chunk-array layout is byte-identical for both — only the header
1419/// size and the nonce/AAD derivation differ.
1420#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1421enum ChunkedVariant {
1422    V5,
1423    V6,
1424}
1425
1426impl ChunkedVariant {
1427    fn header_bytes(self) -> usize {
1428        match self {
1429            ChunkedVariant::V5 => S4E5_HEADER_BYTES,
1430            ChunkedVariant::V6 => S4E6_HEADER_BYTES,
1431        }
1432    }
1433}
1434
1435/// Build the per-chunk AAD for an S4E5 chunk. Includes magic + algo
1436/// plus the structural chunk_index/total_chunks (so chunk reordering
1437/// fails auth) plus key_id + salt (so header tampering — flipping
1438/// key_id or salt — also fails auth).
1439fn aad_v5(
1440    chunk_index: u32,
1441    total_chunks: u32,
1442    key_id: u16,
1443    salt: &[u8; 4],
1444) -> [u8; 4 + 1 + 4 + 4 + 2 + 4] {
1445    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 4]; // = 19
1446    aad[..4].copy_from_slice(SSE_MAGIC_V5);
1447    aad[4] = ALGO_AES_256_GCM;
1448    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1449    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1450    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1451    aad[15..19].copy_from_slice(salt);
1452    aad
1453}
1454
1455/// v0.8.1 #57: per-chunk AAD for S4E6. Same structural fields as
1456/// [`aad_v5`] (magic + algo + chunk_index + total + key_id + salt)
1457/// but with the wider 8-byte salt and the new `b"S4E6"` magic, so
1458/// an attacker can't strip the version tag and replay an S4E5
1459/// nonce/tag against an S4E6 frame.
1460fn aad_v6(
1461    chunk_index: u32,
1462    total_chunks: u32,
1463    key_id: u16,
1464    salt: &[u8; 8],
1465) -> [u8; 4 + 1 + 4 + 4 + 2 + 8] {
1466    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 8]; // = 23
1467    aad[..4].copy_from_slice(SSE_MAGIC_V6);
1468    aad[4] = ALGO_AES_256_GCM;
1469    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1470    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1471    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1472    aad[15..23].copy_from_slice(salt);
1473    aad
1474}
1475
1476/// Derive the 12-byte AES-GCM nonce for chunk `chunk_index` from the
1477/// per-PUT `salt`. Pure function; no RNG state — the same `(salt,
1478/// chunk_index)` always yields the same nonce, which is the whole
1479/// point: GET reads `salt` from the header and walks the chunks
1480/// without storing 12 bytes of nonce per chunk.
1481fn nonce_v5(salt: &[u8; 4], chunk_index: u32) -> [u8; NONCE_LEN] {
1482    let mut n = [0u8; NONCE_LEN];
1483    n[..4].copy_from_slice(&S4E5_NONCE_TAG);
1484    n[4..8].copy_from_slice(salt);
1485    n[8..12].copy_from_slice(&chunk_index.to_be_bytes());
1486    n
1487}
1488
1489/// v0.8.1 #57: derive the 12-byte AES-GCM nonce for an S4E6 chunk:
1490/// `b'E'(1) || salt(8) || chunk_index_BE_u24(3)`. The 24-bit
1491/// chunk_index caps `chunk_count` at 16,777,215 — see
1492/// [`S4E6_MAX_CHUNK_COUNT`]. The pre-encrypt path enforces this cap
1493/// and surfaces [`SseError::ChunkCountTooLarge`], so this function
1494/// only ever sees `chunk_index <= 0xFF_FFFF` (the leading byte of
1495/// the BE u32 is dropped).
1496fn nonce_v6(salt: &[u8; 8], chunk_index: u32) -> [u8; NONCE_LEN] {
1497    debug_assert!(
1498        chunk_index <= S4E6_MAX_CHUNK_COUNT,
1499        "S4E6 chunk_index {chunk_index} exceeds 24-bit cap (caller MUST validate)",
1500    );
1501    let mut n = [0u8; NONCE_LEN];
1502    n[0] = S4E6_NONCE_PREFIX;
1503    n[1..9].copy_from_slice(salt);
1504    let be = chunk_index.to_be_bytes(); // [b3, b2, b1, b0] of u32
1505    // Take the low 3 bytes (b2, b1, b0) — the high byte is 0 by the
1506    // S4E6_MAX_CHUNK_COUNT cap above.
1507    n[9..12].copy_from_slice(&be[1..4]);
1508    n
1509}
1510
1511/// v0.8 #52 / v0.8.1 #57: encrypt `plaintext` under `keyring`'s
1512/// active key, sliced into independently-sealed AES-GCM chunks of
1513/// `chunk_size` plaintext bytes each. Returns the on-the-wire
1514/// **S4E6** frame (v0.8.1 #57 widened the per-PUT salt 4 B → 8 B;
1515/// the S4E5 emit path was retired but the [`decrypt`] /
1516/// [`decrypt_chunked_stream`] paths still read S4E5 objects for
1517/// back-compat).
1518///
1519/// Errors:
1520/// - [`SseError::ChunkSizeInvalid`] if `chunk_size == 0`.
1521/// - [`SseError::ChunkCountTooLarge`] if
1522///   `ceil(plaintext.len() / chunk_size) > 16_777_215` (the S4E6
1523///   24-bit chunk_index cap; pick a larger `--sse-chunk-size`).
1524///
1525/// Empty plaintext is permitted and produces a frame with
1526/// `chunk_count = 1, ciphertext_len = 0` (one all-tag chunk). That
1527/// keeps the GET chunk-walk loop simpler — it never has to
1528/// special-case zero chunks.
1529///
1530/// `chunk_size` is the *plaintext* bytes per chunk; the on-disk
1531/// ciphertext per chunk is the same number (AES-GCM is CTR-mode),
1532/// plus the 16-byte tag prepended.
1533pub fn encrypt_v2_chunked(
1534    plaintext: &[u8],
1535    keyring: &SseKeyring,
1536    chunk_size: usize,
1537) -> Result<Bytes, SseError> {
1538    if chunk_size == 0 {
1539        return Err(SseError::ChunkSizeInvalid);
1540    }
1541    let (key_id, key) = keyring.active();
1542    let cipher = Aes256Gcm::new(key.as_aes_key());
1543    let mut salt = [0u8; 8];
1544    rand::rngs::OsRng.fill_bytes(&mut salt);
1545
1546    // Always emit at least one chunk (so an empty plaintext still
1547    // has a well-defined header → chunk_count >= 1 invariant).
1548    let chunk_count_usize = if plaintext.is_empty() {
1549        1
1550    } else {
1551        plaintext.len().div_ceil(chunk_size)
1552    };
1553    // Saturating-cast to u32 so we report ChunkCountTooLarge cleanly
1554    // for inputs that would overflow u32 too (would need a > 16 EiB
1555    // plaintext at chunk_size = 1 — astronomical, but defensive).
1556    let chunk_count: u32 = u32::try_from(chunk_count_usize).unwrap_or(u32::MAX);
1557    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1558        return Err(SseError::ChunkCountTooLarge {
1559            got: chunk_count,
1560            max: S4E6_MAX_CHUNK_COUNT,
1561        });
1562    }
1563
1564    let mut out = Vec::with_capacity(
1565        S4E6_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E6_PER_CHUNK_OVERHEAD),
1566    );
1567    out.extend_from_slice(SSE_MAGIC_V6);
1568    out.push(ALGO_AES_256_GCM);
1569    out.extend_from_slice(&key_id.to_be_bytes());
1570    out.push(0u8); // reserved
1571    out.extend_from_slice(&(chunk_size as u32).to_be_bytes());
1572    out.extend_from_slice(&chunk_count.to_be_bytes());
1573    out.extend_from_slice(&salt);
1574
1575    for i in 0..chunk_count {
1576        let off = (i as usize).saturating_mul(chunk_size);
1577        let end = off.saturating_add(chunk_size).min(plaintext.len());
1578        let chunk_pt: &[u8] = if off >= plaintext.len() {
1579            // Empty-plaintext / past-end (only the single-chunk
1580            // empty-plaintext case lands here).
1581            &[]
1582        } else {
1583            &plaintext[off..end]
1584        };
1585        let nonce_bytes = nonce_v6(&salt, i);
1586        let nonce = Nonce::from_slice(&nonce_bytes);
1587        let aad = aad_v6(i, chunk_count, key_id, &salt);
1588        let ct_with_tag = cipher
1589            .encrypt(
1590                nonce,
1591                Payload {
1592                    msg: chunk_pt,
1593                    aad: &aad,
1594                },
1595            )
1596            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
1597        debug_assert!(ct_with_tag.len() >= TAG_LEN);
1598        let split = ct_with_tag.len() - TAG_LEN;
1599        let (ct, tag) = ct_with_tag.split_at(split);
1600        out.extend_from_slice(tag);
1601        out.extend_from_slice(ct);
1602        crate::metrics::record_sse_streaming_chunk("encrypt");
1603    }
1604    Ok(Bytes::from(out))
1605}
1606
1607/// Salt material for a chunked frame — branches on variant so the
1608/// shared chunk-walking loop can carry both 4-byte (S4E5) and
1609/// 8-byte (S4E6) salts without an extra heap alloc.
1610#[derive(Debug, Clone, Copy)]
1611enum ChunkedSalt {
1612    V5([u8; 4]),
1613    V6([u8; 8]),
1614}
1615
1616/// Parsed S4E5 / S4E6 header — fixed-layout fields. Used by the
1617/// buffered ([`decrypt_chunked_buffered`]) and streaming
1618/// ([`decrypt_chunked_stream`]) paths to share frame validation
1619/// across both variants.
1620#[derive(Debug, Clone, Copy)]
1621struct ChunkedHeader {
1622    /// Used only by tests today (asserts on which frame variant
1623    /// parsed); in production the variant is implicit in
1624    /// `salt`'s ChunkedSalt arm. Kept as a field rather than
1625    /// re-deriving from `salt` so the parser writes one source of
1626    /// truth.
1627    #[allow(dead_code)]
1628    variant: ChunkedVariant,
1629    key_id: u16,
1630    chunk_size: u32,
1631    chunk_count: u32,
1632    salt: ChunkedSalt,
1633    /// Byte offset where the chunk array starts (always
1634    /// `variant.header_bytes()`; carried in the struct so call sites
1635    /// don't have to re-derive it from the variant tag).
1636    chunks_offset: usize,
1637}
1638
1639/// Parsed view of an S4E6 frame's fixed header. Public mirror of
1640/// the S4E4 parser — useful for admin tools or future inspectors
1641/// that want to enumerate object → key_id bindings without
1642/// re-implementing the offset math. The `salt` borrow keeps
1643/// allocations to zero (the slice points back into the input
1644/// buffer).
1645#[derive(Debug, Clone, Copy)]
1646pub struct S4E6Header<'a> {
1647    pub key_id: u16,
1648    pub chunk_size: u32,
1649    pub chunk_count: u32,
1650    pub salt: &'a [u8; 8],
1651}
1652
1653/// Pure byte-shuffle parser for an S4E6 fixed header (24 bytes). No
1654/// crypto, no keyring lookup. Errors on truncation, wrong magic,
1655/// unsupported algo, or zero `chunk_size` / `chunk_count`.
1656pub fn parse_s4e6_header(blob: &[u8]) -> Result<S4E6Header<'_>, SseError> {
1657    if blob.len() < S4E6_HEADER_BYTES {
1658        return Err(SseError::ChunkFrameTruncated { what: "header" });
1659    }
1660    if &blob[..4] != SSE_MAGIC_V6 {
1661        let mut got = [0u8; 4];
1662        got.copy_from_slice(&blob[..4]);
1663        return Err(SseError::BadMagic { got });
1664    }
1665    let algo = blob[4];
1666    if algo != ALGO_AES_256_GCM {
1667        return Err(SseError::UnsupportedAlgo { tag: algo });
1668    }
1669    let key_id = u16::from_be_bytes([blob[5], blob[6]]);
1670    // blob[7] = reserved (0; authenticated as 0 via AAD).
1671    let chunk_size = u32::from_be_bytes([blob[8], blob[9], blob[10], blob[11]]);
1672    let chunk_count = u32::from_be_bytes([blob[12], blob[13], blob[14], blob[15]]);
1673    if chunk_size == 0 {
1674        return Err(SseError::ChunkSizeInvalid);
1675    }
1676    if chunk_count == 0 {
1677        return Err(SseError::ChunkFrameTruncated {
1678            what: "chunk_count == 0",
1679        });
1680    }
1681    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1682        return Err(SseError::ChunkCountTooLarge {
1683            got: chunk_count,
1684            max: S4E6_MAX_CHUNK_COUNT,
1685        });
1686    }
1687    let salt: &[u8; 8] = (&blob[16..24]).try_into().expect("8B salt slice");
1688    Ok(S4E6Header {
1689        key_id,
1690        chunk_size,
1691        chunk_count,
1692        salt,
1693    })
1694}
1695
1696fn parse_chunked_header(body: &[u8], max_body_bytes: usize) -> Result<ChunkedHeader, SseError> {
1697    if body.len() < 4 {
1698        return Err(SseError::ChunkFrameTruncated { what: "magic" });
1699    }
1700    let magic = &body[..4];
1701    let variant = if magic == SSE_MAGIC_V5 {
1702        ChunkedVariant::V5
1703    } else if magic == SSE_MAGIC_V6 {
1704        ChunkedVariant::V6
1705    } else {
1706        let mut got = [0u8; 4];
1707        got.copy_from_slice(magic);
1708        return Err(SseError::BadMagic { got });
1709    };
1710    let header_bytes = variant.header_bytes();
1711    if body.len() < header_bytes {
1712        return Err(SseError::ChunkFrameTruncated { what: "header" });
1713    }
1714    let algo = body[4];
1715    if algo != ALGO_AES_256_GCM {
1716        return Err(SseError::UnsupportedAlgo { tag: algo });
1717    }
1718    let key_id = u16::from_be_bytes([body[5], body[6]]);
1719    // body[7] = reserved (must be 0; authenticated as 0 via AAD).
1720    let chunk_size = u32::from_be_bytes([body[8], body[9], body[10], body[11]]);
1721    let chunk_count = u32::from_be_bytes([body[12], body[13], body[14], body[15]]);
1722    if chunk_size == 0 {
1723        return Err(SseError::ChunkSizeInvalid);
1724    }
1725    if chunk_count == 0 {
1726        return Err(SseError::ChunkFrameTruncated {
1727            what: "chunk_count == 0",
1728        });
1729    }
1730    let salt = match variant {
1731        ChunkedVariant::V5 => {
1732            let mut s = [0u8; 4];
1733            s.copy_from_slice(&body[16..20]);
1734            ChunkedSalt::V5(s)
1735        }
1736        ChunkedVariant::V6 => {
1737            // v0.8.1 #57 sanity check: the encoder enforces this cap,
1738            // but a tampered / malicious frame could declare a huge
1739            // chunk_count that would loop the walker 16M+ times if
1740            // we trusted it. Reject early.
1741            if chunk_count > S4E6_MAX_CHUNK_COUNT {
1742                return Err(SseError::ChunkCountTooLarge {
1743                    got: chunk_count,
1744                    max: S4E6_MAX_CHUNK_COUNT,
1745                });
1746            }
1747            let mut s = [0u8; 8];
1748            s.copy_from_slice(&body[16..24]);
1749            ChunkedSalt::V6(s)
1750        }
1751    };
1752
1753    // v0.8.2 #64 — DoS guard. Pre-validate the declared
1754    // (chunk_size × chunk_count) plaintext size in u64 before *any*
1755    // downstream allocation or chunk-walk loop. Rejects:
1756    //   1. arithmetic that would overflow u64 (e.g. chunk_size =
1757    //      u32::MAX, chunk_count = u32::MAX → 2^64 ≫ u64::MAX),
1758    //   2. a body that is *larger* than the maximum a header with
1759    //      these declared sizes could plausibly carry (i.e. trailing
1760    //      bytes after the final chunk → corruption / append attack),
1761    //      and
1762    //   3. a plaintext size larger than the caller's configured
1763    //      `max_body_bytes` (default 5 GiB).
1764    //
1765    // We do NOT require the body to be `>= max_total` here — the
1766    // final chunk may legitimately hold fewer than `chunk_size`
1767    // plaintext bytes (encoder uses
1768    // `chunk_count = ceil(plaintext.len() / chunk_size)`, so the last
1769    // chunk holds the remainder). The walker handles that
1770    // variable-length tail; truncation of the final chunk's tag /
1771    // ciphertext surfaces as `ChunkFrameTruncated` once the walker
1772    // gets there. The pre-alloc guard's job is the *upper-bound*
1773    // checks: kill obviously-impossible shapes before we
1774    // `Vec::with_capacity(chunk_size × chunk_count)`.
1775    //
1776    // All paths return concrete error variants; nothing panics on
1777    // adversarial input. The error strings carry only constant
1778    // operator-readable labels (no echoed numbers) so the response is
1779    // not a tampering oracle for the attacker.
1780    let chunk_size_u64 = chunk_size as u64;
1781    let chunk_count_u64 = chunk_count as u64;
1782    let expected_plain_size =
1783        chunk_size_u64
1784            .checked_mul(chunk_count_u64)
1785            .ok_or(SseError::ChunkFrameTooLarge {
1786                details: "chunk_size * chunk_count overflows u64",
1787            })?;
1788    let per_chunk_overhead = S4E5_PER_CHUNK_OVERHEAD as u64; // = 16 (AES-GCM tag)
1789    let total_tag_overhead =
1790        per_chunk_overhead
1791            .checked_mul(chunk_count_u64)
1792            .ok_or(SseError::ChunkFrameTooLarge {
1793                details: "tag_len * chunk_count overflows u64",
1794            })?;
1795    let max_total = expected_plain_size
1796        .checked_add(total_tag_overhead)
1797        .and_then(|t| t.checked_add(header_bytes as u64))
1798        .ok_or(SseError::ChunkFrameTooLarge {
1799            details: "header + plaintext + tag overhead overflows u64",
1800        })?;
1801    // Body cannot be *larger* than what the declared geometry could
1802    // legitimately produce. Any extra bytes past the final chunk are
1803    // either trailing junk (corruption) or an append attack — kill
1804    // them here so the walker doesn't even start verifying chunks
1805    // for an obviously-malformed body. (The walker's own end-of-loop
1806    // `cursor != body.len()` check also catches this; the pre-alloc
1807    // guard saves the chunk-walk worth of AES-GCM verifies in the
1808    // hostile case.)
1809    if (body.len() as u64) > max_total {
1810        return Err(SseError::ChunkFrameTruncated {
1811            what: "trailing bytes past declared chunk geometry",
1812        });
1813    }
1814    // The actual DoS guard: cap on declared plaintext. If the header
1815    // claims more than the operator's configured single-object
1816    // ceiling, refuse before the buffered path's
1817    // `Vec::with_capacity(chunk_size * chunk_count)` runs.
1818    if expected_plain_size > max_body_bytes as u64 {
1819        return Err(SseError::ChunkFrameTooLarge {
1820            details: "declared plaintext exceeds gateway max_body_bytes",
1821        });
1822    }
1823
1824    Ok(ChunkedHeader {
1825        variant,
1826        key_id,
1827        chunk_size,
1828        chunk_count,
1829        salt,
1830        chunks_offset: header_bytes,
1831    })
1832}
1833
1834/// Decrypt one chunk under either the S4E5 or S4E6 derivation. Used
1835/// by both the buffered and streaming paths so AAD / nonce
1836/// derivation lives in exactly one place.
1837fn decrypt_chunked_chunk(
1838    cipher: &Aes256Gcm,
1839    chunk_index: u32,
1840    chunk_count: u32,
1841    key_id: u16,
1842    salt: &ChunkedSalt,
1843    tag: &[u8; TAG_LEN],
1844    ct: &[u8],
1845) -> Result<Bytes, SseError> {
1846    let nonce_bytes = match salt {
1847        ChunkedSalt::V5(s) => nonce_v5(s, chunk_index),
1848        ChunkedSalt::V6(s) => nonce_v6(s, chunk_index),
1849    };
1850    let nonce = Nonce::from_slice(&nonce_bytes);
1851    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1852    ct_with_tag.extend_from_slice(ct);
1853    ct_with_tag.extend_from_slice(tag);
1854    let result = match salt {
1855        ChunkedSalt::V5(s) => {
1856            let aad = aad_v5(chunk_index, chunk_count, key_id, s);
1857            cipher.decrypt(
1858                nonce,
1859                Payload {
1860                    msg: &ct_with_tag,
1861                    aad: &aad,
1862                },
1863            )
1864        }
1865        ChunkedSalt::V6(s) => {
1866            let aad = aad_v6(chunk_index, chunk_count, key_id, s);
1867            cipher.decrypt(
1868                nonce,
1869                Payload {
1870                    msg: &ct_with_tag,
1871                    aad: &aad,
1872                },
1873            )
1874        }
1875    };
1876    result
1877        .map(Bytes::from)
1878        .map_err(|_| SseError::ChunkAuthFailed { chunk_index })
1879}
1880
1881/// Walk an S4E5 / S4E6 body chunk-by-chunk, calling `emit` on each
1882/// successfully-verified plaintext chunk. Returns immediately on the
1883/// first chunk that fails auth or is truncated. Shared core between
1884/// the buffered ([`decrypt_chunked_buffered`]) and streaming
1885/// ([`decrypt_chunked_stream`]) paths.
1886fn walk_chunked<F: FnMut(Bytes) -> Result<(), SseError>>(
1887    body: &[u8],
1888    keyring: &SseKeyring,
1889    max_body_bytes: usize,
1890    mut emit: F,
1891) -> Result<(), SseError> {
1892    let hdr = parse_chunked_header(body, max_body_bytes)?;
1893    let key = keyring
1894        .get(hdr.key_id)
1895        .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
1896    let cipher = Aes256Gcm::new(key.as_aes_key());
1897
1898    let mut cursor = hdr.chunks_offset;
1899    let chunk_size = hdr.chunk_size as usize;
1900    for i in 0..hdr.chunk_count {
1901        if cursor + TAG_LEN > body.len() {
1902            return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
1903        }
1904        let tag_off = cursor;
1905        let ct_off = tag_off + TAG_LEN;
1906        let is_last = i + 1 == hdr.chunk_count;
1907        let ct_len = if is_last {
1908            if ct_off > body.len() {
1909                return Err(SseError::ChunkFrameTruncated {
1910                    what: "final chunk ciphertext",
1911                });
1912            }
1913            let remaining = body.len() - ct_off;
1914            if remaining > chunk_size {
1915                return Err(SseError::ChunkFrameTruncated {
1916                    what: "trailing bytes after final chunk",
1917                });
1918            }
1919            remaining
1920        } else {
1921            chunk_size
1922        };
1923        let ct_end = ct_off + ct_len;
1924        if ct_end > body.len() {
1925            return Err(SseError::ChunkFrameTruncated {
1926                what: "chunk ciphertext",
1927            });
1928        }
1929        let mut tag = [0u8; TAG_LEN];
1930        tag.copy_from_slice(&body[tag_off..ct_off]);
1931        let ct = &body[ct_off..ct_end];
1932        let plain =
1933            decrypt_chunked_chunk(&cipher, i, hdr.chunk_count, hdr.key_id, &hdr.salt, &tag, ct)?;
1934        crate::metrics::record_sse_streaming_chunk("decrypt");
1935        emit(plain)?;
1936        cursor = ct_end;
1937    }
1938    if cursor != body.len() {
1939        return Err(SseError::ChunkFrameTruncated {
1940            what: "trailing bytes after declared chunk_count",
1941        });
1942    }
1943    Ok(())
1944}
1945
1946/// Sync back-compat path: decrypt every chunk and concatenate into
1947/// a single `Bytes`. Memory peak = full plaintext (defeats the
1948/// point of S4E5/S4E6 streaming, but useful for callers that already
1949/// need the whole body — e.g. server-side restream-rewrite paths or
1950/// unit tests). Accepts both S4E5 (legacy) and S4E6 (current) bodies.
1951///
1952/// `max_body_bytes` (v0.8.2 #64) caps the declared plaintext size in
1953/// the header **before any allocation**. A malicious / corrupted
1954/// frame that claims a multi-PB plaintext is rejected as
1955/// [`SseError::ChunkFrameTooLarge`] with no `Vec::with_capacity` call
1956/// behind it. Callers without a bespoke cap should use
1957/// [`decrypt_chunked_buffered_default`] (5 GiB cap).
1958pub fn decrypt_chunked_buffered(
1959    body: &[u8],
1960    keyring: &SseKeyring,
1961    max_body_bytes: usize,
1962) -> Result<Bytes, SseError> {
1963    let hdr = parse_chunked_header(body, max_body_bytes)?;
1964    // Header is validated above: chunk_size * chunk_count fits u64
1965    // and is ≤ max_body_bytes (≤ 5 GiB on this gateway), so the
1966    // `as usize` casts cannot truncate on a 64-bit host. The
1967    // `Vec::with_capacity` is therefore bounded by the same cap that
1968    // protects the rest of the gateway from oversized PUTs / GETs.
1969    let mut out = Vec::with_capacity(hdr.chunk_size as usize * hdr.chunk_count as usize);
1970    walk_chunked(body, keyring, max_body_bytes, |chunk| {
1971        out.extend_from_slice(&chunk);
1972        Ok(())
1973    })?;
1974    Ok(Bytes::from(out))
1975}
1976
1977/// Back-compat wrapper around [`decrypt_chunked_buffered`] that uses
1978/// [`DEFAULT_MAX_BODY_BYTES`] (5 GiB — AWS S3's single-object PUT
1979/// ceiling). Provided so the existing `decrypt(body, keyring)`
1980/// dispatcher and call sites that don't have a bespoke cap can keep
1981/// their signature unchanged after the v0.8.2 #64 pre-allocation
1982/// guard landed.
1983pub fn decrypt_chunked_buffered_default(
1984    body: &[u8],
1985    keyring: &SseKeyring,
1986) -> Result<Bytes, SseError> {
1987    decrypt_chunked_buffered(body, keyring, DEFAULT_MAX_BODY_BYTES)
1988}
1989
1990/// v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): stream-decrypt API for
1991/// chunked SSE-S4 bodies. Returns a [`futures::Stream`] that yields
1992/// one `Bytes` per chunk in order. Each chunk is emitted only after
1993/// AES-GCM tag verify succeeds, so the client never sees plaintext
1994/// bytes that haven't been authenticated. A failing chunk yields
1995/// its [`SseError::ChunkAuthFailed`] (with the chunk index) and ends
1996/// the stream — earlier chunks may already have left the gateway,
1997/// which matches the standard streaming-AEAD trade-off (operators
1998/// MUST alert on the audit log + metric, not rely on connection
1999/// close to guarantee atomicity).
2000///
2001/// Accepts either S4E5 (v0.8 #52, legacy) or S4E6 (v0.8.1 #57,
2002/// current) magic. Non-chunked magic surfaces as
2003/// [`SseError::BadMagic`] / [`SseError::ChunkFrameTruncated`] on
2004/// the first poll — the stream is "fail-fast" rather than "fall
2005/// through to S4E2 buffered decrypt", because the caller has
2006/// already dispatched on [`peek_magic`] by the time it hands a body
2007/// to this function.
2008///
2009/// `body` is owned by the returned stream so the caller doesn't
2010/// need to keep the bytes alive separately. The returned stream is
2011/// `'static` — the `keyring` borrow is consumed up front to extract
2012/// the per-frame key and build the AES cipher (which owns its key
2013/// material), so the caller's keyring may be dropped immediately.
2014pub fn decrypt_chunked_stream(
2015    body: bytes::Bytes,
2016    keyring: &SseKeyring,
2017) -> impl futures::Stream<Item = Result<Bytes, SseError>> + 'static {
2018    use futures::stream::{self, StreamExt};
2019
2020    // Cheap pre-validation: parse the header + look up the key
2021    // once, up front, so a malformed frame surfaces on the first
2022    // poll instead of being deferred behind the first-chunk loop.
2023    // The `keyring` borrow ends here — we extract the AES key into
2024    // the owned `Aes256Gcm` cipher, then store that in the stream
2025    // state.
2026    let prelude = (|| {
2027        // v0.8.2 #64: streaming path is naturally memory-bounded
2028        // (one chunk-worth of plaintext at a time), so we don't cap
2029        // the *declared* total here — the per-chunk loop already
2030        // bounds memory by `chunk_size`. The truncation /
2031        // arithmetic-overflow checks inside `parse_chunked_header`
2032        // still run regardless of `max_body_bytes`, which is what
2033        // protects the streaming path from the same DoS class as the
2034        // buffered path (the overflow / declared-vs-actual length
2035        // mismatches are independent of the gateway cap).
2036        let hdr = parse_chunked_header(&body, usize::MAX)?;
2037        let key = keyring
2038            .get(hdr.key_id)
2039            .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
2040        let cipher = Aes256Gcm::new(key.as_aes_key());
2041        Ok::<_, SseError>((hdr, cipher))
2042    })();
2043
2044    match prelude {
2045        Err(e) => stream::iter(std::iter::once(Err(e))).left_stream(),
2046        Ok((hdr, cipher)) => {
2047            let chunks_offset = hdr.chunks_offset;
2048            let state = ChunkedDecryptState {
2049                body,
2050                cipher,
2051                hdr,
2052                cursor: chunks_offset,
2053                next_index: 0,
2054            };
2055            stream::try_unfold(state, decrypt_next_chunk).right_stream()
2056        }
2057    }
2058}
2059
2060/// Per-stream state for [`decrypt_chunked_stream`]. Holds the owned
2061/// `body` (so the stream stays self-contained), the prepared
2062/// cipher, and the cursor position into the chunk array.
2063struct ChunkedDecryptState {
2064    body: bytes::Bytes,
2065    cipher: Aes256Gcm,
2066    hdr: ChunkedHeader,
2067    cursor: usize,
2068    next_index: u32,
2069}
2070
2071async fn decrypt_next_chunk(
2072    mut state: ChunkedDecryptState,
2073) -> Result<Option<(Bytes, ChunkedDecryptState)>, SseError> {
2074    if state.next_index >= state.hdr.chunk_count {
2075        // Final boundary check — anything past the declared
2076        // chunk_count would be a truncation / append attack.
2077        if state.cursor != state.body.len() {
2078            return Err(SseError::ChunkFrameTruncated {
2079                what: "trailing bytes after declared chunk_count",
2080            });
2081        }
2082        return Ok(None);
2083    }
2084    let i = state.next_index;
2085    let chunk_size = state.hdr.chunk_size as usize;
2086    if state.cursor + TAG_LEN > state.body.len() {
2087        return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
2088    }
2089    let tag_off = state.cursor;
2090    let ct_off = tag_off + TAG_LEN;
2091    let is_last = i + 1 == state.hdr.chunk_count;
2092    let ct_len = if is_last {
2093        if ct_off > state.body.len() {
2094            return Err(SseError::ChunkFrameTruncated {
2095                what: "final chunk ciphertext",
2096            });
2097        }
2098        let remaining = state.body.len() - ct_off;
2099        if remaining > chunk_size {
2100            return Err(SseError::ChunkFrameTruncated {
2101                what: "trailing bytes after final chunk",
2102            });
2103        }
2104        remaining
2105    } else {
2106        chunk_size
2107    };
2108    let ct_end = ct_off + ct_len;
2109    if ct_end > state.body.len() {
2110        return Err(SseError::ChunkFrameTruncated {
2111            what: "chunk ciphertext",
2112        });
2113    }
2114    let mut tag = [0u8; TAG_LEN];
2115    tag.copy_from_slice(&state.body[tag_off..ct_off]);
2116    let ct = &state.body[ct_off..ct_end];
2117    let plain = decrypt_chunked_chunk(
2118        &state.cipher,
2119        i,
2120        state.hdr.chunk_count,
2121        state.hdr.key_id,
2122        &state.hdr.salt,
2123        &tag,
2124        ct,
2125    )?;
2126    crate::metrics::record_sse_streaming_chunk("decrypt");
2127    state.cursor = ct_end;
2128    state.next_index += 1;
2129    Ok(Some((plain, state)))
2130}
2131
2132/// v0.8.1 #57: build an S4E5 frame. Identical body structure to the
2133/// pre-#57 `encrypt_v2_chunked` (4-byte salt + S4E5 magic + V5
2134/// nonce/AAD); kept around purely so the back-compat-read tests can
2135/// synthesize a "v0.8.0 vintage" blob and prove the new gateway
2136/// still decrypts it.
2137#[cfg(test)]
2138fn encrypt_v2_chunked_s4e5_for_test(
2139    plaintext: &[u8],
2140    keyring: &SseKeyring,
2141    chunk_size: usize,
2142) -> Result<Bytes, SseError> {
2143    if chunk_size == 0 {
2144        return Err(SseError::ChunkSizeInvalid);
2145    }
2146    let (key_id, key) = keyring.active();
2147    let cipher = Aes256Gcm::new(key.as_aes_key());
2148    let mut salt = [0u8; 4];
2149    rand::rngs::OsRng.fill_bytes(&mut salt);
2150
2151    let chunk_count: u32 = if plaintext.is_empty() {
2152        1
2153    } else {
2154        plaintext
2155            .len()
2156            .div_ceil(chunk_size)
2157            .try_into()
2158            .expect("chunk_count overflows u32")
2159    };
2160
2161    let mut out = Vec::with_capacity(
2162        S4E5_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E5_PER_CHUNK_OVERHEAD),
2163    );
2164    out.extend_from_slice(SSE_MAGIC_V5);
2165    out.push(ALGO_AES_256_GCM);
2166    out.extend_from_slice(&key_id.to_be_bytes());
2167    out.push(0u8);
2168    out.extend_from_slice(&(chunk_size as u32).to_be_bytes());
2169    out.extend_from_slice(&chunk_count.to_be_bytes());
2170    out.extend_from_slice(&salt);
2171
2172    for i in 0..chunk_count {
2173        let off = (i as usize).saturating_mul(chunk_size);
2174        let end = off.saturating_add(chunk_size).min(plaintext.len());
2175        let chunk_pt: &[u8] = if off >= plaintext.len() {
2176            &[]
2177        } else {
2178            &plaintext[off..end]
2179        };
2180        let nonce_bytes = nonce_v5(&salt, i);
2181        let nonce = Nonce::from_slice(&nonce_bytes);
2182        let aad = aad_v5(i, chunk_count, key_id, &salt);
2183        let ct_with_tag = cipher
2184            .encrypt(
2185                nonce,
2186                Payload {
2187                    msg: chunk_pt,
2188                    aad: &aad,
2189                },
2190            )
2191            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
2192        let split = ct_with_tag.len() - TAG_LEN;
2193        let (ct, tag) = ct_with_tag.split_at(split);
2194        out.extend_from_slice(tag);
2195        out.extend_from_slice(ct);
2196    }
2197    Ok(Bytes::from(out))
2198}
2199
2200#[cfg(test)]
2201mod tests {
2202    use super::*;
2203
2204    fn key32(seed: u8) -> Arc<SseKey> {
2205        Arc::new(SseKey::from_bytes(&[seed; 32]).unwrap())
2206    }
2207
2208    fn keyring_single(seed: u8) -> SseKeyring {
2209        SseKeyring::new(1, key32(seed))
2210    }
2211
2212    #[test]
2213    fn roundtrip_basic_v1() {
2214        // back-compat single-key API — still works.
2215        let k = SseKey::from_bytes(&[7u8; 32]).unwrap();
2216        let pt = b"the quick brown fox jumps over the lazy dog";
2217        let ct = encrypt(&k, pt);
2218        assert!(looks_encrypted(&ct));
2219        assert_eq!(&ct[..4], SSE_MAGIC_V1);
2220        assert_eq!(ct[4], ALGO_AES_256_GCM);
2221        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2222        // decrypt via single-key keyring
2223        let kr = SseKeyring::new(1, Arc::new(k));
2224        let pt2 = decrypt(&ct, &kr).unwrap();
2225        assert_eq!(pt2.as_ref(), pt);
2226    }
2227
2228    #[test]
2229    fn s4e2_roundtrip_active_key() {
2230        let kr = keyring_single(7);
2231        let pt = b"S4E2 active-key roundtrip";
2232        let ct = encrypt_v2(pt, &kr);
2233        assert_eq!(&ct[..4], SSE_MAGIC_V2);
2234        assert_eq!(ct[4], ALGO_AES_256_GCM);
2235        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE");
2236        assert_eq!(ct[7], 0, "reserved byte");
2237        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2238        assert!(looks_encrypted(&ct));
2239        let pt2 = decrypt(&ct, &kr).unwrap();
2240        assert_eq!(pt2.as_ref(), pt);
2241    }
2242
2243    #[test]
2244    fn decrypt_s4e1_via_active_only_keyring() {
2245        // v0.4 wrote S4E1 with key K; v0.5 keyring has K as the only
2246        // (active) key. Decrypt must succeed.
2247        let k_arc = key32(11);
2248        let legacy_ct = encrypt(&k_arc, b"v0.4 vintage object");
2249        assert_eq!(&legacy_ct[..4], SSE_MAGIC_V1);
2250        let kr = SseKeyring::new(1, Arc::clone(&k_arc));
2251        let plain = decrypt(&legacy_ct, &kr).unwrap();
2252        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2253    }
2254
2255    #[test]
2256    fn decrypt_s4e2_under_old_key_after_rotation() {
2257        // Rotation flow: object was encrypted under key id=1 when 1
2258        // was active. Operator rotates to active=2 and keeps 1 in the
2259        // keyring. The S4E2 body must still decrypt.
2260        let k1 = key32(1);
2261        let k2 = key32(2);
2262        let mut kr_old = SseKeyring::new(1, Arc::clone(&k1));
2263        let ct = encrypt_v2(b"old-rotation object", &kr_old);
2264        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2265
2266        // After rotation: active=2, but key 1 still in ring.
2267        kr_old.add(2, Arc::clone(&k2));
2268        let mut kr_new = SseKeyring::new(2, Arc::clone(&k2));
2269        kr_new.add(1, Arc::clone(&k1));
2270
2271        let plain = decrypt(&ct, &kr_new).unwrap();
2272        assert_eq!(plain.as_ref(), b"old-rotation object");
2273
2274        // And new PUTs go to id 2 (active).
2275        let new_ct = encrypt_v2(b"new-rotation object", &kr_new);
2276        assert_eq!(u16::from_be_bytes([new_ct[5], new_ct[6]]), 2);
2277        let plain_new = decrypt(&new_ct, &kr_new).unwrap();
2278        assert_eq!(plain_new.as_ref(), b"new-rotation object");
2279    }
2280
2281    #[test]
2282    fn s4e2_unknown_key_id_errors() {
2283        let kr = keyring_single(3); // only id=1 present
2284        let kr_other = SseKeyring::new(99, key32(3));
2285        let ct = encrypt_v2(b"x", &kr_other); // body claims key_id=99
2286        let err = decrypt(&ct, &kr).unwrap_err();
2287        assert!(
2288            matches!(err, SseError::KeyNotInKeyring { id: 99 }),
2289            "got {err:?}"
2290        );
2291    }
2292
2293    #[test]
2294    fn s4e2_tampered_key_id_fails_auth() {
2295        let kr = SseKeyring::new(1, key32(4));
2296        let mut kr_with_2 = kr.clone();
2297        kr_with_2.add(2, key32(5)); // a real but wrong key under id=2
2298        let mut ct = encrypt_v2(b"do not flip my key id", &kr).to_vec();
2299        // Flip key_id from 1 → 2 in the header. The keyring HAS a key
2300        // for 2, so the lookup succeeds — but AAD authenticates the
2301        // original key_id, so AES-GCM tag verification must fail.
2302        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2303        ct[5] = 0;
2304        ct[6] = 2;
2305        let err = decrypt(&ct, &kr_with_2).unwrap_err();
2306        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2307    }
2308
2309    #[test]
2310    fn s4e2_tampered_ciphertext_fails() {
2311        let kr = SseKeyring::new(7, key32(9));
2312        let mut ct = encrypt_v2(b"secret message v2", &kr).to_vec();
2313        let last = ct.len() - 1;
2314        ct[last] ^= 0x01;
2315        let err = decrypt(&ct, &kr).unwrap_err();
2316        assert!(matches!(err, SseError::DecryptFailed));
2317    }
2318
2319    #[test]
2320    fn s4e2_tampered_algo_byte_fails() {
2321        let kr = SseKeyring::new(1, key32(2));
2322        let mut ct = encrypt_v2(b"hi", &kr).to_vec();
2323        ct[4] = 99;
2324        let err = decrypt(&ct, &kr).unwrap_err();
2325        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2326    }
2327
2328    #[test]
2329    fn wrong_key_fails_v1_via_keyring() {
2330        // S4E1 written under key K1; keyring has only K2 → DecryptFailed.
2331        let k1 = SseKey::from_bytes(&[1u8; 32]).unwrap();
2332        let ct = encrypt(&k1, b"secret");
2333        let kr_wrong = SseKeyring::new(1, Arc::new(SseKey::from_bytes(&[2u8; 32]).unwrap()));
2334        let err = decrypt(&ct, &kr_wrong).unwrap_err();
2335        assert!(matches!(err, SseError::DecryptFailed));
2336    }
2337
2338    #[test]
2339    fn rejects_short_body() {
2340        let kr = SseKeyring::new(1, key32(1));
2341        let err = decrypt(b"short", &kr).unwrap_err();
2342        assert!(matches!(err, SseError::TooShort { got: 5 }));
2343    }
2344
2345    #[test]
2346    fn looks_encrypted_passthrough_returns_false() {
2347        // S4F2 frame magic, NOT S4E1 / S4E2 — must not be confused.
2348        let f2 = b"S4F2\x01\x00\x00\x00........................................";
2349        assert!(!looks_encrypted(f2));
2350        assert!(!looks_encrypted(b""));
2351    }
2352
2353    #[test]
2354    fn looks_encrypted_detects_both_v1_and_v2() {
2355        let kr = SseKeyring::new(1, key32(8));
2356        let v1 = encrypt(&SseKey::from_bytes(&[8u8; 32]).unwrap(), b"x");
2357        let v2 = encrypt_v2(b"x", &kr);
2358        assert!(looks_encrypted(&v1));
2359        assert!(looks_encrypted(&v2));
2360    }
2361
2362    #[test]
2363    fn key_from_hex_string() {
2364        let bad =
2365            SseKey::from_bytes(b"0102030405060708090a0b0c0d0e0f10111213141516171819202122232425")
2366                .unwrap_err();
2367        assert!(matches!(bad, SseError::BadKeyLength { .. }));
2368        let good = b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2369        let _ = SseKey::from_bytes(good).expect("64-char hex should parse");
2370    }
2371
2372    #[test]
2373    fn encrypt_v2_uses_random_nonce() {
2374        let kr = SseKeyring::new(1, key32(3));
2375        let pt = b"deterministic input";
2376        let a = encrypt_v2(pt, &kr);
2377        let b = encrypt_v2(pt, &kr);
2378        assert_ne!(a, b, "nonce must be random per-call");
2379    }
2380
2381    #[test]
2382    fn keyring_active_and_get() {
2383        let k1 = key32(1);
2384        let k2 = key32(2);
2385        let mut kr = SseKeyring::new(1, Arc::clone(&k1));
2386        kr.add(2, Arc::clone(&k2));
2387        let (id, active) = kr.active();
2388        assert_eq!(id, 1);
2389        assert_eq!(active.bytes, [1u8; 32]);
2390        assert!(kr.get(2).is_some());
2391        assert!(kr.get(3).is_none());
2392    }
2393
2394    // -----------------------------------------------------------------
2395    // v0.5 #27 — SSE-C (customer-provided key, S4E3 frame) tests
2396    // -----------------------------------------------------------------
2397
2398    use base64::Engine as _;
2399
2400    fn cust_key(seed: u8) -> CustomerKeyMaterial {
2401        let key = [seed; KEY_LEN];
2402        let key_md5 = compute_key_md5(&key);
2403        CustomerKeyMaterial { key, key_md5 }
2404    }
2405
2406    #[test]
2407    fn s4e3_roundtrip_happy_path() {
2408        let m = cust_key(42);
2409        let pt = b"top-secret SSE-C payload";
2410        let ct = encrypt_with_source(
2411            pt,
2412            SseSource::CustomerKey {
2413                key: &m.key,
2414                key_md5: &m.key_md5,
2415            },
2416        );
2417        // Frame inspection.
2418        assert_eq!(&ct[..4], SSE_MAGIC_V3);
2419        assert_eq!(ct[4], ALGO_AES_256_GCM);
2420        assert_eq!(&ct[5..5 + KEY_MD5_LEN], &m.key_md5);
2421        assert_eq!(ct.len(), SSE_HEADER_BYTES_V3 + pt.len());
2422        assert!(looks_encrypted(&ct));
2423        // Decrypt round-trip.
2424        let plain = decrypt(
2425            &ct,
2426            SseSource::CustomerKey {
2427                key: &m.key,
2428                key_md5: &m.key_md5,
2429            },
2430        )
2431        .unwrap();
2432        assert_eq!(plain.as_ref(), pt);
2433        // And via the From impl on &CustomerKeyMaterial.
2434        let plain2 = decrypt(&ct, &m).unwrap();
2435        assert_eq!(plain2.as_ref(), pt);
2436    }
2437
2438    #[test]
2439    fn s4e3_wrong_key_yields_wrong_customer_key_error() {
2440        let m = cust_key(1);
2441        let other = cust_key(2);
2442        let ct = encrypt_with_source(b"payload", (&m).into());
2443        let err = decrypt(
2444            &ct,
2445            SseSource::CustomerKey {
2446                key: &other.key,
2447                key_md5: &other.key_md5,
2448            },
2449        )
2450        .unwrap_err();
2451        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2452    }
2453
2454    #[test]
2455    fn s4e3_tampered_stored_md5_is_caught() {
2456        // Attacker rewrites the stored MD5 to match a key they know.
2457        // Even though the supplied (attacker) key matches the rewritten
2458        // MD5, AES-GCM authenticates the ORIGINAL md5 via AAD, so the
2459        // tag check fails. Surface: WrongCustomerKey if the supplied
2460        // md5 != stored md5 (this test), or DecryptFailed if attacker
2461        // also rewrites their supplied md5 to match.
2462        let m = cust_key(7);
2463        let mut ct = encrypt_with_source(b"victim payload", (&m).into()).to_vec();
2464        // Flip a byte in the stored fingerprint.
2465        ct[5] ^= 0x55;
2466        // Client supplies the original (unmodified) key + md5.
2467        let err = decrypt(
2468            &ct,
2469            SseSource::CustomerKey {
2470                key: &m.key,
2471                key_md5: &m.key_md5,
2472            },
2473        )
2474        .unwrap_err();
2475        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2476    }
2477
2478    #[test]
2479    fn s4e3_tampered_md5_with_matching_supplied_md5_fails_aead() {
2480        // Both stored md5 AND supplied md5 are flipped to the same bogus
2481        // value. The fingerprint check passes (they match) but AAD
2482        // authenticates the *original* md5, so AES-GCM fails.
2483        let m = cust_key(3);
2484        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2485        ct[5] ^= 0xFF;
2486        let mut bogus_md5 = m.key_md5;
2487        bogus_md5[0] ^= 0xFF;
2488        let err = decrypt(
2489            &ct,
2490            SseSource::CustomerKey {
2491                key: &m.key,
2492                key_md5: &bogus_md5,
2493            },
2494        )
2495        .unwrap_err();
2496        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2497    }
2498
2499    #[test]
2500    fn s4e3_tampered_ciphertext_fails_aead() {
2501        let m = cust_key(8);
2502        let mut ct = encrypt_with_source(b"sealed message", (&m).into()).to_vec();
2503        let last = ct.len() - 1;
2504        ct[last] ^= 0x01;
2505        let err = decrypt(&ct, &m).unwrap_err();
2506        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2507    }
2508
2509    #[test]
2510    fn s4e3_tampered_algo_byte_rejected() {
2511        let m = cust_key(9);
2512        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2513        ct[4] = 99;
2514        let err = decrypt(&ct, &m).unwrap_err();
2515        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2516    }
2517
2518    #[test]
2519    fn s4e3_uses_random_nonce() {
2520        let m = cust_key(10);
2521        let a = encrypt_with_source(b"deterministic input", (&m).into());
2522        let b = encrypt_with_source(b"deterministic input", (&m).into());
2523        assert_ne!(a, b, "nonce must be random per-call");
2524    }
2525
2526    #[test]
2527    fn parse_customer_key_headers_happy_path() {
2528        let key = [11u8; KEY_LEN];
2529        let md5 = compute_key_md5(&key);
2530        let key_b64 = base64::engine::general_purpose::STANDARD.encode(key);
2531        let md5_b64 = base64::engine::general_purpose::STANDARD.encode(md5);
2532        let m = parse_customer_key_headers("AES256", &key_b64, &md5_b64).unwrap();
2533        assert_eq!(m.key, key);
2534        assert_eq!(m.key_md5, md5);
2535    }
2536
2537    #[test]
2538    fn parse_customer_key_headers_rejects_wrong_algorithm() {
2539        let key = [1u8; KEY_LEN];
2540        let md5 = compute_key_md5(&key);
2541        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2542        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2543        let err = parse_customer_key_headers("AES128", &kb, &mb).unwrap_err();
2544        assert!(
2545            matches!(err, SseError::CustomerKeyAlgorithmUnsupported { ref algo } if algo == "AES128"),
2546            "got {err:?}"
2547        );
2548        // Lowercase variant still rejected (AWS S3 accepts only "AES256").
2549        let err2 = parse_customer_key_headers("aes256", &kb, &mb).unwrap_err();
2550        assert!(
2551            matches!(err2, SseError::CustomerKeyAlgorithmUnsupported { .. }),
2552            "got {err2:?}"
2553        );
2554    }
2555
2556    #[test]
2557    fn parse_customer_key_headers_rejects_wrong_key_length() {
2558        let short_key = vec![5u8; 16]; // half-length AES key
2559        let md5 = compute_key_md5(&short_key);
2560        let kb = base64::engine::general_purpose::STANDARD.encode(&short_key);
2561        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2562        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2563        assert!(
2564            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("key length")),
2565            "got {err:?}"
2566        );
2567    }
2568
2569    #[test]
2570    fn parse_customer_key_headers_rejects_wrong_md5_length() {
2571        let key = [3u8; KEY_LEN];
2572        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2573        // Truncated MD5 (15 bytes instead of 16).
2574        let bad_md5 = vec![0u8; 15];
2575        let mb = base64::engine::general_purpose::STANDARD.encode(bad_md5);
2576        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2577        assert!(
2578            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 length")),
2579            "got {err:?}"
2580        );
2581    }
2582
2583    #[test]
2584    fn parse_customer_key_headers_rejects_md5_mismatch() {
2585        let key = [4u8; KEY_LEN];
2586        let other = [5u8; KEY_LEN];
2587        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2588        let wrong_md5 = compute_key_md5(&other);
2589        let mb = base64::engine::general_purpose::STANDARD.encode(wrong_md5);
2590        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2591        assert!(
2592            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 does not match")),
2593            "got {err:?}"
2594        );
2595    }
2596
2597    #[test]
2598    fn parse_customer_key_headers_rejects_bad_base64() {
2599        let valid_key = [0u8; KEY_LEN];
2600        let md5 = compute_key_md5(&valid_key);
2601        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2602        let err = parse_customer_key_headers("AES256", "!!!not-base64!!!", &mb).unwrap_err();
2603        assert!(
2604            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2605            "got {err:?}"
2606        );
2607        // Bad MD5 base64.
2608        let kb = base64::engine::general_purpose::STANDARD.encode(valid_key);
2609        let err2 = parse_customer_key_headers("AES256", &kb, "??not-base64??").unwrap_err();
2610        assert!(
2611            matches!(err2, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2612            "got {err2:?}"
2613        );
2614    }
2615
2616    #[test]
2617    fn parse_customer_key_headers_trims_whitespace() {
2618        // S3 SDKs sometimes pad headers with trailing newlines.
2619        let key = [12u8; KEY_LEN];
2620        let md5 = compute_key_md5(&key);
2621        let kb = format!(
2622            "  {}\n",
2623            base64::engine::general_purpose::STANDARD.encode(key)
2624        );
2625        let mb = format!(
2626            "\t{}  ",
2627            base64::engine::general_purpose::STANDARD.encode(md5)
2628        );
2629        let m = parse_customer_key_headers("AES256", &kb, &mb).unwrap();
2630        assert_eq!(m.key, key);
2631    }
2632
2633    // -----------------------------------------------------------------
2634    // Back-compat + cross-source mixing
2635    // -----------------------------------------------------------------
2636
2637    #[test]
2638    fn back_compat_decrypt_s4e1_with_keyring_source() {
2639        let k = key32(33);
2640        let legacy_ct = encrypt(&k, b"v0.4 vintage object");
2641        let kr = SseKeyring::new(1, Arc::clone(&k));
2642        // Both call styles must work — `&kr` (back-compat) and
2643        // `SseSource::Keyring(&kr)` (explicit).
2644        let plain = decrypt(&legacy_ct, &kr).unwrap();
2645        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2646        let plain2 = decrypt(&legacy_ct, SseSource::Keyring(&kr)).unwrap();
2647        assert_eq!(plain2.as_ref(), b"v0.4 vintage object");
2648    }
2649
2650    #[test]
2651    fn back_compat_decrypt_s4e2_with_keyring_source() {
2652        let kr = keyring_single(34);
2653        let ct = encrypt_v2(b"v0.5 #29 object", &kr);
2654        let plain = decrypt(&ct, &kr).unwrap();
2655        assert_eq!(plain.as_ref(), b"v0.5 #29 object");
2656        // encrypt_with_source(Keyring) should produce the same wire
2657        // format (S4E2).
2658        let ct2 = encrypt_with_source(b"v0.5 #29 object", SseSource::Keyring(&kr));
2659        assert_eq!(&ct2[..4], SSE_MAGIC_V2);
2660        let plain2 = decrypt(&ct2, &kr).unwrap();
2661        assert_eq!(plain2.as_ref(), b"v0.5 #29 object");
2662    }
2663
2664    #[test]
2665    fn s4e2_blob_with_customer_key_source_is_rejected() {
2666        // An object stored with SSE-S4 (S4E2) but a client sending
2667        // SSE-C headers on the GET — this is a misuse, surface as
2668        // CustomerKeyUnexpected so service.rs can return 400.
2669        let kr = keyring_single(50);
2670        let ct = encrypt_v2(b"server-managed object", &kr);
2671        let m = cust_key(99);
2672        let err = decrypt(
2673            &ct,
2674            SseSource::CustomerKey {
2675                key: &m.key,
2676                key_md5: &m.key_md5,
2677            },
2678        )
2679        .unwrap_err();
2680        assert!(
2681            matches!(err, SseError::CustomerKeyUnexpected),
2682            "got {err:?}"
2683        );
2684    }
2685
2686    #[test]
2687    fn s4e3_blob_with_keyring_source_is_rejected() {
2688        // Inverse: object is SSE-C (S4E3) but client forgot to send
2689        // SSE-C headers. Service.rs should map this to 400.
2690        let m = cust_key(60);
2691        let ct = encrypt_with_source(b"customer-key object", (&m).into());
2692        let kr = keyring_single(60);
2693        let err = decrypt(&ct, &kr).unwrap_err();
2694        assert!(matches!(err, SseError::CustomerKeyRequired), "got {err:?}");
2695    }
2696
2697    #[test]
2698    fn looks_encrypted_detects_s4e3() {
2699        let m = cust_key(13);
2700        let ct = encrypt_with_source(b"x", (&m).into());
2701        assert!(looks_encrypted(&ct));
2702    }
2703
2704    #[test]
2705    fn s4e3_rejects_short_body() {
2706        // 36 bytes passes the looks_encrypted gate but is shorter than
2707        // S4E3's 49-byte header.
2708        let mut short = Vec::new();
2709        short.extend_from_slice(SSE_MAGIC_V3);
2710        short.push(ALGO_AES_256_GCM);
2711        // Padding to 36 bytes (SSE_HEADER_BYTES) so the outer length
2712        // check passes but the S4E3 inner check fails.
2713        short.extend_from_slice(&[0u8; SSE_HEADER_BYTES - 5]);
2714        assert_eq!(short.len(), SSE_HEADER_BYTES);
2715        let m = cust_key(1);
2716        let err = decrypt(
2717            &short,
2718            SseSource::CustomerKey {
2719                key: &m.key,
2720                key_md5: &m.key_md5,
2721            },
2722        )
2723        .unwrap_err();
2724        assert!(matches!(err, SseError::TooShort { .. }), "got {err:?}");
2725    }
2726
2727    #[test]
2728    fn customer_key_material_debug_redacts_key() {
2729        let m = cust_key(99);
2730        let s = format!("{m:?}");
2731        assert!(s.contains("redacted"));
2732        assert!(!s.contains(&format!("{:?}", m.key.as_slice())));
2733    }
2734
2735    #[test]
2736    fn constant_time_eq_basic() {
2737        assert!(constant_time_eq(b"abc", b"abc"));
2738        assert!(!constant_time_eq(b"abc", b"abd"));
2739        assert!(!constant_time_eq(b"abc", b"abcd"));
2740        assert!(constant_time_eq(b"", b""));
2741    }
2742
2743    #[test]
2744    fn compute_key_md5_known_vector() {
2745        // Empty input MD5 is known: d41d8cd98f00b204e9800998ecf8427e.
2746        let got = compute_key_md5(b"");
2747        let expected_hex = "d41d8cd98f00b204e9800998ecf8427e";
2748        assert_eq!(hex_lower(&got), expected_hex);
2749    }
2750
2751    // -----------------------------------------------------------------
2752    // v0.5 #28 — SSE-KMS envelope (S4E4) tests
2753    // -----------------------------------------------------------------
2754
2755    use crate::kms::{KmsBackend, LocalKms};
2756    use std::collections::HashMap;
2757    use std::path::PathBuf;
2758
2759    fn local_kms_with(key_ids: &[(&str, [u8; 32])]) -> LocalKms {
2760        let mut keks: HashMap<String, [u8; 32]> = HashMap::new();
2761        for (id, k) in key_ids {
2762            keks.insert((*id).to_string(), *k);
2763        }
2764        LocalKms::from_keks(PathBuf::from("/tmp/none"), keks)
2765    }
2766
2767    #[tokio::test]
2768    async fn s4e4_roundtrip_via_local_kms() {
2769        let kms = local_kms_with(&[("alpha", [42u8; 32])]);
2770        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2771        let mut dek = [0u8; 32];
2772        dek.copy_from_slice(&dek_vec);
2773        let pt = b"SSE-KMS envelope payload across the S4E4 frame";
2774        let ct = encrypt_with_source(
2775            pt,
2776            SseSource::Kms {
2777                dek: &dek,
2778                wrapped: &wrapped,
2779            },
2780        );
2781        // Frame inspection.
2782        assert_eq!(&ct[..4], SSE_MAGIC_V4);
2783        assert_eq!(ct[4], ALGO_AES_256_GCM);
2784        let key_id_len = ct[5] as usize;
2785        assert_eq!(key_id_len, "alpha".len());
2786        assert_eq!(&ct[6..6 + key_id_len], b"alpha");
2787        // peek_magic + looks_encrypted both recognise S4E4.
2788        assert!(looks_encrypted(&ct));
2789        assert_eq!(peek_magic(&ct), Some("S4E4"));
2790        // Async decrypt round-trip.
2791        let plain = decrypt_with_kms(&ct, &kms).await.unwrap();
2792        assert_eq!(plain.as_ref(), pt);
2793    }
2794
2795    #[tokio::test]
2796    async fn s4e4_tampered_key_id_fails_aead() {
2797        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
2798        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2799        let mut dek = [0u8; 32];
2800        dek.copy_from_slice(&dek_vec);
2801        let mut ct = encrypt_with_source(
2802            b"do not redirect",
2803            SseSource::Kms {
2804                dek: &dek,
2805                wrapped: &wrapped,
2806            },
2807        )
2808        .to_vec();
2809        // Flip the key_id from "alpha" to "betaa" by changing the
2810        // first byte of the key_id field. The forged id "bltha" is
2811        // not in the KMS, so unwrap fails with KeyNotFound surfaced
2812        // through KmsBackend(KmsError::KeyNotFound).
2813        let key_id_off = 6;
2814        ct[key_id_off] = b'b';
2815        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2816        assert!(
2817            matches!(
2818                err,
2819                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2820                    | SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
2821            ),
2822            "got {err:?}"
2823        );
2824    }
2825
2826    #[tokio::test]
2827    async fn s4e4_tampered_key_id_to_real_other_id_still_fails() {
2828        // Wrap under "alpha" but rewrite the stored key_id to "beta"
2829        // (which IS in the KMS). KmsBackend will try to unwrap with
2830        // beta's KEK and AAD = "beta", but the wrapped bytes were
2831        // produced with alpha's KEK + AAD = "alpha", so the local
2832        // KMS unwrap fails with UnwrapFailed.
2833        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
2834        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2835        let mut dek = [0u8; 32];
2836        dek.copy_from_slice(&dek_vec);
2837        let mut ct = encrypt_with_source(
2838            b"redirect attempt",
2839            SseSource::Kms {
2840                dek: &dek,
2841                wrapped: &wrapped,
2842            },
2843        )
2844        .to_vec();
2845        // Both "alpha" and "beta" are 5 chars long so the rewrite
2846        // doesn't shift any other field offsets.
2847        let key_id_off = 6;
2848        ct[key_id_off..key_id_off + 5].copy_from_slice(b"beta_");
2849        // Trim back to 4-byte "beta" by also shrinking the length
2850        // prefix would change downstream offsets — instead pad the
2851        // forged id to keep length stable. This mirrors the realistic
2852        // tampering surface (attacker can flip bytes but not change
2853        // the on-disk layout). The KMS now sees key_id "beta_" which
2854        // is unknown → KeyNotFound.
2855        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2856        assert!(
2857            matches!(
2858                err,
2859                SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
2860            ),
2861            "got {err:?}"
2862        );
2863    }
2864
2865    #[tokio::test]
2866    async fn s4e4_tampered_wrapped_dek_fails_unwrap() {
2867        let kms = local_kms_with(&[("k", [3u8; 32])]);
2868        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2869        let mut dek = [0u8; 32];
2870        dek.copy_from_slice(&dek_vec);
2871        let mut ct = encrypt_with_source(
2872            b"target body",
2873            SseSource::Kms {
2874                dek: &dek,
2875                wrapped: &wrapped,
2876            },
2877        )
2878        .to_vec();
2879        // Locate the wrapped_dek_len + wrapped_dek field and flip a
2880        // byte in the middle of the wrapped DEK. AES-GCM auth on the
2881        // wrap fails → KmsBackend(UnwrapFailed).
2882        let key_id_len = ct[5] as usize;
2883        let wrapped_len_off = 6 + key_id_len;
2884        let wrapped_off = wrapped_len_off + 4;
2885        let mid = wrapped_off + (wrapped.ciphertext.len() / 2);
2886        ct[mid] ^= 0xFF;
2887        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2888        assert!(
2889            matches!(
2890                err,
2891                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2892            ),
2893            "got {err:?}"
2894        );
2895    }
2896
2897    #[tokio::test]
2898    async fn s4e4_tampered_ciphertext_fails_aead() {
2899        let kms = local_kms_with(&[("k", [4u8; 32])]);
2900        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2901        let mut dek = [0u8; 32];
2902        dek.copy_from_slice(&dek_vec);
2903        let mut ct = encrypt_with_source(
2904            b"sealed body",
2905            SseSource::Kms {
2906                dek: &dek,
2907                wrapped: &wrapped,
2908            },
2909        )
2910        .to_vec();
2911        let last = ct.len() - 1;
2912        ct[last] ^= 0x01;
2913        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2914        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2915    }
2916
2917    #[tokio::test]
2918    async fn s4e4_uses_random_nonce_and_dek_per_put() {
2919        let kms = local_kms_with(&[("k", [5u8; 32])]);
2920        // Two PUTs of the same plaintext under the same KEK must
2921        // produce different ciphertexts (fresh DEK + fresh nonce).
2922        let (dek1_vec, wrapped1) = kms.generate_dek("k").await.unwrap();
2923        let (dek2_vec, wrapped2) = kms.generate_dek("k").await.unwrap();
2924        let mut dek1 = [0u8; 32];
2925        dek1.copy_from_slice(&dek1_vec);
2926        let mut dek2 = [0u8; 32];
2927        dek2.copy_from_slice(&dek2_vec);
2928        let pt = b"deterministic input";
2929        let a = encrypt_with_source(
2930            pt,
2931            SseSource::Kms {
2932                dek: &dek1,
2933                wrapped: &wrapped1,
2934            },
2935        );
2936        let b = encrypt_with_source(
2937            pt,
2938            SseSource::Kms {
2939                dek: &dek2,
2940                wrapped: &wrapped2,
2941            },
2942        );
2943        assert_ne!(a, b);
2944        // Both still decrypt round-trip.
2945        let plain_a = decrypt_with_kms(&a, &kms).await.unwrap();
2946        let plain_b = decrypt_with_kms(&b, &kms).await.unwrap();
2947        assert_eq!(plain_a.as_ref(), pt);
2948        assert_eq!(plain_b.as_ref(), pt);
2949    }
2950
2951    #[tokio::test]
2952    async fn s4e4_sync_decrypt_returns_kms_async_required() {
2953        // The whole point of KmsAsyncRequired: passing an S4E4 body
2954        // to the sync `decrypt` function must surface a distinct
2955        // error so service.rs's GET path notices the bug rather than
2956        // returning a generic "wrong source" 400.
2957        let kms = local_kms_with(&[("k", [6u8; 32])]);
2958        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2959        let mut dek = [0u8; 32];
2960        dek.copy_from_slice(&dek_vec);
2961        let ct = encrypt_with_source(
2962            b"async only",
2963            SseSource::Kms {
2964                dek: &dek,
2965                wrapped: &wrapped,
2966            },
2967        );
2968        // Try via Keyring source (the default sync path).
2969        let kr = SseKeyring::new(1, key32(0));
2970        let err = decrypt(&ct, &kr).unwrap_err();
2971        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
2972    }
2973
2974    #[test]
2975    fn back_compat_s4e1_e2_e3_still_decrypt_via_sync() {
2976        // After adding S4E4, the sync `decrypt` path must still
2977        // handle every legacy frame variant unchanged.
2978        let k = key32(7);
2979        let v1 = encrypt(&k, b"v0.4 vintage");
2980        let kr = SseKeyring::new(1, Arc::clone(&k));
2981        assert_eq!(decrypt(&v1, &kr).unwrap().as_ref(), b"v0.4 vintage");
2982
2983        let v2 = encrypt_v2(b"v0.5 #29 vintage", &kr);
2984        assert_eq!(decrypt(&v2, &kr).unwrap().as_ref(), b"v0.5 #29 vintage");
2985
2986        let m = cust_key(7);
2987        let v3 = encrypt_with_source(b"v0.5 #27 vintage", (&m).into());
2988        assert_eq!(decrypt(&v3, &m).unwrap().as_ref(), b"v0.5 #27 vintage");
2989    }
2990
2991    #[test]
2992    fn peek_magic_distinguishes_all_variants() {
2993        // S4E1 / S4E2 / S4E3 — built from real encrypts so the
2994        // length gate also passes.
2995        let k = key32(9);
2996        let v1 = encrypt(&k, b"x");
2997        assert_eq!(peek_magic(&v1), Some("S4E1"));
2998        let kr = SseKeyring::new(1, Arc::clone(&k));
2999        let v2 = encrypt_v2(b"x", &kr);
3000        assert_eq!(peek_magic(&v2), Some("S4E2"));
3001        let m = cust_key(9);
3002        let v3 = encrypt_with_source(b"x", (&m).into());
3003        assert_eq!(peek_magic(&v3), Some("S4E3"));
3004        // Synthetic S4E4 magic with enough trailing bytes to clear
3005        // the 36-byte length gate. peek_magic does NOT validate the
3006        // S4E4 inner header, just the magic — that's the contract
3007        // (cheap dispatch signal).
3008        let mut v4 = Vec::new();
3009        v4.extend_from_slice(SSE_MAGIC_V4);
3010        v4.extend_from_slice(&[0u8; 40]);
3011        assert_eq!(peek_magic(&v4), Some("S4E4"));
3012        // Unknown magic / too-short input → None.
3013        assert!(peek_magic(b"NOPE").is_none());
3014        assert!(peek_magic(b"short").is_none());
3015        assert!(peek_magic(&[0u8; 100]).is_none());
3016    }
3017
3018    #[tokio::test]
3019    async fn s4e4_truncated_frame_errors_cleanly() {
3020        // Truncate to less than the minimum header. Must surface
3021        // KmsFrameTooShort, not panic, not return BadMagic.
3022        let truncated = b"S4E4\x01\x05hi";
3023        let kms = local_kms_with(&[("k", [1u8; 32])]);
3024        let err = decrypt_with_kms(truncated, &kms).await.unwrap_err();
3025        assert!(
3026            matches!(err, SseError::KmsFrameTooShort { .. }),
3027            "got {err:?}"
3028        );
3029    }
3030
3031    #[tokio::test]
3032    async fn s4e4_oob_key_id_len_errors() {
3033        // Build a body that claims key_id_len = 200 but only has 4
3034        // bytes after the length prefix. parse_s4e4_header must
3035        // refuse with KmsFrameFieldOob, not slice-panic.
3036        let mut body = Vec::new();
3037        body.extend_from_slice(SSE_MAGIC_V4);
3038        body.push(ALGO_AES_256_GCM);
3039        body.push(200u8); // key_id_len
3040        // Remaining bytes < 200; pad to clear the looks_encrypted
3041        // floor (36 bytes) but stay short of the claimed key_id +
3042        // wrapped_dek_len + nonce + tag layout.
3043        body.extend_from_slice(&[0u8; 50]);
3044        let kms = local_kms_with(&[("k", [1u8; 32])]);
3045        let err = decrypt_with_kms(&body, &kms).await.unwrap_err();
3046        assert!(
3047            matches!(err, SseError::KmsFrameFieldOob { .. }),
3048            "got {err:?}"
3049        );
3050    }
3051
3052    #[tokio::test]
3053    async fn s4e4_via_keyring_source_into_sync_decrypt_is_kms_async_required() {
3054        // S4E4 + Keyring source: sync decrypt sees the S4E4 magic
3055        // first and returns KmsAsyncRequired regardless of source —
3056        // the source mismatch never gets a chance to surface, which
3057        // is the right behaviour (caller's bug is "didn't peek
3058        // magic" not "wrong source").
3059        let kms = local_kms_with(&[("k", [9u8; 32])]);
3060        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
3061        let mut dek = [0u8; 32];
3062        dek.copy_from_slice(&dek_vec);
3063        let ct = encrypt_with_source(
3064            b"x",
3065            SseSource::Kms {
3066                dek: &dek,
3067                wrapped: &wrapped,
3068            },
3069        );
3070        let m = cust_key(1);
3071        let err = decrypt(&ct, &m).unwrap_err();
3072        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
3073    }
3074
3075    #[tokio::test]
3076    async fn s4e4_looks_encrypted_passthrough_returns_false_for_synthetic() {
3077        // S4F4 (note F not E) must NOT be confused with S4E4.
3078        let mut not_s4e4 = Vec::new();
3079        not_s4e4.extend_from_slice(b"S4F4");
3080        not_s4e4.extend_from_slice(&[0u8; 60]);
3081        assert!(!looks_encrypted(&not_s4e4));
3082        assert_eq!(peek_magic(&not_s4e4), None);
3083    }
3084
3085    #[tokio::test]
3086    async fn s4e4_aad_length_prefix_prevents_byte_shifting() {
3087        // Constructing an S4E4 body where the wrapped_dek_len is
3088        // shrunk by N bytes and the same N bytes are prepended to
3089        // the key_id-equivalent area would, without length-prefixed
3090        // AAD, produce the same AAD bytestream. Verify our AAD
3091        // includes the length prefixes by tampering with
3092        // wrapped_dek_len and confirming AES-GCM auth fails.
3093        let kms = local_kms_with(&[("kk", [11u8; 32])]);
3094        let (dek_vec, wrapped) = kms.generate_dek("kk").await.unwrap();
3095        let mut dek = [0u8; 32];
3096        dek.copy_from_slice(&dek_vec);
3097        let mut ct = encrypt_with_source(
3098            b"length-shift defense",
3099            SseSource::Kms {
3100                dek: &dek,
3101                wrapped: &wrapped,
3102            },
3103        )
3104        .to_vec();
3105        let key_id_len = ct[5] as usize;
3106        let wrapped_len_off = 6 + key_id_len;
3107        // Shrink wrapped_dek_len by 1. parse_s4e4_header now reads a
3108        // shorter wrapped_dek and a different nonce/tag/ciphertext
3109        // alignment — KMS unwrap fails OR AES-GCM fails OR frame
3110        // bounds reject. All three surface as auditable errors;
3111        // none should reach a successful decrypt.
3112        let original_len = u32::from_be_bytes([
3113            ct[wrapped_len_off],
3114            ct[wrapped_len_off + 1],
3115            ct[wrapped_len_off + 2],
3116            ct[wrapped_len_off + 3],
3117        ]);
3118        let new_len = (original_len - 1).to_be_bytes();
3119        ct[wrapped_len_off..wrapped_len_off + 4].copy_from_slice(&new_len);
3120        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
3121        // Acceptable failure modes: unwrap fail (truncated wrapped
3122        // DEK), AES-GCM fail (shifted nonce/tag/AAD), or frame bounds.
3123        assert!(
3124            matches!(
3125                err,
3126                SseError::KmsBackend(_)
3127                    | SseError::DecryptFailed
3128                    | SseError::KmsFrameFieldOob { .. }
3129                    | SseError::KmsFrameTooShort { .. }
3130            ),
3131            "got {err:?}"
3132        );
3133    }
3134
3135    // -----------------------------------------------------------------------
3136    // v0.8 #52: S4E5 chunked SSE-S4 — encrypt_v2_chunked / decrypt_chunked_stream
3137    // -----------------------------------------------------------------------
3138
3139    use futures::StreamExt;
3140
3141    /// Drain a chunked-decrypt stream into a `Vec<Bytes>` for assertion.
3142    /// Surfaces the first error verbatim (so tests can match on it).
3143    async fn collect_chunks(
3144        s: impl futures::Stream<Item = Result<Bytes, SseError>>,
3145    ) -> Result<Vec<Bytes>, SseError> {
3146        let mut out = Vec::new();
3147        let mut s = std::pin::pin!(s);
3148        while let Some(item) = s.next().await {
3149            out.push(item?);
3150        }
3151        Ok(out)
3152    }
3153
3154    #[test]
3155    fn s4e6_encrypt_layout_10mb_at_1mib() {
3156        // v0.8.1 #57: encrypt_v2_chunked now emits S4E6 (24-byte
3157        // header, 8-byte salt). 10 MB plaintext at 1 MiB chunk
3158        // size → magic "S4E6", chunk_count=10, header bytes line
3159        // up to the documented 24 + 10 * 16 + 10 MB layout.
3160        let kr = keyring_single(0x42);
3161        let chunk_size = 1024 * 1024;
3162        let pt_len = 10 * 1024 * 1024;
3163        let pt = vec![0xAB_u8; pt_len];
3164        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).expect("encrypt ok");
3165        assert_eq!(&ct[..4], SSE_MAGIC_V6, "new PUTs emit S4E6 (v0.8.1 #57)");
3166        assert_eq!(ct[4], ALGO_AES_256_GCM);
3167        assert_eq!(
3168            u16::from_be_bytes([ct[5], ct[6]]),
3169            1,
3170            "key_id BE = active id"
3171        );
3172        assert_eq!(ct[7], 0, "reserved must be 0");
3173        assert_eq!(
3174            u32::from_be_bytes([ct[8], ct[9], ct[10], ct[11]]),
3175            chunk_size as u32,
3176            "chunk_size BE",
3177        );
3178        assert_eq!(
3179            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3180            10,
3181            "chunk_count BE — 10 MiB / 1 MiB = 10 (no remainder)",
3182        );
3183        // Salt now 8 bytes — verify the slice exists and isn't all
3184        // zeros (defensive: catches a stuck PRNG that would leave
3185        // the salt array uninitialized).
3186        assert_eq!(&ct[16..24].len(), &8, "S4E6 salt slot is 8 bytes");
3187        assert_ne!(
3188            &ct[16..24],
3189            &[0u8; 8],
3190            "S4E6 salt must be random, not zeros"
3191        );
3192        assert_eq!(
3193            ct.len(),
3194            S4E6_HEADER_BYTES + 10 * S4E6_PER_CHUNK_OVERHEAD + pt_len,
3195            "total = header (24) + 10 tags + plaintext",
3196        );
3197        assert!(looks_encrypted(&ct), "looks_encrypted must accept S4E6");
3198        assert_eq!(peek_magic(&ct), Some("S4E6"));
3199    }
3200
3201    #[tokio::test]
3202    async fn s4e6_decrypt_chunked_stream_byte_equal() {
3203        // v0.8.1 #57: round-trip via S4E6 path. encrypt_v2_chunked
3204        // emits S4E6, decrypt_chunked_stream consumes S4E5/S4E6.
3205        let kr = keyring_single(0x55);
3206        let pt: Vec<u8> = (0..(10 * 1024 * 1024_u32))
3207            .map(|i| (i & 0xFF) as u8)
3208            .collect();
3209        let ct = encrypt_v2_chunked(&pt, &kr, 1024 * 1024).unwrap();
3210        // Sanity: the new PUT is S4E6.
3211        assert_eq!(&ct[..4], SSE_MAGIC_V6, "new emit is S4E6");
3212        let stream = decrypt_chunked_stream(ct, &kr);
3213        let chunks = collect_chunks(stream).await.expect("stream ok");
3214        assert_eq!(chunks.len(), 10, "10 chunks expected for 10 MiB / 1 MiB");
3215        let mut joined = Vec::with_capacity(pt.len());
3216        for c in chunks {
3217            joined.extend_from_slice(&c);
3218        }
3219        assert_eq!(joined.len(), pt.len(), "byte length matches");
3220        assert_eq!(joined, pt, "byte-equal round-trip");
3221    }
3222
3223    #[tokio::test]
3224    async fn s4e6_single_chunk_for_small_object() {
3225        // Plaintext smaller than chunk_size → chunk_count=1. The
3226        // chunk_count field offset is unchanged between S4E5 and
3227        // S4E6 (both at body[12..16]); only the salt width differs.
3228        let kr = keyring_single(0x77);
3229        let pt = b"tiny payload, smaller than chunk_size";
3230        let ct = encrypt_v2_chunked(pt, &kr, 1024 * 1024).unwrap();
3231        assert_eq!(
3232            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3233            1,
3234            "small plaintext = single chunk",
3235        );
3236        let stream = decrypt_chunked_stream(ct, &kr);
3237        let chunks = collect_chunks(stream).await.expect("stream ok");
3238        assert_eq!(chunks.len(), 1);
3239        assert_eq!(chunks[0].as_ref(), pt);
3240    }
3241
3242    #[tokio::test]
3243    async fn s4e6_tampered_chunk_n_reports_chunk_index() {
3244        // v0.8.1 #57: same tamper-detect contract as the S4E5
3245        // version, just with the wider 24-byte header. Tamper
3246        // byte inside chunk index 3 (= 4th chunk) — the stream
3247        // must yield 3 successful chunks, then ChunkAuthFailed { 3 }.
3248        let kr = keyring_single(0x91);
3249        let chunk_size = 1024;
3250        let pt = vec![0xCD_u8; chunk_size * 8]; // 8 chunks
3251        let mut ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap().to_vec();
3252        // Locate chunk 3's first ciphertext byte: header (24) + 3 *
3253        // (tag 16 + ct 1024) + tag 16 = 24 + 3*1040 + 16 = 3160.
3254        let target = S4E6_HEADER_BYTES + 3 * (TAG_LEN + chunk_size) + TAG_LEN;
3255        ct[target] ^= 0x42;
3256        let stream = decrypt_chunked_stream(bytes::Bytes::from(ct), &kr);
3257        let mut s = std::pin::pin!(stream);
3258        // Chunks 0, 1, 2 must succeed.
3259        for expected_i in 0..3_u32 {
3260            let item = s.next().await.expect("yield");
3261            item.unwrap_or_else(|e| panic!("chunk {expected_i}: {e:?}"));
3262        }
3263        // Chunk 3 fails with the right index.
3264        let err = s.next().await.expect("yield error").unwrap_err();
3265        assert!(
3266            matches!(err, SseError::ChunkAuthFailed { chunk_index: 3 }),
3267            "got {err:?}",
3268        );
3269    }
3270
3271    #[tokio::test]
3272    async fn s4e5_back_compat_s4e2_blob_rejected_with_clear_error() {
3273        // Feeding an S4E2 frame to decrypt_chunked_stream should
3274        // surface BadMagic on the first poll (NOT silently fall
3275        // back — the caller is expected to peek_magic and dispatch).
3276        let kr = keyring_single(0x12);
3277        let s4e2 = encrypt_v2(b"a v2 blob, not chunked", &kr);
3278        let stream = decrypt_chunked_stream(s4e2, &kr);
3279        let result = collect_chunks(stream).await;
3280        let err = result.unwrap_err();
3281        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
3282    }
3283
3284    #[test]
3285    fn s4e6_salt_randomness_smoke() {
3286        // 8-byte salt → birthday paradox 50% collision at ~2^32
3287        // (~4.3 billion) PUTs. 1024 PUTs → effectively zero
3288        // expected collisions; we don't enforce zero, just
3289        // sanity-check the salt actually differs more than half
3290        // the time (catches a stuck PRNG without a 4-billion-PUT
3291        // test).
3292        let kr = keyring_single(0x33);
3293        let mut salts = std::collections::HashSet::new();
3294        let n = 1024;
3295        for _ in 0..n {
3296            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
3297            let mut salt = [0u8; 8];
3298            salt.copy_from_slice(&ct[16..24]);
3299            salts.insert(salt);
3300        }
3301        assert!(
3302            salts.len() > n / 2,
3303            "expected most of the {n} salts to be unique (got {} unique)",
3304            salts.len(),
3305        );
3306    }
3307
3308    #[test]
3309    fn s4e6_chunk_size_zero_invalid() {
3310        let kr = keyring_single(0x66);
3311        let err = encrypt_v2_chunked(b"hi", &kr, 0).unwrap_err();
3312        assert!(matches!(err, SseError::ChunkSizeInvalid));
3313    }
3314
3315    #[tokio::test]
3316    async fn s4e6_truncated_body_reports_frame_truncated() {
3317        // Truncate inside chunk 2's tag → ChunkFrameTruncated, not
3318        // panic, not silent success. Header is now 24 bytes (S4E6).
3319        let kr = keyring_single(0xA1);
3320        let chunk_size = 256;
3321        let pt = vec![0u8; chunk_size * 4];
3322        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3323        // Truncate to inside chunk 2's tag: header + chunk0 + chunk1
3324        // + 8B partial of chunk2's tag.
3325        let trunc = S4E6_HEADER_BYTES + 2 * (TAG_LEN + chunk_size) + 8;
3326        let truncated = bytes::Bytes::copy_from_slice(&ct[..trunc]);
3327        let stream = decrypt_chunked_stream(truncated, &kr);
3328        let result = collect_chunks(stream).await;
3329        let err = result.unwrap_err();
3330        assert!(
3331            matches!(err, SseError::ChunkFrameTruncated { .. }),
3332            "got {err:?}",
3333        );
3334    }
3335
3336    #[test]
3337    fn s4e6_decrypt_buffered_round_trip_via_top_level_decrypt() {
3338        // Sync `decrypt(blob, &keyring)` must also accept the
3339        // chunked frames (back-compat path for callers that need
3340        // the whole plaintext).
3341        let kr = keyring_single(0xDE);
3342        let pt = b"buffered sync decrypt path".repeat(32);
3343        let ct = encrypt_v2_chunked(&pt, &kr, 13).unwrap();
3344        let plain = decrypt(&ct, &kr).expect("buffered S4E6 decrypt ok");
3345        assert_eq!(plain.as_ref(), pt.as_slice());
3346    }
3347
3348    #[tokio::test]
3349    async fn s4e6_unknown_key_id_in_frame_errors() {
3350        // Encrypt under id=7, decrypt under a keyring that lacks id=7.
3351        let kr_put = SseKeyring::new(7, key32(0xCC));
3352        let kr_get = keyring_single(0xCC); // only id=1
3353        let ct = encrypt_v2_chunked(b"orphan key", &kr_put, 64).unwrap();
3354        // Sync path
3355        let err = decrypt(&ct, &kr_get).unwrap_err();
3356        assert!(
3357            matches!(err, SseError::KeyNotInKeyring { id: 7 }),
3358            "got {err:?}"
3359        );
3360        // Stream path
3361        let stream = decrypt_chunked_stream(ct, &kr_get);
3362        let result = collect_chunks(stream).await;
3363        assert!(
3364            matches!(result, Err(SseError::KeyNotInKeyring { id: 7 })),
3365            "got {result:?}",
3366        );
3367    }
3368
3369    #[tokio::test]
3370    async fn s4e6_final_chunk_smaller_than_chunk_size() {
3371        // Plaintext = 2.5 chunks → final chunk holds half the bytes.
3372        // S4E6 header = 24 bytes → total on-disk = 24 + 48 + 250.
3373        let kr = keyring_single(0xEF);
3374        let chunk_size = 100;
3375        let pt: Vec<u8> = (0..250_u32).map(|i| i as u8).collect();
3376        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3377        assert_eq!(
3378            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3379            3,
3380            "ceil(250/100) = 3 chunks",
3381        );
3382        // Total on-disk: 24 header + 3 tags (48) + 250 plaintext = 322.
3383        assert_eq!(ct.len(), S4E6_HEADER_BYTES + 48 + 250);
3384        let stream = decrypt_chunked_stream(ct, &kr);
3385        let chunks = collect_chunks(stream).await.expect("stream ok");
3386        assert_eq!(chunks.len(), 3);
3387        assert_eq!(chunks[0].len(), 100);
3388        assert_eq!(chunks[1].len(), 100);
3389        assert_eq!(chunks[2].len(), 50, "final chunk is the remainder");
3390        let joined: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
3391        assert_eq!(joined, pt);
3392    }
3393
3394    // -----------------------------------------------------------------------
3395    // v0.8.1 #57: S4E6-specific tests added on top of the renamed
3396    // s4e6_* battery above. Keep these focused on what's *new*:
3397    //   - back-compat read of legacy S4E5 blobs
3398    //   - 24-bit chunk_count cap
3399    //   - the parse_s4e6_header public API
3400    // -----------------------------------------------------------------------
3401
3402    #[test]
3403    fn s4e6_back_compat_read_s4e5_blob() {
3404        // Synthesize a "v0.8.0 vintage" S4E5 blob via the test-only
3405        // helper, then prove the v0.8.1 gateway decrypts it under
3406        // the same keyring — both via sync `decrypt` (buffered)
3407        // and the streaming path. Without this, every S4E5 object
3408        // in production becomes unreadable after the upgrade.
3409        let kr = keyring_single(0x57);
3410        let pt = b"v0.8.0 vintage chunked SSE-S4 object".repeat(64);
3411        let s4e5 = encrypt_v2_chunked_s4e5_for_test(&pt, &kr, 91).unwrap();
3412        // Confirm the test fixture really is S4E5 magic + 20-byte header.
3413        assert_eq!(&s4e5[..4], SSE_MAGIC_V5, "fixture must be S4E5");
3414        assert_eq!(peek_magic(&s4e5), Some("S4E5"));
3415        // Sync decrypt path (top-level `decrypt`, dispatches V5 + V6).
3416        let plain_sync = decrypt(&s4e5, &kr).expect("sync S4E5 decrypt ok");
3417        assert_eq!(plain_sync.as_ref(), pt.as_slice());
3418        // Streaming decrypt path — must also accept S4E5.
3419        let collected = futures::executor::block_on(async {
3420            let stream = decrypt_chunked_stream(s4e5.clone(), &kr);
3421            collect_chunks(stream).await
3422        })
3423        .expect("stream S4E5 decrypt ok");
3424        let mut joined = Vec::with_capacity(pt.len());
3425        for c in collected {
3426            joined.extend_from_slice(&c);
3427        }
3428        assert_eq!(joined, pt, "S4E5 streaming round-trip byte-equal");
3429    }
3430
3431    #[test]
3432    fn s4e6_layout_24_bytes_header() {
3433        // Sanity check: the S4E6 fixed header is exactly 24 bytes
3434        // (vs 20 for S4E5). Catches an accidental const drift in a
3435        // future PR.
3436        assert_eq!(S4E6_HEADER_BYTES, 24);
3437        assert_eq!(S4E6_PER_CHUNK_OVERHEAD, TAG_LEN);
3438        assert_eq!(S4E6_HEADER_BYTES, S4E5_HEADER_BYTES + 4);
3439    }
3440
3441    #[test]
3442    fn s4e6_parse_header_round_trip() {
3443        // parse_s4e6_header is the public mirror of the internal
3444        // parse_chunked_header, useful for admin tools. Verify it
3445        // returns the same field values that encrypt_v2_chunked wrote.
3446        let kr = keyring_single(0xAB);
3447        let chunk_size = 256;
3448        let pt = vec![1u8; 7 * chunk_size];
3449        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3450        let hdr = parse_s4e6_header(&ct).expect("parse ok");
3451        assert_eq!(hdr.key_id, 1);
3452        assert_eq!(hdr.chunk_size, chunk_size as u32);
3453        assert_eq!(hdr.chunk_count, 7);
3454        assert_eq!(hdr.salt.len(), 8);
3455        // Bad magic on a non-S4E6 blob → BadMagic.
3456        let bogus = b"S4E2\x01\x00\x00\x00........................";
3457        let err = parse_s4e6_header(bogus).unwrap_err();
3458        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
3459        // Truncation → ChunkFrameTruncated.
3460        let err2 = parse_s4e6_header(&ct[..10]).unwrap_err();
3461        assert!(
3462            matches!(err2, SseError::ChunkFrameTruncated { .. }),
3463            "got {err2:?}"
3464        );
3465    }
3466
3467    #[test]
3468    fn s4e6_salt_uniqueness_smoke_16m() {
3469        // v0.8.1 #57 security regression detection: with the
3470        // 4-byte S4E5 salt, ~65,536 PUTs already had ~50%
3471        // birthday collision (the bug that motivated this patch).
3472        // With the 8-byte S4E6 salt the expected collisions over
3473        // 65,536 PUTs is ~2^16 * 2^16 / 2^65 ≈ 2^-33 — i.e.
3474        // effectively zero.
3475        //
3476        // We can't actually run 16M PUTs in unit-test wall-clock
3477        // (each PUT does an AES-GCM encrypt), so we run a fast
3478        // smoke (16k) and additionally validate the math: at the
3479        // S4E5 4-byte salt width, 16k PUTs would already give a
3480        // ~3.1% collision probability by birthday bound; at the
3481        // S4E6 8-byte salt that drops to ~3.6e-11. The smoke test
3482        // therefore *would* show collisions if we'd accidentally
3483        // shipped the 4-byte salt — confirming the regression
3484        // detector.
3485        let kr = keyring_single(0xA6);
3486        let mut salts = std::collections::HashSet::with_capacity(16384);
3487        let n = 16384_usize;
3488        let mut collisions_top4 = 0usize;
3489        let mut top4_seen = std::collections::HashSet::with_capacity(16384);
3490        for _ in 0..n {
3491            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
3492            let mut salt = [0u8; 8];
3493            salt.copy_from_slice(&ct[16..24]);
3494            salts.insert(salt);
3495            // Side-channel: count collisions on just the *first 4
3496            // bytes* of the 8-byte salt. If we'd kept the old
3497            // 4-byte salt, this collision count would be the only
3498            // collision count — and at n=16k it should be ~62
3499            // (birthday: n^2/(2 * 2^32) = 16384^2/2^33 ≈ 31, with
3500            // some noise). The full 8-byte salt test passes if the
3501            // FULL salts are all unique while the truncated-to-4
3502            // count is non-zero, proving the extra 4 bytes really
3503            // are doing the security work.
3504            let mut top4 = [0u8; 4];
3505            top4.copy_from_slice(&salt[..4]);
3506            if !top4_seen.insert(top4) {
3507                collisions_top4 += 1;
3508            }
3509        }
3510        assert_eq!(
3511            salts.len(),
3512            n,
3513            "all 8-byte salts must be unique across {n} PUTs (got {} unique)",
3514            salts.len(),
3515        );
3516        // Sanity check the regression detector: at 16k PUTs with a
3517        // 4-byte salt, birthday math predicts ~31 collisions on
3518        // average. Anything in the 0..200 range is statistically
3519        // believable for 16k uniform 32-bit draws; we only assert
3520        // ">= 1" (i.e. at least one collision happened — which
3521        // would have been a real bug under S4E5).
3522        eprintln!(
3523            "s4e6_salt_uniqueness_smoke_16m: 16k PUTs, full 8B salts \
3524             all unique ({}/{}), simulated 4B-truncated salt yielded \
3525             {} collisions (this is what S4E5 would have shipped)",
3526            salts.len(),
3527            n,
3528            collisions_top4,
3529        );
3530        // Don't make the test flaky on the simulated number (it's a
3531        // statistical signal); just leave the eprintln for the
3532        // operator audit log when the test runs verbose.
3533    }
3534
3535    #[test]
3536    fn s4e6_max_chunks_24bit() {
3537        // The S4E6 nonce embeds the chunk index as 24-bit BE, so
3538        // chunk_count > 2^24 - 1 must surface ChunkCountTooLarge
3539        // at PUT time. We can't actually run a 16M-chunk encrypt
3540        // in unit-test wall-clock (16M AES-GCM tag verifies even
3541        // on AES-NI is several minutes), but we can verify the
3542        // CAP constant matches expectations + exercise the cap by
3543        // picking a chunk_size that forces overflow on a tiny
3544        // plaintext.
3545        assert_eq!(S4E6_MAX_CHUNK_COUNT, (1u32 << 24) - 1);
3546        assert_eq!(S4E6_MAX_CHUNK_COUNT, 16_777_215);
3547
3548        // chunk_size=1 + plaintext.len()=16_777_216 → 16M+1 chunks
3549        // → over the cap → ChunkCountTooLarge. Allocating a 16
3550        // MiB plaintext is fine.
3551        let kr = keyring_single(0xC4);
3552        let pt = vec![0u8; (S4E6_MAX_CHUNK_COUNT as usize) + 1]; // 16,777,216 B
3553        let err = encrypt_v2_chunked(&pt, &kr, 1).unwrap_err();
3554        assert!(
3555            matches!(
3556                err,
3557                SseError::ChunkCountTooLarge {
3558                    got: 16_777_216,
3559                    max: 16_777_215
3560                }
3561            ),
3562            "got {err:?}",
3563        );
3564
3565        // And just under the cap (chunk_count = 16_777_215) should
3566        // succeed. We pick chunk_size that produces exactly the cap
3567        // so the inner loop only runs N times. 16M chunk-encrypts
3568        // would be slow, so test with a smaller cap-near config
3569        // that exercises the same boundary check: 1023 chunks of 1
3570        // byte each = 1023 chunks well under the cap → success.
3571        // The actual on-cap encrypt is exercised by the buffered
3572        // decrypt path through `parse_chunked_header`.
3573        let pt_ok = vec![0u8; 1023];
3574        let ct = encrypt_v2_chunked(&pt_ok, &kr, 1).expect("under-cap PUT must succeed");
3575        let hdr = parse_s4e6_header(&ct).unwrap();
3576        assert_eq!(hdr.chunk_count, 1023);
3577
3578        // Synthesize a frame that *claims* chunk_count > cap and
3579        // verify the parser rejects it (defensive: a tampered
3580        // header should not loop the walker 16M+ times).
3581        let mut tampered = ct.to_vec();
3582        // Rewrite chunk_count BE to S4E6_MAX_CHUNK_COUNT + 1 = 2^24.
3583        let bad = (S4E6_MAX_CHUNK_COUNT + 1).to_be_bytes();
3584        tampered[12..16].copy_from_slice(&bad);
3585        let err2 = parse_s4e6_header(&tampered).unwrap_err();
3586        assert!(
3587            matches!(
3588                err2,
3589                SseError::ChunkCountTooLarge {
3590                    got: 16_777_216,
3591                    max: 16_777_215
3592                }
3593            ),
3594            "got {err2:?}",
3595        );
3596    }
3597
3598    #[test]
3599    fn s4e6_nonce_v6_layout() {
3600        // Direct unit test on nonce_v6: prefix b'E', then 8B salt,
3601        // then 24-bit BE chunk_index. The high byte of u32
3602        // chunk_index must be dropped (caller-validated cap).
3603        let salt = [0xAA_u8; 8];
3604        let n0 = nonce_v6(&salt, 0);
3605        assert_eq!(n0[0], b'E');
3606        assert_eq!(&n0[1..9], &salt);
3607        assert_eq!(&n0[9..12], &[0, 0, 0]);
3608        let n1 = nonce_v6(&salt, 1);
3609        assert_eq!(&n1[9..12], &[0, 0, 1]);
3610        let n_mid = nonce_v6(&salt, 0x123456);
3611        assert_eq!(&n_mid[9..12], &[0x12, 0x34, 0x56]);
3612        let n_max = nonce_v6(&salt, S4E6_MAX_CHUNK_COUNT);
3613        assert_eq!(&n_max[9..12], &[0xFF, 0xFF, 0xFF]);
3614    }
3615
3616    #[tokio::test]
3617    async fn s4e6_tampered_salt_byte_fails_aead() {
3618        // Flipping a single byte of the 8-byte salt in the header
3619        // must invalidate every chunk's AES-GCM tag (salt is in
3620        // the AAD). Confirms the salt expansion didn't drop
3621        // header authentication.
3622        let kr = keyring_single(0xB6);
3623        let pt = b"salt-in-aad coverage".repeat(64);
3624        let mut ct = encrypt_v2_chunked(&pt, &kr, 128).unwrap().to_vec();
3625        // Salt bytes 16..24 — flip the middle byte.
3626        ct[20] ^= 0x01;
3627        let err = decrypt(&ct, &kr).unwrap_err();
3628        assert!(
3629            matches!(err, SseError::ChunkAuthFailed { chunk_index: 0 }),
3630            "got {err:?}",
3631        );
3632    }
3633
3634    // -----------------------------------------------------------------------
3635    // v0.8.2 #64: pre-allocation guard for chunked SSE frames
3636    //
3637    // The buffered decrypt path used to call `Vec::with_capacity(chunk_size
3638    // * chunk_count)` *before* validating either factor. A 24-byte malicious
3639    // S4E6 header could declare a multi-PB plaintext and trigger an instant
3640    // OOM / panic on a tiny ciphertext. Verify the guard rejects every
3641    // adversarial header pattern before any allocation happens.
3642    // -----------------------------------------------------------------------
3643
3644    /// Build a minimal S4E6 header with attacker-controlled chunk_size /
3645    /// chunk_count (no chunks attached — the body is exactly 24 bytes).
3646    /// Used by the DoS tests below to synthesize malicious frames without
3647    /// running an encrypt.
3648    fn synth_s4e6_header(chunk_size: u32, chunk_count: u32) -> Vec<u8> {
3649        let mut blob = Vec::with_capacity(S4E6_HEADER_BYTES);
3650        blob.extend_from_slice(SSE_MAGIC_V6);
3651        blob.push(ALGO_AES_256_GCM);
3652        blob.extend_from_slice(&1_u16.to_be_bytes()); // key_id = 1
3653        blob.push(0); // reserved
3654        blob.extend_from_slice(&chunk_size.to_be_bytes());
3655        blob.extend_from_slice(&chunk_count.to_be_bytes());
3656        blob.extend_from_slice(&[0u8; 8]); // 8-byte salt
3657        debug_assert_eq!(blob.len(), S4E6_HEADER_BYTES);
3658        blob
3659    }
3660
3661    #[test]
3662    fn s4e6_header_claims_huge_size_rejected_pre_alloc() {
3663        // 1 GiB chunk_size × 100 chunks = 100 GiB declared plaintext, but
3664        // the body the attacker actually shipped is only 100 bytes. The
3665        // pre-alloc guard's cap arm fires (100 GiB > 5 GiB default cap)
3666        // *before* the walker / `Vec::with_capacity(100 GiB)` — surfaces
3667        // as ChunkFrameTooLarge.
3668        let kr = keyring_single(0x01);
3669        let chunk_size: u32 = 1 << 30; // 1 GiB
3670        let chunk_count: u32 = 100;
3671        let mut blob = synth_s4e6_header(chunk_size, chunk_count);
3672        // Pad with 100 random bytes — the attack model: tiny payload,
3673        // monster header.
3674        blob.extend_from_slice(&[0u8; 100]);
3675        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3676        assert!(
3677            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3678            "expected ChunkFrameTooLarge (declared 100 GiB > 5 GiB cap), got {err:?}",
3679        );
3680        // Same input, tighter caller-supplied cap (1 MiB): still
3681        // ChunkFrameTooLarge. No panic, no OOM either way.
3682        let err2 = decrypt_chunked_buffered(&blob, &kr, 1024 * 1024).unwrap_err();
3683        assert!(
3684            matches!(err2, SseError::ChunkFrameTooLarge { .. }),
3685            "expected ChunkFrameTooLarge under tighter cap, got {err2:?}",
3686        );
3687    }
3688
3689    #[test]
3690    fn s4e6_header_chunk_size_x_chunk_count_overflows_u64() {
3691        // chunk_size = u32::MAX, chunk_count > 1 → product > u64 cap?
3692        // No — u32::MAX * u32::MAX fits in u64 (~2^64). To force u64
3693        // overflow we'd need both > 2^32. But chunk_count is capped to
3694        // 16M (S4E6_MAX_CHUNK_COUNT) by the earlier check, so the
3695        // realistic adversarial maximum is u32::MAX * 16M ≈ 2^56 — well
3696        // under u64::MAX. The overflow path is therefore *theoretically*
3697        // unreachable on S4E6 (24-bit chunk_count cap protects it), but
3698        // we still test it via S4E5 which has no 24-bit cap on
3699        // chunk_count: a 4-byte salt frame with chunk_size = u32::MAX
3700        // and chunk_count = u32::MAX gives 2^64 - 2^33 + 1 — fits u64
3701        // but adding the 16-byte tag overhead × u32::MAX chunks
3702        // overflows. Verify ChunkFrameTooLarge surfaces.
3703        let kr = keyring_single(0x02);
3704        // Synthesize an S4E5 header (20 bytes) with maximally
3705        // adversarial chunk_size + chunk_count.
3706        let mut blob = Vec::with_capacity(S4E5_HEADER_BYTES);
3707        blob.extend_from_slice(SSE_MAGIC_V5);
3708        blob.push(ALGO_AES_256_GCM);
3709        blob.extend_from_slice(&1_u16.to_be_bytes());
3710        blob.push(0);
3711        blob.extend_from_slice(&u32::MAX.to_be_bytes()); // chunk_size = 2^32 - 1
3712        blob.extend_from_slice(&u32::MAX.to_be_bytes()); // chunk_count = 2^32 - 1
3713        blob.extend_from_slice(&[0u8; 4]); // 4-byte S4E5 salt
3714        debug_assert_eq!(blob.len(), S4E5_HEADER_BYTES);
3715        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3716        assert!(
3717            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3718            "expected ChunkFrameTooLarge for u64 overflow, got {err:?}",
3719        );
3720
3721        // Also smoke the same input through `parse_chunked_header`
3722        // directly (the streaming path) — must error, never panic.
3723        let direct = parse_chunked_header(&blob, usize::MAX).unwrap_err();
3724        assert!(
3725            matches!(direct, SseError::ChunkFrameTooLarge { .. }),
3726            "streaming path: expected ChunkFrameTooLarge, got {direct:?}",
3727        );
3728    }
3729
3730    #[test]
3731    fn s4e6_header_within_max_body_bytes_passes() {
3732        // 1 MiB chunk_size × 100 chunks = 100 MiB declared plaintext —
3733        // well under the 5 GiB default cap. The header validation must
3734        // NOT reject this; instead it falls through to chunk-walk +
3735        // AES-GCM verify (which then fails on this synthetic frame
3736        // because we didn't write any ciphertext, so we expect
3737        // ChunkFrameTruncated *not* ChunkFrameTooLarge).
3738        let kr = keyring_single(0x03);
3739        let chunk_size: u32 = 1024 * 1024; // 1 MiB
3740        let chunk_count: u32 = 100;
3741        let mut blob = synth_s4e6_header(chunk_size, chunk_count);
3742        // Append the full declared chunk array so the truncation arm
3743        // doesn't fire — 100 chunks × (16 B tag + 1 MiB ct) = 100 MiB
3744        // + 1.6 KiB tags. We write zeros; AES-GCM verify will fail on
3745        // chunk 0, but that proves the pre-alloc guard *let it through*.
3746        let chunk_array_size =
3747            (chunk_count as usize) * (S4E6_PER_CHUNK_OVERHEAD + chunk_size as usize);
3748        blob.resize(blob.len() + chunk_array_size, 0);
3749        let err = decrypt_chunked_buffered(&blob, &kr, DEFAULT_MAX_BODY_BYTES).unwrap_err();
3750        // The guard let it through (we got past parse_chunked_header)
3751        // and AES-GCM tag verify failed on chunk 0 — that's the right
3752        // failure mode. ChunkFrameTooLarge would mean the cap fired
3753        // (a regression of this test). KeyNotInKeyring would mean the
3754        // header parsed but the key id (1) wasn't present (also a
3755        // regression — the keyring has id=1 active). Anything else is
3756        // a guard-too-strict bug.
3757        assert!(
3758            matches!(err, SseError::ChunkAuthFailed { chunk_index: 0 }),
3759            "expected ChunkAuthFailed (guard let it through), got {err:?}",
3760        );
3761    }
3762
3763    #[test]
3764    fn s4e6_header_exceeds_max_body_bytes_rejected() {
3765        // 1 MiB × 6000 chunks = 6 GiB declared plaintext > 5 GiB cap.
3766        // Must reject as ChunkFrameTooLarge before any alloc. We don't
3767        // attach the 6 GiB chunk array to the body — the cap arm only
3768        // looks at the declared `chunk_size × chunk_count`, not the
3769        // actual body length, so a tiny body suffices to exercise the
3770        // cap.
3771        let kr = keyring_single(0x04);
3772        let chunk_size: u32 = 1024 * 1024; // 1 MiB
3773        let chunk_count: u32 = 6000;
3774        let blob = synth_s4e6_header(chunk_size, chunk_count);
3775        // Body is exactly the 24-byte header, no chunks attached.
3776        // body.len() = 24 ≤ max_total (~6 GiB) so the trailing-bytes
3777        // check does NOT fire; the 6 GiB > 5 GiB cap check does.
3778        let err = decrypt_chunked_buffered(&blob, &kr, DEFAULT_MAX_BODY_BYTES).unwrap_err();
3779        assert!(
3780            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3781            "expected ChunkFrameTooLarge (6 GiB declared > 5 GiB cap), got {err:?}",
3782        );
3783
3784        // Directly exercise the cap arm with a tighter cap (1 MiB)
3785        // and a 100 MiB declared plaintext that fits fully in the
3786        // body, so the cap is unambiguously the deciding factor.
3787        let chunk_size_b: u32 = 1024 * 1024; // 1 MiB
3788        let chunk_count_b: u32 = 100;
3789        let mut blob_b = synth_s4e6_header(chunk_size_b, chunk_count_b);
3790        let pad_b = (chunk_count_b as usize) * (S4E6_PER_CHUNK_OVERHEAD + chunk_size_b as usize);
3791        blob_b.resize(blob_b.len() + pad_b, 0);
3792        // Cap = 1 MiB, declared plaintext = 100 MiB → ChunkFrameTooLarge.
3793        let err_b = decrypt_chunked_buffered(&blob_b, &kr, 1024 * 1024).unwrap_err();
3794        assert!(
3795            matches!(err_b, SseError::ChunkFrameTooLarge { .. }),
3796            "expected ChunkFrameTooLarge (cap < declared), got {err_b:?}",
3797        );
3798    }
3799
3800    #[test]
3801    fn s4e6_random_header_never_panics() {
3802        // DoS regression — feed 100k random byte sequences shaped like
3803        // chunked SSE headers to `parse_chunked_header`. Every result
3804        // must be Ok or Err; no panic, no OOM. This exercises the
3805        // arithmetic-overflow + truncation arms of the v0.8.2 #64
3806        // guard against adversarial inputs the previous unit tests
3807        // didn't enumerate.
3808        //
3809        // We use a fixed seed (deterministic) rather than proptest
3810        // here so the test is repeatable across CI runs without
3811        // pulling in shrink machinery for a pure no-panic property.
3812        use rand::{Rng, SeedableRng, rngs::StdRng};
3813        let mut rng = StdRng::seed_from_u64(0xC0FF_EE64_6464_64DE);
3814        let mut max_body_bytes_choices = [
3815            0_usize,
3816            1024,
3817            1024 * 1024,
3818            DEFAULT_MAX_BODY_BYTES,
3819            usize::MAX,
3820        ]
3821        .iter()
3822        .copied()
3823        .cycle();
3824        for _ in 0..100_000 {
3825            // Body length in [0, 256] — tiny on purpose: most random
3826            // bytes won't be a valid header, but we want the parser
3827            // to robustly reject every shape (truncated, wrong magic,
3828            // wrong algo, declared-vs-actual mismatch, overflow).
3829            let body_len = rng.gen_range(0..=256_usize);
3830            let mut body = vec![0u8; body_len];
3831            rng.fill(body.as_mut_slice());
3832            // Bias 1/4 of trials toward valid magic so the deeper
3833            // arithmetic paths get exercised, not just the BadMagic
3834            // early-out.
3835            if body_len >= 4 && rng.gen_bool(0.25) {
3836                if rng.gen_bool(0.5) {
3837                    body[..4].copy_from_slice(SSE_MAGIC_V5);
3838                } else {
3839                    body[..4].copy_from_slice(SSE_MAGIC_V6);
3840                }
3841            }
3842            let max_cap = max_body_bytes_choices.next().unwrap();
3843            // The contract: never panic, never alloc more than
3844            // `max_cap` worth of memory. We assert only no-panic
3845            // here; OOM behavior is implicit (the function returns
3846            // Err before any large alloc).
3847            let _ = parse_chunked_header(&body, max_cap);
3848        }
3849    }
3850
3851    // Bonus: an extreme-overflow case the guard is specifically
3852    // hardened against — chunk_count = u32::MAX with the per-chunk
3853    // overhead multiplication forced to overflow u64. Catches a
3854    // future refactor that would replace `checked_mul` with `*`.
3855    #[test]
3856    fn s4e5_extreme_overflow_chunk_count_u32_max() {
3857        let kr = keyring_single(0x05);
3858        // u32::MAX chunk_size × u32::MAX chunk_count overflows u64
3859        // when adding the 16 * u32::MAX tag overhead. (Strictly:
3860        // chunk_size × chunk_count = (2^32-1)^2 ≈ 2^64 - 2^33 + 1
3861        // — already u64-saturating; the +tag step then overflows.)
3862        // The guard's `checked_add` chain must catch this.
3863        let mut blob = Vec::with_capacity(S4E5_HEADER_BYTES);
3864        blob.extend_from_slice(SSE_MAGIC_V5);
3865        blob.push(ALGO_AES_256_GCM);
3866        blob.extend_from_slice(&1_u16.to_be_bytes());
3867        blob.push(0);
3868        blob.extend_from_slice(&u32::MAX.to_be_bytes());
3869        blob.extend_from_slice(&u32::MAX.to_be_bytes());
3870        blob.extend_from_slice(&[0u8; 4]);
3871        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3872        assert!(
3873            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3874            "expected ChunkFrameTooLarge for extreme overflow, got {err:?}",
3875        );
3876    }
3877}