datafusion_functions/datetime/
date_bin.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23    ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25    TimestampSecondType,
26};
27use arrow::array::{ArrayRef, AsArray, PrimitiveArray};
28use arrow::datatypes::DataType::{Time32, Time64, Timestamp};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{
32    DataType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
33    Time64NanosecondType, TimeUnit,
34};
35use arrow::temporal_conversions::NANOSECONDS_IN_DAY;
36use datafusion_common::cast::as_primitive_array;
37use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err};
38use datafusion_expr::TypeSignature::Exact;
39use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
40use datafusion_expr::{
41    ColumnarValue, Documentation, ScalarUDFImpl, Signature, TIMEZONE_WILDCARD, Volatility,
42};
43use datafusion_macros::user_doc;
44
45use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
46
47#[user_doc(
48    doc_section(label = "Time and Date Functions"),
49    description = r#"
50Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
51
52For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
53"#,
54    syntax_example = "date_bin(interval, expression, origin-timestamp)",
55    sql_example = r#"```sql
56-- Bin the timestamp into 1 day intervals
57> SELECT date_bin(interval '1 day', time) as bin
58FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
59+---------------------+
60| bin                 |
61+---------------------+
62| 2023-01-01T00:00:00 |
63| 2023-01-03T00:00:00 |
64+---------------------+
652 row(s) fetched.
66
67-- Bin the timestamp into 1 day intervals starting at 3AM on  2023-01-01
68> SELECT date_bin(interval '1 day', time,  '2023-01-01T03:00:00') as bin
69FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
70+---------------------+
71| bin                 |
72+---------------------+
73| 2023-01-01T03:00:00 |
74| 2023-01-03T03:00:00 |
75+---------------------+
762 row(s) fetched.
77
78-- Bin the time into 15 minute intervals starting at 1 min
79>  SELECT date_bin(interval '15 minutes', time, TIME '00:01:00') as bin
80FROM VALUES (TIME '02:18:18'), (TIME '19:00:03')  t(time);
81+----------+
82| bin      |
83+----------+
84| 02:16:00 |
85| 18:46:00 |
86+----------+
872 row(s) fetched.
88```"#,
89    argument(name = "interval", description = "Bin interval."),
90    argument(
91        name = "expression",
92        description = "Time expression to operate on. Can be a constant, column, or function."
93    ),
94    argument(
95        name = "origin-timestamp",
96        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
97
98    - nanoseconds
99    - microseconds
100    - milliseconds
101    - seconds
102    - minutes
103    - hours
104    - days
105    - weeks
106    - months
107    - years
108    - century
109"#
110    )
111)]
112#[derive(Debug, PartialEq, Eq, Hash)]
113pub struct DateBinFunc {
114    signature: Signature,
115}
116
117impl Default for DateBinFunc {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl DateBinFunc {
124    pub fn new() -> Self {
125        let base_sig = |array_type: TimeUnit| {
126            let mut v = vec![
127                Exact(vec![
128                    DataType::Interval(MonthDayNano),
129                    Timestamp(array_type, None),
130                    Timestamp(Nanosecond, None),
131                ]),
132                Exact(vec![
133                    DataType::Interval(MonthDayNano),
134                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
135                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
136                ]),
137                Exact(vec![
138                    DataType::Interval(DayTime),
139                    Timestamp(array_type, None),
140                    Timestamp(Nanosecond, None),
141                ]),
142                Exact(vec![
143                    DataType::Interval(DayTime),
144                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
145                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
146                ]),
147                Exact(vec![
148                    DataType::Interval(MonthDayNano),
149                    Timestamp(array_type, None),
150                ]),
151                Exact(vec![
152                    DataType::Interval(MonthDayNano),
153                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
154                ]),
155                Exact(vec![
156                    DataType::Interval(DayTime),
157                    Timestamp(array_type, None),
158                ]),
159                Exact(vec![
160                    DataType::Interval(DayTime),
161                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
162                ]),
163            ];
164
165            match array_type {
166                Second | Millisecond => {
167                    v.append(&mut vec![
168                        Exact(vec![
169                            DataType::Interval(MonthDayNano),
170                            Time32(array_type),
171                            Time32(array_type),
172                        ]),
173                        Exact(vec![DataType::Interval(MonthDayNano), Time32(array_type)]),
174                        Exact(vec![
175                            DataType::Interval(DayTime),
176                            Time32(array_type),
177                            Time32(array_type),
178                        ]),
179                        Exact(vec![DataType::Interval(DayTime), Time32(array_type)]),
180                    ]);
181                }
182                Microsecond | Nanosecond => {
183                    v.append(&mut vec![
184                        Exact(vec![
185                            DataType::Interval(DayTime),
186                            Time64(array_type),
187                            Time64(array_type),
188                        ]),
189                        Exact(vec![DataType::Interval(DayTime), Time64(array_type)]),
190                        Exact(vec![
191                            DataType::Interval(MonthDayNano),
192                            Time64(array_type),
193                            Time64(array_type),
194                        ]),
195                        Exact(vec![DataType::Interval(MonthDayNano), Time64(array_type)]),
196                    ]);
197                }
198            }
199
200            v
201        };
202
203        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
204            .into_iter()
205            .map(base_sig)
206            .collect::<Vec<_>>()
207            .concat();
208
209        Self {
210            signature: Signature::one_of(full_sig, Volatility::Immutable),
211        }
212    }
213}
214
215impl ScalarUDFImpl for DateBinFunc {
216    fn as_any(&self) -> &dyn Any {
217        self
218    }
219
220    fn name(&self) -> &str {
221        "date_bin"
222    }
223
224    fn signature(&self) -> &Signature {
225        &self.signature
226    }
227
228    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
229        match &arg_types[1] {
230            Timestamp(tu, tz_opt) => Ok(Timestamp(*tu, tz_opt.clone())),
231            Time32(tu) => Ok(Time32(*tu)),
232            Time64(tu) => Ok(Time64(*tu)),
233            _ => plan_err!(
234                "The date_bin function can only accept timestamp or time as the second arg."
235            ),
236        }
237    }
238
239    fn invoke_with_args(
240        &self,
241        args: datafusion_expr::ScalarFunctionArgs,
242    ) -> Result<ColumnarValue> {
243        let args = &args.args;
244        if args.len() == 2 {
245            let origin = match args[1].data_type() {
246                Time32(Second) => {
247                    ColumnarValue::Scalar(ScalarValue::Time32Second(Some(0)))
248                }
249                Time32(Millisecond) => {
250                    ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(0)))
251                }
252                Time64(Microsecond) => {
253                    ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0)))
254                }
255                Time64(Nanosecond) => {
256                    ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(0)))
257                }
258                _ => {
259                    // Default to unix EPOCH
260                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
261                        Some(0),
262                        Some("+00:00".into()),
263                    ))
264                }
265            };
266            date_bin_impl(&args[0], &args[1], &origin)
267        } else if args.len() == 3 {
268            date_bin_impl(&args[0], &args[1], &args[2])
269        } else {
270            exec_err!("DATE_BIN expected two or three arguments")
271        }
272    }
273
274    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
275        // The DATE_BIN function preserves the order of its second argument.
276        let step = &input[0];
277        let date_value = &input[1];
278        let reference = input.get(2);
279
280        if step.sort_properties.eq(&SortProperties::Singleton)
281            && reference
282                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
283                .unwrap_or(true)
284        {
285            Ok(date_value.sort_properties)
286        } else {
287            Ok(SortProperties::Unordered)
288        }
289    }
290    fn documentation(&self) -> Option<&Documentation> {
291        self.doc()
292    }
293}
294
295const NANOS_PER_MICRO: i64 = 1_000;
296const NANOS_PER_MILLI: i64 = 1_000_000;
297const NANOS_PER_SEC: i64 = NANOSECONDS;
298
299enum Interval {
300    Nanoseconds(i64),
301    Months(i64),
302}
303
304impl Interval {
305    /// Returns (`stride_nanos`, `fn`) where
306    ///
307    /// 1. `stride_nanos` is a width, in nanoseconds
308    /// 2. `fn` is a function that takes (stride_nanos, source, origin)
309    ///
310    /// `source` is the timestamp being binned
311    ///
312    /// `origin`  is the time, in nanoseconds, where windows are measured from
313    fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
314        match self {
315            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
316            Interval::Months(months) => (*months, date_bin_months_interval),
317        }
318    }
319}
320
321// return time in nanoseconds that the source timestamp falls into based on the stride and origin
322fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
323    let time_diff = source - origin;
324
325    // distance from origin to bin
326    let time_delta = compute_distance(time_diff, stride_nanos);
327
328    origin + time_delta
329}
330
/// Offset from the origin to the start of the bin containing `time_diff`
/// (the source's offset from the origin), for bins `stride` wide.
///
/// For positive strides this rounds `time_diff` down to a multiple of `stride`.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let toward_zero = time_diff - remainder;
    // `%` truncates toward zero. A negative offset that falls inside a bin (non-zero
    // remainder) must therefore step back one more stride to reach the bin start.
    let inside_bin = remainder != 0;
    if stride <= 1 || time_diff >= 0 || !inside_bin {
        toward_zero
    } else {
        toward_zero - stride
    }
}
342
343// return time in nanoseconds that the source timestamp falls into based on the stride and origin
344fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
345    // convert source and origin to DateTime<Utc>
346    let source_date = to_utc_date_time(source);
347    let origin_date = to_utc_date_time(origin);
348
349    // calculate the number of months between the source and origin
350    let month_diff = (source_date.year() - origin_date.year()) * 12
351        + source_date.month() as i32
352        - origin_date.month() as i32;
353
354    // distance from origin to bin
355    let month_delta = compute_distance(month_diff as i64, stride_months);
356
357    let mut bin_time = if month_delta < 0 {
358        origin_date - Months::new(month_delta.unsigned_abs() as u32)
359    } else {
360        origin_date + Months::new(month_delta as u32)
361    };
362
363    // If origin is not midnight of first date of the month, the bin_time may be larger than the source
364    // In this case, we need to move back to previous bin
365    if bin_time > source_date {
366        let month_delta = month_delta - stride_months;
367        bin_time = if month_delta < 0 {
368            origin_date - Months::new(month_delta.unsigned_abs() as u32)
369        } else {
370            origin_date + Months::new(month_delta as u32)
371        };
372    }
373
374    bin_time.timestamp_nanos_opt().unwrap()
375}
376
377fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
378    let secs = nanos / NANOS_PER_SEC;
379    let nsec = (nanos % NANOS_PER_SEC) as u32;
380    DateTime::from_timestamp(secs, nsec).unwrap()
381}
382
383// Supported intervals:
384//  1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds
385//     We will assume month interval won't be converted into this type
386//     TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere
387//             for month interval. I need to find that and make it ScalarValue::IntervalMonthDayNano instead
388// 2. IntervalMonthDayNano
389fn date_bin_impl(
390    stride: &ColumnarValue,
391    array: &ColumnarValue,
392    origin: &ColumnarValue,
393) -> Result<ColumnarValue> {
394    let stride = match stride {
395        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
396            let (days, ms) = IntervalDayTimeType::to_parts(*v);
397            let nanos = (TimeDelta::try_days(days as i64).unwrap()
398                + TimeDelta::try_milliseconds(ms as i64).unwrap())
399            .num_nanoseconds();
400
401            match nanos {
402                Some(v) => Interval::Nanoseconds(v),
403                _ => return exec_err!("DATE_BIN stride argument is too large"),
404            }
405        }
406        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
407            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
408
409            // If interval is months, its origin must be midnight of first date of the month
410            if months != 0 {
411                // Return error if days or nanos is not zero
412                if days != 0 || nanos != 0 {
413                    return not_impl_err!(
414                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
415                    );
416                } else {
417                    Interval::Months(months as i64)
418                }
419            } else {
420                let nanos = (TimeDelta::try_days(days as i64).unwrap()
421                    + Duration::nanoseconds(nanos))
422                .num_nanoseconds();
423                match nanos {
424                    Some(v) => Interval::Nanoseconds(v),
425                    _ => return exec_err!("DATE_BIN stride argument is too large"),
426                }
427            }
428        }
429        ColumnarValue::Scalar(v) => {
430            return exec_err!(
431                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
432                v.data_type()
433            );
434        }
435        ColumnarValue::Array(_) => {
436            return not_impl_err!(
437                "DATE_BIN only supports literal values for the stride argument, not arrays"
438            );
439        }
440    };
441
442    let (origin, is_time) = match origin {
443        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => {
444            (*v, false)
445        }
446        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => {
447            match stride {
448                Interval::Months(m) => {
449                    if m > 0 {
450                        return exec_err!(
451                            "DATE_BIN stride for TIME input must be less than 1 day"
452                        );
453                    }
454                }
455                Interval::Nanoseconds(ns) => {
456                    if ns >= NANOSECONDS_IN_DAY {
457                        return exec_err!(
458                            "DATE_BIN stride for TIME input must be less than 1 day"
459                        );
460                    }
461                }
462            }
463
464            (*v as i64 * NANOS_PER_MILLI, true)
465        }
466        ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => {
467            match stride {
468                Interval::Months(m) => {
469                    if m > 0 {
470                        return exec_err!(
471                            "DATE_BIN stride for TIME input must be less than 1 day"
472                        );
473                    }
474                }
475                Interval::Nanoseconds(ns) => {
476                    if ns >= NANOSECONDS_IN_DAY {
477                        return exec_err!(
478                            "DATE_BIN stride for TIME input must be less than 1 day"
479                        );
480                    }
481                }
482            }
483
484            (*v as i64 * NANOS_PER_SEC, true)
485        }
486        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => {
487            match stride {
488                Interval::Months(m) => {
489                    if m > 0 {
490                        return exec_err!(
491                            "DATE_BIN stride for TIME input must be less than 1 day"
492                        );
493                    }
494                }
495                Interval::Nanoseconds(ns) => {
496                    if ns >= NANOSECONDS_IN_DAY {
497                        return exec_err!(
498                            "DATE_BIN stride for TIME input must be less than 1 day"
499                        );
500                    }
501                }
502            }
503
504            (*v * NANOS_PER_MICRO, true)
505        }
506        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => {
507            match stride {
508                Interval::Months(m) => {
509                    if m > 0 {
510                        return exec_err!(
511                            "DATE_BIN stride for TIME input must be less than 1 day"
512                        );
513                    }
514                }
515                Interval::Nanoseconds(ns) => {
516                    if ns >= NANOSECONDS_IN_DAY {
517                        return exec_err!(
518                            "DATE_BIN stride for TIME input must be less than 1 day"
519                        );
520                    }
521                }
522            }
523
524            (*v, true)
525        }
526        ColumnarValue::Scalar(v) => {
527            return exec_err!(
528                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got {}",
529                v.data_type()
530            );
531        }
532        ColumnarValue::Array(_) => {
533            return not_impl_err!(
534                "DATE_BIN only supports literal values for the origin argument, not arrays"
535            );
536        }
537    };
538
539    let (stride, stride_fn) = stride.bin_fn();
540
541    // Return error if stride is 0
542    if stride == 0 {
543        return exec_err!("DATE_BIN stride must be non-zero");
544    }
545
546    fn stride_map_fn<T: ArrowTimestampType>(
547        origin: i64,
548        stride: i64,
549        stride_fn: fn(i64, i64, i64) -> i64,
550    ) -> impl Fn(i64) -> i64 {
551        let scale = match T::UNIT {
552            Nanosecond => 1,
553            Microsecond => NANOS_PER_MICRO,
554            Millisecond => NANOS_PER_MILLI,
555            Second => NANOSECONDS,
556        };
557        move |x: i64| stride_fn(stride, x * scale, origin) / scale
558    }
559
560    Ok(match array {
561        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
562            let apply_stride_fn =
563                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
564            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
565                v.map(apply_stride_fn),
566                tz_opt.clone(),
567            ))
568        }
569        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
570            let apply_stride_fn =
571                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
572            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
573                v.map(apply_stride_fn),
574                tz_opt.clone(),
575            ))
576        }
577        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
578            let apply_stride_fn =
579                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
580            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
581                v.map(apply_stride_fn),
582                tz_opt.clone(),
583            ))
584        }
585        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
586            let apply_stride_fn =
587                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
588            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
589                v.map(apply_stride_fn),
590                tz_opt.clone(),
591            ))
592        }
593        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => {
594            if !is_time {
595                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
596            }
597            let apply_stride_fn = move |x: i32| {
598                let binned_nanos = stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin);
599                let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
600                (nanos / NANOS_PER_MILLI) as i32
601            };
602            ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v.map(apply_stride_fn)))
603        }
604        ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => {
605            if !is_time {
606                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
607            }
608            let apply_stride_fn = move |x: i32| {
609                let binned_nanos = stride_fn(stride, x as i64 * NANOS_PER_SEC, origin);
610                let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
611                (nanos / NANOS_PER_SEC) as i32
612            };
613            ColumnarValue::Scalar(ScalarValue::Time32Second(v.map(apply_stride_fn)))
614        }
615        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => {
616            if !is_time {
617                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
618            }
619            let apply_stride_fn = move |x: i64| {
620                let binned_nanos = stride_fn(stride, x, origin);
621                binned_nanos % (NANOSECONDS_IN_DAY)
622            };
623            ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v.map(apply_stride_fn)))
624        }
625        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => {
626            if !is_time {
627                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
628            }
629            let apply_stride_fn = move |x: i64| {
630                let binned_nanos = stride_fn(stride, x * NANOS_PER_MICRO, origin);
631                let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
632                nanos / NANOS_PER_MICRO
633            };
634            ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v.map(apply_stride_fn)))
635        }
636        ColumnarValue::Array(array) => {
637            fn transform_array_with_stride<T>(
638                origin: i64,
639                stride: i64,
640                stride_fn: fn(i64, i64, i64) -> i64,
641                array: &ArrayRef,
642                tz_opt: &Option<Arc<str>>,
643            ) -> Result<ColumnarValue>
644            where
645                T: ArrowTimestampType,
646            {
647                let array = as_primitive_array::<T>(array)?;
648                let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
649                let array: PrimitiveArray<T> = array
650                    .unary(apply_stride_fn)
651                    .with_timezone_opt(tz_opt.clone());
652
653                Ok(ColumnarValue::Array(Arc::new(array)))
654            }
655
656            match array.data_type() {
657                Timestamp(Nanosecond, tz_opt) => {
658                    transform_array_with_stride::<TimestampNanosecondType>(
659                        origin, stride, stride_fn, array, tz_opt,
660                    )?
661                }
662                Timestamp(Microsecond, tz_opt) => {
663                    transform_array_with_stride::<TimestampMicrosecondType>(
664                        origin, stride, stride_fn, array, tz_opt,
665                    )?
666                }
667                Timestamp(Millisecond, tz_opt) => {
668                    transform_array_with_stride::<TimestampMillisecondType>(
669                        origin, stride, stride_fn, array, tz_opt,
670                    )?
671                }
672                Timestamp(Second, tz_opt) => {
673                    transform_array_with_stride::<TimestampSecondType>(
674                        origin, stride, stride_fn, array, tz_opt,
675                    )?
676                }
677                Time32(Millisecond) => {
678                    if !is_time {
679                        return exec_err!(
680                            "DATE_BIN with Time32 source requires Time32 origin"
681                        );
682                    }
683                    let array = array.as_primitive::<Time32MillisecondType>();
684                    let apply_stride_fn = move |x: i32| {
685                        let binned_nanos =
686                            stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin);
687                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
688                        (nanos / NANOS_PER_MILLI) as i32
689                    };
690                    let array: PrimitiveArray<Time32MillisecondType> =
691                        array.unary(apply_stride_fn);
692                    ColumnarValue::Array(Arc::new(array))
693                }
694                Time32(Second) => {
695                    if !is_time {
696                        return exec_err!(
697                            "DATE_BIN with Time32 source requires Time32 origin"
698                        );
699                    }
700                    let array = array.as_primitive::<Time32SecondType>();
701                    let apply_stride_fn = move |x: i32| {
702                        let binned_nanos =
703                            stride_fn(stride, x as i64 * NANOS_PER_SEC, origin);
704                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
705                        (nanos / NANOS_PER_SEC) as i32
706                    };
707                    let array: PrimitiveArray<Time32SecondType> =
708                        array.unary(apply_stride_fn);
709                    ColumnarValue::Array(Arc::new(array))
710                }
711                Time64(Microsecond) => {
712                    if !is_time {
713                        return exec_err!(
714                            "DATE_BIN with Time64 source requires Time64 origin"
715                        );
716                    }
717                    let array = array.as_primitive::<Time64MicrosecondType>();
718                    let apply_stride_fn = move |x: i64| {
719                        let binned_nanos = stride_fn(stride, x * NANOS_PER_MICRO, origin);
720                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
721                        nanos / NANOS_PER_MICRO
722                    };
723                    let array: PrimitiveArray<Time64MicrosecondType> =
724                        array.unary(apply_stride_fn);
725                    ColumnarValue::Array(Arc::new(array))
726                }
727                Time64(Nanosecond) => {
728                    if !is_time {
729                        return exec_err!(
730                            "DATE_BIN with Time64 source requires Time64 origin"
731                        );
732                    }
733                    let array = array.as_primitive::<Time64NanosecondType>();
734                    let apply_stride_fn = move |x: i64| {
735                        let binned_nanos = stride_fn(stride, x, origin);
736                        binned_nanos % (NANOSECONDS_IN_DAY)
737                    };
738                    let array: PrimitiveArray<Time64NanosecondType> =
739                        array.unary(apply_stride_fn);
740                    ColumnarValue::Array(Arc::new(array))
741                }
742                _ => {
743                    return exec_err!(
744                        "DATE_BIN expects source argument to be a TIMESTAMP or TIME but got {}",
745                        array.data_type()
746                    );
747                }
748            }
749        }
750        _ => {
751            return exec_err!(
752                "DATE_BIN expects source argument to be a TIMESTAMP or TIME scalar or array"
753            );
754        }
755    })
756}
757
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::datetime::date_bin::{DateBinFunc, date_bin_nanos_interval};
    use arrow::array::types::TimestampNanosecondType;
    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};

    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
    use datafusion_common::{DataFusionError, ScalarValue};
    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};

    use chrono::TimeDelta;
    use datafusion_common::config::ConfigOptions;

    /// Builds `ScalarFunctionArgs` for `args` and invokes `DATE_BIN` on them.
    ///
    /// Each argument gets a nullable field named `"a"` whose type is the
    /// argument's data type; `number_rows` and `return_field` are passed through.
    fn invoke_date_bin_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
        return_field: &FieldRef,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields = args
            .iter()
            .map(|arg| Field::new("a", arg.data_type(), true).into())
            .collect::<Vec<_>>();

        let args = datafusion_expr::ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Arc::clone(return_field),
            config_options: Arc::new(ConfigOptions::default()),
        };
        DateBinFunc::new().invoke_with_args(args)
    }

    /// Invokes `DATE_BIN` and panics with the returned error if it fails.
    ///
    /// Unlike `assert!(res.is_ok())`, a failure here reports *why* the call
    /// failed.
    fn assert_date_bin_ok(
        args: Vec<ColumnarValue>,
        number_rows: usize,
        return_field: &FieldRef,
    ) {
        if let Err(e) = invoke_date_bin_with_args(args, number_rows, return_field) {
            panic!("expected DATE_BIN to succeed, but it failed with: {e}");
        }
    }

    /// Invokes `DATE_BIN`, expecting it to fail, and returns the error message
    /// with any backtrace stripped. Panics if the call succeeds.
    fn date_bin_error_message(
        args: Vec<ColumnarValue>,
        number_rows: usize,
        return_field: &FieldRef,
    ) -> String {
        match invoke_date_bin_with_args(args, number_rows, return_field) {
            Ok(_) => panic!("expected DATE_BIN to fail, but it succeeded"),
            Err(e) => e.strip_backtrace(),
        }
    }

    /// A scalar `IntervalDayTime` stride of one millisecond.
    fn one_millisecond_stride() -> ColumnarValue {
        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
            days: 0,
            milliseconds: 1,
        })))
    }

    /// A scalar nanosecond-precision timestamp without a timezone.
    fn nanos_scalar(nanos: i64) -> ColumnarValue {
        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(nanos), None))
    }

    #[test]
    fn test_date_bin() {
        let return_field = &Arc::new(Field::new(
            "f",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ));

        // scalar stride, source and origin
        assert_date_bin_ok(
            vec![one_millisecond_stride(), nanos_scalar(1), nanos_scalar(1)],
            1,
            return_field,
        );

        // array source with a scalar origin
        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
        let batch_len = timestamps.len();
        assert_date_bin_ok(
            vec![
                one_millisecond_stride(),
                ColumnarValue::Array(timestamps),
                nanos_scalar(1),
            ],
            batch_len,
            return_field,
        );

        // origin argument omitted
        assert_date_bin_ok(
            vec![one_millisecond_stride(), nanos_scalar(1)],
            1,
            return_field,
        );

        // stride supports month-day-nano
        assert_date_bin_ok(
            vec![
                ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
                    IntervalMonthDayNano {
                        months: 0,
                        days: 0,
                        nanoseconds: 1,
                    },
                ))),
                nanos_scalar(1),
                nanos_scalar(1),
            ],
            1,
            return_field,
        );

        //
        // Fallible test cases
        //

        // invalid number of arguments
        assert_eq!(
            date_bin_error_message(vec![one_millisecond_stride()], 1, return_field),
            "Execution error: DATE_BIN expected two or three arguments"
        );

        // stride: invalid type
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
            nanos_scalar(1),
            nanos_scalar(1),
        ];
        assert_eq!(
            date_bin_error_message(args, 1, return_field),
            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
        );

        // stride: invalid (zero) value
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 0,
            }))),
            nanos_scalar(1),
            nanos_scalar(1),
        ];
        assert_eq!(
            date_bin_error_message(args, 1, return_field),
            "Execution error: DATE_BIN stride must be non-zero"
        );

        // stride: overflow of day-time interval
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::MAX,
            ))),
            nanos_scalar(1),
            nanos_scalar(1),
        ];
        assert_eq!(
            date_bin_error_message(args, 1, return_field),
            "Execution error: DATE_BIN stride argument is too large"
        );

        // stride: overflow of month-day-nano interval
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
            nanos_scalar(1),
            nanos_scalar(1),
        ];
        assert_eq!(
            date_bin_error_message(args, 1, return_field),
            "Execution error: DATE_BIN stride argument is too large"
        );

        // stride: month intervals (mixed with days and nanoseconds)
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
            nanos_scalar(1),
            nanos_scalar(1),
        ];
        assert_eq!(
            date_bin_error_message(args, 1, return_field),
            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
        );

        // origin: invalid type (microsecond precision)
        let args = vec![
            one_millisecond_stride(),
            nanos_scalar(1),
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
        ];
        assert_eq!(
            date_bin_error_message(args, 1, return_field),
            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got Timestamp(µs)"
        );

        // source: a microsecond-precision source is accepted with a nanosecond origin
        assert_date_bin_ok(
            vec![
                one_millisecond_stride(),
                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
                nanos_scalar(1),
            ],
            1,
            return_field,
        );

        // unsupported array type for stride
        let intervals = Arc::new(
            (1..6)
                .map(|x| {
                    Some(IntervalDayTime {
                        days: 0,
                        milliseconds: x,
                    })
                })
                .collect::<IntervalDayTimeArray>(),
        );
        // number_rows must match the length of the array argument
        let batch_len = intervals.len();
        let args = vec![
            ColumnarValue::Array(intervals),
            nanos_scalar(1),
            nanos_scalar(1),
        ];
        assert_eq!(
            date_bin_error_message(args, batch_len, return_field),
            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
        );

        // unsupported array type for origin
        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
        let batch_len = timestamps.len();
        let args = vec![
            one_millisecond_stride(),
            nanos_scalar(1),
            ColumnarValue::Array(timestamps),
        ];
        assert_eq!(
            date_bin_error_message(args, batch_len, return_field),
            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
        );
    }

    #[test]
    fn test_date_bin_timezones() {
        // Each case is (input timestamps, timezone, origin, expected output).
        // All cases bin with a one-day stride (`new_interval_dt(1, 0)`), and the
        // timezone is applied to the input array, origin scalar and return field.
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T01:00:00Z",
                    "2020-09-08T02:00:00Z",
                    "2020-09-08T03:00:00Z",
                    "2020-09-08T04:00:00Z",
                ],
                Some("+00".into()),
                "1970-01-01T00:00:00Z",
                vec![
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T01:00:00Z",
                    "2020-09-08T02:00:00Z",
                    "2020-09-08T03:00:00Z",
                    "2020-09-08T04:00:00Z",
                ],
                None,
                "1970-01-01T00:00:00Z",
                vec![
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T01:00:00Z",
                    "2020-09-08T02:00:00Z",
                    "2020-09-08T03:00:00Z",
                    "2020-09-08T04:00:00Z",
                ],
                Some("-02".into()),
                "1970-01-01T00:00:00Z",
                vec![
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                    "2020-09-08T00:00:00Z",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00+05",
                    "2020-09-08T01:00:00+05",
                    "2020-09-08T02:00:00+05",
                    "2020-09-08T03:00:00+05",
                    "2020-09-08T04:00:00+05",
                ],
                Some("+05".into()),
                "1970-01-01T00:00:00+05",
                vec![
                    "2020-09-08T00:00:00+05",
                    "2020-09-08T00:00:00+05",
                    "2020-09-08T00:00:00+05",
                    "2020-09-08T00:00:00+05",
                    "2020-09-08T00:00:00+05",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00+08",
                    "2020-09-08T01:00:00+08",
                    "2020-09-08T02:00:00+08",
                    "2020-09-08T03:00:00+08",
                    "2020-09-08T04:00:00+08",
                ],
                Some("+08".into()),
                "1970-01-01T00:00:00+08",
                vec![
                    "2020-09-08T00:00:00+08",
                    "2020-09-08T00:00:00+08",
                    "2020-09-08T00:00:00+08",
                    "2020-09-08T00:00:00+08",
                    "2020-09-08T00:00:00+08",
                ],
            ),
        ];

        for (original, tz_opt, origin, expected) in &cases {
            let input = original
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let batch_len = input.len();
            let args = vec![
                ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
                ColumnarValue::Array(Arc::new(input)),
                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                    Some(string_to_timestamp_nanos(origin).unwrap()),
                    tz_opt.clone(),
                )),
            ];
            let return_field = &Arc::new(Field::new(
                "f",
                DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
                true,
            ));
            let result =
                invoke_date_bin_with_args(args, batch_len, return_field).unwrap();

            let ColumnarValue::Array(result) = result else {
                panic!("unexpected column type");
            };
            assert_eq!(
                result.data_type(),
                &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
            );
            let left = arrow::array::cast::as_primitive_array::<
                TimestampNanosecondType,
            >(&result);
            assert_eq!(left, &right);
        }
    }

    #[test]
    fn test_date_bin_single() {
        // Each case is ((stride, source, origin), expected bin start).
        let cases = [
            (
                (
                    TimeDelta::try_minutes(15),
                    "2004-04-09T02:03:04.123456789Z",
                    "2001-01-01T00:00:00",
                ),
                "2004-04-09T02:00:00Z",
            ),
            (
                (
                    TimeDelta::try_minutes(15),
                    "2004-04-09T02:03:04.123456789Z",
                    "2001-01-01T00:02:30",
                ),
                "2004-04-09T02:02:30Z",
            ),
            (
                (
                    TimeDelta::try_minutes(15),
                    "2004-04-09T02:03:04.123456789Z",
                    "2005-01-01T00:02:30",
                ),
                "2004-04-09T02:02:30Z",
            ),
            (
                (
                    TimeDelta::try_hours(1),
                    "2004-04-09T02:03:04.123456789Z",
                    "2001-01-01T00:00:00",
                ),
                "2004-04-09T02:00:00Z",
            ),
            (
                (
                    TimeDelta::try_seconds(10),
                    "2004-04-09T02:03:11.123456789Z",
                    "2001-01-01T00:00:00",
                ),
                "2004-04-09T02:03:10Z",
            ),
        ];

        for ((stride, source, origin), expected) in &cases {
            let stride_nanos = stride
                .expect("test stride must be a valid TimeDelta")
                .num_nanoseconds()
                .expect("test stride must fit in i64 nanoseconds");
            let source_nanos = string_to_timestamp_nanos(source).unwrap();
            let origin_nanos = string_to_timestamp_nanos(origin).unwrap();

            let expected_nanos = string_to_timestamp_nanos(expected).unwrap();
            let result = date_bin_nanos_interval(stride_nanos, source_nanos, origin_nanos);
            assert_eq!(result, expected_nanos, "{source} = {expected}");
        }
    }

    #[test]
    fn test_date_bin_before_epoch() {
        // Each case is ((stride, source), expected bin start); the origin is
        // the Unix epoch (0 ns), so these exercise binning of negative values.
        let cases = [
            (
                (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
                "1969-12-31T23:30:00",
            ),
            (
                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
                "1969-12-31T23:45:00",
            ),
            (
                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
                "1969-12-31T23:45:00",
            ),
        ];

        for ((stride, source), expected) in &cases {
            let stride_nanos = stride
                .expect("test stride must be a valid TimeDelta")
                .num_nanoseconds()
                .expect("test stride must fit in i64 nanoseconds");
            let source_nanos = string_to_timestamp_nanos(source).unwrap();

            let expected_nanos = string_to_timestamp_nanos(expected).unwrap();
            let result = date_bin_nanos_interval(stride_nanos, source_nanos, 0);
            assert_eq!(result, expected_nanos, "{source} = {expected}");
        }
    }
}