Skip to main content

alopex_dataframe/ops/
nulls.rs

1use std::sync::Arc;
2
3use arrow::array::{
4    Array, BooleanArray, BooleanBuilder, Float32Array, Float32Builder, Float64Array,
5    Float64Builder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array, Int64Builder,
6    Int8Array, Int8Builder, StringArray, StringBuilder, UInt16Array, UInt16Builder, UInt32Array,
7    UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder,
8};
9use arrow::datatypes::{DataType, Field, Schema};
10use arrow::record_batch::RecordBatch;
11
12use crate::expr::Scalar;
13use crate::ops::{FillNull, FillNullStrategy};
14use crate::{DataFrameError, Result};
15
16pub fn fill_null_batches(input: Vec<RecordBatch>, fill: &FillNull) -> Result<Vec<RecordBatch>> {
17    let batch = concat_batches(&input)?;
18    if batch.num_rows() == 0 {
19        return Ok(vec![batch]);
20    }
21
22    let arrays = match fill {
23        FillNull::Value(value) => fill_with_scalar(&batch, value)?,
24        FillNull::Strategy(strategy) => fill_with_strategy(&batch, *strategy)?,
25    };
26
27    let batch = RecordBatch::try_new(batch.schema(), arrays).map_err(|e| {
28        DataFrameError::schema_mismatch(format!("failed to build RecordBatch: {e}"))
29    })?;
30    Ok(vec![batch])
31}
32
33pub fn drop_nulls_batches(
34    input: Vec<RecordBatch>,
35    subset: Option<&[String]>,
36) -> Result<Vec<RecordBatch>> {
37    let batch = concat_batches(&input)?;
38    if batch.num_rows() == 0 {
39        return Ok(vec![batch]);
40    }
41    let indices = resolve_subset(&batch, subset)?;
42
43    let mut mask_builder = BooleanBuilder::with_capacity(batch.num_rows());
44    for row in 0..batch.num_rows() {
45        let mut keep = true;
46        for idx in &indices {
47            if batch.column(*idx).is_null(row) {
48                keep = false;
49                break;
50            }
51        }
52        mask_builder.append_value(keep);
53    }
54    let mask = mask_builder.finish();
55    let batch = arrow::compute::filter_record_batch(&batch, &mask)
56        .map_err(|source| DataFrameError::Arrow { source })?;
57    Ok(vec![batch])
58}
59
60pub fn null_count_batches(input: Vec<RecordBatch>) -> Result<Vec<RecordBatch>> {
61    if input.is_empty() {
62        return Ok(vec![RecordBatch::new_empty(Arc::new(Schema::empty()))]);
63    }
64
65    let schema = input[0].schema();
66    let mut counts = vec![0_u64; schema.fields().len()];
67    for batch in &input {
68        for (idx, col) in batch.columns().iter().enumerate() {
69            counts[idx] += col.null_count() as u64;
70        }
71    }
72
73    let mut fields = Vec::with_capacity(counts.len());
74    let mut arrays = Vec::with_capacity(counts.len());
75    for (idx, field) in schema.fields().iter().enumerate() {
76        fields.push(Field::new(field.name(), DataType::UInt64, false));
77        let array = UInt64Array::from(vec![counts[idx]]);
78        arrays.push(Arc::new(array) as _);
79    }
80
81    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(|e| {
82        DataFrameError::schema_mismatch(format!("failed to build RecordBatch: {e}"))
83    })?;
84    Ok(vec![batch])
85}
86
87fn concat_batches(batches: &[RecordBatch]) -> Result<RecordBatch> {
88    if batches.is_empty() {
89        return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
90    }
91    let schema = batches[0].schema();
92    if batches.len() == 1 {
93        return Ok(batches[0].clone());
94    }
95    arrow::compute::concat_batches(&schema, batches)
96        .map_err(|source| DataFrameError::Arrow { source })
97}
98
99fn resolve_subset(batch: &RecordBatch, subset: Option<&[String]>) -> Result<Vec<usize>> {
100    let schema = batch.schema();
101    let indices = match subset {
102        Some(cols) => {
103            if cols.is_empty() {
104                return Err(DataFrameError::invalid_operation(
105                    "drop_nulls subset must be non-empty",
106                ));
107            }
108            cols.iter()
109                .map(|name| {
110                    schema
111                        .fields()
112                        .iter()
113                        .position(|f| f.name() == name)
114                        .ok_or_else(|| DataFrameError::column_not_found(name.clone()))
115                })
116                .collect::<Result<Vec<_>>>()?
117        }
118        None => (0..schema.fields().len()).collect(),
119    };
120    Ok(indices)
121}
122
123fn fill_with_scalar(batch: &RecordBatch, value: &Scalar) -> Result<Vec<Arc<dyn Array>>> {
124    let mut arrays = Vec::with_capacity(batch.num_columns());
125    for col in batch.columns() {
126        let filled: Arc<dyn Array> = match col.data_type() {
127            DataType::Boolean => Arc::new(fill_boolean(col, value)?),
128            DataType::Int8 => Arc::new(fill_int8(col, value)?),
129            DataType::Int16 => Arc::new(fill_int16(col, value)?),
130            DataType::Int32 => Arc::new(fill_int32(col, value)?),
131            DataType::Int64 => Arc::new(fill_int64(col, value)?),
132            DataType::UInt8 => Arc::new(fill_uint8(col, value)?),
133            DataType::UInt16 => Arc::new(fill_uint16(col, value)?),
134            DataType::UInt32 => Arc::new(fill_uint32(col, value)?),
135            DataType::UInt64 => Arc::new(fill_uint64(col, value)?),
136            DataType::Float32 => Arc::new(fill_float32(col, value)?),
137            DataType::Float64 => Arc::new(fill_float64(col, value)?),
138            DataType::Utf8 => Arc::new(fill_utf8(col, value)?),
139            other => {
140                return Err(DataFrameError::type_mismatch(
141                    None::<String>,
142                    other.to_string(),
143                    format!("{value:?}"),
144                ))
145            }
146        };
147        arrays.push(filled);
148    }
149    Ok(arrays)
150}
151
152fn fill_with_strategy(
153    batch: &RecordBatch,
154    strategy: FillNullStrategy,
155) -> Result<Vec<Arc<dyn Array>>> {
156    let mut arrays = Vec::with_capacity(batch.num_columns());
157    for col in batch.columns() {
158        let filled: Arc<dyn Array> = match strategy {
159            FillNullStrategy::Forward => Arc::new(fill_forward(col)?),
160            FillNullStrategy::Backward => Arc::new(fill_backward(col)?),
161            FillNullStrategy::Min => Arc::new(fill_min(col)?),
162            FillNullStrategy::Max => Arc::new(fill_max(col)?),
163            FillNullStrategy::Mean => Arc::new(fill_mean(col)?),
164            FillNullStrategy::Zero => Arc::new(fill_numeric_constant(col, 0)?),
165            FillNullStrategy::One => Arc::new(fill_numeric_constant(col, 1)?),
166        };
167        arrays.push(filled);
168    }
169    Ok(arrays)
170}
171
172fn fill_boolean(col: &Arc<dyn Array>, value: &Scalar) -> Result<BooleanArray> {
173    let array = col
174        .as_any()
175        .downcast_ref::<BooleanArray>()
176        .ok_or_else(|| DataFrameError::invalid_operation("bad BooleanArray downcast"))?;
177    let fill = match value {
178        Scalar::Boolean(v) => *v,
179        Scalar::Null => return Ok(array.clone()),
180        other => {
181            return Err(DataFrameError::type_mismatch(
182                None::<String>,
183                "Boolean".to_string(),
184                format!("{other:?}"),
185            ))
186        }
187    };
188
189    let mut builder = BooleanBuilder::with_capacity(array.len());
190    for i in 0..array.len() {
191        if array.is_null(i) {
192            builder.append_value(fill);
193        } else {
194            builder.append_value(array.value(i));
195        }
196    }
197    Ok(builder.finish())
198}
199
200fn fill_int8(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int8Array> {
201    let array = col
202        .as_any()
203        .downcast_ref::<Int8Array>()
204        .ok_or_else(|| DataFrameError::invalid_operation("bad Int8Array downcast"))?;
205    let fill = match value {
206        Scalar::Int64(v) => i8::try_from(*v)
207            .map_err(|_| DataFrameError::type_mismatch(None::<String>, "Int8", "out of range"))?,
208        Scalar::Null => return Ok(array.clone()),
209        other => {
210            return Err(DataFrameError::type_mismatch(
211                None::<String>,
212                "Int8".to_string(),
213                format!("{other:?}"),
214            ))
215        }
216    };
217
218    let mut builder = Int8Builder::with_capacity(array.len());
219    for i in 0..array.len() {
220        if array.is_null(i) {
221            builder.append_value(fill);
222        } else {
223            builder.append_value(array.value(i));
224        }
225    }
226    Ok(builder.finish())
227}
228
229fn fill_int16(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int16Array> {
230    let array = col
231        .as_any()
232        .downcast_ref::<Int16Array>()
233        .ok_or_else(|| DataFrameError::invalid_operation("bad Int16Array downcast"))?;
234    let fill = match value {
235        Scalar::Int64(v) => i16::try_from(*v)
236            .map_err(|_| DataFrameError::type_mismatch(None::<String>, "Int16", "out of range"))?,
237        Scalar::Null => return Ok(array.clone()),
238        other => {
239            return Err(DataFrameError::type_mismatch(
240                None::<String>,
241                "Int16".to_string(),
242                format!("{other:?}"),
243            ))
244        }
245    };
246
247    let mut builder = Int16Builder::with_capacity(array.len());
248    for i in 0..array.len() {
249        if array.is_null(i) {
250            builder.append_value(fill);
251        } else {
252            builder.append_value(array.value(i));
253        }
254    }
255    Ok(builder.finish())
256}
257
258fn fill_int32(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int32Array> {
259    let array = col
260        .as_any()
261        .downcast_ref::<Int32Array>()
262        .ok_or_else(|| DataFrameError::invalid_operation("bad Int32Array downcast"))?;
263    let fill = match value {
264        Scalar::Int64(v) => i32::try_from(*v)
265            .map_err(|_| DataFrameError::type_mismatch(None::<String>, "Int32", "out of range"))?,
266        Scalar::Null => return Ok(array.clone()),
267        other => {
268            return Err(DataFrameError::type_mismatch(
269                None::<String>,
270                "Int32".to_string(),
271                format!("{other:?}"),
272            ))
273        }
274    };
275
276    let mut builder = Int32Builder::with_capacity(array.len());
277    for i in 0..array.len() {
278        if array.is_null(i) {
279            builder.append_value(fill);
280        } else {
281            builder.append_value(array.value(i));
282        }
283    }
284    Ok(builder.finish())
285}
286
287fn fill_int64(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int64Array> {
288    let array = col
289        .as_any()
290        .downcast_ref::<Int64Array>()
291        .ok_or_else(|| DataFrameError::invalid_operation("bad Int64Array downcast"))?;
292    let fill = match value {
293        Scalar::Int64(v) => *v,
294        Scalar::Null => return Ok(array.clone()),
295        other => {
296            return Err(DataFrameError::type_mismatch(
297                None::<String>,
298                "Int64".to_string(),
299                format!("{other:?}"),
300            ))
301        }
302    };
303
304    let mut builder = Int64Builder::with_capacity(array.len());
305    for i in 0..array.len() {
306        if array.is_null(i) {
307            builder.append_value(fill);
308        } else {
309            builder.append_value(array.value(i));
310        }
311    }
312    Ok(builder.finish())
313}
314
315fn fill_uint8(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt8Array> {
316    let array = col
317        .as_any()
318        .downcast_ref::<UInt8Array>()
319        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt8Array downcast"))?;
320    let fill = match value {
321        Scalar::Int64(v) => u8::try_from(*v)
322            .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt8", "out of range"))?,
323        Scalar::Null => return Ok(array.clone()),
324        other => {
325            return Err(DataFrameError::type_mismatch(
326                None::<String>,
327                "UInt8".to_string(),
328                format!("{other:?}"),
329            ))
330        }
331    };
332
333    let mut builder = UInt8Builder::with_capacity(array.len());
334    for i in 0..array.len() {
335        if array.is_null(i) {
336            builder.append_value(fill);
337        } else {
338            builder.append_value(array.value(i));
339        }
340    }
341    Ok(builder.finish())
342}
343
344fn fill_uint16(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt16Array> {
345    let array = col
346        .as_any()
347        .downcast_ref::<UInt16Array>()
348        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt16Array downcast"))?;
349    let fill = match value {
350        Scalar::Int64(v) => u16::try_from(*v)
351            .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt16", "out of range"))?,
352        Scalar::Null => return Ok(array.clone()),
353        other => {
354            return Err(DataFrameError::type_mismatch(
355                None::<String>,
356                "UInt16".to_string(),
357                format!("{other:?}"),
358            ))
359        }
360    };
361
362    let mut builder = UInt16Builder::with_capacity(array.len());
363    for i in 0..array.len() {
364        if array.is_null(i) {
365            builder.append_value(fill);
366        } else {
367            builder.append_value(array.value(i));
368        }
369    }
370    Ok(builder.finish())
371}
372
373fn fill_uint32(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt32Array> {
374    let array = col
375        .as_any()
376        .downcast_ref::<UInt32Array>()
377        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt32Array downcast"))?;
378    let fill = match value {
379        Scalar::Int64(v) => u32::try_from(*v)
380            .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt32", "out of range"))?,
381        Scalar::Null => return Ok(array.clone()),
382        other => {
383            return Err(DataFrameError::type_mismatch(
384                None::<String>,
385                "UInt32".to_string(),
386                format!("{other:?}"),
387            ))
388        }
389    };
390
391    let mut builder = UInt32Builder::with_capacity(array.len());
392    for i in 0..array.len() {
393        if array.is_null(i) {
394            builder.append_value(fill);
395        } else {
396            builder.append_value(array.value(i));
397        }
398    }
399    Ok(builder.finish())
400}
401
402fn fill_uint64(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt64Array> {
403    let array = col
404        .as_any()
405        .downcast_ref::<UInt64Array>()
406        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt64Array downcast"))?;
407    let fill = match value {
408        Scalar::Int64(v) => u64::try_from(*v)
409            .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt64", "out of range"))?,
410        Scalar::Null => return Ok(array.clone()),
411        other => {
412            return Err(DataFrameError::type_mismatch(
413                None::<String>,
414                "UInt64".to_string(),
415                format!("{other:?}"),
416            ))
417        }
418    };
419
420    let mut builder = UInt64Builder::with_capacity(array.len());
421    for i in 0..array.len() {
422        if array.is_null(i) {
423            builder.append_value(fill);
424        } else {
425            builder.append_value(array.value(i));
426        }
427    }
428    Ok(builder.finish())
429}
430
431fn fill_float32(col: &Arc<dyn Array>, value: &Scalar) -> Result<Float32Array> {
432    let array = col
433        .as_any()
434        .downcast_ref::<Float32Array>()
435        .ok_or_else(|| DataFrameError::invalid_operation("bad Float32Array downcast"))?;
436    let fill = match value {
437        Scalar::Float64(v) => *v as f32,
438        Scalar::Int64(v) => *v as f32,
439        Scalar::Null => return Ok(array.clone()),
440        other => {
441            return Err(DataFrameError::type_mismatch(
442                None::<String>,
443                "Float32".to_string(),
444                format!("{other:?}"),
445            ))
446        }
447    };
448
449    let mut builder = Float32Builder::with_capacity(array.len());
450    for i in 0..array.len() {
451        if array.is_null(i) {
452            builder.append_value(fill);
453        } else {
454            builder.append_value(array.value(i));
455        }
456    }
457    Ok(builder.finish())
458}
459
460fn fill_float64(col: &Arc<dyn Array>, value: &Scalar) -> Result<Float64Array> {
461    let array = col
462        .as_any()
463        .downcast_ref::<Float64Array>()
464        .ok_or_else(|| DataFrameError::invalid_operation("bad Float64Array downcast"))?;
465    let fill = match value {
466        Scalar::Float64(v) => *v,
467        Scalar::Int64(v) => *v as f64,
468        Scalar::Null => return Ok(array.clone()),
469        other => {
470            return Err(DataFrameError::type_mismatch(
471                None::<String>,
472                "Float64".to_string(),
473                format!("{other:?}"),
474            ))
475        }
476    };
477
478    let mut builder = Float64Builder::with_capacity(array.len());
479    for i in 0..array.len() {
480        if array.is_null(i) {
481            builder.append_value(fill);
482        } else {
483            builder.append_value(array.value(i));
484        }
485    }
486    Ok(builder.finish())
487}
488
489fn fill_utf8(col: &Arc<dyn Array>, value: &Scalar) -> Result<StringArray> {
490    let array = col
491        .as_any()
492        .downcast_ref::<StringArray>()
493        .ok_or_else(|| DataFrameError::invalid_operation("bad StringArray downcast"))?;
494    let fill = match value {
495        Scalar::Utf8(v) => v.as_str(),
496        Scalar::Null => return Ok(array.clone()),
497        other => {
498            return Err(DataFrameError::type_mismatch(
499                None::<String>,
500                "Utf8".to_string(),
501                format!("{other:?}"),
502            ))
503        }
504    };
505
506    let mut builder = StringBuilder::with_capacity(array.len(), array.value_data().len());
507    for i in 0..array.len() {
508        if array.is_null(i) {
509            builder.append_value(fill);
510        } else {
511            builder.append_value(array.value(i));
512        }
513    }
514    Ok(builder.finish())
515}
516
517fn fill_forward(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
518    match col.data_type() {
519        DataType::Boolean => Ok(Arc::new(fill_forward_bool(col)?)),
520        DataType::Int8 => Ok(Arc::new(fill_forward_i8(col)?)),
521        DataType::Int16 => Ok(Arc::new(fill_forward_i16(col)?)),
522        DataType::Int32 => Ok(Arc::new(fill_forward_i32(col)?)),
523        DataType::Int64 => Ok(Arc::new(fill_forward_i64(col)?)),
524        DataType::UInt8 => Ok(Arc::new(fill_forward_u8(col)?)),
525        DataType::UInt16 => Ok(Arc::new(fill_forward_u16(col)?)),
526        DataType::UInt32 => Ok(Arc::new(fill_forward_u32(col)?)),
527        DataType::UInt64 => Ok(Arc::new(fill_forward_u64(col)?)),
528        DataType::Float32 => Ok(Arc::new(fill_forward_f32(col)?)),
529        DataType::Float64 => Ok(Arc::new(fill_forward_f64(col)?)),
530        DataType::Utf8 => Ok(Arc::new(fill_forward_utf8(col)?)),
531        other => Err(DataFrameError::invalid_operation(format!(
532            "unsupported fill_null forward type {other:?}",
533        ))),
534    }
535}
536
537fn fill_backward(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
538    match col.data_type() {
539        DataType::Boolean => Ok(Arc::new(fill_backward_bool(col)?)),
540        DataType::Int8 => Ok(Arc::new(fill_backward_i8(col)?)),
541        DataType::Int16 => Ok(Arc::new(fill_backward_i16(col)?)),
542        DataType::Int32 => Ok(Arc::new(fill_backward_i32(col)?)),
543        DataType::Int64 => Ok(Arc::new(fill_backward_i64(col)?)),
544        DataType::UInt8 => Ok(Arc::new(fill_backward_u8(col)?)),
545        DataType::UInt16 => Ok(Arc::new(fill_backward_u16(col)?)),
546        DataType::UInt32 => Ok(Arc::new(fill_backward_u32(col)?)),
547        DataType::UInt64 => Ok(Arc::new(fill_backward_u64(col)?)),
548        DataType::Float32 => Ok(Arc::new(fill_backward_f32(col)?)),
549        DataType::Float64 => Ok(Arc::new(fill_backward_f64(col)?)),
550        DataType::Utf8 => Ok(Arc::new(fill_backward_utf8(col)?)),
551        other => Err(DataFrameError::invalid_operation(format!(
552            "unsupported fill_null backward type {other:?}",
553        ))),
554    }
555}
556
557fn fill_min(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
558    fill_numeric_stat(col, Stat::Min)
559}
560
561fn fill_max(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
562    fill_numeric_stat(col, Stat::Max)
563}
564
565fn fill_mean(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
566    fill_numeric_stat(col, Stat::Mean)
567}
568
569fn fill_numeric_constant(col: &Arc<dyn Array>, value: i64) -> Result<Arc<dyn Array>> {
570    match col.data_type() {
571        DataType::Int8 => Ok(Arc::new(fill_int8(col, &Scalar::Int64(value))?)),
572        DataType::Int16 => Ok(Arc::new(fill_int16(col, &Scalar::Int64(value))?)),
573        DataType::Int32 => Ok(Arc::new(fill_int32(col, &Scalar::Int64(value))?)),
574        DataType::Int64 => Ok(Arc::new(fill_int64(col, &Scalar::Int64(value))?)),
575        DataType::UInt8 => Ok(Arc::new(fill_uint8(col, &Scalar::Int64(value))?)),
576        DataType::UInt16 => Ok(Arc::new(fill_uint16(col, &Scalar::Int64(value))?)),
577        DataType::UInt32 => Ok(Arc::new(fill_uint32(col, &Scalar::Int64(value))?)),
578        DataType::UInt64 => Ok(Arc::new(fill_uint64(col, &Scalar::Int64(value))?)),
579        DataType::Float32 => Ok(Arc::new(fill_float32(col, &Scalar::Int64(value))?)),
580        DataType::Float64 => Ok(Arc::new(fill_float64(col, &Scalar::Int64(value))?)),
581        other => Err(DataFrameError::type_mismatch(
582            None::<String>,
583            "numeric".to_string(),
584            other.to_string(),
585        )),
586    }
587}
588
/// Column statistic used by the `Min`, `Max` and `Mean` fill strategies.
///
/// Derives `Copy` so it can be passed by value freely, and `Debug`/`PartialEq`
/// so it can be inspected and compared in diagnostics and tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stat {
    /// Fill with the minimum of the column's non-null values.
    Min,
    /// Fill with the maximum of the column's non-null values.
    Max,
    /// Fill with the mean of the column's non-null values.
    Mean,
}
594
595fn fill_numeric_stat(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
596    match col.data_type() {
597        DataType::Int8 => fill_stat_i8(col, stat),
598        DataType::Int16 => fill_stat_i16(col, stat),
599        DataType::Int32 => fill_stat_i32(col, stat),
600        DataType::Int64 => fill_stat_i64(col, stat),
601        DataType::UInt8 => fill_stat_u8(col, stat),
602        DataType::UInt16 => fill_stat_u16(col, stat),
603        DataType::UInt32 => fill_stat_u32(col, stat),
604        DataType::UInt64 => fill_stat_u64(col, stat),
605        DataType::Float32 => fill_stat_f32(col, stat),
606        DataType::Float64 => fill_stat_f64(col, stat),
607        other => Err(DataFrameError::type_mismatch(
608            None::<String>,
609            "numeric".to_string(),
610            other.to_string(),
611        )),
612    }
613}
614
615fn fill_forward_bool(col: &Arc<dyn Array>) -> Result<BooleanArray> {
616    let array = col
617        .as_any()
618        .downcast_ref::<BooleanArray>()
619        .ok_or_else(|| DataFrameError::invalid_operation("bad BooleanArray downcast"))?;
620    let mut builder = BooleanBuilder::with_capacity(array.len());
621    let mut last: Option<bool> = None;
622    for i in 0..array.len() {
623        if array.is_null(i) {
624            match last {
625                Some(v) => builder.append_value(v),
626                None => builder.append_null(),
627            }
628        } else {
629            let v = array.value(i);
630            last = Some(v);
631            builder.append_value(v);
632        }
633    }
634    Ok(builder.finish())
635}
636
637fn fill_backward_bool(col: &Arc<dyn Array>) -> Result<BooleanArray> {
638    let array = col
639        .as_any()
640        .downcast_ref::<BooleanArray>()
641        .ok_or_else(|| DataFrameError::invalid_operation("bad BooleanArray downcast"))?;
642    let mut tmp: Vec<Option<bool>> = Vec::with_capacity(array.len());
643    let mut next: Option<bool> = None;
644    for i in (0..array.len()).rev() {
645        if array.is_null(i) {
646            tmp.push(next);
647        } else {
648            let v = array.value(i);
649            next = Some(v);
650            tmp.push(Some(v));
651        }
652    }
653    tmp.reverse();
654    let mut builder = BooleanBuilder::with_capacity(array.len());
655    for v in tmp {
656        match v {
657            Some(v) => builder.append_value(v),
658            None => builder.append_null(),
659        }
660    }
661    Ok(builder.finish())
662}
663
/// Generates a forward-fill function `$name` for a primitive array type.
///
/// `$cast` is applied to each non-null value before it is stored or carried
/// forward.
macro_rules! fill_forward_numeric {
    ($name:ident, $array_ty:ty, $builder_ty:ty, $cast:expr) => {
        /// Forward-fills nulls in a primitive column: each null takes the most
        /// recent preceding non-null value, and leading nulls stay null.
        fn $name(col: &Arc<dyn Array>) -> Result<$array_ty> {
            let array = match col.as_any().downcast_ref::<$array_ty>() {
                Some(array) => array,
                None => return Err(DataFrameError::invalid_operation("bad array downcast")),
            };
            let mut builder = <$builder_ty>::with_capacity(array.len());
            let mut carried = None;
            for slot in array.iter() {
                let current = slot.map($cast);
                let filled = current.or(carried);
                carried = filled;
                builder.append_option(filled);
            }
            Ok(builder.finish())
        }
    };
}
690
/// Generates a backward-fill function `$name` for a primitive array type.
///
/// `$cast` is applied to each non-null value before it is stored or carried
/// backward.
macro_rules! fill_backward_numeric {
    ($name:ident, $array_ty:ty, $builder_ty:ty, $cast:expr) => {
        /// Backward-fills nulls in a primitive column: each null takes the
        /// nearest following non-null value, and trailing nulls stay null.
        fn $name(col: &Arc<dyn Array>) -> Result<$array_ty> {
            let array = match col.as_any().downcast_ref::<$array_ty>() {
                Some(array) => array,
                None => return Err(DataFrameError::invalid_operation("bad array downcast")),
            };
            let mut values: Vec<_> = array.iter().map(|slot| slot.map($cast)).collect();
            // Walk from the end so each null can borrow the value after it.
            let mut carried = None;
            for slot in values.iter_mut().rev() {
                *slot = (*slot).or(carried);
                carried = *slot;
            }

            let mut builder = <$builder_ty>::with_capacity(array.len());
            for value in values {
                builder.append_option(value);
            }
            Ok(builder.finish())
        }
    };
}
721
722fill_forward_numeric!(fill_forward_i8, Int8Array, Int8Builder, |v: i8| v);
723fill_forward_numeric!(fill_forward_i16, Int16Array, Int16Builder, |v: i16| v);
724fill_forward_numeric!(fill_forward_i32, Int32Array, Int32Builder, |v: i32| v);
725fill_forward_numeric!(fill_forward_i64, Int64Array, Int64Builder, |v: i64| v);
726fill_forward_numeric!(fill_forward_u8, UInt8Array, UInt8Builder, |v: u8| v);
727fill_forward_numeric!(fill_forward_u16, UInt16Array, UInt16Builder, |v: u16| v);
728fill_forward_numeric!(fill_forward_u32, UInt32Array, UInt32Builder, |v: u32| v);
729fill_forward_numeric!(fill_forward_u64, UInt64Array, UInt64Builder, |v: u64| v);
730fill_forward_numeric!(fill_forward_f32, Float32Array, Float32Builder, |v: f32| v);
731fill_forward_numeric!(fill_forward_f64, Float64Array, Float64Builder, |v: f64| v);
732
733fill_backward_numeric!(fill_backward_i8, Int8Array, Int8Builder, |v: i8| v);
734fill_backward_numeric!(fill_backward_i16, Int16Array, Int16Builder, |v: i16| v);
735fill_backward_numeric!(fill_backward_i32, Int32Array, Int32Builder, |v: i32| v);
736fill_backward_numeric!(fill_backward_i64, Int64Array, Int64Builder, |v: i64| v);
737fill_backward_numeric!(fill_backward_u8, UInt8Array, UInt8Builder, |v: u8| v);
738fill_backward_numeric!(fill_backward_u16, UInt16Array, UInt16Builder, |v: u16| v);
739fill_backward_numeric!(fill_backward_u32, UInt32Array, UInt32Builder, |v: u32| v);
740fill_backward_numeric!(fill_backward_u64, UInt64Array, UInt64Builder, |v: u64| v);
741fill_backward_numeric!(fill_backward_f32, Float32Array, Float32Builder, |v: f32| v);
742fill_backward_numeric!(fill_backward_f64, Float64Array, Float64Builder, |v: f64| v);
743
744fn fill_forward_utf8(col: &Arc<dyn Array>) -> Result<StringArray> {
745    let array = col
746        .as_any()
747        .downcast_ref::<StringArray>()
748        .ok_or_else(|| DataFrameError::invalid_operation("bad StringArray downcast"))?;
749    let mut builder = StringBuilder::with_capacity(array.len(), array.value_data().len());
750    let mut last: Option<String> = None;
751    for i in 0..array.len() {
752        if array.is_null(i) {
753            match last.as_deref() {
754                Some(v) => builder.append_value(v),
755                None => builder.append_null(),
756            }
757        } else {
758            let v = array.value(i).to_string();
759            last = Some(v.clone());
760            builder.append_value(v);
761        }
762    }
763    Ok(builder.finish())
764}
765
766fn fill_backward_utf8(col: &Arc<dyn Array>) -> Result<StringArray> {
767    let array = col
768        .as_any()
769        .downcast_ref::<StringArray>()
770        .ok_or_else(|| DataFrameError::invalid_operation("bad StringArray downcast"))?;
771    let mut tmp = Vec::with_capacity(array.len());
772    let mut next: Option<String> = None;
773    for i in (0..array.len()).rev() {
774        if array.is_null(i) {
775            tmp.push(next.clone());
776        } else {
777            let v = array.value(i).to_string();
778            next = Some(v.clone());
779            tmp.push(Some(v));
780        }
781    }
782    tmp.reverse();
783    let mut builder = StringBuilder::with_capacity(array.len(), array.value_data().len());
784    for v in tmp {
785        match v {
786            Some(v) => builder.append_value(v),
787            None => builder.append_null(),
788        }
789    }
790    Ok(builder.finish())
791}
792
793fn fill_stat_i8(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
794    let array = col
795        .as_any()
796        .downcast_ref::<Int8Array>()
797        .ok_or_else(|| DataFrameError::invalid_operation("bad Int8Array downcast"))?;
798    let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
799    let value = match stat {
800        Stat::Min => min.map(|v| v as i64),
801        Stat::Max => max.map(|v| v as i64),
802        Stat::Mean => mean.map(|v| v as i64),
803    };
804    match value {
805        Some(v) => Ok(Arc::new(fill_int8(col, &Scalar::Int64(v))?)),
806        None => Ok(col.clone()),
807    }
808}
809
810fn fill_stat_i16(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
811    let array = col
812        .as_any()
813        .downcast_ref::<Int16Array>()
814        .ok_or_else(|| DataFrameError::invalid_operation("bad Int16Array downcast"))?;
815    let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
816    let value = match stat {
817        Stat::Min => min.map(|v| v as i64),
818        Stat::Max => max.map(|v| v as i64),
819        Stat::Mean => mean.map(|v| v as i64),
820    };
821    match value {
822        Some(v) => Ok(Arc::new(fill_int16(col, &Scalar::Int64(v))?)),
823        None => Ok(col.clone()),
824    }
825}
826
827fn fill_stat_i32(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
828    let array = col
829        .as_any()
830        .downcast_ref::<Int32Array>()
831        .ok_or_else(|| DataFrameError::invalid_operation("bad Int32Array downcast"))?;
832    let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
833    let value = match stat {
834        Stat::Min => min.map(|v| v as i64),
835        Stat::Max => max.map(|v| v as i64),
836        Stat::Mean => mean.map(|v| v as i64),
837    };
838    match value {
839        Some(v) => Ok(Arc::new(fill_int32(col, &Scalar::Int64(v))?)),
840        None => Ok(col.clone()),
841    }
842}
843
844fn fill_stat_i64(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
845    let array = col
846        .as_any()
847        .downcast_ref::<Int64Array>()
848        .ok_or_else(|| DataFrameError::invalid_operation("bad Int64Array downcast"))?;
849    let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
850    let value = match stat {
851        Stat::Min => min.map(|v| v as i64),
852        Stat::Max => max.map(|v| v as i64),
853        Stat::Mean => mean.map(|v| v as i64),
854    };
855    match value {
856        Some(v) => Ok(Arc::new(fill_int64(col, &Scalar::Int64(v))?)),
857        None => Ok(col.clone()),
858    }
859}
860
861fn fill_stat_u8(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
862    let array = col
863        .as_any()
864        .downcast_ref::<UInt8Array>()
865        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt8Array downcast"))?;
866    let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
867    let value = match stat {
868        Stat::Min => min.map(|v| v as i64),
869        Stat::Max => max.map(|v| v as i64),
870        Stat::Mean => mean.map(|v| v as i64),
871    };
872    match value {
873        Some(v) => Ok(Arc::new(fill_uint8(col, &Scalar::Int64(v))?)),
874        None => Ok(col.clone()),
875    }
876}
877
878fn fill_stat_u16(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
879    let array = col
880        .as_any()
881        .downcast_ref::<UInt16Array>()
882        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt16Array downcast"))?;
883    let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
884    let value = match stat {
885        Stat::Min => min.map(|v| v as i64),
886        Stat::Max => max.map(|v| v as i64),
887        Stat::Mean => mean.map(|v| v as i64),
888    };
889    match value {
890        Some(v) => Ok(Arc::new(fill_uint16(col, &Scalar::Int64(v))?)),
891        None => Ok(col.clone()),
892    }
893}
894
895fn fill_stat_u32(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
896    let array = col
897        .as_any()
898        .downcast_ref::<UInt32Array>()
899        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt32Array downcast"))?;
900    let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
901    let value = match stat {
902        Stat::Min => min.map(|v| v as i64),
903        Stat::Max => max.map(|v| v as i64),
904        Stat::Mean => mean.map(|v| v as i64),
905    };
906    match value {
907        Some(v) => Ok(Arc::new(fill_uint32(col, &Scalar::Int64(v))?)),
908        None => Ok(col.clone()),
909    }
910}
911
912fn fill_stat_u64(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
913    let array = col
914        .as_any()
915        .downcast_ref::<UInt64Array>()
916        .ok_or_else(|| DataFrameError::invalid_operation("bad UInt64Array downcast"))?;
917    let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
918    let value = match stat {
919        Stat::Min => min.map(|v| v as i64),
920        Stat::Max => max.map(|v| v as i64),
921        Stat::Mean => mean.map(|v| v as i64),
922    };
923    match value {
924        Some(v) => Ok(Arc::new(fill_uint64(col, &Scalar::Int64(v))?)),
925        None => Ok(col.clone()),
926    }
927}
928
929fn fill_stat_f32(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
930    let array = col
931        .as_any()
932        .downcast_ref::<Float32Array>()
933        .ok_or_else(|| DataFrameError::invalid_operation("bad Float32Array downcast"))?;
934    let (min, max, mean) = stats_float(array.iter().flatten().map(|v| v as f64));
935    let value = match stat {
936        Stat::Min => min,
937        Stat::Max => max,
938        Stat::Mean => mean,
939    };
940    match value {
941        Some(v) => Ok(Arc::new(fill_float32(col, &Scalar::Float64(v))?)),
942        None => Ok(col.clone()),
943    }
944}
945
946fn fill_stat_f64(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
947    let array = col
948        .as_any()
949        .downcast_ref::<Float64Array>()
950        .ok_or_else(|| DataFrameError::invalid_operation("bad Float64Array downcast"))?;
951    let (min, max, mean) = stats_float(array.iter().flatten());
952    let value = match stat {
953        Stat::Min => min,
954        Stat::Max => max,
955        Stat::Mean => mean,
956    };
957    match value {
958        Some(v) => Ok(Arc::new(fill_float64(col, &Scalar::Float64(v))?)),
959        None => Ok(col.clone()),
960    }
961}
962
/// Computes `(min, max, mean)` over a stream of signed integers in one pass.
///
/// All three are `None` when `values` yields nothing. The mean uses integer
/// division, so it is truncated toward zero (e.g. the mean of `[-1, -2]` is `-1`).
fn stats_signed<I>(values: I) -> (Option<i128>, Option<i128>, Option<i128>)
where
    I: Iterator<Item = i128>,
{
    // Accumulate (lo, hi) bounds, running total, and element count together.
    let (bounds, total, n) = values.fold(
        (None::<(i128, i128)>, 0_i128, 0_i128),
        |(bounds, total, n), v| {
            let bounds = match bounds {
                None => Some((v, v)),
                Some((lo, hi)) => Some((lo.min(v), hi.max(v))),
            };
            (bounds, total + v, n + 1)
        },
    );
    let mean = (n != 0).then(|| total / n);
    match bounds {
        Some((lo, hi)) => (Some(lo), Some(hi), mean),
        None => (None, None, mean),
    }
}
980
/// Computes `(min, max, mean)` over a stream of unsigned integers in one pass.
///
/// All three are `None` when `values` yields nothing. The mean uses integer
/// division, so it is rounded down.
fn stats_unsigned<I>(mut values: I) -> (Option<u128>, Option<u128>, Option<u128>)
where
    I: Iterator<Item = u128>,
{
    // Seed every accumulator from the first element; an empty stream has no stats.
    let Some(first) = values.next() else {
        return (None, None, None);
    };
    let (mut lo, mut hi, mut total, mut n) = (first, first, first, 1_u128);
    for v in values {
        lo = lo.min(v);
        hi = hi.max(v);
        total += v;
        n += 1;
    }
    (Some(lo), Some(hi), Some(total / n))
}
998
/// Computes `(min, max, mean)` over a stream of `f64` values in one pass.
///
/// All three are `None` when `values` yields nothing. Min/max use
/// [`f64::min`]/[`f64::max`], which return the non-NaN operand. NaNs therefore
/// only show up as min/max when every value is NaN. NaNs do propagate into the
/// mean. The mean is a plain (uncompensated) running sum divided by the count.
fn stats_float<I>(values: I) -> (Option<f64>, Option<f64>, Option<f64>)
where
    I: Iterator<Item = f64>,
{
    let mut min: Option<f64> = None;
    let mut max: Option<f64> = None;
    let mut sum = 0.0_f64;
    // Explicit `usize`: an unannotated `0` here would fall back to `i32` and
    // overflow once more than i32::MAX values are seen (panic in debug builds,
    // a negative count and a wrong mean in release builds).
    let mut count: usize = 0;
    for v in values {
        min = Some(min.map_or(v, |m| m.min(v)));
        max = Some(max.map_or(v, |m| m.max(v)));
        sum += v;
        count += 1;
    }
    let mean = (count > 0).then(|| sum / count as f64);
    (min, max, mean)
}