use crate::{data::*, PharmsolError};
use csv::WriterBuilder;
use serde::de::{MapAccess, Visitor};
use serde::{de, Deserialize, Deserializer, Serialize};
use std::collections::HashMap;
use crate::data::row::build_data;
use crate::data::row::DataError;
use crate::data::row::DataRow;
use std::fmt;
use std::str::FromStr;
#[allow(dead_code)]
pub fn read_pmetrics(path: impl Into<String>) -> Result<Data, DataError> {
let path = path.into();
let mut reader = csv::ReaderBuilder::new()
.comment(Some(b'#'))
.has_headers(true)
.from_path(&path)
.map_err(|e| DataError::CSVError(e.to_string()))?;
let headers = reader
.headers()
.map_err(|e| DataError::CSVError(e.to_string()))?
.iter()
.map(|h| h.to_lowercase())
.collect::<Vec<_>>();
reader.set_headers(csv::StringRecord::from(headers));
let mut data_rows: Vec<DataRow> = Vec::new();
for row_result in reader.deserialize() {
let row: Row = row_result.map_err(|e| DataError::CSVError(e.to_string()))?;
data_rows.push(row.to_datarow());
}
build_data(data_rows)
}
/// One record of a Pmetrics CSV file as deserialized by serde.
///
/// Field names are matched against lower-case column names
/// (`rename_all = "lowercase"`). Every `Option` field goes through a custom
/// `deserialize_with` helper; for the helpers built on `deserialize_option`
/// an empty cell, `.` or `NA` deserializes to `None`.
#[derive(Deserialize, Debug, Serialize, Default, Clone)]
#[serde(rename_all = "lowercase")]
struct Row {
/// `id` column, kept as text.
id: String,
/// `evid` column; narrowed to `i32` in `to_datarow`.
evid: isize,
/// `time` column.
time: f64,
/// `dur` column, `None` when the cell holds a missing-value placeholder.
#[serde(deserialize_with = "deserialize_option_f64")]
dur: Option<f64>,
/// `dose` column, `None` when the cell holds a missing-value placeholder.
#[serde(deserialize_with = "deserialize_option_f64")]
dose: Option<f64>,
/// `addl` column; converted to `i64` in `to_datarow`.
#[serde(deserialize_with = "deserialize_option_isize")]
addl: Option<isize>,
/// `ii` column, `None` when the cell holds a missing-value placeholder.
#[serde(deserialize_with = "deserialize_option_f64")]
ii: Option<f64>,
/// `input` column, read as text and converted with `InputLabel::from`.
#[serde(deserialize_with = "deserialize_option_route_label")]
input: Option<InputLabel>,
/// `out` column; `to_datarow` additionally maps a value of exactly `-99.0` to `None`.
#[serde(deserialize_with = "deserialize_option_f64")]
out: Option<f64>,
/// `outeq` column, read as text and converted with `OutputLabel::from`.
#[serde(deserialize_with = "deserialize_option_output_label")]
outeq: Option<OutputLabel>,
/// `cens` column. Marked `default`, so unlike the other columns it may be
/// absent from the file altogether.
#[serde(default, deserialize_with = "deserialize_option_censor")]
cens: Option<Censor>,
/// `c0` column, `None` when the cell holds a missing-value placeholder.
#[serde(deserialize_with = "deserialize_option_f64")]
c0: Option<f64>,
/// `c1` column, `None` when the cell holds a missing-value placeholder.
#[serde(deserialize_with = "deserialize_option_f64")]
c1: Option<f64>,
/// `c2` column, `None` when the cell holds a missing-value placeholder.
#[serde(deserialize_with = "deserialize_option_f64")]
c2: Option<f64>,
/// `c3` column, `None` when the cell holds a missing-value placeholder.
#[serde(deserialize_with = "deserialize_option_f64")]
c3: Option<f64>,
/// Flattened catch-all: receives the columns that are not claimed by one of
/// the named fields above, keyed by (lower-cased) column name. Entries whose
/// value is `None` are dropped by `to_datarow`.
#[serde(deserialize_with = "deserialize_covs", flatten)]
covs: HashMap<String, Option<f64>>,
}
impl Row {
    /// Converts this parsed CSV record into a [`DataRow`].
    ///
    /// Besides copying the columns across, this
    /// * casts `evid` to `i32` and `addl` to `i64`,
    /// * maps an `out` value of exactly `-99.0` to `None`, and
    /// * keeps only the covariates that actually carry a value.
    fn to_datarow(&self) -> DataRow {
        // `-99` in the `out` column is treated the same as a missing value.
        let observed = self.out.filter(|value| *value != -99.0);
        let event_id = self.evid as i32;
        let extra_doses = self.addl.map(|count| count as i64);

        DataRow {
            id: self.id.clone(),
            time: self.time,
            evid: event_id,
            dose: self.dose,
            dur: self.dur,
            addl: extra_doses,
            ii: self.ii,
            input: self.input.clone(),
            out: observed,
            outeq: self.outeq.clone(),
            cens: self.cens,
            c0: self.c0,
            c1: self.c1,
            c2: self.c2,
            c3: self.c3,
            covariates: self
                .covs
                .iter()
                .filter_map(|(name, value)| {
                    // Skip covariates without a value; the name is only
                    // cloned for entries that are kept.
                    let present = (*value)?;
                    Some((name.clone(), present))
                })
                .collect(),
        }
    }
}
fn deserialize_option<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
T: FromStr,
T::Err: std::fmt::Display,
{
let s: String = Deserialize::deserialize(deserializer)?;
if s.is_empty() || s == "." || s == "NA" {
Ok(None)
} else {
T::from_str(&s).map(Some).map_err(serde::de::Error::custom)
}
}
/// Deserializes an optional floating-point cell by delegating to
/// `deserialize_option` with `T = f64`.
fn deserialize_option_f64<'de, D>(deserializer: D) -> Result<Option<f64>, D::Error>
where
    D: Deserializer<'de>,
{
    let parsed: Option<f64> = deserialize_option(deserializer)?;
    Ok(parsed)
}
fn deserialize_option_censor<'de, D>(deserializer: D) -> Result<Option<Censor>, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
if s.is_empty() || s == "." || s == "NA" {
Ok(None)
} else {
match s.as_str() {
"1" | "bloq" => Ok(Some(Censor::BLOQ)),
"0" | "none" => Ok(Some(Censor::None)),
"-1" | "aloq" => Ok(Some(Censor::ALOQ)),
_ => Err(serde::de::Error::custom(format!(
"Expected one of 1/-1/0 or bloq/aloq/none), got {}",
s
))),
}
}
}
/// Deserializes the optional `input` cell: the text is read through
/// `deserialize_option` and, when present, converted with `InputLabel::from`.
fn deserialize_option_route_label<'de, D>(deserializer: D) -> Result<Option<InputLabel>, D::Error>
where
    D: Deserializer<'de>,
{
    let raw: Option<String> = deserialize_option(deserializer)?;
    Ok(raw.map(InputLabel::from))
}
/// Deserializes the optional `outeq` cell: the text is read through
/// `deserialize_option` and, when present, converted with `OutputLabel::from`.
fn deserialize_option_output_label<'de, D>(deserializer: D) -> Result<Option<OutputLabel>, D::Error>
where
    D: Deserializer<'de>,
{
    let raw: Option<String> = deserialize_option(deserializer)?;
    Ok(raw.map(OutputLabel::from))
}
/// Deserializes an optional integer cell by delegating to
/// `deserialize_option` with `T = isize`.
fn deserialize_option_isize<'de, D>(deserializer: D) -> Result<Option<isize>, D::Error>
where
    D: Deserializer<'de>,
{
    let parsed: Option<isize> = deserialize_option(deserializer)?;
    Ok(parsed)
}
fn deserialize_covs<'de, D>(deserializer: D) -> Result<HashMap<String, Option<f64>>, D::Error>
where
D: Deserializer<'de>,
{
struct CovsVisitor;
impl<'de> Visitor<'de> for CovsVisitor {
type Value = HashMap<String, Option<f64>>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str(
"a map of string keys to optionally floating-point numbers or placeholders",
)
}
fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut covs = HashMap::new();
while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
let opt_value = match value {
serde_json::Value::String(s) => match s.as_str() {
"" => None,
"." => None,
_ => match s.parse::<f64>() {
Ok(val) => Some(val),
Err(_) => {
return Err(de::Error::custom(
"expected a floating-point number or empty string",
))
}
},
},
serde_json::Value::Number(n) => Some(n.as_f64().unwrap()),
_ => return Err(de::Error::custom("expected a string or number")),
};
covs.insert(key, opt_value);
}
Ok(covs)
}
}
deserializer.deserialize_map(CovsVisitor)
}
impl Data {
pub fn write_pmetrics(&self, file: &std::fs::File) -> Result<(), PharmsolError> {
let mut writer = WriterBuilder::new().has_headers(true).from_writer(file);
writer
.write_record([
"ID", "EVID", "TIME", "DUR", "DOSE", "ADDL", "II", "INPUT", "OUT", "OUTEQ", "CENS",
"C0", "C1", "C2", "C3",
])
.map_err(|e| PharmsolError::OtherError(e.to_string()))?;
for subject in self.subjects() {
for occasion in subject.occasions() {
for event in occasion.process_events(None, false) {
match event {
Event::Observation(obs) => {
let time = obs.time().to_string();
let value = obs
.value()
.map_or_else(|| ".".to_string(), |v| v.to_string());
let outeq = obs.outeq().to_string();
let censor = match obs.censoring() {
Censor::None => "0".to_string(),
Censor::BLOQ => "1".to_string(),
Censor::ALOQ => "-1".to_string(),
};
let (c0, c1, c2, c3) = obs
.errorpoly()
.map(|poly| {
let (c0, c1, c2, c3) = poly.coefficients();
(
c0.to_string(),
c1.to_string(),
c2.to_string(),
c3.to_string(),
)
})
.unwrap_or_else(|| {
(
".".to_string(),
".".to_string(),
".".to_string(),
".".to_string(),
)
});
writer
.write_record([
subject.id(),
&"0".to_string(),
&time,
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&value,
&outeq,
&censor,
&c0,
&c1,
&c2,
&c3,
])
.map_err(|e| PharmsolError::OtherError(e.to_string()))?;
}
Event::Infusion(inf) => {
writer
.write_record([
subject.id(),
&"1".to_string(),
&inf.time().to_string(),
&inf.duration().to_string(),
&inf.amount().to_string(),
&".".to_string(),
&".".to_string(),
&inf.input().to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
])
.map_err(|e| PharmsolError::OtherError(e.to_string()))?;
}
Event::Bolus(bol) => {
writer
.write_record([
subject.id(),
&"1".to_string(),
&bol.time().to_string(),
&"0".to_string(),
&bol.amount().to_string(),
&".".to_string(),
&".".to_string(),
&bol.input().to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
&".".to_string(),
])
.map_err(|e| PharmsolError::OtherError(e.to_string()))?;
}
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Censor, ErrorPoly, SubjectBuilderExt};
    use csv::ReaderBuilder;
    use std::io::Cursor;
    use tempfile::NamedTempFile;

    /// Writes `data` to a temporary file with `write_pmetrics` and returns
    /// every data record (header excluded) that parses successfully.
    fn written_records(data: &Data) -> Vec<csv::StringRecord> {
        let file = NamedTempFile::new().unwrap();
        data.write_pmetrics(file.as_file()).unwrap();
        let contents = std::fs::read_to_string(file.path()).unwrap();
        let mut reader = ReaderBuilder::new()
            .has_headers(true)
            .from_reader(Cursor::new(contents));
        let records: Vec<csv::StringRecord> = reader.records().filter_map(Result::ok).collect();
        records
    }

    /// Stores `csv_text` in a temporary file and parses it with `read_pmetrics`.
    fn read_from_text(csv_text: &str) -> Data {
        let file = NamedTempFile::new().unwrap();
        std::fs::write(file.path(), csv_text).unwrap();
        let path = file.path().display().to_string();
        let data = read_pmetrics(path).unwrap();
        data
    }

    #[test]
    fn test_addl() {
        let parsed = read_pmetrics("src/tests/data/addl_test.csv");
        assert!(parsed.is_ok(), "Failed to parse data");
        let data = parsed.unwrap();

        // Event times of the first occasion of the subject at `index`.
        let first_occasion_times = |index: usize| {
            data.subjects()[index].occasions()[0]
                .events()
                .iter()
                .map(|event| event.time())
                .collect::<Vec<_>>()
        };

        assert_eq!(
            first_occasion_times(0),
            vec![-120.0, -108.0, -96.0, -84.0, -72.0, -60.0, -48.0, -36.0, -24.0, -12.0, 0.0, 9.0]
        );
        assert_eq!(
            first_occasion_times(1),
            vec![0.0, 9.0, 12.0, 24.0, 36.0, 48.0, 60.0, 72.0, 84.0, 96.0, 108.0, 120.0]
        );
    }

    #[test]
    fn write_pmetrics_preserves_infusion_input() {
        let subject = Subject::builder("writer")
            .infusion(0.0, 200.0, 3, 1.0)
            .observation(1.0, 0.0, 1)
            .build();
        let data = Data::new(vec![subject]);

        // The first row whose DUR column (index 3) is not "0".
        let infusion_row = written_records(&data)
            .into_iter()
            .find(|record| record.get(3) != Some("0"))
            .expect("infusion row missing");

        // Column 7 is INPUT.
        assert_eq!(infusion_row.get(7), Some("3"));
    }

    #[test]
    fn write_pmetrics_preserves_censoring_and_errorpoly() {
        let subject = Subject::builder("writer")
            .observation_with_error(
                0.0,
                2.5,
                0,
                ErrorPoly::new(0.1, 0.2, 0.3, 0.4),
                Censor::BLOQ,
            )
            .censored_observation(1.0, 3.5, 1, Censor::ALOQ)
            .build();
        let data = Data::new(vec![subject]);

        // Observation rows are the ones with EVID (index 1) equal to "0".
        let observations: Vec<_> = written_records(&data)
            .into_iter()
            .filter(|record| record.get(1) == Some("0"))
            .collect();
        assert_eq!(observations.len(), 2, "expected two observation rows");

        // Columns 10..=14 are CENS, C0, C1, C2, C3.
        let with_errorpoly = &observations[0];
        assert_eq!(with_errorpoly.get(10), Some("1"));
        assert_eq!(with_errorpoly.get(11), Some("0.1"));
        assert_eq!(with_errorpoly.get(12), Some("0.2"));
        assert_eq!(with_errorpoly.get(13), Some("0.3"));
        assert_eq!(with_errorpoly.get(14), Some("0.4"));

        let without_errorpoly = &observations[1];
        assert_eq!(without_errorpoly.get(10), Some("-1"));
        assert_eq!(without_errorpoly.get(11), Some("."));
        assert_eq!(without_errorpoly.get(14), Some("."));
    }

    #[test]
    fn read_pmetrics_preserves_named_route_and_output_labels() {
        let data = read_from_text(
            "ID,EVID,TIME,DUR,DOSE,ADDL,II,INPUT,OUT,OUTEQ,CENS,C0,C1,C2,C3\npt1,1,0,1,100,.,.,iv,.,.,.,.,.,.,.\npt1,0,1,.,.,.,.,.,42,cp,0,.,.,.,.\n",
        );
        let events = data.subjects()[0].occasions()[0].events();

        if let Event::Infusion(infusion) = &events[0] {
            assert_eq!(infusion.input().as_str(), "iv");
        } else {
            panic!("expected infusion event");
        }

        if let Event::Observation(observation) = &events[1] {
            assert_eq!(observation.outeq().as_str(), "cp");
        } else {
            panic!("expected observation event");
        }
    }

    #[test]
    fn read_pmetrics_preserves_numeric_labels_as_strings() {
        let data = read_from_text(
            "ID,EVID,TIME,DUR,DOSE,ADDL,II,INPUT,OUT,OUTEQ,CENS,C0,C1,C2,C3\npt1,1,0,.,100,.,.,1,.,.,.,.,.,.,.\npt1,0,1,.,.,.,.,.,42,1,0,.,.,.,.\n",
        );
        let events = data.subjects()[0].occasions()[0].events();

        if let Event::Bolus(bolus) = &events[0] {
            assert_eq!(bolus.input().as_str(), "1");
        } else {
            panic!("expected bolus event");
        }

        if let Event::Observation(observation) = &events[1] {
            assert_eq!(observation.outeq().as_str(), "1");
        } else {
            panic!("expected observation event");
        }
    }
}