//! A [`crate::sharded::Cache`] uses the same basic file-based second
//! chance strategy as a [`crate::plain::Cache`].  However, while the
//! simple plain cache is well suited to small caches (down to 2-3
//! files, and up to maybe one hundred), this sharded version can
//! scale nearly arbitrarily high: each shard should have fewer than
//! one hundred or so files, but there may be arbitrarily many shards
//! (up to filesystem limits, since each shard is a subdirectory).
//!
//! A sharded cache directory consists of shard subdirectories (with
//! names equal to the shard index formatted as `.kismet_%04x`), each
//! of which contains the cached files for that shard, under their
//! `key` name, and an optional `.kismet_temp` subdirectory for
//! temporary files.
//!
//! This module is useful for lower-level usage; in most cases, the
//! [`crate::Cache`] is more convenient and just as efficient.  In
//! particular, a `crate::sharded::Cache` *does not* invoke
//! [`std::fs::File::sync_all`] or [`std::fs::File::sync_data`]: the
//! caller should sync files before letting Kismet persist them in a
//! directory, if necessary.
//!
//! The cache's contents will grow past its stated capacity, but
//! should rarely reach more than twice that capacity, especially
//! when the shard capacity is less than 128 files.
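//!
//! # Example
//!
//! A minimal sketch of the intended call sequence, modeled on the
//! tests at the bottom of this file.  The crate name `kismet_cache`,
//! the `/tmp` destination, and the use of the `tempfile` crate are
//! illustrative assumptions, not requirements:
//!
//! ```no_run
//! use kismet_cache::sharded::Cache;
//! use kismet_cache::Key;
//!
//! # fn main() -> std::io::Result<()> {
//! // Spread roughly 100 cached files over 10 shard subdirectories.
//! let cache = Cache::new("/tmp/kismet-demo".into(), 10, 100);
//!
//! // Stage the data in the cache's temporary directory, then
//! // publish it under `key`.
//! let key = Key::new("file_name", 1, 2);
//! let temp = tempfile::NamedTempFile::new_in(cache.temp_dir(Some(key))?)?;
//! std::fs::write(temp.path(), b"payload")?;
//! cache.put(key, temp.path())?;
//!
//! // Read the data back; this also refreshes the entry.
//! assert!(cache.get(key)?.is_some());
//! # Ok(())
//! # }
//! ```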
use std::borrow::Cow;
use std::fs::File;
use std::io::Result;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;

use crate::cache_dir::CacheDir;
use crate::multiplicative_hash::MultiplicativeHash;
use crate::trigger::PeriodicTrigger;
use crate::Key;
use crate::KISMET_TEMPORARY_SUBDIRECTORY as TEMP_SUBDIR;

/// We will aim to trigger maintenance at least `MAINTENANCE_SCALE`
/// times per total capacity inserts or updates, and at least once per
/// shard capacity inserts or updates.
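///
/// For example, with 10 shards and a total capacity of 100 files,
/// the trigger period works out to `min(100 / 2, 10) = 10` inserts
/// or updates.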
const MAINTENANCE_SCALE: usize = 2;

/// These mixers must be the same for all processes that access the
/// same sharded cache directory.  That's why we derive the parameters
/// in a const function.
const PRIMARY_MIXER: MultiplicativeHash =
    MultiplicativeHash::new_keyed(b"kismet: primary shard mixer");

const SECONDARY_MIXER: MultiplicativeHash =
    MultiplicativeHash::new_keyed(b"kismet: secondary shard mixer");
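
// For illustration only: a multiplicative ("Fibonacci") hash mixes
// its input with an odd 64-bit multiplier, then keeps the high bits
// to reduce to a bounded range, along these lines (the actual
// parameters and code live in `crate::multiplicative_hash`):
//
//     fn map(multiplier: u64, x: u64, range: usize) -> usize {
//         let mixed = x.wrapping_mul(multiplier);
//         ((mixed as u128 * range as u128) >> 64) as usize
//     }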

/// A sharded cache is a hash-sharded directory of cache
/// subdirectories.  Each subdirectory is managed as an
/// independent second chance cache directory.
#[derive(Clone, Debug)]
pub struct Cache {
    // The current load (number of files) estimate for each shard.
    load_estimates: Arc<[AtomicU8]>,
    // The parent directory for each shard (cache subdirectory).
    base_dir: PathBuf,
    // Triggers periodic second chance maintenance.  It is set to the
    // least (most frequent) period between ~1/2 the total capacity,
    // and each shard's capacity.  Whenever the `trigger` fires, we
    // will maintain two different shards: the one we just updated,
    // and another randomly chosen shard.
    trigger: PeriodicTrigger,
    // Number of shards in the cache, at least 2.
    num_shards: usize,
    // Capacity for each shard (rounded up to an integer), at least 1.
    shard_capacity: usize,
}

/// Converts a shard id to a subdirectory name.
///
/// We use a dot prefix because the resulting subdirectory names are
/// guaranteed not to collide with "plain" cache filenames.  This
/// means we can switch between the sharded and plain (unsharded)
/// strategy for the same directory, without any chance of
/// misinterpreted file names.
#[inline]
fn format_id(shard: usize) -> String {
    format!(".kismet_{:04x}", shard)
}
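
/// A quick check of the dot-prefixed shard naming scheme above.
#[test]
fn test_format_id() {
    assert_eq!(format_id(0), ".kismet_0000");
    assert_eq!(format_id(0x1f), ".kismet_001f");
    assert_eq!(format_id(0xbeef), ".kismet_beef");
}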

/// We create short-lived Shard objects whenever we want to work with
/// a given shard of the sharded cache dir.
struct Shard {
    id: usize,
    shard_dir: PathBuf,
    trigger: PeriodicTrigger,
    capacity: usize,
}

impl Shard {
    /// Returns a shard object for a new shard `id`.
    fn replace_shard(self, id: usize) -> Shard {
        let mut shard_dir = self.shard_dir;
        shard_dir.pop();
        shard_dir.push(&format_id(id));
        Shard {
            id,
            shard_dir,
            trigger: self.trigger,
            capacity: self.capacity,
        }
    }

    /// Returns whether the file `name` exists in this shard.
    fn file_exists(&mut self, name: &str) -> bool {
        self.shard_dir.push(name);
        let result = std::fs::metadata(&self.shard_dir);
        self.shard_dir.pop();

        result.is_ok()
    }
}

impl CacheDir for Shard {
    #[inline]
    fn temp_dir(&self) -> Cow<Path> {
        let mut dir = self.shard_dir.clone();
        dir.push(TEMP_SUBDIR);
        Cow::from(dir)
    }

    #[inline]
    fn base_dir(&self) -> Cow<Path> {
        Cow::from(&self.shard_dir)
    }

    #[inline]
    fn trigger(&self) -> &PeriodicTrigger {
        &self.trigger
    }

    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }
}

impl Cache {
    /// Returns a new cache for approximately `total_capacity` files,
    /// stored in `num_shards` subdirectories of `base_dir`.
    pub fn new(base_dir: PathBuf, mut num_shards: usize, mut total_capacity: usize) -> Cache {
        // We assume at least two shards.
        if num_shards < 2 {
            num_shards = 2;
        }

        if total_capacity < num_shards {
            total_capacity = num_shards;
        }

        let mut load_estimates = Vec::with_capacity(num_shards);
        load_estimates.resize_with(num_shards, || AtomicU8::new(0));
        let shard_capacity =
            (total_capacity / num_shards) + ((total_capacity % num_shards) != 0) as usize;
        let trigger =
            PeriodicTrigger::new(shard_capacity.min(total_capacity / MAINTENANCE_SCALE) as u64);

        Cache {
            load_estimates: load_estimates.into_boxed_slice().into(),
            base_dir,
            trigger,
            num_shards,
            shard_capacity,
        }
    }

    /// Returns a random shard id.
    fn random_shard_id(&self) -> usize {
        use rand::Rng;

        rand::thread_rng().gen_range(0..self.num_shards)
    }

    /// Given shard ids `base` and `other`, returns a new shard id for
    /// `other` such that `base` and `other` do not collide.
    fn other_shard_id(&self, base: usize, mut other: usize) -> usize {
        if base != other {
            return other;
        }

        other += 1;
        if other < self.num_shards {
            other
        } else {
            0
        }
    }

    /// Returns the two shard ids for `key`.
    fn shard_ids(&self, key: Key) -> (usize, usize) {
        // We can't assume the hash is well distributed, so mix it
        // around a bit with a multiplicative hash.
        let h1 = PRIMARY_MIXER.map(key.hash, self.num_shards);
        let h2 = SECONDARY_MIXER.map(key.secondary_hash, self.num_shards);

        // We do not apply a 2-left strategy because our load
        // estimates can saturate.  When that happens, we want to
        // revert to sharding based on `key.hash`.
        (h1, self.other_shard_id(h1, h2))
    }

    /// Reorders two shard ids to return the least loaded first.
    fn sort_by_load(&self, (h1, h2): (usize, usize)) -> (usize, usize) {
        let load1 = self.load_estimates[h1].load(Relaxed) as usize;
        let load2 = self.load_estimates[h2].load(Relaxed) as usize;

        // Clamp loads at the shard capacity: when both shards are
        // over the capacity, they're equally overloaded.  This also
        // lets us revert to only using `key.hash` when at capacity.
        let capacity = self.shard_capacity;
        if load1.clamp(0, capacity) <= load2.clamp(0, capacity) {
            (h1, h2)
        } else {
            (h2, h1)
        }
    }

    /// Returns a shard object for the `shard_id`.
    fn shard(&self, shard_id: usize) -> Shard {
        let mut dir = self.base_dir.clone();
        dir.push(&format_id(shard_id));
        Shard {
            id: shard_id,
            shard_dir: dir,
            trigger: self.trigger,
            capacity: self.shard_capacity,
        }
    }

    /// Returns a read-only file for `key` in the shard cache
    /// directory if it exists, or None if there is no such file.
    /// Fails with `ErrorKind::InvalidInput` if `key.name` is invalid
    /// (empty, or starts with a dot or a forward or back slash).
    ///
    /// Implicitly "touches" the cached file if it exists.
    pub fn get(&self, key: Key) -> Result<Option<File>> {
        let (h1, h2) = self.shard_ids(key);
        let shard = self.shard(h1);

        if let Some(file) = shard.get(key.name)? {
            Ok(Some(file))
        } else {
            shard.replace_shard(h2).get(key.name)
        }
    }

    /// Returns a temporary directory suitable for temporary files
    /// that will be published to the shard cache directory.
    ///
    /// When this temporary file will be published at a known `Key`,
    /// populate `key` for improved behaviour.
    pub fn temp_dir(&self, key: Option<Key>) -> Result<Cow<Path>> {
        let shard_id = match key {
            Some(key) => self.sort_by_load(self.shard_ids(key)).0,
            None => self.random_shard_id(),
        };
        let shard = self.shard(shard_id);
        if self.trigger.event() {
            shard.cleanup_temp_directory()?;
        }

        Ok(Cow::from(shard.ensure_temp_dir()?.into_owned()))
    }

    /// Updates the load estimate for `shard_id` with the value
    /// returned by `CacheDir::{set,put}`.
    fn update_estimate(&self, shard_id: usize, update: Option<u64>) {
        let target = &self.load_estimates[shard_id];
        match update {
            // If we have an updated estimate, overwrite what we have,
            // and take the newly added file into account.
            Some(remaining) => {
                let update = remaining.clamp(0, u8::MAX as u64 - 1) as u8;
                target.store(update + 1, Relaxed);
            }
            // Otherwise, increment by one with saturation.
            None => {
                let _ = target.fetch_update(Relaxed, Relaxed, |i| {
                    if i < u8::MAX {
                        Some(i + 1)
                    } else {
                        None
                    }
                });
            }
        };
    }

    /// Performs a second chance maintenance on `shard`.
    fn force_maintain_shard(&self, shard: Shard) -> Result<()> {
        let update = shard.maintain()?.clamp(0, u8::MAX as u64) as u8;
        self.load_estimates[shard.id].store(update, Relaxed);
        Ok(())
    }

    /// Performs a second chance maintenance on a randomly chosen shard
    /// that is not `base`.
    fn maintain_random_other_shard(&self, base: Shard) -> Result<()> {
        let shard_id = self.other_shard_id(base.id, self.random_shard_id());
        self.force_maintain_shard(base.replace_shard(shard_id))
    }

    /// Inserts or overwrites the file at `value` as `key` in the
    /// sharded cache directory.  There may be two entries for the
    /// same key with concurrent `set` or `put` calls.  Fails with
    /// `ErrorKind::InvalidInput` if `key.name` is invalid (empty, or
    /// starts with a dot or a forward or back slash).
    ///
    /// Always consumes the file at `value` on success; may consume it
    /// on error.
    pub fn set(&self, key: Key, value: &Path) -> Result<()> {
        let (h1, h2) = self.sort_by_load(self.shard_ids(key));
        let mut shard = self.shard(h2);

        // If the file does not already exist in the secondary shard,
        // use the primary.
        if !shard.file_exists(key.name) {
            shard = shard.replace_shard(h1);
        }

        let update = shard.set(key.name, value)?;
        self.update_estimate(h1, update);

        // If we performed maintenance on this shard, also maintain
        // a second random shard: writes might be concentrated on a
        // few shards, but we can still spread the love, if only to
        // clean up temporary files.
        if update.is_some() {
            self.maintain_random_other_shard(shard)?;
        } else if self.load_estimates[h1].load(Relaxed) as usize / 2 > self.shard_capacity {
            // Otherwise, we can also force a maintenance for this
            // shard if we're pretty sure it has grown much too big.
            self.force_maintain_shard(shard)?;
        }

        Ok(())
    }

    /// Inserts the file at `value` as `key` in the cache directory if
    /// there is no such cached entry already, or touches the cached
    /// file if it already exists.  There may be two entries for the
    /// same key with concurrent `set` or `put` calls.  Fails with
    /// `ErrorKind::InvalidInput` if `key.name` is invalid (empty, or
    /// starts with a dot or a forward or back slash).
    ///
    /// Always consumes the file at `value` on success; may consume it
    /// on error.
    pub fn put(&self, key: Key, value: &Path) -> Result<()> {
        let (h1, h2) = self.sort_by_load(self.shard_ids(key));
        let mut shard = self.shard(h2);

        // If the file does not already exist in the secondary shard,
        // use the primary.
        if !shard.file_exists(key.name) {
            shard = shard.replace_shard(h1);
        }

        let update = shard.put(key.name, value)?;
        self.update_estimate(h1, update);

        // If we performed maintenance on this shard, also maintain
        // a second random shard.
        if update.is_some() {
            self.maintain_random_other_shard(shard)?;
        } else if self.load_estimates[h1].load(Relaxed) as usize / 2 > self.shard_capacity {
            self.force_maintain_shard(shard)?;
        }

        Ok(())
    }

    /// Marks the cached file `key` as newly used, if it exists.
    /// Fails with `ErrorKind::InvalidInput` if `key.name` is invalid
    /// (empty, or starts with a dot or a forward or back slash).
    ///
    /// Returns whether a file for `key` exists in the cache.
    pub fn touch(&self, key: Key) -> Result<bool> {
        let (h1, h2) = self.shard_ids(key);
        let shard = self.shard(h1);

        if shard.touch(key.name)? {
            return Ok(true);
        }

        shard.replace_shard(h2).touch(key.name)
    }
}
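
/// Sanity-check the shard-id helpers.  They never touch the
/// filesystem, so the base directory below is an arbitrary
/// placeholder that is never created.
#[test]
fn test_shard_id_helpers() {
    let cache = Cache::new("./does_not_exist".into(), 3, 9);

    // `other_shard_id` only remaps `other` when it collides with `base`.
    assert_eq!(cache.other_shard_id(0, 1), 1);
    assert_eq!(cache.other_shard_id(1, 1), 2);
    assert_eq!(cache.other_shard_id(2, 2), 0); // wraps around

    // `sort_by_load` puts the less loaded shard first...
    cache.load_estimates[0].store(2, Relaxed);
    cache.load_estimates[1].store(1, Relaxed);
    assert_eq!(cache.sort_by_load((0, 1)), (1, 0));

    // ... and preserves the input order once both estimates clamp
    // to the shard capacity (here, 9 files / 3 shards = 3).
    cache.load_estimates[0].store(200, Relaxed);
    cache.load_estimates[1].store(3, Relaxed);
    assert_eq!(cache.sort_by_load((0, 1)), (0, 1));
}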

/// Put 200 files in a 3x3-file cache.  We should find at least 9, but
/// at most 18 (2x the capacity), and their contents should match.
#[test]
fn smoke_test() {
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    // The payload for file `i` is `PAYLOAD_MULTIPLIER * i`.
    const PAYLOAD_MULTIPLIER: usize = 113;

    let temp = TestDir::temp();
    let cache = Cache::new(temp.path("."), 3, 9);

    for i in 0..200 {
        let name = format!("{}", i);

        let temp_dir = cache.temp_dir(None).expect("temp_dir must succeed");
        let tmp = NamedTempFile::new_in(temp_dir).expect("new temp file must succeed");
        std::fs::write(tmp.path(), format!("{}", PAYLOAD_MULTIPLIER * i))
            .expect("write must succeed");
        // It shouldn't matter if we PUT or SET.
        if (i % 2) != 0 {
            cache
                .put(Key::new(&name, i as u64, i as u64 + 42), tmp.path())
                .expect("put must succeed");
        } else {
            cache
                .set(Key::new(&name, i as u64, i as u64 + 42), tmp.path())
                .expect("set must succeed");
        }
    }

    let present: usize = (0..200)
        .map(|i| {
            let name = format!("{}", i);
            match cache
                .get(Key::new(&name, i as u64, i as u64 + 42))
                .expect("get must succeed")
            {
                Some(mut file) => {
                    use std::io::Read;
                    let mut buf = Vec::new();
                    file.read_to_end(&mut buf).expect("read must succeed");
                    assert_eq!(buf, format!("{}", PAYLOAD_MULTIPLIER * i).into_bytes());
                    1
                }
                None => 0,
            }
        })
        .sum();

    assert!(present >= 9);
    assert!(present <= 18);
}

/// Publish a file, make sure we can read it, then overwrite, and
/// confirm that the new contents are visible.
#[test]
fn test_set() {
    use std::io::{Read, Write};
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    let temp = TestDir::temp();
    let cache = Cache::new(temp.path("."), 0, 0);

    {
        let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed"))
            .expect("new temp file must succeed");
        tmp.as_file().write_all(b"v1").expect("write must succeed");

        cache
            .set(Key::new("entry", 1, 2), tmp.path())
            .expect("initial set must succeed");
    }

    {
        let mut cached = cache
            .get(Key::new("entry", 1, 2))
            .expect("must succeed")
            .expect("must be found");
        let mut dst = Vec::new();
        cached.read_to_end(&mut dst).expect("read must succeed");
        assert_eq!(&dst, b"v1");
    }

    // Now overwrite; it should take.
    {
        let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed"))
            .expect("new temp file must succeed");
        tmp.as_file().write_all(b"v2").expect("write must succeed");

        cache
            .set(Key::new("entry", 1, 2), tmp.path())
            .expect("overwrite must succeed");
    }

    {
        let mut cached = cache
            .get(Key::new("entry", 1, 2))
            .expect("must succeed")
            .expect("must be found");
        let mut dst = Vec::new();
        cached.read_to_end(&mut dst).expect("read must succeed");
        assert_eq!(&dst, b"v2");
    }
}

/// Publish a file, then `put` a new one with different data (which
/// should not overwrite), and confirm that the old contents are
/// still visible.
#[test]
fn test_put() {
    use std::io::{Read, Write};
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    let temp = TestDir::temp();
    let cache = Cache::new(temp.path("."), 0, 0);

    {
        let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed"))
            .expect("new temp file must succeed");
        tmp.as_file().write_all(b"v1").expect("write must succeed");

        cache
            .set(Key::new("entry", 1, 2), tmp.path())
            .expect("initial set must succeed");
    }

    // Now put; it should not take.
    {
        let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed"))
            .expect("new temp file must succeed");
        tmp.as_file().write_all(b"v2").expect("write must succeed");

        cache
            .put(Key::new("entry", 1, 2), tmp.path())
            .expect("put must succeed");
    }

    {
        let mut cached = cache
            .get(Key::new("entry", 1, 2))
            .expect("must succeed")
            .expect("must be found");
        let mut dst = Vec::new();
        cached.read_to_end(&mut dst).expect("read must succeed");
        assert_eq!(&dst, b"v1");
    }
}

/// Put 2000 files in a 2x300-file cache, and keep touching the first.
/// We should always find the first file, even after all that cleanup.
#[test]
fn test_touch() {
    use std::io::Read;
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    // The payload for file `i` is `PAYLOAD_MULTIPLIER * i`.
    const PAYLOAD_MULTIPLIER: usize = 113;

    let temp = TestDir::temp();
    let cache = Cache::new(temp.path("."), 2, 600);

    for i in 0..2000 {
        // After the first write, we should find our file.
        assert_eq!(
            cache
                .touch(Key::new("0", 0, 42))
                .expect("touch must succeed"),
            i > 0
        );

        let name = format!("{}", i);

        let temp_dir = cache.temp_dir(None).expect("temp_dir must succeed");
        let tmp = NamedTempFile::new_in(temp_dir).expect("new temp file must succeed");
        std::fs::write(tmp.path(), format!("{}", PAYLOAD_MULTIPLIER * i))
            .expect("write must succeed");
        cache
            .put(Key::new(&name, i as u64, i as u64 + 42), tmp.path())
            .expect("put must succeed");
        if i == 0 {
            // Make sure file "0" is measurably older than the others.
            std::thread::sleep(std::time::Duration::from_secs(2));
        }
    }

    let mut file = cache
        .get(Key::new("0", 0, 42))
        .expect("get must succeed")
        .expect("file must be found");
    let mut buf = Vec::new();
    file.read_to_end(&mut buf).expect("read must succeed");
    assert_eq!(buf, b"0");
}