1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2#![allow(dead_code, unused, clippy::excessive_precision)]
3mod calc;
4mod coef;
5mod error;
6mod file;
7mod gam;
8mod glm;
9mod lm;
10mod mat;
11mod matrix;
12mod mean;
13mod norm;
14mod packing;
15mod r2;
16mod spline;
17mod standardize;
18mod variance;
19
20use std::{mem::MaybeUninit, panic::AssertUnwindSafe, sync::Mutex};
21
22use rayon::prelude::*;
23use tracing::{debug, debug_span, error, info, trace, warn};
24
25pub use crate::{
26 calc::*, coef::*, error::*, file::*, glm::*, lm::*, matrix::*, mean::*, norm::*, packing::*,
27 r2::*, standardize::*, variance::*,
28};
29
30#[cfg_attr(coverage_nightly, coverage(off))]
31#[doc(hidden)]
32pub fn core_parallelize<T, F, R, E>(data: Vec<T>, out: Option<usize>, f: F) -> Result<Vec<R>, E>
33where
34 T: Send + Sync,
35 for<'a> F: (Fn(usize, &'a mut T) -> Result<Vec<R>, E>) + Send + Sync,
36 R: Send + Sync,
37 E: std::error::Error,
38{
39 let core_parallelism = std::env::var("LMUTILS_CORE_PARALLELISM")
40 .ok()
41 .or_else(|| std::env::var("LMUTILS_NUM_MAIN_THREADS").ok())
42 .and_then(|x| x.parse::<usize>().ok())
43 .unwrap_or(16)
44 .clamp(1, data.len().min(num_cpus::get()));
45 let ignore_errors = std::env::var("LMUTILS_IGNORE_CORE_PARALLEL_ERRORS")
46 .ok()
47 .map(|x| x == "1")
48 .unwrap_or(true);
49
50 let mut results_uninit = if let Some(out) = out {
51 let mut results: Vec<MaybeUninit<R>> = Vec::with_capacity(out * data.len());
54 results.extend((0..(out * data.len())).map(|_| MaybeUninit::uninit()));
55 Some(results)
56 } else {
57 None
59 };
60 let mut results_push = if out.is_none() {
61 Some(Mutex::new(Vec::new()))
62 } else {
63 None
64 };
65
66 let data = Mutex::new(data.into_iter().enumerate().collect::<Vec<_>>());
67
68 std::thread::scope(|s| {
69 for _ in 0..core_parallelism {
70 s.spawn(|| loop {
71 let mut guard = data.lock().unwrap();
72 let d = guard.pop();
73 drop(guard);
74 if let Some((i, mut d)) = d {
75 rayon::scope(|s| {
76 s.spawn(|_| {
77 let s = debug_span!("core_scope");
78 let _e = s.enter();
79 let mut tries = 1;
80 #[allow(clippy::blocks_in_conditions)]
81 while std::panic::catch_unwind(AssertUnwindSafe(|| {
82 let r = f(i, &mut d).unwrap();
83 if let Some(out) = out {
84 let results = unsafe {
85 std::slice::from_raw_parts_mut(
86 results_uninit
87 .as_ref()
88 .unwrap()
89 .as_ptr()
90 .add(i * out)
91 .cast_mut(),
92 out,
93 )
94 };
95 for (i, p) in r.into_iter().enumerate() {
96 results[i].write(p);
97 }
98 } else {
99 let mut results =
100 results_push.as_ref().unwrap().lock().unwrap();
101 results.extend(r);
102 }
103 }))
104 .is_err()
105 {
106 let duration = std::time::Duration::from_secs(4u64.pow(tries));
107 warn!(
108 "Error in core scope, retrying in {} seconds",
109 duration.as_secs_f64()
110 );
111 std::thread::sleep(duration);
112 tries += 1;
113 if tries > 5 {
114 if ignore_errors {
115 error!("Error in core scope, ignoring");
116 break;
117 } else {
118 error!("Error in core scope, too many retries");
119 panic!("Error in core scope, too many retries");
120 }
121 }
122 }
123 })
124 })
125 } else {
126 break;
127 }
128 });
129 }
130 });
131
132 Ok(if let Some(out) = out {
133 unsafe { std::mem::transmute::<Vec<MaybeUninit<R>>, Vec<R>>(results_uninit.unwrap()) }
135 } else {
136 results_push.unwrap().into_inner().unwrap()
137 })
138}
139
140#[tracing::instrument(skip(data, outcomes, data_names))]
142pub fn calculate_r2s(
143 mut data: Vec<Matrix>,
144 mut outcomes: Matrix,
145 data_names: Option<Vec<&str>>,
146) -> Result<Vec<R2>, crate::Error> {
147 outcomes.remove_column_by_name_if_exists("eid");
148 outcomes.remove_column_by_name_if_exists("IID");
149 let colnames = outcomes
150 .colnames()?
151 .map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>());
152 debug!("Loading outcomes");
153 let or = outcomes.as_mat_ref()?;
154 debug!("Loaded outcomes");
155 let data_names = match data_names {
166 Some(data_names) if data_names.iter().all(|x| *x == "NA") => None,
167 x => x,
168 };
169 core_parallelize(data, Some(or.ncols()), |i, mat| {
170 let data_set = if let Some(data_names) = &data_names {
171 data_names[i].to_string()
172 } else {
173 (i + 1).to_string()
174 };
175 info!("Calculating R^2 for data set {}", data_set);
176 if !mat.is_loaded() {
177 info!("Loading data set {}", data_set);
178 mat.into_owned()?;
179 info!("Loaded data set {}", data_set);
180 } else {
181 info!("Data set {} already loaded", data_set);
182 }
183 if mat.has_column_loaded("eid") || mat.has_column_loaded("IID") {
184 mat.remove_column_by_name_if_exists("eid")?;
185 mat.remove_column_by_name_if_exists("IID")?;
186 }
187 let r = mat.as_mat_ref_loaded();
188 let r2s = get_r2s(r, or)
189 .into_iter()
190 .enumerate()
191 .map(|(j, mut r)| {
192 if let Some(data_names) = &data_names {
193 r.data = Some(data_names[i].to_string());
194 } else {
195 r.data = Some((i + 1).to_string());
196 }
197 r.outcome = colnames
198 .as_ref()
199 .and_then(|c| c.get(j).map(|c| c.to_string()))
200 .or_else(|| Some((j + 1).to_string()));
201 r
202 })
203 .collect::<Vec<_>>();
204 info!(
205 "Finished calculating R^2 for data set {}",
206 if let Some(data_names) = &data_names {
207 data_names[i].to_string()
208 } else {
209 i.to_string()
210 }
211 );
212 Ok(r2s)
213 })
214}
215
216#[tracing::instrument(skip(data, outcomes, data_names))]
217pub fn column_p_values(
218 mut data: Vec<Matrix>,
219 mut outcomes: Matrix,
220 data_names: Option<Vec<&str>>,
221) -> Result<Vec<PValue>, crate::Error> {
222 outcomes.remove_column_by_name_if_exists("eid");
223 outcomes.remove_column_by_name_if_exists("IID");
224 let colnames = outcomes
225 .colnames()?
226 .map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>());
227 let or = outcomes.as_mat_ref()?;
228 core_parallelize(data, None, |i, mat| {
239 info!(
240 "Calculating p-values for data set {}",
241 if let Some(data_names) = &data_names {
242 data_names[i].to_string()
243 } else {
244 (i + 1).to_string()
245 }
246 );
247 if !mat.is_loaded() {
248 mat.into_owned()?;
249 }
250 if mat.has_column_loaded("eid") || mat.has_column_loaded("IID") {
251 mat.remove_column_by_name_if_exists("eid")?;
252 mat.remove_column_by_name_if_exists("IID")?;
253 }
254 let data = mat.as_mat_ref_loaded();
255 let p_values = (0..data.ncols())
256 .into_par_iter()
257 .flat_map(|x| {
258 let xs = data
259 .get(.., x)
260 .try_as_col_major()
261 .expect("could not get slice")
262 .as_slice();
263 (0..or.ncols()).into_par_iter().map(move |y| {
264 let ys = or
265 .get(.., y)
266 .try_as_col_major()
267 .expect("could not get slice")
268 .as_slice();
269 (x, y, p_value(xs, ys))
270 })
271 })
272 .map(|(x, y, mut p)| {
273 if let Some(data_names) = &data_names {
274 p.data = Some(data_names[i].to_string());
275 } else {
276 p.data = Some((i + 1).to_string());
277 }
278 p.data_column = Some((x + 1) as u32);
279 p.outcome = colnames
280 .as_ref()
281 .and_then(|c| c.get(y).map(|c| c.to_string()))
282 .or_else(|| Some((y + 1).to_string()));
283 p
284 })
285 .collect::<Vec<_>>();
286 info!(
287 "Finished calculating p-values for data set {}",
288 if let Some(data_names) = &data_names {
289 data_names[i].to_string()
290 } else {
291 i.to_string()
292 }
293 );
294 Ok(p_values)
295 })
296}
297
298pub fn compute_r2(actual: &[f64], predicted: &[f64]) -> f64 {
299 R2Simd::new(actual, predicted).calculate()
300}
301
302#[cfg(test)]
303mod tests {
304 use test_log::test;
305
306 use super::*;
307
308 #[test]
309 fn test_calculate_r2s() {
310 let mut m1 = Matrix::Owned(OwnedMatrix::new(
311 3,
312 3,
313 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
314 None,
315 ));
316 let mut m2 = Matrix::Owned(OwnedMatrix::new(
317 3,
318 3,
319 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
320 None,
321 ));
322 let mut m3 = Matrix::Owned(OwnedMatrix::new(
323 3,
324 3,
325 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
326 None,
327 ));
328 let results = calculate_r2s(vec![m1, m2], m3, None).unwrap();
329 assert_eq!(results.len(), 6);
330 }
331
332 #[test]
333 fn test_calculate_r2s_names() {
334 let mut m1 = Matrix::Owned(OwnedMatrix::new(
335 3,
336 3,
337 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
338 Some(vec!["a".to_string(), "b".to_string(), "c".to_string()]),
339 ));
340 let mut m2 = Matrix::Owned(OwnedMatrix::new(
341 3,
342 3,
343 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
344 Some(vec!["d".to_string(), "e".to_string(), "f".to_string()]),
345 ));
346 let mut m3 = Matrix::Owned(OwnedMatrix::new(
347 3,
348 3,
349 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
350 Some(vec!["g".to_string(), "h".to_string(), "i".to_string()]),
351 ));
352 let results = calculate_r2s(vec![m1, m2], m3, Some(vec!["j", "k"])).unwrap();
353 assert_eq!(results.len(), 6);
354 }
355
356 #[test]
357 fn test_column_p_values() {
358 let mut m1 = Matrix::Owned(OwnedMatrix::new(
359 3,
360 3,
361 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
362 None,
363 ));
364 let mut m2 = Matrix::Owned(OwnedMatrix::new(
365 3,
366 3,
367 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
368 None,
369 ));
370 let mut m3 = Matrix::Owned(OwnedMatrix::new(
371 3,
372 3,
373 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
374 None,
375 ));
376 let results = column_p_values(vec![m1, m2], m3, None).unwrap();
377 assert_eq!(results.len(), 18);
378 }
379
380 #[test]
381 fn test_column_p_values_names() {
382 let mut m1 = Matrix::Owned(OwnedMatrix::new(
383 3,
384 3,
385 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
386 Some(vec!["a".to_string(), "b".to_string(), "c".to_string()]),
387 ));
388 let mut m2 = Matrix::Owned(OwnedMatrix::new(
389 3,
390 3,
391 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
392 Some(vec!["d".to_string(), "e".to_string(), "f".to_string()]),
393 ));
394 let mut m3 = Matrix::Owned(OwnedMatrix::new(
395 3,
396 3,
397 vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
398 Some(vec!["g".to_string(), "h".to_string(), "i".to_string()]),
399 ));
400 let results = column_p_values(vec![m1, m2], m3, Some(vec!["j", "k"])).unwrap();
401 assert_eq!(results.len(), 18);
402 }
403}