polars_python/dataset/
dataset_provider_funcs.rs

//! Note: Currently only used for Iceberg.
2use std::sync::Arc;
3
4use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::python_function::PythonObject;
8use pyo3::conversion::FromPyObjectBound;
9use pyo3::exceptions::PyValueError;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};
12use pyo3::{PyObject, PyResult, Python, intern};
13
14use crate::interop::arrow::to_rust::field_to_rust;
15use crate::prelude::{Wrap, get_lf};
16
17pub fn name(dataset_object: &PythonObject) -> PlSmallStr {
18    Python::with_gil(|py| {
19        PyResult::Ok(PlSmallStr::from_str(
20            &dataset_object
21                .getattr(py, intern!(py, "__class__"))?
22                .getattr(py, intern!(py, "__name__"))?
23                .extract::<PyBackedStr>(py)?,
24        ))
25    })
26    .unwrap()
27}
28
29pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {
30    Python::with_gil(|py| {
31        let pyarrow_schema_cls = py
32            .import("pyarrow")
33            .ok()
34            .and_then(|pa| pa.getattr("Schema").ok());
35
36        let schema_obj = dataset_object.getattr(py, "schema")?.call0(py)?;
37
38        let schema_cls = schema_obj.getattr(py, "__class__")?;
39
40        // PyIceberg returns arrow schemas, we convert them here.
41        if let Some(pyarrow_schema_cls) = pyarrow_schema_cls {
42            if schema_cls.is(&pyarrow_schema_cls) {
43                if config::verbose() {
44                    eprintln!("python dataset: convert from arrow schema");
45                }
46
47                let mut iter = schema_obj
48                    .bind(py)
49                    .try_iter()?
50                    .map(|x| x.and_then(field_to_rust));
51
52                let mut last_err = None;
53
54                let schema =
55                    Schema::from_iter_check_duplicates(std::iter::from_fn(|| match iter.next() {
56                        Some(Ok(v)) => Some(v),
57                        Some(Err(e)) => {
58                            last_err = Some(e);
59                            None
60                        },
61                        None => None,
62                    }))?;
63
64                if let Some(last_err) = last_err {
65                    return Err(last_err.into());
66                }
67
68                return Ok(Arc::new(schema));
69            }
70        }
71
72        let Wrap(schema) = Wrap::<Schema>::from_py_object_bound(schema_obj.bind_borrowed(py))?;
73
74        Ok(Arc::new(schema))
75    })
76}
77
78pub fn to_dataset_scan(
79    dataset_object: &PythonObject,
80    existing_resolved_version_key: Option<&str>,
81    limit: Option<usize>,
82    projection: Option<&[PlSmallStr]>,
83) -> PolarsResult<Option<(DslPlan, PlSmallStr)>> {
84    Python::with_gil(|py| {
85        let kwargs = PyDict::new(py);
86
87        kwargs.set_item(
88            intern!(py, "existing_resolved_version_key"),
89            existing_resolved_version_key,
90        )?;
91
92        if let Some(limit) = limit {
93            kwargs.set_item(intern!(py, "limit"), limit)?;
94        }
95
96        if let Some(projection) = projection {
97            let projection_list = PyList::empty(py);
98
99            for name in projection {
100                projection_list.append(name.as_str())?;
101            }
102
103            kwargs.set_item(intern!(py, "projection"), projection_list)?;
104        }
105
106        let Some((scan, version)): Option<(PyObject, Wrap<PlSmallStr>)> = dataset_object
107            .getattr(py, intern!(py, "to_dataset_scan"))?
108            .call(py, (), Some(&kwargs))?
109            .extract(py)?
110        else {
111            return Ok(None);
112        };
113
114        let Ok(lf) = get_lf(scan.bind(py)) else {
115            return Err(
116                PyValueError::new_err(format!("cannot extract LazyFrame from {}", &scan)).into(),
117            );
118        };
119
120        Ok(Some((lf.logical_plan, version.0)))
121    })
122}