1use crate::storage::engine::crc32::{crc32, crc32_update};
2use std::io::{self, Read};
3
/// Four-byte magic value `RDBW` identifying the WAL format.
///
/// NOTE(review): the record codec visible here never reads or writes it; presumably it
/// opens the WAL file header — confirm against the WAL reader/writer.
pub const WAL_MAGIC: &[u8; 4] = b"RDBW";

/// Version number of the WAL format.
///
/// NOTE(review): not referenced by the record codec visible here; presumably stored next
/// to [`WAL_MAGIC`] in the file header — confirm.
pub const WAL_VERSION: u8 = 2;

/// Minimum `PageWrite` payload size, in bytes, at which [`WalRecord::encode`] attempts
/// zstd compression. Shorter payloads are always written uncompressed.
const COMPRESS_THRESHOLD: usize = 256;
13
/// Compression algorithm tag carried by `PageWriteCompressed` records.
///
/// The explicit discriminant is the byte written to the log, so existing values must
/// never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Compression {
    /// Payload bytes are stored as-is.
    None = 0,
    /// Payload bytes are zstd-compressed.
    Zstd = 1,
}

impl Compression {
    /// Maps an on-disk tag byte back to its variant.
    ///
    /// Returns `Option::None` when `v` does not correspond to any known algorithm.
    fn from_u8(v: u8) -> Option<Self> {
        // Scan the variants and compare discriminants, so the tag values are defined in
        // exactly one place (the enum itself).
        [Compression::None, Compression::Zstd]
            .iter()
            .copied()
            .find(|algo| *algo as u8 == v)
    }
}
31
/// One-byte tag that opens every encoded WAL record.
///
/// The explicit discriminant is the byte written to the log, so existing values must
/// never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RecordType {
    /// Tag of `WalRecord::Begin`.
    Begin = 1,
    /// Tag of `WalRecord::Commit`.
    Commit = 2,
    /// Tag of `WalRecord::Rollback`.
    Rollback = 3,
    /// Tag of a `WalRecord::PageWrite` whose payload is stored uncompressed.
    PageWrite = 4,
    /// Tag of `WalRecord::Checkpoint`.
    Checkpoint = 5,
    /// Tag of a `WalRecord::PageWrite` whose payload was compressed by the encoder.
    PageWriteCompressed = 6,
    /// Tag of `WalRecord::TxCommitBatch`.
    TxCommitBatch = 7,
    /// Tag of `WalRecord::FullPageImage`.
    FullPageImage = 8,
    /// Tag of `WalRecord::VectorInsert`.
    VectorInsert = 9,
}

impl RecordType {
    /// Maps an on-disk tag byte back to its record type.
    ///
    /// Returns `None` when `v` is not the tag of any known record type.
    pub fn from_u8(v: u8) -> Option<Self> {
        const KNOWN: [RecordType; 9] = [
            RecordType::Begin,
            RecordType::Commit,
            RecordType::Rollback,
            RecordType::PageWrite,
            RecordType::Checkpoint,
            RecordType::PageWriteCompressed,
            RecordType::TxCommitBatch,
            RecordType::FullPageImage,
            RecordType::VectorInsert,
        ];

        // Compare against each variant's own discriminant instead of repeating the
        // numeric tags a second time.
        KNOWN.iter().copied().find(|kind| *kind as u8 == v)
    }
}
90
/// A single write-ahead-log record in decoded form.
///
/// [`WalRecord::encode`] serializes a record as a one-byte [`RecordType`] tag, the
/// variant's fields in little-endian byte order, and a trailing CRC32 of everything
/// before it. [`WalRecord::read`] performs the inverse and verifies the checksum.
#[derive(Debug, Clone, PartialEq)]
pub enum WalRecord {
    /// Carries only a transaction id. Presumably marks the start of that transaction
    /// (inferred from the name — confirm against the recovery code).
    Begin { tx_id: u64 },
    /// Carries only a transaction id. Presumably marks that transaction as committed
    /// (inferred from the name — confirm).
    Commit { tx_id: u64 },
    /// Carries only a transaction id. Presumably marks that transaction as rolled back
    /// (inferred from the name — confirm).
    Rollback { tx_id: u64 },
    /// Page payload attributed to a transaction.
    ///
    /// `encode` writes payloads of at least `COMPRESS_THRESHOLD` bytes under the
    /// `PageWriteCompressed` tag when zstd makes them smaller, and under the `PageWrite`
    /// tag otherwise; `read` returns this variant for both tags.
    PageWrite {
        /// Transaction id stored with the record.
        tx_id: u64,
        /// Page id stored with the record.
        page_id: u32,
        /// Payload bytes, always uncompressed in this in-memory form.
        data: Vec<u8>,
    },
    /// A transaction id plus a list of opaque byte strings; each action is written
    /// with its own length prefix.
    TxCommitBatch { tx_id: u64, actions: Vec<Vec<u8>> },
    /// Page payload together with a checkpoint epoch value; never compressed by `encode`.
    FullPageImage {
        /// Transaction id stored with the record.
        tx_id: u64,
        /// Page id stored with the record.
        page_id: u32,
        /// NOTE(review): presumably the checkpoint epoch in effect when the image was
        /// taken — confirm with the checkpoint code.
        ckpt_epoch: u64,
        /// Payload bytes.
        data: Vec<u8>,
    },
    /// A vector of `f32` values keyed by collection name and entity id.
    VectorInsert {
        /// Collection name; written as length-prefixed UTF-8 bytes.
        collection: String,
        /// Entity id stored with the record.
        entity_id: u64,
        /// Vector components; each is written as a little-endian `f32`.
        vector: Vec<f32>,
    },
    /// Carries a single `u64`, presumably a log sequence number (inferred from the field
    /// name — confirm).
    Checkpoint { lsn: u64 },
}
129
130impl WalRecord {
131 pub fn encode(&self) -> Vec<u8> {
137 let mut buf = Vec::new();
138
139 match self {
154 WalRecord::Begin { tx_id } => {
155 buf.push(RecordType::Begin as u8);
156 buf.extend_from_slice(&tx_id.to_le_bytes());
157 }
158 WalRecord::Commit { tx_id } => {
159 buf.push(RecordType::Commit as u8);
160 buf.extend_from_slice(&tx_id.to_le_bytes());
161 }
162 WalRecord::Rollback { tx_id } => {
163 buf.push(RecordType::Rollback as u8);
164 buf.extend_from_slice(&tx_id.to_le_bytes());
165 }
166 WalRecord::PageWrite {
167 tx_id,
168 page_id,
169 data,
170 } => {
171 if data.len() >= COMPRESS_THRESHOLD {
172 if let Ok(compressed) =
174 zstd::bulk::compress(data.as_slice(), 3)
175 {
176 if compressed.len() < data.len() {
177 buf.push(RecordType::PageWriteCompressed as u8);
179 buf.extend_from_slice(&tx_id.to_le_bytes());
180 buf.extend_from_slice(&page_id.to_le_bytes());
181 buf.push(Compression::Zstd as u8);
182 buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); buf.extend_from_slice(&(compressed.len() as u32).to_le_bytes());
184 buf.extend_from_slice(&compressed);
185 let checksum = crc32(&buf);
186 buf.extend_from_slice(&checksum.to_le_bytes());
187 return buf;
188 }
189 }
190 }
191 buf.push(RecordType::PageWrite as u8);
193 buf.extend_from_slice(&tx_id.to_le_bytes());
194 buf.extend_from_slice(&page_id.to_le_bytes());
195 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
196 buf.extend_from_slice(data);
197 }
198 WalRecord::TxCommitBatch { tx_id, actions } => {
199 buf.push(RecordType::TxCommitBatch as u8);
200 buf.extend_from_slice(&tx_id.to_le_bytes());
201 buf.extend_from_slice(&(actions.len() as u32).to_le_bytes());
202 for action in actions {
203 buf.extend_from_slice(&(action.len() as u32).to_le_bytes());
204 buf.extend_from_slice(action);
205 }
206 }
207 WalRecord::FullPageImage {
208 tx_id,
209 page_id,
210 ckpt_epoch,
211 data,
212 } => {
213 buf.push(RecordType::FullPageImage as u8);
214 buf.extend_from_slice(&tx_id.to_le_bytes());
215 buf.extend_from_slice(&page_id.to_le_bytes());
216 buf.extend_from_slice(&ckpt_epoch.to_le_bytes());
217 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
218 buf.extend_from_slice(data);
219 }
220 WalRecord::VectorInsert {
221 collection,
222 entity_id,
223 vector,
224 } => {
225 buf.push(RecordType::VectorInsert as u8);
226 buf.extend_from_slice(&(collection.len() as u32).to_le_bytes());
227 buf.extend_from_slice(collection.as_bytes());
228 buf.extend_from_slice(&entity_id.to_le_bytes());
229 buf.extend_from_slice(&(vector.len() as u32).to_le_bytes());
230 for value in vector {
231 buf.extend_from_slice(&value.to_le_bytes());
232 }
233 }
234 WalRecord::Checkpoint { lsn } => {
235 buf.push(RecordType::Checkpoint as u8);
236 buf.extend_from_slice(&lsn.to_le_bytes());
237 }
238 }
239
240 let checksum = crc32(&buf);
242 buf.extend_from_slice(&checksum.to_le_bytes());
243
244 buf
245 }
246
247 pub fn read<R: Read>(reader: &mut R) -> io::Result<Option<WalRecord>> {
252 let mut type_buf = [0u8; 1];
254 match reader.read_exact(&mut type_buf) {
255 Ok(_) => (),
256 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
257 Err(e) => return Err(e),
258 };
259
260 let record_type = RecordType::from_u8(type_buf[0])
261 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid record type"))?;
262
263 let mut running_crc = crc32_update(0, &type_buf);
265
266 let record = match record_type {
267 RecordType::Begin | RecordType::Commit | RecordType::Rollback => {
268 let mut buf = [0u8; 8];
269 reader.read_exact(&mut buf)?;
270 running_crc = crc32_update(running_crc, &buf);
271 let tx_id = u64::from_le_bytes(buf);
272
273 match record_type {
274 RecordType::Begin => WalRecord::Begin { tx_id },
275 RecordType::Commit => WalRecord::Commit { tx_id },
276 RecordType::Rollback => WalRecord::Rollback { tx_id },
277 _ => unreachable!(),
278 }
279 }
280 RecordType::PageWrite => {
281 let mut tx_buf = [0u8; 8];
283 reader.read_exact(&mut tx_buf)?;
284 running_crc = crc32_update(running_crc, &tx_buf);
285 let tx_id = u64::from_le_bytes(tx_buf);
286
287 let mut page_buf = [0u8; 4];
289 reader.read_exact(&mut page_buf)?;
290 running_crc = crc32_update(running_crc, &page_buf);
291 let page_id = u32::from_le_bytes(page_buf);
292
293 let mut len_buf = [0u8; 4];
295 reader.read_exact(&mut len_buf)?;
296 running_crc = crc32_update(running_crc, &len_buf);
297 let len = u32::from_le_bytes(len_buf) as usize;
298
299 let mut data = vec![0u8; len];
301 reader.read_exact(&mut data)?;
302 running_crc = crc32_update(running_crc, &data);
303
304 WalRecord::PageWrite {
305 tx_id,
306 page_id,
307 data,
308 }
309 }
310 RecordType::PageWriteCompressed => {
311 let mut tx_buf = [0u8; 8];
313 reader.read_exact(&mut tx_buf)?;
314 running_crc = crc32_update(running_crc, &tx_buf);
315 let tx_id = u64::from_le_bytes(tx_buf);
316
317 let mut page_buf = [0u8; 4];
319 reader.read_exact(&mut page_buf)?;
320 running_crc = crc32_update(running_crc, &page_buf);
321 let page_id = u32::from_le_bytes(page_buf);
322
323 let mut comp_buf = [0u8; 1];
325 reader.read_exact(&mut comp_buf)?;
326 running_crc = crc32_update(running_crc, &comp_buf);
327 let compression = Compression::from_u8(comp_buf[0]).ok_or_else(|| {
328 io::Error::new(
329 io::ErrorKind::InvalidData,
330 format!("Unknown WAL compression algorithm: {}", comp_buf[0]),
331 )
332 })?;
333
334 let mut orig_len_buf = [0u8; 4];
336 reader.read_exact(&mut orig_len_buf)?;
337 running_crc = crc32_update(running_crc, &orig_len_buf);
338 let orig_len = u32::from_le_bytes(orig_len_buf) as usize;
339
340 let mut len_buf = [0u8; 4];
342 reader.read_exact(&mut len_buf)?;
343 running_crc = crc32_update(running_crc, &len_buf);
344 let len = u32::from_le_bytes(len_buf) as usize;
345
346 let mut compressed = vec![0u8; len];
348 reader.read_exact(&mut compressed)?;
349 running_crc = crc32_update(running_crc, &compressed);
350
351 let data = match compression {
353 Compression::Zstd => {
354 let mut out = vec![0u8; orig_len];
355 zstd::bulk::decompress_to_buffer(&compressed, &mut out).map_err(|e| {
356 io::Error::new(
357 io::ErrorKind::InvalidData,
358 format!("WAL zstd decompress failed: {e}"),
359 )
360 })?;
361 out
362 }
363 Compression::None => compressed,
364 };
365
366 WalRecord::PageWrite {
367 tx_id,
368 page_id,
369 data,
370 }
371 }
372 RecordType::TxCommitBatch => {
373 let mut tx_buf = [0u8; 8];
374 reader.read_exact(&mut tx_buf)?;
375 running_crc = crc32_update(running_crc, &tx_buf);
376 let tx_id = u64::from_le_bytes(tx_buf);
377
378 let mut count_buf = [0u8; 4];
379 reader.read_exact(&mut count_buf)?;
380 running_crc = crc32_update(running_crc, &count_buf);
381 let count = u32::from_le_bytes(count_buf) as usize;
382
383 let mut actions = Vec::with_capacity(count);
384 for _ in 0..count {
385 let mut len_buf = [0u8; 4];
386 reader.read_exact(&mut len_buf)?;
387 running_crc = crc32_update(running_crc, &len_buf);
388 let len = u32::from_le_bytes(len_buf) as usize;
389
390 let mut action = vec![0u8; len];
391 reader.read_exact(&mut action)?;
392 running_crc = crc32_update(running_crc, &action);
393 actions.push(action);
394 }
395
396 WalRecord::TxCommitBatch { tx_id, actions }
397 }
398 RecordType::VectorInsert => {
399 let mut len_buf = [0u8; 4];
400 reader.read_exact(&mut len_buf)?;
401 running_crc = crc32_update(running_crc, &len_buf);
402 let collection_len = u32::from_le_bytes(len_buf) as usize;
403
404 let mut collection_buf = vec![0u8; collection_len];
405 reader.read_exact(&mut collection_buf)?;
406 running_crc = crc32_update(running_crc, &collection_buf);
407 let collection = String::from_utf8(collection_buf).map_err(|err| {
408 io::Error::new(
409 io::ErrorKind::InvalidData,
410 format!("invalid collection utf8: {err}"),
411 )
412 })?;
413
414 let mut entity_buf = [0u8; 8];
415 reader.read_exact(&mut entity_buf)?;
416 running_crc = crc32_update(running_crc, &entity_buf);
417 let entity_id = u64::from_le_bytes(entity_buf);
418
419 let mut count_buf = [0u8; 4];
420 reader.read_exact(&mut count_buf)?;
421 running_crc = crc32_update(running_crc, &count_buf);
422 let count = u32::from_le_bytes(count_buf) as usize;
423
424 let mut vector = Vec::with_capacity(count);
425 for _ in 0..count {
426 let mut value_buf = [0u8; 4];
427 reader.read_exact(&mut value_buf)?;
428 running_crc = crc32_update(running_crc, &value_buf);
429 vector.push(f32::from_le_bytes(value_buf));
430 }
431
432 WalRecord::VectorInsert {
433 collection,
434 entity_id,
435 vector,
436 }
437 }
438 RecordType::FullPageImage => {
439 let mut tx_buf = [0u8; 8];
440 reader.read_exact(&mut tx_buf)?;
441 running_crc = crc32_update(running_crc, &tx_buf);
442 let tx_id = u64::from_le_bytes(tx_buf);
443
444 let mut page_buf = [0u8; 4];
445 reader.read_exact(&mut page_buf)?;
446 running_crc = crc32_update(running_crc, &page_buf);
447 let page_id = u32::from_le_bytes(page_buf);
448
449 let mut epoch_buf = [0u8; 8];
450 reader.read_exact(&mut epoch_buf)?;
451 running_crc = crc32_update(running_crc, &epoch_buf);
452 let ckpt_epoch = u64::from_le_bytes(epoch_buf);
453
454 let mut len_buf = [0u8; 4];
455 reader.read_exact(&mut len_buf)?;
456 running_crc = crc32_update(running_crc, &len_buf);
457 let len = u32::from_le_bytes(len_buf) as usize;
458
459 let mut data = vec![0u8; len];
460 reader.read_exact(&mut data)?;
461 running_crc = crc32_update(running_crc, &data);
462
463 WalRecord::FullPageImage {
464 tx_id,
465 page_id,
466 ckpt_epoch,
467 data,
468 }
469 }
470 RecordType::Checkpoint => {
471 let mut buf = [0u8; 8];
472 reader.read_exact(&mut buf)?;
473 running_crc = crc32_update(running_crc, &buf);
474 let lsn = u64::from_le_bytes(buf);
475 WalRecord::Checkpoint { lsn }
476 }
477 };
478
479 let mut crc_buf = [0u8; 4];
481 reader.read_exact(&mut crc_buf)?;
482 let stored_crc = u32::from_le_bytes(crc_buf);
483
484 if running_crc != stored_crc {
485 return Err(io::Error::new(
486 io::ErrorKind::InvalidData,
487 "WAL record checksum mismatch",
488 ));
489 }
490
491 Ok(Some(record))
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, ErrorKind};

    /// Encodes `record`, decodes the bytes again, and returns the decoded record.
    fn roundtrip(record: &WalRecord) -> WalRecord {
        let mut cursor = Cursor::new(record.encode());
        WalRecord::read(&mut cursor)
            .expect("a freshly encoded record must decode")
            .expect("the encoded bytes must contain a record")
    }

    // ---- RecordType tags ----

    #[test]
    fn test_record_type_from_u8() {
        assert_eq!(RecordType::from_u8(1), Some(RecordType::Begin));
        assert_eq!(RecordType::from_u8(2), Some(RecordType::Commit));
        assert_eq!(RecordType::from_u8(3), Some(RecordType::Rollback));
        assert_eq!(RecordType::from_u8(4), Some(RecordType::PageWrite));
        assert_eq!(RecordType::from_u8(5), Some(RecordType::Checkpoint));
        assert_eq!(
            RecordType::from_u8(6),
            Some(RecordType::PageWriteCompressed)
        );
        assert_eq!(RecordType::from_u8(7), Some(RecordType::TxCommitBatch));
        assert_eq!(RecordType::from_u8(8), Some(RecordType::FullPageImage));
        assert_eq!(RecordType::from_u8(9), Some(RecordType::VectorInsert));
    }

    #[test]
    fn test_record_type_invalid() {
        assert_eq!(RecordType::from_u8(0), None);
        // 9 is `VectorInsert`; the first unassigned tag is 10.
        assert_eq!(RecordType::from_u8(10), None);
        assert_eq!(RecordType::from_u8(255), None);
    }

    // ---- encode ----

    #[test]
    fn test_encode_begin() {
        let encoded = WalRecord::Begin { tx_id: 12345 }.encode();

        // tag (1) + tx_id (8) + crc (4)
        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Begin as u8);
    }

    #[test]
    fn test_encode_commit() {
        let encoded = WalRecord::Commit { tx_id: 99999 }.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Commit as u8);
    }

    #[test]
    fn test_encode_rollback() {
        let encoded = WalRecord::Rollback { tx_id: 54321 }.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Rollback as u8);
    }

    #[test]
    fn test_encode_checkpoint() {
        let encoded = WalRecord::Checkpoint { lsn: 1000000 }.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Checkpoint as u8);
    }

    #[test]
    fn test_encode_page_write_small() {
        let record = WalRecord::PageWrite {
            tx_id: 100,
            page_id: 42,
            data: vec![1, 2, 3, 4, 5],
        };
        let encoded = record.encode();

        // tag (1) + tx_id (8) + page_id (4) + len (4) + data (5) + crc (4)
        assert_eq!(encoded.len(), 26);
        assert_eq!(encoded[0], RecordType::PageWrite as u8);
    }

    #[test]
    fn test_encode_page_write_empty_data() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data: vec![],
        };
        let encoded = record.encode();

        // tag (1) + tx_id (8) + page_id (4) + len (4) + crc (4)
        assert_eq!(encoded.len(), 21);
    }

    #[test]
    fn test_encode_tx_commit_batch() {
        let record = WalRecord::TxCommitBatch {
            tx_id: 7,
            actions: vec![b"insert".to_vec(), b"update".to_vec()],
        };
        let encoded = record.encode();

        assert_eq!(encoded[0], RecordType::TxCommitBatch as u8);
    }

    // ---- encode -> read round trips ----

    #[test]
    fn test_read_begin_roundtrip() {
        let original = WalRecord::Begin { tx_id: 42 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_commit_roundtrip() {
        let original = WalRecord::Commit { tx_id: 999 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_rollback_roundtrip() {
        let original = WalRecord::Rollback { tx_id: 777 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_checkpoint_roundtrip() {
        let original = WalRecord::Checkpoint { lsn: 123456789 };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_page_write_roundtrip() {
        let original = WalRecord::PageWrite {
            tx_id: 50,
            page_id: 100,
            data: vec![10, 20, 30, 40, 50, 60, 70, 80],
        };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_tx_commit_batch_roundtrip() {
        let original = WalRecord::TxCommitBatch {
            tx_id: 42,
            actions: vec![b"old-version".to_vec(), b"new-version".to_vec()],
        };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_vector_insert_roundtrip() {
        let original = WalRecord::VectorInsert {
            collection: "turbo".to_string(),
            entity_id: 42,
            vector: vec![1.0, -0.5, 0.25],
        };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn test_read_page_write_large_data() {
        // Page-sized payload, well above the compression threshold.
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
        let original = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data,
        };
        assert_eq!(roundtrip(&original), original);
    }

    #[test]
    fn page_write_compressed_roundtrip() {
        // Highly repetitive payload, so the encoder must pick the compressed layout.
        let data = vec![0xABu8; 1024];
        let record = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: data.clone(),
        };
        let encoded = record.encode();

        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        // Decoding hides the compression and yields a plain `PageWrite`.
        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(
            decoded,
            WalRecord::PageWrite {
                tx_id: 7,
                page_id: 3,
                data
            }
        );
    }

    #[test]
    fn full_page_image_roundtrip() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 251) as u8).collect();
        let original = WalRecord::FullPageImage {
            tx_id: 11,
            page_id: 9,
            ckpt_epoch: 42,
            data,
        };
        let encoded = original.encode();
        assert_eq!(encoded[0], RecordType::FullPageImage as u8);

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(decoded, original);
    }

    // ---- error paths ----

    #[test]
    fn full_page_image_checksum_mismatch_detected() {
        let original = WalRecord::FullPageImage {
            tx_id: 1,
            page_id: 2,
            ckpt_epoch: 3,
            data: vec![0xAA; 32],
        };
        let mut encoded = original.encode();
        let mid = encoded.len() / 2;
        encoded[mid] ^= 0xFF;
        let mut cursor = Cursor::new(encoded);
        assert!(WalRecord::read(&mut cursor).is_err());
    }

    #[test]
    fn test_read_eof() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let result = WalRecord::read(&mut cursor).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_read_invalid_record_type() {
        // 99 is not an assigned record tag.
        let buf = vec![99, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
        let mut cursor = Cursor::new(buf);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_truncated_record() {
        // A record cut off after its tag is an error, not a clean end of log.
        let mut encoded = WalRecord::Begin { tx_id: 1 }.encode();
        encoded.truncate(5);

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_read_checksum_mismatch() {
        let record = WalRecord::Begin { tx_id: 42 };
        let mut encoded = record.encode();

        // Flip the last byte of the CRC trailer.
        let len = encoded.len();
        encoded[len - 1] ^= 0xFF;

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_data_corruption() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 2,
            data: vec![1, 2, 3, 4],
        };
        let mut encoded = record.encode();

        // Flip the first payload byte: tag (1) + tx_id (8) + page_id (4) + len (4)
        // precede it. The framing stays intact, so only the checksum can catch this.
        let payload_start = 1 + 8 + 4 + 4;
        encoded[payload_start] ^= 0xFF;

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    // ---- sequences ----

    #[test]
    fn test_multiple_records_sequential() {
        let records = vec![
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 20,
                data: vec![4, 5, 6],
            },
            WalRecord::Commit { tx_id: 1 },
            WalRecord::Checkpoint { lsn: 100 },
        ];

        // Records are simply concatenated in the log.
        let mut buf = Vec::new();
        for r in &records {
            buf.extend_from_slice(&r.encode());
        }

        let mut cursor = Cursor::new(buf);
        for expected in &records {
            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
            assert_eq!(&decoded, expected);
        }

        // After the last record the reader reports a clean end of log.
        assert!(WalRecord::read(&mut cursor).unwrap().is_none());
    }

    // ---- format constants ----

    #[test]
    fn test_wal_magic() {
        assert_eq!(WAL_MAGIC, b"RDBW");
    }

    #[test]
    fn test_wal_version() {
        assert_eq!(WAL_VERSION, 2);
    }
}