Skip to main content

datafusion_spark/function/datetime/
make_interval.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{Array, ArrayRef, IntervalMonthDayNanoBuilder, PrimitiveArray};
22use arrow::datatypes::DataType::Interval;
23use arrow::datatypes::IntervalUnit::MonthDayNano;
24use arrow::datatypes::{DataType, IntervalMonthDayNano};
25use datafusion_common::types::{NativeType, logical_float64, logical_int32};
26use datafusion_common::{DataFusionError, Result, ScalarValue, plan_datafusion_err};
27use datafusion_expr::{
28    Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
29    TypeSignatureClass, Volatility,
30};
31use datafusion_functions::utils::make_scalar_function;
32
33#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkMakeInterval {
35    signature: Signature,
36}
37
38impl Default for SparkMakeInterval {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl SparkMakeInterval {
45    pub fn new() -> Self {
46        let int32 = Coercion::new_implicit(
47            TypeSignatureClass::Native(logical_int32()),
48            vec![TypeSignatureClass::Integer],
49            NativeType::Int32,
50        );
51
52        let float64 = Coercion::new_implicit(
53            TypeSignatureClass::Native(logical_float64()),
54            vec![TypeSignatureClass::Numeric],
55            NativeType::Float64,
56        );
57
58        let variants = vec![
59            TypeSignature::Nullary,
60            // year
61            TypeSignature::Coercible(vec![int32.clone()]),
62            // year, month
63            TypeSignature::Coercible(vec![int32.clone(), int32.clone()]),
64            // year, month, week
65            TypeSignature::Coercible(vec![int32.clone(), int32.clone(), int32.clone()]),
66            // year, month, week, day
67            TypeSignature::Coercible(vec![
68                int32.clone(),
69                int32.clone(),
70                int32.clone(),
71                int32.clone(),
72            ]),
73            // year, month, week, day, hour
74            TypeSignature::Coercible(vec![
75                int32.clone(),
76                int32.clone(),
77                int32.clone(),
78                int32.clone(),
79                int32.clone(),
80            ]),
81            // year, month, week, day, hour, minute
82            TypeSignature::Coercible(vec![
83                int32.clone(),
84                int32.clone(),
85                int32.clone(),
86                int32.clone(),
87                int32.clone(),
88                int32.clone(),
89            ]),
90            // year, month, week, day, hour, minute, second
91            TypeSignature::Coercible(vec![
92                int32.clone(),
93                int32.clone(),
94                int32.clone(),
95                int32.clone(),
96                int32.clone(),
97                int32.clone(),
98                float64.clone(),
99            ]),
100        ];
101
102        Self {
103            signature: Signature::one_of(variants, Volatility::Immutable),
104        }
105    }
106}
107
108impl ScalarUDFImpl for SparkMakeInterval {
109    fn as_any(&self) -> &dyn Any {
110        self
111    }
112
113    fn name(&self) -> &str {
114        "make_interval"
115    }
116
117    fn signature(&self) -> &Signature {
118        &self.signature
119    }
120
121    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
122        Ok(Interval(MonthDayNano))
123    }
124
125    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
126        if args.args.is_empty() {
127            return Ok(ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(
128                Some(IntervalMonthDayNano::new(0, 0, 0)),
129            )));
130        }
131        make_scalar_function(make_interval_kernel, vec![])(&args.args)
132    }
133}
134
135fn make_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
136    use arrow::array::AsArray;
137    use arrow::datatypes::{Float64Type, Int32Type};
138
139    let n_rows = args[0].len();
140
141    let years = args[0]
142        .as_primitive_opt::<Int32Type>()
143        .ok_or_else(|| plan_datafusion_err!("make_interval arg[0] must be Int32"))?;
144    let months = args
145        .get(1)
146        .map(|a| {
147            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
148                plan_datafusion_err!("make_dt_interval arg[1] must be Int32")
149            })
150        })
151        .transpose()?;
152    let weeks = args
153        .get(2)
154        .map(|a| {
155            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
156                plan_datafusion_err!("make_dt_interval arg[2] must be Int32")
157            })
158        })
159        .transpose()?;
160    let days: Option<&PrimitiveArray<Int32Type>> = args
161        .get(3)
162        .map(|a| {
163            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
164                plan_datafusion_err!("make_dt_interval arg[3] must be Int32")
165            })
166        })
167        .transpose()?;
168    let hours: Option<&PrimitiveArray<Int32Type>> = args
169        .get(4)
170        .map(|a| {
171            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
172                plan_datafusion_err!("make_dt_interval arg[4] must be Int32")
173            })
174        })
175        .transpose()?;
176    let mins: Option<&PrimitiveArray<Int32Type>> = args
177        .get(5)
178        .map(|a| {
179            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
180                plan_datafusion_err!("make_dt_interval arg[5] must be Int32")
181            })
182        })
183        .transpose()?;
184    let secs: Option<&PrimitiveArray<Float64Type>> = args
185        .get(6)
186        .map(|a| {
187            a.as_primitive_opt::<Float64Type>().ok_or_else(|| {
188                plan_datafusion_err!("make_dt_interval arg[6] must be Float64")
189            })
190        })
191        .transpose()?;
192
193    let mut builder = IntervalMonthDayNanoBuilder::with_capacity(n_rows);
194
195    for i in 0..n_rows {
196        // if one column is NULL → result NULL
197        let any_null_present = years.is_null(i)
198            || months.as_ref().is_some_and(|a| a.is_null(i))
199            || weeks.as_ref().is_some_and(|a| a.is_null(i))
200            || days.as_ref().is_some_and(|a| a.is_null(i))
201            || hours.as_ref().is_some_and(|a| a.is_null(i))
202            || mins.as_ref().is_some_and(|a| a.is_null(i))
203            || secs
204                .as_ref()
205                .is_some_and(|a| a.is_null(i) || !a.value(i).is_finite());
206
207        if any_null_present {
208            builder.append_null();
209            continue;
210        }
211
212        // default values 0 or 0.0
213        let y = years.value(i);
214        let mo = months.as_ref().map_or(0, |a| a.value(i));
215        let w = weeks.as_ref().map_or(0, |a| a.value(i));
216        let d = days.as_ref().map_or(0, |a| a.value(i));
217        let h = hours.as_ref().map_or(0, |a| a.value(i));
218        let mi = mins.as_ref().map_or(0, |a| a.value(i));
219        let s = secs.as_ref().map_or(0.0, |a| a.value(i));
220
221        match make_interval_month_day_nano(y, mo, w, d, h, mi, s) {
222            Some(v) => builder.append_value(v),
223            None => {
224                builder.append_null();
225                continue;
226            }
227        }
228    }
229
230    Ok(Arc::new(builder.finish()))
231}
232
233fn make_interval_month_day_nano(
234    year: i32,
235    month: i32,
236    week: i32,
237    day: i32,
238    hour: i32,
239    min: i32,
240    sec: f64,
241) -> Option<IntervalMonthDayNano> {
242    // checks if overflow
243    let months = year.checked_mul(12).and_then(|v| v.checked_add(month))?;
244    let total_days = week.checked_mul(7).and_then(|v| v.checked_add(day))?;
245
246    let hours_nanos = (hour as i64).checked_mul(3_600_000_000_000)?;
247    let mins_nanos = (min as i64).checked_mul(60_000_000_000)?;
248
249    let sec_int = sec.trunc() as i64;
250    let frac = sec - sec.trunc();
251    let mut frac_nanos = (frac * 1_000_000_000.0).round() as i64;
252
253    if frac_nanos.abs() >= 1_000_000_000 {
254        if frac_nanos > 0 {
255            frac_nanos -= 1_000_000_000;
256        } else {
257            frac_nanos += 1_000_000_000;
258        }
259    }
260
261    let secs_nanos = sec_int.checked_mul(1_000_000_000)?;
262
263    let total_nanos = hours_nanos
264        .checked_add(mins_nanos)
265        .and_then(|v| v.checked_add(secs_nanos))
266        .and_then(|v| v.checked_add(frac_nanos))?;
267
268    Some(IntervalMonthDayNano::new(months, total_days, total_nanos))
269}
270
#[cfg(test)]
mod tests {
    use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
    use arrow::datatypes::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{
        Result, assert_eq_or_internal_err, internal_datafusion_err, internal_err,
    };

    use super::*;

    /// Runs the kernel directly on pre-built argument columns.
    fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
        make_interval_kernel(&arrs)
    }

    /// Runs the kernel on a single row holding all seven components.
    fn run_single_row(
        year: i32,
        month: i32,
        week: i32,
        day: i32,
        hour: i32,
        min: i32,
        sec: f64,
    ) -> ArrayRef {
        let int_col = |v: i32| -> ArrayRef { Arc::new(Int32Array::from(vec![Some(v)])) };
        let sec_col: ArrayRef = Arc::new(Float64Array::from(vec![Some(sec)]));
        run_make_interval_month_day_nano(vec![
            int_col(year),
            int_col(month),
            int_col(week),
            int_col(day),
            int_col(hour),
            int_col(min),
            sec_col,
        ])
        .unwrap()
    }

    /// Downcasts a kernel result to the concrete interval array type.
    fn as_interval_array(arr: &ArrayRef) -> &IntervalMonthDayNanoArray {
        arr.as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .expect("expected IntervalMonthDayNano")
    }

    /// Asserts that a non-empty kernel result contains only NULL rows.
    fn assert_all_null(out: &ArrayRef) {
        let out = as_interval_array(out);
        assert!(!out.is_empty(), "expected at least one row");
        for i in 0..out.len() {
            assert!(out.is_null(i), "row {i} should be NULL");
        }
    }

    #[test]
    fn nulls_propagate_per_row() {
        // Integer column with values 1..=9 where row `null_row` is NULL.
        let int_col = |null_row: usize| -> ArrayRef {
            let values: Vec<Option<i32>> = (1..=9)
                .enumerate()
                .map(|(row, v)| (row != null_row).then_some(v))
                .collect();
            Arc::new(Int32Array::from(values))
        };
        // Rows 0..=5 are NULL through an integer component, row 6 through a
        // NULL second, rows 7 and 8 through non-finite seconds.
        let sec: ArrayRef = Arc::new(Float64Array::from(vec![
            Some(1.0),
            Some(2.0),
            Some(3.0),
            Some(4.0),
            Some(5.0),
            Some(6.0),
            None,
            Some(f64::INFINITY),
            Some(f64::NEG_INFINITY),
        ]));

        let out = run_make_interval_month_day_nano(vec![
            int_col(0),
            int_col(1),
            int_col(2),
            int_col(3),
            int_col(4),
            int_col(5),
            sec,
        ])
        .unwrap();
        assert_eq!(out.len(), 9);
        assert_all_null(&out);
    }

    #[test]
    fn error_months_overflow_should_be_null() {
        // months = year * 12 + month overflows i32 -> NULL
        assert_all_null(&run_single_row(i32::MAX, 1, 0, 0, 0, 0, 0.0));
    }

    #[test]
    fn error_days_overflow_should_be_null() {
        // days = week * 7 + day overflows i32 -> NULL
        assert_all_null(&run_single_row(0, 1, i32::MAX, 0, 0, 0, 0.0));
    }

    #[test]
    fn error_min_overflow_should_be_null() {
        // minutes converted to nanoseconds overflow i64 -> NULL
        assert_all_null(&run_single_row(0, 0, 0, 0, 0, i32::MAX, 0.0));
    }

    #[test]
    fn error_sec_overflow_should_be_null() {
        // seconds converted to nanoseconds overflow i64 -> NULL
        assert_all_null(&run_single_row(0, 0, 0, 0, 0, 0, f64::MAX));
    }

    #[test]
    fn happy_path_all_present_single_row() {
        // 1y 2m 3w 4d 5h 6m 7.25s
        let out = run_single_row(1, 2, 3, 4, 5, 6, 7.25);
        assert_eq!(out.data_type(), &Interval(MonthDayNano));

        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v: IntervalMonthDayNano = out.value(0);
        assert_eq!(v.months, 12 + 2); // 14
        assert_eq!(v.days, 3 * 7 + 4); // 25
        let expected_nanos = (5_i64 * 3600 + 6 * 60 + 7) * 1_000_000_000 + 250_000_000;
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn negative_components_and_fractional_seconds() {
        // -1y -2m  -1w -1d  -1h -1m  -1.5s
        let out = run_single_row(-1, -2, -1, -1, -1, -1, -1.5);
        let out = as_interval_array(&out);

        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);
        let v = out.value(0);

        assert_eq!(v.months, -12 + (-2)); // -14
        assert_eq!(v.days, -7 + (-1)); // -8

        // -(1h + 1m + 1.5s) in nanoseconds
        let expected_nanos = -((3600_i64 + 60 + 1) * 1_000_000_000 + 500_000_000);
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    /// Invokes the UDF through `ScalarUDFImpl::invoke_with_args`.
    fn invoke_make_interval_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields = args
            .iter()
            .map(|arg| Field::new("a", arg.data_type(), true).into())
            .collect::<Vec<_>>();
        let args = ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Field::new("f", Interval(MonthDayNano), true).into(),
            config_options: Arc::new(ConfigOptions::default()),
        };
        SparkMakeInterval::new().invoke_with_args(args)
    }

    #[test]
    fn zero_args_returns_zero_seconds() -> Result<()> {
        let number_rows = 2;
        let res: ColumnarValue = invoke_make_interval_with_args(vec![], number_rows)?;

        match res {
            ColumnarValue::Array(arr) => {
                let arr = arr
                    .as_any()
                    .downcast_ref::<IntervalMonthDayNanoArray>()
                    .ok_or_else(|| {
                        internal_datafusion_err!("expected IntervalMonthDayNanoArray")
                    })?;
                assert_eq_or_internal_err!(
                    arr.len(),
                    number_rows,
                    "expected array length {number_rows}"
                );
                for i in 0..number_rows {
                    let iv = arr.value(i);
                    assert_eq_or_internal_err!(
                        (iv.months, iv.days, iv.nanoseconds),
                        (0, 0, 0),
                        "row {i}: expected (0,0,0), got ({},{},{})",
                        iv.months,
                        iv.days,
                        iv.nanoseconds
                    );
                }
            }
            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
                assert_eq_or_internal_err!(
                    (iv.months, iv.days, iv.nanoseconds),
                    (0, 0, 0),
                    "expected scalar 0s, got ({},{},{})",
                    iv.months,
                    iv.days,
                    iv.nanoseconds
                );
            }
            other => {
                return internal_err!(
                    "expected Array or Scalar IntervalMonthDayNano, got {other:?}"
                );
            }
        }

        Ok(())
    }
}