1use std::mem::size_of;
65
/// Magic number at the start of every encoded entry: the ASCII bytes `ZCOP`
/// read as a big-endian `u32` (i.e. `0x5A43_4F50`). Headers serialize it
/// little-endian, so the first four bytes of an entry are `POCZ`.
pub const ZERO_COPY_MAGIC: u32 = u32::from_be_bytes(*b"ZCOP");

/// Newest on-disk format version this module writes; headers carrying a
/// larger version fail `ZeroCopyHeader::validate`.
pub const FORMAT_VERSION: u16 = 1;
71
/// Encoded size in bytes of a `ZeroCopyHeader`: magic (4) + version (2) +
/// flags (2) + total_length (4) + crc (4).
pub const HEADER_SIZE: usize = 4 + 2 + 2 + 4 + 4;
74
/// Fixed 16-byte header that precedes every encoded WAL entry.
///
/// `repr(C, packed)` keeps the in-memory field order and size identical to
/// the wire layout, but this module always (de)serializes it field by field in
/// little-endian order (`write_to` / `read_from`). Because the struct is
/// packed, copy a field into a local before taking a reference to it.
#[derive(Clone, Copy, Debug)]
#[repr(C, packed)]
pub struct ZeroCopyHeader {
    /// Expected to equal `ZERO_COPY_MAGIC`.
    pub magic: u32,
    /// Format version; `validate` accepts values `<= FORMAT_VERSION`.
    pub version: u16,
    /// Flag bits; entries produced by `WalEntryBuilder::build` store 0.
    pub flags: u16,
    /// Length of the whole entry in bytes, this header included.
    pub total_length: u32,
    /// CRC-32 (`crc32fast::hash`) of the bytes that follow this header.
    pub crc: u32,
}
94
95impl ZeroCopyHeader {
96 pub fn new(data_length: usize, flags: u16, crc: u32) -> Self {
98 Self {
99 magic: ZERO_COPY_MAGIC,
100 version: FORMAT_VERSION,
101 flags,
102 total_length: (HEADER_SIZE + data_length) as u32,
103 crc,
104 }
105 }
106
107 #[inline]
109 pub fn validate(&self) -> bool {
110 self.magic == ZERO_COPY_MAGIC && self.version <= FORMAT_VERSION
111 }
112
113 pub fn write_to(&self, buf: &mut [u8]) {
115 assert!(buf.len() >= HEADER_SIZE);
116 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
117 buf[4..6].copy_from_slice(&self.version.to_le_bytes());
118 buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
119 buf[8..12].copy_from_slice(&self.total_length.to_le_bytes());
120 buf[12..16].copy_from_slice(&self.crc.to_le_bytes());
121 }
122
123 pub fn read_from(buf: &[u8]) -> Option<Self> {
125 if buf.len() < HEADER_SIZE {
126 return None;
127 }
128 Some(Self {
129 magic: u32::from_le_bytes(buf[0..4].try_into().ok()?),
130 version: u16::from_le_bytes(buf[4..6].try_into().ok()?),
131 flags: u16::from_le_bytes(buf[6..8].try_into().ok()?),
132 total_length: u32::from_le_bytes(buf[8..12].try_into().ok()?),
133 crc: u32::from_le_bytes(buf[12..16].try_into().ok()?),
134 })
135 }
136}
137
/// Kind of record carried by a WAL entry.
///
/// Each discriminant is the one-byte tag stored in
/// `WalEntryHeader::entry_type`; `WalEntryType::from_u8` maps a tag back.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum WalEntryType {
    /// Insert record (tag 1).
    Insert = 1,
    /// Update record (tag 2).
    Update = 2,
    /// Delete record (tag 3).
    Delete = 3,
    /// Transaction begin marker (tag 4).
    BeginTxn = 4,
    /// Transaction commit marker (tag 5).
    CommitTxn = 5,
    /// Transaction abort marker (tag 6).
    AbortTxn = 6,
    /// Checkpoint marker (tag 7).
    Checkpoint = 7,
}
161
162impl WalEntryType {
163 pub fn from_u8(v: u8) -> Option<Self> {
164 match v {
165 1 => Some(Self::Insert),
166 2 => Some(Self::Update),
167 3 => Some(Self::Delete),
168 4 => Some(Self::BeginTxn),
169 5 => Some(Self::CommitTxn),
170 6 => Some(Self::AbortTxn),
171 7 => Some(Self::Checkpoint),
172 _ => None,
173 }
174 }
175}
176
/// Fixed 32-byte header that starts every entry body (it follows the
/// `ZeroCopyHeader`).
///
/// `repr(C)` lays the fields out without padding (three `u64`s, two `u8`s
/// and a 6-byte reserved tail), so on little-endian targets the in-memory
/// bytes match the little-endian encoding produced by `write_to`.
#[derive(Clone, Copy, Debug)]
#[repr(C)]
pub struct WalEntryHeader {
    /// Transaction id supplied by the caller.
    pub txn_id: u64,
    /// LSN supplied by the caller.
    pub lsn: u64,
    /// Creation time in nanoseconds since the Unix epoch (0 when the clock
    /// reported a time before the epoch).
    pub timestamp: u64,
    /// Raw `WalEntryType` tag.
    pub entry_type: u8,
    /// Number of field descriptors that follow this header.
    pub field_count: u8,
    /// Fills the header out to 32 bytes; `new` writes zeros.
    pub _reserved: [u8; 6],
}
194
195pub const WAL_ENTRY_HEADER_SIZE: usize = size_of::<WalEntryHeader>();
196
197impl WalEntryHeader {
198 pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType, field_count: u8) -> Self {
199 Self {
200 txn_id,
201 lsn,
202 timestamp: std::time::SystemTime::now()
203 .duration_since(std::time::UNIX_EPOCH)
204 .map(|d| d.as_nanos() as u64)
205 .unwrap_or(0),
206 entry_type: entry_type as u8,
207 field_count,
208 _reserved: [0; 6],
209 }
210 }
211
212 pub fn write_to(&self, buf: &mut [u8]) {
214 assert!(buf.len() >= WAL_ENTRY_HEADER_SIZE);
215 buf[0..8].copy_from_slice(&self.txn_id.to_le_bytes());
216 buf[8..16].copy_from_slice(&self.lsn.to_le_bytes());
217 buf[16..24].copy_from_slice(&self.timestamp.to_le_bytes());
218 buf[24] = self.entry_type;
219 buf[25] = self.field_count;
220 buf[26..32].copy_from_slice(&self._reserved);
221 }
222
223 #[inline]
225 pub fn read_from(buf: &[u8]) -> Option<&Self> {
226 if buf.len() < WAL_ENTRY_HEADER_SIZE {
227 return None;
228 }
229 unsafe {
232 let ptr = buf.as_ptr() as *const Self;
233 if ptr as usize % std::mem::align_of::<Self>() != 0 {
235 return None;
236 }
237 Some(&*ptr)
238 }
239 }
240
241 pub fn read_from_copy(buf: &[u8]) -> Option<Self> {
243 if buf.len() < WAL_ENTRY_HEADER_SIZE {
244 return None;
245 }
246 Some(Self {
247 txn_id: u64::from_le_bytes(buf[0..8].try_into().ok()?),
248 lsn: u64::from_le_bytes(buf[8..16].try_into().ok()?),
249 timestamp: u64::from_le_bytes(buf[16..24].try_into().ok()?),
250 entry_type: buf[24],
251 field_count: buf[25],
252 _reserved: buf[26..32].try_into().ok()?,
253 })
254 }
255}
256
/// Locates one field inside an entry's payload section.
///
/// Encoded as two little-endian `u32`s, offset then length (see
/// `WalEntryBuilder::build` and `WalEntryReader::get_field`).
#[derive(Clone, Copy, Debug)]
#[repr(C)]
pub struct FieldDescriptor {
    /// Byte offset of the field relative to the start of the payload section
    /// (the bytes that follow the descriptor table).
    pub offset: u32,
    /// Field length in bytes.
    pub length: u32,
}
270
271pub const FIELD_DESCRIPTOR_SIZE: usize = size_of::<FieldDescriptor>();
272
273pub struct WalEntryBuilder {
275 header: WalEntryHeader,
277 fields: Vec<FieldDescriptor>,
279 data: Vec<u8>,
281}
282
283impl WalEntryBuilder {
284 pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType) -> Self {
286 Self {
287 header: WalEntryHeader::new(txn_id, lsn, entry_type, 0),
288 fields: Vec::new(),
289 data: Vec::new(),
290 }
291 }
292
293 pub fn add_field(&mut self, data: &[u8]) -> &mut Self {
295 let offset = self.data.len() as u32;
296 let length = data.len() as u32;
297 self.fields.push(FieldDescriptor { offset, length });
298 self.data.extend_from_slice(data);
299 self.header.field_count = self.fields.len() as u8;
300 self
301 }
302
303 pub fn with_key(&mut self, key: &[u8]) -> &mut Self {
305 self.add_field(key)
306 }
307
308 pub fn with_value(&mut self, value: &[u8]) -> &mut Self {
310 self.add_field(value)
311 }
312
313 pub fn total_size(&self) -> usize {
315 HEADER_SIZE +
316 WAL_ENTRY_HEADER_SIZE +
317 self.fields.len() * FIELD_DESCRIPTOR_SIZE +
318 self.data.len()
319 }
320
321 pub fn build(&self) -> Vec<u8> {
323 let data_len = WAL_ENTRY_HEADER_SIZE +
324 self.fields.len() * FIELD_DESCRIPTOR_SIZE +
325 self.data.len();
326
327 let mut buf = vec![0u8; HEADER_SIZE + data_len];
328
329 let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
331
332 let header = ZeroCopyHeader::new(data_len, 0, crc);
334 header.write_to(&mut buf[0..HEADER_SIZE]);
335
336 let offset = HEADER_SIZE;
338 self.header.write_to(&mut buf[offset..offset + WAL_ENTRY_HEADER_SIZE]);
339
340 let mut offset = HEADER_SIZE + WAL_ENTRY_HEADER_SIZE;
342 for field in &self.fields {
343 buf[offset..offset + 4].copy_from_slice(&field.offset.to_le_bytes());
344 buf[offset + 4..offset + 8].copy_from_slice(&field.length.to_le_bytes());
345 offset += FIELD_DESCRIPTOR_SIZE;
346 }
347
348 buf[offset..].copy_from_slice(&self.data);
350
351 let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
353 buf[12..16].copy_from_slice(&crc.to_le_bytes());
354
355 buf
356 }
357}
358
359pub struct WalEntryReader<'a> {
368 data: &'a [u8],
370 header: &'a WalEntryHeader,
372 field_count: usize,
374 fields_offset: usize,
376 data_offset: usize,
378}
379
380impl<'a> WalEntryReader<'a> {
381 pub fn from_bytes(bytes: &'a [u8]) -> Option<Self> {
383 let outer_header = ZeroCopyHeader::read_from(bytes)?;
385 if !outer_header.validate() {
386 return None;
387 }
388
389 let expected_crc = outer_header.crc;
391 let actual_crc = crc32fast::hash(&bytes[HEADER_SIZE..]);
392 if expected_crc != actual_crc {
393 return None;
394 }
395
396 let entry_data = &bytes[HEADER_SIZE..];
398 let header = WalEntryHeader::read_from(entry_data)?;
399
400 let field_count = header.field_count as usize;
401 let fields_offset = WAL_ENTRY_HEADER_SIZE;
402 let data_offset = fields_offset + field_count * FIELD_DESCRIPTOR_SIZE;
403
404 Some(Self {
405 data: entry_data,
406 header,
407 field_count,
408 fields_offset,
409 data_offset,
410 })
411 }
412
413 #[inline]
415 pub fn txn_id(&self) -> u64 {
416 self.header.txn_id
417 }
418
419 #[inline]
421 pub fn lsn(&self) -> u64 {
422 self.header.lsn
423 }
424
425 #[inline]
427 pub fn timestamp(&self) -> u64 {
428 self.header.timestamp
429 }
430
431 #[inline]
433 pub fn entry_type(&self) -> Option<WalEntryType> {
434 WalEntryType::from_u8(self.header.entry_type)
435 }
436
437 #[inline]
439 pub fn field_count(&self) -> usize {
440 self.field_count
441 }
442
443 #[inline]
445 pub fn get_field(&self, index: usize) -> Option<&'a [u8]> {
446 if index >= self.field_count {
447 return None;
448 }
449
450 let desc_offset = self.fields_offset + index * FIELD_DESCRIPTOR_SIZE;
451 let desc_bytes = self.data.get(desc_offset..desc_offset + FIELD_DESCRIPTOR_SIZE)?;
452
453 let offset = u32::from_le_bytes(desc_bytes[0..4].try_into().ok()?) as usize;
454 let length = u32::from_le_bytes(desc_bytes[4..8].try_into().ok()?) as usize;
455
456 let start = self.data_offset + offset;
457 self.data.get(start..start + length)
458 }
459
460 #[inline]
462 pub fn key(&self) -> Option<&'a [u8]> {
463 self.get_field(0)
464 }
465
466 #[inline]
468 pub fn value(&self) -> Option<&'a [u8]> {
469 self.get_field(1)
470 }
471
472 pub fn fields(&self) -> impl Iterator<Item = &'a [u8]> + '_ {
474 (0..self.field_count).filter_map(|i| self.get_field(i))
475 }
476}
477
/// Collects encoded WAL entries and serializes them as a single batch.
#[derive(Debug, Clone)]
pub struct WalBatchWriter {
    /// Entries already encoded by `WalEntryBuilder::build`, in insertion order.
    entries: Vec<Vec<u8>>,
    /// Sum of the encoded entry lengths.
    total_size: usize,
}
492
493impl WalBatchWriter {
494 pub fn new() -> Self {
495 Self {
496 entries: Vec::new(),
497 total_size: 0,
498 }
499 }
500
501 pub fn with_capacity(capacity: usize) -> Self {
502 Self {
503 entries: Vec::with_capacity(capacity),
504 total_size: 0,
505 }
506 }
507
508 pub fn add(&mut self, entry: WalEntryBuilder) {
510 let bytes = entry.build();
511 self.total_size += bytes.len();
512 self.entries.push(bytes);
513 }
514
515 pub fn len(&self) -> usize {
517 self.entries.len()
518 }
519
520 pub fn is_empty(&self) -> bool {
522 self.entries.is_empty()
523 }
524
525 pub fn total_size(&self) -> usize {
527 self.total_size
528 }
529
530 pub fn build(&self) -> Vec<u8> {
532 let mut buf = Vec::with_capacity(self.total_size + 8);
533
534 buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
536 buf.extend_from_slice(&(self.total_size as u32).to_le_bytes());
538
539 for entry in &self.entries {
541 buf.extend_from_slice(entry);
542 }
543
544 buf
545 }
546
547 pub fn clear(&mut self) {
549 self.entries.clear();
550 self.total_size = 0;
551 }
552}
553
554impl Default for WalBatchWriter {
555 fn default() -> Self {
556 Self::new()
557 }
558}
559
560pub struct WalBatchReader<'a> {
566 data: &'a [u8],
567 entry_count: usize,
568 #[allow(dead_code)]
569 current_offset: usize,
570}
571
572impl<'a> WalBatchReader<'a> {
573 pub fn from_bytes(data: &'a [u8]) -> Option<Self> {
574 if data.len() < 8 {
575 return None;
576 }
577
578 let entry_count = u32::from_le_bytes(data[0..4].try_into().ok()?) as usize;
579 let _total_size = u32::from_le_bytes(data[4..8].try_into().ok()?) as usize;
580
581 Some(Self {
582 data,
583 entry_count,
584 current_offset: 8,
585 })
586 }
587
588 pub fn entry_count(&self) -> usize {
590 self.entry_count
591 }
592
593 pub fn entries(&self) -> WalBatchIter<'a> {
595 WalBatchIter {
596 data: self.data,
597 offset: 8,
598 remaining: self.entry_count,
599 }
600 }
601}
602
/// Iterator over the entries of a batch; created by `WalBatchReader::entries`.
#[derive(Debug, Clone)]
pub struct WalBatchIter<'a> {
    /// Whole batch buffer, prefix included.
    data: &'a [u8],
    /// Byte offset of the next entry within `data`.
    offset: usize,
    /// Entries still to be yielded.
    remaining: usize,
}
609
610impl<'a> Iterator for WalBatchIter<'a> {
611 type Item = WalEntryReader<'a>;
612
613 fn next(&mut self) -> Option<Self::Item> {
614 if self.remaining == 0 {
615 return None;
616 }
617
618 let entry_data = &self.data[self.offset..];
619 let header = ZeroCopyHeader::read_from(entry_data)?;
620
621 let entry_len = header.total_length as usize;
622 let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
623
624 self.offset += entry_len;
625 self.remaining -= 1;
626
627 Some(entry)
628 }
629
630 fn size_hint(&self) -> (usize, Option<usize>) {
631 (self.remaining, Some(self.remaining))
632 }
633}
634
635impl<'a> ExactSizeIterator for WalBatchIter<'a> {}
636
637pub struct MmapWalReader {
643 mmap: memmap2::Mmap,
645 size: usize,
647}
648
649impl MmapWalReader {
650 pub fn open(path: &std::path::Path) -> std::io::Result<Self> {
652 let file = std::fs::File::open(path)?;
653 let metadata = file.metadata()?;
654 let size = metadata.len() as usize;
655
656 let mmap = unsafe { memmap2::Mmap::map(&file)? };
658
659 Ok(Self { mmap, size })
660 }
661
662 pub fn as_bytes(&self) -> &[u8] {
664 &self.mmap
665 }
666
667 pub fn size(&self) -> usize {
669 self.size
670 }
671
672 pub fn read_entry_at(&self, offset: usize) -> Option<WalEntryReader<'_>> {
674 if offset >= self.size {
675 return None;
676 }
677 WalEntryReader::from_bytes(&self.mmap[offset..])
678 }
679
680 pub fn entries(&self) -> MmapWalIter<'_> {
682 MmapWalIter {
683 data: &self.mmap,
684 offset: 0,
685 size: self.size,
686 }
687 }
688}
689
/// Iterator over consecutive entries of a mapped WAL file; created by
/// `MmapWalReader::entries`.
#[derive(Debug, Clone)]
pub struct MmapWalIter<'a> {
    /// The mapped bytes.
    data: &'a [u8],
    /// Byte offset of the next entry.
    offset: usize,
    /// Upper bound on readable bytes.
    size: usize,
}
696
697impl<'a> Iterator for MmapWalIter<'a> {
698 type Item = WalEntryReader<'a>;
699
700 fn next(&mut self) -> Option<Self::Item> {
701 if self.offset >= self.size {
702 return None;
703 }
704
705 let entry_data = &self.data[self.offset..];
706 if entry_data.len() < HEADER_SIZE {
707 return None;
708 }
709
710 let header = ZeroCopyHeader::read_from(entry_data)?;
711 if !header.validate() {
712 return None;
713 }
714
715 let entry_len = header.total_length as usize;
716 if self.offset + entry_len > self.size {
717 return None;
718 }
719
720 let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
721 self.offset += entry_len;
722
723 Some(entry)
724 }
725}
726
/// Running counters for WAL encode/decode activity.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SerdeStats {
    /// Entries recorded via `record_write`.
    pub entries_written: u64,
    /// Total bytes recorded via `record_write`.
    pub bytes_written: u64,
    /// Entries recorded via `record_read`.
    pub entries_read: u64,
    /// Total bytes recorded via `record_read`.
    pub bytes_read: u64,
    /// Calls to `record_crc_failure`.
    pub crc_failures: u64,
}
745
746impl SerdeStats {
747 pub fn new() -> Self {
748 Self::default()
749 }
750
751 pub fn record_write(&mut self, bytes: usize) {
752 self.entries_written += 1;
753 self.bytes_written += bytes as u64;
754 }
755
756 pub fn record_read(&mut self, bytes: usize) {
757 self.entries_read += 1;
758 self.bytes_read += bytes as u64;
759 }
760
761 pub fn record_crc_failure(&mut self) {
762 self.crc_failures += 1;
763 }
764}
765
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_wal_entry_roundtrip() {
        let mut builder = WalEntryBuilder::new(42, 100, WalEntryType::Insert);
        builder.with_key(b"test_key").with_value(b"test_value");

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.txn_id(), 42);
        assert_eq!(reader.lsn(), 100);
        assert_eq!(reader.entry_type(), Some(WalEntryType::Insert));
        assert_eq!(reader.field_count(), 2);
        assert_eq!(reader.key(), Some(b"test_key".as_slice()));
        assert_eq!(reader.value(), Some(b"test_value".as_slice()));
    }

    #[test]
    fn test_wal_entry_zero_copy_header() {
        let header = WalEntryHeader::new(123, 456, WalEntryType::Update, 3);

        // Place the encoded header at an address aligned for WalEntryHeader so
        // the in-place path is really exercised instead of silently skipped.
        let mut storage = vec![0u8; WAL_ENTRY_HEADER_SIZE + 8];
        let start = storage
            .as_ptr()
            .align_offset(std::mem::align_of::<WalEntryHeader>());
        assert!(start < 8);
        let buf = &mut storage[start..start + WAL_ENTRY_HEADER_SIZE];
        header.write_to(buf);

        // The in-place view only matches the little-endian encoding on
        // little-endian targets.
        if cfg!(target_endian = "little") {
            let read_header =
                WalEntryHeader::read_from(buf).expect("aligned header must be readable in place");
            assert_eq!(read_header.txn_id, 123);
            assert_eq!(read_header.lsn, 456);
            assert_eq!(read_header.entry_type, WalEntryType::Update as u8);
            assert_eq!(read_header.field_count, 3);
        }

        let read_header = WalEntryHeader::read_from_copy(buf).unwrap();
        assert_eq!(read_header.txn_id, 123);
        assert_eq!(read_header.lsn, 456);
        assert_eq!(read_header.entry_type, WalEntryType::Update as u8);
        assert_eq!(read_header.field_count, 3);
    }

    #[test]
    fn test_wal_entry_crc_validation() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"key");

        let mut bytes = builder.build();
        assert!(WalEntryReader::from_bytes(&bytes).is_some());

        // Flip a byte inside the CRC-protected region (inside the entry
        // header's txn_id).
        bytes[HEADER_SIZE + 4] ^= 0xFF;
        assert!(WalEntryReader::from_bytes(&bytes).is_none());
    }

    #[test]
    fn test_batch_writer_reader() {
        let mut batch = WalBatchWriter::new();

        for i in 0..10 {
            let mut entry = WalEntryBuilder::new(i, i * 10, WalEntryType::Insert);
            entry.with_key(format!("key_{}", i).as_bytes());
            entry.with_value(format!("value_{}", i).as_bytes());
            batch.add(entry);
        }

        assert_eq!(batch.len(), 10);

        let bytes = batch.build();
        let reader = WalBatchReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.entry_count(), 10);
        assert_eq!(reader.entries().len(), 10);

        for (i, entry) in reader.entries().enumerate() {
            assert_eq!(entry.txn_id(), i as u64);
            assert_eq!(entry.key(), Some(format!("key_{}", i).as_bytes()));
        }
    }

    #[test]
    fn test_batch_reader_rejects_short_prefix() {
        assert!(WalBatchReader::from_bytes(&[0u8; 7]).is_none());

        let empty = WalBatchWriter::new().build();
        let reader = WalBatchReader::from_bytes(&empty).unwrap();
        assert_eq!(reader.entry_count(), 0);
        assert_eq!(reader.entries().count(), 0);
    }

    #[test]
    fn test_multiple_fields() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Update);
        builder.add_field(b"field_0");
        builder.add_field(b"field_1");
        builder.add_field(b"field_2");
        builder.add_field(b"field_3");

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.field_count(), 4);

        let fields: Vec<_> = reader.fields().collect();
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0], b"field_0");
        assert_eq!(fields[1], b"field_1");
        assert_eq!(fields[2], b"field_2");
        assert_eq!(fields[3], b"field_3");
    }

    #[test]
    fn test_empty_fields() {
        let builder = WalEntryBuilder::new(1, 1, WalEntryType::BeginTxn);

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.field_count(), 0);
        assert_eq!(reader.entry_type(), Some(WalEntryType::BeginTxn));
    }

    #[test]
    fn test_large_value() {
        let large_value = vec![0xAB; 1024 * 1024];
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"large_key").with_value(&large_value);

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.value(), Some(large_value.as_slice()));
    }

    #[test]
    fn test_reader_rejects_truncated_entry() {
        let mut builder = WalEntryBuilder::new(7, 7, WalEntryType::Delete);
        builder.with_key(b"gone");
        let bytes = builder.build();

        assert!(WalEntryReader::from_bytes(&bytes[..bytes.len() - 1]).is_none());
        assert!(WalEntryReader::from_bytes(&bytes[..HEADER_SIZE - 1]).is_none());
    }

    #[test]
    fn test_header_validation() {
        let header = ZeroCopyHeader::new(100, 0, 12345);
        assert!(header.validate());

        let mut bad_header = header;
        bad_header.magic = 0xDEADBEEF;
        assert!(!bad_header.validate());
    }

    #[test]
    fn test_zero_copy_header_roundtrip() {
        let header = ZeroCopyHeader::new(100, 7, 0xCAFE_F00D);
        let mut buf = [0u8; HEADER_SIZE];
        header.write_to(&mut buf);

        let read = ZeroCopyHeader::read_from(&buf).unwrap();
        // Copy fields out of the packed struct before comparing; references to
        // packed fields are not allowed.
        let (magic, version, flags, total_length, crc) =
            (read.magic, read.version, read.flags, read.total_length, read.crc);
        assert_eq!(magic, ZERO_COPY_MAGIC);
        assert_eq!(version, FORMAT_VERSION);
        assert_eq!(flags, 7);
        assert_eq!(total_length as usize, HEADER_SIZE + 100);
        assert_eq!(crc, 0xCAFE_F00D);

        assert!(ZeroCopyHeader::read_from(&buf[..HEADER_SIZE - 1]).is_none());
    }

    #[test]
    fn test_entry_type_tags() {
        let all = [
            WalEntryType::Insert,
            WalEntryType::Update,
            WalEntryType::Delete,
            WalEntryType::BeginTxn,
            WalEntryType::CommitTxn,
            WalEntryType::AbortTxn,
            WalEntryType::Checkpoint,
        ];
        for t in all {
            assert_eq!(WalEntryType::from_u8(t as u8), Some(t));
        }
        assert_eq!(WalEntryType::from_u8(0), None);
        assert_eq!(WalEntryType::from_u8(8), None);
    }
}