datafusion_spark/function/datetime/
unix.rs1use std::sync::Arc;
19
20use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
21use datafusion_common::types::logical_date;
22use datafusion_common::utils::take_function_args;
23use datafusion_common::{Result, internal_err};
24use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
25use datafusion_expr::{
26 Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs,
27 ScalarUDFImpl, Signature, TypeSignatureClass, Volatility,
28};
29
30#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct SparkUnixDate {
34 signature: Signature,
35}
36
37impl Default for SparkUnixDate {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl SparkUnixDate {
44 pub fn new() -> Self {
45 Self {
46 signature: Signature::coercible(
47 vec![Coercion::new_exact(TypeSignatureClass::Native(
48 logical_date(),
49 ))],
50 Volatility::Immutable,
51 ),
52 }
53 }
54}
55
56impl ScalarUDFImpl for SparkUnixDate {
57 fn name(&self) -> &str {
58 "unix_date"
59 }
60
61 fn signature(&self) -> &Signature {
62 &self.signature
63 }
64
65 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
66 internal_err!("return_field_from_args should be used instead")
67 }
68
69 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
70 let nullable = args.arg_fields[0].is_nullable();
71 Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable)))
72 }
73
74 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
75 internal_err!("invoke_with_args should not be called on SparkUnixDate")
76 }
77
78 fn simplify(
79 &self,
80 args: Vec<Expr>,
81 info: &SimplifyContext,
82 ) -> Result<ExprSimplifyResult> {
83 let [date] = take_function_args(self.name(), args)?;
84 Ok(ExprSimplifyResult::Simplified(
85 date.cast_to(&DataType::Date32, info.schema())?
86 .cast_to(&DataType::Int32, info.schema())?,
87 ))
88 }
89}
90
91#[derive(Debug, PartialEq, Eq, Hash)]
92pub struct SparkUnixTimestamp {
93 time_unit: TimeUnit,
94 signature: Signature,
95 name: &'static str,
96}
97
98impl SparkUnixTimestamp {
99 pub fn new(name: &'static str, time_unit: TimeUnit) -> Self {
100 Self {
101 signature: Signature::coercible(
102 vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
103 Volatility::Immutable,
104 ),
105 time_unit,
106 name,
107 }
108 }
109
110 pub fn microseconds() -> Self {
113 Self::new("unix_micros", TimeUnit::Microsecond)
114 }
115
116 pub fn milliseconds() -> Self {
119 Self::new("unix_millis", TimeUnit::Millisecond)
120 }
121
122 pub fn seconds() -> Self {
125 Self::new("unix_seconds", TimeUnit::Second)
126 }
127}
128
129impl ScalarUDFImpl for SparkUnixTimestamp {
130 fn name(&self) -> &str {
131 self.name
132 }
133
134 fn signature(&self) -> &Signature {
135 &self.signature
136 }
137
138 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
139 internal_err!("return_field_from_args should be used instead")
140 }
141
142 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
143 let nullable = args.arg_fields[0].is_nullable();
144 Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable)))
145 }
146
147 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
148 internal_err!("invoke_with_args should not be called on `{}`", self.name())
149 }
150
151 fn simplify(
152 &self,
153 args: Vec<Expr>,
154 info: &SimplifyContext,
155 ) -> Result<ExprSimplifyResult> {
156 let [ts] = take_function_args(self.name(), args)?;
157 Ok(ExprSimplifyResult::Simplified(
158 ts.cast_to(
159 &DataType::Timestamp(self.time_unit, Some("UTC".into())),
160 info.schema(),
161 )?
162 .cast_to(&DataType::Int64, info.schema())?,
163 ))
164 }
165}