datafusion_functions/datetime/
date_bin.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23    ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25    TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
44#[user_doc(
45    doc_section(label = "Time and Date Functions"),
46    description = r#"
47Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
48
49For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
50"#,
51    syntax_example = "date_bin(interval, expression, origin-timestamp)",
52    sql_example = r#"```sql
53-- Bin the timestamp into 1 day intervals
54> SELECT date_bin(interval '1 day', time) as bin
55FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
56+---------------------+
57| bin                 |
58+---------------------+
59| 2023-01-01T00:00:00 |
60| 2023-01-03T00:00:00 |
61+---------------------+
622 row(s) fetched.
63
64-- Bin the timestamp into 1 day intervals starting at 3AM on  2023-01-01
65> SELECT date_bin(interval '1 day', time,  '2023-01-01T03:00:00') as bin
66FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
67+---------------------+
68| bin                 |
69+---------------------+
70| 2023-01-01T03:00:00 |
71| 2023-01-03T03:00:00 |
72+---------------------+
732 row(s) fetched.
74```"#,
75    argument(name = "interval", description = "Bin interval."),
76    argument(
77        name = "expression",
78        description = "Time expression to operate on. Can be a constant, column, or function."
79    ),
80    argument(
81        name = "origin-timestamp",
82        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
83
84    - nanoseconds
85    - microseconds
86    - milliseconds
87    - seconds
88    - minutes
89    - hours
90    - days
91    - weeks
92    - months
93    - years
94    - century
95"#
96    )
97)]
98#[derive(Debug)]
99pub struct DateBinFunc {
100    signature: Signature,
101}
102
103impl Default for DateBinFunc {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109impl DateBinFunc {
110    pub fn new() -> Self {
111        let base_sig = |array_type: TimeUnit| {
112            vec![
113                Exact(vec![
114                    DataType::Interval(MonthDayNano),
115                    Timestamp(array_type, None),
116                    Timestamp(Nanosecond, None),
117                ]),
118                Exact(vec![
119                    DataType::Interval(MonthDayNano),
120                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122                ]),
123                Exact(vec![
124                    DataType::Interval(DayTime),
125                    Timestamp(array_type, None),
126                    Timestamp(Nanosecond, None),
127                ]),
128                Exact(vec![
129                    DataType::Interval(DayTime),
130                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132                ]),
133                Exact(vec![
134                    DataType::Interval(MonthDayNano),
135                    Timestamp(array_type, None),
136                ]),
137                Exact(vec![
138                    DataType::Interval(MonthDayNano),
139                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140                ]),
141                Exact(vec![
142                    DataType::Interval(DayTime),
143                    Timestamp(array_type, None),
144                ]),
145                Exact(vec![
146                    DataType::Interval(DayTime),
147                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148                ]),
149            ]
150        };
151
152        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153            .into_iter()
154            .map(base_sig)
155            .collect::<Vec<_>>()
156            .concat();
157
158        Self {
159            signature: Signature::one_of(full_sig, Volatility::Immutable),
160        }
161    }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165    fn as_any(&self) -> &dyn Any {
166        self
167    }
168
169    fn name(&self) -> &str {
170        "date_bin"
171    }
172
173    fn signature(&self) -> &Signature {
174        &self.signature
175    }
176
177    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178        match &arg_types[1] {
179            Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180            Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181            Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182            Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183            Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184            _ => plan_err!(
185                "The date_bin function can only accept timestamp as the second arg."
186            ),
187        }
188    }
189
190    fn invoke_with_args(
191        &self,
192        args: datafusion_expr::ScalarFunctionArgs,
193    ) -> Result<ColumnarValue> {
194        let args = &args.args;
195        if args.len() == 2 {
196            // Default to unix EPOCH
197            let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198                Some(0),
199                Some("+00:00".into()),
200            ));
201            date_bin_impl(&args[0], &args[1], &origin)
202        } else if args.len() == 3 {
203            date_bin_impl(&args[0], &args[1], &args[2])
204        } else {
205            exec_err!("DATE_BIN expected two or three arguments")
206        }
207    }
208
209    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210        // The DATE_BIN function preserves the order of its second argument.
211        let step = &input[0];
212        let date_value = &input[1];
213        let reference = input.get(2);
214
215        if step.sort_properties.eq(&SortProperties::Singleton)
216            && reference
217                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218                .unwrap_or(true)
219        {
220            Ok(date_value.sort_properties)
221        } else {
222            Ok(SortProperties::Unordered)
223        }
224    }
225    fn documentation(&self) -> Option<&Documentation> {
226        self.doc()
227    }
228}
229
230enum Interval {
231    Nanoseconds(i64),
232    Months(i64),
233}
234
235impl Interval {
236    /// Returns (`stride_nanos`, `fn`) where
237    ///
238    /// 1. `stride_nanos` is a width, in nanoseconds
239    /// 2. `fn` is a function that takes (stride_nanos, source, origin)
240    ///
241    /// `source` is the timestamp being binned
242    ///
243    /// `origin`  is the time, in nanoseconds, where windows are measured from
244    fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245        match self {
246            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247            Interval::Months(months) => (*months, date_bin_months_interval),
248        }
249    }
250}
251
252// return time in nanoseconds that the source timestamp falls into based on the stride and origin
253fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254    let time_diff = source - origin;
255
256    // distance from origin to bin
257    let time_delta = compute_distance(time_diff, stride_nanos);
258
259    origin + time_delta
260}
261
/// Rounds `time_diff` down to a multiple of `stride`.
///
/// `time_diff` is the signed distance from the origin to the value being
/// binned. The result is the distance from the origin to the start of the bin
/// containing it. Because the Euclidean remainder is always in
/// `0..stride.abs()`, the result is never greater than `time_diff`. A bin
/// therefore never starts after the binned value, whether that value lies
/// before or after the origin.
///
/// For a positive `stride` this matches plain floor rounding. A negative
/// `stride` behaves exactly like its absolute value. The previous truncating
/// version rounded *up* for negative `time_diff` with a negative stride.
///
/// # Panics
/// Panics if `stride` is zero (callers reject a zero stride beforehand). The
/// subtraction can overflow for `time_diff` values close to `i64::MIN`.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    time_diff - time_diff.rem_euclid(stride)
}
273
274// return time in nanoseconds that the source timestamp falls into based on the stride and origin
275fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276    // convert source and origin to DateTime<Utc>
277    let source_date = to_utc_date_time(source);
278    let origin_date = to_utc_date_time(origin);
279
280    // calculate the number of months between the source and origin
281    let month_diff = (source_date.year() - origin_date.year()) * 12
282        + source_date.month() as i32
283        - origin_date.month() as i32;
284
285    // distance from origin to bin
286    let month_delta = compute_distance(month_diff as i64, stride_months);
287
288    let mut bin_time = if month_delta < 0 {
289        origin_date - Months::new(month_delta.unsigned_abs() as u32)
290    } else {
291        origin_date + Months::new(month_delta as u32)
292    };
293
294    // If origin is not midnight of first date of the month, the bin_time may be larger than the source
295    // In this case, we need to move back to previous bin
296    if bin_time > source_date {
297        let month_delta = month_delta - stride_months;
298        bin_time = if month_delta < 0 {
299            origin_date - Months::new(month_delta.unsigned_abs() as u32)
300        } else {
301            origin_date + Months::new(month_delta as u32)
302        };
303    }
304
305    bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309    let secs = nanos / 1_000_000_000;
310    let nsec = (nanos % 1_000_000_000) as u32;
311    DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314// Supported intervals:
315//  1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds
316//     We will assume month interval won't be converted into this type
317//     TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere
318//             for month interval. I need to find that and make it ScalarValue::IntervalMonthDayNano instead
319// 2. IntervalMonthDayNano
320fn date_bin_impl(
321    stride: &ColumnarValue,
322    array: &ColumnarValue,
323    origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325    let stride = match stride {
326        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327            let (days, ms) = IntervalDayTimeType::to_parts(*v);
328            let nanos = (TimeDelta::try_days(days as i64).unwrap()
329                + TimeDelta::try_milliseconds(ms as i64).unwrap())
330            .num_nanoseconds();
331
332            match nanos {
333                Some(v) => Interval::Nanoseconds(v),
334                _ => return exec_err!("DATE_BIN stride argument is too large"),
335            }
336        }
337        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340            // If interval is months, its origin must be midnight of first date of the month
341            if months != 0 {
342                // Return error if days or nanos is not zero
343                if days != 0 || nanos != 0 {
344                    return not_impl_err!(
345                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346                    );
347                } else {
348                    Interval::Months(months as i64)
349                }
350            } else {
351                let nanos = (TimeDelta::try_days(days as i64).unwrap()
352                    + Duration::nanoseconds(nanos))
353                .num_nanoseconds();
354                match nanos {
355                    Some(v) => Interval::Nanoseconds(v),
356                    _ => return exec_err!("DATE_BIN stride argument is too large"),
357                }
358            }
359        }
360        ColumnarValue::Scalar(v) => {
361            return exec_err!(
362                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363                v.data_type()
364            );
365        }
366        ColumnarValue::Array(_) => {
367            return not_impl_err!(
368            "DATE_BIN only supports literal values for the stride argument, not arrays"
369        );
370        }
371    };
372
373    let origin = match origin {
374        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375        ColumnarValue::Scalar(v) => {
376            return exec_err!(
377                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378                v.data_type()
379            );
380        }
381        ColumnarValue::Array(_) => {
382            return not_impl_err!(
383            "DATE_BIN only supports literal values for the origin argument, not arrays"
384        );
385        }
386    };
387
388    let (stride, stride_fn) = stride.bin_fn();
389
390    // Return error if stride is 0
391    if stride == 0 {
392        return exec_err!("DATE_BIN stride must be non-zero");
393    }
394
395    fn stride_map_fn<T: ArrowTimestampType>(
396        origin: i64,
397        stride: i64,
398        stride_fn: fn(i64, i64, i64) -> i64,
399    ) -> impl Fn(i64) -> i64 {
400        let scale = match T::UNIT {
401            Nanosecond => 1,
402            Microsecond => NANOSECONDS / 1_000_000,
403            Millisecond => NANOSECONDS / 1_000,
404            Second => NANOSECONDS,
405        };
406        move |x: i64| stride_fn(stride, x * scale, origin) / scale
407    }
408
409    Ok(match array {
410        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411            let apply_stride_fn =
412                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414                v.map(apply_stride_fn),
415                tz_opt.clone(),
416            ))
417        }
418        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419            let apply_stride_fn =
420                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422                v.map(apply_stride_fn),
423                tz_opt.clone(),
424            ))
425        }
426        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427            let apply_stride_fn =
428                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430                v.map(apply_stride_fn),
431                tz_opt.clone(),
432            ))
433        }
434        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435            let apply_stride_fn =
436                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438                v.map(apply_stride_fn),
439                tz_opt.clone(),
440            ))
441        }
442
443        ColumnarValue::Array(array) => {
444            fn transform_array_with_stride<T>(
445                origin: i64,
446                stride: i64,
447                stride_fn: fn(i64, i64, i64) -> i64,
448                array: &ArrayRef,
449                tz_opt: &Option<Arc<str>>,
450            ) -> Result<ColumnarValue>
451            where
452                T: ArrowTimestampType,
453            {
454                let array = as_primitive_array::<T>(array)?;
455                let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456                let array: PrimitiveArray<T> = array
457                    .unary(apply_stride_fn)
458                    .with_timezone_opt(tz_opt.clone());
459
460                Ok(ColumnarValue::Array(Arc::new(array)))
461            }
462
463            match array.data_type() {
464                Timestamp(Nanosecond, tz_opt) => {
465                    transform_array_with_stride::<TimestampNanosecondType>(
466                        origin, stride, stride_fn, array, tz_opt,
467                    )?
468                }
469                Timestamp(Microsecond, tz_opt) => {
470                    transform_array_with_stride::<TimestampMicrosecondType>(
471                        origin, stride, stride_fn, array, tz_opt,
472                    )?
473                }
474                Timestamp(Millisecond, tz_opt) => {
475                    transform_array_with_stride::<TimestampMillisecondType>(
476                        origin, stride, stride_fn, array, tz_opt,
477                    )?
478                }
479                Timestamp(Second, tz_opt) => {
480                    transform_array_with_stride::<TimestampSecondType>(
481                        origin, stride, stride_fn, array, tz_opt,
482                    )?
483                }
484                _ => {
485                    return exec_err!(
486                        "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487                        array.data_type()
488                    );
489                }
490            }
491        }
492        _ => {
493            return exec_err!(
494                "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495            );
496        }
497    })
498}
499
500#[cfg(test)]
501mod tests {
502    use std::sync::Arc;
503
504    use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
505    use arrow::array::types::TimestampNanosecondType;
506    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
507    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
508    use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
509
510    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
511    use datafusion_common::{DataFusionError, ScalarValue};
512    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
513
514    use chrono::TimeDelta;
515
516    fn invoke_date_bin_with_args(
517        args: Vec<ColumnarValue>,
518        number_rows: usize,
519        return_field: &FieldRef,
520    ) -> Result<ColumnarValue, DataFusionError> {
521        let arg_fields = args
522            .iter()
523            .map(|arg| Field::new("a", arg.data_type(), true).into())
524            .collect::<Vec<_>>();
525
526        let args = datafusion_expr::ScalarFunctionArgs {
527            args,
528            arg_fields,
529            number_rows,
530            return_field: Arc::clone(return_field),
531        };
532        DateBinFunc::new().invoke_with_args(args)
533    }
534
535    #[test]
536    fn test_date_bin() {
537        let return_field = &Arc::new(Field::new(
538            "f",
539            DataType::Timestamp(TimeUnit::Nanosecond, None),
540            true,
541        ));
542
543        let mut args = vec![
544            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
545                days: 0,
546                milliseconds: 1,
547            }))),
548            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
549            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
550        ];
551        let res = invoke_date_bin_with_args(args, 1, return_field);
552        assert!(res.is_ok());
553
554        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
555        let batch_len = timestamps.len();
556        args = vec![
557            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
558                days: 0,
559                milliseconds: 1,
560            }))),
561            ColumnarValue::Array(timestamps),
562            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
563        ];
564        let res = invoke_date_bin_with_args(args, batch_len, return_field);
565        assert!(res.is_ok());
566
567        args = vec![
568            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
569                days: 0,
570                milliseconds: 1,
571            }))),
572            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
573        ];
574        let res = invoke_date_bin_with_args(args, 1, return_field);
575        assert!(res.is_ok());
576
577        // stride supports month-day-nano
578        args = vec![
579            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
580                IntervalMonthDayNano {
581                    months: 0,
582                    days: 0,
583                    nanoseconds: 1,
584                },
585            ))),
586            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
587            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
588        ];
589        let res = invoke_date_bin_with_args(args, 1, return_field);
590        assert!(res.is_ok());
591
592        //
593        // Fallible test cases
594        //
595
596        // invalid number of arguments
597        args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
598            IntervalDayTime {
599                days: 0,
600                milliseconds: 1,
601            },
602        )))];
603        let res = invoke_date_bin_with_args(args, 1, return_field);
604        assert_eq!(
605            res.err().unwrap().strip_backtrace(),
606            "Execution error: DATE_BIN expected two or three arguments"
607        );
608
609        // stride: invalid type
610        args = vec![
611            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
612            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
613            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
614        ];
615        let res = invoke_date_bin_with_args(args, 1, return_field);
616        assert_eq!(
617            res.err().unwrap().strip_backtrace(),
618            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
619        );
620
621        // stride: invalid value
622
623        args = vec![
624            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
625                days: 0,
626                milliseconds: 0,
627            }))),
628            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
629            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
630        ];
631
632        let res = invoke_date_bin_with_args(args, 1, return_field);
633        assert_eq!(
634            res.err().unwrap().strip_backtrace(),
635            "Execution error: DATE_BIN stride must be non-zero"
636        );
637
638        // stride: overflow of day-time interval
639        args = vec![
640            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
641                IntervalDayTime::MAX,
642            ))),
643            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
644            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
645        ];
646        let res = invoke_date_bin_with_args(args, 1, return_field);
647        assert_eq!(
648            res.err().unwrap().strip_backtrace(),
649            "Execution error: DATE_BIN stride argument is too large"
650        );
651
652        // stride: overflow of month-day-nano interval
653        args = vec![
654            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
655            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
656            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
657        ];
658        let res = invoke_date_bin_with_args(args, 1, return_field);
659        assert_eq!(
660            res.err().unwrap().strip_backtrace(),
661            "Execution error: DATE_BIN stride argument is too large"
662        );
663
664        // stride: month intervals
665        args = vec![
666            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
667            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
668            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
669        ];
670        let res = invoke_date_bin_with_args(args, 1, return_field);
671        assert_eq!(
672            res.err().unwrap().strip_backtrace(),
673            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
674        );
675
676        // origin: invalid type
677        args = vec![
678            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
679                days: 0,
680                milliseconds: 1,
681            }))),
682            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
683            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
684        ];
685        let res = invoke_date_bin_with_args(args, 1, return_field);
686        assert_eq!(
687            res.err().unwrap().strip_backtrace(),
688            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
689        );
690
691        args = vec![
692            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
693                days: 0,
694                milliseconds: 1,
695            }))),
696            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
697            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
698        ];
699        let res = invoke_date_bin_with_args(args, 1, return_field);
700        assert!(res.is_ok());
701
702        // unsupported array type for stride
703        let intervals = Arc::new(
704            (1..6)
705                .map(|x| {
706                    Some(IntervalDayTime {
707                        days: 0,
708                        milliseconds: x,
709                    })
710                })
711                .collect::<IntervalDayTimeArray>(),
712        );
713        args = vec![
714            ColumnarValue::Array(intervals),
715            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
716            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
717        ];
718        let res = invoke_date_bin_with_args(args, 1, return_field);
719        assert_eq!(
720            res.err().unwrap().strip_backtrace(),
721            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
722        );
723
724        // unsupported array type for origin
725        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
726        let batch_len = timestamps.len();
727        args = vec![
728            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
729                days: 0,
730                milliseconds: 1,
731            }))),
732            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
733            ColumnarValue::Array(timestamps),
734        ];
735        let res = invoke_date_bin_with_args(args, batch_len, return_field);
736        assert_eq!(
737            res.err().unwrap().strip_backtrace(),
738            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
739        );
740    }
741
742    #[test]
743    fn test_date_bin_timezones() {
744        let cases = vec![
745            (
746                vec![
747                    "2020-09-08T00:00:00Z",
748                    "2020-09-08T01:00:00Z",
749                    "2020-09-08T02:00:00Z",
750                    "2020-09-08T03:00:00Z",
751                    "2020-09-08T04:00:00Z",
752                ],
753                Some("+00".into()),
754                "1970-01-01T00:00:00Z",
755                vec![
756                    "2020-09-08T00:00:00Z",
757                    "2020-09-08T00:00:00Z",
758                    "2020-09-08T00:00:00Z",
759                    "2020-09-08T00:00:00Z",
760                    "2020-09-08T00:00:00Z",
761                ],
762            ),
763            (
764                vec![
765                    "2020-09-08T00:00:00Z",
766                    "2020-09-08T01:00:00Z",
767                    "2020-09-08T02:00:00Z",
768                    "2020-09-08T03:00:00Z",
769                    "2020-09-08T04:00:00Z",
770                ],
771                None,
772                "1970-01-01T00:00:00Z",
773                vec![
774                    "2020-09-08T00:00:00Z",
775                    "2020-09-08T00:00:00Z",
776                    "2020-09-08T00:00:00Z",
777                    "2020-09-08T00:00:00Z",
778                    "2020-09-08T00:00:00Z",
779                ],
780            ),
781            (
782                vec![
783                    "2020-09-08T00:00:00Z",
784                    "2020-09-08T01:00:00Z",
785                    "2020-09-08T02:00:00Z",
786                    "2020-09-08T03:00:00Z",
787                    "2020-09-08T04:00:00Z",
788                ],
789                Some("-02".into()),
790                "1970-01-01T00:00:00Z",
791                vec![
792                    "2020-09-08T00:00:00Z",
793                    "2020-09-08T00:00:00Z",
794                    "2020-09-08T00:00:00Z",
795                    "2020-09-08T00:00:00Z",
796                    "2020-09-08T00:00:00Z",
797                ],
798            ),
799            (
800                vec![
801                    "2020-09-08T00:00:00+05",
802                    "2020-09-08T01:00:00+05",
803                    "2020-09-08T02:00:00+05",
804                    "2020-09-08T03:00:00+05",
805                    "2020-09-08T04:00:00+05",
806                ],
807                Some("+05".into()),
808                "1970-01-01T00:00:00+05",
809                vec![
810                    "2020-09-08T00:00:00+05",
811                    "2020-09-08T00:00:00+05",
812                    "2020-09-08T00:00:00+05",
813                    "2020-09-08T00:00:00+05",
814                    "2020-09-08T00:00:00+05",
815                ],
816            ),
817            (
818                vec![
819                    "2020-09-08T00:00:00+08",
820                    "2020-09-08T01:00:00+08",
821                    "2020-09-08T02:00:00+08",
822                    "2020-09-08T03:00:00+08",
823                    "2020-09-08T04:00:00+08",
824                ],
825                Some("+08".into()),
826                "1970-01-01T00:00:00+08",
827                vec![
828                    "2020-09-08T00:00:00+08",
829                    "2020-09-08T00:00:00+08",
830                    "2020-09-08T00:00:00+08",
831                    "2020-09-08T00:00:00+08",
832                    "2020-09-08T00:00:00+08",
833                ],
834            ),
835        ];
836
837        cases
838            .iter()
839            .for_each(|(original, tz_opt, origin, expected)| {
840                let input = original
841                    .iter()
842                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
843                    .collect::<TimestampNanosecondArray>()
844                    .with_timezone_opt(tz_opt.clone());
845                let right = expected
846                    .iter()
847                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
848                    .collect::<TimestampNanosecondArray>()
849                    .with_timezone_opt(tz_opt.clone());
850                let batch_len = input.len();
851                let args = vec![
852                    ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
853                    ColumnarValue::Array(Arc::new(input)),
854                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
855                        Some(string_to_timestamp_nanos(origin).unwrap()),
856                        tz_opt.clone(),
857                    )),
858                ];
859                let return_field = &Arc::new(Field::new(
860                    "f",
861                    DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
862                    true,
863                ));
864                let result =
865                    invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
866
867                if let ColumnarValue::Array(result) = result {
868                    assert_eq!(
869                        result.data_type(),
870                        &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
871                    );
872                    let left = arrow::array::cast::as_primitive_array::<
873                        TimestampNanosecondType,
874                    >(&result);
875                    assert_eq!(left, &right);
876                } else {
877                    panic!("unexpected column type");
878                }
879            });
880    }
881
882    #[test]
883    fn test_date_bin_single() {
884        let cases = vec![
885            (
886                (
887                    TimeDelta::try_minutes(15),
888                    "2004-04-09T02:03:04.123456789Z",
889                    "2001-01-01T00:00:00",
890                ),
891                "2004-04-09T02:00:00Z",
892            ),
893            (
894                (
895                    TimeDelta::try_minutes(15),
896                    "2004-04-09T02:03:04.123456789Z",
897                    "2001-01-01T00:02:30",
898                ),
899                "2004-04-09T02:02:30Z",
900            ),
901            (
902                (
903                    TimeDelta::try_minutes(15),
904                    "2004-04-09T02:03:04.123456789Z",
905                    "2005-01-01T00:02:30",
906                ),
907                "2004-04-09T02:02:30Z",
908            ),
909            (
910                (
911                    TimeDelta::try_hours(1),
912                    "2004-04-09T02:03:04.123456789Z",
913                    "2001-01-01T00:00:00",
914                ),
915                "2004-04-09T02:00:00Z",
916            ),
917            (
918                (
919                    TimeDelta::try_seconds(10),
920                    "2004-04-09T02:03:11.123456789Z",
921                    "2001-01-01T00:00:00",
922                ),
923                "2004-04-09T02:03:10Z",
924            ),
925        ];
926
927        cases
928            .iter()
929            .for_each(|((stride, source, origin), expected)| {
930                let stride = stride.unwrap();
931                let stride1 = stride.num_nanoseconds().unwrap();
932                let source1 = string_to_timestamp_nanos(source).unwrap();
933                let origin1 = string_to_timestamp_nanos(origin).unwrap();
934
935                let expected1 = string_to_timestamp_nanos(expected).unwrap();
936                let result = date_bin_nanos_interval(stride1, source1, origin1);
937                assert_eq!(result, expected1, "{source} = {expected}");
938            })
939    }
940
941    #[test]
942    fn test_date_bin_before_epoch() {
943        let cases = [
944            (
945                (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
946                "1969-12-31T23:30:00",
947            ),
948            (
949                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
950                "1969-12-31T23:45:00",
951            ),
952            (
953                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
954                "1969-12-31T23:45:00",
955            ),
956        ];
957
958        cases.iter().for_each(|((stride, source), expected)| {
959            let stride = stride.unwrap();
960            let stride1 = stride.num_nanoseconds().unwrap();
961            let source1 = string_to_timestamp_nanos(source).unwrap();
962
963            let expected1 = string_to_timestamp_nanos(expected).unwrap();
964            let result = date_bin_nanos_interval(stride1, source1, 0);
965            assert_eq!(result, expected1, "{source} = {expected}");
966        })
967    }
968}