1use std::mem::size_of;
65
/// Magic value stored in [`ZeroCopyHeader::magic`]; its hex digits are the ASCII codes
/// of `Z`, `C`, `O`, `P` (most significant byte first).
pub const ZERO_COPY_MAGIC: u32 = 0x5A43_4F50;

/// Highest frame format version this code accepts in [`ZeroCopyHeader::validate`].
pub const FORMAT_VERSION: u16 = 1;

/// Size in bytes of a serialized [`ZeroCopyHeader`].
pub const HEADER_SIZE: usize = 16;

/// Outer frame header written in front of every serialized entry.
///
/// Serialized layout (all integers little-endian):
///
/// | bytes  | field          |
/// |--------|----------------|
/// | 0..4   | `magic`        |
/// | 4..6   | `version`      |
/// | 6..8   | `flags`        |
/// | 8..12  | `total_length` |
/// | 12..16 | `crc`          |
///
/// The struct is `packed`, so fields must be copied out (`let m = h.magic;`) rather
/// than borrowed.
#[repr(C, packed)]
#[derive(Debug, Clone, Copy)]
pub struct ZeroCopyHeader {
    /// Must equal [`ZERO_COPY_MAGIC`] for the header to validate.
    pub magic: u32,
    /// Format version the frame was written with.
    pub version: u16,
    /// Flag bits supplied by the writer; not interpreted here.
    pub flags: u16,
    /// Length of the whole frame in bytes, including this header.
    pub total_length: u32,
    /// Checksum value supplied by the writer, stored verbatim.
    pub crc: u32,
}

// `write_to`/`read_from` hard-code byte offsets that assume the packed 16-byte layout.
const _: () = assert!(std::mem::size_of::<ZeroCopyHeader>() == HEADER_SIZE);

impl ZeroCopyHeader {
    /// Creates a header for a frame whose payload (everything after the header) is
    /// `data_length` bytes long.
    ///
    /// # Panics
    ///
    /// Panics if `HEADER_SIZE + data_length` does not fit in the 32-bit
    /// `total_length` field. Truncating it would produce a frame that readers mis-parse.
    pub fn new(data_length: usize, flags: u16, crc: u32) -> Self {
        let total_length = HEADER_SIZE
            .checked_add(data_length)
            .and_then(|len| u32::try_from(len).ok())
            .expect("zero-copy frame length exceeds u32::MAX");
        Self {
            magic: ZERO_COPY_MAGIC,
            version: FORMAT_VERSION,
            flags,
            total_length,
            crc,
        }
    }

    /// Returns `true` if the magic matches and the version is not newer than
    /// [`FORMAT_VERSION`].
    #[inline]
    pub fn validate(&self) -> bool {
        // Copy out of the packed struct before comparing.
        let (magic, version) = (self.magic, self.version);
        magic == ZERO_COPY_MAGIC && version <= FORMAT_VERSION
    }

    /// Serializes the header into the first [`HEADER_SIZE`] bytes of `buf`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is shorter than [`HEADER_SIZE`].
    pub fn write_to(&self, buf: &mut [u8]) {
        assert!(buf.len() >= HEADER_SIZE);
        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
        buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
        buf[8..12].copy_from_slice(&self.total_length.to_le_bytes());
        buf[12..16].copy_from_slice(&self.crc.to_le_bytes());
    }

    /// Decodes a header from the first [`HEADER_SIZE`] bytes of `buf`.
    ///
    /// Returns `None` if `buf` is too short. No validation is performed; call
    /// [`ZeroCopyHeader::validate`] on the result.
    pub fn read_from(buf: &[u8]) -> Option<Self> {
        if buf.len() < HEADER_SIZE {
            return None;
        }
        Some(Self {
            magic: u32::from_le_bytes(buf[0..4].try_into().ok()?),
            version: u16::from_le_bytes(buf[4..6].try_into().ok()?),
            flags: u16::from_le_bytes(buf[6..8].try_into().ok()?),
            total_length: u32::from_le_bytes(buf[8..12].try_into().ok()?),
            crc: u32::from_le_bytes(buf[12..16].try_into().ok()?),
        })
    }
}
137
/// Kind of operation a WAL entry records. The discriminant is the byte stored in
/// `WalEntryHeader::entry_type`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    /// Insert operation.
    Insert = 1,
    /// Update operation.
    Update = 2,
    /// Delete operation.
    Delete = 3,
    /// Transaction begin marker.
    BeginTxn = 4,
    /// Transaction commit marker.
    CommitTxn = 5,
    /// Transaction abort marker.
    AbortTxn = 6,
    /// Checkpoint marker.
    Checkpoint = 7,
}

impl WalEntryType {
    /// Decodes a stored type byte; any byte outside `1..=7` yields `None`.
    pub fn from_u8(v: u8) -> Option<Self> {
        let kind = match v {
            1 => Self::Insert,
            2 => Self::Update,
            3 => Self::Delete,
            4 => Self::BeginTxn,
            5 => Self::CommitTxn,
            6 => Self::AbortTxn,
            7 => Self::Checkpoint,
            _ => return None,
        };
        Some(kind)
    }
}
176
177#[repr(C)]
179#[derive(Debug, Clone, Copy)]
180pub struct WalEntryHeader {
181 pub txn_id: u64,
183 pub lsn: u64,
185 pub timestamp: u64,
187 pub entry_type: u8,
189 pub field_count: u8,
191 pub _reserved: [u8; 6],
193}
194
195pub const WAL_ENTRY_HEADER_SIZE: usize = size_of::<WalEntryHeader>();
196
197impl WalEntryHeader {
198 pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType, field_count: u8) -> Self {
199 Self {
200 txn_id,
201 lsn,
202 timestamp: std::time::SystemTime::now()
203 .duration_since(std::time::UNIX_EPOCH)
204 .map(|d| d.as_nanos() as u64)
205 .unwrap_or(0),
206 entry_type: entry_type as u8,
207 field_count,
208 _reserved: [0; 6],
209 }
210 }
211
212 pub fn write_to(&self, buf: &mut [u8]) {
214 assert!(buf.len() >= WAL_ENTRY_HEADER_SIZE);
215 buf[0..8].copy_from_slice(&self.txn_id.to_le_bytes());
216 buf[8..16].copy_from_slice(&self.lsn.to_le_bytes());
217 buf[16..24].copy_from_slice(&self.timestamp.to_le_bytes());
218 buf[24] = self.entry_type;
219 buf[25] = self.field_count;
220 buf[26..32].copy_from_slice(&self._reserved);
221 }
222
223 #[inline]
225 pub fn read_from(buf: &[u8]) -> Option<&Self> {
226 if buf.len() < WAL_ENTRY_HEADER_SIZE {
227 return None;
228 }
229 unsafe {
232 let ptr = buf.as_ptr() as *const Self;
233 if ptr as usize % std::mem::align_of::<Self>() != 0 {
235 return None;
236 }
237 Some(&*ptr)
238 }
239 }
240
241 pub fn read_from_copy(buf: &[u8]) -> Option<Self> {
243 if buf.len() < WAL_ENTRY_HEADER_SIZE {
244 return None;
245 }
246 Some(Self {
247 txn_id: u64::from_le_bytes(buf[0..8].try_into().ok()?),
248 lsn: u64::from_le_bytes(buf[8..16].try_into().ok()?),
249 timestamp: u64::from_le_bytes(buf[16..24].try_into().ok()?),
250 entry_type: buf[24],
251 field_count: buf[25],
252 _reserved: buf[26..32].try_into().ok()?,
253 })
254 }
255}
256
/// Locates one field inside the data section of a WAL entry.
///
/// Serialized as two little-endian `u32`s: `offset`, then `length`.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct FieldDescriptor {
    /// Byte offset of the field relative to the start of the data section.
    pub offset: u32,
    /// Field length in bytes.
    pub length: u32,
}

/// Size in bytes of one serialized [`FieldDescriptor`].
pub const FIELD_DESCRIPTOR_SIZE: usize = size_of::<FieldDescriptor>();
272
273pub struct WalEntryBuilder {
275 header: WalEntryHeader,
277 fields: Vec<FieldDescriptor>,
279 data: Vec<u8>,
281}
282
283impl WalEntryBuilder {
284 pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType) -> Self {
286 Self {
287 header: WalEntryHeader::new(txn_id, lsn, entry_type, 0),
288 fields: Vec::new(),
289 data: Vec::new(),
290 }
291 }
292
293 pub fn add_field(&mut self, data: &[u8]) -> &mut Self {
295 let offset = self.data.len() as u32;
296 let length = data.len() as u32;
297 self.fields.push(FieldDescriptor { offset, length });
298 self.data.extend_from_slice(data);
299 self.header.field_count = self.fields.len() as u8;
300 self
301 }
302
303 pub fn with_key(&mut self, key: &[u8]) -> &mut Self {
305 self.add_field(key)
306 }
307
308 pub fn with_value(&mut self, value: &[u8]) -> &mut Self {
310 self.add_field(value)
311 }
312
313 pub fn total_size(&self) -> usize {
315 HEADER_SIZE
316 + WAL_ENTRY_HEADER_SIZE
317 + self.fields.len() * FIELD_DESCRIPTOR_SIZE
318 + self.data.len()
319 }
320
321 pub fn build(&self) -> Vec<u8> {
323 let data_len =
324 WAL_ENTRY_HEADER_SIZE + self.fields.len() * FIELD_DESCRIPTOR_SIZE + self.data.len();
325
326 let mut buf = vec![0u8; HEADER_SIZE + data_len];
327
328 let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
330
331 let header = ZeroCopyHeader::new(data_len, 0, crc);
333 header.write_to(&mut buf[0..HEADER_SIZE]);
334
335 let offset = HEADER_SIZE;
337 self.header
338 .write_to(&mut buf[offset..offset + WAL_ENTRY_HEADER_SIZE]);
339
340 let mut offset = HEADER_SIZE + WAL_ENTRY_HEADER_SIZE;
342 for field in &self.fields {
343 buf[offset..offset + 4].copy_from_slice(&field.offset.to_le_bytes());
344 buf[offset + 4..offset + 8].copy_from_slice(&field.length.to_le_bytes());
345 offset += FIELD_DESCRIPTOR_SIZE;
346 }
347
348 buf[offset..].copy_from_slice(&self.data);
350
351 let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
353 buf[12..16].copy_from_slice(&crc.to_le_bytes());
354
355 buf
356 }
357}
358
359pub struct WalEntryReader<'a> {
368 data: &'a [u8],
370 header: &'a WalEntryHeader,
372 field_count: usize,
374 fields_offset: usize,
376 data_offset: usize,
378}
379
380impl<'a> WalEntryReader<'a> {
381 pub fn from_bytes(bytes: &'a [u8]) -> Option<Self> {
383 let outer_header = ZeroCopyHeader::read_from(bytes)?;
385 if !outer_header.validate() {
386 return None;
387 }
388
389 let expected_crc = outer_header.crc;
391 let actual_crc = crc32fast::hash(&bytes[HEADER_SIZE..]);
392 if expected_crc != actual_crc {
393 return None;
394 }
395
396 let entry_data = &bytes[HEADER_SIZE..];
398 let header = WalEntryHeader::read_from(entry_data)?;
399
400 let field_count = header.field_count as usize;
401 let fields_offset = WAL_ENTRY_HEADER_SIZE;
402 let data_offset = fields_offset + field_count * FIELD_DESCRIPTOR_SIZE;
403
404 Some(Self {
405 data: entry_data,
406 header,
407 field_count,
408 fields_offset,
409 data_offset,
410 })
411 }
412
413 #[inline]
415 pub fn txn_id(&self) -> u64 {
416 self.header.txn_id
417 }
418
419 #[inline]
421 pub fn lsn(&self) -> u64 {
422 self.header.lsn
423 }
424
425 #[inline]
427 pub fn timestamp(&self) -> u64 {
428 self.header.timestamp
429 }
430
431 #[inline]
433 pub fn entry_type(&self) -> Option<WalEntryType> {
434 WalEntryType::from_u8(self.header.entry_type)
435 }
436
437 #[inline]
439 pub fn field_count(&self) -> usize {
440 self.field_count
441 }
442
443 #[inline]
445 pub fn get_field(&self, index: usize) -> Option<&'a [u8]> {
446 if index >= self.field_count {
447 return None;
448 }
449
450 let desc_offset = self.fields_offset + index * FIELD_DESCRIPTOR_SIZE;
451 let desc_bytes = self
452 .data
453 .get(desc_offset..desc_offset + FIELD_DESCRIPTOR_SIZE)?;
454
455 let offset = u32::from_le_bytes(desc_bytes[0..4].try_into().ok()?) as usize;
456 let length = u32::from_le_bytes(desc_bytes[4..8].try_into().ok()?) as usize;
457
458 let start = self.data_offset + offset;
459 self.data.get(start..start + length)
460 }
461
462 #[inline]
464 pub fn key(&self) -> Option<&'a [u8]> {
465 self.get_field(0)
466 }
467
468 #[inline]
470 pub fn value(&self) -> Option<&'a [u8]> {
471 self.get_field(1)
472 }
473
474 pub fn fields(&self) -> impl Iterator<Item = &'a [u8]> + '_ {
476 (0..self.field_count).filter_map(|i| self.get_field(i))
477 }
478}
479
480pub struct WalBatchWriter {
489 entries: Vec<Vec<u8>>,
491 total_size: usize,
493}
494
495impl WalBatchWriter {
496 pub fn new() -> Self {
497 Self {
498 entries: Vec::new(),
499 total_size: 0,
500 }
501 }
502
503 pub fn with_capacity(capacity: usize) -> Self {
504 Self {
505 entries: Vec::with_capacity(capacity),
506 total_size: 0,
507 }
508 }
509
510 pub fn add(&mut self, entry: WalEntryBuilder) {
512 let bytes = entry.build();
513 self.total_size += bytes.len();
514 self.entries.push(bytes);
515 }
516
517 pub fn len(&self) -> usize {
519 self.entries.len()
520 }
521
522 pub fn is_empty(&self) -> bool {
524 self.entries.is_empty()
525 }
526
527 pub fn total_size(&self) -> usize {
529 self.total_size
530 }
531
532 pub fn build(&self) -> Vec<u8> {
534 let mut buf = Vec::with_capacity(self.total_size + 8);
535
536 buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
538 buf.extend_from_slice(&(self.total_size as u32).to_le_bytes());
540
541 for entry in &self.entries {
543 buf.extend_from_slice(entry);
544 }
545
546 buf
547 }
548
549 pub fn clear(&mut self) {
551 self.entries.clear();
552 self.total_size = 0;
553 }
554}
555
556impl Default for WalBatchWriter {
557 fn default() -> Self {
558 Self::new()
559 }
560}
561
562pub struct WalBatchReader<'a> {
568 data: &'a [u8],
569 entry_count: usize,
570 #[allow(dead_code)]
571 current_offset: usize,
572}
573
574impl<'a> WalBatchReader<'a> {
575 pub fn from_bytes(data: &'a [u8]) -> Option<Self> {
576 if data.len() < 8 {
577 return None;
578 }
579
580 let entry_count = u32::from_le_bytes(data[0..4].try_into().ok()?) as usize;
581 let _total_size = u32::from_le_bytes(data[4..8].try_into().ok()?) as usize;
582
583 Some(Self {
584 data,
585 entry_count,
586 current_offset: 8,
587 })
588 }
589
590 pub fn entry_count(&self) -> usize {
592 self.entry_count
593 }
594
595 pub fn entries(&self) -> WalBatchIter<'a> {
597 WalBatchIter {
598 data: self.data,
599 offset: 8,
600 remaining: self.entry_count,
601 }
602 }
603}
604
/// Iterator over the entries of a batch; created by `WalBatchReader::entries`.
pub struct WalBatchIter<'a> {
    /// Entries still to be yielded.
    remaining: usize,
    /// Byte position of the next entry within `data`.
    offset: usize,
    /// Whole batch buffer.
    data: &'a [u8],
}
611
612impl<'a> Iterator for WalBatchIter<'a> {
613 type Item = WalEntryReader<'a>;
614
615 fn next(&mut self) -> Option<Self::Item> {
616 if self.remaining == 0 {
617 return None;
618 }
619
620 let entry_data = &self.data[self.offset..];
621 let header = ZeroCopyHeader::read_from(entry_data)?;
622
623 let entry_len = header.total_length as usize;
624 let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
625
626 self.offset += entry_len;
627 self.remaining -= 1;
628
629 Some(entry)
630 }
631
632 fn size_hint(&self) -> (usize, Option<usize>) {
633 (self.remaining, Some(self.remaining))
634 }
635}
636
637impl<'a> ExactSizeIterator for WalBatchIter<'a> {}
638
639pub struct MmapWalReader {
645 mmap: memmap2::Mmap,
647 size: usize,
649}
650
651impl MmapWalReader {
652 pub fn open(path: &std::path::Path) -> std::io::Result<Self> {
654 let file = std::fs::File::open(path)?;
655 let metadata = file.metadata()?;
656 let size = metadata.len() as usize;
657
658 let mmap = unsafe { memmap2::Mmap::map(&file)? };
660
661 Ok(Self { mmap, size })
662 }
663
664 pub fn as_bytes(&self) -> &[u8] {
666 &self.mmap
667 }
668
669 pub fn size(&self) -> usize {
671 self.size
672 }
673
674 pub fn read_entry_at(&self, offset: usize) -> Option<WalEntryReader<'_>> {
676 if offset >= self.size {
677 return None;
678 }
679 WalEntryReader::from_bytes(&self.mmap[offset..])
680 }
681
682 pub fn entries(&self) -> MmapWalIter<'_> {
684 MmapWalIter {
685 data: &self.mmap,
686 offset: 0,
687 size: self.size,
688 }
689 }
690}
691
/// Sequential iterator over the entries of a memory-mapped WAL; created by
/// `MmapWalReader::entries`.
pub struct MmapWalIter<'a> {
    /// Byte position of the next frame.
    offset: usize,
    /// Scan limit in bytes; iteration stops once `offset` reaches it.
    size: usize,
    /// Mapped file contents.
    data: &'a [u8],
}
698
699impl<'a> Iterator for MmapWalIter<'a> {
700 type Item = WalEntryReader<'a>;
701
702 fn next(&mut self) -> Option<Self::Item> {
703 if self.offset >= self.size {
704 return None;
705 }
706
707 let entry_data = &self.data[self.offset..];
708 if entry_data.len() < HEADER_SIZE {
709 return None;
710 }
711
712 let header = ZeroCopyHeader::read_from(entry_data)?;
713 if !header.validate() {
714 return None;
715 }
716
717 let entry_len = header.total_length as usize;
718 if self.offset + entry_len > self.size {
719 return None;
720 }
721
722 let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
723 self.offset += entry_len;
724
725 Some(entry)
726 }
727}
728
/// Running counters for serialization activity; updated only through the
/// `record_*` methods or direct field access.
#[derive(Debug, Default)]
pub struct SerdeStats {
    /// Number of `record_write` calls.
    pub entries_written: u64,
    /// Sum of the byte counts passed to `record_write`.
    pub bytes_written: u64,
    /// Number of `record_read` calls.
    pub entries_read: u64,
    /// Sum of the byte counts passed to `record_read`.
    pub bytes_read: u64,
    /// Number of `record_crc_failure` calls.
    pub crc_failures: u64,
}

impl SerdeStats {
    /// All counters start at zero (equivalent to `Default`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Counts one written entry of `bytes` bytes.
    pub fn record_write(&mut self, bytes: usize) {
        let Self {
            entries_written,
            bytes_written,
            ..
        } = self;
        *entries_written += 1;
        *bytes_written += bytes as u64;
    }

    /// Counts one read entry of `bytes` bytes.
    pub fn record_read(&mut self, bytes: usize) {
        let Self {
            entries_read,
            bytes_read,
            ..
        } = self;
        *entries_read += 1;
        *bytes_read += bytes as u64;
    }

    /// Counts one checksum mismatch.
    pub fn record_crc_failure(&mut self) {
        let Self { crc_failures, .. } = self;
        *crc_failures += 1;
    }
}
767
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one entry with a key and a value and reads every header field back.
    #[test]
    fn test_wal_entry_roundtrip() {
        let mut builder = WalEntryBuilder::new(42, 100, WalEntryType::Insert);
        builder.with_key(b"test_key").with_value(b"test_value");

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.txn_id(), 42);
        assert_eq!(reader.lsn(), 100);
        assert_eq!(reader.entry_type(), Some(WalEntryType::Insert));
        assert_eq!(reader.field_count(), 2);
        assert_eq!(reader.key(), Some(b"test_key".as_slice()));
        assert_eq!(reader.value(), Some(b"test_value".as_slice()));
    }

    #[test]
    fn test_wal_entry_zero_copy_header() {
        let header = WalEntryHeader::new(123, 456, WalEntryType::Update, 3);
        let mut buf = vec![0u8; WAL_ENTRY_HEADER_SIZE];
        header.write_to(&mut buf);

        // `read_from` returns `None` when `buf` is not suitably aligned, in which case
        // these checks are skipped; the copying reader below always runs.
        if let Some(read_header) = WalEntryHeader::read_from(&buf) {
            assert_eq!(read_header.txn_id, 123);
            assert_eq!(read_header.lsn, 456);
            assert_eq!(read_header.entry_type, WalEntryType::Update as u8);
            assert_eq!(read_header.field_count, 3);
        }

        let read_header = WalEntryHeader::read_from_copy(&buf).unwrap();
        assert_eq!(read_header.txn_id, 123);
        assert_eq!(read_header.lsn, 456);
        assert_eq!(read_header.entry_type, WalEntryType::Update as u8);
        assert_eq!(read_header.field_count, 3);
    }

    #[test]
    fn test_wal_entry_crc_validation() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"key");

        let mut bytes = builder.build();

        assert!(WalEntryReader::from_bytes(&bytes).is_some());

        // Byte 20 lies inside the CRC-protected payload (the outer header is 16 bytes).
        assert!(bytes.len() > 20);
        bytes[20] ^= 0xFF;

        assert!(WalEntryReader::from_bytes(&bytes).is_none());
    }

    #[test]
    fn test_batch_writer_reader() {
        let mut batch = WalBatchWriter::new();

        for i in 0..10 {
            let mut entry = WalEntryBuilder::new(i, i * 10, WalEntryType::Insert);
            entry.with_key(format!("key_{}", i).as_bytes());
            entry.with_value(format!("value_{}", i).as_bytes());
            batch.add(entry);
        }

        assert_eq!(batch.len(), 10);

        let bytes = batch.build();
        let reader = WalBatchReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.entry_count(), 10);
        assert_eq!(reader.entries().len(), 10);

        // NOTE(review): this loop only checks the entries the iterator actually
        // yields; it does not assert that all 10 are produced.
        for (i, entry) in reader.entries().enumerate() {
            assert_eq!(entry.txn_id(), i as u64);
            assert_eq!(entry.key(), Some(format!("key_{}", i).as_bytes()));
        }
    }

    #[test]
    fn test_multiple_fields() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Update);
        builder.add_field(b"field_0");
        builder.add_field(b"field_1");
        builder.add_field(b"field_2");
        builder.add_field(b"field_3");

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.field_count(), 4);

        let fields: Vec<_> = reader.fields().collect();
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0], b"field_0");
        assert_eq!(fields[1], b"field_1");
        assert_eq!(fields[2], b"field_2");
        assert_eq!(fields[3], b"field_3");
    }

    #[test]
    fn test_empty_fields() {
        let builder = WalEntryBuilder::new(1, 1, WalEntryType::BeginTxn);

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.field_count(), 0);
        assert_eq!(reader.entry_type(), Some(WalEntryType::BeginTxn));
    }

    #[test]
    fn test_large_value() {
        let large_value = vec![0xAB; 1024 * 1024];
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"large_key").with_value(&large_value);

        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.value(), Some(large_value.as_slice()));
    }

    #[test]
    fn test_header_validation() {
        let header = ZeroCopyHeader::new(100, 0, 12345);
        assert!(header.validate());

        let mut bad_header = header;
        bad_header.magic = 0xDEADBEEF;
        assert!(!bad_header.validate());
    }

    #[test]
    fn test_entry_type_from_u8() {
        let all = [
            WalEntryType::Insert,
            WalEntryType::Update,
            WalEntryType::Delete,
            WalEntryType::BeginTxn,
            WalEntryType::CommitTxn,
            WalEntryType::AbortTxn,
            WalEntryType::Checkpoint,
        ];
        for kind in all {
            assert_eq!(WalEntryType::from_u8(kind as u8), Some(kind));
        }
        assert_eq!(WalEntryType::from_u8(0), None);
        assert_eq!(WalEntryType::from_u8(8), None);
    }

    #[test]
    fn test_short_buffers_rejected() {
        assert!(ZeroCopyHeader::read_from(&[0u8; HEADER_SIZE - 1]).is_none());
        assert!(WalEntryHeader::read_from_copy(&[0u8; WAL_ENTRY_HEADER_SIZE - 1]).is_none());
        assert!(WalEntryReader::from_bytes(&[]).is_none());
        assert!(WalBatchReader::from_bytes(&[0u8; 7]).is_none());
    }

    #[test]
    fn test_truncated_entry_rejected() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Delete);
        builder.with_key(b"key");
        let bytes = builder.build();

        assert!(WalEntryReader::from_bytes(&bytes[..bytes.len() - 1]).is_none());
    }

    #[test]
    fn test_get_field_out_of_range() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"k");
        let bytes = builder.build();
        let reader = WalEntryReader::from_bytes(&bytes).unwrap();

        assert_eq!(reader.key(), Some(b"k".as_slice()));
        assert_eq!(reader.value(), None);
        assert_eq!(reader.get_field(usize::MAX), None);
    }
}
909}