use arrow::{
array::{Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use crate::{Error, ExpressionMatrix, Result};
pub struct Normalize;
impl Normalize {
pub fn log2(matrix: &ExpressionMatrix) -> Result<ExpressionMatrix> {
let mut columns: Vec<std::sync::Arc<dyn Array>> = Vec::with_capacity(matrix.samples.len());
for col_idx in 0..matrix.values.num_columns() {
let col = matrix.values.column(col_idx);
let array = col
.as_any()
.downcast_ref::<Float64Array>()
.ok_or_else(|| Error::Normalization("Expected Float64Array".to_string()))?;
let transformed: Vec<Option<f64>> = (0..array.len())
.map(|i| {
if array.is_null(i) {
None
} else {
let x = array.value(i);
Some((x + 1.0).log2())
}
})
.collect();
columns.push(std::sync::Arc::new(Float64Array::from(transformed)));
}
let schema = Schema::new(
matrix
.samples
.iter()
.map(|s| Field::new(s.clone(), DataType::Float64, true))
.collect::<Vec<_>>(),
);
let batch = RecordBatch::try_new(std::sync::Arc::new(schema), columns)?;
Ok(ExpressionMatrix {
genes: matrix.genes.clone(),
samples: matrix.samples.clone(),
values: batch,
})
}
pub fn quantile(matrix: &ExpressionMatrix) -> Result<ExpressionMatrix> {
let num_genes = matrix.genes.len();
let num_samples = matrix.samples.len();
if num_genes == 0 || num_samples == 0 {
return Ok(matrix.clone());
}
let mut sample_values: Vec<Vec<Option<f64>>> = Vec::with_capacity(num_samples);
for col_idx in 0..num_samples {
let col = matrix.values.column(col_idx);
let array = col
.as_any()
.downcast_ref::<Float64Array>()
.ok_or_else(|| Error::Normalization("Expected Float64Array".to_string()))?;
let values: Vec<Option<f64>> = (0..num_genes)
.map(|i| {
if array.is_null(i) {
None
} else {
Some(array.value(i))
}
})
.collect();
sample_values.push(values);
}
let mut target_distribution: Vec<f64> = Vec::with_capacity(num_genes);
let mut sorted_per_sample: Vec<Vec<f64>> = Vec::with_capacity(num_samples);
for values in &sample_values {
let mut sorted: Vec<f64> = values.iter().flatten().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
sorted_per_sample.push(sorted);
}
let max_sorted_len = sorted_per_sample.iter().map(Vec::len).max().unwrap_or(0);
for rank in 0..max_sorted_len {
let mut sum = 0.0;
let mut count = 0;
for sorted in &sorted_per_sample {
if let Some(&val) = sorted.get(rank) {
sum += val;
count += 1;
}
}
if count > 0 {
target_distribution.push(sum / f64::from(count));
}
}
let mut normalized_columns: Vec<std::sync::Arc<dyn Array>> =
Vec::with_capacity(num_samples);
for (sample_idx, values) in sample_values.iter().enumerate() {
let sorted = &sorted_per_sample[sample_idx];
let normalized: Vec<Option<f64>> = values
.iter()
.map(|&opt_val| {
if let Some(val) = opt_val {
if let Ok(pos) = sorted.binary_search_by(|probe| {
probe.partial_cmp(&val).unwrap_or(std::cmp::Ordering::Equal)
}) {
let mut start = pos;
let mut end = pos;
while start > 0 && sorted.get(start - 1) == Some(&val) {
start -= 1;
}
while end + 1 < sorted.len() && sorted.get(end + 1) == Some(&val) {
end += 1;
}
let rank = usize::midpoint(start, end);
target_distribution.get(rank).copied()
} else {
None
}
} else {
None
}
})
.collect();
normalized_columns.push(std::sync::Arc::new(Float64Array::from(normalized)));
}
let schema = Schema::new(
matrix
.samples
.iter()
.map(|s| Field::new(s.clone(), DataType::Float64, true))
.collect::<Vec<_>>(),
);
let batch = RecordBatch::try_new(std::sync::Arc::new(schema), normalized_columns)?;
Ok(ExpressionMatrix {
genes: matrix.genes.clone(),
samples: matrix.samples.clone(),
values: batch,
})
}
pub fn z_score_per_gene(matrix: &ExpressionMatrix) -> Result<ExpressionMatrix> {
let num_genes = matrix.genes.len();
let num_samples = matrix.samples.len();
if num_genes == 0 || num_samples == 0 {
return Ok(matrix.clone());
}
let mut gene_values: Vec<Vec<Option<f64>>> =
vec![Vec::with_capacity(num_samples); num_genes];
for col_idx in 0..num_samples {
let col = matrix.values.column(col_idx);
let array = col
.as_any()
.downcast_ref::<Float64Array>()
.ok_or_else(|| Error::Normalization("Expected Float64Array".to_string()))?;
for (gene_idx, opt_val) in (0..num_genes)
.map(|i| {
if array.is_null(i) {
None
} else {
Some(array.value(i))
}
})
.enumerate()
{
gene_values[gene_idx].push(opt_val);
}
}
let mut z_score_columns: Vec<Vec<Option<f64>>> =
vec![Vec::with_capacity(num_genes); num_samples];
for gene_row in &gene_values {
let non_null_values: Vec<f64> = gene_row.iter().flatten().copied().collect();
if non_null_values.len() < 2 {
for (col_idx, &orig) in gene_row.iter().enumerate() {
z_score_columns[col_idx].push(orig);
}
continue;
}
#[allow(clippy::cast_precision_loss)]
let n = non_null_values.len() as f64;
let mean = non_null_values.iter().sum::<f64>() / n;
let variance = non_null_values
.iter()
.map(|&x| (x - mean).powi(2))
.sum::<f64>()
/ n;
let std = variance.sqrt();
if std < f64::EPSILON {
for (col_idx, &orig) in gene_row.iter().enumerate() {
z_score_columns[col_idx].push(orig);
}
} else {
for (col_idx, &opt_val) in gene_row.iter().enumerate() {
let z = opt_val.map(|v| (v - mean) / std);
z_score_columns[col_idx].push(z);
}
}
}
let mut columns: Vec<std::sync::Arc<dyn Array>> = Vec::with_capacity(num_samples);
for col_values in z_score_columns {
columns.push(std::sync::Arc::new(Float64Array::from(col_values)));
}
let schema = Schema::new(
matrix
.samples
.iter()
.map(|s| Field::new(s.clone(), DataType::Float64, true))
.collect::<Vec<_>>(),
);
let batch = RecordBatch::try_new(std::sync::Arc::new(schema), columns)?;
Ok(ExpressionMatrix {
genes: matrix.genes.clone(),
samples: matrix.samples.clone(),
values: batch,
})
}
}