use chrono::{DateTime, Utc};
use crate::{
consts::{SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8, VLOG_FILE_NAME},
err::Error,
fs::{FileAsync, FileNode, VLogFileNode, VLogFs},
types::{ByteSerializedEntry, CreatedAt, IsTombStone, ValOffset, Value},
};
use std::path::{Path, PathBuf};
type TotalBytesRead = usize;
#[derive(Debug, Clone)]
/// An open value-log file handle paired with the path it was opened from.
pub struct VFile<F: VLogFs> {
    // Handle implementing the value-log file operations (get/recover/GC reads).
    pub file: F,
    // On-disk location of the value-log file; kept for reference/reopening.
    pub path: PathBuf,
}
impl<F: VLogFs> VFile<F> {
    /// Wraps an already-opened value-log file handle together with the
    /// path it lives at, taking ownership of both.
    pub fn new<P: AsRef<Path> + Send + Sync>(path: P, file: F) -> Self {
        let path = path.as_ref().to_path_buf();
        Self { path, file }
    }
}
#[derive(Debug, Clone)]
/// Append-only value log: values are written sequentially to a single file
/// and addressed by byte offset (`ValOffset`).
pub struct ValueLog {
    // The underlying value-log file and its path.
    pub content: VFile<VLogFileNode>,
    // Byte offset watermark updated via `set_head`.
    // NOTE(review): presumably the highest flushed/checkpointed offset — confirm with callers.
    pub head_offset: usize,
    // Byte offset where garbage collection starts reading
    // (passed to `read_chunk_to_garbage_collect`); updated via `set_tail`.
    pub tail_offset: usize,
    // Total bytes currently in the log file; next append returns this as its offset.
    pub size: usize,
}
#[derive(PartialEq, Debug, Clone)]
/// In-memory form of a single value-log record.
pub struct ValueLogEntry {
    // Key length in bytes, as supplied by the caller.
    // NOTE(review): `serialize` uses `key.len()` rather than this field — confirm they always agree.
    pub ksize: usize,
    // Value length in bytes, as supplied by the caller (same caveat as `ksize`).
    pub vsize: usize,
    pub key: Vec<u8>,
    pub value: Vec<u8>,
    // Creation timestamp; serialized as little-endian milliseconds since epoch.
    pub created_at: DateTime<Utc>,
    // True if this entry marks a deletion rather than a live value.
    pub is_tombstone: bool,
}
impl ValueLog {
    /// Opens (creating if necessary) the value-log file inside `dir` and
    /// returns a `ValueLog` whose `size` reflects the bytes already on disk.
    /// Head/tail offsets start at 0; callers restore them via `set_head`/`set_tail`.
    ///
    /// # Errors
    /// Returns an error if the directory or the value-log file cannot be created.
    pub async fn new<P: AsRef<Path> + Send + Sync>(dir: P) -> Result<Self, Error> {
        FileNode::create_dir_all(dir.as_ref()).await?;
        let file_path = dir.as_ref().join(VLOG_FILE_NAME);
        // Bug fix: propagate file-creation failure with `?` instead of
        // panicking via `unwrap()` in a function that already returns `Result`.
        let file = VLogFileNode::new(file_path.to_owned(), crate::fs::FileType::ValueLog).await?;
        let size = file.node.size().await;
        Ok(Self {
            head_offset: 0,
            tail_offset: 0,
            content: VFile::new(file_path, file),
            size,
        })
    }

    /// Serializes one entry and appends it to the end of the log.
    /// Returns the byte offset at which the entry starts, which is the
    /// handle callers store to later `get` the value back.
    ///
    /// # Errors
    /// Returns an error if the write to the underlying file fails.
    pub async fn append<T: AsRef<[u8]>>(
        &mut self,
        key: T,
        value: T,
        created_at: CreatedAt,
        is_tombstone: bool,
    ) -> Result<ValOffset, Error> {
        let entry = ValueLogEntry::new(
            key.as_ref().len(),
            value.as_ref().len(),
            key.as_ref().to_vec(),
            value.as_ref().to_vec(),
            created_at,
            is_tombstone,
        );
        let serialized_data = entry.serialize();
        // The entry begins at the current end of the log.
        let entry_offset = self.size;
        self.content.file.node.write_all(&serialized_data).await?;
        // Only advance the logical size after the write succeeded, so a
        // failed write does not leave `size` pointing past real data.
        self.size += serialized_data.len();
        Ok(entry_offset)
    }

    /// Reads back the value (and its tombstone flag) stored at `start_offset`,
    /// or `None` if no entry exists there.
    pub async fn get(&self, start_offset: usize) -> Result<Option<(Value, IsTombStone)>, Error> {
        self.content.file.get(start_offset).await
    }

    /// Flushes all buffered writes of the log file to stable storage.
    pub async fn sync_to_disk(&self) -> Result<(), Error> {
        self.content.file.node.sync_all().await
    }

    /// Replays entries from `start_offset` to the end of the log,
    /// e.g. to rebuild in-memory state after a restart.
    pub async fn recover(&mut self, start_offset: usize) -> Result<Vec<ValueLogEntry>, Error> {
        self.content.file.recover(start_offset).await
    }

    /// Reads roughly `bytes_to_collect` bytes of entries starting from the
    /// tail offset, for the garbage collector to examine. Also returns the
    /// exact number of bytes consumed.
    pub async fn read_chunk_to_garbage_collect(
        &self,
        bytes_to_collect: usize,
    ) -> Result<(Vec<ValueLogEntry>, TotalBytesRead), Error> {
        self.content
            .file
            .read_chunk_to_garbage_collect(bytes_to_collect, self.tail_offset as u64)
            .await
    }

    /// Deletes the on-disk log (best effort — a failure is only logged)
    /// and resets all offsets and the size to zero.
    pub async fn clear_all(&mut self) {
        if self.content.file.node.metadata().await.is_ok() {
            // NOTE(review): this removes a directory tree via the file node —
            // confirm `remove_dir_all` on a `FileNode` targets the intended path.
            if let Err(err) = self.content.file.node.remove_dir_all().await {
                log::info!("{}", err);
            }
        }
        // In-memory bookkeeping is reset even if the delete failed,
        // preserving the original best-effort semantics.
        self.size = 0;
        self.tail_offset = 0;
        self.head_offset = 0;
    }

    /// Records `head` as the current head offset.
    pub fn set_head(&mut self, head: usize) {
        self.head_offset = head;
    }

    /// Records `tail` as the offset garbage collection reads from.
    pub fn set_tail(&mut self, tail: usize) {
        self.tail_offset = tail;
    }
}
impl ValueLogEntry {
    /// Builds an entry from raw key/value bytes, their pre-computed sizes,
    /// a creation timestamp, and the tombstone flag.
    pub fn new<T: AsRef<[u8]>>(
        ksize: usize,
        vsize: usize,
        key: T,
        value: T,
        created_at: CreatedAt,
        is_tombstone: bool,
    ) -> Self {
        let key = key.as_ref().to_vec();
        let value = value.as_ref().to_vec();
        Self {
            ksize,
            vsize,
            key,
            value,
            created_at,
            is_tombstone,
        }
    }

    /// Encodes the entry into its on-disk layout:
    /// key length (u32 LE) | value length (u32 LE) |
    /// timestamp millis (LE) | tombstone flag (u8) | key bytes | value bytes.
    pub(crate) fn serialize(&self) -> ByteSerializedEntry {
        // Fixed-width header plus the two variable-length payloads.
        let header_len = SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
        let mut buf = Vec::with_capacity(header_len + self.key.len() + self.value.len());
        buf.extend_from_slice(&(self.key.len() as u32).to_le_bytes());
        buf.extend_from_slice(&(self.value.len() as u32).to_le_bytes());
        buf.extend_from_slice(&self.created_at.timestamp_millis().to_le_bytes());
        buf.push(u8::from(self.is_tombstone));
        buf.extend_from_slice(&self.key);
        buf.extend_from_slice(&self.value);
        buf
    }
}