1use crate::storage::engine::crc32::{crc32, crc32_update};
2use std::io::{self, Read};
3
4pub const WAL_MAGIC: &[u8; 4] = b"RDBW";
6
7pub const WAL_VERSION: u8 = 3;
9pub const WAL_VERSION_V2: u8 = 2;
10pub const WAL_DEFAULT_TERM: u64 = crate::replication::DEFAULT_REPLICATION_TERM;
11
12const COMPRESS_THRESHOLD: usize = 256;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18#[repr(u8)]
19pub enum Compression {
20 None = 0,
21 Zstd = 1,
22}
23
24impl Compression {
25 fn from_u8(v: u8) -> Option<Self> {
26 match v {
27 0 => Some(Compression::None),
28 1 => Some(Compression::Zstd),
29 _ => None,
30 }
31 }
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36#[repr(u8)]
37pub enum RecordType {
38 Begin = 1,
39 Commit = 2,
40 Rollback = 3,
41 PageWrite = 4,
44 Checkpoint = 5,
45 PageWriteCompressed = 6,
54 TxCommitBatch = 7,
61 FullPageImage = 8,
72 VectorInsert = 9,
74}
75
76impl RecordType {
77 pub fn from_u8(v: u8) -> Option<Self> {
78 match v {
79 1 => Some(RecordType::Begin),
80 2 => Some(RecordType::Commit),
81 3 => Some(RecordType::Rollback),
82 4 => Some(RecordType::PageWrite),
83 5 => Some(RecordType::Checkpoint),
84 6 => Some(RecordType::PageWriteCompressed),
85 7 => Some(RecordType::TxCommitBatch),
86 8 => Some(RecordType::FullPageImage),
87 9 => Some(RecordType::VectorInsert),
88 _ => None,
89 }
90 }
91}
92
93#[derive(Debug, Clone, PartialEq)]
95pub enum WalRecord {
96 Begin { tx_id: u64 },
98 Commit { tx_id: u64 },
100 Rollback { tx_id: u64 },
102 PageWrite {
105 tx_id: u64,
106 page_id: u32,
107 data: Vec<u8>,
108 },
109 TxCommitBatch { tx_id: u64, actions: Vec<Vec<u8>> },
112 FullPageImage {
116 tx_id: u64,
117 page_id: u32,
118 ckpt_epoch: u64,
119 data: Vec<u8>,
120 },
121 VectorInsert {
124 collection: String,
125 entity_id: u64,
126 vector: Vec<f32>,
127 },
128 Checkpoint { lsn: u64 },
130}
131
132impl WalRecord {
133 pub fn encode(&self) -> Vec<u8> {
139 self.encode_with_term(WAL_DEFAULT_TERM)
140 }
141
142 pub fn encode_with_term(&self, term: u64) -> Vec<u8> {
145 let mut buf = Vec::new();
146 self.encode_with_term_into(&mut buf, term);
147 buf
148 }
149
150 pub fn encode_into(&self, out: &mut Vec<u8>) {
161 self.encode_with_term_into(out, WAL_DEFAULT_TERM)
162 }
163
164 pub fn encode_with_term_into(&self, out: &mut Vec<u8>, term: u64) {
171 let start = out.len();
174 let buf = out;
175
176 match self {
192 WalRecord::Begin { tx_id } => {
193 buf.push(RecordType::Begin as u8);
194 buf.extend_from_slice(&term.to_le_bytes());
195 buf.extend_from_slice(&tx_id.to_le_bytes());
196 }
197 WalRecord::Commit { tx_id } => {
198 buf.push(RecordType::Commit as u8);
199 buf.extend_from_slice(&term.to_le_bytes());
200 buf.extend_from_slice(&tx_id.to_le_bytes());
201 }
202 WalRecord::Rollback { tx_id } => {
203 buf.push(RecordType::Rollback as u8);
204 buf.extend_from_slice(&term.to_le_bytes());
205 buf.extend_from_slice(&tx_id.to_le_bytes());
206 }
207 WalRecord::PageWrite {
208 tx_id,
209 page_id,
210 data,
211 } => {
212 if data.len() >= COMPRESS_THRESHOLD {
213 if let Ok(compressed) =
215 zstd::bulk::compress(data.as_slice(), 3)
216 {
217 if compressed.len() < data.len() {
218 buf.push(RecordType::PageWriteCompressed as u8);
220 buf.extend_from_slice(&term.to_le_bytes());
221 buf.extend_from_slice(&tx_id.to_le_bytes());
222 buf.extend_from_slice(&page_id.to_le_bytes());
223 buf.push(Compression::Zstd as u8);
224 buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); buf.extend_from_slice(&(compressed.len() as u32).to_le_bytes());
226 buf.extend_from_slice(&compressed);
227 let checksum = crc32(&buf[start..]);
228 buf.extend_from_slice(&checksum.to_le_bytes());
229 return;
230 }
231 }
232 }
233 buf.push(RecordType::PageWrite as u8);
235 buf.extend_from_slice(&term.to_le_bytes());
236 buf.extend_from_slice(&tx_id.to_le_bytes());
237 buf.extend_from_slice(&page_id.to_le_bytes());
238 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
239 buf.extend_from_slice(data);
240 }
241 WalRecord::TxCommitBatch { tx_id, actions } => {
242 buf.push(RecordType::TxCommitBatch as u8);
243 buf.extend_from_slice(&term.to_le_bytes());
244 buf.extend_from_slice(&tx_id.to_le_bytes());
245 buf.extend_from_slice(&(actions.len() as u32).to_le_bytes());
246 for action in actions {
247 buf.extend_from_slice(&(action.len() as u32).to_le_bytes());
248 buf.extend_from_slice(action);
249 }
250 }
251 WalRecord::FullPageImage {
252 tx_id,
253 page_id,
254 ckpt_epoch,
255 data,
256 } => {
257 buf.push(RecordType::FullPageImage as u8);
258 buf.extend_from_slice(&term.to_le_bytes());
259 buf.extend_from_slice(&tx_id.to_le_bytes());
260 buf.extend_from_slice(&page_id.to_le_bytes());
261 buf.extend_from_slice(&ckpt_epoch.to_le_bytes());
262 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
263 buf.extend_from_slice(data);
264 }
265 WalRecord::VectorInsert {
266 collection,
267 entity_id,
268 vector,
269 } => {
270 buf.push(RecordType::VectorInsert as u8);
271 buf.extend_from_slice(&term.to_le_bytes());
272 buf.extend_from_slice(&(collection.len() as u32).to_le_bytes());
273 buf.extend_from_slice(collection.as_bytes());
274 buf.extend_from_slice(&entity_id.to_le_bytes());
275 buf.extend_from_slice(&(vector.len() as u32).to_le_bytes());
276 for value in vector {
277 buf.extend_from_slice(&value.to_le_bytes());
278 }
279 }
280 WalRecord::Checkpoint { lsn } => {
281 buf.push(RecordType::Checkpoint as u8);
282 buf.extend_from_slice(&term.to_le_bytes());
283 buf.extend_from_slice(&lsn.to_le_bytes());
284 }
285 }
286
287 let checksum = crc32(&buf[start..]);
289 buf.extend_from_slice(&checksum.to_le_bytes());
290 }
291
292 pub fn read<R: Read>(reader: &mut R) -> io::Result<Option<WalRecord>> {
297 Ok(Self::read_with_term(reader)?.map(|(_, record)| record))
298 }
299
300 pub fn read_with_term<R: Read>(reader: &mut R) -> io::Result<Option<(u64, WalRecord)>> {
302 Self::read_with_format_version(reader, WAL_VERSION)
303 }
304
305 pub(crate) fn read_with_format_version<R: Read>(
306 reader: &mut R,
307 format_version: u8,
308 ) -> io::Result<Option<(u64, WalRecord)>> {
309 let mut type_buf = [0u8; 1];
311 match reader.read_exact(&mut type_buf) {
312 Ok(_) => (),
313 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
314 Err(e) => return Err(e),
315 };
316
317 let record_type = RecordType::from_u8(type_buf[0])
318 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid record type"))?;
319
320 let mut running_crc = crc32_update(0, &type_buf);
322 let term = match format_version {
323 WAL_VERSION => {
324 let mut term_buf = [0u8; 8];
325 reader.read_exact(&mut term_buf)?;
326 running_crc = crc32_update(running_crc, &term_buf);
327 u64::from_le_bytes(term_buf)
328 }
329 WAL_VERSION_V2 => WAL_DEFAULT_TERM,
330 _ => {
331 return Err(io::Error::new(
332 io::ErrorKind::InvalidData,
333 format!("Unsupported WAL version: {format_version}"),
334 ));
335 }
336 };
337
338 let record = match record_type {
339 RecordType::Begin | RecordType::Commit | RecordType::Rollback => {
340 let mut buf = [0u8; 8];
341 reader.read_exact(&mut buf)?;
342 running_crc = crc32_update(running_crc, &buf);
343 let tx_id = u64::from_le_bytes(buf);
344
345 match record_type {
346 RecordType::Begin => WalRecord::Begin { tx_id },
347 RecordType::Commit => WalRecord::Commit { tx_id },
348 RecordType::Rollback => WalRecord::Rollback { tx_id },
349 _ => unreachable!(),
350 }
351 }
352 RecordType::PageWrite => {
353 let mut tx_buf = [0u8; 8];
355 reader.read_exact(&mut tx_buf)?;
356 running_crc = crc32_update(running_crc, &tx_buf);
357 let tx_id = u64::from_le_bytes(tx_buf);
358
359 let mut page_buf = [0u8; 4];
361 reader.read_exact(&mut page_buf)?;
362 running_crc = crc32_update(running_crc, &page_buf);
363 let page_id = u32::from_le_bytes(page_buf);
364
365 let mut len_buf = [0u8; 4];
367 reader.read_exact(&mut len_buf)?;
368 running_crc = crc32_update(running_crc, &len_buf);
369 let len = u32::from_le_bytes(len_buf) as usize;
370
371 let mut data = vec![0u8; len];
373 reader.read_exact(&mut data)?;
374 running_crc = crc32_update(running_crc, &data);
375
376 WalRecord::PageWrite {
377 tx_id,
378 page_id,
379 data,
380 }
381 }
382 RecordType::PageWriteCompressed => {
383 let mut tx_buf = [0u8; 8];
385 reader.read_exact(&mut tx_buf)?;
386 running_crc = crc32_update(running_crc, &tx_buf);
387 let tx_id = u64::from_le_bytes(tx_buf);
388
389 let mut page_buf = [0u8; 4];
391 reader.read_exact(&mut page_buf)?;
392 running_crc = crc32_update(running_crc, &page_buf);
393 let page_id = u32::from_le_bytes(page_buf);
394
395 let mut comp_buf = [0u8; 1];
397 reader.read_exact(&mut comp_buf)?;
398 running_crc = crc32_update(running_crc, &comp_buf);
399 let compression = Compression::from_u8(comp_buf[0]).ok_or_else(|| {
400 io::Error::new(
401 io::ErrorKind::InvalidData,
402 format!("Unknown WAL compression algorithm: {}", comp_buf[0]),
403 )
404 })?;
405
406 let mut orig_len_buf = [0u8; 4];
408 reader.read_exact(&mut orig_len_buf)?;
409 running_crc = crc32_update(running_crc, &orig_len_buf);
410 let orig_len = u32::from_le_bytes(orig_len_buf) as usize;
411
412 let mut len_buf = [0u8; 4];
414 reader.read_exact(&mut len_buf)?;
415 running_crc = crc32_update(running_crc, &len_buf);
416 let len = u32::from_le_bytes(len_buf) as usize;
417
418 let mut compressed = vec![0u8; len];
420 reader.read_exact(&mut compressed)?;
421 running_crc = crc32_update(running_crc, &compressed);
422
423 let data = match compression {
425 Compression::Zstd => {
426 let mut out = vec![0u8; orig_len];
427 zstd::bulk::decompress_to_buffer(&compressed, &mut out).map_err(|e| {
428 io::Error::new(
429 io::ErrorKind::InvalidData,
430 format!("WAL zstd decompress failed: {e}"),
431 )
432 })?;
433 out
434 }
435 Compression::None => compressed,
436 };
437
438 WalRecord::PageWrite {
439 tx_id,
440 page_id,
441 data,
442 }
443 }
444 RecordType::TxCommitBatch => {
445 let mut tx_buf = [0u8; 8];
446 reader.read_exact(&mut tx_buf)?;
447 running_crc = crc32_update(running_crc, &tx_buf);
448 let tx_id = u64::from_le_bytes(tx_buf);
449
450 let mut count_buf = [0u8; 4];
451 reader.read_exact(&mut count_buf)?;
452 running_crc = crc32_update(running_crc, &count_buf);
453 let count = u32::from_le_bytes(count_buf) as usize;
454
455 let mut actions = Vec::with_capacity(count);
456 for _ in 0..count {
457 let mut len_buf = [0u8; 4];
458 reader.read_exact(&mut len_buf)?;
459 running_crc = crc32_update(running_crc, &len_buf);
460 let len = u32::from_le_bytes(len_buf) as usize;
461
462 let mut action = vec![0u8; len];
463 reader.read_exact(&mut action)?;
464 running_crc = crc32_update(running_crc, &action);
465 actions.push(action);
466 }
467
468 WalRecord::TxCommitBatch { tx_id, actions }
469 }
470 RecordType::VectorInsert => {
471 let mut len_buf = [0u8; 4];
472 reader.read_exact(&mut len_buf)?;
473 running_crc = crc32_update(running_crc, &len_buf);
474 let collection_len = u32::from_le_bytes(len_buf) as usize;
475
476 let mut collection_buf = vec![0u8; collection_len];
477 reader.read_exact(&mut collection_buf)?;
478 running_crc = crc32_update(running_crc, &collection_buf);
479 let collection = String::from_utf8(collection_buf).map_err(|err| {
480 io::Error::new(
481 io::ErrorKind::InvalidData,
482 format!("invalid collection utf8: {err}"),
483 )
484 })?;
485
486 let mut entity_buf = [0u8; 8];
487 reader.read_exact(&mut entity_buf)?;
488 running_crc = crc32_update(running_crc, &entity_buf);
489 let entity_id = u64::from_le_bytes(entity_buf);
490
491 let mut count_buf = [0u8; 4];
492 reader.read_exact(&mut count_buf)?;
493 running_crc = crc32_update(running_crc, &count_buf);
494 let count = u32::from_le_bytes(count_buf) as usize;
495
496 let mut vector = Vec::with_capacity(count);
497 for _ in 0..count {
498 let mut value_buf = [0u8; 4];
499 reader.read_exact(&mut value_buf)?;
500 running_crc = crc32_update(running_crc, &value_buf);
501 vector.push(f32::from_le_bytes(value_buf));
502 }
503
504 WalRecord::VectorInsert {
505 collection,
506 entity_id,
507 vector,
508 }
509 }
510 RecordType::FullPageImage => {
511 let mut tx_buf = [0u8; 8];
512 reader.read_exact(&mut tx_buf)?;
513 running_crc = crc32_update(running_crc, &tx_buf);
514 let tx_id = u64::from_le_bytes(tx_buf);
515
516 let mut page_buf = [0u8; 4];
517 reader.read_exact(&mut page_buf)?;
518 running_crc = crc32_update(running_crc, &page_buf);
519 let page_id = u32::from_le_bytes(page_buf);
520
521 let mut epoch_buf = [0u8; 8];
522 reader.read_exact(&mut epoch_buf)?;
523 running_crc = crc32_update(running_crc, &epoch_buf);
524 let ckpt_epoch = u64::from_le_bytes(epoch_buf);
525
526 let mut len_buf = [0u8; 4];
527 reader.read_exact(&mut len_buf)?;
528 running_crc = crc32_update(running_crc, &len_buf);
529 let len = u32::from_le_bytes(len_buf) as usize;
530
531 let mut data = vec![0u8; len];
532 reader.read_exact(&mut data)?;
533 running_crc = crc32_update(running_crc, &data);
534
535 WalRecord::FullPageImage {
536 tx_id,
537 page_id,
538 ckpt_epoch,
539 data,
540 }
541 }
542 RecordType::Checkpoint => {
543 let mut buf = [0u8; 8];
544 reader.read_exact(&mut buf)?;
545 running_crc = crc32_update(running_crc, &buf);
546 let lsn = u64::from_le_bytes(buf);
547 WalRecord::Checkpoint { lsn }
548 }
549 };
550
551 let mut crc_buf = [0u8; 4];
553 reader.read_exact(&mut crc_buf)?;
554 let stored_crc = u32::from_le_bytes(crc_buf);
555
556 if running_crc != stored_crc {
557 return Err(io::Error::new(
558 io::ErrorKind::InvalidData,
559 "WAL record checksum mismatch",
560 ));
561 }
562
563 Ok(Some((term, record)))
564 }
565}
566
567#[cfg(test)]
568mod tests {
569 use super::*;
570 use std::io::Cursor;
571
572 #[test]
575 fn test_record_type_from_u8() {
576 assert_eq!(RecordType::from_u8(1), Some(RecordType::Begin));
577 assert_eq!(RecordType::from_u8(2), Some(RecordType::Commit));
578 assert_eq!(RecordType::from_u8(3), Some(RecordType::Rollback));
579 assert_eq!(RecordType::from_u8(4), Some(RecordType::PageWrite));
580 assert_eq!(RecordType::from_u8(5), Some(RecordType::Checkpoint));
581 assert_eq!(
582 RecordType::from_u8(6),
583 Some(RecordType::PageWriteCompressed)
584 );
585 assert_eq!(RecordType::from_u8(7), Some(RecordType::TxCommitBatch));
586 assert_eq!(RecordType::from_u8(8), Some(RecordType::FullPageImage));
587 assert_eq!(RecordType::from_u8(9), Some(RecordType::VectorInsert));
588 }
589
590 #[test]
591 fn test_record_type_invalid() {
592 assert_eq!(RecordType::from_u8(0), None);
593 assert_eq!(RecordType::from_u8(10), None);
594 assert_eq!(RecordType::from_u8(255), None);
595 }
596
597 #[test]
600 fn test_encode_begin() {
601 let record = WalRecord::Begin { tx_id: 12345 };
602 let encoded = record.encode();
603
604 assert_eq!(encoded.len(), 21);
606 assert_eq!(encoded[0], RecordType::Begin as u8);
607 }
608
609 #[test]
610 fn test_encode_commit() {
611 let record = WalRecord::Commit { tx_id: 99999 };
612 let encoded = record.encode();
613
614 assert_eq!(encoded.len(), 21);
615 assert_eq!(encoded[0], RecordType::Commit as u8);
616 }
617
618 #[test]
619 fn test_encode_rollback() {
620 let record = WalRecord::Rollback { tx_id: 54321 };
621 let encoded = record.encode();
622
623 assert_eq!(encoded.len(), 21);
624 assert_eq!(encoded[0], RecordType::Rollback as u8);
625 }
626
627 #[test]
628 fn test_encode_checkpoint() {
629 let record = WalRecord::Checkpoint { lsn: 1000000 };
630 let encoded = record.encode();
631
632 assert_eq!(encoded.len(), 21);
633 assert_eq!(encoded[0], RecordType::Checkpoint as u8);
634 }
635
636 #[test]
637 fn test_encode_page_write_small() {
638 let data = vec![1, 2, 3, 4, 5];
640 let record = WalRecord::PageWrite {
641 tx_id: 100,
642 page_id: 42,
643 data: data.clone(),
644 };
645 let encoded = record.encode();
646
647 assert_eq!(encoded.len(), 34);
649 assert_eq!(encoded[0], RecordType::PageWrite as u8);
650 }
651
652 #[test]
653 fn test_encode_page_write_empty_data() {
654 let record = WalRecord::PageWrite {
655 tx_id: 1,
656 page_id: 0,
657 data: vec![],
658 };
659 let encoded = record.encode();
660
661 assert_eq!(encoded.len(), 29);
663 }
664
665 #[test]
666 fn test_encode_tx_commit_batch() {
667 let record = WalRecord::TxCommitBatch {
668 tx_id: 7,
669 actions: vec![b"insert".to_vec(), b"update".to_vec()],
670 };
671 let encoded = record.encode();
672
673 assert_eq!(encoded[0], RecordType::TxCommitBatch as u8);
674 }
675
676 #[test]
679 fn test_read_begin_roundtrip() {
680 let original = WalRecord::Begin { tx_id: 42 };
681 let encoded = original.encode();
682
683 let mut cursor = Cursor::new(encoded);
684 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
685
686 assert_eq!(decoded, original);
687 }
688
689 #[test]
690 fn test_read_begin_roundtrip_preserves_term() {
691 let original = WalRecord::Begin { tx_id: 42 };
692 let encoded = original.encode_with_term(9);
693
694 let mut cursor = Cursor::new(encoded);
695 let (term, decoded) = WalRecord::read_with_term(&mut cursor).unwrap().unwrap();
696
697 assert_eq!(term, 9);
698 assert_eq!(decoded, original);
699 }
700
701 #[test]
702 fn test_read_v2_begin_defaults_term() {
703 let tx_id = 42u64;
704 let mut encoded = Vec::new();
705 encoded.push(RecordType::Begin as u8);
706 encoded.extend_from_slice(&tx_id.to_le_bytes());
707 let checksum = crc32(&encoded);
708 encoded.extend_from_slice(&checksum.to_le_bytes());
709
710 let mut cursor = Cursor::new(encoded);
711 let (term, decoded) = WalRecord::read_with_format_version(&mut cursor, WAL_VERSION_V2)
712 .unwrap()
713 .unwrap();
714
715 assert_eq!(term, WAL_DEFAULT_TERM);
716 assert_eq!(decoded, WalRecord::Begin { tx_id });
717 }
718
719 #[test]
720 fn test_read_commit_roundtrip() {
721 let original = WalRecord::Commit { tx_id: 999 };
722 let encoded = original.encode();
723
724 let mut cursor = Cursor::new(encoded);
725 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
726
727 assert_eq!(decoded, original);
728 }
729
730 #[test]
731 fn test_read_rollback_roundtrip() {
732 let original = WalRecord::Rollback { tx_id: 777 };
733 let encoded = original.encode();
734
735 let mut cursor = Cursor::new(encoded);
736 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
737
738 assert_eq!(decoded, original);
739 }
740
741 #[test]
742 fn test_read_checkpoint_roundtrip() {
743 let original = WalRecord::Checkpoint { lsn: 123456789 };
744 let encoded = original.encode();
745
746 let mut cursor = Cursor::new(encoded);
747 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
748
749 assert_eq!(decoded, original);
750 }
751
752 #[test]
753 fn test_read_page_write_roundtrip() {
754 let original = WalRecord::PageWrite {
755 tx_id: 50,
756 page_id: 100,
757 data: vec![10, 20, 30, 40, 50, 60, 70, 80],
758 };
759 let encoded = original.encode();
760
761 let mut cursor = Cursor::new(encoded);
762 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
763
764 assert_eq!(decoded, original);
765 }
766
767 #[test]
768 fn test_read_tx_commit_batch_roundtrip() {
769 let original = WalRecord::TxCommitBatch {
770 tx_id: 42,
771 actions: vec![b"old-version".to_vec(), b"new-version".to_vec()],
772 };
773 let encoded = original.encode();
774
775 let mut cursor = Cursor::new(encoded);
776 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
777
778 assert_eq!(decoded, original);
779 }
780
781 #[test]
782 fn test_vector_insert_roundtrip() {
783 let original = WalRecord::VectorInsert {
784 collection: "turbo".to_string(),
785 entity_id: 42,
786 vector: vec![1.0, -0.5, 0.25],
787 };
788 let encoded = original.encode();
789
790 let mut cursor = Cursor::new(encoded);
791 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
792
793 assert_eq!(decoded, original);
794 }
795
796 #[test]
797 fn test_read_page_write_large_data() {
798 let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
800 let original = WalRecord::PageWrite {
801 tx_id: 1,
802 page_id: 0,
803 data,
804 };
805 let encoded = original.encode();
806
807 let mut cursor = Cursor::new(encoded);
808 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
809
810 assert_eq!(decoded, original);
812 }
813
814 #[test]
815 fn page_write_compressed_roundtrip() {
816 let data = vec![0xABu8; 1024];
818 let record = WalRecord::PageWrite {
819 tx_id: 7,
820 page_id: 3,
821 data: data.clone(),
822 };
823 let encoded = record.encode();
824
825 assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);
827
828 let mut cursor = Cursor::new(encoded);
830 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
831 assert_eq!(
832 decoded,
833 WalRecord::PageWrite {
834 tx_id: 7,
835 page_id: 3,
836 data
837 }
838 );
839 }
840
841 #[test]
842 fn full_page_image_roundtrip() {
843 let data: Vec<u8> = (0..4096).map(|i| (i % 251) as u8).collect();
844 let original = WalRecord::FullPageImage {
845 tx_id: 11,
846 page_id: 9,
847 ckpt_epoch: 42,
848 data: data.clone(),
849 };
850 let encoded = original.encode();
851 assert_eq!(encoded[0], RecordType::FullPageImage as u8);
852
853 let mut cursor = Cursor::new(encoded);
854 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
855 assert_eq!(decoded, original);
856 }
857
858 #[test]
859 fn full_page_image_checksum_mismatch_detected() {
860 let original = WalRecord::FullPageImage {
861 tx_id: 1,
862 page_id: 2,
863 ckpt_epoch: 3,
864 data: vec![0xAA; 32],
865 };
866 let mut encoded = original.encode();
867 let mid = encoded.len() / 2;
868 encoded[mid] ^= 0xFF;
869 let mut cursor = Cursor::new(encoded);
870 assert!(WalRecord::read(&mut cursor).is_err());
871 }
872
873 #[test]
874 fn test_read_eof() {
875 let mut cursor = Cursor::new(Vec::<u8>::new());
876 let result = WalRecord::read(&mut cursor).unwrap();
877 assert!(result.is_none());
878 }
879
880 #[test]
881 fn test_read_invalid_record_type() {
882 let buf = vec![99, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; let mut cursor = Cursor::new(buf);
884 let result = WalRecord::read(&mut cursor);
885 assert!(result.is_err());
886 }
887
888 #[test]
889 fn test_read_checksum_mismatch() {
890 let record = WalRecord::Begin { tx_id: 42 };
891 let mut encoded = record.encode();
892
893 let len = encoded.len();
895 encoded[len - 1] ^= 0xFF;
896
897 let mut cursor = Cursor::new(encoded);
898 let result = WalRecord::read(&mut cursor);
899 assert!(result.is_err());
900 }
901
902 #[test]
903 fn test_read_data_corruption() {
904 let record = WalRecord::PageWrite {
905 tx_id: 1,
906 page_id: 2,
907 data: vec![1, 2, 3, 4],
908 };
909 let mut encoded = record.encode();
910
911 encoded[15] ^= 0xFF;
913
914 let mut cursor = Cursor::new(encoded);
915 let result = WalRecord::read(&mut cursor);
916 assert!(result.is_err()); }
918
919 #[test]
922 fn test_multiple_records_sequential() {
923 let records = vec![
924 WalRecord::Begin { tx_id: 1 },
925 WalRecord::PageWrite {
926 tx_id: 1,
927 page_id: 10,
928 data: vec![1, 2, 3],
929 },
930 WalRecord::PageWrite {
931 tx_id: 1,
932 page_id: 20,
933 data: vec![4, 5, 6],
934 },
935 WalRecord::Commit { tx_id: 1 },
936 WalRecord::Checkpoint { lsn: 100 },
937 ];
938
939 let mut buf = Vec::new();
941 for r in &records {
942 buf.extend_from_slice(&r.encode());
943 }
944
945 let mut cursor = Cursor::new(buf);
947 for expected in &records {
948 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
949 assert_eq!(&decoded, expected);
950 }
951
952 assert!(WalRecord::read(&mut cursor).unwrap().is_none());
954 }
955
956 #[test]
963 fn test_encode_into_matches_encode_for_all_variants() {
964 let records = vec![
965 WalRecord::Begin { tx_id: 12345 },
966 WalRecord::Commit { tx_id: 99999 },
967 WalRecord::Rollback { tx_id: 54321 },
968 WalRecord::Checkpoint { lsn: 1_000_000 },
969 WalRecord::PageWrite {
970 tx_id: 100,
971 page_id: 42,
972 data: vec![1, 2, 3, 4, 5],
973 },
974 WalRecord::PageWrite {
977 tx_id: 7,
978 page_id: 3,
979 data: vec![0xABu8; 1024],
980 },
981 WalRecord::TxCommitBatch {
982 tx_id: 7,
983 actions: vec![b"insert".to_vec(), b"update".to_vec()],
984 },
985 WalRecord::FullPageImage {
986 tx_id: 11,
987 page_id: 9,
988 ckpt_epoch: 42,
989 data: (0..4096).map(|i| (i % 251) as u8).collect(),
990 },
991 WalRecord::VectorInsert {
992 collection: "turbo".to_string(),
993 entity_id: 42,
994 vector: vec![1.0, -0.5, 0.25],
995 },
996 ];
997
998 for record in &records {
999 let baseline = record.encode();
1000 let mut scratch = Vec::new();
1001 record.encode_into(&mut scratch);
1002 assert_eq!(scratch, baseline, "encode_into mismatch for {record:?}");
1003 }
1004 }
1005
1006 #[test]
1010 fn test_encode_into_reuses_scratch_across_records() {
1011 let records = vec![
1012 WalRecord::Begin { tx_id: 1 },
1013 WalRecord::PageWrite {
1014 tx_id: 1,
1015 page_id: 10,
1016 data: vec![1, 2, 3],
1017 },
1018 WalRecord::Commit { tx_id: 1 },
1019 ];
1020
1021 let mut expected = Vec::new();
1022 for r in &records {
1023 expected.extend_from_slice(&r.encode());
1024 }
1025
1026 let mut scratch = Vec::new();
1028 for r in &records {
1029 r.encode_into(&mut scratch);
1030 }
1031
1032 assert_eq!(scratch, expected);
1033
1034 let mut cursor = Cursor::new(scratch);
1036 for expected in &records {
1037 let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
1038 assert_eq!(&decoded, expected);
1039 }
1040 assert!(WalRecord::read(&mut cursor).unwrap().is_none());
1041 }
1042
1043 #[test]
1046 fn test_encode_with_term_into_matches_and_preserves_prefix() {
1047 let prefix = b"PREFIX-BYTES".to_vec();
1048 let record = WalRecord::Begin { tx_id: 42 };
1049
1050 let mut scratch = prefix.clone();
1051 record.encode_with_term_into(&mut scratch, 9);
1052
1053 assert_eq!(&scratch[..prefix.len()], &prefix[..]);
1055 assert_eq!(&scratch[prefix.len()..], &record.encode_with_term(9)[..]);
1056 }
1057
1058 #[test]
1059 fn test_wal_magic() {
1060 assert_eq!(WAL_MAGIC, b"RDBW");
1061 }
1062
1063 #[test]
1064 fn test_wal_version() {
1065 assert_eq!(WAL_VERSION, 3);
1066 }
1067}