use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicU64, Ordering};
/// Magic tag written at the start of every serialized block header and checked
/// when a block is read back; the first two bytes are ASCII `C` and `W`.
const COLUMNAR_WAL_MAGIC: [u8; 4] = [0x43, 0x57, 0x01, 0x00];
/// Entry capacity used by `ColumnarWalBlock::new` and assigned to blocks
/// rebuilt by `ColumnarWalBlock::deserialize`.
const DEFAULT_BATCH_SIZE: usize = 256;
/// Largest key length, in bytes, that `ColumnarWalBlock::serialize` stores per entry.
const MAX_KEY_SIZE: usize = 256;
// Not referenced by the code visible in this file, hence the lint suppression.
// NOTE(review): presumably the intended cap on value size (1 MiB) — confirm
// before enforcing it anywhere.
#[allow(dead_code)]
const MAX_VALUE_SIZE: usize = 1024 * 1024;
/// Kind of operation a WAL entry records.
///
/// The explicit `u8` discriminant is the byte written for the entry when a
/// block is serialized, so the numeric values must stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalOpType {
    Put = 0,
    Delete = 1,
    BeginTxn = 2,
    CommitTxn = 3,
    AbortTxn = 4,
    Checkpoint = 5,
}

impl WalOpType {
    /// Every variant, positioned so that its index equals its discriminant.
    const BY_DISCRIMINANT: [Self; 6] = [
        Self::Put,
        Self::Delete,
        Self::BeginTxn,
        Self::CommitTxn,
        Self::AbortTxn,
        Self::Checkpoint,
    ];

    /// Maps a raw discriminant byte back to its variant.
    ///
    /// Returns `None` for any byte that does not name a variant.
    fn from_u8(v: u8) -> Option<Self> {
        Self::BY_DISCRIMINANT.get(usize::from(v)).copied()
    }
}
/// A single logical record of the write-ahead log.
///
/// `Debug`, `PartialEq` and `Eq` are derived so entries can be logged and
/// compared directly (for example in `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalEntry {
    /// Operation this record represents.
    pub op: WalOpType,
    /// Identifier of the transaction the record belongs to.
    pub txn_id: u64,
    /// Caller-supplied timestamp; this type does not interpret its unit.
    pub timestamp: u64,
    /// Key bytes; empty for the transaction-marker records built here.
    pub key: Vec<u8>,
    /// Value bytes; empty for deletes and transaction markers built here.
    pub value: Vec<u8>,
}

impl WalEntry {
    /// Builds a `Put` record carrying both a key and a value.
    pub fn put(txn_id: u64, timestamp: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
        Self {
            op: WalOpType::Put,
            txn_id,
            timestamp,
            key,
            value,
        }
    }

    /// Builds a `Delete` record for `key`; the value is left empty.
    pub fn delete(txn_id: u64, timestamp: u64, key: Vec<u8>) -> Self {
        Self {
            op: WalOpType::Delete,
            txn_id,
            timestamp,
            key,
            value: Vec::new(),
        }
    }

    /// Builds a `BeginTxn` marker with empty key and value.
    pub fn begin_txn(txn_id: u64, timestamp: u64) -> Self {
        Self::marker(WalOpType::BeginTxn, txn_id, timestamp)
    }

    /// Builds a `CommitTxn` marker with empty key and value.
    pub fn commit_txn(txn_id: u64, timestamp: u64) -> Self {
        Self::marker(WalOpType::CommitTxn, txn_id, timestamp)
    }

    /// Builds an `AbortTxn` marker with empty key and value.
    ///
    /// Mirrors [`WalEntry::begin_txn`] and [`WalEntry::commit_txn`] so that
    /// every transaction-boundary operation has a constructor.
    pub fn abort_txn(txn_id: u64, timestamp: u64) -> Self {
        Self::marker(WalOpType::AbortTxn, txn_id, timestamp)
    }

    /// Shared body of the payload-free constructors.
    fn marker(op: WalOpType, txn_id: u64, timestamp: u64) -> Self {
        Self {
            op,
            txn_id,
            timestamp,
            key: Vec::new(),
            value: Vec::new(),
        }
    }
}
/// Fixed-size header placed at the start of every serialized block.
///
/// `repr(C, packed)` removes all padding, so the struct occupies 44 bytes and
/// its in-memory bytes are copied verbatim to and from the serialized form.
/// Multi-byte header fields therefore use the host's native byte order, and
/// the field order and widths below are part of the on-disk format.
#[derive(Clone, Copy)]
#[repr(C, packed)]
struct BlockHeader {
/// Must equal `COLUMNAR_WAL_MAGIC` for the block to be accepted.
magic: [u8; 4],
/// Format version; written as 1 and not checked by the visible readers.
version: u8,
/// Number of entries stored in the block.
entry_count: u16,
/// Written as 0.
_reserved: u8,
// Each `*_offset` is a byte offset measured from the start of the block
// (header included) to the first byte of the corresponding lane.
/// One byte per entry: the `WalOpType` discriminant.
op_lane_offset: u32,
/// Eight little-endian bytes per entry: the transaction id.
txn_lane_offset: u32,
/// Eight little-endian bytes per entry: the timestamp.
ts_lane_offset: u32,
/// Two little-endian bytes per entry: the stored key length.
key_len_lane_offset: u32,
/// Key bytes of all entries, concatenated in entry order.
key_data_lane_offset: u32,
/// Four little-endian bytes per entry: the value length.
value_len_lane_offset: u32,
/// Value bytes of all entries, concatenated in entry order.
value_data_lane_offset: u32,
/// Total length of the serialized block in bytes, header included.
block_size: u32,
/// CRC-32 of every block byte that follows the header.
checksum: u32,
}
/// In-memory batch of WAL entries that is serialized as one columnar block.
pub struct ColumnarWalBlock {
/// Buffered entries, in insertion order.
entries: Vec<WalEntry>,
/// Number of entries at which the block reports itself as full.
batch_size: usize,
}
impl ColumnarWalBlock {
    /// Creates an empty block that holds up to `DEFAULT_BATCH_SIZE` entries.
    pub fn new() -> Self {
        Self::with_batch_size(DEFAULT_BATCH_SIZE)
    }

    /// Creates an empty block that accepts at most `batch_size` entries.
    ///
    /// The block header stores the entry count in a `u16`, so `batch_size` is
    /// clamped to `u16::MAX`; a larger block could not be serialized without
    /// silently dropping entries.
    pub fn with_batch_size(batch_size: usize) -> Self {
        let batch_size = batch_size.min(usize::from(u16::MAX));
        Self {
            entries: Vec::with_capacity(batch_size),
            batch_size,
        }
    }

    /// Appends `entry` to the block.
    ///
    /// Returns `false`, leaving the block untouched, when the block is already
    /// full or when the entry's key is longer than `MAX_KEY_SIZE` bytes. Such a
    /// key cannot be stored losslessly: serialization caps the stored key
    /// length at `MAX_KEY_SIZE`, so accepting it would truncate the key.
    pub fn add_entry(&mut self, entry: WalEntry) -> bool {
        if self.is_full() || entry.key.len() > MAX_KEY_SIZE {
            return false;
        }
        self.entries.push(entry);
        true
    }

    /// Returns `true` once the block holds `batch_size` entries.
    pub fn is_full(&self) -> bool {
        self.entries.len() >= self.batch_size
    }

    /// Number of buffered entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no entries are buffered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Buffered entries in insertion order.
    pub fn entries(&self) -> &[WalEntry] {
        &self.entries
    }

    /// Encodes the buffered entries as one columnar block.
    ///
    /// Layout: header, op lane (1 byte per entry), transaction-id lane and
    /// timestamp lane (8 little-endian bytes per entry each), key-length lane
    /// (2 LE bytes per entry), concatenated key bytes, value-length lane
    /// (4 LE bytes per entry), concatenated value bytes. The header records
    /// the absolute offset of each lane, the total size, and a CRC-32 of
    /// everything after the header.
    ///
    /// An empty block serializes to an empty vector (no header is written).
    pub fn serialize(&self) -> Vec<u8> {
        if self.entries.is_empty() {
            return Vec::new();
        }
        let entry_count = self.entries.len();
        let header_size = std::mem::size_of::<BlockHeader>();

        // Keys are capped defensively; `add_entry` already refuses longer keys.
        let key_bytes: usize = self
            .entries
            .iter()
            .map(|e| e.key.len().min(MAX_KEY_SIZE))
            .sum();
        let value_bytes: usize = self.entries.iter().map(|e| e.value.len()).sum();
        // Fixed-width lanes cost 1 + 8 + 8 + 2 + 4 bytes per entry.
        let body_size = entry_count * (1 + 8 + 8 + 2 + 4) + key_bytes + value_bytes;

        // Reserve the header slot up front; it is filled in once the lane
        // offsets and the checksum are known.
        let mut buffer = vec![0u8; header_size];
        buffer.reserve(body_size);

        let op_lane_offset = buffer.len() as u32;
        buffer.extend(self.entries.iter().map(|e| e.op as u8));

        let txn_lane_offset = buffer.len() as u32;
        for entry in &self.entries {
            buffer.extend_from_slice(&entry.txn_id.to_le_bytes());
        }

        let ts_lane_offset = buffer.len() as u32;
        for entry in &self.entries {
            buffer.extend_from_slice(&entry.timestamp.to_le_bytes());
        }

        let key_len_lane_offset = buffer.len() as u32;
        for entry in &self.entries {
            let stored_len = entry.key.len().min(MAX_KEY_SIZE) as u16;
            buffer.extend_from_slice(&stored_len.to_le_bytes());
        }

        let key_data_lane_offset = buffer.len() as u32;
        for entry in &self.entries {
            let stored_len = entry.key.len().min(MAX_KEY_SIZE);
            buffer.extend_from_slice(&entry.key[..stored_len]);
        }

        let value_len_lane_offset = buffer.len() as u32;
        for entry in &self.entries {
            buffer.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
        }

        let value_data_lane_offset = buffer.len() as u32;
        for entry in &self.entries {
            buffer.extend_from_slice(&entry.value);
        }

        let block_size = buffer.len() as u32;
        let checksum = crc32_simple(&buffer[header_size..]);

        // `with_batch_size` clamps the capacity and `deserialize` reads the
        // count from a u16, so the narrowing below cannot lose entries.
        debug_assert!(entry_count <= usize::from(u16::MAX));
        let header = BlockHeader {
            magic: COLUMNAR_WAL_MAGIC,
            version: 1,
            entry_count: entry_count as u16,
            _reserved: 0,
            op_lane_offset,
            txn_lane_offset,
            ts_lane_offset,
            key_len_lane_offset,
            key_data_lane_offset,
            value_len_lane_offset,
            value_data_lane_offset,
            block_size,
            checksum,
        };
        // SAFETY: `header` is a live, fully initialised `BlockHeader`. The type
        // is `repr(C, packed)` and made only of integers and a byte array, so
        // it has no padding and all `header_size` bytes are initialised. The
        // slice is consumed before `header` goes out of scope.
        let header_bytes = unsafe {
            std::slice::from_raw_parts(&header as *const BlockHeader as *const u8, header_size)
        };
        buffer[..header_size].copy_from_slice(header_bytes);
        buffer
    }

    /// Decodes one block produced by [`ColumnarWalBlock::serialize`].
    ///
    /// `data` must start with a block header; bytes beyond the header's
    /// `block_size` are ignored. The returned block uses `DEFAULT_BATCH_SIZE`
    /// as its capacity.
    ///
    /// # Errors
    ///
    /// Returns `io::ErrorKind::InvalidData` when the buffer is shorter than a
    /// header, the magic does not match, `block_size` is smaller than the
    /// header or larger than `data`, the checksum does not match, any lane
    /// described by the header does not fit inside the block, or an op byte is
    /// not a known `WalOpType`. The checksum does not cover the header itself,
    /// so every header-supplied offset is bounds-checked rather than trusted.
    pub fn deserialize(data: &[u8]) -> io::Result<Self> {
        let header_size = std::mem::size_of::<BlockHeader>();
        if data.len() < header_size {
            return Err(Self::invalid("buffer too small"));
        }
        // SAFETY: `data` holds at least `size_of::<BlockHeader>()` initialised
        // bytes (checked above), `read_unaligned` has no alignment
        // requirement, and every bit pattern is a valid `BlockHeader` because
        // it consists solely of integers and a byte array.
        let header: BlockHeader =
            unsafe { std::ptr::read_unaligned(data.as_ptr() as *const BlockHeader) };

        let magic = header.magic;
        if magic != COLUMNAR_WAL_MAGIC {
            return Err(Self::invalid("invalid magic"));
        }

        let block_size = header.block_size as usize;
        if block_size < header_size || block_size > data.len() {
            return Err(Self::invalid("block size out of range"));
        }
        let block = &data[..block_size];

        let expected_checksum = header.checksum;
        if crc32_simple(&block[header_size..]) != expected_checksum {
            return Err(Self::invalid("checksum mismatch"));
        }

        let entry_count = usize::from(header.entry_count);
        let op_lane = Self::fixed_lane(block, header.op_lane_offset, entry_count, 1)?;
        let txn_lane = Self::fixed_lane(block, header.txn_lane_offset, entry_count, 8)?;
        let ts_lane = Self::fixed_lane(block, header.ts_lane_offset, entry_count, 8)?;
        let key_len_lane = Self::fixed_lane(block, header.key_len_lane_offset, entry_count, 2)?;
        let value_len_lane =
            Self::fixed_lane(block, header.value_len_lane_offset, entry_count, 4)?;
        let key_data = Self::tail(block, header.key_data_lane_offset)?;
        let value_data = Self::tail(block, header.value_data_lane_offset)?;

        let mut entries = Vec::with_capacity(entry_count);
        let mut key_pos = 0usize;
        let mut value_pos = 0usize;
        let columns = op_lane
            .iter()
            .zip(txn_lane.chunks_exact(8))
            .zip(ts_lane.chunks_exact(8))
            .zip(key_len_lane.chunks_exact(2))
            .zip(value_len_lane.chunks_exact(4));
        for ((((&op_byte, txn), ts), key_len), value_len) in columns {
            let op = WalOpType::from_u8(op_byte).ok_or_else(|| Self::invalid("invalid op"))?;
            let key_len = usize::from(u16::from_le_bytes([key_len[0], key_len[1]]));
            let value_len = u32::from_le_bytes([
                value_len[0],
                value_len[1],
                value_len[2],
                value_len[3],
            ]) as usize;
            let key = Self::take(key_data, &mut key_pos, key_len, "key data out of bounds")?;
            let value = Self::take(
                value_data,
                &mut value_pos,
                value_len,
                "value data out of bounds",
            )?;
            entries.push(WalEntry {
                op,
                txn_id: Self::le_u64(txn),
                timestamp: Self::le_u64(ts),
                key: key.to_vec(),
                value: value.to_vec(),
            });
        }

        Ok(Self {
            entries,
            batch_size: DEFAULT_BATCH_SIZE,
        })
    }

    /// Removes all buffered entries, keeping the allocated capacity.
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Builds the `InvalidData` error used for every malformed-block condition.
    fn invalid(message: &'static str) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, message)
    }

    /// Returns the `count * width` bytes of a fixed-width lane starting at `offset`.
    fn fixed_lane(block: &[u8], offset: u32, count: usize, width: usize) -> io::Result<&[u8]> {
        let start = offset as usize;
        count
            .checked_mul(width)
            .and_then(|len| start.checked_add(len))
            .and_then(|end| block.get(start..end))
            .ok_or_else(|| Self::invalid("lane out of bounds"))
    }

    /// Returns everything from `offset` to the end of the block.
    fn tail(block: &[u8], offset: u32) -> io::Result<&[u8]> {
        block
            .get(offset as usize..)
            .ok_or_else(|| Self::invalid("lane out of bounds"))
    }

    /// Takes the next `len` bytes of a variable-width lane and advances `pos`.
    fn take<'a>(
        lane: &'a [u8],
        pos: &mut usize,
        len: usize,
        what: &'static str,
    ) -> io::Result<&'a [u8]> {
        let end = pos.checked_add(len).ok_or_else(|| Self::invalid(what))?;
        let bytes = lane.get(*pos..end).ok_or_else(|| Self::invalid(what))?;
        *pos = end;
        Ok(bytes)
    }

    /// Decodes an exactly-8-byte little-endian chunk.
    fn le_u64(bytes: &[u8]) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        u64::from_le_bytes(raw)
    }
}
impl Default for ColumnarWalBlock {
fn default() -> Self {
Self::new()
}
}
/// Rebuilds absolute timestamps from a sequence of deltas.
///
/// Each output is the running sum of the base timestamp and all deltas up to
/// and including that position, using wrapping `u64` arithmetic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SimdTimestampDecoder {
    /// Value the running sum starts from.
    base_ts: u64,
}

impl SimdTimestampDecoder {
    /// Creates a decoder whose running sum starts at `base_ts`.
    pub fn new(base_ts: u64) -> Self {
        Self { base_ts }
    }

    /// Decodes `deltas` into `output`; identical in result to
    /// [`SimdTimestampDecoder::decode_deltas_scalar`].
    ///
    /// The previous x86_64-only path issued a 256-bit load whose result was
    /// discarded and then ran the same sequential loop, so it added `unsafe`
    /// code and runtime feature detection without vectorising anything. The
    /// running sum is a serial dependency chain; until a real vector kernel
    /// exists this entry point simply delegates on every target.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`.
    pub fn decode_deltas_avx2(&self, deltas: &[u64], output: &mut [u64]) {
        self.decode_deltas_scalar(deltas, output);
    }

    /// Writes the running sum of `deltas`, seeded with the base timestamp,
    /// into the first `deltas.len()` slots of `output`.
    ///
    /// Additions wrap on overflow. Slots of `output` beyond `deltas.len()` are
    /// left untouched.
    ///
    /// # Panics
    ///
    /// Panics if `output` is shorter than `deltas`. The check happens before
    /// anything is written, so a failing call never leaves `output` partially
    /// updated.
    pub fn decode_deltas_scalar(&self, deltas: &[u64], output: &mut [u64]) {
        assert!(
            output.len() >= deltas.len(),
            "output buffer is shorter than the delta input"
        );
        let mut current = self.base_ts;
        for (slot, &delta) in output.iter_mut().zip(deltas) {
            current = current.wrapping_add(delta);
            *slot = current;
        }
    }
}
/// Prefix matching over keys stored in columnar form.
pub struct SimdKeyComparator;

impl SimdKeyComparator {
    /// Reports, for each key, whether it starts with `prefix`.
    ///
    /// Key `i` is described by its length `key_lens[i]` and its starting byte
    /// offset `key_offsets[i]` inside `key_data`. The result has one element
    /// per entry of `key_lens`.
    ///
    /// * An empty `prefix` matches every key.
    /// * A key shorter than `prefix` never matches.
    /// * A key whose offset is missing from `key_offsets`, or whose first
    ///   `prefix.len()` bytes would fall outside `key_data`, is reported as
    ///   not matching instead of panicking.
    ///
    /// Despite the name, the comparison is a plain slice equality check on
    /// every target; the two former `cfg`-gated copies were byte-for-byte
    /// identical and have been merged into this single function.
    pub fn match_prefix_avx2(
        key_lens: &[u16],
        key_data: &[u8],
        key_offsets: &[u32],
        prefix: &[u8],
    ) -> Vec<bool> {
        if prefix.is_empty() {
            return vec![true; key_lens.len()];
        }
        key_lens
            .iter()
            .enumerate()
            .map(|(i, &len)| {
                if usize::from(len) < prefix.len() {
                    return false;
                }
                let start = match key_offsets.get(i) {
                    Some(&offset) => offset as usize,
                    None => return false,
                };
                let end = match start.checked_add(prefix.len()) {
                    Some(end) => end,
                    None => return false,
                };
                key_data.get(start..end) == Some(prefix)
            })
            .collect()
    }
}
/// Buffers WAL entries into a block and writes each serialized block to `W`.
pub struct ColumnarWalWriter<W: Write> {
/// Destination for serialized blocks.
writer: W,
/// Block currently being filled; written out by `flush_block`.
current_block: ColumnarWalBlock,
/// Incremented once per block written; not read by the visible code.
sequence: AtomicU64,
/// Total serialized bytes handed to `writer` so far.
bytes_written: AtomicU64,
/// Number of blocks written so far.
blocks_written: AtomicU64,
}
impl<W: Write> ColumnarWalWriter<W> {
    /// Creates a writer whose blocks hold up to `DEFAULT_BATCH_SIZE` entries.
    pub fn new(writer: W) -> Self {
        Self::with_batch_size(writer, DEFAULT_BATCH_SIZE)
    }

    /// Creates a writer whose blocks hold up to `batch_size` entries.
    pub fn with_batch_size(writer: W, batch_size: usize) -> Self {
        Self {
            writer,
            current_block: ColumnarWalBlock::with_batch_size(batch_size),
            sequence: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            blocks_written: AtomicU64::new(0),
        }
    }

    /// Buffers `entry`, first writing out the current block if it is full.
    ///
    /// The entry is moved into the block. The fullness check happens up front
    /// so the entry never has to be cloned just to retry the insertion after
    /// a flush.
    ///
    /// # Errors
    ///
    /// Propagates any error from writing the full block. Returns
    /// `io::ErrorKind::InvalidData` ("entry too large for block") when the
    /// block still refuses the entry after being flushed, which happens for
    /// example with a batch size of zero.
    pub fn write_entry(&mut self, entry: WalEntry) -> io::Result<()> {
        if self.current_block.is_full() {
            self.flush_block()?;
        }
        if self.current_block.add_entry(entry) {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "entry too large for block",
            ))
        }
    }

    /// Serializes the current block, writes it, and starts a new empty block.
    ///
    /// Does nothing when no entries are buffered. The underlying writer is
    /// not flushed; use [`ColumnarWalWriter::flush`] for that.
    ///
    /// # Errors
    ///
    /// Returns the writer's error if the write fails. In that case the
    /// entries stay buffered and the counters are not updated.
    pub fn flush_block(&mut self) -> io::Result<()> {
        if self.current_block.is_empty() {
            return Ok(());
        }
        let encoded = self.current_block.serialize();
        self.writer.write_all(&encoded)?;

        self.bytes_written
            .fetch_add(encoded.len() as u64, Ordering::Relaxed);
        self.blocks_written.fetch_add(1, Ordering::Relaxed);
        self.sequence.fetch_add(1, Ordering::Relaxed);
        self.current_block.clear();
        Ok(())
    }

    /// Writes out any buffered entries and then flushes the underlying writer.
    pub fn flush(&mut self) -> io::Result<()> {
        self.flush_block()?;
        self.writer.flush()
    }

    /// Returns a snapshot of the byte and block counters plus the number of
    /// entries waiting in the current block.
    pub fn stats(&self) -> WalWriterStats {
        WalWriterStats {
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            blocks_written: self.blocks_written.load(Ordering::Relaxed),
            current_block_entries: self.current_block.len(),
        }
    }
}
/// Point-in-time counters reported by `ColumnarWalWriter::stats`.
#[derive(Debug, Clone)]
pub struct WalWriterStats {
/// Total serialized bytes written so far.
pub bytes_written: u64,
/// Number of blocks written so far.
pub blocks_written: u64,
/// Entries buffered in the block that has not been written yet.
pub current_block_entries: usize,
}
/// Reads serialized blocks from `R` and yields their entries one at a time.
pub struct ColumnarWalReader<R: Read> {
/// Source of serialized blocks.
reader: R,
/// Most recently decoded block, if any block has been read yet.
current_block: Option<ColumnarWalBlock>,
/// Index within `current_block` of the next entry to return.
current_pos: usize,
}
impl<R: Read> ColumnarWalReader<R> {
pub fn new(reader: R) -> Self {
Self {
reader,
current_block: None,
current_pos: 0,
}
}
pub fn next_entry(&mut self) -> io::Result<Option<WalEntry>> {
if let Some(ref block) = self.current_block {
if self.current_pos < block.len() {
let entry = block.entries()[self.current_pos].clone();
self.current_pos += 1;
return Ok(Some(entry));
}
}
match self.read_block()? {
Some(block) => {
if block.is_empty() {
return Ok(None);
}
let entry = block.entries()[0].clone();
self.current_block = Some(block);
self.current_pos = 1;
Ok(Some(entry))
}
None => Ok(None),
}
}
fn read_block(&mut self) -> io::Result<Option<ColumnarWalBlock>> {
let header_size = std::mem::size_of::<BlockHeader>();
let mut header_buf = vec![0u8; header_size];
match self.reader.read_exact(&mut header_buf) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e),
}
let header = unsafe { &*(header_buf.as_ptr() as *const BlockHeader) };
if header.magic != COLUMNAR_WAL_MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid magic"));
}
let _remaining = header.block_size as usize - header_size;
let mut block_data = header_buf;
block_data.resize(header.block_size as usize, 0);
self.reader.read_exact(&mut block_data[header_size..])?;
ColumnarWalBlock::deserialize(&block_data).map(Some)
}
pub fn read_all(&mut self) -> io::Result<Vec<WalEntry>> {
let mut entries = Vec::new();
while let Some(entry) = self.next_entry()? {
entries.push(entry);
}
Ok(entries)
}
}
/// Computes the CRC-32 checksum of `data` with the `crc32fast` crate.
fn crc32_simple(data: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(data);
    hasher.finalize()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// `WalEntry::put` stores every argument in the matching field.
    #[test]
    fn test_wal_entry_creation() {
        let entry = WalEntry::put(1, 100, b"key".to_vec(), b"value".to_vec());
        assert_eq!(entry.op, WalOpType::Put);
        assert_eq!(entry.txn_id, 1);
        assert_eq!(entry.timestamp, 100);
        assert_eq!(entry.key, b"key");
        assert_eq!(entry.value, b"value");
    }

    /// A block survives a serialize/deserialize round trip unchanged.
    #[test]
    fn test_block_serialize_deserialize() {
        let mut block = ColumnarWalBlock::new();
        for i in 0..10 {
            let entry = WalEntry::put(
                i,
                100 + i,
                format!("key{}", i).into_bytes(),
                format!("value{}", i).into_bytes(),
            );
            assert!(block.add_entry(entry));
        }
        let data = block.serialize();
        let decoded = ColumnarWalBlock::deserialize(&data).unwrap();
        assert_eq!(decoded.len(), 10);
        for (i, entry) in decoded.entries().iter().enumerate() {
            assert_eq!(entry.txn_id, i as u64);
            assert_eq!(entry.timestamp, 100 + i as u64);
            assert_eq!(entry.key, format!("key{}", i).into_bytes());
            assert_eq!(entry.value, format!("value{}", i).into_bytes());
        }
    }

    /// An empty block produces no bytes at all.
    #[test]
    fn test_empty_block_serializes_to_nothing() {
        let block = ColumnarWalBlock::new();
        assert!(block.is_empty());
        assert!(block.serialize().is_empty());
    }

    /// A buffer shorter than the header is rejected, not read out of bounds.
    #[test]
    fn test_deserialize_rejects_short_buffer() {
        let err = ColumnarWalBlock::deserialize(&[0u8; 4])
            .err()
            .expect("a 4-byte buffer cannot hold a block header");
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    /// A header-sized buffer with the wrong magic is rejected.
    #[test]
    fn test_deserialize_rejects_bad_magic() {
        let err = ColumnarWalBlock::deserialize(&[0u8; 64])
            .err()
            .expect("an all-zero buffer does not start with the magic");
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    /// Once `batch_size` entries are stored, further entries are refused.
    #[test]
    fn test_block_full() {
        let mut block = ColumnarWalBlock::with_batch_size(5);
        for i in 0..5 {
            let entry = WalEntry::put(i, i * 10, vec![i as u8], vec![]);
            assert!(block.add_entry(entry));
        }
        assert!(block.is_full());
        let entry = WalEntry::put(5, 50, vec![5], vec![]);
        assert!(!block.add_entry(entry));
    }

    /// Entries written across several blocks are read back in order.
    #[test]
    fn test_writer_reader_roundtrip() {
        let mut buffer = Vec::new();
        {
            let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(&mut buffer), 10);
            for i in 0..25 {
                let entry = WalEntry::put(
                    i,
                    1000 + i,
                    format!("key_{}", i).into_bytes(),
                    format!("value_{}", i).into_bytes(),
                );
                writer.write_entry(entry).unwrap();
            }
            writer.flush().unwrap();
        }
        let mut reader = ColumnarWalReader::new(Cursor::new(&buffer));
        let entries = reader.read_all().unwrap();
        assert_eq!(entries.len(), 25);
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.txn_id, i as u64);
            assert_eq!(entry.timestamp, 1000 + i as u64);
            assert_eq!(entry.key, format!("key_{}", i).into_bytes());
            assert_eq!(entry.value, format!("value_{}", i).into_bytes());
        }
    }

    /// Reading from an empty source yields no entries and no error.
    #[test]
    fn test_reader_empty_input() {
        let mut reader = ColumnarWalReader::new(Cursor::new(Vec::<u8>::new()));
        assert!(reader.read_all().unwrap().is_empty());
        assert!(reader.next_entry().unwrap().is_none());
    }

    /// Both decoder entry points produce the same running sums.
    #[test]
    fn test_timestamp_decoder() {
        let decoder = SimdTimestampDecoder::new(1000);
        let deltas = vec![10, 20, 30, 40, 50, 60, 70, 80];
        let expected = vec![1010, 1030, 1060, 1100, 1150, 1210, 1280, 1360];

        let mut output = vec![0u64; 8];
        decoder.decode_deltas_scalar(&deltas, &mut output);
        assert_eq!(output, expected);

        // The dispatching entry point must agree with the scalar reference.
        let mut dispatched = vec![0u64; 8];
        decoder.decode_deltas_avx2(&deltas, &mut dispatched);
        assert_eq!(dispatched, expected);
    }

    /// Prefix matching honours both the key bytes and the key lengths.
    #[test]
    fn test_key_comparator() {
        let key_lens = vec![4u16, 5, 4, 6, 4];
        let key_data = b"key1key12key3key123key4";
        let key_offsets = vec![0u32, 4, 9, 13, 19];
        let results = SimdKeyComparator::match_prefix_avx2(
            &key_lens,
            key_data,
            &key_offsets,
            b"key",
        );
        assert!(results.iter().all(|&r| r));
        let results = SimdKeyComparator::match_prefix_avx2(
            &key_lens,
            key_data,
            &key_offsets,
            b"key1",
        );
        assert_eq!(results, vec![true, true, false, true, false]);
    }

    /// Counters only advance when a block is actually written.
    #[test]
    fn test_writer_stats() {
        let buffer = Vec::new();
        let mut writer = ColumnarWalWriter::with_batch_size(Cursor::new(buffer), 10);
        for i in 0..5 {
            writer.write_entry(WalEntry::put(i, i, vec![0], vec![0])).unwrap();
        }
        let stats = writer.stats();
        assert_eq!(stats.current_block_entries, 5);
        assert_eq!(stats.blocks_written, 0);
        writer.flush().unwrap();
        let stats = writer.stats();
        assert_eq!(stats.current_block_entries, 0);
        assert_eq!(stats.blocks_written, 1);
    }

    /// Pins the checksum to the standard CRC-32 value for a known input.
    #[test]
    fn test_crc32() {
        let data = b"hello world";
        let crc = crc32_simple(data);
        assert_eq!(crc, 0x0D4A1185);
    }
}