// pyo3_arrow/table.rs

use std::fmt::Display;
use std::sync::Arc;

use arrow_array::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader;
use arrow_array::{ArrayRef, RecordBatchReader, StructArray};
use arrow_array::{RecordBatch, RecordBatchIterator};
use arrow_cast::pretty::pretty_format_batches_with_options;
use arrow_schema::{ArrowError, Field, Schema, SchemaRef};
use arrow_select::concat::concat_batches;
use indexmap::IndexMap;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::{PyArrowError, PyArrowResult};
use crate::export::{
    Arro3ChunkedArray, Arro3Field, Arro3RecordBatch, Arro3RecordBatchReader, Arro3Schema,
    Arro3Table,
};
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
use crate::ffi::to_python::to_stream_pycapsule;
use crate::input::{
    AnyArray, AnyRecordBatch, FieldIndexInput, MetadataInput, NameOrField, SelectIndices,
};
use crate::utils::{default_repr_options, schema_equals};
use crate::{PyChunkedArray, PyField, PyRecordBatch, PyRecordBatchReader, PySchema};

/// A Python-facing Arrow table.
///
/// This is a wrapper around a [SchemaRef] and a `Vec` of [RecordBatch].
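///
/// Constructing one directly in Rust (a minimal sketch; `batch` stands in for
/// any `RecordBatch` you already have):
///
/// ```ignore
/// let schema = batch.schema();
/// let table = PyTable::try_new(vec![batch], schema)?;
/// ```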
#[pyclass(module = "arro3.core._core", name = "Table", subclass, frozen)]
#[derive(Debug)]
pub struct PyTable {
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
}

impl PyTable {
    /// Create a new table from batches and a schema.
    pub fn try_new(batches: Vec<RecordBatch>, schema: SchemaRef) -> PyResult<Self> {
        if !batches
            .iter()
            .all(|rb| schema_equals(rb.schema_ref(), &schema))
        {
            return Err(PyTypeError::new_err("All batches must have the same schema"));
        }

        Ok(Self { schema, batches })
    }

    /// Construct from a raw Arrow C Stream capsule.
    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
        let stream = import_stream_pycapsule(capsule)?;
        let stream_reader = ArrowRecordBatchStreamReader::try_new(stream)
            .map_err(|err| PyValueError::new_err(err.to_string()))?;
        let schema = stream_reader.schema();

        let mut batches = vec![];
        for batch in stream_reader {
            let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?;
            batches.push(batch);
        }

        Self::try_new(batches, schema)
    }

    /// Access the underlying batches.
    pub fn batches(&self) -> &[RecordBatch] {
        &self.batches
    }

    /// Consume this and return its internal batches and schema.
    pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
        (self.batches, self.schema)
    }

    /// Export this to a Python `arro3.core.Table`.
    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
            intern!(py, "from_arrow_pycapsule"),
            PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
        )
    }

    /// Export this to a Python `arro3.core.Table`, consuming self.
    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        let capsule = Self::to_stream_pycapsule(py, self.batches, self.schema, None)?;
        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
            intern!(py, "from_arrow_pycapsule"),
            PyTuple::new(py, vec![capsule])?,
        )
    }

    /// Export this to a Python `nanoarrow.ArrayStream`.
    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
    }

    /// Export to a `pyarrow.Table`.
    ///
    /// Requires pyarrow >=14.
    pub fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
        let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
        pyarrow_mod
            .getattr(intern!(py, "table"))?
            .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)
    }

    pub(crate) fn to_stream_pycapsule<'py>(
        py: Python<'py>,
        batches: Vec<RecordBatch>,
        schema: SchemaRef,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
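        // The Arrow C stream protocol carries a sequence of arrays, so each
        // RecordBatch is exported as a StructArray whose struct field mirrors
        // the table's schema (fields and metadata).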
        let fields = schema.fields();
        let array_reader = batches.into_iter().map(|batch| {
            let arr: ArrayRef = Arc::new(StructArray::from(batch));
            Ok(arr)
        });
        let array_reader = Box::new(ArrayIterator::new(
            array_reader,
            Field::new_struct("", fields.clone(), false)
                .with_metadata(schema.metadata.clone())
                .into(),
        ));
        to_stream_pycapsule(py, array_reader, requested_schema)
    }

    pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
        let total_chunk_length = chunk_lengths.iter().sum::<usize>();
        if total_chunk_length != self.num_rows() {
            return Err(
                PyValueError::new_err("Chunk lengths do not add up to table length").into(),
            );
        }

        // If the desired rechunking is the existing chunking, return early
        let matches_existing_chunking = chunk_lengths.len() == self.batches.len()
            && chunk_lengths
                .iter()
                .zip(self.batches())
                .all(|(length, batch)| *length == batch.num_rows());
        if matches_existing_chunking {
            return Ok(Self::try_new(self.batches.clone(), self.schema.clone())?);
        }

        let mut offset = 0;
        let batches = chunk_lengths
            .iter()
            .map(|chunk_length| {
                let sliced_table = self.slice(offset, *chunk_length)?;
                let sliced_concatted = concat_batches(&self.schema, sliced_table.batches.iter())?;
                offset += chunk_length;
                Ok(sliced_concatted)
            })
            .collect::<PyArrowResult<Vec<_>>>()?;

        Ok(Self::try_new(batches, self.schema.clone())?)
    }

    pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
        if offset + length > self.num_rows() {
            return Err(PyValueError::new_err(
                "offset + length may not exceed the number of rows in the table",
            )
            .into());
        }

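        // Walk the chunks: skip whole chunks while the offset lies beyond
        // them, then take up to `length` rows from the remaining chunks.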
        let mut sliced_batches: Vec<RecordBatch> = vec![];
        for chunk in self.batches() {
            if chunk.num_rows() == 0 {
                continue;
            }

            // If the offset is greater than the len of this chunk, don't include any rows from
            // this chunk
            if offset >= chunk.num_rows() {
                offset -= chunk.num_rows();
                continue;
            }

            let take_count = length.min(chunk.num_rows() - offset);
            let sliced_chunk = chunk.slice(offset, take_count);
            sliced_batches.push(sliced_chunk);

            length -= take_count;

            // If we've selected all rows, exit
            if length == 0 {
                break;
            } else {
                offset = 0;
            }
        }

        Ok(Self::try_new(sliced_batches, self.schema.clone())?)
    }
}

impl Display for PyTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
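        // Print the type name, then at most the first 10 rows, concatenated
        // into a single batch for pretty formatting.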
        writeln!(f, "arro3.core.Table")?;
        let head_table = self
            .slice(0, 10.min(self.num_rows()))
            .map_err(|_| std::fmt::Error)?
            .combine_chunks()
            .map_err(|_| std::fmt::Error)?;

        pretty_format_batches_with_options(
            &head_table.into_inner().batches,
            &default_repr_options(),
        )
        .map_err(|_| std::fmt::Error)?
        .fmt(f)?;

        Ok(())
    }
}

#[pymethods]
impl PyTable {
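    /// Create a Table from Arrow-protocol input, a dict of arrays, or a
    /// sequence of arrays, e.g. `Table({"a": a, "b": b})` from Python, where
    /// `a` and `b` are existing Arrow arrays.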
    #[new]
    #[pyo3(signature = (data, *, names=None, schema=None, metadata=None))]
    fn new(
        py: Python,
        data: &Bound<PyAny>,
        names: Option<Vec<String>>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        if data.hasattr(intern!(py, "__arrow_c_array__"))?
            || data.hasattr(intern!(py, "__arrow_c_stream__"))?
        {
            Ok(data.extract::<AnyRecordBatch>()?.into_table()?)
        } else if let Ok(mapping) = data.extract::<IndexMap<String, AnyArray>>() {
            Self::from_pydict(&py.get_type::<PyTable>(), mapping, schema, metadata)
        } else if let Ok(arrays) = data.extract::<Vec<AnyArray>>() {
            Self::from_arrays(&py.get_type::<PyTable>(), arrays, names, schema, metadata)
        } else {
            Err(PyTypeError::new_err(
                "Expected Table-like input, a dict of arrays, or a sequence of arrays.",
            )
            .into())
        }
    }

    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
        to_schema_pycapsule(py, self.schema.as_ref())
    }

    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__<'py>(
        &'py self,
        py: Python<'py>,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
        Self::to_stream_pycapsule(
            py,
            self.batches.clone(),
            self.schema.clone(),
            requested_schema,
        )
    }

    fn __eq__(&self, other: &PyTable) -> bool {
        self.batches == other.batches && self.schema == other.schema
    }

    fn __getitem__(&self, key: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
        self.column(key)
    }

    fn __len__(&self) -> usize {
        self.batches.iter().fold(0, |acc, x| acc + x.num_rows())
    }

    fn __repr__(&self) -> String {
        self.to_string()
    }

    #[classmethod]
    fn from_arrow(_cls: &Bound<PyType>, input: AnyRecordBatch) -> PyArrowResult<Self> {
        input.into_table()
    }

    #[classmethod]
    #[pyo3(name = "from_arrow_pycapsule")]
    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
        Self::from_arrow_pycapsule(capsule)
    }

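    /// Construct a Table from a sequence of RecordBatches.
    ///
    /// If `batches` is empty, `schema` must be provided, since there is
    /// nothing to infer it from.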
    #[classmethod]
    #[pyo3(signature = (batches, *, schema=None))]
    fn from_batches(
        _cls: &Bound<PyType>,
        batches: Vec<PyRecordBatch>,
        schema: Option<PySchema>,
    ) -> PyArrowResult<Self> {
        if batches.is_empty() {
            let schema = schema.ok_or(PyValueError::new_err(
                "schema must be passed for an empty list of batches",
            ))?;
            return Ok(Self::try_new(vec![], schema.into_inner())?);
        }

        let batches = batches
            .into_iter()
            .map(|batch| batch.into_inner())
            .collect::<Vec<_>>();
        let schema = schema
            .map(|s| s.into_inner())
            .unwrap_or_else(|| batches.first().unwrap().schema());
        Ok(Self::try_new(batches, schema)?)
    }

    #[classmethod]
    #[pyo3(signature = (mapping, *, schema=None, metadata=None))]
    fn from_pydict(
        cls: &Bound<PyType>,
        mapping: IndexMap<String, AnyArray>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        let (names, arrays): (Vec<_>, Vec<_>) = mapping.into_iter().unzip();
        Self::from_arrays(cls, arrays, Some(names), schema, metadata)
    }

    #[classmethod]
    #[pyo3(signature = (arrays, *, names=None, schema=None, metadata=None))]
    fn from_arrays(
        _cls: &Bound<PyType>,
        arrays: Vec<AnyArray>,
        names: Option<Vec<String>>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        let columns = arrays
            .into_iter()
            .map(|array| array.into_chunked_array())
            .collect::<PyArrowResult<Vec<_>>>()?;

        let schema: SchemaRef = if let Some(schema) = schema {
            schema.into_inner()
        } else {
            let names = names.ok_or(PyValueError::new_err(
                "names must be passed if schema is not passed.",
            ))?;

            let fields = columns
                .iter()
                .zip(names.iter())
                .map(|(array, name)| Arc::new(array.field().as_ref().clone().with_name(name)))
                .collect::<Vec<_>>();
            Arc::new(
                Schema::new(fields)
                    .with_metadata(metadata.unwrap_or_default().into_string_hashmap()?),
            )
        };

        if columns.is_empty() {
            return Ok(Self::try_new(vec![], schema)?);
        }

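        // Every column must have an identical chunk layout so that the k-th
        // chunk of each column can be zipped into the k-th RecordBatch.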
        let column_chunk_lengths = columns
            .iter()
            .map(|column| {
                column
                    .chunks()
                    .iter()
                    .map(|chunk| chunk.len())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();
        if !column_chunk_lengths.windows(2).all(|w| w[0] == w[1]) {
            return Err(
                PyValueError::new_err("All columns must have the same chunk lengths").into(),
            );
        }
        let num_batches = column_chunk_lengths[0].len();

        let mut batches = vec![];
        for batch_idx in 0..num_batches {
            let batch = RecordBatch::try_new(
                schema.clone(),
                columns
                    .iter()
                    .map(|column| column.chunks()[batch_idx].clone())
                    .collect(),
            )?;
            batches.push(batch);
        }

        Ok(Self::try_new(batches, schema)?)
    }

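    /// Insert `column` at position `i`, returning a new table.
    ///
    /// The column is first rechunked to match the table's existing chunk
    /// lengths so that it can be zipped batch-by-batch.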
    fn add_column(
        &self,
        i: usize,
        field: NameOrField,
        column: PyChunkedArray,
    ) -> PyArrowResult<Arro3Table> {
        if self.num_rows() != column.len() {
            return Err(
                PyValueError::new_err("Number of rows in column does not match table.").into(),
            );
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields.insert(i, field.into_field(column.field()));
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns.insert(i, array.clone());
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

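    /// Append `column` to the end of the table, returning a new table.
    ///
    /// The column is rechunked to match the table's existing chunk lengths.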
    fn append_column(
        &self,
        field: NameOrField,
        column: PyChunkedArray,
    ) -> PyArrowResult<Arro3Table> {
        if self.num_rows() != column.len() {
            return Err(
                PyValueError::new_err("Number of rows in column does not match table.").into(),
            );
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields.push(field.into_field(column.field()));
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns.push(array.clone());
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    #[getter]
    fn chunk_lengths(&self) -> Vec<usize> {
        self.batches.iter().map(|batch| batch.num_rows()).collect()
    }

    fn column(&self, i: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
        let column_index = i.into_position(&self.schema)?;
        let field = self.schema.field(column_index).clone();
        let chunks = self
            .batches
            .iter()
            .map(|batch| batch.column(column_index).clone())
            .collect();
        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
    }

    #[getter]
    fn column_names(&self) -> Vec<String> {
        self.schema
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    #[getter]
    fn columns(&self) -> PyArrowResult<Vec<Arro3ChunkedArray>> {
        (0..self.num_columns())
            .map(|i| self.column(FieldIndexInput::Position(i)))
            .collect()
    }

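    /// Concatenate all batches into a single batch.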
    fn combine_chunks(&self) -> PyArrowResult<Arro3Table> {
        let batch = concat_batches(&self.schema, &self.batches)?;
        Ok(PyTable::try_new(vec![batch], self.schema.clone())?.into())
    }

    fn field(&self, i: FieldIndexInput) -> PyArrowResult<Arro3Field> {
        let field = self.schema.field(i.into_position(&self.schema)?);
        Ok(PyField::new(field.clone().into()).into())
    }

    #[getter]
    fn nbytes(&self) -> usize {
        self.batches
            .iter()
            .fold(0, |acc, batch| acc + batch.get_array_memory_size())
    }

    #[getter]
    fn num_columns(&self) -> usize {
        self.schema.fields().len()
    }

    #[getter]
    fn num_rows(&self) -> usize {
        self.batches()
            .iter()
            .fold(0, |acc, batch| acc + batch.num_rows())
    }

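    /// Rechunk the table so that no chunk is longer than `max_chunksize`,
    /// e.g. `table.rechunk(max_chunksize=1024)` from Python.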
    #[pyo3(signature = (*, max_chunksize=None))]
    #[pyo3(name = "rechunk")]
    fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3Table> {
        let max_chunksize = max_chunksize.unwrap_or(self.num_rows());
        if max_chunksize == 0 {
            return Err(PyValueError::new_err("max_chunksize must be > 0").into());
        }

        let mut chunk_lengths = vec![];
        let mut offset = 0;
        while offset < self.num_rows() {
            let chunk_length = max_chunksize.min(self.num_rows() - offset);
            offset += chunk_length;
            chunk_lengths.push(chunk_length);
        }
        Ok(self.rechunk(chunk_lengths)?.into())
    }

    fn remove_column(&self, i: usize) -> PyArrowResult<Arro3Table> {
        let mut fields = self.schema.fields().to_vec();
        fields.remove(i);
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .map(|batch| {
                let mut columns = batch.columns().to_vec();
                columns.remove(i);
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    fn rename_columns(&self, names: Vec<String>) -> PyArrowResult<Arro3Table> {
        if names.len() != self.num_columns() {
            return Err(PyValueError::new_err(
                "When names is a list[str], must pass the same number of names as there are columns.",
            )
            .into());
        }

        let new_fields = self
            .schema
            .fields()
            .iter()
            .zip(names)
            .map(|(field, name)| field.as_ref().clone().with_name(name))
            .collect::<Vec<_>>();
        let new_schema = Arc::new(Schema::new_with_metadata(
            new_fields,
            self.schema.metadata().clone(),
        ));
        Ok(PyTable::try_new(self.batches.clone(), new_schema)?.into())
    }

    #[getter]
    fn schema(&self) -> Arro3Schema {
        PySchema::new(self.schema.clone()).into()
    }

    fn select(&self, columns: SelectIndices) -> PyArrowResult<Arro3Table> {
        let positions = columns.into_positions(self.schema.fields())?;

        let new_schema = Arc::new(self.schema.project(&positions)?);
        let new_batches = self
            .batches
            .iter()
            .map(|batch| batch.project(&positions))
            .collect::<Result<Vec<_>, ArrowError>>()?;
        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

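    /// Replace the column at position `i`, returning a new table.
    ///
    /// The column is rechunked to match the table's existing chunk lengths.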
    fn set_column(
        &self,
        i: usize,
        field: NameOrField,
        column: PyChunkedArray,
    ) -> PyArrowResult<Arro3Table> {
        if self.num_rows() != column.len() {
            return Err(
                PyValueError::new_err("Number of rows in column does not match table.").into(),
            );
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields[i] = field.into_field(column.field());
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns[i] = array.clone();
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    #[getter]
    fn shape(&self) -> (usize, usize) {
        (self.num_rows(), self.num_columns())
    }

    #[pyo3(signature = (offset=0, length=None))]
    #[pyo3(name = "slice")]
    fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3Table> {
        let length = length.unwrap_or_else(|| self.num_rows().saturating_sub(offset));
        Ok(self.slice(offset, length)?.into())
    }

    fn to_batches(&self) -> Vec<Arro3RecordBatch> {
        self.batches
            .iter()
            .map(|batch| PyRecordBatch::new(batch.clone()).into())
            .collect()
    }

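    /// Convert the table into a `RecordBatchReader` that yields its batches.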
    fn to_reader(&self) -> Arro3RecordBatchReader {
        let reader = Box::new(RecordBatchIterator::new(
            self.batches.clone().into_iter().map(Ok),
            self.schema.clone(),
        ));
        PyRecordBatchReader::new(reader).into()
    }

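    /// View the table as a chunked array, with each batch converted to a
    /// `StructArray` chunk.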
    fn to_struct_array(&self) -> PyArrowResult<Arro3ChunkedArray> {
        let chunks = self
            .batches
            .iter()
            .map(|batch| {
                let struct_array: StructArray = batch.clone().into();
                Arc::new(struct_array) as ArrayRef
            })
            .collect::<Vec<_>>();
        let field = Field::new_struct("", self.schema.fields().clone(), false)
            .with_metadata(self.schema.metadata.clone());
        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
    }

    fn with_schema(&self, schema: PySchema) -> PyArrowResult<Arro3Table> {
        let new_schema = schema.into_inner();
        let new_batches = self
            .batches
            .iter()
            .map(|batch| RecordBatch::try_new(new_schema.clone(), batch.columns().to_vec()))
            .collect::<Result<Vec<_>, ArrowError>>()?;
        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }
}