Skip to main content

bustools_core/busz/
mod.rs

1//! Dealing with the [busz compression format](https://github.com/BUStools/BUSZ-format)
2//! 
3//! # Examples
4//! ## Reading a compressed bus file
5//! ```rust, no_run
6//! use bustools_core::busz::BuszReader;
7//! # use std::path::Path;
8//! let reader = BuszReader::new(Path::new("/some/file.busz"));
9//! for record in reader {
10//!     // ...
11//! }
12//! ```
13//! ## Writing to a compressed bus file
14//! ```rust, no_run
15//! # use std::path::Path;
16//! use bustools_core::record;
17//! use bustools_core::busz::BuszWriter;
18//! use bustools_core::io::{BusRecord, BusParams};
19//! let blocksize = 10000;
20//! let params = BusParams {cb_len: 16, umi_len: 12};
21//! let mut writer = BuszWriter::new(Path::new("/some/file.busz"), params, blocksize);
22//! let records = vec![
23//!     record!(0, 1, 0, 12,  0 ),
24//!     record!(0, 1, 1, 2,  0 ),
25//!     record!(0, 2, 0, 12,  0 ),
26//!     record!(1, 1, 1, 2,  0 ),
27//!     record!(1, 2, 1, 2,  0 ),
28//!     record!(1, 1, 1, 2,  0 ),
29//! ];
30//! writer.write_iterator(records.into_iter());
31//! ```
32//! 
33//! # About Bitvec and Memory layout
//! This code relies heavily on BitVec. It uses [`bitvec`] to encode/decode
//! the bits of the busz records, in particular Fibonacci encoding and NewPFD encoding.
36//! 
37//! **A certain peculiarity though**:
//! To turn bytes (e.g. from a u64 or read from the file) into a [`bitvec::vec::BitVec`] we use `BitVec::from_bytes(byte_array)`.
//! This takes the bytes literally, in the order of the array.
//! Yet `bustools` writes busz in little-endian format, i.e. the byte order is reversed.
//! In particular, each busz block contains entries for CB, UMI, ..., each PADDED with zeros afterwards (to a multiple of 64).
//! On disk it looks like this:
43//! ```bash, no_run
//! 0000000...00000000[CBs in Fibonacci]
//! 0000000...00000000[UMIs in Fibonacci]
46//! ```
47//! 
//! Moreover, the Fibonacci encoding must be done with little-endian byte order: if on disk it looks like
49//! ```bash,no_run
50//! aaaaaaaabbbbbbbbccccccccddddddddeeeeeeeeffffffffgggggggghhhhhhhh  //bits
51//! ```
52//! the correct fibonacci stream to decode is
53//! ```bash, no_run
54//! ddddddddccccccccbbbbbbbbaaaaaaaahhhhhhhhgggggggg....
55//! ``` 
56
57use std::io::Read;
58use bitvec::{order::Msb0, prelude as bv};
59use serde::{Serialize, Deserialize};
60use crate::io::{BUS_HEADER_SIZE, BusHeader, BusParams};
61
62mod blocks;
63#[deprecated]
64mod decode;
65pub mod decode_bytes;
66mod encode;
67mod utils;
68mod runlength_codec;
69
70// exposing some core classes/functions to the public API
71pub use encode::BuszWriter;
72// pub use decode::BuszReader;
73pub use decode_bytes::BuszReader;
74
75const PFD_BLOCKSIZE: usize = 512; // size of a PFD block within busz (this many ECs get encoded together)
76
77pub (crate) type BuszBitSlice = bv::BitSlice<u8,Msb0>;
/// owned bit-vector type that goes with the borrowed [`BuszBitSlice`]
79pub (crate) type BuszBitVector = bv::BitVec<u8, Msb0>;
80
81
82
83
84const BUSZ_HEADER_SIZE: usize = 4+4+4;
85/// Some busz-file specific headers, coming after the regular [`BusHeader`]
86#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
87struct BuszHeader {
88    block_size: u32,
89    pfd_block_size: u32,
90    lossy_umi: u32,
91}
92impl BuszHeader {
93    /// desearializes a `BusHeader` from Bytes; when reading busfiles
94    /// assumes Little-Endian! [see here](https://docs.rs/bincode/latest/bincode/config/index.html#options-struct-vs-bincode-functions)
95    pub fn from_bytes(bytes: &[u8]) -> BuszHeader {
96        let header_struct: BuszHeader =
97            // this interprets the bytes in Little Endian!, i.e bytes=[1,0,0,0,0,0,0,0] = 1_u64
98            bincode::deserialize(bytes).expect("FAILED to deserialze busz header");
99            // bincode::serde::decode_from_slice(bytes, bincode::config::legacy()).expect("FAILED to deserialze record").0; //.expect("FAILED to deserialze header");
100
101        assert_eq!(
102            header_struct.lossy_umi, 0,
103            "lossy_umi != 0 not supported"
104        );
105        header_struct
106    }
107    /// seialize the header to bytes
108    /// assumes Little-Endian! [see here](https://docs.rs/bincode/latest/bincode/config/index.html#options-struct-vs-bincode-functions)
109    pub fn to_bytes(&self) -> Vec<u8> {
110        bincode::serialize(self).expect("FAILED to serialze header")
111        // bincode::serde::encode_to_vec(self, bincode::config::legacy()).expect("FAILED to serialze header") //.expect("FAILED to deserialze header");
112
113    }
114}
115
116
117/// parase the header out of the stream/reader
118/// 
119/// note: this moves the reader forward!
120fn read_busz_header(reader: &mut impl Read) -> (BusParams, BuszHeader) {
121    let mut header_bytes = [0_u8; BUS_HEADER_SIZE];
122    reader.read_exact(&mut header_bytes).expect("failed to read header");
123    let header = BusHeader::from_bytes(&header_bytes);
124    let params = header.get_params();
125    
126    assert_eq!(
127        &header.magic, b"BUS\x01",
128        "Header struct not matching; MAGIC is wrong"
129    );
130
131    // the variable header
132    let mut var_buffer = Vec::with_capacity(header.tlen as usize);
133    for _i in 0..header.tlen {
134        var_buffer.push(0_u8);
135    }
136    reader.read_exact(&mut var_buffer).expect("failed to read variable header");
137    
138    // BusZHeader
139    let mut buszheader_bytes = [0_u8; BUSZ_HEADER_SIZE];
140    reader.read_exact(&mut buszheader_bytes).unwrap();
141    let busz_header = BuszHeader::from_bytes(&buszheader_bytes);
142  
143    (params, busz_header)
144}
145
146
147
148
149#[cfg(test)]
150mod test {
151    use crate::busz::blocks::CompressedBlockHeader;
152    
153    #[test]
154    fn test_header_encode_decode() {
155        let nbytes = 20;
156        let nrecords = 10;
157        let h = CompressedBlockHeader::new(nbytes, nrecords);
158
159        assert_eq!(h.get_blocksize_and_nrecords().0, nbytes);
160        assert_eq!(h.get_blocksize_and_nrecords().1, nrecords);
161    }
162
163    mod external {
164        use std::fs::File;
165        use std::io::Read;
166        use std::path::Path;
167        use itertools::Itertools;
168        use tempfile::tempdir;
169        use crate::busz::BuszWriter;
170        use crate::io::{BusRecord, BusWriterPlain, BusReaderPlain, BusParams};
171        // use crate::busz::decode::BuszReader;
172        use crate::busz::decode_bytes::BuszReader;
173
174        fn compress_busfile(input: &Path, output: &Path, blocksize: usize) {
175            let reader = BusReaderPlain::new(input);
176            let mut writer = BuszWriter::new(output, reader.params.clone(), blocksize);
177            writer.write_iterator(reader.into_iter());
178        }
179
180        /// Decompress the `input` busz file into a plain busfile, `output`
181        fn decompress_busfile(input: &Path, output: &Path) {
182            let reader = BuszReader::new(input);
183            let mut writer = BusWriterPlain::new(
184                output,
185                reader.get_params().clone()
186            );
187
188            for r in reader {
189                writer.write_record(&r);
190            }
191        }
192        #[test]
193        fn test_encode_decode_busz(){
194            let v = vec![ 
195                BusRecord {CB:10,UMI:11,EC:10,COUNT:13, FLAG: 20 },   // 10
196                BusRecord {CB:11,UMI:11,EC:10,COUNT:13, FLAG: 20 },   // 0
197                BusRecord {CB:22,UMI:10,EC:10,COUNT:1, FLAG: 0 },   // 0
198                BusRecord {CB:22,UMI:11,EC:10,COUNT:1, FLAG: 0 },    // 1
199            ];
200
201            // write plain bus
202            let dir = tempdir().unwrap();
203            let input_plain= dir.path().join("buscompress.bus");
204            let mut  writer = BusWriterPlain::new(
205                &input_plain, 
206                BusParams {cb_len: 16, umi_len: 12}
207            );
208            writer.write_iterator(v.iter().cloned());
209            drop(writer);
210
211            // copmress it
212            let copmressed_output= dir.path().join("lalalala.busz");
213            compress_busfile(
214                &input_plain,
215                &copmressed_output,
216                100
217            );
218
219            // // decode it
220            let reader = BuszReader::new(&copmressed_output);
221            let recs: Vec<_> = reader.collect();
222            assert_eq!(v, recs);
223
224        }
225
226        #[test]
227        fn test_encode_decode_busz_biggerfile(){
228
229            let input_plain = Path::new("/home/michi/bus_testing/bus_output_shorter/output.corrected.sort.bus");
230
231            let dir = tempdir().unwrap();
232            let copmressed_output = dir.path().join("output.corrected.sort.busz");
233
234            println!("copmressing busfile");
235            compress_busfile(
236                input_plain,
237                &copmressed_output,
238                10000
239            );
240            println!("decoding busfile");
241            // // decode it
242            let reader = BuszReader::new(&copmressed_output);
243            let recs: Vec<_> = reader.collect();
244
245            let x = BusReaderPlain::new(input_plain);
246            assert_eq!(x.collect::<Vec<_>>(), recs);
247
248        }
249
250        // #[test]
251        // fn test_compress1() {
252        //     // let input_compressed = "/home/michi/bus_testing/bus_output_shorter/output.corrected.sort.busz"; 
253        //     let input_plain = "/home/michi/bus_testing/bus_output_shorter/output.corrected.sort.bus";
254        //     let dir = tempdir().unwrap();
255        //     let file_path = dir.path().join("buscompress_testing.busz");
256        //     let copmressed_output = file_path.to_str().unwrap();
257
258        //     compress_busfile(
259        //         input_plain,
260        //         copmressed_output,
261        //         10000
262        //     );
263        // }
264
265        // #[test]
266        // #[allow(dead_code)]
267        // fn test_compress_full() {
268        //     // let input_compressed = "/home/michi/bus_testing/bus_output/output.corrected.sort.busz"; 
269        //     let input_plain = "/home/michi/bus_testing/bus_output/output.corrected.sort.bus";
270        //     let copmressed_output = "/tmp/buscompress_testing_full.busz";
271        //     compress_busfile(
272        //         input_plain,
273        //         copmressed_output,
274        //         10000
275        //     );
276        // }
277
278        #[test]
279        fn test_decompress(){
280            // decompress a busfile, check that the contents match the true (uncompressed version)
281            let input_compressed = Path::new("/home/michi/bus_testing/bus_output/output.corrected.sort.busz"); 
282            let input_plain = Path::new("/home/michi/bus_testing/bus_output/output.corrected.sort.bus");
283
284            let dir = tempdir().unwrap();
285            let output= dir.path().join("buscompress_lala.bus");
286
287            let start = std::time::Instant::now();
288            decompress_busfile(
289                input_compressed,
290                &output);
291
292            let elapsed = start.elapsed().as_millis();
293            println!("decoding: {elapsed} ms");
294
295
296            let r = BusReaderPlain::new(&output);
297            let r_original = BusReaderPlain::new(input_plain);
298
299            for (r1, r2) in r.zip_eq(r_original) {
300                assert_eq!(r1, r2)
301            }
302
303        }
304
305        #[test]
306        fn test_iterator(){
307            
308            let buszfile = "/home/michi/bus_testing/bus_output_shortest/output.corrected.sort.busz";
309            let buffer_busz = bus_to_mem(buszfile);
310            let reader_busz = BuszReader::from_read(buffer_busz.as_slice());
311
312            let busfile  = "/home/michi/bus_testing/bus_output_shortest/output.corrected.sort.bus";
313            let buffer_bus = bus_to_mem(busfile);
314            let r_original = BusReaderPlain::from_read(buffer_bus.as_slice());
315
316            for (r1, r2) in reader_busz.zip_eq(r_original) {
317                assert_eq!(r1, r2)
318            }
319
320        }   
321        fn bus_to_mem(busfile: &str) -> Vec<u8>{
322            let mut buffer = Vec::new();
323            let mut f= File::open(busfile).unwrap();
324            f.read_to_end(&mut buffer).unwrap();
325            buffer
326        }
327    }
328}