1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{Array, ArrayRef, IntervalMonthDayNanoBuilder, PrimitiveArray};
22use arrow::datatypes::DataType::Interval;
23use arrow::datatypes::IntervalUnit::MonthDayNano;
24use arrow::datatypes::{DataType, IntervalMonthDayNano};
25use datafusion_common::types::{NativeType, logical_float64, logical_int32};
26use datafusion_common::{DataFusionError, Result, ScalarValue, plan_datafusion_err};
27use datafusion_expr::{
28 Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
29 TypeSignatureClass, Volatility,
30};
31use datafusion_functions::utils::make_scalar_function;
32
/// Scalar UDF registered under the name `make_interval`.
///
/// It accepts between zero and seven arguments (up to six integers followed
/// by an optional floating-point seconds value) and always reports
/// `Interval(MonthDayNano)` as its return type.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkMakeInterval {
    /// Accepted argument signatures, built once in [`SparkMakeInterval::new`].
    signature: Signature,
}
37
38impl Default for SparkMakeInterval {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl SparkMakeInterval {
45 pub fn new() -> Self {
46 let int32 = Coercion::new_implicit(
47 TypeSignatureClass::Native(logical_int32()),
48 vec![TypeSignatureClass::Integer],
49 NativeType::Int32,
50 );
51
52 let float64 = Coercion::new_implicit(
53 TypeSignatureClass::Native(logical_float64()),
54 vec![TypeSignatureClass::Numeric],
55 NativeType::Float64,
56 );
57
58 let variants = vec![
59 TypeSignature::Nullary,
60 TypeSignature::Coercible(vec![int32.clone()]),
62 TypeSignature::Coercible(vec![int32.clone(), int32.clone()]),
64 TypeSignature::Coercible(vec![int32.clone(), int32.clone(), int32.clone()]),
66 TypeSignature::Coercible(vec![
68 int32.clone(),
69 int32.clone(),
70 int32.clone(),
71 int32.clone(),
72 ]),
73 TypeSignature::Coercible(vec![
75 int32.clone(),
76 int32.clone(),
77 int32.clone(),
78 int32.clone(),
79 int32.clone(),
80 ]),
81 TypeSignature::Coercible(vec![
83 int32.clone(),
84 int32.clone(),
85 int32.clone(),
86 int32.clone(),
87 int32.clone(),
88 int32.clone(),
89 ]),
90 TypeSignature::Coercible(vec![
92 int32.clone(),
93 int32.clone(),
94 int32.clone(),
95 int32.clone(),
96 int32.clone(),
97 int32.clone(),
98 float64.clone(),
99 ]),
100 ];
101
102 Self {
103 signature: Signature::one_of(variants, Volatility::Immutable),
104 }
105 }
106}
107
108impl ScalarUDFImpl for SparkMakeInterval {
109 fn as_any(&self) -> &dyn Any {
110 self
111 }
112
113 fn name(&self) -> &str {
114 "make_interval"
115 }
116
117 fn signature(&self) -> &Signature {
118 &self.signature
119 }
120
121 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
122 Ok(Interval(MonthDayNano))
123 }
124
125 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
126 if args.args.is_empty() {
127 return Ok(ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(
128 Some(IntervalMonthDayNano::new(0, 0, 0)),
129 )));
130 }
131 make_scalar_function(make_interval_kernel, vec![])(&args.args)
132 }
133}
134
135fn make_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
136 use arrow::array::AsArray;
137 use arrow::datatypes::{Float64Type, Int32Type};
138
139 let n_rows = args[0].len();
140
141 let years = args[0]
142 .as_primitive_opt::<Int32Type>()
143 .ok_or_else(|| plan_datafusion_err!("make_interval arg[0] must be Int32"))?;
144 let months = args
145 .get(1)
146 .map(|a| {
147 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
148 plan_datafusion_err!("make_dt_interval arg[1] must be Int32")
149 })
150 })
151 .transpose()?;
152 let weeks = args
153 .get(2)
154 .map(|a| {
155 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
156 plan_datafusion_err!("make_dt_interval arg[2] must be Int32")
157 })
158 })
159 .transpose()?;
160 let days: Option<&PrimitiveArray<Int32Type>> = args
161 .get(3)
162 .map(|a| {
163 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
164 plan_datafusion_err!("make_dt_interval arg[3] must be Int32")
165 })
166 })
167 .transpose()?;
168 let hours: Option<&PrimitiveArray<Int32Type>> = args
169 .get(4)
170 .map(|a| {
171 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
172 plan_datafusion_err!("make_dt_interval arg[4] must be Int32")
173 })
174 })
175 .transpose()?;
176 let mins: Option<&PrimitiveArray<Int32Type>> = args
177 .get(5)
178 .map(|a| {
179 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
180 plan_datafusion_err!("make_dt_interval arg[5] must be Int32")
181 })
182 })
183 .transpose()?;
184 let secs: Option<&PrimitiveArray<Float64Type>> = args
185 .get(6)
186 .map(|a| {
187 a.as_primitive_opt::<Float64Type>().ok_or_else(|| {
188 plan_datafusion_err!("make_dt_interval arg[6] must be Float64")
189 })
190 })
191 .transpose()?;
192
193 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(n_rows);
194
195 for i in 0..n_rows {
196 let any_null_present = years.is_null(i)
198 || months.as_ref().is_some_and(|a| a.is_null(i))
199 || weeks.as_ref().is_some_and(|a| a.is_null(i))
200 || days.as_ref().is_some_and(|a| a.is_null(i))
201 || hours.as_ref().is_some_and(|a| a.is_null(i))
202 || mins.as_ref().is_some_and(|a| a.is_null(i))
203 || secs
204 .as_ref()
205 .is_some_and(|a| a.is_null(i) || !a.value(i).is_finite());
206
207 if any_null_present {
208 builder.append_null();
209 continue;
210 }
211
212 let y = years.value(i);
214 let mo = months.as_ref().map_or(0, |a| a.value(i));
215 let w = weeks.as_ref().map_or(0, |a| a.value(i));
216 let d = days.as_ref().map_or(0, |a| a.value(i));
217 let h = hours.as_ref().map_or(0, |a| a.value(i));
218 let mi = mins.as_ref().map_or(0, |a| a.value(i));
219 let s = secs.as_ref().map_or(0.0, |a| a.value(i));
220
221 match make_interval_month_day_nano(y, mo, w, d, h, mi, s) {
222 Some(v) => builder.append_value(v),
223 None => {
224 builder.append_null();
225 continue;
226 }
227 }
228 }
229
230 Ok(Arc::new(builder.finish()))
231}
232
233fn make_interval_month_day_nano(
234 year: i32,
235 month: i32,
236 week: i32,
237 day: i32,
238 hour: i32,
239 min: i32,
240 sec: f64,
241) -> Option<IntervalMonthDayNano> {
242 let months = year.checked_mul(12).and_then(|v| v.checked_add(month))?;
244 let total_days = week.checked_mul(7).and_then(|v| v.checked_add(day))?;
245
246 let hours_nanos = (hour as i64).checked_mul(3_600_000_000_000)?;
247 let mins_nanos = (min as i64).checked_mul(60_000_000_000)?;
248
249 let sec_int = sec.trunc() as i64;
250 let frac = sec - sec.trunc();
251 let mut frac_nanos = (frac * 1_000_000_000.0).round() as i64;
252
253 if frac_nanos.abs() >= 1_000_000_000 {
254 if frac_nanos > 0 {
255 frac_nanos -= 1_000_000_000;
256 } else {
257 frac_nanos += 1_000_000_000;
258 }
259 }
260
261 let secs_nanos = sec_int.checked_mul(1_000_000_000)?;
262
263 let total_nanos = hours_nanos
264 .checked_add(mins_nanos)
265 .and_then(|v| v.checked_add(secs_nanos))
266 .and_then(|v| v.checked_add(frac_nanos))?;
267
268 Some(IntervalMonthDayNano::new(months, total_days, total_nanos))
269}
270
#[cfg(test)]
mod tests {
    use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
    use arrow::datatypes::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{
        Result, assert_eq_or_internal_err, internal_datafusion_err, internal_err,
    };

    use super::*;

    /// Runs the kernel directly on already-materialized argument columns.
    fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
        make_interval_kernel(&arrs)
    }

    /// Builds the seven single-row argument columns: `ints` holds years,
    /// months, weeks, days, hours and minutes, in that order.
    fn single_row_args(ints: [i32; 6], secs: f64) -> Vec<ArrayRef> {
        let mut columns: Vec<ArrayRef> = ints
            .iter()
            .map(|&v| Arc::new(Int32Array::from(vec![Some(v)])) as ArrayRef)
            .collect();
        columns.push(Arc::new(Float64Array::from(vec![Some(secs)])) as ArrayRef);
        columns
    }

    /// Runs the kernel and asserts that every row of the output is NULL.
    fn assert_every_row_null(columns: Vec<ArrayRef>) {
        let result = run_make_interval_month_day_nano(columns).unwrap();
        let intervals = result
            .as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano"))
            .unwrap();

        for i in 0..intervals.len() {
            assert!(intervals.is_null(i), "row {i} should be NULL");
        }
    }

    #[test]
    fn nulls_propagate_per_row() {
        const ROWS: usize = 9;

        // Integer column `c` holds `row + 1` in every row except row `c`,
        // which is NULL, so rows 0..=5 each have exactly one NULL integer.
        let int_column = |null_row: usize| {
            let values: Vec<Option<i32>> = (0..ROWS)
                .map(|row| (row != null_row).then_some(row as i32 + 1))
                .collect();
            Arc::new(Int32Array::from(values)) as ArrayRef
        };

        let mut columns: Vec<ArrayRef> = (0..6).map(int_column).collect();
        // Rows 6..=8 are nulled through the seconds column instead:
        // NULL, +infinity and -infinity.
        columns.push(Arc::new(Float64Array::from(vec![
            Some(1.0),
            Some(2.0),
            Some(3.0),
            Some(4.0),
            Some(5.0),
            Some(6.0),
            None,
            Some(f64::INFINITY),
            Some(f64::NEG_INFINITY),
        ])) as ArrayRef);

        assert_every_row_null(columns);
    }

    #[test]
    fn error_months_overflow_should_be_null() {
        assert_every_row_null(single_row_args([i32::MAX, 1, 0, 0, 0, 0], 0.0));
    }

    #[test]
    fn error_days_overflow_should_be_null() {
        assert_every_row_null(single_row_args([0, 1, i32::MAX, 0, 0, 0], 0.0));
    }

    #[test]
    fn error_min_overflow_should_be_null() {
        assert_every_row_null(single_row_args([0, 0, 0, 0, 0, i32::MAX], 0.0));
    }

    #[test]
    fn error_sec_overflow_should_be_null() {
        assert_every_row_null(single_row_args([0, 0, 0, 0, 0, 0], f64::MAX));
    }

    #[test]
    fn happy_path_all_present_single_row() {
        let result =
            run_make_interval_month_day_nano(single_row_args([1, 2, 3, 4, 5, 6], 7.25))
                .unwrap();
        assert_eq!(result.data_type(), &Interval(MonthDayNano));

        let intervals = result
            .as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .unwrap();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals.null_count(), 0);

        let v: IntervalMonthDayNano = intervals.value(0);
        assert_eq!(v.months, 12 + 2);
        assert_eq!(v.days, 3 * 7 + 4);
        let expected_nanos = (5_i64 * 3600 + 6 * 60 + 7) * 1_000_000_000 + 250_000_000;
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn negative_components_and_fractional_seconds() {
        let result = run_make_interval_month_day_nano(single_row_args(
            [-1, -2, -1, -1, -1, -1],
            -1.5,
        ))
        .unwrap();
        let intervals = result
            .as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .unwrap();

        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals.null_count(), 0);
        let v = intervals.value(0);

        assert_eq!(v.months, -12 + (-2));
        assert_eq!(v.days, -7 + (-1));
        let expected_nanos = -((3600_i64 + 60 + 1) * 1_000_000_000 + 500_000_000);
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    /// Invokes the UDF through `invoke_with_args`, deriving one nullable
    /// field per argument from the argument's own data type.
    fn invoke_make_interval_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields = args
            .iter()
            .map(|arg| Field::new("a", arg.data_type(), true).into())
            .collect::<Vec<_>>();
        let return_field = Field::new("f", Interval(MonthDayNano), true).into();
        let config_options = Arc::new(ConfigOptions::default());

        SparkMakeInterval::new().invoke_with_args(ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field,
            config_options,
        })
    }

    #[test]
    fn zero_args_returns_zero_seconds() -> Result<()> {
        let number_rows = 2;

        // Either an array of `number_rows` zero intervals or a scalar zero
        // interval is accepted.
        match invoke_make_interval_with_args(vec![], number_rows)? {
            ColumnarValue::Array(arr) => {
                let arr = arr
                    .as_any()
                    .downcast_ref::<IntervalMonthDayNanoArray>()
                    .ok_or_else(|| {
                        internal_datafusion_err!("expected IntervalMonthDayNanoArray")
                    })?;
                assert_eq_or_internal_err!(
                    arr.len(),
                    number_rows,
                    "expected array length {number_rows}"
                );
                for i in 0..number_rows {
                    let iv = arr.value(i);
                    assert_eq_or_internal_err!(
                        (iv.months, iv.days, iv.nanoseconds),
                        (0, 0, 0),
                        "row {i}: expected (0,0,0), got ({},{},{})",
                        iv.months,
                        iv.days,
                        iv.nanoseconds
                    );
                }
            }
            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
                assert_eq_or_internal_err!(
                    (iv.months, iv.days, iv.nanoseconds),
                    (0, 0, 0),
                    "expected scalar 0s, got ({},{},{})",
                    iv.months,
                    iv.days,
                    iv.nanoseconds
                );
            }
            other => {
                return internal_err!(
                    "expected Array or Scalar IntervalMonthDayNano, got {other:?}"
                );
            }
        }

        Ok(())
    }
}