use std::any::Any;
use std::num::NonZeroI64;
use std::ops::{Add, Sub};
use std::str::FromStr;
use std::sync::Arc;
use arrow::array::temporal_conversions::{
as_datetime_with_timezone, timestamp_ns_to_datetime,
};
use arrow::array::timezone::Tz;
use arrow::array::types::{
ArrowTimestampType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow::array::{Array, ArrayRef, PrimitiveArray};
use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8, Utf8View};
use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second};
use datafusion_common::cast::as_primitive_array;
use datafusion_common::{
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, plan_err,
};
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, TIMEZONE_WILDCARD, Volatility,
};
use datafusion_macros::user_doc;
use chrono::{
DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, TimeDelta, Timelike,
};
// Supported truncation targets for `date_trunc`, ordered from finest to
// coarsest. Sub-minute variants admit a pure integer fast path; `Hour` and
// `Day` do too, but only without a timezone; the rest require calendar math.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DateTruncGranularity {
    Microsecond,
    Millisecond,
    Second,
    Minute,
    Hour,
    Day,
    Week,
    Month,
    Quarter,
    Year,
}
impl DateTruncGranularity {
const SUPPORTED_GRANULARITIES: &[&str] = &[
"microsecond",
"millisecond",
"second",
"minute",
"hour",
"day",
"week",
"month",
"quarter",
"year",
];
fn from_str(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"microsecond" => Ok(Self::Microsecond),
"millisecond" => Ok(Self::Millisecond),
"second" => Ok(Self::Second),
"minute" => Ok(Self::Minute),
"hour" => Ok(Self::Hour),
"day" => Ok(Self::Day),
"week" => Ok(Self::Week),
"month" => Ok(Self::Month),
"quarter" => Ok(Self::Quarter),
"year" => Ok(Self::Year),
_ => {
let supported = Self::SUPPORTED_GRANULARITIES.join(", ");
exec_err!(
"Unsupported date_trunc granularity: '{s}'. Supported values are: {supported}"
)
}
}
}
fn is_fine_granularity(&self) -> bool {
matches!(
self,
Self::Second | Self::Minute | Self::Millisecond | Self::Microsecond
)
}
fn is_fine_granularity_utc(&self) -> bool {
self.is_fine_granularity() || matches!(self, Self::Hour | Self::Day)
}
}
#[user_doc(
doc_section(label = "Time and Date Functions"),
description = "Truncates a timestamp value to a specified precision.",
syntax_example = "date_trunc(precision, expression)",
argument(
name = "precision",
description = r#"Time precision to truncate to. The following precisions are supported:
- year / YEAR
- quarter / QUARTER
- month / MONTH
- week / WEEK
- day / DAY
- hour / HOUR
- minute / MINUTE
- second / SECOND
- millisecond / MILLISECOND
- microsecond / MICROSECOND
"#
),
argument(
name = "expression",
description = "Time expression to operate on. Can be a constant, column, or function."
)
)]
// `date_trunc` scalar UDF: truncates a timestamp to a requested precision.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DateTruncFunc {
    // Accepted (granularity-string, timestamp) type combinations;
    // built in `DateTruncFunc::new`.
    signature: Signature,
    // Alternative names this function answers to ("datetrunc").
    aliases: Vec<String>,
}

impl Default for DateTruncFunc {
    fn default() -> Self {
        Self::new()
    }
}
impl DateTruncFunc {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8View, Timestamp(Nanosecond, None)]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![
Utf8View,
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8View, Timestamp(Microsecond, None)]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![
Utf8View,
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8View, Timestamp(Millisecond, None)]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![
Utf8View,
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![Utf8View, Timestamp(Second, None)]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![
Utf8View,
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
]),
],
Volatility::Immutable,
),
aliases: vec![String::from("datetrunc")],
}
}
}
impl ScalarUDFImpl for DateTruncFunc {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "date_trunc"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    // The output keeps the input timestamp's unit and timezone. Coercible
    // non-timestamp inputs (Utf8, Date32, Null) map to nanosecond precision
    // with no timezone.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        match &arg_types[1] {
            Timestamp(Nanosecond, None) | Utf8 | DataType::Date32 | Null => {
                Ok(Timestamp(Nanosecond, None))
            }
            Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
            Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
            Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
            Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
            _ => plan_err!(
                "The date_trunc function can only accept timestamp as the second arg."
            ),
        }
    }

    // Evaluates `date_trunc(granularity, timestamp)`. The granularity must
    // be a non-null Utf8/Utf8View scalar; the timestamp may be a scalar or
    // an array of any timestamp unit, with or without a timezone.
    fn invoke_with_args(
        &self,
        args: datafusion_expr::ScalarFunctionArgs,
    ) -> Result<ColumnarValue> {
        let args = args.args;
        let (granularity, array) = (&args[0], &args[1]);

        // Accept both string encodings for the granularity argument.
        let granularity_str = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) =
            granularity
        {
            v.to_lowercase()
        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = granularity
        {
            v.to_lowercase()
        } else {
            return exec_err!("Granularity of `date_trunc` must be non-null scalar Utf8");
        };
        let granularity = DateTruncGranularity::from_str(&granularity_str)?;

        // Truncates every element of a timestamp array. Takes the integer
        // fast path when the bucket width is fixed (always for sub-hour
        // granularities; for hour/day only when there is no timezone).
        fn process_array<T: ArrowTimestampType>(
            array: &dyn Array,
            granularity: DateTruncGranularity,
            tz_opt: &Option<Arc<str>>,
        ) -> Result<ColumnarValue> {
            let parsed_tz = parse_tz(tz_opt)?;
            let array = as_primitive_array::<T>(array)?;
            if granularity.is_fine_granularity()
                || (parsed_tz.is_none() && granularity.is_fine_granularity_utc())
            {
                let result = general_date_trunc_array_fine_granularity(
                    T::UNIT,
                    array,
                    granularity,
                    tz_opt.clone(),
                )?;
                return Ok(ColumnarValue::Array(result));
            }
            // Calendar-aware slow path: truncate element by element.
            let array: PrimitiveArray<T> = array
                .try_unary(|x| general_date_trunc(T::UNIT, x, parsed_tz, granularity))?
                .with_timezone_opt(tz_opt.clone());
            Ok(ColumnarValue::Array(Arc::new(array)))
        }

        // Truncates a single scalar timestamp; a null input stays null.
        fn process_scalar<T: ArrowTimestampType>(
            v: &Option<i64>,
            granularity: DateTruncGranularity,
            tz_opt: &Option<Arc<str>>,
        ) -> Result<ColumnarValue> {
            let parsed_tz = parse_tz(tz_opt)?;
            let value = if let Some(v) = v {
                Some(general_date_trunc(T::UNIT, *v, parsed_tz, granularity)?)
            } else {
                None
            };
            let value = ScalarValue::new_timestamp::<T>(value, tz_opt.clone());
            Ok(ColumnarValue::Scalar(value))
        }

        Ok(match array {
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
                process_scalar::<TimestampNanosecondType>(v, granularity, tz_opt)?
            }
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
                process_scalar::<TimestampMicrosecondType>(v, granularity, tz_opt)?
            }
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
                process_scalar::<TimestampMillisecondType>(v, granularity, tz_opt)?
            }
            ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
                process_scalar::<TimestampSecondType>(v, granularity, tz_opt)?
            }
            ColumnarValue::Array(array) => {
                let array_type = array.data_type();
                if let Timestamp(unit, tz_opt) = array_type {
                    match unit {
                        Second => process_array::<TimestampSecondType>(
                            array,
                            granularity,
                            tz_opt,
                        )?,
                        Millisecond => process_array::<TimestampMillisecondType>(
                            array,
                            granularity,
                            tz_opt,
                        )?,
                        Microsecond => process_array::<TimestampMicrosecondType>(
                            array,
                            granularity,
                            tz_opt,
                        )?,
                        Nanosecond => process_array::<TimestampNanosecondType>(
                            array,
                            granularity,
                            tz_opt,
                        )?,
                    }
                } else {
                    return exec_err!(
                        "second argument of `date_trunc` is an unsupported array type: {array_type}"
                    );
                }
            }
            _ => {
                return exec_err!(
                    "second argument of `date_trunc` must be timestamp scalar or array"
                );
            }
        })
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    // Truncation is monotonic, so when the granularity is a single constant
    // value the output preserves the input's sort order.
    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
        let precision = &input[0];
        let date_value = &input[1];
        if precision.sort_properties.eq(&SortProperties::Singleton) {
            Ok(date_value.sort_properties)
        } else {
            Ok(SortProperties::Unordered)
        }
    }

    fn documentation(&self) -> Option<&Documentation> {
        self.doc()
    }
}
/// Calendar-aware truncation of a datetime-like value to `granularity`.
///
/// Generic over any chrono-style type exposing date/time accessors, so the
/// same code serves both naive and timezone-aware datetimes. Returns
/// `Ok(None)` when the input is `None` or a component adjustment is not
/// representable.
fn _date_trunc_coarse<T>(
    granularity: DateTruncGranularity,
    value: Option<T>,
) -> Result<Option<T>>
where
    T: Datelike + Timelike + Sub<Duration, Output = T> + Copy,
{
    let truncated = value.and_then(|d| match granularity {
        // Sub-millisecond granularities are handled by the integer fast
        // paths elsewhere; here they are a no-op.
        DateTruncGranularity::Millisecond | DateTruncGranularity::Microsecond => Some(d),
        DateTruncGranularity::Second => d.with_nanosecond(0),
        DateTruncGranularity::Minute => d.with_nanosecond(0)?.with_second(0),
        DateTruncGranularity::Hour => {
            d.with_nanosecond(0)?.with_second(0)?.with_minute(0)
        }
        DateTruncGranularity::Day => {
            d.with_nanosecond(0)?.with_second(0)?.with_minute(0)?.with_hour(0)
        }
        DateTruncGranularity::Week => {
            // Truncate to the day, then step back to the most recent Monday
            // (`weekday() as i64` is 0 for Monday).
            let day = d
                .with_nanosecond(0)?
                .with_second(0)?
                .with_minute(0)?
                .with_hour(0)?;
            Some(day - TimeDelta::try_seconds(60 * 60 * 24 * day.weekday() as i64).unwrap())
        }
        DateTruncGranularity::Month => {
            d.with_nanosecond(0)?.with_second(0)?.with_minute(0)?.with_hour(0)?.with_day0(0)
        }
        DateTruncGranularity::Quarter => {
            // Truncate to the month first, then snap the month to the start
            // of its quarter.
            let month_start = d
                .with_nanosecond(0)?
                .with_second(0)?
                .with_minute(0)?
                .with_hour(0)?
                .with_day0(0)?;
            month_start.with_month(quarter_month(&month_start))
        }
        DateTruncGranularity::Year => d
            .with_nanosecond(0)?
            .with_second(0)?
            .with_minute(0)?
            .with_hour(0)?
            .with_day0(0)?
            .with_month0(0),
    });
    Ok(truncated)
}
/// First month (1-based) of the quarter containing `date`'s month:
/// always 1, 4, 7, or 10.
fn quarter_month<T>(date: &T) -> u32
where
    T: Datelike,
{
    ((date.month() - 1) / 3) * 3 + 1
}
// Truncates a timezone-aware datetime and converts the result back to
// nanoseconds since the Unix epoch.
//
// Truncation is performed on the local (wall-clock) time so that day/month
// boundaries follow the timezone; the truncated local time must then be
// mapped back onto the timezone, which requires resolving DST gaps and
// ambiguities.
fn _date_trunc_coarse_with_tz(
    granularity: DateTruncGranularity,
    value: Option<DateTime<Tz>>,
) -> Result<Option<i64>> {
    if let Some(value) = value {
        let local = value.naive_local();
        let truncated = _date_trunc_coarse::<NaiveDateTime>(granularity, Some(local))?;
        let truncated = truncated.and_then(|truncated| {
            match truncated.and_local_timezone(value.timezone()) {
                // The truncated local time does not exist (it was skipped by
                // a forward DST transition). Move back 3 hours to a valid
                // local time, resolve it, then add the 3 hours back.
                // NOTE(review): assumes 3h exceeds any real transition gap —
                // confirm for zones with unusual offset changes.
                LocalResult::None => {
                    truncated
                        .sub(TimeDelta::try_hours(3).unwrap())
                        .and_local_timezone(value.timezone())
                        .single()
                        .map(|v| v.add(TimeDelta::try_hours(3).unwrap()))
                }
                LocalResult::Single(datetime) => Some(datetime),
                // The local time occurs twice (backward transition): prefer
                // the instant whose UTC offset matches the input's.
                LocalResult::Ambiguous(datetime1, datetime2) => {
                    if datetime1.offset().fix() == value.offset().fix() {
                        Some(datetime1)
                    } else {
                        Some(datetime2)
                    }
                }
            }
        });
        // `timestamp_nanos_opt` is None when the result is outside the
        // representable nanosecond range.
        Ok(truncated.and_then(|value| value.timestamp_nanos_opt()))
    } else {
        // Run the helper for its error behavior, then propagate the null.
        _date_trunc_coarse::<NaiveDateTime>(granularity, None)?;
        Ok(None)
    }
}
/// Truncates a timezone-naive datetime (UTC semantics) and converts the
/// result to nanoseconds since the Unix epoch.
fn _date_trunc_coarse_without_tz(
    granularity: DateTruncGranularity,
    value: Option<NaiveDateTime>,
) -> Result<Option<i64>> {
    Ok(match _date_trunc_coarse::<NaiveDateTime>(granularity, value)? {
        Some(truncated) => truncated.and_utc().timestamp_nanos_opt(),
        None => None,
    })
}
/// Truncates `value` (nanoseconds since the Unix epoch) to `granularity`,
/// using calendar-aware local-time logic when a timezone is given.
///
/// # Errors
/// Returns an execution error when `value` cannot be represented as a
/// datetime, or when the truncated result falls outside the representable
/// nanosecond range (previously this last case panicked via `unwrap`).
fn date_trunc_coarse(
    granularity: DateTruncGranularity,
    value: i64,
    tz: Option<Tz>,
) -> Result<i64> {
    let truncated = match tz {
        Some(tz) => {
            // Truncate in local (wall-clock) time so that day/month
            // boundaries follow the timezone rather than UTC.
            let value = as_datetime_with_timezone::<TimestampNanosecondType>(value, tz)
                .ok_or_else(|| exec_datafusion_err!("Timestamp {value} out of range"))?;
            _date_trunc_coarse_with_tz(granularity, Some(value))
        }
        None => {
            let value = timestamp_ns_to_datetime(value)
                .ok_or_else(|| exec_datafusion_err!("Timestamp {value} out of range"))?;
            _date_trunc_coarse_without_tz(granularity, Some(value))
        }
    }?;

    // The helpers only return `None` here when the truncated datetime cannot
    // be expressed in nanoseconds; surface that as an error, not a panic.
    truncated.ok_or_else(|| exec_datafusion_err!("Timestamp {value} out of range"))
}
/// Fast path for fixed-width granularities over a whole array: truncation is
/// one `rem_euclid` per element, with no calendar or timezone math.
///
/// Calendar-based granularities (week and coarser), or granularities at or
/// below the array's own resolution, are not handled here; the input array
/// is returned unchanged in those cases.
fn general_date_trunc_array_fine_granularity<T: ArrowTimestampType>(
    tu: TimeUnit,
    array: &PrimitiveArray<T>,
    granularity: DateTruncGranularity,
    tz_opt: Option<Arc<str>>,
) -> Result<ArrayRef> {
    // Width of one truncation bucket, in nanoseconds. `None` marks the
    // calendar-based granularities without a fixed width.
    let bucket_nanos: Option<i64> = match granularity {
        DateTruncGranularity::Microsecond => Some(1_000),
        DateTruncGranularity::Millisecond => Some(1_000_000),
        DateTruncGranularity::Second => Some(1_000_000_000),
        DateTruncGranularity::Minute => Some(60 * 1_000_000_000),
        DateTruncGranularity::Hour => Some(3_600 * 1_000_000_000),
        DateTruncGranularity::Day => Some(86_400 * 1_000_000_000),
        _ => None,
    };
    // Nanoseconds per tick of the array's native time unit.
    let tick_nanos: i64 = match tu {
        Second => 1_000_000_000,
        Millisecond => 1_000_000,
        Microsecond => 1_000,
        Nanosecond => 1,
    };
    // A bucket no wider than one tick makes truncation a no-op; otherwise
    // the step is the bucket width measured in ticks.
    let step = bucket_nanos
        .filter(|bucket| *bucket > tick_nanos)
        .and_then(|bucket| NonZeroI64::new(bucket / tick_nanos));

    if let Some(step) = step {
        let step = step.get();
        // `rem_euclid` floors toward negative infinity, so pre-epoch values
        // truncate toward the past just like positive ones.
        let truncated = PrimitiveArray::<T>::from_iter_values_with_nulls(
            array.values().iter().map(|v| *v - v.rem_euclid(step)),
            array.nulls().cloned(),
        )
        .with_timezone_opt(tz_opt);
        Ok(Arc::new(truncated))
    } else {
        Ok(Arc::new(array.clone()))
    }
}
fn general_date_trunc(
tu: TimeUnit,
value: i64,
tz: Option<Tz>,
granularity: DateTruncGranularity,
) -> Result<i64, DataFusionError> {
let scale = match tu {
Second => 1_000_000_000,
Millisecond => 1_000_000,
Microsecond => 1_000,
Nanosecond => 1,
};
let nano = date_trunc_coarse(granularity, scale * value, tz)?;
let result = match tu {
Second => match granularity {
DateTruncGranularity::Minute => nano / 1_000_000_000 / 60 * 60,
_ => nano / 1_000_000_000,
},
Millisecond => match granularity {
DateTruncGranularity::Minute => nano / 1_000_000 / 1_000 / 60 * 1_000 * 60,
DateTruncGranularity::Second => nano / 1_000_000 / 1_000 * 1_000,
_ => nano / 1_000_000,
},
Microsecond => match granularity {
DateTruncGranularity::Minute => {
nano / 1_000 / 1_000_000 / 60 * 60 * 1_000_000
}
DateTruncGranularity::Second => nano / 1_000 / 1_000_000 * 1_000_000,
DateTruncGranularity::Millisecond => nano / 1_000 / 1_000 * 1_000,
_ => nano / 1_000,
},
_ => match granularity {
DateTruncGranularity::Minute => {
nano / 1_000_000_000 / 60 * 1_000_000_000 * 60
}
DateTruncGranularity::Second => nano / 1_000_000_000 * 1_000_000_000,
DateTruncGranularity::Millisecond => nano / 1_000_000 * 1_000_000,
DateTruncGranularity::Microsecond => nano / 1_000 * 1_000,
_ => nano,
},
};
Ok(result)
}
/// Parses an optional timezone string into an Arrow [`Tz`].
///
/// `None` stays `None`; an unparsable timezone becomes an execution error.
fn parse_tz(tz: &Option<Arc<str>>) -> Result<Option<Tz>> {
    match tz.as_deref() {
        None => Ok(None),
        Some(tz) => Tz::from_str(tz)
            .map(Some)
            .map_err(|op| exec_datafusion_err!("failed on timezone {tz}: {op:?}")),
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::datetime::date_trunc::{
DateTruncFunc, DateTruncGranularity, date_trunc_coarse,
};
use arrow::array::cast::as_primitive_array;
use arrow::array::types::TimestampNanosecondType;
use arrow::array::{Array, TimestampNanosecondArray};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
#[test]
fn date_trunc_test() {
let cases = vec![
(
"2020-09-08T13:42:29.190855Z",
"second",
"2020-09-08T13:42:29.000000Z",
),
(
"2020-09-08T13:42:29.190855Z",
"minute",
"2020-09-08T13:42:00.000000Z",
),
(
"2020-09-08T13:42:29.190855Z",
"hour",
"2020-09-08T13:00:00.000000Z",
),
(
"2020-09-08T13:42:29.190855Z",
"day",
"2020-09-08T00:00:00.000000Z",
),
(
"2020-09-08T13:42:29.190855Z",
"week",
"2020-09-07T00:00:00.000000Z",
),
(
"2020-09-08T13:42:29.190855Z",
"month",
"2020-09-01T00:00:00.000000Z",
),
(
"2020-09-08T13:42:29.190855Z",
"year",
"2020-01-01T00:00:00.000000Z",
),
(
"2021-01-01T13:42:29.190855Z",
"week",
"2020-12-28T00:00:00.000000Z",
),
(
"2020-01-01T13:42:29.190855Z",
"week",
"2019-12-30T00:00:00.000000Z",
),
(
"2020-01-01T13:42:29.190855Z",
"quarter",
"2020-01-01T00:00:00.000000Z",
),
(
"2020-02-01T13:42:29.190855Z",
"quarter",
"2020-01-01T00:00:00.000000Z",
),
(
"2020-03-01T13:42:29.190855Z",
"quarter",
"2020-01-01T00:00:00.000000Z",
),
(
"2020-04-01T13:42:29.190855Z",
"quarter",
"2020-04-01T00:00:00.000000Z",
),
(
"2020-08-01T13:42:29.190855Z",
"quarter",
"2020-07-01T00:00:00.000000Z",
),
(
"2020-11-01T13:42:29.190855Z",
"quarter",
"2020-10-01T00:00:00.000000Z",
),
(
"2020-12-01T13:42:29.190855Z",
"quarter",
"2020-10-01T00:00:00.000000Z",
),
];
cases.iter().for_each(|(original, granularity, expected)| {
let left = string_to_timestamp_nanos(original).unwrap();
let right = string_to_timestamp_nanos(expected).unwrap();
let granularity_enum = DateTruncGranularity::from_str(granularity).unwrap();
let result = date_trunc_coarse(granularity_enum, left, None).unwrap();
assert_eq!(result, right, "{original} = {expected}");
});
}
#[test]
fn test_date_trunc_timezones() {
let cases = [
(
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
Some("+00".into()),
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
],
),
(
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
None,
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
],
),
(
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
Some("-02".into()),
vec![
"2020-09-07T02:00:00Z",
"2020-09-07T02:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T02:00:00Z",
],
),
(
vec![
"2020-09-08T00:00:00+05",
"2020-09-08T01:00:00+05",
"2020-09-08T02:00:00+05",
"2020-09-08T03:00:00+05",
"2020-09-08T04:00:00+05",
],
Some("+05".into()),
vec![
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
],
),
(
vec![
"2020-09-08T00:00:00+08",
"2020-09-08T01:00:00+08",
"2020-09-08T02:00:00+08",
"2020-09-08T03:00:00+08",
"2020-09-08T04:00:00+08",
],
Some("+08".into()),
vec![
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
],
),
(
vec![
"2024-10-26T23:00:00Z",
"2024-10-27T00:00:00Z",
"2024-10-27T01:00:00Z",
"2024-10-27T02:00:00Z",
],
Some("Europe/Berlin".into()),
vec![
"2024-10-27T00:00:00+02",
"2024-10-27T00:00:00+02",
"2024-10-27T00:00:00+02",
"2024-10-27T00:00:00+02",
],
),
(
vec![
"2018-02-18T00:00:00Z",
"2018-02-18T01:00:00Z",
"2018-02-18T02:00:00Z",
"2018-02-18T03:00:00Z",
"2018-11-04T01:00:00Z",
"2018-11-04T02:00:00Z",
"2018-11-04T03:00:00Z",
"2018-11-04T04:00:00Z",
],
Some("America/Sao_Paulo".into()),
vec![
"2018-02-17T00:00:00-02",
"2018-02-17T00:00:00-02",
"2018-02-17T00:00:00-02",
"2018-02-18T00:00:00-03",
"2018-11-03T00:00:00-03",
"2018-11-03T00:00:00-03",
"2018-11-04T01:00:00-02",
"2018-11-04T01:00:00-02",
],
),
];
cases.iter().for_each(|(original, tz_opt, expected)| {
let input = original
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let right = expected
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let batch_len = input.len();
let arg_fields = vec![
Field::new("a", DataType::Utf8, false).into(),
Field::new("b", input.data_type().clone(), false).into(),
];
let args = datafusion_expr::ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::from("day")),
ColumnarValue::Array(Arc::new(input)),
],
arg_fields,
number_rows: batch_len,
return_field: Field::new(
"f",
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
true,
)
.into(),
config_options: Arc::new(ConfigOptions::default()),
};
let result = DateTruncFunc::new().invoke_with_args(args).unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
result.data_type(),
&DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
);
let left = as_primitive_array::<TimestampNanosecondType>(&result);
assert_eq!(left, &right);
} else {
panic!("unexpected column type");
}
});
}
#[test]
fn test_date_trunc_hour_timezones() {
let cases = [
(
vec![
"2020-09-08T00:30:00Z",
"2020-09-08T01:30:00Z",
"2020-09-08T02:30:00Z",
"2020-09-08T03:30:00Z",
"2020-09-08T04:30:00Z",
],
Some("+00".into()),
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
),
(
vec![
"2020-09-08T00:30:00Z",
"2020-09-08T01:30:00Z",
"2020-09-08T02:30:00Z",
"2020-09-08T03:30:00Z",
"2020-09-08T04:30:00Z",
],
None,
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
),
(
vec![
"2020-09-08T00:30:00Z",
"2020-09-08T01:30:00Z",
"2020-09-08T02:30:00Z",
"2020-09-08T03:30:00Z",
"2020-09-08T04:30:00Z",
],
Some("-02".into()),
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
),
(
vec![
"2020-09-08T00:30:00+05",
"2020-09-08T01:30:00+05",
"2020-09-08T02:30:00+05",
"2020-09-08T03:30:00+05",
"2020-09-08T04:30:00+05",
],
Some("+05".into()),
vec![
"2020-09-08T00:00:00+05",
"2020-09-08T01:00:00+05",
"2020-09-08T02:00:00+05",
"2020-09-08T03:00:00+05",
"2020-09-08T04:00:00+05",
],
),
(
vec![
"2020-09-08T00:30:00+08",
"2020-09-08T01:30:00+08",
"2020-09-08T02:30:00+08",
"2020-09-08T03:30:00+08",
"2020-09-08T04:30:00+08",
],
Some("+08".into()),
vec![
"2020-09-08T00:00:00+08",
"2020-09-08T01:00:00+08",
"2020-09-08T02:00:00+08",
"2020-09-08T03:00:00+08",
"2020-09-08T04:00:00+08",
],
),
(
vec![
"2024-10-26T23:30:00Z",
"2024-10-27T00:30:00Z",
"2024-10-27T01:30:00Z",
"2024-10-27T02:30:00Z",
],
Some("Europe/Berlin".into()),
vec![
"2024-10-27T01:00:00+02",
"2024-10-27T02:00:00+02",
"2024-10-27T02:00:00+01",
"2024-10-27T03:00:00+01",
],
),
(
vec![
"2018-02-18T00:30:00Z",
"2018-02-18T01:30:00Z",
"2018-02-18T02:30:00Z",
"2018-02-18T03:30:00Z",
"2018-11-04T01:00:00Z",
"2018-11-04T02:00:00Z",
"2018-11-04T03:00:00Z",
"2018-11-04T04:00:00Z",
],
Some("America/Sao_Paulo".into()),
vec![
"2018-02-17T22:00:00-02",
"2018-02-17T23:00:00-02",
"2018-02-17T23:00:00-03",
"2018-02-18T00:00:00-03",
"2018-11-03T22:00:00-03",
"2018-11-03T23:00:00-03",
"2018-11-04T01:00:00-02",
"2018-11-04T02:00:00-02",
],
),
(
vec![
"2024-10-26T23:30:00Z",
"2024-10-27T00:30:00Z",
"2024-10-27T01:30:00Z",
"2024-10-27T02:30:00Z",
],
Some("Asia/Kathmandu".into()), vec![
"2024-10-27T05:00:00+05:45",
"2024-10-27T06:00:00+05:45",
"2024-10-27T07:00:00+05:45",
"2024-10-27T08:00:00+05:45",
],
),
];
cases.iter().for_each(|(original, tz_opt, expected)| {
let input = original
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let right = expected
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let batch_len = input.len();
let arg_fields = vec![
Field::new("a", DataType::Utf8, false).into(),
Field::new("b", input.data_type().clone(), false).into(),
];
let args = datafusion_expr::ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::from("hour")),
ColumnarValue::Array(Arc::new(input)),
],
arg_fields,
number_rows: batch_len,
return_field: Field::new(
"f",
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
true,
)
.into(),
config_options: Arc::new(ConfigOptions::default()),
};
let result = DateTruncFunc::new().invoke_with_args(args).unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
result.data_type(),
&DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
);
let left = as_primitive_array::<TimestampNanosecondType>(&result);
assert_eq!(left, &right);
} else {
panic!("unexpected column type");
}
});
}
#[test]
fn test_date_trunc_fine_granularity_timezones() {
let cases = [
(
vec![
"2020-09-08T13:42:29.190855Z",
"2020-09-08T13:42:30.500000Z",
"2020-09-08T13:42:31.999999Z",
],
Some("+00".into()),
"second",
vec![
"2020-09-08T13:42:29.000000Z",
"2020-09-08T13:42:30.000000Z",
"2020-09-08T13:42:31.000000Z",
],
),
(
vec![
"2020-09-08T13:42:29.190855+05",
"2020-09-08T13:42:30.500000+05",
"2020-09-08T13:42:31.999999+05",
],
Some("+05".into()),
"second",
vec![
"2020-09-08T13:42:29.000000+05",
"2020-09-08T13:42:30.000000+05",
"2020-09-08T13:42:31.000000+05",
],
),
(
vec![
"2020-09-08T13:42:29.190855Z",
"2020-09-08T13:42:30.500000Z",
"2020-09-08T13:42:31.999999Z",
],
Some("Europe/Berlin".into()),
"second",
vec![
"2020-09-08T13:42:29.000000Z",
"2020-09-08T13:42:30.000000Z",
"2020-09-08T13:42:31.000000Z",
],
),
(
vec![
"2020-09-08T13:42:29.190855Z",
"2020-09-08T13:43:30.500000Z",
"2020-09-08T13:44:31.999999Z",
],
Some("+00".into()),
"minute",
vec![
"2020-09-08T13:42:00.000000Z",
"2020-09-08T13:43:00.000000Z",
"2020-09-08T13:44:00.000000Z",
],
),
(
vec![
"2020-09-08T13:42:29.190855+08",
"2020-09-08T13:43:30.500000+08",
"2020-09-08T13:44:31.999999+08",
],
Some("+08".into()),
"minute",
vec![
"2020-09-08T13:42:00.000000+08",
"2020-09-08T13:43:00.000000+08",
"2020-09-08T13:44:00.000000+08",
],
),
(
vec![
"2020-09-08T13:42:29.190855Z",
"2020-09-08T13:43:30.500000Z",
"2020-09-08T13:44:31.999999Z",
],
Some("America/Sao_Paulo".into()),
"minute",
vec![
"2020-09-08T13:42:00.000000Z",
"2020-09-08T13:43:00.000000Z",
"2020-09-08T13:44:00.000000Z",
],
),
(
vec![
"2020-09-08T13:42:29.190855Z",
"2020-09-08T13:43:30.500000Z",
"2020-09-08T13:44:31.999999Z",
],
None,
"minute",
vec![
"2020-09-08T13:42:00.000000Z",
"2020-09-08T13:43:00.000000Z",
"2020-09-08T13:44:00.000000Z",
],
),
(
vec![
"2020-09-08T13:42:29.190855Z",
"2020-09-08T13:42:29.191999Z",
"2020-09-08T13:42:29.192500Z",
],
Some("Asia/Kolkata".into()),
"millisecond",
vec![
"2020-09-08T19:12:29.190000+05:30",
"2020-09-08T19:12:29.191000+05:30",
"2020-09-08T19:12:29.192000+05:30",
],
),
];
cases
.iter()
.for_each(|(original, tz_opt, granularity, expected)| {
let input = original
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let right = expected
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let batch_len = input.len();
let arg_fields = vec![
Field::new("a", DataType::Utf8, false).into(),
Field::new("b", input.data_type().clone(), false).into(),
];
let args = datafusion_expr::ScalarFunctionArgs {
args: vec![
ColumnarValue::Scalar(ScalarValue::from(*granularity)),
ColumnarValue::Array(Arc::new(input)),
],
arg_fields,
number_rows: batch_len,
return_field: Field::new(
"f",
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
true,
)
.into(),
config_options: Arc::new(ConfigOptions::default()),
};
let result = DateTruncFunc::new().invoke_with_args(args).unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
result.data_type(),
&DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
"Failed for granularity: {granularity}, timezone: {tz_opt:?}"
);
let left = as_primitive_array::<TimestampNanosecondType>(&result);
assert_eq!(
left, &right,
"Failed for granularity: {granularity}, timezone: {tz_opt:?}"
);
} else {
panic!("unexpected column type");
}
});
}
}