1use std::mem::ManuallyDrop;
2
3use arrow::bitmap::MutableBitmap;
4use either::Either;
5use polars::prelude::*;
6#[cfg(feature = "pivot")]
7use polars_lazy::frame::pivot::{pivot, pivot_stable};
8use pyo3::exceptions::PyIndexError;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::PyList;
12use pyo3::IntoPyObjectExt;
13
14use self::row_encode::{_get_rows_encoded_ca, _get_rows_encoded_ca_unordered};
15use super::PyDataFrame;
16use crate::conversion::Wrap;
17use crate::error::PyPolarsErr;
18use crate::map::dataframe::{
19 apply_lambda_unknown, apply_lambda_with_bool_out_type, apply_lambda_with_primitive_out_type,
20 apply_lambda_with_string_out_type,
21};
22use crate::prelude::strings_to_pl_smallstr;
23use crate::py_modules::polars;
24use crate::series::{PySeries, ToPySeries, ToSeries};
25use crate::{PyExpr, PyLazyFrame};
26
27#[pymethods]
28impl PyDataFrame {
29 #[new]
30 pub fn __init__(columns: Vec<PySeries>) -> PyResult<Self> {
31 let columns = columns.to_series();
32 let columns = columns.into_iter().map(|s| s.into()).collect();
34 let df = DataFrame::new(columns).map_err(PyPolarsErr::from)?;
35 Ok(PyDataFrame::new(df))
36 }
37
38 pub fn estimated_size(&self) -> usize {
39 self.df.estimated_size()
40 }
41
42 pub fn dtype_strings(&self) -> Vec<String> {
43 self.df
44 .get_columns()
45 .iter()
46 .map(|s| format!("{}", s.dtype()))
47 .collect()
48 }
49
50 pub fn add(&self, py: Python, s: &PySeries) -> PyResult<Self> {
51 let df = py
52 .allow_threads(|| &self.df + &s.series)
53 .map_err(PyPolarsErr::from)?;
54 Ok(df.into())
55 }
56
57 pub fn sub(&self, py: Python, s: &PySeries) -> PyResult<Self> {
58 let df = py
59 .allow_threads(|| &self.df - &s.series)
60 .map_err(PyPolarsErr::from)?;
61 Ok(df.into())
62 }
63
64 pub fn div(&self, py: Python, s: &PySeries) -> PyResult<Self> {
65 let df = py
66 .allow_threads(|| &self.df / &s.series)
67 .map_err(PyPolarsErr::from)?;
68 Ok(df.into())
69 }
70
71 pub fn mul(&self, py: Python, s: &PySeries) -> PyResult<Self> {
72 let df = py
73 .allow_threads(|| &self.df * &s.series)
74 .map_err(PyPolarsErr::from)?;
75 Ok(df.into())
76 }
77
78 pub fn rem(&self, py: Python, s: &PySeries) -> PyResult<Self> {
79 let df = py
80 .allow_threads(|| &self.df % &s.series)
81 .map_err(PyPolarsErr::from)?;
82 Ok(df.into())
83 }
84
85 pub fn add_df(&self, py: Python, s: &Self) -> PyResult<Self> {
86 let df = py
87 .allow_threads(|| &self.df + &s.df)
88 .map_err(PyPolarsErr::from)?;
89 Ok(df.into())
90 }
91
92 pub fn sub_df(&self, py: Python, s: &Self) -> PyResult<Self> {
93 let df = py
94 .allow_threads(|| &self.df - &s.df)
95 .map_err(PyPolarsErr::from)?;
96 Ok(df.into())
97 }
98
99 pub fn div_df(&self, py: Python, s: &Self) -> PyResult<Self> {
100 let df = py
101 .allow_threads(|| &self.df / &s.df)
102 .map_err(PyPolarsErr::from)?;
103 Ok(df.into())
104 }
105
106 pub fn mul_df(&self, py: Python, s: &Self) -> PyResult<Self> {
107 let df = py
108 .allow_threads(|| &self.df * &s.df)
109 .map_err(PyPolarsErr::from)?;
110 Ok(df.into())
111 }
112
113 pub fn rem_df(&self, py: Python, s: &Self) -> PyResult<Self> {
114 let df = py
115 .allow_threads(|| &self.df % &s.df)
116 .map_err(PyPolarsErr::from)?;
117 Ok(df.into())
118 }
119
120 #[pyo3(signature = (n, with_replacement, shuffle, seed=None))]
121 pub fn sample_n(
122 &self,
123 py: Python,
124 n: &PySeries,
125 with_replacement: bool,
126 shuffle: bool,
127 seed: Option<u64>,
128 ) -> PyResult<Self> {
129 let df = py
130 .allow_threads(|| self.df.sample_n(&n.series, with_replacement, shuffle, seed))
131 .map_err(PyPolarsErr::from)?;
132 Ok(df.into())
133 }
134
135 #[pyo3(signature = (frac, with_replacement, shuffle, seed=None))]
136 pub fn sample_frac(
137 &self,
138 py: Python,
139 frac: &PySeries,
140 with_replacement: bool,
141 shuffle: bool,
142 seed: Option<u64>,
143 ) -> PyResult<Self> {
144 let df = py
145 .allow_threads(|| {
146 self.df
147 .sample_frac(&frac.series, with_replacement, shuffle, seed)
148 })
149 .map_err(PyPolarsErr::from)?;
150 Ok(df.into())
151 }
152
153 pub fn rechunk(&self, py: Python) -> Self {
154 let mut df = self.df.clone();
155 py.allow_threads(|| df.as_single_chunk_par());
156 df.into()
157 }
158
159 pub fn as_str(&self) -> String {
161 format!("{:?}", self.df)
162 }
163
164 pub fn get_columns(&self) -> Vec<PySeries> {
165 let cols = self.df.get_columns().to_vec();
166 cols.to_pyseries()
167 }
168
169 pub fn columns(&self) -> Vec<&str> {
171 self.df.get_column_names_str()
172 }
173
174 pub fn set_column_names(&mut self, names: Vec<PyBackedStr>) -> PyResult<()> {
176 self.df
177 .set_column_names(names.iter().map(|x| &**x))
178 .map_err(PyPolarsErr::from)?;
179 Ok(())
180 }
181
182 pub fn dtypes<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
184 let iter = self
185 .df
186 .iter()
187 .map(|s| Wrap(s.dtype().clone()).into_pyobject(py).unwrap());
188 PyList::new(py, iter)
189 }
190
191 pub fn n_chunks(&self) -> usize {
192 self.df.first_col_n_chunks()
193 }
194
195 pub fn shape(&self) -> (usize, usize) {
196 self.df.shape()
197 }
198
199 pub fn height(&self) -> usize {
200 self.df.height()
201 }
202
203 pub fn width(&self) -> usize {
204 self.df.width()
205 }
206
207 pub fn is_empty(&self) -> bool {
208 self.df.is_empty()
209 }
210
211 pub fn hstack(&self, py: Python, columns: Vec<PySeries>) -> PyResult<Self> {
212 let columns = columns.to_series();
213 let columns = columns.into_iter().map(Into::into).collect::<Vec<_>>();
215 let df = py
216 .allow_threads(|| self.df.hstack(&columns))
217 .map_err(PyPolarsErr::from)?;
218 Ok(df.into())
219 }
220
221 pub fn hstack_mut(&mut self, py: Python, columns: Vec<PySeries>) -> PyResult<()> {
222 let columns = columns.to_series();
223 let columns = columns.into_iter().map(Into::into).collect::<Vec<_>>();
225 py.allow_threads(|| self.df.hstack_mut(&columns))
226 .map_err(PyPolarsErr::from)?;
227 Ok(())
228 }
229
230 pub fn vstack(&self, py: Python, other: &PyDataFrame) -> PyResult<Self> {
231 let df = py
232 .allow_threads(|| self.df.vstack(&other.df))
233 .map_err(PyPolarsErr::from)?;
234 Ok(df.into())
235 }
236
237 pub fn vstack_mut(&mut self, py: Python, other: &PyDataFrame) -> PyResult<()> {
238 py.allow_threads(|| self.df.vstack_mut(&other.df))
239 .map_err(PyPolarsErr::from)?;
240 Ok(())
241 }
242
243 pub fn extend(&mut self, py: Python, other: &PyDataFrame) -> PyResult<()> {
244 py.allow_threads(|| self.df.extend(&other.df))
245 .map_err(PyPolarsErr::from)?;
246 Ok(())
247 }
248
249 pub fn drop_in_place(&mut self, name: &str) -> PyResult<PySeries> {
250 let s = self.df.drop_in_place(name).map_err(PyPolarsErr::from)?;
251 let s = s.take_materialized_series();
252 Ok(PySeries { series: s })
253 }
254
255 pub fn to_series(&self, index: isize) -> PyResult<PySeries> {
256 let df = &self.df;
257
258 let index_adjusted = if index < 0 {
259 df.width().checked_sub(index.unsigned_abs())
260 } else {
261 Some(usize::try_from(index).unwrap())
262 };
263
264 let s = index_adjusted.and_then(|i| df.select_at_idx(i));
265 match s {
266 Some(s) => Ok(PySeries::new(s.as_materialized_series().clone())),
267 None => Err(PyIndexError::new_err(
268 polars_err!(oob = index, df.width()).to_string(),
269 )),
270 }
271 }
272
273 pub fn get_column_index(&self, name: &str) -> PyResult<usize> {
274 Ok(self
275 .df
276 .try_get_column_index(name)
277 .map_err(PyPolarsErr::from)?)
278 }
279
280 pub fn get_column(&self, name: &str) -> PyResult<PySeries> {
281 let series = self
282 .df
283 .column(name)
284 .map(|s| PySeries::new(s.as_materialized_series().clone()))
285 .map_err(PyPolarsErr::from)?;
286 Ok(series)
287 }
288
289 pub fn select(&self, py: Python, columns: Vec<PyBackedStr>) -> PyResult<Self> {
290 let df = py
291 .allow_threads(|| self.df.select(columns.iter().map(|x| &**x)))
292 .map_err(PyPolarsErr::from)?;
293 Ok(PyDataFrame::new(df))
294 }
295
296 pub fn gather(&self, py: Python, indices: Wrap<Vec<IdxSize>>) -> PyResult<Self> {
297 let indices = indices.0;
298 let indices = IdxCa::from_vec("".into(), indices);
299 let df = Python::allow_threads(py, || self.df.take(&indices).map_err(PyPolarsErr::from))?;
300 Ok(PyDataFrame::new(df))
301 }
302
303 pub fn gather_with_series(&self, py: Python, indices: &PySeries) -> PyResult<Self> {
304 let indices = indices.series.idx().map_err(PyPolarsErr::from)?;
305 let df = Python::allow_threads(py, || self.df.take(indices).map_err(PyPolarsErr::from))?;
306 Ok(PyDataFrame::new(df))
307 }
308
309 pub fn replace(&mut self, column: &str, new_col: PySeries) -> PyResult<()> {
310 self.df
311 .replace(column, new_col.series)
312 .map_err(PyPolarsErr::from)?;
313 Ok(())
314 }
315
316 pub fn replace_column(&mut self, index: usize, new_column: PySeries) -> PyResult<()> {
317 self.df
318 .replace_column(index, new_column.series)
319 .map_err(PyPolarsErr::from)?;
320 Ok(())
321 }
322
323 pub fn insert_column(&mut self, index: usize, column: PySeries) -> PyResult<()> {
324 self.df
325 .insert_column(index, column.series)
326 .map_err(PyPolarsErr::from)?;
327 Ok(())
328 }
329
330 #[pyo3(signature = (offset, length=None))]
331 pub fn slice(&self, py: Python, offset: i64, length: Option<usize>) -> Self {
332 let df = py.allow_threads(|| {
333 self.df
334 .slice(offset, length.unwrap_or_else(|| self.df.height()))
335 });
336 df.into()
337 }
338
339 pub fn head(&self, py: Python, n: usize) -> Self {
340 let df = py.allow_threads(|| self.df.head(Some(n)));
341 PyDataFrame::new(df)
342 }
343
344 pub fn tail(&self, py: Python, n: usize) -> Self {
345 let df = py.allow_threads(|| self.df.tail(Some(n)));
346 PyDataFrame::new(df)
347 }
348
349 pub fn is_unique(&self, py: Python) -> PyResult<PySeries> {
350 let mask = py
351 .allow_threads(|| self.df.is_unique())
352 .map_err(PyPolarsErr::from)?;
353 Ok(mask.into_series().into())
354 }
355
356 pub fn is_duplicated(&self, py: Python) -> PyResult<PySeries> {
357 let mask = py
358 .allow_threads(|| self.df.is_duplicated())
359 .map_err(PyPolarsErr::from)?;
360 Ok(mask.into_series().into())
361 }
362
363 pub fn equals(&self, py: Python, other: &PyDataFrame, null_equal: bool) -> bool {
364 if null_equal {
365 py.allow_threads(|| self.df.equals_missing(&other.df))
366 } else {
367 py.allow_threads(|| self.df.equals(&other.df))
368 }
369 }
370
371 #[pyo3(signature = (name, offset=None))]
372 pub fn with_row_index(
373 &self,
374 py: Python,
375 name: &str,
376 offset: Option<IdxSize>,
377 ) -> PyResult<Self> {
378 let df = py
379 .allow_threads(|| self.df.with_row_index(name.into(), offset))
380 .map_err(PyPolarsErr::from)?;
381 Ok(df.into())
382 }
383
384 pub fn _to_metadata(&self) -> Self {
385 Self {
386 df: self.df._to_metadata(),
387 }
388 }
389
390 pub fn group_by_map_groups(
391 &self,
392 by: Vec<PyBackedStr>,
393 lambda: PyObject,
394 maintain_order: bool,
395 ) -> PyResult<Self> {
396 let gb = if maintain_order {
397 self.df.group_by_stable(by.iter().map(|x| &**x))
398 } else {
399 self.df.group_by(by.iter().map(|x| &**x))
400 }
401 .map_err(PyPolarsErr::from)?;
402
403 let function = move |df: DataFrame| {
404 Python::with_gil(|py| {
405 let pypolars = polars(py).bind(py);
406 let pydf = PyDataFrame::new(df);
407 let python_df_wrapper =
408 pypolars.getattr("wrap_df").unwrap().call1((pydf,)).unwrap();
409
410 let result_df_wrapper = match lambda.call1(py, (python_df_wrapper,)) {
412 Ok(pyobj) => pyobj,
413 Err(e) => panic!("UDF failed: {}", e.value(py)),
414 };
415 let py_pydf = result_df_wrapper.getattr(py, "_df").expect(
416 "Could not get DataFrame attribute '_df'. Make sure that you return a DataFrame object.",
417 );
418
419 let pydf = py_pydf.extract::<PyDataFrame>(py).unwrap();
420 Ok(pydf.df)
421 })
422 };
423 let df = gb.apply(function).map_err(PyPolarsErr::from)?;
428
429 Ok(df.into())
430 }
431
432 #[allow(clippy::should_implement_trait)]
433 pub fn clone(&self) -> Self {
434 PyDataFrame::new(self.df.clone())
435 }
436
/// Unpivot the frame via `UnpivotDF::unpivot2`.
///
/// `on` and `index` are converted with `strings_to_pl_smallstr`; the optional
/// `value_name` and `variable_name` default to `None` on the Python side.
/// Runs inside `py.allow_threads`; Polars errors are converted through `PyPolarsErr`.
#[cfg(feature = "pivot")]
#[pyo3(signature = (on, index, value_name=None, variable_name=None))]
pub fn unpivot(
    &self,
    py: Python,
    on: Vec<PyBackedStr>,
    index: Vec<PyBackedStr>,
    value_name: Option<&str>,
    variable_name: Option<&str>,
) -> PyResult<Self> {
    use polars_ops::pivot::UnpivotDF;

    let on = strings_to_pl_smallstr(on);
    let index = strings_to_pl_smallstr(index);
    let args = UnpivotArgsIR {
        on,
        index,
        value_name: value_name.map(Into::into),
        variable_name: variable_name.map(Into::into),
    };

    let unpivoted = py.allow_threads(|| self.df.unpivot2(args));
    Ok(PyDataFrame::new(unpivoted.map_err(PyPolarsErr::from)?))
}
460
/// Pivot the frame with `pivot_stable` (when `maintain_order`) or `pivot`.
///
/// The optional aggregate expression is unwrapped to its inner expression
/// before the call. Runs inside `py.allow_threads`; Polars errors are
/// converted through `PyPolarsErr`.
#[cfg(feature = "pivot")]
#[pyo3(signature = (on, index, values, maintain_order, sort_columns, aggregate_expr, separator))]
pub fn pivot_expr(
    &self,
    py: Python,
    on: Vec<String>,
    index: Option<Vec<String>>,
    values: Option<Vec<String>>,
    maintain_order: bool,
    sort_columns: bool,
    aggregate_expr: Option<PyExpr>,
    separator: Option<&str>,
) -> PyResult<Self> {
    let agg = aggregate_expr.map(|wrapped| wrapped.inner);
    let pivoted = py
        .allow_threads(|| {
            let frame = &self.df;
            if maintain_order {
                pivot_stable(frame, on, index, values, sort_columns, agg, separator)
            } else {
                pivot(frame, on, index, values, sort_columns, agg, separator)
            }
        })
        .map_err(PyPolarsErr::from)?;
    Ok(PyDataFrame::new(pivoted))
}
491
492 pub fn partition_by(
493 &self,
494 py: Python,
495 by: Vec<String>,
496 maintain_order: bool,
497 include_key: bool,
498 ) -> PyResult<Vec<Self>> {
499 let out = py
500 .allow_threads(|| {
501 if maintain_order {
502 self.df.partition_by_stable(by, include_key)
503 } else {
504 self.df.partition_by(by, include_key)
505 }
506 })
507 .map_err(PyPolarsErr::from)?;
508
509 Ok(unsafe { std::mem::transmute::<Vec<DataFrame>, Vec<PyDataFrame>>(out) })
511 }
512
513 pub fn lazy(&self) -> PyLazyFrame {
514 self.df.clone().lazy().into()
515 }
516
517 #[pyo3(signature = (columns, separator, drop_first=false))]
518 pub fn to_dummies(
519 &self,
520 py: Python,
521 columns: Option<Vec<String>>,
522 separator: Option<&str>,
523 drop_first: bool,
524 ) -> PyResult<Self> {
525 let df = py
526 .allow_threads(|| match columns {
527 Some(cols) => self.df.columns_to_dummies(
528 cols.iter().map(|x| x as &str).collect(),
529 separator,
530 drop_first,
531 ),
532 None => self.df.to_dummies(separator, drop_first),
533 })
534 .map_err(PyPolarsErr::from)?;
535 Ok(df.into())
536 }
537
538 pub fn null_count(&self, py: Python) -> Self {
539 let df = py.allow_threads(|| self.df.null_count());
540 df.into()
541 }
542
543 #[pyo3(signature = (lambda, output_type, inference_size))]
544 pub fn map_rows(
545 &mut self,
546 lambda: Bound<PyAny>,
547 output_type: Option<Wrap<DataType>>,
548 inference_size: usize,
549 ) -> PyResult<(PyObject, bool)> {
550 Python::with_gil(|py| {
551 self.df.as_single_chunk_par();
553 let df = &self.df;
554
555 use apply_lambda_with_primitive_out_type as apply;
556 #[rustfmt::skip]
557 let out = match output_type.map(|dt| dt.0) {
558 Some(DataType::Int32) => apply::<Int32Type>(df, py, lambda, 0, None)?.into_series(),
559 Some(DataType::Int64) => apply::<Int64Type>(df, py, lambda, 0, None)?.into_series(),
560 Some(DataType::UInt32) => apply::<UInt32Type>(df, py, lambda, 0, None)?.into_series(),
561 Some(DataType::UInt64) => apply::<UInt64Type>(df, py, lambda, 0, None)?.into_series(),
562 Some(DataType::Float32) => apply::<Float32Type>(df, py, lambda, 0, None)?.into_series(),
563 Some(DataType::Float64) => apply::<Float64Type>(df, py, lambda, 0, None)?.into_series(),
564 Some(DataType::Date) => apply::<Int32Type>(df, py, lambda, 0, None)?.into_date().into_series(),
565 Some(DataType::Datetime(tu, tz)) => apply::<Int64Type>(df, py, lambda, 0, None)?.into_datetime(tu, tz).into_series(),
566 Some(DataType::Boolean) => apply_lambda_with_bool_out_type(df, py, lambda, 0, None)?.into_series(),
567 Some(DataType::String) => apply_lambda_with_string_out_type(df, py, lambda, 0, None)?.into_series(),
568 _ => return apply_lambda_unknown(df, py, lambda, inference_size),
569 };
570
571 Ok((PySeries::from(out).into_py_any(py)?, false))
572 })
573 }
574
575 pub fn shrink_to_fit(&mut self, py: Python) {
576 py.allow_threads(|| self.df.shrink_to_fit());
577 }
578
579 pub fn hash_rows(
580 &mut self,
581 py: Python,
582 k0: u64,
583 k1: u64,
584 k2: u64,
585 k3: u64,
586 ) -> PyResult<PySeries> {
587 let hb = PlRandomState::with_seeds(k0, k1, k2, k3);
588 let hash = py
589 .allow_threads(|| self.df.hash_rows(Some(hb)))
590 .map_err(PyPolarsErr::from)?;
591 Ok(hash.into_series().into())
592 }
593
594 #[pyo3(signature = (keep_names_as, column_names))]
595 pub fn transpose(
596 &mut self,
597 py: Python,
598 keep_names_as: Option<&str>,
599 column_names: &Bound<PyAny>,
600 ) -> PyResult<Self> {
601 let new_col_names = if let Ok(name) = column_names.extract::<Vec<String>>() {
602 Some(Either::Right(name))
603 } else if let Ok(name) = column_names.extract::<String>() {
604 Some(Either::Left(name))
605 } else {
606 None
607 };
608 Ok(py
609 .allow_threads(|| self.df.transpose(keep_names_as, new_col_names))
610 .map_err(PyPolarsErr::from)?
611 .into())
612 }
613
614 pub fn upsample(
615 &self,
616 py: Python,
617 by: Vec<String>,
618 index_column: &str,
619 every: &str,
620 stable: bool,
621 ) -> PyResult<Self> {
622 let every = Duration::try_parse(every).map_err(PyPolarsErr::from)?;
623 let out = py.allow_threads(|| {
624 if stable {
625 self.df.upsample_stable(by, index_column, every)
626 } else {
627 self.df.upsample(by, index_column, every)
628 }
629 });
630 let out = out.map_err(PyPolarsErr::from)?;
631 Ok(out.into())
632 }
633
634 pub fn to_struct(&self, py: Python, name: &str, invalid_indices: Vec<usize>) -> PySeries {
635 py.allow_threads(|| {
636 let ca = self.df.clone().into_struct(name.into());
637
638 if !invalid_indices.is_empty() {
639 let mut validity = MutableBitmap::with_capacity(ca.len());
640 validity.extend_constant(ca.len(), true);
641 for i in invalid_indices {
642 validity.set(i, false);
643 }
644 let ca = ca.rechunk();
645 ca.with_outer_validity(Some(validity.freeze()))
646 .into_series()
647 .into()
648 } else {
649 ca.into_series().into()
650 }
651 })
652 }
653
654 pub fn clear(&self, py: Python) -> Self {
655 py.allow_threads(|| self.df.clear()).into()
656 }
657
658 #[allow(clippy::wrong_self_convention)]
659 pub fn into_raw_parts(&mut self) -> (usize, usize, usize) {
660 let df = std::mem::take(&mut self.df);
663 let cols = df.take_columns();
664 let mut md_cols = ManuallyDrop::new(cols);
665 let ptr = md_cols.as_mut_ptr();
666 let len = md_cols.len();
667 let cap = md_cols.capacity();
668 (ptr as usize, len, cap)
669 }
670
671 #[pyo3(signature = (opts))]
673 fn _row_encode<'py>(
674 &'py self,
675 py: Python<'py>,
676 opts: Vec<(bool, bool, bool)>,
677 ) -> PyResult<PySeries> {
678 py.allow_threads(|| {
679 let name = PlSmallStr::from_static("row_enc");
680 let is_unordered = opts.first().is_some_and(|(_, _, v)| *v);
681
682 let ca = if is_unordered {
683 _get_rows_encoded_ca_unordered(name, self.df.get_columns())
684 } else {
685 let descending = opts.iter().map(|(v, _, _)| *v).collect::<Vec<_>>();
686 let nulls_last = opts.iter().map(|(_, v, _)| *v).collect::<Vec<_>>();
687
688 _get_rows_encoded_ca(
689 name,
690 self.df.get_columns(),
691 descending.as_slice(),
692 nulls_last.as_slice(),
693 )
694 }
695 .map_err(PyPolarsErr::from)?;
696
697 Ok(ca.into_series().into())
698 })
699 }
700}