datafusion_functions/datetime/
date_bin.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23    ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25    TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = r#"
Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.

For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
"#,
    syntax_example = "date_bin(interval, expression, origin-timestamp)",
    sql_example = r#"```sql
-- Bin the timestamp into 1 day intervals
> SELECT date_bin(interval '1 day', time) as bin
FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
+---------------------+
| bin                 |
+---------------------+
| 2023-01-01T00:00:00 |
| 2023-01-03T00:00:00 |
+---------------------+
2 row(s) fetched.

-- Bin the timestamp into 1 day intervals starting at 3AM on  2023-01-01
> SELECT date_bin(interval '1 day', time,  '2023-01-01T03:00:00') as bin
FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
+---------------------+
| bin                 |
+---------------------+
| 2023-01-01T03:00:00 |
| 2023-01-03T03:00:00 |
+---------------------+
2 row(s) fetched.
```"#,
    argument(name = "interval", description = "Bin interval."),
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    ),
    argument(
        name = "origin-timestamp",
        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:

    - nanoseconds
    - microseconds
    - milliseconds
    - seconds
    - minutes
    - hours
    - days
    - weeks
    - months
    - years
    - century
"#
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
/// Scalar UDF registered under the name `date_bin`.
///
/// It takes an interval stride, a timestamp expression and an optional
/// origin timestamp; the user-facing description lives in the `user_doc`
/// attribute above.
pub struct DateBinFunc {
    /// Accepted argument type combinations, built once in `DateBinFunc::new`.
    signature: Signature,
}
102
103impl Default for DateBinFunc {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109impl DateBinFunc {
110    pub fn new() -> Self {
111        let base_sig = |array_type: TimeUnit| {
112            vec![
113                Exact(vec![
114                    DataType::Interval(MonthDayNano),
115                    Timestamp(array_type, None),
116                    Timestamp(Nanosecond, None),
117                ]),
118                Exact(vec![
119                    DataType::Interval(MonthDayNano),
120                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122                ]),
123                Exact(vec![
124                    DataType::Interval(DayTime),
125                    Timestamp(array_type, None),
126                    Timestamp(Nanosecond, None),
127                ]),
128                Exact(vec![
129                    DataType::Interval(DayTime),
130                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132                ]),
133                Exact(vec![
134                    DataType::Interval(MonthDayNano),
135                    Timestamp(array_type, None),
136                ]),
137                Exact(vec![
138                    DataType::Interval(MonthDayNano),
139                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140                ]),
141                Exact(vec![
142                    DataType::Interval(DayTime),
143                    Timestamp(array_type, None),
144                ]),
145                Exact(vec![
146                    DataType::Interval(DayTime),
147                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148                ]),
149            ]
150        };
151
152        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153            .into_iter()
154            .map(base_sig)
155            .collect::<Vec<_>>()
156            .concat();
157
158        Self {
159            signature: Signature::one_of(full_sig, Volatility::Immutable),
160        }
161    }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165    fn as_any(&self) -> &dyn Any {
166        self
167    }
168
169    fn name(&self) -> &str {
170        "date_bin"
171    }
172
173    fn signature(&self) -> &Signature {
174        &self.signature
175    }
176
177    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178        match &arg_types[1] {
179            Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180            Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181            Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182            Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183            Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184            _ => plan_err!(
185                "The date_bin function can only accept timestamp as the second arg."
186            ),
187        }
188    }
189
190    fn invoke_with_args(
191        &self,
192        args: datafusion_expr::ScalarFunctionArgs,
193    ) -> Result<ColumnarValue> {
194        let args = &args.args;
195        if args.len() == 2 {
196            // Default to unix EPOCH
197            let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198                Some(0),
199                Some("+00:00".into()),
200            ));
201            date_bin_impl(&args[0], &args[1], &origin)
202        } else if args.len() == 3 {
203            date_bin_impl(&args[0], &args[1], &args[2])
204        } else {
205            exec_err!("DATE_BIN expected two or three arguments")
206        }
207    }
208
209    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210        // The DATE_BIN function preserves the order of its second argument.
211        let step = &input[0];
212        let date_value = &input[1];
213        let reference = input.get(2);
214
215        if step.sort_properties.eq(&SortProperties::Singleton)
216            && reference
217                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218                .unwrap_or(true)
219        {
220            Ok(date_value.sort_properties)
221        } else {
222            Ok(SortProperties::Unordered)
223        }
224    }
225    fn documentation(&self) -> Option<&Documentation> {
226        self.doc()
227    }
228}
229
/// Bin width (stride) accepted by `date_bin`, in one of two units.
enum Interval {
    /// Fixed-width stride, in nanoseconds.
    Nanoseconds(i64),
    /// Calendar stride, in whole months.
    Months(i64),
}
234
235impl Interval {
236    /// Returns (`stride_nanos`, `fn`) where
237    ///
238    /// 1. `stride_nanos` is a width, in nanoseconds
239    /// 2. `fn` is a function that takes (stride_nanos, source, origin)
240    ///
241    /// `source` is the timestamp being binned
242    ///
243    /// `origin`  is the time, in nanoseconds, where windows are measured from
244    fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245        match self {
246            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247            Interval::Months(months) => (*months, date_bin_months_interval),
248        }
249    }
250}
251
252// return time in nanoseconds that the source timestamp falls into based on the stride and origin
253fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254    let time_diff = source - origin;
255
256    // distance from origin to bin
257    let time_delta = compute_distance(time_diff, stride_nanos);
258
259    origin + time_delta
260}
261
/// Distance from the origin to the start of the bin containing an offset.
///
/// `time_diff` is the offset of the source from the origin and `stride` the
/// bin width, both in the same unit. The offset is first truncated towards
/// zero to a multiple of `stride`; a negative offset that is not already on
/// a bin boundary is then moved one further stride back, provided
/// `stride > 1`.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    // The source lies before the origin and inside a bin: truncation rounded
    // towards the origin, so step back to the start of the previous bin.
    let before_origin_inside_bin = time_diff < 0 && stride > 1 && remainder != 0;

    if before_origin_inside_bin {
        truncated - stride
    } else {
        truncated
    }
}
273
274// return time in nanoseconds that the source timestamp falls into based on the stride and origin
275fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276    // convert source and origin to DateTime<Utc>
277    let source_date = to_utc_date_time(source);
278    let origin_date = to_utc_date_time(origin);
279
280    // calculate the number of months between the source and origin
281    let month_diff = (source_date.year() - origin_date.year()) * 12
282        + source_date.month() as i32
283        - origin_date.month() as i32;
284
285    // distance from origin to bin
286    let month_delta = compute_distance(month_diff as i64, stride_months);
287
288    let mut bin_time = if month_delta < 0 {
289        origin_date - Months::new(month_delta.unsigned_abs() as u32)
290    } else {
291        origin_date + Months::new(month_delta as u32)
292    };
293
294    // If origin is not midnight of first date of the month, the bin_time may be larger than the source
295    // In this case, we need to move back to previous bin
296    if bin_time > source_date {
297        let month_delta = month_delta - stride_months;
298        bin_time = if month_delta < 0 {
299            origin_date - Months::new(month_delta.unsigned_abs() as u32)
300        } else {
301            origin_date + Months::new(month_delta as u32)
302        };
303    }
304
305    bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309    let secs = nanos / 1_000_000_000;
310    let nsec = (nanos % 1_000_000_000) as u32;
311    DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314// Supported intervals:
315//  1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds
316//     We will assume month interval won't be converted into this type
317//     TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere
318//             for month interval. I need to find that and make it ScalarValue::IntervalMonthDayNano instead
319// 2. IntervalMonthDayNano
320fn date_bin_impl(
321    stride: &ColumnarValue,
322    array: &ColumnarValue,
323    origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325    let stride = match stride {
326        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327            let (days, ms) = IntervalDayTimeType::to_parts(*v);
328            let nanos = (TimeDelta::try_days(days as i64).unwrap()
329                + TimeDelta::try_milliseconds(ms as i64).unwrap())
330            .num_nanoseconds();
331
332            match nanos {
333                Some(v) => Interval::Nanoseconds(v),
334                _ => return exec_err!("DATE_BIN stride argument is too large"),
335            }
336        }
337        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340            // If interval is months, its origin must be midnight of first date of the month
341            if months != 0 {
342                // Return error if days or nanos is not zero
343                if days != 0 || nanos != 0 {
344                    return not_impl_err!(
345                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346                    );
347                } else {
348                    Interval::Months(months as i64)
349                }
350            } else {
351                let nanos = (TimeDelta::try_days(days as i64).unwrap()
352                    + Duration::nanoseconds(nanos))
353                .num_nanoseconds();
354                match nanos {
355                    Some(v) => Interval::Nanoseconds(v),
356                    _ => return exec_err!("DATE_BIN stride argument is too large"),
357                }
358            }
359        }
360        ColumnarValue::Scalar(v) => {
361            return exec_err!(
362                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363                v.data_type()
364            );
365        }
366        ColumnarValue::Array(_) => {
367            return not_impl_err!(
368            "DATE_BIN only supports literal values for the stride argument, not arrays"
369        );
370        }
371    };
372
373    let origin = match origin {
374        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375        ColumnarValue::Scalar(v) => {
376            return exec_err!(
377                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378                v.data_type()
379            );
380        }
381        ColumnarValue::Array(_) => {
382            return not_impl_err!(
383            "DATE_BIN only supports literal values for the origin argument, not arrays"
384        );
385        }
386    };
387
388    let (stride, stride_fn) = stride.bin_fn();
389
390    // Return error if stride is 0
391    if stride == 0 {
392        return exec_err!("DATE_BIN stride must be non-zero");
393    }
394
395    fn stride_map_fn<T: ArrowTimestampType>(
396        origin: i64,
397        stride: i64,
398        stride_fn: fn(i64, i64, i64) -> i64,
399    ) -> impl Fn(i64) -> i64 {
400        let scale = match T::UNIT {
401            Nanosecond => 1,
402            Microsecond => NANOSECONDS / 1_000_000,
403            Millisecond => NANOSECONDS / 1_000,
404            Second => NANOSECONDS,
405        };
406        move |x: i64| stride_fn(stride, x * scale, origin) / scale
407    }
408
409    Ok(match array {
410        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411            let apply_stride_fn =
412                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414                v.map(apply_stride_fn),
415                tz_opt.clone(),
416            ))
417        }
418        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419            let apply_stride_fn =
420                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422                v.map(apply_stride_fn),
423                tz_opt.clone(),
424            ))
425        }
426        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427            let apply_stride_fn =
428                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430                v.map(apply_stride_fn),
431                tz_opt.clone(),
432            ))
433        }
434        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435            let apply_stride_fn =
436                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438                v.map(apply_stride_fn),
439                tz_opt.clone(),
440            ))
441        }
442
443        ColumnarValue::Array(array) => {
444            fn transform_array_with_stride<T>(
445                origin: i64,
446                stride: i64,
447                stride_fn: fn(i64, i64, i64) -> i64,
448                array: &ArrayRef,
449                tz_opt: &Option<Arc<str>>,
450            ) -> Result<ColumnarValue>
451            where
452                T: ArrowTimestampType,
453            {
454                let array = as_primitive_array::<T>(array)?;
455                let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456                let array: PrimitiveArray<T> = array
457                    .unary(apply_stride_fn)
458                    .with_timezone_opt(tz_opt.clone());
459
460                Ok(ColumnarValue::Array(Arc::new(array)))
461            }
462
463            match array.data_type() {
464                Timestamp(Nanosecond, tz_opt) => {
465                    transform_array_with_stride::<TimestampNanosecondType>(
466                        origin, stride, stride_fn, array, tz_opt,
467                    )?
468                }
469                Timestamp(Microsecond, tz_opt) => {
470                    transform_array_with_stride::<TimestampMicrosecondType>(
471                        origin, stride, stride_fn, array, tz_opt,
472                    )?
473                }
474                Timestamp(Millisecond, tz_opt) => {
475                    transform_array_with_stride::<TimestampMillisecondType>(
476                        origin, stride, stride_fn, array, tz_opt,
477                    )?
478                }
479                Timestamp(Second, tz_opt) => {
480                    transform_array_with_stride::<TimestampSecondType>(
481                        origin, stride, stride_fn, array, tz_opt,
482                    )?
483                }
484                _ => {
485                    return exec_err!(
486                        "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487                        array.data_type()
488                    );
489                }
490            }
491        }
492        _ => {
493            return exec_err!(
494                "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495            );
496        }
497    })
498}
499
500#[cfg(test)]
501mod tests {
502    use std::sync::Arc;
503
504    use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
505    use arrow::array::types::TimestampNanosecondType;
506    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
507    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
508    use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
509
510    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
511    use datafusion_common::{DataFusionError, ScalarValue};
512    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
513
514    use chrono::TimeDelta;
515    use datafusion_common::config::ConfigOptions;
516
517    fn invoke_date_bin_with_args(
518        args: Vec<ColumnarValue>,
519        number_rows: usize,
520        return_field: &FieldRef,
521    ) -> Result<ColumnarValue, DataFusionError> {
522        let arg_fields = args
523            .iter()
524            .map(|arg| Field::new("a", arg.data_type(), true).into())
525            .collect::<Vec<_>>();
526
527        let args = datafusion_expr::ScalarFunctionArgs {
528            args,
529            arg_fields,
530            number_rows,
531            return_field: Arc::clone(return_field),
532            config_options: Arc::new(ConfigOptions::default()),
533        };
534        DateBinFunc::new().invoke_with_args(args)
535    }
536
537    #[test]
538    fn test_date_bin() {
539        let return_field = &Arc::new(Field::new(
540            "f",
541            DataType::Timestamp(TimeUnit::Nanosecond, None),
542            true,
543        ));
544
545        let mut args = vec![
546            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
547                days: 0,
548                milliseconds: 1,
549            }))),
550            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
551            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
552        ];
553        let res = invoke_date_bin_with_args(args, 1, return_field);
554        assert!(res.is_ok());
555
556        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
557        let batch_len = timestamps.len();
558        args = vec![
559            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
560                days: 0,
561                milliseconds: 1,
562            }))),
563            ColumnarValue::Array(timestamps),
564            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
565        ];
566        let res = invoke_date_bin_with_args(args, batch_len, return_field);
567        assert!(res.is_ok());
568
569        args = vec![
570            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
571                days: 0,
572                milliseconds: 1,
573            }))),
574            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
575        ];
576        let res = invoke_date_bin_with_args(args, 1, return_field);
577        assert!(res.is_ok());
578
579        // stride supports month-day-nano
580        args = vec![
581            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
582                IntervalMonthDayNano {
583                    months: 0,
584                    days: 0,
585                    nanoseconds: 1,
586                },
587            ))),
588            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
589            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
590        ];
591        let res = invoke_date_bin_with_args(args, 1, return_field);
592        assert!(res.is_ok());
593
594        //
595        // Fallible test cases
596        //
597
598        // invalid number of arguments
599        args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
600            IntervalDayTime {
601                days: 0,
602                milliseconds: 1,
603            },
604        )))];
605        let res = invoke_date_bin_with_args(args, 1, return_field);
606        assert_eq!(
607            res.err().unwrap().strip_backtrace(),
608            "Execution error: DATE_BIN expected two or three arguments"
609        );
610
611        // stride: invalid type
612        args = vec![
613            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
614            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
615            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
616        ];
617        let res = invoke_date_bin_with_args(args, 1, return_field);
618        assert_eq!(
619            res.err().unwrap().strip_backtrace(),
620            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
621        );
622
623        // stride: invalid value
624
625        args = vec![
626            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
627                days: 0,
628                milliseconds: 0,
629            }))),
630            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
631            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
632        ];
633
634        let res = invoke_date_bin_with_args(args, 1, return_field);
635        assert_eq!(
636            res.err().unwrap().strip_backtrace(),
637            "Execution error: DATE_BIN stride must be non-zero"
638        );
639
640        // stride: overflow of day-time interval
641        args = vec![
642            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
643                IntervalDayTime::MAX,
644            ))),
645            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
646            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
647        ];
648        let res = invoke_date_bin_with_args(args, 1, return_field);
649        assert_eq!(
650            res.err().unwrap().strip_backtrace(),
651            "Execution error: DATE_BIN stride argument is too large"
652        );
653
654        // stride: overflow of month-day-nano interval
655        args = vec![
656            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
657            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
658            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
659        ];
660        let res = invoke_date_bin_with_args(args, 1, return_field);
661        assert_eq!(
662            res.err().unwrap().strip_backtrace(),
663            "Execution error: DATE_BIN stride argument is too large"
664        );
665
666        // stride: month intervals
667        args = vec![
668            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
669            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
670            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
671        ];
672        let res = invoke_date_bin_with_args(args, 1, return_field);
673        assert_eq!(
674            res.err().unwrap().strip_backtrace(),
675            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
676        );
677
678        // origin: invalid type
679        args = vec![
680            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
681                days: 0,
682                milliseconds: 1,
683            }))),
684            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
685            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
686        ];
687        let res = invoke_date_bin_with_args(args, 1, return_field);
688        assert_eq!(
689            res.err().unwrap().strip_backtrace(),
690            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
691        );
692
693        args = vec![
694            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
695                days: 0,
696                milliseconds: 1,
697            }))),
698            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
699            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
700        ];
701        let res = invoke_date_bin_with_args(args, 1, return_field);
702        assert!(res.is_ok());
703
704        // unsupported array type for stride
705        let intervals = Arc::new(
706            (1..6)
707                .map(|x| {
708                    Some(IntervalDayTime {
709                        days: 0,
710                        milliseconds: x,
711                    })
712                })
713                .collect::<IntervalDayTimeArray>(),
714        );
715        args = vec![
716            ColumnarValue::Array(intervals),
717            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
718            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
719        ];
720        let res = invoke_date_bin_with_args(args, 1, return_field);
721        assert_eq!(
722            res.err().unwrap().strip_backtrace(),
723            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
724        );
725
726        // unsupported array type for origin
727        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
728        let batch_len = timestamps.len();
729        args = vec![
730            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
731                days: 0,
732                milliseconds: 1,
733            }))),
734            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
735            ColumnarValue::Array(timestamps),
736        ];
737        let res = invoke_date_bin_with_args(args, batch_len, return_field);
738        assert_eq!(
739            res.err().unwrap().strip_backtrace(),
740            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
741        );
742    }
743
744    #[test]
745    fn test_date_bin_timezones() {
746        let cases = vec![
747            (
748                vec![
749                    "2020-09-08T00:00:00Z",
750                    "2020-09-08T01:00:00Z",
751                    "2020-09-08T02:00:00Z",
752                    "2020-09-08T03:00:00Z",
753                    "2020-09-08T04:00:00Z",
754                ],
755                Some("+00".into()),
756                "1970-01-01T00:00:00Z",
757                vec![
758                    "2020-09-08T00:00:00Z",
759                    "2020-09-08T00:00:00Z",
760                    "2020-09-08T00:00:00Z",
761                    "2020-09-08T00:00:00Z",
762                    "2020-09-08T00:00:00Z",
763                ],
764            ),
765            (
766                vec![
767                    "2020-09-08T00:00:00Z",
768                    "2020-09-08T01:00:00Z",
769                    "2020-09-08T02:00:00Z",
770                    "2020-09-08T03:00:00Z",
771                    "2020-09-08T04:00:00Z",
772                ],
773                None,
774                "1970-01-01T00:00:00Z",
775                vec![
776                    "2020-09-08T00:00:00Z",
777                    "2020-09-08T00:00:00Z",
778                    "2020-09-08T00:00:00Z",
779                    "2020-09-08T00:00:00Z",
780                    "2020-09-08T00:00:00Z",
781                ],
782            ),
783            (
784                vec![
785                    "2020-09-08T00:00:00Z",
786                    "2020-09-08T01:00:00Z",
787                    "2020-09-08T02:00:00Z",
788                    "2020-09-08T03:00:00Z",
789                    "2020-09-08T04:00:00Z",
790                ],
791                Some("-02".into()),
792                "1970-01-01T00:00:00Z",
793                vec![
794                    "2020-09-08T00:00:00Z",
795                    "2020-09-08T00:00:00Z",
796                    "2020-09-08T00:00:00Z",
797                    "2020-09-08T00:00:00Z",
798                    "2020-09-08T00:00:00Z",
799                ],
800            ),
801            (
802                vec![
803                    "2020-09-08T00:00:00+05",
804                    "2020-09-08T01:00:00+05",
805                    "2020-09-08T02:00:00+05",
806                    "2020-09-08T03:00:00+05",
807                    "2020-09-08T04:00:00+05",
808                ],
809                Some("+05".into()),
810                "1970-01-01T00:00:00+05",
811                vec![
812                    "2020-09-08T00:00:00+05",
813                    "2020-09-08T00:00:00+05",
814                    "2020-09-08T00:00:00+05",
815                    "2020-09-08T00:00:00+05",
816                    "2020-09-08T00:00:00+05",
817                ],
818            ),
819            (
820                vec![
821                    "2020-09-08T00:00:00+08",
822                    "2020-09-08T01:00:00+08",
823                    "2020-09-08T02:00:00+08",
824                    "2020-09-08T03:00:00+08",
825                    "2020-09-08T04:00:00+08",
826                ],
827                Some("+08".into()),
828                "1970-01-01T00:00:00+08",
829                vec![
830                    "2020-09-08T00:00:00+08",
831                    "2020-09-08T00:00:00+08",
832                    "2020-09-08T00:00:00+08",
833                    "2020-09-08T00:00:00+08",
834                    "2020-09-08T00:00:00+08",
835                ],
836            ),
837        ];
838
839        cases
840            .iter()
841            .for_each(|(original, tz_opt, origin, expected)| {
842                let input = original
843                    .iter()
844                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
845                    .collect::<TimestampNanosecondArray>()
846                    .with_timezone_opt(tz_opt.clone());
847                let right = expected
848                    .iter()
849                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
850                    .collect::<TimestampNanosecondArray>()
851                    .with_timezone_opt(tz_opt.clone());
852                let batch_len = input.len();
853                let args = vec![
854                    ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
855                    ColumnarValue::Array(Arc::new(input)),
856                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
857                        Some(string_to_timestamp_nanos(origin).unwrap()),
858                        tz_opt.clone(),
859                    )),
860                ];
861                let return_field = &Arc::new(Field::new(
862                    "f",
863                    DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
864                    true,
865                ));
866                let result =
867                    invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
868
869                if let ColumnarValue::Array(result) = result {
870                    assert_eq!(
871                        result.data_type(),
872                        &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
873                    );
874                    let left = arrow::array::cast::as_primitive_array::<
875                        TimestampNanosecondType,
876                    >(&result);
877                    assert_eq!(left, &right);
878                } else {
879                    panic!("unexpected column type");
880                }
881            });
882    }
883
884    #[test]
885    fn test_date_bin_single() {
886        let cases = vec![
887            (
888                (
889                    TimeDelta::try_minutes(15),
890                    "2004-04-09T02:03:04.123456789Z",
891                    "2001-01-01T00:00:00",
892                ),
893                "2004-04-09T02:00:00Z",
894            ),
895            (
896                (
897                    TimeDelta::try_minutes(15),
898                    "2004-04-09T02:03:04.123456789Z",
899                    "2001-01-01T00:02:30",
900                ),
901                "2004-04-09T02:02:30Z",
902            ),
903            (
904                (
905                    TimeDelta::try_minutes(15),
906                    "2004-04-09T02:03:04.123456789Z",
907                    "2005-01-01T00:02:30",
908                ),
909                "2004-04-09T02:02:30Z",
910            ),
911            (
912                (
913                    TimeDelta::try_hours(1),
914                    "2004-04-09T02:03:04.123456789Z",
915                    "2001-01-01T00:00:00",
916                ),
917                "2004-04-09T02:00:00Z",
918            ),
919            (
920                (
921                    TimeDelta::try_seconds(10),
922                    "2004-04-09T02:03:11.123456789Z",
923                    "2001-01-01T00:00:00",
924                ),
925                "2004-04-09T02:03:10Z",
926            ),
927        ];
928
929        cases
930            .iter()
931            .for_each(|((stride, source, origin), expected)| {
932                let stride = stride.unwrap();
933                let stride1 = stride.num_nanoseconds().unwrap();
934                let source1 = string_to_timestamp_nanos(source).unwrap();
935                let origin1 = string_to_timestamp_nanos(origin).unwrap();
936
937                let expected1 = string_to_timestamp_nanos(expected).unwrap();
938                let result = date_bin_nanos_interval(stride1, source1, origin1);
939                assert_eq!(result, expected1, "{source} = {expected}");
940            })
941    }
942
943    #[test]
944    fn test_date_bin_before_epoch() {
945        let cases = [
946            (
947                (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
948                "1969-12-31T23:30:00",
949            ),
950            (
951                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
952                "1969-12-31T23:45:00",
953            ),
954            (
955                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
956                "1969-12-31T23:45:00",
957            ),
958        ];
959
960        cases.iter().for_each(|((stride, source), expected)| {
961            let stride = stride.unwrap();
962            let stride1 = stride.num_nanoseconds().unwrap();
963            let source1 = string_to_timestamp_nanos(source).unwrap();
964
965            let expected1 = string_to_timestamp_nanos(expected).unwrap();
966            let result = date_bin_nanos_interval(stride1, source1, 0);
967            assert_eq!(result, expected1, "{source} = {expected}");
968        })
969    }
970}