use std::io::{Read, Seek, SeekFrom, Write};
use crate::datatypes::{zstd_encoder, CompressionType};
pub struct U64BlockStore {
location: u64,
block_index_pos: u64,
block_locations: Option<Vec<u32>>,
block_size: u64,
pub data: Option<Vec<u64>>,
pub compression_type: CompressionType,
cache: Option<(u32, Vec<u64>)>,
compressed_blocks: Option<Vec<u8>>,
compressed_block_lens: Option<Vec<usize>>,
counter: usize,
}
impl Default for U64BlockStore {
fn default() -> Self {
U64BlockStore {
location: 0,
block_index_pos: 0,
block_locations: None,
block_size: 2048, data: None,
compression_type: CompressionType::ZSTD,
cache: None,
compressed_blocks: None,
compressed_block_lens: None,
counter: 0,
}
}
}
impl U64BlockStore {
pub fn with_location(loc: u64) -> Self {
let mut store = U64BlockStore::default();
store.location = loc;
store
}
pub fn with_block_size(mut self, block_size: usize) -> Self {
self.block_size = block_size as u64;
self
}
fn compress_block(&mut self) {
let mut compressor = zstd_encoder(3, None);
let bincode_config = bincode::config::standard().with_fixed_int_encoding();
if self.compressed_blocks.is_none() {
self.compressed_block_lens = Some(Vec::new());
self.compressed_blocks = Some(Vec::new());
}
#[cfg(test)]
let mut compressed = Vec::with_capacity(8192);
#[cfg(not(test))]
let mut compressed = Vec::with_capacity(self.block_size as usize * 8);
let at = std::cmp::min(self.block_size as usize, self.data.as_mut().unwrap().len());
let mut block = self.data.as_mut().unwrap().split_off(at);
block.reserve(self.block_size as usize);
std::mem::swap(&mut block, self.data.as_mut().unwrap());
let data = bincode::encode_to_vec(&block, bincode_config).unwrap();
let compressed_size = compressor
.compress_to_buffer(&data, &mut compressed)
.unwrap();
self.compressed_block_lens
.as_mut()
.unwrap()
.push(compressed_size);
self.compressed_blocks.as_mut().unwrap().extend(compressed);
}
pub fn add(&mut self, input: u64) -> usize {
if self.data.is_none() {
self.data = Some(Vec::with_capacity(self.block_size as usize));
}
while self.data.as_ref().unwrap().len() > self.block_size as usize {
self.compress_block();
}
let data = self.data.as_mut().unwrap();
data.push(input);
self.counter += 1;
self.counter
}
pub fn emit_blocks(&mut self) -> Vec<&[u8]> {
while self.data.as_ref().unwrap().len() > 0 {
self.compress_block();
}
let data = self.compressed_blocks.as_ref().unwrap();
let mut blocks = Vec::new();
let mut start = 0;
for len in self.compressed_block_lens.as_ref().unwrap() {
blocks.push(&data[start..start + len]);
start += len;
}
blocks
}
pub fn write_to_buffer<W>(&mut self, mut out_buf: &mut W) -> Option<u64>
where
W: Write + Seek,
{
self.data.as_ref()?;
let bincode_config = bincode::config::standard().with_fixed_int_encoding();
let mut block_locations_pos: u64 = 0;
let starting_pos = out_buf.seek(SeekFrom::Current(0)).unwrap();
bincode::encode_into_std_write(self.compression_type, &mut out_buf, bincode_config)
.unwrap();
bincode::encode_into_std_write(block_locations_pos, &mut out_buf, bincode_config).unwrap();
bincode::encode_into_std_write(self.block_size, &mut out_buf, bincode_config).unwrap();
let mut block_locations = Vec::new();
let compressed_blocks = self.emit_blocks();
for compressed_block in compressed_blocks {
let block_start = out_buf.seek(SeekFrom::Current(0)).unwrap();
bincode::encode_into_std_write(&compressed_block, &mut out_buf, bincode_config)
.unwrap();
block_locations.push((block_start - starting_pos) as u32);
}
block_locations_pos = out_buf.seek(SeekFrom::Current(0)).unwrap();
let bincoded_block_locations_size =
bincode::encode_to_vec(&block_locations, bincode_config).unwrap();
let compressed_block_locations =
zstd::bulk::compress(&bincoded_block_locations_size, -3).unwrap();
bincode::encode_into_std_write(&compressed_block_locations, &mut out_buf, bincode_config)
.unwrap();
self.block_locations = Some(block_locations);
let end = out_buf.seek(SeekFrom::Current(0)).unwrap();
out_buf.seek(SeekFrom::Start(starting_pos)).unwrap();
bincode::encode_into_std_write(self.compression_type, &mut out_buf, bincode_config)
.unwrap();
bincode::encode_into_std_write(block_locations_pos, &mut out_buf, bincode_config).unwrap();
bincode::encode_into_std_write(self.block_size, &mut out_buf, bincode_config).unwrap();
out_buf.seek(SeekFrom::Start(end)).unwrap();
Some(starting_pos)
}
pub fn from_buffer<R>(mut in_buf: &mut R, starting_pos: u64) -> Result<Self, String>
where
R: Read + Seek,
{
let bincode_config = bincode::config::standard()
.with_fixed_int_encoding()
.with_limit::<{ 4 * 1024 * 1024 }>();
let mut store = U64BlockStore::default();
in_buf.seek(SeekFrom::Start(starting_pos)).unwrap();
(
store.compression_type,
store.block_index_pos,
store.block_size,
) = match bincode::decode_from_std_read(&mut in_buf, bincode_config) {
Ok(x) => x,
Err(e) => return Err(format!("Error decoding block store: {}", e)),
};
store.location = starting_pos;
in_buf.seek(SeekFrom::Start(store.block_index_pos)).unwrap();
let compressed: Vec<u8> = match bincode::decode_from_std_read(&mut in_buf, bincode_config) {
Ok(x) => x,
Err(e) => return Err(format!("Error decoding block locations: {}", e)),
};
let block_locations: Vec<u8> = zstd::stream::decode_all(&compressed[..]).unwrap();
let block_locations: Vec<u32> =
bincode::decode_from_slice(&block_locations, bincode_config)
.unwrap()
.0;
store.block_locations = Some(block_locations);
Ok(store)
}
pub fn prefetch<R>(&mut self, in_buf: &mut R)
where
R: Read + Seek,
{
let mut data = Vec::with_capacity(
self.block_size as usize * self.block_locations.as_ref().unwrap().len(),
);
for i in 0..self.block_locations.as_ref().unwrap().len() {
data.extend(self.get_block_uncached(in_buf, i as u32));
}
log::info!("Generic Block Store Prefetching done: {}", data.len());
self.data = Some(data);
}
pub fn get_block<R>(&mut self, in_buf: &mut R, block: u32) -> Vec<u64>
where
R: Read + Seek,
{
if self.cache.is_some() && self.cache.as_ref().unwrap().0 == block {
return self.cache.as_ref().unwrap().1.clone();
} else {
self.cache = Some((block, self.get_block_uncached(in_buf, block)));
return self.cache.as_ref().unwrap().1.clone();
}
}
pub fn get_block_uncached<R>(&mut self, mut in_buf: &mut R, block: u32) -> Vec<u64>
where
R: Read + Seek,
{
let bincode_config = bincode::config::standard()
.with_fixed_int_encoding()
.with_limit::<{ 128 * 1024 * 1024 }>();
let block_locations = self.block_locations.as_ref().unwrap();
let mut decompressor = zstd::bulk::Decompressor::new().unwrap();
decompressor.include_magicbytes(false).unwrap();
let block_location = block_locations[block as usize];
in_buf
.seek(SeekFrom::Start(self.location + block_location as u64))
.unwrap();
let compressed_block: Vec<u8> =
bincode::decode_from_std_read(&mut in_buf, bincode_config).unwrap();
let decompressed_block = decompressor
.decompress(&compressed_block, 16 * self.block_size as usize)
.unwrap();
bincode::decode_from_slice(&decompressed_block, bincode_config)
.unwrap()
.0
}
pub fn get<R>(&mut self, in_buf: &mut R, x: usize) -> u64
where
R: Read + Seek,
{
if self.data.is_some() {
self.data.as_ref().unwrap()[x]
} else {
let (block, position_in_block) =
(x / self.block_size as usize, x % self.block_size as usize);
self.get_block(in_buf, block as u32)[position_in_block]
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
}