Skip to main content

bustools_core/
io.rs

1//! The io module of bustools deals with reading and writing busfiles.
2//!
3//! The most important components are:
4//! - [`BusRecord`]: a single record of a Busfile, containing CB/UMI/EC/COUNT
5//! - [`BusFolder`]: representing the directory structure of kallisto quantification
6//! - [`BusReader`]: to iterate over Records of a busfile
7//! - [`BusWriter`]: writing records to busfile
8//!
9//! # Example
10//! ```rust, no_run
11//! # use bustools_core::io::{BusReader, BusParams, BusWriter, BusRecord};
12//! # use std::path::Path;
13//! let bus = BusReader::new(Path::new("/tmp/some.bus"));
14//! let params = BusParams {cb_len: 16, umi_len: 12};
15//! let mut writer = BusWriter::new(Path::new("/tmp/out.bus"), params);
16//! let records: Vec<BusRecord> = bus.collect();
17//! writer.write_iterator(records.into_iter());
18//!
19#![allow(non_snake_case)]
20
21use crate::busz::{BuszReader, BuszWriter};
22use crate::consistent_genes::{make_mapper, Ec2GeneMapper, EC};
23use crate::consistent_transcripts::{
24    make_mapper_transcript, Ec2TranscriptMapper, TranscriptId, Transcriptname,
25};
26use crate::iterators::{CbUmiGroupIterator, CellGroupIterator};
27use bincode;
28use serde;
29use std::collections::HashMap;
30use std::fs::File;
31use std::io::{BufRead, BufReader, BufWriter, Error, Read, Write};
32use std::path::{Path, PathBuf};
33// use std::u32;
34use rkyv::{self, AlignedVec, Deserialize};
35use tempfile::TempDir;
36
37pub(crate) const BUS_ENTRY_SIZE: usize = 32;
38pub const BUS_HEADER_SIZE: usize = 20;
39
40/// Basic unit of a busfile as created by kallisto.
41/// Represents a single scRNAseq molecule, CB/UMI/EC and COUNT
42///
43/// from python implemenation/specification
44/// BUS_ENTRY_SIZE = 32  # each entry is 32 bytes!!
45///  unpack_str = 'QQiIIxxxx'
46/// Q: 8byte unsigned long,long int
47/// i: 4byte int
48/// I: unsigned int, 4byte
49#[derive(
50    serde::Serialize,
51    serde::Deserialize,
52    Debug,
53    PartialEq,
54    Eq,
55    Clone,
56    rkyv::Archive,
57    rkyv::Deserialize,
58    rkyv::Serialize,
59)] // TODO take away the copy and clone
60#[archive(check_bytes)]
61pub struct BusRecord {
62    pub CB: u64,    // 8byte
63    pub UMI: u64,   // 8byte
64    pub EC: u32,    // 4v byte
65    pub COUNT: u32, // 4v byte
66    pub FLAG: u32,  // 4v byte    including this, we have 28 bytes, missing 4 to fill up
67                    // PAD: u32     // just padding to fill 32bytes
68}
69impl BusRecord {
70    pub fn to_bytes_rkyv(&self) -> AlignedVec {
71        rkyv::to_bytes::<_, 32>(self).expect("failed to serialize vec")
72    }
73
74    /// converts a busrecord to byte representation, e.g. to write the bytes to a busfile
75    pub fn to_bytes_bincode(&self) -> Vec<u8> {
76        let mut binrecord = bincode::serialize(self).expect("FAILED to serialze record");
77
78        // the struct is only 28bytes, so we need 4 padding bytes
79        for _i in 0..4 {
80            //TODO feel like theres a better way to do this
81            binrecord.push(0);
82        }
83        binrecord
84    }
85
86    pub fn to_bytes(&self) -> AlignedVec {
87        // self.to_bytes_bincode()
88        self.to_bytes_rkyv()
89    }
90
91    /// coverts bytes into burecord, e.g. when reading a busfile
92    pub fn from_bytes(bytes: &[u8]) -> Self {
93        // bincode::serde::decode_from_slice(bytes, bincode::config::legacy()).expect("FAILED to deserialze record").0 //.expect("FAILED to deserialze header");
94
95        // let archived = rkyv::check_archived_root::<BusRecord>(&bytes[..]).unwrap();
96        let archived = unsafe { rkyv::archived_root::<BusRecord>(bytes) };
97        let s: BusRecord = archived.deserialize(&mut rkyv::Infallible).unwrap();
98        s
99        // bincode::deserialize(bytes).expect("deserial error")
100    }
101}
102
103pub fn record_tobytes(r: &BusRecord) -> Vec<u8> {
104    let mut bytes: Vec<u8> = Vec::with_capacity(BUS_ENTRY_SIZE);
105    let cb_bytes = r.CB.to_ne_bytes();
106    bytes.extend_from_slice(&cb_bytes);
107
108    let umi_bytes = r.UMI.to_ne_bytes();
109    bytes.extend_from_slice(&umi_bytes);
110
111    let ec_bytes = r.EC.to_ne_bytes();
112    bytes.extend_from_slice(&ec_bytes);
113
114    let count_bytes = r.COUNT.to_ne_bytes();
115    bytes.extend_from_slice(&count_bytes);
116
117    let flag_bytes = r.FLAG.to_ne_bytes();
118    bytes.extend_from_slice(&flag_bytes);
119
120    let pad: u32 = 0;
121    let pad_bytes = pad.to_ne_bytes();
122    bytes.extend_from_slice(&pad_bytes);
123    bytes
124}
125
126pub fn record_frombytes(bytes: &mut impl Read) -> Result<BusRecord, Error> {
127    // assert_eq!(bytes.len(), 32);
128    let mut cb_bytes = [0_u8; 8];
129    bytes.read_exact(&mut cb_bytes)?;
130
131    let mut umi_bytes = [0_u8; 8];
132    bytes.read_exact(&mut umi_bytes)?;
133
134    let mut ec_bytes = [0_u8; 4];
135    bytes.read_exact(&mut ec_bytes)?;
136
137    let mut count_bytes = [0_u8; 4];
138    bytes.read_exact(&mut count_bytes)?;
139
140    let mut flag_bytes = [0_u8; 4];
141    bytes.read_exact(&mut flag_bytes)?;
142
143    let mut pad_bytes = [0_u8; 4];
144    bytes.read_exact(&mut pad_bytes)?;
145
146    assert_eq!(pad_bytes, [0_u8; 4]);
147    let record = BusRecord {
148        CB: u64::from_ne_bytes(cb_bytes),
149        UMI: u64::from_ne_bytes(umi_bytes),
150        EC: u32::from_ne_bytes(ec_bytes),
151        COUNT: u32::from_ne_bytes(count_bytes),
152        FLAG: u32::from_ne_bytes(flag_bytes),
153    };
154    Ok(record)
155}
156
157#[test]
158fn ser() {
159    use crate::record;
160    use std::io::Cursor;
161    let rec = record!(121521, 124163, 43, 123, 0);
162    let bytes = record_tobytes(&rec);
163
164    let r = BusRecord::from_bytes(&bytes);
165    assert_eq!(r, rec);
166
167    let mut bytestream = Cursor::new(bytes);
168    let r2 = record_frombytes(&mut bytestream).unwrap();
169    assert_eq!(r2, rec);
170}
171
172#[test]
173// make sure rkyv writes the same bytes as would be with bincode
174// esp the padding to get to 32 bytes!!
175fn test_rkyv_write() {
176    use crate::record;
177    let rec = record!(121521, 124163, 43, u32::MAX - 1, 12);
178
179    let bytes_true = rec.to_bytes_bincode();
180
181    let byts_rkyv = rec.to_bytes_rkyv();
182    println!("{}", byts_rkyv.len());
183    println!("{:?}", byts_rkyv);
184    assert_eq!(byts_rkyv.to_vec(), bytes_true);
185}
186
187#[derive(Debug, PartialEq, Eq, Clone)]
188pub struct BusParams {
189    pub cb_len: u32,
190    pub umi_len: u32,
191}
192
193/// Header of a busfile, as specified by kallisto
194/// the only things that are variable are:
195/// - cb_len: The number of bases in the cellbarcode (usually 16BP)
196/// - umi_len: The number of bases in the cellbarcode (usually 12BP)
197/// # Example
198/// ```
199/// // use bustools::io::BusHeader;
200/// // let header = BusHeader::new(16, 12, 1);
201/// // can also be obtained from an existing busfile
202/// // let header = BusHeader::from_file("somefile.bus");
203/// ```
204#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone)]
205pub struct BusHeader {
206    //4sIIII: 20bytes
207    pub(crate) magic: [u8; 4],
208    pub(crate) version: u32,
209    pub(crate) cb_len: u32,
210    pub(crate) umi_len: u32,
211    pub(crate) tlen: u32,
212}
213
214impl BusHeader {
215    /// construct a busheader from CB/UMI length
216    pub fn new(cb_len: u32, umi_len: u32, tlen: u32, compressed: bool) -> BusHeader {
217        let magic: [u8; 4] = if compressed { *b"BUS\x01" } else { *b"BUS\x00" };
218        BusHeader { cb_len, umi_len, tlen, magic, version: 1 }
219    }
220
221    /// creates the header struct, extracting it from an existing bus file
222    pub fn from_file(fname: &Path) -> BusHeader {
223        // getting 20 bytes out of the file, which is the header
224        let file =
225            std::fs::File::open(fname).unwrap_or_else(|_| panic!("file not found: {fname:?}"));
226        let mut header = Vec::with_capacity(BUS_HEADER_SIZE);
227        let _n = file
228            .take(BUS_HEADER_SIZE as u64)
229            .read_to_end(&mut header)
230            .unwrap();
231        BusHeader::from_bytes(&header)
232    }
233
234    /// desearializes a BusHeader from Bytes; when reading busfiles
235    pub fn from_bytes(bytes: &[u8]) -> BusHeader {
236        let header_struct: BusHeader =
237            bincode::deserialize(bytes).expect("FAILED to deserialze header");
238        header_struct
239    }
240
241    /// seialize the header to bytes
242    pub fn to_bytes(&self) -> Vec<u8> {
243        bincode::serialize(self).expect("FAILED to serialze header")
244    }
245
246    /// return the length of the variable part of the busheader
247    pub fn get_tlen(&self) -> u32 {
248        self.tlen
249    }
250
251    /// extract parameters, i.e. CB/UMI length
252    pub fn get_params(&self) -> BusParams {
253        BusParams { cb_len: self.cb_len, umi_len: self.umi_len }
254    }
255
256    // /// return the length of the Cell Barcode in the busfile
257    // pub fn get_cb_len(&self) -> u32 {
258    //     self.cb_len
259    // }
260}
261
262/// A marker trait for iterators over CB/UMI/gene_EC iterators
263/// to create an iterator compatible with this framework,
264/// just implement the usual iterator trait (giving the next() function)
265/// and also tag the iterator with CUGIterator: `impl CUGIterator for Dummy {}`
266/// ## Example
267/// ```
268/// struct Dummy{
269///     f: u32
270/// }
271/// use bustools_core::io::{BusRecord, CUGIterator};
272/// impl Iterator for Dummy {
273///     type Item=BusRecord;
274///
275///     fn next(&mut self) -> Option<Self::Item> {
276///         Some(BusRecord{CB: 0,UMI: 0, EC: 0, COUNT: self.f, FLAG:0})
277///     }
278/// }
279/// impl CUGIterator for Dummy {}
280/// let mut x =  Dummy{f:1};
281/// let s = x.next();
282/// println!("{s:?}")
283/// ```
284///
285/// pretty much a type alias for Iterator<Item=BusRecord>
286pub trait CUGIterator: Iterator<Item = BusRecord> {}
287pub const DEFAULT_BUF_SIZE: usize = 800 * 1024; // 800  KB
288
289/// The main struct to read (un)compressed bus files.
290/// Should always be used in favour of the concrete [`BusReaderPlain`] (plain .bus files) and [`BuszReader`] (compressed .busz files),
291/// as this allows code to be transparent/agnositc of wether the underlying file is compressed or not.
292///
293/// # Example
294/// Uncompressed busfile
295/// ```rust, no_run
296/// # use bustools_core::io::BusReader;
297/// # use std::path::Path;
298/// let breader = BusReader::new(Path::new("somefile.bus"));
299/// for record in breader{
300///     let cb= record.CB;
301/// };
302/// ```
303/// Compressed busfile (works just the same way, note the different file extension)
304/// ```rust, no_run
305/// # use bustools_core::io::BusReader;
306/// # use std::path::Path;
307/// let breader = BusReader::new(Path::new("somefile.busz"));
308/// for record in breader{
309///     let cb= record.CB;
310/// };
311/// ```
312pub enum BusReader<'a> {
313    Plain(BusReaderPlain<'a>),
314    Compressed(BuszReader<'a>),
315}
316
317impl<'a> BusReader<'a> {
318    /// Creates a new reader of a bus/busz file. Depending on the file extension,
319    /// it infers wether the file is plain-binary (.bus) or compressed (.busz).
320    ///
321    /// # TODO: make the plain/compressed choice explicit?
322    pub fn new(fname: &Path) -> Self {
323        // NOTE: cant use ends_with(), which checks the entire filename
324        if fname.extension().unwrap() == "busz" {
325            Self::new_compressed(fname)
326        } else {
327            Self::new_plain(fname)
328        }
329    }
330
331    /// construct the BusReader on any input stream which yields bytes (implements read)
332    /// if this is based on a File, highly recommended to wrap it in a BufferedReader for performance
333    /// otherwise every iteration will cause a read/system call
334    /// IF YOU WANT SEPCIFIC BUFFER SIZE, FEED a BufferedReader into :from_read_*
335    /// buffer size (800KB = 800 * 1024) is a good choice.
336    pub fn from_read_plain(reader: impl Read + 'a) -> Self {
337        BusReader::Plain(BusReaderPlain::from_read(reader))
338    }
339
340    /// IF YOU WANT SEPCIFIC BUFFER SIZE, FEED a BufferedReader into :from_read_*
341    /// buffer size (800KB = 800 * 1024) is a good choice.
342    pub fn from_read_compressed(reader: impl Read + 'a) -> Self {
343        BusReader::Compressed(BuszReader::from_read(reader))
344    }
345
346    /// specifically return a Reader for plain bus files
347    pub fn new_plain(fname: &Path) -> Self {
348        BusReader::Plain(BusReaderPlain::new(fname))
349    }
350
351    /// specifically return a Reader for compressed busfiles
352    pub fn new_compressed(fname: &Path) -> Self {
353        BusReader::Compressed(BuszReader::new(fname))
354    }
355
356    // /// get the [`BusParams`] of the file.
357    pub fn get_params(&self) -> &BusParams {
358        match self {
359            BusReader::Plain(reader) => reader.get_params(),
360            BusReader::Compressed(reader) => reader.get_params(),
361        }
362    }
363}
364
365impl<'a> Iterator for BusReader<'a> {
366    type Item = BusRecord;
367    fn next(&mut self) -> Option<Self::Item> {
368        match self {
369            BusReader::Plain(reader) => reader.next(),
370            BusReader::Compressed(reader) => reader.next(),
371        }
372    }
373}
374
375impl<'a> CUGIterator for BusReader<'a> {}
376
377//=================================================================================================
378
379/// Main reader for plain Busfiles
380/// Allows to iterate over the BusRecords in the file
381///
382/// # Example
383/// ```rust, no_run
384/// # use bustools_core::io::BusReaderPlain;
385/// # use std::path::Path;
386/// let breader = BusReaderPlain::new(Path::new("somefile.bus"));
387/// for record in breader{
388///     let cb= record.CB;
389/// };
390/// ```
391/// # From `Read`
392/// The [`BusReaderPlain`] can operate on anything implementing the `Read`-trait.
393/// For example, one can create a [`BusReaderPlain`] from a [`File`]:
394/// ```rust, no_run
395/// # use bustools_core::io::BusReaderPlain;
396/// # use std::io::BufReader;
397/// # use std::fs::File;
398/// // note: Buffering is highly recommended for performance reasons
399/// let bufReader = BufReader::with_capacity(10000, File::open("somefile.bus").unwrap());
400/// let breader = BusReaderPlain::from_read(bufReader);
401/// ```
402///
403/// Similarly one can also construct a [`BusReaderPlain`] for in-memory data:
404/// ```rust
405/// # use bustools_core::io::{BusReaderPlain, BusRecord, BusHeader};
406/// // create an in memory representation of a busfile (in bytes)
407/// let header = BusHeader::new(16, 12, 1, false);
408/// let r1 = BusRecord{CB: 1, UMI:1, EC:1, COUNT: 10, FLAG: 0};
409/// let mut v = Vec::new();
410/// v.extend_from_slice(&header.to_bytes());
411/// v.extend_from_slice(&r1.to_bytes());
412///
413/// // v contains a busfile as a Vec<u8> which we can read with BusReader
414/// let breader = BusReaderPlain::from_read(&v[..]);
415/// let records:Vec<_> = breader.collect();
416/// ```
417pub struct BusReaderPlain<'a> {
418    /// containing info about CB-length, UMI length for decoding
419    pub params: BusParams,
420    reader: Box<dyn Read + 'a>, //ugly way to store any Read-like object in here, BufferedReader, File, Cursor, or just a vec<u8>!
421}
422impl<'a> BusReaderPlain<'a> {
423    /// Creates a BusReader for a file on disk.
424    /// Turns the file into a bufferedReader.
425    pub fn new(fname: &Path) -> Self {
426        let file_handle = File::open(fname).expect("Busfile should exist on disk!");
427        let buf = BufReader::with_capacity(DEFAULT_BUF_SIZE, file_handle);
428        Self::from_read(buf)
429    }
430
431    /// construct the BusReader on any input stream which yields bytes (implements read)
432    /// if this is based on a File, highly recommended to wrap it in a BufferedReader for performance
433    /// otherwise every iteration will cause a read/system call
434    pub fn from_read(mut reader: impl Read + 'a) -> Self {
435        // parse header
436        let mut header_bytes = [0_u8; BUS_HEADER_SIZE];
437        reader
438            .read_exact(&mut header_bytes)
439            .expect("failed to read header");
440        let header = BusHeader::from_bytes(&header_bytes);
441        let params = BusParams { cb_len: header.cb_len, umi_len: header.umi_len };
442
443        assert_eq!(
444            &header.magic, b"BUS\x00",
445            "Header struct not matching; MAGIC is wrong. We're expecting a plain file here!"
446        );
447
448        // the variable header
449        let mut buffer = Vec::with_capacity(header.tlen as usize);
450        for _i in 0..header.tlen {
451            buffer.push(0_u8);
452        }
453        reader
454            .read_exact(&mut buffer)
455            .expect("failed to read variable header");
456
457        // reader is now positioned at the first record
458        BusReaderPlain { params, reader: Box::new(reader) }
459    }
460
461    pub fn get_params(&self) -> &BusParams {
462        &self.params
463    }
464}
465
466impl<'a> Iterator for BusReaderPlain<'a> {
467    type Item = BusRecord;
468
469    fn next(&mut self) -> Option<Self::Item> {
470        /*
471        The previous version using only `read()` had a bit of a bug:
472        from unknown reasons (but consitent with the `read() documentation)
473        `read` will not fill the entire buffer, which would case the previous implementation
474        to throw an error.
475        We can instead use `read_exact`. Drawback:
476        This is a bit of a dangerous version: We dont check what happens at the EOF.
477        If the file contains a truncated last record, we would just skip over that
478
479        However: It's much faster than the above take/read_to_end (~5x)
480        */
481        let mut buffer = [0; BUS_ENTRY_SIZE]; // TODO this gets allocated at each next(). We could move it out into the struct: Actually doesnt make a diference, bottleneck is the reading
482        match self.reader.read_exact(&mut buffer) {
483            Ok(()) => Some(BusRecord::from_bytes(&buffer)),
484            Err(e) => match e.kind() {
485                // this can happen due to a real EOF, or we're almost at the EOF
486                // and try to read more bytes than whats left (i.e. a truncated file)
487                std::io::ErrorKind::UnexpectedEof => None,
488                _ => panic!("{e}"),
489            },
490        }
491    }
492}
493
494// tag our iterator to be compatible with our framework
495impl<'a> CUGIterator for BusReaderPlain<'a> {}
496
497/// actually, also make general iterators of BusRecord amenable
498// impl CUGIterator for dyn Iterator<Item = BusRecord> {}
499impl CUGIterator for std::vec::IntoIter<BusRecord> {}
500
501/// Main writer for bus records, either uncompressed or compressed.
502pub enum BusWriter {
503    Plain(BusWriterPlain),
504    Compressed(BuszWriter),
505}
506
507impl BusWriter {
508    pub fn new(filename: &Path, params: BusParams) -> Self {
509        if filename.extension().unwrap() == "busz" {
510            Self::Compressed(
511                // BuszWriter::new_with_capacity(file_handle, header, DEFAULT_BUF_SIZE)
512                BuszWriter::new(filename, params, 100_000), // TODO: buz_block
513            )
514        } else {
515            Self::Plain(BusWriterPlain::new(filename, params))
516        }
517    }
518
519    pub fn write_iterator(&mut self, iter: impl Iterator<Item = BusRecord>) {
520        match self {
521            BusWriter::Plain(writer) => writer.write_iterator(iter),
522            BusWriter::Compressed(writer) => writer.write_iterator(iter),
523        }
524    }
525}
526
527// ========================================
528/// Writing BusRecords into a File, using Buffers
529/// needs the Header of the busfile to be specified (length of CB and UMI)
530/// # Example
531/// ```
532/// # use bustools_core::io::{BusWriterPlain, BusParams, BusRecord};
533/// # use std::path::Path;
534/// let params = BusParams { cb_len: 16, umi_len: 12};
535/// let mut w = BusWriterPlain::new(Path::new("/tmp/target.bus"), params);
536/// let record = BusRecord{CB: 1, UMI: 2, EC: 0, COUNT: 10, FLAG: 0};
537/// w.write_record(&record);
538/// ```
539pub struct BusWriterPlain {
540    writer: BufWriter<File>,
541    // header: BusHeader,
542}
543
544impl BusWriterPlain {
545    /// create a BusWriter from an open Filehandle
546    pub fn from_filehandle(file_handle: File, params: BusParams) -> BusWriterPlain {
547        BusWriterPlain::new_with_capacity(file_handle, params, DEFAULT_BUF_SIZE)
548    }
549
550    /// create a Buswriter that streams records into a file
551    pub fn new(filename: &Path, params: BusParams) -> BusWriterPlain {
552        let file_handle: File = File::create(filename).expect("FAILED to open");
553        BusWriterPlain::from_filehandle(file_handle, params)
554    }
555
556    /// Writing a single BusRecord
557    pub fn write_record(&mut self, record: &BusRecord) {
558        let binrecord = record.to_bytes();
559        self.writer
560            .write_all(&binrecord)
561            .expect("FAILED to write record");
562    }
563
564    /// Writing multiple BusRecords at once
565    // pub fn write_records(&mut self, records: &Vec<BusRecord>) {
566    //     // TODO: pass forward to write_iterator!
567    //     //
568    //     // writes several recordsd and flushes
569    //     for r in records {
570    //         self.write_record(r)
571    //     }
572    //     self.writer.flush().unwrap();
573    // }
574
575    /// creates a buswriter with specified buffer capacity (after how many records an actual write happens)
576    /// dont use , 800KB is the default buffer size and optimal for performance
577    fn new_with_capacity(file_handle: File, params: BusParams, bufsize: usize) -> Self {
578        let mut writer = BufWriter::with_capacity(bufsize, file_handle);
579
580        let custom_header_str = "BUS file produced by kallisto".as_bytes();
581        let header = BusHeader::new(
582            params.cb_len,
583            params.umi_len,
584            custom_header_str.len() as u32,
585            false,
586        );
587
588        // write the header into the file
589        let binheader = header.to_bytes();
590        writer
591            .write_all(&binheader)
592            .expect("FAILED to write header");
593
594        let varheader = custom_header_str;
595
596        writer
597            .write_all(varheader)
598            .expect("FAILED to write var header");
599
600        BusWriterPlain { writer /*, header*/ }
601    }
602
603    pub fn write_iterator(&mut self, iter: impl Iterator<Item = BusRecord>) {
604        for r in iter {
605            self.write_record(&r)
606        }
607        self.writer.flush().unwrap();
608    }
609
610    pub fn write_iterator_ref<'a>(&mut self, iter: impl Iterator<Item = &'a BusRecord>) {
611        for r in iter {
612            self.write_record(r)
613        }
614        self.writer.flush().unwrap();
615    }
616}
617//=================================================================================
618
619/// Represents the standard file structure of kallisto quantification:
620/// - a busfile
621/// - and ec.matrix file, containing the mapping between EC and transcript_id
622/// - transcripts.txt, containing the ENST transcript id
623///
624/// makes life easier when dealing with kallisto quantifications and the mapping of EC to Gene.
625/// To construct it, one has to supply a transcript_to_gene mapping file. This allows ECs to be mapped to a set of genes via
626/// the Ec2GeneMapper
627pub struct BusFolder {
628    // pub foldername: String,
629    busfile: PathBuf,
630    matrix_ec_file: PathBuf,
631    transcript_file: PathBuf,
632}
633
634pub fn parse_ecmatrix(filename: &Path) -> HashMap<EC, Vec<TranscriptId>> {
635    // parsing an ec.matrix into a Hashmap EC->list_of_transcript_ids
636    let file = File::open(filename).unwrap_or_else(|_| panic!("{:?} not found", filename));
637    let reader = BufReader::new(file);
638    let mut ec_dict: HashMap<EC, Vec<TranscriptId>> = HashMap::new();
639
640    for line in reader.lines() {
641        if let Ok(l) = line {
642            // println!("{}", l);
643            let mut s = l.split_whitespace();
644            let ec = s.next().unwrap().parse::<u32>().unwrap();
645            let tmp = s.next().unwrap();
646
647            let transcript_list: Vec<TranscriptId> = tmp
648                .split(',')
649                .map(|x| TranscriptId(x.parse::<u32>().unwrap()))
650                .collect();
651            ec_dict.insert(EC(ec), transcript_list);
652        } else {
653            panic!("Error reading file {:?}", filename)
654        }
655    }
656    ec_dict
657}
658
659fn parse_transcript(filename: &Path) -> HashMap<TranscriptId, Transcriptname> {
660    let file = File::open(filename).unwrap();
661    let reader = BufReader::new(file);
662    let mut transcript_dict: HashMap<TranscriptId, Transcriptname> = HashMap::new();
663    for (i, line) in reader.lines().enumerate() {
664        if let Ok(l) = line {
665            transcript_dict.insert(TranscriptId(i as u32), Transcriptname(l));
666        }
667    }
668    transcript_dict
669}
670
671impl BusFolder {
672    pub fn new(foldername: &Path) -> BusFolder {
673        // the default names
674        let busfile = foldername.join("output.corrected.sort.bus");
675        let matrix_ec_file = foldername.join("matrix.ec");
676        let transcript_file = foldername.join("transcripts.txt");
677
678        BusFolder { busfile, matrix_ec_file, transcript_file }
679    }
680
681    pub fn from_files(busfile: &Path, matrix_ec_file: &Path, transcript_file: &Path) -> Self {
682        BusFolder {
683            busfile: busfile.to_path_buf(),
684            matrix_ec_file: matrix_ec_file.to_path_buf(),
685            transcript_file: transcript_file.to_path_buf(),
686        }
687    }
688
689    /// returns an iterator of the folder's busfile
690    pub fn get_iterator(&self) -> BusReader<'_> {
691        let bfile = self.get_busfile();
692        BusReader::new(Path::new(&bfile))
693    }
694
695    /// return the folders busfile
696    pub fn get_busfile(&self) -> PathBuf {
697        self.busfile.clone()
698    }
699
700    // return the busfiles header
701    // pub fn get_bus_header(&self) -> BusHeader {
702    //     BusHeader::from_file(&self.get_busfile())
703    // }
704
705    pub fn get_bus_params(&self) -> BusParams {
706        let h = BusHeader::from_file(Path::new(&self.get_busfile()));
707        BusParams { cb_len: h.cb_len, umi_len: h.umi_len }
708    }
709
710    /// return the matric.ec file
711    pub fn get_ecmatrix_file(&self) -> PathBuf {
712        self.matrix_ec_file.clone()
713    }
714
715    /// return the transcript file
716    pub fn get_transcript_file(&self) -> PathBuf {
717        self.transcript_file.clone()
718    }
719
720    /// return the matric.ec file
721    pub fn parse_ecmatrix(&self) -> HashMap<EC, Vec<TranscriptId>> {
722        let filename = self.get_ecmatrix_file();
723        parse_ecmatrix(&filename)
724    }
725
726    pub fn parse_transcript(&self) -> HashMap<TranscriptId, Transcriptname> {
727        let filename = self.get_transcript_file();
728        parse_transcript(&filename)
729    }
730
731    pub fn get_cb_size(&self) -> usize {
732        let cb_iter_tmp = self.get_iterator().groupby_cb();
733        cb_iter_tmp.count()
734    }
735
736    pub fn get_cbumi_size(&self) -> usize {
737        let cb_iter_tmp = self.get_iterator().groupby_cbumi();
738        cb_iter_tmp.count()
739    }
740
741    pub fn make_mapper(&self, t2g_file: &Path) -> Ec2GeneMapper {
742        make_mapper(self, t2g_file)
743    }
744    pub fn make_mapper_transcript(&self) -> Ec2TranscriptMapper {
745        make_mapper_transcript(self)
746    }
747}
748
749/// group a list of records by their CB/UMI (i.e. a single molecule)
750///
751/// takes ownership of record_list, since it reorders the elements
752pub fn group_record_by_cb_umi(record_list: Vec<BusRecord>) -> HashMap<(u64, u64), Vec<BusRecord>> {
753    let mut cbumi_map: HashMap<(u64, u64), Vec<BusRecord>> = HashMap::new();
754
755    for r in record_list {
756        let rlist = cbumi_map.entry((r.CB, r.UMI)).or_default(); // inserts a Vec::new if not present
757        rlist.push(r);
758    }
759    cbumi_map
760}
761
762pub fn setup_busfile(records: &[BusRecord]) -> (PathBuf, TempDir) {
763    // just writes the records into a temporay file
764    // returns the filename
765    // returns TempDir so that it doesnt go out of scope and gets deleted right after tis function returns
766    use tempfile::tempdir;
767    let dir = tempdir().unwrap();
768    let tmpfilename = dir.path().join("output.corrected.sort.bus");
769
770    let mut writer = BusWriterPlain::new(&tmpfilename, BusParams { cb_len: 16, umi_len: 12 });
771    writer.write_iterator_ref(records.iter());
772
773    (tmpfilename, dir)
774}
775
776pub fn write_partial_busfile(bfile: &Path, boutfile: &Path, nrecords: usize) {
777    // write the first nrecords of the intput file into the output
778    let busiter = BusReaderPlain::new(bfile);
779    let mut buswriter = BusWriterPlain::new(boutfile, busiter.params.clone());
780
781    for record in busiter.take(nrecords) {
782        buswriter.write_record(&record);
783    }
784}
785
786//=================================================================================
787#[cfg(test)]
788mod tests {
789    use crate::consistent_genes::EC;
790    use crate::consistent_transcripts::TranscriptId;
791    use crate::io::{
792        setup_busfile, BusHeader, BusParams, BusReaderPlain, BusRecord, BusWriterPlain,
793    };
794    use crate::iterators::CellGroupIterator;
795    use crate::record;
796    use std::io::{BufReader, Read, Write};
797    use std::path::Path;
798    use tempfile::tempdir;
799
800    #[test]
801    fn test_read_write_header() {
802        let r1 = record!(1, 2, 0, 12, 0);
803        let dir = tempdir().unwrap();
804        let busname = dir.path().join("test_read_write_header.bus");
805
806        let mut writer = BusWriterPlain::new(&busname, BusParams { cb_len: 16, umi_len: 12 });
807        writer.write_record(&r1);
808        writer.writer.flush().unwrap();
809
810        let bheader = BusHeader::from_file(&busname);
811        let header = BusHeader::new(16, 12, 20, false);
812        // let params = BusParams{cb_len: 16, umi_len:12 };
813
814        assert_eq!(header.magic, bheader.magic);
815        assert_eq!(header.cb_len, bheader.cb_len);
816        assert_eq!(header.umi_len, bheader.umi_len);
817        assert_eq!(header.version, bheader.version);
818        // Note: tlen is not preserved!!
819    }
820
821    #[test]
822    fn test_read_write() {
823        let r1 = record!(0, 2, 0, 12, 0);
824        let r2 = record!(1, 21, 1, 2, 0);
825        let rlist = vec![r1, r2]; // note: this clones r1, r2!
826        let (busname, _dir) = setup_busfile(&rlist);
827        let reader = BusReaderPlain::new(&busname);
828
829        let records: Vec<BusRecord> = reader.into_iter().collect();
830        assert_eq!(records, rlist)
831    }
832
833    use std::fs::File;
834    use std::io::BufWriter;
835
836    use super::parse_ecmatrix;
837    #[test]
838    fn test_ecmatrix() {
839        let dir = tempdir().unwrap();
840        let tmpfilename = dir.path().join("foo.txt");
841
842        let file = File::create(tmpfilename.clone()).expect("Unable to create file");
843        let mut f = BufWriter::new(file);
844
845        let data = "0 1,2,3\n1 3,4\n2 4";
846        f.write_all(data.as_bytes()).expect("Unable to write data");
847        f.flush().unwrap();
848
849        let ec = parse_ecmatrix(&tmpfilename);
850        // println!("{}", ec.len());
851        // println!("{:?}", ec);
852        let e1 = ec.get(&EC(0)).unwrap();
853        assert_eq!(*e1, vec![TranscriptId(1), TranscriptId(2), TranscriptId(3)]);
854
855        let e1 = ec.get(&EC(1)).unwrap();
856        assert_eq!(*e1, vec![TranscriptId(3), TranscriptId(4)]);
857    }
858
859    use super::group_record_by_cb_umi;
860    #[test]
861    fn test_grouping() {
862        let r1 = record!(0, 1, 0, 12, 0);
863        let r2 = record!(0, 1, 1, 2, 0);
864        let r3 = record!(0, 2, 0, 12, 0);
865        let r4 = record!(1, 1, 1, 2, 0);
866        let r5 = record!(1, 2, 1, 2, 0);
867        let r6 = record!(1, 1, 1, 2, 0);
868
869        let records = vec![
870            r1.clone(),
871            r2.clone(),
872            r3.clone(),
873            r4.clone(),
874            r5.clone(),
875            r6.clone(),
876        ];
877
878        let grouped = group_record_by_cb_umi(records);
879
880        let g1 = grouped.get(&(0 as u64, 1 as u64)).unwrap();
881        assert_eq!(*g1, vec![r1, r2]);
882
883        let g2 = grouped.get(&(0 as u64, 2 as u64)).unwrap();
884        assert_eq!(*g2, vec![r3]);
885
886        let g3 = grouped.get(&(1 as u64, 1 as u64)).unwrap();
887        assert_eq!(*g3, vec![r4, r6]);
888
889        let g4 = grouped.get(&(1 as u64, 2 as u64)).unwrap();
890        assert_eq!(*g4, vec![r5]);
891    }
892
893    #[test]
894    /// make sure the format on disk stays the same when we write it
895    fn test_insta_binary_format_write() {
896        let dir = tempdir().unwrap();
897        let tmpfilename = dir.path().join("test.bus");
898
899        let mut wri = BusWriterPlain::new(&tmpfilename, BusParams { cb_len: 16, umi_len: 12 });
900        let b = record!(1, 1, 1, 3, 0);
901        let c = record!(0, 1, 1, 3, 0);
902
903        let records = vec![b, c];
904        wri.write_iterator(records.into_iter());
905
906        let digest_str = md5sum(&tmpfilename);
907        insta::assert_yaml_snapshot!(digest_str, @r###"
908        ---
909        3b49ecec15ae50a3cf2b7cfd37441ea1
910        "###);
911    }
912
913    fn md5sum(fname: &Path) -> String {
914        let file = File::open(fname).unwrap();
915        let mut reader = BufReader::new(file);
916        let mut buffer = Vec::new();
917
918        // Read file into vector.
919        reader.read_to_end(&mut buffer).unwrap();
920        let digest_str = format!("{:x}", md5::compute(buffer));
921        digest_str
922    }
923
924    #[test]
925    /// make sure the format on disk stays the same when we read it
926    fn test_insta_binary_format_read() {
927        let external_file =
928            Path::new("/home/michi/bus_testing/bus_output_short/output.corrected.sort.bus");
929        let records: Vec<BusRecord> = BusReaderPlain::new(external_file)
930            .step_by(10000)
931            .take(100)
932            .collect();
933
934        insta::with_settings!({sort_maps => true}, {
935            insta::assert_yaml_snapshot!(records)
936        });
937    }
938
939    #[test]
940    fn test_dyn_from_read_file() {
941        let r1 = record!(0, 2, 0, 12, 0);
942        let r2 = record!(1, 21, 1, 2, 0);
943        let rlist = vec![r1.clone(), r2.clone()]; // note: this clones r1, r2!
944        let (busname, _dir) = setup_busfile(&rlist);
945
946        let f = File::open(busname).unwrap();
947        let mut r = BusReaderPlain::from_read(f);
948        assert_eq!(r.params, BusParams { cb_len: 16, umi_len: 12 });
949
950        assert_eq!(r.next(), Some(r1));
951        assert_eq!(r.next(), Some(r2));
952        assert_eq!(r.next(), None);
953    }
954
955    #[test]
956    fn test_dyn_from_file() {
957        let r1 = record!(0, 2, 0, 12, 0);
958        let r2 = record!(1, 21, 1, 2, 0);
959        let rlist = vec![r1.clone(), r2.clone()]; // note: this clones r1, r2!
960        let (busname, _dir) = setup_busfile(&rlist);
961
962        let mut r = BusReaderPlain::new(&busname);
963        assert_eq!(r.params, BusParams { cb_len: 16, umi_len: 12 });
964
965        assert_eq!(r.next(), Some(r1));
966        assert_eq!(r.next(), Some(r2));
967        assert_eq!(r.next(), None);
968    }
969
970    #[test]
971    fn test_dyn_from_mem() {
972        let r1 = record!(0, 2, 0, 12, 0);
973        let r2 = record!(1, 21, 1, 2, 0);
974        let rlist = vec![r1.clone(), r2.clone()]; // note: this clones r1, r2!
975        let (busname, _dir) = setup_busfile(&rlist);
976
977        let mut f = File::open(busname).unwrap();
978
979        let mut buffer = Vec::new();
980        f.read_to_end(&mut buffer).unwrap();
981
982        let mut r = BusReaderPlain::from_read(buffer.as_slice());
983        assert_eq!(r.params, BusParams { cb_len: 16, umi_len: 12 });
984
985        assert_eq!(r.next(), Some(r1));
986        assert_eq!(r.next(), Some(r2));
987        assert_eq!(r.next(), None);
988    }
989
990    #[test]
991    fn test_dyn_group() {
992        let r1 = record!(0, 2, 0, 12, 0);
993        let r2 = record!(0, 3, 0, 12, 0);
994        let r3 = record!(1, 21, 1, 2, 0);
995        let rlist = vec![r1.clone(), r2.clone(), r3.clone()]; // note: this clones r1, r2!
996        let (busname, _dir) = setup_busfile(&rlist);
997
998        let mut f = File::open(busname).unwrap();
999
1000        let mut buffer = Vec::new();
1001        f.read_to_end(&mut buffer).unwrap();
1002
1003        let r = BusReaderPlain::from_read(buffer.as_slice());
1004
1005        let mut iter = r.groupby_cb();
1006
1007        assert_eq!(iter.next(), Some((0, vec![r1, r2])));
1008        assert_eq!(iter.next(), Some((1, vec![r3])));
1009    }
1010}