coordinode-lsm-tree 5.7.0

Embedded LSM-tree storage engine: BuRR filters, zstd dictionary compression, MVCC, range tombstones, merge operators, K/V separation, AES-256-GCM at rest.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026-present, Structured World Foundation

//! Power-loss crash simulator [`Fs`] backend for recovery testing.
//!
//! [`CrashFs`] wraps an inner [`Fs`] and models the durability contract of a
//! real disk: bytes become durable only when a file handle is `fsync`ed
//! ([`FsFile::sync_all`] / [`FsFile::sync_data`]). [`CrashFs::crash`] simulates
//! a power loss by rolling every file back to its last-synced content and
//! removing files that were never synced at all — exactly the bytes a real
//! crash would lose. Reopening the storage engine on the post-crash backend
//! (via [`CrashFs::inner`]) then exercises its recovery path against the
//! worst-case durable image.
//!
//! Unlike [`FaultFs`](crate::fs::FaultFs), which makes a *chosen* operation
//! return an error, `CrashFs` is about *durability*: every operation succeeds
//! during the run, but a `crash()` reveals which writes were actually durable.
//! The two compose — wrap `CrashFs` in a `FaultFs` to fail a specific `fsync`
//! and then crash to discard the tail that the failed sync never made durable,
//! reproducing a torn write mid-flush.
//!
//! # Model and limitations
//!
//! Durability is tracked at file-content granularity: a file's durable image is
//! its full content as of its most recent successful `sync_all` / `sync_data`.
//! A file written but never synced vanishes on `crash()`; a synced file keeps
//! exactly its last-synced bytes (a later un-synced append or truncate is
//! rolled back). Directories are not rolled back — the engine fsyncs its data
//! directory on open, and modelling directory-entry durability separately would
//! add no coverage the file-content model lacks for LSM recovery. This is the
//! same power-loss model `RocksDB`'s crash test uses.
//!
//! This is a test/dev surface: it is gated behind the `std` feature and is not
//! part of the production storage path.
//!
//! # Examples
//!
//! ```
//! use lsm_tree::fs::{CrashFs, MemFs};
//!
//! let fs = CrashFs::new(MemFs::new());
//! // ... run a workload through `fs`, fsyncing durable checkpoints ...
//! fs.crash(); // discard everything written since the last fsync of each file
//! // ... reopen the engine on `fs.inner()` and verify recovery ...
//! ```

use super::{
    FileHint, Fs, FsCapabilities, FsDirEntry, FsFile, FsMetadata, FsOpenOptions, SyncMode,
};
use crate::io;
use crate::path::{Path, PathBuf};
use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec::Vec;
use hashbrown::{HashMap, HashSet};

/// Shared crash state: the durable image of each file plus the set of files
/// written during the current run (so `crash()` can vanish never-synced ones).
///
/// One instance is shared (behind a `spin::Mutex`) between a [`CrashFs`] and
/// every [`CrashFile`] handle it opens.
#[derive(Default)]
struct CrashState {
    /// Last-synced full content per file path. Presence means "this path has
    /// been made durable at least once"; the bytes are its durable image.
    /// Entries are written by a handle's sync, by the first-touch baseline
    /// capture of a pre-existing file, and by copy tracking for
    /// `hard_link` / `reflink_file`.
    durable: HashMap<PathBuf, Vec<u8>>,
    /// Every path opened for writing this run, whether or not yet synced.
    /// `crash()` visits these (plus `durable` keys) to roll back or remove,
    /// and afterwards resets this set to exactly the `durable` keys.
    touched: HashSet<PathBuf>,
}

/// A power-loss crash simulator wrapping an inner [`Fs`].
///
/// See the module-level documentation for the durability model. The inner backend is
/// held as an [`Arc<dyn Fs>`]; obtain a clone with [`inner`](Self::inner) to
/// reopen the engine directly on the post-crash store (e.g. via
/// [`Config::with_shared_fs`](crate::Config::with_shared_fs)).
///
/// Cloning is shallow: both fields are `Arc`s, so a clone shares the same
/// inner backend and the same crash-tracking state as the original.
#[derive(Clone)]
pub struct CrashFs {
    /// The wrapped backend that actually stores the bytes.
    inner: Arc<dyn Fs>,
    /// Durable-image / touched-path bookkeeping, shared with every open handle.
    state: Arc<spin::Mutex<CrashState>>,
}

impl CrashFs {
    /// Wraps `inner`, treating its current contents as the initial durable
    /// image (nothing is rolled back until something is written and then a
    /// `crash()` occurs).
    #[must_use]
    pub fn new<F: Fs>(inner: F) -> Self {
        Self::from_shared(Arc::new(inner))
    }

    /// Wraps an already-shared backend handle, starting with empty crash
    /// state (no durable images recorded, no paths touched).
    #[must_use]
    pub fn from_shared(inner: Arc<dyn Fs>) -> Self {
        let state = Arc::new(spin::Mutex::new(CrashState::default()));
        Self { inner, state }
    }

    /// Returns a clone of the wrapped backend handle, for reopening the engine
    /// on the same store after a [`crash`](Self::crash).
    ///
    /// Only the `Arc` is cloned; the returned handle refers to the same
    /// backend. Operations made through it bypass this wrapper and are not
    /// recorded in the crash state.
    #[must_use]
    pub fn inner(&self) -> Arc<dyn Fs> {
        Arc::clone(&self.inner)
    }

    /// Simulates a power loss: every file is rolled back to its last-synced
    /// content, and files written but never synced are removed. After this the
    /// backend holds exactly the bytes a real crash would have left durable.
    ///
    /// # Panics
    ///
    /// Panics if rolling a file back to its durable image fails (open / write /
    /// remove on the inner backend). A crash simulator that could not restore
    /// the durable image would silently under-test recovery, so the failure is
    /// surfaced loudly rather than swallowed. In-memory backends never hit this.
    pub fn crash(&self) {
        let mut state = self.state.lock();
        // Visit every path we wrote, plus any durable path (defensive: a file
        // synced in a prior life but only read this run still gets its durable
        // image reasserted).
        let paths: Vec<PathBuf> = state
            .touched
            .iter()
            .chain(state.durable.keys())
            .cloned()
            .collect();

        for path in paths {
            match state.durable.get(&path) {
                Some(bytes) => self.restore_durable(&path, bytes),
                None => {
                    // Never synced -> the file never existed durably.
                    match self.inner.remove_file(&path) {
                        Ok(()) => {}
                        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
                        Err(e) => {
                            panic!("crash(): removing un-synced {} failed: {e}", path.display())
                        }
                    }
                }
            }
        }

        // Post-crash, only durable files exist and they are "clean".
        state.touched = state.durable.keys().cloned().collect();
    }

    /// Replaces the inner file at `path` with its durable image `bytes`.
    ///
    /// The file is opened with write + create + truncate, so any existing
    /// content is discarded (and a missing file is recreated) before `bytes`
    /// is written in full.
    ///
    /// Panics if the open or the write fails on the inner backend.
    fn restore_durable(&self, path: &Path, bytes: &[u8]) {
        let opened = self.inner.open(
            path,
            &FsOpenOptions::new().write(true).create(true).truncate(true),
        );
        let mut handle = match opened {
            Ok(handle) => handle,
            Err(e) => panic!(
                "crash(): reopening {} for rollback failed: {e}",
                path.display()
            ),
        };
        if let Err(e) = std::io::Write::write_all(&mut handle, bytes) {
            panic!(
                "crash(): rewriting durable image of {} failed: {e}",
                path.display()
            );
        }
    }

    /// Loads the full current content of `path` from the inner backend, to
    /// serve as that file's initial durable baseline.
    ///
    /// Returns `Ok(None)` only when the open reports `NotFound` (a brand-new
    /// file has no durable image until its first sync). Every other open or
    /// read error is returned to the caller, so a transient failure cannot
    /// silently drop a pre-existing file's durable image.
    fn read_baseline(&self, path: &Path) -> io::Result<Option<Vec<u8>>> {
        match self.inner.open(path, &FsOpenOptions::new().read(true)) {
            Ok(mut reader) => {
                let mut contents = Vec::new();
                std::io::Read::read_to_end(&mut reader, &mut contents)?;
                Ok(Some(contents))
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err),
        }
    }

    /// Records `path`'s pre-existing content as its durable baseline on the FIRST
    /// touch of this run, then marks it touched. Re-touching is a no-op for the
    /// baseline (a file already touched is tracked by sync; re-reading it would
    /// wrongly promote un-synced bytes to durable). Used by every entry point
    /// that creates or mutates a file's content: `open` (writable), `punch_hole`,
    /// `truncate_file`.
    ///
    /// The baseline is read outside the state lock so backend I/O never runs
    /// under the mutex. Because the lock is released during that read, the
    /// "first touch" decision is re-made atomically afterwards: the baseline
    /// is stored only if this call is the one that actually inserts `path`
    /// into `touched`. If another caller touched the path in the meantime, its
    /// tracking wins and the baseline read here is discarded. That baseline
    /// may already contain the other caller's un-synced bytes, or be older
    /// than a snapshot it has since synced.
    ///
    /// # Errors
    ///
    /// Returns any non-`NotFound` error from reading the baseline; in that
    /// case the path is left un-touched.
    fn capture_first_touch(&self, path: &Path) -> io::Result<()> {
        let pb = path.to_path_buf();

        // Fast path: already tracked this run, so there is nothing to capture.
        // The guard is a temporary of the condition, so it is released before
        // any I/O below.
        if self.state.lock().touched.contains(&pb) {
            return Ok(());
        }

        let baseline = self.read_baseline(path)?;

        // Single critical section for check + insert: `insert` returns `true`
        // only for the caller that newly adds the path.
        let mut state = self.state.lock();
        let won_first_touch = state.touched.insert(pb.clone());
        if let (true, Some(bytes)) = (won_first_touch, baseline) {
            state.durable.insert(pb, bytes);
        }
        Ok(())
    }

    /// Tracks the destination of a copy-style op (`hard_link` / `reflink`) so
    /// that it mirrors the source's durability: after a crash the copy either
    /// comes back with the source's durable bytes or is removed as un-synced.
    ///
    /// The image inherited by `dst` is chosen as follows:
    /// - `src` has an entry in `durable` (synced or captured this run): that
    ///   entry is cloned;
    /// - `src` has no durable entry and was never touched this run: its
    ///   current on-disk content is read as the baseline (the same "current
    ///   contents are initially durable" rule [`capture_first_touch`] applies);
    /// - `src` is touched but not durable: it holds un-synced bytes, so `dst`
    ///   inherits nothing and a crash removes it.
    ///
    /// `dst` is marked touched in every case. The baseline read happens
    /// between the two lock scopes (mirroring [`capture_first_touch`]) so
    /// backend I/O never runs under the mutex.
    fn track_copy(&self, src: &Path, dst: &Path) -> io::Result<()> {
        let (known_image, src_dirty) = {
            let guard = self.state.lock();
            (guard.durable.get(src).cloned(), guard.touched.contains(src))
        };

        let inherited = if known_image.is_some() {
            known_image
        } else if src_dirty {
            None
        } else {
            self.read_baseline(src)?
        };

        let dst_key = dst.to_path_buf();
        let mut guard = self.state.lock();
        guard.touched.insert(dst_key.clone());
        if let Some(image) = inherited {
            guard.durable.insert(dst_key, image);
        }
        Ok(())
    }
}

impl Fs for CrashFs {
    // Opens `path` on the inner backend and wraps the handle in a `CrashFile`
    // so its syncs are recorded as durable snapshots.
    fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result<Box<dyn FsFile>> {
        // Any of these flags means the open may create or change content.
        let mutating_flags = [
            opts.write,
            opts.create,
            opts.create_new,
            opts.append,
            opts.truncate,
        ];
        if mutating_flags.contains(&true) {
            // The baseline must be captured BEFORE the inner open, which may
            // truncate the file. A brand-new file captures nothing, so a crash
            // before its first sync removes it.
            self.capture_first_touch(path)?;
        }

        let handle = self.inner.open(path, opts)?;
        let tracked = CrashFile {
            inner: handle,
            path: path.to_path_buf(),
            fs: Arc::clone(&self.inner),
            state: Arc::clone(&self.state),
        };
        Ok(Box::new(tracked))
    }

    // Pure pass-through: directories are not tracked in the crash state.
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        self.inner.create_dir_all(path)
    }

    // Pure pass-through: directories are not tracked in the crash state.
    fn create_dir(&self, path: &Path) -> io::Result<()> {
        self.inner.create_dir(path)
    }

    // Read-only listing, forwarded as-is. It reports what the inner backend
    // currently holds, including files that have not been synced yet.
    fn read_dir(&self, path: &Path) -> io::Result<Vec<FsDirEntry>> {
        self.inner.read_dir(path)
    }

    fn remove_file(&self, path: &Path) -> io::Result<()> {
        self.inner.remove_file(path)?;
        let mut state = self.state.lock();
        state.durable.remove(path);
        state.touched.remove(path);
        Ok(())
    }

    fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
        self.inner.remove_dir_all(path)?;
        // Purge crash state for every tracked path under the removed directory,
        // so crash() neither resurrects nor panics recreating a file whose
        // parent is gone.
        let mut state = self.state.lock();
        state.durable.retain(|k, _| !k.starts_with(path));
        state.touched.retain(|k| !k.starts_with(path));
        Ok(())
    }

    // Renames on the inner backend, then moves the source's crash tracking to
    // the destination key.
    //
    // NOTE(review): only the exact `from` key is re-keyed. Tracked paths
    // *under* `from` (if `from` is a directory) keep their old keys; confirm
    // callers only rename files.
    fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        self.inner.rename(from, to)?;

        let mut guard = self.state.lock();

        // Detach whatever the source carried (the rename is as durable as the
        // source was).
        let carried_image = guard.durable.remove(from);
        let carried_touch = guard.touched.remove(from);

        // The destination was replaced on disk, so its old tracking is dropped
        // unconditionally. This stops a replaced, previously-synced
        // destination from being resurrected to stale content on crash.
        guard.durable.remove(to);
        guard.touched.remove(to);

        // Re-attach the source's tracking under the destination key.
        if let Some(image) = carried_image {
            guard.durable.insert(to.to_path_buf(), image);
        }
        if carried_touch {
            guard.touched.insert(to.to_path_buf());
        }
        Ok(())
    }

    // Read-only query, forwarded unchanged. It reflects the inner backend's
    // current (possibly un-synced) state.
    fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
        self.inner.metadata(path)
    }

    // Forwarded unchanged. Directory syncs do not record anything in the
    // crash state; only file-handle syncs capture durable images.
    fn sync_directory(&self, path: &Path) -> io::Result<()> {
        self.inner.sync_directory(path)
    }

    // Forwarded unchanged with the caller's `mode`. As with `sync_directory`,
    // nothing is recorded in the crash state.
    fn sync_directory_with(&self, path: &Path, mode: SyncMode) -> io::Result<()> {
        self.inner.sync_directory_with(path, mode)
    }

    // Read-only query, forwarded unchanged. An un-synced file that a later
    // `crash()` would remove still answers according to the inner backend.
    fn exists(&self, path: &Path) -> io::Result<bool> {
        self.inner.exists(path)
    }

    // Creates the link on the inner backend first; only a successful link is
    // then tracked, with `dst` inheriting `src`'s durability.
    fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()> {
        self.inner.hard_link(src, dst)?;
        self.track_copy(src, dst)
    }

    // Reports the inner backend's id unchanged; the wrapper adds no identity
    // of its own.
    fn backend_id(&self) -> Option<u64> {
        self.inner.backend_id()
    }

    // Forwarded unchanged to the inner backend.
    fn volume_id(&self, path: &Path) -> Option<u64> {
        self.inner.volume_id(path)
    }

    // Reports exactly the inner backend's capabilities for `path`; the wrapper
    // neither adds nor masks any.
    fn capabilities(&self, path: &Path) -> FsCapabilities {
        self.inner.capabilities(path)
    }

    // Forwarded unchanged. Unlike `punch_hole` / `truncate_file`, this is not
    // treated as content-mutating here: no baseline is captured and the path
    // is not marked touched.
    fn try_disable_cow(&self, path: &Path) -> io::Result<()> {
        self.inner.try_disable_cow(path)
    }

    // Content-mutating: the pre-mutation durable image is captured first so an
    // un-synced punch rolls back on crash. If the capture fails, the punch is
    // not attempted and the capture error is returned.
    fn punch_hole(&self, path: &Path, offset: u64, len: u64) -> io::Result<()> {
        self.capture_first_touch(path)
            .and_then(|()| self.inner.punch_hole(path, offset, len))
    }

    // Clones on the inner backend first; only a successful clone is then
    // tracked, with `dst` inheriting `src`'s durability.
    fn reflink_file(&self, src: &Path, dst: &Path) -> io::Result<()> {
        self.inner.reflink_file(src, dst)?;
        self.track_copy(src, dst)
    }

    // Content-mutating: the pre-truncate image is captured first so an
    // un-synced reclaim rolls back on crash. If the capture fails, the
    // truncate is not attempted and the capture error is returned.
    fn truncate_file(&self, path: &Path) -> io::Result<()> {
        self.capture_first_touch(path)
            .and_then(|()| self.inner.truncate_file(path))
    }

    // Read-only query, forwarded unchanged to the inner backend.
    fn hard_link_count(&self, path: &Path) -> io::Result<u64> {
        self.inner.hard_link_count(path)
    }

    // Read-only query, forwarded unchanged to the inner backend.
    fn available_space(&self, path: &Path) -> io::Result<u64> {
        self.inner.available_space(path)
    }
}

/// A file handle that records its durable image on `fsync`.
///
/// Reads, writes and seeks go straight to the wrapped handle; only the sync
/// methods do extra work (snapshotting the file's content into the shared
/// crash state).
struct CrashFile {
    /// The real handle returned by the inner backend's `open`.
    inner: Box<dyn FsFile>,
    /// Path this handle was opened with; used as the key into the crash state.
    path: PathBuf,
    /// Backend handle, used to reopen `path` read-only when snapshotting its
    /// content on `fsync` (the write handle may lack read access).
    fs: Arc<dyn Fs>,
    /// Crash state shared with the `CrashFs` that opened this handle.
    state: Arc<spin::Mutex<CrashState>>,
}

impl CrashFile {
    /// Records the file's entire current content as its durable image,
    /// replacing any previous image for this path. Invoked after a successful
    /// `fsync`.
    ///
    /// The content is read through a fresh read-only open of the path rather
    /// than through `self.inner`, because the handle being synced is not
    /// necessarily readable. The read happens before the state lock is taken.
    fn snapshot(&self) -> io::Result<()> {
        let mut reader = self.fs.open(&self.path, &FsOpenOptions::new().read(true))?;
        let mut image = Vec::new();
        std::io::Read::read_to_end(&mut reader, &mut image)?;

        let mut guard = self.state.lock();
        guard.durable.insert(self.path.clone(), image);
        Ok(())
    }
}

// Reads are forwarded to the wrapped handle; they never affect crash state.
impl std::io::Read for CrashFile {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.inner.read(buf)
    }
}

// Writes are forwarded to the wrapped handle. Written bytes are NOT durable:
// they only enter the durable image when one of the sync methods snapshots
// the file.
impl std::io::Write for CrashFile {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.inner.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        // flush pushes to the backend but is NOT a durability barrier; the
        // durable image is captured on sync, not flush.
        self.inner.flush()
    }
}

// Seeks are forwarded to the wrapped handle; they never affect crash state.
impl std::io::Seek for CrashFile {
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        self.inner.seek(pos)
    }
}

// All four sync variants share one contract: sync the wrapped handle first,
// and only if that succeeds snapshot the file's full content as its new
// durable image. A failed sync returns its error and leaves the previous
// durable image in place. Everything else is forwarded unchanged.
impl FsFile for CrashFile {
    fn sync_all(&self) -> io::Result<()> {
        self.inner.sync_all().and_then(|()| self.snapshot())
    }

    fn sync_data(&self) -> io::Result<()> {
        self.inner.sync_data().and_then(|()| self.snapshot())
    }

    fn sync_all_with(&self, mode: SyncMode) -> io::Result<()> {
        self.inner
            .sync_all_with(mode)
            .and_then(|()| self.snapshot())
    }

    fn sync_data_with(&self, mode: SyncMode) -> io::Result<()> {
        self.inner
            .sync_data_with(mode)
            .and_then(|()| self.snapshot())
    }

    // Read-only query on the wrapped handle.
    fn metadata(&self) -> io::Result<FsMetadata> {
        self.inner.metadata()
    }

    // Resizes via the wrapped handle. No snapshot is taken here, so the new
    // length only becomes part of the durable image on the next sync.
    fn set_len(&self, size: u64) -> io::Result<()> {
        self.inner.set_len(size)
    }

    // Positional read on the wrapped handle.
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
        self.inner.read_at(buf, offset)
    }

    // Locking is delegated as-is; the crash state does not track locks.
    fn lock_exclusive(&self) -> io::Result<()> {
        self.inner.lock_exclusive()
    }

    fn try_lock_exclusive(&self) -> io::Result<bool> {
        self.inner.try_lock_exclusive()
    }

    // Access-pattern hints are passed straight through.
    fn hint(&self, hint: FileHint) -> io::Result<()> {
        self.inner.hint(hint)
    }
}

#[cfg(test)]
#[expect(clippy::unwrap_used, clippy::expect_used, reason = "test code")]
mod tests;