reddb-io-server 1.1.2

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
//! Pager - Page I/O Manager
//!
//! The Pager is responsible for reading and writing pages to/from disk.
//! It integrates with the PageCache for efficient caching and the FreeList
//! for page allocation.
//!
//! # Responsibilities
//!
//! 1. **Page I/O**: Read/write 4KB pages from/to disk
//! 2. **Caching**: Integrate with SIEVE PageCache
//! 3. **Allocation**: Manage free page allocation via FreeList
//! 4. **Header Management**: Maintain database header (page 0)
//!
//! # File Layout
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │ Page 0: Database Header                                     │
//! │   - Magic bytes "RDDB"                                      │
//! │   - Version                                                 │
//! │   - Page count                                              │
//! │   - Freelist head                                           │
//! ├─────────────────────────────────────────────────────────────┤
//! │ Page 1: Root B-tree page (or first data page)              │
//! ├─────────────────────────────────────────────────────────────┤
//! │ Page 2..N: Data pages                                       │
//! └─────────────────────────────────────────────────────────────┘
//! ```
//!
//! # References
//!
//! - Turso `core/storage/pager.rs:54-134` - HeaderRef::from_pager()
//! - Turso `core/storage/pager.rs:120` - pager.add_dirty(&page)

use super::freelist::FreeList;
use super::page::{Page, PageError, PageType, DB_VERSION, HEADER_SIZE, MAGIC_BYTES, PAGE_SIZE};
use super::page_cache::PageCache;
use crate::storage::wal::writer::WalWriter;
use fs2::FileExt;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};

/// Default cache size (pages)
const DEFAULT_CACHE_SIZE: usize = 10_000;

/// Errors produced by the pager.
///
/// `Io` and `Page` wrap a lower-level error, which is also exposed through
/// `std::error::Error::source`; the remaining variants carry no source.
/// `From` conversions exist for both wrapped types, so `?` works directly on
/// `std::io::Error` and `PageError` results.
#[derive(Debug)]
pub enum PagerError {
    /// Underlying I/O error.
    Io(std::io::Error),
    /// Error reported by the page layer.
    Page(PageError),
    /// Invalid database file; the payload is a human-readable reason.
    InvalidDatabase(String),
    /// Database is read-only
    ReadOnly,
    /// Page not found; the payload is the requested page ID.
    PageNotFound(u32),
    /// Database is locked
    Locked,
    /// A Mutex or RwLock was poisoned (another thread panicked while holding it)
    LockPoisoned,
    /// Database is encrypted but no key was supplied.
    EncryptionRequired,
    /// Plain (unencrypted) database opened with an encryption key.
    PlainDatabaseRefusesKey,
    /// Encryption key validation failed for an encrypted database.
    InvalidKey,
}

impl std::fmt::Display for PagerError {
    /// Renders the human-readable message for each error.
    ///
    /// Variants with a payload interpolate it into the message; payload-free
    /// variants emit a fixed string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Payload-carrying variants format and return immediately; the rest
        // resolve to a fixed message that is written once at the end.
        let fixed_message = match self {
            Self::Io(source) => return write!(f, "I/O error: {}", source),
            Self::Page(source) => return write!(f, "Page error: {}", source),
            Self::InvalidDatabase(reason) => return write!(f, "Invalid database: {}", reason),
            Self::PageNotFound(page_id) => return write!(f, "Page {} not found", page_id),
            Self::ReadOnly => "Database is read-only",
            Self::Locked => "Database is locked",
            Self::LockPoisoned => "Internal lock poisoned (concurrent thread panicked)",
            Self::EncryptionRequired => {
                "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
            }
            Self::PlainDatabaseRefusesKey => {
                "Plain (unencrypted) database opened with an encryption key — refusing"
            }
            Self::InvalidKey => "Encryption key validation failed for this database",
        };
        f.write_str(fixed_message)
    }
}

impl std::error::Error for PagerError {
    /// Returns the wrapped lower-level error for `Io` and `Page`; all other
    /// variants have no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(inner) => Some(inner),
            Self::Page(inner) => Some(inner),
            // Listed explicitly so that adding a variant forces a decision here.
            Self::InvalidDatabase(_)
            | Self::ReadOnly
            | Self::PageNotFound(_)
            | Self::Locked
            | Self::LockPoisoned
            | Self::EncryptionRequired
            | Self::PlainDatabaseRefusesKey
            | Self::InvalidKey => None,
        }
    }
}

impl From<std::io::Error> for PagerError {
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}

impl From<PageError> for PagerError {
    fn from(e: PageError) -> Self {
        Self::Page(e)
    }
}

/// Pager configuration.
///
/// `PagerConfig::default()` yields a writable, create-if-missing pager with
/// checksum verification and the double-write buffer enabled, no encryption,
/// and a cache of `DEFAULT_CACHE_SIZE` pages.
#[derive(Debug, Clone)]
pub struct PagerConfig {
    /// Page cache capacity, in pages (default: `DEFAULT_CACHE_SIZE`).
    pub cache_size: usize,
    /// Whether to open read-only (default: `false`).
    pub read_only: bool,
    /// Whether to create if not exists (default: `true`).
    pub create: bool,
    /// Whether to verify checksums on read (default: `true`).
    pub verify_checksums: bool,
    /// Enable double-write buffer for torn page protection (default: `true`).
    pub double_write: bool,
    /// Optional encryption key. When set, `Pager::open` writes/reads
    /// pages through `PageEncryptor` and rejects any DB whose
    /// encryption-marker disagrees with the supplied key (or its
    /// absence). When `None`, the pager refuses to open a DB whose
    /// header carries the `RDBE` encryption marker.
    pub encryption: Option<crate::storage::encryption::SecureKey>,
}

impl Default for PagerConfig {
    fn default() -> Self {
        Self {
            cache_size: DEFAULT_CACHE_SIZE,
            read_only: false,
            create: true,
            verify_checksums: true,
            double_write: true,
            encryption: None,
        }
    }
}

/// Database file header information.
///
/// The `Default` value describes a fresh file: `DB_VERSION`, `PAGE_SIZE`,
/// a page count of 1 (the header page itself), an empty freelist, and no
/// checkpoint state.
#[derive(Debug, Clone)]
pub struct DatabaseHeader {
    /// Database version
    pub version: u32,
    /// Page size (always 4096)
    pub page_size: u32,
    /// Total number of pages
    pub page_count: u32,
    /// First freelist trunk page ID (0 = no free pages)
    pub freelist_head: u32,
    /// Schema version (for migrations)
    pub schema_version: u32,
    /// Last checkpoint LSN
    pub checkpoint_lsn: u64,
    /// Whether a checkpoint is currently in progress (two-phase)
    pub checkpoint_in_progress: bool,
    /// Target LSN for the in-progress checkpoint
    pub checkpoint_target_lsn: u64,
    /// Physical layout header mirrored into page 0
    pub physical: PhysicalFileHeader,
}

/// Minimal physical state published into page 0 for paged databases.
///
/// Plain `Copy` data; the derived `Default` zeroes every field. Several
/// fields come in `*_page` / `*_checksum` pairs.
///
/// NOTE(review): presumably each pair holds the page ID of a metadata
/// structure plus a checksum of its contents, and the `*_count` fields are
/// summary counters — confirm against the code that reads and writes this
/// header, which is not part of this file.
#[derive(Debug, Clone, Copy, Default)]
pub struct PhysicalFileHeader {
    pub format_version: u32,
    pub sequence: u64,
    pub manifest_oldest_root: u64,
    pub manifest_root: u64,
    pub free_set_root: u64,
    pub manifest_page: u32,
    pub manifest_checksum: u64,
    pub collection_roots_page: u32,
    pub collection_roots_checksum: u64,
    pub collection_root_count: u32,
    pub snapshot_count: u32,
    pub index_count: u32,
    pub catalog_collection_count: u32,
    pub catalog_total_entities: u64,
    pub export_count: u32,
    pub graph_projection_count: u32,
    pub analytics_job_count: u32,
    pub manifest_event_count: u32,
    pub registry_page: u32,
    pub registry_checksum: u64,
    pub recovery_page: u32,
    pub recovery_checksum: u64,
    pub catalog_page: u32,
    pub catalog_checksum: u64,
    pub metadata_state_page: u32,
    pub metadata_state_checksum: u64,
    pub vector_artifact_page: u32,
    pub vector_artifact_checksum: u64,
}

impl Default for DatabaseHeader {
    fn default() -> Self {
        Self {
            version: DB_VERSION,
            page_size: PAGE_SIZE as u32,
            page_count: 1, // Header page
            freelist_head: 0,
            schema_version: 0,
            checkpoint_lsn: 0,
            checkpoint_in_progress: false,
            checkpoint_target_lsn: 0,
            physical: PhysicalFileHeader::default(),
        }
    }
}

/// Page I/O Manager
///
/// Handles reading/writing pages and manages the page cache.
///
/// Dropping a `Pager` attempts a final `flush()`; any error from that flush
/// is discarded, so callers that need to observe flush failures should flush
/// explicitly before the pager goes out of scope.
pub struct Pager {
    /// Database file path
    path: PathBuf,
    /// File handle
    file: Mutex<File>,
    /// Exclusive file lock (held for lifetime, released on drop)
    _lock_file: Option<File>,
    /// Double-write buffer file (.rdb-dwb)
    ///
    /// NOTE(review): the tests in this file locate the buffer by appending
    /// `-dwb` to the database path — confirm the naming against `Pager::open`.
    dwb_file: Option<Mutex<File>>,
    /// Page cache
    cache: PageCache,
    /// Free page list
    freelist: RwLock<FreeList>,
    /// Database header
    header: RwLock<DatabaseHeader>,
    /// Configuration
    config: PagerConfig,
    /// Dirty flag for header
    header_dirty: Mutex<bool>,
    /// Optional WAL writer for WAL-first flush ordering.
    ///
    /// When set, [`Pager::flush`] computes the maximum `header.lsn` of
    /// every dirty page and calls [`WalWriter::flush_until`] before
    /// passing the batch to the double-write buffer. This guarantees
    /// the postgres-style invariant: a page on disk implies its WAL
    /// record is already durable.
    ///
    /// Wired in via [`Pager::set_wal_writer`] post-construction so
    /// existing callers that build a Pager without a WAL keep working
    /// unchanged. See `PLAN.md` § Target 3.
    wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
    /// Optional page encryptor + header. When set, `read_page` /
    /// `write_page` route through AES-GCM transparently and page 0
    /// bypasses encryption (it carries the encryption marker +
    /// header itself). When `None`, all pages are stored plaintext
    /// and any DB header carrying the `RDBE` marker is rejected at
    /// open time.
    pub(crate) encryption: Option<(
        crate::storage::encryption::PageEncryptor,
        crate::storage::encryption::EncryptionHeader,
    )>,
}

// Out-of-line module loaded from `pager/impl.rs`; the explicit `#[path]` is
// required because `impl` is a keyword and cannot be used as a module name.
// NOTE(review): presumably this holds the inherent `Pager` methods used by
// the `Drop` impl and the tests below (`open`, `flush`, ...) — confirm
// against that file.
#[path = "pager/impl.rs"]
mod pager_impl;
impl Drop for Pager {
    /// Attempts a final flush when the pager goes away.
    ///
    /// `drop` cannot report failures, so the flush result is intentionally
    /// discarded (best effort).
    fn drop(&mut self) {
        let _flush_outcome = self.flush();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;

    /// Returns a scratch database path in the OS temp directory.
    ///
    /// The file name combines the process ID with a process-wide counter, so
    /// every call within one test binary yields a distinct path.
    fn temp_db_path() -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};

        static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

        let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
        let file_name = format!("reddb_test_{}_{}.db", std::process::id(), seq);
        std::env::temp_dir().join(file_name)
    }

    /// Best-effort removal of a test database and its companion files
    /// (`-hdr`, `-meta`, `-dwb` appended to the database path).
    ///
    /// Missing files and other removal errors are ignored.
    fn cleanup(path: &Path) {
        let _ = fs::remove_file(path);

        for suffix in ["-hdr", "-meta", "-dwb"].iter() {
            let mut companion = path.as_os_str().to_os_string();
            companion.push(suffix);
            let _ = fs::remove_file(&companion);
        }
    }

    /// Path of the double-write buffer companion: the database path with
    /// `-dwb` appended verbatim (the extension is not replaced).
    fn dwb_path_for(path: &Path) -> PathBuf {
        let mut raw = path.as_os_str().to_os_string();
        raw.push("-dwb");
        raw.into()
    }

    /// Writes a double-write-buffer file next to `path` containing `pages`.
    ///
    /// Layout produced: 4-byte magic `RDDW`, little-endian `u32` entry count,
    /// 4-byte little-endian CRC32 of everything after the 12-byte header,
    /// then one `(page_id, page bytes)` entry per page. Each page's checksum
    /// is refreshed before it is serialized.
    fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
        const HEADER_LEN: usize = 12;
        let entry_size = 4 + PAGE_SIZE;

        let mut image = Vec::with_capacity(HEADER_LEN + pages.len() * entry_size);
        image.extend_from_slice(b"RDDW");
        image.extend_from_slice(&(pages.len() as u32).to_le_bytes());
        // Checksum placeholder, patched once the entries are in place.
        image.extend_from_slice(&[0u8; 4]);

        for (page_id, page) in pages {
            let mut stamped = page.clone();
            stamped.update_checksum();
            image.extend_from_slice(&page_id.to_le_bytes());
            image.extend_from_slice(stamped.as_bytes());
        }

        let crc = crate::storage::engine::crc32::crc32(&image[HEADER_LEN..]);
        image[8..HEADER_LEN].copy_from_slice(&crc.to_le_bytes());

        let mut out = fs::File::create(dwb_path_for(path)).unwrap();
        out.write_all(&image).unwrap();
        out.sync_all().unwrap();
    }

    /// A freshly created database reports three pages.
    #[test]
    fn test_pager_create_new() {
        let db_file = temp_db_path();
        cleanup(&db_file);

        // Scoped so the pager is dropped before the files are removed.
        {
            let fresh = Pager::open_default(&db_file).unwrap();
            let total_pages = fresh.page_count().unwrap();
            assert_eq!(total_pages, 3); // Header + reserved pages
        }

        cleanup(&db_file);
    }

    /// A page allocated in one session is still counted after reopening.
    #[test]
    fn test_pager_reopen() {
        let db_file = temp_db_path();
        cleanup(&db_file);

        // First session: allocate one page and persist it.
        {
            let writer = Pager::open_default(&db_file).unwrap();

            let first_data_page = writer.allocate_page(PageType::BTreeLeaf).unwrap();
            assert_eq!(first_data_page.page_id(), 3);

            writer.sync().unwrap();
        }

        // Second session: the page count must include that allocation.
        {
            let reopened = Pager::open_default(&db_file).unwrap();
            let total_pages = reopened.page_count().unwrap();
            assert_eq!(total_pages, 4); // Header + reserved pages + 1 data page
        }

        cleanup(&db_file);
    }

    /// A cell written through the pager can be read back unchanged.
    #[test]
    fn test_pager_read_write() {
        let db_file = temp_db_path();
        cleanup(&db_file);

        {
            let pager = Pager::open_default(&db_file).unwrap();

            // Write: allocate a leaf, add one cell, hand it back to the pager.
            let mut leaf = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let leaf_id = leaf.page_id();
            leaf.insert_cell(b"key", b"value").unwrap();
            pager.write_page(leaf_id, leaf).unwrap();

            // Read: the first cell must carry the same key/value pair.
            let stored = pager.read_page(leaf_id).unwrap();
            let (key, value) = stored.read_cell(0).unwrap();
            assert_eq!(key, b"key");
            assert_eq!(value, b"value");
        }

        cleanup(&db_file);
    }

    /// Repeated reads of the same page register at least one cache hit.
    #[test]
    fn test_pager_cache() {
        let db_file = temp_db_path();
        cleanup(&db_file);

        {
            let pager = Pager::open_default(&db_file).unwrap();

            let allocated = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let target_id = allocated.page_id();

            // Two reads in a row: the first should already be cached from the
            // allocation, the second should hit the cache.
            for _ in 0..2 {
                let _ = pager.read_page(target_id).unwrap();
            }

            assert!(pager.cache_stats().hits >= 1);
        }

        cleanup(&db_file);
    }

    /// A freed page is handed out again by the next allocation.
    #[test]
    fn test_pager_free_page() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let pager = Pager::open_default(&path).unwrap();

            // Allocate two pages; only the first one is freed. The second is
            // kept alive but otherwise unused, hence the underscore name.
            let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let _page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();

            let id1 = page1.page_id();

            // Free page 1
            pager.free_page(id1).unwrap();

            // Next allocation should reuse page 1
            let page3 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            assert_eq!(page3.page_id(), id1);
        }

        cleanup(&path);
    }

    /// A page freed and synced in one session is reused after reopening.
    #[test]
    fn test_freelist_persistence() {
        let db_file = temp_db_path();
        cleanup(&db_file);

        // First session: allocate two pages, free the first, persist.
        let freed_id = {
            let pager = Pager::open_default(&db_file).unwrap();
            let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let _page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();

            let released = page1.page_id();
            pager.free_page(released).unwrap();
            pager.sync().unwrap();
            released
        };

        // Second session: the next allocation must hand back the freed page.
        {
            let pager = Pager::open_default(&db_file).unwrap();
            let reused = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            assert_eq!(reused.page_id(), freed_id);
        }

        cleanup(&db_file);
    }

    /// A pager opened read-only reports that state and refuses allocations.
    #[test]
    fn test_pager_read_only() {
        let db_file = temp_db_path();
        cleanup(&db_file);

        // Materialize the database on disk first.
        {
            let creator = Pager::open_default(&db_file).unwrap();
            creator.sync().unwrap();
        }

        // Reopen it with the read-only flag set.
        {
            let read_only_config = PagerConfig {
                read_only: true,
                ..PagerConfig::default()
            };
            let pager = Pager::open(&db_file, read_only_config).unwrap();

            assert!(pager.is_read_only());

            // Allocation is a write and must be rejected.
            assert!(pager.allocate_page(PageType::BTreeLeaf).is_err());
        }

        cleanup(&db_file);
    }

    /// Opening a database with a populated double-write buffer restores the
    /// buffered page, leaves the buffer file present but empty, and the
    /// buffer is again empty after a subsequent write + flush.
    #[test]
    fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
        let db_file = temp_db_path();
        cleanup(&db_file);

        let dwb_config = PagerConfig {
            double_write: true,
            ..PagerConfig::default()
        };

        // Session 1: create the database with one allocated page.
        let page_id = {
            let pager = Pager::open(&db_file, dwb_config.clone()).unwrap();
            let allocated = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let id = allocated.page_id();
            pager.sync().unwrap();
            id
        };

        // Plant a buffer file holding a newer image of that page.
        let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
        recovered_page.insert_cell(b"key", b"value").unwrap();
        write_dwb_fixture(&db_file, &[(page_id, recovered_page.clone())]);

        let dwb_file = dwb_path_for(&db_file);
        assert!(dwb_file.exists());
        assert!(fs::metadata(&dwb_file).unwrap().len() > 0);

        // Session 2: opening must apply the buffered image.
        {
            let pager = Pager::open(&db_file, dwb_config).unwrap();

            let restored = pager.read_page(page_id).unwrap();
            let (key, value) = restored.read_cell(0).unwrap();
            assert_eq!(key, b"key");
            assert_eq!(value, b"value");

            // The buffer file is kept but truncated to zero bytes.
            assert!(dwb_file.exists());
            assert_eq!(fs::metadata(&dwb_file).unwrap().len(), 0);

            // A further write + flush must leave it empty again.
            let mut updated_page = recovered_page.clone();
            updated_page.insert_cell(b"key2", b"value2").unwrap();
            pager.write_page(page_id, updated_page).unwrap();
            pager.flush().unwrap();

            assert!(dwb_file.exists());
            assert_eq!(fs::metadata(&dwb_file).unwrap().len(), 0);
        }

        cleanup(&db_file);
    }

    // -----------------------------------------------------------------
    // Target 3: WAL-first flush ordering
    // -----------------------------------------------------------------

    /// A freshly opened pager has no WAL writer attached.
    #[test]
    fn pager_starts_without_wal_writer() {
        let path = temp_db_path();
        // Remove leftovers from an aborted earlier run (the name only depends
        // on the PID and a counter), as the other pager tests do before opening.
        cleanup(&path);

        let pager = Pager::open(&path, PagerConfig::default()).unwrap();
        assert!(!pager.has_wal_writer());

        // Drop explicitly so the pager is gone before its files are removed.
        drop(pager);
        cleanup(&path);
    }

    /// `set_wal_writer` attaches a WAL handle and `clear_wal_writer` detaches
    /// it again, as observed through `has_wal_writer`.
    #[test]
    fn set_wal_writer_attaches_handle() {
        use crate::storage::wal::writer::WalWriter;
        use std::sync::{Arc, Mutex};

        let db_path = temp_db_path();
        let mut wal_path = db_path.clone();
        wal_path.set_extension("wal");
        // Start from a clean slate: stale files from an aborted earlier run
        // could otherwise be opened instead of fresh ones.
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);

        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
        pager.set_wal_writer(Arc::clone(&wal));
        assert!(pager.has_wal_writer());

        pager.clear_wal_writer();
        assert!(!pager.has_wal_writer());

        // Drop explicitly so the pager is gone before its files are removed.
        drop(pager);
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);
    }

    #[test]
    fn flush_with_lsn_zero_pages_skips_wal_call() {
        // When every dirty page has lsn == 0 (the legacy auto-commit
        // path), flush() must NOT call wal.flush_until — there is no
        // WAL record to wait for. We verify this by recording the
        // attached WAL's durable_lsn before the flush and confirming
        // flush() does not advance it (no append, no flush).
        use crate::storage::wal::writer::WalWriter;
        use std::sync::{Arc, Mutex};

        let db_path = temp_db_path();
        let mut wal_path = db_path.clone();
        wal_path.set_extension("wal");
        // Start from a clean slate: stale files from an aborted earlier run
        // could otherwise be opened instead of fresh ones.
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);

        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
        let initial_durable = {
            let g = wal.lock().unwrap();
            g.durable_lsn()
        };
        pager.set_wal_writer(Arc::clone(&wal));

        // Allocate and write a page with lsn = 0.
        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        page.insert_cell(b"k", b"v").unwrap();
        // header.lsn stays at 0 — caller did not stamp.
        pager.write_page(page.page_id(), page).unwrap();
        pager.flush().unwrap();

        // WAL durable_lsn must be unchanged because flush_until was
        // never called (max lsn over dirty pages was 0).
        let after_flush = {
            let g = wal.lock().unwrap();
            g.durable_lsn()
        };
        assert_eq!(after_flush, initial_durable);

        // Drop explicitly so the pager is gone before its files are removed.
        drop(pager);
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);
    }

    #[test]
    fn flush_advances_wal_durable_when_pages_carry_lsn() {
        // The full WAL-first dance: append records, capture the
        // resulting LSN, stamp it on a page, flush — afterwards the
        // WAL must be durable up to at least that LSN.
        use crate::storage::wal::record::WalRecord;
        use crate::storage::wal::writer::WalWriter;
        use std::sync::{Arc, Mutex};

        let db_path = temp_db_path();
        let mut wal_path = db_path.clone();
        wal_path.set_extension("wal");
        // Start from a clean slate: stale files from an aborted earlier run
        // could otherwise be opened instead of fresh ones.
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);

        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
        pager.set_wal_writer(Arc::clone(&wal));

        // Append a Begin/Commit pair and take the WAL's current LSN as the
        // stamp for the single dirty page written below.
        let stamped_lsn = {
            let mut wal_guard = wal.lock().unwrap();
            wal_guard.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            wal_guard.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            wal_guard.current_lsn()
        };
        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        page.insert_cell(b"k", b"v").unwrap();
        // Use the public Page API to set the LSN.
        page.set_lsn(stamped_lsn);
        pager.write_page(page.page_id(), page).unwrap();
        pager.flush().unwrap();

        // After flush, the WAL is durable at least up to our stamp.
        let after_flush = {
            let g = wal.lock().unwrap();
            g.durable_lsn()
        };
        assert!(
            after_flush >= stamped_lsn,
            "after flush durable_lsn {} must be >= stamped {}",
            after_flush,
            stamped_lsn
        );

        // Drop explicitly so the pager is gone before its files are removed.
        drop(pager);
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);
    }
}