bustools_core/busz/mod.rs
1//! Dealing with the [busz compression format](https://github.com/BUStools/BUSZ-format)
2//!
3//! # Examples
4//! ## Reading a compressed bus file
5//! ```rust, no_run
6//! use bustools_core::busz::BuszReader;
7//! # use std::path::Path;
8//! let reader = BuszReader::new(Path::new("/some/file.busz"));
9//! for record in reader {
10//! // ...
11//! }
12//! ```
13//! ## Writing to a compressed bus file
14//! ```rust, no_run
15//! # use std::path::Path;
16//! use bustools_core::record;
17//! use bustools_core::busz::BuszWriter;
18//! use bustools_core::io::{BusRecord, BusParams};
19//! let blocksize = 10000;
20//! let params = BusParams {cb_len: 16, umi_len: 12};
21//! let mut writer = BuszWriter::new(Path::new("/some/file.busz"), params, blocksize);
22//! let records = vec![
23//! record!(0, 1, 0, 12, 0 ),
24//! record!(0, 1, 1, 2, 0 ),
25//! record!(0, 2, 0, 12, 0 ),
26//! record!(1, 1, 1, 2, 0 ),
27//! record!(1, 2, 1, 2, 0 ),
28//! record!(1, 1, 1, 2, 0 ),
29//! ];
30//! writer.write_iterator(records.into_iter());
31//! ```
32//!
33//! # About Bitvec and Memory layout
//! This code relies heavily on the [`bitvec`] crate, which is used to encode/decode
//! the bits of the busz records, in particular Fibonacci encoding and NewPFD encoding.
36//!
37//! **A certain peculiarity though**:
//! To turn bytes (e.g. from a u64 or read from the file) into a [`bitvec::vec::BitVec`] we use `BitVec::from_bytes(byte_Array)`.
//! This takes the bytes literally in the order of the array.
//! Yet `bustools` writes busz in little endian format, i.e. the byte order is reversed.
//! In particular, each busz block contains entries for CB, UMI, ..., each PADDED with zeros afterwards (to a multiple of 64).
//! On disk, this is what it looks like:
43//! ```bash, no_run
//! 0000000...00000000[CBs in Fibonacci]
//! 0000000...00000000[UMIs in Fibonacci]
46//! ```
47//!
//! Even more, the Fibonacci encoding must be done with little endian byte order: if on disk it looks like
49//! ```bash,no_run
50//! aaaaaaaabbbbbbbbccccccccddddddddeeeeeeeeffffffffgggggggghhhhhhhh //bits
51//! ```
52//! the correct fibonacci stream to decode is
53//! ```bash, no_run
54//! ddddddddccccccccbbbbbbbbaaaaaaaahhhhhhhhgggggggg....
55//! ```
56
57use std::io::Read;
58use bitvec::{order::Msb0, prelude as bv};
59use serde::{Serialize, Deserialize};
60use crate::io::{BUS_HEADER_SIZE, BusHeader, BusParams};
61
62mod blocks;
63#[deprecated]
64mod decode;
65pub mod decode_bytes;
66mod encode;
67mod utils;
68mod runlength_codec;
69
70// exposing some core classes/functions to the public API
71pub use encode::BuszWriter;
72// pub use decode::BuszReader;
73pub use decode_bytes::BuszReader;
74
/// Size of a PFD block within busz (this many ECs get encoded together).
const PFD_BLOCKSIZE: usize = 512;

/// Borrowed bit-slice type used throughout the busz code: `u8` storage, `Msb0` bit ordering.
pub (crate) type BuszBitSlice = bv::BitSlice<u8,Msb0>;
/// Owned bit-vector counterpart of [`BuszBitSlice`] (same `u8` storage and `Msb0` bit ordering).
pub (crate) type BuszBitVector = bv::BitVec<u8, Msb0>;
80
81
82
83
/// Number of bytes read from the stream for a [`BuszHeader`]: one 4-byte slot per `u32` field.
const BUSZ_HEADER_SIZE: usize = 4+4+4;
/// Some busz-file specific headers, coming after the regular [`BusHeader`]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
struct BuszHeader {
    // NOTE(review): presumably the number of records per compressed block — confirm against the writer
    block_size: u32,
    // NOTE(review): presumably the PFD block size (cf. `PFD_BLOCKSIZE`) — confirm against the writer
    pfd_block_size: u32,
    /// Lossy-UMI setting; `BuszHeader::from_bytes` rejects any value other than 0.
    lossy_umi: u32,
}
92impl BuszHeader {
93 /// desearializes a `BusHeader` from Bytes; when reading busfiles
94 /// assumes Little-Endian! [see here](https://docs.rs/bincode/latest/bincode/config/index.html#options-struct-vs-bincode-functions)
95 pub fn from_bytes(bytes: &[u8]) -> BuszHeader {
96 let header_struct: BuszHeader =
97 // this interprets the bytes in Little Endian!, i.e bytes=[1,0,0,0,0,0,0,0] = 1_u64
98 bincode::deserialize(bytes).expect("FAILED to deserialze busz header");
99 // bincode::serde::decode_from_slice(bytes, bincode::config::legacy()).expect("FAILED to deserialze record").0; //.expect("FAILED to deserialze header");
100
101 assert_eq!(
102 header_struct.lossy_umi, 0,
103 "lossy_umi != 0 not supported"
104 );
105 header_struct
106 }
107 /// seialize the header to bytes
108 /// assumes Little-Endian! [see here](https://docs.rs/bincode/latest/bincode/config/index.html#options-struct-vs-bincode-functions)
109 pub fn to_bytes(&self) -> Vec<u8> {
110 bincode::serialize(self).expect("FAILED to serialze header")
111 // bincode::serde::encode_to_vec(self, bincode::config::legacy()).expect("FAILED to serialze header") //.expect("FAILED to deserialze header");
112
113 }
114}
115
116
117/// parase the header out of the stream/reader
118///
119/// note: this moves the reader forward!
120fn read_busz_header(reader: &mut impl Read) -> (BusParams, BuszHeader) {
121 let mut header_bytes = [0_u8; BUS_HEADER_SIZE];
122 reader.read_exact(&mut header_bytes).expect("failed to read header");
123 let header = BusHeader::from_bytes(&header_bytes);
124 let params = header.get_params();
125
126 assert_eq!(
127 &header.magic, b"BUS\x01",
128 "Header struct not matching; MAGIC is wrong"
129 );
130
131 // the variable header
132 let mut var_buffer = Vec::with_capacity(header.tlen as usize);
133 for _i in 0..header.tlen {
134 var_buffer.push(0_u8);
135 }
136 reader.read_exact(&mut var_buffer).expect("failed to read variable header");
137
138 // BusZHeader
139 let mut buszheader_bytes = [0_u8; BUSZ_HEADER_SIZE];
140 reader.read_exact(&mut buszheader_bytes).unwrap();
141 let busz_header = BuszHeader::from_bytes(&buszheader_bytes);
142
143 (params, busz_header)
144}
145
146
147
148
#[cfg(test)]
mod test {
    use crate::busz::blocks::CompressedBlockHeader;

    /// A block header must hand back the block size and record count it was built from.
    #[test]
    fn test_header_encode_decode() {
        let nbytes = 20;
        let nrecords = 10;
        let h = CompressedBlockHeader::new(nbytes, nrecords);

        assert_eq!(h.get_blocksize_and_nrecords().0, nbytes);
        assert_eq!(h.get_blocksize_and_nrecords().1, nrecords);
    }

    /// Round-trip tests that go through real files on disk.
    ///
    /// Tests depending on data files that only exist on the original developer's
    /// machine are marked `#[ignore]`; run them with `cargo test -- --ignored`.
    mod external {
        use std::fs;
        use std::path::Path;
        use itertools::Itertools;
        use tempfile::tempdir;
        use crate::busz::BuszWriter;
        use crate::io::{BusRecord, BusWriterPlain, BusReaderPlain, BusParams};
        use crate::busz::decode_bytes::BuszReader;

        /// Compress the plain busfile `input` into the busz file `output`,
        /// passing `blocksize` on to the [`BuszWriter`].
        fn compress_busfile(input: &Path, output: &Path, blocksize: usize) {
            let reader = BusReaderPlain::new(input);
            let mut writer = BuszWriter::new(output, reader.params.clone(), blocksize);
            writer.write_iterator(reader.into_iter());
        }

        /// Decompress the `input` busz file into a plain busfile, `output`
        fn decompress_busfile(input: &Path, output: &Path) {
            let reader = BuszReader::new(input);
            let mut writer = BusWriterPlain::new(
                output,
                reader.get_params().clone()
            );

            for r in reader {
                writer.write_record(&r);
            }
        }

        /// Read the entire file into memory.
        fn bus_to_mem(busfile: &str) -> Vec<u8> {
            fs::read(busfile).unwrap()
        }

        /// Self-contained round trip: plain bus -> busz -> records.
        #[test]
        fn test_encode_decode_busz() {
            let v = vec![
                BusRecord {CB:10,UMI:11,EC:10,COUNT:13, FLAG: 20 },
                BusRecord {CB:11,UMI:11,EC:10,COUNT:13, FLAG: 20 },
                BusRecord {CB:22,UMI:10,EC:10,COUNT:1, FLAG: 0 },
                BusRecord {CB:22,UMI:11,EC:10,COUNT:1, FLAG: 0 },
            ];

            // write plain bus
            let dir = tempdir().unwrap();
            let input_plain = dir.path().join("buscompress.bus");
            let mut writer = BusWriterPlain::new(
                &input_plain,
                BusParams {cb_len: 16, umi_len: 12}
            );
            writer.write_iterator(v.iter().cloned());
            // drop the writer before the file is read back
            drop(writer);

            // compress it
            let compressed_output = dir.path().join("lalalala.busz");
            compress_busfile(
                &input_plain,
                &compressed_output,
                100
            );

            // decode it
            let reader = BuszReader::new(&compressed_output);
            let recs: Vec<_> = reader.collect();
            assert_eq!(v, recs);
        }

        /// Compress a larger local busfile and check that decoding yields the original records.
        #[test]
        #[ignore = "requires local test data under /home/michi/bus_testing"]
        fn test_encode_decode_busz_biggerfile() {
            let input_plain = Path::new("/home/michi/bus_testing/bus_output_shorter/output.corrected.sort.bus");

            let dir = tempdir().unwrap();
            let compressed_output = dir.path().join("output.corrected.sort.busz");

            println!("copmressing busfile");
            compress_busfile(
                input_plain,
                &compressed_output,
                10000
            );
            println!("decoding busfile");
            // decode it
            let reader = BuszReader::new(&compressed_output);
            let recs: Vec<_> = reader.collect();

            let x = BusReaderPlain::new(input_plain);
            assert_eq!(x.collect::<Vec<_>>(), recs);
        }

        /// Decompress a local busz file and check that the contents match the
        /// true (uncompressed) version.
        #[test]
        #[ignore = "requires local test data under /home/michi/bus_testing"]
        fn test_decompress() {
            let input_compressed = Path::new("/home/michi/bus_testing/bus_output/output.corrected.sort.busz");
            let input_plain = Path::new("/home/michi/bus_testing/bus_output/output.corrected.sort.bus");

            let dir = tempdir().unwrap();
            let output = dir.path().join("buscompress_lala.bus");

            let start = std::time::Instant::now();
            decompress_busfile(
                input_compressed,
                &output);

            let elapsed = start.elapsed().as_millis();
            println!("decoding: {elapsed} ms");

            let r = BusReaderPlain::new(&output);
            let r_original = BusReaderPlain::new(input_plain);

            for (r1, r2) in r.zip_eq(r_original) {
                assert_eq!(r1, r2)
            }
        }

        /// Iterate a busz file and its plain counterpart from in-memory buffers
        /// and compare them record by record.
        #[test]
        #[ignore = "requires local test data under /home/michi/bus_testing"]
        fn test_iterator() {
            let buszfile = "/home/michi/bus_testing/bus_output_shortest/output.corrected.sort.busz";
            let buffer_busz = bus_to_mem(buszfile);
            let reader_busz = BuszReader::from_read(buffer_busz.as_slice());

            let busfile = "/home/michi/bus_testing/bus_output_shortest/output.corrected.sort.bus";
            let buffer_bus = bus_to_mem(busfile);
            let r_original = BusReaderPlain::from_read(buffer_bus.as_slice());

            for (r1, r2) in reader_busz.zip_eq(r_original) {
                assert_eq!(r1, r2)
            }
        }
    }
}