polars_python/dataset/
dataset_provider_funcs.rs1use std::sync::Arc;
3
4use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};
5use polars_core::config;
6use polars_error::PolarsResult;
7use polars_utils::python_function::PythonObject;
8use pyo3::conversion::FromPyObjectBound;
9use pyo3::exceptions::PyValueError;
10use pyo3::pybacked::PyBackedStr;
11use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};
12use pyo3::{PyResult, Python};
13
14use crate::interop::arrow::to_rust::field_to_rust;
15use crate::prelude::{Wrap, get_lf};
16
17pub fn reader_name(dataset_object: &PythonObject) -> PlSmallStr {
18 Python::with_gil(|py| {
19 let name: PyBackedStr = dataset_object
20 .getattr(py, "reader_name")?
21 .call0(py)?
22 .extract(py)?;
23
24 PyResult::Ok(PlSmallStr::from_str(&name))
25 })
26 .unwrap()
27}
28
29pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {
30 Python::with_gil(|py| {
31 let pyarrow_schema_cls = py
32 .import("pyarrow")
33 .ok()
34 .and_then(|pa| pa.getattr("Schema").ok());
35
36 let schema_obj = dataset_object.getattr(py, "schema")?.call0(py)?;
37
38 let schema_cls = schema_obj.getattr(py, "__class__")?;
39
40 if let Some(pyarrow_schema_cls) = pyarrow_schema_cls {
42 if schema_cls.is(&pyarrow_schema_cls) {
43 if config::verbose() {
44 eprintln!("python dataset: convert from arrow schema");
45 }
46
47 let mut iter = schema_obj
48 .bind(py)
49 .try_iter()?
50 .map(|x| x.and_then(field_to_rust));
51
52 let mut last_err = None;
53
54 let schema =
55 Schema::from_iter_check_duplicates(std::iter::from_fn(|| match iter.next() {
56 Some(Ok(v)) => Some(v),
57 Some(Err(e)) => {
58 last_err = Some(e);
59 None
60 },
61 None => None,
62 }))?;
63
64 if let Some(last_err) = last_err {
65 return Err(last_err.into());
66 }
67
68 return Ok(Arc::new(schema));
69 }
70 }
71
72 let Wrap(schema) = Wrap::<Schema>::from_py_object_bound(schema_obj.bind_borrowed(py))?;
73
74 Ok(Arc::new(schema))
75 })
76}
77
78pub fn to_dataset_scan(
79 dataset_object: &PythonObject,
80 limit: Option<usize>,
81 projection: Option<&[PlSmallStr]>,
82) -> PolarsResult<DslPlan> {
83 Python::with_gil(|py| {
84 let kwargs = PyDict::new(py);
85
86 if let Some(limit) = limit {
87 kwargs.set_item("limit", limit)?;
88 }
89
90 if let Some(projection) = projection {
91 let projection_list = PyList::empty(py);
92
93 for name in projection {
94 projection_list.append(name.as_str())?;
95 }
96
97 kwargs.set_item("projection", projection_list)?;
98 }
99
100 let scan = dataset_object
101 .getattr(py, "to_dataset_scan")?
102 .call(py, (), Some(&kwargs))?;
103
104 let Ok(lf) = get_lf(scan.bind(py)) else {
105 return Err(
106 PyValueError::new_err(format!("cannot extract LazyFrame from {}", &scan)).into(),
107 );
108 };
109
110 Ok(lf.logical_plan)
111 })
112}