use crate::{Result, StorageError};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Write, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use super::BlobRef;
// File-header magic (the ASCII bytes "BLOB" packed into a u32) and on-disk format version;
// both are written little-endian as the first 8 bytes of every blob file.
const BLOB_MAGIC: u32 = 0x424C4F42; const BLOB_VERSION: u32 = 1;
// Mutable portion of `BlobStore`, kept behind a single Mutex.
struct BlobState {
    // The open blob file that new records are appended to.
    current_file: BlobFile,
    // Id of `current_file`; `gc_blob_files` never deletes this file.
    current_file_id: u32,
}
// Append-only blob storage: records live in rotating "<id>.blob" files,
// each record framed as [u32 len][payload][u32 crc32], little-endian.
pub struct BlobStore {
    // Directory that holds all blob files.
    dir: PathBuf,
    // Writer state (active file + id), serialized by this lock.
    state: Mutex<BlobState>,
    // Soft size cap: `put` rotates to a new file when a record would
    // push the current file past this many bytes.
    max_file_size: usize,
}
// One open append-only blob file plus its buffered writer.
struct BlobFile {
    // Numeric id; doubles as the on-disk name ("{:08}.blob").
    file_id: u32,
    // Buffered handle positioned at the end of the file.
    writer: BufWriter<File>,
    // Byte offset where the next record will start (8-byte header first).
    offset: u64,
    // Stored but not read anywhere in this file; kept for diagnostics.
    #[allow(dead_code)]
    path: PathBuf,
}
impl BlobStore {
/// Opens (or creates) a blob store rooted at `dir`.
///
/// Any torn tail left in the most recent blob file by a crash is
/// truncated away before a fresh append file is created.
/// `max_file_size` is the soft cap at which `put` rotates files.
pub fn new<P: AsRef<Path>>(dir: P, max_file_size: usize) -> Result<Self> {
    let root = dir.as_ref().to_path_buf();
    std::fs::create_dir_all(&root)?;
    // Repair a possible partial write from a previous run before
    // deciding which file id to append to next.
    Self::recover_last_blob_file(&root)?;
    let next_id = Self::find_next_file_id(&root)?;
    let initial = BlobState {
        current_file: BlobFile::create(&root, next_id)?,
        current_file_id: next_id,
    };
    Ok(Self {
        dir: root,
        state: Mutex::new(initial),
        max_file_size,
    })
}
/// Appends `data` as a new blob record and returns a reference to it.
///
/// Rotates to a fresh blob file first when the record would push the
/// current file past `max_file_size`. A single blob larger than the
/// cap is still written (into its own new file), so the limit is soft.
pub fn put(&self, data: &[u8]) -> Result<BlobRef> {
    // Per-record framing overhead: 4-byte length prefix + 4-byte CRC32
    // suffix. Must stay in sync with `BlobFile::write_blob`, which
    // advances `offset` by exactly `4 + size + 4`. (The previous check
    // used a hard-coded 12, over-counting the record by 4 bytes.)
    const RECORD_OVERHEAD: u64 = 8;
    let mut state = self.state.lock()
        .map_err(|_| StorageError::Lock("BlobStore state lock poisoned".into()))?;
    let record_end = state.current_file.offset + data.len() as u64 + RECORD_OVERHEAD;
    if record_end > self.max_file_size as u64 {
        self.rotate_file_locked(&mut state)?;
    }
    state.current_file.write_blob(data)
}
/// Reads the blob addressed by `blob_ref` back from disk.
///
/// Verifies the on-disk length prefix against `blob_ref.size` and the
/// trailing CRC32 against the payload before returning the bytes.
pub fn get(&self, blob_ref: &BlobRef) -> Result<Vec<u8>> {
    let mut file = File::open(self.blob_file_path(blob_ref.file_id))?;
    file.seek(SeekFrom::Start(blob_ref.offset))?;

    // Record layout: [u32 size][payload][u32 crc], all little-endian.
    let mut word = [0u8; 4];
    file.read_exact(&mut word)?;
    let stored_size = u32::from_le_bytes(word);
    if stored_size != blob_ref.size {
        return Err(StorageError::InvalidData("Blob size mismatch".into()));
    }

    let mut payload = vec![0u8; stored_size as usize];
    file.read_exact(&mut payload)?;

    file.read_exact(&mut word)?;
    let expected_crc = u32::from_le_bytes(word);
    if crc32fast::hash(&payload) != expected_crc {
        return Err(StorageError::InvalidData("Blob CRC mismatch".into()));
    }
    Ok(payload)
}
/// Deleting an individual blob is deliberately a no-op: blobs are
/// immutable records inside append-only files, so space is reclaimed
/// file-at-a-time by `gc_blob_files` once a file holds no live refs.
pub fn delete(&self, _blob_ref: &BlobRef) -> Result<()> {
    Ok(())
}
/// Deletes blob files that contain no live references.
///
/// `live_refs` holds `(file_id, offset)` pairs still reachable; only
/// the file-id component matters here. The file currently being
/// appended to is never removed. Returns how many files were deleted.
pub fn gc_blob_files(&self, live_refs: &std::collections::HashSet<(u32, u64)>) -> Result<usize> {
    // Collapse the reference pairs down to the set of referenced files.
    let live_file_ids: std::collections::HashSet<u32> =
        live_refs.iter().map(|(fid, _)| *fid).collect();
    let entries: Vec<_> = std::fs::read_dir(&self.dir)?
        .filter_map(|e| e.ok())
        .collect();
    // Hold the lock so the active file cannot rotate under us mid-scan.
    let state = self.state.lock()
        .map_err(|_| StorageError::Lock("BlobStore state lock poisoned".into()))?;
    let mut removed = 0;
    for entry in entries {
        let name = entry.file_name();
        // Only files named "<u32>.blob" are candidates.
        let parsed_id = name.to_str()
            .and_then(|n| n.strip_suffix(".blob"))
            .and_then(|stem| stem.parse::<u32>().ok());
        let file_id = match parsed_id {
            Some(id) => id,
            None => continue,
        };
        if file_id == state.current_file_id || live_file_ids.contains(&file_id) {
            continue;
        }
        // Best-effort delete: a failure just leaves the file for a later pass.
        if std::fs::remove_file(entry.path()).is_ok() {
            removed += 1;
        }
    }
    Ok(removed)
}
/// Scans `dir` for existing "<id>.blob" files and returns one past the
/// highest id found (1 when the directory is empty or unreadable).
fn find_next_file_id(dir: &Path) -> Result<u32> {
    let max_id = std::fs::read_dir(dir)
        .map(|entries| {
            entries
                .flatten()
                .filter_map(|entry| {
                    // Accept only names of the form "<u32>.blob".
                    let name = entry.file_name();
                    name.to_str()
                        .and_then(|n| n.strip_suffix(".blob"))
                        .and_then(|stem| stem.parse::<u32>().ok())
                })
                .max()
                .unwrap_or(0)
        })
        .unwrap_or(0);
    Ok(max_id + 1)
}
/// Path of the blob file for `file_id`: zero-padded 8-digit name,
/// e.g. "00000042.blob", inside the store directory.
fn blob_file_path(&self, file_id: u32) -> PathBuf {
    let file_name = format!("{:08}.blob", file_id);
    self.dir.join(file_name)
}
/// Flushes and syncs the active blob file, then swaps in a brand-new
/// file with the next sequential id. The caller must already hold the
/// state lock (enforced by taking `&mut BlobState`).
fn rotate_file_locked(&self, state: &mut BlobState) -> Result<()> {
    // Make the old file durable before anything else is written.
    state.current_file.flush()?;
    let next_id = state.current_file_id + 1;
    state.current_file = BlobFile::create(&self.dir, next_id)?;
    state.current_file_id = next_id;
    Ok(())
}
/// Scans the most recent blob file after a restart and truncates any
/// torn or corrupt tail so that appends resume from a known-good state.
///
/// Walks the record stream ([u32 size][payload][u32 crc]) starting just
/// past the 8-byte header, advancing `valid_offset` over each record
/// whose CRC verifies. The file is truncated at the first record that
/// is short, unreadable, or fails its checksum; a file too small to
/// hold even the header is removed outright. All I/O failures during
/// the scan are treated as "stop here", never as hard errors.
fn recover_last_blob_file(dir: &Path) -> Result<()> {
    // find_next_file_id returns max-existing-id + 1 (always >= 1), so
    // the most recently written file, if any, has id next_id - 1.
    // (The old `max_id == 0` early-return was unreachable for the same
    // reason and has been dropped.)
    let next_id = Self::find_next_file_id(dir)?;
    let last_file_id = next_id.saturating_sub(1);
    if last_file_id == 0 {
        // No blob files exist yet; nothing to recover.
        return Ok(());
    }
    let path = dir.join(format!("{:08}.blob", last_file_id));
    if !path.exists() {
        return Ok(());
    }
    let mut file = match File::open(&path) {
        Ok(f) => f,
        Err(_) => return Ok(()),
    };
    let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
    // NOTE(review): the 8-byte header (magic + version) is read but its
    // contents are not validated — confirm whether a magic/version
    // mismatch should also be treated as corruption.
    let mut header = [0u8; 8];
    if file.read_exact(&mut header).is_err() {
        // Not even a complete header: the file is useless, drop it.
        let _ = std::fs::remove_file(&path);
        return Ok(());
    }
    let mut valid_offset: u64 = 8;
    loop {
        let mut size_buf = [0u8; 4];
        if file.read_exact(&mut size_buf).is_err() {
            break;
        }
        let size = u32::from_le_bytes(size_buf);
        // Guard against a corrupt length field: never allocate more
        // than the bytes that can actually remain in the file (payload
        // plus its 4-byte CRC must fit after the 4-byte length prefix).
        // Previously a garbage size could trigger a ~4 GiB allocation.
        let remaining = file_size.saturating_sub(valid_offset + 4);
        if size as u64 + 4 > remaining {
            debug_log!("[BlobStore] Corrupt record size in blob file {}, truncating to offset {}", last_file_id, valid_offset);
            break;
        }
        let mut data = vec![0u8; size as usize];
        if file.read_exact(&mut data).is_err() {
            break;
        }
        let mut crc_buf = [0u8; 4];
        if file.read_exact(&mut crc_buf).is_err() {
            break;
        }
        let stored_crc = u32::from_le_bytes(crc_buf);
        if stored_crc != crc32fast::hash(&data) {
            debug_log!("[BlobStore] CRC mismatch in blob file {}, truncating to offset {}", last_file_id, valid_offset);
            break;
        }
        valid_offset += 4 + size as u64 + 4;
    }
    if valid_offset < file_size {
        // Reopen writable and chop off everything past the last record
        // that verified cleanly.
        drop(file);
        let file = OpenOptions::new().write(true).open(&path)?;
        file.set_len(valid_offset)?;
        debug_log!("[BlobStore] Recovered blob file {}: truncated from {} to {} bytes",
            last_file_id, file_size, valid_offset);
    }
    Ok(())
}
}
impl BlobFile {
/// Creates a new blob file (truncating any existing one with the same
/// id) and writes the 8-byte header: magic then version, little-endian.
fn create(dir: &Path, file_id: u32) -> Result<Self> {
    let path = dir.join(format!("{:08}.blob", file_id));
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(&path)?;
    // Header goes through the raw handle before buffering starts.
    file.write_all(&BLOB_MAGIC.to_le_bytes())?;
    file.write_all(&BLOB_VERSION.to_le_bytes())?;
    Ok(Self {
        file_id,
        writer: BufWriter::with_capacity(64 * 1024, file),
        // The first record lands immediately after the 8-byte header.
        offset: 8,
        path,
    })
}
/// Drains buffered bytes to the OS, then forces file data to stable
/// storage. `sync_data` only guarantees file contents, not metadata.
fn flush(&mut self) -> Result<()> {
    self.writer.flush()?;
    self.writer.get_mut().sync_data()?;
    Ok(())
}
/// Appends one record — [u32 len][payload][u32 crc32], little-endian —
/// and returns a `BlobRef` addressing the record's starting offset.
/// Flushes and syncs after every record, so acknowledged writes are
/// durable (at the cost of an fsync per blob).
fn write_blob(&mut self, data: &[u8]) -> Result<BlobRef> {
    let payload_len = data.len() as u32;
    let record_start = self.offset;
    let checksum = crc32fast::hash(data);
    self.writer.write_all(&payload_len.to_le_bytes())?;
    self.writer.write_all(data)?;
    self.writer.write_all(&checksum.to_le_bytes())?;
    // Durability barrier: drain the buffer, then fsync the data pages.
    self.writer.flush()?;
    self.writer.get_mut().sync_data()?;
    // Advance past the 4-byte length, the payload, and the 4-byte CRC.
    self.offset += 4 + payload_len as u64 + 4;
    Ok(BlobRef {
        file_id: self.file_id,
        offset: record_start,
        size: payload_len,
    })
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_blob_store_basic() {
        // Round-trip a small payload through put/get.
        let dir = TempDir::new().unwrap();
        let store = BlobStore::new(dir.path(), 1024 * 1024).unwrap();
        let payload = b"Hello, Blob World!".to_vec();
        let blob_ref = store.put(&payload).unwrap();
        assert_eq!(store.get(&blob_ref).unwrap(), payload);
    }

    #[test]
    fn test_large_blob() {
        // A 1 MiB payload (at the rotation threshold) must round-trip intact.
        let dir = TempDir::new().unwrap();
        let store = BlobStore::new(dir.path(), 1024 * 1024).unwrap();
        let payload = vec![42u8; 1024 * 1024];
        let blob_ref = store.put(&payload).unwrap();
        let restored = store.get(&blob_ref).unwrap();
        assert_eq!(restored.len(), payload.len());
        assert_eq!(restored, payload);
    }
}