use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::str;
use std::{
collections::{btree_map::Entry, BTreeMap},
convert::TryInto,
};
use tokio::fs::{self, read_dir, DirBuilder, File, OpenOptions};
use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
/// Width, in heights, of the retention window for log files: a file is removed
/// once its height is more than this many heights behind the current height, and
/// `Wal::save` only opens new files for heights less than this far ahead of it.
const DELETE_FILE_INTERVAL: u64 = 8;
/// File name, directly under the WAL base directory, of the index file that
/// stores the current height as decimal text.
const INDEX_NAME: &str = "index";
/// Path suffix (leading slash included) appended to the WAL base directory to
/// obtain the directory holding the per-height `<height>.log` files.
const LOGS_PATH: &str = "/logs";
/// Kind tag stored with every WAL record.
///
/// The explicit discriminant is the single type byte that `Wal::save` writes
/// into the record header; `From<u8>` performs the reverse mapping.
#[derive(Debug, Clone, Copy)]
pub enum LogType {
/// Type byte `0`; also the fallback `From<u8>` returns for any unknown byte.
Skip = 0,
/// Type byte `1`.
Propose = 1,
/// Type byte `2`.
QuorumVotes = 2,
/// Type byte `3`.
FinalizeBlock = 3,
}
/// Decodes a record type byte back into a [`LogType`].
impl From<u8> for LogType {
    /// Returns the variant whose discriminant equals `s`; every byte that does
    /// not name `Propose`, `QuorumVotes` or `FinalizeBlock` (including `0`)
    /// yields `LogType::Skip`.
    fn from(s: u8) -> LogType {
        // `Skip` is deliberately absent: it doubles as the catch-all result.
        const KNOWN: [LogType; 3] = [
            LogType::Propose,
            LogType::QuorumVotes,
            LogType::FinalizeBlock,
        ];
        KNOWN
            .iter()
            .copied()
            .find(|kind| *kind as u8 == s)
            .unwrap_or(LogType::Skip)
    }
}
/// Write-ahead log made of one index file plus one log file per height.
///
/// The index file lives at `<dir>/index` and the log files at
/// `<dir>/logs/<height>.log`.
pub struct Wal {
/// Open log file handles, keyed by the height they were registered under.
height_fs: BTreeMap<u64, File>,
/// Base directory that contains the index file and the logs directory.
dir: String,
/// Height read from the index file by `create`, or last passed to `set_height`.
current_height: u64,
/// Open handle to the index file; rewritten whenever the height is changed.
ifile: File,
}
impl Wal {
/// Removes log files in `dir` that have fallen out of the retention window.
///
/// An entry is deleted when the part of its file name before the first `.`
/// parses as a `u64` height and `height + DELETE_FILE_INTERVAL < current_height`.
/// Entries whose name is not valid UTF-8 or does not start with a number are
/// left untouched.
///
/// # Errors
///
/// Returns any I/O error raised while listing `dir` or removing a file.
async fn delete_old_file(dir: &str, current_height: u64) -> io::Result<()> {
    let mut entries = read_dir(dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        // A non-UTF-8 or non-numeric name cannot be one of our `<height>.log`
        // files, so it is skipped instead of panicking on it.
        let height = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| name.split('.').next())
            .and_then(|stem| stem.parse::<u64>().ok());
        if let Some(height) = height {
            // Saturating add: a file named after a huge number must not overflow
            // the comparison (which would panic in debug builds).
            if height.saturating_add(DELETE_FILE_INTERVAL) < current_height {
                fs::remove_file(path).await?;
            }
        }
    }
    Ok(())
}
/// Opens the WAL rooted at `dir`, creating any missing directories and files.
///
/// Steps, in order:
/// 1. make sure `dir` exists;
/// 2. open (or create) the index file and read the stored height;
/// 3. make sure the logs directory exists;
/// 4. pick the log file to open: `1.log` registered under height `0` when the
///    index file is empty, otherwise `<height>.log` for the stored height;
/// 5. delete log files that are outside the retention window;
/// 6. open the chosen log file and register it in the handle map.
///
/// # Errors
///
/// Returns `InvalidData` when the index file is non-empty but does not hold a
/// decimal `u64`, and forwards any other I/O error.
pub async fn create(dir: &str) -> io::Result<Wal> {
    // A directory that cannot be listed is treated as missing and (re)created.
    if read_dir(dir).await.is_err() {
        DirBuilder::new().recursive(true).create(dir).await?;
    }

    let index_path = format!("{}/{}", dir, INDEX_NAME);
    let mut ifs = OpenOptions::new()
        .read(true)
        .create(true)
        .write(true)
        .open(index_path)
        .await?;
    ifs.rewind().await?;
    let mut index_text = String::new();
    let bytes_read = ifs.read_to_string(&mut index_text).await?;

    // The logs directory is prepared before the index content is validated.
    let logs_dir = dir.to_string() + LOGS_PATH;
    if read_dir(&logs_dir).await.is_err() {
        DirBuilder::new().recursive(true).create(&logs_dir).await?;
    }

    let (cur_height, last_file_path) = if bytes_read == 0 {
        // Fresh WAL: height 0 is paired with the file named `1.log`.
        (0, logs_dir.clone() + "/1.log")
    } else {
        match index_text.trim().parse::<u64>() {
            Ok(hi) => (hi, format!("{}/{}.log", logs_dir, hi)),
            Err(_) => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "index file data wrong",
                ));
            }
        }
    };

    Self::delete_old_file(&(logs_dir + "/"), cur_height).await?;

    let log_file = OpenOptions::new()
        .read(true)
        .create(true)
        .write(true)
        .open(last_file_path)
        .await?;
    let mut height_fs = BTreeMap::new();
    height_fs.insert(cur_height, log_file);

    Ok(Wal {
        height_fs,
        dir: dir.to_string(),
        current_height: cur_height,
        ifile: ifs,
    })
}
/// Builds the path of the log file for `height`: `<dir><LOGS_PATH>/<height>.log`.
fn get_file_path(dir: &str, height: u64) -> String {
    format!("{}{}/{}.log", dir, LOGS_PATH, height)
}
/// Records `height` as the current height, in memory and in the index file.
///
/// The index file is rewritten from offset zero with the decimal text of
/// `height`, resized to exactly that many bytes, and synced.
///
/// Returns the number of bytes the index file now holds. The in-memory height
/// is updated before any I/O, so it changes even if an I/O step fails.
async fn set_index_file(&mut self, height: u64) -> io::Result<u64> {
    self.current_height = height;

    let digits = height.to_string();
    let byte_count = digits.len() as u64;

    let index = &mut self.ifile;
    index.rewind().await?;
    // Resize first so no stale trailing digits survive a shorter number.
    index.set_len(byte_count).await?;
    index.write_all(digits.as_bytes()).await?;
    index.sync_data().await?;
    Ok(byte_count)
}
/// Makes `height` the current height and prunes log files that are too old.
///
/// The index file is updated first. A log file for `height` is then opened
/// (created if needed, in append mode) unless one is already registered.
/// Finally, when `height` exceeds `DELETE_FILE_INTERVAL`, every registered
/// file below `height - DELETE_FILE_INTERVAL` is dropped from the map and
/// removed from disk.
///
/// Returns the byte count reported by the index-file update.
pub async fn set_height(&mut self, height: u64) -> io::Result<u64> {
    let written = self.set_index_file(height).await?;

    if let Entry::Vacant(slot) = self.height_fs.entry(height) {
        let path = Wal::get_file_path(&self.dir, height);
        let handle = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(path)
            .await?;
        slot.insert(handle);
    }

    if height > DELETE_FILE_INTERVAL {
        let oldest_kept = height - DELETE_FILE_INTERVAL;
        // After the split, `self.height_fs` holds only the stale entries.
        let retained = self.height_fs.split_off(&oldest_kept);
        for stale in self.height_fs.keys().copied() {
            // Removal is best-effort: a failure here is ignored.
            let _ = fs::remove_file(Wal::get_file_path(&self.dir, stale)).await;
        }
        self.height_fs = retained;
    }

    Ok(written)
}
pub async fn save(&mut self, height: u64, log_type: LogType, msg: &[u8]) -> io::Result<u64> {
let mtype = log_type as u8;
let mlen = msg.len() as u32;
if mlen == 0 {
return Ok(0);
}
let logs_dir = self.dir.to_string() + LOGS_PATH;
let logsfss = read_dir(&logs_dir).await;
if logsfss.is_err() {
DirBuilder::new().recursive(true).create(&logs_dir).await?;
}
if height > self.current_height && height < self.current_height + DELETE_FILE_INTERVAL {
if let Entry::Vacant(entry) = self.height_fs.entry(height) {
let filename = Wal::get_file_path(&self.dir, height);
let fs = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(filename)
.await?;
entry.insert(fs);
}
}
let mut hlen = 0;
if let Some(fs) = self.height_fs.get_mut(&height) {
let len_bytes: [u8; 4] = mlen.to_le_bytes();
let type_bytes: [u8; 1] = [mtype];
let check_sum = calculate_hash(&msg);
fs.seek(io::SeekFrom::End(0)).await?;
fs.write_all(&len_bytes[..]).await?;
fs.write_all(&type_bytes[..]).await?;
fs.write_all(&check_sum.to_le_bytes()).await?;
hlen = fs.write(msg).await?;
fs.flush().await?;
} else {
warn!(
"wal not save height {} current height {} ",
height, self.current_height
);
}
let _ = self.set_height(height).await;
Ok(hlen as u64)
}
/// Returns the in-memory current height (the value of `current_height`).
pub fn get_cur_height(&self) -> u64 {
self.current_height
}
/// Reads back every valid record stored at or above the current height.
///
/// Each registered log file whose height is `>= current_height` is read from
/// the start and decoded record by record (13-byte header: 4-byte LE length,
/// 1-byte type, 8-byte LE checksum; then the payload). Decoding of a file stops
/// at the first incomplete record or checksum mismatch; later files are still
/// processed.
///
/// Returns `(type byte, payload)` pairs in file order. The result is empty when
/// no file is registered or the current height is `0`. If a file cannot be
/// rewound or read, a warning is logged and the records collected so far are
/// returned.
pub async fn load(&mut self) -> Vec<(u8, Vec<u8>)> {
    // Fixed header size: length (4) + type (1) + checksum (8).
    const HEADER_LEN: usize = 13;

    let mut vec_out: Vec<(u8, Vec<u8>)> = Vec::new();
    let cur_height = self.current_height;
    if self.height_fs.is_empty() || cur_height == 0 {
        return vec_out;
    }

    let mut buf: Vec<u8> = Vec::new();
    for (height, file) in self.height_fs.range_mut(cur_height..) {
        if let Err(e) = file.rewind().await {
            warn!("wal rewind failed for height {}: {}", height, e);
            return vec_out;
        }
        // `read_to_end` appends, so the buffer must be emptied for each file;
        // otherwise a later file would be decoded from an earlier file's bytes.
        buf.clear();
        if let Err(e) = file.read_to_end(&mut buf).await {
            warn!("wal read failed for height {}: {}", height, e);
            return vec_out;
        }

        let mut rest: &[u8] = &buf;
        while rest.len() >= HEADER_LEN {
            let (header, tail) = rest.split_at(HEADER_LEN);
            let len_bytes: [u8; 4] = header[..4]
                .try_into()
                .expect("header slice is exactly 4 bytes");
            let bodylen = u32::from_le_bytes(len_bytes) as usize;
            let mtype = header[4];
            let crc_bytes: [u8; 8] = header[5..HEADER_LEN]
                .try_into()
                .expect("header slice is exactly 8 bytes");
            let saved_crc = u64::from_le_bytes(crc_bytes);

            // Truncated tail (e.g. an interrupted write): stop decoding this file.
            if bodylen > tail.len() {
                break;
            }
            let (msg, next) = tail.split_at(bodylen);
            // Hash `&msg` (a `&&[u8]`) exactly as `save` does so checksums match.
            let check_sum = calculate_hash(&msg);
            if check_sum != saved_crc {
                warn!(
                    "wal crc checked error saved {} check {}",
                    saved_crc, check_sum
                );
                break;
            }
            vec_out.push((mtype, msg.to_vec()));
            rest = next;
        }
    }
    vec_out
}
/// Drops every registered log file handle and deletes the logs directory with
/// all of its contents.
///
/// The index file and the in-memory current height are not touched.
///
/// # Errors
///
/// Returns the I/O error from removing the directory tree, if any.
pub async fn clear_file(&mut self) -> io::Result<()> {
    let logs_dir = format!("{}{}", self.dir, LOGS_PATH);
    self.height_fs.clear();
    fs::remove_dir_all(&logs_dir).await
}
}
/// Hashes `t` with a freshly constructed [`DefaultHasher`] and returns the
/// 64-bit digest; used as the per-record checksum of the WAL.
///
/// NOTE(review): the standard library does not guarantee `DefaultHasher`'s
/// algorithm across Rust releases, so checksums persisted by one toolchain may
/// not verify under another — confirm this is acceptable for on-disk data.
fn calculate_hash<T: Hash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    Hash::hash(t, &mut hasher);
    Hasher::finish(&hasher)
}