lmutils/
lib.rs

1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2#![allow(dead_code, unused, clippy::excessive_precision)]
3mod calc;
4mod coef;
5mod error;
6mod file;
7mod gam;
8mod glm;
9mod lm;
10mod mat;
11mod matrix;
12mod mean;
13mod norm;
14mod packing;
15mod r2;
16mod spline;
17mod standardize;
18mod variance;
19
20use std::{mem::MaybeUninit, panic::AssertUnwindSafe, sync::Mutex};
21
22use rayon::prelude::*;
23use tracing::{debug, debug_span, error, info, trace, warn};
24
25pub use crate::{
26    calc::*, coef::*, error::*, file::*, glm::*, lm::*, matrix::*, mean::*, norm::*, packing::*,
27    r2::*, standardize::*, variance::*,
28};
29
30#[cfg_attr(coverage_nightly, coverage(off))]
31#[doc(hidden)]
32pub fn core_parallelize<T, F, R, E>(data: Vec<T>, out: Option<usize>, f: F) -> Result<Vec<R>, E>
33where
34    T: Send + Sync,
35    for<'a> F: (Fn(usize, &'a mut T) -> Result<Vec<R>, E>) + Send + Sync,
36    R: Send + Sync,
37    E: std::error::Error,
38{
39    let core_parallelism = std::env::var("LMUTILS_CORE_PARALLELISM")
40        .ok()
41        .or_else(|| std::env::var("LMUTILS_NUM_MAIN_THREADS").ok())
42        .and_then(|x| x.parse::<usize>().ok())
43        .unwrap_or(16)
44        .clamp(1, data.len().min(num_cpus::get()));
45    let ignore_errors = std::env::var("LMUTILS_IGNORE_CORE_PARALLEL_ERRORS")
46        .ok()
47        .map(|x| x == "1")
48        .unwrap_or(true);
49
50    let mut results_uninit = if let Some(out) = out {
51        // if there's a known output size, we can preallocate the results and guarantee
52        // an order
53        let mut results: Vec<MaybeUninit<R>> = Vec::with_capacity(out * data.len());
54        results.extend((0..(out * data.len())).map(|_| MaybeUninit::uninit()));
55        Some(results)
56    } else {
57        // if there isn't, then we can't guarantee an order, so we just return None
58        None
59    };
60    let mut results_push = if out.is_none() {
61        Some(Mutex::new(Vec::new()))
62    } else {
63        None
64    };
65
66    let data = Mutex::new(data.into_iter().enumerate().collect::<Vec<_>>());
67
68    std::thread::scope(|s| {
69        for _ in 0..core_parallelism {
70            s.spawn(|| loop {
71                let mut guard = data.lock().unwrap();
72                let d = guard.pop();
73                drop(guard);
74                if let Some((i, mut d)) = d {
75                    rayon::scope(|s| {
76                        s.spawn(|_| {
77                            let s = debug_span!("core_scope");
78                            let _e = s.enter();
79                            let mut tries = 1;
80                            #[allow(clippy::blocks_in_conditions)]
81                            while std::panic::catch_unwind(AssertUnwindSafe(|| {
82                                let r = f(i, &mut d).unwrap();
83                                if let Some(out) = out {
84                                    let results = unsafe {
85                                        std::slice::from_raw_parts_mut(
86                                            results_uninit
87                                                .as_ref()
88                                                .unwrap()
89                                                .as_ptr()
90                                                .add(i * out)
91                                                .cast_mut(),
92                                            out,
93                                        )
94                                    };
95                                    for (i, p) in r.into_iter().enumerate() {
96                                        results[i].write(p);
97                                    }
98                                } else {
99                                    let mut results =
100                                        results_push.as_ref().unwrap().lock().unwrap();
101                                    results.extend(r);
102                                }
103                            }))
104                            .is_err()
105                            {
106                                let duration = std::time::Duration::from_secs(4u64.pow(tries));
107                                warn!(
108                                    "Error in core scope, retrying in {} seconds",
109                                    duration.as_secs_f64()
110                                );
111                                std::thread::sleep(duration);
112                                tries += 1;
113                                if tries > 5 {
114                                    if ignore_errors {
115                                        error!("Error in core scope, ignoring");
116                                        break;
117                                    } else {
118                                        error!("Error in core scope, too many retries");
119                                        panic!("Error in core scope, too many retries");
120                                    }
121                                }
122                            }
123                        })
124                    })
125                } else {
126                    break;
127                }
128            });
129        }
130    });
131
132    Ok(if let Some(out) = out {
133        // SAFETY: We have initialized all elements of the array.
134        unsafe { std::mem::transmute::<Vec<MaybeUninit<R>>, Vec<R>>(results_uninit.unwrap()) }
135    } else {
136        results_push.unwrap().into_inner().unwrap()
137    })
138}
139
140// Calculate R^2 and adjusted R^2 for a list of data and outcomes.
141#[tracing::instrument(skip(data, outcomes, data_names))]
142pub fn calculate_r2s(
143    mut data: Vec<Matrix>,
144    mut outcomes: Matrix,
145    data_names: Option<Vec<&str>>,
146) -> Result<Vec<R2>, crate::Error> {
147    outcomes.remove_column_by_name_if_exists("eid");
148    outcomes.remove_column_by_name_if_exists("IID");
149    let colnames = outcomes
150        .colnames()?
151        .map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>());
152    debug!("Loading outcomes");
153    let or = outcomes.as_mat_ref()?;
154    debug!("Loaded outcomes");
155    // let data = Mutex::new(
156    //     data.into_iter()
157    //         .enumerate()
158    //         .map(|(i, m)| m.make_parallel_safe().map(|m| (i, m)))
159    //         .collect::<Result<Vec<_>, _>>()?,
160    // );
161    // let ndata = data.lock().unwrap().len();
162    // let mut results: Vec<MaybeUninit<R2>> = Vec::with_capacity(or.ncols() *
163    // ndata); results.extend((0..(or.ncols() * ndata)).map(|_|
164    // MaybeUninit::uninit()));
165    let data_names = match data_names {
166        Some(data_names) if data_names.iter().all(|x| *x == "NA") => None,
167        x => x,
168    };
169    core_parallelize(data, Some(or.ncols()), |i, mat| {
170        let data_set = if let Some(data_names) = &data_names {
171            data_names[i].to_string()
172        } else {
173            (i + 1).to_string()
174        };
175        info!("Calculating R^2 for data set {}", data_set);
176        if !mat.is_loaded() {
177            info!("Loading data set {}", data_set);
178            mat.into_owned()?;
179            info!("Loaded data set {}", data_set);
180        } else {
181            info!("Data set {} already loaded", data_set);
182        }
183        if mat.has_column_loaded("eid") || mat.has_column_loaded("IID") {
184            mat.remove_column_by_name_if_exists("eid")?;
185            mat.remove_column_by_name_if_exists("IID")?;
186        }
187        let r = mat.as_mat_ref_loaded();
188        let r2s = get_r2s(r, or)
189            .into_iter()
190            .enumerate()
191            .map(|(j, mut r)| {
192                if let Some(data_names) = &data_names {
193                    r.data = Some(data_names[i].to_string());
194                } else {
195                    r.data = Some((i + 1).to_string());
196                }
197                r.outcome = colnames
198                    .as_ref()
199                    .and_then(|c| c.get(j).map(|c| c.to_string()))
200                    .or_else(|| Some((j + 1).to_string()));
201                r
202            })
203            .collect::<Vec<_>>();
204        info!(
205            "Finished calculating R^2 for data set {}",
206            if let Some(data_names) = &data_names {
207                data_names[i].to_string()
208            } else {
209                i.to_string()
210            }
211        );
212        Ok(r2s)
213    })
214}
215
216#[tracing::instrument(skip(data, outcomes, data_names))]
217pub fn column_p_values(
218    mut data: Vec<Matrix>,
219    mut outcomes: Matrix,
220    data_names: Option<Vec<&str>>,
221) -> Result<Vec<PValue>, crate::Error> {
222    outcomes.remove_column_by_name_if_exists("eid");
223    outcomes.remove_column_by_name_if_exists("IID");
224    let colnames = outcomes
225        .colnames()?
226        .map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>());
227    let or = outcomes.as_mat_ref()?;
228    // let data = Mutex::new(
229    //     data.into_iter()
230    //         .enumerate()
231    //         .map(|(i, m)| m.make_parallel_safe().map(|m| (i, m)))
232    //         .collect::<Result<Vec<_>, _>>()?,
233    // );
234    // let ndata = or.nrows();
235    // let mut results: Vec<MaybeUninit<PValue>> = Vec::with_capacity(or.ncols() *
236    // ndata); results.extend((0..(or.ncols() * ndata)).map(|_|
237    // MaybeUninit::uninit()));
238    core_parallelize(data, None, |i, mat| {
239        info!(
240            "Calculating p-values for data set {}",
241            if let Some(data_names) = &data_names {
242                data_names[i].to_string()
243            } else {
244                (i + 1).to_string()
245            }
246        );
247        if !mat.is_loaded() {
248            mat.into_owned()?;
249        }
250        if mat.has_column_loaded("eid") || mat.has_column_loaded("IID") {
251            mat.remove_column_by_name_if_exists("eid")?;
252            mat.remove_column_by_name_if_exists("IID")?;
253        }
254        let data = mat.as_mat_ref_loaded();
255        let p_values = (0..data.ncols())
256            .into_par_iter()
257            .flat_map(|x| {
258                let xs = data
259                    .get(.., x)
260                    .try_as_col_major()
261                    .expect("could not get slice")
262                    .as_slice();
263                (0..or.ncols()).into_par_iter().map(move |y| {
264                    let ys = or
265                        .get(.., y)
266                        .try_as_col_major()
267                        .expect("could not get slice")
268                        .as_slice();
269                    (x, y, p_value(xs, ys))
270                })
271            })
272            .map(|(x, y, mut p)| {
273                if let Some(data_names) = &data_names {
274                    p.data = Some(data_names[i].to_string());
275                } else {
276                    p.data = Some((i + 1).to_string());
277                }
278                p.data_column = Some((x + 1) as u32);
279                p.outcome = colnames
280                    .as_ref()
281                    .and_then(|c| c.get(y).map(|c| c.to_string()))
282                    .or_else(|| Some((y + 1).to_string()));
283                p
284            })
285            .collect::<Vec<_>>();
286        info!(
287            "Finished calculating p-values for data set {}",
288            if let Some(data_names) = &data_names {
289                data_names[i].to_string()
290            } else {
291                i.to_string()
292            }
293        );
294        Ok(p_values)
295    })
296}
297
298pub fn compute_r2(actual: &[f64], predicted: &[f64]) -> f64 {
299    R2Simd::new(actual, predicted).calculate()
300}
301
302#[cfg(test)]
303mod tests {
304    use test_log::test;
305
306    use super::*;
307
308    #[test]
309    fn test_calculate_r2s() {
310        let mut m1 = Matrix::Owned(OwnedMatrix::new(
311            3,
312            3,
313            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
314            None,
315        ));
316        let mut m2 = Matrix::Owned(OwnedMatrix::new(
317            3,
318            3,
319            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
320            None,
321        ));
322        let mut m3 = Matrix::Owned(OwnedMatrix::new(
323            3,
324            3,
325            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
326            None,
327        ));
328        let results = calculate_r2s(vec![m1, m2], m3, None).unwrap();
329        assert_eq!(results.len(), 6);
330    }
331
332    #[test]
333    fn test_calculate_r2s_names() {
334        let mut m1 = Matrix::Owned(OwnedMatrix::new(
335            3,
336            3,
337            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
338            Some(vec!["a".to_string(), "b".to_string(), "c".to_string()]),
339        ));
340        let mut m2 = Matrix::Owned(OwnedMatrix::new(
341            3,
342            3,
343            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
344            Some(vec!["d".to_string(), "e".to_string(), "f".to_string()]),
345        ));
346        let mut m3 = Matrix::Owned(OwnedMatrix::new(
347            3,
348            3,
349            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
350            Some(vec!["g".to_string(), "h".to_string(), "i".to_string()]),
351        ));
352        let results = calculate_r2s(vec![m1, m2], m3, Some(vec!["j", "k"])).unwrap();
353        assert_eq!(results.len(), 6);
354    }
355
356    #[test]
357    fn test_column_p_values() {
358        let mut m1 = Matrix::Owned(OwnedMatrix::new(
359            3,
360            3,
361            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
362            None,
363        ));
364        let mut m2 = Matrix::Owned(OwnedMatrix::new(
365            3,
366            3,
367            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
368            None,
369        ));
370        let mut m3 = Matrix::Owned(OwnedMatrix::new(
371            3,
372            3,
373            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
374            None,
375        ));
376        let results = column_p_values(vec![m1, m2], m3, None).unwrap();
377        assert_eq!(results.len(), 18);
378    }
379
380    #[test]
381    fn test_column_p_values_names() {
382        let mut m1 = Matrix::Owned(OwnedMatrix::new(
383            3,
384            3,
385            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
386            Some(vec!["a".to_string(), "b".to_string(), "c".to_string()]),
387        ));
388        let mut m2 = Matrix::Owned(OwnedMatrix::new(
389            3,
390            3,
391            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
392            Some(vec!["d".to_string(), "e".to_string(), "f".to_string()]),
393        ));
394        let mut m3 = Matrix::Owned(OwnedMatrix::new(
395            3,
396            3,
397            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
398            Some(vec!["g".to_string(), "h".to_string(), "i".to_string()]),
399        ));
400        let results = column_p_values(vec![m1, m2], m3, Some(vec!["j", "k"])).unwrap();
401        assert_eq!(results.len(), 18);
402    }
403}