use anyhow::{anyhow, Result};
use std::path::{Path, PathBuf};
use super::fastx_io::PrefetchingIoHandler;
use super::parquet_io::PrefetchingParquetReader;
/// Settings that describe which classification input to open and how to read it.
///
/// The paths are borrowed for `'a`. Every field is `Copy`, so the whole
/// configuration derives `Copy` and can be passed around by value.
#[derive(Debug, Clone, Copy)]
pub struct InputReaderConfig<'a> {
    /// Path of the primary input file (the Parquet file, or the R1 FASTX file).
    pub r1_path: &'a Path,
    /// Optional separate R2 file. Only forwarded to the FASTX reader;
    /// `validate_input_config` rejects it when `is_parquet` is set.
    pub r2_path: Option<&'a PathBuf>,
    /// Batch size handed to whichever reader is constructed.
    pub batch_size: usize,
    /// Parallel row-group setting for the Parquet reader; `create_input_reader`
    /// passes `0` on as `None`.
    pub parallel_input_rg: usize,
    /// Selects the Parquet reader when `true`, the FASTX reader otherwise.
    pub is_parquet: bool,
    /// Forwarded unchanged to the reader.
    /// NOTE(review): presumably the length reads are trimmed to — confirm.
    pub trim_to: Option<usize>,
    /// Forwarded unchanged to the reader.
    /// NOTE(review): presumably reads shorter than this are skipped — confirm.
    pub minimum_length: Option<usize>,
}
/// An opened classification input, wrapping one of the two prefetching readers.
///
/// Built by `create_input_reader`, which picks the variant from
/// `InputReaderConfig::is_parquet`.
// The variant-size lint is silenced here instead of boxing one of the readers.
// NOTE(review): presumably fine because few instances exist at a time — confirm.
#[allow(clippy::large_enum_variant)]
pub enum ClassificationInput {
/// Input read through the prefetching Parquet reader.
Parquet(PrefetchingParquetReader),
/// Input read through the prefetching FASTX I/O handler.
Fastx(PrefetchingIoHandler),
}
impl ClassificationInput {
    /// Finishes the wrapped reader by delegating to that reader's own `finish`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying reader's `finish` reports.
    pub fn finish(&mut self) -> Result<()> {
        match self {
            Self::Fastx(handler) => handler.finish(),
            Self::Parquet(parquet) => parquet.finish(),
        }
    }
}
/// Checks that the requested combination of input options is supported.
///
/// The only rejected combination is Parquet input together with a separate R2
/// file; as the error message explains, paired-end Parquet data is expected in
/// a `sequence2` column instead.
///
/// # Errors
///
/// Returns an error when `is_parquet` is `true` and `r2_path` is `Some`.
pub fn validate_input_config(is_parquet: bool, r2_path: Option<&PathBuf>) -> Result<()> {
    match (is_parquet, r2_path) {
        (true, Some(_)) => Err(anyhow!(
            "Parquet input with separate R2 file is not supported. \
             Use a Parquet file with 'sequence2' column for paired-end data."
        )),
        _ => Ok(()),
    }
}
pub fn create_input_reader(
config: &InputReaderConfig,
preserve_for_output: bool,
) -> Result<ClassificationInput> {
if config.is_parquet {
let parallel_rg_opt = if config.parallel_input_rg > 0 {
Some(config.parallel_input_rg)
} else {
None
};
log::info!(
"Using prefetching Parquet input reader (batch_size={}, parallel_rg={:?}) for {:?}",
config.batch_size,
parallel_rg_opt,
config.r1_path
);
let reader = PrefetchingParquetReader::with_parallel_row_groups(
config.r1_path,
config.batch_size,
parallel_rg_opt,
config.trim_to,
config.minimum_length,
)?;
Ok(ClassificationInput::Parquet(reader))
} else {
log::debug!(
"Using prefetching FASTX input reader (batch_size={}, preserve_for_output={}) for {:?}",
config.batch_size,
preserve_for_output,
config.r1_path
);
let reader = PrefetchingIoHandler::with_options(
config.r1_path,
config.r2_path,
None, config.batch_size,
config.trim_to,
config.minimum_length,
preserve_for_output,
)?;
Ok(ClassificationInput::Fastx(reader))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Stand-in R2 path for the tests; the file is never opened.
    fn mate_path() -> PathBuf {
        PathBuf::from("reads_R2.fastq")
    }

    /// Parquet input plus a separate R2 file is the one rejected combination.
    #[test]
    fn test_validate_input_config_parquet_with_r2_fails() {
        let mate = mate_path();
        let message = validate_input_config(true, Some(&mate))
            .expect_err("Parquet input combined with an R2 path must be rejected")
            .to_string();
        assert!(message.contains("Parquet input with separate R2 file is not supported"));
        assert!(message.contains("sequence2"));
    }

    /// Parquet input on its own is accepted.
    #[test]
    fn test_validate_input_config_parquet_without_r2_passes() {
        assert!(validate_input_config(true, None).is_ok());
    }

    /// FASTX input may carry a separate R2 file.
    #[test]
    fn test_validate_input_config_fastx_with_r2_passes() {
        let mate = mate_path();
        assert!(validate_input_config(false, Some(&mate)).is_ok());
    }

    /// Single-file FASTX input is accepted.
    #[test]
    fn test_validate_input_config_fastx_without_r2_passes() {
        assert!(validate_input_config(false, None).is_ok());
    }
}