#![allow(non_snake_case)]
use crate::busz::{BuszReader, BuszWriter};
use crate::consistent_genes::{make_mapper, Ec2GeneMapper, EC};
use crate::consistent_transcripts::{
make_mapper_transcript, Ec2TranscriptMapper, TranscriptId, Transcriptname,
};
use crate::iterators::{CbUmiGroupIterator, CellGroupIterator};
use bincode;
use serde;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Error, Read, Write};
use std::path::{Path, PathBuf};
use rkyv::{self, AlignedVec, Deserialize};
use tempfile::TempDir;
pub(crate) const BUS_ENTRY_SIZE: usize = 32;
pub const BUS_HEADER_SIZE: usize = 20;
#[derive(
serde::Serialize,
serde::Deserialize,
Debug,
PartialEq,
Eq,
Clone,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
)] #[archive(check_bytes)]
pub struct BusRecord {
pub CB: u64, pub UMI: u64, pub EC: u32, pub COUNT: u32, pub FLAG: u32, }
impl BusRecord {
pub fn to_bytes_rkyv(&self) -> AlignedVec {
rkyv::to_bytes::<_, 32>(self).expect("failed to serialize vec")
}
pub fn to_bytes_bincode(&self) -> Vec<u8> {
let mut binrecord = bincode::serialize(self).expect("FAILED to serialze record");
for _i in 0..4 {
binrecord.push(0);
}
binrecord
}
pub fn to_bytes(&self) -> AlignedVec {
self.to_bytes_rkyv()
}
pub fn from_bytes(bytes: &[u8]) -> Self {
let archived = unsafe { rkyv::archived_root::<BusRecord>(bytes) };
let s: BusRecord = archived.deserialize(&mut rkyv::Infallible).unwrap();
s
}
}
pub fn record_tobytes(r: &BusRecord) -> Vec<u8> {
let mut bytes: Vec<u8> = Vec::with_capacity(BUS_ENTRY_SIZE);
let cb_bytes = r.CB.to_ne_bytes();
bytes.extend_from_slice(&cb_bytes);
let umi_bytes = r.UMI.to_ne_bytes();
bytes.extend_from_slice(&umi_bytes);
let ec_bytes = r.EC.to_ne_bytes();
bytes.extend_from_slice(&ec_bytes);
let count_bytes = r.COUNT.to_ne_bytes();
bytes.extend_from_slice(&count_bytes);
let flag_bytes = r.FLAG.to_ne_bytes();
bytes.extend_from_slice(&flag_bytes);
let pad: u32 = 0;
let pad_bytes = pad.to_ne_bytes();
bytes.extend_from_slice(&pad_bytes);
bytes
}
pub fn record_frombytes(bytes: &mut impl Read) -> Result<BusRecord, Error> {
let mut cb_bytes = [0_u8; 8];
bytes.read_exact(&mut cb_bytes)?;
let mut umi_bytes = [0_u8; 8];
bytes.read_exact(&mut umi_bytes)?;
let mut ec_bytes = [0_u8; 4];
bytes.read_exact(&mut ec_bytes)?;
let mut count_bytes = [0_u8; 4];
bytes.read_exact(&mut count_bytes)?;
let mut flag_bytes = [0_u8; 4];
bytes.read_exact(&mut flag_bytes)?;
let mut pad_bytes = [0_u8; 4];
bytes.read_exact(&mut pad_bytes)?;
assert_eq!(pad_bytes, [0_u8; 4]);
let record = BusRecord {
CB: u64::from_ne_bytes(cb_bytes),
UMI: u64::from_ne_bytes(umi_bytes),
EC: u32::from_ne_bytes(ec_bytes),
COUNT: u32::from_ne_bytes(count_bytes),
FLAG: u32::from_ne_bytes(flag_bytes),
};
Ok(record)
}
#[test]
fn ser() {
use crate::record;
use std::io::Cursor;
let rec = record!(121521, 124163, 43, 123, 0);
let bytes = record_tobytes(&rec);
let r = BusRecord::from_bytes(&bytes);
assert_eq!(r, rec);
let mut bytestream = Cursor::new(bytes);
let r2 = record_frombytes(&mut bytestream).unwrap();
assert_eq!(r2, rec);
}
#[test]
fn test_rkyv_write() {
use crate::record;
let rec = record!(121521, 124163, 43, u32::MAX - 1, 12);
let bytes_true = rec.to_bytes_bincode();
let byts_rkyv = rec.to_bytes_rkyv();
println!("{}", byts_rkyv.len());
println!("{:?}", byts_rkyv);
assert_eq!(byts_rkyv.to_vec(), bytes_true);
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BusParams {
pub cb_len: u32,
pub umi_len: u32,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct BusHeader {
pub(crate) magic: [u8; 4],
pub(crate) version: u32,
pub(crate) cb_len: u32,
pub(crate) umi_len: u32,
pub(crate) tlen: u32,
}
impl BusHeader {
pub fn new(cb_len: u32, umi_len: u32, tlen: u32, compressed: bool) -> BusHeader {
let magic: [u8; 4] = if compressed { *b"BUS\x01" } else { *b"BUS\x00" };
BusHeader { cb_len, umi_len, tlen, magic, version: 1 }
}
pub fn from_file(fname: &Path) -> BusHeader {
let file =
std::fs::File::open(fname).unwrap_or_else(|_| panic!("file not found: {fname:?}"));
let mut header = Vec::with_capacity(BUS_HEADER_SIZE);
let _n = file
.take(BUS_HEADER_SIZE as u64)
.read_to_end(&mut header)
.unwrap();
BusHeader::from_bytes(&header)
}
pub fn from_bytes(bytes: &[u8]) -> BusHeader {
let header_struct: BusHeader =
bincode::deserialize(bytes).expect("FAILED to deserialze header");
header_struct
}
pub fn to_bytes(&self) -> Vec<u8> {
bincode::serialize(self).expect("FAILED to serialze header")
}
pub fn get_tlen(&self) -> u32 {
self.tlen
}
pub fn get_params(&self) -> BusParams {
BusParams { cb_len: self.cb_len, umi_len: self.umi_len }
}
}
pub trait CUGIterator: Iterator<Item = BusRecord> {}
pub const DEFAULT_BUF_SIZE: usize = 800 * 1024;
pub enum BusReader<'a> {
Plain(BusReaderPlain<'a>),
Compressed(BuszReader<'a>),
}
impl<'a> BusReader<'a> {
pub fn new(fname: &Path) -> Self {
if fname.extension().unwrap() == "busz" {
Self::new_compressed(fname)
} else {
Self::new_plain(fname)
}
}
pub fn from_read_plain(reader: impl Read + 'a) -> Self {
BusReader::Plain(BusReaderPlain::from_read(reader))
}
pub fn from_read_compressed(reader: impl Read + 'a) -> Self {
BusReader::Compressed(BuszReader::from_read(reader))
}
pub fn new_plain(fname: &Path) -> Self {
BusReader::Plain(BusReaderPlain::new(fname))
}
pub fn new_compressed(fname: &Path) -> Self {
BusReader::Compressed(BuszReader::new(fname))
}
pub fn get_params(&self) -> &BusParams {
match self {
BusReader::Plain(reader) => reader.get_params(),
BusReader::Compressed(reader) => reader.get_params(),
}
}
}
impl<'a> Iterator for BusReader<'a> {
type Item = BusRecord;
fn next(&mut self) -> Option<Self::Item> {
match self {
BusReader::Plain(reader) => reader.next(),
BusReader::Compressed(reader) => reader.next(),
}
}
}
impl<'a> CUGIterator for BusReader<'a> {}
pub struct BusReaderPlain<'a> {
pub params: BusParams,
reader: Box<dyn Read + 'a>, }
impl<'a> BusReaderPlain<'a> {
pub fn new(fname: &Path) -> Self {
let file_handle = File::open(fname).expect("Busfile should exist on disk!");
let buf = BufReader::with_capacity(DEFAULT_BUF_SIZE, file_handle);
Self::from_read(buf)
}
pub fn from_read(mut reader: impl Read + 'a) -> Self {
let mut header_bytes = [0_u8; BUS_HEADER_SIZE];
reader
.read_exact(&mut header_bytes)
.expect("failed to read header");
let header = BusHeader::from_bytes(&header_bytes);
let params = BusParams { cb_len: header.cb_len, umi_len: header.umi_len };
assert_eq!(
&header.magic, b"BUS\x00",
"Header struct not matching; MAGIC is wrong. We're expecting a plain file here!"
);
let mut buffer = Vec::with_capacity(header.tlen as usize);
for _i in 0..header.tlen {
buffer.push(0_u8);
}
reader
.read_exact(&mut buffer)
.expect("failed to read variable header");
BusReaderPlain { params, reader: Box::new(reader) }
}
pub fn get_params(&self) -> &BusParams {
&self.params
}
}
impl<'a> Iterator for BusReaderPlain<'a> {
type Item = BusRecord;
fn next(&mut self) -> Option<Self::Item> {
let mut buffer = [0; BUS_ENTRY_SIZE]; match self.reader.read_exact(&mut buffer) {
Ok(()) => Some(BusRecord::from_bytes(&buffer)),
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => None,
_ => panic!("{e}"),
},
}
}
}
impl<'a> CUGIterator for BusReaderPlain<'a> {}
impl CUGIterator for std::vec::IntoIter<BusRecord> {}
pub enum BusWriter {
Plain(BusWriterPlain),
Compressed(BuszWriter),
}
impl BusWriter {
pub fn new(filename: &Path, params: BusParams) -> Self {
if filename.extension().unwrap() == "busz" {
Self::Compressed(
BuszWriter::new(filename, params, 100_000), )
} else {
Self::Plain(BusWriterPlain::new(filename, params))
}
}
pub fn write_iterator(&mut self, iter: impl Iterator<Item = BusRecord>) {
match self {
BusWriter::Plain(writer) => writer.write_iterator(iter),
BusWriter::Compressed(writer) => writer.write_iterator(iter),
}
}
}
pub struct BusWriterPlain {
writer: BufWriter<File>,
}
impl BusWriterPlain {
pub fn from_filehandle(file_handle: File, params: BusParams) -> BusWriterPlain {
BusWriterPlain::new_with_capacity(file_handle, params, DEFAULT_BUF_SIZE)
}
pub fn new(filename: &Path, params: BusParams) -> BusWriterPlain {
let file_handle: File = File::create(filename).expect("FAILED to open");
BusWriterPlain::from_filehandle(file_handle, params)
}
pub fn write_record(&mut self, record: &BusRecord) {
let binrecord = record.to_bytes();
self.writer
.write_all(&binrecord)
.expect("FAILED to write record");
}
fn new_with_capacity(file_handle: File, params: BusParams, bufsize: usize) -> Self {
let mut writer = BufWriter::with_capacity(bufsize, file_handle);
let custom_header_str = "BUS file produced by kallisto".as_bytes();
let header = BusHeader::new(
params.cb_len,
params.umi_len,
custom_header_str.len() as u32,
false,
);
let binheader = header.to_bytes();
writer
.write_all(&binheader)
.expect("FAILED to write header");
let varheader = custom_header_str;
writer
.write_all(varheader)
.expect("FAILED to write var header");
BusWriterPlain { writer }
}
pub fn write_iterator(&mut self, iter: impl Iterator<Item = BusRecord>) {
for r in iter {
self.write_record(&r)
}
self.writer.flush().unwrap();
}
pub fn write_iterator_ref<'a>(&mut self, iter: impl Iterator<Item = &'a BusRecord>) {
for r in iter {
self.write_record(r)
}
self.writer.flush().unwrap();
}
}
pub struct BusFolder {
busfile: PathBuf,
matrix_ec_file: PathBuf,
transcript_file: PathBuf,
}
pub fn parse_ecmatrix(filename: &Path) -> HashMap<EC, Vec<TranscriptId>> {
let file = File::open(filename).unwrap_or_else(|_| panic!("{:?} not found", filename));
let reader = BufReader::new(file);
let mut ec_dict: HashMap<EC, Vec<TranscriptId>> = HashMap::new();
for line in reader.lines() {
if let Ok(l) = line {
let mut s = l.split_whitespace();
let ec = s.next().unwrap().parse::<u32>().unwrap();
let tmp = s.next().unwrap();
let transcript_list: Vec<TranscriptId> = tmp
.split(',')
.map(|x| TranscriptId(x.parse::<u32>().unwrap()))
.collect();
ec_dict.insert(EC(ec), transcript_list);
} else {
panic!("Error reading file {:?}", filename)
}
}
ec_dict
}
fn parse_transcript(filename: &Path) -> HashMap<TranscriptId, Transcriptname> {
let file = File::open(filename).unwrap();
let reader = BufReader::new(file);
let mut transcript_dict: HashMap<TranscriptId, Transcriptname> = HashMap::new();
for (i, line) in reader.lines().enumerate() {
if let Ok(l) = line {
transcript_dict.insert(TranscriptId(i as u32), Transcriptname(l));
}
}
transcript_dict
}
impl BusFolder {
pub fn new(foldername: &Path) -> BusFolder {
let busfile = foldername.join("output.corrected.sort.bus");
let matrix_ec_file = foldername.join("matrix.ec");
let transcript_file = foldername.join("transcripts.txt");
BusFolder { busfile, matrix_ec_file, transcript_file }
}
pub fn from_files(busfile: &Path, matrix_ec_file: &Path, transcript_file: &Path) -> Self {
BusFolder {
busfile: busfile.to_path_buf(),
matrix_ec_file: matrix_ec_file.to_path_buf(),
transcript_file: transcript_file.to_path_buf(),
}
}
pub fn get_iterator(&self) -> BusReader<'_> {
let bfile = self.get_busfile();
BusReader::new(Path::new(&bfile))
}
pub fn get_busfile(&self) -> PathBuf {
self.busfile.clone()
}
pub fn get_bus_params(&self) -> BusParams {
let h = BusHeader::from_file(Path::new(&self.get_busfile()));
BusParams { cb_len: h.cb_len, umi_len: h.umi_len }
}
pub fn get_ecmatrix_file(&self) -> PathBuf {
self.matrix_ec_file.clone()
}
pub fn get_transcript_file(&self) -> PathBuf {
self.transcript_file.clone()
}
pub fn parse_ecmatrix(&self) -> HashMap<EC, Vec<TranscriptId>> {
let filename = self.get_ecmatrix_file();
parse_ecmatrix(&filename)
}
pub fn parse_transcript(&self) -> HashMap<TranscriptId, Transcriptname> {
let filename = self.get_transcript_file();
parse_transcript(&filename)
}
pub fn get_cb_size(&self) -> usize {
let cb_iter_tmp = self.get_iterator().groupby_cb();
cb_iter_tmp.count()
}
pub fn get_cbumi_size(&self) -> usize {
let cb_iter_tmp = self.get_iterator().groupby_cbumi();
cb_iter_tmp.count()
}
pub fn make_mapper(&self, t2g_file: &Path) -> Ec2GeneMapper {
make_mapper(self, t2g_file)
}
pub fn make_mapper_transcript(&self) -> Ec2TranscriptMapper {
make_mapper_transcript(self)
}
}
pub fn group_record_by_cb_umi(record_list: Vec<BusRecord>) -> HashMap<(u64, u64), Vec<BusRecord>> {
let mut cbumi_map: HashMap<(u64, u64), Vec<BusRecord>> = HashMap::new();
for r in record_list {
let rlist = cbumi_map.entry((r.CB, r.UMI)).or_default(); rlist.push(r);
}
cbumi_map
}
pub fn setup_busfile(records: &[BusRecord]) -> (PathBuf, TempDir) {
use tempfile::tempdir;
let dir = tempdir().unwrap();
let tmpfilename = dir.path().join("output.corrected.sort.bus");
let mut writer = BusWriterPlain::new(&tmpfilename, BusParams { cb_len: 16, umi_len: 12 });
writer.write_iterator_ref(records.iter());
(tmpfilename, dir)
}
pub fn write_partial_busfile(bfile: &Path, boutfile: &Path, nrecords: usize) {
let busiter = BusReaderPlain::new(bfile);
let mut buswriter = BusWriterPlain::new(boutfile, busiter.params.clone());
for record in busiter.take(nrecords) {
buswriter.write_record(&record);
}
}
#[cfg(test)]
mod tests {
use crate::consistent_genes::EC;
use crate::consistent_transcripts::TranscriptId;
use crate::io::{
setup_busfile, BusHeader, BusParams, BusReaderPlain, BusRecord, BusWriterPlain,
};
use crate::iterators::CellGroupIterator;
use crate::record;
use std::io::{BufReader, Read, Write};
use std::path::Path;
use tempfile::tempdir;
#[test]
fn test_read_write_header() {
let r1 = record!(1, 2, 0, 12, 0);
let dir = tempdir().unwrap();
let busname = dir.path().join("test_read_write_header.bus");
let mut writer = BusWriterPlain::new(&busname, BusParams { cb_len: 16, umi_len: 12 });
writer.write_record(&r1);
writer.writer.flush().unwrap();
let bheader = BusHeader::from_file(&busname);
let header = BusHeader::new(16, 12, 20, false);
assert_eq!(header.magic, bheader.magic);
assert_eq!(header.cb_len, bheader.cb_len);
assert_eq!(header.umi_len, bheader.umi_len);
assert_eq!(header.version, bheader.version);
}
#[test]
fn test_read_write() {
let r1 = record!(0, 2, 0, 12, 0);
let r2 = record!(1, 21, 1, 2, 0);
let rlist = vec![r1, r2]; let (busname, _dir) = setup_busfile(&rlist);
let reader = BusReaderPlain::new(&busname);
let records: Vec<BusRecord> = reader.into_iter().collect();
assert_eq!(records, rlist)
}
use std::fs::File;
use std::io::BufWriter;
use super::parse_ecmatrix;
#[test]
fn test_ecmatrix() {
let dir = tempdir().unwrap();
let tmpfilename = dir.path().join("foo.txt");
let file = File::create(tmpfilename.clone()).expect("Unable to create file");
let mut f = BufWriter::new(file);
let data = "0 1,2,3\n1 3,4\n2 4";
f.write_all(data.as_bytes()).expect("Unable to write data");
f.flush().unwrap();
let ec = parse_ecmatrix(&tmpfilename);
let e1 = ec.get(&EC(0)).unwrap();
assert_eq!(*e1, vec![TranscriptId(1), TranscriptId(2), TranscriptId(3)]);
let e1 = ec.get(&EC(1)).unwrap();
assert_eq!(*e1, vec![TranscriptId(3), TranscriptId(4)]);
}
use super::group_record_by_cb_umi;
#[test]
fn test_grouping() {
let r1 = record!(0, 1, 0, 12, 0);
let r2 = record!(0, 1, 1, 2, 0);
let r3 = record!(0, 2, 0, 12, 0);
let r4 = record!(1, 1, 1, 2, 0);
let r5 = record!(1, 2, 1, 2, 0);
let r6 = record!(1, 1, 1, 2, 0);
let records = vec![
r1.clone(),
r2.clone(),
r3.clone(),
r4.clone(),
r5.clone(),
r6.clone(),
];
let grouped = group_record_by_cb_umi(records);
let g1 = grouped.get(&(0 as u64, 1 as u64)).unwrap();
assert_eq!(*g1, vec![r1, r2]);
let g2 = grouped.get(&(0 as u64, 2 as u64)).unwrap();
assert_eq!(*g2, vec![r3]);
let g3 = grouped.get(&(1 as u64, 1 as u64)).unwrap();
assert_eq!(*g3, vec![r4, r6]);
let g4 = grouped.get(&(1 as u64, 2 as u64)).unwrap();
assert_eq!(*g4, vec![r5]);
}
#[test]
fn test_insta_binary_format_write() {
let dir = tempdir().unwrap();
let tmpfilename = dir.path().join("test.bus");
let mut wri = BusWriterPlain::new(&tmpfilename, BusParams { cb_len: 16, umi_len: 12 });
let b = record!(1, 1, 1, 3, 0);
let c = record!(0, 1, 1, 3, 0);
let records = vec![b, c];
wri.write_iterator(records.into_iter());
let digest_str = md5sum(&tmpfilename);
insta::assert_yaml_snapshot!(digest_str, @r###"
---
3b49ecec15ae50a3cf2b7cfd37441ea1
"###);
}
fn md5sum(fname: &Path) -> String {
let file = File::open(fname).unwrap();
let mut reader = BufReader::new(file);
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).unwrap();
let digest_str = format!("{:x}", md5::compute(buffer));
digest_str
}
#[test]
fn test_insta_binary_format_read() {
let external_file =
Path::new("/home/michi/bus_testing/bus_output_short/output.corrected.sort.bus");
let records: Vec<BusRecord> = BusReaderPlain::new(external_file)
.step_by(10000)
.take(100)
.collect();
insta::with_settings!({sort_maps => true}, {
insta::assert_yaml_snapshot!(records)
});
}
#[test]
fn test_dyn_from_read_file() {
let r1 = record!(0, 2, 0, 12, 0);
let r2 = record!(1, 21, 1, 2, 0);
let rlist = vec![r1.clone(), r2.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let f = File::open(busname).unwrap();
let mut r = BusReaderPlain::from_read(f);
assert_eq!(r.params, BusParams { cb_len: 16, umi_len: 12 });
assert_eq!(r.next(), Some(r1));
assert_eq!(r.next(), Some(r2));
assert_eq!(r.next(), None);
}
#[test]
fn test_dyn_from_file() {
let r1 = record!(0, 2, 0, 12, 0);
let r2 = record!(1, 21, 1, 2, 0);
let rlist = vec![r1.clone(), r2.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let mut r = BusReaderPlain::new(&busname);
assert_eq!(r.params, BusParams { cb_len: 16, umi_len: 12 });
assert_eq!(r.next(), Some(r1));
assert_eq!(r.next(), Some(r2));
assert_eq!(r.next(), None);
}
#[test]
fn test_dyn_from_mem() {
let r1 = record!(0, 2, 0, 12, 0);
let r2 = record!(1, 21, 1, 2, 0);
let rlist = vec![r1.clone(), r2.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let mut f = File::open(busname).unwrap();
let mut buffer = Vec::new();
f.read_to_end(&mut buffer).unwrap();
let mut r = BusReaderPlain::from_read(buffer.as_slice());
assert_eq!(r.params, BusParams { cb_len: 16, umi_len: 12 });
assert_eq!(r.next(), Some(r1));
assert_eq!(r.next(), Some(r2));
assert_eq!(r.next(), None);
}
#[test]
fn test_dyn_group() {
let r1 = record!(0, 2, 0, 12, 0);
let r2 = record!(0, 3, 0, 12, 0);
let r3 = record!(1, 21, 1, 2, 0);
let rlist = vec![r1.clone(), r2.clone(), r3.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let mut f = File::open(busname).unwrap();
let mut buffer = Vec::new();
f.read_to_end(&mut buffer).unwrap();
let r = BusReaderPlain::from_read(buffer.as_slice());
let mut iter = r.groupby_cb();
assert_eq!(iter.next(), Some((0, vec![r1, r2])));
assert_eq!(iter.next(), Some((1, vec![r3])));
}
}