#![allow(clippy::collapsible_else_if)]

use arrow::array::{Array, PrimitiveArray};
use arrow::types::NativeType;
use num_traits::AsPrimitive;
use polars_utils::algebraic_ops::*;

const CHUNK_SIZE: usize = 128;

/// Streaming state for variance: element count (`weight`), running mean, and
/// the sum of squared deviations from the mean (`dp`).
#[derive(Default, Clone)]
pub struct VarState {
    weight: f64,
    mean: f64,
    dp: f64,
}

/// Streaming state for covariance between two series: weight, both means, and
/// the sum of products of deviations (`dp_xy`).
#[derive(Default, Clone)]
pub struct CovState {
    weight: f64,
    mean_x: f64,
    mean_y: f64,
    dp_xy: f64,
}

/// Streaming state for Pearson correlation: weight, both means, and the three
/// deviation-product sums needed for the correlation coefficient.
#[derive(Default, Clone)]
pub struct PearsonState {
    weight: f64,
    mean_x: f64,
    mean_y: f64,
    dp_xx: f64,
    dp_xy: f64,
    dp_yy: f64,
}

impl VarState {
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum_f64(x.iter().copied()) / weight;
        Self {
            weight,
            mean,
            dp: alg_sum_f64(x.iter().map(|&xi| (xi - mean) * (xi - mean))),
        }
    }

    fn clear_zero_weight_nan(&mut self) {
        // A zero-weight state can pick up NaNs (e.g. 0.0 / 0.0 when the last
        // element is removed); reset it to the canonical empty state.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.dp = 0.0;
        }
    }

    pub fn insert_one(&mut self, x: f64) {
        // Standard Welford update: shift the mean towards x and accumulate the
        // squared-deviation term against both the old and the new mean.
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let new_mean = self.mean + delta_mean / new_weight;
        self.dp += (x - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    pub fn remove_one(&mut self, x: f64) {
        // The Welford update run in reverse: retire one observation from the
        // running mean and deviation sum.
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let new_mean = self.mean - delta_mean / new_weight;
        self.dp -= (x - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean = other.mean - self.mean;
        let new_mean = self.mean + delta_mean * other_weight_frac;
        self.dp += other.dp + other.weight * (other.mean - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    pub fn finalize(&self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            let var = self.dp / (self.weight - ddof as f64);
            Some(if var < 0.0 {
                // Variance is mathematically non-negative; clamp tiny negative
                // values caused by floating-point rounding.
                0.0
            } else {
                var
            })
        }
    }
}
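
// A minimal sanity check added for illustration (not part of the original
// kernels): it exercises the incremental `VarState` updates against a direct
// two-pass computation. The input data and tolerance are arbitrary.
#[cfg(test)]
mod var_state_example {
    use super::*;

    #[test]
    fn rolling_var_matches_two_pass() {
        let data = [1.0, 2.5, -3.0, 4.0, 0.5];

        // Build the state one observation at a time.
        let mut state = VarState::default();
        for &x in &data {
            state.insert_one(x);
        }

        // Two-pass reference: sample variance with ddof = 1.
        let mean = data.iter().sum::<f64>() / data.len() as f64;
        let expected =
            data.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>() / (data.len() - 1) as f64;
        assert!((state.finalize(1).unwrap() - expected).abs() < 1e-12);

        // Removing the first element should match a state built from the rest.
        state.remove_one(data[0]);
        let rest = VarState::new(&data[1..]);
        assert!((state.finalize(1).unwrap() - rest.finalize(1).unwrap()).abs() < 1e-12);
    }
}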

impl CovState {
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xy: alg_sum_f64(
                x.iter()
                    .zip(y)
                    .map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)),
            ),
        }
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean_x = other.mean_x - self.mean_x;
        let delta_mean_y = other.mean_y - self.mean_y;
        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

    pub fn finalize(&self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            Some(self.dp_xy / (self.weight - ddof as f64))
        }
    }
}
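
// Illustrative check added alongside the kernels (not in the original file):
// merging two `CovState`s over the halves of a series should agree with a
// single state over the whole series. Inputs and tolerance are arbitrary.
#[cfg(test)]
mod cov_state_example {
    use super::*;

    #[test]
    fn combine_matches_single_state() {
        let x = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
        let y = [2.0, 1.5, 4.0, 3.5, 6.0, 5.5];

        let mut left = CovState::new(&x[..3], &y[..3]);
        let right = CovState::new(&x[3..], &y[3..]);
        left.combine(&right);

        let full = CovState::new(&x, &y);
        assert!((left.finalize(1).unwrap() - full.finalize(1).unwrap()).abs() < 1e-12);
    }
}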

impl PearsonState {
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
        let mut dp_xx = 0.0;
        let mut dp_xy = 0.0;
        let mut dp_yy = 0.0;
        for (xi, yi) in x.iter().zip(y.iter()) {
            dp_xx = alg_add_f64(dp_xx, (xi - mean_x) * (xi - mean_x));
            dp_xy = alg_add_f64(dp_xy, (xi - mean_x) * (yi - mean_y));
            dp_yy = alg_add_f64(dp_yy, (yi - mean_y) * (yi - mean_y));
        }
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xx,
            dp_xy,
            dp_yy,
        }
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean_x = other.mean_x - self.mean_x;
        let delta_mean_y = other.mean_y - self.mean_y;
        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
        self.dp_xx += other.dp_xx + other.weight * (other.mean_x - new_mean_x) * delta_mean_x;
        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
        self.dp_yy += other.dp_yy + other.weight * (other.mean_y - new_mean_y) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

    pub fn finalize(&self) -> f64 {
        let denom_sq = self.dp_xx * self.dp_yy;
        if denom_sq > 0.0 {
            self.dp_xy / denom_sq.sqrt()
        } else {
            f64::NAN
        }
    }
}
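
// Small example added for illustration (not from the original source): for
// exactly affine data with positive slope the Pearson correlation is 1. The
// data and tolerance are arbitrary.
#[cfg(test)]
mod pearson_state_example {
    use super::*;

    #[test]
    fn perfectly_linear_data_has_unit_correlation() {
        let x = [1.0, 2.0, 3.0, 4.0, 5.0];
        let y: Vec<f64> = x.iter().map(|v| 3.0 * v - 7.0).collect();

        let r = PearsonState::new(&x, &y).finalize();
        assert!((r - 1.0).abs() < 1e-12);
    }
}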

/// Streaming state for skewness: weight, mean, and the sums of squared and
/// cubed deviations from the mean (`m2`, `m3`).
#[derive(Default, Clone)]
pub struct SkewState {
    weight: f64,
    mean: f64,
    m2: f64,
    m3: f64,
}

impl SkewState {
    fn new(x: &[f64]) -> Self {
        Self::from_iter(x.iter().copied(), x.len())
    }

    fn from_iter(iter: impl Iterator<Item = f64> + Clone, length: usize) -> Self {
        if length == 0 {
            return Self::default();
        }

        let weight = length as f64;
        let mean = alg_sum_f64(iter.clone()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        for xi in iter {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
        }
    }

    fn clear_zero_weight_nan(&mut self) {
        // Keep a zero-weight state canonical: removing the last element can
        // leave NaNs behind in the running moments.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
        }
    }

    pub fn from_array(arr: &PrimitiveArray<f64>, start: usize, length: usize) -> Self {
        let validity = arr.validity().cloned();
        let validity = validity
            .map(|v| v.sliced(start, length))
            .filter(|v| v.unset_bits() > 0);

        match validity {
            None => Self::new(&arr.values().as_slice()[start..][..length]),
            Some(validity) => {
                let iter = arr.values()[start..][..length].iter().copied();
                let iter = iter
                    .zip(validity.iter())
                    .filter_map(|(x, v)| v.then_some(x));
                Self::from_iter(iter, validity.set_bits())
            },
        }
    }

    pub fn insert_one(&mut self, x: f64) {
        // Online update of the second and third central-moment sums when one
        // observation is added.
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    pub fn remove_one(&mut self, x: f64) {
        // Inverse of `insert_one`: retire one observation from the moment sums.
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    pub fn finalize(&self, bias: bool) -> Option<f64> {
        let m2 = self.m2 / self.weight;
        let m3 = self.m3 / self.weight;
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m3 / m2.powf(1.5) };
        if bias {
            if self.weight == 0.0 {
                None
            } else {
                Some(biased_est)
            }
        } else {
            if self.weight <= 2.0 {
                None
            } else {
                // Bias correction: G1 = sqrt(n * (n - 1)) / (n - 2) * g1.
                let correction = (self.weight * (self.weight - 1.0)).sqrt() / (self.weight - 2.0);
                Some(correction * biased_est)
            }
        }
    }
}
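
// Added illustration (not part of the original kernels): data symmetric around
// its mean has zero third central moment, so both the biased and corrected
// skewness estimates should be ~0.
#[cfg(test)]
mod skew_state_example {
    use super::*;

    #[test]
    fn symmetric_data_has_zero_skew() {
        let data = [-2.0, -1.0, 0.0, 1.0, 2.0];
        let state = SkewState::new(&data);
        assert!(state.finalize(true).unwrap().abs() < 1e-12);
        assert!(state.finalize(false).unwrap().abs() < 1e-12);
    }
}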

/// Streaming state for kurtosis: weight, mean, and the sums of the squared,
/// cubed, and fourth-power deviations from the mean.
#[derive(Default, Clone)]
pub struct KurtosisState {
    weight: f64,
    mean: f64,
    m2: f64,
    m3: f64,
    m4: f64,
}

impl KurtosisState {
    pub fn new(x: &[f64]) -> Self {
        Self::from_iter(x.iter().copied(), x.len())
    }

    pub fn from_iter(iter: impl Iterator<Item = f64> + Clone, length: usize) -> Self {
        if length == 0 {
            return Self::default();
        }

        let weight = length as f64;
        let mean = alg_sum_f64(iter.clone()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        let mut m4 = 0.0;
        for xi in iter {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            let d4 = d2 * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
            m4 = alg_add_f64(m4, d4);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
            m4,
        }
    }

    pub fn from_array(arr: &PrimitiveArray<f64>, start: usize, length: usize) -> Self {
        let validity = arr.validity().cloned();
        let validity = validity
            .map(|v| v.sliced(start, length))
            .filter(|v| v.unset_bits() > 0);

        match validity {
            None => Self::new(&arr.values().as_slice()[start..][..length]),
            Some(validity) => {
                let iter = arr.values()[start..][..length].iter().copied();
                let iter = iter
                    .zip(validity.iter())
                    .filter_map(|(x, v)| v.then_some(x));
                Self::from_iter(iter, validity.set_bits())
            },
        }
    }

    fn clear_zero_weight_nan(&mut self) {
        // Keep a zero-weight state canonical: removing the last element can
        // leave NaNs behind in the running moments.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
            self.m4 = 0.0;
        }
    }

    pub fn insert_one(&mut self, x: f64) {
        // Online update of the second, third, and fourth central-moment sums
        // when one observation is added.
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    - 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    pub fn remove_one(&mut self, x: f64) {
        // Inverse of `insert_one`: retire one observation from the moment sums.
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    + 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
        let new_m4 = self.m4
            + other.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + other.weight * other.weight)
                        + 6.0
                            * (self.weight * self_weight_other_m2
                                + other.weight * other_weight_self_m2))
                    + 4.0 * (self.weight * other.m3 - other.weight * self.m3));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    pub fn finalize(&self, fisher: bool, bias: bool) -> Option<f64> {
        let m4 = self.m4 / self.weight;
        let m2 = self.m2 / self.weight;
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m4 / (m2 * m2) };
        let out = if bias {
            if self.weight == 0.0 {
                return None;
            }

            biased_est
        } else {
            if self.weight <= 3.0 {
                return None;
            }

            // Standard bias correction for sample kurtosis:
            // (n - 1) / ((n - 2)(n - 3)) * ((n + 1) * g2 - 3 * (n - 1)) + 3.
            let n = self.weight;
            let nm1_nm2 = (n - 1.0) / (n - 2.0);
            let np1_nm3 = (n + 1.0) / (n - 3.0);
            let nm1_nm3 = (n - 1.0) / (n - 3.0);
            nm1_nm2 * (np1_nm3 * biased_est - 3.0 * nm1_nm3) + 3.0
        };

        if fisher { Some(out - 3.0) } else { Some(out) }
    }
}
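
// Added illustration (not part of the original kernels): building the state
// incrementally must agree with the bulk constructor, and the Fisher
// definition is simply the Pearson definition minus 3. Data and tolerance are
// arbitrary.
#[cfg(test)]
mod kurtosis_state_example {
    use super::*;

    #[test]
    fn insert_matches_bulk_construction() {
        let data = [1.0, 4.0, 2.0, 8.0, 5.0, 7.0];

        let mut rolling = KurtosisState::default();
        for &x in &data {
            rolling.insert_one(x);
        }
        let bulk = KurtosisState::new(&data);

        let a = rolling.finalize(false, true).unwrap();
        let b = bulk.finalize(false, true).unwrap();
        assert!((a - b).abs() < 1e-12);

        let fisher = bulk.finalize(true, true).unwrap();
        assert!((fisher - (b - 3.0)).abs() < 1e-12);
    }
}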

/// Converts `it` to `f64` and feeds it to `f` in chunks of at most
/// `CHUNK_SIZE` values, so the per-chunk state constructors can work on
/// plain slices.
fn chunk_as_float<T, I, F>(it: I, mut f: F)
where
    T: NativeType + AsPrimitive<f64>,
    I: IntoIterator<Item = T>,
    F: FnMut(&[f64]),
{
    let mut chunk = [0.0; CHUNK_SIZE];
    let mut i = 0;
    for val in it {
        if i >= CHUNK_SIZE {
            f(&chunk);
            i = 0;
        }
        chunk[i] = val.as_();
        i += 1;
    }
    if i > 0 {
        f(&chunk[..i]);
    }
}

/// Binary variant of `chunk_as_float`: feeds a paired iterator to `f` as two
/// matching `f64` chunks.
fn chunk_as_float_binary<T, U, I, F>(it: I, mut f: F)
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
    I: IntoIterator<Item = (T, U)>,
    F: FnMut(&[f64], &[f64]),
{
    let mut left_chunk = [0.0; CHUNK_SIZE];
    let mut right_chunk = [0.0; CHUNK_SIZE];
    let mut i = 0;
    for (l, r) in it {
        if i >= CHUNK_SIZE {
            f(&left_chunk, &right_chunk);
            i = 0;
        }
        left_chunk[i] = l.as_();
        right_chunk[i] = r.as_();
        i += 1;
    }
    if i > 0 {
        f(&left_chunk[..i], &right_chunk[..i]);
    }
}
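
// Added illustration (not part of the original kernels): the chunker should
// hand every value to the callback exactly once and in order, flushing a full
// chunk before starting the next one. The overshoot of 7 values is arbitrary.
#[cfg(test)]
mod chunking_example {
    use super::*;

    #[test]
    fn chunk_as_float_visits_every_value_in_order() {
        let n = CHUNK_SIZE + 7;
        let input: Vec<u32> = (0..n as u32).collect();

        let mut seen = Vec::new();
        chunk_as_float(input.iter().copied(), |chunk| seen.extend_from_slice(chunk));

        assert_eq!(seen.len(), n);
        assert!(seen.iter().enumerate().all(|(i, &v)| v == i as f64));
    }
}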

/// Single-pass variance state over a primitive array, ignoring null entries.
pub fn var<T>(arr: &PrimitiveArray<T>) -> VarState
where
    T: NativeType + AsPrimitive<f64>,
{
    let mut out = VarState::default();
    if arr.has_nulls() {
        chunk_as_float(arr.non_null_values_iter(), |chunk| {
            out.combine(&VarState::new(chunk))
        });
    } else {
        chunk_as_float(arr.values().iter().copied(), |chunk| {
            out.combine(&VarState::new(chunk))
        });
    }
    out
}

/// Covariance state over two equal-length arrays; rows where either side is
/// null are skipped.
pub fn cov<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> CovState
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
{
    assert!(x.len() == y.len());
    let mut out = CovState::default();
    if x.has_nulls() || y.has_nulls() {
        chunk_as_float_binary(
            x.iter()
                .zip(y.iter())
                .filter_map(|(l, r)| l.copied().zip(r.copied())),
            |l, r| out.combine(&CovState::new(l, r)),
        );
    } else {
        chunk_as_float_binary(
            x.values().iter().copied().zip(y.values().iter().copied()),
            |l, r| out.combine(&CovState::new(l, r)),
        );
    }
    out
}

/// Pearson correlation state over two equal-length arrays; rows where either
/// side is null are skipped.
pub fn pearson_corr<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> PearsonState
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
{
    assert!(x.len() == y.len());
    let mut out = PearsonState::default();
    if x.has_nulls() || y.has_nulls() {
        chunk_as_float_binary(
            x.iter()
                .zip(y.iter())
                .filter_map(|(l, r)| l.copied().zip(r.copied())),
            |l, r| out.combine(&PearsonState::new(l, r)),
        );
    } else {
        chunk_as_float_binary(
            x.values().iter().copied().zip(y.values().iter().copied()),
            |l, r| out.combine(&PearsonState::new(l, r)),
        );
    }
    out
}

/// Skewness state over a primitive array, ignoring null entries.
pub fn skew<T>(arr: &PrimitiveArray<T>) -> SkewState
where
    T: NativeType + AsPrimitive<f64>,
{
    let mut out = SkewState::default();
    if arr.has_nulls() {
        chunk_as_float(arr.non_null_values_iter(), |chunk| {
            out.combine(&SkewState::new(chunk))
        });
    } else {
        chunk_as_float(arr.values().iter().copied(), |chunk| {
            out.combine(&SkewState::new(chunk))
        });
    }
    out
}

/// Kurtosis state over a primitive array, ignoring null entries.
pub fn kurtosis<T>(arr: &PrimitiveArray<T>) -> KurtosisState
where
    T: NativeType + AsPrimitive<f64>,
{
    let mut out = KurtosisState::default();
    if arr.has_nulls() {
        chunk_as_float(arr.non_null_values_iter(), |chunk| {
            out.combine(&KurtosisState::new(chunk))
        });
    } else {
        chunk_as_float(arr.values().iter().copied(), |chunk| {
            out.combine(&KurtosisState::new(chunk))
        });
    }
    out
}
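
// End-to-end sketch added for illustration (not part of the original file). It
// assumes `PrimitiveArray::<f64>::from_slice` from the re-exported arrow crate
// and checks that the array-level `var` kernel agrees with `VarState::new` on
// fully-valid input; the values and tolerance are arbitrary.
#[cfg(test)]
mod array_kernel_example {
    use super::*;

    #[test]
    fn var_kernel_matches_state_on_valid_data() {
        let values = [0.5, 1.5, 2.0, -4.0, 3.25, 9.0];
        let arr = PrimitiveArray::<f64>::from_slice(values);

        let from_kernel = var(&arr).finalize(1).unwrap();
        let from_state = VarState::new(&values).finalize(1).unwrap();
        assert!((from_kernel - from_state).abs() < 1e-12);
    }
}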