mod arrays;
mod batches;
mod compare;
mod format;
mod string_view;
#[cfg(feature = "test")]
mod test_extensions;
use std::sync::Arc;
use arrow::array::{Array as _, AsArray as _, ListArray};
use arrow::datatypes::{DataType, Field};
pub use self::arrays::*;
pub use self::batches::*;
pub use self::compare::*;
pub use self::format::{
RecordBatchFormatOpts, format_field_datatype, format_record_batch, format_record_batch_opts,
format_record_batch_with_width,
};
pub use self::string_view::*;
#[cfg(feature = "test")]
pub use self::test_extensions::*;
/// Widens a `List<Binary>` array into a `List<LargeBinary>` array.
///
/// Any other list array (including one that already holds `LargeBinary`) is returned as an
/// untouched clone. When widening does happen, the resulting inner field is always named
/// `"item"` and is nullable, regardless of the name/nullability of the input's inner field.
///
/// # Panics
/// Panics if the Arrow cast kernel returns an error; a `Binary` → `LargeBinary` cast is not
/// expected to fail.
pub fn widen_binary_arrays(list_array: &ListArray) -> ListArray {
    let holds_binary = matches!(
        list_array.data_type(),
        DataType::List(inner) if inner.data_type() == &DataType::Binary
    );
    if !holds_binary {
        return list_array.clone();
    }

    re_tracing::profile_function!();

    let widened_item = Arc::new(Field::new("item", DataType::LargeBinary, true));
    let target_type = DataType::List(widened_item);

    #[expect(clippy::unwrap_used)]
    let widened = arrow::compute::kernels::cast::cast(list_array, &target_type).unwrap();
    widened.as_list().clone()
}
#[cfg(test)]
mod tests {
    use arrow::array::{AsArray as _, BinaryBuilder, Int32Builder, ListBuilder};

    use super::*;

    /// Builds `[[b"hello", b"world"], [b"rust", b"arrow"], null]` as a `List<Binary>` array.
    fn binary_list_fixture() -> ListArray {
        let mut list_builder = ListBuilder::new(BinaryBuilder::new());
        list_builder.values().append_value(b"hello");
        list_builder.values().append_value(b"world");
        list_builder.append(true);
        list_builder.values().append_value(b"rust");
        list_builder.values().append_value(b"arrow");
        list_builder.append(true);
        list_builder.append_null();
        list_builder.finish()
    }

    #[test]
    fn test_widen_list_binary() {
        let original_list = binary_list_fixture();
        let widened_list = widen_binary_arrays(&original_list);

        assert_eq!(widened_list.len(), 3);
        assert!(!widened_list.is_null(0));
        assert!(!widened_list.is_null(1));
        assert!(widened_list.is_null(2));

        let DataType::List(field) = widened_list.data_type() else {
            panic!("Expected List data type");
        };
        assert_eq!(field.data_type(), &DataType::LargeBinary);

        // The widening must preserve the payload, not just change the type.
        let first = widened_list.value(0);
        let first = first.as_binary::<i64>();
        assert_eq!(first.value(0), b"hello");
        assert_eq!(first.value(1), b"world");

        let second = widened_list.value(1);
        let second = second.as_binary::<i64>();
        assert_eq!(second.value(0), b"rust");
        assert_eq!(second.value(1), b"arrow");
    }

    #[test]
    fn test_widen_passes_through_non_binary_lists() {
        let mut list_builder = ListBuilder::new(Int32Builder::new());
        list_builder.values().append_value(1);
        list_builder.values().append_value(2);
        list_builder.append(true);
        list_builder.append_null();
        let original_list = list_builder.finish();

        let result = widen_binary_arrays(&original_list);
        assert_eq!(result, original_list);
    }
}
/// Rejects datatypes containing nested types that schema merging cannot safely widen.
///
/// The check is recursive. It descends through:
/// - struct children;
/// - list-like item fields (`List`, `LargeList`, `ListView`, `LargeListView`, `FixedSizeList`);
/// - map entries;
/// - dictionary value types;
/// - run-end-encoded values.
///
/// Currently the only rejected type is `Union`. It is rejected wherever it appears, even when
/// the union itself would be identical on both sides of a merge.
///
/// # Errors
/// Returns [`arrow::error::ArrowError::SchemaError`] if a `Union` is found anywhere in `dt`.
pub fn reject_unsupported_widenings(dt: &DataType) -> Result<(), arrow::error::ArrowError> {
    match dt {
        DataType::Union(_, _) => Err(arrow::error::ArrowError::SchemaError(
            "union-typed fields in the checked datatype are not supported for schema merging"
                .to_owned(),
        )),
        DataType::Struct(fields) => fields
            .iter()
            .try_for_each(|f| reject_unsupported_widenings(f.data_type())),
        // Every container that wraps a single child field: a union hidden inside any of them
        // must be caught just like one directly inside a struct or list.
        DataType::List(f)
        | DataType::LargeList(f)
        | DataType::ListView(f)
        | DataType::LargeListView(f)
        | DataType::FixedSizeList(f, _)
        | DataType::Map(f, _) => reject_unsupported_widenings(f.data_type()),
        DataType::Dictionary(_, value_type) => reject_unsupported_widenings(value_type),
        DataType::RunEndEncoded(_, values) => reject_unsupported_widenings(values.data_type()),
        _ => Ok(()),
    }
}
#[derive(Debug, Clone, thiserror::Error)]
pub struct MissingColumnError {
pub missing: String,
pub available: Vec<String>,
}
impl std::fmt::Display for MissingColumnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { missing, available } = self;
write!(f, "Missing column: {missing:?}. Available: {available:?}")
}
}
/// Error returned when a column (or value) has a different Arrow datatype than required.
#[derive(Debug, Clone, thiserror::Error)]
pub struct WrongDatatypeError {
    /// Name of the offending column, when known.
    pub column_name: Option<String>,

    /// The datatype that was required (boxed, presumably to keep the error small).
    pub expected: Box<DataType>,

    /// The datatype that was actually found.
    pub actual: Box<DataType>,
}

impl WrongDatatypeError {
    /// Checks that `field` has exactly the `expected` datatype.
    ///
    /// # Errors
    /// Returns a [`WrongDatatypeError`] carrying the field's name if its datatype differs
    /// from `expected`.
    pub fn ensure_datatype(field: &Field, expected: &DataType) -> Result<(), Self> {
        let actual = field.data_type();
        if actual != expected {
            return Err(Self {
                column_name: Some(field.name().clone()),
                expected: Box::new(expected.clone()),
                actual: Box::new(actual.clone()),
            });
        }
        Ok(())
    }
}

impl std::fmt::Display for WrongDatatypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field forces this impl to be revisited.
        let Self {
            column_name,
            expected,
            actual,
        } = self;
        match column_name {
            Some(column_name) => write!(
                f,
                "Expected column {column_name:?} to be {expected}, got {actual}"
            ),
            None => write!(f, "Expected {expected}, got {actual}"),
        }
    }
}
#[cfg(test)]
mod reject_unsupported_widenings_tests {
    use arrow::datatypes::{DataType, Field, Fields, Schema, UnionFields, UnionMode};

    use super::*;

    /// A minimal sparse union with a single `Int32` variant.
    fn small_union_type() -> DataType {
        let fields = UnionFields::try_new(vec![0], vec![Field::new("a", DataType::Int32, true)])
            .expect("valid union fields");
        DataType::Union(fields, UnionMode::Sparse)
    }

    /// Wraps `dt` in a nullable list item field named `"item"`.
    fn item_field(dt: DataType) -> Arc<Field> {
        Arc::new(Field::new("item", dt, true))
    }

    /// Asserts that `dt` is rejected with the union-specific error message.
    #[track_caller]
    fn assert_union_rejected(dt: &DataType) {
        let err = reject_unsupported_widenings(dt).unwrap_err();
        assert!(err.to_string().contains("union-typed"), "msg: {err}");
    }

    #[test]
    fn top_level_union_rejected() {
        assert_union_rejected(&small_union_type());
    }

    #[test]
    fn union_nested_inside_struct_rejected() {
        let struct_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("u", small_union_type(), true),
        ]));
        assert_union_rejected(&struct_type);
    }

    #[test]
    fn union_nested_inside_list_rejected() {
        assert_union_rejected(&DataType::List(item_field(small_union_type())));
    }

    #[test]
    fn union_nested_inside_large_list_rejected() {
        assert_union_rejected(&DataType::LargeList(item_field(small_union_type())));
    }

    #[test]
    fn union_nested_inside_fixed_size_list_rejected() {
        assert_union_rejected(&DataType::FixedSizeList(
            item_field(small_union_type()),
            2,
        ));
    }

    #[test]
    fn union_nested_inside_list_of_struct_rejected() {
        let struct_type =
            DataType::Struct(Fields::from(vec![Field::new("u", small_union_type(), true)]));
        assert_union_rejected(&DataType::List(item_field(struct_type)));
    }

    #[test]
    fn union_over_rejected_when_only_a_sibling_widens() {
        let narrow_struct = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("u", small_union_type(), true),
        ]));
        let wide_struct = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("u", small_union_type(), true),
        ]));

        // Arrow itself accepts this merge: the unions are identical, only `a` widens.
        let lhs = Schema::new(vec![Field::new("s", narrow_struct, true)]);
        let rhs = Schema::new(vec![Field::new("s", wide_struct.clone(), true)]);
        Schema::try_merge([lhs, rhs])
            .expect("try_merge accepts: Union identical, only sibling widens");

        // ...but we still reject any union anywhere in the checked type.
        assert_union_rejected(&wide_struct);
    }

    #[test]
    fn plain_schema_accepted() {
        let schema = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
                true,
            ),
            Field::new(
                "c",
                DataType::Struct(Fields::from(vec![Field::new("d", DataType::Int64, false)])),
                true,
            ),
        ]));
        assert!(reject_unsupported_widenings(&schema).is_ok());
    }
}