use chrono::{DateTime, Utc};
use err::Error::*;
use crate::{
consts::{BLOCK_SIZE, SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8},
err::{self, Error},
fs::{FileAsync, FileNode},
types::ByteSerializedEntry,
};
type BytesWritten = usize;
#[derive(Debug, Clone)]
pub struct Block {
pub(crate) entries: Vec<BlockEntry>,
pub(crate) size: usize,
pub(crate) entry_count: usize,
}
#[derive(Debug, Clone)]
pub struct BlockEntry {
pub key_prefix: u32,
pub key: Vec<u8>,
pub value_offset: u32,
pub creation_date: DateTime<Utc>,
pub is_tombstone: bool,
}
impl Block {
pub fn new() -> Self {
Block {
size: Default::default(),
entries: Vec::with_capacity(BLOCK_SIZE),
entry_count: Default::default(),
}
}
pub fn set_entry(
&mut self,
key_prefix: u32,
key: impl AsRef<[u8]>,
value_offset: u32,
creation_date: DateTime<Utc>,
is_tombstone: bool,
) -> Result<(), Error> {
let entry_size = key.as_ref().len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
if self.is_full(entry_size) {
return Err(Error::BlockIsFull);
}
let entry = BlockEntry {
key: key.as_ref().to_vec(),
key_prefix,
creation_date,
is_tombstone,
value_offset,
};
self.entries.push(entry);
self.size += entry_size;
self.entry_count += 1;
Ok(())
}
pub async fn write_to_file(&self, file: FileNode) -> Result<BytesWritten, Error> {
let mut bytes_written = 0;
for entry in &self.entries {
let serialized_entry = self.serialize(entry)?;
file.write_all(&serialized_entry).await?;
bytes_written += serialized_entry.len();
}
Ok(bytes_written)
}
pub fn is_full(&self, entry_size: usize) -> bool {
self.size + entry_size > BLOCK_SIZE
}
pub fn get_last_entry(&self) -> BlockEntry {
self.entries.last().unwrap().to_owned()
}
#[cfg(test)]
pub fn get_entry_count(&self) -> usize {
self.entry_count
}
pub(crate) fn serialize(&self, entry: &BlockEntry) -> Result<ByteSerializedEntry, Error> {
let entry_len = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
let mut entry_vec = Vec::with_capacity(entry_len);
entry_vec.extend_from_slice(&(entry.key_prefix).to_le_bytes());
entry_vec.extend_from_slice(&entry.key);
entry_vec.extend_from_slice(&entry.value_offset.to_le_bytes());
entry_vec.extend_from_slice(&entry.creation_date.timestamp_millis().to_le_bytes());
entry_vec.push(entry.is_tombstone as u8);
if entry_len != entry_vec.len() {
return Err(Serialization("Invalid input"));
}
Ok(entry_vec)
}
#[allow(dead_code)]
pub(crate) fn get_entry(&self, key: impl AsRef<[u8]>) -> Option<&BlockEntry> {
self.entries.iter().find(|entry| *entry.key == *key.as_ref())
}
}
#[cfg(test)]
mod tests {
use crate::types::Key;
use super::*;
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::{fs::File, sync::RwLock};
#[test]
fn test_new_empty_block_creation() {
let block = Block::new();
assert_eq!(block.entries.len(), 0);
assert_eq!(block.entries.capacity(), BLOCK_SIZE);
assert_eq!(block.entry_count, 0);
}
#[test]
fn test_is_full() {
let block = Block::new();
assert!(!block.is_full(10));
assert!(block.is_full(BLOCK_SIZE + 1));
}
#[test]
fn test_set_entry() {
let mut block = Block::new();
let key: Key = vec![1, 2, 3];
let value_offset: u32 = 1000;
let creation_date = Utc::now();
let is_tombstone: bool = false;
let res = block.set_entry(key.len() as u32, &key, value_offset, creation_date, is_tombstone);
assert!(res.is_ok());
assert_eq!(block.entries.len(), 1);
assert_eq!(block.entry_count, 1);
assert_eq!(
block.size,
key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8
);
}
#[test]
fn test_serialize() {
let block = Block::new();
let key: Key = vec![1, 2, 3];
let value_offset: u32 = 1000;
let creation_date = Utc::now();
let is_tombstone: bool = false;
let entry = BlockEntry {
key_prefix: key.len() as u32,
key: key.clone(),
value_offset,
creation_date,
is_tombstone,
};
let res = block.serialize(&entry);
assert!(res.is_ok());
assert_eq!(
res.unwrap().len(),
key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8
);
}
#[tokio::test]
async fn test_write_to_file() {
let mut block = Block::new();
let key: Key = vec![1, 2, 3];
let value_offset: u32 = 1000;
let creation_date = Utc::now();
let is_tombstone: bool = false;
let res = block.set_entry(key.len() as u32, &key, value_offset, creation_date, is_tombstone);
assert!(res.is_ok());
assert_eq!(block.entries.len(), 1);
assert_eq!(block.entry_count, 1);
assert_eq!(
block.size,
key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8
);
let temp_file = NamedTempFile::new().unwrap();
let temp_file_path = temp_file.path().to_path_buf();
let std_file = temp_file.into_file();
let tokio_file = File::from_std(std_file);
let file = FileNode {
file_path: temp_file_path.to_owned(),
file: Arc::new(RwLock::new(tokio_file)),
file_type: crate::fs::FileType::Data,
};
let write_res = block.write_to_file(file.clone()).await;
assert!(write_res.is_ok());
assert_eq!(write_res.unwrap(), block.size)
}
#[test]
fn test_get_entry() {
let mut block = Block::new();
let key: Key = vec![1, 2, 3];
let value_offset: u32 = 1000;
let creation_date = Utc::now();
let is_tombstone: bool = false;
let res = block.set_entry(key.len() as u32, &key, value_offset, creation_date, is_tombstone);
assert!(res.is_ok());
let entry = block.get_entry(&key);
assert!(entry.is_some());
assert_eq!(entry.unwrap().key, key);
}
#[test]
fn test_get_value_nonexistent_key() {
let block = Block::new();
let key: Key = vec![1, 2, 3];
let value = block.get_entry(&key);
assert!(value.is_none());
}
#[test]
fn test_set_entry_full_block() {
let mut block = Block::new();
let key: Key = vec![1, 2, 3];
let value_offset: u32 = 1000;
let creation_date = Utc::now();
let is_tombstone: bool = false;
while !block.is_full(key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8) {
block
.set_entry(key.len() as u32, &key, value_offset, creation_date, is_tombstone)
.unwrap();
}
let res = block.set_entry(key.len() as u32, &key, value_offset, creation_date, is_tombstone);
assert!(res.is_err());
assert_eq!(
block.get_entry_count(),
BLOCK_SIZE / (key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8)
);
}
}