forceps/cache.rs
1mod builder;
2pub use builder::CacheBuilder;
3
4use crate::{ForcepError, MetaDb, Metadata, Result, mem_cache::MemCache};
5use bytes::Bytes;
6use sled::Db;
7use std::io;
8use std::path;
9use std::result;
10use tokio::fs as afs;
11
12/// Creates a writeable and persistent temporary file in the path provided, returning the path and
13/// file handle.
14async fn tempfile(dir: &path::Path) -> Result<(afs::File, path::PathBuf)> {
15 let tmppath = crate::tmp::tmppath_in(dir);
16 let tmp = afs::OpenOptions::new()
17 .write(true)
18 .create(true)
19 .truncate(true)
20 .open(&tmppath)
21 .await
22 .map_err(ForcepError::Io)?;
23 Ok((tmp, tmppath))
24}
25
/// Resolved configuration that a [`Cache`] instance is created from.
#[derive(Clone, Debug)]
struct Options {
    /// Base directory of the cache; entry files and the metadata index live beneath it.
    path: path::PathBuf,
    /// Number of two-character directory levels placed in front of each entry file.
    dir_depth: u8,
    /// Whether reads record access information in the metadata database.
    track_access: bool,

    /// Maximum size of the in-memory LRU, in bytes.
    lru_size: usize,

    /// Capacity of the buffered reader used when reading entry files.
    rbuff_sz: usize,
    /// Capacity of the buffered writer used when writing entry files.
    wbuff_sz: usize,
}
39
/// The main component of `forceps`, which acts as the API for interacting with the on-disk cache.
///
/// This structure includes the async `read`, `write`, and `remove` operations, which are the
/// basic operations of the cache. It also includes some miscellaneous functions to interact with
/// metadata and evict items from the cache.
///
/// # Eviction
///
/// This cache can evict items with a number of different eviction algorithms. For details, see
/// [`evict_with`] and the [`evictors`] module.
///
/// # Memory Cache
///
/// An in-memory cache can be optionally enabled as a layer over the regular on-disk cache. The
/// memcache provides fast `HIT`s for recently used entries, circumventing filesystem operations
/// altogether. To enable, use the [`CacheBuilder`]`::memory_lru_max_size` method.
///
/// # Examples
///
/// ```rust
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// use forceps::Cache;
///
/// let cache = Cache::new("./cache")
///     .build()
///     .await
///     .unwrap();
/// # }
/// ```
///
/// [`evict_with`]: #method.evict_with
/// [`evictors`]: crate::evictors
/// [`CacheBuilder`]: crate::CacheBuilder
#[derive(Debug)]
pub struct Cache {
    // metadata database; opened at `<base>/index` when the cache is created
    meta: MetaDb,
    // in-memory layer consulted by `read` and filled by `read` and `write`
    mem: MemCache,
    // configuration this instance was created with
    opts: Options,
}
80
81impl Cache {
    /// Creates a new [`CacheBuilder`], which can be used to customize and create a [`Cache`]
    /// instance. This function is an alias for [`CacheBuilder::new`].
    ///
    /// The `path` supplied is the base directory of the cache instance.
    ///
    /// [`CacheBuilder`]: crate::CacheBuilder
    /// [`CacheBuilder::new`]: crate::CacheBuilder::new
    ///
    /// # Examples
    ///
    /// ```rust
    /// use forceps::Cache;
    ///
    /// let builder = Cache::new("./cache");
    /// // Use other methods for configuration
    /// ```
    #[inline]
    // `new` deliberately hands back the builder instead of `Self`, hence the lint exemption.
    #[allow(clippy::new_ret_no_self)]
    pub fn new<P: AsRef<path::Path>>(path: P) -> CacheBuilder {
        CacheBuilder::new(path)
    }
103
104 /// Creates a new Cache instance based on the CacheBuilder
105 async fn create(opts: Options) -> Result<Self> {
106 // create the base directory for the cache
107 afs::create_dir_all(&opts.path)
108 .await
109 .map_err(ForcepError::Io)?;
110
111 let mut meta_path = opts.path.clone();
112 meta_path.push("index");
113 Ok(Self {
114 meta: MetaDb::new(&meta_path)?,
115 mem: MemCache::new(opts.lru_size),
116 opts,
117 })
118 }
119
    /// Gets a reference to the underlying meta database.
    ///
    /// The returned value is the `sled` [`Db`] handle exposed by the internal metadata store; it
    /// is borrowed for as long as this [`Cache`] is borrowed.
    pub fn get_meta_db_ref(&self) -> &Db {
        self.meta.get_db_ref()
    }
124
125 /// Creates a PathBuf based on the key provided
126 fn path_from_key(&self, key: &[u8]) -> path::PathBuf {
127 let hex = hex::encode(key);
128 let mut buf = self.opts.path.clone();
129
130 // push segments of key as paths to the PathBuf. If the hex isn't long enough, then push
131 // "__" instead.
132 for n in (0..self.opts.dir_depth).map(|x| x as usize * 2) {
133 let n_end = n + 2;
134 buf.push(if n_end >= hex.len() {
135 "__"
136 } else {
137 &hex[n..n_end]
138 })
139 }
140 buf.push(&hex);
141 buf
142 }
143
144 /// Tracks the access for a cache entry if the option is enabled
145 #[inline]
146 fn track_access_for(&self, k: &[u8], meta: Metadata) -> Result<()> {
147 if self.opts.track_access {
148 self.meta.track_access_for(k, Some(meta))?;
149 }
150 Ok(())
151 }
152
153 /// Reads an entry from the database, returning a vector of bytes that represent the entry.
154 ///
155 /// # Not Found
156 ///
157 /// If the entry is not found, then it will return
158 /// `Err(`[`ForcepError::NotFound`]`)`.
159 ///
160 /// # Metadata
161 ///
162 /// This function will *not* perform a metadata read or write **unless** the `track_access`
163 /// build option is set. If the option is set, then it will perform a blocking read/write to
164 /// write new values to track the last access time and the total hits.
165 ///
166 /// # Examples
167 ///
168 /// ```rust
169 /// # #[tokio::main(flavor = "current_thread")]
170 /// # async fn main() {
171 /// use forceps::Cache;
172 ///
173 /// let cache = Cache::new("./cache")
174 /// .build()
175 /// .await
176 /// .unwrap();
177 /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
178 ///
179 /// let value = cache.read(b"MY_KEY").await.unwrap();
180 /// assert_eq!(value.as_ref(), b"Hello World");
181 /// # }
182 /// ```
183 pub async fn read<K: AsRef<[u8]>>(&self, key: K) -> Result<Bytes> {
184 use tokio::io::AsyncReadExt;
185 let k = key.as_ref();
186
187 // read the metadata to reduce miss cost, since the metadata DB should generally fit in
188 // memory (and also removes the need to read file metadata for a hit.)
189 let meta = self.meta.get_metadata(k)?;
190
191 // look in the memory cache to see if it's there and return if it is
192 if let Some(val) = self.mem.get(k) {
193 return self.track_access_for(k, meta).map(|_| val);
194 }
195
196 let file = {
197 let path = self.path_from_key(k);
198 afs::OpenOptions::new()
199 .read(true)
200 .open(&path)
201 .await
202 .map_err(|e| match e.kind() {
203 io::ErrorKind::NotFound => ForcepError::NotFound,
204 _ => ForcepError::Io(e),
205 })?
206 };
207
208 // create a new buffer based on the estimated size of the file
209 let mut buf = Vec::with_capacity(meta.get_size() as _);
210
211 // read the entire file to the buffer
212 tokio::io::BufReader::with_capacity(self.opts.rbuff_sz, file)
213 .read_to_end(&mut buf)
214 .await
215 .map_err(ForcepError::Io)?;
216
217 self.track_access_for(k, meta)?;
218 let bytes = Bytes::from(buf);
219 self.mem.put(k, Bytes::clone(&bytes));
220 Ok(bytes)
221 }
222
223 /// Writes an entry with the specified key to the cache database. This will replace the
224 /// previous entry if it exists, otherwise it will store a completely new one.
225 ///
226 /// # Examples
227 ///
228 /// ```rust
229 /// # #[tokio::main(flavor = "current_thread")]
230 /// # async fn main() {
231 /// use forceps::Cache;
232 ///
233 /// let cache = Cache::new("./cache")
234 /// .build()
235 /// .await
236 /// .unwrap();
237 ///
238 /// cache.write(b"MY_KEY", b"Hello World").await.unwrap();
239 /// # }
240 /// ```
241 pub async fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
242 &self,
243 key: K,
244 value: V,
245 ) -> Result<Metadata> {
246 use tokio::io::AsyncWriteExt;
247 let key = key.as_ref();
248 let value = value.as_ref();
249
250 let (tmp, tmp_path) = tempfile(&self.opts.path).await?;
251 // write all data to a temporary file to allow for atomic replacement and simultaneous reads.
252 {
253 let mut writer = tokio::io::BufWriter::with_capacity(self.opts.wbuff_sz, tmp);
254 writer.write_all(value).await.map_err(ForcepError::Io)?;
255 writer.flush().await.map_err(ForcepError::Io)?;
256 }
257
258 // move the temporary file to the final destination
259 let final_path = self.path_from_key(key);
260 if let Some(parent) = final_path.parent() {
261 afs::create_dir_all(parent).await.map_err(ForcepError::Io)?;
262 }
263 afs::rename(&tmp_path, &final_path)
264 .await
265 .map_err(ForcepError::Io)?;
266
267 if !self.mem.is_nil() {
268 self.mem.put(key, Bytes::from(Vec::from(value)));
269 }
270 self.meta.insert_metadata_for(key, value)
271 }
272
273 /// Removes an entry from the cache, returning its [`Metadata`].
274 ///
275 /// This will remove the entry from both the main cache database and the metadata database.
276 /// Please note that this will return `Error::NotFound` if either the main database *or* the
277 /// meta database didn't find the entry.
278 ///
279 /// # Examples
280 ///
281 /// ```rust
282 /// # #[tokio::main(flavor = "current_thread")]
283 /// # async fn main() {
284 /// use forceps::Cache;
285 ///
286 /// let cache = Cache::new("./cache")
287 /// .build()
288 /// .await
289 /// .unwrap();
290 ///
291 /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
292 /// let metadata = cache.remove(b"MY_KEY").await.unwrap();
293 /// assert_eq!(metadata.get_size(), b"Hello World".len() as u64);
294 /// # }
295 /// ```
296 pub async fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Metadata> {
297 let key = key.as_ref();
298
299 let cur_path = self.path_from_key(key);
300 let tmp_path = crate::tmp::tmppath_in(&self.opts.path);
301
302 // move then delete the file
303 //
304 // the purpose of moving then deleting is that file moves are much faster than file
305 // deletes. if we were to delete in place, and another thread starts reading, it could
306 // spell bad news.
307 afs::rename(&cur_path, &tmp_path)
308 .await
309 .map_err(|e| match e.kind() {
310 io::ErrorKind::NotFound => ForcepError::NotFound,
311 _ => ForcepError::Io(e),
312 })?;
313 afs::remove_file(&tmp_path).await.map_err(ForcepError::Io)?;
314
315 // remove the metadata for the entry
316 self.meta.remove_metadata_for(key)
317 }
318
    /// Queries the index database for metadata on the entry with the corresponding key.
    ///
    /// This will return the metadata for the associated key. For information about what metadata
    /// is stored, look at [`Metadata`].
    ///
    /// # Non-Async
    ///
    /// Note that this function is not an async call. This is because the backend database used,
    /// `sled`, is not async-compatible. These calls are nevertheless expected to be very fast.
    ///
    /// # Not Found
    ///
    /// If the entry is not found, then it will return
    /// `Err(`[`Error::NotFound`](ForcepError::NotFound)`)`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// use forceps::Cache;
    ///
    /// let cache = Cache::new("./cache")
    ///     .build()
    ///     .await
    ///     .unwrap();
    ///
    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
    /// let meta = cache.read_metadata(b"MY_KEY").unwrap();
    /// assert_eq!(meta.get_size(), b"Hello World".len() as u64);
    /// # }
    /// ```
    #[inline]
    pub fn read_metadata<K: AsRef<[u8]>>(&self, key: K) -> Result<Metadata> {
        // delegates straight to the metadata database; the entry file itself is never opened here
        self.meta.get_metadata(key.as_ref())
    }
355
    /// An iterator over the entire metadata database, which provides metadata for every entry.
    ///
    /// This iterator provides every key in the database and the associated metadata for that key.
    /// This is *not* an iterator over the actual values of the database. Each item is a `Result`,
    /// so individual records can fail independently while iterating.
    ///
    /// # Non-Async
    ///
    /// Note that this function is not an async call. This is because the backend database used,
    /// `sled`, is not async-compatible. These calls are nevertheless expected to be very fast.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// use forceps::Cache;
    ///
    /// let cache = Cache::new("./cache")
    ///     .build()
    ///     .await
    ///     .unwrap();
    ///
    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
    /// for result in cache.metadata_iter() {
    ///     let (key, meta) = result.unwrap();
    ///     println!("{}", String::from_utf8_lossy(&key))
    /// }
    /// # }
    /// ```
    #[inline]
    pub fn metadata_iter(&self) -> impl Iterator<Item = Result<(Vec<u8>, Metadata)>> {
        self.meta.metadata_iter()
    }
389
    /// Runs the specified eviction algorithm over this cache instance.
    ///
    /// Eviction algorithms will remove items out of the cache until a certain condition has been
    /// met, usually a size requirement. See the [`evictors`] module for more information and
    /// examples.
    ///
    /// The value returned is whatever the evictor's `evict` call produces: a `u64` on success,
    /// or the evictor's own error type on failure.
    ///
    /// [`evictors`]: crate::evictors
    #[inline]
    pub async fn evict_with<E>(&self, evictor: E) -> result::Result<u64, E::Err>
    where
        E: crate::evictors::Evictor,
    {
        // the evictor drives the whole process; the cache only lends itself out
        evictor.evict(self).await
    }
404}
405
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheBuilder;

    /// Key shared by the round-trip tests below.
    const KEY: &[u8] = b"CACHE_KEY";
    /// Payload stored under [`KEY`].
    const VALUE: &[u8] = b"Hello World";

    /// Builds a cache from the builder's default configuration.
    async fn default_cache() -> Cache {
        CacheBuilder::default().build().await.unwrap()
    }

    /// Keys shorter than the directory depth must not panic while building a path.
    #[tokio::test]
    async fn short_path() {
        let cache = default_cache().await;
        let keys: [&[u8]; 3] = [&[0xAA], &[0xAA, 0xBB], &[0xAA, 0xBB, 0xCC]];
        for key in keys.iter() {
            cache.path_from_key(key);
        }
    }

    /// A value can be written, read back unchanged, and removed again.
    #[tokio::test]
    async fn write_read_remove() {
        let cache = default_cache().await;

        cache.write(KEY, VALUE).await.unwrap();
        let stored = cache.read(KEY).await.unwrap();
        assert_eq!(stored.as_ref(), VALUE);
        cache.remove(KEY).await.unwrap();
    }

    /// With access tracking enabled, every read is counted as a hit.
    #[tokio::test]
    async fn tracking_test() {
        let cache = CacheBuilder::default()
            .track_access(true)
            .build()
            .await
            .unwrap();

        cache.write(KEY, VALUE).await.unwrap();
        for _ in 0..100 {
            cache.read(KEY).await.unwrap();
        }

        let meta = cache.read_metadata(KEY).unwrap();
        assert_eq!(meta.get_hits(), 100);
    }

    /// Metadata recorded on write reflects the size of the stored value.
    #[tokio::test]
    async fn read_metadata() {
        let cache = default_cache().await;

        cache.write(KEY, VALUE).await.unwrap();
        let meta = cache.read_metadata(KEY).unwrap();
        assert_eq!(meta.get_size(), VALUE.len() as u64);
    }
}
456}