quill_sql/recovery/wal/
mod.rs

1pub mod codec;
2pub mod page;
3
4mod buffer;
5mod io;
6mod record;
7mod storage;
8mod writer;
9
10use parking_lot::{Condvar, Mutex};
11use std::cmp;
12use std::collections::VecDeque;
13use std::fmt;
14use std::fs::{self, File};
15use std::io::{BufReader, ErrorKind, Read};
16use std::path::PathBuf;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::Duration;
20
21use crate::buffer::{PageId, PAGE_SIZE};
22use crate::config::WalConfig;
23use crate::error::{QuillSQLError, QuillSQLResult};
24use crate::recovery::control_file::{ControlFileManager, WalInitState};
25use crate::recovery::wal::codec::{
26    decode_frame, encode_body, encode_frame, CheckpointPayload, PageWritePayload,
27    ResourceManagerId, WalFrame, WAL_CRC_LEN, WAL_HEADER_LEN,
28};
29use crate::recovery::wal::page::{WalPage, WalPageFragmentKind, WAL_PAGE_SIZE};
30use crate::recovery::wal_record::WalRecordPayload;
31use crate::storage::disk_manager::DiskManager;
32use crate::storage::disk_scheduler::DiskScheduler;
33use bytes::Bytes;
34use dashmap::DashSet;
35use serde::Serialize;
36
37use buffer::WalBuffer;
38use io::{DiskSchedulerWalSink, WalSink};
39use record::WalRecord;
40use storage::{list_segments, segment_path, WalFlushTicket, WalStorage};
41use writer::WalWriterRuntime;
42
/// Log sequence number. Within this module a record's end LSN is its start
/// LSN plus the length of its encoded frame.
pub type Lsn = u64;

/// Lower bound on the capacity handed to the in-memory `WalBuffer`; also the
/// capacity used when the configured buffer capacity is zero.
const DEFAULT_WAL_BUFFER_CAPACITY: usize = 4 * 1024;
45
46#[derive(Debug, Clone, Copy)]
47pub struct WalAppendContext {
48    pub start_lsn: Lsn,
49    pub end_lsn: Lsn,
50}
51
52#[derive(Debug, Clone, Copy)]
53pub struct WalAppendResult {
54    pub start_lsn: Lsn,
55    pub end_lsn: Lsn,
56}
57
58struct WalState {
59    storage: WalStorage,
60}
61
62pub struct WalManager {
63    next_lsn: AtomicU64,
64    durable_lsn: AtomicU64,
65    buffer: WalBuffer,
66    state: Mutex<WalState>,
67    writer: Mutex<Option<WalWriterRuntime>>,
68    persist_control_file_on_flush: bool,
69    max_buffer_records: usize,
70    flush_coalesce_bytes: usize,
71    last_checkpoint: AtomicU64,
72    last_record_start: AtomicU64,
73    control_file: Option<Arc<ControlFileManager>>,
74    flush_lock: Mutex<()>,
75    flush_cond: Condvar,
76    checkpoint_redo_start: AtomicU64,
77    touched_pages: DashSet<PageId>,
78}
79
80#[derive(Debug, Clone, Serialize)]
81pub struct WalHeadDebug {
82    pub durable_lsn: Lsn,
83    pub max_assigned_lsn: Lsn,
84    pub last_checkpoint_lsn: Lsn,
85    pub pending_records: usize,
86    pub pending_bytes: usize,
87    pub has_background_writer: bool,
88    pub flush_coalesce_bytes: usize,
89}
90
91#[derive(Debug, Clone, Serialize)]
92pub struct WalSegmentDebug {
93    pub id: u64,
94    pub size_bytes: u64,
95}
96
97#[derive(Debug, Clone, Serialize)]
98pub struct WalPeekDebug {
99    pub lsn: Lsn,
100    pub prev_lsn: Lsn,
101    pub rmid: String,
102    pub info: u8,
103    pub body_len: usize,
104}
105
106pub struct WalWriterHandle {
107    manager: Option<Arc<WalManager>>,
108}
109
110impl WalWriterHandle {
111    fn new(manager: Arc<WalManager>) -> Self {
112        Self {
113            manager: Some(manager),
114        }
115    }
116
117    pub fn stop(mut self) -> QuillSQLResult<()> {
118        if let Some(manager) = self.manager.take() {
119            manager.stop_background_writer()
120        } else {
121            Ok(())
122        }
123    }
124}
125
126impl Drop for WalWriterHandle {
127    fn drop(&mut self) {
128        if let Some(manager) = self.manager.take() {
129            let _ = manager.stop_background_writer();
130        }
131    }
132}
133
134impl WalManager {
135    pub fn new(
136        config: WalConfig,
137        init_state: Option<WalInitState>,
138        control_file: Option<Arc<ControlFileManager>>,
139    ) -> QuillSQLResult<Self> {
140        let scheduler = Self::default_scheduler(&config)?;
141        Self::new_with_scheduler(config, init_state, control_file, scheduler)
142    }
143
144    pub fn new_with_scheduler(
145        config: WalConfig,
146        init_state: Option<WalInitState>,
147        control_file: Option<Arc<ControlFileManager>>,
148        scheduler: Arc<DiskScheduler>,
149    ) -> QuillSQLResult<Self> {
150        let buffer_capacity_cfg = config.buffer_capacity;
151        let max_buffer_records = if buffer_capacity_cfg == 0 {
152            usize::MAX
153        } else {
154            buffer_capacity_cfg
155        };
156        let flush_bytes = if config.flush_coalesce_bytes == 0 {
157            usize::MAX
158        } else {
159            config.flush_coalesce_bytes
160        };
161        let sink: Arc<dyn WalSink> = Arc::new(DiskSchedulerWalSink::new(scheduler.clone()));
162        let persist_control_file_on_flush = config.persist_control_file_on_flush;
163        let mut storage = WalStorage::new(config, sink)?;
164        let (durable_ptr, next_ptr, checkpoint_ptr, last_record_start, redo_start) =
165            if let Some(state) = init_state {
166                (
167                    state.durable_lsn,
168                    state.max_assigned_lsn,
169                    state.last_checkpoint_lsn,
170                    state.last_record_start,
171                    state.checkpoint_redo_start,
172                )
173            } else {
174                let (next_offset, last_start) = storage.recover_offsets()?;
175                (next_offset, next_offset, last_start, last_start, 0)
176            };
177        let ring_capacity = if buffer_capacity_cfg == 0 {
178            DEFAULT_WAL_BUFFER_CAPACITY
179        } else {
180            buffer_capacity_cfg.max(DEFAULT_WAL_BUFFER_CAPACITY)
181        };
182        Ok(Self {
183            next_lsn: AtomicU64::new(next_ptr),
184            durable_lsn: AtomicU64::new(durable_ptr),
185            buffer: WalBuffer::with_capacity(ring_capacity),
186            state: Mutex::new(WalState { storage }),
187            writer: Mutex::new(None),
188            persist_control_file_on_flush,
189            max_buffer_records,
190            flush_coalesce_bytes: flush_bytes,
191            last_checkpoint: AtomicU64::new(checkpoint_ptr),
192            last_record_start: AtomicU64::new(last_record_start),
193            control_file,
194            flush_lock: Mutex::new(()),
195            flush_cond: Condvar::new(),
196            checkpoint_redo_start: AtomicU64::new(redo_start),
197            touched_pages: DashSet::new(),
198        })
199    }
200
201    #[inline]
202    pub fn max_assigned_lsn(&self) -> Lsn {
203        self.next_lsn.load(Ordering::Acquire)
204    }
205
206    #[inline]
207    pub fn durable_lsn(&self) -> Lsn {
208        self.durable_lsn.load(Ordering::Acquire)
209    }
210
211    pub fn debug_head(&self) -> WalHeadDebug {
212        let pending = self.pending_records();
213        let pending_bytes: usize = pending.iter().map(|rec| rec.payload.len()).sum();
214        WalHeadDebug {
215            durable_lsn: self.durable_lsn(),
216            max_assigned_lsn: self.max_assigned_lsn(),
217            last_checkpoint_lsn: self.last_checkpoint_lsn(),
218            pending_records: pending.len(),
219            pending_bytes,
220            has_background_writer: self.has_background_writer(),
221            flush_coalesce_bytes: self.flush_coalesce_bytes,
222        }
223    }
224
225    pub fn debug_segments(&self) -> QuillSQLResult<Vec<WalSegmentDebug>> {
226        let guard = self.state.lock();
227        let directory = guard.storage.directory_path();
228        drop(guard);
229        let mut info = Vec::new();
230        for id in list_segments(&directory)? {
231            let path = segment_path(&directory, id);
232            let size_bytes = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
233            info.push(WalSegmentDebug { id, size_bytes });
234        }
235        Ok(info)
236    }
237
238    pub fn debug_peek(&self, limit: usize) -> QuillSQLResult<Vec<WalPeekDebug>> {
239        let mut reader = self.reader()?;
240        let mut frames = Vec::new();
241        while let Some(frame) = reader.next_frame()? {
242            frames.push(frame);
243        }
244        let start = frames.len().saturating_sub(limit);
245        Ok(frames[start..]
246            .iter()
247            .map(|frame| WalPeekDebug {
248                lsn: frame.lsn,
249                prev_lsn: frame.prev_lsn,
250                rmid: match frame.rmid {
251                    ResourceManagerId::Page => "Page",
252                    ResourceManagerId::Transaction => "Transaction",
253                    ResourceManagerId::Heap => "Heap",
254                    ResourceManagerId::Index => "Index",
255                    ResourceManagerId::Checkpoint => "Checkpoint",
256                    ResourceManagerId::Clr => "Clr",
257                }
258                .to_string(),
259                info: frame.info,
260                body_len: frame.body.len(),
261            })
262            .collect())
263    }
264
265    pub fn append_record_with<F>(&self, mut build: F) -> QuillSQLResult<WalAppendResult>
266    where
267        F: FnMut(WalAppendContext) -> WalRecordPayload,
268    {
269        let preview_ctx = WalAppendContext {
270            start_lsn: 0,
271            end_lsn: WAL_HEADER_LEN as u64 + WAL_CRC_LEN as u64,
272        };
273        let preview_payload = build(preview_ctx);
274        let (_, _, preview_body) = encode_body(&preview_payload);
275        let preview_frame_len = WAL_HEADER_LEN + preview_body.len() + WAL_CRC_LEN;
276
277        let prev_start = self.last_record_start.load(Ordering::Acquire);
278        let start_lsn = self
279            .next_lsn
280            .fetch_add(preview_frame_len as u64, Ordering::SeqCst);
281        let end_lsn_preview = start_lsn + preview_frame_len as u64;
282
283        let preview_ctx = WalAppendContext {
284            start_lsn,
285            end_lsn: end_lsn_preview,
286        };
287        let payload = build(preview_ctx);
288        let frame_bytes = encode_frame(start_lsn, prev_start, &payload);
289        let frame_len = frame_bytes.len();
290        debug_assert_eq!(frame_len, preview_frame_len);
291        let end_lsn = start_lsn + frame_len as u64;
292        let encoded = Bytes::from(frame_bytes);
293
294        self.buffer.push(WalRecord {
295            start_lsn,
296            end_lsn,
297            payload: encoded,
298        });
299
300        let buffer_len = self.buffer.len();
301        let buffer_bytes = self.buffer.bytes();
302        let should_flush = buffer_len >= self.max_buffer_records
303            || buffer_bytes >= self.flush_coalesce_bytes
304            || buffer_bytes >= WAL_PAGE_SIZE;
305
306        self.last_record_start.store(start_lsn, Ordering::Release);
307
308        if should_flush {
309            self.flush_with_mode(Some(end_lsn), false)?;
310        }
311        Ok(WalAppendResult { start_lsn, end_lsn })
312    }
313
314    pub fn log_page_update(
315        &self,
316        page_id: PageId,
317        prev_page_lsn: Lsn,
318        new_image: &[u8],
319    ) -> QuillSQLResult<Option<WalAppendResult>> {
320        if new_image.len() != PAGE_SIZE {
321            return Err(QuillSQLError::Internal(format!(
322                "page {} image size mismatch: new={}",
323                page_id,
324                new_image.len()
325            )));
326        }
327
328        // Track touched page for checkpoint bookkeeping even though we always log full image.
329        let _ = self.fpw_first_touch(page_id);
330        let page_image = new_image.to_vec();
331        let result = self.append_record_with(|_| {
332            WalRecordPayload::PageWrite(PageWritePayload {
333                page_id,
334                prev_page_lsn,
335                page_image: page_image.clone(),
336            })
337        })?;
338        Ok(Some(result))
339    }
340
341    pub fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
342        let redo_start = payload
343            .dpt
344            .iter()
345            .map(|(_, lsn)| *lsn)
346            .min()
347            .unwrap_or(payload.last_lsn);
348        let result = self.append_record_with(|_| WalRecordPayload::Checkpoint(payload.clone()))?;
349        self.last_checkpoint
350            .store(result.start_lsn, Ordering::Release);
351        self.checkpoint_redo_start
352            .store(redo_start, Ordering::Release);
353        self.flush_with_mode(Some(result.end_lsn), true)?;
354        self.touched_pages.clear();
355        if let Some(ctrl) = &self.control_file {
356            ctrl.update(
357                self.durable_lsn(),
358                self.max_assigned_lsn(),
359                self.last_checkpoint_lsn(),
360                self.last_record_start.load(Ordering::Acquire),
361                self.checkpoint_redo_start.load(Ordering::Acquire),
362            )?;
363        }
364        Ok(result.end_lsn)
365    }
366
367    #[inline]
368    pub fn last_checkpoint_lsn(&self) -> Lsn {
369        self.last_checkpoint.load(Ordering::Acquire)
370    }
371
372    pub fn start_background_flush(
373        self: &Arc<Self>,
374        interval: Duration,
375    ) -> QuillSQLResult<Option<WalWriterHandle>> {
376        if interval.is_zero() {
377            return Ok(None);
378        }
379        let mut guard = self.writer.lock();
380        if guard.is_some() {
381            return Ok(None);
382        }
383        let runtime = WalWriterRuntime::spawn(Arc::downgrade(self), interval)?;
384        *guard = Some(runtime);
385        Ok(Some(WalWriterHandle::new(Arc::clone(self))))
386    }
387
388    fn stop_background_writer(&self) -> QuillSQLResult<()> {
389        if let Some(runtime) = self.writer.lock().take() {
390            runtime.stop()?;
391        }
392        Ok(())
393    }
394
395    pub fn has_background_writer(&self) -> bool {
396        self.writer.lock().is_some()
397    }
398
399    pub fn flush(&self, target: Option<Lsn>) -> QuillSQLResult<Lsn> {
400        self.flush_with_mode(target, false)
401    }
402
403    fn flush_with_mode(&self, target: Option<Lsn>, force_sync: bool) -> QuillSQLResult<Lsn> {
404        let recycle_lsn = self.checkpoint_redo_start.load(Ordering::Acquire);
405
406        let (desired, tickets) = {
407            let mut guard = self.state.lock();
408            let current_durable = self.durable_lsn();
409            let highest_buffered = if self.buffer.is_empty() {
410                current_durable
411            } else {
412                self.buffer.highest_end_lsn()
413            };
414            let mut desired = target.filter(|lsn| *lsn != 0).unwrap_or(highest_buffered);
415            desired = cmp::min(self.max_assigned_lsn(), desired);
416
417            if desired <= current_durable {
418                guard.storage.flush(force_sync)?;
419                guard.storage.recycle_segments(recycle_lsn)?;
420                drop(guard);
421                return Ok(current_durable);
422            }
423
424            let (to_flush, _) = self.buffer.drain_until(desired);
425            if !to_flush.is_empty() {
426                guard.storage.append_records(&to_flush)?;
427            }
428            guard.storage.flush(force_sync)?;
429
430            guard.storage.recycle_segments(recycle_lsn)?;
431            let ready = guard.storage.take_ready(desired);
432            (desired, ready)
433        };
434
435        self.wait_for_flush_tickets(tickets)?;
436        self.durable_lsn.store(desired, Ordering::Release);
437        if self.persist_control_file_on_flush {
438            self.persist_control_file()?;
439        }
440        self.flush_cond.notify_all();
441        Ok(desired)
442    }
443
444    pub fn pending_records(&self) -> Vec<WalRecord> {
445        self.buffer.pending()
446    }
447
448    fn default_scheduler(config: &WalConfig) -> QuillSQLResult<Arc<DiskScheduler>> {
449        fs::create_dir_all(&config.directory)?;
450        let db_path = config.directory.join("wal_scheduler.db");
451        let disk_manager = Arc::new(DiskManager::try_new(&db_path)?);
452        Ok(Arc::new(DiskScheduler::new(disk_manager)))
453    }
454
455    fn wait_for_flush_tickets(&self, tickets: Vec<WalFlushTicket>) -> QuillSQLResult<()> {
456        for ticket in tickets {
457            match ticket.receiver.recv() {
458                Ok(Ok(())) => {}
459                Ok(Err(err)) => return Err(err),
460                Err(err) => {
461                    return Err(QuillSQLError::Internal(format!(
462                        "WAL flush completion dropped: {}",
463                        err
464                    )))
465                }
466            }
467        }
468        Ok(())
469    }
470
471    pub fn persist_control_file(&self) -> QuillSQLResult<()> {
472        if let Some(ctrl) = &self.control_file {
473            ctrl.update(
474                self.durable_lsn(),
475                self.max_assigned_lsn(),
476                self.last_checkpoint_lsn(),
477                self.last_record_start.load(Ordering::Acquire),
478                self.checkpoint_redo_start.load(Ordering::Acquire),
479            )?;
480        }
481        Ok(())
482    }
483
484    pub fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
485        if target == 0 {
486            return Ok(self.durable_lsn());
487        }
488        if self.durable_lsn() >= target {
489            return Ok(self.durable_lsn());
490        }
491        self.flush_with_mode(Some(target), true)
492    }
493
494    pub fn wait_for_durable(&self, target: Lsn) -> QuillSQLResult<()> {
495        if target == 0 {
496            return Ok(());
497        }
498        if self.durable_lsn() >= target {
499            return Ok(());
500        }
501        self.flush_with_mode(Some(target), true)?;
502        if self.durable_lsn() >= target {
503            return Ok(());
504        }
505        let mut guard = self.flush_lock.lock();
506        while self.durable_lsn() < target {
507            self.flush_cond.wait(&mut guard);
508        }
509        Ok(())
510    }
511
512    pub fn reader(&self) -> QuillSQLResult<WalReader> {
513        let directory = self.state.lock().storage.directory_path();
514        WalReader::new(directory)
515    }
516
517    pub fn fpw_first_touch(&self, page_id: PageId) -> bool {
518        self.touched_pages.insert(page_id)
519    }
520
521    pub fn control_file(&self) -> Option<Arc<ControlFileManager>> {
522        self.control_file.as_ref().map(Arc::clone)
523    }
524}
525
526impl fmt::Debug for WalManager {
527    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528        f.debug_struct("WalManager")
529            .field("next_lsn", &self.next_lsn.load(Ordering::Relaxed))
530            .field("durable_lsn", &self.durable_lsn.load(Ordering::Relaxed))
531            .field("max_buffer_records", &self.max_buffer_records)
532            .field("flush_coalesce_bytes", &self.flush_coalesce_bytes)
533            .finish()
534    }
535}
536
537impl Drop for WalManager {
538    fn drop(&mut self) {
539        if let Some(runtime) = self.writer.lock().take() {
540            let _ = runtime.stop();
541        }
542    }
543}
544
545pub struct WalReader {
546    directory: PathBuf,
547    segments: Vec<u64>,
548    current_idx: usize,
549    cursor: Option<SegmentCursor>,
550}
551
552impl WalReader {
553    pub fn new(directory: PathBuf) -> QuillSQLResult<Self> {
554        let mut segments = list_segments(&directory)?;
555        segments.sort_unstable();
556        Ok(Self {
557            directory,
558            segments,
559            current_idx: 0,
560            cursor: None,
561        })
562    }
563
564    pub fn next_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
565        loop {
566            if self.cursor.is_none() && !self.open_next_segment()? {
567                return Ok(None);
568            }
569
570            if let Some(cursor) = self.cursor.as_mut() {
571                match cursor.read_frame()? {
572                    Some(frame) => return Ok(Some(frame)),
573                    None => {
574                        self.cursor = None;
575                        continue;
576                    }
577                }
578            } else {
579                return Ok(None);
580            }
581        }
582    }
583
584    fn open_next_segment(&mut self) -> QuillSQLResult<bool> {
585        let Some(segment_id) = self.segments.get(self.current_idx).copied() else {
586            return Ok(false);
587        };
588        let path = segment_path(&self.directory, segment_id);
589        let cursor = SegmentCursor::open(&path)?;
590        self.cursor = Some(cursor);
591        self.current_idx += 1;
592        Ok(true)
593    }
594}
595
596struct SegmentCursor {
597    len: u64,
598    offset: u64,
599    reader: BufReader<File>,
600    fragment_buf: Vec<u8>,
601    ready_frames: VecDeque<WalFrame>,
602}
603
604impl SegmentCursor {
605    fn open(path: &PathBuf) -> QuillSQLResult<Self> {
606        let file = File::open(path)?;
607        let metadata = file.metadata()?;
608        Ok(Self {
609            len: metadata.len(),
610            offset: 0,
611            reader: BufReader::new(file),
612            fragment_buf: Vec::new(),
613            ready_frames: VecDeque::new(),
614        })
615    }
616
617    fn read_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
618        loop {
619            if let Some(frame) = self.ready_frames.pop_front() {
620                return Ok(Some(frame));
621            }
622            if self.offset >= self.len {
623                return Ok(None);
624            }
625            let mut page_buf = vec![0u8; WAL_PAGE_SIZE];
626            if let Err(err) = self.reader.read_exact(&mut page_buf) {
627                if err.kind() == ErrorKind::UnexpectedEof {
628                    self.offset = self.len;
629                    return Ok(None);
630                }
631                return Err(QuillSQLError::Io(err));
632            }
633            self.offset += WAL_PAGE_SIZE as u64;
634            let page = match WalPage::unpack_frames(&page_buf) {
635                Ok(page) => page,
636                Err(QuillSQLError::Internal(message))
637                    if message.contains("truncated")
638                        || message.contains("CRC mismatch")
639                        || message.contains("Invalid WAL page magic") =>
640                {
641                    self.offset = self.len;
642                    return Ok(None);
643                }
644                Err(err) => return Err(err),
645            };
646
647            for slot in page.fragments() {
648                let start = slot.offset as usize;
649                let end = start + slot.len as usize;
650                let fragment = &page.payload()[start..end];
651                match slot.kind {
652                    WalPageFragmentKind::Complete => {
653                        self.fragment_buf.clear();
654                        match decode_frame(fragment) {
655                            Ok((frame, _)) => {
656                                self.ready_frames.push_back(frame);
657                            }
658                            Err(QuillSQLError::Internal(message))
659                                if message.contains("too short")
660                                    || message.contains("truncated")
661                                    || message.contains("CRC mismatch") =>
662                            {
663                                self.offset = self.len;
664                                return Ok(None);
665                            }
666                            Err(err) => return Err(err),
667                        }
668                    }
669                    WalPageFragmentKind::Start => {
670                        self.fragment_buf.clear();
671                        self.fragment_buf.extend_from_slice(fragment);
672                    }
673                    WalPageFragmentKind::Middle => {
674                        if self.fragment_buf.is_empty() {
675                            self.offset = self.len;
676                            return Ok(None);
677                        }
678                        self.fragment_buf.extend_from_slice(fragment);
679                    }
680                    WalPageFragmentKind::End => {
681                        if self.fragment_buf.is_empty() {
682                            self.offset = self.len;
683                            return Ok(None);
684                        }
685                        self.fragment_buf.extend_from_slice(fragment);
686                        let frame_bytes = std::mem::take(&mut self.fragment_buf);
687                        match decode_frame(&frame_bytes) {
688                            Ok((frame, _)) => {
689                                self.ready_frames.push_back(frame);
690                            }
691                            Err(QuillSQLError::Internal(message))
692                                if message.contains("too short")
693                                    || message.contains("truncated")
694                                    || message.contains("CRC mismatch") =>
695                            {
696                                self.offset = self.len;
697                                return Ok(None);
698                            }
699                            Err(err) => return Err(err),
700                        }
701                    }
702                }
703            }
704        }
705    }
706}