datafusion_spark/function/datetime/
unix.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
22use datafusion_common::types::logical_date;
23use datafusion_common::utils::take_function_args;
24use datafusion_common::{Result, internal_err};
25use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
26use datafusion_expr::{
27 Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs,
28 ScalarUDFImpl, Signature, TypeSignatureClass, Volatility,
29};
30
31#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkUnixDate {
35 signature: Signature,
36}
37
38impl Default for SparkUnixDate {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl SparkUnixDate {
45 pub fn new() -> Self {
46 Self {
47 signature: Signature::coercible(
48 vec![Coercion::new_exact(TypeSignatureClass::Native(
49 logical_date(),
50 ))],
51 Volatility::Immutable,
52 ),
53 }
54 }
55}
56
57impl ScalarUDFImpl for SparkUnixDate {
58 fn as_any(&self) -> &dyn Any {
59 self
60 }
61
62 fn name(&self) -> &str {
63 "unix_date"
64 }
65
66 fn signature(&self) -> &Signature {
67 &self.signature
68 }
69
70 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
71 internal_err!("return_field_from_args should be used instead")
72 }
73
74 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
75 let nullable = args.arg_fields[0].is_nullable();
76 Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable)))
77 }
78
79 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
80 internal_err!("invoke_with_args should not be called on SparkUnixDate")
81 }
82
83 fn simplify(
84 &self,
85 args: Vec<Expr>,
86 info: &SimplifyContext,
87 ) -> Result<ExprSimplifyResult> {
88 let [date] = take_function_args(self.name(), args)?;
89 Ok(ExprSimplifyResult::Simplified(
90 date.cast_to(&DataType::Date32, info.schema())?
91 .cast_to(&DataType::Int32, info.schema())?,
92 ))
93 }
94}
95
96#[derive(Debug, PartialEq, Eq, Hash)]
97pub struct SparkUnixTimestamp {
98 time_unit: TimeUnit,
99 signature: Signature,
100 name: &'static str,
101}
102
103impl SparkUnixTimestamp {
104 pub fn new(name: &'static str, time_unit: TimeUnit) -> Self {
105 Self {
106 signature: Signature::coercible(
107 vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
108 Volatility::Immutable,
109 ),
110 time_unit,
111 name,
112 }
113 }
114
115 pub fn microseconds() -> Self {
118 Self::new("unix_micros", TimeUnit::Microsecond)
119 }
120
121 pub fn milliseconds() -> Self {
124 Self::new("unix_millis", TimeUnit::Millisecond)
125 }
126
127 pub fn seconds() -> Self {
130 Self::new("unix_seconds", TimeUnit::Second)
131 }
132}
133
134impl ScalarUDFImpl for SparkUnixTimestamp {
135 fn as_any(&self) -> &dyn Any {
136 self
137 }
138
139 fn name(&self) -> &str {
140 self.name
141 }
142
143 fn signature(&self) -> &Signature {
144 &self.signature
145 }
146
147 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
148 internal_err!("return_field_from_args should be used instead")
149 }
150
151 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
152 let nullable = args.arg_fields[0].is_nullable();
153 Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable)))
154 }
155
156 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
157 internal_err!("invoke_with_args should not be called on `{}`", self.name())
158 }
159
160 fn simplify(
161 &self,
162 args: Vec<Expr>,
163 info: &SimplifyContext,
164 ) -> Result<ExprSimplifyResult> {
165 let [ts] = take_function_args(self.name(), args)?;
166 Ok(ExprSimplifyResult::Simplified(
167 ts.cast_to(
168 &DataType::Timestamp(self.time_unit, Some("UTC".into())),
169 info.schema(),
170 )?
171 .cast_to(&DataType::Int64, info.schema())?,
172 ))
173 }
174}