polars_compute/
moment.rs

// Some formulae:
//     mean_x = sum(weight[i] * x[i]) / sum(weight)
//     dp_xy = weighted sum of deviation products of variables x, y, written in
//             the paper as simply XY.
//     dp_xy = sum(weight[i] * (x[i] - mean_x) * (y[i] - mean_y))
//
//     cov(x, y) = dp_xy / sum(weight)
//     var(x) = cov(x, x)
//
// Algorithms from:
// Numerically stable parallel computation of (co-)variance.
// Schubert, E. & Gertz, M. (2018).
//
// Key equations from the paper:
// (17) for the mean update, (23) for the dp update (and also Table 1).
//
// For higher moments we refer to:
// Numerically Stable, Scalable Formulas for Parallel and Online Computation of
// Higher-Order Multivariate Central Moments with Arbitrary Weights.
// Pébay, P., Terriberry, T. B., Kolla, H. & Bennett, J. (2016).
//
// Key equations from the paper:
// (3.26) for the mean update, (3.27) for the moment update.
//
// Here we use mk to denote the weighted kth central moment:
//     mk = sum(weight[i] * (x[i] - mean_x)**k)
// Note that we'll use the terms m2 = dp = dp_xx when unambiguous.
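//
// As a small worked example, take x = [1, 2, 3] with unit weights:
//     mean_x = (1 + 2 + 3) / 3 = 2
//     dp_xx  = (1 - 2)^2 + (2 - 2)^2 + (3 - 2)^2 = 2
//     var(x) = dp_xx / sum(weight) = 2/3    (population variance, i.e. ddof = 0)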

#![allow(clippy::collapsible_else_if)]

use arrow::array::{Array, PrimitiveArray};
use arrow::types::NativeType;
use num_traits::AsPrimitive;
use polars_utils::algebraic_ops::*;

const CHUNK_SIZE: usize = 128;

#[derive(Default, Clone)]
pub struct VarState {
    weight: f64,
    mean: f64,
    dp: f64,
}

#[derive(Default, Clone)]
pub struct CovState {
    weight: f64,
    mean_x: f64,
    mean_y: f64,
    dp_xy: f64,
}

#[derive(Default, Clone)]
pub struct PearsonState {
    weight: f64,
    mean_x: f64,
    mean_y: f64,
    dp_xx: f64,
    dp_xy: f64,
    dp_yy: f64,
}

impl VarState {
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum_f64(x.iter().copied()) / weight;
        Self {
            weight,
            mean,
            dp: alg_sum_f64(x.iter().map(|&xi| (xi - mean) * (xi - mean))),
        }
    }

    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.dp = 0.0;
        }
    }

    pub fn insert_one(&mut self, x: f64) {
        // Just a specialized version of
        // self.combine(&Self { weight: 1.0, mean: x, dp: 0.0 })
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let new_mean = self.mean + delta_mean / new_weight;
        self.dp += (x - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    pub fn remove_one(&mut self, x: f64) {
        // Just a specialized version of
        // self.combine(&Self { weight: -1.0, mean: x, dp: 0.0 })
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let new_mean = self.mean - delta_mean / new_weight;
        self.dp -= (x - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean = other.mean - self.mean;
        let new_mean = self.mean + delta_mean * other_weight_frac;
        self.dp += other.dp + other.weight * (other.mean - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    pub fn finalize(&self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            let var = self.dp / (self.weight - ddof as f64);
            Some(if var < 0.0 {
                // Variance can't be negative, except through numerical instability.
                // We don't use f64::max here so that NaNs propagate.
                0.0
            } else {
                var
            })
        }
    }
}
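
// A small illustrative check (ours, not from the papers): feeding values one
// at a time through `insert_one` must agree with the batch constructor. The
// data and tolerance below are arbitrary choices for the example.
#[cfg(test)]
mod var_state_example {
    use super::*;

    #[test]
    fn streaming_matches_batch() {
        let x = [1.0, 2.0, 4.0, 8.0, 16.0];
        let batch = VarState::new(&x);

        let mut streaming = VarState::default();
        for &xi in &x {
            streaming.insert_one(xi);
        }

        // Sample variance (ddof = 1) from both paths.
        let a = batch.finalize(1).unwrap();
        let b = streaming.finalize(1).unwrap();
        assert!((a - b).abs() < 1e-10);
    }
}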

impl CovState {
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xy: alg_sum_f64(
                x.iter()
                    .zip(y)
                    .map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)),
            ),
        }
    }

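    // Merge two states following Schubert & Gertz (2018), eq. (23): with
    // n = n_a + n_b and delta = mean_b - mean_a (per variable), the merged
    // deviation product is
    //     dp_xy = dp_xy_a + dp_xy_b + n_a * n_b / n * delta_x * delta_y,
    // which the code below expresses as
    //     n_b * (mean_x_b - new_mean_x) * delta_y.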
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean_x = other.mean_x - self.mean_x;
        let delta_mean_y = other.mean_y - self.mean_y;
        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

    pub fn finalize(&self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            Some(self.dp_xy / (self.weight - ddof as f64))
        }
    }
}

impl PearsonState {
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
        let mut dp_xx = 0.0;
        let mut dp_xy = 0.0;
        let mut dp_yy = 0.0;
        for (xi, yi) in x.iter().zip(y.iter()) {
            dp_xx = alg_add_f64(dp_xx, (xi - mean_x) * (xi - mean_x));
            dp_xy = alg_add_f64(dp_xy, (xi - mean_x) * (yi - mean_y));
            dp_yy = alg_add_f64(dp_yy, (yi - mean_y) * (yi - mean_y));
        }
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xx,
            dp_xy,
            dp_yy,
        }
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean_x = other.mean_x - self.mean_x;
        let delta_mean_y = other.mean_y - self.mean_y;
        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
        self.dp_xx += other.dp_xx + other.weight * (other.mean_x - new_mean_x) * delta_mean_x;
        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
        self.dp_yy += other.dp_yy + other.weight * (other.mean_y - new_mean_y) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

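    // Pearson correlation from the accumulated deviation products:
    //     r = dp_xy / sqrt(dp_xx * dp_yy).
    // Returns NaN when either variance is zero, since r is undefined there.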
    pub fn finalize(&self) -> f64 {
        let denom_sq = self.dp_xx * self.dp_yy;
        if denom_sq > 0.0 {
            self.dp_xy / denom_sq.sqrt()
        } else {
            f64::NAN
        }
    }
}

#[derive(Default, Clone)]
pub struct SkewState {
    weight: f64,
    mean: f64,
    m2: f64,
    m3: f64,
}

impl SkewState {
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum_f64(x.iter().copied()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        for xi in x.iter() {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
        }
    }

    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
        }
    }

    pub fn insert_one(&mut self, x: f64) {
        // Specialization of self.combine(&SkewState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0 });
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    pub fn remove_one(&mut self, x: f64) {
        // Specialization of self.combine(&SkewState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0 });
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

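    // Merge two states following Pébay et al. (2016), eq. (3.27) for k = 3:
    // with n = n_a + n_b and delta = mean_b - mean_a,
    //     m3 = m3_a + m3_b + delta^3 * n_a * n_b * (n_a - n_b) / n^2
    //                      + 3 * delta * (n_a * m2_b - n_b * m2_a) / n.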
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

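    // Finalizes to the skewness estimate. With m2, m3 the weight-normalized
    // central moments, the biased estimator is g1 = m3 / m2^(3/2); the
    // bias-corrected one is G1 = sqrt(n * (n - 1)) / (n - 2) * g1, defined
    // for n > 2.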
    pub fn finalize(&self, bias: bool) -> Option<f64> {
        let m2 = self.m2 / self.weight;
        let m3 = self.m3 / self.weight;
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m3 / m2.powf(1.5) };
        if bias {
            if self.weight == 0.0 {
                None
            } else {
                Some(biased_est)
            }
        } else {
            if self.weight <= 2.0 {
                None
            } else {
                let correction = (self.weight * (self.weight - 1.0)).sqrt() / (self.weight - 2.0);
                Some(correction * biased_est)
            }
        }
    }
}

#[derive(Default, Clone)]
pub struct KurtosisState {
    weight: f64,
    mean: f64,
    m2: f64,
    m3: f64,
    m4: f64,
}

impl KurtosisState {
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum_f64(x.iter().copied()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        let mut m4 = 0.0;
        for xi in x.iter() {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            let d4 = d2 * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
            m4 = alg_add_f64(m4, d4);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
            m4,
        }
    }

    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
            self.m4 = 0.0;
        }
    }

    pub fn insert_one(&mut self, x: f64) {
        // Specialization of self.combine(&KurtosisState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    - 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    pub fn remove_one(&mut self, x: f64) {
        // Specialization of self.combine(&KurtosisState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    + 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

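    // Merge two states following Pébay et al. (2016), eq. (3.27) for k = 4:
    // with n = n_a + n_b and delta = mean_b - mean_a,
    //     m4 = m4_a + m4_b
    //        + delta^4 * n_a * n_b * (n_a^2 - n_a * n_b + n_b^2) / n^3
    //        + 6 * delta^2 * (n_a^2 * m2_b + n_b^2 * m2_a) / n^2
    //        + 4 * delta * (n_a * m3_b - n_b * m3_a) / n.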
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
        let new_m4 = self.m4
            + other.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + other.weight * other.weight)
                        + 6.0
                            * (self.weight * self_weight_other_m2
                                + other.weight * other_weight_self_m2))
                    + 4.0 * (self.weight * other.m3 - other.weight * self.m3));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

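    // Finalizes to the kurtosis estimate. With b = m4 / m2^2 the biased,
    // non-excess estimator, the bias-corrected value is
    //     (n - 1) / ((n - 2) * (n - 3)) * ((n + 1) * b - 3 * (n - 1)) + 3,
    // defined for n > 3; `fisher` subtracts 3 to report excess kurtosis.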
    pub fn finalize(&self, fisher: bool, bias: bool) -> Option<f64> {
        let m4 = self.m4 / self.weight;
        let m2 = self.m2 / self.weight;
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m4 / (m2 * m2) };
        let out = if bias {
            if self.weight == 0.0 {
                return None;
            }

            biased_est
        } else {
            if self.weight <= 3.0 {
                return None;
            }

            let n = self.weight;
            let nm1_nm2 = (n - 1.0) / (n - 2.0);
            let np1_nm3 = (n + 1.0) / (n - 3.0);
            let nm1_nm3 = (n - 1.0) / (n - 3.0);
            nm1_nm2 * (np1_nm3 * biased_est - 3.0 * nm1_nm3) + 3.0
        };

        if fisher { Some(out - 3.0) } else { Some(out) }
    }
}

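// Feeds the iterator to `f` in fixed-size stack-allocated chunks, so each
// chunk can be reduced with the numerically stable batch constructors above
// instead of updating the running state one element at a time.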
fn chunk_as_float<T, I, F>(it: I, mut f: F)
where
    T: NativeType + AsPrimitive<f64>,
    I: IntoIterator<Item = T>,
    F: FnMut(&[f64]),
{
    let mut chunk = [0.0; CHUNK_SIZE];
    let mut i = 0;
    for val in it {
        if i >= CHUNK_SIZE {
            f(&chunk);
            i = 0;
        }
        chunk[i] = val.as_();
        i += 1;
    }
    if i > 0 {
        f(&chunk[..i]);
    }
}

fn chunk_as_float_binary<T, U, I, F>(it: I, mut f: F)
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
    I: IntoIterator<Item = (T, U)>,
    F: FnMut(&[f64], &[f64]),
{
    let mut left_chunk = [0.0; CHUNK_SIZE];
    let mut right_chunk = [0.0; CHUNK_SIZE];
    let mut i = 0;
    for (l, r) in it {
        if i >= CHUNK_SIZE {
            f(&left_chunk, &right_chunk);
            i = 0;
        }
        left_chunk[i] = l.as_();
        right_chunk[i] = r.as_();
        i += 1;
    }
    if i > 0 {
        f(&left_chunk[..i], &right_chunk[..i]);
    }
}

pub fn var<T>(arr: &PrimitiveArray<T>) -> VarState
where
    T: NativeType + AsPrimitive<f64>,
{
    let mut out = VarState::default();
    if arr.has_nulls() {
        chunk_as_float(arr.non_null_values_iter(), |chunk| {
            out.combine(&VarState::new(chunk))
        });
    } else {
        chunk_as_float(arr.values().iter().copied(), |chunk| {
            out.combine(&VarState::new(chunk))
        });
    }
    out
}

pub fn cov<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> CovState
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
{
    assert!(x.len() == y.len());
    let mut out = CovState::default();
    if x.has_nulls() || y.has_nulls() {
        chunk_as_float_binary(
            x.iter()
                .zip(y.iter())
                .filter_map(|(l, r)| l.copied().zip(r.copied())),
            |l, r| out.combine(&CovState::new(l, r)),
        );
    } else {
        chunk_as_float_binary(
            x.values().iter().copied().zip(y.values().iter().copied()),
            |l, r| out.combine(&CovState::new(l, r)),
        );
    }
    out
}

pub fn pearson_corr<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> PearsonState
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
{
    assert!(x.len() == y.len());
    let mut out = PearsonState::default();
    if x.has_nulls() || y.has_nulls() {
        chunk_as_float_binary(
            x.iter()
                .zip(y.iter())
                .filter_map(|(l, r)| l.copied().zip(r.copied())),
            |l, r| out.combine(&PearsonState::new(l, r)),
        );
    } else {
        chunk_as_float_binary(
            x.values().iter().copied().zip(y.values().iter().copied()),
            |l, r| out.combine(&PearsonState::new(l, r)),
        );
    }
    out
}

pub fn skew<T>(arr: &PrimitiveArray<T>) -> SkewState
where
    T: NativeType + AsPrimitive<f64>,
{
    let mut out = SkewState::default();
    if arr.has_nulls() {
        chunk_as_float(arr.non_null_values_iter(), |chunk| {
            out.combine(&SkewState::new(chunk))
        });
    } else {
        chunk_as_float(arr.values().iter().copied(), |chunk| {
            out.combine(&SkewState::new(chunk))
        });
    }
    out
}

pub fn kurtosis<T>(arr: &PrimitiveArray<T>) -> KurtosisState
where
    T: NativeType + AsPrimitive<f64>,
{
    let mut out = KurtosisState::default();
    if arr.has_nulls() {
        chunk_as_float(arr.non_null_values_iter(), |chunk| {
            out.combine(&KurtosisState::new(chunk))
        });
    } else {
        chunk_as_float(arr.values().iter().copied(), |chunk| {
            out.combine(&KurtosisState::new(chunk))
        });
    }
    out
}
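
// An illustrative end-to-end check (ours, not part of the original kernels).
// We assume the arrow2-style constructors `PrimitiveArray::from_vec` and
// `PrimitiveArray::from` over `Option` values are available, as elsewhere in
// this crate; the data and tolerances are arbitrary choices for the example.
#[cfg(test)]
mod kernel_examples {
    use super::*;

    #[test]
    fn var_skips_nulls() {
        // Nulls are filtered out, so this must match the dense [1, 2, 3] case.
        let with_nulls = PrimitiveArray::from(vec![Some(1.0f64), None, Some(2.0), Some(3.0)]);
        let dense = PrimitiveArray::from_vec(vec![1.0f64, 2.0, 3.0]);
        let a = var(&with_nulls).finalize(1).unwrap();
        let b = var(&dense).finalize(1).unwrap();
        assert!((a - b).abs() < 1e-12);
    }

    #[test]
    fn pearson_of_linear_data_is_one() {
        // y = 2x + 1 is a perfect linear relationship, so r should be ~1.
        let x = PrimitiveArray::from_vec(vec![1.0f64, 2.0, 3.0, 4.0]);
        let y = PrimitiveArray::from_vec(vec![3.0f64, 5.0, 7.0, 9.0]);
        let r = pearson_corr(&x, &y).finalize();
        assert!((r - 1.0).abs() < 1e-12);
    }
}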