use std::io;
use bitnuc::BitSize;
use bytemuck::{cast_slice, cast_slice_mut};
use sucds::Serializable;
use sucds::mii_sequences::{EliasFano, EliasFanoBuilder};
use zstd::stream::copy_decode;
use zstd::zstd_safe;
use crate::cbq::core::utils::sized_compress;
use crate::error::{CbqError, WriteError};
use crate::{BinseqRecord, DEFAULT_QUALITY_SCORE, Result};
use super::utils::{Span, calculate_offsets, extension_read, resize_uninit, slice_and_increment};
use super::{BlockHeader, BlockRange, FileHeader};
use crate::SequencingRecord;
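/// A column-oriented block of sequencing records.
///
/// Fields of incoming records are appended to independent per-column
/// buffers (sequences, flags, headers, qualities, plus length columns).
/// On flush the sequence column is 2-bit packed, `N` positions are stored
/// as an Elias-Fano set (2-bit codes cannot represent `N`), and every
/// non-empty column is zstd-compressed separately.
///
/// A minimal sketch of the write path (hedged: the `FileHeader`
/// configuration and the record source shown here are illustrative
/// assumptions, not the crate's confirmed API):
///
/// ```ignore
/// let mut block = ColumnarBlock::new(FileHeader::default());
/// let mut cctx = zstd_safe::CCtx::create();
/// for record in records {
///     // Flush the current block before it would overflow.
///     if !block.can_fit(&record) {
///         block.flush_to(&mut writer, &mut cctx)?;
///     }
///     block.push(record)?;
/// }
/// // Flush whatever remains (a no-op for an empty block).
/// block.flush_to(&mut writer, &mut cctx)?;
/// ```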
#[derive(Clone, Default)]
pub struct ColumnarBlock {
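// Raw (decompressed) record columns, concatenated.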
seq: Vec<u8>,
flags: Vec<u64>,
headers: Vec<u8>,
qual: Vec<u8>,
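// Length columns (one entry per sequence) and the raw positions of `N` bases.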
pub(crate) l_seq: Vec<u64>,
pub(crate) l_headers: Vec<u64>,
pub(crate) npos: Vec<u64>,
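// 2-bit packed sequence words and the Elias-Fano set over `N` positions.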
ebuf: Vec<u64>,
pub(crate) ef: Option<EliasFano>,
pub(crate) ef_bytes: Vec<u8>,
pub(crate) len_nef: usize,
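// zstd-compressed columns, as stored on disk.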
pub(crate) z_seq_len: Vec<u8>,
pub(crate) z_header_len: Vec<u8>,
pub(crate) z_npos: Vec<u8>,
pub(crate) z_seq: Vec<u8>,
pub(crate) z_flags: Vec<u8>,
pub(crate) z_headers: Vec<u8>,
pub(crate) z_qual: Vec<u8>,
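// Prefix-sum offsets derived from the length columns after decompression.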
l_seq_offsets: Vec<u64>,
l_header_offsets: Vec<u64>,
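// Counts and sizes describing the current block contents.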
pub(crate) num_records: usize,
pub(crate) num_sequences: usize,
pub(crate) nuclen: usize,
pub(crate) num_npos: usize,
current_size: usize,
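// Synthetic quality buffer and its fill score, used when the file stores no qualities.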
qbuf: Vec<u8>,
default_quality_score: u8,
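// File-level configuration this block was created with.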
pub(crate) header: FileHeader,
}
impl ColumnarBlock {
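/// Creates an empty block configured by `header`, using the crate-wide
/// default quality score.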
#[must_use]
pub fn new(header: FileHeader) -> Self {
Self {
header,
default_quality_score: DEFAULT_QUALITY_SCORE,
..Default::default()
}
}
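/// Sets the score used to synthesize quality strings when the file stores
/// no qualities, and clears the cached quality buffer so it is rebuilt
/// with the new score.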
pub fn set_default_quality_score(&mut self, score: u8) {
self.default_quality_score = score;
self.qbuf.clear();
}
fn is_empty(&self) -> bool {
self.current_size == 0
}
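/// Resets all counters and columns, retaining allocations for reuse.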
pub(crate) fn clear(&mut self) {
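// Counters.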
{
self.nuclen = 0;
self.num_sequences = 0;
self.num_records = 0;
self.current_size = 0;
self.num_npos = 0;
}
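// Length columns and derived offsets.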
{
self.l_seq.clear();
self.l_headers.clear();
self.l_seq_offsets.clear();
self.l_header_offsets.clear();
}
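// Raw columns and the N-position set.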
{
self.seq.clear();
self.flags.clear();
self.headers.clear();
self.qual.clear();
self.npos.clear();
self.ef = None;
}
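// Encoding scratch and compressed columns.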
{
self.ebuf.clear();
self.z_seq_len.clear();
self.z_header_len.clear();
self.z_npos.clear();
self.z_seq.clear();
self.z_flags.clear();
self.z_headers.clear();
self.z_qual.clear();
self.ef_bytes.clear();
}
}
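/// Appends the primary sequence (and, for paired files, the extended
/// sequence) together with their lengths.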
fn add_sequence(&mut self, record: &SequencingRecord) -> Result<()> {
self.l_seq.push(record.s_seq.len() as u64);
self.seq.extend_from_slice(record.s_seq);
self.num_sequences += 1;
if self.header.is_paired() {
let Some(x_seq) = record.x_seq else {
return Err(WriteError::ConfigurationMismatch {
attribute: "x_seq",
expected: true,
actual: false,
}
.into());
};
self.l_seq.push(x_seq.len() as u64);
self.seq.extend_from_slice(x_seq);
self.num_sequences += 1;
}
self.nuclen = self.seq.len();
Ok(())
}
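/// Appends the record's flag when the file is configured with flags.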
fn add_flag(&mut self, record: &SequencingRecord) -> Result<()> {
if self.header.has_flags() {
let Some(flag) = record.flag else {
return Err(WriteError::ConfigurationMismatch {
attribute: "flag",
expected: true,
actual: false,
}
.into());
};
self.flags.push(flag);
}
Ok(())
}
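/// Appends the record's header(s) and their lengths when the file is
/// configured with headers.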
fn add_headers(&mut self, record: &SequencingRecord) -> Result<()> {
if self.header.has_headers() {
let Some(sheader) = record.s_header else {
return Err(WriteError::ConfigurationMismatch {
attribute: "s_header",
expected: true,
actual: false,
}
.into());
};
self.l_headers.push(sheader.len() as u64);
self.headers.extend_from_slice(sheader);
if self.header.is_paired() {
let Some(xheader) = record.x_header else {
return Err(WriteError::ConfigurationMismatch {
attribute: "x_header",
expected: true,
actual: false,
}
.into());
};
self.l_headers.push(xheader.len() as u64);
self.headers.extend_from_slice(xheader);
}
}
Ok(())
}
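/// Appends the record's quality string(s) when the file is configured
/// with qualities.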
fn add_quality(&mut self, record: &SequencingRecord) -> Result<()> {
if self.header.has_qualities() {
let Some(squal) = record.s_qual() else {
return Err(WriteError::ConfigurationMismatch {
attribute: "s_qual",
expected: true,
actual: false,
}
.into());
};
self.qual.extend_from_slice(squal);
if self.header.is_paired() {
let Some(xqual) = record.x_qual() else {
return Err(WriteError::ConfigurationMismatch {
attribute: "x_qual",
expected: true,
actual: false,
}
.into());
};
self.qual.extend_from_slice(xqual);
}
}
Ok(())
}
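/// Fraction of the configured block size currently occupied.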
#[must_use]
pub fn usage(&self) -> f64 {
self.current_size as f64 / self.header.block_size as f64
}
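/// Returns `true` if `record` fits within the remaining block capacity.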
pub(crate) fn can_fit(&self, record: &SequencingRecord<'_>) -> bool {
let configured_size = record.configured_size_cbq(
self.header.is_paired(),
self.header.has_flags(),
self.header.has_headers(),
self.header.has_qualities(),
);
self.current_size + configured_size <= self.header.block_size as usize
}
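/// Returns `true` if the contents of `other` fit within the remaining
/// capacity of `self`.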
pub(crate) fn can_ingest(&self, other: &Self) -> bool {
self.current_size + other.current_size <= self.header.block_size as usize
}
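/// Checks that `record` fits in the block and matches the file's
/// paired/flags/headers/qualities configuration.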
fn validate_record(&self, record: &SequencingRecord) -> Result<()> {
let configured_size = record.configured_size_cbq(
self.header.is_paired(),
self.header.has_flags(),
self.header.has_headers(),
self.header.has_qualities(),
);
if !self.can_fit(record) {
if configured_size > self.header.block_size as usize {
return Err(WriteError::RecordSizeExceedsMaximumBlockSize(
configured_size,
self.header.block_size as usize,
)
.into());
}
return Err(CbqError::BlockFull {
current_size: self.current_size,
record_size: configured_size,
block_size: self.header.block_size as usize,
}
.into());
}
if self.header.is_paired() && !record.is_paired() {
return Err(WriteError::ConfigurationMismatch {
attribute: "paired",
expected: self.header.is_paired(),
actual: record.is_paired(),
}
.into());
}
if self.header.has_flags() && !record.has_flags() {
return Err(WriteError::ConfigurationMismatch {
attribute: "flags",
expected: self.header.has_flags(),
actual: record.has_flags(),
}
.into());
}
if self.header.has_headers() && !record.has_headers() {
return Err(WriteError::ConfigurationMismatch {
attribute: "headers",
expected: self.header.has_headers(),
actual: record.has_headers(),
}
.into());
}
if self.header.has_qualities() && !record.has_qualities() {
return Err(WriteError::ConfigurationMismatch {
attribute: "qualities",
expected: self.header.has_qualities(),
actual: record.has_qualities(),
}
.into());
}
Ok(())
}
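/// Validates `record` and appends its fields to the block's columns.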
pub fn push(&mut self, record: SequencingRecord) -> Result<()> {
self.validate_record(&record)?;
let configured_size = record.configured_size_cbq(
self.header.is_paired(),
self.header.has_flags(),
self.header.has_headers(),
self.header.has_qualities(),
);
self.add_sequence(&record)?;
self.add_flag(&record)?;
self.add_headers(&record)?;
self.add_quality(&record)?;
self.current_size += configured_size;
self.num_records += 1;
Ok(())
}
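/// Number of `u64` words needed to 2-bit pack `nuclen` bases
/// (32 bases per word).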
fn ebuf_len(&self) -> usize {
self.nuclen.div_ceil(32)
}
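/// 2-bit packs the sequence column into `ebuf`. `N` bases are accepted by
/// `encode_with_invalid` but cannot be represented in two bits; their
/// positions are recorded separately by `fill_npos`.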
fn encode_sequence(&mut self) -> Result<()> {
bitnuc::twobit::encode_with_invalid(&self.seq, &mut self.ebuf)?;
Ok(())
}
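/// Records the position of every `N` in the sequence column and builds an
/// Elias-Fano set over those positions, so the bases can be restored after
/// 2-bit decoding.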
fn fill_npos(&mut self) -> Result<()> {
self.npos
.extend(memchr::memchr_iter(b'N', &self.seq).map(|i| i as u64));
self.num_npos = self.npos.len();
if self.npos.is_empty() {
self.ef = None;
Ok(())
} else {
let mut ef_builder = EliasFanoBuilder::new(self.seq.len(), self.npos.len())?;
ef_builder.extend(self.npos.iter().map(|idx| *idx as usize))?;
let ef = ef_builder.build();
self.ef = Some(ef);
Ok(())
}
}
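/// Restores `N` bases into the decoded sequence at the positions stored in
/// the Elias-Fano set.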
fn backfill_npos(&mut self) {
if let Some(ef) = self.ef.as_ref() {
ef.iter(0).for_each(|idx| {
if let Some(base) = self.seq.get_mut(idx) {
*base = b'N';
}
});
}
}
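/// zstd-compresses each non-empty column independently; the Elias-Fano
/// set, when present, is serialized into `ef_bytes` first.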
fn compress_columns(&mut self, cctx: &mut zstd_safe::CCtx) -> Result<()> {
sized_compress(&mut self.z_seq_len, cast_slice(&self.l_seq), cctx)?;
if !self.headers.is_empty() {
sized_compress(&mut self.z_header_len, cast_slice(&self.l_headers), cctx)?;
}
if let Some(ef) = self.ef.as_ref() {
ef.serialize_into(&mut self.ef_bytes)?;
self.len_nef = self.ef_bytes.len();
sized_compress(&mut self.z_npos, &self.ef_bytes, cctx)?;
}
sized_compress(&mut self.z_seq, cast_slice(&self.ebuf), cctx)?;
if !self.flags.is_empty() {
sized_compress(&mut self.z_flags, cast_slice(&self.flags), cctx)?;
}
if !self.headers.is_empty() {
sized_compress(&mut self.z_headers, cast_slice(&self.headers), cctx)?;
}
if !self.qual.is_empty() {
sized_compress(&mut self.z_qual, cast_slice(&self.qual), cctx)?;
}
Ok(())
}
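/// Streaming counterpart to [`Self::decompress_from_bytes`]: decompresses
/// every present column from the `z_*` buffers and rebuilds the derived
/// offsets.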
pub fn decompress_columns(&mut self) -> Result<()> {
{
self.l_seq.resize(self.num_sequences, 0);
copy_decode(self.z_seq_len.as_slice(), cast_slice_mut(&mut self.l_seq))?;
// Grow the synthetic quality buffer as `decompress_from_bytes` does, so
// the quality fallback in `squal`/`xqual` cannot slice out of bounds on
// files read without qualities.
self.l_seq.iter().for_each(|len| {
if *len as usize > self.qbuf.len() {
self.qbuf.resize(*len as usize, self.default_quality_score);
}
});
}
if !self.z_header_len.is_empty() {
self.l_headers.resize(self.num_sequences, 0);
copy_decode(
self.z_header_len.as_slice(),
cast_slice_mut(&mut self.l_headers),
)?;
}
if !self.z_npos.is_empty() {
self.ef_bytes.resize(self.len_nef, 0);
copy_decode(self.z_npos.as_slice(), &mut self.ef_bytes)?;
let ef = EliasFano::deserialize_from(self.ef_bytes.as_slice())?;
self.num_npos = ef.len();
self.ef = Some(ef);
}
{
self.ebuf.resize(self.ebuf_len(), 0);
copy_decode(self.z_seq.as_slice(), cast_slice_mut(&mut self.ebuf))?;
bitnuc::twobit::decode(&self.ebuf, self.nuclen, &mut self.seq)?;
self.backfill_npos();
}
if !self.z_flags.is_empty() {
self.flags.resize(self.num_records, 0);
copy_decode(self.z_flags.as_slice(), cast_slice_mut(&mut self.flags))?;
}
if !self.z_headers.is_empty() {
copy_decode(self.z_headers.as_slice(), &mut self.headers)?;
}
if !self.z_qual.is_empty() {
copy_decode(self.z_qual.as_slice(), &mut self.qual)?;
}
{
calculate_offsets(&self.l_seq, &mut self.l_seq_offsets);
calculate_offsets(&self.l_headers, &mut self.l_header_offsets);
}
Ok(())
}
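/// Writes the compressed columns in the fixed on-disk order: sequence
/// lengths, header lengths, `N` positions, sequences, flags, headers,
/// qualities.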
fn write<W: io::Write>(&mut self, writer: &mut W) -> Result<()> {
writer.write_all(&self.z_seq_len)?;
writer.write_all(&self.z_header_len)?;
writer.write_all(&self.z_npos)?;
writer.write_all(&self.z_seq)?;
writer.write_all(&self.z_flags)?;
writer.write_all(&self.z_headers)?;
writer.write_all(&self.z_qual)?;
Ok(())
}
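/// Encodes, compresses, and writes the block preceded by its
/// [`BlockHeader`], then clears the block for reuse. Returns `Ok(None)`
/// without writing anything when the block is empty.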
pub fn flush_to<W: io::Write>(
&mut self,
writer: &mut W,
cctx: &mut zstd_safe::CCtx,
) -> Result<Option<BlockHeader>> {
if self.is_empty() {
return Ok(None);
}
self.encode_sequence()?;
self.fill_npos()?;
self.compress_columns(cctx)?;
let header = BlockHeader::from_block(self);
header.write(writer)?;
self.write(writer)?;
self.clear();
Ok(Some(header))
}
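/// Reads the compressed columns described by `header` from `reader`
/// without decompressing them; pair with [`Self::decompress_columns`].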
pub fn read_from<R: io::Read>(&mut self, reader: &mut R, header: BlockHeader) -> Result<()> {
self.clear();
self.nuclen = header.nuclen as usize;
self.num_records = header.num_records as usize;
self.num_sequences = header.num_sequences as usize;
self.len_nef = header.len_nef as usize;
extension_read(reader, &mut self.z_seq_len, header.len_z_seq_len as usize)?;
extension_read(
reader,
&mut self.z_header_len,
header.len_z_header_len as usize,
)?;
extension_read(reader, &mut self.z_npos, header.len_z_npos as usize)?;
extension_read(reader, &mut self.z_seq, header.len_z_seq as usize)?;
extension_read(reader, &mut self.z_flags, header.len_z_flags as usize)?;
extension_read(reader, &mut self.z_headers, header.len_z_headers as usize)?;
extension_read(reader, &mut self.z_qual, header.len_z_qual as usize)?;
Ok(())
}
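/// Decompresses all columns directly from an in-memory slice laid out as
/// produced by `write`, using an explicit zstd decompression context.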
pub fn decompress_from_bytes(
&mut self,
bytes: &[u8],
header: BlockHeader,
dctx: &mut zstd_safe::DCtx,
) -> Result<()> {
self.clear();
self.nuclen = header.nuclen as usize;
self.num_records = header.num_records as usize;
self.num_sequences = header.num_sequences as usize;
self.len_nef = header.len_nef as usize;
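// Columns are laid out back-to-back; `byte_offset` tracks the read cursor into `bytes`.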
let mut byte_offset = 0;
{
resize_uninit(&mut self.l_seq, self.num_sequences);
dctx.decompress(
cast_slice_mut(&mut self.l_seq),
slice_and_increment(&mut byte_offset, header.len_z_seq_len, bytes),
)
.map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
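// Grow the synthetic quality buffer to cover the longest sequence in the block.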
self.l_seq.iter().for_each(|len| {
if *len as usize > self.qbuf.len() {
self.qbuf.resize(*len as usize, self.default_quality_score);
}
});
}
if header.len_z_header_len > 0 {
resize_uninit(&mut self.l_headers, self.num_sequences);
dctx.decompress(
cast_slice_mut(&mut self.l_headers),
slice_and_increment(&mut byte_offset, header.len_z_header_len, bytes),
)
.map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
}
{
calculate_offsets(&self.l_seq, &mut self.l_seq_offsets);
calculate_offsets(&self.l_headers, &mut self.l_header_offsets);
}
if header.len_z_npos > 0 {
resize_uninit(&mut self.ef_bytes, self.len_nef);
dctx.decompress(
&mut self.ef_bytes,
slice_and_increment(&mut byte_offset, header.len_z_npos, bytes),
)
.map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
let ef = EliasFano::deserialize_from(self.ef_bytes.as_slice())?;
self.num_npos = ef.len();
self.ef = Some(ef);
}
{
let ebuf_len = self.ebuf_len();
resize_uninit(&mut self.ebuf, ebuf_len);
dctx.decompress(
cast_slice_mut(&mut self.ebuf),
slice_and_increment(&mut byte_offset, header.len_z_seq, bytes),
)
.map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
bitnuc::twobit::decode(&self.ebuf, self.nuclen, &mut self.seq)?;
self.backfill_npos();
}
if header.len_z_flags > 0 {
resize_uninit(&mut self.flags, self.num_records);
dctx.decompress(
cast_slice_mut(&mut self.flags),
slice_and_increment(&mut byte_offset, header.len_z_flags, bytes),
)
.map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
}
if header.len_z_headers > 0 {
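// Total header bytes: last prefix-sum offset plus the last header length.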
let headers_len = (self.l_header_offsets.last().copied().unwrap_or(0)
+ self.l_headers.last().copied().unwrap_or(0))
as usize;
resize_uninit(&mut self.headers, headers_len);
dctx.decompress(
&mut self.headers,
slice_and_increment(&mut byte_offset, header.len_z_headers, bytes),
)
.map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
}
if header.len_z_qual > 0 {
resize_uninit(&mut self.qual, self.nuclen);
dctx.decompress(
&mut self.qual,
slice_and_increment(&mut byte_offset, header.len_z_qual, bytes),
)
.map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
}
Ok(())
}
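/// Merges the contents of a partially filled block into `self`, failing if
/// the combined size would exceed the block size. The `N`-position set is
/// not merged: it is recomputed from `seq` by `fill_npos` at flush time.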
pub(crate) fn take_incomplete(&mut self, other: &Self) -> Result<()> {
if !self.can_ingest(other) {
return Err(CbqError::CannotIngestBlock {
self_block_size: self.header.block_size as usize,
other_block_size: other.header.block_size as usize,
}
.into());
}
{
self.nuclen += other.nuclen;
self.num_records += other.num_records;
self.num_sequences += other.num_sequences;
self.current_size += other.current_size;
}
{
self.seq.extend_from_slice(&other.seq);
self.flags.extend_from_slice(&other.flags);
self.headers.extend_from_slice(&other.headers);
self.qual.extend_from_slice(&other.qual);
self.l_seq.extend_from_slice(&other.l_seq);
self.l_headers.extend_from_slice(&other.l_headers);
}
Ok(())
}
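/// Returns a zero-copy iterator over the records of this (decompressed)
/// block; `range` supplies the block's global record offset.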
#[must_use]
pub fn iter_records(&self, range: BlockRange) -> RefRecordIter<'_> {
RefRecordIter {
block: self,
range,
qbuf: &self.qbuf,
index: 0,
is_paired: self.header.is_paired(),
has_headers: self.header.has_headers(),
header_buffer: itoa::Buffer::new(),
}
}
}
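/// Iterator yielding borrowed [`RefRecord`]s over a decompressed
/// [`ColumnarBlock`].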
pub struct RefRecordIter<'a> {
block: &'a ColumnarBlock,
range: BlockRange,
index: usize,
is_paired: bool,
has_headers: bool,
qbuf: &'a [u8],
header_buffer: itoa::Buffer,
}
impl<'a> Iterator for RefRecordIter<'a> {
type Item = RefRecord<'a>;
fn next(&mut self) -> Option<Self::Item> {
if self.index >= self.block.num_records {
None
} else {
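// Paired files interleave primary/extended entries, so each record owns
// two consecutive slots in the length and offset columns.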
let seq_idx = if self.is_paired {
self.index * 2
} else {
self.index
};
let sseq_span =
Span::new_u64(self.block.l_seq_offsets[seq_idx], self.block.l_seq[seq_idx]);
let sheader_span = if self.has_headers {
Some(Span::new_u64(
self.block.l_header_offsets[seq_idx],
self.block.l_headers[seq_idx],
))
} else {
None
};
let xseq_span = if self.is_paired {
Some(Span::new_u64(
self.block.l_seq_offsets[seq_idx + 1],
self.block.l_seq[seq_idx + 1],
))
} else {
None
};
let xheader_span = if self.is_paired && self.has_headers {
Some(Span::new_u64(
self.block.l_header_offsets[seq_idx + 1],
self.block.l_headers[seq_idx + 1],
))
} else {
None
};
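// `cumulative_records` counts through the end of this block, so subtract
// the block's record count to get its starting global index.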
let global_index =
self.range.cumulative_records as usize - self.block.num_records + self.index;
let rr_index = RefRecordIndex::new(global_index, &mut self.header_buffer);
let record = RefRecord {
block: self.block,
index: self.index,
qbuf: self.qbuf,
global_index,
sseq_span,
sheader_span,
xseq_span,
xheader_span,
rr_index,
};
self.index += 1;
Some(record)
}
}
}
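/// A record's global index pre-formatted as decimal ASCII, used as a
/// fallback header when the file stores none; 20 bytes holds the largest
/// 64-bit value.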
#[derive(Clone, Copy)]
struct RefRecordIndex {
index_buf: [u8; 20],
index_len: usize,
}
impl RefRecordIndex {
fn new(index: usize, itoa_buf: &mut itoa::Buffer) -> Self {
let mut index_buf = [0u8; 20];
let header_str = itoa_buf.format(index);
let index_len = header_str.len();
index_buf[..index_len].copy_from_slice(header_str.as_bytes());
Self {
index_buf,
index_len,
}
}
fn as_bytes(&self) -> &[u8] {
&self.index_buf[..self.index_len]
}
}
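/// A borrowed view of a single record within a decompressed
/// [`ColumnarBlock`].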
#[derive(Clone, Copy)]
pub struct RefRecord<'a> {
block: &'a ColumnarBlock,
qbuf: &'a [u8],
index: usize,
global_index: usize,
sseq_span: Span,
xseq_span: Option<Span>,
sheader_span: Option<Span>,
xheader_span: Option<Span>,
rr_index: RefRecordIndex,
}
impl BinseqRecord for RefRecord<'_> {
fn bitsize(&self) -> BitSize {
BitSize::Two
}
fn index(&self) -> u64 {
self.global_index as u64
}
fn flag(&self) -> Option<u64> {
self.block.flags.get(self.index).copied()
}
fn is_paired(&self) -> bool {
self.xseq_span.is_some()
}
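// Header accessors fall back to the record's formatted global index when
// the file stores no headers.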
fn sheader(&self) -> &[u8] {
if let Some(span) = self.sheader_span {
&self.block.headers[span.range()]
} else {
self.rr_index.as_bytes()
}
}
fn xheader(&self) -> &[u8] {
if let Some(span) = self.xheader_span {
&self.block.headers[span.range()]
} else {
self.rr_index.as_bytes()
}
}
fn sbuf(&self) -> &[u64] {
unimplemented!("sbuf is not implemented for cbq")
}
fn xbuf(&self) -> &[u64] {
unimplemented!("xbuf is not implemented for cbq")
}
fn slen(&self) -> u64 {
self.sseq_span.len() as u64
}
fn xlen(&self) -> u64 {
self.xseq_span.map_or(0, |span| span.len() as u64)
}
fn decode_s(&self, buf: &mut Vec<u8>) -> crate::Result<()> {
buf.extend_from_slice(self.sseq());
Ok(())
}
fn decode_x(&self, buf: &mut Vec<u8>) -> crate::Result<()> {
buf.extend_from_slice(self.xseq());
Ok(())
}
fn sseq(&self) -> &[u8] {
&self.block.seq[self.sseq_span.range()]
}
fn xseq(&self) -> &[u8] {
self.xseq_span
.map_or(&[], |span| &self.block.seq[span.range()])
}
fn has_quality(&self) -> bool {
self.block.header.has_qualities()
}
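// Quality accessors fall back to the synthetic default-quality buffer
// when the file stores no qualities.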
fn squal(&self) -> &[u8] {
if self.has_quality() {
&self.block.qual[self.sseq_span.range()]
} else {
&self.qbuf[..self.slen() as usize]
}
}
fn xqual(&self) -> &[u8] {
if self.has_quality()
&& let Some(span) = self.xseq_span
{
&self.block.qual[span.range()]
} else {
&self.qbuf[..self.xlen() as usize]
}
}
}