polars_python/dataframe/
export.rs

1use arrow::datatypes::IntegerType;
2use arrow::record_batch::RecordBatch;
3use parking_lot::RwLockWriteGuard;
4use polars::prelude::*;
5use polars_compute::cast::CastOptionsImpl;
6use pyo3::IntoPyObjectExt;
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyList, PyTuple};
9
10use super::PyDataFrame;
11use crate::conversion::{ObjectValue, Wrap};
12use crate::error::PyPolarsErr;
13use crate::interop;
14use crate::interop::arrow::to_py::dataframe_to_stream;
15use crate::prelude::PyCompatLevel;
16use crate::utils::EnterPolarsExt;
17
18#[pymethods]
19impl PyDataFrame {
20    #[cfg(feature = "object")]
21    pub fn row_tuple<'py>(&self, idx: i64, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
22        let df = self.df.read();
23        let idx = if idx < 0 {
24            (df.height() as i64 + idx) as usize
25        } else {
26            idx as usize
27        };
28        if idx >= df.height() {
29            return Err(PyPolarsErr::from(polars_err!(oob = idx, df.height())).into());
30        }
31        PyTuple::new(
32            py,
33            df.get_columns().iter().map(|s| match s.dtype() {
34                DataType::Object(_) => {
35                    let obj: Option<&ObjectValue> = s.get_object(idx).map(|any| any.into());
36                    obj.into_py_any(py).unwrap()
37                },
38                _ => Wrap(s.get(idx).unwrap()).into_py_any(py).unwrap(),
39            }),
40        )
41    }
42
43    #[cfg(feature = "object")]
44    pub fn row_tuples<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
45        let df = self.df.read();
46        let mut rechunked;
47        // Rechunk if random access would become rather expensive.
48        // TODO: iterate over the chunks directly instead of using random access.
49        let df = if df.max_n_chunks() > 16 {
50            rechunked = df.clone();
51            rechunked.as_single_chunk_par();
52            &rechunked
53        } else {
54            &df
55        };
56        PyList::new(
57            py,
58            (0..df.height()).map(|idx| {
59                PyTuple::new(
60                    py,
61                    df.get_columns().iter().map(|c| match c.dtype() {
62                        DataType::Null => py.None(),
63                        DataType::Object(_) => {
64                            let obj: Option<&ObjectValue> = c.get_object(idx).map(|any| any.into());
65                            obj.into_py_any(py).unwrap()
66                        },
67                        _ => {
68                            // SAFETY: we are in bounds.
69                            let av = unsafe { c.get_unchecked(idx) };
70                            Wrap(av).into_py_any(py).unwrap()
71                        },
72                    }),
73                )
74                .unwrap()
75            }),
76        )
77    }
78
79    #[allow(clippy::wrong_self_convention)]
80    pub fn to_arrow(&self, py: Python<'_>, compat_level: PyCompatLevel) -> PyResult<Vec<PyObject>> {
81        let mut df = self.df.write();
82        let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
83        py.enter_polars_ok(|| dfr.align_chunks_par())?;
84        let df = RwLockWriteGuard::downgrade(df);
85
86        let pyarrow = py.import("pyarrow")?;
87
88        let mut chunks = df.iter_chunks(compat_level.0, true);
89        let mut rbs = Vec::with_capacity(chunks.size_hint().0);
90        // df.iter_chunks() iteration could internally try to acquire the GIL on another thread,
91        // so we make sure to run chunks.next() within enter_polars().
92        while let Some(rb) = py.enter_polars_ok(|| chunks.next())? {
93            let rb = interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)?;
94            rbs.push(rb);
95        }
96        Ok(rbs)
97    }
98
99    /// Create a `Vec` of PyArrow RecordBatch instances.
100    ///
101    /// Note this will give bad results for columns with dtype `pl.Object`,
102    /// since those can't be converted correctly via PyArrow. The calling Python
103    /// code should make sure these are not included.
104    #[allow(clippy::wrong_self_convention)]
105    pub fn to_pandas(&self, py: Python) -> PyResult<Vec<PyObject>> {
106        let mut df = self.df.write();
107        let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
108        py.enter_polars_ok(|| dfr.as_single_chunk_par())?;
109        let df = RwLockWriteGuard::downgrade(df);
110        Python::with_gil(|py| {
111            let pyarrow = py.import("pyarrow")?;
112            let cat_columns = df
113                .get_columns()
114                .iter()
115                .enumerate()
116                .filter(|(_i, s)| {
117                    matches!(
118                        s.dtype(),
119                        DataType::Categorical(_, _) | DataType::Enum(_, _)
120                    )
121                })
122                .map(|(i, _)| i)
123                .collect::<Vec<_>>();
124
125            let enum_and_categorical_dtype = ArrowDataType::Dictionary(
126                IntegerType::Int64,
127                Box::new(ArrowDataType::LargeUtf8),
128                false,
129            );
130
131            let mut replaced_schema = None;
132            let rbs = df
133                .iter_chunks(CompatLevel::oldest(), true)
134                .map(|rb| {
135                    let length = rb.len();
136                    let (schema, mut arrays) = rb.into_schema_and_arrays();
137
138                    // Pandas does not allow unsigned dictionary indices so we replace them.
139                    replaced_schema =
140                        (replaced_schema.is_none() && !cat_columns.is_empty()).then(|| {
141                            let mut schema = schema.as_ref().clone();
142                            for i in &cat_columns {
143                                let (_, field) = schema.get_at_index_mut(*i).unwrap();
144                                field.dtype = enum_and_categorical_dtype.clone();
145                            }
146                            Arc::new(schema)
147                        });
148
149                    for i in &cat_columns {
150                        let arr = arrays.get_mut(*i).unwrap();
151                        let out = polars_compute::cast::cast(
152                            &**arr,
153                            &enum_and_categorical_dtype,
154                            CastOptionsImpl::default(),
155                        )
156                        .unwrap();
157                        *arr = out;
158                    }
159                    let schema = replaced_schema
160                        .as_ref()
161                        .map_or(schema, |replaced| replaced.clone());
162                    let rb = RecordBatch::new(length, schema, arrays);
163
164                    interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)
165                })
166                .collect::<PyResult<_>>()?;
167            Ok(rbs)
168        })
169    }
170
171    #[allow(unused_variables)]
172    #[pyo3(signature = (requested_schema))]
173    fn __arrow_c_stream__<'py>(
174        &self,
175        py: Python<'py>,
176        requested_schema: Option<PyObject>,
177    ) -> PyResult<Bound<'py, PyCapsule>> {
178        let mut df = self.df.write();
179        let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
180        py.enter_polars_ok(|| dfr.as_single_chunk_par())?;
181        let df = RwLockWriteGuard::downgrade(df);
182        dataframe_to_stream(&df, py)
183    }
184}