http_cache_stream/storage/
default.rs

1//! Implementation of the default cache storage.
2
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use futures::FutureExt;
13use http::HeaderMap;
14use http::Response;
15use http::StatusCode;
16use http::Version;
17use http::response::Parts;
18use http_cache_semantics::CachePolicy;
19use serde::Deserialize;
20use serde::Serialize;
21use tracing::debug;
22
23use super::StoredResponse;
24use crate::HttpBody;
25use crate::body::Body;
26use crate::runtime;
27use crate::storage::CacheStorage;
28
/// The current directory layout version.
///
/// This is the first path component below the storage root; every other
/// directory used by the storage lives underneath it.
const STORAGE_VERSION: &str = "v1";
/// The name of the `responses` directory.
///
/// Holds one response file per cache key.
const RESPONSE_DIRECTORY_NAME: &str = "responses";
/// The name of the `content` directory.
///
/// Holds one response body file per content digest.
const CONTENT_DIRECTORY_NAME: &str = "content";
/// The name of the `tmp` directory.
///
/// Holds response bodies while they are still being written.
const TEMP_DIRECTORY_NAME: &str = "tmp";
37
/// Represents a reference to a cached response.
///
/// This type is serialized to the response file.
///
/// It borrows everything it can so that a response can be written without
/// cloning its headers, digest, or policy.
///
/// This definition must be kept in sync with `CachedResponse`: the file written
/// from this type is read back as a `CachedResponse`, so the fields are
/// declared in the same order and with the same serde adapters.
#[derive(Serialize)]
struct CachedResponseRef<'a> {
    /// The response's status.
    #[serde(with = "http_serde::status_code")]
    status: StatusCode,

    /// The response's version.
    #[serde(with = "http_serde::version")]
    version: Version,

    /// The response's headers.
    #[serde(with = "http_serde::header_map")]
    headers: &'a HeaderMap,

    /// The content digest of the response.
    ///
    /// This is also the file name of the response body in the `content`
    /// directory.
    digest: &'a str,

    /// The last used cache policy.
    policy: &'a CachePolicy,
}
63
/// Represents a cached response.
///
/// This type is deserialized from the response file.
///
/// It is the owned counterpart of `CachedResponseRef`; the two definitions
/// must declare the same fields in the same order.
#[derive(Deserialize)]
struct CachedResponse {
    /// The response's status.
    #[serde(with = "http_serde::status_code")]
    status: StatusCode,

    /// The response's version.
    #[serde(with = "http_serde::version")]
    version: Version,

    /// The response's headers.
    #[serde(with = "http_serde::header_map")]
    headers: HeaderMap,

    /// The content digest of the response.
    ///
    /// This is also the file name of the response body in the `content`
    /// directory.
    digest: String,

    /// The last used cache policy.
    policy: CachePolicy,
}
87
/// The default cache storage implementation.
///
/// The storage is a cheaply clonable handle: clones share the same root
/// directory through an [`Arc`].
///
/// ## Layout
///
/// This storage implementation uses the following directory structure:
///
/// ```text
/// <root>/
/// ├─ <storage-version>/
/// │  ├─ responses/
/// │  │  ├─ <key>
/// │  │  ├─ <key>
/// │  │  ├─ ...
/// │  ├─ content/
/// │  │  ├─ <digest>
/// │  │  ├─ <digest>
/// │  │  ├─ ...
/// │  ├─ tmp/
/// ```
///
/// Where `<root>` is the root storage directory, `<storage-version>` is a
/// constant that changes when the directory layout changes (currently `v1`),
/// `<key>` is supplied by the cache, and `<digest>` is the calculated digest of
/// a response body.
///
/// ## The `responses` directory
///
/// The `responses` directory contains a file for each cached response.
///
/// The file is a bincode-serialized `CachedResponse` that contains information
/// about the response, including the response body content digest.
///
/// ### Response file locking
///
/// Advisory file locks are obtained on a response file as the cache entries
/// are read and updated.
///
/// This is used to coordinate access to the storage via this library; it does
/// not protect against external modifications to the storage.
///
/// ## The `content` directory
///
/// The `content` directory contains a file for each cached response body.
///
/// The file name is the digest of the response body contents.
///
/// Currently the [`blake3`][blake3] hash algorithm is used for calculating
/// response body digests.
///
/// ## The `tmp` directory
///
/// The `tmp` directory is used for temporarily storing response bodies as they
/// are saved to the cache.
///
/// The content digest of the response is calculated as the response is written
/// into temporary storage.
///
/// Once the response body has been fully read, the temporary file is atomically
/// renamed to its content directory location; if the content already exists,
/// the temporary file is deleted.
///
/// ## Integrity
///
/// This storage implementation does not provide strong guarantees on the
/// integrity of the stored response bodies.
///
/// If the storage is externally modified, the modification will go undetected
/// and the modified response bodies will be served.
///
/// ## Fault tolerance
///
/// If an error occurs while updating a cache entry with
/// [`DefaultCacheStorage::put`], a future [`DefaultCacheStorage::get`] call
/// will treat the entry as not present.
///
/// [`DefaultCacheStorage::delete`] relies on the same behavior: it truncates
/// the response file rather than removing it, and an empty response file is
/// treated as not present.
///
/// [blake3]: https://github.com/BLAKE3-team/BLAKE3
#[derive(Clone)]
pub struct DefaultCacheStorage(Arc<DefaultCacheStorageInner>);
166
167impl DefaultCacheStorage {
168    /// Constructs a new default cache storage with the given
169    pub fn new(root_dir: impl Into<PathBuf>) -> Self {
170        Self(Arc::new(DefaultCacheStorageInner(root_dir.into())))
171    }
172}
173
174impl CacheStorage for DefaultCacheStorage {
175    async fn get<B: HttpBody>(&self, key: &str) -> Result<Option<StoredResponse<B>>> {
176        let cached = match self.0.read_response(key).await? {
177            Some(response) => response,
178            None => return Ok(None),
179        };
180
181        // Open the response body
182        let path = self.body_path(&cached.digest);
183        let body = match runtime::File::open(&path)
184            .await
185            .map(Some)
186            .or_else(|e| {
187                if e.kind() == io::ErrorKind::NotFound {
188                    Ok(None)
189                } else {
190                    Err(e)
191                }
192            })
193            .with_context(|| {
194                format!(
195                    "failed to open response body `{path}`",
196                    path = path.display()
197                )
198            })? {
199            Some(file) => file,
200            None => return Ok(None),
201        };
202
203        // Build a response from the cached parts
204        let mut builder = Response::builder()
205            .version(cached.version)
206            .status(cached.status);
207        let headers = builder.headers_mut().expect("should be valid");
208        headers.extend(cached.headers);
209
210        Ok(Some(StoredResponse {
211            response: builder
212                .body(Body::from_file(body).await.with_context(|| {
213                    format!(
214                        "failed to create response body for `{path}`",
215                        path = path.display()
216                    )
217                })?)
218                .expect("should be valid"),
219            policy: cached.policy,
220            digest: cached.digest,
221        }))
222    }
223
224    async fn put(
225        &self,
226        key: &str,
227        parts: &Parts,
228        policy: &CachePolicy,
229        digest: &str,
230    ) -> Result<()> {
231        self.0
232            .write_response(
233                key,
234                CachedResponseRef {
235                    status: parts.status,
236                    version: parts.version,
237                    headers: &parts.headers,
238                    digest,
239                    policy,
240                },
241            )
242            .await
243    }
244
245    async fn store<B: HttpBody>(
246        &self,
247        key: String,
248        parts: Parts,
249        body: B,
250        policy: CachePolicy,
251    ) -> Result<Response<Body<B>>> {
252        // Create a temporary file for the download of the body
253        let inner = self.0.clone();
254        let temp_dir = inner.temp_dir_path();
255        fs::create_dir_all(&temp_dir).with_context(|| {
256            format!(
257                "failed to create temporary directory `{path}`",
258                path = temp_dir.display()
259            )
260        })?;
261
262        // Create a new caching body from the upstream body
263        // The provided callback will be invoked once the cache file hsa been completed
264        let status = parts.status;
265        let version = parts.version;
266        let headers = parts.headers.clone();
267
268        let body = Body::from_caching_upstream(body, &temp_dir, move |digest, path| {
269            async move {
270                let content_path = inner.content_path(&digest);
271                fs::create_dir_all(content_path.parent().expect("should have parent"))
272                    .context("failed to create content directory")?;
273
274                // Atomically persist the temp file into the `content` location
275                path.persist(&content_path).with_context(|| {
276                    format!(
277                        "failed to persist downloaded body to content path `{path}`",
278                        path = content_path.display()
279                    )
280                })?;
281
282                // Update the response
283                inner
284                    .write_response(
285                        &key,
286                        CachedResponseRef {
287                            status,
288                            version,
289                            headers: &headers,
290                            digest: &digest,
291                            policy: &policy,
292                        },
293                    )
294                    .await?;
295
296                debug!(key, digest, "response body stored successfully");
297
298                Ok(())
299            }
300            .boxed()
301        })
302        .await?;
303
304        Ok(Response::from_parts(parts, body))
305    }
306
307    async fn delete(&self, key: &str) -> Result<()> {
308        // Acquire an exclusive lock on the response file
309        // By acquiring the lock, we truncate the file; any attempt to deserialize an
310        // empty response file will fail and be treated as not-present
311        self.0.lock_response_exclusive(key).await?;
312        Ok(())
313    }
314
315    fn body_path(&self, digest: &str) -> PathBuf {
316        self.0.content_path(digest)
317    }
318}
319
/// Represents the default cache storage implementation.
///
/// The wrapped path is the root storage directory; all response, content, and
/// temporary paths are derived from it.
struct DefaultCacheStorageInner(PathBuf);
322
323impl DefaultCacheStorageInner {
324    /// Calculates the path to a response file.
325    fn response_path(&self, key: &str) -> PathBuf {
326        let mut path = self.0.to_path_buf();
327        path.push(STORAGE_VERSION);
328        path.push(RESPONSE_DIRECTORY_NAME);
329        path.push(key);
330        path
331    }
332
333    /// Calculates the path to a content file.
334    fn content_path(&self, digest: &str) -> PathBuf {
335        let mut path = self.0.to_path_buf();
336        path.push(STORAGE_VERSION);
337        path.push(CONTENT_DIRECTORY_NAME);
338        path.push(digest);
339        path
340    }
341
342    /// Calculates the path to the temp directory.
343    fn temp_dir_path(&self) -> PathBuf {
344        let mut path = self.0.to_path_buf();
345        path.push(STORAGE_VERSION);
346        path.push(TEMP_DIRECTORY_NAME);
347        path
348    }
349
350    /// Reads a response from storage for the given key.
351    ///
352    /// This method will block if the response file is exclusively locked.
353    async fn read_response(&self, key: &str) -> Result<Option<CachedResponse>> {
354        // Acquire a shared lock on the response file
355        let mut response = match self.lock_response_shared(key).await? {
356            Some(file) => file,
357            None => return Ok(None),
358        };
359
360        // Decode the cached response
361        Ok(
362            bincode::serde::decode_from_std_read::<CachedResponse, _, _>(
363                &mut response,
364                bincode::config::standard(),
365            )
366            .inspect_err(|e| {
367                debug!(
368                    "failed to deserialize response file `{path}`: {e} (cache entry will be \
369                     ignored)",
370                    path = self.response_path(key).display()
371                );
372            })
373            .ok(),
374        )
375    }
376
377    /// Writes a response to storage for the given key.
378    ///
379    /// This method will block if the response file is locked.
380    async fn write_response(&self, key: &str, response: CachedResponseRef<'_>) -> Result<()> {
381        // Acquire a shared lock on the response file
382        let mut file = self.lock_response_exclusive(key).await?;
383
384        // Encode the response
385        bincode::serde::encode_into_std_write(response, &mut file, bincode::config::standard())
386            .with_context(|| format!("failed to serialize response data for cache key `{key}`"))
387            .map(|_| ())
388    }
389
390    /// Locks a response file for shared access.
391    ///
392    /// Returns `Ok(None)` if the file does not exist.
393    async fn lock_response_shared(&self, key: &str) -> Result<Option<File>> {
394        let path = self.response_path(key);
395        match fs::OpenOptions::new()
396            .read(true)
397            .open(&path)
398            .map(Some)
399            .or_else(|e| {
400                if e.kind() == io::ErrorKind::NotFound {
401                    Ok(None)
402                } else {
403                    Err(e)
404                }
405            })
406            .with_context(|| {
407                format!(
408                    "failed to open response file `{path}`",
409                    path = path.display()
410                )
411            })? {
412            Some(file) => {
413                match runtime::unwrap_task_output(
414                    runtime::spawn_blocking(move || {
415                        file.lock_shared()
416                            .context("failed to acquire shared lock on response file")?;
417                        Ok(file)
418                    })
419                    .await,
420                ) {
421                    Some(res) => res.map(Some),
422                    None => bail!("failed to wait for file lock"),
423                }
424            }
425            None => Ok(None),
426        }
427    }
428
429    /// Locks a response file for exclusive access.
430    ///
431    /// If the file does not exist, it is created.
432    ///
433    /// The file is intentionally truncated upon lock acquisition.
434    async fn lock_response_exclusive(&self, key: &str) -> Result<File> {
435        let path = self.response_path(key);
436        let dir = path.parent().expect("should have parent directory");
437        fs::create_dir_all(dir)
438            .with_context(|| format!("failed to create directory `{dir}`", dir = dir.display()))?;
439
440        let mut options = fs::OpenOptions::new();
441
442        // Note: we don't use the `truncate` option to truncate the file as we need the
443        // truncation to happen *after* the lock is acquired
444        options.create(true).write(true);
445
446        #[cfg(unix)]
447        {
448            // On Unix, make the mode 600
449            use std::os::unix::fs::OpenOptionsExt;
450            options.mode(0o600);
451        }
452
453        let file = options.open(&path).with_context(|| {
454            format!(
455                "failed to create response file `{path}`",
456                path = path.display()
457            )
458        })?;
459
460        let file = match runtime::unwrap_task_output(
461            runtime::spawn_blocking(move || {
462                file.lock()
463                    .context("failed to acquire exclusive lock on response file")?;
464                anyhow::Ok(file)
465            })
466            .await,
467        ) {
468            Some(res) => res?,
469            None => bail!("failed to wait for file lock"),
470        };
471
472        file.set_len(0).with_context(|| {
473            format!(
474                "failed to truncate response file `{path}`",
475                path = path.display()
476            )
477        })?;
478
479        Ok(file)
480    }
481}