use std::fmt;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::thread;
use anyhow::{bail, Context, Result};
use regex::bytes::{CaptureLocations, Regex};
use seq_geom_parser::{FragmentGeomDesc, GeomLen, GeomPiece, NucStr};
use needletail::{parse_fastx_file, Sequence};
use thousands::Separable;
use tracing::info;
use nix::sys::stat;
use nix::unistd;
use tempfile::tempdir;
#[derive(Debug)]
pub struct FragmentRegexDesc {
pub r1_cginfo: Vec<GeomPiece>,
pub r2_cginfo: Vec<GeomPiece>,
pub r1_re: Regex,
pub r2_re: Regex,
r1_clocs: CaptureLocations,
r2_clocs: CaptureLocations,
}
#[derive(Debug)]
pub struct SeqPair {
pub s1: String,
pub s2: String,
}
impl SeqPair {
pub fn new() -> Self {
SeqPair {
s1: String::new(),
s2: String::new(),
}
}
fn clear(&mut self) {
self.s1.clear();
self.s2.clear();
}
}
impl Default for SeqPair {
fn default() -> Self {
Self::new()
}
}
const BOUNDED_RANGE_LIMIT: u32 = 4;
const VAR_LEN_BC_PADDING: &[&str] = &["A", "AC", "AAG", "AAAT"];
#[inline(always)]
fn parse_single_read(
clocs: &CaptureLocations,
gpieces: &Vec<GeomPiece>,
r: &str,
outstr: &mut String,
) -> bool {
if gpieces.len() == 1 {
outstr.push_str(r);
} else {
for cl in 1..clocs.len() {
if let Some(g) = clocs.get(cl) {
outstr.push_str(r.get(g.0..g.1).unwrap());
match gpieces.get(cl - 1) {
Some(GeomPiece::Barcode(GeomLen::LenRange(_l, h)))
| Some(GeomPiece::Umi(GeomLen::LenRange(_l, h)))
| Some(GeomPiece::ReadSeq(GeomLen::LenRange(_l, h))) => {
let captured_len = g.1 - g.0;
outstr.push_str(VAR_LEN_BC_PADDING[(*h as usize) - (captured_len)]);
}
_ => {
}
}
} else {
return false;
}
}
}
true
}
fn get_simplified_piscem_string(geo_pieces: &[GeomPiece]) -> String {
let mut rep = String::new();
for gp in geo_pieces {
match gp {
GeomPiece::Discard(GeomLen::FixedLen(x)) => {
rep += &format!("x[{}]", x);
}
GeomPiece::Barcode(GeomLen::FixedLen(x)) => {
rep += &format!("b[{}]", x);
}
GeomPiece::Umi(GeomLen::FixedLen(x)) => {
rep += &format!("u[{}]", x);
}
GeomPiece::ReadSeq(GeomLen::FixedLen(x)) => {
rep += &format!("r[{}]", x);
}
GeomPiece::Discard(GeomLen::LenRange(_l, h)) => {
rep += &format!("x[{}]", h + 1);
}
GeomPiece::Barcode(GeomLen::LenRange(_l, h)) => {
rep += &format!("b[{}]", h + 1);
}
GeomPiece::Umi(GeomLen::LenRange(_l, h)) => {
rep += &format!("u[{}]", h + 1);
}
GeomPiece::ReadSeq(GeomLen::LenRange(_l, h)) => {
rep += &format!("r[{}]", h + 1);
}
GeomPiece::Discard(GeomLen::Unbounded) => {
rep += "x:";
}
GeomPiece::Barcode(GeomLen::Unbounded) => {
rep += "b:";
}
GeomPiece::Umi(GeomLen::Unbounded) => {
rep += "u:";
}
GeomPiece::ReadSeq(GeomLen::Unbounded) => {
rep += "r:";
}
_ => {
unimplemented!();
}
}
}
rep
}
fn get_simplified_geo(gp: &GeomPiece) -> GeomPiece {
match gp {
GeomPiece::Discard(GeomLen::LenRange(_l, h)) => {
GeomPiece::Discard(GeomLen::FixedLen(h + 1))
}
GeomPiece::Barcode(GeomLen::LenRange(_l, h)) => {
GeomPiece::Barcode(GeomLen::FixedLen(h + 1))
}
GeomPiece::Umi(GeomLen::LenRange(_l, h)) => GeomPiece::Umi(GeomLen::FixedLen(h + 1)),
GeomPiece::ReadSeq(GeomLen::LenRange(_l, h)) => {
GeomPiece::ReadSeq(GeomLen::FixedLen(h + 1))
}
_ => gp.clone(),
}
}
impl FragmentRegexDesc {
pub fn parse_into(&mut self, r1: &[u8], r2: &[u8], sp: &mut SeqPair) -> bool {
sp.clear();
let m1 = self.r1_re.captures_read(&mut self.r1_clocs, r1);
let m2 = self.r2_re.captures_read(&mut self.r2_clocs, r2);
if m1.or(m2).is_none() {
return false;
}
let s1 = unsafe { std::str::from_utf8_unchecked(r1) };
let s2 = unsafe { std::str::from_utf8_unchecked(r2) };
let parsed_r1 = parse_single_read(&self.r1_clocs, &self.r1_cginfo, s1, &mut sp.s1);
if parsed_r1 {
parse_single_read(&self.r2_clocs, &self.r2_cginfo, s2, &mut sp.s2)
} else {
false
}
}
pub fn get_simplified_geo_desc(&self) -> FragmentGeomDesc {
FragmentGeomDesc {
read1_desc: self
.r1_cginfo
.iter()
.map(get_simplified_geo)
.collect::<Vec<GeomPiece>>(),
read2_desc: self
.r2_cginfo
.iter()
.map(get_simplified_geo)
.collect::<Vec<GeomPiece>>(),
}
}
pub fn get_simplified_description_string(&self) -> String {
let mut rep = String::from("");
if !self.r1_cginfo.is_empty() {
let d = get_simplified_piscem_string(&self.r1_cginfo);
rep += &format!("1{{{}}}", d);
}
if !self.r2_cginfo.is_empty() {
let d = get_simplified_piscem_string(&self.r2_cginfo);
rep += &format!("2{{{}}}", d);
}
rep
}
}
pub trait FragmentGeomDescExt {
fn as_regex(&self) -> Result<FragmentRegexDesc, anyhow::Error>;
}
fn geom_piece_as_regex_string(gp: &GeomPiece) -> Result<(String, Option<GeomPiece>)> {
let mut rep = String::from("");
let mut geo = None;
match gp {
GeomPiece::Discard(GeomLen::FixedLen(x)) => {
rep.push_str(&format!(r#"[ACGTN]{{{}}}"#, x));
}
GeomPiece::Barcode(GeomLen::FixedLen(x))
| GeomPiece::Umi(GeomLen::FixedLen(x))
| GeomPiece::ReadSeq(GeomLen::FixedLen(x)) => {
rep.push_str(&format!(r#"([ACGTN]{{{}}})"#, x));
geo = Some(gp.clone());
}
GeomPiece::Discard(GeomLen::LenRange(l, h)) => {
if h - l > BOUNDED_RANGE_LIMIT {
bail!("Bounded range can have variable width at most {} but the current element {:?} has variable width {}.",
BOUNDED_RANGE_LIMIT, &gp, h-l);
}
rep.push_str(&format!(r#"[ACGTN]{{{},{}}}"#, l, h));
}
GeomPiece::Barcode(GeomLen::LenRange(l, h))
| GeomPiece::Umi(GeomLen::LenRange(l, h))
| GeomPiece::ReadSeq(GeomLen::LenRange(l, h)) => {
if h - l > BOUNDED_RANGE_LIMIT {
bail!("Bounded range can have variable width at most {} but the current element {:?} has variable width {}.",
BOUNDED_RANGE_LIMIT, &gp, h-l);
}
rep.push_str(&format!(r#"([ACGTN]{{{},{}}})"#, l, h));
geo = Some(gp.clone());
}
GeomPiece::Fixed(NucStr::Seq(s)) => {
rep.push_str(s);
}
GeomPiece::Discard(GeomLen::Unbounded) => {
rep += r#"[ACGTN]*"#;
}
GeomPiece::Barcode(GeomLen::Unbounded)
| GeomPiece::Umi(GeomLen::Unbounded)
| GeomPiece::ReadSeq(GeomLen::Unbounded) => {
rep += r#"([ACGTN]*)"#;
geo = Some(gp.clone());
}
}
Ok((rep, geo))
}
impl FragmentGeomDescExt for FragmentGeomDesc {
fn as_regex(&self) -> Result<FragmentRegexDesc, anyhow::Error> {
let mut r1_re_str = String::from("^");
let mut r1_cginfo = Vec::<GeomPiece>::new();
for geo_piece in &self.read1_desc {
let (str_piece, geo_len) = geom_piece_as_regex_string(geo_piece)?;
r1_re_str.push_str(&str_piece);
if let Some(elem) = geo_len {
r1_cginfo.push(elem);
}
}
if let Some(geo_piece) = &self.read1_desc.last() {
if geo_piece.is_fixed_len() {
let (str_piece, _geo_len) =
geom_piece_as_regex_string(&GeomPiece::Discard(GeomLen::Unbounded))?;
r1_re_str.push_str(&str_piece);
}
}
r1_re_str.push('$');
let mut r2_re_str = String::from("^");
let mut r2_cginfo = Vec::<GeomPiece>::new();
for geo_piece in &self.read2_desc {
let (str_piece, geo_len) = geom_piece_as_regex_string(geo_piece)?;
r2_re_str.push_str(&str_piece);
if let Some(elem) = geo_len {
r2_cginfo.push(elem);
}
}
if let Some(geo_piece) = &self.read2_desc.last() {
if geo_piece.is_fixed_len() {
let (str_piece, _geo_len) =
geom_piece_as_regex_string(&GeomPiece::Discard(GeomLen::Unbounded))?;
r2_re_str.push_str(&str_piece);
}
}
r2_re_str.push('$');
let r1_re = Regex::new(&r1_re_str)
.with_context(|| format!("Could not compile {} into regex description", r1_re_str))?;
let r2_re = Regex::new(&r2_re_str)
.with_context(|| format!("Could not compile {} into regex description", r2_re_str))?;
let cloc1 = r1_re.capture_locations();
let cloc2 = r2_re.capture_locations();
Ok(FragmentRegexDesc {
r1_cginfo,
r2_cginfo,
r1_re,
r2_re,
r1_clocs: cloc1,
r2_clocs: cloc2,
})
}
}
#[derive(Debug)]
pub struct FifoXFormData {
pub r1_fifo: PathBuf,
pub r2_fifo: PathBuf,
pub join_handle: thread::JoinHandle<Result<XformStats>>,
}
#[derive(Debug)]
pub struct XformStats {
pub total_fragments: u64,
pub failed_parsing: u64,
}
impl XformStats {
pub fn new() -> Self {
Self {
total_fragments: 0u64,
failed_parsing: 0u64,
}
}
}
impl Default for XformStats {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for XformStats {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
r#"XformStats {{
total fragments: {},
fragments failing parsing: {},
percentage successfully transformed fragments: {:.2},
}}"#,
self.total_fragments.separate_with_commas(),
self.failed_parsing.separate_with_commas(),
if self.total_fragments > 0 {
1_f64 - ((self.failed_parsing as f64) / (self.total_fragments as f64))
} else {
1_f64
} * 100_f64
)
}
}
pub fn xform_read_pairs_to_file(
mut geo_re: FragmentRegexDesc,
r1: &[PathBuf],
r2: &[PathBuf],
r1_ofile: PathBuf,
r2_ofile: PathBuf,
) -> Result<XformStats> {
let f1 = File::create(r1_ofile).expect("Unable to open read 1 file");
let f2 = File::create(r2_ofile).expect("Unable to open read 2 file");
let mut stream1 = BufWriter::new(f1);
let mut stream2 = BufWriter::new(f2);
let mut xform_stats = XformStats::new();
let mut parsed_records = SeqPair::new();
for (filename1, filename2) in r1.iter().zip(r2.iter()) {
let mut reader = parse_fastx_file(filename1).expect("valid path/file");
let mut reader2 = parse_fastx_file(filename2).expect("valid path/file");
while let (Some(record), Some(record2)) = (reader.next(), reader2.next()) {
xform_stats.total_fragments += 1;
let seqrec = record.expect("invalid record");
let seqrec2 = record2.expect("invalid record");
if geo_re.parse_into(seqrec.sequence(), seqrec2.sequence(), &mut parsed_records) {
unsafe {
std::write!(
&mut stream1,
">{}\n{}\n",
std::str::from_utf8_unchecked(seqrec.id()),
parsed_records.s1
)
.expect("couldn't write output to file 1");
std::write!(
&mut stream2,
">{}\n{}\n",
std::str::from_utf8_unchecked(seqrec2.id()),
parsed_records.s2
)
.expect("couldn't write output to file 2");
}
} else {
xform_stats.failed_parsing += 1;
}
}
}
Ok(xform_stats)
}
pub fn xform_read_pairs_to_fifo(
geo_re: FragmentRegexDesc,
r1: Vec<PathBuf>,
r2: Vec<PathBuf>,
) -> Result<FifoXFormData> {
if r1.len() != r2.len() {
bail!(
"The number of R1 files ({}) must match the number of R2 files ({})",
r1.len(),
r2.len()
);
}
let tmp_dir = tempdir()?;
let r1_fifo = tmp_dir.path().join("r1.pipe");
let r2_fifo = tmp_dir.path().join("r2.pipe");
match unistd::mkfifo(&r1_fifo, stat::Mode::S_IRWXU) {
Ok(_) => {
info!("created {:?}", r1_fifo);
assert!(std::path::Path::new(&r1_fifo).exists());
}
Err(err) => bail!("Error creating read 1 fifo: {}", err),
}
match unistd::mkfifo(&r2_fifo, stat::Mode::S_IRWXU) {
Ok(_) => {
info!("created {:?}", r2_fifo);
assert!(std::path::Path::new(&r2_fifo).exists());
}
Err(err) => bail!("Error creating read 2 fifo: {}", err),
}
let r1_fifo_clone = r1_fifo.clone();
let r2_fifo_clone = r2_fifo.clone();
let join_handle: thread::JoinHandle<Result<XformStats>> = thread::spawn(move || {
let xform_stats = xform_read_pairs_to_file(geo_re, &r1, &r2, r1_fifo_clone, r2_fifo_clone)?;
match tmp_dir.close() {
Ok(_) => Ok(xform_stats),
Err(e) => {
bail!("When closing (deleting) the temp directory, the following error was encountered {:?}", e);
}
}
});
Ok(FifoXFormData {
r1_fifo,
r2_fifo,
join_handle,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sciseq3_transforms() {
let test_technical_reads = vec![
("TNGCGCATTCAGAGCGCCACTTTCGGAAGATATTTT", true, 9),
("TNTATACCTTCAGAGCGTGAGGATGTCCTAGAGGTT", true, 10),
("AGAGATGAATCAGAGCTGTGCCGGGCTAACCTCATT", true, 10),
("TGAACGCGTTTTTTTTTTTTTTTTTTTTTTTTTTTT", false, 0),
("AAACTCCAATCAGAGCTCCGAGACAACCATTGGATT", true, 10),
("ACGAGGTTTCTGAGCCGATAAAGTGATGGCCTTTTT", false, 0),
("GCTCTTAGTCAGAGCCGTTTTGGGCGACGCCTTTTT", true, 9),
("TCCGTATGTCAGAGCGACTGATGTTATAGCAGATTT", true, 9),
("TCTCTCCATCAGAGCAAAAGATTCATTCAATCATTC", true, 9),
("AGAACTCCTCTGAGCAATGTCGCTTATTCTGAGTTT", false, 0),
("AAGTATTGGTCAGAGCTACGCATTACGCAACTCCTT", true, 10),
("TGTCCTTATTCAGAGCCCATTTACGCCACGCAGCTC", true, 10),
("GCGCTCAATCAGAGCCGGTGGAAAGACTCAAGCCCT", true, 9),
("TTCTTAACCTCAGAGCTGACTAGTACTAGAGAGTTT", true, 10),
("TCCTCGAGTCAGAGCGTCCTGGCTTAATTATTGTTT", true, 9),
("AACTGGCATCAGAGCCCTCTAATGATCGCTTCTTTT", true, 9),
("TATGCGATTTCAGAGCGCGGGGGGCCGAGAATCCTT", true, 10),
("TAGTTACCTTCAGAGCGGTTTACACATCCGACTATT", true, 10),
("CAAGCAACTCAGAGCTTTTTCTTTCGCGGTTGGTTT", true, 9),
("ACCGTAGCTCAGAGCGGCCAGTTACGCAACTCCTTT", true, 9),
("CCAAGGATTCAGAGCGGCGCGCCATCCATGACTTTT", true, 9),
("TTACTAAGTCAGAGCAGAGGGACGCCAGGATCTTTT", true, 9),
("GTAGCGATTCAGAGCAGGCGTGATTATAGCAGATTT", true, 9),
("AGCAACGATCTGAGCATATTCAGACCGCGCAACCAT", false, 0),
("ATTAATGCCTCAGAGCACGGTACAGATCTTACGCTT", true, 10),
];
let geo = FragmentGeomDesc::try_from("1{b[9-10]f[CAGAGC]u[8]b[10]}2{r:}").unwrap();
let mut sp = SeqPair::new();
match geo.as_regex() {
Ok(mut geo_re) => {
for (tr, should_parse, pref_len) in &test_technical_reads {
let r = geo_re.parse_into(tr.as_bytes(), tr.as_bytes(), &mut sp);
assert_eq!(r, *should_parse);
if r {
dbg!("tr = {}, sp = {:?}", &tr, &sp);
match pref_len {
9 => {
assert_eq!(&sp.s1[9..11], VAR_LEN_BC_PADDING[1]);
}
10 => {
assert_eq!(&sp.s1[10..11], VAR_LEN_BC_PADDING[0]);
}
_ => {
panic!("shouldn't happen");
}
}
}
}
}
Err(e) => {
panic!("{:?} : couldn't parse valid geometry!", e);
}
}
}
}