1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23 ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25 TimestampSecondType,
26};
27use arrow::array::{ArrayRef, AsArray, PrimitiveArray};
28use arrow::datatypes::DataType::{Time32, Time64, Timestamp};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{
32 DataType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
33 Time64NanosecondType, TimeUnit,
34};
35use arrow::temporal_conversions::NANOSECONDS_IN_DAY;
36use datafusion_common::cast::as_primitive_array;
37use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err};
38use datafusion_expr::TypeSignature::Exact;
39use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
40use datafusion_expr::{
41 ColumnarValue, Documentation, ScalarUDFImpl, Signature, TIMEZONE_WILDCARD, Volatility,
42};
43use datafusion_macros::user_doc;
44
45use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
46
47#[user_doc(
48 doc_section(label = "Time and Date Functions"),
49 description = r#"
50Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
51
52For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
53"#,
54 syntax_example = "date_bin(interval, expression, origin-timestamp)",
55 sql_example = r#"```sql
56-- Bin the timestamp into 1 day intervals
57> SELECT date_bin(interval '1 day', time) as bin
58FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
59+---------------------+
60| bin |
61+---------------------+
62| 2023-01-01T00:00:00 |
63| 2023-01-03T00:00:00 |
64+---------------------+
652 row(s) fetched.
66
67-- Bin the timestamp into 1 day intervals starting at 3AM on 2023-01-01
68> SELECT date_bin(interval '1 day', time, '2023-01-01T03:00:00') as bin
69FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
70+---------------------+
71| bin |
72+---------------------+
73| 2023-01-01T03:00:00 |
74| 2023-01-03T03:00:00 |
75+---------------------+
762 row(s) fetched.
77
78-- Bin the time into 15 minute intervals starting at 1 min
79> SELECT date_bin(interval '15 minutes', time, TIME '00:01:00') as bin
80FROM VALUES (TIME '02:18:18'), (TIME '19:00:03') t(time);
81+----------+
82| bin |
83+----------+
84| 02:16:00 |
85| 18:46:00 |
86+----------+
872 row(s) fetched.
88```"#,
89 argument(name = "interval", description = "Bin interval."),
90 argument(
91 name = "expression",
92 description = "Time expression to operate on. Can be a constant, column, or function."
93 ),
94 argument(
95 name = "origin-timestamp",
96 description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
97
98 - nanoseconds
99 - microseconds
100 - milliseconds
101 - seconds
102 - minutes
103 - hours
104 - days
105 - weeks
106 - months
107 - years
108 - century
109"#
110 )
111)]
/// Scalar UDF behind the SQL function `date_bin`.
///
/// The struct only carries the list of accepted argument type combinations;
/// the binning itself is done by the free functions in this file.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DateBinFunc {
    /// Accepted argument signatures, assembled once in `DateBinFunc::new`.
    signature: Signature,
}
116
117impl Default for DateBinFunc {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl DateBinFunc {
124 pub fn new() -> Self {
125 let base_sig = |array_type: TimeUnit| {
126 let mut v = vec![
127 Exact(vec![
128 DataType::Interval(MonthDayNano),
129 Timestamp(array_type, None),
130 Timestamp(Nanosecond, None),
131 ]),
132 Exact(vec![
133 DataType::Interval(MonthDayNano),
134 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
135 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
136 ]),
137 Exact(vec![
138 DataType::Interval(DayTime),
139 Timestamp(array_type, None),
140 Timestamp(Nanosecond, None),
141 ]),
142 Exact(vec![
143 DataType::Interval(DayTime),
144 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
145 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
146 ]),
147 Exact(vec![
148 DataType::Interval(MonthDayNano),
149 Timestamp(array_type, None),
150 ]),
151 Exact(vec![
152 DataType::Interval(MonthDayNano),
153 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
154 ]),
155 Exact(vec![
156 DataType::Interval(DayTime),
157 Timestamp(array_type, None),
158 ]),
159 Exact(vec![
160 DataType::Interval(DayTime),
161 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
162 ]),
163 ];
164
165 match array_type {
166 Second | Millisecond => {
167 v.append(&mut vec![
168 Exact(vec![
169 DataType::Interval(MonthDayNano),
170 Time32(array_type),
171 Time32(array_type),
172 ]),
173 Exact(vec![DataType::Interval(MonthDayNano), Time32(array_type)]),
174 Exact(vec![
175 DataType::Interval(DayTime),
176 Time32(array_type),
177 Time32(array_type),
178 ]),
179 Exact(vec![DataType::Interval(DayTime), Time32(array_type)]),
180 ]);
181 }
182 Microsecond | Nanosecond => {
183 v.append(&mut vec![
184 Exact(vec![
185 DataType::Interval(DayTime),
186 Time64(array_type),
187 Time64(array_type),
188 ]),
189 Exact(vec![DataType::Interval(DayTime), Time64(array_type)]),
190 Exact(vec![
191 DataType::Interval(MonthDayNano),
192 Time64(array_type),
193 Time64(array_type),
194 ]),
195 Exact(vec![DataType::Interval(MonthDayNano), Time64(array_type)]),
196 ]);
197 }
198 }
199
200 v
201 };
202
203 let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
204 .into_iter()
205 .map(base_sig)
206 .collect::<Vec<_>>()
207 .concat();
208
209 Self {
210 signature: Signature::one_of(full_sig, Volatility::Immutable),
211 }
212 }
213}
214
215impl ScalarUDFImpl for DateBinFunc {
216 fn as_any(&self) -> &dyn Any {
217 self
218 }
219
220 fn name(&self) -> &str {
221 "date_bin"
222 }
223
224 fn signature(&self) -> &Signature {
225 &self.signature
226 }
227
228 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
229 match &arg_types[1] {
230 Timestamp(tu, tz_opt) => Ok(Timestamp(*tu, tz_opt.clone())),
231 Time32(tu) => Ok(Time32(*tu)),
232 Time64(tu) => Ok(Time64(*tu)),
233 _ => plan_err!(
234 "The date_bin function can only accept timestamp or time as the second arg."
235 ),
236 }
237 }
238
239 fn invoke_with_args(
240 &self,
241 args: datafusion_expr::ScalarFunctionArgs,
242 ) -> Result<ColumnarValue> {
243 let args = &args.args;
244 if args.len() == 2 {
245 let origin = match args[1].data_type() {
246 Time32(Second) => {
247 ColumnarValue::Scalar(ScalarValue::Time32Second(Some(0)))
248 }
249 Time32(Millisecond) => {
250 ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(0)))
251 }
252 Time64(Microsecond) => {
253 ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0)))
254 }
255 Time64(Nanosecond) => {
256 ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(0)))
257 }
258 _ => {
259 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
261 Some(0),
262 Some("+00:00".into()),
263 ))
264 }
265 };
266 date_bin_impl(&args[0], &args[1], &origin)
267 } else if args.len() == 3 {
268 date_bin_impl(&args[0], &args[1], &args[2])
269 } else {
270 exec_err!("DATE_BIN expected two or three arguments")
271 }
272 }
273
274 fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
275 let step = &input[0];
277 let date_value = &input[1];
278 let reference = input.get(2);
279
280 if step.sort_properties.eq(&SortProperties::Singleton)
281 && reference
282 .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
283 .unwrap_or(true)
284 {
285 Ok(date_value.sort_properties)
286 } else {
287 Ok(SortProperties::Unordered)
288 }
289 }
290 fn documentation(&self) -> Option<&Documentation> {
291 self.doc()
292 }
293}
294
/// Nanoseconds in one microsecond.
const NANOS_PER_MICRO: i64 = 1_000;
/// Nanoseconds in one millisecond.
const NANOS_PER_MILLI: i64 = 1_000_000;
/// Nanoseconds in one second, taken from arrow's `NANOSECONDS` constant.
const NANOS_PER_SEC: i64 = NANOSECONDS;
/// Shape shared by the binning routines: `(stride, source, origin)` in, start
/// of the bin (in nanoseconds) out. `source` and `origin` are nanosecond
/// values; the unit of `stride` depends on the routine (nanoseconds or months).
type BinFunction = fn(i64, i64, i64) -> Result<i64>;
/// Stride of a `date_bin` call after it has been reduced to a single unit.
enum Interval {
    /// Fixed-length stride, in nanoseconds.
    Nanoseconds(i64),
    /// Calendar stride, in whole months.
    Months(i64),
}
311
312impl Interval {
313 fn bin_fn(&self) -> (i64, BinFunction) {
322 match self {
323 Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
324 Interval::Months(months) => (*months, date_bin_months_interval),
325 }
326 }
327}
328
329fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> Result<i64> {
331 let time_diff = source - origin;
332
333 let time_delta = compute_distance(time_diff, stride_nanos);
335
336 Ok(origin + time_delta)
337}
338
/// Rounds `time_diff` down to a multiple of `stride`.
///
/// Rust's `%` truncates toward zero, so for a negative `time_diff` that is not
/// already a multiple of the stride the truncated value lies one bin too far
/// to the right; it is moved one stride further back. The correction is only
/// applied for strides greater than one.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    let needs_step_back = time_diff < 0 && stride > 1 && remainder != 0;
    if needs_step_back {
        truncated - stride
    } else {
        truncated
    }
}
350
351fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Result<i64> {
353 let source_date = to_utc_date_time(source)?;
355 let origin_date = to_utc_date_time(origin)?;
356
357 let month_diff = (source_date.year() - origin_date.year()) * 12
359 + source_date.month() as i32
360 - origin_date.month() as i32;
361
362 let month_delta = compute_distance(month_diff as i64, stride_months);
364
365 let mut bin_time = if month_delta < 0 {
366 match origin_date
367 .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
368 {
369 Some(dt) => dt,
370 None => return exec_err!("DATE_BIN month subtraction out of range"),
371 }
372 } else {
373 match origin_date.checked_add_months(Months::new(month_delta as u32)) {
374 Some(dt) => dt,
375 None => return exec_err!("DATE_BIN month addition out of range"),
376 }
377 };
378
379 if bin_time > source_date {
382 let month_delta = month_delta - stride_months;
383 bin_time = if month_delta < 0 {
384 match origin_date
385 .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32))
386 {
387 Some(dt) => dt,
388 None => return exec_err!("DATE_BIN month subtraction out of range"),
389 }
390 } else {
391 match origin_date.checked_add_months(Months::new(month_delta as u32)) {
392 Some(dt) => dt,
393 None => return exec_err!("DATE_BIN month addition out of range"),
394 }
395 };
396 }
397 match bin_time.timestamp_nanos_opt() {
398 Some(nanos) => Ok(nanos),
399 None => exec_err!("DATE_BIN result timestamp out of range"),
400 }
401}
402
403fn to_utc_date_time(nanos: i64) -> Result<DateTime<Utc>> {
404 let secs = nanos / NANOS_PER_SEC;
405 let nsec = (nanos % NANOS_PER_SEC) as u32;
406 match DateTime::from_timestamp(secs, nsec) {
407 Some(dt) => Ok(dt),
408 None => exec_err!("Invalid timestamp value"),
409 }
410}
411
/// Core of `date_bin`: validates the three arguments and bins `array`.
///
/// * `stride` - must be a scalar `IntervalDayTime` or `IntervalMonthDayNano`;
///   arrays are rejected. A NULL stride returns early.
/// * `array` - the source values: a timestamp or time scalar, or an array of
///   one of those types.
/// * `origin` - must be a scalar nanosecond timestamp or a scalar time value;
///   arrays are rejected.
///
/// The result has the same type as `array`. Checks run in a fixed order
/// (stride, then origin, then zero stride, then source type), which decides
/// which error the caller sees when several arguments are invalid.
///
/// NOTE(review): for scalar sources a failure inside the binning routine is
/// turned into a NULL result (`.ok()` / `Err(_) => None`), whereas for array
/// sources the same failure is returned as an error. Confirm this difference
/// is intended.
fn date_bin_impl(
    stride: &ColumnarValue,
    array: &ColumnarValue,
    origin: &ColumnarValue,
) -> Result<ColumnarValue> {
    // Reduce the stride literal to either nanoseconds or months.
    let stride = match stride {
        // NULL stride: return a scalar built from the source's data type
        // (presumably the typed NULL - confirm against `ScalarValue::try_from`).
        ColumnarValue::Scalar(s) if s.is_null() => {
            return Ok(ColumnarValue::Scalar(ScalarValue::try_from(
                array.data_type(),
            )?));
        }
        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
            let (days, ms) = IntervalDayTimeType::to_parts(*v);
            // `num_nanoseconds` yields `None` when the total does not fit in
            // an i64, which is reported as "too large" below.
            let nanos = (TimeDelta::try_days(days as i64).unwrap()
                + TimeDelta::try_milliseconds(ms as i64).unwrap())
            .num_nanoseconds();

            match nanos {
                Some(v) => Interval::Nanoseconds(v),
                _ => return exec_err!("DATE_BIN stride argument is too large"),
            }
        }
        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);

            // A month stride is only accepted on its own; mixing it with a
            // day or nanosecond component is not implemented.
            if months != 0 {
                if days != 0 || nanos != 0 {
                    return not_impl_err!(
                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
                    );
                } else {
                    Interval::Months(months as i64)
                }
            } else {
                let nanos = (TimeDelta::try_days(days as i64).unwrap()
                    + Duration::nanoseconds(nanos))
                .num_nanoseconds();
                match nanos {
                    Some(v) => Interval::Nanoseconds(v),
                    _ => return exec_err!("DATE_BIN stride argument is too large"),
                }
            }
        }
        // Any other scalar (including other interval kinds) is rejected.
        ColumnarValue::Scalar(v) => {
            return exec_err!(
                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
                v.data_type()
            );
        }
        ColumnarValue::Array(_) => {
            return not_impl_err!(
                "DATE_BIN only supports literal values for the stride argument, not arrays"
            );
        }
    };

    // Convert the origin to nanoseconds and remember whether it was a TIME
    // value. Each TIME arm repeats the same stride validation.
    //
    // NOTE(review): the month check is `m > 0`, so a negative month stride is
    // not rejected for TIME origins - confirm whether that is intended.
    let (origin, is_time) = match origin {
        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => {
            (*v, false)
        }
        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => {
            match stride {
                Interval::Months(m) => {
                    if m > 0 {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
                Interval::Nanoseconds(ns) => {
                    if ns >= NANOSECONDS_IN_DAY {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
            }

            (*v as i64 * NANOS_PER_MILLI, true)
        }
        ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => {
            match stride {
                Interval::Months(m) => {
                    if m > 0 {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
                Interval::Nanoseconds(ns) => {
                    if ns >= NANOSECONDS_IN_DAY {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
            }

            (*v as i64 * NANOS_PER_SEC, true)
        }
        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => {
            match stride {
                Interval::Months(m) => {
                    if m > 0 {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
                Interval::Nanoseconds(ns) => {
                    if ns >= NANOSECONDS_IN_DAY {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
            }

            (*v * NANOS_PER_MICRO, true)
        }
        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => {
            match stride {
                Interval::Months(m) => {
                    if m > 0 {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
                Interval::Nanoseconds(ns) => {
                    if ns >= NANOSECONDS_IN_DAY {
                        return exec_err!(
                            "DATE_BIN stride for TIME input must be less than 1 day"
                        );
                    }
                }
            }

            (*v, true)
        }
        // Other scalars, including NULL origins and timestamps of other
        // precisions, fall through to this error.
        ColumnarValue::Scalar(v) => {
            return exec_err!(
                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got {}",
                v.data_type()
            );
        }
        ColumnarValue::Array(_) => {
            return not_impl_err!(
                "DATE_BIN only supports literal values for the origin argument, not arrays"
            );
        }
    };

    // Pick the binning routine that matches the stride unit.
    let (stride, stride_fn) = stride.bin_fn();

    // A zero stride would divide by zero inside the binning routine.
    if stride == 0 {
        return exec_err!("DATE_BIN stride must be non-zero");
    }

    /// Builds a closure that bins one timestamp of unit `T`: the value is
    /// scaled up to nanoseconds, binned, and scaled back down.
    fn stride_map_fn<T: ArrowTimestampType>(
        origin: i64,
        stride: i64,
        stride_fn: BinFunction,
    ) -> impl Fn(i64) -> Result<i64> {
        let scale = match T::UNIT {
            Nanosecond => 1,
            Microsecond => NANOS_PER_MICRO,
            Millisecond => NANOS_PER_MILLI,
            Second => NANOSECONDS,
        };
        // NOTE(review): `x * scale` is unchecked - confirm overflow cannot
        // occur for the accepted input range.
        move |x: i64| match stride_fn(stride, x * scale, origin) {
            Ok(result) => Ok(result / scale),
            Err(e) => Err(e),
        }
    }

    // Timestamp arms below do not consult `is_time`; only TIME sources
    // require the origin to be a TIME value.
    //
    // NOTE(review): TIME results are reduced with `%`, which keeps the sign of
    // a negative bin start (source earlier than origin) - confirm a negative
    // time value cannot be produced.
    Ok(match array {
        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
            let apply_stride_fn =
                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                v.and_then(|val| apply_stride_fn(val).ok()),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
            let apply_stride_fn =
                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
                v.and_then(|val| apply_stride_fn(val).ok()),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
            let apply_stride_fn =
                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
                v.and_then(|val| apply_stride_fn(val).ok()),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
            let apply_stride_fn =
                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
                v.and_then(|val| apply_stride_fn(val).ok()),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => {
            if !is_time {
                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
            }
            let result = v.and_then(|x| {
                match stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) {
                    Ok(binned_nanos) => {
                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
                        Some((nanos / NANOS_PER_MILLI) as i32)
                    }
                    Err(_) => None,
                }
            });
            ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result))
        }
        ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => {
            if !is_time {
                return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
            }
            let result = v.and_then(|x| {
                match stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) {
                    Ok(binned_nanos) => {
                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
                        Some((nanos / NANOS_PER_SEC) as i32)
                    }
                    Err(_) => None,
                }
            });
            ColumnarValue::Scalar(ScalarValue::Time32Second(result))
        }
        ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => {
            if !is_time {
                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
            }
            let result = v.and_then(|x| match stride_fn(stride, x, origin) {
                Ok(binned_nanos) => Some(binned_nanos % (NANOSECONDS_IN_DAY)),
                Err(_) => None,
            });
            ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result))
        }
        ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => {
            if !is_time {
                return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
            }
            let result =
                v.and_then(|x| match stride_fn(stride, x * NANOS_PER_MICRO, origin) {
                    Ok(binned_nanos) => {
                        let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
                        Some(nanos / NANOS_PER_MICRO)
                    }
                    Err(_) => None,
                });
            ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result))
        }
        ColumnarValue::Array(array) => {
            /// Bins every value of a timestamp array of unit `T` and
            /// re-attaches the source timezone to the result. The first
            /// binning failure aborts the whole array with a compute error.
            fn transform_array_with_stride<T>(
                origin: i64,
                stride: i64,
                stride_fn: BinFunction,
                array: &ArrayRef,
                tz_opt: &Option<Arc<str>>,
            ) -> Result<ColumnarValue>
            where
                T: ArrowTimestampType,
            {
                let array = as_primitive_array::<T>(array)?;
                let scale = match T::UNIT {
                    Nanosecond => 1,
                    Microsecond => NANOS_PER_MICRO,
                    Millisecond => NANOS_PER_MILLI,
                    Second => NANOSECONDS,
                };

                let result: PrimitiveArray<T> = array.try_unary(|val| {
                    stride_fn(stride, val * scale, origin)
                        .map(|binned| binned / scale)
                        .map_err(|e| {
                            arrow::error::ArrowError::ComputeError(e.to_string())
                        })
                })?;

                let array = result.with_timezone_opt(tz_opt.clone());
                Ok(ColumnarValue::Array(Arc::new(array)))
            }

            // Dispatch on the element type of the source array.
            match array.data_type() {
                Timestamp(Nanosecond, tz_opt) => {
                    transform_array_with_stride::<TimestampNanosecondType>(
                        origin, stride, stride_fn, array, tz_opt,
                    )?
                }
                Timestamp(Microsecond, tz_opt) => {
                    transform_array_with_stride::<TimestampMicrosecondType>(
                        origin, stride, stride_fn, array, tz_opt,
                    )?
                }
                Timestamp(Millisecond, tz_opt) => {
                    transform_array_with_stride::<TimestampMillisecondType>(
                        origin, stride, stride_fn, array, tz_opt,
                    )?
                }
                Timestamp(Second, tz_opt) => {
                    transform_array_with_stride::<TimestampSecondType>(
                        origin, stride, stride_fn, array, tz_opt,
                    )?
                }
                Time32(Millisecond) => {
                    if !is_time {
                        return exec_err!(
                            "DATE_BIN with Time32 source requires Time32 origin"
                        );
                    }
                    let array = array.as_primitive::<Time32MillisecondType>();
                    let result: PrimitiveArray<Time32MillisecondType> =
                        array.try_unary(|x| {
                            stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin)
                                .map(|binned_nanos| {
                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
                                    (nanos / NANOS_PER_MILLI) as i32
                                })
                                .map_err(|e| {
                                    arrow::error::ArrowError::ComputeError(e.to_string())
                                })
                        })?;
                    ColumnarValue::Array(Arc::new(result))
                }
                Time32(Second) => {
                    if !is_time {
                        return exec_err!(
                            "DATE_BIN with Time32 source requires Time32 origin"
                        );
                    }
                    let array = array.as_primitive::<Time32SecondType>();
                    let result: PrimitiveArray<Time32SecondType> =
                        array.try_unary(|x| {
                            stride_fn(stride, x as i64 * NANOS_PER_SEC, origin)
                                .map(|binned_nanos| {
                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
                                    (nanos / NANOS_PER_SEC) as i32
                                })
                                .map_err(|e| {
                                    arrow::error::ArrowError::ComputeError(e.to_string())
                                })
                        })?;
                    ColumnarValue::Array(Arc::new(result))
                }
                Time64(Microsecond) => {
                    if !is_time {
                        return exec_err!(
                            "DATE_BIN with Time64 source requires Time64 origin"
                        );
                    }
                    let array = array.as_primitive::<Time64MicrosecondType>();
                    let result: PrimitiveArray<Time64MicrosecondType> =
                        array.try_unary(|x| {
                            stride_fn(stride, x * NANOS_PER_MICRO, origin)
                                .map(|binned_nanos| {
                                    let nanos = binned_nanos % (NANOSECONDS_IN_DAY);
                                    nanos / NANOS_PER_MICRO
                                })
                                .map_err(|e| {
                                    arrow::error::ArrowError::ComputeError(e.to_string())
                                })
                        })?;
                    ColumnarValue::Array(Arc::new(result))
                }
                Time64(Nanosecond) => {
                    if !is_time {
                        return exec_err!(
                            "DATE_BIN with Time64 source requires Time64 origin"
                        );
                    }
                    let array = array.as_primitive::<Time64NanosecondType>();
                    let result: PrimitiveArray<Time64NanosecondType> =
                        array.try_unary(|x| {
                            stride_fn(stride, x, origin)
                                .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY))
                                .map_err(|e| {
                                    arrow::error::ArrowError::ComputeError(e.to_string())
                                })
                        })?;
                    ColumnarValue::Array(Arc::new(result))
                }
                _ => {
                    return exec_err!(
                        "DATE_BIN expects source argument to be a TIMESTAMP or TIME but got {}",
                        array.data_type()
                    );
                }
            }
        }
        // Any scalar that is neither a timestamp nor a time.
        _ => {
            return exec_err!(
                "DATE_BIN expects source argument to be a TIMESTAMP or TIME scalar or array"
            );
        }
    })
}
829
830#[cfg(test)]
831mod tests {
832 use std::sync::Arc;
833
834 use crate::datetime::date_bin::{DateBinFunc, date_bin_nanos_interval};
835 use arrow::array::types::TimestampNanosecondType;
836 use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
837 use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
838 use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
839
840 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
841 use datafusion_common::{DataFusionError, ScalarValue};
842 use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
843
844 use chrono::TimeDelta;
845 use datafusion_common::config::ConfigOptions;
846
847 fn invoke_date_bin_with_args(
848 args: Vec<ColumnarValue>,
849 number_rows: usize,
850 return_field: &FieldRef,
851 ) -> Result<ColumnarValue, DataFusionError> {
852 let arg_fields = args
853 .iter()
854 .map(|arg| Field::new("a", arg.data_type(), true).into())
855 .collect::<Vec<_>>();
856
857 let args = datafusion_expr::ScalarFunctionArgs {
858 args,
859 arg_fields,
860 number_rows,
861 return_field: Arc::clone(return_field),
862 config_options: Arc::new(ConfigOptions::default()),
863 };
864 DateBinFunc::new().invoke_with_args(args)
865 }
866
    /// Exercises argument validation of `date_bin`: the accepted argument
    /// shapes must succeed and each rejected shape must produce its exact
    /// error message.
    #[test]
    fn test_date_bin() {
        let return_field = &Arc::new(Field::new(
            "f",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ));

        // Scalar source with an explicit origin.
        let mut args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 1,
            }))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert!(res.is_ok());

        // Array source with an explicit origin.
        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
        let batch_len = timestamps.len();
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 1,
            }))),
            ColumnarValue::Array(timestamps),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, batch_len, return_field);
        assert!(res.is_ok());

        // Two arguments only: the origin is filled in by the function.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 1,
            }))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert!(res.is_ok());

        // Stride given as a month-day-nano interval.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
                IntervalMonthDayNano {
                    months: 0,
                    days: 0,
                    nanoseconds: 1,
                },
            ))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert!(res.is_ok());

        //
        // Failure cases
        //

        // Wrong number of arguments.
        args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
            IntervalDayTime {
                days: 0,
                milliseconds: 1,
            },
        )))];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "Execution error: DATE_BIN expected two or three arguments"
        );

        // Stride of an unsupported interval kind (year-month).
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
        );

        // Zero stride.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 0,
            }))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];

        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "Execution error: DATE_BIN stride must be non-zero"
        );

        // Day-time stride whose nanosecond total overflows.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::MAX,
            ))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "Execution error: DATE_BIN stride argument is too large"
        );

        // Month-day-nano stride whose nanosecond total overflows.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "Execution error: DATE_BIN stride argument is too large"
        );

        // Month stride combined with day and nanosecond components.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
        );

        // Origin that is a timestamp but not of nanosecond precision.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 1,
            }))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision or a TIME but got Timestamp(µs)"
        );

        // A microsecond source with a nanosecond origin is accepted.
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 1,
            }))),
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert!(res.is_ok());

        // Stride supplied as an array instead of a literal.
        let intervals = Arc::new(
            (1..6)
                .map(|x| {
                    Some(IntervalDayTime {
                        days: 0,
                        milliseconds: x,
                    })
                })
                .collect::<IntervalDayTimeArray>(),
        );
        args = vec![
            ColumnarValue::Array(intervals),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
        ];
        let res = invoke_date_bin_with_args(args, 1, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
        );

        // Origin supplied as an array instead of a literal.
        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
        let batch_len = timestamps.len();
        args = vec![
            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                days: 0,
                milliseconds: 1,
            }))),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
            ColumnarValue::Array(timestamps),
        ];
        let res = invoke_date_bin_with_args(args, batch_len, return_field);
        assert_eq!(
            res.err().unwrap().strip_backtrace(),
            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
        );
    }
1073
/// Bins timestamp arrays under several timezone settings and verifies both the
/// binned values and the data type of the returned array.
///
/// Every case is `(input timestamps, timezone, origin, expected bins)`. The
/// stride is always `ScalarValue::new_interval_dt(1, 0)`, and the same optional
/// timezone is attached to the input array, the origin scalar, the expected
/// array and the requested return field.
#[test]
fn test_date_bin_timezones() {
    let cases = [
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            Some("+00".into()),
            "1970-01-01T00:00:00Z",
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            None,
            "1970-01-01T00:00:00Z",
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            Some("-02".into()),
            "1970-01-01T00:00:00Z",
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00+05",
                "2020-09-08T01:00:00+05",
                "2020-09-08T02:00:00+05",
                "2020-09-08T03:00:00+05",
                "2020-09-08T04:00:00+05",
            ],
            Some("+05".into()),
            "1970-01-01T00:00:00+05",
            vec![
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00+08",
                "2020-09-08T01:00:00+08",
                "2020-09-08T02:00:00+08",
                "2020-09-08T03:00:00+08",
                "2020-09-08T04:00:00+08",
            ],
            Some("+08".into()),
            "1970-01-01T00:00:00+08",
            vec![
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
            ],
        ),
    ];

    for (original, tz_opt, origin, expected) in &cases {
        // Parses every string to nanoseconds and tags the resulting array with
        // this case's timezone; shared by the input and the expected output.
        let to_tz_array = |timestamps: &[&str]| {
            timestamps
                .iter()
                .map(|ts| Some(string_to_timestamp_nanos(ts).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone())
        };
        let source_array = to_tz_array(original);
        let expected_array = to_tz_array(expected);
        let batch_len = source_array.len();

        let origin_nanos = string_to_timestamp_nanos(origin).unwrap();
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
            ColumnarValue::Array(Arc::new(source_array)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                Some(origin_nanos),
                tz_opt.clone(),
            )),
        ];

        // The same type is requested through the return field and then checked
        // on the array that actually comes back.
        let return_type = DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone());
        let return_field = Arc::new(Field::new("f", return_type.clone(), true));

        let result =
            invoke_date_bin_with_args(args, batch_len, &return_field).unwrap();

        match result {
            ColumnarValue::Array(binned) => {
                assert_eq!(binned.data_type(), &return_type);
                let actual = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&binned);
                assert_eq!(actual, &expected_array);
            }
            _ => panic!("unexpected column type"),
        }
    }
}
1213
/// Checks `date_bin_nanos_interval` against individual timestamps.
///
/// Every case is `((stride, source, origin), expected)`. The table covers
/// origins that lie before and after the source, and origins that are offset
/// from a round stride boundary (which shifts the bin edges accordingly).
#[test]
fn test_date_bin_single() {
    let cases = [
        (
            (
                TimeDelta::try_minutes(15),
                "2004-04-09T02:03:04.123456789Z",
                "2001-01-01T00:00:00",
            ),
            "2004-04-09T02:00:00Z",
        ),
        (
            (
                TimeDelta::try_minutes(15),
                "2004-04-09T02:03:04.123456789Z",
                "2001-01-01T00:02:30",
            ),
            "2004-04-09T02:02:30Z",
        ),
        (
            (
                TimeDelta::try_minutes(15),
                "2004-04-09T02:03:04.123456789Z",
                "2005-01-01T00:02:30",
            ),
            "2004-04-09T02:02:30Z",
        ),
        (
            (
                TimeDelta::try_hours(1),
                "2004-04-09T02:03:04.123456789Z",
                "2001-01-01T00:00:00",
            ),
            "2004-04-09T02:00:00Z",
        ),
        (
            (
                TimeDelta::try_seconds(10),
                "2004-04-09T02:03:11.123456789Z",
                "2001-01-01T00:00:00",
            ),
            "2004-04-09T02:03:10Z",
        ),
    ];

    for ((stride, source, origin), expected) in &cases {
        // Everything is converted to nanoseconds before calling the binning
        // routine, which works on raw i64 values.
        let stride_nanos = stride.unwrap().num_nanoseconds().unwrap();
        let source_nanos = string_to_timestamp_nanos(source).unwrap();
        let origin_nanos = string_to_timestamp_nanos(origin).unwrap();
        let expected_nanos = string_to_timestamp_nanos(expected).unwrap();

        let binned =
            date_bin_nanos_interval(stride_nanos, source_nanos, origin_nanos).unwrap();
        assert_eq!(binned, expected_nanos, "{source} = {expected}");
    }
}
1272
/// Checks `date_bin_nanos_interval` for sources earlier than 1970-01-01 with
/// the origin fixed at `0`.
///
/// Every case is `((stride, source), expected)`. The sources sit just below,
/// exactly on, and just above a 15 minute boundary; in each case the expected
/// bin start is at or before the source.
#[test]
fn test_date_bin_before_epoch() {
    let cases = [
        (
            (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
            "1969-12-31T23:30:00",
        ),
        (
            (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
            "1969-12-31T23:45:00",
        ),
        (
            (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
            "1969-12-31T23:45:00",
        ),
    ];

    for ((stride, source), expected) in &cases {
        let stride_nanos = stride.unwrap().num_nanoseconds().unwrap();
        let source_nanos = string_to_timestamp_nanos(source).unwrap();
        let expected_nanos = string_to_timestamp_nanos(expected).unwrap();

        // Origin 0 is passed directly as a raw nanosecond value.
        let binned = date_bin_nanos_interval(stride_nanos, source_nanos, 0).unwrap();
        assert_eq!(binned, expected_nanos, "{source} = {expected}");
    }
}
1300
/// Binning with an enormous month stride must neither panic nor return an
/// error: the call succeeds and yields a null millisecond timestamp scalar.
///
/// The check runs for a source timestamp on each side of the epoch. Unlike a
/// bare `if let`, the `match` below fails the test when the result has any
/// other shape, so the null assertion can never be skipped silently.
#[test]
fn test_date_bin_out_of_range() {
    let return_field = &Arc::new(Field::new(
        "f",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    ));

    for source_millis in [1040292460_i64, -1040292460] {
        let args = vec![
            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1637426858, 0, 0)),
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
                Some(source_millis),
                None,
            )),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                Some(string_to_timestamp_nanos("1984-01-07 00:00:00").unwrap()),
                None,
            )),
        ];

        let result = invoke_date_bin_with_args(args, 1, return_field);
        assert!(
            result.is_ok(),
            "date_bin returned an error for source {source_millis}"
        );

        match result.unwrap() {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(val, _)) => {
                assert!(val.is_none(), "Expected None for out of range operation");
            }
            // Any other variant means the assertion above would not have run.
            other => panic!(
                "unexpected result for source {source_millis}: {other:?}"
            ),
        }
    }
}
1347}