use crate::PeacoQCData;
use crate::error::{PeacoQCError, Result};
use crate::qc::consecutive::{ConsecutiveConfig, remove_short_regions};
use crate::qc::isolation_tree::{IsolationTreeConfig, isolation_tree_detect};
use crate::qc::mad::{MADConfig, mad_outlier_method};
use crate::qc::peaks::{
ChannelPeakFrame, PeakDetectionConfig, create_breaks, determine_peaks_all_channels,
};
use crate::qc::debug;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use tracing::{debug, info, trace, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QCMode {
All,
IsolationTree,
MAD,
None,
}
#[derive(Debug, Clone)]
pub struct PeacoQCConfig {
pub channels: Vec<String>,
pub determine_good_cells: QCMode,
pub min_cells: usize,
pub max_bins: usize,
pub events_per_bin: Option<usize>,
pub mad: f64,
pub it_limit: f64,
pub consecutive_bins: usize,
pub remove_zeros: bool,
pub peak_removal: f64,
pub min_nr_bins_peakdetection: f64,
pub force_it: usize,
#[cfg(feature = "flow-fcs")]
pub apply_compensation: bool,
#[cfg(feature = "flow-fcs")]
pub apply_transformation: bool,
#[cfg(feature = "flow-fcs")]
pub transform_cofactor: f32,
}
impl Default for PeacoQCConfig {
fn default() -> Self {
Self {
channels: Vec::new(),
determine_good_cells: QCMode::All,
min_cells: 150,
max_bins: 500,
events_per_bin: None,
mad: 6.0,
it_limit: 0.6,
consecutive_bins: 5,
remove_zeros: false,
peak_removal: 1.0 / 3.0,
min_nr_bins_peakdetection: 10.0,
force_it: 150,
#[cfg(feature = "flow-fcs")]
apply_compensation: true,
#[cfg(feature = "flow-fcs")]
apply_transformation: true,
#[cfg(feature = "flow-fcs")]
transform_cofactor: 2000.0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeacoQCResult {
#[serde(skip)]
pub good_cells: Vec<bool>,
pub percentage_removed: f64,
pub it_percentage: Option<f64>,
pub mad_percentage: Option<f64>,
pub consecutive_percentage: f64,
pub peaks: HashMap<String, ChannelPeakFrame>,
pub n_bins: usize,
pub events_per_bin: usize,
}
impl PeacoQCResult {
pub fn export_csv_boolean(&self, path: impl AsRef<Path>) -> Result<()> {
crate::qc::export::export_csv_boolean(self, path, None)
}
pub fn export_csv_boolean_with_name(
&self,
path: impl AsRef<Path>,
column_name: &str,
) -> Result<()> {
crate::qc::export::export_csv_boolean(self, path, Some(column_name))
}
pub fn export_csv_numeric(
&self,
path: impl AsRef<Path>,
good_value: u16,
bad_value: u16,
) -> Result<()> {
crate::qc::export::export_csv_numeric(self, path, good_value, bad_value, None)
}
pub fn export_csv_numeric_with_name(
&self,
path: impl AsRef<Path>,
good_value: u16,
bad_value: u16,
column_name: &str,
) -> Result<()> {
crate::qc::export::export_csv_numeric(self, path, good_value, bad_value, Some(column_name))
}
pub fn export_json_metadata(
&self,
config: &PeacoQCConfig,
path: impl AsRef<Path>,
) -> Result<()> {
crate::qc::export::export_json_metadata(self, config, path)
}
}
pub fn peacoqc<T: PeacoQCData>(fcs: &T, config: &PeacoQCConfig) -> Result<PeacoQCResult> {
if config.channels.is_empty() {
return Err(PeacoQCError::ConfigError(
"No channels specified".to_string(),
));
}
let n_events = fcs.n_events();
info!(
"Starting PeacoQC analysis: {} events, {} channels",
n_events,
config.channels.len()
);
debug!("Channels: {:?}", config.channels);
let events_per_bin = config
.events_per_bin
.unwrap_or_else(|| find_events_per_bin(n_events, config.min_cells, config.max_bins, 500));
let breaks = create_breaks(n_events, events_per_bin);
let n_bins = breaks.len();
info!(
"Binning configuration: {} bins (50% overlap), {} events per bin (min_cells={}, max_bins={})",
n_bins, events_per_bin, config.min_cells, config.max_bins
);
let time_channel = if std::env::var("PEACOQC_DEBUG_BINS").is_ok() {
find_time_channel_for_debug(fcs)
} else {
None
};
info!(
"Starting peak detection across {} channels",
config.channels.len()
);
let peak_config = PeakDetectionConfig {
events_per_bin,
peak_removal: config.peak_removal,
min_nr_bins_peakdetection: config.min_nr_bins_peakdetection,
remove_zeros: config.remove_zeros,
};
debug!(
"Peak detection config: peak_removal={}, min_nr_bins={}, remove_zeros={}",
peak_config.peak_removal, peak_config.min_nr_bins_peakdetection, peak_config.remove_zeros
);
let peaks = determine_peaks_all_channels(fcs, &config.channels, &peak_config)?;
if peaks.is_empty() {
return Err(PeacoQCError::NoPeaksDetected);
}
info!(
"Peak detection complete: {} channels with peaks detected",
peaks.len()
);
trace!(
"Peak details per channel: {:?}",
peaks
.iter()
.map(|(ch, pf)| (ch, pf.peaks.len()))
.collect::<Vec<_>>()
);
let mut outlier_bins = vec![false; n_bins];
let mut it_percentage = None;
let mut mad_percentage = None;
let mut it_outliers = vec![false; n_bins];
let mut mad_outliers = vec![false; n_bins];
let mut consecutive_outliers = vec![false; n_bins];
match config.determine_good_cells {
QCMode::All | QCMode::IsolationTree => {
if n_bins >= config.force_it {
info!(
"Running Isolation Tree analysis (IT_limit={})",
config.it_limit
);
let it_config = IsolationTreeConfig {
it_limit: config.it_limit,
force_it: config.force_it,
..Default::default()
};
match isolation_tree_detect(&peaks, n_bins, &it_config) {
Ok(it_result) => {
outlier_bins = it_result.outlier_bins.clone();
it_outliers = it_result.outlier_bins.clone();
let n_it_outliers = outlier_bins.iter().filter(|&&x| x).count();
let it_pct = (n_it_outliers as f64 / n_bins as f64) * 100.0;
it_percentage = Some(it_pct);
info!(
"Isolation Tree analysis removed {:.2}% of the bins ({} outlier bins)",
it_pct, n_it_outliers
);
if std::env::var("PEACOQC_DEBUG_BINS").is_ok() {
if let Some(ref tc) = time_channel {
if let Ok(debug_info) = debug::collect_bin_debug_info(
fcs,
Some(tc),
&breaks,
&it_outliers,
&mad_outliers,
&consecutive_outliers,
&outlier_bins,
) {
debug::log_bin_debug_info(&debug_info, "After Isolation Tree");
}
}
}
}
Err(e) => {
warn!("Isolation Tree failed: {}, continuing with MAD only", e);
}
}
} else {
warn!(
"Not enough bins ({}) for Isolation Tree (need {}), skipping IT",
n_bins, config.force_it
);
}
}
_ => {}
}
if config.determine_good_cells == QCMode::All || config.determine_good_cells == QCMode::MAD {
info!(
"Running MAD outlier detection (MAD threshold={})",
config.mad
);
let mad_config = MADConfig {
mad_threshold: config.mad,
..Default::default()
};
let existing_good_bins: Vec<bool> =
outlier_bins.iter().map(|&is_outlier| !is_outlier).collect();
let mad_result = mad_outlier_method(&peaks, &existing_good_bins, n_bins, &mad_config)?;
let n_mad_outliers_before = outlier_bins.iter().filter(|&&x| x).count();
for (i, &is_mad_outlier) in mad_result.outlier_bins.iter().enumerate() {
if is_mad_outlier {
outlier_bins[i] = true;
mad_outliers[i] = true;
}
}
let n_mad_outliers = outlier_bins.iter().filter(|&&x| x).count();
let n_new_from_mad = n_mad_outliers.saturating_sub(n_mad_outliers_before); let mad_pct = (n_mad_outliers as f64 / n_bins as f64) * 100.0;
mad_percentage = Some(mad_pct);
info!(
"MAD analysis removed {:.2}% of the bins ({} outlier bins, {} from IT, {} new from MAD)",
mad_pct,
n_mad_outliers,
n_mad_outliers_before,
n_new_from_mad
);
if std::env::var("PEACOQC_DEBUG_BINS").is_ok() {
if let Some(ref tc) = time_channel {
if let Ok(debug_info) = debug::collect_bin_debug_info(
fcs,
Some(tc),
&breaks,
&it_outliers,
&mad_outliers,
&consecutive_outliers,
&outlier_bins,
) {
debug::log_bin_debug_info(&debug_info, "After MAD");
debug::analyze_events_per_second_correlation(&debug_info);
}
}
}
}
let n_outliers_before_consecutive = outlier_bins.iter().filter(|&&x| x).count();
if config.determine_good_cells != QCMode::None {
info!(
"Applying consecutive bin filtering (consecutive_bins={})",
config.consecutive_bins
);
let consecutive_config = ConsecutiveConfig {
consecutive_bins: config.consecutive_bins,
};
let outlier_bins_before_consecutive = outlier_bins.clone();
outlier_bins = remove_short_regions(&outlier_bins, &consecutive_config)?;
let n_outliers_after_consecutive = outlier_bins.iter().filter(|&&x| x).count();
for (i, (before, after)) in outlier_bins_before_consecutive.iter().zip(outlier_bins.iter()).enumerate() {
if !before && *after {
consecutive_outliers[i] = true;
}
}
let regions_removed = if n_outliers_after_consecutive >= n_outliers_before_consecutive {
n_outliers_after_consecutive - n_outliers_before_consecutive
} else {
0
};
debug!(
"Consecutive filtering: {} → {} outlier bins (removed {} short regions)",
n_outliers_before_consecutive, n_outliers_after_consecutive, regions_removed
);
if std::env::var("PEACOQC_DEBUG_BINS").is_ok() {
if let Some(ref tc) = time_channel {
if let Ok(debug_info) = debug::collect_bin_debug_info(
fcs,
Some(tc),
&breaks,
&it_outliers,
&mad_outliers,
&consecutive_outliers,
&outlier_bins,
) {
debug::log_bin_debug_info(&debug_info, "After Consecutive Filtering");
}
}
}
}
let good_cells = bin_mask_to_cell_mask_overlapping(&outlier_bins, &breaks, n_events);
let n_removed = good_cells.iter().filter(|&&x| !x).count();
let percentage_removed = (n_removed as f64 / n_events as f64) * 100.0;
let consecutive_percentage = percentage_removed - mad_percentage.unwrap_or(0.0);
info!(
"PeacoQC complete: {} events removed ({:.2}%), {} events remaining ({:.2}%)",
n_removed,
percentage_removed,
n_events - n_removed,
100.0 - percentage_removed
);
if percentage_removed > 70.0 {
warn!(
"More than 70% of events removed! This may indicate data quality issues or incorrect configuration."
);
}
if std::env::var("PEACOQC_DEBUG_BINS").is_ok() {
if let Some(ref tc) = time_channel {
if let Ok(debug_info) = debug::collect_bin_debug_info(
fcs,
Some(tc),
&breaks,
&it_outliers,
&mad_outliers,
&consecutive_outliers,
&outlier_bins,
) {
debug::log_bin_debug_info(&debug_info, "Final Results");
debug::analyze_events_per_second_correlation(&debug_info);
}
}
}
Ok(PeacoQCResult {
good_cells,
percentage_removed,
it_percentage,
mad_percentage,
consecutive_percentage,
peaks,
n_bins,
events_per_bin,
})
}
fn find_time_channel_for_debug<T: PeacoQCData>(fcs: &T) -> Option<String> {
fcs.channel_names().into_iter().find(|name| {
let upper = name.to_uppercase();
upper.contains("TIME") || upper == "TIME"
})
}
fn find_events_per_bin(n_events: usize, min_cells: usize, max_bins: usize, step: usize) -> usize {
let max_cells = ((n_events as f64 / max_bins as f64) * 2.0).ceil() as usize;
let max_cells_rounded = ((max_cells / step) * step) + step;
max_cells_rounded.max(min_cells)
}
fn bin_mask_to_cell_mask_overlapping(
bin_mask: &[bool], breaks: &[(usize, usize)],
n_events: usize,
) -> Vec<bool> {
let mut bad_cells: HashSet<usize> = HashSet::new();
for (bin_idx, &is_bad) in bin_mask.iter().enumerate() {
if is_bad {
if let Some(&(start, end)) = breaks.get(bin_idx) {
for cell_idx in start..end {
bad_cells.insert(cell_idx);
}
}
}
}
(0..n_events).map(|i| !bad_cells.contains(&i)).collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::PeacoQCData;
use crate::error::Result;
use flow_fcs::parameter::EventDataFrame;
use polars::prelude::*;
use std::sync::Arc;
struct TestFcs {
data_frame: EventDataFrame,
}
impl PeacoQCData for TestFcs {
fn n_events(&self) -> usize {
self.data_frame.height()
}
fn channel_names(&self) -> Vec<String> {
self.data_frame
.get_column_names()
.iter()
.map(|s| s.to_string())
.collect()
}
fn get_channel_range(&self, _channel: &str) -> Option<(f64, f64)> {
Some((0.0, 262144.0))
}
fn get_channel_f64(&self, channel: &str) -> Result<Vec<f64>> {
let series = self
.data_frame
.column(channel)
.map_err(|_| crate::PeacoQCError::ChannelNotFound(channel.to_string()))?;
let values = if let Ok(f64_vals) = series.f64() {
f64_vals.into_iter().filter_map(|x| x).collect()
} else if let Ok(f32_vals) = series.f32() {
f32_vals
.into_iter()
.filter_map(|x| x.map(|v| v as f64))
.collect()
} else {
return Err(crate::PeacoQCError::InvalidChannel(format!(
"Channel {} is not numeric (dtype: {:?})",
channel,
series.dtype()
)));
};
Ok(values)
}
}
#[test]
fn test_peacoqc_basic() {
let mut data = Vec::new();
for _ in 0..10000 {
data.push(100.0 + (rand::random::<f64>() - 0.5) * 20.0);
}
let df = Arc::new(
df![
"FL1-A" => data,
]
.unwrap(),
);
let fcs = TestFcs { data_frame: df };
let config = PeacoQCConfig {
channels: vec!["FL1-A".to_string()],
determine_good_cells: QCMode::MAD,
..Default::default()
};
let result = peacoqc(&fcs, &config);
assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(result.good_cells.len(), 10000);
assert!(result.percentage_removed >= 0.0);
assert!(result.percentage_removed < 100.0);
}
#[test]
fn test_peacoqc_empty_channels() {
let df = Arc::new(
df![
"FL1-A" => vec![100.0f64; 1000],
]
.unwrap(),
);
let fcs = TestFcs { data_frame: df };
let config = PeacoQCConfig {
channels: vec![],
..Default::default()
};
let result = peacoqc(&fcs, &config);
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), PeacoQCError::ConfigError(_)));
}
#[test]
fn test_peacoqc_invalid_channel() {
let df = Arc::new(
df![
"FL1-A" => vec![100.0f64; 1000],
]
.unwrap(),
);
let fcs = TestFcs { data_frame: df };
let config = PeacoQCConfig {
channels: vec!["NONEXISTENT".to_string()],
..Default::default()
};
let result = peacoqc(&fcs, &config);
assert!(result.is_err());
}
#[test]
fn test_peacoqc_small_dataset() {
let df = Arc::new(
df![
"FL1-A" => vec![100.0f64, 200.0, 300.0, 400.0, 500.0], ]
.unwrap(),
);
let fcs = TestFcs { data_frame: df };
let config = PeacoQCConfig {
channels: vec!["FL1-A".to_string()],
min_cells: 150, determine_good_cells: QCMode::None, ..Default::default()
};
let result = peacoqc(&fcs, &config);
if result.is_ok() {
let r = result.unwrap();
assert_eq!(r.good_cells.len(), 5);
}
}
#[test]
fn test_peacoqc_all_identical_values() {
let df = Arc::new(
df![
"FL1-A" => vec![100.0f64; 1000], ]
.unwrap(),
);
let fcs = TestFcs { data_frame: df };
let config = PeacoQCConfig {
channels: vec!["FL1-A".to_string()],
determine_good_cells: QCMode::MAD,
..Default::default()
};
let result = peacoqc(&fcs, &config);
if result.is_ok() {
let r = result.unwrap();
assert_eq!(r.good_cells.len(), 1000);
}
}
#[test]
fn test_peacoqc_qc_mode_none() {
let mut data = Vec::new();
for i in 0..5000 {
data.push(100.0 + (i % 100) as f64);
}
let df = Arc::new(
df![
"FL1-A" => data,
]
.unwrap(),
);
let fcs = TestFcs { data_frame: df };
let config = PeacoQCConfig {
channels: vec!["FL1-A".to_string()],
determine_good_cells: QCMode::None,
..Default::default()
};
let result = peacoqc(&fcs, &config);
assert!(result.is_ok());
let r = result.unwrap();
assert_eq!(r.good_cells.len(), 5000);
assert_eq!(r.percentage_removed, 0.0);
}
}