use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use crate::error::MemMapError;
/// A single entry of the write-ahead log.
///
/// `LogManager::append` serializes values of this type with `bincode` and
/// `LogManager::replay` deserializes them again.
///
/// NOTE(review): how replayed commands are applied is not visible in this
/// file; the per-variant descriptions below are inferred from the names and
/// should be confirmed against the consumer. Reordering variants or fields
/// presumably changes the serialized form of existing logs — verify before
/// touching the layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalCommand {
/// Registers a memory entry `id` together with its `tags`.
/// `chunk_id`/`offset`/`length` have the same types as the triple returned
/// by `BlockStorage::append` — presumably the location of the payload.
AppendMemory {
id: u64,
chunk_id: u32,
offset: u64,
length: u64,
tags: Vec<String>,
},
/// Key/value assignment carrying the raw value bytes inline.
SetKv { key: String, value: Vec<u8> },
/// Removal of a key.
DeleteKv { key: String },
/// Adds a block reference to the stream named `key`; the location fields
/// mirror those of `AppendMemory` (presumably a `BlockStorage` location).
AppendStream {
key: String,
chunk_id: u32,
offset: u64,
length: u64,
},
}
/// Handle on the write-ahead log file (`state.wal` inside the root directory
/// passed to `open`).
///
/// Records are written through `writer`; `replay` re-opens the file by path
/// for reading.
pub struct LogManager {
// Location of the WAL file, kept so `replay` can open a fresh read handle.
wal_path: PathBuf,
// Buffered handle opened in append mode; every record goes through it.
writer: BufWriter<File>,
}
impl LogManager {
/// Opens the write-ahead log `state.wal` under `root`, creating the file if
/// it does not exist yet.
///
/// The file is opened in append mode, so existing records are preserved and
/// new records are added after them.
///
/// # Errors
///
/// Propagates the I/O error if the file cannot be opened or created.
pub fn open(root: &Path) -> Result<Self, MemMapError> {
    let wal_path = root.join("state.wal");

    let mut options = OpenOptions::new();
    options.create(true).append(true);
    let log_file = options.open(&wal_path)?;

    let writer = BufWriter::new(log_file);
    Ok(Self { wal_path, writer })
}
/// Appends one command to the log.
///
/// On-disk record layout: an 8-byte little-endian length followed by the
/// bincode-serialized command. The buffer is flushed and `sync_all` is called
/// on the file before this returns.
///
/// # Errors
///
/// Propagates serialization errors and any I/O error from writing, flushing
/// or syncing.
pub fn append(&mut self, cmd: &WalCommand) -> Result<(), MemMapError> {
    let body = bincode::serialize(cmd)?;
    let header = (body.len() as u64).to_le_bytes();

    let out = &mut self.writer;
    out.write_all(&header)?;
    out.write_all(&body)?;
    out.flush()?;

    // Flushing only empties the BufWriter; sync_all is issued on the file itself.
    out.get_ref().sync_all()?;
    Ok(())
}
/// Reads the log file from the start and returns every complete record in
/// the order it was written.
///
/// Reading stops silently at the first record that does not fit in the file
/// (a length prefix or payload that extends past the end), so a partially
/// written tail is ignored rather than reported.
///
/// # Errors
///
/// Propagates I/O errors, and returns [`MemMapError::CorruptWal`] (with the
/// byte offset of the record) when a complete payload fails to deserialize.
pub fn replay(&self) -> Result<Vec<WalCommand>, MemMapError> {
    // Size of the little-endian u64 length prefix written by `append`.
    const LEN_PREFIX: u64 = 8;

    let mut file = File::open(&self.wal_path)?;
    let file_len = file.metadata()?.len();
    let mut commands = Vec::new();
    // Invariant: `pos <= file_len`, so the subtractions below cannot underflow.
    let mut pos: u64 = 0;
    loop {
        let remaining = file_len - pos;
        if remaining < LEN_PREFIX {
            break;
        }
        let mut len_buf = [0u8; 8];
        file.seek(SeekFrom::Start(pos))?;
        file.read_exact(&mut len_buf)?;
        let payload_len = u64::from_le_bytes(len_buf);
        // Compare against the bytes that are left instead of computing
        // `pos + 8 + payload_len`: a garbage length close to u64::MAX would
        // overflow that sum and slip past the bounds check.
        if payload_len > remaining - LEN_PREFIX {
            break;
        }
        // Bounded by the file length thanks to the check above.
        let mut payload = vec![0u8; payload_len as usize];
        file.read_exact(&mut payload)?;
        let cmd: WalCommand =
            bincode::deserialize(&payload).map_err(|e| MemMapError::CorruptWal {
                offset: pos,
                reason: e.to_string(),
            })?;
        commands.push(cmd);
        pos += LEN_PREFIX + payload_len;
    }
    Ok(commands)
}
}
/// Size threshold for a chunk file in bytes (64 MiB). `BlockStorage::append`
/// starts a new chunk when a write would push the active chunk past it.
const CHUNK_SIZE_LIMIT: u64 = 64 << 20;
/// Leading part of every chunk file name; used both to build chunk paths and
/// to recognise chunk files when scanning the blocks directory.
const CHUNK_NAME_PREFIX: &str = "chunk_";
/// Append-only blob storage split into numbered chunk files inside a
/// `blocks` directory.
///
/// New data always goes to the "active" chunk (the one with the highest id);
/// `read` serves the active chunk through a file handle and older chunks
/// through cached memory maps.
pub struct BlockStorage {
// Directory that holds the chunk files.
blocks_dir: PathBuf,
// Id of the chunk that currently receives appends.
active_chunk_id: u32,
// Byte position in the active chunk where the next append will land.
active_offset: u64,
// Buffered append-mode handle on the active chunk.
writer: BufWriter<File>,
// Separate read handle on the active chunk; behind a mutex because `read`
// takes `&self` but has to seek.
active_reader: Mutex<File>,
// Memory maps of non-active chunks, keyed by chunk id, created on first read.
mmap_cache: Mutex<HashMap<u32, Arc<Mmap>>>,
}
impl BlockStorage {
/// Opens the block storage rooted at `root`, creating `root/blocks` if needed.
///
/// The chunk with the highest id found on disk becomes the active chunk
/// (chunk 0 when the directory holds no chunk files); it is opened once for
/// appending and once for reading. The mmap cache starts out empty.
///
/// # Errors
///
/// Propagates I/O errors from creating or scanning the directory and from
/// opening the chunk file.
pub fn open(root: &Path) -> Result<Self, MemMapError> {
    let blocks_dir = root.join("blocks");
    std::fs::create_dir_all(&blocks_dir)?;

    let (active_chunk_id, active_offset) = Self::find_active_chunk(&blocks_dir)?;
    let active_path = Self::chunk_path(&blocks_dir, active_chunk_id);

    // The append handle is opened first: it creates the file if it is
    // missing, so the read-only open below can succeed.
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    let append_handle = options.open(&active_path)?;
    let read_handle = File::open(&active_path)?;

    Ok(Self {
        blocks_dir,
        active_chunk_id,
        active_offset,
        writer: BufWriter::new(append_handle),
        active_reader: Mutex::new(read_handle),
        mmap_cache: Mutex::new(HashMap::new()),
    })
}
/// Appends `data` to the active chunk and returns where it was stored as
/// `(chunk_id, offset, length)`.
///
/// If the write would push the active chunk past `CHUNK_SIZE_LIMIT`, a new
/// chunk is started first. A blob is never split: one larger than the limit
/// is stored whole in a single chunk.
///
/// The data is flushed to the file before returning; no `fsync` is issued.
///
/// # Errors
///
/// Propagates I/O errors from rolling the chunk, writing, or flushing.
pub fn append(&mut self, data: &[u8]) -> Result<(u32, u64, u64), MemMapError> {
    let len = data.len() as u64;

    // Saturate so an absurd sum still compares as "over the limit" instead of wrapping.
    let exceeds_limit = self.active_offset.saturating_add(len) > CHUNK_SIZE_LIMIT;
    // Rolling only helps when the active chunk already holds data. For an
    // oversized blob hitting an empty chunk it would just leave a
    // zero-length chunk file behind.
    if exceeds_limit && self.active_offset > 0 {
        self.roll_chunk()?;
    }

    let offset = self.active_offset;
    self.writer.write_all(data)?;
    // Flush right away so the separate read handle can see the bytes.
    self.writer.flush()?;
    self.active_offset += len;
    Ok((self.active_chunk_id, offset, len))
}
/// Reads `length` bytes starting at `offset` within chunk `chunk_id`.
///
/// The active chunk is read through a dedicated file handle. Older chunks
/// are memory-mapped on first use and the mapping is cached for later reads.
///
/// # Errors
///
/// Returns [`MemMapError::BlockNotFound`] when the chunk id is newer than
/// the active chunk, the chunk file is missing, or the requested range does
/// not fit inside the chunk (including ranges whose end overflows `u64`).
/// I/O failures are propagated.
///
/// # Panics
///
/// Panics if one of the internal mutexes is poisoned.
pub fn read(&self, chunk_id: u32, offset: u64, length: u64) -> Result<Vec<u8>, MemMapError> {
    let not_found = || MemMapError::BlockNotFound { chunk_id, offset };

    if chunk_id > self.active_chunk_id {
        return Err(not_found());
    }
    // Range arithmetic stays in u64 with an overflow check: an unchecked
    // `offset + length` can wrap around and slip past the bounds checks.
    let end = offset.checked_add(length).ok_or_else(not_found)?;

    if chunk_id == self.active_chunk_id {
        if end > self.active_offset {
            return Err(not_found());
        }
        // `end <= active_offset`, so `length` is bounded by the chunk size.
        let mut buf = vec![0u8; length as usize];
        let mut reader = self.active_reader.lock().unwrap();
        reader.seek(SeekFrom::Start(offset))?;
        reader.read_exact(&mut buf)?;
        return Ok(buf);
    }

    let mmap = {
        let mut cache = self.mmap_cache.lock().unwrap();
        let cached = cache.get(&chunk_id).cloned();
        match cached {
            Some(existing) => existing,
            None => {
                let path = Self::chunk_path(&self.blocks_dir, chunk_id);
                if !path.exists() {
                    return Err(not_found());
                }
                let file = File::open(&path)?;
                // SAFETY: only chunks older than the active one are mapped, and
                // this type writes exclusively to the active chunk. Assumes
                // nothing outside this type truncates or rewrites chunk
                // files while they are mapped — NOTE(review): confirm.
                let mapped = Arc::new(unsafe { Mmap::map(&file)? });
                cache.insert(chunk_id, Arc::clone(&mapped));
                mapped
            }
        }
    };

    // Compare in u64 first; afterwards `offset <= end <= mmap.len()`, so the
    // casts to usize cannot truncate and the slice cannot panic.
    if end > mmap.len() as u64 {
        return Err(not_found());
    }
    Ok(mmap[offset as usize..end as usize].to_vec())
}
fn roll_chunk(&mut self) -> Result<(), MemMapError> {
self.writer.flush()?;
self.active_chunk_id += 1;
self.active_offset = 0;
let path = Self::chunk_path(&self.blocks_dir, self.active_chunk_id);
let file = OpenOptions::new().create(true).append(true).open(&path)?;
self.writer = BufWriter::new(file);
let reader = File::open(&path)?;
*self.active_reader.lock().unwrap() = reader;
Ok(())
}
/// Scans `blocks_dir` for chunk files and returns `(chunk_id, offset)` for
/// the chunk that should receive appends: the highest id found and the
/// current length of its file.
///
/// Only names of the exact form `<CHUNK_NAME_PREFIX><number>.bin` count.
/// Names without the `.bin` suffix or with a repeated suffix (`chunk_9`,
/// `chunk_0003.bin.bin`) are ignored so stray files cannot bump the active
/// id. Returns `(0, 0)` when no chunk file is found.
///
/// # Errors
///
/// Propagates I/O errors from listing the directory or reading metadata.
fn find_active_chunk(blocks_dir: &Path) -> Result<(u32, u64), MemMapError> {
    let mut max_id: Option<u32> = None;
    for entry in std::fs::read_dir(blocks_dir)? {
        let file_name = entry?.file_name();
        let file_name = file_name.to_string_lossy();
        let parsed = file_name
            .strip_prefix(CHUNK_NAME_PREFIX)
            .and_then(|rest| rest.strip_suffix(".bin"))
            .and_then(|stem| stem.parse::<u32>().ok());
        if let Some(id) = parsed {
            max_id = Some(max_id.map_or(id, |m: u32| m.max(id)));
        }
    }

    let chunk_id = max_id.unwrap_or(0);
    let path = Self::chunk_path(blocks_dir, chunk_id);
    // The canonical path can be missing (no chunks yet, or the id came from a
    // non-zero-padded name); the chunk then starts out empty.
    let offset = if path.exists() {
        std::fs::metadata(&path)?.len()
    } else {
        0
    };
    Ok((chunk_id, offset))
}
/// Builds the path of the chunk file with the given id inside `blocks_dir`.
///
/// The file name is `CHUNK_NAME_PREFIX`, then the id zero-padded to at least
/// four digits, then `.bin`.
fn chunk_path(blocks_dir: &Path, id: u32) -> PathBuf {
    let file_name = format!("{}{:04}.bin", CHUNK_NAME_PREFIX, id);
    let mut full_path = blocks_dir.to_path_buf();
    full_path.push(file_name);
    full_path
}
}