1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23 ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25 TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
/// Scalar UDF implementing the SQL `date_bin` function.
///
/// The `user_doc` attribute below carries the user-facing documentation
/// (description, syntax, SQL example and argument descriptions) that is
/// returned from `documentation()`.
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = r#"
Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.

For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
"#,
    syntax_example = "date_bin(interval, expression, origin-timestamp)",
    sql_example = r#"```sql
-- Bin the timestamp into 1 day intervals
> SELECT date_bin(interval '1 day', time) as bin
FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
+---------------------+
| bin |
+---------------------+
| 2023-01-01T00:00:00 |
| 2023-01-03T00:00:00 |
+---------------------+
2 row(s) fetched.

-- Bin the timestamp into 1 day intervals starting at 3AM on 2023-01-01
> SELECT date_bin(interval '1 day', time, '2023-01-01T03:00:00') as bin
FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
+---------------------+
| bin |
+---------------------+
| 2023-01-01T03:00:00 |
| 2023-01-03T03:00:00 |
+---------------------+
2 row(s) fetched.
```"#,
    argument(name = "interval", description = "Bin interval."),
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    ),
    argument(
        name = "origin-timestamp",
        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:

 - nanoseconds
 - microseconds
 - milliseconds
 - seconds
 - minutes
 - hours
 - days
 - weeks
 - months
 - years
 - century
"#
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DateBinFunc {
    // Accepted argument type combinations; built once in `DateBinFunc::new`.
    signature: Signature,
}
102
103impl Default for DateBinFunc {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109impl DateBinFunc {
110 pub fn new() -> Self {
111 let base_sig = |array_type: TimeUnit| {
112 vec![
113 Exact(vec![
114 DataType::Interval(MonthDayNano),
115 Timestamp(array_type, None),
116 Timestamp(Nanosecond, None),
117 ]),
118 Exact(vec![
119 DataType::Interval(MonthDayNano),
120 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122 ]),
123 Exact(vec![
124 DataType::Interval(DayTime),
125 Timestamp(array_type, None),
126 Timestamp(Nanosecond, None),
127 ]),
128 Exact(vec![
129 DataType::Interval(DayTime),
130 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132 ]),
133 Exact(vec![
134 DataType::Interval(MonthDayNano),
135 Timestamp(array_type, None),
136 ]),
137 Exact(vec![
138 DataType::Interval(MonthDayNano),
139 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140 ]),
141 Exact(vec![
142 DataType::Interval(DayTime),
143 Timestamp(array_type, None),
144 ]),
145 Exact(vec![
146 DataType::Interval(DayTime),
147 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148 ]),
149 ]
150 };
151
152 let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153 .into_iter()
154 .map(base_sig)
155 .collect::<Vec<_>>()
156 .concat();
157
158 Self {
159 signature: Signature::one_of(full_sig, Volatility::Immutable),
160 }
161 }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165 fn as_any(&self) -> &dyn Any {
166 self
167 }
168
169 fn name(&self) -> &str {
170 "date_bin"
171 }
172
173 fn signature(&self) -> &Signature {
174 &self.signature
175 }
176
177 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178 match &arg_types[1] {
179 Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180 Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181 Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182 Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183 Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184 _ => plan_err!(
185 "The date_bin function can only accept timestamp as the second arg."
186 ),
187 }
188 }
189
190 fn invoke_with_args(
191 &self,
192 args: datafusion_expr::ScalarFunctionArgs,
193 ) -> Result<ColumnarValue> {
194 let args = &args.args;
195 if args.len() == 2 {
196 let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198 Some(0),
199 Some("+00:00".into()),
200 ));
201 date_bin_impl(&args[0], &args[1], &origin)
202 } else if args.len() == 3 {
203 date_bin_impl(&args[0], &args[1], &args[2])
204 } else {
205 exec_err!("DATE_BIN expected two or three arguments")
206 }
207 }
208
209 fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210 let step = &input[0];
212 let date_value = &input[1];
213 let reference = input.get(2);
214
215 if step.sort_properties.eq(&SortProperties::Singleton)
216 && reference
217 .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218 .unwrap_or(true)
219 {
220 Ok(date_value.sort_properties)
221 } else {
222 Ok(SortProperties::Unordered)
223 }
224 }
225 fn documentation(&self) -> Option<&Documentation> {
226 self.doc()
227 }
228}
229
/// Stride of a `date_bin` call after validation of the interval argument.
enum Interval {
    /// Stride expressed as a number of nanoseconds.
    Nanoseconds(i64),
    /// Stride expressed as a number of months.
    Months(i64),
}
234
235impl Interval {
236 fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245 match self {
246 Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247 Interval::Months(months) => (*months, date_bin_months_interval),
248 }
249 }
250}
251
252fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254 let time_diff = source - origin;
255
256 let time_delta = compute_distance(time_diff, stride_nanos);
258
259 origin + time_delta
260}
261
/// Rounds `time_diff` to a multiple of `stride`.
///
/// The remainder is dropped first, which rounds toward zero. When `time_diff`
/// is negative, `stride` is greater than one and a remainder was dropped, one
/// further stride is subtracted so the result lies at or below `time_diff`.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    let rounded_toward_zero_from_below = time_diff < 0 && stride > 1 && remainder != 0;
    if rounded_toward_zero_from_below {
        truncated - stride
    } else {
        truncated
    }
}
273
274fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276 let source_date = to_utc_date_time(source);
278 let origin_date = to_utc_date_time(origin);
279
280 let month_diff = (source_date.year() - origin_date.year()) * 12
282 + source_date.month() as i32
283 - origin_date.month() as i32;
284
285 let month_delta = compute_distance(month_diff as i64, stride_months);
287
288 let mut bin_time = if month_delta < 0 {
289 origin_date - Months::new(month_delta.unsigned_abs() as u32)
290 } else {
291 origin_date + Months::new(month_delta as u32)
292 };
293
294 if bin_time > source_date {
297 let month_delta = month_delta - stride_months;
298 bin_time = if month_delta < 0 {
299 origin_date - Months::new(month_delta.unsigned_abs() as u32)
300 } else {
301 origin_date + Months::new(month_delta as u32)
302 };
303 }
304
305 bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309 let secs = nanos / 1_000_000_000;
310 let nsec = (nanos % 1_000_000_000) as u32;
311 DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314fn date_bin_impl(
321 stride: &ColumnarValue,
322 array: &ColumnarValue,
323 origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325 let stride = match stride {
326 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327 let (days, ms) = IntervalDayTimeType::to_parts(*v);
328 let nanos = (TimeDelta::try_days(days as i64).unwrap()
329 + TimeDelta::try_milliseconds(ms as i64).unwrap())
330 .num_nanoseconds();
331
332 match nanos {
333 Some(v) => Interval::Nanoseconds(v),
334 _ => return exec_err!("DATE_BIN stride argument is too large"),
335 }
336 }
337 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340 if months != 0 {
342 if days != 0 || nanos != 0 {
344 return not_impl_err!(
345 "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346 );
347 } else {
348 Interval::Months(months as i64)
349 }
350 } else {
351 let nanos = (TimeDelta::try_days(days as i64).unwrap()
352 + Duration::nanoseconds(nanos))
353 .num_nanoseconds();
354 match nanos {
355 Some(v) => Interval::Nanoseconds(v),
356 _ => return exec_err!("DATE_BIN stride argument is too large"),
357 }
358 }
359 }
360 ColumnarValue::Scalar(v) => {
361 return exec_err!(
362 "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363 v.data_type()
364 );
365 }
366 ColumnarValue::Array(_) => {
367 return not_impl_err!(
368 "DATE_BIN only supports literal values for the stride argument, not arrays"
369 );
370 }
371 };
372
373 let origin = match origin {
374 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375 ColumnarValue::Scalar(v) => {
376 return exec_err!(
377 "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378 v.data_type()
379 );
380 }
381 ColumnarValue::Array(_) => {
382 return not_impl_err!(
383 "DATE_BIN only supports literal values for the origin argument, not arrays"
384 );
385 }
386 };
387
388 let (stride, stride_fn) = stride.bin_fn();
389
390 if stride == 0 {
392 return exec_err!("DATE_BIN stride must be non-zero");
393 }
394
395 fn stride_map_fn<T: ArrowTimestampType>(
396 origin: i64,
397 stride: i64,
398 stride_fn: fn(i64, i64, i64) -> i64,
399 ) -> impl Fn(i64) -> i64 {
400 let scale = match T::UNIT {
401 Nanosecond => 1,
402 Microsecond => NANOSECONDS / 1_000_000,
403 Millisecond => NANOSECONDS / 1_000,
404 Second => NANOSECONDS,
405 };
406 move |x: i64| stride_fn(stride, x * scale, origin) / scale
407 }
408
409 Ok(match array {
410 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411 let apply_stride_fn =
412 stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414 v.map(apply_stride_fn),
415 tz_opt.clone(),
416 ))
417 }
418 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419 let apply_stride_fn =
420 stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422 v.map(apply_stride_fn),
423 tz_opt.clone(),
424 ))
425 }
426 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427 let apply_stride_fn =
428 stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430 v.map(apply_stride_fn),
431 tz_opt.clone(),
432 ))
433 }
434 ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435 let apply_stride_fn =
436 stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438 v.map(apply_stride_fn),
439 tz_opt.clone(),
440 ))
441 }
442
443 ColumnarValue::Array(array) => {
444 fn transform_array_with_stride<T>(
445 origin: i64,
446 stride: i64,
447 stride_fn: fn(i64, i64, i64) -> i64,
448 array: &ArrayRef,
449 tz_opt: &Option<Arc<str>>,
450 ) -> Result<ColumnarValue>
451 where
452 T: ArrowTimestampType,
453 {
454 let array = as_primitive_array::<T>(array)?;
455 let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456 let array: PrimitiveArray<T> = array
457 .unary(apply_stride_fn)
458 .with_timezone_opt(tz_opt.clone());
459
460 Ok(ColumnarValue::Array(Arc::new(array)))
461 }
462
463 match array.data_type() {
464 Timestamp(Nanosecond, tz_opt) => {
465 transform_array_with_stride::<TimestampNanosecondType>(
466 origin, stride, stride_fn, array, tz_opt,
467 )?
468 }
469 Timestamp(Microsecond, tz_opt) => {
470 transform_array_with_stride::<TimestampMicrosecondType>(
471 origin, stride, stride_fn, array, tz_opt,
472 )?
473 }
474 Timestamp(Millisecond, tz_opt) => {
475 transform_array_with_stride::<TimestampMillisecondType>(
476 origin, stride, stride_fn, array, tz_opt,
477 )?
478 }
479 Timestamp(Second, tz_opt) => {
480 transform_array_with_stride::<TimestampSecondType>(
481 origin, stride, stride_fn, array, tz_opt,
482 )?
483 }
484 _ => {
485 return exec_err!(
486 "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487 array.data_type()
488 );
489 }
490 }
491 }
492 _ => {
493 return exec_err!(
494 "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495 );
496 }
497 })
498}
499
500#[cfg(test)]
501mod tests {
502 use std::sync::Arc;
503
504 use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
505 use arrow::array::types::TimestampNanosecondType;
506 use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
507 use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
508 use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
509
510 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
511 use datafusion_common::{DataFusionError, ScalarValue};
512 use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
513
514 use chrono::TimeDelta;
515 use datafusion_common::config::ConfigOptions;
516
517 fn invoke_date_bin_with_args(
518 args: Vec<ColumnarValue>,
519 number_rows: usize,
520 return_field: &FieldRef,
521 ) -> Result<ColumnarValue, DataFusionError> {
522 let arg_fields = args
523 .iter()
524 .map(|arg| Field::new("a", arg.data_type(), true).into())
525 .collect::<Vec<_>>();
526
527 let args = datafusion_expr::ScalarFunctionArgs {
528 args,
529 arg_fields,
530 number_rows,
531 return_field: Arc::clone(return_field),
532 config_options: Arc::new(ConfigOptions::default()),
533 };
534 DateBinFunc::new().invoke_with_args(args)
535 }
536
537 #[test]
538 fn test_date_bin() {
539 let return_field = &Arc::new(Field::new(
540 "f",
541 DataType::Timestamp(TimeUnit::Nanosecond, None),
542 true,
543 ));
544
545 let mut args = vec![
546 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
547 days: 0,
548 milliseconds: 1,
549 }))),
550 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
551 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
552 ];
553 let res = invoke_date_bin_with_args(args, 1, return_field);
554 assert!(res.is_ok());
555
556 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
557 let batch_len = timestamps.len();
558 args = vec![
559 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
560 days: 0,
561 milliseconds: 1,
562 }))),
563 ColumnarValue::Array(timestamps),
564 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
565 ];
566 let res = invoke_date_bin_with_args(args, batch_len, return_field);
567 assert!(res.is_ok());
568
569 args = vec![
570 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
571 days: 0,
572 milliseconds: 1,
573 }))),
574 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
575 ];
576 let res = invoke_date_bin_with_args(args, 1, return_field);
577 assert!(res.is_ok());
578
579 args = vec![
581 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
582 IntervalMonthDayNano {
583 months: 0,
584 days: 0,
585 nanoseconds: 1,
586 },
587 ))),
588 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
589 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
590 ];
591 let res = invoke_date_bin_with_args(args, 1, return_field);
592 assert!(res.is_ok());
593
594 args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
600 IntervalDayTime {
601 days: 0,
602 milliseconds: 1,
603 },
604 )))];
605 let res = invoke_date_bin_with_args(args, 1, return_field);
606 assert_eq!(
607 res.err().unwrap().strip_backtrace(),
608 "Execution error: DATE_BIN expected two or three arguments"
609 );
610
611 args = vec![
613 ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
614 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
615 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
616 ];
617 let res = invoke_date_bin_with_args(args, 1, return_field);
618 assert_eq!(
619 res.err().unwrap().strip_backtrace(),
620 "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
621 );
622
623 args = vec![
626 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
627 days: 0,
628 milliseconds: 0,
629 }))),
630 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
631 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
632 ];
633
634 let res = invoke_date_bin_with_args(args, 1, return_field);
635 assert_eq!(
636 res.err().unwrap().strip_backtrace(),
637 "Execution error: DATE_BIN stride must be non-zero"
638 );
639
640 args = vec![
642 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
643 IntervalDayTime::MAX,
644 ))),
645 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
646 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
647 ];
648 let res = invoke_date_bin_with_args(args, 1, return_field);
649 assert_eq!(
650 res.err().unwrap().strip_backtrace(),
651 "Execution error: DATE_BIN stride argument is too large"
652 );
653
654 args = vec![
656 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
657 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
658 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
659 ];
660 let res = invoke_date_bin_with_args(args, 1, return_field);
661 assert_eq!(
662 res.err().unwrap().strip_backtrace(),
663 "Execution error: DATE_BIN stride argument is too large"
664 );
665
666 args = vec![
668 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
669 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
670 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
671 ];
672 let res = invoke_date_bin_with_args(args, 1, return_field);
673 assert_eq!(
674 res.err().unwrap().strip_backtrace(),
675 "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
676 );
677
678 args = vec![
680 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
681 days: 0,
682 milliseconds: 1,
683 }))),
684 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
685 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
686 ];
687 let res = invoke_date_bin_with_args(args, 1, return_field);
688 assert_eq!(
689 res.err().unwrap().strip_backtrace(),
690 "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
691 );
692
693 args = vec![
694 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
695 days: 0,
696 milliseconds: 1,
697 }))),
698 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
699 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
700 ];
701 let res = invoke_date_bin_with_args(args, 1, return_field);
702 assert!(res.is_ok());
703
704 let intervals = Arc::new(
706 (1..6)
707 .map(|x| {
708 Some(IntervalDayTime {
709 days: 0,
710 milliseconds: x,
711 })
712 })
713 .collect::<IntervalDayTimeArray>(),
714 );
715 args = vec![
716 ColumnarValue::Array(intervals),
717 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
718 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
719 ];
720 let res = invoke_date_bin_with_args(args, 1, return_field);
721 assert_eq!(
722 res.err().unwrap().strip_backtrace(),
723 "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
724 );
725
726 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
728 let batch_len = timestamps.len();
729 args = vec![
730 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
731 days: 0,
732 milliseconds: 1,
733 }))),
734 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
735 ColumnarValue::Array(timestamps),
736 ];
737 let res = invoke_date_bin_with_args(args, batch_len, return_field);
738 assert_eq!(
739 res.err().unwrap().strip_backtrace(),
740 "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
741 );
742 }
743
744 #[test]
745 fn test_date_bin_timezones() {
746 let cases = vec![
747 (
748 vec![
749 "2020-09-08T00:00:00Z",
750 "2020-09-08T01:00:00Z",
751 "2020-09-08T02:00:00Z",
752 "2020-09-08T03:00:00Z",
753 "2020-09-08T04:00:00Z",
754 ],
755 Some("+00".into()),
756 "1970-01-01T00:00:00Z",
757 vec![
758 "2020-09-08T00:00:00Z",
759 "2020-09-08T00:00:00Z",
760 "2020-09-08T00:00:00Z",
761 "2020-09-08T00:00:00Z",
762 "2020-09-08T00:00:00Z",
763 ],
764 ),
765 (
766 vec![
767 "2020-09-08T00:00:00Z",
768 "2020-09-08T01:00:00Z",
769 "2020-09-08T02:00:00Z",
770 "2020-09-08T03:00:00Z",
771 "2020-09-08T04:00:00Z",
772 ],
773 None,
774 "1970-01-01T00:00:00Z",
775 vec![
776 "2020-09-08T00:00:00Z",
777 "2020-09-08T00:00:00Z",
778 "2020-09-08T00:00:00Z",
779 "2020-09-08T00:00:00Z",
780 "2020-09-08T00:00:00Z",
781 ],
782 ),
783 (
784 vec![
785 "2020-09-08T00:00:00Z",
786 "2020-09-08T01:00:00Z",
787 "2020-09-08T02:00:00Z",
788 "2020-09-08T03:00:00Z",
789 "2020-09-08T04:00:00Z",
790 ],
791 Some("-02".into()),
792 "1970-01-01T00:00:00Z",
793 vec![
794 "2020-09-08T00:00:00Z",
795 "2020-09-08T00:00:00Z",
796 "2020-09-08T00:00:00Z",
797 "2020-09-08T00:00:00Z",
798 "2020-09-08T00:00:00Z",
799 ],
800 ),
801 (
802 vec![
803 "2020-09-08T00:00:00+05",
804 "2020-09-08T01:00:00+05",
805 "2020-09-08T02:00:00+05",
806 "2020-09-08T03:00:00+05",
807 "2020-09-08T04:00:00+05",
808 ],
809 Some("+05".into()),
810 "1970-01-01T00:00:00+05",
811 vec![
812 "2020-09-08T00:00:00+05",
813 "2020-09-08T00:00:00+05",
814 "2020-09-08T00:00:00+05",
815 "2020-09-08T00:00:00+05",
816 "2020-09-08T00:00:00+05",
817 ],
818 ),
819 (
820 vec![
821 "2020-09-08T00:00:00+08",
822 "2020-09-08T01:00:00+08",
823 "2020-09-08T02:00:00+08",
824 "2020-09-08T03:00:00+08",
825 "2020-09-08T04:00:00+08",
826 ],
827 Some("+08".into()),
828 "1970-01-01T00:00:00+08",
829 vec![
830 "2020-09-08T00:00:00+08",
831 "2020-09-08T00:00:00+08",
832 "2020-09-08T00:00:00+08",
833 "2020-09-08T00:00:00+08",
834 "2020-09-08T00:00:00+08",
835 ],
836 ),
837 ];
838
839 cases
840 .iter()
841 .for_each(|(original, tz_opt, origin, expected)| {
842 let input = original
843 .iter()
844 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
845 .collect::<TimestampNanosecondArray>()
846 .with_timezone_opt(tz_opt.clone());
847 let right = expected
848 .iter()
849 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
850 .collect::<TimestampNanosecondArray>()
851 .with_timezone_opt(tz_opt.clone());
852 let batch_len = input.len();
853 let args = vec![
854 ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
855 ColumnarValue::Array(Arc::new(input)),
856 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
857 Some(string_to_timestamp_nanos(origin).unwrap()),
858 tz_opt.clone(),
859 )),
860 ];
861 let return_field = &Arc::new(Field::new(
862 "f",
863 DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
864 true,
865 ));
866 let result =
867 invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
868
869 if let ColumnarValue::Array(result) = result {
870 assert_eq!(
871 result.data_type(),
872 &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
873 );
874 let left = arrow::array::cast::as_primitive_array::<
875 TimestampNanosecondType,
876 >(&result);
877 assert_eq!(left, &right);
878 } else {
879 panic!("unexpected column type");
880 }
881 });
882 }
883
884 #[test]
885 fn test_date_bin_single() {
886 let cases = vec![
887 (
888 (
889 TimeDelta::try_minutes(15),
890 "2004-04-09T02:03:04.123456789Z",
891 "2001-01-01T00:00:00",
892 ),
893 "2004-04-09T02:00:00Z",
894 ),
895 (
896 (
897 TimeDelta::try_minutes(15),
898 "2004-04-09T02:03:04.123456789Z",
899 "2001-01-01T00:02:30",
900 ),
901 "2004-04-09T02:02:30Z",
902 ),
903 (
904 (
905 TimeDelta::try_minutes(15),
906 "2004-04-09T02:03:04.123456789Z",
907 "2005-01-01T00:02:30",
908 ),
909 "2004-04-09T02:02:30Z",
910 ),
911 (
912 (
913 TimeDelta::try_hours(1),
914 "2004-04-09T02:03:04.123456789Z",
915 "2001-01-01T00:00:00",
916 ),
917 "2004-04-09T02:00:00Z",
918 ),
919 (
920 (
921 TimeDelta::try_seconds(10),
922 "2004-04-09T02:03:11.123456789Z",
923 "2001-01-01T00:00:00",
924 ),
925 "2004-04-09T02:03:10Z",
926 ),
927 ];
928
929 cases
930 .iter()
931 .for_each(|((stride, source, origin), expected)| {
932 let stride = stride.unwrap();
933 let stride1 = stride.num_nanoseconds().unwrap();
934 let source1 = string_to_timestamp_nanos(source).unwrap();
935 let origin1 = string_to_timestamp_nanos(origin).unwrap();
936
937 let expected1 = string_to_timestamp_nanos(expected).unwrap();
938 let result = date_bin_nanos_interval(stride1, source1, origin1);
939 assert_eq!(result, expected1, "{source} = {expected}");
940 })
941 }
942
943 #[test]
944 fn test_date_bin_before_epoch() {
945 let cases = [
946 (
947 (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
948 "1969-12-31T23:30:00",
949 ),
950 (
951 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
952 "1969-12-31T23:45:00",
953 ),
954 (
955 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
956 "1969-12-31T23:45:00",
957 ),
958 ];
959
960 cases.iter().for_each(|((stride, source), expected)| {
961 let stride = stride.unwrap();
962 let stride1 = stride.num_nanoseconds().unwrap();
963 let source1 = string_to_timestamp_nanos(source).unwrap();
964
965 let expected1 = string_to_timestamp_nanos(expected).unwrap();
966 let result = date_bin_nanos_interval(stride1, source1, 0);
967 assert_eq!(result, expected1, "{source} = {expected}");
968 })
969 }
970}