polars_python/dataset/
dataset_provider_funcs.rs1use std::sync::Arc;
3
4use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::python_function::PythonObject;
8use pyo3::conversion::FromPyObject;
9use pyo3::exceptions::PyValueError;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};
12use pyo3::{Py, PyAny, PyResult, Python, intern};
13
14use crate::interned;
15use crate::interop::arrow::to_rust::field_to_rust;
16use crate::prelude::{Wrap, get_lf};
17
18pub fn name(dataset_object: &PythonObject) -> PlSmallStr {
19 Python::attach(|py| {
20 PyResult::Ok(PlSmallStr::from_str(
21 &dataset_object
22 .getattr(py, interned::DUNDER_CLASS.get(py))?
23 .getattr(py, interned::DUNDER_NAME.get(py))?
24 .extract::<PyBackedStr>(py)?,
25 ))
26 })
27 .unwrap()
28}
29
30pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {
31 Python::attach(|py| {
32 let pyarrow_schema_cls = py
33 .import("pyarrow")
34 .ok()
35 .and_then(|pa| pa.getattr("Schema").ok());
36
37 let schema_obj = dataset_object.getattr(py, "schema")?.call0(py)?;
38
39 let schema_cls = schema_obj.getattr(py, interned::DUNDER_CLASS.get(py))?;
40
41 if let Some(pyarrow_schema_cls) = pyarrow_schema_cls {
43 if schema_cls.is(&pyarrow_schema_cls) {
44 if config::verbose() {
45 eprintln!("python dataset: convert from arrow schema");
46 }
47
48 let mut iter = schema_obj
49 .bind(py)
50 .try_iter()?
51 .map(|x| x.and_then(field_to_rust));
52
53 let mut last_err = None;
54
55 let schema =
56 Schema::from_iter_check_duplicates(std::iter::from_fn(|| match iter.next() {
57 Some(Ok(v)) => Some(v),
58 Some(Err(e)) => {
59 last_err = Some(e);
60 None
61 },
62 None => None,
63 }))?;
64
65 if let Some(last_err) = last_err {
66 return Err(last_err.into());
67 }
68
69 return Ok(Arc::new(schema));
70 }
71 }
72
73 let Wrap(schema) = Wrap::<Schema>::extract(schema_obj.bind_borrowed(py))?;
74
75 Ok(Arc::new(schema))
76 })
77}
78
79pub fn to_dataset_scan(
80 dataset_object: &PythonObject,
81 existing_resolved_version_key: Option<&str>,
82 limit: Option<usize>,
83 projection: Option<&[PlSmallStr]>,
84 filter_columns: Option<&[PlSmallStr]>,
85 pyarrow_predicate: Option<&str>,
86) -> PolarsResult<Option<(DslPlan, PlSmallStr)>> {
87 Python::attach(|py| {
88 let kwargs = PyDict::new(py);
89
90 kwargs.set_item(
91 intern!(py, "existing_resolved_version_key"),
92 existing_resolved_version_key,
93 )?;
94
95 if let Some(limit) = limit {
96 kwargs.set_item(intern!(py, "limit"), limit)?;
97 }
98
99 if let Some(projection) = projection {
100 let projection_list = PyList::empty(py);
101
102 for name in projection {
103 projection_list.append(name.as_str())?;
104 }
105
106 kwargs.set_item(intern!(py, "projection"), projection_list)?;
107 }
108
109 if let Some(filter_columns) = filter_columns {
110 let filter_columns_list = PyList::empty(py);
111
112 for name in filter_columns {
113 filter_columns_list.append(name.as_str())?;
114 }
115
116 kwargs.set_item(intern!(py, "filter_columns"), filter_columns_list)?;
117 }
118
119 kwargs.set_item(intern!(py, "pyarrow_predicate"), pyarrow_predicate)?;
120
121 let Some((scan, version)): Option<(Py<PyAny>, Wrap<PlSmallStr>)> = dataset_object
122 .getattr(py, intern!(py, "to_dataset_scan"))?
123 .call(py, (), Some(&kwargs))?
124 .extract(py)?
125 else {
126 return Ok(None);
127 };
128
129 let Ok(lf) = get_lf(scan.bind(py)) else {
130 return Err(
131 PyValueError::new_err(format!("cannot extract LazyFrame from {}", &scan)).into(),
132 );
133 };
134
135 Ok(Some((lf.logical_plan, version.0)))
136 })
137}