#![allow(non_snake_case)]
use crate::busz::{BuszReader, BuszWriter};
use crate::consistent_genes::{Ec2GeneMapper, EC, make_mapper};
use crate::consistent_transcripts::{make_mapper_transcript, Ec2TranscriptMapper, TranscriptId, Transcriptname};
use crate::iterators::{CbUmiGroupIterator, CellGroupIterator};
use bincode;
use serde;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Read, Write};
use tempfile::TempDir;
use rkyv::{self, Deserialize};
pub (crate) const BUS_ENTRY_SIZE: usize = 32;
pub const BUS_HEADER_SIZE: usize = 20;
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone, rkyv::Archive, rkyv::Deserialize, rkyv::Serialize)] #[archive(check_bytes)]
pub struct BusRecord {
pub CB: u64, pub UMI: u64, pub EC: u32, pub COUNT: u32, pub FLAG: u32, }
impl BusRecord {
pub fn to_bytes(&self) -> Vec<u8> {
let mut binrecord = bincode::serialize(self).expect("FAILED to serialze record");
for _i in 0..4 {
binrecord.push(0);
}
binrecord
}
pub fn from_bytes(bytes: &[u8]) -> Self {
let archived = unsafe { rkyv::archived_root::<BusRecord>(bytes) };
let s:BusRecord = archived.deserialize(&mut rkyv::Infallible).unwrap();
s
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BusParams {
pub cb_len: u32,
pub umi_len: u32,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct BusHeader {
pub(crate) magic: [u8; 4],
pub(crate) version: u32,
pub(crate) cb_len: u32,
pub(crate) umi_len: u32,
pub(crate) tlen: u32,
}
impl BusHeader {
pub fn new(cb_len: u32, umi_len: u32, tlen: u32, compressed: bool) -> BusHeader {
let magic: [u8; 4] = if compressed {
*b"BUS\x01"
} else {
*b"BUS\x00"
};
BusHeader { cb_len, umi_len, tlen, magic, version: 1 }
}
pub fn from_file(fname: &str) -> BusHeader {
let file = std::fs::File::open(fname).unwrap_or_else(|_| panic!("file not found: {fname}"));
let mut header = Vec::with_capacity(BUS_HEADER_SIZE);
let _n = file
.take(BUS_HEADER_SIZE as u64)
.read_to_end(&mut header)
.unwrap();
BusHeader::from_bytes(&header)
}
pub fn from_bytes(bytes: &[u8]) -> BusHeader {
let header_struct: BusHeader =
bincode::deserialize(bytes).expect("FAILED to deserialze header");
header_struct
}
pub fn to_bytes(&self) -> Vec<u8> {
bincode::serialize(self).expect("FAILED to serialze header")
}
pub fn get_tlen(&self) -> u32 {
self.tlen
}
}
pub trait CUGIterator: Iterator<Item = BusRecord> {}
pub const DEFAULT_BUF_SIZE: usize = 800 * 1024;
pub enum BusReader <'a> {
Plain(BusReaderPlain<'a>),
Compressed(BuszReader<'a>),
}
impl <'a>BusReader<'a> {
pub fn new(fname: &str) -> Self {
if fname.ends_with(".busz") {
Self::new_compressed(fname)
} else {
Self::new_plain(fname)
}
}
pub fn from_read_plain(reader: impl Read + 'a) -> Self {
BusReader::Plain(BusReaderPlain::from_read(reader))
}
pub fn from_read_compressed(reader: impl Read + 'a) -> Self {
BusReader::Compressed(BuszReader::from_read(reader))
}
pub fn new_plain(fname: &str) -> Self{
BusReader::Plain(BusReaderPlain::new(fname))
}
pub fn new_compressed(fname: &str) -> Self{
BusReader::Compressed(BuszReader::new(fname))
}
pub fn get_params(&self) -> &BusParams {
match self {
BusReader::Plain(reader) => reader.get_params(),
BusReader::Compressed(reader) => reader.get_params(),
}
}
}
impl <'a> Iterator for BusReader<'a> {
type Item = BusRecord;
fn next(&mut self) -> Option<Self::Item> {
match self {
BusReader::Plain(reader) => reader.next(),
BusReader::Compressed(reader) => reader.next(),
}
}
}
impl <'a> CUGIterator for BusReader <'a> {}
pub struct BusReaderPlain<'a> {
pub params: BusParams,
reader: Box<dyn Read+ 'a>, }
impl <'a> BusReaderPlain <'a> {
pub fn new(fname: &str) -> Self {
let file_handle = File::open(fname).expect("FAIL");
let buf = BufReader::with_capacity(DEFAULT_BUF_SIZE, file_handle);
Self::from_read(buf)
}
pub fn from_read(mut reader: impl Read + 'a) -> Self {
let mut header_bytes = [0_u8; BUS_HEADER_SIZE];
reader.read_exact(&mut header_bytes).expect("failed to read header");
let header = BusHeader::from_bytes(&header_bytes);
let params = BusParams { cb_len: header.cb_len, umi_len: header.umi_len };
assert_eq!(
&header.magic, b"BUS\x00",
"Header struct not matching; MAGIC is wrong"
);
let mut buffer = Vec::with_capacity(header.tlen as usize);
for _i in 0..header.tlen {
buffer.push(0_u8);
}
reader.read_exact(&mut buffer).expect("failed to read variable header");
BusReaderPlain { params, reader: Box::new(reader) }
}
pub fn get_params(&self) -> &BusParams {
&self.params
}
}
impl <'a> Iterator for BusReaderPlain <'a> {
type Item = BusRecord;
fn next(&mut self) -> Option<Self::Item> {
let mut buffer = [0; BUS_ENTRY_SIZE]; match self.reader.read_exact(&mut buffer) {
Ok(()) => Some(BusRecord::from_bytes(&buffer)),
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => None,
_ => panic!("{e}"),
}
}
}
}
impl <'a> CUGIterator for BusReaderPlain<'a> {}
impl CUGIterator for std::vec::IntoIter<BusRecord> {}
pub enum BusWriter {
Plain(BusWriterPlain),
Compressed(BuszWriter),
}
impl BusWriter {
pub fn new(filename: &str, params: BusParams) -> Self {
if filename.ends_with(".busz") {
Self::Compressed(
BuszWriter::new(filename, params, 100_000) )
} else {
Self::Plain(
BusWriterPlain::new(filename, params)
)
}
}
pub fn write_iterator(&mut self, iter: impl Iterator<Item=BusRecord>) {
match self {
BusWriter::Plain(writer) => writer.write_records(&iter.collect()), BusWriter::Compressed(writer) => writer.write_iterator(iter),
}
}
}
pub struct BusWriterPlain {
writer: BufWriter<File>,
header: BusHeader,
}
impl BusWriterPlain {
pub fn from_filehandle(file_handle: File, params: BusParams) -> BusWriterPlain {
BusWriterPlain::new_with_capacity(file_handle, params, DEFAULT_BUF_SIZE)
}
pub fn new(filename: &str, params: BusParams) -> BusWriterPlain {
let file_handle: File = File::create(filename).expect("FAILED to open");
BusWriterPlain::from_filehandle(file_handle, params)
}
pub fn write_record(&mut self, record: &BusRecord) {
let binrecord = record.to_bytes();
self.writer
.write_all(&binrecord)
.expect("FAILED to write record");
}
pub fn write_records(&mut self, records: &Vec<BusRecord>) {
for r in records {
self.write_record(r)
}
self.writer.flush().unwrap();
}
pub fn new_with_capacity(file_handle: File, params: BusParams, bufsize: usize) -> Self {
let mut writer = BufWriter::with_capacity(bufsize, file_handle);
let custom_header_str = "BUS file produced by kallisto".as_bytes();
let header = BusHeader::new(params.cb_len, params.umi_len, custom_header_str.len() as u32, false);
let binheader = header.to_bytes();
writer
.write_all(&binheader)
.expect("FAILED to write header");
let varheader = custom_header_str;
writer
.write_all(varheader)
.expect("FAILED to write var header");
BusWriterPlain { writer, header }
}
pub fn write_iterator(&mut self, iter: impl Iterator<Item=BusRecord>) {
for r in iter {
self.write_record(&r)
}
self.writer.flush().unwrap();
}
}
pub struct BusFolder {
busfile: String,
matrix_ec_file: String,
transcript_file: String,
}
pub fn parse_ecmatrix(filename: &str) -> HashMap<EC, Vec<TranscriptId>> {
let file = File::open(filename).unwrap_or_else(|_| panic!("{} not found", filename));
let reader = BufReader::new(file);
let mut ec_dict: HashMap<EC, Vec<TranscriptId>> = HashMap::new();
for line in reader.lines() {
if let Ok(l) = line {
let mut s = l.split_whitespace();
let ec = s.next().unwrap().parse::<u32>().unwrap();
let tmp = s.next().unwrap();
let transcript_list: Vec<TranscriptId> =
tmp.split(',').map(|x| TranscriptId(x.parse::<u32>().unwrap())).collect();
ec_dict.insert(EC(ec), transcript_list);
} else {
panic!("Error reading file {}", filename)
}
}
ec_dict
}
fn parse_transcript(filename: &str) -> HashMap<TranscriptId, Transcriptname> {
let file = File::open(filename).unwrap();
let reader = BufReader::new(file);
let mut transcript_dict: HashMap<TranscriptId, Transcriptname> = HashMap::new();
for (i, line) in reader.lines().enumerate() {
if let Ok(l) = line {
transcript_dict.insert(TranscriptId(i as u32), Transcriptname(l));
}
}
transcript_dict
}
impl BusFolder {
pub fn new(foldername: &str) -> BusFolder {
let busfile = format!("{foldername}/output.corrected.sort.bus");
let matrix_ec_file = format!("{foldername}/matrix.ec");
let transcript_file = format!("{foldername}/transcripts.txt");
BusFolder { busfile, matrix_ec_file, transcript_file}
}
pub fn from_files(busfile: &str, matrix_ec_file: &str, transcript_file: &str) -> Self{
BusFolder { busfile: busfile.to_string(), matrix_ec_file: matrix_ec_file.to_string(), transcript_file: transcript_file.to_string()}
}
pub fn get_iterator(&self) -> BusReaderPlain {
let bfile = self.get_busfile();
BusReaderPlain::new(&bfile)
}
pub fn get_busfile(&self) -> String {
self.busfile.clone()
}
pub fn get_bus_params(&self) -> BusParams {
let h = BusHeader::from_file(&self.get_busfile());
BusParams {cb_len:h.cb_len, umi_len: h.umi_len}
}
pub fn get_ecmatrix_file(&self) -> String {
self.matrix_ec_file.clone()
}
pub fn get_transcript_file(&self) -> String {
self.transcript_file.clone()
}
pub fn parse_ecmatrix(&self) -> HashMap<EC, Vec<TranscriptId>> {
let filename = self.get_ecmatrix_file();
parse_ecmatrix(&filename)
}
pub fn parse_transcript(&self) -> HashMap<TranscriptId, Transcriptname> {
let filename = self.get_transcript_file();
parse_transcript(&filename)
}
pub fn get_cb_size(&self) -> usize {
let cb_iter_tmp = self.get_iterator().groupby_cb();
cb_iter_tmp.count()
}
pub fn get_cbumi_size(&self) -> usize {
let cb_iter_tmp = self.get_iterator().groupby_cbumi();
cb_iter_tmp.count()
}
pub fn make_mapper(&self, t2g_file: &str) -> Ec2GeneMapper {
make_mapper(self, t2g_file)
}
pub fn make_mapper_transcript(&self) -> Ec2TranscriptMapper {
make_mapper_transcript(self)
}
}
pub fn group_record_by_cb_umi(record_list: Vec<BusRecord>) -> HashMap<(u64, u64), Vec<BusRecord>> {
let mut cbumi_map: HashMap<(u64, u64), Vec<BusRecord>> = HashMap::new();
for r in record_list {
let rlist = cbumi_map.entry((r.CB, r.UMI)).or_default(); rlist.push(r);
}
cbumi_map
}
pub fn setup_busfile(records: &Vec<BusRecord>) -> (String, TempDir) {
use tempfile::tempdir;
let dir = tempdir().unwrap();
let file_path = dir.path().join("output.corrected.sort.bus");
let tmpfilename = file_path.to_str().unwrap();
let mut writer = BusWriterPlain::new(tmpfilename, BusParams {cb_len: 16, umi_len: 12});
writer.write_records(records);
(tmpfilename.to_string(), dir)
}
pub fn write_partial_busfile(bfile: &str, boutfile: &str, nrecords: usize) {
let busiter = BusReaderPlain::new(bfile);
let mut buswriter = BusWriterPlain::new(boutfile, busiter.params.clone());
for record in busiter.take(nrecords) {
buswriter.write_record(&record);
}
}
#[cfg(test)]
mod tests {
use crate::consistent_genes::EC;
use crate::consistent_transcripts::TranscriptId;
use crate::io::{setup_busfile, BusHeader, BusReaderPlain, BusRecord, BusWriterPlain, BusParams};
use crate::iterators::CellGroupIterator;
use std::io::{BufReader, Read, Write};
use tempfile::tempdir;
#[test]
fn test_read_write_header() {
let r1 = BusRecord { CB: 1, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
let dir = tempdir().unwrap();
let file_path = dir.path().join("test_read_write_header.bus");
let busname = file_path.to_str().unwrap();
let mut writer = BusWriterPlain::new(busname, BusParams {cb_len: 16, umi_len: 12});
writer.write_record(&r1);
writer.writer.flush().unwrap();
let bheader = BusHeader::from_file(&busname);
let header = BusHeader::new(16, 12, 20, false);
assert_eq!(header.magic, bheader.magic);
assert_eq!(header.cb_len, bheader.cb_len);
assert_eq!(header.umi_len, bheader.umi_len);
assert_eq!(header.version, bheader.version);
}
#[test]
fn test_read_write() {
let r1 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
let r2 = BusRecord { CB: 1, UMI: 21, EC: 1, COUNT: 2, FLAG: 0 };
let rlist = vec![r1, r2]; let (busname, _dir) = setup_busfile(&rlist);
let reader = BusReaderPlain::new(&busname);
let records: Vec<BusRecord> = reader.into_iter().collect();
assert_eq!(records, rlist)
}
use std::fs::File;
use std::io::BufWriter;
use super::parse_ecmatrix;
#[test]
fn test_ecmatrix() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("foo.txt");
let tmpfilename = file_path.to_str().unwrap();
let file = File::create(tmpfilename).expect("Unable to create file");
let mut f = BufWriter::new(file);
let data = "0 1,2,3\n1 3,4\n2 4";
f.write_all(data.as_bytes()).expect("Unable to write data");
f.flush().unwrap();
let ec = parse_ecmatrix(tmpfilename);
let e1 = ec.get(&EC(0)).unwrap();
assert_eq!(*e1, vec![TranscriptId(1), TranscriptId(2), TranscriptId(3)]);
let e1 = ec.get(&EC(1)).unwrap();
assert_eq!(*e1, vec![TranscriptId(3), TranscriptId(4)]);
}
use super::group_record_by_cb_umi;
#[test]
fn test_grouping() {
let r1 = BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 12, FLAG: 0 };
let r2 = BusRecord { CB: 0, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
let r3 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
let r4 = BusRecord { CB: 1, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
let r5 = BusRecord { CB: 1, UMI: 2, EC: 1, COUNT: 2, FLAG: 0 };
let r6 = BusRecord { CB: 1, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
let records = vec![
r1.clone(),
r2.clone(),
r3.clone(),
r4.clone(),
r5.clone(),
r6.clone(),
];
let grouped = group_record_by_cb_umi(records);
let g1 = grouped.get(&(0 as u64, 1 as u64)).unwrap();
assert_eq!(*g1, vec![r1, r2]);
let g2 = grouped.get(&(0 as u64, 2 as u64)).unwrap();
assert_eq!(*g2, vec![r3]);
let g3 = grouped.get(&(1 as u64, 1 as u64)).unwrap();
assert_eq!(*g3, vec![r4, r6]);
let g4 = grouped.get(&(1 as u64, 2 as u64)).unwrap();
assert_eq!(*g4, vec![r5]);
}
#[test]
fn test_insta_binary_format_write() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("test.bus");
let tmpfilename = file_path.to_str().unwrap();
let mut wri = BusWriterPlain::new(tmpfilename, BusParams {cb_len: 16, umi_len: 12});
let b = BusRecord {CB: 1, UMI: 1, EC: 1, COUNT: 3, FLAG: 0};
let c = BusRecord {CB: 0, UMI: 1, EC: 1, COUNT: 3, FLAG: 0};
let records = vec![b, c];
wri.write_iterator(records.into_iter());
let file = File::open(&tmpfilename).unwrap();
let mut reader = BufReader::new(file);
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).unwrap();
let digest_str = format!("{:x}", md5::compute(buffer));
insta::assert_yaml_snapshot!(digest_str, @r###"
---
3b49ecec15ae50a3cf2b7cfd37441ea1
"###);
}
#[test]
fn test_insta_binary_format_read() {
let external_file = "/home/michi/bus_testing/bus_output_short/output.corrected.sort.bus";
let records: Vec<BusRecord> = BusReaderPlain::new(external_file).step_by(10000).take(100).collect();
insta::assert_yaml_snapshot!(records)
}
#[test]
fn test_dyn_from_read_file(){
let r1 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
let r2 = BusRecord { CB: 1, UMI: 21, EC: 1, COUNT: 2, FLAG: 0 };
let rlist = vec![r1.clone(), r2.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let f = File::open(busname).unwrap();
let mut r = BusReaderPlain::from_read(f);
assert_eq!(r.params, BusParams {cb_len: 16, umi_len: 12});
assert_eq!(r.next(), Some(r1));
assert_eq!(r.next(), Some(r2));
assert_eq!(r.next(), None);
}
#[test]
fn test_dyn_from_file(){
let r1 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
let r2 = BusRecord { CB: 1, UMI: 21, EC: 1, COUNT: 2, FLAG: 0 };
let rlist = vec![r1.clone(), r2.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let mut r = BusReaderPlain::new(&busname);
assert_eq!(r.params, BusParams {cb_len: 16, umi_len: 12});
assert_eq!(r.next(), Some(r1));
assert_eq!(r.next(), Some(r2));
assert_eq!(r.next(), None);
}
#[test]
fn test_dyn_from_mem(){
let r1 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
let r2 = BusRecord { CB: 1, UMI: 21, EC: 1, COUNT: 2, FLAG: 0 };
let rlist = vec![r1.clone(), r2.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let mut f = File::open(busname).unwrap();
let mut buffer = Vec::new();
f.read_to_end(&mut buffer).unwrap();
let mut r = BusReaderPlain::from_read(buffer.as_slice());
assert_eq!(r.params, BusParams {cb_len: 16, umi_len: 12});
assert_eq!(r.next(), Some(r1));
assert_eq!(r.next(), Some(r2));
assert_eq!(r.next(), None);
}
#[test]
fn test_dyn_group(){
let r1 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
let r2 = BusRecord { CB: 0, UMI: 3, EC: 0, COUNT: 12, FLAG: 0 };
let r3 = BusRecord { CB: 1, UMI: 21, EC: 1, COUNT: 2, FLAG: 0 };
let rlist = vec![r1.clone(), r2.clone(), r3.clone()]; let (busname, _dir) = setup_busfile(&rlist);
let mut f = File::open(busname).unwrap();
let mut buffer = Vec::new();
f.read_to_end(&mut buffer).unwrap();
let r = BusReaderPlain::from_read(buffer.as_slice());
let mut iter = r.groupby_cb();
assert_eq!(iter.next(), Some((0, vec![r1, r2])));
assert_eq!(iter.next(), Some((1, vec![r3])));
}
}