use std::{collections::VecDeque, fs::File, io::{BufReader, Read}, path::Path};
use crate::{busz::{blocks::load_block_header, read_busz_header}, io::{BusParams, BusRecord, CUGIterator, DEFAULT_BUF_SIZE}};
use itertools::izip;
use fastfibonacci::byte_decode::faster::FB_LOOKUP_NEW_U16;
use super::{BuszHeader, blocks::CompressedBlockHeader, PFD_BLOCKSIZE};
/// Streaming reader over busz-compressed data that hands out one [`BusRecord`] at a time.
///
/// The lifetime `'a` bounds the boxed byte source, so the reader can wrap
/// borrowed in-memory data as well as owned file handles.
pub struct BuszReader<'a> {
    /// Parameters returned by `read_busz_header` when the reader was constructed.
    params: BusParams,
    /// busz header returned by `read_busz_header` when the reader was constructed.
    busz_header: BuszHeader,
    /// Byte source from which block headers and block payloads are read.
    reader: Box<dyn Read + 'a>,
    /// Records of the most recently decoded block that have not been yielded yet.
    buffer: VecDeque<BusRecord>,
    /// Set once no further block could be loaded; iteration yields `None` afterwards.
    is_done: bool,
}
impl<'a> BuszReader<'a> {
    /// Opens the file at `filename` and builds a reader on top of it.
    ///
    /// The file is wrapped in a `BufReader` with `DEFAULT_BUF_SIZE` capacity and
    /// passed on to [`BuszReader::from_read`].
    ///
    /// # Panics
    /// Panics if the file cannot be opened; the message names the path and the
    /// underlying I/O error.
    pub fn new(filename: &Path) -> Self {
        let file_handle = File::open(filename).unwrap_or_else(|err| {
            panic!("failed to open busz file {}: {}", filename.display(), err)
        });
        let buf = BufReader::with_capacity(DEFAULT_BUF_SIZE, file_handle);
        Self::from_read(buf)
    }

    /// Builds a reader from any byte source.
    ///
    /// The headers are consumed immediately via `read_busz_header`; everything
    /// after that is left in `reader` and read block by block during iteration.
    pub fn from_read(mut reader: impl Read + 'a) -> Self {
        let (params, busz_header) = read_busz_header(&mut reader);
        // Room for one block's worth of records, as announced by the header.
        let buffer = VecDeque::with_capacity(busz_header.block_size as usize);
        BuszReader {
            params,
            busz_header,
            reader: Box::new(reader),
            buffer,
            is_done: false,
        }
    }

    /// Reads the next block header from the underlying byte source.
    ///
    /// Returns whatever the free function `load_block_header` returns; `None`
    /// is treated by the callers as "no more blocks".
    fn load_block_header(&mut self) -> Option<CompressedBlockHeader> {
        // Unqualified call: this resolves to the imported free function, not this method.
        load_block_header(&mut self.reader)
    }

    /// Loads and decodes the next block, returning its records.
    ///
    /// Returns `None` when no further block header is available.
    ///
    /// # Panics
    /// Panics if the source ends before the number of payload bytes announced
    /// by the block header could be read, or if decoding the payload fails.
    fn load_busz_block_faster(&mut self) -> Option<Vec<BusRecord>> {
        let header = self.load_block_header()?;
        let (block_size_bytes, nrecords) = header.get_blocksize_and_nrecords();

        // Pull the entire compressed payload into memory before decoding it.
        let payload_len = block_size_bytes as usize;
        let mut block_buffer: Vec<u8> = vec![0; payload_len];
        self.reader
            .read_exact(&mut block_buffer)
            .unwrap_or_else(|err| {
                panic!(
                    "failed to read compressed block payload of {} bytes: {}",
                    payload_len, err
                )
            });

        let mut block = BuszBlock::new(block_buffer.as_slice(), nrecords as usize);
        let records = block.parse_block();
        Some(records)
    }

    /// Returns the parameters that were read from the header at construction.
    pub fn get_params(&self) -> &BusParams {
        &self.params
    }
}
impl <'a>Iterator for BuszReader<'a> {
type Item=BusRecord;
fn next(&mut self) -> Option<Self::Item> {
if self.is_done { return None
}
if self.buffer.is_empty(){
let block =self.load_busz_block_faster();
match block {
Some(records) => {
self.buffer = records.into()
},
None => {
self.is_done = true;
return None
}
};
}
match self.buffer.pop_front() {
Some(record) => {
Some(record)
},
None => panic!("cant happen")
}
}
}
/// Marks [`BuszReader`] as a `CUGIterator`; the impl body is empty, so only
/// items the trait itself provides apply.
impl CUGIterator for BuszReader<'_> {}
/// Decoding stage of a compressed block.
///
/// Each variant names the section that is expected to be parsed next; the
/// section parsers advance the state after they finish.
#[derive(Debug, PartialEq, Eq)]
enum BuszBlockState {
    /// The CB section is next (initial state).
    Cb,
    /// The UMI section is next.
    Umi,
    /// The EC section is next.
    Ec,
    /// The count section is next.
    Count,
    /// The flag section is next.
    Flag,
    /// Every section has been parsed.
    Finished,
}
/// Decoder state for the payload of a single compressed block.
#[derive(Debug)]
struct BuszBlock<'a> {
    /// Compressed payload bytes of the block.
    buffer: &'a [u8],
    /// Byte offset into `buffer` at which the next section starts.
    pos: usize,
    /// Number of records the block is expected to contain.
    n_elements: usize,
    /// Section that is expected to be parsed next.
    state: BuszBlockState,
}
impl<'a> BuszBlock<'a> {
    /// Creates a decoder over `buffer` that expects `n_elements` records.
    ///
    /// Decoding starts at byte offset 0 in the `Cb` state.
    fn new(buffer: &'a [u8], n_elements: usize) -> Self {
        BuszBlock {
            buffer,
            pos: 0,
            n_elements,
            state: BuszBlockState::Cb,
        }
    }

    /// Builds the Fibonacci decoder used by the CB, UMI, count and flag sections.
    fn fibonacci_factory<R: Read + 'a>(
        stream: R,
    ) -> fastfibonacci::byte_decode::faster::FastFibonacciDecoderNewU16<'a, R> {
        fastfibonacci::byte_decode::faster::FastFibonacciDecoderNewU16::new(
            stream,
            &FB_LOOKUP_NEW_U16,
            false,
            fastfibonacci::byte_decode::faster::StreamType::U64,
        )
    }

    /// Decodes the CB section and returns one value per record.
    ///
    /// A decoded `1` marks a run: the following value is the run length and
    /// that many zeros are appended. Any other value is stored minus one.
    /// The resulting sequence is then turned into its running sum.
    ///
    /// # Panics
    /// Panics if called out of order, if not at the start of the buffer, if
    /// the decoder runs out of values, or if the decoder is not clean afterwards.
    fn parse_cb(&mut self) -> Vec<u64> {
        const CB_RLE_VAL: u64 = 1;

        assert_eq!(self.state, BuszBlockState::Cb);
        assert_eq!(self.pos, 0, "must be at beginning of buffer to parse CBs");

        let mut dd = Self::fibonacci_factory(&self.buffer[self.pos..]);
        let mut cb_delta_encoded: Vec<u64> = Vec::with_capacity(self.n_elements);
        while cb_delta_encoded.len() < self.n_elements {
            let item = dd.next().unwrap();
            if item == CB_RLE_VAL {
                let runlength = dd.next().unwrap() as usize;
                cb_delta_encoded.resize(cb_delta_encoded.len() + runlength, CB_RLE_VAL - 1);
            } else {
                cb_delta_encoded.push(item - 1);
            }
        }

        // Undo the delta encoding in place: every entry becomes the running sum.
        let mut running = 0_u64;
        for cb in cb_delta_encoded.iter_mut() {
            running += *cb;
            *cb = running;
        }

        assert!(dd.is_clean());
        let bytes_consumed = dd.get_consumed_bytes();
        self.pos += bytes_consumed;
        self.state = BuszBlockState::Umi;
        cb_delta_encoded
    }

    /// Decodes the UMI section; `cbs` must be the output of [`Self::parse_cb`].
    ///
    /// The running UMI value restarts from zero whenever the CB at the current
    /// position differs from the previous one. A zero difference marks a run:
    /// the following value is the run length and the current UMI is repeated
    /// that many times. Stored values are the running value minus one.
    ///
    /// # Panics
    /// Panics if called out of order, if `cbs` is empty or shorter than the
    /// number of decoded UMIs, if the decoder runs out of values, or if the
    /// decoder is not clean afterwards.
    fn parse_umi(&mut self, cbs: &[u64]) -> Vec<u64> {
        const UMI_RLE_VAL: u64 = 1;

        assert_eq!(self.state, BuszBlockState::Umi);

        let mut dd = Self::fibonacci_factory(&self.buffer[self.pos..]);
        // Start with a value different from the first CB.
        let mut last_cb = cbs[0] + 1;
        let mut umi = 0_u64;
        let mut umis: Vec<u64> = Vec::with_capacity(self.n_elements);
        while umis.len() < self.n_elements {
            let diff = dd.next().unwrap() - 1;
            let current_cb = cbs[umis.len()];
            if last_cb != current_cb {
                umi = 0;
            }
            if diff == UMI_RLE_VAL - 1 {
                let runlength = dd.next().unwrap() as usize;
                umis.resize(umis.len() + runlength, umi - 1);
            } else {
                umi += diff;
                umis.push(umi - 1);
            }
            last_cb = current_cb;
        }

        assert!(dd.is_clean());
        let bytes_consumed = dd.get_consumed_bytes();
        self.pos += bytes_consumed;
        self.state = BuszBlockState::Ec;
        umis
    }

    /// Decodes the EC section with `newpfd` and returns one value per record.
    ///
    /// # Panics
    /// Panics if called out of order or if the number of consumed bytes is not
    /// a multiple of four.
    fn parse_ec(&mut self) -> Vec<u64> {
        // Same ordering guard as the other section parsers.
        assert_eq!(self.state, BuszBlockState::Ec);

        let bytes = &self.buffer[self.pos..];
        let (ecs, bytes_consumed) =
            newpfd::newpfd_u32::decode(bytes, self.n_elements, PFD_BLOCKSIZE);
        assert_eq!(bytes_consumed % 4, 0);
        self.pos += bytes_consumed;
        self.state = BuszBlockState::Count;
        ecs
    }

    /// Decodes the count section and returns one value per record.
    ///
    /// A decoded `1` marks a run: the following value is the run length and
    /// that many ones are appended. Any other value is stored unchanged.
    ///
    /// # Panics
    /// Panics if called out of order, if the decoder runs out of values, or if
    /// the decoder is not clean afterwards.
    fn parse_counts(&mut self) -> Vec<u64> {
        const COUNT_RLE_VAL: u64 = 1;

        assert_eq!(self.state, BuszBlockState::Count);

        let mut dd = Self::fibonacci_factory(&self.buffer[self.pos..]);
        let mut counts_encoded: Vec<u64> = Vec::with_capacity(self.n_elements);
        while counts_encoded.len() < self.n_elements {
            let item = dd.next().unwrap();
            if item == COUNT_RLE_VAL {
                let runlength = dd.next().unwrap() as usize;
                counts_encoded.resize(counts_encoded.len() + runlength, COUNT_RLE_VAL);
            } else {
                counts_encoded.push(item);
            }
        }

        assert!(dd.is_clean());
        let bytes_consumed = dd.get_consumed_bytes();
        self.pos += bytes_consumed;
        self.state = BuszBlockState::Flag;
        counts_encoded
    }

    /// Decodes the flag section and returns one value per record.
    ///
    /// A decoded `1` marks a run: the following value is the run length and
    /// that many zeros are appended. Any other value is stored minus one.
    /// This is the last section, so the whole buffer must be consumed afterwards.
    ///
    /// # Panics
    /// Panics if called out of order, if the decoder runs out of values, if
    /// the decoder is not clean afterwards, or if bytes are left in the buffer.
    fn parse_flags(&mut self) -> Vec<u64> {
        const FLAG_RLE_VAL: u64 = 1;

        assert_eq!(self.state, BuszBlockState::Flag);

        let mut dd = Self::fibonacci_factory(&self.buffer[self.pos..]);
        let mut flag_decoded: Vec<u64> = Vec::with_capacity(self.n_elements);
        while flag_decoded.len() < self.n_elements {
            let item = dd.next().unwrap();
            if item == FLAG_RLE_VAL {
                let runlength = dd.next().unwrap() as usize;
                flag_decoded.resize(flag_decoded.len() + runlength, FLAG_RLE_VAL - 1);
            } else {
                flag_decoded.push(item - 1);
            }
        }

        assert!(dd.is_clean());
        let bytes_consumed = dd.get_consumed_bytes();
        self.pos += bytes_consumed;
        self.state = BuszBlockState::Finished;
        assert_eq!(
            self.pos,
            self.buffer.len(),
            "still leftover bytes in the buffer!"
        );
        flag_decoded
    }

    /// Decodes all five sections in order and assembles them into records.
    ///
    /// The sections are parsed as CB, UMI, EC, count, flag; the i-th record is
    /// built from the i-th entry of every section.
    fn parse_block(&mut self) -> Vec<BusRecord> {
        let cbs = self.parse_cb();
        let umis = self.parse_umi(&cbs);
        let ecs = self.parse_ec();
        let count = self.parse_counts();
        let flag = self.parse_flags();

        izip!(cbs, umis, ecs, count, flag)
            .map(|(cb, umi, ec, count, flag)| BusRecord {
                CB: cb,
                UMI: umi,
                EC: ec as u32,
                COUNT: count as u32,
                FLAG: flag as u32,
            })
            .collect()
    }
}
/// Round trip: compress a handful of records into one block, strip the block
/// header, decode the payload and compare with the input.
#[test]
fn testing() {
    use crate::busz::encode::compress_busrecords_into_block;
    use crate::record;

    let records = vec![
        record!(0, 1, 0, 12, 0),
        record!(0, 1, 1, 2, 0),
        record!(0, 2, 0, 12, 0),
        record!(1, 1, 1, 2, 0),
        record!(1, 2, 1, 2, 0),
        record!(1, 3, 1, 2, 1),
    ];

    let compressed = compress_busrecords_into_block(&records);

    // The first eight bytes hold the little-endian block header.
    let mut header_bytes = [0_u8; 8];
    header_bytes.copy_from_slice(&compressed[..8]);
    let h = CompressedBlockHeader::from_u64(u64::from_le_bytes(header_bytes));

    // Everything after the header is the block payload.
    let block_bytes = compressed[8..].to_vec();
    let (_blksize, nrecords) = h.get_blocksize_and_nrecords();
    println!("{}", block_bytes.len());
    assert_eq!(block_bytes.len() % 4, 0);

    let mut block = BuszBlock::new(&block_bytes, nrecords as usize);
    let records_dec = block.parse_block();
    assert_eq!(records, records_dec)
}
#[cfg(test)]
mod testing {
    use tempfile::tempdir;
    use crate::{busz::BuszWriter, iterators::CellGroupIterator, record};
    use super::*;

    /// Writes six records across several blocks and checks that grouping the
    /// decoded stream by CB returns them unchanged.
    #[test]
    fn test_groupby() {
        let r1 = record!(0, 1, 0, 12, 0);
        let r2 = record!(0, 2, 1, 2, 0);
        let r3 = record!(0, 3, 0, 12, 0);
        let r4 = record!(1, 1, 1, 2, 0);
        let r5 = record!(1, 2, 1, 2, 0);
        let r6 = record!(2, 1, 1, 2, 0);
        let records = vec![
            r1.clone(),
            r2.clone(),
            r3.clone(),
            r4.clone(),
            r5.clone(),
            r6.clone(),
        ];

        let dir = tempdir().unwrap();
        let fname = dir.path().join("test.bus");
        let mut w = BuszWriter::new(
            &fname,
            BusParams { cb_len: 16, umi_len: 12 },
            4,
        );
        w.write_iterator(records.into_iter());

        let mut reader = BuszReader::new(&fname).groupby_cb();
        assert_eq!(reader.next(), Some((0, vec![r1, r2, r3])));
        assert_eq!(reader.next(), Some((1, vec![r4, r5])));
        assert_eq!(reader.next(), Some((2, vec![r6])));
        // The iterator must keep returning `None` once exhausted.
        assert_eq!(reader.next(), None);
        assert_eq!(reader.next(), None);
    }

    /// Compares decoding from an in-memory byte slice with decoding straight
    /// from the file on disk.
    ///
    /// Ignored by default because it depends on a data file that only exists
    /// on the original developer's machine; run with `cargo test -- --ignored`
    /// after adjusting the path.
    #[test]
    #[ignore = "requires a local busz file at a hard-coded absolute path"]
    fn test_inmem() {
        let fname = Path::new("/home/michi/bus_testing/bus_output_shorter/output.corrected.sort.busz");
        let mut fh = BufReader::new(
            File::open(fname).unwrap()
        );
        let mut bytes: Vec<u8> = Vec::new();
        let bytes_read = fh.read_to_end(&mut bytes).unwrap();
        println!("bytes_read : {bytes_read}");

        let reader_inmem = BuszReader::from_read(bytes.as_slice());
        let reader_ondisk = BuszReader::new(fname);
        for (r1, r2) in izip!(reader_inmem, reader_ondisk) {
            assert_eq!(r1, r2)
        }
    }

    /// Round trip with non-trivial values in every record field.
    #[test]
    fn test_all_busrecord_fields() {
        let r1 = record!(0, 0, 1, 2, 5);
        let r2 = record!(100, 10, 2, 4, 10);
        let r3 = record!(200, 20, 3, 8, 15);
        let r4 = record!(300, 30, 4, 16, 20);
        let records = vec![r1, r2, r3, r4];

        let dir = tempdir().unwrap();
        let tmpfilename = dir.path().join("allfields.busz");
        let mut bytes: Vec<u8> = Vec::new();
        let mut w = BuszWriter::new(
            &tmpfilename,
            BusParams { cb_len: 16, umi_len: 12 },
            3,
        );
        w.write_iterator(records.clone().into_iter());

        let mut fh = File::open(tmpfilename).unwrap();
        let bytes_read = fh.read_to_end(&mut bytes).unwrap();
        println!("bytes_read : {bytes_read}");

        // Compare whole vectors so that missing or surplus records fail the
        // test instead of being silently dropped by a zip.
        let decoded: Vec<BusRecord> = BuszReader::from_read(bytes.as_slice()).collect();
        assert_eq!(decoded, records);
    }

    /// Diagnostic test: dumps the raw bytes following the file headers so the
    /// on-disk layout of a block can be inspected by eye. It only fails if one
    /// of the reads cannot be satisfied.
    #[test]
    fn confirm_bus_oder() {
        let dir = tempdir().unwrap();
        let fname = dir.path().join("tmp.bus");
        let r1 = record!(6, 4, 1, 2, 5);
        let r2 = record!(6 + 6, 4, 1, 2, 5);
        let mut w = BuszWriter::new(&fname, BusParams { cb_len: 16, umi_len: 12 }, 100);
        w.write_records(vec![r1, r2]);
        w.terminal_flush();

        // `BuszReader::new` has already consumed the file headers; what
        // follows is read directly from the underlying byte source.
        let mut r = BuszReader::new(&fname);

        let mut buf = [0_u8; 8];
        r.reader.read_exact(&mut buf).unwrap();
        let h = CompressedBlockHeader::from_u64(u64::from_le_bytes(buf));
        println!("{:?}", h.get_blocksize_and_nrecords());

        let mut buf = [0_u8; 8];
        r.reader.read_exact(&mut buf).unwrap();
        println!("CB {buf:?}, {:064b} -> u64 {}", u64::from_le_bytes(buf), u64::from_le_bytes(buf));

        let mut buf = [0_u8; 8];
        r.reader.read_exact(&mut buf).unwrap();
        println!("UMI {buf:?}, {:064b} -> u64 {}", u64::from_le_bytes(buf), u64::from_le_bytes(buf));

        let mut buf = [0_u8; 4];
        r.reader.read_exact(&mut buf).unwrap();
        println!("EC {buf:?} {:032b}", u32::from_le_bytes(buf));

        let mut buf = [0_u8; 8];
        r.reader.read_exact(&mut buf).unwrap();
        println!("count {buf:?}");
    }
}