Skip to main content

reddb_server/storage/engine/
pager.rs

1//! Pager - Page I/O Manager
2//!
3//! The Pager is responsible for reading and writing pages to/from disk.
4//! It integrates with the PageCache for efficient caching and the FreeList
5//! for page allocation.
6//!
7//! # Responsibilities
8//!
9//! 1. **Page I/O**: Read/write 4KB pages from/to disk
10//! 2. **Caching**: Integrate with SIEVE PageCache
11//! 3. **Allocation**: Manage free page allocation via FreeList
12//! 4. **Header Management**: Maintain database header (page 0)
13//!
14//! # File Layout
15//!
16//! ```text
17//! ┌─────────────────────────────────────────────────────────────┐
18//! │ Page 0: Database Header                                     │
19//! │   - Magic bytes "RDDB"                                      │
20//! │   - Version                                                 │
21//! │   - Page count                                              │
22//! │   - Freelist head                                           │
23//! ├─────────────────────────────────────────────────────────────┤
24//! │ Page 1: Root B-tree page (or first data page)              │
25//! ├─────────────────────────────────────────────────────────────┤
26//! │ Page 2..N: Data pages                                       │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! # References
31//!
32//! - Turso `core/storage/pager.rs:54-134` - HeaderRef::from_pager()
33//! - Turso `core/storage/pager.rs:120` - pager.add_dirty(&page)
34
35use super::freelist::FreeList;
36use super::page::{Page, PageError, PageType, PAGE_SIZE};
37use super::page_cache::PageCache;
38use crate::storage::wal::writer::WalWriter;
39use fs2::FileExt;
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43#[cfg(test)]
44use std::sync::atomic::{AtomicU8, Ordering};
45use std::sync::{Arc, Mutex, RwLock};
46
47pub use reddb_file::{DatabaseHeader, PhysicalFileHeader};
48
49/// Default cache size (pages)
50const DEFAULT_CACHE_SIZE: usize = 10_000;
51
/// Test-only override slot for the CoW atomic-write probe.
///
/// The test helpers in this file store `1` when a test forces the probe
/// result to `true`, `2` when it forces `false`, and reset the slot to `0`
/// when the override guard is dropped.
// NOTE(review): the code that reads this value is not in this file
// (presumably `pager/impl.rs`) — confirm how `0` is interpreted there.
#[cfg(test)]
static COW_ATOMIC_WRITE_TEST_OVERRIDE: AtomicU8 = AtomicU8::new(0);
54
/// Pager error types.
///
/// `Io` and `Page` wrap a lower-level error (exposed through
/// `std::error::Error::source`); the remaining variants carry at most a
/// small payload describing the failure.
#[derive(Debug)]
pub enum PagerError {
    /// I/O error; wraps the originating `std::io::Error`.
    Io(std::io::Error),
    /// Page error; wraps the originating `PageError`.
    Page(PageError),
    /// Invalid database file; the string is the detail shown in the message.
    InvalidDatabase(String),
    /// Database is read-only
    ReadOnly,
    /// Page not found; the payload is the page id that was requested.
    PageNotFound(u32),
    /// Database is locked
    Locked,
    /// A Mutex or RwLock was poisoned (another thread panicked while holding it)
    LockPoisoned,
    /// Database is encrypted but no key was supplied.
    EncryptionRequired,
    /// Plain (unencrypted) database opened with an encryption key.
    PlainDatabaseRefusesKey,
    /// Encryption key validation failed for an encrypted database.
    InvalidKey,
}
79
/// A contiguous run of database pages reserved for vector-turbo payloads.
///
/// The value is a plain pair of integers, so it is `Copy`, comparable with
/// `==`, and hashable (usable as a `HashMap` / `HashSet` key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ExtentId {
    /// Page id of the first page in the run.
    pub start_page: u32,
    /// Number of pages in the run.
    pub n_pages: u32,
}
86
87impl std::fmt::Display for PagerError {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            Self::Io(e) => write!(f, "I/O error: {}", e),
91            Self::Page(e) => write!(f, "Page error: {}", e),
92            Self::InvalidDatabase(msg) => write!(f, "Invalid database: {}", msg),
93            Self::ReadOnly => write!(f, "Database is read-only"),
94            Self::PageNotFound(id) => write!(f, "Page {} not found", id),
95            Self::Locked => write!(f, "Database is locked"),
96            Self::LockPoisoned => write!(f, "Internal lock poisoned (concurrent thread panicked)"),
97            Self::EncryptionRequired => write!(
98                f,
99                "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
100            ),
101            Self::PlainDatabaseRefusesKey => write!(
102                f,
103                "Plain (unencrypted) database opened with an encryption key — refusing"
104            ),
105            Self::InvalidKey => write!(f, "Encryption key validation failed for this database"),
106        }
107    }
108}
109
110impl std::error::Error for PagerError {
111    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112        match self {
113            Self::Io(e) => Some(e),
114            Self::Page(e) => Some(e),
115            _ => None,
116        }
117    }
118}
119
120impl From<std::io::Error> for PagerError {
121    fn from(e: std::io::Error) -> Self {
122        Self::Io(e)
123    }
124}
125
126impl From<PageError> for PagerError {
127    fn from(e: PageError) -> Self {
128        Self::Page(e)
129    }
130}
131
/// Pager configuration.
///
/// `PagerConfig::default()` yields a read-write, create-if-missing
/// configuration with checksum verification and the double-write buffer
/// enabled and no encryption key.
#[derive(Debug, Clone)]
pub struct PagerConfig {
    /// Page cache capacity, in pages.
    pub cache_size: usize,
    /// Whether to open read-only
    pub read_only: bool,
    /// Whether to create the database if it does not exist
    pub create: bool,
    /// Whether to verify checksums on read
    pub verify_checksums: bool,
    /// Enable double-write buffer for torn page protection.
    ///
    /// The tests in this file show that `false` only skips the DWB sidecar
    /// when the CoW atomic-write probe allows it; when the probe denies,
    /// the DWB file is still created.
    pub double_write: bool,
    /// Optional encryption key. When set, `Pager::open` writes/reads
    /// pages through `PageEncryptor` and rejects any DB whose
    /// encryption-marker disagrees with the supplied key (or its
    /// absence). When `None`, the pager refuses to open a DB whose
    /// header carries the `RDBE` encryption marker.
    pub encryption: Option<crate::storage::encryption::SecureKey>,
}
152
153impl Default for PagerConfig {
154    fn default() -> Self {
155        Self {
156            cache_size: DEFAULT_CACHE_SIZE,
157            read_only: false,
158            create: true,
159            verify_checksums: true,
160            double_write: true,
161            encryption: None,
162        }
163    }
164}
165
/// Page I/O Manager
///
/// Handles reading/writing pages and manages the page cache.
///
/// Dropping a `Pager` makes a best-effort `flush()`; errors from that
/// final flush are discarded.
pub struct Pager {
    /// Database file path
    path: PathBuf,
    /// File handle, behind a mutex.
    file: Mutex<File>,
    /// Exclusive file lock (held for lifetime, released on drop)
    _lock_file: Option<File>,
    /// Double-write buffer file.
    // NOTE(review): presumably `None` when the DWB is skipped — confirm in
    // `pager/impl.rs`.
    dwb_file: Option<Mutex<File>>,
    /// Page cache
    cache: PageCache,
    /// Free page list
    freelist: RwLock<FreeList>,
    /// Database header
    header: RwLock<DatabaseHeader>,
    /// Configuration the pager was opened with
    config: PagerConfig,
    /// Dirty flag for header
    header_dirty: Mutex<bool>,
    /// Optional WAL writer for WAL-first flush ordering.
    ///
    /// When set, [`Pager::flush`] computes the maximum `header.lsn` of
    /// every dirty page and calls [`WalWriter::flush_until`] before
    /// passing the batch to the double-write buffer. This guarantees
    /// the postgres-style invariant: a page on disk implies its WAL
    /// record is already durable.
    ///
    /// Wired in via [`Pager::set_wal_writer`] post-construction so
    /// existing callers that build a Pager without a WAL keep working
    /// unchanged. See `PLAN.md` § Target 3.
    wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
    /// Optional page encryptor + header. When set, `read_page` /
    /// `write_page` route through AES-GCM transparently and page 0
    /// bypasses encryption (it carries the encryption marker +
    /// header itself). When `None`, all pages are stored plaintext
    /// and any DB header carrying the `RDBE` marker is rejected at
    /// open time.
    pub(crate) encryption: Option<(
        crate::storage::encryption::PageEncryptor,
        crate::storage::encryption::EncryptionHeader,
    )>,
}
211
212#[path = "pager/impl.rs"]
213mod pager_impl;
214impl Drop for Pager {
215    fn drop(&mut self) {
216        // Try to flush on drop
217        let _ = self.flush();
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224    #[cfg(target_os = "linux")]
225    use pager_impl::parse_mountinfo_options_for_path;
226    use pager_impl::{
227        classify_cow_filesystem, CowFilesystemKind, BTRFS_SUPER_MAGIC, FS_NOCOW_FL, ZFS_SUPER_MAGIC,
228    };
229    use std::fs;
230    use std::io::Write;
231
232    fn temp_db_path() -> PathBuf {
233        use std::sync::atomic::{AtomicU64, Ordering};
234        static COUNTER: AtomicU64 = AtomicU64::new(0);
235        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
236        let mut path = std::env::temp_dir();
237        path.push(format!("reddb_test_{}_{}.db", std::process::id(), id));
238        path
239    }
240
241    fn cleanup(path: &Path) {
242        let _ = fs::remove_file(path);
243        // Clean up companion files
244        let _ = fs::remove_file(reddb_file::layout::pager_header_shadow_path(path));
245        let _ = fs::remove_file(reddb_file::layout::pager_meta_shadow_path(path));
246        let _ = fs::remove_file(reddb_file::layout::pager_dwb_shadow_path(path));
247    }
248
249    fn dwb_path_for(path: &Path) -> PathBuf {
250        reddb_file::layout::pager_dwb_shadow_path(path)
251    }
252
253    static COW_ATOMIC_WRITE_OVERRIDE_GUARD: Mutex<()> = Mutex::new(());
254
255    struct CowAtomicWriteOverrideGuard {
256        _guard: std::sync::MutexGuard<'static, ()>,
257    }
258
259    impl Drop for CowAtomicWriteOverrideGuard {
260        fn drop(&mut self) {
261            COW_ATOMIC_WRITE_TEST_OVERRIDE.store(0, Ordering::Relaxed);
262        }
263    }
264
265    fn cow_atomic_write_override(value: bool) -> CowAtomicWriteOverrideGuard {
266        let guard = COW_ATOMIC_WRITE_OVERRIDE_GUARD
267            .lock()
268            .unwrap_or_else(|err| err.into_inner());
269        COW_ATOMIC_WRITE_TEST_OVERRIDE.store(if value { 1 } else { 2 }, Ordering::Relaxed);
270        CowAtomicWriteOverrideGuard { _guard: guard }
271    }
272
273    fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
274        let pages: Vec<_> = pages
275            .iter()
276            .map(|(page_id, page)| {
277                let mut page = page.clone();
278                page.update_checksum();
279                (*page_id, page)
280            })
281            .collect();
282        let buf = reddb_file::encode_paged_dwb_frame(
283            pages
284                .iter()
285                .map(|(page_id, page)| (*page_id, page.as_bytes())),
286        );
287
288        let dwb_path = dwb_path_for(path);
289        let mut file = fs::File::create(&dwb_path).expect("create() should succeed");
290        file.write_all(&buf).expect("write_all() should succeed");
291        file.sync_all().expect("sync_all() should succeed");
292    }
293
294    fn write_page_bytes(path: &Path, page_id: u32, page: &Page) {
295        let mut file = OpenOptions::new()
296            .write(true)
297            .open(path)
298            .expect("open() should succeed");
299        file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
300            .expect("value is present");
301        file.write_all(page.as_bytes())
302            .expect("write_all() should succeed");
303        file.sync_all().expect("sync_all() should succeed");
304    }
305
306    fn write_torn_page_bytes(path: &Path, page_id: u32, before: &Page, after: &Page) {
307        let mut torn = *before.as_bytes();
308        torn[..PAGE_SIZE / 2].copy_from_slice(&after.as_bytes()[..PAGE_SIZE / 2]);
309
310        let mut file = OpenOptions::new()
311            .write(true)
312            .open(path)
313            .expect("open() should succeed");
314        file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
315            .expect("value is present");
316        file.write_all(&torn).expect("write_all() should succeed");
317        file.sync_all().expect("sync_all() should succeed");
318    }
319
320    #[test]
321    fn test_pager_create_new() {
322        let path = temp_db_path();
323        cleanup(&path);
324
325        {
326            let pager = Pager::open_default(&path).expect("open_default() should succeed");
327            assert_eq!(pager.page_count().expect("page_count() should succeed"), 3);
328            // Header + reserved pages
329        }
330
331        cleanup(&path);
332    }
333
334    #[test]
335    fn test_pager_reopen() {
336        let path = temp_db_path();
337        cleanup(&path);
338
339        // Create and write
340        {
341            let pager = Pager::open_default(&path).expect("open_default() should succeed");
342
343            // Allocate a page
344            let page = pager
345                .allocate_page(PageType::BTreeLeaf)
346                .expect("allocate_page() should succeed");
347            assert_eq!(page.page_id(), 3);
348
349            pager.sync().expect("sync() should succeed");
350        }
351
352        // Reopen and verify
353        {
354            let pager = Pager::open_default(&path).expect("open_default() should succeed");
355            assert_eq!(pager.page_count().expect("page_count() should succeed"), 4);
356            // Header + reserved pages + 1 data page
357        }
358
359        cleanup(&path);
360    }
361
362    #[test]
363    fn test_pager_read_write() {
364        let path = temp_db_path();
365        cleanup(&path);
366
367        {
368            let pager = Pager::open_default(&path).expect("open_default() should succeed");
369
370            // Allocate and write
371            let mut page = pager
372                .allocate_page(PageType::BTreeLeaf)
373                .expect("allocate_page() should succeed");
374            let page_id = page.page_id();
375
376            page.insert_cell(b"key", b"value")
377                .expect("insert_cell() should succeed");
378            pager
379                .write_page(page_id, page)
380                .expect("write_page() should succeed");
381
382            // Read back
383            let read_page = pager
384                .read_page(page_id)
385                .expect("read_page() should succeed");
386            let (key, value) = read_page.read_cell(0).expect("read_cell() should succeed");
387            assert_eq!(key, b"key");
388            assert_eq!(value, b"value");
389        }
390
391        cleanup(&path);
392    }
393
394    #[test]
395    fn test_pager_cache() {
396        let path = temp_db_path();
397        cleanup(&path);
398
399        {
400            let pager = Pager::open_default(&path).expect("open_default() should succeed");
401
402            // Allocate a page
403            let page = pager
404                .allocate_page(PageType::BTreeLeaf)
405                .expect("allocate_page() should succeed");
406            let page_id = page.page_id();
407
408            // First read - should be cached from allocate
409            let _ = pager
410                .read_page(page_id)
411                .expect("read_page() should succeed");
412
413            // Second read - should hit cache
414            let _ = pager
415                .read_page(page_id)
416                .expect("read_page() should succeed");
417
418            let stats = pager.cache_stats();
419            assert!(stats.hits >= 1);
420        }
421
422        cleanup(&path);
423    }
424
425    #[test]
426    fn test_pager_free_page() {
427        let path = temp_db_path();
428        cleanup(&path);
429
430        {
431            let pager = Pager::open_default(&path).expect("open_default() should succeed");
432
433            // Allocate pages
434            let page1 = pager
435                .allocate_page(PageType::BTreeLeaf)
436                .expect("allocate_page() should succeed");
437            let page2 = pager
438                .allocate_page(PageType::BTreeLeaf)
439                .expect("allocate_page() should succeed");
440
441            let id1 = page1.page_id();
442            let id2 = page2.page_id();
443
444            // Free page 1
445            pager.free_page(id1).expect("free_page() should succeed");
446
447            // Next allocation should reuse page 1
448            let page3 = pager
449                .allocate_page(PageType::BTreeLeaf)
450                .expect("allocate_page() should succeed");
451            assert_eq!(page3.page_id(), id1);
452        }
453
454        cleanup(&path);
455    }
456
457    #[test]
458    fn test_freelist_persistence() {
459        let path = temp_db_path();
460        cleanup(&path);
461
462        let freed_id;
463        {
464            let pager = Pager::open_default(&path).expect("open_default() should succeed");
465            let page1 = pager
466                .allocate_page(PageType::BTreeLeaf)
467                .expect("allocate_page() should succeed");
468            let _page2 = pager
469                .allocate_page(PageType::BTreeLeaf)
470                .expect("allocate_page() should succeed");
471            freed_id = page1.page_id();
472
473            pager
474                .free_page(freed_id)
475                .expect("free_page() should succeed");
476            pager.sync().expect("sync() should succeed");
477        }
478
479        {
480            let pager = Pager::open_default(&path).expect("open_default() should succeed");
481            let page = pager
482                .allocate_page(PageType::BTreeLeaf)
483                .expect("allocate_page() should succeed");
484            assert_eq!(page.page_id(), freed_id);
485        }
486
487        cleanup(&path);
488    }
489
490    #[test]
491    fn test_pager_read_only() {
492        let path = temp_db_path();
493        cleanup(&path);
494
495        // Create database
496        {
497            let pager = Pager::open_default(&path).expect("open_default() should succeed");
498            pager.sync().expect("sync() should succeed");
499        }
500
501        // Open read-only
502        {
503            let config = PagerConfig {
504                read_only: true,
505                ..Default::default()
506            };
507
508            let pager = Pager::open(&path, config).expect("open() should succeed");
509            assert!(pager.is_read_only());
510
511            // Should fail to allocate
512            assert!(pager.allocate_page(PageType::BTreeLeaf).is_err());
513        }
514
515        cleanup(&path);
516    }
517
518    #[test]
519    fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
520        let path = temp_db_path();
521        cleanup(&path);
522
523        let config = PagerConfig {
524            double_write: true,
525            ..Default::default()
526        };
527
528        let page_id;
529        {
530            let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
531            let page = pager
532                .allocate_page(PageType::BTreeLeaf)
533                .expect("allocate_page() should succeed");
534            page_id = page.page_id();
535            pager.sync().expect("sync() should succeed");
536        }
537
538        let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
539        recovered_page
540            .insert_cell(b"key", b"value")
541            .expect("insert_cell() should succeed");
542        write_dwb_fixture(&path, &[(page_id, recovered_page.clone())]);
543
544        let dwb_path = dwb_path_for(&path);
545        assert!(dwb_path.exists());
546        assert!(
547            fs::metadata(&dwb_path)
548                .expect("metadata() should succeed")
549                .len()
550                > 0
551        );
552
553        {
554            let pager = Pager::open(&path, config).expect("open() should succeed");
555
556            let read_page = pager
557                .read_page(page_id)
558                .expect("read_page() should succeed");
559            let (key, value) = read_page.read_cell(0).expect("read_cell() should succeed");
560            assert_eq!(key, b"key");
561            assert_eq!(value, b"value");
562
563            assert!(dwb_path.exists());
564            assert_eq!(
565                fs::metadata(&dwb_path)
566                    .expect("metadata() should succeed")
567                    .len(),
568                0
569            );
570
571            let mut updated_page = recovered_page.clone();
572            updated_page
573                .insert_cell(b"key2", b"value2")
574                .expect("insert_cell() should succeed");
575            pager
576                .write_page(page_id, updated_page)
577                .expect("write_page() should succeed");
578            pager.flush().expect("flush() should succeed");
579
580            assert!(dwb_path.exists());
581            assert_eq!(
582                fs::metadata(&dwb_path)
583                    .expect("metadata() should succeed")
584                    .len(),
585                0
586            );
587        }
588
589        cleanup(&path);
590    }
591
592    #[test]
593    fn cow_probe_classification_fails_closed_for_btrfs_nodatacow() {
594        assert_eq!(
595            classify_cow_filesystem(ZFS_SUPER_MAGIC, None, None),
596            Some(CowFilesystemKind::Zfs),
597            "ZFS is always CoW"
598        );
599        assert_eq!(
600            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw,relatime"), Some(0)),
601            Some(CowFilesystemKind::BtrfsDataCow),
602            "btrfs qualifies only when datacow remains enabled"
603        );
604        assert_eq!(
605            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw,nodatacow"), Some(0)),
606            None,
607            "btrfs nodatacow mount option must reject DWB skip"
608        );
609        assert_eq!(
610            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw"), Some(FS_NOCOW_FL)),
611            None,
612            "btrfs chattr +C / NOCOW inode flag must reject DWB skip"
613        );
614        assert_eq!(
615            classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw"), None),
616            None,
617            "missing btrfs inode flags are uncertain and must fail closed"
618        );
619        assert_eq!(
620            classify_cow_filesystem(BTRFS_SUPER_MAGIC, None, Some(0)),
621            None,
622            "missing btrfs mount options are uncertain and must fail closed"
623        );
624    }
625
626    #[cfg(target_os = "linux")]
627    #[test]
628    fn mountinfo_parser_uses_longest_cow_mount_and_rejects_nodatacow() {
629        let mountinfo = "\
63024 18 0:21 / / rw,relatime - ext4 /dev/root rw\n\
63135 24 0:42 /subvol /mnt/reddb rw,relatime - btrfs /dev/sdb rw,space_cache=v2\n\
63236 35 0:43 /nocow /mnt/reddb/nocow rw,relatime - btrfs /dev/sdb rw,nodatacow\n\
633";
634
635        assert_eq!(
636            parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/data.rdb"))
637                .as_deref(),
638            Some("rw,relatime,rw,space_cache=v2")
639        );
640        assert_eq!(
641            parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/nocow/data.rdb"))
642                .as_deref(),
643            Some("rw,relatime,rw,nodatacow")
644        );
645    }
646
647    #[test]
648    fn double_write_false_keeps_dwb_when_cow_probe_denies() {
649        let _override = cow_atomic_write_override(false);
650        let path = temp_db_path();
651        cleanup(&path);
652
653        {
654            let config = PagerConfig {
655                double_write: false,
656                ..Default::default()
657            };
658            let pager = Pager::open(&path, config).expect("open() should succeed");
659            let page = pager
660                .allocate_page(PageType::BTreeLeaf)
661                .expect("allocate_page() should succeed");
662            pager
663                .write_page(page.page_id(), page)
664                .expect("write_page() should succeed");
665            pager.flush().expect("flush() should succeed");
666        }
667
668        assert!(
669            dwb_path_for(&path).exists(),
670            "DWB must stay enabled when double_write=false is not proven safe"
671        );
672
673        cleanup(&path);
674    }
675
676    #[test]
677    fn double_write_false_skips_dwb_when_cow_probe_allows() {
678        let _override = cow_atomic_write_override(true);
679        let path = temp_db_path();
680        cleanup(&path);
681
682        {
683            let config = PagerConfig {
684                double_write: false,
685                ..Default::default()
686            };
687            let pager = Pager::open(&path, config).expect("open() should succeed");
688            let page = pager
689                .allocate_page(PageType::BTreeLeaf)
690                .expect("allocate_page() should succeed");
691            pager
692                .write_page(page.page_id(), page)
693                .expect("write_page() should succeed");
694            pager.flush().expect("flush() should succeed");
695        }
696
697        assert!(
698            !dwb_path_for(&path).exists(),
699            "DWB may be skipped only after the CoW probe allows it"
700        );
701
702        cleanup(&path);
703    }
704
705    #[test]
706    fn double_write_false_on_cow_replays_then_removes_existing_dwb() {
707        let _override = cow_atomic_write_override(true);
708        let path = temp_db_path();
709        cleanup(&path);
710
711        let page_id;
712        {
713            let pager = Pager::open(&path, PagerConfig::default()).expect("open() should succeed");
714            let page = pager
715                .allocate_page(PageType::BTreeLeaf)
716                .expect("allocate_page() should succeed");
717            page_id = page.page_id();
718            pager.sync().expect("sync() should succeed");
719        }
720
721        let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
722        recovered_page
723            .insert_cell(b"key", b"value")
724            .expect("insert_cell() should succeed");
725        write_dwb_fixture(&path, &[(page_id, recovered_page)]);
726
727        {
728            let config = PagerConfig {
729                double_write: false,
730                ..Default::default()
731            };
732            let pager = Pager::open(&path, config).expect("open() should succeed");
733            let read_page = pager
734                .read_page(page_id)
735                .expect("read_page() should succeed");
736            let (key, value) = read_page.read_cell(0).expect("read_cell() should succeed");
737            assert_eq!(key, b"key");
738            assert_eq!(value, b"value");
739        }
740
741        assert!(
742            !dwb_path_for(&path).exists(),
743            "CoW DWB-skip must replay any existing DWB before removing the sidecar"
744        );
745
746        cleanup(&path);
747    }
748
749    #[test]
750    fn simulated_cow_mid_write_leaves_a_whole_consistent_page_without_dwb() {
751        let _override = cow_atomic_write_override(true);
752        let path = temp_db_path();
753        cleanup(&path);
754
755        let config = PagerConfig {
756            double_write: false,
757            ..Default::default()
758        };
759
760        let page_id;
761        let before;
762        let after;
763        {
764            let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
765            let mut page = pager
766                .allocate_page(PageType::BTreeLeaf)
767                .expect("allocate_page() should succeed");
768            page_id = page.page_id();
769            page.insert_cell(b"phase", b"before")
770                .expect("insert_cell() should succeed");
771            pager
772                .write_page(page_id, page)
773                .expect("write_page() should succeed");
774            pager.sync().expect("sync() should succeed");
775            before = pager
776                .read_page(page_id)
777                .expect("read_page() should succeed");
778
779            let mut page = before.clone();
780            page.insert_cell(b"phase2", b"after")
781                .expect("insert_cell() should succeed");
782            pager
783                .write_page(page_id, page)
784                .expect("write_page() should succeed");
785            pager.flush().expect("flush() should succeed");
786            after = pager
787                .read_page(page_id)
788                .expect("read_page() should succeed");
789        }
790
791        // CoW crash model: the interrupted write leaves either the old full
792        // page or the new full page, never a torn mix. Exercise both outcomes.
793        for (whole_page, expected_cells) in [(&before, 1), (&after, 2)] {
794            write_page_bytes(&path, page_id, whole_page);
795
796            let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
797            let recovered = pager
798                .read_page(page_id)
799                .expect("read_page() should succeed");
800            assert_eq!(recovered.cell_count(), expected_cells);
801            let (key, value) = recovered.read_cell(0).expect("read_cell() should succeed");
802            assert_eq!(key, b"phase");
803            assert_eq!(value, b"before");
804            if expected_cells == 2 {
805                let (key, value) = recovered.read_cell(1).expect("read_cell() should succeed");
806                assert_eq!(key, b"phase2");
807                assert_eq!(value, b"after");
808            }
809            drop(pager);
810        }
811
812        cleanup(&path);
813    }
814
815    #[test]
816    fn same_mid_write_without_cow_recovers_from_dwb() {
817        let _override = cow_atomic_write_override(false);
818        let path = temp_db_path();
819        cleanup(&path);
820
821        let config = PagerConfig {
822            double_write: false,
823            ..Default::default()
824        };
825
826        let page_id;
827        let before;
828        let after;
829        {
830            let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
831            let mut page = pager
832                .allocate_page(PageType::BTreeLeaf)
833                .expect("allocate_page() should succeed");
834            page_id = page.page_id();
835            page.insert_cell(b"phase", b"before")
836                .expect("insert_cell() should succeed");
837            pager
838                .write_page(page_id, page)
839                .expect("write_page() should succeed");
840            pager.sync().expect("sync() should succeed");
841            before = pager
842                .read_page(page_id)
843                .expect("read_page() should succeed");
844
845            let mut page = before.clone();
846            page.insert_cell(b"phase2", b"after")
847                .expect("insert_cell() should succeed");
848            pager
849                .write_page(page_id, page)
850                .expect("write_page() should succeed");
851            pager.flush().expect("flush() should succeed");
852            after = pager
853                .read_page(page_id)
854                .expect("read_page() should succeed");
855        }
856
857        write_dwb_fixture(&path, &[(page_id, after.clone())]);
858        write_torn_page_bytes(&path, page_id, &before, &after);
859
860        {
861            let pager = Pager::open(&path, config).expect("open() should succeed");
862            let recovered = pager
863                .read_page(page_id)
864                .expect("read_page() should succeed");
865            assert_eq!(recovered.cell_count(), 2);
866
867            let (key, value) = recovered.read_cell(0).expect("read_cell() should succeed");
868            assert_eq!(key, b"phase");
869            assert_eq!(value, b"before");
870
871            let (key, value) = recovered.read_cell(1).expect("read_cell() should succeed");
872            assert_eq!(key, b"phase2");
873            assert_eq!(value, b"after");
874        }
875
876        assert_eq!(
877            fs::metadata(dwb_path_for(&path))
878                .expect("metadata() should succeed")
879                .len(),
880            0
881        );
882        cleanup(&path);
883    }
884
885    // -----------------------------------------------------------------
886    // Target 3: WAL-first flush ordering
887    // -----------------------------------------------------------------
888
889    #[test]
890    fn pager_starts_without_wal_writer() {
891        let path = temp_db_path();
892        let pager = Pager::open(&path, PagerConfig::default()).expect("open() should succeed");
893        assert!(!pager.has_wal_writer());
894        drop(pager);
895        cleanup(&path);
896    }
897
898    #[test]
899    fn set_wal_writer_attaches_handle() {
900        use crate::storage::wal::writer::WalWriter;
901        use std::sync::{Arc, Mutex};
902
903        let db_path = temp_db_path();
904        let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
905        let _ = fs::remove_file(&wal_path);
906
907        let pager = Pager::open(&db_path, PagerConfig::default()).expect("open() should succeed");
908        let wal = Arc::new(Mutex::new(
909            WalWriter::open(&wal_path).expect("open() should succeed"),
910        ));
911        pager.set_wal_writer(Arc::clone(&wal));
912        assert!(pager.has_wal_writer());
913
914        pager.clear_wal_writer();
915        assert!(!pager.has_wal_writer());
916
917        drop(pager);
918        let _ = fs::remove_file(&wal_path);
919        cleanup(&db_path);
920    }
921
922    #[test]
923    fn flush_with_lsn_zero_pages_skips_wal_call() {
924        // When every dirty page has lsn == 0 (the legacy auto-commit
925        // path), flush() must NOT call wal.flush_until — there is no
926        // WAL record to wait for. We verify this by attaching a WAL
927        // whose durable_lsn starts at 8 and confirming flush() does
928        // not advance it (no append, no flush).
929        use crate::storage::wal::writer::WalWriter;
930        use std::sync::{Arc, Mutex};
931
932        let db_path = temp_db_path();
933        let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
934        let _ = fs::remove_file(&wal_path);
935
936        let pager = Pager::open(&db_path, PagerConfig::default()).expect("open() should succeed");
937        let wal = Arc::new(Mutex::new(
938            WalWriter::open(&wal_path).expect("open() should succeed"),
939        ));
940        let initial_durable = {
941            let g = wal.lock().expect("lock() should succeed");
942            g.durable_lsn()
943        };
944        pager.set_wal_writer(Arc::clone(&wal));
945
946        // Allocate and write a page with lsn = 0.
947        let mut page = pager
948            .allocate_page(PageType::BTreeLeaf)
949            .expect("allocate_page() should succeed");
950        page.insert_cell(b"k", b"v")
951            .expect("insert_cell() should succeed");
952        // header.lsn stays at 0 — caller did not stamp.
953        pager
954            .write_page(page.page_id(), page)
955            .expect("write_page() should succeed");
956        pager.flush().expect("flush() should succeed");
957
958        // WAL durable_lsn must be unchanged because flush_until was
959        // never called (max lsn over dirty pages was 0).
960        let after_flush = {
961            let g = wal.lock().expect("lock() should succeed");
962            g.durable_lsn()
963        };
964        assert_eq!(after_flush, initial_durable);
965
966        drop(pager);
967        let _ = fs::remove_file(&wal_path);
968        cleanup(&db_path);
969    }
970
971    #[test]
972    fn flush_advances_wal_durable_when_pages_carry_lsn() {
973        // The full WAL-first dance: append a record, capture the
974        // returned LSN, stamp it on a page, flush — afterwards the
975        // WAL must be durable up to at least that LSN.
976        use crate::storage::wal::record::WalRecord;
977        use crate::storage::wal::writer::WalWriter;
978        use std::sync::{Arc, Mutex};
979
980        let db_path = temp_db_path();
981        let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
982        let _ = fs::remove_file(&wal_path);
983
984        let pager = Pager::open(&db_path, PagerConfig::default()).expect("open() should succeed");
985        let wal = Arc::new(Mutex::new(
986            WalWriter::open(&wal_path).expect("open() should succeed"),
987        ));
988        pager.set_wal_writer(Arc::clone(&wal));
989
990        // Stamp two dirty pages with a real WAL LSN.
991        let stamped_lsn = {
992            let mut wal_guard = wal.lock().expect("lock() should succeed");
993            wal_guard
994                .append(&WalRecord::Begin { tx_id: 1 })
995                .expect("append() should succeed");
996            wal_guard
997                .append(&WalRecord::Commit { tx_id: 1 })
998                .expect("append() should succeed");
999            wal_guard.current_lsn()
1000        };
1001        let mut page = pager
1002            .allocate_page(PageType::BTreeLeaf)
1003            .expect("allocate_page() should succeed");
1004        page.insert_cell(b"k", b"v")
1005            .expect("insert_cell() should succeed");
1006        // Use the public Page API to set the LSN.
1007        page.set_lsn(stamped_lsn);
1008        pager
1009            .write_page(page.page_id(), page)
1010            .expect("write_page() should succeed");
1011        pager.flush().expect("flush() should succeed");
1012
1013        // After flush, the WAL is durable at least up to our stamp.
1014        let after_flush = {
1015            let g = wal.lock().expect("lock() should succeed");
1016            g.durable_lsn()
1017        };
1018        assert!(
1019            after_flush >= stamped_lsn,
1020            "after flush durable_lsn {} must be >= stamped {}",
1021            after_flush,
1022            stamped_lsn
1023        );
1024
1025        drop(pager);
1026        let _ = fs::remove_file(&wal_path);
1027        cleanup(&db_path);
1028    }
1029
1030    // -----------------------------------------------------------------
1031    // gh-892: filesystem block-size alignment diagnostic
1032    // -----------------------------------------------------------------
1033
1034    #[test]
1035    fn block_size_warn_fires_for_mismatched_block_size() {
1036        // A block size that does not divide the 16 KiB page size means a
1037        // page write straddles FS blocks — the predicate must report a
1038        // misalignment so `open()` emits the warning.
1039        assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6000));
1040        // Block larger than the page (e.g. 1 MiB): 16384 % 1048576 != 0.
1041        assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 1_048_576));
1042        // 6 KiB also fails to divide 16 KiB.
1043        assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6 * 1024));
1044    }
1045
1046    #[test]
1047    fn block_size_silent_for_divisor() {
1048        // Block sizes that evenly divide the page size: no straddle, no warn.
1049        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 4096));
1050        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 16384));
1051        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 512));
1052        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 8192));
1053    }
1054
1055    #[test]
1056    fn block_size_unavailable_is_silent() {
1057        // st_blksize == 0 means the probe is unavailable; never warn on it.
1058        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 0));
1059    }
1060
1061    #[test]
1062    fn page_size_is_unchanged_16kib() {
1063        // The diagnostic must never alter the compile-time page size.
1064        assert_eq!(PAGE_SIZE, 16 * 1024);
1065    }
1066}