Skip to main content

datafusion_spark/function/datetime/
unix.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
use datafusion_common::types::logical_date;
use datafusion_common::utils::take_function_args;
use datafusion_common::{Result, internal_err};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
use datafusion_expr::{
    Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs,
    ScalarUDFImpl, Signature, TypeSignatureClass, Volatility, lit, when,
};
29
30/// Returns the number of days since epoch (1970-01-01) for the given date.
31/// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_date>
32#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct SparkUnixDate {
34    signature: Signature,
35}
36
37impl Default for SparkUnixDate {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl SparkUnixDate {
44    pub fn new() -> Self {
45        Self {
46            signature: Signature::coercible(
47                vec![Coercion::new_exact(TypeSignatureClass::Native(
48                    logical_date(),
49                ))],
50                Volatility::Immutable,
51            ),
52        }
53    }
54}
55
56impl ScalarUDFImpl for SparkUnixDate {
57    fn name(&self) -> &str {
58        "unix_date"
59    }
60
61    fn signature(&self) -> &Signature {
62        &self.signature
63    }
64
65    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
66        internal_err!("return_field_from_args should be used instead")
67    }
68
69    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
70        let nullable = args.arg_fields[0].is_nullable();
71        Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable)))
72    }
73
74    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
75        internal_err!("invoke_with_args should not be called on SparkUnixDate")
76    }
77
78    fn simplify(
79        &self,
80        args: Vec<Expr>,
81        info: &SimplifyContext,
82    ) -> Result<ExprSimplifyResult> {
83        let [date] = take_function_args(self.name(), args)?;
84        Ok(ExprSimplifyResult::Simplified(
85            date.cast_to(&DataType::Date32, info.schema())?
86                .cast_to(&DataType::Int32, info.schema())?,
87        ))
88    }
89}
90
91#[derive(Debug, PartialEq, Eq, Hash)]
92pub struct SparkUnixTimestamp {
93    time_unit: TimeUnit,
94    signature: Signature,
95    name: &'static str,
96}
97
98impl SparkUnixTimestamp {
99    pub fn new(name: &'static str, time_unit: TimeUnit) -> Self {
100        Self {
101            signature: Signature::coercible(
102                vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
103                Volatility::Immutable,
104            ),
105            time_unit,
106            name,
107        }
108    }
109
110    /// Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
111    /// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_micros>
112    pub fn microseconds() -> Self {
113        Self::new("unix_micros", TimeUnit::Microsecond)
114    }
115
116    /// Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
117    /// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_millis>
118    pub fn milliseconds() -> Self {
119        Self::new("unix_millis", TimeUnit::Millisecond)
120    }
121
122    /// Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
123    /// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_seconds>
124    pub fn seconds() -> Self {
125        Self::new("unix_seconds", TimeUnit::Second)
126    }
127}
128
129impl ScalarUDFImpl for SparkUnixTimestamp {
130    fn name(&self) -> &str {
131        self.name
132    }
133
134    fn signature(&self) -> &Signature {
135        &self.signature
136    }
137
138    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
139        internal_err!("return_field_from_args should be used instead")
140    }
141
142    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
143        let nullable = args.arg_fields[0].is_nullable();
144        Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable)))
145    }
146
147    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
148        internal_err!("invoke_with_args should not be called on `{}`", self.name())
149    }
150
151    fn simplify(
152        &self,
153        args: Vec<Expr>,
154        info: &SimplifyContext,
155    ) -> Result<ExprSimplifyResult> {
156        let [ts] = take_function_args(self.name(), args)?;
157        Ok(ExprSimplifyResult::Simplified(
158            ts.cast_to(
159                &DataType::Timestamp(self.time_unit, Some("UTC".into())),
160                info.schema(),
161            )?
162            .cast_to(&DataType::Int64, info.schema())?,
163        ))
164    }
165}