// s4_server/multipart_state.rs

//! Per-`upload_id` side-table for multipart uploads (v0.8 BUG-5..10 fix).
//!
//! S3 multipart is split across three handlers:
//!
//!   - `CreateMultipartUpload` — receives the SSE / Tagging / Object-Lock
//!     headers the client wants applied to the eventual object.
//!   - `UploadPart` × N — receives only the body bytes + part number;
//!     the SSE-C headers must be replayed by the client (AWS spec) but
//!     SSE-S4 / SSE-KMS / Tagging / Object-Lock are NOT replayed (they
//!     live on the upload itself).
//!   - `CompleteMultipartUpload` — receives only the part-list manifest;
//!     no metadata reaches this handler from the wire either.
//!
//! v0.7 #48 fixed the single-PUT path to `take()` the SSE request fields
//! off the s3s input, encrypt-then-store, and stamp the `s4-sse-type`
//! metadata on the resulting object so HEAD can echo correctly. The
//! multipart path needs the equivalent treatment, but the per-upload
//! context is split across three handler invocations — this module is
//! the side-channel that carries it from `CreateMultipartUpload` through
//! to `UploadPart` / `CompleteMultipartUpload`.
//!
//! The store is keyed on the backend-issued `upload_id` (the opaque
//! string returned in `CreateMultipartUpload`'s response). `put` / `get`
//! / `remove` are all `O(1)` under a single `RwLock<HashMap>`; multipart
//! upload throughput is dominated by the part-body PUTs to the backend
//! (5 MiB+ each), so the lock is never the bottleneck.
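//!
//! A minimal sketch of the flow, assuming the plumbing described above
//! (handler internals elided; `store` is the `MultipartStateStore` that
//! hangs off `S4Service`):
//!
//! ```ignore
//! // CreateMultipartUpload: capture the context under the backend's id.
//! store.put(&upload_id, ctx);
//!
//! // UploadPart / CompleteMultipartUpload: replay it.
//! if let Some(ctx) = store.get(&upload_id) {
//!     // encrypt part bodies / stamp SSE metadata / apply tags + lock
//! }
//!
//! // Complete or Abort: drop the entry (zeroizes any SSE-C key).
//! store.remove(&upload_id);
//! ```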

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use chrono::{DateTime, Utc};
use dashmap::DashMap;
use tokio::sync::Mutex;
use zeroize::Zeroizing;

use crate::object_lock::LockMode;
use crate::tagging::TagSet;

/// SSE recipe captured at `CreateMultipartUpload` time and replayed for
/// every part body + the final stamp on the assembled object.
///
/// The variants mirror `service::put_object`'s SSE branch precedence:
/// SSE-C (per-request customer key) wins over SSE-KMS (named KMS key),
/// which wins over SSE-S4 (server-managed keyring), which wins over no
/// encryption. SSE-C / SSE-KMS materialise only when the client supplied
/// the matching headers; SSE-S4 materialises whenever the gateway is
/// booted with `--sse-s4-key` (or `with_sse_keyring(...)` in tests).
///
/// v0.8.2 #62 (H-6 audit fix): the `SseC` variant's customer key is held
/// in `Zeroizing<[u8; 32]>` so the raw 32-byte AES key is overwritten
/// with `0u8` when the entry is dropped — either via `remove(upload_id)`
/// on Complete/Abort, or via `sweep_stale(...)` on an abandoned upload.
/// A process core dump, swap-out, or KSM snapshot can no longer leak a
/// previously-held SSE-C key after the upload's lifetime ends. The
/// `key_md5` is deliberately a plain `[u8; 16]` — it's a public
/// fingerprint (S3 puts it on the wire on every PUT/GET response) and
/// requires no zeroization. The custom `PartialEq` below compares
/// through the `Zeroizing` wrapper so existing tests that match on the
/// variant keep compiling.
#[derive(Clone, Debug)]
pub enum MultipartSseMode {
    /// Plaintext multipart. Backend stores raw framed bytes.
    None,
    /// Server-managed keyring (active key on PUT, all keys probed on GET).
    /// The keyring itself lives on `S4Service`; only the marker is held
    /// here so `complete_multipart_upload` knows which path to take.
    SseS4,
    /// Per-request customer key. The 32-byte key + its 128-bit MD5 are
    /// kept in memory only for the lifetime of the upload, then dropped
    /// when the entry is `remove(...)`'d on Complete or Abort. v0.8.2
    /// #62: `key` is `Zeroizing<[u8; 32]>` so its bytes are wiped on
    /// drop (vs. a bare `[u8; 32]`, which would linger on the heap /
    /// stack until the next allocation reuse).
    SseC {
        key: Zeroizing<[u8; 32]>,
        key_md5: [u8; 16],
    },
    /// Named KMS key (resolved against the gateway's KMS backend on
    /// Complete to generate the per-object DEK).
    SseKms { key_id: String },
}
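
// A sketch of the precedence described above, as the Create handler
// might capture it (the locals `sse_c_headers`, `sse_kms_key_id`, and
// `keyring_active` are hypothetical stand-ins for the real header
// extraction, not code from `service`):
//
//     let sse = if let Some((key, key_md5)) = sse_c_headers {
//         MultipartSseMode::SseC { key: Zeroizing::new(key), key_md5 }
//     } else if let Some(key_id) = sse_kms_key_id {
//         MultipartSseMode::SseKms { key_id }
//     } else if keyring_active {
//         MultipartSseMode::SseS4
//     } else {
//         MultipartSseMode::None
//     };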

// Manual `PartialEq` / `Eq` so `Zeroizing<[u8; 32]>` (which doesn't
// derive `PartialEq`) doesn't break the existing `assert_eq!` call
// sites. Compares by deref to the inner `[u8; 32]`.
impl PartialEq for MultipartSseMode {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (MultipartSseMode::None, MultipartSseMode::None) => true,
            (MultipartSseMode::SseS4, MultipartSseMode::SseS4) => true,
            (
                MultipartSseMode::SseC {
                    key: a,
                    key_md5: am,
                },
                MultipartSseMode::SseC {
                    key: b,
                    key_md5: bm,
                },
            ) => a.as_slice() == b.as_slice() && am == bm,
            (MultipartSseMode::SseKms { key_id: a }, MultipartSseMode::SseKms { key_id: b }) => {
                a == b
            }
            _ => false,
        }
    }
}
impl Eq for MultipartSseMode {}

/// Everything `CreateMultipartUpload` captured for `UploadPart` /
/// `CompleteMultipartUpload` to act on. All fields are owned so the
/// store can hand out cheap `Clone`s under the read lock.
#[derive(Clone, Debug)]
pub struct MultipartUploadContext {
    /// Bucket the upload targets. Stored even though
    /// `CompleteMultipartUploadInput::bucket` carries it too — keeps the
    /// side-table self-contained for tests / debug dumps.
    pub bucket: String,
    /// Logical object key the upload will materialise into. Stored for
    /// the same reason as `bucket`.
    pub key: String,
    /// SSE recipe captured from the Create's input headers.
    pub sse: MultipartSseMode,
    /// Tags parsed off `Tagging` / `x-amz-tagging` on Create. `None`
    /// when the client didn't ask for tagging; otherwise the `TagSet` is
    /// applied via `TagManager::put_object_tags` on Complete (BUG-9
    /// fix).
    pub tags: Option<TagSet>,
    /// Per-PUT explicit Object Lock mode supplied via
    /// `x-amz-object-lock-mode` on Create. Mirrors `put_object`'s
    /// `explicit_lock_mode` capture so Complete commits the right
    /// retention. `None` when no header was sent (Complete then falls
    /// back to the bucket default via `apply_default_on_put`).
    pub object_lock_mode: Option<LockMode>,
    /// Per-PUT explicit Object Lock retain-until timestamp.
    pub object_lock_retain_until: Option<DateTime<Utc>>,
    /// Per-PUT explicit Object Lock legal-hold flag (`true` when
    /// `x-amz-object-lock-legal-hold: ON` was sent on Create).
    pub object_lock_legal_hold: bool,
}

/// In-memory side-table mapping `upload_id` → context. One of these
/// hangs off `S4Service` (always-on, no flag — the per-upload state is
/// gateway-internal).
///
/// v0.8.2 #62 (H-6 audit fix): each entry carries the `DateTime<Utc>`
/// of its `put` insertion so `sweep_stale(now, max_age)` can drop
/// abandoned upload contexts (client called `CreateMultipartUpload`,
/// uploaded some parts, then crashed without invoking
/// `CompleteMultipartUpload` / `AbortMultipartUpload`). Without the
/// sweep, an SSE-C upload's raw 32-byte customer key would linger in
/// `MultipartSseMode::SseC` indefinitely. The sweep + the new
/// `Zeroizing` wrapper together bound the key's in-memory lifetime to
/// `max_age` (default 24 h via `--multipart-abandoned-ttl-hours`).
pub struct MultipartStateStore {
    by_upload_id: RwLock<HashMap<String, (MultipartUploadContext, DateTime<Utc>)>>,
    /// v0.8.1 #59: per-(bucket, key) `Mutex` used to serialise Complete
    /// operations on the same logical key. The race window the lock
    /// closes lives inside `service::complete_multipart_upload` between
    /// `backend.get_object` (assembled-body fetch for the SSE encrypt
    /// re-PUT, BUG-5 fix) and `backend.put_object` (encrypted body
    /// write-back). Two concurrent Completes with different `upload_id`
    /// but the same `(bucket, key)` could otherwise interleave their
    /// GET / encrypt / PUT triples and overwrite each other.
    ///
    /// `DashMap` is used because the lock-acquisition path is itself
    /// `O(1)` and contention between *different* keys must not block;
    /// `DashMap`'s sharded design preserves that property, whereas a
    /// single `RwLock<HashMap<_,_>>` would serialise even unrelated
    /// keys' lock lookups. The stored `Arc<Mutex<()>>` is what the
    /// caller actually awaits on — the `DashMap` itself is just a
    /// concurrent index into those mutexes.
    ///
    /// Cleanup is best-effort (`prune_completion_locks`); the entry
    /// for a one-shot key is dropped once both the in-flight Complete
    /// returns and the prune sweep observes only the `DashMap`'s own
    /// `Arc` reference.
    completion_locks: DashMap<(String, String), Arc<Mutex<()>>>,
}

impl MultipartStateStore {
    /// Empty store. Use `Arc<MultipartStateStore>` so `S4Service`'s
    /// async handlers can borrow it across `&self` calls without
    /// requiring `Clone`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_upload_id: RwLock::new(HashMap::new()),
            completion_locks: DashMap::new(),
        }
    }

    /// Register a new upload under `upload_id`. If `upload_id` is
    /// already present (extremely unlikely — the backend issues fresh
    /// ids) the previous entry is overwritten silently, mirroring
    /// `HashMap::insert`'s replace-on-collision semantics.
    ///
    /// v0.8.2 #62: the insertion timestamp (`Utc::now()`) is stored
    /// alongside the context so `sweep_stale` can prune abandoned
    /// uploads. The timestamp is set at insert time only — re-puts on
    /// the same `upload_id` (overwrite) reset the clock, which is the
    /// behaviour we want: a re-Create restarts the abandonment clock.
    pub fn put(&self, upload_id: &str, ctx: MultipartUploadContext) {
        crate::lock_recovery::recover_write(&self.by_upload_id, "multipart_state.by_upload_id")
            .insert(upload_id.to_owned(), (ctx, Utc::now()));
    }

    /// Snapshot the context for `upload_id`. `None` when no entry was
    /// registered (e.g. Complete arrived for an upload the gateway has
    /// no record of — the request passes through to the backend
    /// untouched, which in turn surfaces `NoSuchUpload`).
    #[must_use]
    pub fn get(&self, upload_id: &str) -> Option<MultipartUploadContext> {
        crate::lock_recovery::recover_read(&self.by_upload_id, "multipart_state.by_upload_id")
            .get(upload_id)
            .map(|(c, _)| c.clone())
    }

    /// Drop the entry. Called by Complete / Abort to release the SSE-C
    /// key bytes and the tag-set memory promptly. The
    /// `Zeroizing<[u8; 32]>` wrapper inside the dropped
    /// `MultipartSseMode::SseC` variant zeros the key bytes during its
    /// `Drop`.
    pub fn remove(&self, upload_id: &str) {
        crate::lock_recovery::recover_write(&self.by_upload_id, "multipart_state.by_upload_id")
            .remove(upload_id);
    }

    /// v0.8.2 #62 (H-6 audit fix): drop every entry whose insertion
    /// timestamp is older than `now - max_age`. Returns the number of
    /// entries swept. Called from an hourly background tick spawned in
    /// `main.rs` (default TTL = 24 h, configurable via
    /// `--multipart-abandoned-ttl-hours`).
    ///
    /// Each dropped `MultipartUploadContext` runs the inner
    /// `MultipartSseMode::SseC { key: Zeroizing<[u8; 32]>, .. }`'s
    /// `Drop`, wiping the customer-supplied AES key bytes from
    /// process memory. SSE-S4 / SSE-KMS / None variants drop their
    /// (smaller) state too; only SSE-C carries raw key material.
    ///
    /// The cutoff is computed as `now - max_age` rather than
    /// `Utc::now() - max_age` so callers can drive the clock
    /// deterministically in tests (the unit tests below pass an
    /// explicit `now` from a fixed timestamp).
    pub fn sweep_stale(&self, now: DateTime<Utc>, max_age: chrono::Duration) -> usize {
        let cutoff = now - max_age;
        let mut map =
            crate::lock_recovery::recover_write(&self.by_upload_id, "multipart_state.by_upload_id");
        let stale: Vec<String> = map
            .iter()
            .filter(|(_, (_, ts))| *ts < cutoff)
            .map(|(k, _)| k.clone())
            .collect();
        let count = stale.len();
        for k in stale {
            map.remove(&k);
        }
        count
    }
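
    // A sketch of the hourly tick described above (the `main.rs` wiring
    // and the parsed `ttl_hours` flag value are assumptions, not the
    // real startup code):
    //
    //     let store = Arc::clone(&multipart_state);
    //     tokio::spawn(async move {
    //         let mut tick = tokio::time::interval(std::time::Duration::from_secs(3600));
    //         loop {
    //             tick.tick().await;
    //             let swept = store.sweep_stale(Utc::now(), chrono::Duration::hours(ttl_hours));
    //             if swept > 0 {
    //                 tracing::info!(swept, "swept abandoned multipart upload contexts");
    //             }
    //         }
    //     });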

    /// v0.8.1 #59: get-or-create the per-(bucket, key) `Mutex` used to
    /// serialise `complete_multipart_upload` invocations on the same
    /// logical key. The caller does `lock.lock().await` and holds the
    /// guard for the duration of its critical section (GET assembled
    /// body → encrypt → PUT encrypted body → version-id mint → object-
    /// lock apply → tagging persist → replication enqueue).
    ///
    /// Returns an `Arc<Mutex<()>>` so the caller can drop the
    /// `DashMap` shard's read lock immediately and only retain the
    /// mutex itself across the await point — `DashMap`'s shard guard
    /// is `!Send`, so we must not hold it through an `await`.
    pub fn completion_lock(&self, bucket: &str, key: &str) -> Arc<Mutex<()>> {
        let k = (bucket.to_owned(), key.to_owned());
        self.completion_locks
            .entry(k)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .value()
            .clone()
    }
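
    // The intended call pattern, sketched from the doc comment above
    // (the surrounding `complete_multipart_upload` body is paraphrased,
    // not copied from the real handler):
    //
    //     let lock = self.multipart_state.completion_lock(&bucket, &key);
    //     {
    //         let _guard = lock.lock().await; // serialises same-key Completes
    //         // GET assembled body → encrypt → PUT → version / lock / tag
    //     }
    //     drop(lock); // release our Arc so prune can retire the entry
    //     self.multipart_state.prune_completion_locks();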

    /// v0.8.1 #59: best-effort cleanup of stale completion-lock
    /// entries. A `(bucket, key)` entry is "stale" once no concurrent
    /// Complete is referencing its `Arc<Mutex<()>>` — we detect that
    /// by `Arc::strong_count == 1` (only the `DashMap` itself holds a
    /// reference). Called from `complete_multipart_upload` after the
    /// guarded section returns, so a steady-state workload of unique
    /// keys never accumulates locks.
    ///
    /// The retain predicate is `> 1` (keep entries with outstanding
    /// borrowers), so prune is safe to invoke concurrently with other
    /// `completion_lock` callers — at worst the prune sees the entry
    /// during a brief window where the borrower has cloned but not yet
    /// taken `lock()`, and the entry survives until the next sweep.
    pub fn prune_completion_locks(&self) {
        self.completion_locks
            .retain(|_, lock| Arc::strong_count(lock) > 1);
    }

    /// Test-only: how many completion-lock entries the store currently
    /// holds. Used by `prune_completion_locks_removes_unreferenced`.
    #[cfg(test)]
    fn completion_locks_len(&self) -> usize {
        self.completion_locks.len()
    }

    /// Test-only: how many in-flight uploads the store is currently
    /// tracking. Used by the assertion in `concurrent_put_lookup_race_free`.
    #[cfg(test)]
    fn len(&self) -> usize {
        crate::lock_recovery::recover_read(&self.by_upload_id, "multipart_state.by_upload_id").len()
    }
}

impl Default for MultipartStateStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    fn sample_ctx(bucket: &str, key: &str) -> MultipartUploadContext {
        MultipartUploadContext {
            bucket: bucket.to_owned(),
            key: key.to_owned(),
            sse: MultipartSseMode::None,
            tags: None,
            object_lock_mode: None,
            object_lock_retain_until: None,
            object_lock_legal_hold: false,
        }
    }

    /// `put` followed by `get` returns the same context, and `remove`
    /// makes a subsequent `get` return `None`. Sanity check for the
    /// basic CRUD shape.
    #[test]
    fn put_get_remove_round_trip() {
        let store = MultipartStateStore::new();
        let ctx = sample_ctx("b", "k");
        store.put("upload-001", ctx.clone());
        let got = store.get("upload-001").expect("entry must be present");
        assert_eq!(got.bucket, "b");
        assert_eq!(got.key, "k");
        assert_eq!(got.sse, MultipartSseMode::None);
        store.remove("upload-001");
        assert!(store.get("upload-001").is_none(), "entry must be gone");
    }

    /// SSE-C variants stash the 32-byte key + 16-byte MD5; verify the
    /// bytes round-trip exactly (defensive — an easy place to introduce
    /// a silent truncation bug).
    #[test]
    fn sse_c_key_bytes_round_trip() {
        let store = MultipartStateStore::new();
        let key = [0xa5u8; 32];
        let key_md5 = [0xb6u8; 16];
        let mut ctx = sample_ctx("b", "k");
        ctx.sse = MultipartSseMode::SseC {
            key: Zeroizing::new(key),
            key_md5,
        };
        store.put("u-sse-c", ctx);
        let got = store.get("u-sse-c").expect("entry must be present");
        match got.sse {
            MultipartSseMode::SseC { key: k, key_md5: m } => {
                assert_eq!(*k, key, "SSE-C key bytes must round-trip");
                assert_eq!(m, key_md5, "SSE-C MD5 must round-trip");
            }
            other => panic!("expected SseC variant, got {other:?}"),
        }
    }

    /// v0.8.2 #62 (H-6 fix): registering an SSE-C upload then
    /// `remove`-ing it must drop the `Zeroizing<[u8; 32]>` key wrapper
    /// — its `Drop` zeros the underlying 32 bytes. Verifying the wipe
    /// directly would mean reading back freed memory (UB in safe
    /// Rust), and the wipe itself is covered by the `zeroize` crate's
    /// own test suite. So this test asserts the behavioural contract
    /// instead: after `remove`, a fresh `get` returns `None` (the
    /// entry — and with it the `Zeroizing` key — was dropped), and a
    /// deref check confirms the variant still carries the `Zeroizing`
    /// wrapper rather than a bare `[u8; 32]`.
    #[test]
    fn sse_c_key_zeroized_on_remove() {
        let store = MultipartStateStore::new();
        let key = [0x77u8; 32];
        let key_md5 = [0x33u8; 16];
        let mut ctx = sample_ctx("b", "k");
        ctx.sse = MultipartSseMode::SseC {
            key: Zeroizing::new(key),
            key_md5,
        };
        store.put("u-zero", ctx);
        let got = store.get("u-zero").expect("entry present");
        match &got.sse {
            MultipartSseMode::SseC { key: k, .. } => {
                // Deref-coerces through `Zeroizing<[u8; 32]>`, keeping
                // the wrapper visible at the use site.
                let _deref: &[u8; 32] = k;
                assert_eq!(**k, key);
            }
            other => panic!("expected SseC, got {other:?}"),
        }
        drop(got);
        store.remove("u-zero");
        assert!(
            store.get("u-zero").is_none(),
            "removed entry must be gone (its Zeroizing<[u8;32]> ran Drop and wiped the key)"
        );
    }

    /// v0.8.2 #62: with three entries inserted at effectively the same
    /// timestamp, `sweep_stale(now, 24h)` driven with a `now` 25 h in
    /// the future must drop all three. We pin `now` deterministically
    /// to avoid wall-clock flakes; the store's internal `put` always
    /// stamps `Utc::now()`, so we drive the cutoff such that every
    /// entry lands before it.
    #[test]
    fn sweep_stale_drops_old_contexts() {
        let store = MultipartStateStore::new();
        // Insert three entries (all stamped with `Utc::now()` at
        // insert time — within microseconds of each other on a normal
        // machine).
        store.put("u-1", sample_ctx("b", "k1"));
        store.put("u-2", sample_ctx("b", "k2"));
        store.put("u-3", sample_ctx("b", "k3"));
        assert_eq!(store.len(), 3, "all three entries inserted");
        // `now` 25 h in the future puts every existing entry beyond
        // the 24 h cutoff → all three are stale.
        let future = Utc::now() + chrono::Duration::hours(25);
        let swept = store.sweep_stale(future, chrono::Duration::hours(24));
        assert_eq!(swept, 3, "all three entries are older than the 24 h cutoff");
        assert_eq!(store.len(), 0, "store must be empty after sweep");
    }

    /// v0.8.2 #62: `sweep_stale` must NOT drop entries that are still
    /// fresh. Inserts one entry, then sweeps with a `now` only 1 h
    /// later — the entry is well within the 24 h TTL, so it survives.
    #[test]
    fn sweep_stale_keeps_recent_contexts() {
        let store = MultipartStateStore::new();
        store.put("u-fresh", sample_ctx("b", "k"));
        let near_future = Utc::now() + chrono::Duration::hours(1);
        let swept = store.sweep_stale(near_future, chrono::Duration::hours(24));
        assert_eq!(swept, 0, "1 h-old entry must NOT be swept under 24 h TTL");
        assert!(store.get("u-fresh").is_some(), "fresh entry must remain");
        assert_eq!(store.len(), 1);
    }

    /// v0.8.2 #62: mixed-age workload — two entries inserted before a
    /// captured marker timestamp and one inserted after it. Sweeping
    /// with `now = marker + 24h` (so `cutoff = marker`) must return
    /// exactly 2 and leave the fresh entry intact. Verifies
    /// `sweep_stale` reports the correct count for partial sweeps (the
    /// most common ops case).
    #[test]
    fn sweep_stale_count_returns_correct() {
        let store = MultipartStateStore::new();
        // Two "old" entries, stamped with `Utc::now()` at insert time.
        store.put("old-1", sample_ctx("b", "k1"));
        store.put("old-2", sample_ctx("b", "k2"));
        // We can't fake the store's internal `Utc::now()` stamp, so we
        // capture a marker strictly between the old inserts and the
        // fresh one (the sleeps guard against clock granularity) and
        // aim the cutoff at it.
        std::thread::sleep(std::time::Duration::from_millis(10));
        let fresh_marker = Utc::now();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store.put("fresh", sample_ctx("b", "k3"));
        // cutoff = sweep_at - 24h = fresh_marker: the old entries'
        // timestamps fall before it, the fresh entry's after it.
        let sweep_at = fresh_marker + chrono::Duration::hours(24);
        let swept = store.sweep_stale(sweep_at, chrono::Duration::hours(24));
        assert_eq!(swept, 2, "exactly the two pre-marker entries must sweep");
        assert!(store.get("fresh").is_some(), "post-marker entry survives");
        assert!(store.get("old-1").is_none(), "old-1 must be gone");
        assert!(store.get("old-2").is_none(), "old-2 must be gone");
    }

    /// v0.8.1 #59: `completion_lock(bucket, key)` must return the
    /// **same** `Arc<Mutex<()>>` for repeated calls on the same key,
    /// otherwise concurrent Completes on the same key would each grab
    /// a distinct mutex and the serialisation would silently degrade
    /// to a no-op. We compare with `Arc::ptr_eq` rather than equality
    /// on the inner value because two distinct `Mutex<()>` instances
    /// guard the same unit payload — only the allocation address
    /// distinguishes them.
    #[test]
    fn completion_lock_returns_same_arc_for_same_key() {
        let store = MultipartStateStore::new();
        let a = store.completion_lock("bucket-a", "key/x");
        let b = store.completion_lock("bucket-a", "key/x");
        assert!(
            Arc::ptr_eq(&a, &b),
            "completion_lock(same bucket, same key) must return identical Arc"
        );
    }

    /// v0.8.1 #59: locks for distinct `(bucket, key)` tuples must be
    /// independent — concurrent Completes on different keys must not
    /// serialise on each other. We acquire two locks back-to-back
    /// (`try_lock` so the assertion is deterministic and doesn't
    /// depend on runtime scheduling); both must succeed without
    /// contention. Also exercises bucket-vs-key disjointness: the same
    /// key under two different buckets must NOT alias.
    #[tokio::test]
    async fn completion_lock_distinct_keys_independent() {
        let store = MultipartStateStore::new();
        let a = store.completion_lock("bucket-a", "shared/key");
        let b = store.completion_lock("bucket-b", "shared/key");
        assert!(
            !Arc::ptr_eq(&a, &b),
            "completion_lock with different bucket must yield different Arc"
        );
        // Hold the first lock and acquire the second in the same
        // task — must NOT deadlock and must NOT block. `try_lock`
        // returns `Ok(MutexGuard)` when uncontended, `Err` otherwise.
        let guard_a = a
            .try_lock()
            .expect("lock on bucket-a/shared/key must be free");
        let guard_b = b
            .try_lock()
            .expect("lock on bucket-b/shared/key must be free");
        // The same key + bucket from a third call must alias `a` and
        // therefore be contended (a's guard is held above).
        let a2 = store.completion_lock("bucket-a", "shared/key");
        assert!(
            Arc::ptr_eq(&a, &a2),
            "completion_lock for the same (bucket, key) must alias"
        );
        assert!(
            a2.try_lock().is_err(),
            "completion_lock alias must observe the held guard as contended"
        );
        drop(guard_a);
        drop(guard_b);
    }

    /// v0.8.1 #59: `prune_completion_locks` must drop entries whose
    /// only `Arc` is the `DashMap`'s own (i.e. no in-flight Complete is
    /// holding a reference). After we acquire a lock then drop the
    /// returned `Arc`, the `strong_count` falls to 1 and prune must
    /// retire the entry so a steady-state workload of unique keys
    /// doesn't accumulate. Conversely, an entry with an outstanding
    /// `Arc` reference must survive prune.
    #[test]
    fn prune_completion_locks_removes_unreferenced() {
        let store = MultipartStateStore::new();
        // Acquire-and-drop: simulates a Complete that finished and let
        // its `Arc<Mutex<()>>` go out of scope. `strong_count == 1`
        // afterwards (only the `DashMap` retains it).
        {
            let _lock = store.completion_lock("b", "ephemeral");
        }
        assert_eq!(
            store.completion_locks_len(),
            1,
            "lock entry must be present immediately after acquire-drop"
        );
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            0,
            "prune must retire entries with strong_count == 1"
        );

        // Negative case: an outstanding `Arc` must NOT be pruned —
        // pruning a still-borrowed entry would let a concurrent
        // Complete miss the serialisation point.
        let held = store.completion_lock("b", "in-flight");
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            1,
            "prune must keep entries with outstanding Arc borrowers"
        );
        drop(held);
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            0,
            "prune must retire the entry once the borrower drops"
        );
    }

    /// 8 threads each register 250 distinct upload_ids and immediately
    /// look them up. After `join`, the store must contain exactly
    /// 8 × 250 entries — verifies the `RwLock` doesn't drop writes
    /// under concurrent contention (the obvious refactor that swaps in
    /// an unsynchronised `HashMap` would visibly fail this).
    #[test]
    fn concurrent_put_lookup_race_free() {
        let store = Arc::new(MultipartStateStore::new());
        let mut handles = Vec::new();
        for tid in 0..8u32 {
            let st = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..250u32 {
                    let id = format!("u-{tid}-{i}");
                    let ctx = sample_ctx("b", &id);
                    st.put(&id, ctx);
                    // Immediate lookup proves a writer observes its own
                    // put under the RwLock.
                    let got = st.get(&id).expect("self-put must be visible");
                    assert_eq!(got.key, id);
                }
            }));
        }
        for h in handles {
            h.join().expect("worker thread panicked");
        }
        assert_eq!(store.len(), 8 * 250, "all puts must persist");
    }

    /// v0.8.4 #77 (audit H-8): a panic inside the `by_upload_id` write
    /// guard poisons the lock. Subsequent reads (e.g. `get` /
    /// `sweep_stale`) must recover via
    /// [`crate::lock_recovery::recover_read`] /
    /// [`crate::lock_recovery::recover_write`] and surface the data
    /// instead of re-panicking. `MultipartStateStore` has no `to_json`,
    /// so this test exercises `get` directly — the same poison-recovery
    /// helper is used.
    #[test]
    fn multipart_state_get_after_panic_recovers_via_poison() {
        let store = Arc::new(MultipartStateStore::new());
        store.put("u1", sample_ctx("b", "k"));
        let store_cl = Arc::clone(&store);
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let mut g = store_cl.by_upload_id.write().expect("clean lock");
            g.insert("u2".to_owned(), (sample_ctx("b", "k2"), Utc::now()));
            panic!("force-poison");
        }));
        assert!(
            store.by_upload_id.is_poisoned(),
            "write panic must poison the by_upload_id lock"
        );
        let got = store.get("u1").expect("get after poison must succeed");
        assert_eq!(got.bucket, "b");
        assert_eq!(got.key, "k");
        // sweep_stale (write path) must also recover, not panic.
        let n = store.sweep_stale(
            Utc::now() + chrono::Duration::hours(48),
            chrono::Duration::hours(1),
        );
        assert!(n >= 1, "stale sweep must run and recover through the poison");
    }
}