Skip to main content

limbo_core/storage/
wal.rs

1#![allow(clippy::arc_with_non_send_sync)]
2#![allow(clippy::not_unsafe_ptr_arg_deref)]
3
4use std::array;
5use std::cell::UnsafeCell;
6use std::collections::HashMap;
7use strum::EnumString;
8use tracing::{instrument, Level};
9
10use std::fmt::Formatter;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::{
13    cell::{Cell, RefCell},
14    fmt,
15    rc::Rc,
16    sync::Arc,
17};
18
19use crate::fast_lock::SpinLock;
20use crate::io::{File, SyncCompletion, IO};
21use crate::result::LimboResult;
22use crate::storage::sqlite3_ondisk::{
23    begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE,
24    WAL_HEADER_SIZE,
25};
26use crate::{Buffer, Result};
27use crate::{Completion, Page};
28
29use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
30
31use super::buffer_pool::BufferPool;
32use super::pager::{PageRef, Pager};
33use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};
34
/// Read-mark value stored in a read-lock slot that is not currently in use.
pub const READMARK_NOT_USED: u32 = u32::MAX;

/// Lock mode: nobody holds the lock.
pub const NO_LOCK: u32 = 0;
/// Lock mode: held by one or more readers.
pub const SHARED_LOCK: u32 = 1;
/// Lock mode: held exclusively by a single writer.
pub const WRITE_LOCK: u32 = 2;
40
/// Frame counts reported after a checkpoint (SQLite's `pnLog` / `pnCkpt` outputs).
#[derive(Debug, Copy, Clone, Default)]
pub struct CheckpointResult {
    /// Number of frames in the WAL.
    pub num_wal_frames: u64,
    /// Number of frames successfully copied from the WAL into the database file.
    pub num_checkpointed_frames: u64,
}

impl CheckpointResult {
    /// Creates a result with both counters set to zero.
    pub fn new() -> Self {
        Self::default()
    }
}
63
64#[derive(Debug, Copy, Clone, EnumString)]
65#[strum(ascii_case_insensitive)]
66pub enum CheckpointMode {
67    /// Checkpoint as many frames as possible without waiting for any database readers or writers to finish, then sync the database file if all frames in the log were checkpointed.
68    Passive,
69    /// This mode blocks until there is no database writer and all readers are reading from the most recent database snapshot. It then checkpoints all frames in the log file and syncs the database file. This mode blocks new database writers while it is pending, but new database readers are allowed to continue unimpeded.
70    Full,
71    /// This mode works the same way as `Full` with the addition that after checkpointing the log file it blocks (calls the busy-handler callback) until all readers are reading from the database file only. This ensures that the next writer will restart the log file from the beginning. Like `Full`, this mode blocks new database writer attempts while it is pending, but does not impede readers.
72    Restart,
73    /// This mode works the same way as `Restart` with the addition that it also truncates the log file to zero bytes just prior to a successful return.
74    Truncate,
75}
76
77#[derive(Debug, Default)]
78pub struct LimboRwLock {
79    lock: AtomicU32,
80    nreads: AtomicU32,
81    value: AtomicU32,
82}
83
84impl LimboRwLock {
85    pub fn new() -> Self {
86        Self {
87            lock: AtomicU32::new(NO_LOCK),
88            nreads: AtomicU32::new(0),
89            value: AtomicU32::new(READMARK_NOT_USED),
90        }
91    }
92
93    /// Shared lock. Returns true if it was successful, false if it couldn't lock it
94    pub fn read(&mut self) -> bool {
95        let lock = self.lock.load(Ordering::SeqCst);
96        let ok = match lock {
97            NO_LOCK => {
98                let res = self.lock.compare_exchange(
99                    lock,
100                    SHARED_LOCK,
101                    Ordering::SeqCst,
102                    Ordering::SeqCst,
103                );
104                let ok = res.is_ok();
105                if ok {
106                    self.nreads.fetch_add(1, Ordering::SeqCst);
107                }
108                ok
109            }
110            SHARED_LOCK => {
111                // There is this race condition where we could've unlocked after loading lock ==
112                // SHARED_LOCK.
113                self.nreads.fetch_add(1, Ordering::SeqCst);
114                let lock_after_load = self.lock.load(Ordering::SeqCst);
115                if lock_after_load != lock {
116                    // try to lock it again
117                    let res = self.lock.compare_exchange(
118                        lock_after_load,
119                        SHARED_LOCK,
120                        Ordering::SeqCst,
121                        Ordering::SeqCst,
122                    );
123                    let ok = res.is_ok();
124                    if ok {
125                        // we were able to acquire it back
126                        true
127                    } else {
128                        // we couldn't acquire it back, reduce number again
129                        self.nreads.fetch_sub(1, Ordering::SeqCst);
130                        false
131                    }
132                } else {
133                    true
134                }
135            }
136            WRITE_LOCK => false,
137            _ => unreachable!(),
138        };
139        tracing::trace!("read_lock({})", ok);
140        ok
141    }
142
143    /// Locks exclusively. Returns true if it was successful, false if it couldn't lock it
144    pub fn write(&mut self) -> bool {
145        let lock = self.lock.load(Ordering::SeqCst);
146        let ok = match lock {
147            NO_LOCK => {
148                let res = self.lock.compare_exchange(
149                    lock,
150                    WRITE_LOCK,
151                    Ordering::SeqCst,
152                    Ordering::SeqCst,
153                );
154                res.is_ok()
155            }
156            SHARED_LOCK => {
157                // no op
158                false
159            }
160            WRITE_LOCK => false,
161            _ => unreachable!(),
162        };
163        tracing::trace!("write_lock({})", ok);
164        ok
165    }
166
167    /// Unlock the current held lock.
168    pub fn unlock(&mut self) {
169        let lock = self.lock.load(Ordering::SeqCst);
170        tracing::trace!("unlock(lock={})", lock);
171        match lock {
172            NO_LOCK => {}
173            SHARED_LOCK => {
174                let prev = self.nreads.fetch_sub(1, Ordering::SeqCst);
175                if prev == 1 {
176                    let res = self.lock.compare_exchange(
177                        lock,
178                        NO_LOCK,
179                        Ordering::SeqCst,
180                        Ordering::SeqCst,
181                    );
182                    assert!(res.is_ok());
183                }
184            }
185            WRITE_LOCK => {
186                let res =
187                    self.lock
188                        .compare_exchange(lock, NO_LOCK, Ordering::SeqCst, Ordering::SeqCst);
189                assert!(res.is_ok());
190            }
191            _ => unreachable!(),
192        }
193    }
194}
195
196/// Write-ahead log (WAL).
197pub trait Wal {
198    /// Begin a read transaction.
199    fn begin_read_tx(&mut self) -> Result<LimboResult>;
200
201    /// Begin a write transaction.
202    fn begin_write_tx(&mut self) -> Result<LimboResult>;
203
204    /// End a read transaction.
205    fn end_read_tx(&self) -> Result<LimboResult>;
206
207    /// End a write transaction.
208    fn end_write_tx(&self) -> Result<LimboResult>;
209
210    /// Find the latest frame containing a page.
211    fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;
212
213    /// Read a frame from the WAL.
214    fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()>;
215
216    /// Read a frame from the WAL.
217    fn read_frame_raw(
218        &self,
219        frame_id: u64,
220        buffer_pool: Rc<BufferPool>,
221        frame: *mut u8,
222        frame_len: u32,
223    ) -> Result<Arc<Completion>>;
224
225    /// Write a frame to the WAL.
226    fn append_frame(
227        &mut self,
228        page: PageRef,
229        db_size: u32,
230        write_counter: Rc<RefCell<usize>>,
231    ) -> Result<()>;
232
233    fn should_checkpoint(&self) -> bool;
234    fn checkpoint(
235        &mut self,
236        pager: &Pager,
237        write_counter: Rc<RefCell<usize>>,
238        mode: CheckpointMode,
239    ) -> Result<CheckpointStatus>;
240    fn sync(&mut self) -> Result<WalFsyncStatus>;
241    fn get_max_frame_in_wal(&self) -> u64;
242    fn get_max_frame(&self) -> u64;
243    fn get_min_frame(&self) -> u64;
244
245    /// Roll back the current write transaction.
246    ///
247    /// Restores `max_frame` and `last_checksum` to the values they held at the
248    /// start of the write transaction, and removes any frame-cache entries that
249    /// were written during that transaction.
250    fn rollback(&self);
251
252    /// Returns (current max_frame, current last_checksum) for savepoint capture.
253    fn current_frame_state(&self) -> (u64, (u32, u32));
254
255    /// Roll back WAL to a specific target frame (for ROLLBACK TO SAVEPOINT).
256    ///
257    /// Prunes frame_cache and pages_in_frames for all frames > target_frame,
258    /// then restores max_frame and last_checksum to the savepoint snapshot.
259    fn rollback_to_frame(&self, target_frame: u64, target_checksum: (u32, u32)) -> Result<()>;
260
261    /// Update the reader-visible max frame boundary.
262    ///
263    /// `WalFile::max_frame` controls which frames `find_frame` will return.
264    /// At `SAVEPOINT` open time we eagerly flush dirty pages to WAL and must
265    /// raise this boundary so that post-rollback reads see the newly written
266    /// frames.  At `ROLLBACK TO` time we restore it to the savepoint snapshot
267    /// so reads see exactly the savepoint state.
268    fn set_reader_max_frame(&mut self, frame: u64);
269}
270
271/// A dummy WAL implementation that does nothing.
272/// This is used for ephemeral indexes where a WAL is not really
273/// needed, and is preferable to passing an Option<dyn Wal> around
274/// everywhere.
275pub struct DummyWAL;
276
277impl Wal for DummyWAL {
278    fn begin_read_tx(&mut self) -> Result<LimboResult> {
279        Ok(LimboResult::Ok)
280    }
281
282    fn end_read_tx(&self) -> Result<LimboResult> {
283        Ok(LimboResult::Ok)
284    }
285
286    fn begin_write_tx(&mut self) -> Result<LimboResult> {
287        Ok(LimboResult::Ok)
288    }
289
290    fn end_write_tx(&self) -> Result<LimboResult> {
291        Ok(LimboResult::Ok)
292    }
293
294    fn find_frame(&self, _page_id: u64) -> Result<Option<u64>> {
295        Ok(None)
296    }
297
298    fn read_frame(
299        &self,
300        _frame_id: u64,
301        _page: crate::PageRef,
302        _buffer_pool: Rc<BufferPool>,
303    ) -> Result<()> {
304        Ok(())
305    }
306
307    fn read_frame_raw(
308        &self,
309        _frame_id: u64,
310        _buffer_pool: Rc<BufferPool>,
311        _frame: *mut u8,
312        _frame_len: u32,
313    ) -> Result<Arc<Completion>> {
314        todo!();
315    }
316
317    fn append_frame(
318        &mut self,
319        _page: crate::PageRef,
320        _db_size: u32,
321        _write_counter: Rc<RefCell<usize>>,
322    ) -> Result<()> {
323        Ok(())
324    }
325
326    fn should_checkpoint(&self) -> bool {
327        false
328    }
329
330    fn checkpoint(
331        &mut self,
332        _pager: &Pager,
333        _write_counter: Rc<RefCell<usize>>,
334        _mode: crate::CheckpointMode,
335    ) -> Result<crate::CheckpointStatus> {
336        Ok(crate::CheckpointStatus::Done(
337            crate::CheckpointResult::default(),
338        ))
339    }
340
341    fn sync(&mut self) -> Result<crate::storage::wal::WalFsyncStatus> {
342        Ok(crate::storage::wal::WalFsyncStatus::Done)
343    }
344
345    fn get_max_frame_in_wal(&self) -> u64 {
346        0
347    }
348
349    fn get_max_frame(&self) -> u64 {
350        0
351    }
352
353    fn get_min_frame(&self) -> u64 {
354        0
355    }
356
357    fn rollback(&self) {
358        // DummyWAL has no persistent state to roll back.
359    }
360
361    fn current_frame_state(&self) -> (u64, (u32, u32)) {
362        (0, (0, 0))
363    }
364
365    fn rollback_to_frame(&self, _target_frame: u64, _target_checksum: (u32, u32)) -> Result<()> {
366        Ok(())
367    }
368
369    fn set_reader_max_frame(&mut self, _frame: u64) {
370        // DummyWAL has no reader max frame state.
371    }
372}
373
/// Progress of a WAL sync.
///
/// Syncing needs a state machine: a sync is scheduled and then has to be waited on until it
/// finishes. Not waiting leads to undefined behaviour that nobody wants to debug.
#[derive(Copy, Clone, Debug)]
enum SyncState {
    /// No sync is in flight.
    NotSyncing,
    /// A sync has been scheduled and has not finished yet.
    Syncing,
}
381
/// Steps of the checkpoint state machine driven by `WalFile::checkpoint`.
#[derive(Debug, Copy, Clone)]
pub enum CheckpointState {
    /// Work out which frame range can be checkpointed safely.
    Start,
    /// Pick the next page with a frame in range and schedule reading that frame.
    ReadFrame,
    /// Wait for the scheduled frame read to complete.
    WaitReadFrame,
    /// Schedule writing the page that was read into the database file.
    WritePage,
    /// Wait for the scheduled page write to complete.
    WaitWritePage,
    /// All pages handled; report the result.
    Done,
}
391
/// Result of `Wal::sync`.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum WalFsyncStatus {
    /// The sync has completed.
    Done,
    /// The sync is still in progress; call again later.
    IO,
}
397
398#[derive(Debug, Copy, Clone)]
399pub enum CheckpointStatus {
400    Done(CheckpointResult),
401    IO,
402}
403
404// Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save
405// in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do
406// page operations like reading a frame to a page, and writing a page to disk. This page should not
407// be placed back in pager page cache or anything, it's just a helper.
408// min_frame and max_frame is the range of frames that can be safely transferred from WAL to db
409// file.
410// current_page is a helper to iterate through all the pages that might have a frame in the safe
411// range. This is inefficient for now.
412struct OngoingCheckpoint {
413    page: PageRef,
414    state: CheckpointState,
415    min_frame: u64,
416    max_frame: u64,
417    current_page: u64,
418}
419
420impl fmt::Debug for OngoingCheckpoint {
421    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
422        f.debug_struct("OngoingCheckpoint")
423            .field("state", &self.state)
424            .field("min_frame", &self.min_frame)
425            .field("max_frame", &self.max_frame)
426            .field("current_page", &self.current_page)
427            .finish()
428    }
429}
430
431#[allow(dead_code)]
432pub struct WalFile {
433    io: Arc<dyn IO>,
434    buffer_pool: Rc<BufferPool>,
435
436    syncing: Rc<Cell<bool>>,
437    sync_state: Cell<SyncState>,
438    page_size: u32,
439
440    shared: Arc<UnsafeCell<WalFileShared>>,
441    ongoing_checkpoint: OngoingCheckpoint,
442    checkpoint_threshold: usize,
443    // min and max frames for this connection
444    /// This is the index to the read_lock in WalFileShared that we are holding. This lock contains
445    /// the max frame for this connection.
446    max_frame_read_lock_index: usize,
447    /// Max frame allowed to lookup range=(minframe..max_frame)
448    max_frame: u64,
449    /// Start of range to look for frames range=(minframe..max_frame)
450    min_frame: u64,
451    /// Snapshot of shared `max_frame` taken at the start of the current write
452    /// transaction.  Used by `rollback()` to undo any appended frames.
453    txn_start_max_frame: Cell<u64>,
454    /// Snapshot of `shared.last_checksum` taken at the start of the current
455    /// write transaction.  Used by `rollback()` to restore the cumulative
456    /// checksum.
457    txn_start_last_checksum: Cell<(u32, u32)>,
458}
459
460impl fmt::Debug for WalFile {
461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462        f.debug_struct("WalFile")
463            .field("syncing", &self.syncing.get())
464            .field("sync_state", &self.sync_state)
465            .field("page_size", &self.page_size)
466            .field("shared", &self.shared)
467            .field("ongoing_checkpoint", &self.ongoing_checkpoint)
468            .field("checkpoint_threshold", &self.checkpoint_threshold)
469            .field("max_frame_read_lock_index", &self.max_frame_read_lock_index)
470            .field("max_frame", &self.max_frame)
471            .field("min_frame", &self.min_frame)
472            .field("txn_start_max_frame", &self.txn_start_max_frame)
473            .field("txn_start_last_checksum", &self.txn_start_last_checksum)
474            // Excluding io, buffer_pool
475            .finish()
476    }
477}
478
479// TODO(pere): lock only important parts + pin WalFileShared
480/// WalFileShared is the part of a WAL that will be shared between threads. A wal has information
481/// that needs to be communicated between threads so this struct does the job.
482#[allow(dead_code)]
483pub struct WalFileShared {
484    pub wal_header: Arc<SpinLock<WalHeader>>,
485    pub min_frame: AtomicU64,
486    pub max_frame: AtomicU64,
487    pub nbackfills: AtomicU64,
488    // Frame cache maps a Page to all the frames it has stored in WAL in ascending order.
489    // This is to easily find the frame it must checkpoint each connection if a checkpoint is
490    // necessary.
491    // One difference between SQLite and limbo is that we will never support multi process, meaning
492    // we don't need WAL's index file. So we can do stuff like this without shared memory.
493    // TODO: this will need refactoring because this is incredible memory inefficient.
494    pub frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
495    // Another memory inefficient array made to just keep track of pages that are in frame_cache.
496    pub pages_in_frames: Arc<SpinLock<Vec<u64>>>,
497    pub last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
498    pub file: Arc<dyn File>,
499    /// read_locks is a list of read locks that can coexist with the max_frame number stored in
500    /// value. There is a limited amount because and unbounded amount of connections could be
501    /// fatal. Therefore, for now we copy how SQLite behaves with limited amounts of read max
502    /// frames that is equal to 5
503    pub read_locks: [LimboRwLock; 5],
504    /// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only
505    /// one used.
506    pub write_lock: LimboRwLock,
507    pub loaded: AtomicBool,
508}
509
510impl fmt::Debug for WalFileShared {
511    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
512        f.debug_struct("WalFileShared")
513            .field("wal_header", &self.wal_header)
514            .field("min_frame", &self.min_frame)
515            .field("max_frame", &self.max_frame)
516            .field("nbackfills", &self.nbackfills)
517            .field("frame_cache", &self.frame_cache)
518            .field("pages_in_frames", &self.pages_in_frames)
519            .field("last_checksum", &self.last_checksum)
520            // Excluding `file`, `read_locks`, and `write_lock`
521            .finish()
522    }
523}
524
525impl Wal for WalFile {
526    /// Begin a read transaction.
527    fn begin_read_tx(&mut self) -> Result<LimboResult> {
528        let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst);
529
530        let mut max_read_mark = 0;
531        let mut max_read_mark_index = -1;
532        // Find the largest mark we can find, ignore frames that are impossible to be in range and
533        // that are not set
534        for (index, lock) in self.get_shared().read_locks.iter().enumerate() {
535            let this_mark = lock.value.load(Ordering::SeqCst);
536            if this_mark > max_read_mark && this_mark <= max_frame_in_wal as u32 {
537                max_read_mark = this_mark;
538                max_read_mark_index = index as i64;
539            }
540        }
541
542        // If we didn't find any mark or we can update, let's update them
543        if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 {
544            for (index, lock) in self.get_shared().read_locks.iter_mut().enumerate() {
545                let busy = !lock.write();
546                if !busy {
547                    // If this was busy then it must mean >1 threads tried to set this read lock
548                    lock.value.store(max_frame_in_wal as u32, Ordering::SeqCst);
549                    max_read_mark = max_frame_in_wal as u32;
550                    max_read_mark_index = index as i64;
551                    lock.unlock();
552                    break;
553                }
554            }
555        }
556
557        if max_read_mark_index == -1 {
558            return Ok(LimboResult::Busy);
559        }
560
561        let shared = self.get_shared();
562        {
563            let lock = &mut shared.read_locks[max_read_mark_index as usize];
564            let busy = !lock.read();
565            if busy {
566                return Ok(LimboResult::Busy);
567            }
568        }
569        self.min_frame = shared.nbackfills.load(Ordering::SeqCst) + 1;
570        self.max_frame_read_lock_index = max_read_mark_index as usize;
571        self.max_frame = max_read_mark as u64;
572        tracing::debug!(
573            "begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})",
574            self.min_frame,
575            self.max_frame,
576            self.max_frame_read_lock_index,
577            max_frame_in_wal
578        );
579        Ok(LimboResult::Ok)
580    }
581
582    /// End a read transaction.
583    #[inline(always)]
584    fn end_read_tx(&self) -> Result<LimboResult> {
585        tracing::debug!("end_read_tx");
586        let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index];
587        read_lock.unlock();
588        Ok(LimboResult::Ok)
589    }
590
591    /// Begin a write transaction
592    fn begin_write_tx(&mut self) -> Result<LimboResult> {
593        let busy = !self.get_shared().write_lock.write();
594        tracing::debug!("begin_write_transaction(busy={})", busy);
595        if busy {
596            return Ok(LimboResult::Busy);
597        }
598        // Record the WAL state at the start of this write transaction so that
599        // rollback() can restore it precisely.
600        let shared = self.get_shared();
601        self.txn_start_max_frame
602            .set(shared.max_frame.load(Ordering::SeqCst));
603        self.txn_start_last_checksum.set(shared.last_checksum);
604        Ok(LimboResult::Ok)
605    }
606
607    /// End a write transaction
608    fn end_write_tx(&self) -> Result<LimboResult> {
609        tracing::debug!("end_write_txn");
610        self.get_shared().write_lock.unlock();
611        Ok(LimboResult::Ok)
612    }
613
614    /// Find the latest frame containing a page.
615    fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
616        let shared = self.get_shared();
617        let frames = shared.frame_cache.lock();
618        let frames = frames.get(&page_id);
619        if frames.is_none() {
620            return Ok(None);
621        }
622        let frames = frames.expect("frames must be Some after is_none() check");
623        for frame in frames.iter().rev() {
624            if *frame <= self.max_frame {
625                return Ok(Some(*frame));
626            }
627        }
628        Ok(None)
629    }
630
631    /// Read a frame from the WAL.
632    fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()> {
633        tracing::debug!("read_frame({})", frame_id);
634        let offset = self.frame_offset(frame_id);
635        page.set_locked();
636        let frame = page.clone();
637        let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
638            let frame = frame.clone();
639            finish_read_page(page.get().id, buf, frame)
640                .expect("finish_read_page failed in WAL read completion");
641        });
642        begin_read_wal_frame(
643            &self.get_shared().file,
644            offset + WAL_FRAME_HEADER_SIZE,
645            buffer_pool,
646            complete,
647        )?;
648        Ok(())
649    }
650
651    fn read_frame_raw(
652        &self,
653        frame_id: u64,
654        buffer_pool: Rc<BufferPool>,
655        frame: *mut u8,
656        frame_len: u32,
657    ) -> Result<Arc<Completion>> {
658        tracing::debug!("read_frame({})", frame_id);
659        let offset = self.frame_offset(frame_id);
660        let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
661            let buf = buf.borrow();
662            let buf_ptr = buf.as_ptr();
663            unsafe {
664                std::ptr::copy_nonoverlapping(buf_ptr, frame, frame_len as usize);
665            }
666        });
667        let c = begin_read_wal_frame(
668            &self.get_shared().file,
669            offset + WAL_FRAME_HEADER_SIZE,
670            buffer_pool,
671            complete,
672        )?;
673        Ok(c)
674    }
675
676    /// Write a frame to the WAL.
677    fn append_frame(
678        &mut self,
679        page: PageRef,
680        db_size: u32,
681        write_counter: Rc<RefCell<usize>>,
682    ) -> Result<()> {
683        let page_id = page.get().id;
684        let shared = self.get_shared();
685        let max_frame = shared.max_frame.load(Ordering::SeqCst);
686        let frame_id = if max_frame == 0 { 1 } else { max_frame + 1 };
687        let offset = self.frame_offset(frame_id);
688        tracing::debug!(
689            "append_frame(frame={}, offset={}, page_id={})",
690            frame_id,
691            offset,
692            page_id
693        );
694        let header = shared.wal_header.clone();
695        let header = header.lock();
696        let checksums = shared.last_checksum;
697        let checksums = begin_write_wal_frame(
698            &shared.file,
699            offset,
700            &page,
701            self.page_size as u16,
702            db_size,
703            write_counter,
704            &header,
705            checksums,
706        )?;
707        shared.last_checksum = checksums;
708        shared.max_frame.store(frame_id, Ordering::SeqCst);
709        {
710            let mut frame_cache = shared.frame_cache.lock();
711            let frames = frame_cache.get_mut(&(page_id as u64));
712            match frames {
713                Some(frames) => frames.push(frame_id),
714                None => {
715                    frame_cache.insert(page_id as u64, vec![frame_id]);
716                    shared.pages_in_frames.lock().push(page_id as u64);
717                }
718            }
719        }
720        Ok(())
721    }
722
723    fn should_checkpoint(&self) -> bool {
724        let shared = self.get_shared();
725        let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
726        frame_id >= self.checkpoint_threshold
727    }
728
729    #[instrument(skip_all, level = Level::TRACE)]
730    fn checkpoint(
731        &mut self,
732        pager: &Pager,
733        write_counter: Rc<RefCell<usize>>,
734        mode: CheckpointMode,
735    ) -> Result<CheckpointStatus> {
736        'checkpoint_loop: loop {
737            let state = self.ongoing_checkpoint.state;
738            tracing::debug!(?state);
739            match state {
740                CheckpointState::Start => {
741                    // TODO(pere): check what frames are safe to checkpoint between many readers!
742                    self.ongoing_checkpoint.min_frame = self.min_frame;
743                    let shared = self.get_shared();
744                    let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
745                    for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() {
746                        let this_mark = read_lock.value.load(Ordering::SeqCst);
747                        if this_mark < max_safe_frame as u32 {
748                            let busy = !read_lock.write();
749                            if !busy {
750                                let new_mark = if read_lock_idx == 0 {
751                                    max_safe_frame as u32
752                                } else {
753                                    READMARK_NOT_USED
754                                };
755                                read_lock.value.store(new_mark, Ordering::SeqCst);
756                                read_lock.unlock();
757                            } else {
758                                max_safe_frame = this_mark as u64;
759                            }
760                        }
761                    }
762                    self.ongoing_checkpoint.max_frame = max_safe_frame;
763                    self.ongoing_checkpoint.current_page = 0;
764                    self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
765                    tracing::trace!(
766                        "checkpoint_start(min_frame={}, max_frame={})",
767                        self.ongoing_checkpoint.max_frame,
768                        self.ongoing_checkpoint.min_frame
769                    );
770                }
771                CheckpointState::ReadFrame => {
772                    let shared = self.get_shared();
773                    let min_frame = self.ongoing_checkpoint.min_frame;
774                    let max_frame = self.ongoing_checkpoint.max_frame;
775                    let pages_in_frames = shared.pages_in_frames.clone();
776                    let pages_in_frames = pages_in_frames.lock();
777
778                    let frame_cache = shared.frame_cache.clone();
779                    let frame_cache = frame_cache.lock();
780                    assert!(self.ongoing_checkpoint.current_page as usize <= pages_in_frames.len());
781                    if self.ongoing_checkpoint.current_page as usize == pages_in_frames.len() {
782                        self.ongoing_checkpoint.state = CheckpointState::Done;
783                        continue 'checkpoint_loop;
784                    }
785                    let page = pages_in_frames[self.ongoing_checkpoint.current_page as usize];
786                    let frames = frame_cache
787                        .get(&page)
788                        .expect("page must be in frame cache if it's in list");
789
790                    for frame in frames.iter().rev() {
791                        if *frame >= min_frame && *frame <= max_frame {
792                            tracing::debug!(
793                                "checkpoint page(state={:?}, page={}, frame={})",
794                                state,
795                                page,
796                                *frame
797                            );
798                            self.ongoing_checkpoint.page.get().id = page as usize;
799
800                            self.read_frame(
801                                *frame,
802                                self.ongoing_checkpoint.page.clone(),
803                                self.buffer_pool.clone(),
804                            )?;
805                            self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
806                            continue 'checkpoint_loop;
807                        }
808                    }
809                    self.ongoing_checkpoint.current_page += 1;
810                }
811                CheckpointState::WaitReadFrame => {
812                    if self.ongoing_checkpoint.page.is_locked() {
813                        return Ok(CheckpointStatus::IO);
814                    } else {
815                        self.ongoing_checkpoint.state = CheckpointState::WritePage;
816                    }
817                }
818                CheckpointState::WritePage => {
819                    self.ongoing_checkpoint.page.set_dirty();
820                    begin_write_btree_page(
821                        pager,
822                        &self.ongoing_checkpoint.page,
823                        write_counter.clone(),
824                    )?;
825                    self.ongoing_checkpoint.state = CheckpointState::WaitWritePage;
826                }
827                CheckpointState::WaitWritePage => {
828                    if *write_counter.borrow() > 0 {
829                        return Ok(CheckpointStatus::IO);
830                    }
831                    let shared = self.get_shared();
832                    if (self.ongoing_checkpoint.current_page as usize)
833                        < shared.pages_in_frames.lock().len()
834                    {
835                        self.ongoing_checkpoint.current_page += 1;
836                        self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
837                    } else {
838                        self.ongoing_checkpoint.state = CheckpointState::Done;
839                    }
840                }
841                CheckpointState::Done => {
842                    if *write_counter.borrow() > 0 {
843                        return Ok(CheckpointStatus::IO);
844                    }
845                    let shared = self.get_shared();
846
847                    // Record two num pages fields to return as checkpoint result to caller.
848                    // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
849                    let checkpoint_result = CheckpointResult {
850                        num_wal_frames: shared.max_frame.load(Ordering::SeqCst),
851                        num_checkpointed_frames: self.ongoing_checkpoint.max_frame,
852                    };
853                    let everything_backfilled = shared.max_frame.load(Ordering::SeqCst)
854                        == self.ongoing_checkpoint.max_frame;
855                    if everything_backfilled {
856                        if !matches!(mode, CheckpointMode::Passive) {
857                            // Everything was backfilled into the db file, so it is
858                            // safe to reset the WAL bookkeeping.
859                            shared.frame_cache.lock().clear();
860                            shared.pages_in_frames.lock().clear();
861                            shared.max_frame.store(0, Ordering::SeqCst);
862                            shared.nbackfills.store(0, Ordering::SeqCst);
863                            // For Restart/Truncate, physically reset the on-disk WAL:
864                            // rewrite a fresh header (new salt) so leftover frames are
865                            // rejected by future recovery, and for Truncate shrink the
866                            // file to zero bytes first.
867                            if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) {
868                                self.reset_wal_file(matches!(mode, CheckpointMode::Truncate))?;
869                            }
870                        }
871                    } else {
872                        shared
873                            .nbackfills
874                            .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
875                    }
876                    self.ongoing_checkpoint.state = CheckpointState::Start;
877                    return Ok(CheckpointStatus::Done(checkpoint_result));
878                }
879            }
880        }
881    }
882
883    #[instrument(skip_all, level = Level::DEBUG)]
884    fn sync(&mut self) -> Result<WalFsyncStatus> {
885        match self.sync_state.get() {
886            SyncState::NotSyncing => {
887                tracing::debug!("wal_sync");
888                let syncing = self.syncing.clone();
889                self.syncing.set(true);
890                let completion = Completion::Sync(SyncCompletion {
891                    complete: Box::new(move |_| {
892                        tracing::debug!("wal_sync finish");
893                        syncing.set(false);
894                    }),
895                    is_completed: Cell::new(false),
896                });
897                let shared = self.get_shared();
898                shared.file.sync(Arc::new(completion))?;
899                self.sync_state.set(SyncState::Syncing);
900                Ok(WalFsyncStatus::IO)
901            }
902            SyncState::Syncing => {
903                if self.syncing.get() {
904                    tracing::debug!("wal_sync is already syncing");
905                    Ok(WalFsyncStatus::IO)
906                } else {
907                    self.sync_state.set(SyncState::NotSyncing);
908                    Ok(WalFsyncStatus::Done)
909                }
910            }
911        }
912    }
913
914    fn get_max_frame_in_wal(&self) -> u64 {
915        self.get_shared().max_frame.load(Ordering::SeqCst)
916    }
917
918    fn get_max_frame(&self) -> u64 {
919        self.max_frame
920    }
921
922    fn get_min_frame(&self) -> u64 {
923        self.min_frame
924    }
925
926    /// Roll back the current write transaction.
927    ///
928    /// Restores `shared.max_frame` and `shared.last_checksum` to the snapshot
929    /// captured in `begin_write_tx`, then removes from the frame cache every
930    /// frame that was appended during this transaction.
931    fn rollback(&self) {
932        let start_frame = self.txn_start_max_frame.get();
933        let start_checksum = self.txn_start_last_checksum.get();
934        self.rollback_to_frame(start_frame, start_checksum)
935            .expect("wal rollback_to_frame failed unexpectedly");
936
937        tracing::debug!(
938            "wal_rollback(start_frame={}, restored_max_frame={})",
939            start_frame,
940            self.get_shared().max_frame.load(Ordering::SeqCst),
941        );
942    }
943
944    fn current_frame_state(&self) -> (u64, (u32, u32)) {
945        let shared = self.get_shared();
946        let max_frame = shared.max_frame.load(Ordering::SeqCst);
947        let checksum = shared.last_checksum;
948        (max_frame, checksum)
949    }
950
951    fn rollback_to_frame(&self, target_frame: u64, target_checksum: (u32, u32)) -> Result<()> {
952        let shared = self.get_shared();
953        {
954            let mut frame_cache = shared.frame_cache.lock();
955            let mut pages_in_frames = shared.pages_in_frames.lock();
956            // Remove frames > target_frame from each page's list; drop empty entries.
957            frame_cache.retain(|_page_id, frames| {
958                frames.retain(|&frame| frame <= target_frame);
959                !frames.is_empty()
960            });
961            // Rebuild pages_in_frames to match the pruned frame_cache.
962            pages_in_frames.retain(|page_id| frame_cache.contains_key(page_id));
963        }
964        // Restore watermark and cumulative checksum.
965        shared.max_frame.store(target_frame, Ordering::SeqCst);
966        shared.last_checksum = target_checksum;
967        tracing::debug!(
968            "wal_rollback_to_frame(target_frame={}, restored_max_frame={})",
969            target_frame,
970            shared.max_frame.load(Ordering::SeqCst),
971        );
972        Ok(())
973    }
974
975    fn set_reader_max_frame(&mut self, frame: u64) {
976        self.max_frame = frame;
977    }
978}
979
980impl WalFile {
981    pub fn new(
982        io: Arc<dyn IO>,
983        page_size: u32,
984        shared: Arc<UnsafeCell<WalFileShared>>,
985        buffer_pool: Rc<BufferPool>,
986    ) -> Self {
987        let checkpoint_page = Arc::new(Page::new(0));
988        let buffer = buffer_pool.get();
989        {
990            let buffer_pool = buffer_pool.clone();
991            let drop_fn = Rc::new(move |buf| {
992                buffer_pool.put(buf);
993            });
994            checkpoint_page.get().contents = Some(PageContent::new(
995                0,
996                Arc::new(RefCell::new(Buffer::new(buffer, drop_fn))),
997            ));
998        }
999        Self {
1000            io,
1001            shared,
1002            ongoing_checkpoint: OngoingCheckpoint {
1003                page: checkpoint_page,
1004                state: CheckpointState::Start,
1005                min_frame: 0,
1006                max_frame: 0,
1007                current_page: 0,
1008            },
1009            checkpoint_threshold: 1000,
1010            page_size,
1011            buffer_pool,
1012            syncing: Rc::new(Cell::new(false)),
1013            sync_state: Cell::new(SyncState::NotSyncing),
1014            max_frame: 0,
1015            min_frame: 0,
1016            max_frame_read_lock_index: 0,
1017            txn_start_max_frame: Cell::new(0),
1018            txn_start_last_checksum: Cell::new((0, 0)),
1019        }
1020    }
1021
1022    fn frame_offset(&self, frame_id: u64) -> usize {
1023        assert!(frame_id > 0, "Frame ID must be 1-based");
1024        let page_size = self.page_size;
1025        let page_offset = (frame_id - 1) * (page_size + WAL_FRAME_HEADER_SIZE as u32) as u64;
1026        let offset = WAL_HEADER_SIZE as u64 + page_offset;
1027        offset as usize
1028    }
1029
1030    #[allow(clippy::mut_from_ref)]
1031    fn get_shared(&self) -> &mut WalFileShared {
1032        unsafe {
1033            self.shared
1034                .get()
1035                .as_mut()
1036                .expect("WalFileShared pointer must be non-null")
1037        }
1038    }
1039
1040    /// Reset the on-disk WAL after a Restart/Truncate checkpoint.
1041    ///
1042    /// Rewrites the 32-byte WAL header with a freshly generated salt (so any
1043    /// leftover frames still on disk are rejected by a future recovery via the
1044    /// salt-mismatch check) and resets the in-memory cumulative checksum. When
1045    /// `truncate` is true, the WAL file is also physically shrunk to zero bytes
1046    /// and a fresh empty 32-byte header is rewritten, so the on-disk `-wal`
1047    /// carries no frames (max_frame 0) and a byte-level reader sees a
1048    /// self-contained database file.
1049    fn reset_wal_file(&self, truncate: bool) -> Result<()> {
1050        let shared = self.get_shared();
1051        // Generate a new salt so any leftover frames become unrecoverable, and
1052        // recompute the header checksum over the first 24 bytes.
1053        {
1054            let mut header = shared.wal_header.lock();
1055            header.salt_1 = self.io.generate_random_number() as u32;
1056            header.salt_2 = self.io.generate_random_number() as u32;
1057            let native = cfg!(target_endian = "big");
1058            let checksums = checksum_wal(
1059                &header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4],
1060                &header,
1061                (0, 0),
1062                native,
1063            );
1064            header.checksum_1 = checksums.0;
1065            header.checksum_2 = checksums.1;
1066            shared.last_checksum = checksums;
1067        }
1068        if truncate {
1069            // Physically shrink the WAL file to 0 bytes, awaiting completion.
1070            let truncated = Rc::new(Cell::new(false));
1071            let truncated_cb = truncated.clone();
1072            let completion = Completion::Sync(SyncCompletion {
1073                complete: Box::new(move |_| {
1074                    truncated_cb.set(true);
1075                }),
1076                is_completed: Cell::new(false),
1077            });
1078            shared.file.truncate(0, Arc::new(completion))?;
1079            let mut attempts = 0;
1080            while !truncated.get() {
1081                self.io.run_once()?;
1082                attempts += 1;
1083                if attempts >= 1000 {
1084                    return Err(crate::error::LimboError::InternalError(
1085                        "failed to truncate WAL file".to_string(),
1086                    ));
1087                }
1088            }
1089        }
1090        // Rewrite a valid, empty 32-byte header (fresh salt already set above) so
1091        // subsequent in-process appends start after a well-formed header. After a
1092        // physical truncate this makes the file exactly 32 bytes (zero frames);
1093        // for Restart it rewrites the header in place.
1094        {
1095            let header = shared.wal_header.lock();
1096            sqlite3_ondisk::begin_write_wal_header(&shared.file, &header)?;
1097        }
1098        Ok(())
1099    }
1100}
1101
1102impl WalFileShared {
1103    pub fn open_shared(
1104        io: &Arc<dyn IO>,
1105        path: &str,
1106        page_size: u32,
1107    ) -> Result<Arc<UnsafeCell<WalFileShared>>> {
1108        Self::open_shared_inner(io, path, page_size, false)
1109    }
1110
1111    /// Open the shared WAL state for `path`.
1112    ///
1113    /// When `db_freshly_created` is true, the accompanying main database file
1114    /// did not exist (or was empty) and was just bootstrapped by
1115    /// [`crate::maybe_init_database_file`]. Any `-wal` file present on disk is
1116    /// therefore an orphan left behind by a previous database incarnation
1117    /// (e.g. the main `.db` was deleted while its `-wal` survived). Replaying
1118    /// such a WAL would resurrect stale committed pages on top of the fresh
1119    /// database, corrupting it (the classic symptom is row counts that grow by
1120    /// the previous content on every reopen). In that case we discard the
1121    /// orphaned WAL and start a fresh one instead of recovering from it.
1122    pub fn open_shared_inner(
1123        io: &Arc<dyn IO>,
1124        path: &str,
1125        page_size: u32,
1126        db_freshly_created: bool,
1127    ) -> Result<Arc<UnsafeCell<WalFileShared>>> {
1128        let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
1129        let orphaned_wal = db_freshly_created && file.size()? > 0;
1130        if orphaned_wal {
1131            tracing::warn!(
1132                "discarding orphaned WAL at {:?}: main database file was freshly created, \
1133                 so this WAL belongs to a previous database incarnation and will not be replayed",
1134                path
1135            );
1136        }
1137        // Recover from an existing WAL only when it is NOT orphaned. For an
1138        // orphaned WAL we fall through to the fresh-header branch below, which
1139        // overwrites the 32-byte WAL header with a brand-new random salt. That
1140        // both (a) starts this session at max_frame = 0 (so none of the stale
1141        // frames are visible now) and (b) guarantees every leftover frame still
1142        // on disk carries the OLD salt, so a future `read_entire_wal_dumb`
1143        // recovery rejects them at the first salt check and never replays them.
1144        let header = if !orphaned_wal && file.size()? > 0 {
1145            let (wal_file_shared, parse_error) = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
1146            let mut max_loops = 100_000;
1147            while !unsafe { &*wal_file_shared.get() }
1148                .loaded
1149                .load(Ordering::SeqCst)
1150            {
1151                io.run_once()?;
1152                max_loops -= 1;
1153                if max_loops == 0 {
1154                    return Err(crate::error::LimboError::InternalError(
1155                        "WAL file not loaded after 100000 IO iterations".to_string(),
1156                    ));
1157                }
1158            }
1159            // If parsing the WAL detected corruption, surface it now.
1160            if let Some(err) = parse_error.lock().take() {
1161                return Err(err);
1162            }
1163            return Ok(wal_file_shared);
1164        } else {
1165            let magic = if cfg!(target_endian = "big") {
1166                WAL_MAGIC_BE
1167            } else {
1168                WAL_MAGIC_LE
1169            };
1170            let mut wal_header = WalHeader {
1171                magic,
1172                file_format: 3007000,
1173                page_size,
1174                checkpoint_seq: 0, // TODO implement sequence number
1175                salt_1: io.generate_random_number() as u32,
1176                salt_2: io.generate_random_number() as u32,
1177                checksum_1: 0,
1178                checksum_2: 0,
1179            };
1180            let native = cfg!(target_endian = "big"); // if target_endian is
1181                                                      // already big then we don't care but if isn't, header hasn't yet been
1182                                                      // encoded to big endian, therefore we want to swap bytes to compute this
1183                                                      // checksum.
1184            let checksums = (0, 0);
1185            let checksums = checksum_wal(
1186                &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes
1187                &wal_header,
1188                checksums,
1189                native, // this is false because we haven't encoded the wal header yet
1190            );
1191            wal_header.checksum_1 = checksums.0;
1192            wal_header.checksum_2 = checksums.1;
1193            sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
1194            Arc::new(SpinLock::new(wal_header))
1195        };
1196        let checksum = {
1197            let checksum = header.lock();
1198            (checksum.checksum_1, checksum.checksum_2)
1199        };
1200        let shared = WalFileShared {
1201            wal_header: header,
1202            min_frame: AtomicU64::new(0),
1203            max_frame: AtomicU64::new(0),
1204            nbackfills: AtomicU64::new(0),
1205            frame_cache: Arc::new(SpinLock::new(HashMap::new())),
1206            last_checksum: checksum,
1207            file,
1208            pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
1209            read_locks: array::from_fn(|_| LimboRwLock {
1210                lock: AtomicU32::new(NO_LOCK),
1211                nreads: AtomicU32::new(0),
1212                value: AtomicU32::new(READMARK_NOT_USED),
1213            }),
1214            write_lock: LimboRwLock {
1215                lock: AtomicU32::new(NO_LOCK),
1216                nreads: AtomicU32::new(0),
1217                value: AtomicU32::new(READMARK_NOT_USED),
1218            },
1219            loaded: AtomicBool::new(true),
1220        };
1221        Ok(Arc::new(UnsafeCell::new(shared)))
1222    }
1223}