// polars_python/dataset/dataset_provider_funcs.rs
use std::sync::Arc;
3
4use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::python_function::PythonObject;
8use pyo3::conversion::FromPyObjectBound;
9use pyo3::exceptions::PyValueError;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};
12use pyo3::{PyObject, PyResult, Python, intern};
13
14use crate::interop::arrow::to_rust::field_to_rust;
15use crate::prelude::{Wrap, get_lf};
16
17pub fn name(dataset_object: &PythonObject) -> PlSmallStr {
18 Python::with_gil(|py| {
19 PyResult::Ok(PlSmallStr::from_str(
20 &dataset_object
21 .getattr(py, intern!(py, "__class__"))?
22 .getattr(py, intern!(py, "__name__"))?
23 .extract::<PyBackedStr>(py)?,
24 ))
25 })
26 .unwrap()
27}
28
29pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {
30 Python::with_gil(|py| {
31 let pyarrow_schema_cls = py
32 .import("pyarrow")
33 .ok()
34 .and_then(|pa| pa.getattr("Schema").ok());
35
36 let schema_obj = dataset_object.getattr(py, "schema")?.call0(py)?;
37
38 let schema_cls = schema_obj.getattr(py, "__class__")?;
39
40 if let Some(pyarrow_schema_cls) = pyarrow_schema_cls {
42 if schema_cls.is(&pyarrow_schema_cls) {
43 if config::verbose() {
44 eprintln!("python dataset: convert from arrow schema");
45 }
46
47 let mut iter = schema_obj
48 .bind(py)
49 .try_iter()?
50 .map(|x| x.and_then(field_to_rust));
51
52 let mut last_err = None;
53
54 let schema =
55 Schema::from_iter_check_duplicates(std::iter::from_fn(|| match iter.next() {
56 Some(Ok(v)) => Some(v),
57 Some(Err(e)) => {
58 last_err = Some(e);
59 None
60 },
61 None => None,
62 }))?;
63
64 if let Some(last_err) = last_err {
65 return Err(last_err.into());
66 }
67
68 return Ok(Arc::new(schema));
69 }
70 }
71
72 let Wrap(schema) = Wrap::<Schema>::from_py_object_bound(schema_obj.bind_borrowed(py))?;
73
74 Ok(Arc::new(schema))
75 })
76}
77
78pub fn to_dataset_scan(
79 dataset_object: &PythonObject,
80 existing_resolved_version_key: Option<&str>,
81 limit: Option<usize>,
82 projection: Option<&[PlSmallStr]>,
83) -> PolarsResult<Option<(DslPlan, PlSmallStr)>> {
84 Python::with_gil(|py| {
85 let kwargs = PyDict::new(py);
86
87 kwargs.set_item(
88 intern!(py, "existing_resolved_version_key"),
89 existing_resolved_version_key,
90 )?;
91
92 if let Some(limit) = limit {
93 kwargs.set_item(intern!(py, "limit"), limit)?;
94 }
95
96 if let Some(projection) = projection {
97 let projection_list = PyList::empty(py);
98
99 for name in projection {
100 projection_list.append(name.as_str())?;
101 }
102
103 kwargs.set_item(intern!(py, "projection"), projection_list)?;
104 }
105
106 let Some((scan, version)): Option<(PyObject, Wrap<PlSmallStr>)> = dataset_object
107 .getattr(py, intern!(py, "to_dataset_scan"))?
108 .call(py, (), Some(&kwargs))?
109 .extract(py)?
110 else {
111 return Ok(None);
112 };
113
114 let Ok(lf) = get_lf(scan.bind(py)) else {
115 return Err(
116 PyValueError::new_err(format!("cannot extract LazyFrame from {}", &scan)).into(),
117 );
118 };
119
120 Ok(Some((lf.logical_plan, version.0)))
121 })
122}