// s4_server/multipart_state.rs

//! Per-`upload_id` side-table for multipart uploads (v0.8 BUG-5..10 fix).
//!
//! S3 multipart is split across three handlers:
//!
//!   - `CreateMultipartUpload` — receives the SSE / Tagging / Object-Lock
//!     headers the client wants applied to the eventual object.
//!   - `UploadPart` × N — receives only the body bytes + part number;
//!     the SSE-C headers must be replayed by the client (AWS spec) but
//!     SSE-S4 / SSE-KMS / Tagging / Object-Lock are NOT replayed (they
//!     live on the upload itself).
//!   - `CompleteMultipartUpload` — receives only the part-list manifest;
//!     no metadata reaches this handler from the wire either.
//!
//! v0.7 #48 fixed the single-PUT path to `take()` the SSE request fields
//! off the s3s input, encrypt-then-store, and stamp the `s4-sse-type`
//! metadata on the resulting object so HEAD can echo correctly. The
//! multipart path needs the equivalent treatment, but the per-upload
//! context is split across three handler invocations — this module is
//! the side-channel that carries it from `CreateMultipartUpload` through
//! to `UploadPart` / `CompleteMultipartUpload`.
//!
//! The store is keyed on the backend-issued `upload_id` (the opaque string
//! returned in `CreateMultipartUpload`'s response). `put` / `get` /
//! `remove` are all `O(1)` under a single `RwLock<HashMap>`; multipart
//! upload throughput is dominated by the part-body PUTs to the backend
//! (5 MiB+ each), so the lock is never the bottleneck.
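//!
//! A minimal sketch of the intended handler usage (the `store` binding and
//! the handler boundaries are illustrative, not the actual `S4Service`
//! plumbing):
//!
//! ```ignore
//! // CreateMultipartUpload: capture the recipe under the backend-issued id.
//! store.put(&upload_id, ctx);
//!
//! // UploadPart / CompleteMultipartUpload: replay the captured context.
//! if let Some(ctx) = store.get(&upload_id) {
//!     // encrypt part bodies / stamp metadata according to `ctx.sse` etc.
//! }
//!
//! // Complete or Abort: drop the entry (and any SSE-C key bytes) promptly.
//! store.remove(&upload_id);
//! ```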

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use chrono::{DateTime, Utc};
use dashmap::DashMap;
use tokio::sync::Mutex;

use crate::object_lock::LockMode;
use crate::tagging::TagSet;

/// SSE recipe captured at `CreateMultipartUpload` time and replayed for
/// every part body + the final stamp on the assembled object.
///
/// The variants mirror `service::put_object`'s SSE branch precedence:
/// SSE-C (per-request customer key) wins over SSE-KMS (named KMS key)
/// wins over SSE-S4 (server-managed keyring) wins over no encryption.
/// SSE-C / SSE-KMS materialise only when the client supplied the
/// matching headers; SSE-S4 materialises whenever the gateway is booted
/// with `--sse-s4-key` (or `with_sse_keyring(...)` in tests).
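///
/// A minimal sketch of that precedence (binding names such as
/// `sse_c_headers`, `kms_key_id`, and `keyring_configured` are
/// placeholders, not the real s3s input fields):
///
/// ```ignore
/// let mode = if let Some((key, key_md5)) = sse_c_headers {
///     MultipartSseMode::SseC { key, key_md5 }
/// } else if let Some(key_id) = kms_key_id {
///     MultipartSseMode::SseKms { key_id }
/// } else if keyring_configured {
///     MultipartSseMode::SseS4
/// } else {
///     MultipartSseMode::None
/// };
/// ```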
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MultipartSseMode {
    /// Plaintext multipart. Backend stores raw framed bytes.
    None,
    /// Server-managed keyring (active key on PUT, all keys probed on GET).
    /// The keyring itself lives on `S4Service`; only the marker is held
    /// here so `complete_multipart_upload` knows which path to take.
    SseS4,
    /// Per-request customer key. The 32-byte key + its 128-bit MD5 are
    /// kept in memory only for the lifetime of the upload, then dropped
    /// when the entry is `remove(...)`'d on Complete or Abort.
    SseC {
        key: [u8; 32],
        key_md5: [u8; 16],
    },
    /// Named KMS key (resolved against the gateway's KMS backend on
    /// Complete to generate the per-object DEK).
    SseKms {
        key_id: String,
    },
}

/// Everything `CreateMultipartUpload` captured for `UploadPart` /
/// `CompleteMultipartUpload` to act on. All fields are owned so the
/// store can hand out cheap `Clone`s under the read lock.
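///
/// For example, a KMS-encrypted upload with no tags or Object Lock
/// headers would be captured roughly as follows (field values are
/// illustrative only):
///
/// ```ignore
/// let ctx = MultipartUploadContext {
///     bucket: "media".to_owned(),
///     key: "videos/clip.mp4".to_owned(),
///     sse: MultipartSseMode::SseKms { key_id: "alias/s4".to_owned() },
///     tags: None,
///     object_lock_mode: None,
///     object_lock_retain_until: None,
///     object_lock_legal_hold: false,
/// };
/// ```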
#[derive(Clone, Debug)]
pub struct MultipartUploadContext {
    /// Bucket the upload targets. Stored even though
    /// `CompleteMultipartUploadInput::bucket` carries it too — keeps the
    /// side-table self-contained for tests / debug dumps.
    pub bucket: String,
    /// Logical object key the upload will materialise into. Stored for
    /// the same reason as `bucket`.
    pub key: String,
    /// SSE recipe captured from the Create's input headers.
    pub sse: MultipartSseMode,
    /// Tags parsed off `Tagging` / `x-amz-tagging` on Create. `None`
    /// when the client didn't ask for tagging; otherwise the `TagSet` is
    /// applied via `TagManager::put_object_tags` on Complete (BUG-9
    /// fix).
    pub tags: Option<TagSet>,
    /// Per-PUT explicit Object Lock mode supplied via
    /// `x-amz-object-lock-mode` on Create. Mirrors `put_object`'s
    /// `explicit_lock_mode` capture so Complete commits the right
    /// retention. `None` when no header was sent (Complete then falls
    /// back to the bucket default via `apply_default_on_put`).
    pub object_lock_mode: Option<LockMode>,
    /// Per-PUT explicit Object Lock retain-until timestamp.
    pub object_lock_retain_until: Option<DateTime<Utc>>,
    /// Per-PUT explicit Object Lock legal-hold flag (`true` when
    /// `x-amz-object-lock-legal-hold: ON` was sent on Create).
    pub object_lock_legal_hold: bool,
}

/// In-memory side-table mapping `upload_id` → context. One of these
/// hangs off `S4Service` (always-on, no flag — the per-upload state is
/// gateway-internal).
pub struct MultipartStateStore {
    by_upload_id: RwLock<HashMap<String, MultipartUploadContext>>,
    /// v0.8.1 #59: per-(bucket, key) `Mutex` used to serialize Complete
    /// operations on the same logical key. The race window the lock
    /// closes lives inside `service::complete_multipart_upload` between
    /// `backend.get_object` (assembled body fetch for the SSE encrypt
    /// re-PUT, BUG-5 fix) and `backend.put_object` (encrypted body
    /// write-back). Two concurrent Completes with different `upload_id`
    /// but the same `(bucket, key)` could otherwise interleave their
    /// GET / encrypt / PUT triples and overwrite each other.
    ///
    /// `DashMap` is used because the lock acquisition path is itself
    /// `O(1)` and contention between *different* keys must not block;
    /// `DashMap`'s sharded design preserves that property whereas a
    /// single `RwLock<HashMap<_,_>>` would serialise even unrelated
    /// keys' lock-lookup. The stored `Arc<Mutex<()>>` is what the
    /// caller actually awaits on — the `DashMap` itself is just a
    /// concurrent index into those mutexes.
    ///
    /// Cleanup is best-effort (`prune_completion_locks`); the entry
    /// for a one-shot key is dropped once both the in-flight Complete
    /// returns and the prune sweep observes only the `DashMap`'s own
    /// `Arc` reference.
    completion_locks: DashMap<(String, String), Arc<Mutex<()>>>,
}

impl MultipartStateStore {
    /// Empty store. Use `Arc<MultipartStateStore>` so `S4Service`'s
    /// async handlers can borrow it across `&self` calls without
    /// requiring `Clone`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_upload_id: RwLock::new(HashMap::new()),
            completion_locks: DashMap::new(),
        }
    }

    /// Register a new upload under `upload_id`. If `upload_id` is
    /// already present (extremely unlikely — backend issues fresh ids)
    /// the previous entry is overwritten silently to mirror
    /// `HashMap::insert`'s replace-on-collision semantics.
    pub fn put(&self, upload_id: &str, ctx: MultipartUploadContext) {
        self.by_upload_id
            .write()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .insert(upload_id.to_owned(), ctx);
    }

    /// Snapshot the context for `upload_id`. `None` when no entry was
    /// registered (e.g. Complete arrived for an upload that the gateway
    /// has no record of — passes through to the backend untouched, which
    /// in turn surfaces `NoSuchUpload`).
    #[must_use]
    pub fn get(&self, upload_id: &str) -> Option<MultipartUploadContext> {
        self.by_upload_id
            .read()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .get(upload_id)
            .cloned()
    }

    /// Drop the entry. Called by Complete / Abort to release the SSE-C
    /// key bytes and the tag-set memory promptly.
    pub fn remove(&self, upload_id: &str) {
        self.by_upload_id
            .write()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .remove(upload_id);
    }

    /// v0.8.1 #59: get-or-create the per-(bucket, key) `Mutex` used to
    /// serialise `complete_multipart_upload` invocations on the same
    /// logical key. Caller does `lock.lock().await` and holds the
    /// guard for the duration of its critical section (GET assembled
    /// body → encrypt → PUT encrypted body → version-id mint → object-
    /// lock apply → tagging persist → replication enqueue).
    ///
    /// Returns an `Arc<Mutex<()>>` so the caller can drop the
    /// `DashMap` shard's read lock immediately and only retain the
    /// mutex itself across the await point — `DashMap`'s shard guard
    /// is `!Send`, so we must not hold it through an `await`.
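    ///
    /// A sketch of the intended call pattern (the surrounding handler
    /// steps are paraphrased, not the real `complete_multipart_upload`
    /// body):
    ///
    /// ```ignore
    /// let lock = store.completion_lock(&bucket, &key);
    /// let _guard = lock.lock().await; // serialises same-(bucket, key) Completes
    /// // GET assembled body → encrypt → PUT encrypted body → ...
    /// drop(_guard);
    /// store.prune_completion_locks();
    /// ```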
    pub fn completion_lock(&self, bucket: &str, key: &str) -> Arc<Mutex<()>> {
        let k = (bucket.to_owned(), key.to_owned());
        self.completion_locks
            .entry(k)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .value()
            .clone()
    }

    /// v0.8.1 #59: best-effort cleanup of stale completion-lock
    /// entries. A `(bucket, key)` entry is "stale" once no concurrent
    /// Complete is referencing its `Arc<Mutex<()>>` — we detect that
    /// by `Arc::strong_count == 1` (only the `DashMap` itself holds a
    /// reference). Called from `complete_multipart_upload` after the
    /// guarded section returns, so a steady-state workload of unique
    /// keys never accumulates locks.
    ///
    /// The retain predicate is `> 1` (keep entries with outstanding
    /// borrowers), so prune is safe to invoke concurrently with other
    /// `completion_lock` callers — at worst the prune sees the entry
    /// during a brief window where the borrower has cloned but not yet
    /// taken `lock()`, and the entry survives until the next sweep.
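    ///
    /// Lifecycle sketch (call sites paraphrased, not the real handler
    /// code): once every clone handed out by `completion_lock` for a
    /// key has been dropped, the next sweep retires that key.
    ///
    /// ```ignore
    /// let lock = store.completion_lock(&bucket, &key);
    /// store.prune_completion_locks(); // entry survives: strong_count == 2
    /// drop(lock);
    /// store.prune_completion_locks(); // entry retired: strong_count was 1
    /// ```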
    pub fn prune_completion_locks(&self) {
        self.completion_locks
            .retain(|_, lock| Arc::strong_count(lock) > 1);
    }

    /// Test-only: how many completion-lock entries the store currently
    /// holds. Used by `prune_completion_locks_removes_unreferenced`.
    #[cfg(test)]
    fn completion_locks_len(&self) -> usize {
        self.completion_locks.len()
    }

    /// Test-only: how many in-flight uploads the store is currently
    /// tracking. Used by the assertion in `concurrent_put_lookup_race_free`.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.by_upload_id
            .read()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .len()
    }
}

impl Default for MultipartStateStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    fn sample_ctx(bucket: &str, key: &str) -> MultipartUploadContext {
        MultipartUploadContext {
            bucket: bucket.to_owned(),
            key: key.to_owned(),
            sse: MultipartSseMode::None,
            tags: None,
            object_lock_mode: None,
            object_lock_retain_until: None,
            object_lock_legal_hold: false,
        }
    }

    /// `put` followed by `get` returns the same context, and `remove`
    /// makes a subsequent `get` return `None`. Sanity for the basic
    /// CRUD shape.
    #[test]
    fn put_get_remove_round_trip() {
        let store = MultipartStateStore::new();
        let ctx = sample_ctx("b", "k");
        store.put("upload-001", ctx.clone());
        let got = store.get("upload-001").expect("entry must be present");
        assert_eq!(got.bucket, "b");
        assert_eq!(got.key, "k");
        assert_eq!(got.sse, MultipartSseMode::None);
        store.remove("upload-001");
        assert!(store.get("upload-001").is_none(), "entry must be gone");
    }

    /// SSE-C variants stash the 32-byte key + 16-byte MD5; verify the
    /// bytes round-trip exactly (defensive — easy place to introduce a
    /// silent truncation bug).
    #[test]
    fn sse_c_key_bytes_round_trip() {
        let store = MultipartStateStore::new();
        let key = [0xa5u8; 32];
        let key_md5 = [0xb6u8; 16];
        let mut ctx = sample_ctx("b", "k");
        ctx.sse = MultipartSseMode::SseC { key, key_md5 };
        store.put("u-sse-c", ctx);
        let got = store.get("u-sse-c").expect("entry must be present");
        match got.sse {
            MultipartSseMode::SseC { key: k, key_md5: m } => {
                assert_eq!(k, key, "SSE-C key bytes must round-trip");
                assert_eq!(m, key_md5, "SSE-C MD5 must round-trip");
            }
            other => panic!("expected SseC variant, got {other:?}"),
        }
    }
    /// v0.8.1 #59: `completion_lock(bucket, key)` must return the
    /// **same** `Arc<Mutex<()>>` for repeated calls on the same key,
    /// otherwise concurrent Completes on the same key would each grab
    /// a distinct mutex and the serialisation would silently degrade
    /// to a no-op. We compare with `Arc::ptr_eq` rather than equality
    /// on the inner `()` because two distinct `Mutex<()>` instances
    /// would have different addresses but compare equal under `==`
    /// (unit type).
    #[test]
    fn completion_lock_returns_same_arc_for_same_key() {
        let store = MultipartStateStore::new();
        let a = store.completion_lock("bucket-a", "key/x");
        let b = store.completion_lock("bucket-a", "key/x");
        assert!(
            Arc::ptr_eq(&a, &b),
            "completion_lock(same bucket, same key) must return identical Arc"
        );
    }

    /// v0.8.1 #59: locks for distinct `(bucket, key)` tuples must be
    /// independent — concurrent Completes on different keys must not
    /// serialise on each other. We acquire two locks back-to-back
    /// (`try_lock` so the assertion is deterministic and doesn't
    /// depend on a runtime); both must succeed without contention.
    /// Also exercises bucket-vs-key disjointness: same key under two
    /// different buckets must NOT alias.
    #[tokio::test]
    async fn completion_lock_distinct_keys_independent() {
        let store = MultipartStateStore::new();
        let a = store.completion_lock("bucket-a", "shared/key");
        let b = store.completion_lock("bucket-b", "shared/key");
        assert!(
            !Arc::ptr_eq(&a, &b),
            "completion_lock with different bucket must yield different Arc"
        );
        // Hold the first lock and acquire the second under the same
        // task — must NOT deadlock and must NOT block. `try_lock`
        // returns `Ok(MutexGuard)` when uncontended, `Err` otherwise.
        let guard_a = a.try_lock().expect("lock on bucket-a/shared/key must be free");
        let guard_b = b.try_lock().expect("lock on bucket-b/shared/key must be free");
        // Same key, same bucket from a third call must alias `a` and
        // therefore be contended (a's guard is held above).
        let a2 = store.completion_lock("bucket-a", "shared/key");
        assert!(
            Arc::ptr_eq(&a, &a2),
            "completion_lock for the same (bucket, key) must alias"
        );
        assert!(
            a2.try_lock().is_err(),
            "completion_lock alias must observe the held guard as contended"
        );
        drop(guard_a);
        drop(guard_b);
    }

    /// v0.8.1 #59: `prune_completion_locks` must drop entries whose
    /// only `Arc` is the `DashMap`'s own (i.e. no in-flight Complete is
    /// holding a reference). After we acquire a lock then drop the
    /// returned `Arc`, the `strong_count` falls to 1 and prune must
    /// retire the entry so a steady-state workload of unique keys
    /// doesn't accumulate. Conversely, an entry with an outstanding
    /// `Arc` reference must survive prune.
    #[test]
    fn prune_completion_locks_removes_unreferenced() {
        let store = MultipartStateStore::new();
        // Acquire-and-drop: simulates a Complete that finished and let
        // its `Arc<Mutex<()>>` go out of scope. `strong_count == 1`
        // afterwards (only the `DashMap` retains it).
        {
            let _lock = store.completion_lock("b", "ephemeral");
        }
        assert_eq!(
            store.completion_locks_len(),
            1,
            "lock entry must be present immediately after acquire-drop"
        );
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            0,
            "prune must retire entries with strong_count == 1"
        );

        // Negative case: an outstanding `Arc` must NOT be pruned —
        // pruning a still-borrowed entry would let a concurrent
        // Complete miss the serialisation point.
        let held = store.completion_lock("b", "in-flight");
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            1,
            "prune must keep entries with outstanding Arc borrowers"
        );
        drop(held);
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            0,
            "prune must retire the entry once the borrower drops"
        );
    }

    /// 8 threads each register 250 distinct upload_ids and immediately
    /// look them up. After `join` the store must contain exactly the
    /// 8 × 250 entries — verifies `RwLock` doesn't drop writes under
    /// concurrent contention (the obvious refactor that swaps to
    /// `HashMap` without a lock would visibly fail this).
    #[test]
    fn concurrent_put_lookup_race_free() {
        let store = Arc::new(MultipartStateStore::new());
        let mut handles = Vec::new();
        for tid in 0..8u32 {
            let st = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..250u32 {
                    let id = format!("u-{tid}-{i}");
                    let ctx = sample_ctx("b", &id);
                    st.put(&id, ctx);
                    // Immediate lookup proves the writer-side observer
                    // sees its own put under the RwLock.
                    let got = st.get(&id).expect("self-put must be visible");
                    assert_eq!(got.key, id);
                }
            }));
        }
        for h in handles {
            h.join().expect("worker thread panicked");
        }
        assert_eq!(store.len(), 8 * 250, "all puts must persist");
    }
}