s4-server 0.8.1

S4 ("Squished S3") is a GPU-accelerated, S3-compatible storage gateway with transparent compression. `cargo install s4-server` installs the `s4` binary.
//! Per-`upload_id` side-table for multipart uploads (v0.8 BUG-5..10 fix).
//!
//! S3 multipart is split across three handlers:
//!
//!   - `CreateMultipartUpload` — receives the SSE / Tagging / Object-Lock
//!     headers the client wants applied to the eventual object.
//!   - `UploadPart` × N — receives only the body bytes + part number;
//!     the SSE-C headers must be replayed by the client (AWS spec) but
//!     SSE-S4 / SSE-KMS / Tagging / Object-Lock are NOT replayed (they
//!     live on the upload itself).
//!   - `CompleteMultipartUpload` — receives only the part-list manifest;
//!     no metadata reaches this handler from the wire either.
//!
//! v0.7 #48 fixed the single-PUT path to `take()` the SSE request fields off
//! the s3s input, encrypt-then-store, and stamp the `s4-sse-type`
//! metadata on the resulting object so HEAD can echo correctly. The
//! multipart path needs the equivalent treatment but the per-upload
//! context is split across three handler invocations — this module is
//! the side-channel that carries it from `CreateMultipartUpload` through
//! to `UploadPart` / `CompleteMultipartUpload`.
//!
//! The store is keyed on the backend-issued `upload_id` (the opaque
//! string returned in `CreateMultipartUpload`'s response). `put` / `get` /
//! `remove` are all `O(1)` under a single `RwLock<HashMap>`; multipart
//! upload throughput is dominated by the part-body PUTs to the backend
//! (5 MiB+ each), so the lock is never the bottleneck.
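//!
//! A condensed usage sketch of that handler flow (the s3s handler
//! plumbing is elided and the surrounding variables are illustrative;
//! only this module's API is real):
//!
//! ```ignore
//! // CreateMultipartUpload: capture what the client asked for.
//! store.put(&upload_id, MultipartUploadContext {
//!     bucket: bucket.clone(),
//!     key: key.clone(),
//!     sse: MultipartSseMode::SseS4,
//!     tags: None,
//!     object_lock_mode: None,
//!     object_lock_retain_until: None,
//!     object_lock_legal_hold: false,
//! });
//!
//! // UploadPart / CompleteMultipartUpload: replay the captured context.
//! if let Some(ctx) = store.get(&upload_id) {
//!     // encrypt / tag / apply object lock according to `ctx` ...
//! }
//!
//! // Complete or Abort: drop the entry to release SSE-C key bytes promptly.
//! store.remove(&upload_id);
//! ```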

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use chrono::{DateTime, Utc};
use dashmap::DashMap;
use tokio::sync::Mutex;

use crate::object_lock::LockMode;
use crate::tagging::TagSet;

/// SSE recipe captured at `CreateMultipartUpload` time and replayed for
/// every part body + the final stamp on the assembled object.
///
/// The variants mirror `service::put_object`'s SSE branch precedence:
/// SSE-C (per-request customer key) wins over SSE-KMS (named KMS key)
/// wins over SSE-S4 (server-managed keyring) wins over no encryption.
/// SSE-C / SSE-KMS materialise only when the client supplied the
/// matching headers; SSE-S4 materialises whenever the gateway is booted
/// with `--sse-s4-key` (or `with_sse_keyring(...)` in tests).
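///
/// A minimal sketch of that precedence as a selection chain (the
/// header-derived locals `sse_c_key`, `sse_c_key_md5`, `sse_kms_key_id`
/// and `keyring_configured` are illustrative, not this crate's actual
/// bindings):
///
/// ```ignore
/// let sse = if let (Some(key), Some(key_md5)) = (sse_c_key, sse_c_key_md5) {
///     MultipartSseMode::SseC { key, key_md5 }
/// } else if let Some(key_id) = sse_kms_key_id {
///     MultipartSseMode::SseKms { key_id }
/// } else if keyring_configured {
///     MultipartSseMode::SseS4
/// } else {
///     MultipartSseMode::None
/// };
/// ```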
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MultipartSseMode {
    /// Plaintext multipart. Backend stores raw framed bytes.
    None,
    /// Server-managed keyring (active key on PUT, all keys probed on GET).
    /// The keyring itself lives on `S4Service`; only the marker is held
    /// here so `complete_multipart_upload` knows which path to take.
    SseS4,
    /// Per-request customer key. The 32-byte key + its 128-bit MD5 are
    /// kept in memory only for the lifetime of the upload, then dropped
    /// when the entry is `remove(...)`'d on Complete or Abort.
    SseC {
        key: [u8; 32],
        key_md5: [u8; 16],
    },
    /// Named KMS key (resolved against the gateway's KMS backend on
    /// Complete to generate the per-object DEK).
    SseKms {
        key_id: String,
    },
}

/// Everything `CreateMultipartUpload` captured for `UploadPart` /
/// `CompleteMultipartUpload` to act on. All fields are owned so the
/// store can hand out cheap `Clone`s under the read lock.
#[derive(Clone, Debug)]
pub struct MultipartUploadContext {
    /// Bucket the upload targets. Stored even though
    /// `CompleteMultipartUploadInput::bucket` carries it too — keeps the
    /// side-table self-contained for tests / debug dumps.
    pub bucket: String,
    /// Logical object key the upload will materialise into. Stored for
    /// the same reason as `bucket`.
    pub key: String,
    /// SSE recipe captured from the Create's input headers.
    pub sse: MultipartSseMode,
    /// Tags parsed off `Tagging` / `x-amz-tagging` on Create. `None`
    /// when the client didn't ask for tagging; otherwise the `TagSet` is
    /// applied via `TagManager::put_object_tags` on Complete (BUG-9
    /// fix).
    pub tags: Option<TagSet>,
    /// Per-PUT explicit Object Lock mode supplied via
    /// `x-amz-object-lock-mode` on Create. Mirrors `put_object`'s
    /// `explicit_lock_mode` capture so Complete commits the right
    /// retention. `None` when no header was sent (Complete then falls
    /// back to the bucket default via `apply_default_on_put`).
    pub object_lock_mode: Option<LockMode>,
    /// Per-PUT explicit Object Lock retain-until timestamp.
    pub object_lock_retain_until: Option<DateTime<Utc>>,
    /// Per-PUT explicit Object Lock legal-hold flag (`true` when
    /// `x-amz-object-lock-legal-hold: ON` was sent on Create).
    pub object_lock_legal_hold: bool,
}

/// In-memory side-table mapping `upload_id` → context. One of these
/// hangs off `S4Service` (always-on, no flag — the per-upload state is
/// gateway-internal).
pub struct MultipartStateStore {
    by_upload_id: RwLock<HashMap<String, MultipartUploadContext>>,
    /// v0.8.1 #59: per-(bucket, key) `Mutex` used to serialize Complete
    /// operations on the same logical key. The race window the lock
    /// closes lives inside `service::complete_multipart_upload` between
    /// `backend.get_object` (assembled body fetch for the SSE encrypt
    /// re-PUT, BUG-5 fix) and `backend.put_object` (encrypted body
    /// write-back). Two concurrent Completes with different `upload_id`
    /// but the same `(bucket, key)` could otherwise interleave their
    /// GET / encrypt / PUT triples and overwrite each other.
    ///
    /// `DashMap` is used because the lock acquisition path is itself
    /// `O(1)` and contention between *different* keys must not block;
    /// `DashMap`'s sharded design preserves that property whereas a
    /// single `RwLock<HashMap<_,_>>` would serialise even unrelated
    /// keys' lock-lookup. The stored `Arc<Mutex<()>>` is what the
    /// caller actually awaits on — the `DashMap` itself is just a
    /// concurrent index into those mutexes.
    ///
    /// Cleanup is best-effort (`prune_completion_locks`); the entry
    /// for a one-shot key is dropped once both the in-flight Complete
    /// returns and the prune sweep observes only the `DashMap`'s own
    /// `Arc` reference.
    completion_locks: DashMap<(String, String), Arc<Mutex<()>>>,
}

impl MultipartStateStore {
    /// Empty store. Wrap it in an `Arc<MultipartStateStore>` so
    /// `S4Service`'s async handlers can share one instance through
    /// `&self` without the store itself having to be `Clone`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_upload_id: RwLock::new(HashMap::new()),
            completion_locks: DashMap::new(),
        }
    }

    /// Register a new upload under `upload_id`. If `upload_id` is
    /// already present (extremely unlikely — backend issues fresh ids)
    /// the previous entry is overwritten silently to mirror
    /// `HashMap::insert`'s replace-on-collision semantics.
    pub fn put(&self, upload_id: &str, ctx: MultipartUploadContext) {
        self.by_upload_id
            .write()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .insert(upload_id.to_owned(), ctx);
    }

    /// Snapshot the context for `upload_id`. `None` when no entry was
    /// registered (e.g. Complete arrived for an upload that the gateway
    /// has no record of — passes through to the backend untouched, which
    /// in turn surfaces `NoSuchUpload`).
    #[must_use]
    pub fn get(&self, upload_id: &str) -> Option<MultipartUploadContext> {
        self.by_upload_id
            .read()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .get(upload_id)
            .cloned()
    }

    /// Drop the entry. Called by Complete / Abort to release the SSE-C
    /// key bytes and the tag-set memory promptly.
    pub fn remove(&self, upload_id: &str) {
        self.by_upload_id
            .write()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .remove(upload_id);
    }

    /// v0.8.1 #59: get-or-create the per-(bucket, key) `Mutex` used to
    /// serialise `complete_multipart_upload` invocations on the same
    /// logical key. Caller does `lock.lock().await` and holds the
    /// guard for the duration of its critical section (GET assembled
    /// body → encrypt → PUT encrypted body → version-id mint → object-
    /// lock apply → tagging persist → replication enqueue).
    ///
    /// Returns an `Arc<Mutex<()>>` so the caller can drop the
    /// `DashMap` shard's read lock immediately and only retain the
    /// mutex itself across the await point — `DashMap`'s shard guard
    /// is `!Send`, so we must not hold it through an `await`.
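    ///
    /// A minimal sketch of the intended call pattern (the surrounding
    /// Complete handler is paraphrased, not quoted from `service`):
    ///
    /// ```ignore
    /// let lock = store.completion_lock(&bucket, &key); // shard guard already released
    /// let _guard = lock.lock().await;                  // serialise per (bucket, key)
    /// // GET assembled body → encrypt → PUT encrypted body → metadata commits ...
    /// drop(_guard);
    /// store.prune_completion_locks();                  // best-effort cleanup
    /// ```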
    pub fn completion_lock(&self, bucket: &str, key: &str) -> Arc<Mutex<()>> {
        let k = (bucket.to_owned(), key.to_owned());
        self.completion_locks
            .entry(k)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .value()
            .clone()
    }

    /// v0.8.1 #59: best-effort cleanup of stale completion-lock
    /// entries. A `(bucket, key)` entry is "stale" once no concurrent
    /// Complete is referencing its `Arc<Mutex<()>>` — we detect that
    /// by `Arc::strong_count == 1` (only the `DashMap` itself holds a
    /// reference). Called from `complete_multipart_upload` after the
    /// guarded section returns, so a steady-state workload of unique
    /// keys never accumulates locks.
    ///
    /// The retain predicate is `> 1` (keep entries with outstanding
    /// borrowers), so prune is safe to invoke concurrently with other
    /// `completion_lock` callers — at worst the prune sees the entry
    /// during a brief window where the borrower has cloned but not yet
    /// taken `lock()`, and the entry survives until the next sweep.
    pub fn prune_completion_locks(&self) {
        self.completion_locks
            .retain(|_, lock| Arc::strong_count(lock) > 1);
    }

    /// Test-only: how many completion-lock entries the store currently
    /// holds. Used by `prune_completion_locks_removes_unreferenced`.
    #[cfg(test)]
    fn completion_locks_len(&self) -> usize {
        self.completion_locks.len()
    }

    /// Test-only: how many in-flight uploads the store is currently
    /// tracking. Used by the assertion in `concurrent_put_lookup_race_free`.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.by_upload_id
            .read()
            .expect("multipart-state by_upload_id RwLock poisoned")
            .len()
    }
}

impl Default for MultipartStateStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    fn sample_ctx(bucket: &str, key: &str) -> MultipartUploadContext {
        MultipartUploadContext {
            bucket: bucket.to_owned(),
            key: key.to_owned(),
            sse: MultipartSseMode::None,
            tags: None,
            object_lock_mode: None,
            object_lock_retain_until: None,
            object_lock_legal_hold: false,
        }
    }

    /// `put` followed by `get` returns the same context, and `remove`
    /// makes a subsequent `get` return `None`. Sanity for the basic
    /// CRUD shape.
    #[test]
    fn put_get_remove_round_trip() {
        let store = MultipartStateStore::new();
        let ctx = sample_ctx("b", "k");
        store.put("upload-001", ctx.clone());
        let got = store.get("upload-001").expect("entry must be present");
        assert_eq!(got.bucket, "b");
        assert_eq!(got.key, "k");
        assert_eq!(got.sse, MultipartSseMode::None);
        store.remove("upload-001");
        assert!(store.get("upload-001").is_none(), "entry must be gone");
    }

    /// SSE-C variants stash the 32-byte key + 16-byte MD5; verify the
    /// bytes round-trip exactly (defensive — easy place to introduce a
    /// silent truncation bug).
    #[test]
    fn sse_c_key_bytes_round_trip() {
        let store = MultipartStateStore::new();
        let key = [0xa5u8; 32];
        let key_md5 = [0xb6u8; 16];
        let mut ctx = sample_ctx("b", "k");
        ctx.sse = MultipartSseMode::SseC { key, key_md5 };
        store.put("u-sse-c", ctx);
        let got = store.get("u-sse-c").expect("entry must be present");
        match got.sse {
            MultipartSseMode::SseC { key: k, key_md5: m } => {
                assert_eq!(k, key, "SSE-C key bytes must round-trip");
                assert_eq!(m, key_md5, "SSE-C MD5 must round-trip");
            }
            other => panic!("expected SseC variant, got {other:?}"),
        }
    }

    /// v0.8.1 #59: `completion_lock(bucket, key)` must return the
    /// **same** `Arc<Mutex<()>>` for repeated calls on the same key,
    /// otherwise concurrent Completes on the same key would each grab
    /// a distinct mutex and the serialisation would silently degrade
    /// to a no-op. We assert identity with `Arc::ptr_eq` because the
    /// inner `Mutex<()>` carries no distinguishing value; pointer
    /// identity is the only meaningful way to tell two lock instances
    /// apart.
    #[test]
    fn completion_lock_returns_same_arc_for_same_key() {
        let store = MultipartStateStore::new();
        let a = store.completion_lock("bucket-a", "key/x");
        let b = store.completion_lock("bucket-a", "key/x");
        assert!(
            Arc::ptr_eq(&a, &b),
            "completion_lock(same bucket, same key) must return identical Arc"
        );
    }

    /// v0.8.1 #59: locks for distinct `(bucket, key)` tuples must be
    /// independent — concurrent Completes on different keys must not
    /// serialise on each other. We acquire two locks back-to-back
    /// (`try_lock` so the assertion is deterministic and doesn't
    /// depend on a runtime); both must succeed without contention.
    /// Also exercises bucket-vs-key disjointness: same key under two
    /// different buckets must NOT alias.
    #[test]
    fn completion_lock_distinct_keys_independent() {
        let store = MultipartStateStore::new();
        let a = store.completion_lock("bucket-a", "shared/key");
        let b = store.completion_lock("bucket-b", "shared/key");
        assert!(
            !Arc::ptr_eq(&a, &b),
            "completion_lock with different bucket must yield different Arc"
        );
        // Hold the first lock and acquire the second under the same
        // task — must NOT deadlock and must NOT block. `try_lock`
        // returns `Ok(MutexGuard)` when uncontended, `Err` otherwise.
        let guard_a = a.try_lock().expect("lock on bucket-a/shared/key must be free");
        let guard_b = b.try_lock().expect("lock on bucket-b/shared/key must be free");
        // Same key, same bucket from a third call must alias `a` and
        // therefore be contended (a's guard is held above).
        let a2 = store.completion_lock("bucket-a", "shared/key");
        assert!(
            Arc::ptr_eq(&a, &a2),
            "completion_lock for the same (bucket, key) must alias"
        );
        assert!(
            a2.try_lock().is_err(),
            "completion_lock alias must observe the held guard as contended"
        );
        drop(guard_a);
        drop(guard_b);
    }

    /// v0.8.1 #59: `prune_completion_locks` must drop entries whose
    /// only `Arc` is the `DashMap`'s own (i.e. no in-flight Complete is
    /// holding a reference). After we acquire a lock then drop the
    /// returned `Arc`, the `strong_count` falls to 1 and prune must
    /// retire the entry so a steady-state workload of unique keys
    /// doesn't accumulate. Conversely, an entry with an outstanding
    /// `Arc` reference must survive prune.
    #[test]
    fn prune_completion_locks_removes_unreferenced() {
        let store = MultipartStateStore::new();
        // Acquire-and-drop: simulates a Complete that finished and let
        // its `Arc<Mutex<()>>` go out of scope. `strong_count == 1`
        // afterwards (only the `DashMap` retains it).
        {
            let _lock = store.completion_lock("b", "ephemeral");
        }
        assert_eq!(
            store.completion_locks_len(),
            1,
            "lock entry must be present immediately after acquire-drop"
        );
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            0,
            "prune must retire entries with strong_count == 1"
        );

        // Negative case: an outstanding `Arc` must NOT be pruned —
        // pruning a still-borrowed entry would let a concurrent
        // Complete miss the serialisation point.
        let held = store.completion_lock("b", "in-flight");
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            1,
            "prune must keep entries with outstanding Arc borrowers"
        );
        drop(held);
        store.prune_completion_locks();
        assert_eq!(
            store.completion_locks_len(),
            0,
            "prune must retire the entry once the borrower drops"
        );
    }

    /// 8 threads each register 250 distinct upload_ids and immediately
    /// look them up. After `join` the store must contain exactly the
    /// 8 × 250 entries — verifies `RwLock` doesn't drop writes under
    /// concurrent contention (the obvious refactor that swaps to
    /// `HashMap` without a lock would visibly fail this).
    #[test]
    fn concurrent_put_lookup_race_free() {
        let store = Arc::new(MultipartStateStore::new());
        let mut handles = Vec::new();
        for tid in 0..8u32 {
            let st = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..250u32 {
                    let id = format!("u-{tid}-{i}");
                    let ctx = sample_ctx("b", &id);
                    st.put(&id, ctx);
                    // Immediate lookup proves the writer-side observer
                    // sees its own put under the RwLock.
                    let got = st.get(&id).expect("self-put must be visible");
                    assert_eq!(got.key, id);
                }
            }));
        }
        for h in handles {
            h.join().expect("worker thread panicked");
        }
        assert_eq!(store.len(), 8 * 250, "all puts must persist");
    }
}