#[cfg(feature = "rayon")]
mod thread;
#[cfg(feature = "rayon")]
pub use thread::BGZFMultiThreadWriter;
use crate::header::BGZFHeader;
use crate::index::{BGZFIndex, BGZFIndexEntry};
use crate::{deflate::*, BGZFError};
use std::convert::TryInto;
use std::io::{self, Write};
/// Single-threaded BGZF writer.
///
/// Bytes passed to the [`io::Write`] implementation are buffered in memory and
/// emitted to the wrapped writer as independently compressed blocks. Call
/// `close` to finish the stream and obtain the optional block index.
// The `W: io::Write` bound has to stay on the struct itself: the `Drop`
// implementation needs it, and Rust requires `Drop` bounds to match the
// bounds of the type definition.
pub struct BGZFWriter<W: io::Write> {
/// Destination that receives the compressed blocks.
writer: W,
/// Uncompressed bytes collected for the next block.
original_data: Vec<u8>,
/// Scratch buffer that holds one compressed block before it is written out.
compressed_buffer: Vec<u8>,
/// Compressor state handed to the free `write_block` function for every block.
compress: Compress,
/// Number of buffered uncompressed bytes that triggers writing a block.
compress_unit_size: usize,
/// Set once the end-of-file marker has been written.
closed: bool,
/// Total number of compressed bytes written for completed blocks.
current_compressed_pos: u64,
/// Total number of uncompressed bytes contained in completed blocks.
current_uncompressed_pos: u64,
/// Index that gets one entry per written block; `None` when indexing is disabled.
bgzf_index: Option<BGZFIndex>,
}
/// Compress unit size (uncompressed bytes per block) used by `BGZFWriter::new`.
pub const DEFAULT_COMPRESS_UNIT_SIZE: usize = 65280;
/// Exclusive upper bound for the compress unit size:
/// `BGZFWriter::with_compress_unit_size` rejects any value greater than or
/// equal to this constant.
pub const MAXIMUM_COMPRESS_UNIT_SIZE: usize = 64 * 1024;
/// Extra bytes of head-room reserved in the output buffer on top of the
/// uncompressed input length when a block is compressed.
pub(crate) const EXTRA_COMPRESS_BUFFER_SIZE: usize = 200;
impl<W: io::Write> BGZFWriter<W> {
pub fn new(writer: W, level: Compression) -> Self {
Self::with_compress_unit_size(writer, level, DEFAULT_COMPRESS_UNIT_SIZE, true)
.expect("Unreachable (BGZFWriter)")
}
pub fn with_compress_unit_size(
writer: W,
level: Compression,
compress_unit_size: usize,
create_index: bool,
) -> Result<Self, BGZFError> {
if compress_unit_size >= crate::write::MAXIMUM_COMPRESS_UNIT_SIZE {
return Err(BGZFError::TooLargeCompressUnit);
}
Ok(BGZFWriter {
writer,
original_data: Vec::with_capacity(compress_unit_size),
compressed_buffer: Vec::with_capacity(compress_unit_size + EXTRA_COMPRESS_BUFFER_SIZE),
compress_unit_size,
compress: Compress::new(level),
closed: false,
current_uncompressed_pos: 0,
current_compressed_pos: 0,
bgzf_index: if create_index {
Some(BGZFIndex::new())
} else {
None
},
})
}
pub fn bgzf_pos(&self) -> u64 {
self.current_compressed_pos << 16 | (self.original_data.len() & 0xffff) as u64
}
pub fn pos(&self) -> u64 {
self.current_uncompressed_pos + TryInto::<u64>::try_into(self.original_data.len()).unwrap()
}
fn write_block(&mut self) -> io::Result<()> {
self.compressed_buffer.clear();
write_block(
&mut self.compressed_buffer,
&self.original_data,
&mut self.compress,
)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
self.writer.write_all(&self.compressed_buffer)?;
self.current_uncompressed_pos +=
TryInto::<u64>::try_into(self.original_data.len()).unwrap();
self.current_compressed_pos +=
TryInto::<u64>::try_into(self.compressed_buffer.len()).unwrap();
if let Some(index) = self.bgzf_index.as_mut() {
index.entries.push(BGZFIndexEntry {
compressed_offset: self.current_compressed_pos,
uncompressed_offset: self.current_uncompressed_pos,
});
}
Ok(())
}
pub fn close(mut self) -> io::Result<Option<BGZFIndex>> {
if !self.closed {
self.flush()?;
self.writer.write_all(&crate::EOF_MARKER)?;
self.closed = true;
}
if let Some(index) = self.bgzf_index.as_mut() {
index.entries.pop();
}
Ok(self.bgzf_index.take())
}
}
impl<W: io::Write> io::Write for BGZFWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut process_start_pos = 0;
loop {
let to_write_bytes = (buf.len() - process_start_pos)
.min(self.compress_unit_size - self.original_data.len());
if to_write_bytes == 0 {
break;
}
self.original_data
.extend_from_slice(&buf[process_start_pos..(process_start_pos + to_write_bytes)]);
if self.original_data.len() >= self.compress_unit_size {
self.write_block()?;
self.original_data.clear();
}
process_start_pos += to_write_bytes;
}
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
if !self.original_data.is_empty() {
self.write_block()?;
}
Ok(())
}
}
impl<W: io::Write> Drop for BGZFWriter<W> {
    /// Finishes the stream on a best-effort basis if `close` was not called.
    ///
    /// I/O errors are ignored here: a destructor cannot report them, and
    /// panicking inside `drop` aborts the process when it happens during
    /// unwinding. Call `close` to observe errors.
    fn drop(&mut self) {
        if self.closed {
            return;
        }
        self.closed = true;
        let _ = self
            .flush()
            .and_then(|()| self.writer.write_all(&crate::EOF_MARKER));
    }
}
/// Bytes reserved for the block trailer that `write_block` appends after the
/// compressed payload: the CRC value followed by the uncompressed length as a
/// little-endian `u32` (presumably 4 + 4 bytes; confirm against `Crc::sum`).
const FOOTER_SIZE: usize = 8;
/// Compresses `original_data` into one block and appends it to
/// `compressed_data`.
///
/// The appended block consists of a header, the compressed payload, and a
/// trailer holding the CRC of the input and the input length. Existing
/// content of `compressed_data` is preserved.
///
/// Returns the number of bytes appended.
///
/// # Errors
///
/// Propagates the error reported by the compressor.
///
/// # Panics
///
/// Panics if the block size cannot be stored in the header or the header
/// cannot be written into its reserved slot.
pub fn write_block(
    compressed_data: &mut Vec<u8>,
    original_data: &[u8],
    compress: &mut Compress,
) -> Result<usize, CompressError> {
    let block_start = compressed_data.len();

    // The header is written last, once the final block size is known; for
    // now only its size matters so that room can be left in front of the
    // payload.
    let mut header = BGZFHeader::new(false, 0, 0);
    let header_size: usize = header.header_size().try_into().unwrap();
    let payload_start = block_start + header_size;

    // Reserve enough zeroed space for header, worst-case payload and trailer.
    let reserved_len =
        block_start + original_data.len() + EXTRA_COMPRESS_BUFFER_SIZE + header_size + FOOTER_SIZE;
    compressed_data.resize(reserved_len, 0);

    let payload_len = compress.compress(original_data, &mut compressed_data[payload_start..])?;
    compressed_data.truncate(payload_start + payload_len);

    // Trailer: checksum of the uncompressed data, then its length.
    let mut crc = Crc::new();
    crc.update(original_data);
    let checksum_bytes = crc.sum().to_le_bytes();
    let length_bytes = (original_data.len() as u32).to_le_bytes();
    compressed_data.extend_from_slice(&checksum_bytes);
    compressed_data.extend_from_slice(&length_bytes);

    let block_size = compressed_data.len() - block_start;
    header
        .update_block_size(block_size.try_into().unwrap())
        .expect("Unreachable");

    let header_slot = &mut compressed_data[block_start..payload_start];
    header.write(header_slot).expect("Failed to write header");

    Ok(block_size)
}
#[cfg(test)]
mod test {
    use crate::BGZFReader;
    use crate::{deflate::Compression, BinaryReader};
    use rand::prelude::*;

    use super::*;
    use std::fs::{self, File};
    use std::io::{BufRead, BufReader, Read, Write};

    /// Round-trips a VCF file through the writer and checks that a generic
    /// multi-member gzip decoder restores the exact input.
    #[test]
    fn test_vcf() -> anyhow::Result<()> {
        // The output directory may not exist in a fresh checkout.
        fs::create_dir_all("tmp")?;

        let mut data = Vec::new();
        let mut reader = flate2::read::MultiGzDecoder::new(fs::File::open(
            "testfiles/common_all_20180418_half.vcf.gz",
        )?);
        reader.read_to_end(&mut data)?;

        let output_path = "tmp/test.vcf.gz";
        let mut writer = BGZFWriter::new(fs::File::create(output_path)?, Compression::default());
        writer.write_all(&data)?;
        writer
            .close()?
            .unwrap()
            .write(fs::File::create(format!("{}.gzi", output_path))?)?;

        let mut reader = flate2::read::MultiGzDecoder::new(fs::File::open(output_path)?);
        let mut wrote_data = Vec::new();
        reader.read_to_end(&mut wrote_data)?;
        assert_eq!(wrote_data.len(), data.len());
        // Compare the content as well; `assert!` keeps the failure message
        // short instead of dumping both buffers.
        assert!(wrote_data == data, "round-tripped data differs from input");
        Ok(())
    }

    /// Dropping the writer without calling `close` must still produce a
    /// complete, readable file.
    #[test]
    fn test_simple() -> anyhow::Result<()> {
        fs::create_dir_all("tmp")?;

        let output_path = "tmp/simple1.txt.gz";
        let mut writer = BGZFWriter::new(fs::File::create(output_path)?, Compression::default());
        writer.write_all(b"1234")?;
        std::mem::drop(writer);

        let mut reader = flate2::read::MultiGzDecoder::new(std::fs::File::open(output_path)?);
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;
        assert_eq!(data, b"1234");
        Ok(())
    }

    /// Writes a BED file, then verifies the output both as a plain gzip
    /// stream and block by block (header fields, payload, CRC, length).
    #[test]
    fn test_write_bed() -> anyhow::Result<()> {
        const TEST_OUTPUT_PATH: &str = "tmp/test.bed.gz";
        fs::create_dir_all("tmp")?;

        let mut writer =
            BGZFWriter::new(fs::File::create(TEST_OUTPUT_PATH)?, Compression::default());

        let mut all_data = Vec::new();
        let mut data_reader =
            flate2::read::MultiGzDecoder::new(fs::File::open("testfiles/generated.bed.gz")?);
        data_reader.read_to_end(&mut all_data)?;
        writer.write_all(&all_data)?;

        std::mem::drop(data_reader);
        std::mem::drop(writer);

        // Whole-stream check with a generic gzip decoder.
        let mut result_data = Vec::new();
        let mut result_reader =
            flate2::read::MultiGzDecoder::new(BufReader::new(File::open(TEST_OUTPUT_PATH)?));
        result_reader.read_to_end(&mut result_data)?;
        assert_eq!(result_data, all_data);

        // Block-level check.
        let mut result_reader = BufReader::new(File::open(TEST_OUTPUT_PATH)?);
        let mut decompress = flate2::Decompress::new(false);

        loop {
            let header = crate::header::BGZFHeader::from_reader(&mut result_reader)?;
            assert_eq!(header.comment, None);
            assert_eq!(header.file_name, None);
            assert_eq!(header.modified_time, 0);
            let block_size = header.block_size()?;
            // NOTE(review): 20 + 6 = 26 bytes is presumably the fixed
            // header + trailer overhead of a block; confirm against
            // `BGZFHeader::block_size`.
            let compressed_data_len = block_size as i64 - 20 - 6;
            let mut compressed_data = vec![0u8; compressed_data_len as usize];
            result_reader.read_exact(&mut compressed_data)?;

            let crc32 = result_reader.read_le_u32()?;
            let uncompressed_data_len = result_reader.read_le_u32()?;
            // A block with no uncompressed data is the end-of-file marker.
            if uncompressed_data_len == 0 {
                break;
            }

            let mut decompressed_data = vec![0u8; (uncompressed_data_len) as usize];
            decompress.reset(false);
            assert_eq!(
                decompress.decompress(
                    &compressed_data,
                    &mut decompressed_data,
                    flate2::FlushDecompress::None,
                )?,
                flate2::Status::StreamEnd
            );
            assert_eq!(decompressed_data.len(), uncompressed_data_len as usize);

            let mut crc = flate2::Crc::new();
            crc.update(&decompressed_data);
            assert_eq!(crc.sum(), crc32);
        }

        // Nothing may follow the end-of-file marker.
        let mut buf = vec![0u8; 100];
        assert_eq!(result_reader.read(&mut buf)?, 0);

        Ok(())
    }

    /// Records the virtual position before every written line and checks
    /// that seeking to randomly chosen positions yields the matching line.
    #[test]
    fn test_bgzf_pos() -> anyhow::Result<()> {
        fs::create_dir_all("tmp")?;

        let mut data_reader = std::io::BufReader::new(flate2::read::MultiGzDecoder::new(
            fs::File::open("testfiles/generated.bed.gz")?,
        ));
        let mut line = String::new();
        let mut line_list = Vec::new();
        let mut writer = BGZFWriter::new(
            fs::File::create("tmp/write-pos.bed.gz")?,
            Compression::default(),
        );

        loop {
            let pos = writer.bgzf_pos();
            line.clear();
            let size = data_reader.read_line(&mut line)?;
            if size == 0 {
                break;
            }
            writer.write_all(line.as_bytes())?;
            line_list.push((pos, line.clone()));
        }
        writer.close()?;

        // Fixed seed keeps the sampled positions reproducible.
        let mut rand = rand_pcg::Pcg64Mcg::seed_from_u64(0x9387402456157523);
        let mut reader = BGZFReader::new(fs::File::open("tmp/write-pos.bed.gz")?)?;
        for _ in 0..300 {
            let i = rand.gen_range(0..line_list.len());
            reader.bgzf_seek(line_list[i].0)?;
            line.clear();
            reader.read_line(&mut line)?;
            assert_eq!(line, line_list[i].1);
        }

        Ok(())
    }
}