polars_python/dataframe/export.rs

use arrow::datatypes::IntegerType;
use arrow::record_batch::RecordBatch;
use polars::prelude::*;
use polars_compute::cast::CastOptionsImpl;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyList, PyTuple};
use pyo3::IntoPyObjectExt;

use super::PyDataFrame;
use crate::conversion::{ObjectValue, Wrap};
use crate::error::PyPolarsErr;
use crate::interop;
use crate::interop::arrow::to_py::dataframe_to_stream;
use crate::prelude::PyCompatLevel;

#[pymethods]
impl PyDataFrame {
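    /// Return the row at `idx` as a tuple of Python objects.
    /// Negative indices count from the end of the frame.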
    #[cfg(feature = "object")]
    pub fn row_tuple<'py>(&self, idx: i64, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
        let idx = if idx < 0 {
            (self.df.height() as i64 + idx) as usize
        } else {
            idx as usize
        };
        if idx >= self.df.height() {
            return Err(PyPolarsErr::from(polars_err!(oob = idx, self.df.height())).into());
        }
        PyTuple::new(
            py,
            self.df.get_columns().iter().map(|s| match s.dtype() {
                DataType::Object(_, _) => {
                    let obj: Option<&ObjectValue> = s.get_object(idx).map(|any| any.into());
                    obj.into_py_any(py).unwrap()
                },
                _ => Wrap(s.get(idx).unwrap()).into_py_any(py).unwrap(),
            }),
        )
    }

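    /// Materialize every row of the DataFrame as a Python tuple and return them in a list.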
    #[cfg(feature = "object")]
    pub fn row_tuples<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
        let mut rechunked;
        // Rechunk if random access would become rather expensive.
        // TODO: iterate over the chunks directly instead of using random access.
        let df = if self.df.max_n_chunks() > 16 {
            rechunked = self.df.clone();
            rechunked.as_single_chunk_par();
            &rechunked
        } else {
            &self.df
        };
        PyList::new(
            py,
            (0..df.height()).map(|idx| {
                PyTuple::new(
                    py,
                    df.get_columns().iter().map(|c| match c.dtype() {
                        DataType::Null => py.None(),
                        DataType::Object(_, _) => {
                            let obj: Option<&ObjectValue> = c.get_object(idx).map(|any| any.into());
                            obj.into_py_any(py).unwrap()
                        },
                        _ => {
                            // SAFETY: we are in bounds.
                            let av = unsafe { c.get_unchecked(idx) };
                            Wrap(av).into_py_any(py).unwrap()
                        },
                    }),
                )
                .unwrap()
            }),
        )
    }

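    /// Convert the DataFrame into a list of PyArrow RecordBatches (one per aligned chunk)
    /// at the requested compatibility level.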
    #[allow(clippy::wrong_self_convention)]
    pub fn to_arrow(&mut self, py: Python, compat_level: PyCompatLevel) -> PyResult<Vec<PyObject>> {
        py.allow_threads(|| self.df.align_chunks_par());
        let pyarrow = py.import("pyarrow")?;

        let rbs = self
            .df
            .iter_chunks(compat_level.0, true)
            .map(|rb| interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow))
            .collect::<PyResult<_>>()?;
        Ok(rbs)
    }

    /// Create a `Vec` of PyArrow RecordBatch instances.
    ///
    /// Note this will give bad results for columns with dtype `pl.Object`,
    /// since those can't be converted correctly via PyArrow. The calling Python
    /// code should make sure these are not included.
    #[allow(clippy::wrong_self_convention)]
    pub fn to_pandas(&mut self, py: Python) -> PyResult<Vec<PyObject>> {
        py.allow_threads(|| self.df.as_single_chunk_par());
        Python::with_gil(|py| {
            let pyarrow = py.import("pyarrow")?;
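            // Collect the indices of Categorical/Enum columns; their dictionary
            // encoding is rewritten below before the batches are handed to pandas.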
            let cat_columns = self
                .df
                .get_columns()
                .iter()
                .enumerate()
                .filter(|(_i, s)| {
                    matches!(
                        s.dtype(),
                        DataType::Categorical(_, _) | DataType::Enum(_, _)
                    )
                })
                .map(|(i, _)| i)
                .collect::<Vec<_>>();

            let enum_and_categorical_dtype = ArrowDataType::Dictionary(
                IntegerType::Int64,
                Box::new(ArrowDataType::LargeUtf8),
                false,
            );

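            // The rewritten schema is built lazily from the first record batch; the
            // frame was collapsed to a single chunk above, so only one batch is produced.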
            let mut replaced_schema = None;
            let rbs = self
                .df
                .iter_chunks(CompatLevel::oldest(), true)
                .map(|rb| {
                    let length = rb.len();
                    let (schema, mut arrays) = rb.into_schema_and_arrays();

                    // Pandas does not allow unsigned dictionary indices so we replace them.
                    replaced_schema =
                        (replaced_schema.is_none() && !cat_columns.is_empty()).then(|| {
                            let mut schema = schema.as_ref().clone();
                            for i in &cat_columns {
                                let (_, field) = schema.get_at_index_mut(*i).unwrap();
                                field.dtype = enum_and_categorical_dtype.clone();
                            }
                            Arc::new(schema)
                        });

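                    // Cast the affected arrays to the signed dictionary type so they
                    // match the rewritten schema.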
                    for i in &cat_columns {
                        let arr = arrays.get_mut(*i).unwrap();
                        let out = polars_compute::cast::cast(
                            &**arr,
                            &enum_and_categorical_dtype,
                            CastOptionsImpl::default(),
                        )
                        .unwrap();
                        *arr = out;
                    }
                    let schema = replaced_schema
                        .as_ref()
                        .map_or(schema, |replaced| replaced.clone());
                    let rb = RecordBatch::new(length, schema, arrays);

                    interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)
                })
                .collect::<PyResult<_>>()?;
            Ok(rbs)
        })
    }

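    /// Export the DataFrame over the Arrow PyCapsule interface as an Arrow C stream.
    /// The `requested_schema` hint is accepted for interface compatibility but ignored.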
    #[allow(unused_variables)]
    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__<'py>(
        &'py mut self,
        py: Python<'py>,
        requested_schema: Option<PyObject>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        py.allow_threads(|| self.df.align_chunks_par());
        dataframe_to_stream(&self.df, py)
    }
}