file_backed 0.6.7

Provides types for managing collections of large objects, using an in-memory LRU cache backed by persistent storage (typically the filesystem).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
use std::path::{Path, PathBuf};
use std::sync::Arc;

use redb::{Database, Durability, ReadableDatabase, ReadableTable, TableDefinition};
use uuid::Uuid;

use crate::backing_store::{BackingStoreT, Strategy};

/// The single redb table used by this module: keys are the 16 raw bytes of an
/// item's UUID and values are the item's encoded bytes.
const BLOBS: TableDefinition<&[u8; 16], &[u8]> = TableDefinition::new("file_backed_blobs");

/// An implementation of [`BackingStoreT`] and [`Strategy`] that stores blobs in
/// a redb embedded key-value database.
///
/// This backend is intended for values that are too small to deserve individual
/// files. It stores each item under its UUID bytes in a single redb table.
///
/// `C` is the codec type; the [`Strategy`] implementation requires it to
/// implement [`BlobCodec`] for the stored value type.
pub struct RedbStore<C> {
    // Turns values into bytes before a write and back into values after a read.
    codec: C,
    // Database backing the "temporary" store used by `store`, `load` and `delete`.
    db: RedbPath,
}

/// Trait defining how to encode and decode a type `T` for byte-oriented stores.
///
/// [`RedbStore`] calls `encode` before writing a value and `decode` after
/// reading its bytes back; it panics when either call returns an error.
pub trait BlobCodec<T>: Send + Sync + 'static {
    /// Encodes `data` into an owned byte vector.
    ///
    /// # Errors
    ///
    /// Returns an error when `data` cannot be encoded.
    fn encode(&self, data: &T) -> anyhow::Result<Vec<u8>>;

    /// Decodes a value of type `T` from the bytes in `data`.
    ///
    /// # Errors
    ///
    /// Returns an error when `data` cannot be decoded into a `T`.
    fn decode(&self, data: &[u8]) -> anyhow::Result<T>;
}

/// A [`BlobCodec`] implementation using the `bincode` crate.
///
/// Values go through their `serde` implementations and are encoded with
/// bincode's `legacy()` configuration.
///
/// Requires the `redb-bincodec` feature flag.
#[cfg(feature = "redb-bincodec")]
pub struct BinCodec;

#[cfg(feature = "redb-bincodec")]
impl<T: serde::Serialize + serde::de::DeserializeOwned> BlobCodec<T> for BinCodec {
    /// Serializes `data` through serde using bincode's legacy configuration.
    fn encode(&self, data: &T) -> anyhow::Result<Vec<u8>> {
        let config = bincode::config::legacy();
        let encoded = bincode::serde::encode_to_vec(data, config)?;
        Ok(encoded)
    }

    /// Deserializes a `T` from `data` using bincode's legacy configuration.
    fn decode(&self, data: &[u8]) -> anyhow::Result<T> {
        let config = bincode::config::legacy();
        // The second tuple element (bytes consumed) is not needed here.
        let (value, _consumed) = bincode::serde::decode_from_slice(data, config)?;
        Ok(value)
    }
}

/// A [`BlobCodec`] implementation using the `prost` crate.
///
/// Works for any type that implements `prost::Message` and `Default`.
///
/// Requires the `redb-prostcodec` feature flag.
#[cfg(feature = "redb-prostcodec")]
pub struct ProstCodec;

#[cfg(feature = "redb-prostcodec")]
impl<T: Default + prost::Message> BlobCodec<T> for ProstCodec {
    /// Encodes `data` as a protobuf message; this direction never fails.
    fn encode(&self, data: &T) -> anyhow::Result<Vec<u8>> {
        let encoded = data.encode_to_vec();
        Ok(encoded)
    }

    /// Decodes a protobuf message of type `T` from `data`.
    fn decode(&self, data: &[u8]) -> anyhow::Result<T> {
        let message = T::decode(data)?;
        Ok(message)
    }
}

/// A prepared redb database path.
///
/// We expect exclusive access to this database file for writes. Multiple
/// [`RedbPath`] clones share the same opened database handle.
#[derive(Clone)]
pub struct RedbPath {
    // Location of the database file; used for `path()` and in panic messages.
    path: Arc<PathBuf>,
    // Opened database handle, shared between clones through the `Arc`.
    db: Arc<Database>,
}

impl RedbPath {
    /// Opens the redb database file at `path`, creating the file when it does
    /// not exist yet, and makes sure the blob table is present.
    ///
    /// # Panics
    ///
    /// Panics if the database cannot be opened or the table cannot be set up.
    pub fn new(path: PathBuf) -> Self {
        match Database::create(&path) {
            Ok(db) => Self::from_db(path, db),
            Err(err) => panic!("Failed to open redb database {}: {err:?}", path.display()),
        }
    }

    /// Same as [`RedbPath::new`], but opens the database through a builder
    /// configured with a cache size of `cache_size_bytes`.
    ///
    /// # Panics
    ///
    /// Panics if the database cannot be opened or the table cannot be set up.
    pub fn new_with_cache_size(path: PathBuf, cache_size_bytes: usize) -> Self {
        let opened = Database::builder()
            .set_cache_size(cache_size_bytes)
            .create(&path);
        match opened {
            Ok(db) => Self::from_db(path, db),
            Err(err) => panic!("Failed to open redb database {}: {err:?}", path.display()),
        }
    }

    /// Wraps an already opened database and initializes its blob table.
    fn from_db(path: PathBuf, db: Database) -> Self {
        let prepared = Self {
            path: Arc::new(path),
            db: Arc::new(db),
        };
        prepared.ensure_table();
        prepared
    }

    /// Returns the database file path.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }

    /// Opens the blob table inside a write transaction and commits, so that
    /// the table exists before any read transaction tries to open it.
    fn ensure_table(&self) {
        let location = self.path.display();

        let txn = match self.db.begin_write() {
            Ok(txn) => txn,
            Err(err) => panic!(
                "Failed to begin redb write transaction for {}: {err:?}",
                location
            ),
        };

        // Only the side effect of opening the table matters; the handle is
        // dropped at the end of this statement, before the commit below.
        if let Err(err) = txn.open_table(BLOBS) {
            panic!("Failed to open redb table in {}: {err:?}", location);
        }

        if let Err(err) = txn.commit() {
            panic!(
                "Failed to commit redb table initialization for {}: {err:?}",
                location
            );
        }
    }
}

impl<C> RedbStore<C> {
    /// Creates a new redb-backed store.
    pub fn new(codec: C, db: RedbPath) -> Self {
        Self { codec, db }
    }
}

impl<C: Send + Sync + 'static> BackingStoreT for RedbStore<C> {
    type PersistPath = RedbPath;

    /// Removes `key` from this store's own ("temporary") database.
    fn delete(&self, key: Uuid) {
        let temporary = &self.db;
        remove_key(temporary, key, "temporary");
    }

    /// Removes `key` from the persisted database `path`.
    fn delete_persisted(&self, path: &Self::PersistPath, key: Uuid) {
        let persisted = path;
        remove_key(persisted, key, "persisted");
    }

    /// Copies the blob stored under `key` in `src_path` into this store's own database.
    fn register(&self, src_path: &Self::PersistPath, key: Uuid) {
        let blob = read_bytes(src_path, key, "persisted");
        insert_bytes(&self.db, key, blob.as_slice(), "temporary");
    }

    /// Copies the blob stored under `key` in this store's own database into `dest_path`.
    fn persist(&self, dest_path: &Self::PersistPath, key: Uuid) {
        let blob = read_bytes(&self.db, key, "temporary");
        insert_bytes(dest_path, key, blob.as_slice(), "persisted");
    }

    /// Returns the UUID of every entry currently in the blob table of `path`.
    fn sanitize_path(&self, path: &Self::PersistPath) -> impl IntoIterator<Item = Uuid> {
        let location = path.path.display();

        let txn = match path.db.begin_read() {
            Ok(txn) => txn,
            Err(err) => panic!(
                "Failed to begin redb read transaction for {}: {err:?}",
                location
            ),
        };
        let blobs = match txn.open_table(BLOBS) {
            Ok(table) => table,
            Err(err) => panic!("Failed to open redb table in {}: {err:?}", location),
        };
        let entries = match blobs.iter() {
            Ok(entries) => entries,
            Err(err) => panic!("Failed to iterate redb table in {}: {err:?}", location),
        };

        let mut keys = Vec::new();
        for entry in entries {
            // Only the key half of each entry is needed; the value is ignored.
            let (key, _) = match entry {
                Ok(pair) => pair,
                Err(err) => panic!(
                    "Failed to read redb table entry in {}: {err:?}",
                    location
                ),
            };
            keys.push(Uuid::from_bytes(*key.value()));
        }
        keys
    }

    /// Commits an empty write transaction with `Durability::Immediate` on `path`.
    ///
    /// The inserts and removals in this module commit with `Durability::None`;
    /// per redb's durability documentation, a later immediate commit is what
    /// makes such commits persistent.
    fn sync_persisted(&self, path: &Self::PersistPath) {
        let location = path.path.display();

        let mut txn = match path.db.begin_write() {
            Ok(txn) => txn,
            Err(err) => panic!(
                "Failed to begin redb sync transaction for {}: {err:?}",
                location
            ),
        };
        if let Err(err) = txn.set_durability(Durability::Immediate) {
            panic!(
                "Failed to set redb sync durability for {}: {err:?}",
                location
            );
        }
        if let Err(err) = txn.commit() {
            panic!(
                "Failed to commit redb sync transaction for {}: {err:?}",
                location
            );
        }
    }
}

impl<T, C: BlobCodec<T>> Strategy<T> for RedbStore<C> {
    /// Encodes `data` with the codec and inserts the bytes under `key` in this
    /// store's own database. Panics if encoding fails.
    fn store(&self, key: Uuid, data: &T) {
        let encoded = match self.codec.encode(data) {
            Ok(bytes) => bytes,
            Err(err) => panic!("Failed to encode data for redb key {key}: {err:?}"),
        };
        insert_bytes(&self.db, key, &encoded, "temporary");
    }

    /// Reads the bytes stored under `key` in this store's own database and
    /// decodes them with the codec. Panics if decoding fails.
    fn load(&self, key: Uuid) -> T {
        let stored = read_bytes(&self.db, key, "temporary");
        match self.codec.decode(&stored) {
            Ok(value) => value,
            Err(err) => panic!("Failed to decode data for redb key {key}: {err:?}"),
        }
    }
}

/// Returns a copy of the raw bytes stored under `key` in the database `path`,
/// without decoding them.
///
/// # Panics
///
/// Panics if the database cannot be read or `key` is not present.
pub fn read_blob(path: &RedbPath, key: Uuid) -> Vec<u8> {
    // Panic messages produced by the helper describe this as the "persisted" store.
    let label = "persisted";
    read_bytes(path, key, label)
}

/// Reads the bytes stored under `key` in the blob table of `path` and returns
/// them as an owned vector.
///
/// `label` only appears in panic messages to say which store was being read.
/// Panics on any redb error and when `key` is not present.
fn read_bytes(path: &RedbPath, key: Uuid, label: &str) -> Vec<u8> {
    let location = path.path.display();

    let txn = match path.db.begin_read() {
        Ok(txn) => txn,
        Err(err) => panic!(
            "Failed to begin redb read transaction for {} store {}: {err:?}",
            label, location
        ),
    };
    let blobs = match txn.open_table(BLOBS) {
        Ok(table) => table,
        Err(err) => panic!(
            "Failed to open redb table for {} store {}: {err:?}",
            label, location
        ),
    };
    let lookup = match blobs.get(key.as_bytes()) {
        Ok(found) => found,
        Err(err) => panic!(
            "Failed to read redb key {} from {} store {}: {err:?}",
            key, label, location
        ),
    };
    let Some(guard) = lookup else {
        panic!(
            "Attempted to read missing redb key {} from {} store {}",
            key, label, location
        );
    };

    // Copy the bytes out so they outlive the guard, table and transaction.
    guard.value().to_vec()
}

/// Inserts `bytes` under `key` in the blob table of `path`, in its own write
/// transaction committed with `Durability::None`.
///
/// `label` only appears in panic messages to say which store was being written.
/// Panics on any redb error and when `key` already has a value.
fn insert_bytes(path: &RedbPath, key: Uuid, bytes: &[u8], label: &str) {
    let location = path.path.display();

    let mut txn = match path.db.begin_write() {
        Ok(txn) => txn,
        Err(err) => panic!(
            "Failed to begin redb write transaction for {} store {}: {err:?}",
            label, location
        ),
    };
    if let Err(err) = txn.set_durability(Durability::None) {
        panic!(
            "Failed to set redb durability for {} store {}: {err:?}",
            label, location
        );
    }

    // The table handle borrows the transaction, so it lives in its own scope
    // and is dropped before the commit below.
    {
        let mut blobs = match txn.open_table(BLOBS) {
            Ok(table) => table,
            Err(err) => panic!(
                "Failed to open redb table for {} store {}: {err:?}",
                label, location
            ),
        };
        let previous = match blobs.insert(key.as_bytes(), bytes) {
            Ok(previous) => previous,
            Err(err) => panic!(
                "Failed to insert redb key {} into {} store {}: {err:?}",
                key, label, location
            ),
        };
        // Overwriting is treated as a caller bug; panicking here drops the
        // transaction without committing it.
        if previous.is_some() {
            panic!(
                "Attempted to overwrite existing redb key {} in {} store {}",
                key, label, location
            );
        }
    }

    if let Err(err) = txn.commit() {
        panic!(
            "Failed to commit redb insert for {} store {}: {err:?}",
            label, location
        );
    }
}

/// Removes `key` from the blob table of `path`, in its own write transaction
/// committed with `Durability::None`.
///
/// `label` only appears in panic messages to say which store was being modified.
/// Panics on any redb error and when `key` is not present.
fn remove_key(path: &RedbPath, key: Uuid, label: &str) {
    let location = path.path.display();

    let mut txn = match path.db.begin_write() {
        Ok(txn) => txn,
        Err(err) => panic!(
            "Failed to begin redb write transaction for {} store {}: {err:?}",
            label, location
        ),
    };
    if let Err(err) = txn.set_durability(Durability::None) {
        panic!(
            "Failed to set redb durability for {} store {}: {err:?}",
            label, location
        );
    }

    // The table handle borrows the transaction, so it lives in its own scope
    // and is dropped before the commit below.
    {
        let mut blobs = match txn.open_table(BLOBS) {
            Ok(table) => table,
            Err(err) => panic!(
                "Failed to open redb table for {} store {}: {err:?}",
                label, location
            ),
        };
        let previous = match blobs.remove(key.as_bytes()) {
            Ok(previous) => previous,
            Err(err) => panic!(
                "Failed to remove redb key {} from {} store {}: {err:?}",
                key, label, location
            ),
        };
        // Deleting a key that is not there is treated as a caller bug;
        // panicking here drops the transaction without committing it.
        if previous.is_none() {
            panic!(
                "Attempted to delete missing redb key {} from {} store {}",
                key, label, location
            );
        }
    }

    if let Err(err) = txn.commit() {
        panic!(
            "Failed to commit redb delete for {} store {}: {err:?}",
            label, location
        );
    }
}

// Round-trip test for the redb backend; only built for tests with the
// `redb-bincodec` feature enabled because it uses `BinCodec`.
#[cfg(all(test, feature = "redb-bincodec"))]
mod tests {
    use std::sync::Arc;

    use tempfile::tempdir;
    use tokio::runtime::Handle;

    use crate::{BackingStore, FBPool};

    use super::{BinCodec, RedbPath, RedbStore};

    /// Persists a value in one store session, then reopens the same database
    /// files in a second session, registers the persisted key and checks that
    /// the loaded value matches what was stored.
    #[tokio::test]
    async fn persists_registers_and_loads() {
        // Separate temp directories hold the cache database and the persisted database.
        let cache_dir = tempdir().unwrap();
        let persist_dir = tempdir().unwrap();
        let cache_path = cache_dir.path().join("cache.redb");
        let persist_path = persist_dir.path().join("persist.redb");
        // Assigned in the first session and read in the second.
        let persisted_key;

        // First session: insert a value and persist it to the tracked path.
        {
            let redb_store = RedbStore::new(BinCodec, RedbPath::new(cache_path.clone()));
            let store = Arc::new(BackingStore::new(redb_store, Handle::current()));
            let pool: Arc<FBPool<String, _>> = Arc::new(FBPool::new(store.clone(), 1));
            let tracked_persist = Arc::new(
                store
                    .track_path(RedbPath::new(persist_path.clone()))
                    .await
                    .unwrap(),
            );

            let item = pool.insert("Persisted Data".to_string());
            persisted_key = item.key();
            item.spawn_persist(&tracked_persist).await.await.unwrap();
            drop(item);
            store.finished().await;
        }

        // Second session: reopen both database files and read the value back.
        {
            let redb_store = RedbStore::new(BinCodec, RedbPath::new(cache_path));
            let store = Arc::new(BackingStore::new(redb_store, Handle::current()));
            let pool: Arc<FBPool<String, _>> = Arc::new(FBPool::new(store.clone(), 1));
            let tracked_persist =
                Arc::new(store.track_path(RedbPath::new(persist_path)).await.unwrap());

            // The key persisted in the first session must be listed for the tracked path.
            assert!(tracked_persist.all_keys().contains(&persisted_key));

            let item = pool
                .register(&tracked_persist, persisted_key)
                .await
                .expect("registered item");
            let guard = item.load().await;

            assert_eq!(*guard, "Persisted Data");
            drop(guard);
            drop(item);
            store.finished().await;
        }
    }
}