use crate::transformations::hamming_correct::{OnTie, Partial_HammingPreMatch, PreMatchData};
use crate::transformations::hamming_exact_counter::PartialHammingExactCounter;
use crate::transformations::{PartialTransformation, TagUser, Transformation};
use anyhow::{Result, anyhow, bail};
use bstr::BString;
use fastqrab_config::{RemovedTags, TagLabel, TagValueType, default_blocks_in_flight};
use fastqrab_config::{
default_block_size, default_buffer_size, default_output_buffer_size,
default_spot_check_read_pairing,
};
use fastqrab_io::io::output::chunked_writer::OutputDeclaration;
use fastqrab_io::io::{self, DetectedInputFormat};
use fastqrab_io::{CompressionFormat, FileFormat};
use indexmap::IndexMap;
use schemars::JsonSchema;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt::Write;
use std::path::Path;
use std::rc::Rc;
use toml_pretty_deser::{prelude::*, suggest_alternatives};
mod barcodes;
mod input;
pub mod options;
mod output;
mod segments;
mod tag_labels;
pub use fastqrab_config::segments::{
ResolvedSourceAll, ResolvedSourceNoAll, SegmentIndex, SegmentIndexOrAll, SegmentOrNameIndex,
};
pub use fastqrab_config::{offer_alternatives, validate_tag_name};
use fastqrab_io::get_number_of_cores;
pub use input::{Input, PartialInput, StructuredInput};
pub use barcodes::{Barcodes, BarcodesFromFile, PartialBarcodes};
pub use options::{Options, PartialOptions};
pub use output::{Output, PartialOutput, validate_compression_level_u8};
pub use segments::{DenyName, ValidateSegment};
pub use tag_labels::ValidateTagLabel;
/// Bookkeeping for one tag while `verify_transformation_labels` walks the
/// step list.
#[derive(Debug)]
pub struct TagMetadata {
    /// Set to `true` once a later step reads or removes the tag.
    pub used: bool,
    /// Value type announced by the step that declared the tag.
    pub tag_type: TagValueType,
    /// Source span of the declaring step; used to point at the first
    /// declaration when the same tag is declared a second time.
    pub span: std::ops::Range<usize>,
}
/// Parses a TOML document into a [`Config`] via `toml_pretty_deser`.
///
/// Field names are matched with `FieldMatchMode::AnyCase` (presumably
/// case-insensitive), and lists are read with `VecMode::SingleOk`
/// (presumably a lone value is accepted where a list is expected).
///
/// # Errors
/// Returns the `DeserError<PartialConfig>` produced by `tpd_from_toml`.
pub fn config_from_string(toml: &str) -> Result<Config, DeserError<PartialConfig>> {
    let field_match_mode = FieldMatchMode::AnyCase;
    let vec_mode = VecMode::SingleOk;
    Config::tpd_from_toml(toml, field_match_mode, vec_mode)
}
pub fn validate_segment_label(
label: &str,
match_mode: toml_pretty_deser::prelude::FieldMatchMode,
) -> Result<()> {
if label.is_empty() {
bail!(
"Segment name may not be empty or just whitespace. Please provide a segment name containing only letters, numbers, and underscores."
);
}
for (i, ch) in label.chars().enumerate() {
if i == 0 && !ch.is_ascii_alphabetic() && ch != '_' {
bail!("Segment label must start with a letter or underscore (^[a-zA-Z_]), got '{ch}'",);
}
if !ch.is_ascii_alphanumeric() && ch != '_' {
bail!(
"Segment label must contain only letters, numbers, and underscores (^[a-zA-Z0-9_]+$), found '{ch}'.",
);
}
}
for prohibited in &[
"fasta_fake_quality",
"bam_include_mapped",
"bam_include_unmapped",
"read_comment_character",
"use_rapidgzip",
"build_rapidgzip_index",
"threads_per_segment",
"tpd_field_match_mode",
] {
if match_mode.matches(label, prohibited) {
bail!(
"'{prohibited}' is not allowed as a segment label, as it could be confused with an existing option name or an internal. Please choose a different segment name, or prefix in with 'options.' if you meant the option."
);
}
}
Ok(())
}
// `[benchmark]` section of the configuration.
// Plain `//` comments are used here on purpose: `JsonSchema` (and possibly
// the `tpd` macro) may pick up `///` doc comments as schema descriptions.
// `tpd(no_verify)`: presumably no VerifyIn hook for this struct; the
// `molecule_count > 0` rule is enforced by
// `PartialConfig::verify_benchmark_molecule_count` instead.
#[derive(Debug, Clone, JsonSchema)]
#[tpd(no_verify)]
pub struct Benchmark {
    // Whether benchmark mode is on. `tpd(default)` lets the key be omitted.
    // When true, `[output]` is replaced by a placeholder and no read-pairing
    // spot check is injected.
    #[tpd(default)]
    pub enable: bool,
    // Must be > 0. Presumably the number of molecules to process in
    // benchmark mode — TODO confirm.
    pub molecule_count: usize,
}
// Records which input formats were encountered. Judging by the field names:
// FASTQ / FASTA / BAM, plus whether any input was gzip-compressed — TODO
// confirm against the code that fills it in.
// `//` rather than `///` so the derived JSON schema is not affected.
#[derive(Debug, JsonSchema, Default)]
#[expect(
    dead_code,
    reason = "we currently only use gzip for multi thread considerations, but set them all for consistency"
)]
struct InputFormatsObserved {
    fastq: bool,
    fasta: bool,
    bam: bool,
    // The only field currently read (see the `expect` reason above).
    gzip: bool,
}
/// One step of the checked pipeline (the element type of
/// `CheckedConfig::stages`).
#[derive(Debug)]
pub struct Stage {
    /// The transformation this step runs.
    pub transformation: Transformation,
    /// Tags this step may access. Presumably the step's entry from
    /// `Config::allowed_tags_per_transformation` — TODO confirm.
    pub allowed_tags: Vec<TagLabel>,
    /// Output files this step declares. Presumably from
    /// `Config::output_declarations_per_transformation` — TODO confirm.
    pub output_declarations: Vec<OutputDeclaration>,
}
/// Thread counts for running the pipeline. Every count is non-zero by type.
#[derive(Debug, Clone)]
pub struct ThreadingConfiguration {
    /// Presumably reader threads per input segment — TODO confirm.
    pub n_input_per_segment: std::num::NonZeroUsize,
    /// Presumably output (writer/compression) threads — TODO confirm.
    pub n_output: std::num::NonZeroUsize,
    /// Presumably worker threads running the transformations — TODO confirm.
    pub n_processing: std::num::NonZeroUsize,
}
// Root of the TOML configuration (`tpd(root)`), as deserialized by
// `toml_pretty_deser`.
// `//` rather than `///` throughout: `JsonSchema` (and possibly `tpd`) may
// turn doc comments into schema descriptions.
#[derive(JsonSchema)]
#[tpd(root)]
#[derive(Debug)]
pub struct Config {
    // `[input]` section.
    #[tpd(nested)]
    pub input: Input,
    // Optional `[output]` section.
    #[tpd(nested)]
    pub output: Option<Output>,
    // Optional named barcode tables. The JSON schema describes them as a
    // string-keyed map.
    #[schemars(with = "BTreeMap<String, Barcodes>")]
    #[tpd(nested)]
    pub barcodes: Option<IndexMap<TagLabel, Barcodes>>,
    // `[options]` section. Filled with built-in defaults (via `or_with`)
    // during verification.
    #[tpd(nested)]
    pub options: Options,
    // Ordered processing steps. Also accepted under the keys `step`, `steps`
    // and `transforms`.
    #[tpd(alias = "step", alias = "steps", alias = "transforms")]
    #[tpd(nested)]
    pub transform: Vec<Transformation>,
    // Optional `[benchmark]` section.
    #[tpd(nested)]
    pub benchmark: Option<Benchmark>,
    // Not read from TOML. Labels of the report steps, filled in by
    // `expand_transformations`.
    #[tpd(skip)]
    pub report_labels: Vec<String>,
    // Not read from TOML. Per step, the tags it may access, filled in by
    // `verify_transformation_labels`.
    #[tpd(skip)]
    pub allowed_tags_per_transformation: Vec<Vec<TagLabel>>,
    // Not read from TOML and excluded from the JSON schema. Per step, the
    // output files it declares.
    #[tpd(skip)]
    #[schemars(skip)]
    pub output_declarations_per_transformation: Vec<Vec<OutputDeclaration>>,
}
/// Presumably the configuration after verification, with each step's
/// metadata folded into a [`Stage`]. It is constructed outside this part of
/// the file — TODO confirm.
#[derive(Debug)]
pub struct CheckedConfig {
    pub input: Input,
    pub output: Option<Output>,
    /// Pipeline steps in execution order.
    pub stages: Vec<Stage>,
    pub options: Options,
    /// Named barcode tables. Unlike `Config::barcodes`, this is not optional
    /// (presumably empty when none were configured).
    pub barcodes: IndexMap<TagLabel, Barcodes>,
    pub benchmark: Option<Benchmark>,
    /// Labels of the report steps, in order.
    pub report_labels: Vec<String>,
    pub threading_configuration: ThreadingConfiguration,
}
impl VerifyIn<TPDRoot> for PartialConfig {
fn verify(
&mut self,
_parent: &TPDRoot,
_options: &VerifyOptions,
) -> std::result::Result<(), ValidationFailure>
where
Self: Sized,
{
self.options.or_with(|| PartialOptions {
threads: TomlValue::new_ok(None, 0..0),
max_blocks_in_flight: TomlValue::new_ok(default_blocks_in_flight(), 0..0),
block_size: TomlValue::new_ok(default_block_size().into(), 0..0),
buffer_size: TomlValue::new_ok(default_buffer_size(), 0..0),
output_buffer_size: TomlValue::new_ok(default_output_buffer_size(), 0..0),
accept_duplicate_files: TomlValue::new_ok(false, 0..0),
spot_check_read_pairing: TomlValue::new_ok(default_spot_check_read_pairing(), 0..0),
debug_failures: TomlValue::new_ok(
options::PartialFailureOptions {
fail_output_after_bytes: TomlValue::new_ok(None, 0..0),
fail_output_error: TomlValue::new_ok(None, 0..0),
fail_output_raw_os_code: TomlValue::new_ok(None, 0..0),
},
0..0,
),
});
if !self.input.is_ok() {
self.transform = TomlValue::new_ok(Vec::new(), 0..0);
}
self.verify_no_duplicate_files_no_empty_segments();
self.transform.or_default();
self.verify_reports();
self.verify_barcodes_and_segment_names_disjoint();
self.verify_benchmark_molecule_count();
self.disable_output_on_benchmark();
self.verify_for_any_output();
self.verify_head_rapidgzip_conflict();
self.expand_transformations();
let used_barcode_sections = self.verify_transformation_labels();
self.verify_output_filenames_unique();
self.verify_demultiplex_unique();
self.verify_barcodes_used(&used_barcode_sections);
self.verify_merge_demultiplexed();
Ok(())
}
}
impl PartialConfig {
/// Validates the per-segment file lists of `[input]`.
///
/// - Every segment must list at least one file. A segment with an empty list
///   has its own value marked as failed ("Segment has no files specified").
///   This applies whether or not duplicates are accepted.
/// - When `options.accept_duplicate_files` is `false`, no file path may
///   appear more than once across all segments. Every occurrence of every
///   duplicated path is reported together on the `[input]` value.
fn verify_no_duplicate_files_no_empty_segments(&mut self) {
    // Duplicate detection is opt-out; the empty-segment check always runs.
    let reject_duplicates = self
        .options
        .as_ref()
        .and_then(|options| options.accept_duplicate_files.as_ref())
        .copied()
        == Some(false);

    // Path -> spans of every place it is listed, in first-seen order.
    let mut seen_files: IndexMap<String, Vec<std::ops::Range<usize>>> = IndexMap::new();
    if let Some(input) = self.input.as_mut()
        && let Some(segments) = input.segments.as_mut()
    {
        for tv_files in segments.map.values_mut() {
            let Some(files) = tv_files.as_ref() else {
                continue;
            };
            if files.is_empty() {
                tv_files.state = TomlValueState::ValidationFailed {
                    message: "Segment has no files specified".to_string(),
                };
            } else if reject_duplicates {
                for tv_file in files {
                    if let Some(str_file) = tv_file.as_ref() {
                        seen_files
                            .entry(str_file.clone())
                            .or_default()
                            .push(tv_file.span());
                    }
                }
            }
        }
    }

    // Collect the spans of all duplicated paths so every duplicate group is
    // reported, not just the last one found.
    let spans: Vec<_> = seen_files
        .into_values()
        .filter(|spans| spans.len() > 1)
        .flatten()
        .map(|span| (span, "This file is listed multiple times".to_string()))
        .collect();
    if !spans.is_empty() {
        self.input.state = TomlValueState::Custom { spans };
        self.input.help = Some(
            "The same input file is listed multiple times. If this is intentional, set options.accept_duplicate_files = true.".to_string(),
        );
    }
}
/// Cross-checks `Report` steps against `output.report_html` /
/// `output.report_json`, and checks that report names are unique.
///
/// Exactly one of three cases applies:
/// 1. A `Report` or `_InternalReadCount` step exists, neither report output
///    is enabled and benchmark mode is off: the first such step is marked
///    invalid, with spans on it and on `[output]`.
/// 2. A report output is enabled, no such step exists and the step list is
///    ok: `[output]` is marked invalid, with spans on the `true` flags.
/// 3. Otherwise: every `Report` `name` and `_InternalReadCount` `out_label`
///    must be unique. The first step of each duplicated group carries the
///    error, with spans on all of them.
fn verify_reports(&mut self) {
    // NOTE(review): report_html is read through `.value` while report_json
    // (below) goes through `TomlValue::as_ref()`; confirm the asymmetry is
    // intended.
    let report_html = self
        .output
        .value
        .as_ref()
        .and_then(|x| x.as_ref())
        .and_then(|x| x.report_html.as_ref())
        .is_some_and(|o| *o);
    let report_json = self
        .output
        .as_ref()
        .and_then(|x| x.as_ref())
        .and_then(|x| x.report_json.as_ref())
        .is_some_and(|o| *o);
    let is_benchmark = self
        .benchmark
        .as_ref()
        .and_then(|x| x.as_ref())
        .and_then(|x| x.enable.as_ref())
        .is_some_and(|o| *o);
    // Pull nested step states up before asking whether the list is ok.
    self.transform.sync_nested_state();
    let transforms_ok = self.transform.is_ok();
    // First step that produces report data (case 1 / case 2 detection).
    let mut report_transform = self.transform.as_mut().and_then(|x| {
        x.iter_mut().find(|t| {
            matches!(
                t.as_ref(),
                Some(
                    PartialTransformation::Report { .. }
                        | PartialTransformation::_InternalReadCount { .. }
                )
            )
        })
    });
    if let Some(report_transform) = &mut report_transform
        && !(report_html || report_json)
        && !is_benchmark
    {
        // Case 1: report step without any report output.
        let spans = vec![
            (
                self.output.span(),
                "Add report_json | report_html here?".to_string(),
            ),
            (
                report_transform.span(),
                "Report but no output.report_html | report_json".to_string(),
            ),
        ];
        report_transform.state = TomlValueState::Custom { spans };
        // NOTE(review): "it's" should read "its"; left unchanged here because
        // tests or callers may match this text — confirm before fixing.
        report_transform.help =
            Some("Either remove the report, or enable it's output.".to_string());
    } else if (report_html || report_json) && report_transform.is_none() && transforms_ok {
        // Case 2: report output requested but no report step.
        let mut spans = Vec::new();
        if let Some(tv_report_html) = self
            .output
            .as_ref()
            .and_then(|x| x.as_ref())
            .map(|x| &x.report_html)
            && let Some(true) = tv_report_html.as_ref()
        {
            spans.push((tv_report_html.span(), "Set to true?".to_string()));
        }
        if let Some(tv_report_json) = self
            .output
            .as_ref()
            .and_then(|x| x.as_ref())
            .map(|x| &x.report_json)
            && let Some(true) = tv_report_json.as_ref()
        {
            spans.push((tv_report_json.span(), "Set to true?".to_string()));
        }
        if spans.is_empty() {
            unreachable!("report_html or report_json was true")
        }
        self.output.state = TomlValueState::Custom { spans };
        self.output.help =
            Some("No report step, but report output requested.\nRemove/disable report_html & report_json, or add in a report step.".to_string());
    } else {
        // Case 3: group report-producing steps by name, in first-seen order.
        let mut report_names_to_spans: IndexMap<
            String,
            Vec<&mut TomlValue<PartialTransformation>>,
        > = IndexMap::new();
        if let Some(transform) = self.transform.as_mut() {
            for tv_transform in transform.iter_mut() {
                if let Some(transform) = tv_transform.as_ref() {
                    // Report -> `name`; _InternalReadCount -> `out_label`.
                    let name = if let PartialTransformation::Report(config) = transform {
                        config
                            .toml_value
                            .as_ref()
                            .and_then(|x| x.name.as_ref())
                            .cloned()
                    } else if let PartialTransformation::_InternalReadCount(config) = transform
                    {
                        config
                            .toml_value
                            .as_ref()
                            .and_then(|x| x.out_label.as_ref())
                            .map(std::string::ToString::to_string)
                    } else {
                        None
                    };
                    if let Some(name) = name {
                        match report_names_to_spans.entry(name) {
                            indexmap::map::Entry::Occupied(occupied_entry) => {
                                occupied_entry.into_mut().push(tv_transform);
                            }
                            indexmap::map::Entry::Vacant(vacant_entry) => {
                                vacant_entry.insert(vec![tv_transform]);
                            }
                        }
                    }
                }
            }
        }
        for (report_name, mut transforms) in report_names_to_spans {
            if transforms.len() > 1 {
                let spans = transforms
                    .iter()
                    .map(|tv_transform| {
                        (tv_transform.span(), "Report name not unique".to_string())
                    })
                    .collect();
                // The error is attached to the first step of the group only.
                transforms[0].state = TomlValueState::Custom { spans };
                transforms[0].help = Some(format!(
                    "Multiple reports with the same name '{report_name}' found. Please ensure all reports have unique names."
                ));
            }
        }
    }
}
/// Rejects a `[benchmark]` section whose `molecule_count` is zero.
///
/// Does nothing when the benchmark section is missing or not ok.
fn verify_benchmark_molecule_count(&mut self) {
    let Some(Some(benchmark)) = self.benchmark.as_mut() else {
        return;
    };
    benchmark.molecule_count.verify(|count| match *count {
        0 => Err(ValidationFailure::new(
            "molecule_count must be > 0",
            Some("Set to a positive integer."),
        )),
        _ => Ok(()),
    });
}
/// Rejects combining `input.options.build_rapidgzip_index = true` with a
/// `Head` step.
///
/// Only the first `Head` step is flagged. Its error points at both the step
/// and the `build_rapidgzip_index` setting.
fn verify_head_rapidgzip_conflict(&mut self) {
    let input_options = self.input.as_ref().and_then(|i| i.options.as_ref());
    let index_requested = input_options
        .and_then(|o| o.build_rapidgzip_index.as_ref())
        .and_then(|x| x.as_ref())
        .copied()
        .unwrap_or(false);
    if !index_requested {
        return;
    }
    let rapidgzip_span = input_options
        .map(|o| o.build_rapidgzip_index.span())
        .unwrap_or_default();

    let Some(head_tv) = self.transform.as_mut().and_then(|transforms| {
        transforms
            .iter_mut()
            .find(|t| matches!(t.as_ref(), Some(PartialTransformation::Head(..))))
    }) else {
        return;
    };

    let spans = vec![
        (
            head_tv.span(),
            "This Head transform conflicts with build_rapidgzip_index".to_string(),
        ),
        (
            rapidgzip_span,
            "build_rapidgzip_index = true set here".to_string(),
        ),
    ];
    head_tv.state = TomlValueState::Custom { spans };
    head_tv.help = Some(
        "build_rapidgzip_index and Head cannot be used together (index would not be created).\nSet build_rapidgzip_index to false.".to_string(),
    );
}
/// Flags `[output]` when the configuration would produce nothing at all.
///
/// The run counts as producing something if any of these holds:
/// - `[output]` writes reads: `stdout = true`, a non-empty `interleave`
///   list, or an `output` list that is unset or non-empty;
/// - `report_html` or `report_json` is true;
/// - a `StoreTagInFastQ`, `StoreTagsInTable` or `Inspect` step is present;
/// - benchmark mode is enabled.
///
/// The error is attached only when the `[output]` value itself is ok.
fn verify_for_any_output(&mut self) {
    let output_section = self.output.as_ref().and_then(|x| x.as_ref());

    let has_fastq_output = output_section.is_some_and(|o| {
        let to_stdout = o.stdout.as_ref().copied().unwrap_or(false);
        // An unset `output` list counts as writing (map_or default 1); only
        // an explicitly empty list means "no segment output".
        let to_segment_files = o
            .output
            .as_ref()
            .and_then(|x| x.as_ref())
            .map_or(1, Vec::len)
            != 0;
        let to_interleaved = o
            .interleave
            .as_ref()
            .and_then(|inner| inner.as_ref())
            .is_some_and(|v| !v.is_empty());
        to_stdout || to_segment_files || to_interleaved
    });
    let has_report_output = output_section.is_some_and(|o| {
        o.report_html.as_ref().copied().unwrap_or(false)
            || o.report_json.as_ref().copied().unwrap_or(false)
    });
    let has_tag_output = self.transform.value.as_ref().is_some_and(|transforms| {
        transforms.iter().any(|t| {
            matches!(
                t.value.as_ref(),
                Some(
                    PartialTransformation::StoreTagInFastQ(..)
                        | PartialTransformation::StoreTagsInTable(..)
                        | PartialTransformation::Inspect(..)
                )
            )
        })
    });
    let is_benchmark = self
        .benchmark
        .as_ref()
        .and_then(|x| x.as_ref())
        .and_then(|x| x.enable.as_ref())
        .copied()
        .unwrap_or(false);

    let produces_something =
        has_fastq_output || has_report_output || has_tag_output || is_benchmark;
    if !produces_something && self.output.is_ok() {
        self.output.state = TomlValueState::ValidationFailed {
            message: "No output files and no reports requested. Nothing to do.".to_string(),
        };
        self.output.help = Some(
            "Add an [output] section with output files or reports, or use a benchmark configuration.".to_string(),
        );
    }
}
fn disable_output_on_benchmark(&mut self) {
if let Some(Some(benchmark)) = &self.benchmark.as_ref()
&& let Some(true) = benchmark.enable.as_ref()
{
self.output = TomlValue::new_ok(
Some(PartialOutput {
prefix: TomlValue::new_ok(String::from("benchmark"), 0..0),
suffix: TomlValue::new_ok(None, 0..0),
format: TomlValue::new_ok(FileFormat::default(), 0..0),
compression: TomlValue::new_ok(CompressionFormat::default(), 0..0),
compression_level: TomlValue::new_ok(None, 0..0),
compression_threads: TomlValue::new_ok(1, 0..0),
report_html: TomlValue::new_ok(false, 0..0),
report_json: TomlValue::new_ok(false, 0..0),
report_timing: TomlValue::new_ok(false, 0..0),
stdout: TomlValue::new_ok(false, 0..0),
interleave: TomlValue::new_ok(None, 0..0),
output: TomlValue::new_ok(Some(Vec::new()), 0..0),
output_hash_uncompressed: TomlValue::new_ok(false, 0..0),
output_hash_compressed: TomlValue::new_ok(false, 0..0),
ix_separator: TomlValue::new_ok(output::default_ix_separator(), 0..0),
chunksize: TomlValue::new_ok(None, 0..0),
bam: TomlValue::new_ok(None, 0..0),
}),
0..0,
);
}
}
fn verify_barcodes_and_segment_names_disjoint(&mut self) {
let mut segment_names = IndexMap::new();
if let Some(input) = self.input.as_mut()
&& let Some(structured) = input.structured.as_mut()
&& let Some(Some(barcodes)) = self.barcodes.as_mut()
{
match structured {
StructuredInput::Interleaved { .. } => {
if let Some(tv_interleaved) = input.interleaved.as_mut()
&& let Some(interleaved) = tv_interleaved.as_mut()
{
for tv_segment in interleaved.iter_mut() {
if let Some(segment) = tv_segment.as_mut() {
segment_names.insert(segment.clone(), tv_segment);
}
}
} }
StructuredInput::Segmented { .. } => {
if let Some(segments) = input.segments.as_mut() {
for tv_segment in &mut segments.keys {
if let Some(segment) = tv_segment.as_mut() {
segment_names.insert(segment.clone(), tv_segment);
}
}
} }
}
for tv_barcode_name in &mut barcodes.keys {
if let Some(barcode_name) = tv_barcode_name.as_ref()
&& let Some(tv_segment) = segment_names.get(barcode_name)
{
tv_barcode_name.state = TomlValueState::Custom {
spans: vec![
(
tv_barcode_name.span(),
"This barcode name collides with a segment name".to_string(),
),
(
tv_segment.span(),
"Segment with the same name defined here".to_string(),
),
],
};
tv_barcode_name.help = Some(
"Barcode names must not collide with segment names. Please choose a different name for this barcode.".to_string(),
);
}
}
}
}
/// Rewrites the user-facing step list into the internal step list.
///
/// Several user steps are sugar for internal steps, or need internal
/// companion steps; those are handled in the `match` below. All other steps
/// are passed through unchanged.
///
/// Side effects:
/// - `self.report_labels` is set to the labels of the `Report` /
///   `_InternalReadCount` steps, in encounter order;
/// - the rebuilt list replaces `self.transform` as an ok value carrying the
///   original list's span.
pub fn expand_transformations(&mut self) {
    // Pull nested step states up into `self.transform` first.
    self.transform.sync_nested_state();
    if self.transform.value.is_some() {
        let transform_span = self.transform.span();
        if let Some(mut transforms) = self.transform.value.take() {
            // Both push closures below append to this list, hence the RefCell.
            let expanded_transforms: RefCell<Vec<TomlValue<PartialTransformation>>> =
                RefCell::new(Vec::new());
            let mut res_report_labels = Vec::new();
            // Index handed to each report-producing step, in order.
            let mut report_no = 0;
            // Re-append an original step unchanged (keeps its span and state).
            let mut push_existing =
                |t: TomlValue<PartialTransformation>| expanded_transforms.borrow_mut().push(t);
            // Append a synthesized step as an ok value with an empty (0..0) span.
            let mut push_new = |t: PartialTransformation| {
                expanded_transforms
                    .borrow_mut()
                    .push(TomlValue::new_ok(t, 0..0));
            };
            // May inject a ValidateReadPairing step ahead of all user steps.
            self.expand_spot_checks(&mut push_new, &transforms);
            for mut t in transforms.drain(..) {
                // Presumably `t.as_mut()` yields the variant only for ok
                // values, so failed steps fall through to `_` and are kept.
                match t.as_mut() {
                    // ExtractRegion is sugar for ExtractRegions with one region.
                    Some(PartialTransformation::ExtractRegion(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        let step_config = step_config
                            .toml_value
                            .take()
                            .into_inner()
                            .expect("Parent was ok?");
                        let source_span = step_config.source.span.clone();
                        let regions = TomlValue::new_ok(
                            vec![TomlValue::new_ok(
                                crate::transformations::PartialRegionDefinition {
                                    source: step_config.source,
                                    start: step_config.start,
                                    length: step_config.len,
                                    anchor: step_config.anchor,
                                },
                                source_span,
                            )],
                            tag_span.clone(),
                        );
                        push_new(PartialTransformation::ExtractRegions(
                            PartialTaggedVariant {
                                toml_value: TomlValue::new_ok_unplaced(
                                    crate::transformations::extract::PartialRegions {
                                        out_label: step_config.out_label,
                                        regions,
                                        output_tag_type: None,
                                    },
                                ),
                                tag_span: tag_span.clone(),
                            },
                        ));
                    }
                    // Report is replaced by its internal sub-steps (see
                    // `expand_reports`). The Report step itself is not kept.
                    Some(PartialTransformation::Report(report_config)) => {
                        Self::expand_reports(
                            &mut push_new,
                            &mut push_existing,
                            &mut res_report_labels,
                            &mut report_no,
                            &mut report_config.toml_value,
                            report_config.tag_span.clone(),
                        );
                    }
                    // Re-emitted with its report index assigned. Its out_label
                    // is recorded as a report label.
                    Some(PartialTransformation::_InternalReadCount(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.take().into_inner() {
                            res_report_labels.push(
                                step_config
                                    .out_label
                                    .as_ref()
                                    .expect("parent was ok")
                                    .to_string(),
                            );
                            let step_config: Box<_> =
                                Box::new(crate::transformations::Partial_InternalReadCount {
                                    out_label: step_config.out_label,
                                    report_no: Some(report_no),
                                    count: Some(Default::default()),
                                });
                            report_no += 1;
                            push_new(PartialTransformation::_InternalReadCount(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(step_config),
                                    tag_span,
                                },
                            ));
                        }
                    }
                    // CalcGCContent -> CalcBaseContent with base sets "GC" / "N"
                    // (argument meaning is defined by PartialBaseContent::new).
                    Some(PartialTransformation::CalcGCContent(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.take().into_inner() {
                            push_new(PartialTransformation::CalcBaseContent(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(
                                        crate::transformations::calc::PartialBaseContent::new(
                                            step_config.out_label,
                                            step_config.segment,
                                            *step_config
                                                .relative
                                                .as_ref()
                                                .expect("was required"),
                                            BString::from("GC"),
                                            BString::from("N"),
                                        ),
                                    ),
                                    tag_span,
                                },
                            ));
                        }
                    }
                    // CalcNContent -> CalcBaseContent with base set "N" and an
                    // empty second set.
                    Some(PartialTransformation::CalcNContent(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.take().into_inner() {
                            push_new(PartialTransformation::CalcBaseContent(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(
                                        crate::transformations::calc::PartialBaseContent::new(
                                            step_config.out_label,
                                            step_config.segment,
                                            *step_config
                                                .relative
                                                .as_ref()
                                                .expect("was required"),
                                            BString::from("N"),
                                            BString::default(),
                                        ),
                                    ),
                                    tag_span,
                                },
                            ));
                        }
                    }
                    // FilterEmpty -> FilterByNumericTag on the segment's internal
                    // length tag with min_value = 1.0 and Keep.
                    Some(PartialTransformation::FilterEmpty(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.take().into_inner() {
                            let segment_index = *step_config
                                .segment
                                .as_ref()
                                .expect("parent was ok")
                                .as_ref_post()
                                .expect("parent was ok");
                            push_new(PartialTransformation::FilterByNumericTag(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(
                                        crate::transformations::filters::PartialByNumericTag {
                                            in_label: TomlValue::new_ok(
                                                MustAdapt::PostVerify(TagLabel::Length(
                                                    segment_index,
                                                    "len_not_visible_to_user".to_string(),
                                                )),
                                                step_config.segment.span(),
                                            ),
                                            min_value: TomlValue::new_ok_unplaced(Some(1.0)),
                                            max_value: TomlValue::new_ok_unplaced(None),
                                            keep_or_remove: TomlValue::new_ok_unplaced(
                                                crate::transformations::KeepOrRemove::Keep,
                                            ),
                                        },
                                    ),
                                    tag_span,
                                },
                            ));
                        }
                    }
                    // ConvertQuality is kept, preceded by a ValidateQuality over
                    // all segments using its `from` encoding.
                    Some(PartialTransformation::ConvertQuality(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.as_ref() {
                            push_new(PartialTransformation::ValidateQuality(
                                PartialTaggedVariant {
                                    toml_value:
                                        TomlValue::new_ok_unplaced(
                                            crate::transformations::validation::PartialValidateQuality {
                                                encoding: TomlValue::new_ok(
                                                    *step_config.from.as_ref().expect("parent was ok"),
                                                    step_config.from.span(),
                                                ),
                                                segment: TomlValue::new_ok_unplaced(MustAdapt::PostVerify(
                                                    SegmentIndexOrAll::All,
                                                )),
                                            },
                                        ),
                                    tag_span,
                                }
                            ));
                            push_existing(t);
                        }
                    }
                    // Lowercase / Uppercase -> _ChangeCase with the matching CaseType.
                    Some(PartialTransformation::Lowercase(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.take().into_inner() {
                            push_new(PartialTransformation::_ChangeCase(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(
                                        crate::transformations::edits::Partial_ChangeCase::new(
                                            step_config.target,
                                            crate::transformations::edits::CaseType::Lower,
                                            step_config.if_tag,
                                        ),
                                    ),
                                    tag_span,
                                },
                            ));
                        }
                    }
                    Some(PartialTransformation::Uppercase(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.take().into_inner() {
                            push_new(PartialTransformation::_ChangeCase(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(
                                        crate::transformations::edits::Partial_ChangeCase::new(
                                            step_config.target,
                                            crate::transformations::edits::CaseType::Upper,
                                            step_config.if_tag,
                                        ),
                                    ),
                                    tag_span,
                                },
                            ));
                        }
                    }
                    // HammingCorrect is always kept. When on_tie is ByMajority or
                    // ByEditProbability and the referenced barcode section has a
                    // seq_to_name table, two helper steps are inserted before it:
                    // _HammingExactCounter (majority_data) and _HammingPreMatch
                    // (pre_match). The step_config fields are wired to share their
                    // state.
                    Some(PartialTransformation::HammingCorrect(step_config)) => {
                        let tag_span = step_config.tag_span.clone();
                        if let Some(step_config) = step_config.toml_value.as_mut()
                            && matches!(
                                step_config.on_tie.as_ref(),
                                Some(OnTie::ByMajority | OnTie::ByEditProbability)
                            )
                            && let Some(barcodes_to_use) = step_config.barcodes.as_ref()
                            && let Some(Some(barcodes_data)) = self.barcodes.as_ref()
                            && let Some(barcodes_section) =
                                barcodes_data.map.get(barcodes_to_use)
                            && let Some(barcodes_section) = barcodes_section.as_ref()
                            && let Some(seq_to_name) = &barcodes_section.seq_to_name
                        {
                            let on_tie_min_molecules_to_start = *step_config.on_tie_min_molecules_to_start.as_ref().expect("parent was ok, VerifyIn<HammingCorrect> must have set this");
                            let reads_per_block = self.options.as_ref().and_then(|options| options.block_size.as_ref()).copied().expect("Expect options to have been set/defaulted in at this point");
                            // NOTE(review): integer division truncates. A threshold
                            // below block_size yields 0 blocks; confirm
                            // PartialHammingExactCounter treats 0 as intended
                            // (div_ceil may be wanted). Would also panic if
                            // block_size were 0.
                            let blocks_to_count =
                                on_tie_min_molecules_to_start / reads_per_block;
                            let pt = PartialHammingExactCounter::new(
                                step_config
                                    .in_label
                                    .as_ref()
                                    .expect("parent was ok")
                                    .clone(),
                                seq_to_name.clone(),
                                blocks_to_count,
                            );
                            step_config.majority_data = Some(pt.majority_data.clone());
                            push_new(PartialTransformation::_HammingExactCounter(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(pt),
                                    tag_span: tag_span.clone(),
                                },
                            ));
                            let resonator = step_config
                                .resonator
                                .as_ref()
                                .expect("verify must have built the resonator")
                                .clone();
                            let seq_to_idx = step_config
                                .seq_to_idx
                                .as_ref()
                                .expect("verify must have built seq_to_idx")
                                .clone();
                            let needs_qualities = matches!(
                                step_config.on_tie.as_ref(),
                                Some(OnTie::ByEditProbability)
                            );
                            let shared = std::sync::Arc::new(PreMatchData {
                                seq_to_name: seq_to_name.clone(),
                                seq_to_idx,
                                resonator,
                                needs_qualities,
                                pending: std::sync::Mutex::new(Default::default()),
                            });
                            step_config.pre_match = Some(Some(shared.clone()));
                            let in_label = step_config
                                .in_label
                                .as_ref()
                                .expect("parent was ok")
                                .clone();
                            push_new(PartialTransformation::_HammingPreMatch(
                                PartialTaggedVariant {
                                    toml_value: TomlValue::new_ok_unplaced(
                                        Partial_HammingPreMatch::new(in_label, shared),
                                    ),
                                    tag_span,
                                },
                            ));
                        }
                        push_existing(t);
                    }
                    // Everything else (including steps that are not ok) is kept as-is.
                    _ => {
                        push_existing(t);
                    }
                }
            }
            self.transform =
                TomlValue::new_ok(expanded_transforms.into_inner(), transform_span);
            self.report_labels = Some(res_report_labels);
        } else {
            // `value` was checked to be Some just above.
            unreachable!()
        }
    }
}
/// Emits a `ValidateReadPairing` step through `push_new` when a read-pairing
/// spot check should be added automatically.
///
/// No step is emitted when any of these holds:
/// - `options.spot_check_read_pairing` is explicitly false;
/// - the input has at most one segment;
/// - the user already has a `ValidateName` or `ValidateReadPairing` step;
/// - benchmark mode is enabled.
fn expand_spot_checks<F: FnMut(PartialTransformation)>(
    &self,
    mut push_new: F,
    transforms: &[TomlValue<PartialTransformation>],
) {
    let spot_check_disabled = self
        .options
        .as_ref()
        .and_then(|options| options.spot_check_read_pairing.as_ref())
        .is_some_and(|enabled| !enabled);
    let single_segment = self
        .input
        .as_ref()
        .is_some_and(|input| input.segment_count() <= 1);
    if spot_check_disabled || single_segment {
        return;
    }

    let already_validated = transforms.iter().any(|step| {
        matches!(
            step.as_ref(),
            Some(
                PartialTransformation::ValidateName(_)
                    | PartialTransformation::ValidateReadPairing(_)
            )
        )
    });
    let is_benchmark = self
        .benchmark
        .as_ref()
        .and_then(|x| x.as_ref())
        .and_then(|x| x.enable.as_ref())
        .copied()
        .unwrap_or(false);
    if already_validated || is_benchmark {
        return;
    }

    push_new(PartialTransformation::ValidateReadPairing(
        PartialTaggedVariant {
            toml_value: TomlValue::new_ok_unplaced(
                crate::transformations::validation::PartialValidateReadPairing::new(None),
            ),
            tag_span: 0..0,
        },
    ));
}
fn expand_reports<
F: FnMut(PartialTransformation),
G: FnMut(TomlValue<PartialTransformation>),
>(
mut push_new: F,
_push_existing: G,
res_report_labels: &mut Vec<String>,
report_no: &mut usize,
tv_config: &mut TomlValue<crate::transformations::reports::PartialReport>,
_enum_tag_span: std::ops::Range<usize>,
) {
use crate::transformations::reports;
if let Some(config) = tv_config.as_ref() {
res_report_labels.push(config.name.as_ref().expect("parent was ok").clone());
}
if let Some(config) = tv_config.as_mut() {
if let Some(true) = config.count.as_ref() {
push_new(PartialTransformation::_ReportCount(PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportCount::new(*report_no),
)),
tag_span: 0..0,
}));
}
if let Some(true) = config.length_distribution.as_ref() {
push_new(PartialTransformation::_ReportLengthDistribution(
PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportLengthDistribution::new(*report_no),
)),
tag_span: 0..0,
},
));
}
if let Some(true) = config.duplicate_count_per_read.as_ref() {
push_new(PartialTransformation::_ReportDuplicateCount(
PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportDuplicateCount::new(
*report_no,
config.debug_reproducibility.clone(),
),
)),
tag_span: 0..0,
},
));
}
if let Some(true) = config.duplicate_count_per_fragment.as_ref() {
push_new(PartialTransformation::_ReportDuplicateFragmentCount(
PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportDuplicateFragmentCount::new(
*report_no,
config.debug_reproducibility.clone(),
),
)),
tag_span: 0..0,
},
));
}
if let Some(true) = config.base_statistics.as_ref() {
push_new(PartialTransformation::_ReportBaseStatisticsPart1(
PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportBaseStatisticsPart1::new(*report_no),
)),
tag_span: 0..0,
},
));
push_new(PartialTransformation::_ReportBaseStatisticsPart2(
PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportBaseStatisticsPart2::new(*report_no),
)),
tag_span: 0..0,
},
));
}
if let Some(Some(count_oligos)) = config.count_oligos.take().into_inner() {
let oligos_map: IndexMap<String, bstr::BString> = count_oligos
.map
.into_iter()
.filter_map(|(name, tv_seq)| {
tv_seq.into_inner().map(|seq| (name, seq.0)) })
.collect();
push_new(PartialTransformation::_ReportCountOligos(
PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportCountOligos::new(
*report_no,
oligos_map,
config.count_oligos_segment.clone(),
),
)),
tag_span: 0..0,
},
));
}
if let Some(Some(tag_histograms)) = config.tag_histograms.as_ref() {
for tag_name in tag_histograms {
push_new(PartialTransformation::_ReportTagHistogram(
PartialTaggedVariant {
toml_value: TomlValue::new_ok_unplaced(Box::new(
reports::Partial_ReportTagHistogram::new(
*report_no,
tag_name.clone(),
),
)),
tag_span: 0..0,
},
));
}
}
*report_no += 1;
} }
/// Marks `toml_source` as failed with "Incompatible tag type".
///
/// The help text lists the accepted tag types and the tag's actual type,
/// followed on a new line by `further_help` when given.
fn _set_type_error(
    toml_source: &Rc<RefCell<(&mut TomlValueState, &mut Option<String>)>>,
    tag_name: &str,
    tag_types: &[TagValueType],
    actual_tag_type: &TagValueType,
    further_help: Option<&String>,
) {
    // "'a', 'b', 'c'" — each accepted type quoted, comma-separated.
    let mut str_supposed_tag_types = String::new();
    for (idx, t) in tag_types.iter().enumerate() {
        if idx > 0 {
            str_supposed_tag_types.push_str(", ");
        }
        let _ = write!(str_supposed_tag_types, "'{t}'");
    }

    let mut help_str = format!(
        "This transform requires tag '{tag_name}' to be one of the following types: [{str_supposed_tag_types}],\n\
         but it is actually of type '{actual_tag_type}'.",
    );
    if let Some(extra) = further_help {
        help_str.push('\n');
        help_str.push_str(extra);
    }

    let mut target = toml_source.borrow_mut();
    *target.0 = TomlValueState::new_validation_failed("Incompatible tag type");
    *target.1 = Some(help_str);
}
pub fn verify_transformation_labels(&mut self) -> std::collections::HashSet<TagLabel> {
use crate::transformations::TagUser;
let mut allowed_tags_per_stage: Vec<Vec<TagLabel>> = Vec::new();
let mut tags_available: IndexMap<TagLabel, TagMetadata> = IndexMap::new();
let mut used_barcode_sections: std::collections::HashSet<TagLabel> =
std::collections::HashSet::new();
if let Some(transformations) = self.transform.value.as_mut() && let Some(input) = self.input.as_ref()
{
let mut just_trafos = transformations
.iter_mut()
.filter_map(|t| t.value.as_mut())
.collect::<Vec<_>>();
let mut all_tags_ever: IndexMap<String, std::ops::Range<usize>> = IndexMap::new();
let segment_order = input.get_segment_order();
let mut any_tag_errors = false;
for trafo in &mut just_trafos {
let Some(tag_info) = trafo.get_tag_usage(&tags_available, segment_order) else {
any_tag_errors = true;
continue;
};
used_barcode_sections.extend(tag_info.used_barcodes.iter().cloned());
let mut tags_used_here: Vec<TagLabel> = Vec::new();
match tag_info.removed_tags {
RemovedTags::None => {}
RemovedTags::All => {
for metadata in tags_available.values_mut() {
metadata.used = true;
}
tags_used_here.extend(tags_available.keys().cloned());
tags_available.clear();
}
RemovedTags::Some(tags) => {
for (tag_name, toml_source) in tags {
if let Some(metadata) = tags_available.get_mut(&tag_name) {
metadata.used = true;
tags_used_here.push(tag_name.clone());
} else {
any_tag_errors = true;
toml_source.state = TomlValueState::new_validation_failed(format!(
"No such tag: '{tag_name}'",
));
toml_source.help = Some(offer_alternatives(
tag_name.as_ref(),
&tags_available.keys().map(AsRef::as_ref).collect::<Vec<_>>(),
));
continue; }
tags_available.shift_remove(&tag_name);
}
}
}
if tag_info.must_see_all_tags {
tags_used_here.extend(tags_available.keys().cloned());
}
for used_tag_info in tag_info.used_tags.iter().filter_map(|x| x.as_ref()) {
let tag_name = &used_tag_info.name;
if !tag_name.is_virtual() {
let tag_types = &used_tag_info.accepted_tag_types;
let toml_source = &used_tag_info.toml_source;
let entry = tags_available.get_mut(tag_name);
if let Some(metadata) = entry {
metadata.used = true;
if tag_types
.iter()
.any(|tag_type| tag_type.compatible(metadata.tag_type))
{
if !tag_info.must_see_all_tags {
if tags_used_here.contains(tag_name) {
panic!(
"tag declared twice in used_tags, fix that! {tag_name}"
);
} else {
tags_used_here.push(tag_name.clone());
}
}
} else {
any_tag_errors = true;
Self::_set_type_error(
toml_source,
tag_name.as_ref(),
tag_types,
&metadata.tag_type,
used_tag_info.further_help.as_ref(),
);
}
} else {
any_tag_errors = true;
*toml_source.borrow_mut().0 = TomlValueState::new_validation_failed(
format!("No such tag: '{tag_name}'"),
);
if all_tags_ever.contains_key(tag_name.as_ref()) {
*toml_source.borrow_mut().1 = Some(format!(
"Tag '{tag_name}' was generated by a previous step, but it is not available at this point.\n\
This likely means that it was removed (forgotten) by an intermediate step.\n{}",
offer_alternatives(
tag_name.as_ref(),
&tags_available
.keys()
.map(AsRef::as_ref)
.collect::<Vec<_>>()
)
));
} else {
*toml_source.borrow_mut().1 = Some(offer_alternatives(
tag_name.as_ref(),
&tags_available.keys().map(AsRef::as_ref).collect::<Vec<_>>(),
));
}
}
} else {
if let Some(source_tag) = tag_name.source_tag()
&& let Some(entry) =
tags_available.get_mut(&TagLabel::Normal(source_tag.clone()))
{
entry.used = true;
match tag_name {
TagLabel::Normal(_) | TagLabel::ReadNo | TagLabel::Length(_, _) => {
unreachable!() } TagLabel::TagLength(_source_tag, _) => {
if !entry.tag_type.compatible(TagValueType::Location)
&& !entry.tag_type.compatible(TagValueType::String)
{
let toml_source = &used_tag_info.toml_source;
Self::_set_type_error(
toml_source,
source_tag,
&[TagValueType::String, TagValueType::Location],
&entry.tag_type,
used_tag_info.further_help.as_ref(),
);
}
}
TagLabel::TagLocation { .. } => {
if !entry.tag_type.compatible(TagValueType::Location) {
let toml_source = &used_tag_info.toml_source;
Self::_set_type_error(
toml_source,
source_tag,
&[TagValueType::Location],
&entry.tag_type,
used_tag_info.further_help.as_ref(),
);
}
}
}
}
tags_used_here.push(tag_name.clone());
}
}
allowed_tags_per_stage.push(tags_used_here);
if let Some(dt) = tag_info.declared_tag {
if let Some(meta) = tags_available.get(&dt.name) {
any_tag_errors = true;
let spans = vec![
(
dt.toml_source_span.clone(),
"Tag also declared here".to_string(),
),
(meta.span.clone(), "Tag declared here ".to_string()),
];
*dt.toml_source_state = TomlValueState::Custom { spans };
*dt.toml_source_help = Some(
"Rename either tag, or add a ForgetTag step inbetween.".to_string(),
);
} else {
all_tags_ever
.insert(dt.name.as_ref().to_string(), dt.toml_source_span.clone());
tags_available.insert(
dt.name.clone(),
TagMetadata {
used: false,
tag_type: dt.tag_type,
span: dt.toml_source_span,
},
);
}
}
}
self.allowed_tags_per_transformation = Some(allowed_tags_per_stage);
if let Some(Some(barcodes)) = self.barcodes.as_mut() {
for tv_barcode in &mut barcodes.keys {
if let Some(barcode_name) = tv_barcode.as_ref() {
any_tag_errors = true;
if all_tags_ever.contains_key(barcode_name) {
let spans = vec![
(
tv_barcode.span(),
"Barcode with the same name as a tag defined here".to_string(),
),
(
all_tags_ever
.get(barcode_name)
.expect("just checked contains_key")
.clone(),
"Tag with the same name as a barcode defined here".to_string(),
),
];
tv_barcode.state = TomlValueState::Custom { spans };
tv_barcode.help = Some(
"Barcode names must not collide with tag names. Please choose a different name for this barcode.".to_string(),
);
}
}
}
}
if let Some(input) = self.input.as_mut()
&& let Some(structured) = input.structured.as_ref()
{
let segment_iter: Vec<&mut TomlValue<String>> = match structured {
StructuredInput::Interleaved { .. } => input
.interleaved
.as_mut()
.and_then(|x| x.as_mut())
.map_or_else(std::vec::Vec::new, |x| x.iter_mut().collect::<Vec<_>>()),
StructuredInput::Segmented { .. } => input
.segments
.as_mut()
.map_or_else(std::vec::Vec::new, |x| x.keys.iter_mut().collect()),
};
for tv_segment in segment_iter {
if let Some(segment_name) = tv_segment.as_ref()
&& all_tags_ever.contains_key(segment_name)
{
any_tag_errors = true;
let spans = vec![
(
tv_segment.span(),
"Segment with the same name as a tag defined here".to_string(),
),
(
all_tags_ever
.get(segment_name)
.expect("just checked contains_key")
.clone(),
"Tag with the same name as a segment defined here".to_string(),
),
];
tv_segment.state = TomlValueState::Custom { spans };
tv_segment.help = Some(
"Segment names must not collide with tag names. Please choose a different name for this tag (or segment).".to_string(),
);
}
}
}
if let Some(Some(output)) = self.output.as_mut()
&& let Some(Some(bam_opts)) = output.bam.as_mut()
{
if let Some(map_and_keys) = bam_opts.tag_to_bam_tag.as_mut() {
for (toml_tag_label, tag_label) in
map_and_keys.keys.iter_mut().zip(map_and_keys.map.keys())
{
if let Some(meta) = tags_available.get_mut(tag_label.as_ref()) {
meta.used = true;
} else {
toml_tag_label.state =
TomlValueState::new_validation_failed("No such tag".to_string());
toml_tag_label.help = Some(offer_alternatives(
tag_label.as_ref(),
&tags_available.keys().map(AsRef::as_ref).collect::<Vec<_>>(),
));
}
}
let mut seen_bam_tags = IndexMap::new();
for bam_tag in map_and_keys.map.values_mut() {
if let Some(bam_tag_value) = bam_tag.as_mut()
&& let Some(other_span) =
seen_bam_tags.insert(bam_tag_value.0, bam_tag.span())
{
bam_tag.state = TomlValueState::Custom {
spans: vec![
(bam_tag.span(), "Repeated, 2nd use".to_string()),
(other_span, "Repeated, 1st use".to_string()),
],
};
bam_tag.help = Some(
"BAM tags must be distinct, \
can not write two tags into one BAM tag. Rename either one"
.to_string(),
);
}
}
} if let Some(Some(tag_to_ref)) = bam_opts.tag_to_reference.as_mut()
&& let Some(tag_name) = tag_to_ref.tag.as_mut()
{
if let Some(meta) = tags_available.get_mut(tag_name.as_str()) {
meta.used = true;
} else {
tag_to_ref.tag.help = Some(offer_alternatives(
tag_name.as_ref(),
&tags_available.keys().map(AsRef::as_ref).collect::<Vec<_>>(),
));
tag_to_ref.tag.state =
TomlValueState::new_validation_failed("No such tag".to_string());
}
}
}
if !any_tag_errors {
let mut spans = vec![];
for (_tag_label, meta) in &tags_available {
if !meta.used {
spans.push((meta.span.clone(), "Unused tag".to_string()));
}
}
if !spans.is_empty() {
self.transform.state = TomlValueState::Custom { spans };
self.transform.help = Some(
"Make sure to either use, or forget (using ForgetTag) all your tags."
.to_string(),
);
}
if let Some(trafos) = self.transform.as_mut() {
for idx in 0..trafos.len() {
let (before, rest) = trafos.split_at_mut(idx);
if let Some(trafo) = rest[0].as_mut() {
trafo.verify_others(
self.input.as_ref(),
self.output.as_ref().and_then(|o| o.as_ref()),
before,
);
} }
}
}
}
used_barcode_sections
}
/// Record each step's declared outputs and flag conflicting output targets.
///
/// Stores the result of `declare_output_files()` for every step, in step order, in
/// `self.output_declarations_per_transformation`. A step without a value contributes an
/// empty list. It then marks validation errors when:
/// * two or more steps write to a file with the same infix parts and suffix,
/// * a step writes to stdout after a `Demultiplex` step,
/// * more than one step writes to stdout, or
/// * `[output]` has `stdout` enabled while any step also writes to stdout.
pub fn verify_output_filenames_unique(&mut self) {
    use fastqrab_io::io::output::chunked_writer::WriteTargetConfig;

    /// English ordinal for a 1-based position: "1st", "2nd", "3rd", "4th", ..., "11th", "21st".
    fn ordinal(n: usize) -> String {
        let suffix = match (n % 10, n % 100) {
            (1, rem) if rem != 11 => "st",
            (2, rem) if rem != 12 => "nd",
            (3, rem) if rem != 13 => "rd",
            _ => "th",
        };
        format!("{n}{suffix}")
    }

    // (step index, span of the output declaration)
    type ConflictEntry = (usize, std::ops::Range<usize>);
    // File outputs keyed by (infix parts, suffix); equal keys mean the same output file.
    let mut key_to_entries: IndexMap<(Vec<String>, String), Vec<ConflictEntry>> =
        IndexMap::new();
    // Stdout outputs, each paired with "is there a Demultiplex step before this step?".
    let mut stdout_entries: Vec<(ConflictEntry, bool)> = Vec::new();
    let mut all_decls: Vec<Vec<OutputDeclaration>> = Vec::new();

    if let Some(transforms) = self.transform.value.as_ref() {
        let mut seen_demux = false;
        for (idx, tv_transform) in transforms.iter().enumerate() {
            // Captured before updating `seen_demux`, so a Demultiplex step's own
            // outputs are not flagged as "after a Demultiplex step".
            let has_demux_before = seen_demux;
            if matches!(
                tv_transform.value.as_ref(),
                Some(PartialTransformation::Demultiplex(_))
            ) {
                seen_demux = true;
            }
            let decls = tv_transform
                .value
                .as_ref()
                .map(|t| t.declare_output_files())
                .unwrap_or_default();
            for decl in &decls {
                if let WriteTargetConfig::File(ft) = &decl.target {
                    key_to_entries
                        .entry((ft.infix_parts().to_vec(), ft.suffix().to_string()))
                        .or_default()
                        .push((idx, decl.span.clone()));
                } else if matches!(decl.target, WriteTargetConfig::Stdout) {
                    stdout_entries.push(((idx, decl.span.clone()), has_demux_before));
                }
            }
            all_decls.push(decls);
        }
    }
    self.output_declarations_per_transformation = Some(all_decls);

    // Without steps there are no file or stdout declarations, hence nothing to report.
    let Some(transforms) = self.transform.value.as_mut() else {
        return;
    };

    for ((infix_parts, suffix), entries) in key_to_entries {
        if entries.len() < 2 {
            continue;
        }
        let spans: Vec<_> = entries
            .iter()
            .enumerate()
            .map(|(n, (_, span))| {
                (
                    span.clone(),
                    format!("{} step writing to this file", ordinal(n + 1)),
                )
            })
            .collect();
        let file_hint = if infix_parts.is_empty() {
            format!("suffix .{suffix}")
        } else {
            format!("infix '{}', suffix .{suffix}", infix_parts.join("_"))
        };
        // The error is attached to the first step involved; its spans point at all of them.
        let first_idx = entries[0].0;
        transforms[first_idx].state = TomlValueState::Custom { spans };
        transforms[first_idx].help = Some(format!(
            "Two steps would write to the same output file ({file_hint}).\n\
             Change the infix in one of them to avoid the conflict."
        ));
    }

    for ((idx, span), has_demux_before) in &stdout_entries {
        if *has_demux_before {
            transforms[*idx].state = TomlValueState::Custom {
                spans: vec![(span.clone(), "this step writes to stdout".to_string())],
            };
            transforms[*idx].help = Some(
                "Cannot write to stdout after a Demultiplex step — output from multiple \
                 barcodes would be interleaved. Use an infix to write to a file instead."
                    .to_string(),
            );
        }
    }

    if stdout_entries.len() > 1 {
        let spans: Vec<_> = stdout_entries
            .iter()
            .enumerate()
            .map(|(n, ((_, span), _))| {
                (
                    span.clone(),
                    format!("{} step writing to stdout", ordinal(n + 1)),
                )
            })
            .collect();
        let first_idx = stdout_entries[0].0.0;
        transforms[first_idx].state = TomlValueState::Custom { spans };
        transforms[first_idx].help = Some(
            "Multiple steps write to stdout. Only one step may write to stdout at a time."
                .to_string(),
        );
    }

    if let Some(Some(output_config)) = self.output.as_mut()
        && output_config.stdout.as_ref().is_some_and(|x| *x)
        && !stdout_entries.is_empty()
    {
        let mut spans = vec![(
            output_config.stdout.span(),
            "Stdout output conflict.".to_string(),
        )];
        spans.extend(
            stdout_entries
                .into_iter()
                .map(|((_, span), _)| (span, "This step also writes to stdout.".to_string())),
        );
        output_config.stdout.state = TomlValueState::Custom { spans };
        output_config.stdout.help = Some(
            "Stdout output from [output] conflicts with step(s) writing to stdout.\n\
             Either change the [output] to write to files (or not at all), or change the step(s) to write to a file instead."
                .to_string(),
        );
    }
}
/// Reject configurations where two `Demultiplex` steps demultiplex on the same `in_label`.
///
/// Every repeated use is marked with a custom error. Its spans point at that use and at the
/// first step that used the label.
pub fn verify_demultiplex_unique(&mut self) {
    // Label -> span of the first Demultiplex step that used it.
    let mut first_use: IndexMap<String, std::ops::Range<usize>> = IndexMap::new();
    let Some(transforms) = self.transform.as_mut() else {
        return;
    };
    for trafo in transforms.iter_mut() {
        if let Some(PartialTransformation::Demultiplex(demultiplex_config)) = trafo.as_mut()
            && let Some(demultiplex_config_value) = demultiplex_config.toml_value.as_ref()
            && let Some(in_label) = demultiplex_config_value.in_label.as_ref()
        {
            let in_label: String = in_label.as_ref().to_string();
            let label_span = demultiplex_config_value.in_label.span();
            // Look up without overwriting, so the "first use" span really is the first
            // use even when a label is repeated three or more times.
            if let Some(first_span) = first_use.get(&in_label) {
                let spans = vec![
                    (
                        label_span,
                        "2nd use of this label for demultiplexing".to_string(),
                    ),
                    (
                        first_span.clone(),
                        "first use for this label for demultiplexing".to_string(),
                    ),
                ];
                demultiplex_config.toml_value.help = Some(
                    "Demultiplexing twice on the same label is nonsensical and unsupported."
                        .to_string(),
                );
                demultiplex_config.toml_value.state = TomlValueState::Custom { spans };
            } else {
                first_use.insert(in_label, label_span);
            }
        }
    }
}
/// Flag unreferenced barcode sections and validate `references_from_barcodes`.
///
/// A barcode section counts as referenced when its name is in `used_barcode_sections`, or
/// when `output.bam.tag_to_reference.references_from_barcodes` names it. Unreferenced
/// sections get a validation error.
///
/// It also checks one more thing when the output format is BAM. If
/// `references_from_barcodes` names a section that does not exist, that field gets an
/// error with suggested alternatives.
///
/// Nothing happens unless `[input]` is valid and a barcodes table is present.
fn verify_barcodes_used(
    &mut self,
    used_barcode_sections: &std::collections::HashSet<TagLabel>,
) {
    if !self.input.is_ok() {
        return;
    }
    let Some(Some(barcodes)) = self.barcodes.as_mut() else {
        return;
    };

    // The section named by output.bam.tag_to_reference.references_from_barcodes, if any.
    // It is the same for every section, so it is resolved once, before the loop.
    let bam_reference_section: Option<&str> = if let Some(Some(output)) = self.output.as_ref()
        && let Some(Some(bam_opts)) = output.bam.as_ref()
        && let Some(Some(tag_to_ref)) = bam_opts.tag_to_reference.as_ref()
        && let Some(Some(section)) = tag_to_ref.references_from_barcodes.as_ref()
    {
        Some(section.as_str())
    } else {
        None
    };

    for (section_name, tv_section) in &mut barcodes.map {
        let is_referenced = used_barcode_sections.contains(section_name)
            || bam_reference_section.is_some_and(|name| section_name.as_ref() == name);
        if is_referenced {
            continue;
        }
        tv_section.state =
            TomlValueState::new_validation_failed("Barcode section not referenced by any step");
        tv_section.help = Some(
            "Add a step that uses this barcode section, use it in `output.bam.references_from_barcodes`, or remove it.".to_string(),
        );
    }

    if let Some(Some(output)) = self.output.as_mut()
        && output.format.as_ref() == Some(&FileFormat::Bam)
        && let Some(Some(bam)) = output.bam.as_mut()
        && let Some(Some(tag_to_reference)) = bam.tag_to_reference.as_mut()
        && let Some(Some(from_barcode)) = tag_to_reference.references_from_barcodes.as_ref()
        && !barcodes.map.contains_key(from_barcode.as_str())
    {
        let suggestion = offer_alternatives(
            from_barcode,
            &barcodes.map.keys().map(AsRef::as_ref).collect::<Vec<_>>(),
        );
        tag_to_reference.references_from_barcodes.help = Some(suggestion);
        tag_to_reference.references_from_barcodes.state = TomlValueState::new_validation_failed(
            "Barcode section not found for output.bam.tag_to_reference",
        );
    }
}
/// Validate `[output.bam] merge_demultiplexed` against the rest of the configuration.
///
/// Nothing happens when there is no `[output]`/`[output.bam]` or `merge_demultiplexed` is
/// unset. Otherwise errors are recorded on `merge_demultiplexed` when:
/// * `[output] format` is not `'bam'` (then the remaining checks are skipped),
/// * both the `output` segment list and `interleave` are empty,
/// * `tag_to_reference` is not set, or
/// * `tag_to_reference.tag` is not the `in_label` of any `Demultiplex` step whose
///   `lookup_mode` is `NoLookup` or unset.
fn verify_merge_demultiplexed(&mut self) {
    let Some(Some(output)) = self.output.as_mut() else {
        return;
    };
    let Some(Some(bam)) = output.bam.as_mut() else {
        return;
    };
    if let Some(None) = bam.merge_demultiplexed.as_ref() {
        return;
    }
    if output.format.as_ref() != Some(&FileFormat::Bam) {
        bam.merge_demultiplexed.state = TomlValueState::new_validation_failed(
            "merge_demultiplexed requires format = 'bam'",
        );
        bam.merge_demultiplexed.help =
            Some("Set [output] format = 'bam' to use merge_demultiplexed.".to_string());
        return;
    }
    // A missing/unusable `interleave` value (outer None) is not treated as empty.
    let interleave_empty = match output.interleave.as_ref() {
        None => false,
        Some(None) => true,
        Some(Some(x)) => x.is_empty(),
    };
    if let Some(Some(output_segments)) = output.output.as_ref()
        && output_segments.is_empty()
        && interleave_empty
    {
        let spans = vec![
            (
                bam.merge_demultiplexed.span(),
                "Incompatible with empty outputs".to_string(),
            ),
            (
                output.output.span(),
                "These output segments are empty".to_string(),
            ),
        ];
        bam.merge_demultiplexed.state = TomlValueState::Custom { spans };
        bam.merge_demultiplexed.help = Some(
            "Either remove 'merge_demultiplexed' or specify either output segments or interleaved output.".to_string(),
        );
    }
    match bam.tag_to_reference.as_ref() {
        Some(Some(tag_to_reference)) => {
            if let Some(ref_label) = tag_to_reference.tag.as_ref() {
                // Collect the in_labels of *all* eligible Demultiplex steps. The reference
                // tag may match any of them, and every candidate is offered as a suggestion.
                let mut available_demultiplex_labels = Vec::new();
                if let Some(trafos) = self.transform.value.as_ref() {
                    for trafo in trafos {
                        if let Some(PartialTransformation::Demultiplex(demultiplex_config_toml)) =
                            trafo.value.as_ref()
                            && let Some(demultiplex_config) =
                                demultiplex_config_toml.toml_value.value.as_ref()
                            && let Some(TagLabel::Normal(in_label)) =
                                demultiplex_config.in_label.as_ref()
                            && matches!(
                                demultiplex_config.lookup_mode,
                                Some(crate::transformations::demultiplex::LookupMode::NoLookup)
                                    | None
                            )
                        {
                            available_demultiplex_labels.push(in_label.clone());
                        }
                    }
                }
                if !available_demultiplex_labels.contains(ref_label) {
                    bam.merge_demultiplexed.state = TomlValueState::new_validation_failed(
                        format!("No Demultiplex step found that had in_label = {ref_label}",),
                    );
                    if available_demultiplex_labels.is_empty() {
                        // NOTE(review): this help text asks for lookup_mode = 'lookup', but the
                        // filter above only accepts NoLookup or an unset lookup_mode — confirm
                        // which one is intended.
                        bam.merge_demultiplexed.help = Some(
                            "No suitable Demultiplex step found. Make sure you have a Demultiplex step with lookup_mode = 'lookup' and an in_label that matches output.bam.tag_to_reference.tag.".to_string(),
                        );
                    } else {
                        bam.merge_demultiplexed.help = Some(format!(
                            "Either add a Demultiplex step or reuse one of the following: {}",
                            suggest_alternatives("", &available_demultiplex_labels)
                        ));
                    }
                }
            }
        }
        Some(None) => {
            bam.merge_demultiplexed.state = TomlValueState::new_validation_failed(
                "merge_demultiplexed requires tag_to_reference to be set.",
            );
            bam.merge_demultiplexed.help = Some(
                "Either remove 'merge_demultiplexed' or set 'tag_to_reference' to specify how to assign reads to BAM references for merging."
                    .to_string(),
            );
        }
        // No usable value for tag_to_reference. Presumably it failed to deserialize and
        // carries its own error. There is nothing to check against, so skip it instead
        // of panicking.
        None => {}
    }
}
}
impl Config {
    /// Turn this config into a [`CheckedConfig`], inspecting the input files.
    ///
    /// Same as `inner_check(true)`: input formats are detected from the files and used to
    /// validate format-specific input options and to choose thread counts.
    ///
    /// # Errors
    /// Returns one error combining every problem found by the input-format check.
    pub fn check(self) -> Result<CheckedConfig> {
        self.inner_check(true)
    }
    /// Shared implementation of [`Config::check`] and [`Config::check_for_validation`].
    ///
    /// Always converts the transforms into stages, which drains `self.transform`.
    ///
    /// When `check_input_files_exist` is true, the input files are inspected to detect
    /// their formats and configure threading. Otherwise the files are not touched and
    /// every thread count is 1.
    fn inner_check(mut self, check_input_files_exist: bool) -> Result<CheckedConfig> {
        let mut errors = Vec::new();
        let stages = self.transforms_to_stages();
        // transforms_to_stages drains every transform into `stages`.
        assert!(self.transform.is_empty());
        let threading_configuration = if check_input_files_exist {
            let input_formats_observed = self.check_input_format(&mut errors);
            self.configure_multithreading(&input_formats_observed)
        } else {
            // Validation-only mode: inputs are not inspected; one thread everywhere.
            ThreadingConfiguration {
                n_input_per_segment: std::num::NonZeroUsize::new(1).expect("Can't fail"),
                n_output: std::num::NonZeroUsize::new(1).expect("Can't fail"),
                n_processing: std::num::NonZeroUsize::new(1).expect("Can't fail"),
            }
        };
        if !errors.is_empty() {
            // Report every collected problem at once, separated by a divider line.
            let combined_error = errors
                .into_iter()
                .map(|e| format!("{e:?}"))
                .collect::<Vec<_>>()
                .join("\n\n---------\n\n");
            bail!("Multiple errors occurred:\n\n{combined_error}");
        }
        Ok(CheckedConfig {
            input: self.input,
            output: self.output,
            stages,
            options: self.options,
            barcodes: self.barcodes.unwrap_or_default(),
            benchmark: self.benchmark,
            report_labels: self.report_labels,
            threading_configuration,
        })
    }
    /// Turn this config into a [`CheckedConfig`] without inspecting input files.
    ///
    /// All thread counts are fixed at 1. Only the input-format check records errors, and it
    /// is skipped here, so with the current implementation this returns `Ok`.
    pub fn check_for_validation(self) -> Result<CheckedConfig> {
        self.inner_check(false)
    }
    /// Detect the format of every input file and validate format-specific input options.
    ///
    /// Pushes an error onto `errors` in these cases:
    /// * interleaved inputs, or the files of one segment, have differing formats;
    /// * FASTA is read while `fasta_fake_quality` is unset;
    /// * BAM is read while `bam_include_mapped` or `bam_include_unmapped` is unset;
    /// * BAM is read with both `bam_include_mapped` and `bam_include_unmapped` false.
    ///
    /// Returns which formats were seen, and whether any FASTQ/FASTA input was gzip-compressed.
    #[expect(clippy::similar_names, reason = "domain names are that way")]
    #[expect(clippy::too_many_lines, reason = "validation takes lines")]
    #[mutants::skip] fn check_input_format(&mut self, errors: &mut Vec<anyhow::Error>) -> InputFormatsObserved {
        let mut saw_fasta = false;
        let mut saw_bam = false;
        let mut saw_fastq = false;
        let mut saw_gzip = false;
        match &self.input.structured {
            StructuredInput::Interleaved { files, .. } => {
                // All interleaved files must share the format of the first detected file.
                let mut interleaved_format: Option<DetectedInputFormat> = None;
                for filename in files {
                    if let Ok((format, compression_format)) =
                        io::input::detect_input_format(Path::new(filename))
                    {
                        if let Some(existing) = interleaved_format {
                            if existing != format {
                                errors.push(anyhow!(
                                    "(input): Interleaved inputs must all have the same format. Found both {existing:?} and {format:?}."
                                ));
                            }
                        } else {
                            interleaved_format = Some(format);
                        }
                        // Gzip is only tracked for FASTQ/FASTA; it drives multi-threaded input.
                        match format {
                            DetectedInputFormat::Fastq => {
                                saw_fastq = true;
                                if compression_format == CompressionFormat::Gzip {
                                    saw_gzip = true;
                                }
                            }
                            DetectedInputFormat::Fasta => {
                                saw_fasta = true;
                                if compression_format == CompressionFormat::Gzip {
                                    saw_gzip = true;
                                }
                            }
                            DetectedInputFormat::Bam => saw_bam = true,
                        }
                    } else {
                        // Format detection failed; nothing is recorded for this file here.
                        // NOTE(review): presumably the failure surfaces when the file is opened — confirm.
                    }
                }
            }
            StructuredInput::Segmented {
                segment_order,
                segment_files,
            } => {
                for segment_name in segment_order {
                    // Formats may differ between segments, but not within one segment.
                    let mut segment_format: Option<DetectedInputFormat> = None;
                    if let Some(files) = segment_files.get(segment_name) {
                        for filename in files {
                            if let Ok((format, compression_format)) =
                                io::input::detect_input_format(Path::new(filename))
                            {
                                if let Some(existing) = segment_format {
                                    if existing != format {
                                        errors.push(anyhow!(
                                            "(input): Segment '{segment_name}' mixes input formats {existing:?} and {format:?}. Mixing formats like this is not supported."
                                        ));
                                    }
                                } else {
                                    segment_format = Some(format);
                                }
                                match format {
                                    DetectedInputFormat::Fastq => {
                                        saw_fastq = true;
                                        if compression_format == CompressionFormat::Gzip {
                                            saw_gzip = true;
                                        }
                                    }
                                    DetectedInputFormat::Fasta => {
                                        saw_fasta = true;
                                        if compression_format == CompressionFormat::Gzip {
                                            saw_gzip = true;
                                        }
                                    }
                                    DetectedInputFormat::Bam => saw_bam = true,
                                }
                            } else {
                                // Format detection failed; nothing is recorded for this file here.
                                // NOTE(review): presumably the failure surfaces when the file is opened — confirm.
                            }
                        }
                    } }
            }
        }
        // FASTA carries no qualities, so a fake quality value must be configured.
        if saw_fasta && self.input.options.fasta_fake_quality.is_none() {
            errors.push(anyhow!(
                "[input.options]: 'fasta_fake_quality' must be set when reading FASTA inputs."
            ));
        }
        if saw_bam {
            let include_mapped = self.input.options.bam_include_mapped;
            let include_unmapped = self.input.options.bam_include_unmapped;
            if include_mapped.is_none() {
                errors.push(anyhow!(
                    "[input.options]: 'bam_include_mapped' must be set (true or false) when reading BAM inputs."
                ));
            }
            if include_unmapped.is_none() {
                errors.push(anyhow!(
                    "[input.options]: 'bam_include_unmapped' must be set (true or false) when reading BAM inputs."
                ));
            } else if include_mapped == Some(false) && include_unmapped == Some(false) {
                // Both explicitly false would select no reads at all.
                errors.push(anyhow!(
                    "[input.options]: At least one of 'bam_include_mapped' or 'bam_include_unmapped' must be true when reading BAM inputs."
                ));
            }
        }
        InputFormatsObserved {
            fastq: saw_fastq,
            fasta: saw_fasta,
            bam: saw_bam,
            gzip: saw_gzip,
        }
    }
    /// Drain `self.transform` into [`Stage`]s.
    ///
    /// Each transform is paired, by position, with its allowed tags and output
    /// declarations, and `Report` steps are dropped. Filtering happens after zipping so that
    /// positions stay aligned.
    ///
    /// `zip` stops at the shortest input, so any transform without a matching entry would be
    /// silently dropped. NOTE(review): this assumes both per-transformation vectors were
    /// populated with one entry per transform — confirm.
    fn transforms_to_stages(&mut self) -> Vec<Stage> {
        let allowed_tags_per_stage = self.allowed_tags_per_transformation.clone();
        let output_declarations_per_stage = self.output_declarations_per_transformation.clone();
        let stages: Vec<Stage> = self
            .transform
            .drain(..)
            .zip(allowed_tags_per_stage)
            .zip(output_declarations_per_stage)
            .filter(|((t, _), _)| !matches!(t, Transformation::Report { .. }))
            .map(|((t, tags), decls)| Stage {
                transformation: t,
                allowed_tags: tags.into_iter().collect(),
                output_declarations: decls,
            })
            .collect();
        stages
    }
    /// Choose thread counts, write them back into the options, and return them.
    ///
    /// Multi-threaded input is only allowed when gzip input was observed. Multi-threaded
    /// compression is only allowed for gzip-compressed or BAM output.
    ///
    /// The chosen counts are stored in `options.threads`, `input.options.threads_per_segment`
    /// and, when there is an `[output]`, `output.compression_threads`.
    ///
    /// `use_rapidgzip` is turned off when there is one input thread per segment and no
    /// rapidgzip index is being built.
    ///
    /// # Panics
    /// If `calculate_thread_counts` returns a zero count.
    #[mutants::skip] fn configure_multithreading(
        &mut self,
        input_formats_observed: &InputFormatsObserved,
    ) -> ThreadingConfiguration {
        let segment_count = self.input.parser_count();
        let can_multicore_input = input_formats_observed.gzip;
        // Non-short-circuiting `|`; both operands are side-effect free.
        let can_multicore_compression = self
            .output
            .as_ref()
            .is_some_and(|o| matches!(o.compression, CompressionFormat::Gzip))
            | self
                .output
                .as_ref()
                .is_some_and(|o| matches!(o.format, FileFormat::Bam));
        let (thread_count, input_threads_per_segment, output_threads) = calculate_thread_counts(
            self.options.threads,
            self.input.options.threads_per_segment,
            self.output.as_ref().map(|x| x.compression_threads),
            segment_count,
            get_number_of_cores(),
            can_multicore_input,
            can_multicore_compression,
        );
        self.options.threads = Some(thread_count);
        self.input.options.threads_per_segment = Some(input_threads_per_segment);
        if let Some(output) = &mut self.output {
            output.compression_threads = output_threads;
        }
        // Single-threaded input per segment: rapidgzip is only kept when building its index.
        if self.input.options.threads_per_segment.expect("Set before") == 1
            && !self.input.options.build_rapidgzip_index.unwrap_or(false)
        {
            self.input.options.use_rapidgzip = false;
        }
        ThreadingConfiguration {
            n_input_per_segment: std::num::NonZeroUsize::new(input_threads_per_segment)
                .expect("Thread count must be > 0"),
            n_output: std::num::NonZeroUsize::new(output_threads)
                .expect("Thread count must be > 0"),
            n_processing: std::num::NonZeroUsize::new(thread_count)
                .expect("Thread count must be > 0"),
        }
    }
}
impl CheckedConfig {
    /// The `ix_separator` configured in `[output]`.
    ///
    /// Falls back to `output::default_ix_separator()` when there is no `[output]` section.
    pub fn get_ix_separator(&self) -> String {
        match &self.output {
            Some(output_config) => output_config.ix_separator.clone(),
            None => output::default_ix_separator(),
        }
    }
}
/// Decide thread counts for processing, per-segment input, and output compression.
///
/// * `step_thread_count`: requested processing threads (`None` = derive from `cpu_count`).
/// * `threads_per_segment`: requested input threads per segment. Forced to 1 unless
///   `can_multicore_decompression`.
/// * `compression_threads`: requested output compression threads. When `None`, it is half
///   the CPUs clamped to `1..=5` if `can_multicore_compression`, otherwise 1.
/// * `segment_count`: number of input segments (treated as at least 1 when dividing).
/// * `cpu_count`: available cores.
///
/// Returns `(processing_threads, input_threads_per_segment, compression_threads)`.
/// Automatically derived counts are always at least 1. Explicitly requested counts are
/// passed through unchanged.
fn calculate_thread_counts(
    step_thread_count: Option<usize>,
    threads_per_segment: Option<usize>,
    compression_threads: Option<usize>,
    segment_count: usize,
    cpu_count: usize,
    can_multicore_decompression: bool,
    can_multicore_compression: bool,
) -> (usize, usize, usize) {
    // Upper bound for automatically chosen per-segment and compression thread counts.
    const MAX_AUTO_THREADS: usize = 5;

    let threads_per_segment = if can_multicore_decompression {
        threads_per_segment
    } else {
        Some(1)
    };
    // `clamp`, not just `min`: on a single-core machine `cpu_count / 2` is 0. A zero count
    // would later panic when converted into a `NonZeroUsize`.
    let compression_threads = compression_threads.unwrap_or_else(|| {
        if can_multicore_compression {
            (cpu_count / 2).clamp(1, MAX_AUTO_THREADS)
        } else {
            1
        }
    });
    // Guards the divisions below against a (degenerate) zero segment count.
    let divisor = segment_count.max(1);
    // Processing threads left after reserving `per_segment` input threads for every segment.
    let remaining_after_input = |per_segment: usize| {
        cpu_count
            .saturating_sub(per_segment.saturating_mul(segment_count))
            .max(1)
    };
    match (step_thread_count, threads_per_segment) {
        (Some(step_thread_count), Some(threads_per_segment)) => {
            (step_thread_count, threads_per_segment, compression_threads)
        }
        (None, Some(threads_per_segment)) => (
            remaining_after_input(threads_per_segment),
            threads_per_segment,
            compression_threads,
        ),
        (Some(thread_count), None) => {
            let per_segment = (cpu_count.saturating_sub(thread_count) / divisor).max(1);
            (thread_count, per_segment, compression_threads)
        }
        (None, None) => {
            let per_segment = (cpu_count / 2 / divisor).clamp(1, MAX_AUTO_THREADS);
            (
                remaining_after_input(per_segment),
                per_segment,
                compression_threads,
            )
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_validate_tag_name_valid() {
        let accepted = [
            "a",
            "A",
            "_",
            "abc",
            "ABC",
            "a123",
            "A123",
            "_123",
            "tag_name",
            "TagName",
            "tag123_name",
            "_private_tag",
        ];
        for name in accepted {
            validate_tag_name(name).unwrap();
        }
    }

    #[test]
    fn test_validate_tag_name_invalid() {
        // Empty, digit-leading, and names containing punctuation or whitespace.
        let malformed = [
            "", "123", "123abc", "tag-name", "tag.name", "tag name", "tag@name", "tag/name",
            "tag\\name", "tag:name",
        ];
        // Names that collide with reserved/virtual tag names.
        let reserved = ["len_123", "len_shu", "ReadName", "read_no"];
        for name in malformed.into_iter().chain(reserved) {
            validate_tag_name(name).unwrap_err();
        }
    }

    #[test]
    fn test_validate_segment_label_valid() {
        let mode = toml_pretty_deser::FieldMatchMode::Exact;
        let accepted = [
            "a",
            "A",
            "_",
            "abc",
            "ABC",
            "a123",
            "A123",
            "read1",
            "READ1",
            "segment_name",
            "segment123",
            "_internal",
        ];
        for label in accepted {
            validate_segment_label(label, mode).unwrap();
        }
        // Digit-leading labels are rejected.
        for label in ["123", "123abc"] {
            validate_segment_label(label, mode).unwrap_err();
        }
    }

    #[test]
    fn test_validate_segment_label_invalid() {
        let exact = toml_pretty_deser::FieldMatchMode::Exact;
        let malformed = [
            "",
            "1",
            "segment-name",
            "segment.name",
            "segment name",
            "segment@name",
            "segment/name",
            "segment\\name",
            "segment:name",
        ];
        // Labels that clash with [input.options] keys.
        let option_keys = [
            "fasta_fake_quality",
            "bam_include_mapped",
            "bam_include_unmapped",
            "read_comment_character",
            "use_rapidgzip",
            "build_rapidgzip_index",
            "threads_per_segment",
            "tpd_field_match_mode",
        ];
        for label in malformed.into_iter().chain(option_keys) {
            validate_segment_label(label, exact).unwrap_err();
        }
        // With case-insensitive matching, a differently-cased option key still clashes.
        let any_case = toml_pretty_deser::FieldMatchMode::AnyCase;
        validate_segment_label("FaSTA___FAKE-QUALITY", any_case).unwrap_err();
    }

    #[test]
    fn test_calculate_thread_counts() {
        // ((step threads, threads/segment, compression threads, segments, cpus,
        //   multicore decompression, multicore compression), expected)
        let cases = [
            ((Some(8), Some(2), None, 4, 16, true, false), (8, 2, 1)),
            ((Some(8), Some(2), None, 40, 1, true, false), (8, 2, 1)),
            ((None, Some(2), None, 4, 16, true, false), (8, 2, 1)),
            ((Some(8), None, None, 4, 16, true, false), (8, 2, 1)),
            ((Some(9), None, None, 4, 16, true, false), (9, 1, 1)),
            ((None, None, None, 4, 16, true, false), (8, 2, 1)),
            ((None, None, None, 2, 16, true, false), (8, 4, 1)),
            ((None, None, None, 1, 16, true, false), (11, 5, 1)),
            ((None, None, None, 1, 16, false, false), (15, 1, 1)),
            ((None, None, None, 1, 16, false, true), (15, 1, 5)),
            ((None, None, None, 1, 8, false, true), (7, 1, 4)),
        ];
        for ((step, per_segment, compression, segments, cpus, decompress, compress), expected) in
            cases
        {
            assert_eq!(
                calculate_thread_counts(
                    step,
                    per_segment,
                    compression,
                    segments,
                    cpus,
                    decompress,
                    compress
                ),
                expected
            );
        }
    }
}