1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{Array, ArrayRef, IntervalMonthDayNanoBuilder, PrimitiveArray};
22use arrow::datatypes::DataType::Interval;
23use arrow::datatypes::IntervalUnit::MonthDayNano;
24use arrow::datatypes::{DataType, IntervalMonthDayNano};
25use datafusion_common::{
26 exec_err, plan_datafusion_err, DataFusionError, Result, ScalarValue,
27};
28use datafusion_expr::{
29 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_functions::utils::make_scalar_function;
32
33#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkMakeInterval {
35 signature: Signature,
36}
37
38impl Default for SparkMakeInterval {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl SparkMakeInterval {
45 pub fn new() -> Self {
46 Self {
47 signature: Signature::user_defined(Volatility::Immutable),
48 }
49 }
50}
51
52impl ScalarUDFImpl for SparkMakeInterval {
53 fn as_any(&self) -> &dyn Any {
54 self
55 }
56
57 fn name(&self) -> &str {
58 "make_interval"
59 }
60
61 fn signature(&self) -> &Signature {
62 &self.signature
63 }
64
65 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
66 Ok(Interval(MonthDayNano))
67 }
68
69 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
70 if args.args.is_empty() {
71 return Ok(ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(
72 Some(IntervalMonthDayNano::new(0, 0, 0)),
73 )));
74 }
75 make_scalar_function(make_interval_kernel, vec![])(&args.args)
76 }
77
78 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
79 let length = arg_types.len();
80 match length {
81 x if x > 7 => {
82 exec_err!(
83 "make_interval expects between 0 and 7 arguments, got {}",
84 arg_types.len()
85 )
86 }
87 _ => Ok((0..arg_types.len())
88 .map(|i| {
89 if i == 6 {
90 DataType::Float64
91 } else {
92 DataType::Int32
93 }
94 })
95 .collect()),
96 }
97 }
98}
99
100fn make_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
101 use arrow::array::AsArray;
102 use arrow::datatypes::{Float64Type, Int32Type};
103
104 let n_rows = args[0].len();
105
106 let years = args[0]
107 .as_primitive_opt::<Int32Type>()
108 .ok_or_else(|| plan_datafusion_err!("make_interval arg[0] must be Int32"))?;
109 let months = args
110 .get(1)
111 .map(|a| {
112 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
113 plan_datafusion_err!("make_dt_interval arg[1] must be Int32")
114 })
115 })
116 .transpose()?;
117 let weeks = args
118 .get(2)
119 .map(|a| {
120 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
121 plan_datafusion_err!("make_dt_interval arg[2] must be Int32")
122 })
123 })
124 .transpose()?;
125 let days: Option<&PrimitiveArray<Int32Type>> = args
126 .get(3)
127 .map(|a| {
128 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
129 plan_datafusion_err!("make_dt_interval arg[3] must be Int32")
130 })
131 })
132 .transpose()?;
133 let hours: Option<&PrimitiveArray<Int32Type>> = args
134 .get(4)
135 .map(|a| {
136 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
137 plan_datafusion_err!("make_dt_interval arg[4] must be Int32")
138 })
139 })
140 .transpose()?;
141 let mins: Option<&PrimitiveArray<Int32Type>> = args
142 .get(5)
143 .map(|a| {
144 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
145 plan_datafusion_err!("make_dt_interval arg[5] must be Int32")
146 })
147 })
148 .transpose()?;
149 let secs: Option<&PrimitiveArray<Float64Type>> = args
150 .get(6)
151 .map(|a| {
152 a.as_primitive_opt::<Float64Type>().ok_or_else(|| {
153 plan_datafusion_err!("make_dt_interval arg[6] must be Float64")
154 })
155 })
156 .transpose()?;
157
158 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(n_rows);
159
160 for i in 0..n_rows {
161 let any_null_present = years.is_null(i)
163 || months.as_ref().is_some_and(|a| a.is_null(i))
164 || weeks.as_ref().is_some_and(|a| a.is_null(i))
165 || days.as_ref().is_some_and(|a| a.is_null(i))
166 || hours.as_ref().is_some_and(|a| a.is_null(i))
167 || mins.as_ref().is_some_and(|a| a.is_null(i))
168 || secs
169 .as_ref()
170 .is_some_and(|a| a.is_null(i) || !a.value(i).is_finite());
171
172 if any_null_present {
173 builder.append_null();
174 continue;
175 }
176
177 let y = years.value(i);
179 let mo = months.as_ref().map_or(0, |a| a.value(i));
180 let w = weeks.as_ref().map_or(0, |a| a.value(i));
181 let d = days.as_ref().map_or(0, |a| a.value(i));
182 let h = hours.as_ref().map_or(0, |a| a.value(i));
183 let mi = mins.as_ref().map_or(0, |a| a.value(i));
184 let s = secs.as_ref().map_or(0.0, |a| a.value(i));
185
186 match make_interval_month_day_nano(y, mo, w, d, h, mi, s) {
187 Some(v) => builder.append_value(v),
188 None => {
189 builder.append_null();
190 continue;
191 }
192 }
193 }
194
195 Ok(Arc::new(builder.finish()))
196}
197
198fn make_interval_month_day_nano(
199 year: i32,
200 month: i32,
201 week: i32,
202 day: i32,
203 hour: i32,
204 min: i32,
205 sec: f64,
206) -> Option<IntervalMonthDayNano> {
207 let months = year.checked_mul(12).and_then(|v| v.checked_add(month))?;
209 let total_days = week.checked_mul(7).and_then(|v| v.checked_add(day))?;
210
211 let hours_nanos = (hour as i64).checked_mul(3_600_000_000_000)?;
212 let mins_nanos = (min as i64).checked_mul(60_000_000_000)?;
213
214 let sec_int = sec.trunc() as i64;
215 let frac = sec - sec.trunc();
216 let mut frac_nanos = (frac * 1_000_000_000.0).round() as i64;
217
218 if frac_nanos.abs() >= 1_000_000_000 {
219 if frac_nanos > 0 {
220 frac_nanos -= 1_000_000_000;
221 } else {
222 frac_nanos += 1_000_000_000;
223 }
224 }
225
226 let secs_nanos = sec_int.checked_mul(1_000_000_000)?;
227
228 let total_nanos = hours_nanos
229 .checked_add(mins_nanos)
230 .and_then(|v| v.checked_add(secs_nanos))
231 .and_then(|v| v.checked_add(frac_nanos))?;
232
233 Some(IntervalMonthDayNano::new(months, total_days, total_nanos))
234}
235
#[cfg(test)]
mod tests {
    use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
    use arrow::datatypes::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{internal_datafusion_err, internal_err, Result};

    use super::*;

    /// Calls the array kernel directly, bypassing planning and coercion.
    fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
        make_interval_kernel(&arrs)
    }

    /// Builds the seven single-row kernel inputs in positional order.
    fn single_row(
        year: i32,
        month: i32,
        week: i32,
        day: i32,
        hour: i32,
        min: i32,
        sec: f64,
    ) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int32Array::from(vec![year])) as ArrayRef,
            Arc::new(Int32Array::from(vec![month])) as ArrayRef,
            Arc::new(Int32Array::from(vec![week])) as ArrayRef,
            Arc::new(Int32Array::from(vec![day])) as ArrayRef,
            Arc::new(Int32Array::from(vec![hour])) as ArrayRef,
            Arc::new(Int32Array::from(vec![min])) as ArrayRef,
            Arc::new(Float64Array::from(vec![sec])) as ArrayRef,
        ]
    }

    /// Downcasts a kernel result to the concrete interval array type.
    fn as_interval_array(arr: &ArrayRef) -> &IntervalMonthDayNanoArray {
        arr.as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano"))
            .unwrap()
    }

    /// Asserts that every row of a kernel result is NULL.
    fn assert_all_null(arr: &ArrayRef) {
        let out = as_interval_array(arr);
        for i in 0..out.len() {
            assert!(out.is_null(i), "row {i} should be NULL");
        }
    }

    #[test]
    fn nulls_propagate_per_row() {
        // Row k (k < 7) has a NULL in component k; rows 7 and 8 have +/-inf seconds.
        let year = Arc::new(Int32Array::from(vec![
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
        ]));
        let month = Arc::new(Int32Array::from(vec![
            Some(1),
            None,
            Some(3),
            Some(4),
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
        ]));
        let week = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            None,
            Some(4),
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
        ]));
        let day = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            None,
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
        ]));
        let hour = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            Some(7),
            Some(8),
            Some(9),
        ]));
        let min = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            Some(5),
            None,
            Some(7),
            Some(8),
            Some(9),
        ]));
        let sec = Arc::new(Float64Array::from(vec![
            Some(1.0),
            Some(2.0),
            Some(3.0),
            Some(4.0),
            Some(5.0),
            Some(6.0),
            None,
            Some(f64::INFINITY),
            Some(f64::NEG_INFINITY),
        ]));

        let out = run_make_interval_month_day_nano(vec![
            year, month, week, day, hour, min, sec,
        ])
        .unwrap();
        assert_all_null(&out);
    }

    #[test]
    fn error_months_overflow_should_be_null() {
        let args = single_row(i32::MAX, 1, 0, 0, 0, 0, 0.0);
        let out = run_make_interval_month_day_nano(args).unwrap();
        assert_all_null(&out);
    }

    #[test]
    fn error_days_overflow_should_be_null() {
        let args = single_row(0, 1, i32::MAX, 0, 0, 0, 0.0);
        let out = run_make_interval_month_day_nano(args).unwrap();
        assert_all_null(&out);
    }

    #[test]
    fn error_min_overflow_should_be_null() {
        let args = single_row(0, 0, 0, 0, 0, i32::MAX, 0.0);
        let out = run_make_interval_month_day_nano(args).unwrap();
        assert_all_null(&out);
    }

    #[test]
    fn error_sec_overflow_should_be_null() {
        let args = single_row(0, 0, 0, 0, 0, 0, f64::MAX);
        let out = run_make_interval_month_day_nano(args).unwrap();
        assert_all_null(&out);
    }

    #[test]
    fn happy_path_all_present_single_row() {
        let args = single_row(1, 2, 3, 4, 5, 6, 7.25);
        let out = run_make_interval_month_day_nano(args).unwrap();
        assert_eq!(out.data_type(), &Interval(MonthDayNano));

        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v: IntervalMonthDayNano = out.value(0);
        // 1 year + 2 months.
        assert_eq!(v.months, 12 + 2);
        // 3 weeks + 4 days.
        assert_eq!(v.days, 3 * 7 + 4);
        // 5h 6m 7.25s.
        let expected_nanos = (5_i64 * 3600 + 6 * 60 + 7) * 1_000_000_000 + 250_000_000;
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn negative_components_and_fractional_seconds() {
        let args = single_row(-1, -2, -1, -1, -1, -1, -1.5);
        let out = run_make_interval_month_day_nano(args).unwrap();

        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v = out.value(0);
        // -1 year - 2 months.
        assert_eq!(v.months, -12 + (-2));
        // -1 week - 1 day.
        assert_eq!(v.days, -7 + (-1));
        // -1h -1m -1.5s.
        let expected_nanos = -((3600_i64 + 60 + 1) * 1_000_000_000 + 500_000_000);
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn missing_trailing_args_default_to_zero() {
        let year = Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef;
        let month = Arc::new(Int32Array::from(vec![Some(2)])) as ArrayRef;

        let out = run_make_interval_month_day_nano(vec![year, month]).unwrap();
        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v = out.value(0);
        assert_eq!((v.months, v.days, v.nanoseconds), (14, 0, 0));
    }

    fn invoke_make_interval_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields = args
            .iter()
            .map(|arg| Field::new("a", arg.data_type(), true).into())
            .collect::<Vec<_>>();
        let args = ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Field::new("f", Interval(MonthDayNano), true).into(),
            config_options: Arc::new(ConfigOptions::default()),
        };
        SparkMakeInterval::new().invoke_with_args(args)
    }

    #[test]
    fn zero_args_returns_zero_seconds() -> Result<()> {
        let number_rows = 2;
        let res: ColumnarValue = invoke_make_interval_with_args(vec![], number_rows)?;

        match res {
            ColumnarValue::Array(arr) => {
                let arr = arr
                    .as_any()
                    .downcast_ref::<IntervalMonthDayNanoArray>()
                    .ok_or_else(|| {
                        internal_datafusion_err!("expected IntervalMonthDayNanoArray")
                    })?;
                if arr.len() != number_rows {
                    return internal_err!(
                        "expected array length {number_rows}, got {}",
                        arr.len()
                    );
                }
                for i in 0..number_rows {
                    let iv = arr.value(i);
                    if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
                        return internal_err!(
                            "row {i}: expected (0,0,0), got ({},{},{})",
                            iv.months,
                            iv.days,
                            iv.nanoseconds
                        );
                    }
                }
            }
            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
                if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
                    return internal_err!(
                        "expected scalar 0s, got ({},{},{})",
                        iv.months,
                        iv.days,
                        iv.nanoseconds
                    );
                }
            }
            other => {
                return internal_err!(
                    "expected Array or Scalar IntervalMonthDayNano, got {other:?}"
                );
            }
        }

        Ok(())
    }
}