#![cfg(target_family = "unix")]
use std::fs;
use std::io;
use std::io::Read;
use std::io::Write;
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::sync::Mutex;
use rayon::iter::{ParallelBridge, ParallelIterator};
use crate::errors;
use crate::io::fasta;
// Command-line arguments for the `prot2kmer2lca` subcommand.
//
// NOTE: plain `//` comments are used deliberately in this struct. With `StructOpt`, `///` doc
// comments become the generated `--help` text, so adding them would change program output.
#[derive(Debug, StructOpt)]
#[structopt(verbatim_doc_comment)]
pub struct ProtToKmerToLca {
    // Length `k` of the k-mers cut out of each protein sequence.
    #[structopt(short = "k", long = "length", default_value = "9")]
    pub length: usize,
    // When set, k-mers that are not found in the index are reported as `0` instead of being
    // skipped, so every k-mer of a processed protein produces exactly one output line.
    #[structopt(short = "o", long = "one-on-one")]
    pub one_on_one: bool,
    // Path to the FST index that maps k-mers to LCA values.
    #[structopt(parse(from_os_str))]
    pub fst_file: PathBuf,
    // If given, listen on a Unix socket at this path and serve each connection, instead of
    // reading from stdin and writing to stdout.
    #[structopt(parse(from_os_str), short = "s", long = "socket")]
    pub socket: Option<PathBuf>,
    // Read the whole FST file into memory instead of opening it with `fst::Map::from_path`.
    #[structopt(short = "m", long = "in-memory")]
    pub fst_in_memory: bool,
    // Number of FASTA records grouped into one unit of parallel work.
    #[structopt(short = "c", long = "chunksize", default_value = "240")]
    pub chunk_size: usize,
}
/// Runs the `prot2kmer2lca` subcommand.
///
/// Loads the FST index from `args.fst_file` and maps every k-mer of each input protein to its
/// LCA. With `--in-memory` the file is read fully into memory; otherwise it is opened with
/// `fst::Map::from_path`.
///
/// * Without `--socket`, FASTA is read from stdin and results are written to stdout.
/// * With `--socket`, a Unix socket is bound at the given path and accepted connections are
///   served one after another. Each connection is used both as input and as output. Status
///   messages go to stdout, and a failing connection is reported before the next one is
///   accepted.
///
/// # Errors
///
/// Returns an error in these cases:
///
/// * the FST cannot be read or parsed;
/// * the socket cannot be bound;
/// * in stdin/stdout mode, processing the stream fails.
///
/// Errors of individual socket connections are printed, never returned.
pub fn prot2kmer2lca(args: ProtToKmerToLca) -> errors::Result<()> {
    let fst = if args.fst_in_memory {
        let bytes = fs::read(&args.fst_file)?;
        fst::Map::from_bytes(bytes)?
    } else {
        // SAFETY: `from_path` memory-maps the file. The mapping is only sound while no other
        // process modifies or truncates the FST file, which this command assumes for its whole
        // lifetime.
        unsafe { fst::Map::from_path(&args.fst_file) }?
    };

    // With `--one-on-one`, k-mers absent from the index are reported as 0 instead of being
    // dropped, so every k-mer produces an output line.
    let default = if args.one_on_one { Some(0) } else { None };

    if let Some(socket_addr) = &args.socket {
        let listener = UnixListener::bind(socket_addr)?;
        println!("Socket created, listening for connections.");
        // `incoming()` never yields `None`, so this serves connections until the process is
        // stopped. Connections are handled one at a time.
        listener
            .incoming()
            .map(|stream| {
                // Only announce the connection once `accept` has actually succeeded.
                let stream = stream?;
                println!("Connection accepted. Processing...");
                stream_prot2kmer2lca(
                    &stream,
                    &stream,
                    &fst,
                    args.length,
                    args.chunk_size,
                    default,
                )
            })
            .for_each(|result| match result {
                Ok(_) => println!("Connection finished successfully."),
                Err(e) => println!("Connection died with an error: {}", e),
            });
        Ok(())
    } else {
        stream_prot2kmer2lca(
            io::stdin(),
            io::stdout(),
            &fst,
            args.length,
            args.chunk_size,
            default,
        )
    }
}
/// Streams FASTA records from `input`, looks up every k-mer of each protein in `fst`, and
/// writes the resulting LCAs to `output`.
///
/// Only the first entry of each record's `sequence` is inspected. If it is at least `k` bytes
/// long, the output contains the record header as `>header` followed by one line per k-mer for
/// which a value was found. When `default` is `Some(d)`, k-mers missing from `fst` produce `d`
/// instead of being skipped. Records that are shorter than `k`, or have no sequence, are
/// omitted entirely.
///
/// Records are processed in chunks of `chunk_size` on the rayon thread pool via `par_bridge`,
/// which does not preserve order. Each chunk's output is written in one piece, but chunks may
/// appear in a different order than they were read.
///
/// # Errors
///
/// Returns an error if reading a chunk of records fails, if writing to `output` fails, or if
/// the final flush of `output` fails.
///
/// # Panics
///
/// Panics if the output mutex is poisoned, i.e. another worker panicked while writing.
fn stream_prot2kmer2lca<R, W>(
    input: R,
    output: W,
    fst: &fst::Map,
    k: usize,
    chunk_size: usize,
    default: Option<u64>,
) -> errors::Result<()>
where
    R: Read + Send,
    W: Write + Send,
{
    let output_mutex = Mutex::new(output);
    fasta::Reader::new(input, true)
        .records()
        .chunked(chunk_size)
        .par_bridge()
        .map(|chunk| {
            let chunk = chunk?;
            let mut chunk_output = String::new();
            for read in chunk {
                // Work on bytes: slicing a `str` at an offset that is not a char boundary
                // panics, and non-ASCII input must not crash a worker thread.
                let prot = match read.sequence.first() {
                    Some(prot) if prot.len() >= k => prot.as_bytes(),
                    _ => continue,
                };
                chunk_output.push_str(&format!(">{}\n", read.header));
                // Every window of `k` bytes is a k-mer; each found LCA goes on its own line.
                for lca in (0..=prot.len() - k)
                    .map(|i| &prot[i..i + k])
                    .filter_map(|kmer| fst.get(kmer).or(default))
                {
                    chunk_output.push_str(&lca.to_string());
                    chunk_output.push('\n');
                }
            }
            // Write the whole chunk under a single lock so output of different chunks never
            // interleaves.
            output_mutex
                .lock()
                .unwrap()
                .write_all(chunk_output.as_bytes())?;
            Ok(())
        })
        .collect::<errors::Result<()>>()?;
    // Flush explicitly: a buffered writer would otherwise flush on drop and discard any error.
    let mut output = output_mutex
        .into_inner()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    output.flush()?;
    Ok(())
}