polars_python/lazyframe/
exitable.rs

1use polars::prelude::*;
2use pyo3::prelude::*;
3
4use super::PyLazyFrame;
5use crate::PyDataFrame;
6use crate::utils::EnterPolarsExt;
7
8#[pymethods]
9#[cfg(not(target_arch = "wasm32"))]
10impl PyLazyFrame {
11    fn collect_concurrently(&self, py: Python) -> PyResult<PyInProcessQuery> {
12        let ipq = py.enter_polars(|| {
13            let ldf = self.ldf.clone();
14            ldf.collect_concurrently()
15        })?;
16        Ok(PyInProcessQuery { ipq })
17    }
18}
19
20#[pyclass]
21#[cfg(not(target_arch = "wasm32"))]
22#[repr(transparent)]
23#[derive(Clone)]
24pub struct PyInProcessQuery {
25    pub ipq: InProcessQuery,
26}
27
28#[pymethods]
29#[cfg(not(target_arch = "wasm32"))]
30impl PyInProcessQuery {
31    pub fn cancel(&self, py: Python) -> PyResult<()> {
32        py.enter_polars_ok(|| self.ipq.cancel())
33    }
34
35    pub fn fetch(&self, py: Python) -> PyResult<Option<PyDataFrame>> {
36        let out = py.enter_polars(|| self.ipq.fetch().transpose())?;
37        Ok(out.map(|df| df.into()))
38    }
39
40    pub fn fetch_blocking(&self, py: Python) -> PyResult<PyDataFrame> {
41        let out = py.enter_polars(|| self.ipq.fetch_blocking())?;
42        Ok(out.into())
43    }
44}