1use std::sync::Arc;
19
20use arrow::array::{Array, ArrayRef, IntervalMonthDayNanoBuilder, PrimitiveArray};
21use arrow::datatypes::DataType::Interval;
22use arrow::datatypes::IntervalUnit::MonthDayNano;
23use arrow::datatypes::{DataType, IntervalMonthDayNano};
24use datafusion_common::types::{NativeType, logical_float64, logical_int32};
25use datafusion_common::{DataFusionError, Result, ScalarValue, plan_datafusion_err};
26use datafusion_expr::{
27 Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
28 TypeSignatureClass, Volatility,
29};
30use datafusion_functions::utils::make_scalar_function;
31
32#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct SparkMakeInterval {
34 signature: Signature,
35}
36
37impl Default for SparkMakeInterval {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl SparkMakeInterval {
44 pub fn new() -> Self {
45 let int32 = Coercion::new_implicit(
46 TypeSignatureClass::Native(logical_int32()),
47 vec![TypeSignatureClass::Integer],
48 NativeType::Int32,
49 );
50
51 let float64 = Coercion::new_implicit(
52 TypeSignatureClass::Native(logical_float64()),
53 vec![TypeSignatureClass::Numeric],
54 NativeType::Float64,
55 );
56
57 let variants = vec![
58 TypeSignature::Nullary,
59 TypeSignature::Coercible(vec![int32.clone()]),
61 TypeSignature::Coercible(vec![int32.clone(), int32.clone()]),
63 TypeSignature::Coercible(vec![int32.clone(), int32.clone(), int32.clone()]),
65 TypeSignature::Coercible(vec![
67 int32.clone(),
68 int32.clone(),
69 int32.clone(),
70 int32.clone(),
71 ]),
72 TypeSignature::Coercible(vec![
74 int32.clone(),
75 int32.clone(),
76 int32.clone(),
77 int32.clone(),
78 int32.clone(),
79 ]),
80 TypeSignature::Coercible(vec![
82 int32.clone(),
83 int32.clone(),
84 int32.clone(),
85 int32.clone(),
86 int32.clone(),
87 int32.clone(),
88 ]),
89 TypeSignature::Coercible(vec![
91 int32.clone(),
92 int32.clone(),
93 int32.clone(),
94 int32.clone(),
95 int32.clone(),
96 int32.clone(),
97 float64.clone(),
98 ]),
99 ];
100
101 Self {
102 signature: Signature::one_of(variants, Volatility::Immutable),
103 }
104 }
105}
106
107impl ScalarUDFImpl for SparkMakeInterval {
108 fn name(&self) -> &str {
109 "make_interval"
110 }
111
112 fn signature(&self) -> &Signature {
113 &self.signature
114 }
115
116 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
117 Ok(Interval(MonthDayNano))
118 }
119
120 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
121 if args.args.is_empty() {
122 return Ok(ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(
123 Some(IntervalMonthDayNano::new(0, 0, 0)),
124 )));
125 }
126 make_scalar_function(make_interval_kernel, vec![])(&args.args)
127 }
128}
129
130fn make_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
131 use arrow::array::AsArray;
132 use arrow::datatypes::{Float64Type, Int32Type};
133
134 let n_rows = args[0].len();
135
136 let years = args[0]
137 .as_primitive_opt::<Int32Type>()
138 .ok_or_else(|| plan_datafusion_err!("make_interval arg[0] must be Int32"))?;
139 let months = args
140 .get(1)
141 .map(|a| {
142 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
143 plan_datafusion_err!("make_dt_interval arg[1] must be Int32")
144 })
145 })
146 .transpose()?;
147 let weeks = args
148 .get(2)
149 .map(|a| {
150 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
151 plan_datafusion_err!("make_dt_interval arg[2] must be Int32")
152 })
153 })
154 .transpose()?;
155 let days: Option<&PrimitiveArray<Int32Type>> = args
156 .get(3)
157 .map(|a| {
158 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
159 plan_datafusion_err!("make_dt_interval arg[3] must be Int32")
160 })
161 })
162 .transpose()?;
163 let hours: Option<&PrimitiveArray<Int32Type>> = args
164 .get(4)
165 .map(|a| {
166 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
167 plan_datafusion_err!("make_dt_interval arg[4] must be Int32")
168 })
169 })
170 .transpose()?;
171 let mins: Option<&PrimitiveArray<Int32Type>> = args
172 .get(5)
173 .map(|a| {
174 a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
175 plan_datafusion_err!("make_dt_interval arg[5] must be Int32")
176 })
177 })
178 .transpose()?;
179 let secs: Option<&PrimitiveArray<Float64Type>> = args
180 .get(6)
181 .map(|a| {
182 a.as_primitive_opt::<Float64Type>().ok_or_else(|| {
183 plan_datafusion_err!("make_dt_interval arg[6] must be Float64")
184 })
185 })
186 .transpose()?;
187
188 let mut builder = IntervalMonthDayNanoBuilder::with_capacity(n_rows);
189
190 for i in 0..n_rows {
191 let any_null_present = years.is_null(i)
193 || months.as_ref().is_some_and(|a| a.is_null(i))
194 || weeks.as_ref().is_some_and(|a| a.is_null(i))
195 || days.as_ref().is_some_and(|a| a.is_null(i))
196 || hours.as_ref().is_some_and(|a| a.is_null(i))
197 || mins.as_ref().is_some_and(|a| a.is_null(i))
198 || secs
199 .as_ref()
200 .is_some_and(|a| a.is_null(i) || !a.value(i).is_finite());
201
202 if any_null_present {
203 builder.append_null();
204 continue;
205 }
206
207 let y = years.value(i);
209 let mo = months.as_ref().map_or(0, |a| a.value(i));
210 let w = weeks.as_ref().map_or(0, |a| a.value(i));
211 let d = days.as_ref().map_or(0, |a| a.value(i));
212 let h = hours.as_ref().map_or(0, |a| a.value(i));
213 let mi = mins.as_ref().map_or(0, |a| a.value(i));
214 let s = secs.as_ref().map_or(0.0, |a| a.value(i));
215
216 match make_interval_month_day_nano(y, mo, w, d, h, mi, s) {
217 Some(v) => builder.append_value(v),
218 None => {
219 builder.append_null();
220 continue;
221 }
222 }
223 }
224
225 Ok(Arc::new(builder.finish()))
226}
227
228fn make_interval_month_day_nano(
229 year: i32,
230 month: i32,
231 week: i32,
232 day: i32,
233 hour: i32,
234 min: i32,
235 sec: f64,
236) -> Option<IntervalMonthDayNano> {
237 let months = year.checked_mul(12).and_then(|v| v.checked_add(month))?;
239 let total_days = week.checked_mul(7).and_then(|v| v.checked_add(day))?;
240
241 let hours_nanos = (hour as i64).checked_mul(3_600_000_000_000)?;
242 let mins_nanos = (min as i64).checked_mul(60_000_000_000)?;
243
244 let sec_int = sec.trunc() as i64;
245 let frac = sec - sec.trunc();
246 let mut frac_nanos = (frac * 1_000_000_000.0).round() as i64;
247
248 if frac_nanos.abs() >= 1_000_000_000 {
249 if frac_nanos > 0 {
250 frac_nanos -= 1_000_000_000;
251 } else {
252 frac_nanos += 1_000_000_000;
253 }
254 }
255
256 let secs_nanos = sec_int.checked_mul(1_000_000_000)?;
257
258 let total_nanos = hours_nanos
259 .checked_add(mins_nanos)
260 .and_then(|v| v.checked_add(secs_nanos))
261 .and_then(|v| v.checked_add(frac_nanos))?;
262
263 Some(IntervalMonthDayNano::new(months, total_days, total_nanos))
264}
265
#[cfg(test)]
mod tests {
    use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
    use arrow::datatypes::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{
        Result, assert_eq_or_internal_err, internal_datafusion_err, internal_err,
    };

    use super::*;

    /// Invokes the array kernel directly.
    fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
        make_interval_kernel(&arrs)
    }

    /// Runs the kernel on a single row built from six `Int32` components
    /// (`years, months, weeks, days, hours, mins`) and a `Float64` `secs`.
    fn run_single_row(ints: [i32; 6], secs: f64) -> ArrayRef {
        let mut cols: Vec<ArrayRef> = ints
            .into_iter()
            .map(|v| Arc::new(Int32Array::from(vec![Some(v)])) as ArrayRef)
            .collect();
        cols.push(Arc::new(Float64Array::from(vec![Some(secs)])));
        run_make_interval_month_day_nano(cols).unwrap()
    }

    /// Downcasts kernel output to the concrete interval array type.
    fn as_interval_array(arr: &ArrayRef) -> &IntervalMonthDayNanoArray {
        arr.as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano"))
            .unwrap()
    }

    /// Asserts that every row of the kernel output is NULL.
    fn assert_all_null(arr: &ArrayRef) {
        let arr = as_interval_array(arr);
        for i in 0..arr.len() {
            assert!(arr.is_null(i), "row {i} should be NULL");
        }
    }

    #[test]
    fn nulls_propagate_per_row() {
        const ROWS: usize = 9;
        // Integer column `c` is NULL on row `c` and holds `row + 1` elsewhere,
        // so each of the first six rows has exactly one NULL integer component.
        let mut cols: Vec<ArrayRef> = (0..6)
            .map(|c| {
                let col: Int32Array = (0..ROWS)
                    .map(|r| (r != c).then_some(r as i32 + 1))
                    .collect();
                Arc::new(col) as ArrayRef
            })
            .collect();
        // Rows 6..=8 are covered by a NULL, +inf and -inf `secs` value.
        cols.push(Arc::new(Float64Array::from(vec![
            Some(1.0),
            Some(2.0),
            Some(3.0),
            Some(4.0),
            Some(5.0),
            Some(6.0),
            None,
            Some(f64::INFINITY),
            Some(f64::NEG_INFINITY),
        ])));

        let out = run_make_interval_month_day_nano(cols).unwrap();
        assert_all_null(&out);
    }

    #[test]
    fn error_months_overflow_should_be_null() {
        assert_all_null(&run_single_row([i32::MAX, 1, 0, 0, 0, 0], 0.0));
    }

    #[test]
    fn error_days_overflow_should_be_null() {
        assert_all_null(&run_single_row([0, 1, i32::MAX, 0, 0, 0], 0.0));
    }

    #[test]
    fn error_min_overflow_should_be_null() {
        assert_all_null(&run_single_row([0, 0, 0, 0, 0, i32::MAX], 0.0));
    }

    #[test]
    fn error_sec_overflow_should_be_null() {
        assert_all_null(&run_single_row([0; 6], f64::MAX));
    }

    #[test]
    fn happy_path_all_present_single_row() {
        let out = run_single_row([1, 2, 3, 4, 5, 6], 7.25);
        assert_eq!(out.data_type(), &Interval(MonthDayNano));

        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v: IntervalMonthDayNano = out.value(0);
        // 1 year + 2 months
        assert_eq!(v.months, 12 + 2);
        // 3 weeks + 4 days
        assert_eq!(v.days, 3 * 7 + 4);
        // 5h 6m 7.25s
        let expected_nanos = (5_i64 * 3600 + 6 * 60 + 7) * 1_000_000_000 + 250_000_000;
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    #[test]
    fn negative_components_and_fractional_seconds() {
        let out = run_single_row([-1, -2, -1, -1, -1, -1], -1.5);
        let out = as_interval_array(&out);
        assert_eq!(out.len(), 1);
        assert_eq!(out.null_count(), 0);

        let v = out.value(0);
        assert_eq!(v.months, -12 + (-2));
        assert_eq!(v.days, -7 + (-1));
        let expected_nanos = -((3600_i64 + 60 + 1) * 1_000_000_000 + 500_000_000);
        assert_eq!(v.nanoseconds, expected_nanos);
    }

    /// Invokes the UDF through `invoke_with_args` with nullable argument fields.
    fn invoke_make_interval_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields = args
            .iter()
            .map(|arg| Field::new("a", arg.data_type(), true).into())
            .collect::<Vec<_>>();
        SparkMakeInterval::new().invoke_with_args(ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Field::new("f", Interval(MonthDayNano), true).into(),
            config_options: Arc::new(ConfigOptions::default()),
        })
    }

    #[test]
    fn zero_args_returns_zero_seconds() -> Result<()> {
        let number_rows = 2;

        match invoke_make_interval_with_args(vec![], number_rows)? {
            ColumnarValue::Array(arr) => {
                let arr = arr
                    .as_any()
                    .downcast_ref::<IntervalMonthDayNanoArray>()
                    .ok_or_else(|| {
                        internal_datafusion_err!("expected IntervalMonthDayNanoArray")
                    })?;
                assert_eq_or_internal_err!(
                    arr.len(),
                    number_rows,
                    "expected array length {number_rows}"
                );
                for i in 0..number_rows {
                    let iv = arr.value(i);
                    assert_eq_or_internal_err!(
                        (iv.months, iv.days, iv.nanoseconds),
                        (0, 0, 0),
                        "row {i}: expected (0,0,0), got ({},{},{})",
                        iv.months,
                        iv.days,
                        iv.nanoseconds
                    );
                }
            }
            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
                assert_eq_or_internal_err!(
                    (iv.months, iv.days, iv.nanoseconds),
                    (0, 0, 0),
                    "expected scalar 0s, got ({},{},{})",
                    iv.months,
                    iv.days,
                    iv.nanoseconds
                );
            }
            other => {
                return internal_err!(
                    "expected Array or Scalar IntervalMonthDayNano, got {other:?}"
                );
            }
        }

        Ok(())
    }
}