use std::sync::{Arc, Mutex, atomic::AtomicUsize};
use super::common::{default_progress_n, thousands_format};
use crate::transformations::prelude::*;
/// Render a duration given in whole seconds as `HH:MM:SS`.
///
/// Every component is zero-padded to at least two digits. The hour component
/// is not wrapped at 24, so long durations simply produce a wider hour field
/// (for example `240:00:00` for ten days).
fn format_seconds_to_hhmmss(seconds: u64) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;

    let whole_hours = seconds / SECS_PER_HOUR;
    // Everything that does not fill a whole hour is split into minutes/seconds.
    let within_hour = seconds % SECS_PER_HOUR;
    format!(
        "{:02}:{:02}:{:02}",
        whole_hours,
        within_hour / SECS_PER_MINUTE,
        within_hour % SECS_PER_MINUTE
    )
}
// Pipeline step that writes progress messages and timing summaries to its own
// `progress` output. Only `n` and `output_infix` take part in configuration;
// all other fields are marked as skipped for both schema and deserialization
// and carry runtime state.
#[derive(Clone, JsonSchema)]
#[tpd]
#[derive(Debug)]
pub struct Progress {
// Running sum of `block.len()` over all blocks passed through `apply`;
// read again in `finalize` for the summary line.
#[schemars(skip)]
#[tpd(skip, default)]
pub total_count: Arc<AtomicUsize>,
// Set in `init`; elapsed times and rates are measured from this instant.
#[schemars(skip)]
#[tpd(skip, default)]
pub start_time: Option<std::time::Instant>,
// Reporting interval used as the step width in `apply`.
// Falls back to `default_progress_n` during verification.
pub n: usize,
// Infix handed to the write target of the `progress` output.
// Falls back to "--stdout--" during verification.
pub output_infix: String,
// Filled with the current instant at the end of `finalize`; `post_finalize`
// measures from it how long finalization took.
#[schemars(skip)]
#[tpd(skip, default)]
pub finalize_timepoint: Arc<Mutex<Option<std::time::Instant>>>,
// Writer for the `progress` output. Installed in `init`, taken out and
// finished in `post_finalize`.
#[schemars(skip)]
#[tpd(skip, default)]
writer: Arc<Mutex<Option<ChunkedRecordWriter>>>,
}
impl VerifyIn<PartialConfig> for PartialProgress {
    /// Complete a partially deserialized `Progress` configuration.
    ///
    /// Registers fallbacks via `or_with` for `n` (from `default_progress_n`)
    /// and for `output_infix` (`"--stdout--"`), and installs an empty
    /// finalize-timestamp slot. Always returns `Ok(())`; neither the parent
    /// configuration nor the options are consulted.
    fn verify(
        &mut self,
        _parent: &PartialConfig,
        _options: &VerifyOptions,
    ) -> std::result::Result<(), ValidationFailure>
    where
        Self: Sized + toml_pretty_deser::Visitor,
    {
        // Runtime-only state: starts out as `None` inside the shared mutex.
        let empty_timepoint = Arc::new(Mutex::new(None));
        self.finalize_timepoint = Some(empty_timepoint);

        // User-facing options and their fallbacks.
        self.output_infix.or_with(|| String::from("--stdout--"));
        self.n.or_with(default_progress_n);

        Ok(())
    }
}
impl Progress {
    /// Write `msg` followed by a newline to the progress writer and flush it.
    ///
    /// # Errors
    /// Returns an error (with context) if writing the record or flushing fails.
    ///
    /// # Panics
    /// Panics if the writer mutex is poisoned or if no writer has been
    /// installed yet (that happens in `init`).
    pub fn output(&self, msg: &str) -> Result<()> {
        // Assemble the full line up front: message bytes plus the terminator.
        let mut line = Vec::with_capacity(msg.len() + 1);
        line.extend_from_slice(msg.as_bytes());
        line.push(b'\n');

        let mut slot = self
            .writer
            .lock()
            .expect("writer lock must not be poisoned");
        let sink = slot.as_mut().expect("Writer set in init");

        sink.write_text_record(&line)
            .context("failed to write to progress file")?;
        // Flush after every message so progress is visible right away.
        sink.flush().context("failed to flush progress file")?;
        Ok(())
    }
}
impl TagUser for PartialTaggedVariant<PartialProgress> {
    /// Declare the single `progress` text output of this step.
    ///
    /// The configured `output_infix` becomes the only element of the write
    /// target's first argument, and its span is attached to the declaration.
    ///
    /// # Panics
    /// Panics if the step failed validation (no inner value) or if
    /// `output_infix` has not been set.
    fn declare_output_files(&self) -> Vec<OutputDeclaration> {
        let inner = self
            .toml_value
            .value
            .as_ref()
            .expect("can't call declare_output_files when validation failed");

        let infix = inner
            .output_infix
            .as_ref()
            .expect("output_infix must be set in config");

        let target = WriteTargetConfig::new(vec![infix.clone()], "progress".to_string());

        let declaration = OutputDeclaration {
            id: "progress".to_string(),
            target,
            sink_config: SinkConfig::default(),
            format: fastqrab_io::FileFormat::Text,
            chunk_policy: ChunkPolicy::default(),
            bam_options: None,
            singleton: true,
            span: inner.output_infix.span(),
        };
        vec![declaration]
    }
}
impl Step for Progress {
    /// Install the `progress` writer, start the clock and log the threading setup.
    ///
    /// # Errors
    /// Returns an error if the initial message cannot be written.
    ///
    /// # Panics
    /// Panics if no writer for tag `0` was provided under the `progress` id, or
    /// if the writer mutex is poisoned.
    fn init(
        &mut self,
        input_info: &InputInfo,
        mut output_files: StepOutputFiles,
        _demultiplex_info: &OptDemultiplex,
    ) -> Result<Option<DemultiplexBarcodes>> {
        let mut per_tag = output_files.take("progress");
        let writer = per_tag
            .remove(&0)
            .expect("tag 0 writer must exist for progress output");
        *self.writer.lock().expect("poisoned") = Some(writer);
        self.start_time = Some(std::time::Instant::now());
        self.output(&format!(
            "Thread config: per_input_segment {}, processing: {}, per_output_file: {}",
            input_info.threading_configuration.n_input_per_segment,
            input_info.threading_configuration.n_processing,
            input_info.threading_configuration.n_output,
        ))?;
        Ok(None)
    }

    /// Count the block and emit one message per multiple of `n` it covers.
    ///
    /// The block claims the half-open count range `[counter, next)`, where
    /// `counter` is the total before this block. A message is written for every
    /// multiple of `n` inside that range, so the very first block also reports
    /// a total of 0. The block itself is returned unchanged together with
    /// `true`.
    ///
    /// # Errors
    /// Returns an error if `n` is zero or if a message cannot be written.
    ///
    /// # Panics
    /// Panics if a message is due but `start_time` has not been set by `init`.
    #[expect(
        clippy::cast_precision_loss,
        reason = "Number of reads is not going to be that high. And if it is, the loss of precision is ok."
    )]
    #[mutants::skip]
    fn apply(
        &self,
        block: FastQBlocksCombined,
        _input_info: &InputInfo,
        _demultiplex_info: &OptDemultiplex,
    ) -> anyhow::Result<(FastQBlocksCombined, bool)> {
        // A zero interval would otherwise panic in the modulo / `step_by` below.
        anyhow::ensure!(self.n > 0, "Progress: n must be greater than zero");

        let len = block.len();
        let counter = self
            .total_count
            .fetch_add(len, std::sync::atomic::Ordering::Relaxed);
        let next = counter + len;

        // First multiple of `n` that is >= counter. The previous
        // `counter + counter % n` only hit a multiple when `counter` already was
        // one, and otherwise reported at arbitrary totals.
        let first_report = counter.next_multiple_of(self.n);
        for ii in (first_report..next).step_by(self.n) {
            // Sample the clock once so rate and elapsed time agree within a message.
            let elapsed = self
                .start_time
                .expect("start_time must be set when processing blocks")
                .elapsed();
            let elapsed_secs = elapsed.as_secs_f64();
            // Rates measured over less than a second are too noisy to print.
            let msg: String = if elapsed_secs > 1.0 {
                let rate_total = ii as f64 / elapsed_secs;
                format!(
                    "Processed Total: {} ({:} molecules/s), Elapsed: {}s",
                    thousands_format(ii as f64, 0),
                    thousands_format(rate_total, 2),
                    elapsed.as_secs()
                )
            } else {
                format!(
                    "Processed Total: {}, Elapsed: {}s",
                    thousands_format(ii as f64, 0),
                    elapsed.as_secs()
                )
            };
            self.output(&msg)?;
        }
        if block.is_final {
            self.output("Final block passed Progress stage.")?;
        }
        Ok((block, true))
    }

    /// Write the overall timing summary and remember when finalization started.
    ///
    /// If `start_time` was never set, the elapsed time is measured from "now".
    ///
    /// # Errors
    /// Returns an error if the summary cannot be written.
    #[expect(
        clippy::cast_sign_loss,
        clippy::cast_possible_truncation,
        clippy::cast_precision_loss,
        reason = "loss of precision is ok for giant read counts"
    )]
    fn finalize(&self, _demultiplex_info: &OptDemultiplex) -> Result<Option<FinalizeReportResult>> {
        let elapsed = self
            .start_time
            .unwrap_or_else(std::time::Instant::now)
            .elapsed()
            .as_secs_f64();
        let count = self.total_count.load(std::sync::atomic::Ordering::SeqCst);
        let msg = format!(
            "Took {:.2} s ({}) to process {} molecules for an effective rate of {:} molecules/s",
            elapsed,
            format_seconds_to_hhmmss(elapsed as u64),
            thousands_format(count as f64, 0),
            thousands_format(count as f64 / elapsed, 2),
        );
        self.output(&msg)?;
        // Timestamp consumed by `post_finalize` to report the finalization time.
        self.finalize_timepoint
            .lock()
            .expect("poisoned")
            .replace(std::time::Instant::now());
        Ok(None)
    }

    /// Report how long finalization took, then close the progress writer.
    ///
    /// Both the last message and the writer shutdown are best effort: this
    /// method cannot return an error, so failures are deliberately discarded.
    fn post_finalize(&self) {
        let elapsed = self
            .finalize_timepoint
            .lock()
            .expect("poisoned")
            .unwrap_or_else(std::time::Instant::now)
            .elapsed();
        let msg = format!(
            "Finalizing all steps took {:.2} s ({}).",
            elapsed.as_secs_f64(),
            format_seconds_to_hhmmss(elapsed.as_secs())
        );
        self.output(&msg).ok();
        if let Some(writer) = self.writer.lock().expect("poisoned").take() {
            let _ = writer.finish().ok();
        }
    }
}
#[cfg(test)]
mod test {
    /// Checks zero, minute/hour boundaries and durations beyond one day.
    #[test]
    fn test_format_seconds_to_hhmmss() {
        let cases: [(u64, &str); 10] = [
            (0, "00:00:00"),
            (59, "00:00:59"),
            (60, "00:01:00"),
            (3599, "00:59:59"),
            (3600, "01:00:00"),
            (3601, "01:00:01"),
            (3661, "01:01:01"),
            (86399, "23:59:59"),
            (86400, "24:00:00"),
            (86400 * 10, "240:00:00"),
        ];
        for (seconds, expected) in cases {
            assert_eq!(super::format_seconds_to_hhmmss(seconds), expected);
        }
    }
}