Skip to main content

umbral_core/
storage.rs

1//! Storage: the file-bytes backend abstraction and its ambient registry.
2//!
3//! ## What this is
4//!
5//! [`Storage`] is to file bytes what [`crate::db::DbPool`] is to database
6//! rows: a small, backend-agnostic seam the rest of the framework writes
7//! against without caring whether the bytes land on a local filesystem,
8//! S3, or anything else. A plugin (today `umbral-storage` with its
9//! `FsStorage`) provides the concrete impl and registers it as the
10//! ambient default; future `FileField` / `ImageField` and the admin
11//! resolve uploads through [`storage`] without knowing the backend.
12//!
13//! `umbral-core` defines the trait but never names a concrete impl — the
14//! filesystem backend lives in the `umbral-storage` plugin. This is the
15//! dependency-inversion rule from `CLAUDE.md`: dependencies point inward
16//! toward core, control flows outward through the trait. Cargo's ban on
17//! circular deps enforces that core can't reach back into the plugin.
18//!
19//! ## Why an ambient global
20//!
21//! The storage backend is registered once at boot and read ambiently,
22//! exactly like the DB pool (`crate::db`'s `DB_POOL`) and the template
23//! engine — "the one intentional global" family sanctioned in `CLAUDE.md`.
24//! A storage backend is a *backend service* (like the pool), not arbitrary
25//! shared state: threading an `Arc<dyn Storage>` through every field
26//! render, admin view, and upload handler would be the same boilerplate
27//! the pool's `OnceLock` was introduced to avoid. The set-once discipline
28//! (first registration wins; a second warns rather than panics) mirrors
29//! `crate::db::init` / `crate::settings::init`.
30//!
31//! The ORM-only rule (`CLAUDE.md`) governs *database rows*, not file
32//! bytes: `std::fs` / object-store I/O inside a `Storage` impl is the
33//! sanctioned path, not a raw-SQL workaround.
34
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex, OnceLock};
37
38use async_trait::async_trait;
39
40/// Re-export of `async-trait` so a plugin implementing the
41/// `#[async_trait]` [`Storage`] trait can name the attribute without a
42/// direct `async-trait` dep. Surfaced on the facade as
43/// `umbral::storage::async_trait`. Mirrors the forms module's re-export.
44pub use async_trait::async_trait as async_trait_reexport;
45
/// A boxed, pinned byte-stream — the streaming-upload/download currency of
/// [`Storage::store_stream`] / [`Storage::retrieve_stream`].
///
/// Object-safe (it's a trait object behind a `Box`, so it survives through
/// `Arc<dyn Storage>` dispatch) and `Send` so it can cross an `.await` on a
/// multi-threaded runtime. Each item is a `bytes::Bytes` chunk or an
/// [`std::io::Error`]; an error item aborts the stream.
pub type ByteStream =
    std::pin::Pin<Box<dyn futures_util::Stream<Item = Result<bytes::Bytes, std::io::Error>> + Send>>;

/// The `ErrorKind` a [`cap_stream`] over-limit error carries, so a wrapper
/// (e.g. `SizeLimitedStorage`) can recognise "the cap tripped" versus a
/// genuine backend IO failure and map it to [`StorageError::TooLarge`].
///
/// `ErrorKind::Other` is shared with unrelated errors, so the kind alone
/// does not identify a cap trip: test with [`is_cap_exceeded`], which also
/// checks for [`CAP_EXCEEDED_MARKER`] in the message.
pub const CAP_EXCEEDED_KIND: std::io::ErrorKind = std::io::ErrorKind::Other;

/// Sentinel string carried in a [`cap_stream`] over-limit error's message,
/// so the cap can be distinguished from any other `ErrorKind::Other`.
pub const CAP_EXCEEDED_MARKER: &str = "umbral-storage-cap-exceeded";
64
65/// Wrap `body` so it passes bytes through untouched until the cumulative
66/// byte count would exceed `max`, at which point it yields a single
67/// `Err(io::Error)` (kind [`CAP_EXCEEDED_KIND`], message [`CAP_EXCEEDED_MARKER`])
68/// and ends.
69///
70/// **This is the load-bearing security primitive for streaming uploads.**
71/// The cap is enforced *as bytes flow*, never from a declared length: a
72/// client that lies about (or omits) its `Content-Length` is still cut off
73/// the instant the real bytes cross `max`, so an oversized upload can never
74/// be fully written. A wrapper maps the marker error to
75/// [`StorageError::TooLarge`].
76pub fn cap_stream(body: ByteStream, max: u64) -> ByteStream {
77    use futures_util::StreamExt;
78    let mut seen: u64 = 0;
79    let mut tripped = false;
80    let capped = body.flat_map(move |item| {
81        // Once the cap has tripped, end the stream — don't forward more.
82        if tripped {
83            return futures_util::stream::iter(Vec::new());
84        }
85        match item {
86            Ok(chunk) => {
87                seen = seen.saturating_add(chunk.len() as u64);
88                if seen > max {
89                    tripped = true;
90                    let err = std::io::Error::new(CAP_EXCEEDED_KIND, CAP_EXCEEDED_MARKER);
91                    futures_util::stream::iter(vec![Err(err)])
92                } else {
93                    futures_util::stream::iter(vec![Ok(chunk)])
94                }
95            }
96            Err(e) => {
97                tripped = true;
98                futures_util::stream::iter(vec![Err(e)])
99            }
100        }
101    });
102    Box::pin(capped)
103}
104
105/// Is `e` the over-limit error produced by [`cap_stream`]? Used by a
106/// streaming wrapper to map the cap trip onto [`StorageError::TooLarge`]
107/// rather than a generic [`StorageError::Io`].
108pub fn is_cap_exceeded(e: &std::io::Error) -> bool {
109    e.kind() == CAP_EXCEEDED_KIND && e.to_string().contains(CAP_EXCEEDED_MARKER)
110}
111
112/// A storage backend for file bytes.
113///
114/// Implementors persist opaque byte blobs under a generated *key* and
115/// expose them at a public URL. The default impl ships in `umbral-storage`
116/// (`FsStorage`, filesystem-backed); an S3 backend slots in behind the
117/// same trait later (see `docs/decisions/2026-06-02-media-and-s3.md`).
118///
119/// Signed / auth-gated URLs are deliberately out of scope here: [`url`]
120/// returns a *public* URL only. Private media is a deferred v0.x feature.
121///
122/// [`url`]: Storage::url
123#[async_trait]
124pub trait Storage: Send + Sync {
125    /// Persist `bytes` under a freshly generated, collision-resistant key
126    /// derived from `filename`, returning the key plus its public URL.
127    ///
128    /// `content_type` is the MIME type the caller declares; backends may
129    /// record it (e.g. for an S3 object's `Content-Type`) but are not
130    /// required to validate it — the upload handler should validate
131    /// against an allow-list before calling this.
132    async fn store(
133        &self,
134        filename: &str,
135        content_type: &str,
136        bytes: &[u8],
137    ) -> Result<StoredFile, StorageError>;
138
139    /// Read back the bytes stored under `key`.
140    ///
141    /// Returns [`StorageError::NotFound`] if no object exists for `key`.
142    async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError>;
143
144    /// Streaming counterpart of [`store`](Storage::store): persist a
145    /// `body` byte-stream without buffering the whole payload in memory.
146    ///
147    /// **Additive, with a default impl** — an existing backend that does
148    /// not override this still works, just buffered: the default collects
149    /// the stream into a `Vec<u8>` (propagating any mid-stream IO error)
150    /// and delegates to [`store`](Storage::store). Override it to true-stream
151    /// to the backend (the filesystem impl writes chunk-by-chunk to disk).
152    ///
153    /// Size enforcement is a *decorator* concern, not this method's: wrap
154    /// `body` with [`cap_stream`] before calling so the cap is applied as
155    /// bytes flow, never trusting a declared `Content-Length`.
156    async fn store_stream(
157        &self,
158        filename: &str,
159        content_type: &str,
160        body: ByteStream,
161    ) -> Result<StoredFile, StorageError> {
162        // Default: buffer the stream, then delegate to the buffered `store`.
163        let mut bytes: Vec<u8> = Vec::new();
164        let mut body = body;
165        while let Some(chunk) = futures_util::StreamExt::next(&mut body).await {
166            let chunk = chunk.map_err(StorageError::Io)?;
167            bytes.extend_from_slice(&chunk);
168        }
169        self.store(filename, content_type, &bytes).await
170    }
171
172    /// Streaming counterpart of [`retrieve`](Storage::retrieve): read the
173    /// object back as a byte-stream without holding the whole blob.
174    ///
175    /// **Additive, with a default impl** — the default calls
176    /// [`retrieve`](Storage::retrieve) and wraps the resulting `Vec<u8>`
177    /// as a single-chunk stream. Override it to true-stream from the
178    /// backend (the filesystem impl streams the file off disk).
179    async fn retrieve_stream(&self, key: &str) -> Result<ByteStream, StorageError> {
180        let bytes = self.retrieve(key).await?;
181        let chunk: Result<bytes::Bytes, std::io::Error> = Ok(bytes::Bytes::from(bytes));
182        Ok(Box::pin(futures_util::stream::once(async move { chunk })))
183    }
184
185    /// Persist `bytes` at the *exact* `key` the caller supplies — the
186    /// deterministic-path sibling of [`store`](Storage::store), which
187    /// generates a collision-resistant key. Static asset collection needs
188    /// this: a CSS file collected to `css/app.css` must land at that key,
189    /// not a `uuid-app.css` one.
190    ///
191    /// **Additive, with a default impl** — but the default *cannot*
192    /// generically write-at-exact-key without backend knowledge (the
193    /// trait has no "write these bytes here" primitive beyond
194    /// [`store`](Storage::store), which owns its own key). So the default
195    /// returns [`StorageError::Unsupported`]. Backends that can honour an
196    /// exact key (the filesystem backend, the future `LocalStorage` /
197    /// `S3Storage`) override it; media's [`store`](Storage::store) stays
198    /// the key-generating path.
199    ///
200    /// `content_type` is recorded by backends that track it (e.g. an S3
201    /// object's `Content-Type`); the filesystem backend derives the
202    /// served type from the key's extension instead.
203    async fn put(
204        &self,
205        key: &str,
206        content_type: &str,
207        bytes: &[u8],
208    ) -> Result<StoredFile, StorageError> {
209        let _ = (key, content_type, bytes);
210        Err(StorageError::Unsupported(
211            "this Storage backend does not implement put(); override it to write at an exact key"
212                .to_string(),
213        ))
214    }
215
216    /// Streaming counterpart of [`put`](Storage::put): persist a `body`
217    /// byte-stream at the exact `key` without buffering the whole payload.
218    ///
219    /// **Additive, with a default impl** that mirrors the
220    /// [`store_stream`](Storage::store_stream)/[`store`](Storage::store)
221    /// relationship: it collects the stream into a `Vec<u8>` (propagating
222    /// any mid-stream IO error) and delegates to [`put`](Storage::put), so
223    /// a backend that overrides `put` gets a working `put_stream` for
224    /// free. Override it to true-stream to the backend.
225    async fn put_stream(
226        &self,
227        key: &str,
228        content_type: &str,
229        body: ByteStream,
230    ) -> Result<StoredFile, StorageError> {
231        let mut bytes: Vec<u8> = Vec::new();
232        let mut body = body;
233        while let Some(chunk) = futures_util::StreamExt::next(&mut body).await {
234            let chunk = chunk.map_err(StorageError::Io)?;
235            bytes.extend_from_slice(&chunk);
236        }
237        self.put(key, content_type, &bytes).await
238    }
239
240    /// Does an object exist under `key`?
241    ///
242    /// **Additive, with a default impl** — `Ok(self.retrieve(key).await.is_ok())`,
243    /// which works for any backend through [`retrieve`](Storage::retrieve).
244    /// Backends with a cheaper presence check (an S3 `HEAD`, a filesystem
245    /// `metadata` stat) override it to avoid reading the whole blob.
246    async fn exists(&self, key: &str) -> Result<bool, StorageError> {
247        Ok(self.retrieve(key).await.is_ok())
248    }
249
250    /// Remove the object stored under `key`. Idempotent at the backend's
251    /// discretion; deleting a missing key may succeed or return
252    /// [`StorageError::NotFound`].
253    async fn delete(&self, key: &str) -> Result<(), StorageError>;
254
255    /// The public URL a client can fetch the object at. Public-only;
256    /// signed URLs are deferred.
257    fn url(&self, key: &str) -> String;
258}
259
/// The outcome of a successful write through a [`Storage`] backend
/// ([`Storage::store`], [`Storage::store_stream`], [`Storage::put`] or
/// [`Storage::put_stream`]): the key the bytes live under, its public URL,
/// and the byte count.
#[derive(Debug, Clone)]
pub struct StoredFile {
    /// The key the bytes live under: backend-generated for
    /// [`Storage::store`] / [`Storage::store_stream`], caller-supplied for
    /// [`Storage::put`] / [`Storage::put_stream`]. Stable for the
    /// lifetime of the object; pass it back to [`Storage::retrieve`] /
    /// [`Storage::delete`] / [`Storage::url`].
    pub key: String,
    /// The public URL the object is served at. Equal to
    /// `storage.url(&key)`.
    pub url: String,
    /// The number of bytes actually written. For [`Storage::store`] this
    /// equals `bytes.len()`; for [`Storage::store_stream`] it is the
    /// cumulative count streamed to the backend (the truth a `media_file`
    /// row records, since a stream has no trustworthy up-front length).
    pub size: u64,
}
277
/// Errors a [`Storage`] operation (or a registry lookup such as
/// [`try_storage_named`]) can return.
///
/// Only [`StorageError::Io`] carries an underlying error that is exposed
/// through `std::error::Error::source`.
#[derive(Debug)]
pub enum StorageError {
    /// No ambient backend has been registered.
    NoBackend,
    /// No object exists under the given key.
    NotFound,
    /// The bytes exceeded the backend's configured size cap.
    TooLarge {
        /// The configured limit, in bytes.
        limit: u64,
        /// The actual size that was rejected, in bytes.
        actual: u64,
    },
    /// An underlying I/O error (filesystem read/write, etc.). Also what a
    /// mid-stream error item becomes in the buffered default
    /// [`Storage::store_stream`] / [`Storage::put_stream`].
    Io(std::io::Error),
    /// A backend-specific failure that doesn't map to the variants above
    /// (e.g. an S3 API error, or a row-insert failure in a wrapper).
    Backend(String),
    /// The backend doesn't implement the requested operation — returned by
    /// the default [`Storage::put`] impl for a backend that can't write at
    /// an exact key. The message names what's missing.
    Unsupported(String),
}
302
303impl std::fmt::Display for StorageError {
304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305        match self {
306            StorageError::NoBackend => write!(
307                f,
308                "storage: no backend registered; add StoragePlugin or call set_storage"
309            ),
310            StorageError::NotFound => write!(f, "storage: object not found"),
311            StorageError::TooLarge { limit, actual } => write!(
312                f,
313                "storage: object {actual}B exceeds configured cap of {limit}B"
314            ),
315            StorageError::Io(e) => write!(f, "storage: io: {e}"),
316            StorageError::Backend(s) => write!(f, "storage: backend: {s}"),
317            StorageError::Unsupported(s) => write!(f, "storage: unsupported: {s}"),
318        }
319    }
320}
321
322impl std::error::Error for StorageError {
323    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
324        match self {
325            StorageError::Io(e) => Some(e),
326            _ => None,
327        }
328    }
329}
330
331impl From<std::io::Error> for StorageError {
332    fn from(e: std::io::Error) -> Self {
333        StorageError::Io(e)
334    }
335}
336
/// The conventional name of the **media** (user-upload) storage instance,
/// the `"default"` entry in the storage registry. The back-compat accessors
/// ([`storage`], [`set_storage`], …) operate on this name.
pub const DEFAULT: &str = "default";

/// The conventional name of the **static-files** storage instance,
/// the `"staticfiles"` entry in the storage registry, where `collectstatic`
/// writes collected assets. Resolved independently of [`DEFAULT`].
pub const STATICFILES: &str = "staticfiles";

/// The ambient, **named** storage registry, published at boot.
///
/// The named storage map: a small map from a static name
/// (`"default"` for media, `"staticfiles"` for collected assets) to its
/// backend. Replaces the former single-global `OnceLock<Arc<dyn Storage>>`
/// so media and static can resolve independent backends under one
/// abstraction. Registration is boot-time, so a `Mutex<HashMap>` behind a
/// `OnceLock` is the right shape; the set-once-*per-name* discipline
/// (first-wins, warn-and-keep on a re-set) mirrors the old single global.
///
/// Same "one intentional global" family as `crate::db`'s pool registry
/// and the settings handle.
///
/// Not accessed directly: [`registry`] lazily creates the empty map, and
/// the `*_named` functions go through it.
static STORAGES: OnceLock<Mutex<HashMap<&'static str, Arc<dyn Storage>>>> = OnceLock::new();
360
361/// Access the named registry, initialising the empty map on first use.
362fn registry() -> &'static Mutex<HashMap<&'static str, Arc<dyn Storage>>> {
363    STORAGES.get_or_init(|| Mutex::new(HashMap::new()))
364}
365
366/// Register the storage backend under `name` (e.g. [`DEFAULT`] for media,
367/// [`STATICFILES`] for collected static assets).
368///
369/// Set-once **per name**, first-wins: a second call for the *same* name
370/// logs a warning and keeps the originally registered backend (different
371/// names register independently). Returns `true` when this call won the
372/// registration for `name`, `false` when that name was already taken.
373pub fn set_storage_named(name: &'static str, s: Arc<dyn Storage>) -> bool {
374    let mut map = registry()
375        .lock()
376        .unwrap_or_else(std::sync::PoisonError::into_inner);
377    if map.contains_key(name) {
378        tracing::warn!(
379            name,
380            "umbral::storage::set_storage_named called more than once for the same name; \
381             keeping the first-registered backend and ignoring the new one"
382        );
383        false
384    } else {
385        map.insert(name, s);
386        true
387    }
388}
389
390/// Return the storage backend registered under `name`.
391///
392/// # Panics
393///
394/// Panics if no backend has been registered under `name`. Wire one by
395/// adding the plugin that owns that name (`StoragePlugin` for [`DEFAULT`])
396/// or by calling [`set_storage_named`] directly.
397pub fn storage_named(name: &str) -> Arc<dyn Storage> {
398    try_storage_named(name).unwrap_or_else(|_| {
399        panic!(
400            "no Storage backend registered under `{name}`; add the owning plugin \
401             (StoragePlugin for `default`) or call umbral::storage::set_storage_named"
402        )
403    })
404}
405
406/// Return the storage backend registered under `name`, or
407/// [`StorageError::NoBackend`] if none is.
408pub fn try_storage_named(name: &str) -> Result<Arc<dyn Storage>, StorageError> {
409    storage_opt_named(name).ok_or(StorageError::NoBackend)
410}
411
412/// Return the storage backend registered under `name` if one exists, else
413/// `None`. The non-panicking variant of [`storage_named`].
414pub fn storage_opt_named(name: &str) -> Option<Arc<dyn Storage>> {
415    let map = registry()
416        .lock()
417        .unwrap_or_else(std::sync::PoisonError::into_inner);
418    map.get(name).cloned()
419}
420
/// Register the ambient **default** (media) storage backend — the
/// back-compat alias for `set_storage_named(`[`DEFAULT`]`, s)`.
///
/// Set-once, first-wins: a second call logs a warning and keeps the
/// originally registered backend, mirroring `crate::settings::init` and
/// `crate::db::init_atomic_default` rather than panicking on a double
/// set. Returns `true` when this call won the registration, `false` when
/// a backend was already registered.
///
/// `umbral-storage`'s `StoragePlugin::on_ready` calls this so the ambient
/// default is its `FsStorage`; an app can also call it directly to wire a
/// custom backend before (or instead of) any storage plugin.
pub fn set_storage(s: Arc<dyn Storage>) -> bool {
    // Locking, the first-wins check and the warning all live in the named variant.
    set_storage_named(DEFAULT, s)
}
436
/// Return the ambient **default** (media) storage backend — the
/// back-compat counterpart of `storage_named(`[`DEFAULT`]`)`.
///
/// Use [`try_storage`] or [`storage_opt`] when a missing backend must be
/// handled rather than treated as a bug.
///
/// # Panics
///
/// Panics if no backend has been registered. Wire one by adding
/// `StoragePlugin` (which registers its `FsStorage` in `on_ready`) or by
/// calling [`set_storage`] directly.
pub fn storage() -> Arc<dyn Storage> {
    // `expect` appends the error's `Debug` form to the message below.
    try_storage().expect(
        "no Storage backend registered; add StoragePlugin or call umbral::storage::set_storage",
    )
}
450
/// Return the ambient **default** (media) storage backend, or an explicit
/// error if none is registered. Back-compat alias for
/// `try_storage_named(`[`DEFAULT`]`)`.
///
/// # Errors
///
/// Returns [`StorageError::NoBackend`] when nothing is registered under
/// [`DEFAULT`].
pub fn try_storage() -> Result<Arc<dyn Storage>, StorageError> {
    try_storage_named(DEFAULT)
}
457
/// Return the ambient **default** (media) storage backend if registered,
/// else `None`. Back-compat alias for `storage_opt_named(`[`DEFAULT`]`)`.
///
/// The non-panicking variant of [`storage`]. Useful for boot-time
/// system checks (a future `FileField` check can warn when a model
/// declares a file field but no `Storage` backend is wired) and for
/// plugin code that runs before `on_ready`.
pub fn storage_opt() -> Option<Arc<dyn Storage>> {
    // Looks up the `DEFAULT` name only; other registered names are not consulted.
    storage_opt_named(DEFAULT)
}
468
#[cfg(test)]
mod tests {
    //! Unit tests for the trait's default methods, the streaming size cap,
    //! and the named registry. Registry tests use names unique to each test
    //! because the registry is process-global and set-once per name.

    use super::*;
    use futures_util::StreamExt;
    use std::collections::HashMap as Map;
    use std::sync::Mutex as StdMutex;

    /// A minimal in-memory backend. `store` generates a key; `put` is left
    /// at the trait default (returns `Unsupported`) so we can assert the
    /// default path; `exists` is left at the trait default (via `retrieve`).
    struct MemNoPut {
        objects: StdMutex<Map<String, Vec<u8>>>,
    }

    impl MemNoPut {
        fn new() -> Self {
            Self {
                objects: StdMutex::new(Map::new()),
            }
        }
    }

    #[async_trait]
    impl Storage for MemNoPut {
        async fn store(
            &self,
            filename: &str,
            _content_type: &str,
            bytes: &[u8],
        ) -> Result<StoredFile, StorageError> {
            let key = format!("k-{filename}");
            self.objects
                .lock()
                .unwrap()
                .insert(key.clone(), bytes.to_vec());
            Ok(StoredFile {
                url: self.url(&key),
                key,
                size: bytes.len() as u64,
            })
        }

        async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError> {
            self.objects
                .lock()
                .unwrap()
                .get(key)
                .cloned()
                .ok_or(StorageError::NotFound)
        }

        async fn delete(&self, key: &str) -> Result<(), StorageError> {
            self.objects.lock().unwrap().remove(key);
            Ok(())
        }

        fn url(&self, key: &str) -> String {
            format!("/mem/{key}")
        }
    }

    /// Same backend but overriding `put` to write at the exact key.
    struct MemWithPut {
        objects: StdMutex<Map<String, Vec<u8>>>,
    }

    impl MemWithPut {
        fn new() -> Self {
            Self {
                objects: StdMutex::new(Map::new()),
            }
        }
    }

    #[async_trait]
    impl Storage for MemWithPut {
        async fn store(
            &self,
            filename: &str,
            ct: &str,
            bytes: &[u8],
        ) -> Result<StoredFile, StorageError> {
            self.put(&format!("k-{filename}"), ct, bytes).await
        }

        async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError> {
            self.objects
                .lock()
                .unwrap()
                .get(key)
                .cloned()
                .ok_or(StorageError::NotFound)
        }

        async fn put(
            &self,
            key: &str,
            _ct: &str,
            bytes: &[u8],
        ) -> Result<StoredFile, StorageError> {
            self.objects
                .lock()
                .unwrap()
                .insert(key.to_string(), bytes.to_vec());
            Ok(StoredFile {
                url: self.url(key),
                key: key.to_string(),
                size: bytes.len() as u64,
            })
        }

        async fn delete(&self, key: &str) -> Result<(), StorageError> {
            self.objects.lock().unwrap().remove(key);
            Ok(())
        }

        fn url(&self, key: &str) -> String {
            format!("/mem/{key}")
        }
    }

    /// Build a `ByteStream` that yields each of `parts` as one `Ok` chunk.
    fn chunks(parts: &[&'static [u8]]) -> ByteStream {
        let items: Vec<Result<bytes::Bytes, std::io::Error>> = parts
            .iter()
            .map(|part| Ok(bytes::Bytes::from_static(*part)))
            .collect();
        Box::pin(futures_util::stream::iter(items))
    }

    #[tokio::test]
    async fn put_default_returns_unsupported() {
        let s = MemNoPut::new();
        let err = s.put("css/app.css", "text/css", b"x").await.unwrap_err();
        match err {
            StorageError::Unsupported(msg) => {
                assert!(msg.contains("does not implement put"), "msg = {msg}");
            }
            other => panic!("expected Unsupported, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn put_override_writes_at_exact_key() {
        let s = MemWithPut::new();
        let stored = s
            .put("css/app.css", "text/css", b"body{}")
            .await
            .unwrap();
        // The key is EXACTLY what we asked for — no generation.
        assert_eq!(stored.key, "css/app.css");
        assert_eq!(stored.size, 6);
        // And it round-trips back at that exact key.
        assert_eq!(s.retrieve("css/app.css").await.unwrap(), b"body{}");
    }

    #[tokio::test]
    async fn exists_default_true_after_store_false_when_missing() {
        let s = MemNoPut::new();
        let stored = s.store("a.txt", "text/plain", b"hi").await.unwrap();
        assert!(s.exists(&stored.key).await.unwrap());
        assert!(!s.exists("nope").await.unwrap());
    }

    #[tokio::test]
    async fn put_stream_default_delegates_to_put() {
        let s = MemWithPut::new();
        let body: ByteStream = Box::pin(futures_util::stream::once(async {
            Ok(bytes::Bytes::from_static(b"streamed"))
        }));
        let stored = s.put_stream("js/app.js", "text/javascript", body).await.unwrap();
        assert_eq!(stored.key, "js/app.js");
        assert_eq!(s.retrieve("js/app.js").await.unwrap(), b"streamed");
    }

    #[tokio::test]
    async fn store_stream_default_buffers_all_chunks() {
        let s = MemNoPut::new();
        let body = chunks(&[&b"he"[..], &b"llo"[..]]);
        let stored = s
            .store_stream("a.bin", "application/octet-stream", body)
            .await
            .unwrap();
        assert_eq!(stored.key, "k-a.bin");
        assert_eq!(stored.size, 5);
        assert_eq!(s.retrieve("k-a.bin").await.unwrap(), b"hello");
    }

    #[tokio::test]
    async fn store_stream_default_propagates_stream_error_without_storing() {
        let s = MemNoPut::new();
        let items: Vec<Result<bytes::Bytes, std::io::Error>> = vec![
            Ok(bytes::Bytes::from_static(b"partial")),
            Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "client hung up",
            )),
        ];
        let body: ByteStream = Box::pin(futures_util::stream::iter(items));
        let err = s
            .store_stream("a.bin", "application/octet-stream", body)
            .await
            .unwrap_err();
        match err {
            StorageError::Io(e) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
            other => panic!("expected Io, got {other:?}"),
        }
        // The failed upload must not leave a partial object behind.
        assert!(!s.exists("k-a.bin").await.unwrap());
    }

    #[tokio::test]
    async fn retrieve_stream_default_yields_the_whole_object() {
        let s = MemNoPut::new();
        let stored = s.store("a.txt", "text/plain", b"hello").await.unwrap();
        let mut stream = s.retrieve_stream(&stored.key).await.unwrap();
        let mut got: Vec<u8> = Vec::new();
        while let Some(chunk) = stream.next().await {
            got.extend_from_slice(&chunk.unwrap());
        }
        assert_eq!(got, b"hello");

        assert!(matches!(
            s.retrieve_stream("nope").await,
            Err(StorageError::NotFound)
        ));
    }

    #[tokio::test]
    async fn cap_stream_forwards_everything_up_to_the_limit() {
        // 3 + 2 bytes against a cap of 5: landing exactly on the cap is allowed.
        let capped = cap_stream(chunks(&[&b"abc"[..], &b"de"[..]]), 5);
        let items: Vec<Result<bytes::Bytes, std::io::Error>> = capped.collect().await;
        assert_eq!(items.len(), 2);
        assert_eq!(&items[0].as_ref().unwrap()[..], &b"abc"[..]);
        assert_eq!(&items[1].as_ref().unwrap()[..], &b"de"[..]);
    }

    #[tokio::test]
    async fn cap_stream_yields_one_marker_error_then_nothing() {
        // The second chunk crosses the cap; the third must never be forwarded.
        let capped = cap_stream(chunks(&[&b"abc"[..], &b"de"[..], &b"f"[..]]), 4);
        let items: Vec<Result<bytes::Bytes, std::io::Error>> = capped.collect().await;
        assert_eq!(items.len(), 2);
        assert_eq!(&items[0].as_ref().unwrap()[..], &b"abc"[..]);
        let err = items[1].as_ref().unwrap_err();
        assert!(is_cap_exceeded(err), "err = {err}");
    }

    #[tokio::test]
    async fn capped_store_stream_surfaces_the_cap_error() {
        let s = MemNoPut::new();
        let body = cap_stream(chunks(&[&b"too "[..], &b"long"[..]]), 5);
        let err = s
            .store_stream("big.bin", "application/octet-stream", body)
            .await
            .unwrap_err();
        match err {
            StorageError::Io(e) => assert!(is_cap_exceeded(&e), "e = {e}"),
            other => panic!("expected Io carrying the cap marker, got {other:?}"),
        }
        assert!(!s.exists("k-big.bin").await.unwrap());
    }

    #[test]
    fn is_cap_exceeded_rejects_unrelated_errors() {
        let cap = std::io::Error::new(CAP_EXCEEDED_KIND, CAP_EXCEEDED_MARKER);
        assert!(is_cap_exceeded(&cap));

        let same_kind = std::io::Error::new(std::io::ErrorKind::Other, "disk full");
        assert!(!is_cap_exceeded(&same_kind));

        let other_kind = std::io::Error::new(std::io::ErrorKind::NotFound, CAP_EXCEEDED_MARKER);
        assert!(!is_cap_exceeded(&other_kind));
    }

    #[test]
    fn named_registry_is_first_wins_per_name() {
        const NAME: &str = "tests-first-wins";
        assert!(storage_opt_named(NAME).is_none());
        assert!(set_storage_named(NAME, std::sync::Arc::new(MemNoPut::new())));
        // A second registration under the same name is refused.
        assert!(!set_storage_named(
            NAME,
            std::sync::Arc::new(MemWithPut::new())
        ));
        assert!(storage_opt_named(NAME).is_some());
        assert!(try_storage_named(NAME).is_ok());
        assert_eq!(storage_named(NAME).url("a"), "/mem/a");
    }

    #[test]
    fn unregistered_name_reports_no_backend() {
        const NAME: &str = "tests-never-registered";
        assert!(storage_opt_named(NAME).is_none());
        assert!(matches!(
            try_storage_named(NAME),
            Err(StorageError::NoBackend)
        ));
    }
}