Skip to main content

datafusion_functions/datetime/
date_bin.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23    ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25    TimestampSecondType,
26};
27use arrow::array::{ArrayRef, AsArray, PrimitiveArray};
28use arrow::datatypes::DataType::{Time32, Time64, Timestamp};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{
32    DataType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
33    Time64NanosecondType, TimeUnit,
34};
35use arrow::temporal_conversions::NANOSECONDS_IN_DAY;
36use datafusion_common::cast::as_primitive_array;
37use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err};
38use datafusion_expr::TypeSignature::Exact;
39use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
40use datafusion_expr::{
41    ColumnarValue, Documentation, ScalarUDFImpl, Signature, TIMEZONE_WILDCARD, Volatility,
42};
43use datafusion_macros::user_doc;
44
45use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
46
47#[user_doc(
48    doc_section(label = "Time and Date Functions"),
49    description = r#"
50Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
51
52For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
53"#,
54    syntax_example = "date_bin(interval, expression, origin-timestamp)",
55    sql_example = r#"```sql
56-- Bin the timestamp into 1 day intervals
57> SELECT date_bin(interval '1 day', time) as bin
58FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
59+---------------------+
60| bin                 |
61+---------------------+
62| 2023-01-01T00:00:00 |
63| 2023-01-03T00:00:00 |
64+---------------------+
652 row(s) fetched.
66
67-- Bin the timestamp into 1 day intervals starting at 3AM on  2023-01-01
68> SELECT date_bin(interval '1 day', time,  '2023-01-01T03:00:00') as bin
69FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
70+---------------------+
71| bin                 |
72+---------------------+
73| 2023-01-01T03:00:00 |
74| 2023-01-03T03:00:00 |
75+---------------------+
762 row(s) fetched.
77
78-- Bin the time into 15 minute intervals starting at 1 min
79>  SELECT date_bin(interval '15 minutes', time, TIME '00:01:00') as bin
80FROM VALUES (TIME '02:18:18'), (TIME '19:00:03')  t(time);
81+----------+
82| bin      |
83+----------+
84| 02:16:00 |
85| 18:46:00 |
86+----------+
872 row(s) fetched.
88```"#,
89    argument(name = "interval", description = "Bin interval."),
90    argument(
91        name = "expression",
92        description = "Time expression to operate on. Can be a constant, column, or function."
93    ),
94    argument(
95        name = "origin-timestamp",
96        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
97
98    - nanoseconds
99    - microseconds
100    - milliseconds
101    - seconds
102    - minutes
103    - hours
104    - days
105    - weeks
106    - months
107    - years
108    - century
109"#
110    )
111)]
112#[derive(Debug, PartialEq, Eq, Hash)]
113pub struct DateBinFunc {
114    signature: Signature,
115}
116
117impl Default for DateBinFunc {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl DateBinFunc {
124    pub fn new() -> Self {
125        let base_sig = |array_type: TimeUnit| {
126            let mut v = vec![
127                Exact(vec![
128                    DataType::Interval(MonthDayNano),
129                    Timestamp(array_type, None),
130                    Timestamp(Nanosecond, None),
131                ]),
132                Exact(vec![
133                    DataType::Interval(MonthDayNano),
134                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
135                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
136                ]),
137                Exact(vec![
138                    DataType::Interval(DayTime),
139                    Timestamp(array_type, None),
140                    Timestamp(Nanosecond, None),
141                ]),
142                Exact(vec![
143                    DataType::Interval(DayTime),
144                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
145                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
146                ]),
147                Exact(vec![
148                    DataType::Interval(MonthDayNano),
149                    Timestamp(array_type, None),
150                ]),
151                Exact(vec![
152                    DataType::Interval(MonthDayNano),
153                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
154                ]),
155                Exact(vec![
156                    DataType::Interval(DayTime),
157                    Timestamp(array_type, None),
158                ]),
159                Exact(vec![
160                    DataType::Interval(DayTime),
161                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
162                ]),
163            ];
164
165            match array_type {
166                Second | Millisecond => {
167                    v.append(&mut vec![
168                        Exact(vec![
169                            DataType::Interval(MonthDayNano),
170                            Time32(array_type),
171                            Time32(array_type),
172                        ]),
173                        Exact(vec![DataType::Interval(MonthDayNano), Time32(array_type)]),
174                        Exact(vec![
175                            DataType::Interval(DayTime),
176                            Time32(array_type),
177                            Time32(array_type),
178                        ]),
179                        Exact(vec![DataType::Interval(DayTime), Time32(array_type)]),
180                    ]);
181                }
182                Microsecond | Nanosecond => {
183                    v.append(&mut vec![
184                        Exact(vec![
185                            DataType::Interval(DayTime),
186                            Time64(array_type),
187                            Time64(array_type),
188                        ]),
189                        Exact(vec![DataType::Interval(DayTime), Time64(array_type)]),
190                        Exact(vec![
191                            DataType::Interval(MonthDayNano),
192                            Time64(array_type),
193                            Time64(array_type),
194                        ]),
195                        Exact(vec![DataType::Interval(MonthDayNano), Time64(array_type)]),
196                    ]);
197                }
198            }
199
200            v
201        };
202
203        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
204            .into_iter()
205            .map(base_sig)
206            .collect::<Vec<_>>()
207            .concat();
208
209        Self {
210            signature: Signature::one_of(full_sig, Volatility::Immutable),
211        }
212    }
213}
214
215impl ScalarUDFImpl for DateBinFunc {
216    fn as_any(&self) -> &dyn Any {
217        self
218    }
219
220    fn name(&self) -> &str {
221        "date_bin"
222    }
223
224    fn signature(&self) -> &Signature {
225        &self.signature
226    }
227
228    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
229        match &arg_types[1] {
230            Timestamp(tu, tz_opt) => Ok(Timestamp(*tu, tz_opt.clone())),
231            Time32(tu) => Ok(Time32(*tu)),
232            Time64(tu) => Ok(Time64(*tu)),
233            _ => plan_err!(
234                "The date_bin function can only accept timestamp or time as the second arg."
235            ),
236        }
237    }
238
239    fn invoke_with_args(
240        &self,
241        args: datafusion_expr::ScalarFunctionArgs,
242    ) -> Result<ColumnarValue> {
243        let args = &args.args;
244        if args.len() == 2 {
245            let origin = match args[1].data_type() {
246                Time32(Second) => {
247                    ColumnarValue::Scalar(ScalarValue::Time32Second(Some(0)))
248                }
249                Time32(Millisecond) => {
250                    ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(0)))
251                }
252                Time64(Microsecond) => {
253                    ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0)))
254                }
255                Time64(Nanosecond) => {
256                    ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(0)))
257                }
258                _ => {
259                    // Default to unix EPOCH
260                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
261                        Some(0),
262                        Some("+00:00".into()),
263                    ))
264                }
265            };
266            date_bin_impl(&args[0], &args[1], &origin)
267        } else if args.len() == 3 {
268            date_bin_impl(&args[0], &args[1], &args[2])
269        } else {
270            exec_err!("DATE_BIN expected two or three arguments")
271        }
272    }
273
274    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
275        // The DATE_BIN function preserves the order of its second argument.
276        let step = &input[0];
277        let date_value = &input[1];
278        let reference = input.get(2);
279
280        if step.sort_properties.eq(&SortProperties::Singleton)
281            && reference
282                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
283                .unwrap_or(true)
284        {
285            Ok(date_value.sort_properties)
286        } else {
287            Ok(SortProperties::Unordered)
288        }
289    }
290    fn documentation(&self) -> Option<&Documentation> {
291        self.doc()
292    }
293}
294
295const NANOS_PER_MICRO: i64 = 1_000;
296const NANOS_PER_MILLI: i64 = 1_000_000;
297const NANOS_PER_SEC: i64 = NANOSECONDS;
298/// Function type for binning timestamps into intervals
299///
300/// Arguments:
301/// * `stride` - Interval width (nanoseconds for time-based, months for month-based)
302/// * `source` - Timestamp to bin (nanoseconds since epoch)
303/// * `origin` - Origin timestamp (nanoseconds since epoch)
304///
305/// Returns: Binned timestamp in nanoseconds, or error if out of range
306type BinFunction = fn(i64, i64, i64) -> Result<i64>;
307enum Interval {
308    Nanoseconds(i64),
309    Months(i64),
310}
311
312impl Interval {
313    /// Returns (`stride_nanos`, `fn`) where
314    ///
315    /// 1. `stride_nanos` is a width, in nanoseconds
316    /// 2. `fn` is a function that takes (stride_nanos, source, origin)
317    ///
318    /// `source` is the timestamp being binned
319    ///
320    /// `origin`  is the time, in nanoseconds, where windows are measured from
321    fn bin_fn(&self) -> (i64, BinFunction) {
322        match self {
323            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
324            Interval::Months(months) => (*months, date_bin_months_interval),
325        }
326    }
327}
328
329// return time in nanoseconds that the source timestamp falls into based on the stride and origin
330fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> Result<i64> {
331    let time_diff = source - origin;
332
333    // distance from origin to bin
334    let time_delta = compute_distance(time_diff, stride_nanos);
335
336    Ok(origin + time_delta)
337}
338
/// Distance from the origin to the start of the bin containing `time_diff`.
///
/// `time_diff` is the offset of the source from the origin and `stride` the
/// bin width, both in the same unit. The offset is first truncated to a
/// multiple of `stride`. When the source lies before the origin
/// (`time_diff < 0`), the stride is greater than one and the offset is not
/// already on a bin boundary, the result is moved one further stride back so
/// that the bin starts at or before the source.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    // A non-zero remainder means truncation moved the value off `time_diff`.
    let needs_step_back = time_diff < 0 && stride > 1 && remainder != 0;

    if needs_step_back {
        truncated - stride
    } else {
        truncated
    }
}
350
351// return time in nanoseconds that the source timestamp falls into based on the stride and origin
352fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Result<i64> {
353    // convert source and origin to DateTime<Utc>
354    let source_date = to_utc_date_time(source)?;
355    let origin_date = to_utc_date_time(origin)?;
356
357    // calculate the number of months between the source and origin
358    let month_diff = (source_date.year() - origin_date.year()) * 12
359        + source_date.month() as i32
360        - origin_date.month() as i32;
361
362    // distance from origin to bin
363    let month_delta = compute_distance(month_diff as i64, stride_months);
364
365    let mut bin_time = if month_delta < 0 {
366        match origin_date
367            .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
368        {
369            Some(dt) => dt,
370            None => return exec_err!("DATE_BIN month subtraction out of range"),
371        }
372    } else {
373        match origin_date.checked_add_months(Months::new(month_delta as u32)) {
374            Some(dt) => dt,
375            None => return exec_err!("DATE_BIN month addition out of range"),
376        }
377    };
378
379    // If origin is not midnight of first date of the month, the bin_time may be larger than the source
380    // In this case, we need to move back to previous bin
381    if bin_time > source_date {
382        let month_delta = month_delta - stride_months;
383        bin_time = if month_delta < 0 {
384            match origin_date
385                .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
386            {
387                Some(dt) => dt,
388                None => return exec_err!("DATE_BIN month subtraction out of range"),
389            }
390        } else {
391            match origin_date.checked_add_months(Months::new(month_delta as u32)) {
392                Some(dt) => dt,
393                None => return exec_err!("DATE_BIN month addition out of range"),
394            }
395        };
396    }
397    match bin_time.timestamp_nanos_opt() {
398        Some(nanos) => Ok(nanos),
399        None => exec_err!("DATE_BIN result timestamp out of range"),
400    }
401}
402
403fn to_utc_date_time(nanos: i64) -> Result<DateTime<Utc>> {
404    let secs = nanos / NANOS_PER_SEC;
405    let nsec = (nanos % NANOS_PER_SEC) as u32;
406    match DateTime::from_timestamp(secs, nsec) {
407        Some(dt) => Ok(dt),
408        None => exec_err!("Invalid timestamp value"),
409    }
410}
411
412// Supported intervals:
413//  1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds
414//     We will assume month interval won't be converted into this type
415//     TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere
416//             for month interval. I need to find that and make it ScalarValue::IntervalMonthDayNano instead
417// 2. IntervalMonthDayNano
418fn date_bin_impl(
419    stride: &ColumnarValue,
420    array: &ColumnarValue,
421    origin: &ColumnarValue,
422) -> Result<ColumnarValue> {
423    let stride = match stride {
424        ColumnarValue::Scalar(s) if s.is_null() => {
425            // NULL stride -> NULL result (standard SQL NULL propagation)
426            return Ok(ColumnarValue::Scalar(ScalarValue::try_from(
427                array.data_type(),
428            )?));
429        }
430        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
431            let (days, ms) = IntervalDayTimeType::to_parts(*v);
432            let nanos = (TimeDelta::try_days(days as i64).unwrap()
433                + TimeDelta::try_milliseconds(ms as i64).unwrap())
434            .num_nanoseconds();
435
436            match nanos {
437                Some(v) => Interval::Nanoseconds(v),
438                _ => return exec_err!("DATE_BIN stride argument is too large"),
439            }
440        }
441        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
442            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
443
444            // If interval is months, its origin must be midnight of first date of the month
445            if months != 0 {
446                // Return error if days or nanos is not zero
447                if days != 0 || nanos != 0 {
448                    return not_impl_err!(
449                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
450                    );
451                } else {
452                    Interval::Months(months as i64)
453                }
454            } else {
455                let nanos = (TimeDelta::try_days(days as i64).unwrap()
456                    + Duration::nanoseconds(nanos))
457                .num_nanoseconds();
458                match nanos {
459                    Some(v) => Interval::Nanoseconds(v),
460                    _ => return exec_err!("DATE_BIN stride argument is too large"),
461                }
462            }
463        }
464        ColumnarValue::Scalar(v) => {
465            return exec_err!(
466                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
467                v.data_type()
468            );
469        }
470        ColumnarValue::Array(_) => {
471            return not_impl_err!(
472                "DATE_BIN only supports literal values for the stride argument, not arrays"
473            );
474        }
475    };
476
477    let (origin, is_time) = match origin {
478        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => {
479            (*v, false)
480        }
481        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => {
482            match stride {
483                Interval::Months(m) => {
484                    if m > 0 {
485                        return exec_err!(
486                            "DATE_BIN stride for TIME input must be less than 1 day"
487                        );
488                    }
489                }
490                Interval::Nanoseconds(ns) => {
491                    if ns >= NANOSECONDS_IN_DAY {
492                        return exec_err!(
493                            "DATE_BIN stride for TIME input must be less than 1 day"
494                        );
495                    }
496                }
497            }
498
499            (*v as i64 * NANOS_PER_MILLI, true)
500        }
501        ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => {
502            match stride {
503                Interval::Months(m) => {
504                    if m > 0 {
505                        return exec_err!(
506                            "DATE_BIN stride for TIME input must be less than 1 day"
507                        );
508                    }
509                }
510                Interval::Nanoseconds(ns) => {
511                    if ns >= NANOSECONDS_IN_DAY {
512                        return exec_err!(
513                            "DATE_BIN stride for TIME input must be less than 1 day"
514                        );
515                    }
516                }
517            }
518
519            (*v as i64 * NANOS_PER_SEC, true)
520        }
521        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => {
522            match stride {
523                Interval::Months(m) => {
524                    if m > 0 {
525                        return exec_err!(
526                            "DATE_BIN stride for TIME input must be less than 1 day"
527                        );
528                    }
529                }
530                Interval::Nanoseconds(ns) => {
531                    if ns >= NANOSECONDS_IN_DAY {
532                        return exec_err!(
533                            "DATE_BIN stride for TIME input must be less than 1 day"
534                        );
535                    }
536                }
537            }
538
539            (*v * NANOS_PER_MICRO, true)
540        }
541        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => {
542            match stride {
543                Interval::Months(m) => {
544                    if m > 0 {
545                        return exec_err!(
546                            "DATE_BIN stride for TIME input must be less than 1 day"
547                        );
548                    }
549                }
550                Interval::Nanoseconds(ns) => {
551                    if ns >= NANOSECONDS_IN_DAY {
552                        return exec_err!(
553                            "DATE_BIN stride for TIME input must be less than 1 day"
554                        );
555                    }
556                }
557            }
558
559            (*v, true)
560        }
561        ColumnarValue::Scalar(v) => {
562            return exec_err!(
563                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got {}",
564                v.data_type()
565            );
566        }
567        ColumnarValue::Array(_) => {
568            return not_impl_err!(
569                "DATE_BIN only supports literal values for the origin argument, not arrays"
570            );
571        }
572    };
573
574    let (stride, stride_fn) = stride.bin_fn();
575
576    // Return error if stride is 0
577    if stride == 0 {
578        return exec_err!("DATE_BIN stride must be non-zero");
579    }
580
581    fn stride_map_fn<T: ArrowTimestampType>(
582        origin: i64,
583        stride: i64,
584        stride_fn: BinFunction,
585    ) -> impl Fn(i64) -> Result<i64> {
586        let scale = match T::UNIT {
587            Nanosecond => 1,
588            Microsecond => NANOS_PER_MICRO,
589            Millisecond => NANOS_PER_MILLI,
590            Second => NANOSECONDS,
591        };
592        move |x: i64| match stride_fn(stride, x * scale, origin) {
593            Ok(result) => Ok(result / scale),
594            Err(e) => Err(e),
595        }
596    }
597
598    Ok(match array {
599        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
600            let apply_stride_fn =
601                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
602            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
603                v.and_then(|val| apply_stride_fn(val).ok()),
604                tz_opt.clone(),
605            ))
606        }
607        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
608            let apply_stride_fn =
609                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
610            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
611                v.and_then(|val| apply_stride_fn(val).ok()),
612                tz_opt.clone(),
613            ))
614        }
615        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
616            let apply_stride_fn =
617                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
618            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
619                v.and_then(|val| apply_stride_fn(val).ok()),
620                tz_opt.clone(),
621            ))
622        }
623        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
624            let apply_stride_fn =
625                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
626            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
627                v.and_then(|val| apply_stride_fn(val).ok()),
628                tz_opt.clone(),
629            ))
630        }
631        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => {
632            if !is_time {
633                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
634            }
635            let result = v.and_then(|x| {
636                match stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) {
637                    Ok(binned_nanos) => {
638                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
639                        Some((nanos / NANOS_PER_MILLI) as i32)
640                    }
641                    Err(_) => None,
642                }
643            });
644            ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result))
645        }
646        ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => {
647            if !is_time {
648                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
649            }
650            let result = v.and_then(|x| {
651                match stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) {
652                    Ok(binned_nanos) => {
653                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
654                        Some((nanos / NANOS_PER_SEC) as i32)
655                    }
656                    Err(_) => None,
657                }
658            });
659            ColumnarValue::Scalar(ScalarValue::Time32Second(result))
660        }
661        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => {
662            if !is_time {
663                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
664            }
665            let result = v.and_then(|x| match stride_fn(stride, x, origin) {
666                Ok(binned_nanos) => Some(binned_nanos % (NANOSECONDS_IN_DAY)),
667                Err(_) => None,
668            });
669            ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result))
670        }
671        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => {
672            if !is_time {
673                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
674            }
675            let result =
676                v.and_then(|x| match stride_fn(stride, x * NANOS_PER_MICRO, origin) {
677                    Ok(binned_nanos) => {
678                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
679                        Some(nanos / NANOS_PER_MICRO)
680                    }
681                    Err(_) => None,
682                });
683            ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result))
684        }
685        ColumnarValue::Array(array) => {
686            fn transform_array_with_stride<T>(
687                origin: i64,
688                stride: i64,
689                stride_fn: BinFunction,
690                array: &ArrayRef,
691                tz_opt: &Option<Arc<str>>,
692            ) -> Result<ColumnarValue>
693            where
694                T: ArrowTimestampType,
695            {
696                let array = as_primitive_array::<T>(array)?;
697                let scale = match T::UNIT {
698                    Nanosecond => 1,
699                    Microsecond => NANOS_PER_MICRO,
700                    Millisecond => NANOS_PER_MILLI,
701                    Second => NANOSECONDS,
702                };
703
704                let result: PrimitiveArray<T> = array.try_unary(|val| {
705                    stride_fn(stride, val * scale, origin)
706                        .map(|binned| binned / scale)
707                        .map_err(|e| {
708                            arrow::error::ArrowError::ComputeError(e.to_string())
709                        })
710                })?;
711
712                let array = result.with_timezone_opt(tz_opt.clone());
713                Ok(ColumnarValue::Array(Arc::new(array)))
714            }
715
716            match array.data_type() {
717                Timestamp(Nanosecond, tz_opt) => {
718                    transform_array_with_stride::<TimestampNanosecondType>(
719                        origin, stride, stride_fn, array, tz_opt,
720                    )?
721                }
722                Timestamp(Microsecond, tz_opt) => {
723                    transform_array_with_stride::<TimestampMicrosecondType>(
724                        origin, stride, stride_fn, array, tz_opt,
725                    )?
726                }
727                Timestamp(Millisecond, tz_opt) => {
728                    transform_array_with_stride::<TimestampMillisecondType>(
729                        origin, stride, stride_fn, array, tz_opt,
730                    )?
731                }
732                Timestamp(Second, tz_opt) => {
733                    transform_array_with_stride::<TimestampSecondType>(
734                        origin, stride, stride_fn, array, tz_opt,
735                    )?
736                }
737                Time32(Millisecond) => {
738                    if !is_time {
739                        return exec_err!(
740                            "DATE_BIN with Time32 source requires Time32 origin"
741                        );
742                    }
743                    let array = array.as_primitive::<Time32MillisecondType>();
744                    let result: PrimitiveArray<Time32MillisecondType> =
745                        array.try_unary(|x| {
746                            stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin)
747                                .map(|binned_nanos| {
748                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
749                                    (nanos / NANOS_PER_MILLI) as i32
750                                })
751                                .map_err(|e| {
752                                    arrow::error::ArrowError::ComputeError(e.to_string())
753                                })
754                        })?;
755                    ColumnarValue::Array(Arc::new(result))
756                }
757                Time32(Second) => {
758                    if !is_time {
759                        return exec_err!(
760                            "DATE_BIN with Time32 source requires Time32 origin"
761                        );
762                    }
763                    let array = array.as_primitive::<Time32SecondType>();
764                    let result: PrimitiveArray<Time32SecondType> =
765                        array.try_unary(|x| {
766                            stride_fn(stride, x as i64 * NANOS_PER_SEC, origin)
767                                .map(|binned_nanos| {
768                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
769                                    (nanos / NANOS_PER_SEC) as i32
770                                })
771                                .map_err(|e| {
772                                    arrow::error::ArrowError::ComputeError(e.to_string())
773                                })
774                        })?;
775                    ColumnarValue::Array(Arc::new(result))
776                }
777                Time64(Microsecond) => {
778                    if !is_time {
779                        return exec_err!(
780                            "DATE_BIN with Time64 source requires Time64 origin"
781                        );
782                    }
783                    let array = array.as_primitive::<Time64MicrosecondType>();
784                    let result: PrimitiveArray<Time64MicrosecondType> =
785                        array.try_unary(|x| {
786                            stride_fn(stride, x * NANOS_PER_MICRO, origin)
787                                .map(|binned_nanos| {
788                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
789                                    nanos / NANOS_PER_MICRO
790                                })
791                                .map_err(|e| {
792                                    arrow::error::ArrowError::ComputeError(e.to_string())
793                                })
794                        })?;
795                    ColumnarValue::Array(Arc::new(result))
796                }
797                Time64(Nanosecond) => {
798                    if !is_time {
799                        return exec_err!(
800                            "DATE_BIN with Time64 source requires Time64 origin"
801                        );
802                    }
803                    let array = array.as_primitive::<Time64NanosecondType>();
804                    let result: PrimitiveArray<Time64NanosecondType> =
805                        array.try_unary(|x| {
806                            stride_fn(stride, x, origin)
807                                .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY))
808                                .map_err(|e| {
809                                    arrow::error::ArrowError::ComputeError(e.to_string())
810                                })
811                        })?;
812                    ColumnarValue::Array(Arc::new(result))
813                }
814                _ => {
815                    return exec_err!(
816                        "DATE_BIN expects source argument to be a TIMESTAMP or TIME but got {}",
817                        array.data_type()
818                    );
819                }
820            }
821        }
822        _ => {
823            return exec_err!(
824                "DATE_BIN expects source argument to be a TIMESTAMP or TIME scalar or array"
825            );
826        }
827    })
828}
829
830#[cfg(test)]
831mod tests {
832    use std::sync::Arc;
833
834    use crate::datetime::date_bin::{DateBinFunc, date_bin_nanos_interval};
835    use arrow::array::types::TimestampNanosecondType;
836    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
837    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
838    use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
839
840    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
841    use datafusion_common::{DataFusionError, ScalarValue};
842    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
843
844    use chrono::TimeDelta;
845    use datafusion_common::config::ConfigOptions;
846
847    fn invoke_date_bin_with_args(
848        args: Vec<ColumnarValue>,
849        number_rows: usize,
850        return_field: &FieldRef,
851    ) -> Result<ColumnarValue, DataFusionError> {
852        let arg_fields = args
853            .iter()
854            .map(|arg| Field::new("a", arg.data_type(), true).into())
855            .collect::<Vec<_>>();
856
857        let args = datafusion_expr::ScalarFunctionArgs {
858            args,
859            arg_fields,
860            number_rows,
861            return_field: Arc::clone(return_field),
862            config_options: Arc::new(ConfigOptions::default()),
863        };
864        DateBinFunc::new().invoke_with_args(args)
865    }
866
867    #[test]
868    fn test_date_bin() {
869        let return_field = &Arc::new(Field::new(
870            "f",
871            DataType::Timestamp(TimeUnit::Nanosecond, None),
872            true,
873        ));
874
875        let mut args = vec![
876            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
877                days: 0,
878                milliseconds: 1,
879            }))),
880            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
881            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
882        ];
883        let res = invoke_date_bin_with_args(args, 1, return_field);
884        assert!(res.is_ok());
885
886        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
887        let batch_len = timestamps.len();
888        args = vec![
889            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
890                days: 0,
891                milliseconds: 1,
892            }))),
893            ColumnarValue::Array(timestamps),
894            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
895        ];
896        let res = invoke_date_bin_with_args(args, batch_len, return_field);
897        assert!(res.is_ok());
898
899        args = vec![
900            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
901                days: 0,
902                milliseconds: 1,
903            }))),
904            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
905        ];
906        let res = invoke_date_bin_with_args(args, 1, return_field);
907        assert!(res.is_ok());
908
909        // stride supports month-day-nano
910        args = vec![
911            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
912                IntervalMonthDayNano {
913                    months: 0,
914                    days: 0,
915                    nanoseconds: 1,
916                },
917            ))),
918            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
919            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
920        ];
921        let res = invoke_date_bin_with_args(args, 1, return_field);
922        assert!(res.is_ok());
923
924        //
925        // Fallible test cases
926        //
927
928        // invalid number of arguments
929        args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
930            IntervalDayTime {
931                days: 0,
932                milliseconds: 1,
933            },
934        )))];
935        let res = invoke_date_bin_with_args(args, 1, return_field);
936        assert_eq!(
937            res.err().unwrap().strip_backtrace(),
938            "Execution error: DATE_BIN expected two or three arguments"
939        );
940
941        // stride: invalid type
942        args = vec![
943            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
944            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
945            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
946        ];
947        let res = invoke_date_bin_with_args(args, 1, return_field);
948        assert_eq!(
949            res.err().unwrap().strip_backtrace(),
950            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
951        );
952
953        // stride: invalid value
954
955        args = vec![
956            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
957                days: 0,
958                milliseconds: 0,
959            }))),
960            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
961            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
962        ];
963
964        let res = invoke_date_bin_with_args(args, 1, return_field);
965        assert_eq!(
966            res.err().unwrap().strip_backtrace(),
967            "Execution error: DATE_BIN stride must be non-zero"
968        );
969
970        // stride: overflow of day-time interval
971        args = vec![
972            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
973                IntervalDayTime::MAX,
974            ))),
975            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
976            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
977        ];
978        let res = invoke_date_bin_with_args(args, 1, return_field);
979        assert_eq!(
980            res.err().unwrap().strip_backtrace(),
981            "Execution error: DATE_BIN stride argument is too large"
982        );
983
984        // stride: overflow of month-day-nano interval
985        args = vec![
986            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
987            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
988            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
989        ];
990        let res = invoke_date_bin_with_args(args, 1, return_field);
991        assert_eq!(
992            res.err().unwrap().strip_backtrace(),
993            "Execution error: DATE_BIN stride argument is too large"
994        );
995
996        // stride: month intervals
997        args = vec![
998            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
999            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1000            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1001        ];
1002        let res = invoke_date_bin_with_args(args, 1, return_field);
1003        assert_eq!(
1004            res.err().unwrap().strip_backtrace(),
1005            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
1006        );
1007
1008        // origin: invalid type
1009        args = vec![
1010            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1011                days: 0,
1012                milliseconds: 1,
1013            }))),
1014            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1015            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
1016        ];
1017        let res = invoke_date_bin_with_args(args, 1, return_field);
1018        assert_eq!(
1019            res.err().unwrap().strip_backtrace(),
1020            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got Timestamp(µs)"
1021        );
1022
1023        args = vec![
1024            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1025                days: 0,
1026                milliseconds: 1,
1027            }))),
1028            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
1029            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1030        ];
1031        let res = invoke_date_bin_with_args(args, 1, return_field);
1032        assert!(res.is_ok());
1033
1034        // unsupported array type for stride
1035        let intervals = Arc::new(
1036            (1..6)
1037                .map(|x| {
1038                    Some(IntervalDayTime {
1039                        days: 0,
1040                        milliseconds: x,
1041                    })
1042                })
1043                .collect::<IntervalDayTimeArray>(),
1044        );
1045        args = vec![
1046            ColumnarValue::Array(intervals),
1047            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1048            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1049        ];
1050        let res = invoke_date_bin_with_args(args, 1, return_field);
1051        assert_eq!(
1052            res.err().unwrap().strip_backtrace(),
1053            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
1054        );
1055
1056        // unsupported array type for origin
1057        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
1058        let batch_len = timestamps.len();
1059        args = vec![
1060            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1061                days: 0,
1062                milliseconds: 1,
1063            }))),
1064            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1065            ColumnarValue::Array(timestamps),
1066        ];
1067        let res = invoke_date_bin_with_args(args, batch_len, return_field);
1068        assert_eq!(
1069            res.err().unwrap().strip_backtrace(),
1070            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
1071        );
1072    }
1073
1074    #[test]
1075    fn test_date_bin_timezones() {
1076        let cases = [
1077            (
1078                vec![
1079                    "2020-09-08T00:00:00Z",
1080                    "2020-09-08T01:00:00Z",
1081                    "2020-09-08T02:00:00Z",
1082                    "2020-09-08T03:00:00Z",
1083                    "2020-09-08T04:00:00Z",
1084                ],
1085                Some("+00".into()),
1086                "1970-01-01T00:00:00Z",
1087                vec![
1088                    "2020-09-08T00:00:00Z",
1089                    "2020-09-08T00:00:00Z",
1090                    "2020-09-08T00:00:00Z",
1091                    "2020-09-08T00:00:00Z",
1092                    "2020-09-08T00:00:00Z",
1093                ],
1094            ),
1095            (
1096                vec![
1097                    "2020-09-08T00:00:00Z",
1098                    "2020-09-08T01:00:00Z",
1099                    "2020-09-08T02:00:00Z",
1100                    "2020-09-08T03:00:00Z",
1101                    "2020-09-08T04:00:00Z",
1102                ],
1103                None,
1104                "1970-01-01T00:00:00Z",
1105                vec![
1106                    "2020-09-08T00:00:00Z",
1107                    "2020-09-08T00:00:00Z",
1108                    "2020-09-08T00:00:00Z",
1109                    "2020-09-08T00:00:00Z",
1110                    "2020-09-08T00:00:00Z",
1111                ],
1112            ),
1113            (
1114                vec![
1115                    "2020-09-08T00:00:00Z",
1116                    "2020-09-08T01:00:00Z",
1117                    "2020-09-08T02:00:00Z",
1118                    "2020-09-08T03:00:00Z",
1119                    "2020-09-08T04:00:00Z",
1120                ],
1121                Some("-02".into()),
1122                "1970-01-01T00:00:00Z",
1123                vec![
1124                    "2020-09-08T00:00:00Z",
1125                    "2020-09-08T00:00:00Z",
1126                    "2020-09-08T00:00:00Z",
1127                    "2020-09-08T00:00:00Z",
1128                    "2020-09-08T00:00:00Z",
1129                ],
1130            ),
1131            (
1132                vec![
1133                    "2020-09-08T00:00:00+05",
1134                    "2020-09-08T01:00:00+05",
1135                    "2020-09-08T02:00:00+05",
1136                    "2020-09-08T03:00:00+05",
1137                    "2020-09-08T04:00:00+05",
1138                ],
1139                Some("+05".into()),
1140                "1970-01-01T00:00:00+05",
1141                vec![
1142                    "2020-09-08T00:00:00+05",
1143                    "2020-09-08T00:00:00+05",
1144                    "2020-09-08T00:00:00+05",
1145                    "2020-09-08T00:00:00+05",
1146                    "2020-09-08T00:00:00+05",
1147                ],
1148            ),
1149            (
1150                vec![
1151                    "2020-09-08T00:00:00+08",
1152                    "2020-09-08T01:00:00+08",
1153                    "2020-09-08T02:00:00+08",
1154                    "2020-09-08T03:00:00+08",
1155                    "2020-09-08T04:00:00+08",
1156                ],
1157                Some("+08".into()),
1158                "1970-01-01T00:00:00+08",
1159                vec![
1160                    "2020-09-08T00:00:00+08",
1161                    "2020-09-08T00:00:00+08",
1162                    "2020-09-08T00:00:00+08",
1163                    "2020-09-08T00:00:00+08",
1164                    "2020-09-08T00:00:00+08",
1165                ],
1166            ),
1167        ];
1168
1169        cases
1170            .iter()
1171            .for_each(|(original, tz_opt, origin, expected)| {
1172                let input = original
1173                    .iter()
1174                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
1175                    .collect::<TimestampNanosecondArray>()
1176                    .with_timezone_opt(tz_opt.clone());
1177                let right = expected
1178                    .iter()
1179                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
1180                    .collect::<TimestampNanosecondArray>()
1181                    .with_timezone_opt(tz_opt.clone());
1182                let batch_len = input.len();
1183                let args = vec![
1184                    ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
1185                    ColumnarValue::Array(Arc::new(input)),
1186                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1187                        Some(string_to_timestamp_nanos(origin).unwrap()),
1188                        tz_opt.clone(),
1189                    )),
1190                ];
1191                let return_field = &Arc::new(Field::new(
1192                    "f",
1193                    DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
1194                    true,
1195                ));
1196                let result =
1197                    invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
1198
1199                if let ColumnarValue::Array(result) = result {
1200                    assert_eq!(
1201                        result.data_type(),
1202                        &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
1203                    );
1204                    let left = arrow::array::cast::as_primitive_array::<
1205                        TimestampNanosecondType,
1206                    >(&result);
1207                    assert_eq!(left, &right);
1208                } else {
1209                    panic!("unexpected column type");
1210                }
1211            });
1212    }
1213
1214    #[test]
1215    fn test_date_bin_single() {
1216        let cases = [
1217            (
1218                (
1219                    TimeDelta::try_minutes(15),
1220                    "2004-04-09T02:03:04.123456789Z",
1221                    "2001-01-01T00:00:00",
1222                ),
1223                "2004-04-09T02:00:00Z",
1224            ),
1225            (
1226                (
1227                    TimeDelta::try_minutes(15),
1228                    "2004-04-09T02:03:04.123456789Z",
1229                    "2001-01-01T00:02:30",
1230                ),
1231                "2004-04-09T02:02:30Z",
1232            ),
1233            (
1234                (
1235                    TimeDelta::try_minutes(15),
1236                    "2004-04-09T02:03:04.123456789Z",
1237                    "2005-01-01T00:02:30",
1238                ),
1239                "2004-04-09T02:02:30Z",
1240            ),
1241            (
1242                (
1243                    TimeDelta::try_hours(1),
1244                    "2004-04-09T02:03:04.123456789Z",
1245                    "2001-01-01T00:00:00",
1246                ),
1247                "2004-04-09T02:00:00Z",
1248            ),
1249            (
1250                (
1251                    TimeDelta::try_seconds(10),
1252                    "2004-04-09T02:03:11.123456789Z",
1253                    "2001-01-01T00:00:00",
1254                ),
1255                "2004-04-09T02:03:10Z",
1256            ),
1257        ];
1258
1259        cases
1260            .iter()
1261            .for_each(|((stride, source, origin), expected)| {
1262                let stride = stride.unwrap();
1263                let stride1 = stride.num_nanoseconds().unwrap();
1264                let source1 = string_to_timestamp_nanos(source).unwrap();
1265                let origin1 = string_to_timestamp_nanos(origin).unwrap();
1266
1267                let expected1 = string_to_timestamp_nanos(expected).unwrap();
1268                let result = date_bin_nanos_interval(stride1, source1, origin1).unwrap();
1269                assert_eq!(result, expected1, "{source} = {expected}");
1270            })
1271    }
1272
1273    #[test]
1274    fn test_date_bin_before_epoch() {
1275        let cases = [
1276            (
1277                (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
1278                "1969-12-31T23:30:00",
1279            ),
1280            (
1281                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
1282                "1969-12-31T23:45:00",
1283            ),
1284            (
1285                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
1286                "1969-12-31T23:45:00",
1287            ),
1288        ];
1289
1290        cases.iter().for_each(|((stride, source), expected)| {
1291            let stride = stride.unwrap();
1292            let stride1 = stride.num_nanoseconds().unwrap();
1293            let source1 = string_to_timestamp_nanos(source).unwrap();
1294
1295            let expected1 = string_to_timestamp_nanos(expected).unwrap();
1296            let result = date_bin_nanos_interval(stride1, source1, 0).unwrap();
1297            assert_eq!(result, expected1, "{source} = {expected}");
1298        })
1299    }
1300
1301    #[test]
1302    fn test_date_bin_out_of_range() {
1303        let return_field = &Arc::new(Field::new(
1304            "f",
1305            DataType::Timestamp(TimeUnit::Millisecond, None),
1306            true,
1307        ));
1308        let args = vec![
1309            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1637426858, 0, 0)),
1310            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
1311                Some(1040292460),
1312                None,
1313            )),
1314            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1315                Some(string_to_timestamp_nanos("1984-01-07 00:00:00").unwrap()),
1316                None,
1317            )),
1318        ];
1319
1320        let result = invoke_date_bin_with_args(args, 1, return_field);
1321        assert!(result.is_ok());
1322        if let ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(val, _)) =
1323            result.unwrap()
1324        {
1325            assert!(val.is_none(), "Expected None for out of range operation");
1326        }
1327        let args = vec![
1328            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1637426858, 0, 0)),
1329            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
1330                Some(-1040292460),
1331                None,
1332            )),
1333            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1334                Some(string_to_timestamp_nanos("1984-01-07 00:00:00").unwrap()),
1335                None,
1336            )),
1337        ];
1338
1339        let result = invoke_date_bin_with_args(args, 1, return_field);
1340        assert!(result.is_ok());
1341        if let ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(val, _)) =
1342            result.unwrap()
1343        {
1344            assert!(val.is_none(), "Expected None for out of range operation");
1345        }
1346    }
1347}