use std::fmt::Display;
use std::sync::Arc;

use arrow_array::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader;
use arrow_array::{ArrayRef, RecordBatchReader, StructArray};
use arrow_array::{RecordBatch, RecordBatchIterator};
use arrow_cast::pretty::pretty_format_batches_with_options;
use arrow_schema::{ArrowError, Field, Schema, SchemaRef};
use arrow_select::concat::concat_batches;
use indexmap::IndexMap;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::{PyArrowError, PyArrowResult};
use crate::export::{
    Arro3ChunkedArray, Arro3Field, Arro3RecordBatch, Arro3RecordBatchReader, Arro3Schema,
    Arro3Table,
};
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
use crate::ffi::to_python::to_stream_pycapsule;
use crate::ffi::to_schema_pycapsule;
use crate::input::{
    AnyArray, AnyRecordBatch, FieldIndexInput, MetadataInput, NameOrField, SelectIndices,
};
use crate::utils::{default_repr_options, schema_equals};
use crate::{PyChunkedArray, PyField, PyRecordBatch, PyRecordBatchReader, PySchema};

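/// A Python-facing Arrow table: an ordered collection of [`RecordBatch`]es
/// that all share the same [`SchemaRef`].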
#[pyclass(module = "arro3.core._core", name = "Table", subclass, frozen)]
#[derive(Debug)]
pub struct PyTable {
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
}

impl PyTable {
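    /// Create a table from a list of batches and a schema, validating that
    /// every batch matches the provided schema.
    ///
    /// A minimal sketch of constructing a table from Rust (assuming the
    /// `arrow-array` and `arrow-schema` crates; `.unwrap()` used for brevity):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema};
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    /// let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
    /// let table = PyTable::try_new(vec![batch], schema).unwrap();
    /// ```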
    pub fn try_new(batches: Vec<RecordBatch>, schema: SchemaRef) -> PyResult<Self> {
        if !batches
            .iter()
            .all(|rb| schema_equals(rb.schema_ref(), &schema))
        {
            return Err(PyTypeError::new_err("All batches must have the same schema"));
        }

        Ok(Self { schema, batches })
    }

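    /// Import a table from an Arrow PyCapsule holding an `ArrowArrayStream`,
    /// eagerly collecting every batch in the stream.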
    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
        let stream = import_stream_pycapsule(capsule)?;
        let stream_reader = ArrowRecordBatchStreamReader::try_new(stream)
            .map_err(|err| PyValueError::new_err(err.to_string()))?;
        let schema = stream_reader.schema();

        let mut batches = vec![];
        for batch in stream_reader {
            let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?;
            batches.push(batch);
        }

        Self::try_new(batches, schema)
    }

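    /// Access the table's underlying record batches.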
    pub fn batches(&self) -> &[RecordBatch] {
        &self.batches
    }

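    /// Consume the table, returning its batches and schema.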
    pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
        (self.batches, self.schema)
    }

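    /// Export to an `arro3.core.Table` Python object via the Arrow PyCapsule
    /// interface.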
    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
            intern!(py, "from_arrow_pycapsule"),
            PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
        )
    }

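    /// Consume the table and export it to an `arro3.core.Table` Python object.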
    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        let capsule =
            Self::to_stream_pycapsule(py, self.batches.clone(), self.schema.clone(), None)?;
        arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
            intern!(py, "from_arrow_pycapsule"),
            PyTuple::new(py, vec![capsule])?,
        )
    }

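    /// Export to a nanoarrow ArrayStream Python object.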
    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
    }

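    /// Consume the table and export it to a `pyarrow.Table` by passing the
    /// stream-exporting Python object to `pyarrow.table()`.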
    pub fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
        let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
        pyarrow_mod
            .getattr(intern!(py, "table"))?
            .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)
    }

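    /// Build an Arrow PyCapsule wrapping an `ArrowArrayStream`. Each
    /// `RecordBatch` is exported as a top-level `StructArray` whose field
    /// carries the table's schema metadata.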
    pub(crate) fn to_stream_pycapsule<'py>(
        py: Python<'py>,
        batches: Vec<RecordBatch>,
        schema: SchemaRef,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
        let fields = schema.fields();
        let array_reader = batches.into_iter().map(|batch| {
            let arr: ArrayRef = Arc::new(StructArray::from(batch));
            Ok(arr)
        });
        let array_reader = Box::new(ArrayIterator::new(
            array_reader,
            Field::new_struct("", fields.clone(), false)
                .with_metadata(schema.metadata.clone())
                .into(),
        ));
        to_stream_pycapsule(py, array_reader, requested_schema)
    }

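    /// Re-slice the table into batches with the given lengths, which must sum
    /// to the total number of rows. If the requested chunking already matches
    /// the existing batches they are reused as-is; otherwise each new chunk
    /// is sliced out and concatenated. For example, rechunking a 10-row table
    /// with `vec![4, 4, 2]` yields three batches of 4, 4, and 2 rows.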
    pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
        let total_chunk_length = chunk_lengths.iter().sum::<usize>();
        if total_chunk_length != self.num_rows() {
            return Err(
                PyValueError::new_err("Chunk lengths do not add up to table length").into(),
            );
        }

        let matches_existing_chunking = chunk_lengths
            .iter()
            .zip(self.batches())
            .all(|(length, batch)| *length == batch.num_rows());
        if matches_existing_chunking {
            return Ok(Self::try_new(self.batches.clone(), self.schema.clone())?);
        }

        let mut offset = 0;
        let batches = chunk_lengths
            .iter()
            .map(|chunk_length| {
                let sliced_table = self.slice(offset, *chunk_length)?;
                let sliced_concatted = concat_batches(&self.schema, sliced_table.batches.iter())?;
                offset += chunk_length;
                Ok(sliced_concatted)
            })
            .collect::<PyArrowResult<Vec<_>>>()?;

        Ok(Self::try_new(batches, self.schema.clone())?)
    }

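    /// Zero-copy slice of the table: whole chunks are skipped while `offset`
    /// remains past them, then rows are taken from as many chunks as needed
    /// to reach `length`.
    ///
    /// A minimal usage sketch (assuming `table` is a `PyTable` with at least
    /// five rows):
    ///
    /// ```ignore
    /// // Rows 2..5 of the table (offset 2, length 3).
    /// let sliced = table.slice(2, 3).unwrap();
    /// assert_eq!(sliced.num_rows(), 3);
    /// ```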
    pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
        if offset + length > self.num_rows() {
            return Err(
                PyValueError::new_err("offset + length may not exceed length of table").into(),
            );
        }

        let mut sliced_batches: Vec<RecordBatch> = vec![];
        for chunk in self.batches() {
            if chunk.num_rows() == 0 {
                continue;
            }

            if offset >= chunk.num_rows() {
                offset -= chunk.num_rows();
                continue;
            }

            let take_count = length.min(chunk.num_rows() - offset);
            let sliced_chunk = chunk.slice(offset, take_count);
            sliced_batches.push(sliced_chunk);

            length -= take_count;

            if length == 0 {
                break;
            } else {
                offset = 0;
            }
        }

        Ok(Self::try_new(sliced_batches, self.schema.clone())?)
    }
}

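// Display renders the class name plus (at most) the first ten rows,
// pretty-printed after combining them into a single batch.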
impl Display for PyTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "arro3.core.Table")?;
        let head_table = self
            .slice(0, 10.min(self.num_rows()))
            .map_err(|_| std::fmt::Error)?
            .combine_chunks()
            .map_err(|_| std::fmt::Error)?;

        pretty_format_batches_with_options(
            &head_table.into_inner().batches,
            &default_repr_options(),
        )
        .map_err(|_| std::fmt::Error)?
        .fmt(f)?;

        Ok(())
    }
}

#[pymethods]
impl PyTable {
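    /// Create a Table from table-like input (anything implementing the Arrow
    /// PyCapsule interface), a dict of arrays, or a sequence of arrays.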
    #[new]
    #[pyo3(signature = (data, *, names=None, schema=None, metadata=None))]
    fn new(
        py: Python,
        data: &Bound<PyAny>,
        names: Option<Vec<String>>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        if data.hasattr(intern!(py, "__arrow_c_array__"))?
            || data.hasattr(intern!(py, "__arrow_c_stream__"))?
        {
            Ok(data.extract::<AnyRecordBatch>()?.into_table()?)
        } else if let Ok(mapping) = data.extract::<IndexMap<String, AnyArray>>() {
            Self::from_pydict(&py.get_type::<PyTable>(), mapping, schema, metadata)
        } else if let Ok(arrays) = data.extract::<Vec<AnyArray>>() {
            Self::from_arrays(&py.get_type::<PyTable>(), arrays, names, schema, metadata)
        } else {
            Err(PyTypeError::new_err(
                "Expected table-like input, a dict of arrays, or a sequence of arrays.",
            )
            .into())
        }
    }

    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
        to_schema_pycapsule(py, self.schema.as_ref())
    }

    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__<'py>(
        &'py self,
        py: Python<'py>,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
        Self::to_stream_pycapsule(
            py,
            self.batches.clone(),
            self.schema.clone(),
            requested_schema,
        )
    }

    fn __eq__(&self, other: &PyTable) -> bool {
        self.batches == other.batches && self.schema == other.schema
    }

    fn __getitem__(&self, key: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
        self.column(key)
    }

    fn __len__(&self) -> usize {
        self.batches.iter().fold(0, |acc, x| acc + x.num_rows())
    }

    fn __repr__(&self) -> String {
        self.to_string()
    }

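    /// Construct from any Arrow-compatible table or stream input.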
    #[classmethod]
    fn from_arrow(_cls: &Bound<PyType>, input: AnyRecordBatch) -> PyArrowResult<Self> {
        input.into_table()
    }

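    /// Construct from a raw Arrow C Stream PyCapsule.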
    #[classmethod]
    #[pyo3(name = "from_arrow_pycapsule")]
    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
        Self::from_arrow_pycapsule(capsule)
    }

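    /// Construct from a list of RecordBatches; `schema` is required when the
    /// list is empty.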
    #[classmethod]
    #[pyo3(signature = (batches, *, schema=None))]
    fn from_batches(
        _cls: &Bound<PyType>,
        batches: Vec<PyRecordBatch>,
        schema: Option<PySchema>,
    ) -> PyArrowResult<Self> {
        if batches.is_empty() {
            let schema = schema.ok_or(PyValueError::new_err(
                "schema must be passed for an empty list of batches",
            ))?;
            return Ok(Self::try_new(vec![], schema.into_inner())?);
        }

        let batches = batches
            .into_iter()
            .map(|batch| batch.into_inner())
            .collect::<Vec<_>>();
        let schema = schema
            .map(|s| s.into_inner())
            .unwrap_or(batches.first().unwrap().schema());
        Ok(Self::try_new(batches, schema)?)
    }

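    /// Construct from a mapping of column names to arrays or chunked arrays.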
    #[classmethod]
    #[pyo3(signature = (mapping, *, schema=None, metadata=None))]
    fn from_pydict(
        cls: &Bound<PyType>,
        mapping: IndexMap<String, AnyArray>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        let (names, arrays): (Vec<_>, Vec<_>) = mapping.into_iter().unzip();
        Self::from_arrays(cls, arrays, Some(names), schema, metadata)
    }

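    /// Construct from a sequence of arrays or chunked arrays. Either `names`
    /// or `schema` must be passed, and all columns must share the same chunk
    /// lengths.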
    #[classmethod]
    #[pyo3(signature = (arrays, *, names=None, schema=None, metadata=None))]
    fn from_arrays(
        _cls: &Bound<PyType>,
        arrays: Vec<AnyArray>,
        names: Option<Vec<String>>,
        schema: Option<PySchema>,
        metadata: Option<MetadataInput>,
    ) -> PyArrowResult<Self> {
        let columns = arrays
            .into_iter()
            .map(|array| array.into_chunked_array())
            .collect::<PyArrowResult<Vec<_>>>()?;

        let schema: SchemaRef = if let Some(schema) = schema {
            schema.into_inner()
        } else {
            let names = names.ok_or(PyValueError::new_err(
                "names must be passed if schema is not passed.",
            ))?;

            let fields = columns
                .iter()
                .zip(names.iter())
                .map(|(array, name)| Arc::new(array.field().as_ref().clone().with_name(name)))
                .collect::<Vec<_>>();
            Arc::new(
                Schema::new(fields)
                    .with_metadata(metadata.unwrap_or_default().into_string_hashmap()?),
            )
        };

        if columns.is_empty() {
            return Ok(Self::try_new(vec![], schema)?);
        }

        let column_chunk_lengths = columns
            .iter()
            .map(|column| {
                column
                    .chunks()
                    .iter()
                    .map(|chunk| chunk.len())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();
        if !column_chunk_lengths.windows(2).all(|w| w[0] == w[1]) {
            return Err(
                PyValueError::new_err("All columns must have the same chunk lengths").into(),
            );
        }
        let num_batches = column_chunk_lengths[0].len();

        let mut batches = vec![];
        for batch_idx in 0..num_batches {
            let batch = RecordBatch::try_new(
                schema.clone(),
                columns
                    .iter()
                    .map(|column| column.chunks()[batch_idx].clone())
                    .collect(),
            )?;
            batches.push(batch);
        }

        Ok(Self::try_new(batches, schema)?)
    }

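    /// Return a new table with a column added at position `i`. The column is
    /// rechunked to match the table's existing chunking.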
    fn add_column(
        &self,
        i: usize,
        field: NameOrField,
        column: PyChunkedArray,
    ) -> PyArrowResult<Arro3Table> {
        if self.num_rows() != column.len() {
            return Err(
                PyValueError::new_err("Number of rows in column does not match table.").into(),
            );
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields.insert(i, field.into_field(column.field()));
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns.insert(i, array.clone());
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

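    /// Return a new table with `column` appended after the last existing
    /// column, rechunked to match the table's existing chunking.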
    fn append_column(
        &self,
        field: NameOrField,
        column: PyChunkedArray,
    ) -> PyArrowResult<Arro3Table> {
        if self.num_rows() != column.len() {
            return Err(
                PyValueError::new_err("Number of rows in column does not match table.").into(),
            );
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields.push(field.into_field(column.field()));
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns.push(array.clone());
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

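    /// The number of rows in each internal batch.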
    #[getter]
    fn chunk_lengths(&self) -> Vec<usize> {
        self.batches.iter().map(|batch| batch.num_rows()).collect()
    }

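    /// Select a single column by name or by integer position.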
    fn column(&self, i: FieldIndexInput) -> PyArrowResult<Arro3ChunkedArray> {
        let column_index = i.into_position(&self.schema)?;
        let field = self.schema.field(column_index).clone();
        let chunks = self
            .batches
            .iter()
            .map(|batch| batch.column(column_index).clone())
            .collect();
        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
    }

    #[getter]
    fn column_names(&self) -> Vec<String> {
        self.schema
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    #[getter]
    fn columns(&self) -> PyArrowResult<Vec<Arro3ChunkedArray>> {
        (0..self.num_columns())
            .map(|i| self.column(FieldIndexInput::Position(i)))
            .collect()
    }

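    /// Concatenate all batches into a table backed by a single batch.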
    fn combine_chunks(&self) -> PyArrowResult<Arro3Table> {
        let batch = concat_batches(&self.schema, &self.batches)?;
        Ok(PyTable::try_new(vec![batch], self.schema.clone())?.into())
    }

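    /// Select a schema field by name or by integer position.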
    fn field(&self, i: FieldIndexInput) -> PyArrowResult<Arro3Field> {
        let field = self.schema.field(i.into_position(&self.schema)?);
        Ok(PyField::new(field.clone().into()).into())
    }

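    /// Total bytes of memory occupied by the table's arrays.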
    #[getter]
    fn nbytes(&self) -> usize {
        self.batches
            .iter()
            .fold(0, |acc, batch| acc + batch.get_array_memory_size())
    }

    #[getter]
    fn num_columns(&self) -> usize {
        self.schema.fields().len()
    }

    #[getter]
    fn num_rows(&self) -> usize {
        self.batches()
            .iter()
            .fold(0, |acc, batch| acc + batch.num_rows())
    }

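    /// Re-chunk the table so no batch is longer than `max_chunksize` rows
    /// (defaults to the full table length, i.e. a single chunk).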
    #[pyo3(signature = (*, max_chunksize=None))]
    #[pyo3(name = "rechunk")]
    fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3Table> {
        // Default to a single chunk; `.max(1)` keeps the default valid for an
        // empty table, while an explicit max_chunksize of zero is an error.
        let max_chunksize = max_chunksize.unwrap_or_else(|| self.num_rows().max(1));
        if max_chunksize == 0 {
            return Err(PyValueError::new_err("max_chunksize must be > 0").into());
        }

        let mut chunk_lengths = vec![];
        let mut offset = 0;
        while offset < self.num_rows() {
            let chunk_length = max_chunksize.min(self.num_rows() - offset);
            offset += chunk_length;
            chunk_lengths.push(chunk_length);
        }
        Ok(self.rechunk(chunk_lengths)?.into())
    }

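    /// Return a new table with the column at position `i` removed.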
    fn remove_column(&self, i: usize) -> PyArrowResult<Arro3Table> {
        let mut fields = self.schema.fields().to_vec();
        fields.remove(i);
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .map(|batch| {
                let mut columns = batch.columns().to_vec();
                columns.remove(i);
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

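    /// Return a new table with all columns renamed; `names` must contain one
    /// entry per column.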
    fn rename_columns(&self, names: Vec<String>) -> PyArrowResult<Arro3Table> {
        if names.len() != self.num_columns() {
            return Err(PyValueError::new_err(
                "When names is a list[str], must pass the same number of names as there are columns.",
            )
            .into());
        }

        let new_fields = self
            .schema
            .fields()
            .iter()
            .zip(names)
            .map(|(field, name)| field.as_ref().clone().with_name(name))
            .collect::<Vec<_>>();
        let new_schema = Arc::new(Schema::new_with_metadata(
            new_fields,
            self.schema.metadata().clone(),
        ));
        Ok(PyTable::try_new(self.batches.clone(), new_schema)?.into())
    }

    #[getter]
    fn schema(&self) -> Arro3Schema {
        PySchema::new(self.schema.clone()).into()
    }

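    /// Select a subset of columns by names or positions, in the given order.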
    fn select(&self, columns: SelectIndices) -> PyArrowResult<Arro3Table> {
        let positions = columns.into_positions(self.schema.fields())?;

        let new_schema = Arc::new(self.schema.project(&positions)?);
        let new_batches = self
            .batches
            .iter()
            .map(|batch| batch.project(&positions))
            .collect::<Result<Vec<_>, ArrowError>>()?;
        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

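    /// Return a new table with the column at position `i` replaced, rechunked
    /// to match the table's existing chunking.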
    fn set_column(
        &self,
        i: usize,
        field: NameOrField,
        column: PyChunkedArray,
    ) -> PyArrowResult<Arro3Table> {
        if self.num_rows() != column.len() {
            return Err(
                PyValueError::new_err("Number of rows in column does not match table.").into(),
            );
        }

        let column = column.rechunk(self.chunk_lengths())?;

        let mut fields = self.schema.fields().to_vec();
        fields[i] = field.into_field(column.field());
        let new_schema = Arc::new(Schema::new_with_metadata(
            fields,
            self.schema.metadata().clone(),
        ));

        let new_batches = self
            .batches
            .iter()
            .zip(column.chunks())
            .map(|(batch, array)| {
                debug_assert_eq!(
                    array.len(),
                    batch.num_rows(),
                    "Array and batch should have same number of rows."
                );

                let mut columns = batch.columns().to_vec();
                columns[i] = array.clone();
                Ok(RecordBatch::try_new(new_schema.clone(), columns)?)
            })
            .collect::<Result<Vec<_>, PyArrowError>>()?;

        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }

    #[getter]
    fn shape(&self) -> (usize, usize) {
        (self.num_rows(), self.num_columns())
    }

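    /// Zero-copy slice starting at `offset` for `length` rows (defaults to
    /// the remainder of the table).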
    #[pyo3(signature = (offset=0, length=None))]
    #[pyo3(name = "slice")]
    fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3Table> {
        // Saturate so an out-of-bounds offset surfaces as a ValueError from
        // `slice` rather than a usize underflow panic.
        let length = length.unwrap_or_else(|| self.num_rows().saturating_sub(offset));
        Ok(self.slice(offset, length)?.into())
    }

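    /// Return the table's batches as a list of RecordBatch objects.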
    fn to_batches(&self) -> Vec<Arro3RecordBatch> {
        self.batches
            .iter()
            .map(|batch| PyRecordBatch::new(batch.clone()).into())
            .collect()
    }

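    /// Convert the table to a RecordBatchReader that yields its batches in
    /// order.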
    fn to_reader(&self) -> Arro3RecordBatchReader {
        let reader = Box::new(RecordBatchIterator::new(
            self.batches.clone().into_iter().map(Ok),
            self.schema.clone(),
        ));
        PyRecordBatchReader::new(reader).into()
    }

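    /// Convert the table to a chunked array of structs, one StructArray per
    /// batch.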
    fn to_struct_array(&self) -> PyArrowResult<Arro3ChunkedArray> {
        let chunks = self
            .batches
            .iter()
            .map(|batch| {
                let struct_array: StructArray = batch.clone().into();
                Arc::new(struct_array) as ArrayRef
            })
            .collect::<Vec<_>>();
        let field = Field::new_struct("", self.schema.fields().clone(), false)
            .with_metadata(self.schema.metadata.clone());
        Ok(PyChunkedArray::try_new(chunks, field.into())?.into())
    }

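    /// Return a new table with its schema replaced. The new schema's field
    /// types must match the existing columns.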
    fn with_schema(&self, schema: PySchema) -> PyArrowResult<Arro3Table> {
        let new_schema = schema.into_inner();
        let new_batches = self
            .batches
            .iter()
            .map(|batch| RecordBatch::try_new(new_schema.clone(), batch.columns().to_vec()))
            .collect::<Result<Vec<_>, ArrowError>>()?;
        Ok(PyTable::try_new(new_batches, new_schema)?.into())
    }
}