1use crate::fast_lock::SpinLock;
2use crate::result::LimboResult;
3use crate::storage::btree::BTreePageInner;
4use crate::storage::buffer_pool::BufferPool;
5use crate::storage::database::DatabaseStorage;
6use crate::storage::sqlite3_ondisk::{
7 self, DatabaseHeader, PageContent, PageType, DATABASE_HEADER_PAGE_ID, DATABASE_HEADER_SIZE,
8};
9use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
10use crate::types::CursorResult;
11use crate::Completion;
12use crate::{Buffer, LimboError, Result};
13use parking_lot::RwLock;
14use std::cell::{Cell, RefCell, UnsafeCell};
15use std::collections::HashSet;
16use std::rc::Rc;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use tracing::trace;
20
21use super::btree::BTreePage;
22use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
23use super::wal::{CheckpointMode, CheckpointStatus};
24
25#[cfg(not(feature = "omit_autovacuum"))]
26use {crate::io::Buffer as IoBuffer, ptrmap::*};
27
28pub struct PageInner {
29 pub flags: AtomicUsize,
30 pub contents: Option<PageContent>,
31 pub id: usize,
32}
33
34#[derive(Debug)]
35pub struct Page {
36 pub inner: UnsafeCell<PageInner>,
37}
38
39pub type PageRef = Arc<Page>;
42
43const PAGE_UPTODATE: usize = 0b001;
45const PAGE_LOCKED: usize = 0b010;
47const PAGE_ERROR: usize = 0b100;
49const PAGE_DIRTY: usize = 0b1000;
51const PAGE_LOADED: usize = 0b10000;
53
54impl Page {
55 pub fn new(id: usize) -> Self {
56 Self {
57 inner: UnsafeCell::new(PageInner {
58 flags: AtomicUsize::new(0),
59 contents: None,
60 id,
61 }),
62 }
63 }
64
65 #[allow(clippy::mut_from_ref)]
66 pub fn get(&self) -> &mut PageInner {
67 unsafe { &mut *self.inner.get() }
68 }
69
70 pub fn get_contents(&self) -> &mut PageContent {
71 self.get().contents.as_mut().unwrap()
72 }
73
74 pub fn is_uptodate(&self) -> bool {
75 self.get().flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0
76 }
77
78 pub fn set_uptodate(&self) {
79 self.get().flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst);
80 }
81
82 pub fn clear_uptodate(&self) {
83 self.get().flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst);
84 }
85
86 pub fn is_locked(&self) -> bool {
87 self.get().flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0
88 }
89
90 pub fn set_locked(&self) {
91 self.get().flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst);
92 }
93
94 pub fn clear_locked(&self) {
95 self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst);
96 }
97
98 pub fn is_error(&self) -> bool {
99 self.get().flags.load(Ordering::SeqCst) & PAGE_ERROR != 0
100 }
101
102 pub fn set_error(&self) {
103 self.get().flags.fetch_or(PAGE_ERROR, Ordering::SeqCst);
104 }
105
106 pub fn clear_error(&self) {
107 self.get().flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst);
108 }
109
110 pub fn is_dirty(&self) -> bool {
111 self.get().flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0
112 }
113
114 pub fn set_dirty(&self) {
115 tracing::debug!("set_dirty(page={})", self.get().id);
116 self.get().flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst);
117 }
118
119 pub fn clear_dirty(&self) {
120 tracing::debug!("clear_dirty(page={})", self.get().id);
121 self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst);
122 }
123
124 pub fn is_loaded(&self) -> bool {
125 self.get().flags.load(Ordering::SeqCst) & PAGE_LOADED != 0
126 }
127
128 pub fn set_loaded(&self) {
129 self.get().flags.fetch_or(PAGE_LOADED, Ordering::SeqCst);
130 }
131
132 pub fn clear_loaded(&self) {
133 tracing::debug!("clear loaded {}", self.get().id);
134 self.get().flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst);
135 }
136
137 pub fn is_index(&self) -> bool {
138 match self.get_contents().page_type() {
139 PageType::IndexLeaf | PageType::IndexInterior => true,
140 PageType::TableLeaf | PageType::TableInterior => false,
141 }
142 }
143}
144
145#[derive(Clone, Copy, Debug)]
146enum FlushState {
148 Start,
150 WaitAppendFrames,
152 SyncWal,
154 Checkpoint,
156 SyncDbFile,
158 WaitSyncDbFile,
160}
161
162#[derive(Clone, Debug, Copy)]
163enum CheckpointState {
164 Checkpoint,
165 SyncDbFile,
166 WaitSyncDbFile,
167 CheckpointDone,
168}
169
170pub enum BtreePageAllocMode {
172 Any,
174 Exact(u32),
176 Le(u32),
178}
179
180struct FlushInfo {
182 state: FlushState,
183 in_flight_writes: Rc<RefCell<usize>>,
185}
186
187#[derive(Clone, Copy, Debug)]
189pub enum AutoVacuumMode {
190 None,
191 Full,
192 Incremental,
193}
194
195#[derive(Clone, Copy, Debug, PartialEq, Eq)]
200pub enum SynchronousMode {
201 Off,
203 Normal,
205 Full,
207 Extra,
210}
211
212impl SynchronousMode {
213 pub fn as_i64(self) -> i64 {
215 match self {
216 SynchronousMode::Off => 0,
217 SynchronousMode::Normal => 1,
218 SynchronousMode::Full => 2,
219 SynchronousMode::Extra => 3,
220 }
221 }
222}
223
224#[derive(Debug, Clone)]
229pub struct SavepointFrame {
230 pub name: String,
232 pub wal_max_frame: u64,
234 pub wal_checksum: (u32, u32),
236 pub is_transaction_owner: bool,
240}
241
242pub struct Pager {
246 pub db_file: Arc<dyn DatabaseStorage>,
248 wal: Rc<RefCell<dyn Wal>>,
250 page_cache: Arc<RwLock<DumbLruPageCache>>,
252 buffer_pool: Rc<BufferPool>,
254 pub io: Arc<dyn crate::io::IO>,
256 dirty_pages: Rc<RefCell<HashSet<usize>>>,
257 pub db_header: Arc<SpinLock<DatabaseHeader>>,
258
259 flush_info: RefCell<FlushInfo>,
260 checkpoint_state: RefCell<CheckpointState>,
261 checkpoint_inflight: Rc<RefCell<usize>>,
262 syncing: Rc<RefCell<bool>>,
263 auto_vacuum_mode: RefCell<AutoVacuumMode>,
264 synchronous_mode: Cell<SynchronousMode>,
266 pub savepoints: RefCell<Vec<SavepointFrame>>,
268}
269
270#[derive(Debug, Copy, Clone)]
271pub enum PagerCacheflushStatus {
275 Done(PagerCacheflushResult),
276 IO,
277}
278
279#[derive(Debug, Copy, Clone)]
280pub enum PagerCacheflushResult {
281 WalWritten,
283 Checkpointed(CheckpointResult),
286}
287
288impl Pager {
289 pub fn begin_open(db_file: Arc<dyn DatabaseStorage>) -> Result<Arc<SpinLock<DatabaseHeader>>> {
291 sqlite3_ondisk::begin_read_database_header(db_file)
292 }
293
294 pub fn finish_open(
296 db_header_ref: Arc<SpinLock<DatabaseHeader>>,
297 db_file: Arc<dyn DatabaseStorage>,
298 wal: Rc<RefCell<dyn Wal>>,
299 io: Arc<dyn crate::io::IO>,
300 page_cache: Arc<RwLock<DumbLruPageCache>>,
301 buffer_pool: Rc<BufferPool>,
302 ) -> Result<Self> {
303 Ok(Self {
304 db_file,
305 wal,
306 page_cache,
307 io,
308 dirty_pages: Rc::new(RefCell::new(HashSet::new())),
309 db_header: db_header_ref.clone(),
310 flush_info: RefCell::new(FlushInfo {
311 state: FlushState::Start,
312 in_flight_writes: Rc::new(RefCell::new(0)),
313 }),
314 syncing: Rc::new(RefCell::new(false)),
315 checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
316 checkpoint_inflight: Rc::new(RefCell::new(0)),
317 buffer_pool,
318 auto_vacuum_mode: RefCell::new(AutoVacuumMode::None),
319 synchronous_mode: Cell::new(SynchronousMode::Normal),
320 savepoints: RefCell::new(Vec::new()),
321 })
322 }
323
324 pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode {
325 *self.auto_vacuum_mode.borrow()
326 }
327
328 pub fn set_auto_vacuum_mode(&self, mode: AutoVacuumMode) {
329 *self.auto_vacuum_mode.borrow_mut() = mode;
330 }
331
332 pub fn get_synchronous_mode(&self) -> SynchronousMode {
333 self.synchronous_mode.get()
334 }
335
336 pub fn set_synchronous_mode(&self, mode: SynchronousMode) {
337 self.synchronous_mode.set(mode);
338 }
339
340 #[cfg(not(feature = "omit_autovacuum"))]
344 pub fn ptrmap_get(&self, target_page_num: u32) -> Result<CursorResult<Option<PtrmapEntry>>> {
345 tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
346 let configured_page_size = self.db_header.lock().get_page_size() as usize;
347
348 if target_page_num < FIRST_PTRMAP_PAGE_NO
349 || is_ptrmap_page(target_page_num, configured_page_size)
350 {
351 return Ok(CursorResult::Ok(None));
352 }
353
354 let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
355 let offset_in_ptrmap_page =
356 get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?;
357 tracing::trace!(
358 "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
359 target_page_num,
360 ptrmap_pg_no
361 );
362
363 let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
364 if ptrmap_page.is_locked() {
365 return Ok(CursorResult::IO);
366 }
367 if !ptrmap_page.is_loaded() {
368 return Ok(CursorResult::IO);
369 }
370 let ptrmap_page_inner = ptrmap_page.get();
371
372 let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() {
373 Some(content) => content,
374 None => {
375 return Err(LimboError::InternalError(format!(
376 "Ptrmap page {} content not loaded",
377 ptrmap_pg_no
378 )))
379 }
380 };
381
382 let page_buffer_guard: std::cell::Ref<IoBuffer> = page_content.buffer.borrow();
383 let full_buffer_slice: &[u8] = page_buffer_guard.as_slice();
384
385 if ptrmap_pg_no != 1 && page_content.offset != 0 {
388 return Err(LimboError::Corrupt(format!(
389 "Ptrmap page {} has unexpected internal offset {}",
390 ptrmap_pg_no, page_content.offset
391 )));
392 }
393 let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..];
394 let actual_data_length = ptrmap_page_data_slice.len();
395
396 if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length {
398 return Err(LimboError::InternalError(format!(
399 "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})",
400 offset_in_ptrmap_page, PTRMAP_ENTRY_SIZE, ptrmap_pg_no, actual_data_length
401 )));
402 }
403
404 let entry_slice = &ptrmap_page_data_slice
405 [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
406 match PtrmapEntry::deserialize(entry_slice) {
407 Some(entry) => Ok(CursorResult::Ok(Some(entry))),
408 None => Err(LimboError::Corrupt(format!(
409 "Failed to deserialize ptrmap entry for page {} from ptrmap page {}",
410 target_page_num, ptrmap_pg_no
411 ))),
412 }
413 }
414
415 #[cfg(not(feature = "omit_autovacuum"))]
419 pub fn ptrmap_put(
420 &self,
421 db_page_no_to_update: u32,
422 entry_type: PtrmapType,
423 parent_page_no: u32,
424 ) -> Result<CursorResult<()>> {
425 tracing::trace!(
426 "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {})",
427 db_page_no_to_update,
428 entry_type,
429 parent_page_no
430 );
431
432 let page_size = self.db_header.lock().get_page_size() as usize;
433
434 if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
435 || is_ptrmap_page(db_page_no_to_update, page_size)
436 {
437 return Err(LimboError::InternalError(format!(
438 "Cannot set ptrmap entry for page {}: it's a header/ptrmap page or invalid.",
439 db_page_no_to_update
440 )));
441 }
442
443 let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size);
444 let offset_in_ptrmap_page =
445 get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?;
446 tracing::trace!(
447 "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}",
448 db_page_no_to_update,
449 entry_type,
450 parent_page_no,
451 ptrmap_pg_no,
452 offset_in_ptrmap_page
453 );
454
455 let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
456 if ptrmap_page.is_locked() {
457 return Ok(CursorResult::IO);
458 }
459 if !ptrmap_page.is_loaded() {
460 return Ok(CursorResult::IO);
461 }
462 let ptrmap_page_inner = ptrmap_page.get();
463
464 let page_content = match ptrmap_page_inner.contents.as_ref() {
465 Some(content) => content,
466 None => {
467 return Err(LimboError::InternalError(format!(
468 "Ptrmap page {} content not loaded",
469 ptrmap_pg_no
470 )))
471 }
472 };
473
474 let mut page_buffer_guard = page_content.buffer.borrow_mut();
475 let full_buffer_slice = page_buffer_guard.as_mut_slice();
476
477 if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() {
478 return Err(LimboError::InternalError(format!(
479 "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})",
480 offset_in_ptrmap_page,
481 PTRMAP_ENTRY_SIZE,
482 ptrmap_pg_no,
483 full_buffer_slice.len()
484 )));
485 }
486
487 let entry = PtrmapEntry {
488 entry_type,
489 parent_page_no,
490 };
491 entry.serialize(
492 &mut full_buffer_slice
493 [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE],
494 )?;
495
496 ptrmap_page.set_dirty();
497 self.add_dirty(ptrmap_pg_no as usize);
498 Ok(CursorResult::Ok(()))
499 }
500
501 pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result<CursorResult<u32>> {
504 let page_type = match flags {
505 _ if flags.is_table() => PageType::TableLeaf,
506 _ if flags.is_index() => PageType::IndexLeaf,
507 _ => unreachable!("Invalid flags state"),
508 };
509 #[cfg(feature = "omit_autovacuum")]
510 {
511 let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
512 let page_id = page.get().get().id;
513 return Ok(CursorResult::Ok(page_id as u32));
514 }
515
516 #[cfg(not(feature = "omit_autovacuum"))]
518 {
519 let auto_vacuum_mode = self.auto_vacuum_mode.borrow();
520 match *auto_vacuum_mode {
521 AutoVacuumMode::None => {
522 let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
523 let page_id = page.get().get().id;
524 return Ok(CursorResult::Ok(page_id as u32));
525 }
526 AutoVacuumMode::Full => {
527 let mut root_page_num = self.db_header.lock().vacuum_mode_largest_root_page;
528 assert!(root_page_num > 0); root_page_num += 1;
530 assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); while is_ptrmap_page(
533 root_page_num,
534 self.db_header.lock().get_page_size() as usize,
535 ) {
536 root_page_num += 1;
537 }
538 assert!(root_page_num >= 3); let page = self.do_allocate_page(
542 page_type,
543 0,
544 BtreePageAllocMode::Exact(root_page_num),
545 );
546 let allocated_page_id = page.get().get().id as u32;
547 if allocated_page_id != root_page_num {
548 }
550
551 match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? {
555 CursorResult::Ok(_) => Ok(CursorResult::Ok(allocated_page_id as u32)),
556 CursorResult::IO => Ok(CursorResult::IO),
557 }
558 }
559 AutoVacuumMode::Incremental => {
560 unimplemented!()
561 }
562 }
563 }
564 }
565
566 pub fn allocate_overflow_page(&self) -> PageRef {
570 let page = self.allocate_page().unwrap();
571 tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id);
572
573 let contents = page.get().contents.as_mut().unwrap();
575 let buf = contents.as_ptr();
576 buf.fill(0);
577
578 page
579 }
580
581 pub fn do_allocate_page(
585 &self,
586 page_type: PageType,
587 offset: usize,
588 _alloc_mode: BtreePageAllocMode,
589 ) -> BTreePage {
590 let page = self.allocate_page().unwrap();
591 let page = Arc::new(BTreePageInner {
592 page: RefCell::new(page),
593 });
594 crate::btree_init_page(&page, page_type, offset, self.usable_space() as u16);
595 tracing::debug!(
596 "do_allocate_page(id={}, page_type={:?})",
597 page.get().get().id,
598 page.get().get_contents().page_type()
599 );
600 page
601 }
602
603 pub fn usable_space(&self) -> usize {
608 let db_header = self.db_header.lock();
609 (db_header.get_page_size() - db_header.reserved_space as u32) as usize
610 }
611
612 #[inline(always)]
613 pub fn begin_read_tx(&self) -> Result<LimboResult> {
614 self.wal.borrow_mut().begin_read_tx()
615 }
616
617 #[inline(always)]
618 pub fn begin_write_tx(&self) -> Result<LimboResult> {
619 self.wal.borrow_mut().begin_write_tx()
620 }
621
622 pub fn end_tx(&self) -> Result<PagerCacheflushStatus> {
623 let cacheflush_status = self.cacheflush()?;
624 return match cacheflush_status {
625 PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
626 PagerCacheflushStatus::Done(_) => {
627 self.wal.borrow().end_write_tx()?;
628 self.wal.borrow().end_read_tx()?;
629 self.savepoints.borrow_mut().clear();
631 Ok(cacheflush_status)
632 }
633 };
634 }
635
636 pub fn end_read_tx(&self) -> Result<()> {
637 self.wal.borrow().end_read_tx()?;
638 Ok(())
639 }
640
641 pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
643 tracing::trace!("read_page(page_idx = {})", page_idx);
644 let mut page_cache = self.page_cache.write();
645 let page_key = PageCacheKey::new(page_idx);
646 if let Some(page) = page_cache.get(&page_key) {
647 tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
648 return Ok(page.clone());
649 }
650 let page = Arc::new(Page::new(page_idx));
651 page.set_locked();
652
653 if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
654 self.wal
655 .borrow()
656 .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
657 {
658 page.set_uptodate();
659 }
660 match page_cache.insert(page_key, page.clone()) {
663 Ok(_) => {}
664 Err(CacheError::Full) => return Err(LimboError::CacheFull),
665 Err(CacheError::KeyExists) => {
666 unreachable!("Page should not exist in cache after get() miss")
667 }
668 Err(e) => {
669 return Err(LimboError::InternalError(format!(
670 "Failed to insert page into cache: {:?}",
671 e
672 )))
673 }
674 }
675 return Ok(page);
676 }
677
678 sqlite3_ondisk::begin_read_page(
679 self.db_file.clone(),
680 self.buffer_pool.clone(),
681 page.clone(),
682 page_idx,
683 )?;
684 match page_cache.insert(page_key, page.clone()) {
685 Ok(_) => {}
686 Err(CacheError::Full) => return Err(LimboError::CacheFull),
687 Err(CacheError::KeyExists) => {
688 unreachable!("Page should not exist in cache after get() miss")
689 }
690 Err(e) => {
691 return Err(LimboError::InternalError(format!(
692 "Failed to insert page into cache: {:?}",
693 e
694 )))
695 }
696 }
697 Ok(page)
698 }
699
700 pub fn write_database_header(&self, header: &DatabaseHeader) -> Result<()> {
702 let header_page = self.read_page(DATABASE_HEADER_PAGE_ID)?;
703 while header_page.is_locked() {
704 self.io.run_once()?;
706 }
707 header_page.set_dirty();
708 self.add_dirty(DATABASE_HEADER_PAGE_ID);
709
710 let contents = header_page.get().contents.as_ref().unwrap();
711 contents.write_database_header(&header);
712
713 Ok(())
714 }
715
716 pub fn refresh_header_from_wal(&self) -> Result<()> {
739 if let LimboResult::Busy = self.begin_read_tx()? {
744 return Ok(());
748 }
749
750 let result = self.refresh_header_from_wal_inner();
751
752 self.end_read_tx()?;
754 result
755 }
756
757 fn refresh_header_from_wal_inner(&self) -> Result<()> {
758 let has_wal_frame = self
759 .wal
760 .borrow()
761 .find_frame(DATABASE_HEADER_PAGE_ID as u64)?
762 .is_some();
763 if !has_wal_frame {
764 return Ok(());
767 }
768
769 let header_page = self.read_page(DATABASE_HEADER_PAGE_ID)?;
771 while header_page.is_locked() {
772 self.io.run_once()?;
774 }
775
776 let page = header_page.get();
777 let contents = page.contents.as_ref().ok_or_else(|| {
778 LimboError::InternalError(
779 "page 1 contents missing while refreshing database header from WAL".to_string(),
780 )
781 })?;
782 let buf = contents.as_ptr();
783 sqlite3_ondisk::parse_database_header_into(&buf[..DATABASE_HEADER_SIZE], &self.db_header)?;
784
785 self.clear_page_cache();
790 Ok(())
791 }
792
793 pub fn change_page_cache_size(&self, capacity: usize) -> Result<CacheResizeResult> {
795 let mut page_cache = self.page_cache.write();
796 Ok(page_cache.resize(capacity))
797 }
798
799 pub fn add_dirty(&self, page_id: usize) {
800 let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
802 dirty_pages.insert(page_id);
803 }
804
805 pub fn wal_frame_count(&self) -> Result<u64> {
806 Ok(self.wal.borrow().get_max_frame_in_wal())
807 }
808
809 pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
814 let mut checkpoint_result = CheckpointResult::default();
815 loop {
816 let state = self.flush_info.borrow().state;
817 trace!("cacheflush {:?}", state);
818 match state {
819 FlushState::Start => {
820 let db_size = self.db_header.lock().database_size;
821 for page_id in self.dirty_pages.borrow().iter() {
822 let mut cache = self.page_cache.write();
823 let page_key = PageCacheKey::new(*page_id);
824 let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
825 let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
826 trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
827 self.wal.borrow_mut().append_frame(
828 page.clone(),
829 db_size,
830 self.flush_info.borrow().in_flight_writes.clone(),
831 )?;
832 page.clear_dirty();
833 }
834 {
836 let mut cache = self.page_cache.write();
837 cache.clear().unwrap();
838 }
839 self.dirty_pages.borrow_mut().clear();
840 self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
841 return Ok(PagerCacheflushStatus::IO);
842 }
843 FlushState::WaitAppendFrames => {
844 let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
845 if in_flight == 0 {
846 self.flush_info.borrow_mut().state = FlushState::SyncWal;
847 } else {
848 return Ok(PagerCacheflushStatus::IO);
849 }
850 }
851 FlushState::SyncWal => {
852 if self.synchronous_mode.get() != SynchronousMode::Off
853 && WalFsyncStatus::IO == self.wal.borrow_mut().sync()?
854 {
855 return Ok(PagerCacheflushStatus::IO);
856 }
857
858 if !self.wal.borrow().should_checkpoint() {
859 self.flush_info.borrow_mut().state = FlushState::Start;
860 return Ok(PagerCacheflushStatus::Done(
861 PagerCacheflushResult::WalWritten,
862 ));
863 }
864 self.flush_info.borrow_mut().state = FlushState::Checkpoint;
865 }
866 FlushState::Checkpoint => {
867 match self.checkpoint()? {
868 CheckpointStatus::Done(res) => {
869 checkpoint_result = res;
870 self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
871 }
872 CheckpointStatus::IO => return Ok(PagerCacheflushStatus::IO),
873 };
874 }
875 FlushState::SyncDbFile => {
876 if self.synchronous_mode.get() == SynchronousMode::Off {
877 self.flush_info.borrow_mut().state = FlushState::Start;
878 break;
879 }
880 sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
881 self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile;
882 }
883 FlushState::WaitSyncDbFile => {
884 if *self.syncing.borrow() {
885 return Ok(PagerCacheflushStatus::IO);
886 } else {
887 self.flush_info.borrow_mut().state = FlushState::Start;
888 break;
889 }
890 }
891 }
892 }
893 Ok(PagerCacheflushStatus::Done(
894 PagerCacheflushResult::Checkpointed(checkpoint_result),
895 ))
896 }
897
898 pub fn wal_get_frame(
899 &self,
900 frame_no: u32,
901 p_frame: *mut u8,
902 frame_len: u32,
903 ) -> Result<Arc<Completion>> {
904 let wal = self.wal.borrow();
905 return wal.read_frame_raw(
906 frame_no.into(),
907 self.buffer_pool.clone(),
908 p_frame,
909 frame_len,
910 );
911 }
912
913 pub fn checkpoint(&self) -> Result<CheckpointStatus> {
914 let mut checkpoint_result = CheckpointResult::default();
915 loop {
916 let state = *self.checkpoint_state.borrow();
917 trace!("pager_checkpoint(state={:?})", state);
918 match state {
919 CheckpointState::Checkpoint => {
920 let in_flight = self.checkpoint_inflight.clone();
921 match self.wal.borrow_mut().checkpoint(
922 self,
923 in_flight,
924 CheckpointMode::Passive,
925 )? {
926 CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
927 CheckpointStatus::Done(res) => {
928 checkpoint_result = res;
929 self.checkpoint_state.replace(CheckpointState::SyncDbFile);
930 }
931 };
932 }
933 CheckpointState::SyncDbFile => {
934 if self.synchronous_mode.get() == SynchronousMode::Off {
935 self.checkpoint_state
936 .replace(CheckpointState::CheckpointDone);
937 } else {
938 sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
939 self.checkpoint_state
940 .replace(CheckpointState::WaitSyncDbFile);
941 }
942 }
943 CheckpointState::WaitSyncDbFile => {
944 if *self.syncing.borrow() {
945 return Ok(CheckpointStatus::IO);
946 } else {
947 self.checkpoint_state
948 .replace(CheckpointState::CheckpointDone);
949 }
950 }
951 CheckpointState::CheckpointDone => {
952 return if *self.checkpoint_inflight.borrow() > 0 {
953 Ok(CheckpointStatus::IO)
954 } else {
955 self.checkpoint_state.replace(CheckpointState::Checkpoint);
956 Ok(CheckpointStatus::Done(checkpoint_result))
957 };
958 }
959 }
960 }
961 }
962
963 pub fn clear_page_cache(&self) {
967 self.dirty_pages.borrow_mut().clear();
968 self.page_cache.write().unset_dirty_all_pages();
969 self.page_cache
970 .write()
971 .clear()
972 .expect("Failed to clear page cache");
973 }
974
975 pub fn rollback(&self) {
981 self.clear_page_cache();
984
985 self.flush_info.borrow_mut().state = FlushState::Start;
987 *self.flush_info.borrow().in_flight_writes.borrow_mut() = 0;
988
989 self.checkpoint_state.replace(CheckpointState::Checkpoint);
991
992 *self.syncing.borrow_mut() = false;
994
995 self.wal.borrow().rollback();
997
998 self.savepoints.borrow_mut().clear();
1000 }
1001
1002 fn flush_dirty_pages_to_wal(&self) -> Result<()> {
1017 if self.dirty_pages.borrow().is_empty() {
1018 return Ok(());
1019 }
1020 let db_size = self.db_header.lock().database_size;
1021 let dirty_ids: Vec<usize> = self.dirty_pages.borrow().iter().copied().collect();
1024 for page_id in dirty_ids {
1025 let page_key = PageCacheKey::new(page_id);
1026 let mut cache = self.page_cache.write();
1028 let page = match cache.get(&page_key) {
1029 Some(p) => p,
1030 None => continue,
1032 };
1033 self.wal.borrow_mut().append_frame(
1034 page.clone(),
1035 db_size,
1036 self.flush_info.borrow().in_flight_writes.clone(),
1037 )?;
1038 }
1039 self.dirty_pages.borrow_mut().clear();
1042 let new_max = self.wal.borrow().current_frame_state().0;
1046 self.wal.borrow_mut().set_reader_max_frame(new_max);
1047 Ok(())
1048 }
1049
1050 pub fn open_savepoint(&self, name: String, is_txn_owner: bool) -> Result<()> {
1060 self.flush_dirty_pages_to_wal()?;
1062 let (max_frame, checksum) = self.wal.borrow().current_frame_state();
1063 let frame = SavepointFrame {
1064 name,
1065 wal_max_frame: max_frame,
1066 wal_checksum: checksum,
1067 is_transaction_owner: is_txn_owner,
1068 };
1069 self.savepoints.borrow_mut().push(frame);
1070 Ok(())
1071 }
1072
1073 pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
1082 let idx = {
1084 let sps = self.savepoints.borrow();
1085 sps.iter()
1086 .rposition(|sp| sp.name.eq_ignore_ascii_case(name))
1087 .ok_or_else(|| LimboError::ParseError(format!("no such savepoint: {name}")))?
1088 };
1089 let sp = {
1090 let sps = self.savepoints.borrow();
1091 sps[idx].clone()
1092 };
1093 self.savepoints.borrow_mut().truncate(idx + 1);
1095 self.clear_page_cache();
1097 self.wal
1099 .borrow()
1100 .rollback_to_frame(sp.wal_max_frame, sp.wal_checksum)?;
1101 self.wal.borrow_mut().set_reader_max_frame(sp.wal_max_frame);
1104 self.flush_info.borrow_mut().state = FlushState::Start;
1108 self.checkpoint_state.replace(CheckpointState::Checkpoint);
1109 *self.syncing.borrow_mut() = false;
1110 Ok(())
1111 }
1112
1113 pub fn release_savepoint(&self, name: &str) -> Result<bool> {
1119 let idx = {
1121 let sps = self.savepoints.borrow();
1122 sps.iter()
1123 .rposition(|sp| sp.name.eq_ignore_ascii_case(name))
1124 .ok_or_else(|| LimboError::ParseError(format!("no such savepoint: {name}")))?
1125 };
1126 let is_owner = self.savepoints.borrow()[idx].is_transaction_owner;
1127 self.savepoints.borrow_mut().truncate(idx);
1129 Ok(is_owner)
1130 }
1131
1132 pub fn checkpoint_shutdown(&self) -> Result<()> {
1133 let mut attempts = 0;
1134 {
1135 let mut wal = self.wal.borrow_mut();
1136 if self.synchronous_mode.get() != SynchronousMode::Off {
1138 while let Ok(WalFsyncStatus::IO) = wal.sync() {
1139 if attempts >= 10 {
1140 return Err(LimboError::InternalError(
1141 "Failed to fsync WAL before final checkpoint, fd likely closed".into(),
1142 ));
1143 }
1144 self.io.run_once()?;
1145 attempts += 1;
1146 }
1147 }
1148 }
1149 match self.wal_checkpoint_mode(CheckpointMode::Truncate) {
1153 Ok(_) => Ok(()),
1154 Err(_) => {
1155 let _ = self.wal_checkpoint_mode(CheckpointMode::Passive);
1156 Ok(())
1157 }
1158 }
1159 }
1160
1161 pub fn wal_checkpoint(&self) -> CheckpointResult {
1162 let checkpoint_result: CheckpointResult;
1163 loop {
1164 match self.wal.borrow_mut().checkpoint(
1165 self,
1166 Rc::new(RefCell::new(0)),
1167 CheckpointMode::Passive,
1168 ) {
1169 Ok(CheckpointStatus::IO) => {
1170 let _ = self.io.run_once();
1171 }
1172 Ok(CheckpointStatus::Done(res)) => {
1173 checkpoint_result = res;
1174 break;
1175 }
1176 Err(err) => panic!("error while clearing cache {}", err),
1177 }
1178 }
1179 self.page_cache
1181 .write()
1182 .clear()
1183 .expect("Failed to clear page cache");
1184 checkpoint_result
1185 }
1186
1187 pub fn wal_checkpoint_mode(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
1191 let checkpoint_result;
1192 loop {
1193 match self
1194 .wal
1195 .borrow_mut()
1196 .checkpoint(self, Rc::new(RefCell::new(0)), mode)?
1197 {
1198 CheckpointStatus::IO => {
1199 self.io.run_once()?;
1200 }
1201 CheckpointStatus::Done(res) => {
1202 checkpoint_result = res;
1203 break;
1204 }
1205 }
1206 }
1207 self.page_cache.write().clear().map_err(|_| {
1208 crate::error::LimboError::InternalError("failed to clear page cache".to_string())
1209 })?;
1210 Ok(checkpoint_result)
1211 }
1212
1213 pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
1216 tracing::trace!("free_page(page_id={})", page_id);
1217 const TRUNK_PAGE_HEADER_SIZE: usize = 8;
1218 const LEAF_ENTRY_SIZE: usize = 4;
1219 const RESERVED_SLOTS: usize = 2;
1220
1221 const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; if page_id < 2 || page_id > self.db_header.lock().database_size as usize {
1225 return Err(LimboError::Corrupt(format!(
1226 "Invalid page number {} for free operation",
1227 page_id
1228 )));
1229 }
1230
1231 let page = match page {
1232 Some(page) => {
1233 assert_eq!(page.get().id, page_id, "Page id mismatch");
1234 page
1235 }
1236 None => self.read_page(page_id)?,
1237 };
1238
1239 self.db_header.lock().freelist_pages += 1;
1240
1241 let trunk_page_id = self.db_header.lock().freelist_trunk_page;
1242
1243 if trunk_page_id != 0 {
1244 let trunk_page = self.read_page(trunk_page_id as usize)?;
1246 let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
1247 let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);
1248
1249 let max_free_list_entries = (self.usable_size() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
1251
1252 if number_of_leaf_pages < max_free_list_entries as u32 {
1253 trunk_page.set_dirty();
1254 self.add_dirty(trunk_page_id as usize);
1255
1256 trunk_page_contents
1257 .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
1258 trunk_page_contents.write_u32(
1259 TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
1260 page_id as u32,
1261 );
1262 page.clear_uptodate();
1263 page.clear_loaded();
1264
1265 return Ok(());
1266 }
1267 }
1268
1269 page.set_dirty();
1271 self.add_dirty(page_id);
1272
1273 let contents = page.get().contents.as_mut().unwrap();
1274 contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
1276 contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
1278 self.db_header.lock().freelist_trunk_page = page_id as u32;
1280 page.clear_uptodate();
1282 page.clear_loaded();
1283 Ok(())
1284 }
1285
1286 #[allow(clippy::readonly_write_lock)]
1292 pub fn allocate_page(&self) -> Result<PageRef> {
1293 let header = &self.db_header;
1294 let mut header = header.lock();
1295 header.database_size += 1;
1296
1297 #[cfg(not(feature = "omit_autovacuum"))]
1298 {
1299 if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
1303 && is_ptrmap_page(header.database_size, header.get_page_size() as usize)
1304 {
1305 let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
1306 page.set_dirty();
1307 self.add_dirty(page.get().id);
1308
1309 let page_key = PageCacheKey::new(page.get().id);
1310 let mut cache = self.page_cache.write();
1311 match cache.insert(page_key, page.clone()) {
1312 Ok(_) => (),
1313 Err(CacheError::Full) => return Err(LimboError::CacheFull),
1314 Err(_) => {
1315 return Err(LimboError::InternalError(
1316 "Unknown error inserting page to cache".into(),
1317 ))
1318 }
1319 }
1320 header.database_size += 1;
1321 }
1322 }
1323
1324 self.write_database_header(&mut header)?;
1326
1327 let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
1329 {
1330 page.set_dirty();
1332 self.add_dirty(page.get().id);
1333
1334 let page_key = PageCacheKey::new(page.get().id);
1335 let mut cache = self.page_cache.write();
1336 match cache.insert(page_key, page.clone()) {
1337 Err(CacheError::Full) => Err(LimboError::CacheFull),
1338 Err(_) => Err(LimboError::InternalError(
1339 "Unknown error inserting page to cache".into(),
1340 )),
1341 Ok(_) => Ok(page),
1342 }
1343 }
1344 }
1345
1346 pub fn update_dirty_loaded_page_in_cache(
1347 &self,
1348 id: usize,
1349 page: PageRef,
1350 ) -> Result<(), LimboError> {
1351 let mut cache = self.page_cache.write();
1352 let page_key = PageCacheKey::new(id);
1353
1354 assert!(page.is_dirty());
1356 cache
1357 .insert_ignore_existing(page_key, page.clone())
1358 .map_err(|e| {
1359 LimboError::InternalError(format!(
1360 "Failed to insert loaded page {} into cache: {:?}",
1361 id, e
1362 ))
1363 })?;
1364 page.set_loaded();
1365 Ok(())
1366 }
1367
1368 pub fn usable_size(&self) -> usize {
1369 let db_header = self.db_header.lock();
1370 (db_header.get_page_size() - db_header.reserved_space as u32) as usize
1371 }
1372}
1373
1374pub fn allocate_page(page_id: usize, buffer_pool: &Rc<BufferPool>, offset: usize) -> PageRef {
1375 let page = Arc::new(Page::new(page_id));
1376 {
1377 let buffer = buffer_pool.get();
1378 let bp = buffer_pool.clone();
1379 let drop_fn = Rc::new(move |buf| {
1380 bp.put(buf);
1381 });
1382 let buffer = Arc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
1383 page.set_loaded();
1384 page.get().contents = Some(PageContent::new(offset, buffer));
1385 }
1386 page
1387}
1388
1389#[derive(Debug)]
1390pub struct CreateBTreeFlags(pub u8);
1391impl CreateBTreeFlags {
1392 pub const TABLE: u8 = 0b0001;
1393 pub const INDEX: u8 = 0b0010;
1394
1395 pub fn new_table() -> Self {
1396 Self(CreateBTreeFlags::TABLE)
1397 }
1398
1399 pub fn new_index() -> Self {
1400 Self(CreateBTreeFlags::INDEX)
1401 }
1402
1403 pub fn is_table(&self) -> bool {
1404 (self.0 & CreateBTreeFlags::TABLE) != 0
1405 }
1406
1407 pub fn is_index(&self) -> bool {
1408 (self.0 & CreateBTreeFlags::INDEX) != 0
1409 }
1410
1411 pub fn get_flags(&self) -> u8 {
1412 self.0
1413 }
1414}
1415
1416#[cfg(not(feature = "omit_autovacuum"))]
1448mod ptrmap {
1449 use crate::{storage::sqlite3_ondisk::MIN_PAGE_SIZE, LimboError, Result};
1450
1451 pub const PTRMAP_ENTRY_SIZE: usize = 5;
1453 pub const FIRST_PTRMAP_PAGE_NO: u32 = 2;
1456
1457 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1458 #[repr(u8)]
1459 pub enum PtrmapType {
1460 RootPage = 1,
1461 FreePage = 2,
1462 Overflow1 = 3,
1463 Overflow2 = 4,
1464 BTreeNode = 5,
1465 }
1466
1467 impl PtrmapType {
1468 pub fn from_u8(value: u8) -> Option<Self> {
1469 match value {
1470 1 => Some(PtrmapType::RootPage),
1471 2 => Some(PtrmapType::FreePage),
1472 3 => Some(PtrmapType::Overflow1),
1473 4 => Some(PtrmapType::Overflow2),
1474 5 => Some(PtrmapType::BTreeNode),
1475 _ => None,
1476 }
1477 }
1478 }
1479
1480 #[derive(Debug, Clone, Copy)]
1481 pub struct PtrmapEntry {
1482 pub entry_type: PtrmapType,
1483 pub parent_page_no: u32,
1484 }
1485
1486 impl PtrmapEntry {
1487 pub fn serialize(&self, buffer: &mut [u8]) -> Result<()> {
1488 if buffer.len() < PTRMAP_ENTRY_SIZE {
1489 return Err(LimboError::InternalError(format!(
1490 "Buffer too small to serialize ptrmap entry. Expected at least {} bytes, got {}",
1491 PTRMAP_ENTRY_SIZE,
1492 buffer.len()
1493 )));
1494 }
1495 buffer[0] = self.entry_type as u8;
1496 buffer[1..5].copy_from_slice(&self.parent_page_no.to_be_bytes());
1497 Ok(())
1498 }
1499
1500 pub fn deserialize(buffer: &[u8]) -> Option<Self> {
1501 if buffer.len() < PTRMAP_ENTRY_SIZE {
1502 return None;
1503 }
1504 let entry_type_u8 = buffer[0];
1505 let parent_bytes_slice = buffer.get(1..5)?;
1506 let parent_page_no = u32::from_be_bytes(parent_bytes_slice.try_into().ok()?);
1507 PtrmapType::from_u8(entry_type_u8).map(|entry_type| PtrmapEntry {
1508 entry_type,
1509 parent_page_no,
1510 })
1511 }
1512 }
1513
1514 pub fn entries_per_ptrmap_page(page_size: usize) -> usize {
1517 assert!(page_size >= MIN_PAGE_SIZE as usize);
1518 page_size / PTRMAP_ENTRY_SIZE
1519 }
1520
1521 pub fn ptrmap_page_cycle_length(page_size: usize) -> usize {
1524 assert!(page_size >= MIN_PAGE_SIZE as usize);
1525 (page_size / PTRMAP_ENTRY_SIZE) + 1
1526 }
1527
1528 pub fn is_ptrmap_page(db_page_no: u32, page_size: usize) -> bool {
1530 if db_page_no == 1 {
1532 return false;
1533 }
1534 if db_page_no == FIRST_PTRMAP_PAGE_NO {
1535 return true;
1536 }
1537 return get_ptrmap_page_no_for_db_page(db_page_no, page_size) == db_page_no;
1538 }
1539
1540 pub fn get_ptrmap_page_no_for_db_page(db_page_no_to_query: u32, page_size: usize) -> u32 {
1543 let group_size = ptrmap_page_cycle_length(page_size) as u32;
1544 if group_size == 0 {
1545 panic!("Page size too small, a ptrmap page cannot map any db pages.");
1546 }
1547
1548 let effective_page_index = db_page_no_to_query - FIRST_PTRMAP_PAGE_NO;
1549 let group_idx = effective_page_index / group_size;
1550
1551 (group_idx * group_size) + FIRST_PTRMAP_PAGE_NO
1552 }
1553
1554 pub fn get_ptrmap_offset_in_page(
1557 db_page_no_to_query: u32,
1558 ptrmap_page_no: u32,
1559 page_size: usize,
1560 ) -> Result<usize> {
1561 let n_data_pages_per_group = entries_per_ptrmap_page(page_size);
1568 let first_data_page_mapped = ptrmap_page_no + 1;
1569 let last_data_page_mapped = ptrmap_page_no + n_data_pages_per_group as u32;
1570
1571 if db_page_no_to_query < first_data_page_mapped
1572 || db_page_no_to_query > last_data_page_mapped
1573 {
1574 return Err(LimboError::InternalError(format!(
1575 "Page {} is not mapped by the data page range [{}, {}] of ptrmap page {}",
1576 db_page_no_to_query, first_data_page_mapped, last_data_page_mapped, ptrmap_page_no
1577 )));
1578 }
1579 if is_ptrmap_page(db_page_no_to_query, page_size) {
1580 return Err(LimboError::InternalError(format!(
1581 "Page {} is a pointer map page and should not have an entry calculated this way.",
1582 db_page_no_to_query
1583 )));
1584 }
1585
1586 let entry_index_on_page = (db_page_no_to_query - first_data_page_mapped) as usize;
1587 Ok(entry_index_on_page * PTRMAP_ENTRY_SIZE)
1588 }
1589}
1590
1591#[cfg(test)]
1592mod tests {
1593 use std::sync::Arc;
1594
1595 use parking_lot::RwLock;
1596
1597 use crate::storage::page_cache::{DumbLruPageCache, PageCacheKey};
1598
1599 use super::Page;
1600
1601 #[test]
1602 fn test_shared_cache() {
1603 let cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
1605
1606 let thread = {
1607 let cache = cache.clone();
1608 std::thread::spawn(move || {
1609 let mut cache = cache.write();
1610 let page_key = PageCacheKey::new(1);
1611 cache.insert(page_key, Arc::new(Page::new(1))).unwrap();
1612 })
1613 };
1614 let _ = thread.join();
1615 let mut cache = cache.write();
1616 let page_key = PageCacheKey::new(1);
1617 let page = cache.get(&page_key);
1618 assert_eq!(page.unwrap().get().id, 1);
1619 }
1620}
1621
1622#[cfg(test)]
1623#[cfg(not(feature = "omit_autovacuum"))]
1624mod ptrmap_tests {
1625 use std::cell::RefCell;
1626 use std::rc::Rc;
1627 use std::sync::Arc;
1628
1629 use super::ptrmap::*;
1630 use super::*;
1631 use crate::fast_lock::SpinLock;
1632 use crate::io::{MemoryIO, OpenFlags, IO};
1633 use crate::storage::buffer_pool::BufferPool;
1634 use crate::storage::database::{DatabaseFile, DatabaseStorage};
1635 use crate::storage::page_cache::DumbLruPageCache;
1636 use crate::storage::pager::Pager;
1637 use crate::storage::sqlite3_ondisk::DatabaseHeader;
1638 use crate::storage::sqlite3_ondisk::MIN_PAGE_SIZE;
1639 use crate::storage::wal::{WalFile, WalFileShared};
1640
1641 fn test_pager_setup(page_size: u32, initial_db_pages: u32) -> Pager {
1643 let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
1644 let db_file_raw = io.open_file("test.db", OpenFlags::Create, true).unwrap();
1645 let db_storage: Arc<dyn DatabaseStorage> = Arc::new(DatabaseFile::new(db_file_raw));
1646
1647 let mut header_data = DatabaseHeader::default();
1649 header_data.update_page_size(page_size);
1650 let db_header_arc = Arc::new(SpinLock::new(header_data));
1651 db_header_arc.lock().vacuum_mode_largest_root_page = 1;
1652
1653 let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
1655 let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(
1656 (initial_db_pages + 10) as usize,
1657 )));
1658
1659 let wal = Rc::new(RefCell::new(WalFile::new(
1660 io.clone(),
1661 page_size,
1662 WalFileShared::open_shared(&io, "test.db-wal", page_size).unwrap(),
1663 buffer_pool.clone(),
1664 )));
1665
1666 let pager = Pager::finish_open(db_header_arc, db_storage, wal, io, page_cache, buffer_pool)
1667 .unwrap();
1668 pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
1669
1670 for _ in 0..initial_db_pages {
1672 match pager.btree_create(&CreateBTreeFlags::new_table()) {
1673 Ok(CursorResult::Ok(_root_page_id)) => (),
1674 Ok(CursorResult::IO) => {
1675 panic!("test_pager_setup: btree_create returned CursorResult::IO unexpectedly");
1676 }
1677 Err(e) => {
1678 panic!("test_pager_setup: btree_create failed: {:?}", e);
1679 }
1680 }
1681 }
1682
1683 return pager;
1684 }
1685
1686 #[test]
1687 fn test_ptrmap_page_allocation() {
1688 let page_size = 4096;
1689 let initial_db_pages = 10;
1690 let pager = test_pager_setup(page_size, initial_db_pages);
1691
1692 let db_page_to_update: u32 = 5;
1694 let expected_ptrmap_pg_no =
1695 get_ptrmap_page_no_for_db_page(db_page_to_update, page_size as usize);
1696 assert_eq!(expected_ptrmap_pg_no, FIRST_PTRMAP_PAGE_NO);
1697
1698 let ptrmap_page_ref = pager.read_page(expected_ptrmap_pg_no as usize);
1700 assert!(ptrmap_page_ref.is_ok());
1701
1702 assert_eq!(pager.db_header.lock().database_size, initial_db_pages + 2); let entry = pager.ptrmap_get(db_page_to_update).unwrap();
1707 assert!(matches!(entry, CursorResult::Ok(Some(_))));
1708 let CursorResult::Ok(Some(entry)) = entry else {
1709 panic!("entry is not Some");
1710 };
1711 assert_eq!(entry.entry_type, PtrmapType::RootPage);
1712 assert_eq!(entry.parent_page_no, 0);
1713 }
1714
1715 #[test]
1716 fn test_is_ptrmap_page_logic() {
1717 let page_size = MIN_PAGE_SIZE as usize;
1718 let n_data_pages = entries_per_ptrmap_page(page_size);
1719 assert_eq!(n_data_pages, 102); assert!(!is_ptrmap_page(1, page_size)); assert!(is_ptrmap_page(2, page_size)); assert!(!is_ptrmap_page(3, page_size)); assert!(!is_ptrmap_page(4, page_size)); assert!(!is_ptrmap_page(5, page_size)); assert!(is_ptrmap_page(105, page_size)); assert!(!is_ptrmap_page(106, page_size)); assert!(!is_ptrmap_page(107, page_size)); assert!(!is_ptrmap_page(108, page_size)); assert!(is_ptrmap_page(208, page_size)); }
1732
1733 #[test]
1734 fn test_get_ptrmap_page_no() {
1735 let page_size = MIN_PAGE_SIZE as usize; assert_eq!(get_ptrmap_page_no_for_db_page(3, page_size), 2); assert_eq!(get_ptrmap_page_no_for_db_page(4, page_size), 2); assert_eq!(get_ptrmap_page_no_for_db_page(5, page_size), 2); assert_eq!(get_ptrmap_page_no_for_db_page(104, page_size), 2); assert_eq!(get_ptrmap_page_no_for_db_page(105, page_size), 105); assert_eq!(get_ptrmap_page_no_for_db_page(106, page_size), 105); assert_eq!(get_ptrmap_page_no_for_db_page(107, page_size), 105); assert_eq!(get_ptrmap_page_no_for_db_page(108, page_size), 105); assert_eq!(get_ptrmap_page_no_for_db_page(208, page_size), 208); }
1752
1753 #[test]
1754 fn test_get_ptrmap_offset() {
1755 let page_size = MIN_PAGE_SIZE as usize; assert_eq!(get_ptrmap_offset_in_page(3, 2, page_size).unwrap(), 0);
1758 assert_eq!(
1759 get_ptrmap_offset_in_page(4, 2, page_size).unwrap(),
1760 1 * PTRMAP_ENTRY_SIZE
1761 );
1762 assert_eq!(
1763 get_ptrmap_offset_in_page(5, 2, page_size).unwrap(),
1764 2 * PTRMAP_ENTRY_SIZE
1765 );
1766
1767 assert_eq!(get_ptrmap_offset_in_page(106, 105, page_size).unwrap(), 0);
1772 assert_eq!(
1773 get_ptrmap_offset_in_page(107, 105, page_size).unwrap(),
1774 1 * PTRMAP_ENTRY_SIZE
1775 );
1776 assert_eq!(
1777 get_ptrmap_offset_in_page(108, 105, page_size).unwrap(),
1778 2 * PTRMAP_ENTRY_SIZE
1779 );
1780 }
1781}