1#![allow(clippy::arc_with_non_send_sync)]
2#![allow(clippy::not_unsafe_ptr_arg_deref)]
3
4use std::array;
5use std::cell::UnsafeCell;
6use std::collections::HashMap;
7use strum::EnumString;
8use tracing::{instrument, Level};
9
10use std::fmt::Formatter;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::{
13 cell::{Cell, RefCell},
14 fmt,
15 rc::Rc,
16 sync::Arc,
17};
18
19use crate::fast_lock::SpinLock;
20use crate::io::{File, SyncCompletion, IO};
21use crate::result::LimboResult;
22use crate::storage::sqlite3_ondisk::{
23 begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE,
24 WAL_HEADER_SIZE,
25};
26use crate::{Buffer, Result};
27use crate::{Completion, Page};
28
29use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
30
31use super::buffer_pool::BufferPool;
32use super::pager::{PageRef, Pager};
33use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};
34
/// Sentinel read mark for a read-lock slot that is not tracking any snapshot.
pub const READMARK_NOT_USED: u32 = u32::MAX;

/// `LimboRwLock` state: nobody holds the lock.
pub const NO_LOCK: u32 = 0;
/// `LimboRwLock` state: held in shared mode by `nreads` readers.
pub const SHARED_LOCK: u32 = 1;
/// `LimboRwLock` state: held exclusively by one writer.
pub const WRITE_LOCK: u32 = 2;
40
/// Statistics reported when a checkpoint finishes.
#[derive(Debug, Copy, Clone, Default)]
pub struct CheckpointResult {
    /// Number of frames in the shared WAL at the moment the checkpoint finished.
    pub num_wal_frames: u64,
    /// Highest frame number the checkpoint was allowed to copy into the database file.
    pub num_checkpointed_frames: u64,
}

impl CheckpointResult {
    /// Returns a result with every counter set to zero (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }
}
63
/// How aggressively a checkpoint may reset the WAL once every frame has been backfilled.
///
/// Parsed from strings case-insensitively (`strum` `EnumString` with `ascii_case_insensitive`).
#[derive(Debug, Copy, Clone, EnumString)]
#[strum(ascii_case_insensitive)]
pub enum CheckpointMode {
    /// Copy frames into the database file but never reset the WAL state.
    Passive,
    /// Like `Passive`, but when everything was backfilled also clear the frame cache and
    /// reset the shared frame counters so new frames start again at frame 1.
    Full,
    /// Like `Full`, and additionally rewrite the WAL header with fresh salts.
    Restart,
    /// Like `Restart`, but truncate the WAL file to zero bytes before rewriting the header.
    Truncate,
}
76
77#[derive(Debug, Default)]
78pub struct LimboRwLock {
79 lock: AtomicU32,
80 nreads: AtomicU32,
81 value: AtomicU32,
82}
83
84impl LimboRwLock {
85 pub fn new() -> Self {
86 Self {
87 lock: AtomicU32::new(NO_LOCK),
88 nreads: AtomicU32::new(0),
89 value: AtomicU32::new(READMARK_NOT_USED),
90 }
91 }
92
93 pub fn read(&mut self) -> bool {
95 let lock = self.lock.load(Ordering::SeqCst);
96 let ok = match lock {
97 NO_LOCK => {
98 let res = self.lock.compare_exchange(
99 lock,
100 SHARED_LOCK,
101 Ordering::SeqCst,
102 Ordering::SeqCst,
103 );
104 let ok = res.is_ok();
105 if ok {
106 self.nreads.fetch_add(1, Ordering::SeqCst);
107 }
108 ok
109 }
110 SHARED_LOCK => {
111 self.nreads.fetch_add(1, Ordering::SeqCst);
114 let lock_after_load = self.lock.load(Ordering::SeqCst);
115 if lock_after_load != lock {
116 let res = self.lock.compare_exchange(
118 lock_after_load,
119 SHARED_LOCK,
120 Ordering::SeqCst,
121 Ordering::SeqCst,
122 );
123 let ok = res.is_ok();
124 if ok {
125 true
127 } else {
128 self.nreads.fetch_sub(1, Ordering::SeqCst);
130 false
131 }
132 } else {
133 true
134 }
135 }
136 WRITE_LOCK => false,
137 _ => unreachable!(),
138 };
139 tracing::trace!("read_lock({})", ok);
140 ok
141 }
142
143 pub fn write(&mut self) -> bool {
145 let lock = self.lock.load(Ordering::SeqCst);
146 let ok = match lock {
147 NO_LOCK => {
148 let res = self.lock.compare_exchange(
149 lock,
150 WRITE_LOCK,
151 Ordering::SeqCst,
152 Ordering::SeqCst,
153 );
154 res.is_ok()
155 }
156 SHARED_LOCK => {
157 false
159 }
160 WRITE_LOCK => false,
161 _ => unreachable!(),
162 };
163 tracing::trace!("write_lock({})", ok);
164 ok
165 }
166
167 pub fn unlock(&mut self) {
169 let lock = self.lock.load(Ordering::SeqCst);
170 tracing::trace!("unlock(lock={})", lock);
171 match lock {
172 NO_LOCK => {}
173 SHARED_LOCK => {
174 let prev = self.nreads.fetch_sub(1, Ordering::SeqCst);
175 if prev == 1 {
176 let res = self.lock.compare_exchange(
177 lock,
178 NO_LOCK,
179 Ordering::SeqCst,
180 Ordering::SeqCst,
181 );
182 assert!(res.is_ok());
183 }
184 }
185 WRITE_LOCK => {
186 let res =
187 self.lock
188 .compare_exchange(lock, NO_LOCK, Ordering::SeqCst, Ordering::SeqCst);
189 assert!(res.is_ok());
190 }
191 _ => unreachable!(),
192 }
193 }
194}
195
/// Operations a pager needs from a write-ahead log.
pub trait Wal {
    /// Begins a read transaction by pinning a snapshot of the WAL.
    /// Returns `LimboResult::Busy` when no snapshot could be pinned.
    fn begin_read_tx(&mut self) -> Result<LimboResult>;

    /// Begins a write transaction by taking the WAL write lock.
    /// Returns `LimboResult::Busy` when the write lock is already held.
    fn begin_write_tx(&mut self) -> Result<LimboResult>;

    /// Releases the read lock taken by `begin_read_tx`.
    fn end_read_tx(&self) -> Result<LimboResult>;

    /// Releases the write lock taken by `begin_write_tx`.
    fn end_write_tx(&self) -> Result<LimboResult>;

    /// Returns the newest frame holding `page_id` that is visible to the current snapshot,
    /// or `None` if the page must be read from the database file.
    fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;

    /// Starts reading the page image stored in frame `frame_id` into `page`.
    fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()>;

    /// Starts reading frame `frame_id` and copies `frame_len` bytes of its page image into the
    /// raw buffer `frame` when the read completes. Returns the completion for that read.
    /// NOTE(review): the caller presumably must keep `frame` valid until the completion fires.
    fn read_frame_raw(
        &self,
        frame_id: u64,
        buffer_pool: Rc<BufferPool>,
        frame: *mut u8,
        frame_len: u32,
    ) -> Result<Arc<Completion>>;

    /// Appends `page` as a new frame after the current end of the WAL.
    /// `db_size` is passed to the frame writer (presumably the commit-frame database size —
    /// confirm against `begin_write_wal_frame`); `write_counter` tracks pending writes.
    fn append_frame(
        &mut self,
        page: PageRef,
        db_size: u32,
        write_counter: Rc<RefCell<usize>>,
    ) -> Result<()>;

    /// Whether the WAL has grown enough that a checkpoint should run.
    fn should_checkpoint(&self) -> bool;
    /// Drives a resumable checkpoint; returns `CheckpointStatus::IO` while IO is pending and
    /// `CheckpointStatus::Done` once finished.
    fn checkpoint(
        &mut self,
        pager: &Pager,
        write_counter: Rc<RefCell<usize>>,
        mode: CheckpointMode,
    ) -> Result<CheckpointStatus>;
    /// Drives an fsync of the WAL file; returns `WalFsyncStatus::IO` until it completes.
    fn sync(&mut self) -> Result<WalFsyncStatus>;
    /// Last frame in the shared WAL, regardless of this connection's snapshot.
    fn get_max_frame_in_wal(&self) -> u64;
    /// Last frame visible to this connection's snapshot.
    fn get_max_frame(&self) -> u64;
    /// First frame not yet known to be backfilled, as seen by this connection.
    fn get_min_frame(&self) -> u64;

    /// Discards frames appended by the current write transaction.
    fn rollback(&self);

    /// Returns `(max frame in the shared WAL, running checksum after that frame)`.
    fn current_frame_state(&self) -> (u64, (u32, u32));

    /// Forgets every frame after `target_frame` and restores the running checksum to
    /// `target_checksum`.
    fn rollback_to_frame(&self, target_frame: u64, target_checksum: (u32, u32)) -> Result<()>;

    /// Overrides the last frame visible to this connection's snapshot.
    fn set_reader_max_frame(&mut self, frame: u64);
}
270
271pub struct DummyWAL;
276
277impl Wal for DummyWAL {
278 fn begin_read_tx(&mut self) -> Result<LimboResult> {
279 Ok(LimboResult::Ok)
280 }
281
282 fn end_read_tx(&self) -> Result<LimboResult> {
283 Ok(LimboResult::Ok)
284 }
285
286 fn begin_write_tx(&mut self) -> Result<LimboResult> {
287 Ok(LimboResult::Ok)
288 }
289
290 fn end_write_tx(&self) -> Result<LimboResult> {
291 Ok(LimboResult::Ok)
292 }
293
294 fn find_frame(&self, _page_id: u64) -> Result<Option<u64>> {
295 Ok(None)
296 }
297
298 fn read_frame(
299 &self,
300 _frame_id: u64,
301 _page: crate::PageRef,
302 _buffer_pool: Rc<BufferPool>,
303 ) -> Result<()> {
304 Ok(())
305 }
306
307 fn read_frame_raw(
308 &self,
309 _frame_id: u64,
310 _buffer_pool: Rc<BufferPool>,
311 _frame: *mut u8,
312 _frame_len: u32,
313 ) -> Result<Arc<Completion>> {
314 todo!();
315 }
316
317 fn append_frame(
318 &mut self,
319 _page: crate::PageRef,
320 _db_size: u32,
321 _write_counter: Rc<RefCell<usize>>,
322 ) -> Result<()> {
323 Ok(())
324 }
325
326 fn should_checkpoint(&self) -> bool {
327 false
328 }
329
330 fn checkpoint(
331 &mut self,
332 _pager: &Pager,
333 _write_counter: Rc<RefCell<usize>>,
334 _mode: crate::CheckpointMode,
335 ) -> Result<crate::CheckpointStatus> {
336 Ok(crate::CheckpointStatus::Done(
337 crate::CheckpointResult::default(),
338 ))
339 }
340
341 fn sync(&mut self) -> Result<crate::storage::wal::WalFsyncStatus> {
342 Ok(crate::storage::wal::WalFsyncStatus::Done)
343 }
344
345 fn get_max_frame_in_wal(&self) -> u64 {
346 0
347 }
348
349 fn get_max_frame(&self) -> u64 {
350 0
351 }
352
353 fn get_min_frame(&self) -> u64 {
354 0
355 }
356
357 fn rollback(&self) {
358 }
360
361 fn current_frame_state(&self) -> (u64, (u32, u32)) {
362 (0, (0, 0))
363 }
364
365 fn rollback_to_frame(&self, _target_frame: u64, _target_checksum: (u32, u32)) -> Result<()> {
366 Ok(())
367 }
368
369 fn set_reader_max_frame(&mut self, _frame: u64) {
370 }
372}
373
/// Progress of the fsync driven by `WalFile::sync`.
#[derive(Copy, Clone, Debug)]
enum SyncState {
    /// No fsync in flight; the next `sync` call submits one.
    NotSyncing,
    /// An fsync has been submitted; `sync` reports completion once its callback has run.
    Syncing,
}

/// Steps of the resumable checkpoint state machine driven by `WalFile::checkpoint`.
#[derive(Debug, Copy, Clone)]
pub enum CheckpointState {
    /// Compute the range of frames that is safe to backfill.
    Start,
    /// Pick the next page and start reading its newest frame inside that range.
    ReadFrame,
    /// Wait until the scratch page is no longer locked by the pending read.
    WaitReadFrame,
    /// Start writing the page image into the database file.
    WritePage,
    /// Wait until the write counter drops back to zero.
    WaitWritePage,
    /// Publish the result and, depending on the mode, reset the WAL.
    Done,
}

/// Result of `Wal::sync`.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum WalFsyncStatus {
    /// The fsync has completed.
    Done,
    /// The fsync is still in flight; call `sync` again later.
    IO,
}

/// Result of `Wal::checkpoint`.
#[derive(Debug, Copy, Clone)]
pub enum CheckpointStatus {
    /// The checkpoint finished with these statistics.
    Done(CheckpointResult),
    /// IO is still pending; call `checkpoint` again to resume.
    IO,
}
403
404struct OngoingCheckpoint {
413 page: PageRef,
414 state: CheckpointState,
415 min_frame: u64,
416 max_frame: u64,
417 current_page: u64,
418}
419
420impl fmt::Debug for OngoingCheckpoint {
421 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
422 f.debug_struct("OngoingCheckpoint")
423 .field("state", &self.state)
424 .field("min_frame", &self.min_frame)
425 .field("max_frame", &self.max_frame)
426 .field("current_page", &self.current_page)
427 .finish()
428 }
429}
430
431#[allow(dead_code)]
432pub struct WalFile {
433 io: Arc<dyn IO>,
434 buffer_pool: Rc<BufferPool>,
435
436 syncing: Rc<Cell<bool>>,
437 sync_state: Cell<SyncState>,
438 page_size: u32,
439
440 shared: Arc<UnsafeCell<WalFileShared>>,
441 ongoing_checkpoint: OngoingCheckpoint,
442 checkpoint_threshold: usize,
443 max_frame_read_lock_index: usize,
447 max_frame: u64,
449 min_frame: u64,
451 txn_start_max_frame: Cell<u64>,
454 txn_start_last_checksum: Cell<(u32, u32)>,
458}
459
460impl fmt::Debug for WalFile {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 f.debug_struct("WalFile")
463 .field("syncing", &self.syncing.get())
464 .field("sync_state", &self.sync_state)
465 .field("page_size", &self.page_size)
466 .field("shared", &self.shared)
467 .field("ongoing_checkpoint", &self.ongoing_checkpoint)
468 .field("checkpoint_threshold", &self.checkpoint_threshold)
469 .field("max_frame_read_lock_index", &self.max_frame_read_lock_index)
470 .field("max_frame", &self.max_frame)
471 .field("min_frame", &self.min_frame)
472 .field("txn_start_max_frame", &self.txn_start_max_frame)
473 .field("txn_start_last_checksum", &self.txn_start_last_checksum)
474 .finish()
476 }
477}
478
479#[allow(dead_code)]
483pub struct WalFileShared {
484 pub wal_header: Arc<SpinLock<WalHeader>>,
485 pub min_frame: AtomicU64,
486 pub max_frame: AtomicU64,
487 pub nbackfills: AtomicU64,
488 pub frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
495 pub pages_in_frames: Arc<SpinLock<Vec<u64>>>,
497 pub last_checksum: (u32, u32), pub file: Arc<dyn File>,
499 pub read_locks: [LimboRwLock; 5],
504 pub write_lock: LimboRwLock,
507 pub loaded: AtomicBool,
508}
509
510impl fmt::Debug for WalFileShared {
511 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
512 f.debug_struct("WalFileShared")
513 .field("wal_header", &self.wal_header)
514 .field("min_frame", &self.min_frame)
515 .field("max_frame", &self.max_frame)
516 .field("nbackfills", &self.nbackfills)
517 .field("frame_cache", &self.frame_cache)
518 .field("pages_in_frames", &self.pages_in_frames)
519 .field("last_checksum", &self.last_checksum)
520 .finish()
522 }
523}
524
525impl Wal for WalFile {
526 fn begin_read_tx(&mut self) -> Result<LimboResult> {
528 let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst);
529
530 let mut max_read_mark = 0;
531 let mut max_read_mark_index = -1;
532 for (index, lock) in self.get_shared().read_locks.iter().enumerate() {
535 let this_mark = lock.value.load(Ordering::SeqCst);
536 if this_mark > max_read_mark && this_mark <= max_frame_in_wal as u32 {
537 max_read_mark = this_mark;
538 max_read_mark_index = index as i64;
539 }
540 }
541
542 if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 {
544 for (index, lock) in self.get_shared().read_locks.iter_mut().enumerate() {
545 let busy = !lock.write();
546 if !busy {
547 lock.value.store(max_frame_in_wal as u32, Ordering::SeqCst);
549 max_read_mark = max_frame_in_wal as u32;
550 max_read_mark_index = index as i64;
551 lock.unlock();
552 break;
553 }
554 }
555 }
556
557 if max_read_mark_index == -1 {
558 return Ok(LimboResult::Busy);
559 }
560
561 let shared = self.get_shared();
562 {
563 let lock = &mut shared.read_locks[max_read_mark_index as usize];
564 let busy = !lock.read();
565 if busy {
566 return Ok(LimboResult::Busy);
567 }
568 }
569 self.min_frame = shared.nbackfills.load(Ordering::SeqCst) + 1;
570 self.max_frame_read_lock_index = max_read_mark_index as usize;
571 self.max_frame = max_read_mark as u64;
572 tracing::debug!(
573 "begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})",
574 self.min_frame,
575 self.max_frame,
576 self.max_frame_read_lock_index,
577 max_frame_in_wal
578 );
579 Ok(LimboResult::Ok)
580 }
581
582 #[inline(always)]
584 fn end_read_tx(&self) -> Result<LimboResult> {
585 tracing::debug!("end_read_tx");
586 let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index];
587 read_lock.unlock();
588 Ok(LimboResult::Ok)
589 }
590
591 fn begin_write_tx(&mut self) -> Result<LimboResult> {
593 let busy = !self.get_shared().write_lock.write();
594 tracing::debug!("begin_write_transaction(busy={})", busy);
595 if busy {
596 return Ok(LimboResult::Busy);
597 }
598 let shared = self.get_shared();
601 self.txn_start_max_frame
602 .set(shared.max_frame.load(Ordering::SeqCst));
603 self.txn_start_last_checksum.set(shared.last_checksum);
604 Ok(LimboResult::Ok)
605 }
606
607 fn end_write_tx(&self) -> Result<LimboResult> {
609 tracing::debug!("end_write_txn");
610 self.get_shared().write_lock.unlock();
611 Ok(LimboResult::Ok)
612 }
613
614 fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
616 let shared = self.get_shared();
617 let frames = shared.frame_cache.lock();
618 let frames = frames.get(&page_id);
619 if frames.is_none() {
620 return Ok(None);
621 }
622 let frames = frames.expect("frames must be Some after is_none() check");
623 for frame in frames.iter().rev() {
624 if *frame <= self.max_frame {
625 return Ok(Some(*frame));
626 }
627 }
628 Ok(None)
629 }
630
631 fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()> {
633 tracing::debug!("read_frame({})", frame_id);
634 let offset = self.frame_offset(frame_id);
635 page.set_locked();
636 let frame = page.clone();
637 let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
638 let frame = frame.clone();
639 finish_read_page(page.get().id, buf, frame)
640 .expect("finish_read_page failed in WAL read completion");
641 });
642 begin_read_wal_frame(
643 &self.get_shared().file,
644 offset + WAL_FRAME_HEADER_SIZE,
645 buffer_pool,
646 complete,
647 )?;
648 Ok(())
649 }
650
651 fn read_frame_raw(
652 &self,
653 frame_id: u64,
654 buffer_pool: Rc<BufferPool>,
655 frame: *mut u8,
656 frame_len: u32,
657 ) -> Result<Arc<Completion>> {
658 tracing::debug!("read_frame({})", frame_id);
659 let offset = self.frame_offset(frame_id);
660 let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
661 let buf = buf.borrow();
662 let buf_ptr = buf.as_ptr();
663 unsafe {
664 std::ptr::copy_nonoverlapping(buf_ptr, frame, frame_len as usize);
665 }
666 });
667 let c = begin_read_wal_frame(
668 &self.get_shared().file,
669 offset + WAL_FRAME_HEADER_SIZE,
670 buffer_pool,
671 complete,
672 )?;
673 Ok(c)
674 }
675
676 fn append_frame(
678 &mut self,
679 page: PageRef,
680 db_size: u32,
681 write_counter: Rc<RefCell<usize>>,
682 ) -> Result<()> {
683 let page_id = page.get().id;
684 let shared = self.get_shared();
685 let max_frame = shared.max_frame.load(Ordering::SeqCst);
686 let frame_id = if max_frame == 0 { 1 } else { max_frame + 1 };
687 let offset = self.frame_offset(frame_id);
688 tracing::debug!(
689 "append_frame(frame={}, offset={}, page_id={})",
690 frame_id,
691 offset,
692 page_id
693 );
694 let header = shared.wal_header.clone();
695 let header = header.lock();
696 let checksums = shared.last_checksum;
697 let checksums = begin_write_wal_frame(
698 &shared.file,
699 offset,
700 &page,
701 self.page_size as u16,
702 db_size,
703 write_counter,
704 &header,
705 checksums,
706 )?;
707 shared.last_checksum = checksums;
708 shared.max_frame.store(frame_id, Ordering::SeqCst);
709 {
710 let mut frame_cache = shared.frame_cache.lock();
711 let frames = frame_cache.get_mut(&(page_id as u64));
712 match frames {
713 Some(frames) => frames.push(frame_id),
714 None => {
715 frame_cache.insert(page_id as u64, vec![frame_id]);
716 shared.pages_in_frames.lock().push(page_id as u64);
717 }
718 }
719 }
720 Ok(())
721 }
722
723 fn should_checkpoint(&self) -> bool {
724 let shared = self.get_shared();
725 let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
726 frame_id >= self.checkpoint_threshold
727 }
728
729 #[instrument(skip_all, level = Level::TRACE)]
730 fn checkpoint(
731 &mut self,
732 pager: &Pager,
733 write_counter: Rc<RefCell<usize>>,
734 mode: CheckpointMode,
735 ) -> Result<CheckpointStatus> {
736 'checkpoint_loop: loop {
737 let state = self.ongoing_checkpoint.state;
738 tracing::debug!(?state);
739 match state {
740 CheckpointState::Start => {
741 self.ongoing_checkpoint.min_frame = self.min_frame;
743 let shared = self.get_shared();
744 let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
745 for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() {
746 let this_mark = read_lock.value.load(Ordering::SeqCst);
747 if this_mark < max_safe_frame as u32 {
748 let busy = !read_lock.write();
749 if !busy {
750 let new_mark = if read_lock_idx == 0 {
751 max_safe_frame as u32
752 } else {
753 READMARK_NOT_USED
754 };
755 read_lock.value.store(new_mark, Ordering::SeqCst);
756 read_lock.unlock();
757 } else {
758 max_safe_frame = this_mark as u64;
759 }
760 }
761 }
762 self.ongoing_checkpoint.max_frame = max_safe_frame;
763 self.ongoing_checkpoint.current_page = 0;
764 self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
765 tracing::trace!(
766 "checkpoint_start(min_frame={}, max_frame={})",
767 self.ongoing_checkpoint.max_frame,
768 self.ongoing_checkpoint.min_frame
769 );
770 }
771 CheckpointState::ReadFrame => {
772 let shared = self.get_shared();
773 let min_frame = self.ongoing_checkpoint.min_frame;
774 let max_frame = self.ongoing_checkpoint.max_frame;
775 let pages_in_frames = shared.pages_in_frames.clone();
776 let pages_in_frames = pages_in_frames.lock();
777
778 let frame_cache = shared.frame_cache.clone();
779 let frame_cache = frame_cache.lock();
780 assert!(self.ongoing_checkpoint.current_page as usize <= pages_in_frames.len());
781 if self.ongoing_checkpoint.current_page as usize == pages_in_frames.len() {
782 self.ongoing_checkpoint.state = CheckpointState::Done;
783 continue 'checkpoint_loop;
784 }
785 let page = pages_in_frames[self.ongoing_checkpoint.current_page as usize];
786 let frames = frame_cache
787 .get(&page)
788 .expect("page must be in frame cache if it's in list");
789
790 for frame in frames.iter().rev() {
791 if *frame >= min_frame && *frame <= max_frame {
792 tracing::debug!(
793 "checkpoint page(state={:?}, page={}, frame={})",
794 state,
795 page,
796 *frame
797 );
798 self.ongoing_checkpoint.page.get().id = page as usize;
799
800 self.read_frame(
801 *frame,
802 self.ongoing_checkpoint.page.clone(),
803 self.buffer_pool.clone(),
804 )?;
805 self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
806 continue 'checkpoint_loop;
807 }
808 }
809 self.ongoing_checkpoint.current_page += 1;
810 }
811 CheckpointState::WaitReadFrame => {
812 if self.ongoing_checkpoint.page.is_locked() {
813 return Ok(CheckpointStatus::IO);
814 } else {
815 self.ongoing_checkpoint.state = CheckpointState::WritePage;
816 }
817 }
818 CheckpointState::WritePage => {
819 self.ongoing_checkpoint.page.set_dirty();
820 begin_write_btree_page(
821 pager,
822 &self.ongoing_checkpoint.page,
823 write_counter.clone(),
824 )?;
825 self.ongoing_checkpoint.state = CheckpointState::WaitWritePage;
826 }
827 CheckpointState::WaitWritePage => {
828 if *write_counter.borrow() > 0 {
829 return Ok(CheckpointStatus::IO);
830 }
831 let shared = self.get_shared();
832 if (self.ongoing_checkpoint.current_page as usize)
833 < shared.pages_in_frames.lock().len()
834 {
835 self.ongoing_checkpoint.current_page += 1;
836 self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
837 } else {
838 self.ongoing_checkpoint.state = CheckpointState::Done;
839 }
840 }
841 CheckpointState::Done => {
842 if *write_counter.borrow() > 0 {
843 return Ok(CheckpointStatus::IO);
844 }
845 let shared = self.get_shared();
846
847 let checkpoint_result = CheckpointResult {
850 num_wal_frames: shared.max_frame.load(Ordering::SeqCst),
851 num_checkpointed_frames: self.ongoing_checkpoint.max_frame,
852 };
853 let everything_backfilled = shared.max_frame.load(Ordering::SeqCst)
854 == self.ongoing_checkpoint.max_frame;
855 if everything_backfilled {
856 if !matches!(mode, CheckpointMode::Passive) {
857 shared.frame_cache.lock().clear();
860 shared.pages_in_frames.lock().clear();
861 shared.max_frame.store(0, Ordering::SeqCst);
862 shared.nbackfills.store(0, Ordering::SeqCst);
863 if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) {
868 self.reset_wal_file(matches!(mode, CheckpointMode::Truncate))?;
869 }
870 }
871 } else {
872 shared
873 .nbackfills
874 .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
875 }
876 self.ongoing_checkpoint.state = CheckpointState::Start;
877 return Ok(CheckpointStatus::Done(checkpoint_result));
878 }
879 }
880 }
881 }
882
883 #[instrument(skip_all, level = Level::DEBUG)]
884 fn sync(&mut self) -> Result<WalFsyncStatus> {
885 match self.sync_state.get() {
886 SyncState::NotSyncing => {
887 tracing::debug!("wal_sync");
888 let syncing = self.syncing.clone();
889 self.syncing.set(true);
890 let completion = Completion::Sync(SyncCompletion {
891 complete: Box::new(move |_| {
892 tracing::debug!("wal_sync finish");
893 syncing.set(false);
894 }),
895 is_completed: Cell::new(false),
896 });
897 let shared = self.get_shared();
898 shared.file.sync(Arc::new(completion))?;
899 self.sync_state.set(SyncState::Syncing);
900 Ok(WalFsyncStatus::IO)
901 }
902 SyncState::Syncing => {
903 if self.syncing.get() {
904 tracing::debug!("wal_sync is already syncing");
905 Ok(WalFsyncStatus::IO)
906 } else {
907 self.sync_state.set(SyncState::NotSyncing);
908 Ok(WalFsyncStatus::Done)
909 }
910 }
911 }
912 }
913
914 fn get_max_frame_in_wal(&self) -> u64 {
915 self.get_shared().max_frame.load(Ordering::SeqCst)
916 }
917
918 fn get_max_frame(&self) -> u64 {
919 self.max_frame
920 }
921
922 fn get_min_frame(&self) -> u64 {
923 self.min_frame
924 }
925
926 fn rollback(&self) {
932 let start_frame = self.txn_start_max_frame.get();
933 let start_checksum = self.txn_start_last_checksum.get();
934 self.rollback_to_frame(start_frame, start_checksum)
935 .expect("wal rollback_to_frame failed unexpectedly");
936
937 tracing::debug!(
938 "wal_rollback(start_frame={}, restored_max_frame={})",
939 start_frame,
940 self.get_shared().max_frame.load(Ordering::SeqCst),
941 );
942 }
943
944 fn current_frame_state(&self) -> (u64, (u32, u32)) {
945 let shared = self.get_shared();
946 let max_frame = shared.max_frame.load(Ordering::SeqCst);
947 let checksum = shared.last_checksum;
948 (max_frame, checksum)
949 }
950
951 fn rollback_to_frame(&self, target_frame: u64, target_checksum: (u32, u32)) -> Result<()> {
952 let shared = self.get_shared();
953 {
954 let mut frame_cache = shared.frame_cache.lock();
955 let mut pages_in_frames = shared.pages_in_frames.lock();
956 frame_cache.retain(|_page_id, frames| {
958 frames.retain(|&frame| frame <= target_frame);
959 !frames.is_empty()
960 });
961 pages_in_frames.retain(|page_id| frame_cache.contains_key(page_id));
963 }
964 shared.max_frame.store(target_frame, Ordering::SeqCst);
966 shared.last_checksum = target_checksum;
967 tracing::debug!(
968 "wal_rollback_to_frame(target_frame={}, restored_max_frame={})",
969 target_frame,
970 shared.max_frame.load(Ordering::SeqCst),
971 );
972 Ok(())
973 }
974
975 fn set_reader_max_frame(&mut self, frame: u64) {
976 self.max_frame = frame;
977 }
978}
979
980impl WalFile {
981 pub fn new(
982 io: Arc<dyn IO>,
983 page_size: u32,
984 shared: Arc<UnsafeCell<WalFileShared>>,
985 buffer_pool: Rc<BufferPool>,
986 ) -> Self {
987 let checkpoint_page = Arc::new(Page::new(0));
988 let buffer = buffer_pool.get();
989 {
990 let buffer_pool = buffer_pool.clone();
991 let drop_fn = Rc::new(move |buf| {
992 buffer_pool.put(buf);
993 });
994 checkpoint_page.get().contents = Some(PageContent::new(
995 0,
996 Arc::new(RefCell::new(Buffer::new(buffer, drop_fn))),
997 ));
998 }
999 Self {
1000 io,
1001 shared,
1002 ongoing_checkpoint: OngoingCheckpoint {
1003 page: checkpoint_page,
1004 state: CheckpointState::Start,
1005 min_frame: 0,
1006 max_frame: 0,
1007 current_page: 0,
1008 },
1009 checkpoint_threshold: 1000,
1010 page_size,
1011 buffer_pool,
1012 syncing: Rc::new(Cell::new(false)),
1013 sync_state: Cell::new(SyncState::NotSyncing),
1014 max_frame: 0,
1015 min_frame: 0,
1016 max_frame_read_lock_index: 0,
1017 txn_start_max_frame: Cell::new(0),
1018 txn_start_last_checksum: Cell::new((0, 0)),
1019 }
1020 }
1021
1022 fn frame_offset(&self, frame_id: u64) -> usize {
1023 assert!(frame_id > 0, "Frame ID must be 1-based");
1024 let page_size = self.page_size;
1025 let page_offset = (frame_id - 1) * (page_size + WAL_FRAME_HEADER_SIZE as u32) as u64;
1026 let offset = WAL_HEADER_SIZE as u64 + page_offset;
1027 offset as usize
1028 }
1029
1030 #[allow(clippy::mut_from_ref)]
1031 fn get_shared(&self) -> &mut WalFileShared {
1032 unsafe {
1033 self.shared
1034 .get()
1035 .as_mut()
1036 .expect("WalFileShared pointer must be non-null")
1037 }
1038 }
1039
1040 fn reset_wal_file(&self, truncate: bool) -> Result<()> {
1050 let shared = self.get_shared();
1051 {
1054 let mut header = shared.wal_header.lock();
1055 header.salt_1 = self.io.generate_random_number() as u32;
1056 header.salt_2 = self.io.generate_random_number() as u32;
1057 let native = cfg!(target_endian = "big");
1058 let checksums = checksum_wal(
1059 &header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4],
1060 &header,
1061 (0, 0),
1062 native,
1063 );
1064 header.checksum_1 = checksums.0;
1065 header.checksum_2 = checksums.1;
1066 shared.last_checksum = checksums;
1067 }
1068 if truncate {
1069 let truncated = Rc::new(Cell::new(false));
1071 let truncated_cb = truncated.clone();
1072 let completion = Completion::Sync(SyncCompletion {
1073 complete: Box::new(move |_| {
1074 truncated_cb.set(true);
1075 }),
1076 is_completed: Cell::new(false),
1077 });
1078 shared.file.truncate(0, Arc::new(completion))?;
1079 let mut attempts = 0;
1080 while !truncated.get() {
1081 self.io.run_once()?;
1082 attempts += 1;
1083 if attempts >= 1000 {
1084 return Err(crate::error::LimboError::InternalError(
1085 "failed to truncate WAL file".to_string(),
1086 ));
1087 }
1088 }
1089 }
1090 {
1095 let header = shared.wal_header.lock();
1096 sqlite3_ondisk::begin_write_wal_header(&shared.file, &header)?;
1097 }
1098 Ok(())
1099 }
1100}
1101
1102impl WalFileShared {
1103 pub fn open_shared(
1104 io: &Arc<dyn IO>,
1105 path: &str,
1106 page_size: u32,
1107 ) -> Result<Arc<UnsafeCell<WalFileShared>>> {
1108 Self::open_shared_inner(io, path, page_size, false)
1109 }
1110
1111 pub fn open_shared_inner(
1123 io: &Arc<dyn IO>,
1124 path: &str,
1125 page_size: u32,
1126 db_freshly_created: bool,
1127 ) -> Result<Arc<UnsafeCell<WalFileShared>>> {
1128 let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
1129 let orphaned_wal = db_freshly_created && file.size()? > 0;
1130 if orphaned_wal {
1131 tracing::warn!(
1132 "discarding orphaned WAL at {:?}: main database file was freshly created, \
1133 so this WAL belongs to a previous database incarnation and will not be replayed",
1134 path
1135 );
1136 }
1137 let header = if !orphaned_wal && file.size()? > 0 {
1145 let (wal_file_shared, parse_error) = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
1146 let mut max_loops = 100_000;
1147 while !unsafe { &*wal_file_shared.get() }
1148 .loaded
1149 .load(Ordering::SeqCst)
1150 {
1151 io.run_once()?;
1152 max_loops -= 1;
1153 if max_loops == 0 {
1154 return Err(crate::error::LimboError::InternalError(
1155 "WAL file not loaded after 100000 IO iterations".to_string(),
1156 ));
1157 }
1158 }
1159 if let Some(err) = parse_error.lock().take() {
1161 return Err(err);
1162 }
1163 return Ok(wal_file_shared);
1164 } else {
1165 let magic = if cfg!(target_endian = "big") {
1166 WAL_MAGIC_BE
1167 } else {
1168 WAL_MAGIC_LE
1169 };
1170 let mut wal_header = WalHeader {
1171 magic,
1172 file_format: 3007000,
1173 page_size,
1174 checkpoint_seq: 0, salt_1: io.generate_random_number() as u32,
1176 salt_2: io.generate_random_number() as u32,
1177 checksum_1: 0,
1178 checksum_2: 0,
1179 };
1180 let native = cfg!(target_endian = "big"); let checksums = (0, 0);
1185 let checksums = checksum_wal(
1186 &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], &wal_header,
1188 checksums,
1189 native, );
1191 wal_header.checksum_1 = checksums.0;
1192 wal_header.checksum_2 = checksums.1;
1193 sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
1194 Arc::new(SpinLock::new(wal_header))
1195 };
1196 let checksum = {
1197 let checksum = header.lock();
1198 (checksum.checksum_1, checksum.checksum_2)
1199 };
1200 let shared = WalFileShared {
1201 wal_header: header,
1202 min_frame: AtomicU64::new(0),
1203 max_frame: AtomicU64::new(0),
1204 nbackfills: AtomicU64::new(0),
1205 frame_cache: Arc::new(SpinLock::new(HashMap::new())),
1206 last_checksum: checksum,
1207 file,
1208 pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
1209 read_locks: array::from_fn(|_| LimboRwLock {
1210 lock: AtomicU32::new(NO_LOCK),
1211 nreads: AtomicU32::new(0),
1212 value: AtomicU32::new(READMARK_NOT_USED),
1213 }),
1214 write_lock: LimboRwLock {
1215 lock: AtomicU32::new(NO_LOCK),
1216 nreads: AtomicU32::new(0),
1217 value: AtomicU32::new(READMARK_NOT_USED),
1218 },
1219 loaded: AtomicBool::new(true),
1220 };
1221 Ok(Arc::new(UnsafeCell::new(shared)))
1222 }
1223}