file_backed/lib.rs
use std::{
    ops::{Deref, DerefMut},
    sync::Arc,
};

use consume_on_drop::{Consume, ConsumeOnDrop};
use cutoff_list::CutoffList;
use parking_lot::RwLock;
use tokio::sync::{RwLockMappedWriteGuard, RwLockReadGuard};

use uuid::Uuid;

mod entries;

pub mod backing_store;
pub mod convenience;
#[cfg(feature = "fbstore")]
pub mod fbstore;

use self::backing_store::{Strategy, TrackedPath};
use self::entries::{FullEntry, LimitedEntry};

pub use self::backing_store::{BackingStore, BackingStoreT};

/// A handle to data managed by an `FBPool`.
///
/// This acts like a `Box`, but for potentially large data (`T`) that might
/// reside in memory (in an LRU cache), on disk (in the backing store), or
/// both.
/// Accessing the underlying data requires calling one of the `load` methods.
///
/// When dropped:
/// - If the item was never persisted or written to the backing store's temporary
///   location, it is simply dropped.
/// - If the item exists in the backing store's temporary location, a background
///   task is spawned via the `BackingStore` to delete it using `BackingStoreT::delete`.
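///
/// A minimal usage sketch (not a compiled doctest); `MyStore` is a hypothetical
/// type implementing `BackingStoreT` and `Strategy<Vec<u8>>`, and `pool` is an
/// `Arc<FBPool<Vec<u8>, MyStore>>`:
///
/// ```ignore
/// let fb: Fb<Vec<u8>, MyStore> = pool.insert(vec![0u8; 1024]);
/// let guard = fb.load().await; // reloads from the backing store if evicted
/// assert_eq!(guard.len(), 1024);
/// drop(guard); // the data may now be evicted again under memory pressure
/// ```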
// Field ordering is important to remove entries from the cutoff list when
// the last reference to the entry is dropped
pub struct Fb<T, B: BackingStoreT> {
    entry: FullEntry<T, B>,
    inner: FbInner<T, B>,
}

struct FbInner<T, B: BackingStoreT> {
    index: cutoff_list::Index,
    pool: Arc<FBPool<T, B>>,
}

impl<T, B: BackingStoreT> Drop for FbInner<T, B> {
    fn drop(&mut self) {
        let mut write_guard = self.pool.entries.write();
        write_guard.remove(self.index).unwrap();
    }
}

/// Manages a pool of `Fb` instances, backed by an in-memory cache
/// and a `BackingStore` for disk persistence.
///
/// ## Caching Strategy (Segmented LRU Variant)
///
/// The pool utilizes a variation of an LRU (Least Recently Used) cache designed
/// to reduce overhead for frequently accessed items. The cache is conceptually
/// divided into two segments (e.g., a "front half" and a "back half", typically
/// split evenly based on the total `mem_size`).
///
/// - **Promotion:** Items loaded from the backing store (`Strategy::load`) or
///   accessed while residing in the "back half" of the cache are promoted
///   to the most recently used position (the front of the "front half").
/// - **No Movement:** Items accessed while *already* in the "front half" do
///   **not** change position. This avoids the overhead of cache entry shuffling
///   for frequently hit "hot" items that are already near the front.
/// - **Insertion:** New items inserted via [`FBPool::insert`] also enter the
///   front of the "front half".
/// - **Eviction:** Items are eventually evicted from the least recently used
///   end of the "back half" when the cache is full.
///
/// This approach aims to provide LRU-like behavior while optimizing for workloads
/// where a subset of items is accessed very frequently.
///
/// Internally, each item is stored in an `Option<T>`. When an item is evicted from the
/// cache, `Some(val)` is replaced with `None`. Note that `None::<T>` occupies exactly as
/// much space on the stack as `Some(val)`. This means that if `T`'s resources are
/// primarily represented by its space on the stack (e.g. when `T` is `[f32; 4096]`),
/// evicting it from the cache saves nothing; in such cases, use `Box<T>` instead.
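///
/// A construction sketch (not a compiled doctest); `MyStore` stands in for a concrete
/// `BackingStoreT` implementation and `make_store()` is a placeholder:
///
/// ```ignore
/// // Keep up to 1024 items resident; internally split into two 512-item segments.
/// let store: Arc<BackingStore<MyStore>> = make_store();
/// let pool = Arc::new(FBPool::<Vec<u8>, MyStore>::new(store, 1024));
/// ```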
pub struct FBPool<T, B: BackingStoreT> {
    entries: RwLock<CutoffList<LimitedEntry<T, B>>>,
    store: Arc<BackingStore<B>>,
}

impl<T, B: BackingStoreT> FBPool<T, B> {
    /// Creates a new pool managing items of type `T`.
    ///
    /// # Arguments
    /// * `store` - The configured `BackingStore` manager.
    /// * `mem_size` - The maximum number of items to keep loaded in the in-memory cache.
    ///   This size is divided internally (50/50) to implement the
    ///   two-segment caching strategy (see the main [`FBPool`] documentation for details).
    pub fn new(store: Arc<BackingStore<B>>, mem_size: usize) -> Self {
        let entries = RwLock::new(CutoffList::new(vec![mem_size / 2, mem_size]));
        Self { entries, store }
    }

    /// Returns a reference to the underlying `BackingStore`.
    pub fn store(&self) -> &Arc<BackingStore<B>> {
        &self.store
    }

    /// Inserts new `data` into the pool, returning an `Fb` handle.
    ///
    /// The data is initially placed only in the in-memory LRU cache. It will only be
    /// written to the backing store's temporary location if it is evicted from the cache
    /// or explicitly written via `persist`/`blocking_persist`/`spawn_write_now`/`blocking_write_now`.
    ///
    /// Whenever the data is evicted from memory, after being written to the backing store
    /// with `B::store`, the data is dropped normally, so any custom `Drop` implementation
    /// will run. Each time the data is loaded back into memory, this can happen again if
    /// the data is evicted again.
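    ///
    /// A sketch (not a compiled doctest), assuming `pool: Arc<FBPool<Vec<u8>, MyStore>>`
    /// for some hypothetical `MyStore` implementing the required traits:
    ///
    /// ```ignore
    /// let fb = pool.insert(vec![1u8, 2, 3]);
    /// // Nothing has been written to disk yet; the data lives only in the cache.
    /// assert_eq!(*fb.blocking_load(), vec![1u8, 2, 3]);
    /// ```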
    pub fn insert(self: &Arc<Self>, data: T) -> Fb<T, B>
    where
        T: Send + Sync + 'static,
        B: Strategy<T>,
    {
        let entry = FullEntry::new(data);
        let mut guard = self.entries.write();
        let index = guard.insert_first(entry.limited());
        let dump_entry = guard.get(guard.following_ind(1));
        if let Some(entry) = dump_entry {
            entry.try_dump_to_disk(&self.store);
        }
        drop(guard);
        Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        }
    }

    /// Asynchronously registers an existing item from a persistent path into the pool.
    ///
    /// Creates an `Fb` handle for an item identified by `key` located at the
    /// tracked persistent `path`. This typically involves calling `BackingStoreT::register`
    /// (e.g., hard-linking the file into the managed temporary area).
    ///
    /// The item data is *not* loaded into memory by this call.
    /// Returns `None` if the registration fails (e.g., the underlying store fails to find the key).
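    ///
    /// A sketch (not a compiled doctest); `persist_path` is a hypothetical
    /// `Arc<TrackedPath<_>>` that the item was previously persisted to, and `key` was
    /// recorded from `Fb::key` at that time:
    ///
    /// ```ignore
    /// if let Some(fb) = pool.register(&persist_path, key).await {
    ///     let data = fb.load().await; // only now is the data read into memory
    /// }
    /// ```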
    pub async fn register(
        self: &Arc<Self>,
        path: &Arc<TrackedPath<B::PersistPath>>,
        key: Uuid,
    ) -> Option<Fb<T, B>>
    where
        T: Send + Sync + 'static,
    {
        let entry = FullEntry::register(key, &self.store, path).await?;
        let index = self.entries.write().insert_last(entry.limited());
        Some(Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        })
    }

    /// Blocking version of `register`. Waits for the registration to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_register(
        self: &Arc<Self>,
        path: &TrackedPath<B::PersistPath>,
        key: Uuid,
    ) -> Option<Fb<T, B>> {
        let entry = FullEntry::blocking_register(key, &self.store, path)?;
        let index = self.entries.write().insert_last(entry.limited());
        Some(Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        })
    }

    /// Returns the current number of items managed by the pool (both in memory and on disk).
    pub fn size(&self) -> usize {
        self.entries.read().len()
    }
}

impl<T, B: BackingStoreT> Fb<T, B> {
    /// Returns the unique identifier (`Uuid`) for the data associated with this handle.
    /// This key will change if the data is mutated, e.g. via `load_mut`, `try_load_mut`, or `make_mut`.
    pub fn key(&self) -> Uuid {
        self.entry.key()
    }

    /// Returns a reference to the `FBPool` this `Fb` belongs to.
    pub fn pool(&self) -> &Arc<FBPool<T, B>> {
        &self.inner.pool
    }
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Fb<T, B> {
    /// Asynchronously loads the data and returns a read guard.
    ///
    /// Returns a `Future` that resolves to a `ReadGuard` once the data is available
    /// in memory (either immediately or after loading from the backing store).
    /// Suitable for use within `async` functions and tasks.
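    ///
    /// A usage sketch (not a compiled doctest), assuming `fb: Fb<Vec<u8>, MyStore>` for a
    /// hypothetical `MyStore`:
    ///
    /// ```ignore
    /// let guard = fb.load().await;
    /// println!("{} bytes", guard.len()); // the guard derefs to `T`
    /// // The data stays in memory at least until `guard` is dropped.
    /// ```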
    pub async fn load(&self) -> ReadGuard<T, B> {
        // We do this _before_ loading the backing value so that if the caller
        // cancels the operation, we don't waste the work done to load it by
        // immediately dumping it back to disk.
        shift_forward(&self.inner.pool, self.inner.index);
        // Construct before loading so that if cancelled, the object will be dumped
        // if necessary (possible if a lot of things are loaded simultaneously).
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load(&self.inner.pool.store).await;
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Attempts to load the data and return a read guard, returning `None` if the data is not
    /// already in memory or is currently being evicted.
    /// The entry's position in the LRU cache is only (potentially) updated on success.
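    ///
    /// A sketch (not a compiled doctest) of falling back to an async load when the data
    /// is not resident; `use_data` is a placeholder:
    ///
    /// ```ignore
    /// match fb.try_load() {
    ///     Some(guard) => use_data(&*guard),    // already in memory, no I/O
    ///     None => use_data(&*fb.load().await), // needs to be loaded from the store
    /// }
    /// ```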
    pub fn try_load(&self) -> Option<ReadGuard<T, B>> {
        let guard = self.entry.try_load()?;
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        Some(ReadGuard {
            data_guard: guard,
            _on_drop: on_drop,
        })
    }

    /// Loads the data and returns a read guard, performing blocking I/O if necessary.
    ///
    /// - If the data is already in the memory cache, returns immediately.
    /// - If the data is not in memory, it performs a blocking load operation via
    ///   `Strategy::load`.
    ///
    /// This method should only be called from a context where blocking is acceptable
    /// (e.g., outside a Tokio runtime, or within `spawn_blocking` or `block_in_place`).
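    ///
    /// A sketch (not a compiled doctest) of use from a dedicated blocking task;
    /// `big_value` and `process` are placeholders:
    ///
    /// ```ignore
    /// let fb = Arc::new(pool.insert(big_value));
    /// let fb2 = Arc::clone(&fb);
    /// tokio::task::spawn_blocking(move || {
    ///     let guard = fb2.blocking_load(); // may block on disk I/O
    ///     process(&*guard);
    /// });
    /// ```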
    pub fn blocking_load(&self) -> ReadGuard<T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.blocking_load(&self.inner.pool.store);
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Loads the data and returns a read guard for immutable access.
    ///
    /// - If the data is already in the memory cache, returns immediately.
    /// - If the data is not in memory, it uses `tokio::task::block_in_place` to
    ///   call `blocking_load` to load it from the backing store.
    ///
    /// This is for the somewhat niche situation where you need to load an `Fb` in a
    /// blocking function nested many blocking calls deep within an async task running
    /// on a tokio multi-threaded runtime. Ideally you would propagate async down and use
    /// `load` instead.
    ///
    /// # Panics
    /// This method will panic if called from within a current-thread tokio runtime
    /// (built with `tokio::runtime::Builder::new_current_thread`), as `block_in_place`
    /// is not supported there. Use `load` instead in async contexts and
    /// `blocking_load` in known blocking contexts.
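    ///
    /// A sketch (not a compiled doctest) of the nested-blocking situation described above,
    /// assuming a hypothetical `MyStore`:
    ///
    /// ```ignore
    /// fn deeply_nested_sync_helper(fb: &Fb<Vec<u8>, MyStore>) -> usize {
    ///     // Called (indirectly) from an async task on a multi-threaded runtime.
    ///     let guard = fb.load_in_place(); // uses block_in_place internally
    ///     guard.len()
    /// }
    /// ```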
    pub fn load_in_place(&self) -> ReadGuard<T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load_in_place(&self.inner.pool.store);
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Asynchronously acquires mutable access to the data.
    ///
    /// On return:
    /// 1. The data is ensured to be in memory.
    /// 2. The corresponding file in the backing store's temporary location (if any) is deleted.
    /// 3. The internal `Uuid` key for this data is changed.
    /// 4. A `WriteGuard` providing mutable access is returned.
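    ///
    /// A sketch (not a compiled doctest), assuming a mutable `fb: Fb<Vec<u8>, MyStore>`
    /// for a hypothetical `MyStore`:
    ///
    /// ```ignore
    /// let old_key = fb.key();
    /// {
    ///     let mut guard = fb.load_mut().await;
    ///     guard.push(42);
    /// }
    /// assert_ne!(fb.key(), old_key); // mutable access invalidates the old key
    /// ```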
    pub async fn load_mut(&mut self) -> WriteGuard<T, B> {
        // We do this _before_ loading the backing value so that if the caller
        // cancels the operation, we don't waste the work done to load it by
        // immediately dumping it back to disk.
        shift_forward(&self.inner.pool, self.inner.index);
        // Construct before loading so that if cancelled, the object will be dumped
        // if necessary (possible if a lot of things are loaded simultaneously).
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load_mut(&self.inner.pool.store).await;
        WriteGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Attempts to load the data and return a write guard, returning `None` if the data is not
    /// already in memory or is currently being evicted.
    /// The entry's position in the LRU cache is only (potentially) updated on success.
    pub fn try_load_mut(&mut self) -> Option<WriteGuard<T, B>> {
        let guard = self.entry.try_load_mut()?;
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        Some(WriteGuard {
            data_guard: guard,
            _on_drop: on_drop,
        })
    }

    /// Blocking version of `load_mut`. Waits for the operation to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_load_mut(&mut self) -> WriteGuard<T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.blocking_load_mut(&self.inner.pool.store);
        WriteGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Performs a blocking write of the data to the backing store's temporary location
    /// if it isn't already there. Waits for the write to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_write_now(&self) {
        self.entry.blocking_write_now(&self.inner.pool.store);
    }

    /// Performs a blocking persistence of the data to the specified `TrackedPath`.
    /// Waits for the operation (including any preliminary writes) to complete.
    /// Must not be called from an async context that isn't allowed to block.
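    ///
    /// A sketch (not a compiled doctest); `tracked` is a hypothetical
    /// `TrackedPath<B::PersistPath>` obtained from the backing store:
    ///
    /// ```ignore
    /// fb.blocking_persist(&tracked);
    /// let key = fb.key(); // record the key so the item can be `register`ed later
    /// ```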
    pub fn blocking_persist(&self, path: &TrackedPath<B::PersistPath>) {
        self.entry.blocking_persist(&self.inner.pool.store, path)
    }
}

// Promotes the entry at `index` to the front of the cache unless it is already in the
// front half. If the promoted entry was past the in-memory cutoff, the promotion pushes
// one resident entry over that cutoff, and that entry is dumped to disk.
fn shift_forward<T: Send + Sync + 'static, B: Strategy<T>>(
    pool: &FBPool<T, B>,
    index: cutoff_list::Index,
) {
    // Fast path: a read lock suffices to detect entries already in the front half.
    let read_guard = pool.entries.read();
    let preceding_cutoffs = read_guard.preceding_cutoffs(index).unwrap();
    if preceding_cutoffs == 0 {
        return;
    }
    drop(read_guard);
    // Re-check under the write lock, since the entry may have moved in the meantime.
    let mut write_guard = pool.entries.write();
    let preceding_cutoffs = write_guard.preceding_cutoffs(index).unwrap();
    if preceding_cutoffs == 0 {
        return;
    }
    write_guard.shift_to_front(index);
    if preceding_cutoffs == 1 {
        return;
    }
    assert!(preceding_cutoffs == 2);
    let read_guard = parking_lot::RwLockWriteGuard::downgrade(write_guard);
    let dump_entry = read_guard.get(read_guard.following_ind(1)).unwrap();
    dump_entry.try_dump_to_disk(&pool.store);
}

/// An RAII guard providing immutable access (`Deref`) to the underlying data `T`.
///
/// While this guard is alive, the data is guaranteed to remain loaded in memory
/// and will not be evicted even if it falls out of the LRU cache; any required
/// dump to disk happens once the guard is dropped.
// Field ordering is important for try_dump_to_disk to succeed on drop
pub struct ReadGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    data_guard: RwLockReadGuard<'a, T>,
    _on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Deref for ReadGuard<'_, T, B> {
    type Target = T;

    /// Dereferences to the immutable underlying data `T`.
    fn deref(&self) -> &Self::Target {
        &self.data_guard
    }
}

/// An RAII guard providing mutable access (`DerefMut`) to the underlying data `T`.
///
/// While this guard is alive, the data is guaranteed to remain loaded in memory.
// Field ordering is important for try_dump_to_disk to succeed on drop
pub struct WriteGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    data_guard: RwLockMappedWriteGuard<'a, T>,
    _on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Deref for WriteGuard<'_, T, B> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.data_guard
    }
}

impl<T: Send + Sync + 'static, B: Strategy<T>> DerefMut for WriteGuard<'_, T, B> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data_guard
    }
}

// Shared drop logic for `ReadGuard` and `WriteGuard`: when the guard is dropped, if the
// entry has been pushed past the in-memory cutoff while the guard was held, it is dumped
// to disk now that it is no longer borrowed.
struct GuardDropper<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    pool: &'a FBPool<T, B>,
    index: cutoff_list::Index,
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Consume for GuardDropper<'_, T, B> {
    fn consume(self) {
        let entry_guard = self.pool.entries.read();
        let preceding_cutoffs = entry_guard.preceding_cutoffs(self.index).unwrap();
        assert!(preceding_cutoffs <= 2);
        if preceding_cutoffs == 2 {
            entry_guard
                .get(self.index)
                .unwrap()
                .try_dump_to_disk(&self.pool.store);
        }
    }
}

impl<'a, T: Send + Sync + 'static, B: Strategy<T>> GuardDropper<'a, T, B> {
    pub fn new(pool: &'a FBPool<T, B>, index: cutoff_list::Index) -> ConsumeOnDrop<Self> {
        ConsumeOnDrop::new(GuardDropper { pool, index })
    }
}