use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;

use arrow_array::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader;
use arrow_array::{ArrayRef, RecordBatchReader, StructArray};
use arrow_array::{RecordBatch, RecordBatchIterator};
use arrow_cast::pretty::pretty_format_batches_with_options;
use arrow_schema::{ArrowError, Field, Schema, SchemaRef};
use arrow_select::concat::concat_batches;
use indexmap::IndexMap;
use pyo3::exceptions::{PyIndexError, PyKeyError, PyTypeError, PyValueError};
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::{PyArrowError, PyArrowResult};
use crate::export::{
    Arro3ChunkedArray, Arro3Field, Arro3RecordBatch, Arro3RecordBatchReader, Arro3Schema,
    Arro3Table,
};
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
use crate::ffi::to_python::to_stream_pycapsule;
use crate::ffi::to_schema_pycapsule;
use crate::input::{
    AnyArray, AnyRecordBatch, FieldIndexInput, MetadataInput, NameOrField, SelectIndices,
};
use crate::utils::{default_repr_options, schema_equals};
use crate::{PyChunkedArray, PyField, PyRecordBatch, PyRecordBatchReader, PySchema};

#[pyclass(module = "arro3.core._core", name = "Table", subclass, frozen)]
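/// A Table: an ordered collection of [`RecordBatch`]es that all conform to a
/// single [`Schema`].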
#[derive(Debug)]
pub struct PyTable {
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
}

impl PyTable {
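    /// Create a new table from batches and a schema, validating that every
    /// batch conforms to that schema.
    ///
    /// A minimal construction sketch (illustrative only, hence `ignore`; it
    /// assumes some already-built `batch: RecordBatch`):
    ///
    /// ```ignore
    /// let schema = batch.schema();
    /// let table = PyTable::try_new(vec![batch], schema)?;
    /// assert_eq!(table.batches().len(), 1);
    /// ```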
    pub fn try_new(batches: Vec<RecordBatch>, schema: SchemaRef) -> PyResult<Self> {
        if !batches
            .iter()
            .all(|rb| schema_equals(rb.schema_ref(), &schema))
        {
            return Err(PyTypeError::new_err("All batches must have the same schema"));
        }

        Ok(Self { schema, batches })
    }

    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
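        // Import the Arrow C stream from the capsule and eagerly drain it
        // into record batches.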
        let stream = import_stream_pycapsule(capsule)?;
        let stream_reader = ArrowRecordBatchStreamReader::try_new(stream)
            .map_err(|err| PyValueError::new_err(err.to_string()))?;
        let schema = stream_reader.schema();

        let mut batches = vec![];
        for batch in stream_reader {
            let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?;
            batches.push(batch);
        }

        Self::try_new(batches, schema)
    }

    pub fn batches(&self) -> &[RecordBatch] {
        &self.batches
    }

    pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
        (self.batches, self.schema)
    }

    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
            intern!(py, "from_arrow_pycapsule"),
            PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
        )
    }

    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        // `self` is consumed, so the batches and schema can be moved rather than cloned.
        let capsule = Self::to_stream_pycapsule(py, self.batches, self.schema, None)?;
        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
            intern!(py, "from_arrow_pycapsule"),
            PyTuple::new(py, vec![capsule])?,
        )
    }

    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
    }

    pub fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
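        // `pyarrow.table` can consume any object exposing `__arrow_c_stream__`
        // (the Arrow PyCapsule interface), which this class implements.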
        let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
        pyarrow_mod
            .getattr(intern!(py, "table"))?
            .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)
    }

    pub(crate) fn to_stream_pycapsule<'py>(
        py: Python<'py>,
        batches: Vec<RecordBatch>,
        schema: SchemaRef,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
        let fields = schema.fields();
        let array_reader = batches.into_iter().map(|batch| {
            let arr: ArrayRef = Arc::new(StructArray::from(batch));
            Ok(arr)
        });
        let array_reader = Box::new(ArrayIterator::new(
            array_reader,
            Field::new_struct("", fields.clone(), false)
                .with_metadata(schema.metadata.clone())
                .into(),
        ));
        to_stream_pycapsule(py, array_reader, requested_schema)
    }

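    /// Reslice the table so that its batches have exactly `chunk_lengths`
    /// rows each (hypothetical sizes: rechunking a 6-row table with
    /// `vec![2, 4]` yields one 2-row and one 4-row batch). The lengths must
    /// sum to `num_rows()`, and the existing batches are reused when they
    /// already match.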
    pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
        let total_chunk_length = chunk_lengths.iter().sum::<usize>();
        if total_chunk_length != self.num_rows() {
            return Err(PyValueError::new_err(format!(
                "Chunk lengths ({total_chunk_length}) \
                 do not add up to table length ({})",
                self.num_rows()
            ))
            .into());
        }

        // Comparing lengths as well as zipped pairs guards against a shorter
        // `chunk_lengths` that merely matches a prefix of the batches.
        let matches_existing_chunking = chunk_lengths.len() == self.batches.len()
            && chunk_lengths
                .iter()
                .zip(self.batches())
                .all(|(length, batch)| *length == batch.num_rows());
        if matches_existing_chunking {
            return Ok(Self::try_new(self.batches.clone(), self.schema.clone())?);
        }

        let mut offset = 0;
        let batches = chunk_lengths
            .iter()
            .map(|chunk_length| {
                let sliced_table = self.slice(offset, *chunk_length)?;
                let sliced_concatted = concat_batches(&self.schema, sliced_table.batches.iter())?;
                offset += chunk_length;
                Ok(sliced_concatted)
            })
            .collect::<PyArrowResult<Vec<_>>>()?;

        Ok(Self::try_new(batches, self.schema.clone())?)
    }

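    /// Zero-copy slice of the table: batches wholly before `offset` are
    /// skipped, then `RecordBatch::slice` views are taken until `length` rows
    /// have been collected.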
    pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
        if offset + length > self.num_rows() {
            return Err(PyValueError::new_err(format!(
                "offset + length ({}) may not exceed length of array ({})",
                offset + length,
                self.num_rows()
            ))
            .into());
        }

        let mut sliced_batches: Vec<RecordBatch> = vec![];
        for chunk in self.batches() {
            if chunk.num_rows() == 0 {
                continue;
            }

            if offset >= chunk.num_rows() {
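                // This whole chunk lies before the slice start; consume it
                // from the remaining offset.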
                offset -= chunk.num_rows();
                continue;
            }

            let take_count = length.min(chunk.num_rows() - offset);
            let sliced_chunk = chunk.slice(offset, take_count);
            sliced_batches.push(sliced_chunk);

            length -= take_count;

            if length == 0 {
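                // All requested rows have been collected.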
                break;
            } else {
                offset = 0;
            }
        }

        Ok(Self::try_new(sliced_batches, self.schema.clone())?)
    }
}

impl Display for PyTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "arro3.core.Table")?;
        let head_table = self
            .slice(0, 10.min(self.num_rows()))
            .map_err(|_| std::fmt::Error)?
            .combine_chunks()
            .map_err(|_| std::fmt::Error)?;

        pretty_format_batches_with_options(
            &head_table.into_inner().batches,
            &default_repr_options(),
        )
        .map_err(|_| std::fmt::Error)?
        .fmt(f)?;

        Ok(())
    }
}

#[pymethods]
impl PyTable {
    #[new]
    #[pyo3(signature = (data, *, names=None, schema=None, metadata=None))]
    fn new(
        py: Python,
        data: &Bound<PyAny>,
        names: Option<Vec<String>>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
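        // Dispatch on the input type: Arrow PyCapsule protocol objects first,
        // then a mapping of name -> array, then a sequence of arrays.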
        if data.hasattr(intern!(py, "__arrow_c_array__"))?
            || data.hasattr(intern!(py, "__arrow_c_stream__"))?
        {
            Ok(data.extract::<AnyRecordBatch>()?.into_table()?)
        } else if let Ok(mapping) = data.extract::<IndexMap<String, AnyArray>>() {
            Self::from_pydict(&py.get_type::<Self>(), mapping, schema, metadata)
        } else if let Ok(arrays) = data.extract::<Vec<AnyArray>>() {
            Self::from_arrays(&py.get_type::<Self>(), arrays, names, schema, metadata)
        } else {
            Err(PyTypeError::new_err(
                "Expected a table-like input, a dict of arrays, or a sequence of arrays.",
            )
            .into())
        }
    }

    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
        to_schema_pycapsule(py, self.schema.as_ref())
    }

    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__<'py>(
        &'py self,
        py: Python<'py>,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
        Self::to_stream_pycapsule(
            py,
            self.batches.clone(),
            self.schema.clone(),
            requested_schema,
        )
    }

    fn __eq__(&self, other: &PyTable) -> bool {
        self.batches == other.batches && self.schema == other.schema
    }

    fn __getitem__(&self, key: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
        self.column(key)
    }

    fn __len__(&self) -> usize {
        self.batches.iter().fold(0, |acc, x| acc + x.num_rows())
    }

    fn __repr__(&self) -> String {
        self.to_string()
    }

    #[classmethod]
    fn from_arrow(_cls: &Bound<PyType>, input: AnyRecordBatch) -> PyArrowResult<Self> {
        input.into_table()
    }

    #[classmethod]
    #[pyo3(name = "from_arrow_pycapsule")]
    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
        Self::from_arrow_pycapsule(capsule)
    }

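    /// Build a table from a list of batches. When `schema` is omitted it is
    /// taken from the first batch, so an empty list requires an explicit
    /// schema.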
    #[classmethod]
    #[pyo3(signature = (batches, *, schema=None))]
    fn from_batches(
        _cls: &Bound<PyType>,
        batches: Vec<PyRecordBatch>,
        schema: Option<PySchema>,
    ) -> PyArrowResult<Self> {
        if batches.is_empty() {
            let schema = schema.ok_or(PyValueError::new_err(
                "schema must be passed for an empty list of batches",
            ))?;
            return Ok(Self::try_new(vec![], schema.into_inner())?);
        }

        let batches = batches
            .into_iter()
            .map(|batch| batch.into_inner())
            .collect::<Vec<_>>();
        let schema = schema
            .map(|s| s.into_inner())
            .unwrap_or_else(|| batches.first().unwrap().schema());
        Ok(Self::try_new(batches, schema)?)
    }

    #[classmethod]
    #[pyo3(signature = (mapping, *, schema=None, metadata=None))]
    fn from_pydict(
        cls: &Bound<PyType>,
        mapping: IndexMap<String, AnyArray>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        let (names, arrays): (Vec<_>, Vec<_>) = mapping.into_iter().unzip();
        Self::from_arrays(cls, arrays, Some(names), schema, metadata)
    }

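    /// Build a table column-wise from chunked arrays. All columns must share
    /// the same chunk layout so that chunk `i` of every column can be zipped
    /// into batch `i`.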
    #[classmethod]
    #[pyo3(signature = (arrays, *, names=None, schema=None, metadata=None))]
    fn from_arrays(
        _cls: &Bound<PyType>,
        arrays: Vec<AnyArray>,
        names: Option<Vec<String>>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        if schema.is_some() && metadata.is_some() {
            return Err(PyValueError::new_err("Cannot pass both schema and metadata").into());
        }

        let columns = arrays
            .into_iter()
            .map(|array| array.into_chunked_array())
            .collect::<PyArrowResult<Vec<_>>>()?;

        let schema: SchemaRef = if let Some(schema) = schema {
            schema.into_inner()
        } else {
            let names = names.ok_or(PyValueError::new_err(
                "names must be passed if schema is not passed.",
            ))?;

            let fields = columns
                .iter()
                .zip(names.iter())
                .map(|(array, name)| Arc::new(array.field().as_ref().clone().with_name(name)))
                .collect::<Vec<_>>();
            Arc::new(
                Schema::new(fields)
                    .with_metadata(metadata.unwrap_or_default().into_string_hashmap()?),
            )
        };

        if columns.is_empty() {
            return Ok(Self::try_new(vec![], schema)?);
        }

        let column_chunk_lengths = columns
            .iter()
            .map(|column| {
                column
                    .chunks()
                    .iter()
                    .map(|chunk| chunk.len())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();
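        // Zipping chunks into batches only works when every column has
        // identical chunk boundaries.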
        if !column_chunk_lengths.windows(2).all(|w| w[0] == w[1]) {
            return Err(
                PyValueError::new_err("All columns must have the same chunk lengths").into(),
            );
        }
        let num_batches = column_chunk_lengths[0].len();

        let mut batches = vec![];
        for batch_idx in 0..num_batches {
            let batch = RecordBatch::try_new(
                schema.clone(),
                columns
                    .iter()
                    .map(|column| column.chunks()[batch_idx].clone())
                    .collect(),
            )?;
            batches.push(batch);
        }

        Ok(Self::try_new(batches, schema)?)
    }

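    /// Drop the named columns, raising `KeyError` if any name is not present
    /// in the table.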
    fn drop_columns(&self, columns: Vec<String>) -> PyArrowResult<Arro3Table> {
        let current_columns = self.column_names();
        let mut pos: HashMap<&str, usize> = HashMap::with_capacity(current_columns.len());
        for (i, s) in current_columns.iter().enumerate() {
            pos.insert(s.as_str(), i);
        }

        let drop_indices: Vec<usize> = columns
            .iter()
            .filter_map(|s| pos.get(s.as_str()).copied())
            .collect();

        if drop_indices.len() < columns.len() {
            let missing: Vec<&str> = columns
                .iter()
                .map(|s| s.as_str())
                .filter(|s| !pos.contains_key(s))
                .collect();
            return Err(PyKeyError::new_err(format!("Column(s): {missing:?} not found")).into());
        }

        let keep_indices: Vec<usize> = (0..self.num_columns())
            .filter(|i| !drop_indices.contains(i))
            .collect();

        self.select(SelectIndices::Positions(keep_indices))
    }

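    /// Insert `column` at position `i`. The column is rechunked to the
    /// table's existing batch boundaries so it can be zipped in
    /// batch-by-batch.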
    fn add_column(
        &self,
        i: usize,
        field: NameOrField,
        column: AnyArray,
    ) -> PyArrowResult<Arro3Table> {
        let column = column.into_chunked_array()?;
        if self.num_rows() != column.len() {
            return Err(PyValueError::new_err(format!(
                "The number of rows in column ({}) does not match the table ({}).",
                column.len(),
                self.num_rows()
            ))
            .into());
        }

        if i > self.num_columns() {
            return Err(PyIndexError::new_err(format!(
                "Column index out of range, index is {i} but should be <= {}",
                self.num_columns()
            ))
            .into());
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields.insert(i, field.into_field(column.field()));
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have the same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns.insert(i, array.clone());
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    fn append_column(&self, field: NameOrField, column: AnyArray) -> PyArrowResult<Arro3Table> {
        let column: PyChunkedArray = column.into_chunked_array()?;

        if self.num_rows() != column.len() {
            return Err(PyValueError::new_err(format!(
                "The number of rows in column ({}) does not match the table ({}).",
                column.len(),
                self.num_rows()
            ))
            .into());
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields.push(field.into_field(column.field()));
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have the same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns.push(array.clone());
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    #[getter]
    fn chunk_lengths(&self) -> Vec<usize> {
        self.batches.iter().map(|batch| batch.num_rows()).collect()
    }

    fn column(&self, i: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
        let column_index = i.into_position(&self.schema)?;
        let field = self.schema.field(column_index).clone();
        let chunks = self
            .batches
            .iter()
            .map(|batch| batch.column(column_index).clone())
            .collect();
        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
    }

    #[getter]
    fn column_names(&self) -> Vec<String> {
        self.schema
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    #[getter]
    fn columns(&self) -> PyArrowResult<Vec<Arro3ChunkedArray>> {
        (0..self.num_columns())
            .map(|i| self.column(FieldIndexInput::Position(i)))
            .collect()
    }

    fn combine_chunks(&self) -> PyArrowResult<Arro3Table> {
        let batch = concat_batches(&self.schema, &self.batches)?;
        Ok(PyTable::try_new(vec![batch], self.schema.clone())?.into())
    }

    fn field(&self, i: FieldIndexInput) -> PyArrowResult<Arro3Field> {
        let field = self.schema.field(i.into_position(&self.schema)?);
        Ok(PyField::new(field.clone().into()).into())
    }

    #[getter]
    fn nbytes(&self) -> usize {
        self.batches
            .iter()
            .fold(0, |acc, batch| acc + batch.get_array_memory_size())
    }

    #[getter]
    fn num_columns(&self) -> usize {
        self.schema.fields().len()
    }

    #[getter]
    fn num_rows(&self) -> usize {
        self.batches()
            .iter()
            .fold(0, |acc, batch| acc + batch.num_rows())
    }

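    /// Split the table into chunks of at most `max_chunksize` rows; for
    /// example, 10 rows with `max_chunksize=4` produces chunk lengths
    /// `[4, 4, 2]`.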
    #[pyo3(signature = (*, max_chunksize=None))]
    #[pyo3(name = "rechunk")]
    fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3Table> {
        let max_chunksize = max_chunksize.unwrap_or(self.num_rows());
        if max_chunksize == 0 {
            return Err(PyValueError::new_err("max_chunksize must be > 0").into());
        }

        let mut chunk_lengths = vec![];
        let mut offset = 0;
        while offset < self.num_rows() {
            let chunk_length = max_chunksize.min(self.num_rows() - offset);
            offset += chunk_length;
            chunk_lengths.push(chunk_length);
        }
        Ok(self.rechunk(chunk_lengths)?.into())
    }

    fn remove_column(&self, i: usize) -> PyArrowResult<Arro3Table> {
        if i >= self.num_columns() {
            return Err(PyIndexError::new_err(format!("Invalid column index \"{i}\"")).into());
        }

        let mut fields = self.schema.fields().to_vec();
        fields.remove(i);
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .map(|batch| {
                let mut columns = batch.columns().to_vec();
                columns.remove(i);
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    fn rename_columns(&self, names: Vec<String>) -> PyArrowResult<Arro3Table> {
        if names.len() != self.num_columns() {
            return Err(PyValueError::new_err(format!(
                "Expected {} names, got {}",
                self.num_columns(),
                names.len()
            ))
            .into());
        }
        let new_fields = self
            .schema
            .fields()
            .iter()
            .zip(names)
            .map(|(field, name)| field.as_ref().clone().with_name(name))
            .collect::<Vec<_>>();
        let new_schema = Arc::new(Schema::new_with_metadata(
            new_fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .map(|batch| RecordBatch::try_new(new_schema.clone(), batch.columns().to_vec()))
            .collect::<Result<Vec<_>, ArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    #[getter]
    fn schema(&self) -> Arro3Schema {
        PySchema::new(self.schema.clone()).into()
    }

    fn select(&self, columns: SelectIndices) -> PyArrowResult<Arro3Table> {
        let positions = columns.into_positions(self.schema.fields())?;

        let new_schema = Arc::new(self.schema.project(&positions)?);
        let new_batches = self
            .batches
            .iter()
            .map(|batch| batch.project(&positions))
            .collect::<Result<Vec<_>, ArrowError>>()?;
        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    fn set_column(
        &self,
        i: usize,
        field: NameOrField,
        column: AnyArray,
    ) -> PyArrowResult<Arro3Table> {
        let column = column.into_chunked_array()?;
        if self.num_rows() != column.len() {
            return Err(PyValueError::new_err(format!(
                "The number of rows in column ({}) does not match the table ({}).",
                column.len(),
                self.num_rows()
            ))
            .into());
        }

        // Guard the index before `fields[i]` below, which would otherwise panic.
        if i >= self.num_columns() {
            return Err(PyIndexError::new_err(format!(
                "Column index out of range, index is {i} but should be < {}",
                self.num_columns()
            ))
            .into());
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields[i] = field.into_field(column.field());
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have the same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns[i] = array.clone();
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    #[getter]
    fn shape(&self) -> (usize, usize) {
        (self.num_rows(), self.num_columns())
    }

    #[pyo3(signature = (offset=0, length=None))]
    #[pyo3(name = "slice")]
    fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3Table> {
        // Saturate so an out-of-bounds offset is reported as an error by
        // `slice` rather than panicking on usize underflow here.
        let length = length.unwrap_or_else(|| self.num_rows().saturating_sub(offset));
        Ok(self.slice(offset, length)?.into())
    }

    fn to_batches(&self) -> Vec<Arro3RecordBatch> {
        self.batches
            .iter()
            .map(|batch| PyRecordBatch::new(batch.clone()).into())
            .collect()
    }

    fn to_reader(&self) -> Arro3RecordBatchReader {
        let reader = Box::new(RecordBatchIterator::new(
            self.batches.clone().into_iter().map(Ok),
            self.schema.clone(),
        ));
        PyRecordBatchReader::new(reader).into()
    }

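    /// View the table as a chunked struct array: each batch becomes one
    /// `StructArray` chunk, with the schema metadata carried on the struct
    /// field.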
    fn to_struct_array(&self) -> PyArrowResult<Arro3ChunkedArray> {
        let chunks = self
            .batches
            .iter()
            .map(|batch| {
                let struct_array: StructArray = batch.clone().into();
                Arc::new(struct_array) as ArrayRef
            })
            .collect::<Vec<_>>();
        let field = Field::new_struct("", self.schema.fields().clone(), false)
            .with_metadata(self.schema.metadata.clone());
        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
    }

    fn with_schema(&self, schema: PySchema) -> PyArrowResult<Arro3Table> {
        let new_schema = schema.into_inner();
        let new_batches = self
            .batches
            .iter()
            .map(|batch| RecordBatch::try_new(new_schema.clone(), batch.columns().to_vec()))
            .collect::<Result<Vec<_>, ArrowError>>()?;
        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }
}