use crate::cli::BedType;
use crate::config::Config;
use crate::detect::{detect_input_kind, Compression, InputFormat};
use crate::error::Result;
use crate::memory::max_mem_usage_mb;
use genepred::{Bed12, Bed3, Bed4, Bed5, Bed6, Bed9, GenePred, Gff, Gtf, Reader, ReaderOptions};
use genepred::{Writer, WriterOptions};
use rayon::prelude::*;
use std::io::{BufWriter, Write};
use std::path::Path;
use std::time::{Duration, Instant};
/// Summary statistics produced by a single call to `run`.
///
/// Derives `PartialEq` so results can be compared directly (e.g. in tests); `Eq` is
/// not derivable because of the `f64` field.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RunStats {
    /// Wall-clock time from the start of `run` until the output file has been written.
    pub elapsed: Duration,
    /// Difference between the `max_mem_usage_mb()` readings taken at the start and at
    /// the end of `run`, clamped so it is never negative.
    pub mem_delta_mb: f64,
}
/// Converts the GTF/GFF input named in `config` into a BED file at `config.output`.
///
/// Parsing and rendering run inside a dedicated rayon pool sized by `config.threads`.
/// The rendered chunks are then written to disk in one pass.
///
/// # Errors
/// Propagates failures from input detection, thread-pool construction, processing,
/// and writing the output file.
pub fn run(config: &Config) -> Result<RunStats> {
    let timer = Instant::now();
    let baseline_mb = max_mem_usage_mb();

    let reader_opts = build_reader_options(config);
    let writer_opts = build_writer_options(config);
    let kind = detect_input_kind(&config.input)?;

    // A private pool keeps `config.threads` from affecting rayon's global pool.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(config.threads)
        .build()?;
    let rendered = pool.install(|| {
        process_input(&config.input, &kind, reader_opts, config, &writer_opts)
    })?;

    write_output(&config.output, rendered)?;

    let elapsed = timer.elapsed();
    let mem_delta_mb = (max_mem_usage_mb() - baseline_mb).max(0.0);
    Ok(RunStats {
        elapsed,
        mem_delta_mb,
    })
}
/// Dispatches to the format-specific pipeline for the detected input format.
///
/// GTF and GFF share the same pipeline; only the record type used for `process_reader`
/// differs. The input's compression is forwarded so the right reader constructor is chosen.
fn process_input(
    path: &Path,
    input_kind: &crate::detect::InputKind,
    reader_options: ReaderOptions<'_>,
    config: &Config,
    writer_options: &WriterOptions,
) -> Result<Vec<(usize, Vec<u8>)>> {
    let compression = input_kind.compression;
    match input_kind.format {
        InputFormat::Gtf => {
            process_reader::<Gtf>(path, reader_options, config, writer_options, compression)
        }
        InputFormat::Gff => {
            process_reader::<Gff>(path, reader_options, config, writer_options, compression)
        }
    }
}
/// Reads `path` as format `F` and renders its records to BED bytes chunk by chunk in parallel.
///
/// Returns `(chunk_index, bytes)` pairs sorted by chunk index, ready to be concatenated.
///
/// # Errors
/// Fails if the reader cannot be opened or split into chunks, or if any chunk fails to
/// parse or render. When several chunks fail, the error returned is the first one in the
/// order the per-chunk results were collected.
fn process_reader<F>(
    path: &Path,
    reader_options: ReaderOptions<'_>,
    config: &Config,
    writer_options: &WriterOptions,
    compression: Compression,
) -> Result<Vec<(usize, Vec<u8>)>>
where
    F: GxfReader + Send,
{
    let reader = open_reader::<F>(path, reader_options, compression)?;
    let rendered = reader
        .par_chunks(config.chunks)?
        .map(|(idx, chunk)| render_chunk(idx, chunk, config.bed_type, writer_options))
        .collect::<Vec<_>>();

    // Resolve per-chunk errors sequentially (std `FromIterator for Result`) rather than via a
    // parallel `Result` collect, so the reported error is deterministic: the first in order.
    let mut merged = rendered.into_iter().collect::<Result<Vec<_>>>()?;

    // Stable sort: should two chunks ever share an index, their collected order is kept.
    merged.sort_by_key(|&(idx, _)| idx);
    Ok(merged)
}
/// Opens `path` as a reader for format `F`, choosing the constructor by compression.
///
/// Compressed input uses `F::from_gxf_with_options`. Uncompressed input uses
/// `F::from_mmap_with_options`.
///
/// # Errors
/// Propagates (and converts) any error from the chosen constructor.
fn open_reader<F: GxfReader>(
    path: &Path,
    options: ReaderOptions<'_>,
    compression: Compression,
) -> Result<Reader<F>> {
    let opened = if compression.is_compressed() {
        F::from_gxf_with_options(path, options)
    } else {
        F::from_mmap_with_options(path, options)
    };
    Ok(opened?)
}
/// Translates the optional feature/attribute overrides in `config` into `ReaderOptions`.
///
/// The returned options borrow their byte strings from `config`. Each override is applied
/// only when its `config` field is `Some`. A child-feature list that contains no non-empty
/// names clears the reader's child features instead of setting them.
fn build_reader_options(config: &Config) -> ReaderOptions<'_> {
    let mut opts = ReaderOptions::default();

    if let Some(feature) = config.parent_feature.as_ref() {
        opts = opts.parent_feature(feature.as_bytes());
    }

    if let Some(children) = config.child_features.as_ref() {
        // Blank names are dropped before deciding whether anything was really requested.
        let non_empty: Vec<&[u8]> = children
            .iter()
            .filter_map(|name| (!name.is_empty()).then_some(name.as_bytes()))
            .collect();
        opts = if non_empty.is_empty() {
            opts.clear_child_features()
        } else {
            opts.child_features(non_empty)
        };
    }

    if let Some(attribute) = config.parent_attribute.as_ref() {
        opts = opts.parent_attribute(attribute.as_bytes());
    }
    if let Some(attribute) = config.child_attribute.as_ref() {
        opts = opts.child_attribute(attribute.as_bytes());
    }

    opts
}
/// Builds the BED writer options from `config`.
///
/// When `config.additional_fields` is set, the list is passed to `extras_allowlist` and
/// `include_non_numeric_extras(true)` is enabled. Otherwise the defaults are returned.
fn build_writer_options(config: &Config) -> WriterOptions {
    let defaults = WriterOptions::default();
    match &config.additional_fields {
        Some(fields) => defaults
            .extras_allowlist(fields.clone())
            .include_non_numeric_extras(true),
        None => defaults,
    }
}
/// Renders one chunk of parsed records into an in-memory BED byte buffer.
///
/// `idx` is returned unchanged so the caller can restore input order after chunks
/// have been rendered in parallel.
///
/// # Errors
/// Returns the first read error stored in the chunk, or any error raised while
/// writing a record. The partially filled buffer is discarded in that case.
fn render_chunk(
    idx: usize,
    chunk: Vec<genepred::ReaderResult<GenePred>>,
    bed_type: BedType,
    writer_options: &WriterOptions,
) -> Result<(usize, Vec<u8>)> {
    // Heuristic pre-size (~128 bytes per record) to limit reallocation. Records are written
    // straight into the Vec: it is already an in-memory buffer, so wrapping it in a
    // BufWriter only cost an extra 128 KiB allocation per chunk plus a second copy.
    let mut buffer: Vec<u8> = Vec::with_capacity(chunk.len().saturating_mul(128));
    for record in chunk {
        let record = record?;
        write_record(&record, &mut buffer, bed_type, writer_options)?;
    }
    Ok((idx, buffer))
}
/// Serialises one `GenePred` record to `writer` in the BED layout selected by `bed_type`.
///
/// The layout is chosen at runtime. Each arm then uses the statically typed
/// `Writer::<BedN>` for that layout and forwards `writer_options` unchanged.
///
/// # Errors
/// Propagates any error returned by the underlying `genepred` writer.
fn write_record<W: Write>(
    record: &GenePred,
    writer: &mut W,
    bed_type: BedType,
    writer_options: &WriterOptions,
) -> Result<()> {
    match bed_type {
        BedType::Bed3 => {
            Writer::<Bed3>::from_record_with_options(record, writer, writer_options)?;
        }
        BedType::Bed4 => {
            Writer::<Bed4>::from_record_with_options(record, writer, writer_options)?;
        }
        BedType::Bed5 => {
            Writer::<Bed5>::from_record_with_options(record, writer, writer_options)?;
        }
        BedType::Bed6 => {
            Writer::<Bed6>::from_record_with_options(record, writer, writer_options)?;
        }
        BedType::Bed9 => {
            Writer::<Bed9>::from_record_with_options(record, writer, writer_options)?;
        }
        BedType::Bed12 => {
            Writer::<Bed12>::from_record_with_options(record, writer, writer_options)?;
        }
    }
    Ok(())
}
/// Concatenates the rendered chunk buffers into the file at `path`, in vector order.
///
/// The file is created (or truncated) and written through a 256 KiB `BufWriter`. Chunk
/// indices are ignored here, so callers must pass chunks already in output order.
/// Each buffer is consumed, and so freed, as soon as it has been written.
///
/// # Errors
/// Propagates errors from creating, writing, or flushing the file.
fn write_output(path: &Path, chunks: Vec<(usize, Vec<u8>)>) -> Result<()> {
    let mut out = BufWriter::with_capacity(256 * 1024, std::fs::File::create(path)?);
    chunks
        .into_iter()
        .try_for_each(|(_, bytes)| out.write_all(&bytes))?;
    // Flush explicitly: errors during the implicit flush on drop would be swallowed.
    out.flush()?;
    Ok(())
}
/// Format-specific constructors for a `genepred` `Reader`.
///
/// The trait is implemented per GXF flavour so generic code can select a constructor
/// by type parameter. Implementors must also implement `genepred::BedFormat` and be
/// convertible into `GenePred`.
trait GxfReader: Sized + genepred::BedFormat + Into<GenePred> {
    /// Builds a reader through the format's `from_gxf_with_options` constructor.
    fn from_gxf_with_options<Src: AsRef<Path>>(
        source: Src,
        opts: ReaderOptions<'_>,
    ) -> genepred::ReaderResult<Reader<Self>>;

    /// Builds a reader through the format's `from_mmap_with_options` constructor.
    fn from_mmap_with_options<Src: AsRef<Path>>(
        source: Src,
        opts: ReaderOptions<'_>,
    ) -> genepred::ReaderResult<Reader<Self>>;
}
/// GTF input: both constructors delegate to the matching `Reader<Gtf>` constructor.
impl GxfReader for Gtf {
    fn from_gxf_with_options<Src: AsRef<Path>>(
        source: Src,
        opts: ReaderOptions<'_>,
    ) -> genepred::ReaderResult<Reader<Self>> {
        Reader::<Self>::from_gxf_with_options(source, opts)
    }

    fn from_mmap_with_options<Src: AsRef<Path>>(
        source: Src,
        opts: ReaderOptions<'_>,
    ) -> genepred::ReaderResult<Reader<Self>> {
        Reader::<Self>::from_mmap_with_options(source, opts)
    }
}
/// GFF input: both constructors delegate to the matching `Reader<Gff>` constructor.
impl GxfReader for Gff {
    fn from_gxf_with_options<Src: AsRef<Path>>(
        source: Src,
        opts: ReaderOptions<'_>,
    ) -> genepred::ReaderResult<Reader<Self>> {
        Reader::<Self>::from_gxf_with_options(source, opts)
    }

    fn from_mmap_with_options<Src: AsRef<Path>>(
        source: Src,
        opts: ReaderOptions<'_>,
    ) -> genepred::ReaderResult<Reader<Self>> {
        Reader::<Self>::from_mmap_with_options(source, opts)
    }
}