bustools_core 0.16.2

Interacting with the kallisto/bus format of scRNAseq data
Documentation
//! Dealing with the [busz compression format](https://github.com/BUStools/BUSZ-format)
//! 
//! # Examples
//! ## Reading a compressed bus file
//! ```rust, no_run
//! use bustools_core::busz::BuszReader;
//! # use std::path::Path;
//! let reader = BuszReader::new(Path::new("/some/file.busz"));
//! for record in reader {
//!     // ...
//! }
//! ```
//! ## Writing to a compressed bus file
//! ```rust, no_run
//! # use std::path::Path;
//! use bustools_core::record;
//! use bustools_core::busz::BuszWriter;
//! use bustools_core::io::{BusRecord, BusParams};
//! let blocksize = 10000;
//! let params = BusParams {cb_len: 16, umi_len: 12};
//! let mut writer = BuszWriter::new(Path::new("/some/file.busz"), params, blocksize);
//! let records = vec![
//!     record!(0, 1, 0, 12,  0 ),
//!     record!(0, 1, 1, 2,  0 ),
//!     record!(0, 2, 0, 12,  0 ),
//!     record!(1, 1, 1, 2,  0 ),
//!     record!(1, 2, 1, 2,  0 ),
//!     record!(1, 1, 1, 2,  0 ),
//! ];
//! writer.write_iterator(records.into_iter());
//! ```
//! 
//! # About Bitvec and Memory layout
//! This code relies heavily on BitVec. It uses [`bitvec`] to encode/decode
//! the bits of the busz records, in particular Fibonacci encoding and NewPFD encoding.
//! 
//! **A certain peculiarity though**:
//! To turn bytes (e.g. from a u64 or read from the file) into a [`bitvec::vec::BitVec`] we use `BitVec::from_bytes(byte_Array)`.
//! This takes the bytes literally, in the order of the array.
//! Yet `bustools` writes busz in little-endian format, i.e. the byte order is reversed.
//! In particular, each busz block contains entries for CB, UMI, ... each PADDED with zeros afterwards (to a multiple of 64).
//! On disk it looks like this:
//! ```bash, no_run
//! 0000000...00000000[CBs in Fibonacci]
//! 0000000...00000000[UMIs in Fibonacci]
//! ```
//! 
//! Even more, the Fibonacci encoding must be done with little-endian byte order: if on disk it looks like
//! ```bash,no_run
//! aaaaaaaabbbbbbbbccccccccddddddddeeeeeeeeffffffffgggggggghhhhhhhh  //bits
//! ```
//! the correct fibonacci stream to decode is
//! ```bash, no_run
//! ddddddddccccccccbbbbbbbbaaaaaaaahhhhhhhhgggggggg....
//! ``` 

use std::io::Read;
use bitvec::{order::Msb0, prelude as bv};
use serde::{Serialize, Deserialize};
use crate::io::{BUS_HEADER_SIZE, BusHeader, BusParams};

mod blocks;
#[deprecated]
mod decode;
pub mod decode_bytes;
mod encode;
mod utils;
mod runlength_codec;

// exposing some core classes/functions to the public API
pub use encode::BuszWriter;
// pub use decode::BuszReader;
pub use decode_bytes::BuszReader;

/// Size of a PFD block within busz (this many ECs get encoded together).
const PFD_BLOCKSIZE: usize = 512;

/// Crate-internal bit-slice alias: `u8` storage elements, `Msb0` bit order.
pub (crate) type BuszBitSlice = bv::BitSlice<u8,Msb0>;
/// Owned bit-vector counterpart of [`BuszBitSlice`] (same `u8` storage and `Msb0` bit order).
pub (crate) type BuszBitVector = bv::BitVec<u8, Msb0>;




/// Number of bytes read from the stream for a [`BuszHeader`]; matches its three `u32` fields (4 bytes each).
const BUSZ_HEADER_SIZE: usize = 4+4+4;
/// Some busz-file specific headers, coming after the regular [`BusHeader`]
///
/// The struct is (de)serialized through its `serde` derives (see `from_bytes` / `to_bytes`),
/// so the declaration order of the fields is part of the byte layout.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
struct BuszHeader {
    // NOTE(review): presumably the number of records per compressed block -- confirm against the writer
    block_size: u32,
    // NOTE(review): presumably the PFD block size used for the ECs (cf. `PFD_BLOCKSIZE`) -- confirm
    pfd_block_size: u32,
    // must be 0: `BuszHeader::from_bytes` asserts this ("lossy_umi != 0 not supported")
    lossy_umi: u32,
}
impl BuszHeader {
    /// Deserializes a `BuszHeader` from raw bytes (e.g. when reading busz files).
    ///
    /// Assumes Little-Endian! [see here](https://docs.rs/bincode/latest/bincode/config/index.html#options-struct-vs-bincode-functions)
    ///
    /// # Panics
    /// Panics if `bytes` cannot be deserialized into a `BuszHeader`,
    /// or if the decoded `lossy_umi` field is not 0 (not supported).
    pub fn from_bytes(bytes: &[u8]) -> BuszHeader {
        // bincode interprets the bytes in Little Endian, i.e. bytes=[1,0,0,0] = 1_u32
        let parsed: Self = bincode::deserialize(bytes)
            .expect("FAILED to deserialze busz header");

        assert_eq!(
            parsed.lossy_umi, 0,
            "lossy_umi != 0 not supported"
        );

        parsed
    }

    /// Serializes the header to bytes.
    ///
    /// Assumes Little-Endian! [see here](https://docs.rs/bincode/latest/bincode/config/index.html#options-struct-vs-bincode-functions)
    ///
    /// # Panics
    /// Panics if serialization fails.
    pub fn to_bytes(&self) -> Vec<u8> {
        let encoded = bincode::serialize(self);
        encoded.expect("FAILED to serialze header")
    }
}


/// Parses the headers out of the stream/reader.
///
/// Reads, in this order:
/// 1. the fixed-size bus header (`BUS_HEADER_SIZE` bytes),
/// 2. the variable-length header (`tlen` bytes, as announced by the bus header; content is discarded),
/// 3. the busz-specific header (`BUSZ_HEADER_SIZE` bytes).
///
/// Returns the [`BusParams`] taken from the bus header together with the [`BuszHeader`].
///
/// note: this moves the reader forward!
///
/// # Panics
/// Panics if any of the three reads fails (e.g. the stream is too short),
/// if the magic bytes are not `b"BUS\x01"`, or if [`BuszHeader::from_bytes`] panics.
fn read_busz_header(reader: &mut impl Read) -> (BusParams, BuszHeader) {
    // the fixed-size part of the bus header
    let mut header_bytes = [0_u8; BUS_HEADER_SIZE];
    reader.read_exact(&mut header_bytes).expect("failed to read header");
    let header = BusHeader::from_bytes(&header_bytes);
    let params = header.get_params();

    assert_eq!(
        &header.magic, b"BUS\x01",
        "Header struct not matching; MAGIC is wrong"
    );

    // the variable header: only read to advance the stream past it
    let mut var_buffer = vec![0_u8; header.tlen as usize];
    reader.read_exact(&mut var_buffer).expect("failed to read variable header");

    // BusZHeader
    let mut buszheader_bytes = [0_u8; BUSZ_HEADER_SIZE];
    reader.read_exact(&mut buszheader_bytes).expect("failed to read busz header");
    let busz_header = BuszHeader::from_bytes(&buszheader_bytes);

    (params, busz_header)
}




#[cfg(test)]
mod test {
    use crate::busz::blocks::CompressedBlockHeader;

    /// The values handed to `CompressedBlockHeader::new` must be returned
    /// unchanged by `get_blocksize_and_nrecords`.
    #[test]
    fn test_header_encode_decode() {
        let nbytes = 20;
        let nrecords = 10;
        let h = CompressedBlockHeader::new(nbytes, nrecords);

        assert_eq!(h.get_blocksize_and_nrecords().0, nbytes);
        assert_eq!(h.get_blocksize_and_nrecords().1, nrecords);
    }

    /// Round-trip tests going through actual files.
    ///
    /// Tests that need data files from the original developer's machine are
    /// marked `#[ignore]` so a plain `cargo test` passes everywhere; run them
    /// with `cargo test -- --ignored` once the files are available.
    mod external {
        use std::path::Path;
        use itertools::Itertools;
        use tempfile::tempdir;
        use crate::busz::BuszWriter;
        use crate::io::{BusRecord, BusWriterPlain, BusReaderPlain, BusParams};
        use crate::busz::decode_bytes::BuszReader;

        /// Compress the plain busfile `input` into the busz file `output`
        fn compress_busfile(input: &Path, output: &Path, blocksize: usize) {
            let reader = BusReaderPlain::new(input);
            let mut writer = BuszWriter::new(output, reader.params.clone(), blocksize);
            writer.write_iterator(reader.into_iter());
        }

        /// Decompress the `input` busz file into a plain busfile, `output`
        fn decompress_busfile(input: &Path, output: &Path) {
            let reader = BuszReader::new(input);
            let mut writer = BusWriterPlain::new(
                output,
                reader.get_params().clone()
            );

            for r in reader {
                writer.write_record(&r);
            }
        }

        /// Reads the entire file at `busfile` into memory.
        fn bus_to_mem(busfile: &str) -> Vec<u8> {
            std::fs::read(busfile).unwrap()
        }

        /// Write a few records to a plain busfile, compress it, read the
        /// compressed file back and expect the original records.
        #[test]
        fn test_encode_decode_busz() {
            let v = vec![
                BusRecord {CB:10,UMI:11,EC:10,COUNT:13, FLAG: 20 },
                BusRecord {CB:11,UMI:11,EC:10,COUNT:13, FLAG: 20 },
                BusRecord {CB:22,UMI:10,EC:10,COUNT:1, FLAG: 0 },
                BusRecord {CB:22,UMI:11,EC:10,COUNT:1, FLAG: 0 },
            ];

            // write plain bus
            let dir = tempdir().unwrap();
            let input_plain = dir.path().join("buscompress.bus");
            let mut writer = BusWriterPlain::new(
                &input_plain,
                BusParams {cb_len: 16, umi_len: 12}
            );
            writer.write_iterator(v.iter().cloned());
            // drop the writer before the file is read again
            drop(writer);

            // compress it
            let compressed_output = dir.path().join("lalalala.busz");
            compress_busfile(
                &input_plain,
                &compressed_output,
                100
            );

            // decode it
            let reader = BuszReader::new(&compressed_output);
            let recs: Vec<_> = reader.collect();
            assert_eq!(v, recs);
        }

        /// Same round trip as `test_encode_decode_busz`, on a larger real-world file.
        #[test]
        #[ignore = "needs local test data under /home/michi/bus_testing"]
        fn test_encode_decode_busz_biggerfile() {
            let input_plain = Path::new("/home/michi/bus_testing/bus_output_shorter/output.corrected.sort.bus");

            let dir = tempdir().unwrap();
            let compressed_output = dir.path().join("output.corrected.sort.busz");

            println!("copmressing busfile");
            compress_busfile(
                input_plain,
                &compressed_output,
                10000
            );

            println!("decoding busfile");
            let reader = BuszReader::new(&compressed_output);
            let recs: Vec<_> = reader.collect();

            let x = BusReaderPlain::new(input_plain);
            assert_eq!(x.collect::<Vec<_>>(), recs);
        }

        /// Decompress a busz file and check that the contents match the
        /// true (uncompressed) version record by record.
        #[test]
        #[ignore = "needs local test data under /home/michi/bus_testing"]
        fn test_decompress() {
            let input_compressed = Path::new("/home/michi/bus_testing/bus_output/output.corrected.sort.busz");
            let input_plain = Path::new("/home/michi/bus_testing/bus_output/output.corrected.sort.bus");

            let dir = tempdir().unwrap();
            let output = dir.path().join("buscompress_lala.bus");

            let start = std::time::Instant::now();
            decompress_busfile(
                input_compressed,
                &output
            );
            let elapsed = start.elapsed().as_millis();
            println!("decoding: {elapsed} ms");

            let r = BusReaderPlain::new(&output);
            let r_original = BusReaderPlain::new(input_plain);

            // zip_eq panics if the two readers yield a different number of records
            for (r1, r2) in r.zip_eq(r_original) {
                assert_eq!(r1, r2)
            }
        }

        /// Iterate a busz file and its plain counterpart from in-memory buffers
        /// and check that both yield the same records.
        #[test]
        #[ignore = "needs local test data under /home/michi/bus_testing"]
        fn test_iterator() {
            let buszfile = "/home/michi/bus_testing/bus_output_shortest/output.corrected.sort.busz";
            let buffer_busz = bus_to_mem(buszfile);
            let reader_busz = BuszReader::from_read(buffer_busz.as_slice());

            let busfile = "/home/michi/bus_testing/bus_output_shortest/output.corrected.sort.bus";
            let buffer_bus = bus_to_mem(busfile);
            let r_original = BusReaderPlain::from_read(buffer_bus.as_slice());

            // zip_eq panics if the two readers yield a different number of records
            for (r1, r2) in reader_busz.zip_eq(r_original) {
                assert_eq!(r1, r2)
            }
        }
    }
}