// s4_server/multipart_state.rs
//! Per-`upload_id` side-table for multipart uploads (v0.8 BUG-5..10 fix).
//!
//! S3 multipart is split across three handlers:
//!
//!   - `CreateMultipartUpload` — receives the SSE / Tagging / Object-Lock
//!     headers the client wants applied to the eventual object.
//!   - `UploadPart` × N — receives only the body bytes + part number;
//!     the SSE-C headers must be replayed by the client (AWS spec) but
//!     SSE-S4 / SSE-KMS / Tagging / Object-Lock are NOT replayed (they
//!     live on the upload itself).
//!   - `CompleteMultipartUpload` — receives only the part-list manifest;
//!     no metadata reaches this handler from the wire either.
//!
//! v0.7 #48 fixed the single-PUT path to `take()` SSE request fields off
//! the s3s input, encrypt-then-store, and stamp the `s4-sse-type`
//! metadata on the resulting object so HEAD can echo correctly. The
//! multipart path needs the equivalent treatment but the per-upload
//! context is split across three handler invocations — this module is
//! the side-channel that carries it from `CreateMultipartUpload` through
//! to `UploadPart` / `CompleteMultipartUpload`.
//!
//! The store is keyed on the backend-issued `upload_id` (opaque string
//! returned by `CreateMultipartUpload`'s response). `put` / `get` /
//! `remove` are all `O(1)` under a single `RwLock<HashMap>`; multipart
//! upload throughput is dominated by the part-body PUTs to the backend
//! (5 MiB+ each), so the lock is never the bottleneck.

use std::collections::HashMap;
use std::sync::RwLock;

use chrono::{DateTime, Utc};

use crate::object_lock::LockMode;
use crate::tagging::TagSet;

/// SSE recipe captured at `CreateMultipartUpload` time and replayed for
/// every part body + the final stamp on the assembled object.
///
/// Variant precedence mirrors `service::put_object`'s SSE branch order:
/// SSE-C (per-request customer key) beats SSE-KMS (named KMS key),
/// which beats SSE-S4 (server-managed keyring), which beats no
/// encryption. SSE-C / SSE-KMS only materialise when the client
/// supplied the matching headers; SSE-S4 materialises whenever the
/// gateway is booted with `--sse-s4-key` (or `with_sse_keyring(...)`
/// in tests).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MultipartSseMode {
    /// Plaintext multipart. Backend stores raw framed bytes.
    None,
    /// Server-managed keyring (active key on PUT, all keys probed on
    /// GET). The keyring itself lives on `S4Service`; only this marker
    /// is held here so `complete_multipart_upload` knows which path to
    /// take.
    SseS4,
    /// Per-request customer key. The 32-byte key + its 128-bit MD5 live
    /// in memory only for the lifetime of the upload, then drop when
    /// the entry is `remove(...)`'d on Complete or Abort.
    SseC { key: [u8; 32], key_md5: [u8; 16] },
    /// Named KMS key (resolved against the gateway's KMS backend on
    /// Complete to generate the per-object DEK).
    SseKms { key_id: String },
}

67/// Everything `CreateMultipartUpload` captured for `UploadPart` /
68/// `CompleteMultipartUpload` to act on. All fields are owned so the
69/// store can hand out cheap `Clone`s under the read lock.
70#[derive(Clone, Debug)]
71pub struct MultipartUploadContext {
72    /// Bucket the upload targets. Stored even though
73    /// `CompleteMultipartUploadInput::bucket` carries it too — keeps the
74    /// side-table self-contained for tests / debug dumps.
75    pub bucket: String,
76    /// Logical object key the upload will materialise into. Stored for
77    /// the same reason as `bucket`.
78    pub key: String,
79    /// SSE recipe captured from the Create's input headers.
80    pub sse: MultipartSseMode,
81    /// Tags parsed off `Tagging` / `x-amz-tagging` on Create. `None`
82    /// when the client didn't ask for tagging; otherwise the `TagSet` is
83    /// applied via `TagManager::put_object_tags` on Complete (BUG-9
84    /// fix).
85    pub tags: Option<TagSet>,
86    /// Per-PUT explicit Object Lock mode supplied via
87    /// `x-amz-object-lock-mode` on Create. Mirrors `put_object`'s
88    /// `explicit_lock_mode` capture so Complete commits the right
89    /// retention. `None` when no header was sent (Complete then falls
90    /// back to the bucket default via `apply_default_on_put`).
91    pub object_lock_mode: Option<LockMode>,
92    /// Per-PUT explicit Object Lock retain-until timestamp.
93    pub object_lock_retain_until: Option<DateTime<Utc>>,
94    /// Per-PUT explicit Object Lock legal-hold flag (`true` when
95    /// `x-amz-object-lock-legal-hold: ON` was sent on Create).
96    pub object_lock_legal_hold: bool,
97}
98
99/// In-memory side-table mapping `upload_id` → context. One of these
100/// hangs off `S4Service` (always-on, no flag — the per-upload state is
101/// gateway-internal).
102pub struct MultipartStateStore {
103    by_upload_id: RwLock<HashMap<String, MultipartUploadContext>>,
104}
105
106impl MultipartStateStore {
107    /// Empty store. Use `Arc<MultipartStateStore>` so `S4Service`'s
108    /// async handlers can borrow it across `&self` calls without
109    /// requiring `Clone`.
110    #[must_use]
111    pub fn new() -> Self {
112        Self {
113            by_upload_id: RwLock::new(HashMap::new()),
114        }
115    }
116
117    /// Register a new upload under `upload_id`. If `upload_id` is
118    /// already present (extremely unlikely — backend issues fresh ids)
119    /// the previous entry is overwritten silently to mirror
120    /// `HashMap::insert`'s replace-on-collision semantics.
121    pub fn put(&self, upload_id: &str, ctx: MultipartUploadContext) {
122        self.by_upload_id
123            .write()
124            .expect("multipart-state by_upload_id RwLock poisoned")
125            .insert(upload_id.to_owned(), ctx);
126    }
127
128    /// Snapshot the context for `upload_id`. `None` when no entry was
129    /// registered (e.g. Complete arrived for an upload that the gateway
130    /// has no record of — passes through to the backend untouched, which
131    /// in turn surfaces `NoSuchUpload`).
132    #[must_use]
133    pub fn get(&self, upload_id: &str) -> Option<MultipartUploadContext> {
134        self.by_upload_id
135            .read()
136            .expect("multipart-state by_upload_id RwLock poisoned")
137            .get(upload_id)
138            .cloned()
139    }
140
141    /// Drop the entry. Called by Complete / Abort to release the SSE-C
142    /// key bytes and the tag-set memory promptly.
143    pub fn remove(&self, upload_id: &str) {
144        self.by_upload_id
145            .write()
146            .expect("multipart-state by_upload_id RwLock poisoned")
147            .remove(upload_id);
148    }
149
150    /// Test-only: how many in-flight uploads the store is currently
151    /// tracking. Used by the assertion in `concurrent_put_lookup_race_free`.
152    #[cfg(test)]
153    fn len(&self) -> usize {
154        self.by_upload_id
155            .read()
156            .expect("multipart-state by_upload_id RwLock poisoned")
157            .len()
158    }
159}
160
161impl Default for MultipartStateStore {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Minimal plaintext context targeting `bucket`/`key` — no SSE, no
    /// tags, no Object Lock.
    fn sample_ctx(bucket: &str, key: &str) -> MultipartUploadContext {
        MultipartUploadContext {
            bucket: bucket.into(),
            key: key.into(),
            sse: MultipartSseMode::None,
            tags: None,
            object_lock_mode: None,
            object_lock_retain_until: None,
            object_lock_legal_hold: false,
        }
    }

    /// `put` → `get` round-trips the context, and `remove` makes a
    /// subsequent `get` return `None`. Sanity for the basic CRUD shape.
    #[test]
    fn put_get_remove_round_trip() {
        let store = MultipartStateStore::new();
        store.put("upload-001", sample_ctx("b", "k"));
        let fetched = store.get("upload-001").expect("entry must be present");
        assert_eq!(fetched.bucket, "b");
        assert_eq!(fetched.key, "k");
        assert_eq!(fetched.sse, MultipartSseMode::None);
        store.remove("upload-001");
        assert!(store.get("upload-001").is_none(), "entry must be gone");
    }

    /// SSE-C variants stash the 32-byte key + 16-byte MD5; verify both
    /// byte arrays round-trip exactly (defensive — easy place to
    /// introduce a silent truncation bug).
    #[test]
    fn sse_c_key_bytes_round_trip() {
        let store = MultipartStateStore::new();
        let key = [0xa5u8; 32];
        let key_md5 = [0xb6u8; 16];
        let ctx = MultipartUploadContext {
            sse: MultipartSseMode::SseC { key, key_md5 },
            ..sample_ctx("b", "k")
        };
        store.put("u-sse-c", ctx);
        let fetched = store.get("u-sse-c").expect("entry must be present");
        match fetched.sse {
            MultipartSseMode::SseC { key: got_key, key_md5: got_md5 } => {
                assert_eq!(got_key, key, "SSE-C key bytes must round-trip");
                assert_eq!(got_md5, key_md5, "SSE-C MD5 must round-trip");
            }
            other => panic!("expected SseC variant, got {other:?}"),
        }
    }

    /// 8 threads × 250 distinct upload_ids, each registered and then
    /// immediately looked up. After `join` the store must hold exactly
    /// 8 × 250 entries — proves the `RwLock` drops no writes under
    /// contention (the obvious refactor swapping to a bare `HashMap`
    /// would visibly fail this).
    #[test]
    fn concurrent_put_lookup_race_free() {
        let store = Arc::new(MultipartStateStore::new());
        let workers: Vec<_> = (0..8u32)
            .map(|tid| {
                let st = Arc::clone(&store);
                thread::spawn(move || {
                    for i in 0..250u32 {
                        let id = format!("u-{tid}-{i}");
                        st.put(&id, sample_ctx("b", &id));
                        // Immediate lookup: the writer must observe its
                        // own put under the RwLock.
                        let seen = st.get(&id).expect("self-put must be visible");
                        assert_eq!(seen.key, id);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().expect("worker thread panicked");
        }
        assert_eq!(store.len(), 8 * 250, "all puts must persist");
    }
}