Skip to main content

polars_python/dataframe/
general.rs

1use std::hash::BuildHasher;
2
3use arrow::bitmap::MutableBitmap;
4use either::Either;
5use parking_lot::RwLock;
6use polars::prelude::*;
7use polars_ffi::version_0::SeriesExport;
8use pyo3::exceptions::PyIndexError;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::{PyList, PyType};
12
13use self::row_encode::{_get_rows_encoded_ca, _get_rows_encoded_ca_unordered};
14use super::PyDataFrame;
15use crate::PyLazyFrame;
16use crate::conversion::Wrap;
17use crate::error::PyPolarsErr;
18use crate::prelude::strings_to_pl_smallstr;
19use crate::py_modules::polars;
20use crate::series::{PySeries, ToPySeries, ToSeries};
21use crate::utils::{EnterPolarsExt, to_py_err};
22
23#[pymethods]
24impl PyDataFrame {
25    #[new]
26    pub fn __init__(columns: Vec<PySeries>) -> PyResult<Self> {
27        let columns = columns.to_series();
28        // @scalar-opt
29        let columns = columns.into_iter().map(|s| s.into()).collect();
30        let df = DataFrame::new_infer_height(columns).map_err(PyPolarsErr::from)?;
31        Ok(PyDataFrame::new(df))
32    }
33
34    #[staticmethod]
35    pub fn empty_with_height(height: u64) -> PyResult<Self> {
36        Ok(PyDataFrame::new(DataFrame::empty_with_height(
37            IdxSize::try_from(height)
38                .map_err(|_| polars_err!(bigidx, ctx = "DataFrame(height = _)", size = height))
39                .map_err(to_py_err)? as usize,
40        )))
41    }
42
43    pub fn estimated_size(&self) -> usize {
44        self.df.read().estimated_size()
45    }
46
47    pub fn dtype_strings(&self) -> Vec<String> {
48        self.df
49            .read()
50            .columns()
51            .iter()
52            .map(|s| format!("{}", s.dtype()))
53            .collect()
54    }
55
56    pub fn add(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
57        py.enter_polars_df(|| &*self.df.read() + &*s.series.read())
58    }
59
60    pub fn sub(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
61        py.enter_polars_df(|| &*self.df.read() - &*s.series.read())
62    }
63
64    pub fn mul(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
65        py.enter_polars_df(|| &*self.df.read() * &*s.series.read())
66    }
67
68    pub fn div(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
69        py.enter_polars_df(|| &*self.df.read() / &*s.series.read())
70    }
71
72    pub fn rem(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
73        py.enter_polars_df(|| &*self.df.read() % &*s.series.read())
74    }
75
76    pub fn add_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
77        py.enter_polars_df(|| &*self.df.read() + &*s.df.read())
78    }
79
80    pub fn sub_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
81        py.enter_polars_df(|| &*self.df.read() - &*s.df.read())
82    }
83
84    pub fn mul_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
85        py.enter_polars_df(|| &*self.df.read() * &*s.df.read())
86    }
87
88    pub fn div_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
89        py.enter_polars_df(|| &*self.df.read() / &*s.df.read())
90    }
91
92    pub fn rem_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
93        py.enter_polars_df(|| &*self.df.read() % &*s.df.read())
94    }
95
96    #[pyo3(signature = (n, with_replacement, shuffle, seed=None))]
97    pub fn sample_n(
98        &self,
99        py: Python<'_>,
100        n: &PySeries,
101        with_replacement: bool,
102        shuffle: bool,
103        seed: Option<u64>,
104    ) -> PyResult<Self> {
105        py.enter_polars_df(|| {
106            self.df
107                .read()
108                .sample_n(&n.series.read(), with_replacement, shuffle, seed)
109        })
110    }
111
112    #[pyo3(signature = (frac, with_replacement, shuffle, seed=None))]
113    pub fn sample_frac(
114        &self,
115        py: Python<'_>,
116        frac: &PySeries,
117        with_replacement: bool,
118        shuffle: bool,
119        seed: Option<u64>,
120    ) -> PyResult<Self> {
121        py.enter_polars_df(|| {
122            self.df
123                .read()
124                .sample_frac(&frac.series.read(), with_replacement, shuffle, seed)
125        })
126    }
127
128    pub fn rechunk(&self, py: Python) -> PyResult<Self> {
129        py.enter_polars_df(|| {
130            let mut df = self.df.read().clone();
131            df.rechunk_mut_par();
132            Ok(df)
133        })
134    }
135
136    /// Format `DataFrame` as String
137    pub fn as_str(&self) -> String {
138        format!("{:?}", self.df.read())
139    }
140
141    pub fn get_columns(&self) -> Vec<PySeries> {
142        let cols = self.df.read().columns().to_vec();
143        cols.to_pyseries()
144    }
145
146    /// Get column names
147    pub fn columns(&self) -> Vec<String> {
148        self.df
149            .read()
150            .columns()
151            .iter()
152            .map(|s| s.name().to_string())
153            .collect()
154    }
155
156    /// set column names
157    pub fn set_column_names(&self, names: Vec<PyBackedStr>) -> PyResult<()> {
158        self.df
159            .write()
160            .set_column_names(&names)
161            .map_err(PyPolarsErr::from)?;
162        Ok(())
163    }
164
165    /// Get datatypes
166    pub fn dtypes<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
167        let df = self.df.read();
168        let iter = df
169            .columns()
170            .iter()
171            .map(|s| Wrap(s.dtype().clone()).into_pyobject(py).unwrap());
172        PyList::new(py, iter)
173    }
174
175    pub fn n_chunks(&self) -> usize {
176        self.df.read().first_col_n_chunks()
177    }
178
179    pub fn shape(&self) -> (usize, usize) {
180        self.df.read().shape()
181    }
182
183    pub fn height(&self) -> usize {
184        self.df.read().height()
185    }
186
187    pub fn width(&self) -> usize {
188        self.df.read().width()
189    }
190
191    pub fn is_empty(&self) -> bool {
192        self.df.read().shape_has_zero()
193    }
194
195    pub fn hstack(&self, py: Python<'_>, columns: Vec<PySeries>) -> PyResult<Self> {
196        let columns = columns.to_series();
197        // @scalar-opt
198        let columns = columns.into_iter().map(Into::into).collect::<Vec<_>>();
199        py.enter_polars_df(|| self.df.read().hstack(&columns))
200    }
201
202    pub fn hstack_mut(&self, py: Python<'_>, columns: Vec<PySeries>) -> PyResult<()> {
203        let columns = columns.to_series();
204        // @scalar-opt
205        let columns = columns.into_iter().map(Into::into).collect::<Vec<_>>();
206        py.enter_polars(|| self.df.write().hstack_mut(&columns).map(drop))?;
207        Ok(())
208    }
209
210    pub fn vstack(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<Self> {
211        py.enter_polars_df(|| self.df.read().vstack(&other.df.read()))
212    }
213
214    pub fn vstack_mut(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<()> {
215        py.enter_polars(|| {
216            // Prevent self-vstack deadlocks.
217            let other = other.df.read().clone();
218            self.df.write().vstack_mut_owned(other)?;
219            PolarsResult::Ok(())
220        })?;
221        Ok(())
222    }
223
224    pub fn extend(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<()> {
225        py.enter_polars(|| {
226            // Prevent self-extend deadlocks.
227            let other = other.df.read().clone();
228            self.df.write().extend(&other)
229        })?;
230        Ok(())
231    }
232
233    pub fn drop_in_place(&self, name: &str) -> PyResult<PySeries> {
234        let s = self
235            .df
236            .write()
237            .drop_in_place(name)
238            .map_err(PyPolarsErr::from)?;
239        let s = s.take_materialized_series();
240        Ok(PySeries::from(s))
241    }
242
243    pub fn to_series(&self, index: isize) -> PyResult<PySeries> {
244        let df = &self.df.read();
245
246        let index_adjusted = if index < 0 {
247            df.width().checked_sub(index.unsigned_abs())
248        } else {
249            Some(usize::try_from(index).unwrap())
250        };
251
252        let s = index_adjusted.and_then(|i| df.select_at_idx(i));
253        match s {
254            Some(s) => Ok(PySeries::new(s.as_materialized_series().clone())),
255            None => Err(PyIndexError::new_err(
256                polars_err!(oob = index, df.width()).to_string(),
257            )),
258        }
259    }
260
261    pub fn get_column_index(&self, name: &str) -> PyResult<usize> {
262        Ok(self
263            .df
264            .read()
265            .try_get_column_index(name)
266            .map_err(PyPolarsErr::from)?)
267    }
268
269    pub fn get_column(&self, name: &str) -> PyResult<PySeries> {
270        let series = self
271            .df
272            .read()
273            .column(name)
274            .map(|s| PySeries::new(s.as_materialized_series().clone()))
275            .map_err(PyPolarsErr::from)?;
276        Ok(series)
277    }
278
279    pub fn select(&self, py: Python<'_>, columns: Vec<PyBackedStr>) -> PyResult<Self> {
280        py.enter_polars_df(|| self.df.read().select(columns.iter().map(|x| &**x)))
281    }
282
283    pub fn gather(&self, py: Python<'_>, indices: Wrap<Vec<IdxSize>>) -> PyResult<Self> {
284        let indices = indices.0;
285        let indices = IdxCa::from_vec("".into(), indices);
286        py.enter_polars_df(|| self.df.read().take(&indices))
287    }
288
289    pub fn gather_with_series(&self, py: Python<'_>, indices: &PySeries) -> PyResult<Self> {
290        let idx_s = indices.series.read();
291        let indices = idx_s.idx().map_err(PyPolarsErr::from)?;
292        py.enter_polars_df(|| self.df.read().take(indices))
293    }
294
295    pub fn replace(&self, column: &str, new_col: PySeries) -> PyResult<()> {
296        self.df
297            .write()
298            .replace(column, new_col.series.into_inner().into_column())
299            .map_err(PyPolarsErr::from)?;
300        Ok(())
301    }
302
303    pub fn replace_column(&self, index: usize, new_column: PySeries) -> PyResult<()> {
304        self.df
305            .write()
306            .replace_column(index, new_column.series.into_inner().into_column())
307            .map_err(PyPolarsErr::from)?;
308        Ok(())
309    }
310
311    pub fn insert_column(&self, index: usize, column: PySeries) -> PyResult<()> {
312        self.df
313            .write()
314            .insert_column(index, column.series.into_inner().into_column())
315            .map_err(PyPolarsErr::from)?;
316        Ok(())
317    }
318
319    #[pyo3(signature = (offset, length))]
320    pub fn slice(&self, py: Python<'_>, offset: i64, length: Option<usize>) -> PyResult<Self> {
321        py.enter_polars_df(|| {
322            let df = self.df.read();
323            let len = length.unwrap_or(usize::MAX);
324            Ok(df.slice(offset, len))
325        })
326    }
327
328    pub fn head(&self, py: Python<'_>, n: usize) -> PyResult<Self> {
329        py.enter_polars_df(|| Ok(self.df.read().head(Some(n))))
330    }
331
332    pub fn tail(&self, py: Python<'_>, n: usize) -> PyResult<Self> {
333        py.enter_polars_df(|| Ok(self.df.read().tail(Some(n))))
334    }
335
336    pub fn is_unique(&self, py: Python) -> PyResult<PySeries> {
337        py.enter_polars_series(|| self.df.read().is_unique())
338    }
339
340    pub fn is_duplicated(&self, py: Python) -> PyResult<PySeries> {
341        py.enter_polars_series(|| self.df.read().is_duplicated())
342    }
343
344    pub fn equals(&self, py: Python<'_>, other: &PyDataFrame, null_equal: bool) -> PyResult<bool> {
345        if null_equal {
346            py.enter_polars_ok(|| self.df.read().equals_missing(&other.df.read()))
347        } else {
348            py.enter_polars_ok(|| self.df.read().equals(&other.df.read()))
349        }
350    }
351
352    #[pyo3(signature = (name, offset=None))]
353    pub fn with_row_index(
354        &self,
355        py: Python<'_>,
356        name: &str,
357        offset: Option<IdxSize>,
358    ) -> PyResult<Self> {
359        py.enter_polars_df(|| self.df.read().with_row_index(name.into(), offset))
360    }
361
362    pub fn _to_metadata(&self) -> Self {
363        Self {
364            df: RwLock::new(self.df.read()._to_metadata()),
365        }
366    }
367
368    pub fn group_by_map_groups(
369        &self,
370        py: Python<'_>,
371        by: Vec<PyBackedStr>,
372        lambda: Py<PyAny>,
373        maintain_order: bool,
374    ) -> PyResult<Self> {
375        py.enter_polars_df(|| {
376            let df = self.df.read().clone(); // Clone so we can't deadlock on re-entrance from lambda.
377            let gb = if maintain_order {
378                df.group_by_stable(by.iter().map(|x| &**x))
379            } else {
380                df.group_by(by.iter().map(|x| &**x))
381            }?;
382
383            let function = move |df: DataFrame| {
384                Python::attach(|py| {
385                    let pypolars = polars(py).bind(py);
386                    let pydf = PyDataFrame::new(df);
387                    let python_df_wrapper =
388                        pypolars.getattr("wrap_df").unwrap().call1((pydf,)).unwrap();
389
390                    // Call the lambda and get a python-side DataFrame wrapper.
391                    let result_df_wrapper = match lambda.call1(py, (python_df_wrapper,)) {
392                        Ok(pyobj) => pyobj,
393                        Err(e) => {
394                            polars_bail!(ComputeError: "UDF failed: {}", e.value(py))
395                        },
396                    };
397
398                    let pydf = result_df_wrapper
399                        .getattr(py, "_df")
400                        .and_then(|obj| obj.extract::<PyDataFrame>(py).map_err(|e| e.into()))
401                        .map_err(|err| {
402                            polars_err!(
403                                ComputeError:
404                                "failed to extract DataFrame from UDF return value: \
405                                value: {result_df_wrapper:?}, \
406                                error: {err:?}"
407                            )
408                        })?;
409
410                    Ok(pydf.df.into_inner())
411                })
412            };
413
414            gb.apply(function)
415        })
416    }
417
418    #[allow(clippy::should_implement_trait)]
419    pub fn clone(&self) -> Self {
420        Clone::clone(self)
421    }
422
423    #[cfg(feature = "pivot")]
424    #[pyo3(signature = (on, index, value_name=None, variable_name=None))]
425    pub fn unpivot(
426        &self,
427        py: Python<'_>,
428        on: Option<Vec<PyBackedStr>>,
429        index: Vec<PyBackedStr>,
430        value_name: Option<&str>,
431        variable_name: Option<&str>,
432    ) -> PyResult<Self> {
433        use polars_ops::unpivot::UnpivotDF;
434        let args = UnpivotArgsIR::new(
435            self.df.read().get_column_names_owned(),
436            on.map(strings_to_pl_smallstr),
437            strings_to_pl_smallstr(index),
438            value_name.map(|s| s.into()),
439            variable_name.map(|s| s.into()),
440        );
441
442        py.enter_polars_df(|| self.df.read().unpivot2(args))
443    }
444
445    pub fn partition_by(
446        &self,
447        py: Python<'_>,
448        by: Vec<String>,
449        maintain_order: bool,
450        include_key: bool,
451    ) -> PyResult<Vec<Self>> {
452        let out = py.enter_polars(|| {
453            if maintain_order {
454                self.df.read().partition_by_stable(by, include_key)
455            } else {
456                self.df.read().partition_by(by, include_key)
457            }
458        })?;
459
460        Ok(out.into_iter().map(PyDataFrame::from).collect())
461    }
462
463    pub fn lazy(&self) -> PyLazyFrame {
464        self.df.read().clone().lazy().into()
465    }
466
467    #[pyo3(signature = (columns, separator, drop_first, drop_nulls))]
468    pub fn to_dummies(
469        &self,
470        py: Python<'_>,
471        columns: Option<Vec<String>>,
472        separator: Option<&str>,
473        drop_first: bool,
474        drop_nulls: bool,
475    ) -> PyResult<Self> {
476        py.enter_polars_df(|| match columns {
477            Some(cols) => self.df.read().columns_to_dummies(
478                cols.iter().map(|x| x as &str).collect(),
479                separator,
480                drop_first,
481                drop_nulls,
482            ),
483            None => self.df.read().to_dummies(separator, drop_first, drop_nulls),
484        })
485    }
486
487    pub fn null_count(&self, py: Python) -> PyResult<Self> {
488        py.enter_polars_df(|| Ok(self.df.read().null_count()))
489    }
490
491    pub fn shrink_to_fit(&self, py: Python) -> PyResult<()> {
492        py.enter_polars_ok(|| self.df.write().shrink_to_fit())
493    }
494
495    pub fn hash_rows(
496        &self,
497        py: Python<'_>,
498        k0: u64,
499        k1: u64,
500        k2: u64,
501        k3: u64,
502    ) -> PyResult<PySeries> {
503        // TODO: don't expose all these seeds.
504        let seed = PlFixedStateQuality::default().hash_one((k0, k1, k2, k3));
505        let hb = PlSeedableRandomStateQuality::seed_from_u64(seed);
506        py.enter_polars_series(|| self.df.write().hash_rows(Some(hb)))
507    }
508
509    #[pyo3(signature = (keep_names_as, column_names))]
510    pub fn transpose(
511        &self,
512        py: Python<'_>,
513        keep_names_as: Option<&str>,
514        column_names: &Bound<PyAny>,
515    ) -> PyResult<Self> {
516        let new_col_names = if let Ok(name) = column_names.extract::<Vec<String>>() {
517            Some(Either::Right(name))
518        } else if let Ok(name) = column_names.extract::<String>() {
519            Some(Either::Left(name))
520        } else {
521            None
522        };
523        py.enter_polars_df(|| self.df.write().transpose(keep_names_as, new_col_names))
524    }
525
526    pub fn upsample(
527        &self,
528        py: Python<'_>,
529        by: Vec<String>,
530        index_column: &str,
531        every: &str,
532        stable: bool,
533    ) -> PyResult<Self> {
534        let every = Duration::try_parse(every).map_err(PyPolarsErr::from)?;
535        py.enter_polars_df(|| {
536            if stable {
537                self.df.read().upsample_stable(by, index_column, every)
538            } else {
539                self.df.read().upsample(by, index_column, every)
540            }
541        })
542    }
543
544    pub fn to_struct(
545        &self,
546        py: Python<'_>,
547        name: &str,
548        invalid_indices: Vec<usize>,
549    ) -> PyResult<PySeries> {
550        py.enter_polars_series(|| {
551            let mut ca = self.df.read().clone().into_struct(name.into());
552
553            if !invalid_indices.is_empty() {
554                let mut validity = MutableBitmap::with_capacity(ca.len());
555                validity.extend_constant(ca.len(), true);
556                for i in invalid_indices {
557                    validity.set(i, false);
558                }
559                ca.rechunk_mut();
560                Ok(ca.with_outer_validity(Some(validity.freeze())))
561            } else {
562                Ok(ca)
563            }
564        })
565    }
566
567    pub fn clear(&self, py: Python) -> PyResult<Self> {
568        py.enter_polars_df(|| Ok(self.df.read().clear()))
569    }
570
571    /// Export the columns via polars-ffi
572    /// # Safety
573    /// Needs a preallocated *mut SeriesExport that has allocated space for n_columns.
574    pub unsafe fn _export_columns(&self, location: usize) {
575        use polars_ffi::version_0::export_column;
576
577        let df = self.df.read();
578        let cols = df.columns();
579
580        let location = location as *mut SeriesExport;
581
582        for (i, col) in cols.iter().enumerate() {
583            let e = export_column(col);
584            // SAFETY:
585            // Caller should ensure address is allocated.
586            // Be careful not to drop `e` here as that should be dropped by the ffi consumer
587            unsafe { core::ptr::write(location.add(i), e) };
588        }
589    }
590
591    /// Import [`Self`] via polars-ffi
592    /// # Safety
593    /// [`location`] should be an address that contains [`width`] properly initialized
594    /// [`SeriesExport`]s
595    #[classmethod]
596    pub unsafe fn _import_columns(
597        _cls: &Bound<PyType>,
598        location: usize,
599        width: usize,
600    ) -> PyResult<Self> {
601        use polars_ffi::version_0::import_df;
602
603        let location = location as *mut SeriesExport;
604
605        let df = unsafe { import_df(location, width) }.map_err(PyPolarsErr::from)?;
606        Ok(PyDataFrame::from(df))
607    }
608
609    /// Internal utility function to allow direct access to the row encoding from python.
610    #[pyo3(signature = (opts))]
611    fn _row_encode(&self, py: Python<'_>, opts: Vec<(bool, bool, bool)>) -> PyResult<PySeries> {
612        py.enter_polars_series(|| {
613            let name = PlSmallStr::from_static("row_enc");
614            let is_unordered = opts.first().is_some_and(|(_, _, v)| *v);
615
616            let ca = if is_unordered {
617                _get_rows_encoded_ca_unordered(name, self.df.read().columns())
618            } else {
619                let descending = opts.iter().map(|(v, _, _)| *v).collect::<Vec<_>>();
620                let nulls_last = opts.iter().map(|(_, v, _)| *v).collect::<Vec<_>>();
621
622                _get_rows_encoded_ca(
623                    name,
624                    self.df.read().columns(),
625                    descending.as_slice(),
626                    nulls_last.as_slice(),
627                    false,
628                )
629            }?;
630
631            Ok(ca)
632        })
633    }
634}