// http_cache_stream/storage/default.rs

//! Implementation of the default cache storage.
2
3use std::fs;
4use std::io;
5use std::path::PathBuf;
6
7use anyhow::Context;
8use anyhow::Result;
9use http::HeaderMap;
10use http::Response;
11use http::StatusCode;
12use http::Version;
13use http::response::Parts;
14use http_cache_semantics::CachePolicy;
15use serde::Deserialize;
16use serde::Serialize;
17use tracing::debug;
18
19use super::StoredResponse;
20use crate::HttpBody;
21use crate::body::Body;
22use crate::lock::LockedFile;
23use crate::lock::OpenOptionsExt;
24use crate::runtime;
25use crate::storage::CacheStorage;
26
/// Version segment of the on-disk layout (`<root>/<storage-version>`); a new
/// value is used whenever the directory layout changes.
const STORAGE_VERSION: &str = "v1";
/// Directory, under the versioned root, that holds one file per cached response.
const RESPONSE_DIRECTORY_NAME: &str = "responses";
/// Directory, under the versioned root, that holds response bodies named by digest.
const CONTENT_DIRECTORY_NAME: &str = "content";
/// Directory, under the versioned root, used to stage bodies while they are written.
const TEMP_DIRECTORY_NAME: &str = "tmp";
35
36/// Represents a reference to a cached response.
37///
38/// This type is serialized to the response file.
39///
40/// This definition must be kept in sync with `CachedResponse`.
41#[derive(Serialize)]
42struct CachedResponseRef<'a> {
43    /// The response's status.
44    #[serde(with = "http_serde::status_code")]
45    status: StatusCode,
46
47    /// The response's version.
48    #[serde(with = "http_serde::version")]
49    version: Version,
50
51    /// The response's headers.
52    #[serde(with = "http_serde::header_map")]
53    headers: &'a HeaderMap,
54
55    /// The content digest of the response.
56    digest: &'a str,
57
58    /// The last used cached policy.
59    policy: &'a CachePolicy,
60}
61
62/// Represents a cached response.
63///
64/// This type is deserialized from the response file.
65#[derive(Deserialize)]
66struct CachedResponse {
67    /// The response's status.
68    #[serde(with = "http_serde::status_code")]
69    status: StatusCode,
70
71    /// The response's version.
72    #[serde(with = "http_serde::version")]
73    version: Version,
74
75    /// The response's headers.
76    #[serde(with = "http_serde::header_map")]
77    headers: HeaderMap,
78
79    /// The content digest of the response.
80    digest: String,
81
82    /// The last used cached policy.
83    policy: CachePolicy,
84}
85
/// The default cache storage implementation.
///
/// ## Layout
///
/// This storage implementation uses the following directory structure:
///
/// ```text
/// <root>/
/// ├─ <storage-version>/
/// │  ├─ responses/
/// │  │  ├─ <key>
/// │  │  ├─ <key>
/// │  │  ├─ ...
/// │  ├─ content/
/// │  │  ├─ <digest>
/// │  │  ├─ <digest>
/// │  │  ├─ ...
/// │  ├─ tmp/
/// ```
///
/// The components are:
///
/// - `<root>` is the root storage directory.
/// - `<storage-version>` is a constant that changes when the directory layout
///   changes (currently `v1`).
/// - `<key>` is supplied by the cache.
/// - `<digest>` is the calculated digest of a response body.
///
/// ## The `responses` directory
///
/// The `responses` directory contains a file for each cached response.
///
/// The file is a bincode-serialized `CachedResponse` that contains information
/// about the response, including the response body content digest.
///
/// ### Response file locking
///
/// Advisory file locks are obtained on a response file as the cache entries
/// are read and updated: a shared lock while reading and an exclusive lock
/// while writing.
///
/// This is used to coordinate access to the storage via this library; it does
/// not protect against external modifications to the storage.
///
/// ## The `content` directory
///
/// The `content` directory contains a file for each cached response body.
///
/// The file name is the digest of the response body contents.
///
/// Currently the [`blake3`][blake3] hash algorithm is used for calculating
/// response body digests.
///
/// ## The `tmp` directory
///
/// The `tmp` directory is used for temporarily storing response bodies as they
/// are saved to the cache.
///
/// The content digest of the response is calculated as the response is written
/// into temporary storage.
///
/// Once the response body has been fully read, the temporary file is atomically
/// renamed to its content directory location. Content files are named by
/// digest, so any existing file at that location should already hold the same
/// bytes; it is atomically replaced.
///
/// ## Integrity
///
/// This storage implementation does not provide strong guarantees on the
/// integrity of the stored response bodies.
///
/// If the storage is externally modified, the modification will go undetected
/// and the modified response bodies will be served.
///
/// ## Fault tolerance
///
/// If an error occurs while updating a cache entry with
/// [`DefaultCacheStorage::put`], a future [`DefaultCacheStorage::get`] call
/// will treat the entry as not present.
///
/// [`DefaultCacheStorage::delete`] truncates the response file rather than
/// removing it. An empty response file fails to decode, so it is likewise
/// treated as not present.
///
/// [blake3]: https://github.com/BLAKE3-team/BLAKE3
#[derive(Debug, Clone)]
pub struct DefaultCacheStorage(PathBuf);
163
164impl DefaultCacheStorage {
165    /// Constructs a new default cache storage with the given
166    pub fn new(root_dir: impl Into<PathBuf>) -> Self {
167        Self(root_dir.into())
168    }
169}
170
171impl DefaultCacheStorage {
172    /// Calculates the path to a response file.
173    fn response_path(&self, key: &str) -> PathBuf {
174        let mut path = self.0.to_path_buf();
175        path.push(STORAGE_VERSION);
176        path.push(RESPONSE_DIRECTORY_NAME);
177        path.push(key);
178        path
179    }
180
181    /// Calculates the path to a content file.
182    fn content_path(&self, digest: &str) -> PathBuf {
183        let mut path = self.0.to_path_buf();
184        path.push(STORAGE_VERSION);
185        path.push(CONTENT_DIRECTORY_NAME);
186        path.push(digest);
187        path
188    }
189
190    /// Calculates the path to the temp directory.
191    fn temp_dir_path(&self) -> PathBuf {
192        let mut path = self.0.to_path_buf();
193        path.push(STORAGE_VERSION);
194        path.push(TEMP_DIRECTORY_NAME);
195        path
196    }
197
198    /// Reads a response from storage for the given key.
199    ///
200    /// This method will block if the response file is exclusively locked.
201    async fn read_response(&self, key: &str) -> Result<Option<CachedResponse>> {
202        // Acquire a shared lock on the response file
203        let mut response = match self.lock_response_shared(key).await? {
204            Some(file) => file,
205            None => return Ok(None),
206        };
207
208        // Decode the cached response
209        Ok(
210            bincode::serde::decode_from_std_read::<CachedResponse, _, _>(
211                &mut *response,
212                bincode::config::standard(),
213            )
214            .inspect_err(|e| {
215                debug!(
216                    "failed to deserialize response file `{path}`: {e} (cache entry will be \
217                     ignored)",
218                    path = self.response_path(key).display()
219                );
220            })
221            .ok(),
222        )
223    }
224
225    /// Writes a response to storage for the given key.
226    ///
227    /// This method will block if the response file is locked.
228    async fn write_response(&self, key: &str, response: CachedResponseRef<'_>) -> Result<()> {
229        // Acquire a shared lock on the response file
230        let mut file: LockedFile = self.lock_response_exclusive(key).await?;
231
232        // Encode the response
233        bincode::serde::encode_into_std_write(response, &mut *file, bincode::config::standard())
234            .with_context(|| format!("failed to serialize response data for cache key `{key}`"))
235            .map(|_| ())
236    }
237
238    /// Locks a response file for shared access.
239    ///
240    /// Returns `Ok(None)` if the file does not exist.
241    async fn lock_response_shared(&self, key: &str) -> Result<Option<LockedFile>> {
242        let path = self.response_path(key);
243        fs::OpenOptions::new()
244            .read(true)
245            .open_shared(&path)
246            .await
247            .map(Some)
248            .or_else(|e| {
249                if e.kind() == io::ErrorKind::NotFound {
250                    Ok(None)
251                } else {
252                    Err(e)
253                }
254            })
255            .with_context(|| {
256                format!(
257                    "failed to open response file `{path}` with a shared lock",
258                    path = path.display()
259                )
260            })
261    }
262
263    /// Locks a response file for exclusive access.
264    ///
265    /// If the file does not exist, it is created.
266    ///
267    /// The file is intentionally truncated upon lock acquisition.
268    async fn lock_response_exclusive(&self, key: &str) -> Result<LockedFile> {
269        let mut options = fs::OpenOptions::new();
270
271        // Note: we don't use the `truncate` option to truncate the file as we need the
272        // truncation to happen *after* the lock is acquired
273        options.create(true).write(true);
274
275        #[cfg(unix)]
276        {
277            // On Unix, make the mode 600
278            use std::os::unix::fs::OpenOptionsExt;
279            options.mode(0o600);
280        }
281
282        let path = self.response_path(key);
283        let dir = path.parent().expect("should have parent directory");
284        fs::create_dir_all(dir)
285            .with_context(|| format!("failed to create directory `{dir}`", dir = dir.display()))?;
286
287        let file = options.open_exclusive(&path).await.with_context(|| {
288            format!(
289                "failed to create response file `{path}` with exclusive lock",
290                path = path.display()
291            )
292        })?;
293
294        file.set_len(0).with_context(|| {
295            format!(
296                "failed to truncate response file `{path}`",
297                path = path.display()
298            )
299        })?;
300        Ok(file)
301    }
302}
303
304impl CacheStorage for DefaultCacheStorage {
305    async fn get<B: HttpBody>(&self, key: &str) -> Result<Option<StoredResponse<B>>> {
306        let cached = match self.read_response(key).await? {
307            Some(response) => response,
308            None => return Ok(None),
309        };
310
311        // Open the response body
312        let path = self.body_path(&cached.digest);
313        let body = match runtime::File::open(&path)
314            .await
315            .map(Some)
316            .or_else(|e| {
317                if e.kind() == io::ErrorKind::NotFound {
318                    Ok(None)
319                } else {
320                    Err(e)
321                }
322            })
323            .with_context(|| {
324                format!(
325                    "failed to open response body `{path}`",
326                    path = path.display()
327                )
328            })? {
329            Some(file) => file,
330            None => return Ok(None),
331        };
332
333        // Build a response from the cached parts
334        let mut builder = Response::builder()
335            .version(cached.version)
336            .status(cached.status);
337        let headers = builder.headers_mut().expect("should be valid");
338        headers.extend(cached.headers);
339
340        Ok(Some(StoredResponse {
341            response: builder
342                .body(Body::from_file(body).await.with_context(|| {
343                    format!(
344                        "failed to create response body for `{path}`",
345                        path = path.display()
346                    )
347                })?)
348                .expect("should be valid"),
349            policy: cached.policy,
350            digest: cached.digest,
351        }))
352    }
353
354    async fn put(
355        &self,
356        key: &str,
357        parts: &Parts,
358        policy: &CachePolicy,
359        digest: &str,
360    ) -> Result<()> {
361        self.write_response(
362            key,
363            CachedResponseRef {
364                status: parts.status,
365                version: parts.version,
366                headers: &parts.headers,
367                digest,
368                policy,
369            },
370        )
371        .await
372    }
373
374    async fn put_with_body<B: HttpBody>(
375        &self,
376        key: &str,
377        parts: &Parts,
378        policy: &CachePolicy,
379        body: B,
380    ) -> Result<(Body<B>, String)> {
381        // Create a temporary file for the download of the body
382        let temp_dir = self.temp_dir_path();
383        fs::create_dir_all(&temp_dir).with_context(|| {
384            format!(
385                "failed to create temporary directory `{path}`",
386                path = temp_dir.display()
387            )
388        })?;
389        let temp_path = tempfile::NamedTempFile::new_in(temp_dir)
390            .context("failed to create temporary body file for cache storage")?
391            .into_temp_path();
392
393        // Write the HTTP body to the file
394        let mut body_file = runtime::File::create(&*temp_path).await.with_context(|| {
395            format!(
396                "failed to create temporary body file `{path}`",
397                path = temp_path.display()
398            )
399        })?;
400        let digest = Body::from_upstream(body)
401            .write_to(&mut body_file)
402            .await
403            .with_context(|| {
404                format!(
405                    "failed to write to temporary body file `{path}`",
406                    path = temp_path.display()
407                )
408            })?;
409
410        // Drop the body file as we're about to rename it
411        drop(body_file);
412
413        let content_path = self.content_path(&digest);
414        fs::create_dir_all(content_path.parent().expect("should have parent"))
415            .context("failed to create content directory")?;
416
417        // Atomically persist the temp file into the `content` location
418        temp_path.persist(&content_path).with_context(|| {
419            format!(
420                "failed to persist downloaded body to content path `{path}`",
421                path = content_path.display()
422            )
423        })?;
424
425        // Update the response
426        self.write_response(
427            key,
428            CachedResponseRef {
429                status: parts.status,
430                version: parts.version,
431                headers: &parts.headers,
432                digest: &digest,
433                policy,
434            },
435        )
436        .await?;
437
438        Ok((
439            Body::from_file(runtime::File::open(&content_path).await.with_context(|| {
440                format!(
441                    "failed to open body content file `{path}`",
442                    path = content_path.display()
443                )
444            })?)
445            .await?,
446            digest,
447        ))
448    }
449
450    async fn delete(&self, key: &str) -> Result<()> {
451        // Acquire an exclusive lock on the response file
452        // By acquiring the lock, we truncate the file; any attempt to deserialize an
453        // empty response file will fail and be treated as not-present
454        self.lock_response_exclusive(key).await?;
455        Ok(())
456    }
457
458    fn body_path(&self, digest: &str) -> PathBuf {
459        self.content_path(digest)
460    }
461}