1use std::sync::Arc;
4
5use arrow::{
6 array::{Array, RecordBatch},
7 datatypes::{Field, Schema},
8};
9
10use super::Transform;
11use crate::error::{Error, Result};
12
/// Normalization scheme applied to a numeric column by [`Normalize`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NormMethod {
    /// Rescale with the column minimum and range: `(x - min) / (max - min)`.
    MinMax,
    /// Standardize with the column mean and population standard deviation.
    ZScore,
    /// Divide every value by the Euclidean (L2) norm of the column.
    L2,
}
23
24#[derive(Debug, Clone)]
40pub struct Normalize {
41 columns: Option<Vec<String>>,
42 method: NormMethod,
43}
44
45impl Normalize {
46 pub fn new<S: Into<String>>(columns: impl IntoIterator<Item = S>, method: NormMethod) -> Self {
48 Self {
49 columns: Some(columns.into_iter().map(Into::into).collect()),
50 method,
51 }
52 }
53
54 pub fn all_numeric(method: NormMethod) -> Self {
56 Self {
57 columns: None,
58 method,
59 }
60 }
61
62 pub fn columns(&self) -> Option<&[String]> {
64 self.columns.as_deref()
65 }
66
67 pub fn method(&self) -> NormMethod {
69 self.method
70 }
71
72 fn is_numeric_type(dtype: &arrow::datatypes::DataType) -> bool {
73 use arrow::datatypes::DataType;
74 matches!(
75 dtype,
76 DataType::Int8
77 | DataType::Int16
78 | DataType::Int32
79 | DataType::Int64
80 | DataType::UInt8
81 | DataType::UInt16
82 | DataType::UInt32
83 | DataType::UInt64
84 | DataType::Float16
85 | DataType::Float32
86 | DataType::Float64
87 )
88 }
89
90 fn normalize_array(
91 &self,
92 array: &dyn Array,
93 _dtype: &arrow::datatypes::DataType,
94 ) -> Result<Arc<dyn Array>> {
95 use arrow::{array::Float64Array, compute::cast, datatypes::DataType};
96
97 let float_array = cast(array, &DataType::Float64)
99 .map_err(|e| Error::transform(format!("Failed to cast to Float64: {}", e)))?;
100
101 let float_values = float_array
102 .as_any()
103 .downcast_ref::<Float64Array>()
104 .ok_or_else(|| Error::transform("Expected Float64Array after cast"))?;
105
106 let normalized = match self.method {
107 NormMethod::MinMax => Self::min_max_normalize(float_values),
108 NormMethod::ZScore => Self::zscore_normalize(float_values),
109 NormMethod::L2 => Self::l2_normalize(float_values),
110 };
111
112 Ok(Arc::new(normalized))
114 }
115
116 fn min_max_normalize(array: &arrow::array::Float64Array) -> arrow::array::Float64Array {
117 let mut min = f64::INFINITY;
118 let mut max = f64::NEG_INFINITY;
119
120 for i in 0..array.len() {
121 if !array.is_null(i) {
122 let v = array.value(i);
123 if v < min {
124 min = v;
125 }
126 if v > max {
127 max = v;
128 }
129 }
130 }
131
132 let range = max - min;
133 let values: Vec<Option<f64>> = (0..array.len())
134 .map(|i| {
135 if array.is_null(i) {
136 None
137 } else if range == 0.0 {
138 Some(0.0)
139 } else {
140 Some((array.value(i) - min) / range)
141 }
142 })
143 .collect();
144
145 arrow::array::Float64Array::from(values)
146 }
147
148 #[allow(clippy::cast_precision_loss)]
149 fn zscore_normalize(array: &arrow::array::Float64Array) -> arrow::array::Float64Array {
150 let mut sum = 0.0;
151 let mut count = 0usize;
152
153 for i in 0..array.len() {
154 if !array.is_null(i) {
155 sum += array.value(i);
156 count += 1;
157 }
158 }
159
160 if count == 0 {
161 return array.clone();
162 }
163
164 let mean = sum / count as f64;
165
166 let mut variance_sum = 0.0;
167 for i in 0..array.len() {
168 if !array.is_null(i) {
169 let diff = array.value(i) - mean;
170 variance_sum += diff * diff;
171 }
172 }
173
174 let std = (variance_sum / count as f64).sqrt();
175
176 let values: Vec<Option<f64>> = (0..array.len())
177 .map(|i| {
178 if array.is_null(i) {
179 None
180 } else if std == 0.0 {
181 Some(0.0)
182 } else {
183 Some((array.value(i) - mean) / std)
184 }
185 })
186 .collect();
187
188 arrow::array::Float64Array::from(values)
189 }
190
191 fn l2_normalize(array: &arrow::array::Float64Array) -> arrow::array::Float64Array {
192 let mut sum_sq = 0.0;
193
194 for i in 0..array.len() {
195 if !array.is_null(i) {
196 let v = array.value(i);
197 sum_sq += v * v;
198 }
199 }
200
201 let norm = sum_sq.sqrt();
202
203 let values: Vec<Option<f64>> = (0..array.len())
204 .map(|i| {
205 if array.is_null(i) {
206 None
207 } else if norm == 0.0 {
208 Some(0.0)
209 } else {
210 Some(array.value(i) / norm)
211 }
212 })
213 .collect();
214
215 arrow::array::Float64Array::from(values)
216 }
217}
218
219impl Transform for Normalize {
220 fn apply(&self, batch: RecordBatch) -> Result<RecordBatch> {
221 let schema = batch.schema();
222
223 let columns_to_normalize: std::collections::HashSet<&str> = match &self.columns {
224 Some(cols) => cols.iter().map(String::as_str).collect(),
225 None => schema
226 .fields()
227 .iter()
228 .filter(|f| Self::is_numeric_type(f.data_type()))
229 .map(|f| f.name().as_str())
230 .collect(),
231 };
232
233 let mut fields = Vec::with_capacity(schema.fields().len());
234 let mut arrays = Vec::with_capacity(schema.fields().len());
235
236 for (idx, field) in schema.fields().iter().enumerate() {
237 let col = batch.column(idx);
238
239 if columns_to_normalize.contains(field.name().as_str()) {
240 if !Self::is_numeric_type(field.data_type()) {
241 return Err(Error::transform(format!(
242 "Column '{}' is not numeric (type: {:?})",
243 field.name(),
244 field.data_type()
245 )));
246 }
247
248 let normalized = self.normalize_array(col.as_ref(), field.data_type())?;
249 fields.push(Field::new(
251 field.name(),
252 arrow::datatypes::DataType::Float64,
253 field.is_nullable(),
254 ));
255 arrays.push(normalized);
256 } else {
257 fields.push(field.as_ref().clone());
258 arrays.push(Arc::clone(col));
259 }
260 }
261
262 let new_schema = Arc::new(Schema::new(fields));
263 RecordBatch::try_new(new_schema, arrays).map_err(Error::Arrow)
264 }
265}
266
267#[derive(Debug, Clone)]
281pub struct Cast {
282 mappings: Vec<(String, arrow::datatypes::DataType)>,
283}
284
285impl Cast {
286 pub fn new<S: Into<String>>(
288 mappings: impl IntoIterator<Item = (S, arrow::datatypes::DataType)>,
289 ) -> Self {
290 Self {
291 mappings: mappings
292 .into_iter()
293 .map(|(name, dtype)| (name.into(), dtype))
294 .collect(),
295 }
296 }
297
298 pub fn mappings(&self) -> &[(String, arrow::datatypes::DataType)] {
300 &self.mappings
301 }
302}
303
304impl Transform for Cast {
305 fn apply(&self, batch: RecordBatch) -> Result<RecordBatch> {
306 use arrow::compute::cast;
307
308 let schema = batch.schema();
309 let cast_map: std::collections::HashMap<&str, &arrow::datatypes::DataType> =
310 self.mappings.iter().map(|(n, t)| (n.as_str(), t)).collect();
311
312 let mut fields = Vec::with_capacity(schema.fields().len());
313 let mut arrays = Vec::with_capacity(schema.fields().len());
314
315 for (idx, field) in schema.fields().iter().enumerate() {
316 let col = batch.column(idx);
317
318 if let Some(&target_type) = cast_map.get(field.name().as_str()) {
319 let casted = cast(col.as_ref(), target_type).map_err(|e| {
320 Error::transform(format!(
321 "Failed to cast column '{}' to {:?}: {}",
322 field.name(),
323 target_type,
324 e
325 ))
326 })?;
327 fields.push(Field::new(
328 field.name(),
329 target_type.clone(),
330 field.is_nullable(),
331 ));
332 arrays.push(casted);
333 } else {
334 fields.push(field.as_ref().clone());
335 arrays.push(Arc::clone(col));
336 }
337 }
338
339 let new_schema = Arc::new(Schema::new(fields));
340 RecordBatch::try_new(new_schema, arrays).map_err(Error::Arrow)
341 }
342}
343
/// How [`FillNull`] replaces null entries in its target column.
#[derive(Clone, Debug)]
pub enum FillStrategy {
    /// Replace nulls in an `Int32` or `Int64` column with this integer.
    Int(i64),
    /// Replace nulls in a `Float32` or `Float64` column with this float.
    Float(f64),
    /// Replace nulls in a `Utf8` column with this string.
    String(String),
    /// Replace nulls in a `Boolean` column with this flag.
    Bool(bool),
    /// Replace nulls in an `Int32`, `Int64`, `Float32` or `Float64` column with zero.
    Zero,
    /// Replace each null with the closest preceding non-null value; leading
    /// nulls stay null.
    Forward,
    /// Replace each null with the closest following non-null value; trailing
    /// nulls stay null.
    Backward,
}
362
363#[derive(Debug, Clone)]
380pub struct FillNull {
381 column: String,
382 strategy: FillStrategy,
383}
384
385impl FillNull {
386 pub fn new<S: Into<String>>(column: S, strategy: FillStrategy) -> Self {
388 Self {
389 column: column.into(),
390 strategy,
391 }
392 }
393
394 pub fn with_zero<S: Into<String>>(column: S) -> Self {
396 Self::new(column, FillStrategy::Zero)
397 }
398
399 pub fn column(&self) -> &str {
401 &self.column
402 }
403
404 pub fn strategy(&self) -> &FillStrategy {
406 &self.strategy
407 }
408
409 fn fill_i32_array(col: &Arc<dyn Array>, fill_value: i32) -> arrow::array::Int32Array {
410 use arrow::array::Int32Array;
411 let arr = col.as_any().downcast_ref::<Int32Array>();
412 if let Some(arr) = arr {
413 let values: Vec<i32> = (0..arr.len())
414 .map(|i| {
415 if arr.is_null(i) {
416 fill_value
417 } else {
418 arr.value(i)
419 }
420 })
421 .collect();
422 Int32Array::from(values)
423 } else {
424 Int32Array::from(Vec::<i32>::new())
425 }
426 }
427
428 fn fill_i64_array(col: &Arc<dyn Array>, fill_value: i64) -> arrow::array::Int64Array {
429 use arrow::array::Int64Array;
430 let arr = col.as_any().downcast_ref::<Int64Array>();
431 if let Some(arr) = arr {
432 let values: Vec<i64> = (0..arr.len())
433 .map(|i| {
434 if arr.is_null(i) {
435 fill_value
436 } else {
437 arr.value(i)
438 }
439 })
440 .collect();
441 Int64Array::from(values)
442 } else {
443 Int64Array::from(Vec::<i64>::new())
444 }
445 }
446
447 fn fill_f32_array(col: &Arc<dyn Array>, fill_value: f32) -> arrow::array::Float32Array {
448 use arrow::array::Float32Array;
449 let arr = col.as_any().downcast_ref::<Float32Array>();
450 if let Some(arr) = arr {
451 let values: Vec<f32> = (0..arr.len())
452 .map(|i| {
453 if arr.is_null(i) {
454 fill_value
455 } else {
456 arr.value(i)
457 }
458 })
459 .collect();
460 Float32Array::from(values)
461 } else {
462 Float32Array::from(Vec::<f32>::new())
463 }
464 }
465
466 fn fill_f64_array(col: &Arc<dyn Array>, fill_value: f64) -> arrow::array::Float64Array {
467 use arrow::array::Float64Array;
468 let arr = col.as_any().downcast_ref::<Float64Array>();
469 if let Some(arr) = arr {
470 let values: Vec<f64> = (0..arr.len())
471 .map(|i| {
472 if arr.is_null(i) {
473 fill_value
474 } else {
475 arr.value(i)
476 }
477 })
478 .collect();
479 Float64Array::from(values)
480 } else {
481 Float64Array::from(Vec::<f64>::new())
482 }
483 }
484
485 fn fill_string_array(col: &Arc<dyn Array>, fill_value: &str) -> arrow::array::StringArray {
486 use arrow::array::StringArray;
487 let arr = col.as_any().downcast_ref::<StringArray>();
488 if let Some(arr) = arr {
489 let values: Vec<&str> = (0..arr.len())
490 .map(|i| {
491 if arr.is_null(i) {
492 fill_value
493 } else {
494 arr.value(i)
495 }
496 })
497 .collect();
498 StringArray::from(values)
499 } else {
500 StringArray::from(Vec::<&str>::new())
501 }
502 }
503
504 fn fill_bool_array(col: &Arc<dyn Array>, fill_value: bool) -> arrow::array::BooleanArray {
505 use arrow::array::BooleanArray;
506 let arr = col.as_any().downcast_ref::<BooleanArray>();
507 if let Some(arr) = arr {
508 let values: Vec<bool> = (0..arr.len())
509 .map(|i| {
510 if arr.is_null(i) {
511 fill_value
512 } else {
513 arr.value(i)
514 }
515 })
516 .collect();
517 BooleanArray::from(values)
518 } else {
519 BooleanArray::from(Vec::<bool>::new())
520 }
521 }
522
523 fn forward_fill_i32(col: &Arc<dyn Array>) -> arrow::array::Int32Array {
524 use arrow::array::Int32Array;
525 let arr = col.as_any().downcast_ref::<Int32Array>();
526 if let Some(arr) = arr {
527 let mut last: Option<i32> = None;
528 let values: Vec<Option<i32>> = (0..arr.len())
529 .map(|i| {
530 if arr.is_null(i) {
531 last
532 } else {
533 let v = arr.value(i);
534 last = Some(v);
535 Some(v)
536 }
537 })
538 .collect();
539 Int32Array::from(values)
540 } else {
541 Int32Array::from(Vec::<i32>::new())
542 }
543 }
544
545 fn forward_fill_i64(col: &Arc<dyn Array>) -> arrow::array::Int64Array {
546 use arrow::array::Int64Array;
547 let arr = col.as_any().downcast_ref::<Int64Array>();
548 if let Some(arr) = arr {
549 let mut last: Option<i64> = None;
550 let values: Vec<Option<i64>> = (0..arr.len())
551 .map(|i| {
552 if arr.is_null(i) {
553 last
554 } else {
555 let v = arr.value(i);
556 last = Some(v);
557 Some(v)
558 }
559 })
560 .collect();
561 Int64Array::from(values)
562 } else {
563 Int64Array::from(Vec::<i64>::new())
564 }
565 }
566
567 fn forward_fill_f64(col: &Arc<dyn Array>) -> arrow::array::Float64Array {
568 use arrow::array::Float64Array;
569 let arr = col.as_any().downcast_ref::<Float64Array>();
570 if let Some(arr) = arr {
571 let mut last: Option<f64> = None;
572 let values: Vec<Option<f64>> = (0..arr.len())
573 .map(|i| {
574 if arr.is_null(i) {
575 last
576 } else {
577 let v = arr.value(i);
578 last = Some(v);
579 Some(v)
580 }
581 })
582 .collect();
583 Float64Array::from(values)
584 } else {
585 Float64Array::from(Vec::<f64>::new())
586 }
587 }
588
589 fn backward_fill_i32(col: &Arc<dyn Array>) -> arrow::array::Int32Array {
590 use arrow::array::Int32Array;
591 let arr = col.as_any().downcast_ref::<Int32Array>();
592 if let Some(arr) = arr {
593 let mut next: Option<i32> = None;
594 let mut values: Vec<Option<i32>> = (0..arr.len())
595 .rev()
596 .map(|i| {
597 if arr.is_null(i) {
598 next
599 } else {
600 let v = arr.value(i);
601 next = Some(v);
602 Some(v)
603 }
604 })
605 .collect();
606 values.reverse();
607 Int32Array::from(values)
608 } else {
609 Int32Array::from(Vec::<i32>::new())
610 }
611 }
612
613 fn backward_fill_i64(col: &Arc<dyn Array>) -> arrow::array::Int64Array {
614 use arrow::array::Int64Array;
615 let arr = col.as_any().downcast_ref::<Int64Array>();
616 if let Some(arr) = arr {
617 let mut next: Option<i64> = None;
618 let mut values: Vec<Option<i64>> = (0..arr.len())
619 .rev()
620 .map(|i| {
621 if arr.is_null(i) {
622 next
623 } else {
624 let v = arr.value(i);
625 next = Some(v);
626 Some(v)
627 }
628 })
629 .collect();
630 values.reverse();
631 Int64Array::from(values)
632 } else {
633 Int64Array::from(Vec::<i64>::new())
634 }
635 }
636
637 fn backward_fill_f64(col: &Arc<dyn Array>) -> arrow::array::Float64Array {
638 use arrow::array::Float64Array;
639 let arr = col.as_any().downcast_ref::<Float64Array>();
640 if let Some(arr) = arr {
641 let mut next: Option<f64> = None;
642 let mut values: Vec<Option<f64>> = (0..arr.len())
643 .rev()
644 .map(|i| {
645 if arr.is_null(i) {
646 next
647 } else {
648 let v = arr.value(i);
649 next = Some(v);
650 Some(v)
651 }
652 })
653 .collect();
654 values.reverse();
655 Float64Array::from(values)
656 } else {
657 Float64Array::from(Vec::<f64>::new())
658 }
659 }
660}
661
662impl Transform for FillNull {
663 #[allow(clippy::cast_possible_truncation)]
664 fn apply(&self, batch: RecordBatch) -> Result<RecordBatch> {
665 use arrow::datatypes::DataType;
666
667 let schema = batch.schema();
668 let (col_idx, field) = schema
669 .column_with_name(&self.column)
670 .ok_or_else(|| Error::column_not_found(&self.column))?;
671
672 let mut arrays: Vec<Arc<dyn Array>> = batch.columns().to_vec();
673 let col = batch.column(col_idx);
674
675 let filled: Arc<dyn Array> = match (field.data_type(), &self.strategy) {
676 (DataType::Int32, FillStrategy::Int(v)) => {
677 Arc::new(Self::fill_i32_array(col, *v as i32))
678 }
679 (DataType::Int64, FillStrategy::Int(v)) => Arc::new(Self::fill_i64_array(col, *v)),
680 (DataType::Float32, FillStrategy::Float(v)) => {
681 Arc::new(Self::fill_f32_array(col, *v as f32))
682 }
683 (DataType::Float64, FillStrategy::Float(v)) => Arc::new(Self::fill_f64_array(col, *v)),
684 (DataType::Int32, FillStrategy::Zero) => Arc::new(Self::fill_i32_array(col, 0)),
685 (DataType::Int64, FillStrategy::Zero) => Arc::new(Self::fill_i64_array(col, 0)),
686 (DataType::Float32, FillStrategy::Zero) => Arc::new(Self::fill_f32_array(col, 0.0)),
687 (DataType::Float64, FillStrategy::Zero) => Arc::new(Self::fill_f64_array(col, 0.0)),
688 (DataType::Utf8, FillStrategy::String(s)) => Arc::new(Self::fill_string_array(col, s)),
689 (DataType::Boolean, FillStrategy::Bool(b)) => Arc::new(Self::fill_bool_array(col, *b)),
690 (DataType::Int32, FillStrategy::Forward) => Arc::new(Self::forward_fill_i32(col)),
691 (DataType::Int64, FillStrategy::Forward) => Arc::new(Self::forward_fill_i64(col)),
692 (DataType::Float64, FillStrategy::Forward) => Arc::new(Self::forward_fill_f64(col)),
693 (DataType::Int32, FillStrategy::Backward) => Arc::new(Self::backward_fill_i32(col)),
694 (DataType::Int64, FillStrategy::Backward) => Arc::new(Self::backward_fill_i64(col)),
695 (DataType::Float64, FillStrategy::Backward) => Arc::new(Self::backward_fill_f64(col)),
696 _ => {
697 return Err(Error::transform(format!(
698 "Unsupported type {:?} for fill strategy {:?}",
699 field.data_type(),
700 self.strategy
701 )));
702 }
703 };
704
705 arrays[col_idx] = filled;
706 RecordBatch::try_new(schema, arrays).map_err(Error::Arrow)
707 }
708}
709
710#[cfg(test)]
711#[allow(
712 clippy::float_cmp,
713 clippy::cast_precision_loss,
714 clippy::redundant_closure
715)]
716mod tests {
717 use arrow::{
718 array::{Int32Array, StringArray},
719 datatypes::DataType,
720 };
721
722 use super::*;
723
724 fn create_test_batch() -> RecordBatch {
725 let schema = Arc::new(Schema::new(vec![
726 Field::new("id", DataType::Int32, false),
727 Field::new("name", DataType::Utf8, false),
728 Field::new("value", DataType::Int32, false),
729 ]));
730
731 let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
732 let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
733 let value_array = Int32Array::from(vec![10, 20, 30, 40, 50]);
734
735 RecordBatch::try_new(
736 schema,
737 vec![
738 Arc::new(id_array),
739 Arc::new(name_array),
740 Arc::new(value_array),
741 ],
742 )
743 .ok()
744 .unwrap_or_else(|| panic!("Should create batch"))
745 }
746
747 #[test]
748 fn test_cast_transform() {
749 let batch = create_test_batch();
750 let transform = Cast::new(vec![("id", DataType::Int64), ("value", DataType::Float64)]);
751
752 let result = transform.apply(batch);
753 assert!(result.is_ok());
754 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
755
756 assert_eq!(result.schema().field(0).data_type(), &DataType::Int64);
757 assert_eq!(result.schema().field(2).data_type(), &DataType::Float64);
758 }
759
760 #[test]
761 fn test_cast_preserves_values() {
762 let batch = create_test_batch();
763 let transform = Cast::new(vec![("id", DataType::Float64)]);
764
765 let result = transform.apply(batch);
766 assert!(result.is_ok());
767 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
768
769 let col = result
770 .column(0)
771 .as_any()
772 .downcast_ref::<arrow::array::Float64Array>()
773 .unwrap_or_else(|| panic!("Should be Float64Array"));
774
775 assert_eq!(col.value(0), 1.0);
776 assert_eq!(col.value(1), 2.0);
777 assert_eq!(col.value(2), 3.0);
778 }
779
780 #[test]
781 fn test_cast_mappings_getter() {
782 let transform = Cast::new(vec![("a", DataType::Int64)]);
783 assert_eq!(transform.mappings().len(), 1);
784 assert_eq!(transform.mappings()[0].0, "a");
785 assert_eq!(transform.mappings()[0].1, DataType::Int64);
786 }
787
788 #[test]
789 fn test_cast_debug() {
790 let cast = Cast::new(vec![("col", DataType::Int64)]);
791 let debug_str = format!("{:?}", cast);
792 assert!(debug_str.contains("Cast"));
793 }
794
795 #[test]
796 fn test_cast_int_to_string() {
797 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
798 let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
799 .ok()
800 .unwrap_or_else(|| panic!("Should create batch"));
801
802 let cast = Cast::new(vec![("id", DataType::Utf8)]);
803 let result = cast.apply(batch);
804 assert!(result.is_ok());
805 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
806 assert_eq!(result.schema().field(0).data_type(), &DataType::Utf8);
807 }
808
809 #[test]
810 fn test_cast_nonexistent_column() {
811 let batch = create_test_batch();
812 let cast = Cast::new(vec![("nonexistent", DataType::Int64)]);
813 let result = cast.apply(batch.clone());
814 assert!(result.is_ok());
816 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
817 assert_eq!(result.num_rows(), batch.num_rows());
818 }
819
820 #[test]
821 fn test_cast_incompatible_type() {
822 let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
824 let arr = StringArray::from(vec!["hello", "world"]);
825 let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
826 .ok()
827 .unwrap_or_else(|| panic!("batch"));
828
829 let cast = Cast::new(vec![("text", DataType::Int32)]);
830 let result = cast.apply(batch);
831 assert!(result.is_err());
833 }
834
835 #[test]
836 fn test_normalize_minmax() {
837 let schema = Arc::new(Schema::new(vec![Field::new(
838 "value",
839 DataType::Float64,
840 false,
841 )]));
842 let values = arrow::array::Float64Array::from(vec![0.0, 50.0, 100.0]);
843 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
844 .ok()
845 .unwrap_or_else(|| panic!("Should create batch"));
846
847 let transform = Normalize::new(vec!["value"], NormMethod::MinMax);
848 let result = transform.apply(batch);
849 assert!(result.is_ok());
850 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
851
852 let col = result
853 .column(0)
854 .as_any()
855 .downcast_ref::<arrow::array::Float64Array>()
856 .unwrap_or_else(|| panic!("Should be Float64Array"));
857
858 assert!((col.value(0) - 0.0).abs() < 1e-10);
859 assert!((col.value(1) - 0.5).abs() < 1e-10);
860 assert!((col.value(2) - 1.0).abs() < 1e-10);
861 }
862
863 #[test]
864 fn test_normalize_zscore() {
865 let schema = Arc::new(Schema::new(vec![Field::new(
866 "value",
867 DataType::Float64,
868 false,
869 )]));
870 let values = arrow::array::Float64Array::from(vec![-1.0, 0.0, 1.0]);
872 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
873 .ok()
874 .unwrap_or_else(|| panic!("Should create batch"));
875
876 let transform = Normalize::new(vec!["value"], NormMethod::ZScore);
877 let result = transform.apply(batch);
878 assert!(result.is_ok());
879 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
880
881 let col = result
882 .column(0)
883 .as_any()
884 .downcast_ref::<arrow::array::Float64Array>()
885 .unwrap_or_else(|| panic!("Should be Float64Array"));
886
887 let mean: f64 = (0..col.len()).map(|i| col.value(i)).sum::<f64>() / col.len() as f64;
889 assert!(mean.abs() < 1e-10);
890 }
891
892 #[test]
893 fn test_normalize_l2() {
894 let schema = Arc::new(Schema::new(vec![Field::new(
895 "value",
896 DataType::Float64,
897 false,
898 )]));
899 let values = arrow::array::Float64Array::from(vec![3.0, 4.0]);
900 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
901 .ok()
902 .unwrap_or_else(|| panic!("Should create batch"));
903
904 let transform = Normalize::new(vec!["value"], NormMethod::L2);
905 let result = transform.apply(batch);
906 assert!(result.is_ok());
907 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
908
909 let col = result
910 .column(0)
911 .as_any()
912 .downcast_ref::<arrow::array::Float64Array>()
913 .unwrap_or_else(|| panic!("Should be Float64Array"));
914
915 assert!((col.value(0) - 0.6).abs() < 1e-10);
917 assert!((col.value(1) - 0.8).abs() < 1e-10);
918 }
919
920 #[test]
921 fn test_normalize_all_numeric() {
922 let batch = create_test_batch();
923 let transform = Normalize::all_numeric(NormMethod::MinMax);
924
925 let result = transform.apply(batch);
926 assert!(result.is_ok());
927 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
928
929 assert_eq!(result.schema().field(0).data_type(), &DataType::Float64);
931 assert_eq!(result.schema().field(1).data_type(), &DataType::Utf8); assert_eq!(result.schema().field(2).data_type(), &DataType::Float64);
933 }
934
935 #[test]
936 fn test_normalize_non_numeric_error() {
937 let batch = create_test_batch();
938 let transform = Normalize::new(vec!["name"], NormMethod::MinMax);
939
940 let result = transform.apply(batch);
941 assert!(result.is_err());
942 }
943
944 #[test]
945 fn test_normalize_getters() {
946 let transform = Normalize::new(vec!["a", "b"], NormMethod::ZScore);
947 assert!(transform.columns().is_some());
948 assert_eq!(
949 transform
950 .columns()
951 .unwrap_or_else(|| panic!("Should have columns")),
952 &["a", "b"]
953 );
954 assert_eq!(transform.method(), NormMethod::ZScore);
955
956 let transform2 = Normalize::all_numeric(NormMethod::L2);
957 assert!(transform2.columns().is_none());
958 assert_eq!(transform2.method(), NormMethod::L2);
959 }
960
961 #[test]
962 fn test_normalize_constant_values() {
963 let schema = Arc::new(Schema::new(vec![Field::new(
964 "value",
965 DataType::Float64,
966 false,
967 )]));
968 let values = arrow::array::Float64Array::from(vec![5.0, 5.0, 5.0]);
970 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
971 .ok()
972 .unwrap_or_else(|| panic!("Should create batch"));
973
974 let transform = Normalize::new(vec!["value"], NormMethod::MinMax);
975 let result = transform.apply(batch);
976 assert!(result.is_ok());
977 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
978
979 let col = result
980 .column(0)
981 .as_any()
982 .downcast_ref::<arrow::array::Float64Array>()
983 .unwrap_or_else(|| panic!("Should be Float64Array"));
984
985 for i in 0..col.len() {
987 assert_eq!(col.value(i), 0.0);
988 }
989 }
990
991 #[test]
992 fn test_normalize_with_nulls() {
993 let schema = Arc::new(Schema::new(vec![Field::new(
994 "value",
995 DataType::Float64,
996 true,
997 )]));
998 let values = arrow::array::Float64Array::from(vec![Some(0.0), None, Some(100.0)]);
999 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
1000 .ok()
1001 .unwrap_or_else(|| panic!("Should create batch"));
1002
1003 let transform = Normalize::new(vec!["value"], NormMethod::MinMax);
1004 let result = transform.apply(batch);
1005 assert!(result.is_ok());
1006 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
1007
1008 let col = result
1009 .column(0)
1010 .as_any()
1011 .downcast_ref::<arrow::array::Float64Array>()
1012 .unwrap_or_else(|| panic!("Should be Float64Array"));
1013
1014 assert!((col.value(0) - 0.0).abs() < 1e-10);
1015 assert!(col.is_null(1));
1016 assert!((col.value(2) - 1.0).abs() < 1e-10);
1017 }
1018
1019 #[test]
1020 fn test_normalize_debug() {
1021 let normalize = Normalize::new(vec!["col"], NormMethod::MinMax);
1022 let debug_str = format!("{:?}", normalize);
1023 assert!(debug_str.contains("Normalize"));
1024 }
1025
1026 #[test]
1027 fn test_normalize_nonexistent_column() {
1028 let batch = create_test_batch();
1029 let normalize = Normalize::new(["nonexistent"], NormMethod::MinMax);
1030 let result = normalize.apply(batch);
1031 assert!(result.is_ok());
1033 }
1034
1035 #[test]
1038 fn test_fillnull_with_zero_i32() {
1039 let schema = Arc::new(Schema::new(vec![Field::new(
1040 "value",
1041 DataType::Int32,
1042 true,
1043 )]));
1044 let values = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
1045 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
1046 .ok()
1047 .unwrap_or_else(|| panic!("Should create batch"));
1048
1049 let transform = FillNull::with_zero("value");
1050 let result = transform.apply(batch);
1051 assert!(result.is_ok());
1052 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
1053
1054 let col = result
1055 .column(0)
1056 .as_any()
1057 .downcast_ref::<Int32Array>()
1058 .unwrap_or_else(|| panic!("Should be Int32Array"));
1059
1060 assert_eq!(col.value(0), 1);
1061 assert_eq!(col.value(1), 0); assert_eq!(col.value(2), 3);
1063 assert_eq!(col.value(3), 0); assert_eq!(col.value(4), 5);
1065 }
1066
1067 #[test]
1068 fn test_fillnull_with_int_value() {
1069 let schema = Arc::new(Schema::new(vec![Field::new(
1070 "value",
1071 DataType::Int32,
1072 true,
1073 )]));
1074 let values = Int32Array::from(vec![Some(1), None, Some(3)]);
1075 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
1076 .ok()
1077 .unwrap_or_else(|| panic!("Should create batch"));
1078
1079 let transform = FillNull::new("value", FillStrategy::Int(-1));
1080 let result = transform.apply(batch);
1081 assert!(result.is_ok());
1082 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
1083
1084 let col = result
1085 .column(0)
1086 .as_any()
1087 .downcast_ref::<Int32Array>()
1088 .unwrap_or_else(|| panic!("Should be Int32Array"));
1089
1090 assert_eq!(col.value(1), -1);
1091 }
1092
1093 #[test]
1094 fn test_fillnull_with_float() {
1095 let schema = Arc::new(Schema::new(vec![Field::new(
1096 "value",
1097 DataType::Float64,
1098 true,
1099 )]));
1100 let values = arrow::array::Float64Array::from(vec![Some(1.0), None, Some(3.0)]);
1101 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
1102 .ok()
1103 .unwrap_or_else(|| panic!("Should create batch"));
1104
1105 let transform = FillNull::new("value", FillStrategy::Float(99.9));
1106 let result = transform.apply(batch);
1107 assert!(result.is_ok());
1108 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
1109
1110 let col = result
1111 .column(0)
1112 .as_any()
1113 .downcast_ref::<arrow::array::Float64Array>()
1114 .unwrap_or_else(|| panic!("Should be Float64Array"));
1115
1116 assert!((col.value(1) - 99.9).abs() < f64::EPSILON);
1117 }
1118
1119 #[test]
1120 fn test_fillnull_with_string() {
1121 let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
1122 let values = StringArray::from(vec![Some("a"), None, Some("c")]);
1123 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
1124 .ok()
1125 .unwrap_or_else(|| panic!("Should create batch"));
1126
1127 let transform = FillNull::new("name", FillStrategy::String("unknown".to_string()));
1128 let result = transform.apply(batch);
1129 assert!(result.is_ok());
1130 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
1131
1132 let col = result
1133 .column(0)
1134 .as_any()
1135 .downcast_ref::<StringArray>()
1136 .unwrap_or_else(|| panic!("Should be StringArray"));
1137
1138 assert_eq!(col.value(1), "unknown");
1139 }
1140
1141 #[test]
1142 fn test_fillnull_forward_fill() {
1143 let schema = Arc::new(Schema::new(vec![Field::new(
1144 "value",
1145 DataType::Int32,
1146 true,
1147 )]));
1148 let values = Int32Array::from(vec![Some(1), None, None, Some(4), None]);
1149 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
1150 .ok()
1151 .unwrap_or_else(|| panic!("Should create batch"));
1152
1153 let transform = FillNull::new("value", FillStrategy::Forward);
1154 let result = transform.apply(batch);
1155 assert!(result.is_ok());
1156 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
1157
1158 let col = result
1159 .column(0)
1160 .as_any()
1161 .downcast_ref::<Int32Array>()
1162 .unwrap_or_else(|| panic!("Should be Int32Array"));
1163
1164 assert_eq!(col.value(0), 1);
1165 assert_eq!(col.value(1), 1); assert_eq!(col.value(2), 1); assert_eq!(col.value(3), 4);
1168 assert_eq!(col.value(4), 4); }
1170
1171 #[test]
1172 fn test_fillnull_backward_fill() {
1173 let schema = Arc::new(Schema::new(vec![Field::new(
1174 "value",
1175 DataType::Int32,
1176 true,
1177 )]));
1178 let values = Int32Array::from(vec![None, None, Some(3), None, Some(5)]);
1179 let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])
1180 .ok()
1181 .unwrap_or_else(|| panic!("Should create batch"));
1182
1183 let transform = FillNull::new("value", FillStrategy::Backward);
1184 let result = transform.apply(batch);
1185 assert!(result.is_ok());
1186 let result = result.ok().unwrap_or_else(|| panic!("Should succeed"));
1187
1188 let col = result
1189 .column(0)
1190 .as_any()
1191 .downcast_ref::<Int32Array>()
1192 .unwrap_or_else(|| panic!("Should be Int32Array"));
1193
1194 assert_eq!(col.value(0), 3); assert_eq!(col.value(1), 3); assert_eq!(col.value(2), 3);
1197 assert_eq!(col.value(3), 5); assert_eq!(col.value(4), 5);
1199 }
1200
1201 #[test]
1202 fn test_fillnull_column_not_found() {
1203 let batch = create_test_batch();
1204 let transform = FillNull::with_zero("nonexistent");
1205
1206 let result = transform.apply(batch);
1207 assert!(result.is_err());
1208 }
1209
1210 #[test]
1211 fn test_fillnull_getters() {
1212 let transform = FillNull::new("col", FillStrategy::Int(42));
1213 assert_eq!(transform.column(), "col");
1214 assert!(matches!(transform.strategy(), FillStrategy::Int(42)));
1215 }
1216
/// The `Debug` rendering of a `FillNull` mentions the type name.
#[test]
fn test_fillnull_debug() {
    let rendered = format!("{:?}", FillNull::new("col", FillStrategy::Zero));
    assert!(rendered.contains("FillNull"));
}
1223
/// `FillStrategy::Int(42)` on a nullable Int64 column: the two null slots
/// (indices 1 and 3) are expected to read back as 42.
#[test]
fn test_fillnull_with_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, Some(3i64), None, Some(5i64)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Int(42));
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("cast"));
    assert_eq!(col.value(1), 42);
    assert_eq!(col.value(3), 42);
}
1252
/// `FillStrategy::Float(2.5)` on a nullable Float32 column: the null slot at
/// index 1 is expected to read back as 2.5 (within a 0.001 tolerance).
#[test]
fn test_fillnull_with_float32() {
    use arrow::array::Float32Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float32,
        true,
    )]));
    let arr = Float32Array::from(vec![Some(1.0f32), None, Some(3.0f32)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Float(2.5));
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap_or_else(|| panic!("cast"));
    assert!((col.value(1) - 2.5f32).abs() < 0.001);
}
1280
/// `FillStrategy::Bool(true)` on a nullable Boolean column: the null slot at
/// index 1 is expected to read back as `true`.
#[test]
fn test_fillnull_with_bool() {
    use arrow::array::BooleanArray;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "flag",
        DataType::Boolean,
        true,
    )]));
    let arr = BooleanArray::from(vec![Some(true), None, Some(false)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("flag", FillStrategy::Bool(true));
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<BooleanArray>()
        .unwrap_or_else(|| panic!("cast"));
    assert!(col.value(1));
}
1308
/// `FillStrategy::Zero` on a nullable Int64 column: the null slot at index 1
/// is expected to become a valid 0.
#[test]
fn test_fillnull_zero_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, Some(3i64)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Zero);
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("cast"));
    // `value()` does not consult the validity bitmap, so a slot that was left
    // null could still read back as 0. Check validity first so the value
    // assertion cannot pass when nothing was filled.
    assert!(arrow::array::Array::is_valid(col, 1));
    assert_eq!(col.value(1), 0);
}
1336
/// `FillStrategy::Zero` on a nullable Float32 column: the null slot at index 1
/// is expected to become a valid 0.0.
#[test]
fn test_fillnull_zero_float32() {
    use arrow::array::Float32Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float32,
        true,
    )]));
    let arr = Float32Array::from(vec![Some(1.0f32), None, Some(3.0f32)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Zero);
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap_or_else(|| panic!("cast"));
    // `value()` does not consult the validity bitmap, so a slot that was left
    // null could still read back as 0.0. Check validity first so the value
    // assertion cannot pass when nothing was filled.
    assert!(arrow::array::Array::is_valid(col, 1));
    assert_eq!(col.value(1), 0.0f32);
}
1364
/// `FillStrategy::Zero` on a nullable Float64 column: the null slot at index 1
/// is expected to become a valid 0.0.
#[test]
fn test_fillnull_zero_float64() {
    use arrow::array::Float64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        true,
    )]));
    let arr = Float64Array::from(vec![Some(1.0f64), None, Some(3.0f64)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Zero);
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap_or_else(|| panic!("cast"));
    // `value()` does not consult the validity bitmap, so a slot that was left
    // null could still read back as 0.0. Check validity first so the value
    // assertion cannot pass when nothing was filled.
    assert!(arrow::array::Array::is_valid(col, 1));
    assert_eq!(col.value(1), 0.0f64);
}
1392
/// Forward fill on a nullable Int64 column: for the input `[1, null, null, 4]`
/// the two null slots are expected to read back as the preceding value, 1.
#[test]
fn test_fillnull_forward_fill_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, None, Some(4i64)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Forward);
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("cast"));
    assert_eq!(col.value(1), 1);
    assert_eq!(col.value(2), 1);
}
1421
/// Backward fill on a nullable Int64 column: for the input
/// `[1, null, null, 4]` the two null slots are expected to read back as the
/// following value, 4.
#[test]
fn test_fillnull_backward_fill_int64() {
    use arrow::array::Int64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    let arr = Int64Array::from(vec![Some(1i64), None, None, Some(4i64)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Backward);
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("cast"));
    assert_eq!(col.value(1), 4);
    assert_eq!(col.value(2), 4);
}
1450
/// Applying `FillStrategy::Int` to a Date32 column is expected to return an
/// error.
#[test]
fn test_fillnull_unsupported_type_strategy() {
    use arrow::array::Date32Array;

    let date_field = Field::new("date", DataType::Date32, true);
    let schema = Arc::new(Schema::new(vec![date_field]));
    let dates = Date32Array::from(vec![Some(1000), None, Some(3000)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dates)])
        .unwrap_or_else(|_| panic!("batch"));

    let outcome = FillNull::new("date", FillStrategy::Int(42)).apply(batch);
    assert!(outcome.is_err());
}
1470
/// Forward fill on a nullable Float64 column: for the input
/// `[1.0, null, null, 4.0, null]`, indices 1 and 2 are expected to read back
/// as 1.0 and the trailing null at index 4 as 4.0.
#[test]
fn test_fillnull_forward_fill_float64() {
    use arrow::array::Float64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        true,
    )]));
    let arr = Float64Array::from(vec![Some(1.0f64), None, None, Some(4.0f64), None]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Forward);
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap_or_else(|| panic!("cast"));
    assert_eq!(col.value(1), 1.0);
    assert_eq!(col.value(2), 1.0);
    assert_eq!(col.value(4), 4.0);
}
1500
/// Backward fill on a nullable Float64 column: for the input
/// `[null, 2.0, null, null, 5.0]`, the leading null at index 0 is expected to
/// read back as 2.0 and indices 2 and 3 as 5.0.
#[test]
fn test_fillnull_backward_fill_float64() {
    use arrow::array::Float64Array;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        true,
    )]));
    let arr = Float64Array::from(vec![None, Some(2.0f64), None, None, Some(5.0f64)]);
    // Keep the Arrow error in the panic message instead of discarding it via `.ok()`.
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])
        .unwrap_or_else(|e| panic!("batch: {}", e));

    let fillnull = FillNull::new("value", FillStrategy::Backward);
    let result = fillnull
        .apply(batch)
        .unwrap_or_else(|_| panic!("Should succeed"));
    // `downcast_ref` already yields an `Option`; unwrap it directly.
    let col = result
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap_or_else(|| panic!("cast"));
    assert_eq!(col.value(0), 2.0);
    assert_eq!(col.value(2), 5.0);
    assert_eq!(col.value(3), 5.0);
}
1530
/// An integer fill aimed at a column named "nonexistent" on the shared test
/// batch is expected to return an error.
#[test]
fn test_fillnull_nonexistent_column() {
    let missing = FillNull::new("nonexistent", FillStrategy::Int(42));
    assert!(missing.apply(create_test_batch()).is_err());
}
1539}