use std::{
collections::HashMap,
fs,
io::{self},
path::PathBuf,
sync::Arc,
};
use bincode::{config::standard, decode_from_slice, encode_to_vec};
use blake3::Hasher;
use parking_lot::RwLock;
use crate::infinitedb_core::block::{Block, BlockId};
/// In-memory cache of shared blocks, bounded by an estimated byte budget.
///
/// Recency is tracked with a monotonically increasing generation counter
/// instead of a linked list: every hit or insert stamps the entry with a fresh
/// generation, and eviction removes the entry carrying the smallest stamp.
pub struct LruBlockCache {
/// Resident blocks keyed by id, each paired with the generation at which it
/// was last inserted or read.
entries: HashMap<BlockId, (Arc<Block>, u64)>,
/// Counter bumped on every hit and every insert; the source of recency stamps.
generation: u64,
/// Sum of the estimated sizes (per `block_byte_size`) of all resident blocks.
current_bytes: usize,
/// Byte budget; a block whose estimated size exceeds it is never cached.
max_bytes: usize,
}
impl LruBlockCache {
    /// Creates an empty cache that may hold up to `max_bytes` of estimated
    /// block data.
    pub fn new(max_bytes: usize) -> Self {
        Self {
            entries: HashMap::new(),
            generation: 0,
            current_bytes: 0,
            max_bytes,
        }
    }

    /// Returns the cached block for `id`, marking it as most recently used.
    ///
    /// Returns `None` (and leaves the generation counter untouched) on a miss.
    pub fn get(&mut self, id: BlockId) -> Option<Arc<Block>> {
        let (block, last_used) = self.entries.get_mut(&id)?;
        self.generation += 1;
        *last_used = self.generation;
        Some(Arc::clone(block))
    }

    /// Caches `block` under `block.id`, evicting least recently used entries
    /// until the byte budget is respected.
    ///
    /// Any previously cached version of the same id is always dropped, even
    /// when the new version is too large to be cached: keeping it would make
    /// later lookups return data that no longer matches what was inserted.
    /// A block whose estimated size exceeds the whole budget is not cached.
    pub fn insert(&mut self, block: Arc<Block>) {
        let size = block_byte_size(&block);

        // Remove the old version up front so that (a) an oversized replacement
        // cannot leave a stale entry behind and (b) a larger replacement goes
        // through the same eviction path as a brand-new entry instead of
        // silently pushing `current_bytes` past `max_bytes`.
        self.invalidate(block.id);

        if size > self.max_bytes {
            return;
        }

        while self.current_bytes.saturating_add(size) > self.max_bytes {
            // The entry with the smallest generation stamp is the least
            // recently used one.
            let victim = self
                .entries
                .iter()
                .min_by_key(|(_, (_, last_used))| *last_used)
                .map(|(id, _)| *id);
            match victim {
                Some(id) => self.invalidate(id),
                // Nothing left to evict; cannot happen while the byte
                // accounting is consistent, but never spin forever.
                None => break,
            }
        }

        self.generation += 1;
        self.current_bytes += size;
        self.entries.insert(block.id, (block, self.generation));
    }

    /// Removes the entry for `id`, if present, and releases its bytes from the
    /// running total.
    pub fn invalidate(&mut self, id: BlockId) {
        if let Some((evicted, _)) = self.entries.remove(&id) {
            self.current_bytes -= block_byte_size(&evicted);
        }
    }

    /// Estimated number of bytes currently held by the cache.
    pub fn resident_bytes(&self) -> usize {
        self.current_bytes
    }

    /// Number of blocks currently held by the cache.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
/// Estimates the in-memory footprint of `block` for cache accounting.
///
/// The estimate is a fixed 64 bytes per block plus, for every record, 48 bytes
/// and the length of the record's `data`.
fn block_byte_size(block: &Block) -> usize {
    const FIXED_OVERHEAD: usize = 64;
    const PER_RECORD_OVERHEAD: usize = 48;

    let mut total: usize = 0;
    for record in block.records.iter() {
        total += PER_RECORD_OVERHEAD + record.data.len();
    }
    FIXED_OVERHEAD + total
}
/// File-backed block storage rooted at a directory, fronted by an in-memory
/// block cache.
pub struct BlockStore {
/// Root directory of the store; block files live under `blocks/` and
/// metadata files under `meta/` inside it.
root: PathBuf,
/// Cache of decoded blocks, guarded by a reader-writer lock so the store can
/// be used through `&self`.
cache: RwLock<LruBlockCache>,
}
impl BlockStore {
pub fn open(dir: PathBuf) -> io::Result<Self> {
Self::open_with_cache(dir, 10 * 1024 * 1024)
}
pub fn open_with_cache(dir: PathBuf, cache_bytes: usize) -> io::Result<Self> {
let blocks_dir = dir.join("blocks");
let meta_dir = dir.join("meta");
fs::create_dir_all(&blocks_dir)?;
fs::create_dir_all(&meta_dir)?;
Ok(Self {
root: dir,
cache: RwLock::new(LruBlockCache::new(cache_bytes)),
})
}
pub fn write_block(&self, block: &Block) -> io::Result<()> {
let path = self.block_path(block.id);
let payload = encode_to_vec(block, standard())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let tmp = path.with_extension("tmp");
fs::write(&tmp, &payload)?;
fs::rename(&tmp, &path)?;
self.cache
.write()
.insert(Arc::new(block.clone()));
Ok(())
}
pub fn read_block_shared(&self, id: BlockId) -> io::Result<Arc<Block>> {
if let Some(cached) = self.cache.write().get(id) {
return Ok(cached);
}
let path = self.block_path(id);
let bytes = fs::read(&path)?;
let (block, _): (Block, _) = decode_from_slice(&bytes, standard())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
verify_checksum(&block)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let arc = Arc::new(block);
self.cache.write().insert(Arc::clone(&arc));
Ok(arc)
}
pub fn read_block(&self, id: BlockId) -> io::Result<Block> {
Ok((*self.read_block_shared(id)?).clone())
}
pub fn delete_block(&self, id: BlockId) -> io::Result<()> {
let path = self.block_path(id);
self.cache.write().invalidate(id);
fs::remove_file(&path)
}
pub fn cache_stats(&self) -> (usize, usize) {
let cache = self.cache.read();
(cache.resident_bytes(), cache.len())
}
pub fn exists(&self, id: BlockId) -> io::Result<bool> {
Ok(self.block_path(id).exists())
}
pub fn list_blocks(&self) -> io::Result<Vec<BlockId>> {
let dir = self.root.join("blocks");
let mut ids = Vec::new();
for entry in fs::read_dir(dir)? {
let entry = entry?;
let name = entry.file_name();
let s = name.to_string_lossy();
if let Some(stem) = s.strip_suffix(".blk") {
if let Ok(n) = stem.parse::<u64>() {
ids.push(BlockId(n));
}
}
}
Ok(ids)
}
pub fn write_meta(&self, name: &str, data: &[u8]) -> io::Result<()> {
let path = self.root.join("meta").join(name);
let tmp = path.with_extension("tmp");
fs::write(&tmp, data)?;
fs::rename(&tmp, &path)
}
pub fn read_meta(&self, name: &str) -> io::Result<Vec<u8>> {
fs::read(self.root.join("meta").join(name))
}
pub fn root(&self) -> &PathBuf {
&self.root
}
pub fn wal_path(&self) -> PathBuf {
self.root.join("wal.log")
}
pub fn staging_wal_path(&self) -> PathBuf {
self.root.join("wal").join("staging.log")
}
fn block_path(&self, id: BlockId) -> PathBuf {
self.root.join("blocks").join(format!("{}.blk", id.0))
}
}
/// Computes the BLAKE3 digest of the block's encoded records.
///
/// Only `block.records` is hashed (encoded with bincode's standard
/// configuration); other fields of the block do not contribute.
///
/// # Errors
/// Returns `ErrorKind::Other` if the records cannot be encoded.
pub fn compute_checksum(block: &Block) -> io::Result<[u8; 32]> {
    let encoded = match encode_to_vec(&block.records, standard()) {
        Ok(bytes) => bytes,
        Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
    };

    let mut hasher = Hasher::new();
    hasher.update(&encoded);
    let digest = hasher.finalize();
    Ok(*digest.as_bytes())
}
/// Checks that `block.checksum` equals the checksum recomputed from the block.
///
/// Returns a description of the problem if recomputing fails or the two
/// checksums differ.
fn verify_checksum(block: &Block) -> Result<(), String> {
    match compute_checksum(block) {
        Err(err) => Err(err.to_string()),
        Ok(recomputed) if block.checksum == recomputed => Ok(()),
        Ok(_) => Err(format!("Block {:?} checksum mismatch", block.id)),
    }
}