vortex_array/arrow/
record_batch.rs1use arrow_array::RecordBatch;
5use arrow_array::cast::AsArray;
6use arrow_schema::DataType;
7use arrow_schema::Schema;
8use vortex_error::VortexError;
9use vortex_error::VortexResult;
10use vortex_error::vortex_bail;
11use vortex_error::vortex_ensure;
12
13use crate::Array;
14use crate::Canonical;
15use crate::LEGACY_SESSION;
16use crate::VortexSessionExecute;
17use crate::array::IntoArray;
18use crate::arrays::StructArray;
19use crate::arrow::ArrowArrayExecutor;
20
21impl TryFrom<&dyn Array> for RecordBatch {
22 type Error = VortexError;
23
24 fn try_from(value: &dyn Array) -> VortexResult<Self> {
25 let Canonical::Struct(struct_array) = value.to_canonical()? else {
26 vortex_bail!("RecordBatch can only be constructed from ")
27 };
28
29 vortex_ensure!(
30 struct_array.all_valid()?,
31 "RecordBatch can only be constructed from StructArray with no nulls"
32 );
33
34 let data_type = struct_array.dtype().to_arrow_dtype()?;
35 let array_ref = struct_array
36 .into_array()
37 .execute_arrow(Some(&data_type), &mut LEGACY_SESSION.create_execution_ctx())?;
38 Ok(RecordBatch::from(array_ref.as_struct()))
39 }
40}
41
42impl StructArray {
43 pub fn into_record_batch_with_schema(
44 self,
45 schema: impl AsRef<Schema>,
46 ) -> VortexResult<RecordBatch> {
47 let data_type = DataType::Struct(schema.as_ref().fields.clone());
48 let array_ref = self
49 .to_array()
50 .execute_arrow(Some(&data_type), &mut LEGACY_SESSION.create_execution_ctx())?;
51 Ok(RecordBatch::from(array_ref.as_struct()))
52 }
53}
54
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::FieldRef;
    use arrow_schema::Schema;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;

    use crate::arrays::StructArray;
    use crate::builders::ArrayBuilder;
    use crate::builders::ListBuilder;
    use crate::scalar::Scalar;

    /// Converting with an explicit schema must honour the requested Arrow encoding
    /// (`LargeListView`) and keep every row.
    #[test]
    fn test_into_rb_with_schema() {
        let mut xs = ListBuilder::<u32>::new(
            Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)),
            Nullability::Nullable,
        );

        xs.append_scalar(&Scalar::list(
            xs.element_dtype().clone(),
            vec![1i32.into(), 2i32.into(), 3i32.into()],
            Nullability::Nullable,
        ))
        .unwrap();
        xs.append_null();
        xs.append_zero();

        let xs = xs.finish();

        let array = StructArray::from_fields(&[("xs", xs)]).unwrap();

        let expected_type =
            DataType::LargeListView(FieldRef::new(Field::new_list_field(DataType::Int32, false)));
        let arrow_schema = Arc::new(Schema::new(vec![Field::new(
            "xs",
            expected_type.clone(),
            true,
        )]));
        let rb = array.into_record_batch_with_schema(arrow_schema).unwrap();

        // Three values were appended (a list, a null, and a zero value); all must survive.
        assert_eq!(rb.num_rows(), 3);

        let xs = rb.column(0);
        assert_eq!(xs.len(), 3);
        assert_eq!(xs.data_type(), &expected_type);
    }
}