1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use arrow::array::PrimitiveArray;
use polars_arrow::prelude::FromData;

use crate::datatypes::PolarsNumericType;
use crate::prelude::*;
use crate::series::arithmetic::coerce_lhs_rhs;
use crate::utils::align_chunks_binary;

fn cmp_binary<T, F>(left: &ChunkedArray<T>, right: &ChunkedArray<T>, op: F) -> ChunkedArray<T>
where
    T: PolarsNumericType,
    F: Fn(T::Native, T::Native) -> T::Native,
{
    let (left, right) = align_chunks_binary(left, right);
    let chunks = left
        .downcast_iter()
        .zip(right.downcast_iter())
        .map(|(left, right)| {
            let values = left
                .values()
                .iter()
                .zip(right.values().iter())
                .map(|(l, r)| op(*l, *r))
                .collect::<Vec<_>>();
            PrimitiveArray::from_data_default(values.into(), None)
        });
    ChunkedArray::from_chunk_iter(left.name(), chunks)
}

fn min_binary<T>(left: &ChunkedArray<T>, right: &ChunkedArray<T>) -> ChunkedArray<T>
where
    T: PolarsNumericType,
    T::Native: PartialOrd,
{
    let op = |l, r| {
        if l < r {
            l
        } else {
            r
        }
    };
    cmp_binary(left, right, op)
}

fn max_binary<T>(left: &ChunkedArray<T>, right: &ChunkedArray<T>) -> ChunkedArray<T>
where
    T: PolarsNumericType,
    T::Native: PartialOrd,
{
    let op = |l, r| {
        if l > r {
            l
        } else {
            r
        }
    };
    cmp_binary(left, right, op)
}

pub(crate) fn min_max_binary_series(
    left: &Series,
    right: &Series,
    min: bool,
) -> PolarsResult<Series> {
    if left.dtype().to_physical().is_numeric()
        && left.null_count() == 0
        && right.null_count() == 0
        && left.len() == right.len()
    {
        let (lhs, rhs) = coerce_lhs_rhs(left, right)?;
        let logical = lhs.dtype();
        let lhs = lhs.to_physical_repr();
        let rhs = rhs.to_physical_repr();

        with_match_physical_numeric_polars_type!(lhs.dtype(), |$T| {
        let a: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref();
        let b: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref();

        if min {
            min_binary(a, b).into_series().cast(logical)
        } else {
            max_binary(a, b).into_series().cast(logical)
            }
        })
    } else {
        let mask = if min {
            left.lt(right)? & left.is_not_null() | right.is_null()
        } else {
            left.gt(right)? & left.is_not_null() | right.is_null()
        };
        left.zip_with(&mask, right)
    }
}