Skip to main content

datafusion_functions/datetime/
date_bin.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::temporal_conversions::NANOSECONDS;
21use arrow::array::types::{
22    ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
23    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
24    TimestampSecondType,
25};
26use arrow::array::{ArrayRef, AsArray, PrimitiveArray};
27use arrow::datatypes::DataType::{Time32, Time64, Timestamp};
28use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
29use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
30use arrow::datatypes::{
31    DataType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
32    Time64NanosecondType, TimeUnit,
33};
34use arrow::temporal_conversions::NANOSECONDS_IN_DAY;
35use datafusion_common::cast::as_primitive_array;
36use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err};
37use datafusion_expr::TypeSignature::Exact;
38use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
39use datafusion_expr::{
40    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
41    TIMEZONE_WILDCARD, Volatility,
42};
43use datafusion_macros::user_doc;
44
45use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
46
47#[user_doc(
48    doc_section(label = "Time and Date Functions"),
49    description = r#"
50Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
51
52For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
53"#,
54    syntax_example = "date_bin(interval, expression, origin-timestamp)",
55    sql_example = r#"```sql
56-- Bin the timestamp into 1 day intervals
57> SELECT date_bin(interval '1 day', time) as bin
58FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
59+---------------------+
60| bin                 |
61+---------------------+
62| 2023-01-01T00:00:00 |
63| 2023-01-03T00:00:00 |
64+---------------------+
652 row(s) fetched.
66
67-- Bin the timestamp into 1 day intervals starting at 3AM on  2023-01-01
68> SELECT date_bin(interval '1 day', time,  '2023-01-01T03:00:00') as bin
69FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
70+---------------------+
71| bin                 |
72+---------------------+
73| 2023-01-01T03:00:00 |
74| 2023-01-03T03:00:00 |
75+---------------------+
762 row(s) fetched.
77
78-- Bin the time into 15 minute intervals starting at 1 min
79>  SELECT date_bin(interval '15 minutes', time, TIME '00:01:00') as bin
80FROM VALUES (TIME '02:18:18'), (TIME '19:00:03')  t(time);
81+----------+
82| bin      |
83+----------+
84| 02:16:00 |
85| 18:46:00 |
86+----------+
872 row(s) fetched.
88```"#,
89    argument(name = "interval", description = "Bin interval."),
90    argument(
91        name = "expression",
92        description = "Time expression to operate on. Can be a constant, column, or function."
93    ),
94    argument(
95        name = "origin-timestamp",
96        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
97
98    - nanoseconds
99    - microseconds
100    - milliseconds
101    - seconds
102    - minutes
103    - hours
104    - days
105    - weeks
106    - months
107    - years
108    - century
109"#
110    )
111)]
112#[derive(Debug, PartialEq, Eq, Hash)]
113pub struct DateBinFunc {
114    signature: Signature,
115}
116
117impl Default for DateBinFunc {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl DateBinFunc {
124    pub fn new() -> Self {
125        let base_sig = |array_type: TimeUnit| {
126            let mut v = vec![
127                Exact(vec![
128                    DataType::Interval(MonthDayNano),
129                    Timestamp(array_type, None),
130                    Timestamp(Nanosecond, None),
131                ]),
132                Exact(vec![
133                    DataType::Interval(MonthDayNano),
134                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
135                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
136                ]),
137                Exact(vec![
138                    DataType::Interval(DayTime),
139                    Timestamp(array_type, None),
140                    Timestamp(Nanosecond, None),
141                ]),
142                Exact(vec![
143                    DataType::Interval(DayTime),
144                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
145                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
146                ]),
147                Exact(vec![
148                    DataType::Interval(MonthDayNano),
149                    Timestamp(array_type, None),
150                ]),
151                Exact(vec![
152                    DataType::Interval(MonthDayNano),
153                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
154                ]),
155                Exact(vec![
156                    DataType::Interval(DayTime),
157                    Timestamp(array_type, None),
158                ]),
159                Exact(vec![
160                    DataType::Interval(DayTime),
161                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
162                ]),
163            ];
164
165            match array_type {
166                Second | Millisecond => {
167                    v.append(&mut vec![
168                        Exact(vec![
169                            DataType::Interval(MonthDayNano),
170                            Time32(array_type),
171                            Time32(array_type),
172                        ]),
173                        Exact(vec![DataType::Interval(MonthDayNano), Time32(array_type)]),
174                        Exact(vec![
175                            DataType::Interval(DayTime),
176                            Time32(array_type),
177                            Time32(array_type),
178                        ]),
179                        Exact(vec![DataType::Interval(DayTime), Time32(array_type)]),
180                    ]);
181                }
182                Microsecond | Nanosecond => {
183                    v.append(&mut vec![
184                        Exact(vec![
185                            DataType::Interval(DayTime),
186                            Time64(array_type),
187                            Time64(array_type),
188                        ]),
189                        Exact(vec![DataType::Interval(DayTime), Time64(array_type)]),
190                        Exact(vec![
191                            DataType::Interval(MonthDayNano),
192                            Time64(array_type),
193                            Time64(array_type),
194                        ]),
195                        Exact(vec![DataType::Interval(MonthDayNano), Time64(array_type)]),
196                    ]);
197                }
198            }
199
200            v
201        };
202
203        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
204            .into_iter()
205            .map(base_sig)
206            .collect::<Vec<_>>()
207            .concat();
208
209        Self {
210            signature: Signature::one_of(full_sig, Volatility::Immutable),
211        }
212    }
213}
214
215impl ScalarUDFImpl for DateBinFunc {
216    fn name(&self) -> &str {
217        "date_bin"
218    }
219
220    fn signature(&self) -> &Signature {
221        &self.signature
222    }
223
224    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
225        match &arg_types[1] {
226            Timestamp(tu, tz_opt) => Ok(Timestamp(*tu, tz_opt.clone())),
227            Time32(tu) => Ok(Time32(*tu)),
228            Time64(tu) => Ok(Time64(*tu)),
229            _ => plan_err!(
230                "The date_bin function can only accept timestamp or time as the second arg."
231            ),
232        }
233    }
234
235    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
236        let args = &args.args;
237        if args.len() == 2 {
238            let origin = match args[1].data_type() {
239                Time32(Second) => {
240                    ColumnarValue::Scalar(ScalarValue::Time32Second(Some(0)))
241                }
242                Time32(Millisecond) => {
243                    ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(0)))
244                }
245                Time64(Microsecond) => {
246                    ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0)))
247                }
248                Time64(Nanosecond) => {
249                    ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(0)))
250                }
251                _ => {
252                    // Default to unix EPOCH
253                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
254                        Some(0),
255                        Some("+00:00".into()),
256                    ))
257                }
258            };
259            date_bin_impl(&args[0], &args[1], &origin)
260        } else if args.len() == 3 {
261            date_bin_impl(&args[0], &args[1], &args[2])
262        } else {
263            exec_err!("DATE_BIN expected two or three arguments")
264        }
265    }
266
267    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
268        // The DATE_BIN function preserves the order of its second argument.
269        let step = &input[0];
270        let date_value = &input[1];
271        let reference = input.get(2);
272
273        if step.sort_properties.eq(&SortProperties::Singleton)
274            && reference
275                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
276                .unwrap_or(true)
277        {
278            Ok(date_value.sort_properties)
279        } else {
280            Ok(SortProperties::Unordered)
281        }
282    }
283    fn documentation(&self) -> Option<&Documentation> {
284        self.doc()
285    }
286}
287
/// Number of nanoseconds in one microsecond.
288const NANOS_PER_MICRO: i64 = 1_000;
/// Number of nanoseconds in one millisecond.
289const NANOS_PER_MILLI: i64 = 1_000_000;
/// Number of nanoseconds in one second (same value as arrow's `NANOSECONDS`).
290const NANOS_PER_SEC: i64 = NANOSECONDS;
291/// Function type for binning timestamps into intervals
292///
/// This is the signature of the routines handed out by `Interval::bin_fn`
/// (`date_bin_nanos_interval` and `date_bin_months_interval`).
///
293/// Arguments:
294/// * `stride` - Interval width (nanoseconds for time-based, months for month-based)
295/// * `source` - Timestamp to bin (nanoseconds since epoch)
296/// * `origin` - Origin timestamp (nanoseconds since epoch)
297///
298/// Returns: Binned timestamp in nanoseconds, or error if out of range
299type BinFunction = fn(i64, i64, i64) -> Result<i64>;
300enum Interval {
301    Nanoseconds(i64),
302    Months(i64),
303}
304
305impl Interval {
306    /// Returns (`stride_nanos`, `fn`) where
307    ///
308    /// 1. `stride_nanos` is a width, in nanoseconds
309    /// 2. `fn` is a function that takes (stride_nanos, source, origin)
310    ///
311    /// `source` is the timestamp being binned
312    ///
313    /// `origin`  is the time, in nanoseconds, where windows are measured from
314    fn bin_fn(&self) -> (i64, BinFunction) {
315        match self {
316            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
317            Interval::Months(months) => (*months, date_bin_months_interval),
318        }
319    }
320}
321
322// return time in nanoseconds that the source timestamp falls into based on the stride and origin
323fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> Result<i64> {
324    let time_diff = source.checked_sub(origin).ok_or_else(|| {
325        arrow::error::ArrowError::InvalidArgumentError(format!(
326            "date_bin source timestamp {source} - origin {origin} overflows i64"
327        ))
328    })?;
329
330    // distance from origin to bin
331    let time_delta = compute_distance(time_diff, stride_nanos);
332
333    Ok(origin + time_delta)
334}
335
/// Distance from the origin to the start of the bin containing `time_diff`.
///
/// `time_diff` is first truncated to a multiple of `stride`. When it is
/// negative, not already on a bin boundary, and `stride` is greater than one,
/// the result is moved one further stride back so that it rounds down rather
/// than towards zero.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    // The origin is later than the source timestamp and the source is not on a
    // boundary: round down to the previous bin.
    let needs_previous_bin = time_diff < 0 && stride > 1 && remainder != 0;
    if needs_previous_bin {
        truncated - stride
    } else {
        truncated
    }
}
347
348// return time in nanoseconds that the source timestamp falls into based on the stride and origin
349fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Result<i64> {
350    // convert source and origin to DateTime<Utc>
351    let source_date = to_utc_date_time(source)?;
352    let origin_date = to_utc_date_time(origin)?;
353
354    // calculate the number of months between the source and origin
355    let month_diff = (source_date.year() - origin_date.year()) * 12
356        + source_date.month() as i32
357        - origin_date.month() as i32;
358
359    // distance from origin to bin
360    let month_delta = compute_distance(month_diff as i64, stride_months);
361
362    let mut bin_time = if month_delta < 0 {
363        match origin_date
364            .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
365        {
366            Some(dt) => dt,
367            None => return exec_err!("DATE_BIN month subtraction out of range"),
368        }
369    } else {
370        match origin_date.checked_add_months(Months::new(month_delta as u32)) {
371            Some(dt) => dt,
372            None => return exec_err!("DATE_BIN month addition out of range"),
373        }
374    };
375
376    // If origin is not midnight of first date of the month, the bin_time may be larger than the source
377    // In this case, we need to move back to previous bin
378    if bin_time > source_date {
379        let month_delta = month_delta - stride_months;
380        bin_time = if month_delta < 0 {
381            match origin_date
382                .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
383            {
384                Some(dt) => dt,
385                None => return exec_err!("DATE_BIN month subtraction out of range"),
386            }
387        } else {
388            match origin_date.checked_add_months(Months::new(month_delta as u32)) {
389                Some(dt) => dt,
390                None => return exec_err!("DATE_BIN month addition out of range"),
391            }
392        };
393    }
394    match bin_time.timestamp_nanos_opt() {
395        Some(nanos) => Ok(nanos),
396        None => exec_err!("DATE_BIN result timestamp out of range"),
397    }
398}
399
400fn to_utc_date_time(nanos: i64) -> Result<DateTime<Utc>> {
401    let secs = nanos / NANOS_PER_SEC;
402    let nsec = (nanos % NANOS_PER_SEC) as u32;
403    match DateTime::from_timestamp(secs, nsec) {
404        Some(dt) => Ok(dt),
405        None => exec_err!("Invalid timestamp value"),
406    }
407}
408
409// Supported intervals:
410//  1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds
411//     We will assume month interval won't be converted into this type
412//     TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere
413//             for month interval. I need to find that and make it ScalarValue::IntervalMonthDayNano instead
414// 2. IntervalMonthDayNano
415fn date_bin_impl(
416    stride: &ColumnarValue,
417    array: &ColumnarValue,
418    origin: &ColumnarValue,
419) -> Result<ColumnarValue> {
420    let stride = match stride {
421        ColumnarValue::Scalar(s) if s.is_null() => {
422            // NULL stride -> NULL result (standard SQL NULL propagation)
423            return Ok(ColumnarValue::Scalar(ScalarValue::try_from(
424                array.data_type(),
425            )?));
426        }
427        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
428            let (days, ms) = IntervalDayTimeType::to_parts(*v);
429            let nanos = (TimeDelta::try_days(days as i64).unwrap()
430                + TimeDelta::try_milliseconds(ms as i64).unwrap())
431            .num_nanoseconds();
432
433            match nanos {
434                Some(v) => Interval::Nanoseconds(v),
435                _ => return exec_err!("DATE_BIN stride argument is too large"),
436            }
437        }
438        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
439            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
440
441            // If interval is months, its origin must be midnight of first date of the month
442            if months != 0 {
443                // Return error if days or nanos is not zero
444                if days != 0 || nanos != 0 {
445                    return not_impl_err!(
446                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
447                    );
448                } else {
449                    Interval::Months(months as i64)
450                }
451            } else {
452                let nanos = (TimeDelta::try_days(days as i64).unwrap()
453                    + Duration::nanoseconds(nanos))
454                .num_nanoseconds();
455                match nanos {
456                    Some(v) => Interval::Nanoseconds(v),
457                    _ => return exec_err!("DATE_BIN stride argument is too large"),
458                }
459            }
460        }
461        ColumnarValue::Scalar(v) => {
462            return exec_err!(
463                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
464                v.data_type()
465            );
466        }
467        ColumnarValue::Array(_) => {
468            return not_impl_err!(
469                "DATE_BIN only supports literal values for the stride argument, not arrays"
470            );
471        }
472    };
473
474    let (origin, is_time) = match origin {
475        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => {
476            (*v, false)
477        }
478        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => {
479            match stride {
480                Interval::Months(m) => {
481                    if m > 0 {
482                        return exec_err!(
483                            "DATE_BIN stride for TIME input must be less than 1 day"
484                        );
485                    }
486                }
487                Interval::Nanoseconds(ns) => {
488                    if ns >= NANOSECONDS_IN_DAY {
489                        return exec_err!(
490                            "DATE_BIN stride for TIME input must be less than 1 day"
491                        );
492                    }
493                }
494            }
495
496            (*v as i64 * NANOS_PER_MILLI, true)
497        }
498        ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => {
499            match stride {
500                Interval::Months(m) => {
501                    if m > 0 {
502                        return exec_err!(
503                            "DATE_BIN stride for TIME input must be less than 1 day"
504                        );
505                    }
506                }
507                Interval::Nanoseconds(ns) => {
508                    if ns >= NANOSECONDS_IN_DAY {
509                        return exec_err!(
510                            "DATE_BIN stride for TIME input must be less than 1 day"
511                        );
512                    }
513                }
514            }
515
516            (*v as i64 * NANOS_PER_SEC, true)
517        }
518        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => {
519            match stride {
520                Interval::Months(m) => {
521                    if m > 0 {
522                        return exec_err!(
523                            "DATE_BIN stride for TIME input must be less than 1 day"
524                        );
525                    }
526                }
527                Interval::Nanoseconds(ns) => {
528                    if ns >= NANOSECONDS_IN_DAY {
529                        return exec_err!(
530                            "DATE_BIN stride for TIME input must be less than 1 day"
531                        );
532                    }
533                }
534            }
535
536            (*v * NANOS_PER_MICRO, true)
537        }
538        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => {
539            match stride {
540                Interval::Months(m) => {
541                    if m > 0 {
542                        return exec_err!(
543                            "DATE_BIN stride for TIME input must be less than 1 day"
544                        );
545                    }
546                }
547                Interval::Nanoseconds(ns) => {
548                    if ns >= NANOSECONDS_IN_DAY {
549                        return exec_err!(
550                            "DATE_BIN stride for TIME input must be less than 1 day"
551                        );
552                    }
553                }
554            }
555
556            (*v, true)
557        }
558        ColumnarValue::Scalar(v) => {
559            return exec_err!(
560                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got {}",
561                v.data_type()
562            );
563        }
564        ColumnarValue::Array(_) => {
565            return not_impl_err!(
566                "DATE_BIN only supports literal values for the origin argument, not arrays"
567            );
568        }
569    };
570
571    let (stride, stride_fn) = stride.bin_fn();
572
573    // Return error if stride is 0
574    if stride == 0 {
575        return exec_err!("DATE_BIN stride must be non-zero");
576    }
577
578    fn stride_map_fn<T: ArrowTimestampType>(
579        origin: i64,
580        stride: i64,
581        stride_fn: BinFunction,
582    ) -> impl Fn(i64) -> Result<i64> {
583        let scale = match T::UNIT {
584            Nanosecond => 1,
585            Microsecond => NANOS_PER_MICRO,
586            Millisecond => NANOS_PER_MILLI,
587            Second => NANOSECONDS,
588        };
589        move |x: i64| match stride_fn(stride, x * scale, origin) {
590            Ok(result) => Ok(result / scale),
591            Err(e) => Err(e),
592        }
593    }
594
595    Ok(match array {
596        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
597            let apply_stride_fn =
598                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
599            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
600                v.and_then(|val| apply_stride_fn(val).ok()),
601                tz_opt.clone(),
602            ))
603        }
604        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
605            let apply_stride_fn =
606                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
607            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
608                v.and_then(|val| apply_stride_fn(val).ok()),
609                tz_opt.clone(),
610            ))
611        }
612        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
613            let apply_stride_fn =
614                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
615            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
616                v.and_then(|val| apply_stride_fn(val).ok()),
617                tz_opt.clone(),
618            ))
619        }
620        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
621            let apply_stride_fn =
622                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
623            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
624                v.and_then(|val| apply_stride_fn(val).ok()),
625                tz_opt.clone(),
626            ))
627        }
628        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => {
629            if !is_time {
630                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
631            }
632            let result = v.and_then(|x| {
633                match stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) {
634                    Ok(binned_nanos) => {
635                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
636                        Some((nanos / NANOS_PER_MILLI) as i32)
637                    }
638                    Err(_) => None,
639                }
640            });
641            ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result))
642        }
643        ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => {
644            if !is_time {
645                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
646            }
647            let result = v.and_then(|x| {
648                match stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) {
649                    Ok(binned_nanos) => {
650                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
651                        Some((nanos / NANOS_PER_SEC) as i32)
652                    }
653                    Err(_) => None,
654                }
655            });
656            ColumnarValue::Scalar(ScalarValue::Time32Second(result))
657        }
658        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => {
659            if !is_time {
660                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
661            }
662            let result = v.and_then(|x| match stride_fn(stride, x, origin) {
663                Ok(binned_nanos) => Some(binned_nanos % (NANOSECONDS_IN_DAY)),
664                Err(_) => None,
665            });
666            ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result))
667        }
668        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => {
669            if !is_time {
670                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
671            }
672            let result =
673                v.and_then(|x| match stride_fn(stride, x * NANOS_PER_MICRO, origin) {
674                    Ok(binned_nanos) => {
675                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
676                        Some(nanos / NANOS_PER_MICRO)
677                    }
678                    Err(_) => None,
679                });
680            ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result))
681        }
682        ColumnarValue::Array(array) => {
683            fn transform_array_with_stride<T>(
684                origin: i64,
685                stride: i64,
686                stride_fn: BinFunction,
687                array: &ArrayRef,
688                tz_opt: &Option<Arc<str>>,
689            ) -> Result<ColumnarValue>
690            where
691                T: ArrowTimestampType,
692            {
693                let array = as_primitive_array::<T>(array)?;
694                let scale = match T::UNIT {
695                    Nanosecond => 1,
696                    Microsecond => NANOS_PER_MICRO,
697                    Millisecond => NANOS_PER_MILLI,
698                    Second => NANOSECONDS,
699                };
700
701                let result: PrimitiveArray<T> = array.try_unary(|val| {
702                    stride_fn(stride, val * scale, origin)
703                        .map(|binned| binned / scale)
704                        .map_err(|e| {
705                            arrow::error::ArrowError::ComputeError(e.to_string())
706                        })
707                })?;
708
709                let array = result.with_timezone_opt(tz_opt.clone());
710                Ok(ColumnarValue::Array(Arc::new(array)))
711            }
712
713            match array.data_type() {
714                Timestamp(Nanosecond, tz_opt) => {
715                    transform_array_with_stride::<TimestampNanosecondType>(
716                        origin, stride, stride_fn, array, tz_opt,
717                    )?
718                }
719                Timestamp(Microsecond, tz_opt) => {
720                    transform_array_with_stride::<TimestampMicrosecondType>(
721                        origin, stride, stride_fn, array, tz_opt,
722                    )?
723                }
724                Timestamp(Millisecond, tz_opt) => {
725                    transform_array_with_stride::<TimestampMillisecondType>(
726                        origin, stride, stride_fn, array, tz_opt,
727                    )?
728                }
729                Timestamp(Second, tz_opt) => {
730                    transform_array_with_stride::<TimestampSecondType>(
731                        origin, stride, stride_fn, array, tz_opt,
732                    )?
733                }
734                Time32(Millisecond) => {
735                    if !is_time {
736                        return exec_err!(
737                            "DATE_BIN with Time32 source requires Time32 origin"
738                        );
739                    }
740                    let array = array.as_primitive::<Time32MillisecondType>();
741                    let result: PrimitiveArray<Time32MillisecondType> =
742                        array.try_unary(|x| {
743                            stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin)
744                                .map(|binned_nanos| {
745                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
746                                    (nanos / NANOS_PER_MILLI) as i32
747                                })
748                                .map_err(|e| {
749                                    arrow::error::ArrowError::ComputeError(e.to_string())
750                                })
751                        })?;
752                    ColumnarValue::Array(Arc::new(result))
753                }
754                Time32(Second) => {
755                    if !is_time {
756                        return exec_err!(
757                            "DATE_BIN with Time32 source requires Time32 origin"
758                        );
759                    }
760                    let array = array.as_primitive::<Time32SecondType>();
761                    let result: PrimitiveArray<Time32SecondType> =
762                        array.try_unary(|x| {
763                            stride_fn(stride, x as i64 * NANOS_PER_SEC, origin)
764                                .map(|binned_nanos| {
765                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
766                                    (nanos / NANOS_PER_SEC) as i32
767                                })
768                                .map_err(|e| {
769                                    arrow::error::ArrowError::ComputeError(e.to_string())
770                                })
771                        })?;
772                    ColumnarValue::Array(Arc::new(result))
773                }
774                Time64(Microsecond) => {
775                    if !is_time {
776                        return exec_err!(
777                            "DATE_BIN with Time64 source requires Time64 origin"
778                        );
779                    }
780                    let array = array.as_primitive::<Time64MicrosecondType>();
781                    let result: PrimitiveArray<Time64MicrosecondType> =
782                        array.try_unary(|x| {
783                            stride_fn(stride, x * NANOS_PER_MICRO, origin)
784                                .map(|binned_nanos| {
785                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
786                                    nanos / NANOS_PER_MICRO
787                                })
788                                .map_err(|e| {
789                                    arrow::error::ArrowError::ComputeError(e.to_string())
790                                })
791                        })?;
792                    ColumnarValue::Array(Arc::new(result))
793                }
794                Time64(Nanosecond) => {
795                    if !is_time {
796                        return exec_err!(
797                            "DATE_BIN with Time64 source requires Time64 origin"
798                        );
799                    }
800                    let array = array.as_primitive::<Time64NanosecondType>();
801                    let result: PrimitiveArray<Time64NanosecondType> =
802                        array.try_unary(|x| {
803                            stride_fn(stride, x, origin)
804                                .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY))
805                                .map_err(|e| {
806                                    arrow::error::ArrowError::ComputeError(e.to_string())
807                                })
808                        })?;
809                    ColumnarValue::Array(Arc::new(result))
810                }
811                _ => {
812                    return exec_err!(
813                        "DATE_BIN expects source argument to be a TIMESTAMP or TIME but got {}",
814                        array.data_type()
815                    );
816                }
817            }
818        }
819        _ => {
820            return exec_err!(
821                "DATE_BIN expects source argument to be a TIMESTAMP or TIME scalar or array"
822            );
823        }
824    })
825}
826
827#[cfg(test)]
828mod tests {
829    use std::sync::Arc;
830
831    use crate::datetime::date_bin::{DateBinFunc, date_bin_nanos_interval};
832    use arrow::array::types::TimestampNanosecondType;
833    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
834    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
835    use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
836
837    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
838    use datafusion_common::{DataFusionError, ScalarValue};
839    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
840
841    use chrono::TimeDelta;
842    use datafusion_common::config::ConfigOptions;
843
844    fn invoke_date_bin_with_args(
845        args: Vec<ColumnarValue>,
846        number_rows: usize,
847        return_field: &FieldRef,
848    ) -> Result<ColumnarValue, DataFusionError> {
849        let arg_fields = args
850            .iter()
851            .map(|arg| Field::new("a", arg.data_type(), true).into())
852            .collect::<Vec<_>>();
853
854        let args = ScalarFunctionArgs {
855            args,
856            arg_fields,
857            number_rows,
858            return_field: Arc::clone(return_field),
859            config_options: Arc::new(ConfigOptions::default()),
860        };
861        DateBinFunc::new().invoke_with_args(args)
862    }
863
864    #[test]
865    fn test_date_bin() {
866        let return_field = &Arc::new(Field::new(
867            "f",
868            DataType::Timestamp(TimeUnit::Nanosecond, None),
869            true,
870        ));
871
872        let mut args = vec![
873            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
874                days: 0,
875                milliseconds: 1,
876            }))),
877            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
878            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
879        ];
880        let res = invoke_date_bin_with_args(args, 1, return_field);
881        assert!(res.is_ok());
882
883        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
884        let batch_len = timestamps.len();
885        args = vec![
886            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
887                days: 0,
888                milliseconds: 1,
889            }))),
890            ColumnarValue::Array(timestamps),
891            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
892        ];
893        let res = invoke_date_bin_with_args(args, batch_len, return_field);
894        assert!(res.is_ok());
895
896        args = vec![
897            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
898                days: 0,
899                milliseconds: 1,
900            }))),
901            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
902        ];
903        let res = invoke_date_bin_with_args(args, 1, return_field);
904        assert!(res.is_ok());
905
906        // stride supports month-day-nano
907        args = vec![
908            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
909                IntervalMonthDayNano {
910                    months: 0,
911                    days: 0,
912                    nanoseconds: 1,
913                },
914            ))),
915            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
916            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
917        ];
918        let res = invoke_date_bin_with_args(args, 1, return_field);
919        assert!(res.is_ok());
920
921        //
922        // Fallible test cases
923        //
924
925        // invalid number of arguments
926        args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
927            IntervalDayTime {
928                days: 0,
929                milliseconds: 1,
930            },
931        )))];
932        let res = invoke_date_bin_with_args(args, 1, return_field);
933        assert_eq!(
934            res.err().unwrap().strip_backtrace(),
935            "Execution error: DATE_BIN expected two or three arguments"
936        );
937
938        // stride: invalid type
939        args = vec![
940            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
941            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
942            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
943        ];
944        let res = invoke_date_bin_with_args(args, 1, return_field);
945        assert_eq!(
946            res.err().unwrap().strip_backtrace(),
947            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
948        );
949
950        // stride: invalid value
951
952        args = vec![
953            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
954                days: 0,
955                milliseconds: 0,
956            }))),
957            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
958            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
959        ];
960
961        let res = invoke_date_bin_with_args(args, 1, return_field);
962        assert_eq!(
963            res.err().unwrap().strip_backtrace(),
964            "Execution error: DATE_BIN stride must be non-zero"
965        );
966
967        // stride: overflow of day-time interval
968        args = vec![
969            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
970                IntervalDayTime::MAX,
971            ))),
972            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
973            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
974        ];
975        let res = invoke_date_bin_with_args(args, 1, return_field);
976        assert_eq!(
977            res.err().unwrap().strip_backtrace(),
978            "Execution error: DATE_BIN stride argument is too large"
979        );
980
981        // stride: overflow of month-day-nano interval
982        args = vec![
983            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
984            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
985            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
986        ];
987        let res = invoke_date_bin_with_args(args, 1, return_field);
988        assert_eq!(
989            res.err().unwrap().strip_backtrace(),
990            "Execution error: DATE_BIN stride argument is too large"
991        );
992
993        // stride: month intervals
994        args = vec![
995            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
996            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
997            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
998        ];
999        let res = invoke_date_bin_with_args(args, 1, return_field);
1000        assert_eq!(
1001            res.err().unwrap().strip_backtrace(),
1002            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
1003        );
1004
1005        // origin: invalid type
1006        args = vec![
1007            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1008                days: 0,
1009                milliseconds: 1,
1010            }))),
1011            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1012            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
1013        ];
1014        let res = invoke_date_bin_with_args(args, 1, return_field);
1015        assert_eq!(
1016            res.err().unwrap().strip_backtrace(),
1017            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got Timestamp(µs)"
1018        );
1019
1020        args = vec![
1021            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1022                days: 0,
1023                milliseconds: 1,
1024            }))),
1025            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
1026            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1027        ];
1028        let res = invoke_date_bin_with_args(args, 1, return_field);
1029        assert!(res.is_ok());
1030
1031        // unsupported array type for stride
1032        let intervals = Arc::new(
1033            (1..6)
1034                .map(|x| {
1035                    Some(IntervalDayTime {
1036                        days: 0,
1037                        milliseconds: x,
1038                    })
1039                })
1040                .collect::<IntervalDayTimeArray>(),
1041        );
1042        args = vec![
1043            ColumnarValue::Array(intervals),
1044            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1045            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1046        ];
1047        let res = invoke_date_bin_with_args(args, 1, return_field);
1048        assert_eq!(
1049            res.err().unwrap().strip_backtrace(),
1050            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
1051        );
1052
1053        // unsupported array type for origin
1054        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
1055        let batch_len = timestamps.len();
1056        args = vec![
1057            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1058                days: 0,
1059                milliseconds: 1,
1060            }))),
1061            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1062            ColumnarValue::Array(timestamps),
1063        ];
1064        let res = invoke_date_bin_with_args(args, batch_len, return_field);
1065        assert_eq!(
1066            res.err().unwrap().strip_backtrace(),
1067            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
1068        );
1069    }
1070
1071    #[test]
1072    fn test_date_bin_timezones() {
1073        let cases = [
1074            (
1075                vec![
1076                    "2020-09-08T00:00:00Z",
1077                    "2020-09-08T01:00:00Z",
1078                    "2020-09-08T02:00:00Z",
1079                    "2020-09-08T03:00:00Z",
1080                    "2020-09-08T04:00:00Z",
1081                ],
1082                Some("+00".into()),
1083                "1970-01-01T00:00:00Z",
1084                vec![
1085                    "2020-09-08T00:00:00Z",
1086                    "2020-09-08T00:00:00Z",
1087                    "2020-09-08T00:00:00Z",
1088                    "2020-09-08T00:00:00Z",
1089                    "2020-09-08T00:00:00Z",
1090                ],
1091            ),
1092            (
1093                vec![
1094                    "2020-09-08T00:00:00Z",
1095                    "2020-09-08T01:00:00Z",
1096                    "2020-09-08T02:00:00Z",
1097                    "2020-09-08T03:00:00Z",
1098                    "2020-09-08T04:00:00Z",
1099                ],
1100                None,
1101                "1970-01-01T00:00:00Z",
1102                vec![
1103                    "2020-09-08T00:00:00Z",
1104                    "2020-09-08T00:00:00Z",
1105                    "2020-09-08T00:00:00Z",
1106                    "2020-09-08T00:00:00Z",
1107                    "2020-09-08T00:00:00Z",
1108                ],
1109            ),
1110            (
1111                vec![
1112                    "2020-09-08T00:00:00Z",
1113                    "2020-09-08T01:00:00Z",
1114                    "2020-09-08T02:00:00Z",
1115                    "2020-09-08T03:00:00Z",
1116                    "2020-09-08T04:00:00Z",
1117                ],
1118                Some("-02".into()),
1119                "1970-01-01T00:00:00Z",
1120                vec![
1121                    "2020-09-08T00:00:00Z",
1122                    "2020-09-08T00:00:00Z",
1123                    "2020-09-08T00:00:00Z",
1124                    "2020-09-08T00:00:00Z",
1125                    "2020-09-08T00:00:00Z",
1126                ],
1127            ),
1128            (
1129                vec![
1130                    "2020-09-08T00:00:00+05",
1131                    "2020-09-08T01:00:00+05",
1132                    "2020-09-08T02:00:00+05",
1133                    "2020-09-08T03:00:00+05",
1134                    "2020-09-08T04:00:00+05",
1135                ],
1136                Some("+05".into()),
1137                "1970-01-01T00:00:00+05",
1138                vec![
1139                    "2020-09-08T00:00:00+05",
1140                    "2020-09-08T00:00:00+05",
1141                    "2020-09-08T00:00:00+05",
1142                    "2020-09-08T00:00:00+05",
1143                    "2020-09-08T00:00:00+05",
1144                ],
1145            ),
1146            (
1147                vec![
1148                    "2020-09-08T00:00:00+08",
1149                    "2020-09-08T01:00:00+08",
1150                    "2020-09-08T02:00:00+08",
1151                    "2020-09-08T03:00:00+08",
1152                    "2020-09-08T04:00:00+08",
1153                ],
1154                Some("+08".into()),
1155                "1970-01-01T00:00:00+08",
1156                vec![
1157                    "2020-09-08T00:00:00+08",
1158                    "2020-09-08T00:00:00+08",
1159                    "2020-09-08T00:00:00+08",
1160                    "2020-09-08T00:00:00+08",
1161                    "2020-09-08T00:00:00+08",
1162                ],
1163            ),
1164        ];
1165
1166        cases
1167            .iter()
1168            .for_each(|(original, tz_opt, origin, expected)| {
1169                let input = original
1170                    .iter()
1171                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
1172                    .collect::<TimestampNanosecondArray>()
1173                    .with_timezone_opt(tz_opt.clone());
1174                let right = expected
1175                    .iter()
1176                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
1177                    .collect::<TimestampNanosecondArray>()
1178                    .with_timezone_opt(tz_opt.clone());
1179                let batch_len = input.len();
1180                let args = vec![
1181                    ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
1182                    ColumnarValue::Array(Arc::new(input)),
1183                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1184                        Some(string_to_timestamp_nanos(origin).unwrap()),
1185                        tz_opt.clone(),
1186                    )),
1187                ];
1188                let return_field = &Arc::new(Field::new(
1189                    "f",
1190                    DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
1191                    true,
1192                ));
1193                let result =
1194                    invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
1195
1196                if let ColumnarValue::Array(result) = result {
1197                    assert_eq!(
1198                        result.data_type(),
1199                        &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
1200                    );
1201                    let left = arrow::array::cast::as_primitive_array::<
1202                        TimestampNanosecondType,
1203                    >(&result);
1204                    assert_eq!(left, &right);
1205                } else {
1206                    panic!("unexpected column type");
1207                }
1208            });
1209    }
1210
1211    #[test]
1212    fn test_date_bin_single() {
1213        let cases = [
1214            (
1215                (
1216                    TimeDelta::try_minutes(15),
1217                    "2004-04-09T02:03:04.123456789Z",
1218                    "2001-01-01T00:00:00",
1219                ),
1220                "2004-04-09T02:00:00Z",
1221            ),
1222            (
1223                (
1224                    TimeDelta::try_minutes(15),
1225                    "2004-04-09T02:03:04.123456789Z",
1226                    "2001-01-01T00:02:30",
1227                ),
1228                "2004-04-09T02:02:30Z",
1229            ),
1230            (
1231                (
1232                    TimeDelta::try_minutes(15),
1233                    "2004-04-09T02:03:04.123456789Z",
1234                    "2005-01-01T00:02:30",
1235                ),
1236                "2004-04-09T02:02:30Z",
1237            ),
1238            (
1239                (
1240                    TimeDelta::try_hours(1),
1241                    "2004-04-09T02:03:04.123456789Z",
1242                    "2001-01-01T00:00:00",
1243                ),
1244                "2004-04-09T02:00:00Z",
1245            ),
1246            (
1247                (
1248                    TimeDelta::try_seconds(10),
1249                    "2004-04-09T02:03:11.123456789Z",
1250                    "2001-01-01T00:00:00",
1251                ),
1252                "2004-04-09T02:03:10Z",
1253            ),
1254        ];
1255
1256        cases
1257            .iter()
1258            .for_each(|((stride, source, origin), expected)| {
1259                let stride = stride.unwrap();
1260                let stride1 = stride.num_nanoseconds().unwrap();
1261                let source1 = string_to_timestamp_nanos(source).unwrap();
1262                let origin1 = string_to_timestamp_nanos(origin).unwrap();
1263
1264                let expected1 = string_to_timestamp_nanos(expected).unwrap();
1265                let result = date_bin_nanos_interval(stride1, source1, origin1).unwrap();
1266                assert_eq!(result, expected1, "{source} = {expected}");
1267            })
1268    }
1269
1270    #[test]
1271    fn test_date_bin_before_epoch() {
1272        let cases = [
1273            (
1274                (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
1275                "1969-12-31T23:30:00",
1276            ),
1277            (
1278                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
1279                "1969-12-31T23:45:00",
1280            ),
1281            (
1282                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
1283                "1969-12-31T23:45:00",
1284            ),
1285        ];
1286
1287        cases.iter().for_each(|((stride, source), expected)| {
1288            let stride = stride.unwrap();
1289            let stride1 = stride.num_nanoseconds().unwrap();
1290            let source1 = string_to_timestamp_nanos(source).unwrap();
1291
1292            let expected1 = string_to_timestamp_nanos(expected).unwrap();
1293            let result = date_bin_nanos_interval(stride1, source1, 0).unwrap();
1294            assert_eq!(result, expected1, "{source} = {expected}");
1295        })
1296    }
1297
1298    #[test]
1299    fn test_date_bin_out_of_range() {
1300        let return_field = &Arc::new(Field::new(
1301            "f",
1302            DataType::Timestamp(TimeUnit::Millisecond, None),
1303            true,
1304        ));
1305        let args = vec![
1306            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1637426858, 0, 0)),
1307            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
1308                Some(1040292460),
1309                None,
1310            )),
1311            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1312                Some(string_to_timestamp_nanos("1984-01-07 00:00:00").unwrap()),
1313                None,
1314            )),
1315        ];
1316
1317        let result = invoke_date_bin_with_args(args, 1, return_field);
1318        assert!(result.is_ok());
1319        if let ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(val, _)) =
1320            result.unwrap()
1321        {
1322            assert!(val.is_none(), "Expected None for out of range operation");
1323        }
1324        let args = vec![
1325            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1637426858, 0, 0)),
1326            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
1327                Some(-1040292460),
1328                None,
1329            )),
1330            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1331                Some(string_to_timestamp_nanos("1984-01-07 00:00:00").unwrap()),
1332                None,
1333            )),
1334        ];
1335
1336        let result = invoke_date_bin_with_args(args, 1, return_field);
1337        assert!(result.is_ok());
1338        if let ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(val, _)) =
1339            result.unwrap()
1340        {
1341            assert!(val.is_none(), "Expected None for out of range operation");
1342        }
1343    }
1344}