use serde::{Deserialize, Serialize};
use std::{
fs,
io::{Read, Write},
path::{Path, PathBuf},
sync::Arc,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::BlockHeight;
#[derive(Debug, thiserror::Error)]
pub enum CacheError {
#[error("Io: {0}")]
Io(#[from] std::io::Error),
#[error("Postcard: {0}")]
Postcard(#[from] postcard::Error),
#[error("Tokio join: {0}")]
Join(#[from] tokio::task::JoinError),
#[error("Unknown file format: {0}")]
UnknownFile(String),
#[error("Wrong last processed height")]
WrongLastProcessedHeight,
}
pub type CacheResult<T> = Result<T, CacheError>;
/// Disk-backed cache of per-block payloads of type `T`.
///
/// Each cached block lives in `<path>/<height>.blob` as a brotli-compressed postcard
/// encoding of [`UpdateCapable<T>`]. A sorted index of the cached heights is kept in
/// memory and shared (through an `Arc`) between clones of the same cache.
pub struct Cache<T> {
    /// Directory holding the block files and the last-processed-height marker.
    path: String,
    /// Heights currently cached, ascending.
    storage: Arc<tokio::sync::Mutex<Vec<BlockHeight>>>,
    /// Ties the cache to its payload type without storing a `T`. The `fn() -> T`
    /// form keeps `Send`/`Sync` and drop-checking independent of `T`, since no `T`
    /// value is ever owned by the cache.
    __marker: std::marker::PhantomData<fn() -> T>,
}

// Written by hand: `#[derive(Clone)]` would demand `T: Clone`, even though cloning
// only copies the path and bumps the shared index's reference count.
impl<T> Clone for Cache<T> {
    fn clone(&self) -> Self {
        Self {
            path: self.path.clone(),
            storage: Arc::clone(&self.storage),
            __marker: std::marker::PhantomData,
        }
    }
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(bound = "T: serde::de::DeserializeOwned")]
pub struct UpdateCapable<T: serde::Serialize + serde::de::DeserializeOwned> {
pub block: crate::BlockMeta,
pub data: T,
}
/// Name of the file (inside the cache directory) recording the last processed height.
const LAST_PROCESSED_BLOCK: &str = "last_processed_block";
/// Extension of the per-block cache files, named `<height>.blob`.
const EXTENSION: &str = "blob";
impl<T: serde::Serialize + serde::de::DeserializeOwned> Cache<T> {
    /// Opens (creating it if necessary) the cache directory at `path` and indexes the
    /// block files already present in it.
    ///
    /// Every regular file other than the last-processed marker must be named
    /// `<height>` optionally followed by `.<anything>`. The part before the first `.`
    /// is parsed as a [`BlockHeight`]. The index is sorted ascending and deduplicated.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Io`] if the directory cannot be created or listed.
    /// * [`CacheError::UnknownFile`] if a file name does not start with a block height.
    pub async fn new(path: String) -> CacheResult<Self> {
        // `create_dir_all` succeeds when the directory already exists, so an error here
        // is a real problem (permissions, a file in the way, ...) worth reporting.
        tokio::fs::create_dir_all(&path).await?;
        let mut entries = tokio::fs::read_dir(&path).await?;
        let mut files: Vec<BlockHeight> = vec![];
        // Propagate listing errors: stopping early would silently leave cached blocks
        // out of the index.
        while let Some(entry) = entries.next_entry().await? {
            let entry_path = entry.path();
            // Same semantics as `Path::is_file` (follows symlinks, unreadable metadata
            // counts as "not a file") but without blocking the executor.
            let is_file = tokio::fs::metadata(&entry_path)
                .await
                .map(|meta| meta.is_file())
                .unwrap_or(false);
            if !is_file {
                continue;
            }
            let Some(name) = entry_path.file_name().and_then(|s| s.to_str()) else {
                continue;
            };
            if name == LAST_PROCESSED_BLOCK {
                continue;
            }
            // Everything before the first '.', or the whole name if there is none.
            let stem = name.split_once('.').map_or(name, |(stem, _)| stem);
            let height: BlockHeight = stem
                .parse()
                .map_err(|_| CacheError::UnknownFile(stem.to_string()))?;
            files.push(height);
        }
        files.sort_unstable();
        // Two files mapping to the same height must not produce two index entries,
        // or `len` would over-count and a second removal would hit a missing file.
        files.dedup();
        Ok(Self {
            path,
            storage: Arc::new(tokio::sync::Mutex::new(files)),
            __marker: std::marker::PhantomData,
        })
    }

    /// Number of heights currently in the index.
    pub async fn len(&self) -> usize {
        self.storage.lock().await.len()
    }

    /// Snapshot of the cached heights, ascending.
    pub async fn items(&self) -> Vec<BlockHeight> {
        self.storage.lock().await.clone()
    }

    /// Path of the file storing `block_id`: `<cache dir>/<block_id>.blob`.
    fn block_path(&self, block_id: BlockHeight) -> PathBuf {
        Path::new(&self.path).join(format!("{block_id}.{EXTENSION}"))
    }

    /// Loads and decodes the cache entry for `block_id`.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Io`] if the file is missing or unreadable, or is not valid brotli.
    /// * [`CacheError::Join`] if the decompression task does not complete.
    /// * [`CacheError::Postcard`] if the bytes do not decode as `UpdateCapable<T>`.
    pub async fn read_cache(&self, block_id: BlockHeight) -> CacheResult<UpdateCapable<T>> {
        let mut buffer = vec![];
        tokio::fs::File::open(self.block_path(block_id))
            .await?
            .read_to_end(&mut buffer)
            .await?;
        // Decompression is CPU-bound: run it off the async workers, mirroring how
        // `add` compresses.
        let data = tokio::task::spawn_blocking(|| Self::decompress_brotli(buffer)).await??;
        Ok(postcard::from_bytes(&data)?)
    }

    /// Writes `data` as the cache entry for `block_id` and appends `block_id` to the index.
    ///
    /// Heights must arrive consecutively. `block_id` has to be exactly one more than
    /// the last cached height. Any height is accepted while the cache is empty.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Postcard`] if `data` cannot be serialized.
    /// * [`CacheError::Join`] if the compression task does not complete.
    /// * [`CacheError::Io`] if compressing or writing the file fails.
    ///
    /// # Panics
    ///
    /// Panics if `block_id` does not directly follow the last cached height. The check
    /// runs before anything is written, so a rejected block leaves the disk untouched.
    pub async fn add(&self, block_id: BlockHeight, data: &UpdateCapable<T>) -> CacheResult<()> {
        // Encode before touching the disk. A serialization failure must not leave a
        // truncated `<block_id>.blob` behind for `new` to index later.
        let encoded = postcard::to_allocvec(data)?;
        let buffer = tokio::task::spawn_blocking(|| Self::compress_brotli(encoded)).await??;

        // Hold the index lock across the write. Concurrent `add`s then cannot both pass
        // the contiguity check for the same slot, and the file and its index entry
        // become visible together.
        let mut index = self.storage.lock().await;
        if let Some(&last) = index.last() {
            // `checked_add` rather than `block_id - 1`, which underflows for block 0.
            if last.checked_add(1) != Some(block_id) {
                panic!(
                    "Failed to push same block in the cache: last cached {last}, got {block_id}"
                );
            }
        }
        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(self.block_path(block_id))
            .await?;
        file.write_all(&buffer).await?;
        file.flush().await?;
        index.push(block_id);
        Ok(())
    }

    /// Reads the height stored in the last-processed marker file. Returns `0` when the
    /// file does not exist yet.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Io`] if the file exists but cannot be read.
    /// * [`CacheError::WrongLastProcessedHeight`] if its contents are not a height.
    pub async fn read_last_processed(&self) -> CacheResult<BlockHeight> {
        let path = Path::new(&self.path).join(LAST_PROCESSED_BLOCK);
        // Read directly and map NotFound to "nothing processed yet". This avoids a
        // blocking `exists()` probe and its check-then-read race, and no longer
        // reports permission errors as height 0.
        let bytes = match tokio::fs::read(&path).await {
            Ok(bytes) => bytes,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(0),
            Err(err) => {
                println!("read path: {path:?}");
                return Err(err.into());
            }
        };
        let height = String::from_utf8(bytes).map_err(|_| CacheError::WrongLastProcessedHeight)?;
        // Tolerate surrounding whitespace such as a trailing newline from manual edits.
        height
            .trim()
            .parse()
            .map_err(|_| CacheError::WrongLastProcessedHeight)
    }

    /// Overwrites the last-processed marker file with `height` in decimal.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if the file cannot be written.
    pub async fn write_last_processed(&self, height: BlockHeight) -> CacheResult<()> {
        let path = Path::new(&self.path).join(LAST_PROCESSED_BLOCK);
        tokio::fs::write(&path, height.to_string())
            .await
            .inspect_err(|_| println!("write path: {path:?}"))?;
        Ok(())
    }

    /// Removes `block_id` from the index and deletes its file. This is a no-op if
    /// the height is not indexed.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if deleting the file fails. The index entry is
    /// already gone at that point.
    pub async fn remove(&self, block_id: BlockHeight) -> CacheResult<()> {
        let mut index = self.storage.lock().await;
        let Ok(pos) = index.binary_search(&block_id) else {
            return Ok(());
        };
        index.remove(pos);
        // Release the index before the filesystem call.
        drop(index);
        let path = self.block_path(block_id);
        tokio::fs::remove_file(&path)
            .await
            .inspect_err(|_| println!("rm path: {path:?}"))?;
        Ok(())
    }

    /// Removes the lowest cached height from the index and deletes its file. This is
    /// a no-op on an empty cache.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if deleting the file fails. The index entry is
    /// already gone at that point.
    pub async fn remove_oldest(&self) -> CacheResult<()> {
        let mut index = self.storage.lock().await;
        if index.is_empty() {
            return Ok(());
        }
        let block_id = index.remove(0);
        drop(index);
        let path = self.block_path(block_id);
        tokio::fs::remove_file(&path)
            .await
            .inspect_err(|_| println!("rm oldest path: {path:?}"))?;
        Ok(())
    }

    /// Brotli-compresses `data` (internal buffer 4096, quality 4, window 2^22).
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if the brotli writer reports an error.
    pub fn compress_brotli(data: Vec<u8>) -> CacheResult<Vec<u8>> {
        let mut buffer = Vec::new();
        {
            // The writer emits the final brotli block when dropped at the end of this
            // scope, which also releases its borrow of `buffer`.
            let mut writer = brotli::CompressorWriter::new(&mut buffer, 4096, 4, 22);
            writer.write_all(&data)?;
        }
        Ok(buffer)
    }

    /// Decompresses a brotli stream produced by [`Self::compress_brotli`].
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if `buffer` is not a valid brotli stream.
    pub fn decompress_brotli(buffer: Vec<u8>) -> CacheResult<Vec<u8>> {
        let mut decompressed_data = Vec::new();
        brotli::Decompressor::new(&mut std::io::Cursor::new(buffer), 4096)
            .read_to_end(&mut decompressed_data)?;
        Ok(decompressed_data)
    }

    /// Deletes everything inside the cache directory and empties the in-memory index.
    /// That includes block files, the last-processed marker and any subdirectories.
    ///
    /// This method is synchronous and makes blocking filesystem calls. If the index is
    /// locked by an in-flight operation at that moment, it is left as-is.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if the directory cannot be listed or an entry cannot
    /// be removed. The index is not touched in that case.
    pub fn clean_all(&self) -> CacheResult<()> {
        clear_dir(Path::new(&self.path))?;
        // Keep the index consistent with the now-empty directory. Otherwise `len`,
        // `items` and the contiguity check in `add` would still see deleted blocks.
        // `try_lock` because this sync method may run inside the runtime, where
        // `blocking_lock` panics.
        if let Ok(mut index) = self.storage.try_lock() {
            index.clear();
        }
        Ok(())
    }
}
/// Empties the directory at `path` without removing the directory itself.
///
/// Subdirectories are removed recursively; every other entry is removed as a file.
/// Stops at, and returns, the first I/O error encountered.
fn clear_dir<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
    fs::read_dir(&path)?.try_for_each(|entry| {
        let target = entry?.path();
        if target.is_dir() {
            fs::remove_dir_all(target)
        } else {
            fs::remove_file(target)
        }
    })
}