1use std::io::{self, Read, Write};
41use std::sync::atomic::{AtomicU64, Ordering};
42
/// Magic bytes that open every serialized columnar WAL block: ASCII `CW`
/// followed by the bytes `0x01 0x00`.
const COLUMNAR_WAL_MAGIC: [u8; 4] = *b"CW\x01\x00";

/// Entry capacity used for a block when the caller does not pick one.
const DEFAULT_BATCH_SIZE: usize = 256;

/// Largest key length, in bytes, that block serialization writes out; key
/// lengths are clamped to this value when a block is serialized.
const MAX_KEY_SIZE: usize = 256;
51
/// Upper bound on a single value's size in bytes (1 MiB).
// Not referenced by the code visible in this file, hence the lint exemption.
#[allow(dead_code)]
const MAX_VALUE_SIZE: usize = 1 << 20;

/// Kind of operation carried by a WAL entry.
///
/// The explicit discriminant is the byte stored for the entry in the op lane
/// of a serialized block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalOpType {
    /// Key/value write.
    Put = 0,
    /// Key removal.
    Delete = 1,
    /// Transaction begin marker.
    BeginTxn = 2,
    /// Transaction commit marker.
    CommitTxn = 3,
    /// Transaction abort marker.
    AbortTxn = 4,
    /// Checkpoint marker.
    Checkpoint = 5,
}

impl WalOpType {
    /// Every variant, placed so that its index equals its discriminant.
    const BY_DISCRIMINANT: [Self; 6] = [
        Self::Put,
        Self::Delete,
        Self::BeginTxn,
        Self::CommitTxn,
        Self::AbortTxn,
        Self::Checkpoint,
    ];

    /// Maps a raw op byte back to its variant; `None` for an unknown byte.
    fn from_u8(v: u8) -> Option<Self> {
        Self::BY_DISCRIMINANT.get(usize::from(v)).copied()
    }
}
91
92#[derive(Clone)]
94pub struct WalEntry {
95 pub op: WalOpType,
97 pub txn_id: u64,
99 pub timestamp: u64,
101 pub key: Vec<u8>,
103 pub value: Vec<u8>,
105}
106
107impl WalEntry {
108 pub fn put(txn_id: u64, timestamp: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
110 Self {
111 op: WalOpType::Put,
112 txn_id,
113 timestamp,
114 key,
115 value,
116 }
117 }
118
119 pub fn delete(txn_id: u64, timestamp: u64, key: Vec<u8>) -> Self {
121 Self {
122 op: WalOpType::Delete,
123 txn_id,
124 timestamp,
125 key,
126 value: Vec::new(),
127 }
128 }
129
130 pub fn begin_txn(txn_id: u64, timestamp: u64) -> Self {
132 Self {
133 op: WalOpType::BeginTxn,
134 txn_id,
135 timestamp,
136 key: Vec::new(),
137 value: Vec::new(),
138 }
139 }
140
141 pub fn commit_txn(txn_id: u64, timestamp: u64) -> Self {
143 Self {
144 op: WalOpType::CommitTxn,
145 txn_id,
146 timestamp,
147 key: Vec::new(),
148 value: Vec::new(),
149 }
150 }
151}
152
/// Fixed-size header placed at the start of every serialized block.
///
/// `repr(C, packed)` removes all padding, so the struct is exactly 44 bytes
/// (4 + 1 + 2 + 1 + 9 * 4) and its in-memory bytes are copied verbatim to and
/// from the serialized block. Every `*_offset` field is a byte offset measured
/// from the start of the block (header included).
// NOTE(review): the header is copied as raw memory, so its multi-byte fields
// use the host's byte order, while the lanes are written little-endian.
#[derive(Clone, Copy)]
#[repr(C, packed)]
struct BlockHeader {
    /// Format marker; compared against `COLUMNAR_WAL_MAGIC` when reading.
    magic: [u8; 4],
    /// Format version; the serializer writes `1`.
    version: u8,
    /// Number of entries stored in the block.
    entry_count: u16,
    /// Unused; the serializer writes `0`.
    _reserved: u8,
    /// Start of the op lane (one byte per entry).
    op_lane_offset: u32,
    /// Start of the transaction-id lane (8 bytes per entry).
    txn_lane_offset: u32,
    /// Start of the timestamp lane (8 bytes per entry).
    ts_lane_offset: u32,
    /// Start of the key-length lane (2 bytes per entry).
    key_len_lane_offset: u32,
    /// Start of the concatenated key bytes.
    key_data_lane_offset: u32,
    /// Start of the value-length lane (4 bytes per entry).
    value_len_lane_offset: u32,
    /// Start of the concatenated value bytes.
    value_data_lane_offset: u32,
    /// Total size of the block in bytes, header included.
    block_size: u32,
    /// CRC-32 of the bytes from the end of the header up to `block_size`.
    checksum: u32,
}
188
189pub struct ColumnarWalBlock {
191 entries: Vec<WalEntry>,
193 batch_size: usize,
195}
196
197impl ColumnarWalBlock {
198 pub fn new() -> Self {
200 Self::with_batch_size(DEFAULT_BATCH_SIZE)
201 }
202
203 pub fn with_batch_size(batch_size: usize) -> Self {
205 Self {
206 entries: Vec::with_capacity(batch_size),
207 batch_size,
208 }
209 }
210
211 pub fn add_entry(&mut self, entry: WalEntry) -> bool {
213 if self.entries.len() >= self.batch_size {
214 return false;
215 }
216 self.entries.push(entry);
217 true
218 }
219
220 pub fn is_full(&self) -> bool {
222 self.entries.len() >= self.batch_size
223 }
224
225 pub fn len(&self) -> usize {
227 self.entries.len()
228 }
229
230 pub fn is_empty(&self) -> bool {
232 self.entries.is_empty()
233 }
234
235 pub fn entries(&self) -> &[WalEntry] {
237 &self.entries
238 }
239
240 pub fn serialize(&self) -> Vec<u8> {
242 let entry_count = self.entries.len();
243 if entry_count == 0 {
244 return Vec::new();
245 }
246
247 let op_lane_size = entry_count;
249 let txn_lane_size = entry_count * 8;
250 let ts_lane_size = entry_count * 8; let key_len_size = entry_count * 2; let key_data_size: usize = self.entries.iter().map(|e| e.key.len()).sum();
253 let value_len_size = entry_count * 4; let value_data_size: usize = self.entries.iter().map(|e| e.value.len()).sum();
255
256 let header_size = std::mem::size_of::<BlockHeader>();
257 let total_size = header_size
258 + op_lane_size
259 + txn_lane_size
260 + ts_lane_size
261 + key_len_size
262 + key_data_size
263 + value_len_size
264 + value_data_size;
265
266 let mut buffer = vec![0u8; total_size];
267 let mut offset = header_size;
268
269 let op_lane_offset = offset as u32;
271 for entry in &self.entries {
272 buffer[offset] = entry.op as u8;
273 offset += 1;
274 }
275
276 let txn_lane_offset = offset as u32;
278 for entry in &self.entries {
279 buffer[offset..offset + 8].copy_from_slice(&entry.txn_id.to_le_bytes());
280 offset += 8;
281 }
282
283 let ts_lane_offset = offset as u32;
285 for entry in &self.entries {
286 buffer[offset..offset + 8].copy_from_slice(&entry.timestamp.to_le_bytes());
287 offset += 8;
288 }
289
290 let key_len_lane_offset = offset as u32;
292 for entry in &self.entries {
293 let len = entry.key.len().min(MAX_KEY_SIZE) as u16;
294 buffer[offset..offset + 2].copy_from_slice(&len.to_le_bytes());
295 offset += 2;
296 }
297
298 let key_data_lane_offset = offset as u32;
300 for entry in &self.entries {
301 let len = entry.key.len().min(MAX_KEY_SIZE);
302 buffer[offset..offset + len].copy_from_slice(&entry.key[..len]);
303 offset += len;
304 }
305
306 let value_len_lane_offset = offset as u32;
308 for entry in &self.entries {
309 let len = entry.value.len() as u32;
310 buffer[offset..offset + 4].copy_from_slice(&len.to_le_bytes());
311 offset += 4;
312 }
313
314 let value_data_lane_offset = offset as u32;
316 for entry in &self.entries {
317 buffer[offset..offset + entry.value.len()].copy_from_slice(&entry.value);
318 offset += entry.value.len();
319 }
320
321 let checksum = crc32_simple(&buffer[header_size..offset]);
323
324 let header = BlockHeader {
326 magic: COLUMNAR_WAL_MAGIC,
327 version: 1,
328 entry_count: entry_count as u16,
329 _reserved: 0,
330 op_lane_offset,
331 txn_lane_offset,
332 ts_lane_offset,
333 key_len_lane_offset,
334 key_data_lane_offset,
335 value_len_lane_offset,
336 value_data_lane_offset,
337 block_size: offset as u32,
338 checksum,
339 };
340
341 let header_bytes = unsafe {
343 std::slice::from_raw_parts(
344 &header as *const BlockHeader as *const u8,
345 std::mem::size_of::<BlockHeader>(),
346 )
347 };
348 buffer[..header_size].copy_from_slice(header_bytes);
349
350 buffer.truncate(offset);
351 buffer
352 }
353
354 pub fn deserialize(data: &[u8]) -> io::Result<Self> {
356 let header_size = std::mem::size_of::<BlockHeader>();
357 if data.len() < header_size {
358 return Err(io::Error::new(io::ErrorKind::InvalidData, "buffer too small"));
359 }
360
361 let header = unsafe { &*(data.as_ptr() as *const BlockHeader) };
363
364 if header.magic != COLUMNAR_WAL_MAGIC {
366 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
367 }
368
369 let expected_checksum = header.checksum;
371 let actual_checksum = crc32_simple(&data[header_size..header.block_size as usize]);
372 if expected_checksum != actual_checksum {
373 return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
374 }
375
376 let entry_count = header.entry_count as usize;
377 let mut entries = Vec::with_capacity(entry_count);
378
379 let op_lane = &data[header.op_lane_offset as usize..];
381 let txn_lane = &data[header.txn_lane_offset as usize..];
382 let ts_lane = &data[header.ts_lane_offset as usize..];
383 let key_len_lane = &data[header.key_len_lane_offset as usize..];
384 let key_data_lane = &data[header.key_data_lane_offset as usize..];
385 let value_len_lane = &data[header.value_len_lane_offset as usize..];
386 let value_data_lane = &data[header.value_data_lane_offset as usize..];
387
388 let mut key_offset = 0usize;
389 let mut value_offset = 0usize;
390
391 for i in 0..entry_count {
392 let op = WalOpType::from_u8(op_lane[i])
393 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid op"))?;
394
395 let txn_id = u64::from_le_bytes(txn_lane[i * 8..i * 8 + 8].try_into().unwrap());
396 let timestamp = u64::from_le_bytes(ts_lane[i * 8..i * 8 + 8].try_into().unwrap());
397 let key_len = u16::from_le_bytes(key_len_lane[i * 2..i * 2 + 2].try_into().unwrap()) as usize;
398 let value_len = u32::from_le_bytes(value_len_lane[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
399
400 let key = key_data_lane[key_offset..key_offset + key_len].to_vec();
401 key_offset += key_len;
402
403 let value = value_data_lane[value_offset..value_offset + value_len].to_vec();
404 value_offset += value_len;
405
406 entries.push(WalEntry {
407 op,
408 txn_id,
409 timestamp,
410 key,
411 value,
412 });
413 }
414
415 Ok(Self {
416 entries,
417 batch_size: DEFAULT_BATCH_SIZE,
418 })
419 }
420
421 pub fn clear(&mut self) {
423 self.entries.clear();
424 }
425}
426
427impl Default for ColumnarWalBlock {
428 fn default() -> Self {
429 Self::new()
430 }
431}
432
/// Rebuilds absolute timestamps from a base value and a run of deltas.
pub struct SimdTimestampDecoder {
    /// Value the first delta is added to.
    base_ts: u64,
}

impl SimdTimestampDecoder {
    /// Creates a decoder whose running sum starts at `base_ts`.
    pub fn new(base_ts: u64) -> Self {
        Self { base_ts }
    }

    /// Decodes `deltas` into `output`; results are identical to
    /// [`decode_deltas_scalar`](Self::decode_deltas_scalar) on every target.
    ///
    /// Despite the name there is no dedicated vector kernel: each output
    /// depends on the previous one, and the former x86_64 path only issued an
    /// unused 256-bit load before running the same scalar loop. The method is
    /// kept so existing callers continue to compile.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`.
    pub fn decode_deltas_avx2(&self, deltas: &[u64], output: &mut [u64]) {
        self.decode_deltas_scalar(deltas, output);
    }

    /// Writes the running (wrapping) sum `base_ts + deltas[0] + ... +
    /// deltas[i]` to `output[i]` for every delta.
    ///
    /// Slots of `output` beyond `deltas.len()` are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`. The check happens before
    /// anything is written, so a failed call leaves `output` unmodified.
    pub fn decode_deltas_scalar(&self, deltas: &[u64], output: &mut [u64]) {
        assert!(
            output.len() >= deltas.len(),
            "output buffer ({} slots) is shorter than the delta run ({} deltas)",
            output.len(),
            deltas.len()
        );

        let mut current = self.base_ts;
        for (slot, &delta) in output.iter_mut().zip(deltas) {
            current = current.wrapping_add(delta);
            *slot = current;
        }
    }
}
516
/// Prefix matching over keys stored in columnar form (a length lane, an
/// offset lane and one concatenated data buffer).
pub struct SimdKeyComparator;

impl SimdKeyComparator {
    /// Reports, for every key, whether it starts with `prefix`.
    ///
    /// Key `i` has length `key_lens[i]` and begins at byte `key_offsets[i]`
    /// of `key_data`. The result has one element per entry of `key_lens`.
    /// An empty `prefix` matches every key.
    ///
    /// The implementation is a plain slice comparison on every target; the
    /// two former `cfg` variants were identical.
    ///
    /// Inconsistent inputs are reported as "no match" instead of panicking:
    /// a key whose offset is missing from `key_offsets`, or whose first
    /// `prefix.len()` bytes are not fully inside `key_data`, yields `false`.
    pub fn match_prefix_avx2(
        key_lens: &[u16],
        key_data: &[u8],
        key_offsets: &[u32],
        prefix: &[u8],
    ) -> Vec<bool> {
        if prefix.is_empty() {
            return vec![true; key_lens.len()];
        }

        key_lens
            .iter()
            .enumerate()
            .map(|(index, &len)| {
                // A key shorter than the prefix can never match.
                if usize::from(len) < prefix.len() {
                    return false;
                }
                key_offsets
                    .get(index)
                    .and_then(|&offset| usize::try_from(offset).ok())
                    .and_then(|start| Some(start..start.checked_add(prefix.len())?))
                    .and_then(|window| key_data.get(window))
                    .map_or(false, |candidate| candidate == prefix)
            })
            .collect()
    }
}
577
578pub struct ColumnarWalWriter<W: Write> {
584 writer: W,
586 current_block: ColumnarWalBlock,
588 sequence: AtomicU64,
590 bytes_written: AtomicU64,
592 blocks_written: AtomicU64,
594}
595
596impl<W: Write> ColumnarWalWriter<W> {
597 pub fn new(writer: W) -> Self {
599 Self::with_batch_size(writer, DEFAULT_BATCH_SIZE)
600 }
601
602 pub fn with_batch_size(writer: W, batch_size: usize) -> Self {
604 Self {
605 writer,
606 current_block: ColumnarWalBlock::with_batch_size(batch_size),
607 sequence: AtomicU64::new(0),
608 bytes_written: AtomicU64::new(0),
609 blocks_written: AtomicU64::new(0),
610 }
611 }
612
613 pub fn write_entry(&mut self, entry: WalEntry) -> io::Result<()> {
615 if !self.current_block.add_entry(entry.clone()) {
616 self.flush_block()?;
618 if !self.current_block.add_entry(entry) {
620 return Err(io::Error::new(io::ErrorKind::InvalidData, "entry too large for block"));
621 }
622 }
623 Ok(())
624 }
625
626 pub fn flush_block(&mut self) -> io::Result<()> {
628 if self.current_block.is_empty() {
629 return Ok(());
630 }
631
632 let data = self.current_block.serialize();
633 self.writer.write_all(&data)?;
634
635 self.bytes_written.fetch_add(data.len() as u64, Ordering::Relaxed);
636 self.blocks_written.fetch_add(1, Ordering::Relaxed);
637 self.sequence.fetch_add(1, Ordering::Relaxed);
638
639 self.current_block.clear();
640 Ok(())
641 }
642
643 pub fn flush(&mut self) -> io::Result<()> {
645 self.flush_block()?;
646 self.writer.flush()
647 }
648
649 pub fn stats(&self) -> WalWriterStats {
651 WalWriterStats {
652 bytes_written: self.bytes_written.load(Ordering::Relaxed),
653 blocks_written: self.blocks_written.load(Ordering::Relaxed),
654 current_block_entries: self.current_block.len(),
655 }
656 }
657}
658
/// Point-in-time counters reported by `ColumnarWalWriter::stats`.
#[derive(Debug, Clone)]
pub struct WalWriterStats {
    /// Total serialized bytes written to the underlying writer so far.
    pub bytes_written: u64,
    /// Number of blocks written to the underlying writer so far.
    pub blocks_written: u64,
    /// Entries buffered in the current block and not yet written.
    pub current_block_entries: usize,
}
669
670pub struct ColumnarWalReader<R: Read> {
676 reader: R,
678 current_block: Option<ColumnarWalBlock>,
680 current_pos: usize,
682}
683
684impl<R: Read> ColumnarWalReader<R> {
685 pub fn new(reader: R) -> Self {
687 Self {
688 reader,
689 current_block: None,
690 current_pos: 0,
691 }
692 }
693
694 pub fn next_entry(&mut self) -> io::Result<Option<WalEntry>> {
696 if let Some(ref block) = self.current_block {
698 if self.current_pos < block.len() {
699 let entry = block.entries()[self.current_pos].clone();
700 self.current_pos += 1;
701 return Ok(Some(entry));
702 }
703 }
704
705 match self.read_block()? {
707 Some(block) => {
708 if block.is_empty() {
709 return Ok(None);
710 }
711 let entry = block.entries()[0].clone();
712 self.current_block = Some(block);
713 self.current_pos = 1;
714 Ok(Some(entry))
715 }
716 None => Ok(None),
717 }
718 }
719
720 fn read_block(&mut self) -> io::Result<Option<ColumnarWalBlock>> {
722 let header_size = std::mem::size_of::<BlockHeader>();
723 let mut header_buf = vec![0u8; header_size];
724
725 match self.reader.read_exact(&mut header_buf) {
726 Ok(_) => {}
727 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
728 Err(e) => return Err(e),
729 }
730
731 let header = unsafe { &*(header_buf.as_ptr() as *const BlockHeader) };
733
734 if header.magic != COLUMNAR_WAL_MAGIC {
735 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
736 }
737
738 let _remaining = header.block_size as usize - header_size;
739 let mut block_data = header_buf;
740 block_data.resize(header.block_size as usize, 0);
741 self.reader.read_exact(&mut block_data[header_size..])?;
742
743 ColumnarWalBlock::deserialize(&block_data).map(Some)
744 }
745
746 pub fn read_all(&mut self) -> io::Result<Vec<WalEntry>> {
748 let mut entries = Vec::new();
749 while let Some(entry) = self.next_entry()? {
750 entries.push(entry);
751 }
752 Ok(entries)
753 }
754}
755
/// Table-driven CRC-32 using the reflected polynomial `0xEDB88320`, an
/// all-ones initial value and a final bitwise inversion.
fn crc32_simple(data: &[u8]) -> u32 {
    let folded = data.iter().fold(u32::MAX, |crc, &byte| {
        // The low byte of the running CRC mixed with the input byte selects
        // the table slot.
        let slot = usize::from((crc as u8) ^ byte);
        CRC32_TABLE[slot] ^ (crc >> 8)
    });
    folded ^ u32::MAX
}

/// Lookup table for [`crc32_simple`], computed at compile time.
static CRC32_TABLE: [u32; 256] = build_crc32_table();

/// Computes the 256-entry table: entry `b` is byte `b` pushed through eight
/// shift/conditional-xor rounds of the reflected polynomial.
const fn build_crc32_table() -> [u32; 256] {
    let mut table = [0u32; 256];
    let mut byte = 0usize;
    while byte < table.len() {
        let mut crc = byte as u32;
        let mut round = 0;
        while round < 8 {
            // All ones when the low bit is set, zero otherwise.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
            round += 1;
        }
        table[byte] = crc;
        byte += 1;
    }
    table
}
790
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds the `i`-th numbered `Put`: timestamp `ts_base + i`, key
    /// `<key_prefix><i>` and value `<value_prefix><i>`.
    fn numbered_put(i: u64, ts_base: u64, key_prefix: &str, value_prefix: &str) -> WalEntry {
        WalEntry::put(
            i,
            ts_base + i,
            format!("{}{}", key_prefix, i).into_bytes(),
            format!("{}{}", value_prefix, i).into_bytes(),
        )
    }

    /// Asserts that `entry` carries the fields `numbered_put` produces for `i`.
    fn assert_numbered_put(
        entry: &WalEntry,
        i: u64,
        ts_base: u64,
        key_prefix: &str,
        value_prefix: &str,
    ) {
        assert_eq!(entry.txn_id, i);
        assert_eq!(entry.timestamp, ts_base + i);
        assert_eq!(entry.key, format!("{}{}", key_prefix, i).into_bytes());
        assert_eq!(entry.value, format!("{}{}", value_prefix, i).into_bytes());
    }

    #[test]
    fn test_wal_entry_creation() {
        let entry = WalEntry::put(1, 100, b"key".to_vec(), b"value".to_vec());

        assert_eq!(entry.op, WalOpType::Put);
        assert_eq!(entry.txn_id, 1);
        assert_eq!(entry.timestamp, 100);
        assert_eq!(entry.key, b"key");
        assert_eq!(entry.value, b"value");
    }

    #[test]
    fn test_block_serialize_deserialize() {
        let mut block = ColumnarWalBlock::new();
        for i in 0..10 {
            assert!(block.add_entry(numbered_put(i, 100, "key", "value")));
        }

        let bytes = block.serialize();
        let decoded = ColumnarWalBlock::deserialize(&bytes).unwrap();

        assert_eq!(decoded.len(), 10);
        for (i, entry) in decoded.entries().iter().enumerate() {
            assert_numbered_put(entry, i as u64, 100, "key", "value");
        }
    }

    #[test]
    fn test_block_full() {
        let mut block = ColumnarWalBlock::with_batch_size(5);
        for i in 0..5 {
            assert!(block.add_entry(WalEntry::put(i, i * 10, vec![i as u8], vec![])));
        }

        assert!(block.is_full());

        // A sixth entry must be refused.
        let overflow = WalEntry::put(5, 50, vec![5], vec![]);
        assert!(!block.add_entry(overflow));
    }

    #[test]
    fn test_writer_reader_roundtrip() {
        let mut buffer = Vec::new();

        {
            // 25 entries with a batch size of 10 span three blocks.
            let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(&mut buffer), 10);
            for i in 0..25 {
                writer.write_entry(numbered_put(i, 1000, "key_", "value_")).unwrap();
            }
            writer.flush().unwrap();
        }

        let mut reader = ColumnarWalReader::new(Cursor::new(&buffer));
        let entries = reader.read_all().unwrap();

        assert_eq!(entries.len(), 25);
        for (i, entry) in entries.iter().enumerate() {
            assert_numbered_put(entry, i as u64, 1000, "key_", "value_");
        }
    }

    #[test]
    fn test_timestamp_decoder() {
        let decoder = SimdTimestampDecoder::new(1000);
        let deltas = vec![10, 20, 30, 40, 50, 60, 70, 80];
        let mut output = vec![0u64; 8];

        decoder.decode_deltas_scalar(&deltas, &mut output);

        assert_eq!(output, vec![1010, 1030, 1060, 1100, 1150, 1210, 1280, 1360]);
    }

    #[test]
    fn test_key_comparator() {
        let key_lens = vec![4u16, 5, 4, 6, 4];
        let key_data = b"key1key12key3key123key4";
        let key_offsets = vec![0u32, 4, 9, 13, 19];

        // Every key starts with "key".
        let all_keys =
            SimdKeyComparator::match_prefix_avx2(&key_lens, key_data, &key_offsets, b"key");
        assert!(all_keys.iter().all(|&r| r));

        // Only "key1", "key12" and "key123" start with "key1".
        let key1_only =
            SimdKeyComparator::match_prefix_avx2(&key_lens, key_data, &key_offsets, b"key1");
        assert_eq!(key1_only, vec![true, true, false, true, false]);
    }

    #[test]
    fn test_writer_stats() {
        let buffer = Vec::new();
        let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(buffer), 10);

        for i in 0..5 {
            writer.write_entry(WalEntry::put(i, i, vec![0], vec![0])).unwrap();
        }

        let before_flush = writer.stats();
        assert_eq!(before_flush.current_block_entries, 5);
        assert_eq!(before_flush.blocks_written, 0);

        writer.flush().unwrap();

        let after_flush = writer.stats();
        assert_eq!(after_flush.current_block_entries, 0);
        assert_eq!(after_flush.blocks_written, 1);
    }

    #[test]
    fn test_crc32() {
        let checksum = crc32_simple(b"hello world");
        assert_eq!(checksum, 0x0D4A1185);
    }
}