Skip to main content

datafusion_physical_expr/intervals/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility functions for the interval arithmetic library
19
20use std::sync::Arc;
21
22use crate::{
23    PhysicalExpr,
24    expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
25};
26
27use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
28use arrow::datatypes::{DataType, SchemaRef};
29use datafusion_common::{Result, ScalarValue, internal_err};
30use datafusion_expr::Operator;
31use datafusion_expr::interval_arithmetic::Interval;
32
33/// Indicates whether interval arithmetic is supported for the given expression.
34/// Currently, we do not support all [`PhysicalExpr`]s for interval calculations.
35/// We do not support every type of [`Operator`]s either. Over time, this check
36/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
37/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
38pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool {
39    if let Some(binary_expr) = expr.downcast_ref::<BinaryExpr>() {
40        is_operator_supported(binary_expr.op())
41            && check_support(binary_expr.left(), schema)
42            && check_support(binary_expr.right(), schema)
43    } else if let Some(column) = expr.downcast_ref::<Column>() {
44        if let Ok(field) = schema.field_with_name(column.name()) {
45            is_datatype_supported(field.data_type())
46        } else {
47            false
48        }
49    } else if let Some(literal) = expr.downcast_ref::<Literal>() {
50        if let Ok(dt) = literal.data_type(schema) {
51            is_datatype_supported(&dt)
52        } else {
53            false
54        }
55    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
56        check_support(cast.expr(), schema)
57    } else if let Some(negative) = expr.downcast_ref::<NegativeExpr>() {
58        check_support(negative.arg(), schema)
59    } else {
60        false
61    }
62}
63
64// This function returns the inverse operator of the given operator.
65pub fn get_inverse_op(op: Operator) -> Result<Operator> {
66    match op {
67        Operator::Plus => Ok(Operator::Minus),
68        Operator::Minus => Ok(Operator::Plus),
69        Operator::Multiply => Ok(Operator::Divide),
70        Operator::Divide => Ok(Operator::Multiply),
71        _ => internal_err!("Interval arithmetic does not support the operator {}", op),
72    }
73}
74
75/// Indicates whether interval arithmetic is supported for the given operator.
76pub fn is_operator_supported(op: &Operator) -> bool {
77    matches!(
78        op,
79        &Operator::Plus
80            | &Operator::Minus
81            | &Operator::And
82            | &Operator::Gt
83            | &Operator::GtEq
84            | &Operator::Lt
85            | &Operator::LtEq
86            | &Operator::Eq
87            | &Operator::Multiply
88            | &Operator::Divide
89    )
90}
91
92/// Indicates whether interval arithmetic is supported for the given data type.
93pub fn is_datatype_supported(data_type: &DataType) -> bool {
94    matches!(
95        data_type,
96        &DataType::Int64
97            | &DataType::Int32
98            | &DataType::Int16
99            | &DataType::Int8
100            | &DataType::UInt64
101            | &DataType::UInt32
102            | &DataType::UInt16
103            | &DataType::UInt8
104            | &DataType::Float64
105            | &DataType::Float32
106            | &DataType::Date32
107            | &DataType::Date64
108            | &DataType::Timestamp(_, _)
109    )
110}
111
112/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`].
113pub fn convert_interval_type_to_duration(interval: &Interval) -> Option<Interval> {
114    if let (Some(lower), Some(upper)) = (
115        convert_interval_bound_to_duration(interval.lower()),
116        convert_interval_bound_to_duration(interval.upper()),
117    ) {
118        Interval::try_new(lower, upper).ok()
119    } else {
120        None
121    }
122}
123
124/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`].
125fn convert_interval_bound_to_duration(
126    interval_bound: &ScalarValue,
127) -> Option<ScalarValue> {
128    match interval_bound {
129        ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn)
130            .ok()
131            .map(|duration| ScalarValue::DurationNanosecond(Some(duration))),
132        ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt)
133            .ok()
134            .map(|duration| ScalarValue::DurationMillisecond(Some(duration))),
135        _ => None,
136    }
137}
138
139/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`].
140pub fn convert_duration_type_to_interval(interval: &Interval) -> Option<Interval> {
141    if let (Some(lower), Some(upper)) = (
142        convert_duration_bound_to_interval(interval.lower()),
143        convert_duration_bound_to_interval(interval.upper()),
144    ) {
145        Interval::try_new(lower, upper).ok()
146    } else {
147        None
148    }
149}
150
151/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`].
152fn convert_duration_bound_to_interval(
153    interval_bound: &ScalarValue,
154) -> Option<ScalarValue> {
155    match interval_bound {
156        ScalarValue::DurationNanosecond(Some(duration)) => {
157            Some(ScalarValue::new_interval_mdn(0, 0, *duration))
158        }
159        ScalarValue::DurationMicrosecond(Some(duration)) => {
160            Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000))
161        }
162        ScalarValue::DurationMillisecond(Some(duration)) => {
163            Some(ScalarValue::new_interval_dt(0, *duration as i32))
164        }
165        ScalarValue::DurationSecond(Some(duration)) => {
166            Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000))
167        }
168        _ => None,
169    }
170}
171
172/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part.
173/// Otherwise, it returns an error.
174fn interval_mdn_to_duration_ns(mdn: &IntervalMonthDayNano) -> Result<i64> {
175    if mdn.months == 0 && mdn.days == 0 {
176        Ok(mdn.nanoseconds)
177    } else {
178        internal_err!(
179            "The interval cannot have a non-zero month or day value for duration convertibility"
180        )
181    }
182}
183
184/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part.
185/// Otherwise, it returns an error.
186fn interval_dt_to_duration_ms(dt: &IntervalDayTime) -> Result<i64> {
187    if dt.days == 0 {
188        // Safe to cast i32 to i64
189        Ok(dt.milliseconds as i64)
190    } else {
191        internal_err!(
192            "The interval cannot have a non-zero day value for duration convertibility"
193        )
194    }
195}