http_cache_stream/storage/
default.rs1use std::fs;
4use std::fs::File;
5use std::io;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use futures::FutureExt;
13use http::HeaderMap;
14use http::Response;
15use http::StatusCode;
16use http::Version;
17use http::response::Parts;
18use http_body::Body;
19use http_cache_semantics::CachePolicy;
20use serde::Deserialize;
21use serde::Serialize;
22use tracing::debug;
23
24use super::StoredResponse;
25use crate::body::CacheBody;
26use crate::runtime;
27use crate::storage::CacheStorage;
28
/// Version segment placed directly below the storage root.
///
/// Every path built by this storage (response files, content files and the
/// temporary directory) lives under `<root>/v1/`.
const STORAGE_VERSION: &str = "v1";

/// Directory (below the version directory) holding one serialized
/// response-metadata file per cache key.
const RESPONSE_DIRECTORY_NAME: &str = "responses";

/// Directory (below the version directory) holding one body file per
/// content digest.
const CONTENT_DIRECTORY_NAME: &str = "content";

/// Directory (below the version directory) where bodies are written before
/// being persisted into the content directory.
const TEMP_DIRECTORY_NAME: &str = "tmp";
37
38#[derive(Serialize)]
44struct CachedResponseRef<'a> {
45 #[serde(with = "http_serde::status_code")]
47 status: StatusCode,
48
49 #[serde(with = "http_serde::version")]
51 version: Version,
52
53 #[serde(with = "http_serde::header_map")]
55 headers: &'a HeaderMap,
56
57 digest: &'a str,
59
60 policy: &'a CachePolicy,
62}
63
64#[derive(Deserialize)]
68struct CachedResponse {
69 #[serde(with = "http_serde::status_code")]
71 status: StatusCode,
72
73 #[serde(with = "http_serde::version")]
75 version: Version,
76
77 #[serde(with = "http_serde::header_map")]
79 headers: HeaderMap,
80
81 digest: String,
83
84 policy: CachePolicy,
86}
87
88#[derive(Clone)]
165pub struct DefaultCacheStorage(Arc<DefaultCacheStorageInner>);
166
167impl DefaultCacheStorage {
168 pub fn new(root_dir: impl Into<PathBuf>) -> Self {
170 Self(Arc::new(DefaultCacheStorageInner(root_dir.into())))
171 }
172}
173
174impl CacheStorage for DefaultCacheStorage {
175 async fn get<B: Body + Send>(&self, key: &str) -> Result<Option<StoredResponse<B>>> {
176 let cached = match self.0.read_response(key).await? {
177 Some(response) => response,
178 None => return Ok(None),
179 };
180
181 let path = self.body_path(&cached.digest);
183 let body = match runtime::File::open(&path)
184 .await
185 .map(Some)
186 .or_else(|e| {
187 if e.kind() == io::ErrorKind::NotFound {
188 Ok(None)
189 } else {
190 Err(e)
191 }
192 })
193 .with_context(|| {
194 format!(
195 "failed to open response body `{path}`",
196 path = path.display()
197 )
198 })? {
199 Some(file) => file,
200 None => return Ok(None),
201 };
202
203 let mut builder = Response::builder()
205 .version(cached.version)
206 .status(cached.status);
207 let headers = builder.headers_mut().expect("should be valid");
208 headers.extend(cached.headers);
209
210 Ok(Some(StoredResponse {
211 response: builder
212 .body(CacheBody::from_file(body).await.with_context(|| {
213 format!(
214 "failed to create response body for `{path}`",
215 path = path.display()
216 )
217 })?)
218 .expect("should be valid"),
219 policy: cached.policy,
220 digest: cached.digest,
221 }))
222 }
223
224 async fn put(
225 &self,
226 key: &str,
227 parts: &Parts,
228 policy: &CachePolicy,
229 digest: &str,
230 ) -> Result<()> {
231 self.0
232 .write_response(
233 key,
234 CachedResponseRef {
235 status: parts.status,
236 version: parts.version,
237 headers: &parts.headers,
238 digest,
239 policy,
240 },
241 )
242 .await
243 }
244
245 async fn store<B: Body + Send>(
246 &self,
247 key: String,
248 parts: Parts,
249 body: B,
250 policy: CachePolicy,
251 ) -> Result<Response<CacheBody<B>>> {
252 let inner = self.0.clone();
254 let temp_dir = inner.temp_dir_path();
255 fs::create_dir_all(&temp_dir).with_context(|| {
256 format!(
257 "failed to create temporary directory `{path}`",
258 path = temp_dir.display()
259 )
260 })?;
261
262 let status = parts.status;
265 let version = parts.version;
266 let headers = parts.headers.clone();
267
268 let body = CacheBody::from_caching_upstream(body, &temp_dir, move |digest, path| {
269 async move {
270 let content_path = inner.content_path(&digest);
271 fs::create_dir_all(content_path.parent().expect("should have parent"))
272 .context("failed to create content directory")?;
273
274 path.persist(&content_path).with_context(|| {
276 format!(
277 "failed to persist downloaded body to content path `{path}`",
278 path = content_path.display()
279 )
280 })?;
281
282 inner
284 .write_response(
285 &key,
286 CachedResponseRef {
287 status,
288 version,
289 headers: &headers,
290 digest: &digest,
291 policy: &policy,
292 },
293 )
294 .await?;
295
296 debug!(key, digest, "response body stored successfully");
297
298 Ok(())
299 }
300 .boxed()
301 })
302 .await?;
303
304 Ok(Response::from_parts(parts, body))
305 }
306
307 async fn delete(&self, key: &str) -> Result<()> {
308 self.0.lock_response_exclusive(key).await?;
312 Ok(())
313 }
314
315 fn body_path(&self, digest: &str) -> PathBuf {
316 self.0.content_path(digest)
317 }
318}
319
320struct DefaultCacheStorageInner(PathBuf);
322
323impl DefaultCacheStorageInner {
324 fn response_path(&self, key: &str) -> PathBuf {
326 let mut path = self.0.to_path_buf();
327 path.push(STORAGE_VERSION);
328 path.push(RESPONSE_DIRECTORY_NAME);
329 path.push(key);
330 path
331 }
332
333 fn content_path(&self, digest: &str) -> PathBuf {
335 let mut path = self.0.to_path_buf();
336 path.push(STORAGE_VERSION);
337 path.push(CONTENT_DIRECTORY_NAME);
338 path.push(digest);
339 path
340 }
341
342 fn temp_dir_path(&self) -> PathBuf {
344 let mut path = self.0.to_path_buf();
345 path.push(STORAGE_VERSION);
346 path.push(TEMP_DIRECTORY_NAME);
347 path
348 }
349
350 async fn read_response(&self, key: &str) -> Result<Option<CachedResponse>> {
354 let mut response = match self.lock_response_shared(key).await? {
356 Some(file) => file,
357 None => return Ok(None),
358 };
359
360 Ok(bincode::deserialize_from(&mut response)
362 .inspect_err(|e| {
363 debug!(
364 "failed to deserialize response file `{path}`: {e} (cache entry will be \
365 ignored)",
366 path = self.response_path(key).display()
367 );
368 })
369 .ok())
370 }
371
372 async fn write_response(&self, key: &str, response: CachedResponseRef<'_>) -> Result<()> {
376 let mut file = self.lock_response_exclusive(key).await?;
378
379 bincode::serialize_into(&mut file, &response)
381 .with_context(|| format!("failed to serialize response data for cache key `{key}`"))
382 .map(|_| ())
383 }
384
385 async fn lock_response_shared(&self, key: &str) -> Result<Option<File>> {
389 let path = self.response_path(key);
390 match fs::OpenOptions::new()
391 .read(true)
392 .open(&path)
393 .map(Some)
394 .or_else(|e| {
395 if e.kind() == io::ErrorKind::NotFound {
396 Ok(None)
397 } else {
398 Err(e)
399 }
400 })
401 .with_context(|| {
402 format!(
403 "failed to open response file `{path}`",
404 path = path.display()
405 )
406 })? {
407 Some(file) => {
408 match runtime::unwrap_task_output(
409 runtime::spawn_blocking(move || {
410 file.lock_shared()
411 .context("failed to acquire shared lock on response file")?;
412 Ok(file)
413 })
414 .await,
415 ) {
416 Some(res) => res.map(Some),
417 None => bail!("failed to wait for file lock"),
418 }
419 }
420 None => Ok(None),
421 }
422 }
423
424 async fn lock_response_exclusive(&self, key: &str) -> Result<File> {
430 let path = self.response_path(key);
431 let dir = path.parent().expect("should have parent directory");
432 fs::create_dir_all(dir)
433 .with_context(|| format!("failed to create directory `{dir}`", dir = dir.display()))?;
434
435 let mut options = fs::OpenOptions::new();
436
437 options.create(true).write(true);
440
441 #[cfg(unix)]
442 {
443 use std::os::unix::fs::OpenOptionsExt;
445 options.mode(0o600);
446 }
447
448 let file = options.open(&path).with_context(|| {
449 format!(
450 "failed to create response file `{path}`",
451 path = path.display()
452 )
453 })?;
454
455 let file = match runtime::unwrap_task_output(
456 runtime::spawn_blocking(move || {
457 file.lock()
458 .context("failed to acquire exclusive lock on response file")?;
459 anyhow::Ok(file)
460 })
461 .await,
462 ) {
463 Some(res) => res?,
464 None => bail!("failed to wait for file lock"),
465 };
466
467 file.set_len(0).with_context(|| {
468 format!(
469 "failed to truncate response file `{path}`",
470 path = path.display()
471 )
472 })?;
473
474 Ok(file)
475 }
476}
477
#[cfg(all(test, feature = "tokio"))]
mod test {
    use futures::StreamExt;
    use http::Request;
    use http_body_util::BodyDataStream;
    use http_cache_semantics::CachePolicy;
    use tempfile::tempdir;

    use super::*;

    /// Looking up a key that was never stored yields `None`, not an error.
    #[tokio::test]
    async fn cache_miss() {
        let root = tempdir().unwrap();
        let storage = DefaultCacheStorage::new(root.path());

        let found = storage
            .get::<String>("does-not-exist")
            .await
            .expect("should not fail");
        assert!(found.is_none());
    }

    /// Walks an entry through its lifecycle:
    /// 1. store a body and read it back;
    /// 2. replace the metadata with `put` and read it back again;
    /// 3. delete the entry.
    #[tokio::test]
    async fn cache_hit() {
        const KEY: &str = "key";
        const BODY: &str = "hello world";
        // The digest the storage is expected to report for `BODY`.
        const DIGEST: &str = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
        const HEADER_NAME: &str = "foo";
        const HEADER_VALUE: &str = "bar";

        let root = tempdir().unwrap();
        let storage = DefaultCacheStorage::new(root.path());

        // Nothing is cached yet.
        assert!(storage.get::<String>(KEY).await.unwrap().is_none());

        // Store a response that lacks the custom header.
        let request = Request::builder().body("").unwrap();
        let original = Response::builder().body(BODY.to_string()).unwrap();
        let policy: CachePolicy = CachePolicy::new(&request, &original);
        let (parts, upstream) = original.into_parts();
        let stored = storage
            .store(KEY.to_string(), parts, upstream, policy)
            .await
            .unwrap();

        // Read the body to its end (a single frame), then drop the stream.
        let mut frames = BodyDataStream::new(stored.into_body());
        let frame = frames.next().await.unwrap().unwrap();
        assert!(frames.next().await.is_none());
        assert_eq!(frame, BODY);
        drop(frames);

        // The entry is now served from storage, still without the header.
        let hit = storage.get::<String>(KEY).await.unwrap().unwrap();
        assert!(hit.response.headers().get(HEADER_NAME).is_none());
        let frame = BodyDataStream::new(hit.response.into_body())
            .next()
            .await
            .unwrap()
            .unwrap();
        assert_eq!(frame, BODY);
        assert_eq!(hit.digest, DIGEST);

        // Replace only the metadata: add the header, keep the same body digest.
        let updated = Response::builder()
            .header(HEADER_NAME, HEADER_VALUE)
            .body(BODY.to_string())
            .unwrap();
        let policy = CachePolicy::new(&request, &updated);
        let (parts, _) = updated.into_parts();
        storage.put(KEY, &parts, &policy, DIGEST).await.unwrap();

        let hit = storage.get::<String>(KEY).await.unwrap().unwrap();
        let header = hit
            .response
            .headers()
            .get(HEADER_NAME)
            .map(|v| v.to_str().unwrap());
        assert_eq!(header, Some(HEADER_VALUE));
        let frame = BodyDataStream::new(hit.response.into_body())
            .next()
            .await
            .unwrap()
            .unwrap();
        assert_eq!(frame, BODY);
        assert_eq!(hit.digest, DIGEST);

        // After deletion the key is a miss again.
        storage.delete(KEY).await.unwrap();
        assert!(storage.get::<String>(KEY).await.unwrap().is_none());
    }
}