// s4_server/sse.rs
1//! Server-side encryption (SSE-S4) — AES-256-GCM (v0.4 #21, v0.5 #29, v0.5 #27, v0.5 #28).
2//!
3//! Wraps the post-compression S3 object body with authenticated
4//! encryption. Compress-then-encrypt is the right order: encryption
5//! produces high-entropy bytes that don't compress, so encrypting last
6//! preserves the codec's ratio.
7//!
8//! ## Wire formats
9//!
10//! ### S4E1 (v0.4) — single key, no rotation
11//!
12//! ```text
13//! [magic: "S4E1" 4B]
14//! [algo:  u8]            # 1 = AES-256-GCM
15//! [reserved: 3B]         # 0x00 0x00 0x00
16//! [nonce: 12B]           # random per-object
17//! [tag:   16B]           # AES-GCM authentication tag
18//! [ciphertext: variable] # encrypted-then-authenticated body
19//! ```
20//!
21//! Total overhead: 36 bytes per object.
22//!
23//! ### S4E2 (v0.5 #29) — keyring-aware, supports rotation
24//!
25//! ```text
26//! [magic:  "S4E2" 4B]
27//! [algo:   u8]            # 1 = AES-256-GCM
28//! [key_id: u16 BE]        # which keyring slot encrypted this body
29//! [reserved: 1B]          # 0x00
30//! [nonce:  12B]           # random per-object
31//! [tag:    16B]           # AES-GCM authentication tag
32//! [ciphertext: variable]
33//! ```
34//!
35//! Same 36-byte overhead — we reused the 3-byte reserved area in S4E1
36//! to fit a 2-byte key-id + 1-byte reserved without bumping the header
37//! size. The key-id is included in the AAD so a flipped key-id byte
38//! fails the auth tag (i.e. an attacker can't trick the gateway into
39//! decrypting under a different keyring slot).
40//!
41//! ### S4E3 (v0.5 #27) — SSE-C, customer-provided key
42//!
43//! ```text
44//! [magic:   "S4E3" 4B]
45//! [algo:    u8]            # 1 = AES-256-GCM
46//! [key_md5: 16B]           # MD5 fingerprint of the customer key
47//! [nonce:   12B]           # random per-object
48//! [tag:     16B]           # AES-GCM authentication tag
49//! [ciphertext: variable]
50//! ```
51//!
52//! Overhead: 49 bytes (`4 + 1 + 16 + 12 + 16`). Unlike S4E1/S4E2 the
53//! gateway does **not** persist the key — the client supplies it on
54//! every PUT/GET via `x-amz-server-side-encryption-customer-{algorithm,
55//! key,key-MD5}` headers. We store only the 16-byte MD5 in the on-disk
56//! frame so a GET with the wrong key surfaces as
57//! [`SseError::WrongCustomerKey`] before AES-GCM is even tried (saves a
58//! useless decrypt + gives operators a distinct error from generic auth
59//! failure).
60//!
61//! The `key_md5` is included in the AAD so flipping a single byte of
62//! the stored fingerprint also breaks AES-GCM auth — i.e. an attacker
63//! who tampered with the metadata can't sneak a different key past the
64//! check.
65//!
66//! ### S4E4 (v0.5 #28) — SSE-KMS envelope, per-object DEK
67//!
68//! ```text
69//! [magic:           "S4E4" 4B]
70//! [algo:            u8]            # 1 = AES-256-GCM
71//! [key_id_len:      u8]            # 1..=255, length of UTF-8 key_id
72//! [key_id:          variable]      # UTF-8, AAD-authenticated
73//! [wrapped_dek_len: u32 BE]        # length of the wrapped DEK blob
74//! [wrapped_dek:     variable]      # opaque, AAD-authenticated
75//! [nonce:           12B]           # random per-object
76//! [tag:             16B]           # AES-GCM auth tag for body
77//! [ciphertext:      variable]      # body encrypted under the DEK
78//! ```
79//!
80//! Header overhead: `4 + 1 + 1 + key_id_len + 4 + wrapped_dek_len + 12
81//! + 16` = 38 + key_id_len + wrapped_dek_len. For a typical
82//! [`crate::kms::LocalKms`] wrap (60-byte ciphertext) and a 36-char
83//! UUID-style `key_id`, that's ~134 bytes per object.
84//!
85//! `key_id` and `wrapped_dek` are both placed in the AAD so an
86//! attacker cannot rewrite either field to point the gateway at a
87//! different KEK or wrapped DEK without invalidating the body's
88//! AES-GCM tag. The plaintext DEK is never persisted; only the
89//! wrapped form is on disk, and the gateway holds the plaintext only
90//! for the duration of one PUT or GET.
91//!
92//! S4E4 decrypt requires an `async` round-trip to the KMS backend
93//! (to unwrap the DEK), so the synchronous [`decrypt`] function
94//! refuses S4E4 with [`SseError::KmsAsyncRequired`] — callers that
95//! peek `S4E4` via [`peek_magic`] must dispatch to
96//! [`decrypt_with_kms`] instead.
97//!
98//! ## v0.5 rotation flow (SSE-S4 only)
99//!
100//! Operators wire one [`SseKeyring`] holding the **active** key plus
101//! any number of **retired** keys. PUT always encrypts under the
102//! active key (S4E2 with that key's id). GET sniffs the magic:
103//!
104//! - `S4E1`: legacy single-key path. The keyring's active key is tried
105//!   first, then every retired key — this lets a v0.4 deployment
106//!   migrate to a keyring with the original key as active and decrypt
107//!   pre-rotation objects unchanged.
108//! - `S4E2`: read the key_id, look it up in the keyring, decrypt with
109//!   that exact key. Missing key_id surfaces as `KeyNotInKeyring`.
110//! - `S4E3`: keyring is **not** consulted. Caller must supply
111//!   [`SseSource::CustomerKey`] with the matching key + md5.
112//!
113//! ## Open follow-ups
114//!
115//! - **Server-managed key only** (for SSE-S4): keys come from local
116//!   files via `--sse-s4-key` / `--sse-s4-key-rotated`. KMS / vault
117//!   integration for the SSE-S4 keyring (i.e. wrapping the keyring's
118//!   keys with KMS) is a separate issue. SSE-KMS for per-object DEKs
119//!   is implemented (see [`SseSource::Kms`] + S4E4 above).
120
121use std::collections::HashMap;
122use std::path::Path;
123use std::sync::Arc;
124
125use aes_gcm::aead::{Aead, KeyInit, Payload};
126use aes_gcm::{Aes256Gcm, Key, Nonce};
127use bytes::Bytes;
128use md5::{Digest as Md5Digest, Md5};
129use rand::RngCore;
130use thiserror::Error;
131
132use crate::kms::{KmsBackend, KmsError, WrappedDek};
133
// Frame magics — the first 4 bytes of every encrypted body identify
// the wire-format generation (sniffed by the GET path).
pub const SSE_MAGIC_V1: &[u8; 4] = b"S4E1";
pub const SSE_MAGIC_V2: &[u8; 4] = b"S4E2";
pub const SSE_MAGIC_V3: &[u8; 4] = b"S4E3";
pub const SSE_MAGIC_V4: &[u8; 4] = b"S4E4";
/// v0.8 #52: chunked variant of S4E2 — same SSE-S4 keyring source,
/// but the body is sliced into independently-sealed AES-GCM chunks
/// so the GET path can stream-decrypt + emit chunk-by-chunk instead
/// of buffering the entire object before tag verify. See
/// [`encrypt_v2_chunked`] / [`decrypt_chunked_stream`] for the on-
/// the-wire layout.
///
/// **Read-only as of v0.8.1 #57** — new PUTs emit [`SSE_MAGIC_V6`]
/// (S4E6). S4E5 is kept around for back-compat decrypt of objects
/// written by v0.8.0.
pub const SSE_MAGIC_V5: &[u8; 4] = b"S4E5";
/// v0.8.1 #57: identical layout to S4E5 except the per-PUT salt is
/// widened from 4 → 8 bytes so the birthday-collision threshold on
/// AES-GCM nonce reuse jumps from ~65k PUTs/key to ~4 billion. See
/// [`encrypt_v2_chunked`] (now emits S4E6) / the S4E6 wire-format
/// docs further down for the full layout.
pub const SSE_MAGIC_V6: &[u8; 4] = b"S4E6";
/// Back-compat alias — v0.4 callers that imported `SSE_MAGIC` mean S4E1.
pub const SSE_MAGIC: &[u8; 4] = SSE_MAGIC_V1;

/// Header layout matches between S4E1 and S4E2 (both 36 bytes total)
/// because S4E2 reuses the 3-byte reserved slot to fit `key_id (2B) +
/// reserved (1B)`. Keeping them the same length means the rest of the
/// pipeline (sidecar offsets, multipart math) doesn't care which
/// frame variant is in flight.
pub const SSE_HEADER_BYTES: usize = 4 + 1 + 3 + 12 + 16; // = 36
/// S4E3 (SSE-C) replaces the 3-byte reserved area with a 16-byte
/// customer-key MD5 fingerprint, so the header is 49 bytes total.
/// `magic 4 + algo 1 + key_md5 16 + nonce 12 + tag 16`.
pub const SSE_HEADER_BYTES_V3: usize = 4 + 1 + KEY_MD5_LEN + 12 + 16; // = 49
/// Algorithm tag stored in every frame; 1 = AES-256-GCM is the only
/// value this build defines.
pub const ALGO_AES_256_GCM: u8 = 1;
/// AES-GCM 96-bit nonce, freshly randomized per object.
const NONCE_LEN: usize = 12;
/// AES-GCM 128-bit authentication tag.
const TAG_LEN: usize = 16;
/// AES-256 key length in bytes.
const KEY_LEN: usize = 32;
/// MD5 digest length (SSE-C customer-key fingerprint).
const KEY_MD5_LEN: usize = 16;
/// AWS S3 SSE-C only allows AES256 in the
/// `x-amz-server-side-encryption-customer-algorithm` header, so we
/// match that exact spelling for parity with real S3 clients.
pub const SSE_C_ALGORITHM: &str = "AES256";
177
#[derive(Debug, Error)]
pub enum SseError {
    #[error("SSE key file {path:?}: {source}")]
    KeyFileIo {
        path: std::path::PathBuf,
        source: std::io::Error,
    },
    #[error(
        "SSE key file must be exactly 32 raw bytes (or 64-char hex / 44-char base64); got {got} bytes after parse"
    )]
    BadKeyLength { got: usize },
    #[error("SSE-encrypted body too short ({got} bytes; need at least {SSE_HEADER_BYTES})")]
    TooShort { got: usize },
    #[error("SSE bad magic: expected S4E1/S4E2/S4E3/S4E4/S4E5/S4E6, got {got:?}")]
    BadMagic { got: [u8; 4] },
    #[error("SSE unsupported algo tag: {tag} (this build only knows AES-256-GCM = 1)")]
    UnsupportedAlgo { tag: u8 },
    #[error(
        "SSE key_id {id} (S4E2 frame) not present in keyring; rotation history likely incomplete"
    )]
    KeyNotInKeyring { id: u16 },
    #[error("SSE decryption / authentication failed (key mismatch or ciphertext tampered with)")]
    DecryptFailed,
    // --- v0.5 #27: SSE-C specific errors ---
    /// The MD5 fingerprint stored in the S4E3 frame doesn't match the
    /// MD5 of the customer key the client supplied. This is the
    /// "wrong customer key on GET" signal — distinct from
    /// `DecryptFailed` so service.rs can map it to AWS S3's
    /// `403 AccessDenied` (S3 returns AccessDenied when the supplied
    /// SSE-C key doesn't match the one used at PUT time).
    #[error("SSE-C key MD5 fingerprint mismatch — client supplied a different key than PUT")]
    WrongCustomerKey,
    /// `parse_customer_key_headers` saw a malformed input. `reason` is
    /// a short human string ("base64 decode of key", "key length",
    /// "md5 length", "md5 mismatch") for operator log lines — never
    /// echoed to the client (would leak crypto details).
    #[error("SSE-C customer-key headers invalid: {reason}")]
    InvalidCustomerKey { reason: &'static str },
    /// Client asked for an SSE-C algorithm the gateway doesn't speak.
    /// AWS S3 only ever defines `AES256` here; surfacing the offending
    /// string lets us 400 with a useful message.
    #[error("SSE-C algorithm {algo:?} unsupported (only {SSE_C_ALGORITHM:?} is allowed)")]
    CustomerKeyAlgorithmUnsupported { algo: String },
    /// S4E3 body lacks an SSE-C key — caller passed `SseSource::Keyring`
    /// when decrypting an SSE-C-encrypted object. service.rs should
    /// translate this into the same "missing customer key" 400 that
    /// AWS S3 returns when SSE-C headers are absent on a GET.
    #[error("S4E3 frame requires SseSource::CustomerKey; got Keyring")]
    CustomerKeyRequired,
    /// Inverse: client sent SSE-C headers on a GET for an object stored
    /// without SSE-C. The supplied key has no role in decryption, but
    /// AWS S3 actually 400s in this case ("expected an unencrypted
    /// object" / "extraneous SSE-C headers"), so we mirror that.
    #[error("S4E1/S4E2 frame stored without SSE-C; SseSource::CustomerKey is unexpected")]
    CustomerKeyUnexpected,
    // --- v0.5 #28: SSE-KMS specific errors ---
    /// `decrypt` (sync) was handed an S4E4 body. SSE-KMS unwrap is
    /// async (it round-trips to the KMS backend), so callers must
    /// peek the magic with [`peek_magic`] and dispatch S4E4 frames to
    /// [`decrypt_with_kms`] instead. service.rs's GET handler does
    /// this; tests / direct callers may hit this if they forget.
    #[error(
        "S4E4 (SSE-KMS) body requires async decrypt — call decrypt_with_kms() instead of decrypt()"
    )]
    KmsAsyncRequired,
    /// S4E4 frame is shorter than the minimum-possible header (38
    /// bytes for an empty `key_id` + empty `wrapped_dek`, which is
    /// itself impossible — we just sanity-check the floor).
    #[error("S4E4 frame too short ({got} bytes; need at least {min})")]
    KmsFrameTooShort { got: usize, min: usize },
    /// S4E4 declared a `key_id_len` or `wrapped_dek_len` that runs
    /// past the end of the body. Almost certainly truncation /
    /// corruption rather than tampering (tampering would fail the
    /// AES-GCM tag instead).
    #[error("S4E4 frame field length out of bounds: {what}")]
    KmsFrameFieldOob { what: &'static str },
    /// `key_id` field of an S4E4 frame is not valid UTF-8. We require
    /// UTF-8 because `LocalKms` uses the basename of a `.kek` file
    /// (which is OS-string-but-typically-UTF-8) and AWS KMS uses ARNs
    /// (which are ASCII).
    #[error("S4E4 key_id is not valid UTF-8")]
    KmsKeyIdNotUtf8,
    /// service.rs handed `decrypt_with_kms` a `WrappedDek` whose
    /// `key_id` doesn't match the one stored in the S4E4 frame. This
    /// is an integration bug (caller is meant to pull the wrapped
    /// DEK *from the frame*, not from somewhere else), surface as a
    /// distinct error so it shows up in tests rather than silently
    /// failing the AES-GCM tag.
    #[error(
        "S4E4 SseSource::Kms wrapped DEK key_id {supplied:?} doesn't match frame key_id {stored:?}"
    )]
    KmsWrappedDekMismatch {
        supplied: String,
        stored: String,
    },
    /// SSE-KMS path got a non-Kms `SseSource` for an S4E4 body. The
    /// async dispatch in `decrypt_with_kms` re-derives the source
    /// internally so this can only happen if a future caller passes
    /// `SseSource::Keyring` / `CustomerKey` to a path that expected
    /// `Kms` — kept around for symmetry with the other "wrong source"
    /// errors.
    #[error("S4E4 frame requires SseSource::Kms")]
    KmsRequired,
    /// Pass-through for [`crate::kms::KmsError`] surfaced from
    /// `KmsBackend::decrypt_dek` — boxed so the variant stays small.
    #[error("KMS unwrap: {0}")]
    KmsBackend(#[from] KmsError),
    // --- v0.8 #52: S4E5 (chunked SSE-S4) specific errors ---
    /// AES-GCM auth tag verify failed on chunk `chunk_index` of an
    /// S4E5 body. Distinct from the all-or-nothing
    /// [`SseError::DecryptFailed`] because the streaming GET may
    /// have already emitted earlier chunks to the client by the
    /// time chunk N fails — operators need the chunk index in audit
    /// logs to triangulate which byte range was tampered with (or
    /// which disk sector flipped).
    #[error("S4E5 chunk {chunk_index} auth tag verify failed (key mismatch or chunk tampered with)")]
    ChunkAuthFailed { chunk_index: u32 },
    /// Caller asked [`encrypt_v2_chunked`] to use a chunk size of 0
    /// — nonsensical (would loop forever). Surfaced as an error
    /// rather than panicking so service.rs can map a bad
    /// `--sse-chunk-size 0` configuration to a clear startup error.
    #[error("S4E5 chunk_size must be > 0 (got 0)")]
    ChunkSizeInvalid,
    /// S4E5 frame is shorter than the fixed header or declares a
    /// (chunk_count × per-chunk-bytes) total that overruns the
    /// body. Almost certainly truncation / corruption — tampering
    /// with the per-chunk ciphertext or tag would surface as
    /// [`SseError::ChunkAuthFailed`] instead.
    #[error("S4E5 frame truncated: {what}")]
    ChunkFrameTruncated { what: &'static str },
    // --- v0.8.1 #57: S4E6 (8-byte salt, 24-bit chunk_index) ---
    /// S4E6 chunk_index is encoded as a 24-bit big-endian field in
    /// the per-chunk nonce, capping `chunk_count` at
    /// `2^24 - 1 = 16_777_215`. At the default 1 MiB chunk size that
    /// is ~16 TiB per object (2^24 × 2^20 B = 2^44 B) — well past
    /// S3's 5 GiB single-object ceiling. Surface as a distinct error
    /// so a misconfiguration (`--sse-chunk-size 1` on a multi-GiB
    /// object, say) shows up at PUT time with a clear cause rather
    /// than a panic at the u32 → u24 cast.
    #[error(
        "S4E6 chunk_count {got} exceeds 24-bit max ({max}) — pick a larger --sse-chunk-size"
    )]
    ChunkCountTooLarge { got: u32, max: u32 },
}
322
/// 32-byte symmetric key. `bytes` is `pub` so call sites can construct
/// keys directly from already-validated bytes (e.g. KMS-decrypted DEKs)
/// without going through the on-disk parser. Hold inside an `Arc` when
/// sharing across handler tasks — `SseKeyring` does this internally.
pub struct SseKey {
    // Raw AES-256 key material. Redacted from `Debug` output (see the
    // manual impl below) so it can't leak into logs.
    pub bytes: [u8; 32],
}
330
331impl SseKey {
332    /// Load a 32-byte key from disk. Accepts three on-disk encodings:
333    /// raw 32 bytes, 64-char ASCII hex, or 44-char ASCII base64 (with or
334    /// without padding). Whitespace is trimmed.
335    pub fn from_path(path: &Path) -> Result<Self, SseError> {
336        let raw = std::fs::read(path).map_err(|source| SseError::KeyFileIo {
337            path: path.to_path_buf(),
338            source,
339        })?;
340        Self::from_bytes(&raw)
341    }
342
343    pub fn from_bytes(bytes: &[u8]) -> Result<Self, SseError> {
344        // Try raw first.
345        if bytes.len() == KEY_LEN {
346            let mut k = [0u8; KEY_LEN];
347            k.copy_from_slice(bytes);
348            return Ok(Self { bytes: k });
349        }
350        // Trim whitespace and try hex / base64.
351        let s = std::str::from_utf8(bytes).unwrap_or("").trim();
352        if s.len() == KEY_LEN * 2 && s.chars().all(|c| c.is_ascii_hexdigit()) {
353            let mut k = [0u8; KEY_LEN];
354            for (i, k_byte) in k.iter_mut().enumerate() {
355                *k_byte = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16)
356                    .map_err(|_| SseError::BadKeyLength { got: bytes.len() })?;
357            }
358            return Ok(Self { bytes: k });
359        }
360        if let Ok(decoded) =
361            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s.as_bytes())
362            && decoded.len() == KEY_LEN
363        {
364            let mut k = [0u8; KEY_LEN];
365            k.copy_from_slice(&decoded);
366            return Ok(Self { bytes: k });
367        }
368        Err(SseError::BadKeyLength { got: bytes.len() })
369    }
370
371    fn as_aes_key(&self) -> &Key<Aes256Gcm> {
372        Key::<Aes256Gcm>::from_slice(&self.bytes)
373    }
374}
375
376impl std::fmt::Debug for SseKey {
377    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378        f.debug_struct("SseKey")
379            .field("len", &KEY_LEN)
380            .field("key", &"<redacted>")
381            .finish()
382    }
383}
384
/// v0.5 #29: a set of `SseKey`s indexed by `u16` key-id, plus a
/// designated **active** id used for new encryptions. Rotation is just
/// "add the new key, flip `active` to its id, leave the old keys for
/// decryption-only". Cheap to clone (`Arc<SseKey>` per slot).
#[derive(Clone)]
pub struct SseKeyring {
    // Id used by `encrypt_v2` for new objects; constructor guarantees
    // it is always present in `keys`.
    active: u16,
    // All known slots, including retired keys kept for decrypt-only reads.
    keys: HashMap<u16, Arc<SseKey>>,
}
394
395impl SseKeyring {
396    /// Create a keyring seeded with one key, immediately marked
397    /// active. Add older keys later via [`SseKeyring::add`] so the
398    /// gateway can still decrypt pre-rotation objects.
399    pub fn new(active: u16, key: Arc<SseKey>) -> Self {
400        let mut keys = HashMap::new();
401        keys.insert(active, key);
402        Self { active, keys }
403    }
404
405    /// Insert another key under id `id`. Does NOT change `active`. If
406    /// `id == active`, the slot is overwritten (useful for tests; in
407    /// production prefer minting a fresh id).
408    pub fn add(&mut self, id: u16, key: Arc<SseKey>) {
409        self.keys.insert(id, key);
410    }
411
412    /// Active (id, key) — used by [`encrypt_v2`] to pick the slot for
413    /// new objects.
414    pub fn active(&self) -> (u16, &SseKey) {
415        let id = self.active;
416        let key = self
417            .keys
418            .get(&id)
419            .expect("active key id must be present in keyring (constructor invariant)");
420        (id, key.as_ref())
421    }
422
423    /// Look up a key by id. Returns `None` for unknown ids — caller
424    /// should surface this as [`SseError::KeyNotInKeyring`].
425    pub fn get(&self, id: u16) -> Option<&SseKey> {
426        self.keys.get(&id).map(Arc::as_ref)
427    }
428}
429
430impl std::fmt::Debug for SseKeyring {
431    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432        f.debug_struct("SseKeyring")
433            .field("active", &self.active)
434            .field("key_count", &self.keys.len())
435            .field("key_ids", &self.keys.keys().collect::<Vec<_>>())
436            .finish()
437    }
438}
439
/// Cheaply-clonable handle to one keyring shared across handler tasks.
pub type SharedSseKeyring = Arc<SseKeyring>;
441
442/// Encrypt `plaintext` with the given key, producing the on-the-wire
443/// S4E1-framed output: `[magic 4][algo 1][reserved 3][nonce 12][tag 16][ciphertext]`.
444///
445/// Kept for back-compat: v0.4 callers that hand-built an `SseKey` (no
446/// keyring) still get the v1 frame. New code should use
447/// [`encrypt_v2`] which writes S4E2 and supports rotation on read.
448pub fn encrypt(key: &SseKey, plaintext: &[u8]) -> Bytes {
449    let cipher = Aes256Gcm::new(key.as_aes_key());
450    let mut nonce_bytes = [0u8; NONCE_LEN];
451    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
452    let nonce = Nonce::from_slice(&nonce_bytes);
453    // AAD = magic + algo. Tampering with either bumps the tag check.
454    let mut aad = [0u8; 8];
455    aad[..4].copy_from_slice(SSE_MAGIC_V1);
456    aad[4] = ALGO_AES_256_GCM;
457    let ct_with_tag = cipher
458        .encrypt(
459            nonce,
460            Payload {
461                msg: plaintext,
462                aad: &aad,
463            },
464        )
465        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
466    debug_assert!(ct_with_tag.len() >= TAG_LEN);
467    let split = ct_with_tag.len() - TAG_LEN;
468    let (ct, tag) = ct_with_tag.split_at(split);
469
470    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
471    out.extend_from_slice(SSE_MAGIC_V1);
472    out.push(ALGO_AES_256_GCM);
473    out.extend_from_slice(&[0u8; 3]); // reserved
474    out.extend_from_slice(&nonce_bytes);
475    out.extend_from_slice(tag);
476    out.extend_from_slice(ct);
477    Bytes::from(out)
478}
479
480/// v0.5 #29: encrypt under the keyring's currently-active key, writing
481/// an S4E2-framed body (`[magic 4][algo 1][key_id 2 BE][reserved 1]
482/// [nonce 12][tag 16][ciphertext]`). The key-id is included in the
483/// AAD so flipping it fails the auth tag.
484pub fn encrypt_v2(plaintext: &[u8], keyring: &SseKeyring) -> Bytes {
485    let (key_id, key) = keyring.active();
486    let cipher = Aes256Gcm::new(key.as_aes_key());
487    let mut nonce_bytes = [0u8; NONCE_LEN];
488    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
489    let nonce = Nonce::from_slice(&nonce_bytes);
490    let aad = aad_v2(key_id);
491    let ct_with_tag = cipher
492        .encrypt(
493            nonce,
494            Payload {
495                msg: plaintext,
496                aad: &aad,
497            },
498        )
499        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
500    debug_assert!(ct_with_tag.len() >= TAG_LEN);
501    let split = ct_with_tag.len() - TAG_LEN;
502    let (ct, tag) = ct_with_tag.split_at(split);
503
504    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
505    out.extend_from_slice(SSE_MAGIC_V2);
506    out.push(ALGO_AES_256_GCM);
507    out.extend_from_slice(&key_id.to_be_bytes()); // 2B BE key_id
508    out.push(0u8); // 1B reserved
509    out.extend_from_slice(&nonce_bytes);
510    out.extend_from_slice(tag);
511    out.extend_from_slice(ct);
512    Bytes::from(out)
513}
514
515fn aad_v1() -> [u8; 8] {
516    let mut aad = [0u8; 8];
517    aad[..4].copy_from_slice(SSE_MAGIC_V1);
518    aad[4] = ALGO_AES_256_GCM;
519    aad
520}
521
522fn aad_v2(key_id: u16) -> [u8; 8] {
523    let mut aad = [0u8; 8];
524    aad[..4].copy_from_slice(SSE_MAGIC_V2);
525    aad[4] = ALGO_AES_256_GCM;
526    aad[5..7].copy_from_slice(&key_id.to_be_bytes());
527    aad[7] = 0u8;
528    aad
529}
530
531/// AAD for S4E3 = magic (4) + algo (1) + key_md5 (16). Putting the
532/// fingerprint in the AAD means tampering with the stored MD5 (e.g. an
533/// attacker rewriting the header to match a *different* key they
534/// happen to know) breaks the AES-GCM tag — the wrong-key check isn't
535/// just a plain `==` we could be tricked past.
536fn aad_v3(key_md5: &[u8; KEY_MD5_LEN]) -> [u8; 4 + 1 + KEY_MD5_LEN] {
537    let mut aad = [0u8; 4 + 1 + KEY_MD5_LEN];
538    aad[..4].copy_from_slice(SSE_MAGIC_V3);
539    aad[4] = ALGO_AES_256_GCM;
540    aad[5..5 + KEY_MD5_LEN].copy_from_slice(key_md5);
541    aad
542}
543
/// Parsed + verified SSE-C key material from the three customer
/// headers. `key_md5` is the MD5 of `key` (we recompute and compare in
/// [`parse_customer_key_headers`] — clients send their own to catch
/// transport corruption, but we *trust* our own computation as the
/// canonical fingerprint in the S4E3 frame).
#[derive(Clone)]
pub struct CustomerKeyMaterial {
    // Client-supplied AES-256 key; never persisted by the gateway.
    pub key: [u8; KEY_LEN],
    // Server-computed MD5 of `key` — the canonical fingerprint stored
    // in the S4E3 frame.
    pub key_md5: [u8; KEY_MD5_LEN],
}
554
555impl std::fmt::Debug for CustomerKeyMaterial {
556    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
557        // Don't leak the key into logs. The MD5 is a public fingerprint
558        // (S3 puts it on the wire), so that's safe to show.
559        f.debug_struct("CustomerKeyMaterial")
560            .field("key", &"<redacted>")
561            .field("key_md5_hex", &hex_lower(&self.key_md5))
562            .finish()
563    }
564}
565
/// Lower-case hex encoding of `bytes` (e.g. `[0xab, 0x01]` → `"ab01"`).
fn hex_lower(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    let mut s = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // `write!` into a String is infallible and formats in place —
        // the previous `format!` per byte allocated a throwaway String
        // for every single byte.
        let _ = write!(s, "{b:02x}");
    }
    s
}
573
/// Source of the encryption key for [`encrypt_with_source`] /
/// [`decrypt`]. SSE-S4 (server-managed, rotation-aware) goes through
/// `Keyring`; SSE-C (customer-supplied) goes through `CustomerKey`;
/// SSE-KMS (envelope, per-object DEK) goes through `Kms`.
///
/// Borrowed (not owned) so the caller can hold a long-lived
/// `CustomerKeyMaterial` next to the request and just lend it for the
/// duration of one PUT/GET.
#[derive(Debug, Clone, Copy)]
pub enum SseSource<'a> {
    /// Server-managed keyring path → produces / consumes S4E1 (legacy)
    /// or S4E2 (rotation-aware) frames.
    Keyring(&'a SseKeyring),
    /// Client-supplied AES-256 key + its MD5 fingerprint → produces /
    /// consumes S4E3 frames. The server never persists the key; it
    /// stores `key_md5` only.
    CustomerKey {
        key: &'a [u8; KEY_LEN],
        key_md5: &'a [u8; KEY_MD5_LEN],
    },
    /// SSE-KMS envelope → produces / consumes S4E4 frames. The server
    /// holds a per-object plaintext DEK (from a fresh
    /// [`KmsBackend::generate_dek`] call) and the wrapped form to
    /// persist alongside the body. The DEK is dropped after one
    /// PUT/GET; only the wrapped form survives at rest.
    Kms {
        /// 32-byte plaintext DEK, used as the AES-GCM key.
        dek: &'a [u8; KEY_LEN],
        /// Wrapped form to persist in the S4E4 frame (PUT) or the one
        /// read out of the frame (GET, after a successful unwrap).
        wrapped: &'a WrappedDek,
    },
}
606
607/// Back-compat coercion: existing call sites pass `&SseKeyring`
608/// directly to [`decrypt`]. With this `From` impl the generic bound
609/// `Into<SseSource>` accepts `&SseKeyring` without the caller writing
610/// `.into()`, keeping v0.4 / v0.5 #29 service.rs callers compiling
611/// untouched while v0.5 #27 SSE-C callers pass `SseSource::CustomerKey`
612/// explicitly.
613impl<'a> From<&'a SseKeyring> for SseSource<'a> {
614    fn from(kr: &'a SseKeyring) -> Self {
615        SseSource::Keyring(kr)
616    }
617}
618
619/// service.rs holds keyring as `Option<Arc<SseKeyring>>` and unwraps to
620/// `&Arc<SseKeyring>` — let that coerce too, otherwise every existing
621/// call site needs `.as_ref()` boilerplate.
622impl<'a> From<&'a Arc<SseKeyring>> for SseSource<'a> {
623    fn from(kr: &'a Arc<SseKeyring>) -> Self {
624        SseSource::Keyring(kr.as_ref())
625    }
626}
627
628impl<'a> From<&'a CustomerKeyMaterial> for SseSource<'a> {
629    fn from(m: &'a CustomerKeyMaterial) -> Self {
630        SseSource::CustomerKey {
631            key: &m.key,
632            key_md5: &m.key_md5,
633        }
634    }
635}
636
637/// Parse the three AWS SSE-C headers and return verified key material.
638///
639/// Validates, in order:
640/// 1. `algorithm == "AES256"` (the only value AWS S3 defines).
641/// 2. `key_base64` decodes to exactly 32 bytes (AES-256 key length).
642/// 3. `key_md5_base64` decodes to exactly 16 bytes (MD5 digest length).
643/// 4. The actual MD5 of the decoded key matches the supplied MD5.
644///
645/// Step 4 catches transport corruption *and* a class of programming
646/// bugs where the client signs with one key but uploads another. AWS
647/// S3 also performs this check.
648pub fn parse_customer_key_headers(
649    algorithm: &str,
650    key_base64: &str,
651    key_md5_base64: &str,
652) -> Result<CustomerKeyMaterial, SseError> {
653    use base64::Engine as _;
654    if algorithm != SSE_C_ALGORITHM {
655        return Err(SseError::CustomerKeyAlgorithmUnsupported {
656            algo: algorithm.to_string(),
657        });
658    }
659    let key_bytes = base64::engine::general_purpose::STANDARD
660        .decode(key_base64.trim().as_bytes())
661        .map_err(|_| SseError::InvalidCustomerKey {
662            reason: "base64 decode of key",
663        })?;
664    if key_bytes.len() != KEY_LEN {
665        return Err(SseError::InvalidCustomerKey {
666            reason: "key length (must be 32 bytes after base64 decode)",
667        });
668    }
669    let supplied_md5 = base64::engine::general_purpose::STANDARD
670        .decode(key_md5_base64.trim().as_bytes())
671        .map_err(|_| SseError::InvalidCustomerKey {
672            reason: "base64 decode of key MD5",
673        })?;
674    if supplied_md5.len() != KEY_MD5_LEN {
675        return Err(SseError::InvalidCustomerKey {
676            reason: "key MD5 length (must be 16 bytes after base64 decode)",
677        });
678    }
679    let actual_md5 = compute_key_md5(&key_bytes);
680    // Constant-time compare — paranoia, MD5 is non-secret but the key
681    // it identifies is, so we don't want a timing oracle.
682    if !constant_time_eq(&actual_md5, &supplied_md5) {
683        return Err(SseError::InvalidCustomerKey {
684            reason: "supplied MD5 does not match MD5 of supplied key",
685        });
686    }
687    let mut key = [0u8; KEY_LEN];
688    key.copy_from_slice(&key_bytes);
689    let mut key_md5 = [0u8; KEY_MD5_LEN];
690    key_md5.copy_from_slice(&actual_md5);
691    Ok(CustomerKeyMaterial { key, key_md5 })
692}
693
694/// Convenience wrapper — compute the MD5 fingerprint of a 32-byte
695/// customer key. Callers that already have the bytes (e.g. derived
696/// from a KMS unwrap) can use this to construct a
697/// [`CustomerKeyMaterial`] directly.
698pub fn compute_key_md5(key: &[u8]) -> [u8; KEY_MD5_LEN] {
699    let mut h = Md5::new();
700    h.update(key);
701    let out = h.finalize();
702    let mut md5 = [0u8; KEY_MD5_LEN];
703    md5.copy_from_slice(&out);
704    md5
705}
706
/// `subtle`-free constant-time byte slice equality. We only need this
/// at one site (MD5 verification) so pulling `subtle` in feels excessive.
/// (Length mismatch short-circuits — lengths are public here.)
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b.iter())
            .fold(0u8, |acc, (x, y)| acc | (x ^ y))
            == 0
}
719
720/// v0.5 #27: encrypt under whichever source the caller picked.
721///
722/// - `SseSource::Keyring` → delegates to [`encrypt_v2`] (S4E2 frame).
723/// - `SseSource::CustomerKey` → writes an S4E3 frame (no key persisted,
724///   just the MD5 fingerprint for GET-side verification).
725///
726/// service.rs picks the source per-request: SSE-C headers present →
727/// `CustomerKey`, otherwise (and only when `--sse-s4-key` is wired) →
728/// `Keyring`. Plaintext objects skip this function entirely.
729pub fn encrypt_with_source(plaintext: &[u8], source: SseSource<'_>) -> Bytes {
730    match source {
731        SseSource::Keyring(kr) => encrypt_v2(plaintext, kr),
732        SseSource::CustomerKey { key, key_md5 } => encrypt_v3(plaintext, key, key_md5),
733        SseSource::Kms { dek, wrapped } => encrypt_v4(plaintext, dek, wrapped),
734    }
735}
736
737fn encrypt_v3(
738    plaintext: &[u8],
739    key: &[u8; KEY_LEN],
740    key_md5: &[u8; KEY_MD5_LEN],
741) -> Bytes {
742    let aes_key = Key::<Aes256Gcm>::from_slice(key);
743    let cipher = Aes256Gcm::new(aes_key);
744    let mut nonce_bytes = [0u8; NONCE_LEN];
745    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
746    let nonce = Nonce::from_slice(&nonce_bytes);
747    let aad = aad_v3(key_md5);
748    let ct_with_tag = cipher
749        .encrypt(
750            nonce,
751            Payload {
752                msg: plaintext,
753                aad: &aad,
754            },
755        )
756        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
757    debug_assert!(ct_with_tag.len() >= TAG_LEN);
758    let split = ct_with_tag.len() - TAG_LEN;
759    let (ct, tag) = ct_with_tag.split_at(split);
760
761    let mut out = Vec::with_capacity(SSE_HEADER_BYTES_V3 + ct.len());
762    out.extend_from_slice(SSE_MAGIC_V3);
763    out.push(ALGO_AES_256_GCM);
764    out.extend_from_slice(key_md5);
765    out.extend_from_slice(&nonce_bytes);
766    out.extend_from_slice(tag);
767    out.extend_from_slice(ct);
768    Bytes::from(out)
769}
770
771/// v0.5 #29 + v0.5 #27: dispatch on the body's magic and decrypt under
772/// whichever source the caller supplied.
773///
774/// - `S4E1` / `S4E2` require `SseSource::Keyring` (return
775///   [`SseError::CustomerKeyRequired`] for `CustomerKey` — service.rs
776///   should map this to "extraneous SSE-C headers" 400).
777/// - `S4E3` requires `SseSource::CustomerKey` (return
778///   [`SseError::CustomerKeyUnexpected`] for `Keyring` — service.rs
779///   should map this to "missing SSE-C headers" 400).
780///
781/// Generic over `Into<SseSource>` so existing `decrypt(body, &keyring)`
782/// call sites compile unchanged via the `From<&SseKeyring>` impl above
783/// — only the new SSE-C path needs to type out
784/// `SseSource::CustomerKey { .. }`.
785///
786/// Distinct errors (`KeyNotInKeyring`, `DecryptFailed`,
787/// `WrongCustomerKey`) let operators tell rotation gaps, ciphertext
788/// tampering, and SSE-C key mismatch apart in audit logs.
789pub fn decrypt<'a, S: Into<SseSource<'a>>>(body: &[u8], source: S) -> Result<Bytes, SseError> {
790    let source = source.into();
791    // Outer short-check uses the smaller of the two header sizes
792    // (S4E1/S4E2 = 36 bytes). Anything below this can't be any valid
793    // SSE frame regardless of magic — keeps back-compat with v0.4 /
794    // v0.5 #29 callers that expected `TooShort` for absurdly short
795    // inputs even when the magic is garbage.
796    if body.len() < SSE_HEADER_BYTES {
797        return Err(SseError::TooShort { got: body.len() });
798    }
799    let mut magic = [0u8; 4];
800    magic.copy_from_slice(&body[..4]);
801    match &magic {
802        m if m == SSE_MAGIC_V1 || m == SSE_MAGIC_V2 => {
803            let keyring = match source {
804                SseSource::Keyring(kr) => kr,
805                SseSource::CustomerKey { .. } => return Err(SseError::CustomerKeyUnexpected),
806                // S4E1/E2 stored under the keyring → SseSource::Kms
807                // is just as nonsensical as CustomerKey here. Re-use
808                // the same "wrong source" error so service.rs can
809                // map both to AWS S3's "extraneous SSE-* headers"
810                // 400.
811                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
812            };
813            if m == SSE_MAGIC_V1 {
814                decrypt_v1_with_keyring(body, keyring)
815            } else {
816                decrypt_v2_with_keyring(body, keyring)
817            }
818        }
819        m if m == SSE_MAGIC_V3 => {
820            // S4E3 has a larger 49-byte header, so re-check.
821            if body.len() < SSE_HEADER_BYTES_V3 {
822                return Err(SseError::TooShort { got: body.len() });
823            }
824            let (key, key_md5) = match source {
825                SseSource::CustomerKey { key, key_md5 } => (key, key_md5),
826                SseSource::Keyring(_) => return Err(SseError::CustomerKeyRequired),
827                SseSource::Kms { .. } => return Err(SseError::CustomerKeyRequired),
828            };
829            decrypt_v3(body, key, key_md5)
830        }
831        m if m == SSE_MAGIC_V4 => {
832            // SSE-KMS unwrap is async (KMS round-trip required).
833            // Caller must dispatch to `decrypt_with_kms` after
834            // peeking the magic — surface this as a distinct error
835            // rather than silently failing.
836            Err(SseError::KmsAsyncRequired)
837        }
838        m if m == SSE_MAGIC_V5 || m == SSE_MAGIC_V6 => {
839            // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): chunked SSE-S4.
840            // Sync back-compat path — verifies + decrypts every
841            // chunk into a single Bytes. Callers that want true
842            // streaming (per-chunk emit) should use
843            // `decrypt_chunked_stream` instead. SSE-C and SSE-KMS
844            // sources are nonsensical here for the same reason as
845            // S4E2 (server-managed keyring only).
846            let keyring = match source {
847                SseSource::Keyring(kr) => kr,
848                SseSource::CustomerKey { .. } => {
849                    return Err(SseError::CustomerKeyUnexpected);
850                }
851                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
852            };
853            decrypt_chunked_buffered(body, keyring)
854        }
855        _ => Err(SseError::BadMagic { got: magic }),
856    }
857}
858
859fn decrypt_v3(
860    body: &[u8],
861    key: &[u8; KEY_LEN],
862    supplied_md5: &[u8; KEY_MD5_LEN],
863) -> Result<Bytes, SseError> {
864    let algo = body[4];
865    if algo != ALGO_AES_256_GCM {
866        return Err(SseError::UnsupportedAlgo { tag: algo });
867    }
868    let mut stored_md5 = [0u8; KEY_MD5_LEN];
869    stored_md5.copy_from_slice(&body[5..5 + KEY_MD5_LEN]);
870    // Cheap fingerprint check first — if the supplied key has a
871    // different MD5 than what was used at PUT, fail fast with a
872    // dedicated error. AES-GCM auth would also catch this (different
873    // key → bad tag) but the bespoke error gives operators an audit
874    // signal distinct from "ciphertext was tampered with".
875    if !constant_time_eq(supplied_md5, &stored_md5) {
876        return Err(SseError::WrongCustomerKey);
877    }
878    let nonce_off = 5 + KEY_MD5_LEN;
879    let tag_off = nonce_off + NONCE_LEN;
880    let mut nonce_bytes = [0u8; NONCE_LEN];
881    nonce_bytes.copy_from_slice(&body[nonce_off..nonce_off + NONCE_LEN]);
882    let mut tag_bytes = [0u8; TAG_LEN];
883    tag_bytes.copy_from_slice(&body[tag_off..tag_off + TAG_LEN]);
884    let ct = &body[SSE_HEADER_BYTES_V3..];
885
886    let aad = aad_v3(&stored_md5);
887    let nonce = Nonce::from_slice(&nonce_bytes);
888    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
889    ct_with_tag.extend_from_slice(ct);
890    ct_with_tag.extend_from_slice(&tag_bytes);
891
892    let aes_key = Key::<Aes256Gcm>::from_slice(key);
893    let cipher = Aes256Gcm::new(aes_key);
894    let plain = cipher
895        .decrypt(
896            nonce,
897            Payload {
898                msg: &ct_with_tag,
899                aad: &aad,
900            },
901        )
902        .map_err(|_| SseError::DecryptFailed)?;
903    Ok(Bytes::from(plain))
904}
905
906/// AAD for S4E4 = magic (4) + algo (1) + key_id_len (1) + key_id +
907/// wrapped_dek_len (4 BE) + wrapped_dek. Putting the variable-length
908/// key_id and wrapped_dek into the AAD means an attacker cannot
909/// rewrite either field to redirect the gateway to a different KEK
910/// or wrapped DEK without invalidating the body's AES-GCM tag.
911///
912/// Length-prefixing key_id and wrapped_dek inside the AAD prevents a
913/// canonicalisation ambiguity: without the length prefix, an
914/// attacker could shift bytes between the two fields and produce the
915/// same AAD bytestream, defeating the per-field tampering check.
916fn aad_v4(key_id: &[u8], wrapped_dek: &[u8]) -> Vec<u8> {
917    let mut aad = Vec::with_capacity(4 + 1 + 1 + key_id.len() + 4 + wrapped_dek.len());
918    aad.extend_from_slice(SSE_MAGIC_V4);
919    aad.push(ALGO_AES_256_GCM);
920    aad.push(key_id.len() as u8);
921    aad.extend_from_slice(key_id);
922    aad.extend_from_slice(&(wrapped_dek.len() as u32).to_be_bytes());
923    aad.extend_from_slice(wrapped_dek);
924    aad
925}
926
/// Seal `plaintext` into an S4E4 (SSE-KMS) frame: magic, algo, a
/// length-prefixed KMS key_id, a length-prefixed wrapped DEK, then
/// nonce, tag, ciphertext. The body is encrypted under the
/// *plaintext* DEK; key_id and wrapped DEK are bound into the AAD
/// (see [`aad_v4`]) so neither can be rewritten without failing the
/// body's auth tag.
fn encrypt_v4(plaintext: &[u8], dek: &[u8; KEY_LEN], wrapped: &WrappedDek) -> Bytes {
    // Pre-conditions: key_id must fit in a u8 length prefix and be
    // non-empty (an empty id means we wouldn't be able to re-fetch
    // the KEK on GET). wrapped_dek length fits in u32 by the same
    // logic — at u32::MAX bytes you have bigger problems. NOTE:
    // `assert!` stays active in release builds, so violating either
    // pre-condition panics there too (nothing is ever silently
    // truncated); in practice key_id is a UUID or ARN (<256 chars)
    // and wrapped_dek is 60 bytes (LocalKms) or ~200 bytes (AWS KMS).
    assert!(
        !wrapped.key_id.is_empty() && wrapped.key_id.len() <= u8::MAX as usize,
        "S4E4 key_id must be 1..=255 bytes (got {})",
        wrapped.key_id.len()
    );
    assert!(
        wrapped.ciphertext.len() <= u32::MAX as usize,
        "S4E4 wrapped_dek longer than u32::MAX",
    );

    let aes_key = Key::<Aes256Gcm>::from_slice(dek);
    let cipher = Aes256Gcm::new(aes_key);
    // Fresh random 12-byte nonce per object.
    let mut nonce_bytes = [0u8; NONCE_LEN];
    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
    let nonce = Nonce::from_slice(&nonce_bytes);
    let aad = aad_v4(wrapped.key_id.as_bytes(), &wrapped.ciphertext);
    let ct_with_tag = cipher
        .encrypt(
            nonce,
            Payload {
                msg: plaintext,
                aad: &aad,
            },
        )
        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
    // aes-gcm returns ciphertext || tag; split so the frame can store
    // the tag ahead of the ciphertext.
    debug_assert!(ct_with_tag.len() >= TAG_LEN);
    let split = ct_with_tag.len() - TAG_LEN;
    let (ct, tag) = ct_with_tag.split_at(split);

    // Assemble the frame: variable-length header, then nonce, tag, ct.
    let key_id_bytes = wrapped.key_id.as_bytes();
    let mut out = Vec::with_capacity(
        4 + 1 + 1 + key_id_bytes.len() + 4 + wrapped.ciphertext.len() + NONCE_LEN + TAG_LEN + ct.len(),
    );
    out.extend_from_slice(SSE_MAGIC_V4);
    out.push(ALGO_AES_256_GCM);
    out.push(key_id_bytes.len() as u8);
    out.extend_from_slice(key_id_bytes);
    out.extend_from_slice(&(wrapped.ciphertext.len() as u32).to_be_bytes());
    out.extend_from_slice(&wrapped.ciphertext);
    out.extend_from_slice(&nonce_bytes);
    out.extend_from_slice(tag);
    out.extend_from_slice(ct);
    Bytes::from(out)
}
979
/// Parsed view of an S4E4 frame's variable-length header. Returned
/// by [`parse_s4e4_header`] so both the async [`decrypt_with_kms`]
/// path and any future inspection code (e.g. an admin tool that
/// needs to enumerate object → KMS-key bindings) can reuse the same
/// parser without re-implementing offset math.
///
/// All fields borrow directly from the frame bytes — no copies.
#[derive(Debug)]
pub struct S4E4Header<'a> {
    /// UTF-8 KMS key id from the u8-length-prefixed header field.
    pub key_id: &'a str,
    /// KMS-wrapped DEK ciphertext (opaque to this module; only the
    /// KMS backend can unwrap it).
    pub wrapped_dek: &'a [u8],
    /// 12-byte AES-GCM nonce.
    pub nonce: &'a [u8],
    /// 16-byte AES-GCM auth tag.
    pub tag: &'a [u8],
    /// Encrypted object body (everything after the tag).
    pub ciphertext: &'a [u8],
}
993
994/// Parse the (variable-length) S4E4 header. Pure byte-shuffling — no
995/// crypto, no KMS round-trip. Returns errors on truncation /
996/// out-of-bounds field lengths / non-UTF-8 key_id.
997pub fn parse_s4e4_header(body: &[u8]) -> Result<S4E4Header<'_>, SseError> {
998    // Minimum: magic(4) + algo(1) + key_id_len(1) + key_id(>=1) +
999    // wrapped_dek_len(4) + wrapped_dek(>=1) + nonce(12) + tag(16)
1000    // = 40 bytes. We use a slightly looser floor here (bytes for
1001    // empty fields = 38) and let the per-field bounds checks below
1002    // catch the actual short reads.
1003    const S4E4_MIN: usize = 4 + 1 + 1 + 4 + NONCE_LEN + TAG_LEN; // 38
1004    if body.len() < S4E4_MIN {
1005        return Err(SseError::KmsFrameTooShort {
1006            got: body.len(),
1007            min: S4E4_MIN,
1008        });
1009    }
1010    let magic = &body[..4];
1011    if magic != SSE_MAGIC_V4 {
1012        let mut got = [0u8; 4];
1013        got.copy_from_slice(magic);
1014        return Err(SseError::BadMagic { got });
1015    }
1016    let algo = body[4];
1017    if algo != ALGO_AES_256_GCM {
1018        return Err(SseError::UnsupportedAlgo { tag: algo });
1019    }
1020    let key_id_len = body[5] as usize;
1021    let key_id_off: usize = 6;
1022    let key_id_end = key_id_off
1023        .checked_add(key_id_len)
1024        .ok_or(SseError::KmsFrameFieldOob { what: "key_id_len" })?;
1025    if key_id_end + 4 > body.len() {
1026        return Err(SseError::KmsFrameFieldOob { what: "key_id" });
1027    }
1028    let key_id = std::str::from_utf8(&body[key_id_off..key_id_end])
1029        .map_err(|_| SseError::KmsKeyIdNotUtf8)?;
1030    let wrapped_len_off = key_id_end;
1031    let wrapped_dek_len = u32::from_be_bytes([
1032        body[wrapped_len_off],
1033        body[wrapped_len_off + 1],
1034        body[wrapped_len_off + 2],
1035        body[wrapped_len_off + 3],
1036    ]) as usize;
1037    let wrapped_off = wrapped_len_off + 4;
1038    let wrapped_end = wrapped_off
1039        .checked_add(wrapped_dek_len)
1040        .ok_or(SseError::KmsFrameFieldOob { what: "wrapped_dek_len" })?;
1041    if wrapped_end + NONCE_LEN + TAG_LEN > body.len() {
1042        return Err(SseError::KmsFrameFieldOob { what: "wrapped_dek" });
1043    }
1044    let wrapped_dek = &body[wrapped_off..wrapped_end];
1045    let nonce_off = wrapped_end;
1046    let tag_off = nonce_off + NONCE_LEN;
1047    let ct_off = tag_off + TAG_LEN;
1048    let nonce = &body[nonce_off..nonce_off + NONCE_LEN];
1049    let tag = &body[tag_off..tag_off + TAG_LEN];
1050    let ciphertext = &body[ct_off..];
1051    Ok(S4E4Header {
1052        key_id,
1053        wrapped_dek,
1054        nonce,
1055        tag,
1056        ciphertext,
1057    })
1058}
1059
1060/// Async decrypt for S4E4 (SSE-KMS) bodies. Caller supplies the KMS
1061/// backend; this function parses the frame, calls
1062/// `kms.decrypt_dek(...)` to unwrap the DEK, then runs AES-256-GCM
1063/// to recover the plaintext.
1064///
1065/// service.rs's GET handler should peek the magic with [`peek_magic`]
1066/// and dispatch:
1067///
1068/// - `Some("S4E4")` → `decrypt_with_kms(blob, &*kms).await`
1069/// - everything else → existing sync `decrypt(blob, source)`
1070///
1071/// Note: we don't go through `SseSource::Kms` here because the
1072/// wrapped DEK + key_id come from the frame itself, not from the
1073/// request — the `SseSource` is built for sync paths where the
1074/// caller already knows the key.
1075pub async fn decrypt_with_kms(
1076    body: &[u8],
1077    kms: &dyn KmsBackend,
1078) -> Result<Bytes, SseError> {
1079    let hdr = parse_s4e4_header(body)?;
1080    let wrapped = WrappedDek {
1081        key_id: hdr.key_id.to_string(),
1082        ciphertext: hdr.wrapped_dek.to_vec(),
1083    };
1084    let dek_vec = kms.decrypt_dek(&wrapped).await?;
1085    if dek_vec.len() != KEY_LEN {
1086        // KMS returned a non-32-byte plaintext. AES-256 needs exactly
1087        // 32 bytes. This shouldn't happen with `KeySpec=AES_256` but
1088        // surface as a backend error so it's auditable rather than
1089        // panicking.
1090        return Err(SseError::KmsBackend(KmsError::BackendUnavailable {
1091            message: format!(
1092                "KMS returned {} byte DEK; expected {KEY_LEN}",
1093                dek_vec.len()
1094            ),
1095        }));
1096    }
1097    let mut dek = [0u8; KEY_LEN];
1098    dek.copy_from_slice(&dek_vec);
1099
1100    let aad = aad_v4(hdr.key_id.as_bytes(), hdr.wrapped_dek);
1101    let aes_key = Key::<Aes256Gcm>::from_slice(&dek);
1102    let cipher = Aes256Gcm::new(aes_key);
1103    let nonce = Nonce::from_slice(hdr.nonce);
1104    let mut ct_with_tag = Vec::with_capacity(hdr.ciphertext.len() + TAG_LEN);
1105    ct_with_tag.extend_from_slice(hdr.ciphertext);
1106    ct_with_tag.extend_from_slice(hdr.tag);
1107    let plain = cipher
1108        .decrypt(
1109            nonce,
1110            Payload {
1111                msg: &ct_with_tag,
1112                aad: &aad,
1113            },
1114        )
1115        .map_err(|_| SseError::DecryptFailed)?;
1116    Ok(Bytes::from(plain))
1117}
1118
1119fn decrypt_v1_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1120    let algo = body[4];
1121    if algo != ALGO_AES_256_GCM {
1122        return Err(SseError::UnsupportedAlgo { tag: algo });
1123    }
1124    // body[5..8] reserved (must be ignored — v0.4 wrote zeros, but we
1125    // didn't auth them so we can't insist on it).
1126    let mut nonce_bytes = [0u8; NONCE_LEN];
1127    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1128    let mut tag_bytes = [0u8; TAG_LEN];
1129    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1130    let ct = &body[SSE_HEADER_BYTES..];
1131
1132    let aad = aad_v1();
1133    let nonce = Nonce::from_slice(&nonce_bytes);
1134    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1135    ct_with_tag.extend_from_slice(ct);
1136    ct_with_tag.extend_from_slice(&tag_bytes);
1137
1138    // Active key first, then any others. v0.4 deployments that flip to
1139    // v0.5 with their original key as active hit this path on the
1140    // first try for every legacy object.
1141    let (active_id, _active_key) = keyring.active();
1142    let mut ids: Vec<u16> = keyring.keys.keys().copied().collect();
1143    ids.sort_by_key(|id| if *id == active_id { 0 } else { 1 });
1144    for id in ids {
1145        let key = keyring.get(id).expect("id came from keyring iteration");
1146        let cipher = Aes256Gcm::new(key.as_aes_key());
1147        if let Ok(plain) = cipher.decrypt(
1148            nonce,
1149            Payload {
1150                msg: &ct_with_tag,
1151                aad: &aad,
1152            },
1153        ) {
1154            return Ok(Bytes::from(plain));
1155        }
1156    }
1157    Err(SseError::DecryptFailed)
1158}
1159
1160fn decrypt_v2_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1161    let algo = body[4];
1162    if algo != ALGO_AES_256_GCM {
1163        return Err(SseError::UnsupportedAlgo { tag: algo });
1164    }
1165    let key_id = u16::from_be_bytes([body[5], body[6]]);
1166    // body[7] reserved (1B), authenticated as 0 via AAD.
1167    let key = keyring
1168        .get(key_id)
1169        .ok_or(SseError::KeyNotInKeyring { id: key_id })?;
1170    let mut nonce_bytes = [0u8; NONCE_LEN];
1171    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1172    let mut tag_bytes = [0u8; TAG_LEN];
1173    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1174    let ct = &body[SSE_HEADER_BYTES..];
1175
1176    let aad = aad_v2(key_id);
1177    let nonce = Nonce::from_slice(&nonce_bytes);
1178    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1179    ct_with_tag.extend_from_slice(ct);
1180    ct_with_tag.extend_from_slice(&tag_bytes);
1181    let cipher = Aes256Gcm::new(key.as_aes_key());
1182    let plain = cipher
1183        .decrypt(
1184            nonce,
1185            Payload {
1186                msg: &ct_with_tag,
1187                aad: &aad,
1188            },
1189        )
1190        .map_err(|_| SseError::DecryptFailed)?;
1191    Ok(Bytes::from(plain))
1192}
1193
1194/// Detect whether `body` is SSE-S4 encrypted (S4E1, S4E2, S4E3, or
1195/// S4E4) by sniffing the first 4 magic bytes. Used by the GET path
1196/// to decide whether to run decryption before frame parsing.
1197///
1198/// We require a length check that's safe for *any* of the four
1199/// frames — `SSE_HEADER_BYTES` (36) is the smallest valid header
1200/// (S4E1 / S4E2). S4E3 is 49 bytes; S4E4 is variable but always >=
1201/// 38 bytes. The per-frame decrypt path re-checks the appropriate
1202/// minimum, so this 36-byte gate is just a fast rejection of
1203/// obviously-too-short bodies.
1204pub fn looks_encrypted(body: &[u8]) -> bool {
1205    if body.len() < SSE_HEADER_BYTES {
1206        return false;
1207    }
1208    let m = &body[..4];
1209    m == SSE_MAGIC_V1
1210        || m == SSE_MAGIC_V2
1211        || m == SSE_MAGIC_V3
1212        || m == SSE_MAGIC_V4
1213        || m == SSE_MAGIC_V5
1214        || m == SSE_MAGIC_V6
1215}
1216
1217/// Peek the SSE-S4 magic at the front of `body`, returning a
1218/// stringified frame variant identifier or `None` if `body` is not
1219/// recognized as SSE-S4. Used by the GET path to dispatch between
1220/// the sync [`decrypt`] (S4E1/E2/E3) and the async
1221/// [`decrypt_with_kms`] (S4E4).
1222///
1223/// Returns the same length-gated result as [`looks_encrypted`]: any
1224/// body shorter than `SSE_HEADER_BYTES` (36 bytes) returns `None`,
1225/// so the caller can use this as both the "is encrypted" signal and
1226/// the "which frame" signal in one cheap byte-comparison.
1227pub fn peek_magic(body: &[u8]) -> Option<&'static str> {
1228    if body.len() < SSE_HEADER_BYTES {
1229        return None;
1230    }
1231    match &body[..4] {
1232        m if m == SSE_MAGIC_V1 => Some("S4E1"),
1233        m if m == SSE_MAGIC_V2 => Some("S4E2"),
1234        m if m == SSE_MAGIC_V3 => Some("S4E3"),
1235        m if m == SSE_MAGIC_V4 => Some("S4E4"),
1236        // v0.8 #52: chunked SSE-S4. service.rs's GET handler
1237        // dispatches "S4E5" / "S4E6" → `decrypt_chunked_stream`
1238        // for true streaming GET; the sync `decrypt(...)` also
1239        // accepts both (back-compat — buffered concat).
1240        m if m == SSE_MAGIC_V5 => Some("S4E5"),
1241        // v0.8.1 #57: same dispatch as S4E5 — wider salt only.
1242        m if m == SSE_MAGIC_V6 => Some("S4E6"),
1243        _ => None,
1244    }
1245}
1246
/// Reference-counted handle to a single SSE key, cheap to clone and
/// share across request handlers.
pub type SharedSseKey = Arc<SseKey>;
1248
1249// ===========================================================================
1250// v0.8 #52 (S4E5, read-only) + v0.8.1 #57 (S4E6, current emit) —
1251// chunked variant of S4E2 for streaming GET
1252// ===========================================================================
1253//
1254// ## S4E5 wire format (v0.8 #52, **read-only as of v0.8.1 #57**)
1255//
1256// ```text
1257// magic         4B    "S4E5"
1258// algo          1B    0x01 (AES-256-GCM)
1259// key_id        2B    BE — keyring slot the active key was at PUT time
1260// reserved      1B    0x00
1261// chunk_size    4B    BE — plaintext bytes per chunk (final chunk may be smaller)
1262// chunk_count   4B    BE — total chunks (always >= 1; empty plaintext = 1 zero-byte chunk)
1263// salt          4B    random per-PUT, mixed into every nonce
1264// [chunk_count] × {
1265//   tag         16B   AES-GCM auth tag for this chunk
1266//   ciphertext  N B   chunk_size bytes (final chunk: 0..=chunk_size bytes)
1267// }
1268// ```
1269//
1270// Fixed header = 20 bytes ([`S4E5_HEADER_BYTES`]).
1271//
1272// ## S4E6 wire format (v0.8.1 #57, current PUT emit)
1273//
1274// ```text
1275// magic         4B    "S4E6"
1276// algo          1B    0x01 (AES-256-GCM)
1277// key_id        2B    BE
1278// reserved      1B    0x00
1279// chunk_size    4B    BE
1280// chunk_count   4B    BE
1281// salt          8B    random per-PUT  ← 4B → 8B widened
1282// [chunk_count] × { tag 16B, ciphertext N B }
1283// ```
1284//
1285// Fixed header = 24 bytes ([`S4E6_HEADER_BYTES`]). Chunk array
1286// layout is byte-identical to S4E5; only the header (salt 4 → 8)
1287// and the nonce/AAD derivation differ.
1288//
1289// ## Per-chunk overhead (both S4E5 and S4E6)
1290//
1291// 16 bytes — just the AES-GCM auth tag. AES-GCM is CTR-mode, so
1292// `ciphertext.len() == plaintext.len()`. Total overhead for an
1293// N-byte plaintext at chunk size C: `header + ceil(N/C) * 16`.
1294//
1295// ## S4E5 nonce / AAD (read-only)
1296//
1297// ```text
1298// nonce_v5[0..4]  = b"E5\x00\x00"
1299// nonce_v5[4..8]  = salt (4 B)
1300// nonce_v5[8..12] = chunk_index BE (u32)
1301//
1302// aad_v5 = b"S4E5" || algo (1) || chunk_index BE (4) || total BE (4)
1303//        || key_id BE (2) || salt (4)
1304// ```
1305//
1306// Birthday-collision threshold on the 4-byte salt: ~50% at ~65,536
1307// distinct PUTs under the same key — the security regression that
1308// motivated #57.
1309//
1310// ## S4E6 nonce / AAD (current emit)
1311//
1312// ```text
1313// nonce_v6[0]     = b'E'                   (1 B fixed prefix)
1314// nonce_v6[1..9]  = salt (8 B)             (per-PUT random from OsRng)
1315// nonce_v6[9..12] = chunk_index BE (u24)   (3 B → max 16_777_215 chunks)
1316//
1317// aad_v6 = b"S4E6" || algo (1) || chunk_index BE (4) || total BE (4)
1318//        || key_id BE (2) || salt (8)
1319// ```
1320//
1321// Wider salt: birthday collision ~50% at ~2^32 = ~4.3 billion
1322// PUTs/key — four orders of magnitude over S4E5.
1323//
1324// chunk_index narrows from 32-bit to 24-bit, capping `chunk_count`
1325// at `2^24 - 1 = 16_777_215`. At the default `--sse-chunk-size
// 1048576` (1 MiB) that's ~16 TiB per object — three orders of
1327// magnitude over S3's 5 GiB single-object cap. Smaller chunk sizes
1328// need to be sized carefully: e.g. `--sse-chunk-size 64` on a
1329// > 1 GiB object would exceed the cap (1 GiB / 64 B = 16M+1
1330// chunks); such configurations surface
1331// [`SseError::ChunkCountTooLarge`] at PUT time rather than
1332// silently truncating.
1333//
1334// AAD on both variants includes the chunk index + total so chunk
1335// reordering or dropping fails the per-chunk tag, plus key_id +
1336// salt so header tampering also fails auth.
1337
/// Fixed header size of an S4E5 frame, before any chunks. `magic 4 +
/// algo 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 +
/// salt 4` = 20 bytes.
pub const S4E5_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 4; // = 20

/// Per-chunk overhead inside an S4E5 / S4E6 frame: just the AES-GCM
/// auth tag. `ciphertext.len() == plaintext.len()` (CTR mode), so a
/// chunk of N plaintext bytes costs N + 16 on disk.
pub const S4E5_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16

/// v0.8.1 #57: fixed header size of an S4E6 frame. Same layout as
/// S4E5 except the per-PUT salt widens 4 → 8 bytes: `magic 4 + algo
/// 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 + salt
/// 8` = 24 bytes.
pub const S4E6_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 8; // = 24

/// v0.8.1 #57: per-chunk overhead for S4E6. Identical to S4E5
/// (same AES-GCM tag size). Re-exported as a distinct const so call
/// sites that compute on-disk size for S4E6 specifically can spell
/// the magic clearly in their arithmetic.
pub const S4E6_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16

/// v0.8.1 #57: maximum `chunk_count` that fits in the S4E6 nonce's
/// 24-bit chunk_index field. At 1 MiB chunks this is ~16 TiB per
/// object (2^24 chunks × 2^20 bytes = 2^44 bytes) — three orders of
/// magnitude over S3's 5 GiB single-object cap, so it's not a
/// practical limit at the default chunk size.
pub const S4E6_MAX_CHUNK_COUNT: u32 = (1u32 << 24) - 1; // 16_777_215

/// 4-byte fixed prefix of every S4E5 nonce. Distinct from the bytes
/// a random S4E1/E2 nonce could plausibly start with so debugging
/// dumps can immediately tell "this is a chunked nonce" from the
/// first 4 bytes.
const S4E5_NONCE_TAG: [u8; 4] = [b'E', b'5', 0, 0];

/// 1-byte fixed prefix of every S4E6 nonce. Trades 3 of S4E5's 4
/// "tag" bytes for 4 extra salt bytes (4 → 8) and 0 of the chunk
/// index bytes (24-bit instead of 32-bit). The remaining `b'E'`
/// keeps debug dumps recognizable as "chunked SSE-S4 nonce".
const S4E6_NONCE_PREFIX: u8 = b'E';
1377
/// Variant tag for the chunked-frame helpers. Selects the nonce +
/// AAD derivation (and incidentally the salt width). The
/// chunk-array layout is byte-identical for both — only the header
/// size and the nonce/AAD derivation differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkedVariant {
    /// S4E5 (v0.8 #52): 4-byte salt. Read-only as of v0.8.1 #57.
    V5,
    /// S4E6 (v0.8.1 #57): 8-byte salt. Current PUT emit format.
    V6,
}

impl ChunkedVariant {
    /// Fixed frame-header size in bytes for this variant: 20 for
    /// S4E5, 24 for S4E6 (wider salt).
    fn header_bytes(self) -> usize {
        match self {
            ChunkedVariant::V5 => S4E5_HEADER_BYTES,
            ChunkedVariant::V6 => S4E6_HEADER_BYTES,
        }
    }
}
1396
1397/// Build the per-chunk AAD for an S4E5 chunk. Includes magic + algo
1398/// plus the structural chunk_index/total_chunks (so chunk reordering
1399/// fails auth) plus key_id + salt (so header tampering — flipping
1400/// key_id or salt — also fails auth).
1401fn aad_v5(
1402    chunk_index: u32,
1403    total_chunks: u32,
1404    key_id: u16,
1405    salt: &[u8; 4],
1406) -> [u8; 4 + 1 + 4 + 4 + 2 + 4] {
1407    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 4]; // = 19
1408    aad[..4].copy_from_slice(SSE_MAGIC_V5);
1409    aad[4] = ALGO_AES_256_GCM;
1410    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1411    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1412    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1413    aad[15..19].copy_from_slice(salt);
1414    aad
1415}
1416
1417/// v0.8.1 #57: per-chunk AAD for S4E6. Same structural fields as
1418/// [`aad_v5`] (magic + algo + chunk_index + total + key_id + salt)
1419/// but with the wider 8-byte salt and the new `b"S4E6"` magic, so
1420/// an attacker can't strip the version tag and replay an S4E5
1421/// nonce/tag against an S4E6 frame.
1422fn aad_v6(
1423    chunk_index: u32,
1424    total_chunks: u32,
1425    key_id: u16,
1426    salt: &[u8; 8],
1427) -> [u8; 4 + 1 + 4 + 4 + 2 + 8] {
1428    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 8]; // = 23
1429    aad[..4].copy_from_slice(SSE_MAGIC_V6);
1430    aad[4] = ALGO_AES_256_GCM;
1431    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1432    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1433    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1434    aad[15..23].copy_from_slice(salt);
1435    aad
1436}
1437
1438/// Derive the 12-byte AES-GCM nonce for chunk `chunk_index` from the
1439/// per-PUT `salt`. Pure function; no RNG state — the same `(salt,
1440/// chunk_index)` always yields the same nonce, which is the whole
1441/// point: GET reads `salt` from the header and walks the chunks
1442/// without storing 12 bytes of nonce per chunk.
1443fn nonce_v5(salt: &[u8; 4], chunk_index: u32) -> [u8; NONCE_LEN] {
1444    let mut n = [0u8; NONCE_LEN];
1445    n[..4].copy_from_slice(&S4E5_NONCE_TAG);
1446    n[4..8].copy_from_slice(salt);
1447    n[8..12].copy_from_slice(&chunk_index.to_be_bytes());
1448    n
1449}
1450
1451/// v0.8.1 #57: derive the 12-byte AES-GCM nonce for an S4E6 chunk:
1452/// `b'E'(1) || salt(8) || chunk_index_BE_u24(3)`. The 24-bit
1453/// chunk_index caps `chunk_count` at 16,777,215 — see
1454/// [`S4E6_MAX_CHUNK_COUNT`]. The pre-encrypt path enforces this cap
1455/// and surfaces [`SseError::ChunkCountTooLarge`], so this function
1456/// only ever sees `chunk_index <= 0xFF_FFFF` (the leading byte of
1457/// the BE u32 is dropped).
1458fn nonce_v6(salt: &[u8; 8], chunk_index: u32) -> [u8; NONCE_LEN] {
1459    debug_assert!(
1460        chunk_index <= S4E6_MAX_CHUNK_COUNT,
1461        "S4E6 chunk_index {chunk_index} exceeds 24-bit cap (caller MUST validate)",
1462    );
1463    let mut n = [0u8; NONCE_LEN];
1464    n[0] = S4E6_NONCE_PREFIX;
1465    n[1..9].copy_from_slice(salt);
1466    let be = chunk_index.to_be_bytes(); // [b3, b2, b1, b0] of u32
1467    // Take the low 3 bytes (b2, b1, b0) — the high byte is 0 by the
1468    // S4E6_MAX_CHUNK_COUNT cap above.
1469    n[9..12].copy_from_slice(&be[1..4]);
1470    n
1471}
1472
1473/// v0.8 #52 / v0.8.1 #57: encrypt `plaintext` under `keyring`'s
1474/// active key, sliced into independently-sealed AES-GCM chunks of
1475/// `chunk_size` plaintext bytes each. Returns the on-the-wire
1476/// **S4E6** frame (v0.8.1 #57 widened the per-PUT salt 4 B → 8 B;
1477/// the S4E5 emit path was retired but the [`decrypt`] /
1478/// [`decrypt_chunked_stream`] paths still read S4E5 objects for
1479/// back-compat).
1480///
1481/// Errors:
1482/// - [`SseError::ChunkSizeInvalid`] if `chunk_size == 0`.
1483/// - [`SseError::ChunkCountTooLarge`] if
1484///   `ceil(plaintext.len() / chunk_size) > 16_777_215` (the S4E6
1485///   24-bit chunk_index cap; pick a larger `--sse-chunk-size`).
1486///
1487/// Empty plaintext is permitted and produces a frame with
1488/// `chunk_count = 1, ciphertext_len = 0` (one all-tag chunk). That
1489/// keeps the GET chunk-walk loop simpler — it never has to
1490/// special-case zero chunks.
1491///
1492/// `chunk_size` is the *plaintext* bytes per chunk; the on-disk
1493/// ciphertext per chunk is the same number (AES-GCM is CTR-mode),
1494/// plus the 16-byte tag prepended.
1495pub fn encrypt_v2_chunked(
1496    plaintext: &[u8],
1497    keyring: &SseKeyring,
1498    chunk_size: usize,
1499) -> Result<Bytes, SseError> {
1500    if chunk_size == 0 {
1501        return Err(SseError::ChunkSizeInvalid);
1502    }
1503    let (key_id, key) = keyring.active();
1504    let cipher = Aes256Gcm::new(key.as_aes_key());
1505    let mut salt = [0u8; 8];
1506    rand::rngs::OsRng.fill_bytes(&mut salt);
1507
1508    // Always emit at least one chunk (so an empty plaintext still
1509    // has a well-defined header → chunk_count >= 1 invariant).
1510    let chunk_count_usize = if plaintext.is_empty() {
1511        1
1512    } else {
1513        plaintext.len().div_ceil(chunk_size)
1514    };
1515    // Saturating-cast to u32 so we report ChunkCountTooLarge cleanly
1516    // for inputs that would overflow u32 too (would need a > 16 EiB
1517    // plaintext at chunk_size = 1 — astronomical, but defensive).
1518    let chunk_count: u32 = u32::try_from(chunk_count_usize).unwrap_or(u32::MAX);
1519    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1520        return Err(SseError::ChunkCountTooLarge {
1521            got: chunk_count,
1522            max: S4E6_MAX_CHUNK_COUNT,
1523        });
1524    }
1525
1526    let mut out = Vec::with_capacity(
1527        S4E6_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E6_PER_CHUNK_OVERHEAD),
1528    );
1529    out.extend_from_slice(SSE_MAGIC_V6);
1530    out.push(ALGO_AES_256_GCM);
1531    out.extend_from_slice(&key_id.to_be_bytes());
1532    out.push(0u8); // reserved
1533    out.extend_from_slice(&(chunk_size as u32).to_be_bytes());
1534    out.extend_from_slice(&chunk_count.to_be_bytes());
1535    out.extend_from_slice(&salt);
1536
1537    for i in 0..chunk_count {
1538        let off = (i as usize).saturating_mul(chunk_size);
1539        let end = off.saturating_add(chunk_size).min(plaintext.len());
1540        let chunk_pt: &[u8] = if off >= plaintext.len() {
1541            // Empty-plaintext / past-end (only the single-chunk
1542            // empty-plaintext case lands here).
1543            &[]
1544        } else {
1545            &plaintext[off..end]
1546        };
1547        let nonce_bytes = nonce_v6(&salt, i);
1548        let nonce = Nonce::from_slice(&nonce_bytes);
1549        let aad = aad_v6(i, chunk_count, key_id, &salt);
1550        let ct_with_tag = cipher
1551            .encrypt(
1552                nonce,
1553                Payload {
1554                    msg: chunk_pt,
1555                    aad: &aad,
1556                },
1557            )
1558            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
1559        debug_assert!(ct_with_tag.len() >= TAG_LEN);
1560        let split = ct_with_tag.len() - TAG_LEN;
1561        let (ct, tag) = ct_with_tag.split_at(split);
1562        out.extend_from_slice(tag);
1563        out.extend_from_slice(ct);
1564        crate::metrics::record_sse_streaming_chunk("encrypt");
1565    }
1566    Ok(Bytes::from(out))
1567}
1568
/// Salt material for a chunked frame — branches on variant so the
/// shared chunk-walking loop can carry both 4-byte (S4E5) and
/// 8-byte (S4E6) salts without an extra heap alloc.
#[derive(Debug, Clone, Copy)]
enum ChunkedSalt {
    /// S4E5 (v0.8 #52): 4-byte per-PUT salt, fed to [`nonce_v5`] / [`aad_v5`].
    V5([u8; 4]),
    /// S4E6 (v0.8.1 #57): widened 8-byte per-PUT salt, fed to [`nonce_v6`] / [`aad_v6`].
    V6([u8; 8]),
}
1577
/// Parsed S4E5 / S4E6 header — fixed-layout fields. Used by the
/// buffered ([`decrypt_chunked_buffered`]) and streaming
/// ([`decrypt_chunked_stream`]) paths to share frame validation
/// across both variants. Produced only by [`parse_chunked_header`].
#[derive(Debug, Clone, Copy)]
struct ChunkedHeader {
    /// Used only by tests today (asserts on which frame variant
    /// parsed); in production the variant is implicit in
    /// `salt`'s ChunkedSalt arm. Kept as a field rather than
    /// re-deriving from `salt` so the parser writes one source of
    /// truth.
    #[allow(dead_code)]
    variant: ChunkedVariant,
    /// Keyring slot the frame claims encrypted it (BE u16 at offset 5).
    key_id: u16,
    /// Declared plaintext bytes per chunk (BE u32 at offset 8); nonzero.
    chunk_size: u32,
    /// Declared number of chunks (BE u32 at offset 12); nonzero, and
    /// capped at [`S4E6_MAX_CHUNK_COUNT`] for the V6 variant.
    chunk_count: u32,
    /// Per-PUT salt, width depending on variant (4 B V5 / 8 B V6).
    salt: ChunkedSalt,
    /// Byte offset where the chunk array starts (always
    /// `variant.header_bytes()`; carried in the struct so call sites
    /// don't have to re-derive it from the variant tag).
    chunks_offset: usize,
}
1600
/// Parsed view of an S4E6 frame's fixed header. Public mirror of
/// the S4E4 parser — useful for admin tools or future inspectors
/// that want to enumerate object → key_id bindings without
/// re-implementing the offset math. The `salt` borrow keeps
/// allocations to zero (the slice points back into the input
/// buffer).
#[derive(Debug, Clone, Copy)]
pub struct S4E6Header<'a> {
    /// Keyring slot that encrypted this frame (BE u16 at offset 5).
    pub key_id: u16,
    /// Declared plaintext bytes per chunk; validated nonzero by the parser.
    pub chunk_size: u32,
    /// Declared chunk count; validated nonzero and <= [`S4E6_MAX_CHUNK_COUNT`].
    pub chunk_count: u32,
    /// Per-PUT 8-byte salt, borrowed from the input buffer (offsets 16..24).
    pub salt: &'a [u8; 8],
}
1614
1615/// Pure byte-shuffle parser for an S4E6 fixed header (24 bytes). No
1616/// crypto, no keyring lookup. Errors on truncation, wrong magic,
1617/// unsupported algo, or zero `chunk_size` / `chunk_count`.
1618pub fn parse_s4e6_header(blob: &[u8]) -> Result<S4E6Header<'_>, SseError> {
1619    if blob.len() < S4E6_HEADER_BYTES {
1620        return Err(SseError::ChunkFrameTruncated { what: "header" });
1621    }
1622    if &blob[..4] != SSE_MAGIC_V6 {
1623        let mut got = [0u8; 4];
1624        got.copy_from_slice(&blob[..4]);
1625        return Err(SseError::BadMagic { got });
1626    }
1627    let algo = blob[4];
1628    if algo != ALGO_AES_256_GCM {
1629        return Err(SseError::UnsupportedAlgo { tag: algo });
1630    }
1631    let key_id = u16::from_be_bytes([blob[5], blob[6]]);
1632    // blob[7] = reserved (0; authenticated as 0 via AAD).
1633    let chunk_size = u32::from_be_bytes([blob[8], blob[9], blob[10], blob[11]]);
1634    let chunk_count = u32::from_be_bytes([blob[12], blob[13], blob[14], blob[15]]);
1635    if chunk_size == 0 {
1636        return Err(SseError::ChunkSizeInvalid);
1637    }
1638    if chunk_count == 0 {
1639        return Err(SseError::ChunkFrameTruncated {
1640            what: "chunk_count == 0",
1641        });
1642    }
1643    if chunk_count > S4E6_MAX_CHUNK_COUNT {
1644        return Err(SseError::ChunkCountTooLarge {
1645            got: chunk_count,
1646            max: S4E6_MAX_CHUNK_COUNT,
1647        });
1648    }
1649    let salt: &[u8; 8] = (&blob[16..24]).try_into().expect("8B salt slice");
1650    Ok(S4E6Header {
1651        key_id,
1652        chunk_size,
1653        chunk_count,
1654        salt,
1655    })
1656}
1657
1658fn parse_chunked_header(body: &[u8]) -> Result<ChunkedHeader, SseError> {
1659    if body.len() < 4 {
1660        return Err(SseError::ChunkFrameTruncated { what: "magic" });
1661    }
1662    let magic = &body[..4];
1663    let variant = if magic == SSE_MAGIC_V5 {
1664        ChunkedVariant::V5
1665    } else if magic == SSE_MAGIC_V6 {
1666        ChunkedVariant::V6
1667    } else {
1668        let mut got = [0u8; 4];
1669        got.copy_from_slice(magic);
1670        return Err(SseError::BadMagic { got });
1671    };
1672    let header_bytes = variant.header_bytes();
1673    if body.len() < header_bytes {
1674        return Err(SseError::ChunkFrameTruncated { what: "header" });
1675    }
1676    let algo = body[4];
1677    if algo != ALGO_AES_256_GCM {
1678        return Err(SseError::UnsupportedAlgo { tag: algo });
1679    }
1680    let key_id = u16::from_be_bytes([body[5], body[6]]);
1681    // body[7] = reserved (must be 0; authenticated as 0 via AAD).
1682    let chunk_size = u32::from_be_bytes([body[8], body[9], body[10], body[11]]);
1683    let chunk_count = u32::from_be_bytes([body[12], body[13], body[14], body[15]]);
1684    if chunk_size == 0 {
1685        return Err(SseError::ChunkSizeInvalid);
1686    }
1687    if chunk_count == 0 {
1688        return Err(SseError::ChunkFrameTruncated {
1689            what: "chunk_count == 0",
1690        });
1691    }
1692    let salt = match variant {
1693        ChunkedVariant::V5 => {
1694            let mut s = [0u8; 4];
1695            s.copy_from_slice(&body[16..20]);
1696            ChunkedSalt::V5(s)
1697        }
1698        ChunkedVariant::V6 => {
1699            // v0.8.1 #57 sanity check: the encoder enforces this cap,
1700            // but a tampered / malicious frame could declare a huge
1701            // chunk_count that would loop the walker 16M+ times if
1702            // we trusted it. Reject early.
1703            if chunk_count > S4E6_MAX_CHUNK_COUNT {
1704                return Err(SseError::ChunkCountTooLarge {
1705                    got: chunk_count,
1706                    max: S4E6_MAX_CHUNK_COUNT,
1707                });
1708            }
1709            let mut s = [0u8; 8];
1710            s.copy_from_slice(&body[16..24]);
1711            ChunkedSalt::V6(s)
1712        }
1713    };
1714    Ok(ChunkedHeader {
1715        variant,
1716        key_id,
1717        chunk_size,
1718        chunk_count,
1719        salt,
1720        chunks_offset: header_bytes,
1721    })
1722}
1723
1724/// Decrypt one chunk under either the S4E5 or S4E6 derivation. Used
1725/// by both the buffered and streaming paths so AAD / nonce
1726/// derivation lives in exactly one place.
1727fn decrypt_chunked_chunk(
1728    cipher: &Aes256Gcm,
1729    chunk_index: u32,
1730    chunk_count: u32,
1731    key_id: u16,
1732    salt: &ChunkedSalt,
1733    tag: &[u8; TAG_LEN],
1734    ct: &[u8],
1735) -> Result<Bytes, SseError> {
1736    let nonce_bytes = match salt {
1737        ChunkedSalt::V5(s) => nonce_v5(s, chunk_index),
1738        ChunkedSalt::V6(s) => nonce_v6(s, chunk_index),
1739    };
1740    let nonce = Nonce::from_slice(&nonce_bytes);
1741    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1742    ct_with_tag.extend_from_slice(ct);
1743    ct_with_tag.extend_from_slice(tag);
1744    let result = match salt {
1745        ChunkedSalt::V5(s) => {
1746            let aad = aad_v5(chunk_index, chunk_count, key_id, s);
1747            cipher.decrypt(
1748                nonce,
1749                Payload {
1750                    msg: &ct_with_tag,
1751                    aad: &aad,
1752                },
1753            )
1754        }
1755        ChunkedSalt::V6(s) => {
1756            let aad = aad_v6(chunk_index, chunk_count, key_id, s);
1757            cipher.decrypt(
1758                nonce,
1759                Payload {
1760                    msg: &ct_with_tag,
1761                    aad: &aad,
1762                },
1763            )
1764        }
1765    };
1766    result
1767        .map(Bytes::from)
1768        .map_err(|_| SseError::ChunkAuthFailed { chunk_index })
1769}
1770
/// Walk an S4E5 / S4E6 body chunk-by-chunk, calling `emit` on each
/// successfully-verified plaintext chunk. Returns immediately on the
/// first chunk that fails auth or is truncated. Shared core between
/// the buffered ([`decrypt_chunked_buffered`]) and streaming
/// ([`decrypt_chunked_stream`]) paths.
///
/// Frame layout walked here: `header || (tag(16) || ct)*chunk_count`,
/// where every chunk's ciphertext is exactly `chunk_size` bytes
/// except the last, which may be shorter — never longer.
///
/// NOTE(review): the boundary checks below are mirrored line-for-line
/// in [`decrypt_next_chunk`]; keep the two in lockstep so buffered
/// and streaming GETs reject identical malformed frames identically.
fn walk_chunked<F: FnMut(Bytes) -> Result<(), SseError>>(
    body: &[u8],
    keyring: &SseKeyring,
    mut emit: F,
) -> Result<(), SseError> {
    let hdr = parse_chunked_header(body)?;
    // Resolve the frame's declared key before touching any chunk so
    // an unknown key_id fails fast.
    let key = keyring
        .get(hdr.key_id)
        .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
    let cipher = Aes256Gcm::new(key.as_aes_key());

    let mut cursor = hdr.chunks_offset;
    let chunk_size = hdr.chunk_size as usize;
    for i in 0..hdr.chunk_count {
        // Each chunk is laid out tag-first: [tag 16B][ct ..].
        if cursor + TAG_LEN > body.len() {
            return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
        }
        let tag_off = cursor;
        let ct_off = tag_off + TAG_LEN;
        let is_last = i + 1 == hdr.chunk_count;
        let ct_len = if is_last {
            // Final chunk takes whatever bytes remain — but no more
            // than chunk_size (extra trailing bytes mean the frame
            // was appended to).
            if ct_off > body.len() {
                return Err(SseError::ChunkFrameTruncated {
                    what: "final chunk ciphertext",
                });
            }
            let remaining = body.len() - ct_off;
            if remaining > chunk_size {
                return Err(SseError::ChunkFrameTruncated {
                    what: "trailing bytes after final chunk",
                });
            }
            remaining
        } else {
            // Non-final chunks carry exactly chunk_size ciphertext
            // bytes (AES-GCM keeps ct len == pt len).
            chunk_size
        };
        let ct_end = ct_off + ct_len;
        if ct_end > body.len() {
            return Err(SseError::ChunkFrameTruncated {
                what: "chunk ciphertext",
            });
        }
        let mut tag = [0u8; TAG_LEN];
        tag.copy_from_slice(&body[tag_off..ct_off]);
        let ct = &body[ct_off..ct_end];
        // Verify-then-emit: `emit` only ever sees plaintext whose
        // AES-GCM tag verified; a failed tag aborts the whole walk.
        let plain = decrypt_chunked_chunk(
            &cipher,
            i,
            hdr.chunk_count,
            hdr.key_id,
            &hdr.salt,
            &tag,
            ct,
        )?;
        crate::metrics::record_sse_streaming_chunk("decrypt");
        emit(plain)?;
        cursor = ct_end;
    }
    // Reject bytes past the declared chunk array (append attack /
    // corrupted length fields).
    if cursor != body.len() {
        return Err(SseError::ChunkFrameTruncated {
            what: "trailing bytes after declared chunk_count",
        });
    }
    Ok(())
}
1841
1842/// Sync back-compat path: decrypt every chunk and concatenate into
1843/// a single `Bytes`. Memory peak = full plaintext (defeats the
1844/// point of S4E5/S4E6 streaming, but useful for callers that already
1845/// need the whole body — e.g. server-side restream-rewrite paths or
1846/// unit tests). Accepts both S4E5 (legacy) and S4E6 (current) bodies.
1847fn decrypt_chunked_buffered(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1848    let hdr = parse_chunked_header(body)?;
1849    let mut out = Vec::with_capacity(hdr.chunk_size as usize * hdr.chunk_count as usize);
1850    walk_chunked(body, keyring, |chunk| {
1851        out.extend_from_slice(&chunk);
1852        Ok(())
1853    })?;
1854    Ok(Bytes::from(out))
1855}
1856
1857/// v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): stream-decrypt API for
1858/// chunked SSE-S4 bodies. Returns a [`futures::Stream`] that yields
1859/// one `Bytes` per chunk in order. Each chunk is emitted only after
1860/// AES-GCM tag verify succeeds, so the client never sees plaintext
1861/// bytes that haven't been authenticated. A failing chunk yields
1862/// its [`SseError::ChunkAuthFailed`] (with the chunk index) and ends
1863/// the stream — earlier chunks may already have left the gateway,
1864/// which matches the standard streaming-AEAD trade-off (operators
1865/// MUST alert on the audit log + metric, not rely on connection
1866/// close to guarantee atomicity).
1867///
1868/// Accepts either S4E5 (v0.8 #52, legacy) or S4E6 (v0.8.1 #57,
1869/// current) magic. Non-chunked magic surfaces as
1870/// [`SseError::BadMagic`] / [`SseError::ChunkFrameTruncated`] on
1871/// the first poll — the stream is "fail-fast" rather than "fall
1872/// through to S4E2 buffered decrypt", because the caller has
1873/// already dispatched on [`peek_magic`] by the time it hands a body
1874/// to this function.
1875///
1876/// `body` is owned by the returned stream so the caller doesn't
1877/// need to keep the bytes alive separately. The returned stream is
1878/// `'static` — the `keyring` borrow is consumed up front to extract
1879/// the per-frame key and build the AES cipher (which owns its key
1880/// material), so the caller's keyring may be dropped immediately.
1881pub fn decrypt_chunked_stream(
1882    body: bytes::Bytes,
1883    keyring: &SseKeyring,
1884) -> impl futures::Stream<Item = Result<Bytes, SseError>> + 'static {
1885    use futures::stream::{self, StreamExt};
1886
1887    // Cheap pre-validation: parse the header + look up the key
1888    // once, up front, so a malformed frame surfaces on the first
1889    // poll instead of being deferred behind the first-chunk loop.
1890    // The `keyring` borrow ends here — we extract the AES key into
1891    // the owned `Aes256Gcm` cipher, then store that in the stream
1892    // state.
1893    let prelude = (|| {
1894        let hdr = parse_chunked_header(&body)?;
1895        let key = keyring
1896            .get(hdr.key_id)
1897            .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
1898        let cipher = Aes256Gcm::new(key.as_aes_key());
1899        Ok::<_, SseError>((hdr, cipher))
1900    })();
1901
1902    match prelude {
1903        Err(e) => stream::iter(std::iter::once(Err(e))).left_stream(),
1904        Ok((hdr, cipher)) => {
1905            let chunks_offset = hdr.chunks_offset;
1906            let state = ChunkedDecryptState {
1907                body,
1908                cipher,
1909                hdr,
1910                cursor: chunks_offset,
1911                next_index: 0,
1912            };
1913            stream::try_unfold(state, decrypt_next_chunk).right_stream()
1914        }
1915    }
1916}
1917
/// Per-stream state for [`decrypt_chunked_stream`]. Holds the owned
/// `body` (so the stream stays self-contained), the prepared
/// cipher, and the cursor position into the chunk array.
struct ChunkedDecryptState {
    /// Full encrypted frame (header + chunk array), owned by the stream.
    body: bytes::Bytes,
    /// AES-256-GCM cipher built from the keyring slot named in `hdr`.
    cipher: Aes256Gcm,
    /// Parsed fixed header; validated before the stream was constructed.
    hdr: ChunkedHeader,
    /// Byte offset of the next unread chunk within `body`.
    cursor: usize,
    /// Index (0-based) of the next chunk to verify and emit.
    next_index: u32,
}
1928
/// `try_unfold` step for [`decrypt_chunked_stream`]: verify and
/// yield the next chunk, or finish after the last one.
/// NOTE(review): the boundary checks below mirror [`walk_chunked`]
/// line-for-line — keep the two in lockstep so the buffered and
/// streaming paths reject identical malformed frames with identical
/// errors.
async fn decrypt_next_chunk(
    mut state: ChunkedDecryptState,
) -> Result<Option<(Bytes, ChunkedDecryptState)>, SseError> {
    if state.next_index >= state.hdr.chunk_count {
        // Final boundary check — anything past the declared
        // chunk_count would be a truncation / append attack.
        if state.cursor != state.body.len() {
            return Err(SseError::ChunkFrameTruncated {
                what: "trailing bytes after declared chunk_count",
            });
        }
        return Ok(None);
    }
    let i = state.next_index;
    let chunk_size = state.hdr.chunk_size as usize;
    // Each chunk is laid out tag-first: [tag 16B][ct ..].
    if state.cursor + TAG_LEN > state.body.len() {
        return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
    }
    let tag_off = state.cursor;
    let ct_off = tag_off + TAG_LEN;
    let is_last = i + 1 == state.hdr.chunk_count;
    let ct_len = if is_last {
        // Final chunk takes whatever bytes remain — but no more than
        // chunk_size (extra trailing bytes mean an append).
        if ct_off > state.body.len() {
            return Err(SseError::ChunkFrameTruncated {
                what: "final chunk ciphertext",
            });
        }
        let remaining = state.body.len() - ct_off;
        if remaining > chunk_size {
            return Err(SseError::ChunkFrameTruncated {
                what: "trailing bytes after final chunk",
            });
        }
        remaining
    } else {
        // Non-final chunks carry exactly chunk_size ciphertext bytes
        // (AES-GCM keeps ct len == pt len).
        chunk_size
    };
    let ct_end = ct_off + ct_len;
    if ct_end > state.body.len() {
        return Err(SseError::ChunkFrameTruncated {
            what: "chunk ciphertext",
        });
    }
    let mut tag = [0u8; TAG_LEN];
    tag.copy_from_slice(&state.body[tag_off..ct_off]);
    let ct = &state.body[ct_off..ct_end];
    // Verify-then-yield: plaintext leaves this function only after
    // the chunk's AES-GCM tag verified.
    let plain = decrypt_chunked_chunk(
        &state.cipher,
        i,
        state.hdr.chunk_count,
        state.hdr.key_id,
        &state.hdr.salt,
        &tag,
        ct,
    )?;
    crate::metrics::record_sse_streaming_chunk("decrypt");
    state.cursor = ct_end;
    state.next_index += 1;
    Ok(Some((plain, state)))
}
1989
/// v0.8.1 #57: build an S4E5 frame. Identical body structure to the
/// pre-#57 `encrypt_v2_chunked` (4-byte salt + S4E5 magic + V5
/// nonce/AAD); kept around purely so the back-compat-read tests can
/// synthesize a "v0.8.0 vintage" blob and prove the new gateway
/// still decrypts it.
#[cfg(test)]
fn encrypt_v2_chunked_s4e5_for_test(
    plaintext: &[u8],
    keyring: &SseKeyring,
    chunk_size: usize,
) -> Result<Bytes, SseError> {
    if chunk_size == 0 {
        return Err(SseError::ChunkSizeInvalid);
    }
    let (key_id, key) = keyring.active();
    let cipher = Aes256Gcm::new(key.as_aes_key());
    let mut salt = [0u8; 4];
    rand::rngs::OsRng.fill_bytes(&mut salt);

    // Empty plaintext still gets exactly one (all-tag) chunk.
    let chunk_count: u32 = if plaintext.is_empty() {
        1
    } else {
        plaintext
            .len()
            .div_ceil(chunk_size)
            .try_into()
            .expect("chunk_count overflows u32")
    };

    let mut frame = Vec::with_capacity(
        S4E5_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E5_PER_CHUNK_OVERHEAD),
    );
    frame.extend_from_slice(SSE_MAGIC_V5);
    frame.push(ALGO_AES_256_GCM);
    frame.extend_from_slice(&key_id.to_be_bytes());
    frame.push(0u8); // reserved
    frame.extend_from_slice(&(chunk_size as u32).to_be_bytes());
    frame.extend_from_slice(&chunk_count.to_be_bytes());
    frame.extend_from_slice(&salt);

    for i in 0..chunk_count {
        let start = (i as usize).saturating_mul(chunk_size);
        let stop = plaintext.len().min(start.saturating_add(chunk_size));
        // `get` covers the empty-plaintext single-chunk case, where
        // start == stop == 0.
        let chunk_pt = plaintext.get(start..stop).unwrap_or(&[]);
        let nonce_bytes = nonce_v5(&salt, i);
        let aad = aad_v5(i, chunk_count, key_id, &salt);
        let sealed = cipher
            .encrypt(
                Nonce::from_slice(&nonce_bytes),
                Payload {
                    msg: chunk_pt,
                    aad: &aad,
                },
            )
            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
        // aes-gcm returns ct || tag; the wire format stores tag first.
        let (ct, tag) = sealed.split_at(sealed.len() - TAG_LEN);
        frame.extend_from_slice(tag);
        frame.extend_from_slice(ct);
    }
    Ok(Bytes::from(frame))
}
2057
2058#[cfg(test)]
2059mod tests {
2060    use super::*;
2061
2062    fn key32(seed: u8) -> Arc<SseKey> {
2063        Arc::new(SseKey::from_bytes(&[seed; 32]).unwrap())
2064    }
2065
2066    fn keyring_single(seed: u8) -> SseKeyring {
2067        SseKeyring::new(1, key32(seed))
2068    }
2069
2070    #[test]
2071    fn roundtrip_basic_v1() {
2072        // back-compat single-key API — still works.
2073        let k = SseKey::from_bytes(&[7u8; 32]).unwrap();
2074        let pt = b"the quick brown fox jumps over the lazy dog";
2075        let ct = encrypt(&k, pt);
2076        assert!(looks_encrypted(&ct));
2077        assert_eq!(&ct[..4], SSE_MAGIC_V1);
2078        assert_eq!(ct[4], ALGO_AES_256_GCM);
2079        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2080        // decrypt via single-key keyring
2081        let kr = SseKeyring::new(1, Arc::new(k));
2082        let pt2 = decrypt(&ct, &kr).unwrap();
2083        assert_eq!(pt2.as_ref(), pt);
2084    }
2085
2086    #[test]
2087    fn s4e2_roundtrip_active_key() {
2088        let kr = keyring_single(7);
2089        let pt = b"S4E2 active-key roundtrip";
2090        let ct = encrypt_v2(pt, &kr);
2091        assert_eq!(&ct[..4], SSE_MAGIC_V2);
2092        assert_eq!(ct[4], ALGO_AES_256_GCM);
2093        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE");
2094        assert_eq!(ct[7], 0, "reserved byte");
2095        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
2096        assert!(looks_encrypted(&ct));
2097        let pt2 = decrypt(&ct, &kr).unwrap();
2098        assert_eq!(pt2.as_ref(), pt);
2099    }
2100
2101    #[test]
2102    fn decrypt_s4e1_via_active_only_keyring() {
2103        // v0.4 wrote S4E1 with key K; v0.5 keyring has K as the only
2104        // (active) key. Decrypt must succeed.
2105        let k_arc = key32(11);
2106        let legacy_ct = encrypt(&k_arc, b"v0.4 vintage object");
2107        assert_eq!(&legacy_ct[..4], SSE_MAGIC_V1);
2108        let kr = SseKeyring::new(1, Arc::clone(&k_arc));
2109        let plain = decrypt(&legacy_ct, &kr).unwrap();
2110        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2111    }
2112
2113    #[test]
2114    fn decrypt_s4e2_under_old_key_after_rotation() {
2115        // Rotation flow: object was encrypted under key id=1 when 1
2116        // was active. Operator rotates to active=2 and keeps 1 in the
2117        // keyring. The S4E2 body must still decrypt.
2118        let k1 = key32(1);
2119        let k2 = key32(2);
2120        let mut kr_old = SseKeyring::new(1, Arc::clone(&k1));
2121        let ct = encrypt_v2(b"old-rotation object", &kr_old);
2122        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2123
2124        // After rotation: active=2, but key 1 still in ring.
2125        kr_old.add(2, Arc::clone(&k2));
2126        let mut kr_new = SseKeyring::new(2, Arc::clone(&k2));
2127        kr_new.add(1, Arc::clone(&k1));
2128
2129        let plain = decrypt(&ct, &kr_new).unwrap();
2130        assert_eq!(plain.as_ref(), b"old-rotation object");
2131
2132        // And new PUTs go to id 2 (active).
2133        let new_ct = encrypt_v2(b"new-rotation object", &kr_new);
2134        assert_eq!(u16::from_be_bytes([new_ct[5], new_ct[6]]), 2);
2135        let plain_new = decrypt(&new_ct, &kr_new).unwrap();
2136        assert_eq!(plain_new.as_ref(), b"new-rotation object");
2137    }
2138
2139    #[test]
2140    fn s4e2_unknown_key_id_errors() {
2141        let kr = keyring_single(3); // only id=1 present
2142        let kr_other = SseKeyring::new(99, key32(3));
2143        let ct = encrypt_v2(b"x", &kr_other); // body claims key_id=99
2144        let err = decrypt(&ct, &kr).unwrap_err();
2145        assert!(
2146            matches!(err, SseError::KeyNotInKeyring { id: 99 }),
2147            "got {err:?}"
2148        );
2149    }
2150
2151    #[test]
2152    fn s4e2_tampered_key_id_fails_auth() {
2153        let kr = SseKeyring::new(1, key32(4));
2154        let mut kr_with_2 = kr.clone();
2155        kr_with_2.add(2, key32(5)); // a real but wrong key under id=2
2156        let mut ct = encrypt_v2(b"do not flip my key id", &kr).to_vec();
2157        // Flip key_id from 1 → 2 in the header. The keyring HAS a key
2158        // for 2, so the lookup succeeds — but AAD authenticates the
2159        // original key_id, so AES-GCM tag verification must fail.
2160        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
2161        ct[5] = 0;
2162        ct[6] = 2;
2163        let err = decrypt(&ct, &kr_with_2).unwrap_err();
2164        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2165    }
2166
2167    #[test]
2168    fn s4e2_tampered_ciphertext_fails() {
2169        let kr = SseKeyring::new(7, key32(9));
2170        let mut ct = encrypt_v2(b"secret message v2", &kr).to_vec();
2171        let last = ct.len() - 1;
2172        ct[last] ^= 0x01;
2173        let err = decrypt(&ct, &kr).unwrap_err();
2174        assert!(matches!(err, SseError::DecryptFailed));
2175    }
2176
2177    #[test]
2178    fn s4e2_tampered_algo_byte_fails() {
2179        let kr = SseKeyring::new(1, key32(2));
2180        let mut ct = encrypt_v2(b"hi", &kr).to_vec();
2181        ct[4] = 99;
2182        let err = decrypt(&ct, &kr).unwrap_err();
2183        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2184    }
2185
2186    #[test]
2187    fn wrong_key_fails_v1_via_keyring() {
2188        // S4E1 written under key K1; keyring has only K2 → DecryptFailed.
2189        let k1 = SseKey::from_bytes(&[1u8; 32]).unwrap();
2190        let ct = encrypt(&k1, b"secret");
2191        let kr_wrong = SseKeyring::new(1, Arc::new(SseKey::from_bytes(&[2u8; 32]).unwrap()));
2192        let err = decrypt(&ct, &kr_wrong).unwrap_err();
2193        assert!(matches!(err, SseError::DecryptFailed));
2194    }
2195
2196    #[test]
2197    fn rejects_short_body() {
2198        let kr = SseKeyring::new(1, key32(1));
2199        let err = decrypt(b"short", &kr).unwrap_err();
2200        assert!(matches!(err, SseError::TooShort { got: 5 }));
2201    }
2202
2203    #[test]
2204    fn looks_encrypted_passthrough_returns_false() {
2205        // S4F2 frame magic, NOT S4E1 / S4E2 — must not be confused.
2206        let f2 = b"S4F2\x01\x00\x00\x00........................................";
2207        assert!(!looks_encrypted(f2));
2208        assert!(!looks_encrypted(b""));
2209    }
2210
2211    #[test]
2212    fn looks_encrypted_detects_both_v1_and_v2() {
2213        let kr = SseKeyring::new(1, key32(8));
2214        let v1 = encrypt(&SseKey::from_bytes(&[8u8; 32]).unwrap(), b"x");
2215        let v2 = encrypt_v2(b"x", &kr);
2216        assert!(looks_encrypted(&v1));
2217        assert!(looks_encrypted(&v2));
2218    }
2219
2220    #[test]
2221    fn key_from_hex_string() {
2222        let bad =
2223            SseKey::from_bytes(b"0102030405060708090a0b0c0d0e0f10111213141516171819202122232425")
2224                .unwrap_err();
2225        assert!(matches!(bad, SseError::BadKeyLength { .. }));
2226        let good = b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2227        let _ = SseKey::from_bytes(good).expect("64-char hex should parse");
2228    }
2229
2230    #[test]
2231    fn encrypt_v2_uses_random_nonce() {
2232        let kr = SseKeyring::new(1, key32(3));
2233        let pt = b"deterministic input";
2234        let a = encrypt_v2(pt, &kr);
2235        let b = encrypt_v2(pt, &kr);
2236        assert_ne!(a, b, "nonce must be random per-call");
2237    }
2238
2239    #[test]
2240    fn keyring_active_and_get() {
2241        let k1 = key32(1);
2242        let k2 = key32(2);
2243        let mut kr = SseKeyring::new(1, Arc::clone(&k1));
2244        kr.add(2, Arc::clone(&k2));
2245        let (id, active) = kr.active();
2246        assert_eq!(id, 1);
2247        assert_eq!(active.bytes, [1u8; 32]);
2248        assert!(kr.get(2).is_some());
2249        assert!(kr.get(3).is_none());
2250    }
2251
2252    // -----------------------------------------------------------------
2253    // v0.5 #27 — SSE-C (customer-provided key, S4E3 frame) tests
2254    // -----------------------------------------------------------------
2255
2256    use base64::Engine as _;
2257
2258    fn cust_key(seed: u8) -> CustomerKeyMaterial {
2259        let key = [seed; KEY_LEN];
2260        let key_md5 = compute_key_md5(&key);
2261        CustomerKeyMaterial { key, key_md5 }
2262    }
2263
2264    #[test]
2265    fn s4e3_roundtrip_happy_path() {
2266        let m = cust_key(42);
2267        let pt = b"top-secret SSE-C payload";
2268        let ct = encrypt_with_source(
2269            pt,
2270            SseSource::CustomerKey {
2271                key: &m.key,
2272                key_md5: &m.key_md5,
2273            },
2274        );
2275        // Frame inspection.
2276        assert_eq!(&ct[..4], SSE_MAGIC_V3);
2277        assert_eq!(ct[4], ALGO_AES_256_GCM);
2278        assert_eq!(&ct[5..5 + KEY_MD5_LEN], &m.key_md5);
2279        assert_eq!(ct.len(), SSE_HEADER_BYTES_V3 + pt.len());
2280        assert!(looks_encrypted(&ct));
2281        // Decrypt round-trip.
2282        let plain = decrypt(
2283            &ct,
2284            SseSource::CustomerKey {
2285                key: &m.key,
2286                key_md5: &m.key_md5,
2287            },
2288        )
2289        .unwrap();
2290        assert_eq!(plain.as_ref(), pt);
2291        // And via the From impl on &CustomerKeyMaterial.
2292        let plain2 = decrypt(&ct, &m).unwrap();
2293        assert_eq!(plain2.as_ref(), pt);
2294    }
2295
2296    #[test]
2297    fn s4e3_wrong_key_yields_wrong_customer_key_error() {
2298        let m = cust_key(1);
2299        let other = cust_key(2);
2300        let ct = encrypt_with_source(b"payload", (&m).into());
2301        let err = decrypt(
2302            &ct,
2303            SseSource::CustomerKey {
2304                key: &other.key,
2305                key_md5: &other.key_md5,
2306            },
2307        )
2308        .unwrap_err();
2309        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2310    }
2311
2312    #[test]
2313    fn s4e3_tampered_stored_md5_is_caught() {
2314        // Attacker rewrites the stored MD5 to match a key they know.
2315        // Even though the supplied (attacker) key matches the rewritten
2316        // MD5, AES-GCM authenticates the ORIGINAL md5 via AAD, so the
2317        // tag check fails. Surface: WrongCustomerKey if the supplied
2318        // md5 != stored md5 (this test), or DecryptFailed if attacker
2319        // also rewrites their supplied md5 to match.
2320        let m = cust_key(7);
2321        let mut ct = encrypt_with_source(b"victim payload", (&m).into()).to_vec();
2322        // Flip a byte in the stored fingerprint.
2323        ct[5] ^= 0x55;
2324        // Client supplies the original (unmodified) key + md5.
2325        let err = decrypt(
2326            &ct,
2327            SseSource::CustomerKey {
2328                key: &m.key,
2329                key_md5: &m.key_md5,
2330            },
2331        )
2332        .unwrap_err();
2333        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
2334    }
2335
2336    #[test]
2337    fn s4e3_tampered_md5_with_matching_supplied_md5_fails_aead() {
2338        // Both stored md5 AND supplied md5 are flipped to the same bogus
2339        // value. The fingerprint check passes (they match) but AAD
2340        // authenticates the *original* md5, so AES-GCM fails.
2341        let m = cust_key(3);
2342        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2343        ct[5] ^= 0xFF;
2344        let mut bogus_md5 = m.key_md5;
2345        bogus_md5[0] ^= 0xFF;
2346        let err = decrypt(
2347            &ct,
2348            SseSource::CustomerKey {
2349                key: &m.key,
2350                key_md5: &bogus_md5,
2351            },
2352        )
2353        .unwrap_err();
2354        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2355    }
2356
2357    #[test]
2358    fn s4e3_tampered_ciphertext_fails_aead() {
2359        let m = cust_key(8);
2360        let mut ct = encrypt_with_source(b"sealed message", (&m).into()).to_vec();
2361        let last = ct.len() - 1;
2362        ct[last] ^= 0x01;
2363        let err = decrypt(&ct, &m).unwrap_err();
2364        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2365    }
2366
2367    #[test]
2368    fn s4e3_tampered_algo_byte_rejected() {
2369        let m = cust_key(9);
2370        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
2371        ct[4] = 99;
2372        let err = decrypt(&ct, &m).unwrap_err();
2373        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
2374    }
2375
2376    #[test]
2377    fn s4e3_uses_random_nonce() {
2378        let m = cust_key(10);
2379        let a = encrypt_with_source(b"deterministic input", (&m).into());
2380        let b = encrypt_with_source(b"deterministic input", (&m).into());
2381        assert_ne!(a, b, "nonce must be random per-call");
2382    }
2383
2384    #[test]
2385    fn parse_customer_key_headers_happy_path() {
2386        let key = [11u8; KEY_LEN];
2387        let md5 = compute_key_md5(&key);
2388        let key_b64 = base64::engine::general_purpose::STANDARD.encode(key);
2389        let md5_b64 = base64::engine::general_purpose::STANDARD.encode(md5);
2390        let m = parse_customer_key_headers("AES256", &key_b64, &md5_b64).unwrap();
2391        assert_eq!(m.key, key);
2392        assert_eq!(m.key_md5, md5);
2393    }
2394
2395    #[test]
2396    fn parse_customer_key_headers_rejects_wrong_algorithm() {
2397        let key = [1u8; KEY_LEN];
2398        let md5 = compute_key_md5(&key);
2399        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2400        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2401        let err = parse_customer_key_headers("AES128", &kb, &mb).unwrap_err();
2402        assert!(
2403            matches!(err, SseError::CustomerKeyAlgorithmUnsupported { ref algo } if algo == "AES128"),
2404            "got {err:?}"
2405        );
2406        // Lowercase variant still rejected (AWS S3 accepts only "AES256").
2407        let err2 = parse_customer_key_headers("aes256", &kb, &mb).unwrap_err();
2408        assert!(
2409            matches!(err2, SseError::CustomerKeyAlgorithmUnsupported { .. }),
2410            "got {err2:?}"
2411        );
2412    }
2413
2414    #[test]
2415    fn parse_customer_key_headers_rejects_wrong_key_length() {
2416        let short_key = vec![5u8; 16]; // half-length AES key
2417        let md5 = compute_key_md5(&short_key);
2418        let kb = base64::engine::general_purpose::STANDARD.encode(&short_key);
2419        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2420        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2421        assert!(
2422            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("key length")),
2423            "got {err:?}"
2424        );
2425    }
2426
2427    #[test]
2428    fn parse_customer_key_headers_rejects_wrong_md5_length() {
2429        let key = [3u8; KEY_LEN];
2430        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2431        // Truncated MD5 (15 bytes instead of 16).
2432        let bad_md5 = vec![0u8; 15];
2433        let mb = base64::engine::general_purpose::STANDARD.encode(bad_md5);
2434        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2435        assert!(
2436            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 length")),
2437            "got {err:?}"
2438        );
2439    }
2440
2441    #[test]
2442    fn parse_customer_key_headers_rejects_md5_mismatch() {
2443        let key = [4u8; KEY_LEN];
2444        let other = [5u8; KEY_LEN];
2445        let kb = base64::engine::general_purpose::STANDARD.encode(key);
2446        let wrong_md5 = compute_key_md5(&other);
2447        let mb = base64::engine::general_purpose::STANDARD.encode(wrong_md5);
2448        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
2449        assert!(
2450            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 does not match")),
2451            "got {err:?}"
2452        );
2453    }
2454
2455    #[test]
2456    fn parse_customer_key_headers_rejects_bad_base64() {
2457        let valid_key = [0u8; KEY_LEN];
2458        let md5 = compute_key_md5(&valid_key);
2459        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
2460        let err = parse_customer_key_headers("AES256", "!!!not-base64!!!", &mb).unwrap_err();
2461        assert!(
2462            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2463            "got {err:?}"
2464        );
2465        // Bad MD5 base64.
2466        let kb = base64::engine::general_purpose::STANDARD.encode(valid_key);
2467        let err2 = parse_customer_key_headers("AES256", &kb, "??not-base64??").unwrap_err();
2468        assert!(
2469            matches!(err2, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
2470            "got {err2:?}"
2471        );
2472    }
2473
2474    #[test]
2475    fn parse_customer_key_headers_trims_whitespace() {
2476        // S3 SDKs sometimes pad headers with trailing newlines.
2477        let key = [12u8; KEY_LEN];
2478        let md5 = compute_key_md5(&key);
2479        let kb = format!(
2480            "  {}\n",
2481            base64::engine::general_purpose::STANDARD.encode(key)
2482        );
2483        let mb = format!(
2484            "\t{}  ",
2485            base64::engine::general_purpose::STANDARD.encode(md5)
2486        );
2487        let m = parse_customer_key_headers("AES256", &kb, &mb).unwrap();
2488        assert_eq!(m.key, key);
2489    }
2490
2491    // -----------------------------------------------------------------
2492    // Back-compat + cross-source mixing
2493    // -----------------------------------------------------------------
2494
2495    #[test]
2496    fn back_compat_decrypt_s4e1_with_keyring_source() {
2497        let k = key32(33);
2498        let legacy_ct = encrypt(&k, b"v0.4 vintage object");
2499        let kr = SseKeyring::new(1, Arc::clone(&k));
2500        // Both call styles must work — `&kr` (back-compat) and
2501        // `SseSource::Keyring(&kr)` (explicit).
2502        let plain = decrypt(&legacy_ct, &kr).unwrap();
2503        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
2504        let plain2 = decrypt(&legacy_ct, SseSource::Keyring(&kr)).unwrap();
2505        assert_eq!(plain2.as_ref(), b"v0.4 vintage object");
2506    }
2507
2508    #[test]
2509    fn back_compat_decrypt_s4e2_with_keyring_source() {
2510        let kr = keyring_single(34);
2511        let ct = encrypt_v2(b"v0.5 #29 object", &kr);
2512        let plain = decrypt(&ct, &kr).unwrap();
2513        assert_eq!(plain.as_ref(), b"v0.5 #29 object");
2514        // encrypt_with_source(Keyring) should produce the same wire
2515        // format (S4E2).
2516        let ct2 = encrypt_with_source(b"v0.5 #29 object", SseSource::Keyring(&kr));
2517        assert_eq!(&ct2[..4], SSE_MAGIC_V2);
2518        let plain2 = decrypt(&ct2, &kr).unwrap();
2519        assert_eq!(plain2.as_ref(), b"v0.5 #29 object");
2520    }
2521
2522    #[test]
2523    fn s4e2_blob_with_customer_key_source_is_rejected() {
2524        // An object stored with SSE-S4 (S4E2) but a client sending
2525        // SSE-C headers on the GET — this is a misuse, surface as
2526        // CustomerKeyUnexpected so service.rs can return 400.
2527        let kr = keyring_single(50);
2528        let ct = encrypt_v2(b"server-managed object", &kr);
2529        let m = cust_key(99);
2530        let err = decrypt(
2531            &ct,
2532            SseSource::CustomerKey {
2533                key: &m.key,
2534                key_md5: &m.key_md5,
2535            },
2536        )
2537        .unwrap_err();
2538        assert!(matches!(err, SseError::CustomerKeyUnexpected), "got {err:?}");
2539    }
2540
2541    #[test]
2542    fn s4e3_blob_with_keyring_source_is_rejected() {
2543        // Inverse: object is SSE-C (S4E3) but client forgot to send
2544        // SSE-C headers. Service.rs should map this to 400.
2545        let m = cust_key(60);
2546        let ct = encrypt_with_source(b"customer-key object", (&m).into());
2547        let kr = keyring_single(60);
2548        let err = decrypt(&ct, &kr).unwrap_err();
2549        assert!(matches!(err, SseError::CustomerKeyRequired), "got {err:?}");
2550    }
2551
2552    #[test]
2553    fn looks_encrypted_detects_s4e3() {
2554        let m = cust_key(13);
2555        let ct = encrypt_with_source(b"x", (&m).into());
2556        assert!(looks_encrypted(&ct));
2557    }
2558
2559    #[test]
2560    fn s4e3_rejects_short_body() {
2561        // 36 bytes passes the looks_encrypted gate but is shorter than
2562        // S4E3's 49-byte header.
2563        let mut short = Vec::new();
2564        short.extend_from_slice(SSE_MAGIC_V3);
2565        short.push(ALGO_AES_256_GCM);
2566        // Padding to 36 bytes (SSE_HEADER_BYTES) so the outer length
2567        // check passes but the S4E3 inner check fails.
2568        short.extend_from_slice(&[0u8; SSE_HEADER_BYTES - 5]);
2569        assert_eq!(short.len(), SSE_HEADER_BYTES);
2570        let m = cust_key(1);
2571        let err = decrypt(
2572            &short,
2573            SseSource::CustomerKey {
2574                key: &m.key,
2575                key_md5: &m.key_md5,
2576            },
2577        )
2578        .unwrap_err();
2579        assert!(matches!(err, SseError::TooShort { .. }), "got {err:?}");
2580    }
2581
2582    #[test]
2583    fn customer_key_material_debug_redacts_key() {
2584        let m = cust_key(99);
2585        let s = format!("{m:?}");
2586        assert!(s.contains("redacted"));
2587        assert!(!s.contains(&format!("{:?}", m.key.as_slice())));
2588    }
2589
2590    #[test]
2591    fn constant_time_eq_basic() {
2592        assert!(constant_time_eq(b"abc", b"abc"));
2593        assert!(!constant_time_eq(b"abc", b"abd"));
2594        assert!(!constant_time_eq(b"abc", b"abcd"));
2595        assert!(constant_time_eq(b"", b""));
2596    }
2597
2598    #[test]
2599    fn compute_key_md5_known_vector() {
2600        // Empty input MD5 is known: d41d8cd98f00b204e9800998ecf8427e.
2601        let got = compute_key_md5(b"");
2602        let expected_hex = "d41d8cd98f00b204e9800998ecf8427e";
2603        assert_eq!(hex_lower(&got), expected_hex);
2604    }
2605
2606    // -----------------------------------------------------------------
2607    // v0.5 #28 — SSE-KMS envelope (S4E4) tests
2608    // -----------------------------------------------------------------
2609
2610    use crate::kms::{KmsBackend, LocalKms};
2611    use std::collections::HashMap;
2612    use std::path::PathBuf;
2613
2614    fn local_kms_with(key_ids: &[(&str, [u8; 32])]) -> LocalKms {
2615        let mut keks: HashMap<String, [u8; 32]> = HashMap::new();
2616        for (id, k) in key_ids {
2617            keks.insert((*id).to_string(), *k);
2618        }
2619        LocalKms::from_keks(PathBuf::from("/tmp/none"), keks)
2620    }
2621
2622    #[tokio::test]
2623    async fn s4e4_roundtrip_via_local_kms() {
2624        let kms = local_kms_with(&[("alpha", [42u8; 32])]);
2625        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2626        let mut dek = [0u8; 32];
2627        dek.copy_from_slice(&dek_vec);
2628        let pt = b"SSE-KMS envelope payload across the S4E4 frame";
2629        let ct = encrypt_with_source(
2630            pt,
2631            SseSource::Kms {
2632                dek: &dek,
2633                wrapped: &wrapped,
2634            },
2635        );
2636        // Frame inspection.
2637        assert_eq!(&ct[..4], SSE_MAGIC_V4);
2638        assert_eq!(ct[4], ALGO_AES_256_GCM);
2639        let key_id_len = ct[5] as usize;
2640        assert_eq!(key_id_len, "alpha".len());
2641        assert_eq!(&ct[6..6 + key_id_len], b"alpha");
2642        // peek_magic + looks_encrypted both recognise S4E4.
2643        assert!(looks_encrypted(&ct));
2644        assert_eq!(peek_magic(&ct), Some("S4E4"));
2645        // Async decrypt round-trip.
2646        let plain = decrypt_with_kms(&ct, &kms).await.unwrap();
2647        assert_eq!(plain.as_ref(), pt);
2648    }
2649
    #[tokio::test]
    async fn s4e4_tampered_key_id_fails_aead() {
        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
        let mut dek = [0u8; 32];
        dek.copy_from_slice(&dek_vec);
        let mut ct = encrypt_with_source(
            b"do not redirect",
            SseSource::Kms {
                dek: &dek,
                wrapped: &wrapped,
            },
        )
        .to_vec();
        // Corrupt the first byte of the stored key_id, turning "alpha"
        // into "blpha". The forged id is not in the KMS, so the unwrap
        // fails — surfaced as KmsBackend(KmsError::KeyNotFound). The
        // match also accepts UnwrapFailed in case a backend checks the
        // wrap before the id.
        let key_id_off = 6;
        ct[key_id_off] = b'b';
        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
        assert!(
            matches!(
                err,
                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
                    | SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
            ),
            "got {err:?}"
        );
    }
2680
    #[tokio::test]
    async fn s4e4_tampered_key_id_to_real_other_id_still_fails() {
        // Wrap under "alpha", then rewrite the stored key_id in place.
        // Ideally we'd forge exactly "beta" (a real KMS entry) so the
        // unwrap would run under the wrong KEK and fail with
        // UnwrapFailed — but "alpha" is 5 bytes and "beta" only 4, and
        // a byte-flipping attacker cannot shrink the length prefix
        // without shifting every later field. So the forged id becomes
        // the 5-byte "beta_", which is unknown → KeyNotFound.
        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
        let mut dek = [0u8; 32];
        dek.copy_from_slice(&dek_vec);
        let mut ct = encrypt_with_source(
            b"redirect attempt",
            SseSource::Kms {
                dek: &dek,
                wrapped: &wrapped,
            },
        )
        .to_vec();
        // Overwrite the 5 stored key_id bytes with "beta_" (padded to
        // the original length so no other field offset moves). This
        // mirrors the realistic tampering surface: an attacker can flip
        // bytes but not change the on-disk layout.
        let key_id_off = 6;
        ct[key_id_off..key_id_off + 5].copy_from_slice(b"beta_");
        // The KMS now sees key_id "beta_", which is unknown → KeyNotFound.
        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
        assert!(
            matches!(
                err,
                SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
            ),
            "got {err:?}"
        );
    }
2719
2720    #[tokio::test]
2721    async fn s4e4_tampered_wrapped_dek_fails_unwrap() {
2722        let kms = local_kms_with(&[("k", [3u8; 32])]);
2723        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2724        let mut dek = [0u8; 32];
2725        dek.copy_from_slice(&dek_vec);
2726        let mut ct = encrypt_with_source(
2727            b"target body",
2728            SseSource::Kms {
2729                dek: &dek,
2730                wrapped: &wrapped,
2731            },
2732        )
2733        .to_vec();
2734        // Locate the wrapped_dek_len + wrapped_dek field and flip a
2735        // byte in the middle of the wrapped DEK. AES-GCM auth on the
2736        // wrap fails → KmsBackend(UnwrapFailed).
2737        let key_id_len = ct[5] as usize;
2738        let wrapped_len_off = 6 + key_id_len;
2739        let wrapped_off = wrapped_len_off + 4;
2740        let mid = wrapped_off + (wrapped.ciphertext.len() / 2);
2741        ct[mid] ^= 0xFF;
2742        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2743        assert!(
2744            matches!(
2745                err,
2746                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2747            ),
2748            "got {err:?}"
2749        );
2750    }
2751
2752    #[tokio::test]
2753    async fn s4e4_tampered_ciphertext_fails_aead() {
2754        let kms = local_kms_with(&[("k", [4u8; 32])]);
2755        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2756        let mut dek = [0u8; 32];
2757        dek.copy_from_slice(&dek_vec);
2758        let mut ct = encrypt_with_source(
2759            b"sealed body",
2760            SseSource::Kms {
2761                dek: &dek,
2762                wrapped: &wrapped,
2763            },
2764        )
2765        .to_vec();
2766        let last = ct.len() - 1;
2767        ct[last] ^= 0x01;
2768        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2769        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2770    }
2771
2772    #[tokio::test]
2773    async fn s4e4_uses_random_nonce_and_dek_per_put() {
2774        let kms = local_kms_with(&[("k", [5u8; 32])]);
2775        // Two PUTs of the same plaintext under the same KEK must
2776        // produce different ciphertexts (fresh DEK + fresh nonce).
2777        let (dek1_vec, wrapped1) = kms.generate_dek("k").await.unwrap();
2778        let (dek2_vec, wrapped2) = kms.generate_dek("k").await.unwrap();
2779        let mut dek1 = [0u8; 32];
2780        dek1.copy_from_slice(&dek1_vec);
2781        let mut dek2 = [0u8; 32];
2782        dek2.copy_from_slice(&dek2_vec);
2783        let pt = b"deterministic input";
2784        let a = encrypt_with_source(
2785            pt,
2786            SseSource::Kms {
2787                dek: &dek1,
2788                wrapped: &wrapped1,
2789            },
2790        );
2791        let b = encrypt_with_source(
2792            pt,
2793            SseSource::Kms {
2794                dek: &dek2,
2795                wrapped: &wrapped2,
2796            },
2797        );
2798        assert_ne!(a, b);
2799        // Both still decrypt round-trip.
2800        let plain_a = decrypt_with_kms(&a, &kms).await.unwrap();
2801        let plain_b = decrypt_with_kms(&b, &kms).await.unwrap();
2802        assert_eq!(plain_a.as_ref(), pt);
2803        assert_eq!(plain_b.as_ref(), pt);
2804    }
2805
2806    #[tokio::test]
2807    async fn s4e4_sync_decrypt_returns_kms_async_required() {
2808        // The whole point of KmsAsyncRequired: passing an S4E4 body
2809        // to the sync `decrypt` function must surface a distinct
2810        // error so service.rs's GET path notices the bug rather than
2811        // returning a generic "wrong source" 400.
2812        let kms = local_kms_with(&[("k", [6u8; 32])]);
2813        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2814        let mut dek = [0u8; 32];
2815        dek.copy_from_slice(&dek_vec);
2816        let ct = encrypt_with_source(
2817            b"async only",
2818            SseSource::Kms {
2819                dek: &dek,
2820                wrapped: &wrapped,
2821            },
2822        );
2823        // Try via Keyring source (the default sync path).
2824        let kr = SseKeyring::new(1, key32(0));
2825        let err = decrypt(&ct, &kr).unwrap_err();
2826        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
2827    }
2828
2829    #[test]
2830    fn back_compat_s4e1_e2_e3_still_decrypt_via_sync() {
2831        // After adding S4E4, the sync `decrypt` path must still
2832        // handle every legacy frame variant unchanged.
2833        let k = key32(7);
2834        let v1 = encrypt(&k, b"v0.4 vintage");
2835        let kr = SseKeyring::new(1, Arc::clone(&k));
2836        assert_eq!(decrypt(&v1, &kr).unwrap().as_ref(), b"v0.4 vintage");
2837
2838        let v2 = encrypt_v2(b"v0.5 #29 vintage", &kr);
2839        assert_eq!(
2840            decrypt(&v2, &kr).unwrap().as_ref(),
2841            b"v0.5 #29 vintage"
2842        );
2843
2844        let m = cust_key(7);
2845        let v3 = encrypt_with_source(b"v0.5 #27 vintage", (&m).into());
2846        assert_eq!(
2847            decrypt(&v3, &m).unwrap().as_ref(),
2848            b"v0.5 #27 vintage"
2849        );
2850    }
2851
2852    #[test]
2853    fn peek_magic_distinguishes_all_variants() {
2854        // S4E1 / S4E2 / S4E3 — built from real encrypts so the
2855        // length gate also passes.
2856        let k = key32(9);
2857        let v1 = encrypt(&k, b"x");
2858        assert_eq!(peek_magic(&v1), Some("S4E1"));
2859        let kr = SseKeyring::new(1, Arc::clone(&k));
2860        let v2 = encrypt_v2(b"x", &kr);
2861        assert_eq!(peek_magic(&v2), Some("S4E2"));
2862        let m = cust_key(9);
2863        let v3 = encrypt_with_source(b"x", (&m).into());
2864        assert_eq!(peek_magic(&v3), Some("S4E3"));
2865        // Synthetic S4E4 magic with enough trailing bytes to clear
2866        // the 36-byte length gate. peek_magic does NOT validate the
2867        // S4E4 inner header, just the magic — that's the contract
2868        // (cheap dispatch signal).
2869        let mut v4 = Vec::new();
2870        v4.extend_from_slice(SSE_MAGIC_V4);
2871        v4.extend_from_slice(&[0u8; 40]);
2872        assert_eq!(peek_magic(&v4), Some("S4E4"));
2873        // Unknown magic / too-short input → None.
2874        assert!(peek_magic(b"NOPE").is_none());
2875        assert!(peek_magic(b"short").is_none());
2876        assert!(peek_magic(&[0u8; 100]).is_none());
2877    }
2878
2879    #[tokio::test]
2880    async fn s4e4_truncated_frame_errors_cleanly() {
2881        // Truncate to less than the minimum header. Must surface
2882        // KmsFrameTooShort, not panic, not return BadMagic.
2883        let truncated = b"S4E4\x01\x05hi";
2884        let kms = local_kms_with(&[("k", [1u8; 32])]);
2885        let err = decrypt_with_kms(truncated, &kms).await.unwrap_err();
2886        assert!(
2887            matches!(err, SseError::KmsFrameTooShort { .. }),
2888            "got {err:?}"
2889        );
2890    }
2891
2892    #[tokio::test]
2893    async fn s4e4_oob_key_id_len_errors() {
2894        // Build a body that claims key_id_len = 200 but only has 4
2895        // bytes after the length prefix. parse_s4e4_header must
2896        // refuse with KmsFrameFieldOob, not slice-panic.
2897        let mut body = Vec::new();
2898        body.extend_from_slice(SSE_MAGIC_V4);
2899        body.push(ALGO_AES_256_GCM);
2900        body.push(200u8); // key_id_len
2901        // Remaining bytes < 200; pad to clear the looks_encrypted
2902        // floor (36 bytes) but stay short of the claimed key_id +
2903        // wrapped_dek_len + nonce + tag layout.
2904        body.extend_from_slice(&[0u8; 50]);
2905        let kms = local_kms_with(&[("k", [1u8; 32])]);
2906        let err = decrypt_with_kms(&body, &kms).await.unwrap_err();
2907        assert!(
2908            matches!(err, SseError::KmsFrameFieldOob { .. }),
2909            "got {err:?}"
2910        );
2911    }
2912
2913    #[tokio::test]
2914    async fn s4e4_via_keyring_source_into_sync_decrypt_is_kms_async_required() {
2915        // S4E4 + Keyring source: sync decrypt sees the S4E4 magic
2916        // first and returns KmsAsyncRequired regardless of source —
2917        // the source mismatch never gets a chance to surface, which
2918        // is the right behaviour (caller's bug is "didn't peek
2919        // magic" not "wrong source").
2920        let kms = local_kms_with(&[("k", [9u8; 32])]);
2921        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2922        let mut dek = [0u8; 32];
2923        dek.copy_from_slice(&dek_vec);
2924        let ct = encrypt_with_source(
2925            b"x",
2926            SseSource::Kms {
2927                dek: &dek,
2928                wrapped: &wrapped,
2929            },
2930        );
2931        let m = cust_key(1);
2932        let err = decrypt(&ct, &m).unwrap_err();
2933        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
2934    }
2935
2936    #[tokio::test]
2937    async fn s4e4_looks_encrypted_passthrough_returns_false_for_synthetic() {
2938        // S4F4 (note F not E) must NOT be confused with S4E4.
2939        let mut not_s4e4 = Vec::new();
2940        not_s4e4.extend_from_slice(b"S4F4");
2941        not_s4e4.extend_from_slice(&[0u8; 60]);
2942        assert!(!looks_encrypted(&not_s4e4));
2943        assert_eq!(peek_magic(&not_s4e4), None);
2944    }
2945
2946    #[tokio::test]
2947    async fn s4e4_aad_length_prefix_prevents_byte_shifting() {
2948        // Constructing an S4E4 body where the wrapped_dek_len is
2949        // shrunk by N bytes and the same N bytes are prepended to
2950        // the key_id-equivalent area would, without length-prefixed
2951        // AAD, produce the same AAD bytestream. Verify our AAD
2952        // includes the length prefixes by tampering with
2953        // wrapped_dek_len and confirming AES-GCM auth fails.
2954        let kms = local_kms_with(&[("kk", [11u8; 32])]);
2955        let (dek_vec, wrapped) = kms.generate_dek("kk").await.unwrap();
2956        let mut dek = [0u8; 32];
2957        dek.copy_from_slice(&dek_vec);
2958        let mut ct = encrypt_with_source(
2959            b"length-shift defense",
2960            SseSource::Kms {
2961                dek: &dek,
2962                wrapped: &wrapped,
2963            },
2964        )
2965        .to_vec();
2966        let key_id_len = ct[5] as usize;
2967        let wrapped_len_off = 6 + key_id_len;
2968        // Shrink wrapped_dek_len by 1. parse_s4e4_header now reads a
2969        // shorter wrapped_dek and a different nonce/tag/ciphertext
2970        // alignment — KMS unwrap fails OR AES-GCM fails OR frame
2971        // bounds reject. All three surface as auditable errors;
2972        // none should reach a successful decrypt.
2973        let original_len = u32::from_be_bytes([
2974            ct[wrapped_len_off],
2975            ct[wrapped_len_off + 1],
2976            ct[wrapped_len_off + 2],
2977            ct[wrapped_len_off + 3],
2978        ]);
2979        let new_len = (original_len - 1).to_be_bytes();
2980        ct[wrapped_len_off..wrapped_len_off + 4].copy_from_slice(&new_len);
2981        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2982        // Acceptable failure modes: unwrap fail (truncated wrapped
2983        // DEK), AES-GCM fail (shifted nonce/tag/AAD), or frame bounds.
2984        assert!(
2985            matches!(
2986                err,
2987                SseError::KmsBackend(_)
2988                    | SseError::DecryptFailed
2989                    | SseError::KmsFrameFieldOob { .. }
2990                    | SseError::KmsFrameTooShort { .. }
2991            ),
2992            "got {err:?}"
2993        );
2994    }
2995
2996    // -----------------------------------------------------------------------
2997    // v0.8 #52: S4E5 chunked SSE-S4 — encrypt_v2_chunked / decrypt_chunked_stream
2998    // -----------------------------------------------------------------------
2999
3000    use futures::StreamExt;
3001
3002    /// Drain a chunked-decrypt stream into a `Vec<Bytes>` for assertion.
3003    /// Surfaces the first error verbatim (so tests can match on it).
3004    async fn collect_chunks(
3005        s: impl futures::Stream<Item = Result<Bytes, SseError>>,
3006    ) -> Result<Vec<Bytes>, SseError> {
3007        let mut out = Vec::new();
3008        let mut s = std::pin::pin!(s);
3009        while let Some(item) = s.next().await {
3010            out.push(item?);
3011        }
3012        Ok(out)
3013    }
3014
3015    #[test]
3016    fn s4e6_encrypt_layout_10mb_at_1mib() {
3017        // v0.8.1 #57: encrypt_v2_chunked now emits S4E6 (24-byte
3018        // header, 8-byte salt). 10 MB plaintext at 1 MiB chunk
3019        // size → magic "S4E6", chunk_count=10, header bytes line
3020        // up to the documented 24 + 10 * 16 + 10 MB layout.
3021        let kr = keyring_single(0x42);
3022        let chunk_size = 1024 * 1024;
3023        let pt_len = 10 * 1024 * 1024;
3024        let pt = vec![0xAB_u8; pt_len];
3025        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).expect("encrypt ok");
3026        assert_eq!(&ct[..4], SSE_MAGIC_V6, "new PUTs emit S4E6 (v0.8.1 #57)");
3027        assert_eq!(ct[4], ALGO_AES_256_GCM);
3028        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE = active id");
3029        assert_eq!(ct[7], 0, "reserved must be 0");
3030        assert_eq!(
3031            u32::from_be_bytes([ct[8], ct[9], ct[10], ct[11]]),
3032            chunk_size as u32,
3033            "chunk_size BE",
3034        );
3035        assert_eq!(
3036            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3037            10,
3038            "chunk_count BE — 10 MiB / 1 MiB = 10 (no remainder)",
3039        );
3040        // Salt now 8 bytes — verify the slice exists and isn't all
3041        // zeros (defensive: catches a stuck PRNG that would leave
3042        // the salt array uninitialized).
3043        assert_eq!(&ct[16..24].len(), &8, "S4E6 salt slot is 8 bytes");
3044        assert_ne!(&ct[16..24], &[0u8; 8], "S4E6 salt must be random, not zeros");
3045        assert_eq!(
3046            ct.len(),
3047            S4E6_HEADER_BYTES + 10 * S4E6_PER_CHUNK_OVERHEAD + pt_len,
3048            "total = header (24) + 10 tags + plaintext",
3049        );
3050        assert!(looks_encrypted(&ct), "looks_encrypted must accept S4E6");
3051        assert_eq!(peek_magic(&ct), Some("S4E6"));
3052    }
3053
3054    #[tokio::test]
3055    async fn s4e6_decrypt_chunked_stream_byte_equal() {
3056        // v0.8.1 #57: round-trip via S4E6 path. encrypt_v2_chunked
3057        // emits S4E6, decrypt_chunked_stream consumes S4E5/S4E6.
3058        let kr = keyring_single(0x55);
3059        let pt: Vec<u8> = (0..(10 * 1024 * 1024_u32)).map(|i| (i & 0xFF) as u8).collect();
3060        let ct = encrypt_v2_chunked(&pt, &kr, 1024 * 1024).unwrap();
3061        // Sanity: the new PUT is S4E6.
3062        assert_eq!(&ct[..4], SSE_MAGIC_V6, "new emit is S4E6");
3063        let stream = decrypt_chunked_stream(ct, &kr);
3064        let chunks = collect_chunks(stream).await.expect("stream ok");
3065        assert_eq!(chunks.len(), 10, "10 chunks expected for 10 MiB / 1 MiB");
3066        let mut joined = Vec::with_capacity(pt.len());
3067        for c in chunks {
3068            joined.extend_from_slice(&c);
3069        }
3070        assert_eq!(joined.len(), pt.len(), "byte length matches");
3071        assert_eq!(joined, pt, "byte-equal round-trip");
3072    }
3073
3074    #[tokio::test]
3075    async fn s4e6_single_chunk_for_small_object() {
3076        // Plaintext smaller than chunk_size → chunk_count=1. The
3077        // chunk_count field offset is unchanged between S4E5 and
3078        // S4E6 (both at body[12..16]); only the salt width differs.
3079        let kr = keyring_single(0x77);
3080        let pt = b"tiny payload, smaller than chunk_size";
3081        let ct = encrypt_v2_chunked(pt, &kr, 1024 * 1024).unwrap();
3082        assert_eq!(
3083            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3084            1,
3085            "small plaintext = single chunk",
3086        );
3087        let stream = decrypt_chunked_stream(ct, &kr);
3088        let chunks = collect_chunks(stream).await.expect("stream ok");
3089        assert_eq!(chunks.len(), 1);
3090        assert_eq!(chunks[0].as_ref(), pt);
3091    }
3092
3093    #[tokio::test]
3094    async fn s4e6_tampered_chunk_n_reports_chunk_index() {
3095        // v0.8.1 #57: same tamper-detect contract as the S4E5
3096        // version, just with the wider 24-byte header. Tamper
3097        // byte inside chunk index 3 (= 4th chunk) — the stream
3098        // must yield 3 successful chunks, then ChunkAuthFailed { 3 }.
3099        let kr = keyring_single(0x91);
3100        let chunk_size = 1024;
3101        let pt = vec![0xCD_u8; chunk_size * 8]; // 8 chunks
3102        let mut ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap().to_vec();
3103        // Locate chunk 3's first ciphertext byte: header (24) + 3 *
3104        // (tag 16 + ct 1024) + tag 16 = 24 + 3*1040 + 16 = 3160.
3105        let target = S4E6_HEADER_BYTES + 3 * (TAG_LEN + chunk_size) + TAG_LEN;
3106        ct[target] ^= 0x42;
3107        let stream = decrypt_chunked_stream(bytes::Bytes::from(ct), &kr);
3108        let mut s = std::pin::pin!(stream);
3109        // Chunks 0, 1, 2 must succeed.
3110        for expected_i in 0..3_u32 {
3111            let item = s.next().await.expect("yield");
3112            item.unwrap_or_else(|e| panic!("chunk {expected_i}: {e:?}"));
3113        }
3114        // Chunk 3 fails with the right index.
3115        let err = s.next().await.expect("yield error").unwrap_err();
3116        assert!(
3117            matches!(err, SseError::ChunkAuthFailed { chunk_index: 3 }),
3118            "got {err:?}",
3119        );
3120    }
3121
3122    #[tokio::test]
3123    async fn s4e5_back_compat_s4e2_blob_rejected_with_clear_error() {
3124        // Feeding an S4E2 frame to decrypt_chunked_stream should
3125        // surface BadMagic on the first poll (NOT silently fall
3126        // back — the caller is expected to peek_magic and dispatch).
3127        let kr = keyring_single(0x12);
3128        let s4e2 = encrypt_v2(b"a v2 blob, not chunked", &kr);
3129        let stream = decrypt_chunked_stream(s4e2, &kr);
3130        let result = collect_chunks(stream).await;
3131        let err = result.unwrap_err();
3132        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
3133    }
3134
3135    #[test]
3136    fn s4e6_salt_randomness_smoke() {
3137        // 8-byte salt → birthday paradox 50% collision at ~2^32
3138        // (~4.3 billion) PUTs. 1024 PUTs → effectively zero
3139        // expected collisions; we don't enforce zero, just
3140        // sanity-check the salt actually differs more than half
3141        // the time (catches a stuck PRNG without a 4-billion-PUT
3142        // test).
3143        let kr = keyring_single(0x33);
3144        let mut salts = std::collections::HashSet::new();
3145        let n = 1024;
3146        for _ in 0..n {
3147            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
3148            let mut salt = [0u8; 8];
3149            salt.copy_from_slice(&ct[16..24]);
3150            salts.insert(salt);
3151        }
3152        assert!(
3153            salts.len() > n / 2,
3154            "expected most of the {n} salts to be unique (got {} unique)",
3155            salts.len(),
3156        );
3157    }
3158
3159    #[test]
3160    fn s4e6_chunk_size_zero_invalid() {
3161        let kr = keyring_single(0x66);
3162        let err = encrypt_v2_chunked(b"hi", &kr, 0).unwrap_err();
3163        assert!(matches!(err, SseError::ChunkSizeInvalid));
3164    }
3165
3166    #[tokio::test]
3167    async fn s4e6_truncated_body_reports_frame_truncated() {
3168        // Truncate inside chunk 2's tag → ChunkFrameTruncated, not
3169        // panic, not silent success. Header is now 24 bytes (S4E6).
3170        let kr = keyring_single(0xA1);
3171        let chunk_size = 256;
3172        let pt = vec![0u8; chunk_size * 4];
3173        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3174        // Truncate to inside chunk 2's tag: header + chunk0 + chunk1
3175        // + 8B partial of chunk2's tag.
3176        let trunc = S4E6_HEADER_BYTES + 2 * (TAG_LEN + chunk_size) + 8;
3177        let truncated = bytes::Bytes::copy_from_slice(&ct[..trunc]);
3178        let stream = decrypt_chunked_stream(truncated, &kr);
3179        let result = collect_chunks(stream).await;
3180        let err = result.unwrap_err();
3181        assert!(
3182            matches!(err, SseError::ChunkFrameTruncated { .. }),
3183            "got {err:?}",
3184        );
3185    }
3186
3187    #[test]
3188    fn s4e6_decrypt_buffered_round_trip_via_top_level_decrypt() {
3189        // Sync `decrypt(blob, &keyring)` must also accept the
3190        // chunked frames (back-compat path for callers that need
3191        // the whole plaintext).
3192        let kr = keyring_single(0xDE);
3193        let pt = b"buffered sync decrypt path".repeat(32);
3194        let ct = encrypt_v2_chunked(&pt, &kr, 13).unwrap();
3195        let plain = decrypt(&ct, &kr).expect("buffered S4E6 decrypt ok");
3196        assert_eq!(plain.as_ref(), pt.as_slice());
3197    }
3198
3199    #[tokio::test]
3200    async fn s4e6_unknown_key_id_in_frame_errors() {
3201        // Encrypt under id=7, decrypt under a keyring that lacks id=7.
3202        let kr_put = SseKeyring::new(7, key32(0xCC));
3203        let kr_get = keyring_single(0xCC); // only id=1
3204        let ct = encrypt_v2_chunked(b"orphan key", &kr_put, 64).unwrap();
3205        // Sync path
3206        let err = decrypt(&ct, &kr_get).unwrap_err();
3207        assert!(matches!(err, SseError::KeyNotInKeyring { id: 7 }), "got {err:?}");
3208        // Stream path
3209        let stream = decrypt_chunked_stream(ct, &kr_get);
3210        let result = collect_chunks(stream).await;
3211        assert!(
3212            matches!(result, Err(SseError::KeyNotInKeyring { id: 7 })),
3213            "got {result:?}",
3214        );
3215    }
3216
3217    #[tokio::test]
3218    async fn s4e6_final_chunk_smaller_than_chunk_size() {
3219        // Plaintext = 2.5 chunks → final chunk holds half the bytes.
3220        // S4E6 header = 24 bytes → total on-disk = 24 + 48 + 250.
3221        let kr = keyring_single(0xEF);
3222        let chunk_size = 100;
3223        let pt: Vec<u8> = (0..250_u32).map(|i| i as u8).collect();
3224        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3225        assert_eq!(
3226            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
3227            3,
3228            "ceil(250/100) = 3 chunks",
3229        );
3230        // Total on-disk: 24 header + 3 tags (48) + 250 plaintext = 322.
3231        assert_eq!(ct.len(), S4E6_HEADER_BYTES + 48 + 250);
3232        let stream = decrypt_chunked_stream(ct, &kr);
3233        let chunks = collect_chunks(stream).await.expect("stream ok");
3234        assert_eq!(chunks.len(), 3);
3235        assert_eq!(chunks[0].len(), 100);
3236        assert_eq!(chunks[1].len(), 100);
3237        assert_eq!(chunks[2].len(), 50, "final chunk is the remainder");
3238        let joined: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
3239        assert_eq!(joined, pt);
3240    }
3241
3242    // -----------------------------------------------------------------------
3243    // v0.8.1 #57: S4E6-specific tests added on top of the renamed
3244    // s4e6_* battery above. Keep these focused on what's *new*:
3245    //   - back-compat read of legacy S4E5 blobs
3246    //   - 24-bit chunk_count cap
3247    //   - the parse_s4e6_header public API
3248    // -----------------------------------------------------------------------
3249
3250    #[test]
3251    fn s4e6_back_compat_read_s4e5_blob() {
3252        // Synthesize a "v0.8.0 vintage" S4E5 blob via the test-only
3253        // helper, then prove the v0.8.1 gateway decrypts it under
3254        // the same keyring — both via sync `decrypt` (buffered)
3255        // and the streaming path. Without this, every S4E5 object
3256        // in production becomes unreadable after the upgrade.
3257        let kr = keyring_single(0x57);
3258        let pt = b"v0.8.0 vintage chunked SSE-S4 object".repeat(64);
3259        let s4e5 = encrypt_v2_chunked_s4e5_for_test(&pt, &kr, 91).unwrap();
3260        // Confirm the test fixture really is S4E5 magic + 20-byte header.
3261        assert_eq!(&s4e5[..4], SSE_MAGIC_V5, "fixture must be S4E5");
3262        assert_eq!(peek_magic(&s4e5), Some("S4E5"));
3263        // Sync decrypt path (top-level `decrypt`, dispatches V5 + V6).
3264        let plain_sync = decrypt(&s4e5, &kr).expect("sync S4E5 decrypt ok");
3265        assert_eq!(plain_sync.as_ref(), pt.as_slice());
3266        // Streaming decrypt path — must also accept S4E5.
3267        let collected = futures::executor::block_on(async {
3268            let stream = decrypt_chunked_stream(s4e5.clone(), &kr);
3269            collect_chunks(stream).await
3270        })
3271        .expect("stream S4E5 decrypt ok");
3272        let mut joined = Vec::with_capacity(pt.len());
3273        for c in collected {
3274            joined.extend_from_slice(&c);
3275        }
3276        assert_eq!(joined, pt, "S4E5 streaming round-trip byte-equal");
3277    }
3278
3279    #[test]
3280    fn s4e6_layout_24_bytes_header() {
3281        // Sanity check: the S4E6 fixed header is exactly 24 bytes
3282        // (vs 20 for S4E5). Catches an accidental const drift in a
3283        // future PR.
3284        assert_eq!(S4E6_HEADER_BYTES, 24);
3285        assert_eq!(S4E6_PER_CHUNK_OVERHEAD, TAG_LEN);
3286        assert_eq!(S4E6_HEADER_BYTES, S4E5_HEADER_BYTES + 4);
3287    }
3288
3289    #[test]
3290    fn s4e6_parse_header_round_trip() {
3291        // parse_s4e6_header is the public mirror of the internal
3292        // parse_chunked_header, useful for admin tools. Verify it
3293        // returns the same field values that encrypt_v2_chunked wrote.
3294        let kr = keyring_single(0xAB);
3295        let chunk_size = 256;
3296        let pt = vec![1u8; 7 * chunk_size];
3297        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
3298        let hdr = parse_s4e6_header(&ct).expect("parse ok");
3299        assert_eq!(hdr.key_id, 1);
3300        assert_eq!(hdr.chunk_size, chunk_size as u32);
3301        assert_eq!(hdr.chunk_count, 7);
3302        assert_eq!(hdr.salt.len(), 8);
3303        // Bad magic on a non-S4E6 blob → BadMagic.
3304        let bogus = b"S4E2\x01\x00\x00\x00........................";
3305        let err = parse_s4e6_header(bogus).unwrap_err();
3306        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
3307        // Truncation → ChunkFrameTruncated.
3308        let err2 = parse_s4e6_header(&ct[..10]).unwrap_err();
3309        assert!(matches!(err2, SseError::ChunkFrameTruncated { .. }), "got {err2:?}");
3310    }
3311
    #[test]
    fn s4e6_salt_uniqueness_smoke_16m() {
        // v0.8.1 #57 security regression detection: with the 4-byte
        // S4E5 salt, ~65,536 PUTs already reached ~40% probability
        // of at least one birthday collision (the bug that motivated
        // this patch). With the 8-byte S4E6 salt, 65,536 PUTs give a
        // collision probability of ~2^16 * 2^16 / 2^65 = 2^-33 —
        // effectively zero.
        //
        // We can't run millions of PUTs in unit-test wall-clock
        // (each PUT does an AES-GCM encrypt), so we run a fast 16k
        // smoke. By the birthday bound, 16k draws from the 4-byte
        // (2^32) S4E5 salt space give n^2 / 2^33 ≈ 0.03 expected
        // collisions (~3.1% chance of at least one); from the
        // 8-byte (2^64) S4E6 space that drops to ~3.6e-11. The
        // full-salt uniqueness assertion below is therefore
        // statistically safe, while the truncated-to-4-bytes side
        // channel below demonstrates (in a few percent of runs)
        // what S4E5 would have shipped.
        let kr = keyring_single(0xA6);
        let mut salts = std::collections::HashSet::with_capacity(16384);
        let n = 16384_usize;
        let mut collisions_top4 = 0usize;
        let mut top4_seen = std::collections::HashSet::with_capacity(16384);
        for _ in 0..n {
            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
            let mut salt = [0u8; 8];
            salt.copy_from_slice(&ct[16..24]);
            salts.insert(salt);
            // Side channel: count collisions on just the *first 4
            // bytes* of the 8-byte salt — i.e. exactly what the old
            // 4-byte S4E5 salt would have produced.
            // NOTE(review): an earlier comment here claimed "~31"
            // (even "~62") expected collisions at n=16k; the correct
            // birthday expectation is 16384^2 / 2^33 ≈ 0.03, so this
            // counter is almost always 0. That is why it is logged
            // below rather than asserted on.
            let mut top4 = [0u8; 4];
            top4.copy_from_slice(&salt[..4]);
            if !top4_seen.insert(top4) {
                collisions_top4 += 1;
            }
        }
        assert_eq!(
            salts.len(),
            n,
            "all 8-byte salts must be unique across {n} PUTs (got {} unique)",
            salts.len(),
        );
        // The simulated 4-byte collision count is a statistical
        // signal only (expected ≈ 0.03 at 16k draws); asserting any
        // minimum on it would be flaky, so we just surface it for
        // operators running the suite verbose.
        eprintln!(
            "s4e6_salt_uniqueness_smoke_16m: 16k PUTs, full 8B salts \
             all unique ({}/{}), simulated 4B-truncated salt yielded \
             {} collisions (this is what S4E5 would have shipped)",
            salts.len(),
            n,
            collisions_top4,
        );
    }
3379
3380    #[test]
3381    fn s4e6_max_chunks_24bit() {
3382        // The S4E6 nonce embeds the chunk index as 24-bit BE, so
3383        // chunk_count > 2^24 - 1 must surface ChunkCountTooLarge
3384        // at PUT time. We can't actually run a 16M-chunk encrypt
3385        // in unit-test wall-clock (16M AES-GCM tag verifies even
3386        // on AES-NI is several minutes), but we can verify the
3387        // CAP constant matches expectations + exercise the cap by
3388        // picking a chunk_size that forces overflow on a tiny
3389        // plaintext.
3390        assert_eq!(S4E6_MAX_CHUNK_COUNT, (1u32 << 24) - 1);
3391        assert_eq!(S4E6_MAX_CHUNK_COUNT, 16_777_215);
3392
3393        // chunk_size=1 + plaintext.len()=16_777_216 → 16M+1 chunks
3394        // → over the cap → ChunkCountTooLarge. Allocating a 16
3395        // MiB plaintext is fine.
3396        let kr = keyring_single(0xC4);
3397        let pt = vec![0u8; (S4E6_MAX_CHUNK_COUNT as usize) + 1]; // 16,777,216 B
3398        let err = encrypt_v2_chunked(&pt, &kr, 1).unwrap_err();
3399        assert!(
3400            matches!(
3401                err,
3402                SseError::ChunkCountTooLarge {
3403                    got: 16_777_216,
3404                    max: 16_777_215
3405                }
3406            ),
3407            "got {err:?}",
3408        );
3409
3410        // And just under the cap (chunk_count = 16_777_215) should
3411        // succeed. We pick chunk_size that produces exactly the cap
3412        // so the inner loop only runs N times. 16M chunk-encrypts
3413        // would be slow, so test with a smaller cap-near config
3414        // that exercises the same boundary check: 1023 chunks of 1
3415        // byte each = 1023 chunks well under the cap → success.
3416        // The actual on-cap encrypt is exercised by the buffered
3417        // decrypt path through `parse_chunked_header`.
3418        let pt_ok = vec![0u8; 1023];
3419        let ct = encrypt_v2_chunked(&pt_ok, &kr, 1).expect("under-cap PUT must succeed");
3420        let hdr = parse_s4e6_header(&ct).unwrap();
3421        assert_eq!(hdr.chunk_count, 1023);
3422
3423        // Synthesize a frame that *claims* chunk_count > cap and
3424        // verify the parser rejects it (defensive: a tampered
3425        // header should not loop the walker 16M+ times).
3426        let mut tampered = ct.to_vec();
3427        // Rewrite chunk_count BE to S4E6_MAX_CHUNK_COUNT + 1 = 2^24.
3428        let bad = (S4E6_MAX_CHUNK_COUNT + 1).to_be_bytes();
3429        tampered[12..16].copy_from_slice(&bad);
3430        let err2 = parse_s4e6_header(&tampered).unwrap_err();
3431        assert!(
3432            matches!(
3433                err2,
3434                SseError::ChunkCountTooLarge { got: 16_777_216, max: 16_777_215 }
3435            ),
3436            "got {err2:?}",
3437        );
3438    }
3439
3440    #[test]
3441    fn s4e6_nonce_v6_layout() {
3442        // Direct unit test on nonce_v6: prefix b'E', then 8B salt,
3443        // then 24-bit BE chunk_index. The high byte of u32
3444        // chunk_index must be dropped (caller-validated cap).
3445        let salt = [0xAA_u8; 8];
3446        let n0 = nonce_v6(&salt, 0);
3447        assert_eq!(n0[0], b'E');
3448        assert_eq!(&n0[1..9], &salt);
3449        assert_eq!(&n0[9..12], &[0, 0, 0]);
3450        let n1 = nonce_v6(&salt, 1);
3451        assert_eq!(&n1[9..12], &[0, 0, 1]);
3452        let n_mid = nonce_v6(&salt, 0x123456);
3453        assert_eq!(&n_mid[9..12], &[0x12, 0x34, 0x56]);
3454        let n_max = nonce_v6(&salt, S4E6_MAX_CHUNK_COUNT);
3455        assert_eq!(&n_max[9..12], &[0xFF, 0xFF, 0xFF]);
3456    }
3457
3458    #[tokio::test]
3459    async fn s4e6_tampered_salt_byte_fails_aead() {
3460        // Flipping a single byte of the 8-byte salt in the header
3461        // must invalidate every chunk's AES-GCM tag (salt is in
3462        // the AAD). Confirms the salt expansion didn't drop
3463        // header authentication.
3464        let kr = keyring_single(0xB6);
3465        let pt = b"salt-in-aad coverage".repeat(64);
3466        let mut ct = encrypt_v2_chunked(&pt, &kr, 128).unwrap().to_vec();
3467        // Salt bytes 16..24 — flip the middle byte.
3468        ct[20] ^= 0x01;
3469        let err = decrypt(&ct, &kr).unwrap_err();
3470        assert!(
3471            matches!(err, SseError::ChunkAuthFailed { chunk_index: 0 }),
3472            "got {err:?}",
3473        );
3474    }
3475}