1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23 ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25 TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
44#[user_doc(
45 doc_section(label = "Time and Date Functions"),
46 description = r#"
47Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
48
49For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
50"#,
51 syntax_example = "date_bin(interval, expression, origin-timestamp)",
52 sql_example = r#"```sql
53-- Bin the timestamp into 1 day intervals
54> SELECT date_bin(interval '1 day', time) as bin
55FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
56+---------------------+
57| bin |
58+---------------------+
59| 2023-01-01T00:00:00 |
60| 2023-01-03T00:00:00 |
61+---------------------+
622 row(s) fetched.
63
64-- Bin the timestamp into 1 day intervals starting at 3AM on 2023-01-01
65> SELECT date_bin(interval '1 day', time, '2023-01-01T03:00:00') as bin
66FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
67+---------------------+
68| bin |
69+---------------------+
70| 2023-01-01T03:00:00 |
71| 2023-01-03T03:00:00 |
72+---------------------+
732 row(s) fetched.
74```"#,
75 argument(name = "interval", description = "Bin interval."),
76 argument(
77 name = "expression",
78 description = "Time expression to operate on. Can be a constant, column, or function."
79 ),
80 argument(
81 name = "origin-timestamp",
82 description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
83
84 - nanoseconds
85 - microseconds
86 - milliseconds
87 - seconds
88 - minutes
89 - hours
90 - days
91 - weeks
92 - months
93 - years
94 - century
95"#
96 )
97)]
/// Scalar UDF implementing the `date_bin` SQL function.
///
/// The only state is the [`Signature`] assembled in [`DateBinFunc::new`]; the
/// binning itself is done by `date_bin_impl`.
#[derive(Debug)]
pub struct DateBinFunc {
    /// Accepted argument type combinations, handed out by `signature()`.
    signature: Signature,
}
102
103impl Default for DateBinFunc {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109impl DateBinFunc {
110 pub fn new() -> Self {
111 let base_sig = |array_type: TimeUnit| {
112 vec![
113 Exact(vec![
114 DataType::Interval(MonthDayNano),
115 Timestamp(array_type, None),
116 Timestamp(Nanosecond, None),
117 ]),
118 Exact(vec![
119 DataType::Interval(MonthDayNano),
120 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122 ]),
123 Exact(vec![
124 DataType::Interval(DayTime),
125 Timestamp(array_type, None),
126 Timestamp(Nanosecond, None),
127 ]),
128 Exact(vec![
129 DataType::Interval(DayTime),
130 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132 ]),
133 Exact(vec![
134 DataType::Interval(MonthDayNano),
135 Timestamp(array_type, None),
136 ]),
137 Exact(vec![
138 DataType::Interval(MonthDayNano),
139 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140 ]),
141 Exact(vec![
142 DataType::Interval(DayTime),
143 Timestamp(array_type, None),
144 ]),
145 Exact(vec![
146 DataType::Interval(DayTime),
147 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148 ]),
149 ]
150 };
151
152 let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153 .into_iter()
154 .map(base_sig)
155 .collect::<Vec<_>>()
156 .concat();
157
158 Self {
159 signature: Signature::one_of(full_sig, Volatility::Immutable),
160 }
161 }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165 fn as_any(&self) -> &dyn Any {
166 self
167 }
168
169 fn name(&self) -> &str {
170 "date_bin"
171 }
172
173 fn signature(&self) -> &Signature {
174 &self.signature
175 }
176
177 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178 match &arg_types[1] {
179 Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180 Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181 Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182 Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183 Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184 _ => plan_err!(
185 "The date_bin function can only accept timestamp as the second arg."
186 ),
187 }
188 }
189
190 fn invoke_with_args(
191 &self,
192 args: datafusion_expr::ScalarFunctionArgs,
193 ) -> Result<ColumnarValue> {
194 let args = &args.args;
195 if args.len() == 2 {
196 let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198 Some(0),
199 Some("+00:00".into()),
200 ));
201 date_bin_impl(&args[0], &args[1], &origin)
202 } else if args.len() == 3 {
203 date_bin_impl(&args[0], &args[1], &args[2])
204 } else {
205 exec_err!("DATE_BIN expected two or three arguments")
206 }
207 }
208
209 fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210 let step = &input[0];
212 let date_value = &input[1];
213 let reference = input.get(2);
214
215 if step.sort_properties.eq(&SortProperties::Singleton)
216 && reference
217 .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218 .unwrap_or(true)
219 {
220 Ok(date_value.sort_properties)
221 } else {
222 Ok(SortProperties::Unordered)
223 }
224 }
225 fn documentation(&self) -> Option<&Documentation> {
226 self.doc()
227 }
228}
229
/// Stride of a `date_bin` call, expressed in one of the two units the binning
/// functions in this file work with.
enum Interval {
    /// Stride given as a number of nanoseconds.
    Nanoseconds(i64),
    /// Stride given as a number of months.
    Months(i64),
}
234
235impl Interval {
236 fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245 match self {
246 Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247 Interval::Months(months) => (*months, date_bin_months_interval),
248 }
249 }
250}
251
252fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254 let time_diff = source - origin;
255
256 let time_delta = compute_distance(time_diff, stride_nanos);
258
259 origin + time_delta
260}
261
/// Returns the distance from the origin to the start of the bin containing
/// `time_diff`, i.e. `time_diff` snapped to a multiple of `stride`.
///
/// `%` truncates toward zero, so a negative `time_diff` that is not an exact
/// multiple of the stride would land on the boundary *after* it; in that case
/// one extra stride is subtracted. The correction is only applied for
/// `stride > 1`.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    let overshoots = time_diff < 0 && stride > 1 && remainder != 0;
    if overshoots {
        truncated - stride
    } else {
        truncated
    }
}
273
274fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276 let source_date = to_utc_date_time(source);
278 let origin_date = to_utc_date_time(origin);
279
280 let month_diff = (source_date.year() - origin_date.year()) * 12
282 + source_date.month() as i32
283 - origin_date.month() as i32;
284
285 let month_delta = compute_distance(month_diff as i64, stride_months);
287
288 let mut bin_time = if month_delta < 0 {
289 origin_date - Months::new(month_delta.unsigned_abs() as u32)
290 } else {
291 origin_date + Months::new(month_delta as u32)
292 };
293
294 if bin_time > source_date {
297 let month_delta = month_delta - stride_months;
298 bin_time = if month_delta < 0 {
299 origin_date - Months::new(month_delta.unsigned_abs() as u32)
300 } else {
301 origin_date + Months::new(month_delta as u32)
302 };
303 }
304
305 bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309 let secs = nanos / 1_000_000_000;
310 let nsec = (nanos % 1_000_000_000) as u32;
311 DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314fn date_bin_impl(
321 stride: &ColumnarValue,
322 array: &ColumnarValue,
323 origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325 let stride = match stride {
326 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327 let (days, ms) = IntervalDayTimeType::to_parts(*v);
328 let nanos = (TimeDelta::try_days(days as i64).unwrap()
329 + TimeDelta::try_milliseconds(ms as i64).unwrap())
330 .num_nanoseconds();
331
332 match nanos {
333 Some(v) => Interval::Nanoseconds(v),
334 _ => return exec_err!("DATE_BIN stride argument is too large"),
335 }
336 }
337 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340 if months != 0 {
342 if days != 0 || nanos != 0 {
344 return not_impl_err!(
345 "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346 );
347 } else {
348 Interval::Months(months as i64)
349 }
350 } else {
351 let nanos = (TimeDelta::try_days(days as i64).unwrap()
352 + Duration::nanoseconds(nanos))
353 .num_nanoseconds();
354 match nanos {
355 Some(v) => Interval::Nanoseconds(v),
356 _ => return exec_err!("DATE_BIN stride argument is too large"),
357 }
358 }
359 }
360 ColumnarValue::Scalar(v) => {
361 return exec_err!(
362 "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363 v.data_type()
364 );
365 }
366 ColumnarValue::Array(_) => {
367 return not_impl_err!(
368 "DATE_BIN only supports literal values for the stride argument, not arrays"
369 );
370 }
371 };
372
373 let origin = match origin {
374 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375 ColumnarValue::Scalar(v) => {
376 return exec_err!(
377 "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378 v.data_type()
379 );
380 }
381 ColumnarValue::Array(_) => {
382 return not_impl_err!(
383 "DATE_BIN only supports literal values for the origin argument, not arrays"
384 );
385 }
386 };
387
388 let (stride, stride_fn) = stride.bin_fn();
389
390 if stride == 0 {
392 return exec_err!("DATE_BIN stride must be non-zero");
393 }
394
395 fn stride_map_fn<T: ArrowTimestampType>(
396 origin: i64,
397 stride: i64,
398 stride_fn: fn(i64, i64, i64) -> i64,
399 ) -> impl Fn(i64) -> i64 {
400 let scale = match T::UNIT {
401 Nanosecond => 1,
402 Microsecond => NANOSECONDS / 1_000_000,
403 Millisecond => NANOSECONDS / 1_000,
404 Second => NANOSECONDS,
405 };
406 move |x: i64| stride_fn(stride, x * scale, origin) / scale
407 }
408
409 Ok(match array {
410 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411 let apply_stride_fn =
412 stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414 v.map(apply_stride_fn),
415 tz_opt.clone(),
416 ))
417 }
418 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419 let apply_stride_fn =
420 stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422 v.map(apply_stride_fn),
423 tz_opt.clone(),
424 ))
425 }
426 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427 let apply_stride_fn =
428 stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430 v.map(apply_stride_fn),
431 tz_opt.clone(),
432 ))
433 }
434 ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435 let apply_stride_fn =
436 stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438 v.map(apply_stride_fn),
439 tz_opt.clone(),
440 ))
441 }
442
443 ColumnarValue::Array(array) => {
444 fn transform_array_with_stride<T>(
445 origin: i64,
446 stride: i64,
447 stride_fn: fn(i64, i64, i64) -> i64,
448 array: &ArrayRef,
449 tz_opt: &Option<Arc<str>>,
450 ) -> Result<ColumnarValue>
451 where
452 T: ArrowTimestampType,
453 {
454 let array = as_primitive_array::<T>(array)?;
455 let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456 let array: PrimitiveArray<T> = array
457 .unary(apply_stride_fn)
458 .with_timezone_opt(tz_opt.clone());
459
460 Ok(ColumnarValue::Array(Arc::new(array)))
461 }
462
463 match array.data_type() {
464 Timestamp(Nanosecond, tz_opt) => {
465 transform_array_with_stride::<TimestampNanosecondType>(
466 origin, stride, stride_fn, array, tz_opt,
467 )?
468 }
469 Timestamp(Microsecond, tz_opt) => {
470 transform_array_with_stride::<TimestampMicrosecondType>(
471 origin, stride, stride_fn, array, tz_opt,
472 )?
473 }
474 Timestamp(Millisecond, tz_opt) => {
475 transform_array_with_stride::<TimestampMillisecondType>(
476 origin, stride, stride_fn, array, tz_opt,
477 )?
478 }
479 Timestamp(Second, tz_opt) => {
480 transform_array_with_stride::<TimestampSecondType>(
481 origin, stride, stride_fn, array, tz_opt,
482 )?
483 }
484 _ => {
485 return exec_err!(
486 "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487 array.data_type()
488 );
489 }
490 }
491 }
492 _ => {
493 return exec_err!(
494 "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495 );
496 }
497 })
498}
499
500#[cfg(test)]
501mod tests {
502 use std::sync::Arc;
503
504 use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
505 use arrow::array::types::TimestampNanosecondType;
506 use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
507 use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
508 use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
509
510 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
511 use datafusion_common::{DataFusionError, ScalarValue};
512 use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
513
514 use chrono::TimeDelta;
515
516 fn invoke_date_bin_with_args(
517 args: Vec<ColumnarValue>,
518 number_rows: usize,
519 return_field: &FieldRef,
520 ) -> Result<ColumnarValue, DataFusionError> {
521 let arg_fields = args
522 .iter()
523 .map(|arg| Field::new("a", arg.data_type(), true).into())
524 .collect::<Vec<_>>();
525
526 let args = datafusion_expr::ScalarFunctionArgs {
527 args,
528 arg_fields,
529 number_rows,
530 return_field: Arc::clone(return_field),
531 };
532 DateBinFunc::new().invoke_with_args(args)
533 }
534
535 #[test]
536 fn test_date_bin() {
537 let return_field = &Arc::new(Field::new(
538 "f",
539 DataType::Timestamp(TimeUnit::Nanosecond, None),
540 true,
541 ));
542
543 let mut args = vec![
544 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
545 days: 0,
546 milliseconds: 1,
547 }))),
548 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
549 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
550 ];
551 let res = invoke_date_bin_with_args(args, 1, return_field);
552 assert!(res.is_ok());
553
554 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
555 let batch_len = timestamps.len();
556 args = vec![
557 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
558 days: 0,
559 milliseconds: 1,
560 }))),
561 ColumnarValue::Array(timestamps),
562 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
563 ];
564 let res = invoke_date_bin_with_args(args, batch_len, return_field);
565 assert!(res.is_ok());
566
567 args = vec![
568 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
569 days: 0,
570 milliseconds: 1,
571 }))),
572 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
573 ];
574 let res = invoke_date_bin_with_args(args, 1, return_field);
575 assert!(res.is_ok());
576
577 args = vec![
579 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
580 IntervalMonthDayNano {
581 months: 0,
582 days: 0,
583 nanoseconds: 1,
584 },
585 ))),
586 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
587 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
588 ];
589 let res = invoke_date_bin_with_args(args, 1, return_field);
590 assert!(res.is_ok());
591
592 args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
598 IntervalDayTime {
599 days: 0,
600 milliseconds: 1,
601 },
602 )))];
603 let res = invoke_date_bin_with_args(args, 1, return_field);
604 assert_eq!(
605 res.err().unwrap().strip_backtrace(),
606 "Execution error: DATE_BIN expected two or three arguments"
607 );
608
609 args = vec![
611 ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
612 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
613 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
614 ];
615 let res = invoke_date_bin_with_args(args, 1, return_field);
616 assert_eq!(
617 res.err().unwrap().strip_backtrace(),
618 "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
619 );
620
621 args = vec![
624 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
625 days: 0,
626 milliseconds: 0,
627 }))),
628 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
629 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
630 ];
631
632 let res = invoke_date_bin_with_args(args, 1, return_field);
633 assert_eq!(
634 res.err().unwrap().strip_backtrace(),
635 "Execution error: DATE_BIN stride must be non-zero"
636 );
637
638 args = vec![
640 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
641 IntervalDayTime::MAX,
642 ))),
643 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
644 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
645 ];
646 let res = invoke_date_bin_with_args(args, 1, return_field);
647 assert_eq!(
648 res.err().unwrap().strip_backtrace(),
649 "Execution error: DATE_BIN stride argument is too large"
650 );
651
652 args = vec![
654 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
655 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
656 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
657 ];
658 let res = invoke_date_bin_with_args(args, 1, return_field);
659 assert_eq!(
660 res.err().unwrap().strip_backtrace(),
661 "Execution error: DATE_BIN stride argument is too large"
662 );
663
664 args = vec![
666 ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
667 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
668 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
669 ];
670 let res = invoke_date_bin_with_args(args, 1, return_field);
671 assert_eq!(
672 res.err().unwrap().strip_backtrace(),
673 "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
674 );
675
676 args = vec![
678 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
679 days: 0,
680 milliseconds: 1,
681 }))),
682 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
683 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
684 ];
685 let res = invoke_date_bin_with_args(args, 1, return_field);
686 assert_eq!(
687 res.err().unwrap().strip_backtrace(),
688 "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
689 );
690
691 args = vec![
692 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
693 days: 0,
694 milliseconds: 1,
695 }))),
696 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
697 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
698 ];
699 let res = invoke_date_bin_with_args(args, 1, return_field);
700 assert!(res.is_ok());
701
702 let intervals = Arc::new(
704 (1..6)
705 .map(|x| {
706 Some(IntervalDayTime {
707 days: 0,
708 milliseconds: x,
709 })
710 })
711 .collect::<IntervalDayTimeArray>(),
712 );
713 args = vec![
714 ColumnarValue::Array(intervals),
715 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
716 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
717 ];
718 let res = invoke_date_bin_with_args(args, 1, return_field);
719 assert_eq!(
720 res.err().unwrap().strip_backtrace(),
721 "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
722 );
723
724 let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
726 let batch_len = timestamps.len();
727 args = vec![
728 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
729 days: 0,
730 milliseconds: 1,
731 }))),
732 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
733 ColumnarValue::Array(timestamps),
734 ];
735 let res = invoke_date_bin_with_args(args, batch_len, return_field);
736 assert_eq!(
737 res.err().unwrap().strip_backtrace(),
738 "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
739 );
740 }
741
742 #[test]
743 fn test_date_bin_timezones() {
744 let cases = vec![
745 (
746 vec![
747 "2020-09-08T00:00:00Z",
748 "2020-09-08T01:00:00Z",
749 "2020-09-08T02:00:00Z",
750 "2020-09-08T03:00:00Z",
751 "2020-09-08T04:00:00Z",
752 ],
753 Some("+00".into()),
754 "1970-01-01T00:00:00Z",
755 vec![
756 "2020-09-08T00:00:00Z",
757 "2020-09-08T00:00:00Z",
758 "2020-09-08T00:00:00Z",
759 "2020-09-08T00:00:00Z",
760 "2020-09-08T00:00:00Z",
761 ],
762 ),
763 (
764 vec![
765 "2020-09-08T00:00:00Z",
766 "2020-09-08T01:00:00Z",
767 "2020-09-08T02:00:00Z",
768 "2020-09-08T03:00:00Z",
769 "2020-09-08T04:00:00Z",
770 ],
771 None,
772 "1970-01-01T00:00:00Z",
773 vec![
774 "2020-09-08T00:00:00Z",
775 "2020-09-08T00:00:00Z",
776 "2020-09-08T00:00:00Z",
777 "2020-09-08T00:00:00Z",
778 "2020-09-08T00:00:00Z",
779 ],
780 ),
781 (
782 vec![
783 "2020-09-08T00:00:00Z",
784 "2020-09-08T01:00:00Z",
785 "2020-09-08T02:00:00Z",
786 "2020-09-08T03:00:00Z",
787 "2020-09-08T04:00:00Z",
788 ],
789 Some("-02".into()),
790 "1970-01-01T00:00:00Z",
791 vec![
792 "2020-09-08T00:00:00Z",
793 "2020-09-08T00:00:00Z",
794 "2020-09-08T00:00:00Z",
795 "2020-09-08T00:00:00Z",
796 "2020-09-08T00:00:00Z",
797 ],
798 ),
799 (
800 vec![
801 "2020-09-08T00:00:00+05",
802 "2020-09-08T01:00:00+05",
803 "2020-09-08T02:00:00+05",
804 "2020-09-08T03:00:00+05",
805 "2020-09-08T04:00:00+05",
806 ],
807 Some("+05".into()),
808 "1970-01-01T00:00:00+05",
809 vec![
810 "2020-09-08T00:00:00+05",
811 "2020-09-08T00:00:00+05",
812 "2020-09-08T00:00:00+05",
813 "2020-09-08T00:00:00+05",
814 "2020-09-08T00:00:00+05",
815 ],
816 ),
817 (
818 vec![
819 "2020-09-08T00:00:00+08",
820 "2020-09-08T01:00:00+08",
821 "2020-09-08T02:00:00+08",
822 "2020-09-08T03:00:00+08",
823 "2020-09-08T04:00:00+08",
824 ],
825 Some("+08".into()),
826 "1970-01-01T00:00:00+08",
827 vec![
828 "2020-09-08T00:00:00+08",
829 "2020-09-08T00:00:00+08",
830 "2020-09-08T00:00:00+08",
831 "2020-09-08T00:00:00+08",
832 "2020-09-08T00:00:00+08",
833 ],
834 ),
835 ];
836
837 cases
838 .iter()
839 .for_each(|(original, tz_opt, origin, expected)| {
840 let input = original
841 .iter()
842 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
843 .collect::<TimestampNanosecondArray>()
844 .with_timezone_opt(tz_opt.clone());
845 let right = expected
846 .iter()
847 .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
848 .collect::<TimestampNanosecondArray>()
849 .with_timezone_opt(tz_opt.clone());
850 let batch_len = input.len();
851 let args = vec![
852 ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
853 ColumnarValue::Array(Arc::new(input)),
854 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
855 Some(string_to_timestamp_nanos(origin).unwrap()),
856 tz_opt.clone(),
857 )),
858 ];
859 let return_field = &Arc::new(Field::new(
860 "f",
861 DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
862 true,
863 ));
864 let result =
865 invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
866
867 if let ColumnarValue::Array(result) = result {
868 assert_eq!(
869 result.data_type(),
870 &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
871 );
872 let left = arrow::array::cast::as_primitive_array::<
873 TimestampNanosecondType,
874 >(&result);
875 assert_eq!(left, &right);
876 } else {
877 panic!("unexpected column type");
878 }
879 });
880 }
881
882 #[test]
883 fn test_date_bin_single() {
884 let cases = vec![
885 (
886 (
887 TimeDelta::try_minutes(15),
888 "2004-04-09T02:03:04.123456789Z",
889 "2001-01-01T00:00:00",
890 ),
891 "2004-04-09T02:00:00Z",
892 ),
893 (
894 (
895 TimeDelta::try_minutes(15),
896 "2004-04-09T02:03:04.123456789Z",
897 "2001-01-01T00:02:30",
898 ),
899 "2004-04-09T02:02:30Z",
900 ),
901 (
902 (
903 TimeDelta::try_minutes(15),
904 "2004-04-09T02:03:04.123456789Z",
905 "2005-01-01T00:02:30",
906 ),
907 "2004-04-09T02:02:30Z",
908 ),
909 (
910 (
911 TimeDelta::try_hours(1),
912 "2004-04-09T02:03:04.123456789Z",
913 "2001-01-01T00:00:00",
914 ),
915 "2004-04-09T02:00:00Z",
916 ),
917 (
918 (
919 TimeDelta::try_seconds(10),
920 "2004-04-09T02:03:11.123456789Z",
921 "2001-01-01T00:00:00",
922 ),
923 "2004-04-09T02:03:10Z",
924 ),
925 ];
926
927 cases
928 .iter()
929 .for_each(|((stride, source, origin), expected)| {
930 let stride = stride.unwrap();
931 let stride1 = stride.num_nanoseconds().unwrap();
932 let source1 = string_to_timestamp_nanos(source).unwrap();
933 let origin1 = string_to_timestamp_nanos(origin).unwrap();
934
935 let expected1 = string_to_timestamp_nanos(expected).unwrap();
936 let result = date_bin_nanos_interval(stride1, source1, origin1);
937 assert_eq!(result, expected1, "{source} = {expected}");
938 })
939 }
940
941 #[test]
942 fn test_date_bin_before_epoch() {
943 let cases = [
944 (
945 (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
946 "1969-12-31T23:30:00",
947 ),
948 (
949 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
950 "1969-12-31T23:45:00",
951 ),
952 (
953 (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
954 "1969-12-31T23:45:00",
955 ),
956 ];
957
958 cases.iter().for_each(|((stride, source), expected)| {
959 let stride = stride.unwrap();
960 let stride1 = stride.num_nanoseconds().unwrap();
961 let source1 = string_to_timestamp_nanos(source).unwrap();
962
963 let expected1 = string_to_timestamp_nanos(expected).unwrap();
964 let result = date_bin_nanos_interval(stride1, source1, 0);
965 assert_eq!(result, expected1, "{source} = {expected}");
966 })
967 }
968}