use crate::config::{CompressionAlgorithm, ValueLogConfig};
use crate::error::{Error, Result};
use crate::storage::{Value, ValuePointer};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
/// Header written at the very start of every value-log segment file.
///
/// `VlogSegment::new` serializes this struct with `bincode`, so the declaration
/// order of the fields below is part of the on-disk layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VlogHeader {
/// Magic bytes identifying a segment file; `validate` compares them with `VlogHeader::MAGIC`.
pub magic: [u8; 8],
/// Format version; `validate` compares it with `VlogHeader::VERSION`.
pub version: u32,
/// Creation time in milliseconds since the Unix epoch.
pub created_at: u64,
/// Compression algorithm recorded for the segment.
pub compression: CompressionAlgorithm,
/// CRC32 over the other header fields, as produced by `calculate_checksum`.
pub checksum: u32,
}
impl VlogHeader {
    /// Magic bytes at the start of every segment header (ASCII `AURADBVL`).
    const MAGIC: [u8; 8] = [0x41, 0x55, 0x52, 0x41, 0x44, 0x42, 0x56, 0x4C];
    /// Header format version produced and accepted by this code.
    const VERSION: u32 = 1;

    /// Builds a header stamped with the current time and a matching checksum.
    ///
    /// The checksum is computed from the populated fields before the header is
    /// returned, so a freshly created header always passes [`validate`](Self::validate).
    pub fn new(compression: CompressionAlgorithm) -> Self {
        // A clock set before the Unix epoch degrades to a timestamp of 0
        // instead of failing header creation.
        let created_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        let mut header = Self {
            magic: Self::MAGIC,
            version: Self::VERSION,
            created_at,
            compression,
            checksum: 0,
        };
        // The checksum field itself is not part of the hashed data, so it can
        // be filled in after the other fields are in place.
        header.checksum = header.calculate_checksum();
        header
    }

    /// Returns the CRC32 of `magic`, `version`, `created_at` and the
    /// compression tag (all little-endian); the `checksum` field is excluded.
    pub fn calculate_checksum(&self) -> u32 {
        use crc32fast::Hasher;
        let mut hasher = Hasher::new();
        hasher.update(&self.magic);
        hasher.update(&self.version.to_le_bytes());
        hasher.update(&self.created_at.to_le_bytes());
        hasher.update(&(self.compression as u8).to_le_bytes());
        hasher.finalize()
    }

    /// Returns `true` when the magic bytes and version match the constants of
    /// this implementation and the stored checksum matches the recomputed one.
    pub fn validate(&self) -> bool {
        self.magic == Self::MAGIC
            && self.version == Self::VERSION
            && self.checksum == self.calculate_checksum()
    }
}
/// Per-value record header stored in a segment directly before the value bytes.
///
/// `VlogSegment::write_value` serializes it with `bincode` and prefixes it with
/// its own encoded length as a little-endian `u32`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VlogEntry {
/// Number of payload bytes that follow this header in the file.
pub length: u32,
/// Compression applied to the stored payload.
pub compression: CompressionAlgorithm,
/// CRC32 of the value data; the reader checks it after decompression.
pub checksum: u32,
/// Write time in milliseconds since the Unix epoch.
pub timestamp: u64,
}
/// Bookkeeping that a `VlogSegment` maintains about its backing file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VlogSegmentMeta {
/// Location of the segment file.
pub path: PathBuf,
/// Bytes written so far, including the header.
pub size: u64,
/// Number of values written to the segment.
pub entry_count: u64,
/// Offset of the first entry, i.e. the serialized header length.
pub first_offset: u64,
/// Offset just past the most recently written entry.
pub last_offset: u64,
/// Creation time in milliseconds since the Unix epoch.
pub created_at: u64,
/// Set to `true` by `VlogSegment::close`.
pub closed: bool,
}
/// Writes values to value-log segment files, either through background
/// worker queues (`write_value`) or directly (`write_value_sync`).
pub struct VlogWriter {
/// Segments created eagerly by `new`; only `write_value_sync` writes to them.
segments: Vec<Arc<RwLock<VlogSegment>>>,
/// Configuration the writer was created with.
config: ValueLogConfig,
/// Counter used to allocate segment ids; `write_value_sync` also advances it
/// to pick a segment round-robin.
next_segment_id: AtomicU64,
/// Directory that holds the segment files.
vlog_dir: PathBuf,
/// One sender per background worker task.
write_queues: Vec<mpsc::UnboundedSender<WriteRequest>>,
/// Join handles of the background worker tasks, awaited by `close`.
background_handles: Vec<JoinHandle<()>>,
/// Metadata lookup table served by `get_segment_metadata`.
/// NOTE(review): nothing in this file inserts into it — confirm whether it is
/// populated elsewhere.
segment_metadata: HashMap<u64, VlogSegmentMeta>,
}
impl VlogWriter {
/// Creates the value-log directory, starts one background worker per
/// configured write queue and opens the same number of segments for
/// synchronous writes.
///
/// # Errors
/// Returns an error when the directory or any of the segments cannot be created.
///
/// # Panics
/// The workers are started with `tokio::spawn`, which panics when no Tokio
/// runtime is active on the calling thread.
pub fn new(config: ValueLogConfig) -> Result<Self> {
    let vlog_dir = config.vlog_path.clone();
    std::fs::create_dir_all(&vlog_dir)?;
    let queue_count = config.write_queues;

    let mut writer = Self {
        vlog_dir,
        config,
        next_segment_id: AtomicU64::new(1),
        segments: Vec::new(),
        write_queues: Vec::new(),
        background_handles: Vec::new(),
        segment_metadata: HashMap::new(),
    };

    writer.initialize_write_queues()?;
    // One eagerly opened segment per queue, stopping at the first failure.
    (0..queue_count).try_for_each(|_| writer.create_new_segment())?;
    Ok(writer)
}
fn initialize_write_queues(&mut self) -> Result<()> {
for queue_id in 0..self.config.write_queues {
let (tx, mut rx) = mpsc::unbounded_channel();
self.write_queues.push(tx);
let vlog_dir = self.vlog_dir.clone();
let config = self.config.clone();
let queue_id = queue_id;
let handle = tokio::spawn(async move {
let mut current_segment = None;
let mut write_buffer = Vec::new();
while let Some(request) = rx.recv().await {
match request {
WriteRequest::Write { value, callback } => {
write_buffer.push((value, callback));
if write_buffer.len() >= 100 {
if let Err(e) = Self::flush_values(&mut current_segment, &vlog_dir, &config, &mut write_buffer, queue_id).await {
error!("Failed to flush values in queue {}: {}", queue_id, e);
}
}
}
WriteRequest::Sync => {
if let Err(e) = Self::flush_values(&mut current_segment, &vlog_dir, &config, &mut write_buffer, queue_id).await {
error!("Failed to sync values in queue {}: {}", queue_id, e);
}
}
WriteRequest::Shutdown => break,
}
}
});
self.background_handles.push(handle);
}
Ok(())
}
async fn flush_values(
current_segment: &mut Option<VlogSegment>,
vlog_dir: &PathBuf,
config: &ValueLogConfig,
write_buffer: &mut Vec<(Value, Option<WriteCallback>)>,
queue_id: usize,
) -> Result<()> {
if write_buffer.is_empty() {
return Ok(());
}
if current_segment.is_none() {
*current_segment = Some(VlogSegment::new(vlog_dir, config, queue_id as u64)?);
}
let segment = current_segment.as_mut().unwrap();
for (value, callback) in write_buffer.drain(..) {
match segment.write_value(&value) {
Ok(vptr) => {
if let Some(cb) = &callback {
match cb {
WriteCallback::Channel(sender) => { let _ = sender.send(Ok(vptr)); }
WriteCallback::None => {}
}
}
}
Err(e) => {
if let Some(cb) = &callback {
match cb {
WriteCallback::Channel(sender) => { let _ = sender.send(Err(e)); }
WriteCallback::None => {}
}
}
}
}
}
if segment.should_rotate() {
segment.close()?;
*current_segment = Some(VlogSegment::new(vlog_dir, config, queue_id as u64)?);
}
Ok(())
}
pub async fn write_value(&self, value: Value) -> Result<ValuePointer> {
let queue_id = self.choose_write_queue(&value);
let (tx, mut rx) = mpsc::channel(1);
let callback = WriteCallback::Channel(tx);
if let Some(sender) = self.write_queues.get(queue_id) {
let request = WriteRequest::Write { value, callback };
sender.send(request).map_err(|_| Error::Concurrency("Failed to send write request".to_string()))?;
} else {
return Err(Error::Concurrency("Invalid write queue".to_string()));
}
match rx.recv().await {
Some(result) => result,
None => Err(Error::Concurrency("Write request timed out".to_string())),
}
}
pub fn write_value_sync(&mut self, value: Value) -> Result<ValuePointer> {
let segment_id = self.next_segment_id.fetch_add(1, Ordering::SeqCst) % self.segments.len() as u64;
if let Some(segment) = self.segments.get(segment_id as usize) {
let mut segment = segment.write();
segment.write_value(&value)
} else {
Err(Error::Concurrency("No available segments".to_string()))
}
}
/// Picks a write queue for `value` by hashing its data modulo the configured
/// number of queues.
///
/// Returns `0` when no queues are configured, avoiding a modulo-by-zero
/// panic; `write_value` then reports the missing queue as an error.
fn choose_write_queue(&self, value: &Value) -> usize {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let queue_count = self.config.write_queues as u64;
    if queue_count == 0 {
        return 0;
    }
    let mut hasher = DefaultHasher::new();
    value.data.hash(&mut hasher);
    (hasher.finish() % queue_count) as usize
}
/// Allocates the next segment id, opens a segment for it and appends the
/// segment to `self.segments`.
///
/// # Errors
/// Propagates the error from `VlogSegment::new`; the id is consumed even
/// when opening fails.
fn create_new_segment(&mut self) -> Result<()> {
    let id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
    VlogSegment::new(&self.vlog_dir, &self.config, id)
        .map(|segment| self.segments.push(Arc::new(RwLock::new(segment))))
}
/// Sends a `Sync` request to every write queue, then sleeps for 10 ms.
///
/// Send failures are ignored and the call always returns `Ok(())`.
// NOTE(review): the fixed sleep does not wait for the workers to finish;
// a completion signal would need `WriteRequest::Sync` to carry a callback.
pub async fn sync(&self) -> Result<()> {
    self.write_queues.iter().for_each(|queue| {
        let _ = queue.send(WriteRequest::Sync);
    });
    let grace_period = tokio::time::Duration::from_millis(10);
    tokio::time::sleep(grace_period).await;
    Ok(())
}
pub async fn close(&mut self) -> Result<()> {
for sender in &self.write_queues {
let _ = sender.send(WriteRequest::Shutdown);
}
for handle in self.background_handles.drain(..) {
let _ = handle.await;
}
for segment in &self.segments {
let mut segment = segment.write();
segment.close()?;
}
Ok(())
}
/// Looks up the stored metadata for `segment_id`, returning `None` when the
/// writer holds no entry for that id.
pub fn get_segment_metadata(&self, segment_id: u64) -> Option<&VlogSegmentMeta> {
    let registry = &self.segment_metadata;
    registry.get(&segment_id)
}
}
/// Message sent from `VlogWriter` to a background write worker.
#[derive(Debug)]
pub enum WriteRequest {
/// Buffer `value` for writing; the outcome is reported through `callback`.
Write { value: Value, callback: WriteCallback },
/// Write out everything the worker has buffered so far.
Sync,
/// Leave the worker loop.
Shutdown,
}
/// How a background worker reports the outcome of a queued write.
#[derive(Debug)]
pub enum WriteCallback {
/// Deliver the resulting pointer, or the error, over this channel.
Channel(mpsc::Sender<Result<ValuePointer>>),
/// Nobody is waiting for the outcome.
None,
}
/// A single append-only value-log segment file that is open for writing.
pub struct VlogSegment {
/// Buffered writer over the segment file.
file: BufWriter<File>,
/// Running metadata (path, size, entry count, offsets) for this segment.
meta: VlogSegmentMeta,
/// Offset at which the next entry will be written.
current_offset: u64,
/// Copy of the configuration the segment was opened with.
config: ValueLogConfig,
}
impl VlogSegment {
/// Creates a new segment file named `vlog_<segment_id>_<timestamp>.seg`
/// (both numbers as 16 hex digits) inside `vlog_dir` and writes the
/// serialized header to it.
///
/// The file is opened with `create_new`, so a name collision (same id
/// within the same millisecond) is reported as an error. Previously the
/// existing file was silently reused: a second header was appended while
/// offsets were tracked as if the file were new, which yields wrong pointers.
///
/// # Errors
/// Returns an error when the file already exists or cannot be created, or
/// when serializing or writing the header fails.
fn new(vlog_dir: &PathBuf, config: &ValueLogConfig, segment_id: u64) -> Result<Self> {
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let path = vlog_dir.join(format!("vlog_{:016x}_{:016x}.seg", segment_id, timestamp));

    let file = OpenOptions::new()
        .create_new(true)
        .write(true)
        .append(true)
        .open(&path)?;
    let mut buf_writer = BufWriter::with_capacity(config.cache_size, file);

    let header = VlogHeader::new(config.compression_algorithm.clone());
    let header_bytes = bincode::serialize(&header)?;
    buf_writer.write_all(&header_bytes)?;
    // Push the header to the file right away so the segment is recognisable
    // on disk before any entry is written.
    buf_writer.flush()?;

    // Entries start directly after the header.
    let header_len = header_bytes.len() as u64;
    let meta = VlogSegmentMeta {
        path,
        size: header_len,
        entry_count: 0,
        first_offset: header_len,
        last_offset: header_len,
        created_at: timestamp,
        closed: false,
    };
    Ok(Self {
        file: buf_writer,
        meta,
        current_offset: header_len,
        config: config.clone(),
    })
}
fn write_value(&mut self, value: &Value) -> Result<ValuePointer> {
let (compressed_data, compression, checksum) = if self.config.compress_values {
self.compress_value(&value.data)?
} else {
(value.data.clone(), CompressionAlgorithm::None, self.calculate_checksum(&value.data))
};
let entry = VlogEntry {
length: compressed_data.len() as u32,
compression,
checksum,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
};
let entry_bytes = bincode::serialize(&entry)?;
self.file.write_all(&(entry_bytes.len() as u32).to_le_bytes())?;
self.file.write_all(&entry_bytes)?;
self.file.write_all(&compressed_data)?;
let entry_size = 4 + entry_bytes.len() + compressed_data.len();
let vptr = ValuePointer::with_checksum(
self.meta.path.file_name().unwrap().to_string_lossy().parse::<u64>().unwrap_or(0),
self.current_offset,
compressed_data.len() as u32,
checksum,
);
self.current_offset += entry_size as u64;
self.meta.size = self.current_offset;
self.meta.entry_count += 1;
self.meta.last_offset = self.current_offset;
Ok(vptr)
}
/// Returns the bytes to store for `data`, the algorithm tag to record and
/// the CRC32 of `data`.
///
/// No compression is performed: the bytes are copied unchanged and tagged
/// `CompressionAlgorithm::None`.
fn compress_value(&self, data: &[u8]) -> Result<(Vec<u8>, CompressionAlgorithm, u32)> {
    let crc = self.calculate_checksum(data);
    let stored = data.to_vec();
    Ok((stored, CompressionAlgorithm::None, crc))
}
/// Returns the CRC32 of `data`.
fn calculate_checksum(&self, data: &[u8]) -> u32 {
    let mut crc = crc32fast::Hasher::new();
    crc.update(data);
    crc.finalize()
}
/// Reports whether the segment has reached the configured maximum size.
fn should_rotate(&self) -> bool {
    let limit = self.config.max_segment_size;
    limit <= self.meta.size
}
/// Flushes buffered data, syncs the file to disk and marks the segment as
/// closed in its metadata.
///
/// # Errors
/// Returns the I/O error from the flush or the sync; `closed` stays `false`
/// in that case.
fn close(&mut self) -> Result<()> {
    let writer = &mut self.file;
    writer.flush()?;
    writer.get_ref().sync_all()?;
    self.meta.closed = true;
    Ok(())
}
}
/// Reads values back from value-log segment files by `ValuePointer`.
pub struct VlogReader {
/// Directory that holds the segment files.
vlog_dir: PathBuf,
/// Segment readers opened so far, keyed by segment id.
segments: HashMap<u64, VlogSegmentReader>,
}
impl VlogReader {
    /// Creates a reader over the segment files in `vlog_dir`.
    ///
    /// No file is opened here; segments are opened on first use.
    pub fn new(vlog_dir: PathBuf) -> Result<Self> {
        Ok(Self {
            vlog_dir,
            segments: HashMap::new(),
        })
    }

    /// Reads the value that `vptr` points to, opening and caching a reader
    /// for the pointer's segment on first access.
    ///
    /// # Errors
    /// Returns an error when the segment file cannot be found or opened, or
    /// when reading or verifying the entry fails.
    pub fn read_value(&mut self, vptr: &ValuePointer) -> Result<Value> {
        use std::collections::hash_map::Entry;

        // A single map lookup covers both the cached and the first-use case.
        let segment_reader = match self.segments.entry(vptr.segment_id) {
            Entry::Occupied(slot) => slot.into_mut(),
            Entry::Vacant(slot) => {
                slot.insert(VlogSegmentReader::new(&self.vlog_dir, vptr.segment_id)?)
            }
        };
        segment_reader.read_value_at(vptr.offset, vptr.length)
    }

    /// Closes and forgets every cached segment reader.
    ///
    /// # Errors
    /// Returns the first error reported by a segment reader; the remaining
    /// readers are dropped.
    pub fn close(&mut self) -> Result<()> {
        // `VlogSegmentReader::close` takes `&mut self`, so the binding must
        // be mutable.
        for (_, mut reader) in self.segments.drain() {
            reader.close()?;
        }
        Ok(())
    }
}
/// Read-only handle on a single segment file.
struct VlogSegmentReader {
/// Segment file opened for reading.
file: File,
/// Path the file was opened from.
path: PathBuf,
}
impl VlogSegmentReader {
fn new(vlog_dir: &PathBuf, segment_id: u64) -> Result<Self> {
let entries = std::fs::read_dir(vlog_dir)?;
let segment_path = entries
.filter_map(|entry| entry.ok())
.find(|entry| {
entry.path().to_string_lossy().contains(&format!("vlog_{:016x}", segment_id))
})
.ok_or_else(|| Error::InvalidValuePointer(format!("Segment {} not found", segment_id)))?
.path();
let file = OpenOptions::new().read(true).open(&segment_path)?;
Ok(Self {
file,
path: segment_path,
})
}
/// Reads and verifies the entry that starts at `offset`.
///
/// `length` is the payload length recorded in the value pointer. It must
/// equal the length stored in the entry header; a mismatch means the
/// pointer does not address the start of the entry it was created for.
///
/// # Errors
/// Returns `Error::ValueLogCorruption` when the lengths disagree or the
/// checksum does not match, and propagates I/O and deserialization errors.
fn read_value_at(&mut self, offset: u64, length: u32) -> Result<Value> {
    self.file.seek(SeekFrom::Start(offset))?;

    // Little-endian length prefix, then the serialized entry header.
    let mut len_bytes = [0u8; 4];
    self.file.read_exact(&mut len_bytes)?;
    let entry_len = u32::from_le_bytes(len_bytes) as usize;
    let mut entry_bytes = vec![0u8; entry_len];
    self.file.read_exact(&mut entry_bytes)?;
    let entry: VlogEntry = bincode::deserialize(&entry_bytes)?;

    // Cross-check before allocating and reading the payload.
    if entry.length != length {
        return Err(Error::ValueLogCorruption(format!(
            "Length mismatch at offset {}: pointer says {}, entry says {}",
            offset, length, entry.length
        )));
    }

    let mut value_data = vec![0u8; entry.length as usize];
    self.file.read_exact(&mut value_data)?;
    let decompressed_data = if entry.compression != CompressionAlgorithm::None {
        self.decompress_value(&value_data, &entry.compression)?
    } else {
        value_data
    };

    // The stored checksum covers the data as it was before compression.
    let calculated_checksum = self.calculate_checksum(&decompressed_data);
    if calculated_checksum != entry.checksum {
        return Err(Error::ValueLogCorruption(format!(
            "Checksum mismatch: expected {}, got {}",
            entry.checksum, calculated_checksum
        )));
    }
    Ok(Value::new(decompressed_data))
}
/// Returns the stored bytes for an entry tagged with `compression`.
///
/// Every algorithm currently yields an unchanged copy of `data`; no actual
/// decompression is performed.
fn decompress_value(&self, data: &[u8], compression: &CompressionAlgorithm) -> Result<Vec<u8>> {
    match compression {
        CompressionAlgorithm::None
        | CompressionAlgorithm::Lz4
        | CompressionAlgorithm::Zstd
        | CompressionAlgorithm::Snappy => Ok(data.to_vec()),
    }
}
/// Returns the CRC32 of `data`.
fn calculate_checksum(&self, data: &[u8]) -> u32 {
    let mut crc = crc32fast::Hasher::new();
    crc.update(data);
    crc.finalize()
}
/// Does nothing and always returns `Ok(())`; the file handle is released
/// when the reader is dropped.
fn close(&mut self) -> Result<()> {
Ok(())
}
}
impl Drop for VlogWriter {
fn drop(&mut self) {
let _ = tokio::runtime::Handle::current().block_on(self.close());
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A writer can be constructed on top of an empty directory.
    #[tokio::test]
    async fn test_vlog_writer_creation() {
        let temp_dir = tempdir().unwrap();
        let config = ValueLogConfig {
            vlog_path: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        let writer = VlogWriter::new(config);
        assert!(writer.is_ok());
    }

    /// A fresh header validates, and changing a checked field breaks validation.
    #[test]
    fn test_vlog_header_validation() {
        let mut header = VlogHeader::new(CompressionAlgorithm::Lz4);
        assert!(header.validate());
        header.version += 1;
        assert!(!header.validate());
    }

    /// A value written through a segment (including the compression hook and
    /// checksum) is read back unchanged.
    #[test]
    fn test_compression_decompression() {
        let data = b"Hello, World! This is a test string for compression testing.";
        let temp_dir = tempdir().unwrap();
        let config = ValueLogConfig {
            vlog_path: temp_dir.path().to_path_buf(),
            ..Default::default()
        };

        let mut segment = VlogSegment::new(&config.vlog_path, &config, 0).unwrap();
        let vptr = segment.write_value(&Value::new(data.to_vec())).unwrap();
        // Closing flushes the buffered writer so the reader sees the entry.
        segment.close().unwrap();

        let mut reader = VlogReader::new(config.vlog_path.clone()).unwrap();
        let restored = reader.read_value(&vptr).unwrap();
        assert_eq!(&data[..], &restored.data[..]);
    }
}