1use std::collections::VecDeque;
2
3use super::record::WalRecord;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::recovery::Lsn;
6
/// Total size in bytes of one encoded WAL page.
pub const WAL_PAGE_SIZE: usize = 4096;

/// Magic number written at the start of every page header
/// (`QWPG` in ASCII, most significant byte first).
const WAL_PAGE_MAGIC: u32 = 0x5157_5047;

/// Page layout version written into the header.
const WAL_PAGE_VERSION: u16 = 1;

/// Encoded size of the page header: the sum of its field widths.
const WAL_PAGE_HEADER_LEN: usize = std::mem::size_of::<u32>() // magic
    + std::mem::size_of::<u16>() // version
    + std::mem::size_of::<u16>() // flags
    + std::mem::size_of::<u64>() // prev_page_lsn
    + std::mem::size_of::<u16>() // payload_size
    + std::mem::size_of::<u16>(); // slot_count

/// Bytes occupied by one slot directory entry.
const WAL_PAGE_SLOT_LEN: usize = 8;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub struct WalPageHeader {
17 pub magic: u32,
18 pub version: u16,
19 pub flags: u16,
20 pub prev_page_lsn: Lsn,
21 pub payload_size: u16,
22 pub slot_count: u16,
23}
24
25impl WalPageHeader {
26 fn encode(&self, buf: &mut [u8]) {
27 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
28 buf[4..6].copy_from_slice(&self.version.to_le_bytes());
29 buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
30 buf[8..16].copy_from_slice(&self.prev_page_lsn.to_le_bytes());
31 buf[16..18].copy_from_slice(&self.payload_size.to_le_bytes());
32 buf[18..20].copy_from_slice(&self.slot_count.to_le_bytes());
33 }
34
35 fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
36 if bytes.len() < WAL_PAGE_HEADER_LEN {
37 return Err(QuillSQLError::Internal(
38 "WAL page truncated before header".to_string(),
39 ));
40 }
41 let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
42 let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
43 let flags = u16::from_le_bytes(bytes[6..8].try_into().unwrap());
44 let prev_page_lsn = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
45 let payload_size = u16::from_le_bytes(bytes[16..18].try_into().unwrap());
46 let slot_count = u16::from_le_bytes(bytes[18..20].try_into().unwrap());
47 Ok(Self {
48 magic,
49 version,
50 flags,
51 prev_page_lsn,
52 payload_size,
53 slot_count,
54 })
55 }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum WalPageFragmentKind {
62 Complete,
63 Start,
64 Middle,
65 End,
66}
67
68impl WalPageFragmentKind {
69 fn to_byte(self) -> u8 {
70 match self {
71 WalPageFragmentKind::Complete => 0,
72 WalPageFragmentKind::Start => 1,
73 WalPageFragmentKind::Middle => 2,
74 WalPageFragmentKind::End => 3,
75 }
76 }
77
78 fn from_byte(value: u8) -> QuillSQLResult<Self> {
79 match value {
80 0 => Ok(WalPageFragmentKind::Complete),
81 1 => Ok(WalPageFragmentKind::Start),
82 2 => Ok(WalPageFragmentKind::Middle),
83 3 => Ok(WalPageFragmentKind::End),
84 other => Err(QuillSQLError::Internal(format!(
85 "Unknown WAL page fragment kind: {}",
86 other
87 ))),
88 }
89 }
90}
91
92#[derive(Debug, Clone, Copy)]
95pub struct WalPageSlot {
96 pub offset: u16,
97 pub len: u16,
98 pub kind: WalPageFragmentKind,
99}
100
101impl WalPageSlot {
102 fn encode(&self) -> [u8; WAL_PAGE_SLOT_LEN] {
103 let mut buf = [0u8; WAL_PAGE_SLOT_LEN];
104 buf[0..2].copy_from_slice(&self.offset.to_le_bytes());
105 buf[2..4].copy_from_slice(&self.len.to_le_bytes());
106 buf[4] = self.kind.to_byte();
107 buf
108 }
109
110 fn decode(bytes: &[u8]) -> QuillSQLResult<Self> {
111 if bytes.len() < WAL_PAGE_SLOT_LEN {
112 return Err(QuillSQLError::Internal(
113 "WAL page truncated before slot".to_string(),
114 ));
115 }
116 let offset = u16::from_le_bytes(bytes[0..2].try_into().unwrap());
117 let len = u16::from_le_bytes(bytes[2..4].try_into().unwrap());
118 let kind = WalPageFragmentKind::from_byte(bytes[4])?;
119 Ok(Self { offset, len, kind })
120 }
121}
122
123#[derive(Clone)]
126pub struct WalFrameContinuation {
127 pub record: WalRecord,
128 pub offset: usize,
129}
130
131impl WalFrameContinuation {
132 fn remaining(&self) -> usize {
133 self.record.payload.len().saturating_sub(self.offset)
134 }
135}
136
137#[derive(Clone)]
160pub struct WalPage {
161 header: WalPageHeader,
162 payload: Vec<u8>,
163 slots: Vec<WalPageSlot>,
164 full: bool,
165 continuation: Option<WalFrameContinuation>,
166 last_end_lsn: Option<Lsn>,
167}
168
169impl WalPage {
170 pub fn pack_frames(
171 prev_page_lsn: Lsn,
172 frames: Vec<WalRecord>,
173 carry: Option<WalFrameContinuation>,
174 ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
175 let queue: VecDeque<WalRecord> = frames.into();
176 let (payload, slots, leftover, continuation, last_end_lsn, full) =
177 Self::fill_page(Vec::new(), Vec::new(), queue, carry, None);
178
179 let header = WalPageHeader {
180 magic: WAL_PAGE_MAGIC,
181 version: WAL_PAGE_VERSION,
182 flags: 0,
183 prev_page_lsn,
184 payload_size: payload.len() as u16,
185 slot_count: slots.len() as u16,
186 };
187
188 (
189 Self {
190 header,
191 payload,
192 slots,
193 full,
194 continuation: continuation.clone(),
195 last_end_lsn,
196 },
197 leftover,
198 continuation,
199 )
200 }
201
202 pub fn continue_pack(
203 mut self,
204 frames: Vec<WalRecord>,
205 ) -> (Self, Vec<WalRecord>, Option<WalFrameContinuation>) {
206 let queue: VecDeque<WalRecord> = frames.into();
207 let (payload, slots, leftover, continuation, last_end_lsn, full) = Self::fill_page(
208 self.payload,
209 self.slots,
210 queue,
211 self.continuation.take(),
212 self.last_end_lsn,
213 );
214
215 self.payload = payload;
216 self.slots = slots;
217 self.continuation = continuation.clone();
218 self.last_end_lsn = last_end_lsn;
219 self.full = full;
220 self.header.payload_size = self.payload.len() as u16;
221 self.header.slot_count = self.slots.len() as u16;
222
223 (self, leftover, continuation)
224 }
225
226 fn fill_page(
227 mut payload: Vec<u8>,
228 mut slots: Vec<WalPageSlot>,
229 mut queue: VecDeque<WalRecord>,
230 mut continuation: Option<WalFrameContinuation>,
231 mut last_end_lsn: Option<Lsn>,
232 ) -> (
233 Vec<u8>,
234 Vec<WalPageSlot>,
235 Vec<WalRecord>,
236 Option<WalFrameContinuation>,
237 Option<Lsn>,
238 bool,
239 ) {
240 loop {
241 if continuation.is_none() && queue.is_empty() {
242 break;
243 }
244
245 let available = Self::available_bytes(payload.len(), slots.len());
246 if available == 0 {
247 break;
248 }
249
250 if let Some(mut cont) = continuation.take() {
251 if cont.remaining() == 0 {
252 last_end_lsn = Some(cont.record.end_lsn);
253 continue;
254 }
255 let take = available.min(cont.remaining());
256 if take == 0 {
257 continuation = Some(cont);
258 break;
259 }
260 let start = payload.len();
261 payload.extend_from_slice(&cont.record.payload[cont.offset..cont.offset + take]);
262 let kind = if cont.offset == 0 {
263 if cont.offset + take == cont.record.payload.len() {
264 WalPageFragmentKind::Complete
265 } else {
266 WalPageFragmentKind::Start
267 }
268 } else if cont.offset + take == cont.record.payload.len() {
269 WalPageFragmentKind::End
270 } else {
271 WalPageFragmentKind::Middle
272 };
273 slots.push(WalPageSlot {
274 offset: start as u16,
275 len: take as u16,
276 kind,
277 });
278 cont.offset += take;
279 if cont.offset == cont.record.payload.len() {
280 last_end_lsn = Some(cont.record.end_lsn);
281 continuation = None;
282 } else {
283 continuation = Some(cont);
284 }
285 continue;
286 }
287
288 if let Some(record) = queue.pop_front() {
289 continuation = Some(WalFrameContinuation { record, offset: 0 });
290 continue;
291 }
292 }
293
294 let leftover: Vec<WalRecord> = queue.into_iter().collect();
295 let full = Self::available_for_next(payload.len(), slots.len()) == 0;
296
297 (payload, slots, leftover, continuation, last_end_lsn, full)
298 }
299
300 pub fn unpack_frames(bytes: &[u8]) -> QuillSQLResult<Self> {
301 if bytes.len() < WAL_PAGE_SIZE {
302 return Err(QuillSQLError::Internal(
303 "WAL page truncated before full page".to_string(),
304 ));
305 }
306 if bytes.iter().all(|&b| b == 0) {
307 return Ok(Self::empty());
308 }
309
310 let header = WalPageHeader::decode(&bytes[..WAL_PAGE_HEADER_LEN])?;
311 if header.magic != WAL_PAGE_MAGIC {
312 return Err(QuillSQLError::Internal(format!(
313 "Invalid WAL page magic: {:x}",
314 header.magic
315 )));
316 }
317 if header.version != WAL_PAGE_VERSION {
318 return Err(QuillSQLError::Internal(format!(
319 "Unsupported WAL page version: {}",
320 header.version
321 )));
322 }
323
324 let payload_end = WAL_PAGE_HEADER_LEN + header.payload_size as usize;
325 if payload_end > WAL_PAGE_SIZE {
326 return Err(QuillSQLError::Internal(
327 "WAL page payload exceeds page size".to_string(),
328 ));
329 }
330 let dir_start = WAL_PAGE_SIZE
331 .checked_sub(header.slot_count as usize * WAL_PAGE_SLOT_LEN)
332 .ok_or_else(|| {
333 QuillSQLError::Internal("WAL page directory exceeds page size".to_string())
334 })?;
335 if dir_start < payload_end {
336 return Err(QuillSQLError::Internal(
337 "WAL page directory overlaps payload".to_string(),
338 ));
339 }
340
341 let payload = bytes[WAL_PAGE_HEADER_LEN..payload_end].to_vec();
342 let mut slots = Vec::with_capacity(header.slot_count as usize);
343 let mut cursor = dir_start;
344 for _ in 0..header.slot_count {
345 let slot = WalPageSlot::decode(&bytes[cursor..cursor + WAL_PAGE_SLOT_LEN])?;
346 if slot.offset as usize + slot.len as usize > payload.len() {
347 return Err(QuillSQLError::Internal(
348 "WAL page slot exceeds payload".to_string(),
349 ));
350 }
351 slots.push(slot);
352 cursor += WAL_PAGE_SLOT_LEN;
353 }
354
355 let full = Self::available_for_next(payload.len(), slots.len()) == 0;
356 Ok(Self {
357 header,
358 payload,
359 slots,
360 full,
361 continuation: None,
362 last_end_lsn: None,
363 })
364 }
365
366 pub fn to_bytes(&self) -> Vec<u8> {
367 let mut buf = vec![0u8; WAL_PAGE_SIZE];
368 self.header.encode(&mut buf[..WAL_PAGE_HEADER_LEN]);
369 buf[WAL_PAGE_HEADER_LEN..WAL_PAGE_HEADER_LEN + self.payload.len()]
370 .copy_from_slice(&self.payload);
371 let dir_start = WAL_PAGE_SIZE - self.slots.len() * WAL_PAGE_SLOT_LEN;
372 let mut cursor = dir_start;
373 for slot in &self.slots {
374 let encoded = slot.encode();
375 buf[cursor..cursor + WAL_PAGE_SLOT_LEN].copy_from_slice(&encoded);
376 cursor += WAL_PAGE_SLOT_LEN;
377 }
378 buf
379 }
380
381 pub fn fragments(&self) -> &[WalPageSlot] {
382 &self.slots
383 }
384
385 pub fn payload(&self) -> &[u8] {
386 &self.payload
387 }
388
389 pub fn is_full(&self) -> bool {
390 self.full
391 }
392
393 pub fn has_payload(&self) -> bool {
394 !self.payload.is_empty() || !self.slots.is_empty()
395 }
396
397 pub fn last_end_lsn(&self) -> Option<Lsn> {
398 self.last_end_lsn
399 }
400
401 pub fn continuation(&self) -> Option<&WalFrameContinuation> {
402 self.continuation.as_ref()
403 }
404
405 pub fn prev_page_lsn(&self) -> Lsn {
406 self.header.prev_page_lsn
407 }
408
409 fn empty() -> Self {
410 Self {
411 header: WalPageHeader {
412 magic: WAL_PAGE_MAGIC,
413 version: WAL_PAGE_VERSION,
414 flags: 0,
415 prev_page_lsn: 0,
416 payload_size: 0,
417 slot_count: 0,
418 },
419 payload: Vec::new(),
420 slots: Vec::new(),
421 full: false,
422 continuation: None,
423 last_end_lsn: None,
424 }
425 }
426
427 fn available_bytes(payload_len: usize, slot_count: usize) -> usize {
428 WAL_PAGE_SIZE
429 .saturating_sub(WAL_PAGE_HEADER_LEN)
430 .saturating_sub(payload_len)
431 .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
432 }
433
434 fn available_for_next(payload_len: usize, slot_count: usize) -> usize {
435 WAL_PAGE_SIZE
436 .saturating_sub(WAL_PAGE_HEADER_LEN)
437 .saturating_sub(payload_len)
438 .saturating_sub((slot_count + 1) * WAL_PAGE_SLOT_LEN)
439 }
440}
441
442impl Default for WalPage {
443 fn default() -> Self {
444 Self::empty()
445 }
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::wal::codec::{decode_frame, encode_frame};
    use crate::recovery::wal_record::{
        PageWritePayload, TransactionPayload, TransactionRecordKind, WalFrame, WalRecordPayload,
        WAL_CRC_LEN, WAL_HEADER_LEN,
    };
    use bytes::Bytes;

    /// Encodes `payload` as a frame starting at `start` and wraps the encoded
    /// bytes in a `WalRecord` whose end LSN is `start` plus the frame length.
    fn build_record(start: Lsn, prev: Lsn, payload: &WalRecordPayload) -> WalRecord {
        let frame = encode_frame(start, prev, payload);
        WalRecord {
            start_lsn: start,
            end_lsn: start + frame.len() as u64,
            payload: Bytes::from(frame),
        }
    }

    /// Builds a `Begin` transaction record for `txn`.
    fn make_transaction_record(start: Lsn, prev: Lsn, txn: u64) -> WalRecord {
        build_record(
            start,
            prev,
            &WalRecordPayload::Transaction(TransactionPayload {
                marker: TransactionRecordKind::Begin,
                txn_id: txn,
            }),
        )
    }

    /// Builds `count` back-to-back `Begin` records starting at LSN 0, each
    /// one pointing at the start LSN of its predecessor.
    fn chained_begin_records(count: u64) -> Vec<WalRecord> {
        let mut records = Vec::new();
        let (mut start, mut prev) = (0, 0);
        for txn in 0..count {
            let record = make_transaction_record(start, prev, txn);
            prev = record.start_lsn;
            start = record.end_lsn;
            records.push(record);
        }
        records
    }

    /// Packs `records` into fresh pages until nothing is left to write.
    ///
    /// Pages without payload are dropped; when `require_payload` is set, such
    /// a page fails the test instead.
    fn pack_until_drained(records: Vec<WalRecord>, require_payload: bool) -> Vec<WalPage> {
        let mut pages = Vec::new();
        let mut pending = records;
        let mut carry = None;
        let mut prev_page_lsn = 0;
        while !(pending.is_empty() && carry.is_none()) {
            let (page, leftover, next) = WalPage::pack_frames(prev_page_lsn, pending, carry);
            pending = leftover;
            carry = next;
            if require_payload {
                assert!(page.has_payload());
            }
            if page.has_payload() {
                prev_page_lsn = page.last_end_lsn().unwrap_or(prev_page_lsn);
                pages.push(page);
            }
        }
        pages
    }

    /// Reassembles and decodes every frame stored in `pages`, stitching
    /// Start/Middle/End fragments back together in order.
    fn decode_pages(pages: &[WalPage]) -> Vec<WalFrame> {
        let mut frames = Vec::new();
        let mut pending: Vec<u8> = Vec::new();
        for page in pages {
            let payload = page.payload();
            for slot in page.fragments() {
                let begin = usize::from(slot.offset);
                let fragment = &payload[begin..begin + usize::from(slot.len)];
                match slot.kind {
                    WalPageFragmentKind::Complete => {
                        pending.clear();
                        let (frame, _) = decode_frame(fragment).expect("frame");
                        frames.push(frame);
                    }
                    WalPageFragmentKind::Start => {
                        pending.clear();
                        pending.extend_from_slice(fragment);
                    }
                    WalPageFragmentKind::Middle | WalPageFragmentKind::End => {
                        // A continuation fragment must follow a Start.
                        assert!(!pending.is_empty());
                        pending.extend_from_slice(fragment);
                        if slot.kind == WalPageFragmentKind::End {
                            let (frame, _) = decode_frame(&pending).expect("frame");
                            frames.push(frame);
                            pending.clear();
                        }
                    }
                }
            }
        }
        frames
    }

    /// Asserts that `frames` and `records` line up one-to-one by start LSN.
    fn assert_lsns_match(frames: &[WalFrame], records: &[WalRecord]) {
        assert_eq!(frames.len(), records.len());
        for (frame, record) in frames.iter().zip(records.iter()) {
            assert_eq!(frame.lsn, record.start_lsn);
        }
    }

    #[test]
    fn pack_single_page_roundtrip() {
        let records = chained_begin_records(8);

        let (page, leftover, carry) = WalPage::pack_frames(0, records.clone(), None);
        assert!(leftover.is_empty());
        assert!(carry.is_none());
        assert!(page.has_payload());

        let bytes = page.to_bytes();
        let decoded = WalPage::unpack_frames(&bytes).expect("unpack");
        let frames = decode_pages(&[decoded]);
        assert_lsns_match(&frames, &records);
    }

    #[test]
    fn pack_multiple_pages() {
        let records = chained_begin_records(128);

        let pages = pack_until_drained(records.clone(), false);

        assert!(pages.len() > 1);
        let frames = decode_pages(&pages);
        assert_lsns_match(&frames, &records);
    }

    #[test]
    fn pack_cross_page_frame() {
        let payload = WalRecordPayload::PageWrite(PageWritePayload {
            page_id: 1,
            prev_page_lsn: 0,
            page_image: vec![7u8; 4096],
        });
        let record = build_record(0, 0, &payload);

        let pages = pack_until_drained(vec![record.clone()], true);

        assert!(pages.len() >= 2);
        let frames = decode_pages(&pages);
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].lsn, record.start_lsn);
        assert_eq!(
            frames[0].body.len(),
            record.payload.len() - WAL_HEADER_LEN - WAL_CRC_LEN
        );
    }
}