1use std::io::{self, Read, Write};
41use std::sync::atomic::{AtomicU64, Ordering};
42
/// Magic bytes opening every serialized block: ASCII `"CW"` followed by `0x01, 0x00`.
const COLUMNAR_WAL_MAGIC: [u8; 4] = *b"CW\x01\x00";

/// Number of entries a block accepts unless a different batch size is requested.
const DEFAULT_BATCH_SIZE: usize = 256;

/// Keys longer than this many bytes are truncated when a block is serialized.
const MAX_KEY_SIZE: usize = 256;
51
// NOTE(review): not referenced anywhere in this file yet; presumably meant as an upper bound
// on value sizes. `allow(dead_code)` silences the unused-constant warning until it is used.
#[allow(dead_code)]
const MAX_VALUE_SIZE: usize = 1 << 20;

/// Kind of operation a WAL entry records.
///
/// The explicit discriminants are the bytes written to a block's op lane, so they are part
/// of the on-disk format and must not be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalOpType {
    /// Entry carrying a key and a value.
    Put = 0,
    /// Entry carrying a key and an empty value.
    Delete = 1,
    /// Transaction-start marker (empty key and value).
    BeginTxn = 2,
    /// Transaction-commit marker (empty key and value).
    CommitTxn = 3,
    AbortTxn = 4,
    Checkpoint = 5,
}
77
78impl WalOpType {
79 fn from_u8(v: u8) -> Option<Self> {
80 match v {
81 0 => Some(Self::Put),
82 1 => Some(Self::Delete),
83 2 => Some(Self::BeginTxn),
84 3 => Some(Self::CommitTxn),
85 4 => Some(Self::AbortTxn),
86 5 => Some(Self::Checkpoint),
87 _ => None,
88 }
89 }
90}
91
92#[derive(Clone)]
94pub struct WalEntry {
95 pub op: WalOpType,
97 pub txn_id: u64,
99 pub timestamp: u64,
101 pub key: Vec<u8>,
103 pub value: Vec<u8>,
105}
106
107impl WalEntry {
108 pub fn put(txn_id: u64, timestamp: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
110 Self {
111 op: WalOpType::Put,
112 txn_id,
113 timestamp,
114 key,
115 value,
116 }
117 }
118
119 pub fn delete(txn_id: u64, timestamp: u64, key: Vec<u8>) -> Self {
121 Self {
122 op: WalOpType::Delete,
123 txn_id,
124 timestamp,
125 key,
126 value: Vec::new(),
127 }
128 }
129
130 pub fn begin_txn(txn_id: u64, timestamp: u64) -> Self {
132 Self {
133 op: WalOpType::BeginTxn,
134 txn_id,
135 timestamp,
136 key: Vec::new(),
137 value: Vec::new(),
138 }
139 }
140
141 pub fn commit_txn(txn_id: u64, timestamp: u64) -> Self {
143 Self {
144 op: WalOpType::CommitTxn,
145 txn_id,
146 timestamp,
147 key: Vec::new(),
148 value: Vec::new(),
149 }
150 }
151}
152
/// Fixed-size header at the start of every serialized block.
///
/// `repr(C, packed)` gives a padding-free layout of 4 + 1 + 2 + 1 + 9 * 4 = 44 bytes.
/// `ColumnarWalBlock::serialize` copies this struct's raw bytes into the output, so the
/// multi-byte header fields are stored in the host's native byte order, whereas the lanes
/// are written with `to_le_bytes`.
/// NOTE(review): blocks are therefore not portable across endianness — confirm that is acceptable.
///
/// The field order defines the on-disk layout; do not reorder fields.
#[derive(Clone, Copy)]
#[repr(C, packed)]
struct BlockHeader {
    /// Must equal `COLUMNAR_WAL_MAGIC`.
    magic: [u8; 4],
    /// Format version; `serialize` writes 1.
    version: u8,
    /// Number of entries in the block.
    entry_count: u16,
    /// Written as 0.
    _reserved: u8,
    /// Byte offset, from the start of the block, of the op lane (1 byte per entry).
    op_lane_offset: u32,
    /// Byte offset of the transaction-id lane (u64 LE per entry).
    txn_lane_offset: u32,
    /// Byte offset of the timestamp lane (u64 LE per entry).
    ts_lane_offset: u32,
    /// Byte offset of the key-length lane (u16 LE per entry).
    key_len_lane_offset: u32,
    /// Byte offset of the concatenated key bytes.
    key_data_lane_offset: u32,
    /// Byte offset of the value-length lane (u32 LE per entry).
    value_len_lane_offset: u32,
    /// Byte offset of the concatenated value bytes.
    value_data_lane_offset: u32,
    /// Total size of the block in bytes, header included.
    block_size: u32,
    /// `crc32_simple` of the bytes after the header (`block[header_size..block_size]`).
    checksum: u32,
}
188
189pub struct ColumnarWalBlock {
191 entries: Vec<WalEntry>,
193 batch_size: usize,
195}
196
197impl ColumnarWalBlock {
198 pub fn new() -> Self {
200 Self::with_batch_size(DEFAULT_BATCH_SIZE)
201 }
202
203 pub fn with_batch_size(batch_size: usize) -> Self {
205 Self {
206 entries: Vec::with_capacity(batch_size),
207 batch_size,
208 }
209 }
210
211 pub fn add_entry(&mut self, entry: WalEntry) -> bool {
213 if self.entries.len() >= self.batch_size {
214 return false;
215 }
216 self.entries.push(entry);
217 true
218 }
219
220 pub fn is_full(&self) -> bool {
222 self.entries.len() >= self.batch_size
223 }
224
225 pub fn len(&self) -> usize {
227 self.entries.len()
228 }
229
230 pub fn is_empty(&self) -> bool {
232 self.entries.is_empty()
233 }
234
235 pub fn entries(&self) -> &[WalEntry] {
237 &self.entries
238 }
239
240 pub fn serialize(&self) -> Vec<u8> {
242 let entry_count = self.entries.len();
243 if entry_count == 0 {
244 return Vec::new();
245 }
246
247 let op_lane_size = entry_count;
249 let txn_lane_size = entry_count * 8;
250 let ts_lane_size = entry_count * 8; let key_len_size = entry_count * 2; let key_data_size: usize = self.entries.iter().map(|e| e.key.len()).sum();
253 let value_len_size = entry_count * 4; let value_data_size: usize = self.entries.iter().map(|e| e.value.len()).sum();
255
256 let header_size = std::mem::size_of::<BlockHeader>();
257 let total_size = header_size
258 + op_lane_size
259 + txn_lane_size
260 + ts_lane_size
261 + key_len_size
262 + key_data_size
263 + value_len_size
264 + value_data_size;
265
266 let mut buffer = vec![0u8; total_size];
267 let mut offset = header_size;
268
269 let op_lane_offset = offset as u32;
271 for entry in &self.entries {
272 buffer[offset] = entry.op as u8;
273 offset += 1;
274 }
275
276 let txn_lane_offset = offset as u32;
278 for entry in &self.entries {
279 buffer[offset..offset + 8].copy_from_slice(&entry.txn_id.to_le_bytes());
280 offset += 8;
281 }
282
283 let ts_lane_offset = offset as u32;
285 for entry in &self.entries {
286 buffer[offset..offset + 8].copy_from_slice(&entry.timestamp.to_le_bytes());
287 offset += 8;
288 }
289
290 let key_len_lane_offset = offset as u32;
292 for entry in &self.entries {
293 let len = entry.key.len().min(MAX_KEY_SIZE) as u16;
294 buffer[offset..offset + 2].copy_from_slice(&len.to_le_bytes());
295 offset += 2;
296 }
297
298 let key_data_lane_offset = offset as u32;
300 for entry in &self.entries {
301 let len = entry.key.len().min(MAX_KEY_SIZE);
302 buffer[offset..offset + len].copy_from_slice(&entry.key[..len]);
303 offset += len;
304 }
305
306 let value_len_lane_offset = offset as u32;
308 for entry in &self.entries {
309 let len = entry.value.len() as u32;
310 buffer[offset..offset + 4].copy_from_slice(&len.to_le_bytes());
311 offset += 4;
312 }
313
314 let value_data_lane_offset = offset as u32;
316 for entry in &self.entries {
317 buffer[offset..offset + entry.value.len()].copy_from_slice(&entry.value);
318 offset += entry.value.len();
319 }
320
321 let checksum = crc32_simple(&buffer[header_size..offset]);
323
324 let header = BlockHeader {
326 magic: COLUMNAR_WAL_MAGIC,
327 version: 1,
328 entry_count: entry_count as u16,
329 _reserved: 0,
330 op_lane_offset,
331 txn_lane_offset,
332 ts_lane_offset,
333 key_len_lane_offset,
334 key_data_lane_offset,
335 value_len_lane_offset,
336 value_data_lane_offset,
337 block_size: offset as u32,
338 checksum,
339 };
340
341 let header_bytes = unsafe {
343 std::slice::from_raw_parts(
344 &header as *const BlockHeader as *const u8,
345 std::mem::size_of::<BlockHeader>(),
346 )
347 };
348 buffer[..header_size].copy_from_slice(header_bytes);
349
350 buffer.truncate(offset);
351 buffer
352 }
353
354 pub fn deserialize(data: &[u8]) -> io::Result<Self> {
356 let header_size = std::mem::size_of::<BlockHeader>();
357 if data.len() < header_size {
358 return Err(io::Error::new(io::ErrorKind::InvalidData, "buffer too small"));
359 }
360
361 let header = unsafe { &*(data.as_ptr() as *const BlockHeader) };
363
364 if header.magic != COLUMNAR_WAL_MAGIC {
366 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
367 }
368
369 let expected_checksum = header.checksum;
371 let actual_checksum = crc32_simple(&data[header_size..header.block_size as usize]);
372 if expected_checksum != actual_checksum {
373 return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
374 }
375
376 let entry_count = header.entry_count as usize;
377 let mut entries = Vec::with_capacity(entry_count);
378
379 let op_lane = &data[header.op_lane_offset as usize..];
381 let txn_lane = &data[header.txn_lane_offset as usize..];
382 let ts_lane = &data[header.ts_lane_offset as usize..];
383 let key_len_lane = &data[header.key_len_lane_offset as usize..];
384 let key_data_lane = &data[header.key_data_lane_offset as usize..];
385 let value_len_lane = &data[header.value_len_lane_offset as usize..];
386 let value_data_lane = &data[header.value_data_lane_offset as usize..];
387
388 let mut key_offset = 0usize;
389 let mut value_offset = 0usize;
390
391 for i in 0..entry_count {
392 let op = WalOpType::from_u8(op_lane[i])
393 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid op"))?;
394
395 let txn_id = u64::from_le_bytes(txn_lane[i * 8..i * 8 + 8].try_into().unwrap());
396 let timestamp = u64::from_le_bytes(ts_lane[i * 8..i * 8 + 8].try_into().unwrap());
397 let key_len = u16::from_le_bytes(key_len_lane[i * 2..i * 2 + 2].try_into().unwrap()) as usize;
398 let value_len = u32::from_le_bytes(value_len_lane[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
399
400 let key = key_data_lane[key_offset..key_offset + key_len].to_vec();
401 key_offset += key_len;
402
403 let value = value_data_lane[value_offset..value_offset + value_len].to_vec();
404 value_offset += value_len;
405
406 entries.push(WalEntry {
407 op,
408 txn_id,
409 timestamp,
410 key,
411 value,
412 });
413 }
414
415 Ok(Self {
416 entries,
417 batch_size: DEFAULT_BATCH_SIZE,
418 })
419 }
420
421 pub fn clear(&mut self) {
423 self.entries.clear();
424 }
425}
426
427impl Default for ColumnarWalBlock {
428 fn default() -> Self {
429 Self::new()
430 }
431}
432
/// Reconstructs absolute timestamps as a running (wrapping) sum of deltas that starts
/// from `base_ts`.
pub struct SimdTimestampDecoder {
    /// Value the first delta is added to.
    base_ts: u64,
}
442
443impl SimdTimestampDecoder {
444 pub fn new(base_ts: u64) -> Self {
446 Self { base_ts }
447 }
448
449 #[cfg(target_arch = "x86_64")]
454 pub fn decode_deltas_avx2(&self, deltas: &[u64], output: &mut [u64]) {
455 #[cfg(target_arch = "x86_64")]
456 {
457 if is_x86_feature_detected!("avx2") && deltas.len() >= 4 {
458 unsafe { self.decode_deltas_avx2_impl(deltas, output) }
459 return;
460 }
461 }
462 self.decode_deltas_scalar(deltas, output);
463 }
464
465 #[cfg(target_arch = "x86_64")]
467 #[target_feature(enable = "avx2")]
468 unsafe fn decode_deltas_avx2_impl(&self, deltas: &[u64], output: &mut [u64]) {
469 use std::arch::x86_64::*;
470
471 let n = deltas.len();
472 let mut current = self.base_ts;
473 let mut i = 0;
474
475 while i + 4 <= n {
477 let d = _mm256_loadu_si256(deltas[i..].as_ptr() as *const __m256i);
479
480 for j in 0..4 {
485 current = current.wrapping_add(deltas[i + j]);
486 output[i + j] = current;
487 }
488
489 i += 4;
490 }
491
492 while i < n {
494 current = current.wrapping_add(deltas[i]);
495 output[i] = current;
496 i += 1;
497 }
498 }
499
500 pub fn decode_deltas_scalar(&self, deltas: &[u64], output: &mut [u64]) {
502 let mut current = self.base_ts;
503 for (i, &delta) in deltas.iter().enumerate() {
504 current = current.wrapping_add(delta);
505 output[i] = current;
506 }
507 }
508
509 #[cfg(not(target_arch = "x86_64"))]
511 pub fn decode_deltas_avx2(&self, deltas: &[u64], output: &mut [u64]) {
512 self.decode_deltas_scalar(deltas, output);
513 }
514}
515
/// Stateless namespace for the key prefix-matching helper `match_prefix_avx2`.
pub struct SimdKeyComparator;
518
519impl SimdKeyComparator {
520 #[cfg(target_arch = "x86_64")]
524 pub fn match_prefix_avx2(
525 key_lens: &[u16],
526 key_data: &[u8],
527 key_offsets: &[u32],
528 prefix: &[u8],
529 ) -> Vec<bool> {
530 let mut results = vec![false; key_lens.len()];
531 let prefix_len = prefix.len();
532
533 if prefix_len == 0 {
534 results.fill(true);
535 return results;
536 }
537
538 for (i, &len) in key_lens.iter().enumerate() {
539 if (len as usize) >= prefix_len {
540 let offset = key_offsets[i] as usize;
541 let key_slice = &key_data[offset..offset + prefix_len];
542 results[i] = key_slice == prefix;
543 }
544 }
545
546 results
547 }
548
549 #[cfg(not(target_arch = "x86_64"))]
551 pub fn match_prefix_avx2(
552 key_lens: &[u16],
553 key_data: &[u8],
554 key_offsets: &[u32],
555 prefix: &[u8],
556 ) -> Vec<bool> {
557 let mut results = vec![false; key_lens.len()];
558 let prefix_len = prefix.len();
559
560 if prefix_len == 0 {
561 results.fill(true);
562 return results;
563 }
564
565 for (i, &len) in key_lens.iter().enumerate() {
566 if (len as usize) >= prefix_len {
567 let offset = key_offsets[i] as usize;
568 let key_slice = &key_data[offset..offset + prefix_len];
569 results[i] = key_slice == prefix;
570 }
571 }
572
573 results
574 }
575}
576
577pub struct ColumnarWalWriter<W: Write> {
583 writer: W,
585 current_block: ColumnarWalBlock,
587 sequence: AtomicU64,
589 bytes_written: AtomicU64,
591 blocks_written: AtomicU64,
593}
594
595impl<W: Write> ColumnarWalWriter<W> {
596 pub fn new(writer: W) -> Self {
598 Self::with_batch_size(writer, DEFAULT_BATCH_SIZE)
599 }
600
601 pub fn with_batch_size(writer: W, batch_size: usize) -> Self {
603 Self {
604 writer,
605 current_block: ColumnarWalBlock::with_batch_size(batch_size),
606 sequence: AtomicU64::new(0),
607 bytes_written: AtomicU64::new(0),
608 blocks_written: AtomicU64::new(0),
609 }
610 }
611
612 pub fn write_entry(&mut self, entry: WalEntry) -> io::Result<()> {
614 if !self.current_block.add_entry(entry.clone()) {
615 self.flush_block()?;
617 if !self.current_block.add_entry(entry) {
619 return Err(io::Error::new(io::ErrorKind::InvalidData, "entry too large for block"));
620 }
621 }
622 Ok(())
623 }
624
625 pub fn flush_block(&mut self) -> io::Result<()> {
627 if self.current_block.is_empty() {
628 return Ok(());
629 }
630
631 let data = self.current_block.serialize();
632 self.writer.write_all(&data)?;
633
634 self.bytes_written.fetch_add(data.len() as u64, Ordering::Relaxed);
635 self.blocks_written.fetch_add(1, Ordering::Relaxed);
636 self.sequence.fetch_add(1, Ordering::Relaxed);
637
638 self.current_block.clear();
639 Ok(())
640 }
641
642 pub fn flush(&mut self) -> io::Result<()> {
644 self.flush_block()?;
645 self.writer.flush()
646 }
647
648 pub fn stats(&self) -> WalWriterStats {
650 WalWriterStats {
651 bytes_written: self.bytes_written.load(Ordering::Relaxed),
652 blocks_written: self.blocks_written.load(Ordering::Relaxed),
653 current_block_entries: self.current_block.len(),
654 }
655 }
656}
657
/// Snapshot of a `ColumnarWalWriter`'s counters, as returned by `ColumnarWalWriter::stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalWriterStats {
    /// Total bytes of serialized blocks written to the underlying writer.
    pub bytes_written: u64,
    /// Number of blocks written to the underlying writer.
    pub blocks_written: u64,
    /// Entries buffered in the current, not-yet-flushed block.
    pub current_block_entries: usize,
}
668
669pub struct ColumnarWalReader<R: Read> {
675 reader: R,
677 current_block: Option<ColumnarWalBlock>,
679 current_pos: usize,
681}
682
683impl<R: Read> ColumnarWalReader<R> {
684 pub fn new(reader: R) -> Self {
686 Self {
687 reader,
688 current_block: None,
689 current_pos: 0,
690 }
691 }
692
693 pub fn next_entry(&mut self) -> io::Result<Option<WalEntry>> {
695 if let Some(ref block) = self.current_block {
697 if self.current_pos < block.len() {
698 let entry = block.entries()[self.current_pos].clone();
699 self.current_pos += 1;
700 return Ok(Some(entry));
701 }
702 }
703
704 match self.read_block()? {
706 Some(block) => {
707 if block.is_empty() {
708 return Ok(None);
709 }
710 let entry = block.entries()[0].clone();
711 self.current_block = Some(block);
712 self.current_pos = 1;
713 Ok(Some(entry))
714 }
715 None => Ok(None),
716 }
717 }
718
719 fn read_block(&mut self) -> io::Result<Option<ColumnarWalBlock>> {
721 let header_size = std::mem::size_of::<BlockHeader>();
722 let mut header_buf = vec![0u8; header_size];
723
724 match self.reader.read_exact(&mut header_buf) {
725 Ok(_) => {}
726 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
727 Err(e) => return Err(e),
728 }
729
730 let header = unsafe { &*(header_buf.as_ptr() as *const BlockHeader) };
732
733 if header.magic != COLUMNAR_WAL_MAGIC {
734 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
735 }
736
737 let _remaining = header.block_size as usize - header_size;
738 let mut block_data = header_buf;
739 block_data.resize(header.block_size as usize, 0);
740 self.reader.read_exact(&mut block_data[header_size..])?;
741
742 ColumnarWalBlock::deserialize(&block_data).map(Some)
743 }
744
745 pub fn read_all(&mut self) -> io::Result<Vec<WalEntry>> {
747 let mut entries = Vec::new();
748 while let Some(entry) = self.next_entry()? {
749 entries.push(entry);
750 }
751 Ok(entries)
752 }
753}
754
755fn crc32_simple(data: &[u8]) -> u32 {
761 let mut crc = 0xFFFFFFFFu32;
762 for byte in data {
763 let index = ((crc ^ (*byte as u32)) & 0xFF) as usize;
764 crc = CRC32_TABLE[index] ^ (crc >> 8);
765 }
766 !crc
767}
768
/// Byte-indexed lookup table for the reflected CRC-32 polynomial 0xEDB88320, built at
/// compile time. Entry `n` is `n` run through eight shift/XOR rounds.
static CRC32_TABLE: [u32; 256] = build_crc32_table();

/// Compile-time generator for `CRC32_TABLE`.
const fn build_crc32_table() -> [u32; 256] {
    let mut table = [0u32; 256];
    let mut n = 0usize;
    while n < 256 {
        let mut value = n as u32;
        let mut round = 0;
        while round < 8 {
            // All-ones mask when the low bit is set, zero otherwise (branch-free select).
            let mask = 0u32.wrapping_sub(value & 1);
            value = (value >> 1) ^ (0xEDB8_8320 & mask);
            round += 1;
        }
        table[n] = value;
        n += 1;
    }
    table
}
789
// Unit tests for the columnar WAL: entry construction, block (de)serialization, writer/reader
// round trips, the timestamp and key helpers, writer statistics, and the CRC-32 routine.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // `WalEntry::put` stores its arguments verbatim under op `Put`.
    #[test]
    fn test_wal_entry_creation() {
        let entry = WalEntry::put(1, 100, b"key".to_vec(), b"value".to_vec());
        assert_eq!(entry.op, WalOpType::Put);
        assert_eq!(entry.txn_id, 1);
        assert_eq!(entry.timestamp, 100);
        assert_eq!(entry.key, b"key");
        assert_eq!(entry.value, b"value");
    }

    // Ten `Put` entries survive `serialize` -> `deserialize` with every field intact.
    #[test]
    fn test_block_serialize_deserialize() {
        let mut block = ColumnarWalBlock::new();

        for i in 0..10 {
            let entry = WalEntry::put(
                i,
                100 + i,
                format!("key{}", i).into_bytes(),
                format!("value{}", i).into_bytes(),
            );
            assert!(block.add_entry(entry));
        }

        let data = block.serialize();
        let decoded = ColumnarWalBlock::deserialize(&data).unwrap();

        assert_eq!(decoded.len(), 10);
        for (i, entry) in decoded.entries().iter().enumerate() {
            assert_eq!(entry.txn_id, i as u64);
            assert_eq!(entry.timestamp, 100 + i as u64);
            assert_eq!(entry.key, format!("key{}", i).into_bytes());
            assert_eq!(entry.value, format!("value{}", i).into_bytes());
        }
    }

    // A block accepts exactly `batch_size` entries and then rejects the next one.
    #[test]
    fn test_block_full() {
        let mut block = ColumnarWalBlock::with_batch_size(5);

        for i in 0..5 {
            let entry = WalEntry::put(i, i * 10, vec![i as u8], vec![]);
            assert!(block.add_entry(entry));
        }

        assert!(block.is_full());

        // A sixth entry does not fit.
        let entry = WalEntry::put(5, 50, vec![5], vec![]);
        assert!(!block.add_entry(entry));
    }

    // 25 entries with a batch size of 10 are written as three blocks (10 + 10 + 5); the
    // reader must return all of them, in order.
    #[test]
    fn test_writer_reader_roundtrip() {
        let mut buffer = Vec::new();

        {
            // Scoped so the writer's mutable borrow of `buffer` ends before reading.
            let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(&mut buffer), 10);

            for i in 0..25 {
                let entry = WalEntry::put(
                    i,
                    1000 + i,
                    format!("key_{}", i).into_bytes(),
                    format!("value_{}", i).into_bytes(),
                );
                writer.write_entry(entry).unwrap();
            }

            // Writes the final, partially filled block.
            writer.flush().unwrap();
        }

        let mut reader = ColumnarWalReader::new(Cursor::new(&buffer));
        let entries = reader.read_all().unwrap();

        assert_eq!(entries.len(), 25);
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.txn_id, i as u64);
            assert_eq!(entry.timestamp, 1000 + i as u64);
            assert_eq!(entry.key, format!("key_{}", i).into_bytes());
            assert_eq!(entry.value, format!("value_{}", i).into_bytes());
        }
    }

    // Scalar decoding yields the running sum of the deltas, starting from the base of 1000.
    #[test]
    fn test_timestamp_decoder() {
        let decoder = SimdTimestampDecoder::new(1000);
        let deltas = vec![10, 20, 30, 40, 50, 60, 70, 80];
        let mut output = vec![0u64; 8];

        decoder.decode_deltas_scalar(&deltas, &mut output);

        assert_eq!(output, vec![1010, 1030, 1060, 1100, 1150, 1210, 1280, 1360]);
    }

    // Keys are packed back to back: "key1", "key12", "key3", "key123", "key4".
    #[test]
    fn test_key_comparator() {
        let key_lens = vec![4u16, 5, 4, 6, 4];
        let key_data = b"key1key12key3key123key4";
        let key_offsets = vec![0u32, 4, 9, 13, 19];

        let results = SimdKeyComparator::match_prefix_avx2(
            &key_lens,
            key_data,
            &key_offsets,
            b"key",
        );

        // Every key starts with "key".
        assert!(results.iter().all(|&r| r));
        let results = SimdKeyComparator::match_prefix_avx2(
            &key_lens,
            key_data,
            &key_offsets,
            b"key1",
        );

        // Only "key1", "key12" and "key123" start with "key1".
        assert_eq!(results, vec![true, true, false, true, false]);
    }

    // Entries stay buffered until `flush`, which writes one block and empties the buffer.
    #[test]
    fn test_writer_stats() {
        let buffer = Vec::new();
        let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(buffer), 10);

        for i in 0..5 {
            writer.write_entry(WalEntry::put(i, i, vec![0], vec![0])).unwrap();
        }

        let stats = writer.stats();
        assert_eq!(stats.current_block_entries, 5);
        assert_eq!(stats.blocks_written, 0);

        writer.flush().unwrap();

        let stats = writer.stats();
        assert_eq!(stats.current_block_entries, 0);
        assert_eq!(stats.blocks_written, 1);
    }

    // Standard CRC-32 check value for "hello world".
    #[test]
    fn test_crc32() {
        let data = b"hello world";
        let crc = crc32_simple(data);
        assert_eq!(crc, 0x0D4A1185);
    }
}