datafusion_spark/function/datetime/
make_interval.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{Array, ArrayRef, IntervalMonthDayNanoBuilder, PrimitiveArray};
22use arrow::datatypes::DataType::Interval;
23use arrow::datatypes::IntervalUnit::MonthDayNano;
24use arrow::datatypes::{DataType, IntervalMonthDayNano};
25use datafusion_common::{
26    exec_err, plan_datafusion_err, DataFusionError, Result, ScalarValue,
27};
28use datafusion_expr::{
29    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_functions::utils::make_scalar_function;
32
33#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkMakeInterval {
35    signature: Signature,
36}
37
38impl Default for SparkMakeInterval {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl SparkMakeInterval {
45    pub fn new() -> Self {
46        Self {
47            signature: Signature::user_defined(Volatility::Immutable),
48        }
49    }
50}
51
52impl ScalarUDFImpl for SparkMakeInterval {
53    fn as_any(&self) -> &dyn Any {
54        self
55    }
56
57    fn name(&self) -> &str {
58        "make_interval"
59    }
60
61    fn signature(&self) -> &Signature {
62        &self.signature
63    }
64
65    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
66        Ok(Interval(MonthDayNano))
67    }
68
69    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
70        if args.args.is_empty() {
71            return Ok(ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(
72                Some(IntervalMonthDayNano::new(0, 0, 0)),
73            )));
74        }
75        make_scalar_function(make_interval_kernel, vec![])(&args.args)
76    }
77
78    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
79        let length = arg_types.len();
80        match length {
81            x if x > 7 => {
82                exec_err!(
83                    "make_interval expects between 0 and 7 arguments, got {}",
84                    arg_types.len()
85                )
86            }
87            _ => Ok((0..arg_types.len())
88                .map(|i| {
89                    if i == 6 {
90                        DataType::Float64
91                    } else {
92                        DataType::Int32
93                    }
94                })
95                .collect()),
96        }
97    }
98}
99
100fn make_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
101    use arrow::array::AsArray;
102    use arrow::datatypes::{Float64Type, Int32Type};
103
104    let n_rows = args[0].len();
105
106    let years = args[0]
107        .as_primitive_opt::<Int32Type>()
108        .ok_or_else(|| plan_datafusion_err!("make_interval arg[0] must be Int32"))?;
109    let months = args
110        .get(1)
111        .map(|a| {
112            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
113                plan_datafusion_err!("make_dt_interval arg[1] must be Int32")
114            })
115        })
116        .transpose()?;
117    let weeks = args
118        .get(2)
119        .map(|a| {
120            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
121                plan_datafusion_err!("make_dt_interval arg[2] must be Int32")
122            })
123        })
124        .transpose()?;
125    let days: Option<&PrimitiveArray<Int32Type>> = args
126        .get(3)
127        .map(|a| {
128            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
129                plan_datafusion_err!("make_dt_interval arg[3] must be Int32")
130            })
131        })
132        .transpose()?;
133    let hours: Option<&PrimitiveArray<Int32Type>> = args
134        .get(4)
135        .map(|a| {
136            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
137                plan_datafusion_err!("make_dt_interval arg[4] must be Int32")
138            })
139        })
140        .transpose()?;
141    let mins: Option<&PrimitiveArray<Int32Type>> = args
142        .get(5)
143        .map(|a| {
144            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
145                plan_datafusion_err!("make_dt_interval arg[5] must be Int32")
146            })
147        })
148        .transpose()?;
149    let secs: Option<&PrimitiveArray<Float64Type>> = args
150        .get(6)
151        .map(|a| {
152            a.as_primitive_opt::<Float64Type>().ok_or_else(|| {
153                plan_datafusion_err!("make_dt_interval arg[6] must be Float64")
154            })
155        })
156        .transpose()?;
157
158    let mut builder = IntervalMonthDayNanoBuilder::with_capacity(n_rows);
159
160    for i in 0..n_rows {
161        // if one column is NULL → result NULL
162        let any_null_present = years.is_null(i)
163            || months.as_ref().is_some_and(|a| a.is_null(i))
164            || weeks.as_ref().is_some_and(|a| a.is_null(i))
165            || days.as_ref().is_some_and(|a| a.is_null(i))
166            || hours.as_ref().is_some_and(|a| a.is_null(i))
167            || mins.as_ref().is_some_and(|a| a.is_null(i))
168            || secs
169                .as_ref()
170                .is_some_and(|a| a.is_null(i) || !a.value(i).is_finite());
171
172        if any_null_present {
173            builder.append_null();
174            continue;
175        }
176
177        // default values 0 or 0.0
178        let y = years.value(i);
179        let mo = months.as_ref().map_or(0, |a| a.value(i));
180        let w = weeks.as_ref().map_or(0, |a| a.value(i));
181        let d = days.as_ref().map_or(0, |a| a.value(i));
182        let h = hours.as_ref().map_or(0, |a| a.value(i));
183        let mi = mins.as_ref().map_or(0, |a| a.value(i));
184        let s = secs.as_ref().map_or(0.0, |a| a.value(i));
185
186        match make_interval_month_day_nano(y, mo, w, d, h, mi, s) {
187            Some(v) => builder.append_value(v),
188            None => {
189                builder.append_null();
190                continue;
191            }
192        }
193    }
194
195    Ok(Arc::new(builder.finish()))
196}
197
198fn make_interval_month_day_nano(
199    year: i32,
200    month: i32,
201    week: i32,
202    day: i32,
203    hour: i32,
204    min: i32,
205    sec: f64,
206) -> Option<IntervalMonthDayNano> {
207    // checks if overflow
208    let months = year.checked_mul(12).and_then(|v| v.checked_add(month))?;
209    let total_days = week.checked_mul(7).and_then(|v| v.checked_add(day))?;
210
211    let hours_nanos = (hour as i64).checked_mul(3_600_000_000_000)?;
212    let mins_nanos = (min as i64).checked_mul(60_000_000_000)?;
213
214    let sec_int = sec.trunc() as i64;
215    let frac = sec - sec.trunc();
216    let mut frac_nanos = (frac * 1_000_000_000.0).round() as i64;
217
218    if frac_nanos.abs() >= 1_000_000_000 {
219        if frac_nanos > 0 {
220            frac_nanos -= 1_000_000_000;
221        } else {
222            frac_nanos += 1_000_000_000;
223        }
224    }
225
226    let secs_nanos = sec_int.checked_mul(1_000_000_000)?;
227
228    let total_nanos = hours_nanos
229        .checked_add(mins_nanos)
230        .and_then(|v| v.checked_add(secs_nanos))
231        .and_then(|v| v.checked_add(frac_nanos))?;
232
233    Some(IntervalMonthDayNano::new(months, total_days, total_nanos))
234}
235
#[cfg(test)]
mod tests {
    use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
    use arrow::datatypes::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{internal_datafusion_err, internal_err, Result};

    use super::*;

    /// Runs the kernel directly on pre-built argument columns.
    fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
        make_interval_kernel(&arrs)
    }

    /// Builds a nullable `Int32` argument column.
    fn int_col(values: Vec<Option<i32>>) -> ArrayRef {
        Arc::new(Int32Array::from(values))
    }

    /// Builds a nullable `Float64` argument column.
    fn float_col(values: Vec<Option<f64>>) -> ArrayRef {
        Arc::new(Float64Array::from(values))
    }

    /// Runs the kernel on a single non-NULL row.
    ///
    /// `ints` is `[years, months, weeks, days, hours, mins]`.
    fn run_single_row(ints: [i32; 6], sec: f64) -> ArrayRef {
        let mut columns: Vec<ArrayRef> =
            ints.iter().map(|&v| int_col(vec![Some(v)])).collect();
        columns.push(float_col(vec![Some(sec)]));
        run_make_interval_month_day_nano(columns).unwrap()
    }

    /// Downcasts a kernel result to the concrete interval array type.
    fn as_interval_array(arr: &ArrayRef) -> &IntervalMonthDayNanoArray {
        arr.as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano"))
            .unwrap()
    }

    /// Asserts that `out` has exactly `expected_rows` rows and all are NULL.
    ///
    /// The row-count check keeps the NULL loop from passing vacuously on an
    /// empty result.
    fn assert_all_null(out: &ArrayRef, expected_rows: usize) {
        let out = as_interval_array(out);
        assert_eq!(out.len(), expected_rows);
        for i in 0..out.len() {
            assert!(out.is_null(i), "row {i} should be NULL");
        }
    }

    #[test]
    fn nulls_propagate_per_row() {
        const ROWS: i32 = 9;

        // Integer column `k` holds `row + 1` everywhere except for a NULL at
        // row `k`, so rows 0..=5 each have exactly one NULL integer component.
        let mut columns: Vec<ArrayRef> = (0..6)
            .map(|null_row| {
                int_col(
                    (0..ROWS)
                        .map(|row| (row != null_row).then_some(row + 1))
                        .collect(),
                )
            })
            .collect();

        // Seconds: rows 0..=5 are valid, row 6 is NULL, rows 7 and 8 are
        // infinite, which the kernel also maps to NULL.
        let mut sec_values: Vec<Option<f64>> =
            (1..=6_i32).map(|v| Some(f64::from(v))).collect();
        sec_values.extend([None, Some(f64::INFINITY), Some(f64::NEG_INFINITY)]);
        columns.push(float_col(sec_values));

        let out = run_make_interval_month_day_nano(columns).unwrap();
        assert_all_null(&out, ROWS as usize);
    }

    #[test]
    fn error_months_overflow_should_be_null() {
        // months = year * 12 + month overflows i32 → NULL
        let out = run_single_row([i32::MAX, 1, 0, 0, 0, 0], 0.0);
        assert_all_null(&out, 1);
    }

    #[test]
    fn error_days_overflow_should_be_null() {
        // days = week * 7 + day overflows i32 → NULL
        let out = run_single_row([0, 1, i32::MAX, 0, 0, 0], 0.0);
        assert_all_null(&out, 1);
    }

    #[test]
    fn error_hour_overflow_should_be_null() {
        // hour converted to nanoseconds overflows i64 → NULL
        let out = run_single_row([0, 0, 0, 0, i32::MAX, 0], 0.0);
        assert_all_null(&out, 1);
    }

    #[test]
    fn error_min_overflow_should_be_null() {
        // min converted to nanoseconds overflows i64 → NULL
        let out = run_single_row([0, 0, 0, 0, 0, i32::MAX], 0.0);
        assert_all_null(&out, 1);
    }

    #[test]
    fn error_sec_overflow_should_be_null() {
        // sec converted to nanoseconds overflows i64 → NULL
        let out = run_single_row([0, 0, 0, 0, 0, 0], f64::MAX);
        assert_all_null(&out, 1);
    }

    #[test]
    fn happy_path_all_present_single_row() {
        // 1y 2m 3w 4d 5h 6m 7.25s
        let out = run_single_row([1, 2, 3, 4, 5, 6], 7.25);
        assert_eq!(out.data_type(), &Interval(MonthDayNano));

        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v: IntervalMonthDayNano = out.value(0);
        assert_eq!(v.months, 12 + 2); // 14
        assert_eq!(v.days, 3 * 7 + 4); // 25
        let expected_nanos = (5_i64 * 3600 + 6 * 60 + 7) * 1_000_000_000 + 250_000_000;
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn negative_components_and_fractional_seconds() {
        // -1y -2m  -1w -1d  -1h -1m  -1.5s
        let out = run_single_row([-1, -2, -1, -1, -1, -1], -1.5);
        let out = as_interval_array(&out);

        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);
        let v = out.value(0);

        assert_eq!(v.months, -12 + (-2)); // -14
        assert_eq!(v.days, -7 + (-1)); // -8

        // -(1h + 1m + 1.5s) in nanoseconds
        let expected_nanos = -((3600_i64 + 60 + 1) * 1_000_000_000 + 500_000_000);
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn omitted_trailing_arguments_default_to_zero() {
        // Only years and months are supplied: 2y 3m, everything else is zero.
        let out = run_make_interval_month_day_nano(vec![
            int_col(vec![Some(2)]),
            int_col(vec![Some(3)]),
        ])
        .unwrap();
        let out = as_interval_array(&out);

        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);
        let v = out.value(0);
        assert_eq!((v.months, v.days, v.nanoseconds), (27, 0, 0));
    }

    #[test]
    fn coerce_types_maps_seventh_argument_to_float64() -> Result<()> {
        let udf = SparkMakeInterval::new();

        let input = vec![DataType::Int64; 7];
        let coerced = udf.coerce_types(&input)?;

        let mut expected = vec![DataType::Int32; 6];
        expected.push(DataType::Float64);
        assert_eq!(coerced, expected);
        Ok(())
    }

    #[test]
    fn coerce_types_rejects_more_than_seven_arguments() {
        let udf = SparkMakeInterval::new();
        let input = vec![DataType::Int32; 8];
        assert!(udf.coerce_types(&input).is_err());
    }

    /// Invokes the UDF through `invoke_with_args`, deriving the argument
    /// fields from the supplied values.
    fn invoke_make_interval_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields = args
            .iter()
            .map(|arg| Field::new("a", arg.data_type(), true).into())
            .collect::<Vec<_>>();
        let args = ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Field::new("f", Interval(MonthDayNano), true).into(),
            config_options: Arc::new(ConfigOptions::default()),
        };
        SparkMakeInterval::new().invoke_with_args(args)
    }

    #[test]
    fn zero_args_returns_zero_seconds() -> Result<()> {
        let number_rows = 2;
        let res: ColumnarValue = invoke_make_interval_with_args(vec![], number_rows)?;

        // Either representation of the zero interval is accepted: an array
        // with one zero per row, or a single scalar zero.
        match res {
            ColumnarValue::Array(arr) => {
                let arr = arr
                    .as_any()
                    .downcast_ref::<IntervalMonthDayNanoArray>()
                    .ok_or_else(|| {
                        internal_datafusion_err!("expected IntervalMonthDayNanoArray")
                    })?;
                if arr.len() != number_rows {
                    return internal_err!(
                        "expected array length {number_rows}, got {}",
                        arr.len()
                    );
                }
                for i in 0..number_rows {
                    let iv = arr.value(i);
                    if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
                        return internal_err!(
                            "row {i}: expected (0,0,0), got ({},{},{})",
                            iv.months,
                            iv.days,
                            iv.nanoseconds
                        );
                    }
                }
            }
            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
                if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
                    return internal_err!(
                        "expected scalar 0s, got ({},{},{})",
                        iv.months,
                        iv.days,
                        iv.nanoseconds
                    );
                }
            }
            other => {
                return internal_err!(
                    "expected Array or Scalar IntervalMonthDayNano, got {other:?}"
                );
            }
        }

        Ok(())
    }
}