datafusion_physical_expr/intervals/
utils.rs1use std::sync::Arc;
21
22use crate::{
23 PhysicalExpr,
24 expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
25};
26
27use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
28use arrow::datatypes::{DataType, SchemaRef};
29use datafusion_common::{Result, ScalarValue, internal_err};
30use datafusion_expr::Operator;
31use datafusion_expr::interval_arithmetic::Interval;
32
33pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool {
39 if let Some(binary_expr) = expr.downcast_ref::<BinaryExpr>() {
40 is_operator_supported(binary_expr.op())
41 && check_support(binary_expr.left(), schema)
42 && check_support(binary_expr.right(), schema)
43 } else if let Some(column) = expr.downcast_ref::<Column>() {
44 if let Ok(field) = schema.field_with_name(column.name()) {
45 is_datatype_supported(field.data_type())
46 } else {
47 false
48 }
49 } else if let Some(literal) = expr.downcast_ref::<Literal>() {
50 if let Ok(dt) = literal.data_type(schema) {
51 is_datatype_supported(&dt)
52 } else {
53 false
54 }
55 } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
56 check_support(cast.expr(), schema)
57 } else if let Some(negative) = expr.downcast_ref::<NegativeExpr>() {
58 check_support(negative.arg(), schema)
59 } else {
60 false
61 }
62}
63
64pub fn get_inverse_op(op: Operator) -> Result<Operator> {
66 match op {
67 Operator::Plus => Ok(Operator::Minus),
68 Operator::Minus => Ok(Operator::Plus),
69 Operator::Multiply => Ok(Operator::Divide),
70 Operator::Divide => Ok(Operator::Multiply),
71 _ => internal_err!("Interval arithmetic does not support the operator {}", op),
72 }
73}
74
75pub fn is_operator_supported(op: &Operator) -> bool {
77 matches!(
78 op,
79 &Operator::Plus
80 | &Operator::Minus
81 | &Operator::And
82 | &Operator::Gt
83 | &Operator::GtEq
84 | &Operator::Lt
85 | &Operator::LtEq
86 | &Operator::Eq
87 | &Operator::Multiply
88 | &Operator::Divide
89 )
90}
91
92pub fn is_datatype_supported(data_type: &DataType) -> bool {
94 matches!(
95 data_type,
96 &DataType::Int64
97 | &DataType::Int32
98 | &DataType::Int16
99 | &DataType::Int8
100 | &DataType::UInt64
101 | &DataType::UInt32
102 | &DataType::UInt16
103 | &DataType::UInt8
104 | &DataType::Float64
105 | &DataType::Float32
106 | &DataType::Date32
107 | &DataType::Date64
108 | &DataType::Timestamp(_, _)
109 )
110}
111
112pub fn convert_interval_type_to_duration(interval: &Interval) -> Option<Interval> {
114 if let (Some(lower), Some(upper)) = (
115 convert_interval_bound_to_duration(interval.lower()),
116 convert_interval_bound_to_duration(interval.upper()),
117 ) {
118 Interval::try_new(lower, upper).ok()
119 } else {
120 None
121 }
122}
123
124fn convert_interval_bound_to_duration(
126 interval_bound: &ScalarValue,
127) -> Option<ScalarValue> {
128 match interval_bound {
129 ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn)
130 .ok()
131 .map(|duration| ScalarValue::DurationNanosecond(Some(duration))),
132 ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt)
133 .ok()
134 .map(|duration| ScalarValue::DurationMillisecond(Some(duration))),
135 _ => None,
136 }
137}
138
139pub fn convert_duration_type_to_interval(interval: &Interval) -> Option<Interval> {
141 if let (Some(lower), Some(upper)) = (
142 convert_duration_bound_to_interval(interval.lower()),
143 convert_duration_bound_to_interval(interval.upper()),
144 ) {
145 Interval::try_new(lower, upper).ok()
146 } else {
147 None
148 }
149}
150
151fn convert_duration_bound_to_interval(
153 interval_bound: &ScalarValue,
154) -> Option<ScalarValue> {
155 match interval_bound {
156 ScalarValue::DurationNanosecond(Some(duration)) => {
157 Some(ScalarValue::new_interval_mdn(0, 0, *duration))
158 }
159 ScalarValue::DurationMicrosecond(Some(duration)) => {
160 Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000))
161 }
162 ScalarValue::DurationMillisecond(Some(duration)) => {
163 Some(ScalarValue::new_interval_dt(0, *duration as i32))
164 }
165 ScalarValue::DurationSecond(Some(duration)) => {
166 Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000))
167 }
168 _ => None,
169 }
170}
171
172fn interval_mdn_to_duration_ns(mdn: &IntervalMonthDayNano) -> Result<i64> {
175 if mdn.months == 0 && mdn.days == 0 {
176 Ok(mdn.nanoseconds)
177 } else {
178 internal_err!(
179 "The interval cannot have a non-zero month or day value for duration convertibility"
180 )
181 }
182}
183
184fn interval_dt_to_duration_ms(dt: &IntervalDayTime) -> Result<i64> {
187 if dt.days == 0 {
188 Ok(dt.milliseconds as i64)
190 } else {
191 internal_err!(
192 "The interval cannot have a non-zero day value for duration convertibility"
193 )
194 }
195}