1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23 ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25 TimestampSecondType,
26};
27use arrow::array::{ArrayRef, AsArray, PrimitiveArray};
28use arrow::datatypes::DataType::{Time32, Time64, Timestamp};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{
32 DataType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
33 Time64NanosecondType, TimeUnit,
34};
35use arrow::temporal_conversions::NANOSECONDS_IN_DAY;
36use datafusion_common::cast::as_primitive_array;
37use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err};
38use datafusion_expr::TypeSignature::Exact;
39use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
40use datafusion_expr::{
41 ColumnarValue, Documentation, ScalarUDFImpl, Signature, TIMEZONE_WILDCARD, Volatility,
42};
43use datafusion_macros::user_doc;
44
45use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
46
47#[user_doc(
48 doc_section(label = "Time and Date Functions"),
49 description = r#"
50Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
51
52For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
53"#,
54 syntax_example = "date_bin(interval, expression, origin-timestamp)",
55 sql_example = r#"```sql
56-- Bin the timestamp into 1 day intervals
57> SELECT date_bin(interval '1 day', time) as bin
58FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
59+---------------------+
60| bin |
61+---------------------+
62| 2023-01-01T00:00:00 |
63| 2023-01-03T00:00:00 |
64+---------------------+
652 row(s) fetched.
66
67-- Bin the timestamp into 1 day intervals starting at 3AM on 2023-01-01
68> SELECT date_bin(interval '1 day', time, '2023-01-01T03:00:00') as bin
69FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
70+---------------------+
71| bin |
72+---------------------+
73| 2023-01-01T03:00:00 |
74| 2023-01-03T03:00:00 |
75+---------------------+
762 row(s) fetched.
77
78-- Bin the time into 15 minute intervals starting at 1 min
79> SELECT date_bin(interval '15 minutes', time, TIME '00:01:00') as bin
80FROM VALUES (TIME '02:18:18'), (TIME '19:00:03') t(time);
81+----------+
82| bin |
83+----------+
84| 02:16:00 |
85| 18:46:00 |
86+----------+
872 row(s) fetched.
88```"#,
89 argument(name = "interval", description = "Bin interval."),
90 argument(
91 name = "expression",
92 description = "Time expression to operate on. Can be a constant, column, or function."
93 ),
94 argument(
95 name = "origin-timestamp",
96 description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
97
98 - nanoseconds
99 - microseconds
100 - milliseconds
101 - seconds
102 - minutes
103 - hours
104 - days
105 - weeks
106 - months
107 - years
108 - century
109"#
110 )
111)]
/// Scalar UDF implementing `date_bin`.
///
/// The only state is the list of accepted argument-type combinations, which
/// is built once in `DateBinFunc::new`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DateBinFunc {
    /// Accepted argument types; handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
}
116
117impl Default for DateBinFunc {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl DateBinFunc {
124 pub fn new() -> Self {
125 let base_sig = |array_type: TimeUnit| {
126 let mut v = vec![
127 Exact(vec![
128 DataType::Interval(MonthDayNano),
129 Timestamp(array_type, None),
130 Timestamp(Nanosecond, None),
131 ]),
132 Exact(vec![
133 DataType::Interval(MonthDayNano),
134 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
135 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
136 ]),
137 Exact(vec![
138 DataType::Interval(DayTime),
139 Timestamp(array_type, None),
140 Timestamp(Nanosecond, None),
141 ]),
142 Exact(vec![
143 DataType::Interval(DayTime),
144 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
145 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
146 ]),
147 Exact(vec![
148 DataType::Interval(MonthDayNano),
149 Timestamp(array_type, None),
150 ]),
151 Exact(vec![
152 DataType::Interval(MonthDayNano),
153 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
154 ]),
155 Exact(vec![
156 DataType::Interval(DayTime),
157 Timestamp(array_type, None),
158 ]),
159 Exact(vec![
160 DataType::Interval(DayTime),
161 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
162 ]),
163 ];
164
165 match array_type {
166 Second | Millisecond => {
167 v.append(&mut vec![
168 Exact(vec![
169 DataType::Interval(MonthDayNano),
170 Time32(array_type),
171 Time32(array_type),
172 ]),
173 Exact(vec![DataType::Interval(MonthDayNano), Time32(array_type)]),
174 Exact(vec![
175 DataType::Interval(DayTime),
176 Time32(array_type),
177 Time32(array_type),
178 ]),
179 Exact(vec![DataType::Interval(DayTime), Time32(array_type)]),
180 ]);
181 }
182 Microsecond | Nanosecond => {
183 v.append(&mut vec![
184 Exact(vec![
185 DataType::Interval(DayTime),
186 Time64(array_type),
187 Time64(array_type),
188 ]),
189 Exact(vec![DataType::Interval(DayTime), Time64(array_type)]),
190 Exact(vec![
191 DataType::Interval(MonthDayNano),
192 Time64(array_type),
193 Time64(array_type),
194 ]),
195 Exact(vec![DataType::Interval(MonthDayNano), Time64(array_type)]),
196 ]);
197 }
198 }
199
200 v
201 };
202
203 let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
204 .into_iter()
205 .map(base_sig)
206 .collect::<Vec<_>>()
207 .concat();
208
209 Self {
210 signature: Signature::one_of(full_sig, Volatility::Immutable),
211 }
212 }
213}
214
215impl ScalarUDFImpl for DateBinFunc {
216 fn as_any(&self) -> &dyn Any {
217 self
218 }
219
220 fn name(&self) -> &str {
221 "date_bin"
222 }
223
224 fn signature(&self) -> &Signature {
225 &self.signature
226 }
227
228 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
229 match &arg_types[1] {
230 Timestamp(tu, tz_opt) => Ok(Timestamp(*tu, tz_opt.clone())),
231 Time32(tu) => Ok(Time32(*tu)),
232 Time64(tu) => Ok(Time64(*tu)),
233 _ => plan_err!(
234 "The date_bin function can only accept timestamp or time as the second arg."
235 ),
236 }
237 }
238
239 fn invoke_with_args(
240 &self,
241 args: datafusion_expr::ScalarFunctionArgs,
242 ) -> Result<ColumnarValue> {
243 let args = &args.args;
244 if args.len() == 2 {
245 let origin = match args[1].data_type() {
246 Time32(Second) => {
247 ColumnarValue::Scalar(ScalarValue::Time32Second(Some(0)))
248 }
249 Time32(Millisecond) => {
250 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(0)))
251 }
252 Time64(Microsecond) => {
253 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0)))
254 }
255 Time64(Nanosecond) => {
256 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(0)))
257 }
258 _ => {
259 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
261 Some(0),
262 Some("+00:00".into()),
263 ))
264 }
265 };
266 date_bin_impl(&args[0], &args[1], &origin)
267 } else if args.len() == 3 {
268 date_bin_impl(&args[0], &args[1], &args[2])
269 } else {
270 exec_err!("DATE_BIN expected two or three arguments")
271 }
272 }
273
274 fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
275 let step = &input[0];
277 let date_value = &input[1];
278 let reference = input.get(2);
279
280 if step.sort_properties.eq(&SortProperties::Singleton)
281 && reference
282 .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
283 .unwrap_or(true)
284 {
285 Ok(date_value.sort_properties)
286 } else {
287 Ok(SortProperties::Unordered)
288 }
289 }
290 fn documentation(&self) -> Option<&Documentation> {
291 self.doc()
292 }
293}
294
// Scale factors used to convert values of coarser time units to nanoseconds.

/// Nanoseconds in one microsecond.
const NANOS_PER_MICRO: i64 = 1_000;
/// Nanoseconds in one millisecond.
const NANOS_PER_MILLI: i64 = 1_000_000;
/// Nanoseconds in one second (re-exported from arrow's `NANOSECONDS`).
const NANOS_PER_SEC: i64 = NANOSECONDS;
298
/// Stride of a bin, normalized to one of the two representations the binning
/// functions understand.
enum Interval {
    /// Fixed-length stride expressed in nanoseconds.
    Nanoseconds(i64),
    /// Calendar stride expressed in whole months.
    Months(i64),
}
303
304impl Interval {
305 fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
314 match self {
315 Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
316 Interval::Months(months) => (*months, date_bin_months_interval),
317 }
318 }
319}
320
321fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
323 let time_diff = source - origin;
324
325 let time_delta = compute_distance(time_diff, stride_nanos);
327
328 origin + time_delta
329}
330
/// Rounds `time_diff` down to a multiple of `stride`.
///
/// `%` truncates towards zero, so for a negative `time_diff` that is not an
/// exact multiple of a stride greater than one, one extra stride is
/// subtracted to land on the bin boundary below it.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    let below_origin_inside_bin = time_diff < 0 && stride > 1 && remainder != 0;
    if below_origin_inside_bin {
        truncated - stride
    } else {
        truncated
    }
}
342
343fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
345 let source_date = to_utc_date_time(source);
347 let origin_date = to_utc_date_time(origin);
348
349 let month_diff = (source_date.year() - origin_date.year()) * 12
351 + source_date.month() as i32
352 - origin_date.month() as i32;
353
354 let month_delta = compute_distance(month_diff as i64, stride_months);
356
357 let mut bin_time = if month_delta < 0 {
358 origin_date - Months::new(month_delta.unsigned_abs() as u32)
359 } else {
360 origin_date + Months::new(month_delta as u32)
361 };
362
363 if bin_time > source_date {
366 let month_delta = month_delta - stride_months;
367 bin_time = if month_delta < 0 {
368 origin_date - Months::new(month_delta.unsigned_abs() as u32)
369 } else {
370 origin_date + Months::new(month_delta as u32)
371 };
372 }
373
374 bin_time.timestamp_nanos_opt().unwrap()
375}
376
377fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
378 let secs = nanos / NANOS_PER_SEC;
379 let nsec = (nanos % NANOS_PER_SEC) as u32;
380 DateTime::from_timestamp(secs, nsec).unwrap()
381}
382
383fn date_bin_impl(
390 stride: &ColumnarValue,
391 array: &ColumnarValue,
392 origin: &ColumnarValue,
393) -> Result<ColumnarValue> {
394 let stride = match stride {
395 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
396 let (days, ms) = IntervalDayTimeType::to_parts(*v);
397 let nanos = (TimeDelta::try_days(days as i64).unwrap()
398 + TimeDelta::try_milliseconds(ms as i64).unwrap())
399 .num_nanoseconds();
400
401 match nanos {
402 Some(v) => Interval::Nanoseconds(v),
403 _ => return exec_err!("DATE_BIN stride argument is too large"),
404 }
405 }
406 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
407 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
408
409 if months != 0 {
411 if days != 0 || nanos != 0 {
413 return not_impl_err!(
414 "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
415 );
416 } else {
417 Interval::Months(months as i64)
418 }
419 } else {
420 let nanos = (TimeDelta::try_days(days as i64).unwrap()
421 + Duration::nanoseconds(nanos))
422 .num_nanoseconds();
423 match nanos {
424 Some(v) => Interval::Nanoseconds(v),
425 _ => return exec_err!("DATE_BIN stride argument is too large"),
426 }
427 }
428 }
429 ColumnarValue::Scalar(v) => {
430 return exec_err!(
431 "DATE_BIN expects stride argument to be an INTERVAL but got {}",
432 v.data_type()
433 );
434 }
435 ColumnarValue::Array(_) => {
436 return not_impl_err!(
437 "DATE_BIN only supports literal values for the stride argument, not arrays"
438 );
439 }
440 };
441
442 let (origin, is_time) = match origin {
443 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => {
444 (*v, false)
445 }
446 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => {
447 match stride {
448 Interval::Months(m) => {
449 if m > 0 {
450 return exec_err!(
451 "DATE_BIN stride for TIME input must be less than 1 day"
452 );
453 }
454 }
455 Interval::Nanoseconds(ns) => {
456 if ns >= NANOSECONDS_IN_DAY {
457 return exec_err!(
458 "DATE_BIN stride for TIME input must be less than 1 day"
459 );
460 }
461 }
462 }
463
464 (*v as i64 * NANOS_PER_MILLI, true)
465 }
466 ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => {
467 match stride {
468 Interval::Months(m) => {
469 if m > 0 {
470 return exec_err!(
471 "DATE_BIN stride for TIME input must be less than 1 day"
472 );
473 }
474 }
475 Interval::Nanoseconds(ns) => {
476 if ns >= NANOSECONDS_IN_DAY {
477 return exec_err!(
478 "DATE_BIN stride for TIME input must be less than 1 day"
479 );
480 }
481 }
482 }
483
484 (*v as i64 * NANOS_PER_SEC, true)
485 }
486 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => {
487 match stride {
488 Interval::Months(m) => {
489 if m > 0 {
490 return exec_err!(
491 "DATE_BIN stride for TIME input must be less than 1 day"
492 );
493 }
494 }
495 Interval::Nanoseconds(ns) => {
496 if ns >= NANOSECONDS_IN_DAY {
497 return exec_err!(
498 "DATE_BIN stride for TIME input must be less than 1 day"
499 );
500 }
501 }
502 }
503
504 (*v * NANOS_PER_MICRO, true)
505 }
506 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => {
507 match stride {
508 Interval::Months(m) => {
509 if m > 0 {
510 return exec_err!(
511 "DATE_BIN stride for TIME input must be less than 1 day"
512 );
513 }
514 }
515 Interval::Nanoseconds(ns) => {
516 if ns >= NANOSECONDS_IN_DAY {
517 return exec_err!(
518 "DATE_BIN stride for TIME input must be less than 1 day"
519 );
520 }
521 }
522 }
523
524 (*v, true)
525 }
526 ColumnarValue::Scalar(v) => {
527 return exec_err!(
528 "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got {}",
529 v.data_type()
530 );
531 }
532 ColumnarValue::Array(_) => {
533 return not_impl_err!(
534 "DATE_BIN only supports literal values for the origin argument, not arrays"
535 );
536 }
537 };
538
539 let (stride, stride_fn) = stride.bin_fn();
540
541 if stride == 0 {
543 return exec_err!("DATE_BIN stride must be non-zero");
544 }
545
546 fn stride_map_fn<T: ArrowTimestampType>(
547 origin: i64,
548 stride: i64,
549 stride_fn: fn(i64, i64, i64) -> i64,
550 ) -> impl Fn(i64) -> i64 {
551 let scale = match T::UNIT {
552 Nanosecond => 1,
553 Microsecond => NANOS_PER_MICRO,
554 Millisecond => NANOS_PER_MILLI,
555 Second => NANOSECONDS,
556 };
557 move |x: i64| stride_fn(stride, x * scale, origin) / scale
558 }
559
560 Ok(match array {
561 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
562 let apply_stride_fn =
563 stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
564 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
565 v.map(apply_stride_fn),
566 tz_opt.clone(),
567 ))
568 }
569 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
570 let apply_stride_fn =
571 stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
572 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
573 v.map(apply_stride_fn),
574 tz_opt.clone(),
575 ))
576 }
577 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
578 let apply_stride_fn =
579 stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
580 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
581 v.map(apply_stride_fn),
582 tz_opt.clone(),
583 ))
584 }
585 ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
586 let apply_stride_fn =
587 stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
588 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
589 v.map(apply_stride_fn),
590 tz_opt.clone(),
591 ))
592 }
593 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => {
594 if !is_time {
595 return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
596 }
597 let apply_stride_fn = move |x: i32| {
598 let binned_nanos = stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin);
599 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
600 (nanos / NANOS_PER_MILLI) as i32
601 };
602 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v.map(apply_stride_fn)))
603 }
604 ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => {
605 if !is_time {
606 return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
607 }
608 let apply_stride_fn = move |x: i32| {
609 let binned_nanos = stride_fn(stride, x as i64 * NANOS_PER_SEC, origin);
610 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
611 (nanos / NANOS_PER_SEC) as i32
612 };
613 ColumnarValue::Scalar(ScalarValue::Time32Second(v.map(apply_stride_fn)))
614 }
615 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => {
616 if !is_time {
617 return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
618 }
619 let apply_stride_fn = move |x: i64| {
620 let binned_nanos = stride_fn(stride, x, origin);
621 binned_nanos % (NANOSECONDS_IN_DAY)
622 };
623 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v.map(apply_stride_fn)))
624 }
625 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => {
626 if !is_time {
627 return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
628 }
629 let apply_stride_fn = move |x: i64| {
630 let binned_nanos = stride_fn(stride, x * NANOS_PER_MICRO, origin);
631 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
632 nanos / NANOS_PER_MICRO
633 };
634 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v.map(apply_stride_fn)))
635 }
636 ColumnarValue::Array(array) => {
637 fn transform_array_with_stride<T>(
638 origin: i64,
639 stride: i64,
640 stride_fn: fn(i64, i64, i64) -> i64,
641 array: &ArrayRef,
642 tz_opt: &Option<Arc<str>>,
643 ) -> Result<ColumnarValue>
644 where
645 T: ArrowTimestampType,
646 {
647 let array = as_primitive_array::<T>(array)?;
648 let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
649 let array: PrimitiveArray<T> = array
650 .unary(apply_stride_fn)
651 .with_timezone_opt(tz_opt.clone());
652
653 Ok(ColumnarValue::Array(Arc::new(array)))
654 }
655
656 match array.data_type() {
657 Timestamp(Nanosecond, tz_opt) => {
658 transform_array_with_stride::<TimestampNanosecondType>(
659 origin, stride, stride_fn, array, tz_opt,
660 )?
661 }
662 Timestamp(Microsecond, tz_opt) => {
663 transform_array_with_stride::<TimestampMicrosecondType>(
664 origin, stride, stride_fn, array, tz_opt,
665 )?
666 }
667 Timestamp(Millisecond, tz_opt) => {
668 transform_array_with_stride::<TimestampMillisecondType>(
669 origin, stride, stride_fn, array, tz_opt,
670 )?
671 }
672 Timestamp(Second, tz_opt) => {
673 transform_array_with_stride::<TimestampSecondType>(
674 origin, stride, stride_fn, array, tz_opt,
675 )?
676 }
677 Time32(Millisecond) => {
678 if !is_time {
679 return exec_err!(
680 "DATE_BIN with Time32 source requires Time32 origin"
681 );
682 }
683 let array = array.as_primitive::<Time32MillisecondType>();
684 let apply_stride_fn = move |x: i32| {
685 let binned_nanos =
686 stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin);
687 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
688 (nanos / NANOS_PER_MILLI) as i32
689 };
690 let array: PrimitiveArray<Time32MillisecondType> =
691 array.unary(apply_stride_fn);
692 ColumnarValue::Array(Arc::new(array))
693 }
694 Time32(Second) => {
695 if !is_time {
696 return exec_err!(
697 "DATE_BIN with Time32 source requires Time32 origin"
698 );
699 }
700 let array = array.as_primitive::<Time32SecondType>();
701 let apply_stride_fn = move |x: i32| {
702 let binned_nanos =
703 stride_fn(stride, x as i64 * NANOS_PER_SEC, origin);
704 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
705 (nanos / NANOS_PER_SEC) as i32
706 };
707 let array: PrimitiveArray<Time32SecondType> =
708 array.unary(apply_stride_fn);
709 ColumnarValue::Array(Arc::new(array))
710 }
711 Time64(Microsecond) => {
712 if !is_time {
713 return exec_err!(
714 "DATE_BIN with Time64 source requires Time64 origin"
715 );
716 }
717 let array = array.as_primitive::<Time64MicrosecondType>();
718 let apply_stride_fn = move |x: i64| {
719 let binned_nanos = stride_fn(stride, x * NANOS_PER_MICRO, origin);
720 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
721 nanos / NANOS_PER_MICRO
722 };
723 let array: PrimitiveArray<Time64MicrosecondType> =
724 array.unary(apply_stride_fn);
725 ColumnarValue::Array(Arc::new(array))
726 }
727 Time64(Nanosecond) => {
728 if !is_time {
729 return exec_err!(
730 "DATE_BIN with Time64 source requires Time64 origin"
731 );
732 }
733 let array = array.as_primitive::<Time64NanosecondType>();
734 let apply_stride_fn = move |x: i64| {
735 let binned_nanos = stride_fn(stride, x, origin);
736 binned_nanos % (NANOSECONDS_IN_DAY)
737 };
738 let array: PrimitiveArray<Time64NanosecondType> =
739 array.unary(apply_stride_fn);
740 ColumnarValue::Array(Arc::new(array))
741 }
742 _ => {
743 return exec_err!(
744 "DATE_BIN expects source argument to be a TIMESTAMP or TIME but got {}",
745 array.data_type()
746 );
747 }
748 }
749 }
750 _ => {
751 return exec_err!(
752 "DATE_BIN expects source argument to be a TIMESTAMP or TIME scalar or array"
753 );
754 }
755 })
756}
757
758#[cfg(test)]
759mod tests {
760 use std::sync::Arc;
761
762 use crate::datetime::date_bin::{DateBinFunc, date_bin_nanos_interval};
763 use arrow::array::types::TimestampNanosecondType;
764 use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
765 use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
766 use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
767
768 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
769 use datafusion_common::{DataFusionError, ScalarValue};
770 use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
771
772 use chrono::TimeDelta;
773 use datafusion_common::config::ConfigOptions;
774
775 fn invoke_date_bin_with_args(
776 args: Vec<ColumnarValue>,
777 number_rows: usize,
778 return_field: &FieldRef,
779 ) -> Result<ColumnarValue, DataFusionError> {
780 let arg_fields = args
781 .iter()
782 .map(|arg| Field::new("a", arg.data_type(), true).into())
783 .collect::<Vec<_>>();
784
785 let args = datafusion_expr::ScalarFunctionArgs {
786 args,
787 arg_fields,
788 number_rows,
789 return_field: Arc::clone(return_field),
790 config_options: Arc::new(ConfigOptions::default()),
791 };
792 DateBinFunc::new().invoke_with_args(args)
793 }
794
795 #[test]
796 fn test_date_bin() {
797 let return_field = &Arc::new(Field::new(
798 "f",
799 DataType::Timestamp(TimeUnit::Nanosecond, None),
800 true,
801 ));
802
803 let mut args = vec![
804 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
805 days: 0,
806 milliseconds: 1,
807 }))),
808 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
809 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
810 ];
811 let res = invoke_date_bin_with_args(args, 1, return_field);
812 assert!(res.is_ok());
813
814 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
815 let batch_len = timestamps.len();
816 args = vec![
817 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
818 days: 0,
819 milliseconds: 1,
820 }))),
821 ColumnarValue::Array(timestamps),
822 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
823 ];
824 let res = invoke_date_bin_with_args(args, batch_len, return_field);
825 assert!(res.is_ok());
826
827 args = vec![
828 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
829 days: 0,
830 milliseconds: 1,
831 }))),
832 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
833 ];
834 let res = invoke_date_bin_with_args(args, 1, return_field);
835 assert!(res.is_ok());
836
837 args = vec![
839 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
840 IntervalMonthDayNano {
841 months: 0,
842 days: 0,
843 nanoseconds: 1,
844 },
845 ))),
846 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
847 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
848 ];
849 let res = invoke_date_bin_with_args(args, 1, return_field);
850 assert!(res.is_ok());
851
852 args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
858 IntervalDayTime {
859 days: 0,
860 milliseconds: 1,
861 },
862 )))];
863 let res = invoke_date_bin_with_args(args, 1, return_field);
864 assert_eq!(
865 res.err().unwrap().strip_backtrace(),
866 "Execution error: DATE_BIN expected two or three arguments"
867 );
868
869 args = vec![
871 ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
872 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
873 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
874 ];
875 let res = invoke_date_bin_with_args(args, 1, return_field);
876 assert_eq!(
877 res.err().unwrap().strip_backtrace(),
878 "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
879 );
880
881 args = vec![
884 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
885 days: 0,
886 milliseconds: 0,
887 }))),
888 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
889 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
890 ];
891
892 let res = invoke_date_bin_with_args(args, 1, return_field);
893 assert_eq!(
894 res.err().unwrap().strip_backtrace(),
895 "Execution error: DATE_BIN stride must be non-zero"
896 );
897
898 args = vec![
900 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
901 IntervalDayTime::MAX,
902 ))),
903 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
904 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
905 ];
906 let res = invoke_date_bin_with_args(args, 1, return_field);
907 assert_eq!(
908 res.err().unwrap().strip_backtrace(),
909 "Execution error: DATE_BIN stride argument is too large"
910 );
911
912 args = vec![
914 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
915 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
916 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
917 ];
918 let res = invoke_date_bin_with_args(args, 1, return_field);
919 assert_eq!(
920 res.err().unwrap().strip_backtrace(),
921 "Execution error: DATE_BIN stride argument is too large"
922 );
923
924 args = vec![
926 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
927 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
928 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
929 ];
930 let res = invoke_date_bin_with_args(args, 1, return_field);
931 assert_eq!(
932 res.err().unwrap().strip_backtrace(),
933 "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
934 );
935
936 args = vec![
938 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
939 days: 0,
940 milliseconds: 1,
941 }))),
942 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
943 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
944 ];
945 let res = invoke_date_bin_with_args(args, 1, return_field);
946 assert_eq!(
947 res.err().unwrap().strip_backtrace(),
948 "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got Timestamp(µs)"
949 );
950
951 args = vec![
952 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
953 days: 0,
954 milliseconds: 1,
955 }))),
956 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
957 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
958 ];
959 let res = invoke_date_bin_with_args(args, 1, return_field);
960 assert!(res.is_ok());
961
962 let intervals = Arc::new(
964 (1..6)
965 .map(|x| {
966 Some(IntervalDayTime {
967 days: 0,
968 milliseconds: x,
969 })
970 })
971 .collect::<IntervalDayTimeArray>(),
972 );
973 args = vec![
974 ColumnarValue::Array(intervals),
975 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
976 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
977 ];
978 let res = invoke_date_bin_with_args(args, 1, return_field);
979 assert_eq!(
980 res.err().unwrap().strip_backtrace(),
981 "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
982 );
983
984 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
986 let batch_len = timestamps.len();
987 args = vec![
988 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
989 days: 0,
990 milliseconds: 1,
991 }))),
992 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
993 ColumnarValue::Array(timestamps),
994 ];
995 let res = invoke_date_bin_with_args(args, batch_len, return_field);
996 assert_eq!(
997 res.err().unwrap().strip_backtrace(),
998 "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
999 );
1000 }
1001
1002 #[test]
1003 fn test_date_bin_timezones() {
1004 let cases = [
1005 (
1006 vec![
1007 "2020-09-08T00:00:00Z",
1008 "2020-09-08T01:00:00Z",
1009 "2020-09-08T02:00:00Z",
1010 "2020-09-08T03:00:00Z",
1011 "2020-09-08T04:00:00Z",
1012 ],
1013 Some("+00".into()),
1014 "1970-01-01T00:00:00Z",
1015 vec![
1016 "2020-09-08T00:00:00Z",
1017 "2020-09-08T00:00:00Z",
1018 "2020-09-08T00:00:00Z",
1019 "2020-09-08T00:00:00Z",
1020 "2020-09-08T00:00:00Z",
1021 ],
1022 ),
1023 (
1024 vec![
1025 "2020-09-08T00:00:00Z",
1026 "2020-09-08T01:00:00Z",
1027 "2020-09-08T02:00:00Z",
1028 "2020-09-08T03:00:00Z",
1029 "2020-09-08T04:00:00Z",
1030 ],
1031 None,
1032 "1970-01-01T00:00:00Z",
1033 vec![
1034 "2020-09-08T00:00:00Z",
1035 "2020-09-08T00:00:00Z",
1036 "2020-09-08T00:00:00Z",
1037 "2020-09-08T00:00:00Z",
1038 "2020-09-08T00:00:00Z",
1039 ],
1040 ),
1041 (
1042 vec![
1043 "2020-09-08T00:00:00Z",
1044 "2020-09-08T01:00:00Z",
1045 "2020-09-08T02:00:00Z",
1046 "2020-09-08T03:00:00Z",
1047 "2020-09-08T04:00:00Z",
1048 ],
1049 Some("-02".into()),
1050 "1970-01-01T00:00:00Z",
1051 vec![
1052 "2020-09-08T00:00:00Z",
1053 "2020-09-08T00:00:00Z",
1054 "2020-09-08T00:00:00Z",
1055 "2020-09-08T00:00:00Z",
1056 "2020-09-08T00:00:00Z",
1057 ],
1058 ),
1059 (
1060 vec![
1061 "2020-09-08T00:00:00+05",
1062 "2020-09-08T01:00:00+05",
1063 "2020-09-08T02:00:00+05",
1064 "2020-09-08T03:00:00+05",
1065 "2020-09-08T04:00:00+05",
1066 ],
1067 Some("+05".into()),
1068 "1970-01-01T00:00:00+05",
1069 vec![
1070 "2020-09-08T00:00:00+05",
1071 "2020-09-08T00:00:00+05",
1072 "2020-09-08T00:00:00+05",
1073 "2020-09-08T00:00:00+05",
1074 "2020-09-08T00:00:00+05",
1075 ],
1076 ),
1077 (
1078 vec![
1079 "2020-09-08T00:00:00+08",
1080 "2020-09-08T01:00:00+08",
1081 "2020-09-08T02:00:00+08",
1082 "2020-09-08T03:00:00+08",
1083 "2020-09-08T04:00:00+08",
1084 ],
1085 Some("+08".into()),
1086 "1970-01-01T00:00:00+08",
1087 vec![
1088 "2020-09-08T00:00:00+08",
1089 "2020-09-08T00:00:00+08",
1090 "2020-09-08T00:00:00+08",
1091 "2020-09-08T00:00:00+08",
1092 "2020-09-08T00:00:00+08",
1093 ],
1094 ),
1095 ];
1096
1097 cases
1098 .iter()
1099 .for_each(|(original, tz_opt, origin, expected)| {
1100 let input = original
1101 .iter()
1102 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
1103 .collect::<TimestampNanosecondArray>()
1104 .with_timezone_opt(tz_opt.clone());
1105 let right = expected
1106 .iter()
1107 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
1108 .collect::<TimestampNanosecondArray>()
1109 .with_timezone_opt(tz_opt.clone());
1110 let batch_len = input.len();
1111 let args = vec![
1112 ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
1113 ColumnarValue::Array(Arc::new(input)),
1114 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1115 Some(string_to_timestamp_nanos(origin).unwrap()),
1116 tz_opt.clone(),
1117 )),
1118 ];
1119 let return_field = &Arc::new(Field::new(
1120 "f",
1121 DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
1122 true,
1123 ));
1124 let result =
1125 invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
1126
1127 if let ColumnarValue::Array(result) = result {
1128 assert_eq!(
1129 result.data_type(),
1130 &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
1131 );
1132 let left = arrow::array::cast::as_primitive_array::<
1133 TimestampNanosecondType,
1134 >(&result);
1135 assert_eq!(left, &right);
1136 } else {
1137 panic!("unexpected column type");
1138 }
1139 });
1140 }
1141
1142 #[test]
1143 fn test_date_bin_single() {
1144 let cases = [
1145 (
1146 (
1147 TimeDelta::try_minutes(15),
1148 "2004-04-09T02:03:04.123456789Z",
1149 "2001-01-01T00:00:00",
1150 ),
1151 "2004-04-09T02:00:00Z",
1152 ),
1153 (
1154 (
1155 TimeDelta::try_minutes(15),
1156 "2004-04-09T02:03:04.123456789Z",
1157 "2001-01-01T00:02:30",
1158 ),
1159 "2004-04-09T02:02:30Z",
1160 ),
1161 (
1162 (
1163 TimeDelta::try_minutes(15),
1164 "2004-04-09T02:03:04.123456789Z",
1165 "2005-01-01T00:02:30",
1166 ),
1167 "2004-04-09T02:02:30Z",
1168 ),
1169 (
1170 (
1171 TimeDelta::try_hours(1),
1172 "2004-04-09T02:03:04.123456789Z",
1173 "2001-01-01T00:00:00",
1174 ),
1175 "2004-04-09T02:00:00Z",
1176 ),
1177 (
1178 (
1179 TimeDelta::try_seconds(10),
1180 "2004-04-09T02:03:11.123456789Z",
1181 "2001-01-01T00:00:00",
1182 ),
1183 "2004-04-09T02:03:10Z",
1184 ),
1185 ];
1186
1187 cases
1188 .iter()
1189 .for_each(|((stride, source, origin), expected)| {
1190 let stride = stride.unwrap();
1191 let stride1 = stride.num_nanoseconds().unwrap();
1192 let source1 = string_to_timestamp_nanos(source).unwrap();
1193 let origin1 = string_to_timestamp_nanos(origin).unwrap();
1194
1195 let expected1 = string_to_timestamp_nanos(expected).unwrap();
1196 let result = date_bin_nanos_interval(stride1, source1, origin1);
1197 assert_eq!(result, expected1, "{source} = {expected}");
1198 })
1199 }
1200
1201 #[test]
1202 fn test_date_bin_before_epoch() {
1203 let cases = [
1204 (
1205 (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
1206 "1969-12-31T23:30:00",
1207 ),
1208 (
1209 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
1210 "1969-12-31T23:45:00",
1211 ),
1212 (
1213 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
1214 "1969-12-31T23:45:00",
1215 ),
1216 ];
1217
1218 cases.iter().for_each(|((stride, source), expected)| {
1219 let stride = stride.unwrap();
1220 let stride1 = stride.num_nanoseconds().unwrap();
1221 let source1 = string_to_timestamp_nanos(source).unwrap();
1222
1223 let expected1 = string_to_timestamp_nanos(expected).unwrap();
1224 let result = date_bin_nanos_interval(stride1, source1, 0);
1225 assert_eq!(result, expected1, "{source} = {expected}");
1226 })
1227 }
1228}