Skip to main content

s4_server/
sse.rs

1//! Server-side encryption (SSE-S4) — AES-256-GCM (v0.4 #21, v0.5 #29, v0.5 #27, v0.5 #28).
2//!
3//! Wraps the post-compression S3 object body with authenticated
4//! encryption. Compress-then-encrypt is the right order: encryption
5//! produces high-entropy bytes that don't compress, so encrypting last
6//! preserves the codec's ratio.
7//!
8//! ## Wire formats
9//!
10//! ### S4E1 (v0.4) — single key, no rotation
11//!
12//! ```text
13//! [magic: "S4E1" 4B]
14//! [algo:  u8]            # 1 = AES-256-GCM
15//! [reserved: 3B]         # 0x00 0x00 0x00
16//! [nonce: 12B]           # random per-object
17//! [tag:   16B]           # AES-GCM authentication tag
18//! [ciphertext: variable] # encrypted-then-authenticated body
19//! ```
20//!
21//! Total overhead: 36 bytes per object.
22//!
23//! ### S4E2 (v0.5 #29) — keyring-aware, supports rotation
24//!
25//! ```text
26//! [magic:  "S4E2" 4B]
27//! [algo:   u8]            # 1 = AES-256-GCM
28//! [key_id: u16 BE]        # which keyring slot encrypted this body
29//! [reserved: 1B]          # 0x00
30//! [nonce:  12B]           # random per-object
31//! [tag:    16B]           # AES-GCM authentication tag
32//! [ciphertext: variable]
33//! ```
34//!
35//! Same 36-byte overhead — we reused the 3-byte reserved area in S4E1
36//! to fit a 2-byte key-id + 1-byte reserved without bumping the header
37//! size. The key-id is included in the AAD so a flipped key-id byte
38//! fails the auth tag (i.e. an attacker can't trick the gateway into
39//! decrypting under a different keyring slot).
40//!
41//! ### S4E3 (v0.5 #27) — SSE-C, customer-provided key
42//!
43//! ```text
44//! [magic:   "S4E3" 4B]
45//! [algo:    u8]            # 1 = AES-256-GCM
46//! [key_md5: 16B]           # MD5 fingerprint of the customer key
47//! [nonce:   12B]           # random per-object
48//! [tag:     16B]           # AES-GCM authentication tag
49//! [ciphertext: variable]
50//! ```
51//!
52//! Overhead: 49 bytes (`4 + 1 + 16 + 12 + 16`). Unlike S4E1/S4E2 the
53//! gateway does **not** persist the key — the client supplies it on
54//! every PUT/GET via `x-amz-server-side-encryption-customer-{algorithm,
55//! key,key-MD5}` headers. We store only the 16-byte MD5 in the on-disk
56//! frame so a GET with the wrong key surfaces as
57//! [`SseError::WrongCustomerKey`] before AES-GCM is even tried (saves a
58//! useless decrypt + gives operators a distinct error from generic auth
59//! failure).
60//!
61//! The `key_md5` is included in the AAD so flipping a single byte of
62//! the stored fingerprint also breaks AES-GCM auth — i.e. an attacker
63//! who tampered with the metadata can't sneak a different key past the
64//! check.
65//!
66//! ### S4E4 (v0.5 #28) — SSE-KMS envelope, per-object DEK
67//!
68//! ```text
69//! [magic:           "S4E4" 4B]
70//! [algo:            u8]            # 1 = AES-256-GCM
71//! [key_id_len:      u8]            # 1..=255, length of UTF-8 key_id
72//! [key_id:          variable]      # UTF-8, AAD-authenticated
73//! [wrapped_dek_len: u32 BE]        # length of the wrapped DEK blob
74//! [wrapped_dek:     variable]      # opaque, AAD-authenticated
75//! [nonce:           12B]           # random per-object
76//! [tag:             16B]           # AES-GCM auth tag for body
77//! [ciphertext:      variable]      # body encrypted under the DEK
78//! ```
79//!
80//! Header overhead: `4 + 1 + 1 + key_id_len + 4 + wrapped_dek_len + 12
81//! + 16` = 38 + key_id_len + wrapped_dek_len. For a typical
82//! [`crate::kms::LocalKms`] wrap (60-byte ciphertext) and a 36-char
83//! UUID-style `key_id`, that's ~134 bytes per object.
84//!
85//! `key_id` and `wrapped_dek` are both placed in the AAD so an
86//! attacker cannot rewrite either field to point the gateway at a
87//! different KEK or wrapped DEK without invalidating the body's
88//! AES-GCM tag. The plaintext DEK is never persisted; only the
89//! wrapped form is on disk, and the gateway holds the plaintext only
90//! for the duration of one PUT or GET.
91//!
92//! S4E4 decrypt requires an `async` round-trip to the KMS backend
93//! (to unwrap the DEK), so the synchronous [`decrypt`] function
94//! refuses S4E4 with [`SseError::KmsAsyncRequired`] — callers that
95//! peek `S4E4` via [`peek_magic`] must dispatch to
96//! [`decrypt_with_kms`] instead.
97//!
98//! ## v0.5 rotation flow (SSE-S4 only)
99//!
100//! Operators wire one [`SseKeyring`] holding the **active** key plus
101//! any number of **retired** keys. PUT always encrypts under the
102//! active key (S4E2 with that key's id). GET sniffs the magic:
103//!
104//! - `S4E1`: legacy single-key path. The keyring's active key is tried
105//!   first, then every retired key — this lets a v0.4 deployment
106//!   migrate to a keyring with the original key as active and decrypt
107//!   pre-rotation objects unchanged.
108//! - `S4E2`: read the key_id, look it up in the keyring, decrypt with
109//!   that exact key. Missing key_id surfaces as `KeyNotInKeyring`.
110//! - `S4E3`: keyring is **not** consulted. Caller must supply
111//!   [`SseSource::CustomerKey`] with the matching key + md5.
112//!
113//! ## Open follow-ups
114//!
115//! - **Server-managed key only** (for SSE-S4): keys come from local
116//!   files via `--sse-s4-key` / `--sse-s4-key-rotated`. KMS / vault
117//!   integration for the SSE-S4 keyring (i.e. wrapping the keyring's
118//!   keys with KMS) is a separate issue. SSE-KMS for per-object DEKs
119//!   is implemented (see [`SseSource::Kms`] + S4E4 above).
120
121use std::collections::HashMap;
122use std::path::Path;
123use std::sync::Arc;
124
125use aes_gcm::aead::{Aead, KeyInit, Payload};
126use aes_gcm::{Aes256Gcm, Key, Nonce};
127use bytes::Bytes;
128use md5::{Digest as Md5Digest, Md5};
129use rand::RngCore;
130use thiserror::Error;
131
132use crate::kms::{KmsBackend, KmsError, WrappedDek};
133
/// v0.4 single-key frame — no rotation metadata in the header.
pub const SSE_MAGIC_V1: &[u8; 4] = b"S4E1";
/// v0.5 #29 keyring frame — carries a `u16` key_id for rotation.
pub const SSE_MAGIC_V2: &[u8; 4] = b"S4E2";
/// v0.5 #27 SSE-C frame — carries the customer key's MD5 fingerprint.
pub const SSE_MAGIC_V3: &[u8; 4] = b"S4E3";
/// v0.5 #28 SSE-KMS envelope frame — carries key_id + wrapped DEK.
pub const SSE_MAGIC_V4: &[u8; 4] = b"S4E4";
/// v0.8 #52: chunked variant of S4E2 — same SSE-S4 keyring source,
/// but the body is sliced into independently-sealed AES-GCM chunks
/// so the GET path can stream-decrypt + emit chunk-by-chunk instead
/// of buffering the entire object before tag verify. See
/// [`encrypt_v2_chunked`] / [`decrypt_chunked_stream`] for the on-
/// the-wire layout.
pub const SSE_MAGIC_V5: &[u8; 4] = b"S4E5";
/// Back-compat alias — v0.4 callers that imported `SSE_MAGIC` mean S4E1.
pub const SSE_MAGIC: &[u8; 4] = SSE_MAGIC_V1;

/// Header layout matches between S4E1 and S4E2 (both 36 bytes total)
/// because S4E2 reuses the 3-byte reserved slot to fit `key_id (2B) +
/// reserved (1B)`. Keeping them the same length means the rest of the
/// pipeline (sidecar offsets, multipart math) doesn't care which
/// frame variant is in flight.
pub const SSE_HEADER_BYTES: usize = 4 + 1 + 3 + 12 + 16; // = 36
/// S4E3 (SSE-C) replaces the 3-byte reserved area with a 16-byte
/// customer-key MD5 fingerprint, so the header is 49 bytes total.
/// `magic 4 + algo 1 + key_md5 16 + nonce 12 + tag 16`.
pub const SSE_HEADER_BYTES_V3: usize = 4 + 1 + KEY_MD5_LEN + 12 + 16; // = 49
/// Algo byte value for AES-256-GCM — the only cipher this build implements.
pub const ALGO_AES_256_GCM: u8 = 1;
// AES-GCM standard 96-bit nonce.
const NONCE_LEN: usize = 12;
// AES-GCM 128-bit authentication tag.
const TAG_LEN: usize = 16;
// AES-256 key length.
const KEY_LEN: usize = 32;
// MD5 digest length (SSE-C key fingerprint).
const KEY_MD5_LEN: usize = 16;
/// AWS S3 SSE-C only allows AES256 in the
/// `x-amz-server-side-encryption-customer-algorithm` header, so we
/// match that exact spelling for parity with real S3 clients.
pub const SSE_C_ALGORITHM: &str = "AES256";
167
#[derive(Debug, Error)]
pub enum SseError {
    /// Reading the on-disk key file failed; carries the path plus the
    /// underlying I/O error.
    #[error("SSE key file {path:?}: {source}")]
    KeyFileIo {
        path: std::path::PathBuf,
        source: std::io::Error,
    },
    /// Key material parsed to something other than 32 bytes — none of
    /// the accepted encodings (raw / hex / base64) matched. See
    /// `SseKey::from_bytes`.
    #[error(
        "SSE key file must be exactly 32 raw bytes (or 64-char hex / 44-char base64); got {got} bytes after parse"
    )]
    BadKeyLength { got: usize },
    /// Body is shorter than the smallest possible S4E1/S4E2 frame.
    #[error("SSE-encrypted body too short ({got} bytes; need at least {SSE_HEADER_BYTES})")]
    TooShort { got: usize },
    /// First 4 bytes of the body are not one of the known frame magics.
    #[error("SSE bad magic: expected S4E1/S4E2/S4E3/S4E4/S4E5, got {got:?}")]
    BadMagic { got: [u8; 4] },
    /// The frame's algo byte names a cipher this build doesn't implement.
    #[error("SSE unsupported algo tag: {tag} (this build only knows AES-256-GCM = 1)")]
    UnsupportedAlgo { tag: u8 },
    /// S4E2 frame references a key id the keyring no longer holds —
    /// usually means a retired key was dropped from configuration.
    #[error(
        "SSE key_id {id} (S4E2 frame) not present in keyring; rotation history likely incomplete"
    )]
    KeyNotInKeyring { id: u16 },
    /// AES-GCM tag verification failed — wrong key or tampered body.
    #[error("SSE decryption / authentication failed (key mismatch or ciphertext tampered with)")]
    DecryptFailed,
    // --- v0.5 #27: SSE-C specific errors ---
    /// The MD5 fingerprint stored in the S4E3 frame doesn't match the
    /// MD5 of the customer key the client supplied. This is the
    /// "wrong customer key on GET" signal — distinct from
    /// `DecryptFailed` so service.rs can map it to AWS S3's
    /// `403 AccessDenied` (S3 returns AccessDenied when the supplied
    /// SSE-C key doesn't match the one used at PUT time).
    #[error("SSE-C key MD5 fingerprint mismatch — client supplied a different key than PUT")]
    WrongCustomerKey,
    /// `parse_customer_key_headers` saw a malformed input. `reason` is
    /// a short human string ("base64 decode of key", "key length",
    /// "md5 length", "md5 mismatch") for operator log lines — never
    /// echoed to the client (would leak crypto details).
    #[error("SSE-C customer-key headers invalid: {reason}")]
    InvalidCustomerKey { reason: &'static str },
    /// Client asked for an SSE-C algorithm the gateway doesn't speak.
    /// AWS S3 only ever defines `AES256` here; surfacing the offending
    /// string lets us 400 with a useful message.
    #[error("SSE-C algorithm {algo:?} unsupported (only {SSE_C_ALGORITHM:?} is allowed)")]
    CustomerKeyAlgorithmUnsupported { algo: String },
    /// S4E3 body lacks an SSE-C key — caller passed `SseSource::Keyring`
    /// when decrypting an SSE-C-encrypted object. service.rs should
    /// translate this into the same "missing customer key" 400 that
    /// AWS S3 returns when SSE-C headers are absent on a GET.
    #[error("S4E3 frame requires SseSource::CustomerKey; got Keyring")]
    CustomerKeyRequired,
    /// Inverse: client sent SSE-C headers on a GET for an object stored
    /// without SSE-C. The supplied key has no role in decryption, but
    /// AWS S3 actually 400s in this case ("expected an unencrypted
    /// object" / "extraneous SSE-C headers"), so we mirror that.
    #[error("S4E1/S4E2 frame stored without SSE-C; SseSource::CustomerKey is unexpected")]
    CustomerKeyUnexpected,
    // --- v0.5 #28: SSE-KMS specific errors ---
    /// `decrypt` (sync) was handed an S4E4 body. SSE-KMS unwrap is
    /// async (it round-trips to the KMS backend), so callers must
    /// peek the magic with [`peek_magic`] and dispatch S4E4 frames to
    /// [`decrypt_with_kms`] instead. service.rs's GET handler does
    /// this; tests / direct callers may hit this if they forget.
    #[error(
        "S4E4 (SSE-KMS) body requires async decrypt — call decrypt_with_kms() instead of decrypt()"
    )]
    KmsAsyncRequired,
    /// S4E4 frame is shorter than the minimum-possible header (38
    /// bytes for an empty `key_id` + empty `wrapped_dek`, which is
    /// itself impossible — we just sanity-check the floor).
    #[error("S4E4 frame too short ({got} bytes; need at least {min})")]
    KmsFrameTooShort { got: usize, min: usize },
    /// S4E4 declared a `key_id_len` or `wrapped_dek_len` that runs
    /// past the end of the body. Almost certainly truncation /
    /// corruption rather than tampering (tampering would fail the
    /// AES-GCM tag instead).
    #[error("S4E4 frame field length out of bounds: {what}")]
    KmsFrameFieldOob { what: &'static str },
    /// `key_id` field of an S4E4 frame is not valid UTF-8. We require
    /// UTF-8 because `LocalKms` uses the basename of a `.kek` file
    /// (which is OS-string-but-typically-UTF-8) and AWS KMS uses ARNs
    /// (which are ASCII).
    #[error("S4E4 key_id is not valid UTF-8")]
    KmsKeyIdNotUtf8,
    /// service.rs handed `decrypt_with_kms` a `WrappedDek` whose
    /// `key_id` doesn't match the one stored in the S4E4 frame. This
    /// is an integration bug (caller is meant to pull the wrapped
    /// DEK *from the frame*, not from somewhere else), surface as a
    /// distinct error so it shows up in tests rather than silently
    /// failing the AES-GCM tag.
    #[error(
        "S4E4 SseSource::Kms wrapped DEK key_id {supplied:?} doesn't match frame key_id {stored:?}"
    )]
    KmsWrappedDekMismatch {
        supplied: String,
        stored: String,
    },
    /// SSE-KMS path got a non-Kms `SseSource` for an S4E4 body. The
    /// async dispatch in `decrypt_with_kms` re-derives the source
    /// internally so this can only happen if a future caller passes
    /// `SseSource::Keyring` / `CustomerKey` to a path that expected
    /// `Kms` — kept around for symmetry with the other "wrong source"
    /// errors.
    #[error("S4E4 frame requires SseSource::Kms")]
    KmsRequired,
    /// Pass-through for [`crate::kms::KmsError`] surfaced from
    /// `KmsBackend::decrypt_dek` — boxed so the variant stays small.
    #[error("KMS unwrap: {0}")]
    KmsBackend(#[from] KmsError),
    // --- v0.8 #52: S4E5 (chunked SSE-S4) specific errors ---
    /// AES-GCM auth tag verify failed on chunk `chunk_index` of an
    /// S4E5 body. Distinct from the all-or-nothing
    /// [`SseError::DecryptFailed`] because the streaming GET may
    /// have already emitted earlier chunks to the client by the
    /// time chunk N fails — operators need the chunk index in audit
    /// logs to triangulate which byte range was tampered with (or
    /// which disk sector flipped).
    #[error("S4E5 chunk {chunk_index} auth tag verify failed (key mismatch or chunk tampered with)")]
    ChunkAuthFailed { chunk_index: u32 },
    /// Caller asked [`encrypt_v2_chunked`] to use a chunk size of 0
    /// — nonsensical (would loop forever). Surfaced as an error
    /// rather than panicking so service.rs can map a bad
    /// `--sse-chunk-size 0` configuration to a clear startup error.
    #[error("S4E5 chunk_size must be > 0 (got 0)")]
    ChunkSizeInvalid,
    /// S4E5 frame is shorter than the fixed header or declares a
    /// (chunk_count × per-chunk-bytes) total that overruns the
    /// body. Almost certainly truncation / corruption — tampering
    /// with the per-chunk ciphertext or tag would surface as
    /// [`SseError::ChunkAuthFailed`] instead.
    #[error("S4E5 frame truncated: {what}")]
    ChunkFrameTruncated { what: &'static str },
}
299
/// 32-byte symmetric key. `bytes` is `pub` so call sites can construct
/// keys directly from already-validated bytes (e.g. KMS-decrypted DEKs)
/// without going through the on-disk parser. Hold inside an `Arc` when
/// sharing across handler tasks — `SseKeyring` does this internally.
pub struct SseKey {
    /// Raw AES-256 key material. Never printed — the manual `Debug`
    /// impl below redacts it.
    pub bytes: [u8; 32],
}
307
308impl SseKey {
309    /// Load a 32-byte key from disk. Accepts three on-disk encodings:
310    /// raw 32 bytes, 64-char ASCII hex, or 44-char ASCII base64 (with or
311    /// without padding). Whitespace is trimmed.
312    pub fn from_path(path: &Path) -> Result<Self, SseError> {
313        let raw = std::fs::read(path).map_err(|source| SseError::KeyFileIo {
314            path: path.to_path_buf(),
315            source,
316        })?;
317        Self::from_bytes(&raw)
318    }
319
320    pub fn from_bytes(bytes: &[u8]) -> Result<Self, SseError> {
321        // Try raw first.
322        if bytes.len() == KEY_LEN {
323            let mut k = [0u8; KEY_LEN];
324            k.copy_from_slice(bytes);
325            return Ok(Self { bytes: k });
326        }
327        // Trim whitespace and try hex / base64.
328        let s = std::str::from_utf8(bytes).unwrap_or("").trim();
329        if s.len() == KEY_LEN * 2 && s.chars().all(|c| c.is_ascii_hexdigit()) {
330            let mut k = [0u8; KEY_LEN];
331            for (i, k_byte) in k.iter_mut().enumerate() {
332                *k_byte = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16)
333                    .map_err(|_| SseError::BadKeyLength { got: bytes.len() })?;
334            }
335            return Ok(Self { bytes: k });
336        }
337        if let Ok(decoded) =
338            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s.as_bytes())
339            && decoded.len() == KEY_LEN
340        {
341            let mut k = [0u8; KEY_LEN];
342            k.copy_from_slice(&decoded);
343            return Ok(Self { bytes: k });
344        }
345        Err(SseError::BadKeyLength { got: bytes.len() })
346    }
347
348    fn as_aes_key(&self) -> &Key<Aes256Gcm> {
349        Key::<Aes256Gcm>::from_slice(&self.bytes)
350    }
351}
352
353impl std::fmt::Debug for SseKey {
354    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355        f.debug_struct("SseKey")
356            .field("len", &KEY_LEN)
357            .field("key", &"<redacted>")
358            .finish()
359    }
360}
361
/// v0.5 #29: a set of `SseKey`s indexed by `u16` key-id, plus a
/// designated **active** id used for new encryptions. Rotation is just
/// "add the new key, flip `active` to its id, leave the old keys for
/// decryption-only". Cheap to clone (`Arc<SseKey>` per slot).
#[derive(Clone)]
pub struct SseKeyring {
    // Id of the key used for all new encryptions (S4E2 frames).
    active: u16,
    // Every key available for decryption (active + retired), by id.
    keys: HashMap<u16, Arc<SseKey>>,
}
371
372impl SseKeyring {
373    /// Create a keyring seeded with one key, immediately marked
374    /// active. Add older keys later via [`SseKeyring::add`] so the
375    /// gateway can still decrypt pre-rotation objects.
376    pub fn new(active: u16, key: Arc<SseKey>) -> Self {
377        let mut keys = HashMap::new();
378        keys.insert(active, key);
379        Self { active, keys }
380    }
381
382    /// Insert another key under id `id`. Does NOT change `active`. If
383    /// `id == active`, the slot is overwritten (useful for tests; in
384    /// production prefer minting a fresh id).
385    pub fn add(&mut self, id: u16, key: Arc<SseKey>) {
386        self.keys.insert(id, key);
387    }
388
389    /// Active (id, key) — used by [`encrypt_v2`] to pick the slot for
390    /// new objects.
391    pub fn active(&self) -> (u16, &SseKey) {
392        let id = self.active;
393        let key = self
394            .keys
395            .get(&id)
396            .expect("active key id must be present in keyring (constructor invariant)");
397        (id, key.as_ref())
398    }
399
400    /// Look up a key by id. Returns `None` for unknown ids — caller
401    /// should surface this as [`SseError::KeyNotInKeyring`].
402    pub fn get(&self, id: u16) -> Option<&SseKey> {
403        self.keys.get(&id).map(Arc::as_ref)
404    }
405}
406
407impl std::fmt::Debug for SseKeyring {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        f.debug_struct("SseKeyring")
410            .field("active", &self.active)
411            .field("key_count", &self.keys.len())
412            .field("key_ids", &self.keys.keys().collect::<Vec<_>>())
413            .finish()
414    }
415}
416
417pub type SharedSseKeyring = Arc<SseKeyring>;
418
419/// Encrypt `plaintext` with the given key, producing the on-the-wire
420/// S4E1-framed output: `[magic 4][algo 1][reserved 3][nonce 12][tag 16][ciphertext]`.
421///
422/// Kept for back-compat: v0.4 callers that hand-built an `SseKey` (no
423/// keyring) still get the v1 frame. New code should use
424/// [`encrypt_v2`] which writes S4E2 and supports rotation on read.
425pub fn encrypt(key: &SseKey, plaintext: &[u8]) -> Bytes {
426    let cipher = Aes256Gcm::new(key.as_aes_key());
427    let mut nonce_bytes = [0u8; NONCE_LEN];
428    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
429    let nonce = Nonce::from_slice(&nonce_bytes);
430    // AAD = magic + algo. Tampering with either bumps the tag check.
431    let mut aad = [0u8; 8];
432    aad[..4].copy_from_slice(SSE_MAGIC_V1);
433    aad[4] = ALGO_AES_256_GCM;
434    let ct_with_tag = cipher
435        .encrypt(
436            nonce,
437            Payload {
438                msg: plaintext,
439                aad: &aad,
440            },
441        )
442        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
443    debug_assert!(ct_with_tag.len() >= TAG_LEN);
444    let split = ct_with_tag.len() - TAG_LEN;
445    let (ct, tag) = ct_with_tag.split_at(split);
446
447    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
448    out.extend_from_slice(SSE_MAGIC_V1);
449    out.push(ALGO_AES_256_GCM);
450    out.extend_from_slice(&[0u8; 3]); // reserved
451    out.extend_from_slice(&nonce_bytes);
452    out.extend_from_slice(tag);
453    out.extend_from_slice(ct);
454    Bytes::from(out)
455}
456
457/// v0.5 #29: encrypt under the keyring's currently-active key, writing
458/// an S4E2-framed body (`[magic 4][algo 1][key_id 2 BE][reserved 1]
459/// [nonce 12][tag 16][ciphertext]`). The key-id is included in the
460/// AAD so flipping it fails the auth tag.
461pub fn encrypt_v2(plaintext: &[u8], keyring: &SseKeyring) -> Bytes {
462    let (key_id, key) = keyring.active();
463    let cipher = Aes256Gcm::new(key.as_aes_key());
464    let mut nonce_bytes = [0u8; NONCE_LEN];
465    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
466    let nonce = Nonce::from_slice(&nonce_bytes);
467    let aad = aad_v2(key_id);
468    let ct_with_tag = cipher
469        .encrypt(
470            nonce,
471            Payload {
472                msg: plaintext,
473                aad: &aad,
474            },
475        )
476        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
477    debug_assert!(ct_with_tag.len() >= TAG_LEN);
478    let split = ct_with_tag.len() - TAG_LEN;
479    let (ct, tag) = ct_with_tag.split_at(split);
480
481    let mut out = Vec::with_capacity(SSE_HEADER_BYTES + ct.len());
482    out.extend_from_slice(SSE_MAGIC_V2);
483    out.push(ALGO_AES_256_GCM);
484    out.extend_from_slice(&key_id.to_be_bytes()); // 2B BE key_id
485    out.push(0u8); // 1B reserved
486    out.extend_from_slice(&nonce_bytes);
487    out.extend_from_slice(tag);
488    out.extend_from_slice(ct);
489    Bytes::from(out)
490}
491
492fn aad_v1() -> [u8; 8] {
493    let mut aad = [0u8; 8];
494    aad[..4].copy_from_slice(SSE_MAGIC_V1);
495    aad[4] = ALGO_AES_256_GCM;
496    aad
497}
498
499fn aad_v2(key_id: u16) -> [u8; 8] {
500    let mut aad = [0u8; 8];
501    aad[..4].copy_from_slice(SSE_MAGIC_V2);
502    aad[4] = ALGO_AES_256_GCM;
503    aad[5..7].copy_from_slice(&key_id.to_be_bytes());
504    aad[7] = 0u8;
505    aad
506}
507
508/// AAD for S4E3 = magic (4) + algo (1) + key_md5 (16). Putting the
509/// fingerprint in the AAD means tampering with the stored MD5 (e.g. an
510/// attacker rewriting the header to match a *different* key they
511/// happen to know) breaks the AES-GCM tag — the wrong-key check isn't
512/// just a plain `==` we could be tricked past.
513fn aad_v3(key_md5: &[u8; KEY_MD5_LEN]) -> [u8; 4 + 1 + KEY_MD5_LEN] {
514    let mut aad = [0u8; 4 + 1 + KEY_MD5_LEN];
515    aad[..4].copy_from_slice(SSE_MAGIC_V3);
516    aad[4] = ALGO_AES_256_GCM;
517    aad[5..5 + KEY_MD5_LEN].copy_from_slice(key_md5);
518    aad
519}
520
/// Parsed + verified SSE-C key material from the three customer
/// headers. `key_md5` is the MD5 of `key` (we recompute and compare in
/// [`parse_customer_key_headers`] — clients send their own to catch
/// transport corruption, but we *trust* our own computation as the
/// canonical fingerprint in the S4E3 frame).
#[derive(Clone)]
pub struct CustomerKeyMaterial {
    /// Client-supplied AES-256 key — never persisted by the gateway.
    pub key: [u8; KEY_LEN],
    /// Our own MD5 of `key`; stored in the S4E3 frame header.
    pub key_md5: [u8; KEY_MD5_LEN],
}
531
532impl std::fmt::Debug for CustomerKeyMaterial {
533    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534        // Don't leak the key into logs. The MD5 is a public fingerprint
535        // (S3 puts it on the wire), so that's safe to show.
536        f.debug_struct("CustomerKeyMaterial")
537            .field("key", &"<redacted>")
538            .field("key_md5_hex", &hex_lower(&self.key_md5))
539            .finish()
540    }
541}
542
/// Lowercase hex encoding of `bytes` (used for log-safe fingerprints).
fn hex_lower(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    let mut s = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // write! into the preallocated String instead of format! per
        // byte — the original allocated a fresh String every iteration.
        // fmt::Write for String is infallible, so the Result is safe
        // to ignore.
        let _ = write!(s, "{b:02x}");
    }
    s
}
550
/// Source of the encryption key for [`encrypt_with_source`] /
/// [`decrypt`]. SSE-S4 (server-managed, rotation-aware) goes through
/// `Keyring`; SSE-C (customer-supplied) goes through `CustomerKey`;
/// SSE-KMS (per-object envelope) goes through `Kms`.
///
/// Borrowed (not owned) so the caller can hold a long-lived
/// `CustomerKeyMaterial` next to the request and just lend it for the
/// duration of one PUT/GET.
#[derive(Debug, Clone, Copy)]
pub enum SseSource<'a> {
    /// Server-managed keyring path → produces / consumes S4E1 (legacy)
    /// or S4E2 (rotation-aware) frames.
    Keyring(&'a SseKeyring),
    /// Client-supplied AES-256 key + its MD5 fingerprint → produces /
    /// consumes S4E3 frames. The server never persists the key; it
    /// stores `key_md5` only.
    CustomerKey {
        key: &'a [u8; KEY_LEN],
        key_md5: &'a [u8; KEY_MD5_LEN],
    },
    /// SSE-KMS envelope → produces / consumes S4E4 frames. The server
    /// holds a per-object plaintext DEK (from a fresh
    /// [`KmsBackend::generate_dek`] call) and the wrapped form to
    /// persist alongside the body. The DEK is dropped after one
    /// PUT/GET; only the wrapped form survives at rest.
    Kms {
        /// 32-byte plaintext DEK, used as the AES-GCM key.
        dek: &'a [u8; KEY_LEN],
        /// Wrapped form to persist in the S4E4 frame (PUT) or the one
        /// read out of the frame (GET, after a successful unwrap).
        wrapped: &'a WrappedDek,
    },
}
583
584/// Back-compat coercion: existing call sites pass `&SseKeyring`
585/// directly to [`decrypt`]. With this `From` impl the generic bound
586/// `Into<SseSource>` accepts `&SseKeyring` without the caller writing
587/// `.into()`, keeping v0.4 / v0.5 #29 service.rs callers compiling
588/// untouched while v0.5 #27 SSE-C callers pass `SseSource::CustomerKey`
589/// explicitly.
590impl<'a> From<&'a SseKeyring> for SseSource<'a> {
591    fn from(kr: &'a SseKeyring) -> Self {
592        SseSource::Keyring(kr)
593    }
594}
595
596/// service.rs holds keyring as `Option<Arc<SseKeyring>>` and unwraps to
597/// `&Arc<SseKeyring>` — let that coerce too, otherwise every existing
598/// call site needs `.as_ref()` boilerplate.
599impl<'a> From<&'a Arc<SseKeyring>> for SseSource<'a> {
600    fn from(kr: &'a Arc<SseKeyring>) -> Self {
601        SseSource::Keyring(kr.as_ref())
602    }
603}
604
605impl<'a> From<&'a CustomerKeyMaterial> for SseSource<'a> {
606    fn from(m: &'a CustomerKeyMaterial) -> Self {
607        SseSource::CustomerKey {
608            key: &m.key,
609            key_md5: &m.key_md5,
610        }
611    }
612}
613
614/// Parse the three AWS SSE-C headers and return verified key material.
615///
616/// Validates, in order:
617/// 1. `algorithm == "AES256"` (the only value AWS S3 defines).
618/// 2. `key_base64` decodes to exactly 32 bytes (AES-256 key length).
619/// 3. `key_md5_base64` decodes to exactly 16 bytes (MD5 digest length).
620/// 4. The actual MD5 of the decoded key matches the supplied MD5.
621///
622/// Step 4 catches transport corruption *and* a class of programming
623/// bugs where the client signs with one key but uploads another. AWS
624/// S3 also performs this check.
625pub fn parse_customer_key_headers(
626    algorithm: &str,
627    key_base64: &str,
628    key_md5_base64: &str,
629) -> Result<CustomerKeyMaterial, SseError> {
630    use base64::Engine as _;
631    if algorithm != SSE_C_ALGORITHM {
632        return Err(SseError::CustomerKeyAlgorithmUnsupported {
633            algo: algorithm.to_string(),
634        });
635    }
636    let key_bytes = base64::engine::general_purpose::STANDARD
637        .decode(key_base64.trim().as_bytes())
638        .map_err(|_| SseError::InvalidCustomerKey {
639            reason: "base64 decode of key",
640        })?;
641    if key_bytes.len() != KEY_LEN {
642        return Err(SseError::InvalidCustomerKey {
643            reason: "key length (must be 32 bytes after base64 decode)",
644        });
645    }
646    let supplied_md5 = base64::engine::general_purpose::STANDARD
647        .decode(key_md5_base64.trim().as_bytes())
648        .map_err(|_| SseError::InvalidCustomerKey {
649            reason: "base64 decode of key MD5",
650        })?;
651    if supplied_md5.len() != KEY_MD5_LEN {
652        return Err(SseError::InvalidCustomerKey {
653            reason: "key MD5 length (must be 16 bytes after base64 decode)",
654        });
655    }
656    let actual_md5 = compute_key_md5(&key_bytes);
657    // Constant-time compare — paranoia, MD5 is non-secret but the key
658    // it identifies is, so we don't want a timing oracle.
659    if !constant_time_eq(&actual_md5, &supplied_md5) {
660        return Err(SseError::InvalidCustomerKey {
661            reason: "supplied MD5 does not match MD5 of supplied key",
662        });
663    }
664    let mut key = [0u8; KEY_LEN];
665    key.copy_from_slice(&key_bytes);
666    let mut key_md5 = [0u8; KEY_MD5_LEN];
667    key_md5.copy_from_slice(&actual_md5);
668    Ok(CustomerKeyMaterial { key, key_md5 })
669}
670
671/// Convenience wrapper — compute the MD5 fingerprint of a 32-byte
672/// customer key. Callers that already have the bytes (e.g. derived
673/// from a KMS unwrap) can use this to construct a
674/// [`CustomerKeyMaterial`] directly.
675pub fn compute_key_md5(key: &[u8]) -> [u8; KEY_MD5_LEN] {
676    let mut h = Md5::new();
677    h.update(key);
678    let out = h.finalize();
679    let mut md5 = [0u8; KEY_MD5_LEN];
680    md5.copy_from_slice(&out);
681    md5
682}
683
/// `subtle`-free constant-time byte slice equality. We only need this
/// at one site (MD5 verification) so pulling `subtle` in feels excessive.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR every XOR difference into one accumulator so the loop does
    // the same work whether or not the slices match.
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
696
697/// v0.5 #27: encrypt under whichever source the caller picked.
698///
699/// - `SseSource::Keyring` → delegates to [`encrypt_v2`] (S4E2 frame).
700/// - `SseSource::CustomerKey` → writes an S4E3 frame (no key persisted,
701///   just the MD5 fingerprint for GET-side verification).
702///
703/// service.rs picks the source per-request: SSE-C headers present →
704/// `CustomerKey`, otherwise (and only when `--sse-s4-key` is wired) →
705/// `Keyring`. Plaintext objects skip this function entirely.
706pub fn encrypt_with_source(plaintext: &[u8], source: SseSource<'_>) -> Bytes {
707    match source {
708        SseSource::Keyring(kr) => encrypt_v2(plaintext, kr),
709        SseSource::CustomerKey { key, key_md5 } => encrypt_v3(plaintext, key, key_md5),
710        SseSource::Kms { dek, wrapped } => encrypt_v4(plaintext, dek, wrapped),
711    }
712}
713
714fn encrypt_v3(
715    plaintext: &[u8],
716    key: &[u8; KEY_LEN],
717    key_md5: &[u8; KEY_MD5_LEN],
718) -> Bytes {
719    let aes_key = Key::<Aes256Gcm>::from_slice(key);
720    let cipher = Aes256Gcm::new(aes_key);
721    let mut nonce_bytes = [0u8; NONCE_LEN];
722    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
723    let nonce = Nonce::from_slice(&nonce_bytes);
724    let aad = aad_v3(key_md5);
725    let ct_with_tag = cipher
726        .encrypt(
727            nonce,
728            Payload {
729                msg: plaintext,
730                aad: &aad,
731            },
732        )
733        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
734    debug_assert!(ct_with_tag.len() >= TAG_LEN);
735    let split = ct_with_tag.len() - TAG_LEN;
736    let (ct, tag) = ct_with_tag.split_at(split);
737
738    let mut out = Vec::with_capacity(SSE_HEADER_BYTES_V3 + ct.len());
739    out.extend_from_slice(SSE_MAGIC_V3);
740    out.push(ALGO_AES_256_GCM);
741    out.extend_from_slice(key_md5);
742    out.extend_from_slice(&nonce_bytes);
743    out.extend_from_slice(tag);
744    out.extend_from_slice(ct);
745    Bytes::from(out)
746}
747
748/// v0.5 #29 + v0.5 #27: dispatch on the body's magic and decrypt under
749/// whichever source the caller supplied.
750///
751/// - `S4E1` / `S4E2` require `SseSource::Keyring` (return
752///   [`SseError::CustomerKeyRequired`] for `CustomerKey` — service.rs
753///   should map this to "extraneous SSE-C headers" 400).
754/// - `S4E3` requires `SseSource::CustomerKey` (return
755///   [`SseError::CustomerKeyUnexpected`] for `Keyring` — service.rs
756///   should map this to "missing SSE-C headers" 400).
757///
758/// Generic over `Into<SseSource>` so existing `decrypt(body, &keyring)`
759/// call sites compile unchanged via the `From<&SseKeyring>` impl above
760/// — only the new SSE-C path needs to type out
761/// `SseSource::CustomerKey { .. }`.
762///
763/// Distinct errors (`KeyNotInKeyring`, `DecryptFailed`,
764/// `WrongCustomerKey`) let operators tell rotation gaps, ciphertext
765/// tampering, and SSE-C key mismatch apart in audit logs.
766pub fn decrypt<'a, S: Into<SseSource<'a>>>(body: &[u8], source: S) -> Result<Bytes, SseError> {
767    let source = source.into();
768    // Outer short-check uses the smaller of the two header sizes
769    // (S4E1/S4E2 = 36 bytes). Anything below this can't be any valid
770    // SSE frame regardless of magic — keeps back-compat with v0.4 /
771    // v0.5 #29 callers that expected `TooShort` for absurdly short
772    // inputs even when the magic is garbage.
773    if body.len() < SSE_HEADER_BYTES {
774        return Err(SseError::TooShort { got: body.len() });
775    }
776    let mut magic = [0u8; 4];
777    magic.copy_from_slice(&body[..4]);
778    match &magic {
779        m if m == SSE_MAGIC_V1 || m == SSE_MAGIC_V2 => {
780            let keyring = match source {
781                SseSource::Keyring(kr) => kr,
782                SseSource::CustomerKey { .. } => return Err(SseError::CustomerKeyUnexpected),
783                // S4E1/E2 stored under the keyring → SseSource::Kms
784                // is just as nonsensical as CustomerKey here. Re-use
785                // the same "wrong source" error so service.rs can
786                // map both to AWS S3's "extraneous SSE-* headers"
787                // 400.
788                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
789            };
790            if m == SSE_MAGIC_V1 {
791                decrypt_v1_with_keyring(body, keyring)
792            } else {
793                decrypt_v2_with_keyring(body, keyring)
794            }
795        }
796        m if m == SSE_MAGIC_V3 => {
797            // S4E3 has a larger 49-byte header, so re-check.
798            if body.len() < SSE_HEADER_BYTES_V3 {
799                return Err(SseError::TooShort { got: body.len() });
800            }
801            let (key, key_md5) = match source {
802                SseSource::CustomerKey { key, key_md5 } => (key, key_md5),
803                SseSource::Keyring(_) => return Err(SseError::CustomerKeyRequired),
804                SseSource::Kms { .. } => return Err(SseError::CustomerKeyRequired),
805            };
806            decrypt_v3(body, key, key_md5)
807        }
808        m if m == SSE_MAGIC_V4 => {
809            // SSE-KMS unwrap is async (KMS round-trip required).
810            // Caller must dispatch to `decrypt_with_kms` after
811            // peeking the magic — surface this as a distinct error
812            // rather than silently failing.
813            Err(SseError::KmsAsyncRequired)
814        }
815        m if m == SSE_MAGIC_V5 => {
816            // v0.8 #52: S4E5 (chunked SSE-S4). Sync back-compat
817            // path — verifies + decrypts every chunk into a single
818            // Bytes. Callers that want true streaming (per-chunk
819            // emit) should use `decrypt_chunked_stream` instead.
820            // SSE-C and SSE-KMS sources are nonsensical here for
821            // the same reason as S4E2 (server-managed keyring only).
822            let keyring = match source {
823                SseSource::Keyring(kr) => kr,
824                SseSource::CustomerKey { .. } => {
825                    return Err(SseError::CustomerKeyUnexpected);
826                }
827                SseSource::Kms { .. } => return Err(SseError::CustomerKeyUnexpected),
828            };
829            decrypt_v5_buffered(body, keyring)
830        }
831        _ => Err(SseError::BadMagic { got: magic }),
832    }
833}
834
835fn decrypt_v3(
836    body: &[u8],
837    key: &[u8; KEY_LEN],
838    supplied_md5: &[u8; KEY_MD5_LEN],
839) -> Result<Bytes, SseError> {
840    let algo = body[4];
841    if algo != ALGO_AES_256_GCM {
842        return Err(SseError::UnsupportedAlgo { tag: algo });
843    }
844    let mut stored_md5 = [0u8; KEY_MD5_LEN];
845    stored_md5.copy_from_slice(&body[5..5 + KEY_MD5_LEN]);
846    // Cheap fingerprint check first — if the supplied key has a
847    // different MD5 than what was used at PUT, fail fast with a
848    // dedicated error. AES-GCM auth would also catch this (different
849    // key → bad tag) but the bespoke error gives operators an audit
850    // signal distinct from "ciphertext was tampered with".
851    if !constant_time_eq(supplied_md5, &stored_md5) {
852        return Err(SseError::WrongCustomerKey);
853    }
854    let nonce_off = 5 + KEY_MD5_LEN;
855    let tag_off = nonce_off + NONCE_LEN;
856    let mut nonce_bytes = [0u8; NONCE_LEN];
857    nonce_bytes.copy_from_slice(&body[nonce_off..nonce_off + NONCE_LEN]);
858    let mut tag_bytes = [0u8; TAG_LEN];
859    tag_bytes.copy_from_slice(&body[tag_off..tag_off + TAG_LEN]);
860    let ct = &body[SSE_HEADER_BYTES_V3..];
861
862    let aad = aad_v3(&stored_md5);
863    let nonce = Nonce::from_slice(&nonce_bytes);
864    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
865    ct_with_tag.extend_from_slice(ct);
866    ct_with_tag.extend_from_slice(&tag_bytes);
867
868    let aes_key = Key::<Aes256Gcm>::from_slice(key);
869    let cipher = Aes256Gcm::new(aes_key);
870    let plain = cipher
871        .decrypt(
872            nonce,
873            Payload {
874                msg: &ct_with_tag,
875                aad: &aad,
876            },
877        )
878        .map_err(|_| SseError::DecryptFailed)?;
879    Ok(Bytes::from(plain))
880}
881
882/// AAD for S4E4 = magic (4) + algo (1) + key_id_len (1) + key_id +
883/// wrapped_dek_len (4 BE) + wrapped_dek. Putting the variable-length
884/// key_id and wrapped_dek into the AAD means an attacker cannot
885/// rewrite either field to redirect the gateway to a different KEK
886/// or wrapped DEK without invalidating the body's AES-GCM tag.
887///
888/// Length-prefixing key_id and wrapped_dek inside the AAD prevents a
889/// canonicalisation ambiguity: without the length prefix, an
890/// attacker could shift bytes between the two fields and produce the
891/// same AAD bytestream, defeating the per-field tampering check.
892fn aad_v4(key_id: &[u8], wrapped_dek: &[u8]) -> Vec<u8> {
893    let mut aad = Vec::with_capacity(4 + 1 + 1 + key_id.len() + 4 + wrapped_dek.len());
894    aad.extend_from_slice(SSE_MAGIC_V4);
895    aad.push(ALGO_AES_256_GCM);
896    aad.push(key_id.len() as u8);
897    aad.extend_from_slice(key_id);
898    aad.extend_from_slice(&(wrapped_dek.len() as u32).to_be_bytes());
899    aad.extend_from_slice(wrapped_dek);
900    aad
901}
902
/// Build an S4E4 (SSE-KMS) frame: magic + algo + length-prefixed
/// key_id + length-prefixed wrapped DEK + nonce + tag + ciphertext.
/// The plaintext DEK seals the body; only the KMS-wrapped copy is
/// written to the frame.
fn encrypt_v4(plaintext: &[u8], dek: &[u8; KEY_LEN], wrapped: &WrappedDek) -> Bytes {
    // Pre-conditions: key_id must be non-empty (an empty id means we
    // couldn't re-fetch the KEK on GET) and fit the u8 length prefix;
    // wrapped_dek's length must fit the u32 prefix. These `assert!`s
    // fire in BOTH debug and release builds — a violation panics
    // rather than writing a malformed frame. In practice key_id is a
    // UUID or ARN (<256 chars) and wrapped_dek is 60 bytes (LocalKms)
    // or ~200 bytes (AWS KMS), so neither should ever trip.
    assert!(
        !wrapped.key_id.is_empty() && wrapped.key_id.len() <= u8::MAX as usize,
        "S4E4 key_id must be 1..=255 bytes (got {})",
        wrapped.key_id.len()
    );
    assert!(
        wrapped.ciphertext.len() <= u32::MAX as usize,
        "S4E4 wrapped_dek longer than u32::MAX",
    );

    let aes_key = Key::<Aes256Gcm>::from_slice(dek);
    let cipher = Aes256Gcm::new(aes_key);
    let mut nonce_bytes = [0u8; NONCE_LEN];
    rand::rngs::OsRng.fill_bytes(&mut nonce_bytes);
    let nonce = Nonce::from_slice(&nonce_bytes);
    // AAD binds key_id + wrapped DEK so neither can be rewritten
    // without failing the body's auth tag (see `aad_v4`).
    let aad = aad_v4(wrapped.key_id.as_bytes(), &wrapped.ciphertext);
    let ct_with_tag = cipher
        .encrypt(
            nonce,
            Payload {
                msg: plaintext,
                aad: &aad,
            },
        )
        .expect("aes-gcm encrypt cannot fail with a 32-byte key");
    debug_assert!(ct_with_tag.len() >= TAG_LEN);
    // `encrypt` returns ciphertext || tag; the frame stores tag first.
    let split = ct_with_tag.len() - TAG_LEN;
    let (ct, tag) = ct_with_tag.split_at(split);

    let key_id_bytes = wrapped.key_id.as_bytes();
    let mut out = Vec::with_capacity(
        4 + 1 + 1 + key_id_bytes.len() + 4 + wrapped.ciphertext.len() + NONCE_LEN + TAG_LEN + ct.len(),
    );
    out.extend_from_slice(SSE_MAGIC_V4);
    out.push(ALGO_AES_256_GCM);
    out.push(key_id_bytes.len() as u8);
    out.extend_from_slice(key_id_bytes);
    out.extend_from_slice(&(wrapped.ciphertext.len() as u32).to_be_bytes());
    out.extend_from_slice(&wrapped.ciphertext);
    out.extend_from_slice(&nonce_bytes);
    out.extend_from_slice(tag);
    out.extend_from_slice(ct);
    Bytes::from(out)
}
955
/// Parsed view of an S4E4 frame's variable-length header. Returned
/// by [`parse_s4e4_header`] so both the async [`decrypt_with_kms`]
/// path and any future inspection code (e.g. an admin tool that
/// needs to enumerate object → KMS-key bindings) can reuse the same
/// parser without re-implementing offset math.
///
/// All fields borrow from the frame buffer handed to the parser.
#[derive(Debug)]
pub struct S4E4Header<'a> {
    /// KMS key identifier (UTF-8; a UUID or ARN in practice).
    pub key_id: &'a str,
    /// KMS-wrapped data-encryption key, opaque to this module.
    pub wrapped_dek: &'a [u8],
    /// AES-GCM nonce (NONCE_LEN bytes).
    pub nonce: &'a [u8],
    /// AES-GCM authentication tag (TAG_LEN bytes).
    pub tag: &'a [u8],
    /// Encrypted object body, tag stripped.
    pub ciphertext: &'a [u8],
}
969
970/// Parse the (variable-length) S4E4 header. Pure byte-shuffling — no
971/// crypto, no KMS round-trip. Returns errors on truncation /
972/// out-of-bounds field lengths / non-UTF-8 key_id.
973pub fn parse_s4e4_header(body: &[u8]) -> Result<S4E4Header<'_>, SseError> {
974    // Minimum: magic(4) + algo(1) + key_id_len(1) + key_id(>=1) +
975    // wrapped_dek_len(4) + wrapped_dek(>=1) + nonce(12) + tag(16)
976    // = 40 bytes. We use a slightly looser floor here (bytes for
977    // empty fields = 38) and let the per-field bounds checks below
978    // catch the actual short reads.
979    const S4E4_MIN: usize = 4 + 1 + 1 + 4 + NONCE_LEN + TAG_LEN; // 38
980    if body.len() < S4E4_MIN {
981        return Err(SseError::KmsFrameTooShort {
982            got: body.len(),
983            min: S4E4_MIN,
984        });
985    }
986    let magic = &body[..4];
987    if magic != SSE_MAGIC_V4 {
988        let mut got = [0u8; 4];
989        got.copy_from_slice(magic);
990        return Err(SseError::BadMagic { got });
991    }
992    let algo = body[4];
993    if algo != ALGO_AES_256_GCM {
994        return Err(SseError::UnsupportedAlgo { tag: algo });
995    }
996    let key_id_len = body[5] as usize;
997    let key_id_off: usize = 6;
998    let key_id_end = key_id_off
999        .checked_add(key_id_len)
1000        .ok_or(SseError::KmsFrameFieldOob { what: "key_id_len" })?;
1001    if key_id_end + 4 > body.len() {
1002        return Err(SseError::KmsFrameFieldOob { what: "key_id" });
1003    }
1004    let key_id = std::str::from_utf8(&body[key_id_off..key_id_end])
1005        .map_err(|_| SseError::KmsKeyIdNotUtf8)?;
1006    let wrapped_len_off = key_id_end;
1007    let wrapped_dek_len = u32::from_be_bytes([
1008        body[wrapped_len_off],
1009        body[wrapped_len_off + 1],
1010        body[wrapped_len_off + 2],
1011        body[wrapped_len_off + 3],
1012    ]) as usize;
1013    let wrapped_off = wrapped_len_off + 4;
1014    let wrapped_end = wrapped_off
1015        .checked_add(wrapped_dek_len)
1016        .ok_or(SseError::KmsFrameFieldOob { what: "wrapped_dek_len" })?;
1017    if wrapped_end + NONCE_LEN + TAG_LEN > body.len() {
1018        return Err(SseError::KmsFrameFieldOob { what: "wrapped_dek" });
1019    }
1020    let wrapped_dek = &body[wrapped_off..wrapped_end];
1021    let nonce_off = wrapped_end;
1022    let tag_off = nonce_off + NONCE_LEN;
1023    let ct_off = tag_off + TAG_LEN;
1024    let nonce = &body[nonce_off..nonce_off + NONCE_LEN];
1025    let tag = &body[tag_off..tag_off + TAG_LEN];
1026    let ciphertext = &body[ct_off..];
1027    Ok(S4E4Header {
1028        key_id,
1029        wrapped_dek,
1030        nonce,
1031        tag,
1032        ciphertext,
1033    })
1034}
1035
1036/// Async decrypt for S4E4 (SSE-KMS) bodies. Caller supplies the KMS
1037/// backend; this function parses the frame, calls
1038/// `kms.decrypt_dek(...)` to unwrap the DEK, then runs AES-256-GCM
1039/// to recover the plaintext.
1040///
1041/// service.rs's GET handler should peek the magic with [`peek_magic`]
1042/// and dispatch:
1043///
1044/// - `Some("S4E4")` → `decrypt_with_kms(blob, &*kms).await`
1045/// - everything else → existing sync `decrypt(blob, source)`
1046///
1047/// Note: we don't go through `SseSource::Kms` here because the
1048/// wrapped DEK + key_id come from the frame itself, not from the
1049/// request — the `SseSource` is built for sync paths where the
1050/// caller already knows the key.
1051pub async fn decrypt_with_kms(
1052    body: &[u8],
1053    kms: &dyn KmsBackend,
1054) -> Result<Bytes, SseError> {
1055    let hdr = parse_s4e4_header(body)?;
1056    let wrapped = WrappedDek {
1057        key_id: hdr.key_id.to_string(),
1058        ciphertext: hdr.wrapped_dek.to_vec(),
1059    };
1060    let dek_vec = kms.decrypt_dek(&wrapped).await?;
1061    if dek_vec.len() != KEY_LEN {
1062        // KMS returned a non-32-byte plaintext. AES-256 needs exactly
1063        // 32 bytes. This shouldn't happen with `KeySpec=AES_256` but
1064        // surface as a backend error so it's auditable rather than
1065        // panicking.
1066        return Err(SseError::KmsBackend(KmsError::BackendUnavailable {
1067            message: format!(
1068                "KMS returned {} byte DEK; expected {KEY_LEN}",
1069                dek_vec.len()
1070            ),
1071        }));
1072    }
1073    let mut dek = [0u8; KEY_LEN];
1074    dek.copy_from_slice(&dek_vec);
1075
1076    let aad = aad_v4(hdr.key_id.as_bytes(), hdr.wrapped_dek);
1077    let aes_key = Key::<Aes256Gcm>::from_slice(&dek);
1078    let cipher = Aes256Gcm::new(aes_key);
1079    let nonce = Nonce::from_slice(hdr.nonce);
1080    let mut ct_with_tag = Vec::with_capacity(hdr.ciphertext.len() + TAG_LEN);
1081    ct_with_tag.extend_from_slice(hdr.ciphertext);
1082    ct_with_tag.extend_from_slice(hdr.tag);
1083    let plain = cipher
1084        .decrypt(
1085            nonce,
1086            Payload {
1087                msg: &ct_with_tag,
1088                aad: &aad,
1089            },
1090        )
1091        .map_err(|_| SseError::DecryptFailed)?;
1092    Ok(Bytes::from(plain))
1093}
1094
1095fn decrypt_v1_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1096    let algo = body[4];
1097    if algo != ALGO_AES_256_GCM {
1098        return Err(SseError::UnsupportedAlgo { tag: algo });
1099    }
1100    // body[5..8] reserved (must be ignored — v0.4 wrote zeros, but we
1101    // didn't auth them so we can't insist on it).
1102    let mut nonce_bytes = [0u8; NONCE_LEN];
1103    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1104    let mut tag_bytes = [0u8; TAG_LEN];
1105    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1106    let ct = &body[SSE_HEADER_BYTES..];
1107
1108    let aad = aad_v1();
1109    let nonce = Nonce::from_slice(&nonce_bytes);
1110    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1111    ct_with_tag.extend_from_slice(ct);
1112    ct_with_tag.extend_from_slice(&tag_bytes);
1113
1114    // Active key first, then any others. v0.4 deployments that flip to
1115    // v0.5 with their original key as active hit this path on the
1116    // first try for every legacy object.
1117    let (active_id, _active_key) = keyring.active();
1118    let mut ids: Vec<u16> = keyring.keys.keys().copied().collect();
1119    ids.sort_by_key(|id| if *id == active_id { 0 } else { 1 });
1120    for id in ids {
1121        let key = keyring.get(id).expect("id came from keyring iteration");
1122        let cipher = Aes256Gcm::new(key.as_aes_key());
1123        if let Ok(plain) = cipher.decrypt(
1124            nonce,
1125            Payload {
1126                msg: &ct_with_tag,
1127                aad: &aad,
1128            },
1129        ) {
1130            return Ok(Bytes::from(plain));
1131        }
1132    }
1133    Err(SseError::DecryptFailed)
1134}
1135
1136fn decrypt_v2_with_keyring(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1137    let algo = body[4];
1138    if algo != ALGO_AES_256_GCM {
1139        return Err(SseError::UnsupportedAlgo { tag: algo });
1140    }
1141    let key_id = u16::from_be_bytes([body[5], body[6]]);
1142    // body[7] reserved (1B), authenticated as 0 via AAD.
1143    let key = keyring
1144        .get(key_id)
1145        .ok_or(SseError::KeyNotInKeyring { id: key_id })?;
1146    let mut nonce_bytes = [0u8; NONCE_LEN];
1147    nonce_bytes.copy_from_slice(&body[8..8 + NONCE_LEN]);
1148    let mut tag_bytes = [0u8; TAG_LEN];
1149    tag_bytes.copy_from_slice(&body[8 + NONCE_LEN..SSE_HEADER_BYTES]);
1150    let ct = &body[SSE_HEADER_BYTES..];
1151
1152    let aad = aad_v2(key_id);
1153    let nonce = Nonce::from_slice(&nonce_bytes);
1154    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1155    ct_with_tag.extend_from_slice(ct);
1156    ct_with_tag.extend_from_slice(&tag_bytes);
1157    let cipher = Aes256Gcm::new(key.as_aes_key());
1158    let plain = cipher
1159        .decrypt(
1160            nonce,
1161            Payload {
1162                msg: &ct_with_tag,
1163                aad: &aad,
1164            },
1165        )
1166        .map_err(|_| SseError::DecryptFailed)?;
1167    Ok(Bytes::from(plain))
1168}
1169
1170/// Detect whether `body` is SSE-S4 encrypted (S4E1, S4E2, S4E3, or
1171/// S4E4) by sniffing the first 4 magic bytes. Used by the GET path
1172/// to decide whether to run decryption before frame parsing.
1173///
1174/// We require a length check that's safe for *any* of the four
1175/// frames — `SSE_HEADER_BYTES` (36) is the smallest valid header
1176/// (S4E1 / S4E2). S4E3 is 49 bytes; S4E4 is variable but always >=
1177/// 38 bytes. The per-frame decrypt path re-checks the appropriate
1178/// minimum, so this 36-byte gate is just a fast rejection of
1179/// obviously-too-short bodies.
1180pub fn looks_encrypted(body: &[u8]) -> bool {
1181    if body.len() < SSE_HEADER_BYTES {
1182        return false;
1183    }
1184    let m = &body[..4];
1185    m == SSE_MAGIC_V1
1186        || m == SSE_MAGIC_V2
1187        || m == SSE_MAGIC_V3
1188        || m == SSE_MAGIC_V4
1189        || m == SSE_MAGIC_V5
1190}
1191
1192/// Peek the SSE-S4 magic at the front of `body`, returning a
1193/// stringified frame variant identifier or `None` if `body` is not
1194/// recognized as SSE-S4. Used by the GET path to dispatch between
1195/// the sync [`decrypt`] (S4E1/E2/E3) and the async
1196/// [`decrypt_with_kms`] (S4E4).
1197///
1198/// Returns the same length-gated result as [`looks_encrypted`]: any
1199/// body shorter than `SSE_HEADER_BYTES` (36 bytes) returns `None`,
1200/// so the caller can use this as both the "is encrypted" signal and
1201/// the "which frame" signal in one cheap byte-comparison.
1202pub fn peek_magic(body: &[u8]) -> Option<&'static str> {
1203    if body.len() < SSE_HEADER_BYTES {
1204        return None;
1205    }
1206    match &body[..4] {
1207        m if m == SSE_MAGIC_V1 => Some("S4E1"),
1208        m if m == SSE_MAGIC_V2 => Some("S4E2"),
1209        m if m == SSE_MAGIC_V3 => Some("S4E3"),
1210        m if m == SSE_MAGIC_V4 => Some("S4E4"),
1211        // v0.8 #52: chunked SSE-S4. service.rs's GET handler
1212        // dispatches "S4E5" → `decrypt_chunked_stream` for true
1213        // streaming GET; the sync `decrypt(...)` also accepts S4E5
1214        // (back-compat — buffered concat).
1215        m if m == SSE_MAGIC_V5 => Some("S4E5"),
1216        _ => None,
1217    }
1218}
1219
1220pub type SharedSseKey = Arc<SseKey>;
1221
1222// ===========================================================================
1223// v0.8 #52 — S4E5: chunked variant of S4E2 for streaming GET
1224// ===========================================================================
1225//
1226// ## Wire format
1227//
1228// ```text
1229// magic         4B    "S4E5"
1230// algo          1B    0x01 (AES-256-GCM)
1231// key_id        2B    BE — keyring slot the active key was at PUT time
1232// reserved      1B    0x00
1233// chunk_size    4B    BE — plaintext bytes per chunk (final chunk may be smaller)
1234// chunk_count   4B    BE — total chunks (always >= 1; empty plaintext = 1 zero-byte chunk)
1235// salt          4B    random per-PUT, mixed into every nonce
1236// [chunk_count] × {
1237//   tag         16B   AES-GCM auth tag for this chunk
1238//   ciphertext  N B   chunk_size bytes (final chunk: 0..=chunk_size bytes)
1239// }
1240// ```
1241//
1242// Fixed header = 20 bytes ([`S4E5_HEADER_BYTES`]). Per-chunk overhead =
1243// 16 bytes (the tag — ciphertext is 1:1 with plaintext, AES-GCM is
1244// CTR-mode). Total overhead for an N-byte plaintext at chunk size C:
1245// `20 + ceil(N/C) * 16` bytes.
1246//
1247// ## Nonce derivation
1248//
1249// Each chunk's 12-byte AES-GCM nonce is built deterministically from
1250// the salt + chunk index (no per-chunk random nonce stored on disk):
1251//
1252// ```text
1253// nonce[0..4]  = b"E5\x00\x00"      (4-byte fixed magic-tag)
1254// nonce[4..8]  = salt               (4-byte random per-PUT)
1255// nonce[8..12] = chunk_index BE     (4-byte chunk number, 0..chunk_count)
1256// ```
1257//
1258// Within one PUT (= one salt) the chunk_index varies, so nonces are
1259// guaranteed unique per chunk. Across PUTs the salt is fresh
1260// randomness from `OsRng`. Birthday-collision risk on the 4-byte
1261// salt: ~50% at ~65,536 distinct PUTs under the same key. Deployments
1262// that exceed that should plan a key rotation (or wait for the
1263// follow-up issue widening the salt to 8 bytes).
1264//
1265// ## Per-chunk AAD
1266//
1267// AAD for chunk `i` of `n` total chunks: `b"S4E5"` || `algo` ||
1268// `i_BE_u32` || `n_BE_u32` || `key_id_BE_u16` || `salt`. Includes
1269// the chunk index so re-ordering or dropping chunks (which would
1270// shift their nonces around) fails the AES-GCM tag — i.e. an
1271// attacker cannot rearrange the on-disk chunk list to exfiltrate
1272// plaintext from a different position.
1273
/// Fixed header size of an S4E5 frame, before any chunks. `magic 4 +
/// algo 1 + key_id 2 + reserved 1 + chunk_size 4 + chunk_count 4 +
/// salt 4` = 20 bytes.
pub const S4E5_HEADER_BYTES: usize = 4 + 1 + 2 + 1 + 4 + 4 + 4; // = 20

/// Per-chunk overhead inside an S4E5 frame: just the AES-GCM auth
/// tag. `ciphertext.len() == plaintext.len()` (CTR mode), so a chunk
/// of N plaintext bytes costs N + 16 bytes on disk.
pub const S4E5_PER_CHUNK_OVERHEAD: usize = TAG_LEN; // = 16

/// 4-byte fixed prefix of every S4E5 nonce (see [`nonce_v5`]).
/// Distinct from the bytes a random S4E1/E2 nonce could plausibly
/// start with, so debugging dumps can immediately tell "this is a
/// chunked nonce" from the first 4 bytes.
const S4E5_NONCE_TAG: [u8; 4] = [b'E', b'5', 0, 0];
1289
1290/// Build the per-chunk AAD for an S4E5 chunk. Includes magic + algo
1291/// plus the structural chunk_index/total_chunks (so chunk reordering
1292/// fails auth) plus key_id + salt (so header tampering — flipping
1293/// key_id or salt — also fails auth).
1294fn aad_v5(
1295    chunk_index: u32,
1296    total_chunks: u32,
1297    key_id: u16,
1298    salt: &[u8; 4],
1299) -> [u8; 4 + 1 + 4 + 4 + 2 + 4] {
1300    let mut aad = [0u8; 4 + 1 + 4 + 4 + 2 + 4]; // = 19
1301    aad[..4].copy_from_slice(SSE_MAGIC_V5);
1302    aad[4] = ALGO_AES_256_GCM;
1303    aad[5..9].copy_from_slice(&chunk_index.to_be_bytes());
1304    aad[9..13].copy_from_slice(&total_chunks.to_be_bytes());
1305    aad[13..15].copy_from_slice(&key_id.to_be_bytes());
1306    aad[15..19].copy_from_slice(salt);
1307    aad
1308}
1309
1310/// Derive the 12-byte AES-GCM nonce for chunk `chunk_index` from the
1311/// per-PUT `salt`. Pure function; no RNG state — the same `(salt,
1312/// chunk_index)` always yields the same nonce, which is the whole
1313/// point: GET reads `salt` from the header and walks the chunks
1314/// without storing 12 bytes of nonce per chunk.
1315fn nonce_v5(salt: &[u8; 4], chunk_index: u32) -> [u8; NONCE_LEN] {
1316    let mut n = [0u8; NONCE_LEN];
1317    n[..4].copy_from_slice(&S4E5_NONCE_TAG);
1318    n[4..8].copy_from_slice(salt);
1319    n[8..12].copy_from_slice(&chunk_index.to_be_bytes());
1320    n
1321}
1322
1323/// v0.8 #52: encrypt `plaintext` under `keyring`'s active key,
1324/// sliced into independently-sealed AES-GCM chunks of `chunk_size`
1325/// plaintext bytes each. Returns the on-the-wire S4E5 frame.
1326///
1327/// Errors:
1328/// - [`SseError::ChunkSizeInvalid`] if `chunk_size == 0`.
1329///
1330/// Empty plaintext is permitted and produces a frame with
1331/// `chunk_count = 1, ciphertext_len = 0` (one all-tag chunk). That
1332/// keeps the GET chunk-walk loop simpler — it never has to
1333/// special-case zero chunks.
1334///
1335/// `chunk_size` is the *plaintext* bytes per chunk; the on-disk
1336/// ciphertext per chunk is the same number (AES-GCM is CTR-mode),
1337/// plus the 16-byte tag prepended.
1338pub fn encrypt_v2_chunked(
1339    plaintext: &[u8],
1340    keyring: &SseKeyring,
1341    chunk_size: usize,
1342) -> Result<Bytes, SseError> {
1343    if chunk_size == 0 {
1344        return Err(SseError::ChunkSizeInvalid);
1345    }
1346    let (key_id, key) = keyring.active();
1347    let cipher = Aes256Gcm::new(key.as_aes_key());
1348    let mut salt = [0u8; 4];
1349    rand::rngs::OsRng.fill_bytes(&mut salt);
1350
1351    // Always emit at least one chunk (so an empty plaintext still
1352    // has a well-defined header → chunk_count >= 1 invariant).
1353    let chunk_count: u32 = if plaintext.is_empty() {
1354        1
1355    } else {
1356        plaintext
1357            .len()
1358            .div_ceil(chunk_size)
1359            .try_into()
1360            .expect("chunk_count overflows u32 — plaintext > 16 EiB at min chunk_size")
1361    };
1362
1363    let mut out = Vec::with_capacity(
1364        S4E5_HEADER_BYTES + plaintext.len() + (chunk_count as usize * S4E5_PER_CHUNK_OVERHEAD),
1365    );
1366    out.extend_from_slice(SSE_MAGIC_V5);
1367    out.push(ALGO_AES_256_GCM);
1368    out.extend_from_slice(&key_id.to_be_bytes());
1369    out.push(0u8); // reserved
1370    out.extend_from_slice(&(chunk_size as u32).to_be_bytes());
1371    out.extend_from_slice(&chunk_count.to_be_bytes());
1372    out.extend_from_slice(&salt);
1373
1374    for i in 0..chunk_count {
1375        let off = (i as usize).saturating_mul(chunk_size);
1376        let end = off.saturating_add(chunk_size).min(plaintext.len());
1377        let chunk_pt: &[u8] = if off >= plaintext.len() {
1378            // Empty-plaintext / past-end (only the single-chunk
1379            // empty-plaintext case lands here).
1380            &[]
1381        } else {
1382            &plaintext[off..end]
1383        };
1384        let nonce_bytes = nonce_v5(&salt, i);
1385        let nonce = Nonce::from_slice(&nonce_bytes);
1386        let aad = aad_v5(i, chunk_count, key_id, &salt);
1387        let ct_with_tag = cipher
1388            .encrypt(
1389                nonce,
1390                Payload {
1391                    msg: chunk_pt,
1392                    aad: &aad,
1393                },
1394            )
1395            .expect("aes-gcm encrypt cannot fail with a 32-byte key");
1396        debug_assert!(ct_with_tag.len() >= TAG_LEN);
1397        let split = ct_with_tag.len() - TAG_LEN;
1398        let (ct, tag) = ct_with_tag.split_at(split);
1399        out.extend_from_slice(tag);
1400        out.extend_from_slice(ct);
1401        crate::metrics::record_sse_streaming_chunk("encrypt");
1402    }
1403    Ok(Bytes::from(out))
1404}
1405
/// Parsed S4E5 header — fixed-layout fields. Used by both the
/// buffered ([`decrypt_v5_buffered`]) and streaming
/// ([`decrypt_chunked_stream`]) paths to share frame validation.
#[derive(Debug, Clone, Copy)]
struct S4E5Header {
    /// Keyring slot that was active at PUT time.
    key_id: u16,
    /// Plaintext bytes per chunk (final chunk may be smaller).
    chunk_size: u32,
    /// Total chunks; always >= 1 (enforced by [`parse_s4e5_header`]).
    chunk_count: u32,
    /// Per-PUT random salt, mixed into every chunk nonce.
    salt: [u8; 4],
    /// Byte offset where the chunk array starts (always
    /// [`S4E5_HEADER_BYTES`]; carried in the struct so future
    /// header extensions don't break callers).
    chunks_offset: usize,
}
1420
/// Parse the fixed 20-byte S4E5 header. Pure byte-shuffling — no
/// crypto. Rejects truncated headers, wrong magic/algo, a zero
/// chunk_size, and a zero chunk_count.
fn parse_s4e5_header(body: &[u8]) -> Result<S4E5Header, SseError> {
    if body.len() < S4E5_HEADER_BYTES {
        return Err(SseError::ChunkFrameTruncated { what: "header" });
    }
    if &body[..4] != SSE_MAGIC_V5 {
        let mut got = [0u8; 4];
        got.copy_from_slice(&body[..4]);
        return Err(SseError::BadMagic { got });
    }
    let algo = body[4];
    if algo != ALGO_AES_256_GCM {
        return Err(SseError::UnsupportedAlgo { tag: algo });
    }
    let key_id = u16::from_be_bytes([body[5], body[6]]);
    // body[7] = reserved. `encrypt_v2_chunked` writes 0, but the byte
    // is NOT covered by the per-chunk AAD (see `aad_v5`), so it must
    // be tolerated here regardless of value — we can't insist on 0.
    let chunk_size = u32::from_be_bytes([body[8], body[9], body[10], body[11]]);
    let chunk_count = u32::from_be_bytes([body[12], body[13], body[14], body[15]]);
    let mut salt = [0u8; 4];
    salt.copy_from_slice(&body[16..20]);
    if chunk_size == 0 {
        return Err(SseError::ChunkSizeInvalid);
    }
    // chunk_count >= 1 is a frame invariant (empty plaintext is one
    // zero-byte chunk), so 0 here means a malformed frame.
    if chunk_count == 0 {
        return Err(SseError::ChunkFrameTruncated {
            what: "chunk_count == 0",
        });
    }
    Ok(S4E5Header {
        key_id,
        chunk_size,
        chunk_count,
        salt,
        chunks_offset: S4E5_HEADER_BYTES,
    })
}
1456
1457/// Decrypt one S4E5 chunk. Used by both the buffered and streaming
1458/// paths so AAD / nonce derivation lives in exactly one place.
1459fn decrypt_v5_chunk(
1460    cipher: &Aes256Gcm,
1461    chunk_index: u32,
1462    chunk_count: u32,
1463    key_id: u16,
1464    salt: &[u8; 4],
1465    tag: &[u8; TAG_LEN],
1466    ct: &[u8],
1467) -> Result<Bytes, SseError> {
1468    let nonce_bytes = nonce_v5(salt, chunk_index);
1469    let nonce = Nonce::from_slice(&nonce_bytes);
1470    let aad = aad_v5(chunk_index, chunk_count, key_id, salt);
1471    let mut ct_with_tag = Vec::with_capacity(ct.len() + TAG_LEN);
1472    ct_with_tag.extend_from_slice(ct);
1473    ct_with_tag.extend_from_slice(tag);
1474    cipher
1475        .decrypt(
1476            nonce,
1477            Payload {
1478                msg: &ct_with_tag,
1479                aad: &aad,
1480            },
1481        )
1482        .map(Bytes::from)
1483        .map_err(|_| SseError::ChunkAuthFailed { chunk_index })
1484}
1485
/// Walk an S4E5 body chunk-by-chunk, calling `emit` on each
/// successfully-verified plaintext chunk. Returns immediately on the
/// first chunk that fails auth or is truncated. Shared core between
/// the buffered ([`decrypt_v5_buffered`]) and streaming
/// ([`decrypt_chunked_stream`]) paths.
fn walk_s4e5<F: FnMut(Bytes) -> Result<(), SseError>>(
    body: &[u8],
    keyring: &SseKeyring,
    mut emit: F,
) -> Result<(), SseError> {
    let hdr = parse_s4e5_header(body)?;
    let key = keyring
        .get(hdr.key_id)
        .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
    let cipher = Aes256Gcm::new(key.as_aes_key());

    // Wire layout after the header: `chunk_count` frames of
    // `[tag: TAG_LEN][ciphertext]`; every ciphertext is exactly
    // `chunk_size` bytes except the last, which may be shorter.
    let mut cursor = hdr.chunks_offset;
    let chunk_size = hdr.chunk_size as usize;
    for i in 0..hdr.chunk_count {
        // `cursor` never exceeds body.len() (it is set to `ct_end`,
        // which is bounds-checked below), so this sum can't wrap.
        if cursor + TAG_LEN > body.len() {
            return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
        }
        let tag_off = cursor;
        let ct_off = tag_off + TAG_LEN;
        let is_last = i + 1 == hdr.chunk_count;
        let ct_len = if is_last {
            if ct_off > body.len() {
                return Err(SseError::ChunkFrameTruncated {
                    what: "final chunk ciphertext",
                });
            }
            // Final chunk absorbs the rest of the body — but never
            // more than one full chunk, or the frame has junk on the end.
            let remaining = body.len() - ct_off;
            if remaining > chunk_size {
                return Err(SseError::ChunkFrameTruncated {
                    what: "trailing bytes after final chunk",
                });
            }
            remaining
        } else {
            chunk_size
        };
        let ct_end = ct_off + ct_len;
        if ct_end > body.len() {
            return Err(SseError::ChunkFrameTruncated {
                what: "chunk ciphertext",
            });
        }
        let mut tag = [0u8; TAG_LEN];
        tag.copy_from_slice(&body[tag_off..ct_off]);
        let ct = &body[ct_off..ct_end];
        // Nonce/AAD derivation + tag verification are shared with the
        // streaming path via decrypt_v5_chunk.
        let plain = decrypt_v5_chunk(
            &cipher,
            i,
            hdr.chunk_count,
            hdr.key_id,
            &hdr.salt,
            &tag,
            ct,
        )?;
        crate::metrics::record_sse_streaming_chunk("decrypt");
        emit(plain)?;
        cursor = ct_end;
    }
    // Defense-in-depth: the final chunk consumes the remainder by
    // construction, so this only fires if the loop logic changes.
    if cursor != body.len() {
        return Err(SseError::ChunkFrameTruncated {
            what: "trailing bytes after declared chunk_count",
        });
    }
    Ok(())
}
1556
1557/// Sync back-compat path: decrypt every chunk and concatenate into
1558/// a single `Bytes`. Memory peak = full plaintext (defeats the
1559/// point of S4E5 streaming, but useful for callers that already
1560/// need the whole body — e.g. server-side restream-rewrite paths or
1561/// unit tests).
1562fn decrypt_v5_buffered(body: &[u8], keyring: &SseKeyring) -> Result<Bytes, SseError> {
1563    let hdr = parse_s4e5_header(body)?;
1564    let mut out = Vec::with_capacity(hdr.chunk_size as usize * hdr.chunk_count as usize);
1565    walk_s4e5(body, keyring, |chunk| {
1566        out.extend_from_slice(&chunk);
1567        Ok(())
1568    })?;
1569    Ok(Bytes::from(out))
1570}
1571
1572/// v0.8 #52: stream-decrypt API for S4E5 bodies. Returns a
1573/// [`futures::Stream`] that yields one `Bytes` per chunk in order.
1574/// Each chunk is emitted only after AES-GCM tag verify succeeds, so
1575/// the client never sees plaintext bytes that haven't been
1576/// authenticated. A failing chunk yields its
1577/// [`SseError::ChunkAuthFailed`] (with the chunk index) and ends
1578/// the stream — earlier chunks may already have left the gateway,
1579/// which matches the standard streaming-AEAD trade-off (operators
1580/// MUST alert on the audit log + metric, not rely on connection
1581/// close to guarantee atomicity).
1582///
1583/// Non-S4E5 magic surfaces as [`SseError::BadMagic`] /
1584/// [`SseError::ChunkFrameTruncated`] on the first poll — the stream
1585/// is "fail-fast" rather than "fall through to S4E2 buffered
1586/// decrypt", because the caller has already dispatched on
1587/// [`peek_magic`] by the time it hands a body to this function.
1588///
1589/// `body` is owned by the returned stream so the caller doesn't
1590/// need to keep the bytes alive separately. The returned stream is
1591/// `'static` — the `keyring` borrow is consumed up front to extract
1592/// the per-frame key and build the AES cipher (which owns its key
1593/// material), so the caller's keyring may be dropped immediately.
1594pub fn decrypt_chunked_stream(
1595    body: bytes::Bytes,
1596    keyring: &SseKeyring,
1597) -> impl futures::Stream<Item = Result<Bytes, SseError>> + 'static {
1598    use futures::stream::{self, StreamExt};
1599
1600    // Cheap pre-validation: parse the header + look up the key
1601    // once, up front, so a malformed frame surfaces on the first
1602    // poll instead of being deferred behind the first-chunk loop.
1603    // The `keyring` borrow ends here — we extract the AES key into
1604    // the owned `Aes256Gcm` cipher, then store that in the stream
1605    // state.
1606    let prelude = (|| {
1607        let hdr = parse_s4e5_header(&body)?;
1608        let key = keyring
1609            .get(hdr.key_id)
1610            .ok_or(SseError::KeyNotInKeyring { id: hdr.key_id })?;
1611        let cipher = Aes256Gcm::new(key.as_aes_key());
1612        Ok::<_, SseError>((hdr, cipher))
1613    })();
1614
1615    match prelude {
1616        Err(e) => stream::iter(std::iter::once(Err(e))).left_stream(),
1617        Ok((hdr, cipher)) => {
1618            let chunks_offset = hdr.chunks_offset;
1619            let state = ChunkedDecryptState {
1620                body,
1621                cipher,
1622                hdr,
1623                cursor: chunks_offset,
1624                next_index: 0,
1625            };
1626            stream::try_unfold(state, decrypt_next_chunk).right_stream()
1627        }
1628    }
1629}
1630
/// Per-stream state for [`decrypt_chunked_stream`]. Holds the owned
/// `body` (so the stream stays self-contained), the prepared
/// cipher, and the cursor position into the chunk array.
struct ChunkedDecryptState {
    /// The full encrypted S4E5 frame, owned by the stream.
    body: bytes::Bytes,
    /// Cipher built from the keyring slot named in the header.
    cipher: Aes256Gcm,
    /// Parsed and validated frame header.
    hdr: S4E5Header,
    /// Byte offset of the next `[tag][ciphertext]` frame in `body`.
    cursor: usize,
    /// 0-based index of the next chunk to decrypt.
    next_index: u32,
}
1641
/// `try_unfold` step function for [`decrypt_chunked_stream`]:
/// decrypt and yield the next chunk, or end the stream.
///
/// Mirrors one iteration of the loop in [`walk_s4e5`] — the bounds
/// checks and error surfaces are intentionally identical so the
/// buffered and streaming paths reject the same malformed frames.
async fn decrypt_next_chunk(
    mut state: ChunkedDecryptState,
) -> Result<Option<(Bytes, ChunkedDecryptState)>, SseError> {
    if state.next_index >= state.hdr.chunk_count {
        // Final boundary check — anything past the declared
        // chunk_count would be a truncation / append attack.
        if state.cursor != state.body.len() {
            return Err(SseError::ChunkFrameTruncated {
                what: "trailing bytes after declared chunk_count",
            });
        }
        return Ok(None);
    }
    let i = state.next_index;
    let chunk_size = state.hdr.chunk_size as usize;
    // Each frame is `[tag: TAG_LEN][ciphertext]`; the tag must be
    // fully present before we slice it out.
    if state.cursor + TAG_LEN > state.body.len() {
        return Err(SseError::ChunkFrameTruncated { what: "chunk tag" });
    }
    let tag_off = state.cursor;
    let ct_off = tag_off + TAG_LEN;
    let is_last = i + 1 == state.hdr.chunk_count;
    let ct_len = if is_last {
        if ct_off > state.body.len() {
            return Err(SseError::ChunkFrameTruncated {
                what: "final chunk ciphertext",
            });
        }
        // Final chunk takes the remainder — but never more than one
        // full chunk, or the frame has trailing junk.
        let remaining = state.body.len() - ct_off;
        if remaining > chunk_size {
            return Err(SseError::ChunkFrameTruncated {
                what: "trailing bytes after final chunk",
            });
        }
        remaining
    } else {
        chunk_size
    };
    let ct_end = ct_off + ct_len;
    if ct_end > state.body.len() {
        return Err(SseError::ChunkFrameTruncated {
            what: "chunk ciphertext",
        });
    }
    let mut tag = [0u8; TAG_LEN];
    tag.copy_from_slice(&state.body[tag_off..ct_off]);
    let ct = &state.body[ct_off..ct_end];
    // Shared nonce/AAD derivation + tag verify (same as walk_s4e5).
    let plain = decrypt_v5_chunk(
        &state.cipher,
        i,
        state.hdr.chunk_count,
        state.hdr.key_id,
        &state.hdr.salt,
        &tag,
        ct,
    )?;
    crate::metrics::record_sse_streaming_chunk("decrypt");
    state.cursor = ct_end;
    state.next_index += 1;
    Ok(Some((plain, state)))
}
1702
1703#[cfg(test)]
1704mod tests {
1705    use super::*;
1706
    // Deterministic 32-byte key (every byte = `seed`), Arc-wrapped
    // for keyring use.
    fn key32(seed: u8) -> Arc<SseKey> {
        Arc::new(SseKey::from_bytes(&[seed; 32]).unwrap())
    }
1710
    // Keyring holding a single key derived from `seed` at slot 1
    // (which is also the active slot).
    fn keyring_single(seed: u8) -> SseKeyring {
        SseKeyring::new(1, key32(seed))
    }
1714
    #[test]
    fn roundtrip_basic_v1() {
        // back-compat single-key API — still works.
        let k = SseKey::from_bytes(&[7u8; 32]).unwrap();
        let pt = b"the quick brown fox jumps over the lazy dog";
        let ct = encrypt(&k, pt);
        // Frame inspection: S4E1 magic, AES-256-GCM algo byte, fixed
        // header overhead on top of the plaintext length.
        assert!(looks_encrypted(&ct));
        assert_eq!(&ct[..4], SSE_MAGIC_V1);
        assert_eq!(ct[4], ALGO_AES_256_GCM);
        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
        // decrypt via single-key keyring
        let kr = SseKeyring::new(1, Arc::new(k));
        let pt2 = decrypt(&ct, &kr).unwrap();
        assert_eq!(pt2.as_ref(), pt);
    }
1730
    #[test]
    fn s4e2_roundtrip_active_key() {
        // Encrypts under the active slot (id=1) and checks every
        // S4E2 header field before the decrypt round-trip.
        let kr = keyring_single(7);
        let pt = b"S4E2 active-key roundtrip";
        let ct = encrypt_v2(pt, &kr);
        assert_eq!(&ct[..4], SSE_MAGIC_V2);
        assert_eq!(ct[4], ALGO_AES_256_GCM);
        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE");
        assert_eq!(ct[7], 0, "reserved byte");
        assert_eq!(ct.len(), SSE_HEADER_BYTES + pt.len());
        assert!(looks_encrypted(&ct));
        let pt2 = decrypt(&ct, &kr).unwrap();
        assert_eq!(pt2.as_ref(), pt);
    }
1745
    #[test]
    fn decrypt_s4e1_via_active_only_keyring() {
        // v0.4 wrote S4E1 with key K; v0.5 keyring has K as the only
        // (active) key. Decrypt must succeed.
        let k_arc = key32(11);
        let legacy_ct = encrypt(&k_arc, b"v0.4 vintage object");
        // Sanity: the legacy body really is S4E1-framed.
        assert_eq!(&legacy_ct[..4], SSE_MAGIC_V1);
        let kr = SseKeyring::new(1, Arc::clone(&k_arc));
        let plain = decrypt(&legacy_ct, &kr).unwrap();
        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
    }
1757
1758    #[test]
1759    fn decrypt_s4e2_under_old_key_after_rotation() {
1760        // Rotation flow: object was encrypted under key id=1 when 1
1761        // was active. Operator rotates to active=2 and keeps 1 in the
1762        // keyring. The S4E2 body must still decrypt.
1763        let k1 = key32(1);
1764        let k2 = key32(2);
1765        let mut kr_old = SseKeyring::new(1, Arc::clone(&k1));
1766        let ct = encrypt_v2(b"old-rotation object", &kr_old);
1767        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
1768
1769        // After rotation: active=2, but key 1 still in ring.
1770        kr_old.add(2, Arc::clone(&k2));
1771        let mut kr_new = SseKeyring::new(2, Arc::clone(&k2));
1772        kr_new.add(1, Arc::clone(&k1));
1773
1774        let plain = decrypt(&ct, &kr_new).unwrap();
1775        assert_eq!(plain.as_ref(), b"old-rotation object");
1776
1777        // And new PUTs go to id 2 (active).
1778        let new_ct = encrypt_v2(b"new-rotation object", &kr_new);
1779        assert_eq!(u16::from_be_bytes([new_ct[5], new_ct[6]]), 2);
1780        let plain_new = decrypt(&new_ct, &kr_new).unwrap();
1781        assert_eq!(plain_new.as_ref(), b"new-rotation object");
1782    }
1783
    #[test]
    fn s4e2_unknown_key_id_errors() {
        // A body naming a keyring slot we don't hold must surface
        // KeyNotInKeyring with the offending id, not DecryptFailed.
        let kr = keyring_single(3); // only id=1 present
        let kr_other = SseKeyring::new(99, key32(3));
        let ct = encrypt_v2(b"x", &kr_other); // body claims key_id=99
        let err = decrypt(&ct, &kr).unwrap_err();
        assert!(
            matches!(err, SseError::KeyNotInKeyring { id: 99 }),
            "got {err:?}"
        );
    }
1795
    #[test]
    fn s4e2_tampered_key_id_fails_auth() {
        // The key_id lives in the AAD, so a header flip must break
        // the tag even when the flipped id resolves to a real key.
        let kr = SseKeyring::new(1, key32(4));
        let mut kr_with_2 = kr.clone();
        kr_with_2.add(2, key32(5)); // a real but wrong key under id=2
        let mut ct = encrypt_v2(b"do not flip my key id", &kr).to_vec();
        // Flip key_id from 1 → 2 in the header. The keyring HAS a key
        // for 2, so the lookup succeeds — but AAD authenticates the
        // original key_id, so AES-GCM tag verification must fail.
        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1);
        ct[5] = 0;
        ct[6] = 2;
        let err = decrypt(&ct, &kr_with_2).unwrap_err();
        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
    }
1811
    #[test]
    fn s4e2_tampered_ciphertext_fails() {
        // A single flipped ciphertext bit must fail the AEAD tag.
        let kr = SseKeyring::new(7, key32(9));
        let mut ct = encrypt_v2(b"secret message v2", &kr).to_vec();
        let last = ct.len() - 1;
        ct[last] ^= 0x01;
        let err = decrypt(&ct, &kr).unwrap_err();
        assert!(matches!(err, SseError::DecryptFailed));
    }
1821
    #[test]
    fn s4e2_tampered_algo_byte_fails() {
        // An unknown algo byte is rejected before any crypto runs.
        let kr = SseKeyring::new(1, key32(2));
        let mut ct = encrypt_v2(b"hi", &kr).to_vec();
        ct[4] = 99;
        let err = decrypt(&ct, &kr).unwrap_err();
        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
    }
1830
    #[test]
    fn wrong_key_fails_v1_via_keyring() {
        // S4E1 written under key K1; keyring has only K2 → DecryptFailed.
        let k1 = SseKey::from_bytes(&[1u8; 32]).unwrap();
        let ct = encrypt(&k1, b"secret");
        // S4E1 carries no key id, so the active key is tried and fails.
        let kr_wrong = SseKeyring::new(1, Arc::new(SseKey::from_bytes(&[2u8; 32]).unwrap()));
        let err = decrypt(&ct, &kr_wrong).unwrap_err();
        assert!(matches!(err, SseError::DecryptFailed));
    }
1840
    #[test]
    fn rejects_short_body() {
        // Bodies shorter than any header are rejected with the length.
        let kr = SseKeyring::new(1, key32(1));
        let err = decrypt(b"short", &kr).unwrap_err();
        assert!(matches!(err, SseError::TooShort { got: 5 }));
    }
1847
    #[test]
    fn looks_encrypted_passthrough_returns_false() {
        // S4F2 frame magic, NOT S4E1 / S4E2 — must not be confused.
        let f2 = b"S4F2\x01\x00\x00\x00........................................";
        assert!(!looks_encrypted(f2));
        // Empty input must also be treated as not-encrypted.
        assert!(!looks_encrypted(b""));
    }
1855
    #[test]
    fn looks_encrypted_detects_both_v1_and_v2() {
        // Detection must cover both the legacy and keyring frames.
        let kr = SseKeyring::new(1, key32(8));
        let v1 = encrypt(&SseKey::from_bytes(&[8u8; 32]).unwrap(), b"x");
        let v2 = encrypt_v2(b"x", &kr);
        assert!(looks_encrypted(&v1));
        assert!(looks_encrypted(&v2));
    }
1864
    #[test]
    fn key_from_hex_string() {
        // Wrong-length hex input is rejected; a proper 64-char hex
        // string (32 bytes) parses.
        let bad =
            SseKey::from_bytes(b"0102030405060708090a0b0c0d0e0f10111213141516171819202122232425")
                .unwrap_err();
        assert!(matches!(bad, SseError::BadKeyLength { .. }));
        let good = b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let _ = SseKey::from_bytes(good).expect("64-char hex should parse");
    }
1874
    #[test]
    fn encrypt_v2_uses_random_nonce() {
        // Identical plaintext twice must yield different ciphertexts.
        let kr = SseKeyring::new(1, key32(3));
        let pt = b"deterministic input";
        let a = encrypt_v2(pt, &kr);
        let b = encrypt_v2(pt, &kr);
        assert_ne!(a, b, "nonce must be random per-call");
    }
1883
    #[test]
    fn keyring_active_and_get() {
        // active() returns the active (id, key) pair; get() resolves
        // known slots and returns None for unknown ids.
        let k1 = key32(1);
        let k2 = key32(2);
        let mut kr = SseKeyring::new(1, Arc::clone(&k1));
        kr.add(2, Arc::clone(&k2));
        let (id, active) = kr.active();
        assert_eq!(id, 1);
        assert_eq!(active.bytes, [1u8; 32]);
        assert!(kr.get(2).is_some());
        assert!(kr.get(3).is_none());
    }
1896
1897    // -----------------------------------------------------------------
1898    // v0.5 #27 — SSE-C (customer-provided key, S4E3 frame) tests
1899    // -----------------------------------------------------------------
1900
1901    use base64::Engine as _;
1902
    // SSE-C key material with a matching MD5 fingerprint, derived
    // deterministically from `seed`.
    fn cust_key(seed: u8) -> CustomerKeyMaterial {
        let key = [seed; KEY_LEN];
        let key_md5 = compute_key_md5(&key);
        CustomerKeyMaterial { key, key_md5 }
    }
1908
    #[test]
    fn s4e3_roundtrip_happy_path() {
        // SSE-C end-to-end: S4E3 frame layout, decrypt via explicit
        // source, and decrypt via the &CustomerKeyMaterial From impl.
        let m = cust_key(42);
        let pt = b"top-secret SSE-C payload";
        let ct = encrypt_with_source(
            pt,
            SseSource::CustomerKey {
                key: &m.key,
                key_md5: &m.key_md5,
            },
        );
        // Frame inspection.
        assert_eq!(&ct[..4], SSE_MAGIC_V3);
        assert_eq!(ct[4], ALGO_AES_256_GCM);
        assert_eq!(&ct[5..5 + KEY_MD5_LEN], &m.key_md5);
        assert_eq!(ct.len(), SSE_HEADER_BYTES_V3 + pt.len());
        assert!(looks_encrypted(&ct));
        // Decrypt round-trip.
        let plain = decrypt(
            &ct,
            SseSource::CustomerKey {
                key: &m.key,
                key_md5: &m.key_md5,
            },
        )
        .unwrap();
        assert_eq!(plain.as_ref(), pt);
        // And via the From impl on &CustomerKeyMaterial.
        let plain2 = decrypt(&ct, &m).unwrap();
        assert_eq!(plain2.as_ref(), pt);
    }
1940
    #[test]
    fn s4e3_wrong_key_yields_wrong_customer_key_error() {
        // Supplying a different customer key is surfaced as
        // WrongCustomerKey (fingerprint mismatch), not DecryptFailed.
        let m = cust_key(1);
        let other = cust_key(2);
        let ct = encrypt_with_source(b"payload", (&m).into());
        let err = decrypt(
            &ct,
            SseSource::CustomerKey {
                key: &other.key,
                key_md5: &other.key_md5,
            },
        )
        .unwrap_err();
        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
    }
1956
    #[test]
    fn s4e3_tampered_stored_md5_is_caught() {
        // Attacker rewrites the stored MD5 to match a key they know.
        // Even though the supplied (attacker) key matches the rewritten
        // MD5, AES-GCM authenticates the ORIGINAL md5 via AAD, so the
        // tag check fails. Surface: WrongCustomerKey if the supplied
        // md5 != stored md5 (this test), or DecryptFailed if attacker
        // also rewrites their supplied md5 to match.
        let m = cust_key(7);
        let mut ct = encrypt_with_source(b"victim payload", (&m).into()).to_vec();
        // Flip a byte in the stored fingerprint (offset 5 = first
        // byte of key_md5 in the S4E3 header).
        ct[5] ^= 0x55;
        // Client supplies the original (unmodified) key + md5.
        let err = decrypt(
            &ct,
            SseSource::CustomerKey {
                key: &m.key,
                key_md5: &m.key_md5,
            },
        )
        .unwrap_err();
        assert!(matches!(err, SseError::WrongCustomerKey), "got {err:?}");
    }
1980
    #[test]
    fn s4e3_tampered_md5_with_matching_supplied_md5_fails_aead() {
        // Both stored md5 AND supplied md5 are flipped to the same bogus
        // value. The fingerprint check passes (they match) but AAD
        // authenticates the *original* md5, so AES-GCM fails.
        let m = cust_key(3);
        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
        // Same-bit flip in the stored and the supplied fingerprint.
        ct[5] ^= 0xFF;
        let mut bogus_md5 = m.key_md5;
        bogus_md5[0] ^= 0xFF;
        let err = decrypt(
            &ct,
            SseSource::CustomerKey {
                key: &m.key,
                key_md5: &bogus_md5,
            },
        )
        .unwrap_err();
        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
    }
2001
    #[test]
    fn s4e3_tampered_ciphertext_fails_aead() {
        // One flipped ciphertext bit in an S4E3 body breaks the tag.
        let m = cust_key(8);
        let mut ct = encrypt_with_source(b"sealed message", (&m).into()).to_vec();
        let last = ct.len() - 1;
        ct[last] ^= 0x01;
        let err = decrypt(&ct, &m).unwrap_err();
        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
    }
2011
    #[test]
    fn s4e3_tampered_algo_byte_rejected() {
        // Unknown algo byte on an S4E3 frame is rejected up front.
        let m = cust_key(9);
        let mut ct = encrypt_with_source(b"x", (&m).into()).to_vec();
        ct[4] = 99;
        let err = decrypt(&ct, &m).unwrap_err();
        assert!(matches!(err, SseError::UnsupportedAlgo { tag: 99 }));
    }
2020
    #[test]
    fn s4e3_uses_random_nonce() {
        // Same plaintext + same key twice → different ciphertexts.
        let m = cust_key(10);
        let a = encrypt_with_source(b"deterministic input", (&m).into());
        let b = encrypt_with_source(b"deterministic input", (&m).into());
        assert_ne!(a, b, "nonce must be random per-call");
    }
2028
    #[test]
    fn parse_customer_key_headers_happy_path() {
        // Valid AES256 algorithm + matching base64 key/md5 headers
        // parse into the original key material.
        let key = [11u8; KEY_LEN];
        let md5 = compute_key_md5(&key);
        let key_b64 = base64::engine::general_purpose::STANDARD.encode(key);
        let md5_b64 = base64::engine::general_purpose::STANDARD.encode(md5);
        let m = parse_customer_key_headers("AES256", &key_b64, &md5_b64).unwrap();
        assert_eq!(m.key, key);
        assert_eq!(m.key_md5, md5);
    }
2039
    #[test]
    fn parse_customer_key_headers_rejects_wrong_algorithm() {
        // Only the exact string "AES256" is accepted as the algorithm.
        let key = [1u8; KEY_LEN];
        let md5 = compute_key_md5(&key);
        let kb = base64::engine::general_purpose::STANDARD.encode(key);
        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
        let err = parse_customer_key_headers("AES128", &kb, &mb).unwrap_err();
        assert!(
            matches!(err, SseError::CustomerKeyAlgorithmUnsupported { ref algo } if algo == "AES128"),
            "got {err:?}"
        );
        // Lowercase variant still rejected (AWS S3 accepts only "AES256").
        let err2 = parse_customer_key_headers("aes256", &kb, &mb).unwrap_err();
        assert!(
            matches!(err2, SseError::CustomerKeyAlgorithmUnsupported { .. }),
            "got {err2:?}"
        );
    }
2058
    #[test]
    fn parse_customer_key_headers_rejects_wrong_key_length() {
        // A 16-byte key decodes fine as base64 but fails the length
        // check with an InvalidCustomerKey naming "key length".
        let short_key = vec![5u8; 16]; // half-length AES key
        let md5 = compute_key_md5(&short_key);
        let kb = base64::engine::general_purpose::STANDARD.encode(&short_key);
        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
        assert!(
            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("key length")),
            "got {err:?}"
        );
    }
2071
    #[test]
    fn parse_customer_key_headers_rejects_wrong_md5_length() {
        // A non-16-byte fingerprint is rejected before comparison.
        let key = [3u8; KEY_LEN];
        let kb = base64::engine::general_purpose::STANDARD.encode(key);
        // Truncated MD5 (15 bytes instead of 16).
        let bad_md5 = vec![0u8; 15];
        let mb = base64::engine::general_purpose::STANDARD.encode(bad_md5);
        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
        assert!(
            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 length")),
            "got {err:?}"
        );
    }
2085
    #[test]
    fn parse_customer_key_headers_rejects_md5_mismatch() {
        // A well-formed md5 that doesn't fingerprint the supplied key
        // is rejected (client sent inconsistent headers).
        let key = [4u8; KEY_LEN];
        let other = [5u8; KEY_LEN];
        let kb = base64::engine::general_purpose::STANDARD.encode(key);
        let wrong_md5 = compute_key_md5(&other);
        let mb = base64::engine::general_purpose::STANDARD.encode(wrong_md5);
        let err = parse_customer_key_headers("AES256", &kb, &mb).unwrap_err();
        assert!(
            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("MD5 does not match")),
            "got {err:?}"
        );
    }
2099
    #[test]
    fn parse_customer_key_headers_rejects_bad_base64() {
        // Undecodable key or md5 header → InvalidCustomerKey naming
        // base64 in the reason.
        let valid_key = [0u8; KEY_LEN];
        let md5 = compute_key_md5(&valid_key);
        let mb = base64::engine::general_purpose::STANDARD.encode(md5);
        let err = parse_customer_key_headers("AES256", "!!!not-base64!!!", &mb).unwrap_err();
        assert!(
            matches!(err, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
            "got {err:?}"
        );
        // Bad MD5 base64.
        let kb = base64::engine::general_purpose::STANDARD.encode(valid_key);
        let err2 = parse_customer_key_headers("AES256", &kb, "??not-base64??").unwrap_err();
        assert!(
            matches!(err2, SseError::InvalidCustomerKey { reason } if reason.contains("base64")),
            "got {err2:?}"
        );
    }
2118
    #[test]
    fn parse_customer_key_headers_trims_whitespace() {
        // S3 SDKs sometimes pad headers with trailing newlines.
        // Leading/trailing whitespace (spaces, tabs, newlines) around
        // either base64 value must be tolerated.
        let key = [12u8; KEY_LEN];
        let md5 = compute_key_md5(&key);
        let kb = format!(
            "  {}\n",
            base64::engine::general_purpose::STANDARD.encode(key)
        );
        let mb = format!(
            "\t{}  ",
            base64::engine::general_purpose::STANDARD.encode(md5)
        );
        let m = parse_customer_key_headers("AES256", &kb, &mb).unwrap();
        assert_eq!(m.key, key);
    }
2135
2136    // -----------------------------------------------------------------
2137    // Back-compat + cross-source mixing
2138    // -----------------------------------------------------------------
2139
    #[test]
    fn back_compat_decrypt_s4e1_with_keyring_source() {
        // Legacy S4E1 body decrypts through both decrypt() call styles.
        let k = key32(33);
        let legacy_ct = encrypt(&k, b"v0.4 vintage object");
        let kr = SseKeyring::new(1, Arc::clone(&k));
        // Both call styles must work — `&kr` (back-compat) and
        // `SseSource::Keyring(&kr)` (explicit).
        let plain = decrypt(&legacy_ct, &kr).unwrap();
        assert_eq!(plain.as_ref(), b"v0.4 vintage object");
        let plain2 = decrypt(&legacy_ct, SseSource::Keyring(&kr)).unwrap();
        assert_eq!(plain2.as_ref(), b"v0.4 vintage object");
    }
2152
    #[test]
    fn back_compat_decrypt_s4e2_with_keyring_source() {
        // encrypt_v2 and encrypt_with_source(Keyring) must agree.
        let kr = keyring_single(34);
        let ct = encrypt_v2(b"v0.5 #29 object", &kr);
        let plain = decrypt(&ct, &kr).unwrap();
        assert_eq!(plain.as_ref(), b"v0.5 #29 object");
        // encrypt_with_source(Keyring) should produce the same wire
        // format (S4E2).
        let ct2 = encrypt_with_source(b"v0.5 #29 object", SseSource::Keyring(&kr));
        assert_eq!(&ct2[..4], SSE_MAGIC_V2);
        let plain2 = decrypt(&ct2, &kr).unwrap();
        assert_eq!(plain2.as_ref(), b"v0.5 #29 object");
    }
2166
    #[test]
    fn s4e2_blob_with_customer_key_source_is_rejected() {
        // An object stored with SSE-S4 (S4E2) but a client sending
        // SSE-C headers on the GET — this is a misuse, surface as
        // CustomerKeyUnexpected so service.rs can return 400.
        let kr = keyring_single(50);
        let ct = encrypt_v2(b"server-managed object", &kr);
        let m = cust_key(99);
        let err = decrypt(
            &ct,
            SseSource::CustomerKey {
                key: &m.key,
                key_md5: &m.key_md5,
            },
        )
        .unwrap_err();
        assert!(matches!(err, SseError::CustomerKeyUnexpected), "got {err:?}");
    }
2185
    #[test]
    fn s4e3_blob_with_keyring_source_is_rejected() {
        // Inverse: object is SSE-C (S4E3) but client forgot to send
        // SSE-C headers. Service.rs should map this to 400.
        let m = cust_key(60);
        let ct = encrypt_with_source(b"customer-key object", (&m).into());
        // The keyring seed matching the customer seed is irrelevant —
        // the frame type alone forces CustomerKeyRequired.
        let kr = keyring_single(60);
        let err = decrypt(&ct, &kr).unwrap_err();
        assert!(matches!(err, SseError::CustomerKeyRequired), "got {err:?}");
    }
2196
    #[test]
    fn looks_encrypted_detects_s4e3() {
        // SSE-C frames count as encrypted too.
        let m = cust_key(13);
        let ct = encrypt_with_source(b"x", (&m).into());
        assert!(looks_encrypted(&ct));
    }
2203
    #[test]
    fn s4e3_rejects_short_body() {
        // 36 bytes passes the looks_encrypted gate but is shorter than
        // S4E3's 49-byte header.
        let mut short = Vec::new();
        short.extend_from_slice(SSE_MAGIC_V3);
        short.push(ALGO_AES_256_GCM);
        // Padding to 36 bytes (SSE_HEADER_BYTES) so the outer length
        // check passes but the S4E3 inner check fails.
        short.extend_from_slice(&[0u8; SSE_HEADER_BYTES - 5]);
        assert_eq!(short.len(), SSE_HEADER_BYTES);
        let m = cust_key(1);
        let err = decrypt(
            &short,
            SseSource::CustomerKey {
                key: &m.key,
                key_md5: &m.key_md5,
            },
        )
        .unwrap_err();
        assert!(matches!(err, SseError::TooShort { .. }), "got {err:?}");
    }
2226
    #[test]
    fn customer_key_material_debug_redacts_key() {
        // Debug output must never leak the raw key bytes into logs.
        let m = cust_key(99);
        let s = format!("{m:?}");
        assert!(s.contains("redacted"));
        assert!(!s.contains(&format!("{:?}", m.key.as_slice())));
    }
2234
    #[test]
    fn constant_time_eq_basic() {
        // Equality, inequality, length mismatch, and the empty case.
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        assert!(!constant_time_eq(b"abc", b"abcd"));
        assert!(constant_time_eq(b"", b""));
    }
2242
    #[test]
    fn compute_key_md5_known_vector() {
        // Empty input MD5 is known: d41d8cd98f00b204e9800998ecf8427e.
        // (RFC 1321 test-suite vector — pins the digest implementation.)
        let got = compute_key_md5(b"");
        let expected_hex = "d41d8cd98f00b204e9800998ecf8427e";
        assert_eq!(hex_lower(&got), expected_hex);
    }
2250
2251    // -----------------------------------------------------------------
2252    // v0.5 #28 — SSE-KMS envelope (S4E4) tests
2253    // -----------------------------------------------------------------
2254
2255    use crate::kms::{KmsBackend, LocalKms};
2256    use std::collections::HashMap;
2257    use std::path::PathBuf;
2258
2259    fn local_kms_with(key_ids: &[(&str, [u8; 32])]) -> LocalKms {
2260        let mut keks: HashMap<String, [u8; 32]> = HashMap::new();
2261        for (id, k) in key_ids {
2262            keks.insert((*id).to_string(), *k);
2263        }
2264        LocalKms::from_keks(PathBuf::from("/tmp/none"), keks)
2265    }
2266
2267    #[tokio::test]
2268    async fn s4e4_roundtrip_via_local_kms() {
2269        let kms = local_kms_with(&[("alpha", [42u8; 32])]);
2270        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
2271        let mut dek = [0u8; 32];
2272        dek.copy_from_slice(&dek_vec);
2273        let pt = b"SSE-KMS envelope payload across the S4E4 frame";
2274        let ct = encrypt_with_source(
2275            pt,
2276            SseSource::Kms {
2277                dek: &dek,
2278                wrapped: &wrapped,
2279            },
2280        );
2281        // Frame inspection.
2282        assert_eq!(&ct[..4], SSE_MAGIC_V4);
2283        assert_eq!(ct[4], ALGO_AES_256_GCM);
2284        let key_id_len = ct[5] as usize;
2285        assert_eq!(key_id_len, "alpha".len());
2286        assert_eq!(&ct[6..6 + key_id_len], b"alpha");
2287        // peek_magic + looks_encrypted both recognise S4E4.
2288        assert!(looks_encrypted(&ct));
2289        assert_eq!(peek_magic(&ct), Some("S4E4"));
2290        // Async decrypt round-trip.
2291        let plain = decrypt_with_kms(&ct, &kms).await.unwrap();
2292        assert_eq!(plain.as_ref(), pt);
2293    }
2294
    #[tokio::test]
    async fn s4e4_tampered_key_id_fails_aead() {
        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
        let mut dek = [0u8; 32];
        dek.copy_from_slice(&dek_vec);
        let mut ct = encrypt_with_source(
            b"do not redirect",
            SseSource::Kms {
                dek: &dek,
                wrapped: &wrapped,
            },
        )
        .to_vec();
        // Flip the first byte of the stored key_id: "alpha" becomes
        // "blpha". The forged id is not in the KMS, so decrypt fails —
        // surfaced as KmsBackend(KeyNotFound) when the id lookup
        // rejects it, or KmsBackend(UnwrapFailed) if the backend folds
        // an unknown id into a failed unwrap.
        let key_id_off = 6;
        ct[key_id_off] = b'b';
        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
        assert!(
            matches!(
                err,
                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
                    | SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
            ),
            "got {err:?}"
        );
    }
2325
    #[tokio::test]
    async fn s4e4_tampered_key_id_to_real_other_id_still_fails() {
        // Wrap under "alpha", then overwrite the stored key_id with
        // "beta_" — 5 bytes, the same length as "alpha", so no other
        // field offsets shift. "beta_" is not a key the KMS knows, so
        // the lookup fails with KeyNotFound before any unwrap under
        // beta's KEK is even attempted.
        let kms = local_kms_with(&[("alpha", [1u8; 32]), ("beta", [2u8; 32])]);
        let (dek_vec, wrapped) = kms.generate_dek("alpha").await.unwrap();
        let mut dek = [0u8; 32];
        dek.copy_from_slice(&dek_vec);
        let mut ct = encrypt_with_source(
            b"redirect attempt",
            SseSource::Kms {
                dek: &dek,
                wrapped: &wrapped,
            },
        )
        .to_vec();
        // "alpha" and the forged "beta_" are both 5 bytes long so the
        // rewrite doesn't shift any other field offsets.
        let key_id_off = 6;
        ct[key_id_off..key_id_off + 5].copy_from_slice(b"beta_");
        // Forging a real 4-byte "beta" would require shrinking the
        // length prefix, which changes downstream offsets — instead
        // the forged id is padded to keep the layout stable. This
        // mirrors the realistic tampering surface (attacker can flip
        // bytes but not change the on-disk layout). The KMS sees
        // key_id "beta_", which is unknown → KeyNotFound.
        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
        assert!(
            matches!(
                err,
                SseError::KmsBackend(crate::kms::KmsError::KeyNotFound { .. })
            ),
            "got {err:?}"
        );
    }
2364
2365    #[tokio::test]
2366    async fn s4e4_tampered_wrapped_dek_fails_unwrap() {
2367        let kms = local_kms_with(&[("k", [3u8; 32])]);
2368        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2369        let mut dek = [0u8; 32];
2370        dek.copy_from_slice(&dek_vec);
2371        let mut ct = encrypt_with_source(
2372            b"target body",
2373            SseSource::Kms {
2374                dek: &dek,
2375                wrapped: &wrapped,
2376            },
2377        )
2378        .to_vec();
2379        // Locate the wrapped_dek_len + wrapped_dek field and flip a
2380        // byte in the middle of the wrapped DEK. AES-GCM auth on the
2381        // wrap fails → KmsBackend(UnwrapFailed).
2382        let key_id_len = ct[5] as usize;
2383        let wrapped_len_off = 6 + key_id_len;
2384        let wrapped_off = wrapped_len_off + 4;
2385        let mid = wrapped_off + (wrapped.ciphertext.len() / 2);
2386        ct[mid] ^= 0xFF;
2387        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2388        assert!(
2389            matches!(
2390                err,
2391                SseError::KmsBackend(crate::kms::KmsError::UnwrapFailed { .. })
2392            ),
2393            "got {err:?}"
2394        );
2395    }
2396
2397    #[tokio::test]
2398    async fn s4e4_tampered_ciphertext_fails_aead() {
2399        let kms = local_kms_with(&[("k", [4u8; 32])]);
2400        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2401        let mut dek = [0u8; 32];
2402        dek.copy_from_slice(&dek_vec);
2403        let mut ct = encrypt_with_source(
2404            b"sealed body",
2405            SseSource::Kms {
2406                dek: &dek,
2407                wrapped: &wrapped,
2408            },
2409        )
2410        .to_vec();
2411        let last = ct.len() - 1;
2412        ct[last] ^= 0x01;
2413        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
2414        assert!(matches!(err, SseError::DecryptFailed), "got {err:?}");
2415    }
2416
2417    #[tokio::test]
2418    async fn s4e4_uses_random_nonce_and_dek_per_put() {
2419        let kms = local_kms_with(&[("k", [5u8; 32])]);
2420        // Two PUTs of the same plaintext under the same KEK must
2421        // produce different ciphertexts (fresh DEK + fresh nonce).
2422        let (dek1_vec, wrapped1) = kms.generate_dek("k").await.unwrap();
2423        let (dek2_vec, wrapped2) = kms.generate_dek("k").await.unwrap();
2424        let mut dek1 = [0u8; 32];
2425        dek1.copy_from_slice(&dek1_vec);
2426        let mut dek2 = [0u8; 32];
2427        dek2.copy_from_slice(&dek2_vec);
2428        let pt = b"deterministic input";
2429        let a = encrypt_with_source(
2430            pt,
2431            SseSource::Kms {
2432                dek: &dek1,
2433                wrapped: &wrapped1,
2434            },
2435        );
2436        let b = encrypt_with_source(
2437            pt,
2438            SseSource::Kms {
2439                dek: &dek2,
2440                wrapped: &wrapped2,
2441            },
2442        );
2443        assert_ne!(a, b);
2444        // Both still decrypt round-trip.
2445        let plain_a = decrypt_with_kms(&a, &kms).await.unwrap();
2446        let plain_b = decrypt_with_kms(&b, &kms).await.unwrap();
2447        assert_eq!(plain_a.as_ref(), pt);
2448        assert_eq!(plain_b.as_ref(), pt);
2449    }
2450
2451    #[tokio::test]
2452    async fn s4e4_sync_decrypt_returns_kms_async_required() {
2453        // The whole point of KmsAsyncRequired: passing an S4E4 body
2454        // to the sync `decrypt` function must surface a distinct
2455        // error so service.rs's GET path notices the bug rather than
2456        // returning a generic "wrong source" 400.
2457        let kms = local_kms_with(&[("k", [6u8; 32])]);
2458        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
2459        let mut dek = [0u8; 32];
2460        dek.copy_from_slice(&dek_vec);
2461        let ct = encrypt_with_source(
2462            b"async only",
2463            SseSource::Kms {
2464                dek: &dek,
2465                wrapped: &wrapped,
2466            },
2467        );
2468        // Try via Keyring source (the default sync path).
2469        let kr = SseKeyring::new(1, key32(0));
2470        let err = decrypt(&ct, &kr).unwrap_err();
2471        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
2472    }
2473
2474    #[test]
2475    fn back_compat_s4e1_e2_e3_still_decrypt_via_sync() {
2476        // After adding S4E4, the sync `decrypt` path must still
2477        // handle every legacy frame variant unchanged.
2478        let k = key32(7);
2479        let v1 = encrypt(&k, b"v0.4 vintage");
2480        let kr = SseKeyring::new(1, Arc::clone(&k));
2481        assert_eq!(decrypt(&v1, &kr).unwrap().as_ref(), b"v0.4 vintage");
2482
2483        let v2 = encrypt_v2(b"v0.5 #29 vintage", &kr);
2484        assert_eq!(
2485            decrypt(&v2, &kr).unwrap().as_ref(),
2486            b"v0.5 #29 vintage"
2487        );
2488
2489        let m = cust_key(7);
2490        let v3 = encrypt_with_source(b"v0.5 #27 vintage", (&m).into());
2491        assert_eq!(
2492            decrypt(&v3, &m).unwrap().as_ref(),
2493            b"v0.5 #27 vintage"
2494        );
2495    }
2496
2497    #[test]
2498    fn peek_magic_distinguishes_all_variants() {
2499        // S4E1 / S4E2 / S4E3 — built from real encrypts so the
2500        // length gate also passes.
2501        let k = key32(9);
2502        let v1 = encrypt(&k, b"x");
2503        assert_eq!(peek_magic(&v1), Some("S4E1"));
2504        let kr = SseKeyring::new(1, Arc::clone(&k));
2505        let v2 = encrypt_v2(b"x", &kr);
2506        assert_eq!(peek_magic(&v2), Some("S4E2"));
2507        let m = cust_key(9);
2508        let v3 = encrypt_with_source(b"x", (&m).into());
2509        assert_eq!(peek_magic(&v3), Some("S4E3"));
2510        // Synthetic S4E4 magic with enough trailing bytes to clear
2511        // the 36-byte length gate. peek_magic does NOT validate the
2512        // S4E4 inner header, just the magic — that's the contract
2513        // (cheap dispatch signal).
2514        let mut v4 = Vec::new();
2515        v4.extend_from_slice(SSE_MAGIC_V4);
2516        v4.extend_from_slice(&[0u8; 40]);
2517        assert_eq!(peek_magic(&v4), Some("S4E4"));
2518        // Unknown magic / too-short input → None.
2519        assert!(peek_magic(b"NOPE").is_none());
2520        assert!(peek_magic(b"short").is_none());
2521        assert!(peek_magic(&[0u8; 100]).is_none());
2522    }
2523
2524    #[tokio::test]
2525    async fn s4e4_truncated_frame_errors_cleanly() {
2526        // Truncate to less than the minimum header. Must surface
2527        // KmsFrameTooShort, not panic, not return BadMagic.
2528        let truncated = b"S4E4\x01\x05hi";
2529        let kms = local_kms_with(&[("k", [1u8; 32])]);
2530        let err = decrypt_with_kms(truncated, &kms).await.unwrap_err();
2531        assert!(
2532            matches!(err, SseError::KmsFrameTooShort { .. }),
2533            "got {err:?}"
2534        );
2535    }
2536
2537    #[tokio::test]
2538    async fn s4e4_oob_key_id_len_errors() {
2539        // Build a body that claims key_id_len = 200 but only has 4
2540        // bytes after the length prefix. parse_s4e4_header must
2541        // refuse with KmsFrameFieldOob, not slice-panic.
2542        let mut body = Vec::new();
2543        body.extend_from_slice(SSE_MAGIC_V4);
2544        body.push(ALGO_AES_256_GCM);
2545        body.push(200u8); // key_id_len
2546        // Remaining bytes < 200; pad to clear the looks_encrypted
2547        // floor (36 bytes) but stay short of the claimed key_id +
2548        // wrapped_dek_len + nonce + tag layout.
2549        body.extend_from_slice(&[0u8; 50]);
2550        let kms = local_kms_with(&[("k", [1u8; 32])]);
2551        let err = decrypt_with_kms(&body, &kms).await.unwrap_err();
2552        assert!(
2553            matches!(err, SseError::KmsFrameFieldOob { .. }),
2554            "got {err:?}"
2555        );
2556    }
2557
    #[tokio::test]
    async fn s4e4_via_keyring_source_into_sync_decrypt_is_kms_async_required() {
        // S4E4 fed to sync `decrypt` with a customer-key source: the
        // S4E4 magic is recognised first and KmsAsyncRequired is
        // returned regardless of which source was supplied — the
        // source mismatch never gets a chance to surface, which is
        // the right behaviour (caller's bug is "didn't peek magic"
        // not "wrong source").
        let kms = local_kms_with(&[("k", [9u8; 32])]);
        let (dek_vec, wrapped) = kms.generate_dek("k").await.unwrap();
        let mut dek = [0u8; 32];
        dek.copy_from_slice(&dek_vec);
        let ct = encrypt_with_source(
            b"x",
            SseSource::Kms {
                dek: &dek,
                wrapped: &wrapped,
            },
        );
        let m = cust_key(1);
        let err = decrypt(&ct, &m).unwrap_err();
        assert!(matches!(err, SseError::KmsAsyncRequired), "got {err:?}");
    }
2580
2581    #[tokio::test]
2582    async fn s4e4_looks_encrypted_passthrough_returns_false_for_synthetic() {
2583        // S4F4 (note F not E) must NOT be confused with S4E4.
2584        let mut not_s4e4 = Vec::new();
2585        not_s4e4.extend_from_slice(b"S4F4");
2586        not_s4e4.extend_from_slice(&[0u8; 60]);
2587        assert!(!looks_encrypted(&not_s4e4));
2588        assert_eq!(peek_magic(&not_s4e4), None);
2589    }
2590
    #[tokio::test]
    async fn s4e4_aad_length_prefix_prevents_byte_shifting() {
        // Without length-prefixed AAD, shrinking wrapped_dek_len by N
        // bytes and letting the same N bytes slide into the adjacent
        // field could produce an identical flat AAD bytestream.
        // Tamper with wrapped_dek_len and confirm the frame never
        // decrypts — evidence that the length prefixes participate in
        // authentication (directly or via re-framing).
        let kms = local_kms_with(&[("kk", [11u8; 32])]);
        let (dek_vec, wrapped) = kms.generate_dek("kk").await.unwrap();
        let mut dek = [0u8; 32];
        dek.copy_from_slice(&dek_vec);
        let mut ct = encrypt_with_source(
            b"length-shift defense",
            SseSource::Kms {
                dek: &dek,
                wrapped: &wrapped,
            },
        )
        .to_vec();
        let key_id_len = ct[5] as usize;
        let wrapped_len_off = 6 + key_id_len;
        // Shrink wrapped_dek_len by 1. parse_s4e4_header then reads a
        // shorter wrapped_dek and a shifted nonce/tag/ciphertext
        // alignment — KMS unwrap fails, AES-GCM fails, or the frame
        // bounds check rejects. All three surface as auditable errors;
        // none may reach a successful decrypt.
        let original_len = u32::from_be_bytes([
            ct[wrapped_len_off],
            ct[wrapped_len_off + 1],
            ct[wrapped_len_off + 2],
            ct[wrapped_len_off + 3],
        ]);
        let new_len = (original_len - 1).to_be_bytes();
        ct[wrapped_len_off..wrapped_len_off + 4].copy_from_slice(&new_len);
        let err = decrypt_with_kms(&ct, &kms).await.unwrap_err();
        // Acceptable failure modes: unwrap fail (truncated wrapped
        // DEK), AES-GCM fail (shifted nonce/tag/AAD), or frame bounds.
        assert!(
            matches!(
                err,
                SseError::KmsBackend(_)
                    | SseError::DecryptFailed
                    | SseError::KmsFrameFieldOob { .. }
                    | SseError::KmsFrameTooShort { .. }
            ),
            "got {err:?}"
        );
    }
2640
2641    // -----------------------------------------------------------------------
2642    // v0.8 #52: S4E5 chunked SSE-S4 — encrypt_v2_chunked / decrypt_chunked_stream
2643    // -----------------------------------------------------------------------
2644
2645    use futures::StreamExt;
2646
2647    /// Drain a chunked-decrypt stream into a `Vec<Bytes>` for assertion.
2648    /// Surfaces the first error verbatim (so tests can match on it).
2649    async fn collect_chunks(
2650        s: impl futures::Stream<Item = Result<Bytes, SseError>>,
2651    ) -> Result<Vec<Bytes>, SseError> {
2652        let mut out = Vec::new();
2653        let mut s = std::pin::pin!(s);
2654        while let Some(item) = s.next().await {
2655            out.push(item?);
2656        }
2657        Ok(out)
2658    }
2659
2660    #[test]
2661    fn s4e5_encrypt_layout_10mb_at_1mib() {
2662        // 10 MB plaintext at 1 MiB chunk size → magic "S4E5",
2663        // chunk_count=10, header bytes line up to the documented 20
2664        // + 10 * 16 + 10 MB layout.
2665        let kr = keyring_single(0x42);
2666        let chunk_size = 1024 * 1024;
2667        let pt_len = 10 * 1024 * 1024;
2668        let pt = vec![0xAB_u8; pt_len];
2669        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).expect("encrypt ok");
2670        assert_eq!(&ct[..4], SSE_MAGIC_V5);
2671        assert_eq!(ct[4], ALGO_AES_256_GCM);
2672        assert_eq!(u16::from_be_bytes([ct[5], ct[6]]), 1, "key_id BE = active id");
2673        assert_eq!(ct[7], 0, "reserved must be 0");
2674        assert_eq!(
2675            u32::from_be_bytes([ct[8], ct[9], ct[10], ct[11]]),
2676            chunk_size as u32,
2677            "chunk_size BE",
2678        );
2679        assert_eq!(
2680            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
2681            10,
2682            "chunk_count BE — 10 MiB / 1 MiB = 10 (no remainder)",
2683        );
2684        assert_eq!(
2685            ct.len(),
2686            S4E5_HEADER_BYTES + 10 * S4E5_PER_CHUNK_OVERHEAD + pt_len,
2687            "total = header + 10 tags + plaintext",
2688        );
2689        assert!(looks_encrypted(&ct), "looks_encrypted must accept S4E5");
2690        assert_eq!(peek_magic(&ct), Some("S4E5"));
2691    }
2692
2693    #[tokio::test]
2694    async fn s4e5_decrypt_chunked_stream_byte_equal() {
2695        // Round-trip: encrypt 10 MB at 1 MiB chunks, stream-decrypt,
2696        // concatenate yielded chunks, byte-equal to original.
2697        let kr = keyring_single(0x55);
2698        let pt: Vec<u8> = (0..(10 * 1024 * 1024_u32)).map(|i| (i & 0xFF) as u8).collect();
2699        let ct = encrypt_v2_chunked(&pt, &kr, 1024 * 1024).unwrap();
2700        let stream = decrypt_chunked_stream(ct, &kr);
2701        let chunks = collect_chunks(stream).await.expect("stream ok");
2702        assert_eq!(chunks.len(), 10, "10 chunks expected for 10 MiB / 1 MiB");
2703        let mut joined = Vec::with_capacity(pt.len());
2704        for c in chunks {
2705            joined.extend_from_slice(&c);
2706        }
2707        assert_eq!(joined.len(), pt.len(), "byte length matches");
2708        assert_eq!(joined, pt, "byte-equal round-trip");
2709    }
2710
2711    #[tokio::test]
2712    async fn s4e5_single_chunk_for_small_object() {
2713        // Plaintext smaller than chunk_size → chunk_count=1.
2714        let kr = keyring_single(0x77);
2715        let pt = b"tiny payload, smaller than chunk_size";
2716        let ct = encrypt_v2_chunked(pt, &kr, 1024 * 1024).unwrap();
2717        assert_eq!(
2718            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
2719            1,
2720            "small plaintext = single chunk",
2721        );
2722        let stream = decrypt_chunked_stream(ct, &kr);
2723        let chunks = collect_chunks(stream).await.expect("stream ok");
2724        assert_eq!(chunks.len(), 1);
2725        assert_eq!(chunks[0].as_ref(), pt);
2726    }
2727
    #[tokio::test]
    async fn s4e5_tampered_chunk_n_reports_chunk_index() {
        // Tamper a byte inside chunk index 3 (the 4th chunk) — the
        // stream must yield 3 successful chunks first, then
        // ChunkAuthFailed { chunk_index: 3 }. This proves errors are
        // positional, not just "somewhere in the stream".
        let kr = keyring_single(0x91);
        let chunk_size = 1024;
        let pt = vec![0xCD_u8; chunk_size * 8]; // 8 chunks
        let mut ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap().to_vec();
        // First ciphertext byte of chunk 3: header (20) + 3 full
        // chunks of (tag 16 + ct 1024) + chunk 3's own tag 16
        // = 20 + 3*1040 + 16 = 3156.
        let target = S4E5_HEADER_BYTES + 3 * (TAG_LEN + chunk_size) + TAG_LEN;
        ct[target] ^= 0x42;
        let stream = decrypt_chunked_stream(bytes::Bytes::from(ct), &kr);
        let mut s = std::pin::pin!(stream);
        // Chunks 0, 1, 2 must decrypt and authenticate successfully.
        for expected_i in 0..3_u32 {
            let item = s.next().await.expect("yield");
            item.unwrap_or_else(|e| panic!("chunk {expected_i}: {e:?}"));
        }
        // Chunk 3 fails, and the error carries the right index.
        let err = s.next().await.expect("yield error").unwrap_err();
        assert!(
            matches!(err, SseError::ChunkAuthFailed { chunk_index: 3 }),
            "got {err:?}",
        );
    }
2755
2756    #[tokio::test]
2757    async fn s4e5_back_compat_s4e2_blob_rejected_with_clear_error() {
2758        // Feeding an S4E2 frame to decrypt_chunked_stream should
2759        // surface BadMagic on the first poll (NOT silently fall
2760        // back — the caller is expected to peek_magic and dispatch).
2761        let kr = keyring_single(0x12);
2762        let s4e2 = encrypt_v2(b"a v2 blob, not chunked", &kr);
2763        let stream = decrypt_chunked_stream(s4e2, &kr);
2764        let result = collect_chunks(stream).await;
2765        let err = result.unwrap_err();
2766        assert!(matches!(err, SseError::BadMagic { .. }), "got {err:?}");
2767    }
2768
2769    #[test]
2770    fn s4e5_salt_uniqueness_birthday_smoke() {
2771        // 4-byte salt → birthday paradox 50% collision at ~65,536
2772        // PUTs. 1024 PUTs → ~0.012% collision; we don't enforce
2773        // zero, just sanity-check the salt actually differs more
2774        // than half the time (ensures we're sampling fresh
2775        // randomness, not a stuck PRNG).
2776        let kr = keyring_single(0x33);
2777        let mut salts = std::collections::HashSet::new();
2778        let n = 1024;
2779        for _ in 0..n {
2780            let ct = encrypt_v2_chunked(b"x", &kr, 64).unwrap();
2781            let mut salt = [0u8; 4];
2782            salt.copy_from_slice(&ct[16..20]);
2783            salts.insert(salt);
2784        }
2785        assert!(
2786            salts.len() > n / 2,
2787            "expected most of the {n} salts to be unique (got {} unique)",
2788            salts.len(),
2789        );
2790    }
2791
2792    #[test]
2793    fn s4e5_chunk_size_zero_is_invalid() {
2794        let kr = keyring_single(0x66);
2795        let err = encrypt_v2_chunked(b"hi", &kr, 0).unwrap_err();
2796        assert!(matches!(err, SseError::ChunkSizeInvalid));
2797    }
2798
2799    #[tokio::test]
2800    async fn s4e5_truncated_body_surfaces_chunk_frame_truncated() {
2801        // Truncate inside chunk 2's tag → ChunkFrameTruncated, not
2802        // panic, not silent success.
2803        let kr = keyring_single(0xA1);
2804        let chunk_size = 256;
2805        let pt = vec![0u8; chunk_size * 4];
2806        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
2807        // Truncate to inside chunk 2's tag: header + chunk0 + chunk1
2808        // + 8B partial of chunk2's tag.
2809        let trunc = S4E5_HEADER_BYTES + 2 * (TAG_LEN + chunk_size) + 8;
2810        let truncated = bytes::Bytes::copy_from_slice(&ct[..trunc]);
2811        let stream = decrypt_chunked_stream(truncated, &kr);
2812        let result = collect_chunks(stream).await;
2813        let err = result.unwrap_err();
2814        assert!(
2815            matches!(err, SseError::ChunkFrameTruncated { .. }),
2816            "got {err:?}",
2817        );
2818    }
2819
2820    #[test]
2821    fn s4e5_decrypt_buffered_round_trip_via_top_level_decrypt() {
2822        // Sync `decrypt(blob, &keyring)` must also accept S4E5
2823        // (back-compat path for callers that need the whole
2824        // plaintext).
2825        let kr = keyring_single(0xDE);
2826        let pt = b"buffered sync decrypt path".repeat(32);
2827        let ct = encrypt_v2_chunked(&pt, &kr, 13).unwrap();
2828        let plain = decrypt(&ct, &kr).expect("buffered S4E5 decrypt ok");
2829        assert_eq!(plain.as_ref(), pt.as_slice());
2830    }
2831
2832    #[tokio::test]
2833    async fn s4e5_unknown_key_id_in_frame_errors() {
2834        // Encrypt under id=7, decrypt under a keyring that lacks id=7.
2835        let kr_put = SseKeyring::new(7, key32(0xCC));
2836        let kr_get = keyring_single(0xCC); // only id=1
2837        let ct = encrypt_v2_chunked(b"orphan key", &kr_put, 64).unwrap();
2838        // Sync path
2839        let err = decrypt(&ct, &kr_get).unwrap_err();
2840        assert!(matches!(err, SseError::KeyNotInKeyring { id: 7 }), "got {err:?}");
2841        // Stream path
2842        let stream = decrypt_chunked_stream(ct, &kr_get);
2843        let result = collect_chunks(stream).await;
2844        assert!(
2845            matches!(result, Err(SseError::KeyNotInKeyring { id: 7 })),
2846            "got {result:?}",
2847        );
2848    }
2849
2850    #[tokio::test]
2851    async fn s4e5_final_chunk_smaller_than_chunk_size() {
2852        // Plaintext = 2.5 chunks → final chunk holds half the bytes.
2853        let kr = keyring_single(0xEF);
2854        let chunk_size = 100;
2855        let pt: Vec<u8> = (0..250_u32).map(|i| i as u8).collect();
2856        let ct = encrypt_v2_chunked(&pt, &kr, chunk_size).unwrap();
2857        assert_eq!(
2858            u32::from_be_bytes([ct[12], ct[13], ct[14], ct[15]]),
2859            3,
2860            "ceil(250/100) = 3 chunks",
2861        );
2862        // Total on-disk: 20 header + 3 tags (48) + 250 plaintext = 318.
2863        assert_eq!(ct.len(), 20 + 48 + 250);
2864        let stream = decrypt_chunked_stream(ct, &kr);
2865        let chunks = collect_chunks(stream).await.expect("stream ok");
2866        assert_eq!(chunks.len(), 3);
2867        assert_eq!(chunks[0].len(), 100);
2868        assert_eq!(chunks[1].len(), 100);
2869        assert_eq!(chunks[2].len(), 50, "final chunk is the remainder");
2870        let joined: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
2871        assert_eq!(joined, pt);
2872    }
2873}