use crate::{new_empty_array, Array, ArrayRef, StructArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef};
use std::ops::Index;
use std::sync::Arc;
/// Trait for types that can read `RecordBatch`'s, as an iterator of
/// `Result<RecordBatch, ArrowError>` with a known schema.
pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
    /// Returns the schema of this `RecordBatchReader`.
    ///
    /// Implementations should guarantee that all `RecordBatch`'s returned by
    /// this reader have that same schema.
    fn schema(&self) -> SchemaRef;
    /// Reads the next `RecordBatch`, returning `Ok(None)` when exhausted.
    #[deprecated(
        since = "2.0.0",
        note = "This method is deprecated in favour of `next` from the trait Iterator."
    )]
    fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        // Adapt Option<Result<_>> from Iterator::next to Result<Option<_>>.
        self.next().transpose()
    }
}
impl<R: RecordBatchReader + ?Sized> RecordBatchReader for Box<R> {
    /// Delegates to the boxed reader's schema.
    fn schema(&self) -> SchemaRef {
        (**self).schema()
    }
}
/// Trait for types that can write `RecordBatch`'s.
pub trait RecordBatchWriter {
    /// Writes a single batch to the writer.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError>;
    /// Consumes the writer, flushing/finalizing any pending output.
    fn close(self) -> Result<(), ArrowError>;
}
/// A two-dimensional batch of column-oriented data with a defined schema.
///
/// Invariants (enforced by [`RecordBatch::try_new_impl`]): one column per
/// schema field, every column has exactly `row_count` rows, and each column's
/// data type matches its field.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    // Schema describing the columns; shared via `Arc`.
    schema: SchemaRef,
    // One array per schema field, in field order.
    columns: Vec<Arc<dyn Array>>,
    // Number of rows; kept explicitly so a batch with zero columns can still
    // report a row count.
    row_count: usize,
}
impl RecordBatch {
    /// Creates a [`RecordBatch`] from a schema and columns using default
    /// [`RecordBatchOptions`] (exact type match required, row count inferred
    /// from the first column).
    ///
    /// # Errors
    ///
    /// Returns an error if the column count does not match the field count,
    /// the columns have inconsistent lengths, a non-nullable field's column
    /// contains nulls, a column's type does not match its field, or there is
    /// no column from which to infer a row count.
    pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self, ArrowError> {
        let options = RecordBatchOptions::new();
        Self::try_new_impl(schema, columns, &options)
    }
    /// Creates a [`RecordBatch`] from a schema and columns, with
    /// [`RecordBatchOptions`] controlling name matching and an explicit row
    /// count (required when `columns` is empty).
    pub fn try_new_with_options(
        schema: SchemaRef,
        columns: Vec<ArrayRef>,
        options: &RecordBatchOptions,
    ) -> Result<Self, ArrowError> {
        Self::try_new_impl(schema, columns, options)
    }
    /// Creates a zero-row [`RecordBatch`] whose columns are empty arrays
    /// matching `schema`'s field types.
    pub fn new_empty(schema: SchemaRef) -> Self {
        let columns = schema
            .fields()
            .iter()
            .map(|field| new_empty_array(field.data_type()))
            .collect();
        RecordBatch {
            schema,
            columns,
            row_count: 0,
        }
    }
    /// Validates schema/column consistency and assembles the batch.
    ///
    /// Checks run in a fixed order (count, row count, nullability, length,
    /// type), which determines which error a caller observes when multiple
    /// problems exist.
    fn try_new_impl(
        schema: SchemaRef,
        columns: Vec<ArrayRef>,
        options: &RecordBatchOptions,
    ) -> Result<Self, ArrowError> {
        // There must be exactly one column per schema field.
        if schema.fields().len() != columns.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "number of columns({}) must match number of fields({}) in schema",
                columns.len(),
                schema.fields().len(),
            )));
        }
        // Prefer the explicit row count; otherwise infer from the first
        // column. With neither, the row count is ambiguous.
        let row_count = options
            .row_count
            .or_else(|| columns.first().map(|col| col.len()))
            .ok_or_else(|| {
                ArrowError::InvalidArgumentError(
                    "must either specify a row count or at least one column".to_string(),
                )
            })?;
        // A column containing nulls may not back a non-nullable field.
        for (c, f) in columns.iter().zip(&schema.fields) {
            if !f.is_nullable() && c.null_count() > 0 {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "Column '{}' is declared as non-nullable but contains null values",
                    f.name()
                )));
            }
        }
        // Every column must agree on the row count; the message depends on
        // whether the count was explicit or inferred.
        if columns.iter().any(|c| c.len() != row_count) {
            let err = match options.row_count {
                Some(_) => "all columns in a record batch must have the specified row count",
                None => "all columns in a record batch must have the same length",
            };
            return Err(ArrowError::InvalidArgumentError(err.to_string()));
        }
        // With `match_field_names` the types must be strictly equal;
        // otherwise `equals_datatype` is used, which tolerates differing
        // nested field names (see `create_record_batch_field_name_mismatch`).
        let type_not_match = if options.match_field_names {
            |(_, (col_type, field_type)): &(usize, (&DataType, &DataType))| col_type != field_type
        } else {
            |(_, (col_type, field_type)): &(usize, (&DataType, &DataType))| {
                !col_type.equals_datatype(field_type)
            }
        };
        let not_match = columns
            .iter()
            .zip(schema.fields().iter())
            .map(|(col, field)| (col.data_type(), field.data_type()))
            .enumerate()
            .find(type_not_match);
        if let Some((i, (col_type, field_type))) = not_match {
            return Err(ArrowError::InvalidArgumentError(format!(
                "column types must match schema types, expected {field_type:?} but found {col_type:?} at column index {i}")));
        }
        Ok(RecordBatch {
            schema,
            columns,
            row_count,
        })
    }
    /// Overrides the batch's schema with `schema`, which must be a superset
    /// of the current schema as determined by `Schema::contains`.
    ///
    /// # Errors
    ///
    /// Returns an error if `schema` is not a superset of the current schema.
    pub fn with_schema(self, schema: SchemaRef) -> Result<Self, ArrowError> {
        if !schema.contains(self.schema.as_ref()) {
            return Err(ArrowError::SchemaError(format!(
                "target schema is not superset of current schema target={schema} current={}",
                self.schema
            )));
        }
        Ok(Self {
            schema,
            columns: self.columns,
            row_count: self.row_count,
        })
    }
    /// Returns the batch's schema (a cheap `Arc` clone).
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    /// Returns a reference to the batch's schema without cloning the `Arc`.
    pub fn schema_ref(&self) -> &SchemaRef {
        &self.schema
    }
    /// Returns a new batch containing only the columns at `indices`.
    ///
    /// The current row count is preserved, so projecting to zero columns
    /// keeps the number of rows.
    ///
    /// # Errors
    ///
    /// Returns an error if any index is out of bounds for the schema or the
    /// columns.
    pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
        let projected_schema = self.schema.project(indices)?;
        let batch_fields = indices
            .iter()
            .map(|f| {
                self.columns.get(*f).cloned().ok_or_else(|| {
                    ArrowError::SchemaError(format!(
                        "project index {} out of bounds, max field {}",
                        f,
                        self.columns.len()
                    ))
                })
            })
            .collect::<Result<Vec<_>, _>>()?;
        RecordBatch::try_new_with_options(
            SchemaRef::new(projected_schema),
            batch_fields,
            &RecordBatchOptions {
                match_field_names: true,
                // Keep the original row count even when no columns remain.
                row_count: Some(self.row_count),
            },
        )
    }
    /// Returns the number of columns in the batch.
    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }
    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        self.row_count
    }
    /// Returns the column at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    pub fn column(&self, index: usize) -> &ArrayRef {
        &self.columns[index]
    }
    /// Returns the column with the given field name, or `None` if no such
    /// field exists.
    pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
        self.schema()
            .column_with_name(name)
            .map(|(index, _)| &self.columns[index])
    }
    /// Returns all columns as a slice, in schema field order.
    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns[..]
    }
    /// Removes and returns the column at `index`, also removing the matching
    /// field from the schema (schema metadata is carried over via
    /// `SchemaBuilder`).
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    pub fn remove_column(&mut self, index: usize) -> ArrayRef {
        let mut builder = SchemaBuilder::from(self.schema.as_ref());
        builder.remove(index);
        self.schema = Arc::new(builder.finish());
        self.columns.remove(index)
    }
    /// Returns a zero-copy slice of `length` rows starting at `offset`,
    /// sharing the same schema.
    ///
    /// # Panics
    ///
    /// Panics if `offset + length` exceeds `self.num_rows()`.
    pub fn slice(&self, offset: usize, length: usize) -> RecordBatch {
        assert!((offset + length) <= self.num_rows());
        let columns = self
            .columns()
            .iter()
            .map(|column| column.slice(offset, length))
            .collect();
        Self {
            schema: self.schema.clone(),
            columns,
            row_count: length,
        }
    }
    /// Builds a batch from `(name, array)` pairs; each field's nullability is
    /// derived from whether that array currently contains any nulls.
    pub fn try_from_iter<I, F>(value: I) -> Result<Self, ArrowError>
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let iter = value.into_iter().map(|(field_name, array)| {
            let nullable = array.null_count() > 0;
            (field_name, array, nullable)
        });
        Self::try_from_iter_with_nullable(iter)
    }
    /// Builds a batch from `(name, array, nullable)` triples, giving explicit
    /// control over each field's nullability.
    pub fn try_from_iter_with_nullable<I, F>(value: I) -> Result<Self, ArrowError>
    where
        I: IntoIterator<Item = (F, ArrayRef, bool)>,
        F: AsRef<str>,
    {
        let iter = value.into_iter();
        // Use the iterator's lower size bound to preallocate.
        let capacity = iter.size_hint().0;
        let mut schema = SchemaBuilder::with_capacity(capacity);
        let mut columns = Vec::with_capacity(capacity);
        for (field_name, array, nullable) in iter {
            let field_name = field_name.as_ref();
            schema.push(Field::new(field_name, array.data_type().clone(), nullable));
            columns.push(array);
        }
        let schema = Arc::new(schema.finish());
        RecordBatch::try_new(schema, columns)
    }
    /// Returns the total number of bytes of memory occupied by the batch's
    /// arrays, summed over all columns.
    pub fn get_array_memory_size(&self) -> usize {
        self.columns()
            .iter()
            .map(|array| array.get_array_memory_size())
            .sum()
    }
}
/// Options that control how a [`RecordBatch`] is validated on construction.
#[derive(Debug)]
#[non_exhaustive]
pub struct RecordBatchOptions {
    /// When `true` (the default), column types must match schema field types
    /// exactly; when `false`, nested field names are not compared.
    pub match_field_names: bool,
    /// Optional explicit row count; required when a batch has no columns.
    pub row_count: Option<usize>,
}
impl RecordBatchOptions {
    pub fn new() -> Self {
        Self {
            match_field_names: true,
            row_count: None,
        }
    }
    pub fn with_row_count(mut self, row_count: Option<usize>) -> Self {
        self.row_count = row_count;
        self
    }
    pub fn with_match_field_names(mut self, match_field_names: bool) -> Self {
        self.match_field_names = match_field_names;
        self
    }
}
impl Default for RecordBatchOptions {
    fn default() -> Self {
        Self::new()
    }
}
impl From<StructArray> for RecordBatch {
    /// Converts a [`StructArray`] into a [`RecordBatch`], reusing the
    /// struct's fields as the schema and its children as the columns.
    ///
    /// # Panics
    ///
    /// Panics if the struct array has any top-level null values.
    fn from(value: StructArray) -> Self {
        let row_count = value.len();
        let (fields, columns, nulls) = value.into_parts();
        let top_level_nulls = nulls.map(|n| n.null_count()).unwrap_or_default();
        assert_eq!(
            top_level_nulls, 0,
            "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
        );
        let schema = Arc::new(Schema::new(fields));
        RecordBatch {
            schema,
            columns,
            row_count,
        }
    }
}
impl From<&StructArray> for RecordBatch {
    fn from(struct_array: &StructArray) -> Self {
        struct_array.clone().into()
    }
}
impl Index<&str> for RecordBatch {
    type Output = ArrayRef;

    /// Returns the column whose field has the given name, so batches can be
    /// indexed as `batch["column_name"]`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not a field in the schema. (Previously this was a
    /// bare `unwrap()`, which panicked without naming the missing column.)
    fn index(&self, name: &str) -> &Self::Output {
        self.column_by_name(name)
            .unwrap_or_else(|| panic!("column {name:?} not found in record batch schema"))
    }
}
/// Adapts any iterator of `Result<RecordBatch, ArrowError>` into a
/// [`RecordBatchReader`] by pairing it with a schema.
pub struct RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    // The underlying batch iterator.
    inner: I::IntoIter,
    // Schema reported by `RecordBatchReader::schema`; batches produced by
    // `inner` are expected to conform to it.
    inner_schema: SchemaRef,
}
impl<I> RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    pub fn new(iter: I, schema: SchemaRef) -> Self {
        Self {
            inner: iter.into_iter(),
            inner_schema: schema,
        }
    }
}
impl<I> Iterator for RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    type Item = I::Item;

    /// Yields the next batch from the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        Iterator::next(&mut self.inner)
    }

    /// Forwards the wrapped iterator's size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        Iterator::size_hint(&self.inner)
    }
}
impl<I> RecordBatchReader for RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    fn schema(&self) -> SchemaRef {
        self.inner_schema.clone()
    }
}
#[cfg(test)]
mod tests {
    // Unit tests covering RecordBatch construction/validation, equality,
    // projection, slicing, and the reader/iterator adapters.
    use std::collections::HashMap;
    use super::*;
    use crate::{
        BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray, StringViewArray,
    };
    use arrow_buffer::{Buffer, ToByteSlice};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::Fields;
    // --- Construction happy paths ---
    #[test]
    fn create_record_batch() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
        check_batch(record_batch, 5)
    }
    #[test]
    fn create_string_view_record_batch() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8View, false),
        ]);
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringViewArray::from(vec!["a", "b", "c", "d", "e"]);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
        assert_eq!(5, record_batch.num_rows());
        assert_eq!(2, record_batch.num_columns());
        assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
        assert_eq!(
            &DataType::Utf8View,
            record_batch.schema().field(1).data_type()
        );
        assert_eq!(5, record_batch.column(0).len());
        assert_eq!(5, record_batch.column(1).len());
    }
    // Pins get_array_memory_size for a known batch; the exact value is
    // implementation-defined and guards against accidental size regressions.
    #[test]
    fn byte_size_should_not_regress() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
        assert_eq!(record_batch.get_array_memory_size(), 364);
    }
    // Shared assertions for the two-column (Int32, Utf8) fixture batches.
    fn check_batch(record_batch: RecordBatch, num_rows: usize) {
        assert_eq!(num_rows, record_batch.num_rows());
        assert_eq!(2, record_batch.num_columns());
        assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
        assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type());
        assert_eq!(num_rows, record_batch.column(0).len());
        assert_eq!(num_rows, record_batch.column(1).len());
    }
    // --- Slicing (including the out-of-bounds panic path) ---
    #[test]
    #[should_panic(expected = "assertion failed: (offset + length) <= self.num_rows()")]
    fn create_record_batch_slice() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);
        let expected_schema = schema.clone();
        let a = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let b = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "h", "i"]);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
        let offset = 2;
        let length = 5;
        let record_batch_slice = record_batch.slice(offset, length);
        assert_eq!(record_batch_slice.schema().as_ref(), &expected_schema);
        check_batch(record_batch_slice, 5);
        let offset = 2;
        let length = 0;
        let record_batch_slice = record_batch.slice(offset, length);
        assert_eq!(record_batch_slice.schema().as_ref(), &expected_schema);
        check_batch(record_batch_slice, 0);
        // offset + length exceeds num_rows, so this final slice must panic.
        let offset = 2;
        let length = 10;
        let _record_batch_slice = record_batch.slice(offset, length);
    }
    #[test]
    #[should_panic(expected = "assertion failed: (offset + length) <= self.num_rows()")]
    fn create_record_batch_slice_empty_batch() {
        let schema = Schema::empty();
        let record_batch = RecordBatch::new_empty(Arc::new(schema));
        let offset = 0;
        let length = 0;
        let record_batch_slice = record_batch.slice(offset, length);
        assert_eq!(0, record_batch_slice.schema().fields().len());
        // Any non-empty slice of an empty batch must panic.
        let offset = 1;
        let length = 2;
        let _record_batch_slice = record_batch.slice(offset, length);
    }
    // --- try_from_iter: nullability inferred vs. explicit ---
    #[test]
    fn create_record_batch_try_from_iter() {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            None,
            Some(4),
            Some(5),
        ]));
        let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let record_batch =
            RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).expect("valid conversion");
        // "a" contains a null, so its field must be inferred as nullable.
        let expected_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, false),
        ]);
        assert_eq!(record_batch.schema().as_ref(), &expected_schema);
        check_batch(record_batch, 5);
    }
    #[test]
    fn create_record_batch_try_from_iter_with_nullable() {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let record_batch =
            RecordBatch::try_from_iter_with_nullable(vec![("a", a, false), ("b", b, true)])
                .expect("valid conversion");
        let expected_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, true),
        ]);
        assert_eq!(record_batch.schema().as_ref(), &expected_schema);
        check_batch(record_batch, 5);
    }
    // --- Validation failure paths ---
    #[test]
    fn create_record_batch_schema_mismatch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let a = Int64Array::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]);
        assert!(batch.is_err());
    }
    // Nested field names differ ("aa1" vs "a1", "array" vs "item"); exact
    // matching rejects the batch while match_field_names: false accepts it.
    #[test]
    fn create_record_batch_field_name_mismatch() {
        let fields = vec![
            Field::new("a1", DataType::Int32, false),
            Field::new_list("a2", Field::new("item", DataType::Int8, false), false),
        ];
        let schema = Arc::new(Schema::new(vec![Field::new_struct("a", fields, true)]));
        let a1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
        let a2_child = Int8Array::from(vec![1, 2, 3, 4]);
        let a2 = ArrayDataBuilder::new(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int8,
            false,
        ))))
        .add_child_data(a2_child.into_data())
        .len(2)
        .add_buffer(Buffer::from([0i32, 3, 4].to_byte_slice()))
        .build()
        .unwrap();
        let a2: ArrayRef = Arc::new(ListArray::from(a2));
        let a = ArrayDataBuilder::new(DataType::Struct(Fields::from(vec![
            Field::new("aa1", DataType::Int32, false),
            Field::new("a2", a2.data_type().clone(), false),
        ])))
        .add_child_data(a1.into_data())
        .add_child_data(a2.into_data())
        .len(2)
        .build()
        .unwrap();
        let a: ArrayRef = Arc::new(StructArray::from(a));
        let batch = RecordBatch::try_new(schema.clone(), vec![a.clone()]);
        assert!(batch.is_err());
        let options = RecordBatchOptions {
            match_field_names: false,
            row_count: None,
        };
        let batch = RecordBatch::try_new_with_options(schema, vec![a], &options);
        assert!(batch.is_ok());
    }
    #[test]
    fn create_record_batch_record_mismatch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]);
        assert!(batch.is_err());
    }
    // --- StructArray conversion ---
    #[test]
    fn create_record_batch_from_struct_array() {
        let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
        let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("b", DataType::Boolean, false)),
                boolean.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("c", DataType::Int32, false)),
                int.clone() as ArrayRef,
            ),
        ]);
        let batch = RecordBatch::from(&struct_array);
        assert_eq!(2, batch.num_columns());
        assert_eq!(4, batch.num_rows());
        assert_eq!(
            struct_array.data_type(),
            &DataType::Struct(batch.schema().fields().clone())
        );
        assert_eq!(batch.column(0).as_ref(), boolean.as_ref());
        assert_eq!(batch.column(1).as_ref(), int.as_ref());
    }
    // --- PartialEq semantics ---
    #[test]
    fn record_batch_equality() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();
        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
        )
        .unwrap();
        assert_eq!(batch1, batch2);
    }
    #[test]
    fn record_batch_index_access() {
        let id_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let val_arr = Arc::new(Int32Array::from(vec![5, 6, 7, 8]));
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema1), vec![id_arr.clone(), val_arr.clone()]).unwrap();
        assert_eq!(record_batch["id"].as_ref(), id_arr.as_ref());
        assert_eq!(record_batch["val"].as_ref(), val_arr.as_ref());
    }
    #[test]
    fn record_batch_vals_ne() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();
        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
        )
        .unwrap();
        assert_ne!(batch1, batch2);
    }
    #[test]
    fn record_batch_column_names_ne() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("num", DataType::Int32, false),
        ]);
        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();
        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
        )
        .unwrap();
        assert_ne!(batch1, batch2);
    }
    #[test]
    fn record_batch_column_number_ne() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let num_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
            Field::new("num", DataType::Int32, false),
        ]);
        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();
        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2), Arc::new(num_arr2)],
        )
        .unwrap();
        assert_ne!(batch1, batch2);
    }
    #[test]
    fn record_batch_row_count_ne() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("num", DataType::Int32, false),
        ]);
        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();
        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
        )
        .unwrap();
        assert_ne!(batch1, batch2);
    }
    // --- Projection ---
    #[test]
    fn project() {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"]));
        let record_batch =
            RecordBatch::try_from_iter(vec![("a", a.clone()), ("b", b.clone()), ("c", c.clone())])
                .expect("valid conversion");
        let expected =
            RecordBatch::try_from_iter(vec![("a", a), ("c", c)]).expect("valid conversion");
        assert_eq!(expected, record_batch.project(&[0, 2]).unwrap());
    }
    // Projecting to zero columns keeps the row count.
    #[test]
    fn project_empty() {
        let c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"]));
        let record_batch =
            RecordBatch::try_from_iter(vec![("c", c.clone())]).expect("valid conversion");
        let expected = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions {
                match_field_names: true,
                row_count: Some(3),
            },
        )
        .expect("valid conversion");
        assert_eq!(expected, record_batch.project(&[]).unwrap());
    }
    // Zero-column batches require an explicit row count.
    #[test]
    fn test_no_column_record_batch() {
        let schema = Arc::new(Schema::empty());
        let err = RecordBatch::try_new(schema.clone(), vec![]).unwrap_err();
        assert!(err
            .to_string()
            .contains("must either specify a row count or at least one column"));
        let options = RecordBatchOptions::new().with_row_count(Some(10));
        let ok = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
        assert_eq!(ok.num_rows(), 10);
        let a = ok.slice(2, 5);
        assert_eq!(a.num_rows(), 5);
        let b = ok.slice(5, 0);
        assert_eq!(b.num_rows(), 0);
        assert_ne!(a, b);
        assert_eq!(b, RecordBatch::new_empty(schema))
    }
    #[test]
    fn test_nulls_in_non_nullable_field() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let maybe_batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int32Array::from(vec![Some(1), None]))],
        );
        assert_eq!("Invalid argument error: Column 'a' is declared as non-nullable but contains null values", format!("{}", maybe_batch.err().unwrap()));
    }
    #[test]
    fn test_record_batch_options() {
        let options = RecordBatchOptions::new()
            .with_match_field_names(false)
            .with_row_count(Some(20));
        assert!(!options.match_field_names);
        assert_eq!(options.row_count.unwrap(), 20)
    }
    // A StructArray with top-level nulls cannot be converted.
    #[test]
    #[should_panic(expected = "Cannot convert nullable StructArray to RecordBatch")]
    fn test_from_struct() {
        let s = StructArray::from(ArrayData::new_null(
            &DataType::Struct(vec![Field::new("foo", DataType::Int32, false)].into()),
            2,
        ));
        let _ = RecordBatch::from(s);
    }
    // with_schema succeeds only when the target schema is a superset.
    #[test]
    fn test_with_schema() {
        let required_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let required_schema = Arc::new(required_schema);
        let nullable_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let nullable_schema = Arc::new(nullable_schema);
        let batch = RecordBatch::try_new(
            required_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as _],
        )
        .unwrap();
        let batch = batch.with_schema(nullable_schema.clone()).unwrap();
        batch.clone().with_schema(required_schema).unwrap_err();
        let metadata = vec![("foo".to_string(), "bar".to_string())]
            .into_iter()
            .collect();
        let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata);
        let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap();
        batch.with_schema(nullable_schema).unwrap_err();
    }
    #[test]
    fn test_boxed_reader() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let schema = Arc::new(schema);
        let reader = RecordBatchIterator::new(std::iter::empty(), schema);
        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
        fn get_size(reader: impl RecordBatchReader) -> usize {
            reader.size_hint().0
        }
        let size = get_size(reader);
        assert_eq!(size, 0);
    }
    #[test]
    fn test_remove_column_maintains_schema_metadata() {
        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let bool_array = BooleanArray::from(vec![true, false, false, true, true]);
        let mut metadata = HashMap::new();
        metadata.insert("foo".to_string(), "bar".to_string());
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("bool", DataType::Boolean, false),
        ])
        .with_metadata(metadata);
        let mut batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(id_array), Arc::new(bool_array)],
        )
        .unwrap();
        let _removed_column = batch.remove_column(0);
        assert_eq!(batch.schema().metadata().len(), 1);
        assert_eq!(
            batch.schema().metadata().get("foo").unwrap().as_str(),
            "bar"
        );
    }
}