// reddb_server/storage/engine/pager.rs
1//! Pager - Page I/O Manager
2//!
3//! The Pager is responsible for reading and writing pages to/from disk.
4//! It integrates with the PageCache for efficient caching and the FreeList
5//! for page allocation.
6//!
7//! # Responsibilities
8//!
9//! 1. **Page I/O**: Read/write 4KB pages from/to disk
10//! 2. **Caching**: Integrate with SIEVE PageCache
11//! 3. **Allocation**: Manage free page allocation via FreeList
12//! 4. **Header Management**: Maintain database header (page 0)
13//!
14//! # File Layout
15//!
16//! ```text
17//! ┌─────────────────────────────────────────────────────────────┐
18//! │ Page 0: Database Header                                     │
19//! │   - Magic bytes "RDDB"                                      │
20//! │   - Version                                                 │
21//! │   - Page count                                              │
22//! │   - Freelist head                                           │
23//! ├─────────────────────────────────────────────────────────────┤
24//! │ Page 1: Root B-tree page (or first data page)              │
25//! ├─────────────────────────────────────────────────────────────┤
26//! │ Page 2..N: Data pages                                       │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! # References
31//!
32//! - Turso `core/storage/pager.rs:54-134` - HeaderRef::from_pager()
33//! - Turso `core/storage/pager.rs:120` - pager.add_dirty(&page)
34
35use super::freelist::FreeList;
36use super::page::{Page, PageError, PageType, DB_VERSION, HEADER_SIZE, MAGIC_BYTES, PAGE_SIZE};
37use super::page_cache::PageCache;
38use crate::storage::wal::writer::WalWriter;
39use fs2::FileExt;
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::{Arc, Mutex, RwLock};
44
/// Default cache size (pages)
///
/// At the 4 KB page size this caps the cache at roughly 40 MB.
const DEFAULT_CACHE_SIZE: usize = 10_000;
47
/// Pager error types
///
/// Unifies low-level failures (`Io`, `Page`) with pager-specific
/// conditions such as locking, lock poisoning, and encryption-key
/// mismatches detected at open time.
#[derive(Debug)]
pub enum PagerError {
    /// I/O error
    Io(std::io::Error),
    /// Page error
    Page(PageError),
    /// Invalid database file
    InvalidDatabase(String),
    /// Database is read-only
    ReadOnly,
    /// Page not found
    PageNotFound(u32),
    /// Database is locked
    Locked,
    /// A Mutex or RwLock was poisoned (another thread panicked while holding it)
    LockPoisoned,
    /// Database is encrypted but no key was supplied.
    EncryptionRequired,
    /// Plain (unencrypted) database opened with an encryption key.
    PlainDatabaseRefusesKey,
    /// Encryption key validation failed for an encrypted database.
    InvalidKey,
}
72
73impl std::fmt::Display for PagerError {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        match self {
76            Self::Io(e) => write!(f, "I/O error: {}", e),
77            Self::Page(e) => write!(f, "Page error: {}", e),
78            Self::InvalidDatabase(msg) => write!(f, "Invalid database: {}", msg),
79            Self::ReadOnly => write!(f, "Database is read-only"),
80            Self::PageNotFound(id) => write!(f, "Page {} not found", id),
81            Self::Locked => write!(f, "Database is locked"),
82            Self::LockPoisoned => write!(f, "Internal lock poisoned (concurrent thread panicked)"),
83            Self::EncryptionRequired => write!(
84                f,
85                "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
86            ),
87            Self::PlainDatabaseRefusesKey => write!(
88                f,
89                "Plain (unencrypted) database opened with an encryption key — refusing"
90            ),
91            Self::InvalidKey => write!(f, "Encryption key validation failed for this database"),
92        }
93    }
94}
95
96impl std::error::Error for PagerError {
97    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
98        match self {
99            Self::Io(e) => Some(e),
100            Self::Page(e) => Some(e),
101            _ => None,
102        }
103    }
104}
105
106impl From<std::io::Error> for PagerError {
107    fn from(e: std::io::Error) -> Self {
108        Self::Io(e)
109    }
110}
111
112impl From<PageError> for PagerError {
113    fn from(e: PageError) -> Self {
114        Self::Page(e)
115    }
116}
117
/// Pager configuration
///
/// Typically built with struct-update syntax off [`Default`], e.g.
/// `PagerConfig { read_only: true, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct PagerConfig {
    /// Page cache capacity (pages; defaults to `DEFAULT_CACHE_SIZE`)
    pub cache_size: usize,
    /// Whether to open read-only
    pub read_only: bool,
    /// Whether to create if not exists
    pub create: bool,
    /// Whether to verify checksums on read
    pub verify_checksums: bool,
    /// Enable double-write buffer for torn page protection
    pub double_write: bool,
    /// Optional encryption key. When set, `Pager::open` writes/reads
    /// pages through `PageEncryptor` and rejects any DB whose
    /// encryption-marker disagrees with the supplied key (or its
    /// absence). When `None`, the pager refuses to open a DB whose
    /// header carries the `RDBE` encryption marker.
    pub encryption: Option<crate::storage::encryption::SecureKey>,
}
138
139impl Default for PagerConfig {
140    fn default() -> Self {
141        Self {
142            cache_size: DEFAULT_CACHE_SIZE,
143            read_only: false,
144            create: true,
145            verify_checksums: true,
146            double_write: true,
147            encryption: None,
148        }
149    }
150}
151
/// Database file header information
///
/// In-memory form of the header the pager maintains in page 0
/// (see the module-level file layout diagram).
#[derive(Debug, Clone)]
pub struct DatabaseHeader {
    /// Database version
    pub version: u32,
    /// Page size (always 4096)
    pub page_size: u32,
    /// Total number of pages
    pub page_count: u32,
    /// First freelist trunk page ID (0 = no free pages)
    pub freelist_head: u32,
    /// Schema version (for migrations)
    pub schema_version: u32,
    /// Last checkpoint LSN
    pub checkpoint_lsn: u64,
    /// Whether a checkpoint is currently in progress (two-phase)
    pub checkpoint_in_progress: bool,
    /// Target LSN for the in-progress checkpoint
    pub checkpoint_target_lsn: u64,
    /// Physical layout header mirrored into page 0
    pub physical: PhysicalFileHeader,
}
174
/// Minimal physical state published into page 0 for paged databases.
///
/// NOTE(review): the grouping comments below are inferred from field
/// names; the authoritative encoding lives in the pager/manifest
/// implementation — confirm there before relying on them.
#[derive(Debug, Clone, Copy, Default)]
pub struct PhysicalFileHeader {
    // --- format / ordering ---
    pub format_version: u32,
    pub sequence: u64,
    // --- manifest + free-set roots (presumably 0 = absent) ---
    pub manifest_oldest_root: u64,
    pub manifest_root: u64,
    pub free_set_root: u64,
    pub manifest_page: u32,
    pub manifest_checksum: u64,
    // --- collection roots (page id + checksum + count) ---
    pub collection_roots_page: u32,
    pub collection_roots_checksum: u64,
    pub collection_root_count: u32,
    // --- assorted entity counters ---
    pub snapshot_count: u32,
    pub index_count: u32,
    pub catalog_collection_count: u32,
    pub catalog_total_entities: u64,
    pub export_count: u32,
    pub graph_projection_count: u32,
    pub analytics_job_count: u32,
    pub manifest_event_count: u32,
    // --- auxiliary state pages, each paired with a checksum ---
    pub registry_page: u32,
    pub registry_checksum: u64,
    pub recovery_page: u32,
    pub recovery_checksum: u64,
    pub catalog_page: u32,
    pub catalog_checksum: u64,
    pub metadata_state_page: u32,
    pub metadata_state_checksum: u64,
    pub vector_artifact_page: u32,
    pub vector_artifact_checksum: u64,
}
207
208impl Default for DatabaseHeader {
209    fn default() -> Self {
210        Self {
211            version: DB_VERSION,
212            page_size: PAGE_SIZE as u32,
213            page_count: 1, // Header page
214            freelist_head: 0,
215            schema_version: 0,
216            checkpoint_lsn: 0,
217            checkpoint_in_progress: false,
218            checkpoint_target_lsn: 0,
219            physical: PhysicalFileHeader::default(),
220        }
221    }
222}
223
/// Page I/O Manager
///
/// Handles reading/writing pages and manages the page cache.
///
/// Interior state is guarded by `Mutex`/`RwLock`, suggesting the type
/// is intended to be shared across threads behind an `Arc` — the
/// method implementations live in `pager/impl.rs`.
pub struct Pager {
    /// Database file path
    path: PathBuf,
    /// File handle
    file: Mutex<File>,
    /// Exclusive file lock (held for lifetime, released on drop)
    _lock_file: Option<File>,
    /// Double-write buffer file (the `<db>-dwb` companion file)
    dwb_file: Option<Mutex<File>>,
    /// Page cache
    cache: PageCache,
    /// Free page list
    freelist: RwLock<FreeList>,
    /// Database header (in-memory mirror of page 0)
    header: RwLock<DatabaseHeader>,
    /// Configuration
    config: PagerConfig,
    /// Dirty flag for header (set when the header diverges from disk)
    header_dirty: Mutex<bool>,
    /// Optional WAL writer for WAL-first flush ordering.
    ///
    /// When set, [`Pager::flush`] computes the maximum `header.lsn` of
    /// every dirty page and calls [`WalWriter::flush_until`] before
    /// passing the batch to the double-write buffer. This guarantees
    /// the postgres-style invariant: a page on disk implies its WAL
    /// record is already durable.
    ///
    /// Wired in via [`Pager::set_wal_writer`] post-construction so
    /// existing callers that build a Pager without a WAL keep working
    /// unchanged. See `PLAN.md` § Target 3.
    wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
    /// Optional page encryptor + header. When set, `read_page` /
    /// `write_page` route through AES-GCM transparently and page 0
    /// bypasses encryption (it carries the encryption marker +
    /// header itself). When `None`, all pages are stored plaintext
    /// and any DB header carrying the `RDBE` marker is rejected at
    /// open time.
    pub(crate) encryption: Option<(
        crate::storage::encryption::PageEncryptor,
        crate::storage::encryption::EncryptionHeader,
    )>,
}
269
// Method implementations (open/read/write/flush/…) live in a separate
// file to keep this one focused on type definitions.
#[path = "pager/impl.rs"]
mod pager_impl;
impl Drop for Pager {
    fn drop(&mut self) {
        // Try to flush on drop
        // (best-effort: Drop cannot propagate errors, so the result
        // is deliberately discarded)
        let _ = self.flush();
    }
}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;

    /// Build a collision-free temp DB path: process id plus an atomic
    /// counter, so parallel test threads never share a file.
    fn temp_db_path() -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut path = std::env::temp_dir();
        path.push(format!("reddb_test_{}_{}.db", std::process::id(), id));
        path
    }

    /// Best-effort removal of the DB file and its companion files
    /// (`<db>-hdr`, `<db>-meta`, `<db>-dwb`); errors are ignored.
    fn cleanup(path: &Path) {
        let _ = fs::remove_file(path);
        // Clean up companion files
        let mut hdr = path.to_path_buf().into_os_string();
        hdr.push("-hdr");
        let _ = fs::remove_file(&hdr);
        let mut meta = path.to_path_buf().into_os_string();
        meta.push("-meta");
        let _ = fs::remove_file(&meta);
        let mut dwb = path.to_path_buf().into_os_string();
        dwb.push("-dwb");
        let _ = fs::remove_file(&dwb);
    }

    /// Path of the double-write-buffer companion file (`<db>-dwb`).
    fn dwb_path_for(path: &Path) -> PathBuf {
        let mut dwb = path.to_path_buf().into_os_string();
        dwb.push("-dwb");
        PathBuf::from(dwb)
    }

    /// Write a synthetic DWB file for recovery tests: a 12-byte header
    /// ("RDDW" magic, entry count, CRC32 of the entries) followed by
    /// `(page_id, page bytes)` entries.
    fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
        let entry_size = 4 + PAGE_SIZE;
        let header_len = 12;
        let total = header_len + pages.len() * entry_size;
        let mut buf = Vec::with_capacity(total);

        buf.extend_from_slice(&[0x52, 0x44, 0x44, 0x57]); // "RDDW"
        buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
        buf.extend_from_slice(&[0u8; 4]); // checksum placeholder, backfilled below

        for (page_id, page) in pages {
            let mut page = page.clone();
            page.update_checksum();
            buf.extend_from_slice(&page_id.to_le_bytes());
            buf.extend_from_slice(page.as_bytes());
        }

        // Checksum covers only the entries; backfill header slot [8..12].
        let checksum = crate::storage::engine::crc32::crc32(&buf[header_len..]);
        buf[8..12].copy_from_slice(&checksum.to_le_bytes());

        let dwb_path = dwb_path_for(path);
        let mut file = fs::File::create(&dwb_path).unwrap();
        file.write_all(&buf).unwrap();
        file.sync_all().unwrap();
    }

    #[test]
    fn test_pager_create_new() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let pager = Pager::open_default(&path).unwrap();
            assert_eq!(pager.page_count().unwrap(), 3); // Header + reserved pages
        }

        cleanup(&path);
    }

    #[test]
    fn test_pager_reopen() {
        let path = temp_db_path();
        cleanup(&path);

        // Create and write
        {
            let pager = Pager::open_default(&path).unwrap();

            // Allocate a page
            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            assert_eq!(page.page_id(), 3);

            pager.sync().unwrap();
        }

        // Reopen and verify
        {
            let pager = Pager::open_default(&path).unwrap();
            assert_eq!(pager.page_count().unwrap(), 4); // Header + reserved pages + 1 data page
        }

        cleanup(&path);
    }

    #[test]
    fn test_pager_read_write() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let pager = Pager::open_default(&path).unwrap();

            // Allocate and write
            let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let page_id = page.page_id();

            page.insert_cell(b"key", b"value").unwrap();
            pager.write_page(page_id, page).unwrap();

            // Read back
            let read_page = pager.read_page(page_id).unwrap();
            let (key, value) = read_page.read_cell(0).unwrap();
            assert_eq!(key, b"key");
            assert_eq!(value, b"value");
        }

        cleanup(&path);
    }

    #[test]
    fn test_pager_cache() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let pager = Pager::open_default(&path).unwrap();

            // Allocate a page
            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let page_id = page.page_id();

            // First read - should be cached from allocate
            let _ = pager.read_page(page_id).unwrap();

            // Second read - should hit cache
            let _ = pager.read_page(page_id).unwrap();

            let stats = pager.cache_stats();
            assert!(stats.hits >= 1);
        }

        cleanup(&path);
    }

    #[test]
    fn test_pager_free_page() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let pager = Pager::open_default(&path).unwrap();

            // Allocate pages
            let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();

            let id1 = page1.page_id();
            let id2 = page2.page_id();

            // Free page 1
            pager.free_page(id1).unwrap();

            // Next allocation should reuse page 1
            let page3 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            assert_eq!(page3.page_id(), id1);
        }

        cleanup(&path);
    }

    #[test]
    fn test_freelist_persistence() {
        let path = temp_db_path();
        cleanup(&path);

        let freed_id;
        {
            let pager = Pager::open_default(&path).unwrap();
            let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            let _page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            freed_id = page1.page_id();

            pager.free_page(freed_id).unwrap();
            pager.sync().unwrap();
        }

        // The freed page must survive a close/reopen cycle, proving the
        // freelist is persisted, not just held in memory.
        {
            let pager = Pager::open_default(&path).unwrap();
            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            assert_eq!(page.page_id(), freed_id);
        }

        cleanup(&path);
    }

    #[test]
    fn test_pager_read_only() {
        let path = temp_db_path();
        cleanup(&path);

        // Create database
        {
            let pager = Pager::open_default(&path).unwrap();
            pager.sync().unwrap();
        }

        // Open read-only
        {
            let config = PagerConfig {
                read_only: true,
                ..Default::default()
            };

            let pager = Pager::open(&path, config).unwrap();
            assert!(pager.is_read_only());

            // Should fail to allocate
            assert!(pager.allocate_page(PageType::BTreeLeaf).is_err());
        }

        cleanup(&path);
    }

    #[test]
    fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
        let path = temp_db_path();
        cleanup(&path);

        let config = PagerConfig {
            double_write: true,
            ..Default::default()
        };

        let page_id;
        {
            let pager = Pager::open(&path, config.clone()).unwrap();
            let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
            page_id = page.page_id();
            pager.sync().unwrap();
        }

        // Plant a DWB file holding a newer image of the page, as if a
        // crash happened mid-flush; reopening must replay it.
        let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
        recovered_page.insert_cell(b"key", b"value").unwrap();
        write_dwb_fixture(&path, &[(page_id, recovered_page.clone())]);

        let dwb_path = dwb_path_for(&path);
        assert!(dwb_path.exists());
        assert!(fs::metadata(&dwb_path).unwrap().len() > 0);

        {
            let pager = Pager::open(&path, config).unwrap();

            // Recovery applied the planted page image.
            let read_page = pager.read_page(page_id).unwrap();
            let (key, value) = read_page.read_cell(0).unwrap();
            assert_eq!(key, b"key");
            assert_eq!(value, b"value");

            // The DWB file is truncated in place, not deleted.
            assert!(dwb_path.exists());
            assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);

            let mut updated_page = recovered_page.clone();
            updated_page.insert_cell(b"key2", b"value2").unwrap();
            pager.write_page(page_id, updated_page).unwrap();
            pager.flush().unwrap();

            // Still empty after a successful flush: the file stays reusable.
            assert!(dwb_path.exists());
            assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
        }

        cleanup(&path);
    }

    // -----------------------------------------------------------------
    // Target 3: WAL-first flush ordering
    // -----------------------------------------------------------------

    #[test]
    fn pager_starts_without_wal_writer() {
        let path = temp_db_path();
        let pager = Pager::open(&path, PagerConfig::default()).unwrap();
        assert!(!pager.has_wal_writer());
        drop(pager);
        cleanup(&path);
    }

    #[test]
    fn set_wal_writer_attaches_handle() {
        use crate::storage::wal::writer::WalWriter;
        use std::sync::{Arc, Mutex};

        let db_path = temp_db_path();
        let mut wal_path = db_path.clone();
        wal_path.set_extension("wal");
        let _ = fs::remove_file(&wal_path);

        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
        pager.set_wal_writer(Arc::clone(&wal));
        assert!(pager.has_wal_writer());

        pager.clear_wal_writer();
        assert!(!pager.has_wal_writer());

        drop(pager);
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);
    }

    #[test]
    fn flush_with_lsn_zero_pages_skips_wal_call() {
        // When every dirty page has lsn == 0 (the legacy auto-commit
        // path), flush() must NOT call wal.flush_until — there is no
        // WAL record to wait for. We verify this by attaching a WAL
        // whose durable_lsn starts at 8 and confirming flush() does
        // not advance it (no append, no flush).
        use crate::storage::wal::writer::WalWriter;
        use std::sync::{Arc, Mutex};

        let db_path = temp_db_path();
        let mut wal_path = db_path.clone();
        wal_path.set_extension("wal");
        let _ = fs::remove_file(&wal_path);

        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
        let initial_durable = {
            let g = wal.lock().unwrap();
            g.durable_lsn()
        };
        pager.set_wal_writer(Arc::clone(&wal));

        // Allocate and write a page with lsn = 0.
        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        page.insert_cell(b"k", b"v").unwrap();
        // header.lsn stays at 0 — caller did not stamp.
        pager.write_page(page.page_id(), page).unwrap();
        pager.flush().unwrap();

        // WAL durable_lsn must be unchanged because flush_until was
        // never called (max lsn over dirty pages was 0).
        let after_flush = {
            let g = wal.lock().unwrap();
            g.durable_lsn()
        };
        assert_eq!(after_flush, initial_durable);

        drop(pager);
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);
    }

    #[test]
    fn flush_advances_wal_durable_when_pages_carry_lsn() {
        // The full WAL-first dance: append a record, capture the
        // returned LSN, stamp it on a page, flush — afterwards the
        // WAL must be durable up to at least that LSN.
        use crate::storage::wal::record::WalRecord;
        use crate::storage::wal::writer::WalWriter;
        use std::sync::{Arc, Mutex};

        let db_path = temp_db_path();
        let mut wal_path = db_path.clone();
        wal_path.set_extension("wal");
        let _ = fs::remove_file(&wal_path);

        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
        pager.set_wal_writer(Arc::clone(&wal));

        // Stamp two dirty pages with a real WAL LSN.
        let stamped_lsn = {
            let mut wal_guard = wal.lock().unwrap();
            wal_guard.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            wal_guard.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            wal_guard.current_lsn()
        };
        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        page.insert_cell(b"k", b"v").unwrap();
        // Use the public Page API to set the LSN.
        page.set_lsn(stamped_lsn);
        pager.write_page(page.page_id(), page).unwrap();
        pager.flush().unwrap();

        // After flush, the WAL is durable at least up to our stamp.
        let after_flush = {
            let g = wal.lock().unwrap();
            g.durable_lsn()
        };
        assert!(
            after_flush >= stamped_lsn,
            "after flush durable_lsn {} must be >= stamped {}",
            after_flush,
            stamped_lsn
        );

        drop(pager);
        let _ = fs::remove_file(&wal_path);
        cleanup(&db_path);
    }
}