polars_python/functions/
eager.rs1use polars::functions;
2use polars_core::prelude::*;
3use pyo3::prelude::*;
4
5use crate::conversion::{get_df, get_series};
6use crate::error::PyPolarsErr;
7use crate::{PyDataFrame, PySeries};
8
9#[pyfunction]
10pub fn concat_df(dfs: &Bound<'_, PyAny>, py: Python) -> PyResult<PyDataFrame> {
11 use polars_core::error::PolarsResult;
12 use polars_core::utils::rayon::prelude::*;
13
14 let mut iter = dfs.try_iter()?;
15 let first = iter.next().unwrap()?;
16
17 let first_rdf = get_df(&first)?;
18 let identity_df = first_rdf.clear();
19
20 let mut rdfs: Vec<PolarsResult<DataFrame>> = vec![Ok(first_rdf)];
21
22 for item in iter {
23 let rdf = get_df(&item?)?;
24 rdfs.push(Ok(rdf));
25 }
26
27 let identity = || Ok(identity_df.clone());
28
29 let df = py
30 .allow_threads(|| {
31 polars_core::POOL.install(|| {
32 rdfs.into_par_iter()
33 .fold(identity, |acc: PolarsResult<DataFrame>, df| {
34 let mut acc = acc?;
35 acc.vstack_mut(&df?)?;
36 Ok(acc)
37 })
38 .reduce(identity, |acc, df| {
39 let mut acc = acc?;
40 acc.vstack_mut(&df?)?;
41 Ok(acc)
42 })
43 })
44 })
45 .map_err(PyPolarsErr::from)?;
46
47 Ok(df.into())
48}
49
50#[pyfunction]
51pub fn concat_series(series: &Bound<'_, PyAny>) -> PyResult<PySeries> {
52 let mut iter = series.try_iter()?;
53 let first = iter.next().unwrap()?;
54
55 let mut s = get_series(&first)?;
56
57 for res in iter {
58 let item = res?;
59 let item = get_series(&item)?;
60 s.append(&item).map_err(PyPolarsErr::from)?;
61 }
62 Ok(s.into())
63}
64
65#[pyfunction]
66pub fn concat_df_diagonal(dfs: &Bound<'_, PyAny>) -> PyResult<PyDataFrame> {
67 let iter = dfs.try_iter()?;
68
69 let dfs = iter
70 .map(|item| {
71 let item = item?;
72 get_df(&item)
73 })
74 .collect::<PyResult<Vec<_>>>()?;
75
76 let df = functions::concat_df_diagonal(&dfs).map_err(PyPolarsErr::from)?;
77 Ok(df.into())
78}
79
80#[pyfunction]
81pub fn concat_df_horizontal(dfs: &Bound<'_, PyAny>) -> PyResult<PyDataFrame> {
82 let iter = dfs.try_iter()?;
83
84 let dfs = iter
85 .map(|item| {
86 let item = item?;
87 get_df(&item)
88 })
89 .collect::<PyResult<Vec<_>>>()?;
90
91 let df = functions::concat_df_horizontal(&dfs, true).map_err(PyPolarsErr::from)?;
92 Ok(df.into())
93}