1use std::convert::TryFrom;
2
3use crc32fast::Hasher;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal_record::WalRecordPayload;
7use crate::recovery::Lsn;
8use crate::storage::heap::wal_codec::{decode_heap_record, encode_heap_record, HeapRecordKind};
9
10pub mod checkpoint;
11pub mod clr;
12pub mod page;
13pub mod txn;
14
15pub use checkpoint::{decode_checkpoint, encode_checkpoint, CheckpointPayload};
16pub use clr::{decode_clr, encode_clr, ClrPayload};
17pub use page::{
18 decode_page_delta, decode_page_write, encode_page_delta, encode_page_write, PageDeltaPayload,
19 PageWritePayload,
20};
21pub use txn::{decode_transaction, encode_transaction, TransactionPayload, TransactionRecordKind};
22
/// Magic number written at the start of every WAL frame; the hex digits spell ASCII "QWAL".
pub const WAL_MAGIC: u32 = 0x5157_414c;
/// Legacy frame format version, still accepted by `decode_frame`.
pub const WAL_VERSION_V1: u16 = 1;
/// Current frame format version; this is the version `build_frame` writes.
pub const WAL_VERSION: u16 = 2;
/// Size in bytes of the v2 frame header:
/// magic (4) + version (2) + lsn (8) + prev_lsn (8) + rmid (1) + info (1) + body length (4).
pub const WAL_HEADER_LEN: usize = 4 + 2 + 8 + 8 + 1 + 1 + 4;
/// Size in bytes of the CRC32 trailer appended after the frame body.
pub const WAL_CRC_LEN: usize = 4;
28
/// Identifies which resource manager a WAL frame belongs to.
///
/// The discriminant is the byte `build_frame` writes into the frame header and
/// that `TryFrom<u8>` parses back, so the numeric values are part of the
/// serialized format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ResourceManagerId {
    /// Page records (`PageWrite` and `PageDelta` payloads).
    Page = 1,
    /// Transaction records.
    Transaction = 2,
    /// Heap records.
    Heap = 3,
    /// Checkpoint records.
    Checkpoint = 4,
    /// CLR records (`ClrPayload`).
    Clr = 5,
}
38
39impl TryFrom<u8> for ResourceManagerId {
40 type Error = QuillSQLError;
41
42 fn try_from(value: u8) -> Result<Self, Self::Error> {
43 match value {
44 1 => Ok(ResourceManagerId::Page),
45 2 => Ok(ResourceManagerId::Transaction),
46 3 => Ok(ResourceManagerId::Heap),
47 4 => Ok(ResourceManagerId::Checkpoint),
48 5 => Ok(ResourceManagerId::Clr),
49 other => Err(QuillSQLError::Internal(format!(
50 "Unknown WAL resource manager id: {}",
51 other
52 ))),
53 }
54 }
55}
56
/// A decoded WAL frame: the header fields plus the raw, still-encoded body.
///
/// Produced by `decode_frame`; pass it to `decode_payload` to obtain the typed
/// `WalRecordPayload`.
#[derive(Debug, Clone)]
pub struct WalFrame {
    /// LSN read from the frame header.
    pub lsn: Lsn,
    /// Previous-LSN field of the header. Version 1 frames carry no such field,
    /// so the v1 decoder fills in `lsn - 1` (saturating at zero).
    pub prev_lsn: Lsn,
    /// Resource manager that owns the record; selects the decoder used by
    /// `decode_payload`.
    pub rmid: ResourceManagerId,
    /// Per-resource-manager sub-kind byte. For `Page`, 0 means a page write and
    /// 1 a page delta; for `Transaction` and `Heap` it is handed to the
    /// respective decoder.
    pub info: u8,
    /// Encoded payload bytes, without the frame header or the CRC trailer.
    pub body: Vec<u8>,
}
65
66pub fn encode_frame(lsn: Lsn, prev_lsn: Lsn, payload: &WalRecordPayload) -> Vec<u8> {
67 let (rmid, info, body_bytes) = encode_body(payload);
68 build_frame(lsn, prev_lsn, rmid, info, &body_bytes)
69}
70
71pub fn decode_frame(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
72 if bytes.len() < 6 {
73 return Err(QuillSQLError::Internal(
74 "WAL frame too short to contain version".to_string(),
75 ));
76 }
77 let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
78 if magic != WAL_MAGIC {
79 return Err(QuillSQLError::Internal(format!(
80 "Invalid WAL magic: {:x}",
81 magic
82 )));
83 }
84 let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
85 match version {
86 WAL_VERSION => decode_frame_v2(bytes),
87 WAL_VERSION_V1 => decode_frame_v1(bytes),
88 other => Err(QuillSQLError::Internal(format!(
89 "Unsupported WAL version: {}",
90 other
91 ))),
92 }
93}
94
95pub(crate) fn encode_body(payload: &WalRecordPayload) -> (ResourceManagerId, u8, Vec<u8>) {
96 match payload {
97 WalRecordPayload::PageWrite(body) => (ResourceManagerId::Page, 0, encode_page_write(body)),
98 WalRecordPayload::PageDelta(body) => (ResourceManagerId::Page, 1, encode_page_delta(body)),
99 WalRecordPayload::Transaction(body) => {
100 let (info, buf) = encode_transaction(body);
101 (ResourceManagerId::Transaction, info, buf)
102 }
103 WalRecordPayload::Heap(body) => {
104 let (info, buf) = encode_heap_record(body);
105 (ResourceManagerId::Heap, info, buf)
106 }
107 WalRecordPayload::Checkpoint(body) => {
108 (ResourceManagerId::Checkpoint, 0, encode_checkpoint(body))
109 }
110 WalRecordPayload::Clr(body) => (ResourceManagerId::Clr, 0, encode_clr(body)),
111 }
112}
113
114fn build_frame(
115 lsn: Lsn,
116 prev_lsn: Lsn,
117 rmid: ResourceManagerId,
118 info: u8,
119 body_bytes: &[u8],
120) -> Vec<u8> {
121 let mut frame = Vec::with_capacity(WAL_HEADER_LEN + body_bytes.len() + WAL_CRC_LEN);
122 frame.extend_from_slice(&WAL_MAGIC.to_le_bytes());
123 frame.extend_from_slice(&WAL_VERSION.to_le_bytes());
124 frame.extend_from_slice(&lsn.to_le_bytes());
125 frame.extend_from_slice(&prev_lsn.to_le_bytes());
126 frame.push(rmid as u8);
127 frame.push(info);
128 frame.extend_from_slice(&(body_bytes.len() as u32).to_le_bytes());
129 frame.extend_from_slice(body_bytes);
130
131 let mut hasher = Hasher::new();
132 hasher.update(&frame);
133 let crc = hasher.finalize();
134 frame.extend_from_slice(&crc.to_le_bytes());
135 frame
136}
137
138fn decode_frame_v2(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
139 if bytes.len() < WAL_HEADER_LEN + WAL_CRC_LEN {
140 return Err(QuillSQLError::Internal(
141 "WAL frame too short to contain header".to_string(),
142 ));
143 }
144 let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
145 let prev_lsn = u64::from_le_bytes(bytes[14..22].try_into().unwrap());
146 let rmid = ResourceManagerId::try_from(bytes[22])?;
147 let info = bytes[23];
148 let body_len = u32::from_le_bytes(bytes[24..28].try_into().unwrap()) as usize;
149 let total_len = WAL_HEADER_LEN + body_len + WAL_CRC_LEN;
150 if bytes.len() < total_len {
151 return Err(QuillSQLError::Internal(
152 "WAL frame truncated before body end".to_string(),
153 ));
154 }
155
156 let body = &bytes[WAL_HEADER_LEN..WAL_HEADER_LEN + body_len];
157 let expected_crc = u32::from_le_bytes(
158 bytes[WAL_HEADER_LEN + body_len..total_len]
159 .try_into()
160 .unwrap(),
161 );
162 let mut hasher = Hasher::new();
163 hasher.update(&bytes[0..WAL_HEADER_LEN + body_len]);
164 let actual_crc = hasher.finalize();
165 if expected_crc != actual_crc {
166 return Err(QuillSQLError::Internal(
167 "CRC mismatch for WAL frame".to_string(),
168 ));
169 }
170
171 Ok((
172 WalFrame {
173 lsn,
174 prev_lsn,
175 rmid,
176 info,
177 body: body.to_vec(),
178 },
179 total_len,
180 ))
181}
182
183fn decode_frame_v1(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
184 const HEADER_LEN_V1: usize = 4 + 2 + 8 + 1 + 4; if bytes.len() < HEADER_LEN_V1 + WAL_CRC_LEN {
186 return Err(QuillSQLError::Internal(
187 "WAL frame too short to contain header".to_string(),
188 ));
189 }
190 let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
191 let kind = bytes[14];
192 let rmid = ResourceManagerId::try_from(kind)?;
193 let body_len = u32::from_le_bytes(bytes[15..19].try_into().unwrap()) as usize;
194 let total_len = HEADER_LEN_V1 + body_len + WAL_CRC_LEN;
195 if bytes.len() < total_len {
196 return Err(QuillSQLError::Internal(
197 "WAL frame truncated before body end".to_string(),
198 ));
199 }
200
201 let body = &bytes[HEADER_LEN_V1..HEADER_LEN_V1 + body_len];
202 let expected_crc = u32::from_le_bytes(
203 bytes[HEADER_LEN_V1 + body_len..total_len]
204 .try_into()
205 .unwrap(),
206 );
207 let mut hasher = Hasher::new();
208 hasher.update(&bytes[0..HEADER_LEN_V1 + body_len]);
209 let actual_crc = hasher.finalize();
210 if expected_crc != actual_crc {
211 return Err(QuillSQLError::Internal(
212 "CRC mismatch for WAL frame".to_string(),
213 ));
214 }
215
216 let info = match rmid {
217 ResourceManagerId::Page => 0,
218 ResourceManagerId::Transaction => {
219 if body.len() != 9 {
220 return Err(QuillSQLError::Internal(
221 "Legacy transaction payload must be 9 bytes".to_string(),
222 ));
223 }
224 body[8]
225 }
226 ResourceManagerId::Heap => {
227 if body.is_empty() {
228 return Err(QuillSQLError::Internal(
229 "Legacy heap payload missing kind byte".to_string(),
230 ));
231 }
232 body[0]
233 }
234 ResourceManagerId::Checkpoint | ResourceManagerId::Clr => 0,
235 };
236
237 Ok((
238 WalFrame {
239 lsn,
240 prev_lsn: lsn.saturating_sub(1),
241 rmid,
242 info,
243 body: match rmid {
244 ResourceManagerId::Page
245 | ResourceManagerId::Checkpoint
246 | ResourceManagerId::Clr => body.to_vec(),
247 ResourceManagerId::Transaction => body[..8].to_vec(),
248 ResourceManagerId::Heap => body[1..].to_vec(),
249 },
250 },
251 total_len,
252 ))
253}
254
255pub fn decode_payload(frame: &WalFrame) -> QuillSQLResult<WalRecordPayload> {
256 match frame.rmid {
257 ResourceManagerId::Page => match frame.info {
258 0 => Ok(WalRecordPayload::PageWrite(decode_page_write(&frame.body)?)),
259 1 => Ok(WalRecordPayload::PageDelta(decode_page_delta(&frame.body)?)),
260 other => Err(QuillSQLError::Internal(format!(
261 "Unknown Page info kind: {}",
262 other
263 ))),
264 },
265 ResourceManagerId::Transaction => Ok(WalRecordPayload::Transaction(decode_transaction(
266 &frame.body,
267 frame.info,
268 )?)),
269 ResourceManagerId::Heap => Ok(WalRecordPayload::Heap(decode_heap_record(
270 &frame.body,
271 frame.info,
272 )?)),
273 ResourceManagerId::Checkpoint => Ok(WalRecordPayload::Checkpoint(decode_checkpoint(
274 &frame.body,
275 )?)),
276 ResourceManagerId::Clr => Ok(WalRecordPayload::Clr(decode_clr(&frame.body)?)),
277 }
278}
279
/// Returns the byte value of a heap record kind, i.e. the enum discriminant
/// cast to `u8`. The tests in this file compare a heap frame's `info` byte
/// against the same cast.
pub fn heap_record_kind_to_info(kind: HeapRecordKind) -> u8 {
    kind as u8
}
283
// Round-trip tests: each payload kind is encoded into a frame, decoded with
// `decode_frame`, and converted back with `decode_payload`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::wal_record::WalRecordPayload;
    use crate::storage::heap::wal_codec::{
        HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, HeapUpdatePayload, RelationIdent,
        TupleMetaRepr,
    };
    use crate::transaction::INVALID_COMMAND_ID;

    // A page-write record round-trips, including the header fields
    // (lsn, prev_lsn, rmid, info = 0).
    #[test]
    fn encode_decode_page_write() {
        let payload = WalRecordPayload::PageWrite(PageWritePayload {
            page_id: 42,
            prev_page_lsn: 7,
            page_image: vec![1, 2, 3, 4, 5],
        });
        let bytes = payload.encode(100, 99);
        let (frame, len) = decode_frame(&bytes).unwrap();
        // The decoder must consume exactly the bytes the encoder produced.
        assert_eq!(len, bytes.len());
        assert_eq!(frame.lsn, 100);
        assert_eq!(frame.prev_lsn, 99);
        assert_eq!(frame.rmid, ResourceManagerId::Page);
        assert_eq!(frame.info, 0);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::PageWrite(body) => {
                assert_eq!(body.page_id, 42);
                assert_eq!(body.prev_page_lsn, 7);
                assert_eq!(body.page_image, vec![1, 2, 3, 4, 5]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    // A page-delta record round-trips and is tagged with info = 1.
    #[test]
    fn encode_decode_page_delta() {
        let payload = WalRecordPayload::PageDelta(PageDeltaPayload {
            page_id: 17,
            prev_page_lsn: 8,
            offset: 5,
            data: vec![9, 9, 9],
        });
        let bytes = payload.encode(200, 150);
        let (frame, len) = decode_frame(&bytes).unwrap();
        assert_eq!(len, bytes.len());
        assert_eq!(frame.rmid, ResourceManagerId::Page);
        assert_eq!(frame.info, 1);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::PageDelta(body) => {
                assert_eq!(body.page_id, 17);
                assert_eq!(body.offset, 5);
                assert_eq!(body.data, vec![9, 9, 9]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    // A transaction record round-trips; the info byte carries the record kind.
    #[test]
    fn encode_decode_transaction() {
        let payload = WalRecordPayload::Transaction(TransactionPayload {
            marker: TransactionRecordKind::Commit,
            txn_id: 99,
        });
        let bytes = payload.encode(300, 200);
        let (frame, len) = decode_frame(&bytes).unwrap();
        assert_eq!(len, bytes.len());
        assert_eq!(frame.rmid, ResourceManagerId::Transaction);
        assert_eq!(frame.info, TransactionRecordKind::Commit as u8);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::Transaction(body) => {
                assert_eq!(body.marker, TransactionRecordKind::Commit);
                assert_eq!(body.txn_id, 99);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    // A heap insert record round-trips; the info byte carries `HeapRecordKind::Insert`.
    #[test]
    fn encode_decode_heap_insert() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Insert(HeapInsertPayload {
            relation: RelationIdent { root_page_id: 10 },
            page_id: 12,
            slot_id: 2,
            op_txn_id: 1,
            tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: INVALID_COMMAND_ID,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            tuple_data: vec![7, 8, 9],
        }));
        let bytes = payload.encode(123, 100);
        let (frame, len) = decode_frame(&bytes).unwrap();
        assert_eq!(len, bytes.len());
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Insert as u8);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::Heap(HeapRecordPayload::Insert(body)) => {
                assert_eq!(body.relation.root_page_id, 10);
                assert_eq!(body.page_id, 12);
                assert_eq!(body.slot_id, 2);
                assert_eq!(body.tuple_data, vec![7, 8, 9]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    // A heap update record round-trips with both the new tuple and the
    // optional old tuple metadata/data present.
    #[test]
    fn encode_decode_heap_update() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Update(HeapUpdatePayload {
            relation: RelationIdent { root_page_id: 99 },
            page_id: 44,
            slot_id: 5,
            op_txn_id: 11,
            new_tuple_meta: TupleMetaRepr {
                insert_txn_id: 11,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: INVALID_COMMAND_ID,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            new_tuple_data: vec![1, 2, 3, 4],
            old_tuple_meta: Some(TupleMetaRepr {
                insert_txn_id: 5,
                insert_cid: 0,
                delete_txn_id: 7,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            }),
            old_tuple_data: Some(vec![9, 9, 9]),
        }));
        let bytes = payload.encode(200, 150);
        let (frame, len) = decode_frame(&bytes).unwrap();
        assert_eq!(len, bytes.len());
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Update as u8);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::Heap(HeapRecordPayload::Update(body)) => {
                assert_eq!(body.relation.root_page_id, 99);
                assert_eq!(body.new_tuple_data, vec![1, 2, 3, 4]);
                assert!(body.old_tuple_meta.unwrap().is_deleted);
                assert_eq!(body.old_tuple_data.unwrap(), vec![9, 9, 9]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    // A heap delete record round-trips when no old tuple data is attached.
    #[test]
    fn encode_decode_heap_delete() {
        let payload = WalRecordPayload::Heap(HeapRecordPayload::Delete(HeapDeletePayload {
            relation: RelationIdent { root_page_id: 7 },
            page_id: 3,
            slot_id: 1,
            op_txn_id: 4,
            old_tuple_meta: TupleMetaRepr {
                insert_txn_id: 2,
                insert_cid: 0,
                delete_txn_id: 4,
                delete_cid: 0,
                is_deleted: true,
                next_version: None,
                prev_version: None,
            },
            old_tuple_data: None,
        }));
        let bytes = payload.encode(80, 60);
        let (frame, len) = decode_frame(&bytes).unwrap();
        assert_eq!(len, bytes.len());
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Delete as u8);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::Heap(HeapRecordPayload::Delete(body)) => {
                assert_eq!(body.relation.root_page_id, 7);
                assert!(body.old_tuple_meta.is_deleted);
                assert!(body.old_tuple_data.is_none());
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    // A checkpoint record round-trips with all of its list fields intact.
    #[test]
    fn encode_decode_checkpoint() {
        let payload = WalRecordPayload::Checkpoint(CheckpointPayload {
            last_lsn: 123,
            dirty_pages: vec![10, 11, 12],
            active_transactions: vec![1, 2, 3],
            dpt: vec![(10, 1000), (11, 1100)],
        });
        let bytes = payload.encode(999, 900);
        let (frame, len) = decode_frame(&bytes).unwrap();
        assert_eq!(len, bytes.len());
        assert_eq!(frame.rmid, ResourceManagerId::Checkpoint);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::Checkpoint(body) => {
                assert_eq!(body.last_lsn, 123);
                assert_eq!(body.dirty_pages, vec![10, 11, 12]);
                assert_eq!(body.active_transactions, vec![1, 2, 3]);
                assert_eq!(body.dpt, vec![(10, 1000), (11, 1100)]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    // A CLR record round-trips with every field preserved.
    #[test]
    fn encode_decode_clr() {
        let clr = ClrPayload {
            txn_id: 11,
            undone_lsn: 1234,
            undo_next_lsn: 0,
        };
        let payload = WalRecordPayload::Clr(clr.clone());
        let bytes = payload.encode(200, 150);
        let (frame, len) = decode_frame(&bytes).unwrap();
        assert_eq!(len, bytes.len());
        assert_eq!(frame.rmid, ResourceManagerId::Clr);
        let decoded = decode_payload(&frame).unwrap();
        match decoded {
            WalRecordPayload::Clr(body) => {
                assert_eq!(body.txn_id, clr.txn_id);
                assert_eq!(body.undone_lsn, clr.undone_lsn);
                assert_eq!(body.undo_next_lsn, clr.undo_next_lsn);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }
}