datafusion_comet_spark_expr/datetime_funcs/minute.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::utils::array_with_timezone;
19use arrow::{
20    compute::{date_part, DatePart},
21    record_batch::RecordBatch,
22};
23use arrow_schema::{DataType, Schema, TimeUnit::Microsecond};
24use datafusion::logical_expr::ColumnarValue;
25use datafusion_common::DataFusionError;
26use datafusion_physical_expr::PhysicalExpr;
27use std::hash::Hash;
28use std::{
29    any::Any,
30    fmt::{Debug, Display, Formatter},
31    sync::Arc,
32};
33
34#[derive(Debug, Eq)]
35pub struct MinuteExpr {
36    /// An array with DataType::Timestamp(TimeUnit::Microsecond, None)
37    child: Arc<dyn PhysicalExpr>,
38    timezone: String,
39}
40
41impl Hash for MinuteExpr {
42    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
43        self.child.hash(state);
44        self.timezone.hash(state);
45    }
46}
47impl PartialEq for MinuteExpr {
48    fn eq(&self, other: &Self) -> bool {
49        self.child.eq(&other.child) && self.timezone.eq(&other.timezone)
50    }
51}
52
53impl MinuteExpr {
54    pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self {
55        MinuteExpr { child, timezone }
56    }
57}
58
59impl Display for MinuteExpr {
60    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61        write!(
62            f,
63            "Minute [timezone:{}, child: {}]",
64            self.timezone, self.child
65        )
66    }
67}
68
69impl PhysicalExpr for MinuteExpr {
70    fn as_any(&self) -> &dyn Any {
71        self
72    }
73
74    fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
75        match self.child.data_type(input_schema).unwrap() {
76            DataType::Dictionary(key_type, _) => {
77                Ok(DataType::Dictionary(key_type, Box::new(DataType::Int32)))
78            }
79            _ => Ok(DataType::Int32),
80        }
81    }
82
83    fn nullable(&self, _: &Schema) -> datafusion_common::Result<bool> {
84        Ok(true)
85    }
86
87    fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
88        let arg = self.child.evaluate(batch)?;
89        match arg {
90            ColumnarValue::Array(array) => {
91                let array = array_with_timezone(
92                    array,
93                    self.timezone.clone(),
94                    Some(&DataType::Timestamp(
95                        Microsecond,
96                        Some(self.timezone.clone().into()),
97                    )),
98                )?;
99                let result = date_part(&array, DatePart::Minute)?;
100
101                Ok(ColumnarValue::Array(result))
102            }
103            _ => Err(DataFusionError::Execution(
104                "Minute(scalar) should be fold in Spark JVM side.".to_string(),
105            )),
106        }
107    }
108
109    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
110        vec![&self.child]
111    }
112
113    fn with_new_children(
114        self: Arc<Self>,
115        children: Vec<Arc<dyn PhysicalExpr>>,
116    ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
117        Ok(Arc::new(MinuteExpr::new(
118            Arc::clone(&children[0]),
119            self.timezone.clone(),
120        )))
121    }
122}