#![cfg(feature = "std")]
use find_peaks::PeakFinder;
use polars::{
datatypes::DataType,
frame::DataFrame,
prelude::{CsvReadOptions, NamedFrom, PolarsError, SerReader},
series::Series,
};
use polars_lazy::{
frame::IntoLazy,
prelude::{LazyFrame, col, lit},
};
use std::path::PathBuf;
#[derive(Clone)]
pub struct Data {
data: LazyFrame,
}
impl Data {
pub fn collect(self) -> DataFrame {
self.data.collect().unwrap()
}
pub fn get_rr(self) -> Vec<f64> {
let df = self.data.clone().collect().unwrap();
let x: Vec<f64> = df
.column("time")
.unwrap()
.cast(&DataType::Float64)
.unwrap()
.f64()
.unwrap()
.into_no_null_iter()
.collect();
let signal = df
.column("signal")
.unwrap()
.f64()
.unwrap()
.into_no_null_iter()
.collect::<Vec<f64>>();
let ps = PeakFinder::new_with_x(&signal, &x)
.with_min_prominence(0.75)
.with_min_distance(0.33)
.find_peaks();
let mut ps: Vec<f64> = ps.into_iter().map(|i| x[i.middle_position()]).collect();
ps.sort_by(|a, b| a.partial_cmp(b).unwrap());
ps.windows(2).map(|i| (i[1] - i[0]) * 1000.0).collect()
}
}
pub struct DataBuilder {
file: PathBuf,
signal: String,
time: Option<String>,
rate: Option<f64>,
}
impl DataBuilder {
pub fn new(file: PathBuf, signal: String) -> Self {
DataBuilder {
file,
signal,
time: None,
rate: None,
}
}
pub fn with_time(mut self, time: String) -> Self {
if self.rate.is_some() {
panic!("Rate is already set");
}
self.time = Some(time);
self
}
pub fn with_rate(mut self, rate: f64) -> Self {
if self.time.is_some() {
panic!("Time is already set");
}
self.rate = Some(rate);
self
}
pub fn build(self) -> Result<Data, PolarsError> {
let mut data = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some(self.file))?
.finish()?
.lazy();
let mut selected_cols = vec![
((col(&self.signal) - col(&self.signal).mean()) / col(&self.signal).std(1))
.alias("signal"),
];
if let Some(time) = self.time {
selected_cols.push(col(time).alias("time"));
} else if let Some(rate) = self.rate {
let len_expr = data
.clone()
.select([col(&self.signal)])
.collect()
.unwrap()
.height();
let time_series: Vec<f64> = (0..len_expr).map(|i| i as f64 / rate).collect();
let time_series = Series::new("time".into(), time_series);
let time_series_lit = lit(time_series).alias("time");
selected_cols.push(time_series_lit);
}
data = data.select(&selected_cols);
Ok(Data { data })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_process_ppg() {
let path = "tests/ppg.csv";
let signal = "PPG_Raw";
let time = "Time";
let data = DataBuilder::new(path.into(), signal.into())
.with_time(time.into())
.build()
.unwrap();
let rr = data.get_rr();
let reference = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some(path.into()))
.unwrap()
.finish()
.unwrap()
.lazy();
let peaks = reference
.filter(col("PPG_Peaks").eq(lit(1)))
.select([col("Time").cast(DataType::Float64)])
.collect()
.unwrap()
.column("Time")
.unwrap()
.f64()
.unwrap()
.into_no_null_iter()
.collect::<Vec<f64>>();
let reference_rr: Vec<f64> = peaks.windows(2).map(|w| (w[1] - w[0]) * 1_000.).collect();
assert_eq!(rr, reference_rr);
}
#[test]
fn test_process_ecg() {
let path = "tests/ecg.csv";
let signal = "ECG_Raw";
let time = "Time";
let data = DataBuilder::new(path.into(), signal.into())
.with_time(time.into())
.build()
.unwrap();
let rr = data.get_rr();
let reference = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some(path.into()))
.unwrap()
.finish()
.unwrap()
.lazy();
let peaks = reference
.filter(col("ECG_Peaks").eq(lit(1)))
.select([col("Time").cast(DataType::Float64)])
.collect()
.unwrap()
.column("Time")
.unwrap()
.f64()
.unwrap()
.into_no_null_iter()
.collect::<Vec<f64>>();
let reference_rr: Vec<f64> = peaks.windows(2).map(|w| (w[1] - w[0]) * 1_000.).collect();
assert_eq!(rr, reference_rr);
}
#[test]
fn test_process_rate() {
let path = "tests/ecg.csv";
let signal = "ECG_Raw";
let data = DataBuilder::new(path.into(), signal.into())
.with_rate(1_000.)
.build()
.unwrap();
let rr = data.get_rr();
let reference = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some(path.into()))
.unwrap()
.finish()
.unwrap()
.lazy();
let peaks = reference
.filter(col("ECG_Peaks").eq(lit(1)))
.select([col("Time").cast(DataType::Float64)])
.collect()
.unwrap()
.column("Time")
.unwrap()
.f64()
.unwrap()
.into_no_null_iter()
.collect::<Vec<f64>>();
let reference_rr: Vec<f64> = peaks.windows(2).map(|w| (w[1] - w[0]) * 1_000.).collect();
assert_eq!(rr, reference_rr);
}
}