forceps/
cache.rs

1mod builder;
2pub use builder::CacheBuilder;
3
4use crate::{ForcepError, MetaDb, Metadata, Result, mem_cache::MemCache};
5use bytes::Bytes;
6use sled::Db;
7use std::io;
8use std::path;
9use std::result;
10use tokio::fs as afs;
11
12/// Creates a writeable and persistent temporary file in the path provided, returning the path and
13/// file handle.
14async fn tempfile(dir: &path::Path) -> Result<(afs::File, path::PathBuf)> {
15    let tmppath = crate::tmp::tmppath_in(dir);
16    let tmp = afs::OpenOptions::new()
17        .write(true)
18        .create(true)
19        .truncate(true)
20        .open(&tmppath)
21        .await
22        .map_err(ForcepError::Io)?;
23    Ok((tmp, tmppath))
24}
25
/// Configuration a [`Cache`] is created with and consults for the rest of its life.
#[derive(Debug, Clone)]
struct Options {
    /// Base directory of the cache; entry files, temporary files and the `index` metadata
    /// database are all placed underneath it.
    path: path::PathBuf,
    /// Number of two-hex-character directory levels inserted between `path` and an entry's file.
    dir_depth: u8,
    /// When `true`, reads record the access in the metadata database.
    track_access: bool,

    /// Maximum size of the in-memory LRU, in bytes.
    lru_size: usize,

    /// Capacity of the buffered reader used when loading an entry from disk.
    rbuff_sz: usize,
    /// Capacity of the buffered writer used when storing an entry to disk.
    wbuff_sz: usize,
}
39
40/// The main component of `forceps`, and  acts as the API for interacting with the on-disk cache.
41///
42/// This structure includes the async `read`, `write`, and `remove` operations which are the basic
43/// operations of the cache. It also includes some misc functions to interact with metadata and
44/// evict items from the cache.
45///
46/// # Eviction
47///
48/// This cache can evict items with a number of different eviction algorithms. To see more, see
49/// [`evict_with`] and the [`evictors`] module.
50///
51/// # Memory Cache
52///
53/// An in-memory cache can be optionally enabled as a layer over the regular on-disk cache. The
54/// memcache provides fast `HIT`s for recently used entries, circumventing filesystem operations
55/// altogether. To enable, use the [`CacheBuilder`]`::memory_lru_max_size` method.
56///
57/// # Examples
58///
59/// ```rust
60/// # #[tokio::main(flavor = "current_thread")]
61/// # async fn main() {
62/// use forceps::Cache;
63///
64/// let cache = Cache::new("./cache")
65///     .build()
66///     .await
67///     .unwrap();
68/// # }
69/// ```
70///
71/// [`evict_with`]: #method.evict_with
72/// [`evictors`]: crate::evictors
73/// [`CacheBuilder`]: crate::CacheBuilder
74#[derive(Debug)]
75pub struct Cache {
76    meta: MetaDb,
77    mem: MemCache,
78    opts: Options,
79}
80
81impl Cache {
82    /// Creates a new [`CacheBuilder`], which can be used to customize and create a [`Cache`]
83    /// instance. This function is an alias for [`CacheBuilder::new`].
84    ///
85    /// The `path` supplied is the base directory of the cache instance.
86    ///
87    /// [`CacheBuilder`]: crate::CacheBuilder
88    /// [`CacheBuilder::new`]: crate::CacheBuilder::new
89    ///
90    /// # Examples
91    ///
92    /// ```rust
93    /// use forceps::Cache;
94    ///
95    /// let builder = Cache::new("./cache");
96    /// // Use other methods for configuration
97    /// ```
98    #[inline]
99    #[allow(clippy::new_ret_no_self)]
100    pub fn new<P: AsRef<path::Path>>(path: P) -> CacheBuilder {
101        CacheBuilder::new(path)
102    }
103
104    /// Creates a new Cache instance based on the CacheBuilder
105    async fn create(opts: Options) -> Result<Self> {
106        // create the base directory for the cache
107        afs::create_dir_all(&opts.path)
108            .await
109            .map_err(ForcepError::Io)?;
110
111        let mut meta_path = opts.path.clone();
112        meta_path.push("index");
113        Ok(Self {
114            meta: MetaDb::new(&meta_path)?,
115            mem: MemCache::new(opts.lru_size),
116            opts,
117        })
118    }
119
120    /// Gets a reference to the underlying meta database.
121    pub fn get_meta_db_ref(&self) -> &Db {
122        self.meta.get_db_ref()
123    }
124
125    /// Creates a PathBuf based on the key provided
126    fn path_from_key(&self, key: &[u8]) -> path::PathBuf {
127        let hex = hex::encode(key);
128        let mut buf = self.opts.path.clone();
129
130        // push segments of key as paths to the PathBuf. If the hex isn't long enough, then push
131        // "__" instead.
132        for n in (0..self.opts.dir_depth).map(|x| x as usize * 2) {
133            let n_end = n + 2;
134            buf.push(if n_end >= hex.len() {
135                "__"
136            } else {
137                &hex[n..n_end]
138            })
139        }
140        buf.push(&hex);
141        buf
142    }
143
144    /// Tracks the access for a cache entry if the option is enabled
145    #[inline]
146    fn track_access_for(&self, k: &[u8], meta: Metadata) -> Result<()> {
147        if self.opts.track_access {
148            self.meta.track_access_for(k, Some(meta))?;
149        }
150        Ok(())
151    }
152
153    /// Reads an entry from the database, returning a vector of bytes that represent the entry.
154    ///
155    /// # Not Found
156    ///
157    /// If the entry is not found, then it will return
158    /// `Err(`[`ForcepError::NotFound`]`)`.
159    ///
160    /// # Metadata
161    ///
162    /// This function will *not* perform a metadata read or write **unless** the `track_access`
163    /// build option is set. If the option is set, then it will perform a blocking read/write to
164    /// write new values to track the last access time and the total hits.
165    ///
166    /// # Examples
167    ///
168    /// ```rust
169    /// # #[tokio::main(flavor = "current_thread")]
170    /// # async fn main() {
171    /// use forceps::Cache;
172    ///
173    /// let cache = Cache::new("./cache")
174    ///     .build()
175    ///     .await
176    ///     .unwrap();
177    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
178    ///
179    /// let value = cache.read(b"MY_KEY").await.unwrap();
180    /// assert_eq!(value.as_ref(), b"Hello World");
181    /// # }
182    /// ```
183    pub async fn read<K: AsRef<[u8]>>(&self, key: K) -> Result<Bytes> {
184        use tokio::io::AsyncReadExt;
185        let k = key.as_ref();
186
187        // read the metadata to reduce miss cost, since the metadata DB should generally fit in
188        // memory (and also removes the need to read file metadata for a hit.)
189        let meta = self.meta.get_metadata(k)?;
190
191        // look in the memory cache to see if it's there and return if it is
192        if let Some(val) = self.mem.get(k) {
193            return self.track_access_for(k, meta).map(|_| val);
194        }
195
196        let file = {
197            let path = self.path_from_key(k);
198            afs::OpenOptions::new()
199                .read(true)
200                .open(&path)
201                .await
202                .map_err(|e| match e.kind() {
203                    io::ErrorKind::NotFound => ForcepError::NotFound,
204                    _ => ForcepError::Io(e),
205                })?
206        };
207
208        // create a new buffer based on the estimated size of the file
209        let mut buf = Vec::with_capacity(meta.get_size() as _);
210
211        // read the entire file to the buffer
212        tokio::io::BufReader::with_capacity(self.opts.rbuff_sz, file)
213            .read_to_end(&mut buf)
214            .await
215            .map_err(ForcepError::Io)?;
216
217        self.track_access_for(k, meta)?;
218        let bytes = Bytes::from(buf);
219        self.mem.put(k, Bytes::clone(&bytes));
220        Ok(bytes)
221    }
222
223    /// Writes an entry with the specified key to the cache database. This will replace the
224    /// previous entry if it exists, otherwise it will store a completely new one.
225    ///
226    /// # Examples
227    ///
228    /// ```rust
229    /// # #[tokio::main(flavor = "current_thread")]
230    /// # async fn main() {
231    /// use forceps::Cache;
232    ///
233    /// let cache = Cache::new("./cache")
234    ///     .build()
235    ///     .await
236    ///     .unwrap();
237    ///
238    /// cache.write(b"MY_KEY", b"Hello World").await.unwrap();
239    /// # }
240    /// ```
241    pub async fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
242        &self,
243        key: K,
244        value: V,
245    ) -> Result<Metadata> {
246        use tokio::io::AsyncWriteExt;
247        let key = key.as_ref();
248        let value = value.as_ref();
249
250        let (tmp, tmp_path) = tempfile(&self.opts.path).await?;
251        // write all data to a temporary file to allow for atomic replacement and simultaneous reads.
252        {
253            let mut writer = tokio::io::BufWriter::with_capacity(self.opts.wbuff_sz, tmp);
254            writer.write_all(value).await.map_err(ForcepError::Io)?;
255            writer.flush().await.map_err(ForcepError::Io)?;
256        }
257
258        // move the temporary file to the final destination
259        let final_path = self.path_from_key(key);
260        if let Some(parent) = final_path.parent() {
261            afs::create_dir_all(parent).await.map_err(ForcepError::Io)?;
262        }
263        afs::rename(&tmp_path, &final_path)
264            .await
265            .map_err(ForcepError::Io)?;
266
267        if !self.mem.is_nil() {
268            self.mem.put(key, Bytes::from(Vec::from(value)));
269        }
270        self.meta.insert_metadata_for(key, value)
271    }
272
273    /// Removes an entry from the cache, returning its [`Metadata`].
274    ///
275    /// This will remove the entry from both the main cache database and the metadata database.
276    /// Please note that this will return `Error::NotFound` if either the main database *or* the
277    /// meta database didn't find the entry.
278    ///
279    /// # Examples
280    ///
281    /// ```rust
282    /// # #[tokio::main(flavor = "current_thread")]
283    /// # async fn main() {
284    /// use forceps::Cache;
285    ///
286    /// let cache = Cache::new("./cache")
287    ///     .build()
288    ///     .await
289    ///     .unwrap();
290    ///
291    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
292    /// let metadata = cache.remove(b"MY_KEY").await.unwrap();
293    /// assert_eq!(metadata.get_size(), b"Hello World".len() as u64);
294    /// # }
295    /// ```
296    pub async fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Metadata> {
297        let key = key.as_ref();
298
299        let cur_path = self.path_from_key(key);
300        let tmp_path = crate::tmp::tmppath_in(&self.opts.path);
301
302        // move then delete the file
303        //
304        // the purpose of moving then deleting is that file moves are much faster than file
305        // deletes. if we were to delete in place, and another thread starts reading, it could
306        // spell bad news.
307        afs::rename(&cur_path, &tmp_path)
308            .await
309            .map_err(|e| match e.kind() {
310                io::ErrorKind::NotFound => ForcepError::NotFound,
311                _ => ForcepError::Io(e),
312            })?;
313        afs::remove_file(&tmp_path).await.map_err(ForcepError::Io)?;
314
315        // remove the metadata for the entry
316        self.meta.remove_metadata_for(key)
317    }
318
319    /// Queries the index database for metadata on the entry with the corresponding key.
320    ///
321    /// This will return the metadata for the associated key. For information about what metadata
322    /// is stored, look at [`Metadata`].
323    ///
324    /// # Non-Async
325    ///
326    /// Note that this function is not an async call. This is because the backend database used,
327    /// `sled`, is not async-compatible. However, these calls are instead very fast.
328    ///
329    /// # Not Found
330    ///
331    /// If the entry is not found, then it will return
332    /// `Err(`[`Error::NotFound`](ForcepError::NotFound)`)`.
333    ///
334    /// # Examples
335    ///
336    /// ```rust
337    /// # #[tokio::main(flavor = "current_thread")]
338    /// # async fn main() {
339    /// use forceps::Cache;
340    ///
341    /// let cache = Cache::new("./cache")
342    ///     .build()
343    ///     .await
344    ///     .unwrap();
345    ///
346    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
347    /// let meta = cache.read_metadata(b"MY_KEY").unwrap();
348    /// assert_eq!(meta.get_size(), b"Hello World".len() as u64);
349    /// # }
350    /// ```
351    #[inline]
352    pub fn read_metadata<K: AsRef<[u8]>>(&self, key: K) -> Result<Metadata> {
353        self.meta.get_metadata(key.as_ref())
354    }
355
356    /// An iterator over the entire metadata database, which provides metadata for every entry.
357    ///
358    /// This iterator provides every key in the database and the associated metadata for that key.
359    /// This is *not* an iterator over the actual values of the database.
360    ///
361    /// # Non-Async
362    ///
363    /// Note that this function is not an async call. This is because the backend database used,
364    /// `sled`, is not async-compatible. However, these calls are instead very fast.
365    ///
366    /// # Examples
367    ///
368    /// ```rust
369    /// # #[tokio::main(flavor = "current_thread")]
370    /// # async fn main() {
371    /// use forceps::Cache;
372    ///
373    /// let cache = Cache::new("./cache")
374    ///     .build()
375    ///     .await
376    ///     .unwrap();
377    ///
378    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
379    /// for result in cache.metadata_iter() {
380    ///     let (key, meta) = result.unwrap();
381    ///     println!("{}", String::from_utf8_lossy(&key))
382    /// }
383    /// # }
384    /// ```
385    #[inline]
386    pub fn metadata_iter(&self) -> impl Iterator<Item = Result<(Vec<u8>, Metadata)>> {
387        self.meta.metadata_iter()
388    }
389
390    /// Runs the specified eviction algorithm over this instance cache instance.
391    ///
392    /// Eviction algorithms will remove items out of the cache until certain a condition has been
393    /// met, usually a size requirement. See the [`evictors`] module for more information and
394    /// examples.
395    ///
396    /// [`evictors`]: crate::evictors
397    #[inline]
398    pub async fn evict_with<E>(&self, evictor: E) -> result::Result<u64, E::Err>
399    where
400        E: crate::evictors::Evictor,
401    {
402        evictor.evict(self).await
403    }
404}
405
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheBuilder;

    /// Builds a cache from `CacheBuilder::default()`, panicking if the build fails.
    async fn default_cache() -> Cache {
        let builder = CacheBuilder::default();
        builder.build().await.unwrap()
    }

    /// Path construction must not panic for keys of one, two and three bytes.
    #[tokio::test]
    async fn short_path() {
        let cache = default_cache().await;
        cache.path_from_key(&[0xAA]);
        cache.path_from_key(&[0xAA, 0xBB]);
        cache.path_from_key(&[0xAA, 0xBB, 0xCC]);
    }

    /// Full round trip: a written value can be read back and then removed.
    #[tokio::test]
    async fn write_read_remove() {
        let cache = default_cache().await;

        cache.write(&b"CACHE_KEY", &b"Hello World").await.unwrap();
        let stored = cache.read(&b"CACHE_KEY").await.unwrap();
        assert_eq!(stored.as_ref(), b"Hello World");
        cache.remove(&b"CACHE_KEY").await.unwrap();
    }

    /// With access tracking enabled, each read must be counted as exactly one hit.
    #[tokio::test]
    async fn tracking_test() {
        let cache = CacheBuilder::default()
            .track_access(true)
            .build()
            .await
            .unwrap();

        cache.write(b"CACHE_KEY", b"Hello World").await.unwrap();
        for _ in 0..100 {
            cache.read(b"CACHE_KEY").await.unwrap();
        }

        let hits = cache.read_metadata(b"CACHE_KEY").unwrap().get_hits();
        assert_eq!(hits, 100);
    }

    /// The metadata recorded by a write reports the size of the written value.
    #[tokio::test]
    async fn read_metadata() {
        let cache = default_cache().await;

        cache.write(&b"CACHE_KEY", &b"Hello World").await.unwrap();
        let meta = cache.read_metadata(&b"CACHE_KEY").unwrap();
        assert_eq!(meta.get_size(), b"Hello World".len() as u64);
    }
}