use std::{fs::File, io::{BufWriter, Write}};
use crate::{io::{BusRecord, DEFAULT_BUF_SIZE, BusReaderPlain, BusHeader, BusParams}, busz::{utils::{bitslice_to_bytes, swap_endian}, PFD_BLOCKSIZE, CompressedBlockHeader}};
use bitvec::prelude as bv;
use itertools::Itertools;
use fastfibonacci::bit_decode::fibonacci;
use super::{runlength_codec::RunlengthCodec, utils::round_to_multiple, BuszBitVector, BuszHeader};
/// Encodes the barcode (CB) column of a block.
///
/// The first barcode is stored verbatim and every following one as the
/// difference to its predecessor. That delta stream is run-length encoded
/// (`rle_val: 0`, `shift_up_1: true`), Fibonacci encoded, and zero-padded to a
/// multiple of 64 bits.
///
/// # Panics
/// On an empty slice. Records are expected to be sorted by CB: a decreasing
/// barcode overflows the `u64` subtraction.
fn compress_barcodes2(records: &[BusRecord]) -> BuszBitVector {
    let runlength_codec = RunlengthCodec { rle_val: 0, shift_up_1: true };

    let first_cb = records.first().map(|r| r.CB).unwrap();
    let deltas = std::iter::once(first_cb)
        .chain(records.windows(2).map(|pair| pair[1].CB - pair[0].CB));

    let rle_stream = runlength_codec.encode(deltas);
    let mut bits = fibonacci::encode(&rle_stream);
    // Each column must occupy whole 64-bit words.
    let padded_len = round_to_multiple(bits.len(), 64);
    bits.resize(padded_len, false);
    bits
}
/// Encodes the UMI column of a block.
///
/// Each UMI is shifted up by one and delta-encoded against the previous UMI
/// *of the same barcode*. Whenever the barcode changes, the previous UMI is
/// taken to be 0, so the first UMI of a barcode is stored as `UMI + 1`. The
/// deltas are run-length encoded (`rle_val: 0`, `shift_up_1: true`),
/// Fibonacci encoded, and zero-padded to a multiple of 64 bits.
///
/// Records are expected sorted by (CB, UMI). An out-of-order UMI within one
/// barcode underflows the subtraction. `UMI + 1` also assumes a UMI below
/// `u64::MAX`.
///
/// An empty slice yields an empty bit vector.
fn compress_umis(records: &[BusRecord]) -> BuszBitVector {
    let runlength_codec = RunlengthCodec { rle_val: 0, shift_up_1: true };

    // `None` before the first record: any barcode counts as "changed". The
    // previous sentinel (`first CB + 1`) overflowed for CB == u64::MAX.
    let mut previous_cb: Option<u64> = None;
    let mut previous_umi: u64 = 0;

    let periodic_deltas = records.iter().map(|record| {
        let umi = record.UMI + 1;
        if previous_cb != Some(record.CB) {
            // New barcode: restart the delta chain from 0.
            previous_umi = 0;
        }
        let diff = umi - previous_umi;
        previous_umi = umi;
        previous_cb = Some(record.CB);
        diff
    });

    let runlen_encoded = runlength_codec.encode(periodic_deltas);
    let mut enc = fibonacci::encode(&runlen_encoded);
    // Each column must occupy whole 64-bit words.
    enc.resize(round_to_multiple(enc.len(), 64), false);
    enc
}
/// Encodes the equivalence-class (EC) column of a block with NewPFD.
///
/// The NewPFD bit stream is zero-padded to a multiple of 32 bits and turned
/// into bytes. The bytes are then reordered by two passes of `swap_endian`:
/// first with word size 8, then with word size 4.
fn compress_ecs(records: &[BusRecord]) -> BuszBitVector {
    let ec_values = records.iter().map(|r| r.EC as u64);
    let (mut pfd_bits, _n_el) = newpfd::newpfd_bitvec::encode(ec_values, PFD_BLOCKSIZE);

    let target_len = round_to_multiple(pfd_bits.len(), 32);
    pfd_bits.resize(target_len, false);
    assert_eq!(pfd_bits.len() % 32, 0, "newPFD block size needs to be a mutiple of 32, but is {}", pfd_bits.len());

    let raw_bytes = bitslice_to_bytes(pfd_bits.as_bitslice());
    let reordered = swap_endian(&swap_endian(&raw_bytes, 8), 4);
    bv::BitVec::from_slice(&reordered)
}
/// Encodes the COUNT column of a block.
///
/// Counts are run-length encoded around the value 1 (`rle_val: 1`,
/// `shift_up_1: false`), Fibonacci encoded, and zero-padded to a multiple of
/// 64 bits.
fn compress_counts(records: &[BusRecord]) -> BuszBitVector {
    let codec = RunlengthCodec { rle_val: 1, shift_up_1: false };
    let rle_stream = codec.encode(records.iter().map(|record| record.COUNT as u64));

    let mut bits = fibonacci::encode(&rle_stream);
    bits.resize(round_to_multiple(bits.len(), 64), false);
    bits
}
/// Encodes the FLAG column of a block.
///
/// Flags are run-length encoded around 0 (`rle_val: 0`, `shift_up_1: true`),
/// Fibonacci encoded, and zero-padded to a multiple of 64 bits.
fn compress_flags(records: &[BusRecord]) -> BuszBitVector {
    let codec = RunlengthCodec { rle_val: 0, shift_up_1: true };
    let rle_stream = codec.encode(records.iter().map(|record| record.FLAG as u64));

    let mut bits = fibonacci::encode(&rle_stream);
    bits.resize(round_to_multiple(bits.len(), 64), false);
    bits
}
/// Compresses one block of records into its on-disk byte representation.
///
/// Layout: the little-endian bytes of the `CompressedBlockHeader` (body size
/// in bytes and record count), followed by the five compressed columns in
/// the order CB, UMI, EC, COUNT, FLAG. Each column is converted to bytes and
/// passed through `swap_endian(.., 8)`.
///
/// # Panics
/// If the total column bit length is not a multiple of 8, or if the body size
/// or record count does not fit the header's fields.
pub(crate) fn compress_busrecords_into_block(records: &[BusRecord]) -> Vec<u8> {
    // The array order is the on-disk column order; do not reorder.
    let columns = [
        compress_barcodes2(records),
        compress_umis(records),
        compress_ecs(records),
        compress_counts(records),
        compress_flags(records),
    ];

    let total_bits: usize = columns.iter().map(|column| column.len()).sum();
    assert_eq!(total_bits % 8, 0);

    let mut body = Vec::with_capacity(total_bits / 8);
    for column in &columns {
        body.extend(swap_endian(&bitslice_to_bytes(column), 8));
    }

    let header = CompressedBlockHeader::new(
        body.len().try_into().unwrap(),
        records.len().try_into().unwrap(),
    );
    let mut block = header.header_bytes.to_le_bytes().to_vec();
    block.extend(body);
    block
}
/// Compresses the plain BUS file at `input` into a busz file at `output`,
/// packing `blocksize` records into each compressed block.
///
/// # Panics
/// On any read/write failure (the underlying reader and writer unwrap I/O errors).
pub fn compress_busfile(input: &str, output: &str, blocksize: usize) {
    let reader = BusReaderPlain::new(input);
    let params = reader.params.clone();
    let mut busz_writer = BuszWriter::new(output, params, blocksize);
    busz_writer.write_iterator(reader.into_iter());
}
/// Lifecycle of a `BuszWriter`: records can be added while `Open`;
/// `terminal_flush` moves the writer to `FlushedAndClosed`.
#[derive(Debug, PartialEq, Eq)]
enum BuszWriterState {
    /// Records may still be written.
    Open,
    /// Remaining records and the trailing 8 zero bytes have been written;
    /// `write_record` panics in this state.
    FlushedAndClosed,
}
/// Streaming writer for busz (compressed BUS) files.
///
/// Records are buffered and compressed `busz_blocksize` at a time. The file
/// is finalised by `terminal_flush`, which `Drop` normally invokes if the
/// user has not called it.
pub struct BuszWriter {
    /// Buffered output file; all headers are written on construction.
    writer: BufWriter<File>,
    /// Records waiting to be compressed into the next block.
    internal_buffer: Vec<BusRecord>,
    /// Number of records per compressed block.
    busz_blocksize: usize,
    /// Barcode/UMI lengths used to build the BUS header.
    params: BusParams,
    /// Whether further records may be written.
    state: BuszWriterState,
}
impl BuszWriter {
pub fn from_filehandle(file_handle: File, params: BusParams, busz_blocksize: usize) -> BuszWriter {
BuszWriter::new_with_capacity(file_handle, params, busz_blocksize)
}
pub fn new(filename: &str, params: BusParams, busz_blocksize: usize) -> BuszWriter {
let file_handle: File = File::create(filename).expect("FAILED to open");
BuszWriter::from_filehandle(file_handle, params, busz_blocksize)
}
pub fn write_iterator(&mut self, iter: impl Iterator<Item=BusRecord>) {
for chunk in &iter.chunks(self.busz_blocksize) {
let records: Vec<BusRecord> = chunk.collect();
let compressed_block = compress_busrecords_into_block(&records);
self.writer.write_all(&compressed_block).unwrap();
}
self.terminal_flush();
}
pub fn write_record(&mut self, record: BusRecord) {
if self.state == BuszWriterState::FlushedAndClosed {
panic!("Buffer has been flushed and closed!")
}
if self.internal_buffer.len() < self.busz_blocksize - 1{ self.internal_buffer.push(record);
} else {
self.internal_buffer.push(record);
self.write_buffer_to_disk()
}
}
fn write_buffer_to_disk(&mut self) {
if self.state == BuszWriterState::FlushedAndClosed {
panic!("Buffer has been flushed and closed!")
}
let compressed_block = compress_busrecords_into_block(&self.internal_buffer);
self.writer.write_all(&compressed_block).unwrap();
self.internal_buffer.clear();
self.writer.flush().unwrap();
}
pub fn write_records(&mut self, records: Vec<BusRecord>) {
for r in records {
self.write_record(r)
}
}
pub fn terminal_flush(&mut self) {
if !self.internal_buffer.is_empty() {
self.write_buffer_to_disk();
}
self.writer.write_all(&[0;8]).unwrap();
self.writer.flush().unwrap();
self.state = BuszWriterState::FlushedAndClosed;
}
pub fn new_with_capacity(file_handle: File, params: BusParams, busz_blocksize: usize) -> Self {
let mut writer = BufWriter::with_capacity(DEFAULT_BUF_SIZE, file_handle);
let custom_header_str = "BUS file produced by kallisto".as_bytes();
let busheader = BusHeader::new(
params.cb_len,
params.umi_len,
custom_header_str.len() as u32,
true
);
let binheader = busheader.to_bytes();
writer
.write_all(&binheader)
.expect("FAILED to write header");
let varheader = custom_header_str;
writer
.write_all(varheader)
.expect("FAILED to write var header");
let busz_header = BuszHeader {
block_size: busz_blocksize.try_into().unwrap(),
pfd_block_size: 512,
lossy_umi: 0
};
let binzheader = busz_header.to_bytes();
writer
.write_all(&binzheader)
.expect("FAILED to write header");
let internal_buffer: Vec<BusRecord> = Vec::with_capacity(busz_blocksize);
BuszWriter { writer, internal_buffer, busz_blocksize, params, state: BuszWriterState::Open }
}
}
impl Drop for BuszWriter {
    /// Finalises a still-open writer, so that dropping it yields a complete file.
    ///
    /// Finalisation is skipped while the thread is already panicking.
    /// `terminal_flush` can panic itself (I/O `unwrap`s, compression asserts).
    /// A second panic during unwinding aborts the process instead of
    /// reporting the original failure.
    fn drop(&mut self) {
        match self.state {
            BuszWriterState::Open if !std::thread::panicking() => self.terminal_flush(),
            BuszWriterState::Open | BuszWriterState::FlushedAndClosed => {}
        }
    }
}
#[cfg(test)]
mod test {
    use crate::{busz::{encode::{compress_barcodes2, compress_ecs, compress_umis}, BuszBitSlice}, io::BusRecord};

    /// Builds the `u8` variant of the fast Fibonacci decoder over `stream`.
    fn fib_factory(stream: &BuszBitSlice) -> fastfibonacci::bit_decode::fast::FastFibonacciDecoder<u8>{
        fastfibonacci::bit_decode::fast::get_u8_decoder(stream, false)
    }

    /// Four records over two barcodes, with repeated CB/UMI pairs. Shared by
    /// the UMI, EC and writer round-trip tests.
    fn sample_records() -> Vec<BusRecord> {
        vec![
            BusRecord {CB:0,UMI:10,EC:0,COUNT:1, FLAG: 0 },
            BusRecord {CB:0,UMI:10,EC:1,COUNT:1, FLAG: 0 },
            BusRecord {CB:0,UMI:10,EC:1,COUNT:1, FLAG: 0 },
            BusRecord {CB:1,UMI:0,EC:1,COUNT:1, FLAG: 0 },
        ]
    }

    #[test]
    fn test_cb_encode(){
        let v = vec![
            BusRecord {CB:0,UMI:0,EC:0,COUNT:1, FLAG: 0 },
            BusRecord {CB:0,UMI:0,EC:1,COUNT:1, FLAG: 0 },
            BusRecord {CB:1,UMI:0,EC:2,COUNT:1, FLAG: 0 },
            BusRecord {CB:1,UMI:0,EC:3,COUNT:1, FLAG: 0 },
            BusRecord {CB:1,UMI:0,EC:4,COUNT:1, FLAG: 0 },
            BusRecord {CB:1,UMI:0,EC:5,COUNT:1, FLAG: 0 },
        ];
        let enc = compress_barcodes2(&v);
        let decoded: Vec<_> = fib_factory(&enc).collect();
        // CB deltas [0,0,1,0,0,0]: run of two zeros, 1 shifted to 2, run of three zeros.
        assert_eq!(decoded, vec![1, 2, 2, 1, 3]);
    }

    #[test]
    fn test_umi_encode(){
        let v = vec![
            BusRecord {CB:0,UMI:0,EC:0,COUNT:1, FLAG: 0 },
            BusRecord {CB:0,UMI:0,EC:1,COUNT:1, FLAG: 0 },
        ];
        let enc = compress_umis(&v);
        let decoded: Vec<_> = fib_factory(&enc).collect();
        assert_eq!(decoded, vec![2, 1, 1]);
    }

    #[test]
    fn test_umi_encode_wraparound(){
        let enc = compress_umis(&sample_records());
        let decoded: Vec<_> = fib_factory(&enc).collect();
        // UMI deltas [11,0,0,1]: the delta chain restarts when CB changes 0 -> 1.
        assert_eq!(decoded, vec![12, 1, 2, 2]);
    }

    #[test]
    fn test_ec_encode(){
        let enc = compress_ecs(&sample_records());
        println!("enc size {}", enc.len());
        assert!(!enc.is_empty());
        assert_eq!(enc.len() % 32, 0, "EC column must be padded to a multiple of 32 bits");
    }

    mod buszwriter {
        use tempfile::tempdir;
        use crate::{io::{BusRecord, BusParams}, busz::{encode::{BuszWriter, BuszWriterState}, decode::BuszReader}};
        use super::sample_records;

        #[test]
        fn test_busz_writer_good_blocksize() {
            let blocksize = 2;
            let v = sample_records();
            let dir = tempdir().unwrap();
            let file_path = dir.path().join("test.busz");
            let buszfile = file_path.to_str().unwrap();

            let mut writer = BuszWriter::new(buszfile, BusParams {cb_len: 16, umi_len: 12} , blocksize);
            writer.write_records(v.clone());
            writer.terminal_flush();
            assert_eq!(writer.state, BuszWriterState::FlushedAndClosed);
            assert_eq!(writer.internal_buffer.len(), 0);

            let reader = BuszReader::new(buszfile);
            assert_eq!(reader.collect::<Vec<BusRecord>>(), v);
        }

        #[test]
        fn test_busz_writer_crooked_blocksize() {
            let blocksize = 3;
            let v = sample_records();
            let dir = tempdir().unwrap();
            let file_path = dir.path().join("test.busz");
            let buszfile = file_path.to_str().unwrap();

            let mut writer = BuszWriter::new(buszfile, BusParams {cb_len: 16, umi_len: 12} , blocksize);
            writer.write_records(v.clone());
            writer.terminal_flush();
            assert_eq!(writer.state, BuszWriterState::FlushedAndClosed);
            assert_eq!(writer.internal_buffer.len(), 0);

            let reader = BuszReader::new(buszfile);
            assert_eq!(reader.collect::<Vec<BusRecord>>(), v);
        }

        #[test]
        fn test_busz_writer_crooked_blocksize_flush_on_drop() {
            let blocksize = 3;
            let v = sample_records();
            let dir = tempdir().unwrap();
            let file_path = dir.path().join("test.busz");
            let buszfile = file_path.to_str().unwrap();
            {
                // No explicit terminal_flush: Drop must finalise the file.
                let mut writer = BuszWriter::new(buszfile, BusParams {cb_len: 16, umi_len: 12} , blocksize);
                writer.write_records(v.clone());
            }
            let reader = BuszReader::new(buszfile);
            assert_eq!(reader.collect::<Vec<BusRecord>>(), v);
        }

        #[test]
        fn test_busz_write_iterator() {
            let blocksize = 3;
            let v = sample_records();
            let dir = tempdir().unwrap();
            let file_path = dir.path().join("test.busz");
            let buszfile = file_path.to_str().unwrap();

            let mut writer = BuszWriter::new(buszfile, BusParams {cb_len: 16, umi_len: 12} , blocksize);
            writer.write_iterator(v.iter().cloned());
            assert_eq!(writer.state, BuszWriterState::FlushedAndClosed);
            assert_eq!(writer.internal_buffer.len(), 0);

            let reader = BuszReader::new(buszfile);
            assert_eq!(reader.collect::<Vec<BusRecord>>(), v);
        }
    }
}