// datafusion_spark/function/datetime/unix.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
22use datafusion_common::types::logical_date;
23use datafusion_common::utils::take_function_args;
24use datafusion_common::{Result, internal_err};
25use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
26use datafusion_expr::{
27    Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs,
28    ScalarUDFImpl, Signature, TypeSignatureClass, Volatility,
29};
30
/// Returns the number of days since epoch (1970-01-01) for the given date.
/// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_date>
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkUnixDate {
    // Accepts exactly one argument coercible to the native logical date type
    // (see `SparkUnixDate::new`).
    signature: Signature,
}
37
impl Default for SparkUnixDate {
    // Delegate to the canonical constructor so the signature is always built
    // the same way.
    fn default() -> Self {
        Self::new()
    }
}
43
44impl SparkUnixDate {
45    pub fn new() -> Self {
46        Self {
47            signature: Signature::coercible(
48                vec![Coercion::new_exact(TypeSignatureClass::Native(
49                    logical_date(),
50                ))],
51                Volatility::Immutable,
52            ),
53        }
54    }
55}
56
57impl ScalarUDFImpl for SparkUnixDate {
58    fn as_any(&self) -> &dyn Any {
59        self
60    }
61
62    fn name(&self) -> &str {
63        "unix_date"
64    }
65
66    fn signature(&self) -> &Signature {
67        &self.signature
68    }
69
70    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
71        internal_err!("return_field_from_args should be used instead")
72    }
73
74    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
75        let nullable = args.arg_fields[0].is_nullable();
76        Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable)))
77    }
78
79    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
80        internal_err!("invoke_with_args should not be called on SparkUnixDate")
81    }
82
83    fn simplify(
84        &self,
85        args: Vec<Expr>,
86        info: &SimplifyContext,
87    ) -> Result<ExprSimplifyResult> {
88        let [date] = take_function_args(self.name(), args)?;
89        Ok(ExprSimplifyResult::Simplified(
90            date.cast_to(&DataType::Date32, info.schema())?
91                .cast_to(&DataType::Int32, info.schema())?,
92        ))
93    }
94}
95
/// Returns the number of `time_unit` ticks since epoch (1970-01-01 00:00:00 UTC)
/// for the given timestamp. Backs the Spark `unix_micros`, `unix_millis` and
/// `unix_seconds` functions via the constructors below.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkUnixTimestamp {
    // Unit the input timestamp is cast to before being reinterpreted as Int64.
    time_unit: TimeUnit,
    // Accepts exactly one timestamp-typed argument.
    signature: Signature,
    // Spark-facing function name ("unix_micros", "unix_millis" or "unix_seconds").
    name: &'static str,
}
102
103impl SparkUnixTimestamp {
104    pub fn new(name: &'static str, time_unit: TimeUnit) -> Self {
105        Self {
106            signature: Signature::coercible(
107                vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
108                Volatility::Immutable,
109            ),
110            time_unit,
111            name,
112        }
113    }
114
115    /// Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
116    /// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_micros>
117    pub fn microseconds() -> Self {
118        Self::new("unix_micros", TimeUnit::Microsecond)
119    }
120
121    /// Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
122    /// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_millis>
123    pub fn milliseconds() -> Self {
124        Self::new("unix_millis", TimeUnit::Millisecond)
125    }
126
127    /// Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
128    /// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_seconds>
129    pub fn seconds() -> Self {
130        Self::new("unix_seconds", TimeUnit::Second)
131    }
132}
133
134impl ScalarUDFImpl for SparkUnixTimestamp {
135    fn as_any(&self) -> &dyn Any {
136        self
137    }
138
139    fn name(&self) -> &str {
140        self.name
141    }
142
143    fn signature(&self) -> &Signature {
144        &self.signature
145    }
146
147    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
148        internal_err!("return_field_from_args should be used instead")
149    }
150
151    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
152        let nullable = args.arg_fields[0].is_nullable();
153        Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable)))
154    }
155
156    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
157        internal_err!("invoke_with_args should not be called on `{}`", self.name())
158    }
159
160    fn simplify(
161        &self,
162        args: Vec<Expr>,
163        info: &SimplifyContext,
164    ) -> Result<ExprSimplifyResult> {
165        let [ts] = take_function_args(self.name(), args)?;
166        Ok(ExprSimplifyResult::Simplified(
167            ts.cast_to(
168                &DataType::Timestamp(self.time_unit, Some("UTC".into())),
169                info.schema(),
170            )?
171            .cast_to(&DataType::Int64, info.schema())?,
172        ))
173    }
174}