use crossbeam_channel::{bounded, Receiver};
use std::collections::HashMap;
use std::io::{self, Read};
use std::sync::Arc;
use crate::{decompress_block_into, scan_blocks, Bz2Error, Result};
/// Parallel bzip2 decoder that exposes the decompressed stream through [`Read`].
///
/// The compressed input is split into blocks by `scan_blocks`; a background
/// thread decompresses them concurrently on the rayon thread pool and sends the
/// results over a bounded channel. Results may arrive out of order, so `read`
/// re-sequences them by block index before handing bytes to the caller.
///
/// A decompression failure (or a panicking worker) is reported as an error from
/// `read` rather than being mistaken for end of stream. No bytes from the failed
/// block or any later block are returned, and every subsequent `read` reports
/// the same failure again.
pub struct Bz2Decoder {
    // Keeps the compressed bytes alive for as long as the decoder exists; the
    // background thread holds its own clone, so nothing reads this field.
    #[allow(dead_code)]
    data: Arc<dyn AsRef<[u8]> + Send + Sync>,
    /// Per-block results, tagged with the block's position in the stream.
    receiver: Receiver<(usize, io::Result<Vec<u8>>)>,
    /// Decompressed block currently being handed out.
    buffer: Vec<u8>,
    /// Number of bytes of `buffer` already returned to callers.
    buffer_pos: usize,
    /// Index of the next block to emit, in stream order.
    next_block_idx: usize,
    /// Results that arrived before their turn, keyed by block index.
    pending_blocks: HashMap<usize, io::Result<Vec<u8>>>,
    /// Background decoding thread; joined once the channel closes so that a
    /// panic in it can be reported instead of looking like end of stream.
    worker: Option<std::thread::JoinHandle<()>>,
    /// Sticky failure (kind and message) replayed by every `read` after an error.
    failure: Option<(io::ErrorKind, String)>,
}

impl Bz2Decoder {
    /// Memory-maps the file at `path` and starts decoding it in the background.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, or
    /// `Bz2Error::MmapFailed` if it cannot be memory-mapped.
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        let file = std::fs::File::open(path)?;
        // SAFETY: the mapping is only ever read. A memory-mapped file can still
        // be changed underneath us by other processes; if the file is truncated
        // or rewritten while it is being decoded, reads may observe changed
        // bytes or fault. Callers must not modify the file during decoding.
        let mmap =
            unsafe { memmap2::MmapOptions::new().map(&file) }.map_err(Bz2Error::MmapFailed)?;
        Ok(Self::new(Arc::new(mmap)))
    }

    /// Starts decoding `data` on a background thread and returns the reader.
    ///
    /// Decompression runs ahead of the reader, with at most
    /// `2 * rayon::current_num_threads()` finished blocks queued in the channel.
    /// If the decoder is dropped, workers stop scheduling new blocks as soon as
    /// they fail to deliver a result.
    pub fn new<T>(data: Arc<T>) -> Self
    where
        T: AsRef<[u8]> + Send + Sync + 'static,
    {
        let (result_sender, result_receiver) = bounded(rayon::current_num_threads() * 2);
        let data_ref: Arc<dyn AsRef<[u8]> + Send + Sync> = data;
        let data_clone = Arc::clone(&data_ref);
        let worker = std::thread::spawn(move || {
            use rayon::prelude::*;
            let slice = data_clone.as_ref().as_ref();
            // The outcome is deliberately discarded: every decode failure has
            // already been sent to the reader, and a closed channel means
            // nobody is listening any more.
            let _ = scan_blocks(slice)
                .into_iter()
                .enumerate()
                .par_bridge()
                .try_for_each_init(
                    Vec::new,
                    |scratch, (idx, (start_bit, end_bit))| -> Option<()> {
                        let mut decomp_buf = Vec::new();
                        let outcome = match decompress_block_into(
                            slice,
                            start_bit,
                            end_bit,
                            &mut decomp_buf,
                            scratch,
                        ) {
                            Ok(_) => Ok(decomp_buf),
                            // NOTE(review): assumes `Bz2Error: Display` (e.g. via thiserror).
                            Err(e) => Err(io::Error::new(
                                io::ErrorKind::InvalidData,
                                format!("bz2 block {idx}: {e}"),
                            )),
                        };
                        let failed = outcome.is_err();
                        // `None` stops scheduling further blocks: either the
                        // reader is gone or decoding has already failed.
                        result_sender.send((idx, outcome)).ok()?;
                        (!failed).then_some(())
                    },
                );
        });
        Self {
            data: data_ref,
            receiver: result_receiver,
            buffer: Vec::new(),
            buffer_pos: 0,
            next_block_idx: 0,
            pending_blocks: HashMap::new(),
            worker: Some(worker),
            failure: None,
        }
    }

    /// Handles the result channel closing.
    ///
    /// This is a clean end of stream only if every block was delivered and the
    /// worker did not panic. Otherwise it reports why decoding stopped early.
    fn finish_stream(&mut self) -> io::Result<usize> {
        // The sender lives inside the worker closure, so once the channel has
        // disconnected the thread is finishing and joining returns promptly.
        let worker_panicked = matches!(
            self.worker.take().map(std::thread::JoinHandle::join),
            Some(Err(_))
        );
        if self.pending_blocks.is_empty() && !worker_panicked {
            return Ok(0);
        }
        // Some block before the stashed ones never arrived; prefer the
        // earliest reported failure as the explanation.
        let first_failed = self
            .pending_blocks
            .iter()
            .filter(|(_, result)| result.is_err())
            .map(|(&idx, _)| idx)
            .min();
        let err = match first_failed.and_then(|idx| self.pending_blocks.remove(&idx)) {
            Some(Err(err)) => err,
            _ if worker_panicked => io::Error::new(
                io::ErrorKind::Other,
                "bz2 decoder worker thread panicked",
            ),
            _ => io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!("bz2 decoding stopped before block {}", self.next_block_idx),
            ),
        };
        Err(self.record_failure(err))
    }

    /// Makes `err` sticky so that later reads report it too, and discards any
    /// results that were queued behind it.
    fn record_failure(&mut self, err: io::Error) -> io::Error {
        self.failure = Some((err.kind(), err.to_string()));
        self.pending_blocks.clear();
        err
    }
}

impl std::fmt::Debug for Bz2Decoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Bz2Decoder")
            .field("buffer_len", &self.buffer.len())
            .field("buffer_pos", &self.buffer_pos)
            .field("next_block_idx", &self.next_block_idx)
            .field("pending_blocks_count", &self.pending_blocks.len())
            .field("failed", &self.failure.is_some())
            .finish_non_exhaustive()
    }
}

impl Read for Bz2Decoder {
    /// Copies decompressed bytes into `buf`, blocking until the next block in
    /// stream order is available.
    ///
    /// Returns `Ok(0)` only at the real end of the stream (or immediately if a
    /// block is buffered and `buf` is empty). Decode failures are returned as
    /// errors and repeated on every later call.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            if let Some((kind, message)) = &self.failure {
                return Err(io::Error::new(*kind, message.clone()));
            }
            if self.buffer_pos < self.buffer.len() {
                let available = &self.buffer[self.buffer_pos..];
                let len = buf.len().min(available.len());
                buf[..len].copy_from_slice(&available[..len]);
                self.buffer_pos += len;
                return Ok(len);
            }
            // Current block exhausted: fetch the next one in stream order,
            // stashing any results that arrive ahead of their turn.
            let result = match self.pending_blocks.remove(&self.next_block_idx) {
                Some(result) => result,
                None => match self.receiver.recv() {
                    Ok((idx, result)) if idx == self.next_block_idx => result,
                    Ok((idx, result)) => {
                        self.pending_blocks.insert(idx, result);
                        continue;
                    }
                    Err(_) => return self.finish_stream(),
                },
            };
            self.next_block_idx += 1;
            match result {
                // An empty block simply loops around to fetch the next one.
                Ok(block) => {
                    self.buffer = block;
                    self.buffer_pos = 0;
                }
                Err(err) => return Err(self.record_failure(err)),
            }
        }
    }
}