1use std::io::{self, Read, Write};
44use std::sync::atomic::{AtomicU64, Ordering};
45
/// Magic bytes that open every serialized block header: ASCII `C`, `W`, then `0x01, 0x00`.
const COLUMNAR_WAL_MAGIC: [u8; 4] = [0x43, 0x57, 0x01, 0x00];
/// Entry capacity used by the `new` constructors when no batch size is given.
const DEFAULT_BATCH_SIZE: usize = 256;

/// Largest key length, in bytes, that `serialize` writes; longer keys are cut to this length.
const MAX_KEY_SIZE: usize = 256;
54
/// Intended upper bound, in bytes, for a single value payload.
///
/// Nothing in this file references it yet, hence the `dead_code` allowance.
#[allow(dead_code)]
const MAX_VALUE_SIZE: usize = 1024 * 1024;

/// Kind of operation carried by a WAL entry.
///
/// `#[repr(u8)]` fixes the discriminants, which are the byte values stored in
/// the op lane of a serialized block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalOpType {
    /// Key/value write.
    Put = 0,
    /// Key removal.
    Delete = 1,
    /// Transaction begin marker.
    BeginTxn = 2,
    /// Transaction commit marker.
    CommitTxn = 3,
    /// Transaction abort marker.
    AbortTxn = 4,
    /// Checkpoint marker.
    Checkpoint = 5,
}

impl WalOpType {
    /// Every variant, stored at the index equal to its discriminant.
    const BY_DISCRIMINANT: [WalOpType; 6] = [
        WalOpType::Put,
        WalOpType::Delete,
        WalOpType::BeginTxn,
        WalOpType::CommitTxn,
        WalOpType::AbortTxn,
        WalOpType::Checkpoint,
    ];

    /// Maps a raw byte back to its variant; bytes outside `0..=5` yield `None`.
    fn from_u8(v: u8) -> Option<Self> {
        Self::BY_DISCRIMINANT.get(usize::from(v)).copied()
    }
}
94
95#[derive(Clone)]
97pub struct WalEntry {
98 pub op: WalOpType,
100 pub txn_id: u64,
102 pub timestamp: u64,
104 pub key: Vec<u8>,
106 pub value: Vec<u8>,
108}
109
110impl WalEntry {
111 pub fn put(txn_id: u64, timestamp: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
113 Self {
114 op: WalOpType::Put,
115 txn_id,
116 timestamp,
117 key,
118 value,
119 }
120 }
121
122 pub fn delete(txn_id: u64, timestamp: u64, key: Vec<u8>) -> Self {
124 Self {
125 op: WalOpType::Delete,
126 txn_id,
127 timestamp,
128 key,
129 value: Vec::new(),
130 }
131 }
132
133 pub fn begin_txn(txn_id: u64, timestamp: u64) -> Self {
135 Self {
136 op: WalOpType::BeginTxn,
137 txn_id,
138 timestamp,
139 key: Vec::new(),
140 value: Vec::new(),
141 }
142 }
143
144 pub fn commit_txn(txn_id: u64, timestamp: u64) -> Self {
146 Self {
147 op: WalOpType::CommitTxn,
148 txn_id,
149 timestamp,
150 key: Vec::new(),
151 value: Vec::new(),
152 }
153 }
154}
155
/// Fixed-size header written at the start of every serialized block.
///
/// `#[repr(C, packed)]` removes all padding, so the header occupies exactly
/// 44 bytes and its in-memory bytes are copied to and from the block buffer
/// verbatim. All `*_offset` fields are byte offsets measured from the first
/// byte of the block (header included).
///
/// NOTE(review): because the bytes are copied verbatim, the multi-byte header
/// fields are stored in the host's byte order, whereas the lanes are written
/// explicitly little-endian.
#[derive(Clone, Copy)]
#[repr(C, packed)]
struct BlockHeader {
    /// Must equal `COLUMNAR_WAL_MAGIC`; readers reject any other value.
    magic: [u8; 4],
    /// Format version; `serialize` writes `1`.
    version: u8,
    /// Number of entries stored in the block.
    entry_count: u16,
    /// Unused; written as `0`.
    _reserved: u8,
    /// Start of the op lane: one byte per entry.
    op_lane_offset: u32,
    /// Start of the transaction-id lane: eight little-endian bytes per entry.
    txn_lane_offset: u32,
    /// Start of the timestamp lane: eight little-endian bytes per entry.
    ts_lane_offset: u32,
    /// Start of the key-length lane: two little-endian bytes per entry.
    key_len_lane_offset: u32,
    /// Start of the concatenated key bytes.
    key_data_lane_offset: u32,
    /// Start of the value-length lane: four little-endian bytes per entry.
    value_len_lane_offset: u32,
    /// Start of the concatenated value bytes.
    value_data_lane_offset: u32,
    /// Total size of the block in bytes, header included.
    block_size: u32,
    /// CRC-32 of the bytes that follow the header, up to `block_size`.
    checksum: u32,
}
191
192pub struct ColumnarWalBlock {
194 entries: Vec<WalEntry>,
196 batch_size: usize,
198}
199
200impl ColumnarWalBlock {
201 pub fn new() -> Self {
203 Self::with_batch_size(DEFAULT_BATCH_SIZE)
204 }
205
206 pub fn with_batch_size(batch_size: usize) -> Self {
208 Self {
209 entries: Vec::with_capacity(batch_size),
210 batch_size,
211 }
212 }
213
214 pub fn add_entry(&mut self, entry: WalEntry) -> bool {
216 if self.entries.len() >= self.batch_size {
217 return false;
218 }
219 self.entries.push(entry);
220 true
221 }
222
223 pub fn is_full(&self) -> bool {
225 self.entries.len() >= self.batch_size
226 }
227
228 pub fn len(&self) -> usize {
230 self.entries.len()
231 }
232
233 pub fn is_empty(&self) -> bool {
235 self.entries.is_empty()
236 }
237
238 pub fn entries(&self) -> &[WalEntry] {
240 &self.entries
241 }
242
243 pub fn serialize(&self) -> Vec<u8> {
245 let entry_count = self.entries.len();
246 if entry_count == 0 {
247 return Vec::new();
248 }
249
250 let op_lane_size = entry_count;
252 let txn_lane_size = entry_count * 8;
253 let ts_lane_size = entry_count * 8; let key_len_size = entry_count * 2; let key_data_size: usize = self.entries.iter().map(|e| e.key.len()).sum();
256 let value_len_size = entry_count * 4; let value_data_size: usize = self.entries.iter().map(|e| e.value.len()).sum();
258
259 let header_size = std::mem::size_of::<BlockHeader>();
260 let total_size = header_size
261 + op_lane_size
262 + txn_lane_size
263 + ts_lane_size
264 + key_len_size
265 + key_data_size
266 + value_len_size
267 + value_data_size;
268
269 let mut buffer = vec![0u8; total_size];
270 let mut offset = header_size;
271
272 let op_lane_offset = offset as u32;
274 for entry in &self.entries {
275 buffer[offset] = entry.op as u8;
276 offset += 1;
277 }
278
279 let txn_lane_offset = offset as u32;
281 for entry in &self.entries {
282 buffer[offset..offset + 8].copy_from_slice(&entry.txn_id.to_le_bytes());
283 offset += 8;
284 }
285
286 let ts_lane_offset = offset as u32;
288 for entry in &self.entries {
289 buffer[offset..offset + 8].copy_from_slice(&entry.timestamp.to_le_bytes());
290 offset += 8;
291 }
292
293 let key_len_lane_offset = offset as u32;
295 for entry in &self.entries {
296 let len = entry.key.len().min(MAX_KEY_SIZE) as u16;
297 buffer[offset..offset + 2].copy_from_slice(&len.to_le_bytes());
298 offset += 2;
299 }
300
301 let key_data_lane_offset = offset as u32;
303 for entry in &self.entries {
304 let len = entry.key.len().min(MAX_KEY_SIZE);
305 buffer[offset..offset + len].copy_from_slice(&entry.key[..len]);
306 offset += len;
307 }
308
309 let value_len_lane_offset = offset as u32;
311 for entry in &self.entries {
312 let len = entry.value.len() as u32;
313 buffer[offset..offset + 4].copy_from_slice(&len.to_le_bytes());
314 offset += 4;
315 }
316
317 let value_data_lane_offset = offset as u32;
319 for entry in &self.entries {
320 buffer[offset..offset + entry.value.len()].copy_from_slice(&entry.value);
321 offset += entry.value.len();
322 }
323
324 let checksum = crc32_simple(&buffer[header_size..offset]);
326
327 let header = BlockHeader {
329 magic: COLUMNAR_WAL_MAGIC,
330 version: 1,
331 entry_count: entry_count as u16,
332 _reserved: 0,
333 op_lane_offset,
334 txn_lane_offset,
335 ts_lane_offset,
336 key_len_lane_offset,
337 key_data_lane_offset,
338 value_len_lane_offset,
339 value_data_lane_offset,
340 block_size: offset as u32,
341 checksum,
342 };
343
344 let header_bytes = unsafe {
346 std::slice::from_raw_parts(
347 &header as *const BlockHeader as *const u8,
348 std::mem::size_of::<BlockHeader>(),
349 )
350 };
351 buffer[..header_size].copy_from_slice(header_bytes);
352
353 buffer.truncate(offset);
354 buffer
355 }
356
357 pub fn deserialize(data: &[u8]) -> io::Result<Self> {
359 let header_size = std::mem::size_of::<BlockHeader>();
360 if data.len() < header_size {
361 return Err(io::Error::new(io::ErrorKind::InvalidData, "buffer too small"));
362 }
363
364 let header = unsafe { &*(data.as_ptr() as *const BlockHeader) };
366
367 if header.magic != COLUMNAR_WAL_MAGIC {
369 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
370 }
371
372 let expected_checksum = header.checksum;
374 let actual_checksum = crc32_simple(&data[header_size..header.block_size as usize]);
375 if expected_checksum != actual_checksum {
376 return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
377 }
378
379 let entry_count = header.entry_count as usize;
380 let mut entries = Vec::with_capacity(entry_count);
381
382 let op_lane = &data[header.op_lane_offset as usize..];
384 let txn_lane = &data[header.txn_lane_offset as usize..];
385 let ts_lane = &data[header.ts_lane_offset as usize..];
386 let key_len_lane = &data[header.key_len_lane_offset as usize..];
387 let key_data_lane = &data[header.key_data_lane_offset as usize..];
388 let value_len_lane = &data[header.value_len_lane_offset as usize..];
389 let value_data_lane = &data[header.value_data_lane_offset as usize..];
390
391 let mut key_offset = 0usize;
392 let mut value_offset = 0usize;
393
394 for i in 0..entry_count {
395 let op = WalOpType::from_u8(op_lane[i])
396 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid op"))?;
397
398 let txn_id = u64::from_le_bytes(txn_lane[i * 8..i * 8 + 8].try_into().unwrap());
399 let timestamp = u64::from_le_bytes(ts_lane[i * 8..i * 8 + 8].try_into().unwrap());
400 let key_len = u16::from_le_bytes(key_len_lane[i * 2..i * 2 + 2].try_into().unwrap()) as usize;
401 let value_len = u32::from_le_bytes(value_len_lane[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
402
403 let key = key_data_lane[key_offset..key_offset + key_len].to_vec();
404 key_offset += key_len;
405
406 let value = value_data_lane[value_offset..value_offset + value_len].to_vec();
407 value_offset += value_len;
408
409 entries.push(WalEntry {
410 op,
411 txn_id,
412 timestamp,
413 key,
414 value,
415 });
416 }
417
418 Ok(Self {
419 entries,
420 batch_size: DEFAULT_BATCH_SIZE,
421 })
422 }
423
424 pub fn clear(&mut self) {
426 self.entries.clear();
427 }
428}
429
430impl Default for ColumnarWalBlock {
431 fn default() -> Self {
432 Self::new()
433 }
434}
435
/// Rebuilds absolute timestamps from a base value and a run of deltas.
///
/// Output element `i` is the base plus the sum of `deltas[0..=i]`, using
/// wrapping `u64` arithmetic.
pub struct SimdTimestampDecoder {
    /// Value the first delta is added to.
    base_ts: u64,
}

impl SimdTimestampDecoder {
    /// Creates a decoder whose running sum starts at `base_ts`.
    pub fn new(base_ts: u64) -> Self {
        Self { base_ts }
    }

    /// Decodes `deltas` into `output`; identical in result to
    /// [`SimdTimestampDecoder::decode_deltas_scalar`].
    ///
    /// The name is kept for API compatibility. The previous x86-64 branch
    /// issued one AVX2 load whose result was never used and then ran the same
    /// scalar loop, so the `unsafe` code and runtime feature detection were
    /// removed. The running sum carries a dependency from each element to the
    /// next, so the scalar loop is the whole algorithm.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`.
    pub fn decode_deltas_avx2(&self, deltas: &[u64], output: &mut [u64]) {
        self.decode_deltas_scalar(deltas, output);
    }

    /// Writes the running (wrapping) sum of `deltas`, seeded with the base
    /// timestamp, into the first `deltas.len()` slots of `output`.
    ///
    /// Slots of `output` beyond `deltas.len()` are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`. The check happens before
    /// anything is written, so a failed call leaves `output` unmodified.
    pub fn decode_deltas_scalar(&self, deltas: &[u64], output: &mut [u64]) {
        assert!(
            output.len() >= deltas.len(),
            "output buffer too small: {} slots for {} deltas",
            output.len(),
            deltas.len()
        );

        let mut current = self.base_ts;
        for (slot, &delta) in output.iter_mut().zip(deltas) {
            current = current.wrapping_add(delta);
            *slot = current;
        }
    }
}
519
/// Prefix matching over keys stored in a columnar key lane.
pub struct SimdKeyComparator;

impl SimdKeyComparator {
    /// Reports, for every key, whether it starts with `prefix`.
    ///
    /// Key `i` is the `key_lens[i]` bytes of `key_data` beginning at
    /// `key_offsets[i]`. The result has one flag per element of `key_lens`.
    /// An empty `prefix` matches every key.
    ///
    /// A key is reported as non-matching, rather than causing a panic, when
    /// its offset is missing from `key_offsets` or when `key_data` is too
    /// short to hold `prefix` at that offset.
    ///
    /// The name is historical: the implementation is plain scalar code and is
    /// the same on every target, so the former per-architecture duplicates
    /// were merged into this single function.
    pub fn match_prefix_avx2(
        key_lens: &[u16],
        key_data: &[u8],
        key_offsets: &[u32],
        prefix: &[u8],
    ) -> Vec<bool> {
        if prefix.is_empty() {
            return vec![true; key_lens.len()];
        }

        let mut results = vec![false; key_lens.len()];
        // Zipping stops at the shorter of the two metadata slices; keys
        // without an offset keep their initial `false`.
        for ((hit, &len), &offset) in results.iter_mut().zip(key_lens).zip(key_offsets) {
            if usize::from(len) < prefix.len() {
                continue;
            }
            *hit = key_data
                .get(offset as usize..)
                .map_or(false, |key| key.starts_with(prefix));
        }
        results
    }
}
580
581pub struct ColumnarWalWriter<W: Write> {
587 writer: W,
589 current_block: ColumnarWalBlock,
591 sequence: AtomicU64,
593 bytes_written: AtomicU64,
595 blocks_written: AtomicU64,
597}
598
599impl<W: Write> ColumnarWalWriter<W> {
600 pub fn new(writer: W) -> Self {
602 Self::with_batch_size(writer, DEFAULT_BATCH_SIZE)
603 }
604
605 pub fn with_batch_size(writer: W, batch_size: usize) -> Self {
607 Self {
608 writer,
609 current_block: ColumnarWalBlock::with_batch_size(batch_size),
610 sequence: AtomicU64::new(0),
611 bytes_written: AtomicU64::new(0),
612 blocks_written: AtomicU64::new(0),
613 }
614 }
615
616 pub fn write_entry(&mut self, entry: WalEntry) -> io::Result<()> {
618 if !self.current_block.add_entry(entry.clone()) {
619 self.flush_block()?;
621 if !self.current_block.add_entry(entry) {
623 return Err(io::Error::new(io::ErrorKind::InvalidData, "entry too large for block"));
624 }
625 }
626 Ok(())
627 }
628
629 pub fn flush_block(&mut self) -> io::Result<()> {
631 if self.current_block.is_empty() {
632 return Ok(());
633 }
634
635 let data = self.current_block.serialize();
636 self.writer.write_all(&data)?;
637
638 self.bytes_written.fetch_add(data.len() as u64, Ordering::Relaxed);
639 self.blocks_written.fetch_add(1, Ordering::Relaxed);
640 self.sequence.fetch_add(1, Ordering::Relaxed);
641
642 self.current_block.clear();
643 Ok(())
644 }
645
646 pub fn flush(&mut self) -> io::Result<()> {
648 self.flush_block()?;
649 self.writer.flush()
650 }
651
652 pub fn stats(&self) -> WalWriterStats {
654 WalWriterStats {
655 bytes_written: self.bytes_written.load(Ordering::Relaxed),
656 blocks_written: self.blocks_written.load(Ordering::Relaxed),
657 current_block_entries: self.current_block.len(),
658 }
659 }
660}
661
/// Point-in-time counters reported by `ColumnarWalWriter::stats`.
///
/// `PartialEq`/`Eq` allow snapshots to be compared directly and `Default`
/// yields the all-zero snapshot.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalWriterStats {
    /// Total serialized bytes written so far.
    pub bytes_written: u64,
    /// Number of blocks written so far.
    pub blocks_written: u64,
    /// Entries buffered in the block that has not been written yet.
    pub current_block_entries: usize,
}
672
673pub struct ColumnarWalReader<R: Read> {
679 reader: R,
681 current_block: Option<ColumnarWalBlock>,
683 current_pos: usize,
685}
686
687impl<R: Read> ColumnarWalReader<R> {
688 pub fn new(reader: R) -> Self {
690 Self {
691 reader,
692 current_block: None,
693 current_pos: 0,
694 }
695 }
696
697 pub fn next_entry(&mut self) -> io::Result<Option<WalEntry>> {
699 if let Some(ref block) = self.current_block {
701 if self.current_pos < block.len() {
702 let entry = block.entries()[self.current_pos].clone();
703 self.current_pos += 1;
704 return Ok(Some(entry));
705 }
706 }
707
708 match self.read_block()? {
710 Some(block) => {
711 if block.is_empty() {
712 return Ok(None);
713 }
714 let entry = block.entries()[0].clone();
715 self.current_block = Some(block);
716 self.current_pos = 1;
717 Ok(Some(entry))
718 }
719 None => Ok(None),
720 }
721 }
722
723 fn read_block(&mut self) -> io::Result<Option<ColumnarWalBlock>> {
725 let header_size = std::mem::size_of::<BlockHeader>();
726 let mut header_buf = vec![0u8; header_size];
727
728 match self.reader.read_exact(&mut header_buf) {
729 Ok(_) => {}
730 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
731 Err(e) => return Err(e),
732 }
733
734 let header = unsafe { &*(header_buf.as_ptr() as *const BlockHeader) };
736
737 if header.magic != COLUMNAR_WAL_MAGIC {
738 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
739 }
740
741 let _remaining = header.block_size as usize - header_size;
742 let mut block_data = header_buf;
743 block_data.resize(header.block_size as usize, 0);
744 self.reader.read_exact(&mut block_data[header_size..])?;
745
746 ColumnarWalBlock::deserialize(&block_data).map(Some)
747 }
748
749 pub fn read_all(&mut self) -> io::Result<Vec<WalEntry>> {
751 let mut entries = Vec::new();
752 while let Some(entry) = self.next_entry()? {
753 entries.push(entry);
754 }
755 Ok(entries)
756 }
757}
758
/// Returns the CRC-32 checksum of `data`, as computed by `crc32fast::hash`.
///
/// Used for the `checksum` field of a block header, which covers the bytes
/// that follow the header.
fn crc32_simple(data: &[u8]) -> u32 {
    crc32fast::hash(data)
}
770
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// `WalEntry::put` stores every argument in the matching field.
    #[test]
    fn test_wal_entry_creation() {
        let entry = WalEntry::put(1, 100, b"key".to_vec(), b"value".to_vec());
        assert_eq!(entry.op, WalOpType::Put);
        assert_eq!(entry.txn_id, 1);
        assert_eq!(entry.timestamp, 100);
        assert_eq!(entry.key, b"key");
        assert_eq!(entry.value, b"value");
    }

    /// A block survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_block_serialize_deserialize() {
        let mut block = ColumnarWalBlock::new();

        for i in 0..10 {
            let entry = WalEntry::put(
                i,
                100 + i,
                format!("key{}", i).into_bytes(),
                format!("value{}", i).into_bytes(),
            );
            assert!(block.add_entry(entry));
        }

        let data = block.serialize();
        let decoded = ColumnarWalBlock::deserialize(&data).unwrap();

        assert_eq!(decoded.len(), 10);
        for (i, entry) in decoded.entries().iter().enumerate() {
            assert_eq!(entry.txn_id, i as u64);
            assert_eq!(entry.timestamp, 100 + i as u64);
            assert_eq!(entry.key, format!("key{}", i).into_bytes());
            assert_eq!(entry.value, format!("value{}", i).into_bytes());
        }
    }

    /// A block refuses entries once it holds `batch_size` of them.
    #[test]
    fn test_block_full() {
        let mut block = ColumnarWalBlock::with_batch_size(5);

        for i in 0..5 {
            let entry = WalEntry::put(i, i * 10, vec![i as u8], vec![]);
            assert!(block.add_entry(entry));
        }

        assert!(block.is_full());

        let entry = WalEntry::put(5, 50, vec![5], vec![]);
        assert!(!block.add_entry(entry));
    }

    /// Entries written across several blocks are read back in order.
    #[test]
    fn test_writer_reader_roundtrip() {
        let mut buffer = Vec::new();

        {
            // 25 entries with a batch size of 10 span three blocks.
            let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(&mut buffer), 10);

            for i in 0..25 {
                let entry = WalEntry::put(
                    i,
                    1000 + i,
                    format!("key_{}", i).into_bytes(),
                    format!("value_{}", i).into_bytes(),
                );
                writer.write_entry(entry).unwrap();
            }

            writer.flush().unwrap();
        }

        let mut reader = ColumnarWalReader::new(Cursor::new(&buffer));
        let entries = reader.read_all().unwrap();

        assert_eq!(entries.len(), 25);
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.txn_id, i as u64);
            assert_eq!(entry.timestamp, 1000 + i as u64);
            assert_eq!(entry.key, format!("key_{}", i).into_bytes());
            assert_eq!(entry.value, format!("value_{}", i).into_bytes());
        }
    }

    /// The scalar decoder produces the running sum seeded with the base.
    #[test]
    fn test_timestamp_decoder() {
        let decoder = SimdTimestampDecoder::new(1000);
        let deltas = vec![10, 20, 30, 40, 50, 60, 70, 80];
        let mut output = vec![0u64; 8];

        decoder.decode_deltas_scalar(&deltas, &mut output);

        assert_eq!(output, vec![1010, 1030, 1060, 1100, 1150, 1210, 1280, 1360]);
    }

    /// The dispatching entry point agrees with the scalar decoder for empty
    /// input, for lengths below four and for lengths that are not a multiple
    /// of four.
    #[test]
    fn test_timestamp_decoder_dispatch_matches_scalar() {
        let decoder = SimdTimestampDecoder::new(1000);

        for &len in &[0usize, 1, 3, 4, 7, 8] {
            let deltas: Vec<u64> = (1..=len as u64).map(|d| d * 10).collect();
            let mut scalar = vec![0u64; len];
            let mut dispatched = vec![0u64; len];

            decoder.decode_deltas_scalar(&deltas, &mut scalar);
            decoder.decode_deltas_avx2(&deltas, &mut dispatched);

            assert_eq!(dispatched, scalar);
        }
    }

    /// Prefix matching honours both the prefix bytes and the key lengths.
    #[test]
    fn test_key_comparator() {
        let key_lens = vec![4u16, 5, 4, 6, 4];
        let key_data = b"key1key12key3key123key4";
        let key_offsets = vec![0u32, 4, 9, 13, 19];

        let results = SimdKeyComparator::match_prefix_avx2(
            &key_lens,
            key_data,
            &key_offsets,
            b"key",
        );

        assert!(results.iter().all(|&r| r));

        let results = SimdKeyComparator::match_prefix_avx2(
            &key_lens,
            key_data,
            &key_offsets,
            b"key1",
        );

        assert_eq!(results, vec![true, true, false, true, false]);
    }

    /// Statistics reflect buffered entries before a flush and written blocks
    /// after it.
    #[test]
    fn test_writer_stats() {
        let buffer = Vec::new();
        let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(buffer), 10);

        for i in 0..5 {
            writer.write_entry(WalEntry::put(i, i, vec![0], vec![0])).unwrap();
        }

        let stats = writer.stats();
        assert_eq!(stats.current_block_entries, 5);
        assert_eq!(stats.blocks_written, 0);

        writer.flush().unwrap();

        let stats = writer.stats();
        assert_eq!(stats.current_block_entries, 0);
        assert_eq!(stats.blocks_written, 1);
    }

    /// Pins the checksum of a known input.
    #[test]
    fn test_crc32() {
        let data = b"hello world";
        let crc = crc32_simple(data);
        assert_eq!(crc, 0x0D4A1185);
    }
}