quill_sql/recovery/wal/
page.rs

1use std::collections::VecDeque;
2
3use super::record::WalRecord;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::recovery::Lsn;
6
/// Size in bytes of every WAL page; pages are always encoded and decoded whole.
pub const WAL_PAGE_SIZE: usize = 4096;

/// Identifies a WAL page. Read most-significant byte first the value spells
/// "QWPG"; since it is serialised little-endian, the bytes on disk are "GPWQ".
const WAL_PAGE_MAGIC: u32 = 0x5157_5047;

/// Layout version written into, and required from, every page header.
const WAL_PAGE_VERSION: u16 = 1;

/// Encoded header size: magic (u32) + version (u16) + flags (u16)
/// + prev_page_lsn (u64) + payload_size (u16) + slot_count (u16) = 20 bytes.
const WAL_PAGE_HEADER_LEN: usize = 4 + 2 + 2 + 8 + 2 + 2;

/// Encoded size of one slot-directory entry:
/// offset (u16) + len (u16) + kind (u8) + 3 reserved bytes = 8 bytes.
const WAL_PAGE_SLOT_LEN: usize = 2 + 2 + 1 + 3;
13
14/// Header for a WalPage, containing metadata about the page itself.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub struct WalPageHeader {
17    pub magic: u32,
18    pub version: u16,
19    pub flags: u16,
20    pub prev_page_lsn: Lsn,
21    pub payload_size: u16,
22    pub slot_count: u16,
23}
24
25impl WalPageHeader {
26    fn encode(&self, buf: &mut [u8]) {
27        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
28        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
29        buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
30        buf[8..16].copy_from_slice(&self.prev_page_lsn.to_le_bytes());
31        buf[16..18].copy_from_slice(&self.payload_size.to_le_bytes());
32        buf[18..20].copy_from_slice(&self.slot_count.to_le_bytes());
33    }
34
35    fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
36        if bytes.len() < WAL_PAGE_HEADER_LEN {
37            return Err(QuillSQLError::Internal(
38                "WAL page truncated before header".to_string(),
39            ));
40        }
41        let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
42        let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
43        let flags = u16::from_le_bytes(bytes[6..8].try_into().unwrap());
44        let prev_page_lsn = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
45        let payload_size = u16::from_le_bytes(bytes[16..18].try_into().unwrap());
46        let slot_count = u16::from_le_bytes(bytes[18..20].try_into().unwrap());
47        Ok(Self {
48            magic,
49            version,
50            flags,
51            prev_page_lsn,
52            payload_size,
53            slot_count,
54        })
55    }
56}
57
/// Role a fragment plays within the record it belongs to.
///
/// A record that fits on one page is stored as a single `Complete` fragment.
/// A record split across pages is stored as a `Start` fragment, zero or more
/// `Middle` fragments, and a final `End` fragment, in page order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalPageFragmentKind {
    /// The fragment holds the entire record.
    Complete,
    /// First piece of a record that continues on a later page.
    Start,
    /// Interior piece: the record began earlier and continues later.
    Middle,
    /// Final piece of a record that began on an earlier page.
    End,
}
67
68impl WalPageFragmentKind {
69    fn to_byte(self) -> u8 {
70        match self {
71            WalPageFragmentKind::Complete => 0,
72            WalPageFragmentKind::Start => 1,
73            WalPageFragmentKind::Middle => 2,
74            WalPageFragmentKind::End => 3,
75        }
76    }
77
78    fn from_byte(value: u8) -> QuillSQLResult<Self> {
79        match value {
80            0 => Ok(WalPageFragmentKind::Complete),
81            1 => Ok(WalPageFragmentKind::Start),
82            2 => Ok(WalPageFragmentKind::Middle),
83            3 => Ok(WalPageFragmentKind::End),
84            other => Err(QuillSQLError::Internal(format!(
85                "Unknown WAL page fragment kind: {}",
86                other
87            ))),
88        }
89    }
90}
91
92/// A slot in the WalPage's slot directory. It points to a specific
93/// fragment within the page's payload area.
94#[derive(Debug, Clone, Copy)]
95pub struct WalPageSlot {
96    pub offset: u16,
97    pub len: u16,
98    pub kind: WalPageFragmentKind,
99}
100
101impl WalPageSlot {
102    fn encode(&self) -> [u8; WAL_PAGE_SLOT_LEN] {
103        let mut buf = [0u8; WAL_PAGE_SLOT_LEN];
104        buf[0..2].copy_from_slice(&self.offset.to_le_bytes());
105        buf[2..4].copy_from_slice(&self.len.to_le_bytes());
106        buf[4] = self.kind.to_byte();
107        buf
108    }
109
110    fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
111        if bytes.len() < WAL_PAGE_SLOT_LEN {
112            return Err(QuillSQLError::Internal(
113                "WAL page truncated before slot".to_string(),
114            ));
115        }
116        let offset = u16::from_le_bytes(bytes[0..2].try_into().unwrap());
117        let len = u16::from_le_bytes(bytes[2..4].try_into().unwrap());
118        let kind = WalPageFragmentKind::from_byte(bytes[4])?;
119        Ok(Self { offset, len, kind })
120    }
121}
122
123/// Represents a WalRecord that is being carried over to the next WalPage
124/// because it could not fit entirely in the previous one.
125#[derive(Clone)]
126pub struct WalFrameContinuation {
127    pub record: WalRecord,
128    pub offset: usize,
129}
130
131impl WalFrameContinuation {
132    fn remaining(&self) -> usize {
133        self.record.payload.len().saturating_sub(self.offset)
134    }
135}
136
137/// A WalPage is a fixed-size (e.g., 4KB) block within a WAL segment file.
138/// It contains a header, a payload area for raw data, and a slot directory
139/// that describes the fragments packed into the payload.
140///
141/// The payload and slot directory grow towards each other from opposite ends of the page.
142///
143/// WalPage On-disk Layout (4KB):
144/// ------------------------------------------------------------------------------------------
145/// | WalPageHeader | Payload Data (grows forward) | ... Free Space ... | Slot Array (grows backward) |
146/// ------------------------------------------------------------------------------------------
147///
148/// WalPageHeader (20 bytes):
149/// ------------------------------------------------------------------------------------------
150/// | Magic (u32) | Ver (u16) | Flags (u16) | PrevPageLSN (u64) | PayloadSize (u16) | SlotCount (u16) |
151/// ------------------------------------------------------------------------------------------
152///
153/// WalPageSlot (8 bytes):
154/// -----------------------------------------------------------------
155/// | Offset (u16) | Length (u16) | Kind (u8) | Reserved (3) |
156/// -----------------------------------------------------------------
157/// - Offset/Length: Point to a fragment within this page's Payload Data.
158/// - Kind: Indicates if the fragment is a Complete, Start, Middle, or End piece of a WalFrame.
159#[derive(Clone)]
160pub struct WalPage {
161    header: WalPageHeader,
162    payload: Vec<u8>,
163    slots: Vec<WalPageSlot>,
164    full: bool,
165    continuation: Option<WalFrameContinuation>,
166    last_end_lsn: Option<Lsn>,
167}
168
169impl WalPage {
170    pub fn pack_frames(
171        prev_page_lsn: Lsn,
172        frames: Vec<WalRecord>,
173        carry: Option<WalFrameContinuation>,
174    ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
175        let queue: VecDeque<WalRecord> = frames.into();
176        let (payload, slots, leftover, continuation, last_end_lsn, full) =
177            Self::fill_page(Vec::new(), Vec::new(), queue, carry, None);
178
179        let header = WalPageHeader {
180            magic: WAL_PAGE_MAGIC,
181            version: WAL_PAGE_VERSION,
182            flags: 0,
183            prev_page_lsn,
184            payload_size: payload.len() as u16,
185            slot_count: slots.len() as u16,
186        };
187
188        (
189            Self {
190                header,
191                payload,
192                slots,
193                full,
194                continuation: continuation.clone(),
195                last_end_lsn,
196            },
197            leftover,
198            continuation,
199        )
200    }
201
202    pub fn continue_pack(
203        mut self,
204        frames: Vec<WalRecord>,
205    ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
206        let queue: VecDeque<WalRecord> = frames.into();
207        let (payload, slots, leftover, continuation, last_end_lsn, full) = Self::fill_page(
208            self.payload,
209            self.slots,
210            queue,
211            self.continuation.take(),
212            self.last_end_lsn,
213        );
214
215        self.payload = payload;
216        self.slots = slots;
217        self.continuation = continuation.clone();
218        self.last_end_lsn = last_end_lsn;
219        self.full = full;
220        self.header.payload_size = self.payload.len() as u16;
221        self.header.slot_count = self.slots.len() as u16;
222
223        (self, leftover, continuation)
224    }
225
226    fn fill_page(
227        mut payload: Vec<u8>,
228        mut slots: Vec<WalPageSlot>,
229        mut queue: VecDeque<WalRecord>,
230        mut continuation: Option<WalFrameContinuation>,
231        mut last_end_lsn: Option<Lsn>,
232    ) -> (
233        Vec<u8>,
234        Vec<WalPageSlot>,
235        Vec<WalRecord>,
236        Option<WalFrameContinuation>,
237        Option<Lsn>,
238        bool,
239    ) {
240        loop {
241            if continuation.is_none() && queue.is_empty() {
242                break;
243            }
244
245            let available = Self::available_bytes(payload.len(), slots.len());
246            if available == 0 {
247                break;
248            }
249
250            if let Some(mut cont) = continuation.take() {
251                if cont.remaining() == 0 {
252                    last_end_lsn = Some(cont.record.end_lsn);
253                    continue;
254                }
255                let take = available.min(cont.remaining());
256                if take == 0 {
257                    continuation = Some(cont);
258                    break;
259                }
260                let start = payload.len();
261                payload.extend_from_slice(&cont.record.payload[cont.offset..cont.offset + take]);
262                let kind = if cont.offset == 0 {
263                    if cont.offset + take == cont.record.payload.len() {
264                        WalPageFragmentKind::Complete
265                    } else {
266                        WalPageFragmentKind::Start
267                    }
268                } else if cont.offset + take == cont.record.payload.len() {
269                    WalPageFragmentKind::End
270                } else {
271                    WalPageFragmentKind::Middle
272                };
273                slots.push(WalPageSlot {
274                    offset: start as u16,
275                    len: take as u16,
276                    kind,
277                });
278                cont.offset += take;
279                if cont.offset == cont.record.payload.len() {
280                    last_end_lsn = Some(cont.record.end_lsn);
281                    continuation = None;
282                } else {
283                    continuation = Some(cont);
284                }
285                continue;
286            }
287
288            if let Some(record) = queue.pop_front() {
289                continuation = Some(WalFrameContinuation { record, offset: 0 });
290                continue;
291            }
292        }
293
294        let leftover: Vec<WalRecord> = queue.into_iter().collect();
295        let full = Self::available_for_next(payload.len(), slots.len()) == 0;
296
297        (payload, slots, leftover, continuation, last_end_lsn, full)
298    }
299
300    pub fn unpack_frames(bytes: &[u8]) -> QuillSQLResult<Self> {
301        if bytes.len() < WAL_PAGE_SIZE {
302            return Err(QuillSQLError::Internal(
303                "WAL page truncated before full page".to_string(),
304            ));
305        }
306        if bytes.iter().all(|&b| b == 0) {
307            return Ok(Self::empty());
308        }
309
310        let header = WalPageHeader::decode(&bytes[..WAL_PAGE_HEADER_LEN])?;
311        if header.magic != WAL_PAGE_MAGIC {
312            return Err(QuillSQLError::Internal(format!(
313                "Invalid WAL page magic: {:x}",
314                header.magic
315            )));
316        }
317        if header.version != WAL_PAGE_VERSION {
318            return Err(QuillSQLError::Internal(format!(
319                "Unsupported WAL page version: {}",
320                header.version
321            )));
322        }
323
324        let payload_end = WAL_PAGE_HEADER_LEN + header.payload_size as usize;
325        if payload_end > WAL_PAGE_SIZE {
326            return Err(QuillSQLError::Internal(
327                "WAL page payload exceeds page size".to_string(),
328            ));
329        }
330        let dir_start = WAL_PAGE_SIZE
331            .checked_sub(header.slot_count as usize * WAL_PAGE_SLOT_LEN)
332            .ok_or_else(|| {
333                QuillSQLError::Internal("WAL page directory exceeds page size".to_string())
334            })?;
335        if dir_start < payload_end {
336            return Err(QuillSQLError::Internal(
337                "WAL page directory overlaps payload".to_string(),
338            ));
339        }
340
341        let payload = bytes[WAL_PAGE_HEADER_LEN..payload_end].to_vec();
342        let mut slots = Vec::with_capacity(header.slot_count as usize);
343        let mut cursor = dir_start;
344        for _ in 0..header.slot_count {
345            let slot = WalPageSlot::decode(&bytes[cursor..cursor + WAL_PAGE_SLOT_LEN])?;
346            if slot.offset as usize + slot.len as usize > payload.len() {
347                return Err(QuillSQLError::Internal(
348                    "WAL page slot exceeds payload".to_string(),
349                ));
350            }
351            slots.push(slot);
352            cursor += WAL_PAGE_SLOT_LEN;
353        }
354
355        let full = Self::available_for_next(payload.len(), slots.len()) == 0;
356        Ok(Self {
357            header,
358            payload,
359            slots,
360            full,
361            continuation: None,
362            last_end_lsn: None,
363        })
364    }
365
366    pub fn to_bytes(&self) -> Vec<u8> {
367        let mut buf = vec![0u8; WAL_PAGE_SIZE];
368        self.header.encode(&mut buf[..WAL_PAGE_HEADER_LEN]);
369        buf[WAL_PAGE_HEADER_LEN..WAL_PAGE_HEADER_LEN + self.payload.len()]
370            .copy_from_slice(&self.payload);
371        let dir_start = WAL_PAGE_SIZE - self.slots.len() * WAL_PAGE_SLOT_LEN;
372        let mut cursor = dir_start;
373        for slot in &self.slots {
374            let encoded = slot.encode();
375            buf[cursor..cursor + WAL_PAGE_SLOT_LEN].copy_from_slice(&encoded);
376            cursor += WAL_PAGE_SLOT_LEN;
377        }
378        buf
379    }
380
381    pub fn fragments(&self) -> &[WalPageSlot] {
382        &self.slots
383    }
384
385    pub fn payload(&self) -> &[u8] {
386        &self.payload
387    }
388
389    pub fn is_full(&self) -> bool {
390        self.full
391    }
392
393    pub fn has_payload(&self) -> bool {
394        !self.payload.is_empty() || !self.slots.is_empty()
395    }
396
397    pub fn last_end_lsn(&self) -> Option<Lsn> {
398        self.last_end_lsn
399    }
400
401    pub fn continuation(&self) -> Option<&WalFrameContinuation> {
402        self.continuation.as_ref()
403    }
404
405    pub fn prev_page_lsn(&self) -> Lsn {
406        self.header.prev_page_lsn
407    }
408
409    fn empty() -> Self {
410        Self {
411            header: WalPageHeader {
412                magic: WAL_PAGE_MAGIC,
413                version: WAL_PAGE_VERSION,
414                flags: 0,
415                prev_page_lsn: 0,
416                payload_size: 0,
417                slot_count: 0,
418            },
419            payload: Vec::new(),
420            slots: Vec::new(),
421            full: false,
422            continuation: None,
423            last_end_lsn: None,
424        }
425    }
426
427    fn available_bytes(payload_len: usize, slot_count: usize) -> usize {
428        WAL_PAGE_SIZE
429            .saturating_sub(WAL_PAGE_HEADER_LEN)
430            .saturating_sub(payload_len)
431            .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
432    }
433
434    fn available_for_next(payload_len: usize, slot_count: usize) -> usize {
435        WAL_PAGE_SIZE
436            .saturating_sub(WAL_PAGE_HEADER_LEN)
437            .saturating_sub(payload_len)
438            .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
439    }
440}
441
442impl Default for WalPage {
443    fn default() -> Self {
444        Self::empty()
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::wal::codec::{decode_frame, encode_frame};
    use crate::recovery::wal_record::{
        PageWritePayload, TransactionPayload, TransactionRecordKind, WalFrame, WalRecordPayload,
        WAL_CRC_LEN, WAL_HEADER_LEN,
    };
    use bytes::Bytes;

    /// Encodes `payload` as a frame at `start` and wraps it in a record whose
    /// end LSN is `start` plus the encoded frame length.
    fn build_record(start: Lsn, prev: Lsn, payload: &WalRecordPayload) -> WalRecord {
        let encoded = encode_frame(start, prev, payload);
        let end_lsn = start + encoded.len() as u64;
        WalRecord {
            start_lsn: start,
            end_lsn,
            payload: Bytes::from(encoded),
        }
    }

    /// A small `Begin` transaction record for `txn`.
    fn make_transaction_record(start: Lsn, prev: Lsn, txn: u64) -> WalRecord {
        let body = WalRecordPayload::Transaction(TransactionPayload {
            marker: TransactionRecordKind::Begin,
            txn_id: txn,
        });
        build_record(start, prev, &body)
    }

    /// Builds `count` back-to-back `Begin` records. Each starts where the
    /// previous one ended and uses the previous record's start LSN as `prev`.
    fn chained_records(count: u64) -> Vec<WalRecord> {
        let mut chain = Vec::new();
        let (mut next_start, mut last_start) = (0, 0);
        for txn in 0..count {
            let record = make_transaction_record(next_start, last_start, txn);
            last_start = record.start_lsn;
            next_start = record.end_lsn;
            chain.push(record);
        }
        chain
    }

    /// Reassembles fragments across `pages`, in order, and decodes every frame.
    fn decode_pages(pages: &[WalPage]) -> Vec<WalFrame> {
        let mut decoded = Vec::new();
        let mut pending: Vec<u8> = Vec::new();
        for page in pages {
            let payload = page.payload();
            for slot in page.fragments() {
                let begin = slot.offset as usize;
                let piece = &payload[begin..begin + slot.len as usize];
                match slot.kind {
                    WalPageFragmentKind::Complete => {
                        pending.clear();
                        decoded.push(decode_frame(piece).expect("frame").0);
                    }
                    WalPageFragmentKind::Start => {
                        pending.clear();
                        pending.extend_from_slice(piece);
                    }
                    WalPageFragmentKind::Middle => {
                        assert!(!pending.is_empty());
                        pending.extend_from_slice(piece);
                    }
                    WalPageFragmentKind::End => {
                        assert!(!pending.is_empty());
                        pending.extend_from_slice(piece);
                        decoded.push(decode_frame(&pending).expect("frame").0);
                        pending.clear();
                    }
                }
            }
        }
        decoded
    }

    #[test]
    fn pack_single_page_roundtrip() {
        let records = chained_records(8);

        let (page, leftover, carry) = WalPage::pack_frames(0, records.clone(), None);
        assert!(leftover.is_empty());
        assert!(carry.is_none());
        assert!(page.has_payload());

        let reread = WalPage::unpack_frames(&page.to_bytes()).expect("unpack");
        let frames = decode_pages(&[reread]);
        assert_eq!(frames.len(), records.len());
        for (frame, record) in frames.iter().zip(&records) {
            assert_eq!(frame.lsn, record.start_lsn);
        }
    }

    #[test]
    fn pack_multiple_pages() {
        // Enough records to span multiple pages.
        let records = chained_records(128);

        let mut pages = Vec::new();
        let mut pending = records.clone();
        let mut carry = None;
        let mut prev_page_lsn = 0;
        while !pending.is_empty() || carry.is_some() {
            let (page, leftover, next_carry) =
                WalPage::pack_frames(prev_page_lsn, pending, carry);
            if page.has_payload() {
                prev_page_lsn = page.last_end_lsn().unwrap_or(prev_page_lsn);
                pages.push(page);
            }
            pending = leftover;
            carry = next_carry;
        }

        assert!(pages.len() > 1);
        let frames = decode_pages(&pages);
        assert_eq!(frames.len(), records.len());
        for (frame, record) in frames.iter().zip(&records) {
            assert_eq!(frame.lsn, record.start_lsn);
        }
    }

    #[test]
    fn pack_cross_page_frame() {
        let write = WalRecordPayload::PageWrite(PageWritePayload {
            page_id: 1,
            prev_page_lsn: 0,
            page_image: vec![7u8; 4096],
        });
        let record = build_record(0, 0, &write);

        let mut pages = Vec::new();
        let mut pending = vec![record.clone()];
        let mut carry = None;
        let mut prev_page_lsn = 0;
        while !pending.is_empty() || carry.is_some() {
            let (page, leftover, next_carry) =
                WalPage::pack_frames(prev_page_lsn, pending, carry);
            assert!(page.has_payload());
            prev_page_lsn = page.last_end_lsn().unwrap_or(prev_page_lsn);
            pages.push(page);
            pending = leftover;
            carry = next_carry;
        }

        assert!(pages.len() >= 2);
        let frames = decode_pages(&pages);
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].lsn, record.start_lsn);
        assert_eq!(
            frames[0].body.len(),
            record.payload.len() - WAL_HEADER_LEN - WAL_CRC_LEN
        );
    }
}