1use std::sync::Arc;
2
3use arrow::array::{
4 Array, BooleanArray, BooleanBuilder, Float32Array, Float32Builder, Float64Array,
5 Float64Builder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array, Int64Builder,
6 Int8Array, Int8Builder, StringArray, StringBuilder, UInt16Array, UInt16Builder, UInt32Array,
7 UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder,
8};
9use arrow::datatypes::{DataType, Field, Schema};
10use arrow::record_batch::RecordBatch;
11
12use crate::expr::Scalar;
13use crate::ops::{FillNull, FillNullStrategy};
14use crate::{DataFrameError, Result};
15
16pub fn fill_null_batches(input: Vec<RecordBatch>, fill: &FillNull) -> Result<Vec<RecordBatch>> {
17 let batch = concat_batches(&input)?;
18 if batch.num_rows() == 0 {
19 return Ok(vec![batch]);
20 }
21
22 let arrays = match fill {
23 FillNull::Value(value) => fill_with_scalar(&batch, value)?,
24 FillNull::Strategy(strategy) => fill_with_strategy(&batch, *strategy)?,
25 };
26
27 let batch = RecordBatch::try_new(batch.schema(), arrays).map_err(|e| {
28 DataFrameError::schema_mismatch(format!("failed to build RecordBatch: {e}"))
29 })?;
30 Ok(vec![batch])
31}
32
33pub fn drop_nulls_batches(
34 input: Vec<RecordBatch>,
35 subset: Option<&[String]>,
36) -> Result<Vec<RecordBatch>> {
37 let batch = concat_batches(&input)?;
38 if batch.num_rows() == 0 {
39 return Ok(vec![batch]);
40 }
41 let indices = resolve_subset(&batch, subset)?;
42
43 let mut mask_builder = BooleanBuilder::with_capacity(batch.num_rows());
44 for row in 0..batch.num_rows() {
45 let mut keep = true;
46 for idx in &indices {
47 if batch.column(*idx).is_null(row) {
48 keep = false;
49 break;
50 }
51 }
52 mask_builder.append_value(keep);
53 }
54 let mask = mask_builder.finish();
55 let batch = arrow::compute::filter_record_batch(&batch, &mask)
56 .map_err(|source| DataFrameError::Arrow { source })?;
57 Ok(vec![batch])
58}
59
60pub fn null_count_batches(input: Vec<RecordBatch>) -> Result<Vec<RecordBatch>> {
61 if input.is_empty() {
62 return Ok(vec![RecordBatch::new_empty(Arc::new(Schema::empty()))]);
63 }
64
65 let schema = input[0].schema();
66 let mut counts = vec![0_u64; schema.fields().len()];
67 for batch in &input {
68 for (idx, col) in batch.columns().iter().enumerate() {
69 counts[idx] += col.null_count() as u64;
70 }
71 }
72
73 let mut fields = Vec::with_capacity(counts.len());
74 let mut arrays = Vec::with_capacity(counts.len());
75 for (idx, field) in schema.fields().iter().enumerate() {
76 fields.push(Field::new(field.name(), DataType::UInt64, false));
77 let array = UInt64Array::from(vec![counts[idx]]);
78 arrays.push(Arc::new(array) as _);
79 }
80
81 let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(|e| {
82 DataFrameError::schema_mismatch(format!("failed to build RecordBatch: {e}"))
83 })?;
84 Ok(vec![batch])
85}
86
87fn concat_batches(batches: &[RecordBatch]) -> Result<RecordBatch> {
88 if batches.is_empty() {
89 return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
90 }
91 let schema = batches[0].schema();
92 if batches.len() == 1 {
93 return Ok(batches[0].clone());
94 }
95 arrow::compute::concat_batches(&schema, batches)
96 .map_err(|source| DataFrameError::Arrow { source })
97}
98
99fn resolve_subset(batch: &RecordBatch, subset: Option<&[String]>) -> Result<Vec<usize>> {
100 let schema = batch.schema();
101 let indices = match subset {
102 Some(cols) => {
103 if cols.is_empty() {
104 return Err(DataFrameError::invalid_operation(
105 "drop_nulls subset must be non-empty",
106 ));
107 }
108 cols.iter()
109 .map(|name| {
110 schema
111 .fields()
112 .iter()
113 .position(|f| f.name() == name)
114 .ok_or_else(|| DataFrameError::column_not_found(name.clone()))
115 })
116 .collect::<Result<Vec<_>>>()?
117 }
118 None => (0..schema.fields().len()).collect(),
119 };
120 Ok(indices)
121}
122
123fn fill_with_scalar(batch: &RecordBatch, value: &Scalar) -> Result<Vec<Arc<dyn Array>>> {
124 let mut arrays = Vec::with_capacity(batch.num_columns());
125 for col in batch.columns() {
126 let filled: Arc<dyn Array> = match col.data_type() {
127 DataType::Boolean => Arc::new(fill_boolean(col, value)?),
128 DataType::Int8 => Arc::new(fill_int8(col, value)?),
129 DataType::Int16 => Arc::new(fill_int16(col, value)?),
130 DataType::Int32 => Arc::new(fill_int32(col, value)?),
131 DataType::Int64 => Arc::new(fill_int64(col, value)?),
132 DataType::UInt8 => Arc::new(fill_uint8(col, value)?),
133 DataType::UInt16 => Arc::new(fill_uint16(col, value)?),
134 DataType::UInt32 => Arc::new(fill_uint32(col, value)?),
135 DataType::UInt64 => Arc::new(fill_uint64(col, value)?),
136 DataType::Float32 => Arc::new(fill_float32(col, value)?),
137 DataType::Float64 => Arc::new(fill_float64(col, value)?),
138 DataType::Utf8 => Arc::new(fill_utf8(col, value)?),
139 other => {
140 return Err(DataFrameError::type_mismatch(
141 None::<String>,
142 other.to_string(),
143 format!("{value:?}"),
144 ))
145 }
146 };
147 arrays.push(filled);
148 }
149 Ok(arrays)
150}
151
152fn fill_with_strategy(
153 batch: &RecordBatch,
154 strategy: FillNullStrategy,
155) -> Result<Vec<Arc<dyn Array>>> {
156 let mut arrays = Vec::with_capacity(batch.num_columns());
157 for col in batch.columns() {
158 let filled: Arc<dyn Array> = match strategy {
159 FillNullStrategy::Forward => Arc::new(fill_forward(col)?),
160 FillNullStrategy::Backward => Arc::new(fill_backward(col)?),
161 FillNullStrategy::Min => Arc::new(fill_min(col)?),
162 FillNullStrategy::Max => Arc::new(fill_max(col)?),
163 FillNullStrategy::Mean => Arc::new(fill_mean(col)?),
164 FillNullStrategy::Zero => Arc::new(fill_numeric_constant(col, 0)?),
165 FillNullStrategy::One => Arc::new(fill_numeric_constant(col, 1)?),
166 };
167 arrays.push(filled);
168 }
169 Ok(arrays)
170}
171
172fn fill_boolean(col: &Arc<dyn Array>, value: &Scalar) -> Result<BooleanArray> {
173 let array = col
174 .as_any()
175 .downcast_ref::<BooleanArray>()
176 .ok_or_else(|| DataFrameError::invalid_operation("bad BooleanArray downcast"))?;
177 let fill = match value {
178 Scalar::Boolean(v) => *v,
179 Scalar::Null => return Ok(array.clone()),
180 other => {
181 return Err(DataFrameError::type_mismatch(
182 None::<String>,
183 "Boolean".to_string(),
184 format!("{other:?}"),
185 ))
186 }
187 };
188
189 let mut builder = BooleanBuilder::with_capacity(array.len());
190 for i in 0..array.len() {
191 if array.is_null(i) {
192 builder.append_value(fill);
193 } else {
194 builder.append_value(array.value(i));
195 }
196 }
197 Ok(builder.finish())
198}
199
200fn fill_int8(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int8Array> {
201 let array = col
202 .as_any()
203 .downcast_ref::<Int8Array>()
204 .ok_or_else(|| DataFrameError::invalid_operation("bad Int8Array downcast"))?;
205 let fill = match value {
206 Scalar::Int64(v) => i8::try_from(*v)
207 .map_err(|_| DataFrameError::type_mismatch(None::<String>, "Int8", "out of range"))?,
208 Scalar::Null => return Ok(array.clone()),
209 other => {
210 return Err(DataFrameError::type_mismatch(
211 None::<String>,
212 "Int8".to_string(),
213 format!("{other:?}"),
214 ))
215 }
216 };
217
218 let mut builder = Int8Builder::with_capacity(array.len());
219 for i in 0..array.len() {
220 if array.is_null(i) {
221 builder.append_value(fill);
222 } else {
223 builder.append_value(array.value(i));
224 }
225 }
226 Ok(builder.finish())
227}
228
229fn fill_int16(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int16Array> {
230 let array = col
231 .as_any()
232 .downcast_ref::<Int16Array>()
233 .ok_or_else(|| DataFrameError::invalid_operation("bad Int16Array downcast"))?;
234 let fill = match value {
235 Scalar::Int64(v) => i16::try_from(*v)
236 .map_err(|_| DataFrameError::type_mismatch(None::<String>, "Int16", "out of range"))?,
237 Scalar::Null => return Ok(array.clone()),
238 other => {
239 return Err(DataFrameError::type_mismatch(
240 None::<String>,
241 "Int16".to_string(),
242 format!("{other:?}"),
243 ))
244 }
245 };
246
247 let mut builder = Int16Builder::with_capacity(array.len());
248 for i in 0..array.len() {
249 if array.is_null(i) {
250 builder.append_value(fill);
251 } else {
252 builder.append_value(array.value(i));
253 }
254 }
255 Ok(builder.finish())
256}
257
258fn fill_int32(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int32Array> {
259 let array = col
260 .as_any()
261 .downcast_ref::<Int32Array>()
262 .ok_or_else(|| DataFrameError::invalid_operation("bad Int32Array downcast"))?;
263 let fill = match value {
264 Scalar::Int64(v) => i32::try_from(*v)
265 .map_err(|_| DataFrameError::type_mismatch(None::<String>, "Int32", "out of range"))?,
266 Scalar::Null => return Ok(array.clone()),
267 other => {
268 return Err(DataFrameError::type_mismatch(
269 None::<String>,
270 "Int32".to_string(),
271 format!("{other:?}"),
272 ))
273 }
274 };
275
276 let mut builder = Int32Builder::with_capacity(array.len());
277 for i in 0..array.len() {
278 if array.is_null(i) {
279 builder.append_value(fill);
280 } else {
281 builder.append_value(array.value(i));
282 }
283 }
284 Ok(builder.finish())
285}
286
287fn fill_int64(col: &Arc<dyn Array>, value: &Scalar) -> Result<Int64Array> {
288 let array = col
289 .as_any()
290 .downcast_ref::<Int64Array>()
291 .ok_or_else(|| DataFrameError::invalid_operation("bad Int64Array downcast"))?;
292 let fill = match value {
293 Scalar::Int64(v) => *v,
294 Scalar::Null => return Ok(array.clone()),
295 other => {
296 return Err(DataFrameError::type_mismatch(
297 None::<String>,
298 "Int64".to_string(),
299 format!("{other:?}"),
300 ))
301 }
302 };
303
304 let mut builder = Int64Builder::with_capacity(array.len());
305 for i in 0..array.len() {
306 if array.is_null(i) {
307 builder.append_value(fill);
308 } else {
309 builder.append_value(array.value(i));
310 }
311 }
312 Ok(builder.finish())
313}
314
315fn fill_uint8(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt8Array> {
316 let array = col
317 .as_any()
318 .downcast_ref::<UInt8Array>()
319 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt8Array downcast"))?;
320 let fill = match value {
321 Scalar::Int64(v) => u8::try_from(*v)
322 .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt8", "out of range"))?,
323 Scalar::Null => return Ok(array.clone()),
324 other => {
325 return Err(DataFrameError::type_mismatch(
326 None::<String>,
327 "UInt8".to_string(),
328 format!("{other:?}"),
329 ))
330 }
331 };
332
333 let mut builder = UInt8Builder::with_capacity(array.len());
334 for i in 0..array.len() {
335 if array.is_null(i) {
336 builder.append_value(fill);
337 } else {
338 builder.append_value(array.value(i));
339 }
340 }
341 Ok(builder.finish())
342}
343
344fn fill_uint16(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt16Array> {
345 let array = col
346 .as_any()
347 .downcast_ref::<UInt16Array>()
348 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt16Array downcast"))?;
349 let fill = match value {
350 Scalar::Int64(v) => u16::try_from(*v)
351 .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt16", "out of range"))?,
352 Scalar::Null => return Ok(array.clone()),
353 other => {
354 return Err(DataFrameError::type_mismatch(
355 None::<String>,
356 "UInt16".to_string(),
357 format!("{other:?}"),
358 ))
359 }
360 };
361
362 let mut builder = UInt16Builder::with_capacity(array.len());
363 for i in 0..array.len() {
364 if array.is_null(i) {
365 builder.append_value(fill);
366 } else {
367 builder.append_value(array.value(i));
368 }
369 }
370 Ok(builder.finish())
371}
372
373fn fill_uint32(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt32Array> {
374 let array = col
375 .as_any()
376 .downcast_ref::<UInt32Array>()
377 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt32Array downcast"))?;
378 let fill = match value {
379 Scalar::Int64(v) => u32::try_from(*v)
380 .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt32", "out of range"))?,
381 Scalar::Null => return Ok(array.clone()),
382 other => {
383 return Err(DataFrameError::type_mismatch(
384 None::<String>,
385 "UInt32".to_string(),
386 format!("{other:?}"),
387 ))
388 }
389 };
390
391 let mut builder = UInt32Builder::with_capacity(array.len());
392 for i in 0..array.len() {
393 if array.is_null(i) {
394 builder.append_value(fill);
395 } else {
396 builder.append_value(array.value(i));
397 }
398 }
399 Ok(builder.finish())
400}
401
402fn fill_uint64(col: &Arc<dyn Array>, value: &Scalar) -> Result<UInt64Array> {
403 let array = col
404 .as_any()
405 .downcast_ref::<UInt64Array>()
406 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt64Array downcast"))?;
407 let fill = match value {
408 Scalar::Int64(v) => u64::try_from(*v)
409 .map_err(|_| DataFrameError::type_mismatch(None::<String>, "UInt64", "out of range"))?,
410 Scalar::Null => return Ok(array.clone()),
411 other => {
412 return Err(DataFrameError::type_mismatch(
413 None::<String>,
414 "UInt64".to_string(),
415 format!("{other:?}"),
416 ))
417 }
418 };
419
420 let mut builder = UInt64Builder::with_capacity(array.len());
421 for i in 0..array.len() {
422 if array.is_null(i) {
423 builder.append_value(fill);
424 } else {
425 builder.append_value(array.value(i));
426 }
427 }
428 Ok(builder.finish())
429}
430
431fn fill_float32(col: &Arc<dyn Array>, value: &Scalar) -> Result<Float32Array> {
432 let array = col
433 .as_any()
434 .downcast_ref::<Float32Array>()
435 .ok_or_else(|| DataFrameError::invalid_operation("bad Float32Array downcast"))?;
436 let fill = match value {
437 Scalar::Float64(v) => *v as f32,
438 Scalar::Int64(v) => *v as f32,
439 Scalar::Null => return Ok(array.clone()),
440 other => {
441 return Err(DataFrameError::type_mismatch(
442 None::<String>,
443 "Float32".to_string(),
444 format!("{other:?}"),
445 ))
446 }
447 };
448
449 let mut builder = Float32Builder::with_capacity(array.len());
450 for i in 0..array.len() {
451 if array.is_null(i) {
452 builder.append_value(fill);
453 } else {
454 builder.append_value(array.value(i));
455 }
456 }
457 Ok(builder.finish())
458}
459
460fn fill_float64(col: &Arc<dyn Array>, value: &Scalar) -> Result<Float64Array> {
461 let array = col
462 .as_any()
463 .downcast_ref::<Float64Array>()
464 .ok_or_else(|| DataFrameError::invalid_operation("bad Float64Array downcast"))?;
465 let fill = match value {
466 Scalar::Float64(v) => *v,
467 Scalar::Int64(v) => *v as f64,
468 Scalar::Null => return Ok(array.clone()),
469 other => {
470 return Err(DataFrameError::type_mismatch(
471 None::<String>,
472 "Float64".to_string(),
473 format!("{other:?}"),
474 ))
475 }
476 };
477
478 let mut builder = Float64Builder::with_capacity(array.len());
479 for i in 0..array.len() {
480 if array.is_null(i) {
481 builder.append_value(fill);
482 } else {
483 builder.append_value(array.value(i));
484 }
485 }
486 Ok(builder.finish())
487}
488
489fn fill_utf8(col: &Arc<dyn Array>, value: &Scalar) -> Result<StringArray> {
490 let array = col
491 .as_any()
492 .downcast_ref::<StringArray>()
493 .ok_or_else(|| DataFrameError::invalid_operation("bad StringArray downcast"))?;
494 let fill = match value {
495 Scalar::Utf8(v) => v.as_str(),
496 Scalar::Null => return Ok(array.clone()),
497 other => {
498 return Err(DataFrameError::type_mismatch(
499 None::<String>,
500 "Utf8".to_string(),
501 format!("{other:?}"),
502 ))
503 }
504 };
505
506 let mut builder = StringBuilder::with_capacity(array.len(), array.value_data().len());
507 for i in 0..array.len() {
508 if array.is_null(i) {
509 builder.append_value(fill);
510 } else {
511 builder.append_value(array.value(i));
512 }
513 }
514 Ok(builder.finish())
515}
516
517fn fill_forward(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
518 match col.data_type() {
519 DataType::Boolean => Ok(Arc::new(fill_forward_bool(col)?)),
520 DataType::Int8 => Ok(Arc::new(fill_forward_i8(col)?)),
521 DataType::Int16 => Ok(Arc::new(fill_forward_i16(col)?)),
522 DataType::Int32 => Ok(Arc::new(fill_forward_i32(col)?)),
523 DataType::Int64 => Ok(Arc::new(fill_forward_i64(col)?)),
524 DataType::UInt8 => Ok(Arc::new(fill_forward_u8(col)?)),
525 DataType::UInt16 => Ok(Arc::new(fill_forward_u16(col)?)),
526 DataType::UInt32 => Ok(Arc::new(fill_forward_u32(col)?)),
527 DataType::UInt64 => Ok(Arc::new(fill_forward_u64(col)?)),
528 DataType::Float32 => Ok(Arc::new(fill_forward_f32(col)?)),
529 DataType::Float64 => Ok(Arc::new(fill_forward_f64(col)?)),
530 DataType::Utf8 => Ok(Arc::new(fill_forward_utf8(col)?)),
531 other => Err(DataFrameError::invalid_operation(format!(
532 "unsupported fill_null forward type {other:?}",
533 ))),
534 }
535}
536
537fn fill_backward(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
538 match col.data_type() {
539 DataType::Boolean => Ok(Arc::new(fill_backward_bool(col)?)),
540 DataType::Int8 => Ok(Arc::new(fill_backward_i8(col)?)),
541 DataType::Int16 => Ok(Arc::new(fill_backward_i16(col)?)),
542 DataType::Int32 => Ok(Arc::new(fill_backward_i32(col)?)),
543 DataType::Int64 => Ok(Arc::new(fill_backward_i64(col)?)),
544 DataType::UInt8 => Ok(Arc::new(fill_backward_u8(col)?)),
545 DataType::UInt16 => Ok(Arc::new(fill_backward_u16(col)?)),
546 DataType::UInt32 => Ok(Arc::new(fill_backward_u32(col)?)),
547 DataType::UInt64 => Ok(Arc::new(fill_backward_u64(col)?)),
548 DataType::Float32 => Ok(Arc::new(fill_backward_f32(col)?)),
549 DataType::Float64 => Ok(Arc::new(fill_backward_f64(col)?)),
550 DataType::Utf8 => Ok(Arc::new(fill_backward_utf8(col)?)),
551 other => Err(DataFrameError::invalid_operation(format!(
552 "unsupported fill_null backward type {other:?}",
553 ))),
554 }
555}
556
557fn fill_min(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
558 fill_numeric_stat(col, Stat::Min)
559}
560
561fn fill_max(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
562 fill_numeric_stat(col, Stat::Max)
563}
564
565fn fill_mean(col: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
566 fill_numeric_stat(col, Stat::Mean)
567}
568
569fn fill_numeric_constant(col: &Arc<dyn Array>, value: i64) -> Result<Arc<dyn Array>> {
570 match col.data_type() {
571 DataType::Int8 => Ok(Arc::new(fill_int8(col, &Scalar::Int64(value))?)),
572 DataType::Int16 => Ok(Arc::new(fill_int16(col, &Scalar::Int64(value))?)),
573 DataType::Int32 => Ok(Arc::new(fill_int32(col, &Scalar::Int64(value))?)),
574 DataType::Int64 => Ok(Arc::new(fill_int64(col, &Scalar::Int64(value))?)),
575 DataType::UInt8 => Ok(Arc::new(fill_uint8(col, &Scalar::Int64(value))?)),
576 DataType::UInt16 => Ok(Arc::new(fill_uint16(col, &Scalar::Int64(value))?)),
577 DataType::UInt32 => Ok(Arc::new(fill_uint32(col, &Scalar::Int64(value))?)),
578 DataType::UInt64 => Ok(Arc::new(fill_uint64(col, &Scalar::Int64(value))?)),
579 DataType::Float32 => Ok(Arc::new(fill_float32(col, &Scalar::Int64(value))?)),
580 DataType::Float64 => Ok(Arc::new(fill_float64(col, &Scalar::Int64(value))?)),
581 other => Err(DataFrameError::type_mismatch(
582 None::<String>,
583 "numeric".to_string(),
584 other.to_string(),
585 )),
586 }
587}
588
/// Which column statistic a Min/Max/Mean fill strategy substitutes for nulls.
///
/// Derives `Copy` (it is passed by value to every `fill_stat_*` helper) and `Debug`/`PartialEq`
/// so it can be logged and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stat {
    /// Smallest non-null value.
    Min,
    /// Largest non-null value.
    Max,
    /// Arithmetic mean of the non-null values.
    Mean,
}
594
595fn fill_numeric_stat(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
596 match col.data_type() {
597 DataType::Int8 => fill_stat_i8(col, stat),
598 DataType::Int16 => fill_stat_i16(col, stat),
599 DataType::Int32 => fill_stat_i32(col, stat),
600 DataType::Int64 => fill_stat_i64(col, stat),
601 DataType::UInt8 => fill_stat_u8(col, stat),
602 DataType::UInt16 => fill_stat_u16(col, stat),
603 DataType::UInt32 => fill_stat_u32(col, stat),
604 DataType::UInt64 => fill_stat_u64(col, stat),
605 DataType::Float32 => fill_stat_f32(col, stat),
606 DataType::Float64 => fill_stat_f64(col, stat),
607 other => Err(DataFrameError::type_mismatch(
608 None::<String>,
609 "numeric".to_string(),
610 other.to_string(),
611 )),
612 }
613}
614
615fn fill_forward_bool(col: &Arc<dyn Array>) -> Result<BooleanArray> {
616 let array = col
617 .as_any()
618 .downcast_ref::<BooleanArray>()
619 .ok_or_else(|| DataFrameError::invalid_operation("bad BooleanArray downcast"))?;
620 let mut builder = BooleanBuilder::with_capacity(array.len());
621 let mut last: Option<bool> = None;
622 for i in 0..array.len() {
623 if array.is_null(i) {
624 match last {
625 Some(v) => builder.append_value(v),
626 None => builder.append_null(),
627 }
628 } else {
629 let v = array.value(i);
630 last = Some(v);
631 builder.append_value(v);
632 }
633 }
634 Ok(builder.finish())
635}
636
637fn fill_backward_bool(col: &Arc<dyn Array>) -> Result<BooleanArray> {
638 let array = col
639 .as_any()
640 .downcast_ref::<BooleanArray>()
641 .ok_or_else(|| DataFrameError::invalid_operation("bad BooleanArray downcast"))?;
642 let mut tmp: Vec<Option<bool>> = Vec::with_capacity(array.len());
643 let mut next: Option<bool> = None;
644 for i in (0..array.len()).rev() {
645 if array.is_null(i) {
646 tmp.push(next);
647 } else {
648 let v = array.value(i);
649 next = Some(v);
650 tmp.push(Some(v));
651 }
652 }
653 tmp.reverse();
654 let mut builder = BooleanBuilder::with_capacity(array.len());
655 for v in tmp {
656 match v {
657 Some(v) => builder.append_value(v),
658 None => builder.append_null(),
659 }
660 }
661 Ok(builder.finish())
662}
663
/// Generates a forward-fill function for a primitive array type.
///
/// Arguments: the function name, the concrete array type, its builder type, and a callable
/// applied to every non-null value before it is emitted and carried forward. Leading nulls (before
/// any value has been seen) remain null.
macro_rules! fill_forward_numeric {
    ($name:ident, $array_ty:ty, $builder_ty:ty, $cast:expr) => {
        fn $name(col: &Arc<dyn Array>) -> Result<$array_ty> {
            let array = col
                .as_any()
                .downcast_ref::<$array_ty>()
                .ok_or_else(|| DataFrameError::invalid_operation("bad array downcast"))?;
            let mut builder = <$builder_ty>::with_capacity(array.len());
            let mut carried = None;
            for idx in 0..array.len() {
                if array.is_valid(idx) {
                    carried = Some($cast(array.value(idx)));
                }
                builder.append_option(carried);
            }
            Ok(builder.finish())
        }
    };
}
690
/// Generates a backward-fill function for a primitive array type.
///
/// Arguments: the function name, the concrete array type, its builder type, and a callable
/// applied to every non-null value. Each null takes the nearest following value; trailing nulls
/// remain null.
macro_rules! fill_backward_numeric {
    ($name:ident, $array_ty:ty, $builder_ty:ty, $cast:expr) => {
        fn $name(col: &Arc<dyn Array>) -> Result<$array_ty> {
            let array = col
                .as_any()
                .downcast_ref::<$array_ty>()
                .ok_or_else(|| DataFrameError::invalid_operation("bad array downcast"))?;
            let len = array.len();
            // Walk right-to-left so each null sees the nearest later value, then replay the
            // recorded slots in row order while appending.
            let mut reversed = Vec::with_capacity(len);
            let mut upcoming = None;
            for idx in (0..len).rev() {
                if array.is_valid(idx) {
                    upcoming = Some($cast(array.value(idx)));
                }
                reversed.push(upcoming);
            }
            let mut builder = <$builder_ty>::with_capacity(len);
            for slot in reversed.into_iter().rev() {
                builder.append_option(slot);
            }
            Ok(builder.finish())
        }
    };
}
721
722fill_forward_numeric!(fill_forward_i8, Int8Array, Int8Builder, |v: i8| v);
723fill_forward_numeric!(fill_forward_i16, Int16Array, Int16Builder, |v: i16| v);
724fill_forward_numeric!(fill_forward_i32, Int32Array, Int32Builder, |v: i32| v);
725fill_forward_numeric!(fill_forward_i64, Int64Array, Int64Builder, |v: i64| v);
726fill_forward_numeric!(fill_forward_u8, UInt8Array, UInt8Builder, |v: u8| v);
727fill_forward_numeric!(fill_forward_u16, UInt16Array, UInt16Builder, |v: u16| v);
728fill_forward_numeric!(fill_forward_u32, UInt32Array, UInt32Builder, |v: u32| v);
729fill_forward_numeric!(fill_forward_u64, UInt64Array, UInt64Builder, |v: u64| v);
730fill_forward_numeric!(fill_forward_f32, Float32Array, Float32Builder, |v: f32| v);
731fill_forward_numeric!(fill_forward_f64, Float64Array, Float64Builder, |v: f64| v);
732
733fill_backward_numeric!(fill_backward_i8, Int8Array, Int8Builder, |v: i8| v);
734fill_backward_numeric!(fill_backward_i16, Int16Array, Int16Builder, |v: i16| v);
735fill_backward_numeric!(fill_backward_i32, Int32Array, Int32Builder, |v: i32| v);
736fill_backward_numeric!(fill_backward_i64, Int64Array, Int64Builder, |v: i64| v);
737fill_backward_numeric!(fill_backward_u8, UInt8Array, UInt8Builder, |v: u8| v);
738fill_backward_numeric!(fill_backward_u16, UInt16Array, UInt16Builder, |v: u16| v);
739fill_backward_numeric!(fill_backward_u32, UInt32Array, UInt32Builder, |v: u32| v);
740fill_backward_numeric!(fill_backward_u64, UInt64Array, UInt64Builder, |v: u64| v);
741fill_backward_numeric!(fill_backward_f32, Float32Array, Float32Builder, |v: f32| v);
742fill_backward_numeric!(fill_backward_f64, Float64Array, Float64Builder, |v: f64| v);
743
744fn fill_forward_utf8(col: &Arc<dyn Array>) -> Result<StringArray> {
745 let array = col
746 .as_any()
747 .downcast_ref::<StringArray>()
748 .ok_or_else(|| DataFrameError::invalid_operation("bad StringArray downcast"))?;
749 let mut builder = StringBuilder::with_capacity(array.len(), array.value_data().len());
750 let mut last: Option<String> = None;
751 for i in 0..array.len() {
752 if array.is_null(i) {
753 match last.as_deref() {
754 Some(v) => builder.append_value(v),
755 None => builder.append_null(),
756 }
757 } else {
758 let v = array.value(i).to_string();
759 last = Some(v.clone());
760 builder.append_value(v);
761 }
762 }
763 Ok(builder.finish())
764}
765
766fn fill_backward_utf8(col: &Arc<dyn Array>) -> Result<StringArray> {
767 let array = col
768 .as_any()
769 .downcast_ref::<StringArray>()
770 .ok_or_else(|| DataFrameError::invalid_operation("bad StringArray downcast"))?;
771 let mut tmp = Vec::with_capacity(array.len());
772 let mut next: Option<String> = None;
773 for i in (0..array.len()).rev() {
774 if array.is_null(i) {
775 tmp.push(next.clone());
776 } else {
777 let v = array.value(i).to_string();
778 next = Some(v.clone());
779 tmp.push(Some(v));
780 }
781 }
782 tmp.reverse();
783 let mut builder = StringBuilder::with_capacity(array.len(), array.value_data().len());
784 for v in tmp {
785 match v {
786 Some(v) => builder.append_value(v),
787 None => builder.append_null(),
788 }
789 }
790 Ok(builder.finish())
791}
792
793fn fill_stat_i8(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
794 let array = col
795 .as_any()
796 .downcast_ref::<Int8Array>()
797 .ok_or_else(|| DataFrameError::invalid_operation("bad Int8Array downcast"))?;
798 let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
799 let value = match stat {
800 Stat::Min => min.map(|v| v as i64),
801 Stat::Max => max.map(|v| v as i64),
802 Stat::Mean => mean.map(|v| v as i64),
803 };
804 match value {
805 Some(v) => Ok(Arc::new(fill_int8(col, &Scalar::Int64(v))?)),
806 None => Ok(col.clone()),
807 }
808}
809
810fn fill_stat_i16(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
811 let array = col
812 .as_any()
813 .downcast_ref::<Int16Array>()
814 .ok_or_else(|| DataFrameError::invalid_operation("bad Int16Array downcast"))?;
815 let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
816 let value = match stat {
817 Stat::Min => min.map(|v| v as i64),
818 Stat::Max => max.map(|v| v as i64),
819 Stat::Mean => mean.map(|v| v as i64),
820 };
821 match value {
822 Some(v) => Ok(Arc::new(fill_int16(col, &Scalar::Int64(v))?)),
823 None => Ok(col.clone()),
824 }
825}
826
827fn fill_stat_i32(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
828 let array = col
829 .as_any()
830 .downcast_ref::<Int32Array>()
831 .ok_or_else(|| DataFrameError::invalid_operation("bad Int32Array downcast"))?;
832 let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
833 let value = match stat {
834 Stat::Min => min.map(|v| v as i64),
835 Stat::Max => max.map(|v| v as i64),
836 Stat::Mean => mean.map(|v| v as i64),
837 };
838 match value {
839 Some(v) => Ok(Arc::new(fill_int32(col, &Scalar::Int64(v))?)),
840 None => Ok(col.clone()),
841 }
842}
843
844fn fill_stat_i64(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
845 let array = col
846 .as_any()
847 .downcast_ref::<Int64Array>()
848 .ok_or_else(|| DataFrameError::invalid_operation("bad Int64Array downcast"))?;
849 let (min, max, mean) = stats_signed(array.iter().flatten().map(|v| v as i128));
850 let value = match stat {
851 Stat::Min => min.map(|v| v as i64),
852 Stat::Max => max.map(|v| v as i64),
853 Stat::Mean => mean.map(|v| v as i64),
854 };
855 match value {
856 Some(v) => Ok(Arc::new(fill_int64(col, &Scalar::Int64(v))?)),
857 None => Ok(col.clone()),
858 }
859}
860
861fn fill_stat_u8(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
862 let array = col
863 .as_any()
864 .downcast_ref::<UInt8Array>()
865 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt8Array downcast"))?;
866 let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
867 let value = match stat {
868 Stat::Min => min.map(|v| v as i64),
869 Stat::Max => max.map(|v| v as i64),
870 Stat::Mean => mean.map(|v| v as i64),
871 };
872 match value {
873 Some(v) => Ok(Arc::new(fill_uint8(col, &Scalar::Int64(v))?)),
874 None => Ok(col.clone()),
875 }
876}
877
878fn fill_stat_u16(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
879 let array = col
880 .as_any()
881 .downcast_ref::<UInt16Array>()
882 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt16Array downcast"))?;
883 let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
884 let value = match stat {
885 Stat::Min => min.map(|v| v as i64),
886 Stat::Max => max.map(|v| v as i64),
887 Stat::Mean => mean.map(|v| v as i64),
888 };
889 match value {
890 Some(v) => Ok(Arc::new(fill_uint16(col, &Scalar::Int64(v))?)),
891 None => Ok(col.clone()),
892 }
893}
894
895fn fill_stat_u32(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
896 let array = col
897 .as_any()
898 .downcast_ref::<UInt32Array>()
899 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt32Array downcast"))?;
900 let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
901 let value = match stat {
902 Stat::Min => min.map(|v| v as i64),
903 Stat::Max => max.map(|v| v as i64),
904 Stat::Mean => mean.map(|v| v as i64),
905 };
906 match value {
907 Some(v) => Ok(Arc::new(fill_uint32(col, &Scalar::Int64(v))?)),
908 None => Ok(col.clone()),
909 }
910}
911
912fn fill_stat_u64(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
913 let array = col
914 .as_any()
915 .downcast_ref::<UInt64Array>()
916 .ok_or_else(|| DataFrameError::invalid_operation("bad UInt64Array downcast"))?;
917 let (min, max, mean) = stats_unsigned(array.iter().flatten().map(|v| v as u128));
918 let value = match stat {
919 Stat::Min => min.map(|v| v as i64),
920 Stat::Max => max.map(|v| v as i64),
921 Stat::Mean => mean.map(|v| v as i64),
922 };
923 match value {
924 Some(v) => Ok(Arc::new(fill_uint64(col, &Scalar::Int64(v))?)),
925 None => Ok(col.clone()),
926 }
927}
928
929fn fill_stat_f32(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
930 let array = col
931 .as_any()
932 .downcast_ref::<Float32Array>()
933 .ok_or_else(|| DataFrameError::invalid_operation("bad Float32Array downcast"))?;
934 let (min, max, mean) = stats_float(array.iter().flatten().map(|v| v as f64));
935 let value = match stat {
936 Stat::Min => min,
937 Stat::Max => max,
938 Stat::Mean => mean,
939 };
940 match value {
941 Some(v) => Ok(Arc::new(fill_float32(col, &Scalar::Float64(v))?)),
942 None => Ok(col.clone()),
943 }
944}
945
946fn fill_stat_f64(col: &Arc<dyn Array>, stat: Stat) -> Result<Arc<dyn Array>> {
947 let array = col
948 .as_any()
949 .downcast_ref::<Float64Array>()
950 .ok_or_else(|| DataFrameError::invalid_operation("bad Float64Array downcast"))?;
951 let (min, max, mean) = stats_float(array.iter().flatten());
952 let value = match stat {
953 Stat::Min => min,
954 Stat::Max => max,
955 Stat::Mean => mean,
956 };
957 match value {
958 Some(v) => Ok(Arc::new(fill_float64(col, &Scalar::Float64(v))?)),
959 None => Ok(col.clone()),
960 }
961}
962
/// Returns `(min, max, mean)` of a stream of signed integers, or all `None` for an empty stream.
///
/// The mean uses integer division, so it truncates toward zero.
fn stats_signed<I>(values: I) -> (Option<i128>, Option<i128>, Option<i128>)
where
    I: Iterator<Item = i128>,
{
    // Accumulator: (lowest, highest, running total, element count).
    let summary = values.fold(None, |acc: Option<(i128, i128, i128, i128)>, v| {
        Some(match acc {
            None => (v, v, v, 1),
            Some((lo, hi, total, n)) => (lo.min(v), hi.max(v), total + v, n + 1),
        })
    });
    match summary {
        None => (None, None, None),
        Some((lo, hi, total, n)) => (Some(lo), Some(hi), Some(total / n)),
    }
}
980
/// Returns `(min, max, mean)` of a stream of unsigned integers, or all `None` for an empty
/// stream.
///
/// The mean uses integer division, so it is rounded down.
fn stats_unsigned<I>(values: I) -> (Option<u128>, Option<u128>, Option<u128>)
where
    I: Iterator<Item = u128>,
{
    // Accumulator: (lowest, highest, running total, element count).
    let summary = values.fold(None, |acc: Option<(u128, u128, u128, u128)>, v| {
        Some(match acc {
            None => (v, v, v, 1),
            Some((lo, hi, total, n)) => (lo.min(v), hi.max(v), total + v, n + 1),
        })
    });
    match summary {
        None => (None, None, None),
        Some((lo, hi, total, n)) => (Some(lo), Some(hi), Some(total / n)),
    }
}
998
/// Returns `(min, max, mean)` of a stream of floats, or all `None` for an empty stream.
///
/// `f64::min`/`f64::max` skip NaN whenever the other operand is a number. The mean becomes NaN if
/// any input is NaN.
fn stats_float<I>(values: I) -> (Option<f64>, Option<f64>, Option<f64>)
where
    I: Iterator<Item = f64>,
{
    let mut min: Option<f64> = None;
    let mut max: Option<f64> = None;
    let mut sum = 0.0_f64;
    // Explicitly u64: an unannotated integer literal here defaults to i32, which overflowed after
    // 2^31 values (a panic in debug builds, a wrong-sign mean in release).
    let mut count: u64 = 0;
    for v in values {
        min = Some(min.map_or(v, |m| m.min(v)));
        max = Some(max.map_or(v, |m| m.max(v)));
        sum += v;
        count += 1;
    }
    let mean = (count > 0).then(|| sum / count as f64);
    (min, max, mean)
}