http_cache_stream/storage/default.rs
1//! Implementation of the default cache storage.
2
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use futures::FutureExt;
13use http::HeaderMap;
14use http::Response;
15use http::StatusCode;
16use http::Version;
17use http::response::Parts;
18use http_cache_semantics::CachePolicy;
19use serde::Deserialize;
20use serde::Serialize;
21use tracing::debug;
22
23use super::StoredResponse;
24use crate::HttpBody;
25use crate::body::Body;
26use crate::runtime;
27use crate::storage::CacheStorage;
28
/// The current directory layout version.
///
/// This is the name of the directory directly beneath the storage root; the
/// `responses`, `content`, and `tmp` directories are all nested inside it.
const STORAGE_VERSION: &str = "v1";
/// The name of the `responses` directory.
///
/// Holds one response file per cache key.
const RESPONSE_DIRECTORY_NAME: &str = "responses";
/// The name of the `content` directory.
///
/// Holds one response body file per content digest.
const CONTENT_DIRECTORY_NAME: &str = "content";
/// The name of the `tmp` directory.
///
/// Holds response bodies while they are still being written to the cache.
const TEMP_DIRECTORY_NAME: &str = "tmp";
37
/// Represents a reference to a cached response.
///
/// This type is serialized to the response file.
///
/// The headers, digest, and policy are borrowed rather than owned, so the
/// value only lives as long as the data it points at.
///
/// This definition must be kept in sync with `CachedResponse`: this type is
/// the write side and `CachedResponse` is the read side of the same file, so
/// the fields must agree in order, type, and serde encoding.
#[derive(Serialize)]
struct CachedResponseRef<'a> {
    /// The response's status.
    ///
    /// Encoded with `http_serde::status_code`.
    #[serde(with = "http_serde::status_code")]
    status: StatusCode,

    /// The response's version.
    ///
    /// Encoded with `http_serde::version`.
    #[serde(with = "http_serde::version")]
    version: Version,

    /// The response's headers.
    ///
    /// Encoded with `http_serde::header_map`.
    #[serde(with = "http_serde::header_map")]
    headers: &'a HeaderMap,

    /// The content digest of the response.
    ///
    /// This is also the file name of the response body in the `content`
    /// directory.
    digest: &'a str,

    /// The last used cached policy.
    policy: &'a CachePolicy,
}
63
/// Represents a cached response.
///
/// This type is deserialized from the response file.
///
/// It is the owned counterpart of `CachedResponseRef`; the two definitions
/// must agree in field order, type, and serde encoding so that what one writes
/// the other can read back.
#[derive(Deserialize)]
struct CachedResponse {
    /// The response's status.
    ///
    /// Decoded with `http_serde::status_code`.
    #[serde(with = "http_serde::status_code")]
    status: StatusCode,

    /// The response's version.
    ///
    /// Decoded with `http_serde::version`.
    #[serde(with = "http_serde::version")]
    version: Version,

    /// The response's headers.
    ///
    /// Decoded with `http_serde::header_map`.
    #[serde(with = "http_serde::header_map")]
    headers: HeaderMap,

    /// The content digest of the response.
    ///
    /// This is also the file name of the response body in the `content`
    /// directory.
    digest: String,

    /// The last used cached policy.
    policy: CachePolicy,
}
87
/// The default cache storage implementation.
///
/// ## Layout
///
/// This storage implementation uses the following directory structure:
///
/// ```text
/// <root>/
/// ├─ <storage-version>/
/// │  ├─ responses/
/// │  │  ├─ <key>
/// │  │  ├─ <key>
/// │  │  ├─ ...
/// │  ├─ content/
/// │  │  ├─ <digest>
/// │  │  ├─ <digest>
/// │  │  ├─ ...
/// │  ├─ tmp/
/// ```
///
/// Where `<root>` is the root storage directory, `<storage-version>` is a
/// constant that changes when the directory layout changes (currently `v1`),
/// `<key>` is supplied by the cache, and `<digest>` is the calculated digest of
/// a response body.
///
/// ## The `responses` directory
///
/// The `responses` directory contains a file for each cached response.
///
/// The file is a bincode-serialized `CachedResponse` that contains information
/// about the response, including the response body content digest.
///
/// ### Response file locking
///
/// Advisory file locks are obtained on a response file as the cache entries
/// are read and updated.
///
/// This is used to coordinate access to the storage via this library; it does
/// not protect against external modifications to the storage.
///
/// ## The `content` directory
///
/// The `content` directory contains a file for each cached response body.
///
/// The file name is the digest of the response body contents.
///
/// Currently the [`blake3`][blake3] hash algorithm is used for calculating
/// response body digests.
///
/// ## The `tmp` directory
///
/// The `tmp` directory is used for temporarily storing response bodies as they
/// are saved to the cache.
///
/// The content digest of the response is calculated as the response is written
/// into temporary storage.
///
/// Once the response body has been fully read, the temporary file is atomically
/// renamed to its content directory location; if the content already exists,
/// the temporary file is deleted.
///
/// ## Integrity
///
/// This storage implementation does not provide strong guarantees on the
/// integrity of the stored response bodies.
///
/// If the storage is externally modified, the modification will go undetected
/// and the modified response bodies will be served.
///
/// ## Fault tolerance
///
/// If an error occurs while updating a cache entry with
/// [`DefaultCacheStorage::put`], a future [`DefaultCacheStorage::get`] call
/// will treat the entry as not present.
///
/// ## Cloning
///
/// The storage is a handle around a reference-counted inner value, so clones
/// refer to the same root directory.
///
/// [blake3]: https://github.com/BLAKE3-team/BLAKE3
#[derive(Clone)]
pub struct DefaultCacheStorage(Arc<DefaultCacheStorageInner>);
166
167impl DefaultCacheStorage {
168 /// Constructs a new default cache storage with the given
169 pub fn new(root_dir: impl Into<PathBuf>) -> Self {
170 Self(Arc::new(DefaultCacheStorageInner(root_dir.into())))
171 }
172}
173
174impl CacheStorage for DefaultCacheStorage {
175 async fn get<B: HttpBody>(&self, key: &str) -> Result<Option<StoredResponse<B>>> {
176 let cached = match self.0.read_response(key).await? {
177 Some(response) => response,
178 None => return Ok(None),
179 };
180
181 // Open the response body
182 let path = self.body_path(&cached.digest);
183 let body = match runtime::File::open(&path)
184 .await
185 .map(Some)
186 .or_else(|e| {
187 if e.kind() == io::ErrorKind::NotFound {
188 Ok(None)
189 } else {
190 Err(e)
191 }
192 })
193 .with_context(|| {
194 format!(
195 "failed to open response body `{path}`",
196 path = path.display()
197 )
198 })? {
199 Some(file) => file,
200 None => return Ok(None),
201 };
202
203 // Build a response from the cached parts
204 let mut builder = Response::builder()
205 .version(cached.version)
206 .status(cached.status);
207 let headers = builder.headers_mut().expect("should be valid");
208 headers.extend(cached.headers);
209
210 Ok(Some(StoredResponse {
211 response: builder
212 .body(Body::from_file(body).await.with_context(|| {
213 format!(
214 "failed to create response body for `{path}`",
215 path = path.display()
216 )
217 })?)
218 .expect("should be valid"),
219 policy: cached.policy,
220 digest: cached.digest,
221 }))
222 }
223
224 async fn put(
225 &self,
226 key: &str,
227 parts: &Parts,
228 policy: &CachePolicy,
229 digest: &str,
230 ) -> Result<()> {
231 self.0
232 .write_response(
233 key,
234 CachedResponseRef {
235 status: parts.status,
236 version: parts.version,
237 headers: &parts.headers,
238 digest,
239 policy,
240 },
241 )
242 .await
243 }
244
245 async fn store<B: HttpBody>(
246 &self,
247 key: String,
248 parts: Parts,
249 body: B,
250 policy: CachePolicy,
251 ) -> Result<Response<Body<B>>> {
252 // Create a temporary file for the download of the body
253 let inner = self.0.clone();
254 let temp_dir = inner.temp_dir_path();
255 fs::create_dir_all(&temp_dir).with_context(|| {
256 format!(
257 "failed to create temporary directory `{path}`",
258 path = temp_dir.display()
259 )
260 })?;
261
262 // Create a new caching body from the upstream body
263 // The provided callback will be invoked once the cache file hsa been completed
264 let status = parts.status;
265 let version = parts.version;
266 let headers = parts.headers.clone();
267
268 let body = Body::from_caching_upstream(body, &temp_dir, move |digest, path| {
269 async move {
270 let content_path = inner.content_path(&digest);
271 fs::create_dir_all(content_path.parent().expect("should have parent"))
272 .context("failed to create content directory")?;
273
274 // Atomically persist the temp file into the `content` location
275 path.persist(&content_path).with_context(|| {
276 format!(
277 "failed to persist downloaded body to content path `{path}`",
278 path = content_path.display()
279 )
280 })?;
281
282 // Update the response
283 inner
284 .write_response(
285 &key,
286 CachedResponseRef {
287 status,
288 version,
289 headers: &headers,
290 digest: &digest,
291 policy: &policy,
292 },
293 )
294 .await?;
295
296 debug!(key, digest, "response body stored successfully");
297
298 Ok(())
299 }
300 .boxed()
301 })
302 .await?;
303
304 Ok(Response::from_parts(parts, body))
305 }
306
307 async fn delete(&self, key: &str) -> Result<()> {
308 // Acquire an exclusive lock on the response file
309 // By acquiring the lock, we truncate the file; any attempt to deserialize an
310 // empty response file will fail and be treated as not-present
311 self.0.lock_response_exclusive(key).await?;
312 Ok(())
313 }
314
315 fn body_path(&self, digest: &str) -> PathBuf {
316 self.0.content_path(digest)
317 }
318}
319
320/// Represents the default cache storage implementation.
321struct DefaultCacheStorageInner(PathBuf);
322
323impl DefaultCacheStorageInner {
324 /// Calculates the path to a response file.
325 fn response_path(&self, key: &str) -> PathBuf {
326 let mut path = self.0.to_path_buf();
327 path.push(STORAGE_VERSION);
328 path.push(RESPONSE_DIRECTORY_NAME);
329 path.push(key);
330 path
331 }
332
333 /// Calculates the path to a content file.
334 fn content_path(&self, digest: &str) -> PathBuf {
335 let mut path = self.0.to_path_buf();
336 path.push(STORAGE_VERSION);
337 path.push(CONTENT_DIRECTORY_NAME);
338 path.push(digest);
339 path
340 }
341
342 /// Calculates the path to the temp directory.
343 fn temp_dir_path(&self) -> PathBuf {
344 let mut path = self.0.to_path_buf();
345 path.push(STORAGE_VERSION);
346 path.push(TEMP_DIRECTORY_NAME);
347 path
348 }
349
350 /// Reads a response from storage for the given key.
351 ///
352 /// This method will block if the response file is exclusively locked.
353 async fn read_response(&self, key: &str) -> Result<Option<CachedResponse>> {
354 // Acquire a shared lock on the response file
355 let mut response = match self.lock_response_shared(key).await? {
356 Some(file) => file,
357 None => return Ok(None),
358 };
359
360 // Decode the cached response
361 Ok(
362 bincode::serde::decode_from_std_read::<CachedResponse, _, _>(
363 &mut response,
364 bincode::config::standard(),
365 )
366 .inspect_err(|e| {
367 debug!(
368 "failed to deserialize response file `{path}`: {e} (cache entry will be \
369 ignored)",
370 path = self.response_path(key).display()
371 );
372 })
373 .ok(),
374 )
375 }
376
377 /// Writes a response to storage for the given key.
378 ///
379 /// This method will block if the response file is locked.
380 async fn write_response(&self, key: &str, response: CachedResponseRef<'_>) -> Result<()> {
381 // Acquire a shared lock on the response file
382 let mut file = self.lock_response_exclusive(key).await?;
383
384 // Encode the response
385 bincode::serde::encode_into_std_write(response, &mut file, bincode::config::standard())
386 .with_context(|| format!("failed to serialize response data for cache key `{key}`"))
387 .map(|_| ())
388 }
389
390 /// Locks a response file for shared access.
391 ///
392 /// Returns `Ok(None)` if the file does not exist.
393 async fn lock_response_shared(&self, key: &str) -> Result<Option<File>> {
394 let path = self.response_path(key);
395 match fs::OpenOptions::new()
396 .read(true)
397 .open(&path)
398 .map(Some)
399 .or_else(|e| {
400 if e.kind() == io::ErrorKind::NotFound {
401 Ok(None)
402 } else {
403 Err(e)
404 }
405 })
406 .with_context(|| {
407 format!(
408 "failed to open response file `{path}`",
409 path = path.display()
410 )
411 })? {
412 Some(file) => {
413 match runtime::unwrap_task_output(
414 runtime::spawn_blocking(move || {
415 file.lock_shared()
416 .context("failed to acquire shared lock on response file")?;
417 Ok(file)
418 })
419 .await,
420 ) {
421 Some(res) => res.map(Some),
422 None => bail!("failed to wait for file lock"),
423 }
424 }
425 None => Ok(None),
426 }
427 }
428
429 /// Locks a response file for exclusive access.
430 ///
431 /// If the file does not exist, it is created.
432 ///
433 /// The file is intentionally truncated upon lock acquisition.
434 async fn lock_response_exclusive(&self, key: &str) -> Result<File> {
435 let path = self.response_path(key);
436 let dir = path.parent().expect("should have parent directory");
437 fs::create_dir_all(dir)
438 .with_context(|| format!("failed to create directory `{dir}`", dir = dir.display()))?;
439
440 let mut options = fs::OpenOptions::new();
441
442 // Note: we don't use the `truncate` option to truncate the file as we need the
443 // truncation to happen *after* the lock is acquired
444 options.create(true).write(true);
445
446 #[cfg(unix)]
447 {
448 // On Unix, make the mode 600
449 use std::os::unix::fs::OpenOptionsExt;
450 options.mode(0o600);
451 }
452
453 let file = options.open(&path).with_context(|| {
454 format!(
455 "failed to create response file `{path}`",
456 path = path.display()
457 )
458 })?;
459
460 let file = match runtime::unwrap_task_output(
461 runtime::spawn_blocking(move || {
462 file.lock()
463 .context("failed to acquire exclusive lock on response file")?;
464 anyhow::Ok(file)
465 })
466 .await,
467 ) {
468 Some(res) => res?,
469 None => bail!("failed to wait for file lock"),
470 };
471
472 file.set_len(0).with_context(|| {
473 format!(
474 "failed to truncate response file `{path}`",
475 path = path.display()
476 )
477 })?;
478
479 Ok(file)
480 }
481}