1use std::sync::Arc;
19
20use arrow::array::temporal_conversions::NANOSECONDS;
21use arrow::array::types::{
22 ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
23 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
24 TimestampSecondType,
25};
26use arrow::array::{ArrayRef, AsArray, PrimitiveArray};
27use arrow::datatypes::DataType::{Time32, Time64, Timestamp};
28use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
29use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
30use arrow::datatypes::{
31 DataType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
32 Time64NanosecondType, TimeUnit,
33};
34use arrow::temporal_conversions::NANOSECONDS_IN_DAY;
35use datafusion_common::cast::as_primitive_array;
36use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err};
37use datafusion_expr::TypeSignature::Exact;
38use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
39use datafusion_expr::{
40 ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
41 TIMEZONE_WILDCARD, Volatility,
42};
43use datafusion_macros::user_doc;
44
45use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
46
47#[user_doc(
48 doc_section(label = "Time and Date Functions"),
49 description = r#"
50Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
51
52For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
53"#,
54 syntax_example = "date_bin(interval, expression, origin-timestamp)",
55 sql_example = r#"```sql
56-- Bin the timestamp into 1 day intervals
57> SELECT date_bin(interval '1 day', time) as bin
58FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
59+---------------------+
60| bin |
61+---------------------+
62| 2023-01-01T00:00:00 |
63| 2023-01-03T00:00:00 |
64+---------------------+
652 row(s) fetched.
66
67-- Bin the timestamp into 1 day intervals starting at 3AM on 2023-01-01
68> SELECT date_bin(interval '1 day', time, '2023-01-01T03:00:00') as bin
69FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
70+---------------------+
71| bin |
72+---------------------+
73| 2023-01-01T03:00:00 |
74| 2023-01-03T03:00:00 |
75+---------------------+
762 row(s) fetched.
77
78-- Bin the time into 15 minute intervals starting at 1 min
79> SELECT date_bin(interval '15 minutes', time, TIME '00:01:00') as bin
80FROM VALUES (TIME '02:18:18'), (TIME '19:00:03') t(time);
81+----------+
82| bin |
83+----------+
84| 02:16:00 |
85| 18:46:00 |
86+----------+
872 row(s) fetched.
88```"#,
89 argument(name = "interval", description = "Bin interval."),
90 argument(
91 name = "expression",
92 description = "Time expression to operate on. Can be a constant, column, or function."
93 ),
94 argument(
95 name = "origin-timestamp",
96 description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
97
98 - nanoseconds
99 - microseconds
100 - milliseconds
101 - seconds
102 - minutes
103 - hours
104 - days
105 - weeks
106 - months
107 - years
108 - century
109"#
110 )
111)]
/// Scalar UDF implementing `date_bin`.
///
/// The only state is the set of accepted argument type combinations, built
/// once in [`DateBinFunc::new`].
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DateBinFunc {
    /// Accepted argument type combinations for the function.
    signature: Signature,
}
116
117impl Default for DateBinFunc {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl DateBinFunc {
124 pub fn new() -> Self {
125 let base_sig = |array_type: TimeUnit| {
126 let mut v = vec![
127 Exact(vec![
128 DataType::Interval(MonthDayNano),
129 Timestamp(array_type, None),
130 Timestamp(Nanosecond, None),
131 ]),
132 Exact(vec![
133 DataType::Interval(MonthDayNano),
134 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
135 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
136 ]),
137 Exact(vec![
138 DataType::Interval(DayTime),
139 Timestamp(array_type, None),
140 Timestamp(Nanosecond, None),
141 ]),
142 Exact(vec![
143 DataType::Interval(DayTime),
144 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
145 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
146 ]),
147 Exact(vec![
148 DataType::Interval(MonthDayNano),
149 Timestamp(array_type, None),
150 ]),
151 Exact(vec![
152 DataType::Interval(MonthDayNano),
153 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
154 ]),
155 Exact(vec![
156 DataType::Interval(DayTime),
157 Timestamp(array_type, None),
158 ]),
159 Exact(vec![
160 DataType::Interval(DayTime),
161 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
162 ]),
163 ];
164
165 match array_type {
166 Second | Millisecond => {
167 v.append(&mut vec![
168 Exact(vec![
169 DataType::Interval(MonthDayNano),
170 Time32(array_type),
171 Time32(array_type),
172 ]),
173 Exact(vec![DataType::Interval(MonthDayNano), Time32(array_type)]),
174 Exact(vec![
175 DataType::Interval(DayTime),
176 Time32(array_type),
177 Time32(array_type),
178 ]),
179 Exact(vec![DataType::Interval(DayTime), Time32(array_type)]),
180 ]);
181 }
182 Microsecond | Nanosecond => {
183 v.append(&mut vec![
184 Exact(vec![
185 DataType::Interval(DayTime),
186 Time64(array_type),
187 Time64(array_type),
188 ]),
189 Exact(vec![DataType::Interval(DayTime), Time64(array_type)]),
190 Exact(vec![
191 DataType::Interval(MonthDayNano),
192 Time64(array_type),
193 Time64(array_type),
194 ]),
195 Exact(vec![DataType::Interval(MonthDayNano), Time64(array_type)]),
196 ]);
197 }
198 }
199
200 v
201 };
202
203 let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
204 .into_iter()
205 .map(base_sig)
206 .collect::<Vec<_>>()
207 .concat();
208
209 Self {
210 signature: Signature::one_of(full_sig, Volatility::Immutable),
211 }
212 }
213}
214
215impl ScalarUDFImpl for DateBinFunc {
216 fn name(&self) -> &str {
217 "date_bin"
218 }
219
220 fn signature(&self) -> &Signature {
221 &self.signature
222 }
223
224 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
225 match &arg_types[1] {
226 Timestamp(tu, tz_opt) => Ok(Timestamp(*tu, tz_opt.clone())),
227 Time32(tu) => Ok(Time32(*tu)),
228 Time64(tu) => Ok(Time64(*tu)),
229 _ => plan_err!(
230 "The date_bin function can only accept timestamp or time as the second arg."
231 ),
232 }
233 }
234
235 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
236 let args = &args.args;
237 if args.len() == 2 {
238 let origin = match args[1].data_type() {
239 Time32(Second) => {
240 ColumnarValue::Scalar(ScalarValue::Time32Second(Some(0)))
241 }
242 Time32(Millisecond) => {
243 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(0)))
244 }
245 Time64(Microsecond) => {
246 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0)))
247 }
248 Time64(Nanosecond) => {
249 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(0)))
250 }
251 _ => {
252 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
254 Some(0),
255 Some("+00:00".into()),
256 ))
257 }
258 };
259 date_bin_impl(&args[0], &args[1], &origin)
260 } else if args.len() == 3 {
261 date_bin_impl(&args[0], &args[1], &args[2])
262 } else {
263 exec_err!("DATE_BIN expected two or three arguments")
264 }
265 }
266
267 fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
268 let step = &input[0];
270 let date_value = &input[1];
271 let reference = input.get(2);
272
273 if step.sort_properties.eq(&SortProperties::Singleton)
274 && reference
275 .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
276 .unwrap_or(true)
277 {
278 Ok(date_value.sort_properties)
279 } else {
280 Ok(SortProperties::Unordered)
281 }
282 }
283 fn documentation(&self) -> Option<&Documentation> {
284 self.doc()
285 }
286}
287
/// Number of nanoseconds in one microsecond.
const NANOS_PER_MICRO: i64 = 1_000;
/// Number of nanoseconds in one millisecond.
const NANOS_PER_MILLI: i64 = 1_000_000;
/// Number of nanoseconds in one second (alias of arrow's `NANOSECONDS`).
const NANOS_PER_SEC: i64 = NANOSECONDS;
/// Signature shared by the binning functions: `(stride, source, origin)` in,
/// start of the bin containing `source` out.
type BinFunction = fn(i64, i64, i64) -> Result<i64>;
/// Stride of a `date_bin` call, tagged with the unit it is measured in.
enum Interval {
    /// Fixed-width stride, in nanoseconds.
    Nanoseconds(i64),
    /// Calendar stride, in whole months.
    Months(i64),
}
304
305impl Interval {
306 fn bin_fn(&self) -> (i64, BinFunction) {
315 match self {
316 Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
317 Interval::Months(months) => (*months, date_bin_months_interval),
318 }
319 }
320}
321
322fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> Result<i64> {
324 let time_diff = source.checked_sub(origin).ok_or_else(|| {
325 arrow::error::ArrowError::InvalidArgumentError(format!(
326 "date_bin source timestamp {source} - origin {origin} overflows i64"
327 ))
328 })?;
329
330 let time_delta = compute_distance(time_diff, stride_nanos);
332
333 Ok(origin + time_delta)
334}
335
/// Rounds `time_diff` down to a multiple of `stride`.
///
/// `%` truncates toward zero, so for a negative `time_diff` that is not an
/// exact multiple the truncated value lies one stride too high and is moved
/// down by one stride. That correction is only applied for `stride > 1`.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    let needs_floor = time_diff < 0 && stride > 1 && remainder != 0;
    if needs_floor {
        truncated - stride
    } else {
        truncated
    }
}
347
348fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Result<i64> {
350 let source_date = to_utc_date_time(source)?;
352 let origin_date = to_utc_date_time(origin)?;
353
354 let month_diff = (source_date.year() - origin_date.year()) * 12
356 + source_date.month() as i32
357 - origin_date.month() as i32;
358
359 let month_delta = compute_distance(month_diff as i64, stride_months);
361
362 let mut bin_time = if month_delta < 0 {
363 match origin_date
364 .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
365 {
366 Some(dt) => dt,
367 None => return exec_err!("DATE_BIN month subtraction out of range"),
368 }
369 } else {
370 match origin_date.checked_add_months(Months::new(month_delta as u32)) {
371 Some(dt) => dt,
372 None => return exec_err!("DATE_BIN month addition out of range"),
373 }
374 };
375
376 if bin_time > source_date {
379 let month_delta = month_delta - stride_months;
380 bin_time = if month_delta < 0 {
381 match origin_date
382 .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
383 {
384 Some(dt) => dt,
385 None => return exec_err!("DATE_BIN month subtraction out of range"),
386 }
387 } else {
388 match origin_date.checked_add_months(Months::new(month_delta as u32)) {
389 Some(dt) => dt,
390 None => return exec_err!("DATE_BIN month addition out of range"),
391 }
392 };
393 }
394 match bin_time.timestamp_nanos_opt() {
395 Some(nanos) => Ok(nanos),
396 None => exec_err!("DATE_BIN result timestamp out of range"),
397 }
398}
399
400fn to_utc_date_time(nanos: i64) -> Result<DateTime<Utc>> {
401 let secs = nanos / NANOS_PER_SEC;
402 let nsec = (nanos % NANOS_PER_SEC) as u32;
403 match DateTime::from_timestamp(secs, nsec) {
404 Some(dt) => Ok(dt),
405 None => exec_err!("Invalid timestamp value"),
406 }
407}
408
409fn date_bin_impl(
416 stride: &ColumnarValue,
417 array: &ColumnarValue,
418 origin: &ColumnarValue,
419) -> Result<ColumnarValue> {
420 let stride = match stride {
421 ColumnarValue::Scalar(s) if s.is_null() => {
422 return Ok(ColumnarValue::Scalar(ScalarValue::try_from(
424 array.data_type(),
425 )?));
426 }
427 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
428 let (days, ms) = IntervalDayTimeType::to_parts(*v);
429 let nanos = (TimeDelta::try_days(days as i64).unwrap()
430 + TimeDelta::try_milliseconds(ms as i64).unwrap())
431 .num_nanoseconds();
432
433 match nanos {
434 Some(v) => Interval::Nanoseconds(v),
435 _ => return exec_err!("DATE_BIN stride argument is too large"),
436 }
437 }
438 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
439 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
440
441 if months != 0 {
443 if days != 0 || nanos != 0 {
445 return not_impl_err!(
446 "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
447 );
448 } else {
449 Interval::Months(months as i64)
450 }
451 } else {
452 let nanos = (TimeDelta::try_days(days as i64).unwrap()
453 + Duration::nanoseconds(nanos))
454 .num_nanoseconds();
455 match nanos {
456 Some(v) => Interval::Nanoseconds(v),
457 _ => return exec_err!("DATE_BIN stride argument is too large"),
458 }
459 }
460 }
461 ColumnarValue::Scalar(v) => {
462 return exec_err!(
463 "DATE_BIN expects stride argument to be an INTERVAL but got {}",
464 v.data_type()
465 );
466 }
467 ColumnarValue::Array(_) => {
468 return not_impl_err!(
469 "DATE_BIN only supports literal values for the stride argument, not arrays"
470 );
471 }
472 };
473
474 let (origin, is_time) = match origin {
475 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => {
476 (*v, false)
477 }
478 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => {
479 match stride {
480 Interval::Months(m) => {
481 if m > 0 {
482 return exec_err!(
483 "DATE_BIN stride for TIME input must be less than 1 day"
484 );
485 }
486 }
487 Interval::Nanoseconds(ns) => {
488 if ns >= NANOSECONDS_IN_DAY {
489 return exec_err!(
490 "DATE_BIN stride for TIME input must be less than 1 day"
491 );
492 }
493 }
494 }
495
496 (*v as i64 * NANOS_PER_MILLI, true)
497 }
498 ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => {
499 match stride {
500 Interval::Months(m) => {
501 if m > 0 {
502 return exec_err!(
503 "DATE_BIN stride for TIME input must be less than 1 day"
504 );
505 }
506 }
507 Interval::Nanoseconds(ns) => {
508 if ns >= NANOSECONDS_IN_DAY {
509 return exec_err!(
510 "DATE_BIN stride for TIME input must be less than 1 day"
511 );
512 }
513 }
514 }
515
516 (*v as i64 * NANOS_PER_SEC, true)
517 }
518 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => {
519 match stride {
520 Interval::Months(m) => {
521 if m > 0 {
522 return exec_err!(
523 "DATE_BIN stride for TIME input must be less than 1 day"
524 );
525 }
526 }
527 Interval::Nanoseconds(ns) => {
528 if ns >= NANOSECONDS_IN_DAY {
529 return exec_err!(
530 "DATE_BIN stride for TIME input must be less than 1 day"
531 );
532 }
533 }
534 }
535
536 (*v * NANOS_PER_MICRO, true)
537 }
538 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => {
539 match stride {
540 Interval::Months(m) => {
541 if m > 0 {
542 return exec_err!(
543 "DATE_BIN stride for TIME input must be less than 1 day"
544 );
545 }
546 }
547 Interval::Nanoseconds(ns) => {
548 if ns >= NANOSECONDS_IN_DAY {
549 return exec_err!(
550 "DATE_BIN stride for TIME input must be less than 1 day"
551 );
552 }
553 }
554 }
555
556 (*v, true)
557 }
558 ColumnarValue::Scalar(v) => {
559 return exec_err!(
560 "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got {}",
561 v.data_type()
562 );
563 }
564 ColumnarValue::Array(_) => {
565 return not_impl_err!(
566 "DATE_BIN only supports literal values for the origin argument, not arrays"
567 );
568 }
569 };
570
571 let (stride, stride_fn) = stride.bin_fn();
572
573 if stride == 0 {
575 return exec_err!("DATE_BIN stride must be non-zero");
576 }
577
578 fn stride_map_fn<T: ArrowTimestampType>(
579 origin: i64,
580 stride: i64,
581 stride_fn: BinFunction,
582 ) -> impl Fn(i64) -> Result<i64> {
583 let scale = match T::UNIT {
584 Nanosecond => 1,
585 Microsecond => NANOS_PER_MICRO,
586 Millisecond => NANOS_PER_MILLI,
587 Second => NANOSECONDS,
588 };
589 move |x: i64| match stride_fn(stride, x * scale, origin) {
590 Ok(result) => Ok(result / scale),
591 Err(e) => Err(e),
592 }
593 }
594
595 Ok(match array {
596 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
597 let apply_stride_fn =
598 stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
599 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
600 v.and_then(|val| apply_stride_fn(val).ok()),
601 tz_opt.clone(),
602 ))
603 }
604 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
605 let apply_stride_fn =
606 stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
607 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
608 v.and_then(|val| apply_stride_fn(val).ok()),
609 tz_opt.clone(),
610 ))
611 }
612 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
613 let apply_stride_fn =
614 stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
615 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
616 v.and_then(|val| apply_stride_fn(val).ok()),
617 tz_opt.clone(),
618 ))
619 }
620 ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
621 let apply_stride_fn =
622 stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
623 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
624 v.and_then(|val| apply_stride_fn(val).ok()),
625 tz_opt.clone(),
626 ))
627 }
628 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => {
629 if !is_time {
630 return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
631 }
632 let result = v.and_then(|x| {
633 match stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) {
634 Ok(binned_nanos) => {
635 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
636 Some((nanos / NANOS_PER_MILLI) as i32)
637 }
638 Err(_) => None,
639 }
640 });
641 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result))
642 }
643 ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => {
644 if !is_time {
645 return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
646 }
647 let result = v.and_then(|x| {
648 match stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) {
649 Ok(binned_nanos) => {
650 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
651 Some((nanos / NANOS_PER_SEC) as i32)
652 }
653 Err(_) => None,
654 }
655 });
656 ColumnarValue::Scalar(ScalarValue::Time32Second(result))
657 }
658 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => {
659 if !is_time {
660 return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
661 }
662 let result = v.and_then(|x| match stride_fn(stride, x, origin) {
663 Ok(binned_nanos) => Some(binned_nanos % (NANOSECONDS_IN_DAY)),
664 Err(_) => None,
665 });
666 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result))
667 }
668 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => {
669 if !is_time {
670 return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
671 }
672 let result =
673 v.and_then(|x| match stride_fn(stride, x * NANOS_PER_MICRO, origin) {
674 Ok(binned_nanos) => {
675 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
676 Some(nanos / NANOS_PER_MICRO)
677 }
678 Err(_) => None,
679 });
680 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result))
681 }
682 ColumnarValue::Array(array) => {
683 fn transform_array_with_stride<T>(
684 origin: i64,
685 stride: i64,
686 stride_fn: BinFunction,
687 array: &ArrayRef,
688 tz_opt: &Option<Arc<str>>,
689 ) -> Result<ColumnarValue>
690 where
691 T: ArrowTimestampType,
692 {
693 let array = as_primitive_array::<T>(array)?;
694 let scale = match T::UNIT {
695 Nanosecond => 1,
696 Microsecond => NANOS_PER_MICRO,
697 Millisecond => NANOS_PER_MILLI,
698 Second => NANOSECONDS,
699 };
700
701 let result: PrimitiveArray<T> = array.try_unary(|val| {
702 stride_fn(stride, val * scale, origin)
703 .map(|binned| binned / scale)
704 .map_err(|e| {
705 arrow::error::ArrowError::ComputeError(e.to_string())
706 })
707 })?;
708
709 let array = result.with_timezone_opt(tz_opt.clone());
710 Ok(ColumnarValue::Array(Arc::new(array)))
711 }
712
713 match array.data_type() {
714 Timestamp(Nanosecond, tz_opt) => {
715 transform_array_with_stride::<TimestampNanosecondType>(
716 origin, stride, stride_fn, array, tz_opt,
717 )?
718 }
719 Timestamp(Microsecond, tz_opt) => {
720 transform_array_with_stride::<TimestampMicrosecondType>(
721 origin, stride, stride_fn, array, tz_opt,
722 )?
723 }
724 Timestamp(Millisecond, tz_opt) => {
725 transform_array_with_stride::<TimestampMillisecondType>(
726 origin, stride, stride_fn, array, tz_opt,
727 )?
728 }
729 Timestamp(Second, tz_opt) => {
730 transform_array_with_stride::<TimestampSecondType>(
731 origin, stride, stride_fn, array, tz_opt,
732 )?
733 }
734 Time32(Millisecond) => {
735 if !is_time {
736 return exec_err!(
737 "DATE_BIN with Time32 source requires Time32 origin"
738 );
739 }
740 let array = array.as_primitive::<Time32MillisecondType>();
741 let result: PrimitiveArray<Time32MillisecondType> =
742 array.try_unary(|x| {
743 stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin)
744 .map(|binned_nanos| {
745 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
746 (nanos / NANOS_PER_MILLI) as i32
747 })
748 .map_err(|e| {
749 arrow::error::ArrowError::ComputeError(e.to_string())
750 })
751 })?;
752 ColumnarValue::Array(Arc::new(result))
753 }
754 Time32(Second) => {
755 if !is_time {
756 return exec_err!(
757 "DATE_BIN with Time32 source requires Time32 origin"
758 );
759 }
760 let array = array.as_primitive::<Time32SecondType>();
761 let result: PrimitiveArray<Time32SecondType> =
762 array.try_unary(|x| {
763 stride_fn(stride, x as i64 * NANOS_PER_SEC, origin)
764 .map(|binned_nanos| {
765 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
766 (nanos / NANOS_PER_SEC) as i32
767 })
768 .map_err(|e| {
769 arrow::error::ArrowError::ComputeError(e.to_string())
770 })
771 })?;
772 ColumnarValue::Array(Arc::new(result))
773 }
774 Time64(Microsecond) => {
775 if !is_time {
776 return exec_err!(
777 "DATE_BIN with Time64 source requires Time64 origin"
778 );
779 }
780 let array = array.as_primitive::<Time64MicrosecondType>();
781 let result: PrimitiveArray<Time64MicrosecondType> =
782 array.try_unary(|x| {
783 stride_fn(stride, x * NANOS_PER_MICRO, origin)
784 .map(|binned_nanos| {
785 let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
786 nanos / NANOS_PER_MICRO
787 })
788 .map_err(|e| {
789 arrow::error::ArrowError::ComputeError(e.to_string())
790 })
791 })?;
792 ColumnarValue::Array(Arc::new(result))
793 }
794 Time64(Nanosecond) => {
795 if !is_time {
796 return exec_err!(
797 "DATE_BIN with Time64 source requires Time64 origin"
798 );
799 }
800 let array = array.as_primitive::<Time64NanosecondType>();
801 let result: PrimitiveArray<Time64NanosecondType> =
802 array.try_unary(|x| {
803 stride_fn(stride, x, origin)
804 .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY))
805 .map_err(|e| {
806 arrow::error::ArrowError::ComputeError(e.to_string())
807 })
808 })?;
809 ColumnarValue::Array(Arc::new(result))
810 }
811 _ => {
812 return exec_err!(
813 "DATE_BIN expects source argument to be a TIMESTAMP or TIME but got {}",
814 array.data_type()
815 );
816 }
817 }
818 }
819 _ => {
820 return exec_err!(
821 "DATE_BIN expects source argument to be a TIMESTAMP or TIME scalar or array"
822 );
823 }
824 })
825}
826
827#[cfg(test)]
828mod tests {
829 use std::sync::Arc;
830
831 use crate::datetime::date_bin::{DateBinFunc, date_bin_nanos_interval};
832 use arrow::array::types::TimestampNanosecondType;
833 use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
834 use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
835 use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
836
837 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
838 use datafusion_common::{DataFusionError, ScalarValue};
839 use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
840
841 use chrono::TimeDelta;
842 use datafusion_common::config::ConfigOptions;
843
844 fn invoke_date_bin_with_args(
845 args: Vec<ColumnarValue>,
846 number_rows: usize,
847 return_field: &FieldRef,
848 ) -> Result<ColumnarValue, DataFusionError> {
849 let arg_fields = args
850 .iter()
851 .map(|arg| Field::new("a", arg.data_type(), true).into())
852 .collect::<Vec<_>>();
853
854 let args = ScalarFunctionArgs {
855 args,
856 arg_fields,
857 number_rows,
858 return_field: Arc::clone(return_field),
859 config_options: Arc::new(ConfigOptions::default()),
860 };
861 DateBinFunc::new().invoke_with_args(args)
862 }
863
864 #[test]
865 fn test_date_bin() {
866 let return_field = &Arc::new(Field::new(
867 "f",
868 DataType::Timestamp(TimeUnit::Nanosecond, None),
869 true,
870 ));
871
872 let mut args = vec![
873 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
874 days: 0,
875 milliseconds: 1,
876 }))),
877 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
878 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
879 ];
880 let res = invoke_date_bin_with_args(args, 1, return_field);
881 assert!(res.is_ok());
882
883 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
884 let batch_len = timestamps.len();
885 args = vec![
886 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
887 days: 0,
888 milliseconds: 1,
889 }))),
890 ColumnarValue::Array(timestamps),
891 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
892 ];
893 let res = invoke_date_bin_with_args(args, batch_len, return_field);
894 assert!(res.is_ok());
895
896 args = vec![
897 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
898 days: 0,
899 milliseconds: 1,
900 }))),
901 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
902 ];
903 let res = invoke_date_bin_with_args(args, 1, return_field);
904 assert!(res.is_ok());
905
906 args = vec![
908 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
909 IntervalMonthDayNano {
910 months: 0,
911 days: 0,
912 nanoseconds: 1,
913 },
914 ))),
915 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
916 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
917 ];
918 let res = invoke_date_bin_with_args(args, 1, return_field);
919 assert!(res.is_ok());
920
921 args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
927 IntervalDayTime {
928 days: 0,
929 milliseconds: 1,
930 },
931 )))];
932 let res = invoke_date_bin_with_args(args, 1, return_field);
933 assert_eq!(
934 res.err().unwrap().strip_backtrace(),
935 "Execution error: DATE_BIN expected two or three arguments"
936 );
937
938 args = vec![
940 ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
941 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
942 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
943 ];
944 let res = invoke_date_bin_with_args(args, 1, return_field);
945 assert_eq!(
946 res.err().unwrap().strip_backtrace(),
947 "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
948 );
949
950 args = vec![
953 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
954 days: 0,
955 milliseconds: 0,
956 }))),
957 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
958 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
959 ];
960
961 let res = invoke_date_bin_with_args(args, 1, return_field);
962 assert_eq!(
963 res.err().unwrap().strip_backtrace(),
964 "Execution error: DATE_BIN stride must be non-zero"
965 );
966
967 args = vec![
969 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
970 IntervalDayTime::MAX,
971 ))),
972 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
973 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
974 ];
975 let res = invoke_date_bin_with_args(args, 1, return_field);
976 assert_eq!(
977 res.err().unwrap().strip_backtrace(),
978 "Execution error: DATE_BIN stride argument is too large"
979 );
980
981 args = vec![
983 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
984 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
985 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
986 ];
987 let res = invoke_date_bin_with_args(args, 1, return_field);
988 assert_eq!(
989 res.err().unwrap().strip_backtrace(),
990 "Execution error: DATE_BIN stride argument is too large"
991 );
992
993 args = vec![
995 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
996 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
997 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
998 ];
999 let res = invoke_date_bin_with_args(args, 1, return_field);
1000 assert_eq!(
1001 res.err().unwrap().strip_backtrace(),
1002 "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
1003 );
1004
1005 args = vec![
1007 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1008 days: 0,
1009 milliseconds: 1,
1010 }))),
1011 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1012 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
1013 ];
1014 let res = invoke_date_bin_with_args(args, 1, return_field);
1015 assert_eq!(
1016 res.err().unwrap().strip_backtrace(),
1017 "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got Timestamp(µs)"
1018 );
1019
1020 args = vec![
1021 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1022 days: 0,
1023 milliseconds: 1,
1024 }))),
1025 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
1026 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1027 ];
1028 let res = invoke_date_bin_with_args(args, 1, return_field);
1029 assert!(res.is_ok());
1030
1031 let intervals = Arc::new(
1033 (1..6)
1034 .map(|x| {
1035 Some(IntervalDayTime {
1036 days: 0,
1037 milliseconds: x,
1038 })
1039 })
1040 .collect::<IntervalDayTimeArray>(),
1041 );
1042 args = vec![
1043 ColumnarValue::Array(intervals),
1044 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1045 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1046 ];
1047 let res = invoke_date_bin_with_args(args, 1, return_field);
1048 assert_eq!(
1049 res.err().unwrap().strip_backtrace(),
1050 "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
1051 );
1052
1053 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
1055 let batch_len = timestamps.len();
1056 args = vec![
1057 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
1058 days: 0,
1059 milliseconds: 1,
1060 }))),
1061 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
1062 ColumnarValue::Array(timestamps),
1063 ];
1064 let res = invoke_date_bin_with_args(args, batch_len, return_field);
1065 assert_eq!(
1066 res.err().unwrap().strip_backtrace(),
1067 "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
1068 );
1069 }
1070
/// Checks `date_bin` on timestamp arrays that carry different time zone
/// annotations (including none at all).
///
/// Every case feeds five hourly timestamps through `date_bin` with a stride
/// created by `ScalarValue::new_interval_dt(1, 0)` and expects all of them to
/// collapse onto the first timestamp of that case. The result must be an array
/// whose data type carries the same optional time zone as the input.
#[test]
fn test_date_bin_timezones() {
    // (input timestamps, time zone annotation, origin, expected bins)
    let cases = [
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            Some("+00".into()),
            "1970-01-01T00:00:00Z",
            vec!["2020-09-08T00:00:00Z"; 5],
        ),
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            None,
            "1970-01-01T00:00:00Z",
            vec!["2020-09-08T00:00:00Z"; 5],
        ),
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            Some("-02".into()),
            "1970-01-01T00:00:00Z",
            vec!["2020-09-08T00:00:00Z"; 5],
        ),
        (
            vec![
                "2020-09-08T00:00:00+05",
                "2020-09-08T01:00:00+05",
                "2020-09-08T02:00:00+05",
                "2020-09-08T03:00:00+05",
                "2020-09-08T04:00:00+05",
            ],
            Some("+05".into()),
            "1970-01-01T00:00:00+05",
            vec!["2020-09-08T00:00:00+05"; 5],
        ),
        (
            vec![
                "2020-09-08T00:00:00+08",
                "2020-09-08T01:00:00+08",
                "2020-09-08T02:00:00+08",
                "2020-09-08T03:00:00+08",
                "2020-09-08T04:00:00+08",
            ],
            Some("+08".into()),
            "1970-01-01T00:00:00+08",
            vec!["2020-09-08T00:00:00+08"; 5],
        ),
    ];

    for (inputs, tz_opt, origin, bins) in &cases {
        // Build the source and expected arrays with the case's time zone.
        let source_array = inputs
            .iter()
            .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
            .collect::<TimestampNanosecondArray>()
            .with_timezone_opt(tz_opt.clone());
        let expected_array = bins
            .iter()
            .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
            .collect::<TimestampNanosecondArray>()
            .with_timezone_opt(tz_opt.clone());
        let num_rows = source_array.len();

        // Argument order: stride, source array, origin.
        let origin_nanos = string_to_timestamp_nanos(origin).unwrap();
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
            ColumnarValue::Array(Arc::new(source_array)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                Some(origin_nanos),
                tz_opt.clone(),
            )),
        ];

        // The declared return type and the actual result type must agree.
        let timestamp_type = DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone());
        let return_field = Arc::new(Field::new("f", timestamp_type.clone(), true));
        let result = invoke_date_bin_with_args(args, num_rows, &return_field).unwrap();

        let ColumnarValue::Array(binned) = result else {
            panic!("unexpected column type");
        };
        assert_eq!(binned.data_type(), &timestamp_type);

        let actual_array =
            arrow::array::cast::as_primitive_array::<TimestampNanosecondType>(&binned);
        assert_eq!(actual_array, &expected_array);
    }
}
1210
/// Calls `date_bin_nanos_interval` directly, one source timestamp per case.
///
/// Each case lists a stride, a source timestamp, an origin and the bin start
/// that is expected back. Stride, source, origin and expected value are all
/// converted to nanoseconds before the comparison.
#[test]
fn test_date_bin_single() {
    // (stride, source, origin, expected)
    let cases = [
        (
            TimeDelta::try_minutes(15),
            "2004-04-09T02:03:04.123456789Z",
            "2001-01-01T00:00:00",
            "2004-04-09T02:00:00Z",
        ),
        (
            TimeDelta::try_minutes(15),
            "2004-04-09T02:03:04.123456789Z",
            "2001-01-01T00:02:30",
            "2004-04-09T02:02:30Z",
        ),
        // Origin later than the source timestamp.
        (
            TimeDelta::try_minutes(15),
            "2004-04-09T02:03:04.123456789Z",
            "2005-01-01T00:02:30",
            "2004-04-09T02:02:30Z",
        ),
        (
            TimeDelta::try_hours(1),
            "2004-04-09T02:03:04.123456789Z",
            "2001-01-01T00:00:00",
            "2004-04-09T02:00:00Z",
        ),
        (
            TimeDelta::try_seconds(10),
            "2004-04-09T02:03:11.123456789Z",
            "2001-01-01T00:00:00",
            "2004-04-09T02:03:10Z",
        ),
    ];

    for (stride, source, origin, expected) in &cases {
        let stride_nanos = stride.unwrap().num_nanoseconds().unwrap();
        let source_nanos = string_to_timestamp_nanos(source).unwrap();
        let origin_nanos = string_to_timestamp_nanos(origin).unwrap();
        let expected_nanos = string_to_timestamp_nanos(expected).unwrap();

        let binned =
            date_bin_nanos_interval(stride_nanos, source_nanos, origin_nanos).unwrap();
        assert_eq!(binned, expected_nanos, "{source} = {expected}");
    }
}
1269
/// Calls `date_bin_nanos_interval` with 1969 timestamps and an origin of `0`.
///
/// The three sources sit one nanosecond before, exactly on, and one
/// nanosecond after `1969-12-31T23:45:00`, all with a 15 minute stride.
#[test]
fn test_date_bin_before_epoch() {
    // (stride, source, expected)
    let cases = [
        (
            TimeDelta::try_minutes(15),
            "1969-12-31T23:44:59.999999999",
            "1969-12-31T23:30:00",
        ),
        (
            TimeDelta::try_minutes(15),
            "1969-12-31T23:45:00",
            "1969-12-31T23:45:00",
        ),
        (
            TimeDelta::try_minutes(15),
            "1969-12-31T23:45:00.000000001",
            "1969-12-31T23:45:00",
        ),
    ];

    for (stride, source, expected) in &cases {
        let stride_nanos = stride.unwrap().num_nanoseconds().unwrap();
        let source_nanos = string_to_timestamp_nanos(source).unwrap();
        let expected_nanos = string_to_timestamp_nanos(expected).unwrap();

        // The origin is fixed at 0 for every case.
        let binned = date_bin_nanos_interval(stride_nanos, source_nanos, 0).unwrap();
        assert_eq!(binned, expected_nanos, "{source} = {expected}");
    }
}
1297
/// Checks that `date_bin` returns a null scalar rather than an error for an
/// out-of-range operation.
///
/// The stride is the month-day-nano interval built by
/// `ScalarValue::new_interval_mdn(1637426858, 0, 0)`. It is applied to one
/// positive and one negative millisecond timestamp, with a nanosecond origin
/// of `1984-01-07 00:00:00`. In both cases the call must succeed and return a
/// `TimestampMillisecond` scalar holding `None`.
///
/// Any other result variant fails the test, so the `None` check can never be
/// skipped silently.
#[test]
fn test_date_bin_out_of_range() {
    let return_field = &Arc::new(Field::new(
        "f",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    ));

    // Same scenario with a positive and a negative source timestamp.
    for source_millis in [1040292460_i64, -1040292460] {
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1637426858, 0, 0)),
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
                Some(source_millis),
                None,
            )),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                Some(string_to_timestamp_nanos("1984-01-07 00:00:00").unwrap()),
                None,
            )),
        ];

        let result = invoke_date_bin_with_args(args, 1, return_field);
        assert!(result.is_ok());

        // Match exhaustively: an unexpected variant is a failure, not a pass.
        match result.unwrap() {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(val, _)) => {
                assert!(val.is_none(), "Expected None for out of range operation");
            }
            other => panic!(
                "expected a TimestampMillisecond scalar for source {source_millis}, got {other:?}"
            ),
        }
    }
}
1344}