polars_python/interop/arrow/to_py.rs

use std::ffi::CString;

use arrow::datatypes::ArrowDataType;
use arrow::ffi;
use arrow::record_batch::RecordBatch;
use polars::datatypes::CompatLevel;
use polars::frame::DataFrame;
use polars::prelude::{ArrayRef, ArrowField, PlSmallStr, SchemaExt};
use polars::series::Series;
use polars_core::utils::arrow;
use polars_error::PolarsResult;
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;

/// Arrow array to Python.
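///
/// A minimal usage sketch (an assumption, not from the original file): it
/// presumes a GIL token `py` is in scope, pyo3's `Bound` API, and an
/// importable `pyarrow` module.
///
/// ```ignore
/// let pyarrow = py.import("pyarrow")?;
/// let s = Series::new("a".into(), &[1i64, 2, 3]);
/// let field = s.field().to_arrow(CompatLevel::newest());
/// // Export chunk 0 of the Series as a pyarrow.Array via the Arrow C data
/// // interface.
/// let py_array = to_py_array(s.to_arrow(0, CompatLevel::newest()), &field, &pyarrow)?;
/// ```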
pub(crate) fn to_py_array(
    array: ArrayRef,
    field: &ArrowField,
    pyarrow: &Bound<PyModule>,
) -> PyResult<PyObject> {
    // Export through the Arrow C data interface; the boxes give the structs
    // stable addresses for the import call below.
    let schema = Box::new(ffi::export_field_to_c(field));
    let array = Box::new(ffi::export_array_to_c(array));

    let schema_ptr: *const ffi::ArrowSchema = &*schema;
    let array_ptr: *const ffi::ArrowArray = &*array;

    let array = pyarrow.getattr("Array")?.call_method1(
        "_import_from_c",
        (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
    )?;

    Ok(array.unbind())
}

/// RecordBatch to Python.
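///
/// A driving sketch (an assumption, not from the original file), presuming
/// `DataFrame::iter_chunks(compat_level, parallel)` with its current
/// signature:
///
/// ```ignore
/// let pyarrow = py.import("pyarrow")?;
/// let df = df!("a" => [1i64, 2, 3])?;
/// // Each horizontal chunk of the DataFrame maps to one pyarrow.RecordBatch.
/// for rb in df.iter_chunks(CompatLevel::newest(), false) {
///     let py_rb = to_py_rb(&rb, py, &pyarrow)?;
/// }
/// ```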
pub(crate) fn to_py_rb(
    rb: &RecordBatch,
    py: Python<'_>,
    pyarrow: &Bound<PyModule>,
) -> PyResult<PyObject> {
    let mut arrays = Vec::with_capacity(rb.width());

    for (array, field) in rb.columns().iter().zip(rb.schema().iter_values()) {
        let array_object = to_py_array(array.clone(), field, pyarrow)?;
        arrays.push(array_object);
    }

    // Export the batch's schema as a single anonymous struct field whose
    // children are the batch's columns.
    let schema = Box::new(ffi::export_field_to_c(&ArrowField {
        name: PlSmallStr::EMPTY,
        dtype: ArrowDataType::Struct(rb.schema().iter_values().cloned().collect()),
        is_nullable: false,
        metadata: None,
    }));
    let schema_ptr: *const ffi::ArrowSchema = &*schema;

    let schema = pyarrow
        .getattr("Schema")?
        .call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;
    // Column names come from the imported schema, so `None` is passed for the
    // `names` argument of `from_arrays`.
    let record = pyarrow
        .getattr("RecordBatch")?
        .call_method1("from_arrays", (arrays, py.None(), schema))?;

    Ok(record.unbind())
}

/// Export a series to a C stream via a PyCapsule according to the Arrow PyCapsule Interface:
/// <https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html>
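///
/// A round-trip sketch (an assumption, not from the original file), presuming
/// a pyarrow version whose `RecordBatchReader._import_from_c_capsule` accepts
/// such a capsule (pyarrow >= 14):
///
/// ```ignore
/// let s = Series::new("a".into(), &[1i64, 2, 3]);
/// let capsule = series_to_stream(&s, py)?;
/// let reader = py
///     .import("pyarrow")?
///     .getattr("RecordBatchReader")?
///     .call_method1("_import_from_c_capsule", (capsule,))?;
/// ```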
pub(crate) fn series_to_stream<'py>(
    series: &Series,
    py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
    let field = series.field().to_arrow(CompatLevel::newest());
    let series = series.clone();
    // Lazily export one Arrow array per chunk of the Series.
    let iter = Box::new(
        (0..series.n_chunks()).map(move |i| Ok(series.to_arrow(i, CompatLevel::newest()))),
    ) as _;

    let stream = ffi::export_iterator(iter, field);
    let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
    PyCapsule::new(py, stream, Some(stream_capsule_name))
}

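/// Export a DataFrame to a C stream via a PyCapsule, analogous to
/// [`series_to_stream`]: every chunk is streamed as one struct-typed array
/// whose child arrays are the DataFrame's columns (see
/// [`DataFrameStreamIterator`] below).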
pub(crate) fn dataframe_to_stream<'py>(
    df: &DataFrame,
    py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
    let iter = Box::new(DataFrameStreamIterator::new(df));
    let field = iter.field();
    let stream = ffi::export_iterator(iter, field);
    let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
    PyCapsule::new(py, stream, Some(stream_capsule_name))
}

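/// Export a polars `Schema` as a PyCapsule named `arrow_schema` per the Arrow
/// PyCapsule Interface, wrapping a single struct-typed `ArrowSchema` with one
/// child field per column.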
#[cfg(feature = "c_api")]
#[pyfunction]
pub(crate) fn polars_schema_to_pycapsule<'py>(
    py: Python<'py>,
    schema: crate::prelude::Wrap<polars::prelude::Schema>,
    compat_level: crate::prelude::PyCompatLevel,
) -> PyResult<Bound<'py, PyCapsule>> {
    let schema: arrow::ffi::ArrowSchema = arrow::ffi::export_field_to_c(&ArrowField::new(
        PlSmallStr::EMPTY,
        ArrowDataType::Struct(
            schema
                .0
                .iter_fields()
                .map(|x| x.to_arrow(compat_level.0))
                .collect(),
        ),
        false,
    ));

    let capsule_name = CString::new("arrow_schema").unwrap();
    PyCapsule::new(py, schema, Some(capsule_name))
}

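/// Iterator that yields a DataFrame chunk by chunk, with each chunk flattened
/// into a single struct-typed Arrow array holding one child array per column.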
pub struct DataFrameStreamIterator {
    columns: Vec<Series>,
    dtype: ArrowDataType,
    idx: usize,
    n_chunks: usize,
}

impl DataFrameStreamIterator {
    fn new(df: &DataFrame) -> Self {
        let schema = df.schema().to_arrow(CompatLevel::newest());
        let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());

        Self {
            columns: df
                .get_columns()
                .iter()
                .map(|v| v.as_materialized_series().clone())
                .collect(),
            dtype,
            idx: 0,
            n_chunks: df.first_col_n_chunks(),
        }
    }

    fn field(&self) -> ArrowField {
        ArrowField::new(PlSmallStr::EMPTY, self.dtype.clone(), false)
    }
}

impl Iterator for DataFrameStreamIterator {
    type Item = PolarsResult<ArrayRef>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.idx >= self.n_chunks {
            None
        } else {
            // Gather the chunk with the same index from every column into one batch.
            let batch_cols = self
                .columns
                .iter()
                .map(|s| s.to_arrow(self.idx, CompatLevel::newest()))
                .collect::<Vec<_>>();
            self.idx += 1;

            // Assemble the batch into one non-nullable struct array; its
            // length is taken from the first column.
            let array = arrow::array::StructArray::new(
                self.dtype.clone(),
                batch_cols[0].len(),
                batch_cols,
                None,
            );
            Some(Ok(Box::new(array)))
        }
    }
}
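
// A smoke-test sketch, not part of the original file. It assumes pyo3 is
// built with the `auto-initialize` feature (or an embedded interpreter is
// otherwise available) and that `PyCapsule::name` is exposed by the pyo3
// version in use.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn series_stream_capsule_is_named_per_spec() -> PyResult<()> {
        Python::with_gil(|py| {
            let s = Series::new("a".into(), &[1i64, 2, 3]);
            let capsule = series_to_stream(&s, py)?;
            // The Arrow PyCapsule Interface mandates this capsule name.
            let name = capsule.name()?.expect("capsule should carry a name");
            assert_eq!(name.to_bytes(), b"arrow_array_stream");
            Ok(())
        })
    }
}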