// polars_python/interop/arrow/to_py.rs
use std::ffi::CString;
2
3use arrow::datatypes::ArrowDataType;
4use arrow::ffi;
5use arrow::record_batch::RecordBatch;
6use polars::datatypes::CompatLevel;
7use polars::frame::DataFrame;
8use polars::prelude::{ArrayRef, ArrowField, PlSmallStr, SchemaExt};
9use polars::series::Series;
10use polars_core::utils::arrow;
11use polars_error::PolarsResult;
12use pyo3::ffi::Py_uintptr_t;
13use pyo3::prelude::*;
14use pyo3::types::PyCapsule;
15
16pub(crate) fn to_py_array(
18 array: ArrayRef,
19 field: &ArrowField,
20 pyarrow: &Bound<PyModule>,
21) -> PyResult<PyObject> {
22 let schema = Box::new(ffi::export_field_to_c(field));
23 let array = Box::new(ffi::export_array_to_c(array));
24
25 let schema_ptr: *const ffi::ArrowSchema = &*schema;
26 let array_ptr: *const ffi::ArrowArray = &*array;
27
28 let array = pyarrow.getattr("Array")?.call_method1(
29 "_import_from_c",
30 (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
31 )?;
32
33 Ok(array.unbind())
34}
35
36pub(crate) fn to_py_rb(
38 rb: &RecordBatch,
39 py: Python<'_>,
40 pyarrow: &Bound<PyModule>,
41) -> PyResult<PyObject> {
42 let mut arrays = Vec::with_capacity(rb.width());
43
44 for (array, field) in rb.columns().iter().zip(rb.schema().iter_values()) {
45 let array_object = to_py_array(array.clone(), field, pyarrow)?;
46 arrays.push(array_object);
47 }
48
49 let schema = Box::new(ffi::export_field_to_c(&ArrowField {
50 name: PlSmallStr::EMPTY,
51 dtype: ArrowDataType::Struct(rb.schema().iter_values().cloned().collect()),
52 is_nullable: false,
53 metadata: None,
54 }));
55 let schema_ptr: *const ffi::ArrowSchema = &*schema;
56
57 let schema = pyarrow
58 .getattr("Schema")?
59 .call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;
60 let record = pyarrow
61 .getattr("RecordBatch")?
62 .call_method1("from_arrays", (arrays, py.None(), schema))?;
63
64 Ok(record.unbind())
65}
66
67pub(crate) fn series_to_stream<'py>(
70 series: &Series,
71 py: Python<'py>,
72) -> PyResult<Bound<'py, PyCapsule>> {
73 let field = series.field().to_arrow(CompatLevel::newest());
74 let iter = Box::new(series.chunks().clone().into_iter().map(Ok)) as _;
75 let stream = ffi::export_iterator(iter, field);
76 let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
77 PyCapsule::new(py, stream, Some(stream_capsule_name))
78}
79
80pub(crate) fn dataframe_to_stream<'py>(
81 df: &DataFrame,
82 py: Python<'py>,
83) -> PyResult<Bound<'py, PyCapsule>> {
84 let iter = Box::new(DataFrameStreamIterator::new(df));
85 let field = iter.field();
86 let stream = ffi::export_iterator(iter, field);
87 let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
88 PyCapsule::new(py, stream, Some(stream_capsule_name))
89}
90
/// Iterator yielding a `DataFrame`'s data chunk-by-chunk as Arrow struct
/// arrays, suitable for `ffi::export_iterator`.
pub struct DataFrameStreamIterator {
    // Materialized columns of the frame being streamed.
    columns: Vec<Series>,
    // Arrow struct dtype combining all column fields.
    dtype: ArrowDataType,
    // Index of the next chunk to emit.
    idx: usize,
    // Total number of chunks to emit.
    // NOTE(review): presumably all columns share the same chunking — verify
    // at the construction site.
    n_chunks: usize,
}
97
98impl DataFrameStreamIterator {
99 fn new(df: &DataFrame) -> Self {
100 let schema = df.schema().to_arrow(CompatLevel::newest());
101 let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());
102
103 Self {
104 columns: df
105 .get_columns()
106 .iter()
107 .map(|v| v.as_materialized_series().clone())
108 .collect(),
109 dtype,
110 idx: 0,
111 n_chunks: df.first_col_n_chunks(),
112 }
113 }
114
115 fn field(&self) -> ArrowField {
116 ArrowField::new(PlSmallStr::EMPTY, self.dtype.clone(), false)
117 }
118}
119
120impl Iterator for DataFrameStreamIterator {
121 type Item = PolarsResult<ArrayRef>;
122
123 fn next(&mut self) -> Option<Self::Item> {
124 if self.idx >= self.n_chunks {
125 None
126 } else {
127 let batch_cols = self
129 .columns
130 .iter()
131 .map(|s| s.to_arrow(self.idx, CompatLevel::newest()))
132 .collect::<Vec<_>>();
133 self.idx += 1;
134
135 let array = arrow::array::StructArray::new(
136 self.dtype.clone(),
137 batch_cols[0].len(),
138 batch_cols,
139 None,
140 );
141 Some(Ok(Box::new(array)))
142 }
143 }
144}