polars_python/functions/
eager.rs

1use polars::functions;
2use polars_core::prelude::*;
3use pyo3::prelude::*;
4
5use crate::conversion::{get_df, get_series};
6use crate::error::PyPolarsErr;
7use crate::{PyDataFrame, PySeries};
8
9#[pyfunction]
10pub fn concat_df(dfs: &Bound<'_, PyAny>, py: Python) -> PyResult<PyDataFrame> {
11    use polars_core::error::PolarsResult;
12    use polars_core::utils::rayon::prelude::*;
13
14    let mut iter = dfs.try_iter()?;
15    let first = iter.next().unwrap()?;
16
17    let first_rdf = get_df(&first)?;
18    let identity_df = first_rdf.clear();
19
20    let mut rdfs: Vec<PolarsResult<DataFrame>> = vec![Ok(first_rdf)];
21
22    for item in iter {
23        let rdf = get_df(&item?)?;
24        rdfs.push(Ok(rdf));
25    }
26
27    let identity = || Ok(identity_df.clone());
28
29    let df = py
30        .allow_threads(|| {
31            polars_core::POOL.install(|| {
32                rdfs.into_par_iter()
33                    .fold(identity, |acc: PolarsResult<DataFrame>, df| {
34                        let mut acc = acc?;
35                        acc.vstack_mut(&df?)?;
36                        Ok(acc)
37                    })
38                    .reduce(identity, |acc, df| {
39                        let mut acc = acc?;
40                        acc.vstack_mut(&df?)?;
41                        Ok(acc)
42                    })
43            })
44        })
45        .map_err(PyPolarsErr::from)?;
46
47    Ok(df.into())
48}
49
50#[pyfunction]
51pub fn concat_series(series: &Bound<'_, PyAny>) -> PyResult<PySeries> {
52    let mut iter = series.try_iter()?;
53    let first = iter.next().unwrap()?;
54
55    let mut s = get_series(&first)?;
56
57    for res in iter {
58        let item = res?;
59        let item = get_series(&item)?;
60        s.append(&item).map_err(PyPolarsErr::from)?;
61    }
62    Ok(s.into())
63}
64
65#[pyfunction]
66pub fn concat_df_diagonal(dfs: &Bound<'_, PyAny>) -> PyResult<PyDataFrame> {
67    let iter = dfs.try_iter()?;
68
69    let dfs = iter
70        .map(|item| {
71            let item = item?;
72            get_df(&item)
73        })
74        .collect::<PyResult<Vec<_>>>()?;
75
76    let df = functions::concat_df_diagonal(&dfs).map_err(PyPolarsErr::from)?;
77    Ok(df.into())
78}
79
80#[pyfunction]
81pub fn concat_df_horizontal(dfs: &Bound<'_, PyAny>) -> PyResult<PyDataFrame> {
82    let iter = dfs.try_iter()?;
83
84    let dfs = iter
85        .map(|item| {
86            let item = item?;
87            get_df(&item)
88        })
89        .collect::<PyResult<Vec<_>>>()?;
90
91    let df = functions::concat_df_horizontal(&dfs, true).map_err(PyPolarsErr::from)?;
92    Ok(df.into())
93}