use std::io::{BufWriter, Write};
use byteorder::{LittleEndian, WriteBytesExt};
use rand::{SeedableRng, rngs::SmallRng};
use super::FileHeader;
use crate::{
Policy, RNG_SEED, SequencingRecord,
error::{Result, WriteError},
};
/// Writes a single 64-bit flag to `writer` in little-endian byte order.
///
/// # Errors
///
/// Returns an error if the underlying write fails.
pub fn write_flag<W: Write>(writer: &mut W, flag: u64) -> Result<()> {
    writer.write_u64::<LittleEndian>(flag).map_err(Into::into)
}
/// Writes every word of `ebuf` to `writer`, in order, as little-endian `u64`s.
///
/// # Errors
///
/// Stops at, and returns, the first write error encountered.
pub fn write_buffer<W: Write>(writer: &mut W, ebuf: &[u64]) -> Result<()> {
    for &word in ebuf {
        writer.write_u64::<LittleEndian>(word)?;
    }
    Ok(())
}
/// Reusable state for turning raw sequences into encoded `u64` words.
///
/// All buffers are owned by the encoder and reused between calls, so the
/// slices returned by the `encode_*` methods are only valid until the next
/// call that mutates the encoder.
#[derive(Clone)]
pub struct Encoder {
    /// Header supplying the expected sequence lengths (`slen`, `xlen`) and
    /// the bit encoder (`bits`).
    header: FileHeader,
    /// Encoded words of the primary sequence.
    sbuffer: Vec<u64>,
    /// Encoded words of the extended sequence.
    xbuffer: Vec<u64>,
    /// Scratch space receiving the policy-processed primary sequence.
    s_ibuf: Vec<u8>,
    /// Scratch space receiving the policy-processed extended sequence.
    x_ibuf: Vec<u8>,
    /// Policy consulted when the bit encoder rejects a sequence.
    policy: Policy,
    /// Random number generator handed to the policy.
    rng: SmallRng,
}
impl Encoder {
    /// Creates an encoder for `header` using the default [`Policy`].
    #[must_use]
    pub fn new(header: FileHeader) -> Self {
        Self::with_policy(header, Policy::default())
    }

    /// Creates an encoder for `header` using the given `policy`.
    ///
    /// All internal buffers start empty and the random number generator is
    /// seeded from `RNG_SEED`.
    #[must_use]
    pub fn with_policy(header: FileHeader, policy: Policy) -> Self {
        let rng = SmallRng::seed_from_u64(RNG_SEED);
        Self {
            header,
            sbuffer: Vec::new(),
            xbuffer: Vec::new(),
            s_ibuf: Vec::new(),
            x_ibuf: Vec::new(),
            policy,
            rng,
        }
    }

    /// Reports whether the header describes paired records.
    #[must_use]
    pub fn is_paired(&self) -> bool {
        self.header.is_paired()
    }

    /// Encodes a single (unpaired) sequence.
    ///
    /// Returns `Ok(Some(words))` with the encoded words on success and
    /// `Ok(None)` when the bit encoder rejects the sequence and the policy
    /// reports that it should not be kept.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::UnexpectedSequenceLength`] when `primary` does
    /// not have the length declared in the header, and forwards any error
    /// raised by the policy or by re-encoding the policy's output.
    pub fn encode_single(&mut self, primary: &[u8]) -> Result<Option<&[u64]>> {
        let expected = self.header.slen;
        if primary.len() != expected as usize {
            return Err(WriteError::UnexpectedSequenceLength {
                expected,
                got: primary.len(),
            }
            .into());
        }

        self.clear();
        let direct = self.header.bits.encode(primary, &mut self.sbuffer);
        if direct.is_err() {
            // Drop whatever the failed attempt may have left behind, then let
            // the policy decide whether the sequence can be salvaged.
            self.clear();
            let keep = self
                .policy
                .handle(primary, &mut self.s_ibuf, &mut self.rng)?;
            if !keep {
                return Ok(None);
            }
            self.header.bits.encode(&self.s_ibuf, &mut self.sbuffer)?;
        }
        Ok(Some(&self.sbuffer))
    }

    /// Encodes a primary/extended sequence pair.
    ///
    /// Returns `Ok(Some((primary_words, extended_words)))` on success and
    /// `Ok(None)` when direct encoding fails and the policy reports that
    /// either sequence should not be kept.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::UnexpectedSequenceLength`] when either sequence
    /// does not have the length declared in the header, and forwards any
    /// error raised by the policy or by re-encoding the policy's output.
    pub fn encode_paired(
        &mut self,
        primary: &[u8],
        extended: &[u8],
    ) -> Result<Option<(&[u64], &[u64])>> {
        let expected_primary = self.header.slen;
        if primary.len() != expected_primary as usize {
            return Err(WriteError::UnexpectedSequenceLength {
                expected: expected_primary,
                got: primary.len(),
            }
            .into());
        }
        let expected_extended = self.header.xlen;
        if extended.len() != expected_extended as usize {
            return Err(WriteError::UnexpectedSequenceLength {
                expected: expected_extended,
                got: extended.len(),
            }
            .into());
        }

        self.clear();
        // The extended sequence is only attempted once the primary one has
        // been encoded successfully.
        let primary_ok = self.header.bits.encode(primary, &mut self.sbuffer).is_ok();
        let both_ok =
            primary_ok && self.header.bits.encode(extended, &mut self.xbuffer).is_ok();

        if !both_ok {
            // Start over from the policy-processed versions of both sequences.
            self.clear();
            if !self
                .policy
                .handle(primary, &mut self.s_ibuf, &mut self.rng)?
            {
                return Ok(None);
            }
            if !self
                .policy
                .handle(extended, &mut self.x_ibuf, &mut self.rng)?
            {
                return Ok(None);
            }
            self.header.bits.encode(&self.s_ibuf, &mut self.sbuffer)?;
            self.header.bits.encode(&self.x_ibuf, &mut self.xbuffer)?;
        }
        Ok(Some((&self.sbuffer, &self.xbuffer)))
    }

    /// Empties every internal buffer (encoded words and policy scratch space).
    pub fn clear(&mut self) {
        self.s_ibuf.clear();
        self.x_ibuf.clear();
        self.sbuffer.clear();
        self.xbuffer.clear();
    }
}
/// Builder for [`Writer`].
///
/// Only the header is mandatory; unset options fall back to their defaults
/// when `build` is called.
#[derive(Default)]
pub struct WriterBuilder {
    /// Header to use; `build` fails if this was never set.
    header: Option<FileHeader>,
    /// Policy override; `Policy::default()` is used when unset.
    policy: Option<Policy>,
    /// Whether to skip writing the header; treated as `false` when unset.
    headless: Option<bool>,
}
impl WriterBuilder {
    /// Sets the file header (required).
    #[must_use]
    pub fn header(self, header: FileHeader) -> Self {
        Self {
            header: Some(header),
            ..self
        }
    }

    /// Sets the policy applied to sequences the bit encoder rejects.
    #[must_use]
    pub fn policy(self, policy: Policy) -> Self {
        Self {
            policy: Some(policy),
            ..self
        }
    }

    /// Chooses whether the writer skips emitting the header.
    #[must_use]
    pub fn headless(self, headless: bool) -> Self {
        Self {
            headless: Some(headless),
            ..self
        }
    }

    /// Builds a [`Writer`] around `inner`.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::MissingHeader`] when no header was supplied, and
    /// forwards any error from [`Writer::new`].
    pub fn build<W: Write>(self, inner: W) -> Result<Writer<W>> {
        let header = match self.header {
            Some(header) => header,
            None => return Err(WriteError::MissingHeader.into()),
        };
        let policy = self.policy.unwrap_or_default();
        let headless = self.headless.unwrap_or(false);
        Writer::new(inner, header, policy, headless)
    }
}
/// Writes encoded records to an underlying [`Write`] sink.
#[derive(Clone)]
pub struct Writer<W: Write> {
    /// Destination of the header (unless headless) and of all encoded records.
    inner: W,
    /// Encoder holding the header, the policy and the reusable buffers.
    encoder: Encoder,
    /// `true` when the header was not written at construction time.
    headless: bool,
}
impl<W: Write> Writer<W> {
    /// Creates a writer around `inner`.
    ///
    /// Unless `headless` is set, the header bytes are written to `inner`
    /// immediately.
    ///
    /// # Errors
    ///
    /// Returns an error if writing the header fails.
    pub fn new(mut inner: W, header: FileHeader, policy: Policy, headless: bool) -> Result<Self> {
        if !headless {
            header.write_bytes(&mut inner)?;
        }
        Ok(Self {
            inner,
            encoder: Encoder::with_policy(header, policy),
            headless,
        })
    }

    /// Reports whether the header describes paired records.
    pub fn is_paired(&self) -> bool {
        self.encoder.is_paired()
    }

    /// Returns a copy of the header this writer encodes against.
    pub fn header(&self) -> FileHeader {
        self.encoder.header
    }

    /// Returns a copy of the policy this writer applies.
    pub fn policy(&self) -> Policy {
        self.encoder.policy
    }

    /// Encodes and writes a single (unpaired) record.
    ///
    /// When the header declares flags, `flag` (or `0` if `None`) is written
    /// before the sequence words. Returns `Ok(false)` without writing
    /// anything when the encoder skips the record.
    ///
    /// # Errors
    ///
    /// Forwards encoding and I/O errors.
    #[deprecated(note = "use `push` method with SequencingRecord instead")]
    pub fn write_record(&mut self, flag: Option<u64>, primary: &[u8]) -> Result<bool> {
        let has_flag = self.encoder.header.flags;
        if let Some(sbuffer) = self.encoder.encode_single(primary)? {
            if has_flag {
                write_flag(&mut self.inner, flag.unwrap_or(0))?;
            }
            write_buffer(&mut self.inner, sbuffer)?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Encodes and writes a paired record.
    ///
    /// When the header declares flags, `flag` (or `0` if `None`) is written
    /// before the sequence words. Returns `Ok(false)` without writing
    /// anything when the encoder skips the record.
    ///
    /// # Errors
    ///
    /// Forwards encoding and I/O errors.
    #[deprecated(note = "use `push` method with SequencingRecord instead")]
    pub fn write_paired_record(
        &mut self,
        flag: Option<u64>,
        primary: &[u8],
        extended: &[u8],
    ) -> Result<bool> {
        let has_flag = self.encoder.header.flags;
        if let Some((sbuffer, xbuffer)) = self.encoder.encode_paired(primary, extended)? {
            if has_flag {
                write_flag(&mut self.inner, flag.unwrap_or(0))?;
            }
            write_buffer(&mut self.inner, sbuffer)?;
            write_buffer(&mut self.inner, xbuffer)?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Encodes `record` and writes it to the sink.
    ///
    /// Returns `Ok(true)` when the record was written and `Ok(false)` when
    /// the encoder skipped it. Nothing is written for a record that is
    /// rejected or skipped: the flag word is emitted only after encoding has
    /// succeeded, so the output never contains a flag without its sequence
    /// words.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::ConfigurationMismatch`] when the header is
    /// paired but `record` is not, and forwards encoding and I/O errors.
    pub fn push(&mut self, record: SequencingRecord) -> Result<bool> {
        let header_paired = self.encoder.header.is_paired();

        // Validate before touching the sink so a rejected record leaves no
        // partial output behind.
        // NOTE(review): a paired record pushed into an unpaired writer is
        // accepted and its extended sequence is ignored — confirm intended.
        if header_paired && !record.is_paired() {
            return Err(WriteError::ConfigurationMismatch {
                attribute: "paired",
                expected: header_paired,
                actual: record.is_paired(),
            }
            .into());
        }

        let has_flag = self.encoder.header.flags;
        let flag = record.flag().unwrap_or(0);

        if header_paired {
            if let Some((sbuffer, xbuffer)) = self
                .encoder
                .encode_paired(record.s_seq, record.x_seq.unwrap_or_default())?
            {
                if has_flag {
                    write_flag(&mut self.inner, flag)?;
                }
                write_buffer(&mut self.inner, sbuffer)?;
                write_buffer(&mut self.inner, xbuffer)?;
                Ok(true)
            } else {
                Ok(false)
            }
        } else if let Some(buffer) = self.encoder.encode_single(record.s_seq)? {
            if has_flag {
                write_flag(&mut self.inner, flag)?;
            }
            write_buffer(&mut self.inner, buffer)?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Consumes the writer and returns the underlying sink (without flushing it).
    pub fn into_inner(self) -> W {
        self.inner
    }

    /// Gives mutable access to the underlying sink.
    pub fn by_ref(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Flushes the underlying sink.
    ///
    /// # Errors
    ///
    /// Returns an error if the flush fails.
    pub fn flush(&mut self) -> Result<()> {
        self.inner.flush()?;
        Ok(())
    }

    /// Returns a clone of this writer's encoder with all of its buffers cleared.
    pub fn new_encoder(&self) -> Encoder {
        let mut encoder = self.encoder.clone();
        encoder.clear();
        encoder
    }

    /// Reports whether this writer was created without writing a header.
    pub fn is_headless(&self) -> bool {
        self.headless
    }

    /// Appends every byte buffered in `other` to this writer's sink and then
    /// empties `other`'s buffer.
    ///
    /// The bytes are copied verbatim: if `other` was created non-headless,
    /// its buffer begins with the header bytes written by [`Writer::new`].
    ///
    /// # Errors
    ///
    /// Returns an error if writing to this writer's sink fails; in that case
    /// `other`'s buffer is left untouched.
    pub fn ingest(&mut self, other: &mut Writer<Vec<u8>>) -> Result<()> {
        let other_inner = other.by_ref();
        self.inner.write_all(other_inner)?;
        other_inner.clear();
        Ok(())
    }
}
/// A [`Writer`] whose sink is wrapped in a [`BufWriter`].
pub struct StreamWriter<W: Write> {
    /// The wrapped writer operating on the buffered sink.
    writer: Writer<BufWriter<W>>,
}
impl<W: Write> StreamWriter<W> {
    /// Creates a stream writer with a buffer capacity of 8192 bytes.
    ///
    /// # Errors
    ///
    /// Forwards any error from [`StreamWriter::with_capacity`].
    pub fn new(inner: W, header: FileHeader, policy: Policy, headless: bool) -> Result<Self> {
        Self::with_capacity(inner, 8192, header, policy, headless)
    }

    /// Creates a stream writer whose internal [`BufWriter`] has the given
    /// `capacity`.
    ///
    /// # Errors
    ///
    /// Forwards any error from [`Writer::new`].
    pub fn with_capacity(
        inner: W,
        capacity: usize,
        header: FileHeader,
        policy: Policy,
        headless: bool,
    ) -> Result<Self> {
        Writer::new(
            BufWriter::with_capacity(capacity, inner),
            header,
            policy,
            headless,
        )
        .map(|writer| Self { writer })
    }

    /// Encodes and writes a single (unpaired) record through the wrapped writer.
    ///
    /// # Errors
    ///
    /// Forwards encoding and I/O errors.
    #[deprecated(note = "use `push` method with SequencingRecord instead")]
    pub fn write_record(&mut self, flag: Option<u64>, primary: &[u8]) -> Result<bool> {
        // This wrapper is deprecated itself; it intentionally forwards to the
        // deprecated method of the wrapped writer.
        #[allow(deprecated)]
        let written = self.writer.write_record(flag, primary);
        written
    }

    /// Encodes and writes a paired record through the wrapped writer.
    ///
    /// # Errors
    ///
    /// Forwards encoding and I/O errors.
    #[deprecated(note = "use `push` method with SequencingRecord instead")]
    pub fn write_paired_record(
        &mut self,
        flag: Option<u64>,
        primary: &[u8],
        extended: &[u8],
    ) -> Result<bool> {
        // This wrapper is deprecated itself; it intentionally forwards to the
        // deprecated method of the wrapped writer.
        #[allow(deprecated)]
        let written = self.writer.write_paired_record(flag, primary, extended);
        written
    }

    /// Encodes `record` and writes it through the wrapped writer.
    ///
    /// # Errors
    ///
    /// Forwards any error from [`Writer::push`].
    pub fn push(&mut self, record: SequencingRecord) -> Result<bool> {
        self.writer.push(record)
    }

    /// Flushes the buffered sink.
    ///
    /// # Errors
    ///
    /// Returns an error if the flush fails.
    pub fn flush(&mut self) -> Result<()> {
        self.writer.flush()
    }

    /// Consumes the stream writer, flushes the buffer and returns the
    /// original sink.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while flushing the buffer.
    pub fn into_inner(self) -> Result<W> {
        self.writer
            .into_inner()
            .into_inner()
            .map_err(|err| std::io::Error::from(err).into())
    }
}
/// Builder for [`StreamWriter`].
///
/// Only the header is mandatory; unset options fall back to their defaults
/// when `build` is called.
#[derive(Default)]
pub struct StreamWriterBuilder {
    /// Header to use; `build` fails if this was never set.
    header: Option<FileHeader>,
    /// Policy override; `Policy::default()` is used when unset.
    policy: Option<Policy>,
    /// Whether to skip writing the header; treated as `false` when unset.
    headless: Option<bool>,
    /// Capacity of the internal buffer; 8192 is used when unset.
    buffer_capacity: Option<usize>,
}
impl StreamWriterBuilder {
    /// Sets the file header (required).
    #[must_use]
    pub fn header(self, header: FileHeader) -> Self {
        Self {
            header: Some(header),
            ..self
        }
    }

    /// Sets the policy applied to sequences the bit encoder rejects.
    #[must_use]
    pub fn policy(self, policy: Policy) -> Self {
        Self {
            policy: Some(policy),
            ..self
        }
    }

    /// Chooses whether the writer skips emitting the header.
    #[must_use]
    pub fn headless(self, headless: bool) -> Self {
        Self {
            headless: Some(headless),
            ..self
        }
    }

    /// Sets the capacity of the internal write buffer.
    #[must_use]
    pub fn buffer_capacity(self, capacity: usize) -> Self {
        Self {
            buffer_capacity: Some(capacity),
            ..self
        }
    }

    /// Builds a [`StreamWriter`] around `inner`.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::MissingHeader`] when no header was supplied, and
    /// forwards any error from [`StreamWriter::with_capacity`].
    pub fn build<W: Write>(self, inner: W) -> Result<StreamWriter<W>> {
        let header = match self.header {
            Some(header) => header,
            None => return Err(WriteError::MissingHeader.into()),
        };
        let capacity = self.buffer_capacity.unwrap_or(8192);
        let policy = self.policy.unwrap_or_default();
        let headless = self.headless.unwrap_or(false);
        StreamWriter::with_capacity(inner, capacity, header, policy, headless)
    }
}
#[cfg(test)]
mod testing {
    use std::{fs::File, io::BufWriter};

    use super::*;
    use crate::bq::{FileHeaderBuilder, SIZE_HEADER};

    /// A headless writer must not emit any bytes on construction.
    #[test]
    fn test_headless() -> Result<()> {
        let header = FileHeaderBuilder::new().slen(32).build()?;
        let mut writer = WriterBuilder::default()
            .header(header)
            .headless(true)
            .build(Vec::<u8>::new())?;
        assert!(writer.is_headless());
        assert!(writer.by_ref().is_empty());
        Ok(())
    }

    /// A regular writer emits exactly one header's worth of bytes on construction.
    #[test]
    fn test_not_headless() -> Result<()> {
        let header = FileHeaderBuilder::new().slen(32).build()?;
        let mut writer = WriterBuilder::default()
            .header(header)
            .build(Vec::<u8>::new())?;
        assert!(!writer.is_headless());
        assert_eq!(writer.by_ref().len(), SIZE_HEADER);
        Ok(())
    }

    /// A writer can be built directly on top of standard output.
    #[test]
    fn test_stdout() -> Result<()> {
        let header = FileHeaderBuilder::new().slen(32).build()?;
        let writer = WriterBuilder::default()
            .header(header)
            .build(std::io::stdout())?;
        assert!(!writer.is_headless());
        Ok(())
    }

    /// A writer can be built on top of a buffered file, which is removed afterwards.
    #[test]
    fn test_to_path() -> Result<()> {
        let path = "test_to_path.file";
        let file = File::create(path)?;
        let header = FileHeaderBuilder::new().slen(32).build()?;
        let mut writer = WriterBuilder::default()
            .header(header)
            .build(BufWriter::new(file))?;
        assert!(!writer.is_headless());
        writer.by_ref().flush()?;
        std::fs::remove_file(path)?;
        Ok(())
    }

    /// Unwrapping a stream writer flushes the header into the underlying sink.
    #[test]
    fn test_stream_writer() -> Result<()> {
        let header = FileHeaderBuilder::new().slen(32).build()?;
        let writer = StreamWriterBuilder::default()
            .header(header)
            .buffer_capacity(16384)
            .build(Vec::<u8>::new())?;
        let sink = writer.into_inner()?;
        assert_eq!(sink.len(), SIZE_HEADER);
        Ok(())
    }
}