1use std::convert::TryFrom;
2
3use crc32fast::Hasher;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal_record::WalRecordPayload;
7use crate::recovery::Lsn;
8use crate::storage::heap::wal_codec::{decode_heap_record, encode_heap_record, HeapRecordKind};
9use crate::storage::index::wal_codec::{decode_index_record, encode_index_record};
10
11pub mod checkpoint;
12pub mod clr;
13pub mod page;
14pub mod txn;
15
16pub use checkpoint::{decode_checkpoint, encode_checkpoint, CheckpointPayload};
17pub use clr::{decode_clr, encode_clr, ClrPayload};
18pub use page::{decode_page_write, encode_page_write, PageWritePayload};
19pub use txn::{decode_transaction, encode_transaction, TransactionPayload, TransactionRecordKind};
20
/// Magic number that opens every WAL frame. Its hex digits spell ASCII `QWAL`;
/// the encoder writes it little-endian.
pub const WAL_MAGIC: u32 = 0x5157_414c;

/// Legacy frame format version. Frames carrying it are still decoded.
pub const WAL_VERSION_V1: u16 = 1;

/// Frame format version written by the encoder.
pub const WAL_VERSION: u16 = 2;

/// Size in bytes of the fixed version-2 frame header:
/// magic (`u32`) + version (`u16`) + lsn (`u64`) + prev_lsn (`u64`)
/// + resource manager id (`u8`) + info (`u8`) + body length (`u32`).
pub const WAL_HEADER_LEN: usize = std::mem::size_of::<u32>()
    + std::mem::size_of::<u16>()
    + std::mem::size_of::<u64>()
    + std::mem::size_of::<u64>()
    + std::mem::size_of::<u8>()
    + std::mem::size_of::<u8>()
    + std::mem::size_of::<u32>();

/// Size in bytes of the CRC32 trailer appended after the frame body.
pub const WAL_CRC_LEN: usize = std::mem::size_of::<u32>();
26
/// One-byte tag stored in each WAL frame header that identifies which
/// subsystem's codec produced (and must decode) the frame body.
///
/// The numeric values are part of the on-disk format.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ResourceManagerId {
    /// Page-write records.
    Page = 1,
    /// Transaction records.
    Transaction = 2,
    /// Heap records.
    Heap = 3,
    /// Checkpoint records.
    Checkpoint = 4,
    /// Compensation log records (CLR).
    Clr = 5,
    /// Index records.
    Index = 6,
}
37
38impl TryFrom<u8> for ResourceManagerId {
39 type Error = QuillSQLError;
40
41 fn try_from(value: u8) -> Result<Self, Self::Error> {
42 match value {
43 1 => Ok(ResourceManagerId::Page),
44 2 => Ok(ResourceManagerId::Transaction),
45 3 => Ok(ResourceManagerId::Heap),
46 4 => Ok(ResourceManagerId::Checkpoint),
47 5 => Ok(ResourceManagerId::Clr),
48 6 => Ok(ResourceManagerId::Index),
49 other => Err(QuillSQLError::Internal(format!(
50 "Unknown WAL resource manager id: {}",
51 other
52 ))),
53 }
54 }
55}
56
57#[derive(Debug, Clone)]
58pub struct WalFrame {
59 pub lsn: Lsn,
60 pub prev_lsn: Lsn,
61 pub rmid: ResourceManagerId,
62 pub info: u8,
63 pub body: Vec<u8>,
64}
65
66pub fn encode_frame(lsn: Lsn, prev_lsn: Lsn, payload: &WalRecordPayload) -> Vec<u8> {
67 let (rmid, info, body_bytes) = encode_body(payload);
68 build_frame(lsn, prev_lsn, rmid, info, &body_bytes)
69}
70
71pub fn decode_frame(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
72 if bytes.len() < 6 {
73 return Err(QuillSQLError::Internal(
74 "WAL frame too short to contain version".to_string(),
75 ));
76 }
77 let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
78 if magic != WAL_MAGIC {
79 return Err(QuillSQLError::Internal(format!(
80 "Invalid WAL magic: {:x}",
81 magic
82 )));
83 }
84 let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
85 match version {
86 WAL_VERSION => decode_frame_v2(bytes),
87 WAL_VERSION_V1 => decode_frame_v1(bytes),
88 other => Err(QuillSQLError::Internal(format!(
89 "Unsupported WAL version: {}",
90 other
91 ))),
92 }
93}
94
95pub(crate) fn encode_body(payload: &WalRecordPayload) -> (ResourceManagerId, u8, Vec<u8>) {
96 match payload {
97 WalRecordPayload::PageWrite(body) => (ResourceManagerId::Page, 0, encode_page_write(body)),
98 WalRecordPayload::Transaction(body) => {
99 let (info, buf) = encode_transaction(body);
100 (ResourceManagerId::Transaction, info, buf)
101 }
102 WalRecordPayload::Heap(body) => {
103 let (info, buf) = encode_heap_record(body);
104 (ResourceManagerId::Heap, info, buf)
105 }
106 WalRecordPayload::Index(body) => {
107 let (info, buf) = encode_index_record(body);
108 (ResourceManagerId::Index, info, buf)
109 }
110 WalRecordPayload::Checkpoint(body) => {
111 (ResourceManagerId::Checkpoint, 0, encode_checkpoint(body))
112 }
113 WalRecordPayload::Clr(body) => (ResourceManagerId::Clr, 0, encode_clr(body)),
114 }
115}
116
117fn build_frame(
118 lsn: Lsn,
119 prev_lsn: Lsn,
120 rmid: ResourceManagerId,
121 info: u8,
122 body_bytes: &[u8],
123) -> Vec<u8> {
124 let mut frame = Vec::with_capacity(WAL_HEADER_LEN + body_bytes.len() + WAL_CRC_LEN);
125 frame.extend_from_slice(&WAL_MAGIC.to_le_bytes());
126 frame.extend_from_slice(&WAL_VERSION.to_le_bytes());
127 frame.extend_from_slice(&lsn.to_le_bytes());
128 frame.extend_from_slice(&prev_lsn.to_le_bytes());
129 frame.push(rmid as u8);
130 frame.push(info);
131 frame.extend_from_slice(&(body_bytes.len() as u32).to_le_bytes());
132 frame.extend_from_slice(body_bytes);
133
134 let mut hasher = Hasher::new();
135 hasher.update(&frame);
136 let crc = hasher.finalize();
137 frame.extend_from_slice(&crc.to_le_bytes());
138 frame
139}
140
141fn decode_frame_v2(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
142 if bytes.len() < WAL_HEADER_LEN + WAL_CRC_LEN {
143 return Err(QuillSQLError::Internal(
144 "WAL frame too short to contain header".to_string(),
145 ));
146 }
147 let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
148 let prev_lsn = u64::from_le_bytes(bytes[14..22].try_into().unwrap());
149 let rmid = ResourceManagerId::try_from(bytes[22])?;
150 let info = bytes[23];
151 let body_len = u32::from_le_bytes(bytes[24..28].try_into().unwrap()) as usize;
152 let total_len = WAL_HEADER_LEN + body_len + WAL_CRC_LEN;
153 if bytes.len() < total_len {
154 return Err(QuillSQLError::Internal(
155 "WAL frame truncated before body end".to_string(),
156 ));
157 }
158
159 let body = &bytes[WAL_HEADER_LEN..WAL_HEADER_LEN + body_len];
160 let expected_crc = u32::from_le_bytes(
161 bytes[WAL_HEADER_LEN + body_len..total_len]
162 .try_into()
163 .unwrap(),
164 );
165 let mut hasher = Hasher::new();
166 hasher.update(&bytes[0..WAL_HEADER_LEN + body_len]);
167 let actual_crc = hasher.finalize();
168 if expected_crc != actual_crc {
169 return Err(QuillSQLError::Internal(
170 "CRC mismatch for WAL frame".to_string(),
171 ));
172 }
173
174 Ok((
175 WalFrame {
176 lsn,
177 prev_lsn,
178 rmid,
179 info,
180 body: body.to_vec(),
181 },
182 total_len,
183 ))
184}
185
186fn decode_frame_v1(bytes: &[u8]) -> QuillSQLResult<(WalFrame, usize)> {
187 const HEADER_LEN_V1: usize = 4 + 2 + 8 + 1 + 4; if bytes.len() < HEADER_LEN_V1 + WAL_CRC_LEN {
189 return Err(QuillSQLError::Internal(
190 "WAL frame too short to contain header".to_string(),
191 ));
192 }
193 let lsn = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
194 let kind = bytes[14];
195 let rmid = ResourceManagerId::try_from(kind)?;
196 let body_len = u32::from_le_bytes(bytes[15..19].try_into().unwrap()) as usize;
197 let total_len = HEADER_LEN_V1 + body_len + WAL_CRC_LEN;
198 if bytes.len() < total_len {
199 return Err(QuillSQLError::Internal(
200 "WAL frame truncated before body end".to_string(),
201 ));
202 }
203
204 let body = &bytes[HEADER_LEN_V1..HEADER_LEN_V1 + body_len];
205 let expected_crc = u32::from_le_bytes(
206 bytes[HEADER_LEN_V1 + body_len..total_len]
207 .try_into()
208 .unwrap(),
209 );
210 let mut hasher = Hasher::new();
211 hasher.update(&bytes[0..HEADER_LEN_V1 + body_len]);
212 let actual_crc = hasher.finalize();
213 if expected_crc != actual_crc {
214 return Err(QuillSQLError::Internal(
215 "CRC mismatch for WAL frame".to_string(),
216 ));
217 }
218
219 let info = match rmid {
220 ResourceManagerId::Page => 0,
221 ResourceManagerId::Transaction => {
222 if body.len() != 9 {
223 return Err(QuillSQLError::Internal(
224 "Legacy transaction payload must be 9 bytes".to_string(),
225 ));
226 }
227 body[8]
228 }
229 ResourceManagerId::Heap => {
230 if body.is_empty() {
231 return Err(QuillSQLError::Internal(
232 "Legacy heap payload missing kind byte".to_string(),
233 ));
234 }
235 body[0]
236 }
237 ResourceManagerId::Index => {
238 if body.is_empty() {
239 return Err(QuillSQLError::Internal(
240 "Legacy index payload missing kind byte".to_string(),
241 ));
242 }
243 body[0]
244 }
245 ResourceManagerId::Checkpoint | ResourceManagerId::Clr => 0,
246 };
247
248 Ok((
249 WalFrame {
250 lsn,
251 prev_lsn: lsn.saturating_sub(1),
252 rmid,
253 info,
254 body: match rmid {
255 ResourceManagerId::Page
256 | ResourceManagerId::Checkpoint
257 | ResourceManagerId::Clr => body.to_vec(),
258 ResourceManagerId::Transaction => body[..8].to_vec(),
259 ResourceManagerId::Heap => body[1..].to_vec(),
260 ResourceManagerId::Index => body[1..].to_vec(),
261 },
262 },
263 total_len,
264 ))
265}
266
267pub fn decode_payload(frame: &WalFrame) -> QuillSQLResult<WalRecordPayload> {
268 match frame.rmid {
269 ResourceManagerId::Page => match frame.info {
270 0 => Ok(WalRecordPayload::PageWrite(decode_page_write(&frame.body)?)),
271 other => Err(QuillSQLError::Internal(format!(
272 "Unknown Page info kind: {}",
273 other
274 ))),
275 },
276 ResourceManagerId::Transaction => Ok(WalRecordPayload::Transaction(decode_transaction(
277 &frame.body,
278 frame.info,
279 )?)),
280 ResourceManagerId::Heap => Ok(WalRecordPayload::Heap(decode_heap_record(
281 &frame.body,
282 frame.info,
283 )?)),
284 ResourceManagerId::Index => Ok(WalRecordPayload::Index(decode_index_record(
285 &frame.body,
286 frame.info,
287 )?)),
288 ResourceManagerId::Checkpoint => Ok(WalRecordPayload::Checkpoint(decode_checkpoint(
289 &frame.body,
290 )?)),
291 ResourceManagerId::Clr => Ok(WalRecordPayload::Clr(decode_clr(&frame.body)?)),
292 }
293}
294
295pub fn heap_record_kind_to_info(kind: HeapRecordKind) -> u8 {
296 kind as u8
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::wal_record::WalRecordPayload;
    use crate::storage::heap::wal_codec::{
        HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, RelationIdent, TupleMetaRepr,
    };
    use crate::transaction::INVALID_COMMAND_ID;

    /// Encodes `payload` with the given LSNs, decodes the resulting bytes as a
    /// single frame, and checks that the decoder consumed every byte.
    fn round_trip_frame(payload: WalRecordPayload, lsn: Lsn, prev_lsn: Lsn) -> WalFrame {
        let encoded = payload.encode(lsn, prev_lsn);
        let (frame, consumed) = decode_frame(&encoded).unwrap();
        assert_eq!(consumed, encoded.len());
        frame
    }

    #[test]
    fn encode_decode_page_write() {
        let frame = round_trip_frame(
            WalRecordPayload::PageWrite(PageWritePayload {
                page_id: 42,
                prev_page_lsn: 7,
                page_image: vec![1, 2, 3, 4, 5],
            }),
            100,
            99,
        );
        assert_eq!(frame.lsn, 100);
        assert_eq!(frame.prev_lsn, 99);
        assert_eq!(frame.rmid, ResourceManagerId::Page);
        assert_eq!(frame.info, 0);

        match decode_payload(&frame).unwrap() {
            WalRecordPayload::PageWrite(page) => {
                assert_eq!(page.page_id, 42);
                assert_eq!(page.prev_page_lsn, 7);
                assert_eq!(page.page_image, vec![1, 2, 3, 4, 5]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_transaction() {
        let frame = round_trip_frame(
            WalRecordPayload::Transaction(TransactionPayload {
                marker: TransactionRecordKind::Commit,
                txn_id: 99,
            }),
            300,
            200,
        );
        assert_eq!(frame.rmid, ResourceManagerId::Transaction);
        assert_eq!(frame.info, TransactionRecordKind::Commit as u8);

        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Transaction(txn) => {
                assert_eq!(txn.marker, TransactionRecordKind::Commit);
                assert_eq!(txn.txn_id, 99);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_heap_insert() {
        let insert = HeapInsertPayload {
            relation: RelationIdent { root_page_id: 10 },
            page_id: 12,
            slot_id: 2,
            op_txn_id: 1,
            tuple_meta: TupleMetaRepr {
                insert_txn_id: 1,
                insert_cid: 0,
                delete_txn_id: 0,
                delete_cid: INVALID_COMMAND_ID,
                is_deleted: false,
                next_version: None,
                prev_version: None,
            },
            tuple_data: vec![7, 8, 9],
        };
        let frame = round_trip_frame(
            WalRecordPayload::Heap(HeapRecordPayload::Insert(insert)),
            123,
            100,
        );
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Insert as u8);

        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Heap(HeapRecordPayload::Insert(decoded)) => {
                assert_eq!(decoded.relation.root_page_id, 10);
                assert_eq!(decoded.page_id, 12);
                assert_eq!(decoded.slot_id, 2);
                assert_eq!(decoded.tuple_data, vec![7, 8, 9]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_heap_delete() {
        // The old and new metadata are deliberately identical in this fixture.
        let deleted_meta = || TupleMetaRepr {
            insert_txn_id: 2,
            insert_cid: 0,
            delete_txn_id: 4,
            delete_cid: 0,
            is_deleted: true,
            next_version: None,
            prev_version: None,
        };
        let delete = HeapDeletePayload {
            relation: RelationIdent { root_page_id: 7 },
            page_id: 3,
            slot_id: 1,
            op_txn_id: 4,
            new_tuple_meta: deleted_meta(),
            old_tuple_meta: deleted_meta(),
            old_tuple_data: vec![1, 2, 3],
        };
        let frame = round_trip_frame(
            WalRecordPayload::Heap(HeapRecordPayload::Delete(delete)),
            80,
            60,
        );
        assert_eq!(frame.rmid, ResourceManagerId::Heap);
        assert_eq!(frame.info, HeapRecordKind::Delete as u8);

        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Heap(HeapRecordPayload::Delete(decoded)) => {
                assert_eq!(decoded.relation.root_page_id, 7);
                assert!(decoded.new_tuple_meta.is_deleted);
                assert!(decoded.old_tuple_meta.is_deleted);
                assert_eq!(decoded.old_tuple_data, vec![1, 2, 3]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_checkpoint() {
        let frame = round_trip_frame(
            WalRecordPayload::Checkpoint(CheckpointPayload {
                last_lsn: 123,
                dirty_pages: vec![10, 11, 12],
                active_transactions: vec![1, 2, 3],
                dpt: vec![(10, 1000), (11, 1100)],
            }),
            999,
            900,
        );
        assert_eq!(frame.rmid, ResourceManagerId::Checkpoint);

        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Checkpoint(checkpoint) => {
                assert_eq!(checkpoint.last_lsn, 123);
                assert_eq!(checkpoint.dirty_pages, vec![10, 11, 12]);
                assert_eq!(checkpoint.active_transactions, vec![1, 2, 3]);
                assert_eq!(checkpoint.dpt, vec![(10, 1000), (11, 1100)]);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }

    #[test]
    fn encode_decode_clr() {
        let clr = ClrPayload {
            txn_id: 11,
            undone_lsn: 1234,
            undo_next_lsn: 0,
        };
        let frame = round_trip_frame(WalRecordPayload::Clr(clr.clone()), 200, 150);
        assert_eq!(frame.rmid, ResourceManagerId::Clr);

        match decode_payload(&frame).unwrap() {
            WalRecordPayload::Clr(decoded) => {
                assert_eq!(decoded.txn_id, clr.txn_id);
                assert_eq!(decoded.undone_lsn, clr.undone_lsn);
                assert_eq!(decoded.undo_next_lsn, clr.undo_next_lsn);
            }
            other => panic!("unexpected payload variant: {:?}", other),
        }
    }
}