1#![allow(clippy::collapsible_else_if)]
31
32use arrow::array::{Array, PrimitiveArray};
33use arrow::types::NativeType;
34use num_traits::AsPrimitive;
35use polars_utils::algebraic_ops::*;
36
37const CHUNK_SIZE: usize = 128;
38
39#[derive(Default, Clone)]
40pub struct VarState {
41 weight: f64,
42 mean: f64,
43 dp: f64,
44}
45
46#[derive(Default, Clone)]
47pub struct CovState {
48 weight: f64,
49 mean_x: f64,
50 mean_y: f64,
51 dp_xy: f64,
52}
53
54#[derive(Default, Clone)]
55pub struct PearsonState {
56 weight: f64,
57 mean_x: f64,
58 mean_y: f64,
59 dp_xx: f64,
60 dp_xy: f64,
61 dp_yy: f64,
62}
63
64impl VarState {
65 fn new(x: &[f64]) -> Self {
66 if x.is_empty() {
67 return Self::default();
68 }
69
70 let weight = x.len() as f64;
71 let mean = alg_sum_f64(x.iter().copied()) / weight;
72 Self {
73 weight,
74 mean,
75 dp: alg_sum_f64(x.iter().map(|&xi| (xi - mean) * (xi - mean))),
76 }
77 }
78
79 fn clear_zero_weight_nan(&mut self) {
80 if self.weight == 0.0 {
82 self.mean = 0.0;
83 self.dp = 0.0;
84 }
85 }
86
87 pub fn insert_one(&mut self, x: f64) {
88 let new_weight = self.weight + 1.0;
91 let delta_mean = x - self.mean;
92 let new_mean = self.mean + delta_mean / new_weight;
93 self.dp += (x - new_mean) * delta_mean;
94 self.weight = new_weight;
95 self.mean = new_mean;
96 self.clear_zero_weight_nan();
97 }
98
99 pub fn remove_one(&mut self, x: f64) {
100 let new_weight = self.weight - 1.0;
103 let delta_mean = x - self.mean;
104 let new_mean = self.mean - delta_mean / new_weight;
105 self.dp -= (x - new_mean) * delta_mean;
106 self.weight = new_weight;
107 self.mean = new_mean;
108 self.clear_zero_weight_nan();
109 }
110
111 pub fn combine(&mut self, other: &Self) {
112 if other.weight == 0.0 {
113 return;
114 }
115
116 let new_weight = self.weight + other.weight;
117 let other_weight_frac = other.weight / new_weight;
118 let delta_mean = other.mean - self.mean;
119 let new_mean = self.mean + delta_mean * other_weight_frac;
120 self.dp += other.dp + other.weight * (other.mean - new_mean) * delta_mean;
121 self.weight = new_weight;
122 self.mean = new_mean;
123 self.clear_zero_weight_nan();
124 }
125
126 pub fn finalize(&self, ddof: u8) -> Option<f64> {
127 if self.weight <= ddof as f64 {
128 None
129 } else {
130 let var = self.dp / (self.weight - ddof as f64);
131 Some(if var < 0.0 {
132 0.0
135 } else {
136 var
137 })
138 }
139 }
140}
141
142impl CovState {
143 fn new(x: &[f64], y: &[f64]) -> Self {
144 assert!(x.len() == y.len());
145 if x.is_empty() {
146 return Self::default();
147 }
148
149 let weight = x.len() as f64;
150 let inv_weight = 1.0 / weight;
151 let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
152 let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
153 Self {
154 weight,
155 mean_x,
156 mean_y,
157 dp_xy: alg_sum_f64(
158 x.iter()
159 .zip(y)
160 .map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)),
161 ),
162 }
163 }
164
165 pub fn combine(&mut self, other: &Self) {
166 if other.weight == 0.0 {
167 return;
168 } else if self.weight == 0.0 {
169 *self = other.clone();
170 return;
171 }
172
173 let new_weight = self.weight + other.weight;
174 let other_weight_frac = other.weight / new_weight;
175 let delta_mean_x = other.mean_x - self.mean_x;
176 let delta_mean_y = other.mean_y - self.mean_y;
177 let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
178 let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
179 self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
180 self.weight = new_weight;
181 self.mean_x = new_mean_x;
182 self.mean_y = new_mean_y;
183 }
184
185 pub fn finalize(&self, ddof: u8) -> Option<f64> {
186 if self.weight <= ddof as f64 {
187 None
188 } else {
189 Some(self.dp_xy / (self.weight - ddof as f64))
190 }
191 }
192}
193
194impl PearsonState {
195 fn new(x: &[f64], y: &[f64]) -> Self {
196 assert!(x.len() == y.len());
197 if x.is_empty() {
198 return Self::default();
199 }
200
201 let weight = x.len() as f64;
202 let inv_weight = 1.0 / weight;
203 let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
204 let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
205 let mut dp_xx = 0.0;
206 let mut dp_xy = 0.0;
207 let mut dp_yy = 0.0;
208 for (xi, yi) in x.iter().zip(y.iter()) {
209 dp_xx = alg_add_f64(dp_xx, (xi - mean_x) * (xi - mean_x));
210 dp_xy = alg_add_f64(dp_xy, (xi - mean_x) * (yi - mean_y));
211 dp_yy = alg_add_f64(dp_yy, (yi - mean_y) * (yi - mean_y));
212 }
213 Self {
214 weight,
215 mean_x,
216 mean_y,
217 dp_xx,
218 dp_xy,
219 dp_yy,
220 }
221 }
222
223 pub fn combine(&mut self, other: &Self) {
224 if other.weight == 0.0 {
225 return;
226 } else if self.weight == 0.0 {
227 *self = other.clone();
228 return;
229 }
230
231 let new_weight = self.weight + other.weight;
232 let other_weight_frac = other.weight / new_weight;
233 let delta_mean_x = other.mean_x - self.mean_x;
234 let delta_mean_y = other.mean_y - self.mean_y;
235 let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
236 let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
237 self.dp_xx += other.dp_xx + other.weight * (other.mean_x - new_mean_x) * delta_mean_x;
238 self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
239 self.dp_yy += other.dp_yy + other.weight * (other.mean_y - new_mean_y) * delta_mean_y;
240 self.weight = new_weight;
241 self.mean_x = new_mean_x;
242 self.mean_y = new_mean_y;
243 }
244
245 pub fn finalize(&self) -> f64 {
246 let denom_sq = self.dp_xx * self.dp_yy;
247 if denom_sq > 0.0 {
248 self.dp_xy / denom_sq.sqrt()
249 } else {
250 f64::NAN
251 }
252 }
253}
254
255#[derive(Default, Clone)]
256pub struct SkewState {
257 weight: f64,
258 mean: f64,
259 m2: f64,
260 m3: f64,
261}
262
263impl SkewState {
264 fn new(x: &[f64]) -> Self {
265 if x.is_empty() {
266 return Self::default();
267 }
268
269 let weight = x.len() as f64;
270 let mean = alg_sum_f64(x.iter().copied()) / weight;
271 let mut m2 = 0.0;
272 let mut m3 = 0.0;
273 for xi in x.iter() {
274 let d = xi - mean;
275 let d2 = d * d;
276 let d3 = d * d2;
277 m2 = alg_add_f64(m2, d2);
278 m3 = alg_add_f64(m3, d3);
279 }
280 Self {
281 weight,
282 mean,
283 m2,
284 m3,
285 }
286 }
287
288 fn clear_zero_weight_nan(&mut self) {
289 if self.weight == 0.0 {
291 self.mean = 0.0;
292 self.m2 = 0.0;
293 self.m3 = 0.0;
294 }
295 }
296
297 pub fn insert_one(&mut self, x: f64) {
298 let new_weight = self.weight + 1.0;
300 let delta_mean = x - self.mean;
301 let delta_mean_weight = delta_mean / new_weight;
302 let new_mean = self.mean + delta_mean_weight;
303
304 let weight_diff = self.weight - 1.0;
305 let m2_update = (x - new_mean) * delta_mean;
306 let new_m2 = self.m2 + m2_update;
307 let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
308
309 self.weight = new_weight;
310 self.mean = new_mean;
311 self.m2 = new_m2;
312 self.m3 = new_m3;
313 self.clear_zero_weight_nan();
314 }
315
316 pub fn remove_one(&mut self, x: f64) {
317 let new_weight = self.weight - 1.0;
319 let delta_mean = x - self.mean;
320 let delta_mean_weight = delta_mean / new_weight;
321 let new_mean = self.mean - delta_mean_weight;
322
323 let weight_diff = self.weight + 1.0;
324 let m2_update = (new_mean - x) * delta_mean;
325 let new_m2 = self.m2 + m2_update;
326 let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
327
328 self.weight = new_weight;
329 self.mean = new_mean;
330 self.m2 = new_m2;
331 self.m3 = new_m3;
332 self.clear_zero_weight_nan();
333 }
334
335 pub fn combine(&mut self, other: &Self) {
336 if other.weight == 0.0 {
337 return;
338 } else if self.weight == 0.0 {
339 *self = other.clone();
340 return;
341 }
342
343 let new_weight = self.weight + other.weight;
344 let delta_mean = other.mean - self.mean;
345 let delta_mean_weight = delta_mean / new_weight;
346 let new_mean = self.mean + other.weight * delta_mean_weight;
347
348 let weight_diff = self.weight - other.weight;
349 let self_weight_other_m2 = self.weight * other.m2;
350 let other_weight_self_m2 = other.weight * self.m2;
351 let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
352 let new_m2 = self.m2 + other.m2 + m2_update;
353 let new_m3 = self.m3
354 + other.m3
355 + delta_mean_weight
356 * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
357
358 self.weight = new_weight;
359 self.mean = new_mean;
360 self.m2 = new_m2;
361 self.m3 = new_m3;
362 self.clear_zero_weight_nan();
363 }
364
365 pub fn finalize(&self, bias: bool) -> Option<f64> {
366 let m2 = self.m2 / self.weight;
367 let m3 = self.m3 / self.weight;
368 let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
369 let biased_est = if is_zero { f64::NAN } else { m3 / m2.powf(1.5) };
370 if bias {
371 if self.weight == 0.0 {
372 None
373 } else {
374 Some(biased_est)
375 }
376 } else {
377 if self.weight <= 2.0 {
378 None
379 } else {
380 let correction = (self.weight * (self.weight - 1.0)).sqrt() / (self.weight - 2.0);
381 Some(correction * biased_est)
382 }
383 }
384 }
385}
386
387#[derive(Default, Clone)]
388pub struct KurtosisState {
389 weight: f64,
390 mean: f64,
391 m2: f64,
392 m3: f64,
393 m4: f64,
394}
395
396impl KurtosisState {
397 fn new(x: &[f64]) -> Self {
398 if x.is_empty() {
399 return Self::default();
400 }
401
402 let weight = x.len() as f64;
403 let mean = alg_sum_f64(x.iter().copied()) / weight;
404 let mut m2 = 0.0;
405 let mut m3 = 0.0;
406 let mut m4 = 0.0;
407 for xi in x.iter() {
408 let d = xi - mean;
409 let d2 = d * d;
410 let d3 = d * d2;
411 let d4 = d2 * d2;
412 m2 = alg_add_f64(m2, d2);
413 m3 = alg_add_f64(m3, d3);
414 m4 = alg_add_f64(m4, d4);
415 }
416 Self {
417 weight,
418 mean,
419 m2,
420 m3,
421 m4,
422 }
423 }
424
425 fn clear_zero_weight_nan(&mut self) {
426 if self.weight == 0.0 {
428 self.mean = 0.0;
429 self.m2 = 0.0;
430 self.m3 = 0.0;
431 self.m4 = 0.0;
432 }
433 }
434
435 pub fn insert_one(&mut self, x: f64) {
436 let new_weight = self.weight + 1.0;
438 let delta_mean = x - self.mean;
439 let delta_mean_weight = delta_mean / new_weight;
440 let new_mean = self.mean + delta_mean_weight;
441
442 let weight_diff = self.weight - 1.0;
443 let m2_update = (x - new_mean) * delta_mean;
444 let new_m2 = self.m2 + m2_update;
445 let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
446 let new_m4 = self.m4
447 + delta_mean_weight
448 * (delta_mean_weight
449 * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
450 - 4.0 * self.m3);
451
452 self.weight = new_weight;
453 self.mean = new_mean;
454 self.m2 = new_m2;
455 self.m3 = new_m3;
456 self.m4 = new_m4;
457 self.clear_zero_weight_nan();
458 }
459
460 pub fn remove_one(&mut self, x: f64) {
461 let new_weight = self.weight - 1.0;
463 let delta_mean = x - self.mean;
464 let delta_mean_weight = delta_mean / new_weight;
465 let new_mean = self.mean - delta_mean_weight;
466
467 let weight_diff = self.weight + 1.0;
468 let m2_update = (new_mean - x) * delta_mean;
469 let new_m2 = self.m2 + m2_update;
470 let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
471 let new_m4 = self.m4
472 + delta_mean_weight
473 * (delta_mean_weight
474 * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
475 + 4.0 * self.m3);
476
477 self.weight = new_weight;
478 self.mean = new_mean;
479 self.m2 = new_m2;
480 self.m3 = new_m3;
481 self.m4 = new_m4;
482 self.clear_zero_weight_nan();
483 }
484
485 pub fn combine(&mut self, other: &Self) {
486 if other.weight == 0.0 {
487 return;
488 } else if self.weight == 0.0 {
489 *self = other.clone();
490 return;
491 }
492
493 let new_weight = self.weight + other.weight;
494 let delta_mean = other.mean - self.mean;
495 let delta_mean_weight = delta_mean / new_weight;
496 let new_mean = self.mean + other.weight * delta_mean_weight;
497
498 let weight_diff = self.weight - other.weight;
499 let self_weight_other_m2 = self.weight * other.m2;
500 let other_weight_self_m2 = other.weight * self.m2;
501 let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
502 let new_m2 = self.m2 + other.m2 + m2_update;
503 let new_m3 = self.m3
504 + other.m3
505 + delta_mean_weight
506 * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
507 let new_m4 = self.m4
508 + other.m4
509 + delta_mean_weight
510 * (delta_mean_weight
511 * (m2_update * (self.weight * weight_diff + other.weight * other.weight)
512 + 6.0
513 * (self.weight * self_weight_other_m2
514 + other.weight * other_weight_self_m2))
515 + 4.0 * (self.weight * other.m3 - other.weight * self.m3));
516
517 self.weight = new_weight;
518 self.mean = new_mean;
519 self.m2 = new_m2;
520 self.m3 = new_m3;
521 self.m4 = new_m4;
522 self.clear_zero_weight_nan();
523 }
524
525 pub fn finalize(&self, fisher: bool, bias: bool) -> Option<f64> {
526 let m4 = self.m4 / self.weight;
527 let m2 = self.m2 / self.weight;
528 let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
529 let biased_est = if is_zero { f64::NAN } else { m4 / (m2 * m2) };
530 let out = if bias {
531 if self.weight == 0.0 {
532 return None;
533 }
534
535 biased_est
536 } else {
537 if self.weight <= 3.0 {
538 return None;
539 }
540
541 let n = self.weight;
542 let nm1_nm2 = (n - 1.0) / (n - 2.0);
543 let np1_nm3 = (n + 1.0) / (n - 3.0);
544 let nm1_nm3 = (n - 1.0) / (n - 3.0);
545 nm1_nm2 * (np1_nm3 * biased_est - 3.0 * nm1_nm3) + 3.0
546 };
547
548 if fisher { Some(out - 3.0) } else { Some(out) }
549 }
550}
551
552fn chunk_as_float<T, I, F>(it: I, mut f: F)
553where
554 T: NativeType + AsPrimitive<f64>,
555 I: IntoIterator<Item = T>,
556 F: FnMut(&[f64]),
557{
558 let mut chunk = [0.0; CHUNK_SIZE];
559 let mut i = 0;
560 for val in it {
561 if i >= CHUNK_SIZE {
562 f(&chunk);
563 i = 0;
564 }
565 chunk[i] = val.as_();
566 i += 1;
567 }
568 if i > 0 {
569 f(&chunk[..i]);
570 }
571}
572
573fn chunk_as_float_binary<T, U, I, F>(it: I, mut f: F)
574where
575 T: NativeType + AsPrimitive<f64>,
576 U: NativeType + AsPrimitive<f64>,
577 I: IntoIterator<Item = (T, U)>,
578 F: FnMut(&[f64], &[f64]),
579{
580 let mut left_chunk = [0.0; CHUNK_SIZE];
581 let mut right_chunk = [0.0; CHUNK_SIZE];
582 let mut i = 0;
583 for (l, r) in it {
584 if i >= CHUNK_SIZE {
585 f(&left_chunk, &right_chunk);
586 i = 0;
587 }
588 left_chunk[i] = l.as_();
589 right_chunk[i] = r.as_();
590 i += 1;
591 }
592 if i > 0 {
593 f(&left_chunk[..i], &right_chunk[..i]);
594 }
595}
596
597pub fn var<T>(arr: &PrimitiveArray<T>) -> VarState
598where
599 T: NativeType + AsPrimitive<f64>,
600{
601 let mut out = VarState::default();
602 if arr.has_nulls() {
603 chunk_as_float(arr.non_null_values_iter(), |chunk| {
604 out.combine(&VarState::new(chunk))
605 });
606 } else {
607 chunk_as_float(arr.values().iter().copied(), |chunk| {
608 out.combine(&VarState::new(chunk))
609 });
610 }
611 out
612}
613
614pub fn cov<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> CovState
615where
616 T: NativeType + AsPrimitive<f64>,
617 U: NativeType + AsPrimitive<f64>,
618{
619 assert!(x.len() == y.len());
620 let mut out = CovState::default();
621 if x.has_nulls() || y.has_nulls() {
622 chunk_as_float_binary(
623 x.iter()
624 .zip(y.iter())
625 .filter_map(|(l, r)| l.copied().zip(r.copied())),
626 |l, r| out.combine(&CovState::new(l, r)),
627 );
628 } else {
629 chunk_as_float_binary(
630 x.values().iter().copied().zip(y.values().iter().copied()),
631 |l, r| out.combine(&CovState::new(l, r)),
632 );
633 }
634 out
635}
636
637pub fn pearson_corr<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> PearsonState
638where
639 T: NativeType + AsPrimitive<f64>,
640 U: NativeType + AsPrimitive<f64>,
641{
642 assert!(x.len() == y.len());
643 let mut out = PearsonState::default();
644 if x.has_nulls() || y.has_nulls() {
645 chunk_as_float_binary(
646 x.iter()
647 .zip(y.iter())
648 .filter_map(|(l, r)| l.copied().zip(r.copied())),
649 |l, r| out.combine(&PearsonState::new(l, r)),
650 );
651 } else {
652 chunk_as_float_binary(
653 x.values().iter().copied().zip(y.values().iter().copied()),
654 |l, r| out.combine(&PearsonState::new(l, r)),
655 );
656 }
657 out
658}
659
660pub fn skew<T>(arr: &PrimitiveArray<T>) -> SkewState
661where
662 T: NativeType + AsPrimitive<f64>,
663{
664 let mut out = SkewState::default();
665 if arr.has_nulls() {
666 chunk_as_float(arr.non_null_values_iter(), |chunk| {
667 out.combine(&SkewState::new(chunk))
668 });
669 } else {
670 chunk_as_float(arr.values().iter().copied(), |chunk| {
671 out.combine(&SkewState::new(chunk))
672 });
673 }
674 out
675}
676
677pub fn kurtosis<T>(arr: &PrimitiveArray<T>) -> KurtosisState
678where
679 T: NativeType + AsPrimitive<f64>,
680{
681 let mut out = KurtosisState::default();
682 if arr.has_nulls() {
683 chunk_as_float(arr.non_null_values_iter(), |chunk| {
684 out.combine(&KurtosisState::new(chunk))
685 });
686 } else {
687 chunk_as_float(arr.values().iter().copied(), |chunk| {
688 out.combine(&KurtosisState::new(chunk))
689 });
690 }
691 out
692}