1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
use pyo3::prelude::*;
use pyo3::types::PyBytes;

use super::*;

/// Executor that obtains its `DataFrame` by calling into Python.
///
/// See the `Executor` implementation for how the scan is run.
pub(crate) struct PythonScanExec {
    /// Options for the scan. `execute` passes `options.scan_fn` to Python as a
    /// bytes object and moves `options.with_columns` out (leaving `None`).
    pub(crate) options: PythonOptions,
}

impl Executor for PythonScanExec {
    fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        #[cfg(debug_assertions)]
        {
            if _state.verbose() {
                println!("run PythonScanExec")
            }
        }
        let with_columns = self.options.with_columns.take();
        Python::with_gil(|py| {
            let pl = PyModule::import(py, "polars").unwrap();
            let pli = pl.getattr("internals").unwrap();
            let deser_and_exec = pli.getattr("_deser_and_exec").unwrap();

            let bytes = PyBytes::new(py, &self.options.scan_fn);

            let with_columns =
                with_columns.map(|mut cols| std::mem::take(Arc::make_mut(&mut cols)));

            let out = deser_and_exec
                .call1((bytes, with_columns))
                .map_err(|err| PolarsError::ComputeError(format!("{:?}", err).into()))?;
            let pydf = out.getattr("_df").unwrap();
            let raw_parts = pydf.call_method0("into_raw_parts").unwrap();
            let raw_parts = raw_parts.extract::<(usize, usize, usize)>().unwrap();

            let (ptr, len, cap) = raw_parts;
            unsafe {
                Ok(DataFrame::new_no_checks(Vec::from_raw_parts(
                    ptr as *mut Series,
                    len,
                    cap,
                )))
            }
        })
    }
}