Skip to main content

pyo3_arrow/
table.rs

1use std::collections::HashMap;
2use std::fmt::Display;
3use std::sync::Arc;
4
5use arrow_array::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader;
6use arrow_array::{ArrayRef, RecordBatchReader, StructArray};
7use arrow_array::{RecordBatch, RecordBatchIterator};
8use arrow_cast::pretty::pretty_format_batches_with_options;
9use arrow_schema::{ArrowError, Field, Schema, SchemaRef};
10use arrow_select::concat::concat_batches;
11use indexmap::IndexMap;
12use pyo3::exceptions::{PyIndexError, PyKeyError, PyTypeError, PyValueError};
13use pyo3::intern;
14use pyo3::prelude::*;
15use pyo3::types::{PyCapsule, PyTuple, PyType};
16
17use crate::error::{PyArrowError, PyArrowResult};
18use crate::export::{
19    Arro3ChunkedArray, Arro3Field, Arro3RecordBatch, Arro3RecordBatchReader, Arro3Schema,
20    Arro3Table,
21};
22use crate::ffi::from_python::utils::import_stream_pycapsule;
23use crate::ffi::to_python::chunked::ArrayIterator;
24use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
25use crate::ffi::to_python::to_stream_pycapsule;
26use crate::ffi::to_schema_pycapsule;
27use crate::input::{
28    AnyArray, AnyRecordBatch, FieldIndexInput, MetadataInput, NameOrField, SelectIndices,
29};
30use crate::utils::{default_repr_options, schema_equals};
31use crate::{PyChunkedArray, PyField, PyRecordBatch, PyRecordBatchReader, PySchema};
32
/// A Python-facing Arrow table.
///
/// This is a wrapper around a [SchemaRef] and a `Vec` of [RecordBatch].
///
/// The type is registered with Python as `Table` in the `arro3.core._core` module and is
/// declared `subclass` and `frozen`.
#[pyclass(module = "arro3.core._core", name = "Table", subclass, frozen)]
#[derive(Debug)]
pub struct PyTable {
    // The record batches (chunks) holding the table's rows, in order.
    batches: Vec<RecordBatch>,
    // The schema that `PyTable::try_new` checks every batch against.
    schema: SchemaRef,
}
42
43impl PyTable {
44    /// Create a new table from batches and a schema.
45    pub fn try_new(batches: Vec<RecordBatch>, schema: SchemaRef) -> PyResult<Self> {
46        if !batches
47            .iter()
48            .all(|rb| schema_equals(rb.schema_ref(), &schema))
49        {
50            return Err(PyTypeError::new_err("All batches must have same schema"));
51        }
52
53        Ok(Self { schema, batches })
54    }
55
56    /// Construct from a raw Arrow C Stream capsule
57    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
58        let stream = import_stream_pycapsule(capsule)?;
59        let stream_reader = ArrowRecordBatchStreamReader::try_new(stream)
60            .map_err(|err| PyValueError::new_err(err.to_string()))?;
61        let schema = stream_reader.schema();
62
63        let mut batches = vec![];
64        for batch in stream_reader {
65            let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?;
66            batches.push(batch);
67        }
68
69        Self::try_new(batches, schema)
70    }
71
72    /// Access the underlying batches
73    pub fn batches(&self) -> &[RecordBatch] {
74        &self.batches
75    }
76
77    /// Consume this and return its internal batches and schema.
78    pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
79        (self.batches, self.schema)
80    }
81
82    /// Export this to a Python `arro3.core.Table`.
83    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
84        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
85        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
86            intern!(py, "from_arrow_pycapsule"),
87            PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
88        )
89    }
90
91    /// Export this to a Python `arro3.core.Table`.
92    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
93        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
94        let capsule =
95            Self::to_stream_pycapsule(py, self.batches.clone(), self.schema.clone(), None)?;
96        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
97            intern!(py, "from_arrow_pycapsule"),
98            PyTuple::new(py, vec![capsule])?,
99        )
100    }
101
102    /// Export this to a Python `nanoarrow.ArrayStream`.
103    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
104        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
105    }
106
107    /// Export to a pyarrow.Table
108    ///
109    /// Requires pyarrow >=14
110    pub fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
111        let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
112        pyarrow_mod
113            .getattr(intern!(py, "table"))?
114            .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)
115    }
116
117    pub(crate) fn to_stream_pycapsule<'py>(
118        py: Python<'py>,
119        batches: Vec<RecordBatch>,
120        schema: SchemaRef,
121        requested_schema: Option<Bound<'py, PyCapsule>>,
122    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
123        let field = schema.fields();
124        let array_reader = batches.into_iter().map(|batch| {
125            let arr: ArrayRef = Arc::new(StructArray::from(batch));
126            Ok(arr)
127        });
128        let array_reader = Box::new(ArrayIterator::new(
129            array_reader,
130            Field::new_struct("", field.clone(), false)
131                .with_metadata(schema.metadata.clone())
132                .into(),
133        ));
134        to_stream_pycapsule(py, array_reader, requested_schema)
135    }
136
137    pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
138        let total_chunk_length = chunk_lengths.iter().sum::<usize>();
139        if total_chunk_length != self.num_rows() {
140            return Err(PyValueError::new_err(format!(
141                "Chunk lengths ({total_chunk_length})\
142                 do not add up to table length ({})",
143                self.num_rows()
144            ))
145            .into());
146        }
147
148        // If the desired rechunking is the existing chunking, return early
149        let matches_existing_chunking = chunk_lengths
150            .iter()
151            .zip(self.batches())
152            .all(|(length, batch)| *length == batch.num_rows());
153        if matches_existing_chunking {
154            return Ok(Self::try_new(self.batches.clone(), self.schema.clone())?);
155        }
156
157        let mut offset = 0;
158        let batches = chunk_lengths
159            .iter()
160            .map(|chunk_length| {
161                let sliced_table = self.slice(offset, *chunk_length)?;
162                let sliced_concatted = concat_batches(&self.schema, sliced_table.batches.iter())?;
163                offset += chunk_length;
164                Ok(sliced_concatted)
165            })
166            .collect::<PyArrowResult<Vec<_>>>()?;
167
168        Ok(Self::try_new(batches, self.schema.clone())?)
169    }
170
171    pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
172        if offset + length > self.num_rows() {
173            return Err(PyValueError::new_err(format!(
174                "offset + length ({}) may not exceed length of array ({})",
175                offset + length,
176                self.num_rows()
177            ))
178            .into());
179        }
180
181        let mut sliced_batches: Vec<RecordBatch> = vec![];
182        for chunk in self.batches() {
183            if chunk.num_rows() == 0 {
184                continue;
185            }
186
187            // If the offset is greater than the len of this chunk, don't include any rows from
188            // this chunk
189            if offset >= chunk.num_rows() {
190                offset -= chunk.num_rows();
191                continue;
192            }
193
194            let take_count = length.min(chunk.num_rows() - offset);
195            let sliced_chunk = chunk.slice(offset, take_count);
196            sliced_batches.push(sliced_chunk);
197
198            length -= take_count;
199
200            // If we've selected all rows, exit
201            if length == 0 {
202                break;
203            } else {
204                offset = 0;
205            }
206        }
207
208        Ok(Self::try_new(sliced_batches, self.schema.clone())?)
209    }
210}
211
212impl Display for PyTable {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        writeln!(f, "arro3.core.Table")?;
215        let head_table = self
216            .slice(0, 10.min(self.num_rows()))
217            .map_err(|_| std::fmt::Error)?
218            .combine_chunks()
219            .map_err(|_| std::fmt::Error)?;
220
221        pretty_format_batches_with_options(
222            &head_table.into_inner().batches,
223            &default_repr_options(),
224        )
225        .map_err(|_| std::fmt::Error)?
226        .fmt(f)?;
227
228        Ok(())
229    }
230}
231#[pymethods]
232impl PyTable {
233    #[new]
234    #[pyo3(signature = (data, *, names=None, schema=None, metadata=None))]
235    fn new(
236        py: Python,
237        data: &Bound<PyAny>,
238        names: Option<Vec<String>>,
239        schema: Option<PySchema>,
240        metadata: Option<MetadataInput>,
241    ) -> PyArrowResult<Self> {
242        if data.hasattr(intern!(py, "__arrow_c_array__"))?
243            || data.hasattr(intern!(py, "__arrow_c_stream__"))?
244        {
245            Ok(data.extract::<AnyRecordBatch>()?.into_table()?)
246        } else if let Ok(mapping) = data.extract::<IndexMap<String, AnyArray>>() {
247            Self::from_pydict(&py.get_type::<Self>(), mapping, schema, metadata)
248        } else if let Ok(arrays) = data.extract::<Vec<AnyArray>>() {
249            Self::from_arrays(&py.get_type::<Self>(), arrays, names, schema, metadata)
250        } else {
251            Err(PyTypeError::new_err(
252                "Expected Table-like input or dict of arrays or sequence of arrays.",
253            )
254            .into())
255        }
256    }
257
258    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
259        to_schema_pycapsule(py, self.schema.as_ref())
260    }
261
262    #[pyo3(signature = (requested_schema=None))]
263    fn __arrow_c_stream__<'py>(
264        &'py self,
265        py: Python<'py>,
266        requested_schema: Option<Bound<'py, PyCapsule>>,
267    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
268        Self::to_stream_pycapsule(
269            py,
270            self.batches.clone(),
271            self.schema.clone(),
272            requested_schema,
273        )
274    }
275
276    fn __eq__(&self, other: &PyTable) -> bool {
277        self.batches == other.batches && self.schema == other.schema
278    }
279
280    fn __getitem__(&self, key: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
281        self.column(key)
282    }
283
284    fn __len__(&self) -> usize {
285        self.batches.iter().fold(0, |acc, x| acc + x.num_rows())
286    }
287
288    fn __repr__(&self) -> String {
289        self.to_string()
290    }
291
292    #[classmethod]
293    fn from_arrow(_cls: &Bound<PyType>, input: AnyRecordBatch) -> PyArrowResult<Self> {
294        input.into_table()
295    }
296
297    #[classmethod]
298    #[pyo3(name = "from_arrow_pycapsule")]
299    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
300        Self::from_arrow_pycapsule(capsule)
301    }
302
303    #[classmethod]
304    #[pyo3(signature = (batches, *, schema=None))]
305    fn from_batches(
306        _cls: &Bound<PyType>,
307        batches: Vec<PyRecordBatch>,
308        schema: Option<PySchema>,
309    ) -> PyArrowResult<Self> {
310        if batches.is_empty() {
311            let schema = schema.ok_or(PyValueError::new_err(
312                "schema must be passed for an empty list of batches",
313            ))?;
314            return Ok(Self::try_new(vec![], schema.into_inner())?);
315        }
316
317        let batches = batches
318            .into_iter()
319            .map(|batch| batch.into_inner())
320            .collect::<Vec<_>>();
321        let schema = schema
322            .map(|s| s.into_inner())
323            .unwrap_or(batches.first().unwrap().schema());
324        Ok(Self::try_new(batches, schema)?)
325    }
326
327    #[classmethod]
328    #[pyo3(signature = (mapping, *, schema=None, metadata=None))]
329    fn from_pydict(
330        cls: &Bound<PyType>,
331        mapping: IndexMap<String, AnyArray>,
332        schema: Option<PySchema>,
333        metadata: Option<MetadataInput>,
334    ) -> PyArrowResult<Self> {
335        let (names, arrays): (Vec<_>, Vec<_>) = mapping.into_iter().unzip();
336        Self::from_arrays(cls, arrays, Some(names), schema, metadata)
337    }
338
339    #[classmethod]
340    #[pyo3(signature = (arrays, *, names=None, schema=None, metadata=None))]
341    fn from_arrays(
342        _cls: &Bound<PyType>,
343        arrays: Vec<AnyArray>,
344        names: Option<Vec<String>>,
345        schema: Option<PySchema>,
346        metadata: Option<MetadataInput>,
347    ) -> PyArrowResult<Self> {
348        if schema.is_some() && metadata.is_some() {
349            return Err(PyValueError::new_err("Cannot pass both schema and metadata").into());
350        }
351
352        let columns = arrays
353            .into_iter()
354            .map(|array| array.into_chunked_array())
355            .collect::<PyArrowResult<Vec<_>>>()?;
356
357        let schema: SchemaRef = if let Some(schema) = schema {
358            schema.into_inner()
359        } else {
360            let names = names.ok_or(PyValueError::new_err(
361                "names must be passed if schema is not passed.",
362            ))?;
363
364            let fields = columns
365                .iter()
366                .zip(names.iter())
367                .map(|(array, name)| Arc::new(array.field().as_ref().clone().with_name(name)))
368                .collect::<Vec<_>>();
369            Arc::new(
370                Schema::new(fields)
371                    .with_metadata(metadata.unwrap_or_default().into_string_hashmap().unwrap()),
372            )
373        };
374
375        if columns.is_empty() {
376            return Ok(Self::try_new(vec![], schema)?);
377        }
378
379        let column_chunk_lengths = columns
380            .iter()
381            .map(|column| {
382                let chunk_lengths = column
383                    .chunks()
384                    .iter()
385                    .map(|chunk| chunk.len())
386                    .collect::<Vec<_>>();
387                chunk_lengths
388            })
389            .collect::<Vec<_>>();
390        if !column_chunk_lengths.windows(2).all(|w| w[0] == w[1]) {
391            return Err(
392                PyValueError::new_err("All columns must have the same chunk lengths").into(),
393            );
394        }
395        let num_batches = column_chunk_lengths[0].len();
396
397        let mut batches = vec![];
398        for batch_idx in 0..num_batches {
399            let batch = RecordBatch::try_new(
400                schema.clone(),
401                columns
402                    .iter()
403                    .map(|column| column.chunks()[batch_idx].clone())
404                    .collect(),
405            )?;
406            batches.push(batch);
407        }
408
409        Ok(Self::try_new(batches, schema)?)
410    }
411
412    fn drop_columns(&self, columns: Vec<String>) -> PyArrowResult<Arro3Table> {
413        let current_columns = self.column_names();
414        let mut pos: HashMap<&str, usize> = HashMap::with_capacity(current_columns.len());
415        for (i, s) in current_columns.iter().enumerate() {
416            pos.insert(s.as_str(), i);
417        }
418
419        let drop_indices: Vec<usize> = columns
420            .iter()
421            .filter_map(|s| pos.get(s.as_str()).copied())
422            .collect();
423
424        if drop_indices.len() < columns.len() {
425            // We have columns that don't exist
426            let missing: Vec<&str> = columns
427                .iter()
428                .map(|s| s.as_str())
429                .filter(|s| !pos.contains_key(s))
430                .collect();
431            return Err(PyKeyError::new_err(format!("Column(s): {missing:?} not found")).into());
432        }
433
434        let keep_indices: Vec<usize> = (0..self.num_columns())
435            .filter(|i| !drop_indices.contains(i))
436            .collect();
437
438        self.select(SelectIndices::Positions(keep_indices))
439    }
440
441    fn add_column(
442        &self,
443        i: usize,
444        field: NameOrField,
445        column: AnyArray,
446    ) -> PyArrowResult<Arro3Table> {
447        let column = column.into_chunked_array()?;
448        if self.num_rows() != column.len() {
449            return Err(PyValueError::new_err(format!(
450                "The number of rows in column ({}) does not match the table ({}).",
451                column.len(),
452                self.num_rows()
453            ))
454            .into());
455        }
456
457        if i > self.num_columns() {
458            return Err(PyIndexError::new_err(format!(
459                "Column index out of range, index is {i} but should be <= {}",
460                self.num_columns()
461            ))
462            .into());
463        }
464
465        let column = column.rechunk(self.chunk_lengths())?;
466
467        let mut fields = self.schema.fields().to_vec();
468        fields.insert(i, field.into_field(column.field()));
469        let new_schema = Arc::new(Schema::new_with_metadata(
470            fields,
471            self.schema.metadata().clone(),
472        ));
473
474        let new_batches = self
475            .batches
476            .iter()
477            .zip(column.chunks())
478            .map(|(batch, array)| {
479                debug_assert_eq!(
480                    array.len(),
481                    batch.num_rows(),
482                    "Array and batch should have the same number of rows."
483                );
484
485                let mut columns = batch.columns().to_vec();
486                columns.insert(i, array.clone());
487                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
488            })
489            .collect::<Result<Vec<_>, PyArrowError>>()?;
490
491        Ok(PyTable::try_new(new_batches, new_schema)?.into())
492    }
493
494    fn append_column(&self, field: NameOrField, column: AnyArray) -> PyArrowResult<Arro3Table> {
495        let column: PyChunkedArray = column.into_chunked_array()?;
496
497        if self.num_rows() != column.len() {
498            return Err(PyValueError::new_err(format!(
499                "The number of rows in column ({}) does not match the table ({}).",
500                column.len(),
501                self.num_rows()
502            ))
503            .into());
504        }
505
506        let column = column.rechunk(self.chunk_lengths())?;
507
508        let mut fields = self.schema.fields().to_vec();
509        fields.push(field.into_field(column.field()));
510        let new_schema = Arc::new(Schema::new_with_metadata(
511            fields,
512            self.schema.metadata().clone(),
513        ));
514
515        let new_batches = self
516            .batches
517            .iter()
518            .zip(column.chunks())
519            .map(|(batch, array)| {
520                debug_assert_eq!(
521                    array.len(),
522                    batch.num_rows(),
523                    "Array and batch should have the same number of rows."
524                );
525
526                let mut columns = batch.columns().to_vec();
527                columns.push(array.clone());
528                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
529            })
530            .collect::<Result<Vec<_>, PyArrowError>>()?;
531
532        Ok(PyTable::try_new(new_batches, new_schema)?.into())
533    }
534
535    #[getter]
536    fn chunk_lengths(&self) -> Vec<usize> {
537        self.batches.iter().map(|batch| batch.num_rows()).collect()
538    }
539
540    fn column(&self, i: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
541        let column_index = i.into_position(&self.schema)?;
542        let field = self.schema.field(column_index).clone();
543        let chunks = self
544            .batches
545            .iter()
546            .map(|batch| batch.column(column_index).clone())
547            .collect();
548        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
549    }
550
551    #[getter]
552    fn column_names(&self) -> Vec<String> {
553        self.schema
554            .fields()
555            .iter()
556            .map(|f| f.name().clone())
557            .collect()
558    }
559
560    #[getter]
561    fn columns(&self) -> PyArrowResult<Vec<Arro3ChunkedArray>> {
562        (0..self.num_columns())
563            .map(|i| self.column(FieldIndexInput::Position(i)))
564            .collect()
565    }
566
567    fn combine_chunks(&self) -> PyArrowResult<Arro3Table> {
568        let batch = concat_batches(&self.schema, &self.batches)?;
569        Ok(PyTable::try_new(vec![batch], self.schema.clone())?.into())
570    }
571
572    fn field(&self, i: FieldIndexInput) -> PyArrowResult<Arro3Field> {
573        let field = self.schema.field(i.into_position(&self.schema)?);
574        Ok(PyField::new(field.clone().into()).into())
575    }
576
577    #[getter]
578    fn nbytes(&self) -> usize {
579        self.batches
580            .iter()
581            .fold(0, |acc, batch| acc + batch.get_array_memory_size())
582    }
583
584    #[getter]
585    fn num_columns(&self) -> usize {
586        self.schema.fields().len()
587    }
588
589    #[getter]
590    fn num_rows(&self) -> usize {
591        self.batches()
592            .iter()
593            .fold(0, |acc, batch| acc + batch.num_rows())
594    }
595
596    #[pyo3(signature = (*, max_chunksize=None))]
597    #[pyo3(name = "rechunk")]
598    fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3Table> {
599        let max_chunksize = max_chunksize.unwrap_or(self.num_rows());
600        if max_chunksize == 0 {
601            return Err(PyValueError::new_err("max_chunksize must be > 0").into());
602        }
603
604        let mut chunk_lengths = vec![];
605        let mut offset = 0;
606        while offset < self.num_rows() {
607            let chunk_length = max_chunksize.min(self.num_rows() - offset);
608            offset += chunk_length;
609            chunk_lengths.push(chunk_length);
610        }
611        Ok(self.rechunk(chunk_lengths)?.into())
612    }
613
614    fn remove_column(&self, i: usize) -> PyArrowResult<Arro3Table> {
615        if i >= self.num_columns() {
616            return Err(PyIndexError::new_err(format!("Invalid column index \"{i}\"")).into());
617        }
618
619        let mut fields = self.schema.fields().to_vec();
620        fields.remove(i);
621        let new_schema = Arc::new(Schema::new_with_metadata(
622            fields,
623            self.schema.metadata().clone(),
624        ));
625
626        let new_batches = self
627            .batches
628            .iter()
629            .map(|batch| {
630                let mut columns = batch.columns().to_vec();
631                columns.remove(i);
632                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
633            })
634            .collect::<Result<Vec<_>, PyArrowError>>()?;
635
636        Ok(PyTable::try_new(new_batches, new_schema)?.into())
637    }
638
639    fn rename_columns(&self, names: Vec<String>) -> PyArrowResult<Arro3Table> {
640        if names.len() != self.num_columns() {
641            return Err(PyValueError::new_err(format!(
642                "Expected {} names, got {}",
643                self.num_columns(),
644                names.len()
645            ))
646            .into());
647        }
648        let new_fields = self
649            .schema
650            .fields()
651            .iter()
652            .zip(names)
653            .map(|(field, name)| field.as_ref().clone().with_name(name))
654            .collect::<Vec<_>>();
655        let new_schema = Arc::new(Schema::new_with_metadata(
656            new_fields,
657            self.schema.metadata().clone(),
658        ));
659
660        let new_batches = self
661            .batches
662            .iter()
663            .map(|batch| {
664                RecordBatch::try_new(new_schema.clone(), batch.columns().to_vec()).unwrap()
665            })
666            .collect();
667
668        Ok(PyTable::try_new(new_batches, new_schema)?.into())
669    }
670
671    #[getter]
672    fn schema(&self) -> Arro3Schema {
673        PySchema::new(self.schema.clone()).into()
674    }
675
676    fn select(&self, columns: SelectIndices) -> PyArrowResult<Arro3Table> {
677        let positions = columns.into_positions(self.schema.fields())?;
678
679        let new_schema = Arc::new(self.schema.project(&positions)?);
680        let new_batches = self
681            .batches
682            .iter()
683            .map(|batch| batch.project(&positions))
684            .collect::<Result<Vec<_>, ArrowError>>()?;
685        Ok(PyTable::try_new(new_batches, new_schema)?.into())
686    }
687
688    fn set_column(
689        &self,
690        i: usize,
691        field: NameOrField,
692        column: AnyArray,
693    ) -> PyArrowResult<Arro3Table> {
694        let column = column.into_chunked_array()?;
695        if self.num_rows() != column.len() {
696            return Err(PyValueError::new_err(format!(
697                "The number of rows in column ({}) does not match the table ({}).",
698                column.len(),
699                self.num_rows()
700            ))
701            .into());
702        }
703
704        let column = column.rechunk(self.chunk_lengths())?;
705
706        let mut fields = self.schema.fields().to_vec();
707        fields[i] = field.into_field(column.field());
708        let new_schema = Arc::new(Schema::new_with_metadata(
709            fields,
710            self.schema.metadata().clone(),
711        ));
712
713        let new_batches = self
714            .batches
715            .iter()
716            .zip(column.chunks())
717            .map(|(batch, array)| {
718                debug_assert_eq!(
719                    array.len(),
720                    batch.num_rows(),
721                    "Array and batch should have same number of rows."
722                );
723
724                let mut columns = batch.columns().to_vec();
725                columns[i] = array.clone();
726                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
727            })
728            .collect::<Result<Vec<_>, PyArrowError>>()?;
729
730        Ok(PyTable::try_new(new_batches, new_schema)?.into())
731    }
732
733    #[getter]
734    fn shape(&self) -> (usize, usize) {
735        (self.num_rows(), self.num_columns())
736    }
737
738    #[pyo3(signature = (offset=0, length=None))]
739    #[pyo3(name = "slice")]
740    fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3Table> {
741        let length = length.unwrap_or_else(|| self.num_rows() - offset);
742        Ok(self.slice(offset, length)?.into())
743    }
744
745    fn to_batches(&self) -> Vec<Arro3RecordBatch> {
746        self.batches
747            .iter()
748            .map(|batch| PyRecordBatch::new(batch.clone()).into())
749            .collect()
750    }
751
752    fn to_reader(&self) -> Arro3RecordBatchReader {
753        let reader = Box::new(RecordBatchIterator::new(
754            self.batches.clone().into_iter().map(Ok),
755            self.schema.clone(),
756        ));
757        PyRecordBatchReader::new(reader).into()
758    }
759
760    fn to_struct_array(&self) -> PyArrowResult<Arro3ChunkedArray> {
761        let chunks = self
762            .batches
763            .iter()
764            .map(|batch| {
765                let struct_array: StructArray = batch.clone().into();
766                Arc::new(struct_array) as ArrayRef
767            })
768            .collect::<Vec<_>>();
769        let field = Field::new_struct("", self.schema.fields().clone(), false)
770            .with_metadata(self.schema.metadata.clone());
771        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
772    }
773
774    fn with_schema(&self, schema: PySchema) -> PyArrowResult<Arro3Table> {
775        let new_schema = schema.into_inner();
776        let new_batches = self
777            .batches
778            .iter()
779            .map(|batch| RecordBatch::try_new(new_schema.clone(), batch.columns().to_vec()))
780            .collect::<Result<Vec<_>, ArrowError>>()?;
781        Ok(PyTable::try_new(new_batches, new_schema)?.into())
782    }
783}