file_backed/lib.rs

use std::{
    ops::{Deref, DerefMut},
    sync::Arc,
};

use consume_on_drop::{Consume, ConsumeOnDrop};
use cutoff_list::CutoffList;
use parking_lot::RwLock;
use tokio::sync::{RwLockMappedWriteGuard, RwLockReadGuard};

use uuid::Uuid;

mod entries;

pub mod backing_store;
pub mod convenience;
#[cfg(feature = "fbstore")]
pub mod fbstore;

use self::backing_store::{Strategy, TrackedPath};
use self::entries::{FullEntry, LimitedEntry};

pub use self::backing_store::{BackingStore, BackingStoreT};

/// A handle to data managed by an `FBPool`.
///
/// This acts like a `Box` for potentially large data (`T`) that may reside in
/// memory (in an LRU cache), on disk (in the backing store), or both.
/// Accessing the underlying data requires calling one of the `load` methods.
///
/// When dropped:
/// - If the item was never persisted or written to the backing store's temporary
///   location, it is simply dropped.
/// - If the item exists in the backing store's temporary location, a background
///   task is spawned via the `BackingStore` to delete it using `BackingStoreT::delete`.
// Field ordering is important to remove entries from the cutoff list when
// the last reference to the entry is dropped
pub struct Fb<T, B: BackingStoreT> {
    entry: FullEntry<T, B>,
    inner: FbInner<T, B>,
}

struct FbInner<T, B: BackingStoreT> {
    index: cutoff_list::Index,
    pool: Arc<FBPool<T, B>>,
}

impl<T, B: BackingStoreT> Drop for FbInner<T, B> {
    fn drop(&mut self) {
        let mut write_guard = self.pool.entries.write();
        write_guard.remove(self.index).unwrap();
    }
}

/// Manages a pool of `Fb` instances, backed by an in-memory cache
/// and a `BackingStore` for disk persistence.
///
/// ## Caching Strategy (Segmented LRU Variant)
///
/// The pool utilizes a variation of an LRU (Least Recently Used) cache designed
/// to reduce overhead for frequently accessed items. The cache is conceptually
/// divided into two segments (e.g., a "front half" and a "back half", typically
/// split evenly based on the total `mem_size`).
///
/// - **Promotion:** Items loaded from the backing store (`Strategy::load`) or
///   accessed while residing in the "back half" of the cache are promoted
///   to the most recently used position (the front of the "front half").
/// - **No Movement:** Items accessed while *already* in the "front half" do
///   **not** change position. This avoids the overhead of cache entry shuffling
///   for frequently hit "hot" items that are already near the front.
/// - **Insertion:** New items inserted via [`FBPool::insert`] also enter the
///   front of the "front half".
/// - **Eviction:** Items are eventually evicted from the least recently used
///   end of the "back half" when the cache is full.
///
/// This approach aims to provide LRU-like behavior while optimizing for workloads
/// where a subset of items is accessed very frequently.
///
/// Internally, each item is stored in an `Option<T>`. When an item is evicted from the
/// cache, `Some(val)` is replaced with `None`. Note that `None::<T>` occupies exactly as
/// much stack space as `Some(val)`, so if `T`'s resources live primarily on the stack
/// (e.g. when `T` is `[f32; 4096]`), eviction frees essentially nothing. In such cases,
/// use `Box<T>` instead.
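///
/// # Example
///
/// A minimal sketch, not taken from this crate's tests: it assumes a hypothetical
/// `BackingStoreT` + `Strategy<Vec<f32>>` implementation `MyStore`, obtained from an
/// equally hypothetical helper `my_backing_store()`.
///
/// ```ignore
/// use std::sync::Arc;
///
/// let store: Arc<BackingStore<MyStore>> = my_backing_store();
/// // Keep at most 1024 items in memory, split 512/512 between the two cache segments.
/// let pool = Arc::new(FBPool::<Vec<f32>, MyStore>::new(store, 1024));
///
/// let fb = pool.insert(vec![0.0f32; 4096]);
/// // The value stays in the cache until evicted; loading yields a read guard.
/// let guard = fb.blocking_load();
/// assert_eq!(guard.len(), 4096);
/// ```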
pub struct FBPool<T, B: BackingStoreT> {
    entries: RwLock<CutoffList<LimitedEntry<T, B>>>,
    store: Arc<BackingStore<B>>,
}

impl<T, B: BackingStoreT> FBPool<T, B> {
    /// Creates a new pool managing items of type `T`.
    ///
    /// # Arguments
    /// * `store` - The configured `BackingStore` manager.
    /// * `mem_size` - The maximum number of items to keep loaded in the in-memory cache.
    ///   This size is divided internally (50/50) to implement the
    ///   two-segment caching strategy (see main [`FBPool`] documentation for details).
    pub fn new(store: Arc<BackingStore<B>>, mem_size: usize) -> Self {
        let entries = RwLock::new(CutoffList::new(vec![mem_size / 2, mem_size]));
        Self { entries, store }
    }

    /// Returns a reference to the underlying `BackingStore`.
    pub fn store(&self) -> &Arc<BackingStore<B>> {
        &self.store
    }

    /// Inserts new `data` into the pool, returning an `Fb` handle.
    ///
    /// The data is initially placed only in the in-memory LRU cache. It is written to the
    /// backing store's temporary location only if it is evicted from the cache or explicitly
    /// written via `persist`/`blocking_persist`/`spawn_write_now`/`blocking_write_now`.
    ///
    /// When the data is evicted from memory, it is first written to the backing store via
    /// `B::store` and then dropped normally, so any custom `Drop` implementation will run.
    /// This can happen again each time the data is loaded back into memory and subsequently
    /// evicted.
    pub fn insert(self: &Arc<Self>, data: T) -> Fb<T, B>
    where
        T: Send + Sync + 'static,
        B: Strategy<T>,
    {
        let entry = FullEntry::new(data);
        let mut guard = self.entries.write();
        let index = guard.insert_first(entry.limited());
        let dump_entry = guard.get(guard.following_ind(1));
        if let Some(entry) = dump_entry {
            entry.try_dump_to_disk(&self.store);
        }
        drop(guard);
        Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        }
    }

    /// Asynchronously registers an existing item from a persistent path into the pool.
    ///
    /// Creates an `Fb` handle for an item identified by `key` located at the
    /// tracked persistent `path`. This typically involves calling `BackingStoreT::register`
    /// (e.g., hard-linking the file into the managed temporary area).
    ///
    /// The item data is *not* loaded into memory by this call.
    /// Returns `None` if the registration fails (e.g., the underlying store fails to find the key).
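    ///
    /// # Example
    ///
    /// A sketch of re-attaching previously persisted data, run inside an async context.
    /// It assumes `fb` is an existing handle and `tracked` is an
    /// `Arc<TrackedPath<B::PersistPath>>` obtained from the backing store (its
    /// construction is store-specific and not shown here).
    ///
    /// ```ignore
    /// // Persist the data and remember its key.
    /// let key = fb.key();
    /// fb.blocking_persist(&tracked);
    /// drop(fb);
    ///
    /// // Later: create a new handle for the persisted data without loading it into memory.
    /// let fb = pool.register(&tracked, key).await.expect("key not found in store");
    /// ```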
    pub async fn register(
        self: &Arc<Self>,
        path: &Arc<TrackedPath<B::PersistPath>>,
        key: Uuid,
    ) -> Option<Fb<T, B>>
    where
        T: Send + Sync + 'static,
    {
        let entry = FullEntry::register(key, &self.store, path).await?;
        let index = self.entries.write().insert_last(entry.limited());
        Some(Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        })
    }

    /// Blocking version of `register`. Waits for the registration to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_register(
        self: &Arc<Self>,
        path: &TrackedPath<B::PersistPath>,
        key: Uuid,
    ) -> Option<Fb<T, B>> {
        let entry = FullEntry::blocking_register(key, &self.store, path)?;
        let index = self.entries.write().insert_last(entry.limited());
        Some(Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        })
    }

    /// Returns the current number of items managed by the pool (both in memory and on disk).
    pub fn size(&self) -> usize {
        self.entries.read().len()
    }
}

impl<T, B: BackingStoreT> Fb<T, B> {
    /// Returns the unique identifier (`Uuid`) for the data associated with this handle.
    /// This key changes whenever the data is mutated (e.g. via `load_mut`, `try_load_mut`,
    /// or `make_mut`).
    pub fn key(&self) -> Uuid {
        self.entry.key()
    }

    /// Returns a reference to the `FBPool` this `Fb` belongs to.
    pub fn pool(&self) -> &Arc<FBPool<T, B>> {
        &self.inner.pool
    }
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Fb<T, B> {
    /// Asynchronously loads the data and returns a read guard.
    ///
    /// Returns a `Future` that resolves to a `ReadGuard` once the data is available
    /// in memory (either immediately or after loading from the backing store).
    /// Suitable for use within `async` functions and tasks.
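    ///
    /// # Example
    ///
    /// A sketch, assuming `fb` is an `Fb<Vec<f32>, B>` obtained from a pool:
    ///
    /// ```ignore
    /// // Inside an async task: await the guard, then read through it.
    /// let guard = fb.load().await;
    /// let sum: f32 = guard.iter().sum();
    /// // The data stays in memory at least until `guard` is dropped.
    /// drop(guard);
    /// println!("sum = {sum}");
    /// ```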
    pub async fn load(&self) -> ReadGuard<T, B> {
        // We do this _before_ loading the backing value so that if the caller
        // cancels the operation, we don't waste the work done to load it by
        // immediately dumping it back to disk.
        shift_forward(&self.inner.pool, self.inner.index);
        // Construct before loading so that if cancelled, the object will be dumped
        // if necessary (possible if a lot of things are loaded simultaneously).
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load(&self.inner.pool.store).await;
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Attempts to load the data and return a read guard, returning `None` if the data is
    /// not already in memory or is currently being evicted.
    /// The entry is only (potentially) shifted in the LRU cache on success.
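    ///
    /// # Example
    ///
    /// A sketch, assuming `fb` is an `Fb<Vec<f32>, B>`:
    ///
    /// ```ignore
    /// // Non-blocking: succeeds only if the value is already resident in memory.
    /// if let Some(guard) = fb.try_load() {
    ///     println!("already cached, {} elements", guard.len());
    /// }
    /// ```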
    pub fn try_load(&self) -> Option<ReadGuard<T, B>> {
        let guard = self.entry.try_load()?;
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        Some(ReadGuard {
            data_guard: guard,
            _on_drop: on_drop,
        })
    }

    /// Loads the data and returns a read guard, performing blocking I/O if necessary.
    ///
    /// - If the data is already in the memory cache, returns immediately.
    /// - If the data is not in memory, it performs a blocking load operation via
    ///   `Strategy::load`.
    ///
    /// This method should only be called from a context where blocking is acceptable
    /// (e.g., outside a Tokio runtime, or within `spawn_blocking` or `block_in_place`).
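    ///
    /// # Example
    ///
    /// A sketch, assuming `fb` is an `Fb<Vec<f32>, B>` used from non-async code:
    ///
    /// ```ignore
    /// // Blocks the current thread while the value is read back from the backing store.
    /// let guard = fb.blocking_load();
    /// assert!(!guard.is_empty());
    /// ```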
    pub fn blocking_load(&self) -> ReadGuard<T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.blocking_load(&self.inner.pool.store);
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Loads the data and returns a read guard for immutable access.
    ///
    /// - If the data is already in the memory cache, returns immediately.
    /// - If the data is not in memory, it uses `tokio::task::block_in_place` to
    ///   call `blocking_load` to load it from the backing store.
    ///
    /// This is for the somewhat niche situation where you need to load an `Fb` in a
    /// blocking function nested many blocking calls deep within an async task running
    /// on a Tokio multi-threaded runtime. Ideally you would propagate `async` down and
    /// use `load` instead.
    ///
    /// # Panics
    /// This method panics if called from within a current-thread Tokio runtime (e.g. one
    /// built with `tokio::runtime::Builder::new_current_thread`), as `block_in_place` is
    /// not supported there. Use `load` in async contexts and `blocking_load` in known
    /// blocking contexts.
    pub fn load_in_place(&self) -> ReadGuard<T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load_in_place(&self.inner.pool.store);
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Asynchronously acquires mutable access to the data.
    ///
    /// On return:
    /// 1. The data is ensured to be in memory.
    /// 2. The corresponding file in the backing store's temporary location (if any) is deleted.
    /// 3. The internal `Uuid` key for this data is changed.
    /// 4. A `WriteGuard` providing mutable access is returned.
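    ///
    /// # Example
    ///
    /// A sketch, assuming `fb` is a mutable `Fb<Vec<f32>, B>` inside an async task:
    ///
    /// ```ignore
    /// let key_before = fb.key();
    /// {
    ///     let mut guard = fb.load_mut().await;
    ///     guard.push(1.0);
    /// } // dropping the guard releases write access
    /// // Mutation invalidates any copy in the backing store, so the key changes.
    /// assert_ne!(fb.key(), key_before);
    /// ```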
    pub async fn load_mut(&mut self) -> WriteGuard<T, B> {
        // We do this _before_ loading the backing value so that if the caller
        // cancels the operation, we don't waste the work done to load it by
        // immediately dumping it back to disk.
        shift_forward(&self.inner.pool, self.inner.index);
        // Construct before loading so that if cancelled, the object will be dumped
        // if necessary (possible if a lot of things are loaded simultaneously).
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load_mut(&self.inner.pool.store).await;
        WriteGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Attempts to load the data and return a write guard, returning `None` if the data is
    /// not already in memory or is currently being evicted.
    /// The entry is only (potentially) shifted in the LRU cache on success.
    pub fn try_load_mut(&mut self) -> Option<WriteGuard<T, B>> {
        let guard = self.entry.try_load_mut()?;
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        Some(WriteGuard {
            data_guard: guard,
            _on_drop: on_drop,
        })
    }

    /// Blocking version of `load_mut`. Waits for the operation to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_load_mut(&mut self) -> WriteGuard<T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.blocking_load_mut(&self.inner.pool.store);
        WriteGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Performs a blocking write of the data to the backing store's temporary location
    /// if it isn't already there. Waits for the write to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_write_now(&self) {
        self.entry.blocking_write_now(&self.inner.pool.store);
    }

    /// Performs a blocking persistence of the data to the specified `TrackedPath`.
    /// Waits for the operation (including any preliminary writes) to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_persist(&self, path: &TrackedPath<B::PersistPath>) {
        self.entry.blocking_persist(&self.inner.pool.store, path)
    }
}

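// Promotes the entry at `index` to the front of the cache unless it is already in the
// front segment. The position is checked under the read lock first; only if a shift is
// needed is the read lock dropped, the check repeated under the write lock, and the entry
// moved. If the entry was beyond the last cutoff, promoting it pushes a different entry
// past that cutoff, and that entry is then asked to dump its data to the backing store.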
fn shift_forward<T: Send + Sync + 'static, B: Strategy<T>>(
    pool: &FBPool<T, B>,
    index: cutoff_list::Index,
) {
    let read_guard = pool.entries.read();
    let preceding_cutoffs = read_guard.preceding_cutoffs(index).unwrap();
    if preceding_cutoffs == 0 {
        return;
    }
    drop(read_guard);
    let mut write_guard = pool.entries.write();
    let preceding_cutoffs = write_guard.preceding_cutoffs(index).unwrap();
    if preceding_cutoffs == 0 {
        return;
    }
    write_guard.shift_to_front(index);
    if preceding_cutoffs == 1 {
        return;
    }
    assert!(preceding_cutoffs == 2);
    let read_guard = parking_lot::RwLockWriteGuard::downgrade(write_guard);
    let dump_entry = read_guard.get(read_guard.following_ind(1)).unwrap();
    dump_entry.try_dump_to_disk(&pool.store);
}

/// An RAII guard providing immutable access (`Deref`) to the underlying data `T`.
///
/// While this guard is alive, the data is guaranteed to remain loaded in memory
/// and will not be evicted even if it leaves the LRU cache (eviction is retried
/// once the guard is dropped).
// Field ordering is important for try_dump_to_disk to succeed on drop
pub struct ReadGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    data_guard: RwLockReadGuard<'a, T>,
    _on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Deref for ReadGuard<'_, T, B> {
    type Target = T;

    /// Dereferences to the immutable underlying data `T`.
    fn deref(&self) -> &Self::Target {
        &self.data_guard
    }
}

/// An RAII guard providing mutable access (`DerefMut`) to the underlying data `T`.
///
/// While this guard is alive, the data is guaranteed to remain loaded in memory.
// Field ordering is important for try_dump_to_disk to succeed on drop
pub struct WriteGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    data_guard: RwLockMappedWriteGuard<'a, T>,
    _on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Deref for WriteGuard<'_, T, B> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.data_guard
    }
}

impl<T: Send + Sync + 'static, B: Strategy<T>> DerefMut for WriteGuard<'_, T, B> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data_guard
    }
}

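// Runs when a `ReadGuard`/`WriteGuard` is dropped: if the guarded entry has meanwhile
// fallen past the last cutoff of the cache, its data is written out to the backing store
// now that this guard is no longer pinning it in memory.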
struct GuardDropper<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    pool: &'a FBPool<T, B>,
    index: cutoff_list::Index,
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Consume for GuardDropper<'_, T, B> {
    fn consume(self) {
        let entry_guard = self.pool.entries.read();
        let preceding_cutoffs = entry_guard.preceding_cutoffs(self.index).unwrap();
        assert!(preceding_cutoffs <= 2);
        if preceding_cutoffs == 2 {
            entry_guard
                .get(self.index)
                .unwrap()
                .try_dump_to_disk(&self.pool.store);
        }
    }
}

impl<'a, T: Send + Sync + 'static, B: Strategy<T>> GuardDropper<'a, T, B> {
    pub fn new(pool: &'a FBPool<T, B>, index: cutoff_list::Index) -> ConsumeOnDrop<Self> {
        ConsumeOnDrop::new(GuardDropper { pool, index })
    }
}