http_cache_stream/storage/
default.rs1use std::fs;
4use std::io;
5use std::path::PathBuf;
6
7use anyhow::Context;
8use anyhow::Result;
9use http::HeaderMap;
10use http::Response;
11use http::StatusCode;
12use http::Version;
13use http::response::Parts;
14use http_cache_semantics::CachePolicy;
15use serde::Deserialize;
16use serde::Serialize;
17use tracing::debug;
18
19use super::StoredResponse;
20use crate::HttpBody;
21use crate::body::Body;
22use crate::lock::LockedFile;
23use crate::lock::OpenOptionsExt;
24use crate::runtime;
25use crate::storage::CacheStorage;
26
/// Version segment of the on-disk layout.
///
/// Every path produced by the storage is nested under `<root>/<STORAGE_VERSION>/`.
const STORAGE_VERSION: &str = "v1";

/// Directory (under the version directory) that holds one response metadata file per cache key.
const RESPONSE_DIRECTORY_NAME: &str = "responses";

/// Directory (under the version directory) that holds response bodies, one file per digest.
const CONTENT_DIRECTORY_NAME: &str = "content";

/// Directory (under the version directory) where bodies are written before being moved into the
/// content directory.
const TEMP_DIRECTORY_NAME: &str = "tmp";
35
36#[derive(Serialize)]
42struct CachedResponseRef<'a> {
43 #[serde(with = "http_serde::status_code")]
45 status: StatusCode,
46
47 #[serde(with = "http_serde::version")]
49 version: Version,
50
51 #[serde(with = "http_serde::header_map")]
53 headers: &'a HeaderMap,
54
55 digest: &'a str,
57
58 policy: &'a CachePolicy,
60}
61
62#[derive(Deserialize)]
66struct CachedResponse {
67 #[serde(with = "http_serde::status_code")]
69 status: StatusCode,
70
71 #[serde(with = "http_serde::version")]
73 version: Version,
74
75 #[serde(with = "http_serde::header_map")]
77 headers: HeaderMap,
78
79 digest: String,
81
82 policy: CachePolicy,
84}
85
/// File-system backed cache storage.
///
/// The wrapped path is the root directory; everything the storage reads or writes lives under
/// `<root>/<STORAGE_VERSION>/`, split into a responses directory (metadata keyed by cache key),
/// a content directory (bodies keyed by digest), and a temporary directory.
pub struct DefaultCacheStorage(PathBuf);

impl DefaultCacheStorage {
    /// Creates a storage rooted at `root_dir`.
    ///
    /// No file-system access happens here; directories are created on demand when entries are
    /// written.
    pub fn new(root_dir: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root_dir.into();
        Self(root)
    }
}
170
171impl DefaultCacheStorage {
172 fn response_path(&self, key: &str) -> PathBuf {
174 let mut path = self.0.to_path_buf();
175 path.push(STORAGE_VERSION);
176 path.push(RESPONSE_DIRECTORY_NAME);
177 path.push(key);
178 path
179 }
180
181 fn content_path(&self, digest: &str) -> PathBuf {
183 let mut path = self.0.to_path_buf();
184 path.push(STORAGE_VERSION);
185 path.push(CONTENT_DIRECTORY_NAME);
186 path.push(digest);
187 path
188 }
189
190 fn temp_dir_path(&self) -> PathBuf {
192 let mut path = self.0.to_path_buf();
193 path.push(STORAGE_VERSION);
194 path.push(TEMP_DIRECTORY_NAME);
195 path
196 }
197
198 async fn read_response(&self, key: &str) -> Result<Option<CachedResponse>> {
202 let mut response = match self.lock_response_shared(key).await? {
204 Some(file) => file,
205 None => return Ok(None),
206 };
207
208 Ok(
210 bincode::serde::decode_from_std_read::<CachedResponse, _, _>(
211 &mut *response,
212 bincode::config::standard(),
213 )
214 .inspect_err(|e| {
215 debug!(
216 "failed to deserialize response file `{path}`: {e} (cache entry will be \
217 ignored)",
218 path = self.response_path(key).display()
219 );
220 })
221 .ok(),
222 )
223 }
224
225 async fn write_response(&self, key: &str, response: CachedResponseRef<'_>) -> Result<()> {
229 let mut file: LockedFile = self.lock_response_exclusive(key).await?;
231
232 bincode::serde::encode_into_std_write(response, &mut *file, bincode::config::standard())
234 .with_context(|| format!("failed to serialize response data for cache key `{key}`"))
235 .map(|_| ())
236 }
237
238 async fn lock_response_shared(&self, key: &str) -> Result<Option<LockedFile>> {
242 let path = self.response_path(key);
243 fs::OpenOptions::new()
244 .read(true)
245 .open_shared(&path)
246 .await
247 .map(Some)
248 .or_else(|e| {
249 if e.kind() == io::ErrorKind::NotFound {
250 Ok(None)
251 } else {
252 Err(e)
253 }
254 })
255 .with_context(|| {
256 format!(
257 "failed to open response file `{path}` with a shared lock",
258 path = path.display()
259 )
260 })
261 }
262
263 async fn lock_response_exclusive(&self, key: &str) -> Result<LockedFile> {
269 let mut options = fs::OpenOptions::new();
270
271 options.create(true).write(true);
274
275 #[cfg(unix)]
276 {
277 use std::os::unix::fs::OpenOptionsExt;
279 options.mode(0o600);
280 }
281
282 let path = self.response_path(key);
283 let dir = path.parent().expect("should have parent directory");
284 fs::create_dir_all(dir)
285 .with_context(|| format!("failed to create directory `{dir}`", dir = dir.display()))?;
286
287 let file = options.open_exclusive(&path).await.with_context(|| {
288 format!(
289 "failed to create response file `{path}` with exclusive lock",
290 path = path.display()
291 )
292 })?;
293
294 file.set_len(0).with_context(|| {
295 format!(
296 "failed to truncate response file `{path}`",
297 path = path.display()
298 )
299 })?;
300 Ok(file)
301 }
302}
303
304impl CacheStorage for DefaultCacheStorage {
305 async fn get<B: HttpBody>(&self, key: &str) -> Result<Option<StoredResponse<B>>> {
306 let cached = match self.read_response(key).await? {
307 Some(response) => response,
308 None => return Ok(None),
309 };
310
311 let path = self.body_path(&cached.digest);
313 let body = match runtime::File::open(&path)
314 .await
315 .map(Some)
316 .or_else(|e| {
317 if e.kind() == io::ErrorKind::NotFound {
318 Ok(None)
319 } else {
320 Err(e)
321 }
322 })
323 .with_context(|| {
324 format!(
325 "failed to open response body `{path}`",
326 path = path.display()
327 )
328 })? {
329 Some(file) => file,
330 None => return Ok(None),
331 };
332
333 let mut builder = Response::builder()
335 .version(cached.version)
336 .status(cached.status);
337 let headers = builder.headers_mut().expect("should be valid");
338 headers.extend(cached.headers);
339
340 Ok(Some(StoredResponse {
341 response: builder
342 .body(Body::from_file(body).await.with_context(|| {
343 format!(
344 "failed to create response body for `{path}`",
345 path = path.display()
346 )
347 })?)
348 .expect("should be valid"),
349 policy: cached.policy,
350 digest: cached.digest,
351 }))
352 }
353
354 async fn put(
355 &self,
356 key: &str,
357 parts: &Parts,
358 policy: &CachePolicy,
359 digest: &str,
360 ) -> Result<()> {
361 self.write_response(
362 key,
363 CachedResponseRef {
364 status: parts.status,
365 version: parts.version,
366 headers: &parts.headers,
367 digest,
368 policy,
369 },
370 )
371 .await
372 }
373
374 async fn put_with_body<B: HttpBody>(
375 &self,
376 key: &str,
377 parts: &Parts,
378 policy: &CachePolicy,
379 body: B,
380 ) -> Result<(Body<B>, String)> {
381 let temp_dir = self.temp_dir_path();
383 fs::create_dir_all(&temp_dir).with_context(|| {
384 format!(
385 "failed to create temporary directory `{path}`",
386 path = temp_dir.display()
387 )
388 })?;
389 let temp_path = tempfile::NamedTempFile::new_in(temp_dir)
390 .context("failed to create temporary body file for cache storage")?
391 .into_temp_path();
392
393 let mut body_file = runtime::File::create(&*temp_path).await.with_context(|| {
395 format!(
396 "failed to create temporary body file `{path}`",
397 path = temp_path.display()
398 )
399 })?;
400 let digest = Body::from_upstream(body)
401 .write_to(&mut body_file)
402 .await
403 .with_context(|| {
404 format!(
405 "failed to write to temporary body file `{path}`",
406 path = temp_path.display()
407 )
408 })?;
409
410 drop(body_file);
412
413 let content_path = self.content_path(&digest);
414 fs::create_dir_all(content_path.parent().expect("should have parent"))
415 .context("failed to create content directory")?;
416
417 temp_path.persist(&content_path).with_context(|| {
419 format!(
420 "failed to persist downloaded body to content path `{path}`",
421 path = content_path.display()
422 )
423 })?;
424
425 self.write_response(
427 key,
428 CachedResponseRef {
429 status: parts.status,
430 version: parts.version,
431 headers: &parts.headers,
432 digest: &digest,
433 policy,
434 },
435 )
436 .await?;
437
438 Ok((
439 Body::from_file(runtime::File::open(&content_path).await.with_context(|| {
440 format!(
441 "failed to open body content file `{path}`",
442 path = content_path.display()
443 )
444 })?)
445 .await?,
446 digest,
447 ))
448 }
449
450 async fn delete(&self, key: &str) -> Result<()> {
451 self.lock_response_exclusive(key).await?;
455 Ok(())
456 }
457
458 fn body_path(&self, digest: &str) -> PathBuf {
459 self.content_path(digest)
460 }
461}