use crate::errors::Errors;
use crate::options::DharmaOpts;
use crate::storage::block::{create_blocks, write_block_to_disk, Record, Value};
use crate::traits::{ResourceKey, ResourceValue};
use buffered_offset_reader::{BufOffsetReader, OffsetReadMut};
use log;
use serde::de::DeserializeOwned;
use std::fs::{create_dir_all, File};
use std::path::{Path, PathBuf};
pub fn write_sstable<K: ResourceKey, V: ResourceValue>(
options: &DharmaOpts,
tuples: &Vec<(K, V)>,
table_number: usize,
) -> Result<PathBuf, Errors> {
let values: Vec<Value<K, V>> = tuples
.iter()
.map(|tup| {
return Value::new(tup.0.clone(), tup.1.clone());
})
.collect();
let mut blocks = Vec::new();
create_blocks(options, &values, &mut blocks);
let path_str = format!("{0}/tables/{1}.db", options.path, table_number);
let path = Path::new(&path_str);
if path.parent().is_some() && !path.parent().unwrap().exists() {
create_dir_all(path.parent().unwrap());
}
let file_result = File::create(&path);
if file_result.is_ok() {
let mut file = file_result.unwrap();
for (block_counter, block) in blocks.iter().enumerate() {
let write_result = write_block_to_disk(options, &mut file, &block);
if write_result.is_err() {
log::error!(
"Failed to write block from chunk {0} to disk",
block_counter
);
return Err(Errors::SSTABLE_CREATION_FAILED);
}
}
} else {
log::error!("Failed to create SSTable from chunk from values");
return Err(Errors::SSTABLE_CREATION_FAILED);
}
Ok(PathBuf::from(path_str))
}
pub fn write_sstable_at_path<K: ResourceKey, V: ResourceValue>(
options: &DharmaOpts,
tuples: &Vec<(K, V)>,
path: &PathBuf,
) -> Result<(), Errors> {
let values: Vec<Value<K, V>> = tuples
.iter()
.map(|tup| {
return Value::new(tup.0.clone(), tup.1.clone());
})
.collect();
let mut blocks = Vec::new();
create_blocks(options, &values, &mut blocks);
let file_result = File::create(path);
if file_result.is_ok() {
let mut file = file_result.unwrap();
for (block_counter, block) in blocks.iter().enumerate() {
let write_result = write_block_to_disk(options, &mut file, &block);
if write_result.is_err() {
log::error!(
"Failed to write block from chunk {0} to disk",
block_counter
);
return Err(Errors::SSTABLE_CREATION_FAILED);
}
}
} else {
log::error!("Failed to create SSTable from chunk from values");
return Err(Errors::SSTABLE_CREATION_FAILED);
}
Ok(())
}
pub fn read_sstable<K: DeserializeOwned, V: DeserializeOwned>(
options: &DharmaOpts,
path: &Path,
) -> Result<Vec<Value<K, V>>, Errors> {
let mut output: Vec<Value<K, V>> = Vec::new();
let file_result = File::open(path);
if file_result.is_ok() {
let file = file_result.unwrap();
let metadata = file.metadata().unwrap();
let total_size_in_bytes = metadata.len();
let block_count =
(total_size_in_bytes as f64 / options.block_size_in_bytes as f64).ceil() as u64;
let mut i = 0;
let mut reader = BufOffsetReader::new(file);
let mut record_byte_buffer = Vec::new();
while i < block_count {
let mut buffer = vec![0u8; options.block_size_in_bytes as usize];
reader.read_at(&mut buffer, i * options.block_size_in_bytes as u64);
let mut r = 0;
while r < buffer.len() {
let record_type = buffer[r];
match record_type {
0 => {
let remaining = buffer.len() - r;
if remaining <= Record::RECORD_BASE_SIZE_IN_BYTES {
r += remaining;
} else {
let upper_size_byte = buffer[r + 1] as u16;
let lower_size_byte = buffer[r + 2] as u16;
let size = (upper_size_byte << 8 | lower_size_byte) as usize;
r += 3;
r += size;
}
}
1 => {
let upper_size_byte = buffer[r + 1] as u16;
let lower_size_byte = buffer[r + 2] as u16;
let size = (upper_size_byte << 8 | lower_size_byte) as usize;
r += 3;
let decoded: Value<K, V> =
bincode::deserialize(&buffer[r..r + size]).unwrap();
output.push(decoded);
r += size;
}
2 | 3 => {
let upper_size_byte = buffer[r + 1] as u16;
let lower_size_byte = buffer[r + 2] as u16;
let size = (upper_size_byte << 8 | lower_size_byte) as usize;
r += 3;
for i in 0..size {
record_byte_buffer.push(buffer[r + i]);
}
r += size;
}
4 => {
let upper_size_byte = buffer[r + 1] as u16;
let lower_size_byte = buffer[r + 2] as u16;
let size = (upper_size_byte << 8 | lower_size_byte) as usize;
r += 3;
for i in 0..size {
record_byte_buffer.push(buffer[r + i]);
}
let decoded: Value<K, V> =
bincode::deserialize(record_byte_buffer.as_slice()).unwrap();
output.push(decoded);
r += size;
record_byte_buffer = Vec::new();
}
_ => {}
}
}
i += 1;
}
return Ok(output);
}
Err(Errors::SSTABLE_READ_FAILED)
}