file_backed 0.6.6

Provides types for managing collections of large objects, using an in-memory LRU cache backed by persistent storage (typically the filesystem).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
use std::{
    ops::{Deref, DerefMut},
    sync::Arc,
};

use consume_on_drop::{Consume, ConsumeOnDrop};
use cutoff_list::CutoffList;
use parking_lot::RwLock;
use tokio::{
    sync::{RwLockMappedWriteGuard, RwLockReadGuard},
    task::JoinHandle,
};

use uuid::Uuid;

mod entries;

pub mod backing_store;
pub mod convenience;
#[cfg(feature = "fbstore")]
pub mod fbstore;

use self::backing_store::{Strategy, TrackedPath};
use self::entries::{FullEntry, LimitedEntry};

pub use self::backing_store::{BackingStore, BackingStoreT};

/// A handle to data managed by an `FBPool`.
///
/// This acts like a `Box` but for potentially large data (`T`) that might
/// reside either in memory (in an LRU cache) or on disk (in the backing store) or
/// both.
/// Accessing the underlying data requires calling one of the `load` methods.
///
/// When dropped:
/// - If the item was never persisted or written to the backing store's temporary
///   location, it is simply dropped.
/// - If the item exists in the backing store's temporary location, a background
///   task is spawned via the `BackingStore` to delete it using `BackingStoreT::delete`.
// Field ordering is important to remove entries from the cutoff list when
// the last reference to the entry is dropped.
// Rust drops struct fields in declaration order, so `entry` is dropped first and
// `inner` second; `inner`'s `Drop` impl is what removes this item's slot from the
// pool's cutoff list. (Presumably this makes the list's `LimitedEntry` the last
// reference at removal time — see the `entries` module to confirm.)
pub struct Fb<T, B: BackingStoreT> {
    // Owning side of the item; the pool's cutoff list stores `entry.limited()`.
    entry: FullEntry<T, B>,
    // This item's position in the pool's cutoff list plus a strong pool reference.
    inner: FbInner<T, B>,
}

/// The part of an [`Fb`] that ties it to its pool: the handle's slot in the
/// pool's cutoff list, plus a strong reference that keeps the pool alive for as
/// long as the handle exists.
struct FbInner<T, B: BackingStoreT> {
    index: cutoff_list::Index,
    pool: Arc<FBPool<T, B>>,
}

impl<T, B: BackingStoreT> Drop for FbInner<T, B> {
    /// Unlinks this handle's slot from the pool's cutoff list.
    fn drop(&mut self) {
        let mut entries = self.pool.entries.write();
        let removed = entries.remove(self.index);
        // The slot was linked in when the handle was created (`insert`/`register`),
        // so it must still be present. The removed value is dropped while the
        // write lock is still held.
        removed.unwrap();
    }
}

/// Manages a pool of `Fb` instances, backed by an in-memory cache
/// and a `BackingStore` for disk persistence.
///
/// ## Caching Strategy (Segmented LRU Variant)
///
/// The pool utilizes a variation of an LRU (Least Recently Used) cache designed
/// to reduce overhead for frequently accessed items. The cache is conceptually
/// divided into two segments (e.g., a "front half" and a "back half", typically
/// split evenly based on the total `mem_size`).
///
/// - **Promotion:** Items loaded from the backing store (`Strategy::load`) or
///   accessed while residing in the "back half" of the cache are promoted
///   to the most recently used position (the front of the "front half").
/// - **No Movement:** Items accessed while *already* in the "front half" do
///   **not** change position. This avoids the overhead of cache entry shuffling
///   for frequently hit "hot" items that are already near the front.
/// - **Insertion:** New items inserted via [`FBPool::insert`] also enter the
///   front of the "front half".
/// - **Eviction:** Items are eventually evicted from the least recently used
///   end of the "back half" when the cache is full.
///
/// This approach aims to provide LRU-like behavior while optimizing for workloads
/// where a subset of items is accessed very frequently.
///
/// Internally, we store each item in an `Option<T>`. When an item is evicted from cache,
/// we will replace `Some(val)` with `None`. Note that the size of `None::<T>` on the
/// stack is exactly the same as that of `Some(val)`. This means that if `T`'s resources
/// are primarily represented by its space on the stack (e.g. when `T` is `[f32; 4096]`),
/// there will be zero savings when it's removed from cache. In these cases, you should
/// use `Box<T>` instead.
pub struct FBPool<T, B: BackingStoreT> {
    // Every item in the pool, ordered front (most recently inserted/promoted) to back.
    // `FBPool::new` places two cutoffs: `mem_size / 2` (end of the front segment)
    // and `mem_size` (end of the in-memory region).
    entries: RwLock<CutoffList<LimitedEntry<T, B>>>,
    // Store that items are dumped to when they leave the in-memory region and
    // loaded back from on demand.
    store: Arc<BackingStore<B>>,
}

impl<T, B: BackingStoreT> FBPool<T, B> {
    /// Creates a new pool managing items of type `T`.
    ///
    /// # Arguments
    /// * `store` - The configured `BackingStore` manager.
    /// * `mem_size` - The maximum number of items to keep loaded in the in-memory cache.
    ///   This size is divided internally (50/50) to implement the
    ///   two-segment caching strategy (see main [`FBPool`] documentation for details).
    pub fn new(store: Arc<BackingStore<B>>, mem_size: usize) -> Self {
        let entries = RwLock::new(CutoffList::new(vec![mem_size / 2, mem_size]));
        Self { entries, store }
    }

    /// Returns a reference to the underlying `BackingStore`.
    pub fn store(&self) -> &Arc<BackingStore<B>> {
        &self.store
    }

    /// Inserts new `data` into the pool, returning an `Fb` handle.
    ///
    /// The data is initially placed only in the in-memory LRU cache. It will only be
    /// written to the backing store's temporary location if it's evicted from the cache
    /// or explicitly written via`persist`/`blocking_persist`/`spawn_write_now`/`blocking_write_now`.
    ///
    /// Whenever the data is evicted from memory, after being written to the backing store
    /// with `B::store`, the data will be dropped normally, which means if there's a custom `Drop`
    /// implementation, it will be called. Each time the data is loaded back into memory, this
    /// could happen again if the data is evicted again.
    pub fn insert(self: &Arc<Self>, data: T) -> Fb<T, B>
    where
        T: Send + Sync + 'static,
        B: Strategy<T>,
    {
        let entry = FullEntry::new(data);
        let mut guard = self.entries.write();
        let index = guard.insert_first(entry.limited());
        let dump_entry = guard.get(guard.index_following_qth_cutoff(1));
        if let Some(entry) = dump_entry {
            entry.try_dump_to_disk(&self.store);
        }
        drop(guard);
        Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        }
    }

    /// Asynchronously registers an existing item from a persistent path into the pool.
    ///
    /// Creates an `Fb` handle for an item identified by `key` located at the
    /// tracked persistent `path`. This typically involves calling `BackingStoreT::register`
    /// (e.g., hard-linking the file into the managed temporary area).
    ///
    /// The item data is *not* loaded into memory by this call.
    /// Returns `None` if the registration fails (e.g., the underlying store fails to find the key).
    pub async fn register(
        self: &Arc<Self>,
        path: &Arc<TrackedPath<B::PersistPath>>,
        key: Uuid,
    ) -> Option<Fb<T, B>>
    where
        T: Send + Sync + 'static,
    {
        let entry = FullEntry::register(key, &self.store, path).await?;
        let index = self.entries.write().insert_last(entry.limited());
        Some(Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        })
    }

    /// Blocking version of `register`. Waits for the registration to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_register(
        self: &Arc<Self>,
        path: &TrackedPath<B::PersistPath>,
        key: Uuid,
    ) -> Option<Fb<T, B>> {
        let entry = FullEntry::blocking_register(key, &self.store, path)?;
        let index = self.entries.write().insert_last(entry.limited());
        Some(Fb {
            entry,
            inner: FbInner {
                index,
                pool: Arc::clone(self),
            },
        })
    }

    /// Returns the current number of items managed by the pool (both in memory and on disk).
    pub fn size(&self) -> usize {
        self.entries.read().len()
    }
}

impl<T, B: BackingStoreT> Fb<T, B> {
    /// Returns the unique identifier (`Uuid`) for the data associated with this handle.
    ///
    /// This key changes when mutable access to the data is taken through this handle
    /// (e.g. via `load_mut`, `blocking_load_mut`, or `try_load_mut`). A key observed
    /// before a mutation should therefore not be relied on to identify the data afterwards.
    pub fn key(&self) -> Uuid {
        self.entry.key()
    }

    /// Returns a reference to the `FBPool` this `Fb` belongs to.
    pub fn pool(&self) -> &Arc<FBPool<T, B>> {
        &self.inner.pool
    }
}

impl<T: Send + Sync + 'static, B: Strategy<T>> Fb<T, B> {
    /// Asynchronously loads the data and returns a read guard.
    ///
    /// Returns a `Future` that resolves to a `ReadGuard` once the data is available
    /// in memory (either immediately or after loading from the backing store).
    /// Suitable for use within `async` functions and tasks.
    ///
    /// The entry may be promoted in the pool's LRU order before the load starts
    /// (see the [`FBPool`] caching strategy).
    pub async fn load(&self) -> ReadGuard<'_, T, B> {
        // We do this _before_ loading the backing value so that if the caller
        // cancels the operation, we don't waste the work done to load it by
        // immediately dumping it back to disk.
        shift_forward(&self.inner.pool, self.inner.index);
        // Construct before loading so that if cancelled, the object will be dumped
        // if necessary (possible if a lot of things are loaded simultaneously).
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load(&self.inner.pool.store).await;
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Attempts to load the data and return a read guard, returning None if the data is not
    /// already in memory or is currently being evicted.
    /// The entry will only be potentially shifted in the LRU cache on success.
    pub fn try_load(&self) -> Option<ReadGuard<'_, T, B>> {
        let guard = self.entry.try_load()?;
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        Some(ReadGuard {
            data_guard: guard,
            _on_drop: on_drop,
        })
    }

    /// Loads the data and returns a read guard, performing blocking I/O if necessary.
    ///
    /// - If the data is already in the memory cache, returns immediately.
    /// - If the data is not in memory, it performs a blocking load operation via
    ///   `Strategy::load`.
    ///
    /// This method should only be called from a context where blocking is acceptable
    /// (e.g., outside a Tokio runtime, or within `spawn_blocking` or `block_in_place`).
    pub fn blocking_load(&self) -> ReadGuard<'_, T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.blocking_load(&self.inner.pool.store);
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Loads the data and returns a read guard for immutable access.
    ///
    /// - If the data is already in the memory cache, returns immediately.
    /// - If the data is not in memory, it uses `tokio::task::block_in_place` to
    ///   call `blocking_load` to load it from the backing store.
    ///
    /// This is for the somewhat niche situation where you need to load an `Fb` in a
    /// blocking function nested many blocking calls deep within an async task running
    /// on a Tokio multi-threaded runtime. Ideally you would propagate async down and use
    /// `load` instead.
    ///
    /// # Panics
    /// This method will panic if called from within a current-thread Tokio runtime
    /// (one built with `tokio::runtime::Builder::new_current_thread`), as
    /// `block_in_place` is not supported there. Use `load` instead in async contexts and
    /// `blocking_load` in known blocking contexts.
    pub fn load_in_place(&self) -> ReadGuard<'_, T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load_in_place(&self.inner.pool.store);
        ReadGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Asynchronously acquires mutable access to the data.
    ///
    /// On return:
    /// 1. The data is ensured to be in memory.
    /// 2. The corresponding file in the backing store's temporary location (if any) is deleted.
    /// 3. The internal `Uuid` key for this data is changed.
    /// 4. A `WriteGuard` providing mutable access is returned.
    pub async fn load_mut(&mut self) -> WriteGuard<'_, T, B> {
        // We do this _before_ loading the backing value so that if the caller
        // cancels the operation, we don't waste the work done to load it by
        // immediately dumping it back to disk.
        shift_forward(&self.inner.pool, self.inner.index);
        // Construct before loading so that if cancelled, the object will be dumped
        // if necessary (possible if a lot of things are loaded simultaneously).
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.load_mut(&self.inner.pool.store).await;
        WriteGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Attempts to load the data and return a write guard, returning None if the data is not
    /// already in memory or is currently being evicted.
    /// The entry will only be potentially shifted in the LRU cache on success.
    pub fn try_load_mut(&mut self) -> Option<WriteGuard<'_, T, B>> {
        let guard = self.entry.try_load_mut()?;
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        Some(WriteGuard {
            data_guard: guard,
            _on_drop: on_drop,
        })
    }

    /// Blocking version of `load_mut`. Waits for the operation to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_load_mut(&mut self) -> WriteGuard<'_, T, B> {
        shift_forward(&self.inner.pool, self.inner.index);
        let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
        let data_guard = self.entry.blocking_load_mut(&self.inner.pool.store);
        WriteGuard {
            data_guard,
            _on_drop: on_drop,
        }
    }

    /// Spawns a background task to immediately write the data to the backing store's
    /// temporary location if it isn't already there.
    ///
    /// Acquires the read guard and then returns a `JoinHandle` that completes when the write
    /// operation finishes.
    pub async fn spawn_write_now(&self) -> JoinHandle<()> {
        self.entry.spawn_write_now(&self.inner.pool.store).await
    }

    /// Performs a blocking write of the data to the backing store's temporary location
    /// if it isn't already there. Waits for the write to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_write_now(&self) {
        self.entry.blocking_write_now(&self.inner.pool.store);
    }

    /// Spawns a background task to persist the data to the specified `TrackedPath`.
    ///
    /// This calls `BackingStoreT::persist` (typically a hard-link). If the data
    /// is currently only in memory, it ensures it's written to the temporary
    /// location first before attempting persistence.
    /// If the data is already in the persistent location, this is a no-op.
    ///
    /// Acquires the read guard and then returns a `JoinHandle` that completes when the persistence
    /// operation finishes.
    pub async fn spawn_persist(&self, path: &Arc<TrackedPath<B::PersistPath>>) -> JoinHandle<()> {
        self.entry.spawn_persist(&self.inner.pool.store, path).await
    }

    /// Performs a blocking persistence of the data to the specified `TrackedPath`.
    /// Waits for the operation (including any preliminary writes) to complete.
    /// Must not be called from an async context that isn't allowed to block.
    pub fn blocking_persist(&self, path: &TrackedPath<B::PersistPath>) {
        self.entry.blocking_persist(&self.inner.pool.store, path);
    }
}

/// Moves the entry at `index` toward the most-recently-used end of `pool`'s list.
///
/// * No cutoff before it (front segment): the entry stays put. This is detected
///   under a shared lock, so hot entries never take the write lock.
/// * One cutoff before it (back segment): the entry jumps to the front. The set of
///   entries inside the in-memory region is unchanged, so nothing is displaced.
/// * Two cutoffs before it (outside the in-memory region): the entry jumps to the
///   front, pushing one entry past the second cutoff. That entry is asked to dump
///   itself to the backing store.
fn shift_forward<T: Send + Sync + 'static, B: Strategy<T>>(
    pool: &FBPool<T, B>,
    index: cutoff_list::Index,
) {
    // Cheap path first: a shared lock is enough to see that nothing needs to move.
    if pool.entries.read().preceding_cutoffs(index).unwrap() == 0 {
        return;
    }

    let mut entries = pool.entries.write();
    // Re-check under the exclusive lock: the position may have changed between
    // releasing the shared lock and acquiring this one.
    let preceding_cutoffs = entries.preceding_cutoffs(index).unwrap();
    match preceding_cutoffs {
        0 => {}
        1 => {
            entries.shift_to_front(index);
        }
        _ => {
            entries.shift_to_front(index);
            assert!(preceding_cutoffs == 2);
            // Downgrade atomically: no writer may reorder the list before the
            // displaced entry is dumped, but readers can proceed meanwhile.
            let entries = parking_lot::RwLockWriteGuard::downgrade(entries);
            let displaced = entries
                .get(entries.index_following_qth_cutoff(1))
                .unwrap();
            displaced.try_dump_to_disk(&pool.store);
        }
    }
}

/// An RAII guard providing immutable access (`Deref`) to the underlying data `T`.
///
/// While this guard is alive, the data is guaranteed to remain loaded in memory
/// and will not be immediately evicted if it leaves the LRU cache.
// Field ordering is important for try_dump_to_disk to succeed on drop:
// fields drop in declaration order, so the data lock is released before
// `_on_drop` runs its eviction check.
pub struct ReadGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    data_guard: RwLockReadGuard<'a, T>,
    _on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}

impl<T, B> Deref for ReadGuard<'_, T, B>
where
    T: Send + Sync + 'static,
    B: Strategy<T>,
{
    type Target = T;

    /// Borrows the loaded value immutably for as long as the guard is borrowed.
    fn deref(&self) -> &T {
        &*self.data_guard
    }
}

/// An RAII guard providing mutable access (`DerefMut`) to the underlying data `T`.
///
/// While this guard is alive, the data is guaranteed to remain loaded in memory.
// Field ordering is important for try_dump_to_disk to succeed on drop:
// fields drop in declaration order, so the data lock is released before
// `_on_drop` runs its eviction check.
pub struct WriteGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    data_guard: RwLockMappedWriteGuard<'a, T>,
    _on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}

impl<T, B> Deref for WriteGuard<'_, T, B>
where
    T: Send + Sync + 'static,
    B: Strategy<T>,
{
    type Target = T;

    /// Borrows the loaded value immutably.
    fn deref(&self) -> &T {
        &*self.data_guard
    }
}

impl<T, B> DerefMut for WriteGuard<'_, T, B>
where
    T: Send + Sync + 'static,
    B: Strategy<T>,
{
    /// Borrows the loaded value mutably.
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.data_guard
    }
}

/// Deferred eviction check that runs when a `ReadGuard`/`WriteGuard` is dropped.
///
/// If, by the time the guard goes away, the entry at `index` sits past both
/// cutoffs of the pool's list (outside the in-memory region), it is asked to dump
/// itself to the backing store. Presumably this could not succeed earlier because
/// the guard still held the data lock — confirm against `try_dump_to_disk`.
struct GuardDropper<'a, T: Send + Sync + 'static, B: Strategy<T>> {
    pool: &'a FBPool<T, B>,
    index: cutoff_list::Index,
}

impl<T, B> Consume for GuardDropper<'_, T, B>
where
    T: Send + Sync + 'static,
    B: Strategy<T>,
{
    fn consume(self) {
        let entries = self.pool.entries.read();
        let preceding_cutoffs = entries.preceding_cutoffs(self.index).unwrap();
        assert!(preceding_cutoffs <= 2);
        if preceding_cutoffs != 2 {
            // Still inside the in-memory region: nothing to evict.
            return;
        }
        let entry = entries.get(self.index).unwrap();
        entry.try_dump_to_disk(&self.pool.store);
    }
}

impl<'a, T, B> GuardDropper<'a, T, B>
where
    T: Send + Sync + 'static,
    B: Strategy<T>,
{
    /// Builds a dropper for the entry at `index` in `pool`, wrapped so that the
    /// eviction check runs automatically when the wrapper is dropped.
    pub fn new(pool: &'a FBPool<T, B>, index: cutoff_list::Index) -> ConsumeOnDrop<Self> {
        let dropper = Self { pool, index };
        ConsumeOnDrop::new(dropper)
    }
}