Skip to main content

alimentar/transform/
numeric.rs

1//! Numeric transforms: normalization, casting, and null filling.
2
3use std::sync::Arc;
4
5use arrow::{
6    array::{Array, RecordBatch},
7    datatypes::{Field, Schema},
8};
9
10use super::Transform;
11use crate::error::{Error, Result};
12
/// Strategy used to rescale the values of a numeric column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NormMethod {
    /// Min-max scaling, `(x - min) / (max - min)`: maps values onto `[0, 1]`.
    MinMax,
    /// Z-score standardization, `(x - mean) / std`: centers values around 0.
    ZScore,
    /// L2 scaling: divides each value by the column's L2 norm (unit length).
    L2,
}
23
24/// A transform that normalizes numeric columns.
25///
26/// Supports min-max scaling, z-score standardization, and L2 normalization.
27///
28/// # Example
29///
30/// ```ignore
31/// use alimentar::{Normalize, NormMethod};
32///
33/// // Min-max normalize specific columns
34/// let normalize = Normalize::new(vec!["feature1", "feature2"], NormMethod::MinMax);
35///
36/// // Z-score normalize all numeric columns
37/// let normalize = Normalize::all_numeric(NormMethod::ZScore);
38/// ```
39#[derive(Debug, Clone)]
40pub struct Normalize {
41    columns: Option<Vec<String>>,
42    method: NormMethod,
43}
44
45impl Normalize {
46    /// Creates a Normalize transform for specific columns.
47    pub fn new<S: Into<String>>(columns: impl IntoIterator<Item = S>, method: NormMethod) -> Self {
48        Self {
49            columns: Some(columns.into_iter().map(Into::into).collect()),
50            method,
51        }
52    }
53
54    /// Creates a Normalize transform that applies to all numeric columns.
55    pub fn all_numeric(method: NormMethod) -> Self {
56        Self {
57            columns: None,
58            method,
59        }
60    }
61
62    /// Returns the columns to normalize (None means all numeric).
63    pub fn columns(&self) -> Option<&[String]> {
64        self.columns.as_deref()
65    }
66
67    /// Returns the normalization method.
68    pub fn method(&self) -> NormMethod {
69        self.method
70    }
71
72    fn is_numeric_type(dtype: &arrow::datatypes::DataType) -> bool {
73        use arrow::datatypes::DataType;
74        matches!(
75            dtype,
76            DataType::Int8
77                | DataType::Int16
78                | DataType::Int32
79                | DataType::Int64
80                | DataType::UInt8
81                | DataType::UInt16
82                | DataType::UInt32
83                | DataType::UInt64
84                | DataType::Float16
85                | DataType::Float32
86                | DataType::Float64
87        )
88    }
89
90    fn normalize_array(
91        &self,
92        array: &dyn Array,
93        _dtype: &arrow::datatypes::DataType,
94    ) -> Result<Arc<dyn Array>> {
95        use arrow::{array::Float64Array, compute::cast, datatypes::DataType};
96
97        // Cast to Float64 for normalization
98        let float_array = cast(array, &DataType::Float64)
99            .map_err(|e| Error::transform(format!("Failed to cast to Float64: {}", e)))?;
100
101        let float_values = float_array
102            .as_any()
103            .downcast_ref::<Float64Array>()
104            .ok_or_else(|| Error::transform("Expected Float64Array after cast"))?;
105
106        let normalized = match self.method {
107            NormMethod::MinMax => Self::min_max_normalize(float_values),
108            NormMethod::ZScore => Self::zscore_normalize(float_values),
109            NormMethod::L2 => Self::l2_normalize(float_values),
110        };
111
112        // Normalized values are always Float64 (they're now in [0,1] or centered)
113        Ok(Arc::new(normalized))
114    }
115
116    fn min_max_normalize(array: &arrow::array::Float64Array) -> arrow::array::Float64Array {
117        let mut min = f64::INFINITY;
118        let mut max = f64::NEG_INFINITY;
119
120        for i in 0..array.len() {
121            if !array.is_null(i) {
122                let v = array.value(i);
123                if v < min {
124                    min = v;
125                }
126                if v > max {
127                    max = v;
128                }
129            }
130        }
131
132        let range = max - min;
133        let values: Vec<Option<f64>> = (0..array.len())
134            .map(|i| {
135                if array.is_null(i) {
136                    None
137                } else if range == 0.0 {
138                    Some(0.0)
139                } else {
140                    Some((array.value(i) - min) / range)
141                }
142            })
143            .collect();
144
145        arrow::array::Float64Array::from(values)
146    }
147
148    #[allow(clippy::cast_precision_loss)]
149    fn zscore_normalize(array: &arrow::array::Float64Array) -> arrow::array::Float64Array {
150        let mut sum = 0.0;
151        let mut count = 0usize;
152
153        for i in 0..array.len() {
154            if !array.is_null(i) {
155                sum += array.value(i);
156                count += 1;
157            }
158        }
159
160        if count == 0 {
161            return array.clone();
162        }
163
164        let mean = sum / count as f64;
165
166        let mut variance_sum = 0.0;
167        for i in 0..array.len() {
168            if !array.is_null(i) {
169                let diff = array.value(i) - mean;
170                variance_sum += diff * diff;
171            }
172        }
173
174        let std = (variance_sum / count as f64).sqrt();
175
176        let values: Vec<Option<f64>> = (0..array.len())
177            .map(|i| {
178                if array.is_null(i) {
179                    None
180                } else if std == 0.0 {
181                    Some(0.0)
182                } else {
183                    Some((array.value(i) - mean) / std)
184                }
185            })
186            .collect();
187
188        arrow::array::Float64Array::from(values)
189    }
190
191    fn l2_normalize(array: &arrow::array::Float64Array) -> arrow::array::Float64Array {
192        let mut sum_sq = 0.0;
193
194        for i in 0..array.len() {
195            if !array.is_null(i) {
196                let v = array.value(i);
197                sum_sq += v * v;
198            }
199        }
200
201        let norm = sum_sq.sqrt();
202
203        let values: Vec<Option<f64>> = (0..array.len())
204            .map(|i| {
205                if array.is_null(i) {
206                    None
207                } else if norm == 0.0 {
208                    Some(0.0)
209                } else {
210                    Some(array.value(i) / norm)
211                }
212            })
213            .collect();
214
215        arrow::array::Float64Array::from(values)
216    }
217}
218
219impl Transform for Normalize {
220    fn apply(&self, batch: RecordBatch) -> Result<RecordBatch> {
221        let schema = batch.schema();
222
223        let columns_to_normalize: std::collections::HashSet<&str> = match &self.columns {
224            Some(cols) => cols.iter().map(String::as_str).collect(),
225            None => schema
226                .fields()
227                .iter()
228                .filter(|f| Self::is_numeric_type(f.data_type()))
229                .map(|f| f.name().as_str())
230                .collect(),
231        };
232
233        let mut fields = Vec::with_capacity(schema.fields().len());
234        let mut arrays = Vec::with_capacity(schema.fields().len());
235
236        for (idx, field) in schema.fields().iter().enumerate() {
237            let col = batch.column(idx);
238
239            if columns_to_normalize.contains(field.name().as_str()) {
240                if !Self::is_numeric_type(field.data_type()) {
241                    return Err(Error::transform(format!(
242                        "Column '{}' is not numeric (type: {:?})",
243                        field.name(),
244                        field.data_type()
245                    )));
246                }
247
248                let normalized = self.normalize_array(col.as_ref(), field.data_type())?;
249                // Normalized columns become Float64
250                fields.push(Field::new(
251                    field.name(),
252                    arrow::datatypes::DataType::Float64,
253                    field.is_nullable(),
254                ));
255                arrays.push(normalized);
256            } else {
257                fields.push(field.as_ref().clone());
258                arrays.push(Arc::clone(col));
259            }
260        }
261
262        let new_schema = Arc::new(Schema::new(fields));
263        RecordBatch::try_new(new_schema, arrays).map_err(Error::Arrow)
264    }
265}
266
267/// A transform that casts columns to different data types.
268///
269/// # Example
270///
271/// ```ignore
272/// use alimentar::Cast;
273/// use arrow::datatypes::DataType;
274///
275/// let cast = Cast::new(vec![
276///     ("id", DataType::Int64),
277///     ("value", DataType::Float64),
278/// ]);
279/// ```
280#[derive(Debug, Clone)]
281pub struct Cast {
282    mappings: Vec<(String, arrow::datatypes::DataType)>,
283}
284
285impl Cast {
286    /// Creates a new Cast transform with column-to-type mappings.
287    pub fn new<S: Into<String>>(
288        mappings: impl IntoIterator<Item = (S, arrow::datatypes::DataType)>,
289    ) -> Self {
290        Self {
291            mappings: mappings
292                .into_iter()
293                .map(|(name, dtype)| (name.into(), dtype))
294                .collect(),
295        }
296    }
297
298    /// Returns the cast mappings.
299    pub fn mappings(&self) -> &[(String, arrow::datatypes::DataType)] {
300        &self.mappings
301    }
302}
303
304impl Transform for Cast {
305    fn apply(&self, batch: RecordBatch) -> Result<RecordBatch> {
306        use arrow::compute::cast;
307
308        let schema = batch.schema();
309        let cast_map: std::collections::HashMap<&str, &arrow::datatypes::DataType> =
310            self.mappings.iter().map(|(n, t)| (n.as_str(), t)).collect();
311
312        let mut fields = Vec::with_capacity(schema.fields().len());
313        let mut arrays = Vec::with_capacity(schema.fields().len());
314
315        for (idx, field) in schema.fields().iter().enumerate() {
316            let col = batch.column(idx);
317
318            if let Some(&target_type) = cast_map.get(field.name().as_str()) {
319                let casted = cast(col.as_ref(), target_type).map_err(|e| {
320                    Error::transform(format!(
321                        "Failed to cast column '{}' to {:?}: {}",
322                        field.name(),
323                        target_type,
324                        e
325                    ))
326                })?;
327                fields.push(Field::new(
328                    field.name(),
329                    target_type.clone(),
330                    field.is_nullable(),
331                ));
332                arrays.push(casted);
333            } else {
334                fields.push(field.as_ref().clone());
335                arrays.push(Arc::clone(col));
336            }
337        }
338
339        let new_schema = Arc::new(Schema::new(fields));
340        RecordBatch::try_new(new_schema, arrays).map_err(Error::Arrow)
341    }
342}
343
/// How null entries of a column are replaced.
#[derive(Debug, Clone)]
pub enum FillStrategy {
    /// Replace nulls with a fixed integer.
    Int(i64),
    /// Replace nulls with a fixed floating-point number.
    Float(f64),
    /// Replace nulls with a fixed string.
    String(String),
    /// Replace nulls with a fixed boolean.
    Bool(bool),
    /// Replace nulls with zero (numeric columns).
    Zero,
    /// Forward fill: replace a null with the closest preceding non-null value.
    Forward,
    /// Backward fill: replace a null with the closest following non-null value.
    Backward,
}
362
363/// A transform that fills null values in specified columns.
364///
365/// # Example
366///
367/// ```ignore
368/// use alimentar::{FillNull, FillStrategy};
369///
370/// // Fill nulls in "age" column with 0
371/// let fill = FillNull::new("age", FillStrategy::Zero);
372///
373/// // Fill nulls with a specific value
374/// let fill = FillNull::new("score", FillStrategy::Float(0.0));
375///
376/// // Forward fill (use previous value)
377/// let fill = FillNull::new("value", FillStrategy::Forward);
378/// ```
379#[derive(Debug, Clone)]
380pub struct FillNull {
381    column: String,
382    strategy: FillStrategy,
383}
384
385impl FillNull {
386    /// Creates a FillNull transform for the specified column.
387    pub fn new<S: Into<String>>(column: S, strategy: FillStrategy) -> Self {
388        Self {
389            column: column.into(),
390            strategy,
391        }
392    }
393
394    /// Creates a FillNull transform that fills with zero.
395    pub fn with_zero<S: Into<String>>(column: S) -> Self {
396        Self::new(column, FillStrategy::Zero)
397    }
398
399    /// Returns the column name.
400    pub fn column(&self) -> &str {
401        &self.column
402    }
403
404    /// Returns the fill strategy.
405    pub fn strategy(&self) -> &FillStrategy {
406        &self.strategy
407    }
408
409    fn fill_i32_array(col: &Arc<dyn Array>, fill_value: i32) -> arrow::array::Int32Array {
410        use arrow::array::Int32Array;
411        let arr = col.as_any().downcast_ref::<Int32Array>();
412        if let Some(arr) = arr {
413            let values: Vec<i32> = (0..arr.len())
414                .map(|i| {
415                    if arr.is_null(i) {
416                        fill_value
417                    } else {
418                        arr.value(i)
419                    }
420                })
421                .collect();
422            Int32Array::from(values)
423        } else {
424            Int32Array::from(Vec::<i32>::new())
425        }
426    }
427
428    fn fill_i64_array(col: &Arc<dyn Array>, fill_value: i64) -> arrow::array::Int64Array {
429        use arrow::array::Int64Array;
430        let arr = col.as_any().downcast_ref::<Int64Array>();
431        if let Some(arr) = arr {
432            let values: Vec<i64> = (0..arr.len())
433                .map(|i| {
434                    if arr.is_null(i) {
435                        fill_value
436                    } else {
437                        arr.value(i)
438                    }
439                })
440                .collect();
441            Int64Array::from(values)
442        } else {
443            Int64Array::from(Vec::<i64>::new())
444        }
445    }
446
447    fn fill_f32_array(col: &Arc<dyn Array>, fill_value: f32) -> arrow::array::Float32Array {
448        use arrow::array::Float32Array;
449        let arr = col.as_any().downcast_ref::<Float32Array>();
450        if let Some(arr) = arr {
451            let values: Vec<f32> = (0..arr.len())
452                .map(|i| {
453                    if arr.is_null(i) {
454                        fill_value
455                    } else {
456                        arr.value(i)
457                    }
458                })
459                .collect();
460            Float32Array::from(values)
461        } else {
462            Float32Array::from(Vec::<f32>::new())
463        }
464    }
465
466    fn fill_f64_array(col: &Arc<dyn Array>, fill_value: f64) -> arrow::array::Float64Array {
467        use arrow::array::Float64Array;
468        let arr = col.as_any().downcast_ref::<Float64Array>();
469        if let Some(arr) = arr {
470            let values: Vec<f64> = (0..arr.len())
471                .map(|i| {
472                    if arr.is_null(i) {
473                        fill_value
474                    } else {
475                        arr.value(i)
476                    }
477                })
478                .collect();
479            Float64Array::from(values)
480        } else {
481            Float64Array::from(Vec::<f64>::new())
482        }
483    }
484
485    fn fill_string_array(col: &Arc<dyn Array>, fill_value: &str) -> arrow::array::StringArray {
486        use arrow::array::StringArray;
487        let arr = col.as_any().downcast_ref::<StringArray>();
488        if let Some(arr) = arr {
489            let values: Vec<&str> = (0..arr.len())
490                .map(|i| {
491                    if arr.is_null(i) {
492                        fill_value
493                    } else {
494                        arr.value(i)
495                    }
496                })
497                .collect();
498            StringArray::from(values)
499        } else {
500            StringArray::from(Vec::<&str>::new())
501        }
502    }
503
504    fn fill_bool_array(col: &Arc<dyn Array>, fill_value: bool) -> arrow::array::BooleanArray {
505        use arrow::array::BooleanArray;
506        let arr = col.as_any().downcast_ref::<BooleanArray>();
507        if let Some(arr) = arr {
508            let values: Vec<bool> = (0..arr.len())
509                .map(|i| {
510                    if arr.is_null(i) {
511                        fill_value
512                    } else {
513                        arr.value(i)
514                    }
515                })
516                .collect();
517            BooleanArray::from(values)
518        } else {
519            BooleanArray::from(Vec::<bool>::new())
520        }
521    }
522
523    fn forward_fill_i32(col: &Arc<dyn Array>) -> arrow::array::Int32Array {
524        use arrow::array::Int32Array;
525        let arr = col.as_any().downcast_ref::<Int32Array>();
526        if let Some(arr) = arr {
527            let mut last: Option<i32> = None;
528            let values: Vec<Option<i32>> = (0..arr.len())
529                .map(|i| {
530                    if arr.is_null(i) {
531                        last
532                    } else {
533                        let v = arr.value(i);
534                        last = Some(v);
535                        Some(v)
536                    }
537                })
538                .collect();
539            Int32Array::from(values)
540        } else {
541            Int32Array::from(Vec::<i32>::new())
542        }
543    }
544
545    fn forward_fill_i64(col: &Arc<dyn Array>) -> arrow::array::Int64Array {
546        use arrow::array::Int64Array;
547        let arr = col.as_any().downcast_ref::<Int64Array>();
548        if let Some(arr) = arr {
549            let mut last: Option<i64> = None;
550            let values: Vec<Option<i64>> = (0..arr.len())
551                .map(|i| {
552                    if arr.is_null(i) {
553                        last
554                    } else {
555                        let v = arr.value(i);
556                        last = Some(v);
557                        Some(v)
558                    }
559                })
560                .collect();
561            Int64Array::from(values)
562        } else {
563            Int64Array::from(Vec::<i64>::new())
564        }
565    }
566
567    fn forward_fill_f64(col: &Arc<dyn Array>) -> arrow::array::Float64Array {
568        use arrow::array::Float64Array;
569        let arr = col.as_any().downcast_ref::<Float64Array>();
570        if let Some(arr) = arr {
571            let mut last: Option<f64> = None;
572            let values: Vec<Option<f64>> = (0..arr.len())
573                .map(|i| {
574                    if arr.is_null(i) {
575                        last
576                    } else {
577                        let v = arr.value(i);
578                        last = Some(v);
579                        Some(v)
580                    }
581                })
582                .collect();
583            Float64Array::from(values)
584        } else {
585            Float64Array::from(Vec::<f64>::new())
586        }
587    }
588
589    fn backward_fill_i32(col: &Arc<dyn Array>) -> arrow::array::Int32Array {
590        use arrow::array::Int32Array;
591        let arr = col.as_any().downcast_ref::<Int32Array>();
592        if let Some(arr) = arr {
593            let mut next: Option<i32> = None;
594            let mut values: Vec<Option<i32>> = (0..arr.len())
595                .rev()
596                .map(|i| {
597                    if arr.is_null(i) {
598                        next
599                    } else {
600                        let v = arr.value(i);
601                        next = Some(v);
602                        Some(v)
603                    }
604                })
605                .collect();
606            values.reverse();
607            Int32Array::from(values)
608        } else {
609            Int32Array::from(Vec::<i32>::new())
610        }
611    }
612
613    fn backward_fill_i64(col: &Arc<dyn Array>) -> arrow::array::Int64Array {
614        use arrow::array::Int64Array;
615        let arr = col.as_any().downcast_ref::<Int64Array>();
616        if let Some(arr) = arr {
617            let mut next: Option<i64> = None;
618            let mut values: Vec<Option<i64>> = (0..arr.len())
619                .rev()
620                .map(|i| {
621                    if arr.is_null(i) {
622                        next
623                    } else {
624                        let v = arr.value(i);
625                        next = Some(v);
626                        Some(v)
627                    }
628                })
629                .collect();
630            values.reverse();
631            Int64Array::from(values)
632        } else {
633            Int64Array::from(Vec::<i64>::new())
634        }
635    }
636
637    fn backward_fill_f64(col: &Arc<dyn Array>) -> arrow::array::Float64Array {
638        use arrow::array::Float64Array;
639        let arr = col.as_any().downcast_ref::<Float64Array>();
640        if let Some(arr) = arr {
641            let mut next: Option<f64> = None;
642            let mut values: Vec<Option<f64>> = (0..arr.len())
643                .rev()
644                .map(|i| {
645                    if arr.is_null(i) {
646                        next
647                    } else {
648                        let v = arr.value(i);
649                        next = Some(v);
650                        Some(v)
651                    }
652                })
653                .collect();
654            values.reverse();
655            Float64Array::from(values)
656        } else {
657            Float64Array::from(Vec::<f64>::new())
658        }
659    }
660}
661
662impl Transform for FillNull {
663    #[allow(clippy::cast_possible_truncation)]
664    fn apply(&self, batch: RecordBatch) -> Result<RecordBatch> {
665        use arrow::datatypes::DataType;
666
667        let schema = batch.schema();
668        let (col_idx, field) = schema
669            .column_with_name(&self.column)
670            .ok_or_else(|| Error::column_not_found(&self.column))?;
671
672        let mut arrays: Vec<Arc<dyn Array>> = batch.columns().to_vec();
673        let col = batch.column(col_idx);
674
675        let filled: Arc<dyn Array> = match (field.data_type(), &self.strategy) {
676            (DataType::Int32, FillStrategy::Int(v)) => {
677                Arc::new(Self::fill_i32_array(col, *v as i32))
678            }
679            (DataType::Int64, FillStrategy::Int(v)) => Arc::new(Self::fill_i64_array(col, *v)),
680            (DataType::Float32, FillStrategy::Float(v)) => {
681                Arc::new(Self::fill_f32_array(col, *v as f32))
682            }
683            (DataType::Float64, FillStrategy::Float(v)) => Arc::new(Self::fill_f64_array(col, *v)),
684            (DataType::Int32, FillStrategy::Zero) => Arc::new(Self::fill_i32_array(col, 0)),
685            (DataType::Int64, FillStrategy::Zero) => Arc::new(Self::fill_i64_array(col, 0)),
686            (DataType::Float32, FillStrategy::Zero) => Arc::new(Self::fill_f32_array(col, 0.0)),
687            (DataType::Float64, FillStrategy::Zero) => Arc::new(Self::fill_f64_array(col, 0.0)),
688            (DataType::Utf8, FillStrategy::String(s)) => Arc::new(Self::fill_string_array(col, s)),
689            (DataType::Boolean, FillStrategy::Bool(b)) => Arc::new(Self::fill_bool_array(col, *b)),
690            (DataType::Int32, FillStrategy::Forward) => Arc::new(Self::forward_fill_i32(col)),
691            (DataType::Int64, FillStrategy::Forward) => Arc::new(Self::forward_fill_i64(col)),
692            (DataType::Float64, FillStrategy::Forward) => Arc::new(Self::forward_fill_f64(col)),
693            (DataType::Int32, FillStrategy::Backward) => Arc::new(Self::backward_fill_i32(col)),
694            (DataType::Int64, FillStrategy::Backward) => Arc::new(Self::backward_fill_i64(col)),
695            (DataType::Float64, FillStrategy::Backward) => Arc::new(Self::backward_fill_f64(col)),
696            _ => {
697                return Err(Error::transform(format!(
698                    "Unsupported type {:?} for fill strategy {:?}",
699                    field.data_type(),
700                    self.strategy
701                )));
702            }
703        };
704
705        arrays[col_idx] = filled;
706        RecordBatch::try_new(schema, arrays).map_err(Error::Arrow)
707    }
708}
709
710#[cfg(test)]
711#[allow(
712    clippy::float_cmp,
713    clippy::cast_precision_loss,
714    clippy::redundant_closure
715)]
716mod tests {
717    use arrow::{
718        array::{Int32Array, StringArray},
719        datatypes::DataType,
720    };
721
722    use super::*;
723
724    fn create_test_batch() -> RecordBatch {
725        let schema = Arc::new(Schema::new(vec![
726            Field::new("id", DataType::Int32, false),
727            Field::new("name", DataType::Utf8, false),
728            Field::new("value", DataType::Int32, false),
729        ]));
730
731        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
732        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
733        let value_array = Int32Array::from(vec![10, 20, 30, 40, 50]);
734
735        RecordBatch::try_new(
736            schema,
737            vec![
738                Arc::new(id_array),
739                Arc::new(name_array),
740                Arc::new(value_array),
741            ],
742        )
743        .ok()
744        .unwrap_or_else(|| panic!("Should create batch"))
745    }
746
747    #[test]
748    fn test_cast_transform() {
749        let batch = create_test_batch();
750        let transform = Cast::new(vec![("id", DataType::Int64), ("value", DataType::Float64)]);
751
752        let result = transform.apply(batch);
753        assert!(result.is_ok());
754        let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
755
756        assert_eq!(result.schema().field(0).data_type(), &DataType::Int64);
757        assert_eq!(result.schema().field(2).data_type(), &DataType::Float64);
758    }
759
760    #[test]
761    fn test_cast_preserves_values() {
762        let batch = create_test_batch();
763        let transform = Cast::new(vec![("id", DataType::Float64)]);
764
765        let result = transform.apply(batch);
766        assert!(result.is_ok());
767        let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
768
769        let col = result
770            .column(0)
771            .as_any()
772            .downcast_ref::<arrow::array::Float64Array>()
773            .unwrap_or_else(|| panic!("Should be Float64Array"));
774
775        assert_eq!(col.value(0), 1.0);
776        assert_eq!(col.value(1), 2.0);
777        assert_eq!(col.value(2), 3.0);
778    }
779
780    #[test]
781    fn test_cast_mappings_getter() {
782        let transform = Cast::new(vec![("a", DataType::Int64)]);
783        assert_eq!(transform.mappings().len(), 1);
784        assert_eq!(transform.mappings()[0].0, "a");
785        assert_eq!(transform.mappings()[0].1, DataType::Int64);
786    }
787
788    #[test]
789    fn test_cast_debug() {
790        let cast = Cast::new(vec![("col", DataType::Int64)]);
791        let debug_str = format!("{:?}", cast);
792        assert!(debug_str.contains("Cast"));
793    }
794
795    #[test]
796    fn test_cast_int_to_string() {
797        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
798        let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
799            .ok()
800            .unwrap_or_else(|| panic!("Should create batch"));
801
802        let cast = Cast::new(vec![("id", DataType::Utf8)]);
803        let result = cast.apply(batch);
804        assert!(result.is_ok());
805        let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
806        assert_eq!(result.schema().field(0).data_type(), &DataType::Utf8);
807    }
808
809    #[test]
810    fn test_cast_nonexistent_column() {
811        let batch = create_test_batch();
812        let cast = Cast::new(vec![("nonexistent", DataType::Int64)]);
813        let result = cast.apply(batch.clone());
814        // Cast on nonexistent column is a no-op (doesn't error)
815        assert!(result.is_ok());
816        let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
817        assert_eq!(result.num_rows(), batch.num_rows());
818    }
819
820    #[test]
821    fn test_cast_incompatible_type() {
822        // Try to cast string to int - should fail
823        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
824        let arr = StringArray::from(vec!["hello", "world"]);
825        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
826            .ok()
827            .unwrap_or_else(|| panic!("batch"));
828
829        let cast = Cast::new(vec![("text", DataType::Int32)]);
830        let result = cast.apply(batch);
831        // Arrow cast will fail for incompatible types
832        assert!(result.is_err());
833    }
834
835    #[test]
836    fn test_normalize_minmax() {
837        let schema = Arc::new(Schema::new(vec![Field::new(
838            "value",
839            DataType::Float64,
840            false,
841        )]));
842        let values = arrow::array::Float64Array::from(vec![0.0, 50.0, 100.0]);
843        let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
844            .ok()
845            .unwrap_or_else(|| panic!("Should create batch"));
846
847        let transform = Normalize::new(vec!["value"], NormMethod::MinMax);
848        let result = transform.apply(batch);
849        assert!(result.is_ok());
850        let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
851
852        let col = result
853            .column(0)
854            .as_any()
855            .downcast_ref::<arrow::array::Float64Array>()
856            .unwrap_or_else(|| panic!("Should be Float64Array"));
857
858        assert!((col.value(0) - 0.0).abs() < 1e-10);
859        assert!((col.value(1) - 0.5).abs() < 1e-10);
860        assert!((col.value(2) - 1.0).abs() < 1e-10);
861    }
862
/// Z-score normalization must center the column on zero.
///
/// The input deliberately has a non-zero mean: with an already-centered input
/// the mean assertion would also pass for a transform that does nothing.
#[test]
fn test_normalize_zscore() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        false,
    )]));
    // Evenly spaced values with mean 20, so centering is observable.
    let values = arrow::array::Float64Array::from(vec![10.0, 20.0, 30.0]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
        .ok()
        .unwrap_or_else(|| panic!("Should create batch"));

    let transform = Normalize::new(vec!["value"], NormMethod::ZScore);
    let result = transform.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));

    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Float64Array>()
        .unwrap_or_else(|| panic!("Should be Float64Array"));

    assert_eq!(col.len(), 3);

    // Mean should be 0 (within floating point tolerance)
    let mean: f64 = (0..col.len()).map(|i| col.value(i)).sum::<f64>() / col.len() as f64;
    assert!(mean.abs() < 1e-10);

    // The checks below hold for (x - mean) / std regardless of whether the
    // implementation uses the population or the sample standard deviation.
    assert!(col.value(1).abs() < 1e-10);
    assert!((col.value(0) + col.value(2)).abs() < 1e-10);
    assert!(col.value(0) < 0.0);
    assert!(col.value(2) > 0.0);
}
891
/// L2 normalization divides each value by the Euclidean norm of the column.
#[test]
fn test_normalize_l2() {
    use arrow::array::Float64Array;

    let field = Field::new("value", DataType::Float64, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Float64Array::from(vec![3.0, 4.0]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = Normalize::new(vec!["value"], NormMethod::L2).apply(batch);
    assert!(outcome.is_ok());
    let normalized = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match normalized
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
    {
        Some(array) => array,
        None => panic!("Should be Float64Array"),
    };

    // The norm of [3, 4] is 5, hence the expected output [0.6, 0.8].
    let expected = [0.6, 0.8];
    for (idx, want) in expected.iter().enumerate() {
        assert!((col.value(idx) - *want).abs() < 1e-10);
    }
}
919
/// `Normalize::all_numeric` converts numeric columns to Float64 and leaves
/// the string column untouched.
#[test]
fn test_normalize_all_numeric() {
    let batch = create_test_batch();
    let outcome = Normalize::all_numeric(NormMethod::MinMax).apply(batch);
    assert!(outcome.is_ok());
    let normalized = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    // Columns 0 (id) and 2 (value) are normalized to Float64; column 1 (name) stays Utf8.
    let schema = normalized.schema();
    let expected = [DataType::Float64, DataType::Utf8, DataType::Float64];
    for (idx, data_type) in expected.iter().enumerate() {
        assert_eq!(schema.field(idx).data_type(), data_type);
    }
}
934
/// Explicitly requesting normalization of the string column `name` is an error.
#[test]
fn test_normalize_non_numeric_error() {
    let batch = create_test_batch();
    let outcome = Normalize::new(vec!["name"], NormMethod::MinMax).apply(batch);

    assert!(outcome.is_err());
}
943
/// The accessors report back exactly what each constructor was given.
#[test]
fn test_normalize_getters() {
    // Explicit column list.
    let explicit = Normalize::new(vec!["a", "b"], NormMethod::ZScore);
    let selected = explicit.columns();
    assert!(selected.is_some());
    match selected {
        Some(names) => assert_eq!(names, &["a", "b"]),
        None => panic!("Should have columns"),
    }
    assert_eq!(explicit.method(), NormMethod::ZScore);

    // "All numeric" mode carries no column list.
    let everything = Normalize::all_numeric(NormMethod::L2);
    assert!(everything.columns().is_none());
    assert_eq!(everything.method(), NormMethod::L2);
}
960
/// Min-max normalization of a constant column (zero range) yields all zeros.
#[test]
fn test_normalize_constant_values() {
    use arrow::array::Float64Array;

    let field = Field::new("value", DataType::Float64, false);
    let schema = Arc::new(Schema::new(vec![field]));
    // Every value is identical, so max - min is zero.
    let values = Float64Array::from(vec![5.0, 5.0, 5.0]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = Normalize::new(vec!["value"], NormMethod::MinMax).apply(batch);
    assert!(outcome.is_ok());
    let normalized = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match normalized
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
    {
        Some(array) => array,
        None => panic!("Should be Float64Array"),
    };

    // A zero range maps every entry to 0.
    (0..col.len())
        .map(|idx| col.value(idx))
        .for_each(|value| assert_eq!(value, 0.0));
}
990
/// Nulls stay null after min-max normalization while the remaining values
/// are scaled to [0, 1].
#[test]
fn test_normalize_with_nulls() {
    use arrow::array::Float64Array;

    let field = Field::new("value", DataType::Float64, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Float64Array::from(vec![Some(0.0), None, Some(100.0)]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = Normalize::new(vec!["value"], NormMethod::MinMax).apply(batch);
    assert!(outcome.is_ok());
    let normalized = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match normalized
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
    {
        Some(array) => array,
        None => panic!("Should be Float64Array"),
    };

    let lowest = col.value(0);
    let highest = col.value(2);
    assert!((lowest - 0.0).abs() < 1e-10);
    assert!(col.is_null(1));
    assert!((highest - 1.0).abs() < 1e-10);
}
1018
/// The `Debug` output of `Normalize` mentions the type name.
#[test]
fn test_normalize_debug() {
    let rendered = format!("{:?}", Normalize::new(vec!["col"], NormMethod::MinMax));

    assert!(rendered.contains("Normalize"));
}
1025
/// Asking to normalize a column that is not in the batch is not an error.
#[test]
fn test_normalize_nonexistent_column() {
    let batch = create_test_batch();
    let outcome = Normalize::new(["nonexistent"], NormMethod::MinMax).apply(batch);

    // The unknown column is skipped, so the call still returns Ok.
    assert!(outcome.is_ok());
}
1034
1035    // FillNull tests
1036
/// `FillNull::with_zero` replaces nulls in an Int32 column with 0 and keeps
/// the other values.
#[test]
fn test_fillnull_with_zero_i32() {
    let field = Field::new("value", DataType::Int32, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = FillNull::with_zero("value").apply(batch);
    assert!(outcome.is_ok());
    let filled = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match filled.column(0).as_any().downcast_ref::<Int32Array>() {
        Some(array) => array,
        None => panic!("Should be Int32Array"),
    };

    // Indices 1 and 3 were null and are now 0; the rest are unchanged.
    let expected = [1, 0, 3, 0, 5];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(col.value(idx), *want);
    }
}
1066
/// `FillStrategy::Int` writes the given constant into null Int32 slots.
#[test]
fn test_fillnull_with_int_value() {
    let field = Field::new("value", DataType::Int32, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Int32Array::from(vec![Some(1), None, Some(3)]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = FillNull::new("value", FillStrategy::Int(-1)).apply(batch);
    assert!(outcome.is_ok());
    let filled = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match filled.column(0).as_any().downcast_ref::<Int32Array>() {
        Some(array) => array,
        None => panic!("Should be Int32Array"),
    };

    // The formerly-null middle slot now holds the fill value.
    assert_eq!(col.value(1), -1);
}
1092
/// `FillStrategy::Float` writes the given constant into null Float64 slots.
#[test]
fn test_fillnull_with_float() {
    use arrow::array::Float64Array;

    let field = Field::new("value", DataType::Float64, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = FillNull::new("value", FillStrategy::Float(99.9)).apply(batch);
    assert!(outcome.is_ok());
    let filled = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match filled.column(0).as_any().downcast_ref::<Float64Array>() {
        Some(array) => array,
        None => panic!("Should be Float64Array"),
    };

    let replaced = col.value(1);
    assert!((replaced - 99.9).abs() < f64::EPSILON);
}
1118
/// `FillStrategy::String` writes the given text into null Utf8 slots.
#[test]
fn test_fillnull_with_string() {
    let field = Field::new("name", DataType::Utf8, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = StringArray::from(vec![Some("a"), None, Some("c")]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let strategy = FillStrategy::String(String::from("unknown"));
    let outcome = FillNull::new("name", strategy).apply(batch);
    assert!(outcome.is_ok());
    let filled = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match filled.column(0).as_any().downcast_ref::<StringArray>() {
        Some(array) => array,
        None => panic!("Should be StringArray"),
    };

    assert_eq!(col.value(1), "unknown");
}
1140
/// Forward fill copies the most recent non-null value into each following null.
#[test]
fn test_fillnull_forward_fill() {
    let field = Field::new("value", DataType::Int32, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Int32Array::from(vec![Some(1), None, None, Some(4), None]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = FillNull::new("value", FillStrategy::Forward).apply(batch);
    assert!(outcome.is_ok());
    let filled = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match filled.column(0).as_any().downcast_ref::<Int32Array>() {
        Some(array) => array,
        None => panic!("Should be Int32Array"),
    };

    // Indices 1 and 2 inherit from index 0; index 4 inherits from index 3.
    let expected = [1, 1, 1, 4, 4];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(col.value(idx), *want);
    }
}
1170
/// Backward fill copies the next non-null value into each preceding null.
#[test]
fn test_fillnull_backward_fill() {
    let field = Field::new("value", DataType::Int32, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Int32Array::from(vec![None, None, Some(3), None, Some(5)]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(values)]) {
        Ok(batch) => batch,
        Err(_) => panic!("Should create batch"),
    };

    let outcome = FillNull::new("value", FillStrategy::Backward).apply(batch);
    assert!(outcome.is_ok());
    let filled = match outcome {
        Ok(batch) => batch,
        Err(_) => panic!("Should succeed"),
    };

    let col = match filled.column(0).as_any().downcast_ref::<Int32Array>() {
        Some(array) => array,
        None => panic!("Should be Int32Array"),
    };

    // Indices 0 and 1 inherit from index 2; index 3 inherits from index 4.
    let expected = [3, 3, 3, 5, 5];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(col.value(idx), *want);
    }
}
1200
/// Filling nulls in a column that is not in the batch is an error.
#[test]
fn test_fillnull_column_not_found() {
    let batch = create_test_batch();
    let outcome = FillNull::with_zero("nonexistent").apply(batch);

    assert!(outcome.is_err());
}
1209
/// The accessors return the column name and strategy passed to `FillNull::new`.
#[test]
fn test_fillnull_getters() {
    let fill = FillNull::new("col", FillStrategy::Int(42));

    assert_eq!(fill.column(), "col");
    assert!(matches!(fill.strategy(), FillStrategy::Int(42)));
}
1216
/// The `Debug` output of `FillNull` mentions the type name.
#[test]
fn test_fillnull_debug() {
    let rendered = format!("{:?}", FillNull::new("col", FillStrategy::Zero));

    assert!(rendered.contains("FillNull"));
}
1223
/// `FillStrategy::Int` replaces nulls in an Int64 column with the given constant.
#[test]
fn test_fillnull_with_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, Some(3i64), None, Some(5i64)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Int(42));
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("Should be Int64Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slots are no longer null.
    assert!(!col.is_null(1));
    assert_eq!(col.value(1), 42);
    assert!(!col.is_null(3));
    assert_eq!(col.value(3), 42);
}
1252
/// `FillStrategy::Float` replaces nulls in a Float32 column with the given constant.
#[test]
fn test_fillnull_with_float32() {
    use arrow::array::Float32Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float32,
        true,
    )]));
    let arr = Float32Array::from(vec![Some(1.0f32), None, Some(3.0f32)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Float(2.5));
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap_or_else(|| panic!("Should be Float32Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slot is no longer null.
    assert!(!col.is_null(1));
    assert!((col.value(1) - 2.5f32).abs() < 0.001);
}
1280
/// `FillStrategy::Bool` replaces nulls in a Boolean column with the given value.
#[test]
fn test_fillnull_with_bool() {
    use arrow::array::BooleanArray;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "flag",
        DataType::Boolean,
        true,
    )]));
    let arr = BooleanArray::from(vec![Some(true), None, Some(false)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("flag", FillStrategy::Bool(true));
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<BooleanArray>()
        .unwrap_or_else(|| panic!("Should be BooleanArray"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slot is no longer null.
    assert!(!col.is_null(1));
    assert!(col.value(1));
}
1308
/// `FillStrategy::Zero` replaces nulls in an Int64 column with 0.
#[test]
fn test_fillnull_zero_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, Some(3i64)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Zero);
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("Should be Int64Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slot is no longer null.
    assert!(!col.is_null(1));
    assert_eq!(col.value(1), 0);
}
1336
/// `FillStrategy::Zero` replaces nulls in a Float32 column with 0.0.
#[test]
fn test_fillnull_zero_float32() {
    use arrow::array::Float32Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float32,
        true,
    )]));
    let arr = Float32Array::from(vec![Some(1.0f32), None, Some(3.0f32)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Zero);
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap_or_else(|| panic!("Should be Float32Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slot is no longer null.
    assert!(!col.is_null(1));
    assert_eq!(col.value(1), 0.0f32);
}
1364
/// `FillStrategy::Zero` replaces nulls in a Float64 column with 0.0.
#[test]
fn test_fillnull_zero_float64() {
    use arrow::array::Float64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        true,
    )]));
    let arr = Float64Array::from(vec![Some(1.0f64), None, Some(3.0f64)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Zero);
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap_or_else(|| panic!("Should be Float64Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slot is no longer null.
    assert!(!col.is_null(1));
    assert_eq!(col.value(1), 0.0f64);
}
1392
/// Forward fill on an Int64 column copies the last non-null value into the
/// nulls that follow it.
#[test]
fn test_fillnull_forward_fill_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, None, Some(4i64)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Forward);
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("Should be Int64Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slots are no longer null.
    assert!(!col.is_null(1));
    assert_eq!(col.value(1), 1);
    assert!(!col.is_null(2));
    assert_eq!(col.value(2), 1);
}
1421
/// Backward fill on an Int64 column copies the next non-null value into the
/// nulls that precede it.
#[test]
fn test_fillnull_backward_fill_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, None, Some(4i64)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Backward);
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("Should be Int64Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slots are no longer null.
    assert!(!col.is_null(1));
    assert_eq!(col.value(1), 4);
    assert!(!col.is_null(2));
    assert_eq!(col.value(2), 4);
}
1450
/// Applying an integer fill strategy to a Date32 column is rejected.
#[test]
fn test_fillnull_unsupported_type_strategy() {
    use arrow::array::Date32Array;

    // Date32 combined with `FillStrategy::Int` is not a supported pairing.
    let field = Field::new("date", DataType::Date32, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let dates = Date32Array::from(vec![Some(1000), None, Some(3000)]);
    let batch = match RecordBatch::try_new(schema, vec![Arc::new(dates)]) {
        Ok(batch) => batch,
        Err(_) => panic!("batch"),
    };

    let outcome = FillNull::new("date", FillStrategy::Int(42)).apply(batch);
    assert!(outcome.is_err());
}
1470
/// Forward fill on a Float64 column copies the last non-null value into the
/// nulls that follow it, including a trailing null.
#[test]
fn test_fillnull_forward_fill_float64() {
    use arrow::array::Float64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        true,
    )]));
    let arr = Float64Array::from(vec![Some(1.0f64), None, None, Some(4.0f64), None]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Forward);
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap_or_else(|| panic!("Should be Float64Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slots are no longer null.
    assert!(!col.is_null(1));
    assert_eq!(col.value(1), 1.0);
    assert!(!col.is_null(2));
    assert_eq!(col.value(2), 1.0);
    assert!(!col.is_null(4));
    assert_eq!(col.value(4), 4.0);
}
1500
/// Backward fill on a Float64 column copies the next non-null value into the
/// nulls that precede it, including a leading null.
#[test]
fn test_fillnull_backward_fill_float64() {
    use arrow::array::Float64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        true,
    )]));
    let arr = Float64Array::from(vec![None, Some(2.0f64), None, None, Some(5.0f64)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .ok()
        .unwrap_or_else(|| panic!("batch"));

    let fillnull = FillNull::new("value", FillStrategy::Backward);
    let result = fillnull.apply(batch);
    assert!(result.is_ok());
    let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
    // `downcast_ref` already returns an `Option`, so it is unwrapped directly
    // instead of being converted to a `Result` and back.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap_or_else(|| panic!("Should be Float64Array"));

    // `value(i)` does not consult the validity bitmap, so also check that the
    // filled slots are no longer null.
    assert!(!col.is_null(0));
    assert_eq!(col.value(0), 2.0);
    assert!(!col.is_null(2));
    assert_eq!(col.value(2), 5.0);
    assert!(!col.is_null(3));
    assert_eq!(col.value(3), 5.0);
}
1530
/// A constant-fill strategy on a column that is not in the batch is an error.
#[test]
fn test_fillnull_nonexistent_column() {
    let batch = create_test_batch();
    let outcome = FillNull::new("nonexistent", FillStrategy::Int(42)).apply(batch);

    // Unlike `Normalize`, `FillNull` does not skip unknown columns.
    assert!(outcome.is_err());
}
1539}