fuser_async/
cache.rs

//! Caching utilities

use std::hash::Hasher;
use std::sync::atomic::AtomicU64;

use rustc_hash::FxHasher;
use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
use tracing::*;

#[derive(Debug, thiserror::Error)]
pub enum CacheError<E> {
    #[error("Cache with too few blocks ({0})")]
    TooFewBlocks(usize),
    #[error("Position out of bounds")]
    OutOfBounds,
    #[error("Unexpected empty block")]
    UnexpectedEmpty,
    #[error("Block insertion error: {0}")]
    Insert(E),
}

/// Cached block
pub struct Block {
    /// Block contents.
    pub data: bytes::BytesMut,
    /// Position of the block in the underlying data, when tracked by the cache implementation.
    pub pos: Option<u64>,
}
/// Block cache trait
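///
/// A minimal usage sketch; `fetch_block` and `FetchError` are illustrative placeholders for the
/// caller's block-fetching future and error type, not part of this crate:
///
/// ```ignore
/// async fn read_block<C>(cache: &C, pos: u64) -> Result<Vec<u8>, CacheError<FetchError>>
/// where
///     C: DataBlockCache<FetchError>,
/// {
///     // Fast path: the block is already cached.
///     if let Some(block) = cache.get(pos).await {
///         return Ok(block.data.to_vec());
///     }
///     // Slow path: fetch the block and insert it, holding the slot's write lock meanwhile.
///     let block = cache.insert_lock(pos, fetch_block(pos)).await?;
///     Ok(block.data.to_vec())
/// }
/// ```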
#[async_trait::async_trait]
pub trait DataBlockCache<InsertError>: Send + Sync + std::fmt::Display + Sized + 'static {
    /// Create a new block cache with the given maximum capacity (in MB), block size (in bytes),
    /// and total size of the underlying data (in bytes).
    fn new(
        capacity_mb: u64,
        block_size: u64,
        total_size: u64,
    ) -> Result<Self, CacheError<InsertError>>;
    /// Get the block at a position.
    async fn get(&self, pos: u64) -> Option<RwLockReadGuard<Block>>;
    /// Insert the block produced by `f` at a position, holding the block's lock until the output
    /// is ready.
    async fn insert_lock<A: AsRef<[u8]> + Send>(
        &self,
        pos: u64,
        f: impl std::future::Future<Output = Result<A, InsertError>> + Send,
    ) -> Result<RwLockReadGuard<Block>, CacheError<InsertError>>;
    /// Cache statistics.
    fn stats(&self) -> &CacheStats;
}

/// Cache statistics
#[derive(Default)]
pub struct CacheStats {
    pub capacity_blocks: usize,
    pub capacity_mb: u64,
    pub block_size: u64,
    pub hits: AtomicU64,
    pub misses: AtomicU64,
}
impl std::fmt::Display for CacheStats {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let hits = self.hits.load(std::sync::atomic::Ordering::SeqCst);
        let misses = self.misses.load(std::sync::atomic::Ordering::SeqCst);
        // Discount the unavoidable cold misses (at most one per cache slot) to also report the
        // hit rate excluding capacity effects.
        let misses_excluding = misses
            .checked_sub(self.capacity_blocks as u64)
            .unwrap_or_default();
        write!(
            f,
            "Cache of capacity {} blocks of {:.2} MB (total {} MB). {} hits, {} hits excluding capacity",
            self.capacity_blocks,
            self.block_size as f64 / 1e6,
            self.capacity_mb,
            crate::utils::OutOf::new(hits, hits + misses).display_full(),
            crate::utils::OutOf::new(hits, hits + misses_excluding).display_full(),
        )
    }
}

impl CacheStats {
    pub fn add_hit(&self) {
        self.hits.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    }
    pub fn add_miss(&self) {
        self.misses
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    }
    /// Undo a miss previously recorded by `get`, e.g. when `insert_lock` finds the block already
    /// cached.
    pub fn remove_miss(&self) {
        self.misses
            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
    }
}

/// Least Recently Used (LRU) cache, with one slot for every possible block, each protected by a
/// [`RwLock`].
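///
/// Construction sketch (capacity figures and the error type are illustrative; the cache is
/// generic over the caller's insertion error):
///
/// ```ignore
/// // Roughly 64 MB of 1 MiB blocks over a 1 GiB file.
/// let cache = <LRUCache as DataBlockCache<std::io::Error>>::new(64, 1 << 20, 1 << 30)?;
/// ```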
pub struct LRUCache {
    /// Recency order of the cached block positions.
    lru: Mutex<lru::LruCache<u64, ()>>,
    /// One slot per block of the underlying data; `None` until the block is cached.
    data: Vec<RwLock<Option<Block>>>,
    stats: CacheStats,
}
impl std::fmt::Display for LRUCache {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.stats)
    }
}

#[async_trait::async_trait]
impl<E> DataBlockCache<E> for LRUCache {
    fn new(capacity_mb: u64, block_size: u64, total_size: u64) -> Result<Self, CacheError<E>> {
        let capacity_blocks = (capacity_mb as f64 / block_size as f64 * 1e6).ceil() as usize;
        let total_blocks = (total_size as f64 / block_size as f64).ceil() as usize;
        debug!(
            capacity_blocks,
            capacity_mb, block_size, "Allocating LRU cache"
        );
        if capacity_blocks < 4 {
            return Err(CacheError::TooFewBlocks(capacity_blocks));
        }

        Ok(Self {
            stats: CacheStats {
                capacity_blocks,
                capacity_mb,
                block_size,
                ..Default::default()
            },
            // One (initially empty) slot per block of the underlying data.
            data: (0..total_blocks).map(|_| None).map(RwLock::new).collect(),
            lru: Mutex::new(lru::LruCache::<u64, ()>::unbounded()),
        })
    }
    async fn get(&self, pos: u64) -> Option<RwLockReadGuard<Block>> {
        assert!((pos as usize) < self.data.len());
        {
            // Promote the position in the recency order (a no-op if it is not cached).
            let mut lru = self.lru.lock().await;
            lru.get(&pos);
        }
        let data = self.data[pos as usize].read().await;
        if data.is_some() {
            self.stats.add_hit();
            return Some(RwLockReadGuard::map(data, |x| x.as_ref().unwrap()));
        }
        self.stats.add_miss();
        None
    }
    async fn insert_lock<A: AsRef<[u8]> + Send>(
        &self,
        pos: u64,
        f: impl std::future::Future<Output = Result<A, E>> + Send,
    ) -> Result<RwLockReadGuard<Block>, CacheError<E>> {
        if (pos as usize) >= self.data.len() {
            return Err(CacheError::OutOfBounds);
        }
        let mut block = self.data[pos as usize].write().await;
        if block.is_some() {
            // `get` already counted a miss for this position; undo it.
            self.stats.remove_miss();
            return Ok(RwLockReadGuard::map(block.downgrade(), |f| {
                f.as_ref().unwrap()
            }));
        }

        let buf = f.await.map_err(CacheError::Insert)?;
        let buf = buf.as_ref();

        let mut lru = self.lru.lock().await;
        if lru.len() < self.stats.capacity_blocks {
            // Below capacity: allocate a fresh buffer for this block.
            lru.put(pos, ());
            let mut data = bytes::BytesMut::with_capacity(self.stats.block_size as usize);
            data.resize(buf.len(), 0);
            data.copy_from_slice(buf);
            *block = Some(Block { data, pos: None });
        } else {
            // At capacity: evict the least recently used block and reuse its buffer.
            let (old, _) = lru.pop_lru().unwrap();
            let mut block_old = self.data[old as usize].write().await;
            lru.put(pos, ());
            let mut block_old = block_old.take().ok_or(CacheError::UnexpectedEmpty)?;
            block_old.data.clear();
            block_old.data.resize(buf.len(), 0);
            block_old.data.copy_from_slice(buf);

            *block = Some(block_old);
        }
        Ok(RwLockReadGuard::map(block.downgrade(), |f| {
            f.as_ref().unwrap()
        }))
    }
    fn stats(&self) -> &CacheStats {
        &self.stats
    }
}

/// Simple cache implemented with a [`Vec<RwLock<Block>>`], with the slot determined by the
/// position (optionally hashed) modulo the capacity.
///
/// Prone to collisions.
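///
/// Slot-collision sketch, assuming a cache with 4 slots, hashing disabled, and an illustrative
/// `fetch_block` future:
///
/// ```ignore
/// // Positions 1 and 5 map to the same slot: 1 % 4 == 5 % 4.
/// let a = cache.insert_lock(1, fetch_block(1)).await?;
/// drop(a);
/// let b = cache.insert_lock(5, fetch_block(5)).await?; // overwrites block 1
/// drop(b);
/// assert!(cache.get(1).await.is_none()); // block 1 was evicted by the collision
/// ```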
pub struct IndexCache {
    data: Vec<RwLock<Block>>,
    /// Number of insertions that overwrote a block cached at a different position.
    collisions: AtomicU64,
    pub stats: CacheStats,
    /// Whether to hash positions before reducing them modulo the capacity.
    use_hash: bool,
}

impl IndexCache {
    /// Map a block position to a slot index: either the position itself or its hash, reduced
    /// modulo the number of slots.
    fn key(&self, pos: u64) -> usize {
        if self.use_hash {
            let mut hasher = FxHasher::default();
            hasher.write_u64(pos);
            hasher.finish() as usize % self.stats.capacity_blocks
        } else {
            pos as usize % self.stats.capacity_blocks
        }
    }
}
impl std::fmt::Display for IndexCache {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let collisions = self.collisions.load(std::sync::atomic::Ordering::SeqCst);
        write!(f, "{}, {} collisions", self.stats, collisions)
    }
}
impl std::fmt::Debug for IndexCache {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}

#[async_trait::async_trait]
impl<E> DataBlockCache<E> for IndexCache {
    fn new(capacity_mb: u64, block_size: u64, _total_size: u64) -> Result<Self, CacheError<E>> {
        let capacity_blocks = (capacity_mb as f64 / block_size as f64 * 1e6).ceil() as usize;
        debug!(capacity_blocks, capacity_mb, "Allocating index cache");
        if capacity_blocks < 4 {
            return Err(CacheError::TooFewBlocks(capacity_blocks));
        }

        Ok(Self {
            stats: CacheStats {
                capacity_mb,
                capacity_blocks,
                block_size,
                ..Default::default()
            },
            // Pre-allocate one zeroed buffer per slot.
            data: (0..capacity_blocks)
                .map(|_| bytes::BytesMut::zeroed(block_size as usize))
                .map(|data| Block { pos: None, data })
                .map(RwLock::new)
                .collect(),
            collisions: Default::default(),
            use_hash: false,
        })
    }
    fn stats(&self) -> &CacheStats {
        &self.stats
    }
    async fn get(&self, pos: u64) -> Option<RwLockReadGuard<Block>> {
        let data = self.data[self.key(pos)].read().await;
        if data.pos == Some(pos) {
            self.stats.add_hit();
            return Some(data);
        }
        self.stats.add_miss();
        None
    }
    async fn insert_lock<A: AsRef<[u8]> + Send>(
        &self,
        pos: u64,
        f: impl std::future::Future<Output = Result<A, E>> + Send,
    ) -> Result<RwLockReadGuard<Block>, CacheError<E>> {
        let key = self.key(pos);
        let mut block = self.data[key].write().await;
        match block.pos {
            Some(pos2) if pos2 == pos => {
                // `get` already counted a miss for this position; undo it.
                self.stats.remove_miss();
                return Ok(block.downgrade());
            }
            Some(_) => {
                // The slot holds a block from a different position; overwrite it.
                self.collisions
                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            }
            None => {}
        }

        // Invalidate the slot before awaiting, so a failed insertion cannot leave stale data
        // attributed to the new position.
        block.pos = None;
        let buf = f.await.map_err(CacheError::Insert)?;
        let buf = buf.as_ref();
        block.data.resize(buf.len(), 0);
        block.data.copy_from_slice(buf);
        block.pos = Some(pos);
        Ok(block.downgrade())
    }
}