use crate::cast::*;
use arrow_ord::partition::partition;
pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
array: &dyn Array,
to_type: &DataType,
cast_options: &CastOptions,
) -> Result<ArrayRef, ArrowError> {
match array.data_type() {
DataType::RunEndEncoded(_, _) => {
let run_array = array
.as_any()
.downcast_ref::<RunArray<K>>()
.ok_or_else(|| ArrowError::CastError("Expected RunArray".to_string()))?;
match to_type {
DataType::RunEndEncoded(target_index_field, target_value_field) => {
let values = run_array.values_slice();
let cast_values = cast_with_options(
values.as_ref(),
target_value_field.data_type(),
cast_options,
)?;
let run_ends_array =
PrimitiveArray::<K>::from_iter_values(run_array.run_ends().sliced_values());
let cast_run_ends = cast_with_options(
&run_ends_array,
target_index_field.data_type(),
cast_options,
)?;
let new_run_array: ArrayRef = match target_index_field.data_type() {
DataType::Int16 => {
let re = cast_run_ends.as_primitive::<Int16Type>();
Arc::new(RunArray::<Int16Type>::try_new(re, cast_values.as_ref())?)
}
DataType::Int32 => {
let re = cast_run_ends.as_primitive::<Int32Type>();
Arc::new(RunArray::<Int32Type>::try_new(re, cast_values.as_ref())?)
}
DataType::Int64 => {
let re = cast_run_ends.as_primitive::<Int64Type>();
Arc::new(RunArray::<Int64Type>::try_new(re, cast_values.as_ref())?)
}
_ => {
return Err(ArrowError::CastError(
"Run-end type must be i16, i32, or i64".to_string(),
));
}
};
Ok(new_run_array)
}
_ => {
let values = run_array.values();
let len = run_array.len();
let offset = run_array.offset();
let run_ends = run_array.run_ends().values();
let mut indices = Vec::with_capacity(len);
let mut physical_idx = run_array.get_start_physical_index();
for logical_idx in offset..offset + len {
if logical_idx == run_ends[physical_idx].as_usize() {
physical_idx += 1;
}
indices.push(physical_idx as i32);
}
let taken = take(&values, &Int32Array::from_iter_values(indices), None)?;
if taken.data_type() != to_type {
cast_with_options(taken.as_ref(), to_type, cast_options)
} else {
Ok(taken)
}
}
}
}
_ => Err(ArrowError::CastError(format!(
"Cannot cast array of type {:?} to RunEndEncodedArray",
array.data_type()
))),
}
}
pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
array: &ArrayRef,
value_type: &DataType,
cast_options: &CastOptions,
) -> Result<ArrayRef, ArrowError> {
let mut run_ends_builder = PrimitiveBuilder::<K>::new();
let cast_array = if array.data_type() == value_type {
array
} else {
&cast_with_options(array, value_type, cast_options)?
};
if cast_array.is_empty() {
let empty_run_ends = run_ends_builder.finish();
let empty_values = make_array(ArrayData::new_empty(value_type));
return Ok(Arc::new(RunArray::<K>::try_new(
&empty_run_ends,
empty_values.as_ref(),
)?));
}
if let DataType::RunEndEncoded(_, _) = array.data_type() {
return Err(ArrowError::CastError(
"Source array is already a RunEndEncoded array, should have been handled by run_end_encoded_cast".to_string()
));
}
let partitions = partition(&[Arc::clone(cast_array)])?;
let size = partitions.len();
let mut run_ends = Vec::with_capacity(size);
let mut values_indexes = Vec::with_capacity(size);
let mut last_partition_end = 0;
for partition in partitions.ranges() {
values_indexes.push(last_partition_end);
run_ends.push(partition.end);
last_partition_end = partition.end;
}
for run_end in run_ends {
run_ends_builder.append_value(K::Native::from_usize(run_end).ok_or_else(|| {
ArrowError::CastError(format!("Run end index out of range: {}", run_end))
})?);
}
let run_ends_array = run_ends_builder.finish();
let indices = PrimitiveArray::<UInt32Type>::from_iter_values(
values_indexes.iter().map(|&idx| idx as u32),
);
let values_array = take(&cast_array, &indices, None)?;
let run_array = RunArray::<K>::try_new(&run_ends_array, values_array.as_ref())?;
Ok(Arc::new(run_array))
}