forceps 0.5.0

An easy-to-use async large file database/cache
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
mod builder;
pub use builder::CacheBuilder;

use crate::{ForcepError, MetaDb, Metadata, Result, mem_cache::MemCache};
use bytes::Bytes;
use sled::Db;
use std::io;
use std::path;
use std::result;
use tokio::fs as afs;

/// Creates a writeable and persistent temporary file in the path provided, returning the path and
/// file handle.
async fn tempfile(dir: &path::Path) -> Result<(afs::File, path::PathBuf)> {
    let tmppath = crate::tmp::tmppath_in(dir);
    let tmp = afs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(&tmppath)
        .await
        .map_err(ForcepError::Io)?;
    Ok((tmp, tmppath))
}

#[derive(Debug, Clone)]
struct Options {
    path: path::PathBuf,
    dir_depth: u8,
    track_access: bool,

    // maximum size of the in-memory lru in bytes
    lru_size: usize,

    // read and write buffer sizes
    rbuff_sz: usize,
    wbuff_sz: usize,
}

/// The main component of `forceps`, and  acts as the API for interacting with the on-disk cache.
///
/// This structure includes the async `read`, `write`, and `remove` operations which are the basic
/// operations of the cache. It also includes some misc functions to interact with metadata and
/// evict items from the cache.
///
/// # Eviction
///
/// This cache can evict items with a number of different eviction algorithms. To see more, see
/// [`evict_with`] and the [`evictors`] module.
///
/// # Memory Cache
///
/// An in-memory cache can be optionally enabled as a layer over the regular on-disk cache. The
/// memcache provides fast `HIT`s for recently used entries, circumventing filesystem operations
/// altogether. To enable, use the [`CacheBuilder`]`::memory_lru_max_size` method.
///
/// # Examples
///
/// ```rust
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// use forceps::Cache;
///
/// let cache = Cache::new("./cache")
///     .build()
///     .await
///     .unwrap();
/// # }
/// ```
///
/// [`evict_with`]: #method.evict_with
/// [`evictors`]: crate::evictors
/// [`CacheBuilder`]: crate::CacheBuilder
#[derive(Debug)]
pub struct Cache {
    meta: MetaDb,
    mem: MemCache,
    opts: Options,
}

impl Cache {
    /// Creates a new [`CacheBuilder`], which can be used to customize and create a [`Cache`]
    /// instance. This function is an alias for [`CacheBuilder::new`].
    ///
    /// The `path` supplied is the base directory of the cache instance.
    ///
    /// [`CacheBuilder`]: crate::CacheBuilder
    /// [`CacheBuilder::new`]: crate::CacheBuilder::new
    ///
    /// # Examples
    ///
    /// ```rust
    /// use forceps::Cache;
    ///
    /// let builder = Cache::new("./cache");
    /// // Use other methods for configuration
    /// ```
    #[inline]
    #[allow(clippy::new_ret_no_self)]
    pub fn new<P: AsRef<path::Path>>(path: P) -> CacheBuilder {
        CacheBuilder::new(path)
    }

    /// Creates a new Cache instance based on the CacheBuilder
    async fn create(opts: Options) -> Result<Self> {
        // create the base directory for the cache
        afs::create_dir_all(&opts.path)
            .await
            .map_err(ForcepError::Io)?;

        let mut meta_path = opts.path.clone();
        meta_path.push("index");
        Ok(Self {
            meta: MetaDb::new(&meta_path)?,
            mem: MemCache::new(opts.lru_size),
            opts,
        })
    }

    /// Gets a reference to the underlying meta database.
    pub fn get_meta_db_ref(&self) -> &Db {
        self.meta.get_db_ref()
    }

    /// Creates a PathBuf based on the key provided
    fn path_from_key(&self, key: &[u8]) -> path::PathBuf {
        let hex = hex::encode(key);
        let mut buf = self.opts.path.clone();

        // push segments of key as paths to the PathBuf. If the hex isn't long enough, then push
        // "__" instead.
        for n in (0..self.opts.dir_depth).map(|x| x as usize * 2) {
            let n_end = n + 2;
            buf.push(if n_end >= hex.len() {
                "__"
            } else {
                &hex[n..n_end]
            })
        }
        buf.push(&hex);
        buf
    }

    /// Tracks the access for a cache entry if the option is enabled
    #[inline]
    fn track_access_for(&self, k: &[u8], meta: Metadata) -> Result<()> {
        if self.opts.track_access {
            self.meta.track_access_for(k, Some(meta))?;
        }
        Ok(())
    }

    /// Reads an entry from the database, returning a vector of bytes that represent the entry.
    ///
    /// # Not Found
    ///
    /// If the entry is not found, then it will return
    /// `Err(`[`ForcepError::NotFound`]`)`.
    ///
    /// # Metadata
    ///
    /// This function will *not* perform a metadata read or write **unless** the `track_access`
    /// build option is set. If the option is set, then it will perform a blocking read/write to
    /// write new values to track the last access time and the total hits.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// use forceps::Cache;
    ///
    /// let cache = Cache::new("./cache")
    ///     .build()
    ///     .await
    ///     .unwrap();
    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
    ///
    /// let value = cache.read(b"MY_KEY").await.unwrap();
    /// assert_eq!(value.as_ref(), b"Hello World");
    /// # }
    /// ```
    pub async fn read<K: AsRef<[u8]>>(&self, key: K) -> Result<Bytes> {
        use tokio::io::AsyncReadExt;
        let k = key.as_ref();

        // read the metadata to reduce miss cost, since the metadata DB should generally fit in
        // memory (and also removes the need to read file metadata for a hit.)
        let meta = self.meta.get_metadata(k)?;

        // look in the memory cache to see if it's there and return if it is
        if let Some(val) = self.mem.get(k) {
            return self.track_access_for(k, meta).map(|_| val);
        }

        let file = {
            let path = self.path_from_key(k);
            afs::OpenOptions::new()
                .read(true)
                .open(&path)
                .await
                .map_err(|e| match e.kind() {
                    io::ErrorKind::NotFound => ForcepError::NotFound,
                    _ => ForcepError::Io(e),
                })?
        };

        // create a new buffer based on the estimated size of the file
        let mut buf = Vec::with_capacity(meta.get_size() as _);

        // read the entire file to the buffer
        tokio::io::BufReader::with_capacity(self.opts.rbuff_sz, file)
            .read_to_end(&mut buf)
            .await
            .map_err(ForcepError::Io)?;

        self.track_access_for(k, meta)?;
        let bytes = Bytes::from(buf);
        self.mem.put(k, Bytes::clone(&bytes));
        Ok(bytes)
    }

    /// Writes an entry with the specified key to the cache database. This will replace the
    /// previous entry if it exists, otherwise it will store a completely new one.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// use forceps::Cache;
    ///
    /// let cache = Cache::new("./cache")
    ///     .build()
    ///     .await
    ///     .unwrap();
    ///
    /// cache.write(b"MY_KEY", b"Hello World").await.unwrap();
    /// # }
    /// ```
    pub async fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &self,
        key: K,
        value: V,
    ) -> Result<Metadata> {
        use tokio::io::AsyncWriteExt;
        let key = key.as_ref();
        let value = value.as_ref();

        let (tmp, tmp_path) = tempfile(&self.opts.path).await?;
        // write all data to a temporary file to allow for atomic replacement and simultaneous reads.
        {
            let mut writer = tokio::io::BufWriter::with_capacity(self.opts.wbuff_sz, tmp);
            writer.write_all(value).await.map_err(ForcepError::Io)?;
            writer.flush().await.map_err(ForcepError::Io)?;
        }

        // move the temporary file to the final destination
        let final_path = self.path_from_key(key);
        if let Some(parent) = final_path.parent() {
            afs::create_dir_all(parent).await.map_err(ForcepError::Io)?;
        }
        afs::rename(&tmp_path, &final_path)
            .await
            .map_err(ForcepError::Io)?;

        if !self.mem.is_nil() {
            self.mem.put(key, Bytes::from(Vec::from(value)));
        }
        self.meta.insert_metadata_for(key, value)
    }

    /// Removes an entry from the cache, returning its [`Metadata`].
    ///
    /// This will remove the entry from both the main cache database and the metadata database.
    /// Please note that this will return `Error::NotFound` if either the main database *or* the
    /// meta database didn't find the entry.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// use forceps::Cache;
    ///
    /// let cache = Cache::new("./cache")
    ///     .build()
    ///     .await
    ///     .unwrap();
    ///
    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
    /// let metadata = cache.remove(b"MY_KEY").await.unwrap();
    /// assert_eq!(metadata.get_size(), b"Hello World".len() as u64);
    /// # }
    /// ```
    pub async fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Metadata> {
        let key = key.as_ref();

        let cur_path = self.path_from_key(key);
        let tmp_path = crate::tmp::tmppath_in(&self.opts.path);

        // move then delete the file
        //
        // the purpose of moving then deleting is that file moves are much faster than file
        // deletes. if we were to delete in place, and another thread starts reading, it could
        // spell bad news.
        afs::rename(&cur_path, &tmp_path)
            .await
            .map_err(|e| match e.kind() {
                io::ErrorKind::NotFound => ForcepError::NotFound,
                _ => ForcepError::Io(e),
            })?;
        afs::remove_file(&tmp_path).await.map_err(ForcepError::Io)?;

        // remove the metadata for the entry
        self.meta.remove_metadata_for(key)
    }

    /// Queries the index database for metadata on the entry with the corresponding key.
    ///
    /// This will return the metadata for the associated key. For information about what metadata
    /// is stored, look at [`Metadata`].
    ///
    /// # Non-Async
    ///
    /// Note that this function is not an async call. This is because the backend database used,
    /// `sled`, is not async-compatible. However, these calls are instead very fast.
    ///
    /// # Not Found
    ///
    /// If the entry is not found, then it will return
    /// `Err(`[`Error::NotFound`](ForcepError::NotFound)`)`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// use forceps::Cache;
    ///
    /// let cache = Cache::new("./cache")
    ///     .build()
    ///     .await
    ///     .unwrap();
    ///
    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
    /// let meta = cache.read_metadata(b"MY_KEY").unwrap();
    /// assert_eq!(meta.get_size(), b"Hello World".len() as u64);
    /// # }
    /// ```
    #[inline]
    pub fn read_metadata<K: AsRef<[u8]>>(&self, key: K) -> Result<Metadata> {
        self.meta.get_metadata(key.as_ref())
    }

    /// An iterator over the entire metadata database, which provides metadata for every entry.
    ///
    /// This iterator provides every key in the database and the associated metadata for that key.
    /// This is *not* an iterator over the actual values of the database.
    ///
    /// # Non-Async
    ///
    /// Note that this function is not an async call. This is because the backend database used,
    /// `sled`, is not async-compatible. However, these calls are instead very fast.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// use forceps::Cache;
    ///
    /// let cache = Cache::new("./cache")
    ///     .build()
    ///     .await
    ///     .unwrap();
    ///
    /// # cache.write(b"MY_KEY", b"Hello World").await.unwrap();
    /// for result in cache.metadata_iter() {
    ///     let (key, meta) = result.unwrap();
    ///     println!("{}", String::from_utf8_lossy(&key))
    /// }
    /// # }
    /// ```
    #[inline]
    pub fn metadata_iter(&self) -> impl Iterator<Item = Result<(Vec<u8>, Metadata)>> {
        self.meta.metadata_iter()
    }

    /// Runs the specified eviction algorithm over this instance cache instance.
    ///
    /// Eviction algorithms will remove items out of the cache until certain a condition has been
    /// met, usually a size requirement. See the [`evictors`] module for more information and
    /// examples.
    ///
    /// [`evictors`]: crate::evictors
    #[inline]
    pub async fn evict_with<E>(&self, evictor: E) -> result::Result<u64, E::Err>
    where
        E: crate::evictors::Evictor,
    {
        evictor.evict(self).await
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheBuilder;

    async fn default_cache() -> Cache {
        CacheBuilder::default().build().await.unwrap()
    }

    #[tokio::test]
    async fn short_path() {
        let cache = default_cache().await;
        cache.path_from_key(&[0xAA]);
        cache.path_from_key(&[0xAA, 0xBB]);
        cache.path_from_key(&[0xAA, 0xBB, 0xCC]);
    }

    #[tokio::test]
    async fn write_read_remove() {
        let cache = default_cache().await;

        cache.write(&b"CACHE_KEY", &b"Hello World").await.unwrap();
        let data = cache.read(&b"CACHE_KEY").await.unwrap();
        assert_eq!(data.as_ref(), b"Hello World");
        cache.remove(&b"CACHE_KEY").await.unwrap();
    }

    #[tokio::test]
    async fn tracking_test() {
        let cache = CacheBuilder::default()
            .track_access(true)
            .build()
            .await
            .unwrap();

        cache.write(b"CACHE_KEY", b"Hello World").await.unwrap();
        for _ in 0..100 {
            cache.read(b"CACHE_KEY").await.unwrap();
        }
        assert_eq!(cache.read_metadata(b"CACHE_KEY").unwrap().get_hits(), 100);
    }

    #[tokio::test]
    async fn read_metadata() {
        let cache = default_cache().await;

        cache.write(&b"CACHE_KEY", &b"Hello World").await.unwrap();
        let metadata = cache.read_metadata(&b"CACHE_KEY").unwrap();
        assert_eq!(metadata.get_size(), b"Hello World".len() as u64);
    }
}