1use crate::storage::engine::crc32::{crc32, crc32_update};
2use std::io::{self, Read};
3
/// Four-byte ASCII tag `RDBW` identifying the write-ahead log format.
///
/// NOTE(review): nothing in this file reads or writes the magic; presumably the
/// WAL file header code does — confirm against the caller.
pub const WAL_MAGIC: &[u8; 4] = b"RDBW";

/// Version number of the WAL format.
///
/// NOTE(review): not consulted by the record codec in this file; presumably
/// stored next to [`WAL_MAGIC`] — confirm against the caller.
pub const WAL_VERSION: u8 = 2;

/// Minimum payload size, in bytes, at which `WalRecord::encode` attempts to
/// compress a `PageWrite` payload. Shorter payloads are always stored verbatim.
const COMPRESS_THRESHOLD: usize = 256;
13
/// Compression algorithm identifier stored as a single byte inside a
/// compressed page-write record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Compression {
    /// Payload is stored verbatim.
    None = 0,
    /// Payload is a zstd-compressed block.
    Zstd = 1,
}

impl Compression {
    /// Maps an on-disk algorithm byte back to its variant.
    ///
    /// Returns `None` (the `Option`, not the variant) for any byte that does
    /// not correspond to a known algorithm.
    fn from_u8(v: u8) -> Option<Self> {
        const KNOWN: [Compression; 2] = [Compression::None, Compression::Zstd];
        KNOWN.iter().copied().find(|algo| *algo as u8 == v)
    }
}
31
/// One-byte tag written at the start of every encoded WAL record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RecordType {
    /// Transaction begin marker.
    Begin = 1,
    /// Transaction commit marker.
    Commit = 2,
    /// Transaction rollback marker.
    Rollback = 3,
    /// Page write whose payload is stored verbatim.
    PageWrite = 4,
    /// Checkpoint marker.
    Checkpoint = 5,
    /// Page write whose payload is stored compressed.
    PageWriteCompressed = 6,
    /// Transaction commit carrying a batch of opaque action payloads.
    TxCommitBatch = 7,
    /// Page image tagged with a checkpoint epoch.
    FullPageImage = 8,
}

impl RecordType {
    /// Maps an on-disk tag byte back to its variant, or `None` when the byte
    /// is not a known record tag.
    pub fn from_u8(v: u8) -> Option<Self> {
        const KNOWN: [RecordType; 8] = [
            RecordType::Begin,
            RecordType::Commit,
            RecordType::Rollback,
            RecordType::PageWrite,
            RecordType::Checkpoint,
            RecordType::PageWriteCompressed,
            RecordType::TxCommitBatch,
            RecordType::FullPageImage,
        ];
        KNOWN.iter().copied().find(|kind| *kind as u8 == v)
    }
}
87
/// A decoded write-ahead-log record.
///
/// There is no separate variant for compressed page writes: a compressed
/// record on disk decodes to [`WalRecord::PageWrite`] carrying the
/// decompressed payload.
#[derive(Debug, Clone, PartialEq)]
pub enum WalRecord {
    /// Start of transaction `tx_id`.
    Begin { tx_id: u64 },
    /// Commit of transaction `tx_id`.
    Commit { tx_id: u64 },
    /// Rollback of transaction `tx_id`.
    Rollback { tx_id: u64 },
    /// Bytes written to page `page_id` by transaction `tx_id`.
    PageWrite {
        tx_id: u64,
        page_id: u32,
        data: Vec<u8>,
    },
    /// Commit of transaction `tx_id` together with a list of opaque,
    /// individually length-prefixed action payloads.
    TxCommitBatch {
        tx_id: u64,
        actions: Vec<Vec<u8>>,
    },
    /// Image of page `page_id` recorded by transaction `tx_id`, tagged with
    /// the checkpoint epoch it belongs to.
    FullPageImage {
        tx_id: u64,
        page_id: u32,
        ckpt_epoch: u64,
        data: Vec<u8>,
    },
    /// Checkpoint marker carrying a log sequence number.
    Checkpoint { lsn: u64 },
}
119
120impl WalRecord {
121 pub fn encode(&self) -> Vec<u8> {
127 let mut buf = Vec::new();
128
129 match self {
144 WalRecord::Begin { tx_id } => {
145 buf.push(RecordType::Begin as u8);
146 buf.extend_from_slice(&tx_id.to_le_bytes());
147 }
148 WalRecord::Commit { tx_id } => {
149 buf.push(RecordType::Commit as u8);
150 buf.extend_from_slice(&tx_id.to_le_bytes());
151 }
152 WalRecord::Rollback { tx_id } => {
153 buf.push(RecordType::Rollback as u8);
154 buf.extend_from_slice(&tx_id.to_le_bytes());
155 }
156 WalRecord::PageWrite {
157 tx_id,
158 page_id,
159 data,
160 } => {
161 if data.len() >= COMPRESS_THRESHOLD {
162 if let Ok(compressed) =
164 zstd::bulk::compress(data.as_slice(), 3)
165 {
166 if compressed.len() < data.len() {
167 buf.push(RecordType::PageWriteCompressed as u8);
169 buf.extend_from_slice(&tx_id.to_le_bytes());
170 buf.extend_from_slice(&page_id.to_le_bytes());
171 buf.push(Compression::Zstd as u8);
172 buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); buf.extend_from_slice(&(compressed.len() as u32).to_le_bytes());
174 buf.extend_from_slice(&compressed);
175 let checksum = crc32(&buf);
176 buf.extend_from_slice(&checksum.to_le_bytes());
177 return buf;
178 }
179 }
180 }
181 buf.push(RecordType::PageWrite as u8);
183 buf.extend_from_slice(&tx_id.to_le_bytes());
184 buf.extend_from_slice(&page_id.to_le_bytes());
185 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
186 buf.extend_from_slice(data);
187 }
188 WalRecord::TxCommitBatch { tx_id, actions } => {
189 buf.push(RecordType::TxCommitBatch as u8);
190 buf.extend_from_slice(&tx_id.to_le_bytes());
191 buf.extend_from_slice(&(actions.len() as u32).to_le_bytes());
192 for action in actions {
193 buf.extend_from_slice(&(action.len() as u32).to_le_bytes());
194 buf.extend_from_slice(action);
195 }
196 }
197 WalRecord::FullPageImage {
198 tx_id,
199 page_id,
200 ckpt_epoch,
201 data,
202 } => {
203 buf.push(RecordType::FullPageImage as u8);
204 buf.extend_from_slice(&tx_id.to_le_bytes());
205 buf.extend_from_slice(&page_id.to_le_bytes());
206 buf.extend_from_slice(&ckpt_epoch.to_le_bytes());
207 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
208 buf.extend_from_slice(data);
209 }
210 WalRecord::Checkpoint { lsn } => {
211 buf.push(RecordType::Checkpoint as u8);
212 buf.extend_from_slice(&lsn.to_le_bytes());
213 }
214 }
215
216 let checksum = crc32(&buf);
218 buf.extend_from_slice(&checksum.to_le_bytes());
219
220 buf
221 }
222
223 pub fn read<R: Read>(reader: &mut R) -> io::Result<Option<WalRecord>> {
228 let mut type_buf = [0u8; 1];
230 match reader.read_exact(&mut type_buf) {
231 Ok(_) => (),
232 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
233 Err(e) => return Err(e),
234 };
235
236 let record_type = RecordType::from_u8(type_buf[0])
237 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid record type"))?;
238
239 let mut running_crc = crc32_update(0, &type_buf);
241
242 let record = match record_type {
243 RecordType::Begin | RecordType::Commit | RecordType::Rollback => {
244 let mut buf = [0u8; 8];
245 reader.read_exact(&mut buf)?;
246 running_crc = crc32_update(running_crc, &buf);
247 let tx_id = u64::from_le_bytes(buf);
248
249 match record_type {
250 RecordType::Begin => WalRecord::Begin { tx_id },
251 RecordType::Commit => WalRecord::Commit { tx_id },
252 RecordType::Rollback => WalRecord::Rollback { tx_id },
253 _ => unreachable!(),
254 }
255 }
256 RecordType::PageWrite => {
257 let mut tx_buf = [0u8; 8];
259 reader.read_exact(&mut tx_buf)?;
260 running_crc = crc32_update(running_crc, &tx_buf);
261 let tx_id = u64::from_le_bytes(tx_buf);
262
263 let mut page_buf = [0u8; 4];
265 reader.read_exact(&mut page_buf)?;
266 running_crc = crc32_update(running_crc, &page_buf);
267 let page_id = u32::from_le_bytes(page_buf);
268
269 let mut len_buf = [0u8; 4];
271 reader.read_exact(&mut len_buf)?;
272 running_crc = crc32_update(running_crc, &len_buf);
273 let len = u32::from_le_bytes(len_buf) as usize;
274
275 let mut data = vec![0u8; len];
277 reader.read_exact(&mut data)?;
278 running_crc = crc32_update(running_crc, &data);
279
280 WalRecord::PageWrite {
281 tx_id,
282 page_id,
283 data,
284 }
285 }
286 RecordType::PageWriteCompressed => {
287 let mut tx_buf = [0u8; 8];
289 reader.read_exact(&mut tx_buf)?;
290 running_crc = crc32_update(running_crc, &tx_buf);
291 let tx_id = u64::from_le_bytes(tx_buf);
292
293 let mut page_buf = [0u8; 4];
295 reader.read_exact(&mut page_buf)?;
296 running_crc = crc32_update(running_crc, &page_buf);
297 let page_id = u32::from_le_bytes(page_buf);
298
299 let mut comp_buf = [0u8; 1];
301 reader.read_exact(&mut comp_buf)?;
302 running_crc = crc32_update(running_crc, &comp_buf);
303 let compression = Compression::from_u8(comp_buf[0]).ok_or_else(|| {
304 io::Error::new(
305 io::ErrorKind::InvalidData,
306 format!("Unknown WAL compression algorithm: {}", comp_buf[0]),
307 )
308 })?;
309
310 let mut orig_len_buf = [0u8; 4];
312 reader.read_exact(&mut orig_len_buf)?;
313 running_crc = crc32_update(running_crc, &orig_len_buf);
314 let orig_len = u32::from_le_bytes(orig_len_buf) as usize;
315
316 let mut len_buf = [0u8; 4];
318 reader.read_exact(&mut len_buf)?;
319 running_crc = crc32_update(running_crc, &len_buf);
320 let len = u32::from_le_bytes(len_buf) as usize;
321
322 let mut compressed = vec![0u8; len];
324 reader.read_exact(&mut compressed)?;
325 running_crc = crc32_update(running_crc, &compressed);
326
327 let data = match compression {
329 Compression::Zstd => {
330 let mut out = vec![0u8; orig_len];
331 zstd::bulk::decompress_to_buffer(&compressed, &mut out).map_err(|e| {
332 io::Error::new(
333 io::ErrorKind::InvalidData,
334 format!("WAL zstd decompress failed: {e}"),
335 )
336 })?;
337 out
338 }
339 Compression::None => compressed,
340 };
341
342 WalRecord::PageWrite {
343 tx_id,
344 page_id,
345 data,
346 }
347 }
348 RecordType::TxCommitBatch => {
349 let mut tx_buf = [0u8; 8];
350 reader.read_exact(&mut tx_buf)?;
351 running_crc = crc32_update(running_crc, &tx_buf);
352 let tx_id = u64::from_le_bytes(tx_buf);
353
354 let mut count_buf = [0u8; 4];
355 reader.read_exact(&mut count_buf)?;
356 running_crc = crc32_update(running_crc, &count_buf);
357 let count = u32::from_le_bytes(count_buf) as usize;
358
359 let mut actions = Vec::with_capacity(count);
360 for _ in 0..count {
361 let mut len_buf = [0u8; 4];
362 reader.read_exact(&mut len_buf)?;
363 running_crc = crc32_update(running_crc, &len_buf);
364 let len = u32::from_le_bytes(len_buf) as usize;
365
366 let mut action = vec![0u8; len];
367 reader.read_exact(&mut action)?;
368 running_crc = crc32_update(running_crc, &action);
369 actions.push(action);
370 }
371
372 WalRecord::TxCommitBatch { tx_id, actions }
373 }
374 RecordType::FullPageImage => {
375 let mut tx_buf = [0u8; 8];
376 reader.read_exact(&mut tx_buf)?;
377 running_crc = crc32_update(running_crc, &tx_buf);
378 let tx_id = u64::from_le_bytes(tx_buf);
379
380 let mut page_buf = [0u8; 4];
381 reader.read_exact(&mut page_buf)?;
382 running_crc = crc32_update(running_crc, &page_buf);
383 let page_id = u32::from_le_bytes(page_buf);
384
385 let mut epoch_buf = [0u8; 8];
386 reader.read_exact(&mut epoch_buf)?;
387 running_crc = crc32_update(running_crc, &epoch_buf);
388 let ckpt_epoch = u64::from_le_bytes(epoch_buf);
389
390 let mut len_buf = [0u8; 4];
391 reader.read_exact(&mut len_buf)?;
392 running_crc = crc32_update(running_crc, &len_buf);
393 let len = u32::from_le_bytes(len_buf) as usize;
394
395 let mut data = vec![0u8; len];
396 reader.read_exact(&mut data)?;
397 running_crc = crc32_update(running_crc, &data);
398
399 WalRecord::FullPageImage {
400 tx_id,
401 page_id,
402 ckpt_epoch,
403 data,
404 }
405 }
406 RecordType::Checkpoint => {
407 let mut buf = [0u8; 8];
408 reader.read_exact(&mut buf)?;
409 running_crc = crc32_update(running_crc, &buf);
410 let lsn = u64::from_le_bytes(buf);
411 WalRecord::Checkpoint { lsn }
412 }
413 };
414
415 let mut crc_buf = [0u8; 4];
417 reader.read_exact(&mut crc_buf)?;
418 let stored_crc = u32::from_le_bytes(crc_buf);
419
420 if running_crc != stored_crc {
421 return Err(io::Error::new(
422 io::ErrorKind::InvalidData,
423 "WAL record checksum mismatch",
424 ));
425 }
426
427 Ok(Some(record))
428 }
429}
430
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, ErrorKind};

    /// Offset of the length field in an uncompressed `PageWrite` record:
    /// tag (1) + tx_id (8) + page_id (4).
    const PAGE_WRITE_LEN_OFFSET: usize = 1 + 8 + 4;

    /// Offset of the first payload byte in an uncompressed `PageWrite`
    /// record: the length field (4 bytes) follows `PAGE_WRITE_LEN_OFFSET`.
    const PAGE_WRITE_PAYLOAD_OFFSET: usize = PAGE_WRITE_LEN_OFFSET + 4;

    /// Encodes `record` and reads it back, expecting exactly one record.
    fn roundtrip(record: &WalRecord) -> WalRecord {
        let mut cursor = Cursor::new(record.encode());
        WalRecord::read(&mut cursor)
            .expect("encoded record must decode without error")
            .expect("encoded record must not decode as end of log")
    }

    /// Reads one record from `bytes` and returns the error it must produce.
    fn read_error(bytes: Vec<u8>) -> std::io::Error {
        WalRecord::read(&mut Cursor::new(bytes)).expect_err("read should have failed")
    }

    // ---- record tags -------------------------------------------------------

    #[test]
    fn test_record_type_from_u8() {
        let expected = [
            (1u8, RecordType::Begin),
            (2, RecordType::Commit),
            (3, RecordType::Rollback),
            (4, RecordType::PageWrite),
            (5, RecordType::Checkpoint),
            (6, RecordType::PageWriteCompressed),
            (7, RecordType::TxCommitBatch),
            (8, RecordType::FullPageImage),
        ];
        for (tag, kind) in expected.iter().copied() {
            assert_eq!(RecordType::from_u8(tag), Some(kind), "tag {}", tag);
        }
    }

    #[test]
    fn test_record_type_invalid() {
        for tag in [0u8, 9, 255].iter().copied() {
            assert_eq!(RecordType::from_u8(tag), None, "tag {}", tag);
        }
    }

    // ---- encoding ----------------------------------------------------------

    #[test]
    fn test_encode_begin() {
        let encoded = WalRecord::Begin { tx_id: 12345 }.encode();

        // tag (1) + tx_id (8) + crc (4)
        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Begin as u8);
    }

    #[test]
    fn test_encode_commit() {
        let encoded = WalRecord::Commit { tx_id: 99999 }.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Commit as u8);
    }

    #[test]
    fn test_encode_rollback() {
        let encoded = WalRecord::Rollback { tx_id: 54321 }.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Rollback as u8);
    }

    #[test]
    fn test_encode_checkpoint() {
        let encoded = WalRecord::Checkpoint { lsn: 1000000 }.encode();

        // tag (1) + lsn (8) + crc (4)
        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Checkpoint as u8);
    }

    #[test]
    fn test_encode_page_write_small() {
        let record = WalRecord::PageWrite {
            tx_id: 100,
            page_id: 42,
            data: vec![1, 2, 3, 4, 5],
        };
        let encoded = record.encode();

        // tag (1) + tx_id (8) + page_id (4) + len (4) + data (5) + crc (4)
        assert_eq!(encoded.len(), 26);
        assert_eq!(encoded[0], RecordType::PageWrite as u8);
    }

    #[test]
    fn test_encode_page_write_empty_data() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data: vec![],
        };
        let encoded = record.encode();

        // tag (1) + tx_id (8) + page_id (4) + len (4) + crc (4)
        assert_eq!(encoded.len(), 21);
        assert_eq!(encoded[0], RecordType::PageWrite as u8);
    }

    #[test]
    fn test_encode_tx_commit_batch() {
        let record = WalRecord::TxCommitBatch {
            tx_id: 7,
            actions: vec![b"insert".to_vec(), b"update".to_vec()],
        };
        let encoded = record.encode();

        assert_eq!(encoded[0], RecordType::TxCommitBatch as u8);
        // tag (1) + tx_id (8) + count (4) + 2 x (len (4) + 6 bytes) + crc (4)
        assert_eq!(encoded.len(), 37);
    }

    // ---- round trips -------------------------------------------------------

    #[test]
    fn test_read_begin_roundtrip() {
        let original = WalRecord::Begin { tx_id: 42 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_commit_roundtrip() {
        let original = WalRecord::Commit { tx_id: 999 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_rollback_roundtrip() {
        let original = WalRecord::Rollback { tx_id: 777 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_checkpoint_roundtrip() {
        let original = WalRecord::Checkpoint { lsn: 123456789 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_page_write_roundtrip() {
        let original = WalRecord::PageWrite {
            tx_id: 50,
            page_id: 100,
            data: vec![10, 20, 30, 40, 50, 60, 70, 80],
        };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_tx_commit_batch_roundtrip() {
        let original = WalRecord::TxCommitBatch {
            tx_id: 42,
            actions: vec![b"old-version".to_vec(), b"new-version".to_vec()],
        };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_page_write_large_data() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
        let original = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data,
        };
        // Whichever on-disk form the encoder picks, the payload must survive.
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn page_write_compressed_roundtrip() {
        let data = vec![0xABu8; 1024];
        let record = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: data.clone(),
        };
        let encoded = record.encode();

        // A long run of one byte must take the compressed path.
        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(
            decoded,
            WalRecord::PageWrite {
                tx_id: 7,
                page_id: 3,
                data
            }
        );
    }

    #[test]
    fn full_page_image_roundtrip() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 251) as u8).collect();
        let original = WalRecord::FullPageImage {
            tx_id: 11,
            page_id: 9,
            ckpt_epoch: 42,
            data,
        };
        let encoded = original.encode();
        assert_eq!(encoded[0], RecordType::FullPageImage as u8);

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(decoded, original);
    }

    // ---- error paths -------------------------------------------------------

    #[test]
    fn full_page_image_checksum_mismatch_detected() {
        let original = WalRecord::FullPageImage {
            tx_id: 1,
            page_id: 2,
            ckpt_epoch: 3,
            data: vec![0xAA; 32],
        };
        let mut encoded = original.encode();
        // The midpoint of this 61-byte record lies inside the 32-byte payload.
        let mid = encoded.len() / 2;
        encoded[mid] ^= 0xFF;

        assert!(WalRecord::read(&mut Cursor::new(encoded)).is_err());
    }

    #[test]
    fn test_read_eof() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let result = WalRecord::read(&mut cursor).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_read_invalid_record_type() {
        let mut bytes = vec![0u8; 13];
        bytes[0] = 99;

        assert_eq!(read_error(bytes).kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_checksum_mismatch() {
        let mut encoded = WalRecord::Begin { tx_id: 42 }.encode();

        // Damage the stored checksum itself.
        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;

        assert_eq!(read_error(encoded).kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_data_corruption() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 2,
            data: vec![1, 2, 3, 4],
        };
        let mut encoded = record.encode();

        // Flip a payload byte (not a header field) so that the failure can
        // only come from the checksum comparison.
        encoded[PAGE_WRITE_PAYLOAD_OFFSET] ^= 0xFF;

        assert_eq!(read_error(encoded).kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_length_corruption() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 2,
            data: vec![1, 2, 3, 4],
        };
        let mut encoded = record.encode();

        // Inflate the length field far beyond the bytes that follow; the
        // reader runs out of input before it can reach the checksum.
        encoded[PAGE_WRITE_LEN_OFFSET + 2] ^= 0xFF;

        assert_eq!(read_error(encoded).kind(), ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_read_truncated_record() {
        let mut encoded = WalRecord::Begin { tx_id: 42 }.encode();
        // Keep the tag and only part of the transaction id.
        encoded.truncate(5);

        assert_eq!(read_error(encoded).kind(), ErrorKind::UnexpectedEof);
    }

    // ---- streams -----------------------------------------------------------

    #[test]
    fn test_multiple_records_sequential() {
        let records = vec![
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 20,
                data: vec![4, 5, 6],
            },
            WalRecord::Commit { tx_id: 1 },
            WalRecord::Checkpoint { lsn: 100 },
        ];

        let stream: Vec<u8> = records.iter().flat_map(|r| r.encode()).collect();

        let mut cursor = Cursor::new(stream);
        for expected in &records {
            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
            assert_eq!(&decoded, expected);
        }

        // After the last record the reader reports a clean end of log.
        assert!(WalRecord::read(&mut cursor).unwrap().is_none());
    }

    // ---- constants ---------------------------------------------------------

    #[test]
    fn test_wal_magic() {
        assert_eq!(WAL_MAGIC, b"RDBW");
    }

    #[test]
    fn test_wal_version() {
        assert_eq!(WAL_VERSION, 2);
    }
}