// pingora_cache/hashtable.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Concurrent hash tables and LRUs
16
17use lru::LruCache;
18use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
19use std::collections::HashMap;
20
// There are probably off-the-shelf crates of this, DashMap?
/// A hash table that shards to a constant number of tables to reduce lock contention
#[derive(Debug)]
pub struct ConcurrentHashTable<V, const N: usize> {
    // One independently RwLock-protected map per shard; a key's shard is
    // chosen by `get_shard(key, N)`.
    tables: [RwLock<HashMap<u128, V>>; N],
}
27
/// Map a 128-bit key onto one of `n_shards` buckets by reducing the key
/// modulo the shard count.
#[inline]
fn get_shard(key: u128, n_shards: usize) -> usize {
    let bucket = key % (n_shards as u128);
    // bucket < n_shards <= usize::MAX, so the narrowing cast is lossless
    bucket as usize
}
32
33impl<V, const N: usize> ConcurrentHashTable<V, N>
34where
35    [RwLock<HashMap<u128, V>>; N]: Default,
36{
37    pub fn new() -> Self {
38        ConcurrentHashTable {
39            tables: Default::default(),
40        }
41    }
42    pub fn get(&self, key: u128) -> &RwLock<HashMap<u128, V>> {
43        &self.tables[get_shard(key, N)]
44    }
45
46    #[allow(dead_code)]
47    pub fn read(&self, key: u128) -> RwLockReadGuard<HashMap<u128, V>> {
48        self.get(key).read()
49    }
50
51    pub fn write(&self, key: u128) -> RwLockWriteGuard<HashMap<u128, V>> {
52        self.get(key).write()
53    }
54
55    pub fn for_each<F>(&self, mut f: F)
56    where
57        F: FnMut(&u128, &V),
58    {
59        for shard in &self.tables {
60            let guard = shard.read();
61            for (key, value) in guard.iter() {
62                f(key, value);
63            }
64        }
65    }
66
67    // TODO: work out the lifetimes to provide get/set directly
68}
69
70impl<V, const N: usize> Default for ConcurrentHashTable<V, N>
71where
72    [RwLock<HashMap<u128, V>>; N]: Default,
73{
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
// Newtype wrapper so an array of locked LRU shards can be default-constructed.
#[doc(hidden)] // not needed in public API
pub struct LruShard<V>(RwLock<LruCache<u128, V>>);
81impl<V> Default for LruShard<V> {
82    fn default() -> Self {
83        // help satisfy default construction of arrays
84        LruShard(RwLock::new(LruCache::unbounded()))
85    }
86}
87
/// Sharded concurrent data structure for LruCache
pub struct ConcurrentLruCache<V, const N: usize> {
    // One independently locked LRU per shard; a key's shard is chosen by
    // `get_shard(key, N)`.
    lrus: [LruShard<V>; N],
}
92
93impl<V, const N: usize> ConcurrentLruCache<V, N>
94where
95    [LruShard<V>; N]: Default,
96{
97    pub fn new(shard_capacity: usize) -> Self {
98        use std::num::NonZeroUsize;
99        // safe, 1 != 0
100        const ONE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
101        let mut cache = ConcurrentLruCache {
102            lrus: Default::default(),
103        };
104        for lru in &mut cache.lrus {
105            lru.0
106                .write()
107                .resize(shard_capacity.try_into().unwrap_or(ONE));
108        }
109        cache
110    }
111    pub fn get(&self, key: u128) -> &RwLock<LruCache<u128, V>> {
112        &self.lrus[get_shard(key, N)].0
113    }
114
115    #[allow(dead_code)]
116    pub fn read(&self, key: u128) -> RwLockReadGuard<LruCache<u128, V>> {
117        self.get(key).read()
118    }
119
120    pub fn write(&self, key: u128) -> RwLockWriteGuard<LruCache<u128, V>> {
121        self.get(key).write()
122    }
123
124    // TODO: work out the lifetimes to provide get/set directly
125}