http_cache_stream/storage/
default.rs1use std::fs;
4use std::fs::File;
5use std::io;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use futures::FutureExt;
13use http::HeaderMap;
14use http::Response;
15use http::StatusCode;
16use http::Version;
17use http::response::Parts;
18use http_body::Body;
19use http_cache_semantics::CachePolicy;
20use serde::Deserialize;
21use serde::Serialize;
22use tracing::debug;
23
24use super::StoredResponse;
25use crate::body::CacheBody;
26use crate::runtime;
27use crate::storage::CacheStorage;
28
29const STORAGE_VERSION: &str = "v1";
31const RESPONSE_DIRECTORY_NAME: &str = "responses";
33const CONTENT_DIRECTORY_NAME: &str = "content";
35const TEMP_DIRECTORY_NAME: &str = "tmp";
37
38#[derive(Serialize)]
44struct CachedResponseRef<'a> {
45 #[serde(with = "http_serde::status_code")]
47 status: StatusCode,
48
49 #[serde(with = "http_serde::version")]
51 version: Version,
52
53 #[serde(with = "http_serde::header_map")]
55 headers: &'a HeaderMap,
56
57 digest: &'a str,
59
60 policy: &'a CachePolicy,
62}
63
64#[derive(Deserialize)]
68struct CachedResponse {
69 #[serde(with = "http_serde::status_code")]
71 status: StatusCode,
72
73 #[serde(with = "http_serde::version")]
75 version: Version,
76
77 #[serde(with = "http_serde::header_map")]
79 headers: HeaderMap,
80
81 digest: String,
83
84 policy: CachePolicy,
86}
87
88#[derive(Clone)]
165pub struct DefaultCacheStorage(Arc<DefaultCacheStorageInner>);
166
167impl DefaultCacheStorage {
168 pub fn new(root_dir: impl Into<PathBuf>) -> Self {
170 Self(Arc::new(DefaultCacheStorageInner(root_dir.into())))
171 }
172}
173
174impl CacheStorage for DefaultCacheStorage {
175 async fn get<B: Body + Send>(&self, key: &str) -> Result<Option<StoredResponse<B>>> {
176 let cached = match self.0.read_response(key).await? {
177 Some(response) => response,
178 None => return Ok(None),
179 };
180
181 let path = self.body_path(&cached.digest);
183 let body = match runtime::File::open(&path)
184 .await
185 .map(Some)
186 .or_else(|e| {
187 if e.kind() == io::ErrorKind::NotFound {
188 Ok(None)
189 } else {
190 Err(e)
191 }
192 })
193 .with_context(|| {
194 format!(
195 "failed to open response body `{path}`",
196 path = path.display()
197 )
198 })? {
199 Some(file) => file,
200 None => return Ok(None),
201 };
202
203 let mut builder = Response::builder()
205 .version(cached.version)
206 .status(cached.status);
207 let headers = builder.headers_mut().expect("should be valid");
208 headers.extend(cached.headers);
209
210 Ok(Some(StoredResponse {
211 response: builder
212 .body(CacheBody::from_file(body).await.with_context(|| {
213 format!(
214 "failed to create response body for `{path}`",
215 path = path.display()
216 )
217 })?)
218 .expect("should be valid"),
219 policy: cached.policy,
220 digest: cached.digest,
221 }))
222 }
223
224 async fn put(
225 &self,
226 key: &str,
227 parts: &Parts,
228 policy: &CachePolicy,
229 digest: &str,
230 ) -> Result<()> {
231 self.0
232 .write_response(
233 key,
234 CachedResponseRef {
235 status: parts.status,
236 version: parts.version,
237 headers: &parts.headers,
238 digest,
239 policy,
240 },
241 )
242 .await
243 }
244
245 async fn store<B: Body + Send>(
246 &self,
247 key: String,
248 parts: Parts,
249 body: B,
250 policy: CachePolicy,
251 ) -> Result<Response<CacheBody<B>>> {
252 let inner = self.0.clone();
254 let temp_dir = inner.temp_dir_path();
255 fs::create_dir_all(&temp_dir).with_context(|| {
256 format!(
257 "failed to create temporary directory `{path}`",
258 path = temp_dir.display()
259 )
260 })?;
261
262 let status = parts.status;
265 let version = parts.version;
266 let headers = parts.headers.clone();
267
268 let body = CacheBody::from_caching_upstream(body, &temp_dir, move |digest, path| {
269 async move {
270 let content_path = inner.content_path(&digest);
271 fs::create_dir_all(content_path.parent().expect("should have parent"))
272 .context("failed to create content directory")?;
273
274 path.persist(&content_path).with_context(|| {
276 format!(
277 "failed to persist downloaded body to content path `{path}`",
278 path = content_path.display()
279 )
280 })?;
281
282 inner
284 .write_response(
285 &key,
286 CachedResponseRef {
287 status,
288 version,
289 headers: &headers,
290 digest: &digest,
291 policy: &policy,
292 },
293 )
294 .await?;
295
296 debug!(key, digest, "response body stored successfully");
297
298 Ok(())
299 }
300 .boxed()
301 })
302 .await?;
303
304 Ok(Response::from_parts(parts, body))
305 }
306
307 async fn delete(&self, key: &str) -> Result<()> {
308 self.0.lock_response_exclusive(key).await?;
312 Ok(())
313 }
314
315 fn body_path(&self, digest: &str) -> PathBuf {
316 self.0.content_path(digest)
317 }
318}
319
320struct DefaultCacheStorageInner(PathBuf);
322
323impl DefaultCacheStorageInner {
324 fn response_path(&self, key: &str) -> PathBuf {
326 let mut path = self.0.to_path_buf();
327 path.push(STORAGE_VERSION);
328 path.push(RESPONSE_DIRECTORY_NAME);
329 path.push(key);
330 path
331 }
332
333 fn content_path(&self, digest: &str) -> PathBuf {
335 let mut path = self.0.to_path_buf();
336 path.push(STORAGE_VERSION);
337 path.push(CONTENT_DIRECTORY_NAME);
338 path.push(digest);
339 path
340 }
341
342 fn temp_dir_path(&self) -> PathBuf {
344 let mut path = self.0.to_path_buf();
345 path.push(STORAGE_VERSION);
346 path.push(TEMP_DIRECTORY_NAME);
347 path
348 }
349
350 async fn read_response(&self, key: &str) -> Result<Option<CachedResponse>> {
354 let mut response = match self.lock_response_shared(key).await? {
356 Some(file) => file,
357 None => return Ok(None),
358 };
359
360 Ok(
362 bincode::serde::decode_from_std_read::<CachedResponse, _, _>(
363 &mut response,
364 bincode::config::standard(),
365 )
366 .inspect_err(|e| {
367 debug!(
368 "failed to deserialize response file `{path}`: {e} (cache entry will be \
369 ignored)",
370 path = self.response_path(key).display()
371 );
372 })
373 .ok(),
374 )
375 }
376
377 async fn write_response(&self, key: &str, response: CachedResponseRef<'_>) -> Result<()> {
381 let mut file = self.lock_response_exclusive(key).await?;
383
384 bincode::serde::encode_into_std_write(response, &mut file, bincode::config::standard())
386 .with_context(|| format!("failed to serialize response data for cache key `{key}`"))
387 .map(|_| ())
388 }
389
390 async fn lock_response_shared(&self, key: &str) -> Result<Option<File>> {
394 let path = self.response_path(key);
395 match fs::OpenOptions::new()
396 .read(true)
397 .open(&path)
398 .map(Some)
399 .or_else(|e| {
400 if e.kind() == io::ErrorKind::NotFound {
401 Ok(None)
402 } else {
403 Err(e)
404 }
405 })
406 .with_context(|| {
407 format!(
408 "failed to open response file `{path}`",
409 path = path.display()
410 )
411 })? {
412 Some(file) => {
413 match runtime::unwrap_task_output(
414 runtime::spawn_blocking(move || {
415 file.lock_shared()
416 .context("failed to acquire shared lock on response file")?;
417 Ok(file)
418 })
419 .await,
420 ) {
421 Some(res) => res.map(Some),
422 None => bail!("failed to wait for file lock"),
423 }
424 }
425 None => Ok(None),
426 }
427 }
428
429 async fn lock_response_exclusive(&self, key: &str) -> Result<File> {
435 let path = self.response_path(key);
436 let dir = path.parent().expect("should have parent directory");
437 fs::create_dir_all(dir)
438 .with_context(|| format!("failed to create directory `{dir}`", dir = dir.display()))?;
439
440 let mut options = fs::OpenOptions::new();
441
442 options.create(true).write(true);
445
446 #[cfg(unix)]
447 {
448 use std::os::unix::fs::OpenOptionsExt;
450 options.mode(0o600);
451 }
452
453 let file = options.open(&path).with_context(|| {
454 format!(
455 "failed to create response file `{path}`",
456 path = path.display()
457 )
458 })?;
459
460 let file = match runtime::unwrap_task_output(
461 runtime::spawn_blocking(move || {
462 file.lock()
463 .context("failed to acquire exclusive lock on response file")?;
464 anyhow::Ok(file)
465 })
466 .await,
467 ) {
468 Some(res) => res?,
469 None => bail!("failed to wait for file lock"),
470 };
471
472 file.set_len(0).with_context(|| {
473 format!(
474 "failed to truncate response file `{path}`",
475 path = path.display()
476 )
477 })?;
478
479 Ok(file)
480 }
481}
482
483#[cfg(all(test, feature = "tokio"))]
484mod test {
485 use futures::StreamExt;
486 use http::Request;
487 use http_body_util::BodyDataStream;
488 use http_cache_semantics::CachePolicy;
489 use tempfile::tempdir;
490
491 use super::*;
492
493 #[tokio::test]
494 async fn cache_miss() {
495 let dir = tempdir().unwrap();
496 let storage = DefaultCacheStorage::new(dir.path());
497 assert!(
498 storage
499 .get::<String>("does-not-exist")
500 .await
501 .expect("should not fail")
502 .is_none()
503 );
504 }
505
506 #[tokio::test]
507 async fn cache_hit() {
508 const KEY: &str = "key";
509 const BODY: &str = "hello world";
510 const DIGEST: &str = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
511 const HEADER_NAME: &str = "foo";
512 const HEADER_VALUE: &str = "bar";
513
514 let dir = tempdir().unwrap();
515 let storage = DefaultCacheStorage::new(dir.path());
516
517 assert!(storage.get::<String>(KEY).await.unwrap().is_none());
519
520 let request = Request::builder().body("").unwrap();
522 let response = Response::builder().body(BODY.to_string()).unwrap();
523 let policy: CachePolicy = CachePolicy::new(&request, &response);
524
525 let (parts, body) = response.into_parts();
526 let response = storage
527 .store(KEY.to_string(), parts, body, policy)
528 .await
529 .unwrap();
530
531 let mut stream = BodyDataStream::new(response.into_body());
533 let data = stream.next().await.unwrap().unwrap();
534 assert!(stream.next().await.is_none());
535 assert_eq!(data, BODY);
536 drop(stream);
537
538 let cached = storage.get::<String>(KEY).await.unwrap().unwrap();
540 assert!(cached.response.headers().get(HEADER_NAME).is_none());
541
542 let data = BodyDataStream::new(cached.response.into_body())
544 .next()
545 .await
546 .unwrap()
547 .unwrap();
548 assert_eq!(data, BODY);
549 assert_eq!(cached.digest, DIGEST);
550
551 let response = Response::builder()
553 .header(HEADER_NAME, HEADER_VALUE)
554 .body(BODY.to_string())
555 .unwrap();
556 let policy = CachePolicy::new(&request, &response);
557
558 let (parts, _) = response.into_parts();
559 storage.put(KEY, &parts, &policy, DIGEST).await.unwrap();
560
561 let cached = storage.get::<String>(KEY).await.unwrap().unwrap();
563 assert_eq!(
564 cached
565 .response
566 .headers()
567 .get(HEADER_NAME)
568 .map(|v| v.to_str().unwrap()),
569 Some(HEADER_VALUE)
570 );
571
572 let data = BodyDataStream::new(cached.response.into_body())
574 .next()
575 .await
576 .unwrap()
577 .unwrap();
578 assert_eq!(data, BODY);
579 assert_eq!(cached.digest, DIGEST);
580
581 storage.delete(KEY).await.unwrap();
583 assert!(storage.get::<String>(KEY).await.unwrap().is_none());
584 }
585}