Skip to main content

s4_server/
sse.rs

1//! Server-side encryption (SSE-S4) — AES-256-GCM (v0.4 #21, v0.5 #29, v0.5 #27, v0.5 #28).
2//!
3//! Wraps the post-compression S3 object body with authenticated
4//! encryption. Compress-then-encrypt is the right order: encryption
5//! produces high-entropy bytes that don't compress, so encrypting last
6//! preserves the codec's ratio.
7//!
8//! ## Wire formats
9//!
10//! ### S4E1 (v0.4) — single key, no rotation
11//!
12//! ```text
13//! [magic: "S4E1" 4B]
14//! [algo:  u8]            # 1 = AES-256-GCM
15//! [reserved: 3B]         # 0x00 0x00 0x00
16//! [nonce: 12B]           # random per-object
17//! [tag:   16B]           # AES-GCM authentication tag
18//! [ciphertext: variable] # encrypted-then-authenticated body
19//! ```
20//!
21//! Total overhead: 36 bytes per object.
22//!
23//! ### S4E2 (v0.5 #29) — keyring-aware, supports rotation
24//!
25//! ```text
26//! [magic:  "S4E2" 4B]
27//! [algo:   u8]            # 1 = AES-256-GCM
28//! [key_id: u16 BE]        # which keyring slot encrypted this body
29//! [reserved: 1B]          # 0x00
30//! [nonce:  12B]           # random per-object
31//! [tag:    16B]           # AES-GCM authentication tag
32//! [ciphertext: variable]
33//! ```
34//!
35//! Same 36-byte overhead — we reused the 3-byte reserved area in S4E1
36//! to fit a 2-byte key-id + 1-byte reserved without bumping the header
37//! size. The key-id is included in the AAD so a flipped key-id byte
38//! fails the auth tag (i.e. an attacker can't trick the gateway into
39//! decrypting under a different keyring slot).
40//!
41//! ### S4E3 (v0.5 #27) — SSE-C, customer-provided key
42//!
43//! ```text
44//! [magic:   "S4E3" 4B]
45//! [algo:    u8]            # 1 = AES-256-GCM
46//! [key_md5: 16B]           # MD5 fingerprint of the customer key
47//! [nonce:   12B]           # random per-object
48//! [tag:     16B]           # AES-GCM authentication tag
49//! [ciphertext: variable]
50//! ```
51//!
52//! Overhead: 49 bytes (`4 + 1 + 16 + 12 + 16`). Unlike S4E1/S4E2 the
53//! gateway does **not** persist the key — the client supplies it on
54//! every PUT/GET via `x-amz-server-side-encryption-customer-{algorithm,
55//! key,key-MD5}` headers. We store only the 16-byte MD5 in the on-disk
56//! frame so a GET with the wrong key surfaces as
57//! [`SseError::WrongCustomerKey`] before AES-GCM is even tried (saves a
58//! useless decrypt + gives operators a distinct error from generic auth
59//! failure).
60//!
61//! The `key_md5` is included in the AAD so flipping a single byte of
62//! the stored fingerprint also breaks AES-GCM auth — i.e. an attacker
63//! who tampered with the metadata can't sneak a different key past the
64//! check.
65//!
66//! ### S4E4 (v0.5 #28) — SSE-KMS envelope, per-object DEK
67//!
68//! ```text
69//! [magic:           "S4E4" 4B]
70//! [algo:            u8]            # 1 = AES-256-GCM
71//! [key_id_len:      u8]            # 1..=255, length of UTF-8 key_id
72//! [key_id:          variable]      # UTF-8, AAD-authenticated
73//! [wrapped_dek_len: u32 BE]        # length of the wrapped DEK blob
74//! [wrapped_dek:     variable]      # opaque, AAD-authenticated
75//! [nonce:           12B]           # random per-object
76//! [tag:             16B]           # AES-GCM auth tag for body
77//! [ciphertext:      variable]      # body encrypted under the DEK
78//! ```
79//!
80//! Header overhead: `4 + 1 + 1 + key_id_len + 4 + wrapped_dek_len + 12
81//! + 16` = 38 + key_id_len + wrapped_dek_len. For a typical
82//! [`crate::kms::LocalKms`] wrap (60-byte ciphertext) and a 36-char
83//! UUID-style `key_id`, that's ~134 bytes per object.
84//!
85//! `key_id` and `wrapped_dek` are both placed in the AAD so an
86//! attacker cannot rewrite either field to point the gateway at a
87//! different KEK or wrapped DEK without invalidating the body's
88//! AES-GCM tag. The plaintext DEK is never persisted; only the
89//! wrapped form is on disk, and the gateway holds the plaintext only
90//! for the duration of one PUT or GET.
91//!
92//! S4E4 decrypt requires an `async` round-trip to the KMS backend
93//! (to unwrap the DEK), so the synchronous [`decrypt`] function
94//! refuses S4E4 with [`SseError::KmsAsyncRequired`] — callers that
95//! peek `S4E4` via [`peek_magic`] must dispatch to
96//! [`decrypt_with_kms`] instead.
97//!
98//! ## v0.5 rotation flow (SSE-S4 only)
99//!
100//! Operators wire one [`SseKeyring`] holding the **active** key plus
101//! any number of **retired** keys. PUT always encrypts under the
102//! active key (S4E2 with that key's id). GET sniffs the magic:
103//!
104//! - `S4E1`: legacy single-key path. The keyring's active key is tried
105//!   first, then every retired key — this lets a v0.4 deployment
106//!   migrate to a keyring with the original key as active and decrypt
107//!   pre-rotation objects unchanged.
108//! - `S4E2`: read the key_id, look it up in the keyring, decrypt with
109//!   that exact key. Missing key_id surfaces as `KeyNotInKeyring`.
110//! - `S4E3`: keyring is **not** consulted. Caller must supply
111//!   [`SseSource::CustomerKey`] with the matching key + md5.
112//!
113//! ## Open follow-ups
114//!
115//! - **Server-managed key only** (for SSE-S4): keys come from local
116//!   files via `--sse-s4-key` / `--sse-s4-key-rotated`. KMS / vault
117//!   integration for the SSE-S4 keyring (i.e. wrapping the keyring's
118//!   keys with KMS) is a separate issue. SSE-KMS for per-object DEKs
119//!   is implemented (see [`SseSource::Kms`] + S4E4 above).
120
121use std::collections::HashMap;
122use std::path::Path;
123use std::sync::Arc;
124
125use aes_gcm::aead::{Aead, KeyInit, Payload};
126use aes_gcm::{Aes256Gcm, Key, Nonce};
127use bytes::Bytes;
128use md5::{Digest as Md5Digest, Md5};
129use rand::RngCore;
130use thiserror::Error;
131
132use crate::kms::{KmsBackend, KmsError, WrappedDek};
133
/// Magic prefix of the v0.4 single-key frame (S4E1).
pub const SSE_MAGIC_V1: &[u8; 4] = b"S4E1";
/// Magic prefix of the v0.5 keyring-aware frame (S4E2), whose header
/// carries a `u16` key-id.
pub const SSE_MAGIC_V2: &[u8; 4] = b"S4E2";
/// Magic prefix of the SSE-C frame (S4E3), whose header carries the
/// MD5 fingerprint of the customer-provided key.
pub const SSE_MAGIC_V3: &[u8; 4] = b"S4E3";
/// Magic prefix of the SSE-KMS envelope frame (S4E4), whose header
/// carries the KMS key id and the wrapped per-object DEK.
pub const SSE_MAGIC_V4: &[u8; 4] = b"S4E4";
/// v0.8 #52: chunked variant of S4E2 — same SSE-S4 keyring source,
/// but the body is sliced into independently-sealed AES-GCM chunks
/// so the GET path can stream-decrypt + emit chunk-by-chunk instead
/// of buffering the entire object before tag verify. See
/// [`encrypt_v2_chunked`] / [`decrypt_chunked_stream`] for the on-
/// the-wire layout.
///
/// **Read-only as of v0.8.1 #57** — new PUTs emit [`SSE_MAGIC_V6`]
/// (S4E6). S4E5 is kept around for back-compat decrypt of objects
/// written by v0.8.0.
pub const SSE_MAGIC_V5: &[u8; 4] = b"S4E5";
/// v0.8.1 #57: identical layout to S4E5 except the per-PUT salt is
/// widened from 4 → 8 bytes so the birthday-collision threshold on
/// AES-GCM nonce reuse jumps from ~65k PUTs/key to ~4 billion. See
/// [`encrypt_v2_chunked`] (now emits S4E6) / the S4E6 wire-format
/// docs further down for the full layout.
pub const SSE_MAGIC_V6: &[u8; 4] = b"S4E6";
/// Back-compat alias — v0.4 callers that imported `SSE_MAGIC` mean S4E1.
pub const SSE_MAGIC: &[u8; 4] = SSE_MAGIC_V1;

/// Header layout matches between S4E1 and S4E2 (both 36 bytes total)
/// because S4E2 reuses the 3-byte reserved slot to fit `key_id (2B) +
/// reserved (1B)`. Keeping them the same length means the rest of the
/// pipeline (sidecar offsets, multipart math) doesn't care which
/// frame variant is in flight.
///
/// Breakdown: `magic 4 + algo 1 + reserved/key_id 3 + nonce 12 + tag 16`.
pub const SSE_HEADER_BYTES: usize = 4 + 1 + 3 + 12 + 16; // = 36
/// S4E3 (SSE-C) replaces the 3-byte reserved area with a 16-byte
/// customer-key MD5 fingerprint, so the header is 49 bytes total.
/// `magic 4 + algo 1 + key_md5 16 + nonce 12 + tag 16`.
pub const SSE_HEADER_BYTES_V3: usize = 4 + 1 + KEY_MD5_LEN + 12 + 16; // = 49
/// Value of the frame's one-byte `algo` field for AES-256-GCM.
pub const ALGO_AES_256_GCM: u8 = 1;
/// AES-GCM nonce length in bytes, as stored in every frame header.
const NONCE_LEN: usize = 12;
/// AES-GCM authentication tag length in bytes.
const TAG_LEN: usize = 16;
/// AES-256 key length in bytes.
const KEY_LEN: usize = 32;
/// MD5 digest length in bytes (SSE-C key fingerprint).
const KEY_MD5_LEN: usize = 16;
/// AWS S3 SSE-C only allows AES256 in the
/// `x-amz-server-side-encryption-customer-algorithm` header, so we
/// match that exact spelling for parity with real S3 clients.
pub const SSE_C_ALGORITHM: &str = "AES256";
177
/// Every failure the SSE layer can report: key loading, frame parsing,
/// AES-GCM authentication, and the SSE-C / SSE-KMS / chunked-frame
/// specific conditions. Display strings come from the `#[error]`
/// attributes (via `thiserror`).
#[derive(Debug, Error)]
pub enum SseError {
    /// Reading the key file at `path` failed; `source` is the
    /// underlying I/O error.
    #[error("SSE key file {path:?}: {source}")]
    KeyFileIo {
        path: std::path::PathBuf,
        source: std::io::Error,
    },
    /// Key material did not parse to a 32-byte key in any of the
    /// accepted encodings.
    #[error(
        "SSE key file must be exactly 32 raw bytes (or 64-char hex / 44-char base64); got {got} bytes after parse"
    )]
    BadKeyLength { got: usize },
    /// Body is shorter than the fixed 36-byte S4E1/S4E2 header.
    #[error("SSE-encrypted body too short ({got} bytes; need at least {SSE_HEADER_BYTES})")]
    TooShort { got: usize },
    /// First four bytes are not one of the known frame magics.
    #[error("SSE bad magic: expected S4E1/S4E2/S4E3/S4E4/S4E5/S4E6, got {got:?}")]
    BadMagic { got: [u8; 4] },
    /// The frame's `algo` byte is not [`ALGO_AES_256_GCM`].
    #[error("SSE unsupported algo tag: {tag} (this build only knows AES-256-GCM = 1)")]
    UnsupportedAlgo { tag: u8 },
    /// An S4E2 frame names a key-id that the keyring does not hold.
    #[error(
        "SSE key_id {id} (S4E2 frame) not present in keyring; rotation history likely incomplete"
    )]
    KeyNotInKeyring { id: u16 },
    /// AES-GCM tag verification failed for a non-chunked frame.
    #[error("SSE decryption / authentication failed (key mismatch or ciphertext tampered with)")]
    DecryptFailed,
    // --- v0.5 #27: SSE-C specific errors ---
    /// The MD5 fingerprint stored in the S4E3 frame doesn't match the
    /// MD5 of the customer key the client supplied. This is the
    /// "wrong customer key on GET" signal — distinct from
    /// `DecryptFailed` so service.rs can map it to AWS S3's
    /// `403 AccessDenied` (S3 returns AccessDenied when the supplied
    /// SSE-C key doesn't match the one used at PUT time).
    #[error("SSE-C key MD5 fingerprint mismatch — client supplied a different key than PUT")]
    WrongCustomerKey,
    /// `parse_customer_key_headers` saw a malformed input. `reason` is
    /// a short human string ("base64 decode of key", "key length",
    /// "md5 length", "md5 mismatch") for operator log lines — never
    /// echoed to the client (would leak crypto details).
    #[error("SSE-C customer-key headers invalid: {reason}")]
    InvalidCustomerKey { reason: &'static str },
    /// Client asked for an SSE-C algorithm the gateway doesn't speak.
    /// AWS S3 only ever defines `AES256` here; surfacing the offending
    /// string lets us 400 with a useful message.
    #[error("SSE-C algorithm {algo:?} unsupported (only {SSE_C_ALGORITHM:?} is allowed)")]
    CustomerKeyAlgorithmUnsupported { algo: String },
    /// S4E3 body lacks an SSE-C key — caller passed `SseSource::Keyring`
    /// when decrypting an SSE-C-encrypted object. service.rs should
    /// translate this into the same "missing customer key" 400 that
    /// AWS S3 returns when SSE-C headers are absent on a GET.
    #[error("S4E3 frame requires SseSource::CustomerKey; got Keyring")]
    CustomerKeyRequired,
    /// Inverse: client sent SSE-C headers on a GET for an object stored
    /// without SSE-C. The supplied key has no role in decryption, but
    /// AWS S3 actually 400s in this case ("expected an unencrypted
    /// object" / "extraneous SSE-C headers"), so we mirror that.
    #[error("S4E1/S4E2 frame stored without SSE-C; SseSource::CustomerKey is unexpected")]
    CustomerKeyUnexpected,
    // --- v0.5 #28: SSE-KMS specific errors ---
    /// `decrypt` (sync) was handed an S4E4 body. SSE-KMS unwrap is
    /// async (it round-trips to the KMS backend), so callers must
    /// peek the magic with [`peek_magic`] and dispatch S4E4 frames to
    /// [`decrypt_with_kms`] instead. service.rs's GET handler does
    /// this; tests / direct callers may hit this if they forget.
    #[error(
        "S4E4 (SSE-KMS) body requires async decrypt — call decrypt_with_kms() instead of decrypt()"
    )]
    KmsAsyncRequired,
    /// S4E4 frame is shorter than the minimum-possible header (38
    /// bytes for an empty `key_id` + empty `wrapped_dek`, which is
    /// itself impossible — we just sanity-check the floor).
    #[error("S4E4 frame too short ({got} bytes; need at least {min})")]
    KmsFrameTooShort { got: usize, min: usize },
    /// S4E4 declared a `key_id_len` or `wrapped_dek_len` that runs
    /// past the end of the body. Almost certainly truncation /
    /// corruption rather than tampering (tampering would fail the
    /// AES-GCM tag instead).
    #[error("S4E4 frame field length out of bounds: {what}")]
    KmsFrameFieldOob { what: &'static str },
    /// `key_id` field of an S4E4 frame is not valid UTF-8. We require
    /// UTF-8 because `LocalKms` uses the basename of a `.kek` file
    /// (which is OS-string-but-typically-UTF-8) and AWS KMS uses ARNs
    /// (which are ASCII).
    #[error("S4E4 key_id is not valid UTF-8")]
    KmsKeyIdNotUtf8,
    /// service.rs handed `decrypt_with_kms` a `WrappedDek` whose
    /// `key_id` doesn't match the one stored in the S4E4 frame. This
    /// is an integration bug (caller is meant to pull the wrapped
    /// DEK *from the frame*, not from somewhere else), surface as a
    /// distinct error so it shows up in tests rather than silently
    /// failing the AES-GCM tag.
    #[error(
        "S4E4 SseSource::Kms wrapped DEK key_id {supplied:?} doesn't match frame key_id {stored:?}"
    )]
    KmsWrappedDekMismatch {
        supplied: String,
        stored: String,
    },
    /// SSE-KMS path got a non-Kms `SseSource` for an S4E4 body. The
    /// async dispatch in `decrypt_with_kms` re-derives the source
    /// internally so this can only happen if a future caller passes
    /// `SseSource::Keyring` / `CustomerKey` to a path that expected
    /// `Kms` — kept around for symmetry with the other "wrong source"
    /// errors.
    #[error("S4E4 frame requires SseSource::Kms")]
    KmsRequired,
    /// Pass-through for [`crate::kms::KmsError`] surfaced from
    /// `KmsBackend::decrypt_dek`. `#[from]` generates the `From`
    /// conversion so `?` works on KMS results. Note the payload is
    /// stored inline (it is not boxed), so this variant is as large
    /// as `KmsError` itself.
    #[error("KMS unwrap: {0}")]
    KmsBackend(#[from] KmsError),
    // --- v0.8 #52: S4E5 (chunked SSE-S4) specific errors ---
    /// AES-GCM auth tag verify failed on chunk `chunk_index` of an
    /// S4E5 body. Distinct from the all-or-nothing
    /// [`SseError::DecryptFailed`] because the streaming GET may
    /// have already emitted earlier chunks to the client by the
    /// time chunk N fails — operators need the chunk index in audit
    /// logs to triangulate which byte range was tampered with (or
    /// which disk sector flipped).
    #[error("S4E5 chunk {chunk_index} auth tag verify failed (key mismatch or chunk tampered with)")]
    ChunkAuthFailed { chunk_index: u32 },
    /// Caller asked [`encrypt_v2_chunked`] to use a chunk size of 0
    /// — nonsensical (would loop forever). Surfaced as an error
    /// rather than panicking so service.rs can map a bad
    /// `--sse-chunk-size 0` configuration to a clear startup error.
    #[error("S4E5 chunk_size must be > 0 (got 0)")]
    ChunkSizeInvalid,
    /// S4E5 frame is shorter than the fixed header or declares a
    /// (chunk_count × per-chunk-bytes) total that overruns the
    /// body. Almost certainly truncation / corruption — tampering
    /// with the per-chunk ciphertext or tag would surface as
    /// [`SseError::ChunkAuthFailed`] instead.
    #[error("S4E5 frame truncated: {what}")]
    ChunkFrameTruncated { what: &'static str },
    // --- v0.8.1 #57: S4E6 (8-byte salt, 24-bit chunk_index) ---
    /// S4E6 chunk_index is encoded as a 24-bit big-endian field in
    /// the per-chunk nonce, capping `chunk_count` at
    /// `2^24 - 1 = 16_777_215`. At the default 1 MiB chunk size that
    /// is ~16 TiB per object (2^24 × 2^20 bytes) — well past S3's
    /// 5 GiB single-object ceiling. Surface as a distinct error so a
    /// misconfiguration (`--sse-chunk-size 1` on a multi-GiB object,
    /// say) shows up at PUT time with a clear cause rather than a
    /// panic at the u32 → u24 cast.
    #[error(
        "S4E6 chunk_count {got} exceeds 24-bit max ({max}) — pick a larger --sse-chunk-size"
    )]
    ChunkCountTooLarge { got: u32, max: u32 },
    // --- v0.8.2 #64: pre-allocation guard for chunked SSE frames ---
    /// `parse_chunked_header` rejected an S4E5 / S4E6 frame because
    /// the declared `chunk_size × chunk_count` (or the on-disk total
    /// after adding per-chunk tag overhead and the fixed header) is
    /// either:
    ///
    /// 1. arithmetically nonsensical (the multiplication / addition
    ///    overflows u64 on a 64-bit host), or
    /// 2. larger than the gateway's configured `max_body_bytes`
    ///    (default 5 GiB — AWS S3's single-object PUT ceiling).
    ///
    /// This is the **DoS guard** for the chunked path: without it, a
    /// 24-byte malicious header that claims `chunk_size = u32::MAX`
    /// and `chunk_count = u32::MAX` would have caused the buffered
    /// decrypt path to attempt a multi-PB `Vec::with_capacity` (or
    /// integer-overflow into a tiny alloc + later out-of-bounds
    /// panic) before any cryptographic work happened. Surface as a
    /// distinct variant — never echo the offending sizes back to the
    /// client (kept as a `&'static str` `details` field for operator
    /// audit logs only) so the response isn't a tampering oracle.
    #[error("S4E5/S4E6 chunked frame declares an over-large size: {details}")]
    ChunkFrameTooLarge { details: &'static str },
}
344
/// Default cap on a single chunked-SSE frame's declared plaintext
/// size, used by [`decrypt_chunked_buffered_default`] and the
/// streaming pre-validation path. Mirrors AWS S3's 5 GiB single-object
/// PUT ceiling — anything larger is unreachable on the AWS-compatible
/// API surface and is therefore safe to reject pre-alloc as a malformed
/// / malicious header (v0.8.2 #64).
///
/// NOTE(review): 5 GiB (5_368_709_120) is larger than `u32::MAX`, so
/// this `usize` constant only fits on targets with a 64-bit `usize`.
/// Confirm that 32-bit targets are intentionally unsupported.
pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
352
/// 32-byte symmetric key. `bytes` is `pub` so call sites can construct
/// keys directly from already-validated bytes (e.g. KMS-decrypted DEKs)
/// without going through the on-disk parser. Hold inside an `Arc` when
/// sharing across handler tasks — `SseKeyring` does this internally.
///
/// The type deliberately does not derive `Debug`; the hand-written
/// `Debug` impl prints `<redacted>` in place of the key bytes.
pub struct SseKey {
    /// Raw AES-256 key material (secret).
    pub bytes: [u8; 32],
}
360
361impl SseKey {
362    /// Load a 32-byte key from disk. Accepts three on-disk encodings:
363    /// raw 32 bytes, 64-char ASCII hex, or 44-char ASCII base64 (with or
364    /// without padding). Whitespace is trimmed.
365    pub fn from_path(path: &Path) -> Result<Self, SseError> {
366        let raw = std::fs::read(path).map_err(|source| SseError::KeyFileIo {
367            path: path.to_path_buf(),
368            source,
369        })?;
370        Self::from_bytes(&raw)
371    }
372
373    pub fn from_bytes(bytes: &[u8]) -> Result<Self, SseError> {
374        // Try raw first.
375        if bytes.len() == KEY_LEN {
376            let mut k = [0u8; KEY_LEN];
377            k.copy_from_slice(bytes);
378            return Ok(Self { bytes: k });
379        }
380        // Trim whitespace and try hex / base64.
381        let s = std::str::from_utf8(bytes).unwrap_or("").trim();
382        if s.len() == KEY_LEN * 2 && s.chars().all(|c| c.is_ascii_hexdigit()) {
383            let mut k = [0u8; KEY_LEN];
384            for (i, k_byte) in k.iter_mut().enumerate() {
385                *k_byte = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16)
386                    .map_err(|_| SseError::BadKeyLength { got: bytes.len() })?;
387            }
388            return Ok(Self { bytes: k });
389        }
390        if let Ok(decoded) =
391            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s.as_bytes())
392            && decoded.len() == KEY_LEN
393        {
394            let mut k = [0u8; KEY_LEN];
395            k.copy_from_slice(&decoded);
396            return Ok(Self { bytes: k });
397        }
398        Err(SseError::BadKeyLength { got: bytes.len() })
399    }
400
401    fn as_aes_key(&self) -> &Key<Aes256Gcm> {
402        Key::<Aes256Gcm>::from_slice(&self.bytes)
403    }
404}
405
406impl std::fmt::Debug for SseKey {
407    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408        f.debug_struct("SseKey")
409            .field("len", &KEY_LEN)
410            .field("key", &"<redacted>")
411            .finish()
412    }
413}
414
/// v0.5 #29: a set of `SseKey`s indexed by `u16` key-id, plus a
/// designated **active** id used for new encryptions. Rotation is just
/// "add the new key, flip `active` to its id, leave the old keys for
/// decryption-only". Cheap to clone (`Arc<SseKey>` per slot).
#[derive(Clone)]
pub struct SseKeyring {
    /// Id of the slot used for new encryptions. Set once by the
    /// constructor, which also inserts the matching key.
    active: u16,
    /// Every key the ring knows about (active and retired), by id.
    keys: HashMap<u16, Arc<SseKey>>,
}
424
425impl SseKeyring {
426    /// Create a keyring seeded with one key, immediately marked
427    /// active. Add older keys later via [`SseKeyring::add`] so the
428    /// gateway can still decrypt pre-rotation objects.
429    pub fn new(active: u16, key: Arc<SseKey>) -> Self {
430        let mut keys = HashMap::new();
431        keys.insert(active, key);
432        Self { active, keys }
433    }
434
435    /// Insert another key under id `id`. Does NOT change `active`. If
436    /// `id == active`, the slot is overwritten (useful for tests; in
437    /// production prefer minting a fresh id).
438    pub fn add(&mut self, id: u16, key: Arc<SseKey>) {
439        self.keys.insert(id, key);
440    }
441
442    /// Active (id, key) — used by [`encrypt_v2`] to pick the slot for
443    /// new objects.
444    pub fn active(&self) -> (u16, &SseKey) {
445        let id = self.active;
446        let key = self
447            .keys
448            .get(&id)
449            .expect("active key id must be present in keyring (constructor invariant)");
450        (id, key.as_ref())
451    }
452
453    /// Look up a key by id. Returns `None` for unknown ids — caller
454    /// should surface this as [`SseError::KeyNotInKeyring`].
455    pub fn get(&self, id: u16) -> Option<&SseKey> {
456        self.keys.get(&id).map(Arc::as_ref)
457    }
458}
459
460impl std::fmt::Debug for SseKeyring {
461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462        f.debug_struct("SseKeyring")
463            .field("active", &self.active)
464            .field("key_count", &self.keys.len())
465            .field("key_ids", &self.keys.keys().collect::<Vec<_>>())
466            .finish()
467    }
468}
469
/// Reference-counted handle to an [`SseKeyring`], for sharing one
/// keyring between owners without cloning the key map.
pub type SharedSseKeyring = Arc<SseKeyring>;
471
472/// Encrypt `plaintext` with the given key, producing the on-the-wire
473/// S4E1-framed output: `[magic 4][algo 1][reserved 3][nonce 12][tag 16][ciphertext]`.
474///
475/// Kept for back-compat: v0.4 callers that hand-built an `SseKey` (no
476/// keyring) still get the v1 frame. New code should use
477/// [`encrypt_v2`] which writes S4E2 and supports rotation on read.
478pub fn encrypt(key: &SseKey, plaintext: &[u8]) -> Bytes {
479    let cipher = Aes256Gcm::new(key.as_aes_key());
480    let mut nonce_bytes = [0u8; NONCE_LEN];
481    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
482    let nonce = Nonce::from_slice(&nonce_bytes);
483    // AAD = magic + algo. Tampering with either bumps the tag check.
484    let mut aad = [0u8; 8];
485    aad[..4].copy_from_slice(SSE_MAGIC_V1);
486    aad[4] = ALGO_AES_256_GCM;
487    let ct_with_tag = cipher
488        .encrypt(
489            nonce,
490            Payload {
491                msg: plaintext,
492                aad: &aad,
493            },
494        )
495        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
496    debug_assert!(ct_with_tag.len() >= TAG_LEN);
497    let split = ct_with_tag.len() - TAG_LEN;
498    let (ct, tag) = ct_with_tag.split_at(split);
499
500    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
501    out.extend_from_slice(SSE_MAGIC_V1);
502    out.push(ALGO_AES_256_GCM);
503    out.extend_from_slice(&[0u8; 3]); // reserved
504    out.extend_from_slice(&nonce_bytes);
505    out.extend_from_slice(tag);
506    out.extend_from_slice(ct);
507    Bytes::from(out)
508}
509
510/// v0.5 #29: encrypt under the keyring's currently-active key, writing
511/// an S4E2-framed body (`[magic 4][algo 1][key_id 2 BE][reserved 1]
512/// [nonce 12][tag 16][ciphertext]`). The key-id is included in the
513/// AAD so flipping it fails the auth tag.
514pub fn encrypt_v2(plaintext: &[u8], keyring: &SseKeyring) -> Bytes {
515    let (key_id, key) = keyring.active();
516    let cipher = Aes256Gcm::new(key.as_aes_key());
517    let mut nonce_bytes = [0u8; NONCE_LEN];
518    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
519    let nonce = Nonce::from_slice(&nonce_bytes);
520    let aad = aad_v2(key_id);
521    let ct_with_tag = cipher
522        .encrypt(
523            nonce,
524            Payload {
525                msg: plaintext,
526                aad: &aad,
527            },
528        )
529        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
530    debug_assert!(ct_with_tag.len() >= TAG_LEN);
531    let split = ct_with_tag.len() - TAG_LEN;
532    let (ct, tag) = ct_with_tag.split_at(split);
533
534    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
535    out.extend_from_slice(SSE_MAGIC_V2);
536    out.push(ALGO_AES_256_GCM);
537    out.extend_from_slice(&key_id.to_be_bytes()); // 2B BE key_id
538    out.push(0u8); // 1B reserved
539    out.extend_from_slice(&nonce_bytes);
540    out.extend_from_slice(tag);
541    out.extend_from_slice(ct);
542    Bytes::from(out)
543}
544
545fn aad_v1() -> [u8; 8] {
546    let mut aad = [0u8; 8];
547    aad[..4].copy_from_slice(SSE_MAGIC_V1);
548    aad[4] = ALGO_AES_256_GCM;
549    aad
550}
551
552fn aad_v2(key_id: u16) -> [u8; 8] {
553    let mut aad = [0u8; 8];
554    aad[..4].copy_from_slice(SSE_MAGIC_V2);
555    aad[4] = ALGO_AES_256_GCM;
556    aad[5..7].copy_from_slice(&key_id.to_be_bytes());
557    aad[7] = 0u8;
558    aad
559}
560
561/// AAD for S4E3 = magic (4) + algo (1) + key_md5 (16). Putting the
562/// fingerprint in the AAD means tampering with the stored MD5 (e.g. an
563/// attacker rewriting the header to match a *different* key they
564/// happen to know) breaks the AES-GCM tag — the wrong-key check isn't
565/// just a plain `==` we could be tricked past.
566fn aad_v3(key_md5: &[u8; KEY_MD5_LEN]) -> [u8; 4 + 1 + KEY_MD5_LEN] {
567    let mut aad = [0u8; 4 + 1 + KEY_MD5_LEN];
568    aad[..4].copy_from_slice(SSE_MAGIC_V3);
569    aad[4] = ALGO_AES_256_GCM;
570    aad[5..5 + KEY_MD5_LEN].copy_from_slice(key_md5);
571    aad
572}
573
/// Parsed + verified SSE-C key material from the three customer
/// headers. `key_md5` is the MD5 of `key` (we recompute and compare in
/// [`parse_customer_key_headers`] — clients send their own to catch
/// transport corruption, but we *trust* our own computation as the
/// canonical fingerprint in the S4E3 frame).
#[derive(Clone)]
pub struct CustomerKeyMaterial {
    /// The customer's 32-byte AES-256 key (secret; redacted by the
    /// hand-written `Debug` impl).
    pub key: [u8; KEY_LEN],
    /// 16-byte MD5 digest of `key`.
    pub key_md5: [u8; KEY_MD5_LEN],
}
584
585impl std::fmt::Debug for CustomerKeyMaterial {
586    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
587        // Don't leak the key into logs. The MD5 is a public fingerprint
588        // (S3 puts it on the wire), so that's safe to show.
589        f.debug_struct("CustomerKeyMaterial")
590            .field("key", &"<redacted>")
591            .field("key_md5_hex", &hex_lower(&self.key_md5))
592            .finish()
593    }
594}
595
/// Render `bytes` as lowercase hexadecimal, two characters per byte
/// (`[0x0f, 0xab]` → `"0fab"`). Returns an empty string for empty input.
fn hex_lower(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    // Exactly two output chars per input byte, so size the buffer once
    // and push nibbles directly instead of allocating a temporary
    // `String` via `format!` for every byte.
    let mut s = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        s.push(char::from(HEX_DIGITS[usize::from(b >> 4)]));
        s.push(char::from(HEX_DIGITS[usize::from(b & 0x0f)]));
    }
    s
}
603
/// Source of the encryption key for [`encrypt_with_source`] /
/// [`decrypt`]. SSE-S4 (server-managed, rotation-aware) goes through
/// `Keyring`; SSE-C (customer-supplied) goes through `CustomerKey`;
/// SSE-KMS (per-object DEK envelope) goes through `Kms`.
///
/// Borrowed (not owned) so the caller can hold a long-lived
/// `CustomerKeyMaterial` next to the request and just lend it for the
/// duration of one PUT/GET. Every variant holds only references, which
/// is what allows the `Copy` derive.
#[derive(Debug, Clone, Copy)]
pub enum SseSource<'a> {
    /// Server-managed keyring path → produces / consumes S4E1 (legacy)
    /// or S4E2 (rotation-aware) frames.
    Keyring(&'a SseKeyring),
    /// Client-supplied AES-256 key + its MD5 fingerprint → produces /
    /// consumes S4E3 frames. The server never persists the key; it
    /// stores `key_md5` only.
    CustomerKey {
        /// 32-byte AES-256 key supplied by the client.
        key: &'a [u8; KEY_LEN],
        /// 16-byte MD5 fingerprint of `key`.
        key_md5: &'a [u8; KEY_MD5_LEN],
    },
    /// SSE-KMS envelope → produces / consumes S4E4 frames. The server
    /// holds a per-object plaintext DEK (from a fresh
    /// [`KmsBackend::generate_dek`] call) and the wrapped form to
    /// persist alongside the body. The DEK is dropped after one
    /// PUT/GET; only the wrapped form survives at rest.
    Kms {
        /// 32-byte plaintext DEK, used as the AES-GCM key.
        dek: &'a [u8; KEY_LEN],
        /// Wrapped form to persist in the S4E4 frame (PUT) or the one
        /// read out of the frame (GET, after a successful unwrap).
        wrapped: &'a WrappedDek,
    },
}
636
637/// Back-compat coercion: existing call sites pass `&SseKeyring`
638/// directly to [`decrypt`]. With this `From` impl the generic bound
639/// `Into<SseSource>` accepts `&SseKeyring` without the caller writing
640/// `.into()`, keeping v0.4 / v0.5 #29 service.rs callers compiling
641/// untouched while v0.5 #27 SSE-C callers pass `SseSource::CustomerKey`
642/// explicitly.
643impl<'a> From<&'a SseKeyring> for SseSource<'a> {
644    fn from(kr: &'a SseKeyring) -> Self {
645        SseSource::Keyring(kr)
646    }
647}
648
649/// service.rs holds keyring as `Option<Arc<SseKeyring>>` and unwraps to
650/// `&Arc<SseKeyring>` — let that coerce too, otherwise every existing
651/// call site needs `.as_ref()` boilerplate.
652impl<'a> From<&'a Arc<SseKeyring>> for SseSource<'a> {
653    fn from(kr: &'a Arc<SseKeyring>) -> Self {
654        SseSource::Keyring(kr.as_ref())
655    }
656}
657
658impl<'a> From<&'a CustomerKeyMaterial> for SseSource<'a> {
659    fn from(m: &'a CustomerKeyMaterial) -> Self {
660        SseSource::CustomerKey {
661            key: &m.key,
662            key_md5: &m.key_md5,
663        }
664    }
665}
666
667/// Parse the three AWS SSE-C headers and return verified key material.
668///
669/// Validates, in order:
670/// 1. `algorithm == "AES256"` (the only value AWS S3 defines).
671/// 2. `key_base64` decodes to exactly 32 bytes (AES-256 key length).
672/// 3. `key_md5_base64` decodes to exactly 16 bytes (MD5 digest length).
673/// 4. The actual MD5 of the decoded key matches the supplied MD5.
674///
675/// Step 4 catches transport corruption *and* a class of programming
676/// bugs where the client signs with one key but uploads another. AWS
677/// S3 also performs this check.
678pub fn parse_customer_key_headers(
679    algorithm: &str,
680    key_base64: &str,
681    key_md5_base64: &str,
682) -> Result<CustomerKeyMaterial, SseError> {
683    use base64::Engine as _;
684    if algorithm != SSE_C_ALGORITHM {
685        return Err(SseError::CustomerKeyAlgorithmUnsupported {
686            algo: algorithm.to_string(),
687        });
688    }
689    let key_bytes = base64::engine::general_purpose::STANDARD
690        .decode(key_base64.trim().as_bytes())
691        .map_err(|_| SseError::InvalidCustomerKey {
692            reason: "base64 decode of key",
693        })?;
694    if key_bytes.len() != KEY_LEN {
695        return Err(SseError::InvalidCustomerKey {
696            reason: "key length (must be 32 bytes after base64 decode)",
697        });
698    }
699    let supplied_md5 = base64::engine::general_purpose::STANDARD
700        .decode(key_md5_base64.trim().as_bytes())
701        .map_err(|_| SseError::InvalidCustomerKey {
702            reason: "base64 decode of key MD5",
703        })?;
704    if supplied_md5.len() != KEY_MD5_LEN {
705        return Err(SseError::InvalidCustomerKey {
706            reason: "key MD5 length (must be 16 bytes after base64 decode)",
707        });
708    }
709    let actual_md5 = compute_key_md5(&key_bytes);
710    // Constant-time compare — paranoia, MD5 is non-secret but the key
711    // it identifies is, so we don't want a timing oracle.
712    if !constant_time_eq(&actual_md5, &supplied_md5) {
713        return Err(SseError::InvalidCustomerKey {
714            reason: "supplied MD5 does not match MD5 of supplied key",
715        });
716    }
717    let mut key = [0u8; KEY_LEN];
718    key.copy_from_slice(&key_bytes);
719    let mut key_md5 = [0u8; KEY_MD5_LEN];
720    key_md5.copy_from_slice(&actual_md5);
721    Ok(CustomerKeyMaterial { key, key_md5 })
722}
723
724/// Convenience wrapper — compute the MD5 fingerprint of a 32-byte
725/// customer key. Callers that already have the bytes (e.g. derived
726/// from a KMS unwrap) can use this to construct a
727/// [`CustomerKeyMaterial`] directly.
728pub fn compute_key_md5(key: &[u8]) -> [u8; KEY_MD5_LEN] {
729    let mut h = Md5::new();
730    h.update(key);
731    let out = h.finalize();
732    let mut md5 = [0u8; KEY_MD5_LEN];
733    md5.copy_from_slice(&out);
734    md5
735}
736
/// Byte-slice equality that inspects every byte pair instead of
/// stopping at the first mismatch.
///
/// Slices of different lengths compare unequal immediately (the
/// length itself is not treated as secret). Hand-rolled rather than
/// pulling in the `subtle` crate for so little code.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of each pair: the accumulator stays zero
    // only if every pair was identical.
    let diff = a
        .iter()
        .zip(b)
        .fold(0u8, |seen, (lhs, rhs)| seen | (lhs ^ rhs));
    diff == 0
}
749
750/// v0.5 #27: encrypt under whichever source the caller picked.
751///
752/// - `SseSource::Keyring` → delegates to [`encrypt_v2`] (S4E2 frame).
753/// - `SseSource::CustomerKey` → writes an S4E3 frame (no key persisted,
754///   just the MD5 fingerprint for GET-side verification).
755///
756/// service.rs picks the source per-request: SSE-C headers present →
757/// `CustomerKey`, otherwise (and only when `--sse-s4-key` is wired) →
758/// `Keyring`. Plaintext objects skip this function entirely.
759pub fn encrypt_with_source(plaintext: &[u8], source: SseSource<'_>) -> Bytes {
760    match source {
761        SseSource::Keyring(kr) => encrypt_v2(plaintext, kr),
762        SseSource::CustomerKey { key, key_md5 } => encrypt_v3(plaintext, key, key_md5),
763        SseSource::Kms { dek, wrapped } => encrypt_v4(plaintext, dek, wrapped),
764    }
765}
766
767fn encrypt_v3(
768    plaintext: &[u8],
769    key: &[u8; KEY_LEN],
770    key_md5: &[u8; KEY_MD5_LEN],
771) -> Bytes {
772    let aes_key = Key::<Aes256Gcm>::from_slice(key);
773    let cipher = Aes256Gcm::new(aes_key);
774    let mut nonce_bytes = [0u8; NONCE_LEN];
775    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
776    let nonce = Nonce::from_slice(&nonce_bytes);
777    let aad = aad_v3(key_md5);
778    let ct_with_tag = cipher
779        .encrypt(
780            nonce,
781            Payload {
782                msg: plaintext,
783                aad: &aad,
784            },
785        )
786        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
787    debug_assert!(ct_with_tag.len() >= TAG_LEN);
788    let split = ct_with_tag.len() - TAG_LEN;
789    let (ct, tag) = ct_with_tag.split_at(split);
790
791    let mut out = Vec::with_capacity(SSE_HEADER_BYTES_V3 + ct.len());
792    out.extend_from_slice(SSE_MAGIC_V3);
793    out.push(ALGO_AES_256_GCM);
794    out.extend_from_slice(key_md5);
795    out.extend_from_slice(&nonce_bytes);
796    out.extend_from_slice(tag);
797    out.extend_from_slice(ct);
798    Bytes::from(out)
799}
800
801/// v0.5 #29 + v0.5 #27: dispatch on the body's magic and decrypt under
802/// whichever source the caller supplied.
803///
804/// - `S4E1` / `S4E2` require `SseSource::Keyring` (return
805///   [`SseError::CustomerKeyRequired`] for `CustomerKey` — service.rs
806///   should map this to "extraneous SSE-C headers" 400).
807/// - `S4E3` requires `SseSource::CustomerKey` (return
808///   [`SseError::CustomerKeyUnexpected`] for `Keyring` — service.rs
809///   should map this to "missing SSE-C headers" 400).
810///
811/// Generic over `Into<SseSource>` so existing `decrypt(body, &keyring)`
812/// call sites compile unchanged via the `From<&SseKeyring>` impl above
813/// — only the new SSE-C path needs to type out
814/// `SseSource::CustomerKey { .. }`.
815///
816/// Distinct errors (`KeyNotInKeyring`, `DecryptFailed`,
817/// `WrongCustomerKey`) let operators tell rotation gaps, ciphertext
818/// tampering, and SSE-C key mismatch apart in audit logs.
819pub fn decrypt<'a, S: Into<SseSource<'a>>>(body: &[u8], source: S) -> Result<Bytes, SseError> {
820    let source = source.into();
821    // Outer short-check uses the smaller of the two header sizes
822    // (S4E1/S4E2 = 36 bytes). Anything below this can't be any valid
823    // SSE frame regardless of magic — keeps back-compat with v0.4 /
824    // v0.5 #29 callers that expected `TooShort` for absurdly short
825    // inputs even when the magic is garbage.
826    if body.len() < SSE_HEADER_BYTES {
827        return Err(SseError::TooShort { got: body.len() });
828    }
829    let mut magic = [0u8; 4];
830    magic.copy_from_slice(&body[..4]);
831    match &magic {
832        m if m == SSE_MAGIC_V1 || m == SSE_MAGIC_V2 => {
833            let keyring = match source {
834                SseSource::Keyring(kr) => kr,
835                SseSource::CustomerKey { .. } => return Err(SseError::CustomerKeyUnexpected),
836                // S4E1/E2 stored under the keyring → SseSource::Kms
837                // is just as nonsensical as CustomerKey here. Re-use
838                // the same "wrong source" error so service.rs can
839                // map both to AWS S3's "extraneous SSE-* headers"
840                // 400.
841                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
842            };
843            if m == SSE_MAGIC_V1 {
844                decrypt_v1_with_keyring(body, keyring)
845            } else {
846                decrypt_v2_with_keyring(body, keyring)
847            }
848        }
849        m if m == SSE_MAGIC_V3 => {
850            // S4E3 has a larger 49-byte header, so re-check.
851            if body.len() < SSE_HEADER_BYTES_V3 {
852                return Err(SseError::TooShort { got: body.len() });
853            }
854            let (key, key_md5) = match source {
855                SseSource::CustomerKey { key, key_md5 } => (key, key_md5),
856                SseSource::Keyring(_) => return Err(SseError::CustomerKeyRequired),
857                SseSource::Kms { .. } => return Err(SseError::CustomerKeyRequired),
858            };
859            decrypt_v3(body, key, key_md5)
860        }
861        m if m == SSE_MAGIC_V4 => {
862            // SSE-KMS unwrap is async (KMS round-trip required).
863            // Caller must dispatch to `decrypt_with_kms` after
864            // peeking the magic — surface this as a distinct error
865            // rather than silently failing.
866            Err(SseError::KmsAsyncRequired)
867        }
868        m if m == SSE_MAGIC_V5 || m == SSE_MAGIC_V6 => {
869            // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): chunked SSE-S4.
870            // Sync back-compat path — verifies + decrypts every
871            // chunk into a single Bytes. Callers that want true
872            // streaming (per-chunk emit) should use
873            // `decrypt_chunked_stream` instead. SSE-C and SSE-KMS
874            // sources are nonsensical here for the same reason as
875            // S4E2 (server-managed keyring only).
876            let keyring = match source {
877                SseSource::Keyring(kr) => kr,
878                SseSource::CustomerKey { .. } => {
879                    return Err(SseError::CustomerKeyUnexpected);
880                }
881                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
882            };
883            // v0.8.2 #64: route through the back-compat wrapper so
884            // the public `decrypt` signature stays stable while the
885            // pre-allocation guard runs against the gateway's
886            // 5 GiB single-object cap. Bespoke callers (e.g. tests
887            // or future API surfaces that want a tighter limit) can
888            // call `decrypt_chunked_buffered` directly.
889            decrypt_chunked_buffered_default(body, keyring)
890        }
891        _ => Err(SseError::BadMagic { got: magic }),
892    }
893}
894
895fn decrypt_v3(
896    body: &[u8],
897    key: &[u8; KEY_LEN],
898    supplied_md5: &[u8; KEY_MD5_LEN],
899) -> Result<Bytes, SseError> {
900    let algo = body[4];
901    if algo != ALGO_AES_256_GCM {
902        return Err(SseError::UnsupportedAlgo { tag: algo });
903    }
904    let mut stored_md5 = [0u8; KEY_MD5_LEN];
905    stored_md5.copy_from_slice(&body[5..5 + KEY_MD5_LEN]);
906    // Cheap fingerprint check first — if the supplied key has a
907    // different MD5 than what was used at PUT, fail fast with a
908    // dedicated error. AES-GCM auth would also catch this (different
909    // key → bad tag) but the bespoke error gives operators an audit
910    // signal distinct from "ciphertext was tampered with".
911    if !constant_time_eq(supplied_md5, &stored_md5) {
912        return Err(SseError::WrongCustomerKey);
913    }
914    let nonce_off = 5 + KEY_MD5_LEN;
915    let tag_off = nonce_off + NONCE_LEN;
916    let mut nonce_bytes = [0u8; NONCE_LEN];
917    nonce_bytes.copy_from_slice(&body[nonce_off..nonce_off + NONCE_LEN]);
918    let mut tag_bytes = [0u8; TAG_LEN];
919    tag_bytes.copy_from_slice(&body[tag_off..tag_off + TAG_LEN]);
920    let ct = &body[SSE_HEADER_BYTES_V3..];
921
922    let aad = aad_v3(&stored_md5);
923    let nonce = Nonce::from_slice(&nonce_bytes);
924    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
925    ct_with_tag.extend_from_slice(ct);
926    ct_with_tag.extend_from_slice(&tag_bytes);
927
928    let aes_key = Key::<Aes256Gcm>::from_slice(key);
929    let cipher = Aes256Gcm::new(aes_key);
930    let plain = cipher
931        .decrypt(
932            nonce,
933            Payload {
934                msg: &ct_with_tag,
935                aad: &aad,
936            },
937        )
938        .map_err(|_| SseError::DecryptFailed)?;
939    Ok(Bytes::from(plain))
940}
941
942/// AAD for S4E4 = magic (4) + algo (1) + key_id_len (1) + key_id +
943/// wrapped_dek_len (4 BE) + wrapped_dek. Putting the variable-length
944/// key_id and wrapped_dek into the AAD means an attacker cannot
945/// rewrite either field to redirect the gateway to a different KEK
946/// or wrapped DEK without invalidating the body's AES-GCM tag.
947///
948/// Length-prefixing key_id and wrapped_dek inside the AAD prevents a
949/// canonicalisation ambiguity: without the length prefix, an
950/// attacker could shift bytes between the two fields and produce the
951/// same AAD bytestream, defeating the per-field tampering check.
952fn aad_v4(key_id: &[u8], wrapped_dek: &[u8]) -> Vec<u8> {
953    let mut aad = Vec::with_capacity(4 + 1 + 1 + key_id.len() + 4 + wrapped_dek.len());
954    aad.extend_from_slice(SSE_MAGIC_V4);
955    aad.push(ALGO_AES_256_GCM);
956    aad.push(key_id.len() as u8);
957    aad.extend_from_slice(key_id);
958    aad.extend_from_slice(&(wrapped_dek.len() as u32).to_be_bytes());
959    aad.extend_from_slice(wrapped_dek);
960    aad
961}
962
963fn encrypt_v4(plaintext: &[u8], dek: &[u8; KEY_LEN], wrapped: &WrappedDek) -> Bytes {
964    // Pre-conditions: key_id must fit in a u8 length prefix and be
965    // non-empty (an empty id means we wouldn't be able to re-fetch
966    // the KEK on GET). wrapped_dek length fits in u32 by the same
967    // logic — at u32::MAX bytes you have bigger problems. We assert
968    // these in debug and silently truncate-or-panic in release; in
969    // practice key_id is a UUID or ARN (<256 chars) and wrapped_dek
970    // is 60 bytes (LocalKms) or ~200 bytes (AWS KMS).
971    assert!(
972        !wrapped.key_id.is_empty() && wrapped.key_id.len() <= u8::MAX as usize,
973        "S4E4 key_id must be 1..=255 bytes (got {})",
974        wrapped.key_id.len()
975    );
976    assert!(
977        wrapped.ciphertext.len() <= u32::MAX as usize,
978        "S4E4 wrapped_dek longer than u32::MAX",
979    );
980
981    let aes_key = Key::<Aes256Gcm>::from_slice(dek);
982    let cipher = Aes256Gcm::new(aes_key);
983    let mut nonce_bytes = [0u8; NONCE_LEN];
984    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
985    let nonce = Nonce::from_slice(&nonce_bytes);
986    let aad = aad_v4(wrapped.key_id.as_bytes(), &wrapped.ciphertext);
987    let ct_with_tag = cipher
988        .encrypt(
989            nonce,
990            Payload {
991                msg: plaintext,
992                aad: &aad,
993            },
994        )
995        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
996    debug_assert!(ct_with_tag.len() >= TAG_LEN);
997    let split = ct_with_tag.len() - TAG_LEN;
998    let (ct, tag) = ct_with_tag.split_at(split);
999
1000    let key_id_bytes = wrapped.key_id.as_bytes();
1001    let mut out = Vec::with_capacity(
1002        4 + 1 + 1 + key_id_bytes.len() + 4 + wrapped.ciphertext.len() + NONCE_LEN + TAG_LEN + ct.len(),
1003    );
1004    out.extend_from_slice(SSE_MAGIC_V4);
1005    out.push(ALGO_AES_256_GCM);
1006    out.push(key_id_bytes.len() as u8);
1007    out.extend_from_slice(key_id_bytes);
1008    out.extend_from_slice(&(wrapped.ciphertext.len() as u32).to_be_bytes());
1009    out.extend_from_slice(&wrapped.ciphertext);
1010    out.extend_from_slice(&nonce_bytes);
1011    out.extend_from_slice(tag);
1012    out.extend_from_slice(ct);
1013    Bytes::from(out)
1014}
1015
/// Borrowed view over the fields of an S4E4 frame.
///
/// Produced by [`parse_s4e4_header`]; every field is a slice into the
/// frame that was parsed, so nothing is copied. Shared by the async
/// [`decrypt_with_kms`] path and any inspection tooling that needs
/// the object → KMS-key binding without redoing the offset math.
#[derive(Debug)]
pub struct S4E4Header<'a> {
    /// KMS key identifier stored in the frame (validated as UTF-8).
    pub key_id: &'a str,
    /// Wrapped (KMS-encrypted) data key bytes.
    pub wrapped_dek: &'a [u8],
    /// AES-GCM nonce for the object body.
    pub nonce: &'a [u8],
    /// AES-GCM authentication tag for the object body.
    pub tag: &'a [u8],
    /// Encrypted object body: everything after the tag.
    pub ciphertext: &'a [u8],
}
1029
1030/// Parse the (variable-length) S4E4 header. Pure byte-shuffling — no
1031/// crypto, no KMS round-trip. Returns errors on truncation /
1032/// out-of-bounds field lengths / non-UTF-8 key_id.
1033pub fn parse_s4e4_header(body: &[u8]) -> Result<S4E4Header<'_>, SseError> {
1034    // Minimum: magic(4) + algo(1) + key_id_len(1) + key_id(>=1) +
1035    // wrapped_dek_len(4) + wrapped_dek(>=1) + nonce(12) + tag(16)
1036    // = 40 bytes. We use a slightly looser floor here (bytes for
1037    // empty fields = 38) and let the per-field bounds checks below
1038    // catch the actual short reads.
1039    const S4E4_MIN: usize = 4 + 1 + 1 + 4 + NONCE_LEN + TAG_LEN; // 38
1040    if body.len() < S4E4_MIN {
1041        return Err(SseError::KmsFrameTooShort {
1042            got: body.len(),
1043            min: S4E4_MIN,
1044        });
1045    }
1046    let magic = &body[..4];
1047    if magic != SSE_MAGIC_V4 {
1048        let mut got = [0u8; 4];
1049        got.copy_from_slice(magic);
1050        return Err(SseError::BadMagic { got });
1051    }
1052    let algo = body[4];
1053    if algo != ALGO_AES_256_GCM {
1054        return Err(SseError::UnsupportedAlgo { tag: algo });
1055    }
1056    let key_id_len = body[5] as usize;
1057    let key_id_off: usize = 6;
1058    let key_id_end = key_id_off
1059        .checked_add(key_id_len)
1060        .ok_or(SseError::KmsFrameFieldOob { what: "key_id_len" })?;
1061    if key_id_end + 4 > body.len() {
1062        return Err(SseError::KmsFrameFieldOob { what: "key_id" });
1063    }
1064    let key_id = std::str::from_utf8(&body[key_id_off..key_id_end])
1065        .map_err(|_| SseError::KmsKeyIdNotUtf8)?;
1066    let wrapped_len_off = key_id_end;
1067    let wrapped_dek_len = u32::from_be_bytes([
1068        body[wrapped_len_off],
1069        body[wrapped_len_off + 1],
1070        body[wrapped_len_off + 2],
1071        body[wrapped_len_off + 3],
1072    ]) as usize;
1073    let wrapped_off = wrapped_len_off + 4;
1074    let wrapped_end = wrapped_off
1075        .checked_add(wrapped_dek_len)
1076        .ok_or(SseError::KmsFrameFieldOob { what: "wrapped_dek_len" })?;
1077    if wrapped_end + NONCE_LEN + TAG_LEN > body.len() {
1078        return Err(SseError::KmsFrameFieldOob { what: "wrapped_dek" });
1079    }
1080    let wrapped_dek = &body[wrapped_off..wrapped_end];
1081    let nonce_off = wrapped_end;
1082    let tag_off = nonce_off + NONCE_LEN;
1083    let ct_off = tag_off + TAG_LEN;
1084    let nonce = &body[nonce_off..nonce_off + NONCE_LEN];
1085    let tag = &body[tag_off..tag_off + TAG_LEN];
1086    let ciphertext = &body[ct_off..];
1087    Ok(S4E4Header {
1088        key_id,
1089        wrapped_dek,
1090        nonce,
1091        tag,
1092        ciphertext,
1093    })
1094}
1095
1096/// Async decrypt for S4E4 (SSE-KMS) bodies. Caller supplies the KMS
1097/// backend; this function parses the frame, calls
1098/// `kms.decrypt_dek(...)` to unwrap the DEK, then runs AES-256-GCM
1099/// to recover the plaintext.
1100///
1101/// service.rs's GET handler should peek the magic with [`peek_magic`]
1102/// and dispatch:
1103///
1104/// - `Some("S4E4")` → `decrypt_with_kms(blob, &*kms).await`
1105/// - everything else → existing sync `decrypt(blob, source)`
1106///
1107/// Note: we don't go through `SseSource::Kms` here because the
1108/// wrapped DEK + key_id come from the frame itself, not from the
1109/// request — the `SseSource` is built for sync paths where the
1110/// caller already knows the key.
1111pub async fn decrypt_with_kms(
1112    body: &[u8],
1113    kms: &dyn KmsBackend,
1114) -> Result<Bytes, SseError> {
1115    let hdr = parse_s4e4_header(body)?;
1116    let wrapped = WrappedDek {
1117        key_id: hdr.key_id.to_string(),
1118        ciphertext: hdr.wrapped_dek.to_vec(),
1119    };
1120    let dek_vec = kms.decrypt_dek(&wrapped).await?;
1121    if dek_vec.len() != KEY_LEN {
1122        // KMS returned a non-32-byte plaintext. AES-256 needs exactly
1123        // 32 bytes. This shouldn't happen with `KeySpec=AES_256` but
1124        // surface as a backend error so it's auditable rather than
1125        // panicking.
1126        return Err(SseError::KmsBackend(KmsError::BackendUnavailable {
1127            message: format!(
1128                "KMS returned {} byte DEK; expected {KEY_LEN}",
1129                dek_vec.len()
1130            ),
1131        }));
1132    }
1133    let mut dek = [0u8; KEY_LEN];
1134    dek.copy_from_slice(&dek_vec);
1135
1136    let aad = aad_v4(hdr.key_id.as_bytes(), hdr.wrapped_dek);
1137    let aes_key = Key::<Aes256Gcm>::from_slice(&dek);
1138    let cipher = Aes256Gcm::new(aes_key);
1139    let nonce = Nonce::from_slice(hdr.nonce);
1140    let mut ct_with_tag = Vec::with_capacity(hdr.ciphertext.len() + TAG_LEN);
1141    ct_with_tag.extend_from_slice(hdr.ciphertext);
1142    ct_with_tag.extend_from_slice(hdr.tag);
1143    let plain = cipher
1144        .decrypt(
1145            nonce,
1146            Payload {
1147                msg: &ct_with_tag,
1148                aad: &aad,
1149            },
1150        )
1151        .map_err(|_| SseError::DecryptFailed)?;
1152    Ok(Bytes::from(plain))
1153}
1154
1155fn decrypt_v1_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1156    let algo = body[4];
1157    if algo != ALGO_AES_256_GCM {
1158        return Err(SseError::UnsupportedAlgo { tag: algo });
1159    }
1160    // body[5..8] reserved (must be ignored — v0.4 wrote zeros, but we
1161    // didn't auth them so we can't insist on it).
1162    let mut nonce_bytes = [0u8; NONCE_LEN];
1163    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1164    let mut tag_bytes = [0u8; TAG_LEN];
1165    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1166    let ct = &body[SSE_HEADER_BYTES..];
1167
1168    let aad = aad_v1();
1169    let nonce = Nonce::from_slice(&nonce_bytes);
1170    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1171    ct_with_tag.extend_from_slice(ct);
1172    ct_with_tag.extend_from_slice(&tag_bytes);
1173
1174    // Active key first, then any others. v0.4 deployments that flip to
1175    // v0.5 with their original key as active hit this path on the
1176    // first try for every legacy object.
1177    let (active_id, _active_key) = keyring.active();
1178    let mut ids: Vec<u16> = keyring.keys.keys().copied().collect();
1179    ids.sort_by_key(|id| if *id == active_id { 0 } else { 1 });
1180    for id in ids {
1181        let key = keyring.get(id).expect("id came from keyring iteration");
1182        let cipher = Aes256Gcm::new(key.as_aes_key());
1183        if let Ok(plain) = cipher.decrypt(
1184            nonce,
1185            Payload {
1186                msg: &ct_with_tag,
1187                aad: &aad,
1188            },
1189        ) {
1190            return Ok(Bytes::from(plain));
1191        }
1192    }
1193    Err(SseError::DecryptFailed)
1194}
1195
1196fn decrypt_v2_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1197    let algo = body[4];
1198    if algo != ALGO_AES_256_GCM {
1199        return Err(SseError::UnsupportedAlgo { tag: algo });
1200    }
1201    let key_id = u16::from_be_bytes([body[5], body[6]]);
1202    // body[7] reserved (1B), authenticated as 0 via AAD.
1203    let key = keyring
1204        .get(key_id)
1205        .ok_or(SseError::KeyNotInKeyring { id: key_id })?;
1206    let mut nonce_bytes = [0u8; NONCE_LEN];
1207    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1208    let mut tag_bytes = [0u8; TAG_LEN];
1209    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1210    let ct = &body[SSE_HEADER_BYTES..];
1211
1212    let aad = aad_v2(key_id);
1213    let nonce = Nonce::from_slice(&nonce_bytes);
1214    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1215    ct_with_tag.extend_from_slice(ct);
1216    ct_with_tag.extend_from_slice(&tag_bytes);
1217    let cipher = Aes256Gcm::new(key.as_aes_key());
1218    let plain = cipher
1219        .decrypt(
1220            nonce,
1221            Payload {
1222                msg: &ct_with_tag,
1223                aad: &aad,
1224            },
1225        )
1226        .map_err(|_| SseError::DecryptFailed)?;
1227    Ok(Bytes::from(plain))
1228}
1229
1230/// Detect whether `body` is SSE-S4 encrypted (S4E1, S4E2, S4E3, or
1231/// S4E4) by sniffing the first 4 magic bytes. Used by the GET path
1232/// to decide whether to run decryption before frame parsing.
1233///
1234/// We require a length check that's safe for *any* of the four
1235/// frames — `SSE_HEADER_BYTES` (36) is the smallest valid header
1236/// (S4E1 / S4E2). S4E3 is 49 bytes; S4E4 is variable but always >=
1237/// 38 bytes. The per-frame decrypt path re-checks the appropriate
1238/// minimum, so this 36-byte gate is just a fast rejection of
1239/// obviously-too-short bodies.
1240pub fn looks_encrypted(body: &[u8]) -> bool {
1241    if body.len() < SSE_HEADER_BYTES {
1242        return false;
1243    }
1244    let m = &body[..4];
1245    m == SSE_MAGIC_V1
1246        || m == SSE_MAGIC_V2
1247        || m == SSE_MAGIC_V3
1248        || m == SSE_MAGIC_V4
1249        || m == SSE_MAGIC_V5
1250        || m == SSE_MAGIC_V6
1251}
1252
1253/// Peek the SSE-S4 magic at the front of `body`, returning a
1254/// stringified frame variant identifier or `None` if `body` is not
1255/// recognized as SSE-S4. Used by the GET path to dispatch between
1256/// the sync [`decrypt`] (S4E1/E2/E3) and the async
1257/// [`decrypt_with_kms`] (S4E4).
1258///
1259/// Returns the same length-gated result as [`looks_encrypted`]: any
1260/// body shorter than `SSE_HEADER_BYTES` (36 bytes) returns `None`,
1261/// so the caller can use this as both the "is encrypted" signal and
1262/// the "which frame" signal in one cheap byte-comparison.
1263pub fn peek_magic(body: &[u8]) -> Option<&'static str> {
1264    if body.len() < SSE_HEADER_BYTES {
1265        return None;
1266    }
1267    match &body[..4] {
1268        m if m == SSE_MAGIC_V1 => Some("S4E1"),
1269        m if m == SSE_MAGIC_V2 => Some("S4E2"),
1270        m if m == SSE_MAGIC_V3 => Some("S4E3"),
1271        m if m == SSE_MAGIC_V4 => Some("S4E4"),
1272        // v0.8 #52: chunked SSE-S4. service.rs's GET handler
1273        // dispatches "S4E5" / "S4E6" → `decrypt_chunked_stream`
1274        // for true streaming GET; the sync `decrypt(...)` also
1275        // accepts both (back-compat — buffered concat).
1276        m if m == SSE_MAGIC_V5 => Some("S4E5"),
1277        // v0.8.1 #57: same dispatch as S4E5 — wider salt only.
1278        m if m == SSE_MAGIC_V6 => Some("S4E6"),
1279        _ => None,
1280    }
1281}
1282
/// Reference-counted (`Arc`) handle to an [`SseKey`].
pub type SharedSseKey = Arc<SseKey>;
1284
1285// ===========================================================================
1286// v0.8 #52 (S4E5, read-only) + v0.8.1 #57 (S4E6, current emit) —
1287// chunked variant of S4E2 for streaming GET
1288// ===========================================================================
1289//
1290// ## S4E5 wire format (v0.8 #52, **read-only as of v0.8.1 #57**)
1291//
1292// ```text
1293// magic         4B    "S4E5"
1294// algo          1B    0x01 (AES-256-GCM)
1295// key_id        2B    BE — keyring slot the active key was at PUT time
1296// reserved      1B    0x00
1297// chunk_size    4B    BE — plaintext bytes per chunk (final chunk may be smaller)
1298// chunk_count   4B    BE — total chunks (always >= 1; empty plaintext = 1 zero-byte chunk)
1299// salt          4B    random per-PUT, mixed into every nonce
1300// [chunk_count] × {
1301//   tag         16B   AES-GCM auth tag for this chunk
1302//   ciphertext  N B   chunk_size bytes (final chunk: 0..=chunk_size bytes)
1303// }
1304// ```
1305//
1306// Fixed header = 20 bytes ([`S4E5_HEADER_BYTES`]).
1307//
1308// ## S4E6 wire format (v0.8.1 #57, current PUT emit)
1309//
1310// ```text
1311// magic         4B    "S4E6"
1312// algo          1B    0x01 (AES-256-GCM)
1313// key_id        2B    BE
1314// reserved      1B    0x00
1315// chunk_size    4B    BE
1316// chunk_count   4B    BE
1317// salt          8B    random per-PUT  ← 4B → 8B widened
1318// [chunk_count] × { tag 16B, ciphertext N B }
1319// ```
1320//
1321// Fixed header = 24 bytes ([`S4E6_HEADER_BYTES`]). Chunk array
1322// layout is byte-identical to S4E5; only the header (salt 4 → 8)
1323// and the nonce/AAD derivation differ.
1324//
1325// ## Per-chunk overhead (both S4E5 and S4E6)
1326//
1327// 16 bytes — just the AES-GCM auth tag. AES-GCM is CTR-mode, so
1328// `ciphertext.len() == plaintext.len()`. Total overhead for an
1329// N-byte plaintext at chunk size C: `header + ceil(N/C) * 16`.
1330//
1331// ## S4E5 nonce / AAD (read-only)
1332//
1333// ```text
1334// nonce_v5[0..4]  = b"E5\x00\x00"
1335// nonce_v5[4..8]  = salt (4 B)
1336// nonce_v5[8..12] = chunk_index BE (u32)
1337//
1338// aad_v5 = b"S4E5" || algo (1) || chunk_index BE (4) || total BE (4)
1339//        || key_id BE (2) || salt (4)
1340// ```
1341//
1342// Birthday-collision threshold on the 4-byte salt: ~50% at ~65,536
1343// distinct PUTs under the same key — the security regression that
1344// motivated #57.
1345//
1346// ## S4E6 nonce / AAD (current emit)
1347//
1348// ```text
1349// nonce_v6[0]     = b'E'                   (1 B fixed prefix)
1350// nonce_v6[1..9]  = salt (8 B)             (per-PUT random from OsRng)
1351// nonce_v6[9..12] = chunk_index BE (u24)   (3 B → max 16_777_215 chunks)
1352//
1353// aad_v6 = b"S4E6" || algo (1) || chunk_index BE (4) || total BE (4)
1354//        || key_id BE (2) || salt (8)
1355// ```
1356//
// Wider salt: birthday collision ~50% at ~2^32 = ~4.3 billion
// PUTs/key — a factor of 2^16 (~65,536×) over S4E5.
1359//
// chunk_index narrows from 32-bit to 24-bit, capping `chunk_count`
// at `2^24 - 1 = 16_777_215`. At the default `--sse-chunk-size
// 1048576` (1 MiB) that's ~16 TiB per object — three orders of
// magnitude over S3's 5 GiB single-object cap. Smaller chunk sizes
1364// need to be sized carefully: e.g. `--sse-chunk-size 64` on a
1365// > 1 GiB object would exceed the cap (1 GiB / 64 B = 16M+1
1366// chunks); such configurations surface
1367// [`SseError::ChunkCountTooLarge`] at PUT time rather than
1368// silently truncating.
1369//
1370// AAD on both variants includes the chunk index + total so chunk
1371// reordering or dropping fails the per-chunk tag, plus key_id +
1372// salt so header tampering also fails auth.
1373
/// Fixed header size of an S4E5 frame, before any chunks. `magic 4 +
/// algo 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 +
/// salt 4` = 20 bytes.
pub const S4E5_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 4; // = 20

/// Per-chunk overhead inside an S4E5 / S4E6 frame: just the AES-GCM
/// auth tag. `ciphertext.len() == plaintext.len()` (CTR mode), so a
/// chunk of N plaintext bytes costs N + 16 on disk.
pub const S4E5_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16

/// v0.8.1 #57: fixed header size of an S4E6 frame. Same layout as
/// S4E5 except the per-PUT salt widens 4 → 8 bytes: `magic 4 + algo
/// 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 + salt
/// 8` = 24 bytes.
pub const S4E6_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 8; // = 24

/// v0.8.1 #57: per-chunk overhead for S4E6. Identical to S4E5
/// (same AES-GCM tag size). Provided as a distinct const so call
/// sites that compute on-disk size for S4E6 specifically can spell
/// the magic clearly in their arithmetic.
pub const S4E6_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16

/// v0.8.1 #57: maximum `chunk_count` that fits in the S4E6 nonce's
/// 24-bit chunk_index field. At 1 MiB chunks this is ~16 TiB per
/// object (2^24 chunks × 2^20 bytes = 2^44 bytes) — three orders of
/// magnitude over S3's 5 GiB single-object cap, so it's not a
/// practical limit at the default chunk size.
pub const S4E6_MAX_CHUNK_COUNT: u32 = (1u32 << 24) - 1; // 16_777_215

/// 4-byte fixed prefix of every S4E5 nonce. A recognizable constant
/// so debugging dumps can tell "this is a chunked nonce" from the
/// first 4 bytes (a random S4E1/E2 nonce is unlikely to start with
/// exactly these bytes).
const S4E5_NONCE_TAG: [u8; 4] = [b'E', b'5', 0, 0];

/// 1-byte fixed prefix of every S4E6 nonce. Relative to the S4E5
/// nonce (4-byte tag + 4-byte salt + 4-byte chunk index), S4E6 gives
/// up 3 of the 4 tag bytes and 1 of the 4 chunk-index bytes (32-bit
/// → 24-bit) to make room for 4 extra salt bytes (4 → 8). The
/// remaining `b'E'` keeps debug dumps recognizable as "chunked
/// SSE-S4 nonce".
const S4E6_NONCE_PREFIX: u8 = b'E';
1413
1414/// Variant tag for the chunked-frame helpers. Selects the nonce +
1415/// AAD derivation (and incidentally the salt width). The
1416/// chunk-array layout is byte-identical for both — only the header
1417/// size and the nonce/AAD derivation differ.
1418#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1419enum ChunkedVariant {
1420    V5,
1421    V6,
1422}
1423
1424impl ChunkedVariant {
1425    fn header_bytes(self) -> usize {
1426        match self {
1427            ChunkedVariant::V5 => S4E5_HEADER_BYTES,
1428            ChunkedVariant::V6 => S4E6_HEADER_BYTES,
1429        }
1430    }
1431}
1432
1433/// Build the per-chunk AAD for an S4E5 chunk. Includes magic + algo
1434/// plus the structural chunk_index/total_chunks (so chunk reordering
1435/// fails auth) plus key_id + salt (so header tampering — flipping
1436/// key_id or salt — also fails auth).
1437fn aad_v5(
1438    chunk_index: u32,
1439    total_chunks: u32,
1440    key_id: u16,
1441    salt: &[u8; 4],
1442) -> [u8; 4 + 1 + 4 + 4 + 2 + 4] {
1443    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 4]; // = 19
1444    aad[..4].copy_from_slice(SSE_MAGIC_V5);
1445    aad[4] = ALGO_AES_256_GCM;
1446    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1447    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1448    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1449    aad[15..19].copy_from_slice(salt);
1450    aad
1451}
1452
1453/// v0.8.1 #57: per-chunk AAD for S4E6. Same structural fields as
1454/// [`aad_v5`] (magic + algo + chunk_index + total + key_id + salt)
1455/// but with the wider 8-byte salt and the new `b"S4E6"` magic, so
1456/// an attacker can't strip the version tag and replay an S4E5
1457/// nonce/tag against an S4E6 frame.
1458fn aad_v6(
1459    chunk_index: u32,
1460    total_chunks: u32,
1461    key_id: u16,
1462    salt: &[u8; 8],
1463) -> [u8; 4 + 1 + 4 + 4 + 2 + 8] {
1464    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 8]; // = 23
1465    aad[..4].copy_from_slice(SSE_MAGIC_V6);
1466    aad[4] = ALGO_AES_256_GCM;
1467    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1468    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1469    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1470    aad[15..23].copy_from_slice(salt);
1471    aad
1472}
1473
1474/// Derive the 12-byte AES-GCM nonce for chunk `chunk_index` from the
1475/// per-PUT `salt`. Pure function; no RNG state — the same `(salt,
1476/// chunk_index)` always yields the same nonce, which is the whole
1477/// point: GET reads `salt` from the header and walks the chunks
1478/// without storing 12 bytes of nonce per chunk.
1479fn nonce_v5(salt: &[u8; 4], chunk_index: u32) -> [u8; NONCE_LEN] {
1480    let mut n = [0u8; NONCE_LEN];
1481    n[..4].copy_from_slice(&S4E5_NONCE_TAG);
1482    n[4..8].copy_from_slice(salt);
1483    n[8..12].copy_from_slice(&chunk_index.to_be_bytes());
1484    n
1485}
1486
1487/// v0.8.1 #57: derive the 12-byte AES-GCM nonce for an S4E6 chunk:
1488/// `b'E'(1) || salt(8) || chunk_index_BE_u24(3)`. The 24-bit
1489/// chunk_index caps `chunk_count` at 16,777,215 — see
1490/// [`S4E6_MAX_CHUNK_COUNT`]. The pre-encrypt path enforces this cap
1491/// and surfaces [`SseError::ChunkCountTooLarge`], so this function
1492/// only ever sees `chunk_index <= 0xFF_FFFF` (the leading byte of
1493/// the BE u32 is dropped).
1494fn nonce_v6(salt: &[u8; 8], chunk_index: u32) -> [u8; NONCE_LEN] {
1495    debug_assert!(
1496        chunk_index <= S4E6_MAX_CHUNK_COUNT,
1497        "S4E6 chunk_index {chunk_index} exceeds 24-bit cap (caller MUST validate)",
1498    );
1499    let mut n = [0u8; NONCE_LEN];
1500    n[0] = S4E6_NONCE_PREFIX;
1501    n[1..9].copy_from_slice(salt);
1502    let be = chunk_index.to_be_bytes(); // [b3, b2, b1, b0] of u32
1503    // Take the low 3 bytes (b2, b1, b0) — the high byte is 0 by the
1504    // S4E6_MAX_CHUNK_COUNT cap above.
1505    n[9..12].copy_from_slice(&be[1..4]);
1506    n
1507}
1508
1509/// v0.8 #52 / v0.8.1 #57: encrypt `plaintext` under `keyring`'s
1510/// active key, sliced into independently-sealed AES-GCM chunks of
1511/// `chunk_size` plaintext bytes each. Returns the on-the-wire
1512/// **S4E6** frame (v0.8.1 #57 widened the per-PUT salt 4 B → 8 B;
1513/// the S4E5 emit path was retired but the [`decrypt`] /
1514/// [`decrypt_chunked_stream`] paths still read S4E5 objects for
1515/// back-compat).
1516///
1517/// Errors:
1518/// - [`SseError::ChunkSizeInvalid`] if `chunk_size == 0`.
1519/// - [`SseError::ChunkCountTooLarge`] if
1520///   `ceil(plaintext.len() / chunk_size) > 16_777_215` (the S4E6
1521///   24-bit chunk_index cap; pick a larger `--sse-chunk-size`).
1522///
1523/// Empty plaintext is permitted and produces a frame with
1524/// `chunk_count = 1, ciphertext_len = 0` (one all-tag chunk). That
1525/// keeps the GET chunk-walk loop simpler — it never has to
1526/// special-case zero chunks.
1527///
1528/// `chunk_size` is the *plaintext* bytes per chunk; the on-disk
1529/// ciphertext per chunk is the same number (AES-GCM is CTR-mode),
1530/// plus the 16-byte tag prepended.
1531pub fn encrypt_v2_chunked(
1532    plaintext: &[u8],
1533    keyring: &SseKeyring,
1534    chunk_size: usize,
1535) -> Result<Bytes, SseError> {
1536    if chunk_size == 0 {
1537        return Err(SseError::ChunkSizeInvalid);
1538    }
1539    let (key_id, key) = keyring.active();
1540    let cipher = Aes256Gcm::new(key.as_aes_key());
1541    let mut salt = [0u8; 8];
1542    rand::rngs::OsRng.fill_bytes(&mut salt);
1543
1544    // Always emit at least one chunk (so an empty plaintext still
1545    // has a well-defined header → chunk_count >= 1 invariant).
1546    let chunk_count_usize = if plaintext.is_empty() {
1547        1
1548    } else {
1549        plaintext.len().div_ceil(chunk_size)
1550    };
1551    // Saturating-cast to u32 so we report ChunkCountTooLarge cleanly
1552    // for inputs that would overflow u32 too (would need a > 16 EiB
1553    // plaintext at chunk_size = 1 — astronomical, but defensive).
1554    let chunk_count: u32 = u32::try_from(chunk_count_usize).unwrap_or(u32::MAX);
1555    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1556        return Err(SseError::ChunkCountTooLarge {
1557            got: chunk_count,
1558            max: S4E6_MAX_CHUNK_COUNT,
1559        });
1560    }
1561
1562    let mut out = Vec::with_capacity(
1563        S4E6_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E6_PER_CHUNK_OVERHEAD),
1564    );
1565    out.extend_from_slice(SSE_MAGIC_V6);
1566    out.push(ALGO_AES_256_GCM);
1567    out.extend_from_slice(&key_id.to_be_bytes());
1568    out.push(0u8); // reserved
1569    out.extend_from_slice(&(chunk_size as u32).to_be_bytes());
1570    out.extend_from_slice(&chunk_count.to_be_bytes());
1571    out.extend_from_slice(&salt);
1572
1573    for i in 0..chunk_count {
1574        let off = (i as usize).saturating_mul(chunk_size);
1575        let end = off.saturating_add(chunk_size).min(plaintext.len());
1576        let chunk_pt: &[u8] = if off >= plaintext.len() {
1577            // Empty-plaintext / past-end (only the single-chunk
1578            // empty-plaintext case lands here).
1579            &[]
1580        } else {
1581            &plaintext[off..end]
1582        };
1583        let nonce_bytes = nonce_v6(&salt, i);
1584        let nonce = Nonce::from_slice(&nonce_bytes);
1585        let aad = aad_v6(i, chunk_count, key_id, &salt);
1586        let ct_with_tag = cipher
1587            .encrypt(
1588                nonce,
1589                Payload {
1590                    msg: chunk_pt,
1591                    aad: &aad,
1592                },
1593            )
1594            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
1595        debug_assert!(ct_with_tag.len() >= TAG_LEN);
1596        let split = ct_with_tag.len() - TAG_LEN;
1597        let (ct, tag) = ct_with_tag.split_at(split);
1598        out.extend_from_slice(tag);
1599        out.extend_from_slice(ct);
1600        crate::metrics::record_sse_streaming_chunk("encrypt");
1601    }
1602    Ok(Bytes::from(out))
1603}
1604
/// Per-PUT salt of a chunked frame, stored inline at its native
/// width so both formats can flow through the same chunk-walking
/// code without allocating.
#[derive(Clone, Copy, Debug)]
enum ChunkedSalt {
    /// 4-byte salt of an S4E5 frame.
    V5([u8; 4]),
    /// 8-byte salt of an S4E6 frame.
    V6([u8; 8]),
}
1613
1614/// Parsed S4E5 / S4E6 header — fixed-layout fields. Used by the
1615/// buffered ([`decrypt_chunked_buffered`]) and streaming
1616/// ([`decrypt_chunked_stream`]) paths to share frame validation
1617/// across both variants.
1618#[derive(Debug, Clone, Copy)]
1619struct ChunkedHeader {
1620    /// Used only by tests today (asserts on which frame variant
1621    /// parsed); in production the variant is implicit in
1622    /// `salt`'s ChunkedSalt arm. Kept as a field rather than
1623    /// re-deriving from `salt` so the parser writes one source of
1624    /// truth.
1625    #[allow(dead_code)]
1626    variant: ChunkedVariant,
1627    key_id: u16,
1628    chunk_size: u32,
1629    chunk_count: u32,
1630    salt: ChunkedSalt,
1631    /// Byte offset where the chunk array starts (always
1632    /// `variant.header_bytes()`; carried in the struct so call sites
1633    /// don't have to re-derive it from the variant tag).
1634    chunks_offset: usize,
1635}
1636
/// Borrowed, read-only view of the fixed header of an S4E6 frame,
/// as returned by [`parse_s4e6_header`].
///
/// Intended for tooling that wants to inspect a frame (for example
/// to list which `key_id` an object was sealed under) without
/// redoing the offset arithmetic. Nothing is copied out of the
/// frame: `salt` is a reference into the caller's buffer.
#[derive(Clone, Copy, Debug)]
pub struct S4E6Header<'a> {
    /// Keyring slot recorded in the header.
    pub key_id: u16,
    /// Declared plaintext bytes per chunk.
    pub chunk_size: u32,
    /// Declared number of chunks.
    pub chunk_count: u32,
    /// The 8-byte per-PUT salt, borrowed from the parsed buffer.
    pub salt: &'a [u8; 8],
}
1650
1651/// Pure byte-shuffle parser for an S4E6 fixed header (24 bytes). No
1652/// crypto, no keyring lookup. Errors on truncation, wrong magic,
1653/// unsupported algo, or zero `chunk_size` / `chunk_count`.
1654pub fn parse_s4e6_header(blob: &[u8]) -> Result<S4E6Header<'_>, SseError> {
1655    if blob.len() < S4E6_HEADER_BYTES {
1656        return Err(SseError::ChunkFrameTruncated { what: "header" });
1657    }
1658    if &blob[..4] != SSE_MAGIC_V6 {
1659        let mut got = [0u8; 4];
1660        got.copy_from_slice(&blob[..4]);
1661        return Err(SseError::BadMagic { got });
1662    }
1663    let algo = blob[4];
1664    if algo != ALGO_AES_256_GCM {
1665        return Err(SseError::UnsupportedAlgo { tag: algo });
1666    }
1667    let key_id = u16::from_be_bytes([blob[5], blob[6]]);
1668    // blob[7] = reserved (0; authenticated as 0 via AAD).
1669    let chunk_size = u32::from_be_bytes([blob[8], blob[9], blob[10], blob[11]]);
1670    let chunk_count = u32::from_be_bytes([blob[12], blob[13], blob[14], blob[15]]);
1671    if chunk_size == 0 {
1672        return Err(SseError::ChunkSizeInvalid);
1673    }
1674    if chunk_count == 0 {
1675        return Err(SseError::ChunkFrameTruncated {
1676            what: "chunk_count == 0",
1677        });
1678    }
1679    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1680        return Err(SseError::ChunkCountTooLarge {
1681            got: chunk_count,
1682            max: S4E6_MAX_CHUNK_COUNT,
1683        });
1684    }
1685    let salt: &[u8; 8] = (&blob[16..24]).try_into().expect("8B salt slice");
1686    Ok(S4E6Header {
1687        key_id,
1688        chunk_size,
1689        chunk_count,
1690        salt,
1691    })
1692}
1693
1694fn parse_chunked_header(
1695    body: &[u8],
1696    max_body_bytes: usize,
1697) -> Result<ChunkedHeader, SseError> {
1698    if body.len() < 4 {
1699        return Err(SseError::ChunkFrameTruncated { what: "magic" });
1700    }
1701    let magic = &body[..4];
1702    let variant = if magic == SSE_MAGIC_V5 {
1703        ChunkedVariant::V5
1704    } else if magic == SSE_MAGIC_V6 {
1705        ChunkedVariant::V6
1706    } else {
1707        let mut got = [0u8; 4];
1708        got.copy_from_slice(magic);
1709        return Err(SseError::BadMagic { got });
1710    };
1711    let header_bytes = variant.header_bytes();
1712    if body.len() < header_bytes {
1713        return Err(SseError::ChunkFrameTruncated { what: "header" });
1714    }
1715    let algo = body[4];
1716    if algo != ALGO_AES_256_GCM {
1717        return Err(SseError::UnsupportedAlgo { tag: algo });
1718    }
1719    let key_id = u16::from_be_bytes([body[5], body[6]]);
1720    // body[7] = reserved (must be 0; authenticated as 0 via AAD).
1721    let chunk_size = u32::from_be_bytes([body[8], body[9], body[10], body[11]]);
1722    let chunk_count = u32::from_be_bytes([body[12], body[13], body[14], body[15]]);
1723    if chunk_size == 0 {
1724        return Err(SseError::ChunkSizeInvalid);
1725    }
1726    if chunk_count == 0 {
1727        return Err(SseError::ChunkFrameTruncated {
1728            what: "chunk_count == 0",
1729        });
1730    }
1731    let salt = match variant {
1732        ChunkedVariant::V5 => {
1733            let mut s = [0u8; 4];
1734            s.copy_from_slice(&body[16..20]);
1735            ChunkedSalt::V5(s)
1736        }
1737        ChunkedVariant::V6 => {
1738            // v0.8.1 #57 sanity check: the encoder enforces this cap,
1739            // but a tampered / malicious frame could declare a huge
1740            // chunk_count that would loop the walker 16M+ times if
1741            // we trusted it. Reject early.
1742            if chunk_count > S4E6_MAX_CHUNK_COUNT {
1743                return Err(SseError::ChunkCountTooLarge {
1744                    got: chunk_count,
1745                    max: S4E6_MAX_CHUNK_COUNT,
1746                });
1747            }
1748            let mut s = [0u8; 8];
1749            s.copy_from_slice(&body[16..24]);
1750            ChunkedSalt::V6(s)
1751        }
1752    };
1753
1754    // v0.8.2 #64 — DoS guard. Pre-validate the declared
1755    // (chunk_size × chunk_count) plaintext size in u64 before *any*
1756    // downstream allocation or chunk-walk loop. Rejects:
1757    //   1. arithmetic that would overflow u64 (e.g. chunk_size =
1758    //      u32::MAX, chunk_count = u32::MAX → 2^64 ≫ u64::MAX),
1759    //   2. a body that is *larger* than the maximum a header with
1760    //      these declared sizes could plausibly carry (i.e. trailing
1761    //      bytes after the final chunk → corruption / append attack),
1762    //      and
1763    //   3. a plaintext size larger than the caller's configured
1764    //      `max_body_bytes` (default 5 GiB).
1765    //
1766    // We do NOT require the body to be `>= max_total` here — the
1767    // final chunk may legitimately hold fewer than `chunk_size`
1768    // plaintext bytes (encoder uses
1769    // `chunk_count = ceil(plaintext.len() / chunk_size)`, so the last
1770    // chunk holds the remainder). The walker handles that
1771    // variable-length tail; truncation of the final chunk's tag /
1772    // ciphertext surfaces as `ChunkFrameTruncated` once the walker
1773    // gets there. The pre-alloc guard's job is the *upper-bound*
1774    // checks: kill obviously-impossible shapes before we
1775    // `Vec::with_capacity(chunk_size × chunk_count)`.
1776    //
1777    // All paths return concrete error variants; nothing panics on
1778    // adversarial input. The error strings carry only constant
1779    // operator-readable labels (no echoed numbers) so the response is
1780    // not a tampering oracle for the attacker.
1781    let chunk_size_u64 = chunk_size as u64;
1782    let chunk_count_u64 = chunk_count as u64;
1783    let expected_plain_size = chunk_size_u64
1784        .checked_mul(chunk_count_u64)
1785        .ok_or(SseError::ChunkFrameTooLarge {
1786            details: "chunk_size * chunk_count overflows u64",
1787        })?;
1788    let per_chunk_overhead = S4E5_PER_CHUNK_OVERHEAD as u64; // = 16 (AES-GCM tag)
1789    let total_tag_overhead = per_chunk_overhead.checked_mul(chunk_count_u64).ok_or(
1790        SseError::ChunkFrameTooLarge {
1791            details: "tag_len * chunk_count overflows u64",
1792        },
1793    )?;
1794    let max_total = expected_plain_size
1795        .checked_add(total_tag_overhead)
1796        .and_then(|t| t.checked_add(header_bytes as u64))
1797        .ok_or(SseError::ChunkFrameTooLarge {
1798            details: "header + plaintext + tag overhead overflows u64",
1799        })?;
1800    // Body cannot be *larger* than what the declared geometry could
1801    // legitimately produce. Any extra bytes past the final chunk are
1802    // either trailing junk (corruption) or an append attack — kill
1803    // them here so the walker doesn't even start verifying chunks
1804    // for an obviously-malformed body. (The walker's own end-of-loop
1805    // `cursor != body.len()` check also catches this; the pre-alloc
1806    // guard saves the chunk-walk worth of AES-GCM verifies in the
1807    // hostile case.)
1808    if (body.len() as u64) > max_total {
1809        return Err(SseError::ChunkFrameTruncated {
1810            what: "trailing bytes past declared chunk geometry",
1811        });
1812    }
1813    // The actual DoS guard: cap on declared plaintext. If the header
1814    // claims more than the operator's configured single-object
1815    // ceiling, refuse before the buffered path's
1816    // `Vec::with_capacity(chunk_size * chunk_count)` runs.
1817    if expected_plain_size > max_body_bytes as u64 {
1818        return Err(SseError::ChunkFrameTooLarge {
1819            details: "declared plaintext exceeds gateway max_body_bytes",
1820        });
1821    }
1822
1823    Ok(ChunkedHeader {
1824        variant,
1825        key_id,
1826        chunk_size,
1827        chunk_count,
1828        salt,
1829        chunks_offset: header_bytes,
1830    })
1831}
1832
1833/// Decrypt one chunk under either the S4E5 or S4E6 derivation. Used
1834/// by both the buffered and streaming paths so AAD / nonce
1835/// derivation lives in exactly one place.
1836fn decrypt_chunked_chunk(
1837    cipher: &Aes256Gcm,
1838    chunk_index: u32,
1839    chunk_count: u32,
1840    key_id: u16,
1841    salt: &ChunkedSalt,
1842    tag: &[u8; TAG_LEN],
1843    ct: &[u8],
1844) -> Result<Bytes, SseError> {
1845    let nonce_bytes = match salt {
1846        ChunkedSalt::V5(s) => nonce_v5(s, chunk_index),
1847        ChunkedSalt::V6(s) => nonce_v6(s, chunk_index),
1848    };
1849    let nonce = Nonce::from_slice(&nonce_bytes);
1850    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1851    ct_with_tag.extend_from_slice(ct);
1852    ct_with_tag.extend_from_slice(tag);
1853    let result = match salt {
1854        ChunkedSalt::V5(s) => {
1855            let aad = aad_v5(chunk_index, chunk_count, key_id, s);
1856            cipher.decrypt(
1857                nonce,
1858                Payload {
1859                    msg: &ct_with_tag,
1860                    aad: &aad,
1861                },
1862            )
1863        }
1864        ChunkedSalt::V6(s) => {
1865            let aad = aad_v6(chunk_index, chunk_count, key_id, s);
1866            cipher.decrypt(
1867                nonce,
1868                Payload {
1869                    msg: &ct_with_tag,
1870                    aad: &aad,
1871                },
1872            )
1873        }
1874    };
1875    result
1876        .map(Bytes::from)
1877        .map_err(|_| SseError::ChunkAuthFailed { chunk_index })
1878}
1879
1880/// Walk an S4E5 / S4E6 body chunk-by-chunk, calling `emit` on each
1881/// successfully-verified plaintext chunk. Returns immediately on the
1882/// first chunk that fails auth or is truncated. Shared core between
1883/// the buffered ([`decrypt_chunked_buffered`]) and streaming
1884/// ([`decrypt_chunked_stream`]) paths.
1885fn walk_chunked<F: FnMut(Bytes) -> Result<(), SseError>>(
1886    body: &[u8],
1887    keyring: &SseKeyring,
1888    max_body_bytes: usize,
1889    mut emit: F,
1890) -> Result<(), SseError> {
1891    let hdr = parse_chunked_header(body, max_body_bytes)?;
1892    let key = keyring
1893        .get(hdr.key_id)
1894        .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
1895    let cipher = Aes256Gcm::new(key.as_aes_key());
1896
1897    let mut cursor = hdr.chunks_offset;
1898    let chunk_size = hdr.chunk_size as usize;
1899    for i in 0..hdr.chunk_count {
1900        if cursor + TAG_LEN > body.len() {
1901            return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
1902        }
1903        let tag_off = cursor;
1904        let ct_off = tag_off + TAG_LEN;
1905        let is_last = i + 1 == hdr.chunk_count;
1906        let ct_len = if is_last {
1907            if ct_off > body.len() {
1908                return Err(SseError::ChunkFrameTruncated {
1909                    what: "final chunk ciphertext",
1910                });
1911            }
1912            let remaining = body.len() - ct_off;
1913            if remaining > chunk_size {
1914                return Err(SseError::ChunkFrameTruncated {
1915                    what: "trailing bytes after final chunk",
1916                });
1917            }
1918            remaining
1919        } else {
1920            chunk_size
1921        };
1922        let ct_end = ct_off + ct_len;
1923        if ct_end > body.len() {
1924            return Err(SseError::ChunkFrameTruncated {
1925                what: "chunk ciphertext",
1926            });
1927        }
1928        let mut tag = [0u8; TAG_LEN];
1929        tag.copy_from_slice(&body[tag_off..ct_off]);
1930        let ct = &body[ct_off..ct_end];
1931        let plain = decrypt_chunked_chunk(
1932            &cipher,
1933            i,
1934            hdr.chunk_count,
1935            hdr.key_id,
1936            &hdr.salt,
1937            &tag,
1938            ct,
1939        )?;
1940        crate::metrics::record_sse_streaming_chunk("decrypt");
1941        emit(plain)?;
1942        cursor = ct_end;
1943    }
1944    if cursor != body.len() {
1945        return Err(SseError::ChunkFrameTruncated {
1946            what: "trailing bytes after declared chunk_count",
1947        });
1948    }
1949    Ok(())
1950}
1951
1952/// Sync back-compat path: decrypt every chunk and concatenate into
1953/// a single `Bytes`. Memory peak = full plaintext (defeats the
1954/// point of S4E5/S4E6 streaming, but useful for callers that already
1955/// need the whole body — e.g. server-side restream-rewrite paths or
1956/// unit tests). Accepts both S4E5 (legacy) and S4E6 (current) bodies.
1957///
1958/// `max_body_bytes` (v0.8.2 #64) caps the declared plaintext size in
1959/// the header **before any allocation**. A malicious / corrupted
1960/// frame that claims a multi-PB plaintext is rejected as
1961/// [`SseError::ChunkFrameTooLarge`] with no `Vec::with_capacity` call
1962/// behind it. Callers without a bespoke cap should use
1963/// [`decrypt_chunked_buffered_default`] (5 GiB cap).
1964pub fn decrypt_chunked_buffered(
1965    body: &[u8],
1966    keyring: &SseKeyring,
1967    max_body_bytes: usize,
1968) -> Result<Bytes, SseError> {
1969    let hdr = parse_chunked_header(body, max_body_bytes)?;
1970    // Header is validated above: chunk_size * chunk_count fits u64
1971    // and is ≤ max_body_bytes (≤ 5 GiB on this gateway), so the
1972    // `as usize` casts cannot truncate on a 64-bit host. The
1973    // `Vec::with_capacity` is therefore bounded by the same cap that
1974    // protects the rest of the gateway from oversized PUTs / GETs.
1975    let mut out = Vec::with_capacity(hdr.chunk_size as usize * hdr.chunk_count as usize);
1976    walk_chunked(body, keyring, max_body_bytes, |chunk| {
1977        out.extend_from_slice(&chunk);
1978        Ok(())
1979    })?;
1980    Ok(Bytes::from(out))
1981}
1982
1983/// Back-compat wrapper around [`decrypt_chunked_buffered`] that uses
1984/// [`DEFAULT_MAX_BODY_BYTES`] (5 GiB — AWS S3's single-object PUT
1985/// ceiling). Provided so the existing `decrypt(body, keyring)`
1986/// dispatcher and call sites that don't have a bespoke cap can keep
1987/// their signature unchanged after the v0.8.2 #64 pre-allocation
1988/// guard landed.
1989pub fn decrypt_chunked_buffered_default(
1990    body: &[u8],
1991    keyring: &SseKeyring,
1992) -> Result<Bytes, SseError> {
1993    decrypt_chunked_buffered(body, keyring, DEFAULT_MAX_BODY_BYTES)
1994}
1995
1996/// v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): stream-decrypt API for
1997/// chunked SSE-S4 bodies. Returns a [`futures::Stream`] that yields
1998/// one `Bytes` per chunk in order. Each chunk is emitted only after
1999/// AES-GCM tag verify succeeds, so the client never sees plaintext
2000/// bytes that haven't been authenticated. A failing chunk yields
2001/// its [`SseError::ChunkAuthFailed`] (with the chunk index) and ends
2002/// the stream — earlier chunks may already have left the gateway,
2003/// which matches the standard streaming-AEAD trade-off (operators
2004/// MUST alert on the audit log + metric, not rely on connection
2005/// close to guarantee atomicity).
2006///
2007/// Accepts either S4E5 (v0.8 #52, legacy) or S4E6 (v0.8.1 #57,
2008/// current) magic. Non-chunked magic surfaces as
2009/// [`SseError::BadMagic`] / [`SseError::ChunkFrameTruncated`] on
2010/// the first poll — the stream is "fail-fast" rather than "fall
2011/// through to S4E2 buffered decrypt", because the caller has
2012/// already dispatched on [`peek_magic`] by the time it hands a body
2013/// to this function.
2014///
2015/// `body` is owned by the returned stream so the caller doesn't
2016/// need to keep the bytes alive separately. The returned stream is
2017/// `'static` — the `keyring` borrow is consumed up front to extract
2018/// the per-frame key and build the AES cipher (which owns its key
2019/// material), so the caller's keyring may be dropped immediately.
2020pub fn decrypt_chunked_stream(
2021    body: bytes::Bytes,
2022    keyring: &SseKeyring,
2023) -> impl futures::Stream<Item = Result<Bytes, SseError>> + 'static {
2024    use futures::stream::{self, StreamExt};
2025
2026    // Cheap pre-validation: parse the header + look up the key
2027    // once, up front, so a malformed frame surfaces on the first
2028    // poll instead of being deferred behind the first-chunk loop.
2029    // The `keyring` borrow ends here — we extract the AES key into
2030    // the owned `Aes256Gcm` cipher, then store that in the stream
2031    // state.
2032    let prelude = (|| {
2033        // v0.8.2 #64: streaming path is naturally memory-bounded
2034        // (one chunk-worth of plaintext at a time), so we don't cap
2035        // the *declared* total here — the per-chunk loop already
2036        // bounds memory by `chunk_size`. The truncation /
2037        // arithmetic-overflow checks inside `parse_chunked_header`
2038        // still run regardless of `max_body_bytes`, which is what
2039        // protects the streaming path from the same DoS class as the
2040        // buffered path (the overflow / declared-vs-actual length
2041        // mismatches are independent of the gateway cap).
2042        let hdr = parse_chunked_header(&body, usize::MAX)?;
2043        let key = keyring
2044            .get(hdr.key_id)
2045            .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
2046        let cipher = Aes256Gcm::new(key.as_aes_key());
2047        Ok::<_, SseError>((hdr, cipher))
2048    })();
2049
2050    match prelude {
2051        Err(e) => stream::iter(std::iter::once(Err(e))).left_stream(),
2052        Ok((hdr, cipher)) => {
2053            let chunks_offset = hdr.chunks_offset;
2054            let state = ChunkedDecryptState {
2055                body,
2056                cipher,
2057                hdr,
2058                cursor: chunks_offset,
2059                next_index: 0,
2060            };
2061            stream::try_unfold(state, decrypt_next_chunk).right_stream()
2062        }
2063    }
2064}
2065
2066/// Per-stream state for [`decrypt_chunked_stream`]. Holds the owned
2067/// `body` (so the stream stays self-contained), the prepared
2068/// cipher, and the cursor position into the chunk array.
2069struct ChunkedDecryptState {
2070    body: bytes::Bytes,
2071    cipher: Aes256Gcm,
2072    hdr: ChunkedHeader,
2073    cursor: usize,
2074    next_index: u32,
2075}
2076
2077async fn decrypt_next_chunk(
2078    mut state: ChunkedDecryptState,
2079) -> Result<Option<(Bytes, ChunkedDecryptState)>, SseError> {
2080    if state.next_index >= state.hdr.chunk_count {
2081        // Final boundary check — anything past the declared
2082        // chunk_count would be a truncation / append attack.
2083        if state.cursor != state.body.len() {
2084            return Err(SseError::ChunkFrameTruncated {
2085                what: "trailing bytes after declared chunk_count",
2086            });
2087        }
2088        return Ok(None);
2089    }
2090    let i = state.next_index;
2091    let chunk_size = state.hdr.chunk_size as usize;
2092    if state.cursor + TAG_LEN > state.body.len() {
2093        return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
2094    }
2095    let tag_off = state.cursor;
2096    let ct_off = tag_off + TAG_LEN;
2097    let is_last = i + 1 == state.hdr.chunk_count;
2098    let ct_len = if is_last {
2099        if ct_off > state.body.len() {
2100            return Err(SseError::ChunkFrameTruncated {
2101                what: "final chunk ciphertext",
2102            });
2103        }
2104        let remaining = state.body.len() - ct_off;
2105        if remaining > chunk_size {
2106            return Err(SseError::ChunkFrameTruncated {
2107                what: "trailing bytes after final chunk",
2108            });
2109        }
2110        remaining
2111    } else {
2112        chunk_size
2113    };
2114    let ct_end = ct_off + ct_len;
2115    if ct_end > state.body.len() {
2116        return Err(SseError::ChunkFrameTruncated {
2117            what: "chunk ciphertext",
2118        });
2119    }
2120    let mut tag = [0u8; TAG_LEN];
2121    tag.copy_from_slice(&state.body[tag_off..ct_off]);
2122    let ct = &state.body[ct_off..ct_end];
2123    let plain = decrypt_chunked_chunk(
2124        &state.cipher,
2125        i,
2126        state.hdr.chunk_count,
2127        state.hdr.key_id,
2128        &state.hdr.salt,
2129        &tag,
2130        ct,
2131    )?;
2132    crate::metrics::record_sse_streaming_chunk("decrypt");
2133    state.cursor = ct_end;
2134    state.next_index += 1;
2135    Ok(Some((plain, state)))
2136}
2137
/// v0.8.1 #57: test-only encoder for the legacy S4E5 frame (S4E5
/// magic, 4-byte salt, V5 nonce/AAD derivation).
///
/// Production code no longer emits S4E5; this exists so tests can
/// fabricate a "v0.8.0 vintage" object and check that the current
/// decrypt paths still read it.
///
/// Returns [`SseError::ChunkSizeInvalid`] for `chunk_size == 0` and
/// panics if the chunk count does not fit in a `u32`.
#[cfg(test)]
fn encrypt_v2_chunked_s4e5_for_test(
    plaintext: &[u8],
    keyring: &SseKeyring,
    chunk_size: usize,
) -> Result<Bytes, SseError> {
    if chunk_size == 0 {
        return Err(SseError::ChunkSizeInvalid);
    }
    let (key_id, key) = keyring.active();
    let cipher = Aes256Gcm::new(key.as_aes_key());
    let mut salt = [0u8; 4];
    rand::rngs::OsRng.fill_bytes(&mut salt);

    // An empty plaintext still gets one (empty) chunk.
    let chunk_count: u32 = match plaintext.len() {
        0 => 1,
        len => u32::try_from(len.div_ceil(chunk_size)).expect("chunk_count overflows u32"),
    };

    let frame_len =
        S4E5_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E5_PER_CHUNK_OVERHEAD);
    let mut frame = Vec::with_capacity(frame_len);
    frame.extend_from_slice(SSE_MAGIC_V5);
    frame.push(ALGO_AES_256_GCM);
    frame.extend_from_slice(&key_id.to_be_bytes());
    frame.push(0u8); // reserved
    frame.extend_from_slice(&(chunk_size as u32).to_be_bytes());
    frame.extend_from_slice(&chunk_count.to_be_bytes());
    frame.extend_from_slice(&salt);

    // `chunks` yields ceil(len / chunk_size) slices, or none for an
    // empty plaintext — in which case the single chunk is empty.
    let mut pieces = plaintext.chunks(chunk_size);
    for index in 0..chunk_count {
        let chunk_pt: &[u8] = pieces.next().unwrap_or(&[]);
        let nonce_bytes = nonce_v5(&salt, index);
        let aad = aad_v5(index, chunk_count, key_id, &salt);
        let sealed = cipher
            .encrypt(
                Nonce::from_slice(&nonce_bytes),
                Payload {
                    msg: chunk_pt,
                    aad: &aad,
                },
            )
            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
        // Cipher output is `ciphertext || tag`; the frame wants
        // `tag || ciphertext`.
        let (ct, tag) = sealed.split_at(sealed.len() - TAG_LEN);
        frame.extend_from_slice(tag);
        frame.extend_from_slice(ct);
    }
    Ok(Bytes::from(frame))
}
2205
2206#[cfg(test)]
2207mod tests {
2208    use super::*;
2209
2210    fn key32(seed: u8) -> Arc<SseKey> {
2211        Arc::new(SseKey::from_bytes(&[seed; 32]).unwrap())
2212    }
2213
2214    fn keyring_single(seed: u8) -> SseKeyring {
2215        SseKeyring::new(1, key32(seed))
2216    }
2217
2218    #[test]
2219    fn roundtrip_basic_v1() {
2220        // back-compat single-key API — still works.
2221        let k = SseKey::from_bytes(&[7u8; 32]).unwrap();
2222        let pt = b"the quick brown fox jumps over the lazy dog";
2223        let ct = encrypt(&k, pt);
2224        assert!(looks_encrypted(&ct));
2225        assert_eq!(&ct[..4], SSE_MAGIC_V1);
2226        assert_eq!(ct[4], ALGO_AES_256_GCM);
2227        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2228        // decrypt via single-key keyring
2229        let kr = SseKeyring::new(1, Arc::new(k));
2230        let pt2 = decrypt(&ct, &kr).unwrap();
2231        assert_eq!(pt2.as_ref(), pt);
2232    }
2233
2234    #[test]
2235    fn s4e2_roundtrip_active_key() {
2236        let kr = keyring_single(7);
2237        let pt = b"S4E2 active-key roundtrip";
2238        let ct = encrypt_v2(pt, &kr);
2239        assert_eq!(&ct[..4], SSE_MAGIC_V2);
2240        assert_eq!(ct[4], ALGO_AES_256_GCM);
2241        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE");
2242        assert_eq!(ct[7], 0, "reserved byte");
2243        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2244        assert!(looks_encrypted(&ct));
2245        let pt2 = decrypt(&ct, &kr).unwrap();
2246        assert_eq!(pt2.as_ref(), pt);
2247    }
2248
2249    #[test]
2250    fn decrypt_s4e1_via_active_only_keyring() {
2251        // v0.4 wrote S4E1 with key K; v0.5 keyring has K as the only
2252        // (active) key. Decrypt must succeed.
2253        let k_arc = key32(11);
2254        let legacy_ct = encrypt(&k_arc, b"v0.4 vintage object");
2255        assert_eq!(&legacy_ct[..4], SSE_MAGIC_V1);
2256        let kr = SseKeyring::new(1, Arc::clone(&k_arc));
2257        let plain = decrypt(&legacy_ct, &kr).unwrap();
2258        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2259    }
2260
2261    #[test]
2262    fn decrypt_s4e2_under_old_key_after_rotation() {
2263        // Rotation flow: object was encrypted under key id=1 when 1
2264        // was active. Operator rotates to active=2 and keeps 1 in the
2265        // keyring. The S4E2 body must still decrypt.
2266        let k1 = key32(1);
2267        let k2 = key32(2);
2268        let mut kr_old = SseKeyring::new(1, Arc::clone(&k1));
2269        let ct = encrypt_v2(b"old-rotation object", &kr_old);
2270        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2271
2272        // After rotation: active=2, but key 1 still in ring.
2273        kr_old.add(2, Arc::clone(&k2));
2274        let mut kr_new = SseKeyring::new(2, Arc::clone(&k2));
2275        kr_new.add(1, Arc::clone(&k1));
2276
2277        let plain = decrypt(&ct, &kr_new).unwrap();
2278        assert_eq!(plain.as_ref(), b"old-rotation object");
2279
2280        // And new PUTs go to id 2 (active).
2281        let new_ct = encrypt_v2(b"new-rotation object", &kr_new);
2282        assert_eq!(u16::from_be_bytes([new_ct[5], new_ct[6]]), 2);
2283        let plain_new = decrypt(&new_ct, &kr_new).unwrap();
2284        assert_eq!(plain_new.as_ref(), b"new-rotation object");
2285    }
2286
2287    #[test]
2288    fn s4e2_unknown_key_id_errors() {
2289        let kr = keyring_single(3); // only id=1 present
2290        let kr_other = SseKeyring::new(99, key32(3));
2291        let ct = encrypt_v2(b"x", &kr_other); // body claims key_id=99
2292        let err = decrypt(&ct, &kr).unwrap_err();
2293        assert!(
2294            matches!(err, SseError::KeyNotInKeyring { id: 99 }),
2295            "got {err:?}"
2296        );
2297    }
2298
2299    #[test]
2300    fn s4e2_tampered_key_id_fails_auth() {
2301        let kr = SseKeyring::new(1, key32(4));
2302        let mut kr_with_2 = kr.clone();
2303        kr_with_2.add(2, key32(5)); // a real but wrong key under id=2
2304        let mut ct = encrypt_v2(b"do not flip my key id", &kr).to_vec();
2305        // Flip key_id from 1 → 2 in the header. The keyring HAS a key
2306        // for 2, so the lookup succeeds — but AAD authenticates the
2307        // original key_id, so AES-GCM tag verification must fail.
2308        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2309        ct[5] = 0;
2310        ct[6] = 2;
2311        let err = decrypt(&ct, &kr_with_2).unwrap_err();
2312        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2313    }
2314
2315    #[test]
2316    fn s4e2_tampered_ciphertext_fails() {
2317        let kr = SseKeyring::new(7, key32(9));
2318        let mut ct = encrypt_v2(b"secret message v2", &kr).to_vec();
2319        let last = ct.len() - 1;
2320        ct[last] ^= 0x01;
2321        let err = decrypt(&ct, &kr).unwrap_err();
2322        assert!(matches!(err, SseError::DecryptFailed));
2323    }
2324
2325    #[test]
2326    fn s4e2_tampered_algo_byte_fails() {
2327        let kr = SseKeyring::new(1, key32(2));
2328        let mut ct = encrypt_v2(b"hi", &kr).to_vec();
2329        ct[4] = 99;
2330        let err = decrypt(&ct, &kr).unwrap_err();
2331        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2332    }
2333
2334    #[test]
2335    fn wrong_key_fails_v1_via_keyring() {
2336        // S4E1 written under key K1; keyring has only K2 → DecryptFailed.
2337        let k1 = SseKey::from_bytes(&[1u8; 32]).unwrap();
2338        let ct = encrypt(&k1, b"secret");
2339        let kr_wrong = SseKeyring::new(1, Arc::new(SseKey::from_bytes(&[2u8; 32]).unwrap()));
2340        let err = decrypt(&ct, &kr_wrong).unwrap_err();
2341        assert!(matches!(err, SseError::DecryptFailed));
2342    }
2343
2344    #[test]
2345    fn rejects_short_body() {
2346        let kr = SseKeyring::new(1, key32(1));
2347        let err = decrypt(b"short", &kr).unwrap_err();
2348        assert!(matches!(err, SseError::TooShort { got: 5 }));
2349    }
2350
2351    #[test]
2352    fn looks_encrypted_passthrough_returns_false() {
2353        // S4F2 frame magic, NOT S4E1 / S4E2 — must not be confused.
2354        let f2 = b"S4F2\x01\x00\x00\x00........................................";
2355        assert!(!looks_encrypted(f2));
2356        assert!(!looks_encrypted(b""));
2357    }
2358
2359    #[test]
2360    fn looks_encrypted_detects_both_v1_and_v2() {
2361        let kr = SseKeyring::new(1, key32(8));
2362        let v1 = encrypt(&SseKey::from_bytes(&[8u8; 32]).unwrap(), b"x");
2363        let v2 = encrypt_v2(b"x", &kr);
2364        assert!(looks_encrypted(&v1));
2365        assert!(looks_encrypted(&v2));
2366    }
2367
2368    #[test]
2369    fn key_from_hex_string() {
2370        let bad =
2371            SseKey::from_bytes(b"0102030405060708090a0b0c0d0e0f10111213141516171819202122232425")
2372                .unwrap_err();
2373        assert!(matches!(bad, SseError::BadKeyLength { .. }));
2374        let good = b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2375        let _ = SseKey::from_bytes(good).expect("64-char hex should parse");
2376    }
2377
2378    #[test]
2379    fn encrypt_v2_uses_random_nonce() {
2380        let kr = SseKeyring::new(1, key32(3));
2381        let pt = b"deterministic input";
2382        let a = encrypt_v2(pt, &kr);
2383        let b = encrypt_v2(pt, &kr);
2384        assert_ne!(a, b, "nonce must be random per-call");
2385    }
2386
2387    #[test]
2388    fn keyring_active_and_get() {
2389        let k1 = key32(1);
2390        let k2 = key32(2);
2391        let mut kr = SseKeyring::new(1, Arc::clone(&k1));
2392        kr.add(2, Arc::clone(&k2));
2393        let (id, active) = kr.active();
2394        assert_eq!(id, 1);
2395        assert_eq!(active.bytes, [1u8; 32]);
2396        assert!(kr.get(2).is_some());
2397        assert!(kr.get(3).is_none());
2398    }
2399
2400    // -----------------------------------------------------------------
2401    // v0.5 #27 — SSE-C (customer-provided key, S4E3 frame) tests
2402    // -----------------------------------------------------------------
2403
2404    use base64::Engine as _;
2405
2406    fn cust_key(seed: u8) -> CustomerKeyMaterial {
2407        let key = [seed; KEY_LEN];
2408        let key_md5 = compute_key_md5(&key);
2409        CustomerKeyMaterial { key, key_md5 }
2410    }
2411
2412    #[test]
2413    fn s4e3_roundtrip_happy_path() {
2414        let m = cust_key(42);
2415        let pt = b"top-secret SSE-C payload";
2416        let ct = encrypt_with_source(
2417            pt,
2418            SseSource::CustomerKey {
2419                key: &m.key,
2420                key_md5: &m.key_md5,
2421            },
2422        );
2423        // Frame inspection.
2424        assert_eq!(&ct[..4], SSE_MAGIC_V3);
2425        assert_eq!(ct[4], ALGO_AES_256_GCM);
2426        assert_eq!(&ct[5..5 + KEY_MD5_LEN], &m.key_md5);
2427        assert_eq!(ct.len(), SSE_HEADER_BYTES_V3 + pt.len());
2428        assert!(looks_encrypted(&ct));
2429        // Decrypt round-trip.
2430        let plain = decrypt(
2431            &ct,
2432            SseSource::CustomerKey {
2433                key: &m.key,
2434                key_md5: &m.key_md5,
2435            },
2436        )
2437        .unwrap();
2438        assert_eq!(plain.as_ref(), pt);
2439        // And via the From impl on &CustomerKeyMaterial.
2440        let plain2 = decrypt(&ct, &m).unwrap();
2441        assert_eq!(plain2.as_ref(), pt);
2442    }
2443
2444    #[test]
2445    fn s4e3_wrong_key_yields_wrong_customer_key_error() {
2446        let m = cust_key(1);
2447        let other = cust_key(2);
2448        let ct = encrypt_with_source(b"payload", (&m).into());
2449        let err = decrypt(
2450            &ct,
2451            SseSource::CustomerKey {
2452                key: &other.key,
2453                key_md5: &other.key_md5,
2454            },
2455        )
2456        .unwrap_err();
2457        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2458    }
2459
2460    #[test]
2461    fn s4e3_tampered_stored_md5_is_caught() {
2462        // Attacker rewrites the stored MD5 to match a key they know.
2463        // Even though the supplied (attacker) key matches the rewritten
2464        // MD5, AES-GCM authenticates the ORIGINAL md5 via AAD, so the
2465        // tag check fails. Surface: WrongCustomerKey if the supplied
2466        // md5 != stored md5 (this test), or DecryptFailed if attacker
2467        // also rewrites their supplied md5 to match.
2468        let m = cust_key(7);
2469        let mut ct = encrypt_with_source(b"victim payload", (&m).into()).to_vec();
2470        // Flip a byte in the stored fingerprint.
2471        ct[5] ^= 0x55;
2472        // Client supplies the original (unmodified) key + md5.
2473        let err = decrypt(
2474            &ct,
2475            SseSource::CustomerKey {
2476                key: &m.key,
2477                key_md5: &m.key_md5,
2478            },
2479        )
2480        .unwrap_err();
2481        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2482    }
2483
2484    #[test]
2485    fn s4e3_tampered_md5_with_matching_supplied_md5_fails_aead() {
2486        // Both stored md5 AND supplied md5 are flipped to the same bogus
2487        // value. The fingerprint check passes (they match) but AAD
2488        // authenticates the *original* md5, so AES-GCM fails.
2489        let m = cust_key(3);
2490        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2491        ct[5] ^= 0xFF;
2492        let mut bogus_md5 = m.key_md5;
2493        bogus_md5[0] ^= 0xFF;
2494        let err = decrypt(
2495            &ct,
2496            SseSource::CustomerKey {
2497                key: &m.key,
2498                key_md5: &bogus_md5,
2499            },
2500        )
2501        .unwrap_err();
2502        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2503    }
2504
2505    #[test]
2506    fn s4e3_tampered_ciphertext_fails_aead() {
2507        let m = cust_key(8);
2508        let mut ct = encrypt_with_source(b"sealed message", (&m).into()).to_vec();
2509        let last = ct.len() - 1;
2510        ct[last] ^= 0x01;
2511        let err = decrypt(&ct, &m).unwrap_err();
2512        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2513    }
2514
2515    #[test]
2516    fn s4e3_tampered_algo_byte_rejected() {
2517        let m = cust_key(9);
2518        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2519        ct[4] = 99;
2520        let err = decrypt(&ct, &m).unwrap_err();
2521        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2522    }
2523
2524    #[test]
2525    fn s4e3_uses_random_nonce() {
2526        let m = cust_key(10);
2527        let a = encrypt_with_source(b"deterministic input", (&m).into());
2528        let b = encrypt_with_source(b"deterministic input", (&m).into());
2529        assert_ne!(a, b, "nonce must be random per-call");
2530    }
2531
2532    #[test]
2533    fn parse_customer_key_headers_happy_path() {
2534        let key = [11u8; KEY_LEN];
2535        let md5 = compute_key_md5(&key);
2536        let key_b64 = base64::engine::general_purpose::STANDARD.encode(key);
2537        let md5_b64 = base64::engine::general_purpose::STANDARD.encode(md5);
2538        let m = parse_customer_key_headers("AES256", &key_b64, &md5_b64).unwrap();
2539        assert_eq!(m.key, key);
2540        assert_eq!(m.key_md5, md5);
2541    }
2542
2543    #[test]
2544    fn parse_customer_key_headers_rejects_wrong_algorithm() {
2545        let key = [1u8; KEY_LEN];
2546        let md5 = compute_key_md5(&key);
2547        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2548        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2549        let err = parse_customer_key_headers("AES128", &kb, &mb).unwrap_err();
2550        assert!(
2551            matches!(err, SseError::CustomerKeyAlgorithmUnsupported { ref algo } if algo == "AES128"),
2552            "got {err:?}"
2553        );
2554        // Lowercase variant still rejected (AWS S3 accepts only "AES256").
2555        let err2 = parse_customer_key_headers("aes256", &kb, &mb).unwrap_err();
2556        assert!(
2557            matches!(err2, SseError::CustomerKeyAlgorithmUnsupported { .. }),
2558            "got {err2:?}"
2559        );
2560    }
2561
2562    #[test]
2563    fn parse_customer_key_headers_rejects_wrong_key_length() {
2564        let short_key = vec![5u8; 16]; // half-length AES key
2565        let md5 = compute_key_md5(&short_key);
2566        let kb = base64::engine::general_purpose::STANDARD.encode(&short_key);
2567        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2568        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2569        assert!(
2570            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("key length")),
2571            "got {err:?}"
2572        );
2573    }
2574
2575    #[test]
2576    fn parse_customer_key_headers_rejects_wrong_md5_length() {
2577        let key = [3u8; KEY_LEN];
2578        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2579        // Truncated MD5 (15 bytes instead of 16).
2580        let bad_md5 = vec![0u8; 15];
2581        let mb = base64::engine::general_purpose::STANDARD.encode(bad_md5);
2582        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2583        assert!(
2584            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 length")),
2585            "got {err:?}"
2586        );
2587    }
2588
2589    #[test]
2590    fn parse_customer_key_headers_rejects_md5_mismatch() {
2591        let key = [4u8; KEY_LEN];
2592        let other = [5u8; KEY_LEN];
2593        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2594        let wrong_md5 = compute_key_md5(&other);
2595        let mb = base64::engine::general_purpose::STANDARD.encode(wrong_md5);
2596        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2597        assert!(
2598            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 does not match")),
2599            "got {err:?}"
2600        );
2601    }
2602
2603    #[test]
2604    fn parse_customer_key_headers_rejects_bad_base64() {
2605        let valid_key = [0u8; KEY_LEN];
2606        let md5 = compute_key_md5(&valid_key);
2607        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2608        let err = parse_customer_key_headers("AES256", "!!!not-base64!!!", &mb).unwrap_err();
2609        assert!(
2610            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2611            "got {err:?}"
2612        );
2613        // Bad MD5 base64.
2614        let kb = base64::engine::general_purpose::STANDARD.encode(valid_key);
2615        let err2 = parse_customer_key_headers("AES256", &kb, "??not-base64??").unwrap_err();
2616        assert!(
2617            matches!(err2, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2618            "got {err2:?}"
2619        );
2620    }
2621
2622    #[test]
2623    fn parse_customer_key_headers_trims_whitespace() {
2624        // S3 SDKs sometimes pad headers with trailing newlines.
2625        let key = [12u8; KEY_LEN];
2626        let md5 = compute_key_md5(&key);
2627        let kb = format!(
2628            "  {}\n",
2629            base64::engine::general_purpose::STANDARD.encode(key)
2630        );
2631        let mb = format!(
2632            "\t{}  ",
2633            base64::engine::general_purpose::STANDARD.encode(md5)
2634        );
2635        let m = parse_customer_key_headers("AES256", &kb, &mb).unwrap();
2636        assert_eq!(m.key, key);
2637    }
2638
2639    // -----------------------------------------------------------------
2640    // Back-compat + cross-source mixing
2641    // -----------------------------------------------------------------
2642
2643    #[test]
2644    fn back_compat_decrypt_s4e1_with_keyring_source() {
2645        let k = key32(33);
2646        let legacy_ct = encrypt(&k, b"v0.4 vintage object");
2647        let kr = SseKeyring::new(1, Arc::clone(&k));
2648        // Both call styles must work — `&kr` (back-compat) and
2649        // `SseSource::Keyring(&kr)` (explicit).
2650        let plain = decrypt(&legacy_ct, &kr).unwrap();
2651        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2652        let plain2 = decrypt(&legacy_ct, SseSource::Keyring(&kr)).unwrap();
2653        assert_eq!(plain2.as_ref(), b"v0.4 vintage object");
2654    }
2655
2656    #[test]
2657    fn back_compat_decrypt_s4e2_with_keyring_source() {
2658        let kr = keyring_single(34);
2659        let ct = encrypt_v2(b"v0.5 #29 object", &kr);
2660        let plain = decrypt(&ct, &kr).unwrap();
2661        assert_eq!(plain.as_ref(), b"v0.5 #29 object");
2662        // encrypt_with_source(Keyring) should produce the same wire
2663        // format (S4E2).
2664        let ct2 = encrypt_with_source(b"v0.5 #29 object", SseSource::Keyring(&kr));
2665        assert_eq!(&ct2[..4], SSE_MAGIC_V2);
2666        let plain2 = decrypt(&ct2, &kr).unwrap();
2667        assert_eq!(plain2.as_ref(), b"v0.5 #29 object");
2668    }
2669
2670    #[test]
2671    fn s4e2_blob_with_customer_key_source_is_rejected() {
2672        // An object stored with SSE-S4 (S4E2) but a client sending
2673        // SSE-C headers on the GET — this is a misuse, surface as
2674        // CustomerKeyUnexpected so service.rs can return 400.
2675        let kr = keyring_single(50);
2676        let ct = encrypt_v2(b"server-managed object", &kr);
2677        let m = cust_key(99);
2678        let err = decrypt(
2679            &ct,
2680            SseSource::CustomerKey {
2681                key: &m.key,
2682                key_md5: &m.key_md5,
2683            },
2684        )
2685        .unwrap_err();
2686        assert!(matches!(err, SseError::CustomerKeyUnexpected), "got {err:?}");
2687    }
2688
2689    #[test]
2690    fn s4e3_blob_with_keyring_source_is_rejected() {
2691        // Inverse: object is SSE-C (S4E3) but client forgot to send
2692        // SSE-C headers. Service.rs should map this to 400.
2693        let m = cust_key(60);
2694        let ct = encrypt_with_source(b"customer-key object", (&m).into());
2695        let kr = keyring_single(60);
2696        let err = decrypt(&ct, &kr).unwrap_err();
2697        assert!(matches!(err, SseError::CustomerKeyRequired), "got {err:?}");
2698    }
2699
2700    #[test]
2701    fn looks_encrypted_detects_s4e3() {
2702        let m = cust_key(13);
2703        let ct = encrypt_with_source(b"x", (&m).into());
2704        assert!(looks_encrypted(&ct));
2705    }
2706
2707    #[test]
2708    fn s4e3_rejects_short_body() {
2709        // 36 bytes passes the looks_encrypted gate but is shorter than
2710        // S4E3's 49-byte header.
2711        let mut short = Vec::new();
2712        short.extend_from_slice(SSE_MAGIC_V3);
2713        short.push(ALGO_AES_256_GCM);
2714        // Padding to 36 bytes (SSE_HEADER_BYTES) so the outer length
2715        // check passes but the S4E3 inner check fails.
2716        short.extend_from_slice(&[0u8; SSE_HEADER_BYTES - 5]);
2717        assert_eq!(short.len(), SSE_HEADER_BYTES);
2718        let m = cust_key(1);
2719        let err = decrypt(
2720            &short,
2721            SseSource::CustomerKey {
2722                key: &m.key,
2723                key_md5: &m.key_md5,
2724            },
2725        )
2726        .unwrap_err();
2727        assert!(matches!(err, SseError::TooShort { .. }), "got {err:?}");
2728    }
2729
2730    #[test]
2731    fn customer_key_material_debug_redacts_key() {
2732        let m = cust_key(99);
2733        let s = format!("{m:?}");
2734        assert!(s.contains("redacted"));
2735        assert!(!s.contains(&format!("{:?}", m.key.as_slice())));
2736    }
2737
2738    #[test]
2739    fn constant_time_eq_basic() {
2740        assert!(constant_time_eq(b"abc", b"abc"));
2741        assert!(!constant_time_eq(b"abc", b"abd"));
2742        assert!(!constant_time_eq(b"abc", b"abcd"));
2743        assert!(constant_time_eq(b"", b""));
2744    }
2745
2746    #[test]
2747    fn compute_key_md5_known_vector() {
2748        // Empty input MD5 is known: d41d8cd98f00b204e9800998ecf8427e.
2749        let got = compute_key_md5(b"");
2750        let expected_hex = "d41d8cd98f00b204e9800998ecf8427e";
2751        assert_eq!(hex_lower(&got), expected_hex);
2752    }
2753
2754    // -----------------------------------------------------------------
2755    // v0.5 #28 — SSE-KMS envelope (S4E4) tests
2756    // -----------------------------------------------------------------
2757
2758    use crate::kms::{KmsBackend, LocalKms};
2759    use std::collections::HashMap;
2760    use std::path::PathBuf;
2761
2762    fn local_kms_with(key_ids: &[(&str, [u8; 32])]) -> LocalKms {
2763        let mut keks: HashMap<String, [u8; 32]> = HashMap::new();
2764        for (id, k) in key_ids {
2765            keks.insert((*id).to_string(), *k);
2766        }
2767        LocalKms::from_keks(PathBuf::from("/tmp/none"), keks)
2768    }
2769
2770    #[tokio::test]
2771    async fn s4e4_roundtrip_via_local_kms() {
2772        let kms = local_kms_with(&[("alpha", [42u8; 32])]);
2773        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2774        let mut dek = [0u8; 32];
2775        dek.copy_from_slice(&dek_vec);
2776        let pt = b"SSE-KMS envelope payload across the S4E4 frame";
2777        let ct = encrypt_with_source(
2778            pt,
2779            SseSource::Kms {
2780                dek: &dek,
2781                wrapped: &wrapped,
2782            },
2783        );
2784        // Frame inspection.
2785        assert_eq!(&ct[..4], SSE_MAGIC_V4);
2786        assert_eq!(ct[4], ALGO_AES_256_GCM);
2787        let key_id_len = ct[5] as usize;
2788        assert_eq!(key_id_len, "alpha".len());
2789        assert_eq!(&ct[6..6 + key_id_len], b"alpha");
2790        // peek_magic + looks_encrypted both recognise S4E4.
2791        assert!(looks_encrypted(&ct));
2792        assert_eq!(peek_magic(&ct), Some("S4E4"));
2793        // Async decrypt round-trip.
2794        let plain = decrypt_with_kms(&ct, &kms).await.unwrap();
2795        assert_eq!(plain.as_ref(), pt);
2796    }
2797
2798    #[tokio::test]
2799    async fn s4e4_tampered_key_id_fails_aead() {
2800        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
2801        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2802        let mut dek = [0u8; 32];
2803        dek.copy_from_slice(&dek_vec);
2804        let mut ct = encrypt_with_source(
2805            b"do not redirect",
2806            SseSource::Kms {
2807                dek: &dek,
2808                wrapped: &wrapped,
2809            },
2810        )
2811        .to_vec();
2812        // Flip the key_id from "alpha" to "betaa" by changing the
2813        // first byte of the key_id field. The forged id "bltha" is
2814        // not in the KMS, so unwrap fails with KeyNotFound surfaced
2815        // through KmsBackend(KmsError::KeyNotFound).
2816        let key_id_off = 6;
2817        ct[key_id_off] = b'b';
2818        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2819        assert!(
2820            matches!(
2821                err,
2822                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2823                    | SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
2824            ),
2825            "got {err:?}"
2826        );
2827    }
2828
2829    #[tokio::test]
2830    async fn s4e4_tampered_key_id_to_real_other_id_still_fails() {
2831        // Wrap under "alpha" but rewrite the stored key_id to "beta"
2832        // (which IS in the KMS). KmsBackend will try to unwrap with
2833        // beta's KEK and AAD = "beta", but the wrapped bytes were
2834        // produced with alpha's KEK + AAD = "alpha", so the local
2835        // KMS unwrap fails with UnwrapFailed.
2836        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
2837        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2838        let mut dek = [0u8; 32];
2839        dek.copy_from_slice(&dek_vec);
2840        let mut ct = encrypt_with_source(
2841            b"redirect attempt",
2842            SseSource::Kms {
2843                dek: &dek,
2844                wrapped: &wrapped,
2845            },
2846        )
2847        .to_vec();
2848        // Both "alpha" and "beta" are 5 chars long so the rewrite
2849        // doesn't shift any other field offsets.
2850        let key_id_off = 6;
2851        ct[key_id_off..key_id_off + 5].copy_from_slice(b"beta_");
2852        // Trim back to 4-byte "beta" by also shrinking the length
2853        // prefix would change downstream offsets — instead pad the
2854        // forged id to keep length stable. This mirrors the realistic
2855        // tampering surface (attacker can flip bytes but not change
2856        // the on-disk layout). The KMS now sees key_id "beta_" which
2857        // is unknown → KeyNotFound.
2858        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2859        assert!(
2860            matches!(
2861                err,
2862                SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
2863            ),
2864            "got {err:?}"
2865        );
2866    }
2867
2868    #[tokio::test]
2869    async fn s4e4_tampered_wrapped_dek_fails_unwrap() {
2870        let kms = local_kms_with(&[("k", [3u8; 32])]);
2871        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2872        let mut dek = [0u8; 32];
2873        dek.copy_from_slice(&dek_vec);
2874        let mut ct = encrypt_with_source(
2875            b"target body",
2876            SseSource::Kms {
2877                dek: &dek,
2878                wrapped: &wrapped,
2879            },
2880        )
2881        .to_vec();
2882        // Locate the wrapped_dek_len + wrapped_dek field and flip a
2883        // byte in the middle of the wrapped DEK. AES-GCM auth on the
2884        // wrap fails → KmsBackend(UnwrapFailed).
2885        let key_id_len = ct[5] as usize;
2886        let wrapped_len_off = 6 + key_id_len;
2887        let wrapped_off = wrapped_len_off + 4;
2888        let mid = wrapped_off + (wrapped.ciphertext.len() / 2);
2889        ct[mid] ^= 0xFF;
2890        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2891        assert!(
2892            matches!(
2893                err,
2894                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2895            ),
2896            "got {err:?}"
2897        );
2898    }
2899
2900    #[tokio::test]
2901    async fn s4e4_tampered_ciphertext_fails_aead() {
2902        let kms = local_kms_with(&[("k", [4u8; 32])]);
2903        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2904        let mut dek = [0u8; 32];
2905        dek.copy_from_slice(&dek_vec);
2906        let mut ct = encrypt_with_source(
2907            b"sealed body",
2908            SseSource::Kms {
2909                dek: &dek,
2910                wrapped: &wrapped,
2911            },
2912        )
2913        .to_vec();
2914        let last = ct.len() - 1;
2915        ct[last] ^= 0x01;
2916        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2917        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2918    }
2919
2920    #[tokio::test]
2921    async fn s4e4_uses_random_nonce_and_dek_per_put() {
2922        let kms = local_kms_with(&[("k", [5u8; 32])]);
2923        // Two PUTs of the same plaintext under the same KEK must
2924        // produce different ciphertexts (fresh DEK + fresh nonce).
2925        let (dek1_vec, wrapped1) = kms.generate_dek("k").await.unwrap();
2926        let (dek2_vec, wrapped2) = kms.generate_dek("k").await.unwrap();
2927        let mut dek1 = [0u8; 32];
2928        dek1.copy_from_slice(&dek1_vec);
2929        let mut dek2 = [0u8; 32];
2930        dek2.copy_from_slice(&dek2_vec);
2931        let pt = b"deterministic input";
2932        let a = encrypt_with_source(
2933            pt,
2934            SseSource::Kms {
2935                dek: &dek1,
2936                wrapped: &wrapped1,
2937            },
2938        );
2939        let b = encrypt_with_source(
2940            pt,
2941            SseSource::Kms {
2942                dek: &dek2,
2943                wrapped: &wrapped2,
2944            },
2945        );
2946        assert_ne!(a, b);
2947        // Both still decrypt round-trip.
2948        let plain_a = decrypt_with_kms(&a, &kms).await.unwrap();
2949        let plain_b = decrypt_with_kms(&b, &kms).await.unwrap();
2950        assert_eq!(plain_a.as_ref(), pt);
2951        assert_eq!(plain_b.as_ref(), pt);
2952    }
2953
2954    #[tokio::test]
2955    async fn s4e4_sync_decrypt_returns_kms_async_required() {
2956        // The whole point of KmsAsyncRequired: passing an S4E4 body
2957        // to the sync `decrypt` function must surface a distinct
2958        // error so service.rs's GET path notices the bug rather than
2959        // returning a generic "wrong source" 400.
2960        let kms = local_kms_with(&[("k", [6u8; 32])]);
2961        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2962        let mut dek = [0u8; 32];
2963        dek.copy_from_slice(&dek_vec);
2964        let ct = encrypt_with_source(
2965            b"async only",
2966            SseSource::Kms {
2967                dek: &dek,
2968                wrapped: &wrapped,
2969            },
2970        );
2971        // Try via Keyring source (the default sync path).
2972        let kr = SseKeyring::new(1, key32(0));
2973        let err = decrypt(&ct, &kr).unwrap_err();
2974        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
2975    }
2976
#[test]
fn back_compat_s4e1_e2_e3_still_decrypt_via_sync() {
    // The sync `decrypt` path must keep round-tripping the output of
    // each pre-S4E4 encrypt entry point: `encrypt`, `encrypt_v2`,
    // and `encrypt_with_source` with a customer key.
    let key = key32(7);
    let keyring = SseKeyring::new(1, Arc::clone(&key));
    let customer = cust_key(7);

    let frame_v1 = encrypt(&key, b"v0.4 vintage");
    let plain_v1 = decrypt(&frame_v1, &keyring).unwrap();
    assert_eq!(plain_v1.as_ref(), b"v0.4 vintage");

    let frame_v2 = encrypt_v2(b"v0.5 #29 vintage", &keyring);
    let plain_v2 = decrypt(&frame_v2, &keyring).unwrap();
    assert_eq!(plain_v2.as_ref(), b"v0.5 #29 vintage");

    let frame_v3 = encrypt_with_source(b"v0.5 #27 vintage", (&customer).into());
    let plain_v3 = decrypt(&frame_v3, &customer).unwrap();
    assert_eq!(plain_v3.as_ref(), b"v0.5 #27 vintage");
}
2999
#[test]
fn peek_magic_distinguishes_all_variants() {
    // S4E1 / S4E2 / S4E3 frames come from real encrypt calls, so
    // they are long enough to pass any length gate in peek_magic.
    let key = key32(9);
    let keyring = SseKeyring::new(1, Arc::clone(&key));
    let customer = cust_key(9);

    let frame_v1 = encrypt(&key, b"x");
    assert_eq!(peek_magic(&frame_v1), Some("S4E1"));
    let frame_v2 = encrypt_v2(b"x", &keyring);
    assert_eq!(peek_magic(&frame_v2), Some("S4E2"));
    let frame_v3 = encrypt_with_source(b"x", (&customer).into());
    assert_eq!(peek_magic(&frame_v3), Some("S4E3"));

    // S4E4 is synthetic: the magic followed by 40 zero bytes. The
    // test relies on peek_magic reporting the magic alone, without
    // validating the rest of the S4E4 header.
    let mut frame_v4 = Vec::new();
    frame_v4.extend_from_slice(SSE_MAGIC_V4);
    frame_v4.resize(frame_v4.len() + 40, 0u8);
    assert_eq!(peek_magic(&frame_v4), Some("S4E4"));

    // Unknown magic or too-short input must yield None.
    let rejects: [&[u8]; 3] = [b"NOPE", b"short", &[0u8; 100]];
    for junk in rejects {
        assert!(peek_magic(junk).is_none());
    }
}
3026
3027    #[tokio::test]
3028    async fn s4e4_truncated_frame_errors_cleanly() {
3029        // Truncate to less than the minimum header. Must surface
3030        // KmsFrameTooShort, not panic, not return BadMagic.
3031        let truncated = b"S4E4\x01\x05hi";
3032        let kms = local_kms_with(&[("k", [1u8; 32])]);
3033        let err = decrypt_with_kms(truncated, &kms).await.unwrap_err();
3034        assert!(
3035            matches!(err, SseError::KmsFrameTooShort { .. }),
3036            "got {err:?}"
3037        );
3038    }
3039
3040    #[tokio::test]
3041    async fn s4e4_oob_key_id_len_errors() {
3042        // Build a body that claims key_id_len = 200 but only has 4
3043        // bytes after the length prefix. parse_s4e4_header must
3044        // refuse with KmsFrameFieldOob, not slice-panic.
3045        let mut body = Vec::new();
3046        body.extend_from_slice(SSE_MAGIC_V4);
3047        body.push(ALGO_AES_256_GCM);
3048        body.push(200u8); // key_id_len
3049        // Remaining bytes < 200; pad to clear the looks_encrypted
3050        // floor (36 bytes) but stay short of the claimed key_id +
3051        // wrapped_dek_len + nonce + tag layout.
3052        body.extend_from_slice(&[0u8; 50]);
3053        let kms = local_kms_with(&[("k", [1u8; 32])]);
3054        let err = decrypt_with_kms(&body, &kms).await.unwrap_err();
3055        assert!(
3056            matches!(err, SseError::KmsFrameFieldOob { .. }),
3057            "got {err:?}"
3058        );
3059    }
3060
3061    #[tokio::test]
3062    async fn s4e4_via_keyring_source_into_sync_decrypt_is_kms_async_required() {
3063        // S4E4 + Keyring source: sync decrypt sees the S4E4 magic
3064        // first and returns KmsAsyncRequired regardless of source —
3065        // the source mismatch never gets a chance to surface, which
3066        // is the right behaviour (caller's bug is "didn't peek
3067        // magic" not "wrong source").
3068        let kms = local_kms_with(&[("k", [9u8; 32])]);
3069        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
3070        let mut dek = [0u8; 32];
3071        dek.copy_from_slice(&dek_vec);
3072        let ct = encrypt_with_source(
3073            b"x",
3074            SseSource::Kms {
3075                dek: &dek,
3076                wrapped: &wrapped,
3077            },
3078        );
3079        let m = cust_key(1);
3080        let err = decrypt(&ct, &m).unwrap_err();
3081        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
3082    }
3083
#[test]
fn s4e4_looks_encrypted_passthrough_returns_false_for_synthetic() {
    // "S4F4" (F, not E) plus 60 zero bytes must not be mistaken for
    // an S4E4 frame by either detection helper.
    //
    // Nothing here awaits, so this is a plain synchronous test and
    // needs no async runtime.
    let mut not_s4e4 = Vec::new();
    not_s4e4.extend_from_slice(b"S4F4");
    not_s4e4.extend_from_slice(&[0u8; 60]);
    assert!(!looks_encrypted(&not_s4e4));
    assert_eq!(peek_magic(&not_s4e4), None);
}
3093
3094    #[tokio::test]
3095    async fn s4e4_aad_length_prefix_prevents_byte_shifting() {
3096        // Constructing an S4E4 body where the wrapped_dek_len is
3097        // shrunk by N bytes and the same N bytes are prepended to
3098        // the key_id-equivalent area would, without length-prefixed
3099        // AAD, produce the same AAD bytestream. Verify our AAD
3100        // includes the length prefixes by tampering with
3101        // wrapped_dek_len and confirming AES-GCM auth fails.
3102        let kms = local_kms_with(&[("kk", [11u8; 32])]);
3103        let (dek_vec, wrapped) = kms.generate_dek("kk").await.unwrap();
3104        let mut dek = [0u8; 32];
3105        dek.copy_from_slice(&dek_vec);
3106        let mut ct = encrypt_with_source(
3107            b"length-shift defense",
3108            SseSource::Kms {
3109                dek: &dek,
3110                wrapped: &wrapped,
3111            },
3112        )
3113        .to_vec();
3114        let key_id_len = ct[5] as usize;
3115        let wrapped_len_off = 6 + key_id_len;
3116        // Shrink wrapped_dek_len by 1. parse_s4e4_header now reads a
3117        // shorter wrapped_dek and a different nonce/tag/ciphertext
3118        // alignment — KMS unwrap fails OR AES-GCM fails OR frame
3119        // bounds reject. All three surface as auditable errors;
3120        // none should reach a successful decrypt.
3121        let original_len = u32::from_be_bytes([
3122            ct[wrapped_len_off],
3123            ct[wrapped_len_off + 1],
3124            ct[wrapped_len_off + 2],
3125            ct[wrapped_len_off + 3],
3126        ]);
3127        let new_len = (original_len - 1).to_be_bytes();
3128        ct[wrapped_len_off..wrapped_len_off + 4].copy_from_slice(&new_len);
3129        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
3130        // Acceptable failure modes: unwrap fail (truncated wrapped
3131        // DEK), AES-GCM fail (shifted nonce/tag/AAD), or frame bounds.
3132        assert!(
3133            matches!(
3134                err,
3135                SseError::KmsBackend(_)
3136                    | SseError::DecryptFailed
3137                    | SseError::KmsFrameFieldOob { .. }
3138                    | SseError::KmsFrameTooShort { .. }
3139            ),
3140            "got {err:?}"
3141        );
3142    }
3143
3144    // -----------------------------------------------------------------------
3145    // v0.8 #52: S4E5 chunked SSE-S4 — encrypt_v2_chunked / decrypt_chunked_stream
3146    // -----------------------------------------------------------------------
3147
3148    use futures::StreamExt;
3149
3150    /// Drain a chunked-decrypt stream into a `Vec<Bytes>` for assertion.
3151    /// Surfaces the first error verbatim (so tests can match on it).
3152    async fn collect_chunks(
3153        s: impl futures::Stream<Item = Result<Bytes, SseError>>,
3154    ) -> Result<Vec<Bytes>, SseError> {
3155        let mut out = Vec::new();
3156        let mut s = std::pin::pin!(s);
3157        while let Some(item) = s.next().await {
3158            out.push(item?);
3159        }
3160        Ok(out)
3161    }
3162
#[test]
fn s4e6_encrypt_layout_10mb_at_1mib() {
    // v0.8.1 #57: encrypt_v2_chunked emits S4E6 (24-byte header,
    // 8-byte salt). A 10 MiB plaintext at a 1 MiB chunk size must
    // give magic "S4E6", chunk_count = 10, and a total length of
    // header + 10 per-chunk overheads + plaintext.
    let kr = keyring_single(0x42);
    let chunk_size = 1024 * 1024;
    let pt_len = 10 * 1024 * 1024;
    let pt = vec![0xAB_u8; pt_len];
    let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).expect("encrypt ok");

    // Guard the fixed-offset reads below: a too-short frame should
    // fail with this message rather than an index-out-of-range panic.
    assert!(
        ct.len() >= S4E6_HEADER_BYTES,
        "frame is shorter than the S4E6 header ({} bytes)",
        ct.len(),
    );

    assert_eq!(&ct[..4], SSE_MAGIC_V6, "new PUTs emit S4E6 (v0.8.1 #57)");
    assert_eq!(ct[4], ALGO_AES_256_GCM);
    assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE = active id");
    assert_eq!(ct[7], 0, "reserved must be 0");
    assert_eq!(
        u32::from_be_bytes([ct[8], ct[9], ct[10], ct[11]]),
        chunk_size as u32,
        "chunk_size BE",
    );
    assert_eq!(
        u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
        10,
        "chunk_count BE — 10 MiB / 1 MiB = 10 (no remainder)",
    );
    // The salt occupies bytes 16..24. An all-zero salt would point
    // at a salt that was never filled in.
    assert_ne!(&ct[16..24], &[0u8; 8], "S4E6 salt must be random, not zeros");
    assert_eq!(
        ct.len(),
        S4E6_HEADER_BYTES + 10 * S4E6_PER_CHUNK_OVERHEAD + pt_len,
        "total = header (24) + 10 tags + plaintext",
    );
    assert!(looks_encrypted(&ct), "looks_encrypted must accept S4E6");
    assert_eq!(peek_magic(&ct), Some("S4E6"));
}
3201
3202    #[tokio::test]
3203    async fn s4e6_decrypt_chunked_stream_byte_equal() {
3204        // v0.8.1 #57: round-trip via S4E6 path. encrypt_v2_chunked
3205        // emits S4E6, decrypt_chunked_stream consumes S4E5/S4E6.
3206        let kr = keyring_single(0x55);
3207        let pt: Vec<u8> = (0..(10 * 1024 * 1024_u32)).map(|i| (i & 0xFF) as u8).collect();
3208        let ct = encrypt_v2_chunked(&pt, &kr, 1024 * 1024).unwrap();
3209        // Sanity: the new PUT is S4E6.
3210        assert_eq!(&ct[..4], SSE_MAGIC_V6, "new emit is S4E6");
3211        let stream = decrypt_chunked_stream(ct, &kr);
3212        let chunks = collect_chunks(stream).await.expect("stream ok");
3213        assert_eq!(chunks.len(), 10, "10 chunks expected for 10 MiB / 1 MiB");
3214        let mut joined = Vec::with_capacity(pt.len());
3215        for c in chunks {
3216            joined.extend_from_slice(&c);
3217        }
3218        assert_eq!(joined.len(), pt.len(), "byte length matches");
3219        assert_eq!(joined, pt, "byte-equal round-trip");
3220    }
3221
3222    #[tokio::test]
3223    async fn s4e6_single_chunk_for_small_object() {
3224        // Plaintext smaller than chunk_size → chunk_count=1. The
3225        // chunk_count field offset is unchanged between S4E5 and
3226        // S4E6 (both at body[12..16]); only the salt width differs.
3227        let kr = keyring_single(0x77);
3228        let pt = b"tiny payload, smaller than chunk_size";
3229        let ct = encrypt_v2_chunked(pt, &kr, 1024 * 1024).unwrap();
3230        assert_eq!(
3231            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3232            1,
3233            "small plaintext = single chunk",
3234        );
3235        let stream = decrypt_chunked_stream(ct, &kr);
3236        let chunks = collect_chunks(stream).await.expect("stream ok");
3237        assert_eq!(chunks.len(), 1);
3238        assert_eq!(chunks[0].as_ref(), pt);
3239    }
3240
3241    #[tokio::test]
3242    async fn s4e6_tampered_chunk_n_reports_chunk_index() {
3243        // v0.8.1 #57: same tamper-detect contract as the S4E5
3244        // version, just with the wider 24-byte header. Tamper
3245        // byte inside chunk index 3 (= 4th chunk) — the stream
3246        // must yield 3 successful chunks, then ChunkAuthFailed { 3 }.
3247        let kr = keyring_single(0x91);
3248        let chunk_size = 1024;
3249        let pt = vec![0xCD_u8; chunk_size * 8]; // 8 chunks
3250        let mut ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap().to_vec();
3251        // Locate chunk 3's first ciphertext byte: header (24) + 3 *
3252        // (tag 16 + ct 1024) + tag 16 = 24 + 3*1040 + 16 = 3160.
3253        let target = S4E6_HEADER_BYTES + 3 * (TAG_LEN + chunk_size) + TAG_LEN;
3254        ct[target] ^= 0x42;
3255        let stream = decrypt_chunked_stream(bytes::Bytes::from(ct), &kr);
3256        let mut s = std::pin::pin!(stream);
3257        // Chunks 0, 1, 2 must succeed.
3258        for expected_i in 0..3_u32 {
3259            let item = s.next().await.expect("yield");
3260            item.unwrap_or_else(|e| panic!("chunk {expected_i}: {e:?}"));
3261        }
3262        // Chunk 3 fails with the right index.
3263        let err = s.next().await.expect("yield error").unwrap_err();
3264        assert!(
3265            matches!(err, SseError::ChunkAuthFailed { chunk_index: 3 }),
3266            "got {err:?}",
3267        );
3268    }
3269
3270    #[tokio::test]
3271    async fn s4e5_back_compat_s4e2_blob_rejected_with_clear_error() {
3272        // Feeding an S4E2 frame to decrypt_chunked_stream should
3273        // surface BadMagic on the first poll (NOT silently fall
3274        // back — the caller is expected to peek_magic and dispatch).
3275        let kr = keyring_single(0x12);
3276        let s4e2 = encrypt_v2(b"a v2 blob, not chunked", &kr);
3277        let stream = decrypt_chunked_stream(s4e2, &kr);
3278        let result = collect_chunks(stream).await;
3279        let err = result.unwrap_err();
3280        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
3281    }
3282
#[test]
fn s4e6_salt_randomness_smoke() {
    // Cheap stuck-PRNG detector: over 1024 PUTs more than half of
    // the 8-byte salts (frame bytes 16..24) must be distinct. Full
    // uniqueness is deliberately not required here.
    let kr = keyring_single(0x33);
    let n = 1024;
    let salts: std::collections::HashSet<[u8; 8]> = (0..n)
        .map(|_| {
            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
            let mut salt = [0u8; 8];
            salt.copy_from_slice(&ct[16..24]);
            salt
        })
        .collect();
    assert!(
        salts.len() > n / 2,
        "expected most of the {n} salts to be unique (got {} unique)",
        salts.len(),
    );
}
3306
#[test]
fn s4e6_chunk_size_zero_invalid() {
    // A chunk size of 0 must be rejected at encrypt time with
    // ChunkSizeInvalid. The failure message prints the actual error,
    // matching the other error-variant assertions in this module.
    let kr = keyring_single(0x66);
    let err = encrypt_v2_chunked(b"hi", &kr, 0).unwrap_err();
    assert!(matches!(err, SseError::ChunkSizeInvalid), "got {err:?}");
}
3313
3314    #[tokio::test]
3315    async fn s4e6_truncated_body_reports_frame_truncated() {
3316        // Truncate inside chunk 2's tag → ChunkFrameTruncated, not
3317        // panic, not silent success. Header is now 24 bytes (S4E6).
3318        let kr = keyring_single(0xA1);
3319        let chunk_size = 256;
3320        let pt = vec![0u8; chunk_size * 4];
3321        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3322        // Truncate to inside chunk 2's tag: header + chunk0 + chunk1
3323        // + 8B partial of chunk2's tag.
3324        let trunc = S4E6_HEADER_BYTES + 2 * (TAG_LEN + chunk_size) + 8;
3325        let truncated = bytes::Bytes::copy_from_slice(&ct[..trunc]);
3326        let stream = decrypt_chunked_stream(truncated, &kr);
3327        let result = collect_chunks(stream).await;
3328        let err = result.unwrap_err();
3329        assert!(
3330            matches!(err, SseError::ChunkFrameTruncated { .. }),
3331            "got {err:?}",
3332        );
3333    }
3334
#[test]
fn s4e6_decrypt_buffered_round_trip_via_top_level_decrypt() {
    // The sync `decrypt(blob, &keyring)` entry point must also take
    // chunked frames and return the whole plaintext in one buffer.
    // A 13-byte chunk size splits the input into many chunks.
    let kr = keyring_single(0xDE);
    let pt = b"buffered sync decrypt path".repeat(32);
    let frame = encrypt_v2_chunked(&pt, &kr, 13).unwrap();
    let recovered = decrypt(&frame, &kr).expect("buffered S4E6 decrypt ok");
    assert_eq!(recovered.as_ref(), pt.as_slice());
}
3346
3347    #[tokio::test]
3348    async fn s4e6_unknown_key_id_in_frame_errors() {
3349        // Encrypt under id=7, decrypt under a keyring that lacks id=7.
3350        let kr_put = SseKeyring::new(7, key32(0xCC));
3351        let kr_get = keyring_single(0xCC); // only id=1
3352        let ct = encrypt_v2_chunked(b"orphan key", &kr_put, 64).unwrap();
3353        // Sync path
3354        let err = decrypt(&ct, &kr_get).unwrap_err();
3355        assert!(matches!(err, SseError::KeyNotInKeyring { id: 7 }), "got {err:?}");
3356        // Stream path
3357        let stream = decrypt_chunked_stream(ct, &kr_get);
3358        let result = collect_chunks(stream).await;
3359        assert!(
3360            matches!(result, Err(SseError::KeyNotInKeyring { id: 7 })),
3361            "got {result:?}",
3362        );
3363    }
3364
3365    #[tokio::test]
3366    async fn s4e6_final_chunk_smaller_than_chunk_size() {
3367        // Plaintext = 2.5 chunks → final chunk holds half the bytes.
3368        // S4E6 header = 24 bytes → total on-disk = 24 + 48 + 250.
3369        let kr = keyring_single(0xEF);
3370        let chunk_size = 100;
3371        let pt: Vec<u8> = (0..250_u32).map(|i| i as u8).collect();
3372        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3373        assert_eq!(
3374            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3375            3,
3376            "ceil(250/100) = 3 chunks",
3377        );
3378        // Total on-disk: 24 header + 3 tags (48) + 250 plaintext = 322.
3379        assert_eq!(ct.len(), S4E6_HEADER_BYTES + 48 + 250);
3380        let stream = decrypt_chunked_stream(ct, &kr);
3381        let chunks = collect_chunks(stream).await.expect("stream ok");
3382        assert_eq!(chunks.len(), 3);
3383        assert_eq!(chunks[0].len(), 100);
3384        assert_eq!(chunks[1].len(), 100);
3385        assert_eq!(chunks[2].len(), 50, "final chunk is the remainder");
3386        let joined: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
3387        assert_eq!(joined, pt);
3388    }
3389
3390    // -----------------------------------------------------------------------
3391    // v0.8.1 #57: S4E6-specific tests added on top of the renamed
3392    // s4e6_* battery above. Keep these focused on what's *new*:
3393    //   - back-compat read of legacy S4E5 blobs
3394    //   - 24-bit chunk_count cap
3395    //   - the parse_s4e6_header public API
3396    // -----------------------------------------------------------------------
3397
3398    #[test]
3399    fn s4e6_back_compat_read_s4e5_blob() {
3400        // Synthesize a "v0.8.0 vintage" S4E5 blob via the test-only
3401        // helper, then prove the v0.8.1 gateway decrypts it under
3402        // the same keyring — both via sync `decrypt` (buffered)
3403        // and the streaming path. Without this, every S4E5 object
3404        // in production becomes unreadable after the upgrade.
3405        let kr = keyring_single(0x57);
3406        let pt = b"v0.8.0 vintage chunked SSE-S4 object".repeat(64);
3407        let s4e5 = encrypt_v2_chunked_s4e5_for_test(&pt, &kr, 91).unwrap();
3408        // Confirm the test fixture really is S4E5 magic + 20-byte header.
3409        assert_eq!(&s4e5[..4], SSE_MAGIC_V5, "fixture must be S4E5");
3410        assert_eq!(peek_magic(&s4e5), Some("S4E5"));
3411        // Sync decrypt path (top-level `decrypt`, dispatches V5 + V6).
3412        let plain_sync = decrypt(&s4e5, &kr).expect("sync S4E5 decrypt ok");
3413        assert_eq!(plain_sync.as_ref(), pt.as_slice());
3414        // Streaming decrypt path — must also accept S4E5.
3415        let collected = futures::executor::block_on(async {
3416            let stream = decrypt_chunked_stream(s4e5.clone(), &kr);
3417            collect_chunks(stream).await
3418        })
3419        .expect("stream S4E5 decrypt ok");
3420        let mut joined = Vec::with_capacity(pt.len());
3421        for c in collected {
3422            joined.extend_from_slice(&c);
3423        }
3424        assert_eq!(joined, pt, "S4E5 streaming round-trip byte-equal");
3425    }
3426
#[test]
fn s4e6_layout_24_bytes_header() {
    // Pin the S4E6 framing constants so an accidental edit is caught
    // here: a 24-byte fixed header, 4 bytes wider than S4E5's, and a
    // per-chunk overhead of exactly one tag.
    let header_v6 = S4E6_HEADER_BYTES;
    assert_eq!(header_v6, 24);
    assert_eq!(S4E6_PER_CHUNK_OVERHEAD, TAG_LEN);
    assert_eq!(header_v6, S4E5_HEADER_BYTES + 4);
}
3436
#[test]
fn s4e6_parse_header_round_trip() {
    // parse_s4e6_header must report the same field values that
    // encrypt_v2_chunked wrote, and must reject non-S4E6 and
    // truncated input with the matching error variants.
    let kr = keyring_single(0xAB);
    let chunk_size = 256;
    let chunk_total = 7;
    let pt = vec![1u8; chunk_total * chunk_size];
    let frame = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();

    let hdr = parse_s4e6_header(&frame).expect("parse ok");
    assert_eq!(hdr.key_id, 1);
    assert_eq!(hdr.chunk_size, chunk_size as u32);
    assert_eq!(hdr.chunk_count, 7);
    assert_eq!(hdr.salt.len(), 8);

    // A blob that starts with a different magic is BadMagic.
    let bogus = b"S4E2\x01\x00\x00\x00........................";
    let magic_err = parse_s4e6_header(bogus).unwrap_err();
    assert!(matches!(magic_err, SseError::BadMagic { .. }), "got {magic_err:?}");

    // The first 10 bytes of a valid frame are ChunkFrameTruncated.
    let short_err = parse_s4e6_header(&frame[..10]).unwrap_err();
    assert!(
        matches!(short_err, SseError::ChunkFrameTruncated { .. }),
        "got {short_err:?}"
    );
}
3459
#[test]
fn s4e6_salt_uniqueness_smoke_16m() {
    // v0.8.1 #57 smoke test for the 8-byte S4E6 salt. Despite the
    // `_16m` suffix this runs n = 16,384 PUTs so it stays fast; each
    // PUT performs a real encrypt.
    //
    // Birthday estimate, expected colliding pairs ≈ n² / 2^(bits+1):
    //   - 4-byte salt, n = 65,536: 2^32 / 2^33 = 0.5, i.e. roughly a
    //     40% chance of at least one collision (the weakness that
    //     motivated the wider salt).
    //   - 4-byte salt, n = 16,384: 2^28 / 2^33 = 2^-5 ≈ 0.031, i.e.
    //     about a 3.1% chance of at least one collision.
    //   - 8-byte salt, n = 16,384: 2^28 / 2^65 = 2^-37 ≈ 7e-12.
    //
    // So the uniqueness assertion below is safe for the 8-byte salt,
    // but a 4-byte salt would also pass it most of the time at this
    // n. Treat this as a stuck-PRNG / gross-regression smoke, not as
    // a detector for the salt width.
    let kr = keyring_single(0xA6);
    let n = 16384_usize;
    let mut salts = std::collections::HashSet::with_capacity(n);
    let mut top4_seen = std::collections::HashSet::with_capacity(n);
    let mut collisions_top4 = 0usize;
    for _ in 0..n {
        let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
        let mut salt = [0u8; 8];
        salt.copy_from_slice(&ct[16..24]);
        salts.insert(salt);

        // Informational only: count repeats among the first 4 bytes
        // of each salt, simulating a 4-byte salt. The expected count
        // at this n is about 0.03, so it is almost always 0 and is
        // deliberately not asserted on.
        let mut top4 = [0u8; 4];
        top4.copy_from_slice(&salt[..4]);
        if !top4_seen.insert(top4) {
            collisions_top4 += 1;
        }
    }
    assert_eq!(
        salts.len(),
        n,
        "all 8-byte salts must be unique across {n} PUTs (got {} unique)",
        salts.len(),
    );
    // Visible with `--nocapture`; kept as an audit breadcrumb rather
    // than an assertion because the simulated count is statistical.
    eprintln!(
        "s4e6_salt_uniqueness_smoke_16m: 16k PUTs, full 8B salts \
         all unique ({}/{}), simulated 4B-truncated salt yielded \
         {} collisions (this is what S4E5 would have shipped)",
        salts.len(),
        n,
        collisions_top4,
    );
}
3527
#[test]
fn s4e6_max_chunks_24bit() {
    // The chunk index is carried as a 24-bit big-endian value in the
    // S4E6 nonce, so the chunk count is capped at 2^24 - 1. Checked
    // here: the constant itself, PUT-time rejection above the cap, a
    // small under-cap PUT, and parser rejection of a tampered
    // header. A PUT at exactly the cap is not run because millions
    // of chunk encrypts would be too slow for a unit test.
    assert_eq!(S4E6_MAX_CHUNK_COUNT, (1u32 << 24) - 1);
    assert_eq!(S4E6_MAX_CHUNK_COUNT, 16_777_215);

    let kr = keyring_single(0xC4);

    // Over the cap: 1-byte chunks over a (cap + 1)-byte plaintext
    // need 16,777,216 chunks. The 16 MiB allocation is acceptable.
    let over_cap_len = (S4E6_MAX_CHUNK_COUNT as usize) + 1;
    let oversized = vec![0u8; over_cap_len];
    let put_err = encrypt_v2_chunked(&oversized, &kr, 1).unwrap_err();
    assert!(
        matches!(
            put_err,
            SseError::ChunkCountTooLarge {
                got: 16_777_216,
                max: 16_777_215
            }
        ),
        "got {put_err:?}",
    );

    // Comfortably under the cap: 1023 one-byte chunks must encrypt
    // and the header must report that count.
    let small = vec![0u8; 1023];
    let frame = encrypt_v2_chunked(&small, &kr, 1).expect("under-cap PUT must succeed");
    let hdr = parse_s4e6_header(&frame).unwrap();
    assert_eq!(hdr.chunk_count, 1023);

    // Tampered header: overwrite chunk_count (bytes 12..16) with
    // cap + 1 = 2^24 and check that the parser refuses it.
    let mut tampered = frame.to_vec();
    let forged_count = (S4E6_MAX_CHUNK_COUNT + 1).to_be_bytes();
    tampered[12..16].copy_from_slice(&forged_count);
    let parse_err = parse_s4e6_header(&tampered).unwrap_err();
    assert!(
        matches!(
            parse_err,
            SseError::ChunkCountTooLarge { got: 16_777_216, max: 16_777_215 }
        ),
        "got {parse_err:?}",
    );
}
3587
#[test]
fn s4e6_nonce_v6_layout() {
    // nonce_v6 layout under test: byte 0 is b'E', bytes 1..9 are the
    // 8-byte salt, bytes 9..12 are the chunk index as 24-bit
    // big-endian.
    let salt = [0xAA_u8; 8];

    let first = nonce_v6(&salt, 0);
    assert_eq!(first[0], b'E');
    assert_eq!(&first[1..9], &salt);

    // (chunk_index, expected trailing three bytes)
    let cases: [(u32, [u8; 3]); 4] = [
        (0, [0, 0, 0]),
        (1, [0, 0, 1]),
        (0x123456, [0x12, 0x34, 0x56]),
        (S4E6_MAX_CHUNK_COUNT, [0xFF, 0xFF, 0xFF]),
    ];
    for (chunk_index, tail) in cases {
        let nonce = nonce_v6(&salt, chunk_index);
        assert_eq!(&nonce[9..12], &tail);
    }
}
3605
#[test]
fn s4e6_tampered_salt_byte_fails_aead() {
    // Flipping one bit of the 8-byte header salt (bytes 16..24) must
    // make decryption fail at the very first chunk, showing that the
    // salt takes part in per-chunk authentication.
    //
    // Only the sync `decrypt` is exercised and nothing awaits, so
    // this is a plain synchronous test.
    let kr = keyring_single(0xB6);
    let pt = b"salt-in-aad coverage".repeat(64);
    let mut ct = encrypt_v2_chunked(&pt, &kr, 128).unwrap().to_vec();
    // Byte 20 lies in the middle of the salt.
    ct[20] ^= 0x01;
    let err = decrypt(&ct, &kr).unwrap_err();
    assert!(
        matches!(err, SseError::ChunkAuthFailed { chunk_index: 0 }),
        "got {err:?}",
    );
}
3623
3624    // -----------------------------------------------------------------------
3625    // v0.8.2 #64: pre-allocation guard for chunked SSE frames
3626    //
3627    // The buffered decrypt path used to call `Vec::with_capacity(chunk_size
3628    // * chunk_count)` *before* validating either factor. A 24-byte malicious
3629    // S4E6 header could declare a multi-PB plaintext and trigger an instant
3630    // OOM / panic on a tiny ciphertext. Verify the guard rejects every
3631    // adversarial header pattern before any allocation happens.
3632    // -----------------------------------------------------------------------
3633
3634    /// Build a minimal S4E6 header with attacker-controlled chunk_size /
3635    /// chunk_count (no chunks attached — the body is exactly 24 bytes).
3636    /// Used by the DoS tests below to synthesize malicious frames without
3637    /// running an encrypt.
3638    fn synth_s4e6_header(chunk_size: u32, chunk_count: u32) -> Vec<u8> {
3639        let mut blob = Vec::with_capacity(S4E6_HEADER_BYTES);
3640        blob.extend_from_slice(SSE_MAGIC_V6);
3641        blob.push(ALGO_AES_256_GCM);
3642        blob.extend_from_slice(&1_u16.to_be_bytes()); // key_id = 1
3643        blob.push(0); // reserved
3644        blob.extend_from_slice(&chunk_size.to_be_bytes());
3645        blob.extend_from_slice(&chunk_count.to_be_bytes());
3646        blob.extend_from_slice(&[0u8; 8]); // 8-byte salt
3647        debug_assert_eq!(blob.len(), S4E6_HEADER_BYTES);
3648        blob
3649    }
3650
3651    #[test]
3652    fn s4e6_header_claims_huge_size_rejected_pre_alloc() {
3653        // 1 GiB chunk_size × 100 chunks = 100 GiB declared plaintext, but
3654        // the body the attacker actually shipped is only 100 bytes. The
3655        // pre-alloc guard's cap arm fires (100 GiB > 5 GiB default cap)
3656        // *before* the walker / `Vec::with_capacity(100 GiB)` — surfaces
3657        // as ChunkFrameTooLarge.
3658        let kr = keyring_single(0x01);
3659        let chunk_size: u32 = 1 << 30; // 1 GiB
3660        let chunk_count: u32 = 100;
3661        let mut blob = synth_s4e6_header(chunk_size, chunk_count);
3662        // Pad with 100 random bytes — the attack model: tiny payload,
3663        // monster header.
3664        blob.extend_from_slice(&[0u8; 100]);
3665        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3666        assert!(
3667            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3668            "expected ChunkFrameTooLarge (declared 100 GiB > 5 GiB cap), got {err:?}",
3669        );
3670        // Same input, tighter caller-supplied cap (1 MiB): still
3671        // ChunkFrameTooLarge. No panic, no OOM either way.
3672        let err2 = decrypt_chunked_buffered(&blob, &kr, 1024 * 1024).unwrap_err();
3673        assert!(
3674            matches!(err2, SseError::ChunkFrameTooLarge { .. }),
3675            "expected ChunkFrameTooLarge under tighter cap, got {err2:?}",
3676        );
3677    }
3678
3679    #[test]
3680    fn s4e6_header_chunk_size_x_chunk_count_overflows_u64() {
3681        // chunk_size = u32::MAX, chunk_count > 1 → product > u64 cap?
3682        // No — u32::MAX * u32::MAX fits in u64 (~2^64). To force u64
3683        // overflow we'd need both > 2^32. But chunk_count is capped to
3684        // 16M (S4E6_MAX_CHUNK_COUNT) by the earlier check, so the
3685        // realistic adversarial maximum is u32::MAX * 16M ≈ 2^56 — well
3686        // under u64::MAX. The overflow path is therefore *theoretically*
3687        // unreachable on S4E6 (24-bit chunk_count cap protects it), but
3688        // we still test it via S4E5 which has no 24-bit cap on
3689        // chunk_count: a 4-byte salt frame with chunk_size = u32::MAX
3690        // and chunk_count = u32::MAX gives 2^64 - 2^33 + 1 — fits u64
3691        // but adding the 16-byte tag overhead × u32::MAX chunks
3692        // overflows. Verify ChunkFrameTooLarge surfaces.
3693        let kr = keyring_single(0x02);
3694        // Synthesize an S4E5 header (20 bytes) with maximally
3695        // adversarial chunk_size + chunk_count.
3696        let mut blob = Vec::with_capacity(S4E5_HEADER_BYTES);
3697        blob.extend_from_slice(SSE_MAGIC_V5);
3698        blob.push(ALGO_AES_256_GCM);
3699        blob.extend_from_slice(&1_u16.to_be_bytes());
3700        blob.push(0);
3701        blob.extend_from_slice(&u32::MAX.to_be_bytes()); // chunk_size = 2^32 - 1
3702        blob.extend_from_slice(&u32::MAX.to_be_bytes()); // chunk_count = 2^32 - 1
3703        blob.extend_from_slice(&[0u8; 4]); // 4-byte S4E5 salt
3704        debug_assert_eq!(blob.len(), S4E5_HEADER_BYTES);
3705        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3706        assert!(
3707            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3708            "expected ChunkFrameTooLarge for u64 overflow, got {err:?}",
3709        );
3710
3711        // Also smoke the same input through `parse_chunked_header`
3712        // directly (the streaming path) — must error, never panic.
3713        let direct = parse_chunked_header(&blob, usize::MAX).unwrap_err();
3714        assert!(
3715            matches!(direct, SseError::ChunkFrameTooLarge { .. }),
3716            "streaming path: expected ChunkFrameTooLarge, got {direct:?}",
3717        );
3718    }
3719
3720    #[test]
3721    fn s4e6_header_within_max_body_bytes_passes() {
3722        // 1 MiB chunk_size × 100 chunks = 100 MiB declared plaintext —
3723        // well under the 5 GiB default cap. The header validation must
3724        // NOT reject this; instead it falls through to chunk-walk +
3725        // AES-GCM verify (which then fails on this synthetic frame
3726        // because we didn't write any ciphertext, so we expect
3727        // ChunkFrameTruncated *not* ChunkFrameTooLarge).
3728        let kr = keyring_single(0x03);
3729        let chunk_size: u32 = 1024 * 1024; // 1 MiB
3730        let chunk_count: u32 = 100;
3731        let mut blob = synth_s4e6_header(chunk_size, chunk_count);
3732        // Append the full declared chunk array so the truncation arm
3733        // doesn't fire — 100 chunks × (16 B tag + 1 MiB ct) = 100 MiB
3734        // + 1.6 KiB tags. We write zeros; AES-GCM verify will fail on
3735        // chunk 0, but that proves the pre-alloc guard *let it through*.
3736        let chunk_array_size =
3737            (chunk_count as usize) * (S4E6_PER_CHUNK_OVERHEAD + chunk_size as usize);
3738        blob.resize(blob.len() + chunk_array_size, 0);
3739        let err =
3740            decrypt_chunked_buffered(&blob, &kr, DEFAULT_MAX_BODY_BYTES).unwrap_err();
3741        // The guard let it through (we got past parse_chunked_header)
3742        // and AES-GCM tag verify failed on chunk 0 — that's the right
3743        // failure mode. ChunkFrameTooLarge would mean the cap fired
3744        // (a regression of this test). KeyNotInKeyring would mean the
3745        // header parsed but the key id (1) wasn't present (also a
3746        // regression — the keyring has id=1 active). Anything else is
3747        // a guard-too-strict bug.
3748        assert!(
3749            matches!(err, SseError::ChunkAuthFailed { chunk_index: 0 }),
3750            "expected ChunkAuthFailed (guard let it through), got {err:?}",
3751        );
3752    }
3753
3754    #[test]
3755    fn s4e6_header_exceeds_max_body_bytes_rejected() {
3756        // 1 MiB × 6000 chunks = 6 GiB declared plaintext > 5 GiB cap.
3757        // Must reject as ChunkFrameTooLarge before any alloc. We don't
3758        // attach the 6 GiB chunk array to the body — the cap arm only
3759        // looks at the declared `chunk_size × chunk_count`, not the
3760        // actual body length, so a tiny body suffices to exercise the
3761        // cap.
3762        let kr = keyring_single(0x04);
3763        let chunk_size: u32 = 1024 * 1024; // 1 MiB
3764        let chunk_count: u32 = 6000;
3765        let blob = synth_s4e6_header(chunk_size, chunk_count);
3766        // Body is exactly the 24-byte header, no chunks attached.
3767        // body.len() = 24 ≤ max_total (~6 GiB) so the trailing-bytes
3768        // check does NOT fire; the 6 GiB > 5 GiB cap check does.
3769        let err = decrypt_chunked_buffered(&blob, &kr, DEFAULT_MAX_BODY_BYTES).unwrap_err();
3770        assert!(
3771            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3772            "expected ChunkFrameTooLarge (6 GiB declared > 5 GiB cap), got {err:?}",
3773        );
3774
3775        // Directly exercise the cap arm with a tighter cap (1 MiB)
3776        // and a 100 MiB declared plaintext that fits fully in the
3777        // body, so the cap is unambiguously the deciding factor.
3778        let chunk_size_b: u32 = 1024 * 1024; // 1 MiB
3779        let chunk_count_b: u32 = 100;
3780        let mut blob_b = synth_s4e6_header(chunk_size_b, chunk_count_b);
3781        let pad_b =
3782            (chunk_count_b as usize) * (S4E6_PER_CHUNK_OVERHEAD + chunk_size_b as usize);
3783        blob_b.resize(blob_b.len() + pad_b, 0);
3784        // Cap = 1 MiB, declared plaintext = 100 MiB → ChunkFrameTooLarge.
3785        let err_b = decrypt_chunked_buffered(&blob_b, &kr, 1024 * 1024).unwrap_err();
3786        assert!(
3787            matches!(err_b, SseError::ChunkFrameTooLarge { .. }),
3788            "expected ChunkFrameTooLarge (cap < declared), got {err_b:?}",
3789        );
3790    }
3791
3792    #[test]
3793    fn s4e6_random_header_never_panics() {
3794        // DoS regression — feed 100k random byte sequences shaped like
3795        // chunked SSE headers to `parse_chunked_header`. Every result
3796        // must be Ok or Err; no panic, no OOM. This exercises the
3797        // arithmetic-overflow + truncation arms of the v0.8.2 #64
3798        // guard against adversarial inputs the previous unit tests
3799        // didn't enumerate.
3800        //
3801        // We use a fixed seed (deterministic) rather than proptest
3802        // here so the test is repeatable across CI runs without
3803        // pulling in shrink machinery for a pure no-panic property.
3804        use rand::{Rng, SeedableRng, rngs::StdRng};
3805        let mut rng = StdRng::seed_from_u64(0xC0FF_EE64_6464_64DE);
3806        let mut max_body_bytes_choices = [
3807            0_usize,
3808            1024,
3809            1024 * 1024,
3810            DEFAULT_MAX_BODY_BYTES,
3811            usize::MAX,
3812        ]
3813        .iter()
3814        .copied()
3815        .cycle();
3816        for _ in 0..100_000 {
3817            // Body length in [0, 256] — tiny on purpose: most random
3818            // bytes won't be a valid header, but we want the parser
3819            // to robustly reject every shape (truncated, wrong magic,
3820            // wrong algo, declared-vs-actual mismatch, overflow).
3821            let body_len = rng.gen_range(0..=256_usize);
3822            let mut body = vec![0u8; body_len];
3823            rng.fill(body.as_mut_slice());
3824            // Bias 1/4 of trials toward valid magic so the deeper
3825            // arithmetic paths get exercised, not just the BadMagic
3826            // early-out.
3827            if body_len >= 4 && rng.gen_bool(0.25) {
3828                if rng.gen_bool(0.5) {
3829                    body[..4].copy_from_slice(SSE_MAGIC_V5);
3830                } else {
3831                    body[..4].copy_from_slice(SSE_MAGIC_V6);
3832                }
3833            }
3834            let max_cap = max_body_bytes_choices.next().unwrap();
3835            // The contract: never panic, never alloc more than
3836            // `max_cap` worth of memory. We assert only no-panic
3837            // here; OOM behavior is implicit (the function returns
3838            // Err before any large alloc).
3839            let _ = parse_chunked_header(&body, max_cap);
3840        }
3841    }
3842
3843    // Bonus: an extreme-overflow case the guard is specifically
3844    // hardened against — chunk_count = u32::MAX with the per-chunk
3845    // overhead multiplication forced to overflow u64. Catches a
3846    // future refactor that would replace `checked_mul` with `*`.
3847    #[test]
3848    fn s4e5_extreme_overflow_chunk_count_u32_max() {
3849        let kr = keyring_single(0x05);
3850        // u32::MAX chunk_size × u32::MAX chunk_count overflows u64
3851        // when adding the 16 * u32::MAX tag overhead. (Strictly:
3852        // chunk_size × chunk_count = (2^32-1)^2 ≈ 2^64 - 2^33 + 1
3853        // — already u64-saturating; the +tag step then overflows.)
3854        // The guard's `checked_add` chain must catch this.
3855        let mut blob = Vec::with_capacity(S4E5_HEADER_BYTES);
3856        blob.extend_from_slice(SSE_MAGIC_V5);
3857        blob.push(ALGO_AES_256_GCM);
3858        blob.extend_from_slice(&1_u16.to_be_bytes());
3859        blob.push(0);
3860        blob.extend_from_slice(&u32::MAX.to_be_bytes());
3861        blob.extend_from_slice(&u32::MAX.to_be_bytes());
3862        blob.extend_from_slice(&[0u8; 4]);
3863        let err = decrypt_chunked_buffered_default(&blob, &kr).unwrap_err();
3864        assert!(
3865            matches!(err, SseError::ChunkFrameTooLarge { .. }),
3866            "expected ChunkFrameTooLarge for extreme overflow, got {err:?}",
3867        );
3868    }
3869}