Skip to main content

datafusion_spark/function/datetime/
make_interval.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{Array, ArrayRef, IntervalMonthDayNanoBuilder, PrimitiveArray};
21use arrow::datatypes::DataType::Interval;
22use arrow::datatypes::IntervalUnit::MonthDayNano;
23use arrow::datatypes::{DataType, IntervalMonthDayNano};
24use datafusion_common::types::{NativeType, logical_float64, logical_int32};
25use datafusion_common::{DataFusionError, Result, ScalarValue, plan_datafusion_err};
26use datafusion_expr::{
27    Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
28    TypeSignatureClass, Volatility,
29};
30use datafusion_functions::utils::make_scalar_function;
31
32#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct SparkMakeInterval {
34    signature: Signature,
35}
36
37impl Default for SparkMakeInterval {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl SparkMakeInterval {
44    pub fn new() -> Self {
45        let int32 = Coercion::new_implicit(
46            TypeSignatureClass::Native(logical_int32()),
47            vec![TypeSignatureClass::Integer],
48            NativeType::Int32,
49        );
50
51        let float64 = Coercion::new_implicit(
52            TypeSignatureClass::Native(logical_float64()),
53            vec![TypeSignatureClass::Numeric],
54            NativeType::Float64,
55        );
56
57        let variants = vec![
58            TypeSignature::Nullary,
59            // year
60            TypeSignature::Coercible(vec![int32.clone()]),
61            // year, month
62            TypeSignature::Coercible(vec![int32.clone(), int32.clone()]),
63            // year, month, week
64            TypeSignature::Coercible(vec![int32.clone(), int32.clone(), int32.clone()]),
65            // year, month, week, day
66            TypeSignature::Coercible(vec![
67                int32.clone(),
68                int32.clone(),
69                int32.clone(),
70                int32.clone(),
71            ]),
72            // year, month, week, day, hour
73            TypeSignature::Coercible(vec![
74                int32.clone(),
75                int32.clone(),
76                int32.clone(),
77                int32.clone(),
78                int32.clone(),
79            ]),
80            // year, month, week, day, hour, minute
81            TypeSignature::Coercible(vec![
82                int32.clone(),
83                int32.clone(),
84                int32.clone(),
85                int32.clone(),
86                int32.clone(),
87                int32.clone(),
88            ]),
89            // year, month, week, day, hour, minute, second
90            TypeSignature::Coercible(vec![
91                int32.clone(),
92                int32.clone(),
93                int32.clone(),
94                int32.clone(),
95                int32.clone(),
96                int32.clone(),
97                float64.clone(),
98            ]),
99        ];
100
101        Self {
102            signature: Signature::one_of(variants, Volatility::Immutable),
103        }
104    }
105}
106
107impl ScalarUDFImpl for SparkMakeInterval {
108    fn name(&self) -> &str {
109        "make_interval"
110    }
111
112    fn signature(&self) -> &Signature {
113        &self.signature
114    }
115
116    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
117        Ok(Interval(MonthDayNano))
118    }
119
120    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
121        if args.args.is_empty() {
122            return Ok(ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(
123                Some(IntervalMonthDayNano::new(0, 0, 0)),
124            )));
125        }
126        make_scalar_function(make_interval_kernel, vec![])(&args.args)
127    }
128}
129
130fn make_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
131    use arrow::array::AsArray;
132    use arrow::datatypes::{Float64Type, Int32Type};
133
134    let n_rows = args[0].len();
135
136    let years = args[0]
137        .as_primitive_opt::<Int32Type>()
138        .ok_or_else(|| plan_datafusion_err!("make_interval arg[0] must be Int32"))?;
139    let months = args
140        .get(1)
141        .map(|a| {
142            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
143                plan_datafusion_err!("make_dt_interval arg[1] must be Int32")
144            })
145        })
146        .transpose()?;
147    let weeks = args
148        .get(2)
149        .map(|a| {
150            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
151                plan_datafusion_err!("make_dt_interval arg[2] must be Int32")
152            })
153        })
154        .transpose()?;
155    let days: Option<&PrimitiveArray<Int32Type>> = args
156        .get(3)
157        .map(|a| {
158            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
159                plan_datafusion_err!("make_dt_interval arg[3] must be Int32")
160            })
161        })
162        .transpose()?;
163    let hours: Option<&PrimitiveArray<Int32Type>> = args
164        .get(4)
165        .map(|a| {
166            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
167                plan_datafusion_err!("make_dt_interval arg[4] must be Int32")
168            })
169        })
170        .transpose()?;
171    let mins: Option<&PrimitiveArray<Int32Type>> = args
172        .get(5)
173        .map(|a| {
174            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
175                plan_datafusion_err!("make_dt_interval arg[5] must be Int32")
176            })
177        })
178        .transpose()?;
179    let secs: Option<&PrimitiveArray<Float64Type>> = args
180        .get(6)
181        .map(|a| {
182            a.as_primitive_opt::<Float64Type>().ok_or_else(|| {
183                plan_datafusion_err!("make_dt_interval arg[6] must be Float64")
184            })
185        })
186        .transpose()?;
187
188    let mut builder = IntervalMonthDayNanoBuilder::with_capacity(n_rows);
189
190    for i in 0..n_rows {
191        // if one column is NULL → result NULL
192        let any_null_present = years.is_null(i)
193            || months.as_ref().is_some_and(|a| a.is_null(i))
194            || weeks.as_ref().is_some_and(|a| a.is_null(i))
195            || days.as_ref().is_some_and(|a| a.is_null(i))
196            || hours.as_ref().is_some_and(|a| a.is_null(i))
197            || mins.as_ref().is_some_and(|a| a.is_null(i))
198            || secs
199                .as_ref()
200                .is_some_and(|a| a.is_null(i) || !a.value(i).is_finite());
201
202        if any_null_present {
203            builder.append_null();
204            continue;
205        }
206
207        // default values 0 or 0.0
208        let y = years.value(i);
209        let mo = months.as_ref().map_or(0, |a| a.value(i));
210        let w = weeks.as_ref().map_or(0, |a| a.value(i));
211        let d = days.as_ref().map_or(0, |a| a.value(i));
212        let h = hours.as_ref().map_or(0, |a| a.value(i));
213        let mi = mins.as_ref().map_or(0, |a| a.value(i));
214        let s = secs.as_ref().map_or(0.0, |a| a.value(i));
215
216        match make_interval_month_day_nano(y, mo, w, d, h, mi, s) {
217            Some(v) => builder.append_value(v),
218            None => {
219                builder.append_null();
220                continue;
221            }
222        }
223    }
224
225    Ok(Arc::new(builder.finish()))
226}
227
228fn make_interval_month_day_nano(
229    year: i32,
230    month: i32,
231    week: i32,
232    day: i32,
233    hour: i32,
234    min: i32,
235    sec: f64,
236) -> Option<IntervalMonthDayNano> {
237    // checks if overflow
238    let months = year.checked_mul(12).and_then(|v| v.checked_add(month))?;
239    let total_days = week.checked_mul(7).and_then(|v| v.checked_add(day))?;
240
241    let hours_nanos = (hour as i64).checked_mul(3_600_000_000_000)?;
242    let mins_nanos = (min as i64).checked_mul(60_000_000_000)?;
243
244    let sec_int = sec.trunc() as i64;
245    let frac = sec - sec.trunc();
246    let mut frac_nanos = (frac * 1_000_000_000.0).round() as i64;
247
248    if frac_nanos.abs() >= 1_000_000_000 {
249        if frac_nanos > 0 {
250            frac_nanos -= 1_000_000_000;
251        } else {
252            frac_nanos += 1_000_000_000;
253        }
254    }
255
256    let secs_nanos = sec_int.checked_mul(1_000_000_000)?;
257
258    let total_nanos = hours_nanos
259        .checked_add(mins_nanos)
260        .and_then(|v| v.checked_add(secs_nanos))
261        .and_then(|v| v.checked_add(frac_nanos))?;
262
263    Some(IntervalMonthDayNano::new(months, total_days, total_nanos))
264}
265
#[cfg(test)]
mod tests {
    use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
    use arrow::datatypes::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{
        Result, assert_eq_or_internal_err, internal_datafusion_err, internal_err,
    };

    use super::*;

    /// Runs the array kernel directly, bypassing signature coercion.
    fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
        make_interval_kernel(&arrs)
    }

    /// Builds a nullable `Int32` column.
    fn int32_col(values: Vec<Option<i32>>) -> ArrayRef {
        Arc::new(Int32Array::from(values))
    }

    /// Builds a nullable `Float64` column.
    fn float64_col(values: Vec<Option<f64>>) -> ArrayRef {
        Arc::new(Float64Array::from(values))
    }

    /// Runs the kernel over a single row made of the seven given components.
    fn run_single_row(
        year: i32,
        month: i32,
        week: i32,
        day: i32,
        hour: i32,
        min: i32,
        sec: f64,
    ) -> ArrayRef {
        let mut cols: Vec<ArrayRef> = [year, month, week, day, hour, min]
            .iter()
            .map(|&v| int32_col(vec![Some(v)]))
            .collect();
        cols.push(float64_col(vec![Some(sec)]));
        run_make_interval_month_day_nano(cols).unwrap()
    }

    /// Downcasts a kernel result to `IntervalMonthDayNanoArray`, panicking otherwise.
    fn as_interval_array(out: &ArrayRef) -> &IntervalMonthDayNanoArray {
        out.as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano"))
            .unwrap()
    }

    /// Asserts that every row of a kernel result is NULL.
    fn assert_all_null(out: &ArrayRef) {
        let out = as_interval_array(out);
        for i in 0..out.len() {
            assert!(out.is_null(i), "row {i} should be NULL");
        }
    }

    #[test]
    fn nulls_propagate_per_row() {
        // Integer column `c` is NULL only in row `c`; every other row holds `row + 1`.
        let int_col_with_null_at = |null_row: usize| {
            int32_col(
                (0..9)
                    .map(|row| (row != null_row).then_some(row as i32 + 1))
                    .collect(),
            )
        };
        let mut cols: Vec<ArrayRef> = (0..6).map(int_col_with_null_at).collect();
        // Seconds: NULL in row 6, +inf in row 7, -inf in row 8.
        cols.push(float64_col(vec![
            Some(1.0),
            Some(2.0),
            Some(3.0),
            Some(4.0),
            Some(5.0),
            Some(6.0),
            None,
            Some(f64::INFINITY),
            Some(f64::NEG_INFINITY),
        ]));

        let out = run_make_interval_month_day_nano(cols).unwrap();
        assert_all_null(&out);
    }

    #[test]
    fn error_months_overflow_should_be_null() {
        // months = year * 12 + month overflows i32 → NULL
        assert_all_null(&run_single_row(i32::MAX, 1, 0, 0, 0, 0, 0.0));
    }

    #[test]
    fn error_days_overflow_should_be_null() {
        // days = week * 7 + day overflows i32 → NULL
        assert_all_null(&run_single_row(0, 1, i32::MAX, 0, 0, 0, 0.0));
    }

    #[test]
    fn error_min_overflow_should_be_null() {
        // min * 60e9 nanoseconds overflows i64 → NULL
        assert_all_null(&run_single_row(0, 0, 0, 0, 0, i32::MAX, 0.0));
    }

    #[test]
    fn error_sec_overflow_should_be_null() {
        // sec * 1e9 nanoseconds overflows i64 → NULL
        assert_all_null(&run_single_row(0, 0, 0, 0, 0, 0, f64::MAX));
    }

    #[test]
    fn happy_path_all_present_single_row() {
        // 1y 2m 3w 4d 5h 6m 7.25s
        let out = run_single_row(1, 2, 3, 4, 5, 6, 7.25);
        assert_eq!(out.data_type(), &Interval(MonthDayNano));

        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v: IntervalMonthDayNano = out.value(0);
        assert_eq!(v.months, 12 + 2); // 14
        assert_eq!(v.days, 3 * 7 + 4); // 25
        let expected_nanos = (5_i64 * 3600 + 6 * 60 + 7) * 1_000_000_000 + 250_000_000;
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn negative_components_and_fractional_seconds() {
        // -1y -2m  -1w -1d  -1h -1m  -1.5s
        let out = run_single_row(-1, -2, -1, -1, -1, -1, -1.5);
        let out = as_interval_array(&out);

        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);
        let v = out.value(0);

        assert_eq!(v.months, -12 + (-2)); // -14
        assert_eq!(v.days, -7 + (-1)); // -8

        // -(1h + 1m + 1.5s) in nanoseconds
        let expected_nanos = -((3600_i64 + 60 + 1) * 1_000_000_000 + 500_000_000);
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    /// Invokes the UDF through `invoke_with_args` with nullable argument fields.
    fn invoke_make_interval_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields = args
            .iter()
            .map(|arg| Field::new("a", arg.data_type(), true).into())
            .collect::<Vec<_>>();
        let udf_args = ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Field::new("f", Interval(MonthDayNano), true).into(),
            config_options: Arc::new(ConfigOptions::default()),
        };
        SparkMakeInterval::new().invoke_with_args(udf_args)
    }

    #[test]
    fn zero_args_returns_zero_seconds() -> Result<()> {
        let number_rows = 2;
        let result: ColumnarValue = invoke_make_interval_with_args(vec![], number_rows)?;

        match result {
            ColumnarValue::Array(arr) => {
                let intervals = arr
                    .as_any()
                    .downcast_ref::<IntervalMonthDayNanoArray>()
                    .ok_or_else(|| {
                        internal_datafusion_err!("expected IntervalMonthDayNanoArray")
                    })?;
                assert_eq_or_internal_err!(
                    intervals.len(),
                    number_rows,
                    "expected array length {number_rows}"
                );
                for i in 0..number_rows {
                    let iv = intervals.value(i);
                    assert_eq_or_internal_err!(
                        (iv.months, iv.days, iv.nanoseconds),
                        (0, 0, 0),
                        "row {i}: expected (0,0,0), got ({},{},{})",
                        iv.months,
                        iv.days,
                        iv.nanoseconds
                    );
                }
            }
            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
                assert_eq_or_internal_err!(
                    (iv.months, iv.days, iv.nanoseconds),
                    (0, 0, 0),
                    "expected scalar 0s, got ({},{},{})",
                    iv.months,
                    iv.days,
                    iv.nanoseconds
                );
            }
            other => {
                return internal_err!(
                    "expected Array or Scalar IntervalMonthDayNano, got {other:?}"
                );
            }
        }

        Ok(())
    }
}