Skip to main content

http_cache_stream/storage/
default.rs

1//! Implementation of the default cache storage.
2
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use futures::FutureExt;
13use http::HeaderMap;
14use http::Response;
15use http::StatusCode;
16use http::Version;
17use http::response::Parts;
18use http_body::Body;
19use http_cache_semantics::CachePolicy;
20use serde::Deserialize;
21use serde::Serialize;
22use tracing::debug;
23
24use super::StoredResponse;
25use crate::body::CacheBody;
26use crate::runtime;
27use crate::storage::CacheStorage;
28
/// Version tag of the on-disk layout (`<root>/<version>/…`); it changes
/// whenever the directory layout changes.
const STORAGE_VERSION: &str = "v1";

/// Directory, under the versioned root, holding one response file per key.
const RESPONSE_DIRECTORY_NAME: &str = "responses";

/// Directory, under the versioned root, holding response bodies named by digest.
const CONTENT_DIRECTORY_NAME: &str = "content";

/// Directory, under the versioned root, used while response bodies are written.
const TEMP_DIRECTORY_NAME: &str = "tmp";
37
38/// Represents a reference to a cached response.
39///
40/// This type is serialized to the response file.
41///
42/// This definition must be kept in sync with `CachedResponse`.
43#[derive(Serialize)]
44struct CachedResponseRef<'a> {
45    /// The response's status.
46    #[serde(with = "http_serde::status_code")]
47    status: StatusCode,
48
49    /// The response's version.
50    #[serde(with = "http_serde::version")]
51    version: Version,
52
53    /// The response's headers.
54    #[serde(with = "http_serde::header_map")]
55    headers: &'a HeaderMap,
56
57    /// The content digest of the response.
58    digest: &'a str,
59
60    /// The last used cached policy.
61    policy: &'a CachePolicy,
62}
63
64/// Represents a cached response.
65///
66/// This type is deserialized from the response file.
67#[derive(Deserialize)]
68struct CachedResponse {
69    /// The response's status.
70    #[serde(with = "http_serde::status_code")]
71    status: StatusCode,
72
73    /// The response's version.
74    #[serde(with = "http_serde::version")]
75    version: Version,
76
77    /// The response's headers.
78    #[serde(with = "http_serde::header_map")]
79    headers: HeaderMap,
80
81    /// The content digest of the response.
82    digest: String,
83
84    /// The last used cached policy.
85    policy: CachePolicy,
86}
87
88/// The default cache storage implementation.
89///
90/// ## Layout
91///
92/// This storage implementation uses the following directory structure:
93///
94/// ```text
95/// <root>/
96/// ├─ <storage-version>/
97/// │  ├─ responses/
98/// │  │  ├─ <key>
99/// │  │  ├─ <key>
100/// │  │  ├─ ...
101/// │  ├─ content/
102/// │  │  ├─ <digest>
103/// │  │  ├─ <digest>
104/// │  │  ├─ ...
105/// │  ├─ tmp/
106/// ```
107///
108/// Where `<root>` is the root storage directory, `<storage-version>` is a
109/// constant that changes when the directory layout changes (currently `v1`),
110/// `<key>` is supplied by the cache, and `<digest>` is the calculated digest of
111/// a response body.
112///
113/// ## The `responses` directory
114///
115/// The `responses` directory contains a file for each cached response.
116///
117/// The file is a bincode-serialized `CachedResponse` that contains information
118/// about the response, including the response body content digest.
119///
120/// ### Response file locking
121///
122/// Advisory file locks are obtained on a response file as the cache entries
123/// are read and updated.
124///
125/// This is used to coordinate access to the storage via this library; it does
126/// not protect against external modifications to the storage.
127///
128/// ## The `content` directory
129///
130/// The `content` directory contains a file for each cached response body.
131///
132/// The file name is the digest of the response body contents.
133///
134/// Currently the [`blake3`][blake3] hash algorithm is used for calculating
135/// response body digests.
136///
137/// ## The `tmp` directory
138///
139/// The `tmp` directory is used for temporarily storing response bodies as they
140/// are saved to the cache.
141///
142/// The content digest of the response is calculated as the response is written
143/// into temporary storage.
144///
145/// Once the response body has been fully read, the temporary file is atomically
146/// renamed to its content directory location; if the content already exists,
147/// the temporary file is deleted.
148///
149/// ## Integrity
150///
151/// This storage implementation does not provide strong guarantees on the
152/// integrity of the stored response bodies.
153///
154/// If the storage is externally modified, the modification will go undetected
155/// and the modified response bodies will be served.
156///
157/// ## Fault tolerance
158///
159/// If an error occurs while updating a cache entry with
160/// [`DefaultCacheStorage::put`], a future [`DefaultCacheStorage::get`] call
161/// will treat the entry as not present.
162///
163/// [blake3]: https://github.com/BLAKE3-team/BLAKE3
164#[derive(Clone)]
165pub struct DefaultCacheStorage(Arc<DefaultCacheStorageInner>);
166
167impl DefaultCacheStorage {
168    /// Constructs a new default cache storage with the given
169    pub fn new(root_dir: impl Into<PathBuf>) -> Self {
170        Self(Arc::new(DefaultCacheStorageInner(root_dir.into())))
171    }
172}
173
174impl CacheStorage for DefaultCacheStorage {
175    async fn get<B: Body + Send>(&self, key: &str) -> Result<Option<StoredResponse<B>>> {
176        let cached = match self.0.read_response(key).await? {
177            Some(response) => response,
178            None => return Ok(None),
179        };
180
181        // Open the response body
182        let path = self.body_path(&cached.digest);
183        let body = match runtime::File::open(&path)
184            .await
185            .map(Some)
186            .or_else(|e| {
187                if e.kind() == io::ErrorKind::NotFound {
188                    Ok(None)
189                } else {
190                    Err(e)
191                }
192            })
193            .with_context(|| {
194                format!(
195                    "failed to open response body `{path}`",
196                    path = path.display()
197                )
198            })? {
199            Some(file) => file,
200            None => return Ok(None),
201        };
202
203        // Build a response from the cached parts
204        let mut builder = Response::builder()
205            .version(cached.version)
206            .status(cached.status);
207        let headers = builder.headers_mut().expect("should be valid");
208        headers.extend(cached.headers);
209
210        Ok(Some(StoredResponse {
211            response: builder
212                .body(CacheBody::from_file(body).await.with_context(|| {
213                    format!(
214                        "failed to create response body for `{path}`",
215                        path = path.display()
216                    )
217                })?)
218                .expect("should be valid"),
219            policy: cached.policy,
220            digest: cached.digest,
221        }))
222    }
223
224    async fn put(
225        &self,
226        key: &str,
227        parts: &Parts,
228        policy: &CachePolicy,
229        digest: &str,
230    ) -> Result<()> {
231        self.0
232            .write_response(
233                key,
234                CachedResponseRef {
235                    status: parts.status,
236                    version: parts.version,
237                    headers: &parts.headers,
238                    digest,
239                    policy,
240                },
241            )
242            .await
243    }
244
245    async fn store<B: Body + Send>(
246        &self,
247        key: String,
248        parts: Parts,
249        body: B,
250        policy: CachePolicy,
251    ) -> Result<Response<CacheBody<B>>> {
252        // Create a temporary file for the download of the body
253        let inner = self.0.clone();
254        let temp_dir = inner.temp_dir_path();
255        fs::create_dir_all(&temp_dir).with_context(|| {
256            format!(
257                "failed to create temporary directory `{path}`",
258                path = temp_dir.display()
259            )
260        })?;
261
262        // Create a new caching body from the upstream body
263        // The provided callback will be invoked once the cache file hsa been completed
264        let status = parts.status;
265        let version = parts.version;
266        let headers = parts.headers.clone();
267
268        let body = CacheBody::from_caching_upstream(body, &temp_dir, move |digest, path| {
269            async move {
270                let content_path = inner.content_path(&digest);
271                fs::create_dir_all(content_path.parent().expect("should have parent"))
272                    .context("failed to create content directory")?;
273
274                // Atomically persist the temp file into the `content` location
275                path.persist(&content_path).with_context(|| {
276                    format!(
277                        "failed to persist downloaded body to content path `{path}`",
278                        path = content_path.display()
279                    )
280                })?;
281
282                // Update the response
283                inner
284                    .write_response(
285                        &key,
286                        CachedResponseRef {
287                            status,
288                            version,
289                            headers: &headers,
290                            digest: &digest,
291                            policy: &policy,
292                        },
293                    )
294                    .await?;
295
296                debug!(key, digest, "response body stored successfully");
297
298                Ok(())
299            }
300            .boxed()
301        })
302        .await?;
303
304        Ok(Response::from_parts(parts, body))
305    }
306
307    async fn delete(&self, key: &str) -> Result<()> {
308        // Acquire an exclusive lock on the response file
309        // By acquiring the lock, we truncate the file; any attempt to deserialize an
310        // empty response file will fail and be treated as not-present
311        self.0.lock_response_exclusive(key).await?;
312        Ok(())
313    }
314
315    fn body_path(&self, digest: &str) -> PathBuf {
316        self.0.content_path(digest)
317    }
318}
319
/// Shared state behind a [`DefaultCacheStorage`] handle.
///
/// Holds the root storage directory; every path used by the storage (response
/// files, content files, and the temporary directory) is derived from it.
struct DefaultCacheStorageInner(PathBuf);
322
323impl DefaultCacheStorageInner {
324    /// Calculates the path to a response file.
325    fn response_path(&self, key: &str) -> PathBuf {
326        let mut path = self.0.to_path_buf();
327        path.push(STORAGE_VERSION);
328        path.push(RESPONSE_DIRECTORY_NAME);
329        path.push(key);
330        path
331    }
332
333    /// Calculates the path to a content file.
334    fn content_path(&self, digest: &str) -> PathBuf {
335        let mut path = self.0.to_path_buf();
336        path.push(STORAGE_VERSION);
337        path.push(CONTENT_DIRECTORY_NAME);
338        path.push(digest);
339        path
340    }
341
342    /// Calculates the path to the temp directory.
343    fn temp_dir_path(&self) -> PathBuf {
344        let mut path = self.0.to_path_buf();
345        path.push(STORAGE_VERSION);
346        path.push(TEMP_DIRECTORY_NAME);
347        path
348    }
349
350    /// Reads a response from storage for the given key.
351    ///
352    /// This method will block if the response file is exclusively locked.
353    async fn read_response(&self, key: &str) -> Result<Option<CachedResponse>> {
354        // Acquire a shared lock on the response file
355        let mut response = match self.lock_response_shared(key).await? {
356            Some(file) => file,
357            None => return Ok(None),
358        };
359
360        // Decode the cached response
361        Ok(bincode::deserialize_from(&mut response)
362            .inspect_err(|e| {
363                debug!(
364                    "failed to deserialize response file `{path}`: {e} (cache entry will be \
365                     ignored)",
366                    path = self.response_path(key).display()
367                );
368            })
369            .ok())
370    }
371
372    /// Writes a response to storage for the given key.
373    ///
374    /// This method will block if the response file is locked.
375    async fn write_response(&self, key: &str, response: CachedResponseRef<'_>) -> Result<()> {
376        // Acquire a shared lock on the response file
377        let mut file = self.lock_response_exclusive(key).await?;
378
379        // Encode the response
380        bincode::serialize_into(&mut file, &response)
381            .with_context(|| format!("failed to serialize response data for cache key `{key}`"))
382            .map(|_| ())
383    }
384
385    /// Locks a response file for shared access.
386    ///
387    /// Returns `Ok(None)` if the file does not exist.
388    async fn lock_response_shared(&self, key: &str) -> Result<Option<File>> {
389        let path = self.response_path(key);
390        match fs::OpenOptions::new()
391            .read(true)
392            .open(&path)
393            .map(Some)
394            .or_else(|e| {
395                if e.kind() == io::ErrorKind::NotFound {
396                    Ok(None)
397                } else {
398                    Err(e)
399                }
400            })
401            .with_context(|| {
402                format!(
403                    "failed to open response file `{path}`",
404                    path = path.display()
405                )
406            })? {
407            Some(file) => {
408                match runtime::unwrap_task_output(
409                    runtime::spawn_blocking(move || {
410                        file.lock_shared()
411                            .context("failed to acquire shared lock on response file")?;
412                        Ok(file)
413                    })
414                    .await,
415                ) {
416                    Some(res) => res.map(Some),
417                    None => bail!("failed to wait for file lock"),
418                }
419            }
420            None => Ok(None),
421        }
422    }
423
424    /// Locks a response file for exclusive access.
425    ///
426    /// If the file does not exist, it is created.
427    ///
428    /// The file is intentionally truncated upon lock acquisition.
429    async fn lock_response_exclusive(&self, key: &str) -> Result<File> {
430        let path = self.response_path(key);
431        let dir = path.parent().expect("should have parent directory");
432        fs::create_dir_all(dir)
433            .with_context(|| format!("failed to create directory `{dir}`", dir = dir.display()))?;
434
435        let mut options = fs::OpenOptions::new();
436
437        // Note: we don't use the `truncate` option to truncate the file as we need the
438        // truncation to happen *after* the lock is acquired
439        options.create(true).write(true);
440
441        #[cfg(unix)]
442        {
443            // On Unix, make the mode 600
444            use std::os::unix::fs::OpenOptionsExt;
445            options.mode(0o600);
446        }
447
448        let file = options.open(&path).with_context(|| {
449            format!(
450                "failed to create response file `{path}`",
451                path = path.display()
452            )
453        })?;
454
455        let file = match runtime::unwrap_task_output(
456            runtime::spawn_blocking(move || {
457                file.lock()
458                    .context("failed to acquire exclusive lock on response file")?;
459                anyhow::Ok(file)
460            })
461            .await,
462        ) {
463            Some(res) => res?,
464            None => bail!("failed to wait for file lock"),
465        };
466
467        file.set_len(0).with_context(|| {
468            format!(
469                "failed to truncate response file `{path}`",
470                path = path.display()
471            )
472        })?;
473
474        Ok(file)
475    }
476}
477
#[cfg(all(test, feature = "tokio"))]
mod test {
    use futures::StreamExt;
    use http::Request;
    use http_body_util::BodyDataStream;
    use http_cache_semantics::CachePolicy;
    use tempfile::tempdir;

    use super::*;

    /// Looking up a key that was never stored is a miss, not an error.
    #[tokio::test]
    async fn cache_miss() {
        let root = tempdir().unwrap();
        let storage = DefaultCacheStorage::new(root.path());

        let lookup = storage
            .get::<String>("does-not-exist")
            .await
            .expect("should not fail");
        assert!(lookup.is_none());
    }

    /// Walks an entry through its lifecycle: store, read back, replace the head
    /// with `put`, read back again, and delete.
    #[tokio::test]
    async fn cache_hit() {
        const KEY: &str = "key";
        const BODY: &str = "hello world";
        const DIGEST: &str = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
        const HEADER_NAME: &str = "foo";
        const HEADER_VALUE: &str = "bar";

        let root = tempdir().unwrap();
        let storage = DefaultCacheStorage::new(root.path());

        // Nothing is cached under the key yet
        assert!(storage.get::<String>(KEY).await.unwrap().is_none());

        // Store an initial response that lacks the custom header
        let request = Request::builder().body("").unwrap();
        let initial = Response::builder().body(BODY.to_string()).unwrap();
        let initial_policy: CachePolicy = CachePolicy::new(&request, &initial);

        let (initial_parts, initial_body) = initial.into_parts();
        let streamed = storage
            .store(KEY.to_string(), initial_parts, initial_body, initial_policy)
            .await
            .unwrap();

        // The entry is only committed once the body has been read to the end
        let mut chunks = BodyDataStream::new(streamed.into_body());
        let first = chunks.next().await.unwrap().unwrap();
        assert!(chunks.next().await.is_none());
        assert_eq!(first, BODY);
        drop(chunks);

        // The entry now exists, still without the header
        let hit = storage.get::<String>(KEY).await.unwrap().unwrap();
        assert!(hit.response.headers().get(HEADER_NAME).is_none());

        let cached_body = BodyDataStream::new(hit.response.into_body())
            .next()
            .await
            .unwrap()
            .unwrap();
        assert_eq!(cached_body, BODY);
        assert_eq!(hit.digest, DIGEST);

        // Replace the head with one carrying the header, reusing the stored body
        let updated = Response::builder()
            .header(HEADER_NAME, HEADER_VALUE)
            .body(BODY.to_string())
            .unwrap();
        let updated_policy = CachePolicy::new(&request, &updated);

        let (updated_parts, _) = updated.into_parts();
        storage
            .put(KEY, &updated_parts, &updated_policy, DIGEST)
            .await
            .unwrap();

        // The header is now present and the body is unchanged
        let hit = storage.get::<String>(KEY).await.unwrap().unwrap();
        let header = hit
            .response
            .headers()
            .get(HEADER_NAME)
            .map(|value| value.to_str().unwrap());
        assert_eq!(header, Some(HEADER_VALUE));

        let cached_body = BodyDataStream::new(hit.response.into_body())
            .next()
            .await
            .unwrap()
            .unwrap();
        assert_eq!(cached_body, BODY);
        assert_eq!(hit.digest, DIGEST);

        // After deletion the key is a miss again
        storage.delete(KEY).await.unwrap();
        assert!(storage.get::<String>(KEY).await.unwrap().is_none());
    }
}