use super::chunks::*;
use byteorder::{BigEndian, ByteOrder};
use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::sync::WaitGroup;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::{BufReader, BufWriter, Error, ErrorKind, Read, Write};
use std::thread;
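
/// Default number of data entries buffered before a chunk is written out.
/// Can be changed per writer with `BDFWriter::set_entries_per_chunk`.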
const ENTRIES_PER_CHUNK: u32 = 100_000;
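
/// Shared state between the caller and the worker threads: work of type
/// `T1` is sent over the first channel, results of type `T2` come back
/// over the second, and the [`WaitGroup`] tracks the running workers.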
#[derive(Debug)]
struct ThreadManager<T1, T2> {
    pub sender_work: Sender<T1>,
    pub receiver_work: Receiver<T1>,
    pub sender_result: Sender<T2>,
    pub receiver_result: Receiver<T2>,
    pub wg: WaitGroup,
    pub threads_started: bool,
}
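
/// Reader for BDF files. For compressed files, the decompression of data
/// chunks is offloaded to a pool of worker threads once the lookup table
/// has been read.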
#[derive(Debug)]
pub struct BDFReader {
    reader: BufReader<File>,
    pub metadata: Option<MetaChunk>,
    pub lookup_table: Option<HashLookupTable>,
    compressed: bool,
    thread_manager: ThreadManager<GenericChunk, GenericChunk>,
}
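
/// Writer for BDF files. Data chunks are compressed and serialized on
/// worker threads; the finished bytes are written to the file as they
/// become available.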
#[derive(Debug)]
pub struct BDFWriter {
    writer: BufWriter<File>,
    metadata: MetaChunk,
    lookup_table: HashLookupTable,
    data_entries: Vec<DataEntry>,
    head_written: bool,
    compressed: bool,
    compression_level: u32,
    thread_manager: ThreadManager<GenericChunk, Vec<u8>>,
}
impl<T1, T2> ThreadManager<T1, T2> {
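    /// Creates a new manager with two bounded channels of capacity `cap`,
    /// one for outgoing work and one for incoming results.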
    pub fn new(cap: usize) -> Self {
        let (s1, r1) = bounded(cap);
        let (s2, r2) = bounded(cap);
        Self {
            sender_work: s1,
            receiver_work: r1,
            sender_result: s2,
            receiver_result: r2,
            wg: WaitGroup::new(),
            threads_started: false,
        }
    }
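
    /// Disconnects the work channel by replacing the stored sender with a
    /// detached dummy sender. Once the old sender is dropped, the worker
    /// loops iterating over the receiver terminate after the queue drains.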
    pub fn drop_sender(&mut self) {
        let (s1, _) = bounded(0);
        self.sender_work = s1;
    }
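
    /// Same as `drop_sender`, but for the result channel.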
    pub fn drop_sender_result(&mut self) {
        let (s2, _) = bounded(0);
        self.sender_result = s2;
    }
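
    /// Blocks until all worker threads have dropped their clones of the
    /// wait group. Since `WaitGroup::wait` consumes the group, a fresh one
    /// is swapped in first and the old instance is awaited.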
    pub fn wait(&mut self) {
        std::mem::replace(&mut self.wg, WaitGroup::new()).wait();
    }
}
impl BDFWriter {
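    /// Creates a new `BDFWriter` that writes to the given file. The
    /// `entry_count` is the total number of data entries that will be
    /// written, and `compress` controls whether data chunks are compressed.
    ///
    /// A minimal usage sketch (the path, entry count and compression level
    /// are placeholders):
    ///
    /// ```ignore
    /// let file = File::create("dictionary.bdf")?;
    /// let mut writer = BDFWriter::new(file, entry_count, true);
    /// writer.set_compression_level(3);
    /// // register lookup entries with add_lookup_entry, then stream the
    /// // data entries with add_data_entry and finally:
    /// writer.finish()?;
    /// ```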
    pub fn new(inner: File, entry_count: u64, compress: bool) -> Self {
        Self {
            metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress),
            lookup_table: HashLookupTable::new(HashMap::new()),
            data_entries: Vec::new(),
            writer: BufWriter::new(inner),
            head_written: false,
            compressed: compress,
            compression_level: 1,
            thread_manager: ThreadManager::new(num_cpus::get()),
        }
    }
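
    /// Spawns one worker thread per logical CPU. Each worker compresses
    /// (when enabled) and serializes the chunks it receives, then sends
    /// the resulting bytes back over the result channel.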
    fn start_threads(&self) {
        for _ in 0..num_cpus::get() {
            let compress = self.compressed;
            let compression_level = self.compression_level;
            thread::spawn({
                let r = self.thread_manager.receiver_work.clone();
                let s = self.thread_manager.sender_result.clone();
                let wg = self.thread_manager.wg.clone();
                move || {
                    for mut chunk in r {
                        if compress {
                            chunk
                                .compress(compression_level)
                                .expect("failed to compress chunk");
                        }
                        s.send(chunk.serialize()).expect("failed to send result");
                    }
                    drop(wg);
                    drop(s);
                }
            });
        }
    }
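
    /// Adds an entry to the hash lookup table and returns the id it was
    /// assigned. Fails if the file head has already been written.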
    pub fn add_lookup_entry(&mut self, mut entry: HashEntry) -> Result<u32, Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        let id = self.lookup_table.entries.len() as u32;
        entry.id = id;
        self.lookup_table.entries.insert(id, entry);
        Ok(id)
    }
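
    /// Adds a data entry to the write buffer. The buffer is flushed into a
    /// new chunk once it holds `entries_per_chunk` entries.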
    pub fn add_data_entry(&mut self, data_entry: DataEntry) -> Result<(), Error> {
        self.data_entries.push(data_entry);
        if self.data_entries.len() >= self.metadata.entries_per_chunk as usize {
            self.flush()?;
        }
        Ok(())
    }
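
    /// Writes the file head on the first call, hands the buffered entries
    /// to the worker threads as a new data chunk, and writes out any chunks
    /// the workers have already finished.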
    fn flush(&mut self) -> Result<(), Error> {
        if !self.head_written {
            self.writer.write_all(BDF_HDR)?;
            let mut generic_meta = GenericChunk::from(&self.metadata);
            self.writer.write_all(generic_meta.serialize().as_slice())?;
            let mut generic_lookup = GenericChunk::from(&self.lookup_table);
            self.writer.write_all(generic_lookup.serialize().as_slice())?;
            self.head_written = true;
        }
        if !self.thread_manager.threads_started {
            self.start_threads();
            self.thread_manager.threads_started = true;
        }
        let data_chunk = GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table);
        self.thread_manager
            .sender_work
            .send(data_chunk)
            .expect("failed to send work to threads");
        self.write_serialized()?;
        self.data_entries.clear();
        Ok(())
    }
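
    /// Non-blocking drain of the result channel: writes every finished
    /// chunk that is currently available to the underlying writer.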
    fn write_serialized(&mut self) -> Result<(), Error> {
        while let Ok(data) = self.thread_manager.receiver_result.try_recv() {
            self.writer.write_all(data.as_slice())?;
        }
        Ok(())
    }
    fn flush_writer(&mut self) -> Result<(), Error> {
        self.writer.flush()
    }
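
    /// Flushes the remaining entries, shuts the worker threads down and
    /// writes all outstanding chunks. Must be called after the last entry
    /// has been added, otherwise data is lost.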
    pub fn finish(&mut self) -> Result<(), Error> {
        self.flush()?;
        self.thread_manager.drop_sender();
        self.thread_manager.drop_sender_result();
        // Drain with a blocking recv so the workers never stall on a full
        // result channel; recv returns Err once every worker has exited and
        // dropped its clone of the result sender.
        while let Ok(data) = self.thread_manager.receiver_result.recv() {
            self.writer.write_all(data.as_slice())?;
        }
        self.thread_manager.wait();
        self.flush_writer()?;
        Ok(())
    }
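
    /// Sets the compression level that is passed to the workers when
    /// compressing chunks. Defaults to 1.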
    pub fn set_compression_level(&mut self, level: u32) {
        self.compression_level = level;
    }
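
    /// Changes how many entries are stored per chunk and recalculates the
    /// chunk count in the metadata. Fails if the file head has already
    /// been written.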
    pub fn set_entries_per_chunk(&mut self, number: u32) -> Result<(), Error> {
        if self.head_written {
            return Err(Error::new(
                ErrorKind::Other,
                "the head has already been written",
            ));
        }
        self.metadata.entries_per_chunk = number;
        self.metadata.chunk_count =
            (self.metadata.entry_count as f64 / number as f64).ceil() as u32;
        Ok(())
    }
}
impl BDFReader {
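    /// Creates a new `BDFReader` reading from the given file. The work
    /// queue is sized at twice the logical CPU count so that decompression
    /// can run ahead of the consumer.
    ///
    /// A minimal usage sketch (the path is a placeholder):
    ///
    /// ```ignore
    /// let file = File::open("dictionary.bdf")?;
    /// let mut reader = BDFReader::new(file);
    /// reader.read_start()?;
    /// while let Ok(chunk) = reader.next_chunk() {
    ///     // process the entries in the chunk
    /// }
    /// ```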
    pub fn new(inner: File) -> Self {
        Self {
            metadata: None,
            lookup_table: None,
            reader: BufReader::new(inner),
            compressed: false,
            thread_manager: ThreadManager::new(num_cpus::get() * 2),
        }
    }
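
    /// Reads the file head, i.e. the metadata chunk followed by the hash
    /// lookup table. Must be called before reading data chunks.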
    pub fn read_start(&mut self) -> Result<(), Error> {
        self.read_metadata()?;
        self.read_lookup_table()?;
        Ok(())
    }
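
    /// Spawns the decompression workers (half the logical CPU count, at
    /// least one) and pre-fills the work queue with raw chunks so that
    /// decompressed results are already available when `next_chunk` is
    /// first called.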
    fn start_threads(&mut self) {
        for _ in 0..(num_cpus::get() / 2).max(1) {
            thread::spawn({
                let r = self.thread_manager.receiver_work.clone();
                let s = self.thread_manager.sender_result.clone();
                let wg = self.thread_manager.wg.clone();
                move || {
                    for mut chunk in r {
                        chunk.decompress().expect("failed to decompress chunk");
                        s.send(chunk).expect("failed to send decompression result");
                    }
                    drop(wg);
                }
            });
        }
        for _ in 0..num_cpus::get() * 2 {
            if self.add_compression_chunk().is_err() {
                self.thread_manager.drop_sender();
                break;
            }
        }
        self.thread_manager.drop_sender_result();
    }
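
    /// Reads the next raw chunk from the file and, if it is a compressed
    /// data chunk, queues it for the decompression workers. Other chunks
    /// are discarded.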
    pub fn add_compression_chunk(&mut self) -> Result<(), Error> {
        let gen_chunk = self.next_chunk_raw()?;
        if gen_chunk.name == DTBL_CHUNK_NAME && self.compressed {
            if self.thread_manager.sender_work.send(gen_chunk).is_err() {
                return Err(Error::new(ErrorKind::Other, "failed to send chunk data"));
            }
        }
        Ok(())
    }
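
    /// Validates the BDF header and reads the metadata chunk. Sets the
    /// `compressed` flag when the metadata declares LZMA compression and
    /// rejects any other compression method.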
    pub fn read_metadata(&mut self) -> Result<&MetaChunk, Error> {
        if !self.validate_header() {
            return Err(Error::new(ErrorKind::InvalidData, "invalid BDF header"));
        }
        let meta_chunk: MetaChunk = self.next_chunk_raw()?.try_into()?;
        if let Some(method) = &meta_chunk.compression_method {
            if method.as_str() == LZMA {
                self.compressed = true;
            } else {
                return Err(Error::new(
                    ErrorKind::Other,
                    "unsupported compression method",
                ));
            }
        }
        self.metadata = Some(meta_chunk);
        self.metadata
            .as_ref()
            .ok_or_else(|| Error::new(ErrorKind::Other, "failed to read self assigned metadata"))
    }
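
    /// Reads the hash lookup table that follows the metadata chunk, reading
    /// the metadata first if that has not happened yet. For compressed
    /// files this also starts the decompression workers.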
    pub fn read_lookup_table(&mut self) -> Result<&HashLookupTable, Error> {
        if self.metadata.is_none() {
            self.read_metadata()?;
        }
        let lookup_table: HashLookupTable = self.next_chunk_raw()?.try_into()?;
        self.lookup_table = Some(lookup_table);
        if self.compressed {
            self.start_threads();
        }
        self.lookup_table
            .as_ref()
            .ok_or_else(|| Error::new(ErrorKind::Other, "failed to read self assigned chunk"))
    }
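
    /// Reads the first eleven bytes and compares them against the BDF
    /// header magic.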
    fn validate_header(&mut self) -> bool {
        let mut header = [0u8; 11];
        if self.reader.read_exact(&mut header).is_err() {
            return false;
        }
        header == BDF_HDR.as_ref()
    }
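
    /// Returns the next data chunk. For compressed files this queues one
    /// more raw chunk for the workers and then blocks on the next
    /// decompressed result; otherwise it reads directly from the file.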
    pub fn next_chunk(&mut self) -> Result<GenericChunk, Error> {
        if self.compressed {
            if self.add_compression_chunk().is_err() {
                self.thread_manager.drop_sender();
            }
            if let Ok(chunk) = self.thread_manager.receiver_result.recv() {
                Ok(chunk)
            } else {
                Err(Error::new(ErrorKind::Other, "failed to get chunk"))
            }
        } else {
            self.next_chunk_raw()
        }
    }
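
    /// Reads one chunk verbatim from the underlying reader: a big-endian
    /// u32 length, a four byte name, `length` bytes of data and a u32 crc.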
    fn next_chunk_raw(&mut self) -> Result<GenericChunk, Error> {
        let mut length_raw = [0u8; 4];
        self.reader.read_exact(&mut length_raw)?;
        let length = BigEndian::read_u32(&length_raw);
        let mut name_raw = [0u8; 4];
        self.reader.read_exact(&mut name_raw)?;
        let name = String::from_utf8(name_raw.to_vec())
            .map_err(|_| Error::new(ErrorKind::InvalidData, "failed to parse chunk name"))?;
        let mut data = vec![0u8; length as usize];
        self.reader.read_exact(&mut data)?;
        let mut crc_raw = [0u8; 4];
        self.reader.read_exact(&mut crc_raw)?;
        let crc = BigEndian::read_u32(&crc_raw);
        Ok(GenericChunk {
            length,
            name,
            data,
            crc,
        })
    }
}