1use std::io::{self, Read, Write};
44use std::sync::atomic::{AtomicU64, Ordering};
45
/// Marker that opens every serialized block: ASCII `CW` followed by the
/// bytes `0x01 0x00`.
const COLUMNAR_WAL_MAGIC: [u8; 4] = *b"CW\x01\x00";

/// Number of entries a block accepts when no explicit batch size is given.
const DEFAULT_BATCH_SIZE: usize = 256;

/// Keys are cut down to at most this many bytes when a block is serialized.
const MAX_KEY_SIZE: usize = 256;
54
/// Size limit, in bytes, intended for a single value (1 MiB).
// Nothing in the visible part of this file reads it yet, hence the lint
// exemption.
#[allow(dead_code)]
const MAX_VALUE_SIZE: usize = 1024 * 1024;

/// Kind of operation a WAL entry records.
///
/// The `u8` discriminant is the byte written to a block's operation lane, so
/// the numeric values are part of the serialized format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalOpType {
    /// A key/value write.
    Put = 0,
    /// Removal of a key.
    Delete = 1,
    /// Start-of-transaction marker.
    BeginTxn = 2,
    /// Commit marker.
    CommitTxn = 3,
    /// Abort marker.
    AbortTxn = 4,
    /// Checkpoint marker.
    Checkpoint = 5,
}

impl WalOpType {
    /// Every variant, stored at the index equal to its discriminant.
    const BY_DISCRIMINANT: [WalOpType; 6] = [
        WalOpType::Put,
        WalOpType::Delete,
        WalOpType::BeginTxn,
        WalOpType::CommitTxn,
        WalOpType::AbortTxn,
        WalOpType::Checkpoint,
    ];

    /// Maps a serialized discriminant back to its variant.
    ///
    /// Returns `None` for any byte that is not a known discriminant.
    fn from_u8(v: u8) -> Option<Self> {
        Self::BY_DISCRIMINANT.get(usize::from(v)).copied()
    }
}
94
95#[derive(Clone)]
97pub struct WalEntry {
98 pub op: WalOpType,
100 pub txn_id: u64,
102 pub timestamp: u64,
104 pub key: Vec<u8>,
106 pub value: Vec<u8>,
108}
109
110impl WalEntry {
111 pub fn put(txn_id: u64, timestamp: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
113 Self {
114 op: WalOpType::Put,
115 txn_id,
116 timestamp,
117 key,
118 value,
119 }
120 }
121
122 pub fn delete(txn_id: u64, timestamp: u64, key: Vec<u8>) -> Self {
124 Self {
125 op: WalOpType::Delete,
126 txn_id,
127 timestamp,
128 key,
129 value: Vec::new(),
130 }
131 }
132
133 pub fn begin_txn(txn_id: u64, timestamp: u64) -> Self {
135 Self {
136 op: WalOpType::BeginTxn,
137 txn_id,
138 timestamp,
139 key: Vec::new(),
140 value: Vec::new(),
141 }
142 }
143
144 pub fn commit_txn(txn_id: u64, timestamp: u64) -> Self {
146 Self {
147 op: WalOpType::CommitTxn,
148 txn_id,
149 timestamp,
150 key: Vec::new(),
151 value: Vec::new(),
152 }
153 }
154}
155
/// Fixed-size header found at the start of every serialized block.
///
/// `#[repr(C, packed)]` removes all padding, so the struct is exactly
/// 44 bytes and its in-memory bytes are copied to and from the block as-is
/// (multi-byte fields therefore use the host's byte order). Every
/// `*_offset` field is a byte offset measured from the start of the block,
/// header included.
#[derive(Clone, Copy)]
#[repr(C, packed)]
struct BlockHeader {
    /// Format marker; readers compare it against `COLUMNAR_WAL_MAGIC`.
    magic: [u8; 4],
    /// Format version; the serializer in this file writes `1`.
    version: u8,
    /// Number of entries stored in the block.
    entry_count: u16,
    /// Unused; written as `0`.
    _reserved: u8,
    /// Start of the operation lane (1 byte per entry).
    op_lane_offset: u32,
    /// Start of the transaction-id lane (8 bytes per entry).
    txn_lane_offset: u32,
    /// Start of the timestamp lane (8 bytes per entry).
    ts_lane_offset: u32,
    /// Start of the key-length lane (2 bytes per entry).
    key_len_lane_offset: u32,
    /// Start of the concatenated key bytes.
    key_data_lane_offset: u32,
    /// Start of the value-length lane (4 bytes per entry).
    value_len_lane_offset: u32,
    /// Start of the concatenated value bytes.
    value_data_lane_offset: u32,
    /// Total size of the block in bytes, header included.
    block_size: u32,
    /// CRC-32 of every byte that follows the header.
    checksum: u32,
}
191
192pub struct ColumnarWalBlock {
194 entries: Vec<WalEntry>,
196 batch_size: usize,
198}
199
200impl ColumnarWalBlock {
201 pub fn new() -> Self {
203 Self::with_batch_size(DEFAULT_BATCH_SIZE)
204 }
205
206 pub fn with_batch_size(batch_size: usize) -> Self {
208 Self {
209 entries: Vec::with_capacity(batch_size),
210 batch_size,
211 }
212 }
213
214 pub fn add_entry(&mut self, entry: WalEntry) -> bool {
216 if self.entries.len() >= self.batch_size {
217 return false;
218 }
219 self.entries.push(entry);
220 true
221 }
222
223 pub fn is_full(&self) -> bool {
225 self.entries.len() >= self.batch_size
226 }
227
228 pub fn len(&self) -> usize {
230 self.entries.len()
231 }
232
233 pub fn is_empty(&self) -> bool {
235 self.entries.is_empty()
236 }
237
238 pub fn entries(&self) -> &[WalEntry] {
240 &self.entries
241 }
242
243 pub fn serialize(&self) -> Vec<u8> {
245 let entry_count = self.entries.len();
246 if entry_count == 0 {
247 return Vec::new();
248 }
249
250 let op_lane_size = entry_count;
252 let txn_lane_size = entry_count * 8;
253 let ts_lane_size = entry_count * 8; let key_len_size = entry_count * 2; let key_data_size: usize = self.entries.iter().map(|e| e.key.len()).sum();
256 let value_len_size = entry_count * 4; let value_data_size: usize = self.entries.iter().map(|e| e.value.len()).sum();
258
259 let header_size = std::mem::size_of::<BlockHeader>();
260 let total_size = header_size
261 + op_lane_size
262 + txn_lane_size
263 + ts_lane_size
264 + key_len_size
265 + key_data_size
266 + value_len_size
267 + value_data_size;
268
269 let mut buffer = vec![0u8; total_size];
270 let mut offset = header_size;
271
272 let op_lane_offset = offset as u32;
274 for entry in &self.entries {
275 buffer[offset] = entry.op as u8;
276 offset += 1;
277 }
278
279 let txn_lane_offset = offset as u32;
281 for entry in &self.entries {
282 buffer[offset..offset + 8].copy_from_slice(&entry.txn_id.to_le_bytes());
283 offset += 8;
284 }
285
286 let ts_lane_offset = offset as u32;
288 for entry in &self.entries {
289 buffer[offset..offset + 8].copy_from_slice(&entry.timestamp.to_le_bytes());
290 offset += 8;
291 }
292
293 let key_len_lane_offset = offset as u32;
295 for entry in &self.entries {
296 let len = entry.key.len().min(MAX_KEY_SIZE) as u16;
297 buffer[offset..offset + 2].copy_from_slice(&len.to_le_bytes());
298 offset += 2;
299 }
300
301 let key_data_lane_offset = offset as u32;
303 for entry in &self.entries {
304 let len = entry.key.len().min(MAX_KEY_SIZE);
305 buffer[offset..offset + len].copy_from_slice(&entry.key[..len]);
306 offset += len;
307 }
308
309 let value_len_lane_offset = offset as u32;
311 for entry in &self.entries {
312 let len = entry.value.len() as u32;
313 buffer[offset..offset + 4].copy_from_slice(&len.to_le_bytes());
314 offset += 4;
315 }
316
317 let value_data_lane_offset = offset as u32;
319 for entry in &self.entries {
320 buffer[offset..offset + entry.value.len()].copy_from_slice(&entry.value);
321 offset += entry.value.len();
322 }
323
324 let checksum = crc32_simple(&buffer[header_size..offset]);
326
327 let header = BlockHeader {
329 magic: COLUMNAR_WAL_MAGIC,
330 version: 1,
331 entry_count: entry_count as u16,
332 _reserved: 0,
333 op_lane_offset,
334 txn_lane_offset,
335 ts_lane_offset,
336 key_len_lane_offset,
337 key_data_lane_offset,
338 value_len_lane_offset,
339 value_data_lane_offset,
340 block_size: offset as u32,
341 checksum,
342 };
343
344 let header_bytes = unsafe {
346 std::slice::from_raw_parts(
347 &header as *const BlockHeader as *const u8,
348 std::mem::size_of::<BlockHeader>(),
349 )
350 };
351 buffer[..header_size].copy_from_slice(header_bytes);
352
353 buffer.truncate(offset);
354 buffer
355 }
356
357 pub fn deserialize(data: &[u8]) -> io::Result<Self> {
359 let header_size = std::mem::size_of::<BlockHeader>();
360 if data.len() < header_size {
361 return Err(io::Error::new(io::ErrorKind::InvalidData, "buffer too small"));
362 }
363
364 let header = unsafe { &*(data.as_ptr() as *const BlockHeader) };
366
367 if header.magic != COLUMNAR_WAL_MAGIC {
369 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
370 }
371
372 let expected_checksum = header.checksum;
374 let actual_checksum = crc32_simple(&data[header_size..header.block_size as usize]);
375 if expected_checksum != actual_checksum {
376 return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
377 }
378
379 let entry_count = header.entry_count as usize;
380 let mut entries = Vec::with_capacity(entry_count);
381
382 let op_lane = &data[header.op_lane_offset as usize..];
384 let txn_lane = &data[header.txn_lane_offset as usize..];
385 let ts_lane = &data[header.ts_lane_offset as usize..];
386 let key_len_lane = &data[header.key_len_lane_offset as usize..];
387 let key_data_lane = &data[header.key_data_lane_offset as usize..];
388 let value_len_lane = &data[header.value_len_lane_offset as usize..];
389 let value_data_lane = &data[header.value_data_lane_offset as usize..];
390
391 let mut key_offset = 0usize;
392 let mut value_offset = 0usize;
393
394 for i in 0..entry_count {
395 let op = WalOpType::from_u8(op_lane[i])
396 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid op"))?;
397
398 let txn_id = u64::from_le_bytes(txn_lane[i * 8..i * 8 + 8].try_into().unwrap());
399 let timestamp = u64::from_le_bytes(ts_lane[i * 8..i * 8 + 8].try_into().unwrap());
400 let key_len = u16::from_le_bytes(key_len_lane[i * 2..i * 2 + 2].try_into().unwrap()) as usize;
401 let value_len = u32::from_le_bytes(value_len_lane[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
402
403 let key = key_data_lane[key_offset..key_offset + key_len].to_vec();
404 key_offset += key_len;
405
406 let value = value_data_lane[value_offset..value_offset + value_len].to_vec();
407 value_offset += value_len;
408
409 entries.push(WalEntry {
410 op,
411 txn_id,
412 timestamp,
413 key,
414 value,
415 });
416 }
417
418 Ok(Self {
419 entries,
420 batch_size: DEFAULT_BATCH_SIZE,
421 })
422 }
423
424 pub fn clear(&mut self) {
426 self.entries.clear();
427 }
428}
429
430impl Default for ColumnarWalBlock {
431 fn default() -> Self {
432 Self::new()
433 }
434}
435
/// Reconstructs absolute timestamps from a base value and a run of deltas.
pub struct SimdTimestampDecoder {
    /// Value the first delta is added to.
    base_ts: u64,
}

impl SimdTimestampDecoder {
    /// Creates a decoder whose running total starts at `base_ts`.
    pub fn new(base_ts: u64) -> Self {
        Self { base_ts }
    }

    /// Decodes `deltas` into `output`; produces exactly the same result as
    /// [`SimdTimestampDecoder::decode_deltas_scalar`].
    ///
    /// The name is kept for API compatibility. The running sum carries a
    /// dependency from each element to the next, and the former AVX2 branch
    /// only issued a vector load whose result was never used before running
    /// the same scalar loop, so this method now delegates directly and needs
    /// no `unsafe` or per-architecture variants.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`.
    pub fn decode_deltas_avx2(&self, deltas: &[u64], output: &mut [u64]) {
        self.decode_deltas_scalar(deltas, output);
    }

    /// Writes the running (wrapping) sum of `deltas`, seeded with the base
    /// timestamp, into the first `deltas.len()` slots of `output`:
    /// `output[i] = base_ts + deltas[0] + ... + deltas[i]`.
    ///
    /// Slots of `output` beyond `deltas.len()` are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`.
    pub fn decode_deltas_scalar(&self, deltas: &[u64], output: &mut [u64]) {
        // One up-front length check instead of one bounds check per element.
        let output = &mut output[..deltas.len()];
        let mut current = self.base_ts;
        for (slot, &delta) in output.iter_mut().zip(deltas) {
            current = current.wrapping_add(delta);
            *slot = current;
        }
    }
}
519
/// Prefix matching over keys stored back to back in one byte buffer.
pub struct SimdKeyComparator;

impl SimdKeyComparator {
    /// Reports, for each key, whether it starts with `prefix`.
    ///
    /// Key `i` is described by `key_lens[i]` (its length) and
    /// `key_offsets[i]` (where it starts inside `key_data`). The result has
    /// one element per entry of `key_lens`.
    ///
    /// * An empty `prefix` matches every key.
    /// * A key shorter than `prefix` never matches.
    /// * A key whose offset is missing from `key_offsets`, or whose first
    ///   `prefix.len()` bytes do not lie inside `key_data`, is reported as
    ///   `false` rather than causing a panic.
    ///
    /// The implementation is a plain byte comparison on every architecture;
    /// the two previously cfg-gated copies were identical.
    pub fn match_prefix_avx2(
        key_lens: &[u16],
        key_data: &[u8],
        key_offsets: &[u32],
        prefix: &[u8],
    ) -> Vec<bool> {
        if prefix.is_empty() {
            return vec![true; key_lens.len()];
        }

        key_lens
            .iter()
            .enumerate()
            .map(|(i, &len)| {
                usize::from(len) >= prefix.len()
                    && Self::key_prefix(key_data, key_offsets.get(i).copied(), prefix.len())
                        .map_or(false, |candidate| candidate == prefix)
            })
            .collect()
    }

    /// Returns the `len` bytes of `key_data` starting at `offset`, or `None`
    /// when the offset is absent or the range falls outside `key_data`.
    fn key_prefix(key_data: &[u8], offset: Option<u32>, len: usize) -> Option<&[u8]> {
        let start = offset? as usize;
        let end = start.checked_add(len)?;
        key_data.get(start..end)
    }
}
580
581pub struct ColumnarWalWriter<W: Write> {
587 writer: W,
589 current_block: ColumnarWalBlock,
591 sequence: AtomicU64,
593 bytes_written: AtomicU64,
595 blocks_written: AtomicU64,
597}
598
599impl<W: Write> ColumnarWalWriter<W> {
600 pub fn new(writer: W) -> Self {
602 Self::with_batch_size(writer, DEFAULT_BATCH_SIZE)
603 }
604
605 pub fn with_batch_size(writer: W, batch_size: usize) -> Self {
607 Self {
608 writer,
609 current_block: ColumnarWalBlock::with_batch_size(batch_size),
610 sequence: AtomicU64::new(0),
611 bytes_written: AtomicU64::new(0),
612 blocks_written: AtomicU64::new(0),
613 }
614 }
615
616 pub fn write_entry(&mut self, entry: WalEntry) -> io::Result<()> {
618 if !self.current_block.add_entry(entry.clone()) {
619 self.flush_block()?;
621 if !self.current_block.add_entry(entry) {
623 return Err(io::Error::new(io::ErrorKind::InvalidData, "entry too large for block"));
624 }
625 }
626 Ok(())
627 }
628
629 pub fn flush_block(&mut self) -> io::Result<()> {
631 if self.current_block.is_empty() {
632 return Ok(());
633 }
634
635 let data = self.current_block.serialize();
636 self.writer.write_all(&data)?;
637
638 self.bytes_written.fetch_add(data.len() as u64, Ordering::Relaxed);
639 self.blocks_written.fetch_add(1, Ordering::Relaxed);
640 self.sequence.fetch_add(1, Ordering::Relaxed);
641
642 self.current_block.clear();
643 Ok(())
644 }
645
646 pub fn flush(&mut self) -> io::Result<()> {
648 self.flush_block()?;
649 self.writer.flush()
650 }
651
652 pub fn stats(&self) -> WalWriterStats {
654 WalWriterStats {
655 bytes_written: self.bytes_written.load(Ordering::Relaxed),
656 blocks_written: self.blocks_written.load(Ordering::Relaxed),
657 current_block_entries: self.current_block.len(),
658 }
659 }
660}
661
/// Snapshot of a `ColumnarWalWriter`'s counters, as returned by its `stats`
/// method.
#[derive(Debug, Clone)]
pub struct WalWriterStats {
    /// Total serialized bytes written to the underlying writer so far.
    pub bytes_written: u64,
    /// Number of blocks written to the underlying writer so far.
    pub blocks_written: u64,
    /// Number of entries buffered in the block that has not been written yet.
    pub current_block_entries: usize,
}
672
673pub struct ColumnarWalReader<R: Read> {
679 reader: R,
681 current_block: Option<ColumnarWalBlock>,
683 current_pos: usize,
685}
686
687impl<R: Read> ColumnarWalReader<R> {
688 pub fn new(reader: R) -> Self {
690 Self {
691 reader,
692 current_block: None,
693 current_pos: 0,
694 }
695 }
696
697 pub fn next_entry(&mut self) -> io::Result<Option<WalEntry>> {
699 if let Some(ref block) = self.current_block {
701 if self.current_pos < block.len() {
702 let entry = block.entries()[self.current_pos].clone();
703 self.current_pos += 1;
704 return Ok(Some(entry));
705 }
706 }
707
708 match self.read_block()? {
710 Some(block) => {
711 if block.is_empty() {
712 return Ok(None);
713 }
714 let entry = block.entries()[0].clone();
715 self.current_block = Some(block);
716 self.current_pos = 1;
717 Ok(Some(entry))
718 }
719 None => Ok(None),
720 }
721 }
722
723 fn read_block(&mut self) -> io::Result<Option<ColumnarWalBlock>> {
725 let header_size = std::mem::size_of::<BlockHeader>();
726 let mut header_buf = vec![0u8; header_size];
727
728 match self.reader.read_exact(&mut header_buf) {
729 Ok(_) => {}
730 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
731 Err(e) => return Err(e),
732 }
733
734 let header = unsafe { &*(header_buf.as_ptr() as *const BlockHeader) };
736
737 if header.magic != COLUMNAR_WAL_MAGIC {
738 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
739 }
740
741 let _remaining = header.block_size as usize - header_size;
742 let mut block_data = header_buf;
743 block_data.resize(header.block_size as usize, 0);
744 self.reader.read_exact(&mut block_data[header_size..])?;
745
746 ColumnarWalBlock::deserialize(&block_data).map(Some)
747 }
748
749 pub fn read_all(&mut self) -> io::Result<Vec<WalEntry>> {
751 let mut entries = Vec::new();
752 while let Some(entry) = self.next_entry()? {
753 entries.push(entry);
754 }
755 Ok(entries)
756 }
757}
758
/// Computes the CRC-32 of `data` using the reflected polynomial
/// `0xEDB88320`, an all-ones initial value and a final bit inversion.
///
/// Returns `0` for empty input.
fn crc32_simple(data: &[u8]) -> u32 {
    let folded = data.iter().fold(u32::MAX, |state, &byte| {
        let slot = ((state ^ u32::from(byte)) & 0xFF) as usize;
        CRC32_TABLE[slot] ^ (state >> 8)
    });
    !folded
}

/// Byte-indexed lookup table for [`crc32_simple`], computed at compile time.
static CRC32_TABLE: [u32; 256] = build_crc32_table();

/// Generates the table: entry `n` is `n` pushed through eight rounds of
/// "shift right by one, XOR the polynomial in when the dropped bit was set".
const fn build_crc32_table() -> [u32; 256] {
    let mut table = [0u32; 256];
    let mut value = 0usize;
    while value < 256 {
        let mut remainder = value as u32;
        let mut round = 0;
        while round < 8 {
            remainder = if remainder & 1 == 1 {
                0xEDB8_8320 ^ (remainder >> 1)
            } else {
                remainder >> 1
            };
            round += 1;
        }
        table[value] = remainder;
        value += 1;
    }
    table
}
793
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds sample `Put` entry number `index`: transaction id `index`,
    /// timestamp `ts_base + index`, key and value `<prefix><index>`.
    fn sample_put(index: u64, ts_base: u64, key_prefix: &str, value_prefix: &str) -> WalEntry {
        WalEntry::put(
            index,
            ts_base + index,
            format!("{}{}", key_prefix, index).into_bytes(),
            format!("{}{}", value_prefix, index).into_bytes(),
        )
    }

    /// Checks that `entries` carries the ids, timestamps, keys and values
    /// that `sample_put` generates for indices `0..entries.len()`.
    fn assert_sample_sequence(
        entries: &[WalEntry],
        ts_base: u64,
        key_prefix: &str,
        value_prefix: &str,
    ) {
        for (position, entry) in entries.iter().enumerate() {
            let index = position as u64;
            assert_eq!(entry.txn_id, index);
            assert_eq!(entry.timestamp, ts_base + index);
            assert_eq!(entry.key, format!("{}{}", key_prefix, index).into_bytes());
            assert_eq!(
                entry.value,
                format!("{}{}", value_prefix, index).into_bytes()
            );
        }
    }

    /// `WalEntry::put` stores every argument in the matching field.
    #[test]
    fn test_wal_entry_creation() {
        let entry = WalEntry::put(1, 100, b"key".to_vec(), b"value".to_vec());

        assert_eq!(entry.op, WalOpType::Put);
        assert_eq!(entry.txn_id, 1);
        assert_eq!(entry.timestamp, 100);
        assert_eq!(entry.key, b"key");
        assert_eq!(entry.value, b"value");
    }

    /// A block survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_block_serialize_deserialize() {
        let mut block = ColumnarWalBlock::new();
        for index in 0..10 {
            assert!(block.add_entry(sample_put(index, 100, "key", "value")));
        }

        let bytes = block.serialize();
        let decoded = ColumnarWalBlock::deserialize(&bytes).unwrap();

        assert_eq!(decoded.len(), 10);
        assert_sample_sequence(decoded.entries(), 100, "key", "value");
    }

    /// A block refuses entries once it has reached its batch size.
    #[test]
    fn test_block_full() {
        let mut block = ColumnarWalBlock::with_batch_size(5);
        for index in 0..5 {
            let accepted = block.add_entry(WalEntry::put(
                index,
                index * 10,
                vec![index as u8],
                vec![],
            ));
            assert!(accepted);
        }

        assert!(block.is_full());

        let overflow = WalEntry::put(5, 50, vec![5], vec![]);
        assert!(!block.add_entry(overflow));
    }

    /// Entries written across several blocks are read back in order.
    #[test]
    fn test_writer_reader_roundtrip() {
        let mut buffer = Vec::new();

        // Scope the writer so its mutable borrow of `buffer` ends before the
        // reader is created.
        {
            let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(&mut buffer), 10);
            for index in 0..25 {
                writer
                    .write_entry(sample_put(index, 1000, "key_", "value_"))
                    .unwrap();
            }
            writer.flush().unwrap();
        }

        let mut reader = ColumnarWalReader::new(Cursor::new(&buffer));
        let entries = reader.read_all().unwrap();

        assert_eq!(entries.len(), 25);
        assert_sample_sequence(&entries, 1000, "key_", "value_");
    }

    /// The scalar decoder yields the running sum of the deltas.
    #[test]
    fn test_timestamp_decoder() {
        let decoder = SimdTimestampDecoder::new(1000);
        let deltas = vec![10, 20, 30, 40, 50, 60, 70, 80];
        let mut output = vec![0u64; 8];

        decoder.decode_deltas_scalar(&deltas, &mut output);

        assert_eq!(output, vec![1010, 1030, 1060, 1100, 1150, 1210, 1280, 1360]);
    }

    /// Prefix matching over keys packed back to back in one buffer.
    #[test]
    fn test_key_comparator() {
        let key_lens = vec![4u16, 5, 4, 6, 4];
        let key_data = b"key1key12key3key123key4";
        let key_offsets = vec![0u32, 4, 9, 13, 19];

        let shared_prefix =
            SimdKeyComparator::match_prefix_avx2(&key_lens, key_data, &key_offsets, b"key");
        assert!(shared_prefix.iter().all(|&matched| matched));

        let narrower_prefix =
            SimdKeyComparator::match_prefix_avx2(&key_lens, key_data, &key_offsets, b"key1");
        assert_eq!(narrower_prefix, vec![true, true, false, true, false]);
    }

    /// Counters reflect buffered entries before, and written blocks after,
    /// a flush.
    #[test]
    fn test_writer_stats() {
        let sink = Vec::new();
        let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(sink), 10);
        for index in 0..5 {
            writer
                .write_entry(WalEntry::put(index, index, vec![0], vec![0]))
                .unwrap();
        }

        let before = writer.stats();
        assert_eq!(before.current_block_entries, 5);
        assert_eq!(before.blocks_written, 0);

        writer.flush().unwrap();

        let after = writer.stats();
        assert_eq!(after.current_block_entries, 0);
        assert_eq!(after.blocks_written, 1);
    }

    /// Checksum of a fixed input against its expected value.
    #[test]
    fn test_crc32() {
        let data = b"hello world";
        let crc = crc32_simple(data);
        assert_eq!(crc, 0x0D4A1185);
    }
}