use std::io::Write;
use bitnuc::BitSize;
use byteorder::{LittleEndian, WriteBytesExt};
use rand::SeedableRng;
use rand::rngs::SmallRng;
use zstd::stream::copy_encode;
use super::header::{BlockHeader, FileHeader};
use crate::SequencingRecord;
use crate::error::{Result, WriteError};
use crate::policy::{Policy, RNG_SEED};
use crate::vbq::header::{SIZE_BLOCK_HEADER, SIZE_HEADER};
use crate::vbq::index::{INDEX_END_MAGIC, IndexHeader};
use crate::vbq::{BlockIndex, BlockRange};
#[derive(Default)]
pub struct WriterBuilder {
header: Option<FileHeader>,
policy: Option<Policy>,
headless: Option<bool>,
}
impl WriterBuilder {
#[must_use]
pub fn header(mut self, header: FileHeader) -> Self {
self.header = Some(header);
self
}
#[must_use]
pub fn policy(mut self, policy: Policy) -> Self {
self.policy = Some(policy);
self
}
#[must_use]
pub fn headless(mut self, headless: bool) -> Self {
self.headless = Some(headless);
self
}
pub fn build<W: Write>(self, inner: W) -> Result<Writer<W>> {
Writer::new(
inner,
self.header.unwrap_or_default(),
self.policy.unwrap_or_default(),
self.headless.unwrap_or(false),
)
}
}
#[derive(Clone)]
pub struct Writer<W: Write> {
inner: W,
header: FileHeader,
encoder: Encoder,
cblock: BlockWriter,
ranges: Vec<BlockRange>,
bytes_written: usize,
records_written: usize,
index_written: bool,
}
impl<W: Write> Writer<W> {
pub fn new(inner: W, header: FileHeader, policy: Policy, headless: bool) -> Result<Self> {
let mut wtr = Self {
inner,
header,
encoder: Encoder::with_policy(header.bits, policy),
cblock: BlockWriter::new(
header.block as usize,
header.compressed,
header.flags,
header.qual,
header.headers,
),
ranges: Vec::new(),
bytes_written: 0,
records_written: 0,
index_written: false,
};
if !headless {
wtr.init()?;
}
Ok(wtr)
}
fn init(&mut self) -> Result<()> {
self.header.write_bytes(&mut self.inner)?;
self.bytes_written += SIZE_HEADER;
Ok(())
}
pub fn is_paired(&self) -> bool {
self.header.paired
}
pub fn header(&self) -> FileHeader {
self.header
}
pub fn policy(&self) -> Policy {
self.encoder.policy
}
pub fn has_quality(&self) -> bool {
self.header.qual
}
pub fn has_headers(&self) -> bool {
self.header.headers
}
#[deprecated(note = "use `push` method with SequencingRecord instead")]
pub fn write_record(
&mut self,
flag: Option<u64>,
header: Option<&[u8]>,
sequence: &[u8],
quality: Option<&[u8]>,
) -> Result<bool> {
let record = SequencingRecord::new(sequence, quality, header, None, None, None, flag);
self.push(record)
}
#[deprecated(note = "use `push` method with SequencingRecord instead")]
pub fn write_paired_record(
&mut self,
flag: Option<u64>,
s_header: Option<&[u8]>,
s_sequence: &[u8],
s_qual: Option<&[u8]>,
x_header: Option<&[u8]>,
x_sequence: &[u8],
x_qual: Option<&[u8]>,
) -> Result<bool> {
let record = SequencingRecord::new(
s_sequence,
s_qual,
s_header,
Some(x_sequence),
x_qual,
x_header,
flag,
);
self.push(record)
}
pub fn push(&mut self, record: SequencingRecord) -> Result<bool> {
if self.header.paired && !record.is_paired() {
return Err(WriteError::ConfigurationMismatch {
attribute: "paired",
expected: self.header.paired,
actual: record.is_paired(),
}
.into());
}
if self.header.qual && !record.has_qualities() {
return Err(WriteError::ConfigurationMismatch {
attribute: "qual",
expected: self.header.qual,
actual: record.has_qualities(),
}
.into());
}
if self.header.headers && !record.has_headers() {
return Err(WriteError::ConfigurationMismatch {
attribute: "headers",
expected: self.header.headers,
actual: record.has_headers(),
}
.into());
}
let record_size = record.configured_size_vbq(
self.header.paired,
self.header.flags,
self.header.headers,
self.header.qual,
self.header.bits,
);
if self.header.is_paired() {
if let Some((sbuffer, xbuffer)) = self
.encoder
.encode_paired(record.s_seq, record.x_seq.unwrap_or_default())?
{
if self.cblock.exceeds_block_size(record_size)? {
impl_flush_block(
&mut self.inner,
&mut self.cblock,
&mut self.ranges,
&mut self.bytes_written,
&mut self.records_written,
)?;
}
self.cblock.write_record(&record, sbuffer, Some(xbuffer))?;
Ok(true)
} else {
Ok(false)
}
} else {
if let Some(sbuffer) = self.encoder.encode_single(record.s_seq)? {
if self.cblock.exceeds_block_size(record_size)? {
impl_flush_block(
&mut self.inner,
&mut self.cblock,
&mut self.ranges,
&mut self.bytes_written,
&mut self.records_written,
)?;
}
self.cblock.write_record(&record, sbuffer, None)?;
Ok(true)
} else {
Ok(false)
}
}
}
pub fn finish(&mut self) -> Result<()> {
impl_flush_block(
&mut self.inner,
&mut self.cblock,
&mut self.ranges,
&mut self.bytes_written,
&mut self.records_written,
)?;
self.inner.flush()?;
if !self.index_written {
self.write_index()?;
self.index_written = true;
}
Ok(())
}
fn by_ref(&mut self) -> &mut W {
self.inner.by_ref()
}
fn cblock_mut(&mut self) -> &mut BlockWriter {
&mut self.cblock
}
pub fn ingest(&mut self, other: &mut Writer<Vec<u8>>) -> Result<()> {
if self.header != other.header {
return Err(WriteError::IncompatibleHeaders(self.header, other.header).into());
}
{
self.inner.write_all(other.by_ref())?;
other.by_ref().clear();
}
{
for range in other.ranges.drain(..) {
let updated_range = BlockRange::new(
self.bytes_written as u64, range.len,
range.block_records,
self.records_written as u64, );
self.ranges.push(updated_range);
self.bytes_written += (range.len + SIZE_BLOCK_HEADER as u64) as usize;
self.records_written += range.block_records as usize;
}
other.bytes_written = 0;
other.records_written = 0;
}
{
let header = self.cblock.ingest(other.cblock_mut(), &mut self.inner)?;
if !header.is_empty() {
let range = BlockRange::new(
self.bytes_written as u64,
header.size,
header.records,
self.records_written as u64,
);
self.ranges.push(range);
self.bytes_written += header.size_with_header();
self.records_written += header.records as usize;
}
}
Ok(())
}
pub fn write_index(&mut self) -> Result<()> {
let index_header = IndexHeader::new(self.bytes_written as u64);
let block_index = BlockIndex {
header: index_header,
ranges: self.ranges.clone(),
};
let mut buffer = Vec::new();
block_index.write_bytes(&mut buffer)?;
let n_bytes = buffer.len() as u64;
self.inner.write_all(&buffer)?;
self.inner.write_u64::<LittleEndian>(n_bytes)?;
self.inner.write_u64::<LittleEndian>(INDEX_END_MAGIC)?;
Ok(())
}
}
fn impl_flush_block<W: Write>(
writer: &mut W,
cblock: &mut BlockWriter,
ranges: &mut Vec<BlockRange>,
bytes_written: &mut usize,
records_written: &mut usize,
) -> Result<()> {
let block_header = cblock.flush(writer)?;
let range = BlockRange::new(
*bytes_written as u64,
block_header.size,
block_header.records,
*records_written as u64,
);
ranges.push(range);
*bytes_written += block_header.size_with_header();
*records_written += block_header.records as usize;
Ok(())
}
impl<W: Write> Drop for Writer<W> {
fn drop(&mut self) {
self.finish().expect("Writer: Failed to finish writing");
}
}
#[derive(Clone)]
struct BlockWriter {
pos: usize,
starts: Vec<usize>,
block_size: usize,
level: i32,
ubuf: Vec<u8>,
zbuf: Vec<u8>,
padding: Vec<u8>,
compress: bool,
has_flags: bool,
has_qualities: bool,
has_headers: bool,
}
impl BlockWriter {
fn new(
block_size: usize,
compress: bool,
has_flags: bool,
has_qualities: bool,
has_headers: bool,
) -> Self {
Self {
pos: 0,
starts: Vec::default(),
block_size,
level: 3,
ubuf: Vec::with_capacity(block_size),
zbuf: Vec::with_capacity(block_size),
padding: vec![0; block_size],
compress,
has_flags,
has_qualities,
has_headers,
}
}
fn exceeds_block_size(&self, record_size: usize) -> Result<bool> {
if record_size > self.block_size {
return Err(WriteError::RecordSizeExceedsMaximumBlockSize(
record_size,
self.block_size,
)
.into());
}
Ok(self.pos + record_size > self.block_size)
}
fn write_record(
&mut self,
record: &SequencingRecord,
sbuf: &[u64],
xbuf: Option<&[u64]>,
) -> Result<()> {
self.starts.push(self.pos);
if self.has_flags {
self.write_flag(record.flag.unwrap_or(0))?;
}
self.write_length(record.s_seq.len() as u64)?;
self.write_length(record.x_seq.map_or(0, <[u8]>::len) as u64)?;
self.write_buffer(sbuf)?;
if self.has_qualities
&& let Some(qual) = record.s_qual
{
self.write_u8buf(qual)?;
}
if self.has_headers
&& let Some(sheader) = record.s_header
{
self.write_length(sheader.len() as u64)?;
self.write_u8buf(sheader)?;
}
if let Some(xbuf) = xbuf {
self.write_buffer(xbuf)?;
}
if self.has_qualities
&& let Some(qual) = record.x_qual
{
self.write_u8buf(qual)?;
}
if self.has_headers
&& let Some(xheader) = record.x_header
{
self.write_length(xheader.len() as u64)?;
self.write_u8buf(xheader)?;
}
Ok(())
}
fn write_flag(&mut self, flag: u64) -> Result<()> {
self.ubuf.write_u64::<LittleEndian>(flag)?;
self.pos += 8;
Ok(())
}
fn write_length(&mut self, length: u64) -> Result<()> {
self.ubuf.write_u64::<LittleEndian>(length)?;
self.pos += 8;
Ok(())
}
fn write_buffer(&mut self, ebuf: &[u64]) -> Result<()> {
ebuf.iter()
.try_for_each(|&x| self.ubuf.write_u64::<LittleEndian>(x))?;
self.pos += 8 * ebuf.len();
Ok(())
}
fn write_u8buf(&mut self, buf: &[u8]) -> Result<()> {
self.ubuf.write_all(buf)?;
self.pos += buf.len();
Ok(())
}
fn flush_compressed<W: Write>(&mut self, inner: &mut W) -> Result<BlockHeader> {
copy_encode(self.ubuf.as_slice(), &mut self.zbuf, self.level)?;
let header = BlockHeader::new(self.zbuf.len() as u64, self.starts.len() as u32);
header.write_bytes(inner)?;
inner.write_all(&self.zbuf)?;
Ok(header)
}
fn flush_uncompressed<W: Write>(&mut self, inner: &mut W) -> Result<BlockHeader> {
let header = BlockHeader::new(self.block_size as u64, self.starts.len() as u32);
header.write_bytes(inner)?;
inner.write_all(&self.ubuf)?;
Ok(header)
}
fn flush<W: Write>(&mut self, inner: &mut W) -> Result<BlockHeader> {
if self.pos == 0 {
return Ok(BlockHeader::empty());
}
let bytes_to_next_start = self.block_size - self.pos;
self.ubuf.write_all(&self.padding[..bytes_to_next_start])?;
let header = if self.compress {
self.flush_compressed(inner)
} else {
self.flush_uncompressed(inner)
}?;
self.clear();
Ok(header)
}
fn clear(&mut self) {
self.pos = 0;
self.starts.clear();
self.ubuf.clear();
self.zbuf.clear();
}
fn ingest<W: Write>(&mut self, other: &mut Self, inner: &mut W) -> Result<BlockHeader> {
if self.block_size != other.block_size {
return Err(
WriteError::IncompatibleBlockSizes(self.block_size, other.block_size).into(),
);
}
let remaining = self.block_size - self.pos;
if other.pos <= remaining {
self.ingest_all(other)?;
Ok(BlockHeader::empty())
} else {
self.ingest_subset(other)?;
let header = self.flush(inner)?;
self.ingest_all(other)?;
Ok(header)
}
}
fn ingest_all(&mut self, other: &mut Self) -> Result<()> {
let n_bytes = other.pos;
self.ubuf.write_all(other.ubuf.drain(..).as_slice())?;
other
.starts
.drain(..)
.for_each(|start| self.starts.push(start + self.pos));
other.starts.iter_mut().for_each(|x| {
*x -= n_bytes;
});
self.pos += n_bytes;
other.clear();
Ok(())
}
fn ingest_subset(&mut self, other: &mut Self) -> Result<()> {
let remaining = self.block_size - self.pos;
let (start_index, end_byte) = other
.starts
.iter()
.enumerate()
.take_while(|(_idx, x)| **x <= remaining)
.last()
.map(|(idx, x)| (idx, *x))
.unwrap();
self.ubuf
.write_all(other.ubuf.drain(0..end_byte).as_slice())?;
other
.starts
.drain(0..start_index)
.for_each(|start| self.starts.push(start + self.pos));
other.starts.iter_mut().for_each(|x| {
*x -= end_byte;
});
self.pos += end_byte;
other.pos -= end_byte;
Ok(())
}
}
#[derive(Clone)]
pub struct Encoder {
bitsize: BitSize,
sbuffer: Vec<u64>,
xbuffer: Vec<u64>,
s_ibuf: Vec<u8>,
x_ibuf: Vec<u8>,
policy: Policy,
rng: SmallRng,
}
impl Encoder {
pub fn with_policy(bitsize: BitSize, policy: Policy) -> Self {
Self {
bitsize,
policy,
sbuffer: Vec::default(),
xbuffer: Vec::default(),
s_ibuf: Vec::default(),
x_ibuf: Vec::default(),
rng: SmallRng::seed_from_u64(RNG_SEED),
}
}
pub fn encode_single(&mut self, primary: &[u8]) -> Result<Option<&[u64]>> {
self.clear();
if self.bitsize.encode(primary, &mut self.sbuffer).is_err() {
self.clear();
if self
.policy
.handle(primary, &mut self.s_ibuf, &mut self.rng)?
{
self.bitsize.encode(&self.s_ibuf, &mut self.sbuffer)?;
} else {
return Ok(None);
}
}
Ok(Some(&self.sbuffer))
}
pub fn encode_paired(
&mut self,
primary: &[u8],
extended: &[u8],
) -> Result<Option<(&[u64], &[u64])>> {
self.clear();
if self.bitsize.encode(primary, &mut self.sbuffer).is_err()
|| self.bitsize.encode(extended, &mut self.xbuffer).is_err()
{
self.clear();
if self
.policy
.handle(primary, &mut self.s_ibuf, &mut self.rng)?
&& self
.policy
.handle(extended, &mut self.x_ibuf, &mut self.rng)?
{
self.bitsize.encode(&self.s_ibuf, &mut self.sbuffer)?;
self.bitsize.encode(&self.x_ibuf, &mut self.xbuffer)?;
} else {
return Ok(None);
}
}
Ok(Some((&self.sbuffer, &self.xbuffer)))
}
pub fn clear(&mut self) {
self.sbuffer.clear();
self.xbuffer.clear();
self.s_ibuf.clear();
self.x_ibuf.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SequencingRecordBuilder;
use crate::vbq::{FileHeaderBuilder, header::SIZE_HEADER};
#[test]
fn test_headless_writer() -> super::Result<()> {
let writer = WriterBuilder::default().headless(true).build(Vec::new())?;
assert_eq!(writer.inner.len(), 0);
let writer = WriterBuilder::default().headless(false).build(Vec::new())?;
assert_eq!(writer.inner.len(), SIZE_HEADER);
Ok(())
}
#[test]
fn test_ingest_empty_writer() -> super::Result<()> {
let header = FileHeaderBuilder::new().build();
let mut source = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
let mut dest = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
dest.ingest(&mut source)?;
let source_vec = source.by_ref();
let dest_vec = dest.by_ref();
assert_eq!(source_vec.len(), 0);
assert_eq!(dest_vec.len(), 0);
Ok(())
}
#[test]
fn test_ingest_single_record() -> super::Result<()> {
let header = FileHeaderBuilder::new().build();
let mut source = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
let record = SequencingRecordBuilder::default()
.s_seq(b"ACGTACGTACGT")
.flag(1)
.build()?;
source.push(record)?;
assert!(source.by_ref().is_empty());
let mut dest = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
dest.ingest(&mut source)?;
let source_vec = source.by_ref();
assert_eq!(source_vec.len(), 0);
let source_ubuf = &source.cblock.ubuf;
assert!(source_ubuf.is_empty());
let dest_vec = dest.by_ref();
assert!(dest_vec.is_empty());
let dest_ubuf = &dest.cblock.ubuf;
assert!(!dest_ubuf.is_empty());
Ok(())
}
#[test]
fn test_ingest_multi_record() -> super::Result<()> {
let header = FileHeaderBuilder::new().build();
let mut source = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
for _ in 0..30 {
let record = SequencingRecordBuilder::default()
.s_seq(b"ACGTACGTACGT")
.flag(1)
.build()?;
source.push(record)?;
}
assert!(source.by_ref().is_empty());
let mut dest = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
dest.ingest(&mut source)?;
let source_vec = source.by_ref();
assert_eq!(source_vec.len(), 0);
let source_ubuf = &source.cblock.ubuf;
assert!(source_ubuf.is_empty());
let dest_vec = dest.by_ref();
assert!(dest_vec.is_empty());
let dest_ubuf = &dest.cblock.ubuf;
assert!(!dest_ubuf.is_empty());
Ok(())
}
#[test]
fn test_ingest_block_boundary() -> super::Result<()> {
let header = FileHeaderBuilder::new().build();
let mut source = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
for _ in 0..30000 {
let record = SequencingRecordBuilder::default()
.s_seq(b"ACGTACGTACGT")
.flag(1)
.build()?;
source.push(record)?;
}
assert!(!source.by_ref().is_empty());
let mut dest = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
dest.ingest(&mut source)?;
let source_vec = source.by_ref();
assert_eq!(source_vec.len(), 0);
let source_ubuf = &source.cblock.ubuf;
assert!(source_ubuf.is_empty());
let dest_vec = dest.by_ref();
assert!(!dest_vec.is_empty());
let dest_ubuf = &dest.cblock.ubuf;
assert!(!dest_ubuf.is_empty());
Ok(())
}
#[test]
fn test_ingest_with_quality_scores() -> super::Result<()> {
let source_header = FileHeaderBuilder::new().qual(true).build();
let dest_header = FileHeaderBuilder::new().qual(true).build();
let mut source = WriterBuilder::default()
.header(source_header)
.headless(true)
.build(Vec::new())?;
let seq = b"ACGTACGTACGT";
let qual = vec![40u8; seq.len()];
for i in 0..5 {
let record = SequencingRecordBuilder::default()
.s_seq(seq)
.s_qual(&qual)
.flag(i)
.build()?;
source.push(record)?;
}
let mut dest = WriterBuilder::default()
.header(dest_header)
.headless(true)
.build(Vec::new())?;
dest.ingest(&mut source)?;
let source_vec = source.by_ref();
assert_eq!(source_vec.len(), 0);
let dest_ubuf = &dest.cblock.ubuf;
assert!(!dest_ubuf.is_empty());
Ok(())
}
#[test]
fn test_ingest_with_compression() -> super::Result<()> {
let header = FileHeaderBuilder::new().compressed(true).build();
let mut source = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
for _ in 0..30000 {
let record = SequencingRecordBuilder::default()
.s_seq(b"ACGTACGTACGT")
.flag(1)
.build()?;
source.push(record)?;
}
let mut dest = WriterBuilder::default()
.header(header)
.headless(true)
.build(Vec::new())?;
dest.ingest(&mut source)?;
let source_vec = source.by_ref();
assert_eq!(source_vec.len(), 0);
let source_ubuf = &source.cblock.ubuf;
assert!(source_ubuf.is_empty());
let dest_vec = dest.by_ref();
assert!(!dest_vec.is_empty());
let dest_ubuf = &dest.cblock.ubuf;
assert!(!dest_ubuf.is_empty());
Ok(())
}
#[test]
fn test_ingest_incompatible_headers() -> super::Result<()> {
let source_header = FileHeaderBuilder::new().build();
let dest_header = FileHeaderBuilder::new().qual(true).build();
let mut source = WriterBuilder::default()
.header(source_header)
.headless(true)
.build(Vec::new())?;
let mut dest = WriterBuilder::default()
.header(dest_header)
.headless(true)
.build(Vec::new())?;
assert!(dest.ingest(&mut source).is_err());
Ok(())
}
#[test]
fn test_index_always_written_on_finish() -> super::Result<()> {
use crate::vbq::index::INDEX_END_MAGIC;
use byteorder::{ByteOrder, LittleEndian};
let header = FileHeaderBuilder::new().build();
let mut writer = WriterBuilder::default().header(header).build(Vec::new())?;
for i in 0..10 {
let record = SequencingRecordBuilder::default()
.s_seq(b"ACGTACGTACGT")
.flag(i)
.build()?;
writer.push(record)?;
}
writer.finish()?;
let bytes = &writer.inner;
assert!(bytes.len() >= 8, "File is too short to contain index");
let magic_offset = bytes.len() - 8;
let magic = LittleEndian::read_u64(&bytes[magic_offset..]);
assert_eq!(
magic, INDEX_END_MAGIC,
"Index magic number not found at end of file"
);
assert!(bytes.len() >= 16, "File is too short to contain index size");
let size_offset = bytes.len() - 16;
let index_size = LittleEndian::read_u64(&bytes[size_offset..size_offset + 8]);
assert!(index_size > 0, "Index size should be greater than 0");
assert!(
index_size < bytes.len() as u64,
"Index size is larger than file"
);
Ok(())
}
#[test]
fn test_finish_idempotent() -> super::Result<()> {
use crate::vbq::index::INDEX_END_MAGIC;
use byteorder::{ByteOrder, LittleEndian};
let header = FileHeaderBuilder::new().build();
let mut writer = WriterBuilder::default().header(header).build(Vec::new())?;
for i in 0..10 {
let record = SequencingRecordBuilder::default()
.s_seq(b"ACGTACGTACGT")
.flag(i)
.build()?;
writer.push(record)?;
}
writer.finish()?;
let size_after_first_finish = writer.inner.len();
writer.finish()?;
let size_after_second_finish = writer.inner.len();
writer.finish()?;
let size_after_third_finish = writer.inner.len();
assert_eq!(size_after_first_finish, size_after_second_finish);
assert_eq!(size_after_second_finish, size_after_third_finish);
let bytes = &writer.inner;
let magic_offset = bytes.len() - 8;
let magic = LittleEndian::read_u64(&bytes[magic_offset..]);
assert_eq!(magic, INDEX_END_MAGIC);
Ok(())
}
}