polars_python/functions/
eager.rs

1use polars::functions;
2use polars_core::prelude::*;
3use pyo3::prelude::*;
4
5use crate::conversion::{get_df, get_series};
6use crate::error::PyPolarsErr;
7use crate::utils::EnterPolarsExt;
8use crate::{PyDataFrame, PySeries};
9
10#[pyfunction]
11pub fn concat_df(dfs: &Bound<'_, PyAny>, py: Python) -> PyResult<PyDataFrame> {
12    use polars_core::error::PolarsResult;
13    use polars_core::utils::rayon::prelude::*;
14
15    let mut iter = dfs.try_iter()?;
16    let first = iter.next().unwrap()?;
17
18    let first_rdf = get_df(&first)?;
19    let identity_df = first_rdf.clear();
20
21    let mut rdfs: Vec<PolarsResult<DataFrame>> = vec![Ok(first_rdf)];
22
23    for item in iter {
24        let rdf = get_df(&item?)?;
25        rdfs.push(Ok(rdf));
26    }
27
28    let identity = || Ok(identity_df.clone());
29
30    py.enter_polars_df(|| {
31        polars_core::POOL.install(|| {
32            rdfs.into_par_iter()
33                .fold(identity, |acc: PolarsResult<DataFrame>, df| {
34                    let mut acc = acc?;
35                    acc.vstack_mut(&df?)?;
36                    Ok(acc)
37                })
38                .reduce(identity, |acc, df| {
39                    let mut acc = acc?;
40                    acc.vstack_mut(&df?)?;
41                    Ok(acc)
42                })
43        })
44    })
45}
46
47#[pyfunction]
48pub fn concat_series(series: &Bound<'_, PyAny>) -> PyResult<PySeries> {
49    let mut iter = series.try_iter()?;
50    let first = iter.next().unwrap()?;
51
52    let mut s = get_series(&first)?;
53
54    for res in iter {
55        let item = res?;
56        let item = get_series(&item)?;
57        s.append(&item).map_err(PyPolarsErr::from)?;
58    }
59    Ok(s.into())
60}
61
62#[pyfunction]
63pub fn concat_df_diagonal(dfs: &Bound<'_, PyAny>) -> PyResult<PyDataFrame> {
64    let iter = dfs.try_iter()?;
65
66    let dfs = iter
67        .map(|item| {
68            let item = item?;
69            get_df(&item)
70        })
71        .collect::<PyResult<Vec<_>>>()?;
72
73    let df = functions::concat_df_diagonal(&dfs).map_err(PyPolarsErr::from)?;
74    Ok(df.into())
75}
76
77#[pyfunction]
78pub fn concat_df_horizontal(dfs: &Bound<'_, PyAny>) -> PyResult<PyDataFrame> {
79    let iter = dfs.try_iter()?;
80
81    let dfs = iter
82        .map(|item| {
83            let item = item?;
84            get_df(&item)
85        })
86        .collect::<PyResult<Vec<_>>>()?;
87
88    let df = functions::concat_df_horizontal(&dfs, true).map_err(PyPolarsErr::from)?;
89    Ok(df.into())
90}