polars_python/dataframe/
export.rs

1use arrow::datatypes::IntegerType;
2use arrow::record_batch::RecordBatch;
3use parking_lot::RwLockWriteGuard;
4use polars::prelude::*;
5use polars_compute::cast::CastOptionsImpl;
6use pyo3::IntoPyObjectExt;
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyList, PyTuple};
9
10use super::PyDataFrame;
11use crate::conversion::{ObjectValue, Wrap};
12use crate::error::PyPolarsErr;
13use crate::interop;
14use crate::interop::arrow::to_py::dataframe_to_stream;
15use crate::prelude::PyCompatLevel;
16use crate::utils::EnterPolarsExt;
17
18#[pymethods]
19impl PyDataFrame {
20    #[cfg(feature = "object")]
21    pub fn row_tuple<'py>(&self, idx: i64, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
22        let df = self.df.read();
23        let idx = if idx < 0 {
24            (df.height() as i64 + idx) as usize
25        } else {
26            idx as usize
27        };
28        if idx >= df.height() {
29            return Err(PyPolarsErr::from(polars_err!(oob = idx, df.height())).into());
30        }
31        PyTuple::new(
32            py,
33            df.get_columns().iter().map(|s| match s.dtype() {
34                DataType::Object(_) => {
35                    let obj: Option<&ObjectValue> = s.get_object(idx).map(|any| any.into());
36                    obj.into_py_any(py).unwrap()
37                },
38                _ => Wrap(s.get(idx).unwrap()).into_py_any(py).unwrap(),
39            }),
40        )
41    }
42
43    #[cfg(feature = "object")]
44    pub fn row_tuples<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
45        let df = self.df.read();
46        let mut rechunked;
47        // Rechunk if random access would become rather expensive.
48        // TODO: iterate over the chunks directly instead of using random access.
49        let df = if df.max_n_chunks() > 16 {
50            rechunked = df.clone();
51            rechunked.as_single_chunk_par();
52            &rechunked
53        } else {
54            &df
55        };
56        PyList::new(
57            py,
58            (0..df.height()).map(|idx| {
59                PyTuple::new(
60                    py,
61                    df.get_columns().iter().map(|c| match c.dtype() {
62                        DataType::Null => py.None(),
63                        DataType::Object(_) => {
64                            let obj: Option<&ObjectValue> = c.get_object(idx).map(|any| any.into());
65                            obj.into_py_any(py).unwrap()
66                        },
67                        _ => {
68                            // SAFETY: we are in bounds.
69                            let av = unsafe { c.get_unchecked(idx) };
70                            Wrap(av).into_py_any(py).unwrap()
71                        },
72                    }),
73                )
74                .unwrap()
75            }),
76        )
77    }
78
79    #[allow(clippy::wrong_self_convention)]
80    pub fn to_arrow(
81        &self,
82        py: Python<'_>,
83        compat_level: PyCompatLevel,
84    ) -> PyResult<Vec<Py<PyAny>>> {
85        let mut df = self.df.write();
86        let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
87        py.enter_polars_ok(|| dfr.align_chunks_par())?;
88        let df = RwLockWriteGuard::downgrade(df);
89
90        let pyarrow = py.import("pyarrow")?;
91
92        let mut chunks = df.iter_chunks(compat_level.0, true);
93        let mut rbs = Vec::with_capacity(chunks.size_hint().0);
94        // df.iter_chunks() iteration could internally try to acquire the GIL on another thread,
95        // so we make sure to run chunks.next() within enter_polars().
96        while let Some(rb) = py.enter_polars_ok(|| chunks.next())? {
97            let rb = interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)?;
98            rbs.push(rb);
99        }
100        Ok(rbs)
101    }
102
103    /// Create a `Vec` of PyArrow RecordBatch instances.
104    ///
105    /// Note this will give bad results for columns with dtype `pl.Object`,
106    /// since those can't be converted correctly via PyArrow. The calling Python
107    /// code should make sure these are not included.
108    #[allow(clippy::wrong_self_convention)]
109    pub fn to_pandas(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
110        let mut df = self.df.write();
111        let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
112        py.enter_polars_ok(|| dfr.as_single_chunk_par())?;
113        let df = RwLockWriteGuard::downgrade(df);
114        Python::attach(|py| {
115            let pyarrow = py.import("pyarrow")?;
116            let cat_columns = df
117                .get_columns()
118                .iter()
119                .enumerate()
120                .filter(|(_i, s)| {
121                    matches!(
122                        s.dtype(),
123                        DataType::Categorical(_, _) | DataType::Enum(_, _)
124                    )
125                })
126                .map(|(i, _)| i)
127                .collect::<Vec<_>>();
128
129            let enum_and_categorical_dtype = ArrowDataType::Dictionary(
130                IntegerType::Int64,
131                Box::new(ArrowDataType::LargeUtf8),
132                false,
133            );
134
135            let mut replaced_schema = None;
136            let rbs = df
137                .iter_chunks(CompatLevel::oldest(), true)
138                .map(|rb| {
139                    let length = rb.len();
140                    let (schema, mut arrays) = rb.into_schema_and_arrays();
141
142                    // Pandas does not allow unsigned dictionary indices so we replace them.
143                    replaced_schema =
144                        (replaced_schema.is_none() && !cat_columns.is_empty()).then(|| {
145                            let mut schema = schema.as_ref().clone();
146                            for i in &cat_columns {
147                                let (_, field) = schema.get_at_index_mut(*i).unwrap();
148                                field.dtype = enum_and_categorical_dtype.clone();
149                            }
150                            Arc::new(schema)
151                        });
152
153                    for i in &cat_columns {
154                        let arr = arrays.get_mut(*i).unwrap();
155                        let out = polars_compute::cast::cast(
156                            &**arr,
157                            &enum_and_categorical_dtype,
158                            CastOptionsImpl::default(),
159                        )
160                        .unwrap();
161                        *arr = out;
162                    }
163                    let schema = replaced_schema
164                        .as_ref()
165                        .map_or(schema, |replaced| replaced.clone());
166                    let rb = RecordBatch::new(length, schema, arrays);
167
168                    interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)
169                })
170                .collect::<PyResult<_>>()?;
171            Ok(rbs)
172        })
173    }
174
175    #[allow(unused_variables)]
176    #[pyo3(signature = (requested_schema))]
177    fn __arrow_c_stream__<'py>(
178        &self,
179        py: Python<'py>,
180        requested_schema: Option<Py<PyAny>>,
181    ) -> PyResult<Bound<'py, PyCapsule>> {
182        py.enter_polars_ok(|| {
183            self.df.write().as_single_chunk_par();
184        })?;
185        dataframe_to_stream(&self.df.read(), py)
186    }
187}