1use std::hash::BuildHasher;
2
3use arrow::bitmap::MutableBitmap;
4use either::Either;
5use parking_lot::{RwLock, RwLockWriteGuard};
6use polars::prelude::*;
7use polars_ffi::version_0::SeriesExport;
8#[cfg(feature = "pivot")]
9use polars_lazy::frame::pivot::{pivot, pivot_stable};
10use pyo3::IntoPyObjectExt;
11use pyo3::exceptions::PyIndexError;
12use pyo3::prelude::*;
13use pyo3::pybacked::PyBackedStr;
14use pyo3::types::{PyList, PyType};
15
16use self::row_encode::{_get_rows_encoded_ca, _get_rows_encoded_ca_unordered};
17use super::PyDataFrame;
18use crate::conversion::Wrap;
19use crate::error::PyPolarsErr;
20use crate::map::dataframe::{
21 apply_lambda_unknown, apply_lambda_with_bool_out_type, apply_lambda_with_primitive_out_type,
22 apply_lambda_with_string_out_type,
23};
24use crate::prelude::strings_to_pl_smallstr;
25use crate::py_modules::polars;
26use crate::series::{PySeries, ToPySeries, ToSeries};
27use crate::utils::EnterPolarsExt;
28use crate::{PyExpr, PyLazyFrame};
29
30#[pymethods]
31impl PyDataFrame {
32 #[new]
33 pub fn __init__(columns: Vec<PySeries>) -> PyResult<Self> {
34 let columns = columns.to_series();
35 let columns = columns.into_iter().map(|s| s.into()).collect();
37 let df = DataFrame::new(columns).map_err(PyPolarsErr::from)?;
38 Ok(PyDataFrame::new(df))
39 }
40
41 pub fn estimated_size(&self) -> usize {
42 self.df.read().estimated_size()
43 }
44
45 pub fn dtype_strings(&self) -> Vec<String> {
46 self.df
47 .read()
48 .get_columns()
49 .iter()
50 .map(|s| format!("{}", s.dtype()))
51 .collect()
52 }
53
54 pub fn add(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
55 py.enter_polars_df(|| &*self.df.read() + &*s.series.read())
56 }
57
58 pub fn sub(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
59 py.enter_polars_df(|| &*self.df.read() - &*s.series.read())
60 }
61
62 pub fn mul(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
63 py.enter_polars_df(|| &*self.df.read() * &*s.series.read())
64 }
65
66 pub fn div(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
67 py.enter_polars_df(|| &*self.df.read() / &*s.series.read())
68 }
69
70 pub fn rem(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
71 py.enter_polars_df(|| &*self.df.read() % &*s.series.read())
72 }
73
74 pub fn add_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
75 py.enter_polars_df(|| &*self.df.read() + &*s.df.read())
76 }
77
78 pub fn sub_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
79 py.enter_polars_df(|| &*self.df.read() - &*s.df.read())
80 }
81
82 pub fn mul_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
83 py.enter_polars_df(|| &*self.df.read() * &*s.df.read())
84 }
85
86 pub fn div_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
87 py.enter_polars_df(|| &*self.df.read() / &*s.df.read())
88 }
89
90 pub fn rem_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
91 py.enter_polars_df(|| &*self.df.read() % &*s.df.read())
92 }
93
94 #[pyo3(signature = (n, with_replacement, shuffle, seed=None))]
95 pub fn sample_n(
96 &self,
97 py: Python<'_>,
98 n: &PySeries,
99 with_replacement: bool,
100 shuffle: bool,
101 seed: Option<u64>,
102 ) -> PyResult<Self> {
103 py.enter_polars_df(|| {
104 self.df
105 .read()
106 .sample_n(&n.series.read(), with_replacement, shuffle, seed)
107 })
108 }
109
110 #[pyo3(signature = (frac, with_replacement, shuffle, seed=None))]
111 pub fn sample_frac(
112 &self,
113 py: Python<'_>,
114 frac: &PySeries,
115 with_replacement: bool,
116 shuffle: bool,
117 seed: Option<u64>,
118 ) -> PyResult<Self> {
119 py.enter_polars_df(|| {
120 self.df
121 .read()
122 .sample_frac(&frac.series.read(), with_replacement, shuffle, seed)
123 })
124 }
125
126 pub fn rechunk(&self, py: Python) -> PyResult<Self> {
127 py.enter_polars_df(|| {
128 let mut df = self.df.read().clone();
129 df.as_single_chunk_par();
130 Ok(df)
131 })
132 }
133
134 pub fn as_str(&self) -> String {
136 format!("{:?}", self.df.read())
137 }
138
139 pub fn get_columns(&self) -> Vec<PySeries> {
140 let cols = self.df.read().get_columns().to_vec();
141 cols.to_pyseries()
142 }
143
144 pub fn columns(&self) -> Vec<String> {
146 self.df
147 .read()
148 .get_columns()
149 .iter()
150 .map(|s| s.name().to_string())
151 .collect()
152 }
153
154 pub fn set_column_names(&self, names: Vec<PyBackedStr>) -> PyResult<()> {
156 self.df
157 .write()
158 .set_column_names(names.iter().map(|x| &**x))
159 .map_err(PyPolarsErr::from)?;
160 Ok(())
161 }
162
163 pub fn dtypes<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
165 let df = self.df.read();
166 let iter = df
167 .iter()
168 .map(|s| Wrap(s.dtype().clone()).into_pyobject(py).unwrap());
169 PyList::new(py, iter)
170 }
171
172 pub fn n_chunks(&self) -> usize {
173 self.df.read().first_col_n_chunks()
174 }
175
176 pub fn shape(&self) -> (usize, usize) {
177 self.df.read().shape()
178 }
179
180 pub fn height(&self) -> usize {
181 self.df.read().height()
182 }
183
184 pub fn width(&self) -> usize {
185 self.df.read().width()
186 }
187
188 pub fn is_empty(&self) -> bool {
189 self.df.read().is_empty()
190 }
191
192 pub fn hstack(&self, py: Python<'_>, columns: Vec<PySeries>) -> PyResult<Self> {
193 let columns = columns.to_series();
194 let columns = columns.into_iter().map(Into::into).collect::<Vec<_>>();
196 py.enter_polars_df(|| self.df.read().hstack(&columns))
197 }
198
199 pub fn hstack_mut(&self, py: Python<'_>, columns: Vec<PySeries>) -> PyResult<()> {
200 let columns = columns.to_series();
201 let columns = columns.into_iter().map(Into::into).collect::<Vec<_>>();
203 py.enter_polars(|| self.df.write().hstack_mut(&columns).map(drop))?;
204 Ok(())
205 }
206
207 pub fn vstack(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<Self> {
208 py.enter_polars_df(|| self.df.read().vstack(&other.df.read()))
209 }
210
211 pub fn vstack_mut(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<()> {
212 py.enter_polars(|| {
213 let other = other.df.read().clone();
215 self.df.write().vstack_mut(&other)?;
216 PolarsResult::Ok(())
217 })?;
218 Ok(())
219 }
220
221 pub fn extend(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<()> {
222 py.enter_polars(|| {
223 let other = other.df.read().clone();
225 self.df.write().extend(&other)
226 })?;
227 Ok(())
228 }
229
230 pub fn drop_in_place(&self, name: &str) -> PyResult<PySeries> {
231 let s = self
232 .df
233 .write()
234 .drop_in_place(name)
235 .map_err(PyPolarsErr::from)?;
236 let s = s.take_materialized_series();
237 Ok(PySeries::from(s))
238 }
239
240 pub fn to_series(&self, index: isize) -> PyResult<PySeries> {
241 let df = &self.df.read();
242
243 let index_adjusted = if index < 0 {
244 df.width().checked_sub(index.unsigned_abs())
245 } else {
246 Some(usize::try_from(index).unwrap())
247 };
248
249 let s = index_adjusted.and_then(|i| df.select_at_idx(i));
250 match s {
251 Some(s) => Ok(PySeries::new(s.as_materialized_series().clone())),
252 None => Err(PyIndexError::new_err(
253 polars_err!(oob = index, df.width()).to_string(),
254 )),
255 }
256 }
257
258 pub fn get_column_index(&self, name: &str) -> PyResult<usize> {
259 Ok(self
260 .df
261 .read()
262 .try_get_column_index(name)
263 .map_err(PyPolarsErr::from)?)
264 }
265
266 pub fn get_column(&self, name: &str) -> PyResult<PySeries> {
267 let series = self
268 .df
269 .read()
270 .column(name)
271 .map(|s| PySeries::new(s.as_materialized_series().clone()))
272 .map_err(PyPolarsErr::from)?;
273 Ok(series)
274 }
275
276 pub fn select(&self, py: Python<'_>, columns: Vec<PyBackedStr>) -> PyResult<Self> {
277 py.enter_polars_df(|| self.df.read().select(columns.iter().map(|x| &**x)))
278 }
279
280 pub fn gather(&self, py: Python<'_>, indices: Wrap<Vec<IdxSize>>) -> PyResult<Self> {
281 let indices = indices.0;
282 let indices = IdxCa::from_vec("".into(), indices);
283 py.enter_polars_df(|| self.df.read().take(&indices))
284 }
285
286 pub fn gather_with_series(&self, py: Python<'_>, indices: &PySeries) -> PyResult<Self> {
287 let idx_s = indices.series.read();
288 let indices = idx_s.idx().map_err(PyPolarsErr::from)?;
289 py.enter_polars_df(|| self.df.read().take(indices))
290 }
291
292 pub fn replace(&self, column: &str, new_col: PySeries) -> PyResult<()> {
293 self.df
294 .write()
295 .replace(column, new_col.series.into_inner())
296 .map_err(PyPolarsErr::from)?;
297 Ok(())
298 }
299
300 pub fn replace_column(&self, index: usize, new_column: PySeries) -> PyResult<()> {
301 self.df
302 .write()
303 .replace_column(index, new_column.series.into_inner())
304 .map_err(PyPolarsErr::from)?;
305 Ok(())
306 }
307
308 pub fn insert_column(&self, index: usize, column: PySeries) -> PyResult<()> {
309 self.df
310 .write()
311 .insert_column(index, column.series.into_inner())
312 .map_err(PyPolarsErr::from)?;
313 Ok(())
314 }
315
316 #[pyo3(signature = (offset, length))]
317 pub fn slice(&self, py: Python<'_>, offset: i64, length: Option<usize>) -> PyResult<Self> {
318 py.enter_polars_df(|| {
319 let df = self.df.read();
320 Ok(df.slice(offset, length.unwrap_or_else(|| df.height())))
321 })
322 }
323
324 pub fn head(&self, py: Python<'_>, n: usize) -> PyResult<Self> {
325 py.enter_polars_df(|| Ok(self.df.read().head(Some(n))))
326 }
327
328 pub fn tail(&self, py: Python<'_>, n: usize) -> PyResult<Self> {
329 py.enter_polars_df(|| Ok(self.df.read().tail(Some(n))))
330 }
331
332 pub fn is_unique(&self, py: Python) -> PyResult<PySeries> {
333 py.enter_polars_series(|| self.df.read().is_unique())
334 }
335
336 pub fn is_duplicated(&self, py: Python) -> PyResult<PySeries> {
337 py.enter_polars_series(|| self.df.read().is_duplicated())
338 }
339
340 pub fn equals(&self, py: Python<'_>, other: &PyDataFrame, null_equal: bool) -> PyResult<bool> {
341 if null_equal {
342 py.enter_polars_ok(|| self.df.read().equals_missing(&other.df.read()))
343 } else {
344 py.enter_polars_ok(|| self.df.read().equals(&other.df.read()))
345 }
346 }
347
348 #[pyo3(signature = (name, offset=None))]
349 pub fn with_row_index(
350 &self,
351 py: Python<'_>,
352 name: &str,
353 offset: Option<IdxSize>,
354 ) -> PyResult<Self> {
355 py.enter_polars_df(|| self.df.read().with_row_index(name.into(), offset))
356 }
357
358 pub fn _to_metadata(&self) -> Self {
359 Self {
360 df: RwLock::new(self.df.read()._to_metadata()),
361 }
362 }
363
364 pub fn group_by_map_groups(
365 &self,
366 py: Python<'_>,
367 by: Vec<PyBackedStr>,
368 lambda: PyObject,
369 maintain_order: bool,
370 ) -> PyResult<Self> {
371 py.enter_polars_df(|| {
372 let df = self.df.read().clone(); let gb = if maintain_order {
374 df.group_by_stable(by.iter().map(|x| &**x))
375 } else {
376 df.group_by(by.iter().map(|x| &**x))
377 }?;
378
379 let function = move |df: DataFrame| {
380 Python::with_gil(|py| {
381 let pypolars = polars(py).bind(py);
382 let pydf = PyDataFrame::new(df);
383 let python_df_wrapper =
384 pypolars.getattr("wrap_df").unwrap().call1((pydf,)).unwrap();
385
386 let result_df_wrapper = match lambda.call1(py, (python_df_wrapper,)) {
388 Ok(pyobj) => pyobj,
389 Err(e) => panic!("UDF failed: {}", e.value(py)),
390 };
391 let py_pydf = result_df_wrapper.getattr(py, "_df").expect(
392 "Could not get DataFrame attribute '_df'. Make sure that you return a DataFrame object.",
393 );
394
395 let pydf = py_pydf.extract::<PyDataFrame>(py).unwrap();
396 Ok(pydf.df.into_inner())
397 })
398 };
399
400 gb.apply(function)
401 })
402 }
403
404 #[allow(clippy::should_implement_trait)]
405 pub fn clone(&self) -> Self {
406 Clone::clone(self)
407 }
408
409 #[cfg(feature = "pivot")]
410 #[pyo3(signature = (on, index, value_name=None, variable_name=None))]
411 pub fn unpivot(
412 &self,
413 py: Python<'_>,
414 on: Vec<PyBackedStr>,
415 index: Vec<PyBackedStr>,
416 value_name: Option<&str>,
417 variable_name: Option<&str>,
418 ) -> PyResult<Self> {
419 use polars_ops::pivot::UnpivotDF;
420 let args = UnpivotArgsIR {
421 on: strings_to_pl_smallstr(on),
422 index: strings_to_pl_smallstr(index),
423 value_name: value_name.map(|s| s.into()),
424 variable_name: variable_name.map(|s| s.into()),
425 };
426
427 py.enter_polars_df(|| self.df.read().unpivot2(args))
428 }
429
430 #[cfg(feature = "pivot")]
431 #[pyo3(signature = (on, index, values, maintain_order, sort_columns, aggregate_expr, separator))]
432 pub fn pivot_expr(
433 &self,
434 py: Python<'_>,
435 on: Vec<String>,
436 index: Option<Vec<String>>,
437 values: Option<Vec<String>>,
438 maintain_order: bool,
439 sort_columns: bool,
440 aggregate_expr: Option<PyExpr>,
441 separator: Option<&str>,
442 ) -> PyResult<Self> {
443 let df = self.df.read().clone(); let fun = if maintain_order { pivot_stable } else { pivot };
445 let agg_expr = aggregate_expr.map(|expr| expr.inner);
446 py.enter_polars_df(|| fun(&df, on, index, values, sort_columns, agg_expr, separator))
447 }
448
449 pub fn partition_by(
450 &self,
451 py: Python<'_>,
452 by: Vec<String>,
453 maintain_order: bool,
454 include_key: bool,
455 ) -> PyResult<Vec<Self>> {
456 let out = py.enter_polars(|| {
457 if maintain_order {
458 self.df.read().partition_by_stable(by, include_key)
459 } else {
460 self.df.read().partition_by(by, include_key)
461 }
462 })?;
463
464 Ok(out.into_iter().map(PyDataFrame::from).collect())
465 }
466
467 pub fn lazy(&self) -> PyLazyFrame {
468 self.df.read().clone().lazy().into()
469 }
470
471 #[pyo3(signature = (columns, separator, drop_first, drop_nulls))]
472 pub fn to_dummies(
473 &self,
474 py: Python<'_>,
475 columns: Option<Vec<String>>,
476 separator: Option<&str>,
477 drop_first: bool,
478 drop_nulls: bool,
479 ) -> PyResult<Self> {
480 py.enter_polars_df(|| match columns {
481 Some(cols) => self.df.read().columns_to_dummies(
482 cols.iter().map(|x| x as &str).collect(),
483 separator,
484 drop_first,
485 drop_nulls,
486 ),
487 None => self.df.read().to_dummies(separator, drop_first, drop_nulls),
488 })
489 }
490
491 pub fn null_count(&self, py: Python) -> PyResult<Self> {
492 py.enter_polars_df(|| Ok(self.df.read().null_count()))
493 }
494
    /// Apply the Python callable `lambda` to the rows of the frame.
    ///
    /// When `output_type` is one of the dtypes listed in the match below, the
    /// matching typed apply helper is used and the result is returned as
    /// `(series, false)`. For any other dtype, or `None`, the call is delegated
    /// to `apply_lambda_unknown` with `inference_size`, which produces the
    /// returned tuple itself.
    ///
    /// NOTE(review): the meaning of the `bool` in the returned tuple is defined
    /// by `apply_lambda_unknown` and is not visible here — confirm.
    #[pyo3(signature = (lambda, output_type, inference_size))]
    pub fn map_rows(
        &self,
        lambda: Bound<PyAny>,
        output_type: Option<Wrap<DataType>>,
        inference_size: usize,
    ) -> PyResult<(PyObject, bool)> {
        Python::with_gil(|py| {
            // Rechunk in place under the write lock, then downgrade to a read
            // guard for the rest of the call.
            let mut df = self.df.write();
            df.as_single_chunk_par();
            let df = &*RwLockWriteGuard::downgrade(df);

            use apply_lambda_with_primitive_out_type as apply;
            // Dispatch on the requested output dtype. Date and Datetime are
            // built from their Int32 / Int64 physical values and then cast.
            #[rustfmt::skip]
            let out = match output_type.map(|dt| dt.0) {
                Some(DataType::Int32) => apply::<Int32Type>(df, py, lambda, 0, None)?.into_series(),
                Some(DataType::Int64) => apply::<Int64Type>(df, py, lambda, 0, None)?.into_series(),
                Some(DataType::UInt32) => apply::<UInt32Type>(df, py, lambda, 0, None)?.into_series(),
                Some(DataType::UInt64) => apply::<UInt64Type>(df, py, lambda, 0, None)?.into_series(),
                Some(DataType::Float32) => apply::<Float32Type>(df, py, lambda, 0, None)?.into_series(),
                Some(DataType::Float64) => apply::<Float64Type>(df, py, lambda, 0, None)?.into_series(),
                Some(DataType::Date) => apply::<Int32Type>(df, py, lambda, 0, None)?.into_date().into_series(),
                Some(DataType::Datetime(tu, tz)) => apply::<Int64Type>(df, py, lambda, 0, None)?.into_datetime(tu, tz).into_series(),
                Some(DataType::Boolean) => apply_lambda_with_bool_out_type(df, py, lambda, 0, None)?.into_series(),
                Some(DataType::String) => apply_lambda_with_string_out_type(df, py, lambda, 0, None)?.into_series(),
                // Any other dtype (or none): let the helper infer the output.
                _ => return apply_lambda_unknown(df, py, lambda, inference_size),
            };

            Ok((PySeries::from(out).into_py_any(py)?, false))
        })
    }
526
527 pub fn shrink_to_fit(&self, py: Python) -> PyResult<()> {
528 py.enter_polars_ok(|| self.df.write().shrink_to_fit())
529 }
530
531 pub fn hash_rows(
532 &self,
533 py: Python<'_>,
534 k0: u64,
535 k1: u64,
536 k2: u64,
537 k3: u64,
538 ) -> PyResult<PySeries> {
539 let seed = PlFixedStateQuality::default().hash_one((k0, k1, k2, k3));
541 let hb = PlSeedableRandomStateQuality::seed_from_u64(seed);
542 py.enter_polars_series(|| self.df.write().hash_rows(Some(hb)))
543 }
544
545 #[pyo3(signature = (keep_names_as, column_names))]
546 pub fn transpose(
547 &self,
548 py: Python<'_>,
549 keep_names_as: Option<&str>,
550 column_names: &Bound<PyAny>,
551 ) -> PyResult<Self> {
552 let new_col_names = if let Ok(name) = column_names.extract::<Vec<String>>() {
553 Some(Either::Right(name))
554 } else if let Ok(name) = column_names.extract::<String>() {
555 Some(Either::Left(name))
556 } else {
557 None
558 };
559 py.enter_polars_df(|| self.df.write().transpose(keep_names_as, new_col_names))
560 }
561
562 pub fn upsample(
563 &self,
564 py: Python<'_>,
565 by: Vec<String>,
566 index_column: &str,
567 every: &str,
568 stable: bool,
569 ) -> PyResult<Self> {
570 let every = Duration::try_parse(every).map_err(PyPolarsErr::from)?;
571 py.enter_polars_df(|| {
572 if stable {
573 self.df.read().upsample_stable(by, index_column, every)
574 } else {
575 self.df.read().upsample(by, index_column, every)
576 }
577 })
578 }
579
580 pub fn to_struct(
581 &self,
582 py: Python<'_>,
583 name: &str,
584 invalid_indices: Vec<usize>,
585 ) -> PyResult<PySeries> {
586 py.enter_polars_series(|| {
587 let mut ca = self.df.read().clone().into_struct(name.into());
588
589 if !invalid_indices.is_empty() {
590 let mut validity = MutableBitmap::with_capacity(ca.len());
591 validity.extend_constant(ca.len(), true);
592 for i in invalid_indices {
593 validity.set(i, false);
594 }
595 ca.rechunk_mut();
596 Ok(ca.with_outer_validity(Some(validity.freeze())))
597 } else {
598 Ok(ca)
599 }
600 })
601 }
602
603 pub fn clear(&self, py: Python) -> PyResult<Self> {
604 py.enter_polars_df(|| Ok(self.df.read().clear()))
605 }
606
607 pub unsafe fn _export_columns(&self, location: usize) {
611 use polars_ffi::version_0::export_column;
612
613 let df = self.df.read();
614 let cols = df.get_columns();
615
616 let location = location as *mut SeriesExport;
617
618 for (i, col) in cols.iter().enumerate() {
619 let e = export_column(col);
620 unsafe { core::ptr::write(location.add(i), e) };
624 }
625 }
626
627 #[classmethod]
632 pub unsafe fn _import_columns(
633 _cls: &Bound<PyType>,
634 location: usize,
635 width: usize,
636 ) -> PyResult<Self> {
637 use polars_ffi::version_0::import_df;
638
639 let location = location as *mut SeriesExport;
640
641 let df = unsafe { import_df(location, width) }.map_err(PyPolarsErr::from)?;
642 Ok(PyDataFrame::from(df))
643 }
644
645 #[pyo3(signature = (opts))]
647 fn _row_encode(&self, py: Python<'_>, opts: Vec<(bool, bool, bool)>) -> PyResult<PySeries> {
648 py.enter_polars_series(|| {
649 let name = PlSmallStr::from_static("row_enc");
650 let is_unordered = opts.first().is_some_and(|(_, _, v)| *v);
651
652 let ca = if is_unordered {
653 _get_rows_encoded_ca_unordered(name, self.df.read().get_columns())
654 } else {
655 let descending = opts.iter().map(|(v, _, _)| *v).collect::<Vec<_>>();
656 let nulls_last = opts.iter().map(|(_, v, _)| *v).collect::<Vec<_>>();
657
658 _get_rows_encoded_ca(
659 name,
660 self.df.read().get_columns(),
661 descending.as_slice(),
662 nulls_last.as_slice(),
663 )
664 }?;
665
666 Ok(ca)
667 })
668 }
669}