1use std::collections::VecDeque;
2
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::recovery::{Lsn, WalRecord};
5
/// Size in bytes of one serialized WAL page.
pub const WAL_PAGE_SIZE: usize = 4096;
/// Marker stored in the first header field of every page; decoding rejects any
/// other value. Read most-significant byte first it spells ASCII `QWPG`.
const WAL_PAGE_MAGIC: u32 = 0x5157_5047;
/// The only page layout version this module writes and accepts.
const WAL_PAGE_VERSION: u16 = 1;

/// Encoded size of the page header: the sum of its fixed-width fields (20 bytes).
const WAL_PAGE_HEADER_LEN: usize = std::mem::size_of::<u32>() // magic
    + std::mem::size_of::<u16>() // version
    + std::mem::size_of::<u16>() // flags
    + std::mem::size_of::<u64>() // prev_page_lsn
    + std::mem::size_of::<u16>() // payload_size
    + std::mem::size_of::<u16>(); // slot_count
/// Encoded size of one slot directory entry.
const WAL_PAGE_SLOT_LEN: usize = 8;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub struct WalPageHeader {
16 pub magic: u32,
17 pub version: u16,
18 pub flags: u16,
19 pub prev_page_lsn: Lsn,
20 pub payload_size: u16,
21 pub slot_count: u16,
22}
23
24impl WalPageHeader {
25 fn encode(&self, buf: &mut [u8]) {
26 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
27 buf[4..6].copy_from_slice(&self.version.to_le_bytes());
28 buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
29 buf[8..16].copy_from_slice(&self.prev_page_lsn.to_le_bytes());
30 buf[16..18].copy_from_slice(&self.payload_size.to_le_bytes());
31 buf[18..20].copy_from_slice(&self.slot_count.to_le_bytes());
32 }
33
34 fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
35 if bytes.len() < WAL_PAGE_HEADER_LEN {
36 return Err(QuillSQLError::Internal(
37 "WAL page truncated before header".to_string(),
38 ));
39 }
40 let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
41 let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
42 let flags = u16::from_le_bytes(bytes[6..8].try_into().unwrap());
43 let prev_page_lsn = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
44 let payload_size = u16::from_le_bytes(bytes[16..18].try_into().unwrap());
45 let slot_count = u16::from_le_bytes(bytes[18..20].try_into().unwrap());
46 Ok(Self {
47 magic,
48 version,
49 flags,
50 prev_page_lsn,
51 payload_size,
52 slot_count,
53 })
54 }
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum WalPageFragmentKind {
61 Complete,
62 Start,
63 Middle,
64 End,
65}
66
67impl WalPageFragmentKind {
68 fn to_byte(self) -> u8 {
69 match self {
70 WalPageFragmentKind::Complete => 0,
71 WalPageFragmentKind::Start => 1,
72 WalPageFragmentKind::Middle => 2,
73 WalPageFragmentKind::End => 3,
74 }
75 }
76
77 fn from_byte(value: u8) -> QuillSQLResult<Self> {
78 match value {
79 0 => Ok(WalPageFragmentKind::Complete),
80 1 => Ok(WalPageFragmentKind::Start),
81 2 => Ok(WalPageFragmentKind::Middle),
82 3 => Ok(WalPageFragmentKind::End),
83 other => Err(QuillSQLError::Internal(format!(
84 "Unknown WAL page fragment kind: {}",
85 other
86 ))),
87 }
88 }
89}
90
91#[derive(Debug, Clone, Copy)]
94pub struct WalPageSlot {
95 pub offset: u16,
96 pub len: u16,
97 pub kind: WalPageFragmentKind,
98}
99
100impl WalPageSlot {
101 fn encode(&self) -> [u8; WAL_PAGE_SLOT_LEN] {
102 let mut buf = [0u8; WAL_PAGE_SLOT_LEN];
103 buf[0..2].copy_from_slice(&self.offset.to_le_bytes());
104 buf[2..4].copy_from_slice(&self.len.to_le_bytes());
105 buf[4] = self.kind.to_byte();
106 buf
107 }
108
109 fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
110 if bytes.len() < WAL_PAGE_SLOT_LEN {
111 return Err(QuillSQLError::Internal(
112 "WAL page truncated before slot".to_string(),
113 ));
114 }
115 let offset = u16::from_le_bytes(bytes[0..2].try_into().unwrap());
116 let len = u16::from_le_bytes(bytes[2..4].try_into().unwrap());
117 let kind = WalPageFragmentKind::from_byte(bytes[4])?;
118 Ok(Self { offset, len, kind })
119 }
120}
121
122#[derive(Clone)]
125pub struct WalFrameContinuation {
126 pub record: WalRecord,
127 pub offset: usize,
128}
129
130impl WalFrameContinuation {
131 fn remaining(&self) -> usize {
132 self.record.payload.len().saturating_sub(self.offset)
133 }
134}
135
136#[derive(Clone)]
159pub struct WalPage {
160 header: WalPageHeader,
161 payload: Vec<u8>,
162 slots: Vec<WalPageSlot>,
163 full: bool,
164 continuation: Option<WalFrameContinuation>,
165 last_end_lsn: Option<Lsn>,
166}
167
168impl WalPage {
169 pub fn pack_frames(
170 prev_page_lsn: Lsn,
171 frames: Vec<WalRecord>,
172 carry: Option<WalFrameContinuation>,
173 ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
174 let queue: VecDeque<WalRecord> = frames.into();
175 let (payload, slots, leftover, continuation, last_end_lsn, full) =
176 Self::fill_page(Vec::new(), Vec::new(), queue, carry, None);
177
178 let header = WalPageHeader {
179 magic: WAL_PAGE_MAGIC,
180 version: WAL_PAGE_VERSION,
181 flags: 0,
182 prev_page_lsn,
183 payload_size: payload.len() as u16,
184 slot_count: slots.len() as u16,
185 };
186
187 (
188 Self {
189 header,
190 payload,
191 slots,
192 full,
193 continuation: continuation.clone(),
194 last_end_lsn,
195 },
196 leftover,
197 continuation,
198 )
199 }
200
201 pub fn continue_pack(
202 mut self,
203 frames: Vec<WalRecord>,
204 ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
205 let queue: VecDeque<WalRecord> = frames.into();
206 let (payload, slots, leftover, continuation, last_end_lsn, full) = Self::fill_page(
207 self.payload,
208 self.slots,
209 queue,
210 self.continuation.take(),
211 self.last_end_lsn,
212 );
213
214 self.payload = payload;
215 self.slots = slots;
216 self.continuation = continuation.clone();
217 self.last_end_lsn = last_end_lsn;
218 self.full = full;
219 self.header.payload_size = self.payload.len() as u16;
220 self.header.slot_count = self.slots.len() as u16;
221
222 (self, leftover, continuation)
223 }
224
225 fn fill_page(
226 mut payload: Vec<u8>,
227 mut slots: Vec<WalPageSlot>,
228 mut queue: VecDeque<WalRecord>,
229 mut continuation: Option<WalFrameContinuation>,
230 mut last_end_lsn: Option<Lsn>,
231 ) -> (
232 Vec<u8>,
233 Vec<WalPageSlot>,
234 Vec<WalRecord>,
235 Option<WalFrameContinuation>,
236 Option<Lsn>,
237 bool,
238 ) {
239 loop {
240 if continuation.is_none() && queue.is_empty() {
241 break;
242 }
243
244 let available = Self::available_bytes(payload.len(), slots.len());
245 if available == 0 {
246 break;
247 }
248
249 if let Some(mut cont) = continuation.take() {
250 if cont.remaining() == 0 {
251 last_end_lsn = Some(cont.record.end_lsn);
252 continue;
253 }
254 let take = available.min(cont.remaining());
255 if take == 0 {
256 continuation = Some(cont);
257 break;
258 }
259 let start = payload.len();
260 payload.extend_from_slice(&cont.record.payload[cont.offset..cont.offset + take]);
261 let kind = if cont.offset == 0 {
262 if cont.offset + take == cont.record.payload.len() {
263 WalPageFragmentKind::Complete
264 } else {
265 WalPageFragmentKind::Start
266 }
267 } else if cont.offset + take == cont.record.payload.len() {
268 WalPageFragmentKind::End
269 } else {
270 WalPageFragmentKind::Middle
271 };
272 slots.push(WalPageSlot {
273 offset: start as u16,
274 len: take as u16,
275 kind,
276 });
277 cont.offset += take;
278 if cont.offset == cont.record.payload.len() {
279 last_end_lsn = Some(cont.record.end_lsn);
280 continuation = None;
281 } else {
282 continuation = Some(cont);
283 }
284 continue;
285 }
286
287 if let Some(record) = queue.pop_front() {
288 continuation = Some(WalFrameContinuation { record, offset: 0 });
289 continue;
290 }
291 }
292
293 let leftover: Vec<WalRecord> = queue.into_iter().collect();
294 let full = Self::available_for_next(payload.len(), slots.len()) == 0;
295
296 (payload, slots, leftover, continuation, last_end_lsn, full)
297 }
298
299 pub fn unpack_frames(bytes: &[u8]) -> QuillSQLResult<Self> {
300 if bytes.len() < WAL_PAGE_SIZE {
301 return Err(QuillSQLError::Internal(
302 "WAL page truncated before full page".to_string(),
303 ));
304 }
305 if bytes.iter().all(|&b| b == 0) {
306 return Ok(Self::empty());
307 }
308
309 let header = WalPageHeader::decode(&bytes[..WAL_PAGE_HEADER_LEN])?;
310 if header.magic != WAL_PAGE_MAGIC {
311 return Err(QuillSQLError::Internal(format!(
312 "Invalid WAL page magic: {:x}",
313 header.magic
314 )));
315 }
316 if header.version != WAL_PAGE_VERSION {
317 return Err(QuillSQLError::Internal(format!(
318 "Unsupported WAL page version: {}",
319 header.version
320 )));
321 }
322
323 let payload_end = WAL_PAGE_HEADER_LEN + header.payload_size as usize;
324 if payload_end > WAL_PAGE_SIZE {
325 return Err(QuillSQLError::Internal(
326 "WAL page payload exceeds page size".to_string(),
327 ));
328 }
329 let dir_start = WAL_PAGE_SIZE
330 .checked_sub(header.slot_count as usize * WAL_PAGE_SLOT_LEN)
331 .ok_or_else(|| {
332 QuillSQLError::Internal("WAL page directory exceeds page size".to_string())
333 })?;
334 if dir_start < payload_end {
335 return Err(QuillSQLError::Internal(
336 "WAL page directory overlaps payload".to_string(),
337 ));
338 }
339
340 let payload = bytes[WAL_PAGE_HEADER_LEN..payload_end].to_vec();
341 let mut slots = Vec::with_capacity(header.slot_count as usize);
342 let mut cursor = dir_start;
343 for _ in 0..header.slot_count {
344 let slot = WalPageSlot::decode(&bytes[cursor..cursor + WAL_PAGE_SLOT_LEN])?;
345 if slot.offset as usize + slot.len as usize > payload.len() {
346 return Err(QuillSQLError::Internal(
347 "WAL page slot exceeds payload".to_string(),
348 ));
349 }
350 slots.push(slot);
351 cursor += WAL_PAGE_SLOT_LEN;
352 }
353
354 let full = Self::available_for_next(payload.len(), slots.len()) == 0;
355 Ok(Self {
356 header,
357 payload,
358 slots,
359 full,
360 continuation: None,
361 last_end_lsn: None,
362 })
363 }
364
365 pub fn to_bytes(&self) -> Vec<u8> {
366 let mut buf = vec![0u8; WAL_PAGE_SIZE];
367 self.header.encode(&mut buf[..WAL_PAGE_HEADER_LEN]);
368 buf[WAL_PAGE_HEADER_LEN..WAL_PAGE_HEADER_LEN + self.payload.len()]
369 .copy_from_slice(&self.payload);
370 let dir_start = WAL_PAGE_SIZE - self.slots.len() * WAL_PAGE_SLOT_LEN;
371 let mut cursor = dir_start;
372 for slot in &self.slots {
373 let encoded = slot.encode();
374 buf[cursor..cursor + WAL_PAGE_SLOT_LEN].copy_from_slice(&encoded);
375 cursor += WAL_PAGE_SLOT_LEN;
376 }
377 buf
378 }
379
380 pub fn fragments(&self) -> &[WalPageSlot] {
381 &self.slots
382 }
383
384 pub fn payload(&self) -> &[u8] {
385 &self.payload
386 }
387
388 pub fn is_full(&self) -> bool {
389 self.full
390 }
391
392 pub fn has_payload(&self) -> bool {
393 !self.payload.is_empty() || !self.slots.is_empty()
394 }
395
396 pub fn last_end_lsn(&self) -> Option<Lsn> {
397 self.last_end_lsn
398 }
399
400 pub fn continuation(&self) -> Option<&WalFrameContinuation> {
401 self.continuation.as_ref()
402 }
403
404 pub fn prev_page_lsn(&self) -> Lsn {
405 self.header.prev_page_lsn
406 }
407
408 fn empty() -> Self {
409 Self {
410 header: WalPageHeader {
411 magic: WAL_PAGE_MAGIC,
412 version: WAL_PAGE_VERSION,
413 flags: 0,
414 prev_page_lsn: 0,
415 payload_size: 0,
416 slot_count: 0,
417 },
418 payload: Vec::new(),
419 slots: Vec::new(),
420 full: false,
421 continuation: None,
422 last_end_lsn: None,
423 }
424 }
425
426 fn available_bytes(payload_len: usize, slot_count: usize) -> usize {
427 WAL_PAGE_SIZE
428 .saturating_sub(WAL_PAGE_HEADER_LEN)
429 .saturating_sub(payload_len)
430 .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
431 }
432
433 fn available_for_next(payload_len: usize, slot_count: usize) -> usize {
434 WAL_PAGE_SIZE
435 .saturating_sub(WAL_PAGE_HEADER_LEN)
436 .saturating_sub(payload_len)
437 .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
438 }
439}
440
441impl Default for WalPage {
442 fn default() -> Self {
443 Self::empty()
444 }
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::wal::codec::{decode_frame, encode_frame};
    use crate::recovery::wal_record::{
        PageWritePayload, TransactionPayload, TransactionRecordKind, WalFrame, WalRecordPayload,
        WAL_CRC_LEN, WAL_HEADER_LEN,
    };
    use bytes::Bytes;

    /// Encodes `payload` as a frame starting at `start` and wraps it in a
    /// record whose `end_lsn` is `start` plus the encoded length.
    fn build_record(start: Lsn, prev: Lsn, payload: &WalRecordPayload) -> WalRecord {
        let encoded = encode_frame(start, prev, payload);
        WalRecord {
            start_lsn: start,
            end_lsn: start + encoded.len() as u64,
            payload: Bytes::from(encoded),
        }
    }

    /// Builds a `Begin` transaction record for `txn`.
    fn make_transaction_record(start: Lsn, prev: Lsn, txn: u64) -> WalRecord {
        let begin = TransactionPayload {
            marker: TransactionRecordKind::Begin,
            txn_id: txn,
        };
        build_record(start, prev, &WalRecordPayload::Transaction(begin))
    }

    /// Builds `count` back-to-back transaction records: each starts where the
    /// previous one ended and names the previous start as its `prev`.
    fn transaction_chain(count: u64) -> Vec<WalRecord> {
        let mut chain = Vec::new();
        let mut next_start: Lsn = 0;
        let mut prev_start: Lsn = 0;
        for txn in 0..count {
            let record = make_transaction_record(next_start, prev_start, txn);
            prev_start = record.start_lsn;
            next_start = record.end_lsn;
            chain.push(record);
        }
        chain
    }

    /// Walks every slot of every page in order, stitching split fragments
    /// back together, and decodes each completed frame.
    fn decode_pages(pages: &[WalPage]) -> Vec<WalFrame> {
        let mut decoded = Vec::new();
        let mut partial: Vec<u8> = Vec::new();
        for page in pages {
            let bytes = page.payload();
            for slot in page.fragments() {
                let begin = usize::from(slot.offset);
                let piece = &bytes[begin..begin + usize::from(slot.len)];
                match slot.kind {
                    WalPageFragmentKind::Complete => {
                        partial.clear();
                        let (frame, _) = decode_frame(piece).expect("frame");
                        decoded.push(frame);
                    }
                    WalPageFragmentKind::Start => {
                        partial.clear();
                        partial.extend_from_slice(piece);
                    }
                    WalPageFragmentKind::Middle => {
                        assert!(!partial.is_empty());
                        partial.extend_from_slice(piece);
                    }
                    WalPageFragmentKind::End => {
                        assert!(!partial.is_empty());
                        partial.extend_from_slice(piece);
                        let (frame, _) = decode_frame(&partial).expect("frame");
                        decoded.push(frame);
                        partial.clear();
                    }
                }
            }
        }
        decoded
    }

    /// Asserts that `frames` and `records` pair up one-to-one by start LSN.
    fn assert_lsns_match(frames: &[WalFrame], records: &[WalRecord]) {
        assert_eq!(frames.len(), records.len());
        for (frame, record) in frames.iter().zip(records) {
            assert_eq!(frame.lsn, record.start_lsn);
        }
    }

    #[test]
    fn pack_single_page_roundtrip() {
        let records = transaction_chain(8);

        let (page, leftover, carry) = WalPage::pack_frames(0, records.clone(), None);
        assert!(leftover.is_empty());
        assert!(carry.is_none());
        assert!(page.has_payload());

        let serialized = page.to_bytes();
        let reloaded = WalPage::unpack_frames(&serialized).expect("unpack");
        let frames = decode_pages(&[reloaded]);
        assert_lsns_match(&frames, &records);
    }

    #[test]
    fn pack_multiple_pages() {
        let records = transaction_chain(128);

        let mut pending = records.clone();
        let mut page_lsn = 0;
        let mut carry = None;
        let mut pages = Vec::new();
        loop {
            if pending.is_empty() && carry.is_none() {
                break;
            }
            let (page, rest, next_carry) = WalPage::pack_frames(page_lsn, pending, carry);
            if page.has_payload() {
                page_lsn = page.last_end_lsn().unwrap_or(page_lsn);
                pages.push(page);
            }
            pending = rest;
            carry = next_carry;
        }

        assert!(pages.len() > 1);
        let frames = decode_pages(&pages);
        assert_lsns_match(&frames, &records);
    }

    #[test]
    fn pack_cross_page_frame() {
        // A full page image cannot fit in a single WAL page once framed.
        let image = WalRecordPayload::PageWrite(PageWritePayload {
            page_id: 1,
            prev_page_lsn: 0,
            page_image: vec![7u8; 4096],
        });
        let record = build_record(0, 0, &image);

        let mut pages = Vec::new();
        let mut pending = vec![record.clone()];
        let mut page_lsn = 0;
        let mut carry = None;
        loop {
            if pending.is_empty() && carry.is_none() {
                break;
            }
            let (page, rest, next_carry) = WalPage::pack_frames(page_lsn, pending, carry);
            assert!(page.has_payload());
            page_lsn = page.last_end_lsn().unwrap_or(page_lsn);
            pages.push(page);
            pending = rest;
            carry = next_carry;
        }

        assert!(pages.len() >= 2);
        let frames = decode_pages(&pages);
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].lsn, record.start_lsn);
        assert_eq!(
            frames[0].body.len(),
            record.payload.len() - WAL_HEADER_LEN - WAL_CRC_LEN
        );
    }
}
603}