datafusion_comet_spark_expr/datetime_funcs/
minute.rs1use crate::utils::array_with_timezone;
19use arrow::{
20 compute::{date_part, DatePart},
21 record_batch::RecordBatch,
22};
23use arrow_schema::{DataType, Schema, TimeUnit::Microsecond};
24use datafusion::logical_expr::ColumnarValue;
25use datafusion_common::DataFusionError;
26use datafusion_physical_expr::PhysicalExpr;
27use std::hash::Hash;
28use std::{
29 any::Any,
30 fmt::{Debug, Display, Formatter},
31 sync::Arc,
32};
33
34#[derive(Debug, Eq)]
35pub struct MinuteExpr {
36 child: Arc<dyn PhysicalExpr>,
38 timezone: String,
39}
40
41impl Hash for MinuteExpr {
42 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
43 self.child.hash(state);
44 self.timezone.hash(state);
45 }
46}
47impl PartialEq for MinuteExpr {
48 fn eq(&self, other: &Self) -> bool {
49 self.child.eq(&other.child) && self.timezone.eq(&other.timezone)
50 }
51}
52
53impl MinuteExpr {
54 pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self {
55 MinuteExpr { child, timezone }
56 }
57}
58
59impl Display for MinuteExpr {
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 write!(
62 f,
63 "Minute [timezone:{}, child: {}]",
64 self.timezone, self.child
65 )
66 }
67}
68
69impl PhysicalExpr for MinuteExpr {
70 fn as_any(&self) -> &dyn Any {
71 self
72 }
73
74 fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
75 match self.child.data_type(input_schema).unwrap() {
76 DataType::Dictionary(key_type, _) => {
77 Ok(DataType::Dictionary(key_type, Box::new(DataType::Int32)))
78 }
79 _ => Ok(DataType::Int32),
80 }
81 }
82
83 fn nullable(&self, _: &Schema) -> datafusion_common::Result<bool> {
84 Ok(true)
85 }
86
87 fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
88 let arg = self.child.evaluate(batch)?;
89 match arg {
90 ColumnarValue::Array(array) => {
91 let array = array_with_timezone(
92 array,
93 self.timezone.clone(),
94 Some(&DataType::Timestamp(
95 Microsecond,
96 Some(self.timezone.clone().into()),
97 )),
98 )?;
99 let result = date_part(&array, DatePart::Minute)?;
100
101 Ok(ColumnarValue::Array(result))
102 }
103 _ => Err(DataFusionError::Execution(
104 "Minute(scalar) should be fold in Spark JVM side.".to_string(),
105 )),
106 }
107 }
108
109 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
110 vec![&self.child]
111 }
112
113 fn with_new_children(
114 self: Arc<Self>,
115 children: Vec<Arc<dyn PhysicalExpr>>,
116 ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
117 Ok(Arc::new(MinuteExpr::new(
118 Arc::clone(&children[0]),
119 self.timezone.clone(),
120 )))
121 }
122}