polars_compute/
moment.rs

1// Some formulae:
2//     mean_x = sum(weight[i] * x[i]) / sum(weight)
3//     dp_xy = weighted sum of deviation products of variables x, y, written in
4//             the paper as simply XY.
5//     dp_xy = sum(weight[i] * (x[i] - mean_x) * (y[i] - mean_y))
6//
7//     cov(x, y) = dp_xy / sum(weight)
8//     var(x) = cov(x, x)
9//
10// Algorithms from:
11// Numerically stable parallel computation of (co-)variance.
12// Schubert, E. & Gertz, M. (2018).
13//
14// Key equations from the paper:
15// (17) for mean update, (23) for dp update (and also Table 1).
16//
17//
18// For higher moments we refer to:
19// Numerically Stable, Scalable Formulas for Parallel and Online Computation of
20// Higher-Order Multivariate Central Moments with Arbitrary Weights.
21// Pébay, P. & Terriberry, T. B. & Kolla, H. & Bennett J. (2016)
22//
23// Key equations from paper:
24// (3.26) mean update, (3.27) moment update.
25//
26// Here we use mk to mean the weighted kth central moment:
27//    mk = sum(weight[i] * (x[i] - mean_x)**k)
28// Note that we'll use the terms m2 = dp = dp_xx if unambiguous.
29
30#![allow(clippy::collapsible_else_if)]
31
32use arrow::array::{Array, PrimitiveArray};
33use arrow::types::NativeType;
34use num_traits::AsPrimitive;
35use polars_utils::algebraic_ops::*;
36
37const CHUNK_SIZE: usize = 128;
38
39#[derive(Default, Clone)]
40pub struct VarState {
41    weight: f64,
42    mean: f64,
43    dp: f64,
44}
45
46#[derive(Default, Clone)]
47pub struct CovState {
48    weight: f64,
49    mean_x: f64,
50    mean_y: f64,
51    dp_xy: f64,
52}
53
54#[derive(Default, Clone)]
55pub struct PearsonState {
56    weight: f64,
57    mean_x: f64,
58    mean_y: f64,
59    dp_xx: f64,
60    dp_xy: f64,
61    dp_yy: f64,
62}
63
64impl VarState {
65    fn new(x: &[f64]) -> Self {
66        if x.is_empty() {
67            return Self::default();
68        }
69
70        let weight = x.len() as f64;
71        let mean = alg_sum_f64(x.iter().copied()) / weight;
72        Self {
73            weight,
74            mean,
75            dp: alg_sum_f64(x.iter().map(|&xi| (xi - mean) * (xi - mean))),
76        }
77    }
78
79    fn clear_zero_weight_nan(&mut self) {
80        // Clear NaNs due to division by zero.
81        if self.weight == 0.0 {
82            self.mean = 0.0;
83            self.dp = 0.0;
84        }
85    }
86
87    pub fn insert_one(&mut self, x: f64) {
88        // Just a specialized version of
89        // self.combine(&Self { weight: 1.0, mean: x, dp: 0.0 })
90        let new_weight = self.weight + 1.0;
91        let delta_mean = x - self.mean;
92        let new_mean = self.mean + delta_mean / new_weight;
93        self.dp += (x - new_mean) * delta_mean;
94        self.weight = new_weight;
95        self.mean = new_mean;
96        self.clear_zero_weight_nan();
97    }
98
99    pub fn remove_one(&mut self, x: f64) {
100        // Just a specialized version of
101        // self.combine(&Self { weight: -1.0, mean: x, dp: 0.0 })
102        let new_weight = self.weight - 1.0;
103        let delta_mean = x - self.mean;
104        let new_mean = self.mean - delta_mean / new_weight;
105        self.dp -= (x - new_mean) * delta_mean;
106        self.weight = new_weight;
107        self.mean = new_mean;
108        self.clear_zero_weight_nan();
109    }
110
111    pub fn combine(&mut self, other: &Self) {
112        if other.weight == 0.0 {
113            return;
114        }
115
116        let new_weight = self.weight + other.weight;
117        let other_weight_frac = other.weight / new_weight;
118        let delta_mean = other.mean - self.mean;
119        let new_mean = self.mean + delta_mean * other_weight_frac;
120        self.dp += other.dp + other.weight * (other.mean - new_mean) * delta_mean;
121        self.weight = new_weight;
122        self.mean = new_mean;
123        self.clear_zero_weight_nan();
124    }
125
126    pub fn finalize(&self, ddof: u8) -> Option<f64> {
127        if self.weight <= ddof as f64 {
128            None
129        } else {
130            let var = self.dp / (self.weight - ddof as f64);
131            Some(if var < 0.0 {
132                // Variance can't be negative, except through numerical instability.
133                // We don't use f64::max here so we propagate nans.
134                0.0
135            } else {
136                var
137            })
138        }
139    }
140}
141
142impl CovState {
143    fn new(x: &[f64], y: &[f64]) -> Self {
144        assert!(x.len() == y.len());
145        if x.is_empty() {
146            return Self::default();
147        }
148
149        let weight = x.len() as f64;
150        let inv_weight = 1.0 / weight;
151        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
152        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
153        Self {
154            weight,
155            mean_x,
156            mean_y,
157            dp_xy: alg_sum_f64(
158                x.iter()
159                    .zip(y)
160                    .map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)),
161            ),
162        }
163    }
164
165    pub fn combine(&mut self, other: &Self) {
166        if other.weight == 0.0 {
167            return;
168        } else if self.weight == 0.0 {
169            *self = other.clone();
170            return;
171        }
172
173        let new_weight = self.weight + other.weight;
174        let other_weight_frac = other.weight / new_weight;
175        let delta_mean_x = other.mean_x - self.mean_x;
176        let delta_mean_y = other.mean_y - self.mean_y;
177        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
178        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
179        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
180        self.weight = new_weight;
181        self.mean_x = new_mean_x;
182        self.mean_y = new_mean_y;
183    }
184
185    pub fn finalize(&self, ddof: u8) -> Option<f64> {
186        if self.weight <= ddof as f64 {
187            None
188        } else {
189            Some(self.dp_xy / (self.weight - ddof as f64))
190        }
191    }
192}
193
194impl PearsonState {
195    fn new(x: &[f64], y: &[f64]) -> Self {
196        assert!(x.len() == y.len());
197        if x.is_empty() {
198            return Self::default();
199        }
200
201        let weight = x.len() as f64;
202        let inv_weight = 1.0 / weight;
203        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
204        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
205        let mut dp_xx = 0.0;
206        let mut dp_xy = 0.0;
207        let mut dp_yy = 0.0;
208        for (xi, yi) in x.iter().zip(y.iter()) {
209            dp_xx = alg_add_f64(dp_xx, (xi - mean_x) * (xi - mean_x));
210            dp_xy = alg_add_f64(dp_xy, (xi - mean_x) * (yi - mean_y));
211            dp_yy = alg_add_f64(dp_yy, (yi - mean_y) * (yi - mean_y));
212        }
213        Self {
214            weight,
215            mean_x,
216            mean_y,
217            dp_xx,
218            dp_xy,
219            dp_yy,
220        }
221    }
222
223    pub fn combine(&mut self, other: &Self) {
224        if other.weight == 0.0 {
225            return;
226        } else if self.weight == 0.0 {
227            *self = other.clone();
228            return;
229        }
230
231        let new_weight = self.weight + other.weight;
232        let other_weight_frac = other.weight / new_weight;
233        let delta_mean_x = other.mean_x - self.mean_x;
234        let delta_mean_y = other.mean_y - self.mean_y;
235        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
236        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
237        self.dp_xx += other.dp_xx + other.weight * (other.mean_x - new_mean_x) * delta_mean_x;
238        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
239        self.dp_yy += other.dp_yy + other.weight * (other.mean_y - new_mean_y) * delta_mean_y;
240        self.weight = new_weight;
241        self.mean_x = new_mean_x;
242        self.mean_y = new_mean_y;
243    }
244
245    pub fn finalize(&self) -> f64 {
246        let denom_sq = self.dp_xx * self.dp_yy;
247        if denom_sq > 0.0 {
248            self.dp_xy / denom_sq.sqrt()
249        } else {
250            f64::NAN
251        }
252    }
253}
254
255#[derive(Default, Clone)]
256pub struct SkewState {
257    weight: f64,
258    mean: f64,
259    m2: f64,
260    m3: f64,
261}
262
263impl SkewState {
264    fn new(x: &[f64]) -> Self {
265        Self::from_iter(x.iter().copied(), x.len())
266    }
267
268    fn from_iter(iter: impl Iterator<Item = f64> + Clone, length: usize) -> Self {
269        if length == 0 {
270            return Self::default();
271        }
272
273        let weight = length as f64;
274        let mean = alg_sum_f64(iter.clone()) / weight;
275        let mut m2 = 0.0;
276        let mut m3 = 0.0;
277        for xi in iter {
278            let d = xi - mean;
279            let d2 = d * d;
280            let d3 = d * d2;
281            m2 = alg_add_f64(m2, d2);
282            m3 = alg_add_f64(m3, d3);
283        }
284        Self {
285            weight,
286            mean,
287            m2,
288            m3,
289        }
290    }
291
292    fn clear_zero_weight_nan(&mut self) {
293        // Clear NaNs due to division by zero.
294        if self.weight == 0.0 {
295            self.mean = 0.0;
296            self.m2 = 0.0;
297            self.m3 = 0.0;
298        }
299    }
300
301    pub fn from_array(arr: &PrimitiveArray<f64>, start: usize, length: usize) -> Self {
302        let validity = arr.validity().cloned();
303        let validity = validity
304            .map(|v| v.sliced(start, length))
305            .filter(|v| v.unset_bits() > 0);
306
307        match validity {
308            None => Self::new(&arr.values().as_slice()[start..][..length]),
309            Some(validity) => {
310                let iter = arr.values()[start..][..length].iter().copied();
311                let iter = iter
312                    .zip(validity.iter())
313                    .filter_map(|(x, v)| v.then_some(x));
314                Self::from_iter(iter, validity.set_bits())
315            },
316        }
317    }
318
319    pub fn insert_one(&mut self, x: f64) {
320        // Specialization of self.combine(&SkewState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0 });
321        let new_weight = self.weight + 1.0;
322        let delta_mean = x - self.mean;
323        let delta_mean_weight = delta_mean / new_weight;
324        let new_mean = self.mean + delta_mean_weight;
325
326        let weight_diff = self.weight - 1.0;
327        let m2_update = (x - new_mean) * delta_mean;
328        let new_m2 = self.m2 + m2_update;
329        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
330
331        self.weight = new_weight;
332        self.mean = new_mean;
333        self.m2 = new_m2;
334        self.m3 = new_m3;
335        self.clear_zero_weight_nan();
336    }
337
338    pub fn remove_one(&mut self, x: f64) {
339        // Specialization of self.combine(&SkewState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0 });
340        let new_weight = self.weight - 1.0;
341        let delta_mean = x - self.mean;
342        let delta_mean_weight = delta_mean / new_weight;
343        let new_mean = self.mean - delta_mean_weight;
344
345        let weight_diff = self.weight + 1.0;
346        let m2_update = (new_mean - x) * delta_mean;
347        let new_m2 = self.m2 + m2_update;
348        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
349
350        self.weight = new_weight;
351        self.mean = new_mean;
352        self.m2 = new_m2;
353        self.m3 = new_m3;
354        self.clear_zero_weight_nan();
355    }
356
357    pub fn combine(&mut self, other: &Self) {
358        if other.weight == 0.0 {
359            return;
360        } else if self.weight == 0.0 {
361            *self = other.clone();
362            return;
363        }
364
365        let new_weight = self.weight + other.weight;
366        let delta_mean = other.mean - self.mean;
367        let delta_mean_weight = delta_mean / new_weight;
368        let new_mean = self.mean + other.weight * delta_mean_weight;
369
370        let weight_diff = self.weight - other.weight;
371        let self_weight_other_m2 = self.weight * other.m2;
372        let other_weight_self_m2 = other.weight * self.m2;
373        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
374        let new_m2 = self.m2 + other.m2 + m2_update;
375        let new_m3 = self.m3
376            + other.m3
377            + delta_mean_weight
378                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
379
380        self.weight = new_weight;
381        self.mean = new_mean;
382        self.m2 = new_m2;
383        self.m3 = new_m3;
384        self.clear_zero_weight_nan();
385    }
386
387    pub fn finalize(&self, bias: bool) -> Option<f64> {
388        let m2 = self.m2 / self.weight;
389        let m3 = self.m3 / self.weight;
390        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
391        let biased_est = if is_zero { f64::NAN } else { m3 / m2.powf(1.5) };
392        if bias {
393            if self.weight == 0.0 {
394                None
395            } else {
396                Some(biased_est)
397            }
398        } else {
399            if self.weight <= 2.0 {
400                None
401            } else {
402                let correction = (self.weight * (self.weight - 1.0)).sqrt() / (self.weight - 2.0);
403                Some(correction * biased_est)
404            }
405        }
406    }
407}
408
409#[derive(Default, Clone)]
410pub struct KurtosisState {
411    weight: f64,
412    mean: f64,
413    m2: f64,
414    m3: f64,
415    m4: f64,
416}
417
418impl KurtosisState {
419    pub fn new(x: &[f64]) -> Self {
420        Self::from_iter(x.iter().copied(), x.len())
421    }
422
423    pub fn from_iter(iter: impl Iterator<Item = f64> + Clone, length: usize) -> Self {
424        if length == 0 {
425            return Self::default();
426        }
427
428        let weight = length as f64;
429        let mean = alg_sum_f64(iter.clone()) / weight;
430        let mut m2 = 0.0;
431        let mut m3 = 0.0;
432        let mut m4 = 0.0;
433        for xi in iter {
434            let d = xi - mean;
435            let d2 = d * d;
436            let d3 = d * d2;
437            let d4 = d2 * d2;
438            m2 = alg_add_f64(m2, d2);
439            m3 = alg_add_f64(m3, d3);
440            m4 = alg_add_f64(m4, d4);
441        }
442        Self {
443            weight,
444            mean,
445            m2,
446            m3,
447            m4,
448        }
449    }
450
451    pub fn from_array(arr: &PrimitiveArray<f64>, start: usize, length: usize) -> Self {
452        let validity = arr.validity().cloned();
453        let validity = validity
454            .map(|v| v.sliced(start, length))
455            .filter(|v| v.unset_bits() > 0);
456
457        match validity {
458            None => Self::new(&arr.values().as_slice()[start..][..length]),
459            Some(validity) => {
460                let iter = arr.values()[start..][..length].iter().copied();
461                let iter = iter
462                    .zip(validity.iter())
463                    .filter_map(|(x, v)| v.then_some(x));
464                Self::from_iter(iter, validity.set_bits())
465            },
466        }
467    }
468
469    fn clear_zero_weight_nan(&mut self) {
470        // Clear NaNs due to division by zero.
471        if self.weight == 0.0 {
472            self.mean = 0.0;
473            self.m2 = 0.0;
474            self.m3 = 0.0;
475            self.m4 = 0.0;
476        }
477    }
478
479    pub fn insert_one(&mut self, x: f64) {
480        // Specialization of self.combine(&KurtosisState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
481        let new_weight = self.weight + 1.0;
482        let delta_mean = x - self.mean;
483        let delta_mean_weight = delta_mean / new_weight;
484        let new_mean = self.mean + delta_mean_weight;
485
486        let weight_diff = self.weight - 1.0;
487        let m2_update = (x - new_mean) * delta_mean;
488        let new_m2 = self.m2 + m2_update;
489        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
490        let new_m4 = self.m4
491            + delta_mean_weight
492                * (delta_mean_weight
493                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
494                    - 4.0 * self.m3);
495
496        self.weight = new_weight;
497        self.mean = new_mean;
498        self.m2 = new_m2;
499        self.m3 = new_m3;
500        self.m4 = new_m4;
501        self.clear_zero_weight_nan();
502    }
503
504    pub fn remove_one(&mut self, x: f64) {
505        // Specialization of self.combine(&KurtosisState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
506        let new_weight = self.weight - 1.0;
507        let delta_mean = x - self.mean;
508        let delta_mean_weight = delta_mean / new_weight;
509        let new_mean = self.mean - delta_mean_weight;
510
511        let weight_diff = self.weight + 1.0;
512        let m2_update = (new_mean - x) * delta_mean;
513        let new_m2 = self.m2 + m2_update;
514        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
515        let new_m4 = self.m4
516            + delta_mean_weight
517                * (delta_mean_weight
518                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
519                    + 4.0 * self.m3);
520
521        self.weight = new_weight;
522        self.mean = new_mean;
523        self.m2 = new_m2;
524        self.m3 = new_m3;
525        self.m4 = new_m4;
526        self.clear_zero_weight_nan();
527    }
528
529    pub fn combine(&mut self, other: &Self) {
530        if other.weight == 0.0 {
531            return;
532        } else if self.weight == 0.0 {
533            *self = other.clone();
534            return;
535        }
536
537        let new_weight = self.weight + other.weight;
538        let delta_mean = other.mean - self.mean;
539        let delta_mean_weight = delta_mean / new_weight;
540        let new_mean = self.mean + other.weight * delta_mean_weight;
541
542        let weight_diff = self.weight - other.weight;
543        let self_weight_other_m2 = self.weight * other.m2;
544        let other_weight_self_m2 = other.weight * self.m2;
545        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
546        let new_m2 = self.m2 + other.m2 + m2_update;
547        let new_m3 = self.m3
548            + other.m3
549            + delta_mean_weight
550                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
551        let new_m4 = self.m4
552            + other.m4
553            + delta_mean_weight
554                * (delta_mean_weight
555                    * (m2_update * (self.weight * weight_diff + other.weight * other.weight)
556                        + 6.0
557                            * (self.weight * self_weight_other_m2
558                                + other.weight * other_weight_self_m2))
559                    + 4.0 * (self.weight * other.m3 - other.weight * self.m3));
560
561        self.weight = new_weight;
562        self.mean = new_mean;
563        self.m2 = new_m2;
564        self.m3 = new_m3;
565        self.m4 = new_m4;
566        self.clear_zero_weight_nan();
567    }
568
569    pub fn finalize(&self, fisher: bool, bias: bool) -> Option<f64> {
570        let m4 = self.m4 / self.weight;
571        let m2 = self.m2 / self.weight;
572        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
573        let biased_est = if is_zero { f64::NAN } else { m4 / (m2 * m2) };
574        let out = if bias {
575            if self.weight == 0.0 {
576                return None;
577            }
578
579            biased_est
580        } else {
581            if self.weight <= 3.0 {
582                return None;
583            }
584
585            let n = self.weight;
586            let nm1_nm2 = (n - 1.0) / (n - 2.0);
587            let np1_nm3 = (n + 1.0) / (n - 3.0);
588            let nm1_nm3 = (n - 1.0) / (n - 3.0);
589            nm1_nm2 * (np1_nm3 * biased_est - 3.0 * nm1_nm3) + 3.0
590        };
591
592        if fisher { Some(out - 3.0) } else { Some(out) }
593    }
594}
595
596fn chunk_as_float<T, I, F>(it: I, mut f: F)
597where
598    T: NativeType + AsPrimitive<f64>,
599    I: IntoIterator<Item = T>,
600    F: FnMut(&[f64]),
601{
602    let mut chunk = [0.0; CHUNK_SIZE];
603    let mut i = 0;
604    for val in it {
605        if i >= CHUNK_SIZE {
606            f(&chunk);
607            i = 0;
608        }
609        chunk[i] = val.as_();
610        i += 1;
611    }
612    if i > 0 {
613        f(&chunk[..i]);
614    }
615}
616
617fn chunk_as_float_binary<T, U, I, F>(it: I, mut f: F)
618where
619    T: NativeType + AsPrimitive<f64>,
620    U: NativeType + AsPrimitive<f64>,
621    I: IntoIterator<Item = (T, U)>,
622    F: FnMut(&[f64], &[f64]),
623{
624    let mut left_chunk = [0.0; CHUNK_SIZE];
625    let mut right_chunk = [0.0; CHUNK_SIZE];
626    let mut i = 0;
627    for (l, r) in it {
628        if i >= CHUNK_SIZE {
629            f(&left_chunk, &right_chunk);
630            i = 0;
631        }
632        left_chunk[i] = l.as_();
633        right_chunk[i] = r.as_();
634        i += 1;
635    }
636    if i > 0 {
637        f(&left_chunk[..i], &right_chunk[..i]);
638    }
639}
640
641pub fn var<T>(arr: &PrimitiveArray<T>) -> VarState
642where
643    T: NativeType + AsPrimitive<f64>,
644{
645    let mut out = VarState::default();
646    if arr.has_nulls() {
647        chunk_as_float(arr.non_null_values_iter(), |chunk| {
648            out.combine(&VarState::new(chunk))
649        });
650    } else {
651        chunk_as_float(arr.values().iter().copied(), |chunk| {
652            out.combine(&VarState::new(chunk))
653        });
654    }
655    out
656}
657
658pub fn cov<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> CovState
659where
660    T: NativeType + AsPrimitive<f64>,
661    U: NativeType + AsPrimitive<f64>,
662{
663    assert!(x.len() == y.len());
664    let mut out = CovState::default();
665    if x.has_nulls() || y.has_nulls() {
666        chunk_as_float_binary(
667            x.iter()
668                .zip(y.iter())
669                .filter_map(|(l, r)| l.copied().zip(r.copied())),
670            |l, r| out.combine(&CovState::new(l, r)),
671        );
672    } else {
673        chunk_as_float_binary(
674            x.values().iter().copied().zip(y.values().iter().copied()),
675            |l, r| out.combine(&CovState::new(l, r)),
676        );
677    }
678    out
679}
680
681pub fn pearson_corr<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> PearsonState
682where
683    T: NativeType + AsPrimitive<f64>,
684    U: NativeType + AsPrimitive<f64>,
685{
686    assert!(x.len() == y.len());
687    let mut out = PearsonState::default();
688    if x.has_nulls() || y.has_nulls() {
689        chunk_as_float_binary(
690            x.iter()
691                .zip(y.iter())
692                .filter_map(|(l, r)| l.copied().zip(r.copied())),
693            |l, r| out.combine(&PearsonState::new(l, r)),
694        );
695    } else {
696        chunk_as_float_binary(
697            x.values().iter().copied().zip(y.values().iter().copied()),
698            |l, r| out.combine(&PearsonState::new(l, r)),
699        );
700    }
701    out
702}
703
704pub fn skew<T>(arr: &PrimitiveArray<T>) -> SkewState
705where
706    T: NativeType + AsPrimitive<f64>,
707{
708    let mut out = SkewState::default();
709    if arr.has_nulls() {
710        chunk_as_float(arr.non_null_values_iter(), |chunk| {
711            out.combine(&SkewState::new(chunk))
712        });
713    } else {
714        chunk_as_float(arr.values().iter().copied(), |chunk| {
715            out.combine(&SkewState::new(chunk))
716        });
717    }
718    out
719}
720
721pub fn kurtosis<T>(arr: &PrimitiveArray<T>) -> KurtosisState
722where
723    T: NativeType + AsPrimitive<f64>,
724{
725    let mut out = KurtosisState::default();
726    if arr.has_nulls() {
727        chunk_as_float(arr.non_null_values_iter(), |chunk| {
728            out.combine(&KurtosisState::new(chunk))
729        });
730    } else {
731        chunk_as_float(arr.values().iter().copied(), |chunk| {
732            out.combine(&KurtosisState::new(chunk))
733        });
734    }
735    out
736}