use crate::errors::Errors;
use crate::options::DharmaOpts;
use crate::traits::{ResourceKey, ResourceValue};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fs::File;
use std::io::Write;
/// A key together with the value stored under it.
///
/// Derives serde's `Serialize`/`Deserialize`, which is what allows
/// `create_blocks` to encode it with `bincode::serialize`.
#[derive(Serialize, Deserialize, Clone)]
pub struct Value<K, V> {
/// Identifier of the entry; the only field consulted by the `PartialEq` impl.
pub key: K,
/// Payload associated with `key`.
pub value: V,
}
/// Equality for [`Value`] is decided by the key alone; the `value` payload is
/// never compared.
impl<K, V> PartialEq for Value<K, V>
where
    K: ResourceKey,
    V: ResourceValue,
{
    /// Returns `true` when both entries carry the same key.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.key, &other.key);
        lhs == rhs
    }
}
impl<K, V> Value<K, V>
where
    K: ResourceKey,
    V: ResourceValue,
{
    /// Pairs `key` with `value` in a new entry.
    pub fn new(key: K, value: V) -> Self {
        Self { key, value }
    }
}
/// Tag stored in the first byte of every record.
///
/// The explicit discriminants are the byte values written by
/// `write_block_to_disk` and decoded by `to_record_type`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RecordType {
    /// Filler that carries no payload data.
    PADDING = 0,
    /// A whole serialized value held in a single record.
    COMPLETE = 1,
    /// First chunk of a value that is split across blocks.
    START = 2,
    /// Intermediate chunk of a split value.
    MIDDLE = 3,
    /// Final chunk of a split value.
    END = 4,
    /// Fallback produced by `to_record_type` for an unrecognised tag byte.
    UNKNOWN = 5,
}
/// Decodes a record tag byte into its [`RecordType`].
///
/// Bytes `0` through `4` map to `PADDING`, `COMPLETE`, `START`, `MIDDLE` and
/// `END` respectively; every other byte yields `RecordType::UNKNOWN`.
pub fn to_record_type(val: u8) -> RecordType {
    // Indexed by tag byte: position N holds the variant whose tag is N.
    const KNOWN_TYPES: [RecordType; 5] = [
        RecordType::PADDING,
        RecordType::COMPLETE,
        RecordType::START,
        RecordType::MIDDLE,
        RecordType::END,
    ];
    KNOWN_TYPES
        .get(usize::from(val))
        .copied()
        .unwrap_or(RecordType::UNKNOWN)
}
/// One entry inside a `Block`: a type tag, a payload size and the payload bytes.
pub struct Record {
/// Tag identifying what this record holds (see `RecordType`).
pub record_type: RecordType,
/// Size stored in the record header; `write_block_to_disk` emits it as two
/// big-endian bytes. For records built by `with_padding` it is the requested
/// amount of filler, while `data` stays empty.
pub data_size_in_bytes: u16,
/// Payload bytes written after the header for non-padding records.
pub data: Vec<u8>,
}
impl Record {
    /// Bytes occupied by a record header: `write_block_to_disk` writes one
    /// type byte followed by two size bytes.
    pub const RECORD_BASE_SIZE_IN_BYTES: usize = 3;

    /// Builds a `PADDING` record that announces `size` bytes of filler and
    /// carries no payload of its own.
    pub fn with_padding(size: u16) -> Record {
        let record_type = RecordType::PADDING;
        let data = Vec::new();
        Record {
            record_type,
            data_size_in_bytes: size,
            data,
        }
    }
}
/// An ordered group of records that `write_block_to_disk` writes out together.
pub struct Block {
/// Records in the order they were added; they are written in this order.
pub records: Vec<Record>,
}
impl Block {
pub fn new() -> Block {
Block {
records: Vec::new(),
}
}
pub fn add(&mut self, record: Record) {
self.records.push(record);
}
}
/// Moves the block under construction into `block_vec`, leaving a fresh empty
/// block in its place.
fn seal_block(block_vec: &mut Vec<Block>, current_block: &mut Block) {
    block_vec.push(std::mem::replace(current_block, Block::new()));
}

/// Converts a payload length into the `u16` kept in a record's size field,
/// panicking instead of silently truncating when it does not fit.
fn record_payload_size(len: usize) -> u16 {
    assert!(
        len <= usize::from(u16::MAX),
        "record payload of {} bytes does not fit the u16 size field",
        len
    );
    len as u16
}

/// Serializes every value with bincode and packs the results into blocks of at
/// most `options.block_size_in_bytes` bytes, appending each finished block to
/// `block_vec`.
///
/// Every record is accounted as `Record::RECORD_BASE_SIZE_IN_BYTES` header
/// bytes plus its payload. For each value, depending on the space left in the
/// current block:
///
/// * it fits: one `COMPLETE` record is added, and the block is sealed if that
///   fills it exactly;
/// * it does not fit but more than a header's worth of space is left: the
///   payload is split into a `START` chunk, zero or more `MIDDLE` chunks and an
///   `END` chunk, with a new block started after every chunk except the last;
/// * it does not fit and at most a header's worth of space is left: a
///   `PADDING` record sized one byte less than the free space is added, the
///   block is sealed, and the value is retried against a fresh block.
///
/// A trailing, partially filled block is appended as well; nothing is appended
/// for an empty `values`.
///
/// # Panics
///
/// * if bincode fails to serialize a value;
/// * if a record payload is longer than `u16::MAX` bytes (the size field is a
///   `u16`);
/// * if an empty block cannot hold the value even in part because
///   `block_size_in_bytes` does not exceed the record header size. Without
///   this check the function would seal padding-only blocks forever.
pub fn create_blocks<K: ResourceKey, V: ResourceValue>(
    options: &DharmaOpts,
    values: &Vec<Value<K, V>>,
    block_vec: &mut Vec<Block>,
) {
    let block_size = options.block_size_in_bytes;
    let header_size = Record::RECORD_BASE_SIZE_IN_BYTES;
    let mut current_block = Block::new();
    let mut available_memory_in_bytes = block_size;
    let mut i = 0;
    // `i` only advances once the value at `i` has been fully placed; the
    // padding path deliberately revisits the same value with a fresh block.
    while i < values.len() {
        let encoded =
            bincode::serialize(&values[i]).expect("bincode failed to serialize a value");
        let record_size = encoded.len();
        let required_record_size = header_size + record_size;
        match available_memory_in_bytes.cmp(&required_record_size) {
            Ordering::Less if available_memory_in_bytes > header_size => {
                // Split the payload across consecutive blocks.
                let mut record_offset: usize = 0;
                let mut is_first_chunk = true;
                // Entering this arm implies block_size > header_size, so the
                // condition stays true after every reset and the loop always
                // leaves through the `break` on the END chunk.
                while available_memory_in_bytes > header_size {
                    available_memory_in_bytes -= header_size;
                    let chunk_end =
                        (record_offset + available_memory_in_bytes).min(record_size);
                    let record_type = if chunk_end == record_size {
                        RecordType::END
                    } else if is_first_chunk {
                        RecordType::START
                    } else {
                        RecordType::MIDDLE
                    };
                    let data_chunk = encoded[record_offset..chunk_end].to_vec();
                    let chunk_len = data_chunk.len();
                    record_offset = chunk_end;
                    current_block.add(Record {
                        record_type,
                        data_size_in_bytes: record_payload_size(chunk_len),
                        data: data_chunk,
                    });
                    if matches!(record_type, RecordType::END) {
                        available_memory_in_bytes -= chunk_len;
                        if available_memory_in_bytes == 0 {
                            seal_block(block_vec, &mut current_block);
                            available_memory_in_bytes = block_size;
                        }
                        i += 1;
                        break;
                    }
                    // START / MIDDLE chunks consume the rest of the block.
                    seal_block(block_vec, &mut current_block);
                    available_memory_in_bytes = block_size;
                    is_first_chunk = false;
                }
            }
            Ordering::Less => {
                // Too little room left even for a header plus one payload byte.
                // If the block is still untouched, a fresh block would be in
                // exactly the same state, so no progress is possible.
                assert!(
                    available_memory_in_bytes < block_size,
                    "block_size_in_bytes ({}) must exceed the {}-byte record header to store a value",
                    block_size,
                    header_size
                );
                // At most `header_size` bytes are free here, so the cast is lossless.
                let bytes_to_pad = available_memory_in_bytes - 1;
                current_block.add(Record::with_padding(bytes_to_pad as u16));
                seal_block(block_vec, &mut current_block);
                available_memory_in_bytes = block_size;
            }
            Ordering::Equal | Ordering::Greater => {
                current_block.add(Record {
                    record_type: RecordType::COMPLETE,
                    data_size_in_bytes: record_payload_size(record_size),
                    data: encoded,
                });
                available_memory_in_bytes -= required_record_size;
                if available_memory_in_bytes == 0 {
                    // Exact fit: the block is full.
                    seal_block(block_vec, &mut current_block);
                    available_memory_in_bytes = block_size;
                }
                i += 1;
            }
        }
    }
    if !current_block.records.is_empty() {
        block_vec.push(current_block);
    }
}
/// Encodes `block` and writes it to `file_handle`, zero-padding the output up
/// to `options.block_size_in_bytes`.
///
/// Each non-padding record is emitted as one type byte, the payload size as
/// two big-endian bytes, and then the payload. A `PADDING` record is emitted
/// as the same 3-byte header followed by `data_size_in_bytes` zero bytes,
/// unless that would run past the block boundary; in that case the rest of the
/// block is simply filled with zeros. This keeps the short padding records
/// produced by `create_blocks` from pushing the block past its size.
///
/// After the records, any space still free is padded: with a `PADDING` header
/// plus zeros when more than a header's worth is free, with bare zeros
/// otherwise. If the records already exceed the block size, nothing is added.
///
/// The whole block is assembled in memory and handed to a single `write_all`,
/// so a short write cannot silently drop part of it.
///
/// # Errors
///
/// Currently always returns `Ok(())`; a failure reported by the underlying
/// write is discarded (see the note at the end of the body).
pub fn write_block_to_disk(
    options: &DharmaOpts,
    file_handle: &mut File,
    block: &Block,
) -> Result<(), Errors> {
    let block_size = options.block_size_in_bytes;
    let header_size = Record::RECORD_BASE_SIZE_IN_BYTES;
    let padding_tag = RecordType::PADDING as u8;
    let mut buffer: Vec<u8> = Vec::with_capacity(block_size);

    for record in &block.records {
        let size_bytes: [u8; 2] = record.data_size_in_bytes.to_be_bytes();
        match record.record_type {
            RecordType::PADDING => {
                let filler_len = usize::from(record.data_size_in_bytes);
                let remaining = block_size.saturating_sub(buffer.len());
                if header_size + filler_len > remaining {
                    // The padding record does not fit: fill up to the block
                    // boundary with zeros instead of overrunning it.
                    let padded_len = buffer.len() + remaining;
                    buffer.resize(padded_len, 0u8);
                } else {
                    buffer.push(padding_tag);
                    buffer.extend_from_slice(&size_bytes);
                    let padded_len = buffer.len() + filler_len;
                    buffer.resize(padded_len, 0u8);
                }
            }
            _ => {
                buffer.push(record.record_type as u8);
                buffer.extend_from_slice(&size_bytes);
                buffer.extend_from_slice(&record.data);
            }
        }
    }

    if buffer.len() < block_size {
        let available_space_in_bytes = block_size - buffer.len();
        if available_space_in_bytes > header_size {
            // Room for a proper padding record: header, then zeros.
            // NOTE(review): the cast truncates when more than u16::MAX filler
            // bytes are needed; kept as in the original on-disk layout.
            let filler_len = available_space_in_bytes - header_size;
            buffer.push(padding_tag);
            buffer.extend_from_slice(&(filler_len as u16).to_be_bytes());
        }
        // Zero-fill whatever is left, header or not.
        buffer.resize(block_size, 0u8);
    }

    // NOTE(review): the I/O error is still dropped, as it was before. No
    // conversion from `std::io::Error` to `Errors` is visible from this file;
    // once one is confirmed, propagate the failure instead of ignoring it.
    let _ = file_handle.write_all(&buffer);
    Ok(())
}