polars_python/dataset/dataset_provider_funcs.rs

//! Note: Currently only used for iceberg.
2use std::sync::Arc;
3
4use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::python_function::PythonObject;
8use pyo3::conversion::FromPyObject;
9use pyo3::exceptions::PyValueError;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};
12use pyo3::{Py, PyAny, PyResult, Python, intern};
13
14use crate::interned;
15use crate::interop::arrow::to_rust::field_to_rust;
16use crate::prelude::{Wrap, get_lf};
17
18pub fn name(dataset_object: &PythonObject) -> PlSmallStr {
19    Python::attach(|py| {
20        PyResult::Ok(PlSmallStr::from_str(
21            &dataset_object
22                .getattr(py, interned::DUNDER_CLASS.get(py))?
23                .getattr(py, interned::DUNDER_NAME.get(py))?
24                .extract::<PyBackedStr>(py)?,
25        ))
26    })
27    .unwrap()
28}
29
30pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {
31    Python::attach(|py| {
32        let pyarrow_schema_cls = py
33            .import("pyarrow")
34            .ok()
35            .and_then(|pa| pa.getattr("Schema").ok());
36
37        let schema_obj = dataset_object.getattr(py, "schema")?.call0(py)?;
38
39        let schema_cls = schema_obj.getattr(py, interned::DUNDER_CLASS.get(py))?;
40
41        // PyIceberg returns arrow schemas, we convert them here.
42        if let Some(pyarrow_schema_cls) = pyarrow_schema_cls {
43            if schema_cls.is(&pyarrow_schema_cls) {
44                if config::verbose() {
45                    eprintln!("python dataset: convert from arrow schema");
46                }
47
48                let mut iter = schema_obj
49                    .bind(py)
50                    .try_iter()?
51                    .map(|x| x.and_then(field_to_rust));
52
53                let mut last_err = None;
54
55                let schema =
56                    Schema::from_iter_check_duplicates(std::iter::from_fn(|| match iter.next() {
57                        Some(Ok(v)) => Some(v),
58                        Some(Err(e)) => {
59                            last_err = Some(e);
60                            None
61                        },
62                        None => None,
63                    }))?;
64
65                if let Some(last_err) = last_err {
66                    return Err(last_err.into());
67                }
68
69                return Ok(Arc::new(schema));
70            }
71        }
72
73        let Wrap(schema) = Wrap::<Schema>::extract(schema_obj.bind_borrowed(py))?;
74
75        Ok(Arc::new(schema))
76    })
77}
78
79pub fn to_dataset_scan(
80    dataset_object: &PythonObject,
81    existing_resolved_version_key: Option<&str>,
82    limit: Option<usize>,
83    projection: Option<&[PlSmallStr]>,
84    filter_columns: Option<&[PlSmallStr]>,
85    pyarrow_predicate: Option<&str>,
86) -> PolarsResult<Option<(DslPlan, PlSmallStr)>> {
87    Python::attach(|py| {
88        let kwargs = PyDict::new(py);
89
90        kwargs.set_item(
91            intern!(py, "existing_resolved_version_key"),
92            existing_resolved_version_key,
93        )?;
94
95        if let Some(limit) = limit {
96            kwargs.set_item(intern!(py, "limit"), limit)?;
97        }
98
99        if let Some(projection) = projection {
100            let projection_list = PyList::empty(py);
101
102            for name in projection {
103                projection_list.append(name.as_str())?;
104            }
105
106            kwargs.set_item(intern!(py, "projection"), projection_list)?;
107        }
108
109        if let Some(filter_columns) = filter_columns {
110            let filter_columns_list = PyList::empty(py);
111
112            for name in filter_columns {
113                filter_columns_list.append(name.as_str())?;
114            }
115
116            kwargs.set_item(intern!(py, "filter_columns"), filter_columns_list)?;
117        }
118
119        kwargs.set_item(intern!(py, "pyarrow_predicate"), pyarrow_predicate)?;
120
121        let Some((scan, version)): Option<(Py<PyAny>, Wrap<PlSmallStr>)> = dataset_object
122            .getattr(py, intern!(py, "to_dataset_scan"))?
123            .call(py, (), Some(&kwargs))?
124            .extract(py)?
125        else {
126            return Ok(None);
127        };
128
129        let Ok(lf) = get_lf(scan.bind(py)) else {
130            return Err(
131                PyValueError::new_err(format!("cannot extract LazyFrame from {}", &scan)).into(),
132            );
133        };
134
135        Ok(Some((lf.logical_plan, version.0)))
136    })
137}