Skip to main content

polars_python/dataframe/
export.rs

1use arrow::datatypes::IntegerType;
2use arrow::record_batch::RecordBatch;
3use polars::prelude::*;
4use polars_compute::cast::CastOptionsImpl;
5use polars_utils::itertools::Itertools;
6use pyo3::IntoPyObjectExt;
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyList, PyTuple};
9
10use super::PyDataFrame;
11use crate::conversion::{ObjectValue, Wrap};
12use crate::error::PyPolarsErr;
13use crate::interop;
14use crate::interop::arrow::to_py::dataframe_to_stream;
15use crate::prelude::PyCompatLevel;
16use crate::utils::EnterPolarsExt;
17
18#[pymethods]
19impl PyDataFrame {
20    #[cfg(feature = "object")]
21    pub fn row_tuple<'py>(&self, idx: i64, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
22        let df = self.df.read();
23        let idx = if idx < 0 {
24            (df.height() as i64 + idx) as usize
25        } else {
26            idx as usize
27        };
28        if idx >= df.height() {
29            return Err(PyPolarsErr::from(polars_err!(oob = idx, df.height())).into());
30        }
31        PyTuple::new(
32            py,
33            df.columns().iter().map(|s| match s.dtype() {
34                DataType::Object(_) => {
35                    let obj: Option<&ObjectValue> = s.get_object(idx).map(|any| any.into());
36                    obj.into_py_any(py).unwrap()
37                },
38                _ => Wrap(s.get(idx).unwrap()).into_py_any(py).unwrap(),
39            }),
40        )
41    }
42
43    #[cfg(feature = "object")]
44    pub fn row_tuples<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
45        let df = self.df.read();
46        let mut rechunked;
47        // Rechunk if random access would become rather expensive.
48        // TODO: iterate over the chunks directly instead of using random access.
49        let df = if df.max_n_chunks() > 16 {
50            rechunked = df.clone();
51            py.enter_polars_ok(|| rechunked.rechunk_mut_par())?;
52            &rechunked
53        } else {
54            &df
55        };
56        PyList::new(
57            py,
58            (0..df.height()).map(|idx| {
59                PyTuple::new(
60                    py,
61                    df.columns().iter().map(|c| match c.dtype() {
62                        DataType::Null => py.None(),
63                        DataType::Object(_) => {
64                            let obj: Option<&ObjectValue> = c.get_object(idx).map(|any| any.into());
65                            obj.into_py_any(py).unwrap()
66                        },
67                        _ => {
68                            // SAFETY: we are in bounds.
69                            let av = unsafe { c.get_unchecked(idx) };
70                            Wrap(av).into_py_any(py).unwrap()
71                        },
72                    }),
73                )
74                .unwrap()
75            }),
76        )
77    }
78
79    #[allow(clippy::wrong_self_convention)]
80    pub fn to_arrow(
81        &self,
82        py: Python<'_>,
83        compat_level: PyCompatLevel,
84    ) -> PyResult<Vec<Py<PyAny>>> {
85        let mut df = self.df.read().clone();
86        py.enter_polars_ok(|| df.align_chunks_par())?;
87        *self.df.write() = df.clone();
88
89        let pyarrow = py.import("pyarrow")?;
90
91        let mut chunks = df.iter_chunks(compat_level.0, true);
92        let mut rbs = Vec::with_capacity(chunks.size_hint().0);
93        // df.iter_chunks() iteration could internally try to acquire the GIL on another thread,
94        // so we make sure to run chunks.next() within enter_polars().
95        while let Some(rb) = py.enter_polars_ok(|| chunks.next())? {
96            let rb = interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)?;
97            rbs.push(rb);
98        }
99        Ok(rbs)
100    }
101
102    /// Create a `Vec` of PyArrow RecordBatch instances.
103    ///
104    /// Note this will give bad results for columns with dtype `pl.Object`,
105    /// since those can't be converted correctly via PyArrow. The calling Python
106    /// code should make sure these are not included.
107    #[allow(clippy::wrong_self_convention)]
108    pub fn to_pandas(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
109        let mut df = self.df.read().clone();
110        py.enter_polars_ok(|| df.rechunk_mut_par())?;
111        *self.df.write() = df.clone();
112
113        let pyarrow = py.import("pyarrow")?;
114
115        let dict_columns = df
116            .columns()
117            .iter()
118            .enumerate()
119            .filter(|(_, s)| {
120                matches!(
121                    s.dtype(),
122                    DataType::Categorical(_, _) | DataType::Enum(_, _)
123                )
124            })
125            .map(|(i, _)| i)
126            .collect_vec();
127        let is_enum_col = df
128            .columns()
129            .iter()
130            .map(|c| matches!(c.dtype(), DataType::Enum(_, _)))
131            .collect_vec();
132
133        let enum_dtype =
134            ArrowDataType::Dictionary(IntegerType::Int64, Box::new(ArrowDataType::LargeUtf8), true);
135        let categorical_dtype = ArrowDataType::Dictionary(
136            IntegerType::Int64,
137            Box::new(ArrowDataType::LargeUtf8),
138            false,
139        );
140
141        let mut replaced_schema = None;
142        df.iter_chunks(CompatLevel::oldest(), true)
143            .map(|rb| {
144                let length = rb.len();
145                let (schema, mut arrays) = rb.into_schema_and_arrays();
146
147                // Pandas does not allow unsigned dictionary indices, so replace them.
148                replaced_schema =
149                    (replaced_schema.is_none() && !dict_columns.is_empty()).then(|| {
150                        let mut schema = schema.as_ref().clone();
151                        for i in &dict_columns {
152                            let (_, field) = schema.get_at_index_mut(*i).unwrap();
153                            field.dtype = if is_enum_col[*i] {
154                                enum_dtype.clone()
155                            } else {
156                                categorical_dtype.clone()
157                            };
158                        }
159                        Arc::new(schema)
160                    });
161
162                for i in &dict_columns {
163                    let arr = arrays.get_mut(*i).unwrap();
164                    let cast_dtype = if is_enum_col[*i] {
165                        &enum_dtype
166                    } else {
167                        &categorical_dtype
168                    };
169                    let out =
170                        polars_compute::cast::cast(&**arr, cast_dtype, CastOptionsImpl::default())
171                            .unwrap();
172                    *arr = out;
173                }
174                let schema = replaced_schema
175                    .as_ref()
176                    .map_or(schema, |replaced| replaced.clone());
177                let rb = RecordBatch::new(length, schema, arrays);
178
179                interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)
180            })
181            .collect::<PyResult<_>>()
182    }
183
184    #[allow(unused_variables)]
185    #[pyo3(signature = (requested_schema))]
186    fn __arrow_c_stream__<'py>(
187        &self,
188        py: Python<'py>,
189        requested_schema: Option<Py<PyAny>>,
190    ) -> PyResult<Bound<'py, PyCapsule>> {
191        py.enter_polars_ok(|| {
192            self.df.write().rechunk_mut_par();
193        })?;
194        dataframe_to_stream(&self.df.read(), py)
195    }
196}