1use std::mem::size_of;
62
/// Magic number at the start of every [`ZeroCopyHeader`]: the ASCII bytes `"ZCOP"`
/// read as a big-endian `u32` (`0x5A43_4F50`). `ZeroCopyHeader::write_to` stores it
/// little-endian like every other header field.
pub const ZERO_COPY_MAGIC: u32 = u32::from_be_bytes(*b"ZCOP");

/// Newest format version: `ZeroCopyHeader::new` writes it, and
/// `ZeroCopyHeader::validate` accepts any version up to and including it.
pub const FORMAT_VERSION: u16 = 1;

/// Size in bytes of a serialized [`ZeroCopyHeader`]:
/// magic (4) + version (2) + flags (2) + total_length (4) + crc (4).
pub const HEADER_SIZE: usize = 4 + 2 + 2 + 4 + 4;
71
72#[repr(C, packed)]
78#[derive(Debug, Clone, Copy)]
79pub struct ZeroCopyHeader {
80 pub magic: u32,
82 pub version: u16,
84 pub flags: u16,
86 pub total_length: u32,
88 pub crc: u32,
90}
91
92impl ZeroCopyHeader {
93 pub fn new(data_length: usize, flags: u16, crc: u32) -> Self {
95 Self {
96 magic: ZERO_COPY_MAGIC,
97 version: FORMAT_VERSION,
98 flags,
99 total_length: (HEADER_SIZE + data_length) as u32,
100 crc,
101 }
102 }
103
104 #[inline]
106 pub fn validate(&self) -> bool {
107 self.magic == ZERO_COPY_MAGIC && self.version <= FORMAT_VERSION
108 }
109
110 pub fn write_to(&self, buf: &mut [u8]) {
112 assert!(buf.len() >= HEADER_SIZE);
113 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
114 buf[4..6].copy_from_slice(&self.version.to_le_bytes());
115 buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
116 buf[8..12].copy_from_slice(&self.total_length.to_le_bytes());
117 buf[12..16].copy_from_slice(&self.crc.to_le_bytes());
118 }
119
120 pub fn read_from(buf: &[u8]) -> Option<Self> {
122 if buf.len() < HEADER_SIZE {
123 return None;
124 }
125 Some(Self {
126 magic: u32::from_le_bytes(buf[0..4].try_into().ok()?),
127 version: u16::from_le_bytes(buf[4..6].try_into().ok()?),
128 flags: u16::from_le_bytes(buf[6..8].try_into().ok()?),
129 total_length: u32::from_le_bytes(buf[8..12].try_into().ok()?),
130 crc: u32::from_le_bytes(buf[12..16].try_into().ok()?),
131 })
132 }
133}
134
/// Kind of record a WAL entry holds. It is stored in `WalEntryHeader::entry_type`
/// as its `u8` discriminant (1 through 7).
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    Insert = 1,
    Update = 2,
    Delete = 3,
    BeginTxn = 4,
    CommitTxn = 5,
    AbortTxn = 6,
    Checkpoint = 7,
}

impl WalEntryType {
    /// Every variant, positioned so that `BY_DISCRIMINANT[d - 1]` has discriminant `d`.
    const BY_DISCRIMINANT: [WalEntryType; 7] = [
        WalEntryType::Insert,
        WalEntryType::Update,
        WalEntryType::Delete,
        WalEntryType::BeginTxn,
        WalEntryType::CommitTxn,
        WalEntryType::AbortTxn,
        WalEntryType::Checkpoint,
    ];

    /// Decodes a wire discriminant. Returns `None` for any value outside `1..=7`.
    pub fn from_u8(v: u8) -> Option<Self> {
        // 0 underflows to `None`; anything above 7 falls off the end of the table.
        let index = usize::from(v).checked_sub(1)?;
        Self::BY_DISCRIMINANT.get(index).copied()
    }
}
173
174#[repr(C)]
176#[derive(Debug, Clone, Copy)]
177pub struct WalEntryHeader {
178 pub txn_id: u64,
180 pub lsn: u64,
182 pub timestamp: u64,
184 pub entry_type: u8,
186 pub field_count: u8,
188 pub _reserved: [u8; 6],
190}
191
192pub const WAL_ENTRY_HEADER_SIZE: usize = size_of::<WalEntryHeader>();
193
194impl WalEntryHeader {
195 pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType, field_count: u8) -> Self {
196 Self {
197 txn_id,
198 lsn,
199 timestamp: std::time::SystemTime::now()
200 .duration_since(std::time::UNIX_EPOCH)
201 .map(|d| d.as_nanos() as u64)
202 .unwrap_or(0),
203 entry_type: entry_type as u8,
204 field_count,
205 _reserved: [0; 6],
206 }
207 }
208
209 pub fn write_to(&self, buf: &mut [u8]) {
211 assert!(buf.len() >= WAL_ENTRY_HEADER_SIZE);
212 buf[0..8].copy_from_slice(&self.txn_id.to_le_bytes());
213 buf[8..16].copy_from_slice(&self.lsn.to_le_bytes());
214 buf[16..24].copy_from_slice(&self.timestamp.to_le_bytes());
215 buf[24] = self.entry_type;
216 buf[25] = self.field_count;
217 buf[26..32].copy_from_slice(&self._reserved);
218 }
219
220 #[inline]
222 pub fn read_from(buf: &[u8]) -> Option<&Self> {
223 if buf.len() < WAL_ENTRY_HEADER_SIZE {
224 return None;
225 }
226 unsafe {
229 let ptr = buf.as_ptr() as *const Self;
230 if ptr as usize % std::mem::align_of::<Self>() != 0 {
232 return None;
233 }
234 Some(&*ptr)
235 }
236 }
237
238 pub fn read_from_copy(buf: &[u8]) -> Option<Self> {
240 if buf.len() < WAL_ENTRY_HEADER_SIZE {
241 return None;
242 }
243 Some(Self {
244 txn_id: u64::from_le_bytes(buf[0..8].try_into().ok()?),
245 lsn: u64::from_le_bytes(buf[8..16].try_into().ok()?),
246 timestamp: u64::from_le_bytes(buf[16..24].try_into().ok()?),
247 entry_type: buf[24],
248 field_count: buf[25],
249 _reserved: buf[26..32].try_into().ok()?,
250 })
251 }
252}
253
/// Locates one field inside a WAL entry's data section.
///
/// `WalEntryBuilder::build` serializes it as two little-endian `u32`s (`offset`, then
/// `length`). `offset` is relative to the start of the data section, i.e. the first
/// byte after the last descriptor.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct FieldDescriptor {
    /// Byte offset of the field from the start of the data section.
    pub offset: u32,
    /// Field length in bytes.
    pub length: u32,
}

/// Serialized size of one [`FieldDescriptor`]: two `u32`s, no padding.
pub const FIELD_DESCRIPTOR_SIZE: usize = size_of::<FieldDescriptor>();
269
270pub struct WalEntryBuilder {
272 header: WalEntryHeader,
274 fields: Vec<FieldDescriptor>,
276 data: Vec<u8>,
278}
279
280impl WalEntryBuilder {
281 pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType) -> Self {
283 Self {
284 header: WalEntryHeader::new(txn_id, lsn, entry_type, 0),
285 fields: Vec::new(),
286 data: Vec::new(),
287 }
288 }
289
290 pub fn add_field(&mut self, data: &[u8]) -> &mut Self {
292 let offset = self.data.len() as u32;
293 let length = data.len() as u32;
294 self.fields.push(FieldDescriptor { offset, length });
295 self.data.extend_from_slice(data);
296 self.header.field_count = self.fields.len() as u8;
297 self
298 }
299
300 pub fn with_key(&mut self, key: &[u8]) -> &mut Self {
302 self.add_field(key)
303 }
304
305 pub fn with_value(&mut self, value: &[u8]) -> &mut Self {
307 self.add_field(value)
308 }
309
310 pub fn total_size(&self) -> usize {
312 HEADER_SIZE +
313 WAL_ENTRY_HEADER_SIZE +
314 self.fields.len() * FIELD_DESCRIPTOR_SIZE +
315 self.data.len()
316 }
317
318 pub fn build(&self) -> Vec<u8> {
320 let data_len = WAL_ENTRY_HEADER_SIZE +
321 self.fields.len() * FIELD_DESCRIPTOR_SIZE +
322 self.data.len();
323
324 let mut buf = vec![0u8; HEADER_SIZE + data_len];
325
326 let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
328
329 let header = ZeroCopyHeader::new(data_len, 0, crc);
331 header.write_to(&mut buf[0..HEADER_SIZE]);
332
333 let offset = HEADER_SIZE;
335 self.header.write_to(&mut buf[offset..offset + WAL_ENTRY_HEADER_SIZE]);
336
337 let mut offset = HEADER_SIZE + WAL_ENTRY_HEADER_SIZE;
339 for field in &self.fields {
340 buf[offset..offset + 4].copy_from_slice(&field.offset.to_le_bytes());
341 buf[offset + 4..offset + 8].copy_from_slice(&field.length.to_le_bytes());
342 offset += FIELD_DESCRIPTOR_SIZE;
343 }
344
345 buf[offset..].copy_from_slice(&self.data);
347
348 let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
350 buf[12..16].copy_from_slice(&crc.to_le_bytes());
351
352 buf
353 }
354}
355
356pub struct WalEntryReader<'a> {
365 data: &'a [u8],
367 header: &'a WalEntryHeader,
369 field_count: usize,
371 fields_offset: usize,
373 data_offset: usize,
375}
376
377impl<'a> WalEntryReader<'a> {
378 pub fn from_bytes(bytes: &'a [u8]) -> Option<Self> {
380 let outer_header = ZeroCopyHeader::read_from(bytes)?;
382 if !outer_header.validate() {
383 return None;
384 }
385
386 let expected_crc = outer_header.crc;
388 let actual_crc = crc32fast::hash(&bytes[HEADER_SIZE..]);
389 if expected_crc != actual_crc {
390 return None;
391 }
392
393 let entry_data = &bytes[HEADER_SIZE..];
395 let header = WalEntryHeader::read_from(entry_data)?;
396
397 let field_count = header.field_count as usize;
398 let fields_offset = WAL_ENTRY_HEADER_SIZE;
399 let data_offset = fields_offset + field_count * FIELD_DESCRIPTOR_SIZE;
400
401 Some(Self {
402 data: entry_data,
403 header,
404 field_count,
405 fields_offset,
406 data_offset,
407 })
408 }
409
410 #[inline]
412 pub fn txn_id(&self) -> u64 {
413 self.header.txn_id
414 }
415
416 #[inline]
418 pub fn lsn(&self) -> u64 {
419 self.header.lsn
420 }
421
422 #[inline]
424 pub fn timestamp(&self) -> u64 {
425 self.header.timestamp
426 }
427
428 #[inline]
430 pub fn entry_type(&self) -> Option<WalEntryType> {
431 WalEntryType::from_u8(self.header.entry_type)
432 }
433
434 #[inline]
436 pub fn field_count(&self) -> usize {
437 self.field_count
438 }
439
440 #[inline]
442 pub fn get_field(&self, index: usize) -> Option<&'a [u8]> {
443 if index >= self.field_count {
444 return None;
445 }
446
447 let desc_offset = self.fields_offset + index * FIELD_DESCRIPTOR_SIZE;
448 let desc_bytes = self.data.get(desc_offset..desc_offset + FIELD_DESCRIPTOR_SIZE)?;
449
450 let offset = u32::from_le_bytes(desc_bytes[0..4].try_into().ok()?) as usize;
451 let length = u32::from_le_bytes(desc_bytes[4..8].try_into().ok()?) as usize;
452
453 let start = self.data_offset + offset;
454 self.data.get(start..start + length)
455 }
456
457 #[inline]
459 pub fn key(&self) -> Option<&'a [u8]> {
460 self.get_field(0)
461 }
462
463 #[inline]
465 pub fn value(&self) -> Option<&'a [u8]> {
466 self.get_field(1)
467 }
468
469 pub fn fields(&self) -> impl Iterator<Item = &'a [u8]> + '_ {
471 (0..self.field_count).filter_map(|i| self.get_field(i))
472 }
473}
474
475pub struct WalBatchWriter {
484 entries: Vec<Vec<u8>>,
486 total_size: usize,
488}
489
490impl WalBatchWriter {
491 pub fn new() -> Self {
492 Self {
493 entries: Vec::new(),
494 total_size: 0,
495 }
496 }
497
498 pub fn with_capacity(capacity: usize) -> Self {
499 Self {
500 entries: Vec::with_capacity(capacity),
501 total_size: 0,
502 }
503 }
504
505 pub fn add(&mut self, entry: WalEntryBuilder) {
507 let bytes = entry.build();
508 self.total_size += bytes.len();
509 self.entries.push(bytes);
510 }
511
512 pub fn len(&self) -> usize {
514 self.entries.len()
515 }
516
517 pub fn is_empty(&self) -> bool {
519 self.entries.is_empty()
520 }
521
522 pub fn total_size(&self) -> usize {
524 self.total_size
525 }
526
527 pub fn build(&self) -> Vec<u8> {
529 let mut buf = Vec::with_capacity(self.total_size + 8);
530
531 buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
533 buf.extend_from_slice(&(self.total_size as u32).to_le_bytes());
535
536 for entry in &self.entries {
538 buf.extend_from_slice(entry);
539 }
540
541 buf
542 }
543
544 pub fn clear(&mut self) {
546 self.entries.clear();
547 self.total_size = 0;
548 }
549}
550
551impl Default for WalBatchWriter {
552 fn default() -> Self {
553 Self::new()
554 }
555}
556
557pub struct WalBatchReader<'a> {
563 data: &'a [u8],
564 entry_count: usize,
565 #[allow(dead_code)]
566 current_offset: usize,
567}
568
569impl<'a> WalBatchReader<'a> {
570 pub fn from_bytes(data: &'a [u8]) -> Option<Self> {
571 if data.len() < 8 {
572 return None;
573 }
574
575 let entry_count = u32::from_le_bytes(data[0..4].try_into().ok()?) as usize;
576 let _total_size = u32::from_le_bytes(data[4..8].try_into().ok()?) as usize;
577
578 Some(Self {
579 data,
580 entry_count,
581 current_offset: 8,
582 })
583 }
584
585 pub fn entry_count(&self) -> usize {
587 self.entry_count
588 }
589
590 pub fn entries(&self) -> WalBatchIter<'a> {
592 WalBatchIter {
593 data: self.data,
594 offset: 8,
595 remaining: self.entry_count,
596 }
597 }
598}
599
600pub struct WalBatchIter<'a> {
602 data: &'a [u8],
603 offset: usize,
604 remaining: usize,
605}
606
607impl<'a> Iterator for WalBatchIter<'a> {
608 type Item = WalEntryReader<'a>;
609
610 fn next(&mut self) -> Option<Self::Item> {
611 if self.remaining == 0 {
612 return None;
613 }
614
615 let entry_data = &self.data[self.offset..];
616 let header = ZeroCopyHeader::read_from(entry_data)?;
617
618 let entry_len = header.total_length as usize;
619 let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
620
621 self.offset += entry_len;
622 self.remaining -= 1;
623
624 Some(entry)
625 }
626
627 fn size_hint(&self) -> (usize, Option<usize>) {
628 (self.remaining, Some(self.remaining))
629 }
630}
631
632impl<'a> ExactSizeIterator for WalBatchIter<'a> {}
633
634pub struct MmapWalReader {
640 mmap: memmap2::Mmap,
642 size: usize,
644}
645
646impl MmapWalReader {
647 pub fn open(path: &std::path::Path) -> std::io::Result<Self> {
649 let file = std::fs::File::open(path)?;
650 let metadata = file.metadata()?;
651 let size = metadata.len() as usize;
652
653 let mmap = unsafe { memmap2::Mmap::map(&file)? };
655
656 Ok(Self { mmap, size })
657 }
658
659 pub fn as_bytes(&self) -> &[u8] {
661 &self.mmap
662 }
663
664 pub fn size(&self) -> usize {
666 self.size
667 }
668
669 pub fn read_entry_at(&self, offset: usize) -> Option<WalEntryReader<'_>> {
671 if offset >= self.size {
672 return None;
673 }
674 WalEntryReader::from_bytes(&self.mmap[offset..])
675 }
676
677 pub fn entries(&self) -> MmapWalIter<'_> {
679 MmapWalIter {
680 data: &self.mmap,
681 offset: 0,
682 size: self.size,
683 }
684 }
685}
686
687pub struct MmapWalIter<'a> {
689 data: &'a [u8],
690 offset: usize,
691 size: usize,
692}
693
694impl<'a> Iterator for MmapWalIter<'a> {
695 type Item = WalEntryReader<'a>;
696
697 fn next(&mut self) -> Option<Self::Item> {
698 if self.offset >= self.size {
699 return None;
700 }
701
702 let entry_data = &self.data[self.offset..];
703 if entry_data.len() < HEADER_SIZE {
704 return None;
705 }
706
707 let header = ZeroCopyHeader::read_from(entry_data)?;
708 if !header.validate() {
709 return None;
710 }
711
712 let entry_len = header.total_length as usize;
713 if self.offset + entry_len > self.size {
714 return None;
715 }
716
717 let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
718 self.offset += entry_len;
719
720 Some(entry)
721 }
722}
723
/// Running counters for WAL serialization activity. Callers update them through the
/// `record_*` methods.
#[derive(Debug, Default)]
pub struct SerdeStats {
    /// Number of [`SerdeStats::record_write`] calls.
    pub entries_written: u64,
    /// Sum of the byte counts passed to [`SerdeStats::record_write`].
    pub bytes_written: u64,
    /// Number of [`SerdeStats::record_read`] calls.
    pub entries_read: u64,
    /// Sum of the byte counts passed to [`SerdeStats::record_read`].
    pub bytes_read: u64,
    /// Number of [`SerdeStats::record_crc_failure`] calls.
    pub crc_failures: u64,
}

impl SerdeStats {
    /// Returns all-zero counters (identical to `SerdeStats::default()`).
    pub fn new() -> Self {
        Self {
            entries_written: 0,
            bytes_written: 0,
            entries_read: 0,
            bytes_read: 0,
            crc_failures: 0,
        }
    }

    /// Counts one written entry of `bytes` bytes.
    pub fn record_write(&mut self, bytes: usize) {
        let Self {
            entries_written,
            bytes_written,
            ..
        } = self;
        *entries_written += 1;
        *bytes_written += bytes as u64;
    }

    /// Counts one read entry of `bytes` bytes.
    pub fn record_read(&mut self, bytes: usize) {
        let Self {
            entries_read,
            bytes_read,
            ..
        } = self;
        *entries_read += 1;
        *bytes_read += bytes as u64;
    }

    /// Counts one CRC mismatch.
    pub fn record_crc_failure(&mut self) {
        let Self { crc_failures, .. } = self;
        *crc_failures += 1;
    }
}
762
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_wal_entry_roundtrip() {
        let mut builder = WalEntryBuilder::new(42, 100, WalEntryType::Insert);
        builder.with_key(b"test_key").with_value(b"test_value");

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.txn_id(), 42);
        assert_eq!(reader.lsn(), 100);
        assert_eq!(reader.entry_type(), Some(WalEntryType::Insert));
        assert_eq!(reader.field_count(), 2);
        assert_eq!(reader.key(), Some(b"test_key".as_slice()));
        assert_eq!(reader.value(), Some(b"test_value".as_slice()));
    }

    #[test]
    fn test_wal_entry_zero_copy_header() {
        let header = WalEntryHeader::new(123, 456, WalEntryType::Update, 3);
        let mut buf = vec![0u8; WAL_ENTRY_HEADER_SIZE];
        header.write_to(&mut buf);

        // The in-place view is allowed to return `None` (misaligned buffer or big-endian
        // target), so only check it when available.
        if let Some(read_header) = WalEntryHeader::read_from(&buf) {
            assert_eq!(read_header.txn_id, 123);
            assert_eq!(read_header.lsn, 456);
            assert_eq!(read_header.entry_type, WalEntryType::Update as u8);
            assert_eq!(read_header.field_count, 3);
        }

        let read_header = WalEntryHeader::read_from_copy(&buf).unwrap();
        assert_eq!(read_header.txn_id, 123);
        assert_eq!(read_header.lsn, 456);
    }

    #[test]
    fn test_wal_entry_crc_validation() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"key");

        let mut bytes = builder.build();

        assert!(WalEntryReader::from_bytes(&bytes).is_some());

        // Flip a byte inside the CRC-covered payload (past the 16-byte outer header).
        if bytes.len() > 20 {
            bytes[20] ^= 0xFF;
        }

        assert!(WalEntryReader::from_bytes(&bytes).is_none());
    }

    #[test]
    fn test_batch_writer_reader() {
        let mut batch = WalBatchWriter::new();

        for i in 0..10 {
            let mut entry = WalEntryBuilder::new(i, i * 10, WalEntryType::Insert);
            entry.with_key(format!("key_{}", i).as_bytes());
            entry.with_value(format!("value_{}", i).as_bytes());
            batch.add(entry);
        }

        assert_eq!(batch.len(), 10);

        let bytes = batch.build();
        let reader = WalBatchReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.entry_count(), 10);

        for (i, entry) in reader.entries().enumerate() {
            assert_eq!(entry.txn_id(), i as u64);
            assert_eq!(entry.key(), Some(format!("key_{}", i).as_bytes()));
        }
    }

    #[test]
    fn test_multiple_fields() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Update);
        builder.add_field(b"field_0");
        builder.add_field(b"field_1");
        builder.add_field(b"field_2");
        builder.add_field(b"field_3");

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.field_count(), 4);

        let fields: Vec<_> = reader.fields().collect();
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0], b"field_0");
        assert_eq!(fields[1], b"field_1");
        assert_eq!(fields[2], b"field_2");
        assert_eq!(fields[3], b"field_3");
    }

    #[test]
    fn test_empty_fields() {
        let builder = WalEntryBuilder::new(1, 1, WalEntryType::BeginTxn);

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.field_count(), 0);
        assert_eq!(reader.entry_type(), Some(WalEntryType::BeginTxn));
    }

    #[test]
    fn test_large_value() {
        let large_value = vec![0xAB; 1024 * 1024];
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"large_key").with_value(&large_value);

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.value(), Some(large_value.as_slice()));
    }

    #[test]
    fn test_header_validation() {
        let header = ZeroCopyHeader::new(100, 0, 12345);
        assert!(header.validate());

        let mut bad_header = header;
        bad_header.magic = 0xDEADBEEF;
        assert!(!bad_header.validate());
    }

    #[test]
    fn test_zero_copy_header_byte_roundtrip() {
        let header = ZeroCopyHeader::new(100, 0x0102, 0xDEAD_BEEF);
        let mut buf = [0u8; HEADER_SIZE];
        header.write_to(&mut buf);

        // The magic is stored little-endian.
        assert_eq!(&buf[0..4], &ZERO_COPY_MAGIC.to_le_bytes());

        let read = ZeroCopyHeader::read_from(&buf).unwrap();
        // Copy packed fields out first; `assert_eq!` would take (unaligned) references.
        let (magic, version, flags, total_length, crc) =
            (read.magic, read.version, read.flags, read.total_length, read.crc);
        assert_eq!(magic, ZERO_COPY_MAGIC);
        assert_eq!(version, FORMAT_VERSION);
        assert_eq!(flags, 0x0102);
        assert_eq!(total_length, (HEADER_SIZE + 100) as u32);
        assert_eq!(crc, 0xDEAD_BEEF);

        assert!(ZeroCopyHeader::read_from(&buf[..HEADER_SIZE - 1]).is_none());
    }

    #[test]
    fn test_wal_entry_type_from_u8() {
        let all = [
            WalEntryType::Insert,
            WalEntryType::Update,
            WalEntryType::Delete,
            WalEntryType::BeginTxn,
            WalEntryType::CommitTxn,
            WalEntryType::AbortTxn,
            WalEntryType::Checkpoint,
        ];
        for kind in all {
            assert_eq!(WalEntryType::from_u8(kind as u8), Some(kind));
        }
        assert_eq!(WalEntryType::from_u8(0), None);
        assert_eq!(WalEntryType::from_u8(8), None);
    }

    #[test]
    fn test_truncated_entry_rejected() {
        let mut builder = WalEntryBuilder::new(7, 7, WalEntryType::Delete);
        builder.with_key(b"k");
        let bytes = builder.build();

        assert!(WalEntryReader::from_bytes(&bytes[..bytes.len() - 1]).is_none());
        assert!(WalEntryReader::from_bytes(&bytes[..HEADER_SIZE]).is_none());
    }

    #[test]
    fn test_get_field_out_of_range() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"only");
        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.key(), Some(b"only".as_slice()));
        assert_eq!(reader.value(), None);
        assert_eq!(reader.get_field(200), None);
    }
}