Skip to main content

datafusion_spark/function/math/
modulus.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{Scalar, new_null_array};
19use arrow::compute::kernels::numeric::add;
20use arrow::compute::kernels::{
21    cmp::{eq, lt},
22    numeric::rem,
23    zip::zip,
24};
25use arrow::datatypes::DataType;
26use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err};
27use datafusion_expr::{
28    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
29};
30use std::any::Any;
31
32/// Attempts `rem(left, right)` with per-element divide-by-zero handling.
33/// In ANSI mode, any zero divisor causes an error.
34/// In legacy mode (ANSI off), positions where the divisor is zero return NULL
35/// while other positions compute normally.
36fn try_rem(
37    left: &arrow::array::ArrayRef,
38    right: &arrow::array::ArrayRef,
39    enable_ansi_mode: bool,
40) -> Result<arrow::array::ArrayRef> {
41    match rem(left, right) {
42        Ok(result) => Ok(result),
43        Err(arrow::error::ArrowError::DivideByZero) if !enable_ansi_mode => {
44            // Integer rem fails when ANY divisor element is zero.
45            // Handle per-element: null out zero divisors
46            let zero = ScalarValue::new_zero(right.data_type())?.to_array()?;
47            let zero = Scalar::new(zero);
48            let null = Scalar::new(new_null_array(right.data_type(), 1));
49            let is_zero = eq(right, &zero)?;
50            let safe_right = zip(&is_zero, &null, right)?;
51            Ok(rem(left, &safe_right)?)
52        }
53        Err(e) => Err(e.into()),
54    }
55}
56
57/// Spark-compatible `mod` function
58/// In ANSI mode, division by zero throws an error.
59/// In legacy mode, division by zero returns NULL (Spark behavior).
60pub fn spark_mod(
61    args: &[ColumnarValue],
62    enable_ansi_mode: bool,
63) -> Result<ColumnarValue> {
64    assert_eq_or_internal_err!(args.len(), 2, "mod expects exactly two arguments");
65    let args = ColumnarValue::values_to_arrays(args)?;
66    let result = try_rem(&args[0], &args[1], enable_ansi_mode)?;
67    Ok(ColumnarValue::Array(result))
68}
69
70/// Spark-compatible `pmod` function
71/// In ANSI mode, division by zero throws an error.
72/// In legacy mode, division by zero returns NULL (Spark behavior).
73pub fn spark_pmod(
74    args: &[ColumnarValue],
75    enable_ansi_mode: bool,
76) -> Result<ColumnarValue> {
77    assert_eq_or_internal_err!(args.len(), 2, "pmod expects exactly two arguments");
78    let args = ColumnarValue::values_to_arrays(args)?;
79    let left = &args[0];
80    let right = &args[1];
81    let zero = ScalarValue::new_zero(left.data_type())?.to_array_of_size(left.len())?;
82    let result = try_rem(left, right, enable_ansi_mode)?;
83    let neg = lt(&result, &zero)?;
84    let plus = zip(&neg, right, &zero)?;
85    let result = add(&plus, &result)?;
86    let result = try_rem(&result, right, enable_ansi_mode)?;
87    Ok(ColumnarValue::Array(result))
88}
89
90/// SparkMod implements the Spark-compatible modulo function
91#[derive(Debug, PartialEq, Eq, Hash)]
92pub struct SparkMod {
93    signature: Signature,
94}
95
96impl Default for SparkMod {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102impl SparkMod {
103    pub fn new() -> Self {
104        Self {
105            signature: Signature::numeric(2, Volatility::Immutable),
106        }
107    }
108}
109
110impl ScalarUDFImpl for SparkMod {
111    fn as_any(&self) -> &dyn Any {
112        self
113    }
114
115    fn name(&self) -> &str {
116        "mod"
117    }
118
119    fn signature(&self) -> &Signature {
120        &self.signature
121    }
122
123    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
124        assert_eq_or_internal_err!(
125            arg_types.len(),
126            2,
127            "mod expects exactly two arguments"
128        );
129
130        // Return the same type as the first argument for simplicity
131        // Arrow's rem function handles type promotion internally
132        Ok(arg_types[0].clone())
133    }
134
135    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
136        spark_mod(&args.args, args.config_options.execution.enable_ansi_mode)
137    }
138}
139
140/// SparkMod implements the Spark-compatible modulo function
141#[derive(Debug, PartialEq, Eq, Hash)]
142pub struct SparkPmod {
143    signature: Signature,
144}
145
146impl Default for SparkPmod {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl SparkPmod {
153    pub fn new() -> Self {
154        Self {
155            signature: Signature::numeric(2, Volatility::Immutable),
156        }
157    }
158}
159
160impl ScalarUDFImpl for SparkPmod {
161    fn as_any(&self) -> &dyn Any {
162        self
163    }
164
165    fn name(&self) -> &str {
166        "pmod"
167    }
168
169    fn signature(&self) -> &Signature {
170        &self.signature
171    }
172
173    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
174        assert_eq_or_internal_err!(
175            arg_types.len(),
176            2,
177            "pmod expects exactly two arguments"
178        );
179
180        // Return the same type as the first argument for simplicity
181        // Arrow's rem function handles type promotion internally
182        Ok(arg_types[0].clone())
183    }
184
185    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
186        spark_pmod(&args.args, args.config_options.execution.enable_ansi_mode)
187    }
188}
189
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use super::*;
    use arrow::array::*;
    use datafusion_common::ScalarValue;

    /// Wraps two arrays as the argument list for `spark_mod` / `spark_pmod`.
    fn array_args(
        left: impl Array + 'static,
        right: impl Array + 'static,
    ) -> [ColumnarValue; 2] {
        [
            ColumnarValue::Array(Arc::new(left)),
            ColumnarValue::Array(Arc::new(right)),
        ]
    }

    /// Wraps an array dividend and a scalar divisor as an argument list.
    fn scalar_args(left: impl Array + 'static, right: ScalarValue) -> [ColumnarValue; 2] {
        [
            ColumnarValue::Array(Arc::new(left)),
            ColumnarValue::Scalar(right),
        ]
    }

    /// Unwraps an array result and downcasts it to the concrete array type `T`.
    fn into_typed<T: Array + Clone + 'static>(value: ColumnarValue) -> T {
        let ColumnarValue::Array(array) = value else {
            panic!("Expected array result");
        };
        array.as_any().downcast_ref::<T>().unwrap().clone()
    }

    #[test]
    fn test_mod_int32() {
        let args = array_args(
            Int32Array::from(vec![Some(10), Some(7), Some(15), None]),
            Int32Array::from(vec![Some(3), Some(2), Some(4), Some(5)]),
        );

        let out: Int32Array = into_typed(spark_mod(&args, false).unwrap());

        assert_eq!(out.value(0), 1); // 10 % 3 = 1
        assert_eq!(out.value(1), 1); // 7 % 2 = 1
        assert_eq!(out.value(2), 3); // 15 % 4 = 3
        assert!(out.is_null(3)); // None % 5 = None
    }

    #[test]
    fn test_mod_int64() {
        let args = array_args(
            Int64Array::from(vec![Some(100), Some(50), Some(200)]),
            Int64Array::from(vec![Some(30), Some(25), Some(60)]),
        );

        let out: Int64Array = into_typed(spark_mod(&args, false).unwrap());

        assert_eq!(out.value(0), 10); // 100 % 30 = 10
        assert_eq!(out.value(1), 0); // 50 % 25 = 0
        assert_eq!(out.value(2), 20); // 200 % 60 = 20
    }

    #[test]
    fn test_mod_float64() {
        let (nan, inf) = (f64::NAN, f64::INFINITY);
        let args = array_args(
            Float64Array::from(vec![
                Some(10.5),
                Some(7.2),
                Some(15.8),
                Some(nan),
                Some(inf),
                Some(5.0),
                Some(5.0),
                Some(nan),
                Some(inf),
            ]),
            Float64Array::from(vec![
                Some(3.0),
                Some(2.5),
                Some(4.2),
                Some(2.0),
                Some(2.0),
                Some(nan),
                Some(inf),
                Some(inf),
                Some(nan),
            ]),
        );

        let out: Float64Array = into_typed(spark_mod(&args, false).unwrap());

        // (index, expected value, tolerance)
        let finite = [
            (0, 1.5_f64, f64::EPSILON), // 10.5 % 3.0 = 1.5
            (1, 2.2, f64::EPSILON),     // 7.2 % 2.5 = 2.2
            (2, 3.2, f64::EPSILON),     // 15.8 % 4.2 = 3.2
            (6, 5.0, f64::EPSILON),     // 5.0 % inf = 5.0
        ];
        for (idx, expected, tolerance) in finite {
            assert!((out.value(idx) - expected).abs() < tolerance);
        }

        // nan % 2.0, inf % 2.0 (IEEE 754), 5.0 % nan, nan % inf, inf % nan
        for idx in [3, 4, 5, 7, 8] {
            assert!(out.value(idx).is_nan());
        }
    }

    #[test]
    fn test_mod_float32() {
        let (nan, inf) = (f32::NAN, f32::INFINITY);
        let args = array_args(
            Float32Array::from(vec![
                Some(10.5),
                Some(7.2),
                Some(15.8),
                Some(nan),
                Some(inf),
                Some(5.0),
                Some(5.0),
                Some(nan),
                Some(inf),
            ]),
            Float32Array::from(vec![
                Some(3.0),
                Some(2.5),
                Some(4.2),
                Some(2.0),
                Some(2.0),
                Some(nan),
                Some(inf),
                Some(inf),
                Some(nan),
            ]),
        );

        let out: Float32Array = into_typed(spark_mod(&args, false).unwrap());

        // (index, expected value, tolerance)
        let finite = [
            (0, 1.5_f32, f32::EPSILON),    // 10.5 % 3.0 = 1.5
            (1, 2.2, f32::EPSILON * 3.0),  // 7.2 % 2.5 = 2.2
            (2, 3.2, f32::EPSILON * 10.0), // 15.8 % 4.2 = 3.2
            (6, 5.0, f32::EPSILON),        // 5.0 % inf = 5.0
        ];
        for (idx, expected, tolerance) in finite {
            assert!((out.value(idx) - expected).abs() < tolerance);
        }

        // nan % 2.0, inf % 2.0 (IEEE 754), 5.0 % nan, nan % inf, inf % nan
        for idx in [3, 4, 5, 7, 8] {
            assert!(out.value(idx).is_nan());
        }
    }

    #[test]
    fn test_mod_scalar() {
        let args = scalar_args(
            Int32Array::from(vec![Some(10), Some(7), Some(15)]),
            ScalarValue::Int32(Some(3)),
        );

        let out: Int32Array = into_typed(spark_mod(&args, false).unwrap());

        assert_eq!(out.value(0), 1); // 10 % 3 = 1
        assert_eq!(out.value(1), 1); // 7 % 3 = 1
        assert_eq!(out.value(2), 0); // 15 % 3 = 0
    }

    #[test]
    fn test_mod_wrong_arg_count() {
        let only_arg = ColumnarValue::Array(Arc::new(Int32Array::from(vec![Some(10)])));

        assert!(spark_mod(&[only_arg], false).is_err());
    }

    #[test]
    fn test_mod_zero_division_legacy() {
        // In legacy mode (ANSI off), division by zero returns NULL per-element
        let args = array_args(
            Int32Array::from(vec![Some(10), Some(7), Some(15)]),
            Int32Array::from(vec![Some(0), Some(2), Some(4)]),
        );

        let out: Int32Array = into_typed(spark_mod(&args, false).unwrap());

        assert!(out.is_null(0)); // 10 % 0 = NULL
        assert_eq!(out.value(1), 1); // 7 % 2 = 1
        assert_eq!(out.value(2), 3); // 15 % 4 = 3
    }

    #[test]
    fn test_mod_zero_division_ansi() {
        // In ANSI mode, division by zero should error
        let args = array_args(
            Int32Array::from(vec![Some(10), Some(7), Some(15)]),
            Int32Array::from(vec![Some(0), Some(2), Some(4)]),
        );

        assert!(spark_mod(&args, true).is_err());
    }

    // PMOD tests
    #[test]
    fn test_pmod_int32() {
        let args = array_args(
            Int32Array::from(vec![Some(10), Some(-7), Some(15), Some(-15), None]),
            Int32Array::from(vec![Some(3), Some(3), Some(4), Some(4), Some(5)]),
        );

        let out: Int32Array = into_typed(spark_pmod(&args, false).unwrap());

        assert_eq!(out.value(0), 1); // 10 pmod 3 = 1
        assert_eq!(out.value(1), 2); // -7 pmod 3 = 2 (positive remainder)
        assert_eq!(out.value(2), 3); // 15 pmod 4 = 3
        assert_eq!(out.value(3), 1); // -15 pmod 4 = 1 (positive remainder)
        assert!(out.is_null(4)); // None pmod 5 = None
    }

    #[test]
    fn test_pmod_int64() {
        let args = array_args(
            Int64Array::from(vec![Some(100), Some(-50), Some(200), Some(-200)]),
            Int64Array::from(vec![Some(30), Some(30), Some(60), Some(60)]),
        );

        let out: Int64Array = into_typed(spark_pmod(&args, false).unwrap());

        assert_eq!(out.value(0), 10); // 100 pmod 30 = 10
        assert_eq!(out.value(1), 10); // -50 pmod 30 = 10 (positive remainder)
        assert_eq!(out.value(2), 20); // 200 pmod 60 = 20
        assert_eq!(out.value(3), 40); // -200 pmod 60 = 40 (positive remainder)
    }

    #[test]
    fn test_pmod_float64() {
        let (nan, inf) = (f64::NAN, f64::INFINITY);
        let args = array_args(
            Float64Array::from(vec![
                Some(10.5),
                Some(-7.2),
                Some(15.8),
                Some(-15.8),
                Some(nan),
                Some(inf),
                Some(5.0),
                Some(-5.0),
            ]),
            Float64Array::from(vec![
                Some(3.0),
                Some(3.0),
                Some(4.2),
                Some(4.2),
                Some(2.0),
                Some(2.0),
                Some(inf),
                Some(inf),
            ]),
        );

        let out: Float64Array = into_typed(spark_pmod(&args, false).unwrap());

        // (index, expected value, tolerance)
        let finite = [
            (0, 1.5_f64, f64::EPSILON),   // 10.5 pmod 3.0 = 1.5
            (1, 1.8, f64::EPSILON * 3.0), // -7.2 pmod 3.0 = 1.8 (positive)
            (2, 3.2, f64::EPSILON * 3.0), // 15.8 pmod 4.2 = 3.2
            (3, 1.0, f64::EPSILON * 3.0), // -15.8 pmod 4.2 = 1.0 (positive)
            (6, 5.0, f64::EPSILON),       // 5.0 pmod inf = 5.0
        ];
        for (idx, expected, tolerance) in finite {
            assert!((out.value(idx) - expected).abs() < tolerance);
        }

        // nan pmod 2.0 = nan, inf pmod 2.0 = nan (IEEE 754), -5.0 pmod inf = NaN
        for idx in [4, 5, 7] {
            assert!(out.value(idx).is_nan());
        }
    }

    #[test]
    fn test_pmod_float32() {
        let (nan, inf) = (f32::NAN, f32::INFINITY);
        let args = array_args(
            Float32Array::from(vec![
                Some(10.5),
                Some(-7.2),
                Some(15.8),
                Some(-15.8),
                Some(nan),
                Some(inf),
                Some(5.0),
                Some(-5.0),
            ]),
            Float32Array::from(vec![
                Some(3.0),
                Some(3.0),
                Some(4.2),
                Some(4.2),
                Some(2.0),
                Some(2.0),
                Some(inf),
                Some(inf),
            ]),
        );

        let out: Float32Array = into_typed(spark_pmod(&args, false).unwrap());

        // (index, expected value, tolerance)
        let finite = [
            (0, 1.5_f32, f32::EPSILON),    // 10.5 pmod 3.0 = 1.5
            (1, 1.8, f32::EPSILON * 3.0),  // -7.2 pmod 3.0 = 1.8 (positive)
            (2, 3.2, f32::EPSILON * 10.0), // 15.8 pmod 4.2 = 3.2
            (3, 1.0, f32::EPSILON * 10.0), // -15.8 pmod 4.2 = 1.0 (positive)
            (6, 5.0, f32::EPSILON * 10.0), // 5.0 pmod inf = 5.0
        ];
        for (idx, expected, tolerance) in finite {
            assert!((out.value(idx) - expected).abs() < tolerance);
        }

        // nan pmod 2.0 = nan, inf pmod 2.0 = nan (IEEE 754), -5.0 pmod inf = NaN
        for idx in [4, 5, 7] {
            assert!(out.value(idx).is_nan());
        }
    }

    #[test]
    fn test_pmod_scalar() {
        let args = scalar_args(
            Int32Array::from(vec![Some(10), Some(-7), Some(15), Some(-15)]),
            ScalarValue::Int32(Some(3)),
        );

        let out: Int32Array = into_typed(spark_pmod(&args, false).unwrap());

        assert_eq!(out.value(0), 1); // 10 pmod 3 = 1
        assert_eq!(out.value(1), 2); // -7 pmod 3 = 2 (positive remainder)
        assert_eq!(out.value(2), 0); // 15 pmod 3 = 0
        assert_eq!(out.value(3), 0); // -15 pmod 3 = 0 (positive remainder)
    }

    #[test]
    fn test_pmod_wrong_arg_count() {
        let only_arg = ColumnarValue::Array(Arc::new(Int32Array::from(vec![Some(10)])));

        assert!(spark_pmod(&[only_arg], false).is_err());
    }

    #[test]
    fn test_pmod_zero_division_legacy() {
        // In legacy mode (ANSI off), division by zero returns NULL per-element
        let args = array_args(
            Int32Array::from(vec![Some(10), Some(-7), Some(15)]),
            Int32Array::from(vec![Some(0), Some(0), Some(4)]),
        );

        let out: Int32Array = into_typed(spark_pmod(&args, false).unwrap());

        assert!(out.is_null(0)); // 10 pmod 0 = NULL
        assert!(out.is_null(1)); // -7 pmod 0 = NULL
        assert_eq!(out.value(2), 3); // 15 pmod 4 = 3
    }

    #[test]
    fn test_pmod_zero_division_ansi() {
        // In ANSI mode, division by zero should error
        let args = array_args(
            Int32Array::from(vec![Some(10), Some(-7), Some(15)]),
            Int32Array::from(vec![Some(0), Some(0), Some(4)]),
        );

        assert!(spark_pmod(&args, true).is_err());
    }

    #[test]
    fn test_pmod_negative_divisor() {
        // PMOD with negative divisor should still work like regular mod
        let args = array_args(
            Int32Array::from(vec![Some(10), Some(-7), Some(15)]),
            Int32Array::from(vec![Some(-3), Some(-3), Some(-4)]),
        );

        let out: Int32Array = into_typed(spark_pmod(&args, false).unwrap());

        assert_eq!(out.value(0), 1); // 10 pmod -3 = 1
        assert_eq!(out.value(1), -1); // -7 pmod -3 = -1
        assert_eq!(out.value(2), 3); // 15 pmod -4 = 3
    }

    #[test]
    fn test_pmod_edge_cases() {
        // Test edge cases for PMOD: every dividend is paired with divisor 5
        let dividends = vec![
            Some(0),  // 0 pmod 5 = 0
            Some(-1), // -1 pmod 5 = 4
            Some(1),  // 1 pmod 5 = 1
            Some(-5), // -5 pmod 5 = 0
            Some(5),  // 5 pmod 5 = 0
            Some(-6), // -6 pmod 5 = 4
            Some(6),  // 6 pmod 5 = 1
        ];
        let divisors = vec![Some(5); 7];
        let args = array_args(Int32Array::from(dividends), Int32Array::from(divisors));

        let out: Int32Array = into_typed(spark_pmod(&args, false).unwrap());

        assert_eq!(out.value(0), 0); // 0 pmod 5 = 0
        assert_eq!(out.value(1), 4); // -1 pmod 5 = 4
        assert_eq!(out.value(2), 1); // 1 pmod 5 = 1
        assert_eq!(out.value(3), 0); // -5 pmod 5 = 0
        assert_eq!(out.value(4), 0); // 5 pmod 5 = 0
        assert_eq!(out.value(5), 4); // -6 pmod 5 = 4
        assert_eq!(out.value(6), 1); // 6 pmod 5 = 1
    }
}