quill_sql/recovery/wal/
page.rs

1use std::collections::VecDeque;
2
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::recovery::{Lsn, WalRecord};
5
/// Size in bytes of one serialized WAL page.
pub const WAL_PAGE_SIZE: usize = 4 * 1024;
/// Magic number stored at the start of every page header; its hex digits are
/// the ASCII codes of "QWPG".
const WAL_PAGE_MAGIC: u32 = u32::from_be_bytes(*b"QWPG");
/// Page format version written into, and required from, every page header.
const WAL_PAGE_VERSION: u16 = 1;

/// Encoded size of the page header, summed field by field.
const WAL_PAGE_HEADER_LEN: usize = std::mem::size_of::<u32>() // magic
    + std::mem::size_of::<u16>() // version
    + std::mem::size_of::<u16>() // flags
    + std::mem::size_of::<u64>() // prev_page_lsn
    + std::mem::size_of::<u16>() // payload_size
    + std::mem::size_of::<u16>(); // slot_count
/// Encoded size of one slot directory entry: offset, len, kind, then three
/// reserved bytes.
const WAL_PAGE_SLOT_LEN: usize = std::mem::size_of::<u16>() // offset
    + std::mem::size_of::<u16>() // len
    + std::mem::size_of::<u8>() // kind
    + 3; // reserved
12
13/// Header for a WalPage, containing metadata about the page itself.
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub struct WalPageHeader {
16    pub magic: u32,
17    pub version: u16,
18    pub flags: u16,
19    pub prev_page_lsn: Lsn,
20    pub payload_size: u16,
21    pub slot_count: u16,
22}
23
24impl WalPageHeader {
25    fn encode(&self, buf: &mut [u8]) {
26        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
27        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
28        buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
29        buf[8..16].copy_from_slice(&self.prev_page_lsn.to_le_bytes());
30        buf[16..18].copy_from_slice(&self.payload_size.to_le_bytes());
31        buf[18..20].copy_from_slice(&self.slot_count.to_le_bytes());
32    }
33
34    fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
35        if bytes.len() < WAL_PAGE_HEADER_LEN {
36            return Err(QuillSQLError::Internal(
37                "WAL page truncated before header".to_string(),
38            ));
39        }
40        let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
41        let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
42        let flags = u16::from_le_bytes(bytes[6..8].try_into().unwrap());
43        let prev_page_lsn = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
44        let payload_size = u16::from_le_bytes(bytes[16..18].try_into().unwrap());
45        let slot_count = u16::from_le_bytes(bytes[18..20].try_into().unwrap());
46        Ok(Self {
47            magic,
48            version,
49            flags,
50            prev_page_lsn,
51            payload_size,
52            slot_count,
53        })
54    }
55}
56
57/// Describes whether a fragment in a WalPage is a self-contained record
58/// or part of a larger record that spans multiple pages.
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum WalPageFragmentKind {
61    Complete,
62    Start,
63    Middle,
64    End,
65}
66
67impl WalPageFragmentKind {
68    fn to_byte(self) -> u8 {
69        match self {
70            WalPageFragmentKind::Complete => 0,
71            WalPageFragmentKind::Start => 1,
72            WalPageFragmentKind::Middle => 2,
73            WalPageFragmentKind::End => 3,
74        }
75    }
76
77    fn from_byte(value: u8) -> QuillSQLResult<Self> {
78        match value {
79            0 => Ok(WalPageFragmentKind::Complete),
80            1 => Ok(WalPageFragmentKind::Start),
81            2 => Ok(WalPageFragmentKind::Middle),
82            3 => Ok(WalPageFragmentKind::End),
83            other => Err(QuillSQLError::Internal(format!(
84                "Unknown WAL page fragment kind: {}",
85                other
86            ))),
87        }
88    }
89}
90
91/// A slot in the WalPage's slot directory. It points to a specific
92/// fragment within the page's payload area.
93#[derive(Debug, Clone, Copy)]
94pub struct WalPageSlot {
95    pub offset: u16,
96    pub len: u16,
97    pub kind: WalPageFragmentKind,
98}
99
100impl WalPageSlot {
101    fn encode(&self) -> [u8; WAL_PAGE_SLOT_LEN] {
102        let mut buf = [0u8; WAL_PAGE_SLOT_LEN];
103        buf[0..2].copy_from_slice(&self.offset.to_le_bytes());
104        buf[2..4].copy_from_slice(&self.len.to_le_bytes());
105        buf[4] = self.kind.to_byte();
106        buf
107    }
108
109    fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
110        if bytes.len() < WAL_PAGE_SLOT_LEN {
111            return Err(QuillSQLError::Internal(
112                "WAL page truncated before slot".to_string(),
113            ));
114        }
115        let offset = u16::from_le_bytes(bytes[0..2].try_into().unwrap());
116        let len = u16::from_le_bytes(bytes[2..4].try_into().unwrap());
117        let kind = WalPageFragmentKind::from_byte(bytes[4])?;
118        Ok(Self { offset, len, kind })
119    }
120}
121
122/// Represents a WalRecord that is being carried over to the next WalPage
123/// because it could not fit entirely in the previous one.
124#[derive(Clone)]
125pub struct WalFrameContinuation {
126    pub record: WalRecord,
127    pub offset: usize,
128}
129
130impl WalFrameContinuation {
131    fn remaining(&self) -> usize {
132        self.record.payload.len().saturating_sub(self.offset)
133    }
134}
135
136/// A WalPage is a fixed-size (e.g., 4KB) block within a WAL segment file.
137/// It contains a header, a payload area for raw data, and a slot directory
138/// that describes the fragments packed into the payload.
139///
140/// The payload and slot directory grow towards each other from opposite ends of the page.
141///
142/// WalPage On-disk Layout (4KB):
143/// ------------------------------------------------------------------------------------------
144/// | WalPageHeader | Payload Data (grows forward) | ... Free Space ... | Slot Array (grows backward) |
145/// ------------------------------------------------------------------------------------------
146///
147/// WalPageHeader (20 bytes):
148/// ------------------------------------------------------------------------------------------
149/// | Magic (u32) | Ver (u16) | Flags (u16) | PrevPageLSN (u64) | PayloadSize (u16) | SlotCount (u16) |
150/// ------------------------------------------------------------------------------------------
151///
152/// WalPageSlot (8 bytes):
153/// -----------------------------------------------------------------
154/// | Offset (u16) | Length (u16) | Kind (u8) | Reserved (3) |
155/// -----------------------------------------------------------------
156/// - Offset/Length: Point to a fragment within this page's Payload Data.
157/// - Kind: Indicates if the fragment is a Complete, Start, Middle, or End piece of a WalFrame.
158#[derive(Clone)]
159pub struct WalPage {
160    header: WalPageHeader,
161    payload: Vec<u8>,
162    slots: Vec<WalPageSlot>,
163    full: bool,
164    continuation: Option<WalFrameContinuation>,
165    last_end_lsn: Option<Lsn>,
166}
167
168impl WalPage {
169    pub fn pack_frames(
170        prev_page_lsn: Lsn,
171        frames: Vec<WalRecord>,
172        carry: Option<WalFrameContinuation>,
173    ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
174        let queue: VecDeque<WalRecord> = frames.into();
175        let (payload, slots, leftover, continuation, last_end_lsn, full) =
176            Self::fill_page(Vec::new(), Vec::new(), queue, carry, None);
177
178        let header = WalPageHeader {
179            magic: WAL_PAGE_MAGIC,
180            version: WAL_PAGE_VERSION,
181            flags: 0,
182            prev_page_lsn,
183            payload_size: payload.len() as u16,
184            slot_count: slots.len() as u16,
185        };
186
187        (
188            Self {
189                header,
190                payload,
191                slots,
192                full,
193                continuation: continuation.clone(),
194                last_end_lsn,
195            },
196            leftover,
197            continuation,
198        )
199    }
200
201    pub fn continue_pack(
202        mut self,
203        frames: Vec<WalRecord>,
204    ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
205        let queue: VecDeque<WalRecord> = frames.into();
206        let (payload, slots, leftover, continuation, last_end_lsn, full) = Self::fill_page(
207            self.payload,
208            self.slots,
209            queue,
210            self.continuation.take(),
211            self.last_end_lsn,
212        );
213
214        self.payload = payload;
215        self.slots = slots;
216        self.continuation = continuation.clone();
217        self.last_end_lsn = last_end_lsn;
218        self.full = full;
219        self.header.payload_size = self.payload.len() as u16;
220        self.header.slot_count = self.slots.len() as u16;
221
222        (self, leftover, continuation)
223    }
224
225    fn fill_page(
226        mut payload: Vec<u8>,
227        mut slots: Vec<WalPageSlot>,
228        mut queue: VecDeque<WalRecord>,
229        mut continuation: Option<WalFrameContinuation>,
230        mut last_end_lsn: Option<Lsn>,
231    ) -> (
232        Vec<u8>,
233        Vec<WalPageSlot>,
234        Vec<WalRecord>,
235        Option<WalFrameContinuation>,
236        Option<Lsn>,
237        bool,
238    ) {
239        loop {
240            if continuation.is_none() && queue.is_empty() {
241                break;
242            }
243
244            let available = Self::available_bytes(payload.len(), slots.len());
245            if available == 0 {
246                break;
247            }
248
249            if let Some(mut cont) = continuation.take() {
250                if cont.remaining() == 0 {
251                    last_end_lsn = Some(cont.record.end_lsn);
252                    continue;
253                }
254                let take = available.min(cont.remaining());
255                if take == 0 {
256                    continuation = Some(cont);
257                    break;
258                }
259                let start = payload.len();
260                payload.extend_from_slice(&cont.record.payload[cont.offset..cont.offset + take]);
261                let kind = if cont.offset == 0 {
262                    if cont.offset + take == cont.record.payload.len() {
263                        WalPageFragmentKind::Complete
264                    } else {
265                        WalPageFragmentKind::Start
266                    }
267                } else if cont.offset + take == cont.record.payload.len() {
268                    WalPageFragmentKind::End
269                } else {
270                    WalPageFragmentKind::Middle
271                };
272                slots.push(WalPageSlot {
273                    offset: start as u16,
274                    len: take as u16,
275                    kind,
276                });
277                cont.offset += take;
278                if cont.offset == cont.record.payload.len() {
279                    last_end_lsn = Some(cont.record.end_lsn);
280                    continuation = None;
281                } else {
282                    continuation = Some(cont);
283                }
284                continue;
285            }
286
287            if let Some(record) = queue.pop_front() {
288                continuation = Some(WalFrameContinuation { record, offset: 0 });
289                continue;
290            }
291        }
292
293        let leftover: Vec<WalRecord> = queue.into_iter().collect();
294        let full = Self::available_for_next(payload.len(), slots.len()) == 0;
295
296        (payload, slots, leftover, continuation, last_end_lsn, full)
297    }
298
299    pub fn unpack_frames(bytes: &[u8]) -> QuillSQLResult<Self> {
300        if bytes.len() < WAL_PAGE_SIZE {
301            return Err(QuillSQLError::Internal(
302                "WAL page truncated before full page".to_string(),
303            ));
304        }
305        if bytes.iter().all(|&b| b == 0) {
306            return Ok(Self::empty());
307        }
308
309        let header = WalPageHeader::decode(&bytes[..WAL_PAGE_HEADER_LEN])?;
310        if header.magic != WAL_PAGE_MAGIC {
311            return Err(QuillSQLError::Internal(format!(
312                "Invalid WAL page magic: {:x}",
313                header.magic
314            )));
315        }
316        if header.version != WAL_PAGE_VERSION {
317            return Err(QuillSQLError::Internal(format!(
318                "Unsupported WAL page version: {}",
319                header.version
320            )));
321        }
322
323        let payload_end = WAL_PAGE_HEADER_LEN + header.payload_size as usize;
324        if payload_end > WAL_PAGE_SIZE {
325            return Err(QuillSQLError::Internal(
326                "WAL page payload exceeds page size".to_string(),
327            ));
328        }
329        let dir_start = WAL_PAGE_SIZE
330            .checked_sub(header.slot_count as usize * WAL_PAGE_SLOT_LEN)
331            .ok_or_else(|| {
332                QuillSQLError::Internal("WAL page directory exceeds page size".to_string())
333            })?;
334        if dir_start < payload_end {
335            return Err(QuillSQLError::Internal(
336                "WAL page directory overlaps payload".to_string(),
337            ));
338        }
339
340        let payload = bytes[WAL_PAGE_HEADER_LEN..payload_end].to_vec();
341        let mut slots = Vec::with_capacity(header.slot_count as usize);
342        let mut cursor = dir_start;
343        for _ in 0..header.slot_count {
344            let slot = WalPageSlot::decode(&bytes[cursor..cursor + WAL_PAGE_SLOT_LEN])?;
345            if slot.offset as usize + slot.len as usize > payload.len() {
346                return Err(QuillSQLError::Internal(
347                    "WAL page slot exceeds payload".to_string(),
348                ));
349            }
350            slots.push(slot);
351            cursor += WAL_PAGE_SLOT_LEN;
352        }
353
354        let full = Self::available_for_next(payload.len(), slots.len()) == 0;
355        Ok(Self {
356            header,
357            payload,
358            slots,
359            full,
360            continuation: None,
361            last_end_lsn: None,
362        })
363    }
364
365    pub fn to_bytes(&self) -> Vec<u8> {
366        let mut buf = vec![0u8; WAL_PAGE_SIZE];
367        self.header.encode(&mut buf[..WAL_PAGE_HEADER_LEN]);
368        buf[WAL_PAGE_HEADER_LEN..WAL_PAGE_HEADER_LEN + self.payload.len()]
369            .copy_from_slice(&self.payload);
370        let dir_start = WAL_PAGE_SIZE - self.slots.len() * WAL_PAGE_SLOT_LEN;
371        let mut cursor = dir_start;
372        for slot in &self.slots {
373            let encoded = slot.encode();
374            buf[cursor..cursor + WAL_PAGE_SLOT_LEN].copy_from_slice(&encoded);
375            cursor += WAL_PAGE_SLOT_LEN;
376        }
377        buf
378    }
379
380    pub fn fragments(&self) -> &[WalPageSlot] {
381        &self.slots
382    }
383
384    pub fn payload(&self) -> &[u8] {
385        &self.payload
386    }
387
388    pub fn is_full(&self) -> bool {
389        self.full
390    }
391
392    pub fn has_payload(&self) -> bool {
393        !self.payload.is_empty() || !self.slots.is_empty()
394    }
395
396    pub fn last_end_lsn(&self) -> Option<Lsn> {
397        self.last_end_lsn
398    }
399
400    pub fn continuation(&self) -> Option<&WalFrameContinuation> {
401        self.continuation.as_ref()
402    }
403
404    pub fn prev_page_lsn(&self) -> Lsn {
405        self.header.prev_page_lsn
406    }
407
408    fn empty() -> Self {
409        Self {
410            header: WalPageHeader {
411                magic: WAL_PAGE_MAGIC,
412                version: WAL_PAGE_VERSION,
413                flags: 0,
414                prev_page_lsn: 0,
415                payload_size: 0,
416                slot_count: 0,
417            },
418            payload: Vec::new(),
419            slots: Vec::new(),
420            full: false,
421            continuation: None,
422            last_end_lsn: None,
423        }
424    }
425
426    fn available_bytes(payload_len: usize, slot_count: usize) -> usize {
427        WAL_PAGE_SIZE
428            .saturating_sub(WAL_PAGE_HEADER_LEN)
429            .saturating_sub(payload_len)
430            .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
431    }
432
433    fn available_for_next(payload_len: usize, slot_count: usize) -> usize {
434        WAL_PAGE_SIZE
435            .saturating_sub(WAL_PAGE_HEADER_LEN)
436            .saturating_sub(payload_len)
437            .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
438    }
439}
440
441impl Default for WalPage {
442    fn default() -> Self {
443        Self::empty()
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use crate::recovery::wal::codec::{decode_frame, encode_frame};
451    use crate::recovery::wal_record::{
452        PageWritePayload, TransactionPayload, TransactionRecordKind, WalFrame, WalRecordPayload,
453        WAL_CRC_LEN, WAL_HEADER_LEN,
454    };
455    use bytes::Bytes;
456
457    fn make_transaction_record(start: Lsn, prev: Lsn, txn: u64) -> WalRecord {
458        let payload = WalRecordPayload::Transaction(TransactionPayload {
459            marker: TransactionRecordKind::Begin,
460            txn_id: txn,
461        });
462        build_record(start, prev, &payload)
463    }
464
465    fn build_record(start: Lsn, prev: Lsn, payload: &WalRecordPayload) -> WalRecord {
466        let frame = encode_frame(start, prev, payload);
467        let end = start + frame.len() as u64;
468        WalRecord {
469            start_lsn: start,
470            end_lsn: end,
471            payload: Bytes::from(frame),
472        }
473    }
474
475    fn decode_pages(pages: &[WalPage]) -> Vec<WalFrame> {
476        let mut frames = Vec::new();
477        let mut buffer = Vec::new();
478        for page in pages {
479            for slot in page.fragments() {
480                let start = slot.offset as usize;
481                let end = start + slot.len as usize;
482                let fragment = &page.payload()[start..end];
483                match slot.kind {
484                    WalPageFragmentKind::Complete => {
485                        buffer.clear();
486                        let (frame, _) = decode_frame(fragment).expect("frame");
487                        frames.push(frame);
488                    }
489                    WalPageFragmentKind::Start => {
490                        buffer.clear();
491                        buffer.extend_from_slice(fragment);
492                    }
493                    WalPageFragmentKind::Middle => {
494                        assert!(!buffer.is_empty());
495                        buffer.extend_from_slice(fragment);
496                    }
497                    WalPageFragmentKind::End => {
498                        assert!(!buffer.is_empty());
499                        buffer.extend_from_slice(fragment);
500                        let (frame, _) = decode_frame(&buffer).expect("frame");
501                        frames.push(frame);
502                        buffer.clear();
503                    }
504                }
505            }
506        }
507        frames
508    }
509
510    #[test]
511    fn pack_single_page_roundtrip() {
512        let mut records = Vec::new();
513        let mut start = 0;
514        let mut prev = 0;
515        for txn in 0..8 {
516            let record = make_transaction_record(start, prev, txn);
517            prev = record.start_lsn;
518            start = record.end_lsn;
519            records.push(record);
520        }
521
522        let (page, leftover, carry) = WalPage::pack_frames(0, records.clone(), None);
523        assert!(leftover.is_empty());
524        assert!(carry.is_none());
525        assert!(page.has_payload());
526
527        let bytes = page.to_bytes();
528        let decoded = WalPage::unpack_frames(&bytes).expect("unpack");
529        let frames = decode_pages(&[decoded]);
530        assert_eq!(frames.len(), records.len());
531        for (frame, record) in frames.iter().zip(records.iter()) {
532            assert_eq!(frame.lsn, record.start_lsn);
533        }
534    }
535
536    #[test]
537    fn pack_multiple_pages() {
538        let mut records = Vec::new();
539        let mut start = 0;
540        let mut prev = 0;
541        // Enough records to span multiple pages
542        for txn in 0..128 {
543            let record = make_transaction_record(start, prev, txn);
544            prev = record.start_lsn;
545            start = record.end_lsn;
546            records.push(record);
547        }
548
549        let mut queue = records.clone();
550        let mut prev_page_lsn = 0;
551        let mut carry = None;
552        let mut pages = Vec::new();
553        while !queue.is_empty() || carry.is_some() {
554            let (page, leftover, next) = WalPage::pack_frames(prev_page_lsn, queue, carry);
555            if page.has_payload() {
556                prev_page_lsn = page.last_end_lsn().unwrap_or(prev_page_lsn);
557                pages.push(page);
558            }
559            queue = leftover;
560            carry = next;
561        }
562
563        assert!(pages.len() > 1);
564        let frames = decode_pages(&pages);
565        assert_eq!(frames.len(), records.len());
566        for (frame, record) in frames.iter().zip(records.iter()) {
567            assert_eq!(frame.lsn, record.start_lsn);
568        }
569    }
570
571    #[test]
572    fn pack_cross_page_frame() {
573        let page_image = vec![7u8; 4096];
574        let payload = WalRecordPayload::PageWrite(PageWritePayload {
575            page_id: 1,
576            prev_page_lsn: 0,
577            page_image,
578        });
579        let record = build_record(0, 0, &payload);
580
581        let mut pages = Vec::new();
582        let mut queue = vec![record.clone()];
583        let mut prev_page_lsn = 0;
584        let mut carry = None;
585        while !queue.is_empty() || carry.is_some() {
586            let (page, leftover, next) = WalPage::pack_frames(prev_page_lsn, queue, carry);
587            assert!(page.has_payload());
588            prev_page_lsn = page.last_end_lsn().unwrap_or(prev_page_lsn);
589            pages.push(page);
590            queue = leftover;
591            carry = next;
592        }
593
594        assert!(pages.len() >= 2);
595        let frames = decode_pages(&pages);
596        assert_eq!(frames.len(), 1);
597        assert_eq!(frames[0].lsn, record.start_lsn);
598        assert_eq!(
599            frames[0].body.len(),
600            record.payload.len() - WAL_HEADER_LEN - WAL_CRC_LEN
601        );
602    }
603}