use bytes::{Bytes, BytesMut};
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use thiserror::Error;
/// File magic: the ASCII bytes `VLOG` read as a big-endian `u32` (`0x564C_4F47`).
///
/// NOTE: `VLog::create` writes it with `to_le_bytes`, so the first four bytes
/// of the file are `GOLV`.
const MAGIC: u32 = u32::from_be_bytes(*b"VLOG");
/// On-disk format version, written immediately after the magic.
const VERSION: u32 = 1;
/// Length of the file header: the `u32` magic followed by the `u32` version.
const HEADER_SIZE: u64 = 4 + 4;
#[derive(Debug, Error)]
pub enum VLogError {
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("CRC mismatch: expected {expected:x}, got {actual:x}")]
CrcMismatch { expected: u32, actual: u32 },
#[error("Invalid record format")]
InvalidRecordFormat,
#[error("Invalid VLog format: bad magic or version")]
InvalidFormat,
}
pub type Result<T> = std::result::Result<T, VLogError>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ValuePointer {
pub offset: u64,
pub length: u32,
}
impl ValuePointer {
#[must_use]
pub fn to_bytes(&self) -> Bytes {
let mut buf = bytes::BytesMut::with_capacity(12);
buf.extend_from_slice(&self.offset.to_le_bytes());
buf.extend_from_slice(&self.length.to_le_bytes());
buf.freeze()
}
}
/// A key/value pair as stored in the value log.
///
/// Encoded layout (all integers little-endian):
/// `[key_len: u32][key][value_len: u32][value][crc32c: u32]`, where the CRC
/// covers every preceding byte of the record.
#[derive(Debug, Clone)]
pub struct VLogRecord {
    /// Key bytes.
    pub key: Bytes,
    /// Value bytes.
    pub value: Bytes,
}

impl VLogRecord {
    /// Framing bytes around key and value: two `u32` length prefixes plus the `u32` CRC.
    const FRAMING_LEN: usize = 4 + 4 + 4;

    /// Serializes the record into its on-disk form.
    ///
    /// # Panics
    ///
    /// Panics if the key or value is longer than `u32::MAX` bytes. Such a length
    /// cannot be represented in the `u32` prefix, and truncating it would write
    /// a record that can never be decoded.
    pub fn encode(&self) -> Bytes {
        let key_len = Self::length_prefix(&self.key, "key");
        let value_len = Self::length_prefix(&self.value, "value");
        let mut buf =
            BytesMut::with_capacity(Self::FRAMING_LEN + self.key.len() + self.value.len());
        buf.extend_from_slice(&key_len.to_le_bytes());
        buf.extend_from_slice(&self.key);
        buf.extend_from_slice(&value_len.to_le_bytes());
        buf.extend_from_slice(&self.value);
        // The checksum covers exactly the bytes written so far.
        let crc = crc32c::crc32c(&buf);
        buf.extend_from_slice(&crc.to_le_bytes());
        buf.freeze()
    }

    /// Decodes a record in the layout produced by [`VLogRecord::encode`].
    ///
    /// The returned `key` and `value` are zero-copy slices of `data`. Any bytes
    /// after the CRC are ignored.
    ///
    /// # Errors
    ///
    /// - [`VLogError::InvalidRecordFormat`] if `data` is shorter than the
    ///   lengths it declares (or shorter than the 12 framing bytes).
    /// - [`VLogError::CrcMismatch`] if the stored checksum (`expected`) differs
    ///   from the one computed over the record (`actual`).
    // Taking `Bytes` by value is part of the public signature; `slice` below lets
    // the returned fields share its buffer without copying.
    #[allow(clippy::needless_pass_by_value)]
    pub fn decode(data: Bytes) -> Result<Self> {
        if data.len() < Self::FRAMING_LEN {
            return Err(VLogError::InvalidRecordFormat);
        }
        let key_start = 4;
        let key_len = Self::read_u32_le(&data, 0)? as usize;
        // Checked so a huge declared length cannot overflow `usize` on 32-bit targets.
        let key_end = Self::checked_end(key_start, key_len)?;
        let value_len = Self::read_u32_le(&data, key_end)? as usize;
        // Cannot overflow: the read above proved `key_end + 4 <= data.len()`.
        let value_start = key_end + 4;
        let value_end = Self::checked_end(value_start, value_len)?;
        let stored_crc = Self::read_u32_le(&data, value_end)?;
        let computed_crc = crc32c::crc32c(&data[..value_end]);
        if stored_crc != computed_crc {
            return Err(VLogError::CrcMismatch {
                expected: stored_crc,
                actual: computed_crc,
            });
        }
        Ok(Self {
            key: data.slice(key_start..key_end),
            value: data.slice(value_start..value_end),
        })
    }

    /// Reads the little-endian `u32` at `data[pos..pos + 4]`, or returns
    /// `InvalidRecordFormat` if those bytes are out of range.
    fn read_u32_le(data: &[u8], pos: usize) -> Result<u32> {
        let end = Self::checked_end(pos, 4)?;
        match data.get(pos..end) {
            Some(b) => Ok(u32::from_le_bytes([b[0], b[1], b[2], b[3]])),
            None => Err(VLogError::InvalidRecordFormat),
        }
    }

    /// Returns `start + len`, or `InvalidRecordFormat` on `usize` overflow.
    fn checked_end(start: usize, len: usize) -> Result<usize> {
        start.checked_add(len).ok_or(VLogError::InvalidRecordFormat)
    }

    /// Converts a field length into its `u32` prefix, panicking if it does not fit.
    fn length_prefix(field: &[u8], what: &str) -> u32 {
        assert!(
            field.len() <= u32::MAX as usize,
            "VLogRecord {what} of {} bytes exceeds the u32 length prefix",
            field.len()
        );
        field.len() as u32
    }
}
/// Value log backed by a single file; records are appended at `head`.
#[derive(Debug)]
pub struct VLog {
    /// Read/write handle to the log file.
    file: File,
    /// Path the log was created or opened at.
    path: PathBuf,
    /// File offset at which the next record will be written.
    head: u64,
    /// Lower bound exposed through `tail()` / `set_tail()`. It starts at the
    /// header size and is not persisted in the file. Presumably this marks the
    /// start of the still-live region; TODO confirm with callers.
    tail: u64,
}
impl VLog {
pub fn create(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(&path)?;
file.write_all(&MAGIC.to_le_bytes())?;
file.write_all(&VERSION.to_le_bytes())?;
file.sync_all()?;
Ok(Self {
file,
path,
head: HEADER_SIZE,
tail: HEADER_SIZE,
})
}
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let mut file = OpenOptions::new().write(true).read(true).open(&path)?;
file.seek(SeekFrom::Start(0))?;
let mut header = [0u8; 8];
file.read_exact(&mut header)?;
let magic = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
if magic != MAGIC || version != VERSION {
return Err(VLogError::InvalidFormat);
}
let head = file.metadata()?.len();
Ok(Self {
file,
path,
head,
tail: HEADER_SIZE,
})
}
pub fn append(&mut self, key: &[u8], value: &[u8]) -> Result<ValuePointer> {
let record = VLogRecord {
key: Bytes::copy_from_slice(key),
value: Bytes::copy_from_slice(value),
};
let encoded = record.encode();
let record_offset = self.head;
self.file.seek(SeekFrom::Start(self.head))?;
self.file.write_all(&encoded)?;
self.head += encoded.len() as u64;
let key_len = key.len() as u64;
let value_offset = record_offset + 4 + key_len + 4;
let value_len = value.len() as u32;
Ok(ValuePointer {
offset: value_offset,
length: value_len,
})
}
pub fn sync(&mut self) -> Result<()> {
self.file.sync_data()?;
Ok(())
}
pub fn read(&mut self, pointer: ValuePointer) -> Result<Bytes> {
let mut buf = vec![0u8; pointer.length as usize];
self.file.seek(SeekFrom::Start(pointer.offset))?;
self.file.read_exact(&mut buf)?;
Ok(Bytes::from(buf))
}
pub fn read_record(&mut self, offset: u64) -> Result<(VLogRecord, u64)> {
self.file.seek(SeekFrom::Start(offset))?;
let mut len_buf = [0u8; 4];
self.file.read_exact(&mut len_buf)?;
let key_len = u32::from_le_bytes(len_buf) as usize;
let mut key_buf = vec![0u8; key_len];
self.file.read_exact(&mut key_buf)?;
self.file.read_exact(&mut len_buf)?;
let value_len = u32::from_le_bytes(len_buf) as usize;
let mut value_buf = vec![0u8; value_len];
self.file.read_exact(&mut value_buf)?;
self.file.read_exact(&mut len_buf)?;
let total_len = 4 + key_len + 4 + value_len + 4;
let mut record_data = BytesMut::with_capacity(total_len);
record_data.extend_from_slice(&(key_len as u32).to_le_bytes());
record_data.extend_from_slice(&key_buf);
record_data.extend_from_slice(&(value_len as u32).to_le_bytes());
record_data.extend_from_slice(&value_buf);
record_data.extend_from_slice(&len_buf);
let record = VLogRecord::decode(record_data.freeze())?;
let next_offset = offset + total_len as u64;
Ok((record, next_offset))
}
#[must_use]
pub const fn head(&self) -> u64 {
self.head
}
#[must_use]
pub const fn tail(&self) -> u64 {
self.tail
}
pub const fn set_tail(&mut self, tail: u64) {
self.tail = tail;
}
pub fn size(&self) -> Result<u64> {
Ok(self.file.metadata()?.len())
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
pub fn verify(&mut self) -> Result<VLogVerifyResult> {
let mut records_verified = 0u64;
let mut bytes_verified = 0u64;
let mut offset = HEADER_SIZE;
while offset < self.head {
match self.read_record(offset) {
Ok((_, next_offset)) => {
bytes_verified += next_offset - offset;
records_verified += 1;
offset = next_offset;
}
Err(VLogError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
break;
}
Err(e) => return Err(e),
}
}
Ok(VLogVerifyResult {
records_verified,
bytes_verified,
})
}
}
/// Summary returned by `VLog::verify`.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VLogVerifyResult {
    /// Number of complete records that decoded and passed their CRC check.
    pub records_verified: u64,
    /// Total encoded size of those records; the file header is not included.
    pub bytes_verified: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Encoded size of a record with a 4-byte key and a 6-byte value.
    const SMALL_RECORD_LEN: u64 = 4 + 4 + 4 + 6 + 4;

    #[test]
    fn test_vlog_record_encode_decode() {
        let record = VLogRecord {
            key: Bytes::from("key1"),
            value: Bytes::from("value1"),
        };
        let encoded = record.encode();
        let decoded = VLogRecord::decode(encoded).unwrap();
        assert_eq!(record.key, decoded.key);
        assert_eq!(record.value, decoded.value);
    }

    #[test]
    fn test_vlog_record_detects_corruption() {
        let encoded = VLogRecord {
            key: Bytes::from("key1"),
            value: Bytes::from("value1"),
        }
        .encode();
        let mut corrupted = BytesMut::from(&encoded[..]);
        // Last value byte; the final four bytes are the CRC.
        let last_value_byte = corrupted.len() - 5;
        corrupted[last_value_byte] ^= 0xFF;
        assert!(matches!(
            VLogRecord::decode(corrupted.freeze()),
            Err(VLogError::CrcMismatch { .. })
        ));
    }

    #[test]
    fn test_vlog_record_rejects_truncated_input() {
        let encoded = VLogRecord {
            key: Bytes::from("key1"),
            value: Bytes::from("value1"),
        }
        .encode();
        let truncated = encoded.slice(..encoded.len() - 1);
        assert!(matches!(
            VLogRecord::decode(truncated),
            Err(VLogError::InvalidRecordFormat)
        ));
        assert!(matches!(
            VLogRecord::decode(Bytes::new()),
            Err(VLogError::InvalidRecordFormat)
        ));
    }

    #[test]
    fn test_value_pointer_to_bytes_layout() {
        let bytes = ValuePointer {
            offset: 0x0102,
            length: 7,
        }
        .to_bytes();
        assert_eq!(&bytes[..], &[0x02, 0x01, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0][..]);
    }

    #[test]
    fn test_vlog_append_and_read() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let mut vlog = VLog::create(&vlog_path).unwrap();
        let pointer = vlog.append(b"key1", b"value1").unwrap();
        assert_eq!(pointer.length, 6);
        let value = vlog.read(pointer).unwrap();
        assert_eq!(value, Bytes::from("value1"));
    }

    #[test]
    fn test_vlog_empty_key_and_value() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let mut vlog = VLog::create(&vlog_path).unwrap();
        let pointer = vlog.append(b"", b"").unwrap();
        assert_eq!(pointer.length, 0);
        assert!(vlog.read(pointer).unwrap().is_empty());
        let (record, next_offset) = vlog.read_record(HEADER_SIZE).unwrap();
        assert!(record.key.is_empty());
        assert!(record.value.is_empty());
        assert_eq!(next_offset, HEADER_SIZE + 12);
        assert_eq!(vlog.head(), next_offset);
    }

    #[test]
    fn test_vlog_multiple_values() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let mut vlog = VLog::create(&vlog_path).unwrap();
        let p1 = vlog.append(b"key1", b"value1").unwrap();
        let p2 = vlog.append(b"key2", b"value2").unwrap();
        let p3 = vlog.append(b"key3", b"value3").unwrap();
        assert_eq!(vlog.read(p1).unwrap(), Bytes::from("value1"));
        assert_eq!(vlog.read(p2).unwrap(), Bytes::from("value2"));
        assert_eq!(vlog.read(p3).unwrap(), Bytes::from("value3"));
    }

    #[test]
    fn test_vlog_reopen() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let pointer = {
            let mut vlog = VLog::create(&vlog_path).unwrap();
            vlog.append(b"key1", b"value1").unwrap()
        };
        let mut vlog = VLog::open(&vlog_path).unwrap();
        let value = vlog.read(pointer).unwrap();
        assert_eq!(value, Bytes::from("value1"));
    }

    #[test]
    fn test_vlog_reopen_appends_after_existing_records() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        {
            let mut vlog = VLog::create(&vlog_path).unwrap();
            vlog.append(b"key1", b"value1").unwrap();
        }
        let mut vlog = VLog::open(&vlog_path).unwrap();
        assert_eq!(vlog.head(), HEADER_SIZE + SMALL_RECORD_LEN);
        assert_eq!(vlog.tail(), HEADER_SIZE);
        let pointer = vlog.append(b"key2", b"value2").unwrap();
        assert_eq!(vlog.read(pointer).unwrap(), Bytes::from("value2"));
        assert_eq!(vlog.verify().unwrap().records_verified, 2);
    }

    #[test]
    fn test_vlog_open_rejects_bad_header() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        std::fs::write(&vlog_path, [0u8; 8]).unwrap();
        assert!(matches!(
            VLog::open(&vlog_path),
            Err(VLogError::InvalidFormat)
        ));
    }

    #[test]
    fn test_vlog_read_record() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let mut vlog = VLog::create(&vlog_path).unwrap();
        vlog.append(b"key1", b"value1").unwrap();
        vlog.append(b"key2", b"value2").unwrap();
        let (record, next_offset) = vlog.read_record(HEADER_SIZE).unwrap();
        assert_eq!(record.key, Bytes::from("key1"));
        assert_eq!(record.value, Bytes::from("value1"));
        let (record, _) = vlog.read_record(next_offset).unwrap();
        assert_eq!(record.key, Bytes::from("key2"));
        assert_eq!(record.value, Bytes::from("value2"));
    }

    #[test]
    fn test_vlog_verify_counts_records() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let mut vlog = VLog::create(&vlog_path).unwrap();
        for i in 0..3 {
            let key = format!("key{i}");
            let value = format!("value{i}");
            vlog.append(key.as_bytes(), value.as_bytes()).unwrap();
        }
        let result = vlog.verify().unwrap();
        assert_eq!(result.records_verified, 3);
        assert_eq!(result.bytes_verified, 3 * SMALL_RECORD_LEN);
        assert_eq!(vlog.head(), HEADER_SIZE + 3 * SMALL_RECORD_LEN);
    }

    #[test]
    fn test_vlog_verify_detects_corruption() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let pointer = {
            let mut vlog = VLog::create(&vlog_path).unwrap();
            let pointer = vlog.append(b"key1", b"value1").unwrap();
            vlog.sync().unwrap();
            pointer
        };
        {
            let mut file = OpenOptions::new().write(true).open(&vlog_path).unwrap();
            file.seek(SeekFrom::Start(pointer.offset)).unwrap();
            file.write_all(b"X").unwrap();
        }
        let mut vlog = VLog::open(&vlog_path).unwrap();
        assert!(matches!(
            vlog.verify(),
            Err(VLogError::CrcMismatch { .. })
        ));
    }

    #[test]
    fn test_vlog_verify_tolerates_torn_tail() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        {
            let mut vlog = VLog::create(&vlog_path).unwrap();
            vlog.append(b"key1", b"value1").unwrap();
        }
        {
            // Two stray bytes: an incomplete length prefix after the last record.
            let mut file = OpenOptions::new().append(true).open(&vlog_path).unwrap();
            file.write_all(&[0xAB, 0xCD]).unwrap();
        }
        let mut vlog = VLog::open(&vlog_path).unwrap();
        let result = vlog.verify().unwrap();
        assert_eq!(result.records_verified, 1);
        assert_eq!(result.bytes_verified, SMALL_RECORD_LEN);
    }

    #[test]
    fn test_vlog_large_values() {
        let dir = tempdir().unwrap();
        let vlog_path = dir.path().join("test.vlog");
        let mut vlog = VLog::create(&vlog_path).unwrap();
        let large_value = vec![b'x'; 4096];
        let pointer = vlog.append(b"key1", &large_value).unwrap();
        let value = vlog.read(pointer).unwrap();
        assert_eq!(value.len(), 4096);
        assert_eq!(value, Bytes::from(large_value));
    }
}