use std::collections::BTreeMap;
use std::io::Write;
use crossbeam::channel::{Receiver, Sender};
use crate::bgzf::GziEntry;
use crate::deflate::LZ77Token;
use crate::error::{Error, Result};
use crate::huffman::HuffmanEncoder;
/// One unit of work for an encoding worker: the LZ77 token stream of a single
/// block together with the trailer values copied into that block's footer.
pub(super) struct EncodingJob {
    /// Sequence number of the block; output is written in increasing
    /// `block_id` order regardless of which worker finishes first.
    pub block_id: u64,
    /// Tokens passed to the Huffman encoder to produce the deflate payload.
    pub tokens: Vec<LZ77Token>,
    /// Uncompressed length of the block as supplied by the producer; written
    /// verbatim into the footer's ISIZE field.
    pub uncompressed_size: u32,
    /// Checksum written verbatim into the footer's CRC32 field. Presumably the
    /// CRC32 of the block's uncompressed bytes, computed by the producer —
    /// it is not recomputed during encoding.
    pub crc: u32,
}
/// Result of encoding one job: a complete BGZF block ready to be written,
/// tagged with its sequence number for in-order reassembly.
pub(super) struct EncodedBlock {
    /// Sequence number copied from the originating `EncodingJob`.
    pub block_id: u64,
    /// Header, deflate payload and CRC32/ISIZE footer, back to back.
    pub data: Vec<u8>,
    /// Uncompressed length of this block; used to advance the running
    /// uncompressed offset recorded in the GZI index.
    pub uncompressed_size: u32,
}
fn encode_block(encoder: &mut HuffmanEncoder, job: EncodingJob) -> Result<EncodedBlock> {
let crc = job.crc;
let usize_val = job.uncompressed_size;
let deflate_data = encoder.encode(&job.tokens, true)?;
let block_size = 18 + deflate_data.len() + 8; let bsize = block_size - 1;
let mut data = Vec::with_capacity(block_size);
data.extend_from_slice(&[
0x1f,
0x8b, 0x08, 0x04, 0x00,
0x00,
0x00,
0x00, 0x00, 0xff, 0x06,
0x00, 0x42,
0x43, 0x02,
0x00, (bsize & 0xFF) as u8,
((bsize >> 8) & 0xFF) as u8,
]);
data.extend_from_slice(&deflate_data);
data.extend_from_slice(&crc.to_le_bytes());
data.extend_from_slice(&usize_val.to_le_bytes());
Ok(EncodedBlock { block_id: job.block_id, data, uncompressed_size: usize_val })
}
/// Worker loop: encodes every job received on `job_rx` and sends each
/// outcome (success or error) on `result_tx`.
///
/// Returns when the job channel is disconnected and drained, or as soon as
/// the result receiver has been dropped. Encoding errors are forwarded to the
/// receiver rather than stopping the worker.
pub(super) fn encoding_worker(
    job_rx: Receiver<EncodingJob>,
    result_tx: Sender<Result<EncodedBlock>>,
    use_fixed_huffman: bool,
) {
    // A single encoder is created per worker and reused for every job it handles.
    let mut huffman = HuffmanEncoder::new(use_fixed_huffman);

    for job in job_rx.iter() {
        let outcome = encode_block(&mut huffman, job);
        if result_tx.send(outcome).is_err() {
            // Nobody is listening for results any more; stop working.
            return;
        }
    }
}
#[allow(clippy::too_many_arguments)]
pub(super) fn send_job_and_drain<W: Write>(
job_tx: &Sender<EncodingJob>,
result_rx: &Receiver<Result<EncodedBlock>>,
job: EncodingJob,
writer: &mut W,
pending_blocks: &mut BTreeMap<u64, EncodedBlock>,
next_write_id: &mut u64,
blocks_written: &mut u64,
output_bytes: &mut u64,
build_index: bool,
index_entries: &mut Vec<GziEntry>,
current_compressed_offset: &mut u64,
current_uncompressed_offset: &mut u64,
) -> Result<()> {
let mut job_to_send = Some(job);
while let Some(j) = job_to_send.take() {
match job_tx.try_send(j) {
Ok(()) => {}
Err(crossbeam::channel::TrySendError::Full(returned)) => {
job_to_send = Some(returned);
match result_rx.recv() {
Ok(result) => {
let block = result?;
buffer_and_write_block(
writer,
block,
pending_blocks,
next_write_id,
blocks_written,
output_bytes,
build_index,
index_entries,
current_compressed_offset,
current_uncompressed_offset,
)?;
}
Err(_) => return Err(Error::Internal("Result channel disconnected".into())),
}
}
Err(crossbeam::channel::TrySendError::Disconnected(_)) => {
return Err(Error::Internal("Workers disconnected".into()));
}
}
}
Ok(())
}
/// Restores block order and writes every block that has become contiguous.
///
/// Workers may finish out of order. A block whose `block_id` is not the next
/// expected id is parked in `pending`. When the expected block arrives, it is
/// written, followed by any parked blocks that continue the sequence.
/// `next_write_id` and `blocks_written` advance once per block written.
///
/// # Errors
/// Propagates the first error from `write_single_block`; the block being
/// written at that point is dropped.
// The pipeline state is threaded through as separate `&mut` references, so
// the argument count is inherent to this helper's role.
#[allow(clippy::too_many_arguments)]
pub(super) fn buffer_and_write_block<W: Write>(
    writer: &mut W,
    block: EncodedBlock,
    pending: &mut BTreeMap<u64, EncodedBlock>,
    next_write_id: &mut u64,
    blocks_written: &mut u64,
    output_bytes: &mut u64,
    build_index: bool,
    index_entries: &mut Vec<GziEntry>,
    current_compressed_offset: &mut u64,
    current_uncompressed_offset: &mut u64,
) -> Result<()> {
    if block.block_id != *next_write_id {
        // Not this block's turn yet; hold it until the gap before it is filled.
        pending.insert(block.block_id, block);
        return Ok(());
    }

    let mut ready = Some(block);
    while let Some(current) = ready.take() {
        write_single_block(
            writer,
            &current.data,
            current.uncompressed_size,
            output_bytes,
            build_index,
            index_entries,
            current_compressed_offset,
            current_uncompressed_offset,
        )?;
        *blocks_written += 1;
        *next_write_id += 1;
        // Continue with the successor if it already arrived out of order.
        ready = pending.remove(&*next_write_id);
    }
    Ok(())
}
/// Writes one encoded block to `writer` and advances the running totals.
///
/// When `build_index` is set, a `GziEntry` holding the compressed and
/// uncompressed offsets at which this block starts is appended to
/// `index_entries`. `output_bytes` and `current_compressed_offset` advance by
/// `data.len()`. `current_uncompressed_offset` advances by `uncompressed_size`.
///
/// All bookkeeping happens only after `write_all` succeeds, so a failed write
/// never records the block as written (no index entry, no counter change).
///
/// # Errors
/// Returns `Error::Io` if writing to `writer` fails.
// The pipeline state is threaded through as separate `&mut` references, so
// the argument count is inherent to this helper's role.
#[allow(clippy::too_many_arguments)]
pub(super) fn write_single_block<W: Write>(
    writer: &mut W,
    data: &[u8],
    uncompressed_size: u32,
    output_bytes: &mut u64,
    build_index: bool,
    index_entries: &mut Vec<GziEntry>,
    current_compressed_offset: &mut u64,
    current_uncompressed_offset: &mut u64,
) -> Result<()> {
    writer.write_all(data).map_err(Error::Io)?;

    if build_index {
        // Offsets are still those of the block's start; they advance below.
        index_entries.push(GziEntry {
            compressed_offset: *current_compressed_offset,
            uncompressed_offset: *current_uncompressed_offset,
        });
    }

    let written = data.len() as u64;
    *output_bytes += written;
    *current_compressed_offset += written;
    *current_uncompressed_offset += u64::from(uncompressed_size);
    Ok(())
}