Skip to main content

reddb_server/storage/engine/
pager.rs

1//! Pager - Page I/O Manager
2//!
3//! The Pager is responsible for reading and writing pages to/from disk.
4//! It integrates with the PageCache for efficient caching and the FreeList
5//! for page allocation.
6//!
7//! # Responsibilities
8//!
//! 1. **Page I/O**: Read/write fixed-size pages (`PAGE_SIZE`, currently 16 KiB) from/to disk
10//! 2. **Caching**: Integrate with SIEVE PageCache
11//! 3. **Allocation**: Manage free page allocation via FreeList
12//! 4. **Header Management**: Maintain database header (page 0)
13//!
14//! # File Layout
15//!
16//! ```text
17//! ┌─────────────────────────────────────────────────────────────┐
18//! │ Page 0: Database Header                                     │
19//! │   - Magic bytes "RDDB"                                      │
20//! │   - Version                                                 │
21//! │   - Page count                                              │
22//! │   - Freelist head                                           │
23//! ├─────────────────────────────────────────────────────────────┤
24//! │ Page 1: Root B-tree page (or first data page)              │
25//! ├─────────────────────────────────────────────────────────────┤
26//! │ Page 2..N: Data pages                                       │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! # References
31//!
32//! - Turso `core/storage/pager.rs:54-134` - HeaderRef::from_pager()
33//! - Turso `core/storage/pager.rs:120` - pager.add_dirty(&page)
34
35use super::freelist::FreeList;
36use super::page::{Page, PageError, PageType, PAGE_SIZE};
37use super::page_cache::PageCache;
38use crate::storage::wal::writer::WalWriter;
39use fs2::FileExt;
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43#[cfg(test)]
44use std::sync::atomic::{AtomicU8, Ordering};
45use std::sync::{Arc, Mutex, RwLock};
46
47pub use reddb_file::{DatabaseHeader, PhysicalFileHeader};
48
/// Default page-cache capacity, counted in pages (not bytes).
///
/// Used as `PagerConfig::cache_size` by `PagerConfig::default()`.
const DEFAULT_CACHE_SIZE: usize = 10_000;
51
/// Test-only override slot for the CoW atomic-write probe.
///
/// The test helpers in this file store `0` to clear the override, `1` for an
/// override of `true` and `2` for an override of `false`.
/// NOTE(review): the value is presumably read by the probe in `pager_impl`,
/// which is not visible here — confirm the decoding there.
#[cfg(test)]
static COW_ATOMIC_WRITE_TEST_OVERRIDE: AtomicU8 = AtomicU8::new(0);
54
/// Errors surfaced by the pager.
///
/// `Io` and `Page` wrap an underlying error, which is also exposed through
/// `std::error::Error::source`; the remaining variants describe pager-level
/// conditions and carry at most a small payload.
#[derive(Debug)]
pub enum PagerError {
    /// Underlying I/O error.
    Io(std::io::Error),
    /// Error reported by the page layer.
    Page(PageError),
    /// Invalid database file; the payload is a human-readable reason.
    InvalidDatabase(String),
    /// Database is read-only.
    ReadOnly,
    /// Page not found; the payload is the page id.
    PageNotFound(u32),
    /// Database is locked.
    Locked,
    /// A Mutex or RwLock was poisoned (another thread panicked while holding it).
    LockPoisoned,
    /// Database is encrypted but no key was supplied.
    EncryptionRequired,
    /// Plain (unencrypted) database opened with an encryption key.
    PlainDatabaseRefusesKey,
    /// Encryption key validation failed for an encrypted database.
    InvalidKey,
}
79
/// A contiguous run of database pages reserved for vector-turbo payloads.
///
/// NOTE(review): the field meanings below are read off the field names and
/// the description above; confirm against the allocator that produces extents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExtentId {
    /// Presumably the page id of the first page in the run.
    pub start_page: u32,
    /// Presumably the number of pages in the run.
    pub n_pages: u32,
}
86
87impl std::fmt::Display for PagerError {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            Self::Io(e) => write!(f, "I/O error: {}", e),
91            Self::Page(e) => write!(f, "Page error: {}", e),
92            Self::InvalidDatabase(msg) => write!(f, "Invalid database: {}", msg),
93            Self::ReadOnly => write!(f, "Database is read-only"),
94            Self::PageNotFound(id) => write!(f, "Page {} not found", id),
95            Self::Locked => write!(f, "Database is locked"),
96            Self::LockPoisoned => write!(f, "Internal lock poisoned (concurrent thread panicked)"),
97            Self::EncryptionRequired => write!(
98                f,
99                "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
100            ),
101            Self::PlainDatabaseRefusesKey => write!(
102                f,
103                "Plain (unencrypted) database opened with an encryption key — refusing"
104            ),
105            Self::InvalidKey => write!(f, "Encryption key validation failed for this database"),
106        }
107    }
108}
109
110impl std::error::Error for PagerError {
111    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112        match self {
113            Self::Io(e) => Some(e),
114            Self::Page(e) => Some(e),
115            _ => None,
116        }
117    }
118}
119
120impl From<std::io::Error> for PagerError {
121    fn from(e: std::io::Error) -> Self {
122        Self::Io(e)
123    }
124}
125
126impl From<PageError> for PagerError {
127    fn from(e: PageError) -> Self {
128        Self::Page(e)
129    }
130}
131
/// Pager configuration.
///
/// `PagerConfig::default()` yields a writable, create-if-missing
/// configuration with checksums and the double-write buffer enabled and no
/// encryption key.
#[derive(Debug, Clone)]
pub struct PagerConfig {
    /// Page cache capacity, in pages.
    pub cache_size: usize,
    /// Whether to open read-only
    pub read_only: bool,
    /// Whether to create if not exists
    pub create: bool,
    /// Whether to verify checksums on read
    pub verify_checksums: bool,
    /// Enable double-write buffer (DWB) for torn page protection.
    ///
    /// Setting this to `false` is only a request: the tests in this file show
    /// the DWB sidecar is still used unless the CoW atomic-write probe allows
    /// skipping it.
    pub double_write: bool,
    /// Optional encryption key. When set, `Pager::open` writes/reads
    /// pages through `PageEncryptor` and rejects any DB whose
    /// encryption-marker disagrees with the supplied key (or its
    /// absence). When `None`, the pager refuses to open a DB whose
    /// header carries the `RDBE` encryption marker.
    pub encryption: Option<crate::storage::encryption::SecureKey>,
}
152
153impl Default for PagerConfig {
154    fn default() -> Self {
155        Self {
156            cache_size: DEFAULT_CACHE_SIZE,
157            read_only: false,
158            create: true,
159            verify_checksums: true,
160            double_write: true,
161            encryption: None,
162        }
163    }
164}
165
/// Page I/O Manager
///
/// Handles reading/writing pages and manages the page cache.
///
/// Mutable state sits behind `Mutex`/`RwLock` fields; the tests in this file
/// call the pager's methods through a shared (non-`mut`) binding.
pub struct Pager {
    /// Database file path
    path: PathBuf,
    /// File handle
    file: Mutex<File>,
    /// Exclusive file lock (held for lifetime, released on drop)
    _lock_file: Option<File>,
    /// Double-write buffer file.
    ///
    /// NOTE(review): presumably `None` when the double-write buffer is not in
    /// use — confirm against the open path in `pager_impl`.
    dwb_file: Option<Mutex<File>>,
    /// Page cache
    cache: PageCache,
    /// Free page list
    freelist: RwLock<FreeList>,
    /// Database header
    header: RwLock<DatabaseHeader>,
    /// Configuration
    config: PagerConfig,
    /// Dirty flag for header
    header_dirty: Mutex<bool>,
    /// Optional WAL writer for WAL-first flush ordering.
    ///
    /// When set, [`Pager::flush`] computes the maximum `header.lsn` of
    /// every dirty page and calls [`WalWriter::flush_until`] before
    /// passing the batch to the double-write buffer. This guarantees
    /// the postgres-style invariant: a page on disk implies its WAL
    /// record is already durable.
    ///
    /// Wired in via [`Pager::set_wal_writer`] post-construction so
    /// existing callers that build a Pager without a WAL keep working
    /// unchanged. See `PLAN.md` § Target 3.
    wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
    /// Optional page encryptor + header. When set, `read_page` /
    /// `write_page` route through AES-GCM transparently and page 0
    /// bypasses encryption (it carries the encryption marker +
    /// header itself). When `None`, all pages are stored plaintext
    /// and any DB header carrying the `RDBE` marker is rejected at
    /// open time.
    pub(crate) encryption: Option<(
        crate::storage::encryption::PageEncryptor,
        crate::storage::encryption::EncryptionHeader,
    )>,
}
211
// Out-of-line module loaded from `pager/impl.rs`. The explicit `#[path]` is
// required because `impl` is a Rust keyword and cannot be used directly as a
// module name. The tests below import the CoW probe helpers from it.
#[path = "pager/impl.rs"]
mod pager_impl;
/// Best-effort flush when the pager goes out of scope.
impl Drop for Pager {
    fn drop(&mut self) {
        // `drop` cannot return an error, so a failed flush is deliberately
        // discarded here. Callers that need to observe flush failures should
        // call `flush()` themselves before dropping the pager.
        let _ = self.flush();
    }
}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224    #[cfg(target_os = "linux")]
225    use pager_impl::parse_mountinfo_options_for_path;
226    use pager_impl::{
227        classify_cow_filesystem, CowFilesystemKind, BTRFS_SUPER_MAGIC, FS_NOCOW_FL, ZFS_SUPER_MAGIC,
228    };
229    use std::fs;
230    use std::io::Write;
231
232    fn temp_db_path() -> PathBuf {
233        use std::sync::atomic::{AtomicU64, Ordering};
234        static COUNTER: AtomicU64 = AtomicU64::new(0);
235        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
236        let mut path = std::env::temp_dir();
237        path.push(format!("reddb_test_{}_{}.db", std::process::id(), id));
238        path
239    }
240
241    fn cleanup(path: &Path) {
242        let _ = fs::remove_file(path);
243        // Clean up companion files
244        let _ = fs::remove_file(reddb_file::layout::pager_header_shadow_path(path));
245        let _ = fs::remove_file(reddb_file::layout::pager_meta_shadow_path(path));
246        let _ = fs::remove_file(reddb_file::layout::pager_dwb_shadow_path(path));
247    }
248
    /// Path of the double-write-buffer sidecar file that belongs to the
    /// database at `path`.
    fn dwb_path_for(path: &Path) -> PathBuf {
        reddb_file::layout::pager_dwb_shadow_path(path)
    }
252
253    static COW_ATOMIC_WRITE_OVERRIDE_GUARD: Mutex<()> = Mutex::new(());
254
255    struct CowAtomicWriteOverrideGuard {
256        _guard: std::sync::MutexGuard<'static, ()>,
257    }
258
259    impl Drop for CowAtomicWriteOverrideGuard {
260        fn drop(&mut self) {
261            COW_ATOMIC_WRITE_TEST_OVERRIDE.store(0, Ordering::Relaxed);
262        }
263    }
264
265    fn cow_atomic_write_override(value: bool) -> CowAtomicWriteOverrideGuard {
266        let guard = COW_ATOMIC_WRITE_OVERRIDE_GUARD
267            .lock()
268            .unwrap_or_else(|err| err.into_inner());
269        COW_ATOMIC_WRITE_TEST_OVERRIDE.store(if value { 1 } else { 2 }, Ordering::Relaxed);
270        CowAtomicWriteOverrideGuard { _guard: guard }
271    }
272
273    fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
274        let pages: Vec<_> = pages
275            .iter()
276            .map(|(page_id, page)| {
277                let mut page = page.clone();
278                page.update_checksum();
279                (*page_id, page)
280            })
281            .collect();
282        let buf = reddb_file::encode_paged_dwb_frame(
283            pages
284                .iter()
285                .map(|(page_id, page)| (*page_id, page.as_bytes())),
286        );
287
288        let dwb_path = dwb_path_for(path);
289        let mut file = fs::File::create(&dwb_path).unwrap();
290        file.write_all(&buf).unwrap();
291        file.sync_all().unwrap();
292    }
293
294    fn write_page_bytes(path: &Path, page_id: u32, page: &Page) {
295        let mut file = OpenOptions::new().write(true).open(path).unwrap();
296        file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
297            .unwrap();
298        file.write_all(page.as_bytes()).unwrap();
299        file.sync_all().unwrap();
300    }
301
302    fn write_torn_page_bytes(path: &Path, page_id: u32, before: &Page, after: &Page) {
303        let mut torn = *before.as_bytes();
304        torn[..PAGE_SIZE / 2].copy_from_slice(&after.as_bytes()[..PAGE_SIZE / 2]);
305
306        let mut file = OpenOptions::new().write(true).open(path).unwrap();
307        file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
308            .unwrap();
309        file.write_all(&torn).unwrap();
310        file.sync_all().unwrap();
311    }
312
313    #[test]
314    fn test_pager_create_new() {
315        let path = temp_db_path();
316        cleanup(&path);
317
318        {
319            let pager = Pager::open_default(&path).unwrap();
320            assert_eq!(pager.page_count().unwrap(), 3); // Header + reserved pages
321        }
322
323        cleanup(&path);
324    }
325
326    #[test]
327    fn test_pager_reopen() {
328        let path = temp_db_path();
329        cleanup(&path);
330
331        // Create and write
332        {
333            let pager = Pager::open_default(&path).unwrap();
334
335            // Allocate a page
336            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
337            assert_eq!(page.page_id(), 3);
338
339            pager.sync().unwrap();
340        }
341
342        // Reopen and verify
343        {
344            let pager = Pager::open_default(&path).unwrap();
345            assert_eq!(pager.page_count().unwrap(), 4); // Header + reserved pages + 1 data page
346        }
347
348        cleanup(&path);
349    }
350
351    #[test]
352    fn test_pager_read_write() {
353        let path = temp_db_path();
354        cleanup(&path);
355
356        {
357            let pager = Pager::open_default(&path).unwrap();
358
359            // Allocate and write
360            let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
361            let page_id = page.page_id();
362
363            page.insert_cell(b"key", b"value").unwrap();
364            pager.write_page(page_id, page).unwrap();
365
366            // Read back
367            let read_page = pager.read_page(page_id).unwrap();
368            let (key, value) = read_page.read_cell(0).unwrap();
369            assert_eq!(key, b"key");
370            assert_eq!(value, b"value");
371        }
372
373        cleanup(&path);
374    }
375
376    #[test]
377    fn test_pager_cache() {
378        let path = temp_db_path();
379        cleanup(&path);
380
381        {
382            let pager = Pager::open_default(&path).unwrap();
383
384            // Allocate a page
385            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
386            let page_id = page.page_id();
387
388            // First read - should be cached from allocate
389            let _ = pager.read_page(page_id).unwrap();
390
391            // Second read - should hit cache
392            let _ = pager.read_page(page_id).unwrap();
393
394            let stats = pager.cache_stats();
395            assert!(stats.hits >= 1);
396        }
397
398        cleanup(&path);
399    }
400
401    #[test]
402    fn test_pager_free_page() {
403        let path = temp_db_path();
404        cleanup(&path);
405
406        {
407            let pager = Pager::open_default(&path).unwrap();
408
409            // Allocate pages
410            let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
411            let page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
412
413            let id1 = page1.page_id();
414            let id2 = page2.page_id();
415
416            // Free page 1
417            pager.free_page(id1).unwrap();
418
419            // Next allocation should reuse page 1
420            let page3 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
421            assert_eq!(page3.page_id(), id1);
422        }
423
424        cleanup(&path);
425    }
426
427    #[test]
428    fn test_freelist_persistence() {
429        let path = temp_db_path();
430        cleanup(&path);
431
432        let freed_id;
433        {
434            let pager = Pager::open_default(&path).unwrap();
435            let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
436            let _page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
437            freed_id = page1.page_id();
438
439            pager.free_page(freed_id).unwrap();
440            pager.sync().unwrap();
441        }
442
443        {
444            let pager = Pager::open_default(&path).unwrap();
445            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
446            assert_eq!(page.page_id(), freed_id);
447        }
448
449        cleanup(&path);
450    }
451
452    #[test]
453    fn test_pager_read_only() {
454        let path = temp_db_path();
455        cleanup(&path);
456
457        // Create database
458        {
459            let pager = Pager::open_default(&path).unwrap();
460            pager.sync().unwrap();
461        }
462
463        // Open read-only
464        {
465            let config = PagerConfig {
466                read_only: true,
467                ..Default::default()
468            };
469
470            let pager = Pager::open(&path, config).unwrap();
471            assert!(pager.is_read_only());
472
473            // Should fail to allocate
474            assert!(pager.allocate_page(PageType::BTreeLeaf).is_err());
475        }
476
477        cleanup(&path);
478    }
479
480    #[test]
481    fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
482        let path = temp_db_path();
483        cleanup(&path);
484
485        let config = PagerConfig {
486            double_write: true,
487            ..Default::default()
488        };
489
490        let page_id;
491        {
492            let pager = Pager::open(&path, config.clone()).unwrap();
493            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
494            page_id = page.page_id();
495            pager.sync().unwrap();
496        }
497
498        let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
499        recovered_page.insert_cell(b"key", b"value").unwrap();
500        write_dwb_fixture(&path, &[(page_id, recovered_page.clone())]);
501
502        let dwb_path = dwb_path_for(&path);
503        assert!(dwb_path.exists());
504        assert!(fs::metadata(&dwb_path).unwrap().len() > 0);
505
506        {
507            let pager = Pager::open(&path, config).unwrap();
508
509            let read_page = pager.read_page(page_id).unwrap();
510            let (key, value) = read_page.read_cell(0).unwrap();
511            assert_eq!(key, b"key");
512            assert_eq!(value, b"value");
513
514            assert!(dwb_path.exists());
515            assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
516
517            let mut updated_page = recovered_page.clone();
518            updated_page.insert_cell(b"key2", b"value2").unwrap();
519            pager.write_page(page_id, updated_page).unwrap();
520            pager.flush().unwrap();
521
522            assert!(dwb_path.exists());
523            assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
524        }
525
526        cleanup(&path);
527    }
528
529    #[test]
530    fn cow_probe_classification_fails_closed_for_btrfs_nodatacow() {
531        assert_eq!(
532            classify_cow_filesystem(ZFS_SUPER_MAGIC, None, None),
533            Some(CowFilesystemKind::Zfs),
534            "ZFS is always CoW"
535        );
536        assert_eq!(
537            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw,relatime"), Some(0)),
538            Some(CowFilesystemKind::BtrfsDataCow),
539            "btrfs qualifies only when datacow remains enabled"
540        );
541        assert_eq!(
542            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw,nodatacow"), Some(0)),
543            None,
544            "btrfs nodatacow mount option must reject DWB skip"
545        );
546        assert_eq!(
547            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw"), Some(FS_NOCOW_FL)),
548            None,
549            "btrfs chattr +C / NOCOW inode flag must reject DWB skip"
550        );
551        assert_eq!(
552            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw"), None),
553            None,
554            "missing btrfs inode flags are uncertain and must fail closed"
555        );
556        assert_eq!(
557            classify_cow_filesystem(BTRFS_SUPER_MAGIC, None, Some(0)),
558            None,
559            "missing btrfs mount options are uncertain and must fail closed"
560        );
561    }
562
563    #[cfg(target_os = "linux")]
564    #[test]
565    fn mountinfo_parser_uses_longest_cow_mount_and_rejects_nodatacow() {
566        let mountinfo = "\
56724 18 0:21 / / rw,relatime - ext4 /dev/root rw\n\
56835 24 0:42 /subvol /mnt/reddb rw,relatime - btrfs /dev/sdb rw,space_cache=v2\n\
56936 35 0:43 /nocow /mnt/reddb/nocow rw,relatime - btrfs /dev/sdb rw,nodatacow\n\
570";
571
572        assert_eq!(
573            parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/data.rdb"))
574                .as_deref(),
575            Some("rw,relatime,rw,space_cache=v2")
576        );
577        assert_eq!(
578            parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/nocow/data.rdb"))
579                .as_deref(),
580            Some("rw,relatime,rw,nodatacow")
581        );
582    }
583
584    #[test]
585    fn double_write_false_keeps_dwb_when_cow_probe_denies() {
586        let _override = cow_atomic_write_override(false);
587        let path = temp_db_path();
588        cleanup(&path);
589
590        {
591            let config = PagerConfig {
592                double_write: false,
593                ..Default::default()
594            };
595            let pager = Pager::open(&path, config).unwrap();
596            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
597            pager.write_page(page.page_id(), page).unwrap();
598            pager.flush().unwrap();
599        }
600
601        assert!(
602            dwb_path_for(&path).exists(),
603            "DWB must stay enabled when double_write=false is not proven safe"
604        );
605
606        cleanup(&path);
607    }
608
609    #[test]
610    fn double_write_false_skips_dwb_when_cow_probe_allows() {
611        let _override = cow_atomic_write_override(true);
612        let path = temp_db_path();
613        cleanup(&path);
614
615        {
616            let config = PagerConfig {
617                double_write: false,
618                ..Default::default()
619            };
620            let pager = Pager::open(&path, config).unwrap();
621            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
622            pager.write_page(page.page_id(), page).unwrap();
623            pager.flush().unwrap();
624        }
625
626        assert!(
627            !dwb_path_for(&path).exists(),
628            "DWB may be skipped only after the CoW probe allows it"
629        );
630
631        cleanup(&path);
632    }
633
634    #[test]
635    fn double_write_false_on_cow_replays_then_removes_existing_dwb() {
636        let _override = cow_atomic_write_override(true);
637        let path = temp_db_path();
638        cleanup(&path);
639
640        let page_id;
641        {
642            let pager = Pager::open(&path, PagerConfig::default()).unwrap();
643            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
644            page_id = page.page_id();
645            pager.sync().unwrap();
646        }
647
648        let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
649        recovered_page.insert_cell(b"key", b"value").unwrap();
650        write_dwb_fixture(&path, &[(page_id, recovered_page)]);
651
652        {
653            let config = PagerConfig {
654                double_write: false,
655                ..Default::default()
656            };
657            let pager = Pager::open(&path, config).unwrap();
658            let read_page = pager.read_page(page_id).unwrap();
659            let (key, value) = read_page.read_cell(0).unwrap();
660            assert_eq!(key, b"key");
661            assert_eq!(value, b"value");
662        }
663
664        assert!(
665            !dwb_path_for(&path).exists(),
666            "CoW DWB-skip must replay any existing DWB before removing the sidecar"
667        );
668
669        cleanup(&path);
670    }
671
672    #[test]
673    fn simulated_cow_mid_write_leaves_a_whole_consistent_page_without_dwb() {
674        let _override = cow_atomic_write_override(true);
675        let path = temp_db_path();
676        cleanup(&path);
677
678        let config = PagerConfig {
679            double_write: false,
680            ..Default::default()
681        };
682
683        let page_id;
684        let before;
685        let after;
686        {
687            let pager = Pager::open(&path, config.clone()).unwrap();
688            let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
689            page_id = page.page_id();
690            page.insert_cell(b"phase", b"before").unwrap();
691            pager.write_page(page_id, page).unwrap();
692            pager.sync().unwrap();
693            before = pager.read_page(page_id).unwrap();
694
695            let mut page = before.clone();
696            page.insert_cell(b"phase2", b"after").unwrap();
697            pager.write_page(page_id, page).unwrap();
698            pager.flush().unwrap();
699            after = pager.read_page(page_id).unwrap();
700        }
701
702        // CoW crash model: the interrupted write leaves either the old full
703        // page or the new full page, never a torn mix. Exercise both outcomes.
704        for (whole_page, expected_cells) in [(&before, 1), (&after, 2)] {
705            write_page_bytes(&path, page_id, whole_page);
706
707            let pager = Pager::open(&path, config.clone()).unwrap();
708            let recovered = pager.read_page(page_id).unwrap();
709            assert_eq!(recovered.cell_count(), expected_cells);
710            let (key, value) = recovered.read_cell(0).unwrap();
711            assert_eq!(key, b"phase");
712            assert_eq!(value, b"before");
713            if expected_cells == 2 {
714                let (key, value) = recovered.read_cell(1).unwrap();
715                assert_eq!(key, b"phase2");
716                assert_eq!(value, b"after");
717            }
718            drop(pager);
719        }
720
721        cleanup(&path);
722    }
723
724    #[test]
725    fn same_mid_write_without_cow_recovers_from_dwb() {
726        let _override = cow_atomic_write_override(false);
727        let path = temp_db_path();
728        cleanup(&path);
729
730        let config = PagerConfig {
731            double_write: false,
732            ..Default::default()
733        };
734
735        let page_id;
736        let before;
737        let after;
738        {
739            let pager = Pager::open(&path, config.clone()).unwrap();
740            let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
741            page_id = page.page_id();
742            page.insert_cell(b"phase", b"before").unwrap();
743            pager.write_page(page_id, page).unwrap();
744            pager.sync().unwrap();
745            before = pager.read_page(page_id).unwrap();
746
747            let mut page = before.clone();
748            page.insert_cell(b"phase2", b"after").unwrap();
749            pager.write_page(page_id, page).unwrap();
750            pager.flush().unwrap();
751            after = pager.read_page(page_id).unwrap();
752        }
753
754        write_dwb_fixture(&path, &[(page_id, after.clone())]);
755        write_torn_page_bytes(&path, page_id, &before, &after);
756
757        {
758            let pager = Pager::open(&path, config).unwrap();
759            let recovered = pager.read_page(page_id).unwrap();
760            assert_eq!(recovered.cell_count(), 2);
761
762            let (key, value) = recovered.read_cell(0).unwrap();
763            assert_eq!(key, b"phase");
764            assert_eq!(value, b"before");
765
766            let (key, value) = recovered.read_cell(1).unwrap();
767            assert_eq!(key, b"phase2");
768            assert_eq!(value, b"after");
769        }
770
771        assert_eq!(fs::metadata(dwb_path_for(&path)).unwrap().len(), 0);
772        cleanup(&path);
773    }
774
775    // -----------------------------------------------------------------
776    // Target 3: WAL-first flush ordering
777    // -----------------------------------------------------------------
778
779    #[test]
780    fn pager_starts_without_wal_writer() {
781        let path = temp_db_path();
782        let pager = Pager::open(&path, PagerConfig::default()).unwrap();
783        assert!(!pager.has_wal_writer());
784        drop(pager);
785        cleanup(&path);
786    }
787
788    #[test]
789    fn set_wal_writer_attaches_handle() {
790        use crate::storage::wal::writer::WalWriter;
791        use std::sync::{Arc, Mutex};
792
793        let db_path = temp_db_path();
794        let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
795        let _ = fs::remove_file(&wal_path);
796
797        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
798        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
799        pager.set_wal_writer(Arc::clone(&wal));
800        assert!(pager.has_wal_writer());
801
802        pager.clear_wal_writer();
803        assert!(!pager.has_wal_writer());
804
805        drop(pager);
806        let _ = fs::remove_file(&wal_path);
807        cleanup(&db_path);
808    }
809
810    #[test]
811    fn flush_with_lsn_zero_pages_skips_wal_call() {
812        // When every dirty page has lsn == 0 (the legacy auto-commit
813        // path), flush() must NOT call wal.flush_until — there is no
814        // WAL record to wait for. We verify this by attaching a WAL
815        // whose durable_lsn starts at 8 and confirming flush() does
816        // not advance it (no append, no flush).
817        use crate::storage::wal::writer::WalWriter;
818        use std::sync::{Arc, Mutex};
819
820        let db_path = temp_db_path();
821        let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
822        let _ = fs::remove_file(&wal_path);
823
824        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
825        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
826        let initial_durable = {
827            let g = wal.lock().unwrap();
828            g.durable_lsn()
829        };
830        pager.set_wal_writer(Arc::clone(&wal));
831
832        // Allocate and write a page with lsn = 0.
833        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
834        page.insert_cell(b"k", b"v").unwrap();
835        // header.lsn stays at 0 — caller did not stamp.
836        pager.write_page(page.page_id(), page).unwrap();
837        pager.flush().unwrap();
838
839        // WAL durable_lsn must be unchanged because flush_until was
840        // never called (max lsn over dirty pages was 0).
841        let after_flush = {
842            let g = wal.lock().unwrap();
843            g.durable_lsn()
844        };
845        assert_eq!(after_flush, initial_durable);
846
847        drop(pager);
848        let _ = fs::remove_file(&wal_path);
849        cleanup(&db_path);
850    }
851
852    #[test]
853    fn flush_advances_wal_durable_when_pages_carry_lsn() {
854        // The full WAL-first dance: append a record, capture the
855        // returned LSN, stamp it on a page, flush — afterwards the
856        // WAL must be durable up to at least that LSN.
857        use crate::storage::wal::record::WalRecord;
858        use crate::storage::wal::writer::WalWriter;
859        use std::sync::{Arc, Mutex};
860
861        let db_path = temp_db_path();
862        let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
863        let _ = fs::remove_file(&wal_path);
864
865        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
866        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
867        pager.set_wal_writer(Arc::clone(&wal));
868
869        // Stamp two dirty pages with a real WAL LSN.
870        let stamped_lsn = {
871            let mut wal_guard = wal.lock().unwrap();
872            wal_guard.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
873            wal_guard.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
874            wal_guard.current_lsn()
875        };
876        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
877        page.insert_cell(b"k", b"v").unwrap();
878        // Use the public Page API to set the LSN.
879        page.set_lsn(stamped_lsn);
880        pager.write_page(page.page_id(), page).unwrap();
881        pager.flush().unwrap();
882
883        // After flush, the WAL is durable at least up to our stamp.
884        let after_flush = {
885            let g = wal.lock().unwrap();
886            g.durable_lsn()
887        };
888        assert!(
889            after_flush >= stamped_lsn,
890            "after flush durable_lsn {} must be >= stamped {}",
891            after_flush,
892            stamped_lsn
893        );
894
895        drop(pager);
896        let _ = fs::remove_file(&wal_path);
897        cleanup(&db_path);
898    }
899
900    // -----------------------------------------------------------------
901    // gh-892: filesystem block-size alignment diagnostic
902    // -----------------------------------------------------------------
903
904    #[test]
905    fn block_size_warn_fires_for_mismatched_block_size() {
906        // A block size that does not divide the 16 KiB page size means a
907        // page write straddles FS blocks — the predicate must report a
908        // misalignment so `open()` emits the warning.
909        assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6000));
910        // Block larger than the page (e.g. 1 MiB): 16384 % 1048576 != 0.
911        assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 1_048_576));
912        // 6 KiB also fails to divide 16 KiB.
913        assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6 * 1024));
914    }
915
916    #[test]
917    fn block_size_silent_for_divisor() {
918        // Block sizes that evenly divide the page size: no straddle, no warn.
919        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 4096));
920        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 16384));
921        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 512));
922        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 8192));
923    }
924
925    #[test]
926    fn block_size_unavailable_is_silent() {
927        // st_blksize == 0 means the probe is unavailable; never warn on it.
928        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 0));
929    }
930
931    #[test]
932    fn page_size_is_unchanged_16kib() {
933        // The diagnostic must never alter the compile-time page size.
934        assert_eq!(PAGE_SIZE, 16 * 1024);
935    }
936}